From 9f05b8f364b7fb8e967ecc23e35ef0aa0a54e558 Mon Sep 17 00:00:00 2001 From: leandrofars Date: Thu, 29 Feb 2024 19:35:11 -0300 Subject: [PATCH] refact(mqtt): code organization + own readme --- backend/services/mqtt/.env | 0 backend/services/mqtt/.gitignore | 4 + backend/services/mqtt/README.md | 4 + backend/services/mqtt/cmd/auth.json | 54 ---- backend/services/mqtt/cmd/main.go | 272 ------------------ backend/services/mqtt/cmd/run.sh | 1 - backend/services/mqtt/go.mod | 2 + backend/services/mqtt/go.sum | 4 + .../services/mqtt/internal/config/config.go | 147 ++++++++++ .../mqtt/internal/listeners/http/http.go | 21 ++ .../mqtt/internal/listeners/listeners.go | 79 +++++ .../mqtt/internal/listeners/mqtt/hook.go | 88 ++++++ .../mqtt/internal/listeners/mqtt/mqtt.go | 136 +++++++++ .../services/mqtt/internal/listeners/ws/ws.go | 21 ++ 14 files changed, 506 insertions(+), 327 deletions(-) create mode 100644 backend/services/mqtt/.env create mode 100644 backend/services/mqtt/README.md delete mode 100644 backend/services/mqtt/cmd/auth.json delete mode 100644 backend/services/mqtt/cmd/main.go delete mode 100644 backend/services/mqtt/cmd/run.sh create mode 100644 backend/services/mqtt/internal/config/config.go create mode 100644 backend/services/mqtt/internal/listeners/http/http.go create mode 100644 backend/services/mqtt/internal/listeners/listeners.go create mode 100644 backend/services/mqtt/internal/listeners/mqtt/hook.go create mode 100644 backend/services/mqtt/internal/listeners/mqtt/mqtt.go create mode 100644 backend/services/mqtt/internal/listeners/ws/ws.go diff --git a/backend/services/mqtt/.env b/backend/services/mqtt/.env new file mode 100644 index 0000000..e69de29 diff --git a/backend/services/mqtt/.gitignore b/backend/services/mqtt/.gitignore index e9ee887..d07460d 100644 --- a/backend/services/mqtt/.gitignore +++ b/backend/services/mqtt/.gitignore @@ -4,3 +4,7 @@ cmd/mqtt auth.prod.json run.prod.sh mochi +*.pem +*.crt +*.key +*.local \ No newline at end of file diff --git a/backend/services/mqtt/README.md b/backend/services/mqtt/README.md new file mode 100644 index 0000000..ee92491 --- /dev/null +++ b/backend/services/mqtt/README.md @@ -0,0 +1,4 @@ +- This MQTT broker runs with https://github.com/mochi-mqtt/server +- It's compatible with MQTTv5 and MQTTv3.1.1 + +For more info access https://github.com/mochi-mqtt/server and/or execute "go run cmd/mqtt/main.go --help" \ No newline at end of file diff --git a/backend/services/mqtt/cmd/auth.json b/backend/services/mqtt/cmd/auth.json deleted file mode 100644 index f05253a..0000000 --- a/backend/services/mqtt/cmd/auth.json +++ /dev/null @@ -1,54 +0,0 @@ -{ - "auth": [ - { - "username": "leandro", - "password": "leandro", - "allow": true - }, - { - "username": "steve", - "password": "steve", - "allow": true - }, - { - "username": "root", - "password": "root", - "allow": true - }, - { - "remote": "*", - "allow": false - }, - { - "remote": "*", - "allow": false - } - ], - "acl": [ - { - "remote": "*" - }, - { - "username": "leandro", - "filters": { - "oktopus/+/agent/+": 1, - "oktopus/+/controller/+": 2, - "oktopus/+/get/+": 2 - } - }, - { - "username": "steve", - "filters": { - "oktopus/+/agent/+": 1, - "oktopus/+/controller/+": 2, - "oktopus/+/get/+": 2 - } - }, - { - "username": "root", - "filters": { - "#": 3 - } - } - ] -} \ No newline at end of file diff --git a/backend/services/mqtt/cmd/main.go b/backend/services/mqtt/cmd/main.go deleted file mode 100644 index 3383a5d..0000000 --- a/backend/services/mqtt/cmd/main.go +++ /dev/null @@ -1,272 +0,0 @@ -package main - -import ( - "bytes" - "crypto/tls" - "flag" - "io/ioutil" - "log" - "os" - "os/signal" - "strings" - "syscall" - - rv8 "github.com/go-redis/redis/v8" - "github.com/mochi-co/mqtt/v2" - "github.com/mochi-co/mqtt/v2/hooks/storage/redis" - "github.com/mochi-co/mqtt/v2/packets" - "github.com/rs/zerolog" - - "github.com/mochi-co/mqtt/v2/hooks/auth" - "github.com/mochi-co/mqtt/v2/listeners" -) - -var ( - //TODO: create custom mqtt server options - server = mqtt.New(&mqtt.Options{ - //Capabilities: &mqtt.Capabilities{ - // ServerKeepAlive: 10000, - // ReceiveMaximum: math.MaxUint16, - // MaximumMessageExpiryInterval: math.MaxUint32, - // MaximumSessionExpiryInterval: math.MaxUint32, // maximum number of seconds to keep disconnected sessions - // MaximumClientWritesPending: 65536, - // MaximumPacketSize: 0, - // MaximumQos: 2, - //}, - }) -) - -func main() { - tcpAddr := flag.String("tcp", ":1883", "network address for TCP listener") - redisAddr := flag.String("redis", "172.17.0.2:6379", "host address of redis db") - redisPassword := flag.String("redis_passwd", "", "redis db password") - wsAddr := flag.String("ws", "", "network address for Websocket listener") - infoAddr := flag.String("info", ":8080", "network address for web info dashboard listener") - path := flag.String("path", "", "path to data auth file") - fullchain := flag.String("full_chain_path", "", "path to fullchain.pem certificate") - privkey := flag.String("private_key_path", "", "path to privkey.pem certificate") - logLevel := flag.Int("logLevel", 1, "log level, default is INFO, 0 value is DEBUG") - - flag.Parse() - - if *logLevel > 2 || *logLevel < 0 { - log.Println("Log level not valid, choose a number between 0 and 7") - log.Println("For more info access zeroLog documentation: https://github.com/rs/zerolog") - os.Exit(1) - } - - sigs := make(chan os.Signal, 1) - done := make(chan bool, 1) - signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) - go func() { - <-sigs - done <- true - }() - - serverForTLS := mqtt.New(&mqtt.Options{}) - - lTls := serverForTLS.Log.Level(zerolog.Level(*logLevel)) - serverForTLS.Log = &lTls - - l := server.Log.Level(zerolog.Level(*logLevel)) - server.Log = &l - - if *path != "" { - data, err := os.ReadFile(*path) - if err != nil { - log.Fatal(err) - } - - err = server.AddHook(new(auth.Hook), &auth.Options{ - Data: data, - }) - if err != nil { - log.Fatal(err) - } - } else { - err := server.AddHook(new(auth.AllowHook), nil) - if err != nil { - log.Fatal(err) - } - } - - if *fullchain != "" && *privkey != "" { - chain, err := ioutil.ReadFile(*fullchain) - if err != nil { - log.Fatal(err) - } - - pv, err := ioutil.ReadFile(*privkey) - if err != nil { - log.Fatal(err) - } - - cert, err := tls.X509KeyPair(chain, pv) - if err != nil { - log.Fatal(err) - } - - //Basic TLS Config - tlsConfig := &tls.Config{ - Certificates: []tls.Certificate{cert}, - } - - if *tcpAddr != "" { - tcp := listeners.NewTCP("t1", ":8883", &listeners.Config{ - TLSConfig: tlsConfig, - }) - err := serverForTLS.AddListener(tcp) - if err != nil { - log.Fatal(err) - } - } - - err = serverForTLS.AddHook(new(MyHook), map[string]any{}) - if err != nil { - log.Fatal(err) - } - - log.Println("Mqtt Broker is running with TLS at port 8883") - } - if *tcpAddr != "" { - //tcp := listeners.NewTCP("t1", *tcpAddr, &listeners.Config{ - // TLSConfig: tlsConfig, - //}) - tcp := listeners.NewTCP("t1", ":1883", nil) - err := server.AddListener(tcp) - if err != nil { - log.Fatal(err) - } - } - log.Println("Mqtt Broker is running without TLS at port 1883") - - if *wsAddr != "" { - ws := listeners.NewWebsocket("ws1", *wsAddr, nil) - err := server.AddListener(ws) - if err != nil { - log.Fatal(err) - } - log.Println("Websockets is running without TLS at port " + *wsAddr) - } - - if *infoAddr != "" { - stats := listeners.NewHTTPStats("stats", *infoAddr, nil, server.Info) - err := server.AddListener(stats) - if err != nil { - log.Fatal(err) - } - } - - err := server.AddHook(new(MyHook), map[string]any{}) - if err != nil { - log.Fatal(err) - } - - if *redisAddr != "" { - err = server.AddHook(new(redis.Hook), &redis.Options{ - Options: &rv8.Options{ - Addr: *redisAddr, // default redis address - Password: *redisPassword, // your password - DB: 0, // your redis db - }, - }) - if err != nil { - log.Fatal(err) - } - } - - go func() { - err := server.Serve() - if err != nil { - log.Fatal(err) - } - err = serverForTLS.Serve() - if err != nil { - log.Fatal(err) - } - }() - - <-done - server.Log.Warn().Msg("caught signal, stopping...") - server.Close() - server.Log.Info().Msg("main.go finished") - -} - -type MyHook struct { - mqtt.HookBase -} - -func (h *MyHook) ID() string { - return "events-controller" -} - -func (h *MyHook) Provides(b byte) bool { - return bytes.Contains([]byte{ - mqtt.OnSubscribed, - mqtt.OnDisconnect, - mqtt.OnClientExpired, - mqtt.OnPacketEncode, - }, []byte{b}) -} - -func (h *MyHook) Init(config any) error { - h.Log.Info().Msg("initialised") - return nil -} - -func (h *MyHook) OnClientExpired(cl *mqtt.Client) { - log.Printf("Client id %s expired", cl.ID) -} - -func (h *MyHook) OnDisconnect(cl *mqtt.Client, err error, expire bool) { - var clUser string - if len(cl.Properties.Props.User) > 0 { - clUser = cl.Properties.Props.User[0].Val - } - - if clUser != "" { - err := server.Publish("oktopus/v1/status/"+clUser, []byte("1"), false, 1) - if err != nil { - log.Println("server publish error: ", err) - } - } -} - -func (h *MyHook) OnSubscribed(cl *mqtt.Client, pk packets.Packet, reasonCodes []byte) { - // Verifies if it's a device who is subscribed - if strings.Contains(pk.Filters[0].Filter, "oktopus/v1/agent") { - var clUser string - - if len(cl.Properties.Props.User) > 0 { - clUser = cl.Properties.Props.User[0].Val - } - - if clUser != "" { - cl.Properties.Will = mqtt.Will{ - Qos: 1, - TopicName: "oktopus/v1/status/" + clUser, - Payload: []byte("1"), - Retain: false, - } - log.Println("new device:", clUser) - err := server.Publish("oktopus/v1/status/"+clUser, []byte("0"), false, 1) - if err != nil { - log.Println("server publish error: ", err) - } - } - - } -} - -func (h *MyHook) OnPacketEncode(cl *mqtt.Client, pk packets.Packet) packets.Packet { - var clUser string - if len(cl.Properties.Props.User) > 0 { - clUser = cl.Properties.Props.User[0].Val - } - if pk.FixedHeader.Type == packets.Connack { - pk.Properties.User = []packets.UserProperty{{Key: "subscribe-topic", Val: "oktopus/v1/agent/" + clUser}} - } - - return pk -} diff --git a/backend/services/mqtt/cmd/run.sh b/backend/services/mqtt/cmd/run.sh deleted file mode 100644 index 186907d..0000000 --- a/backend/services/mqtt/cmd/run.sh +++ /dev/null @@ -1 +0,0 @@ -go run . -path auth.json diff --git a/backend/services/mqtt/go.mod b/backend/services/mqtt/go.mod index 3e5c20c..381107f 100644 --- a/backend/services/mqtt/go.mod +++ b/backend/services/mqtt/go.mod @@ -11,7 +11,9 @@ require ( require ( github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/google/uuid v1.6.0 // indirect github.com/gorilla/websocket v1.5.0 // indirect + github.com/joho/godotenv v1.5.1 // indirect github.com/mattn/go-colorable v0.1.12 // indirect github.com/mattn/go-isatty v0.0.14 // indirect github.com/rs/xid v1.4.0 // indirect diff --git a/backend/services/mqtt/go.sum b/backend/services/mqtt/go.sum index 58e27f6..a4dd6fa 100644 --- a/backend/services/mqtt/go.sum +++ b/backend/services/mqtt/go.sum @@ -10,9 +10,13 @@ github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWo github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/jinzhu/copier v0.3.5 h1:GlvfUwHk62RokgqVNvYsku0TATCF7bAHVwEXoBh3iJg= +github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= +github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= github.com/mattn/go-colorable v0.1.12 h1:jF+Du6AlPIjs2BiUiQlKOX0rt3SujHxPnksPKZbaA40= github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4= github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9Y= diff --git a/backend/services/mqtt/internal/config/config.go b/backend/services/mqtt/internal/config/config.go new file mode 100644 index 0000000..7c79b22 --- /dev/null +++ b/backend/services/mqtt/internal/config/config.go @@ -0,0 +1,147 @@ +package config + +import ( + "flag" + "log" + "os" + "strconv" + + "github.com/joho/godotenv" +) + +const LOCAL_ENV = ".env.local" + +type Config struct { + MqttPort string + Tls bool + Fullchain string + Privkey string + AuthFile string + RedisEnable bool + RedisAddr string + RedisPassword string + WsEnable bool + WsPort string + HttpEnable bool + HttpPort string + LogLevel int +} + +//TODO: debug websocket and http listeners + +func NewConfig() Config { + + loadEnvVariables() + + /* + App variables priority: + 1º - Flag through command line. + 2º - Env variables. + 3º - Default flag value. + */ + + mqttPort := flag.String("mqtt_port", lookupEnvOrString("MQTT_PORT", ":1883"), "port for MQTT listener") + tls := flag.Bool("mqtt_tls", lookupEnvOrBool("MQTT_TLS", false), "enable/disable TLS") + fullchain := flag.String("full_chain_path", lookupEnvOrString("FULL_CHAIN_PATH", ""), "path to fullchain.pem certificate") + privkey := flag.String("private_key_path", lookupEnvOrString("PRIVATE_KEY_PATH", ""), "path to privkey.pem certificate") + authFile := flag.String("auth_file_path", lookupEnvOrString("AUTH_FILE_PATH", ""), "path to MQTT RBAC auth file") + redisEnable := flag.Bool("redis_enable", lookupEnvOrBool("REDIS_ENABLE", false), "enable/disable Redis db") + redisAddr := flag.String("redis_addr", lookupEnvOrString("REDIS_ADDR", "172.17.0.2:6379"), "address of redis db") + redisPassword := flag.String("redis_passwd", lookupEnvOrString("REDIS_PASSWD", ""), "redis db password") + wsEnable := flag.Bool("ws_enable", lookupEnvOrBool("WS_ENABLE", false), "enable/disable Websocket listener") + wsPort := flag.String("ws_port", lookupEnvOrString("WS_PORT", ":80"), "port for Websocket listener") + httpEnable := flag.Bool("http_enable", lookupEnvOrBool("HTTP_ENABLE", false), "enable/disable HTTP listener of mqtt metrics") + httpPort := flag.String("http_port", lookupEnvOrString("HTTP_PORT", ":8080"), "port for HTTP listener of mqtt metrics") + logLevel := flag.Int("log_level", lookupEnvOrInt("LOG_LEVEL", 1), "0=DEBUG, 1=INFO, 2=WARNING, 3=ERROR") + + flag.Parse() + flHelp := flag.Bool("help", false, "Help") + + if *flHelp { + flag.Usage() + os.Exit(0) + } + + conf := Config{ + MqttPort: *mqttPort, + Tls: *tls, + Fullchain: *fullchain, + Privkey: *privkey, + AuthFile: *authFile, + RedisEnable: *redisEnable, + RedisAddr: *redisAddr, + RedisPassword: *redisPassword, + WsEnable: *wsEnable, + WsPort: *wsPort, + HttpEnable: *httpEnable, + HttpPort: *httpPort, + LogLevel: *logLevel, + } + + conf.validate() + + return conf +} + +func (c *Config) validate() { + + valid := true + + if c.Tls && (c.Fullchain == "" || c.Privkey == "") { + log.Println("TLS is enabled, but fullchain and privkey are not set") + valid = false + } + if c.LogLevel > 3 || c.LogLevel < 0 { + log.Println("Log level not valid, choose a number between 0 and 3") + valid = false + } + + if !valid { + log.Println("For more info execute --help") + os.Exit(1) + } + +} + +func loadEnvVariables() { + err := godotenv.Load() + + if _, err := os.Stat(LOCAL_ENV); err == nil { + _ = godotenv.Overload(LOCAL_ENV) + log.Printf("Loaded variables from '%s'", LOCAL_ENV) + } else { + log.Println("Loaded variables from '.env'") + } + if err != nil { + log.Println("Error to load environment variables:", err) + } +} + +func lookupEnvOrString(key string, defaultVal string) string { + if val, _ := os.LookupEnv(key); val != "" { + return val + } + return defaultVal +} + +func lookupEnvOrInt(key string, defaultVal int) int { + if val, _ := os.LookupEnv(key); val != "" { + v, err := strconv.Atoi(val) + if err != nil { + log.Fatalf("LookupEnvOrInt[%s]: %v", key, err) + } + return v + } + return defaultVal +} + +func lookupEnvOrBool(key string, defaultVal bool) bool { + if val, _ := os.LookupEnv(key); val != "" { + v, err := strconv.ParseBool(val) + if err != nil { + log.Fatalf("LookupEnvOrBool[%s]: %v", key, err) + } + return v + } + return defaultVal +} diff --git a/backend/services/mqtt/internal/listeners/http/http.go b/backend/services/mqtt/internal/listeners/http/http.go new file mode 100644 index 0000000..11b1e25 --- /dev/null +++ b/backend/services/mqtt/internal/listeners/http/http.go @@ -0,0 +1,21 @@ +package http + +import ( + "log" + + "github.com/google/uuid" + "github.com/mochi-co/mqtt/v2" + "github.com/mochi-co/mqtt/v2/listeners" +) + +type Http struct { + HttpPort string +} + +func (h *Http) Start(server *mqtt.Server) { + stats := listeners.NewHTTPStats(uuid.NewString(), h.HttpPort, nil, server.Info) + err := server.AddListener(stats) + if err != nil { + log.Fatal(err) + } +} diff --git a/backend/services/mqtt/internal/listeners/listeners.go b/backend/services/mqtt/internal/listeners/listeners.go new file mode 100644 index 0000000..cd7922e --- /dev/null +++ b/backend/services/mqtt/internal/listeners/listeners.go @@ -0,0 +1,79 @@ +package listeners + +import ( + "broker/internal/config" + "broker/internal/listeners/http" + broker "broker/internal/listeners/mqtt" + "broker/internal/listeners/ws" + "sync" + + "github.com/mochi-co/mqtt/v2" + "github.com/rs/zerolog" +) + +func StartServers(c config.Config) { + + server := mqtt.New(&mqtt.Options{}) + + var wg sync.WaitGroup + wg.Add(3) + + go func() { + mqttServer := newMqttServer(c) + mqttServer.Start(server) + wg.Done() + }() + + go func() { + if c.WsEnable { + wsServer := newWsServer(c) + wsServer.Start(server) + } + wg.Done() + }() + + go func() { + if c.HttpEnable { + httpServer := newHttpServer(c) + httpServer.Start(server) + } + wg.Done() + }() + + server.Log.Level(zerolog.Level(c.LogLevel)) + + wg.Wait() + + err := server.Serve() + if err != nil { + server.Log.Fatal().Err(err).Msg("server error") + } +} + +func newMqttServer(c config.Config) *broker.Mqtt { + return &broker.Mqtt{ + Port: c.MqttPort, + Tls: c.Tls, + Fullchain: c.Fullchain, + Privkey: c.Privkey, + AuthFile: c.AuthFile, + Redis: broker.Redis{ + RedisEnable: c.RedisEnable, + RedisAddr: c.RedisAddr, + RedisPassword: c.RedisPassword, + }, + LogLevel: c.LogLevel, + } +} + +func newWsServer(c config.Config) *ws.Ws { + return &ws.Ws{ + WsPort: c.WsPort, + } +} + +func newHttpServer(c config.Config) *http.Http { + return &http.Http{ + HttpPort: c.HttpPort, + } +} diff --git a/backend/services/mqtt/internal/listeners/mqtt/hook.go b/backend/services/mqtt/internal/listeners/mqtt/hook.go new file mode 100644 index 0000000..db6373e --- /dev/null +++ b/backend/services/mqtt/internal/listeners/mqtt/hook.go @@ -0,0 +1,88 @@ +package mqtt + +import ( + "bytes" + "log" + "strings" + + "github.com/mochi-co/mqtt/v2" + "github.com/mochi-co/mqtt/v2/packets" +) + +type MyHook struct { + mqtt.HookBase +} + +func (h *MyHook) ID() string { + return "events-controller" +} + +func (h *MyHook) Provides(b byte) bool { + return bytes.Contains([]byte{ + mqtt.OnSubscribed, + mqtt.OnDisconnect, + mqtt.OnClientExpired, + mqtt.OnPacketEncode, + }, []byte{b}) +} + +func (h *MyHook) Init(config any) error { + h.Log.Info().Msg("initialised") + return nil +} + +func (h *MyHook) OnClientExpired(cl *mqtt.Client) { + log.Printf("Client id %s expired", cl.ID) +} + +func (h *MyHook) OnDisconnect(cl *mqtt.Client, err error, expire bool) { + var clUser string + if len(cl.Properties.Props.User) > 0 { + clUser = cl.Properties.Props.User[0].Val + } + + if clUser != "" { + err := server.Publish("oktopus/v1/status/"+clUser, []byte("1"), false, 1) + if err != nil { + log.Println("server publish error: ", err) + } + } +} + +func (h *MyHook) OnSubscribed(cl *mqtt.Client, pk packets.Packet, reasonCodes []byte) { + //Verifies if it's a device who is subscribed + if strings.Contains(pk.Filters[0].Filter, "oktopus/v1/agent") { + var clUser string + + if len(cl.Properties.Props.User) > 0 { + clUser = cl.Properties.Props.User[0].Val + } + + if clUser != "" { + cl.Properties.Will = mqtt.Will{ + Qos: 1, + TopicName: "oktopus/v1/status/" + clUser, + Payload: []byte("1"), + Retain: false, + } + log.Println("new device:", clUser) + err := server.Publish("oktopus/v1/status/"+clUser, []byte("0"), false, 1) + if err != nil { + log.Println("server publish error: ", err) + } + } + + } +} + +func (h *MyHook) OnPacketEncode(cl *mqtt.Client, pk packets.Packet) packets.Packet { + var clUser string + if len(cl.Properties.Props.User) > 0 { + clUser = cl.Properties.Props.User[0].Val + } + if pk.FixedHeader.Type == packets.Connack { + pk.Properties.User = []packets.UserProperty{{Key: "subscribe-topic", Val: "oktopus/v1/agent/" + clUser}} + } + + return pk +} diff --git a/backend/services/mqtt/internal/listeners/mqtt/mqtt.go b/backend/services/mqtt/internal/listeners/mqtt/mqtt.go new file mode 100644 index 0000000..84a460f --- /dev/null +++ b/backend/services/mqtt/internal/listeners/mqtt/mqtt.go @@ -0,0 +1,136 @@ +package mqtt + +import ( + "crypto/tls" + "io/ioutil" + "log" + "os" + + rv8 "github.com/go-redis/redis/v8" + "github.com/google/uuid" + "github.com/mochi-co/mqtt/v2" + "github.com/mochi-co/mqtt/v2/hooks/auth" + "github.com/mochi-co/mqtt/v2/hooks/storage/redis" + "github.com/mochi-co/mqtt/v2/listeners" + "github.com/rs/zerolog" +) + +var ( + server *mqtt.Server +) + +type Mqtt struct { + Port string + Tls bool + Fullchain string + Privkey string + AuthFile string + Redis Redis + LogLevel int +} + +type Redis struct { + RedisEnable bool + RedisAddr string + RedisPassword string +} + +func (m *Mqtt) Start(server *mqtt.Server) { + + defineSeverLog(server, m.LogLevel) + defineServerAuth(server, m.AuthFile) + + var tlsConfig *listeners.Config + if m.Tls { + tlsConfig = defineServerTls(m.Fullchain, m.Privkey) + } + + createListener(server, m.Port, tlsConfig) + addHooks(server, m.Redis) +} + +func addHooks(server *mqtt.Server, redisConf Redis) { + + err := server.AddHook(new(MyHook), map[string]any{}) + if err != nil { + log.Fatal(err) + } + + if redisConf.RedisEnable { + if redisConf.RedisAddr != "" { + err = server.AddHook(new(redis.Hook), &redis.Options{ + Options: &rv8.Options{ + Addr: redisConf.RedisAddr, + Password: redisConf.RedisPassword, + DB: 0, + }, + }) + if err != nil { + log.Fatal(err) + } + } + } +} + +func createListener(server *mqtt.Server, port string, listenersConf *listeners.Config) { + tcp := listeners.NewTCP(uuid.NewString(), port, listenersConf) + + err := server.AddListener(tcp) + if err != nil { + log.Fatal(err) + } +} + +func defineServerTls(fullchain, privkey string) *listeners.Config { + if fullchain != "" && privkey != "" { + chain, err := ioutil.ReadFile(fullchain) + if err != nil { + log.Fatal(err) + } + + pv, err := ioutil.ReadFile(privkey) + if err != nil { + log.Fatal(err) + } + + cert, err := tls.X509KeyPair(chain, pv) + if err != nil { + log.Fatal(err) + } + + tlsConfig := &tls.Config{ + Certificates: []tls.Certificate{cert}, + } + + return &listeners.Config{ + TLSConfig: tlsConfig, + } + + } + return nil +} + +func defineServerAuth(server *mqtt.Server, authFile string) { + if authFile != "" { + data, err := os.ReadFile(authFile) + if err != nil { + log.Fatal(err) + } + err = server.AddHook(new(auth.Hook), &auth.Options{ + Data: data, + }) + if err != nil { + log.Fatal(err) + } + } else { + err := server.AddHook(new(auth.AllowHook), nil) + if err != nil { + log.Fatal(err) + } + } +} + +func defineSeverLog(server *mqtt.Server, logLevel int) { + l := server.Log.Level(zerolog.Level(logLevel)) + server.Log = &l +} diff --git a/backend/services/mqtt/internal/listeners/ws/ws.go b/backend/services/mqtt/internal/listeners/ws/ws.go new file mode 100644 index 0000000..a581e4d --- /dev/null +++ b/backend/services/mqtt/internal/listeners/ws/ws.go @@ -0,0 +1,21 @@ +package ws + +import ( + "log" + + "github.com/google/uuid" + "github.com/mochi-co/mqtt/v2" + "github.com/mochi-co/mqtt/v2/listeners" +) + +type Ws struct { + WsPort string +} + +func (w *Ws) Start(server *mqtt.Server) { + ws := listeners.NewWebsocket(uuid.NewString(), w.WsPort, nil) + err := server.AddListener(ws) + if err != nil { + log.Fatal(err) + } +}