oktopus/backend/internal/mqtt/mqtt-client.go
Leandro Antônio Farias Machado 0e4a915ef4 feat: mqtt over tls #20
2023-03-17 09:49:16 -03:00

166 lines
3.2 KiB
Go

package mqtt
import (
"context"
"crypto/tls"
"crypto/x509"
"io/ioutil"
"log"
"net"
"sync"
"github.com/eclipse/paho.golang/paho"
"github.com/leandrofars/oktopus/internal/utils"
)
// Mqtt holds the settings used to reach an MQTT broker and to subscribe to a
// topic. Its methods work through the package-level client that Connect
// creates.
type Mqtt struct {
	// Addr and Port are joined as "Addr:Port" to form the broker dial address.
	Addr, Port string
	// Id is the MQTT client identifier. When it is empty, the first value
	// returned by utils.GetMacAddr is used instead.
	Id string
	// User and Passwd are the credentials passed along when the CONNECT
	// parameters are built.
	User, Passwd string
	// Ctx is the context handed to the TLS dial, the CONNECT call and the
	// SUBSCRIBE call.
	Ctx context.Context
	// QoS is the quality-of-service level requested for SubTopic; it is
	// converted to a byte when subscribing.
	QoS int
	// SubTopic is the topic that Subscribe subscribes to.
	SubTopic string
	// CA is the path to a PEM file with the root certificate. A non-empty
	// value makes the client connect over TLS.
	CA string
}
// c is the package-level MQTT client shared by the methods of Mqtt: Connect
// assigns it, Disconnect and Subscribe use it. It stays nil until Connect
// assigns it.
var c *paho.Client
/* ------------------- Implementations of broker interface ------------------ */
// Connect creates a client for the broker described by m, sends the MQTT
// CONNECT request and stores the client in the package-level variable c.
//
// A failed CONNECT call or a non-zero reason code in the broker's answer
// terminates the process through log.Fatal / log.Fatalf.
func (m *Mqtt) Connect() {
	client := startClient(m.Addr, m.Port, m.CA, m.Ctx)
	connectPacket := startConnection(m.Id, m.User, m.Passwd)

	connack, err := client.Connect(m.Ctx, &connectPacket)
	if err != nil {
		log.Fatal(err)
	}

	// Publish the client package-wide so Disconnect and Subscribe can reach it.
	c = client

	if code := connack.ReasonCode; code != 0 {
		log.Fatalf("Failed to connect to %s : %d - %s", m.Addr, code, connack.Properties.ReasonString)
	}

	log.Printf("Connected to broker--> %s:%s", m.Addr, m.Port)
}
// Disconnect sends an MQTT DISCONNECT with reason code 0 through the
// package-level client c. If sending fails the process is terminated via
// log.Fatalf.
//
// NOTE(review): c is only assigned by Connect; presumably callers always
// connect first — confirm.
func (m *Mqtt) Disconnect() {
	if err := c.Disconnect(&paho.Disconnect{ReasonCode: 0}); err != nil {
		log.Fatalf("failed to send Disconnect: %s", err)
	}
}
// Subscribe subscribes the package-level client c to m.SubTopic using m.QoS
// (converted to a byte) and logs the topic on success. A failed subscription
// terminates the process via log.Fatalln.
func (m *Mqtt) Subscribe() {
	options := paho.SubscribeOptions{
		QoS: byte(m.QoS),
		// In MQTT 5 the No Local option asks the broker not to deliver this
		// client's own publications back to it.
		NoLocal: true,
	}
	request := &paho.Subscribe{
		Subscriptions: map[string]paho.SubscribeOptions{m.SubTopic: options},
	}

	_, err := c.Subscribe(m.Ctx, request)
	if err != nil {
		log.Fatalln(err)
	}

	log.Printf("Subscribed to %s", m.SubTopic)
}
/* -------------------------------------------------------------------------- */
// startClient opens the network connection to the broker at addr:port and
// wraps it in a paho client.
//
// Parameters:
//   - addr, port: broker host and port, joined as "addr:port".
//   - tlsCa: path to a PEM root certificate; when non-empty the connection is
//     made over TLS via conntWithTls, otherwise plain TCP is used.
//   - ctx: context forwarded to the TLS dial (the plain TCP dial does not
//     use it).
//
// A dial failure terminates the process: the plain TCP branch now calls
// log.Fatal just like the TLS branch, instead of logging the error and going
// on to build a client around a nil connection.
//
// NOTE(review): only the TLS branch wraps its connection with
// newThreadSafeConnection; confirm whether the plain TCP connection should be
// wrapped as well.
func startClient(addr string, port string, tlsCa string, ctx context.Context) *paho.Client {
	address := addr + ":" + port

	var conn net.Conn
	if tlsCa != "" {
		conn = conntWithTls(tlsCa, address, ctx)
	} else {
		var err error
		conn, err = net.Dial("tcp", address)
		if err != nil {
			// Without a connection the client is unusable; stop here so the
			// real dial error is what gets reported.
			log.Fatal(err)
		}
	}

	return paho.NewClient(paho.ClientConfig{Conn: conn})
}
// conntWithTls opens a TLS connection to address and returns it wrapped by
// newThreadSafeConnection.
//
// tlsCa is the path of a PEM file whose certificates are loaded into the root
// pool of the TLS configuration; ctx is passed to the dial. An unreadable
// file or a failed dial terminates the process via log.Fatal, and a PEM file
// from which no certificate can be parsed causes a panic.
func conntWithTls(tlsCa, address string, ctx context.Context) net.Conn {
	pemBytes, readErr := ioutil.ReadFile(tlsCa)
	if readErr != nil {
		log.Fatal(readErr)
	}

	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(pemBytes) {
		panic("failed to parse root certificate")
	}

	dialer := tls.Dialer{
		Config: &tls.Config{
			// Verification of the broker certificate is switched off for now.
			// Once the broker runs in the cloud with certificates whose names
			// match, this option has to be removed.
			// NOTE(review): per the crypto/tls documentation, with
			// InsecureSkipVerify set any server certificate is accepted, so
			// RootCAs is not enforced while it is on.
			InsecureSkipVerify: true,
			RootCAs:            pool,
		},
	}

	rawConn, dialErr := dialer.DialContext(ctx, "tcp", address)
	if dialErr != nil {
		log.Fatal(dialErr)
	}

	return newThreadSafeConnection(rawConn)
}
// Custom net.Conn with thread safety
// newThreadSafeConnection returns a net.Conn that embeds c together with a
// fresh mutex, so the returned value also offers Lock and Unlock.
//
// The wrapper does not lock around Read or Write itself; it only exposes the
// mutex next to the connection methods. Presumably the MQTT library detects
// the sync.Locker and serialises its writes with it — confirm.
func newThreadSafeConnection(c net.Conn) net.Conn {
	type threadSafeConn struct {
		net.Conn
		sync.Locker
	}

	wrapped := new(threadSafeConn)
	wrapped.Conn = c
	wrapped.Locker = new(sync.Mutex)
	return wrapped
}
// startConnection builds the MQTT CONNECT parameters.
//
// Parameters:
//   - id: client identifier; when empty, the first value returned by
//     utils.GetMacAddr is used instead.
//   - user: username; sent only when non-empty.
//   - pass: password; sent only when non-empty.
//
// The returned packet uses a keep-alive of 30 and a clean start. If no client
// identifier can be derived (GetMacAddr fails or returns nothing) the process
// is terminated via log.Fatal.
func startConnection(id, user, pass string) paho.Connect {
	connParameters := paho.Connect{
		KeepAlive:  30,
		ClientID:   id,
		CleanStart: true,
	}

	if id == "" {
		mac, err := utils.GetMacAddr()
		if err != nil {
			log.Fatal(err)
		}
		if len(mac) == 0 {
			log.Fatal("no MAC address available to use as MQTT client id")
		}
		connParameters.ClientID = mac[0]
	}

	// The flags only announce that credentials are present; the values
	// themselves must be set too, otherwise nothing is sent to the broker.
	if user != "" {
		connParameters.UsernameFlag = true
		connParameters.Username = user
	}
	if pass != "" {
		connParameters.PasswordFlag = true
		connParameters.Password = []byte(pass)
	}

	return connParameters
}