diff --git a/backend/services/acs/internal/cwmp/cwmp.go b/backend/services/acs/internal/cwmp/cwmp.go index 4d69201..a0aaa06 100644 --- a/backend/services/acs/internal/cwmp/cwmp.go +++ b/backend/services/acs/internal/cwmp/cwmp.go @@ -127,6 +127,7 @@ type DeviceID struct { Manufacturer string OUI string SerialNumber string + ProductClass string } func InformResponse(mustUnderstand string) string { diff --git a/backend/services/acs/internal/server/handler/handler.go b/backend/services/acs/internal/server/handler/handler.go index e86e28a..3f040b2 100644 --- a/backend/services/acs/internal/server/handler/handler.go +++ b/backend/services/acs/internal/server/handler/handler.go @@ -86,7 +86,7 @@ func (h *Handler) CwmpHandler(w http.ResponseWriter, r *http.Request) { body := string(tmp) len := len(body) - log.Printf("body:\n %v", body) + //log.Printf("body:\n %v", body) var envelope cwmp.SoapEnvelope xml.Unmarshal(tmp, &envelope) @@ -134,7 +134,9 @@ func (h *Handler) CwmpHandler(w http.ResponseWriter, r *http.Request) { OUI: Inform.DeviceId.OUI, Queue: lane.NewQueue(), DataModel: Inform.GetDataModelType(), - KeepConnectionOpen: false} + KeepConnectionOpen: false, + } + h.pub("cwmp.v1."+sn+".info", tmp) //TODO: send right info } obj := h.cpes[sn] cpe := &obj diff --git a/backend/services/mtp/adapter/cmd/adapter/main.go b/backend/services/mtp/adapter/cmd/adapter/main.go index cd32564..02c5192 100644 --- a/backend/services/mtp/adapter/cmd/adapter/main.go +++ b/backend/services/mtp/adapter/cmd/adapter/main.go @@ -9,7 +9,8 @@ import ( "github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/config" "github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/db" "github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/events" - "github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/events/handler" + "github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/events/cwmp_handler" + "github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/events/usp_handler" "github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/nats" "github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/reqs" ) @@ -24,9 +25,10 @@ func main() { db := db.NewDatabase(c.Mongo.Ctx, c.Mongo.Uri) - handler := handler.NewHandler(nc, js, db, c.Controller.ControllerId) + usp_handler := usp_handler.NewHandler(nc, js, db, c.Controller.ControllerId) + cwmp_handler := cwmp_handler.NewHandler(nc, js, db, c.Controller.ControllerId) - events.StartEventsListener(c.Nats.Ctx, js, handler) + events.StartEventsListener(c.Nats.Ctx, js, usp_handler, cwmp_handler) reqs.StartRequestsListener(c.Nats.Ctx, nc, db) diff --git a/backend/services/mtp/adapter/internal/cwmp/cwmp.go b/backend/services/mtp/adapter/internal/cwmp/cwmp.go new file mode 100644 index 0000000..6a8161e --- /dev/null +++ b/backend/services/mtp/adapter/internal/cwmp/cwmp.go @@ -0,0 +1,500 @@ +package cwmp + +import ( + "crypto/rand" + "encoding/xml" + "fmt" + "strconv" + "strings" +) + +type MsgType string + +const ( + INFORM = "Inform" +) + +type SoapEnvelope struct { + XMLName xml.Name + Header SoapHeader + Body SoapBody +} + +type SoapHeader struct { + Id string `xml:"ID"` +} +type SoapBody struct { + CWMPMessage CWMPMessage `xml:",any"` +} + +type CWMPMessage struct { + XMLName xml.Name +} + +type EventStruct struct { + EventCode string + CommandKey string +} + +type ParameterValueStruct struct { + Name string + Value string +} + +type ParameterInfoStruct struct { + Name string + Writable string +} + +type SetParameterValues_ struct { + ParameterList []ParameterValueStruct `xml:"Body>SetParameterValues>ParameterList>ParameterValueStruct"` + ParameterKey string `xml:"Body>SetParameterValues>ParameterKey>string"` +} + +type GetParameterValues_ struct { + ParameterNames []string `xml:"Body>GetParameterValues>ParameterNames>string"` +} + +type GetParameterNames_ struct { + ParameterPath []string `xml:"Body>GetParameterNames>ParameterPath"` + NextLevel string `xml:"Body>GetParameterNames>NextLevel"` +} + +type GetParameterValuesResponse struct { + ParameterList []ParameterValueStruct `xml:"Body>GetParameterValuesResponse>ParameterList>ParameterValueStruct"` +} + +type GetParameterNamesResponse struct { + ParameterList []ParameterInfoStruct `xml:"Body>GetParameterNamesResponse>ParameterList>ParameterInfoStruct"` +} + +type CWMPInform struct { + DeviceId DeviceID `xml:"Body>Inform>DeviceId"` + Events []EventStruct `xml:"Body>Inform>Event>EventStruct"` + ParameterList []ParameterValueStruct `xml:"Body>Inform>ParameterList>ParameterValueStruct"` +} + +func (s *SoapEnvelope) KindOf() string { + return s.Body.CWMPMessage.XMLName.Local +} + +func (i *CWMPInform) GetEvents() string { + res := "" + for idx := range i.Events { + res += i.Events[idx].EventCode + } + + return res +} + +func (i *CWMPInform) GetConnectionRequest() string { + for idx := range i.ParameterList { + // valid condition for both tr98 and tr181 + if strings.HasSuffix(i.ParameterList[idx].Name, "Device.ManagementServer.ConnectionRequestURL") { + return i.ParameterList[idx].Value + } + } + + return "" +} + +func (i *CWMPInform) GetSoftwareVersion() string { + for idx := range i.ParameterList { + if strings.HasSuffix(i.ParameterList[idx].Name, "Device.DeviceInfo.SoftwareVersion") { + return i.ParameterList[idx].Value + } + } + + return "" +} + +func (i *CWMPInform) GetHardwareVersion() string { + for idx := range i.ParameterList { + if strings.HasSuffix(i.ParameterList[idx].Name, "Device.DeviceInfo.HardwareVersion") { + return i.ParameterList[idx].Value + } + } + + return "" +} + +func (i *CWMPInform) GetDataModelType() string { + if strings.HasPrefix(i.ParameterList[0].Name, "InternetGatewayDevice") { + return "TR098" + } else if strings.HasPrefix(i.ParameterList[0].Name, "Device") { + return "TR181" + } + + return "" +} + +type DeviceID struct { + Manufacturer string + OUI string + SerialNumber string + ProductClass string +} + +func InformResponse(mustUnderstand string) string { + mustUnderstandHeader := "" + if mustUnderstand != "" { + mustUnderstandHeader = `` + mustUnderstand + `` + } + + return ` + + ` + mustUnderstandHeader + ` + + + 1 + + +` +} + +func GetParameterValues(leaf string) string { + return ` + + + + + + ` + leaf + ` + + + +` +} + +func GetParameterMultiValues(leaves []string) string { + msg := ` + + + + + ` + + for idx := range leaves { + msg += `` + leaves[idx] + `` + + } + msg += ` + + +` + return msg +} + +func SetParameterValues(leaf string, value string) string { + return ` + + + + + + + ` + leaf + ` + ` + value + ` + + + LC1309` + randToken() + ` + + +` +} + +func randToken() string { + b := make([]byte, 8) + rand.Read(b) + return fmt.Sprintf("%x", b) +} + +func SetParameterMultiValues(data map[string]string) string { + msg := ` + + + + + ` + + for key, value := range data { + msg += ` + ` + key + ` + ` + value + ` + ` + } + + msg += ` + LC1309` + randToken() + ` + + +` + + return msg +} + +func GetParameterNames(leaf string, nextlevel int) string { + return ` + + + + + ` + leaf + ` + ` + strconv.Itoa(nextlevel) + ` + + +` +} + +func FactoryReset() string { + return ` + + + + + +` +} + +func Download(filetype, url, username, password, filesize string) string { + // 3 Vendor Configuration File + // 1 Firmware Upgrade Image + + return ` + + + + + MSDWK + ` + filetype + ` + ` + url + ` + ` + username + ` + ` + password + ` + ` + filesize + ` + + 0 + + + + +` +} + +func CancelTransfer() string { + return ` + + + + + + + +` +} + +type TimeWindowStruct struct { + WindowStart string + WindowEnd string + WindowMode string + UserMessage string + MaxRetries string +} + +func (window *TimeWindowStruct) String() string { + return ` +` + window.WindowStart + ` +` + window.WindowEnd + ` +` + window.WindowMode + ` +` + window.UserMessage + ` +` + window.MaxRetries + ` +` +} + +func ScheduleDownload(filetype, url, username, password, filesize string, windowslist []fmt.Stringer) string { + ret := ` + + + + + MSDWK + ` + filetype + ` + ` + url + ` + ` + username + ` + ` + password + ` + ` + filesize + ` + + ` + + for _, op := range windowslist { + ret += op.String() + } + + ret += ` + + +` + + return ret +} + +type InstallOpStruct struct { + Url string + Uuid string + Username string + Password string + ExecutionEnvironment string +} + +func (op *InstallOpStruct) String() string { + return ` + ` + op.Url + ` + ` + op.Uuid + ` + ` + op.Username + ` + ` + op.Password + ` + ` + op.ExecutionEnvironment + ` +` +} + +type UpdateOpStruct struct { + Uuid string + Version string + Url string + Username string + Password string +} + +func (op *UpdateOpStruct) String() string { + return ` +` + op.Uuid + ` +` + op.Version + ` +` + op.Url + ` +` + op.Username + ` +` + op.Password + ` +` +} + +type UninstallOpStruct struct { + Uuid string + Version string + ExecutionEnvironment string +} + +func (op *UninstallOpStruct) String() string { + return ` +` + op.Uuid + ` +` + op.Version + ` +` + op.ExecutionEnvironment + ` +` +} + +func ChangeDuState(ops []fmt.Stringer) string { + ret := ` + + + + +` + + for _, op := range ops { + ret += op.String() + } + + ret += ` + + + +` + + return ret +} + +/* +func BuildGetParameterValuesResponse(serial string, leaves GetParameterValues_) string { + ret := ` + 3 + ` + + db, _ := sqlite3.Open("/tmp/cpe.db") + + n_leaves := 0 + var temp string + for _, leaf := range leaves.ParameterNames { + sql := "select key, value, tipo from params where key like '" + leaf + "%'" + for s, err := db.Query(sql); err == nil; err = s.Next() { + n_leaves++ + var key string + var value string + var tipo string + s.Scan(&key, &value, &tipo) + temp += ` + ` + key + ` + ` + value + ` + ` + } + } + + ret += `` + ret += temp + ret += `` + + return ret +} + +func BuildGetParameterNamesResponse(serial string, leaves GetParameterNames_) string { + ret := ` + 69 + ` + db, _ := sqlite3.Open("/tmp/cpe.db") + + obj := make(map[string]bool) + var temp string + for _, leaf := range leaves.ParameterPath { + fmt.Println(leaf) + sql := "select key, value, tipo from params where key like '" + leaf + "%'" + for s, err := db.Query(sql); err == nil; err = s.Next() { + var key string + var value string + var tipo string + s.Scan(&key, &value, &tipo) + var sp = strings.Split(strings.Split(key, leaf)[1], ".") + nextlevel, _ := strconv.Atoi(leaves.NextLevel) + if nextlevel == 0 { + root := leaf + obj[root] = true + for idx := range sp { + if idx == len(sp)-1 { + root = root + sp[idx] + } else { + root = root + sp[idx] + "." + } + obj[root] = true + } + } else { + if !obj[sp[0]] { + if len(sp) > 1 { + obj[leaf+sp[0]+"."] = true + } else { + obj[leaf+sp[0]] = true + } + + } + } + + } + } + + for o := range obj { + temp += ` + ` + o + ` + true + ` + } + + fmt.Println(len(obj)) + ret += `` + ret += temp + ret += `` + + return ret +} +*/ diff --git a/backend/services/mtp/adapter/internal/db/device.go b/backend/services/mtp/adapter/internal/db/device.go index 14dcd9d..0097c63 100644 --- a/backend/services/mtp/adapter/internal/db/device.go +++ b/backend/services/mtp/adapter/internal/db/device.go @@ -25,6 +25,15 @@ const ( Online ) +type ManagementProtocol uint8 + +const ( + UNKNOWN ManagementProtocol = iota + USP + CWMP + MATTER +) + type Device struct { SN string Model string @@ -36,6 +45,7 @@ type Device struct { Mqtt Status Stomp Status Websockets Status + Protocol ManagementProtocol } func (d *Database) CreateDevice(device Device) error { @@ -45,32 +55,34 @@ func (d *Database) CreateDevice(device Device) error { d.m.Lock() defer d.m.Unlock() - /* ------------------ Do not overwrite status of other mtp ------------------ */ - err := d.devices.FindOne(d.ctx, bson.D{{"sn", device.SN}}, nil).Decode(&deviceExistent) - if err == nil { - if deviceExistent.Mqtt == Online { - device.Mqtt = Online - } - if deviceExistent.Stomp == Online { - device.Stomp = Online - } - if deviceExistent.Websockets == Online { - device.Websockets = Online - } - } else { - if err != nil && err != mongo.ErrNoDocuments { - log.Println(err) - return err + if device.Protocol == USP { + /* ------------------ Do not overwrite status of other mtp ------------------ */ + err := d.devices.FindOne(d.ctx, bson.D{{"sn", device.SN}}, nil).Decode(&deviceExistent) + if err == nil { + if deviceExistent.Mqtt == Online { + device.Mqtt = Online + } + if deviceExistent.Stomp == Online { + device.Stomp = Online + } + if deviceExistent.Websockets == Online { + device.Websockets = Online + } + } else { + if err != mongo.ErrNoDocuments { + log.Println(err) + return err + } } + /* -------------------------------------------------------------------------- */ } - /* -------------------------------------------------------------------------- */ callback := func(sessCtx mongo.SessionContext) (interface{}, error) { // Important: You must pass sessCtx as the Context parameter to the operations for them to be executed in the // transaction. opts := options.FindOneAndReplace().SetUpsert(true) - err = d.devices.FindOneAndReplace(d.ctx, bson.D{{"sn", device.SN}}, device, opts).Decode(&result) + err := d.devices.FindOneAndReplace(d.ctx, bson.D{{"sn", device.SN}}, device, opts).Decode(&result) if err != nil { if err == mongo.ErrNoDocuments { log.Printf("New device %s added to database", device.SN) diff --git a/backend/services/mtp/adapter/internal/events/cwmp_handler/handler.go b/backend/services/mtp/adapter/internal/events/cwmp_handler/handler.go new file mode 100644 index 0000000..5f03861 --- /dev/null +++ b/backend/services/mtp/adapter/internal/events/cwmp_handler/handler.go @@ -0,0 +1,28 @@ +package cwmp_handler + +import ( + "github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/db" + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" +) + +const ( + OFFLINE = iota + ONLINE +) + +type Handler struct { + nc *nats.Conn + js jetstream.JetStream + db db.Database + cid string +} + +func NewHandler(nc *nats.Conn, js jetstream.JetStream, d db.Database, cid string) Handler { + return Handler{ + nc: nc, + js: js, + db: d, + cid: cid, + } +} diff --git a/backend/services/mtp/adapter/internal/events/cwmp_handler/info.go b/backend/services/mtp/adapter/internal/events/cwmp_handler/info.go new file mode 100644 index 0000000..e0f5174 --- /dev/null +++ b/backend/services/mtp/adapter/internal/events/cwmp_handler/info.go @@ -0,0 +1,41 @@ +package cwmp_handler + +import ( + "encoding/xml" + "log" + + "github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/cwmp" + "github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/db" +) + +func (h *Handler) HandleDeviceInfo(device string, data []byte, ack func()) { + defer ack() + log.Printf("Device %s info", device) + deviceInfo := parseDeviceInfoMsg(data) + err := h.db.CreateDevice(deviceInfo) + if err != nil { + log.Printf("Failed to create device: %v", err) + } +} + +func parseDeviceInfoMsg(data []byte) db.Device { + + var inform cwmp.CWMPInform + err := xml.Unmarshal(data, &inform) + if err != nil { + log.Println("Error unmarshalling xml:", err) + } + + var device db.Device + + device.Vendor = inform.DeviceId.Manufacturer + device.Model = "" + device.Version = inform.GetSoftwareVersion() + device.ProductClass = inform.DeviceId.ProductClass + device.SN = inform.DeviceId.SerialNumber + + device.Status = db.Online + device.Protocol = db.CWMP + + return device +} diff --git a/backend/services/mtp/adapter/internal/events/events.go b/backend/services/mtp/adapter/internal/events/events.go index f5b5179..207337a 100644 --- a/backend/services/mtp/adapter/internal/events/events.go +++ b/backend/services/mtp/adapter/internal/events/events.go @@ -5,16 +5,17 @@ import ( "log" "strings" - "github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/events/handler" + "github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/events/cwmp_handler" + "github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/events/usp_handler" "github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/nats" "github.com/nats-io/nats.go/jetstream" ) -func StartEventsListener(ctx context.Context, js jetstream.JetStream, h handler.Handler) { +func StartEventsListener(ctx context.Context, js jetstream.JetStream, uspHandler usp_handler.Handler, cwmpHandler cwmp_handler.Handler) { log.Println("Listening for nats events") - events := []string{ + uspEvents := []string{ nats.MQTT_STREAM_NAME, nats.WS_STREAM_NAME, nats.STOMP_STREAM_NAME, @@ -22,8 +23,8 @@ func StartEventsListener(ctx context.Context, js jetstream.JetStream, h handler. nats.OPC_STREAM_NAME, } - for _, event := range events { - go func() { + for _, uspEvent := range uspEvents { + go func(event string) { consumer, err := js.Consumer(ctx, event, event) if err != nil { log.Fatalf("Failed to get consumer: %v", err) @@ -50,14 +51,57 @@ func StartEventsListener(ctx context.Context, js jetstream.JetStream, h handler. switch msgType { case "status": - h.HandleDeviceStatus(device, msg.Subject(), data, event, func() { msg.Ack() }) + uspHandler.HandleDeviceStatus(device, msg.Subject(), data, event, func() { msg.Ack() }) case "info": - h.HandleDeviceInfo(device, msg.Subject(), data, event, func() { msg.Ack() }) + uspHandler.HandleDeviceInfo(device, msg.Subject(), data, event, func() { msg.Ack() }) default: log.Printf("Unknown message type received, subject: %s", msg.Subject()) msg.Ack() } } - }() + }(uspEvent) + } + + cwmpEvents := []string{ + nats.CWMP_STREAM_NAME, + } + + for _, cwmpEvent := range cwmpEvents { + go func(event string) { + consumer, err := js.Consumer(ctx, event, event) + if err != nil { + log.Fatalf("Failed to get consumer: %v", err) + } + messages, err := consumer.Messages() + if err != nil { + log.Fatalf("Failed to get consumer messages: %v", err) + } + defer messages.Stop() + for { + msg, err := messages.Next() + if err != nil { + log.Println("Error to get next message:", err) + continue + } + + data := msg.Data() + + log.Printf("Received message, subject: %s", msg.Subject()) + + subject := strings.Split(msg.Subject(), ".") + msgType := subject[len(subject)-1] + device := subject[len(subject)-2] + + switch msgType { + case "status": + uspHandler.HandleDeviceStatus(device, msg.Subject(), data, event, func() { msg.Ack() }) + case "info": + cwmpHandler.HandleDeviceInfo(device, data, func() { msg.Ack() }) + default: + log.Printf("Unknown message type received, subject: %s", msg.Subject()) + msg.Ack() + } + } + }(cwmpEvent) } } diff --git a/backend/services/mtp/adapter/internal/events/handler/status.go b/backend/services/mtp/adapter/internal/events/handler/status.go deleted file mode 100644 index 8fefd2e..0000000 --- a/backend/services/mtp/adapter/internal/events/handler/status.go +++ /dev/null @@ -1,72 +0,0 @@ -package handler - -import ( - "log" - "strconv" - - "github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/db" - "github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/usp" - "github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/usp/usp_msg" - "google.golang.org/protobuf/proto" -) - -func (h *Handler) HandleDeviceStatus(device, subject string, data []byte, mtp string, ack func()) { - defer ack() - payload, err := strconv.Atoi(string(data)) - if err != nil { - log.Printf("Status subject payload message error %q", err) - } - - switch payload { - case ONLINE: - h.deviceOnline(device, mtp) - case OFFLINE: - h.deviceOffline(device, mtp) - default: - ignoreMsg(subject, "status", data) - } -} - -func (h *Handler) deviceOnline(device, mtp string) { - - log.Printf("Device %s is online", device) - - msg := usp.NewGetMsg(usp_msg.Get{ - ParamPaths: []string{ - "Device.DeviceInfo.Manufacturer", - "Device.DeviceInfo.ModelName", - "Device.DeviceInfo.SoftwareVersion", - "Device.DeviceInfo.SerialNumber", - "Device.DeviceInfo.ProductClass", - }, - MaxDepth: 1, - }) - - payload, _ := proto.Marshal(&msg) - record := usp.NewUspRecord(payload, device, h.cid) - - tr369Message, err := proto.Marshal(&record) - if err != nil { - log.Fatalln("Failed to encode tr369 record:", err) - } - - err = h.nc.Publish(mtp+"-adapter.usp.v1."+device+".info", tr369Message) - if err != nil { - log.Printf("Failed to publish online device message: %v", err) - } -} - -func (h *Handler) deviceOffline(device, mtp string) { - log.Printf("Device %s is offline", device) - - mtpLayer := getMtp(mtp) - - err := h.db.UpdateStatus(device, db.Offline, mtpLayer) - if err != nil { - log.Fatal(err) - } -} - -func ignoreMsg(subject, ctx string, data []byte) { - log.Printf("Unknown message of %s received, subject: %s, payload: %s. Ignored...", ctx, subject, string(data)) -} diff --git a/backend/services/mtp/adapter/internal/events/handler/handler.go b/backend/services/mtp/adapter/internal/events/usp_handler/handler.go similarity index 95% rename from backend/services/mtp/adapter/internal/events/handler/handler.go rename to backend/services/mtp/adapter/internal/events/usp_handler/handler.go index 439e3ed..ef0263a 100644 --- a/backend/services/mtp/adapter/internal/events/handler/handler.go +++ b/backend/services/mtp/adapter/internal/events/usp_handler/handler.go @@ -1,4 +1,4 @@ -package handler +package usp_handler import ( "github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/db" diff --git a/backend/services/mtp/adapter/internal/events/handler/info.go b/backend/services/mtp/adapter/internal/events/usp_handler/info.go similarity index 97% rename from backend/services/mtp/adapter/internal/events/handler/info.go rename to backend/services/mtp/adapter/internal/events/usp_handler/info.go index 32846ce..6024f49 100644 --- a/backend/services/mtp/adapter/internal/events/handler/info.go +++ b/backend/services/mtp/adapter/internal/events/usp_handler/info.go @@ -1,4 +1,4 @@ -package handler +package usp_handler import ( "log" @@ -64,6 +64,7 @@ func parseDeviceInfoMsg(sn, subject string, data []byte, mtp db.MTP) db.Device { } device.Status = db.Online + device.Protocol = db.USP return device } diff --git a/backend/services/mtp/adapter/internal/events/usp_handler/status.go b/backend/services/mtp/adapter/internal/events/usp_handler/status.go new file mode 100644 index 0000000..b9e2c40 --- /dev/null +++ b/backend/services/mtp/adapter/internal/events/usp_handler/status.go @@ -0,0 +1,38 @@ +package usp_handler + +import ( + "log" + "strconv" + + "github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/db" +) + +func (h *Handler) HandleDeviceStatus(device, subject string, data []byte, mtp string, ack func()) { + defer ack() + payload, err := strconv.Atoi(string(data)) + if err != nil { + log.Printf("Status subject payload message error %q", err) + } + + switch payload { + case OFFLINE: + h.deviceOffline(device, mtp) + default: + ignoreMsg(subject, "status", data) + } +} + +func (h *Handler) deviceOffline(device, mtp string) { + log.Printf("Device %s is offline", device) + + mtpLayer := getMtp(mtp) + + err := h.db.UpdateStatus(device, db.Offline, mtpLayer) + if err != nil { + log.Fatal(err) + } +} + +func ignoreMsg(subject, ctx string, data []byte) { + log.Printf("Unknown message of %s received, subject: %s, payload: %s. Ignored...", ctx, subject, string(data)) +} diff --git a/backend/services/mtp/adapter/internal/nats/nats.go b/backend/services/mtp/adapter/internal/nats/nats.go index 7754adf..3a9a56e 100644 --- a/backend/services/mtp/adapter/internal/nats/nats.go +++ b/backend/services/mtp/adapter/internal/nats/nats.go @@ -17,6 +17,7 @@ const ( STOMP_STREAM_NAME = "stomp" LORA_STREAM_NAME = "lora" OPC_STREAM_NAME = "opc" + CWMP_STREAM_NAME = "cwmp" ADAPTER_SUBJECT = "adapter" + USP_SUBJECT USP_SUBJECT = ".usp.v1." BUCKET_NAME = "devices-auth" @@ -118,6 +119,7 @@ func defineStreams() []string { STOMP_STREAM_NAME, LORA_STREAM_NAME, OPC_STREAM_NAME, + CWMP_STREAM_NAME, } } @@ -128,6 +130,7 @@ func defineConsumers() []string { STOMP_STREAM_NAME, LORA_STREAM_NAME, OPC_STREAM_NAME, + CWMP_STREAM_NAME, } }