feat(adapter): listen for nats requests + small refacts
This commit is contained in:
parent
8b56913b07
commit
479198ead1
|
|
@ -11,6 +11,7 @@ import (
|
|||
"github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/events"
|
||||
"github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/events/handler"
|
||||
"github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/nats"
|
||||
"github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/reqs"
|
||||
)
|
||||
|
||||
func main() {
|
||||
|
|
@ -27,6 +28,8 @@ func main() {
|
|||
|
||||
events.StartEventsListener(c.Nats.Ctx, js, handler)
|
||||
|
||||
reqs.StartRequestsListener(c.Nats.Ctx, nc, db)
|
||||
|
||||
<-done
|
||||
|
||||
log.Println("mtp adapter is shutting down...")
|
||||
|
|
|
|||
|
|
@ -79,6 +79,7 @@ func loadEnvVariables() {
|
|||
if _, err := os.Stat(LOCAL_ENV); err == nil {
|
||||
_ = godotenv.Overload(LOCAL_ENV)
|
||||
log.Printf("Loaded variables from '%s'", LOCAL_ENV)
|
||||
return
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -116,7 +116,6 @@ func (d *Database) RetrieveDevices(filter bson.A) ([]Device, error) {
|
|||
|
||||
func (d *Database) RetrieveDevice(sn string) (Device, error) {
|
||||
var result Device
|
||||
//TODO: filter devices by user ownership
|
||||
err := d.devices.FindOne(d.ctx, bson.D{{"sn", sn}}, nil).Decode(&result)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
|
|
@ -133,6 +132,17 @@ func (d *Database) DeleteDevice() {
|
|||
|
||||
}
|
||||
|
||||
func (d *Database) DeviceExists(sn string) (bool, error) {
|
||||
_, err := d.RetrieveDevice(sn)
|
||||
if err != nil {
|
||||
if err == mongo.ErrNoDocuments {
|
||||
return false, nil
|
||||
}
|
||||
return false, err
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (m MTP) String() string {
|
||||
switch m {
|
||||
case UNDEFINED:
|
||||
|
|
|
|||
|
|
@ -12,6 +12,8 @@ import (
|
|||
|
||||
func StartEventsListener(ctx context.Context, js jetstream.JetStream, h handler.Handler) {
|
||||
|
||||
log.Println("Listening for nats events")
|
||||
|
||||
events := []string{
|
||||
nats.MQTT_STREAM_NAME,
|
||||
nats.WS_STREAM_NAME,
|
||||
|
|
@ -48,11 +50,11 @@ func StartEventsListener(ctx context.Context, js jetstream.JetStream, h handler.
|
|||
|
||||
switch msgType {
|
||||
case "status":
|
||||
h.HandleDeviceStatus(device, msg.Subject(), data, event, func() { msg.Ack(); log.Println("Acked msg") })
|
||||
h.HandleDeviceStatus(device, msg.Subject(), data, event, func() { msg.Ack() })
|
||||
case "info":
|
||||
h.HandleDeviceInfo(device, msg.Subject(), data, event, func() { msg.Ack(); log.Println("Acked msg") })
|
||||
h.HandleDeviceInfo(device, msg.Subject(), data, event, func() { msg.Ack() })
|
||||
default:
|
||||
//ignoreMsg(msg.Subject(), "status", msg.Data())
|
||||
log.Printf("Unknown message type received, subject: %s", msg.Subject())
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
|
|
|||
|
|
@ -17,6 +17,8 @@ const (
|
|||
STOMP_STREAM_NAME = "stomp"
|
||||
LORA_STREAM_NAME = "lora"
|
||||
OPC_STREAM_NAME = "opc"
|
||||
ADAPTER_SUBJECT = "adapter" + USP_SUBJECT
|
||||
USP_SUBJECT = ".usp.v1."
|
||||
)
|
||||
|
||||
func StartNatsClient(c config.Nats) (jetstream.JetStream, *nats.Conn) {
|
||||
|
|
|
|||
58
backend/services/mtp/adapter/internal/reqs/reqs.go
Normal file
58
backend/services/mtp/adapter/internal/reqs/reqs.go
Normal file
|
|
@ -0,0 +1,58 @@
|
|||
/*
|
||||
Provide answers to nats request-reply messages, executing queries to the database
|
||||
*/
|
||||
package reqs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"log"
|
||||
"strings"
|
||||
|
||||
"github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/db"
|
||||
local "github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/nats"
|
||||
"github.com/nats-io/nats.go"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
)
|
||||
|
||||
type msgAnswer struct {
|
||||
Code int
|
||||
Msg any
|
||||
}
|
||||
|
||||
func StartRequestsListener(ctx context.Context, nc *nats.Conn, db db.Database) {
|
||||
log.Println("Listening for nats requests")
|
||||
nc.Subscribe(local.ADAPTER_SUBJECT+"*.device", func(msg *nats.Msg) {
|
||||
subject := strings.Split(msg.Subject, ".")
|
||||
device := subject[len(subject)-2]
|
||||
|
||||
deviceInfo, err := db.RetrieveDevice(device)
|
||||
if deviceInfo.SN != "" {
|
||||
body, _ := json.Marshal(deviceInfo)
|
||||
respondMsg(msg.Respond, 200, body)
|
||||
} else {
|
||||
if err != nil {
|
||||
if err == mongo.ErrNoDocuments {
|
||||
respondMsg(msg.Respond, 404, "Device not found")
|
||||
} else {
|
||||
respondMsg(msg.Respond, 500, err.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func respondMsg(respond func(data []byte) error, code int, msgData any) {
|
||||
|
||||
msg, err := json.Marshal(msgAnswer{
|
||||
Code: code,
|
||||
Msg: msgData,
|
||||
})
|
||||
if err != nil {
|
||||
log.Printf("Failed to marshal message: %q", err)
|
||||
respond([]byte(err.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
respond([]byte(msg))
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user