chore(mqtt): enables mqtt connection over tls

This commit is contained in:
Leandro Farias Machado 2023-05-25 07:51:43 -03:00
parent 75603c6f6f
commit 976612057a
2 changed files with 26 additions and 22 deletions

View File

@ -50,7 +50,7 @@ func main() {
flSubTopic := flag.String("sub", "oktopus/+/controller/+", "That's the topic agent must publish to, and the controller keeps on listening.") flSubTopic := flag.String("sub", "oktopus/+/controller/+", "That's the topic agent must publish to, and the controller keeps on listening.")
flBrokerAddr := flag.String("a", "localhost", "Mqtt broker adrress") flBrokerAddr := flag.String("a", "localhost", "Mqtt broker adrress")
flBrokerPort := flag.String("p", "1883", "Mqtt broker port") flBrokerPort := flag.String("p", "1883", "Mqtt broker port")
flTlsCert := flag.String("ca", "", "TLS ca certificate") flTlsCert := flag.Bool("tls", false, "Connect to broker over TLS")
flBrokerUsername := flag.String("u", "", "Mqtt broker username") flBrokerUsername := flag.String("u", "", "Mqtt broker username")
flBrokerPassword := flag.String("P", "", "Mqtt broker password") flBrokerPassword := flag.String("P", "", "Mqtt broker password")
flBrokerClientId := flag.String("i", "", "A clientid for the Mqtt connection") flBrokerClientId := flag.String("i", "", "A clientid for the Mqtt connection")
@ -87,7 +87,7 @@ func main() {
SubTopic: *flSubTopic, SubTopic: *flSubTopic,
DevicesTopic: *flDevicesTopic, DevicesTopic: *flDevicesTopic,
DisconnectTopic: *flDisconTopic, DisconnectTopic: *flDisconTopic,
CA: *flTlsCert, TLS: *flTlsCert,
DB: database, DB: database,
MsgQueue: apiMsgQueue, MsgQueue: apiMsgQueue,
QMutex: &m, QMutex: &m,

View File

@ -3,18 +3,18 @@ package mqtt
import ( import (
"context" "context"
"crypto/tls" "crypto/tls"
"crypto/x509"
"github.com/eclipse/paho.golang/paho" "github.com/eclipse/paho.golang/paho"
"github.com/leandrofars/oktopus/internal/db" "github.com/leandrofars/oktopus/internal/db"
usp_msg "github.com/leandrofars/oktopus/internal/usp_message" usp_msg "github.com/leandrofars/oktopus/internal/usp_message"
"github.com/leandrofars/oktopus/internal/usp_record" "github.com/leandrofars/oktopus/internal/usp_record"
"github.com/leandrofars/oktopus/internal/utils" "github.com/leandrofars/oktopus/internal/utils"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
"io/ioutil"
"log" "log"
"net" "net"
"os"
"strings" "strings"
"sync" "sync"
"time"
) )
type Mqtt struct { type Mqtt struct {
@ -28,7 +28,7 @@ type Mqtt struct {
SubTopic string SubTopic string
DevicesTopic string DevicesTopic string
DisconnectTopic string DisconnectTopic string
CA string TLS bool
DB db.Database DB db.Database
MsgQueue map[string](chan usp_msg.Msg) MsgQueue map[string](chan usp_msg.Msg)
QMutex *sync.Mutex QMutex *sync.Mutex
@ -120,8 +120,8 @@ func (m *Mqtt) startClient(devices, controller, disconnect, apiMsg chan *paho.Pu
} }
}) })
if m.CA != "" { if m.TLS {
conn := connWithTls(m.CA, m.Addr+":"+m.Port, m.Ctx) conn := connWithTls(m.Addr+":"+m.Port, m.Ctx)
clientConfig := paho.ClientConfig{ clientConfig := paho.ClientConfig{
Conn: conn, Conn: conn,
Router: singleHandler, Router: singleHandler,
@ -135,10 +135,7 @@ func (m *Mqtt) startClient(devices, controller, disconnect, apiMsg chan *paho.Pu
return paho.NewClient(clientConfig) return paho.NewClient(clientConfig)
} }
conn, err := net.Dial("tcp", m.Addr+":"+m.Port) conn, _ := connWithoutTLS(m.Addr, m.Port)
if err != nil {
log.Println(err)
}
clientConfig := paho.ClientConfig{ clientConfig := paho.ClientConfig{
Conn: conn, Conn: conn,
@ -148,22 +145,29 @@ func (m *Mqtt) startClient(devices, controller, disconnect, apiMsg chan *paho.Pu
return paho.NewClient(clientConfig) return paho.NewClient(clientConfig)
} }
func connWithTls(tlsCa, address string, ctx context.Context) net.Conn { func connWithoutTLS(addr, port string) (net.Conn, error) {
ca, err := ioutil.ReadFile(tlsCa) x := 1
if err != nil { for {
log.Fatal(err) log.Println("Trying connection to broker...")
} conn, err := net.Dial("tcp", addr+":"+port)
if err == nil {
roots := x509.NewCertPool() log.Println("Successfully connected to broker!")
ok := roots.AppendCertsFromPEM(ca) return conn, err
if !ok { } else {
panic("failed to parse root certificate") if x == 5 {
log.Println("Couldn't connect to broker")
os.Exit(1)
}
x = x + 1
time.Sleep(5 * time.Second)
}
} }
}
func connWithTls(address string, ctx context.Context) net.Conn {
config := &tls.Config{ config := &tls.Config{
// After going to cloud, certificates must match names, and we must take this option below // After going to cloud, certificates must match names, and we must take this option below
InsecureSkipVerify: true, InsecureSkipVerify: true,
RootCAs: roots,
} }
d := tls.Dialer{ d := tls.Dialer{