diff --git a/backend/services/controller/cmd/oktopus/main.go b/backend/services/controller/cmd/oktopus/main.go index 4065f2d..f7ee7b3 100755 --- a/backend/services/controller/cmd/oktopus/main.go +++ b/backend/services/controller/cmd/oktopus/main.go @@ -29,6 +29,7 @@ func main() { log.Println("Starting Oktopus Project TR-369 Controller Version:", VERSION) // fl_endpointId := flag.String("endpoint_id", "proto::oktopus-controller", "Defines the enpoint id the Agent must trust on.") flDevicesTopic := flag.String("d", "oktopus/devices", "That's the topic mqtt broker end new devices info.") + flDisconTopic := flag.String("dis", "oktopus/disconnect", "It's where disconnected IoTs are known.") flSubTopic := flag.String("sub", "oktopus/+/controller/+", "That's the topic agent must publish to, and the controller keeps on listening.") flBrokerAddr := flag.String("a", "localhost", "Mqtt broker adrress") flBrokerPort := flag.String("p", "1883", "Mqtt broker port") @@ -57,17 +58,18 @@ func main() { If you want to use another message protocol just make it implement Broker interface. */ mqttClient := mqtt.Mqtt{ - Addr: *flBrokerAddr, - Port: *flBrokerPort, - Id: *flBrokerClientId, - User: *flBrokerUsername, - Passwd: *flBrokerPassword, - Ctx: ctx, - QoS: *flBrokerQos, - SubTopic: *flSubTopic, - DevicesTopic: *flDevicesTopic, - CA: *flTlsCert, - DB: database, + Addr: *flBrokerAddr, + Port: *flBrokerPort, + Id: *flBrokerClientId, + User: *flBrokerUsername, + Passwd: *flBrokerPassword, + Ctx: ctx, + QoS: *flBrokerQos, + SubTopic: *flSubTopic, + DevicesTopic: *flDevicesTopic, + DisconnectTopic: *flDisconTopic, + CA: *flTlsCert, + DB: database, } mtp.MtpService(&mqttClient, done) diff --git a/backend/services/controller/internal/db/device.go b/backend/services/controller/internal/db/device.go index e152afc..6731f46 100644 --- a/backend/services/controller/internal/db/device.go +++ b/backend/services/controller/internal/db/device.go @@ -13,6 +13,7 @@ type Device struct { Customer string Vendor string Version string + Status uint8 } func (d *Database) CreateDevice(device Device) error { diff --git a/backend/services/controller/internal/db/status.go b/backend/services/controller/internal/db/status.go new file mode 100644 index 0000000..517690d --- /dev/null +++ b/backend/services/controller/internal/db/status.go @@ -0,0 +1,21 @@ +package db + +import ( + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "log" +) + +func (d *Database) UpdateStatus(sn string, status uint8) error { + var result bson.M + err := d.devices.FindOneAndUpdate(d.ctx, bson.D{{"sn", sn}}, bson.D{{"$set", bson.D{{"status", status}}}}).Decode(&result) + if err != nil { + if err == mongo.ErrNoDocuments { + log.Printf("Device %s is not mapped into database", sn) + return nil + } + log.Println(err) + } + log.Printf("%s is now offline.", sn) + return err +} diff --git a/backend/services/controller/internal/mqtt/mqtt.go b/backend/services/controller/internal/mqtt/mqtt.go index 662d886..c8e1cd0 100644 --- a/backend/services/controller/internal/mqtt/mqtt.go +++ b/backend/services/controller/internal/mqtt/mqtt.go @@ -18,17 +18,18 @@ import ( ) type Mqtt struct { - Addr string - Port string - Id string - User string - Passwd string - Ctx context.Context - QoS int - SubTopic string - DevicesTopic string - CA string - DB db.Database + Addr string + Port string + Id string + User string + Passwd string + Ctx context.Context + QoS int + SubTopic string + DevicesTopic string + DisconnectTopic string + CA string + DB db.Database } var c *paho.Client @@ -38,8 +39,9 @@ var c *paho.Client func (m *Mqtt) Connect() { devices := make(chan *paho.Publish) controller := make(chan *paho.Publish) - go m.messageHandler(devices, controller) - clientConfig := m.startClient(devices, controller) + disconnect := make(chan *paho.Publish) + go m.messageHandler(devices, controller, disconnect) + clientConfig := m.startClient(devices, controller, disconnect) connParameters := startConnection(m.Id, m.User, m.Passwd) conn, err := clientConfig.Connect(m.Ctx, &connParameters) @@ -67,8 +69,9 @@ func (m *Mqtt) Disconnect() { func (m *Mqtt) Subscribe() { if _, err := c.Subscribe(m.Ctx, &paho.Subscribe{ Subscriptions: map[string]paho.SubscribeOptions{ - m.SubTopic: {QoS: byte(m.QoS), NoLocal: true}, - m.DevicesTopic: {QoS: byte(m.QoS), NoLocal: true}, + m.SubTopic: {QoS: byte(m.QoS), NoLocal: true}, + m.DevicesTopic: {QoS: byte(m.QoS), NoLocal: true}, + m.DisconnectTopic: {QoS: byte(m.QoS), NoLocal: true}, }, }); err != nil { log.Fatalln(err) @@ -76,6 +79,8 @@ func (m *Mqtt) Subscribe() { log.Printf("Subscribed to %s", m.SubTopic) log.Printf("Subscribed to %s", m.DevicesTopic) + log.Printf("Subscribed to %s", m.DisconnectTopic) + } func (m *Mqtt) Publish(msg []byte, topic, respTopic string) { @@ -96,12 +101,14 @@ func (m *Mqtt) Publish(msg []byte, topic, respTopic string) { /* -------------------------------------------------------------------------- */ -func (m *Mqtt) startClient(devices, controller chan *paho.Publish) *paho.Client { +func (m *Mqtt) startClient(devices, controller, disconnect chan *paho.Publish) *paho.Client { singleHandler := paho.NewSingleHandlerRouter(func(p *paho.Publish) { if p.Topic == m.DevicesTopic { devices <- p } else if strings.Contains(p.Topic, "controller") { controller <- p + } else if p.Topic == m.DisconnectTopic { + disconnect <- p } else { log.Println("No handler for topic: ", p.Topic) } @@ -210,7 +217,7 @@ func startConnection(id, user, pass string) paho.Connect { return connParameters } -func (m *Mqtt) messageHandler(devices, controller chan *paho.Publish) { +func (m *Mqtt) messageHandler(devices, controller, disconnect chan *paho.Publish) { for { select { case d := <-devices: @@ -219,6 +226,10 @@ func (m *Mqtt) messageHandler(devices, controller chan *paho.Publish) { m.handleNewDevice(payload) case c := <-controller: m.handleDevicesResponse(c.Payload) + case dis := <-disconnect: + payload := string(dis.Payload) + log.Println("Device disconnected: ", payload) + m.handleDevicesDisconnect(payload) } } } @@ -287,9 +298,18 @@ func (m *Mqtt) handleDevicesResponse(p []byte) { device.Model = msg.ReqPathResults[1].ResolvedPathResults[0].ResultParams["ModelName"] device.Version = msg.ReqPathResults[2].ResolvedPathResults[0].ResultParams["SoftwareVersion"] device.SN = msg.ReqPathResults[3].ResolvedPathResults[0].ResultParams["SerialNumber"] + device.Status = utils.Online err = m.DB.CreateDevice(device) if err != nil { log.Fatal(err) } } + +func (m *Mqtt) handleDevicesDisconnect(p string) { + // Update status of device at database + err := m.DB.UpdateStatus(p, utils.Offline) + if err != nil { + log.Fatal(err) + } +} diff --git a/backend/services/controller/internal/utils/utils.go b/backend/services/controller/internal/utils/utils.go index 5feec9e..db23301 100644 --- a/backend/services/controller/internal/utils/utils.go +++ b/backend/services/controller/internal/utils/utils.go @@ -4,6 +4,13 @@ import ( "net" ) +//Status are saved at database as numbers +const ( + Online = iota + Associating + Offline +) + // Get interfaces MACs, and the first interface MAC is gonna be used as mqtt clientId func GetMacAddr() ([]string, error) { ifas, err := net.Interfaces() diff --git a/backend/services/mochi/cmd/main.go b/backend/services/mochi/cmd/main.go index b91ce7c..db3a4b7 100644 --- a/backend/services/mochi/cmd/main.go +++ b/backend/services/mochi/cmd/main.go @@ -15,8 +15,10 @@ import ( "strings" "syscall" + rv8 "github.com/go-redis/redis/v8" "github.com/mochi-co/mqtt/v2" "github.com/mochi-co/mqtt/v2/hooks/auth" + "github.com/mochi-co/mqtt/v2/hooks/storage/redis" "github.com/mochi-co/mqtt/v2/listeners" ) @@ -34,6 +36,7 @@ var server = mqtt.New(&mqtt.Options{ func main() { tcpAddr := flag.String("tcp", ":1883", "network address for TCP listener") + redisAddr := flag.String("redis", "172.17.0.2:6379", "host address of redis db") wsAddr := flag.String("ws", "", "network address for Websocket listener") infoAddr := flag.String("info", "", "network address for web info dashboard listener") path := flag.String("path", "", "path to data auth file") @@ -98,6 +101,17 @@ func main() { log.Fatal(err) } + err = server.AddHook(new(redis.Hook), &redis.Options{ + Options: &rv8.Options{ + Addr: *redisAddr, // default redis address + Password: "", // your password + DB: 0, // your redis db + }, + }) + if err != nil { + log.Fatal(err) + } + go func() { err := server.Serve() if err != nil { @@ -123,6 +137,7 @@ func (h *MyHook) ID() string { func (h *MyHook) Provides(b byte) bool { return bytes.Contains([]byte{ mqtt.OnSubscribed, + mqtt.OnDisconnect, }, []byte{b}) } @@ -131,6 +146,26 @@ func (h *MyHook) Init(config any) error { return nil } +func (h *MyHook) Red(config any) error { + h.Log.Info().Msg("initialised") + return nil +} + +func (h *MyHook) OnDisconnect(cl *mqtt.Client, err error, expire bool) { + var clUser string + if len(cl.Properties.Props.User) > 0 { + clUser = cl.Properties.Props.User[0].Val + } + + if clUser != "" { + sn := strings.Split(clUser, "-") + err := server.Publish("oktopus/disconnect", []byte(sn[1]), false, 1) + if err != nil { + log.Println("server publish error: ", err) + } + } +} + func (h *MyHook) OnSubscribed(cl *mqtt.Client, pk packets.Packet, reasonCodes []byte) { // Verifies if it's a device who is subscribed if strings.Contains(pk.Filters[0].Filter, "oktopus/v1/agent") {