187 lines
3.8 KiB
Go
187 lines
3.8 KiB
Go
package mqtt
|
|
|
|
import (
|
|
"context"
|
|
"crypto/tls"
|
|
"crypto/x509"
|
|
"io/ioutil"
|
|
"log"
|
|
"net"
|
|
"sync"
|
|
|
|
"github.com/eclipse/paho.golang/paho"
|
|
"github.com/leandrofars/oktopus/internal/utils"
|
|
)
|
|
|
|
type Mqtt struct {
|
|
Addr string
|
|
Port string
|
|
Id string
|
|
User string
|
|
Passwd string
|
|
Ctx context.Context
|
|
QoS int
|
|
SubTopic string
|
|
CA string
|
|
}
|
|
|
|
var c *paho.Client
|
|
|
|
/* ------------------- Implementations of broker interface ------------------ */
|
|
|
|
func (m *Mqtt) Connect() {
|
|
msgChan := make(chan *paho.Publish)
|
|
go messageHandler(msgChan)
|
|
clientConfig := startClient(m.Addr, m.Port, m.CA, m.Ctx, msgChan)
|
|
connParameters := startConnection(m.Id, m.User, m.Passwd)
|
|
|
|
conn, err := clientConfig.Connect(m.Ctx, &connParameters)
|
|
if err != nil {
|
|
log.Println(err)
|
|
}
|
|
// Sets global client to be used by other mqtt functions
|
|
c = clientConfig
|
|
|
|
if conn.ReasonCode != 0 {
|
|
log.Fatalf("Failed to connect to %s : %d - %s", m.Addr, conn.ReasonCode, conn.Properties.ReasonString)
|
|
}
|
|
|
|
log.Printf("Connected to broker--> %s:%s", m.Addr, m.Port)
|
|
}
|
|
|
|
func (m *Mqtt) Disconnect() {
|
|
d := &paho.Disconnect{ReasonCode: 0}
|
|
err := c.Disconnect(d)
|
|
if err != nil {
|
|
log.Fatalf("failed to send Disconnect: %s", err)
|
|
}
|
|
}
|
|
|
|
func (m *Mqtt) Subscribe() {
|
|
if _, err := c.Subscribe(m.Ctx, &paho.Subscribe{
|
|
Subscriptions: map[string]paho.SubscribeOptions{
|
|
m.SubTopic: {QoS: byte(m.QoS), NoLocal: true},
|
|
},
|
|
}); err != nil {
|
|
log.Fatalln(err)
|
|
}
|
|
|
|
log.Printf("Subscribed to %s", m.SubTopic)
|
|
}
|
|
|
|
/* -------------------------------------------------------------------------- */
|
|
|
|
func startClient(addr string, port string, tlsCa string, ctx context.Context, msgChan chan *paho.Publish) *paho.Client {
|
|
singleHandler := paho.NewSingleHandlerRouter(func(m *paho.Publish) {
|
|
msgChan <- m
|
|
})
|
|
|
|
if tlsCa != "" {
|
|
conn := connWithTls(tlsCa, addr+":"+port, ctx)
|
|
clientConfig := paho.ClientConfig{
|
|
Conn: conn,
|
|
Router: singleHandler,
|
|
OnServerDisconnect: func(disconnect *paho.Disconnect) {
|
|
log.Println("disconnected from mqtt server, reason code: ", disconnect.ReasonCode)
|
|
},
|
|
OnClientError: func(err error) {
|
|
log.Println(err)
|
|
},
|
|
}
|
|
return paho.NewClient(clientConfig)
|
|
}
|
|
|
|
conn, err := net.Dial("tcp", addr+":"+port)
|
|
if err != nil {
|
|
log.Println(err)
|
|
}
|
|
|
|
clientConfig := paho.ClientConfig{
|
|
Conn: conn,
|
|
Router: singleHandler,
|
|
}
|
|
|
|
return paho.NewClient(clientConfig)
|
|
}
|
|
|
|
func connWithTls(tlsCa, address string, ctx context.Context) net.Conn {
|
|
ca, err := ioutil.ReadFile(tlsCa)
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
|
|
roots := x509.NewCertPool()
|
|
ok := roots.AppendCertsFromPEM(ca)
|
|
if !ok {
|
|
panic("failed to parse root certificate")
|
|
}
|
|
|
|
config := &tls.Config{
|
|
// After going to cloud, certificates must match names, and we must take this option below
|
|
InsecureSkipVerify: true,
|
|
RootCAs: roots,
|
|
}
|
|
|
|
d := tls.Dialer{
|
|
Config: config,
|
|
}
|
|
|
|
conn, err := d.DialContext(ctx, "tcp", address)
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
|
|
conn = newThreadSafeConnection(conn)
|
|
|
|
return conn
|
|
}
|
|
|
|
// Custom net.Conn with thread safety
|
|
func newThreadSafeConnection(c net.Conn) net.Conn {
|
|
type threadSafeConn struct {
|
|
net.Conn
|
|
sync.Locker
|
|
}
|
|
|
|
return &threadSafeConn{
|
|
Conn: c,
|
|
Locker: &sync.Mutex{},
|
|
}
|
|
}
|
|
|
|
func startConnection(id, user, pass string) paho.Connect {
|
|
|
|
connParameters := paho.Connect{
|
|
KeepAlive: 30,
|
|
ClientID: id,
|
|
CleanStart: true,
|
|
}
|
|
|
|
if id != "" {
|
|
connParameters.ClientID = id
|
|
} else {
|
|
mac, err := utils.GetMacAddr()
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
connParameters.ClientID = mac[0]
|
|
}
|
|
|
|
if user != "" {
|
|
connParameters.Username = user
|
|
connParameters.UsernameFlag = true
|
|
}
|
|
if pass != "" {
|
|
connParameters.Password = []byte(pass)
|
|
connParameters.PasswordFlag = true
|
|
}
|
|
|
|
return connParameters
|
|
}
|
|
|
|
func messageHandler(msg chan *paho.Publish) {
|
|
for m := range msg {
|
|
log.Println("Received message:", string(m.Payload))
|
|
}
|
|
}
|