From 406d62550f119b1af4321f4eb6b1650c551cfc27 Mon Sep 17 00:00:00 2001 From: leandrofars Date: Sat, 20 Apr 2024 11:20:38 -0300 Subject: [PATCH] feat: initial api for cwmp --- backend/services/acs/cmd/main.go | 8 + backend/services/acs/go.mod | 1 + backend/services/acs/go.sum | 2 + backend/services/acs/internal/auth/auth.go | 57 +- .../services/acs/internal/bridge/bridge.go | 112 ++++ .../acs/internal/server/handler/cwmp.go | 164 ++++++ .../acs/internal/server/handler/handler.go | 171 +----- .../acs/internal/server/handler/status.go | 8 +- .../services/acs/internal/server/server.go | 1 - .../services/controller/internal/api/api.go | 1 + .../services/controller/internal/api/cwmp.go | 48 ++ .../controller/internal/bridge/bridge.go | 42 ++ .../services/controller/internal/cwmp/cwmp.go | 542 ++++++++++++++++++ .../controller/internal/entity/msg.go | 2 +- .../services/controller/internal/nats/nats.go | 2 + 15 files changed, 971 insertions(+), 190 deletions(-) create mode 100644 backend/services/acs/internal/bridge/bridge.go create mode 100644 backend/services/acs/internal/server/handler/cwmp.go create mode 100644 backend/services/controller/internal/api/cwmp.go create mode 100644 backend/services/controller/internal/cwmp/cwmp.go diff --git a/backend/services/acs/cmd/main.go b/backend/services/acs/cmd/main.go index 3ab03b9..53d9379 100644 --- a/backend/services/acs/cmd/main.go +++ b/backend/services/acs/cmd/main.go @@ -1,6 +1,7 @@ package main import ( + "oktopUSP/backend/services/acs/internal/bridge" "oktopUSP/backend/services/acs/internal/config" "oktopUSP/backend/services/acs/internal/nats" "oktopUSP/backend/services/acs/internal/server" @@ -15,5 +16,12 @@ func main() { h := handler.NewHandler(natsActions.Publish, natsActions.Subscribe) + b := bridge.NewBridge( + natsActions.Publish, + natsActions.Subscribe, + h, + ) + b.StartBridge() + server.Run(c.Acs, natsActions, h) } diff --git a/backend/services/acs/go.mod b/backend/services/acs/go.mod index 5ba5c09..f57be7f 100644 --- a/backend/services/acs/go.mod +++ b/backend/services/acs/go.mod @@ -3,6 +3,7 @@ module oktopUSP/backend/services/acs go 1.22.2 require ( + github.com/google/uuid v1.6.0 github.com/joho/godotenv v1.5.1 github.com/nats-io/nats.go v1.34.1 github.com/oleiade/lane v1.0.1 diff --git a/backend/services/acs/go.sum b/backend/services/acs/go.sum index f09390a..c223b41 100644 --- a/backend/services/acs/go.sum +++ b/backend/services/acs/go.sum @@ -1,3 +1,5 @@ +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4= diff --git a/backend/services/acs/internal/auth/auth.go b/backend/services/acs/internal/auth/auth.go index 476936f..3b20e03 100644 --- a/backend/services/acs/internal/auth/auth.go +++ b/backend/services/acs/internal/auth/auth.go @@ -5,7 +5,6 @@ import ( "crypto/rand" "encoding/base64" "fmt" - "io" "net/http" "net/url" "strings" @@ -36,37 +35,37 @@ func Auth(username string, password string, uri string) (bool, error) { if err != nil { return false, err } - if resp.StatusCode == 401 { - var authorization map[string]string = DigestAuthParams(resp) - realmHeader := authorization["realm"] - qopHeader := authorization["qop"] - nonceHeader := authorization["nonce"] - opaqueHeader := authorization["opaque"] - realm := realmHeader - // A1 - h := md5.New() - A1 := fmt.Sprintf("%s:%s:%s", username, realm, password) - io.WriteString(h, A1) - HA1 := fmt.Sprintf("%x", h.Sum(nil)) + // if resp.StatusCode == 401 { + // var authorization map[string]string = DigestAuthParams(resp) + // realmHeader := authorization["realm"] + // qopHeader := authorization["qop"] + // nonceHeader := authorization["nonce"] + // opaqueHeader := authorization["opaque"] + // realm := realmHeader + // // A1 + // h := md5.New() + // A1 := fmt.Sprintf("%s:%s:%s", username, realm, password) + // io.WriteString(h, A1) + // HA1 := fmt.Sprintf("%x", h.Sum(nil)) - // A2 - h = md5.New() - A2 := fmt.Sprintf("GET:%s", "/auth") - io.WriteString(h, A2) - HA2 := fmt.Sprintf("%x", h.Sum(nil)) + // // A2 + // h = md5.New() + // A2 := fmt.Sprintf("GET:%s", "/auth") + // io.WriteString(h, A2) + // HA2 := fmt.Sprintf("%x", h.Sum(nil)) - // response - cnonce := RandomKey() - response := H(strings.Join([]string{HA1, nonceHeader, "00000001", cnonce, qopHeader, HA2}, ":")) + // // response + // cnonce := RandomKey() + // response := H(strings.Join([]string{HA1, nonceHeader, "00000001", cnonce, qopHeader, HA2}, ":")) - // now make header - AuthHeader := fmt.Sprintf(`Digest username="%s", realm="%s", nonce="%s", uri="%s", cnonce="%s", nc=00000001, qop=%s, response="%s", opaque="%s", algorithm=MD5`, - username, realmHeader, nonceHeader, "/auth", cnonce, qopHeader, response, opaqueHeader) - req.Header.Set("Authorization", AuthHeader) - resp, err = client.Do(req) - } else { - return false, fmt.Errorf("response status code should have been 401, it was %v", resp.StatusCode) - } + // // now make header + // AuthHeader := fmt.Sprintf(`Digest username="%s", realm="%s", nonce="%s", uri="%s", cnonce="%s", nc=00000001, qop=%s, response="%s", opaque="%s", algorithm=MD5`, + // username, realmHeader, nonceHeader, "/auth", cnonce, qopHeader, response, opaqueHeader) + // req.Header.Set("Authorization", AuthHeader) + // resp, err = client.Do(req) + // } else { + // return false, fmt.Errorf("response status code should have been 401, it was %v", resp.StatusCode) + // } return resp.StatusCode == 200, err } diff --git a/backend/services/acs/internal/bridge/bridge.go b/backend/services/acs/internal/bridge/bridge.go new file mode 100644 index 0000000..9eda542 --- /dev/null +++ b/backend/services/acs/internal/bridge/bridge.go @@ -0,0 +1,112 @@ +package bridge + +import ( + "encoding/json" + "log" + "net/http" + "oktopUSP/backend/services/acs/internal/server/handler" + "strings" + "time" + + "github.com/google/uuid" + "github.com/nats-io/nats.go" +) + +type Bridge struct { + pub func(string, []byte) error + sub func(string, func(*nats.Msg)) error + cpes map[string]handler.CPE + h *handler.Handler +} + +type msgAnswer struct { + Code int + Msg any +} + +const DEVICE_ANSWER_TIMEOUT = 5 * time.Second + +func NewBridge( + pub func(string, []byte) error, + sub func(string, func(*nats.Msg)) error, + h *handler.Handler, +) *Bridge { + return &Bridge{ + pub: pub, + sub: sub, + cpes: h.Cpes, + h: h, + } +} + +func (b *Bridge) StartBridge() { + b.sub(handler.NATS_CWMP_ADAPTER_SUBJECT_PREFIX+"*.api", func(msg *nats.Msg) { + log.Printf("Received message: %s", string(msg.Data)) + log.Printf("Subject: %s", msg.Subject) + log.Printf("Reply: %s", msg.Reply) + + device := getDeviceFromSubject(msg.Subject) + cpe, ok := b.cpes[device] + if !ok { + log.Printf("Device %s not found", device) + respondMsg(msg.Respond, http.StatusNotFound, "Device not found") + return + } + if cpe.Queue.Size() > 0 { + log.Printf("Device %s is busy", device) + respondMsg(msg.Respond, http.StatusConflict, "Device is busy") + return + } + + deviceAnswer := make(chan []byte) + defer close(deviceAnswer) + + cpe.Queue.Enqueue(handler.Request{ //TODO: pass user and password too + Id: uuid.NewString(), + CwmpMsg: msg.Data, + Callback: deviceAnswer, + }) + + err := b.h.ConnectionRequest(cpe) + if err != nil { + log.Println("Failed to do connection request", err) + cpe.Queue.Dequeue() + respondMsg(msg.Respond, http.StatusBadRequest, err.Error()) + return + } + + //req := cpe.Queue.Dequeue().(handler.Request) + //cpe.Waiting = &req + + select { + case response := <-deviceAnswer: + log.Println("Received response from device: ", string(response)) + respondMsg(msg.Respond, http.StatusOK, response) + case <-time.After(DEVICE_ANSWER_TIMEOUT): + log.Println("Device response timed out") + respondMsg(msg.Respond, http.StatusRequestTimeout, "Request timeout") + } + }) +} + +func respondMsg(respond func(data []byte) error, code int, msgData any) { + + msg, err := json.Marshal(msgAnswer{ + Code: code, + Msg: msgData, + }) + if err != nil { + log.Printf("Failed to marshal message: %q", err) + respond([]byte(err.Error())) + return + } + + respond(msg) + //log.Println("Responded with message: ", string(msg)) +} + +func getDeviceFromSubject(subject string) string { + paths := strings.Split(subject, ".") + device := paths[len(paths)-2] + return device +} diff --git a/backend/services/acs/internal/server/handler/cwmp.go b/backend/services/acs/internal/server/handler/cwmp.go new file mode 100644 index 0000000..c682353 --- /dev/null +++ b/backend/services/acs/internal/server/handler/cwmp.go @@ -0,0 +1,164 @@ +package handler + +import ( + "encoding/json" + "encoding/xml" + "fmt" + "io/ioutil" + "log" + "net/http" + "oktopUSP/backend/services/acs/internal/auth" + "oktopUSP/backend/services/acs/internal/cwmp" + "time" + + "github.com/oleiade/lane" +) + +func (h *Handler) CwmpHandler(w http.ResponseWriter, r *http.Request) { + + log.Printf("--> Connection from %s", r.RemoteAddr) + + defer r.Body.Close() + defer log.Printf("<-- Connection from %s closed", r.RemoteAddr) + + tmp, _ := ioutil.ReadAll(r.Body) + body := string(tmp) + + var envelope cwmp.SoapEnvelope + xml.Unmarshal(tmp, &envelope) + + messageType := envelope.Body.CWMPMessage.XMLName.Local + log.Println("messageType: ", messageType) + + var cpe CPE + var exists bool + + w.Header().Set("Server", "Oktopus "+Version) + + if messageType != "Inform" { + if cookie, err := r.Cookie("oktopus"); err == nil { + if cpe, exists = h.Cpes[cookie.Value]; !exists { + log.Printf("CPE with serial number %s not found", cookie.Value) + } + log.Printf("CPE with serial number %s found", cookie.Value) + } else { + fmt.Println("cookie 'oktopus' missing") + w.WriteHeader(401) + return + } + } + + if messageType == "Inform" { + var Inform cwmp.CWMPInform + xml.Unmarshal(tmp, &Inform) + + var addr string + if r.Header.Get("X-Real-Ip") != "" { + addr = r.Header.Get("X-Real-Ip") + } else { + addr = r.RemoteAddr + } + + sn := Inform.DeviceId.SerialNumber + + if _, exists := h.Cpes[sn]; !exists { + log.Println("New device: " + sn) + h.Cpes[sn] = CPE{ + SerialNumber: sn, + LastConnection: time.Now().UTC(), + SoftwareVersion: Inform.GetSoftwareVersion(), + HardwareVersion: Inform.GetHardwareVersion(), + ExternalIPAddress: addr, + ConnectionRequestURL: Inform.GetConnectionRequest(), + OUI: Inform.DeviceId.OUI, + Queue: lane.NewQueue(), + DataModel: Inform.GetDataModelType(), + } + go h.handleCpeStatus(sn) + h.pub(NATS_CWMP_SUBJECT_PREFIX+sn+".info", tmp) + } + obj := h.Cpes[sn] + cpe := &obj + cpe.LastConnection = time.Now().UTC() + + log.Printf("Received an Inform from device %s withEventCodes %s", addr, Inform.GetEvents()) + + expiration := time.Now().AddDate(0, 0, 1) + + cookie := http.Cookie{Name: "oktopus", Value: sn, Expires: expiration} + http.SetCookie(w, &cookie) + data, _ := xml.Marshal(cwmp.InformResponse(envelope.Header.Id)) + w.Write(data) + } else if messageType == "TransferComplete" { + + } else if messageType == "GetRPC" { + + } else { + + if len(body) == 0 { + log.Println("Got Empty Post") + } + + if cpe.Waiting != nil { + log.Println("CPE was waiting for a response, now received something") + var e cwmp.SoapEnvelope + xml.Unmarshal([]byte(body), &e) + log.Println("Kind of envelope: ", e.KindOf()) + + if e.KindOf() == "GetParameterNamesResponse" { + // var envelope cwmp.GetParameterNamesResponse + // xml.Unmarshal([]byte(body), &envelope) + + // msg := new(NatsSendMessage) + // msg.MsgType = "GetParameterNamesResponse" + // msg.Data, _ = json.Marshal(envelope) + log.Println("Receive GetParameterNamesResponse from CPE:", cpe.SerialNumber) + cpe.Waiting.Callback <- tmp + + } else if e.KindOf() == "GetParameterValuesResponse" { + var envelope cwmp.GetParameterValuesResponse + xml.Unmarshal([]byte(body), &envelope) + + msg := new(NatsSendMessage) + msg.MsgType = "GetParameterValuesResponse" + msg.Data, _ = json.Marshal(envelope) + + cpe.Waiting.Callback <- tmp + + } else { + log.Println("Unknown message type") + cpe.Waiting.Callback <- tmp + } + cpe.Waiting = nil + } else { + log.Println("CPE was not waiting for a response") + } + + log.Printf("CPE %s Queue size: %d", cpe.SerialNumber, cpe.Queue.Size()) + + if cpe.Queue.Size() > 0 { + req := cpe.Queue.Dequeue().(Request) + cpe.Waiting = &req + log.Println("Sending request to CPE:", req.Id) + w.Header().Set("Connection", "keep-alive") + w.Write(req.CwmpMsg) + } else { + w.Header().Set("Connection", "close") + w.WriteHeader(204) + } + } + h.Cpes[cpe.SerialNumber] = cpe +} + +func (h *Handler) ConnectionRequest(cpe CPE) error { + log.Println("--> ConnectionRequest, CPE: ", cpe.SerialNumber) + // log.Println("ConnectionRequestURL: ", cpe.ConnectionRequestURL) + // log.Println("ConnectionRequestUsername: ", cpe.Username) + // log.Println("ConnectionRequestPassword: ", cpe.Password) + + ok, err := auth.Auth("", "", cpe.ConnectionRequestURL) + if !ok { + log.Println("Error while authenticating to CPE: ", err) + } + return err +} diff --git a/backend/services/acs/internal/server/handler/handler.go b/backend/services/acs/internal/server/handler/handler.go index f5e6225..5d1fd00 100644 --- a/backend/services/acs/internal/server/handler/handler.go +++ b/backend/services/acs/internal/server/handler/handler.go @@ -2,26 +2,20 @@ package handler import ( "encoding/json" - "encoding/xml" - "fmt" - "io/ioutil" - "log" - "net/http" - "oktopUSP/backend/services/acs/internal/cwmp" "time" "github.com/nats-io/nats.go" "github.com/oleiade/lane" - "golang.org/x/net/websocket" ) const Version = "1.0.0" type Request struct { - Id string - Websocket *websocket.Conn - CwmpMessage string - Callback func(msg *WsSendMessage) error + Id string + User string + Password string + CwmpMsg []byte + Callback chan []byte } type CPE struct { @@ -36,7 +30,8 @@ type CPE struct { HardwareVersion string LastConnection time.Time DataModel string - KeepConnectionOpen bool + Username string + Password string } type Message struct { @@ -48,7 +43,7 @@ type WsMessage struct { Cmd string } -type WsSendMessage struct { +type NatsSendMessage struct { MsgType string Data json.RawMessage } @@ -60,153 +55,19 @@ type MsgCPEs struct { type Handler struct { pub func(string, []byte) error sub func(string, func(*nats.Msg)) error - cpes map[string]CPE + Cpes map[string]CPE } +const ( + NATS_CWMP_SUBJECT_PREFIX = "cwmp.v1." + NATS_CWMP_ADAPTER_SUBJECT_PREFIX = "cwmp-adapter.v1." + NATS_ADAPTER_SUBJECT_PREFIX = "adapter.v1." +) + func NewHandler(pub func(string, []byte) error, sub func(string, func(*nats.Msg)) error) *Handler { return &Handler{ pub: pub, sub: sub, - cpes: make(map[string]CPE), - } -} - -func (h *Handler) CwmpHandler(w http.ResponseWriter, r *http.Request) { - - log.Printf("--> Connection from %s", r.RemoteAddr) - - defer r.Body.Close() - defer log.Printf("<-- Connection from %s closed", r.RemoteAddr) - - tmp, _ := ioutil.ReadAll(r.Body) - body := string(tmp) - len := len(body) - - var envelope cwmp.SoapEnvelope - xml.Unmarshal(tmp, &envelope) - - messageType := envelope.Body.CWMPMessage.XMLName.Local - - var cpe *CPE - - w.Header().Set("Server", "Oktopus "+Version) - - if messageType != "Inform" { - if cookie, err := r.Cookie("oktopus"); err == nil { - if _, exists := h.cpes[cookie.Value]; !exists { - log.Printf("CPE with serial number %s not found", cookie.Value) - } - } else { - fmt.Println("cookie 'oktopus' missing") - w.WriteHeader(401) - return - } - } - - if messageType == "Inform" { - var Inform cwmp.CWMPInform - xml.Unmarshal(tmp, &Inform) - - var addr string - if r.Header.Get("X-Real-Ip") != "" { - addr = r.Header.Get("X-Real-Ip") - } else { - addr = r.RemoteAddr - } - - sn := Inform.DeviceId.SerialNumber - - if _, exists := h.cpes[sn]; !exists { - fmt.Println("New device: " + sn) - h.cpes[sn] = CPE{ - SerialNumber: sn, - LastConnection: time.Now().UTC(), - SoftwareVersion: Inform.GetSoftwareVersion(), - HardwareVersion: Inform.GetHardwareVersion(), - ExternalIPAddress: addr, - ConnectionRequestURL: Inform.GetConnectionRequest(), - OUI: Inform.DeviceId.OUI, - Queue: lane.NewQueue(), - DataModel: Inform.GetDataModelType(), - KeepConnectionOpen: false, - } - go h.handleCpeStatus(sn) - h.pub("cwmp.v1."+sn+".info", tmp) - } - obj := h.cpes[sn] - cpe := &obj - cpe.LastConnection = time.Now().UTC() - - log.Printf("Received an Inform from %s (%d bytes) with SerialNumber %s and EventCodes %s", addr, len, sn, Inform.GetEvents()) - - expiration := time.Now().AddDate(0, 0, 1) - - cookie := http.Cookie{Name: "oktopus", Value: sn, Expires: expiration} - http.SetCookie(w, &cookie) - } else if messageType == "TransferComplete" { - - } else if messageType == "GetRPC" { - - } else { - if len == 0 { - log.Printf("Got Empty Post") - } - - if cpe.Waiting != nil { - var e cwmp.SoapEnvelope - xml.Unmarshal([]byte(body), &e) - - if e.KindOf() == "GetParameterNamesResponse" { - var envelope cwmp.GetParameterNamesResponse - xml.Unmarshal([]byte(body), &envelope) - - msg := new(WsSendMessage) - msg.MsgType = "GetParameterNamesResponse" - msg.Data, _ = json.Marshal(envelope) - - cpe.Waiting.Callback(msg) - // if err := websocket.JSON.Send(cpe.Waiting.Websocket, msg); err != nil { - // fmt.Println("error while sending back answer:", err) - // } - - } else if e.KindOf() == "GetParameterValuesResponse" { - var envelope cwmp.GetParameterValuesResponse - xml.Unmarshal([]byte(body), &envelope) - - msg := new(WsSendMessage) - msg.MsgType = "GetParameterValuesResponse" - msg.Data, _ = json.Marshal(envelope) - - cpe.Waiting.Callback(msg) - // if err := websocket.JSON.Send(cpe.Waiting.Websocket, msg); err != nil { - // fmt.Println("error while sending back answer:", err) - // } - - } else { - msg := new(WsMessage) - msg.Cmd = body - - if err := websocket.JSON.Send(cpe.Waiting.Websocket, msg); err != nil { - fmt.Println("error while sending back answer:", err) - } - - } - - cpe.Waiting = nil - } - - // Got Empty Post or a Response. Now check for any event to send, otherwise 204 - if cpe.Queue.Size() > 0 { - req := cpe.Queue.Dequeue().(Request) - // fmt.Println("sending "+req.CwmpMessage) - fmt.Fprintf(w, req.CwmpMessage) - cpe.Waiting = &req - } else { - if cpe.KeepConnectionOpen { - fmt.Println("I'm keeping connection open") - } else { - w.WriteHeader(204) - } - } + Cpes: make(map[string]CPE), } } diff --git a/backend/services/acs/internal/server/handler/status.go b/backend/services/acs/internal/server/handler/status.go index 1402629..ec4c7a1 100644 --- a/backend/services/acs/internal/server/handler/status.go +++ b/backend/services/acs/internal/server/handler/status.go @@ -7,14 +7,14 @@ import ( // TODO: make these consts dynamic via config const ( - CHECK_STATUS_INTERVAL = 5 * time.Second - KEEP_ALIVE_INTERVAL = 10 * time.Second + CHECK_STATUS_INTERVAL = 10 * time.Second + KEEP_ALIVE_INTERVAL = 600 * time.Second ) func (h *Handler) handleCpeStatus(cpe string) { for { - if time.Since(h.cpes[cpe].LastConnection) > KEEP_ALIVE_INTERVAL { - delete(h.cpes, cpe) + if time.Since(h.Cpes[cpe].LastConnection) > KEEP_ALIVE_INTERVAL { + delete(h.Cpes, cpe) break } time.Sleep(CHECK_STATUS_INTERVAL) diff --git a/backend/services/acs/internal/server/server.go b/backend/services/acs/internal/server/server.go index 4a2bc6b..8022e2a 100644 --- a/backend/services/acs/internal/server/server.go +++ b/backend/services/acs/internal/server/server.go @@ -12,7 +12,6 @@ import ( func Run(c config.Acs, natsActions nats.NatsActions, h *handler.Handler) { http.HandleFunc(c.Route, h.CwmpHandler) - log.Printf("ACS running at %s%s", c.Port, c.Route) err := http.ListenAndServe(c.Port, nil) diff --git a/backend/services/controller/internal/api/api.go b/backend/services/controller/internal/api/api.go index 33c25fe..bd20320 100644 --- a/backend/services/controller/internal/api/api.go +++ b/backend/services/controller/internal/api/api.go @@ -57,6 +57,7 @@ func (a *Api) StartApi() { authentication.HandleFunc("/admin/exists", a.adminUserExists).Methods("GET") iot := r.PathPrefix("/api/device").Subrouter() iot.HandleFunc("/auth", a.deviceAuth).Methods("GET", "PUT", "DELETE") + iot.HandleFunc("/cwmp/{sn}/getParameterNames", a.cwmpGetParameterNamesMsg).Methods("GET", "PUT", "DELETE") iot.HandleFunc("", a.retrieveDevices).Methods("GET") iot.HandleFunc("/{id}", a.retrieveDevices).Methods("GET") iot.HandleFunc("/{sn}/{mtp}/get", a.deviceGetMsg).Methods("PUT") diff --git a/backend/services/controller/internal/api/cwmp.go b/backend/services/controller/internal/api/cwmp.go new file mode 100644 index 0000000..6b1ba31 --- /dev/null +++ b/backend/services/controller/internal/api/cwmp.go @@ -0,0 +1,48 @@ +package api + +import ( + "encoding/json" + "encoding/xml" + "io" + "net/http" + + "github.com/leandrofars/oktopus/internal/bridge" + "github.com/leandrofars/oktopus/internal/cwmp" + "github.com/leandrofars/oktopus/internal/nats" + "github.com/leandrofars/oktopus/internal/utils" +) + +func (a *Api) cwmpGetParameterNamesMsg(w http.ResponseWriter, r *http.Request) { + sn := getSerialNumberFromRequest(r) + + payload, err := io.ReadAll(r.Body) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + w.Write(utils.Marshall(err.Error())) + return + } + + data, err := bridge.NatsCwmpInteraction( + nats.NATS_CWMP_ADAPTER_SUBJECT_PREFIX+sn+".api", + payload, + w, + a.nc, + ) + if err != nil { + return + } + + var response cwmp.GetParameterNamesResponse + err = xml.Unmarshal(data, &response) + if err != nil { + err = json.Unmarshal(data, &response) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + w.Write(utils.Marshall(err)) + return + } + return + } + + w.Write(data) +} diff --git a/backend/services/controller/internal/bridge/bridge.go b/backend/services/controller/internal/bridge/bridge.go index af3bc1a..319ef13 100644 --- a/backend/services/controller/internal/bridge/bridge.go +++ b/backend/services/controller/internal/bridge/bridge.go @@ -204,3 +204,45 @@ func NatsReqWithoutHttpSet[T entity.DataType]( return answer, nil } + +func NatsCwmpInteraction( + subj string, + body []byte, + w http.ResponseWriter, + nc *nats.Conn, +) ([]byte, error) { + + log.Println("Sending cwmp message") + log.Println("Subject: ", subj) + + var answer entity.MsgAnswer[[]byte] + + msg, err := nc.Request(subj, body, local.NATS_REQUEST_TIMEOUT) + if err != nil { + log.Println(err) + w.WriteHeader(http.StatusInternalServerError) + w.Write(utils.Marshall("Error to communicate with nats: " + err.Error())) + return nil, err + } + + err = json.Unmarshal(msg.Data, &answer) + if err != nil { + + var errMsg *entity.MsgAnswer[*string] + err = json.Unmarshal(msg.Data, &errMsg) + + if err != nil { + log.Println("Bad answer message formatting: ", err.Error()) + w.WriteHeader(http.StatusInternalServerError) + w.Write(msg.Data) + return nil, err + } + + log.Printf("Error message received, msg: %s, code: %d", *errMsg.Msg, errMsg.Code) + w.WriteHeader(errMsg.Code) + w.Write(utils.Marshall(*errMsg.Msg)) + return nil, errNatsMsgReceivedWithErrorData + } + + return answer.Msg, nil +} diff --git a/backend/services/controller/internal/cwmp/cwmp.go b/backend/services/controller/internal/cwmp/cwmp.go new file mode 100644 index 0000000..a0aaa06 --- /dev/null +++ b/backend/services/controller/internal/cwmp/cwmp.go @@ -0,0 +1,542 @@ +package cwmp + +import ( + "crypto/rand" + "encoding/xml" + "fmt" + "strconv" + "strings" + "time" +) + +type SoapEnvelope struct { + XMLName xml.Name + Header SoapHeader + Body SoapBody +} + +type SoapHeader struct { + Id string `xml:"ID"` +} +type SoapBody struct { + CWMPMessage CWMPMessage `xml:",any"` +} + +type CWMPMessage struct { + XMLName xml.Name +} + +type EventStruct struct { + EventCode string + CommandKey string +} + +type ParameterValueStruct struct { + Name string + Value string +} + +type ParameterInfoStruct struct { + Name string + Writable string +} + +type SetParameterValues_ struct { + ParameterList []ParameterValueStruct `xml:"Body>SetParameterValues>ParameterList>ParameterValueStruct"` + ParameterKey string `xml:"Body>SetParameterValues>ParameterKey>string"` +} + +type GetParameterValues_ struct { + ParameterNames []string `xml:"Body>GetParameterValues>ParameterNames>string"` +} + +type GetParameterNames_ struct { + ParameterPath []string `xml:"Body>GetParameterNames>ParameterPath"` + NextLevel string `xml:"Body>GetParameterNames>NextLevel"` +} + +type GetParameterValuesResponse struct { + ParameterList []ParameterValueStruct `xml:"Body>GetParameterValuesResponse>ParameterList>ParameterValueStruct"` +} + +type GetParameterNamesResponse struct { + ParameterList []ParameterInfoStruct `xml:"Body>GetParameterNamesResponse>ParameterList>ParameterInfoStruct"` +} + +type CWMPInform struct { + DeviceId DeviceID `xml:"Body>Inform>DeviceId"` + Events []EventStruct `xml:"Body>Inform>Event>EventStruct"` + ParameterList []ParameterValueStruct `xml:"Body>Inform>ParameterList>ParameterValueStruct"` +} + +func (s *SoapEnvelope) KindOf() string { + return s.Body.CWMPMessage.XMLName.Local +} + +func (i *CWMPInform) GetEvents() string { + res := "" + for idx := range i.Events { + res += i.Events[idx].EventCode + } + + return res +} + +func (i *CWMPInform) GetConnectionRequest() string { + for idx := range i.ParameterList { + // valid condition for both tr98 and tr181 + if strings.HasSuffix(i.ParameterList[idx].Name, "Device.ManagementServer.ConnectionRequestURL") { + return i.ParameterList[idx].Value + } + } + + return "" +} + +func (i *CWMPInform) GetSoftwareVersion() string { + for idx := range i.ParameterList { + if strings.HasSuffix(i.ParameterList[idx].Name, "Device.DeviceInfo.SoftwareVersion") { + return i.ParameterList[idx].Value + } + } + + return "" +} + +func (i *CWMPInform) GetHardwareVersion() string { + for idx := range i.ParameterList { + if strings.HasSuffix(i.ParameterList[idx].Name, "Device.DeviceInfo.HardwareVersion") { + return i.ParameterList[idx].Value + } + } + + return "" +} + +func (i *CWMPInform) GetDataModelType() string { + if strings.HasPrefix(i.ParameterList[0].Name, "InternetGatewayDevice") { + return "TR098" + } else if strings.HasPrefix(i.ParameterList[0].Name, "Device") { + return "TR181" + } + + return "" +} + +type DeviceID struct { + Manufacturer string + OUI string + SerialNumber string + ProductClass string +} + +func InformResponse(mustUnderstand string) string { + mustUnderstandHeader := "" + if mustUnderstand != "" { + mustUnderstandHeader = `` + mustUnderstand + `` + } + + return ` + + ` + mustUnderstandHeader + ` + + + 1 + + +` +} + +func GetParameterValues(leaf string) string { + return ` + + + + + + ` + leaf + ` + + + +` +} + +func GetParameterMultiValues(leaves []string) string { + msg := ` + + + + + ` + + for idx := range leaves { + msg += `` + leaves[idx] + `` + + } + msg += ` + + +` + return msg +} + +func SetParameterValues(leaf string, value string) string { + return ` + + + + + + + ` + leaf + ` + ` + value + ` + + + LC1309` + randToken() + ` + + +` +} + +func randToken() string { + b := make([]byte, 8) + rand.Read(b) + return fmt.Sprintf("%x", b) +} + +func SetParameterMultiValues(data map[string]string) string { + msg := ` + + + + + ` + + for key, value := range data { + msg += ` + ` + key + ` + ` + value + ` + ` + } + + msg += ` + LC1309` + randToken() + ` + + +` + + return msg +} + +func GetParameterNames(leaf string, nextlevel int) string { + return ` + + + + + ` + leaf + ` + ` + strconv.Itoa(nextlevel) + ` + + +` +} + +func FactoryReset() string { + return ` + + + + + +` +} + +func Download(filetype, url, username, password, filesize string) string { + // 3 Vendor Configuration File + // 1 Firmware Upgrade Image + + return ` + + + + + MSDWK + ` + filetype + ` + ` + url + ` + ` + username + ` + ` + password + ` + ` + filesize + ` + + 0 + + + + +` +} + +func CancelTransfer() string { + return ` + + + + + + + +` +} + +type TimeWindowStruct struct { + WindowStart string + WindowEnd string + WindowMode string + UserMessage string + MaxRetries string +} + +func (window *TimeWindowStruct) String() string { + return ` +` + window.WindowStart + ` +` + window.WindowEnd + ` +` + window.WindowMode + ` +` + window.UserMessage + ` +` + window.MaxRetries + ` +` +} + +func ScheduleDownload(filetype, url, username, password, filesize string, windowslist []fmt.Stringer) string { + ret := ` + + + + + MSDWK + ` + filetype + ` + ` + url + ` + ` + username + ` + ` + password + ` + ` + filesize + ` + + ` + + for _, op := range windowslist { + ret += op.String() + } + + ret += ` + + +` + + return ret +} + +type InstallOpStruct struct { + Url string + Uuid string + Username string + Password string + ExecutionEnvironment string +} + +func (op *InstallOpStruct) String() string { + return ` + ` + op.Url + ` + ` + op.Uuid + ` + ` + op.Username + ` + ` + op.Password + ` + ` + op.ExecutionEnvironment + ` +` +} + +type UpdateOpStruct struct { + Uuid string + Version string + Url string + Username string + Password string +} + +func (op *UpdateOpStruct) String() string { + return ` +` + op.Uuid + ` +` + op.Version + ` +` + op.Url + ` +` + op.Username + ` +` + op.Password + ` +` +} + +type UninstallOpStruct struct { + Uuid string + Version string + ExecutionEnvironment string +} + +func (op *UninstallOpStruct) String() string { + return ` +` + op.Uuid + ` +` + op.Version + ` +` + op.ExecutionEnvironment + ` +` +} + +func ChangeDuState(ops []fmt.Stringer) string { + ret := ` + + + + +` + + for _, op := range ops { + ret += op.String() + } + + ret += ` + + + +` + + return ret +} + +// CPE side + +func Inform(serial string) string { + return `5058 + ADB Broadband +0013C8 +VV5522 +` + serial + ` + + +6 CONNECTION REQUEST + + + +1 +` + time.Now().Format(time.RFC3339) + ` +0 + +InternetGatewayDevice.ManagementServer.ConnectionRequestURL +http://104.199.175.27:7547/ConnectionRequest-` + serial + ` + +InternetGatewayDevice.ManagementServer.ParameterKey + + +InternetGatewayDevice.DeviceSummary +InternetGatewayDevice:1.2[](Baseline:1,EthernetLAN:1,WiFiLAN:1,ADSLWAN:1,EthernetWAN:1,QoS:1,QoSDynamicFlow:1,Bridging:1,Time:1,IPPing:1,TraceRoute:1,DeviceAssociation:1,UDPConnReq:1),VoiceService:1.0[1](TAEndpoint:1,SIPEndpoint:1) + +InternetGatewayDevice.DeviceInfo.HardwareVersion +` + serial + ` + +InternetGatewayDevice.DeviceInfo.ProvisioningCode +ABCD + +InternetGatewayDevice.DeviceInfo.SoftwareVersion +4.0.8.17785 + +InternetGatewayDevice.DeviceInfo.SpecVersion +1.0 + +InternetGatewayDevice.WANDevice.1.WANConnectionDevice.1.WANIPConnection.1.ExternalIPAddress +12.0.0.10 + + + +` +} + +/* +func BuildGetParameterValuesResponse(serial string, leaves GetParameterValues_) string { + ret := ` + 3 + ` + + db, _ := sqlite3.Open("/tmp/cpe.db") + + n_leaves := 0 + var temp string + for _, leaf := range leaves.ParameterNames { + sql := "select key, value, tipo from params where key like '" + leaf + "%'" + for s, err := db.Query(sql); err == nil; err = s.Next() { + n_leaves++ + var key string + var value string + var tipo string + s.Scan(&key, &value, &tipo) + temp += ` + ` + key + ` + ` + value + ` + ` + } + } + + ret += `` + ret += temp + ret += `` + + return ret +} + +func BuildGetParameterNamesResponse(serial string, leaves GetParameterNames_) string { + ret := ` + 69 + ` + db, _ := sqlite3.Open("/tmp/cpe.db") + + obj := make(map[string]bool) + var temp string + for _, leaf := range leaves.ParameterPath { + fmt.Println(leaf) + sql := "select key, value, tipo from params where key like '" + leaf + "%'" + for s, err := db.Query(sql); err == nil; err = s.Next() { + var key string + var value string + var tipo string + s.Scan(&key, &value, &tipo) + var sp = strings.Split(strings.Split(key, leaf)[1], ".") + nextlevel, _ := strconv.Atoi(leaves.NextLevel) + if nextlevel == 0 { + root := leaf + obj[root] = true + for idx := range sp { + if idx == len(sp)-1 { + root = root + sp[idx] + } else { + root = root + sp[idx] + "." + } + obj[root] = true + } + } else { + if !obj[sp[0]] { + if len(sp) > 1 { + obj[leaf+sp[0]+"."] = true + } else { + obj[leaf+sp[0]] = true + } + + } + } + + } + } + + for o := range obj { + temp += ` + ` + o + ` + true + ` + } + + fmt.Println(len(obj)) + ret += `` + ret += temp + ret += `` + + return ret +} +*/ diff --git a/backend/services/controller/internal/entity/msg.go b/backend/services/controller/internal/entity/msg.go index a3e7eda..768e4bb 100644 --- a/backend/services/controller/internal/entity/msg.go +++ b/backend/services/controller/internal/entity/msg.go @@ -3,7 +3,7 @@ package entity import "time" type DataType interface { - []map[string]interface{} | *string | Device | int64 | []Device | []VendorsCount | []ProductClassCount | []StatusCount | time.Duration + []map[string]interface{} | *string | Device | int64 | []Device | []VendorsCount | []ProductClassCount | []StatusCount | time.Duration | []byte } type MsgAnswer[T DataType] struct { diff --git a/backend/services/controller/internal/nats/nats.go b/backend/services/controller/internal/nats/nats.go index f2b4998..3786604 100644 --- a/backend/services/controller/internal/nats/nats.go +++ b/backend/services/controller/internal/nats/nats.go @@ -19,8 +19,10 @@ const ( NATS_WS_ADAPTER_SUBJECT_PREFIX = "ws-adapter.usp.v1." NATS_STOMP_ADAPTER_SUBJECT_PREFIX = "stomp-adapter.usp.v1." DEVICE_SUBJECT_PREFIX = "device.usp.v1." + DEVICE_CWMP_SUBJECT_PREFIX = "device.cwmp.v1." BUCKET_NAME = "devices-auth" BUCKET_DESCRIPTION = "Devices authentication" + NATS_CWMP_ADAPTER_SUBJECT_PREFIX = "cwmp-adapter.v1." ) func StartNatsClient(c config.Nats) (jetstream.JetStream, *nats.Conn, jetstream.KeyValue) {