From b58b978b5f9f1977f38839cb8bd7c761c484040c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Leandro=20Ant=C3=B4nio=20Farias=20Machado?= Date: Tue, 27 Jun 2023 02:20:14 -0300 Subject: [PATCH] fix(controller): api request queue memory leak --- .../services/controller/internal/api/api.go | 45 ++++++++++++++++++- 1 file changed, 43 insertions(+), 2 deletions(-) diff --git a/backend/services/controller/internal/api/api.go b/backend/services/controller/internal/api/api.go index 5bc68a1..1fa9987 100644 --- a/backend/services/controller/internal/api/api.go +++ b/backend/services/controller/internal/api/api.go @@ -81,8 +81,8 @@ func StartApi(a Api) { srv := &http.Server{ Addr: "0.0.0.0:" + a.Port, // Good practice to set timeouts to avoid Slowloris attacks. - WriteTimeout: time.Second * 15, - ReadTimeout: time.Second * 15, + WriteTimeout: time.Second * 30, + ReadTimeout: time.Second * 30, IdleTimeout: time.Second * 60, Handler: corsOpts.Handler(r), // Pass our instance of gorilla/mux in. } @@ -170,6 +170,7 @@ func (a *Api) deviceFwUpdate(w http.ResponseWriter, r *http.Request) { } a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg) + log.Println("Sending Msg:", msg.Header.MsgId) a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn) var getMsgAnswer *usp_msg.GetResp @@ -177,10 +178,14 @@ func (a *Api) deviceFwUpdate(w http.ResponseWriter, r *http.Request) { select { case msg := <-a.MsgQueue[msg.Header.MsgId]: log.Printf("Received Msg: %s", msg.Header.MsgId) + delete(a.MsgQueue, msg.Header.MsgId) + log.Println("requests queue:", a.MsgQueue) getMsgAnswer = msg.Body.GetResponse().GetGetResp() case <-time.After(time.Second * 30): log.Printf("Request %s Timed Out", msg.Header.MsgId) w.WriteHeader(http.StatusGatewayTimeout) + delete(a.MsgQueue, msg.Header.MsgId) + log.Println("requests queue:", a.MsgQueue) json.NewEncoder(w).Encode("Request Timed Out") return } @@ -225,16 +230,21 @@ func (a *Api) deviceFwUpdate(w http.ResponseWriter, r *http.Request) { } a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg) + log.Println("Sending Msg:", msg.Header.MsgId) a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn) select { case msg := <-a.MsgQueue[msg.Header.MsgId]: log.Printf("Received Msg: %s", msg.Header.MsgId) + delete(a.MsgQueue, msg.Header.MsgId) + log.Println("requests queue:", a.MsgQueue) json.NewEncoder(w).Encode(msg.Body.GetResponse().GetSetResp()) return case <-time.After(time.Second * 28): log.Printf("Request %s Timed Out", msg.Header.MsgId) w.WriteHeader(http.StatusGatewayTimeout) + delete(a.MsgQueue, msg.Header.MsgId) + log.Println("requests queue:", a.MsgQueue) json.NewEncoder(w).Encode("Request Timed Out") return } @@ -270,16 +280,21 @@ func (a *Api) deviceGetParameterInstances(w http.ResponseWriter, r *http.Request //a.Broker.Request(tr369Message, usp_msg.Header_GET, "oktopus/v1/agent/"+sn, "oktopus/v1/get/"+sn) a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg) + log.Println("Sending Msg:", msg.Header.MsgId) a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn) select { case msg := <-a.MsgQueue[msg.Header.MsgId]: log.Printf("Received Msg: %s", msg.Header.MsgId) + delete(a.MsgQueue, msg.Header.MsgId) + log.Println("requests queue:", a.MsgQueue) json.NewEncoder(w).Encode(msg.Body.GetResponse().GetGetInstancesResp()) return case <-time.After(time.Second * 28): log.Printf("Request %s Timed Out", msg.Header.MsgId) w.WriteHeader(http.StatusGatewayTimeout) + delete(a.MsgQueue, msg.Header.MsgId) + log.Println("requests queue:", a.MsgQueue) json.NewEncoder(w).Encode("Request Timed Out") return } @@ -315,16 +330,21 @@ func (a *Api) deviceGetSupportedParametersMsg(w http.ResponseWriter, r *http.Req //a.Broker.Request(tr369Message, usp_msg.Header_GET, "oktopus/v1/agent/"+sn, "oktopus/v1/get/"+sn) a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg) + log.Println("Sending Msg:", msg.Header.MsgId) a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn) select { case msg := <-a.MsgQueue[msg.Header.MsgId]: log.Printf("Received Msg: %s", msg.Header.MsgId) + delete(a.MsgQueue, msg.Header.MsgId) + log.Println("requests queue:", a.MsgQueue) json.NewEncoder(w).Encode(msg.Body.GetResponse().GetGetSupportedDmResp()) return case <-time.After(time.Second * 28): log.Printf("Request %s Timed Out", msg.Header.MsgId) w.WriteHeader(http.StatusGatewayTimeout) + delete(a.MsgQueue, msg.Header.MsgId) + log.Println("requests queue:", a.MsgQueue) json.NewEncoder(w).Encode("Request Timed Out") return } @@ -360,16 +380,21 @@ func (a *Api) deviceCreateMsg(w http.ResponseWriter, r *http.Request) { //a.Broker.Request(tr369Message, usp_msg.Header_GET, "oktopus/v1/agent/"+sn, "oktopus/v1/get/"+sn) a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg) + log.Println("Sending Msg:", msg.Header.MsgId) a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn) select { case msg := <-a.MsgQueue[msg.Header.MsgId]: log.Printf("Received Msg: %s", msg.Header.MsgId) + delete(a.MsgQueue, msg.Header.MsgId) + log.Println("requests queue:", a.MsgQueue) json.NewEncoder(w).Encode(msg.Body.GetResponse().GetAddResp()) return case <-time.After(time.Second * 28): log.Printf("Request %s Timed Out", msg.Header.MsgId) w.WriteHeader(http.StatusGatewayTimeout) + delete(a.MsgQueue, msg.Header.MsgId) + log.Println("requests queue:", a.MsgQueue) json.NewEncoder(w).Encode("Request Timed Out") return } @@ -405,16 +430,22 @@ func (a *Api) deviceGetMsg(w http.ResponseWriter, r *http.Request) { } a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg) + + log.Println("Sending Msg:", msg.Header.MsgId) a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn) select { case msg := <-a.MsgQueue[msg.Header.MsgId]: log.Printf("Received Msg: %s", msg.Header.MsgId) + delete(a.MsgQueue, msg.Header.MsgId) + log.Println("requests queue:", a.MsgQueue) json.NewEncoder(w).Encode(msg.Body.GetResponse().GetGetResp()) return case <-time.After(time.Second * 30): log.Printf("Request %s Timed Out", msg.Header.MsgId) w.WriteHeader(http.StatusGatewayTimeout) + delete(a.MsgQueue, msg.Header.MsgId) + log.Println("requests queue:", a.MsgQueue) json.NewEncoder(w).Encode("Request Timed Out") return } @@ -450,16 +481,21 @@ func (a *Api) deviceDeleteMsg(w http.ResponseWriter, r *http.Request) { //a.Broker.Request(tr369Message, usp_msg.Header_GET, "oktopus/v1/agent/"+sn, "oktopus/v1/get/"+sn) a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg) + log.Println("Sending Msg:", msg.Header.MsgId) a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn) select { case msg := <-a.MsgQueue[msg.Header.MsgId]: log.Printf("Received Msg: %s", msg.Header.MsgId) + delete(a.MsgQueue, msg.Header.MsgId) + log.Println("requests queue:", a.MsgQueue) json.NewEncoder(w).Encode(msg.Body.GetResponse().GetDeleteResp()) return case <-time.After(time.Second * 28): log.Printf("Request %s Timed Out", msg.Header.MsgId) w.WriteHeader(http.StatusGatewayTimeout) + delete(a.MsgQueue, msg.Header.MsgId) + log.Println("requests queue:", a.MsgQueue) json.NewEncoder(w).Encode("Request Timed Out") return } @@ -495,16 +531,21 @@ func (a *Api) deviceUpdateMsg(w http.ResponseWriter, r *http.Request) { //a.Broker.Request(tr369Message, usp_msg.Header_GET, "oktopus/v1/agent/"+sn, "oktopus/v1/get/"+sn) a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg) + log.Println("Sending Msg:", msg.Header.MsgId) a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn) select { case msg := <-a.MsgQueue[msg.Header.MsgId]: log.Printf("Received Msg: %s", msg.Header.MsgId) + delete(a.MsgQueue, msg.Header.MsgId) + log.Println("requests queue:", a.MsgQueue) json.NewEncoder(w).Encode(msg.Body.GetResponse().GetSetResp()) return case <-time.After(time.Second * 28): log.Printf("Request %s Timed Out", msg.Header.MsgId) w.WriteHeader(http.StatusGatewayTimeout) + delete(a.MsgQueue, msg.Header.MsgId) + log.Println("requests queue:", a.MsgQueue) json.NewEncoder(w).Encode("Request Timed Out") return }