From f6d53f8e73ba4c181ff89ad513c9a345a92427d8 Mon Sep 17 00:00:00 2001 From: leandrofars Date: Fri, 27 Oct 2023 01:47:04 -0300 Subject: [PATCH] feat(controller): connect to stomp server --- backend/services/controller/.env | 7 +- .../services/controller/cmd/oktopus/main.go | 84 ++++++++++++++----- backend/services/controller/go.mod | 1 + backend/services/controller/go.sum | 2 + .../services/controller/internal/mqtt/mqtt.go | 2 +- .../services/controller/internal/mtp/mtp.go | 12 +-- 6 files changed, 81 insertions(+), 27 deletions(-) diff --git a/backend/services/controller/.env b/backend/services/controller/.env index 728200d..76615a5 100644 --- a/backend/services/controller/.env +++ b/backend/services/controller/.env @@ -10,4 +10,9 @@ BROKER_PASSWORD="" BROKER_CLIENTID="" BROKER_QOS="" REST_API_PORT="" -REST_API_CORS="" # addresses must be separated by commas example: "http://localhost:3000,http://myapp.com" \ No newline at end of file +REST_API_CORS="" # addresses must be separated by commas example: "http://localhost:3000,http://myapp.com" +STOMP_ADDR="" # example: localhost:61613 +STOMP_USERNAME="" +STOMP_PASSWORD="" +MQTT_DISABLE="" +STOMP_DISABLE="" \ No newline at end of file diff --git a/backend/services/controller/cmd/oktopus/main.go b/backend/services/controller/cmd/oktopus/main.go index 8bb9f0b..de354aa 100755 --- a/backend/services/controller/cmd/oktopus/main.go +++ b/backend/services/controller/cmd/oktopus/main.go @@ -15,10 +15,10 @@ import ( "github.com/joho/godotenv" "github.com/leandrofars/oktopus/internal/api" "github.com/leandrofars/oktopus/internal/db" - usp_msg "github.com/leandrofars/oktopus/internal/usp_message" - "github.com/leandrofars/oktopus/internal/mqtt" "github.com/leandrofars/oktopus/internal/mtp" + "github.com/leandrofars/oktopus/internal/stomp" + usp_msg "github.com/leandrofars/oktopus/internal/usp_message" ) const VERSION = "0.0.1" @@ -54,8 +54,8 @@ func main() { log.Println("Starting Oktopus Project TR-369 Controller Version:", VERSION) // fl_endpointId := flag.String("endpoint_id", "proto::oktopus-controller", "Defines the enpoint id the Agent must trust on.") - flDevicesTopic := flag.String("d", lookupEnvOrString("DEVICES_STATUS_TOPIC", "oktopus/+/status/+"), "That's the topic mqtt broker end new devices info.") - flSubTopic := flag.String("sub", lookupEnvOrString("DEVICE_PUB_TOPIC", "oktopus/+/controller/+"), "That's the topic agent must publish to, and the controller keeps on listening.") + flDevicesTopic := flag.String("d", lookupEnvOrString("DEVICES_STATUS_TOPIC", "oktopus/+/status/+"), "That's the topic mqtt broker send new devices info.") + flSubTopic := flag.String("sub", lookupEnvOrString("DEVICE_PUB_TOPIC", "oktopus/+/controller/+"), "That's the topic agent must publish to") flBrokerAddr := flag.String("a", lookupEnvOrString("BROKER_ADDR", "localhost"), "Mqtt broker adrress") flBrokerPort := flag.String("p", lookupEnvOrString("BROKER_PORT", "1883"), "Mqtt broker port") flTlsCert := flag.Bool("tls", lookupEnvOrBool("BROKER_TLS", false), "Connect to broker over TLS") @@ -65,6 +65,11 @@ func main() { flBrokerQos := flag.Int("q", lookupEnvOrInt("BROKER_QOS", 0), "Quality of service of mqtt messages delivery") flAddrDB := flag.String("mongo", lookupEnvOrString("MONGO_URI", "mongodb://localhost:27017"), "MongoDB URI") flApiPort := flag.String("ap", lookupEnvOrString("REST_API_PORT", "8000"), "Rest api port") + flStompAddr := flag.String("stomp", lookupEnvOrString("STOMP_ADDR", "127.0.0.1:61613"), "Stomp broker address") + flStompUser := flag.String("stomp_user", lookupEnvOrString("STOMP_USERNAME", ""), "Stomp broker username") + flStompPasswd := flag.String("stomp_passwd", lookupEnvOrString("STOMP_PASSWORD", ""), "Stomp broker password") + flDisableStomp := flag.Bool("stomp_disable", lookupEnvOrBool("STOMP_DISABLE", false), "Disable STOMP MTP") + flDisableMqtt := flag.Bool("mqtt_disable", lookupEnvOrBool("MQTT_DISABLE", false), "Disable MQTT MTP") flHelp := flag.Bool("help", false, "Help") flag.Parse() @@ -81,26 +86,65 @@ func main() { database := db.NewDatabase(ctx, *flAddrDB) apiMsgQueue := make(map[string](chan usp_msg.Msg)) var m sync.Mutex + /* If you want to use another message protocol just make it implement Broker interface. */ - mqttClient := mqtt.Mqtt{ - Addr: *flBrokerAddr, - Port: *flBrokerPort, - Id: *flBrokerClientId, - User: *flBrokerUsername, - Passwd: *flBrokerPassword, - Ctx: ctx, - QoS: *flBrokerQos, - SubTopic: *flSubTopic, - DevicesTopic: *flDevicesTopic, - TLS: *flTlsCert, - DB: database, - MsgQueue: apiMsgQueue, - QMutex: &m, + log.Println("Start MTP protocols: MQTT | Websockets | STOMP") + + if *flDisableMqtt && *flDisableStomp { + log.Println("ERROR: you have to enable at least one MTP") + os.Exit(0) } - mtp.MtpService(&mqttClient, done) + wg := new(sync.WaitGroup) + wg.Add(2) + + var stompClient stomp.Stomp + var mqttClient mqtt.Mqtt + + go func() { + mqttClient = mqtt.Mqtt{ + Addr: *flBrokerAddr, + Port: *flBrokerPort, + Id: *flBrokerClientId, + User: *flBrokerUsername, + Passwd: *flBrokerPassword, + Ctx: ctx, + QoS: *flBrokerQos, + SubTopic: *flSubTopic, + DevicesTopic: *flDevicesTopic, + TLS: *flTlsCert, + DB: database, + MsgQueue: apiMsgQueue, + QMutex: &m, + } + + if !*flDisableMqtt { + // MQTT will try connect to broker forever + go mtp.MtpService(&mqttClient, done, wg) + } else { + wg.Done() + } + }() + + go func() { + stompClient = stomp.Stomp{ + Addr: *flStompAddr, + Username: *flStompUser, + Password: *flStompPasswd, + } + + if !*flDisableStomp { + // STOMP will try to connect for a bunch of times and then exit + go mtp.MtpService(&stompClient, done, wg) + } else { + wg.Done() + } + }() + + wg.Wait() + a := api.NewApi(*flApiPort, database, &mqttClient, apiMsgQueue, &m) api.StartApi(a) @@ -133,7 +177,7 @@ func lookupEnvOrBool(key string, defaultVal bool) bool { if val, _ := os.LookupEnv(key); val != "" { v, err := strconv.ParseBool(val) if err != nil { - log.Fatalf("LookupEnvOrInt[%s]: %v", key, err) + log.Fatalf("LookupEnvOrBool[%s]: %v", key, err) } return v } diff --git a/backend/services/controller/go.mod b/backend/services/controller/go.mod index 7b3f785..8b2af54 100755 --- a/backend/services/controller/go.mod +++ b/backend/services/controller/go.mod @@ -17,6 +17,7 @@ require ( ) require ( + github.com/go-stomp/stomp v2.1.4+incompatible // indirect github.com/gofrs/uuid v4.0.0+incompatible // indirect github.com/golang/snappy v0.0.1 // indirect github.com/gomodule/redigo v1.8.4 // indirect diff --git a/backend/services/controller/go.sum b/backend/services/controller/go.sum index 44fe0f2..009aa33 100644 --- a/backend/services/controller/go.sum +++ b/backend/services/controller/go.sum @@ -5,6 +5,8 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumC github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/eclipse/paho.golang v0.10.0 h1:oUGPjRwWcZQRgDD9wVDV7y7i7yBSxts3vcvcNJo8B4Q= github.com/eclipse/paho.golang v0.10.0/go.mod h1:rhrV37IEwauUyx8FHrvmXOKo+QRKng5ncoN1vJiJMcs= +github.com/go-stomp/stomp v2.1.4+incompatible h1:D3SheUVDOz9RsjVWkoh/1iCOwD0qWjyeTZMUZ0EXg2Y= +github.com/go-stomp/stomp v2.1.4+incompatible/go.mod h1:VqCtqNZv1226A1/79yh+rMiFUcfY3R109np+7ke4n0c= github.com/gofrs/uuid v4.0.0+incompatible h1:1SD/1F5pU8p29ybwgQSwpQk+mwdRrXCYuPhW6m+TnJw= github.com/gofrs/uuid v4.0.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= diff --git a/backend/services/controller/internal/mqtt/mqtt.go b/backend/services/controller/internal/mqtt/mqtt.go index 62d60e8..4d0e3fd 100644 --- a/backend/services/controller/internal/mqtt/mqtt.go +++ b/backend/services/controller/internal/mqtt/mqtt.go @@ -127,7 +127,7 @@ func (m *Mqtt) Publish(msg []byte, topic, respTopic string, retain bool) { /* -------------------------------------------------------------------------- */ func (m *Mqtt) buildClientConfig(status, controller, apiMsg chan *paho.Publish) *paho.ClientConfig { - log.Println("Starting new mqtt client") + log.Println("Starting new MQTT client") singleHandler := paho.NewSingleHandlerRouter(func(p *paho.Publish) { if strings.Contains(p.Topic, "status") { status <- p diff --git a/backend/services/controller/internal/mtp/mtp.go b/backend/services/controller/internal/mtp/mtp.go index fc78e7d..584e853 100644 --- a/backend/services/controller/internal/mtp/mtp.go +++ b/backend/services/controller/internal/mtp/mtp.go @@ -4,11 +4,12 @@ package mtp import ( "log" "os" + "sync" ) /* - Message Transfer Protocol layer, which can use WebSockets, MQTT, COAP or STOMP; as defined in tr369 protocol. - It was made thinking in a broker architeture instead of a server-client p2p. +Message Transfer Protocol layer, which can use WebSockets, MQTT, COAP or STOMP; as defined in tr369 protocol. +It was made thinking in a broker architeture instead of a server-client p2p. */ type Broker interface { Connect() @@ -22,17 +23,18 @@ type Broker interface { //Request(msg []byte, msgType usp_msg.Header_MsgType, pubTopic string, subTopic string) } -// Not used, since we are using a broker solution with MQTT. +// Not used, since we are using a broker solution. type P2P interface { } // Start the service which enable the communication with IoTs (MTP protocol layer). -func MtpService(b Broker, done chan os.Signal) { +func MtpService(b Broker, done chan os.Signal, wg *sync.WaitGroup) { b.Connect() + wg.Done() go func() { for range done { b.Disconnect() - log.Println("Successfully disconnected to broker!") + log.Println("Successfully disconnected to MTPs!") // Receives signal and then replicates it to the rest of the app. done <- os.Interrupt