oktopus/backend/services/mtp/adapter/internal/db/device.go

247 lines
5.2 KiB
Go

package db
import (
"log"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
type MTP int32
const (
UNDEFINED MTP = iota
MQTT
STOMP
WEBSOCKETS
CWMP
)
type Status uint8
const (
Offline Status = iota
Associating
Online
)
type Device struct {
SN string
Model string
Customer string
Vendor string
Version string
ProductClass string
Alias string
Status Status
Mqtt Status
Stomp Status
Websockets Status
Cwmp Status
}
type DevicesList struct {
Devices []Device `json:"devices" bson:"documents"`
Total int64 `json:"total" bson:"totalCount"`
}
type FilterOptions struct {
Models []string `json:"models"`
ProductClasses []string `json:"productClasses"`
Vendors []string `json:"vendors"`
Versions []string `json:"versions"`
}
func (d *Database) CreateDevice(device Device) error {
var result bson.M
var deviceExistent Device
d.m.Lock()
defer d.m.Unlock()
/* ------------------ Do not overwrite status of other mtp ------------------ */
err := d.devices.FindOne(d.ctx, bson.D{{"sn", device.SN}}, nil).Decode(&deviceExistent)
if err == nil {
if deviceExistent.Mqtt == Online {
device.Mqtt = Online
}
if deviceExistent.Stomp == Online {
device.Stomp = Online
}
if deviceExistent.Websockets == Online {
device.Websockets = Online
}
if deviceExistent.Cwmp == Online {
device.Cwmp = Online
}
} else {
if err != mongo.ErrNoDocuments {
log.Println(err)
return err
}
}
/* -------------------------------------------------------------------------- */
callback := func(sessCtx mongo.SessionContext) (interface{}, error) {
// Important: You must pass sessCtx as the Context parameter to the operations for them to be executed in the
// transaction.
opts := options.FindOneAndReplace().SetUpsert(true)
err := d.devices.FindOneAndReplace(d.ctx, bson.D{{"sn", device.SN}}, device, opts).Decode(&result)
if err != nil {
if err == mongo.ErrNoDocuments {
log.Printf("New device %s added to database", device.SN)
return nil, nil
}
return nil, err
}
log.Printf("Device %s already existed, and got replaced for new info", device.SN)
return nil, nil
}
session, err := d.client.StartSession()
if err != nil {
return err
}
defer session.EndSession(d.ctx)
_, err = session.WithTransaction(d.ctx, callback)
if err != nil {
return err
}
return err
}
func (d *Database) RetrieveDevices(filter bson.A) (*DevicesList, error) {
var results []DevicesList
cursor, err := d.devices.Aggregate(d.ctx, filter)
if err != nil {
return nil, err
}
if cursor.Err() != nil {
return nil, cursor.Err()
}
defer cursor.Close(d.ctx)
if err := cursor.All(d.ctx, &results); err != nil {
log.Println(err)
return nil, err
}
//log.Printf("results: %++v", results)
return &results[0], err
}
func (d *Database) RetrieveDeviceFilterOptions() (FilterOptions, error) {
filter := bson.A{
bson.D{
{"$group",
bson.D{
{"_id", primitive.Null{}},
{"vendors", bson.D{{"$addToSet", "$vendor"}}},
{"versions", bson.D{{"$addToSet", "$version"}}},
{"productClasses", bson.D{{"$addToSet", "$productclass"}}},
{"models", bson.D{{"$addToSet", "$model"}}},
},
},
},
bson.D{
{"$project",
bson.D{
{"_id", 0},
{"vendors", 1},
{"versions", 1},
{"productClasses", 1},
{"models", 1},
},
},
},
}
var results []FilterOptions
cursor, err := d.devices.Aggregate(d.ctx, filter)
if err != nil {
log.Println(err)
return FilterOptions{}, err
}
defer cursor.Close(d.ctx)
if err := cursor.All(d.ctx, &results); err != nil {
log.Println(err)
return FilterOptions{}, err
}
if len(results) > 0 {
return results[0], nil
} else {
return FilterOptions{
Models: []string{},
ProductClasses: []string{},
Vendors: []string{},
Versions: []string{},
}, nil
}
}
func (d *Database) DeleteDevices(filter bson.D) (int64, error) {
result, err := d.devices.DeleteMany(d.ctx, filter)
if err != nil {
log.Println(err)
}
return result.DeletedCount, err
}
func (d *Database) RetrieveDevice(sn string) (Device, error) {
var result Device
err := d.devices.FindOne(d.ctx, bson.D{{"sn", sn}}, nil).Decode(&result)
if err != nil {
log.Println(err)
}
return result, err
}
func (d *Database) RetrieveDevicesCount(filter bson.M) (int64, error) {
count, err := d.devices.CountDocuments(d.ctx, filter)
return count, err
}
func (d *Database) DeleteDevice() {
}
func (d *Database) SetDeviceAlias(sn string, newAlias string) error {
err := d.devices.FindOneAndUpdate(d.ctx, bson.D{{"sn", sn}}, bson.D{{"$set", bson.D{{"alias", newAlias}}}}).Err()
return err
}
func (d *Database) DeviceExists(sn string) (bool, error) {
_, err := d.RetrieveDevice(sn)
if err != nil {
if err == mongo.ErrNoDocuments {
return false, nil
}
return false, err
}
return true, nil
}
func (m MTP) String() string {
switch m {
case UNDEFINED:
return "unknown"
case MQTT:
return "mqtt"
case STOMP:
return "stomp"
case WEBSOCKETS:
return "websockets"
case CWMP:
return "cwmp"
}
return "unknown"
}