diff --git a/README.md b/README.md
index 5611dce..06c1202 100644
--- a/README.md
+++ b/README.md
@@ -53,7 +53,7 @@ This repository aims to promote the development of a multi-vendor management pla
-
+
-
diff --git a/backend/services/controller/internal/api/api.go b/backend/services/controller/internal/api/api.go
index 46ad272..33c25fe 100644
--- a/backend/services/controller/internal/api/api.go
+++ b/backend/services/controller/internal/api/api.go
@@ -50,6 +50,9 @@ func (a *Api) StartApi() {
authentication := r.PathPrefix("/api/auth").Subrouter()
authentication.HandleFunc("/login", a.generateToken).Methods("PUT")
authentication.HandleFunc("/register", a.registerUser).Methods("POST")
+ authentication.HandleFunc("/delete/{user}", a.deleteUser).Methods("DELETE")
+ authentication.HandleFunc("/password/{user}", a.changePassword).Methods("PUT")
+ authentication.HandleFunc("/password", a.changePassword).Methods("PUT")
authentication.HandleFunc("/admin/register", a.registerAdminUser).Methods("POST")
authentication.HandleFunc("/admin/exists", a.adminUserExists).Methods("GET")
iot := r.PathPrefix("/api/device").Subrouter()
diff --git a/backend/services/controller/internal/api/device.go b/backend/services/controller/internal/api/device.go
index f5669e8..8a96644 100644
--- a/backend/services/controller/internal/api/device.go
+++ b/backend/services/controller/internal/api/device.go
@@ -111,6 +111,18 @@ type DeviceAuth struct {
}
func (a *Api) deviceAuth(w http.ResponseWriter, r *http.Request) {
+
+ user, err := a.db.FindUser(r.Context().Value("email").(string))
+ if err != nil {
+ w.WriteHeader(http.StatusInternalServerError)
+ utils.MarshallEncoder(err, w)
+ return
+ }
+ if user.Level != AdminUser {
+ w.WriteHeader(http.StatusForbidden)
+ return
+ }
+
if r.Method == http.MethodGet {
id := r.URL.Query().Get("id")
diff --git a/backend/services/controller/internal/api/info.go b/backend/services/controller/internal/api/info.go
index 7610985..de15b14 100644
--- a/backend/services/controller/internal/api/info.go
+++ b/backend/services/controller/internal/api/info.go
@@ -26,7 +26,6 @@ type GeneralInfo struct {
VendorsCount []entity.VendorsCount
}
-// TODO: fix when mqtt broker is not set don't break api
func (a *Api) generalInfo(w http.ResponseWriter, r *http.Request) {
var result GeneralInfo
diff --git a/backend/services/controller/internal/api/user.go b/backend/services/controller/internal/api/user.go
index 59891e3..6800d40 100644
--- a/backend/services/controller/internal/api/user.go
+++ b/backend/services/controller/internal/api/user.go
@@ -5,8 +5,10 @@ import (
"log"
"net/http"
+ "github.com/gorilla/mux"
"github.com/leandrofars/oktopus/internal/api/auth"
"github.com/leandrofars/oktopus/internal/db"
+ "github.com/leandrofars/oktopus/internal/utils"
)
func (a *Api) retrieveUsers(w http.ResponseWriter, r *http.Request) {
@@ -68,6 +70,81 @@ func (a *Api) registerUser(w http.ResponseWriter, r *http.Request) {
}
}
+func (a *Api) deleteUser(w http.ResponseWriter, r *http.Request) {
+ tokenString := r.Header.Get("Authorization")
+ if tokenString == "" {
+ w.WriteHeader(http.StatusUnauthorized)
+ return
+ }
+ email, err := auth.ValidateToken(tokenString)
+ if err != nil {
+ w.WriteHeader(http.StatusUnauthorized)
+ return
+ }
+
+ //Check if user which is requesting deletion has the necessary privileges
+ rUser, err := a.db.FindUser(email)
+ if rUser.Level != AdminUser {
+ w.WriteHeader(http.StatusForbidden)
+ return
+ }
+
+ userEmail := mux.Vars(r)["user"]
+ if userEmail == email {
+ w.WriteHeader(http.StatusBadRequest)
+ return
+ }
+
+ if err := a.db.DeleteUser(userEmail); err != nil {
+ w.WriteHeader(http.StatusInternalServerError)
+ json.NewEncoder(w).Encode(err)
+ return
+ }
+}
+
+func (a *Api) changePassword(w http.ResponseWriter, r *http.Request) {
+ tokenString := r.Header.Get("Authorization")
+ if tokenString == "" {
+ w.WriteHeader(http.StatusUnauthorized)
+ return
+ }
+ email, err := auth.ValidateToken(tokenString)
+ if err != nil {
+ w.WriteHeader(http.StatusUnauthorized)
+ return
+ }
+
+ userToChangePasswd := mux.Vars(r)["user"]
+ if userToChangePasswd != "" && userToChangePasswd != email {
+ rUser, _ := a.db.FindUser(email)
+ if rUser.Level != AdminUser {
+ w.WriteHeader(http.StatusForbidden)
+ return
+ }
+ email = userToChangePasswd
+ }
+
+ var user db.User
+ err = json.NewDecoder(r.Body).Decode(&user)
+ if err != nil {
+ w.WriteHeader(http.StatusBadRequest)
+ utils.MarshallEncoder(err, w)
+ return
+ }
+ user.Email = email
+
+ if err := user.HashPassword(user.Password); err != nil {
+ w.WriteHeader(http.StatusInternalServerError)
+ return
+ }
+
+ if err := a.db.UpdatePassword(user); err != nil {
+ w.WriteHeader(http.StatusInternalServerError)
+ return
+ }
+
+}
+
func (a *Api) registerAdminUser(w http.ResponseWriter, r *http.Request) {
var user db.User
diff --git a/backend/services/controller/internal/db/user.go b/backend/services/controller/internal/db/user.go
index 3f251d1..af0950f 100644
--- a/backend/services/controller/internal/db/user.go
+++ b/backend/services/controller/internal/db/user.go
@@ -27,6 +27,11 @@ func (d *Database) RegisterUser(user User) error {
return err
}
+func (d *Database) UpdatePassword(user User) error {
+ _, err := d.users.UpdateOne(d.ctx, bson.D{{"email", user.Email}}, bson.D{{"$set", bson.D{{"password", user.Password}}}})
+ return err
+}
+
func (d *Database) FindAllUsers() ([]map[string]interface{}, error) {
var result []map[string]interface{}
cursor, err := d.users.Find(d.ctx, bson.D{{}})
@@ -45,6 +50,11 @@ func (d *Database) FindUser(email string) (User, error) {
return result, err
}
+func (d *Database) DeleteUser(email string) error {
+ _, err := d.users.DeleteOne(d.ctx, bson.D{{"email", email}})
+ return err
+}
+
func (user *User) HashPassword(password string) error {
bytes, err := bcrypt.GenerateFromPassword([]byte(password), 14)
if err != nil {
diff --git a/backend/services/mtp/mqtt-adapter/cmd/mqtt-adapter/main.go b/backend/services/mtp/mqtt-adapter/cmd/mqtt-adapter/main.go
index 961d48e..eab14f6 100644
--- a/backend/services/mtp/mqtt-adapter/cmd/mqtt-adapter/main.go
+++ b/backend/services/mtp/mqtt-adapter/cmd/mqtt-adapter/main.go
@@ -18,9 +18,9 @@ func main() {
c := config.NewConfig()
- _, publisher, subscriber := nats.StartNatsClient(c.Nats)
+ kv, publisher, subscriber := nats.StartNatsClient(c.Nats)
- bridge := bridge.NewBridge(publisher, subscriber, c.Mqtt.Ctx, c.Mqtt)
+ bridge := bridge.NewBridge(publisher, subscriber, c.Mqtt.Ctx, c.Mqtt, kv)
bridge.StartBridge()
<-done
diff --git a/backend/services/mtp/mqtt-adapter/internal/bridge/bridge.go b/backend/services/mtp/mqtt-adapter/internal/bridge/bridge.go
index ecb68d2..0a65158 100644
--- a/backend/services/mtp/mqtt-adapter/internal/bridge/bridge.go
+++ b/backend/services/mtp/mqtt-adapter/internal/bridge/bridge.go
@@ -14,6 +14,7 @@ import (
"github.com/eclipse/paho.golang/paho"
"github.com/google/uuid"
"github.com/nats-io/nats.go"
+ "github.com/nats-io/nats.go/jetstream"
"golang.org/x/sys/unix"
)
@@ -41,15 +42,17 @@ type Bridge struct {
Pub Publisher
Sub Subscriber
Mqtt config.Mqtt
+ kv jetstream.KeyValue
Ctx context.Context
}
-func NewBridge(p Publisher, s Subscriber, ctx context.Context, m config.Mqtt) *Bridge {
+func NewBridge(p Publisher, s Subscriber, ctx context.Context, m config.Mqtt, kv jetstream.KeyValue) *Bridge {
return &Bridge{
Pub: p,
Sub: s,
Mqtt: m,
Ctx: ctx,
+ kv: kv,
}
}
@@ -82,6 +85,7 @@ func (b *Bridge) StartBridge() {
ClientConfig: *pahoClientConfig,
}
+ b.setMqttPassword()
if b.Mqtt.Username != "" && b.Mqtt.Password != "" {
autopahoClientConfig.SetUsernamePassword(b.Mqtt.Username, []byte(b.Mqtt.Password))
}
@@ -273,3 +277,8 @@ func tcpInfo(conn *net.TCPConn) (*unix.TCPInfo, error) {
}
return info, nil
}
+
+func (b *Bridge) setMqttPassword() {
+ entry, _ := b.kv.Get(b.Ctx, b.Mqtt.Username)
+ b.Mqtt.Password = string(entry.Value())
+}
diff --git a/backend/services/mtp/mqtt-adapter/internal/config/config.go b/backend/services/mtp/mqtt-adapter/internal/config/config.go
index 08b6ce7..a29f4dc 100644
--- a/backend/services/mtp/mqtt-adapter/internal/config/config.go
+++ b/backend/services/mtp/mqtt-adapter/internal/config/config.go
@@ -44,7 +44,6 @@ func NewConfig() *Config {
mqttUrl := flag.String("mqtt_url", lookupEnvOrString("MQTT_URL", "tcp://localhost:1883"), "url for mqtt server")
mqttClientId := flag.String("mqtt_client_id", lookupEnvOrString("MQTT_CLIENT_ID", "mqtt-adapter"), "client id for mqtt")
mqttUsername := flag.String("mqtt_username", lookupEnvOrString("MQTT_USERNAME", "oktopusController"), "username for mqtt")
- mqttPassword := flag.String("mqtt_password", lookupEnvOrString("MQTT_PASSWORD", ""), "password for mqtt")
mqttQos := flag.Int("mqtt_qos", lookupEnvOrInt("MQTT_QOS", 1), "quality of service for mqtt")
flHelp := flag.Bool("help", false, "Help")
@@ -75,7 +74,6 @@ func NewConfig() *Config {
Url: *mqttUrl,
ClientId: *mqttClientId,
Username: *mqttUsername,
- Password: *mqttPassword,
Ctx: ctx,
Qos: *mqttQos,
},
diff --git a/backend/services/mtp/mqtt-adapter/internal/nats/nats.go b/backend/services/mtp/mqtt-adapter/internal/nats/nats.go
index cb89574..7698d76 100644
--- a/backend/services/mtp/mqtt-adapter/internal/nats/nats.go
+++ b/backend/services/mtp/mqtt-adapter/internal/nats/nats.go
@@ -11,11 +11,13 @@ import (
)
const (
- STREAM_NAME = "mqtt"
+ STREAM_NAME = "mqtt"
+ BUCKET_NAME = "devices-auth"
+ BUCKET_DESCRIPTION = "Devices authentication"
)
func StartNatsClient(c config.Nats) (
- *nats.Conn,
+ jetstream.KeyValue,
func(string, []byte) error,
func(string, func(*nats.Msg)) error,
) {
@@ -27,7 +29,7 @@ func StartNatsClient(c config.Nats) (
opts := defineOptions(c)
- log.Printf("Connecting to NATS server %s", c.Url)
+ log.Printf("Connecting to NATS server %s", c.Url)
for {
nc, err = nats.Connect(c.Url, opts...)
@@ -44,7 +46,15 @@ func StartNatsClient(c config.Nats) (
log.Fatalf("Failed to create JetStream client: %v", err)
}
- return nc, publisher(js), subscriber(nc)
+ kv, err := js.CreateOrUpdateKeyValue(c.Ctx, jetstream.KeyValueConfig{
+ Bucket: BUCKET_NAME,
+ Description: BUCKET_DESCRIPTION,
+ })
+ if err != nil {
+ log.Fatalf("Failed to create KeyValue store: %v", err)
+ }
+
+ return kv, publisher(js), subscriber(nc)
}
func subscriber(nc *nats.Conn) func(string, func(*nats.Msg)) error {
diff --git a/backend/services/mtp/ws-adapter/cmd/ws-adapter/main.go b/backend/services/mtp/ws-adapter/cmd/ws-adapter/main.go
index 8c1570e..ba0c588 100644
--- a/backend/services/mtp/ws-adapter/cmd/ws-adapter/main.go
+++ b/backend/services/mtp/ws-adapter/cmd/ws-adapter/main.go
@@ -18,9 +18,9 @@ func main() {
c := config.NewConfig()
- _, publisher, subscriber := nats.StartNatsClient(c.Nats)
+ kv, publisher, subscriber := nats.StartNatsClient(c.Nats)
- bridge := bridge.NewBridge(publisher, subscriber, c.Ws.Ctx, c.Ws)
+ bridge := bridge.NewBridge(publisher, subscriber, c.Ws.Ctx, c.Ws, kv)
bridge.StartBridge()
<-done
diff --git a/backend/services/mtp/ws-adapter/internal/bridge/bridge.go b/backend/services/mtp/ws-adapter/internal/bridge/bridge.go
index 6687632..6d2f236 100644
--- a/backend/services/mtp/ws-adapter/internal/bridge/bridge.go
+++ b/backend/services/mtp/ws-adapter/internal/bridge/bridge.go
@@ -17,6 +17,7 @@ import (
"github.com/OktopUSP/oktopus/backend/services/mtp/ws-adapter/internal/usp/usp_record"
"github.com/gorilla/websocket"
"github.com/nats-io/nats.go"
+ "github.com/nats-io/nats.go/jetstream"
"golang.org/x/sys/unix"
"google.golang.org/protobuf/proto"
)
@@ -54,25 +55,26 @@ type Bridge struct {
Ws config.Ws
NewDeviceQueue map[string]string
NewDevQMutex *sync.Mutex
-
- Ctx context.Context
+ kv jetstream.KeyValue
+ Ctx context.Context
}
-func NewBridge(p Publisher, s Subscriber, ctx context.Context, w config.Ws) *Bridge {
+func NewBridge(p Publisher, s Subscriber, ctx context.Context, w config.Ws, kv jetstream.KeyValue) *Bridge {
return &Bridge{
Pub: p,
Sub: s,
Ws: w,
Ctx: ctx,
+ kv: kv,
}
}
func (b *Bridge) StartBridge() {
- url := b.urlBuild()
- dialer := b.newDialer()
- go func(dialer websocket.Dialer) {
+ go func() {
for {
+ url := b.urlBuild()
+ dialer := b.newDialer()
wc, _, err := dialer.Dial(url, nil)
if err != nil {
log.Printf("Error to connect to %s, err: %s", url, err)
@@ -128,7 +130,7 @@ func (b *Bridge) StartBridge() {
}(wc)
break
}
- }(dialer)
+ }()
}
func (b *Bridge) subscribe(wc *websocket.Conn) {
@@ -230,8 +232,10 @@ func (b *Bridge) urlBuild() string {
wsUrl := prefix + b.Ws.Addr + b.Ws.Port + b.Ws.Route
+ token, _ := b.kv.Get(b.Ctx, "oktopusController")
+
if b.Ws.AuthEnable {
- wsUrl = wsUrl + "?token=" + b.Ws.Token
+ wsUrl = wsUrl + "?token=" + string(token.Value())
}
return wsUrl
diff --git a/backend/services/mtp/ws-adapter/internal/config/config.go b/backend/services/mtp/ws-adapter/internal/config/config.go
index 5f683b2..eb4c026 100644
--- a/backend/services/mtp/ws-adapter/internal/config/config.go
+++ b/backend/services/mtp/ws-adapter/internal/config/config.go
@@ -20,7 +20,6 @@ type Nats struct {
}
type Ws struct {
- Token string
AuthEnable bool
Addr string
Port string
@@ -42,7 +41,6 @@ func NewConfig() *Config {
natsUrl := flag.String("nats_url", lookupEnvOrString("NATS_URL", "nats://localhost:4222"), "url for nats server")
natsName := flag.String("nats_name", lookupEnvOrString("NATS_NAME", "ws-adapter"), "name for nats client")
natsVerifyCertificates := flag.Bool("nats_verify_certificates", lookupEnvOrBool("NATS_VERIFY_CERTIFICATES", false), "verify validity of certificates from nats server")
- wsToken := flag.String("ws_token", lookupEnvOrString("WS_TOKEN", ""), "websocket server auth token (if authentication is enabled)")
wsAuthEnable := flag.Bool("ws_auth_enable", lookupEnvOrBool("WS_AUTH_ENABLE", false), "enable authentication for websocket server")
wsAddr := flag.String("ws_addr", lookupEnvOrString("WS_ADDR", "localhost"), "websocket server address (domain or ip)")
wsPort := flag.String("ws_port", lookupEnvOrString("WS_PORT", ":8080"), "websocket server port")
@@ -68,7 +66,6 @@ func NewConfig() *Config {
Ctx: ctx,
},
Ws: Ws{
- Token: *wsToken,
AuthEnable: *wsAuthEnable,
Addr: *wsAddr,
Port: *wsPort,
diff --git a/backend/services/mtp/ws-adapter/internal/nats/nats.go b/backend/services/mtp/ws-adapter/internal/nats/nats.go
index 4078222..c5a1de4 100644
--- a/backend/services/mtp/ws-adapter/internal/nats/nats.go
+++ b/backend/services/mtp/ws-adapter/internal/nats/nats.go
@@ -10,11 +10,13 @@ import (
)
const (
- STREAM_NAME = "ws"
+ STREAM_NAME = "ws"
+ BUCKET_NAME = "devices-auth"
+ BUCKET_DESCRIPTION = "Devices authentication"
)
func StartNatsClient(c config.Nats) (
- *nats.Conn,
+ jetstream.KeyValue,
func(string, []byte) error,
func(string, func(*nats.Msg)) error,
) {
@@ -43,7 +45,15 @@ func StartNatsClient(c config.Nats) (
log.Fatalf("Failed to create JetStream client: %v", err)
}
- return nc, publisher(js), subscriber(nc)
+ kv, err := js.CreateOrUpdateKeyValue(c.Ctx, jetstream.KeyValueConfig{
+ Bucket: BUCKET_NAME,
+ Description: BUCKET_DESCRIPTION,
+ })
+ if err != nil {
+ log.Fatalf("Failed to create KeyValue store: %v", err)
+ }
+
+ return kv, publisher(js), subscriber(nc)
}
func subscriber(nc *nats.Conn) func(string, func(*nats.Msg)) error {
diff --git a/deploy/compose/.env.mqtt b/deploy/compose/.env.mqtt
index 9bf7853..2939a97 100644
--- a/deploy/compose/.env.mqtt
+++ b/deploy/compose/.env.mqtt
@@ -1,2 +1,2 @@
-REDIS_ENABLE=true
+REDIS_ENABLE=false
REDIS_ADDR=redis_usp:6379
\ No newline at end of file
diff --git a/deploy/compose/docker-compose.yaml b/deploy/compose/docker-compose.yaml
index 9ce146c..12b4d38 100644
--- a/deploy/compose/docker-compose.yaml
+++ b/deploy/compose/docker-compose.yaml
@@ -43,15 +43,6 @@ services:
- ./mongo_data:/data/db
profiles: [controller,adapter]
- redis:
- image: redis
- container_name: redis_usp
- ports:
- - 6379:6379
- networks:
- usp_network:
- ipv4_address: 172.16.235.5
- profiles: [mqtt]
#/* -------------------------------------------------------------------------- */
#/* ----------------------- Message Transfer Protocols ----------------------- */
@@ -61,8 +52,6 @@ services:
ports:
- 1883:1883
- 8883:8883
- depends_on:
- - redis
env_file:
- .env.mqtt
networks: