From a07082fb796d3a22587a5fe5b126ac5debb45c10 Mon Sep 17 00:00:00 2001 From: leandrofars Date: Mon, 23 Oct 2023 15:46:26 -0300 Subject: [PATCH] refact(api): controller clean code [ close #121 ] --- .../services/controller/internal/api/api.go | 84 +++++- .../controller/internal/api/device.go | 275 +----------------- .../services/controller/internal/api/wifi.go | 13 + 3 files changed, 88 insertions(+), 284 deletions(-) diff --git a/backend/services/controller/internal/api/api.go b/backend/services/controller/internal/api/api.go index 0007990..22adb20 100644 --- a/backend/services/controller/internal/api/api.go +++ b/backend/services/controller/internal/api/api.go @@ -1,6 +1,7 @@ package api import ( + "encoding/json" "log" "net/http" "sync" @@ -12,6 +13,8 @@ import ( "github.com/leandrofars/oktopus/internal/db" "github.com/leandrofars/oktopus/internal/mtp" usp_msg "github.com/leandrofars/oktopus/internal/usp_message" + "github.com/leandrofars/oktopus/internal/utils" + "google.golang.org/protobuf/proto" ) type Api struct { @@ -22,18 +25,7 @@ type Api struct { QMutex *sync.Mutex } -type WiFi struct { - SSID string `json:"ssid"` - Password string `json:"password"` - Security string `json:"security"` - SecurityCapabilities []string `json:"securityCapabilities"` - AutoChannelEnable bool `json:"autoChannelEnable"` - Channel int `json:"channel"` - ChannelBandwidth string `json:"channelBandwidth"` - FrequencyBand string `json:"frequencyBand"` - //PossibleChannels []int `json:"PossibleChannels"` - SupportedChannelBandwidths []string `json:"supportedChannelBandwidths"` -} +const REQUEST_TIMEOUT = time.Second * 30 const ( NormalUser = iota @@ -50,10 +42,6 @@ func NewApi(port string, db db.Database, b mtp.Broker, msgQueue map[string](chan } } -//TODO: restructure http api calls for mqtt, to use golang generics and avoid code repetition -//TODO: standardize timeouts through code -//TODO: fix api methods - func StartApi(a Api) { r := mux.NewRouter() authentication := r.PathPrefix("/api/auth").Subrouter() @@ -104,3 +92,67 @@ func StartApi(a Api) { }() log.Println("Running Api at port", a.Port) } + +func (a *Api) uspCall(msg usp_msg.Msg, sn string, w http.ResponseWriter) { + + encodedMsg, err := proto.Marshal(&msg) + if err != nil { + log.Println(err) + w.WriteHeader(http.StatusBadRequest) + return + } + + record := utils.NewUspRecord(encodedMsg, sn) + tr369Message, err := proto.Marshal(&record) + if err != nil { + log.Fatalln("Failed to encode tr369 record:", err) + } + + a.QMutex.Lock() + a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg) + a.QMutex.Unlock() + log.Println("Sending Msg:", msg.Header.MsgId) + a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn, false) + + select { + case msg := <-a.MsgQueue[msg.Header.MsgId]: + log.Printf("Received Msg: %s", msg.Header.MsgId) + a.QMutex.Lock() + delete(a.MsgQueue, msg.Header.MsgId) + a.QMutex.Unlock() + log.Println("requests queue:", a.MsgQueue) + body := msg.Body.GetResponse() + switch body.RespType.(type) { + case *usp_msg.Response_GetResp: + json.NewEncoder(w).Encode(body.GetGetResp()) + case *usp_msg.Response_DeleteResp: + json.NewEncoder(w).Encode(body.GetDeleteResp()) + case *usp_msg.Response_AddResp: + json.NewEncoder(w).Encode(body.GetAddResp()) + case *usp_msg.Response_SetResp: + json.NewEncoder(w).Encode(body.GetSetResp()) + case *usp_msg.Response_GetInstancesResp: + json.NewEncoder(w).Encode(body.GetGetInstancesResp()) + case *usp_msg.Response_GetSupportedDmResp: + json.NewEncoder(w).Encode(body.GetGetSupportedDmResp()) + case *usp_msg.Response_GetSupportedProtocolResp: + json.NewEncoder(w).Encode(body.GetGetSupportedProtocolResp()) + case *usp_msg.Response_NotifyResp: + json.NewEncoder(w).Encode(body.GetNotifyResp()) + case *usp_msg.Response_OperateResp: + json.NewEncoder(w).Encode(body.GetOperateResp()) + default: + json.NewEncoder(w).Encode("Unknown message answer") + } + return + case <-time.After(REQUEST_TIMEOUT): + log.Printf("Request %s Timed Out", msg.Header.MsgId) + w.WriteHeader(http.StatusGatewayTimeout) + a.QMutex.Lock() + delete(a.MsgQueue, msg.Header.MsgId) + a.QMutex.Unlock() + log.Println("requests queue:", a.MsgQueue) + json.NewEncoder(w).Encode("Request Timed Out") + return + } +} diff --git a/backend/services/controller/internal/api/device.go b/backend/services/controller/internal/api/device.go index 5d4d135..755c708 100644 --- a/backend/services/controller/internal/api/device.go +++ b/backend/services/controller/internal/api/device.go @@ -88,44 +88,7 @@ func (a *Api) deviceFwUpdate(w http.ResponseWriter, r *http.Request) { } msg = utils.NewOperateMsg(receiver) - encodedMsg, err = proto.Marshal(&msg) - if err != nil { - log.Println(err) - w.WriteHeader(http.StatusBadRequest) - return - } - - record = utils.NewUspRecord(encodedMsg, sn) - tr369Message, err = proto.Marshal(&record) - if err != nil { - log.Fatalln("Failed to encode tr369 record:", err) - } - - a.QMutex.Lock() - a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg) - a.QMutex.Unlock() - log.Println("Sending Msg:", msg.Header.MsgId) - a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn, false) - - select { - case msg := <-a.MsgQueue[msg.Header.MsgId]: - log.Printf("Received Msg: %s", msg.Header.MsgId) - a.QMutex.Lock() - delete(a.MsgQueue, msg.Header.MsgId) - a.QMutex.Unlock() - log.Println("requests queue:", a.MsgQueue) - json.NewEncoder(w).Encode(msg.Body.GetResponse().GetSetResp()) - return - case <-time.After(time.Second * 55): - log.Printf("Request %s Timed Out", msg.Header.MsgId) - w.WriteHeader(http.StatusGatewayTimeout) - a.QMutex.Lock() - delete(a.MsgQueue, msg.Header.MsgId) - a.QMutex.Unlock() - log.Println("requests queue:", a.MsgQueue) - json.NewEncoder(w).Encode("Request Timed Out") - return - } + a.uspCall(msg, sn, w) } // Check which fw image is activated @@ -160,45 +123,7 @@ func (a *Api) deviceGetSupportedParametersMsg(w http.ResponseWriter, r *http.Req } msg := utils.NewGetSupportedParametersMsg(receiver) - encodedMsg, err := proto.Marshal(&msg) - if err != nil { - log.Println(err) - w.WriteHeader(http.StatusBadRequest) - return - } - - record := utils.NewUspRecord(encodedMsg, sn) - tr369Message, err := proto.Marshal(&record) - if err != nil { - log.Fatalln("Failed to encode tr369 record:", err) - } - - //a.Broker.Request(tr369Message, usp_msg.Header_GET, "oktopus/v1/agent/"+sn, "oktopus/v1/get/"+sn) - a.QMutex.Lock() - a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg) - a.QMutex.Unlock() - log.Println("Sending Msg:", msg.Header.MsgId) - a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn, false) - - select { - case msg := <-a.MsgQueue[msg.Header.MsgId]: - log.Printf("Received Msg: %s", msg.Header.MsgId) - a.QMutex.Lock() - delete(a.MsgQueue, msg.Header.MsgId) - a.QMutex.Unlock() - log.Println("requests queue:", a.MsgQueue) - json.NewEncoder(w).Encode(msg.Body.GetResponse().GetGetSupportedDmResp()) - return - case <-time.After(time.Second * 55): - log.Printf("Request %s Timed Out", msg.Header.MsgId) - w.WriteHeader(http.StatusGatewayTimeout) - a.QMutex.Lock() - delete(a.MsgQueue, msg.Header.MsgId) - a.QMutex.Unlock() - log.Println("requests queue:", a.MsgQueue) - json.NewEncoder(w).Encode("Request Timed Out") - return - } + a.uspCall(msg, sn, w) } func (a *Api) retrieveDevices(w http.ResponseWriter, r *http.Request) { @@ -231,45 +156,7 @@ func (a *Api) deviceCreateMsg(w http.ResponseWriter, r *http.Request) { } msg := utils.NewCreateMsg(receiver) - encodedMsg, err := proto.Marshal(&msg) - if err != nil { - log.Println(err) - w.WriteHeader(http.StatusBadRequest) - return - } - - record := utils.NewUspRecord(encodedMsg, sn) - tr369Message, err := proto.Marshal(&record) - if err != nil { - log.Fatalln("Failed to encode tr369 record:", err) - } - - //a.Broker.Request(tr369Message, usp_msg.Header_GET, "oktopus/v1/agent/"+sn, "oktopus/v1/get/"+sn) - a.QMutex.Lock() - a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg) - a.QMutex.Unlock() - log.Println("Sending Msg:", msg.Header.MsgId) - a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn, false) - - select { - case msg := <-a.MsgQueue[msg.Header.MsgId]: - log.Printf("Received Msg: %s", msg.Header.MsgId) - a.QMutex.Lock() - delete(a.MsgQueue, msg.Header.MsgId) - a.QMutex.Unlock() - log.Println("requests queue:", a.MsgQueue) - json.NewEncoder(w).Encode(msg.Body.GetResponse().GetAddResp()) - return - case <-time.After(time.Second * 55): - log.Printf("Request %s Timed Out", msg.Header.MsgId) - w.WriteHeader(http.StatusGatewayTimeout) - a.QMutex.Lock() - delete(a.MsgQueue, msg.Header.MsgId) - a.QMutex.Unlock() - log.Println("requests queue:", a.MsgQueue) - json.NewEncoder(w).Encode("Request Timed Out") - return - } + a.uspCall(msg, sn, w) } func (a *Api) deviceGetMsg(w http.ResponseWriter, r *http.Request) { @@ -288,45 +175,7 @@ func (a *Api) deviceGetMsg(w http.ResponseWriter, r *http.Request) { } msg := utils.NewGetMsg(receiver) - encodedMsg, err := proto.Marshal(&msg) - if err != nil { - log.Println(err) - w.WriteHeader(http.StatusBadRequest) - return - } - - record := utils.NewUspRecord(encodedMsg, sn) - tr369Message, err := proto.Marshal(&record) - if err != nil { - log.Fatalln("Failed to encode tr369 record:", err) - } - - a.QMutex.Lock() - a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg) - a.QMutex.Unlock() - - log.Println("Sending Msg:", msg.Header.MsgId) - a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn, false) - - select { - case msg := <-a.MsgQueue[msg.Header.MsgId]: - log.Printf("Received Msg: %s", msg.Header.MsgId) - a.QMutex.Lock() - delete(a.MsgQueue, msg.Header.MsgId) - a.QMutex.Unlock() - log.Println("requests queue:", a.MsgQueue) - json.NewEncoder(w).Encode(msg.Body.GetResponse().GetGetResp()) - return - case <-time.After(time.Second * 55): - log.Printf("Request %s Timed Out", msg.Header.MsgId) - w.WriteHeader(http.StatusGatewayTimeout) - a.QMutex.Lock() - delete(a.MsgQueue, msg.Header.MsgId) - a.QMutex.Unlock() - log.Println("requests queue:", a.MsgQueue) - json.NewEncoder(w).Encode("Request Timed Out") - return - } + a.uspCall(msg, sn, w) } func (a *Api) deviceDeleteMsg(w http.ResponseWriter, r *http.Request) { @@ -344,45 +193,10 @@ func (a *Api) deviceDeleteMsg(w http.ResponseWriter, r *http.Request) { } msg := utils.NewDelMsg(receiver) - encodedMsg, err := proto.Marshal(&msg) - if err != nil { - log.Println(err) - w.WriteHeader(http.StatusBadRequest) - return - } - - record := utils.NewUspRecord(encodedMsg, sn) - tr369Message, err := proto.Marshal(&record) - if err != nil { - log.Fatalln("Failed to encode tr369 record:", err) - } + a.uspCall(msg, sn, w) //a.Broker.Request(tr369Message, usp_msg.Header_GET, "oktopus/v1/agent/"+sn, "oktopus/v1/get/"+sn) - a.QMutex.Lock() - a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg) - a.QMutex.Unlock() - log.Println("Sending Msg:", msg.Header.MsgId) - a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn, false) - select { - case msg := <-a.MsgQueue[msg.Header.MsgId]: - log.Printf("Received Msg: %s", msg.Header.MsgId) - a.QMutex.Lock() - delete(a.MsgQueue, msg.Header.MsgId) - a.QMutex.Unlock() - log.Println("requests queue:", a.MsgQueue) - json.NewEncoder(w).Encode(msg.Body.GetResponse().GetDeleteResp()) - return - case <-time.After(time.Second * 55): - log.Printf("Request %s Timed Out", msg.Header.MsgId) - w.WriteHeader(http.StatusGatewayTimeout) - a.QMutex.Lock() - delete(a.MsgQueue, msg.Header.MsgId) - a.QMutex.Unlock() - log.Println("requests queue:", a.MsgQueue) - json.NewEncoder(w).Encode("Request Timed Out") - return - } } func (a *Api) deviceUpdateMsg(w http.ResponseWriter, r *http.Request) { @@ -400,45 +214,7 @@ func (a *Api) deviceUpdateMsg(w http.ResponseWriter, r *http.Request) { } msg := utils.NewSetMsg(receiver) - encodedMsg, err := proto.Marshal(&msg) - if err != nil { - log.Println(err) - w.WriteHeader(http.StatusBadRequest) - return - } - - record := utils.NewUspRecord(encodedMsg, sn) - tr369Message, err := proto.Marshal(&record) - if err != nil { - log.Fatalln("Failed to encode tr369 record:", err) - } - - //a.Broker.Request(tr369Message, usp_msg.Header_GET, "oktopus/v1/agent/"+sn, "oktopus/v1/get/"+sn) - a.QMutex.Lock() - a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg) - a.QMutex.Unlock() - log.Println("Sending Msg:", msg.Header.MsgId) - a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn, false) - - select { - case msg := <-a.MsgQueue[msg.Header.MsgId]: - log.Printf("Received Msg: %s", msg.Header.MsgId) - a.QMutex.Lock() - delete(a.MsgQueue, msg.Header.MsgId) - a.QMutex.Unlock() - log.Println("requests queue:", a.MsgQueue) - json.NewEncoder(w).Encode(msg.Body.GetResponse().GetSetResp()) - return - case <-time.After(time.Second * 55): - log.Printf("Request %s Timed Out", msg.Header.MsgId) - w.WriteHeader(http.StatusGatewayTimeout) - a.QMutex.Lock() - delete(a.MsgQueue, msg.Header.MsgId) - a.QMutex.Unlock() - log.Println("requests queue:", a.MsgQueue) - json.NewEncoder(w).Encode("Request Timed Out") - return - } + a.uspCall(msg, sn, w) } func (a *Api) deviceExists(sn string, w http.ResponseWriter) { @@ -469,42 +245,5 @@ func (a *Api) deviceGetParameterInstances(w http.ResponseWriter, r *http.Request } msg := utils.NewGetParametersInstancesMsg(receiver) - encodedMsg, err := proto.Marshal(&msg) - if err != nil { - log.Println(err) - w.WriteHeader(http.StatusBadRequest) - return - } - - record := utils.NewUspRecord(encodedMsg, sn) - tr369Message, err := proto.Marshal(&record) - if err != nil { - log.Fatalln("Failed to encode tr369 record:", err) - } - - a.QMutex.Lock() - a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg) - a.QMutex.Unlock() - log.Println("Sending Msg:", msg.Header.MsgId) - a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn, false) - - select { - case msg := <-a.MsgQueue[msg.Header.MsgId]: - log.Printf("Received Msg: %s", msg.Header.MsgId) - a.QMutex.Lock() - delete(a.MsgQueue, msg.Header.MsgId) - a.QMutex.Unlock() - log.Println("requests queue:", a.MsgQueue) - json.NewEncoder(w).Encode(msg.Body.GetResponse().GetGetInstancesResp()) - return - case <-time.After(time.Second * 55): - log.Printf("Request %s Timed Out", msg.Header.MsgId) - w.WriteHeader(http.StatusGatewayTimeout) - a.QMutex.Lock() - delete(a.MsgQueue, msg.Header.MsgId) - a.QMutex.Unlock() - log.Println("requests queue:", a.MsgQueue) - json.NewEncoder(w).Encode("Request Timed Out") - return - } + a.uspCall(msg, sn, w) } diff --git a/backend/services/controller/internal/api/wifi.go b/backend/services/controller/internal/api/wifi.go index 262a85b..d51c18d 100644 --- a/backend/services/controller/internal/api/wifi.go +++ b/backend/services/controller/internal/api/wifi.go @@ -15,6 +15,19 @@ import ( "google.golang.org/protobuf/proto" ) +type WiFi struct { + SSID string `json:"ssid"` + Password string `json:"password"` + Security string `json:"security"` + SecurityCapabilities []string `json:"securityCapabilities"` + AutoChannelEnable bool `json:"autoChannelEnable"` + Channel int `json:"channel"` + ChannelBandwidth string `json:"channelBandwidth"` + FrequencyBand string `json:"frequencyBand"` + //PossibleChannels []int `json:"PossibleChannels"` + SupportedChannelBandwidths []string `json:"supportedChannelBandwidths"` +} + func (a *Api) deviceWifi(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) sn := vars["sn"]