feat: add ws mtp to main code + fix: done channel processes closing

This commit is contained in:
leandrofars 2024-02-05 23:44:12 -03:00
parent 577a50d68b
commit c94ab348b5

View File

@ -19,6 +19,7 @@ import (
"github.com/leandrofars/oktopus/internal/mtp" "github.com/leandrofars/oktopus/internal/mtp"
"github.com/leandrofars/oktopus/internal/stomp" "github.com/leandrofars/oktopus/internal/stomp"
usp_msg "github.com/leandrofars/oktopus/internal/usp_message" usp_msg "github.com/leandrofars/oktopus/internal/usp_message"
"github.com/leandrofars/oktopus/internal/ws"
) )
const VERSION = "0.0.1" const VERSION = "0.0.1"
@ -41,7 +42,7 @@ func main() {
} }
// Locks app running until it receives a stop command as Ctrl+C. // Locks app running until it receives a stop command as Ctrl+C.
signal.Notify(done, syscall.SIGINT, syscall.SIGTERM) signal.Notify(done, syscall.SIGINT)
log.SetFlags(log.LstdFlags | log.Lshortfile) log.SetFlags(log.LstdFlags | log.Lshortfile)
@ -68,6 +69,13 @@ func main() {
flStompAddr := flag.String("stomp", lookupEnvOrString("STOMP_ADDR", "127.0.0.1:61613"), "Stomp broker address") flStompAddr := flag.String("stomp", lookupEnvOrString("STOMP_ADDR", "127.0.0.1:61613"), "Stomp broker address")
flStompUser := flag.String("stomp_user", lookupEnvOrString("STOMP_USERNAME", ""), "Stomp broker username") flStompUser := flag.String("stomp_user", lookupEnvOrString("STOMP_USERNAME", ""), "Stomp broker username")
flStompPasswd := flag.String("stomp_passwd", lookupEnvOrString("STOMP_PASSWORD", ""), "Stomp broker password") flStompPasswd := flag.String("stomp_passwd", lookupEnvOrString("STOMP_PASSWORD", ""), "Stomp broker password")
flWsToken := flag.String("ws_token", lookupEnvOrString("WS_TOKEN", ""), "Websocket token")
flWsAuth := flag.Bool("ws_auth", lookupEnvOrBool("WS_AUTH", true), "Websocket auth enable or not")
flWsAddr := flag.String("ws_addr", lookupEnvOrString("WS_ADDR", "localhost"), "Websocket server address")
flWsPort := flag.String("ws_port", lookupEnvOrString("WS_PORT", "8080"), "Websocket server port")
flWsRoute := flag.String("ws_route", lookupEnvOrString("WS_ROUTE", "/ws/controller"), "Websocket server route")
flWsTls := flag.Bool("ws_tls", lookupEnvOrBool("WS_TLS", false), "Websocket server tls")
flDisableWs := flag.Bool("ws_disable", lookupEnvOrBool("WS_DISABLE", false), "Disable WS MTP")
flDisableStomp := flag.Bool("stomp_disable", lookupEnvOrBool("STOMP_DISABLE", false), "Disable STOMP MTP") flDisableStomp := flag.Bool("stomp_disable", lookupEnvOrBool("STOMP_DISABLE", false), "Disable STOMP MTP")
flDisableMqtt := flag.Bool("mqtt_disable", lookupEnvOrBool("MQTT_DISABLE", false), "Disable MQTT MTP") flDisableMqtt := flag.Bool("mqtt_disable", lookupEnvOrBool("MQTT_DISABLE", false), "Disable MQTT MTP")
flHelp := flag.Bool("help", false, "Help") flHelp := flag.Bool("help", false, "Help")
@ -87,6 +95,7 @@ func main() {
apiMsgQueue := make(map[string](chan usp_msg.Msg)) apiMsgQueue := make(map[string](chan usp_msg.Msg))
var m sync.Mutex var m sync.Mutex
//TODO: refact mtps initialization through main.go
/* /*
If you want to use another message protocol just make it implement Broker interface. If you want to use another message protocol just make it implement Broker interface.
*/ */
@ -98,10 +107,19 @@ func main() {
} }
wg := new(sync.WaitGroup) wg := new(sync.WaitGroup)
wg.Add(2) wg.Add(3) // Three wait groups (mqtt, stomp, ws)
/* ------------------------------ MTPs clients ------------------------------ */
var stompClient stomp.Stomp var stompClient stomp.Stomp
var mqttClient mqtt.Mqtt var mqttClient mqtt.Mqtt
var wsClient ws.Ws
/* -------------------------------------------------------------------------- */
/* ------------------------ MTPs disconnect channels ------------------------ */
var mqttDone chan os.Signal
var wsDone chan os.Signal
var stompDone chan os.Signal
/* -------------------------------------------------------------------------- */
go func() { go func() {
mqttClient = mqtt.Mqtt{ mqttClient = mqtt.Mqtt{
@ -120,9 +138,11 @@ func main() {
QMutex: &m, QMutex: &m,
} }
mqttDone = make(chan os.Signal, 1)
if !*flDisableMqtt { if !*flDisableMqtt {
// MQTT will try connect to broker forever // MQTT will try connect to broker forever
go mtp.MtpService(&mqttClient, done, wg) go mtp.MtpService(&mqttClient, mqttDone, wg)
} else { } else {
wg.Done() wg.Done()
} }
@ -135,9 +155,31 @@ func main() {
Password: *flStompPasswd, Password: *flStompPasswd,
} }
stompDone = make(chan os.Signal, 1)
if !*flDisableStomp { if !*flDisableStomp {
// STOMP will try to connect for a bunch of times and then exit // STOMP will try to connect for a bunch of times and then exit
go mtp.MtpService(&stompClient, done, wg) go mtp.MtpService(&stompClient, stompDone, wg)
} else {
wg.Done()
}
}()
go func() {
wsClient = ws.Ws{
Addr: *flWsAddr,
Port: *flWsPort,
Token: *flWsToken,
Route: *flWsRoute,
Auth: *flWsAuth,
TLS: *flWsTls,
Ctx: ctx,
}
wsDone = make(chan os.Signal, 1)
if !*flDisableWs {
go mtp.MtpService(&wsClient, wsDone, wg)
} else { } else {
wg.Done() wg.Done()
} }
@ -150,6 +192,10 @@ func main() {
<-done <-done
cancel() cancel()
// send done signal to all MTPs
wsDone <- os.Interrupt
mqttDone <- os.Interrupt
stompDone <- os.Interrupt
log.Println("(⌐■_■) Oktopus is out!") log.Println("(⌐■_■) Oktopus is out!")