chore: create mqtt client connection #9

This commit is contained in:
Leandro Antônio Farias Machado 2023-03-09 22:00:12 -03:00
parent bddc6d3b13
commit 2d47a451c9
9 changed files with 172 additions and 24 deletions

2
.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
*.log
*.db

View File

@ -3,36 +3,60 @@
package main
import (
//"flag"
//"fmt"
//"github.com/leandrofars/oktopus/internal/usp_record"
//"github.com/leandrofars/oktopus/internal/usp_message"
//"github.com/golang/protobuf/proto"
"github.com/leandrofars/oktopus/internal/mqtt"
"flag"
"log"
//"os/exec"
"os"
"os/signal"
"syscall"
"github.com/eclipse/paho.golang/paho"
"github.com/leandrofars/oktopus/internal/mqtt"
)
func main() {
done := make(chan bool)
done := make(chan os.Signal, 1)
signal.Notify(done, syscall.SIGINT, syscall.SIGTERM)
log.Println("Starting Oktopus Project TR-369 Controller...")
log.Println("Starting Mosquitto Broker")
go mqtt.StartMqttBroker()
fl_broker := flag.Bool("mosquitto", false, "Defines if mosquitto container must run or not")
// fl_endpointId := flag.String("endpoint_id", "proto::oktopus-controller", "Defines the enpoint id the Agent must trust on.")
// fl_sub_topic := flag.String("sub_topic", "oktopus/v1/agent", "That's the topic agent must publish to, and the controller keeps on listening.")
// fl_pub_topic := flag.String("pub_topic", "oktopus/v1/controller", "That's the topic controller must publish to, and the agent keeps on listening.")
fl_broker_addr := flag.String("broker_addr", "localhost", "Mqtt broker adrress")
fl_broker_port := flag.String("broker_port", "1883", "Mqtt broker port")
fl_broker_username := flag.String("broker_user", "", "Mqtt broker username")
fl_broker_password := flag.String("password", "", "Mqtt broker password")
fl_broker_clientid := flag.String("clientid", "", "A clientid for the Mqtt connection")
fl_help := flag.Bool("help", false, "Help")
//TODO: Create more options to set using flags
//TODO: Read user inputs
flag.Parse()
// usp_record.Record{
// Version: "1.0",
// ToId: "os::4851CF-000000000002",
// FromId: "leleco",
// PayloadSecurity: usp_record.Record_PLAINTEXT,
// RecordType: &usp_record.Record_NoSessionContext{
// NoSessionContext: &usp_record.NoSessionContextRecord{
// Payload: []byte("payload"),
// },
// },
// }
if *fl_help {
flag.Usage()
os.Exit(0)
}
if *fl_broker {
log.Println("Starting Mqtt Broker")
mqtt.StartMqttBroker()
}
newClient := mqtt.StartMqttClient(fl_broker_addr, fl_broker_port)
newConnection := mqtt.StartNewConnection(*fl_broker_clientid, *fl_broker_username, *fl_broker_password)
mqtt.ConnectMqttBroker(newClient, newConnection, fl_broker_addr)
<-done
log.Println("Disconnecting broker")
if newClient != nil {
d := &paho.Disconnect{ReasonCode: 0}
err := newClient.Disconnect(d)
if err != nil {
log.Fatalf("failed to send Disconnect: %s", err)
}
}
log.Println(" Oktopus is out!")
}

7
go.mod
View File

@ -1,3 +1,10 @@
module github.com/leandrofars/oktopus
go 1.18
require google.golang.org/protobuf v1.28.1
require (
github.com/eclipse/paho.golang v0.10.0 // indirect
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a // indirect
)

19
go.sum Normal file
View File

@ -0,0 +1,19 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eclipse/paho.golang v0.10.0 h1:oUGPjRwWcZQRgDD9wVDV7y7i7yBSxts3vcvcNJo8B4Q=
github.com/eclipse/paho.golang v0.10.0/go.mod h1:rhrV37IEwauUyx8FHrvmXOKo+QRKng5ncoN1vJiJMcs=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a h1:DcqTD9SDLc+1P/r1EmRBwnVsrOwW+kk2vWf9n+1sGhs=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w=
google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@ -8,3 +8,5 @@ services:
- ./mosquitto/config:/mosquitto/config
- ./mosquitto/data:/mosquitto/data
- ./mosquitto/log:/mosquitto/log
ports:
- 1883:1883

View File

@ -2,4 +2,5 @@ allow_anonymous true
listener 1883
persistence true
persistence_location /mosquitto/data
connection_messages true
log_dest file /mosquitto/log/mosquitto.log

View File

@ -0,0 +1,66 @@
package mqtt
import (
"context"
"log"
"net"
"github.com/eclipse/paho.golang/paho"
"github.com/leandrofars/oktopus/internal/utils"
)
func StartMqttClient(addr, port *string) *paho.Client {
conn, err := net.Dial("tcp", *addr+":"+*port)
if err != nil {
log.Fatal(err)
}
clientConfig := paho.ClientConfig{
Conn: conn,
}
return paho.NewClient(clientConfig)
}
func StartNewConnection(id, user, pass string) paho.Connect {
connParameters := paho.Connect{
KeepAlive: 30,
ClientID: id,
CleanStart: true,
Username: user,
Password: []byte(pass),
}
if id != "" {
connParameters.ClientID = id
} else {
mac, err := utils.GetMacAddr()
if err != nil {
log.Fatal(err)
}
connParameters.ClientID = mac[0]
}
if user != "" {
connParameters.UsernameFlag = true
}
if pass != "" {
connParameters.PasswordFlag = true
}
return connParameters
}
func ConnectMqttBroker(c *paho.Client, cp paho.Connect, addr *string) {
conn, err := c.Connect(context.Background(), &cp)
if err != nil {
log.Fatal(err)
}
if conn.ReasonCode != 0 {
log.Fatalf("Failed to connect to %s : %d - %s", *addr, conn.ReasonCode, conn.Properties.ReasonString)
}
}

View File

@ -10,9 +10,13 @@ import (
"os/exec"
)
// Get Mqtt Broker up and running
func StartMqttBroker() {
//TODO: Start Container through Docker SDK for GO, eliminating docker-compose and shell comands.
//TODO: Create Broker with user, password and CA certificate.
//TODO: Set broker access control list to topics.
//TODO: Set MQTTv5 CONNACK packet with topic for agent to use.
cmd := exec.Command("sudo", "docker", "compose", "-f", "internal/mqtt/docker-compose.yml", "up", "-d")
@ -22,4 +26,6 @@ func StartMqttBroker() {
log.Fatal(err.Error())
return
}
log.Println("Broker Mqtt Up and Running!")
}

21
internal/utils/utils.go Normal file
View File

@ -0,0 +1,21 @@
package utils
import (
"net"
)
// Get interfaces MACs, and the first interface MAC is gonna be used as mqtt clientId
func GetMacAddr() ([]string, error) {
ifas, err := net.Interfaces()
if err != nil {
return nil, err
}
var as []string
for _, ifa := range ifas {
a := ifa.HardwareAddr.String()
if a != "" {
as = append(as, a)
}
}
return as, nil
}