feat(controller): connect to stomp server

This commit is contained in:
leandrofars 2023-10-27 01:47:04 -03:00
parent c26cf8ebd0
commit f6d53f8e73
6 changed files with 81 additions and 27 deletions

View File

@ -10,4 +10,9 @@ BROKER_PASSWORD=""
BROKER_CLIENTID=""
BROKER_QOS=""
REST_API_PORT=""
REST_API_CORS="" # addresses must be separated by commas example: "http://localhost:3000,http://myapp.com"
REST_API_CORS="" # addresses must be separated by commas example: "http://localhost:3000,http://myapp.com"
STOMP_ADDR="" # example: localhost:61613
STOMP_USERNAME=""
STOMP_PASSWORD=""
MQTT_DISABLE=""
STOMP_DISABLE=""

View File

@ -15,10 +15,10 @@ import (
"github.com/joho/godotenv"
"github.com/leandrofars/oktopus/internal/api"
"github.com/leandrofars/oktopus/internal/db"
usp_msg "github.com/leandrofars/oktopus/internal/usp_message"
"github.com/leandrofars/oktopus/internal/mqtt"
"github.com/leandrofars/oktopus/internal/mtp"
"github.com/leandrofars/oktopus/internal/stomp"
usp_msg "github.com/leandrofars/oktopus/internal/usp_message"
)
const VERSION = "0.0.1"
@ -54,8 +54,8 @@ func main() {
log.Println("Starting Oktopus Project TR-369 Controller Version:", VERSION)
// fl_endpointId := flag.String("endpoint_id", "proto::oktopus-controller", "Defines the enpoint id the Agent must trust on.")
flDevicesTopic := flag.String("d", lookupEnvOrString("DEVICES_STATUS_TOPIC", "oktopus/+/status/+"), "That's the topic mqtt broker end new devices info.")
flSubTopic := flag.String("sub", lookupEnvOrString("DEVICE_PUB_TOPIC", "oktopus/+/controller/+"), "That's the topic agent must publish to, and the controller keeps on listening.")
flDevicesTopic := flag.String("d", lookupEnvOrString("DEVICES_STATUS_TOPIC", "oktopus/+/status/+"), "That's the topic mqtt broker send new devices info.")
flSubTopic := flag.String("sub", lookupEnvOrString("DEVICE_PUB_TOPIC", "oktopus/+/controller/+"), "That's the topic agent must publish to")
flBrokerAddr := flag.String("a", lookupEnvOrString("BROKER_ADDR", "localhost"), "Mqtt broker adrress")
flBrokerPort := flag.String("p", lookupEnvOrString("BROKER_PORT", "1883"), "Mqtt broker port")
flTlsCert := flag.Bool("tls", lookupEnvOrBool("BROKER_TLS", false), "Connect to broker over TLS")
@ -65,6 +65,11 @@ func main() {
flBrokerQos := flag.Int("q", lookupEnvOrInt("BROKER_QOS", 0), "Quality of service of mqtt messages delivery")
flAddrDB := flag.String("mongo", lookupEnvOrString("MONGO_URI", "mongodb://localhost:27017"), "MongoDB URI")
flApiPort := flag.String("ap", lookupEnvOrString("REST_API_PORT", "8000"), "Rest api port")
flStompAddr := flag.String("stomp", lookupEnvOrString("STOMP_ADDR", "127.0.0.1:61613"), "Stomp broker address")
flStompUser := flag.String("stomp_user", lookupEnvOrString("STOMP_USERNAME", ""), "Stomp broker username")
flStompPasswd := flag.String("stomp_passwd", lookupEnvOrString("STOMP_PASSWORD", ""), "Stomp broker password")
flDisableStomp := flag.Bool("stomp_disable", lookupEnvOrBool("STOMP_DISABLE", false), "Disable STOMP MTP")
flDisableMqtt := flag.Bool("mqtt_disable", lookupEnvOrBool("MQTT_DISABLE", false), "Disable MQTT MTP")
flHelp := flag.Bool("help", false, "Help")
flag.Parse()
@ -81,26 +86,65 @@ func main() {
database := db.NewDatabase(ctx, *flAddrDB)
apiMsgQueue := make(map[string](chan usp_msg.Msg))
var m sync.Mutex
/*
If you want to use another message protocol just make it implement Broker interface.
*/
mqttClient := mqtt.Mqtt{
Addr: *flBrokerAddr,
Port: *flBrokerPort,
Id: *flBrokerClientId,
User: *flBrokerUsername,
Passwd: *flBrokerPassword,
Ctx: ctx,
QoS: *flBrokerQos,
SubTopic: *flSubTopic,
DevicesTopic: *flDevicesTopic,
TLS: *flTlsCert,
DB: database,
MsgQueue: apiMsgQueue,
QMutex: &m,
log.Println("Start MTP protocols: MQTT | Websockets | STOMP")
if *flDisableMqtt && *flDisableStomp {
log.Println("ERROR: you have to enable at least one MTP")
os.Exit(0)
}
mtp.MtpService(&mqttClient, done)
wg := new(sync.WaitGroup)
wg.Add(2)
var stompClient stomp.Stomp
var mqttClient mqtt.Mqtt
go func() {
mqttClient = mqtt.Mqtt{
Addr: *flBrokerAddr,
Port: *flBrokerPort,
Id: *flBrokerClientId,
User: *flBrokerUsername,
Passwd: *flBrokerPassword,
Ctx: ctx,
QoS: *flBrokerQos,
SubTopic: *flSubTopic,
DevicesTopic: *flDevicesTopic,
TLS: *flTlsCert,
DB: database,
MsgQueue: apiMsgQueue,
QMutex: &m,
}
if !*flDisableMqtt {
// MQTT will try connect to broker forever
go mtp.MtpService(&mqttClient, done, wg)
} else {
wg.Done()
}
}()
go func() {
stompClient = stomp.Stomp{
Addr: *flStompAddr,
Username: *flStompUser,
Password: *flStompPasswd,
}
if !*flDisableStomp {
// STOMP will try to connect for a bunch of times and then exit
go mtp.MtpService(&stompClient, done, wg)
} else {
wg.Done()
}
}()
wg.Wait()
a := api.NewApi(*flApiPort, database, &mqttClient, apiMsgQueue, &m)
api.StartApi(a)
@ -133,7 +177,7 @@ func lookupEnvOrBool(key string, defaultVal bool) bool {
if val, _ := os.LookupEnv(key); val != "" {
v, err := strconv.ParseBool(val)
if err != nil {
log.Fatalf("LookupEnvOrInt[%s]: %v", key, err)
log.Fatalf("LookupEnvOrBool[%s]: %v", key, err)
}
return v
}

View File

@ -17,6 +17,7 @@ require (
)
require (
github.com/go-stomp/stomp v2.1.4+incompatible // indirect
github.com/gofrs/uuid v4.0.0+incompatible // indirect
github.com/golang/snappy v0.0.1 // indirect
github.com/gomodule/redigo v1.8.4 // indirect

View File

@ -5,6 +5,8 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumC
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/eclipse/paho.golang v0.10.0 h1:oUGPjRwWcZQRgDD9wVDV7y7i7yBSxts3vcvcNJo8B4Q=
github.com/eclipse/paho.golang v0.10.0/go.mod h1:rhrV37IEwauUyx8FHrvmXOKo+QRKng5ncoN1vJiJMcs=
github.com/go-stomp/stomp v2.1.4+incompatible h1:D3SheUVDOz9RsjVWkoh/1iCOwD0qWjyeTZMUZ0EXg2Y=
github.com/go-stomp/stomp v2.1.4+incompatible/go.mod h1:VqCtqNZv1226A1/79yh+rMiFUcfY3R109np+7ke4n0c=
github.com/gofrs/uuid v4.0.0+incompatible h1:1SD/1F5pU8p29ybwgQSwpQk+mwdRrXCYuPhW6m+TnJw=
github.com/gofrs/uuid v4.0.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=

View File

@ -127,7 +127,7 @@ func (m *Mqtt) Publish(msg []byte, topic, respTopic string, retain bool) {
/* -------------------------------------------------------------------------- */
func (m *Mqtt) buildClientConfig(status, controller, apiMsg chan *paho.Publish) *paho.ClientConfig {
log.Println("Starting new mqtt client")
log.Println("Starting new MQTT client")
singleHandler := paho.NewSingleHandlerRouter(func(p *paho.Publish) {
if strings.Contains(p.Topic, "status") {
status <- p

View File

@ -4,11 +4,12 @@ package mtp
import (
"log"
"os"
"sync"
)
/*
Message Transfer Protocol layer, which can use WebSockets, MQTT, COAP or STOMP; as defined in tr369 protocol.
It was made thinking in a broker architeture instead of a server-client p2p.
Message Transfer Protocol layer, which can use WebSockets, MQTT, COAP or STOMP; as defined in tr369 protocol.
It was made thinking in a broker architeture instead of a server-client p2p.
*/
type Broker interface {
Connect()
@ -22,17 +23,18 @@ type Broker interface {
//Request(msg []byte, msgType usp_msg.Header_MsgType, pubTopic string, subTopic string)
}
// Not used, since we are using a broker solution with MQTT.
// Not used, since we are using a broker solution.
type P2P interface {
}
// Start the service which enable the communication with IoTs (MTP protocol layer).
func MtpService(b Broker, done chan os.Signal) {
func MtpService(b Broker, done chan os.Signal, wg *sync.WaitGroup) {
b.Connect()
wg.Done()
go func() {
for range done {
b.Disconnect()
log.Println("Successfully disconnected to broker!")
log.Println("Successfully disconnected to MTPs!")
// Receives signal and then replicates it to the rest of the app.
done <- os.Interrupt