diff --git a/backend/services/acs/internal/server/handler/handler.go b/backend/services/acs/internal/server/handler/handler.go index 3f040b2..f5e6225 100644 --- a/backend/services/acs/internal/server/handler/handler.go +++ b/backend/services/acs/internal/server/handler/handler.go @@ -29,12 +29,8 @@ type CPE struct { Manufacturer string OUI string ConnectionRequestURL string - XmppId string - XmppUsername string - XmppPassword string SoftwareVersion string ExternalIPAddress string - State string Queue *lane.Queue Waiting *Request HardwareVersion string @@ -86,8 +82,6 @@ func (h *Handler) CwmpHandler(w http.ResponseWriter, r *http.Request) { body := string(tmp) len := len(body) - //log.Printf("body:\n %v", body) - var envelope cwmp.SoapEnvelope xml.Unmarshal(tmp, &envelope) @@ -136,7 +130,8 @@ func (h *Handler) CwmpHandler(w http.ResponseWriter, r *http.Request) { DataModel: Inform.GetDataModelType(), KeepConnectionOpen: false, } - h.pub("cwmp.v1."+sn+".info", tmp) //TODO: send right info + go h.handleCpeStatus(sn) + h.pub("cwmp.v1."+sn+".info", tmp) } obj := h.cpes[sn] cpe := &obj diff --git a/backend/services/acs/internal/server/handler/status.go b/backend/services/acs/internal/server/handler/status.go new file mode 100644 index 0000000..1402629 --- /dev/null +++ b/backend/services/acs/internal/server/handler/status.go @@ -0,0 +1,24 @@ +package handler + +import ( + "log" + "time" +) + +// TODO: make these consts dynamic via config +const ( + CHECK_STATUS_INTERVAL = 5 * time.Second + KEEP_ALIVE_INTERVAL = 10 * time.Second +) + +func (h *Handler) handleCpeStatus(cpe string) { + for { + if time.Since(h.cpes[cpe].LastConnection) > KEEP_ALIVE_INTERVAL { + delete(h.cpes, cpe) + break + } + time.Sleep(CHECK_STATUS_INTERVAL) + } + log.Println("CPE", cpe, "is offline") + h.pub("cwmp.v1."+cpe+".status", []byte("0")) +} diff --git a/backend/services/mtp/adapter/internal/db/device.go b/backend/services/mtp/adapter/internal/db/device.go index 0097c63..3fc3cfb 100644 --- a/backend/services/mtp/adapter/internal/db/device.go +++ b/backend/services/mtp/adapter/internal/db/device.go @@ -15,6 +15,7 @@ const ( MQTT STOMP WEBSOCKETS + CWMP ) type Status uint8 @@ -25,15 +26,6 @@ const ( Online ) -type ManagementProtocol uint8 - -const ( - UNKNOWN ManagementProtocol = iota - USP - CWMP - MATTER -) - type Device struct { SN string Model string @@ -45,7 +37,7 @@ type Device struct { Mqtt Status Stomp Status Websockets Status - Protocol ManagementProtocol + Cwmp Status } func (d *Database) CreateDevice(device Device) error { @@ -55,27 +47,28 @@ func (d *Database) CreateDevice(device Device) error { d.m.Lock() defer d.m.Unlock() - if device.Protocol == USP { - /* ------------------ Do not overwrite status of other mtp ------------------ */ - err := d.devices.FindOne(d.ctx, bson.D{{"sn", device.SN}}, nil).Decode(&deviceExistent) - if err == nil { - if deviceExistent.Mqtt == Online { - device.Mqtt = Online - } - if deviceExistent.Stomp == Online { - device.Stomp = Online - } - if deviceExistent.Websockets == Online { - device.Websockets = Online - } - } else { - if err != mongo.ErrNoDocuments { - log.Println(err) - return err - } + /* ------------------ Do not overwrite status of other mtp ------------------ */ + err := d.devices.FindOne(d.ctx, bson.D{{"sn", device.SN}}, nil).Decode(&deviceExistent) + if err == nil { + if deviceExistent.Mqtt == Online { + device.Mqtt = Online + } + if deviceExistent.Stomp == Online { + device.Stomp = Online + } + if deviceExistent.Websockets == Online { + device.Websockets = Online + } + if deviceExistent.Cwmp == Online { + device.Cwmp = Online + } + } else { + if err != mongo.ErrNoDocuments { + log.Println(err) + return err } - /* -------------------------------------------------------------------------- */ } + /* -------------------------------------------------------------------------- */ callback := func(sessCtx mongo.SessionContext) (interface{}, error) { // Important: You must pass sessCtx as the Context parameter to the operations for them to be executed in the @@ -173,6 +166,8 @@ func (m MTP) String() string { return "stomp" case WEBSOCKETS: return "websockets" + case CWMP: + return "cwmp" } return "unknown" } diff --git a/backend/services/mtp/adapter/internal/db/status.go b/backend/services/mtp/adapter/internal/db/status.go index cff6bdf..47ebf29 100644 --- a/backend/services/mtp/adapter/internal/db/status.go +++ b/backend/services/mtp/adapter/internal/db/status.go @@ -35,16 +35,18 @@ func (d *Database) UpdateStatus(sn string, status Status, mtp MTP) error { result.Stomp = status case WEBSOCKETS: result.Websockets = status + case CWMP: + result.Cwmp = status } /* check if the global status needs update */ var globalStatus primitive.E - if result.Mqtt == Offline && result.Stomp == Offline && result.Websockets == Offline { + if result.Mqtt == Offline && result.Stomp == Offline && result.Websockets == Offline && result.Cwmp == Offline { globalStatus = primitive.E{"status", Offline} } - if result.Mqtt == Online || result.Stomp == Online || result.Websockets == Online { + if result.Mqtt == Online || result.Stomp == Online || result.Websockets == Online || result.Cwmp == Online { globalStatus = primitive.E{"status", Online} } diff --git a/backend/services/mtp/adapter/internal/events/cwmp_handler/info.go b/backend/services/mtp/adapter/internal/events/cwmp_handler/info.go index e0f5174..d46a895 100644 --- a/backend/services/mtp/adapter/internal/events/cwmp_handler/info.go +++ b/backend/services/mtp/adapter/internal/events/cwmp_handler/info.go @@ -33,9 +33,8 @@ func parseDeviceInfoMsg(data []byte) db.Device { device.Version = inform.GetSoftwareVersion() device.ProductClass = inform.DeviceId.ProductClass device.SN = inform.DeviceId.SerialNumber - + device.Cwmp = db.Online device.Status = db.Online - device.Protocol = db.CWMP return device } diff --git a/backend/services/mtp/adapter/internal/events/cwmp_handler/status.go b/backend/services/mtp/adapter/internal/events/cwmp_handler/status.go new file mode 100644 index 0000000..4a6502c --- /dev/null +++ b/backend/services/mtp/adapter/internal/events/cwmp_handler/status.go @@ -0,0 +1,36 @@ +package cwmp_handler + +import ( + "log" + "strconv" + + "github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/db" +) + +func (h *Handler) HandleDeviceStatus(device, subject string, data []byte, ack func()) { + defer ack() + payload, err := strconv.Atoi(string(data)) + if err != nil { + log.Printf("Status subject payload message error %q", err) + } + + switch payload { + case OFFLINE: + h.deviceOffline(device) + default: + ignoreMsg(subject, "status", data) + } +} + +func (h *Handler) deviceOffline(device string) { + log.Printf("Device %s is offline", device) + + err := h.db.UpdateStatus(device, db.Offline, db.CWMP) + if err != nil { + log.Fatal(err) + } +} + +func ignoreMsg(subject, ctx string, data []byte) { + log.Printf("Unknown message of %s received, subject: %s, payload: %s. Ignored...", ctx, subject, string(data)) +} diff --git a/backend/services/mtp/adapter/internal/events/events.go b/backend/services/mtp/adapter/internal/events/events.go index 207337a..1148fe8 100644 --- a/backend/services/mtp/adapter/internal/events/events.go +++ b/backend/services/mtp/adapter/internal/events/events.go @@ -94,7 +94,7 @@ func StartEventsListener(ctx context.Context, js jetstream.JetStream, uspHandler switch msgType { case "status": - uspHandler.HandleDeviceStatus(device, msg.Subject(), data, event, func() { msg.Ack() }) + cwmpHandler.HandleDeviceStatus(device, msg.Subject(), data, func() { msg.Ack() }) case "info": cwmpHandler.HandleDeviceInfo(device, data, func() { msg.Ack() }) default: diff --git a/backend/services/mtp/adapter/internal/events/usp_handler/info.go b/backend/services/mtp/adapter/internal/events/usp_handler/info.go index 6024f49..07acca7 100644 --- a/backend/services/mtp/adapter/internal/events/usp_handler/info.go +++ b/backend/services/mtp/adapter/internal/events/usp_handler/info.go @@ -64,7 +64,6 @@ func parseDeviceInfoMsg(sn, subject string, data []byte, mtp db.MTP) db.Device { } device.Status = db.Online - device.Protocol = db.USP return device }