diff --git a/backend/services/controller/internal/api/api.go b/backend/services/controller/internal/api/api.go index 1ba1e9c..077360b 100644 --- a/backend/services/controller/internal/api/api.go +++ b/backend/services/controller/internal/api/api.go @@ -60,15 +60,15 @@ func (a *Api) StartApi() { iot.HandleFunc("/{sn}/{mtp}/parameters", a.deviceGetSupportedParametersMsg).Methods("PUT") iot.HandleFunc("/{sn}/{mtp}/instances", a.deviceGetParameterInstances).Methods("PUT") iot.HandleFunc("/{sn}/{mtp}/operate", a.deviceOperateMsg).Methods("PUT") - // iot.HandleFunc("/{sn}/{mtp}/fw_update", a.deviceFwUpdate).Methods("PUT") + iot.HandleFunc("/{sn}/{mtp}/fw_update", a.deviceFwUpdate).Methods("PUT") // iot.HandleFunc("/{sn}/{mtp}/wifi", a.deviceWifi).Methods("PUT", "GET") // mtp := r.PathPrefix("/api/mtp").Subrouter() // mtp.HandleFunc("", a.mtpInfo).Methods("GET") - // dash := r.PathPrefix("/api/info").Subrouter() - // dash.HandleFunc("/vendors", a.vendorsInfo).Methods("GET") - // dash.HandleFunc("/status", a.statusInfo).Methods("GET") - // dash.HandleFunc("/device_class", a.productClassInfo).Methods("GET") - // dash.HandleFunc("/general", a.generalInfo).Methods("GET") + dash := r.PathPrefix("/api/info").Subrouter() + dash.HandleFunc("/vendors", a.vendorsInfo).Methods("GET") + dash.HandleFunc("/status", a.statusInfo).Methods("GET") + dash.HandleFunc("/device_class", a.productClassInfo).Methods("GET") + dash.HandleFunc("/general", a.generalInfo).Methods("GET") users := r.PathPrefix("/api/users").Subrouter() users.HandleFunc("", a.retrieveUsers).Methods("GET") @@ -81,13 +81,13 @@ func (a *Api) StartApi() { // return middleware.Middleware(handler) // }) - // dash.Use(func(handler http.Handler) http.Handler { - // return middleware.Middleware(handler) - // }) + dash.Use(func(handler http.Handler) http.Handler { + return middleware.Middleware(handler) + }) - // users.Use(func(handler http.Handler) http.Handler { - // return middleware.Middleware(handler) - // }) + users.Use(func(handler http.Handler) http.Handler { + return middleware.Middleware(handler) + }) /* -------------------------------------------------------------------------- */ corsOpts := cors.GetCorsConfig() @@ -107,74 +107,3 @@ func (a *Api) StartApi() { }() log.Println("Running REST API at port", a.port) } - -// func (a *Api) uspCall(msg usp_msg.Msg, sn string, w http.ResponseWriter, device db.Device) { - -// encodedMsg, err := proto.Marshal(&msg) -// if err != nil { -// log.Println(err) -// w.WriteHeader(http.StatusBadRequest) -// return -// } - -// record := utils.NewUspRecord(encodedMsg, sn) -// tr369Message, err := proto.Marshal(&record) -// if err != nil { -// log.Fatalln("Failed to encode tr369 record:", err) -// } - -// a.QMutex.Lock() -// a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg) -// a.QMutex.Unlock() -// log.Println("Sending Msg:", msg.Header.MsgId) - -// if device.Mqtt == db.Online { -// a.Mqtt.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn, false) -// } else if device.Websockets == db.Online { -// a.Websockets.Publish(tr369Message, "", "", false) -// } else if device.Stomp == db.Online { -// //TODO: send stomp message -// } - -// select { -// case msg := <-a.MsgQueue[msg.Header.MsgId]: -// log.Printf("Received Msg: %s", msg.Header.MsgId) -// a.QMutex.Lock() -// delete(a.MsgQueue, msg.Header.MsgId) -// a.QMutex.Unlock() -// log.Println("requests queue:", a.MsgQueue) -// body := msg.Body.GetResponse() -// switch body.RespType.(type) { -// case *usp_msg.Response_GetResp: -// json.NewEncoder(w).Encode(body.GetGetResp()) -// case *usp_msg.Response_DeleteResp: -// json.NewEncoder(w).Encode(body.GetDeleteResp()) -// case *usp_msg.Response_AddResp: -// json.NewEncoder(w).Encode(body.GetAddResp()) -// case *usp_msg.Response_SetResp: -// json.NewEncoder(w).Encode(body.GetSetResp()) -// case *usp_msg.Response_GetInstancesResp: -// json.NewEncoder(w).Encode(body.GetGetInstancesResp()) -// case *usp_msg.Response_GetSupportedDmResp: -// json.NewEncoder(w).Encode(body.GetGetSupportedDmResp()) -// case *usp_msg.Response_GetSupportedProtocolResp: -// json.NewEncoder(w).Encode(body.GetGetSupportedProtocolResp()) -// case *usp_msg.Response_NotifyResp: -// json.NewEncoder(w).Encode(body.GetNotifyResp()) -// case *usp_msg.Response_OperateResp: -// json.NewEncoder(w).Encode(body.GetOperateResp()) -// default: -// json.NewEncoder(w).Encode("Unknown message answer") -// } -// return -// case <-time.After(REQUEST_TIMEOUT): -// log.Printf("Request %s Timed Out", msg.Header.MsgId) -// w.WriteHeader(http.StatusGatewayTimeout) -// a.QMutex.Lock() -// delete(a.MsgQueue, msg.Header.MsgId) -// a.QMutex.Unlock() -// log.Println("requests queue:", a.MsgQueue) -// json.NewEncoder(w).Encode("Request Timed Out") -// return -// } -// } diff --git a/backend/services/controller/internal/api/fw_update.go b/backend/services/controller/internal/api/fw_update.go new file mode 100644 index 0000000..bef03e5 --- /dev/null +++ b/backend/services/controller/internal/api/fw_update.go @@ -0,0 +1,137 @@ +package api + +import ( + "encoding/json" + "log" + "net/http" + + "github.com/leandrofars/oktopus/internal/bridge" + local "github.com/leandrofars/oktopus/internal/nats" + "github.com/leandrofars/oktopus/internal/usp/usp_msg" + "github.com/leandrofars/oktopus/internal/usp/usp_record" + "github.com/leandrofars/oktopus/internal/usp/usp_utils" + "github.com/leandrofars/oktopus/internal/utils" + "google.golang.org/protobuf/proto" +) + +type fwUpdate struct { + Url string +} + +func (a *Api) deviceFwUpdate(w http.ResponseWriter, r *http.Request) { + sn := getSerialNumberFromRequest(r) + mtp, err := getMtpFromRequest(r, w) + if err != nil { + return + } + + if mtp == "" { + var ok bool + mtp, ok = deviceStateOK(w, a.nc, sn) + if !ok { + return + } + } + + var payload fwUpdate + + utils.MarshallDecoder(&payload, r.Body) + + msg := usp_utils.NewGetMsg(usp_msg.Get{ + ParamPaths: []string{"Device.DeviceInfo.FirmwareImage.*.Status"}, + MaxDepth: 1, + }) + + protoMsg, err := proto.Marshal(&msg) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + w.Write(utils.Marshall(err.Error())) + return + } + + record := usp_utils.NewUspRecord(protoMsg, sn) + protoRecord, err := proto.Marshal(&record) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + w.Write(utils.Marshall(err.Error())) + return + } + + data, err := bridge.NatsUspInteraction( + local.DEVICE_SUBJECT_PREFIX+sn+".api", + mtp+"-adapter.usp.v1."+sn+".api", + protoRecord, + w, + a.nc, + ) + if err != nil { + return + } + + var receivedRecord usp_record.Record + err = proto.Unmarshal(data, &receivedRecord) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + w.Write(utils.Marshall(err.Error())) + return + } + var receivedMsg usp_msg.Msg + err = proto.Unmarshal(receivedRecord.GetNoSessionContext().Payload, &receivedMsg) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + w.Write(utils.Marshall(err.Error())) + return + } + + getMsgAnswer := receivedMsg.Body.GetResponse().GetGetResp() + + partition := checkAvaiableFwPartition(getMsgAnswer.ReqPathResults) + if partition == "" { + log.Println("Error to get device available firmware partition, probably it has only one partition") + w.WriteHeader(http.StatusInternalServerError) + json.NewEncoder(w).Encode("Server don't have the hability to update device with only one partition") + return + //TODO: update device with only one partition + } + + log.Println("URL to download firmware:", payload.Url) + + receiver := usp_msg.Operate{ + Command: "Device.DeviceInfo.FirmwareImage." + partition + "Download()", + CommandKey: "Download()", + SendResp: true, + InputArgs: map[string]string{ + "URL": payload.Url, + "AutoActivate": "true", + //"Username": "", + //"Password": "", + "FileSize": "0", //TODO: send firmware length + //"CheckSumAlgorithm": "", + //"CheckSum": "", //TODO: send firmware with checksum + }, + } + + msg = usp_utils.NewOperateMsg(receiver) + err = sendUspMsg(msg, sn, w, a.nc, mtp) +} + +// Check which fw image is activated +func checkAvaiableFwPartition(reqPathResult []*usp_msg.GetResp_RequestedPathResult) string { + for _, x := range reqPathResult { + partitionsNumber := len(x.ResolvedPathResults) + if partitionsNumber > 1 { + log.Printf("Device has %d firmware partitions", partitionsNumber) + for _, y := range x.ResolvedPathResults { + //TODO: verify if validation failed is trustable + if y.ResultParams["Status"] == "Available" || y.ResultParams["Status"] == "ValidationFailed" { + partition := y.ResolvedPath[len(y.ResolvedPath)-2:] + log.Printf("Partition %s is avaiable", partition) + return partition + } + } + } else { + return "" + } + } + return "" +} diff --git a/backend/services/controller/internal/api/info.go b/backend/services/controller/internal/api/info.go new file mode 100644 index 0000000..737d904 --- /dev/null +++ b/backend/services/controller/internal/api/info.go @@ -0,0 +1,153 @@ +package api + +import ( + "encoding/json" + "log" + "net/http" + "time" + + "github.com/leandrofars/oktopus/internal/bridge" + "github.com/leandrofars/oktopus/internal/entity" + local "github.com/leandrofars/oktopus/internal/nats" + "github.com/leandrofars/oktopus/internal/utils" +) + +type StatusCount struct { + Online int + Offline int +} + +type GeneralInfo struct { + MqttRtt string + WebsocketsRtt string + ProductClassCount []entity.ProductClassCount + StatusCount StatusCount + VendorsCount []entity.VendorsCount +} + +// TODO: fix when mqtt broker is not set don't break api +func (a *Api) generalInfo(w http.ResponseWriter, r *http.Request) { + + var result GeneralInfo + + productclasscount, err := bridge.NatsReq[[]entity.ProductClassCount]( + local.NATS_ADAPTER_SUBJECT+"devices.class", + []byte(""), + w, + a.nc, + ) + if err != nil { + return + } + + vendorcount, err := bridge.NatsReq[[]entity.VendorsCount]( + local.NATS_ADAPTER_SUBJECT+"devices.vendors", + []byte(""), + w, + a.nc, + ) + if err != nil { + return + } + + statusCount, err := bridge.NatsReq[[]entity.StatusCount]( + local.NATS_ADAPTER_SUBJECT+"devices.status", + []byte(""), + w, + a.nc, + ) + if err != nil { + return + } + + for _, v := range statusCount.Msg { + switch entity.Status(v.Status) { + case entity.Online: + result.StatusCount.Online = v.Count + case entity.Offline: + result.StatusCount.Offline = v.Count + } + } + + result.VendorsCount = vendorcount.Msg + result.ProductClassCount = productclasscount.Msg + + now := time.Now() + _, err = bridge.NatsReq[time.Duration]( + local.NATS_WS_ADAPTER_SUBJECT_PREFIX+"rtt", + []byte(""), + w, + a.nc, + ) + if err != nil { + return + } + result.WebsocketsRtt = time.Until(now).String() + + now = time.Now() + _, err = bridge.NatsReq[time.Duration]( + local.NATS_MQTT_ADAPTER_SUBJECT_PREFIX+"rtt", + []byte(""), + w, + a.nc, + ) + if err != nil { + return + } + result.MqttRtt = time.Until(now).String() + + err = json.NewEncoder(w).Encode(result) + if err != nil { + log.Println(err) + } +} + +func (a *Api) vendorsInfo(w http.ResponseWriter, r *http.Request) { + vendors, err := bridge.NatsReq[[]entity.VendorsCount]( + local.NATS_ADAPTER_SUBJECT+"devices.vendors", + []byte(""), + w, + a.nc, + ) + if err != nil { + return + } + utils.MarshallEncoder(vendors.Msg, w) +} + +func (a *Api) productClassInfo(w http.ResponseWriter, r *http.Request) { + vendors, err := bridge.NatsReq[[]entity.ProductClassCount]( + local.NATS_ADAPTER_SUBJECT+"devices.class", + []byte(""), + w, + a.nc, + ) + if err != nil { + return + } + utils.MarshallEncoder(vendors.Msg, w) +} + +func (a *Api) statusInfo(w http.ResponseWriter, r *http.Request) { + vendors, err := bridge.NatsReq[[]entity.StatusCount]( + local.NATS_ADAPTER_SUBJECT+"devices.status", + []byte(""), + w, + a.nc, + ) + if err != nil { + return + } + + var status StatusCount + for _, v := range vendors.Msg { + switch entity.Status(v.Status) { + case entity.Online: + status.Online = v.Count + case entity.Offline: + status.Offline = v.Count + } + } + + utils.MarshallEncoder(status, w) +} diff --git a/backend/services/controller/internal/api/mtp.go b/backend/services/controller/internal/api/mtp.go index ae97c53..35f117c 100644 --- a/backend/services/controller/internal/api/mtp.go +++ b/backend/services/controller/internal/api/mtp.go @@ -1,55 +1,55 @@ package api -import ( - "encoding/json" - "net" - "net/http" - "time" +// import ( +// "encoding/json" +// "net" +// "net/http" +// "time" - "golang.org/x/sys/unix" -) +// "golang.org/x/sys/unix" +// ) -type mqttInfo struct { - MqttRtt time.Duration -} +// type mqttInfo struct { +// MqttRtt time.Duration +// } -func (a *Api) mtpInfo(w http.ResponseWriter, r *http.Request) { - //TODO: address with value from env or something like that - conn, err := net.Dial("tcp", "127.0.0.1:1883") - if err != nil { - json.NewEncoder(w).Encode("Error to connect to broker") - w.WriteHeader(http.StatusInternalServerError) - return - } - defer conn.Close() +// func (a *Api) mtpInfo(w http.ResponseWriter, r *http.Request) { +// //TODO: address with value from env or something like that +// conn, err := net.Dial("tcp", "127.0.0.1:1883") +// if err != nil { +// json.NewEncoder(w).Encode("Error to connect to broker") +// w.WriteHeader(http.StatusInternalServerError) +// return +// } +// defer conn.Close() - info, err := tcpInfo(conn.(*net.TCPConn)) - if err != nil { - json.NewEncoder(w).Encode("Error to get TCP socket info") - w.WriteHeader(http.StatusInternalServerError) - return - } - rtt := time.Duration(info.Rtt) * time.Microsecond - json.NewEncoder(w).Encode(mqttInfo{ - MqttRtt: rtt / 1000, - }) -} +// info, err := tcpInfo(conn.(*net.TCPConn)) +// if err != nil { +// json.NewEncoder(w).Encode("Error to get TCP socket info") +// w.WriteHeader(http.StatusInternalServerError) +// return +// } +// rtt := time.Duration(info.Rtt) * time.Microsecond +// json.NewEncoder(w).Encode(mqttInfo{ +// MqttRtt: rtt / 1000, +// }) +// } -func tcpInfo(conn *net.TCPConn) (*unix.TCPInfo, error) { - raw, err := conn.SyscallConn() - if err != nil { - return nil, err - } +// func tcpInfo(conn *net.TCPConn) (*unix.TCPInfo, error) { +// raw, err := conn.SyscallConn() +// if err != nil { +// return nil, err +// } - var info *unix.TCPInfo - ctrlErr := raw.Control(func(fd uintptr) { - info, err = unix.GetsockoptTCPInfo(int(fd), unix.IPPROTO_TCP, unix.TCP_INFO) - }) - switch { - case ctrlErr != nil: - return nil, ctrlErr - case err != nil: - return nil, err - } - return info, nil -} +// var info *unix.TCPInfo +// ctrlErr := raw.Control(func(fd uintptr) { +// info, err = unix.GetsockoptTCPInfo(int(fd), unix.IPPROTO_TCP, unix.TCP_INFO) +// }) +// switch { +// case ctrlErr != nil: +// return nil, ctrlErr +// case err != nil: +// return nil, err +// } +// return info, nil +// } diff --git a/backend/services/controller/internal/api/wifi.go b/backend/services/controller/internal/api/wifi.go new file mode 100644 index 0000000..85d63d9 --- /dev/null +++ b/backend/services/controller/internal/api/wifi.go @@ -0,0 +1,171 @@ +package api + +// import ( +// "encoding/json" +// "log" +// "net/http" +// "strconv" +// "strings" +// "time" + +// "github.com/gorilla/mux" + +// "github.com/leandrofars/oktopus/internal/db" +// usp_msg "github.com/leandrofars/oktopus/internal/usp_message" +// "github.com/leandrofars/oktopus/internal/utils" +// "google.golang.org/protobuf/proto" +// ) + +// type WiFi struct { +// SSID string `json:"ssid"` +// Password string `json:"password"` +// Security string `json:"security"` +// SecurityCapabilities []string `json:"securityCapabilities"` +// AutoChannelEnable bool `json:"autoChannelEnable"` +// Channel int `json:"channel"` +// ChannelBandwidth string `json:"channelBandwidth"` +// FrequencyBand string `json:"frequencyBand"` +// //PossibleChannels []int `json:"PossibleChannels"` +// SupportedChannelBandwidths []string `json:"supportedChannelBandwidths"` +// } + +// func (a *Api) deviceWifi(w http.ResponseWriter, r *http.Request) { +// vars := mux.Vars(r) +// sn := vars["sn"] +// device := a.deviceExists(sn, w) + +// if r.Method == http.MethodGet { +// msg := utils.NewGetMsg(usp_msg.Get{ +// ParamPaths: []string{ +// "Device.WiFi.SSID.[Enable==true].SSID", +// //"Device.WiFi.AccessPoint.[Enable==true].SSIDReference", +// "Device.WiFi.AccessPoint.[Enable==true].Security.ModeEnabled", +// "Device.WiFi.AccessPoint.[Enable==true].Security.ModesSupported", +// //"Device.WiFi.EndPoint.[Enable==true].", +// "Device.WiFi.Radio.[Enable==true].AutoChannelEnable", +// "Device.WiFi.Radio.[Enable==true].Channel", +// "Device.WiFi.Radio.[Enable==true].CurrentOperatingChannelBandwidth", +// "Device.WiFi.Radio.[Enable==true].OperatingFrequencyBand", +// //"Device.WiFi.Radio.[Enable==true].PossibleChannels", +// "Device.WiFi.Radio.[Enable==true].SupportedOperatingChannelBandwidths", +// }, +// MaxDepth: 2, +// }) + +// encodedMsg, err := proto.Marshal(&msg) +// if err != nil { +// log.Println(err) +// w.WriteHeader(http.StatusBadRequest) +// return +// } + +// record := utils.NewUspRecord(encodedMsg, sn) +// tr369Message, err := proto.Marshal(&record) +// if err != nil { +// log.Fatalln("Failed to encode tr369 record:", err) +// } + +// //a.Broker.Request(tr369Message, usp_msg.Header_GET, "oktopus/v1/agent/"+sn, "oktopus/v1/get/"+sn) +// a.QMutex.Lock() +// a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg) +// a.QMutex.Unlock() +// log.Println("Sending Msg:", msg.Header.MsgId) + +// if device.Mqtt == db.Online { +// a.Mqtt.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn, false) +// } else if device.Websockets == db.Online { +// a.Websockets.Publish(tr369Message, "", "", false) +// } else if device.Stomp == db.Online { +// //TODO: send stomp message +// } + +// //TODO: verify in protocol and in other models, the Device.Wifi parameters. Maybe in the future, to use SSIDReference from AccessPoint +// select { +// case msg := <-a.MsgQueue[msg.Header.MsgId]: +// log.Printf("Received Msg: %s", msg.Header.MsgId) +// a.QMutex.Lock() +// delete(a.MsgQueue, msg.Header.MsgId) +// a.QMutex.Unlock() +// log.Println("requests queue:", a.MsgQueue) +// answer := msg.Body.GetResponse().GetGetResp() + +// var wifi [2]WiFi + +// //TODO: better algorithm, might use something faster an more reliable +// //TODO: full fill the commented wifi resources +// for _, x := range answer.ReqPathResults { +// if x.RequestedPath == "Device.WiFi.SSID.[Enable==true].SSID" { +// for i, y := range x.ResolvedPathResults { +// wifi[i].SSID = y.ResultParams["SSID"] +// } +// continue +// } +// if x.RequestedPath == "Device.WiFi.AccessPoint.[Enable==true].Security.ModeEnabled" { +// for i, y := range x.ResolvedPathResults { +// wifi[i].Security = y.ResultParams["Security.ModeEnabled"] +// } +// continue +// } +// if x.RequestedPath == "Device.WiFi.AccessPoint.[Enable==true].Security.ModesSupported" { +// for i, y := range x.ResolvedPathResults { +// wifi[i].SecurityCapabilities = strings.Split(y.ResultParams["Security.ModesSupported"], ",") +// } +// continue +// } +// if x.RequestedPath == "Device.WiFi.Radio.[Enable==true].AutoChannelEnable" { +// for i, y := range x.ResolvedPathResults { +// autoChannel, err := strconv.ParseBool(y.ResultParams["AutoChannelEnable"]) +// if err != nil { +// log.Println(err) +// wifi[i].AutoChannelEnable = false +// } else { +// wifi[i].AutoChannelEnable = autoChannel +// } +// } +// continue +// } +// if x.RequestedPath == "Device.WiFi.Radio.[Enable==true].Channel" { +// for i, y := range x.ResolvedPathResults { +// channel, err := strconv.Atoi(y.ResultParams["Channel"]) +// if err != nil { +// log.Println(err) +// wifi[i].Channel = -1 +// } else { +// wifi[i].Channel = channel +// } +// } +// continue +// } +// if x.RequestedPath == "Device.WiFi.Radio.[Enable==true].CurrentOperatingChannelBandwidth" { +// for i, y := range x.ResolvedPathResults { +// wifi[i].ChannelBandwidth = y.ResultParams["CurrentOperatingChannelBandwidth"] +// } +// continue +// } +// if x.RequestedPath == "Device.WiFi.Radio.[Enable==true].OperatingFrequencyBand" { +// for i, y := range x.ResolvedPathResults { +// wifi[i].FrequencyBand = y.ResultParams["OperatingFrequencyBand"] +// } +// continue +// } +// if x.RequestedPath == "Device.WiFi.Radio.[Enable==true].SupportedOperatingChannelBandwidths" { +// for i, y := range x.ResolvedPathResults { +// wifi[i].SupportedChannelBandwidths = strings.Split(y.ResultParams["SupportedOperatingChannelBandwidths"], ",") +// } +// continue +// } +// } +// json.NewEncoder(w).Encode(&wifi) +// return +// case <-time.After(time.Second * 45): +// log.Printf("Request %s Timed Out", msg.Header.MsgId) +// w.WriteHeader(http.StatusGatewayTimeout) +// a.QMutex.Lock() +// delete(a.MsgQueue, msg.Header.MsgId) +// a.QMutex.Unlock() +// log.Println("requests queue:", a.MsgQueue) +// json.NewEncoder(w).Encode("Request Timed Out") +// return +// } +// } +// } diff --git a/backend/services/controller/internal/entity/device.go b/backend/services/controller/internal/entity/device.go index f7812c5..39dcf42 100644 --- a/backend/services/controller/internal/entity/device.go +++ b/backend/services/controller/internal/entity/device.go @@ -12,3 +12,18 @@ type Device struct { Stomp Status Websockets Status } + +type VendorsCount struct { + Vendor string `bson:"_id" json:"vendor"` + Count int `bson:"count" json:"count"` +} + +type ProductClassCount struct { + ProductClass string `bson:"_id" json:"productClass"` + Count int `bson:"count" json:"count"` +} + +type StatusCount struct { + Status int `bson:"_id" json:"status"` + Count int `bson:"count" json:"count"` +} diff --git a/backend/services/controller/internal/entity/msg.go b/backend/services/controller/internal/entity/msg.go index 8c68aae..a3e7eda 100644 --- a/backend/services/controller/internal/entity/msg.go +++ b/backend/services/controller/internal/entity/msg.go @@ -1,7 +1,9 @@ package entity +import "time" + type DataType interface { - []map[string]interface{} | *string | Device | int64 | []Device + []map[string]interface{} | *string | Device | int64 | []Device | []VendorsCount | []ProductClassCount | []StatusCount | time.Duration } type MsgAnswer[T DataType] struct { diff --git a/backend/services/controller/internal/entity/usp.go b/backend/services/controller/internal/entity/usp.go deleted file mode 100644 index a642ecd..0000000 --- a/backend/services/controller/internal/entity/usp.go +++ /dev/null @@ -1,7 +0,0 @@ -package entity - -import "github.com/leandrofars/oktopus/internal/usp/usp_msg" - -type UspType interface { - usp_msg.GetSupportedDM -} \ No newline at end of file diff --git a/backend/services/controller/internal/nats/nats.go b/backend/services/controller/internal/nats/nats.go index 2023a5e..b6ca86f 100644 --- a/backend/services/controller/internal/nats/nats.go +++ b/backend/services/controller/internal/nats/nats.go @@ -16,7 +16,7 @@ const ( NATS_MQTT_ADAPTER_SUBJECT_PREFIX = "mqtt-adapter.usp.v1." NATS_ADAPTER_SUBJECT = "adapter.usp.v1." NATS_WS_SUBJECT_PREFIX = "ws.usp.v1." - NATS_WS_ADAPTER_SUBJECT_PREFIX = "ws-adapter.usp.v1.*." + NATS_WS_ADAPTER_SUBJECT_PREFIX = "ws-adapter.usp.v1." DEVICE_SUBJECT_PREFIX = "device.usp.v1." ) diff --git a/backend/services/mtp/adapter/internal/reqs/reqs.go b/backend/services/mtp/adapter/internal/reqs/reqs.go index 2a6eb44..e6d5f0a 100644 --- a/backend/services/mtp/adapter/internal/reqs/reqs.go +++ b/backend/services/mtp/adapter/internal/reqs/reqs.go @@ -65,6 +65,30 @@ func StartRequestsListener(ctx context.Context, nc *nats.Conn, db db.Database) { } respondMsg(msg.Respond, 200, devicesList) }) + + nc.Subscribe(local.ADAPTER_SUBJECT+"devices.class", func(msg *nats.Msg) { + productClassCount, err := db.RetrieveProductsClassInfo() + if err != nil { + respondMsg(msg.Respond, 500, err.Error()) + } + respondMsg(msg.Respond, 200, productClassCount) + }) + + nc.Subscribe(local.ADAPTER_SUBJECT+"devices.vendors", func(msg *nats.Msg) { + productClassCount, err := db.RetrieveVendorsInfo() + if err != nil { + respondMsg(msg.Respond, 500, err.Error()) + } + respondMsg(msg.Respond, 200, productClassCount) + }) + + nc.Subscribe(local.ADAPTER_SUBJECT+"devices.status", func(msg *nats.Msg) { + productClassCount, err := db.RetrieveStatusInfo() + if err != nil { + respondMsg(msg.Respond, 500, err.Error()) + } + respondMsg(msg.Respond, 200, productClassCount) + }) } func respondMsg(respond func(data []byte) error, code int, msgData any) { diff --git a/backend/services/mtp/mqtt-adapter/internal/bridge/bridge.go b/backend/services/mtp/mqtt-adapter/internal/bridge/bridge.go index b3f095a..ecb68d2 100644 --- a/backend/services/mtp/mqtt-adapter/internal/bridge/bridge.go +++ b/backend/services/mtp/mqtt-adapter/internal/bridge/bridge.go @@ -2,7 +2,9 @@ package bridge import ( "context" + "encoding/json" "log" + "net" "net/url" "strings" "time" @@ -12,6 +14,7 @@ import ( "github.com/eclipse/paho.golang/paho" "github.com/google/uuid" "github.com/nats-io/nats.go" + "golang.org/x/sys/unix" ) const ( @@ -19,8 +22,13 @@ const ( ONLINE ) +type msgAnswer struct { + Code int + Msg any +} + const NATS_MQTT_SUBJECT_PREFIX = "mqtt.usp.v1." -const NATS_MQTT_ADAPTER_SUBJECT_PREFIX = "mqtt-adapter.usp.v1.*." +const NATS_MQTT_ADAPTER_SUBJECT_PREFIX = "mqtt-adapter.usp.v1." const DEVICE_SUBJECT_PREFIX = "device.usp.v1." const MQTT_TOPIC_PREFIX = "oktopus/usp/" @@ -91,7 +99,7 @@ func (b *Bridge) StartBridge() { } func (b *Bridge) natsMessageHandler(cm *autopaho.ConnectionManager) { - b.Sub(NATS_MQTT_ADAPTER_SUBJECT_PREFIX+"info", func(m *nats.Msg) { + b.Sub(NATS_MQTT_ADAPTER_SUBJECT_PREFIX+"*.info", func(m *nats.Msg) { log.Printf("Received message on info subject") cm.Publish(b.Ctx, &paho.Publish{ @@ -105,7 +113,7 @@ func (b *Bridge) natsMessageHandler(cm *autopaho.ConnectionManager) { }) - b.Sub(NATS_MQTT_ADAPTER_SUBJECT_PREFIX+"api", func(m *nats.Msg) { + b.Sub(NATS_MQTT_ADAPTER_SUBJECT_PREFIX+"*.api", func(m *nats.Msg) { log.Printf("Received message on api subject") cm.Publish(b.Ctx, &paho.Publish{ @@ -118,6 +126,27 @@ func (b *Bridge) natsMessageHandler(cm *autopaho.ConnectionManager) { }) }) + + b.Sub(NATS_MQTT_ADAPTER_SUBJECT_PREFIX+"rtt", func(msg *nats.Msg) { + + log.Printf("Received message on rtt subject") + url := strings.Split(b.Mqtt.Url, "://")[1] + conn, err := net.Dial("tcp", url) + if err != nil { + respondMsg(msg.Respond, 500, err.Error()) + return + } + defer conn.Close() + + info, err := tcpInfo(conn.(*net.TCPConn)) + if err != nil { + respondMsg(msg.Respond, 500, err.Error()) + return + } + rtt := time.Duration(info.Rtt) * time.Microsecond + + respondMsg(msg.Respond, 200, rtt/1000) + }) } func getDeviceFromSubject(subject string) string { @@ -210,3 +239,37 @@ func buildClientConfig(status, controller, apiMsg chan *paho.Publish, id string) return &clientConfig } + +func respondMsg(respond func(data []byte) error, code int, msgData any) { + + msg, err := json.Marshal(msgAnswer{ + Code: code, + Msg: msgData, + }) + if err != nil { + log.Printf("Failed to marshal message: %q", err) + respond([]byte(err.Error())) + return + } + + respond([]byte(msg)) +} + +func tcpInfo(conn *net.TCPConn) (*unix.TCPInfo, error) { + raw, err := conn.SyscallConn() + if err != nil { + return nil, err + } + + var info *unix.TCPInfo + ctrlErr := raw.Control(func(fd uintptr) { + info, err = unix.GetsockoptTCPInfo(int(fd), unix.IPPROTO_TCP, unix.TCP_INFO) + }) + switch { + case ctrlErr != nil: + return nil, ctrlErr + case err != nil: + return nil, err + } + return info, nil +} diff --git a/backend/services/mtp/ws-adapter/internal/bridge/bridge.go b/backend/services/mtp/ws-adapter/internal/bridge/bridge.go index 6ddd29d..6687632 100644 --- a/backend/services/mtp/ws-adapter/internal/bridge/bridge.go +++ b/backend/services/mtp/ws-adapter/internal/bridge/bridge.go @@ -5,11 +5,11 @@ import ( "crypto/tls" "encoding/json" "log" + "net" "reflect" "strings" "sync" - // "reflect" "time" "github.com/OktopUSP/oktopus/backend/services/mtp/ws-adapter/internal/config" @@ -17,12 +17,13 @@ import ( "github.com/OktopUSP/oktopus/backend/services/mtp/ws-adapter/internal/usp/usp_record" "github.com/gorilla/websocket" "github.com/nats-io/nats.go" + "golang.org/x/sys/unix" "google.golang.org/protobuf/proto" ) const ( NATS_WS_SUBJECT_PREFIX = "ws.usp.v1." - NATS_WS_ADAPTER_SUBJECT_PREFIX = "ws-adapter.usp.v1.*." + NATS_WS_ADAPTER_SUBJECT_PREFIX = "ws-adapter.usp.v1." DEVICE_SUBJECT_PREFIX = "device.usp.v1." WS_CONNECTION_RETRY = 10 * time.Second ) @@ -32,6 +33,11 @@ const ( ONLINE ) +type msgAnswer struct { + Code int + Msg any +} + type deviceStatus struct { Eid string Status string @@ -130,7 +136,7 @@ func (b *Bridge) subscribe(wc *websocket.Conn) { b.NewDeviceQueue = make(map[string]string) b.NewDevQMutex = &sync.Mutex{} - b.Sub(NATS_WS_ADAPTER_SUBJECT_PREFIX+"info", func(msg *nats.Msg) { + b.Sub(NATS_WS_ADAPTER_SUBJECT_PREFIX+"*.info", func(msg *nats.Msg) { log.Printf("Received message on info subject") @@ -148,7 +154,7 @@ func (b *Bridge) subscribe(wc *websocket.Conn) { } }) - b.Sub(NATS_WS_ADAPTER_SUBJECT_PREFIX+"api", func(msg *nats.Msg) { + b.Sub(NATS_WS_ADAPTER_SUBJECT_PREFIX+"*.api", func(msg *nats.Msg) { log.Printf("Received message on api subject") @@ -158,6 +164,43 @@ func (b *Bridge) subscribe(wc *websocket.Conn) { return } }) + + b.Sub(NATS_WS_ADAPTER_SUBJECT_PREFIX+"rtt", func(msg *nats.Msg) { + + log.Printf("Received message on rtt subject") + + conn, err := net.Dial("tcp", b.Ws.Addr+b.Ws.Port) + if err != nil { + respondMsg(msg.Respond, 500, err.Error()) + return + } + defer conn.Close() + + info, err := tcpInfo(conn.(*net.TCPConn)) + if err != nil { + respondMsg(msg.Respond, 500, err.Error()) + return + } + rtt := time.Duration(info.Rtt) * time.Microsecond + + respondMsg(msg.Respond, 200, rtt/1000) + + }) +} + +func respondMsg(respond func(data []byte) error, code int, msgData any) { + + msg, err := json.Marshal(msgAnswer{ + Code: code, + Msg: msgData, + }) + if err != nil { + log.Printf("Failed to marshal message: %q", err) + respond([]byte(err.Error())) + return + } + + respond([]byte(msg)) } func (b *Bridge) newDeviceMsgHandler(wc *websocket.Conn, device string, msg []byte) { @@ -201,3 +244,22 @@ func (b *Bridge) newDialer() websocket.Dialer { }, } } + +func tcpInfo(conn *net.TCPConn) (*unix.TCPInfo, error) { + raw, err := conn.SyscallConn() + if err != nil { + return nil, err + } + + var info *unix.TCPInfo + ctrlErr := raw.Control(func(fd uintptr) { + info, err = unix.GetsockoptTCPInfo(int(fd), unix.IPPROTO_TCP, unix.TCP_INFO) + }) + switch { + case ctrlErr != nil: + return nil, ctrlErr + case err != nil: + return nil, err + } + return info, nil +}