diff --git a/backend/services/controller/cmd/oktopus/main.go b/backend/services/controller/cmd/oktopus/main.go index 4150f5f..81ceec3 100755 --- a/backend/services/controller/cmd/oktopus/main.go +++ b/backend/services/controller/cmd/oktopus/main.go @@ -50,7 +50,7 @@ func main() { flSubTopic := flag.String("sub", "oktopus/+/controller/+", "That's the topic agent must publish to, and the controller keeps on listening.") flBrokerAddr := flag.String("a", "localhost", "Mqtt broker adrress") flBrokerPort := flag.String("p", "1883", "Mqtt broker port") - flTlsCert := flag.String("ca", "", "TLS ca certificate") + flTlsCert := flag.Bool("tls", false, "Connect to broker over TLS") flBrokerUsername := flag.String("u", "", "Mqtt broker username") flBrokerPassword := flag.String("P", "", "Mqtt broker password") flBrokerClientId := flag.String("i", "", "A clientid for the Mqtt connection") @@ -87,7 +87,7 @@ func main() { SubTopic: *flSubTopic, DevicesTopic: *flDevicesTopic, DisconnectTopic: *flDisconTopic, - CA: *flTlsCert, + TLS: *flTlsCert, DB: database, MsgQueue: apiMsgQueue, QMutex: &m, diff --git a/backend/services/controller/internal/db/db.go b/backend/services/controller/internal/db/db.go index 7ff27ba..63e785b 100644 --- a/backend/services/controller/internal/db/db.go +++ b/backend/services/controller/internal/db/db.go @@ -20,6 +20,11 @@ func NewDatabase(ctx context.Context, mongoUri string) Database { if err != nil { log.Fatal(err) } + log.Println("Trying to ping Mongo database...") + err = client.Ping(ctx, nil) + if err != nil { + log.Fatal("Couldn't connect to MongoDB --> ", err) + } log.Println("Connected to MongoDB-->", mongoUri) devices := client.Database("oktopus").Collection("devices") diff --git a/backend/services/controller/internal/mqtt/mqtt.go b/backend/services/controller/internal/mqtt/mqtt.go index 7a53378..2db7a2a 100644 --- a/backend/services/controller/internal/mqtt/mqtt.go +++ b/backend/services/controller/internal/mqtt/mqtt.go @@ -3,18 +3,18 @@ package mqtt import ( "context" "crypto/tls" - "crypto/x509" "github.com/eclipse/paho.golang/paho" "github.com/leandrofars/oktopus/internal/db" usp_msg "github.com/leandrofars/oktopus/internal/usp_message" "github.com/leandrofars/oktopus/internal/usp_record" "github.com/leandrofars/oktopus/internal/utils" "google.golang.org/protobuf/proto" - "io/ioutil" "log" "net" + "os" "strings" "sync" + "time" ) type Mqtt struct { @@ -28,7 +28,7 @@ type Mqtt struct { SubTopic string DevicesTopic string DisconnectTopic string - CA string + TLS bool DB db.Database MsgQueue map[string](chan usp_msg.Msg) QMutex *sync.Mutex @@ -120,8 +120,8 @@ func (m *Mqtt) startClient(devices, controller, disconnect, apiMsg chan *paho.Pu } }) - if m.CA != "" { - conn := connWithTls(m.CA, m.Addr+":"+m.Port, m.Ctx) + if m.TLS { + conn := connWithTls(m.Addr+":"+m.Port, m.Ctx) clientConfig := paho.ClientConfig{ Conn: conn, Router: singleHandler, @@ -135,10 +135,7 @@ func (m *Mqtt) startClient(devices, controller, disconnect, apiMsg chan *paho.Pu return paho.NewClient(clientConfig) } - conn, err := net.Dial("tcp", m.Addr+":"+m.Port) - if err != nil { - log.Println(err) - } + conn, _ := connWithoutTLS(m.Addr, m.Port) clientConfig := paho.ClientConfig{ Conn: conn, @@ -148,22 +145,29 @@ func (m *Mqtt) startClient(devices, controller, disconnect, apiMsg chan *paho.Pu return paho.NewClient(clientConfig) } -func connWithTls(tlsCa, address string, ctx context.Context) net.Conn { - ca, err := ioutil.ReadFile(tlsCa) - if err != nil { - log.Fatal(err) - } - - roots := x509.NewCertPool() - ok := roots.AppendCertsFromPEM(ca) - if !ok { - panic("failed to parse root certificate") +func connWithoutTLS(addr, port string) (net.Conn, error) { + x := 1 + for { + log.Println("Trying connection to broker...") + conn, err := net.Dial("tcp", addr+":"+port) + if err == nil { + log.Println("Successfully connected to broker!") + return conn, err + } else { + if x == 5 { + log.Println("Couldn't connect to broker") + os.Exit(1) + } + x = x + 1 + time.Sleep(5 * time.Second) + } } +} +func connWithTls(address string, ctx context.Context) net.Conn { config := &tls.Config{ // After going to cloud, certificates must match names, and we must take this option below InsecureSkipVerify: true, - RootCAs: roots, } d := tls.Dialer{