fix(controller): api request queue memory leak

This commit is contained in:
Leandro Antônio Farias Machado 2023-06-27 02:20:14 -03:00
parent fcd322c7a9
commit b58b978b5f

View File

@ -81,8 +81,8 @@ func StartApi(a Api) {
srv := &http.Server{
Addr: "0.0.0.0:" + a.Port,
// Good practice to set timeouts to avoid Slowloris attacks.
WriteTimeout: time.Second * 15,
ReadTimeout: time.Second * 15,
WriteTimeout: time.Second * 30,
ReadTimeout: time.Second * 30,
IdleTimeout: time.Second * 60,
Handler: corsOpts.Handler(r), // Pass our instance of gorilla/mux in.
}
@ -170,6 +170,7 @@ func (a *Api) deviceFwUpdate(w http.ResponseWriter, r *http.Request) {
}
a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg)
log.Println("Sending Msg:", msg.Header.MsgId)
a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn)
var getMsgAnswer *usp_msg.GetResp
@ -177,10 +178,14 @@ func (a *Api) deviceFwUpdate(w http.ResponseWriter, r *http.Request) {
select {
case msg := <-a.MsgQueue[msg.Header.MsgId]:
log.Printf("Received Msg: %s", msg.Header.MsgId)
delete(a.MsgQueue, msg.Header.MsgId)
log.Println("requests queue:", a.MsgQueue)
getMsgAnswer = msg.Body.GetResponse().GetGetResp()
case <-time.After(time.Second * 30):
log.Printf("Request %s Timed Out", msg.Header.MsgId)
w.WriteHeader(http.StatusGatewayTimeout)
delete(a.MsgQueue, msg.Header.MsgId)
log.Println("requests queue:", a.MsgQueue)
json.NewEncoder(w).Encode("Request Timed Out")
return
}
@ -225,16 +230,21 @@ func (a *Api) deviceFwUpdate(w http.ResponseWriter, r *http.Request) {
}
a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg)
log.Println("Sending Msg:", msg.Header.MsgId)
a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn)
select {
case msg := <-a.MsgQueue[msg.Header.MsgId]:
log.Printf("Received Msg: %s", msg.Header.MsgId)
delete(a.MsgQueue, msg.Header.MsgId)
log.Println("requests queue:", a.MsgQueue)
json.NewEncoder(w).Encode(msg.Body.GetResponse().GetSetResp())
return
case <-time.After(time.Second * 28):
log.Printf("Request %s Timed Out", msg.Header.MsgId)
w.WriteHeader(http.StatusGatewayTimeout)
delete(a.MsgQueue, msg.Header.MsgId)
log.Println("requests queue:", a.MsgQueue)
json.NewEncoder(w).Encode("Request Timed Out")
return
}
@ -270,16 +280,21 @@ func (a *Api) deviceGetParameterInstances(w http.ResponseWriter, r *http.Request
//a.Broker.Request(tr369Message, usp_msg.Header_GET, "oktopus/v1/agent/"+sn, "oktopus/v1/get/"+sn)
a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg)
log.Println("Sending Msg:", msg.Header.MsgId)
a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn)
select {
case msg := <-a.MsgQueue[msg.Header.MsgId]:
log.Printf("Received Msg: %s", msg.Header.MsgId)
delete(a.MsgQueue, msg.Header.MsgId)
log.Println("requests queue:", a.MsgQueue)
json.NewEncoder(w).Encode(msg.Body.GetResponse().GetGetInstancesResp())
return
case <-time.After(time.Second * 28):
log.Printf("Request %s Timed Out", msg.Header.MsgId)
w.WriteHeader(http.StatusGatewayTimeout)
delete(a.MsgQueue, msg.Header.MsgId)
log.Println("requests queue:", a.MsgQueue)
json.NewEncoder(w).Encode("Request Timed Out")
return
}
@ -315,16 +330,21 @@ func (a *Api) deviceGetSupportedParametersMsg(w http.ResponseWriter, r *http.Req
//a.Broker.Request(tr369Message, usp_msg.Header_GET, "oktopus/v1/agent/"+sn, "oktopus/v1/get/"+sn)
a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg)
log.Println("Sending Msg:", msg.Header.MsgId)
a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn)
select {
case msg := <-a.MsgQueue[msg.Header.MsgId]:
log.Printf("Received Msg: %s", msg.Header.MsgId)
delete(a.MsgQueue, msg.Header.MsgId)
log.Println("requests queue:", a.MsgQueue)
json.NewEncoder(w).Encode(msg.Body.GetResponse().GetGetSupportedDmResp())
return
case <-time.After(time.Second * 28):
log.Printf("Request %s Timed Out", msg.Header.MsgId)
w.WriteHeader(http.StatusGatewayTimeout)
delete(a.MsgQueue, msg.Header.MsgId)
log.Println("requests queue:", a.MsgQueue)
json.NewEncoder(w).Encode("Request Timed Out")
return
}
@ -360,16 +380,21 @@ func (a *Api) deviceCreateMsg(w http.ResponseWriter, r *http.Request) {
//a.Broker.Request(tr369Message, usp_msg.Header_GET, "oktopus/v1/agent/"+sn, "oktopus/v1/get/"+sn)
a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg)
log.Println("Sending Msg:", msg.Header.MsgId)
a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn)
select {
case msg := <-a.MsgQueue[msg.Header.MsgId]:
log.Printf("Received Msg: %s", msg.Header.MsgId)
delete(a.MsgQueue, msg.Header.MsgId)
log.Println("requests queue:", a.MsgQueue)
json.NewEncoder(w).Encode(msg.Body.GetResponse().GetAddResp())
return
case <-time.After(time.Second * 28):
log.Printf("Request %s Timed Out", msg.Header.MsgId)
w.WriteHeader(http.StatusGatewayTimeout)
delete(a.MsgQueue, msg.Header.MsgId)
log.Println("requests queue:", a.MsgQueue)
json.NewEncoder(w).Encode("Request Timed Out")
return
}
@ -405,16 +430,22 @@ func (a *Api) deviceGetMsg(w http.ResponseWriter, r *http.Request) {
}
a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg)
log.Println("Sending Msg:", msg.Header.MsgId)
a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn)
select {
case msg := <-a.MsgQueue[msg.Header.MsgId]:
log.Printf("Received Msg: %s", msg.Header.MsgId)
delete(a.MsgQueue, msg.Header.MsgId)
log.Println("requests queue:", a.MsgQueue)
json.NewEncoder(w).Encode(msg.Body.GetResponse().GetGetResp())
return
case <-time.After(time.Second * 30):
log.Printf("Request %s Timed Out", msg.Header.MsgId)
w.WriteHeader(http.StatusGatewayTimeout)
delete(a.MsgQueue, msg.Header.MsgId)
log.Println("requests queue:", a.MsgQueue)
json.NewEncoder(w).Encode("Request Timed Out")
return
}
@ -450,16 +481,21 @@ func (a *Api) deviceDeleteMsg(w http.ResponseWriter, r *http.Request) {
//a.Broker.Request(tr369Message, usp_msg.Header_GET, "oktopus/v1/agent/"+sn, "oktopus/v1/get/"+sn)
a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg)
log.Println("Sending Msg:", msg.Header.MsgId)
a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn)
select {
case msg := <-a.MsgQueue[msg.Header.MsgId]:
log.Printf("Received Msg: %s", msg.Header.MsgId)
delete(a.MsgQueue, msg.Header.MsgId)
log.Println("requests queue:", a.MsgQueue)
json.NewEncoder(w).Encode(msg.Body.GetResponse().GetDeleteResp())
return
case <-time.After(time.Second * 28):
log.Printf("Request %s Timed Out", msg.Header.MsgId)
w.WriteHeader(http.StatusGatewayTimeout)
delete(a.MsgQueue, msg.Header.MsgId)
log.Println("requests queue:", a.MsgQueue)
json.NewEncoder(w).Encode("Request Timed Out")
return
}
@ -495,16 +531,21 @@ func (a *Api) deviceUpdateMsg(w http.ResponseWriter, r *http.Request) {
//a.Broker.Request(tr369Message, usp_msg.Header_GET, "oktopus/v1/agent/"+sn, "oktopus/v1/get/"+sn)
a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg)
log.Println("Sending Msg:", msg.Header.MsgId)
a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn)
select {
case msg := <-a.MsgQueue[msg.Header.MsgId]:
log.Printf("Received Msg: %s", msg.Header.MsgId)
delete(a.MsgQueue, msg.Header.MsgId)
log.Println("requests queue:", a.MsgQueue)
json.NewEncoder(w).Encode(msg.Body.GetResponse().GetSetResp())
return
case <-time.After(time.Second * 28):
log.Printf("Request %s Timed Out", msg.Header.MsgId)
w.WriteHeader(http.StatusGatewayTimeout)
delete(a.MsgQueue, msg.Header.MsgId)
log.Println("requests queue:", a.MsgQueue)
json.NewEncoder(w).Encode("Request Timed Out")
return
}