Merge pull request #238 from OktopUSP/dev

Enable tls communication | closes #210
This commit is contained in:
Leandro Antônio Farias Machado 2024-04-16 13:53:40 -03:00 committed by GitHub
commit 5d735f130b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 104 additions and 31 deletions

1
agent/.gitignore vendored
View File

@ -1 +1,2 @@
obuspa/
*.local

View File

@ -21,7 +21,14 @@ func main() {
kv, publisher, subscriber := nats.StartNatsClient(c.Nats)
bridge := bridge.NewBridge(publisher, subscriber, c.Mqtt.Ctx, c.Mqtt, kv)
bridge.StartBridge()
if c.Mqtt.Url != "" {
bridge.StartBridge(c.Mqtt.Url, c.Mqtt.ClientId)
}
if c.Mqtt.UrlForTls != "" {
bridge.StartBridge(c.Mqtt.UrlForTls, c.Mqtt.ClientId+"-tls")
}
<-done

View File

@ -2,6 +2,7 @@ package bridge
import (
"context"
"crypto/tls"
"encoding/json"
"log"
"net"
@ -56,9 +57,9 @@ func NewBridge(p Publisher, s Subscriber, ctx context.Context, m config.Mqtt, kv
}
}
func (b *Bridge) StartBridge() {
func (b *Bridge) StartBridge(serverUrl, clientId string) {
broker, _ := url.Parse(b.Mqtt.Url)
broker, _ := url.Parse(serverUrl)
status := make(chan *paho.Publish)
controller := make(chan *paho.Publish)
@ -66,7 +67,7 @@ func (b *Bridge) StartBridge() {
go b.mqttMessageHandler(status, controller, apiMsg)
pahoClientConfig := buildClientConfig(status, controller, apiMsg, b.Mqtt.ClientId)
pahoClientConfig := buildClientConfig(status, controller, apiMsg, clientId)
autopahoClientConfig := autopaho.ClientConfig{
BrokerUrls: []*url.URL{
@ -76,13 +77,16 @@ func (b *Bridge) StartBridge() {
ConnectRetryDelay: 5 * time.Second,
ConnectTimeout: 5 * time.Second,
OnConnectionUp: func(cm *autopaho.ConnectionManager, connAck *paho.Connack) {
log.Printf("Connected to MQTT broker--> %s", b.Mqtt.Url)
log.Printf("Connected to MQTT broker--> %s", serverUrl)
subscribe(b.Mqtt.Ctx, b.Mqtt.Qos, cm)
},
OnConnectError: func(err error) {
log.Printf("Error while attempting connection: %s\n", err)
},
ClientConfig: *pahoClientConfig,
TlsCfg: &tls.Config{
InsecureSkipVerify: b.Mqtt.SkipVerify,
},
}
b.setMqttPassword()

View File

@ -20,12 +20,14 @@ type Nats struct {
}
type Mqtt struct {
Url string
ClientId string
Username string
Password string
Qos int
Ctx context.Context
Url string
UrlForTls string
SkipVerify bool
ClientId string
Username string
Password string
Qos int
Ctx context.Context
}
type Config struct {
@ -42,6 +44,8 @@ func NewConfig() *Config {
natsName := flag.String("nats_name", lookupEnvOrString("NATS_NAME", "mqtt-adapter"), "name for nats client")
natsVerifyCertificates := flag.Bool("nats_verify_certificates", lookupEnvOrBool("NATS_VERIFY_CERTIFICATES", false), "verify validity of certificates from nats server")
mqttUrl := flag.String("mqtt_url", lookupEnvOrString("MQTT_URL", "tcp://localhost:1883"), "url for mqtt server")
mqttsUrl := flag.String("mqtts_url", lookupEnvOrString("MQTTS_URL", ""), "url for mqtts server")
mqttsSkipVerify := flag.Bool("mqtts_skip_verify", lookupEnvOrBool("MQTTS_SKIP_VERIFY", false), "skip verification of server certificate for mqtts")
mqttClientId := flag.String("mqtt_client_id", lookupEnvOrString("MQTT_CLIENT_ID", "mqtt-adapter"), "client id for mqtt")
mqttUsername := flag.String("mqtt_username", lookupEnvOrString("MQTT_USERNAME", "oktopusController"), "username for mqtt")
mqttQos := flag.Int("mqtt_qos", lookupEnvOrInt("MQTT_QOS", 1), "quality of service for mqtt")
@ -71,11 +75,13 @@ func NewConfig() *Config {
Ctx: ctx,
},
Mqtt: Mqtt{
Url: *mqttUrl,
ClientId: *mqttClientId,
Username: *mqttUsername,
Ctx: ctx,
Qos: *mqttQos,
Url: *mqttUrl,
UrlForTls: *mqttsUrl,
SkipVerify: *mqttsSkipVerify,
ClientId: *mqttClientId,
Username: *mqttUsername,
Ctx: ctx,
Qos: *mqttQos,
},
}
}

View File

@ -14,7 +14,9 @@ const LOCAL_ENV = ".env.local"
type Config struct {
MqttPort string
NoTls bool
Tls bool
MqttTlsPort string
Fullchain string
Privkey string
AuthEnable bool
@ -48,7 +50,9 @@ func NewConfig() Config {
*/
mqttPort := flag.String("mqtt_port", lookupEnvOrString("MQTT_PORT", ":1883"), "port for MQTT listener")
mqttTlsPort := flag.String("mqtt_tls_port", lookupEnvOrString("MQTT_TLS_PORT", ":8883"), "port for MQTT TLS listener")
tls := flag.Bool("mqtt_tls", lookupEnvOrBool("MQTT_TLS", false), "enable/disable TLS")
noTls := flag.Bool("mqtt_no_tls", lookupEnvOrBool("MQTT_NO_TLS", true), "enable/disable mqtt without TLS")
fullchain := flag.String("full_chain_path", lookupEnvOrString("FULL_CHAIN_PATH", ""), "path to fullchain.pem certificate")
privkey := flag.String("private_key_path", lookupEnvOrString("PRIVATE_KEY_PATH", ""), "path to privkey.pem certificate")
authEnable := flag.Bool("auth_enable", lookupEnvOrBool("AUTH_ENABLE", false), "enable authentication")
@ -72,10 +76,16 @@ func NewConfig() Config {
os.Exit(0)
}
if !*noTls && *tls {
log.Fatalln("You can't disable mqtt with and without TLS, choose at least one option.")
}
ctx := context.TODO()
conf := Config{
MqttPort: *mqttPort,
MqttTlsPort: *mqttTlsPort,
NoTls: *noTls,
Tls: *tls,
Fullchain: *fullchain,
Privkey: *privkey,

View File

@ -53,6 +53,8 @@ func StartServers(c config.Config) {
func newMqttServer(c config.Config) *broker.Mqtt {
return &broker.Mqtt{
Port: c.MqttPort,
TlsPort: c.MqttTlsPort,
NoTls: c.NoTls,
Tls: c.Tls,
Fullchain: c.Fullchain,
Privkey: c.Privkey,

View File

@ -21,7 +21,9 @@ var (
type Mqtt struct {
Port string
TlsPort string
Tls bool
NoTls bool
Fullchain string
Privkey string
Redis Redis
@ -46,9 +48,13 @@ func (m *Mqtt) Start(mqttServer *mqtt.Server) {
var tlsConfig *listeners.Config
if m.Tls {
tlsConfig = defineServerTls(m.Fullchain, m.Privkey)
createListener(mqttServer, m.TlsPort, tlsConfig)
}
if m.NoTls {
createListener(mqttServer, m.Port, nil)
}
createListener(mqttServer, m.Port, tlsConfig)
addHooks(mqttServer, m.Redis)
}

View File

@ -21,7 +21,14 @@ func main() {
kv, publisher, subscriber := nats.StartNatsClient(c.Nats)
bridge := bridge.NewBridge(publisher, subscriber, c.Ws.Ctx, c.Ws, kv)
bridge.StartBridge()
if !c.Ws.NoTls {
bridge.StartBridge(c.Ws.Port, false)
}
if c.Ws.TlsEnable {
bridge.StartBridge(c.Ws.TlsPort, true)
}
<-done

View File

@ -69,11 +69,11 @@ func NewBridge(p Publisher, s Subscriber, ctx context.Context, w config.Ws, kv j
}
}
func (b *Bridge) StartBridge() {
func (b *Bridge) StartBridge(port string, tls bool) {
go func() {
go func(port string, tls bool) {
for {
url := b.urlBuild()
url := b.urlBuild(tls, port)
dialer := b.newDialer()
wc, _, err := dialer.Dial(url, nil)
if err != nil {
@ -89,7 +89,7 @@ func (b *Bridge) StartBridge() {
if err != nil {
if websocket.IsCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
log.Printf("websocket error: %v", err)
b.StartBridge()
b.StartBridge(port, tls)
return
}
log.Println("websocket unexpected error:", err)
@ -130,7 +130,7 @@ func (b *Bridge) StartBridge() {
}(wc)
break
}
}()
}(port, tls)
}
func (b *Bridge) subscribe(wc *websocket.Conn) {
@ -224,13 +224,13 @@ func (b *Bridge) statusMsgHandler(wsMsg []byte) {
b.Pub(NATS_WS_SUBJECT_PREFIX+deviceStatus.Eid+".status", []byte(deviceStatus.Status))
}
func (b *Bridge) urlBuild() string {
func (b *Bridge) urlBuild(tls bool, port string) string {
prefix := "ws://"
if b.Ws.TlsEnable {
if tls {
prefix = "wss://"
}
wsUrl := prefix + b.Ws.Addr + b.Ws.Port + b.Ws.Route
wsUrl := prefix + b.Ws.Addr + port + b.Ws.Route
token, _ := b.kv.Get(b.Ctx, "oktopusController")

View File

@ -25,7 +25,9 @@ type Ws struct {
Port string
Route string
TlsEnable bool
TlsPort string
SkipTlsVerify bool
NoTls bool
Ctx context.Context
}
@ -44,6 +46,8 @@ func NewConfig() *Config {
wsAuthEnable := flag.Bool("ws_auth_enable", lookupEnvOrBool("WS_AUTH_ENABLE", false), "enable authentication for websocket server")
wsAddr := flag.String("ws_addr", lookupEnvOrString("WS_ADDR", "localhost"), "websocket server address (domain or ip)")
wsPort := flag.String("ws_port", lookupEnvOrString("WS_PORT", ":8080"), "websocket server port")
wsTlsPort := flag.String("ws_tls_port", lookupEnvOrString("WS_TLS_PORT", ":8081"), "websocket tls server port")
wsNoTls := flag.Bool("ws_no_tls", lookupEnvOrBool("WS_NO_TLS", false), "connects to websocket server without tls")
wsRoute := flag.String("ws_route", lookupEnvOrString("WS_ROUTE", "/ws/controller"), "websocket server route")
wsTlsEnable := flag.Bool("ws_tls_enable", lookupEnvOrBool("WS_TLS_ENABLE", false), "access websocket via tls protocol (wss)")
wsSkipTlsVerify := flag.Bool("ws_skip_tls_verify", lookupEnvOrBool("WS_SKIP_TLS_VERIFY", false), "skip tls verification for websocket server")
@ -56,6 +60,10 @@ func NewConfig() *Config {
os.Exit(0)
}
if *wsNoTls && !*wsTlsEnable {
log.Fatalf("You must configure at least one connection to the websocket server")
}
ctx := context.TODO()
return &Config{
@ -73,6 +81,8 @@ func NewConfig() *Config {
TlsEnable: *wsTlsEnable,
SkipTlsVerify: *wsSkipTlsVerify,
Ctx: ctx,
TlsPort: *wsTlsPort,
NoTls: *wsNoTls,
},
}
}

View File

@ -16,6 +16,10 @@ type Config struct {
Auth bool // server auth enable/disable
ControllerEID string // controller endpoint id
Tls bool // enable/diable websockets server tls
TlsPort string
NoTls bool
FullChain string
PrivateKey string
Nats Nats
}
@ -47,7 +51,11 @@ func NewConfig() Config {
flPort := flag.String("port", lookupEnvOrString("SERVER_PORT", ":8080"), "Server port")
flAuth := flag.Bool("auth", lookupEnvOrBool("SERVER_AUTH_ENABLE", false), "Server auth enable/disable")
flControllerEid := flag.String("controller-eid", lookupEnvOrString("CONTROLLER_EID", "oktopusController"), "Controller eid")
flTls := flag.Bool("tls", lookupEnvOrBool("SERVER_TLS_ENABLE", false), "Enable/diable websockets server tls")
flTls := flag.Bool("tls", lookupEnvOrBool("SERVER_TLS_ENABLE", false), "Enable/disable websockets server tls")
flTlsPort := flag.String("tls_port", lookupEnvOrString("SERVER_TLS_PORT", ":8081"), "Server Port to use if TLS is enabled")
flNoTls := flag.Bool("no_tls", lookupEnvOrBool("SERVER_NO_TLS", false), "Disable/enable websockets serevr without tls")
flFullchain := flag.String("fullchain_path", lookupEnvOrString("FULL_CHAIN_PATH", "cert.pem"), "Fullchain file path")
flPrivKey := flag.String("privkey_path", lookupEnvOrString("PRIVATE_KEY_PATH", "key.pem"), "Private key file path")
flHelp := flag.Bool("help", false, "Help")
flag.Parse()
/* -------------------------------------------------------------------------- */
@ -57,13 +65,21 @@ func NewConfig() Config {
os.Exit(0)
}
if *flNoTls && !*flTls {
log.Fatalf("You must at least choose one between tls and no_tls configs")
}
ctx := context.TODO()
return Config{
Port: *flPort,
TlsPort: *flTlsPort,
NoTls: *flNoTls,
Auth: *flAuth,
ControllerEID: *flControllerEid,
Tls: *flTls,
FullChain: *flFullchain,
PrivateKey: *flPrivKey,
Nats: Nats{
Url: *natsUrl,
Name: *natsName,

View File

@ -30,12 +30,16 @@ func StartNewServer(c config.Config, kv jetstream.KeyValue) {
go func() {
if c.Tls {
log.Println("Websockets server running with TLS")
err := http.ListenAndServeTLS(c.Port, "cert.pem", "key.pem", r)
log.Println("Websockets server running with TLS at port", c.TlsPort)
err := http.ListenAndServeTLS(c.TlsPort, c.FullChain, c.PrivateKey, r)
if err != nil {
log.Fatal("ListenAndServeTLS: ", err)
}
} else {
}
}()
go func() {
if !c.NoTls {
log.Println("Websockets server running at port", c.Port)
err := http.ListenAndServe(c.Port, r)
if err != nil {