diff --git a/backend/services/controller/cmd/oktopus/main.go b/backend/services/controller/cmd/oktopus/main.go index de354aa..8e80dc4 100755 --- a/backend/services/controller/cmd/oktopus/main.go +++ b/backend/services/controller/cmd/oktopus/main.go @@ -19,6 +19,7 @@ import ( "github.com/leandrofars/oktopus/internal/mtp" "github.com/leandrofars/oktopus/internal/stomp" usp_msg "github.com/leandrofars/oktopus/internal/usp_message" + "github.com/leandrofars/oktopus/internal/ws" ) const VERSION = "0.0.1" @@ -41,7 +42,7 @@ func main() { } // Locks app running until it receives a stop command as Ctrl+C. - signal.Notify(done, syscall.SIGINT, syscall.SIGTERM) + signal.Notify(done, syscall.SIGINT) log.SetFlags(log.LstdFlags | log.Lshortfile) @@ -68,6 +69,13 @@ func main() { flStompAddr := flag.String("stomp", lookupEnvOrString("STOMP_ADDR", "127.0.0.1:61613"), "Stomp broker address") flStompUser := flag.String("stomp_user", lookupEnvOrString("STOMP_USERNAME", ""), "Stomp broker username") flStompPasswd := flag.String("stomp_passwd", lookupEnvOrString("STOMP_PASSWORD", ""), "Stomp broker password") + flWsToken := flag.String("ws_token", lookupEnvOrString("WS_TOKEN", ""), "Websocket token") + flWsAuth := flag.Bool("ws_auth", lookupEnvOrBool("WS_AUTH", true), "Websocket auth enable or not") + flWsAddr := flag.String("ws_addr", lookupEnvOrString("WS_ADDR", "localhost"), "Websocket server address") + flWsPort := flag.String("ws_port", lookupEnvOrString("WS_PORT", "8080"), "Websocket server port") + flWsRoute := flag.String("ws_route", lookupEnvOrString("WS_ROUTE", "/ws/controller"), "Websocket server route") + flWsTls := flag.Bool("ws_tls", lookupEnvOrBool("WS_TLS", false), "Websocket server tls") + flDisableWs := flag.Bool("ws_disable", lookupEnvOrBool("WS_DISABLE", false), "Disable WS MTP") flDisableStomp := flag.Bool("stomp_disable", lookupEnvOrBool("STOMP_DISABLE", false), "Disable STOMP MTP") flDisableMqtt := flag.Bool("mqtt_disable", lookupEnvOrBool("MQTT_DISABLE", false), "Disable MQTT MTP") flHelp := flag.Bool("help", false, "Help") @@ -87,6 +95,7 @@ func main() { apiMsgQueue := make(map[string](chan usp_msg.Msg)) var m sync.Mutex + //TODO: refact mtps initialization through main.go /* If you want to use another message protocol just make it implement Broker interface. */ @@ -98,10 +107,19 @@ func main() { } wg := new(sync.WaitGroup) - wg.Add(2) + wg.Add(3) // Three wait groups (mqtt, stomp, ws) + /* ------------------------------ MTPs clients ------------------------------ */ var stompClient stomp.Stomp var mqttClient mqtt.Mqtt + var wsClient ws.Ws + /* -------------------------------------------------------------------------- */ + + /* ------------------------ MTPs disconnect channels ------------------------ */ + var mqttDone chan os.Signal + var wsDone chan os.Signal + var stompDone chan os.Signal + /* -------------------------------------------------------------------------- */ go func() { mqttClient = mqtt.Mqtt{ @@ -120,9 +138,11 @@ func main() { QMutex: &m, } + mqttDone = make(chan os.Signal, 1) + if !*flDisableMqtt { // MQTT will try connect to broker forever - go mtp.MtpService(&mqttClient, done, wg) + go mtp.MtpService(&mqttClient, mqttDone, wg) } else { wg.Done() } @@ -135,9 +155,31 @@ func main() { Password: *flStompPasswd, } + stompDone = make(chan os.Signal, 1) + if !*flDisableStomp { // STOMP will try to connect for a bunch of times and then exit - go mtp.MtpService(&stompClient, done, wg) + go mtp.MtpService(&stompClient, stompDone, wg) + } else { + wg.Done() + } + }() + + go func() { + wsClient = ws.Ws{ + Addr: *flWsAddr, + Port: *flWsPort, + Token: *flWsToken, + Route: *flWsRoute, + Auth: *flWsAuth, + TLS: *flWsTls, + Ctx: ctx, + } + + wsDone = make(chan os.Signal, 1) + + if !*flDisableWs { + go mtp.MtpService(&wsClient, wsDone, wg) } else { wg.Done() } @@ -150,6 +192,10 @@ func main() { <-done cancel() + // send done signal to all MTPs + wsDone <- os.Interrupt + mqttDone <- os.Interrupt + stompDone <- os.Interrupt log.Println("(⌐■_■) Oktopus is out!")