feat(controller): send publish to IoT and watch new connections

This commit is contained in:
Leandro Antônio Farias Machado 2023-03-28 11:49:10 -03:00
parent f10eda9683
commit f816e19f75
4 changed files with 133 additions and 39 deletions

View File

@ -25,10 +25,9 @@ func main() {
log.SetFlags(log.LstdFlags | log.Lshortfile)
log.Println("Starting Oktopus Project TR-369 Controller Version:", VERSION)
//flBroker := flag.Bool("m", false, "Defines if mosquitto container must run or not")
// fl_endpointId := flag.String("endpoint_id", "proto::oktopus-controller", "Defines the enpoint id the Agent must trust on.")
flSubTopic := flag.String("s", "oktopus/+/agent/+", "That's the topic agent must publish to, and the controller keeps on listening.")
// fl_pub_topic := flag.String("pub_topic", "oktopus/v1/controller", "That's the topic controller must publish to, and the agent keeps on listening.")
flDevicesTopic := flag.String("d", "oktopus/devices", "That's the topic mqtt broker end new devices info.")
flSubTopic := flag.String("sub", "oktopus/+/controller/+", "That's the topic agent must publish to, and the controller keeps on listening.")
flBrokerAddr := flag.String("a", "localhost", "Mqtt broker adrress")
flBrokerPort := flag.String("p", "1883", "Mqtt broker port")
flTlsCert := flag.String("ca", "", "TLS ca certificate")
@ -44,10 +43,6 @@ func main() {
flag.Usage()
os.Exit(0)
}
//if *flBroker {
// log.Println("Starting Mqtt Broker")
// mqtt.StartMqttBroker()
//}
/*
This context suppress our needs, but we can use a more sofisticate
approach with cancel and timeout options passing it through paho mqtt functions.
@ -58,15 +53,16 @@ func main() {
If you want to use another message protocol just make it implement Broker interface.
*/
mqttClient := mqtt.Mqtt{
Addr: *flBrokerAddr,
Port: *flBrokerPort,
Id: *flBrokerClientId,
User: *flBrokerUsername,
Passwd: *flBrokerPassword,
Ctx: ctx,
QoS: *flBrokerQos,
SubTopic: *flSubTopic,
CA: *flTlsCert,
Addr: *flBrokerAddr,
Port: *flBrokerPort,
Id: *flBrokerClientId,
User: *flBrokerUsername,
Passwd: *flBrokerPassword,
Ctx: ctx,
QoS: *flBrokerQos,
SubTopic: *flSubTopic,
DevicesTopic: *flDevicesTopic,
CA: *flTlsCert,
}
mtp.MtpService(&mqttClient, done)

View File

@ -4,25 +4,31 @@ import (
"context"
"crypto/tls"
"crypto/x509"
usp_msg "github.com/leandrofars/oktopus/internal/usp_message"
"github.com/leandrofars/oktopus/internal/usp_record"
"google.golang.org/protobuf/proto"
"io/ioutil"
"log"
"net"
"strings"
"sync"
"time"
"github.com/eclipse/paho.golang/paho"
"github.com/leandrofars/oktopus/internal/utils"
)
type Mqtt struct {
Addr string
Port string
Id string
User string
Passwd string
Ctx context.Context
QoS int
SubTopic string
CA string
Addr string
Port string
Id string
User string
Passwd string
Ctx context.Context
QoS int
SubTopic string
DevicesTopic string
CA string
}
var c *paho.Client
@ -30,15 +36,20 @@ var c *paho.Client
/* ------------------- Implementations of broker interface ------------------ */
func (m *Mqtt) Connect() {
msgChan := make(chan *paho.Publish)
go messageHandler(msgChan)
clientConfig := startClient(m.Addr, m.Port, m.CA, m.Ctx, msgChan)
devices := make(chan *paho.Publish)
controller := make(chan *paho.Publish)
go m.messageHandler(devices, controller)
clientConfig := m.startClient(devices, controller)
connParameters := startConnection(m.Id, m.User, m.Passwd)
conn, err := clientConfig.Connect(m.Ctx, &connParameters)
if err != nil {
log.Println(err)
}
if conn.ReasonCode != 0 {
log.Fatalf("Failed to connect to %s : %d - %s", m.Addr+m.Port, conn.ReasonCode, conn.Properties.ReasonString)
}
// Sets global client to be used by other mqtt functions
c = clientConfig
@ -60,24 +71,48 @@ func (m *Mqtt) Disconnect() {
func (m *Mqtt) Subscribe() {
if _, err := c.Subscribe(m.Ctx, &paho.Subscribe{
Subscriptions: map[string]paho.SubscribeOptions{
m.SubTopic: {QoS: byte(m.QoS), NoLocal: true},
m.SubTopic: {QoS: byte(m.QoS), NoLocal: true},
m.DevicesTopic: {QoS: byte(m.QoS), NoLocal: true},
},
}); err != nil {
log.Fatalln(err)
}
log.Printf("Subscribed to %s", m.SubTopic)
log.Printf("Subscribed to %s", m.DevicesTopic)
}
func (m *Mqtt) Publish(msg []byte, topic, respTopic string) {
if _, err := c.Publish(context.Background(), &paho.Publish{
Topic: topic,
QoS: byte(m.QoS),
Retain: false,
Payload: msg,
Properties: &paho.PublishProperties{
ResponseTopic: respTopic,
},
}); err != nil {
log.Println("error sending message:", err)
}
log.Printf("Published to %s", topic)
}
/* -------------------------------------------------------------------------- */
func startClient(addr string, port string, tlsCa string, ctx context.Context, msgChan chan *paho.Publish) *paho.Client {
singleHandler := paho.NewSingleHandlerRouter(func(m *paho.Publish) {
msgChan <- m
func (m *Mqtt) startClient(devices, controller chan *paho.Publish) *paho.Client {
singleHandler := paho.NewSingleHandlerRouter(func(p *paho.Publish) {
if p.Topic == m.DevicesTopic {
devices <- p
} else if strings.Contains(p.Topic, "controller") {
controller <- p
} else {
log.Println("No handler for topic: ", p.Topic)
}
})
if tlsCa != "" {
conn := connWithTls(tlsCa, addr+":"+port, ctx)
if m.CA != "" {
conn := connWithTls(m.CA, m.Addr+":"+m.Port, m.Ctx)
clientConfig := paho.ClientConfig{
Conn: conn,
Router: singleHandler,
@ -91,7 +126,7 @@ func startClient(addr string, port string, tlsCa string, ctx context.Context, ms
return paho.NewClient(clientConfig)
}
conn, err := net.Dial("tcp", addr+":"+port)
conn, err := net.Dial("tcp", m.Addr+":"+m.Port)
if err != nil {
log.Println(err)
}
@ -179,8 +214,71 @@ func startConnection(id, user, pass string) paho.Connect {
return connParameters
}
func messageHandler(msg chan *paho.Publish) {
for m := range msg {
log.Println("Received message:", string(m.Payload))
func (m *Mqtt) messageHandler(devices, controller chan *paho.Publish) {
for {
select {
case d := <-devices:
payload := string(d.Payload)
log.Println("New device: ", payload)
m.handleNewDevice(payload)
case c := <-controller:
m.handleDevicesResponse(c.Payload)
}
}
}
func (m *Mqtt) handleNewDevice(deviceMac string) {
payload := usp_msg.Msg{
Header: &usp_msg.Header{
MsgId: "uniqueIdentifierForThismessage",
MsgType: usp_msg.Header_GET,
},
Body: &usp_msg.Body{
MsgBody: &usp_msg.Body_Request{
Request: &usp_msg.Request{
ReqType: &usp_msg.Request_Get{
Get: &usp_msg.Get{
ParamPaths: []string{"Device.DeviceInfo."},
MaxDepth: 1,
},
},
},
},
},
}
teste, _ := proto.Marshal(&payload)
record := usp_record.Record{
Version: "0.1",
ToId: deviceMac,
FromId: "leleco",
PayloadSecurity: usp_record.Record_PLAINTEXT,
RecordType: &usp_record.Record_NoSessionContext{
NoSessionContext: &usp_record.NoSessionContextRecord{
Payload: teste,
},
},
}
tr369Message, err := proto.Marshal(&record)
if err != nil {
log.Fatalln("Failed to encode address book:", err)
}
time.Sleep(5 * time.Second)
m.Publish(tr369Message, "oktopus/v1/agent/"+deviceMac, "oktopus/v1/controller/"+deviceMac)
}
func (m *Mqtt) handleDevicesResponse(p []byte) {
var record usp_record.Record
var message usp_msg.Msg
err := proto.Unmarshal(p, &record)
if err != nil {
log.Fatal(err)
}
err = proto.Unmarshal(record.GetNoSessionContext().Payload, &message)
if err != nil {
log.Fatal(err)
}
log.Printf("Received a usp_message: %s\n", message.String())
}

View File

@ -13,7 +13,7 @@ import (
type Broker interface {
Connect()
Disconnect()
// Publish()
Publish(msg []byte, topic, respTopic string)
Subscribe()
}

Binary file not shown.

After

Width:  |  Height:  |  Size: 42 KiB