feat(mqtt): healthCheck endpoint

This commit is contained in:
leandrofars 2025-05-15 15:59:53 -03:00
parent d5806885dc
commit 612c5bb265
3 changed files with 74 additions and 32 deletions

View File

@ -13,22 +13,24 @@ import (
const LOCAL_ENV = ".env.local" const LOCAL_ENV = ".env.local"
type Config struct { type Config struct {
MqttPort string MqttPort string
NoTls bool NoTls bool
Tls bool Tls bool
MqttTlsPort string MqttTlsPort string
Fullchain string Fullchain string
Privkey string Privkey string
AuthEnable bool AuthEnable bool
RedisEnable bool RedisEnable bool
RedisAddr string RedisAddr string
RedisPassword string RedisPassword string
WsEnable bool WsEnable bool
WsPort string WsPort string
HttpEnable bool HttpEnable bool
HttpPort string HttpPort string
LogLevel int HttpHealthCheckEnable bool
Nats Nats HttpHealthCheckPort string
LogLevel int
Nats Nats
} }
type Nats struct { type Nats struct {
@ -68,6 +70,8 @@ func NewConfig() Config {
redisPassword := flag.String("redis_passwd", lookupEnvOrString("REDIS_PASSWD", ""), "redis db password") redisPassword := flag.String("redis_passwd", lookupEnvOrString("REDIS_PASSWD", ""), "redis db password")
wsEnable := flag.Bool("ws_enable", lookupEnvOrBool("WS_ENABLE", false), "enable/disable Websocket listener") wsEnable := flag.Bool("ws_enable", lookupEnvOrBool("WS_ENABLE", false), "enable/disable Websocket listener")
wsPort := flag.String("ws_port", lookupEnvOrString("WS_PORT", ":80"), "port for Websocket listener") wsPort := flag.String("ws_port", lookupEnvOrString("WS_PORT", ":80"), "port for Websocket listener")
httpHealthCheckEnable := flag.Bool("http_health_check_enable", lookupEnvOrBool("HTTP_HEALTH_CHECK_ENABLE", true), "enable/disable HTTP health check")
httpHealthCheckPort := flag.String("http_health_check_port", lookupEnvOrString("HTTP_HEALTH_CHECK_PORT", ":8884"), "port for HTTP health check")
httpEnable := flag.Bool("http_enable", lookupEnvOrBool("HTTP_ENABLE", false), "enable/disable HTTP listener of mqtt metrics") httpEnable := flag.Bool("http_enable", lookupEnvOrBool("HTTP_ENABLE", false), "enable/disable HTTP listener of mqtt metrics")
httpPort := flag.String("http_port", lookupEnvOrString("HTTP_PORT", ":8080"), "port for HTTP listener of mqtt metrics") httpPort := flag.String("http_port", lookupEnvOrString("HTTP_PORT", ":8080"), "port for HTTP listener of mqtt metrics")
logLevel := flag.Int("log_level", lookupEnvOrInt("LOG_LEVEL", 1), "0=DEBUG, 1=INFO, 2=WARNING, 3=ERROR") logLevel := flag.Int("log_level", lookupEnvOrInt("LOG_LEVEL", 1), "0=DEBUG, 1=INFO, 2=WARNING, 3=ERROR")
@ -93,21 +97,23 @@ func NewConfig() Config {
ctx := context.TODO() ctx := context.TODO()
conf := Config{ conf := Config{
MqttPort: *mqttPort, MqttPort: *mqttPort,
MqttTlsPort: *mqttTlsPort, MqttTlsPort: *mqttTlsPort,
NoTls: *noTls, NoTls: *noTls,
Tls: *tls, Tls: *tls,
Fullchain: *fullchain, Fullchain: *fullchain,
Privkey: *privkey, Privkey: *privkey,
AuthEnable: *authEnable, AuthEnable: *authEnable,
RedisEnable: *redisEnable, RedisEnable: *redisEnable,
RedisAddr: *redisAddr, RedisAddr: *redisAddr,
RedisPassword: *redisPassword, RedisPassword: *redisPassword,
WsEnable: *wsEnable, WsEnable: *wsEnable,
WsPort: *wsPort, WsPort: *wsPort,
HttpEnable: *httpEnable, HttpEnable: *httpEnable,
HttpPort: *httpPort, HttpPort: *httpPort,
LogLevel: *logLevel, LogLevel: *logLevel,
HttpHealthCheckEnable: *httpHealthCheckEnable,
HttpHealthCheckPort: *httpHealthCheckPort,
Nats: Nats{ Nats: Nats{
Url: *natsUrl, Url: *natsUrl,
Name: *natsName, Name: *natsName,

View File

@ -0,0 +1,21 @@
package health
import (
"log"
"github.com/google/uuid"
"github.com/mochi-co/mqtt/v2"
"github.com/mochi-co/mqtt/v2/listeners"
)
type HttpHealth struct {
HttpPort string
}
func (h *HttpHealth) Start(server *mqtt.Server) {
healthCheckEndpoint := listeners.NewHTTPHealthCheck(uuid.NewString(), h.HttpPort, nil)
err := server.AddListener(healthCheckEndpoint)
if err != nil {
log.Fatal(err)
}
}

View File

@ -2,6 +2,7 @@ package listeners
import ( import (
"broker/internal/config" "broker/internal/config"
"broker/internal/listeners/health"
"broker/internal/listeners/http" "broker/internal/listeners/http"
broker "broker/internal/listeners/mqtt" broker "broker/internal/listeners/mqtt"
"broker/internal/listeners/ws" "broker/internal/listeners/ws"
@ -16,7 +17,7 @@ func StartServers(c config.Config) {
server := mqtt.New(&mqtt.Options{}) server := mqtt.New(&mqtt.Options{})
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(3) wg.Add(4)
go func() { go func() {
mqttServer := newMqttServer(c) mqttServer := newMqttServer(c)
@ -32,6 +33,14 @@ func StartServers(c config.Config) {
wg.Done() wg.Done()
}() }()
go func() {
if c.HttpHealthCheckEnable {
healhCheckServer := NewHTTPHealthCheckServer(c)
healhCheckServer.Start(server)
}
wg.Done()
}()
go func() { go func() {
if c.HttpEnable { if c.HttpEnable {
httpServer := newHttpServer(c) httpServer := newHttpServer(c)
@ -80,3 +89,9 @@ func newHttpServer(c config.Config) *http.Http {
HttpPort: c.HttpPort, HttpPort: c.HttpPort,
} }
} }
func NewHTTPHealthCheckServer(c config.Config) *health.HttpHealth {
return &health.HttpHealth{
HttpPort: c.HttpHealthCheckPort,
}
}