diff --git a/backend/services/acs/cmd/main.go b/backend/services/acs/cmd/main.go
index 3ab03b9..53d9379 100644
--- a/backend/services/acs/cmd/main.go
+++ b/backend/services/acs/cmd/main.go
@@ -1,6 +1,7 @@
package main
import (
+ "oktopUSP/backend/services/acs/internal/bridge"
"oktopUSP/backend/services/acs/internal/config"
"oktopUSP/backend/services/acs/internal/nats"
"oktopUSP/backend/services/acs/internal/server"
@@ -15,5 +16,12 @@ func main() {
h := handler.NewHandler(natsActions.Publish, natsActions.Subscribe)
+ b := bridge.NewBridge(
+ natsActions.Publish,
+ natsActions.Subscribe,
+ h,
+ )
+ b.StartBridge()
+
server.Run(c.Acs, natsActions, h)
}
diff --git a/backend/services/acs/go.mod b/backend/services/acs/go.mod
index 5ba5c09..f57be7f 100644
--- a/backend/services/acs/go.mod
+++ b/backend/services/acs/go.mod
@@ -3,6 +3,7 @@ module oktopUSP/backend/services/acs
go 1.22.2
require (
+ github.com/google/uuid v1.6.0
github.com/joho/godotenv v1.5.1
github.com/nats-io/nats.go v1.34.1
github.com/oleiade/lane v1.0.1
diff --git a/backend/services/acs/go.sum b/backend/services/acs/go.sum
index f09390a..c223b41 100644
--- a/backend/services/acs/go.sum
+++ b/backend/services/acs/go.sum
@@ -1,3 +1,5 @@
+github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
+github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
diff --git a/backend/services/acs/internal/auth/auth.go b/backend/services/acs/internal/auth/auth.go
index 476936f..3b20e03 100644
--- a/backend/services/acs/internal/auth/auth.go
+++ b/backend/services/acs/internal/auth/auth.go
@@ -5,7 +5,6 @@ import (
"crypto/rand"
"encoding/base64"
"fmt"
- "io"
"net/http"
"net/url"
"strings"
@@ -36,37 +35,37 @@ func Auth(username string, password string, uri string) (bool, error) {
if err != nil {
return false, err
}
- if resp.StatusCode == 401 {
- var authorization map[string]string = DigestAuthParams(resp)
- realmHeader := authorization["realm"]
- qopHeader := authorization["qop"]
- nonceHeader := authorization["nonce"]
- opaqueHeader := authorization["opaque"]
- realm := realmHeader
- // A1
- h := md5.New()
- A1 := fmt.Sprintf("%s:%s:%s", username, realm, password)
- io.WriteString(h, A1)
- HA1 := fmt.Sprintf("%x", h.Sum(nil))
+ // if resp.StatusCode == 401 {
+ // var authorization map[string]string = DigestAuthParams(resp)
+ // realmHeader := authorization["realm"]
+ // qopHeader := authorization["qop"]
+ // nonceHeader := authorization["nonce"]
+ // opaqueHeader := authorization["opaque"]
+ // realm := realmHeader
+ // // A1
+ // h := md5.New()
+ // A1 := fmt.Sprintf("%s:%s:%s", username, realm, password)
+ // io.WriteString(h, A1)
+ // HA1 := fmt.Sprintf("%x", h.Sum(nil))
- // A2
- h = md5.New()
- A2 := fmt.Sprintf("GET:%s", "/auth")
- io.WriteString(h, A2)
- HA2 := fmt.Sprintf("%x", h.Sum(nil))
+ // // A2
+ // h = md5.New()
+ // A2 := fmt.Sprintf("GET:%s", "/auth")
+ // io.WriteString(h, A2)
+ // HA2 := fmt.Sprintf("%x", h.Sum(nil))
- // response
- cnonce := RandomKey()
- response := H(strings.Join([]string{HA1, nonceHeader, "00000001", cnonce, qopHeader, HA2}, ":"))
+ // // response
+ // cnonce := RandomKey()
+ // response := H(strings.Join([]string{HA1, nonceHeader, "00000001", cnonce, qopHeader, HA2}, ":"))
- // now make header
- AuthHeader := fmt.Sprintf(`Digest username="%s", realm="%s", nonce="%s", uri="%s", cnonce="%s", nc=00000001, qop=%s, response="%s", opaque="%s", algorithm=MD5`,
- username, realmHeader, nonceHeader, "/auth", cnonce, qopHeader, response, opaqueHeader)
- req.Header.Set("Authorization", AuthHeader)
- resp, err = client.Do(req)
- } else {
- return false, fmt.Errorf("response status code should have been 401, it was %v", resp.StatusCode)
- }
+ // // now make header
+ // AuthHeader := fmt.Sprintf(`Digest username="%s", realm="%s", nonce="%s", uri="%s", cnonce="%s", nc=00000001, qop=%s, response="%s", opaque="%s", algorithm=MD5`,
+ // username, realmHeader, nonceHeader, "/auth", cnonce, qopHeader, response, opaqueHeader)
+ // req.Header.Set("Authorization", AuthHeader)
+ // resp, err = client.Do(req)
+ // } else {
+ // return false, fmt.Errorf("response status code should have been 401, it was %v", resp.StatusCode)
+ // }
return resp.StatusCode == 200, err
}
diff --git a/backend/services/acs/internal/bridge/bridge.go b/backend/services/acs/internal/bridge/bridge.go
new file mode 100644
index 0000000..9eda542
--- /dev/null
+++ b/backend/services/acs/internal/bridge/bridge.go
@@ -0,0 +1,112 @@
+package bridge
+
+import (
+ "encoding/json"
+ "log"
+ "net/http"
+ "oktopUSP/backend/services/acs/internal/server/handler"
+ "strings"
+ "time"
+
+ "github.com/google/uuid"
+ "github.com/nats-io/nats.go"
+)
+
+type Bridge struct {
+ pub func(string, []byte) error
+ sub func(string, func(*nats.Msg)) error
+ cpes map[string]handler.CPE
+ h *handler.Handler
+}
+
+type msgAnswer struct {
+ Code int
+ Msg any
+}
+
+const DEVICE_ANSWER_TIMEOUT = 5 * time.Second
+
+func NewBridge(
+ pub func(string, []byte) error,
+ sub func(string, func(*nats.Msg)) error,
+ h *handler.Handler,
+) *Bridge {
+ return &Bridge{
+ pub: pub,
+ sub: sub,
+ cpes: h.Cpes,
+ h: h,
+ }
+}
+
+func (b *Bridge) StartBridge() {
+ b.sub(handler.NATS_CWMP_ADAPTER_SUBJECT_PREFIX+"*.api", func(msg *nats.Msg) {
+ log.Printf("Received message: %s", string(msg.Data))
+ log.Printf("Subject: %s", msg.Subject)
+ log.Printf("Reply: %s", msg.Reply)
+
+ device := getDeviceFromSubject(msg.Subject)
+ cpe, ok := b.cpes[device]
+ if !ok {
+ log.Printf("Device %s not found", device)
+ respondMsg(msg.Respond, http.StatusNotFound, "Device not found")
+ return
+ }
+ if cpe.Queue.Size() > 0 {
+ log.Printf("Device %s is busy", device)
+ respondMsg(msg.Respond, http.StatusConflict, "Device is busy")
+ return
+ }
+
+ deviceAnswer := make(chan []byte)
+ defer close(deviceAnswer)
+
+ cpe.Queue.Enqueue(handler.Request{ //TODO: pass user and password too
+ Id: uuid.NewString(),
+ CwmpMsg: msg.Data,
+ Callback: deviceAnswer,
+ })
+
+ err := b.h.ConnectionRequest(cpe)
+ if err != nil {
+ log.Println("Failed to do connection request", err)
+ cpe.Queue.Dequeue()
+ respondMsg(msg.Respond, http.StatusBadRequest, err.Error())
+ return
+ }
+
+ //req := cpe.Queue.Dequeue().(handler.Request)
+ //cpe.Waiting = &req
+
+ select {
+ case response := <-deviceAnswer:
+ log.Println("Received response from device: ", string(response))
+ respondMsg(msg.Respond, http.StatusOK, response)
+ case <-time.After(DEVICE_ANSWER_TIMEOUT):
+ log.Println("Device response timed out")
+ respondMsg(msg.Respond, http.StatusRequestTimeout, "Request timeout")
+ }
+ })
+}
+
+func respondMsg(respond func(data []byte) error, code int, msgData any) {
+
+ msg, err := json.Marshal(msgAnswer{
+ Code: code,
+ Msg: msgData,
+ })
+ if err != nil {
+ log.Printf("Failed to marshal message: %q", err)
+ respond([]byte(err.Error()))
+ return
+ }
+
+ respond(msg)
+ //log.Println("Responded with message: ", string(msg))
+}
+
+func getDeviceFromSubject(subject string) string {
+ paths := strings.Split(subject, ".")
+ device := paths[len(paths)-2]
+ return device
+}
diff --git a/backend/services/acs/internal/server/handler/cwmp.go b/backend/services/acs/internal/server/handler/cwmp.go
new file mode 100644
index 0000000..c682353
--- /dev/null
+++ b/backend/services/acs/internal/server/handler/cwmp.go
@@ -0,0 +1,164 @@
+package handler
+
+import (
+ "encoding/json"
+ "encoding/xml"
+ "fmt"
+ "io/ioutil"
+ "log"
+ "net/http"
+ "oktopUSP/backend/services/acs/internal/auth"
+ "oktopUSP/backend/services/acs/internal/cwmp"
+ "time"
+
+ "github.com/oleiade/lane"
+)
+
+func (h *Handler) CwmpHandler(w http.ResponseWriter, r *http.Request) {
+
+ log.Printf("--> Connection from %s", r.RemoteAddr)
+
+ defer r.Body.Close()
+ defer log.Printf("<-- Connection from %s closed", r.RemoteAddr)
+
+ tmp, _ := ioutil.ReadAll(r.Body)
+ body := string(tmp)
+
+ var envelope cwmp.SoapEnvelope
+ xml.Unmarshal(tmp, &envelope)
+
+ messageType := envelope.Body.CWMPMessage.XMLName.Local
+ log.Println("messageType: ", messageType)
+
+ var cpe CPE
+ var exists bool
+
+ w.Header().Set("Server", "Oktopus "+Version)
+
+ if messageType != "Inform" {
+ if cookie, err := r.Cookie("oktopus"); err == nil {
+ if cpe, exists = h.Cpes[cookie.Value]; !exists {
+ log.Printf("CPE with serial number %s not found", cookie.Value)
+ }
+ log.Printf("CPE with serial number %s found", cookie.Value)
+ } else {
+ fmt.Println("cookie 'oktopus' missing")
+ w.WriteHeader(401)
+ return
+ }
+ }
+
+ if messageType == "Inform" {
+ var Inform cwmp.CWMPInform
+ xml.Unmarshal(tmp, &Inform)
+
+ var addr string
+ if r.Header.Get("X-Real-Ip") != "" {
+ addr = r.Header.Get("X-Real-Ip")
+ } else {
+ addr = r.RemoteAddr
+ }
+
+ sn := Inform.DeviceId.SerialNumber
+
+ if _, exists := h.Cpes[sn]; !exists {
+ log.Println("New device: " + sn)
+ h.Cpes[sn] = CPE{
+ SerialNumber: sn,
+ LastConnection: time.Now().UTC(),
+ SoftwareVersion: Inform.GetSoftwareVersion(),
+ HardwareVersion: Inform.GetHardwareVersion(),
+ ExternalIPAddress: addr,
+ ConnectionRequestURL: Inform.GetConnectionRequest(),
+ OUI: Inform.DeviceId.OUI,
+ Queue: lane.NewQueue(),
+ DataModel: Inform.GetDataModelType(),
+ }
+ go h.handleCpeStatus(sn)
+ h.pub(NATS_CWMP_SUBJECT_PREFIX+sn+".info", tmp)
+ }
+ obj := h.Cpes[sn]
+ cpe := &obj
+ cpe.LastConnection = time.Now().UTC()
+
+ log.Printf("Received an Inform from device %s withEventCodes %s", addr, Inform.GetEvents())
+
+ expiration := time.Now().AddDate(0, 0, 1)
+
+ cookie := http.Cookie{Name: "oktopus", Value: sn, Expires: expiration}
+ http.SetCookie(w, &cookie)
+ data, _ := xml.Marshal(cwmp.InformResponse(envelope.Header.Id))
+ w.Write(data)
+ } else if messageType == "TransferComplete" {
+
+ } else if messageType == "GetRPC" {
+
+ } else {
+
+ if len(body) == 0 {
+ log.Println("Got Empty Post")
+ }
+
+ if cpe.Waiting != nil {
+ log.Println("CPE was waiting for a response, now received something")
+ var e cwmp.SoapEnvelope
+ xml.Unmarshal([]byte(body), &e)
+ log.Println("Kind of envelope: ", e.KindOf())
+
+ if e.KindOf() == "GetParameterNamesResponse" {
+ // var envelope cwmp.GetParameterNamesResponse
+ // xml.Unmarshal([]byte(body), &envelope)
+
+ // msg := new(NatsSendMessage)
+ // msg.MsgType = "GetParameterNamesResponse"
+ // msg.Data, _ = json.Marshal(envelope)
+ log.Println("Receive GetParameterNamesResponse from CPE:", cpe.SerialNumber)
+ cpe.Waiting.Callback <- tmp
+
+ } else if e.KindOf() == "GetParameterValuesResponse" {
+ var envelope cwmp.GetParameterValuesResponse
+ xml.Unmarshal([]byte(body), &envelope)
+
+ msg := new(NatsSendMessage)
+ msg.MsgType = "GetParameterValuesResponse"
+ msg.Data, _ = json.Marshal(envelope)
+
+ cpe.Waiting.Callback <- tmp
+
+ } else {
+ log.Println("Unknown message type")
+ cpe.Waiting.Callback <- tmp
+ }
+ cpe.Waiting = nil
+ } else {
+ log.Println("CPE was not waiting for a response")
+ }
+
+ log.Printf("CPE %s Queue size: %d", cpe.SerialNumber, cpe.Queue.Size())
+
+ if cpe.Queue.Size() > 0 {
+ req := cpe.Queue.Dequeue().(Request)
+ cpe.Waiting = &req
+ log.Println("Sending request to CPE:", req.Id)
+ w.Header().Set("Connection", "keep-alive")
+ w.Write(req.CwmpMsg)
+ } else {
+ w.Header().Set("Connection", "close")
+ w.WriteHeader(204)
+ }
+ }
+ h.Cpes[cpe.SerialNumber] = cpe
+}
+
+func (h *Handler) ConnectionRequest(cpe CPE) error {
+ log.Println("--> ConnectionRequest, CPE: ", cpe.SerialNumber)
+ // log.Println("ConnectionRequestURL: ", cpe.ConnectionRequestURL)
+ // log.Println("ConnectionRequestUsername: ", cpe.Username)
+ // log.Println("ConnectionRequestPassword: ", cpe.Password)
+
+ ok, err := auth.Auth("", "", cpe.ConnectionRequestURL)
+ if !ok {
+ log.Println("Error while authenticating to CPE: ", err)
+ }
+ return err
+}
diff --git a/backend/services/acs/internal/server/handler/handler.go b/backend/services/acs/internal/server/handler/handler.go
index f5e6225..5d1fd00 100644
--- a/backend/services/acs/internal/server/handler/handler.go
+++ b/backend/services/acs/internal/server/handler/handler.go
@@ -2,26 +2,20 @@ package handler
import (
"encoding/json"
- "encoding/xml"
- "fmt"
- "io/ioutil"
- "log"
- "net/http"
- "oktopUSP/backend/services/acs/internal/cwmp"
"time"
"github.com/nats-io/nats.go"
"github.com/oleiade/lane"
- "golang.org/x/net/websocket"
)
const Version = "1.0.0"
type Request struct {
- Id string
- Websocket *websocket.Conn
- CwmpMessage string
- Callback func(msg *WsSendMessage) error
+ Id string
+ User string
+ Password string
+ CwmpMsg []byte
+ Callback chan []byte
}
type CPE struct {
@@ -36,7 +30,8 @@ type CPE struct {
HardwareVersion string
LastConnection time.Time
DataModel string
- KeepConnectionOpen bool
+ Username string
+ Password string
}
type Message struct {
@@ -48,7 +43,7 @@ type WsMessage struct {
Cmd string
}
-type WsSendMessage struct {
+type NatsSendMessage struct {
MsgType string
Data json.RawMessage
}
@@ -60,153 +55,19 @@ type MsgCPEs struct {
type Handler struct {
pub func(string, []byte) error
sub func(string, func(*nats.Msg)) error
- cpes map[string]CPE
+ Cpes map[string]CPE
}
+const (
+ NATS_CWMP_SUBJECT_PREFIX = "cwmp.v1."
+ NATS_CWMP_ADAPTER_SUBJECT_PREFIX = "cwmp-adapter.v1."
+ NATS_ADAPTER_SUBJECT_PREFIX = "adapter.v1."
+)
+
func NewHandler(pub func(string, []byte) error, sub func(string, func(*nats.Msg)) error) *Handler {
return &Handler{
pub: pub,
sub: sub,
- cpes: make(map[string]CPE),
- }
-}
-
-func (h *Handler) CwmpHandler(w http.ResponseWriter, r *http.Request) {
-
- log.Printf("--> Connection from %s", r.RemoteAddr)
-
- defer r.Body.Close()
- defer log.Printf("<-- Connection from %s closed", r.RemoteAddr)
-
- tmp, _ := ioutil.ReadAll(r.Body)
- body := string(tmp)
- len := len(body)
-
- var envelope cwmp.SoapEnvelope
- xml.Unmarshal(tmp, &envelope)
-
- messageType := envelope.Body.CWMPMessage.XMLName.Local
-
- var cpe *CPE
-
- w.Header().Set("Server", "Oktopus "+Version)
-
- if messageType != "Inform" {
- if cookie, err := r.Cookie("oktopus"); err == nil {
- if _, exists := h.cpes[cookie.Value]; !exists {
- log.Printf("CPE with serial number %s not found", cookie.Value)
- }
- } else {
- fmt.Println("cookie 'oktopus' missing")
- w.WriteHeader(401)
- return
- }
- }
-
- if messageType == "Inform" {
- var Inform cwmp.CWMPInform
- xml.Unmarshal(tmp, &Inform)
-
- var addr string
- if r.Header.Get("X-Real-Ip") != "" {
- addr = r.Header.Get("X-Real-Ip")
- } else {
- addr = r.RemoteAddr
- }
-
- sn := Inform.DeviceId.SerialNumber
-
- if _, exists := h.cpes[sn]; !exists {
- fmt.Println("New device: " + sn)
- h.cpes[sn] = CPE{
- SerialNumber: sn,
- LastConnection: time.Now().UTC(),
- SoftwareVersion: Inform.GetSoftwareVersion(),
- HardwareVersion: Inform.GetHardwareVersion(),
- ExternalIPAddress: addr,
- ConnectionRequestURL: Inform.GetConnectionRequest(),
- OUI: Inform.DeviceId.OUI,
- Queue: lane.NewQueue(),
- DataModel: Inform.GetDataModelType(),
- KeepConnectionOpen: false,
- }
- go h.handleCpeStatus(sn)
- h.pub("cwmp.v1."+sn+".info", tmp)
- }
- obj := h.cpes[sn]
- cpe := &obj
- cpe.LastConnection = time.Now().UTC()
-
- log.Printf("Received an Inform from %s (%d bytes) with SerialNumber %s and EventCodes %s", addr, len, sn, Inform.GetEvents())
-
- expiration := time.Now().AddDate(0, 0, 1)
-
- cookie := http.Cookie{Name: "oktopus", Value: sn, Expires: expiration}
- http.SetCookie(w, &cookie)
- } else if messageType == "TransferComplete" {
-
- } else if messageType == "GetRPC" {
-
- } else {
- if len == 0 {
- log.Printf("Got Empty Post")
- }
-
- if cpe.Waiting != nil {
- var e cwmp.SoapEnvelope
- xml.Unmarshal([]byte(body), &e)
-
- if e.KindOf() == "GetParameterNamesResponse" {
- var envelope cwmp.GetParameterNamesResponse
- xml.Unmarshal([]byte(body), &envelope)
-
- msg := new(WsSendMessage)
- msg.MsgType = "GetParameterNamesResponse"
- msg.Data, _ = json.Marshal(envelope)
-
- cpe.Waiting.Callback(msg)
- // if err := websocket.JSON.Send(cpe.Waiting.Websocket, msg); err != nil {
- // fmt.Println("error while sending back answer:", err)
- // }
-
- } else if e.KindOf() == "GetParameterValuesResponse" {
- var envelope cwmp.GetParameterValuesResponse
- xml.Unmarshal([]byte(body), &envelope)
-
- msg := new(WsSendMessage)
- msg.MsgType = "GetParameterValuesResponse"
- msg.Data, _ = json.Marshal(envelope)
-
- cpe.Waiting.Callback(msg)
- // if err := websocket.JSON.Send(cpe.Waiting.Websocket, msg); err != nil {
- // fmt.Println("error while sending back answer:", err)
- // }
-
- } else {
- msg := new(WsMessage)
- msg.Cmd = body
-
- if err := websocket.JSON.Send(cpe.Waiting.Websocket, msg); err != nil {
- fmt.Println("error while sending back answer:", err)
- }
-
- }
-
- cpe.Waiting = nil
- }
-
- // Got Empty Post or a Response. Now check for any event to send, otherwise 204
- if cpe.Queue.Size() > 0 {
- req := cpe.Queue.Dequeue().(Request)
- // fmt.Println("sending "+req.CwmpMessage)
- fmt.Fprintf(w, req.CwmpMessage)
- cpe.Waiting = &req
- } else {
- if cpe.KeepConnectionOpen {
- fmt.Println("I'm keeping connection open")
- } else {
- w.WriteHeader(204)
- }
- }
+ Cpes: make(map[string]CPE),
}
}
diff --git a/backend/services/acs/internal/server/handler/status.go b/backend/services/acs/internal/server/handler/status.go
index 1402629..ec4c7a1 100644
--- a/backend/services/acs/internal/server/handler/status.go
+++ b/backend/services/acs/internal/server/handler/status.go
@@ -7,14 +7,14 @@ import (
// TODO: make these consts dynamic via config
const (
- CHECK_STATUS_INTERVAL = 5 * time.Second
- KEEP_ALIVE_INTERVAL = 10 * time.Second
+ CHECK_STATUS_INTERVAL = 10 * time.Second
+ KEEP_ALIVE_INTERVAL = 600 * time.Second
)
func (h *Handler) handleCpeStatus(cpe string) {
for {
- if time.Since(h.cpes[cpe].LastConnection) > KEEP_ALIVE_INTERVAL {
- delete(h.cpes, cpe)
+ if time.Since(h.Cpes[cpe].LastConnection) > KEEP_ALIVE_INTERVAL {
+ delete(h.Cpes, cpe)
break
}
time.Sleep(CHECK_STATUS_INTERVAL)
diff --git a/backend/services/acs/internal/server/server.go b/backend/services/acs/internal/server/server.go
index 4a2bc6b..8022e2a 100644
--- a/backend/services/acs/internal/server/server.go
+++ b/backend/services/acs/internal/server/server.go
@@ -12,7 +12,6 @@ import (
func Run(c config.Acs, natsActions nats.NatsActions, h *handler.Handler) {
http.HandleFunc(c.Route, h.CwmpHandler)
-
log.Printf("ACS running at %s%s", c.Port, c.Route)
err := http.ListenAndServe(c.Port, nil)
diff --git a/backend/services/controller/internal/api/api.go b/backend/services/controller/internal/api/api.go
index 33c25fe..bd20320 100644
--- a/backend/services/controller/internal/api/api.go
+++ b/backend/services/controller/internal/api/api.go
@@ -57,6 +57,7 @@ func (a *Api) StartApi() {
authentication.HandleFunc("/admin/exists", a.adminUserExists).Methods("GET")
iot := r.PathPrefix("/api/device").Subrouter()
iot.HandleFunc("/auth", a.deviceAuth).Methods("GET", "PUT", "DELETE")
+ iot.HandleFunc("/cwmp/{sn}/getParameterNames", a.cwmpGetParameterNamesMsg).Methods("GET", "PUT", "DELETE")
iot.HandleFunc("", a.retrieveDevices).Methods("GET")
iot.HandleFunc("/{id}", a.retrieveDevices).Methods("GET")
iot.HandleFunc("/{sn}/{mtp}/get", a.deviceGetMsg).Methods("PUT")
diff --git a/backend/services/controller/internal/api/cwmp.go b/backend/services/controller/internal/api/cwmp.go
new file mode 100644
index 0000000..6b1ba31
--- /dev/null
+++ b/backend/services/controller/internal/api/cwmp.go
@@ -0,0 +1,48 @@
+package api
+
+import (
+ "encoding/json"
+ "encoding/xml"
+ "io"
+ "net/http"
+
+ "github.com/leandrofars/oktopus/internal/bridge"
+ "github.com/leandrofars/oktopus/internal/cwmp"
+ "github.com/leandrofars/oktopus/internal/nats"
+ "github.com/leandrofars/oktopus/internal/utils"
+)
+
+func (a *Api) cwmpGetParameterNamesMsg(w http.ResponseWriter, r *http.Request) {
+ sn := getSerialNumberFromRequest(r)
+
+ payload, err := io.ReadAll(r.Body)
+ if err != nil {
+ w.WriteHeader(http.StatusBadRequest)
+ w.Write(utils.Marshall(err.Error()))
+ return
+ }
+
+ data, err := bridge.NatsCwmpInteraction(
+ nats.NATS_CWMP_ADAPTER_SUBJECT_PREFIX+sn+".api",
+ payload,
+ w,
+ a.nc,
+ )
+ if err != nil {
+ return
+ }
+
+ var response cwmp.GetParameterNamesResponse
+ err = xml.Unmarshal(data, &response)
+ if err != nil {
+ err = json.Unmarshal(data, &response)
+ if err != nil {
+ w.WriteHeader(http.StatusBadRequest)
+ w.Write(utils.Marshall(err))
+ return
+ }
+ return
+ }
+
+ w.Write(data)
+}
diff --git a/backend/services/controller/internal/bridge/bridge.go b/backend/services/controller/internal/bridge/bridge.go
index af3bc1a..319ef13 100644
--- a/backend/services/controller/internal/bridge/bridge.go
+++ b/backend/services/controller/internal/bridge/bridge.go
@@ -204,3 +204,45 @@ func NatsReqWithoutHttpSet[T entity.DataType](
return answer, nil
}
+
+func NatsCwmpInteraction(
+ subj string,
+ body []byte,
+ w http.ResponseWriter,
+ nc *nats.Conn,
+) ([]byte, error) {
+
+ log.Println("Sending cwmp message")
+ log.Println("Subject: ", subj)
+
+ var answer entity.MsgAnswer[[]byte]
+
+ msg, err := nc.Request(subj, body, local.NATS_REQUEST_TIMEOUT)
+ if err != nil {
+ log.Println(err)
+ w.WriteHeader(http.StatusInternalServerError)
+ w.Write(utils.Marshall("Error to communicate with nats: " + err.Error()))
+ return nil, err
+ }
+
+ err = json.Unmarshal(msg.Data, &answer)
+ if err != nil {
+
+ var errMsg *entity.MsgAnswer[*string]
+ err = json.Unmarshal(msg.Data, &errMsg)
+
+ if err != nil {
+ log.Println("Bad answer message formatting: ", err.Error())
+ w.WriteHeader(http.StatusInternalServerError)
+ w.Write(msg.Data)
+ return nil, err
+ }
+
+ log.Printf("Error message received, msg: %s, code: %d", *errMsg.Msg, errMsg.Code)
+ w.WriteHeader(errMsg.Code)
+ w.Write(utils.Marshall(*errMsg.Msg))
+ return nil, errNatsMsgReceivedWithErrorData
+ }
+
+ return answer.Msg, nil
+}
diff --git a/backend/services/controller/internal/cwmp/cwmp.go b/backend/services/controller/internal/cwmp/cwmp.go
new file mode 100644
index 0000000..a0aaa06
--- /dev/null
+++ b/backend/services/controller/internal/cwmp/cwmp.go
@@ -0,0 +1,542 @@
+package cwmp
+
+import (
+ "crypto/rand"
+ "encoding/xml"
+ "fmt"
+ "strconv"
+ "strings"
+ "time"
+)
+
+type SoapEnvelope struct {
+ XMLName xml.Name
+ Header SoapHeader
+ Body SoapBody
+}
+
+type SoapHeader struct {
+ Id string `xml:"ID"`
+}
+type SoapBody struct {
+ CWMPMessage CWMPMessage `xml:",any"`
+}
+
+type CWMPMessage struct {
+ XMLName xml.Name
+}
+
+type EventStruct struct {
+ EventCode string
+ CommandKey string
+}
+
+type ParameterValueStruct struct {
+ Name string
+ Value string
+}
+
+type ParameterInfoStruct struct {
+ Name string
+ Writable string
+}
+
+type SetParameterValues_ struct {
+ ParameterList []ParameterValueStruct `xml:"Body>SetParameterValues>ParameterList>ParameterValueStruct"`
+ ParameterKey string `xml:"Body>SetParameterValues>ParameterKey>string"`
+}
+
+type GetParameterValues_ struct {
+ ParameterNames []string `xml:"Body>GetParameterValues>ParameterNames>string"`
+}
+
+type GetParameterNames_ struct {
+ ParameterPath []string `xml:"Body>GetParameterNames>ParameterPath"`
+ NextLevel string `xml:"Body>GetParameterNames>NextLevel"`
+}
+
+type GetParameterValuesResponse struct {
+ ParameterList []ParameterValueStruct `xml:"Body>GetParameterValuesResponse>ParameterList>ParameterValueStruct"`
+}
+
+type GetParameterNamesResponse struct {
+ ParameterList []ParameterInfoStruct `xml:"Body>GetParameterNamesResponse>ParameterList>ParameterInfoStruct"`
+}
+
+type CWMPInform struct {
+ DeviceId DeviceID `xml:"Body>Inform>DeviceId"`
+ Events []EventStruct `xml:"Body>Inform>Event>EventStruct"`
+ ParameterList []ParameterValueStruct `xml:"Body>Inform>ParameterList>ParameterValueStruct"`
+}
+
+func (s *SoapEnvelope) KindOf() string {
+ return s.Body.CWMPMessage.XMLName.Local
+}
+
+func (i *CWMPInform) GetEvents() string {
+ res := ""
+ for idx := range i.Events {
+ res += i.Events[idx].EventCode
+ }
+
+ return res
+}
+
+func (i *CWMPInform) GetConnectionRequest() string {
+ for idx := range i.ParameterList {
+ // valid condition for both tr98 and tr181
+ if strings.HasSuffix(i.ParameterList[idx].Name, "Device.ManagementServer.ConnectionRequestURL") {
+ return i.ParameterList[idx].Value
+ }
+ }
+
+ return ""
+}
+
+func (i *CWMPInform) GetSoftwareVersion() string {
+ for idx := range i.ParameterList {
+ if strings.HasSuffix(i.ParameterList[idx].Name, "Device.DeviceInfo.SoftwareVersion") {
+ return i.ParameterList[idx].Value
+ }
+ }
+
+ return ""
+}
+
+func (i *CWMPInform) GetHardwareVersion() string {
+ for idx := range i.ParameterList {
+ if strings.HasSuffix(i.ParameterList[idx].Name, "Device.DeviceInfo.HardwareVersion") {
+ return i.ParameterList[idx].Value
+ }
+ }
+
+ return ""
+}
+
+func (i *CWMPInform) GetDataModelType() string {
+ if strings.HasPrefix(i.ParameterList[0].Name, "InternetGatewayDevice") {
+ return "TR098"
+ } else if strings.HasPrefix(i.ParameterList[0].Name, "Device") {
+ return "TR181"
+ }
+
+ return ""
+}
+
+type DeviceID struct {
+ Manufacturer string
+ OUI string
+ SerialNumber string
+ ProductClass string
+}
+
+func InformResponse(mustUnderstand string) string {
+ mustUnderstandHeader := ""
+ if mustUnderstand != "" {
+ mustUnderstandHeader = `` + mustUnderstand + ``
+ }
+
+ return `
+
+ ` + mustUnderstandHeader + `
+
+
+ 1
+
+
+`
+}
+
+func GetParameterValues(leaf string) string {
+ return `
+
+
+
+
+
+ ` + leaf + `
+
+
+
+`
+}
+
+func GetParameterMultiValues(leaves []string) string {
+ msg := `
+
+
+
+
+ `
+
+ for idx := range leaves {
+ msg += `` + leaves[idx] + ``
+
+ }
+ msg += `
+
+
+`
+ return msg
+}
+
+func SetParameterValues(leaf string, value string) string {
+ return `
+
+
+
+
+
+
+ ` + leaf + `
+ ` + value + `
+
+
+ LC1309` + randToken() + `
+
+
+`
+}
+
+func randToken() string {
+ b := make([]byte, 8)
+ rand.Read(b)
+ return fmt.Sprintf("%x", b)
+}
+
+func SetParameterMultiValues(data map[string]string) string {
+ msg := `
+
+
+
+
+ `
+
+ for key, value := range data {
+ msg += `
+ ` + key + `
+ ` + value + `
+ `
+ }
+
+ msg += `
+ LC1309` + randToken() + `
+
+
+`
+
+ return msg
+}
+
+func GetParameterNames(leaf string, nextlevel int) string {
+ return `
+
+
+
+
+ ` + leaf + `
+ ` + strconv.Itoa(nextlevel) + `
+
+
+`
+}
+
+func FactoryReset() string {
+ return `
+
+
+
+
+
+`
+}
+
+func Download(filetype, url, username, password, filesize string) string {
+ // 3 Vendor Configuration File
+ // 1 Firmware Upgrade Image
+
+ return `
+
+
+
+
+ MSDWK
+ ` + filetype + `
+ ` + url + `
+ ` + username + `
+ ` + password + `
+ ` + filesize + `
+
+ 0
+
+
+
+
+`
+}
+
+func CancelTransfer() string {
+ return `
+
+
+
+
+
+
+
+`
+}
+
+type TimeWindowStruct struct {
+ WindowStart string
+ WindowEnd string
+ WindowMode string
+ UserMessage string
+ MaxRetries string
+}
+
+func (window *TimeWindowStruct) String() string {
+ return `
+` + window.WindowStart + `
+` + window.WindowEnd + `
+` + window.WindowMode + `
+` + window.UserMessage + `
+` + window.MaxRetries + `
+`
+}
+
+func ScheduleDownload(filetype, url, username, password, filesize string, windowslist []fmt.Stringer) string {
+ ret := `
+
+
+
+
+ MSDWK
+ ` + filetype + `
+ ` + url + `
+ ` + username + `
+ ` + password + `
+ ` + filesize + `
+
+ `
+
+ for _, op := range windowslist {
+ ret += op.String()
+ }
+
+ ret += `
+
+
+`
+
+ return ret
+}
+
+type InstallOpStruct struct {
+ Url string
+ Uuid string
+ Username string
+ Password string
+ ExecutionEnvironment string
+}
+
+func (op *InstallOpStruct) String() string {
+ return `
+ ` + op.Url + `
+ ` + op.Uuid + `
+ ` + op.Username + `
+ ` + op.Password + `
+ ` + op.ExecutionEnvironment + `
+`
+}
+
+type UpdateOpStruct struct {
+ Uuid string
+ Version string
+ Url string
+ Username string
+ Password string
+}
+
+func (op *UpdateOpStruct) String() string {
+ return `
+` + op.Uuid + `
+` + op.Version + `
+` + op.Url + `
+` + op.Username + `
+` + op.Password + `
+`
+}
+
+type UninstallOpStruct struct {
+ Uuid string
+ Version string
+ ExecutionEnvironment string
+}
+
+func (op *UninstallOpStruct) String() string {
+ return `
+` + op.Uuid + `
+` + op.Version + `
+` + op.ExecutionEnvironment + `
+`
+}
+
+func ChangeDuState(ops []fmt.Stringer) string {
+ ret := `
+
+
+
+
+`
+
+ for _, op := range ops {
+ ret += op.String()
+ }
+
+ ret += `
+
+
+
+`
+
+ return ret
+}
+
+// CPE side
+
+func Inform(serial string) string {
+ return `5058
+ ADB Broadband
+0013C8
+VV5522
+` + serial + `
+
+
+6 CONNECTION REQUEST
+
+
+
+1
+` + time.Now().Format(time.RFC3339) + `
+0
+
+InternetGatewayDevice.ManagementServer.ConnectionRequestURL
+http://104.199.175.27:7547/ConnectionRequest-` + serial + `
+
+InternetGatewayDevice.ManagementServer.ParameterKey
+
+
+InternetGatewayDevice.DeviceSummary
+InternetGatewayDevice:1.2[](Baseline:1,EthernetLAN:1,WiFiLAN:1,ADSLWAN:1,EthernetWAN:1,QoS:1,QoSDynamicFlow:1,Bridging:1,Time:1,IPPing:1,TraceRoute:1,DeviceAssociation:1,UDPConnReq:1),VoiceService:1.0[1](TAEndpoint:1,SIPEndpoint:1)
+
+InternetGatewayDevice.DeviceInfo.HardwareVersion
+` + serial + `
+
+InternetGatewayDevice.DeviceInfo.ProvisioningCode
+ABCD
+
+InternetGatewayDevice.DeviceInfo.SoftwareVersion
+4.0.8.17785
+
+InternetGatewayDevice.DeviceInfo.SpecVersion
+1.0
+
+InternetGatewayDevice.WANDevice.1.WANConnectionDevice.1.WANIPConnection.1.ExternalIPAddress
+12.0.0.10
+
+
+
+`
+}
+
+/*
+func BuildGetParameterValuesResponse(serial string, leaves GetParameterValues_) string {
+ ret := `
+ 3
+ `
+
+ db, _ := sqlite3.Open("/tmp/cpe.db")
+
+ n_leaves := 0
+ var temp string
+ for _, leaf := range leaves.ParameterNames {
+ sql := "select key, value, tipo from params where key like '" + leaf + "%'"
+ for s, err := db.Query(sql); err == nil; err = s.Next() {
+ n_leaves++
+ var key string
+ var value string
+ var tipo string
+ s.Scan(&key, &value, &tipo)
+ temp += `
+ ` + key + `
+ ` + value + `
+ `
+ }
+ }
+
+ ret += ``
+ ret += temp
+ ret += ``
+
+ return ret
+}
+
+func BuildGetParameterNamesResponse(serial string, leaves GetParameterNames_) string {
+ ret := `
+ 69
+ `
+ db, _ := sqlite3.Open("/tmp/cpe.db")
+
+ obj := make(map[string]bool)
+ var temp string
+ for _, leaf := range leaves.ParameterPath {
+ fmt.Println(leaf)
+ sql := "select key, value, tipo from params where key like '" + leaf + "%'"
+ for s, err := db.Query(sql); err == nil; err = s.Next() {
+ var key string
+ var value string
+ var tipo string
+ s.Scan(&key, &value, &tipo)
+ var sp = strings.Split(strings.Split(key, leaf)[1], ".")
+ nextlevel, _ := strconv.Atoi(leaves.NextLevel)
+ if nextlevel == 0 {
+ root := leaf
+ obj[root] = true
+ for idx := range sp {
+ if idx == len(sp)-1 {
+ root = root + sp[idx]
+ } else {
+ root = root + sp[idx] + "."
+ }
+ obj[root] = true
+ }
+ } else {
+ if !obj[sp[0]] {
+ if len(sp) > 1 {
+ obj[leaf+sp[0]+"."] = true
+ } else {
+ obj[leaf+sp[0]] = true
+ }
+
+ }
+ }
+
+ }
+ }
+
+ for o := range obj {
+ temp += `
+ ` + o + `
+ true
+ `
+ }
+
+ fmt.Println(len(obj))
+ ret += ``
+ ret += temp
+ ret += ``
+
+ return ret
+}
+*/
diff --git a/backend/services/controller/internal/entity/msg.go b/backend/services/controller/internal/entity/msg.go
index a3e7eda..768e4bb 100644
--- a/backend/services/controller/internal/entity/msg.go
+++ b/backend/services/controller/internal/entity/msg.go
@@ -3,7 +3,7 @@ package entity
import "time"
type DataType interface {
- []map[string]interface{} | *string | Device | int64 | []Device | []VendorsCount | []ProductClassCount | []StatusCount | time.Duration
+ []map[string]interface{} | *string | Device | int64 | []Device | []VendorsCount | []ProductClassCount | []StatusCount | time.Duration | []byte
}
type MsgAnswer[T DataType] struct {
diff --git a/backend/services/controller/internal/nats/nats.go b/backend/services/controller/internal/nats/nats.go
index f2b4998..3786604 100644
--- a/backend/services/controller/internal/nats/nats.go
+++ b/backend/services/controller/internal/nats/nats.go
@@ -19,8 +19,10 @@ const (
NATS_WS_ADAPTER_SUBJECT_PREFIX = "ws-adapter.usp.v1."
NATS_STOMP_ADAPTER_SUBJECT_PREFIX = "stomp-adapter.usp.v1."
DEVICE_SUBJECT_PREFIX = "device.usp.v1."
+ DEVICE_CWMP_SUBJECT_PREFIX = "device.cwmp.v1."
BUCKET_NAME = "devices-auth"
BUCKET_DESCRIPTION = "Devices authentication"
+ NATS_CWMP_ADAPTER_SUBJECT_PREFIX = "cwmp-adapter.v1."
)
func StartNatsClient(c config.Nats) (jetstream.JetStream, *nats.Conn, jetstream.KeyValue) {