chore(device status): flow of messages and retention change

This commit is contained in:
Leandro Antônio Farias Machado 2023-07-03 20:11:51 -03:00
parent 9924ad50c6
commit b2c102fcf3
5 changed files with 85 additions and 70 deletions

View File

@ -45,8 +45,7 @@ func main() {
log.Println("Starting Oktopus Project TR-369 Controller Version:", VERSION) log.Println("Starting Oktopus Project TR-369 Controller Version:", VERSION)
// fl_endpointId := flag.String("endpoint_id", "proto::oktopus-controller", "Defines the enpoint id the Agent must trust on.") // fl_endpointId := flag.String("endpoint_id", "proto::oktopus-controller", "Defines the enpoint id the Agent must trust on.")
flDevicesTopic := flag.String("d", "oktopus/+/devices/+", "That's the topic mqtt broker end new devices info.") flDevicesTopic := flag.String("d", "oktopus/+/status/+", "That's the topic mqtt broker end new devices info.")
flDisconTopic := flag.String("dis", "oktopus/+/disconnect/+", "It's where disconnected IoTs are known.")
flSubTopic := flag.String("sub", "oktopus/+/controller/+", "That's the topic agent must publish to, and the controller keeps on listening.") flSubTopic := flag.String("sub", "oktopus/+/controller/+", "That's the topic agent must publish to, and the controller keeps on listening.")
flBrokerAddr := flag.String("a", "localhost", "Mqtt broker adrress") flBrokerAddr := flag.String("a", "localhost", "Mqtt broker adrress")
flBrokerPort := flag.String("p", "1883", "Mqtt broker port") flBrokerPort := flag.String("p", "1883", "Mqtt broker port")
@ -54,7 +53,7 @@ func main() {
flBrokerUsername := flag.String("u", "", "Mqtt broker username") flBrokerUsername := flag.String("u", "", "Mqtt broker username")
flBrokerPassword := flag.String("P", "", "Mqtt broker password") flBrokerPassword := flag.String("P", "", "Mqtt broker password")
flBrokerClientId := flag.String("i", "", "A clientid for the Mqtt connection") flBrokerClientId := flag.String("i", "", "A clientid for the Mqtt connection")
flBrokerQos := flag.Int("q", 2, "Quality of service of mqtt messages delivery") flBrokerQos := flag.Int("q", 0, "Quality of service of mqtt messages delivery")
flAddrDB := flag.String("mongo", "mongodb://localhost:27017/", "MongoDB URI") flAddrDB := flag.String("mongo", "mongodb://localhost:27017/", "MongoDB URI")
flApiPort := flag.String("ap", "8000", "Rest api port") flApiPort := flag.String("ap", "8000", "Rest api port")
flHelp := flag.Bool("help", false, "Help") flHelp := flag.Bool("help", false, "Help")
@ -77,20 +76,19 @@ func main() {
If you want to use another message protocol just make it implement Broker interface. If you want to use another message protocol just make it implement Broker interface.
*/ */
mqttClient := mqtt.Mqtt{ mqttClient := mqtt.Mqtt{
Addr: *flBrokerAddr, Addr: *flBrokerAddr,
Port: *flBrokerPort, Port: *flBrokerPort,
Id: *flBrokerClientId, Id: *flBrokerClientId,
User: *flBrokerUsername, User: *flBrokerUsername,
Passwd: *flBrokerPassword, Passwd: *flBrokerPassword,
Ctx: ctx, Ctx: ctx,
QoS: *flBrokerQos, QoS: *flBrokerQos,
SubTopic: *flSubTopic, SubTopic: *flSubTopic,
DevicesTopic: *flDevicesTopic, DevicesTopic: *flDevicesTopic,
DisconnectTopic: *flDisconTopic, TLS: *flTlsCert,
TLS: *flTlsCert, DB: database,
DB: database, MsgQueue: apiMsgQueue,
MsgQueue: apiMsgQueue, QMutex: &m,
QMutex: &m,
} }
mtp.MtpService(&mqttClient, done) mtp.MtpService(&mqttClient, done)

View File

@ -171,7 +171,7 @@ func (a *Api) deviceFwUpdate(w http.ResponseWriter, r *http.Request) {
a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg) a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg)
log.Println("Sending Msg:", msg.Header.MsgId) log.Println("Sending Msg:", msg.Header.MsgId)
a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn) a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn, false)
var getMsgAnswer *usp_msg.GetResp var getMsgAnswer *usp_msg.GetResp
@ -231,7 +231,7 @@ func (a *Api) deviceFwUpdate(w http.ResponseWriter, r *http.Request) {
a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg) a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg)
log.Println("Sending Msg:", msg.Header.MsgId) log.Println("Sending Msg:", msg.Header.MsgId)
a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn) a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn, false)
select { select {
case msg := <-a.MsgQueue[msg.Header.MsgId]: case msg := <-a.MsgQueue[msg.Header.MsgId]:
@ -281,7 +281,7 @@ func (a *Api) deviceGetParameterInstances(w http.ResponseWriter, r *http.Request
//a.Broker.Request(tr369Message, usp_msg.Header_GET, "oktopus/v1/agent/"+sn, "oktopus/v1/get/"+sn) //a.Broker.Request(tr369Message, usp_msg.Header_GET, "oktopus/v1/agent/"+sn, "oktopus/v1/get/"+sn)
a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg) a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg)
log.Println("Sending Msg:", msg.Header.MsgId) log.Println("Sending Msg:", msg.Header.MsgId)
a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn) a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn, false)
select { select {
case msg := <-a.MsgQueue[msg.Header.MsgId]: case msg := <-a.MsgQueue[msg.Header.MsgId]:
@ -331,7 +331,7 @@ func (a *Api) deviceGetSupportedParametersMsg(w http.ResponseWriter, r *http.Req
//a.Broker.Request(tr369Message, usp_msg.Header_GET, "oktopus/v1/agent/"+sn, "oktopus/v1/get/"+sn) //a.Broker.Request(tr369Message, usp_msg.Header_GET, "oktopus/v1/agent/"+sn, "oktopus/v1/get/"+sn)
a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg) a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg)
log.Println("Sending Msg:", msg.Header.MsgId) log.Println("Sending Msg:", msg.Header.MsgId)
a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn) a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn, false)
select { select {
case msg := <-a.MsgQueue[msg.Header.MsgId]: case msg := <-a.MsgQueue[msg.Header.MsgId]:
@ -381,7 +381,7 @@ func (a *Api) deviceCreateMsg(w http.ResponseWriter, r *http.Request) {
//a.Broker.Request(tr369Message, usp_msg.Header_GET, "oktopus/v1/agent/"+sn, "oktopus/v1/get/"+sn) //a.Broker.Request(tr369Message, usp_msg.Header_GET, "oktopus/v1/agent/"+sn, "oktopus/v1/get/"+sn)
a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg) a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg)
log.Println("Sending Msg:", msg.Header.MsgId) log.Println("Sending Msg:", msg.Header.MsgId)
a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn) a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn, false)
select { select {
case msg := <-a.MsgQueue[msg.Header.MsgId]: case msg := <-a.MsgQueue[msg.Header.MsgId]:
@ -432,7 +432,7 @@ func (a *Api) deviceGetMsg(w http.ResponseWriter, r *http.Request) {
a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg) a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg)
log.Println("Sending Msg:", msg.Header.MsgId) log.Println("Sending Msg:", msg.Header.MsgId)
a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn) a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn, false)
select { select {
case msg := <-a.MsgQueue[msg.Header.MsgId]: case msg := <-a.MsgQueue[msg.Header.MsgId]:
@ -482,7 +482,7 @@ func (a *Api) deviceDeleteMsg(w http.ResponseWriter, r *http.Request) {
//a.Broker.Request(tr369Message, usp_msg.Header_GET, "oktopus/v1/agent/"+sn, "oktopus/v1/get/"+sn) //a.Broker.Request(tr369Message, usp_msg.Header_GET, "oktopus/v1/agent/"+sn, "oktopus/v1/get/"+sn)
a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg) a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg)
log.Println("Sending Msg:", msg.Header.MsgId) log.Println("Sending Msg:", msg.Header.MsgId)
a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn) a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn, false)
select { select {
case msg := <-a.MsgQueue[msg.Header.MsgId]: case msg := <-a.MsgQueue[msg.Header.MsgId]:
@ -532,7 +532,7 @@ func (a *Api) deviceUpdateMsg(w http.ResponseWriter, r *http.Request) {
//a.Broker.Request(tr369Message, usp_msg.Header_GET, "oktopus/v1/agent/"+sn, "oktopus/v1/get/"+sn) //a.Broker.Request(tr369Message, usp_msg.Header_GET, "oktopus/v1/agent/"+sn, "oktopus/v1/get/"+sn)
a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg) a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg)
log.Println("Sending Msg:", msg.Header.MsgId) log.Println("Sending Msg:", msg.Header.MsgId)
a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn) a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn, false)
select { select {
case msg := <-a.MsgQueue[msg.Header.MsgId]: case msg := <-a.MsgQueue[msg.Header.MsgId]:

View File

@ -11,28 +11,33 @@ import (
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
"log" "log"
"net/url" "net/url"
"strconv"
"strings" "strings"
"sync" "sync"
"time" "time"
) )
type Mqtt struct { type Mqtt struct {
Addr string Addr string
Port string Port string
Id string Id string
User string User string
Passwd string Passwd string
Ctx context.Context Ctx context.Context
QoS int QoS int
SubTopic string SubTopic string
DevicesTopic string DevicesTopic string
DisconnectTopic string TLS bool
TLS bool DB db.Database
DB db.Database MsgQueue map[string](chan usp_msg.Msg)
MsgQueue map[string](chan usp_msg.Msg) QMutex *sync.Mutex
QMutex *sync.Mutex
} }
const (
ONLINE = iota
OFFLINE
)
var c *autopaho.ConnectionManager var c *autopaho.ConnectionManager
/* ------------------- Implementations of broker interface ------------------ */ /* ------------------- Implementations of broker interface ------------------ */
@ -41,13 +46,12 @@ func (m *Mqtt) Connect() {
broker, _ := url.Parse("tcp://" + m.Addr + ":" + m.Port) broker, _ := url.Parse("tcp://" + m.Addr + ":" + m.Port)
devices := make(chan *paho.Publish) status := make(chan *paho.Publish)
controller := make(chan *paho.Publish) controller := make(chan *paho.Publish)
disconnect := make(chan *paho.Publish)
apiMsg := make(chan *paho.Publish) apiMsg := make(chan *paho.Publish)
go m.messageHandler(devices, controller, disconnect, apiMsg) go m.messageHandler(status, controller, apiMsg)
pahoClientConfig := m.buildClientConfig(devices, controller, disconnect, apiMsg) pahoClientConfig := m.buildClientConfig(status, controller, apiMsg)
autopahoClientConfig := autopaho.ClientConfig{ autopahoClientConfig := autopaho.ClientConfig{
BrokerUrls: []*url.URL{broker}, BrokerUrls: []*url.URL{broker},
@ -90,10 +94,9 @@ func (m *Mqtt) Disconnect() {
func (m *Mqtt) Subscribe() { func (m *Mqtt) Subscribe() {
if _, err := c.Subscribe(m.Ctx, &paho.Subscribe{ if _, err := c.Subscribe(m.Ctx, &paho.Subscribe{
Subscriptions: map[string]paho.SubscribeOptions{ Subscriptions: map[string]paho.SubscribeOptions{
m.SubTopic: {QoS: byte(m.QoS), NoLocal: true}, m.SubTopic: {QoS: byte(m.QoS)},
m.DevicesTopic: {QoS: byte(m.QoS), NoLocal: true}, m.DevicesTopic: {QoS: byte(m.QoS)},
m.DisconnectTopic: {QoS: byte(m.QoS), NoLocal: true}, "oktopus/+/api/+": {QoS: byte(m.QoS)},
"oktopus/+/api/+": {QoS: byte(m.QoS), NoLocal: true},
}, },
}); err != nil { }); err != nil {
log.Fatalln(err) log.Fatalln(err)
@ -101,15 +104,14 @@ func (m *Mqtt) Subscribe() {
log.Printf("Subscribed to %s", m.SubTopic) log.Printf("Subscribed to %s", m.SubTopic)
log.Printf("Subscribed to %s", m.DevicesTopic) log.Printf("Subscribed to %s", m.DevicesTopic)
log.Printf("Subscribed to %s", m.DisconnectTopic) log.Printf("Subscribed to %s", "oktopus/+/api/+")
log.Println("Subscribed to %s", "oktopus/+/api/+")
} }
func (m *Mqtt) Publish(msg []byte, topic, respTopic string) { func (m *Mqtt) Publish(msg []byte, topic, respTopic string, retain bool) {
if _, err := c.Publish(context.Background(), &paho.Publish{ if _, err := c.Publish(context.Background(), &paho.Publish{
Topic: topic, Topic: topic,
QoS: byte(m.QoS), QoS: byte(m.QoS),
Retain: false, Retain: retain,
Payload: msg, Payload: msg,
Properties: &paho.PublishProperties{ Properties: &paho.PublishProperties{
ResponseTopic: respTopic, ResponseTopic: respTopic,
@ -123,15 +125,13 @@ func (m *Mqtt) Publish(msg []byte, topic, respTopic string) {
/* -------------------------------------------------------------------------- */ /* -------------------------------------------------------------------------- */
func (m *Mqtt) buildClientConfig(devices, controller, disconnect, apiMsg chan *paho.Publish) *paho.ClientConfig { func (m *Mqtt) buildClientConfig(status, controller, apiMsg chan *paho.Publish) *paho.ClientConfig {
log.Println("Starting new mqtt client") log.Println("Starting new mqtt client")
singleHandler := paho.NewSingleHandlerRouter(func(p *paho.Publish) { singleHandler := paho.NewSingleHandlerRouter(func(p *paho.Publish) {
if strings.Contains(p.Topic, "devices") { if strings.Contains(p.Topic, "status") {
devices <- p status <- p
} else if strings.Contains(p.Topic, "controller") { } else if strings.Contains(p.Topic, "controller") {
controller <- p controller <- p
} else if strings.Contains(p.Topic, "disconnect") {
disconnect <- p
} else if strings.Contains(p.Topic, "api") { } else if strings.Contains(p.Topic, "api") {
apiMsg <- p apiMsg <- p
} else { } else {
@ -169,23 +169,32 @@ func (m *Mqtt) buildClientConfig(devices, controller, disconnect, apiMsg chan *p
return &clientConfig return &clientConfig
} }
func (m *Mqtt) messageHandler(devices, controller, disconnect, apiMsg chan *paho.Publish) { func (m *Mqtt) messageHandler(status, controller, apiMsg chan *paho.Publish) {
for { for {
select { select {
case d := <-devices: case d := <-status:
paths := strings.Split(d.Topic, "/") paths := strings.Split(d.Topic, "/")
device := paths[len(paths)-1] device := paths[len(paths)-1]
log.Println("New device: ", device) payload, err := strconv.Atoi(string(d.Payload))
m.handleNewDevice(device) if err != nil {
log.Println("Status topic payload message type error")
log.Fatalln(err)
}
if payload == ONLINE {
log.Println("Device connected:", device)
m.handleNewDevice(device)
//m.deleteRetainedMessage(d, device)
} else if payload == OFFLINE {
log.Println("Device disconnected:1", device)
m.handleDevicesDisconnect(device)
//m.deleteRetainedMessage(d, device)
} else {
log.Println("Status topic payload message type error")
}
case c := <-controller: case c := <-controller:
topic := c.Topic topic := c.Topic
sn := strings.Split(topic, "/") sn := strings.Split(topic, "/")
m.handleNewDevicesResponse(c.Payload, sn[3]) m.handleNewDevicesResponse(c.Payload, sn[3])
case dis := <-disconnect:
paths := strings.Split(dis.Topic, "/")
device := paths[len(paths)-1]
log.Println("Device disconnected: ", device)
m.handleDevicesDisconnect(device)
case api := <-apiMsg: case api := <-apiMsg:
log.Println("Handle api request") log.Println("Handle api request")
m.handleApiRequest(api.Payload) m.handleApiRequest(api.Payload)
@ -193,6 +202,12 @@ func (m *Mqtt) messageHandler(devices, controller, disconnect, apiMsg chan *paho
} }
} }
//TODO: handle device status at mochi redis
//func (m *Mqtt) deleteRetainedMessage(message *paho.Publish, deviceMac string) {
// m.Publish([]byte(""), "oktopus/v1/status/"+deviceMac, "", true)
// log.Println("Message contains the retain flag, deleting it, as it's already received")
//}
func (m *Mqtt) handleApiRequest(api []byte) { func (m *Mqtt) handleApiRequest(api []byte) {
var record usp_record.Record var record usp_record.Record
err := proto.Unmarshal(api, &record) err := proto.Unmarshal(api, &record)
@ -246,7 +261,7 @@ func (m *Mqtt) handleNewDevice(deviceMac string) {
if err != nil { if err != nil {
log.Fatalln("Failed to encode tr369 record:", err) log.Fatalln("Failed to encode tr369 record:", err)
} }
m.Publish(tr369Message, "oktopus/v1/agent/"+deviceMac, "oktopus/v1/controller/"+deviceMac) m.Publish(tr369Message, "oktopus/v1/agent/"+deviceMac, "oktopus/v1/controller/"+deviceMac, false)
} }
func (m *Mqtt) handleNewDevicesResponse(p []byte, sn string) { func (m *Mqtt) handleNewDevicesResponse(p []byte, sn string) {

View File

@ -13,7 +13,7 @@ import (
type Broker interface { type Broker interface {
Connect() Connect()
Disconnect() Disconnect()
Publish(msg []byte, topic, respTopic string) Publish(msg []byte, topic, respTopic string, retain bool)
Subscribe() Subscribe()
/* /*
At request method we're able to send a message to a topic At request method we're able to send a message to a topic

View File

@ -252,7 +252,7 @@ func (h *MyHook) OnDisconnect(cl *mqtt.Client, err error, expire bool) {
} }
if clUser != "" { if clUser != "" {
err := server.Publish("oktopus/v1/disconnect/"+clUser, []byte(""), false, 1) err := server.Publish("oktopus/v1/status/"+clUser, []byte("1"), false, 1)
if err != nil { if err != nil {
log.Println("server publish error: ", err) log.Println("server publish error: ", err)
} }
@ -271,10 +271,12 @@ func (h *MyHook) OnSubscribed(cl *mqtt.Client, pk packets.Packet, reasonCodes []
if clUser != "" { if clUser != "" {
cl.Properties.Will = mqtt.Will{ cl.Properties.Will = mqtt.Will{
Qos: 1, Qos: 1,
TopicName: "oktopus/v1/disconnect/" + clUser, TopicName: "oktopus/v1/status/" + clUser,
Payload: []byte("1"),
Retain: false,
} }
log.Println("new device:", clUser) log.Println("new device:", clUser)
err := server.Publish("oktopus/v1/devices/"+clUser, []byte(""), false, 1) err := server.Publish("oktopus/v1/status/"+clUser, []byte("0"), false, 1)
if err != nil { if err != nil {
log.Println("server publish error: ", err) log.Println("server publish error: ", err)
} }