diff --git a/backend/services/controller/cmd/oktopus/main.go b/backend/services/controller/cmd/oktopus/main.go index 4065f2d..b6c6d14 100755 --- a/backend/services/controller/cmd/oktopus/main.go +++ b/backend/services/controller/cmd/oktopus/main.go @@ -7,9 +7,11 @@ import ( "flag" "github.com/leandrofars/oktopus/internal/api" "github.com/leandrofars/oktopus/internal/db" + usp_msg "github.com/leandrofars/oktopus/internal/usp_message" "log" "os" "os/signal" + "sync" "syscall" "github.com/leandrofars/oktopus/internal/mqtt" @@ -29,6 +31,7 @@ func main() { log.Println("Starting Oktopus Project TR-369 Controller Version:", VERSION) // fl_endpointId := flag.String("endpoint_id", "proto::oktopus-controller", "Defines the enpoint id the Agent must trust on.") flDevicesTopic := flag.String("d", "oktopus/devices", "That's the topic mqtt broker end new devices info.") + flDisconTopic := flag.String("dis", "oktopus/disconnect", "It's where disconnected IoTs are known.") flSubTopic := flag.String("sub", "oktopus/+/controller/+", "That's the topic agent must publish to, and the controller keeps on listening.") flBrokerAddr := flag.String("a", "localhost", "Mqtt broker adrress") flBrokerPort := flag.String("p", "1883", "Mqtt broker port") @@ -53,25 +56,30 @@ func main() { */ ctx, cancel := context.WithCancel(context.Background()) database := db.NewDatabase(ctx, *flAddrDB) + apiMsgQueue := make(map[string](chan usp_msg.Msg)) + var m sync.Mutex /* If you want to use another message protocol just make it implement Broker interface. */ mqttClient := mqtt.Mqtt{ - Addr: *flBrokerAddr, - Port: *flBrokerPort, - Id: *flBrokerClientId, - User: *flBrokerUsername, - Passwd: *flBrokerPassword, - Ctx: ctx, - QoS: *flBrokerQos, - SubTopic: *flSubTopic, - DevicesTopic: *flDevicesTopic, - CA: *flTlsCert, - DB: database, + Addr: *flBrokerAddr, + Port: *flBrokerPort, + Id: *flBrokerClientId, + User: *flBrokerUsername, + Passwd: *flBrokerPassword, + Ctx: ctx, + QoS: *flBrokerQos, + SubTopic: *flSubTopic, + DevicesTopic: *flDevicesTopic, + DisconnectTopic: *flDisconTopic, + CA: *flTlsCert, + DB: database, + MsgQueue: apiMsgQueue, + QMutex: &m, } mtp.MtpService(&mqttClient, done) - a := api.NewApi(*flApiPort, database) + a := api.NewApi(*flApiPort, database, &mqttClient, apiMsgQueue, &m) api.StartApi(a) <-done diff --git a/backend/services/controller/go.mod b/backend/services/controller/go.mod index 8457d1a..5507d84 100755 --- a/backend/services/controller/go.mod +++ b/backend/services/controller/go.mod @@ -10,6 +10,7 @@ require ( require ( github.com/golang/snappy v0.0.1 // indirect + github.com/google/uuid v1.3.0 // indirect github.com/gorilla/mux v1.8.0 // indirect github.com/klauspost/compress v1.13.6 // indirect github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect diff --git a/backend/services/controller/go.sum b/backend/services/controller/go.sum index 19993ba..5d46421 100644 --- a/backend/services/controller/go.sum +++ b/backend/services/controller/go.sum @@ -9,6 +9,8 @@ github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEW github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= diff --git a/backend/services/controller/internal/api/api.go b/backend/services/controller/internal/api/api.go index 28969b4..fc19bb0 100644 --- a/backend/services/controller/internal/api/api.go +++ b/backend/services/controller/internal/api/api.go @@ -4,20 +4,32 @@ import ( "encoding/json" "github.com/gorilla/mux" "github.com/leandrofars/oktopus/internal/db" + "github.com/leandrofars/oktopus/internal/mtp" + usp_msg "github.com/leandrofars/oktopus/internal/usp_message" + "github.com/leandrofars/oktopus/internal/utils" + "go.mongodb.org/mongo-driver/mongo" + "google.golang.org/protobuf/proto" "log" "net/http" + "sync" "time" ) type Api struct { - Port string - Db db.Database + Port string + Db db.Database + Broker mtp.Broker + MsgQueue map[string](chan usp_msg.Msg) + QMutex *sync.Mutex } -func NewApi(port string, db db.Database) Api { +func NewApi(port string, db db.Database, b mtp.Broker, msgQueue map[string](chan usp_msg.Msg), m *sync.Mutex) Api { return Api{ - Port: port, - Db: db, + Port: port, + Db: db, + Broker: b, + MsgQueue: msgQueue, + QMutex: m, } } @@ -28,7 +40,7 @@ func StartApi(a Api) { return }) r.HandleFunc("/devices", a.retrieveDevices) - //r.HandleFunc("/devices/{sn}", a.devicesMessaging) + r.HandleFunc("/devices/{sn}/get", a.deviceGetMsg) srv := &http.Server{ Addr: "0.0.0.0:" + a.Port, @@ -61,3 +73,61 @@ func (a *Api) retrieveDevices(w http.ResponseWriter, r *http.Request) { return } + +func (a *Api) deviceGetMsg(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + sn := vars["sn"] + _, err := a.Db.RetrieveDevice(sn) + if err != nil { + if err == mongo.ErrNoDocuments { + w.WriteHeader(http.StatusBadRequest) + json.NewEncoder(w).Encode("No device with serial number " + sn + " was found") + return + } + w.WriteHeader(http.StatusInternalServerError) + return + } + var receiver usp_msg.Get + //data := []byte(`{"param_paths": {'Device.DeviceInfo.'},"max_depth": 2}`) + //data := []byte("'opa'") + //var jsonBlob = []byte(`{ + // "param_paths": ["Device.DeviceInfo.","Device.ManagementServer."], + // "max_depth": 2 + //}`) + err = json.NewDecoder(r.Body).Decode(receiver) + if err != nil { + log.Println(err) + w.WriteHeader(http.StatusBadRequest) + return + } + + msg := utils.NewGetMsg(receiver) + encodedMsg, err := proto.Marshal(&msg) + if err != nil { + log.Println(err) + w.WriteHeader(http.StatusBadRequest) + return + } + + record := utils.NewUspRecord(encodedMsg, sn) + tr369Message, err := proto.Marshal(&record) + if err != nil { + log.Fatalln("Failed to encode tr369 record:", err) + } + + //a.Broker.Request(tr369Message, usp_msg.Header_GET, "oktopus/v1/agent/"+sn, "oktopus/v1/get/"+sn) + a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg) + a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn) + + select { + case msg := <-a.MsgQueue[msg.Header.MsgId]: + log.Printf("Received Msg") + json.NewEncoder(w).Encode(msg) + return + case <-time.After(time.Second * 5): + log.Printf("Request Timed Out") + w.WriteHeader(http.StatusGatewayTimeout) + json.NewEncoder(w).Encode("Request Timed Out") + return + } +} diff --git a/backend/services/controller/internal/db/device.go b/backend/services/controller/internal/db/device.go index e152afc..3ba7bf4 100644 --- a/backend/services/controller/internal/db/device.go +++ b/backend/services/controller/internal/db/device.go @@ -13,6 +13,7 @@ type Device struct { Customer string Vendor string Version string + Status uint8 } func (d *Database) CreateDevice(device Device) error { @@ -45,6 +46,16 @@ func (d *Database) RetrieveDevices() ([]Device, error) { return results, nil } +func (d *Database) RetrieveDevice(sn string) (Device, error) { + var result Device + //TODO: filter devices by user ownership + err := d.devices.FindOne(d.ctx, bson.D{{"sn", sn}}, nil).Decode(&result) + if err != nil { + log.Println(err) + } + return result, err +} + func (d *Database) DeleteDevice() { } diff --git a/backend/services/controller/internal/db/status.go b/backend/services/controller/internal/db/status.go new file mode 100644 index 0000000..517690d --- /dev/null +++ b/backend/services/controller/internal/db/status.go @@ -0,0 +1,21 @@ +package db + +import ( + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "log" +) + +func (d *Database) UpdateStatus(sn string, status uint8) error { + var result bson.M + err := d.devices.FindOneAndUpdate(d.ctx, bson.D{{"sn", sn}}, bson.D{{"$set", bson.D{{"status", status}}}}).Decode(&result) + if err != nil { + if err == mongo.ErrNoDocuments { + log.Printf("Device %s is not mapped into database", sn) + return nil + } + log.Println(err) + } + log.Printf("%s is now offline.", sn) + return err +} diff --git a/backend/services/controller/internal/mqtt/mqtt.go b/backend/services/controller/internal/mqtt/mqtt.go index 662d886..b1b3fa0 100644 --- a/backend/services/controller/internal/mqtt/mqtt.go +++ b/backend/services/controller/internal/mqtt/mqtt.go @@ -18,17 +18,20 @@ import ( ) type Mqtt struct { - Addr string - Port string - Id string - User string - Passwd string - Ctx context.Context - QoS int - SubTopic string - DevicesTopic string - CA string - DB db.Database + Addr string + Port string + Id string + User string + Passwd string + Ctx context.Context + QoS int + SubTopic string + DevicesTopic string + DisconnectTopic string + CA string + DB db.Database + MsgQueue map[string](chan usp_msg.Msg) + QMutex *sync.Mutex } var c *paho.Client @@ -38,8 +41,10 @@ var c *paho.Client func (m *Mqtt) Connect() { devices := make(chan *paho.Publish) controller := make(chan *paho.Publish) - go m.messageHandler(devices, controller) - clientConfig := m.startClient(devices, controller) + disconnect := make(chan *paho.Publish) + apiMsg := make(chan *paho.Publish) + go m.messageHandler(devices, controller, disconnect, apiMsg) + clientConfig := m.startClient(devices, controller, disconnect, apiMsg) connParameters := startConnection(m.Id, m.User, m.Passwd) conn, err := clientConfig.Connect(m.Ctx, &connParameters) @@ -67,8 +72,10 @@ func (m *Mqtt) Disconnect() { func (m *Mqtt) Subscribe() { if _, err := c.Subscribe(m.Ctx, &paho.Subscribe{ Subscriptions: map[string]paho.SubscribeOptions{ - m.SubTopic: {QoS: byte(m.QoS), NoLocal: true}, - m.DevicesTopic: {QoS: byte(m.QoS), NoLocal: true}, + m.SubTopic: {QoS: byte(m.QoS), NoLocal: true}, + m.DevicesTopic: {QoS: byte(m.QoS), NoLocal: true}, + m.DisconnectTopic: {QoS: byte(m.QoS), NoLocal: true}, + "oktopus/+/api/+": {QoS: byte(m.QoS), NoLocal: true}, }, }); err != nil { log.Fatalln(err) @@ -76,6 +83,8 @@ func (m *Mqtt) Subscribe() { log.Printf("Subscribed to %s", m.SubTopic) log.Printf("Subscribed to %s", m.DevicesTopic) + log.Printf("Subscribed to %s", m.DisconnectTopic) + } func (m *Mqtt) Publish(msg []byte, topic, respTopic string) { @@ -96,12 +105,16 @@ func (m *Mqtt) Publish(msg []byte, topic, respTopic string) { /* -------------------------------------------------------------------------- */ -func (m *Mqtt) startClient(devices, controller chan *paho.Publish) *paho.Client { +func (m *Mqtt) startClient(devices, controller, disconnect, apiMsg chan *paho.Publish) *paho.Client { singleHandler := paho.NewSingleHandlerRouter(func(p *paho.Publish) { if p.Topic == m.DevicesTopic { devices <- p } else if strings.Contains(p.Topic, "controller") { controller <- p + } else if p.Topic == m.DisconnectTopic { + disconnect <- p + } else if strings.Contains(p.Topic, "api") { + apiMsg <- p } else { log.Println("No handler for topic: ", p.Topic) } @@ -210,7 +223,7 @@ func startConnection(id, user, pass string) paho.Connect { return connParameters } -func (m *Mqtt) messageHandler(devices, controller chan *paho.Publish) { +func (m *Mqtt) messageHandler(devices, controller, disconnect, apiMsg chan *paho.Publish) { for { select { case d := <-devices: @@ -218,11 +231,43 @@ func (m *Mqtt) messageHandler(devices, controller chan *paho.Publish) { log.Println("New device: ", payload) m.handleNewDevice(payload) case c := <-controller: - m.handleDevicesResponse(c.Payload) + topic := c.Topic + sn := strings.Split(topic, "/") + m.handleNewDevicesResponse(c.Payload, sn[3]) + case dis := <-disconnect: + payload := string(dis.Payload) + log.Println("Device disconnected: ", payload) + m.handleDevicesDisconnect(payload) + case api := <-apiMsg: + log.Println("Handle api request") + m.handleApiRequest(api.Payload) } } } +func (m *Mqtt) handleApiRequest(api []byte) { + var record usp_record.Record + err := proto.Unmarshal(api, &record) + if err != nil { + log.Println(err) + } + + //TODO: verify record operation type + var msg usp_msg.Msg + err = proto.Unmarshal(record.GetNoSessionContext().Payload, &msg) + if err != nil { + log.Println(err) + } + + if _, ok := m.MsgQueue[msg.Header.MsgId]; ok { + //m.QMutex.Lock() + m.MsgQueue[msg.Header.MsgId] <- msg + //m.QMutex.Unlock() + } else { + log.Printf("Message answer to request %s arrived too late", msg.Header.MsgId) + } +} + func (m *Mqtt) handleNewDevice(deviceMac string) { payload := usp_msg.Msg{ Header: &usp_msg.Header{ @@ -248,26 +293,16 @@ func (m *Mqtt) handleNewDevice(deviceMac string) { }, } teste, _ := proto.Marshal(&payload) - record := usp_record.Record{ - Version: "0.1", - ToId: deviceMac, - FromId: "leleco", - PayloadSecurity: usp_record.Record_PLAINTEXT, - RecordType: &usp_record.Record_NoSessionContext{ - NoSessionContext: &usp_record.NoSessionContextRecord{ - Payload: teste, - }, - }, - } + record := utils.NewUspRecord(teste, deviceMac) tr369Message, err := proto.Marshal(&record) if err != nil { - log.Fatalln("Failed to encode address book:", err) + log.Fatalln("Failed to encode tr369 record:", err) } m.Publish(tr369Message, "oktopus/v1/agent/"+deviceMac, "oktopus/v1/controller/"+deviceMac) } -func (m *Mqtt) handleDevicesResponse(p []byte) { +func (m *Mqtt) handleNewDevicesResponse(p []byte, sn string) { var record usp_record.Record var message usp_msg.Msg @@ -286,10 +321,24 @@ func (m *Mqtt) handleDevicesResponse(p []byte) { device.Vendor = msg.ReqPathResults[0].ResolvedPathResults[0].ResultParams["Manufacturer"] device.Model = msg.ReqPathResults[1].ResolvedPathResults[0].ResultParams["ModelName"] device.Version = msg.ReqPathResults[2].ResolvedPathResults[0].ResultParams["SoftwareVersion"] - device.SN = msg.ReqPathResults[3].ResolvedPathResults[0].ResultParams["SerialNumber"] + device.SN = sn + device.Status = utils.Online err = m.DB.CreateDevice(device) if err != nil { log.Fatal(err) } } + +func (m *Mqtt) handleDevicesDisconnect(p string) { + // Update status of device at database + err := m.DB.UpdateStatus(p, utils.Offline) + if err != nil { + log.Fatal(err) + } +} + +/* +func (m *Mqtt) Request(msg []byte, msgType usp_msg.Header_MsgType, pubTopic string, respTopic string) { + m.Publish(msg, pubTopic, respTopic) +}*/ diff --git a/backend/services/controller/internal/mqtt/mqtt_api.go b/backend/services/controller/internal/mqtt/mqtt_api.go new file mode 100644 index 0000000..cbe686d --- /dev/null +++ b/backend/services/controller/internal/mqtt/mqtt_api.go @@ -0,0 +1,52 @@ +package mqtt + +// +//import ( +// usp_msg "github.com/leandrofars/oktopus/internal/usp_message" +// "github.com/leandrofars/oktopus/internal/usp_record" +// "google.golang.org/protobuf/proto" +// "log" +//) +// +//func SendGetMsg(sn string) { +// payload := usp_msg.Msg{ +// Header: &usp_msg.Header{ +// MsgId: "uniqueIdentifierForThismessage", +// MsgType: usp_msg.Header_GET, +// }, +// Body: &usp_msg.Body{ +// MsgBody: &usp_msg.Body_Request{ +// Request: &usp_msg.Request{ +// ReqType: &usp_msg.Request_Get{ +// Get: &usp_msg.Get{ +// ParamPaths: []string{ +// "Device.DeviceInfo.Manufacturer", +// "Device.DeviceInfo.ModelName", +// "Device.DeviceInfo.SoftwareVersion", +// }, +// MaxDepth: 1, +// }, +// }, +// }, +// }, +// }, +// } +// teste, _ := proto.Marshal(&payload) +// record := usp_record.Record{ +// Version: "0.1", +// ToId: sn, +// FromId: "leleco", +// PayloadSecurity: usp_record.Record_PLAINTEXT, +// RecordType: &usp_record.Record_NoSessionContext{ +// NoSessionContext: &usp_record.NoSessionContextRecord{ +// Payload: teste, +// }, +// }, +// } +// +// tr369Message, err := proto.Marshal(&record) +// if err != nil { +// log.Fatalln("Failed to encode address book:", err) +// } +// m.Publish(tr369Message, "oktopus/v1/agent/"+deviceMac, "oktopus/v1/controller/"+deviceMac) +//} diff --git a/backend/services/controller/internal/mtp/mtp.go b/backend/services/controller/internal/mtp/mtp.go index 5ba3583..c9ef304 100644 --- a/backend/services/controller/internal/mtp/mtp.go +++ b/backend/services/controller/internal/mtp/mtp.go @@ -15,6 +15,11 @@ type Broker interface { Disconnect() Publish(msg []byte, topic, respTopic string) Subscribe() + /* + At request method we're able to send a message to a topic + and wait until we have a response (in the same topic). + */ + //Request(msg []byte, msgType usp_msg.Header_MsgType, pubTopic string, subTopic string) } // Not used, since we are using a broker solution with MQTT. diff --git a/backend/services/controller/internal/utils/utils.go b/backend/services/controller/internal/utils/utils.go index 5feec9e..c636c4b 100644 --- a/backend/services/controller/internal/utils/utils.go +++ b/backend/services/controller/internal/utils/utils.go @@ -1,9 +1,19 @@ package utils import ( + "github.com/google/uuid" + usp_msg "github.com/leandrofars/oktopus/internal/usp_message" + "github.com/leandrofars/oktopus/internal/usp_record" "net" ) +//Status are saved at database as numbers +const ( + Online = iota + Associating + Offline +) + // Get interfaces MACs, and the first interface MAC is gonna be used as mqtt clientId func GetMacAddr() ([]string, error) { ifas, err := net.Interfaces() @@ -19,3 +29,35 @@ func GetMacAddr() ([]string, error) { } return as, nil } + +func NewUspRecord(p []byte, toId string) usp_record.Record { + return usp_record.Record{ + Version: "0.1", + ToId: toId, + FromId: "leleco", + PayloadSecurity: usp_record.Record_PLAINTEXT, + RecordType: &usp_record.Record_NoSessionContext{ + NoSessionContext: &usp_record.NoSessionContextRecord{ + Payload: p, + }, + }, + } +} + +func NewGetMsg(getStuff usp_msg.Get) usp_msg.Msg { + return usp_msg.Msg{ + Header: &usp_msg.Header{ + MsgId: uuid.NewString(), + MsgType: usp_msg.Header_GET, + }, + Body: &usp_msg.Body{ + MsgBody: &usp_msg.Body_Request{ + Request: &usp_msg.Request{ + ReqType: &usp_msg.Request_Get{ + Get: &getStuff, + }, + }, + }, + }, + } +} diff --git a/backend/services/mochi/cmd/auth.json b/backend/services/mochi/cmd/auth.json index 65c72a9..f05253a 100644 --- a/backend/services/mochi/cmd/auth.json +++ b/backend/services/mochi/cmd/auth.json @@ -32,14 +32,16 @@ "username": "leandro", "filters": { "oktopus/+/agent/+": 1, - "oktopus/+/controller/+": 2 + "oktopus/+/controller/+": 2, + "oktopus/+/get/+": 2 } }, { "username": "steve", "filters": { "oktopus/+/agent/+": 1, - "oktopus/+/controller/+": 2 + "oktopus/+/controller/+": 2, + "oktopus/+/get/+": 2 } }, { diff --git a/backend/services/mochi/cmd/main.go b/backend/services/mochi/cmd/main.go index b91ce7c..db3a4b7 100644 --- a/backend/services/mochi/cmd/main.go +++ b/backend/services/mochi/cmd/main.go @@ -15,8 +15,10 @@ import ( "strings" "syscall" + rv8 "github.com/go-redis/redis/v8" "github.com/mochi-co/mqtt/v2" "github.com/mochi-co/mqtt/v2/hooks/auth" + "github.com/mochi-co/mqtt/v2/hooks/storage/redis" "github.com/mochi-co/mqtt/v2/listeners" ) @@ -34,6 +36,7 @@ var server = mqtt.New(&mqtt.Options{ func main() { tcpAddr := flag.String("tcp", ":1883", "network address for TCP listener") + redisAddr := flag.String("redis", "172.17.0.2:6379", "host address of redis db") wsAddr := flag.String("ws", "", "network address for Websocket listener") infoAddr := flag.String("info", "", "network address for web info dashboard listener") path := flag.String("path", "", "path to data auth file") @@ -98,6 +101,17 @@ func main() { log.Fatal(err) } + err = server.AddHook(new(redis.Hook), &redis.Options{ + Options: &rv8.Options{ + Addr: *redisAddr, // default redis address + Password: "", // your password + DB: 0, // your redis db + }, + }) + if err != nil { + log.Fatal(err) + } + go func() { err := server.Serve() if err != nil { @@ -123,6 +137,7 @@ func (h *MyHook) ID() string { func (h *MyHook) Provides(b byte) bool { return bytes.Contains([]byte{ mqtt.OnSubscribed, + mqtt.OnDisconnect, }, []byte{b}) } @@ -131,6 +146,26 @@ func (h *MyHook) Init(config any) error { return nil } +func (h *MyHook) Red(config any) error { + h.Log.Info().Msg("initialised") + return nil +} + +func (h *MyHook) OnDisconnect(cl *mqtt.Client, err error, expire bool) { + var clUser string + if len(cl.Properties.Props.User) > 0 { + clUser = cl.Properties.Props.User[0].Val + } + + if clUser != "" { + sn := strings.Split(clUser, "-") + err := server.Publish("oktopus/disconnect", []byte(sn[1]), false, 1) + if err != nil { + log.Println("server publish error: ", err) + } + } +} + func (h *MyHook) OnSubscribed(cl *mqtt.Client, pk packets.Packet, reasonCodes []byte) { // Verifies if it's a device who is subscribed if strings.Contains(pk.Filters[0].Filter, "oktopus/v1/agent") {