From 60b2040b5a842f2036a13fb87612ceb473e2ed71 Mon Sep 17 00:00:00 2001 From: leandrofars Date: Thu, 21 Mar 2024 21:05:16 -0300 Subject: [PATCH] feat(controller): websockets communication --- agent/oktopus-websockets-obuspa.txt | 2 +- .../services/controller/internal/api/api.go | 18 +- .../controller/internal/api/device.go | 207 ------------- .../services/controller/internal/api/usp.go | 274 ++++++++++++++++++ .../services/controller/internal/api/utils.go | 92 ++++++ .../controller/internal/entity/mtp.go | 7 + .../services/controller/internal/nats/nats.go | 3 + .../services/mtp/adapter/internal/db/db.go | 32 +- .../mtp/adapter/internal/events/events.go | 1 + .../mqtt-adapter/internal/bridge/bridge.go | 3 +- .../mtp/ws-adapter/internal/bridge/bridge.go | 41 +-- 11 files changed, 429 insertions(+), 251 deletions(-) create mode 100644 backend/services/controller/internal/api/usp.go create mode 100644 backend/services/controller/internal/api/utils.go create mode 100644 backend/services/controller/internal/entity/mtp.go diff --git a/agent/oktopus-websockets-obuspa.txt b/agent/oktopus-websockets-obuspa.txt index c1cd658..f386fcf 100644 --- a/agent/oktopus-websockets-obuspa.txt +++ b/agent/oktopus-websockets-obuspa.txt @@ -23,7 +23,7 @@ Device.LocalAgent.EndpointID "oktopus-0-ws" # Controller's websocket server (for agent initiated sessions) Device.LocalAgent.Controller.1.EndpointID "oktopusController" -Device.LocalAgent.Controller.1.MTP.1.WebSocket.Host "host.docker.internal" +Device.LocalAgent.Controller.1.MTP.1.WebSocket.Host "127.0.0.1" Device.LocalAgent.Controller.1.MTP.1.WebSocket.Port "8080" Device.LocalAgent.Controller.1.MTP.1.WebSocket.Path "ws/agent" Device.LocalAgent.Controller.1.MTP.1.WebSocket.EnableEncryption "false" diff --git a/backend/services/controller/internal/api/api.go b/backend/services/controller/internal/api/api.go index 3c864a3..254dd31 100644 --- a/backend/services/controller/internal/api/api.go +++ b/backend/services/controller/internal/api/api.go @@ -53,15 +53,15 @@ func (a *Api) StartApi() { iot := r.PathPrefix("/api/device").Subrouter() // iot.HandleFunc("", a.retrieveDevices).Methods("GET") // iot.HandleFunc("/{id}", a.retrieveDevices).Methods("GET") - // iot.HandleFunc("/{sn}/get", a.deviceGetMsg).Methods("PUT") - // iot.HandleFunc("/{sn}/add", a.deviceCreateMsg).Methods("PUT") - // iot.HandleFunc("/{sn}/del", a.deviceDeleteMsg).Methods("PUT") - // iot.HandleFunc("/{sn}/set", a.deviceUpdateMsg).Methods("PUT") - iot.HandleFunc("/{sn}/parameters", a.deviceGetSupportedParametersMsg).Methods("PUT") - // iot.HandleFunc("/{sn}/instances", a.deviceGetParameterInstances).Methods("PUT") - // iot.HandleFunc("/{sn}/operate", a.deviceOperateMsg).Methods("PUT") - // iot.HandleFunc("/{sn}/fw_update", a.deviceFwUpdate).Methods("PUT") - // iot.HandleFunc("/{sn}/wifi", a.deviceWifi).Methods("PUT", "GET") + iot.HandleFunc("/{sn}/{mtp}/get", a.deviceGetMsg).Methods("PUT") + iot.HandleFunc("/{sn}/{mtp}/add", a.deviceCreateMsg).Methods("PUT") + iot.HandleFunc("/{sn}/{mtp}/del", a.deviceDeleteMsg).Methods("PUT") + iot.HandleFunc("/{sn}/{mtp}/set", a.deviceUpdateMsg).Methods("PUT") + iot.HandleFunc("/{sn}/{mtp}/parameters", a.deviceGetSupportedParametersMsg).Methods("PUT") + iot.HandleFunc("/{sn}/{mtp}/instances", a.deviceGetParameterInstances).Methods("PUT") + iot.HandleFunc("/{sn}/{mtp}/operate", a.deviceOperateMsg).Methods("PUT") + // iot.HandleFunc("/{sn}/{mtp}/fw_update", a.deviceFwUpdate).Methods("PUT") + // iot.HandleFunc("/{sn}/{mtp}/wifi", a.deviceWifi).Methods("PUT", "GET") // mtp := r.PathPrefix("/api/mtp").Subrouter() // mtp.HandleFunc("", a.mtpInfo).Methods("GET") // dash := r.PathPrefix("/api/info").Subrouter() diff --git a/backend/services/controller/internal/api/device.go b/backend/services/controller/internal/api/device.go index 252c39f..0f00ca1 100644 --- a/backend/services/controller/internal/api/device.go +++ b/backend/services/controller/internal/api/device.go @@ -1,85 +1,5 @@ package api -import ( - "net/http" - - "github.com/gorilla/mux" - "github.com/leandrofars/oktopus/internal/bridge" - "github.com/leandrofars/oktopus/internal/entity" - local "github.com/leandrofars/oktopus/internal/nats" - "github.com/leandrofars/oktopus/internal/usp/usp_msg" - "github.com/leandrofars/oktopus/internal/usp/usp_record" - "github.com/leandrofars/oktopus/internal/usp/usp_utils" - "github.com/leandrofars/oktopus/internal/utils" - "google.golang.org/protobuf/proto" -) - -func (a *Api) deviceGetSupportedParametersMsg(w http.ResponseWriter, r *http.Request) { - vars := mux.Vars(r) - sn := vars["sn"] - - device, err := bridge.NatsReq[entity.Device]( - local.NATS_ADAPTER_SUBJECT+sn+".device", - []byte(""), - w, - a.nc, - ) - if err != nil { - return - } - - if device.Msg.Status != entity.Online { - w.WriteHeader(http.StatusServiceUnavailable) - w.Write(utils.Marshall("Device is offline")) - return - } - - var getSupportedDM usp_msg.GetSupportedDM - utils.MarshallDecoder(&getSupportedDM, r.Body) - msg := usp_utils.NewGetSupportedParametersMsg(getSupportedDM) - protoMsg, err := proto.Marshal(&msg) - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - w.Write(utils.Marshall(err.Error())) - return - } - record := usp_utils.NewUspRecord(protoMsg, sn) - protoRecord, err := proto.Marshal(&record) - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - w.Write(utils.Marshall(err.Error())) - return - } - - data, err := bridge.NatsUspInteraction( - local.NATS_MQTT_SUBJECT_PREFIX+sn+".api", - local.NATS_MQTT_ADAPTER_SUBJECT_PREFIX+sn+".api", - protoRecord, - w, - a.nc, - ) - if err != nil { - return - } - - var receivedRecord usp_record.Record - err = proto.Unmarshal(data, &receivedRecord) - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - w.Write(utils.Marshall(err.Error())) - return - } - var receivedMsg usp_msg.Msg - err = proto.Unmarshal(receivedRecord.GetNoSessionContext().Payload, &receivedMsg) - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - w.Write(utils.Marshall(err.Error())) - return - } - body := receivedMsg.Body.GetResponse() - utils.MarshallEncoder(body.GetGetSupportedDmResp(), w) -} - // func (a *Api) retrieveDevices(w http.ResponseWriter, r *http.Request) { // const PAGE_SIZE_LIMIT = 50 // const PAGE_SIZE_DEFAULT = 20 @@ -183,130 +103,3 @@ func (a *Api) deviceGetSupportedParametersMsg(w http.ResponseWriter, r *http.Req // log.Println(err) // } // } - -// func (a *Api) deviceCreateMsg(w http.ResponseWriter, r *http.Request) { -// vars := mux.Vars(r) -// sn := vars["sn"] -// device := a.deviceExists(sn, w) - -// var receiver usp_msg.Add - -// err := json.NewDecoder(r.Body).Decode(&receiver) -// if err != nil { -// log.Println(err) -// w.WriteHeader(http.StatusBadRequest) -// return -// } - -// msg := utils.NewCreateMsg(receiver) -// a.uspCall(msg, sn, w, device) -// } - -// func (a *Api) deviceGetMsg(w http.ResponseWriter, r *http.Request) { -// vars := mux.Vars(r) -// sn := vars["sn"] - -// device := a.deviceExists(sn, w) - -// var receiver usp_msg.Get - -// err := json.NewDecoder(r.Body).Decode(&receiver) -// if err != nil { -// log.Println(err) -// w.WriteHeader(http.StatusBadRequest) -// return -// } - -// msg := utils.NewGetMsg(receiver) -// a.uspCall(msg, sn, w, device) -// } - -// func (a *Api) deviceOperateMsg(w http.ResponseWriter, r *http.Request) { -// vars := mux.Vars(r) -// sn := vars["sn"] - -// device := a.deviceExists(sn, w) - -// var receiver usp_msg.Operate - -// err := json.NewDecoder(r.Body).Decode(&receiver) -// if err != nil { -// log.Println(err) -// w.WriteHeader(http.StatusBadRequest) -// return -// } - -// msg := utils.NewOperateMsg(receiver) -// a.uspCall(msg, sn, w, device) -// } - -// func (a *Api) deviceDeleteMsg(w http.ResponseWriter, r *http.Request) { -// vars := mux.Vars(r) -// sn := vars["sn"] -// device := a.deviceExists(sn, w) - -// var receiver usp_msg.Delete - -// err := json.NewDecoder(r.Body).Decode(&receiver) -// if err != nil { -// log.Println(err) -// w.WriteHeader(http.StatusBadRequest) -// return -// } - -// msg := utils.NewDelMsg(receiver) -// a.uspCall(msg, sn, w, device) - -// //a.Broker.Request(tr369Message, usp_msg.Header_GET, "oktopus/v1/agent/"+sn, "oktopus/v1/get/"+sn) - -// } - -// func (a *Api) deviceUpdateMsg(w http.ResponseWriter, r *http.Request) { -// vars := mux.Vars(r) -// sn := vars["sn"] -// device := a.deviceExists(sn, w) - -// var receiver usp_msg.Set - -// err := json.NewDecoder(r.Body).Decode(&receiver) -// if err != nil { -// log.Println(err) -// w.WriteHeader(http.StatusBadRequest) -// return -// } - -// msg := utils.NewSetMsg(receiver) -// a.uspCall(msg, sn, w, device) -// } - -// // TODO: react this function, return err and deal with it in the caller, remove header superfluos -// func (a *Api) deviceExists(sn string, w http.ResponseWriter) db.Device { -// device, err := a.Db.RetrieveDevice(sn) -// if err != nil { -// if err == mongo.ErrNoDocuments { -// w.WriteHeader(http.StatusBadRequest) -// json.NewEncoder(w).Encode("No device with serial number " + sn + " was found") -// } -// w.WriteHeader(http.StatusInternalServerError) -// return device -// } -// return device -// } - -// func (a *Api) deviceGetParameterInstances(w http.ResponseWriter, r *http.Request) { -// vars := mux.Vars(r) -// sn := vars["sn"] -// device := a.deviceExists(sn, w) - -// var receiver usp_msg.GetInstances - -// err := json.NewDecoder(r.Body).Decode(&receiver) -// if err != nil { -// log.Println(err) -// w.WriteHeader(http.StatusBadRequest) -// return -// } - -// msg := utils.NewGetParametersInstancesMsg(receiver) -// a.uspCall(msg, sn, w, device) -// } diff --git a/backend/services/controller/internal/api/usp.go b/backend/services/controller/internal/api/usp.go new file mode 100644 index 0000000..2dc0c8e --- /dev/null +++ b/backend/services/controller/internal/api/usp.go @@ -0,0 +1,274 @@ +package api + +import ( + "net/http" + + "github.com/leandrofars/oktopus/internal/bridge" + local "github.com/leandrofars/oktopus/internal/nats" + "github.com/leandrofars/oktopus/internal/usp/usp_msg" + "github.com/leandrofars/oktopus/internal/usp/usp_record" + "github.com/leandrofars/oktopus/internal/usp/usp_utils" + "github.com/leandrofars/oktopus/internal/utils" + "github.com/nats-io/nats.go" + "google.golang.org/protobuf/proto" +) + +func sendUspMsg(msg usp_msg.Msg, sn string, w http.ResponseWriter, nc *nats.Conn, mtp string) error { + + protoMsg, err := proto.Marshal(&msg) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + w.Write(utils.Marshall(err.Error())) + return err + } + + record := usp_utils.NewUspRecord(protoMsg, sn) + protoRecord, err := proto.Marshal(&record) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + w.Write(utils.Marshall(err.Error())) + return err + } + + data, err := bridge.NatsUspInteraction( + local.DEVICE_SUBJECT_PREFIX+sn+".api", + mtp+"-adapter.usp.v1."+sn+".api", + protoRecord, + w, + nc, + ) + if err != nil { + return err + } + + var receivedRecord usp_record.Record + err = proto.Unmarshal(data, &receivedRecord) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + w.Write(utils.Marshall(err.Error())) + return err + } + var receivedMsg usp_msg.Msg + err = proto.Unmarshal(receivedRecord.GetNoSessionContext().Payload, &receivedMsg) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + w.Write(utils.Marshall(err.Error())) + return err + } + + body := receivedMsg.Body.GetResponse() + + switch body.RespType.(type) { + case *usp_msg.Response_GetResp: + utils.MarshallEncoder(body.GetGetResp(), w) + case *usp_msg.Response_DeleteResp: + utils.MarshallEncoder(body.GetDeleteResp(), w) + case *usp_msg.Response_AddResp: + utils.MarshallEncoder(body.GetAddResp(), w) + case *usp_msg.Response_SetResp: + utils.MarshallEncoder(body.GetSetResp(), w) + case *usp_msg.Response_GetInstancesResp: + utils.MarshallEncoder(body.GetGetInstancesResp(), w) + case *usp_msg.Response_GetSupportedDmResp: + utils.MarshallEncoder(body.GetGetSupportedDmResp(), w) + case *usp_msg.Response_GetSupportedProtocolResp: + utils.MarshallEncoder(body.GetGetSupportedProtocolResp(), w) + case *usp_msg.Response_NotifyResp: + utils.MarshallEncoder(body.GetNotifyResp(), w) + case *usp_msg.Response_OperateResp: + utils.MarshallEncoder(body.GetOperateResp(), w) + default: + utils.MarshallEncoder("Unknown message answer", w) + } + + return nil +} + +func (a *Api) deviceGetMsg(w http.ResponseWriter, r *http.Request) { + + sn := getSerialNumberFromRequest(r) + mtp, err := getMtpFromRequest(r, w) + if err != nil { + return + } + + if mtp == "" { + var ok bool + mtp, ok = deviceStateOK(w, a.nc, sn) + if !ok { + return + } + } + + var get usp_msg.Get + + utils.MarshallDecoder(&get, r.Body) + msg := usp_utils.NewGetMsg(get) + + err = sendUspMsg(msg, sn, w, a.nc, mtp) + if err != nil { + return + } +} + +func (a *Api) deviceGetSupportedParametersMsg(w http.ResponseWriter, r *http.Request) { + + sn := getSerialNumberFromRequest(r) + mtp, err := getMtpFromRequest(r, w) + if err != nil { + return + } + + if mtp == "" { + var ok bool + mtp, ok = deviceStateOK(w, a.nc, sn) + if !ok { + return + } + } + + var getSupportedDM usp_msg.GetSupportedDM + + utils.MarshallDecoder(&getSupportedDM, r.Body) + msg := usp_utils.NewGetSupportedParametersMsg(getSupportedDM) + + err = sendUspMsg(msg, sn, w, a.nc, mtp) + if err != nil { + return + } +} + +func (a *Api) deviceOperateMsg(w http.ResponseWriter, r *http.Request) { + + sn := getSerialNumberFromRequest(r) + mtp, err := getMtpFromRequest(r, w) + if err != nil { + return + } + + if mtp == "" { + var ok bool + mtp, ok = deviceStateOK(w, a.nc, sn) + if !ok { + return + } + } + + var operate usp_msg.Operate + + utils.MarshallDecoder(&operate, r.Body) + msg := usp_utils.NewOperateMsg(operate) + + err = sendUspMsg(msg, sn, w, a.nc, mtp) + if err != nil { + return + } +} + +func (a *Api) deviceUpdateMsg(w http.ResponseWriter, r *http.Request) { + + sn := getSerialNumberFromRequest(r) + mtp, err := getMtpFromRequest(r, w) + if err != nil { + return + } + + if mtp == "" { + var ok bool + mtp, ok = deviceStateOK(w, a.nc, sn) + if !ok { + return + } + } + + var set usp_msg.Set + + utils.MarshallDecoder(&set, r.Body) + msg := usp_utils.NewSetMsg(set) + + err = sendUspMsg(msg, sn, w, a.nc, mtp) + if err != nil { + return + } +} + +func (a *Api) deviceGetParameterInstances(w http.ResponseWriter, r *http.Request) { + + sn := getSerialNumberFromRequest(r) + mtp, err := getMtpFromRequest(r, w) + if err != nil { + return + } + + if mtp == "" { + var ok bool + mtp, ok = deviceStateOK(w, a.nc, sn) + if !ok { + return + } + } + + var getInstances usp_msg.GetInstances + + utils.MarshallDecoder(&getInstances, r.Body) + msg := usp_utils.NewGetParametersInstancesMsg(getInstances) + + err = sendUspMsg(msg, sn, w, a.nc, mtp) + if err != nil { + return + } +} + +func (a *Api) deviceCreateMsg(w http.ResponseWriter, r *http.Request) { + + sn := getSerialNumberFromRequest(r) + mtp, err := getMtpFromRequest(r, w) + if err != nil { + return + } + + if mtp == "" { + var ok bool + mtp, ok = deviceStateOK(w, a.nc, sn) + if !ok { + return + } + } + + var add usp_msg.Add + + utils.MarshallDecoder(&add, r.Body) + msg := usp_utils.NewCreateMsg(add) + + err = sendUspMsg(msg, sn, w, a.nc, mtp) + if err != nil { + return + } +} + +func (a *Api) deviceDeleteMsg(w http.ResponseWriter, r *http.Request) { + + sn := getSerialNumberFromRequest(r) + mtp, err := getMtpFromRequest(r, w) + if err != nil { + return + } + + if mtp == "" { + var ok bool + mtp, ok = deviceStateOK(w, a.nc, sn) + if !ok { + return + } + } + + var del usp_msg.Delete + + utils.MarshallDecoder(&del, r.Body) + msg := usp_utils.NewDelMsg(del) + + err = sendUspMsg(msg, sn, w, a.nc, mtp) + if err != nil { + return + } +} diff --git a/backend/services/controller/internal/api/utils.go b/backend/services/controller/internal/api/utils.go new file mode 100644 index 0000000..77c20b1 --- /dev/null +++ b/backend/services/controller/internal/api/utils.go @@ -0,0 +1,92 @@ +package api + +import ( + "errors" + "net/http" + + "github.com/gorilla/mux" + "github.com/leandrofars/oktopus/internal/bridge" + "github.com/leandrofars/oktopus/internal/entity" + local "github.com/leandrofars/oktopus/internal/nats" + "github.com/leandrofars/oktopus/internal/utils" + "github.com/nats-io/nats.go" +) + +var errInvalidMtp = errors.New("Invalid MTP, valid options are: mqtt, ws, stomp") + +func deviceStateOK(w http.ResponseWriter, nc *nats.Conn, sn string) (string, bool) { + + device, err := getDeviceInfo(w, sn, nc) + if err != nil { + return "", false + } + + if !isDeviceOnline(w, device.Status) { + return "", false + } + + if device.Mqtt == entity.Online { + return entity.Mqtt, true + } + + if device.Websockets == entity.Online { + return entity.Websockets, true + } + + if device.Stomp == entity.Online { + return entity.Stomp, true + } + + return "", false +} + +func getSerialNumberFromRequest(r *http.Request) string { + vars := mux.Vars(r) + return vars["sn"] +} + +func getMtpFromRequest(r *http.Request, w http.ResponseWriter) (string, error) { + vars := mux.Vars(r) + switch vars["mtp"] { + case entity.Mqtt: + return entity.Mqtt, nil + case entity.Websockets: + return entity.Websockets, nil + case entity.Stomp: + return entity.Stomp, nil + case "any": + return "", nil + case ":mtp": + return "", nil + default: + w.WriteHeader(http.StatusBadRequest) + w.Write(utils.Marshall("Invalid MTP, valid options are: " + entity.Mqtt + ", " + entity.Websockets + ", " + entity.Stomp)) + return "", errInvalidMtp + } +} + +func isDeviceOnline(w http.ResponseWriter, deviceStatus entity.Status) bool { + if deviceStatus != entity.Online { + w.WriteHeader(http.StatusServiceUnavailable) + switch deviceStatus { + case entity.Offline: + w.Write(utils.Marshall("Device is offline")) + case entity.Associating: + w.Write(utils.Marshall("Device status is associating")) + default: + w.Write(utils.Marshall("Unknown device status")) + } + return false + } + return true +} + +func getDeviceInfo(w http.ResponseWriter, sn string, nc *nats.Conn) (device *entity.Device, err error) { + msg, err := bridge.NatsReq[entity.Device]( + local.NATS_ADAPTER_SUBJECT+sn+".device", + []byte(""), + w, + nc, + ) + return &msg.Msg, err +} diff --git a/backend/services/controller/internal/entity/mtp.go b/backend/services/controller/internal/entity/mtp.go new file mode 100644 index 0000000..6a3daa0 --- /dev/null +++ b/backend/services/controller/internal/entity/mtp.go @@ -0,0 +1,7 @@ +package entity + +const ( + Mqtt = "mqtt" + Websockets = "ws" + Stomp = "stomp" +) diff --git a/backend/services/controller/internal/nats/nats.go b/backend/services/controller/internal/nats/nats.go index c3dbdd6..2023a5e 100644 --- a/backend/services/controller/internal/nats/nats.go +++ b/backend/services/controller/internal/nats/nats.go @@ -15,6 +15,9 @@ const ( NATS_MQTT_SUBJECT_PREFIX = "mqtt.usp.v1." NATS_MQTT_ADAPTER_SUBJECT_PREFIX = "mqtt-adapter.usp.v1." NATS_ADAPTER_SUBJECT = "adapter.usp.v1." + NATS_WS_SUBJECT_PREFIX = "ws.usp.v1." + NATS_WS_ADAPTER_SUBJECT_PREFIX = "ws-adapter.usp.v1.*." + DEVICE_SUBJECT_PREFIX = "device.usp.v1." ) func StartNatsClient(c config.Nats) (jetstream.JetStream, *nats.Conn) { diff --git a/backend/services/mtp/adapter/internal/db/db.go b/backend/services/mtp/adapter/internal/db/db.go index 4a70408..33ceb0d 100644 --- a/backend/services/mtp/adapter/internal/db/db.go +++ b/backend/services/mtp/adapter/internal/db/db.go @@ -37,7 +37,7 @@ func NewDatabase(ctx context.Context, mongoUri string) Database { devices := client.Database("adapter").Collection("devices") createIndexes(ctx, devices) - resetDeviceStatus(ctx, devices) + //resetDeviceStatus(ctx, devices) db.devices = devices db.ctx = ctx @@ -46,21 +46,21 @@ func NewDatabase(ctx context.Context, mongoUri string) Database { return db } -func resetDeviceStatus(ctx context.Context, devices *mongo.Collection) { - _, err := devices.UpdateMany(ctx, bson.D{{}}, bson.D{ - { - "$set", bson.D{ - {"mqtt", 0}, - {"stomp", 0}, - {"websockets", 0}, - {"status", 0}, - }, - }, - }) - if err != nil { - log.Fatalln("ERROR to reset device status in database:", err) - } -} +// func resetDeviceStatus(ctx context.Context, devices *mongo.Collection) { +// _, err := devices.UpdateMany(ctx, bson.D{{}}, bson.D{ +// { +// "$set", bson.D{ +// {"mqtt", 0}, +// {"stomp", 0}, +// {"websockets", 0}, +// {"status", 0}, +// }, +// }, +// }) +// if err != nil { +// log.Fatalln("ERROR to reset device status in database:", err) +// } +// } func createIndexes(ctx context.Context, devices *mongo.Collection) { indexField := bson.M{"sn": 1} diff --git a/backend/services/mtp/adapter/internal/events/events.go b/backend/services/mtp/adapter/internal/events/events.go index 2415ac9..f5b5179 100644 --- a/backend/services/mtp/adapter/internal/events/events.go +++ b/backend/services/mtp/adapter/internal/events/events.go @@ -55,6 +55,7 @@ func StartEventsListener(ctx context.Context, js jetstream.JetStream, h handler. h.HandleDeviceInfo(device, msg.Subject(), data, event, func() { msg.Ack() }) default: log.Printf("Unknown message type received, subject: %s", msg.Subject()) + msg.Ack() } } }() diff --git a/backend/services/mtp/mqtt-adapter/internal/bridge/bridge.go b/backend/services/mtp/mqtt-adapter/internal/bridge/bridge.go index 2f7ef4d..b3f095a 100644 --- a/backend/services/mtp/mqtt-adapter/internal/bridge/bridge.go +++ b/backend/services/mtp/mqtt-adapter/internal/bridge/bridge.go @@ -21,6 +21,7 @@ const ( const NATS_MQTT_SUBJECT_PREFIX = "mqtt.usp.v1." const NATS_MQTT_ADAPTER_SUBJECT_PREFIX = "mqtt-adapter.usp.v1.*." +const DEVICE_SUBJECT_PREFIX = "device.usp.v1." const MQTT_TOPIC_PREFIX = "oktopus/usp/" type ( @@ -133,7 +134,7 @@ func (b *Bridge) mqttMessageHandler(status, controller, apiMsg chan *paho.Publis case c := <-controller: b.Pub(NATS_MQTT_SUBJECT_PREFIX+getDeviceFromTopic(c.Topic)+".info", c.Payload) case a := <-apiMsg: - b.Pub(NATS_MQTT_SUBJECT_PREFIX+getDeviceFromTopic(a.Topic)+".api", a.Payload) + b.Pub(DEVICE_SUBJECT_PREFIX+getDeviceFromTopic(a.Topic)+".api", a.Payload) } } } diff --git a/backend/services/mtp/ws-adapter/internal/bridge/bridge.go b/backend/services/mtp/ws-adapter/internal/bridge/bridge.go index a1d3a2c..6ddd29d 100644 --- a/backend/services/mtp/ws-adapter/internal/bridge/bridge.go +++ b/backend/services/mtp/ws-adapter/internal/bridge/bridge.go @@ -13,6 +13,7 @@ import ( "time" "github.com/OktopUSP/oktopus/backend/services/mtp/ws-adapter/internal/config" + "github.com/OktopUSP/oktopus/backend/services/mtp/ws-adapter/internal/usp/usp_msg" "github.com/OktopUSP/oktopus/backend/services/mtp/ws-adapter/internal/usp/usp_record" "github.com/gorilla/websocket" "github.com/nats-io/nats.go" @@ -22,7 +23,7 @@ import ( const ( NATS_WS_SUBJECT_PREFIX = "ws.usp.v1." NATS_WS_ADAPTER_SUBJECT_PREFIX = "ws-adapter.usp.v1.*." - WS_TOPIC_PREFIX = "oktopus/usp/" + DEVICE_SUBJECT_PREFIX = "device.usp.v1." WS_CONNECTION_RETRY = 10 * time.Second ) @@ -47,7 +48,8 @@ type Bridge struct { Ws config.Ws NewDeviceQueue map[string]string NewDevQMutex *sync.Mutex - Ctx context.Context + + Ctx context.Context } func NewBridge(p Publisher, s Subscriber, ctx context.Context, w config.Ws) *Bridge { @@ -105,23 +107,17 @@ func (b *Bridge) StartBridge() { b.newDeviceMsgHandler(wc, device, wsMsg) continue } + log.Println("Handle api request") + var msg usp_msg.Msg + err = proto.Unmarshal(record.GetNoSessionContext().Payload, &msg) + if err != nil { + log.Println(err) + continue + } + b.Pub(DEVICE_SUBJECT_PREFIX+device+".api", wsMsg) + continue } - // log.Println("Handle api request") - // var msg usp_msg.Msg - // err = proto.Unmarshal(record.GetNoSessionContext().Payload, &msg) - // if err != nil { - // log.Println(err) - // continue - // } - // if _, ok := w.MsgQueue[msg.Header.MsgId]; ok { - // //m.QMutex.Lock() - // w.MsgQueue[msg.Header.MsgId] <- msg - // //m.QMutex.Unlock() - // } else { - // log.Printf("Message answer to request %s arrived too late", msg.Header.MsgId) - // } - } }(wc) break @@ -151,6 +147,17 @@ func (b *Bridge) subscribe(wc *websocket.Conn) { return } }) + + b.Sub(NATS_WS_ADAPTER_SUBJECT_PREFIX+"api", func(msg *nats.Msg) { + + log.Printf("Received message on api subject") + + err := wc.WriteMessage(websocket.BinaryMessage, msg.Data) + if err != nil { + log.Printf("send websocket msg error: %q", err) + return + } + }) } func (b *Bridge) newDeviceMsgHandler(wc *websocket.Conn, device string, msg []byte) {