chore(device status): change topic communication to allow retained messages

This commit is contained in:
Leandro Antônio Farias Machado 2023-07-02 15:57:13 -03:00
parent 5f64795225
commit 9924ad50c6
3 changed files with 16 additions and 14 deletions

View File

@ -45,8 +45,8 @@ func main() {
log.Println("Starting Oktopus Project TR-369 Controller Version:", VERSION)
// fl_endpointId := flag.String("endpoint_id", "proto::oktopus-controller", "Defines the enpoint id the Agent must trust on.")
flDevicesTopic := flag.String("d", "oktopus/devices", "That's the topic mqtt broker end new devices info.")
flDisconTopic := flag.String("dis", "oktopus/disconnect", "It's where disconnected IoTs are known.")
flDevicesTopic := flag.String("d", "oktopus/+/devices/+", "That's the topic mqtt broker end new devices info.")
flDisconTopic := flag.String("dis", "oktopus/+/disconnect/+", "It's where disconnected IoTs are known.")
flSubTopic := flag.String("sub", "oktopus/+/controller/+", "That's the topic agent must publish to, and the controller keeps on listening.")
flBrokerAddr := flag.String("a", "localhost", "Mqtt broker adrress")
flBrokerPort := flag.String("p", "1883", "Mqtt broker port")

View File

@ -126,11 +126,11 @@ func (m *Mqtt) Publish(msg []byte, topic, respTopic string) {
func (m *Mqtt) buildClientConfig(devices, controller, disconnect, apiMsg chan *paho.Publish) *paho.ClientConfig {
log.Println("Starting new mqtt client")
singleHandler := paho.NewSingleHandlerRouter(func(p *paho.Publish) {
if p.Topic == m.DevicesTopic {
if strings.Contains(p.Topic, "devices") {
devices <- p
} else if strings.Contains(p.Topic, "controller") {
controller <- p
} else if p.Topic == m.DisconnectTopic {
} else if strings.Contains(p.Topic, "disconnect") {
disconnect <- p
} else if strings.Contains(p.Topic, "api") {
apiMsg <- p
@ -173,17 +173,19 @@ func (m *Mqtt) messageHandler(devices, controller, disconnect, apiMsg chan *paho
for {
select {
case d := <-devices:
payload := string(d.Payload)
log.Println("New device: ", payload)
m.handleNewDevice(payload)
paths := strings.Split(d.Topic, "/")
device := paths[len(paths)-1]
log.Println("New device: ", device)
m.handleNewDevice(device)
case c := <-controller:
topic := c.Topic
sn := strings.Split(topic, "/")
m.handleNewDevicesResponse(c.Payload, sn[3])
case dis := <-disconnect:
payload := string(dis.Payload)
log.Println("Device disconnected: ", payload)
m.handleDevicesDisconnect(payload)
paths := strings.Split(dis.Topic, "/")
device := paths[len(paths)-1]
log.Println("Device disconnected: ", device)
m.handleDevicesDisconnect(device)
case api := <-apiMsg:
log.Println("Handle api request")
m.handleApiRequest(api.Payload)

View File

@ -252,7 +252,7 @@ func (h *MyHook) OnDisconnect(cl *mqtt.Client, err error, expire bool) {
}
if clUser != "" {
err := server.Publish("oktopus/disconnect", []byte(clUser), false, 1)
err := server.Publish("oktopus/v1/disconnect/"+clUser, []byte(""), false, 1)
if err != nil {
log.Println("server publish error: ", err)
}
@ -270,11 +270,11 @@ func (h *MyHook) OnSubscribed(cl *mqtt.Client, pk packets.Packet, reasonCodes []
if clUser != "" {
cl.Properties.Will = mqtt.Will{
Qos: 1,
Payload: []byte(clUser),
Qos: 1,
TopicName: "oktopus/v1/disconnect/" + clUser,
}
log.Println("new device:", clUser)
err := server.Publish("oktopus/devices", []byte(clUser), false, 1)
err := server.Publish("oktopus/v1/devices/"+clUser, []byte(""), false, 1)
if err != nil {
log.Println("server publish error: ", err)
}