refact(mqtt): code organization + own readme

This commit is contained in:
leandrofars 2024-02-29 19:35:11 -03:00
parent 73a9376a33
commit 9f05b8f364
14 changed files with 506 additions and 327 deletions

View File

View File

@ -4,3 +4,7 @@ cmd/mqtt
auth.prod.json
run.prod.sh
mochi
*.pem
*.crt
*.key
*.local

View File

@ -0,0 +1,4 @@
- This MQTT broker runs with https://github.com/mochi-mqtt/server
- It's compatible with MQTTv5 and MQTTv3.1.1
For more info access https://github.com/mochi-mqtt/server and/or execute "go run cmd/mqtt/main.go --help"

View File

@ -1,54 +0,0 @@
{
"auth": [
{
"username": "leandro",
"password": "leandro",
"allow": true
},
{
"username": "steve",
"password": "steve",
"allow": true
},
{
"username": "root",
"password": "root",
"allow": true
},
{
"remote": "*",
"allow": false
},
{
"remote": "*",
"allow": false
}
],
"acl": [
{
"remote": "*"
},
{
"username": "leandro",
"filters": {
"oktopus/+/agent/+": 1,
"oktopus/+/controller/+": 2,
"oktopus/+/get/+": 2
}
},
{
"username": "steve",
"filters": {
"oktopus/+/agent/+": 1,
"oktopus/+/controller/+": 2,
"oktopus/+/get/+": 2
}
},
{
"username": "root",
"filters": {
"#": 3
}
}
]
}

View File

