diff --git a/agent/oktopus-mqtt-obuspa.txt b/agent/oktopus-mqtt-obuspa.txt index 7d5b29a..f49caa0 100644 --- a/agent/oktopus-mqtt-obuspa.txt +++ b/agent/oktopus-mqtt-obuspa.txt @@ -31,7 +31,7 @@ Device.LocalAgent.Subscription.1.NotifType Event Device.LocalAgent.Subscription.1.ReferenceList Device.Boot! Device.LocalAgent.Subscription.1.Persistent true -Device.LocalAgent.MTP.1.MQTT.ResponseTopicConfigured "oktopus/v1/controller" +Device.LocalAgent.MTP.1.MQTT.ResponseTopicConfigured "oktopus/usp/v1/controller" Device.LocalAgent.MTP.1.MQTT.Reference "Device.MQTT.Client.1" Device.MQTT.Client.1.BrokerAddress "localhost" Device.MQTT.Client.1.ProtocolVersion "5.0" @@ -59,7 +59,7 @@ Device.LocalAgent.Controller.1.MTP.1.Enable true Device.LocalAgent.Controller.1.MTP.1.Protocol "MQTT" Device.LocalAgent.Controller.1.EndpointID "oktopusController" Device.LocalAgent.Controller.1.MTP.1.MQTT.Reference "Device.MQTT.Client.1" -Device.LocalAgent.Controller.1.MTP.1.MQTT.Topic "oktopus/v1/controller" +Device.LocalAgent.Controller.1.MTP.1.MQTT.Topic "oktopus/usp/v1/controller" diff --git a/backend/services/controller/cmd/rest-api/main.go b/backend/services/controller/cmd/controller/main.go similarity index 100% rename from backend/services/controller/cmd/rest-api/main.go rename to backend/services/controller/cmd/controller/main.go diff --git a/backend/services/controller/internal/api/api.go b/backend/services/controller/internal/api/api.go index 98cd42f..3c864a3 100644 --- a/backend/services/controller/internal/api/api.go +++ b/backend/services/controller/internal/api/api.go @@ -8,6 +8,7 @@ import ( "github.com/gorilla/mux" "github.com/leandrofars/oktopus/internal/api/cors" + "github.com/leandrofars/oktopus/internal/api/middleware" "github.com/leandrofars/oktopus/internal/bridge" "github.com/leandrofars/oktopus/internal/config" "github.com/leandrofars/oktopus/internal/db" @@ -44,19 +45,19 @@ func NewApi(c config.RestApi, js jetstream.JetStream, nc *nats.Conn, bridge brid func (a *Api) StartApi() { r := mux.NewRouter() - // authentication := r.PathPrefix("/api/auth").Subrouter() - // authentication.HandleFunc("/login", a.generateToken).Methods("PUT") - // authentication.HandleFunc("/register", a.registerUser).Methods("POST") - // authentication.HandleFunc("/admin/register", a.registerAdminUser).Methods("POST") - // authentication.HandleFunc("/admin/exists", a.adminUserExists).Methods("GET") - // iot := r.PathPrefix("/api/device").Subrouter() + authentication := r.PathPrefix("/api/auth").Subrouter() + authentication.HandleFunc("/login", a.generateToken).Methods("PUT") + authentication.HandleFunc("/register", a.registerUser).Methods("POST") + authentication.HandleFunc("/admin/register", a.registerAdminUser).Methods("POST") + authentication.HandleFunc("/admin/exists", a.adminUserExists).Methods("GET") + iot := r.PathPrefix("/api/device").Subrouter() // iot.HandleFunc("", a.retrieveDevices).Methods("GET") // iot.HandleFunc("/{id}", a.retrieveDevices).Methods("GET") // iot.HandleFunc("/{sn}/get", a.deviceGetMsg).Methods("PUT") // iot.HandleFunc("/{sn}/add", a.deviceCreateMsg).Methods("PUT") // iot.HandleFunc("/{sn}/del", a.deviceDeleteMsg).Methods("PUT") // iot.HandleFunc("/{sn}/set", a.deviceUpdateMsg).Methods("PUT") - // iot.HandleFunc("/{sn}/parameters", a.deviceGetSupportedParametersMsg).Methods("PUT") + iot.HandleFunc("/{sn}/parameters", a.deviceGetSupportedParametersMsg).Methods("PUT") // iot.HandleFunc("/{sn}/instances", a.deviceGetParameterInstances).Methods("PUT") // iot.HandleFunc("/{sn}/operate", a.deviceOperateMsg).Methods("PUT") // iot.HandleFunc("/{sn}/fw_update", a.deviceFwUpdate).Methods("PUT") @@ -72,9 +73,9 @@ func (a *Api) StartApi() { users.HandleFunc("", a.retrieveUsers).Methods("GET") /* ----- Middleware for requests which requires user to be authenticated ---- */ - // iot.Use(func(handler http.Handler) http.Handler { - // return middleware.Middleware(handler) - // }) + iot.Use(func(handler http.Handler) http.Handler { + return middleware.Middleware(handler) + }) // mtp.Use(func(handler http.Handler) http.Handler { // return middleware.Middleware(handler) diff --git a/backend/services/controller/internal/api/device.go b/backend/services/controller/internal/api/device.go new file mode 100644 index 0000000..252c39f --- /dev/null +++ b/backend/services/controller/internal/api/device.go @@ -0,0 +1,312 @@ +package api + +import ( + "net/http" + + "github.com/gorilla/mux" + "github.com/leandrofars/oktopus/internal/bridge" + "github.com/leandrofars/oktopus/internal/entity" + local "github.com/leandrofars/oktopus/internal/nats" + "github.com/leandrofars/oktopus/internal/usp/usp_msg" + "github.com/leandrofars/oktopus/internal/usp/usp_record" + "github.com/leandrofars/oktopus/internal/usp/usp_utils" + "github.com/leandrofars/oktopus/internal/utils" + "google.golang.org/protobuf/proto" +) + +func (a *Api) deviceGetSupportedParametersMsg(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + sn := vars["sn"] + + device, err := bridge.NatsReq[entity.Device]( + local.NATS_ADAPTER_SUBJECT+sn+".device", + []byte(""), + w, + a.nc, + ) + if err != nil { + return + } + + if device.Msg.Status != entity.Online { + w.WriteHeader(http.StatusServiceUnavailable) + w.Write(utils.Marshall("Device is offline")) + return + } + + var getSupportedDM usp_msg.GetSupportedDM + utils.MarshallDecoder(&getSupportedDM, r.Body) + msg := usp_utils.NewGetSupportedParametersMsg(getSupportedDM) + protoMsg, err := proto.Marshal(&msg) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + w.Write(utils.Marshall(err.Error())) + return + } + record := usp_utils.NewUspRecord(protoMsg, sn) + protoRecord, err := proto.Marshal(&record) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + w.Write(utils.Marshall(err.Error())) + return + } + + data, err := bridge.NatsUspInteraction( + local.NATS_MQTT_SUBJECT_PREFIX+sn+".api", + local.NATS_MQTT_ADAPTER_SUBJECT_PREFIX+sn+".api", + protoRecord, + w, + a.nc, + ) + if err != nil { + return + } + + var receivedRecord usp_record.Record + err = proto.Unmarshal(data, &receivedRecord) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + w.Write(utils.Marshall(err.Error())) + return + } + var receivedMsg usp_msg.Msg + err = proto.Unmarshal(receivedRecord.GetNoSessionContext().Payload, &receivedMsg) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + w.Write(utils.Marshall(err.Error())) + return + } + body := receivedMsg.Body.GetResponse() + utils.MarshallEncoder(body.GetGetSupportedDmResp(), w) +} + +// func (a *Api) retrieveDevices(w http.ResponseWriter, r *http.Request) { +// const PAGE_SIZE_LIMIT = 50 +// const PAGE_SIZE_DEFAULT = 20 + +// // Get specific device +// id := r.URL.Query().Get("id") +// if id != "" { +// device, err := a.Db.RetrieveDevice(id) +// if err != nil { +// if err == mongo.ErrNoDocuments { +// json.NewEncoder(w).Encode("Device id: " + id + " not found") +// return +// } +// json.NewEncoder(w).Encode(err) +// w.WriteHeader(http.StatusInternalServerError) +// return +// } +// err = json.NewEncoder(w).Encode(device) +// if err != nil { +// log.Println(err) +// } +// return +// } + +// // Get devices with pagination +// page_n := r.URL.Query().Get("page_number") +// page_s := r.URL.Query().Get("page_size") +// var err error + +// var page_number int64 +// if page_n == "" { +// page_number = 0 +// } else { +// page_number, err = strconv.ParseInt(page_n, 10, 64) +// if err != nil { +// w.WriteHeader(http.StatusBadRequest) +// json.NewEncoder(w).Encode("Page number must be an integer") +// return +// } +// } + +// var page_size int64 +// if page_s != "" { +// page_size, err = strconv.ParseInt(page_s, 10, 64) + +// if err != nil { +// w.WriteHeader(http.StatusBadRequest) +// json.NewEncoder(w).Encode("Page size must be an integer") +// return +// } + +// if page_size > PAGE_SIZE_LIMIT { +// w.WriteHeader(http.StatusBadRequest) +// json.NewEncoder(w).Encode("Page size must not exceed " + strconv.Itoa(PAGE_SIZE_LIMIT)) +// return +// } + +// } else { +// page_size = PAGE_SIZE_DEFAULT +// } + +// total, err := a.Db.RetrieveDevicesCount(bson.M{}) +// if err != nil { +// w.WriteHeader(http.StatusInternalServerError) +// json.NewEncoder(w).Encode("Unable to get devices count from database") +// return +// } + +// skip := page_number * (page_size - 1) +// if total < page_size { +// skip = 0 +// } + +// //TODO: Create filters +// //TODO: Create sorting + +// sort := bson.M{} +// sort["status"] = 1 + +// filter := bson.A{ +// //bson.M{"$match": filter}, +// bson.M{"$sort": sort}, // shows online devices first +// bson.M{"$skip": skip}, +// bson.M{"$limit": page_size}, +// } + +// devices, err := a.Db.RetrieveDevices(filter) +// if err != nil { +// w.WriteHeader(http.StatusInternalServerError) +// json.NewEncoder(w).Encode("Unable to aggregate database devices info") +// return +// } + +// err = json.NewEncoder(w).Encode(map[string]interface{}{ +// "pages": total / page_size, +// "page": page_number, +// "size": page_size, +// "devices": devices, +// }) +// if err != nil { +// log.Println(err) +// } +// } + +// func (a *Api) deviceCreateMsg(w http.ResponseWriter, r *http.Request) { +// vars := mux.Vars(r) +// sn := vars["sn"] +// device := a.deviceExists(sn, w) + +// var receiver usp_msg.Add + +// err := json.NewDecoder(r.Body).Decode(&receiver) +// if err != nil { +// log.Println(err) +// w.WriteHeader(http.StatusBadRequest) +// return +// } + +// msg := utils.NewCreateMsg(receiver) +// a.uspCall(msg, sn, w, device) +// } + +// func (a *Api) deviceGetMsg(w http.ResponseWriter, r *http.Request) { +// vars := mux.Vars(r) +// sn := vars["sn"] + +// device := a.deviceExists(sn, w) + +// var receiver usp_msg.Get + +// err := json.NewDecoder(r.Body).Decode(&receiver) +// if err != nil { +// log.Println(err) +// w.WriteHeader(http.StatusBadRequest) +// return +// } + +// msg := utils.NewGetMsg(receiver) +// a.uspCall(msg, sn, w, device) +// } + +// func (a *Api) deviceOperateMsg(w http.ResponseWriter, r *http.Request) { +// vars := mux.Vars(r) +// sn := vars["sn"] + +// device := a.deviceExists(sn, w) + +// var receiver usp_msg.Operate + +// err := json.NewDecoder(r.Body).Decode(&receiver) +// if err != nil { +// log.Println(err) +// w.WriteHeader(http.StatusBadRequest) +// return +// } + +// msg := utils.NewOperateMsg(receiver) +// a.uspCall(msg, sn, w, device) +// } + +// func (a *Api) deviceDeleteMsg(w http.ResponseWriter, r *http.Request) { +// vars := mux.Vars(r) +// sn := vars["sn"] +// device := a.deviceExists(sn, w) + +// var receiver usp_msg.Delete + +// err := json.NewDecoder(r.Body).Decode(&receiver) +// if err != nil { +// log.Println(err) +// w.WriteHeader(http.StatusBadRequest) +// return +// } + +// msg := utils.NewDelMsg(receiver) +// a.uspCall(msg, sn, w, device) + +// //a.Broker.Request(tr369Message, usp_msg.Header_GET, "oktopus/v1/agent/"+sn, "oktopus/v1/get/"+sn) + +// } + +// func (a *Api) deviceUpdateMsg(w http.ResponseWriter, r *http.Request) { +// vars := mux.Vars(r) +// sn := vars["sn"] +// device := a.deviceExists(sn, w) + +// var receiver usp_msg.Set + +// err := json.NewDecoder(r.Body).Decode(&receiver) +// if err != nil { +// log.Println(err) +// w.WriteHeader(http.StatusBadRequest) +// return +// } + +// msg := utils.NewSetMsg(receiver) +// a.uspCall(msg, sn, w, device) +// } + +// // TODO: react this function, return err and deal with it in the caller, remove header superfluos +// func (a *Api) deviceExists(sn string, w http.ResponseWriter) db.Device { +// device, err := a.Db.RetrieveDevice(sn) +// if err != nil { +// if err == mongo.ErrNoDocuments { +// w.WriteHeader(http.StatusBadRequest) +// json.NewEncoder(w).Encode("No device with serial number " + sn + " was found") +// } +// w.WriteHeader(http.StatusInternalServerError) +// return device +// } +// return device +// } + +// func (a *Api) deviceGetParameterInstances(w http.ResponseWriter, r *http.Request) { +// vars := mux.Vars(r) +// sn := vars["sn"] +// device := a.deviceExists(sn, w) + +// var receiver usp_msg.GetInstances + +// err := json.NewDecoder(r.Body).Decode(&receiver) +// if err != nil { +// log.Println(err) +// w.WriteHeader(http.StatusBadRequest) +// return +// } + +// msg := utils.NewGetParametersInstancesMsg(receiver) +// a.uspCall(msg, sn, w, device) +// } diff --git a/backend/services/controller/internal/bridge/bridge.go b/backend/services/controller/internal/bridge/bridge.go index 6ebe091..d3f35ce 100644 --- a/backend/services/controller/internal/bridge/bridge.go +++ b/backend/services/controller/internal/bridge/bridge.go @@ -2,18 +2,20 @@ package bridge import ( "encoding/json" + "errors" "log" "net/http" + "time" + "github.com/leandrofars/oktopus/internal/entity" local "github.com/leandrofars/oktopus/internal/nats" "github.com/leandrofars/oktopus/internal/utils" "github.com/nats-io/nats.go" "github.com/nats-io/nats.go/jetstream" ) -type DataType interface { - []map[string]interface{} -} +var errNatsMsgReceivedWithErrorData = errors.New("Nats message received with error data") +var errNatsRequestTimeout = errors.New("Nats message response timeout") type Bridge struct { js jetstream.JetStream @@ -27,29 +29,141 @@ func NewBridge(js jetstream.JetStream, nc *nats.Conn) Bridge { } } -func NatsReq[T DataType]( +func NatsUspInteraction( + subSubj, pubSubj string, + body []byte, + w http.ResponseWriter, + nc *nats.Conn, +) ([]byte, error) { + + var answer []byte + + ch := make(chan *nats.Msg, 64) + done := make(chan error) + _, err := nc.ChanSubscribe(subSubj, ch) + if err != nil { + log.Println(err) + w.WriteHeader(http.StatusInternalServerError) + w.Write(utils.Marshall("Error to communicate with nats: " + err.Error())) + return []byte{}, err + } + + go func() { + select { + case msg := <-ch: + log.Println("Received an usp message response") + answer = msg.Data + done <- nil + case <-time.After(local.NATS_REQUEST_TIMEOUT): + log.Println("usp message response timeout") + w.WriteHeader(http.StatusGatewayTimeout) + w.Write(utils.Marshall("usp message response timeout")) + done <- errNatsRequestTimeout + } + }() + + err = nc.Publish(pubSubj, body) + if err != nil { + log.Println(err) + w.WriteHeader(http.StatusInternalServerError) + w.Write(utils.Marshall("Error to communicate with nats: " + err.Error())) + return nil, err + } + + err = <-done + + return answer, err +} + +func NatsCustomReq[T entity.DataType]( + subSubj, pubSubj string, + body []byte, + w http.ResponseWriter, + nc *nats.Conn, +) (interface{}, error) { + + var answer T + + ch := make(chan *nats.Msg, 64) + done := make(chan string) + _, err := nc.ChanSubscribe(subSubj, ch) + if err != nil { + log.Println(err) + w.WriteHeader(http.StatusInternalServerError) + w.Write(utils.Marshall("Error to communicate with nats: " + err.Error())) + return nil, err + } + + select { + case msg := <-ch: + log.Println("Received an api message response") + err = json.Unmarshal(msg.Data, &answer) + if err != nil { + log.Println(err) + w.WriteHeader(http.StatusInternalServerError) + w.Write(msg.Data) + return nil, err + } + done <- "done" + case <-time.After(local.NATS_REQUEST_TIMEOUT): + log.Println("Api message response timeout") + done <- "timeout" + } + + err = nc.Publish(pubSubj, body) + if err != nil { + log.Println(err) + w.WriteHeader(http.StatusInternalServerError) + w.Write(utils.Marshall("Error to communicate with nats: " + err.Error())) + return nil, err + } + + <-done + + return nil, nil +} + +/* +- makes a request to nats topic + +- handle nats communication + +- verify if received data is of error type +*/ +func NatsReq[T entity.DataType]( subj string, body []byte, w http.ResponseWriter, nc *nats.Conn, -) (T, error) { +) (*entity.MsgAnswer[T], error) { - var answer T + var answer *entity.MsgAnswer[T] msg, err := nc.Request(subj, body, local.NATS_REQUEST_TIMEOUT) if err != nil { log.Println(err) - w.Write(utils.Marshall("Error to communicate with nats: " + err.Error())) w.WriteHeader(http.StatusInternalServerError) + w.Write(utils.Marshall("Error to communicate with nats: " + err.Error())) return nil, err } err = json.Unmarshal(msg.Data, &answer) if err != nil { - log.Println(err) - w.Write(msg.Data) - w.WriteHeader(http.StatusInternalServerError) - return nil, err + + var errMsg *entity.MsgAnswer[*string] + err = json.Unmarshal(msg.Data, &errMsg) + + if err != nil { + log.Println("Bad answer message formatting: ", err.Error()) + w.WriteHeader(http.StatusInternalServerError) + w.Write(msg.Data) + return nil, err + } + + log.Printf("Error message received, msg: %s, code: %d", *errMsg.Msg, errMsg.Code) + w.WriteHeader(errMsg.Code) + w.Write(utils.Marshall(*errMsg.Msg)) + return nil, errNatsMsgReceivedWithErrorData } return answer, nil diff --git a/backend/services/controller/internal/entity/device.go b/backend/services/controller/internal/entity/device.go new file mode 100644 index 0000000..f7812c5 --- /dev/null +++ b/backend/services/controller/internal/entity/device.go @@ -0,0 +1,14 @@ +package entity + +type Device struct { + SN string + Model string + Customer string + Vendor string + Version string + ProductClass string + Status Status + Mqtt Status + Stomp Status + Websockets Status +} diff --git a/backend/services/controller/internal/entity/msg.go b/backend/services/controller/internal/entity/msg.go new file mode 100644 index 0000000..9ba8498 --- /dev/null +++ b/backend/services/controller/internal/entity/msg.go @@ -0,0 +1,10 @@ +package entity + +type DataType interface { + []map[string]interface{} | *string | Device +} + +type MsgAnswer[T DataType] struct { + Code int + Msg T +} diff --git a/backend/services/controller/internal/entity/status.go b/backend/services/controller/internal/entity/status.go new file mode 100644 index 0000000..9e27033 --- /dev/null +++ b/backend/services/controller/internal/entity/status.go @@ -0,0 +1,9 @@ +package entity + +type Status uint8 + +const ( + Offline Status = iota + Associating + Online +) diff --git a/backend/services/controller/internal/entity/usp.go b/backend/services/controller/internal/entity/usp.go new file mode 100644 index 0000000..a642ecd --- /dev/null +++ b/backend/services/controller/internal/entity/usp.go @@ -0,0 +1,7 @@ +package entity + +import "github.com/leandrofars/oktopus/internal/usp/usp_msg" + +type UspType interface { + usp_msg.GetSupportedDM +} \ No newline at end of file diff --git a/backend/services/controller/internal/nats/nats.go b/backend/services/controller/internal/nats/nats.go index 24ad53b..c3dbdd6 100644 --- a/backend/services/controller/internal/nats/nats.go +++ b/backend/services/controller/internal/nats/nats.go @@ -10,8 +10,11 @@ import ( ) const ( - NATS_ACCOUNT_SUBJ_PREFIX = "account-manager.v1." - NATS_REQUEST_TIMEOUT = 5 * time.Second + NATS_ACCOUNT_SUBJ_PREFIX = "account-manager.v1." + NATS_REQUEST_TIMEOUT = 5 * time.Second + NATS_MQTT_SUBJECT_PREFIX = "mqtt.usp.v1." + NATS_MQTT_ADAPTER_SUBJECT_PREFIX = "mqtt-adapter.usp.v1." + NATS_ADAPTER_SUBJECT = "adapter.usp.v1." ) func StartNatsClient(c config.Nats) (jetstream.JetStream, *nats.Conn) { diff --git a/backend/services/controller/internal/utils/utils.go b/backend/services/controller/internal/utils/utils.go index 72a4679..c86d2f1 100644 --- a/backend/services/controller/internal/utils/utils.go +++ b/backend/services/controller/internal/utils/utils.go @@ -2,6 +2,7 @@ package utils import ( "encoding/json" + "io" "log" ) @@ -13,3 +14,17 @@ func Marshall(data any) []byte { } return fmtData } + +func MarshallEncoder(data any, w io.Writer) { + err := json.NewEncoder(w).Encode(data) + if err != nil { + log.Printf("Error to encode message into json: %q", err) + } +} + +func MarshallDecoder(data any, r io.Reader) { + err := json.NewDecoder(r).Decode(data) + if err != nil { + log.Printf("Error to decode message into json: %q", err) + } +} diff --git a/backend/services/controller/run.sh b/backend/services/controller/run.sh deleted file mode 100644 index bb0a11c..0000000 --- a/backend/services/controller/run.sh +++ /dev/null @@ -1 +0,0 @@ -go run cmd/oktopus/main.go -u root -P root -mongo mongodb://172.16.238.3:27017/ \ No newline at end of file diff --git a/backend/services/mtp/adapter/internal/db/db.go b/backend/services/mtp/adapter/internal/db/db.go index 3c51550..4a70408 100644 --- a/backend/services/mtp/adapter/internal/db/db.go +++ b/backend/services/mtp/adapter/internal/db/db.go @@ -37,6 +37,7 @@ func NewDatabase(ctx context.Context, mongoUri string) Database { devices := client.Database("adapter").Collection("devices") createIndexes(ctx, devices) + resetDeviceStatus(ctx, devices) db.devices = devices db.ctx = ctx @@ -45,6 +46,22 @@ func NewDatabase(ctx context.Context, mongoUri string) Database { return db } +func resetDeviceStatus(ctx context.Context, devices *mongo.Collection) { + _, err := devices.UpdateMany(ctx, bson.D{{}}, bson.D{ + { + "$set", bson.D{ + {"mqtt", 0}, + {"stomp", 0}, + {"websockets", 0}, + {"status", 0}, + }, + }, + }) + if err != nil { + log.Fatalln("ERROR to reset device status in database:", err) + } +} + func createIndexes(ctx context.Context, devices *mongo.Collection) { indexField := bson.M{"sn": 1} _, err := devices.Indexes().CreateOne(ctx, mongo.IndexModel{ diff --git a/backend/services/mtp/adapter/internal/reqs/reqs.go b/backend/services/mtp/adapter/internal/reqs/reqs.go index 887d4ae..5d6e028 100644 --- a/backend/services/mtp/adapter/internal/reqs/reqs.go +++ b/backend/services/mtp/adapter/internal/reqs/reqs.go @@ -28,8 +28,7 @@ func StartRequestsListener(ctx context.Context, nc *nats.Conn, db db.Database) { deviceInfo, err := db.RetrieveDevice(device) if deviceInfo.SN != "" { - body, _ := json.Marshal(deviceInfo) - respondMsg(msg.Respond, 200, body) + respondMsg(msg.Respond, 200, deviceInfo) } else { if err != nil { if err == mongo.ErrNoDocuments { diff --git a/backend/services/mtp/mqtt-adapter/internal/bridge/bridge.go b/backend/services/mtp/mqtt-adapter/internal/bridge/bridge.go index 0569f18..2f7ef4d 100644 --- a/backend/services/mtp/mqtt-adapter/internal/bridge/bridge.go +++ b/backend/services/mtp/mqtt-adapter/internal/bridge/bridge.go @@ -101,6 +101,21 @@ func (b *Bridge) natsMessageHandler(cm *autopaho.ConnectionManager) { ResponseTopic: "oktopus/usp/v1/controller/" + getDeviceFromSubject(m.Subject), }, }) + + }) + + b.Sub(NATS_MQTT_ADAPTER_SUBJECT_PREFIX+"api", func(m *nats.Msg) { + + log.Printf("Received message on api subject") + cm.Publish(b.Ctx, &paho.Publish{ + QoS: byte(b.Mqtt.Qos), + Topic: MQTT_TOPIC_PREFIX + "v1/agent/" + getDeviceFromSubject(m.Subject), + Payload: m.Data, + Properties: &paho.PublishProperties{ + ResponseTopic: "oktopus/usp/v1/api/" + getDeviceFromSubject(m.Subject), + }, + }) + }) }