From f49fa5b3bf10dfdde9cd3915e55a4a7b20898656 Mon Sep 17 00:00:00 2001 From: leandrofars Date: Mon, 19 Feb 2024 22:28:29 -0300 Subject: [PATCH] feat(controller): usp message via websockets --- .../services/controller/cmd/oktopus/main.go | 4 +- .../services/controller/internal/api/api.go | 38 +++++++++++-------- .../controller/internal/api/device.go | 1 + .../controller/internal/api/fw_update.go | 10 ++++- .../services/controller/internal/api/wifi.go | 12 +++++- backend/services/controller/internal/ws/ws.go | 18 ++++++++- 6 files changed, 62 insertions(+), 21 deletions(-) diff --git a/backend/services/controller/cmd/oktopus/main.go b/backend/services/controller/cmd/oktopus/main.go index 37cd03c..1df0996 100755 --- a/backend/services/controller/cmd/oktopus/main.go +++ b/backend/services/controller/cmd/oktopus/main.go @@ -179,6 +179,8 @@ func main() { InsecureSkipVerify: *flWsSkipVerify, DB: database, Ctx: ctx, + MsgQueue: apiMsgQueue, + QMutex: &m, } wsDone = make(chan os.Signal, 1) @@ -192,7 +194,7 @@ func main() { wg.Wait() - a := api.NewApi(*flApiPort, database, &mqttClient, apiMsgQueue, &m) + a := api.NewApi(*flApiPort, database, &mqttClient, apiMsgQueue, &m, wsClient) //TODO: websockets instance api.StartApi(a) <-done diff --git a/backend/services/controller/internal/api/api.go b/backend/services/controller/internal/api/api.go index d893654..576f365 100644 --- a/backend/services/controller/internal/api/api.go +++ b/backend/services/controller/internal/api/api.go @@ -12,19 +12,19 @@ import ( "github.com/leandrofars/oktopus/internal/api/middleware" "github.com/leandrofars/oktopus/internal/db" "github.com/leandrofars/oktopus/internal/mqtt" - "github.com/leandrofars/oktopus/internal/mtp" usp_msg "github.com/leandrofars/oktopus/internal/usp_message" "github.com/leandrofars/oktopus/internal/utils" + "github.com/leandrofars/oktopus/internal/ws" "google.golang.org/protobuf/proto" ) type Api struct { - Port string - Db db.Database - Broker mtp.Broker - MsgQueue map[string](chan usp_msg.Msg) - QMutex *sync.Mutex - Mqtt mqtt.Mqtt + Port string + Db db.Database + MsgQueue map[string](chan usp_msg.Msg) + QMutex *sync.Mutex + Mqtt *mqtt.Mqtt + Websockets *ws.Ws } const REQUEST_TIMEOUT = time.Second * 30 @@ -34,14 +34,14 @@ const ( AdminUser ) -func NewApi(port string, db db.Database, mqtt *mqtt.Mqtt, msgQueue map[string](chan usp_msg.Msg), m *sync.Mutex) Api { +func NewApi(port string, db db.Database, mqtt *mqtt.Mqtt, msgQueue map[string](chan usp_msg.Msg), m *sync.Mutex, w ws.Ws) Api { return Api{ - Port: port, - Db: db, - Broker: mqtt, - MsgQueue: msgQueue, - QMutex: m, - Mqtt: *mqtt, + Port: port, + Db: db, + MsgQueue: msgQueue, + QMutex: m, + Mqtt: mqtt, + Websockets: &w, } } @@ -133,8 +133,14 @@ func (a *Api) uspCall(msg usp_msg.Msg, sn string, w http.ResponseWriter, device a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg) a.QMutex.Unlock() log.Println("Sending Msg:", msg.Header.MsgId) - //TODO: Check what MTP the device is connected to - a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn, false) + + if device.Mqtt == db.Online { + a.Mqtt.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn, false) + } else if device.Websockets == db.Online { + a.Websockets.Publish(tr369Message, "", "", false) + } else if device.Stomp == db.Online { + //TODO: send stomp message + } select { case msg := <-a.MsgQueue[msg.Header.MsgId]: diff --git a/backend/services/controller/internal/api/device.go b/backend/services/controller/internal/api/device.go index 0993583..429180c 100644 --- a/backend/services/controller/internal/api/device.go +++ b/backend/services/controller/internal/api/device.go @@ -232,6 +232,7 @@ func (a *Api) deviceUpdateMsg(w http.ResponseWriter, r *http.Request) { a.uspCall(msg, sn, w, device) } +// TODO: react this function, return err and deal with it in the caller, remove header superfluos func (a *Api) deviceExists(sn string, w http.ResponseWriter) db.Device { device, err := a.Db.RetrieveDevice(sn) if err != nil { diff --git a/backend/services/controller/internal/api/fw_update.go b/backend/services/controller/internal/api/fw_update.go index c32eec4..9807b16 100644 --- a/backend/services/controller/internal/api/fw_update.go +++ b/backend/services/controller/internal/api/fw_update.go @@ -7,6 +7,7 @@ import ( "time" "github.com/gorilla/mux" + "github.com/leandrofars/oktopus/internal/db" usp_msg "github.com/leandrofars/oktopus/internal/usp_message" "github.com/leandrofars/oktopus/internal/utils" "google.golang.org/protobuf/proto" @@ -51,7 +52,14 @@ func (a *Api) deviceFwUpdate(w http.ResponseWriter, r *http.Request) { a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg) a.QMutex.Unlock() log.Println("Sending Msg:", msg.Header.MsgId) - a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn, false) + + if device.Mqtt == db.Online { + a.Mqtt.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn, false) + } else if device.Websockets == db.Online { + a.Websockets.Publish(tr369Message, "", "", false) + } else if device.Stomp == db.Online { + //TODO: send stomp message + } var getMsgAnswer *usp_msg.GetResp diff --git a/backend/services/controller/internal/api/wifi.go b/backend/services/controller/internal/api/wifi.go index d51c18d..9de469a 100644 --- a/backend/services/controller/internal/api/wifi.go +++ b/backend/services/controller/internal/api/wifi.go @@ -10,6 +10,7 @@ import ( "github.com/gorilla/mux" + "github.com/leandrofars/oktopus/internal/db" usp_msg "github.com/leandrofars/oktopus/internal/usp_message" "github.com/leandrofars/oktopus/internal/utils" "google.golang.org/protobuf/proto" @@ -31,7 +32,7 @@ type WiFi struct { func (a *Api) deviceWifi(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) sn := vars["sn"] - a.deviceExists(sn, w) + device := a.deviceExists(sn, w) if r.Method == http.MethodGet { msg := utils.NewGetMsg(usp_msg.Get{ @@ -69,7 +70,14 @@ func (a *Api) deviceWifi(w http.ResponseWriter, r *http.Request) { a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg) a.QMutex.Unlock() log.Println("Sending Msg:", msg.Header.MsgId) - a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn, false) + + if device.Mqtt == db.Online { + a.Mqtt.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn, false) + } else if device.Websockets == db.Online { + a.Websockets.Publish(tr369Message, "", "", false) + } else if device.Stomp == db.Online { + //TODO: send stomp message + } //TODO: verify in protocol and in other models, the Device.Wifi parameters. Maybe in the future, to use SSIDReference from AccessPoint select { diff --git a/backend/services/controller/internal/ws/ws.go b/backend/services/controller/internal/ws/ws.go index e03c81f..9410dcd 100644 --- a/backend/services/controller/internal/ws/ws.go +++ b/backend/services/controller/internal/ws/ws.go @@ -12,6 +12,7 @@ import ( "github.com/gorilla/websocket" "github.com/leandrofars/oktopus/internal/db" "github.com/leandrofars/oktopus/internal/mtp/handler" + usp_msg "github.com/leandrofars/oktopus/internal/usp_message" "github.com/leandrofars/oktopus/internal/usp_record" "google.golang.org/protobuf/proto" @@ -29,6 +30,8 @@ type Ws struct { NewDeviceQueue map[string]string NewDevQMutex *sync.Mutex DB db.Database + MsgQueue map[string](chan usp_msg.Msg) + QMutex *sync.Mutex } const ( @@ -204,7 +207,20 @@ func (w *Ws) Subscribe() { continue } - //TODO: send message to Api Msg Queue + log.Println("Handle api request") + var msg usp_msg.Msg + err = proto.Unmarshal(record.GetNoSessionContext().Payload, &msg) + if err != nil { + log.Println(err) + continue + } + if _, ok := w.MsgQueue[msg.Header.MsgId]; ok { + //m.QMutex.Lock() + w.MsgQueue[msg.Header.MsgId] <- msg + //m.QMutex.Unlock() + } else { + log.Printf("Message answer to request %s arrived too late", msg.Header.MsgId) + } }