oktopus/backend/services/mtp/adapter/internal/nats/nats.go
2024-07-05 16:29:51 -03:00

161 lines
3.9 KiB
Go

package nats
import (
"context"
"errors"
"log"
"time"
"github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/config"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)
const (
	// Names of the JetStream streams (and of the matching consumers) that
	// defineStreams and defineConsumers return.
	MQTT_STREAM_NAME  = "mqtt"
	WS_STREAM_NAME    = "ws"
	STOMP_STREAM_NAME = "stomp"
	LORA_STREAM_NAME  = "lora"
	OPC_STREAM_NAME   = "opc"
	CWMP_STREAM_NAME  = "cwmp"

	// USP_SUBJECT is the subject fragment appended to a prefix to build a
	// USP subject, as done for ADAPTER_SUBJECT below.
	USP_SUBJECT = ".usp.v1."

	// Key-value bucket written by createKeyValueStores.
	BUCKET_NAME        = "devices-auth"
	BUCKET_DESCRIPTION = "Devices authentication"

	// ADAPTER_SUBJECT evaluates to "adapter.usp.v1.".
	ADAPTER_SUBJECT = "adapter" + USP_SUBJECT
	// ADAPTER_QUEUE is not referenced in this part of the file; presumably
	// the queue group name used by adapter subscriptions — confirm at the
	// call sites.
	ADAPTER_QUEUE = "adapter"
)
// StartNatsClient connects to the NATS server at c.Url and prepares the
// JetStream resources this package manages.
//
// The call blocks until a connection is established, retrying every five
// seconds and logging each failed attempt. Once connected it creates or
// updates the streams listed by defineStreams, the consumers listed by
// defineConsumers, and the key-value bucket holding the controller
// credentials. A failure to create the JetStream client, a stream or a
// consumer terminates the process through log.Fatalf.
//
// It returns the JetStream handle together with the underlying connection.
func StartNatsClient(c config.Nats, controller config.Controller) (jetstream.JetStream, *nats.Conn) {
	const retryDelay = 5 * time.Second

	var (
		nc  *nats.Conn
		err error
	)

	opts := defineOptions(c)

	log.Printf("Connecting to NATS server %s", c.Url)

	for {
		nc, err = nats.Connect(c.Url, opts...)
		if err == nil {
			break
		}
		// Surface the reason for the failure; otherwise a wrong URL or bad
		// TLS material makes the service hang here without any diagnostic.
		log.Printf("Failed to connect to NATS server %s: %v. Retrying in %s", c.Url, err, retryDelay)
		time.Sleep(retryDelay)
	}

	log.Printf("Successfully connected to NATS server %s", c.Url)

	js, err := jetstream.New(nc)
	if err != nil {
		log.Fatalf("Failed to create JetStream client: %v", err)
	}

	if err = createStreams(c.Ctx, js, defineStreams()); err != nil {
		log.Fatalf("Failed to create Stream: %v", err)
	}

	if err = createConsumers(c.Ctx, js, defineConsumers()); err != nil {
		log.Fatalf("Failed to create Consumer: %v", err)
	}

	createKeyValueStores(c.Ctx, js, controller)

	return js, nc
}
// createKeyValueStores creates or updates the BUCKET_NAME key-value bucket and
// writes the controller password into it under the controller ID as key.
//
// A failure to create the bucket terminates the process through log.Fatalf.
// A failure to write the entry is only logged and the function returns
// normally.
func createKeyValueStores(ctx context.Context, js jetstream.JetStream, control config.Controller) {
	kv, err := js.CreateOrUpdateKeyValue(ctx, jetstream.KeyValueConfig{
		Bucket:      BUCKET_NAME,
		Description: BUCKET_DESCRIPTION,
	})
	if err != nil {
		log.Fatalf("Failed to create KeyValue store: %v", err)
	}

	if _, err = kv.PutString(ctx, control.ControllerId, control.ControllerPasswd); err != nil {
		// The bucket exists at this point; what failed is the write, so say
		// so. The password itself is deliberately kept out of the log.
		log.Printf("Failed to store controller credentials in KeyValue store %q: %v", BUCKET_NAME, err)
	}
}
// createStreams creates or updates one JetStream stream per entry of streams.
//
// Each stream is named after its entry, is bound to the subjects "<name>.>"
// and uses the interest retention policy. Processing stops at the first
// failure; the returned error carries the server message followed by the name
// of the stream that could not be created. It returns nil when every stream
// was created or when streams is empty.
func createStreams(ctx context.Context, js jetstream.JetStream, streams []string) error {
	for _, name := range streams {
		cfg := jetstream.StreamConfig{
			Name:        name,
			Description: "Stream for " + name + " messages",
			Subjects:    []string{name + ".>"},
			Retention:   jetstream.InterestPolicy,
		}

		if _, err := js.CreateOrUpdateStream(ctx, cfg); err != nil {
			// Label the failing object as a stream, not a consumer, so the
			// log line points at the right resource.
			return errors.New(err.Error() + " | stream:" + name)
		}
	}

	return nil
}
// createConsumers creates or updates one durable JetStream consumer per entry
// of consumers.
//
// Each entry is used as the stream identifier passed to the server and as the
// consumer's Name and Durable name; messages must be acknowledged explicitly.
// Processing stops at the first failure; the returned error carries the
// server message followed by the name of the consumer that could not be
// created, in the same format createStreams uses. It returns nil when every
// consumer was created or when consumers is empty.
func createConsumers(ctx context.Context, js jetstream.JetStream, consumers []string) error {
	for _, name := range consumers {
		cfg := jetstream.ConsumerConfig{
			Name:        name,
			Description: "Consumer for " + name + " messages",
			AckPolicy:   jetstream.AckExplicitPolicy,
			Durable:     name,
		}

		if _, err := js.CreateOrUpdateConsumer(ctx, name, cfg); err != nil {
			// Without the name the caller cannot tell which of the
			// consumers was rejected.
			return errors.New(err.Error() + " | consumer:" + name)
		}
	}

	return nil
}
// defineStreams returns the names of the streams to create, in creation
// order. A fresh slice is returned on every call.
func defineStreams() []string {
	names := [...]string{
		MQTT_STREAM_NAME,
		WS_STREAM_NAME,
		STOMP_STREAM_NAME,
		LORA_STREAM_NAME,
		OPC_STREAM_NAME,
		CWMP_STREAM_NAME,
	}

	return names[:]
}
// defineConsumers returns the names of the consumers to create, in creation
// order. Every consumer shares its name with one of the *_STREAM_NAME
// constants. A fresh slice is returned on every call.
func defineConsumers() []string {
	consumers := []string{
		MQTT_STREAM_NAME,
		WS_STREAM_NAME,
		STOMP_STREAM_NAME,
		LORA_STREAM_NAME,
		OPC_STREAM_NAME,
		CWMP_STREAM_NAME,
	}

	return consumers
}
// defineOptions builds the nats.Option list used to connect to the server.
//
// The options set the client name from c.Name, request unlimited reconnect
// attempts with a five second wait between them, and install handlers that
// log disconnect, reconnect and close events. When c.EnableTls is set, the CA
// file and the client certificate/key pair from c.Cert are added as well.
func defineOptions(c config.Nats) []nats.Option {
	opts := []nats.Option{
		nats.Name(c.Name),
		// A negative value asks the client to never stop reconnecting.
		nats.MaxReconnects(-1),
		nats.ReconnectWait(5 * time.Second),
		nats.DisconnectErrHandler(func(_ *nats.Conn, err error) {
			// The handler may be invoked without an error; formatting a nil
			// error with %q would print "%!q(<nil>)".
			if err == nil {
				log.Println("Got disconnected!")
				return
			}
			log.Printf("Got disconnected! Reason: %q\n", err)
		}),
		nats.ReconnectHandler(func(nc *nats.Conn) {
			log.Printf("Got reconnected to %v!\n", nc.ConnectedUrl())
		}),
		nats.ClosedHandler(func(nc *nats.Conn) {
			// LastError is nil when no error was recorded on the connection;
			// guard it for the same reason as above.
			if err := nc.LastError(); err != nil {
				log.Printf("Connection closed. Reason: %q\n", err)
				return
			}
			log.Println("Connection closed.")
		}),
	}

	if c.EnableTls {
		log.Printf("Load certificates: %s and %s\n", c.Cert.CertFile, c.Cert.KeyFile)
		opts = append(opts,
			nats.RootCAs(c.Cert.CaFile),
			nats.ClientCert(c.Cert.CertFile, c.Cert.KeyFile),
		)
	}

	return opts
}