From ca82895e209ef3b88ed0a090efeb5e0134d118ea Mon Sep 17 00:00:00 2001 From: leandrofars Date: Thu, 21 Mar 2024 22:14:06 -0300 Subject: [PATCH] feat(controller): device get database --- .../services/controller/internal/api/api.go | 4 +- .../controller/internal/api/device.go | 177 +++++++++--------- .../services/controller/internal/api/utils.go | 29 ++- .../controller/internal/entity/msg.go | 2 +- .../mtp/adapter/internal/db/device.go | 10 +- .../mtp/adapter/internal/reqs/reqs.go | 26 +++ 6 files changed, 154 insertions(+), 94 deletions(-) diff --git a/backend/services/controller/internal/api/api.go b/backend/services/controller/internal/api/api.go index 254dd31..1ba1e9c 100644 --- a/backend/services/controller/internal/api/api.go +++ b/backend/services/controller/internal/api/api.go @@ -51,8 +51,8 @@ func (a *Api) StartApi() { authentication.HandleFunc("/admin/register", a.registerAdminUser).Methods("POST") authentication.HandleFunc("/admin/exists", a.adminUserExists).Methods("GET") iot := r.PathPrefix("/api/device").Subrouter() - // iot.HandleFunc("", a.retrieveDevices).Methods("GET") - // iot.HandleFunc("/{id}", a.retrieveDevices).Methods("GET") + iot.HandleFunc("", a.retrieveDevices).Methods("GET") + iot.HandleFunc("/{id}", a.retrieveDevices).Methods("GET") iot.HandleFunc("/{sn}/{mtp}/get", a.deviceGetMsg).Methods("PUT") iot.HandleFunc("/{sn}/{mtp}/add", a.deviceCreateMsg).Methods("PUT") iot.HandleFunc("/{sn}/{mtp}/del", a.deviceDeleteMsg).Methods("PUT") diff --git a/backend/services/controller/internal/api/device.go b/backend/services/controller/internal/api/device.go index 0f00ca1..0131294 100644 --- a/backend/services/controller/internal/api/device.go +++ b/backend/services/controller/internal/api/device.go @@ -1,105 +1,104 @@ package api -// func (a *Api) retrieveDevices(w http.ResponseWriter, r *http.Request) { -// const PAGE_SIZE_LIMIT = 50 -// const PAGE_SIZE_DEFAULT = 20 +import ( + "encoding/json" + "log" + "net/http" + "strconv" -// // Get specific device -// id := r.URL.Query().Get("id") -// if id != "" { -// device, err := a.Db.RetrieveDevice(id) -// if err != nil { -// if err == mongo.ErrNoDocuments { -// json.NewEncoder(w).Encode("Device id: " + id + " not found") -// return -// } -// json.NewEncoder(w).Encode(err) -// w.WriteHeader(http.StatusInternalServerError) -// return -// } -// err = json.NewEncoder(w).Encode(device) -// if err != nil { -// log.Println(err) -// } -// return -// } + "go.mongodb.org/mongo-driver/bson" +) -// // Get devices with pagination -// page_n := r.URL.Query().Get("page_number") -// page_s := r.URL.Query().Get("page_size") -// var err error +func (a *Api) retrieveDevices(w http.ResponseWriter, r *http.Request) { + const PAGE_SIZE_LIMIT = 50 + const PAGE_SIZE_DEFAULT = 20 -// var page_number int64 -// if page_n == "" { -// page_number = 0 -// } else { -// page_number, err = strconv.ParseInt(page_n, 10, 64) -// if err != nil { -// w.WriteHeader(http.StatusBadRequest) -// json.NewEncoder(w).Encode("Page number must be an integer") -// return -// } -// } + // Get specific device + id := r.URL.Query().Get("id") + if id != "" { + device, err := getDeviceInfo(w, id, a.nc) + if err != nil { + return + } + err = json.NewEncoder(w).Encode(device) + if err != nil { + log.Println(err) + } + return + } -// var page_size int64 -// if page_s != "" { -// page_size, err = strconv.ParseInt(page_s, 10, 64) + // Get devices with pagination + page_n := r.URL.Query().Get("page_number") + page_s := r.URL.Query().Get("page_size") + var err error -// if err != nil { -// w.WriteHeader(http.StatusBadRequest) -// json.NewEncoder(w).Encode("Page size must be an integer") -// return -// } + var page_number int64 + if page_n == "" { + page_number = 0 + } else { + page_number, err = strconv.ParseInt(page_n, 10, 64) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + json.NewEncoder(w).Encode("Page number must be an integer") + return + } + } -// if page_size > PAGE_SIZE_LIMIT { -// w.WriteHeader(http.StatusBadRequest) -// json.NewEncoder(w).Encode("Page size must not exceed " + strconv.Itoa(PAGE_SIZE_LIMIT)) -// return -// } + var page_size int64 + if page_s != "" { + page_size, err = strconv.ParseInt(page_s, 10, 64) -// } else { -// page_size = PAGE_SIZE_DEFAULT -// } + if err != nil { + w.WriteHeader(http.StatusBadRequest) + json.NewEncoder(w).Encode("Page size must be an integer") + return + } -// total, err := a.Db.RetrieveDevicesCount(bson.M{}) -// if err != nil { -// w.WriteHeader(http.StatusInternalServerError) -// json.NewEncoder(w).Encode("Unable to get devices count from database") -// return -// } + if page_size > PAGE_SIZE_LIMIT { + w.WriteHeader(http.StatusBadRequest) + json.NewEncoder(w).Encode("Page size must not exceed " + strconv.Itoa(PAGE_SIZE_LIMIT)) + return + } -// skip := page_number * (page_size - 1) -// if total < page_size { -// skip = 0 -// } + } else { + page_size = PAGE_SIZE_DEFAULT + } -// //TODO: Create filters -// //TODO: Create sorting + total, err := getDeviceCount(w, a.nc) + if err != nil { + return + } -// sort := bson.M{} -// sort["status"] = 1 + skip := page_number * (page_size - 1) + if total < page_size { + skip = 0 + } -// filter := bson.A{ -// //bson.M{"$match": filter}, -// bson.M{"$sort": sort}, // shows online devices first -// bson.M{"$skip": skip}, -// bson.M{"$limit": page_size}, -// } + //TODO: Create filters + //TODO: Create sorting -// devices, err := a.Db.RetrieveDevices(filter) -// if err != nil { -// w.WriteHeader(http.StatusInternalServerError) -// json.NewEncoder(w).Encode("Unable to aggregate database devices info") -// return -// } + sort := bson.M{} + sort["status"] = 1 -// err = json.NewEncoder(w).Encode(map[string]interface{}{ -// "pages": total / page_size, -// "page": page_number, -// "size": page_size, -// "devices": devices, -// }) -// if err != nil { -// log.Println(err) -// } -// } + filter := bson.A{ + //bson.M{"$match": filter}, + bson.M{"$sort": sort}, // shows online devices first + bson.M{"$skip": skip}, + bson.M{"$limit": page_size}, + } + + devices, err := getDevices(w, filter, a.nc) + if err != nil { + return + } + + err = json.NewEncoder(w).Encode(map[string]interface{}{ + "pages": total / page_size, + "page": page_number, + "size": page_size, + "devices": devices, + }) + if err != nil { + log.Println(err) + } +} diff --git a/backend/services/controller/internal/api/utils.go b/backend/services/controller/internal/api/utils.go index 77c20b1..7a81f88 100644 --- a/backend/services/controller/internal/api/utils.go +++ b/backend/services/controller/internal/api/utils.go @@ -10,6 +10,7 @@ import ( local "github.com/leandrofars/oktopus/internal/nats" "github.com/leandrofars/oktopus/internal/utils" "github.com/nats-io/nats.go" + "go.mongodb.org/mongo-driver/bson/primitive" ) var errInvalidMtp = errors.New("Invalid MTP, valid options are: mqtt, ws, stomp") @@ -88,5 +89,31 @@ func getDeviceInfo(w http.ResponseWriter, sn string, nc *nats.Conn) (device *ent w, nc, ) - return &msg.Msg, err + if msg != nil { + return &msg.Msg, err + } + return nil, err +} + +func getDeviceCount(w http.ResponseWriter, nc *nats.Conn) (int64, error) { + msg, err := bridge.NatsReq[int64]( + local.NATS_ADAPTER_SUBJECT+"devices.count", + []byte(""), + w, + nc, + ) + return msg.Msg, err +} + +func getDevices(w http.ResponseWriter, filter primitive.A, nc *nats.Conn) (*[]entity.Device, error) { + msg, err := bridge.NatsReq[[]entity.Device]( + local.NATS_ADAPTER_SUBJECT+"devices.retrieve", + utils.Marshall(filter), + w, + nc, + ) + if msg != nil { + return &msg.Msg, err + } + return nil, err } diff --git a/backend/services/controller/internal/entity/msg.go b/backend/services/controller/internal/entity/msg.go index 9ba8498..8c68aae 100644 --- a/backend/services/controller/internal/entity/msg.go +++ b/backend/services/controller/internal/entity/msg.go @@ -1,7 +1,7 @@ package entity type DataType interface { - []map[string]interface{} | *string | Device + []map[string]interface{} | *string | Device | int64 | []Device } type MsgAnswer[T DataType] struct { diff --git a/backend/services/mtp/adapter/internal/db/device.go b/backend/services/mtp/adapter/internal/db/device.go index 81b0a29..14dcd9d 100644 --- a/backend/services/mtp/adapter/internal/db/device.go +++ b/backend/services/mtp/adapter/internal/db/device.go @@ -95,10 +95,18 @@ func (d *Database) CreateDevice(device Device) error { return err } func (d *Database) RetrieveDevices(filter bson.A) ([]Device, error) { - cursor, err := d.devices.Aggregate(d.ctx, filter) var results []Device + cursor, err := d.devices.Aggregate(d.ctx, filter) + if err != nil { + return results, err + } + + if cursor.Err() != nil { + return results, cursor.Err() + } + for cursor.Next(d.ctx) { var device Device diff --git a/backend/services/mtp/adapter/internal/reqs/reqs.go b/backend/services/mtp/adapter/internal/reqs/reqs.go index 5d6e028..2a6eb44 100644 --- a/backend/services/mtp/adapter/internal/reqs/reqs.go +++ b/backend/services/mtp/adapter/internal/reqs/reqs.go @@ -12,6 +12,7 @@ import ( "github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/db" local "github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/nats" "github.com/nats-io/nats.go" + "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" ) @@ -22,6 +23,7 @@ type msgAnswer struct { func StartRequestsListener(ctx context.Context, nc *nats.Conn, db db.Database) { log.Println("Listening for nats requests") + nc.Subscribe(local.ADAPTER_SUBJECT+"*.device", func(msg *nats.Msg) { subject := strings.Split(msg.Subject, ".") device := subject[len(subject)-2] @@ -39,6 +41,30 @@ func StartRequestsListener(ctx context.Context, nc *nats.Conn, db db.Database) { } } }) + + nc.Subscribe(local.ADAPTER_SUBJECT+"devices.count", func(msg *nats.Msg) { + count, err := db.RetrieveDevicesCount(bson.M{}) + if err != nil { + respondMsg(msg.Respond, 500, err.Error()) + } + respondMsg(msg.Respond, 200, count) + }) + + nc.Subscribe(local.ADAPTER_SUBJECT+"devices.retrieve", func(msg *nats.Msg) { + + var filter bson.A + + err := json.Unmarshal(msg.Data, &filter) + if err != nil { + respondMsg(msg.Respond, 500, err.Error()) + } + + devicesList, err := db.RetrieveDevices(filter) + if err != nil { + respondMsg(msg.Respond, 500, err.Error()) + } + respondMsg(msg.Respond, 200, devicesList) + }) } func respondMsg(respond func(data []byte) error, code int, msgData any) {