From 26c523fc10c38a58887db8dd2614973922b592b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Leandro=20Ant=C3=B4nio=20Farias=20Machado?= Date: Sat, 18 Mar 2023 00:49:15 -0300 Subject: [PATCH] feat(mqtt): connection with user, password, and acl list --- backend/.gitignore | 5 +- backend/cmd/oktopus/main.go | 46 ++++++++++--------- .../internal/mosquitto/config/mosquitto.conf | 17 +++++++ .../{mqtt => }/mosquitto/creds/doc.md | 0 backend/internal/mosquitto/docker-compose.yml | 16 +++++++ backend/internal/mqtt/docker-compose.yml | 13 ------ .../mqtt/mosquitto/config/mosquitto.conf | 12 ----- backend/internal/mqtt/mqtt-client.go | 31 +++++++++---- backend/internal/mqtt/mqtt-server.go | 2 +- 9 files changed, 86 insertions(+), 56 deletions(-) create mode 100755 backend/internal/mosquitto/config/mosquitto.conf rename backend/internal/{mqtt => }/mosquitto/creds/doc.md (100%) create mode 100755 backend/internal/mosquitto/docker-compose.yml delete mode 100755 backend/internal/mqtt/docker-compose.yml delete mode 100755 backend/internal/mqtt/mosquitto/config/mosquitto.conf diff --git a/backend/.gitignore b/backend/.gitignore index 555aa08..2798a8a 100644 --- a/backend/.gitignore +++ b/backend/.gitignore @@ -12,4 +12,7 @@ go.work *.crt *.key *.csr -*.db.new \ No newline at end of file +*.db.new +*.txt +*.pwd +*.acl \ No newline at end of file diff --git a/backend/cmd/oktopus/main.go b/backend/cmd/oktopus/main.go index 3d6c14f..f201dbb 100755 --- a/backend/cmd/oktopus/main.go +++ b/backend/cmd/oktopus/main.go @@ -22,32 +22,34 @@ func main() { // Locks app running until it receives a stop command as Ctrl+C. signal.Notify(done, syscall.SIGINT, syscall.SIGTERM) + log.SetFlags(log.LstdFlags | log.Lshortfile) + log.Println("Starting Oktopus Project TR-369 Controller Version:", VERSION) - fl_broker := flag.Bool("m", false, "Defines if mosquitto container must run or not") + flBroker := flag.Bool("m", false, "Defines if mosquitto container must run or not") // fl_endpointId := flag.String("endpoint_id", "proto::oktopus-controller", "Defines the enpoint id the Agent must trust on.") - fl_sub_topic := flag.String("s", "oktopus/v1/agent", "That's the topic agent must publish to, and the controller keeps on listening.") + flSubTopic := flag.String("s", "oktopus/+/+/agent", "That's the topic agent must publish to, and the controller keeps on listening.") // fl_pub_topic := flag.String("pub_topic", "oktopus/v1/controller", "That's the topic controller must publish to, and the agent keeps on listening.") - fl_broker_addr := flag.String("a", "localhost", "Mqtt broker adrress") - fl_broker_port := flag.String("p", "1883", "Mqtt broker port") - fl_tls_cert := flag.String("ca", "", "TLS ca certificate") - fl_broker_username := flag.String("u", "", "Mqtt broker username") - fl_broker_password := flag.String("P", "", "Mqtt broker password") - fl_broker_clientid := flag.String("i", "", "A clientid for the Mqtt connection") - fl_broker_qos := flag.Int("q", 2, "Quality of service of mqtt messages delivery") - fl_help := flag.Bool("help", false, "Help") + flBrokerAddr := flag.String("a", "localhost", "Mqtt broker adrress") + flBrokerPort := flag.String("p", "1883", "Mqtt broker port") + flTlsCert := flag.String("ca", "", "TLS ca certificate") + flBrokerUsername := flag.String("u", "", "Mqtt broker username") + flBrokerPassword := flag.String("P", "", "Mqtt broker password") + flBrokerClientId := flag.String("i", "", "A clientid for the Mqtt connection") + flBrokerQos := flag.Int("q", 2, "Quality of service of mqtt messages delivery") + flHelp := flag.Bool("help", false, "Help") flag.Parse() - if *fl_help { + if *flHelp { flag.Usage() os.Exit(0) } - if *fl_broker { + if *flBroker { log.Println("Starting Mqtt Broker") mqtt.StartMqttBroker() } /* - This context suppress our needs, but we can use a more sofisticade + This context suppress our needs, but we can use a more sofisticate approach with cancel and timeout options passing it through paho mqtt functions. */ ctx := context.Background() @@ -56,17 +58,19 @@ func main() { If you want to use another message protocol just make it implement Broker interface. */ mqttClient := mqtt.Mqtt{ - Addr: *fl_broker_addr, - Port: *fl_broker_port, - Id: *fl_broker_clientid, - User: *fl_broker_username, - Passwd: *fl_broker_password, + Addr: *flBrokerAddr, + Port: *flBrokerPort, + Id: *flBrokerClientId, + User: *flBrokerUsername, + Passwd: *flBrokerPassword, Ctx: ctx, - QoS: *fl_broker_qos, - SubTopic: *fl_sub_topic, - CA: *fl_tls_cert, + QoS: *flBrokerQos, + SubTopic: *flSubTopic, + CA: *flTlsCert, } + log.Println() + mtp.MtpService(&mqttClient, done) <-done diff --git a/backend/internal/mosquitto/config/mosquitto.conf b/backend/internal/mosquitto/config/mosquitto.conf new file mode 100755 index 0000000..e63b441 --- /dev/null +++ b/backend/internal/mosquitto/config/mosquitto.conf @@ -0,0 +1,17 @@ +per_listener_settings true + +listener 1883 +allow_anonymous false +persistence true +connection_messages true +log_dest file /mosquitto/log/mosquitto.log +persistence_location /mosquitto/data +password_file /mosquitto/passwd/passwd.pwd +acl_file /mosquitto/acl/acl.acl + +# listener 8883 +# cafile /mosquitto/creds/ca.crt +# certfile /mosquitto/creds/server.crt +# keyfile /mosquitto/creds/server.key + + diff --git a/backend/internal/mqtt/mosquitto/creds/doc.md b/backend/internal/mosquitto/creds/doc.md similarity index 100% rename from backend/internal/mqtt/mosquitto/creds/doc.md rename to backend/internal/mosquitto/creds/doc.md diff --git a/backend/internal/mosquitto/docker-compose.yml b/backend/internal/mosquitto/docker-compose.yml new file mode 100755 index 0000000..a018694 --- /dev/null +++ b/backend/internal/mosquitto/docker-compose.yml @@ -0,0 +1,16 @@ +services: + broker: + image: eclipse-mosquitto + container_name: mosquittto_broker + network_mode: host + user: 1000:1000 + volumes: + - ./config:/mosquitto/config + - ./data:/mosquitto/data + - ./log:/mosquitto/log + - ./creds:/mosquitto/creds + - ./passwd.pwd:/mosquitto/passwd.pwd + - ./acl:/mosquitto/acl + ports: + - 8883:8883 + - 1883:1883 \ No newline at end of file diff --git a/backend/internal/mqtt/docker-compose.yml b/backend/internal/mqtt/docker-compose.yml deleted file mode 100755 index 3aa7f56..0000000 --- a/backend/internal/mqtt/docker-compose.yml +++ /dev/null @@ -1,13 +0,0 @@ -services: - broker: - image: eclipse-mosquitto - container_name: mosquittto_broker - network_mode: host - user: 1000:1000 - volumes: - - ./mosquitto/config:/mosquitto/config - - ./mosquitto/data:/mosquitto/data - - ./mosquitto/log:/mosquitto/log - - ./mosquitto/creds:/mosquitto/creds - ports: - - 8883:8883 \ No newline at end of file diff --git a/backend/internal/mqtt/mosquitto/config/mosquitto.conf b/backend/internal/mqtt/mosquitto/config/mosquitto.conf deleted file mode 100755 index 399e22c..0000000 --- a/backend/internal/mqtt/mosquitto/config/mosquitto.conf +++ /dev/null @@ -1,12 +0,0 @@ -allow_anonymous true -listener 8883 -persistence true -persistence_location /mosquitto/data -connection_messages true -log_dest file /mosquitto/log/mosquitto.log -cafile /mosquitto/creds/ca.crt -certfile /mosquitto/creds/server.crt -keyfile /mosquitto/creds/server.key - -# require_certificate true -# use_identity_as_username true \ No newline at end of file diff --git a/backend/internal/mqtt/mqtt-client.go b/backend/internal/mqtt/mqtt-client.go index ee41f2a..cad2fe2 100644 --- a/backend/internal/mqtt/mqtt-client.go +++ b/backend/internal/mqtt/mqtt-client.go @@ -30,12 +30,14 @@ var c *paho.Client /* ------------------- Implementations of broker interface ------------------ */ func (m *Mqtt) Connect() { - clientConfig := startClient(m.Addr, m.Port, m.CA, m.Ctx) + msgChan := make(chan *paho.Publish) + go messageHandler(msgChan) + clientConfig := startClient(m.Addr, m.Port, m.CA, m.Ctx, msgChan) connParameters := startConnection(m.Id, m.User, m.Passwd) conn, err := clientConfig.Connect(m.Ctx, &connParameters) if err != nil { - log.Fatal(err) + log.Println(err) } // Sets global client to be used by other mqtt functions c = clientConfig @@ -69,12 +71,16 @@ func (m *Mqtt) Subscribe() { /* -------------------------------------------------------------------------- */ -func startClient(addr string, port string, tlsCa string, ctx context.Context) *paho.Client { +func startClient(addr string, port string, tlsCa string, ctx context.Context, msgChan chan *paho.Publish) *paho.Client { + singleHandler := paho.NewSingleHandlerRouter(func(m *paho.Publish) { + msgChan <- m + }) if tlsCa != "" { - conn := conntWithTls(tlsCa, addr+":"+port, ctx) + conn := connWithTls(tlsCa, addr+":"+port, ctx) clientConfig := paho.ClientConfig{ - Conn: conn, + Conn: conn, + Router: singleHandler, } return paho.NewClient(clientConfig) } @@ -85,13 +91,14 @@ func startClient(addr string, port string, tlsCa string, ctx context.Context) *p } clientConfig := paho.ClientConfig{ - Conn: conn, + Conn: conn, + Router: singleHandler, } return paho.NewClient(clientConfig) } -func conntWithTls(tlsCa, address string, ctx context.Context) net.Conn { +func connWithTls(tlsCa, address string, ctx context.Context) net.Conn { ca, err := ioutil.ReadFile(tlsCa) if err != nil { log.Fatal(err) @@ -104,7 +111,7 @@ func conntWithTls(tlsCa, address string, ctx context.Context) net.Conn { } config := &tls.Config{ - // After going to cloud, certificates must match names and we must take this option below + // After going to cloud, certificates must match names, and we must take this option below InsecureSkipVerify: true, RootCAs: roots, } @@ -155,11 +162,19 @@ func startConnection(id, user, pass string) paho.Connect { } if user != "" { + connParameters.Username = user connParameters.UsernameFlag = true } if pass != "" { + connParameters.Password = []byte(pass) connParameters.PasswordFlag = true } return connParameters } + +func messageHandler(msg chan *paho.Publish) { + for m := range msg { + log.Println("Received message:", string(m.Payload)) + } +} diff --git a/backend/internal/mqtt/mqtt-server.go b/backend/internal/mqtt/mqtt-server.go index 25ecf74..c1d9792 100755 --- a/backend/internal/mqtt/mqtt-server.go +++ b/backend/internal/mqtt/mqtt-server.go @@ -18,7 +18,7 @@ func StartMqttBroker() { //TODO: Set broker access control list to topics. //TODO: Set MQTTv5 CONNACK packet with topic for agent to use. - cmd := exec.Command("sudo", "docker", "compose", "-f", "internal/mqtt/docker-compose.yml", "up", "-d") + cmd := exec.Command("sudo", "docker", "compose", "-f", "internal/mosquitto/docker-compose.yml", "up", "-d") err := cmd.Run()