diff --git a/backend/services/mtp/adapter/cmd/adapter/main.go b/backend/services/mtp/adapter/cmd/adapter/main.go index 2d12da7..c1f4885 100644 --- a/backend/services/mtp/adapter/cmd/adapter/main.go +++ b/backend/services/mtp/adapter/cmd/adapter/main.go @@ -11,6 +11,7 @@ import ( "github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/events" "github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/events/handler" "github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/nats" + "github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/reqs" ) func main() { @@ -27,6 +28,8 @@ func main() { events.StartEventsListener(c.Nats.Ctx, js, handler) + reqs.StartRequestsListener(c.Nats.Ctx, nc, db) + <-done log.Println("mtp adapter is shutting down...") diff --git a/backend/services/mtp/adapter/internal/config/config.go b/backend/services/mtp/adapter/internal/config/config.go index 5173dd8..ec8980b 100644 --- a/backend/services/mtp/adapter/internal/config/config.go +++ b/backend/services/mtp/adapter/internal/config/config.go @@ -79,6 +79,7 @@ func loadEnvVariables() { if _, err := os.Stat(LOCAL_ENV); err == nil { _ = godotenv.Overload(LOCAL_ENV) log.Printf("Loaded variables from '%s'", LOCAL_ENV) + return } if err != nil { diff --git a/backend/services/mtp/adapter/internal/db/device.go b/backend/services/mtp/adapter/internal/db/device.go index 5af07df..81b0a29 100644 --- a/backend/services/mtp/adapter/internal/db/device.go +++ b/backend/services/mtp/adapter/internal/db/device.go @@ -116,7 +116,6 @@ func (d *Database) RetrieveDevices(filter bson.A) ([]Device, error) { func (d *Database) RetrieveDevice(sn string) (Device, error) { var result Device - //TODO: filter devices by user ownership err := d.devices.FindOne(d.ctx, bson.D{{"sn", sn}}, nil).Decode(&result) if err != nil { log.Println(err) @@ -133,6 +132,17 @@ func (d *Database) DeleteDevice() { } +func (d *Database) DeviceExists(sn string) (bool, error) { + _, err := d.RetrieveDevice(sn) + if err != nil { + if err == mongo.ErrNoDocuments { + return false, nil + } + return false, err + } + return true, nil +} + func (m MTP) String() string { switch m { case UNDEFINED: diff --git a/backend/services/mtp/adapter/internal/events/events.go b/backend/services/mtp/adapter/internal/events/events.go index 06aa0fe..2415ac9 100644 --- a/backend/services/mtp/adapter/internal/events/events.go +++ b/backend/services/mtp/adapter/internal/events/events.go @@ -12,6 +12,8 @@ import ( func StartEventsListener(ctx context.Context, js jetstream.JetStream, h handler.Handler) { + log.Println("Listening for nats events") + events := []string{ nats.MQTT_STREAM_NAME, nats.WS_STREAM_NAME, @@ -48,11 +50,11 @@ func StartEventsListener(ctx context.Context, js jetstream.JetStream, h handler. switch msgType { case "status": - h.HandleDeviceStatus(device, msg.Subject(), data, event, func() { msg.Ack(); log.Println("Acked msg") }) + h.HandleDeviceStatus(device, msg.Subject(), data, event, func() { msg.Ack() }) case "info": - h.HandleDeviceInfo(device, msg.Subject(), data, event, func() { msg.Ack(); log.Println("Acked msg") }) + h.HandleDeviceInfo(device, msg.Subject(), data, event, func() { msg.Ack() }) default: - //ignoreMsg(msg.Subject(), "status", msg.Data()) + log.Printf("Unknown message type received, subject: %s", msg.Subject()) } } }() diff --git a/backend/services/mtp/adapter/internal/nats/nats.go b/backend/services/mtp/adapter/internal/nats/nats.go index e62855d..c2b8998 100644 --- a/backend/services/mtp/adapter/internal/nats/nats.go +++ b/backend/services/mtp/adapter/internal/nats/nats.go @@ -17,6 +17,8 @@ const ( STOMP_STREAM_NAME = "stomp" LORA_STREAM_NAME = "lora" OPC_STREAM_NAME = "opc" + ADAPTER_SUBJECT = "adapter" + USP_SUBJECT + USP_SUBJECT = ".usp.v1." ) func StartNatsClient(c config.Nats) (jetstream.JetStream, *nats.Conn) { diff --git a/backend/services/mtp/adapter/internal/reqs/reqs.go b/backend/services/mtp/adapter/internal/reqs/reqs.go new file mode 100644 index 0000000..887d4ae --- /dev/null +++ b/backend/services/mtp/adapter/internal/reqs/reqs.go @@ -0,0 +1,58 @@ +/* +Provide answers to nats request-reply messages, executing queries to the database +*/ +package reqs + +import ( + "context" + "encoding/json" + "log" + "strings" + + "github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/db" + local "github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/nats" + "github.com/nats-io/nats.go" + "go.mongodb.org/mongo-driver/mongo" +) + +type msgAnswer struct { + Code int + Msg any +} + +func StartRequestsListener(ctx context.Context, nc *nats.Conn, db db.Database) { + log.Println("Listening for nats requests") + nc.Subscribe(local.ADAPTER_SUBJECT+"*.device", func(msg *nats.Msg) { + subject := strings.Split(msg.Subject, ".") + device := subject[len(subject)-2] + + deviceInfo, err := db.RetrieveDevice(device) + if deviceInfo.SN != "" { + body, _ := json.Marshal(deviceInfo) + respondMsg(msg.Respond, 200, body) + } else { + if err != nil { + if err == mongo.ErrNoDocuments { + respondMsg(msg.Respond, 404, "Device not found") + } else { + respondMsg(msg.Respond, 500, err.Error()) + } + } + } + }) +} + +func respondMsg(respond func(data []byte) error, code int, msgData any) { + + msg, err := json.Marshal(msgAnswer{ + Code: code, + Msg: msgData, + }) + if err != nil { + log.Printf("Failed to marshal message: %q", err) + respond([]byte(err.Error())) + return + } + + respond([]byte(msg)) +}