diff --git a/backend/services/mtp/mqtt-adapter/cmd/mqtt-adapter/main.go b/backend/services/mtp/mqtt-adapter/cmd/mqtt-adapter/main.go index eab14f6..0e42607 100644 --- a/backend/services/mtp/mqtt-adapter/cmd/mqtt-adapter/main.go +++ b/backend/services/mtp/mqtt-adapter/cmd/mqtt-adapter/main.go @@ -21,7 +21,14 @@ func main() { kv, publisher, subscriber := nats.StartNatsClient(c.Nats) bridge := bridge.NewBridge(publisher, subscriber, c.Mqtt.Ctx, c.Mqtt, kv) - bridge.StartBridge() + + if c.Mqtt.Url != "" { + bridge.StartBridge(c.Mqtt.Url, c.Mqtt.ClientId) + } + + if c.Mqtt.UrlForTls != "" { + bridge.StartBridge(c.Mqtt.UrlForTls, c.Mqtt.ClientId+"-tls") + } <-done diff --git a/backend/services/mtp/mqtt-adapter/internal/bridge/bridge.go b/backend/services/mtp/mqtt-adapter/internal/bridge/bridge.go index 0a65158..487a85e 100644 --- a/backend/services/mtp/mqtt-adapter/internal/bridge/bridge.go +++ b/backend/services/mtp/mqtt-adapter/internal/bridge/bridge.go @@ -2,6 +2,7 @@ package bridge import ( "context" + "crypto/tls" "encoding/json" "log" "net" @@ -56,9 +57,9 @@ func NewBridge(p Publisher, s Subscriber, ctx context.Context, m config.Mqtt, kv } } -func (b *Bridge) StartBridge() { +func (b *Bridge) StartBridge(serverUrl, clientId string) { - broker, _ := url.Parse(b.Mqtt.Url) + broker, _ := url.Parse(serverUrl) status := make(chan *paho.Publish) controller := make(chan *paho.Publish) @@ -66,7 +67,7 @@ func (b *Bridge) StartBridge() { go b.mqttMessageHandler(status, controller, apiMsg) - pahoClientConfig := buildClientConfig(status, controller, apiMsg, b.Mqtt.ClientId) + pahoClientConfig := buildClientConfig(status, controller, apiMsg, clientId) autopahoClientConfig := autopaho.ClientConfig{ BrokerUrls: []*url.URL{ @@ -76,13 +77,16 @@ func (b *Bridge) StartBridge() { ConnectRetryDelay: 5 * time.Second, ConnectTimeout: 5 * time.Second, OnConnectionUp: func(cm *autopaho.ConnectionManager, connAck *paho.Connack) { - log.Printf("Connected to MQTT broker--> %s", b.Mqtt.Url) + log.Printf("Connected to MQTT broker--> %s", serverUrl) subscribe(b.Mqtt.Ctx, b.Mqtt.Qos, cm) }, OnConnectError: func(err error) { log.Printf("Error while attempting connection: %s\n", err) }, ClientConfig: *pahoClientConfig, + TlsCfg: &tls.Config{ + InsecureSkipVerify: b.Mqtt.SkipVerify, + }, } b.setMqttPassword() diff --git a/backend/services/mtp/mqtt-adapter/internal/config/config.go b/backend/services/mtp/mqtt-adapter/internal/config/config.go index a29f4dc..9bac4a7 100644 --- a/backend/services/mtp/mqtt-adapter/internal/config/config.go +++ b/backend/services/mtp/mqtt-adapter/internal/config/config.go @@ -20,12 +20,14 @@ type Nats struct { } type Mqtt struct { - Url string - ClientId string - Username string - Password string - Qos int - Ctx context.Context + Url string + UrlForTls string + SkipVerify bool + ClientId string + Username string + Password string + Qos int + Ctx context.Context } type Config struct { @@ -42,6 +44,8 @@ func NewConfig() *Config { natsName := flag.String("nats_name", lookupEnvOrString("NATS_NAME", "mqtt-adapter"), "name for nats client") natsVerifyCertificates := flag.Bool("nats_verify_certificates", lookupEnvOrBool("NATS_VERIFY_CERTIFICATES", false), "verify validity of certificates from nats server") mqttUrl := flag.String("mqtt_url", lookupEnvOrString("MQTT_URL", "tcp://localhost:1883"), "url for mqtt server") + mqttsUrl := flag.String("mqtts_url", lookupEnvOrString("MQTTS_URL", ""), "url for mqtts server") + mqttsSkipVerify := flag.Bool("mqtts_skip_verify", lookupEnvOrBool("MQTTS_SKIP_VERIFY", false), "skip verification of server certificate for mqtts") mqttClientId := flag.String("mqtt_client_id", lookupEnvOrString("MQTT_CLIENT_ID", "mqtt-adapter"), "client id for mqtt") mqttUsername := flag.String("mqtt_username", lookupEnvOrString("MQTT_USERNAME", "oktopusController"), "username for mqtt") mqttQos := flag.Int("mqtt_qos", lookupEnvOrInt("MQTT_QOS", 1), "quality of service for mqtt") @@ -71,11 +75,13 @@ func NewConfig() *Config { Ctx: ctx, }, Mqtt: Mqtt{ - Url: *mqttUrl, - ClientId: *mqttClientId, - Username: *mqttUsername, - Ctx: ctx, - Qos: *mqttQos, + Url: *mqttUrl, + UrlForTls: *mqttsUrl, + SkipVerify: *mqttsSkipVerify, + ClientId: *mqttClientId, + Username: *mqttUsername, + Ctx: ctx, + Qos: *mqttQos, }, } } diff --git a/backend/services/mtp/mqtt/internal/config/config.go b/backend/services/mtp/mqtt/internal/config/config.go index b33504a..805d4ae 100644 --- a/backend/services/mtp/mqtt/internal/config/config.go +++ b/backend/services/mtp/mqtt/internal/config/config.go @@ -14,7 +14,9 @@ const LOCAL_ENV = ".env.local" type Config struct { MqttPort string + NoTls bool Tls bool + MqttTlsPort string Fullchain string Privkey string AuthEnable bool @@ -48,7 +50,9 @@ func NewConfig() Config { */ mqttPort := flag.String("mqtt_port", lookupEnvOrString("MQTT_PORT", ":1883"), "port for MQTT listener") + mqttTlsPort := flag.String("mqtt_tls_port", lookupEnvOrString("MQTT_TLS_PORT", ":8883"), "port for MQTT TLS listener") tls := flag.Bool("mqtt_tls", lookupEnvOrBool("MQTT_TLS", false), "enable/disable TLS") + noTls := flag.Bool("mqtt_no_tls", lookupEnvOrBool("MQTT_NO_TLS", true), "enable/disable mqtt without TLS") fullchain := flag.String("full_chain_path", lookupEnvOrString("FULL_CHAIN_PATH", ""), "path to fullchain.pem certificate") privkey := flag.String("private_key_path", lookupEnvOrString("PRIVATE_KEY_PATH", ""), "path to privkey.pem certificate") authEnable := flag.Bool("auth_enable", lookupEnvOrBool("AUTH_ENABLE", false), "enable authentication") @@ -72,10 +76,16 @@ func NewConfig() Config { os.Exit(0) } + if !*noTls && *tls { + log.Fatalln("You can't disable mqtt with and without TLS, choose at least one option.") + } + ctx := context.TODO() conf := Config{ MqttPort: *mqttPort, + MqttTlsPort: *mqttTlsPort, + NoTls: *noTls, Tls: *tls, Fullchain: *fullchain, Privkey: *privkey, diff --git a/backend/services/mtp/mqtt/internal/listeners/listeners.go b/backend/services/mtp/mqtt/internal/listeners/listeners.go index 6e601bc..d72bd73 100644 --- a/backend/services/mtp/mqtt/internal/listeners/listeners.go +++ b/backend/services/mtp/mqtt/internal/listeners/listeners.go @@ -53,6 +53,8 @@ func StartServers(c config.Config) { func newMqttServer(c config.Config) *broker.Mqtt { return &broker.Mqtt{ Port: c.MqttPort, + TlsPort: c.MqttTlsPort, + NoTls: c.NoTls, Tls: c.Tls, Fullchain: c.Fullchain, Privkey: c.Privkey, diff --git a/backend/services/mtp/mqtt/internal/listeners/mqtt/mqtt.go b/backend/services/mtp/mqtt/internal/listeners/mqtt/mqtt.go index bdf3728..f45725b 100644 --- a/backend/services/mtp/mqtt/internal/listeners/mqtt/mqtt.go +++ b/backend/services/mtp/mqtt/internal/listeners/mqtt/mqtt.go @@ -21,7 +21,9 @@ var ( type Mqtt struct { Port string + TlsPort string Tls bool + NoTls bool Fullchain string Privkey string Redis Redis @@ -46,9 +48,13 @@ func (m *Mqtt) Start(mqttServer *mqtt.Server) { var tlsConfig *listeners.Config if m.Tls { tlsConfig = defineServerTls(m.Fullchain, m.Privkey) + createListener(mqttServer, m.TlsPort, tlsConfig) + } + + if m.NoTls { + createListener(mqttServer, m.Port, nil) } - createListener(mqttServer, m.Port, tlsConfig) addHooks(mqttServer, m.Redis) }