chore(controller): automatic reconnection to broker
This commit is contained in:
parent
3db57b2d9f
commit
ca22a3edb0
|
|
@ -2,7 +2,7 @@ package mqtt
|
|||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"github.com/eclipse/paho.golang/autopaho"
|
||||
"github.com/eclipse/paho.golang/paho"
|
||||
"github.com/leandrofars/oktopus/internal/db"
|
||||
usp_msg "github.com/leandrofars/oktopus/internal/usp_message"
|
||||
|
|
@ -10,8 +10,7 @@ import (
|
|||
"github.com/leandrofars/oktopus/internal/utils"
|
||||
"google.golang.org/protobuf/proto"
|
||||
"log"
|
||||
"net"
|
||||
"os"
|
||||
"net/url"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
|
@ -34,36 +33,76 @@ type Mqtt struct {
|
|||
QMutex *sync.Mutex
|
||||
}
|
||||
|
||||
var c *paho.Client
|
||||
var c *autopaho.ConnectionManager
|
||||
|
||||
/* ------------------- Implementations of broker interface ------------------ */
|
||||
|
||||
func (m *Mqtt) Connect() {
|
||||
|
||||
broker, _ := url.Parse("tcp://" + m.Addr + ":" + m.Port)
|
||||
|
||||
devices := make(chan *paho.Publish)
|
||||
controller := make(chan *paho.Publish)
|
||||
disconnect := make(chan *paho.Publish)
|
||||
apiMsg := make(chan *paho.Publish)
|
||||
|
||||
go m.messageHandler(devices, controller, disconnect, apiMsg)
|
||||
clientConfig := m.startClient(devices, controller, disconnect, apiMsg)
|
||||
connParameters := startConnection(m.Id, m.User, m.Passwd)
|
||||
pahoClientConfig := m.buildClientConfig(devices, controller, disconnect, apiMsg)
|
||||
|
||||
conn, err := clientConfig.Connect(m.Ctx, &connParameters)
|
||||
autopahoClientConfig := autopaho.ClientConfig{
|
||||
BrokerUrls: []*url.URL{broker},
|
||||
KeepAlive: 30,
|
||||
ConnectRetryDelay: 5 * time.Second,
|
||||
ConnectTimeout: 5 * time.Second,
|
||||
OnConnectionUp: func(cm *autopaho.ConnectionManager, connAck *paho.Connack) {
|
||||
log.Printf("Connected to broker--> %s:%s", m.Addr, m.Port)
|
||||
m.Subscribe()
|
||||
},
|
||||
OnConnectError: func(err error) {
|
||||
log.Printf("Error while attempting connection: %s\n", err)
|
||||
},
|
||||
ClientConfig: *pahoClientConfig,
|
||||
}
|
||||
|
||||
if m.User != "" && m.Passwd != "" {
|
||||
autopahoClientConfig.SetUsernamePassword(m.User, []byte(m.Passwd))
|
||||
}
|
||||
|
||||
log.Println("MQTT client id:", pahoClientConfig.ClientID)
|
||||
log.Println("MQTT username:", m.User)
|
||||
log.Println("MQTT password:", m.Passwd)
|
||||
|
||||
cm, err := autopaho.NewConnection(m.Ctx, autopahoClientConfig)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
}
|
||||
if conn.ReasonCode != 0 {
|
||||
log.Fatalf("Failed to connect to %s : %d - %s", m.Addr+m.Port, conn.ReasonCode, conn.Properties.ReasonString)
|
||||
log.Fatalln(err)
|
||||
}
|
||||
|
||||
// Sets global client to be used by other mqtt functions
|
||||
c = clientConfig
|
||||
c = cm
|
||||
|
||||
log.Printf("Connected to broker--> %s:%s", m.Addr, m.Port)
|
||||
//devices := make(chan *paho.Publish)
|
||||
//controller := make(chan *paho.Publish)
|
||||
//disconnect := make(chan *paho.Publish)
|
||||
//apiMsg := make(chan *paho.Publish)
|
||||
//go m.messageHandler(devices, controller, disconnect, apiMsg)
|
||||
//clientConfig := m.startClient(devices, controller, disconnect, apiMsg)
|
||||
//connParameters := startConnection(m.Id, m.User, m.Passwd)
|
||||
//
|
||||
//conn, err := clientConfig.Connect(m.Ctx, &connParameters)
|
||||
//if err != nil {
|
||||
// log.Println(err)
|
||||
//}
|
||||
//if conn.ReasonCode != 0 {
|
||||
// log.Fatalf("Failed to connect to %s : %d - %s", m.Addr+m.Port, conn.ReasonCode, conn.Properties.ReasonString)
|
||||
//}
|
||||
//
|
||||
//// Sets global client to be used by other mqtt functions
|
||||
//c = clientConfig
|
||||
//
|
||||
//log.Printf("Connected to broker--> %s:%s", m.Addr, m.Port)
|
||||
}
|
||||
|
||||
func (m *Mqtt) Disconnect() {
|
||||
d := &paho.Disconnect{ReasonCode: 0}
|
||||
err := c.Disconnect(d)
|
||||
err := c.Disconnect(m.Ctx)
|
||||
if err != nil {
|
||||
log.Fatalf("failed to send Disconnect: %s", err)
|
||||
}
|
||||
|
|
@ -84,7 +123,7 @@ func (m *Mqtt) Subscribe() {
|
|||
log.Printf("Subscribed to %s", m.SubTopic)
|
||||
log.Printf("Subscribed to %s", m.DevicesTopic)
|
||||
log.Printf("Subscribed to %s", m.DisconnectTopic)
|
||||
|
||||
log.Println("Subscribed to %s", "oktopus/+/api/+")
|
||||
}
|
||||
|
||||
func (m *Mqtt) Publish(msg []byte, topic, respTopic string) {
|
||||
|
|
@ -105,7 +144,8 @@ func (m *Mqtt) Publish(msg []byte, topic, respTopic string) {
|
|||
|
||||
/* -------------------------------------------------------------------------- */
|
||||
|
||||
func (m *Mqtt) startClient(devices, controller, disconnect, apiMsg chan *paho.Publish) *paho.Client {
|
||||
func (m *Mqtt) buildClientConfig(devices, controller, disconnect, apiMsg chan *paho.Publish) *paho.ClientConfig {
|
||||
log.Println("Starting new mqtt client")
|
||||
singleHandler := paho.NewSingleHandlerRouter(func(p *paho.Publish) {
|
||||
if p.Topic == m.DevicesTopic {
|
||||
devices <- p
|
||||
|
|
@ -120,119 +160,140 @@ func (m *Mqtt) startClient(devices, controller, disconnect, apiMsg chan *paho.Pu
|
|||
}
|
||||
})
|
||||
|
||||
if m.TLS {
|
||||
conn := connWithTls(m.Addr+":"+m.Port, m.Ctx)
|
||||
clientConfig := paho.ClientConfig{
|
||||
Conn: conn,
|
||||
Router: singleHandler,
|
||||
OnServerDisconnect: func(disconnect *paho.Disconnect) {
|
||||
log.Println("disconnected from mqtt server, reason code: ", disconnect.ReasonCode)
|
||||
},
|
||||
OnClientError: func(err error) {
|
||||
log.Println(err)
|
||||
},
|
||||
}
|
||||
return paho.NewClient(clientConfig)
|
||||
}
|
||||
//if m.TLS {
|
||||
// conn := connWithTls(m.Addr+":"+m.Port, m.Ctx)
|
||||
// clientConfig := paho.ClientConfig{
|
||||
// Conn: conn,
|
||||
// Router: singleHandler,
|
||||
// OnServerDisconnect: func(discon *paho.Disconnect) {
|
||||
// log.Println("disconnected from mqtt server, reason code: ", discon.ReasonCode)
|
||||
// },
|
||||
// OnClientError: func(err error) {
|
||||
// log.Println(err)
|
||||
// },
|
||||
// }
|
||||
// return &clientConfig
|
||||
//}
|
||||
|
||||
conn, _ := connWithoutTLS(m.Addr, m.Port)
|
||||
//conn, _ := connWithoutTLS(m.Addr, m.Port)
|
||||
|
||||
clientConfig := paho.ClientConfig{
|
||||
Conn: conn,
|
||||
clientConfig := paho.ClientConfig{}
|
||||
|
||||
clientConfig = paho.ClientConfig{
|
||||
//Conn: conn,
|
||||
Router: singleHandler,
|
||||
OnServerDisconnect: func(disconnect *paho.Disconnect) {
|
||||
log.Println("disconnected from mqtt server, reason code: ", disconnect.ReasonCode)
|
||||
OnServerDisconnect: func(d *paho.Disconnect) {
|
||||
if d.Properties != nil {
|
||||
log.Printf("Requested disconnect: %s\n", clientConfig.ClientID, d.Properties.ReasonString)
|
||||
} else {
|
||||
log.Printf("Requested disconnect; reason code: %d\n", clientConfig.ClientID, d.ReasonCode)
|
||||
}
|
||||
},
|
||||
OnClientError: func(err error) {
|
||||
log.Println(err)
|
||||
},
|
||||
}
|
||||
|
||||
return paho.NewClient(clientConfig)
|
||||
}
|
||||
|
||||
func connWithoutTLS(addr, port string) (net.Conn, error) {
|
||||
x := 1
|
||||
for {
|
||||
log.Println("Trying connection to broker...")
|
||||
conn, err := net.Dial("tcp", addr+":"+port)
|
||||
if err == nil {
|
||||
log.Println("Successfully connected to broker!")
|
||||
return conn, err
|
||||
} else {
|
||||
if x == 5 {
|
||||
log.Println("Couldn't connect to broker")
|
||||
os.Exit(1)
|
||||
}
|
||||
x = x + 1
|
||||
time.Sleep(5 * time.Second)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func connWithTls(address string, ctx context.Context) net.Conn {
|
||||
config := &tls.Config{
|
||||
// After going to cloud, certificates must match names, and we must take this option below
|
||||
InsecureSkipVerify: true,
|
||||
}
|
||||
|
||||
d := tls.Dialer{
|
||||
Config: config,
|
||||
}
|
||||
|
||||
conn, err := d.DialContext(ctx, "tcp", address)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
conn = newThreadSafeConnection(conn)
|
||||
|
||||
return conn
|
||||
}
|
||||
|
||||
// Custom net.Conn with thread safety
|
||||
func newThreadSafeConnection(c net.Conn) net.Conn {
|
||||
type threadSafeConn struct {
|
||||
net.Conn
|
||||
sync.Locker
|
||||
}
|
||||
|
||||
return &threadSafeConn{
|
||||
Conn: c,
|
||||
Locker: &sync.Mutex{},
|
||||
}
|
||||
}
|
||||
|
||||
func startConnection(id, user, pass string) paho.Connect {
|
||||
|
||||
connParameters := paho.Connect{
|
||||
KeepAlive: 30,
|
||||
ClientID: id,
|
||||
CleanStart: true,
|
||||
}
|
||||
|
||||
if id != "" {
|
||||
connParameters.ClientID = id
|
||||
if m.Id != "" {
|
||||
clientConfig.ClientID = m.Id
|
||||
} else {
|
||||
mac, err := utils.GetMacAddr()
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
connParameters.ClientID = mac[0]
|
||||
clientConfig.ClientID = mac[0]
|
||||
}
|
||||
|
||||
if user != "" {
|
||||
connParameters.Username = user
|
||||
connParameters.UsernameFlag = true
|
||||
}
|
||||
if pass != "" {
|
||||
connParameters.Password = []byte(pass)
|
||||
connParameters.PasswordFlag = true
|
||||
}
|
||||
|
||||
return connParameters
|
||||
return &clientConfig
|
||||
}
|
||||
|
||||
//Not used by autopaho package
|
||||
//func connWithoutTLS(addr, port string) (net.Conn, error) {
|
||||
// x := 1
|
||||
// for {
|
||||
// log.Println("Trying connection to broker...")
|
||||
// conn, err := net.Dial("tcp", addr+":"+port)
|
||||
// if err == nil {
|
||||
// log.Println("Successfully connected to broker!")
|
||||
// return conn, err
|
||||
// } else {
|
||||
// if x == 5 {
|
||||
// log.Println("Couldn't connect to broker")
|
||||
// os.Exit(1)
|
||||
// }
|
||||
// x = x + 1
|
||||
// time.Sleep(5 * time.Second)
|
||||
// }
|
||||
// }
|
||||
//}
|
||||
|
||||
//Not used by autopaho package
|
||||
//func connWithTls(address string, ctx context.Context) net.Conn {
|
||||
// config := &tls.Config{
|
||||
// // After going to cloud, certificates must match names, and we must take this option below
|
||||
// InsecureSkipVerify: true,
|
||||
// }
|
||||
//
|
||||
// d := tls.Dialer{
|
||||
// Config: config,
|
||||
// }
|
||||
//
|
||||
// conn, err := d.DialContext(ctx, "tcp", address)
|
||||
// if err != nil {
|
||||
// log.Fatal(err)
|
||||
// }
|
||||
//
|
||||
// conn = newThreadSafeConnection(conn)
|
||||
//
|
||||
// return conn
|
||||
//}
|
||||
|
||||
// Custom net.Conn with thread safety
|
||||
//func newThreadSafeConnection(c net.Conn) net.Conn {
|
||||
// type threadSafeConn struct {
|
||||
// net.Conn
|
||||
// sync.Locker
|
||||
// }
|
||||
//
|
||||
// return &threadSafeConn{
|
||||
// Conn: c,
|
||||
// Locker: &sync.Mutex{},
|
||||
// }
|
||||
//}
|
||||
|
||||
//func startConnection(id, user, pass string) paho.Connect {
|
||||
//
|
||||
// connParameters := paho.Connect{
|
||||
// KeepAlive: 30,
|
||||
// ClientID: id,
|
||||
// CleanStart: true,
|
||||
// }
|
||||
//
|
||||
// if id != "" {
|
||||
// connParameters.ClientID = id
|
||||
// } else {
|
||||
// mac, err := utils.GetMacAddr()
|
||||
// if err != nil {
|
||||
// log.Fatal(err)
|
||||
// }
|
||||
// connParameters.ClientID = mac[0]
|
||||
// log.Println("MQTT client id:", connParameters.ClientID)
|
||||
// }
|
||||
//
|
||||
// if user != "" {
|
||||
// connParameters.Username = user
|
||||
// connParameters.UsernameFlag = true
|
||||
// log.Println("MQTT username:", connParameters.Username)
|
||||
// }
|
||||
// if pass != "" {
|
||||
// connParameters.Password = []byte(pass)
|
||||
// connParameters.PasswordFlag = true
|
||||
// log.Println("MQTT password:", pass)
|
||||
// }
|
||||
//
|
||||
// return connParameters
|
||||
//}
|
||||
|
||||
func (m *Mqtt) messageHandler(devices, controller, disconnect, apiMsg chan *paho.Publish) {
|
||||
for {
|
||||
select {
|
||||
|
|
|
|||
|
|
@ -38,5 +38,5 @@ func MtpService(b Broker, done chan os.Signal) {
|
|||
done <- os.Interrupt
|
||||
}
|
||||
}()
|
||||
b.Subscribe()
|
||||
//b.Subscribe()
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user