From 46743ab279b27c3f40e84969fb655b66e4dff0da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Leandro=20Ant=C3=B4nio=20Farias=20Machado?= Date: Wed, 26 Apr 2023 01:19:18 -0300 Subject: [PATCH] feat: get route for rest api + async queue for translations between sync http and mqtt uncoupled protocol close #31 --- .../services/controller/cmd/oktopus/main.go | 8 +- backend/services/controller/go.mod | 1 + backend/services/controller/go.sum | 2 + .../services/controller/internal/api/api.go | 82 +++++++++++++++++-- .../services/controller/internal/db/device.go | 10 +++ .../services/controller/internal/mqtt/mqtt.go | 67 ++++++++++----- .../controller/internal/mqtt/mqtt_api.go | 52 ++++++++++++ .../services/controller/internal/mtp/mtp.go | 5 ++ .../controller/internal/utils/utils.go | 35 ++++++++ backend/services/mochi/cmd/auth.json | 6 +- 10 files changed, 240 insertions(+), 28 deletions(-) create mode 100644 backend/services/controller/internal/mqtt/mqtt_api.go diff --git a/backend/services/controller/cmd/oktopus/main.go b/backend/services/controller/cmd/oktopus/main.go index f7ee7b3..b6c6d14 100755 --- a/backend/services/controller/cmd/oktopus/main.go +++ b/backend/services/controller/cmd/oktopus/main.go @@ -7,9 +7,11 @@ import ( "flag" "github.com/leandrofars/oktopus/internal/api" "github.com/leandrofars/oktopus/internal/db" + usp_msg "github.com/leandrofars/oktopus/internal/usp_message" "log" "os" "os/signal" + "sync" "syscall" "github.com/leandrofars/oktopus/internal/mqtt" @@ -54,6 +56,8 @@ func main() { */ ctx, cancel := context.WithCancel(context.Background()) database := db.NewDatabase(ctx, *flAddrDB) + apiMsgQueue := make(map[string](chan usp_msg.Msg)) + var m sync.Mutex /* If you want to use another message protocol just make it implement Broker interface. */ @@ -70,10 +74,12 @@ func main() { DisconnectTopic: *flDisconTopic, CA: *flTlsCert, DB: database, + MsgQueue: apiMsgQueue, + QMutex: &m, } mtp.MtpService(&mqttClient, done) - a := api.NewApi(*flApiPort, database) + a := api.NewApi(*flApiPort, database, &mqttClient, apiMsgQueue, &m) api.StartApi(a) <-done diff --git a/backend/services/controller/go.mod b/backend/services/controller/go.mod index 8457d1a..5507d84 100755 --- a/backend/services/controller/go.mod +++ b/backend/services/controller/go.mod @@ -10,6 +10,7 @@ require ( require ( github.com/golang/snappy v0.0.1 // indirect + github.com/google/uuid v1.3.0 // indirect github.com/gorilla/mux v1.8.0 // indirect github.com/klauspost/compress v1.13.6 // indirect github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect diff --git a/backend/services/controller/go.sum b/backend/services/controller/go.sum index 19993ba..5d46421 100644 --- a/backend/services/controller/go.sum +++ b/backend/services/controller/go.sum @@ -9,6 +9,8 @@ github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEW github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= diff --git a/backend/services/controller/internal/api/api.go b/backend/services/controller/internal/api/api.go index 28969b4..fc19bb0 100644 --- a/backend/services/controller/internal/api/api.go +++ b/backend/services/controller/internal/api/api.go @@ -4,20 +4,32 @@ import ( "encoding/json" "github.com/gorilla/mux" "github.com/leandrofars/oktopus/internal/db" + "github.com/leandrofars/oktopus/internal/mtp" + usp_msg "github.com/leandrofars/oktopus/internal/usp_message" + "github.com/leandrofars/oktopus/internal/utils" + "go.mongodb.org/mongo-driver/mongo" + "google.golang.org/protobuf/proto" "log" "net/http" + "sync" "time" ) type Api struct { - Port string - Db db.Database + Port string + Db db.Database + Broker mtp.Broker + MsgQueue map[string](chan usp_msg.Msg) + QMutex *sync.Mutex } -func NewApi(port string, db db.Database) Api { +func NewApi(port string, db db.Database, b mtp.Broker, msgQueue map[string](chan usp_msg.Msg), m *sync.Mutex) Api { return Api{ - Port: port, - Db: db, + Port: port, + Db: db, + Broker: b, + MsgQueue: msgQueue, + QMutex: m, } } @@ -28,7 +40,7 @@ func StartApi(a Api) { return }) r.HandleFunc("/devices", a.retrieveDevices) - //r.HandleFunc("/devices/{sn}", a.devicesMessaging) + r.HandleFunc("/devices/{sn}/get", a.deviceGetMsg) srv := &http.Server{ Addr: "0.0.0.0:" + a.Port, @@ -61,3 +73,61 @@ func (a *Api) retrieveDevices(w http.ResponseWriter, r *http.Request) { return } + +func (a *Api) deviceGetMsg(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + sn := vars["sn"] + _, err := a.Db.RetrieveDevice(sn) + if err != nil { + if err == mongo.ErrNoDocuments { + w.WriteHeader(http.StatusBadRequest) + json.NewEncoder(w).Encode("No device with serial number " + sn + " was found") + return + } + w.WriteHeader(http.StatusInternalServerError) + return + } + var receiver usp_msg.Get + //data := []byte(`{"param_paths": {'Device.DeviceInfo.'},"max_depth": 2}`) + //data := []byte("'opa'") + //var jsonBlob = []byte(`{ + // "param_paths": ["Device.DeviceInfo.","Device.ManagementServer."], + // "max_depth": 2 + //}`) + err = json.NewDecoder(r.Body).Decode(receiver) + if err != nil { + log.Println(err) + w.WriteHeader(http.StatusBadRequest) + return + } + + msg := utils.NewGetMsg(receiver) + encodedMsg, err := proto.Marshal(&msg) + if err != nil { + log.Println(err) + w.WriteHeader(http.StatusBadRequest) + return + } + + record := utils.NewUspRecord(encodedMsg, sn) + tr369Message, err := proto.Marshal(&record) + if err != nil { + log.Fatalln("Failed to encode tr369 record:", err) + } + + //a.Broker.Request(tr369Message, usp_msg.Header_GET, "oktopus/v1/agent/"+sn, "oktopus/v1/get/"+sn) + a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg) + a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn) + + select { + case msg := <-a.MsgQueue[msg.Header.MsgId]: + log.Printf("Received Msg") + json.NewEncoder(w).Encode(msg) + return + case <-time.After(time.Second * 5): + log.Printf("Request Timed Out") + w.WriteHeader(http.StatusGatewayTimeout) + json.NewEncoder(w).Encode("Request Timed Out") + return + } +} diff --git a/backend/services/controller/internal/db/device.go b/backend/services/controller/internal/db/device.go index 6731f46..3ba7bf4 100644 --- a/backend/services/controller/internal/db/device.go +++ b/backend/services/controller/internal/db/device.go @@ -46,6 +46,16 @@ func (d *Database) RetrieveDevices() ([]Device, error) { return results, nil } +func (d *Database) RetrieveDevice(sn string) (Device, error) { + var result Device + //TODO: filter devices by user ownership + err := d.devices.FindOne(d.ctx, bson.D{{"sn", sn}}, nil).Decode(&result) + if err != nil { + log.Println(err) + } + return result, err +} + func (d *Database) DeleteDevice() { } diff --git a/backend/services/controller/internal/mqtt/mqtt.go b/backend/services/controller/internal/mqtt/mqtt.go index c8e1cd0..b1b3fa0 100644 --- a/backend/services/controller/internal/mqtt/mqtt.go +++ b/backend/services/controller/internal/mqtt/mqtt.go @@ -30,6 +30,8 @@ type Mqtt struct { DisconnectTopic string CA string DB db.Database + MsgQueue map[string](chan usp_msg.Msg) + QMutex *sync.Mutex } var c *paho.Client @@ -40,8 +42,9 @@ func (m *Mqtt) Connect() { devices := make(chan *paho.Publish) controller := make(chan *paho.Publish) disconnect := make(chan *paho.Publish) - go m.messageHandler(devices, controller, disconnect) - clientConfig := m.startClient(devices, controller, disconnect) + apiMsg := make(chan *paho.Publish) + go m.messageHandler(devices, controller, disconnect, apiMsg) + clientConfig := m.startClient(devices, controller, disconnect, apiMsg) connParameters := startConnection(m.Id, m.User, m.Passwd) conn, err := clientConfig.Connect(m.Ctx, &connParameters) @@ -72,6 +75,7 @@ func (m *Mqtt) Subscribe() { m.SubTopic: {QoS: byte(m.QoS), NoLocal: true}, m.DevicesTopic: {QoS: byte(m.QoS), NoLocal: true}, m.DisconnectTopic: {QoS: byte(m.QoS), NoLocal: true}, + "oktopus/+/api/+": {QoS: byte(m.QoS), NoLocal: true}, }, }); err != nil { log.Fatalln(err) @@ -101,7 +105,7 @@ func (m *Mqtt) Publish(msg []byte, topic, respTopic string) { /* -------------------------------------------------------------------------- */ -func (m *Mqtt) startClient(devices, controller, disconnect chan *paho.Publish) *paho.Client { +func (m *Mqtt) startClient(devices, controller, disconnect, apiMsg chan *paho.Publish) *paho.Client { singleHandler := paho.NewSingleHandlerRouter(func(p *paho.Publish) { if p.Topic == m.DevicesTopic { devices <- p @@ -109,6 +113,8 @@ func (m *Mqtt) startClient(devices, controller, disconnect chan *paho.Publish) * controller <- p } else if p.Topic == m.DisconnectTopic { disconnect <- p + } else if strings.Contains(p.Topic, "api") { + apiMsg <- p } else { log.Println("No handler for topic: ", p.Topic) } @@ -217,7 +223,7 @@ func startConnection(id, user, pass string) paho.Connect { return connParameters } -func (m *Mqtt) messageHandler(devices, controller, disconnect chan *paho.Publish) { +func (m *Mqtt) messageHandler(devices, controller, disconnect, apiMsg chan *paho.Publish) { for { select { case d := <-devices: @@ -225,15 +231,43 @@ func (m *Mqtt) messageHandler(devices, controller, disconnect chan *paho.Publish log.Println("New device: ", payload) m.handleNewDevice(payload) case c := <-controller: - m.handleDevicesResponse(c.Payload) + topic := c.Topic + sn := strings.Split(topic, "/") + m.handleNewDevicesResponse(c.Payload, sn[3]) case dis := <-disconnect: payload := string(dis.Payload) log.Println("Device disconnected: ", payload) m.handleDevicesDisconnect(payload) + case api := <-apiMsg: + log.Println("Handle api request") + m.handleApiRequest(api.Payload) } } } +func (m *Mqtt) handleApiRequest(api []byte) { + var record usp_record.Record + err := proto.Unmarshal(api, &record) + if err != nil { + log.Println(err) + } + + //TODO: verify record operation type + var msg usp_msg.Msg + err = proto.Unmarshal(record.GetNoSessionContext().Payload, &msg) + if err != nil { + log.Println(err) + } + + if _, ok := m.MsgQueue[msg.Header.MsgId]; ok { + //m.QMutex.Lock() + m.MsgQueue[msg.Header.MsgId] <- msg + //m.QMutex.Unlock() + } else { + log.Printf("Message answer to request %s arrived too late", msg.Header.MsgId) + } +} + func (m *Mqtt) handleNewDevice(deviceMac string) { payload := usp_msg.Msg{ Header: &usp_msg.Header{ @@ -259,26 +293,16 @@ func (m *Mqtt) handleNewDevice(deviceMac string) { }, } teste, _ := proto.Marshal(&payload) - record := usp_record.Record{ - Version: "0.1", - ToId: deviceMac, - FromId: "leleco", - PayloadSecurity: usp_record.Record_PLAINTEXT, - RecordType: &usp_record.Record_NoSessionContext{ - NoSessionContext: &usp_record.NoSessionContextRecord{ - Payload: teste, - }, - }, - } + record := utils.NewUspRecord(teste, deviceMac) tr369Message, err := proto.Marshal(&record) if err != nil { - log.Fatalln("Failed to encode address book:", err) + log.Fatalln("Failed to encode tr369 record:", err) } m.Publish(tr369Message, "oktopus/v1/agent/"+deviceMac, "oktopus/v1/controller/"+deviceMac) } -func (m *Mqtt) handleDevicesResponse(p []byte) { +func (m *Mqtt) handleNewDevicesResponse(p []byte, sn string) { var record usp_record.Record var message usp_msg.Msg @@ -297,7 +321,7 @@ func (m *Mqtt) handleDevicesResponse(p []byte) { device.Vendor = msg.ReqPathResults[0].ResolvedPathResults[0].ResultParams["Manufacturer"] device.Model = msg.ReqPathResults[1].ResolvedPathResults[0].ResultParams["ModelName"] device.Version = msg.ReqPathResults[2].ResolvedPathResults[0].ResultParams["SoftwareVersion"] - device.SN = msg.ReqPathResults[3].ResolvedPathResults[0].ResultParams["SerialNumber"] + device.SN = sn device.Status = utils.Online err = m.DB.CreateDevice(device) @@ -313,3 +337,8 @@ func (m *Mqtt) handleDevicesDisconnect(p string) { log.Fatal(err) } } + +/* +func (m *Mqtt) Request(msg []byte, msgType usp_msg.Header_MsgType, pubTopic string, respTopic string) { + m.Publish(msg, pubTopic, respTopic) +}*/ diff --git a/backend/services/controller/internal/mqtt/mqtt_api.go b/backend/services/controller/internal/mqtt/mqtt_api.go new file mode 100644 index 0000000..cbe686d --- /dev/null +++ b/backend/services/controller/internal/mqtt/mqtt_api.go @@ -0,0 +1,52 @@ +package mqtt + +// +//import ( +// usp_msg "github.com/leandrofars/oktopus/internal/usp_message" +// "github.com/leandrofars/oktopus/internal/usp_record" +// "google.golang.org/protobuf/proto" +// "log" +//) +// +//func SendGetMsg(sn string) { +// payload := usp_msg.Msg{ +// Header: &usp_msg.Header{ +// MsgId: "uniqueIdentifierForThismessage", +// MsgType: usp_msg.Header_GET, +// }, +// Body: &usp_msg.Body{ +// MsgBody: &usp_msg.Body_Request{ +// Request: &usp_msg.Request{ +// ReqType: &usp_msg.Request_Get{ +// Get: &usp_msg.Get{ +// ParamPaths: []string{ +// "Device.DeviceInfo.Manufacturer", +// "Device.DeviceInfo.ModelName", +// "Device.DeviceInfo.SoftwareVersion", +// }, +// MaxDepth: 1, +// }, +// }, +// }, +// }, +// }, +// } +// teste, _ := proto.Marshal(&payload) +// record := usp_record.Record{ +// Version: "0.1", +// ToId: sn, +// FromId: "leleco", +// PayloadSecurity: usp_record.Record_PLAINTEXT, +// RecordType: &usp_record.Record_NoSessionContext{ +// NoSessionContext: &usp_record.NoSessionContextRecord{ +// Payload: teste, +// }, +// }, +// } +// +// tr369Message, err := proto.Marshal(&record) +// if err != nil { +// log.Fatalln("Failed to encode address book:", err) +// } +// m.Publish(tr369Message, "oktopus/v1/agent/"+deviceMac, "oktopus/v1/controller/"+deviceMac) +//} diff --git a/backend/services/controller/internal/mtp/mtp.go b/backend/services/controller/internal/mtp/mtp.go index 5ba3583..c9ef304 100644 --- a/backend/services/controller/internal/mtp/mtp.go +++ b/backend/services/controller/internal/mtp/mtp.go @@ -15,6 +15,11 @@ type Broker interface { Disconnect() Publish(msg []byte, topic, respTopic string) Subscribe() + /* + At request method we're able to send a message to a topic + and wait until we have a response (in the same topic). + */ + //Request(msg []byte, msgType usp_msg.Header_MsgType, pubTopic string, subTopic string) } // Not used, since we are using a broker solution with MQTT. diff --git a/backend/services/controller/internal/utils/utils.go b/backend/services/controller/internal/utils/utils.go index db23301..c636c4b 100644 --- a/backend/services/controller/internal/utils/utils.go +++ b/backend/services/controller/internal/utils/utils.go @@ -1,6 +1,9 @@ package utils import ( + "github.com/google/uuid" + usp_msg "github.com/leandrofars/oktopus/internal/usp_message" + "github.com/leandrofars/oktopus/internal/usp_record" "net" ) @@ -26,3 +29,35 @@ func GetMacAddr() ([]string, error) { } return as, nil } + +func NewUspRecord(p []byte, toId string) usp_record.Record { + return usp_record.Record{ + Version: "0.1", + ToId: toId, + FromId: "leleco", + PayloadSecurity: usp_record.Record_PLAINTEXT, + RecordType: &usp_record.Record_NoSessionContext{ + NoSessionContext: &usp_record.NoSessionContextRecord{ + Payload: p, + }, + }, + } +} + +func NewGetMsg(getStuff usp_msg.Get) usp_msg.Msg { + return usp_msg.Msg{ + Header: &usp_msg.Header{ + MsgId: uuid.NewString(), + MsgType: usp_msg.Header_GET, + }, + Body: &usp_msg.Body{ + MsgBody: &usp_msg.Body_Request{ + Request: &usp_msg.Request{ + ReqType: &usp_msg.Request_Get{ + Get: &getStuff, + }, + }, + }, + }, + } +} diff --git a/backend/services/mochi/cmd/auth.json b/backend/services/mochi/cmd/auth.json index 65c72a9..f05253a 100644 --- a/backend/services/mochi/cmd/auth.json +++ b/backend/services/mochi/cmd/auth.json @@ -32,14 +32,16 @@ "username": "leandro", "filters": { "oktopus/+/agent/+": 1, - "oktopus/+/controller/+": 2 + "oktopus/+/controller/+": 2, + "oktopus/+/get/+": 2 } }, { "username": "steve", "filters": { "oktopus/+/agent/+": 1, - "oktopus/+/controller/+": 2 + "oktopus/+/controller/+": 2, + "oktopus/+/get/+": 2 } }, {