feat(adapter): work at cluster mode

This commit is contained in:
leandrofars 2024-06-27 18:50:17 -03:00
parent 2deadba855
commit 62d05b2389
2 changed files with 9 additions and 8 deletions

View File

@ -18,10 +18,11 @@ const (
LORA_STREAM_NAME = "lora"
OPC_STREAM_NAME = "opc"
CWMP_STREAM_NAME = "cwmp"
ADAPTER_SUBJECT = "adapter" + USP_SUBJECT
USP_SUBJECT = ".usp.v1."
BUCKET_NAME = "devices-auth"
BUCKET_DESCRIPTION = "Devices authentication"
ADAPTER_SUBJECT = "adapter" + USP_SUBJECT
ADAPTER_QUEUE = "adapter"
)
func StartNatsClient(c config.Nats, controller config.Controller) (jetstream.JetStream, *nats.Conn) {

View File

@ -24,7 +24,7 @@ type msgAnswer struct {
func StartRequestsListener(ctx context.Context, nc *nats.Conn, db db.Database) {
log.Println("Listening for nats requests")
nc.Subscribe(local.ADAPTER_SUBJECT+"*.device", func(msg *nats.Msg) {
nc.QueueSubscribe(local.ADAPTER_SUBJECT+"*.device", local.ADAPTER_QUEUE, func(msg *nats.Msg) {
subject := strings.Split(msg.Subject, ".")
device := subject[len(subject)-2]
@ -42,7 +42,7 @@ func StartRequestsListener(ctx context.Context, nc *nats.Conn, db db.Database) {
}
})
nc.Subscribe(local.ADAPTER_SUBJECT+"devices.count", func(msg *nats.Msg) {
nc.QueueSubscribe(local.ADAPTER_SUBJECT+"devices.count", local.ADAPTER_QUEUE, func(msg *nats.Msg) {
count, err := db.RetrieveDevicesCount(bson.M{})
if err != nil {
respondMsg(msg.Respond, 500, err.Error())
@ -50,7 +50,7 @@ func StartRequestsListener(ctx context.Context, nc *nats.Conn, db db.Database) {
respondMsg(msg.Respond, 200, count)
})
nc.Subscribe(local.ADAPTER_SUBJECT+"devices.retrieve", func(msg *nats.Msg) {
nc.QueueSubscribe(local.ADAPTER_SUBJECT+"devices.retrieve", local.ADAPTER_QUEUE, func(msg *nats.Msg) {
var filter bson.A
@ -66,7 +66,7 @@ func StartRequestsListener(ctx context.Context, nc *nats.Conn, db db.Database) {
respondMsg(msg.Respond, 200, devicesList)
})
nc.Subscribe(local.ADAPTER_SUBJECT+"devices.class", func(msg *nats.Msg) {
nc.QueueSubscribe(local.ADAPTER_SUBJECT+"devices.class", local.ADAPTER_QUEUE, func(msg *nats.Msg) {
productClassCount, err := db.RetrieveProductsClassInfo()
if err != nil {
respondMsg(msg.Respond, 500, err.Error())
@ -74,7 +74,7 @@ func StartRequestsListener(ctx context.Context, nc *nats.Conn, db db.Database) {
respondMsg(msg.Respond, 200, productClassCount)
})
nc.Subscribe(local.ADAPTER_SUBJECT+"devices.vendors", func(msg *nats.Msg) {
nc.QueueSubscribe(local.ADAPTER_SUBJECT+"devices.vendors", local.ADAPTER_QUEUE, func(msg *nats.Msg) {
productClassCount, err := db.RetrieveVendorsInfo()
if err != nil {
respondMsg(msg.Respond, 500, err.Error())
@ -82,7 +82,7 @@ func StartRequestsListener(ctx context.Context, nc *nats.Conn, db db.Database) {
respondMsg(msg.Respond, 200, productClassCount)
})
nc.Subscribe(local.ADAPTER_SUBJECT+"devices.status", func(msg *nats.Msg) {
nc.QueueSubscribe(local.ADAPTER_SUBJECT+"devices.status", local.ADAPTER_QUEUE, func(msg *nats.Msg) {
productClassCount, err := db.RetrieveStatusInfo()
if err != nil {
respondMsg(msg.Respond, 500, err.Error())
@ -90,7 +90,7 @@ func StartRequestsListener(ctx context.Context, nc *nats.Conn, db db.Database) {
respondMsg(msg.Respond, 200, productClassCount)
})
nc.Subscribe(local.ADAPTER_SUBJECT+"*.device.alias", func(msg *nats.Msg) {
nc.QueueSubscribe(local.ADAPTER_SUBJECT+"*.device.alias", local.ADAPTER_QUEUE, func(msg *nats.Msg) {
subject := strings.Split(msg.Subject, ".")
device := subject[len(subject)-3]