diff --git a/backend/services/controller/cmd/oktopus/main.go b/backend/services/controller/cmd/oktopus/main.go index 6c38c42..c75822d 100755 --- a/backend/services/controller/cmd/oktopus/main.go +++ b/backend/services/controller/cmd/oktopus/main.go @@ -45,8 +45,7 @@ func main() { log.Println("Starting Oktopus Project TR-369 Controller Version:", VERSION) // fl_endpointId := flag.String("endpoint_id", "proto::oktopus-controller", "Defines the enpoint id the Agent must trust on.") - flDevicesTopic := flag.String("d", "oktopus/+/devices/+", "That's the topic mqtt broker end new devices info.") - flDisconTopic := flag.String("dis", "oktopus/+/disconnect/+", "It's where disconnected IoTs are known.") + flDevicesTopic := flag.String("d", "oktopus/+/status/+", "That's the topic mqtt broker end new devices info.") flSubTopic := flag.String("sub", "oktopus/+/controller/+", "That's the topic agent must publish to, and the controller keeps on listening.") flBrokerAddr := flag.String("a", "localhost", "Mqtt broker adrress") flBrokerPort := flag.String("p", "1883", "Mqtt broker port") @@ -54,7 +53,7 @@ func main() { flBrokerUsername := flag.String("u", "", "Mqtt broker username") flBrokerPassword := flag.String("P", "", "Mqtt broker password") flBrokerClientId := flag.String("i", "", "A clientid for the Mqtt connection") - flBrokerQos := flag.Int("q", 2, "Quality of service of mqtt messages delivery") + flBrokerQos := flag.Int("q", 0, "Quality of service of mqtt messages delivery") flAddrDB := flag.String("mongo", "mongodb://localhost:27017/", "MongoDB URI") flApiPort := flag.String("ap", "8000", "Rest api port") flHelp := flag.Bool("help", false, "Help") @@ -77,20 +76,19 @@ func main() { If you want to use another message protocol just make it implement Broker interface. */ mqttClient := mqtt.Mqtt{ - Addr: *flBrokerAddr, - Port: *flBrokerPort, - Id: *flBrokerClientId, - User: *flBrokerUsername, - Passwd: *flBrokerPassword, - Ctx: ctx, - QoS: *flBrokerQos, - SubTopic: *flSubTopic, - DevicesTopic: *flDevicesTopic, - DisconnectTopic: *flDisconTopic, - TLS: *flTlsCert, - DB: database, - MsgQueue: apiMsgQueue, - QMutex: &m, + Addr: *flBrokerAddr, + Port: *flBrokerPort, + Id: *flBrokerClientId, + User: *flBrokerUsername, + Passwd: *flBrokerPassword, + Ctx: ctx, + QoS: *flBrokerQos, + SubTopic: *flSubTopic, + DevicesTopic: *flDevicesTopic, + TLS: *flTlsCert, + DB: database, + MsgQueue: apiMsgQueue, + QMutex: &m, } mtp.MtpService(&mqttClient, done) diff --git a/backend/services/controller/internal/api/api.go b/backend/services/controller/internal/api/api.go index 6463de7..807f7f0 100644 --- a/backend/services/controller/internal/api/api.go +++ b/backend/services/controller/internal/api/api.go @@ -171,7 +171,7 @@ func (a *Api) deviceFwUpdate(w http.ResponseWriter, r *http.Request) { a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg) log.Println("Sending Msg:", msg.Header.MsgId) - a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn) + a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn, false) var getMsgAnswer *usp_msg.GetResp @@ -231,7 +231,7 @@ func (a *Api) deviceFwUpdate(w http.ResponseWriter, r *http.Request) { a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg) log.Println("Sending Msg:", msg.Header.MsgId) - a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn) + a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn, false) select { case msg := <-a.MsgQueue[msg.Header.MsgId]: @@ -281,7 +281,7 @@ func (a *Api) deviceGetParameterInstances(w http.ResponseWriter, r *http.Request //a.Broker.Request(tr369Message, usp_msg.Header_GET, "oktopus/v1/agent/"+sn, "oktopus/v1/get/"+sn) a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg) log.Println("Sending Msg:", msg.Header.MsgId) - a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn) + a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn, false) select { case msg := <-a.MsgQueue[msg.Header.MsgId]: @@ -331,7 +331,7 @@ func (a *Api) deviceGetSupportedParametersMsg(w http.ResponseWriter, r *http.Req //a.Broker.Request(tr369Message, usp_msg.Header_GET, "oktopus/v1/agent/"+sn, "oktopus/v1/get/"+sn) a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg) log.Println("Sending Msg:", msg.Header.MsgId) - a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn) + a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn, false) select { case msg := <-a.MsgQueue[msg.Header.MsgId]: @@ -381,7 +381,7 @@ func (a *Api) deviceCreateMsg(w http.ResponseWriter, r *http.Request) { //a.Broker.Request(tr369Message, usp_msg.Header_GET, "oktopus/v1/agent/"+sn, "oktopus/v1/get/"+sn) a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg) log.Println("Sending Msg:", msg.Header.MsgId) - a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn) + a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn, false) select { case msg := <-a.MsgQueue[msg.Header.MsgId]: @@ -432,7 +432,7 @@ func (a *Api) deviceGetMsg(w http.ResponseWriter, r *http.Request) { a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg) log.Println("Sending Msg:", msg.Header.MsgId) - a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn) + a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn, false) select { case msg := <-a.MsgQueue[msg.Header.MsgId]: @@ -482,7 +482,7 @@ func (a *Api) deviceDeleteMsg(w http.ResponseWriter, r *http.Request) { //a.Broker.Request(tr369Message, usp_msg.Header_GET, "oktopus/v1/agent/"+sn, "oktopus/v1/get/"+sn) a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg) log.Println("Sending Msg:", msg.Header.MsgId) - a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn) + a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn, false) select { case msg := <-a.MsgQueue[msg.Header.MsgId]: @@ -532,7 +532,7 @@ func (a *Api) deviceUpdateMsg(w http.ResponseWriter, r *http.Request) { //a.Broker.Request(tr369Message, usp_msg.Header_GET, "oktopus/v1/agent/"+sn, "oktopus/v1/get/"+sn) a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg) log.Println("Sending Msg:", msg.Header.MsgId) - a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn) + a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn, false) select { case msg := <-a.MsgQueue[msg.Header.MsgId]: diff --git a/backend/services/controller/internal/mqtt/mqtt.go b/backend/services/controller/internal/mqtt/mqtt.go index 8f544db..ffa902f 100644 --- a/backend/services/controller/internal/mqtt/mqtt.go +++ b/backend/services/controller/internal/mqtt/mqtt.go @@ -11,28 +11,33 @@ import ( "google.golang.org/protobuf/proto" "log" "net/url" + "strconv" "strings" "sync" "time" ) type Mqtt struct { - Addr string - Port string - Id string - User string - Passwd string - Ctx context.Context - QoS int - SubTopic string - DevicesTopic string - DisconnectTopic string - TLS bool - DB db.Database - MsgQueue map[string](chan usp_msg.Msg) - QMutex *sync.Mutex + Addr string + Port string + Id string + User string + Passwd string + Ctx context.Context + QoS int + SubTopic string + DevicesTopic string + TLS bool + DB db.Database + MsgQueue map[string](chan usp_msg.Msg) + QMutex *sync.Mutex } +const ( + ONLINE = iota + OFFLINE +) + var c *autopaho.ConnectionManager /* ------------------- Implementations of broker interface ------------------ */ @@ -41,13 +46,12 @@ func (m *Mqtt) Connect() { broker, _ := url.Parse("tcp://" + m.Addr + ":" + m.Port) - devices := make(chan *paho.Publish) + status := make(chan *paho.Publish) controller := make(chan *paho.Publish) - disconnect := make(chan *paho.Publish) apiMsg := make(chan *paho.Publish) - go m.messageHandler(devices, controller, disconnect, apiMsg) - pahoClientConfig := m.buildClientConfig(devices, controller, disconnect, apiMsg) + go m.messageHandler(status, controller, apiMsg) + pahoClientConfig := m.buildClientConfig(status, controller, apiMsg) autopahoClientConfig := autopaho.ClientConfig{ BrokerUrls: []*url.URL{broker}, @@ -90,10 +94,9 @@ func (m *Mqtt) Disconnect() { func (m *Mqtt) Subscribe() { if _, err := c.Subscribe(m.Ctx, &paho.Subscribe{ Subscriptions: map[string]paho.SubscribeOptions{ - m.SubTopic: {QoS: byte(m.QoS), NoLocal: true}, - m.DevicesTopic: {QoS: byte(m.QoS), NoLocal: true}, - m.DisconnectTopic: {QoS: byte(m.QoS), NoLocal: true}, - "oktopus/+/api/+": {QoS: byte(m.QoS), NoLocal: true}, + m.SubTopic: {QoS: byte(m.QoS)}, + m.DevicesTopic: {QoS: byte(m.QoS)}, + "oktopus/+/api/+": {QoS: byte(m.QoS)}, }, }); err != nil { log.Fatalln(err) @@ -101,15 +104,14 @@ func (m *Mqtt) Subscribe() { log.Printf("Subscribed to %s", m.SubTopic) log.Printf("Subscribed to %s", m.DevicesTopic) - log.Printf("Subscribed to %s", m.DisconnectTopic) - log.Println("Subscribed to %s", "oktopus/+/api/+") + log.Printf("Subscribed to %s", "oktopus/+/api/+") } -func (m *Mqtt) Publish(msg []byte, topic, respTopic string) { +func (m *Mqtt) Publish(msg []byte, topic, respTopic string, retain bool) { if _, err := c.Publish(context.Background(), &paho.Publish{ Topic: topic, QoS: byte(m.QoS), - Retain: false, + Retain: retain, Payload: msg, Properties: &paho.PublishProperties{ ResponseTopic: respTopic, @@ -123,15 +125,13 @@ func (m *Mqtt) Publish(msg []byte, topic, respTopic string) { /* -------------------------------------------------------------------------- */ -func (m *Mqtt) buildClientConfig(devices, controller, disconnect, apiMsg chan *paho.Publish) *paho.ClientConfig { +func (m *Mqtt) buildClientConfig(status, controller, apiMsg chan *paho.Publish) *paho.ClientConfig { log.Println("Starting new mqtt client") singleHandler := paho.NewSingleHandlerRouter(func(p *paho.Publish) { - if strings.Contains(p.Topic, "devices") { - devices <- p + if strings.Contains(p.Topic, "status") { + status <- p } else if strings.Contains(p.Topic, "controller") { controller <- p - } else if strings.Contains(p.Topic, "disconnect") { - disconnect <- p } else if strings.Contains(p.Topic, "api") { apiMsg <- p } else { @@ -169,23 +169,32 @@ func (m *Mqtt) buildClientConfig(devices, controller, disconnect, apiMsg chan *p return &clientConfig } -func (m *Mqtt) messageHandler(devices, controller, disconnect, apiMsg chan *paho.Publish) { +func (m *Mqtt) messageHandler(status, controller, apiMsg chan *paho.Publish) { for { select { - case d := <-devices: + case d := <-status: paths := strings.Split(d.Topic, "/") device := paths[len(paths)-1] - log.Println("New device: ", device) - m.handleNewDevice(device) + payload, err := strconv.Atoi(string(d.Payload)) + if err != nil { + log.Println("Status topic payload message type error") + log.Fatalln(err) + } + if payload == ONLINE { + log.Println("Device connected:", device) + m.handleNewDevice(device) + //m.deleteRetainedMessage(d, device) + } else if payload == OFFLINE { + log.Println("Device disconnected:1", device) + m.handleDevicesDisconnect(device) + //m.deleteRetainedMessage(d, device) + } else { + log.Println("Status topic payload message type error") + } case c := <-controller: topic := c.Topic sn := strings.Split(topic, "/") m.handleNewDevicesResponse(c.Payload, sn[3]) - case dis := <-disconnect: - paths := strings.Split(dis.Topic, "/") - device := paths[len(paths)-1] - log.Println("Device disconnected: ", device) - m.handleDevicesDisconnect(device) case api := <-apiMsg: log.Println("Handle api request") m.handleApiRequest(api.Payload) @@ -193,6 +202,12 @@ func (m *Mqtt) messageHandler(devices, controller, disconnect, apiMsg chan *paho } } +//TODO: handle device status at mochi redis +//func (m *Mqtt) deleteRetainedMessage(message *paho.Publish, deviceMac string) { +// m.Publish([]byte(""), "oktopus/v1/status/"+deviceMac, "", true) +// log.Println("Message contains the retain flag, deleting it, as it's already received") +//} + func (m *Mqtt) handleApiRequest(api []byte) { var record usp_record.Record err := proto.Unmarshal(api, &record) @@ -246,7 +261,7 @@ func (m *Mqtt) handleNewDevice(deviceMac string) { if err != nil { log.Fatalln("Failed to encode tr369 record:", err) } - m.Publish(tr369Message, "oktopus/v1/agent/"+deviceMac, "oktopus/v1/controller/"+deviceMac) + m.Publish(tr369Message, "oktopus/v1/agent/"+deviceMac, "oktopus/v1/controller/"+deviceMac, false) } func (m *Mqtt) handleNewDevicesResponse(p []byte, sn string) { diff --git a/backend/services/controller/internal/mtp/mtp.go b/backend/services/controller/internal/mtp/mtp.go index f5852f6..fc78e7d 100644 --- a/backend/services/controller/internal/mtp/mtp.go +++ b/backend/services/controller/internal/mtp/mtp.go @@ -13,7 +13,7 @@ import ( type Broker interface { Connect() Disconnect() - Publish(msg []byte, topic, respTopic string) + Publish(msg []byte, topic, respTopic string, retain bool) Subscribe() /* At request method we're able to send a message to a topic diff --git a/backend/services/mochi/cmd/main.go b/backend/services/mochi/cmd/main.go index 2eb9a7d..2a64d5e 100644 --- a/backend/services/mochi/cmd/main.go +++ b/backend/services/mochi/cmd/main.go @@ -252,7 +252,7 @@ func (h *MyHook) OnDisconnect(cl *mqtt.Client, err error, expire bool) { } if clUser != "" { - err := server.Publish("oktopus/v1/disconnect/"+clUser, []byte(""), false, 1) + err := server.Publish("oktopus/v1/status/"+clUser, []byte("1"), false, 1) if err != nil { log.Println("server publish error: ", err) } @@ -271,10 +271,12 @@ func (h *MyHook) OnSubscribed(cl *mqtt.Client, pk packets.Packet, reasonCodes [] if clUser != "" { cl.Properties.Will = mqtt.Will{ Qos: 1, - TopicName: "oktopus/v1/disconnect/" + clUser, + TopicName: "oktopus/v1/status/" + clUser, + Payload: []byte("1"), + Retain: false, } log.Println("new device:", clUser) - err := server.Publish("oktopus/v1/devices/"+clUser, []byte(""), false, 1) + err := server.Publish("oktopus/v1/status/"+clUser, []byte("0"), false, 1) if err != nil { log.Println("server publish error: ", err) }