diff --git a/agent/.gitignore b/agent/.gitignore index 4ecdcdb..f5d69b0 100644 --- a/agent/.gitignore +++ b/agent/.gitignore @@ -1 +1,2 @@ -obuspa/ \ No newline at end of file +obuspa/ +*.local \ No newline at end of file diff --git a/backend/services/mtp/mqtt-adapter/cmd/mqtt-adapter/main.go b/backend/services/mtp/mqtt-adapter/cmd/mqtt-adapter/main.go index eab14f6..0e42607 100644 --- a/backend/services/mtp/mqtt-adapter/cmd/mqtt-adapter/main.go +++ b/backend/services/mtp/mqtt-adapter/cmd/mqtt-adapter/main.go @@ -21,7 +21,14 @@ func main() { kv, publisher, subscriber := nats.StartNatsClient(c.Nats) bridge := bridge.NewBridge(publisher, subscriber, c.Mqtt.Ctx, c.Mqtt, kv) - bridge.StartBridge() + + if c.Mqtt.Url != "" { + bridge.StartBridge(c.Mqtt.Url, c.Mqtt.ClientId) + } + + if c.Mqtt.UrlForTls != "" { + bridge.StartBridge(c.Mqtt.UrlForTls, c.Mqtt.ClientId+"-tls") + } <-done diff --git a/backend/services/mtp/mqtt-adapter/internal/bridge/bridge.go b/backend/services/mtp/mqtt-adapter/internal/bridge/bridge.go index 0a65158..487a85e 100644 --- a/backend/services/mtp/mqtt-adapter/internal/bridge/bridge.go +++ b/backend/services/mtp/mqtt-adapter/internal/bridge/bridge.go @@ -2,6 +2,7 @@ package bridge import ( "context" + "crypto/tls" "encoding/json" "log" "net" @@ -56,9 +57,9 @@ func NewBridge(p Publisher, s Subscriber, ctx context.Context, m config.Mqtt, kv } } -func (b *Bridge) StartBridge() { +func (b *Bridge) StartBridge(serverUrl, clientId string) { - broker, _ := url.Parse(b.Mqtt.Url) + broker, _ := url.Parse(serverUrl) status := make(chan *paho.Publish) controller := make(chan *paho.Publish) @@ -66,7 +67,7 @@ func (b *Bridge) StartBridge() { go b.mqttMessageHandler(status, controller, apiMsg) - pahoClientConfig := buildClientConfig(status, controller, apiMsg, b.Mqtt.ClientId) + pahoClientConfig := buildClientConfig(status, controller, apiMsg, clientId) autopahoClientConfig := autopaho.ClientConfig{ BrokerUrls: []*url.URL{ @@ -76,13 +77,16 @@ func (b *Bridge) StartBridge() { ConnectRetryDelay: 5 * time.Second, ConnectTimeout: 5 * time.Second, OnConnectionUp: func(cm *autopaho.ConnectionManager, connAck *paho.Connack) { - log.Printf("Connected to MQTT broker--> %s", b.Mqtt.Url) + log.Printf("Connected to MQTT broker--> %s", serverUrl) subscribe(b.Mqtt.Ctx, b.Mqtt.Qos, cm) }, OnConnectError: func(err error) { log.Printf("Error while attempting connection: %s\n", err) }, ClientConfig: *pahoClientConfig, + TlsCfg: &tls.Config{ + InsecureSkipVerify: b.Mqtt.SkipVerify, + }, } b.setMqttPassword() diff --git a/backend/services/mtp/mqtt-adapter/internal/config/config.go b/backend/services/mtp/mqtt-adapter/internal/config/config.go index a29f4dc..9bac4a7 100644 --- a/backend/services/mtp/mqtt-adapter/internal/config/config.go +++ b/backend/services/mtp/mqtt-adapter/internal/config/config.go @@ -20,12 +20,14 @@ type Nats struct { } type Mqtt struct { - Url string - ClientId string - Username string - Password string - Qos int - Ctx context.Context + Url string + UrlForTls string + SkipVerify bool + ClientId string + Username string + Password string + Qos int + Ctx context.Context } type Config struct { @@ -42,6 +44,8 @@ func NewConfig() *Config { natsName := flag.String("nats_name", lookupEnvOrString("NATS_NAME", "mqtt-adapter"), "name for nats client") natsVerifyCertificates := flag.Bool("nats_verify_certificates", lookupEnvOrBool("NATS_VERIFY_CERTIFICATES", false), "verify validity of certificates from nats server") mqttUrl := flag.String("mqtt_url", lookupEnvOrString("MQTT_URL", "tcp://localhost:1883"), "url for mqtt server") + mqttsUrl := flag.String("mqtts_url", lookupEnvOrString("MQTTS_URL", ""), "url for mqtts server") + mqttsSkipVerify := flag.Bool("mqtts_skip_verify", lookupEnvOrBool("MQTTS_SKIP_VERIFY", false), "skip verification of server certificate for mqtts") mqttClientId := flag.String("mqtt_client_id", lookupEnvOrString("MQTT_CLIENT_ID", "mqtt-adapter"), "client id for mqtt") mqttUsername := flag.String("mqtt_username", lookupEnvOrString("MQTT_USERNAME", "oktopusController"), "username for mqtt") mqttQos := flag.Int("mqtt_qos", lookupEnvOrInt("MQTT_QOS", 1), "quality of service for mqtt") @@ -71,11 +75,13 @@ func NewConfig() *Config { Ctx: ctx, }, Mqtt: Mqtt{ - Url: *mqttUrl, - ClientId: *mqttClientId, - Username: *mqttUsername, - Ctx: ctx, - Qos: *mqttQos, + Url: *mqttUrl, + UrlForTls: *mqttsUrl, + SkipVerify: *mqttsSkipVerify, + ClientId: *mqttClientId, + Username: *mqttUsername, + Ctx: ctx, + Qos: *mqttQos, }, } } diff --git a/backend/services/mtp/mqtt/internal/config/config.go b/backend/services/mtp/mqtt/internal/config/config.go index b33504a..805d4ae 100644 --- a/backend/services/mtp/mqtt/internal/config/config.go +++ b/backend/services/mtp/mqtt/internal/config/config.go @@ -14,7 +14,9 @@ const LOCAL_ENV = ".env.local" type Config struct { MqttPort string + NoTls bool Tls bool + MqttTlsPort string Fullchain string Privkey string AuthEnable bool @@ -48,7 +50,9 @@ func NewConfig() Config { */ mqttPort := flag.String("mqtt_port", lookupEnvOrString("MQTT_PORT", ":1883"), "port for MQTT listener") + mqttTlsPort := flag.String("mqtt_tls_port", lookupEnvOrString("MQTT_TLS_PORT", ":8883"), "port for MQTT TLS listener") tls := flag.Bool("mqtt_tls", lookupEnvOrBool("MQTT_TLS", false), "enable/disable TLS") + noTls := flag.Bool("mqtt_no_tls", lookupEnvOrBool("MQTT_NO_TLS", true), "enable/disable mqtt without TLS") fullchain := flag.String("full_chain_path", lookupEnvOrString("FULL_CHAIN_PATH", ""), "path to fullchain.pem certificate") privkey := flag.String("private_key_path", lookupEnvOrString("PRIVATE_KEY_PATH", ""), "path to privkey.pem certificate") authEnable := flag.Bool("auth_enable", lookupEnvOrBool("AUTH_ENABLE", false), "enable authentication") @@ -72,10 +76,16 @@ func NewConfig() Config { os.Exit(0) } + if !*noTls && *tls { + log.Fatalln("You can't disable mqtt with and without TLS, choose at least one option.") + } + ctx := context.TODO() conf := Config{ MqttPort: *mqttPort, + MqttTlsPort: *mqttTlsPort, + NoTls: *noTls, Tls: *tls, Fullchain: *fullchain, Privkey: *privkey, diff --git a/backend/services/mtp/mqtt/internal/listeners/listeners.go b/backend/services/mtp/mqtt/internal/listeners/listeners.go index 6e601bc..d72bd73 100644 --- a/backend/services/mtp/mqtt/internal/listeners/listeners.go +++ b/backend/services/mtp/mqtt/internal/listeners/listeners.go @@ -53,6 +53,8 @@ func StartServers(c config.Config) { func newMqttServer(c config.Config) *broker.Mqtt { return &broker.Mqtt{ Port: c.MqttPort, + TlsPort: c.MqttTlsPort, + NoTls: c.NoTls, Tls: c.Tls, Fullchain: c.Fullchain, Privkey: c.Privkey, diff --git a/backend/services/mtp/mqtt/internal/listeners/mqtt/mqtt.go b/backend/services/mtp/mqtt/internal/listeners/mqtt/mqtt.go index bdf3728..f45725b 100644 --- a/backend/services/mtp/mqtt/internal/listeners/mqtt/mqtt.go +++ b/backend/services/mtp/mqtt/internal/listeners/mqtt/mqtt.go @@ -21,7 +21,9 @@ var ( type Mqtt struct { Port string + TlsPort string Tls bool + NoTls bool Fullchain string Privkey string Redis Redis @@ -46,9 +48,13 @@ func (m *Mqtt) Start(mqttServer *mqtt.Server) { var tlsConfig *listeners.Config if m.Tls { tlsConfig = defineServerTls(m.Fullchain, m.Privkey) + createListener(mqttServer, m.TlsPort, tlsConfig) + } + + if m.NoTls { + createListener(mqttServer, m.Port, nil) } - createListener(mqttServer, m.Port, tlsConfig) addHooks(mqttServer, m.Redis) } diff --git a/backend/services/mtp/ws-adapter/cmd/ws-adapter/main.go b/backend/services/mtp/ws-adapter/cmd/ws-adapter/main.go index ba0c588..9c6e989 100644 --- a/backend/services/mtp/ws-adapter/cmd/ws-adapter/main.go +++ b/backend/services/mtp/ws-adapter/cmd/ws-adapter/main.go @@ -21,7 +21,14 @@ func main() { kv, publisher, subscriber := nats.StartNatsClient(c.Nats) bridge := bridge.NewBridge(publisher, subscriber, c.Ws.Ctx, c.Ws, kv) - bridge.StartBridge() + + if !c.Ws.NoTls { + bridge.StartBridge(c.Ws.Port, false) + } + + if c.Ws.TlsEnable { + bridge.StartBridge(c.Ws.TlsPort, true) + } <-done diff --git a/backend/services/mtp/ws-adapter/internal/bridge/bridge.go b/backend/services/mtp/ws-adapter/internal/bridge/bridge.go index 6d2f236..360d1c9 100644 --- a/backend/services/mtp/ws-adapter/internal/bridge/bridge.go +++ b/backend/services/mtp/ws-adapter/internal/bridge/bridge.go @@ -69,11 +69,11 @@ func NewBridge(p Publisher, s Subscriber, ctx context.Context, w config.Ws, kv j } } -func (b *Bridge) StartBridge() { +func (b *Bridge) StartBridge(port string, tls bool) { - go func() { + go func(port string, tls bool) { for { - url := b.urlBuild() + url := b.urlBuild(tls, port) dialer := b.newDialer() wc, _, err := dialer.Dial(url, nil) if err != nil { @@ -89,7 +89,7 @@ func (b *Bridge) StartBridge() { if err != nil { if websocket.IsCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { log.Printf("websocket error: %v", err) - b.StartBridge() + b.StartBridge(port, tls) return } log.Println("websocket unexpected error:", err) @@ -130,7 +130,7 @@ func (b *Bridge) StartBridge() { }(wc) break } - }() + }(port, tls) } func (b *Bridge) subscribe(wc *websocket.Conn) { @@ -224,13 +224,13 @@ func (b *Bridge) statusMsgHandler(wsMsg []byte) { b.Pub(NATS_WS_SUBJECT_PREFIX+deviceStatus.Eid+".status", []byte(deviceStatus.Status)) } -func (b *Bridge) urlBuild() string { +func (b *Bridge) urlBuild(tls bool, port string) string { prefix := "ws://" - if b.Ws.TlsEnable { + if tls { prefix = "wss://" } - wsUrl := prefix + b.Ws.Addr + b.Ws.Port + b.Ws.Route + wsUrl := prefix + b.Ws.Addr + port + b.Ws.Route token, _ := b.kv.Get(b.Ctx, "oktopusController") diff --git a/backend/services/mtp/ws-adapter/internal/config/config.go b/backend/services/mtp/ws-adapter/internal/config/config.go index eb4c026..c78e4dd 100644 --- a/backend/services/mtp/ws-adapter/internal/config/config.go +++ b/backend/services/mtp/ws-adapter/internal/config/config.go @@ -25,7 +25,9 @@ type Ws struct { Port string Route string TlsEnable bool + TlsPort string SkipTlsVerify bool + NoTls bool Ctx context.Context } @@ -44,6 +46,8 @@ func NewConfig() *Config { wsAuthEnable := flag.Bool("ws_auth_enable", lookupEnvOrBool("WS_AUTH_ENABLE", false), "enable authentication for websocket server") wsAddr := flag.String("ws_addr", lookupEnvOrString("WS_ADDR", "localhost"), "websocket server address (domain or ip)") wsPort := flag.String("ws_port", lookupEnvOrString("WS_PORT", ":8080"), "websocket server port") + wsTlsPort := flag.String("ws_tls_port", lookupEnvOrString("WS_TLS_PORT", ":8081"), "websocket tls server port") + wsNoTls := flag.Bool("ws_no_tls", lookupEnvOrBool("WS_NO_TLS", false), "connects to websocket server without tls") wsRoute := flag.String("ws_route", lookupEnvOrString("WS_ROUTE", "/ws/controller"), "websocket server route") wsTlsEnable := flag.Bool("ws_tls_enable", lookupEnvOrBool("WS_TLS_ENABLE", false), "access websocket via tls protocol (wss)") wsSkipTlsVerify := flag.Bool("ws_skip_tls_verify", lookupEnvOrBool("WS_SKIP_TLS_VERIFY", false), "skip tls verification for websocket server") @@ -56,6 +60,10 @@ func NewConfig() *Config { os.Exit(0) } + if *wsNoTls && !*wsTlsEnable { + log.Fatalf("You must configure at least one connection to the websocket server") + } + ctx := context.TODO() return &Config{ @@ -73,6 +81,8 @@ func NewConfig() *Config { TlsEnable: *wsTlsEnable, SkipTlsVerify: *wsSkipTlsVerify, Ctx: ctx, + TlsPort: *wsTlsPort, + NoTls: *wsNoTls, }, } } diff --git a/backend/services/mtp/ws/internal/config/config.go b/backend/services/mtp/ws/internal/config/config.go index 840c0e6..f79f176 100644 --- a/backend/services/mtp/ws/internal/config/config.go +++ b/backend/services/mtp/ws/internal/config/config.go @@ -16,6 +16,10 @@ type Config struct { Auth bool // server auth enable/disable ControllerEID string // controller endpoint id Tls bool // enable/diable websockets server tls + TlsPort string + NoTls bool + FullChain string + PrivateKey string Nats Nats } @@ -47,7 +51,11 @@ func NewConfig() Config { flPort := flag.String("port", lookupEnvOrString("SERVER_PORT", ":8080"), "Server port") flAuth := flag.Bool("auth", lookupEnvOrBool("SERVER_AUTH_ENABLE", false), "Server auth enable/disable") flControllerEid := flag.String("controller-eid", lookupEnvOrString("CONTROLLER_EID", "oktopusController"), "Controller eid") - flTls := flag.Bool("tls", lookupEnvOrBool("SERVER_TLS_ENABLE", false), "Enable/diable websockets server tls") + flTls := flag.Bool("tls", lookupEnvOrBool("SERVER_TLS_ENABLE", false), "Enable/disable websockets server tls") + flTlsPort := flag.String("tls_port", lookupEnvOrString("SERVER_TLS_PORT", ":8081"), "Server Port to use if TLS is enabled") + flNoTls := flag.Bool("no_tls", lookupEnvOrBool("SERVER_NO_TLS", false), "Disable/enable websockets serevr without tls") + flFullchain := flag.String("fullchain_path", lookupEnvOrString("FULL_CHAIN_PATH", "cert.pem"), "Fullchain file path") + flPrivKey := flag.String("privkey_path", lookupEnvOrString("PRIVATE_KEY_PATH", "key.pem"), "Private key file path") flHelp := flag.Bool("help", false, "Help") flag.Parse() /* -------------------------------------------------------------------------- */ @@ -57,13 +65,21 @@ func NewConfig() Config { os.Exit(0) } + if *flNoTls && !*flTls { + log.Fatalf("You must at least choose one between tls and no_tls configs") + } + ctx := context.TODO() return Config{ Port: *flPort, + TlsPort: *flTlsPort, + NoTls: *flNoTls, Auth: *flAuth, ControllerEID: *flControllerEid, Tls: *flTls, + FullChain: *flFullchain, + PrivateKey: *flPrivKey, Nats: Nats{ Url: *natsUrl, Name: *natsName, diff --git a/backend/services/mtp/ws/internal/ws/ws.go b/backend/services/mtp/ws/internal/ws/ws.go index d08ef44..264344a 100644 --- a/backend/services/mtp/ws/internal/ws/ws.go +++ b/backend/services/mtp/ws/internal/ws/ws.go @@ -30,12 +30,16 @@ func StartNewServer(c config.Config, kv jetstream.KeyValue) { go func() { if c.Tls { - log.Println("Websockets server running with TLS") - err := http.ListenAndServeTLS(c.Port, "cert.pem", "key.pem", r) + log.Println("Websockets server running with TLS at port", c.TlsPort) + err := http.ListenAndServeTLS(c.TlsPort, c.FullChain, c.PrivateKey, r) if err != nil { log.Fatal("ListenAndServeTLS: ", err) } - } else { + } + }() + + go func() { + if !c.NoTls { log.Println("Websockets server running at port", c.Port) err := http.ListenAndServe(c.Port, r) if err != nil {