feat(controller): device get database

This commit is contained in:
leandrofars 2024-03-21 22:14:06 -03:00
parent 60b2040b5a
commit ca82895e20
6 changed files with 154 additions and 94 deletions

View File

@ -51,8 +51,8 @@ func (a *Api) StartApi() {
authentication.HandleFunc("/admin/register", a.registerAdminUser).Methods("POST") authentication.HandleFunc("/admin/register", a.registerAdminUser).Methods("POST")
authentication.HandleFunc("/admin/exists", a.adminUserExists).Methods("GET") authentication.HandleFunc("/admin/exists", a.adminUserExists).Methods("GET")
iot := r.PathPrefix("/api/device").Subrouter() iot := r.PathPrefix("/api/device").Subrouter()
// iot.HandleFunc("", a.retrieveDevices).Methods("GET") iot.HandleFunc("", a.retrieveDevices).Methods("GET")
// iot.HandleFunc("/{id}", a.retrieveDevices).Methods("GET") iot.HandleFunc("/{id}", a.retrieveDevices).Methods("GET")
iot.HandleFunc("/{sn}/{mtp}/get", a.deviceGetMsg).Methods("PUT") iot.HandleFunc("/{sn}/{mtp}/get", a.deviceGetMsg).Methods("PUT")
iot.HandleFunc("/{sn}/{mtp}/add", a.deviceCreateMsg).Methods("PUT") iot.HandleFunc("/{sn}/{mtp}/add", a.deviceCreateMsg).Methods("PUT")
iot.HandleFunc("/{sn}/{mtp}/del", a.deviceDeleteMsg).Methods("PUT") iot.HandleFunc("/{sn}/{mtp}/del", a.deviceDeleteMsg).Methods("PUT")

View File

@ -1,105 +1,104 @@
package api package api
// func (a *Api) retrieveDevices(w http.ResponseWriter, r *http.Request) { import (
// const PAGE_SIZE_LIMIT = 50 "encoding/json"
// const PAGE_SIZE_DEFAULT = 20 "log"
"net/http"
"strconv"
// // Get specific device "go.mongodb.org/mongo-driver/bson"
// id := r.URL.Query().Get("id") )
// if id != "" {
// device, err := a.Db.RetrieveDevice(id)
// if err != nil {
// if err == mongo.ErrNoDocuments {
// json.NewEncoder(w).Encode("Device id: " + id + " not found")
// return
// }
// json.NewEncoder(w).Encode(err)
// w.WriteHeader(http.StatusInternalServerError)
// return
// }
// err = json.NewEncoder(w).Encode(device)
// if err != nil {
// log.Println(err)
// }
// return
// }
// // Get devices with pagination func (a *Api) retrieveDevices(w http.ResponseWriter, r *http.Request) {
// page_n := r.URL.Query().Get("page_number") const PAGE_SIZE_LIMIT = 50
// page_s := r.URL.Query().Get("page_size") const PAGE_SIZE_DEFAULT = 20
// var err error
// var page_number int64 // Get specific device
// if page_n == "" { id := r.URL.Query().Get("id")
// page_number = 0 if id != "" {
// } else { device, err := getDeviceInfo(w, id, a.nc)
// page_number, err = strconv.ParseInt(page_n, 10, 64) if err != nil {
// if err != nil { return
// w.WriteHeader(http.StatusBadRequest) }
// json.NewEncoder(w).Encode("Page number must be an integer") err = json.NewEncoder(w).Encode(device)
// return if err != nil {
// } log.Println(err)
// } }
return
}
// var page_size int64 // Get devices with pagination
// if page_s != "" { page_n := r.URL.Query().Get("page_number")
// page_size, err = strconv.ParseInt(page_s, 10, 64) page_s := r.URL.Query().Get("page_size")
var err error
// if err != nil { var page_number int64
// w.WriteHeader(http.StatusBadRequest) if page_n == "" {
// json.NewEncoder(w).Encode("Page size must be an integer") page_number = 0
// return } else {
// } page_number, err = strconv.ParseInt(page_n, 10, 64)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode("Page number must be an integer")
return
}
}
// if page_size > PAGE_SIZE_LIMIT { var page_size int64
// w.WriteHeader(http.StatusBadRequest) if page_s != "" {
// json.NewEncoder(w).Encode("Page size must not exceed " + strconv.Itoa(PAGE_SIZE_LIMIT)) page_size, err = strconv.ParseInt(page_s, 10, 64)
// return
// }
// } else { if err != nil {
// page_size = PAGE_SIZE_DEFAULT w.WriteHeader(http.StatusBadRequest)
// } json.NewEncoder(w).Encode("Page size must be an integer")
return
}
// total, err := a.Db.RetrieveDevicesCount(bson.M{}) if page_size > PAGE_SIZE_LIMIT {
// if err != nil { w.WriteHeader(http.StatusBadRequest)
// w.WriteHeader(http.StatusInternalServerError) json.NewEncoder(w).Encode("Page size must not exceed " + strconv.Itoa(PAGE_SIZE_LIMIT))
// json.NewEncoder(w).Encode("Unable to get devices count from database") return
// return }
// }
// skip := page_number * (page_size - 1) } else {
// if total < page_size { page_size = PAGE_SIZE_DEFAULT
// skip = 0 }
// }
// //TODO: Create filters total, err := getDeviceCount(w, a.nc)
// //TODO: Create sorting if err != nil {
return
}
// sort := bson.M{} skip := page_number * (page_size - 1)
// sort["status"] = 1 if total < page_size {
skip = 0
}
// filter := bson.A{ //TODO: Create filters
// //bson.M{"$match": filter}, //TODO: Create sorting
// bson.M{"$sort": sort}, // shows online devices first
// bson.M{"$skip": skip},
// bson.M{"$limit": page_size},
// }
// devices, err := a.Db.RetrieveDevices(filter) sort := bson.M{}
// if err != nil { sort["status"] = 1
// w.WriteHeader(http.StatusInternalServerError)
// json.NewEncoder(w).Encode("Unable to aggregate database devices info")
// return
// }
// err = json.NewEncoder(w).Encode(map[string]interface{}{ filter := bson.A{
// "pages": total / page_size, //bson.M{"$match": filter},
// "page": page_number, bson.M{"$sort": sort}, // shows online devices first
// "size": page_size, bson.M{"$skip": skip},
// "devices": devices, bson.M{"$limit": page_size},
// }) }
// if err != nil {
// log.Println(err) devices, err := getDevices(w, filter, a.nc)
// } if err != nil {
// } return
}
err = json.NewEncoder(w).Encode(map[string]interface{}{
"pages": total / page_size,
"page": page_number,
"size": page_size,
"devices": devices,
})
if err != nil {
log.Println(err)
}
}

