feat: dynamic credentials for usp controller
This commit is contained in:
parent
a6c1f9ca5b
commit
f9428ae449
|
|
@ -18,9 +18,9 @@ func main() {
|
|||
|
||||
c := config.NewConfig()
|
||||
|
||||
_, publisher, subscriber := nats.StartNatsClient(c.Nats)
|
||||
kv, publisher, subscriber := nats.StartNatsClient(c.Nats)
|
||||
|
||||
bridge := bridge.NewBridge(publisher, subscriber, c.Mqtt.Ctx, c.Mqtt)
|
||||
bridge := bridge.NewBridge(publisher, subscriber, c.Mqtt.Ctx, c.Mqtt, kv)
|
||||
bridge.StartBridge()
|
||||
|
||||
<-done
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@ import (
|
|||
"github.com/eclipse/paho.golang/paho"
|
||||
"github.com/google/uuid"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/nats-io/nats.go/jetstream"
|
||||
"golang.org/x/sys/unix"
|
||||
)
|
||||
|
||||
|
|
@ -41,15 +42,17 @@ type Bridge struct {
|
|||
Pub Publisher
|
||||
Sub Subscriber
|
||||
Mqtt config.Mqtt
|
||||
kv jetstream.KeyValue
|
||||
Ctx context.Context
|
||||
}
|
||||
|
||||
func NewBridge(p Publisher, s Subscriber, ctx context.Context, m config.Mqtt) *Bridge {
|
||||
func NewBridge(p Publisher, s Subscriber, ctx context.Context, m config.Mqtt, kv jetstream.KeyValue) *Bridge {
|
||||
return &Bridge{
|
||||
Pub: p,
|
||||
Sub: s,
|
||||
Mqtt: m,
|
||||
Ctx: ctx,
|
||||
kv: kv,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -82,6 +85,7 @@ func (b *Bridge) StartBridge() {
|
|||
ClientConfig: *pahoClientConfig,
|
||||
}
|
||||
|
||||
b.setMqttPassword()
|
||||
if b.Mqtt.Username != "" && b.Mqtt.Password != "" {
|
||||
autopahoClientConfig.SetUsernamePassword(b.Mqtt.Username, []byte(b.Mqtt.Password))
|
||||
}
|
||||
|
|
@ -273,3 +277,8 @@ func tcpInfo(conn *net.TCPConn) (*unix.TCPInfo, error) {
|
|||
}
|
||||
return info, nil
|
||||
}
|
||||
|
||||
func (b *Bridge) setMqttPassword() {
|
||||
entry, _ := b.kv.Get(b.Ctx, b.Mqtt.Username)
|
||||
b.Mqtt.Password = string(entry.Value())
|
||||
}
|
||||
|
|
|
|||
|
|
@ -44,7 +44,6 @@ func NewConfig() *Config {
|
|||
mqttUrl := flag.String("mqtt_url", lookupEnvOrString("MQTT_URL", "tcp://localhost:1883"), "url for mqtt server")
|
||||
mqttClientId := flag.String("mqtt_client_id", lookupEnvOrString("MQTT_CLIENT_ID", "mqtt-adapter"), "client id for mqtt")
|
||||
mqttUsername := flag.String("mqtt_username", lookupEnvOrString("MQTT_USERNAME", "oktopusController"), "username for mqtt")
|
||||
mqttPassword := flag.String("mqtt_password", lookupEnvOrString("MQTT_PASSWORD", ""), "password for mqtt")
|
||||
mqttQos := flag.Int("mqtt_qos", lookupEnvOrInt("MQTT_QOS", 1), "quality of service for mqtt")
|
||||
flHelp := flag.Bool("help", false, "Help")
|
||||
|
||||
|
|
@ -75,7 +74,6 @@ func NewConfig() *Config {
|
|||
Url: *mqttUrl,
|
||||
ClientId: *mqttClientId,
|
||||
Username: *mqttUsername,
|
||||
Password: *mqttPassword,
|
||||
Ctx: ctx,
|
||||
Qos: *mqttQos,
|
||||
},
|
||||
|
|
|
|||
|
|
@ -11,11 +11,13 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
STREAM_NAME = "mqtt"
|
||||
STREAM_NAME = "mqtt"
|
||||
BUCKET_NAME = "devices-auth"
|
||||
BUCKET_DESCRIPTION = "Devices authentication"
|
||||
)
|
||||
|
||||
func StartNatsClient(c config.Nats) (
|
||||
*nats.Conn,
|
||||
jetstream.KeyValue,
|
||||
func(string, []byte) error,
|
||||
func(string, func(*nats.Msg)) error,
|
||||
) {
|
||||
|
|
@ -27,7 +29,7 @@ func StartNatsClient(c config.Nats) (
|
|||
|
||||
opts := defineOptions(c)
|
||||
|
||||
log.Printf("Connecting to NATS server %s", c.Url)
|
||||
log.Printf("Connecting to NATS server %s", c.Url)
|
||||
|
||||
for {
|
||||
nc, err = nats.Connect(c.Url, opts...)
|
||||
|
|
@ -44,7 +46,15 @@ func StartNatsClient(c config.Nats) (
|
|||
log.Fatalf("Failed to create JetStream client: %v", err)
|
||||
}
|
||||
|
||||
return nc, publisher(js), subscriber(nc)
|
||||
kv, err := js.CreateOrUpdateKeyValue(c.Ctx, jetstream.KeyValueConfig{
|
||||
Bucket: BUCKET_NAME,
|
||||
Description: BUCKET_DESCRIPTION,
|
||||
})
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to create KeyValue store: %v", err)
|
||||
}
|
||||
|
||||
return kv, publisher(js), subscriber(nc)
|
||||
}
|
||||
|
||||
func subscriber(nc *nats.Conn) func(string, func(*nats.Msg)) error {
|
||||
|
|
|
|||
|
|
@ -18,9 +18,9 @@ func main() {
|
|||
|
||||
c := config.NewConfig()
|
||||
|
||||
_, publisher, subscriber := nats.StartNatsClient(c.Nats)
|
||||
kv, publisher, subscriber := nats.StartNatsClient(c.Nats)
|
||||
|
||||
bridge := bridge.NewBridge(publisher, subscriber, c.Ws.Ctx, c.Ws)
|
||||
bridge := bridge.NewBridge(publisher, subscriber, c.Ws.Ctx, c.Ws, kv)
|
||||
bridge.StartBridge()
|
||||
|
||||
<-done
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ import (
|
|||
"github.com/OktopUSP/oktopus/backend/services/mtp/ws-adapter/internal/usp/usp_record"
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/nats-io/nats.go/jetstream"
|
||||
"golang.org/x/sys/unix"
|
||||
"google.golang.org/protobuf/proto"
|
||||
)
|
||||
|
|
@ -54,25 +55,26 @@ type Bridge struct {
|
|||
Ws config.Ws
|
||||
NewDeviceQueue map[string]string
|
||||
NewDevQMutex *sync.Mutex
|
||||
|
||||
Ctx context.Context
|
||||
kv jetstream.KeyValue
|
||||
Ctx context.Context
|
||||
}
|
||||
|
||||
func NewBridge(p Publisher, s Subscriber, ctx context.Context, w config.Ws) *Bridge {
|
||||
func NewBridge(p Publisher, s Subscriber, ctx context.Context, w config.Ws, kv jetstream.KeyValue) *Bridge {
|
||||
return &Bridge{
|
||||
Pub: p,
|
||||
Sub: s,
|
||||
Ws: w,
|
||||
Ctx: ctx,
|
||||
kv: kv,
|
||||
}
|
||||
}
|
||||
|
||||
func (b *Bridge) StartBridge() {
|
||||
|
||||
url := b.urlBuild()
|
||||
dialer := b.newDialer()
|
||||
go func(dialer websocket.Dialer) {
|
||||
go func() {
|
||||
for {
|
||||
url := b.urlBuild()
|
||||
dialer := b.newDialer()
|
||||
wc, _, err := dialer.Dial(url, nil)
|
||||
if err != nil {
|
||||
log.Printf("Error to connect to %s, err: %s", url, err)
|
||||
|
|
@ -128,7 +130,7 @@ func (b *Bridge) StartBridge() {
|
|||
}(wc)
|
||||
break
|
||||
}
|
||||
}(dialer)
|
||||
}()
|
||||
}
|
||||
|
||||
func (b *Bridge) subscribe(wc *websocket.Conn) {
|
||||
|
|
@ -230,8 +232,10 @@ func (b *Bridge) urlBuild() string {
|
|||
|
||||
wsUrl := prefix + b.Ws.Addr + b.Ws.Port + b.Ws.Route
|
||||
|
||||
token, _ := b.kv.Get(b.Ctx, "oktopusController")
|
||||
|
||||
if b.Ws.AuthEnable {
|
||||
wsUrl = wsUrl + "?token=" + b.Ws.Token
|
||||
wsUrl = wsUrl + "?token=" + string(token.Value())
|
||||
}
|
||||
|
||||
return wsUrl
|
||||
|
|
|
|||
|
|
@ -20,7 +20,6 @@ type Nats struct {
|
|||
}
|
||||
|
||||
type Ws struct {
|
||||
Token string
|
||||
AuthEnable bool
|
||||
Addr string
|
||||
Port string
|
||||
|
|
@ -42,7 +41,6 @@ func NewConfig() *Config {
|
|||
natsUrl := flag.String("nats_url", lookupEnvOrString("NATS_URL", "nats://localhost:4222"), "url for nats server")
|
||||
natsName := flag.String("nats_name", lookupEnvOrString("NATS_NAME", "ws-adapter"), "name for nats client")
|
||||
natsVerifyCertificates := flag.Bool("nats_verify_certificates", lookupEnvOrBool("NATS_VERIFY_CERTIFICATES", false), "verify validity of certificates from nats server")
|
||||
wsToken := flag.String("ws_token", lookupEnvOrString("WS_TOKEN", ""), "websocket server auth token (if authentication is enabled)")
|
||||
wsAuthEnable := flag.Bool("ws_auth_enable", lookupEnvOrBool("WS_AUTH_ENABLE", false), "enable authentication for websocket server")
|
||||
wsAddr := flag.String("ws_addr", lookupEnvOrString("WS_ADDR", "localhost"), "websocket server address (domain or ip)")
|
||||
wsPort := flag.String("ws_port", lookupEnvOrString("WS_PORT", ":8080"), "websocket server port")
|
||||
|
|
@ -68,7 +66,6 @@ func NewConfig() *Config {
|
|||
Ctx: ctx,
|
||||
},
|
||||
Ws: Ws{
|
||||
Token: *wsToken,
|
||||
AuthEnable: *wsAuthEnable,
|
||||
Addr: *wsAddr,
|
||||
Port: *wsPort,
|
||||
|
|
|
|||
|
|
@ -10,11 +10,13 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
STREAM_NAME = "ws"
|
||||
STREAM_NAME = "ws"
|
||||
BUCKET_NAME = "devices-auth"
|
||||
BUCKET_DESCRIPTION = "Devices authentication"
|
||||
)
|
||||
|
||||
func StartNatsClient(c config.Nats) (
|
||||
*nats.Conn,
|
||||
jetstream.KeyValue,
|
||||
func(string, []byte) error,
|
||||
func(string, func(*nats.Msg)) error,
|
||||
) {
|
||||
|
|
@ -43,7 +45,15 @@ func StartNatsClient(c config.Nats) (
|
|||
log.Fatalf("Failed to create JetStream client: %v", err)
|
||||
}
|
||||
|
||||
return nc, publisher(js), subscriber(nc)
|
||||
kv, err := js.CreateOrUpdateKeyValue(c.Ctx, jetstream.KeyValueConfig{
|
||||
Bucket: BUCKET_NAME,
|
||||
Description: BUCKET_DESCRIPTION,
|
||||
})
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to create KeyValue store: %v", err)
|
||||
}
|
||||
|
||||
return kv, publisher(js), subscriber(nc)
|
||||
}
|
||||
|
||||
func subscriber(nc *nats.Conn) func(string, func(*nats.Msg)) error {
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user