commit
5ebfc25250
|
|
@ -50,7 +50,7 @@ func main() {
|
|||
flSubTopic := flag.String("sub", "oktopus/+/controller/+", "That's the topic agent must publish to, and the controller keeps on listening.")
|
||||
flBrokerAddr := flag.String("a", "localhost", "Mqtt broker adrress")
|
||||
flBrokerPort := flag.String("p", "1883", "Mqtt broker port")
|
||||
flTlsCert := flag.String("ca", "", "TLS ca certificate")
|
||||
flTlsCert := flag.Bool("tls", false, "Connect to broker over TLS")
|
||||
flBrokerUsername := flag.String("u", "", "Mqtt broker username")
|
||||
flBrokerPassword := flag.String("P", "", "Mqtt broker password")
|
||||
flBrokerClientId := flag.String("i", "", "A clientid for the Mqtt connection")
|
||||
|
|
@ -87,7 +87,7 @@ func main() {
|
|||
SubTopic: *flSubTopic,
|
||||
DevicesTopic: *flDevicesTopic,
|
||||
DisconnectTopic: *flDisconTopic,
|
||||
CA: *flTlsCert,
|
||||
TLS: *flTlsCert,
|
||||
DB: database,
|
||||
MsgQueue: apiMsgQueue,
|
||||
QMutex: &m,
|
||||
|
|
|
|||
|
|
@ -20,6 +20,11 @@ func NewDatabase(ctx context.Context, mongoUri string) Database {
|
|||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
log.Println("Trying to ping Mongo database...")
|
||||
err = client.Ping(ctx, nil)
|
||||
if err != nil {
|
||||
log.Fatal("Couldn't connect to MongoDB --> ", err)
|
||||
}
|
||||
|
||||
log.Println("Connected to MongoDB-->", mongoUri)
|
||||
devices := client.Database("oktopus").Collection("devices")
|
||||
|
|
|
|||
|
|
@ -3,18 +3,18 @@ package mqtt
|
|||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"github.com/eclipse/paho.golang/paho"
|
||||
"github.com/leandrofars/oktopus/internal/db"
|
||||
usp_msg "github.com/leandrofars/oktopus/internal/usp_message"
|
||||
"github.com/leandrofars/oktopus/internal/usp_record"
|
||||
"github.com/leandrofars/oktopus/internal/utils"
|
||||
"google.golang.org/protobuf/proto"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Mqtt struct {
|
||||
|
|
@ -28,7 +28,7 @@ type Mqtt struct {
|
|||
SubTopic string
|
||||
DevicesTopic string
|
||||
DisconnectTopic string
|
||||
CA string
|
||||
TLS bool
|
||||
DB db.Database
|
||||
MsgQueue map[string](chan usp_msg.Msg)
|
||||
QMutex *sync.Mutex
|
||||
|
|
@ -120,8 +120,8 @@ func (m *Mqtt) startClient(devices, controller, disconnect, apiMsg chan *paho.Pu
|
|||
}
|
||||
})
|
||||
|
||||
if m.CA != "" {
|
||||
conn := connWithTls(m.CA, m.Addr+":"+m.Port, m.Ctx)
|
||||
if m.TLS {
|
||||
conn := connWithTls(m.Addr+":"+m.Port, m.Ctx)
|
||||
clientConfig := paho.ClientConfig{
|
||||
Conn: conn,
|
||||
Router: singleHandler,
|
||||
|
|
@ -135,10 +135,7 @@ func (m *Mqtt) startClient(devices, controller, disconnect, apiMsg chan *paho.Pu
|
|||
return paho.NewClient(clientConfig)
|
||||
}
|
||||
|
||||
conn, err := net.Dial("tcp", m.Addr+":"+m.Port)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
}
|
||||
conn, _ := connWithoutTLS(m.Addr, m.Port)
|
||||
|
||||
clientConfig := paho.ClientConfig{
|
||||
Conn: conn,
|
||||
|
|
@ -148,22 +145,29 @@ func (m *Mqtt) startClient(devices, controller, disconnect, apiMsg chan *paho.Pu
|
|||
return paho.NewClient(clientConfig)
|
||||
}
|
||||
|
||||
func connWithTls(tlsCa, address string, ctx context.Context) net.Conn {
|
||||
ca, err := ioutil.ReadFile(tlsCa)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
func connWithoutTLS(addr, port string) (net.Conn, error) {
|
||||
x := 1
|
||||
for {
|
||||
log.Println("Trying connection to broker...")
|
||||
conn, err := net.Dial("tcp", addr+":"+port)
|
||||
if err == nil {
|
||||
log.Println("Successfully connected to broker!")
|
||||
return conn, err
|
||||
} else {
|
||||
if x == 5 {
|
||||
log.Println("Couldn't connect to broker")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
roots := x509.NewCertPool()
|
||||
ok := roots.AppendCertsFromPEM(ca)
|
||||
if !ok {
|
||||
panic("failed to parse root certificate")
|
||||
x = x + 1
|
||||
time.Sleep(5 * time.Second)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func connWithTls(address string, ctx context.Context) net.Conn {
|
||||
config := &tls.Config{
|
||||
// After going to cloud, certificates must match names, and we must take this option below
|
||||
InsecureSkipVerify: true,
|
||||
RootCAs: roots,
|
||||
}
|
||||
|
||||
d := tls.Dialer{
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user