From 9924ad50c68bbccaac97f81e37d922af78dd8d20 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Leandro=20Ant=C3=B4nio=20Farias=20Machado?= Date: Sun, 2 Jul 2023 15:57:13 -0300 Subject: [PATCH] chore(device status): change topic communication to allow retained messages --- .../services/controller/cmd/oktopus/main.go | 4 ++-- .../services/controller/internal/mqtt/mqtt.go | 18 ++++++++++-------- backend/services/mochi/cmd/main.go | 8 ++++---- 3 files changed, 16 insertions(+), 14 deletions(-) diff --git a/backend/services/controller/cmd/oktopus/main.go b/backend/services/controller/cmd/oktopus/main.go index 81ceec3..6c38c42 100755 --- a/backend/services/controller/cmd/oktopus/main.go +++ b/backend/services/controller/cmd/oktopus/main.go @@ -45,8 +45,8 @@ func main() { log.Println("Starting Oktopus Project TR-369 Controller Version:", VERSION) // fl_endpointId := flag.String("endpoint_id", "proto::oktopus-controller", "Defines the enpoint id the Agent must trust on.") - flDevicesTopic := flag.String("d", "oktopus/devices", "That's the topic mqtt broker end new devices info.") - flDisconTopic := flag.String("dis", "oktopus/disconnect", "It's where disconnected IoTs are known.") + flDevicesTopic := flag.String("d", "oktopus/+/devices/+", "That's the topic mqtt broker end new devices info.") + flDisconTopic := flag.String("dis", "oktopus/+/disconnect/+", "It's where disconnected IoTs are known.") flSubTopic := flag.String("sub", "oktopus/+/controller/+", "That's the topic agent must publish to, and the controller keeps on listening.") flBrokerAddr := flag.String("a", "localhost", "Mqtt broker adrress") flBrokerPort := flag.String("p", "1883", "Mqtt broker port") diff --git a/backend/services/controller/internal/mqtt/mqtt.go b/backend/services/controller/internal/mqtt/mqtt.go index 9dd59f9..8f544db 100644 --- a/backend/services/controller/internal/mqtt/mqtt.go +++ b/backend/services/controller/internal/mqtt/mqtt.go @@ -126,11 +126,11 @@ func (m *Mqtt) Publish(msg []byte, topic, respTopic string) { func (m *Mqtt) buildClientConfig(devices, controller, disconnect, apiMsg chan *paho.Publish) *paho.ClientConfig { log.Println("Starting new mqtt client") singleHandler := paho.NewSingleHandlerRouter(func(p *paho.Publish) { - if p.Topic == m.DevicesTopic { + if strings.Contains(p.Topic, "devices") { devices <- p } else if strings.Contains(p.Topic, "controller") { controller <- p - } else if p.Topic == m.DisconnectTopic { + } else if strings.Contains(p.Topic, "disconnect") { disconnect <- p } else if strings.Contains(p.Topic, "api") { apiMsg <- p @@ -173,17 +173,19 @@ func (m *Mqtt) messageHandler(devices, controller, disconnect, apiMsg chan *paho for { select { case d := <-devices: - payload := string(d.Payload) - log.Println("New device: ", payload) - m.handleNewDevice(payload) + paths := strings.Split(d.Topic, "/") + device := paths[len(paths)-1] + log.Println("New device: ", device) + m.handleNewDevice(device) case c := <-controller: topic := c.Topic sn := strings.Split(topic, "/") m.handleNewDevicesResponse(c.Payload, sn[3]) case dis := <-disconnect: - payload := string(dis.Payload) - log.Println("Device disconnected: ", payload) - m.handleDevicesDisconnect(payload) + paths := strings.Split(dis.Topic, "/") + device := paths[len(paths)-1] + log.Println("Device disconnected: ", device) + m.handleDevicesDisconnect(device) case api := <-apiMsg: log.Println("Handle api request") m.handleApiRequest(api.Payload) diff --git a/backend/services/mochi/cmd/main.go b/backend/services/mochi/cmd/main.go index a3f3534..2eb9a7d 100644 --- a/backend/services/mochi/cmd/main.go +++ b/backend/services/mochi/cmd/main.go @@ -252,7 +252,7 @@ func (h *MyHook) OnDisconnect(cl *mqtt.Client, err error, expire bool) { } if clUser != "" { - err := server.Publish("oktopus/disconnect", []byte(clUser), false, 1) + err := server.Publish("oktopus/v1/disconnect/"+clUser, []byte(""), false, 1) if err != nil { log.Println("server publish error: ", err) } @@ -270,11 +270,11 @@ func (h *MyHook) OnSubscribed(cl *mqtt.Client, pk packets.Packet, reasonCodes [] if clUser != "" { cl.Properties.Will = mqtt.Will{ - Qos: 1, - Payload: []byte(clUser), + Qos: 1, + TopicName: "oktopus/v1/disconnect/" + clUser, } log.Println("new device:", clUser) - err := server.Publish("oktopus/devices", []byte(clUser), false, 1) + err := server.Publish("oktopus/v1/devices/"+clUser, []byte(""), false, 1) if err != nil { log.Println("server publish error: ", err) }