From 612c5bb265f70a86efdb069393a1d7bfb2766183 Mon Sep 17 00:00:00 2001 From: leandrofars Date: Thu, 15 May 2025 15:59:53 -0300 Subject: [PATCH] feat(mqtt): healthCheck endpoint --- .../mtp/mqtt/internal/config/config.go | 68 ++++++++++--------- .../mqtt/internal/listeners/health/health.go | 21 ++++++ .../mtp/mqtt/internal/listeners/listeners.go | 17 ++++- 3 files changed, 74 insertions(+), 32 deletions(-) create mode 100644 backend/services/mtp/mqtt/internal/listeners/health/health.go diff --git a/backend/services/mtp/mqtt/internal/config/config.go b/backend/services/mtp/mqtt/internal/config/config.go index e701117..f58accf 100644 --- a/backend/services/mtp/mqtt/internal/config/config.go +++ b/backend/services/mtp/mqtt/internal/config/config.go @@ -13,22 +13,24 @@ import ( const LOCAL_ENV = ".env.local" type Config struct { - MqttPort string - NoTls bool - Tls bool - MqttTlsPort string - Fullchain string - Privkey string - AuthEnable bool - RedisEnable bool - RedisAddr string - RedisPassword string - WsEnable bool - WsPort string - HttpEnable bool - HttpPort string - LogLevel int - Nats Nats + MqttPort string + NoTls bool + Tls bool + MqttTlsPort string + Fullchain string + Privkey string + AuthEnable bool + RedisEnable bool + RedisAddr string + RedisPassword string + WsEnable bool + WsPort string + HttpEnable bool + HttpPort string + HttpHealthCheckEnable bool + HttpHealthCheckPort string + LogLevel int + Nats Nats } type Nats struct { @@ -68,6 +70,8 @@ func NewConfig() Config { redisPassword := flag.String("redis_passwd", lookupEnvOrString("REDIS_PASSWD", ""), "redis db password") wsEnable := flag.Bool("ws_enable", lookupEnvOrBool("WS_ENABLE", false), "enable/disable Websocket listener") wsPort := flag.String("ws_port", lookupEnvOrString("WS_PORT", ":80"), "port for Websocket listener") + httpHealthCheckEnable := flag.Bool("http_health_check_enable", lookupEnvOrBool("HTTP_HEALTH_CHECK_ENABLE", true), "enable/disable HTTP health check") + httpHealthCheckPort := flag.String("http_health_check_port", lookupEnvOrString("HTTP_HEALTH_CHECK_PORT", ":8884"), "port for HTTP health check") httpEnable := flag.Bool("http_enable", lookupEnvOrBool("HTTP_ENABLE", false), "enable/disable HTTP listener of mqtt metrics") httpPort := flag.String("http_port", lookupEnvOrString("HTTP_PORT", ":8080"), "port for HTTP listener of mqtt metrics") logLevel := flag.Int("log_level", lookupEnvOrInt("LOG_LEVEL", 1), "0=DEBUG, 1=INFO, 2=WARNING, 3=ERROR") @@ -93,21 +97,23 @@ func NewConfig() Config { ctx := context.TODO() conf := Config{ - MqttPort: *mqttPort, - MqttTlsPort: *mqttTlsPort, - NoTls: *noTls, - Tls: *tls, - Fullchain: *fullchain, - Privkey: *privkey, - AuthEnable: *authEnable, - RedisEnable: *redisEnable, - RedisAddr: *redisAddr, - RedisPassword: *redisPassword, - WsEnable: *wsEnable, - WsPort: *wsPort, - HttpEnable: *httpEnable, - HttpPort: *httpPort, - LogLevel: *logLevel, + MqttPort: *mqttPort, + MqttTlsPort: *mqttTlsPort, + NoTls: *noTls, + Tls: *tls, + Fullchain: *fullchain, + Privkey: *privkey, + AuthEnable: *authEnable, + RedisEnable: *redisEnable, + RedisAddr: *redisAddr, + RedisPassword: *redisPassword, + WsEnable: *wsEnable, + WsPort: *wsPort, + HttpEnable: *httpEnable, + HttpPort: *httpPort, + LogLevel: *logLevel, + HttpHealthCheckEnable: *httpHealthCheckEnable, + HttpHealthCheckPort: *httpHealthCheckPort, Nats: Nats{ Url: *natsUrl, Name: *natsName, diff --git a/backend/services/mtp/mqtt/internal/listeners/health/health.go b/backend/services/mtp/mqtt/internal/listeners/health/health.go new file mode 100644 index 0000000..a483697 --- /dev/null +++ b/backend/services/mtp/mqtt/internal/listeners/health/health.go @@ -0,0 +1,21 @@ +package health + +import ( + "log" + + "github.com/google/uuid" + "github.com/mochi-co/mqtt/v2" + "github.com/mochi-co/mqtt/v2/listeners" +) + +type HttpHealth struct { + HttpPort string +} + +func (h *HttpHealth) Start(server *mqtt.Server) { + healthCheckEndpoint := listeners.NewHTTPHealthCheck(uuid.NewString(), h.HttpPort, nil) + err := server.AddListener(healthCheckEndpoint) + if err != nil { + log.Fatal(err) + } +} diff --git a/backend/services/mtp/mqtt/internal/listeners/listeners.go b/backend/services/mtp/mqtt/internal/listeners/listeners.go index d72bd73..46e4880 100644 --- a/backend/services/mtp/mqtt/internal/listeners/listeners.go +++ b/backend/services/mtp/mqtt/internal/listeners/listeners.go @@ -2,6 +2,7 @@ package listeners import ( "broker/internal/config" + "broker/internal/listeners/health" "broker/internal/listeners/http" broker "broker/internal/listeners/mqtt" "broker/internal/listeners/ws" @@ -16,7 +17,7 @@ func StartServers(c config.Config) { server := mqtt.New(&mqtt.Options{}) var wg sync.WaitGroup - wg.Add(3) + wg.Add(4) go func() { mqttServer := newMqttServer(c) @@ -32,6 +33,14 @@ func StartServers(c config.Config) { wg.Done() }() + go func() { + if c.HttpHealthCheckEnable { + healhCheckServer := NewHTTPHealthCheckServer(c) + healhCheckServer.Start(server) + } + wg.Done() + }() + go func() { if c.HttpEnable { httpServer := newHttpServer(c) @@ -80,3 +89,9 @@ func newHttpServer(c config.Config) *http.Http { HttpPort: c.HttpPort, } } + +func NewHTTPHealthCheckServer(c config.Config) *health.HttpHealth { + return &health.HttpHealth{ + HttpPort: c.HttpHealthCheckPort, + } +}