Merge pull request #235 from OktopUSP/dev
Redis remove + Dynamic adapters auth + Users crud
This commit is contained in:
commit
555cb4fb09
|
|
@ -53,7 +53,7 @@ This repository aims to promote the development of a multi-vendor management pla
|
||||||
|
|
||||||
<ul><li><h4>Infrastructure:</h4></li></ul>
|
<ul><li><h4>Infrastructure:</h4></li></ul>
|
||||||
|
|
||||||

|

|
||||||
|
|
||||||
<ul>
|
<ul>
|
||||||
<li>
|
<li>
|
||||||
|
|
|
||||||
|
|
@ -50,6 +50,9 @@ func (a *Api) StartApi() {
|
||||||
authentication := r.PathPrefix("/api/auth").Subrouter()
|
authentication := r.PathPrefix("/api/auth").Subrouter()
|
||||||
authentication.HandleFunc("/login", a.generateToken).Methods("PUT")
|
authentication.HandleFunc("/login", a.generateToken).Methods("PUT")
|
||||||
authentication.HandleFunc("/register", a.registerUser).Methods("POST")
|
authentication.HandleFunc("/register", a.registerUser).Methods("POST")
|
||||||
|
authentication.HandleFunc("/delete/{user}", a.deleteUser).Methods("DELETE")
|
||||||
|
authentication.HandleFunc("/password/{user}", a.changePassword).Methods("PUT")
|
||||||
|
authentication.HandleFunc("/password", a.changePassword).Methods("PUT")
|
||||||
authentication.HandleFunc("/admin/register", a.registerAdminUser).Methods("POST")
|
authentication.HandleFunc("/admin/register", a.registerAdminUser).Methods("POST")
|
||||||
authentication.HandleFunc("/admin/exists", a.adminUserExists).Methods("GET")
|
authentication.HandleFunc("/admin/exists", a.adminUserExists).Methods("GET")
|
||||||
iot := r.PathPrefix("/api/device").Subrouter()
|
iot := r.PathPrefix("/api/device").Subrouter()
|
||||||
|
|
|
||||||
|
|
@ -111,6 +111,18 @@ type DeviceAuth struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *Api) deviceAuth(w http.ResponseWriter, r *http.Request) {
|
func (a *Api) deviceAuth(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|
||||||
|
user, err := a.db.FindUser(r.Context().Value("email").(string))
|
||||||
|
if err != nil {
|
||||||
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
utils.MarshallEncoder(err, w)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if user.Level != AdminUser {
|
||||||
|
w.WriteHeader(http.StatusForbidden)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
if r.Method == http.MethodGet {
|
if r.Method == http.MethodGet {
|
||||||
|
|
||||||
id := r.URL.Query().Get("id")
|
id := r.URL.Query().Get("id")
|
||||||
|
|
|
||||||
|
|
@ -26,7 +26,6 @@ type GeneralInfo struct {
|
||||||
VendorsCount []entity.VendorsCount
|
VendorsCount []entity.VendorsCount
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: fix when mqtt broker is not set don't break api
|
|
||||||
func (a *Api) generalInfo(w http.ResponseWriter, r *http.Request) {
|
func (a *Api) generalInfo(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|
||||||
var result GeneralInfo
|
var result GeneralInfo
|
||||||
|
|
|
||||||
|
|
@ -5,8 +5,10 @@ import (
|
||||||
"log"
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
|
"github.com/gorilla/mux"
|
||||||
"github.com/leandrofars/oktopus/internal/api/auth"
|
"github.com/leandrofars/oktopus/internal/api/auth"
|
||||||
"github.com/leandrofars/oktopus/internal/db"
|
"github.com/leandrofars/oktopus/internal/db"
|
||||||
|
"github.com/leandrofars/oktopus/internal/utils"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (a *Api) retrieveUsers(w http.ResponseWriter, r *http.Request) {
|
func (a *Api) retrieveUsers(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|
@ -68,6 +70,81 @@ func (a *Api) registerUser(w http.ResponseWriter, r *http.Request) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *Api) deleteUser(w http.ResponseWriter, r *http.Request) {
|
||||||
|
tokenString := r.Header.Get("Authorization")
|
||||||
|
if tokenString == "" {
|
||||||
|
w.WriteHeader(http.StatusUnauthorized)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
email, err := auth.ValidateToken(tokenString)
|
||||||
|
if err != nil {
|
||||||
|
w.WriteHeader(http.StatusUnauthorized)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
//Check if user which is requesting deletion has the necessary privileges
|
||||||
|
rUser, err := a.db.FindUser(email)
|
||||||
|
if rUser.Level != AdminUser {
|
||||||
|
w.WriteHeader(http.StatusForbidden)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
userEmail := mux.Vars(r)["user"]
|
||||||
|
if userEmail == email {
|
||||||
|
w.WriteHeader(http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := a.db.DeleteUser(userEmail); err != nil {
|
||||||
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
json.NewEncoder(w).Encode(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *Api) changePassword(w http.ResponseWriter, r *http.Request) {
|
||||||
|
tokenString := r.Header.Get("Authorization")
|
||||||
|
if tokenString == "" {
|
||||||
|
w.WriteHeader(http.StatusUnauthorized)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
email, err := auth.ValidateToken(tokenString)
|
||||||
|
if err != nil {
|
||||||
|
w.WriteHeader(http.StatusUnauthorized)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
userToChangePasswd := mux.Vars(r)["user"]
|
||||||
|
if userToChangePasswd != "" && userToChangePasswd != email {
|
||||||
|
rUser, _ := a.db.FindUser(email)
|
||||||
|
if rUser.Level != AdminUser {
|
||||||
|
w.WriteHeader(http.StatusForbidden)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
email = userToChangePasswd
|
||||||
|
}
|
||||||
|
|
||||||
|
var user db.User
|
||||||
|
err = json.NewDecoder(r.Body).Decode(&user)
|
||||||
|
if err != nil {
|
||||||
|
w.WriteHeader(http.StatusBadRequest)
|
||||||
|
utils.MarshallEncoder(err, w)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
user.Email = email
|
||||||
|
|
||||||
|
if err := user.HashPassword(user.Password); err != nil {
|
||||||
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := a.db.UpdatePassword(user); err != nil {
|
||||||
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
func (a *Api) registerAdminUser(w http.ResponseWriter, r *http.Request) {
|
func (a *Api) registerAdminUser(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|
||||||
var user db.User
|
var user db.User
|
||||||
|
|
|
||||||
|
|
@ -27,6 +27,11 @@ func (d *Database) RegisterUser(user User) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d *Database) UpdatePassword(user User) error {
|
||||||
|
_, err := d.users.UpdateOne(d.ctx, bson.D{{"email", user.Email}}, bson.D{{"$set", bson.D{{"password", user.Password}}}})
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
func (d *Database) FindAllUsers() ([]map[string]interface{}, error) {
|
func (d *Database) FindAllUsers() ([]map[string]interface{}, error) {
|
||||||
var result []map[string]interface{}
|
var result []map[string]interface{}
|
||||||
cursor, err := d.users.Find(d.ctx, bson.D{{}})
|
cursor, err := d.users.Find(d.ctx, bson.D{{}})
|
||||||
|
|
@ -45,6 +50,11 @@ func (d *Database) FindUser(email string) (User, error) {
|
||||||
return result, err
|
return result, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d *Database) DeleteUser(email string) error {
|
||||||
|
_, err := d.users.DeleteOne(d.ctx, bson.D{{"email", email}})
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
func (user *User) HashPassword(password string) error {
|
func (user *User) HashPassword(password string) error {
|
||||||
bytes, err := bcrypt.GenerateFromPassword([]byte(password), 14)
|
bytes, err := bcrypt.GenerateFromPassword([]byte(password), 14)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
||||||
|
|
@ -18,9 +18,9 @@ func main() {
|
||||||
|
|
||||||
c := config.NewConfig()
|
c := config.NewConfig()
|
||||||
|
|
||||||
_, publisher, subscriber := nats.StartNatsClient(c.Nats)
|
kv, publisher, subscriber := nats.StartNatsClient(c.Nats)
|
||||||
|
|
||||||
bridge := bridge.NewBridge(publisher, subscriber, c.Mqtt.Ctx, c.Mqtt)
|
bridge := bridge.NewBridge(publisher, subscriber, c.Mqtt.Ctx, c.Mqtt, kv)
|
||||||
bridge.StartBridge()
|
bridge.StartBridge()
|
||||||
|
|
||||||
<-done
|
<-done
|
||||||
|
|
|
||||||
|
|
@ -14,6 +14,7 @@ import (
|
||||||
"github.com/eclipse/paho.golang/paho"
|
"github.com/eclipse/paho.golang/paho"
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
"github.com/nats-io/nats.go"
|
"github.com/nats-io/nats.go"
|
||||||
|
"github.com/nats-io/nats.go/jetstream"
|
||||||
"golang.org/x/sys/unix"
|
"golang.org/x/sys/unix"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -41,15 +42,17 @@ type Bridge struct {
|
||||||
Pub Publisher
|
Pub Publisher
|
||||||
Sub Subscriber
|
Sub Subscriber
|
||||||
Mqtt config.Mqtt
|
Mqtt config.Mqtt
|
||||||
|
kv jetstream.KeyValue
|
||||||
Ctx context.Context
|
Ctx context.Context
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewBridge(p Publisher, s Subscriber, ctx context.Context, m config.Mqtt) *Bridge {
|
func NewBridge(p Publisher, s Subscriber, ctx context.Context, m config.Mqtt, kv jetstream.KeyValue) *Bridge {
|
||||||
return &Bridge{
|
return &Bridge{
|
||||||
Pub: p,
|
Pub: p,
|
||||||
Sub: s,
|
Sub: s,
|
||||||
Mqtt: m,
|
Mqtt: m,
|
||||||
Ctx: ctx,
|
Ctx: ctx,
|
||||||
|
kv: kv,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -82,6 +85,7 @@ func (b *Bridge) StartBridge() {
|
||||||
ClientConfig: *pahoClientConfig,
|
ClientConfig: *pahoClientConfig,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
b.setMqttPassword()
|
||||||
if b.Mqtt.Username != "" && b.Mqtt.Password != "" {
|
if b.Mqtt.Username != "" && b.Mqtt.Password != "" {
|
||||||
autopahoClientConfig.SetUsernamePassword(b.Mqtt.Username, []byte(b.Mqtt.Password))
|
autopahoClientConfig.SetUsernamePassword(b.Mqtt.Username, []byte(b.Mqtt.Password))
|
||||||
}
|
}
|
||||||
|
|
@ -273,3 +277,8 @@ func tcpInfo(conn *net.TCPConn) (*unix.TCPInfo, error) {
|
||||||
}
|
}
|
||||||
return info, nil
|
return info, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *Bridge) setMqttPassword() {
|
||||||
|
entry, _ := b.kv.Get(b.Ctx, b.Mqtt.Username)
|
||||||
|
b.Mqtt.Password = string(entry.Value())
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -44,7 +44,6 @@ func NewConfig() *Config {
|
||||||
mqttUrl := flag.String("mqtt_url", lookupEnvOrString("MQTT_URL", "tcp://localhost:1883"), "url for mqtt server")
|
mqttUrl := flag.String("mqtt_url", lookupEnvOrString("MQTT_URL", "tcp://localhost:1883"), "url for mqtt server")
|
||||||
mqttClientId := flag.String("mqtt_client_id", lookupEnvOrString("MQTT_CLIENT_ID", "mqtt-adapter"), "client id for mqtt")
|
mqttClientId := flag.String("mqtt_client_id", lookupEnvOrString("MQTT_CLIENT_ID", "mqtt-adapter"), "client id for mqtt")
|
||||||
mqttUsername := flag.String("mqtt_username", lookupEnvOrString("MQTT_USERNAME", "oktopusController"), "username for mqtt")
|
mqttUsername := flag.String("mqtt_username", lookupEnvOrString("MQTT_USERNAME", "oktopusController"), "username for mqtt")
|
||||||
mqttPassword := flag.String("mqtt_password", lookupEnvOrString("MQTT_PASSWORD", ""), "password for mqtt")
|
|
||||||
mqttQos := flag.Int("mqtt_qos", lookupEnvOrInt("MQTT_QOS", 1), "quality of service for mqtt")
|
mqttQos := flag.Int("mqtt_qos", lookupEnvOrInt("MQTT_QOS", 1), "quality of service for mqtt")
|
||||||
flHelp := flag.Bool("help", false, "Help")
|
flHelp := flag.Bool("help", false, "Help")
|
||||||
|
|
||||||
|
|
@ -75,7 +74,6 @@ func NewConfig() *Config {
|
||||||
Url: *mqttUrl,
|
Url: *mqttUrl,
|
||||||
ClientId: *mqttClientId,
|
ClientId: *mqttClientId,
|
||||||
Username: *mqttUsername,
|
Username: *mqttUsername,
|
||||||
Password: *mqttPassword,
|
|
||||||
Ctx: ctx,
|
Ctx: ctx,
|
||||||
Qos: *mqttQos,
|
Qos: *mqttQos,
|
||||||
},
|
},
|
||||||
|
|
|
||||||
|
|
@ -12,10 +12,12 @@ import (
|
||||||
|
|
||||||
const (
|
const (
|
||||||
STREAM_NAME = "mqtt"
|
STREAM_NAME = "mqtt"
|
||||||
|
BUCKET_NAME = "devices-auth"
|
||||||
|
BUCKET_DESCRIPTION = "Devices authentication"
|
||||||
)
|
)
|
||||||
|
|
||||||
func StartNatsClient(c config.Nats) (
|
func StartNatsClient(c config.Nats) (
|
||||||
*nats.Conn,
|
jetstream.KeyValue,
|
||||||
func(string, []byte) error,
|
func(string, []byte) error,
|
||||||
func(string, func(*nats.Msg)) error,
|
func(string, func(*nats.Msg)) error,
|
||||||
) {
|
) {
|
||||||
|
|
@ -44,7 +46,15 @@ func StartNatsClient(c config.Nats) (
|
||||||
log.Fatalf("Failed to create JetStream client: %v", err)
|
log.Fatalf("Failed to create JetStream client: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nc, publisher(js), subscriber(nc)
|
kv, err := js.CreateOrUpdateKeyValue(c.Ctx, jetstream.KeyValueConfig{
|
||||||
|
Bucket: BUCKET_NAME,
|
||||||
|
Description: BUCKET_DESCRIPTION,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to create KeyValue store: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return kv, publisher(js), subscriber(nc)
|
||||||
}
|
}
|
||||||
|
|
||||||
func subscriber(nc *nats.Conn) func(string, func(*nats.Msg)) error {
|
func subscriber(nc *nats.Conn) func(string, func(*nats.Msg)) error {
|
||||||
|
|
|
||||||
|
|
@ -18,9 +18,9 @@ func main() {
|
||||||
|
|
||||||
c := config.NewConfig()
|
c := config.NewConfig()
|
||||||
|
|
||||||
_, publisher, subscriber := nats.StartNatsClient(c.Nats)
|
kv, publisher, subscriber := nats.StartNatsClient(c.Nats)
|
||||||
|
|
||||||
bridge := bridge.NewBridge(publisher, subscriber, c.Ws.Ctx, c.Ws)
|
bridge := bridge.NewBridge(publisher, subscriber, c.Ws.Ctx, c.Ws, kv)
|
||||||
bridge.StartBridge()
|
bridge.StartBridge()
|
||||||
|
|
||||||
<-done
|
<-done
|
||||||
|
|
|
||||||
|
|
@ -17,6 +17,7 @@ import (
|
||||||
"github.com/OktopUSP/oktopus/backend/services/mtp/ws-adapter/internal/usp/usp_record"
|
"github.com/OktopUSP/oktopus/backend/services/mtp/ws-adapter/internal/usp/usp_record"
|
||||||
"github.com/gorilla/websocket"
|
"github.com/gorilla/websocket"
|
||||||
"github.com/nats-io/nats.go"
|
"github.com/nats-io/nats.go"
|
||||||
|
"github.com/nats-io/nats.go/jetstream"
|
||||||
"golang.org/x/sys/unix"
|
"golang.org/x/sys/unix"
|
||||||
"google.golang.org/protobuf/proto"
|
"google.golang.org/protobuf/proto"
|
||||||
)
|
)
|
||||||
|
|
@ -54,25 +55,26 @@ type Bridge struct {
|
||||||
Ws config.Ws
|
Ws config.Ws
|
||||||
NewDeviceQueue map[string]string
|
NewDeviceQueue map[string]string
|
||||||
NewDevQMutex *sync.Mutex
|
NewDevQMutex *sync.Mutex
|
||||||
|
kv jetstream.KeyValue
|
||||||
Ctx context.Context
|
Ctx context.Context
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewBridge(p Publisher, s Subscriber, ctx context.Context, w config.Ws) *Bridge {
|
func NewBridge(p Publisher, s Subscriber, ctx context.Context, w config.Ws, kv jetstream.KeyValue) *Bridge {
|
||||||
return &Bridge{
|
return &Bridge{
|
||||||
Pub: p,
|
Pub: p,
|
||||||
Sub: s,
|
Sub: s,
|
||||||
Ws: w,
|
Ws: w,
|
||||||
Ctx: ctx,
|
Ctx: ctx,
|
||||||
|
kv: kv,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *Bridge) StartBridge() {
|
func (b *Bridge) StartBridge() {
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
url := b.urlBuild()
|
url := b.urlBuild()
|
||||||
dialer := b.newDialer()
|
dialer := b.newDialer()
|
||||||
go func(dialer websocket.Dialer) {
|
|
||||||
for {
|
|
||||||
wc, _, err := dialer.Dial(url, nil)
|
wc, _, err := dialer.Dial(url, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("Error to connect to %s, err: %s", url, err)
|
log.Printf("Error to connect to %s, err: %s", url, err)
|
||||||
|
|
@ -128,7 +130,7 @@ func (b *Bridge) StartBridge() {
|
||||||
}(wc)
|
}(wc)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}(dialer)
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *Bridge) subscribe(wc *websocket.Conn) {
|
func (b *Bridge) subscribe(wc *websocket.Conn) {
|
||||||
|
|
@ -230,8 +232,10 @@ func (b *Bridge) urlBuild() string {
|
||||||
|
|
||||||
wsUrl := prefix + b.Ws.Addr + b.Ws.Port + b.Ws.Route
|
wsUrl := prefix + b.Ws.Addr + b.Ws.Port + b.Ws.Route
|
||||||
|
|
||||||
|
token, _ := b.kv.Get(b.Ctx, "oktopusController")
|
||||||
|
|
||||||
if b.Ws.AuthEnable {
|
if b.Ws.AuthEnable {
|
||||||
wsUrl = wsUrl + "?token=" + b.Ws.Token
|
wsUrl = wsUrl + "?token=" + string(token.Value())
|
||||||
}
|
}
|
||||||
|
|
||||||
return wsUrl
|
return wsUrl
|
||||||
|
|
|
||||||
|
|
@ -20,7 +20,6 @@ type Nats struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type Ws struct {
|
type Ws struct {
|
||||||
Token string
|
|
||||||
AuthEnable bool
|
AuthEnable bool
|
||||||
Addr string
|
Addr string
|
||||||
Port string
|
Port string
|
||||||
|
|
@ -42,7 +41,6 @@ func NewConfig() *Config {
|
||||||
natsUrl := flag.String("nats_url", lookupEnvOrString("NATS_URL", "nats://localhost:4222"), "url for nats server")
|
natsUrl := flag.String("nats_url", lookupEnvOrString("NATS_URL", "nats://localhost:4222"), "url for nats server")
|
||||||
natsName := flag.String("nats_name", lookupEnvOrString("NATS_NAME", "ws-adapter"), "name for nats client")
|
natsName := flag.String("nats_name", lookupEnvOrString("NATS_NAME", "ws-adapter"), "name for nats client")
|
||||||
natsVerifyCertificates := flag.Bool("nats_verify_certificates", lookupEnvOrBool("NATS_VERIFY_CERTIFICATES", false), "verify validity of certificates from nats server")
|
natsVerifyCertificates := flag.Bool("nats_verify_certificates", lookupEnvOrBool("NATS_VERIFY_CERTIFICATES", false), "verify validity of certificates from nats server")
|
||||||
wsToken := flag.String("ws_token", lookupEnvOrString("WS_TOKEN", ""), "websocket server auth token (if authentication is enabled)")
|
|
||||||
wsAuthEnable := flag.Bool("ws_auth_enable", lookupEnvOrBool("WS_AUTH_ENABLE", false), "enable authentication for websocket server")
|
wsAuthEnable := flag.Bool("ws_auth_enable", lookupEnvOrBool("WS_AUTH_ENABLE", false), "enable authentication for websocket server")
|
||||||
wsAddr := flag.String("ws_addr", lookupEnvOrString("WS_ADDR", "localhost"), "websocket server address (domain or ip)")
|
wsAddr := flag.String("ws_addr", lookupEnvOrString("WS_ADDR", "localhost"), "websocket server address (domain or ip)")
|
||||||
wsPort := flag.String("ws_port", lookupEnvOrString("WS_PORT", ":8080"), "websocket server port")
|
wsPort := flag.String("ws_port", lookupEnvOrString("WS_PORT", ":8080"), "websocket server port")
|
||||||
|
|
@ -68,7 +66,6 @@ func NewConfig() *Config {
|
||||||
Ctx: ctx,
|
Ctx: ctx,
|
||||||
},
|
},
|
||||||
Ws: Ws{
|
Ws: Ws{
|
||||||
Token: *wsToken,
|
|
||||||
AuthEnable: *wsAuthEnable,
|
AuthEnable: *wsAuthEnable,
|
||||||
Addr: *wsAddr,
|
Addr: *wsAddr,
|
||||||
Port: *wsPort,
|
Port: *wsPort,
|
||||||
|
|
|
||||||
|
|
@ -11,10 +11,12 @@ import (
|
||||||
|
|
||||||
const (
|
const (
|
||||||
STREAM_NAME = "ws"
|
STREAM_NAME = "ws"
|
||||||
|
BUCKET_NAME = "devices-auth"
|
||||||
|
BUCKET_DESCRIPTION = "Devices authentication"
|
||||||
)
|
)
|
||||||
|
|
||||||
func StartNatsClient(c config.Nats) (
|
func StartNatsClient(c config.Nats) (
|
||||||
*nats.Conn,
|
jetstream.KeyValue,
|
||||||
func(string, []byte) error,
|
func(string, []byte) error,
|
||||||
func(string, func(*nats.Msg)) error,
|
func(string, func(*nats.Msg)) error,
|
||||||
) {
|
) {
|
||||||
|
|
@ -43,7 +45,15 @@ func StartNatsClient(c config.Nats) (
|
||||||
log.Fatalf("Failed to create JetStream client: %v", err)
|
log.Fatalf("Failed to create JetStream client: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nc, publisher(js), subscriber(nc)
|
kv, err := js.CreateOrUpdateKeyValue(c.Ctx, jetstream.KeyValueConfig{
|
||||||
|
Bucket: BUCKET_NAME,
|
||||||
|
Description: BUCKET_DESCRIPTION,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to create KeyValue store: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return kv, publisher(js), subscriber(nc)
|
||||||
}
|
}
|
||||||
|
|
||||||
func subscriber(nc *nats.Conn) func(string, func(*nats.Msg)) error {
|
func subscriber(nc *nats.Conn) func(string, func(*nats.Msg)) error {
|
||||||
|
|
|
||||||
|
|
@ -1,2 +1,2 @@
|
||||||
REDIS_ENABLE=true
|
REDIS_ENABLE=false
|
||||||
REDIS_ADDR=redis_usp:6379
|
REDIS_ADDR=redis_usp:6379
|
||||||
|
|
@ -43,15 +43,6 @@ services:
|
||||||
- ./mongo_data:/data/db
|
- ./mongo_data:/data/db
|
||||||
profiles: [controller,adapter]
|
profiles: [controller,adapter]
|
||||||
|
|
||||||
redis:
|
|
||||||
image: redis
|
|
||||||
container_name: redis_usp
|
|
||||||
ports:
|
|
||||||
- 6379:6379
|
|
||||||
networks:
|
|
||||||
usp_network:
|
|
||||||
ipv4_address: 172.16.235.5
|
|
||||||
profiles: [mqtt]
|
|
||||||
#/* -------------------------------------------------------------------------- */
|
#/* -------------------------------------------------------------------------- */
|
||||||
|
|
||||||
#/* ----------------------- Message Transfer Protocols ----------------------- */
|
#/* ----------------------- Message Transfer Protocols ----------------------- */
|
||||||
|
|
@ -61,8 +52,6 @@ services:
|
||||||
ports:
|
ports:
|
||||||
- 1883:1883
|
- 1883:1883
|
||||||
- 8883:8883
|
- 8883:8883
|
||||||
depends_on:
|
|
||||||
- redis
|
|
||||||
env_file:
|
env_file:
|
||||||
- .env.mqtt
|
- .env.mqtt
|
||||||
networks:
|
networks:
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user