feat: cwmp device status management
This commit is contained in:
parent
f596cf9e7b
commit
5487ede44e
|
|
@ -29,12 +29,8 @@ type CPE struct {
|
||||||
Manufacturer string
|
Manufacturer string
|
||||||
OUI string
|
OUI string
|
||||||
ConnectionRequestURL string
|
ConnectionRequestURL string
|
||||||
XmppId string
|
|
||||||
XmppUsername string
|
|
||||||
XmppPassword string
|
|
||||||
SoftwareVersion string
|
SoftwareVersion string
|
||||||
ExternalIPAddress string
|
ExternalIPAddress string
|
||||||
State string
|
|
||||||
Queue *lane.Queue
|
Queue *lane.Queue
|
||||||
Waiting *Request
|
Waiting *Request
|
||||||
HardwareVersion string
|
HardwareVersion string
|
||||||
|
|
@ -86,8 +82,6 @@ func (h *Handler) CwmpHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
body := string(tmp)
|
body := string(tmp)
|
||||||
len := len(body)
|
len := len(body)
|
||||||
|
|
||||||
//log.Printf("body:\n %v", body)
|
|
||||||
|
|
||||||
var envelope cwmp.SoapEnvelope
|
var envelope cwmp.SoapEnvelope
|
||||||
xml.Unmarshal(tmp, &envelope)
|
xml.Unmarshal(tmp, &envelope)
|
||||||
|
|
||||||
|
|
@ -136,7 +130,8 @@ func (h *Handler) CwmpHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
DataModel: Inform.GetDataModelType(),
|
DataModel: Inform.GetDataModelType(),
|
||||||
KeepConnectionOpen: false,
|
KeepConnectionOpen: false,
|
||||||
}
|
}
|
||||||
h.pub("cwmp.v1."+sn+".info", tmp) //TODO: send right info
|
go h.handleCpeStatus(sn)
|
||||||
|
h.pub("cwmp.v1."+sn+".info", tmp)
|
||||||
}
|
}
|
||||||
obj := h.cpes[sn]
|
obj := h.cpes[sn]
|
||||||
cpe := &obj
|
cpe := &obj
|
||||||
|
|
|
||||||
24
backend/services/acs/internal/server/handler/status.go
Normal file
24
backend/services/acs/internal/server/handler/status.go
Normal file
|
|
@ -0,0 +1,24 @@
|
||||||
|
package handler
|
||||||
|
|
||||||
|
import (
|
||||||
|
"log"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TODO: make these consts dynamic via config
|
||||||
|
const (
|
||||||
|
CHECK_STATUS_INTERVAL = 5 * time.Second
|
||||||
|
KEEP_ALIVE_INTERVAL = 10 * time.Second
|
||||||
|
)
|
||||||
|
|
||||||
|
func (h *Handler) handleCpeStatus(cpe string) {
|
||||||
|
for {
|
||||||
|
if time.Since(h.cpes[cpe].LastConnection) > KEEP_ALIVE_INTERVAL {
|
||||||
|
delete(h.cpes, cpe)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(CHECK_STATUS_INTERVAL)
|
||||||
|
}
|
||||||
|
log.Println("CPE", cpe, "is offline")
|
||||||
|
h.pub("cwmp.v1."+cpe+".status", []byte("0"))
|
||||||
|
}
|
||||||
|
|
@ -15,6 +15,7 @@ const (
|
||||||
MQTT
|
MQTT
|
||||||
STOMP
|
STOMP
|
||||||
WEBSOCKETS
|
WEBSOCKETS
|
||||||
|
CWMP
|
||||||
)
|
)
|
||||||
|
|
||||||
type Status uint8
|
type Status uint8
|
||||||
|
|
@ -25,15 +26,6 @@ const (
|
||||||
Online
|
Online
|
||||||
)
|
)
|
||||||
|
|
||||||
type ManagementProtocol uint8
|
|
||||||
|
|
||||||
const (
|
|
||||||
UNKNOWN ManagementProtocol = iota
|
|
||||||
USP
|
|
||||||
CWMP
|
|
||||||
MATTER
|
|
||||||
)
|
|
||||||
|
|
||||||
type Device struct {
|
type Device struct {
|
||||||
SN string
|
SN string
|
||||||
Model string
|
Model string
|
||||||
|
|
@ -45,7 +37,7 @@ type Device struct {
|
||||||
Mqtt Status
|
Mqtt Status
|
||||||
Stomp Status
|
Stomp Status
|
||||||
Websockets Status
|
Websockets Status
|
||||||
Protocol ManagementProtocol
|
Cwmp Status
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Database) CreateDevice(device Device) error {
|
func (d *Database) CreateDevice(device Device) error {
|
||||||
|
|
@ -55,27 +47,28 @@ func (d *Database) CreateDevice(device Device) error {
|
||||||
d.m.Lock()
|
d.m.Lock()
|
||||||
defer d.m.Unlock()
|
defer d.m.Unlock()
|
||||||
|
|
||||||
if device.Protocol == USP {
|
/* ------------------ Do not overwrite status of other mtp ------------------ */
|
||||||
/* ------------------ Do not overwrite status of other mtp ------------------ */
|
err := d.devices.FindOne(d.ctx, bson.D{{"sn", device.SN}}, nil).Decode(&deviceExistent)
|
||||||
err := d.devices.FindOne(d.ctx, bson.D{{"sn", device.SN}}, nil).Decode(&deviceExistent)
|
if err == nil {
|
||||||
if err == nil {
|
if deviceExistent.Mqtt == Online {
|
||||||
if deviceExistent.Mqtt == Online {
|
device.Mqtt = Online
|
||||||
device.Mqtt = Online
|
}
|
||||||
}
|
if deviceExistent.Stomp == Online {
|
||||||
if deviceExistent.Stomp == Online {
|
device.Stomp = Online
|
||||||
device.Stomp = Online
|
}
|
||||||
}
|
if deviceExistent.Websockets == Online {
|
||||||
if deviceExistent.Websockets == Online {
|
device.Websockets = Online
|
||||||
device.Websockets = Online
|
}
|
||||||
}
|
if deviceExistent.Cwmp == Online {
|
||||||
} else {
|
device.Cwmp = Online
|
||||||
if err != mongo.ErrNoDocuments {
|
}
|
||||||
log.Println(err)
|
} else {
|
||||||
return err
|
if err != mongo.ErrNoDocuments {
|
||||||
}
|
log.Println(err)
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
/* -------------------------------------------------------------------------- */
|
|
||||||
}
|
}
|
||||||
|
/* -------------------------------------------------------------------------- */
|
||||||
|
|
||||||
callback := func(sessCtx mongo.SessionContext) (interface{}, error) {
|
callback := func(sessCtx mongo.SessionContext) (interface{}, error) {
|
||||||
// Important: You must pass sessCtx as the Context parameter to the operations for them to be executed in the
|
// Important: You must pass sessCtx as the Context parameter to the operations for them to be executed in the
|
||||||
|
|
@ -173,6 +166,8 @@ func (m MTP) String() string {
|
||||||
return "stomp"
|
return "stomp"
|
||||||
case WEBSOCKETS:
|
case WEBSOCKETS:
|
||||||
return "websockets"
|
return "websockets"
|
||||||
|
case CWMP:
|
||||||
|
return "cwmp"
|
||||||
}
|
}
|
||||||
return "unknown"
|
return "unknown"
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -35,16 +35,18 @@ func (d *Database) UpdateStatus(sn string, status Status, mtp MTP) error {
|
||||||
result.Stomp = status
|
result.Stomp = status
|
||||||
case WEBSOCKETS:
|
case WEBSOCKETS:
|
||||||
result.Websockets = status
|
result.Websockets = status
|
||||||
|
case CWMP:
|
||||||
|
result.Cwmp = status
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
check if the global status needs update
|
check if the global status needs update
|
||||||
*/
|
*/
|
||||||
var globalStatus primitive.E
|
var globalStatus primitive.E
|
||||||
if result.Mqtt == Offline && result.Stomp == Offline && result.Websockets == Offline {
|
if result.Mqtt == Offline && result.Stomp == Offline && result.Websockets == Offline && result.Cwmp == Offline {
|
||||||
globalStatus = primitive.E{"status", Offline}
|
globalStatus = primitive.E{"status", Offline}
|
||||||
}
|
}
|
||||||
if result.Mqtt == Online || result.Stomp == Online || result.Websockets == Online {
|
if result.Mqtt == Online || result.Stomp == Online || result.Websockets == Online || result.Cwmp == Online {
|
||||||
globalStatus = primitive.E{"status", Online}
|
globalStatus = primitive.E{"status", Online}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -33,9 +33,8 @@ func parseDeviceInfoMsg(data []byte) db.Device {
|
||||||
device.Version = inform.GetSoftwareVersion()
|
device.Version = inform.GetSoftwareVersion()
|
||||||
device.ProductClass = inform.DeviceId.ProductClass
|
device.ProductClass = inform.DeviceId.ProductClass
|
||||||
device.SN = inform.DeviceId.SerialNumber
|
device.SN = inform.DeviceId.SerialNumber
|
||||||
|
device.Cwmp = db.Online
|
||||||
device.Status = db.Online
|
device.Status = db.Online
|
||||||
device.Protocol = db.CWMP
|
|
||||||
|
|
||||||
return device
|
return device
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,36 @@
|
||||||
|
package cwmp_handler
|
||||||
|
|
||||||
|
import (
|
||||||
|
"log"
|
||||||
|
"strconv"
|
||||||
|
|
||||||
|
"github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/db"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (h *Handler) HandleDeviceStatus(device, subject string, data []byte, ack func()) {
|
||||||
|
defer ack()
|
||||||
|
payload, err := strconv.Atoi(string(data))
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Status subject payload message error %q", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
switch payload {
|
||||||
|
case OFFLINE:
|
||||||
|
h.deviceOffline(device)
|
||||||
|
default:
|
||||||
|
ignoreMsg(subject, "status", data)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Handler) deviceOffline(device string) {
|
||||||
|
log.Printf("Device %s is offline", device)
|
||||||
|
|
||||||
|
err := h.db.UpdateStatus(device, db.Offline, db.CWMP)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func ignoreMsg(subject, ctx string, data []byte) {
|
||||||
|
log.Printf("Unknown message of %s received, subject: %s, payload: %s. Ignored...", ctx, subject, string(data))
|
||||||
|
}
|
||||||
|
|
@ -94,7 +94,7 @@ func StartEventsListener(ctx context.Context, js jetstream.JetStream, uspHandler
|
||||||
|
|
||||||
switch msgType {
|
switch msgType {
|
||||||
case "status":
|
case "status":
|
||||||
uspHandler.HandleDeviceStatus(device, msg.Subject(), data, event, func() { msg.Ack() })
|
cwmpHandler.HandleDeviceStatus(device, msg.Subject(), data, func() { msg.Ack() })
|
||||||
case "info":
|
case "info":
|
||||||
cwmpHandler.HandleDeviceInfo(device, data, func() { msg.Ack() })
|
cwmpHandler.HandleDeviceInfo(device, data, func() { msg.Ack() })
|
||||||
default:
|
default:
|
||||||
|
|
|
||||||
|
|
@ -64,7 +64,6 @@ func parseDeviceInfoMsg(sn, subject string, data []byte, mtp db.MTP) db.Device {
|
||||||
}
|
}
|
||||||
|
|
||||||
device.Status = db.Online
|
device.Status = db.Online
|
||||||
device.Protocol = db.USP
|
|
||||||
|
|
||||||
return device
|
return device
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user