feat: save device status at database + add mqtt persistence at redis #30

This commit is contained in:
Leandro Antônio Farias Machado 2023-04-23 13:27:41 -03:00
parent 2d7da446fc
commit 3f28344ae3
6 changed files with 114 additions and 28 deletions

View File

@ -29,6 +29,7 @@ func main() {
log.Println("Starting Oktopus Project TR-369 Controller Version:", VERSION)
// fl_endpointId := flag.String("endpoint_id", "proto::oktopus-controller", "Defines the enpoint id the Agent must trust on.")
flDevicesTopic := flag.String("d", "oktopus/devices", "That's the topic mqtt broker end new devices info.")
flDisconTopic := flag.String("dis", "oktopus/disconnect", "It's where disconnected IoTs are known.")
flSubTopic := flag.String("sub", "oktopus/+/controller/+", "That's the topic agent must publish to, and the controller keeps on listening.")
flBrokerAddr := flag.String("a", "localhost", "Mqtt broker adrress")
flBrokerPort := flag.String("p", "1883", "Mqtt broker port")
@ -57,17 +58,18 @@ func main() {
If you want to use another message protocol just make it implement Broker interface.
*/
mqttClient := mqtt.Mqtt{
Addr: *flBrokerAddr,
Port: *flBrokerPort,
Id: *flBrokerClientId,
User: *flBrokerUsername,
Passwd: *flBrokerPassword,
Ctx: ctx,
QoS: *flBrokerQos,
SubTopic: *flSubTopic,
DevicesTopic: *flDevicesTopic,
CA: *flTlsCert,
DB: database,
Addr: *flBrokerAddr,
Port: *flBrokerPort,
Id: *flBrokerClientId,
User: *flBrokerUsername,
Passwd: *flBrokerPassword,
Ctx: ctx,
QoS: *flBrokerQos,
SubTopic: *flSubTopic,
DevicesTopic: *flDevicesTopic,
DisconnectTopic: *flDisconTopic,
CA: *flTlsCert,
DB: database,
}
mtp.MtpService(&mqttClient, done)

View File

@ -13,6 +13,7 @@ type Device struct {
Customer string
Vendor string
Version string
Status uint8
}
func (d *Database) CreateDevice(device Device) error {

View File

@ -0,0 +1,21 @@
package db
import (
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"log"
)
func (d *Database) UpdateStatus(sn string, status uint8) error {
var result bson.M
err := d.devices.FindOneAndUpdate(d.ctx, bson.D{{"sn", sn}}, bson.D{{"$set", bson.D{{"status", status}}}}).Decode(&result)
if err != nil {
if err == mongo.ErrNoDocuments {
log.Printf("Device %s is not mapped into database", sn)
return nil
}
log.Println(err)
}
log.Printf("%s is now offline.", sn)
return err
}

View File

@ -18,17 +18,18 @@ import (
)
type Mqtt struct {
Addr string
Port string
Id string
User string
Passwd string
Ctx context.Context
QoS int
SubTopic string
DevicesTopic string
CA string
DB db.Database
Addr string
Port string
Id string
User string
Passwd string
Ctx context.Context
QoS int
SubTopic string
DevicesTopic string
DisconnectTopic string
CA string
DB db.Database
}
var c *paho.Client
@ -38,8 +39,9 @@ var c *paho.Client
func (m *Mqtt) Connect() {
devices := make(chan *paho.Publish)
controller := make(chan *paho.Publish)
go m.messageHandler(devices, controller)
clientConfig := m.startClient(devices, controller)
disconnect := make(chan *paho.Publish)
go m.messageHandler(devices, controller, disconnect)
clientConfig := m.startClient(devices, controller, disconnect)
connParameters := startConnection(m.Id, m.User, m.Passwd)
conn, err := clientConfig.Connect(m.Ctx, &connParameters)
@ -67,8 +69,9 @@ func (m *Mqtt) Disconnect() {
func (m *Mqtt) Subscribe() {
if _, err := c.Subscribe(m.Ctx, &paho.Subscribe{
Subscriptions: map[string]paho.SubscribeOptions{
m.SubTopic: {QoS: byte(m.QoS), NoLocal: true},
m.DevicesTopic: {QoS: byte(m.QoS), NoLocal: true},
m.SubTopic: {QoS: byte(m.QoS), NoLocal: true},
m.DevicesTopic: {QoS: byte(m.QoS), NoLocal: true},
m.DisconnectTopic: {QoS: byte(m.QoS), NoLocal: true},
},
}); err != nil {
log.Fatalln(err)
@ -76,6 +79,8 @@ func (m *Mqtt) Subscribe() {
log.Printf("Subscribed to %s", m.SubTopic)
log.Printf("Subscribed to %s", m.DevicesTopic)
log.Printf("Subscribed to %s", m.DisconnectTopic)
}
func (m *Mqtt) Publish(msg []byte, topic, respTopic string) {
@ -96,12 +101,14 @@ func (m *Mqtt) Publish(msg []byte, topic, respTopic string) {
/* -------------------------------------------------------------------------- */
func (m *Mqtt) startClient(devices, controller chan *paho.Publish) *paho.Client {
func (m *Mqtt) startClient(devices, controller, disconnect chan *paho.Publish) *paho.Client {
singleHandler := paho.NewSingleHandlerRouter(func(p *paho.Publish) {
if p.Topic == m.DevicesTopic {
devices <- p
} else if strings.Contains(p.Topic, "controller") {
controller <- p
} else if p.Topic == m.DisconnectTopic {
disconnect <- p
} else {
log.Println("No handler for topic: ", p.Topic)
}
@ -210,7 +217,7 @@ func startConnection(id, user, pass string) paho.Connect {
return connParameters
}
func (m *Mqtt) messageHandler(devices, controller chan *paho.Publish) {
func (m *Mqtt) messageHandler(devices, controller, disconnect chan *paho.Publish) {
for {
select {
case d := <-devices:
@ -219,6 +226,10 @@ func (m *Mqtt) messageHandler(devices, controller chan *paho.Publish) {
m.handleNewDevice(payload)
case c := <-controller:
m.handleDevicesResponse(c.Payload)
case dis := <-disconnect:
payload := string(dis.Payload)
log.Println("Device disconnected: ", payload)
m.handleDevicesDisconnect(payload)
}
}
}
@ -287,9 +298,18 @@ func (m *Mqtt) handleDevicesResponse(p []byte) {
device.Model = msg.ReqPathResults[1].ResolvedPathResults[0].ResultParams["ModelName"]
device.Version = msg.ReqPathResults[2].ResolvedPathResults[0].ResultParams["SoftwareVersion"]
device.SN = msg.ReqPathResults[3].ResolvedPathResults[0].ResultParams["SerialNumber"]
device.Status = utils.Online
err = m.DB.CreateDevice(device)
if err != nil {
log.Fatal(err)
}
}
func (m *Mqtt) handleDevicesDisconnect(p string) {
// Update status of device at database
err := m.DB.UpdateStatus(p, utils.Offline)
if err != nil {
log.Fatal(err)
}
}

View File

@ -4,6 +4,13 @@ import (
"net"
)
//Status are saved at database as numbers
const (
Online = iota
Associating
Offline
)
// Get interfaces MACs, and the first interface MAC is gonna be used as mqtt clientId
func GetMacAddr() ([]string, error) {
ifas, err := net.Interfaces()

View File

@ -15,8 +15,10 @@ import (
"strings"
"syscall"
rv8 "github.com/go-redis/redis/v8"
"github.com/mochi-co/mqtt/v2"
"github.com/mochi-co/mqtt/v2/hooks/auth"
"github.com/mochi-co/mqtt/v2/hooks/storage/redis"
"github.com/mochi-co/mqtt/v2/listeners"
)
@ -34,6 +36,7 @@ var server = mqtt.New(&mqtt.Options{
func main() {
tcpAddr := flag.String("tcp", ":1883", "network address for TCP listener")
redisAddr := flag.String("redis", "172.17.0.2:6379", "host address of redis db")
wsAddr := flag.String("ws", "", "network address for Websocket listener")
infoAddr := flag.String("info", "", "network address for web info dashboard listener")
path := flag.String("path", "", "path to data auth file")
@ -98,6 +101,17 @@ func main() {
log.Fatal(err)
}
err = server.AddHook(new(redis.Hook), &redis.Options{
Options: &rv8.Options{
Addr: *redisAddr, // default redis address
Password: "", // your password
DB: 0, // your redis db
},
})
if err != nil {
log.Fatal(err)
}
go func() {
err := server.Serve()
if err != nil {
@ -123,6 +137,7 @@ func (h *MyHook) ID() string {
func (h *MyHook) Provides(b byte) bool {
return bytes.Contains([]byte{
mqtt.OnSubscribed,
mqtt.OnDisconnect,
}, []byte{b})
}
@ -131,6 +146,26 @@ func (h *MyHook) Init(config any) error {
return nil
}
func (h *MyHook) Red(config any) error {
h.Log.Info().Msg("initialised")
return nil
}
func (h *MyHook) OnDisconnect(cl *mqtt.Client, err error, expire bool) {
var clUser string
if len(cl.Properties.Props.User) > 0 {
clUser = cl.Properties.Props.User[0].Val
}
if clUser != "" {
sn := strings.Split(clUser, "-")
err := server.Publish("oktopus/disconnect", []byte(sn[1]), false, 1)
if err != nil {
log.Println("server publish error: ", err)
}
}
}
func (h *MyHook) OnSubscribed(cl *mqtt.Client, pk packets.Packet, reasonCodes []byte) {
// Verifies if it's a device who is subscribed
if strings.Contains(pk.Filters[0].Filter, "oktopus/v1/agent") {