diff --git a/backend/services/controller/internal/mqtt/mqtt.go b/backend/services/controller/internal/mqtt/mqtt.go index ca5cf6d..9432ace 100644 --- a/backend/services/controller/internal/mqtt/mqtt.go +++ b/backend/services/controller/internal/mqtt/mqtt.go @@ -2,7 +2,7 @@ package mqtt import ( "context" - "crypto/tls" + "github.com/eclipse/paho.golang/autopaho" "github.com/eclipse/paho.golang/paho" "github.com/leandrofars/oktopus/internal/db" usp_msg "github.com/leandrofars/oktopus/internal/usp_message" @@ -10,8 +10,7 @@ import ( "github.com/leandrofars/oktopus/internal/utils" "google.golang.org/protobuf/proto" "log" - "net" - "os" + "net/url" "strings" "sync" "time" @@ -34,36 +33,76 @@ type Mqtt struct { QMutex *sync.Mutex } -var c *paho.Client +var c *autopaho.ConnectionManager /* ------------------- Implementations of broker interface ------------------ */ func (m *Mqtt) Connect() { + + broker, _ := url.Parse("tcp://" + m.Addr + ":" + m.Port) + devices := make(chan *paho.Publish) controller := make(chan *paho.Publish) disconnect := make(chan *paho.Publish) apiMsg := make(chan *paho.Publish) + go m.messageHandler(devices, controller, disconnect, apiMsg) - clientConfig := m.startClient(devices, controller, disconnect, apiMsg) - connParameters := startConnection(m.Id, m.User, m.Passwd) + pahoClientConfig := m.buildClientConfig(devices, controller, disconnect, apiMsg) - conn, err := clientConfig.Connect(m.Ctx, &connParameters) + autopahoClientConfig := autopaho.ClientConfig{ + BrokerUrls: []*url.URL{broker}, + KeepAlive: 30, + ConnectRetryDelay: 5 * time.Second, + ConnectTimeout: 5 * time.Second, + OnConnectionUp: func(cm *autopaho.ConnectionManager, connAck *paho.Connack) { + log.Printf("Connected to broker--> %s:%s", m.Addr, m.Port) + m.Subscribe() + }, + OnConnectError: func(err error) { + log.Printf("Error while attempting connection: %s\n", err) + }, + ClientConfig: *pahoClientConfig, + } + + if m.User != "" && m.Passwd != "" { + autopahoClientConfig.SetUsernamePassword(m.User, []byte(m.Passwd)) + } + + log.Println("MQTT client id:", pahoClientConfig.ClientID) + log.Println("MQTT username:", m.User) + log.Println("MQTT password:", m.Passwd) + + cm, err := autopaho.NewConnection(m.Ctx, autopahoClientConfig) if err != nil { - log.Println(err) - } - if conn.ReasonCode != 0 { - log.Fatalf("Failed to connect to %s : %d - %s", m.Addr+m.Port, conn.ReasonCode, conn.Properties.ReasonString) + log.Fatalln(err) } - // Sets global client to be used by other mqtt functions - c = clientConfig + c = cm - log.Printf("Connected to broker--> %s:%s", m.Addr, m.Port) + //devices := make(chan *paho.Publish) + //controller := make(chan *paho.Publish) + //disconnect := make(chan *paho.Publish) + //apiMsg := make(chan *paho.Publish) + //go m.messageHandler(devices, controller, disconnect, apiMsg) + //clientConfig := m.startClient(devices, controller, disconnect, apiMsg) + //connParameters := startConnection(m.Id, m.User, m.Passwd) + // + //conn, err := clientConfig.Connect(m.Ctx, &connParameters) + //if err != nil { + // log.Println(err) + //} + //if conn.ReasonCode != 0 { + // log.Fatalf("Failed to connect to %s : %d - %s", m.Addr+m.Port, conn.ReasonCode, conn.Properties.ReasonString) + //} + // + //// Sets global client to be used by other mqtt functions + //c = clientConfig + // + //log.Printf("Connected to broker--> %s:%s", m.Addr, m.Port) } func (m *Mqtt) Disconnect() { - d := &paho.Disconnect{ReasonCode: 0} - err := c.Disconnect(d) + err := c.Disconnect(m.Ctx) if err != nil { log.Fatalf("failed to send Disconnect: %s", err) } @@ -84,7 +123,7 @@ func (m *Mqtt) Subscribe() { log.Printf("Subscribed to %s", m.SubTopic) log.Printf("Subscribed to %s", m.DevicesTopic) log.Printf("Subscribed to %s", m.DisconnectTopic) - + log.Println("Subscribed to %s", "oktopus/+/api/+") } func (m *Mqtt) Publish(msg []byte, topic, respTopic string) { @@ -105,7 +144,8 @@ func (m *Mqtt) Publish(msg []byte, topic, respTopic string) { /* -------------------------------------------------------------------------- */ -func (m *Mqtt) startClient(devices, controller, disconnect, apiMsg chan *paho.Publish) *paho.Client { +func (m *Mqtt) buildClientConfig(devices, controller, disconnect, apiMsg chan *paho.Publish) *paho.ClientConfig { + log.Println("Starting new mqtt client") singleHandler := paho.NewSingleHandlerRouter(func(p *paho.Publish) { if p.Topic == m.DevicesTopic { devices <- p @@ -120,119 +160,140 @@ func (m *Mqtt) startClient(devices, controller, disconnect, apiMsg chan *paho.Pu } }) - if m.TLS { - conn := connWithTls(m.Addr+":"+m.Port, m.Ctx) - clientConfig := paho.ClientConfig{ - Conn: conn, - Router: singleHandler, - OnServerDisconnect: func(disconnect *paho.Disconnect) { - log.Println("disconnected from mqtt server, reason code: ", disconnect.ReasonCode) - }, - OnClientError: func(err error) { - log.Println(err) - }, - } - return paho.NewClient(clientConfig) - } + //if m.TLS { + // conn := connWithTls(m.Addr+":"+m.Port, m.Ctx) + // clientConfig := paho.ClientConfig{ + // Conn: conn, + // Router: singleHandler, + // OnServerDisconnect: func(discon *paho.Disconnect) { + // log.Println("disconnected from mqtt server, reason code: ", discon.ReasonCode) + // }, + // OnClientError: func(err error) { + // log.Println(err) + // }, + // } + // return &clientConfig + //} - conn, _ := connWithoutTLS(m.Addr, m.Port) + //conn, _ := connWithoutTLS(m.Addr, m.Port) - clientConfig := paho.ClientConfig{ - Conn: conn, + clientConfig := paho.ClientConfig{} + + clientConfig = paho.ClientConfig{ + //Conn: conn, Router: singleHandler, - OnServerDisconnect: func(disconnect *paho.Disconnect) { - log.Println("disconnected from mqtt server, reason code: ", disconnect.ReasonCode) + OnServerDisconnect: func(d *paho.Disconnect) { + if d.Properties != nil { + log.Printf("Requested disconnect: %s\n", clientConfig.ClientID, d.Properties.ReasonString) + } else { + log.Printf("Requested disconnect; reason code: %d\n", clientConfig.ClientID, d.ReasonCode) + } }, OnClientError: func(err error) { log.Println(err) }, } - return paho.NewClient(clientConfig) -} - -func connWithoutTLS(addr, port string) (net.Conn, error) { - x := 1 - for { - log.Println("Trying connection to broker...") - conn, err := net.Dial("tcp", addr+":"+port) - if err == nil { - log.Println("Successfully connected to broker!") - return conn, err - } else { - if x == 5 { - log.Println("Couldn't connect to broker") - os.Exit(1) - } - x = x + 1 - time.Sleep(5 * time.Second) - } - } -} - -func connWithTls(address string, ctx context.Context) net.Conn { - config := &tls.Config{ - // After going to cloud, certificates must match names, and we must take this option below - InsecureSkipVerify: true, - } - - d := tls.Dialer{ - Config: config, - } - - conn, err := d.DialContext(ctx, "tcp", address) - if err != nil { - log.Fatal(err) - } - - conn = newThreadSafeConnection(conn) - - return conn -} - -// Custom net.Conn with thread safety -func newThreadSafeConnection(c net.Conn) net.Conn { - type threadSafeConn struct { - net.Conn - sync.Locker - } - - return &threadSafeConn{ - Conn: c, - Locker: &sync.Mutex{}, - } -} - -func startConnection(id, user, pass string) paho.Connect { - - connParameters := paho.Connect{ - KeepAlive: 30, - ClientID: id, - CleanStart: true, - } - - if id != "" { - connParameters.ClientID = id + if m.Id != "" { + clientConfig.ClientID = m.Id } else { mac, err := utils.GetMacAddr() if err != nil { log.Fatal(err) } - connParameters.ClientID = mac[0] + clientConfig.ClientID = mac[0] } - if user != "" { - connParameters.Username = user - connParameters.UsernameFlag = true - } - if pass != "" { - connParameters.Password = []byte(pass) - connParameters.PasswordFlag = true - } - - return connParameters + return &clientConfig } +//Not used by autopaho package +//func connWithoutTLS(addr, port string) (net.Conn, error) { +// x := 1 +// for { +// log.Println("Trying connection to broker...") +// conn, err := net.Dial("tcp", addr+":"+port) +// if err == nil { +// log.Println("Successfully connected to broker!") +// return conn, err +// } else { +// if x == 5 { +// log.Println("Couldn't connect to broker") +// os.Exit(1) +// } +// x = x + 1 +// time.Sleep(5 * time.Second) +// } +// } +//} + +//Not used by autopaho package +//func connWithTls(address string, ctx context.Context) net.Conn { +// config := &tls.Config{ +// // After going to cloud, certificates must match names, and we must take this option below +// InsecureSkipVerify: true, +// } +// +// d := tls.Dialer{ +// Config: config, +// } +// +// conn, err := d.DialContext(ctx, "tcp", address) +// if err != nil { +// log.Fatal(err) +// } +// +// conn = newThreadSafeConnection(conn) +// +// return conn +//} + +// Custom net.Conn with thread safety +//func newThreadSafeConnection(c net.Conn) net.Conn { +// type threadSafeConn struct { +// net.Conn +// sync.Locker +// } +// +// return &threadSafeConn{ +// Conn: c, +// Locker: &sync.Mutex{}, +// } +//} + +//func startConnection(id, user, pass string) paho.Connect { +// +// connParameters := paho.Connect{ +// KeepAlive: 30, +// ClientID: id, +// CleanStart: true, +// } +// +// if id != "" { +// connParameters.ClientID = id +// } else { +// mac, err := utils.GetMacAddr() +// if err != nil { +// log.Fatal(err) +// } +// connParameters.ClientID = mac[0] +// log.Println("MQTT client id:", connParameters.ClientID) +// } +// +// if user != "" { +// connParameters.Username = user +// connParameters.UsernameFlag = true +// log.Println("MQTT username:", connParameters.Username) +// } +// if pass != "" { +// connParameters.Password = []byte(pass) +// connParameters.PasswordFlag = true +// log.Println("MQTT password:", pass) +// } +// +// return connParameters +//} + func (m *Mqtt) messageHandler(devices, controller, disconnect, apiMsg chan *paho.Publish) { for { select { diff --git a/backend/services/controller/internal/mtp/mtp.go b/backend/services/controller/internal/mtp/mtp.go index c9ef304..f5852f6 100644 --- a/backend/services/controller/internal/mtp/mtp.go +++ b/backend/services/controller/internal/mtp/mtp.go @@ -38,5 +38,5 @@ func MtpService(b Broker, done chan os.Signal) { done <- os.Interrupt } }() - b.Subscribe() + //b.Subscribe() }