View File

@ -10,6 +10,7 @@ import (
local "github.com/leandrofars/oktopus/internal/nats" local "github.com/leandrofars/oktopus/internal/nats"
"github.com/leandrofars/oktopus/internal/utils" "github.com/leandrofars/oktopus/internal/utils"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
"go.mongodb.org/mongo-driver/bson/primitive"
) )
var errInvalidMtp = errors.New("Invalid MTP, valid options are: mqtt, ws, stomp") var errInvalidMtp = errors.New("Invalid MTP, valid options are: mqtt, ws, stomp")
@ -88,5 +89,31 @@ func getDeviceInfo(w http.ResponseWriter, sn string, nc *nats.Conn) (device *ent
w, w,
nc, nc,
) )
return &msg.Msg, err if msg != nil {
return &msg.Msg, err
}
return nil, err
}
func getDeviceCount(w http.ResponseWriter, nc *nats.Conn) (int64, error) {
msg, err := bridge.NatsReq[int64](
local.NATS_ADAPTER_SUBJECT+"devices.count",
[]byte(""),
w,
nc,
)
return msg.Msg, err
}
func getDevices(w http.ResponseWriter, filter primitive.A, nc *nats.Conn) (*[]entity.Device, error) {
msg, err := bridge.NatsReq[[]entity.Device](
local.NATS_ADAPTER_SUBJECT+"devices.retrieve",
utils.Marshall(filter),
w,
nc,
)
if msg != nil {
return &msg.Msg, err
}
return nil, err
} }

View File

@ -1,7 +1,7 @@
package entity package entity
type DataType interface { type DataType interface {
[]map[string]interface{} | *string | Device []map[string]interface{} | *string | Device | int64 | []Device
} }
type MsgAnswer[T DataType] struct { type MsgAnswer[T DataType] struct {

View File

@ -95,10 +95,18 @@ func (d *Database) CreateDevice(device Device) error {
return err return err
} }
func (d *Database) RetrieveDevices(filter bson.A) ([]Device, error) { func (d *Database) RetrieveDevices(filter bson.A) ([]Device, error) {
cursor, err := d.devices.Aggregate(d.ctx, filter)
var results []Device var results []Device
cursor, err := d.devices.Aggregate(d.ctx, filter)
if err != nil {
return results, err
}
if cursor.Err() != nil {
return results, cursor.Err()
}
for cursor.Next(d.ctx) { for cursor.Next(d.ctx) {
var device Device var device Device

View File

@ -12,6 +12,7 @@ import (
"github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/db" "github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/db"
local "github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/nats" local "github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/nats"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo"
) )
@ -22,6 +23,7 @@ type msgAnswer struct {
func StartRequestsListener(ctx context.Context, nc *nats.Conn, db db.Database) { func StartRequestsListener(ctx context.Context, nc *nats.Conn, db db.Database) {
log.Println("Listening for nats requests") log.Println("Listening for nats requests")
nc.Subscribe(local.ADAPTER_SUBJECT+"*.device", func(msg *nats.Msg) { nc.Subscribe(local.ADAPTER_SUBJECT+"*.device", func(msg *nats.Msg) {
subject := strings.Split(msg.Subject, ".") subject := strings.Split(msg.Subject, ".")
device := subject[len(subject)-2] device := subject[len(subject)-2]
@ -39,6 +41,30 @@ func StartRequestsListener(ctx context.Context, nc *nats.Conn, db db.Database) {
} }
} }
}) })
nc.Subscribe(local.ADAPTER_SUBJECT+"devices.count", func(msg *nats.Msg) {
count, err := db.RetrieveDevicesCount(bson.M{})
if err != nil {
respondMsg(msg.Respond, 500, err.Error())
}
respondMsg(msg.Respond, 200, count)
})
nc.Subscribe(local.ADAPTER_SUBJECT+"devices.retrieve", func(msg *nats.Msg) {
var filter bson.A
err := json.Unmarshal(msg.Data, &filter)
if err != nil {
respondMsg(msg.Respond, 500, err.Error())
}
devicesList, err := db.RetrieveDevices(filter)
if err != nil {
respondMsg(msg.Respond, 500, err.Error())
}
respondMsg(msg.Respond, 200, devicesList)
})
} }
func respondMsg(respond func(data []byte) error, code int, msgData any) { func respondMsg(respond func(data []byte) error, code int, msgData any) {