@ -1,272 +0,0 @@
package main
import (
"bytes"
"crypto/tls"
"flag"
"io/ioutil"
"log"
"os"
"os/signal"
"strings"
"syscall"
rv8 "github.com/go-redis/redis/v8"
"github.com/mochi-co/mqtt/v2"
"github.com/mochi-co/mqtt/v2/hooks/storage/redis"
"github.com/mochi-co/mqtt/v2/packets"
"github.com/rs/zerolog"
"github.com/mochi-co/mqtt/v2/hooks/auth"
"github.com/mochi-co/mqtt/v2/listeners"
)
var (
//TODO: create custom mqtt server options
server = mqtt.New(&mqtt.Options{
//Capabilities: &mqtt.Capabilities{
// ServerKeepAlive: 10000,
// ReceiveMaximum: math.MaxUint16,
// MaximumMessageExpiryInterval: math.MaxUint32,
// MaximumSessionExpiryInterval: math.MaxUint32, // maximum number of seconds to keep disconnected sessions
// MaximumClientWritesPending: 65536,
// MaximumPacketSize: 0,
// MaximumQos: 2,
//},
})
)
func main() {
tcpAddr := flag.String("tcp", ":1883", "network address for TCP listener")
redisAddr := flag.String("redis", "172.17.0.2:6379", "host address of redis db")
redisPassword := flag.String("redis_passwd", "", "redis db password")
wsAddr := flag.String("ws", "", "network address for Websocket listener")
infoAddr := flag.String("info", ":8080", "network address for web info dashboard listener")
path := flag.String("path", "", "path to data auth file")
fullchain := flag.String("full_chain_path", "", "path to fullchain.pem certificate")
privkey := flag.String("private_key_path", "", "path to privkey.pem certificate")
logLevel := flag.Int("logLevel", 1, "log level, default is INFO, 0 value is DEBUG")
flag.Parse()
if *logLevel > 2 || *logLevel < 0 {
log.Println("Log level not valid, choose a number between 0 and 7")
log.Println("For more info access zeroLog documentation: https://github.com/rs/zerolog")
os.Exit(1)
}
sigs := make(chan os.Signal, 1)
done := make(chan bool, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigs
done <- true
}()
serverForTLS := mqtt.New(&mqtt.Options{})
lTls := serverForTLS.Log.Level(zerolog.Level(*logLevel))
serverForTLS.Log = &lTls
l := server.Log.Level(zerolog.Level(*logLevel))
server.Log = &l
if *path != "" {
data, err := os.ReadFile(*path)
if err != nil {
log.Fatal(err)
}
err = server.AddHook(new(auth.Hook), &auth.Options{
Data: data,
})
if err != nil {
log.Fatal(err)
}
} else {
err := server.AddHook(new(auth.AllowHook), nil)
if err != nil {
log.Fatal(err)
}
}
if *fullchain != "" && *privkey != "" {
chain, err := ioutil.ReadFile(*fullchain)
if err != nil {
log.Fatal(err)
}
pv, err := ioutil.ReadFile(*privkey)
if err != nil {
log.Fatal(err)
}
cert, err := tls.X509KeyPair(chain, pv)
if err != nil {
log.Fatal(err)
}
//Basic TLS Config
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
}
if *tcpAddr != "" {
tcp := listeners.NewTCP("t1", ":8883", &listeners.Config{
TLSConfig: tlsConfig,
})
err := serverForTLS.AddListener(tcp)
if err != nil {
log.Fatal(err)
}
}
err = serverForTLS.AddHook(new(MyHook), map[string]any{})
if err != nil {
log.Fatal(err)
}
log.Println("Mqtt Broker is running with TLS at port 8883")
}
if *tcpAddr != "" {
//tcp := listeners.NewTCP("t1", *tcpAddr, &listeners.Config{
// TLSConfig: tlsConfig,
//})
tcp := listeners.NewTCP("t1", ":1883", nil)
err := server.AddListener(tcp)
if err != nil {
log.Fatal(err)
}
}
log.Println("Mqtt Broker is running without TLS at port 1883")
if *wsAddr != "" {
ws := listeners.NewWebsocket("ws1", *wsAddr, nil)
err := server.AddListener(ws)
if err != nil {
log.Fatal(err)
}
log.Println("Websockets is running without TLS at port " + *wsAddr)
}
if *infoAddr != "" {
stats := listeners.NewHTTPStats("stats", *infoAddr, nil, server.Info)
err := server.AddListener(stats)
if err != nil {
log.Fatal(err)
}
}
err := server.AddHook(new(MyHook), map[string]any{})
if err != nil {
log.Fatal(err)
}
if *redisAddr != "" {
err = server.AddHook(new(redis.Hook), &redis.Options{
Options: &rv8.Options{
Addr: *redisAddr, // default redis address
Password: *redisPassword, // your password
DB: 0, // your redis db
},
})
if err != nil {
log.Fatal(err)
}
}
go func() {
err := server.Serve()
if err != nil {
log.Fatal(err)
}
err = serverForTLS.Serve()
if err != nil {
log.Fatal(err)
}
}()
<-done
server.Log.Warn().Msg("caught signal, stopping...")
server.Close()
server.Log.Info().Msg("main.go finished")
}
type MyHook struct {
mqtt.HookBase
}
func (h *MyHook) ID() string {
return "events-controller"
}
func (h *MyHook) Provides(b byte) bool {
return bytes.Contains([]byte{
mqtt.OnSubscribed,
mqtt.OnDisconnect,
mqtt.OnClientExpired,
mqtt.OnPacketEncode,
}, []byte{b})
}
func (h *MyHook) Init(config any) error {
h.Log.Info().Msg("initialised")
return nil
}
func (h *MyHook) OnClientExpired(cl *mqtt.Client) {
log.Printf("Client id %s expired", cl.ID)
}
func (h *MyHook) OnDisconnect(cl *mqtt.Client, err error, expire bool) {
var clUser string
if len(cl.Properties.Props.User) > 0 {
clUser = cl.Properties.Props.User[0].Val
}
if clUser != "" {
err := server.Publish("oktopus/v1/status/"+clUser, []byte("1"), false, 1)
if err != nil {
log.Println("server publish error: ", err)
}
}
}
func (h *MyHook) OnSubscribed(cl *mqtt.Client, pk packets.Packet, reasonCodes []byte) {
// Verifies if it's a device who is subscribed
if strings.Contains(pk.Filters[0].Filter, "oktopus/v1/agent") {
var clUser string
if len(cl.Properties.Props.User) > 0 {
clUser = cl.Properties.Props.User[0].Val
}
if clUser != "" {
cl.Properties.Will = mqtt.Will{
Qos: 1,
TopicName: "oktopus/v1/status/" + clUser,
Payload: []byte("1"),
Retain: false,
}
log.Println("new device:", clUser)
err := server.Publish("oktopus/v1/status/"+clUser, []byte("0"), false, 1)
if err != nil {
log.Println("server publish error: ", err)
}
}
}
}
func (h *MyHook) OnPacketEncode(cl *mqtt.Client, pk packets.Packet) packets.Packet {
var clUser string
if len(cl.Properties.Props.User) > 0 {
clUser = cl.Properties.Props.User[0].Val
}
if pk.FixedHeader.Type == packets.Connack {
pk.Properties.User = []packets.UserProperty{{Key: "subscribe-topic", Val: "oktopus/v1/agent/" + clUser}}
}
return pk
}

View File

