From 62d05b23896134d9129624d04e272b244b472650 Mon Sep 17 00:00:00 2001 From: leandrofars Date: Thu, 27 Jun 2024 18:50:17 -0300 Subject: [PATCH] feat(adapter): work at cluster mode --- backend/services/mtp/adapter/internal/nats/nats.go | 3 ++- backend/services/mtp/adapter/internal/reqs/reqs.go | 14 +++++++------- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/backend/services/mtp/adapter/internal/nats/nats.go b/backend/services/mtp/adapter/internal/nats/nats.go index 3a9a56e..7985275 100644 --- a/backend/services/mtp/adapter/internal/nats/nats.go +++ b/backend/services/mtp/adapter/internal/nats/nats.go @@ -18,10 +18,11 @@ const ( LORA_STREAM_NAME = "lora" OPC_STREAM_NAME = "opc" CWMP_STREAM_NAME = "cwmp" - ADAPTER_SUBJECT = "adapter" + USP_SUBJECT USP_SUBJECT = ".usp.v1." BUCKET_NAME = "devices-auth" BUCKET_DESCRIPTION = "Devices authentication" + ADAPTER_SUBJECT = "adapter" + USP_SUBJECT + ADAPTER_QUEUE = "adapter" ) func StartNatsClient(c config.Nats, controller config.Controller) (jetstream.JetStream, *nats.Conn) { diff --git a/backend/services/mtp/adapter/internal/reqs/reqs.go b/backend/services/mtp/adapter/internal/reqs/reqs.go index 166651d..3da0fa9 100644 --- a/backend/services/mtp/adapter/internal/reqs/reqs.go +++ b/backend/services/mtp/adapter/internal/reqs/reqs.go @@ -24,7 +24,7 @@ type msgAnswer struct { func StartRequestsListener(ctx context.Context, nc *nats.Conn, db db.Database) { log.Println("Listening for nats requests") - nc.Subscribe(local.ADAPTER_SUBJECT+"*.device", func(msg *nats.Msg) { + nc.QueueSubscribe(local.ADAPTER_SUBJECT+"*.device", local.ADAPTER_QUEUE, func(msg *nats.Msg) { subject := strings.Split(msg.Subject, ".") device := subject[len(subject)-2] @@ -42,7 +42,7 @@ func StartRequestsListener(ctx context.Context, nc *nats.Conn, db db.Database) { } }) - nc.Subscribe(local.ADAPTER_SUBJECT+"devices.count", func(msg *nats.Msg) { + nc.QueueSubscribe(local.ADAPTER_SUBJECT+"devices.count", local.ADAPTER_QUEUE, func(msg *nats.Msg) { count, err := db.RetrieveDevicesCount(bson.M{}) if err != nil { respondMsg(msg.Respond, 500, err.Error()) @@ -50,7 +50,7 @@ func StartRequestsListener(ctx context.Context, nc *nats.Conn, db db.Database) { respondMsg(msg.Respond, 200, count) }) - nc.Subscribe(local.ADAPTER_SUBJECT+"devices.retrieve", func(msg *nats.Msg) { + nc.QueueSubscribe(local.ADAPTER_SUBJECT+"devices.retrieve", local.ADAPTER_QUEUE, func(msg *nats.Msg) { var filter bson.A @@ -66,7 +66,7 @@ func StartRequestsListener(ctx context.Context, nc *nats.Conn, db db.Database) { respondMsg(msg.Respond, 200, devicesList) }) - nc.Subscribe(local.ADAPTER_SUBJECT+"devices.class", func(msg *nats.Msg) { + nc.QueueSubscribe(local.ADAPTER_SUBJECT+"devices.class", local.ADAPTER_QUEUE, func(msg *nats.Msg) { productClassCount, err := db.RetrieveProductsClassInfo() if err != nil { respondMsg(msg.Respond, 500, err.Error()) @@ -74,7 +74,7 @@ func StartRequestsListener(ctx context.Context, nc *nats.Conn, db db.Database) { respondMsg(msg.Respond, 200, productClassCount) }) - nc.Subscribe(local.ADAPTER_SUBJECT+"devices.vendors", func(msg *nats.Msg) { + nc.QueueSubscribe(local.ADAPTER_SUBJECT+"devices.vendors", local.ADAPTER_QUEUE, func(msg *nats.Msg) { productClassCount, err := db.RetrieveVendorsInfo() if err != nil { respondMsg(msg.Respond, 500, err.Error()) @@ -82,7 +82,7 @@ func StartRequestsListener(ctx context.Context, nc *nats.Conn, db db.Database) { respondMsg(msg.Respond, 200, productClassCount) }) - nc.Subscribe(local.ADAPTER_SUBJECT+"devices.status", func(msg *nats.Msg) { + nc.QueueSubscribe(local.ADAPTER_SUBJECT+"devices.status", local.ADAPTER_QUEUE, func(msg *nats.Msg) { productClassCount, err := db.RetrieveStatusInfo() if err != nil { respondMsg(msg.Respond, 500, err.Error()) @@ -90,7 +90,7 @@ func StartRequestsListener(ctx context.Context, nc *nats.Conn, db db.Database) { respondMsg(msg.Respond, 200, productClassCount) }) - nc.Subscribe(local.ADAPTER_SUBJECT+"*.device.alias", func(msg *nats.Msg) { + nc.QueueSubscribe(local.ADAPTER_SUBJECT+"*.device.alias", local.ADAPTER_QUEUE, func(msg *nats.Msg) { subject := strings.Split(msg.Subject, ".") device := subject[len(subject)-3]