@ -1 +0,0 @@
go run . -path auth.json

View File

@ -11,7 +11,9 @@ require (
require (
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/joho/godotenv v1.5.1 // indirect
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/rs/xid v1.4.0 // indirect

View File

@ -10,9 +10,13 @@ github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWo
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/jinzhu/copier v0.3.5 h1:GlvfUwHk62RokgqVNvYsku0TATCF7bAHVwEXoBh3iJg=
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/mattn/go-colorable v0.1.12 h1:jF+Du6AlPIjs2BiUiQlKOX0rt3SujHxPnksPKZbaA40=
github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4=
github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9Y=

View File

@ -0,0 +1,147 @@
package config
import (
"flag"
"log"
"os"
"strconv"
"github.com/joho/godotenv"
)
const LOCAL_ENV = ".env.local"
type Config struct {
MqttPort string
Tls bool
Fullchain string
Privkey string
AuthFile string
RedisEnable bool
RedisAddr string
RedisPassword string
WsEnable bool
WsPort string
HttpEnable bool
HttpPort string
LogLevel int
}
//TODO: debug websocket and http listeners
func NewConfig() Config {
loadEnvVariables()
/*
App variables priority:
1º - Flag through command line.
2º - Env variables.
3º - Default flag value.
*/
mqttPort := flag.String("mqtt_port", lookupEnvOrString("MQTT_PORT", ":1883"), "port for MQTT listener")
tls := flag.Bool("mqtt_tls", lookupEnvOrBool("MQTT_TLS", false), "enable/disable TLS")
fullchain := flag.String("full_chain_path", lookupEnvOrString("FULL_CHAIN_PATH", ""), "path to fullchain.pem certificate")
privkey := flag.String("private_key_path", lookupEnvOrString("PRIVATE_KEY_PATH", ""), "path to privkey.pem certificate")
authFile := flag.String("auth_file_path", lookupEnvOrString("AUTH_FILE_PATH", ""), "path to MQTT RBAC auth file")
redisEnable := flag.Bool("redis_enable", lookupEnvOrBool("REDIS_ENABLE", false), "enable/disable Redis db")
redisAddr := flag.String("redis_addr", lookupEnvOrString("REDIS_ADDR", "172.17.0.2:6379"), "address of redis db")
redisPassword := flag.String("redis_passwd", lookupEnvOrString("REDIS_PASSWD", ""), "redis db password")
wsEnable := flag.Bool("ws_enable", lookupEnvOrBool("WS_ENABLE", false), "enable/disable Websocket listener")
wsPort := flag.String("ws_port", lookupEnvOrString("WS_PORT", ":80"), "port for Websocket listener")
httpEnable := flag.Bool("http_enable", lookupEnvOrBool("HTTP_ENABLE", false), "enable/disable HTTP listener of mqtt metrics")
httpPort := flag.String("http_port", lookupEnvOrString("HTTP_PORT", ":8080"), "port for HTTP listener of mqtt metrics")
logLevel := flag.Int("log_level", lookupEnvOrInt("LOG_LEVEL", 1), "0=DEBUG, 1=INFO, 2=WARNING, 3=ERROR")
flag.Parse()
flHelp := flag.Bool("help", false, "Help")
if *flHelp {
flag.Usage()
os.Exit(0)
}
conf := Config{
MqttPort: *mqttPort,
Tls: *tls,
Fullchain: *fullchain,
Privkey: *privkey,
AuthFile: *authFile,
RedisEnable: *redisEnable,
RedisAddr: *redisAddr,
RedisPassword: *redisPassword,
WsEnable: *wsEnable,
WsPort: *wsPort,
HttpEnable: *httpEnable,
HttpPort: *httpPort,
LogLevel: *logLevel,
}
conf.validate()
return conf
}
func (c *Config) validate() {
valid := true
if c.Tls && (c.Fullchain == "" || c.Privkey == "") {
log.Println("TLS is enabled, but fullchain and privkey are not set")
valid = false
}
if c.LogLevel > 3 || c.LogLevel < 0 {
log.Println("Log level not valid, choose a number between 0 and 3")
valid = false
}
if !valid {
log.Println("For more info execute --help")
os.Exit(1)
}
}
func loadEnvVariables() {
err := godotenv.Load()
if _, err := os.Stat(LOCAL_ENV); err == nil {
_ = godotenv.Overload(LOCAL_ENV)
log.Printf("Loaded variables from '%s'", LOCAL_ENV)
} else {
log.Println("Loaded variables from '.env'")
}
if err != nil {
log.Println("Error to load environment variables:", err)
}
}
func lookupEnvOrString(key string, defaultVal string) string {
if val, _ := os.LookupEnv(key); val != "" {
return val
}
return defaultVal
}
func lookupEnvOrInt(key string, defaultVal int) int {
if val, _ := os.LookupEnv(key); val != "" {
v, err := strconv.Atoi(val)
if err != nil {
log.Fatalf("LookupEnvOrInt[%s]: %v", key, err)
}
return v
}
return defaultVal
}
func lookupEnvOrBool(key string, defaultVal bool) bool {
if val, _ := os.LookupEnv(key); val != "" {
v, err := strconv.ParseBool(val)
if err != nil {
log.Fatalf("LookupEnvOrBool[%s]: %v", key, err)
}
return v
}
return defaultVal
}

View File

@ -0,0 +1,21 @@
package http
import (
"log"
"github.com/google/uuid"
"github.com/mochi-co/mqtt/v2"
"github.com/mochi-co/mqtt/v2/listeners"
)
type Http struct {
HttpPort string
}
func (h *Http) Start(server *mqtt.Server) {
stats := listeners.NewHTTPStats(uuid.NewString(), h.HttpPort, nil, server.Info)
err := server.AddListener(stats)
if err != nil {
log.Fatal(err)
}
}

View File

@ -0,0 +1,79 @@
package listeners
import (
"broker/internal/config"
"broker/internal/listeners/http"
broker "broker/internal/listeners/mqtt"
"broker/internal/listeners/ws"
"sync"
"github.com/mochi-co/mqtt/v2"
"github.com/rs/zerolog"
)
func StartServers(c config.Config) {
server := mqtt.New(&mqtt.Options{})
var wg sync.WaitGroup
wg.Add(3)
go func() {
mqttServer := newMqttServer(c)
mqttServer.Start(server)
wg.Done()
}()
go func() {
if c.WsEnable {
wsServer := newWsServer(c)
wsServer.Start(server)
}
wg.Done()
}()
go func() {
if c.HttpEnable {
httpServer := newHttpServer(c)
httpServer.Start(server)
}
wg.Done()
}()
server.Log.Level(zerolog.Level(c.LogLevel))
wg.Wait()
err := server.Serve()
if err != nil {
server.Log.Fatal().Err(err).Msg("server error")
}
}
func newMqttServer(c config.Config) *broker.Mqtt {
return &broker.Mqtt{
Port: c.MqttPort,
Tls: c.Tls,
Fullchain: c.Fullchain,
Privkey: c.Privkey,
AuthFile: c.AuthFile,
Redis: broker.Redis{
RedisEnable: c.RedisEnable,
RedisAddr: c.RedisAddr,
RedisPassword: c.RedisPassword,
},
LogLevel: c.LogLevel,
}
}
func newWsServer(c config.Config) *ws.Ws {
return &ws.Ws{
WsPort: c.WsPort,
}
}
func newHttpServer(c config.Config) *http.Http {
return &http.Http{
HttpPort: c.HttpPort,
}
}

View File

@ -0,0 +1,88 @@
package mqtt
import (
"bytes"
"log"
"strings"
"github.com/mochi-co/mqtt/v2"
"github.com/mochi-co/mqtt/v2/packets"
)
type MyHook struct {
mqtt.HookBase
}
func (h *MyHook) ID() string {
return "events-controller"
}
func (h *MyHook) Provides(b byte) bool {
return bytes.Contains([]byte{
mqtt.OnSubscribed,
mqtt.OnDisconnect,
mqtt.OnClientExpired,
mqtt.OnPacketEncode,
}, []byte{b})
}
func (h *MyHook) Init(config any) error {
h.Log.Info().Msg("initialised")
return nil
}
func (h *MyHook) OnClientExpired(cl *mqtt.Client) {
log.Printf("Client id %s expired", cl.ID)
}
func (h *MyHook) OnDisconnect(cl *mqtt.Client, err error, expire bool) {
var clUser string
if len(cl.Properties.Props.User) > 0 {
clUser = cl.Properties.Props.User[0].Val
}
if clUser != "" {
err := server.Publish("oktopus/v1/status/"+clUser, []byte("1"), false, 1)
if err != nil {
log.Println("server publish error: ", err)
}
}
}
func (h *MyHook) OnSubscribed(cl *mqtt.Client, pk packets.Packet, reasonCodes []byte) {
//Verifies if it's a device who is subscribed
if strings.Contains(pk.Filters[0].Filter, "oktopus/v1/agent") {
var clUser string
if len(cl.Properties.Props.User) > 0 {
clUser = cl.Properties.Props.User[0].Val
}
if clUser != "" {
cl.Properties.Will = mqtt.Will{
Qos: 1,
TopicName: "oktopus/v1/status/" + clUser,
Payload: []byte("1"),
Retain: false,
}
log.Println("new device:", clUser)
err := server.Publish("oktopus/v1/status/"+clUser, []byte("0"), false, 1)
if err != nil {
log.Println("server publish error: ", err)
}
}
}
}
func (h *MyHook) OnPacketEncode(cl *mqtt.Client, pk packets.Packet) packets.Packet {
var clUser string
if len(cl.Properties.Props.User) > 0 {
clUser = cl.Properties.Props.User[0].Val
}
if pk.FixedHeader.Type == packets.Connack {
pk.Properties.User = []packets.UserProperty{{Key: "subscribe-topic", Val: "oktopus/v1/agent/" + clUser}}
}
return pk
}

View File

@ -0,0 +1,136 @@
package mqtt
import (
"crypto/tls"
"io/ioutil"
"log"
"os"
rv8 "github.com/go-redis/redis/v8"
"github.com/google/uuid"
"github.com/mochi-co/mqtt/v2"
"github.com/mochi-co/mqtt/v2/hooks/auth"
"github.com/mochi-co/mqtt/v2/hooks/storage/redis"
"github.com/mochi-co/mqtt/v2/listeners"
"github.com/rs/zerolog"
)
var (
server *mqtt.Server
)
type Mqtt struct {
Port string
Tls bool
Fullchain string
Privkey string
AuthFile string
Redis Redis
LogLevel int
}
type Redis struct {
RedisEnable bool
RedisAddr string
RedisPassword string
}
func (m *Mqtt) Start(server *mqtt.Server) {
defineSeverLog(server, m.LogLevel)
defineServerAuth(server, m.AuthFile)
var tlsConfig *listeners.Config
if m.Tls {
tlsConfig = defineServerTls(m.Fullchain, m.Privkey)
}
createListener(server, m.Port, tlsConfig)
addHooks(server, m.Redis)
}
func addHooks(server *mqtt.Server, redisConf Redis) {
err := server.AddHook(new(MyHook), map[string]any{})
if err != nil {
log.Fatal(err)
}
if redisConf.RedisEnable {
if redisConf.RedisAddr != "" {
err = server.AddHook(new(redis.Hook), &redis.Options{
Options: &rv8.Options{
Addr: redisConf.RedisAddr,
Password: redisConf.RedisPassword,
DB: 0,
},
})
if err != nil {
log.Fatal(err)
}
}
}
}
func createListener(server *mqtt.Server, port string, listenersConf *listeners.Config) {
tcp := listeners.NewTCP(uuid.NewString(), port, listenersConf)
err := server.AddListener(tcp)
if err != nil {
log.Fatal(err)
}
}
func defineServerTls(fullchain, privkey string) *listeners.Config {
if fullchain != "" && privkey != "" {
chain, err := ioutil.ReadFile(fullchain)
if err != nil {
log.Fatal(err)
}
pv, err := ioutil.ReadFile(privkey)
if err != nil {
log.Fatal(err)
}
cert, err := tls.X509KeyPair(chain, pv)
if err != nil {
log.Fatal(err)
}
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
}
return &listeners.Config{
TLSConfig: tlsConfig,
}
}
return nil
}
func defineServerAuth(server *mqtt.Server, authFile string) {
if authFile != "" {
data, err := os.ReadFile(authFile)
if err != nil {
log.Fatal(err)
}
err = server.AddHook(new(auth.Hook), &auth.Options{
Data: data,
})
if err != nil {
log.Fatal(err)
}
} else {
err := server.AddHook(new(auth.AllowHook), nil)
if err != nil {
log.Fatal(err)
}
}
}
func defineSeverLog(server *mqtt.Server, logLevel int) {
l := server.Log.Level(zerolog.Level(logLevel))
server.Log = &l
}

View File

@ -0,0 +1,21 @@
package ws
import (
"log"
"github.com/google/uuid"
"github.com/mochi-co/mqtt/v2"
"github.com/mochi-co/mqtt/v2/listeners"
)
type Ws struct {
WsPort string
}
func (w *Ws) Start(server *mqtt.Server) {
ws := listeners.NewWebsocket(uuid.NewString(), w.WsPort, nil)
err := server.AddListener(ws)
if err != nil {
log.Fatal(err)
}
}