From 4047e1db0b702e79d002014252704263bf35be69 Mon Sep 17 00:00:00 2001 From: leandrofars Date: Fri, 12 Apr 2024 19:26:48 -0300 Subject: [PATCH] feat: device auth as rbac via nats keyvalue store | closes #212 --- .../controller/cmd/controller/main.go | 4 +- .../services/controller/internal/api/api.go | 9 +- .../controller/internal/api/device.go | 104 ++++++++++++++++++ .../services/controller/internal/nats/nats.go | 14 ++- .../controller/internal/utils/utils.go | 4 +- backend/services/mtp/adapter/.env.local | 3 +- .../services/mtp/adapter/cmd/adapter/main.go | 4 +- .../mtp/adapter/internal/config/config.go | 17 ++- .../mtp/adapter/internal/nats/nats.go | 35 ++++-- backend/services/mtp/mqtt-adapter/.env | 0 backend/services/mtp/mqtt-adapter/.gitignore | 1 + .../mqtt-adapter/internal/config/config.go | 2 +- backend/services/mtp/mqtt/README.md | 1 + backend/services/mtp/mqtt/cmd/mqtt/auth.json | 54 --------- backend/services/mtp/mqtt/cmd/mqtt/run.sh | 1 - backend/services/mtp/mqtt/go.mod | 8 +- backend/services/mtp/mqtt/go.sum | 14 +++ .../mtp/mqtt/internal/config/config.go | 28 ++++- .../mtp/mqtt/internal/listeners/listeners.go | 11 +- .../mtp/mqtt/internal/listeners/mqtt/hook.go | 98 +++++++++++++++++ .../mtp/mqtt/internal/listeners/mqtt/mqtt.go | 31 +++--- .../services/mtp/mqtt/internal/nats/nats.go | 82 ++++++++++++++ backend/services/mtp/ws-adapter/.gitignore | 1 + backend/services/mtp/ws/cmd/ws/main.go | 5 +- backend/services/mtp/ws/go.mod | 11 +- backend/services/mtp/ws/go.sum | 14 +++ .../services/mtp/ws/internal/config/config.go | 23 +++- backend/services/mtp/ws/internal/nats/nats.go | 74 +++++++++++++ .../mtp/ws/internal/ws/handler/client.go | 48 +++++++- backend/services/mtp/ws/internal/ws/ws.go | 10 +- 30 files changed, 590 insertions(+), 121 deletions(-) create mode 100644 backend/services/mtp/mqtt-adapter/.env create mode 100644 backend/services/mtp/mqtt-adapter/.gitignore delete mode 100644 backend/services/mtp/mqtt/cmd/mqtt/auth.json delete mode 100644 backend/services/mtp/mqtt/cmd/mqtt/run.sh create mode 100644 backend/services/mtp/mqtt/internal/nats/nats.go create mode 100644 backend/services/mtp/ws-adapter/.gitignore create mode 100644 backend/services/mtp/ws/internal/nats/nats.go diff --git a/backend/services/controller/cmd/controller/main.go b/backend/services/controller/cmd/controller/main.go index 7fe7616..e4b50c9 100755 --- a/backend/services/controller/cmd/controller/main.go +++ b/backend/services/controller/cmd/controller/main.go @@ -20,13 +20,13 @@ func main() { c := config.NewConfig() - js, nc := nats.StartNatsClient(c.Nats) + js, nc, kv := nats.StartNatsClient(c.Nats) bridge := bridge.NewBridge(js, nc) db := db.NewDatabase(c.Mongo.Ctx, c.Mongo.Uri) - api := api.NewApi(c.RestApi, js, nc, bridge, db) + api := api.NewApi(c.RestApi, js, nc, bridge, db, kv) api.StartApi() <-done diff --git a/backend/services/controller/internal/api/api.go b/backend/services/controller/internal/api/api.go index 077360b..46ad272 100644 --- a/backend/services/controller/internal/api/api.go +++ b/backend/services/controller/internal/api/api.go @@ -22,6 +22,7 @@ type Api struct { nc *nats.Conn bridge bridge.Bridge db db.Database + kv jetstream.KeyValue ctx context.Context } @@ -32,7 +33,7 @@ const ( AdminUser ) -func NewApi(c config.RestApi, js jetstream.JetStream, nc *nats.Conn, bridge bridge.Bridge, d db.Database) Api { +func NewApi(c config.RestApi, js jetstream.JetStream, nc *nats.Conn, bridge bridge.Bridge, d db.Database, kv jetstream.KeyValue) Api { return Api{ port: c.Port, js: js, @@ -40,6 +41,7 @@ func NewApi(c config.RestApi, js jetstream.JetStream, nc *nats.Conn, bridge brid ctx: c.Ctx, bridge: bridge, db: d, + kv: kv, } } @@ -51,6 +53,7 @@ func (a *Api) StartApi() { authentication.HandleFunc("/admin/register", a.registerAdminUser).Methods("POST") authentication.HandleFunc("/admin/exists", a.adminUserExists).Methods("GET") iot := r.PathPrefix("/api/device").Subrouter() + iot.HandleFunc("/auth", a.deviceAuth).Methods("GET", "PUT", "DELETE") iot.HandleFunc("", a.retrieveDevices).Methods("GET") iot.HandleFunc("/{id}", a.retrieveDevices).Methods("GET") iot.HandleFunc("/{sn}/{mtp}/get", a.deviceGetMsg).Methods("PUT") @@ -77,10 +80,6 @@ func (a *Api) StartApi() { return middleware.Middleware(handler) }) - // mtp.Use(func(handler http.Handler) http.Handler { - // return middleware.Middleware(handler) - // }) - dash.Use(func(handler http.Handler) http.Handler { return middleware.Middleware(handler) }) diff --git a/backend/services/controller/internal/api/device.go b/backend/services/controller/internal/api/device.go index 0131294..f5669e8 100644 --- a/backend/services/controller/internal/api/device.go +++ b/backend/services/controller/internal/api/device.go @@ -6,6 +6,8 @@ import ( "net/http" "strconv" + "github.com/leandrofars/oktopus/internal/utils" + "github.com/nats-io/nats.go/jetstream" "go.mongodb.org/mongo-driver/bson" ) @@ -102,3 +104,105 @@ func (a *Api) retrieveDevices(w http.ResponseWriter, r *http.Request) { log.Println(err) } } + +type DeviceAuth struct { + User string `json:"id"` + Password string `json:"password"` +} + +func (a *Api) deviceAuth(w http.ResponseWriter, r *http.Request) { + if r.Method == http.MethodGet { + + id := r.URL.Query().Get("id") + if id != "" { + entry, err := a.kv.Get(r.Context(), id) + if err != nil { + if err == jetstream.ErrKeyNotFound { + w.WriteHeader(http.StatusNotFound) + return + } + w.WriteHeader(http.StatusInternalServerError) + utils.MarshallEncoder(err, w) + return + } + utils.MarshallEncoder(map[string]string{ + id: string(entry.Value()), + }, w) + return + } + + entries, err := a.kv.ListKeys(r.Context(), jetstream.IgnoreDeletes()) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + utils.MarshallEncoder(err, w) + return + } + + listOfKeys := make(map[string]string) + + keys := entries.Keys() + for key := range keys { + entry, err := a.kv.Get(r.Context(), key) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + utils.MarshallEncoder(err, w) + return + } + + /*listOfKeys = append(listOfKeys, map[string]string{ + key: string(entry.Value()), + })*/ + listOfKeys[key] = string(entry.Value()) + } + + utils.MarshallEncoder(listOfKeys, w) + + } else if r.Method == http.MethodDelete { + + id := r.URL.Query().Get("id") + if id != "" { + err := a.kv.Purge(r.Context(), id) + if err != nil { + if err == jetstream.ErrKeyNotFound { + w.WriteHeader(http.StatusNotFound) + return + } + w.WriteHeader(http.StatusInternalServerError) + utils.MarshallEncoder(err, w) + return + } + return + } + w.WriteHeader(http.StatusBadRequest) + utils.MarshallEncoder("No id provided", w) + + } else if r.Method == http.MethodPut { + + var deviceAuth DeviceAuth + + err := utils.MarshallDecoder(&deviceAuth, r.Body) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + utils.MarshallEncoder(err, w) + return + } + + if deviceAuth.User != "" { + _, err := a.kv.PutString(r.Context(), deviceAuth.User, deviceAuth.Password) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + utils.MarshallEncoder(err, w) + return + } + return + } + + w.WriteHeader(http.StatusBadRequest) + utils.MarshallEncoder("device must have a user", w) + + } else { + log.Println("Unknown method used in device auth api") + w.WriteHeader(http.StatusMethodNotAllowed) + return + } +} diff --git a/backend/services/controller/internal/nats/nats.go b/backend/services/controller/internal/nats/nats.go index 4cbd9c0..a24e65a 100644 --- a/backend/services/controller/internal/nats/nats.go +++ b/backend/services/controller/internal/nats/nats.go @@ -18,9 +18,11 @@ const ( NATS_WS_SUBJECT_PREFIX = "ws.usp.v1." NATS_WS_ADAPTER_SUBJECT_PREFIX = "ws-adapter.usp.v1." DEVICE_SUBJECT_PREFIX = "device.usp.v1." + BUCKET_NAME = "devices-auth" + BUCKET_DESCRIPTION = "Devices authentication" ) -func StartNatsClient(c config.Nats) (jetstream.JetStream, *nats.Conn) { +func StartNatsClient(c config.Nats) (jetstream.JetStream, *nats.Conn, jetstream.KeyValue) { var ( nc *nats.Conn @@ -46,7 +48,15 @@ func StartNatsClient(c config.Nats) (jetstream.JetStream, *nats.Conn) { log.Fatalf("Failed to create JetStream client: %v", err) } - return js, nc + kv, err := js.CreateOrUpdateKeyValue(c.Ctx, jetstream.KeyValueConfig{ + Bucket: BUCKET_NAME, + Description: BUCKET_DESCRIPTION, + }) + if err != nil { + log.Fatalf("Failed to create KeyValue store: %v", err) + } + + return js, nc, kv } func defineOptions(c config.Nats) []nats.Option { diff --git a/backend/services/controller/internal/utils/utils.go b/backend/services/controller/internal/utils/utils.go index c86d2f1..6019cd7 100644 --- a/backend/services/controller/internal/utils/utils.go +++ b/backend/services/controller/internal/utils/utils.go @@ -22,9 +22,11 @@ func MarshallEncoder(data any, w io.Writer) { } } -func MarshallDecoder(data any, r io.Reader) { +func MarshallDecoder(data any, r io.Reader) error { err := json.NewDecoder(r).Decode(data) if err != nil { log.Printf("Error to decode message into json: %q", err) } + + return err } diff --git a/backend/services/mtp/adapter/.env.local b/backend/services/mtp/adapter/.env.local index 81b4f64..c9b245e 100644 --- a/backend/services/mtp/adapter/.env.local +++ b/backend/services/mtp/adapter/.env.local @@ -1 +1,2 @@ -MONGO_URI="mongodb://172.16.235.2:27017" \ No newline at end of file +MONGO_URI="mongodb://localhost:27017" +CONTROLLER_PASSWORD="test123" diff --git a/backend/services/mtp/adapter/cmd/adapter/main.go b/backend/services/mtp/adapter/cmd/adapter/main.go index c1f4885..cd32564 100644 --- a/backend/services/mtp/adapter/cmd/adapter/main.go +++ b/backend/services/mtp/adapter/cmd/adapter/main.go @@ -20,11 +20,11 @@ func main() { c := config.NewConfig() - js, nc := nats.StartNatsClient(c.Nats) + js, nc := nats.StartNatsClient(c.Nats, c.Controller) db := db.NewDatabase(c.Mongo.Ctx, c.Mongo.Uri) - handler := handler.NewHandler(nc, js, db, c.ControllerId) + handler := handler.NewHandler(nc, js, db, c.Controller.ControllerId) events.StartEventsListener(c.Nats.Ctx, js, handler) diff --git a/backend/services/mtp/adapter/internal/config/config.go b/backend/services/mtp/adapter/internal/config/config.go index ec8980b..7562b36 100644 --- a/backend/services/mtp/adapter/internal/config/config.go +++ b/backend/services/mtp/adapter/internal/config/config.go @@ -24,10 +24,15 @@ type Mongo struct { Ctx context.Context } +type Controller struct { + ControllerId string + ControllerPasswd string +} + type Config struct { - Nats Nats - Mongo Mongo - ControllerId string + Nats Nats + Mongo Mongo + Controller Controller } func NewConfig() *Config { @@ -40,6 +45,7 @@ func NewConfig() *Config { natsVerifyCertificates := flag.Bool("nats_verify_certificates", lookupEnvOrBool("NATS_VERIFY_CERTIFICATES", false), "verify validity of certificates from nats server") mongoUri := flag.String("mongo_uri", lookupEnvOrString("MONGO_URI", "mongodb://localhost:27017"), "uri for mongodb server") controllerId := flag.String("controller_id", lookupEnvOrString("CONTROLLER_ID", "oktopusController"), "usp controller endpoint id") + controllerPassword := flag.String("controller_passwd", lookupEnvOrString("CONTROLLER_PASSWORD", ""), "usp controller endpoint password to connect to") flHelp := flag.Bool("help", false, "Help") /* @@ -69,7 +75,10 @@ func NewConfig() *Config { Uri: *mongoUri, Ctx: ctx, }, - ControllerId: *controllerId, + Controller: Controller{ + ControllerId: *controllerId, + ControllerPasswd: *controllerPassword, + }, } } diff --git a/backend/services/mtp/adapter/internal/nats/nats.go b/backend/services/mtp/adapter/internal/nats/nats.go index c2b8998..7754adf 100644 --- a/backend/services/mtp/adapter/internal/nats/nats.go +++ b/backend/services/mtp/adapter/internal/nats/nats.go @@ -12,16 +12,18 @@ import ( ) const ( - MQTT_STREAM_NAME = "mqtt" - WS_STREAM_NAME = "ws" - STOMP_STREAM_NAME = "stomp" - LORA_STREAM_NAME = "lora" - OPC_STREAM_NAME = "opc" - ADAPTER_SUBJECT = "adapter" + USP_SUBJECT - USP_SUBJECT = ".usp.v1." + MQTT_STREAM_NAME = "mqtt" + WS_STREAM_NAME = "ws" + STOMP_STREAM_NAME = "stomp" + LORA_STREAM_NAME = "lora" + OPC_STREAM_NAME = "opc" + ADAPTER_SUBJECT = "adapter" + USP_SUBJECT + USP_SUBJECT = ".usp.v1." + BUCKET_NAME = "devices-auth" + BUCKET_DESCRIPTION = "Devices authentication" ) -func StartNatsClient(c config.Nats) (jetstream.JetStream, *nats.Conn) { +func StartNatsClient(c config.Nats, controller config.Controller) (jetstream.JetStream, *nats.Conn) { var ( nc *nats.Conn @@ -59,9 +61,26 @@ func StartNatsClient(c config.Nats) (jetstream.JetStream, *nats.Conn) { log.Fatalf("Failed to create Consumer: %v", err) } + createKeyValueStores(c.Ctx, js, controller) + return js, nc } +func createKeyValueStores(ctx context.Context, js jetstream.JetStream, control config.Controller) { + kv, err := js.CreateOrUpdateKeyValue(ctx, jetstream.KeyValueConfig{ + Bucket: BUCKET_NAME, + Description: BUCKET_DESCRIPTION, + }) + if err != nil { + log.Fatalf("Failed to create KeyValue store: %v", err) + } + + _, err = kv.PutString(ctx, control.ControllerId, control.ControllerPasswd) + if err != nil { + log.Printf("Failed to create KeyValue store: %v", err) + } +} + func createStreams(ctx context.Context, js jetstream.JetStream, streams []string) error { for _, stream := range streams { _, err := js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{ diff --git a/backend/services/mtp/mqtt-adapter/.env b/backend/services/mtp/mqtt-adapter/.env new file mode 100644 index 0000000..e69de29 diff --git a/backend/services/mtp/mqtt-adapter/.gitignore b/backend/services/mtp/mqtt-adapter/.gitignore new file mode 100644 index 0000000..3a8fe5e --- /dev/null +++ b/backend/services/mtp/mqtt-adapter/.gitignore @@ -0,0 +1 @@ +.env.local \ No newline at end of file diff --git a/backend/services/mtp/mqtt-adapter/internal/config/config.go b/backend/services/mtp/mqtt-adapter/internal/config/config.go index 8d2c832..08b6ce7 100644 --- a/backend/services/mtp/mqtt-adapter/internal/config/config.go +++ b/backend/services/mtp/mqtt-adapter/internal/config/config.go @@ -43,7 +43,7 @@ func NewConfig() *Config { natsVerifyCertificates := flag.Bool("nats_verify_certificates", lookupEnvOrBool("NATS_VERIFY_CERTIFICATES", false), "verify validity of certificates from nats server") mqttUrl := flag.String("mqtt_url", lookupEnvOrString("MQTT_URL", "tcp://localhost:1883"), "url for mqtt server") mqttClientId := flag.String("mqtt_client_id", lookupEnvOrString("MQTT_CLIENT_ID", "mqtt-adapter"), "client id for mqtt") - mqttUsername := flag.String("mqtt_username", lookupEnvOrString("MQTT_USERNAME", ""), "username for mqtt") + mqttUsername := flag.String("mqtt_username", lookupEnvOrString("MQTT_USERNAME", "oktopusController"), "username for mqtt") mqttPassword := flag.String("mqtt_password", lookupEnvOrString("MQTT_PASSWORD", ""), "password for mqtt") mqttQos := flag.Int("mqtt_qos", lookupEnvOrInt("MQTT_QOS", 1), "quality of service for mqtt") flHelp := flag.Bool("help", false, "Help") diff --git a/backend/services/mtp/mqtt/README.md b/backend/services/mtp/mqtt/README.md index ee92491..b9e251f 100644 --- a/backend/services/mtp/mqtt/README.md +++ b/backend/services/mtp/mqtt/README.md @@ -1,4 +1,5 @@ - This MQTT broker runs with https://github.com/mochi-mqtt/server - It's compatible with MQTTv5 and MQTTv3.1.1 +- At authentication, the device is only allowed to subscribe and publish to topics that finish with their username For more info access https://github.com/mochi-mqtt/server and/or execute "go run cmd/mqtt/main.go --help" \ No newline at end of file diff --git a/backend/services/mtp/mqtt/cmd/mqtt/auth.json b/backend/services/mtp/mqtt/cmd/mqtt/auth.json deleted file mode 100644 index f05253a..0000000 --- a/backend/services/mtp/mqtt/cmd/mqtt/auth.json +++ /dev/null @@ -1,54 +0,0 @@ -{ - "auth": [ - { - "username": "leandro", - "password": "leandro", - "allow": true - }, - { - "username": "steve", - "password": "steve", - "allow": true - }, - { - "username": "root", - "password": "root", - "allow": true - }, - { - "remote": "*", - "allow": false - }, - { - "remote": "*", - "allow": false - } - ], - "acl": [ - { - "remote": "*" - }, - { - "username": "leandro", - "filters": { - "oktopus/+/agent/+": 1, - "oktopus/+/controller/+": 2, - "oktopus/+/get/+": 2 - } - }, - { - "username": "steve", - "filters": { - "oktopus/+/agent/+": 1, - "oktopus/+/controller/+": 2, - "oktopus/+/get/+": 2 - } - }, - { - "username": "root", - "filters": { - "#": 3 - } - } - ] -} \ No newline at end of file diff --git a/backend/services/mtp/mqtt/cmd/mqtt/run.sh b/backend/services/mtp/mqtt/cmd/mqtt/run.sh deleted file mode 100644 index 186907d..0000000 --- a/backend/services/mtp/mqtt/cmd/mqtt/run.sh +++ /dev/null @@ -1 +0,0 @@ -go run . -path auth.json diff --git a/backend/services/mtp/mqtt/go.mod b/backend/services/mtp/mqtt/go.mod index 381107f..acf272f 100644 --- a/backend/services/mtp/mqtt/go.mod +++ b/backend/services/mtp/mqtt/go.mod @@ -14,9 +14,15 @@ require ( github.com/google/uuid v1.6.0 // indirect github.com/gorilla/websocket v1.5.0 // indirect github.com/joho/godotenv v1.5.1 // indirect + github.com/klauspost/compress v1.17.2 // indirect github.com/mattn/go-colorable v0.1.12 // indirect github.com/mattn/go-isatty v0.0.14 // indirect + github.com/nats-io/nats.go v1.34.1 // indirect + github.com/nats-io/nkeys v0.4.7 // indirect + github.com/nats-io/nuid v1.0.1 // indirect github.com/rs/xid v1.4.0 // indirect - golang.org/x/sys v0.5.0 // indirect + golang.org/x/crypto v0.18.0 // indirect + golang.org/x/sys v0.16.0 // indirect + golang.org/x/text v0.14.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/backend/services/mtp/mqtt/go.sum b/backend/services/mtp/mqtt/go.sum index a4dd6fa..8f75c0d 100644 --- a/backend/services/mtp/mqtt/go.sum +++ b/backend/services/mtp/mqtt/go.sum @@ -17,12 +17,20 @@ github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/ad github.com/jinzhu/copier v0.3.5 h1:GlvfUwHk62RokgqVNvYsku0TATCF7bAHVwEXoBh3iJg= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= +github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4= +github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/mattn/go-colorable v0.1.12 h1:jF+Du6AlPIjs2BiUiQlKOX0rt3SujHxPnksPKZbaA40= github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4= github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9Y= github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= github.com/mochi-co/mqtt/v2 v2.2.16 h1:CBqbxFhExzASNjj4BjSei0hYY1F5N5IeDqNVhjN+tp8= github.com/mochi-co/mqtt/v2 v2.2.16/go.mod h1:MDMTThFgWj/LjJ6wc51bP5l4xnJG/ahpc9tR9vZVf8Q= +github.com/nats-io/nats.go v1.34.1 h1:syWey5xaNHZgicYBemv0nohUPPmaLteiBEUT6Q5+F/4= +github.com/nats-io/nats.go v1.34.1/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= +github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= +github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= @@ -34,12 +42,18 @@ github.com/rs/zerolog v1.29.1 h1:cO+d60CHkknCbvzEWxP0S9K6KqyTjrCNUy1LdQLCGPc= github.com/rs/zerolog v1.29.1/go.mod h1:Le6ESbR7hc+DP6Lt1THiV8CQSdkkNrd3R0XbEgp3ZBU= github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY= github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 h1:k/gmLsJDWwWqbLCur2yWnJzwQEKRcAHXo6seXGuSwWw= +golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc= +golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= +golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= diff --git a/backend/services/mtp/mqtt/internal/config/config.go b/backend/services/mtp/mqtt/internal/config/config.go index 157b93b..b33504a 100644 --- a/backend/services/mtp/mqtt/internal/config/config.go +++ b/backend/services/mtp/mqtt/internal/config/config.go @@ -1,6 +1,7 @@ package config import ( + "context" "flag" "log" "os" @@ -16,7 +17,7 @@ type Config struct { Tls bool Fullchain string Privkey string - AuthFile string + AuthEnable bool RedisEnable bool RedisAddr string RedisPassword string @@ -25,9 +26,15 @@ type Config struct { HttpEnable bool HttpPort string LogLevel int + Nats Nats } -//TODO: debug websocket and http listeners +type Nats struct { + Url string + Name string + VerifyCertificates bool + Ctx context.Context +} func NewConfig() Config { @@ -44,15 +51,18 @@ func NewConfig() Config { tls := flag.Bool("mqtt_tls", lookupEnvOrBool("MQTT_TLS", false), "enable/disable TLS") fullchain := flag.String("full_chain_path", lookupEnvOrString("FULL_CHAIN_PATH", ""), "path to fullchain.pem certificate") privkey := flag.String("private_key_path", lookupEnvOrString("PRIVATE_KEY_PATH", ""), "path to privkey.pem certificate") - authFile := flag.String("auth_file_path", lookupEnvOrString("AUTH_FILE_PATH", ""), "path to MQTT RBAC auth file") + authEnable := flag.Bool("auth_enable", lookupEnvOrBool("AUTH_ENABLE", false), "enable authentication") redisEnable := flag.Bool("redis_enable", lookupEnvOrBool("REDIS_ENABLE", true), "enable/disable Redis db") - redisAddr := flag.String("redis_addr", lookupEnvOrString("REDIS_ADDR", "localhost:6379"), "address of redis db") + redisAddr := flag.String("redis_addr", lookupEnvOrString("REDIS_ADDR", ""), "address of redis db") redisPassword := flag.String("redis_passwd", lookupEnvOrString("REDIS_PASSWD", ""), "redis db password") wsEnable := flag.Bool("ws_enable", lookupEnvOrBool("WS_ENABLE", false), "enable/disable Websocket listener") wsPort := flag.String("ws_port", lookupEnvOrString("WS_PORT", ":80"), "port for Websocket listener") httpEnable := flag.Bool("http_enable", lookupEnvOrBool("HTTP_ENABLE", false), "enable/disable HTTP listener of mqtt metrics") httpPort := flag.String("http_port", lookupEnvOrString("HTTP_PORT", ":8080"), "port for HTTP listener of mqtt metrics") logLevel := flag.Int("log_level", lookupEnvOrInt("LOG_LEVEL", 1), "0=DEBUG, 1=INFO, 2=WARNING, 3=ERROR") + natsUrl := flag.String("nats_url", lookupEnvOrString("NATS_URL", "nats://localhost:4222"), "url for nats server") + natsName := flag.String("nats_name", lookupEnvOrString("NATS_NAME", "adapter"), "name for nats client") + natsVerifyCertificates := flag.Bool("nats_verify_certificates", lookupEnvOrBool("NATS_VERIFY_CERTIFICATES", false), "verify validity of certificates from nats server") flag.Parse() flHelp := flag.Bool("help", false, "Help") @@ -62,12 +72,14 @@ func NewConfig() Config { os.Exit(0) } + ctx := context.TODO() + conf := Config{ MqttPort: *mqttPort, Tls: *tls, Fullchain: *fullchain, Privkey: *privkey, - AuthFile: *authFile, + AuthEnable: *authEnable, RedisEnable: *redisEnable, RedisAddr: *redisAddr, RedisPassword: *redisPassword, @@ -76,6 +88,12 @@ func NewConfig() Config { HttpEnable: *httpEnable, HttpPort: *httpPort, LogLevel: *logLevel, + Nats: Nats{ + Url: *natsUrl, + Name: *natsName, + VerifyCertificates: *natsVerifyCertificates, + Ctx: ctx, + }, } conf.validate() diff --git a/backend/services/mtp/mqtt/internal/listeners/listeners.go b/backend/services/mtp/mqtt/internal/listeners/listeners.go index cd7922e..6e601bc 100644 --- a/backend/services/mtp/mqtt/internal/listeners/listeners.go +++ b/backend/services/mtp/mqtt/internal/listeners/listeners.go @@ -52,17 +52,18 @@ func StartServers(c config.Config) { func newMqttServer(c config.Config) *broker.Mqtt { return &broker.Mqtt{ - Port: c.MqttPort, - Tls: c.Tls, - Fullchain: c.Fullchain, - Privkey: c.Privkey, - AuthFile: c.AuthFile, + Port: c.MqttPort, + Tls: c.Tls, + Fullchain: c.Fullchain, + Privkey: c.Privkey, + AuthEnable: c.AuthEnable, Redis: broker.Redis{ RedisEnable: c.RedisEnable, RedisAddr: c.RedisAddr, RedisPassword: c.RedisPassword, }, LogLevel: c.LogLevel, + Nats: c.Nats, } } diff --git a/backend/services/mtp/mqtt/internal/listeners/mqtt/hook.go b/backend/services/mtp/mqtt/internal/listeners/mqtt/hook.go index 5ab883f..d763903 100644 --- a/backend/services/mtp/mqtt/internal/listeners/mqtt/hook.go +++ b/backend/services/mtp/mqtt/internal/listeners/mqtt/hook.go @@ -1,18 +1,28 @@ package mqtt import ( + "broker/internal/config" + "broker/internal/nats" "bytes" + "context" "log" "strings" "github.com/mochi-co/mqtt/v2" + "github.com/mochi-co/mqtt/v2/hooks/auth" "github.com/mochi-co/mqtt/v2/packets" + "github.com/nats-io/nats.go/jetstream" ) type MyHook struct { mqtt.HookBase } +type NatsAuthHook struct { + mqtt.HookBase + kv jetstream.KeyValue +} + func (h *MyHook) ID() string { return "events-controller" } @@ -86,3 +96,91 @@ func (h *MyHook) OnPacketEncode(cl *mqtt.Client, pk packets.Packet) packets.Pack return pk } + +func (h *NatsAuthHook) ID() string { + return "device-auth" +} + +func (h *NatsAuthHook) Provides(b byte) bool { + return bytes.Contains([]byte{ + mqtt.OnConnectAuthenticate, + mqtt.OnACLCheck, + }, []byte{b}) +} + +func (h *NatsAuthHook) Init(c any) error { + + _, _, kv := nats.StartNatsClient(c.(config.Nats)) + h.kv = kv + + h.Log.Info().Msg("initialised device auth nats hook") + return nil +} + +func (h *NatsAuthHook) OnConnectAuthenticate(cl *mqtt.Client, pk packets.Packet) bool { + + username := string(pk.Connect.Username) + + entry, err := h.kv.Get(context.TODO(), username) + if err != nil { + if err == jetstream.ErrKeyNotFound { + log.Println("user access not found, blocked user:", username) + return false + } + log.Println("error getting key value: ", err) + return false + } + + if bytes.Equal(entry.Value(), pk.Connect.Password) { + return true + } + + return false +} + +func (h *NatsAuthHook) OnACLCheck(cl *mqtt.Client, topic string, write bool) bool { + + username := string(cl.Properties.Username) + + _, err := h.kv.Get(context.TODO(), username) + if err != nil { + if err == jetstream.ErrKeyNotFound { + log.Println("user access not found, blocked user:", username) + return false + } + log.Println("error getting key value: ", err) + return false + } + + if username == "oktopusController" { + return true + } + + if !write { + deviceAllowedTopics := []string{ + "oktopus/usp/v1/agent/" + username, + } + for _, allowedTopic := range deviceAllowedTopics { + _, ok := auth.MatchTopic(allowedTopic, topic) + if ok { + return true + } + } + return false + } + + if write { + deviceAllowedTopics := []string{ + "oktopus/usp/v1/controller", + "oktopus/usp/v1/api/" + username, + } + for _, allowedTopic := range deviceAllowedTopics { + _, ok := auth.MatchTopic(allowedTopic, topic) + if ok { + return true + } + } + } + + return false +} diff --git a/backend/services/mtp/mqtt/internal/listeners/mqtt/mqtt.go b/backend/services/mtp/mqtt/internal/listeners/mqtt/mqtt.go index 9deb164..bdf3728 100644 --- a/backend/services/mtp/mqtt/internal/listeners/mqtt/mqtt.go +++ b/backend/services/mtp/mqtt/internal/listeners/mqtt/mqtt.go @@ -1,10 +1,10 @@ package mqtt import ( + "broker/internal/config" "crypto/tls" "io/ioutil" "log" - "os" rv8 "github.com/go-redis/redis/v8" "github.com/google/uuid" @@ -20,13 +20,14 @@ var ( ) type Mqtt struct { - Port string - Tls bool - Fullchain string - Privkey string - AuthFile string - Redis Redis - LogLevel int + Port string + Tls bool + Fullchain string + Privkey string + Redis Redis + LogLevel int + Nats config.Nats + AuthEnable bool } type Redis struct { @@ -38,7 +39,7 @@ type Redis struct { func (m *Mqtt) Start(mqttServer *mqtt.Server) { defineSeverLog(mqttServer, m.LogLevel) - defineServerAuth(mqttServer, m.AuthFile) + defineServerAuth(mqttServer, m.Nats, m.AuthEnable) server = mqttServer @@ -112,15 +113,9 @@ func defineServerTls(fullchain, privkey string) *listeners.Config { return nil } -func defineServerAuth(server *mqtt.Server, authFile string) { - if authFile != "" { - data, err := os.ReadFile(authFile) - if err != nil { - log.Fatal(err) - } - err = server.AddHook(new(auth.Hook), &auth.Options{ - Data: data, - }) +func defineServerAuth(server *mqtt.Server, natsConfig config.Nats, authEnable bool) { + if authEnable { + err := server.AddHook(new(NatsAuthHook), natsConfig) if err != nil { log.Fatal(err) } diff --git a/backend/services/mtp/mqtt/internal/nats/nats.go b/backend/services/mtp/mqtt/internal/nats/nats.go new file mode 100644 index 0000000..1eb51a4 --- /dev/null +++ b/backend/services/mtp/mqtt/internal/nats/nats.go @@ -0,0 +1,82 @@ +package nats + +import ( + "broker/internal/config" + "log" + "time" + + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" +) + +const ( + NATS_ACCOUNT_SUBJ_PREFIX = "account-manager.v1." + NATS_REQUEST_TIMEOUT = 10 * time.Second + NATS_MQTT_SUBJECT_PREFIX = "mqtt.usp.v1." + NATS_MQTT_ADAPTER_SUBJECT_PREFIX = "mqtt-adapter.usp.v1." + NATS_ADAPTER_SUBJECT = "adapter.usp.v1." + NATS_WS_SUBJECT_PREFIX = "ws.usp.v1." + NATS_WS_ADAPTER_SUBJECT_PREFIX = "ws-adapter.usp.v1." + DEVICE_SUBJECT_PREFIX = "device.usp.v1." + BUCKET_NAME = "devices-auth" + BUCKET_DESCRIPTION = "Devices authentication" +) + +func StartNatsClient(c config.Nats) (jetstream.JetStream, *nats.Conn, jetstream.KeyValue) { + + var ( + nc *nats.Conn + err error + ) + + opts := defineOptions(c) + + log.Printf("Connecting to NATS server %s", c.Url) + + for { + nc, err = nats.Connect(c.Url, opts...) + if err != nil { + time.Sleep(5 * time.Second) + continue + } + break + } + log.Printf("Successfully connected to NATS server %s", c.Url) + + js, err := jetstream.New(nc) + if err != nil { + log.Fatalf("Failed to create JetStream client: %v", err) + } + + kv, err := js.CreateOrUpdateKeyValue(c.Ctx, jetstream.KeyValueConfig{ + Bucket: BUCKET_NAME, + Description: BUCKET_DESCRIPTION, + }) + if err != nil { + log.Fatalf("Failed to create KeyValue store: %v", err) + } + + return js, nc, kv +} + +func defineOptions(c config.Nats) []nats.Option { + var opts []nats.Option + + opts = append(opts, nats.Name(c.Name)) + opts = append(opts, nats.MaxReconnects(-1)) + opts = append(opts, nats.ReconnectWait(5*time.Second)) + opts = append(opts, nats.DisconnectErrHandler(func(nc *nats.Conn, err error) { + log.Printf("Got disconnected! Reason: %q\n", err) + })) + opts = append(opts, nats.ReconnectHandler(func(nc *nats.Conn) { + log.Printf("Got reconnected to %v!\n", nc.ConnectedUrl()) + })) + opts = append(opts, nats.ClosedHandler(func(nc *nats.Conn) { + log.Printf("Connection closed. Reason: %q\n", nc.LastError()) + })) + if c.VerifyCertificates { + opts = append(opts, nats.RootCAs()) + } + + return opts +} diff --git a/backend/services/mtp/ws-adapter/.gitignore b/backend/services/mtp/ws-adapter/.gitignore new file mode 100644 index 0000000..3a8fe5e --- /dev/null +++ b/backend/services/mtp/ws-adapter/.gitignore @@ -0,0 +1 @@ +.env.local \ No newline at end of file diff --git a/backend/services/mtp/ws/cmd/ws/main.go b/backend/services/mtp/ws/cmd/ws/main.go index 2a49c94..4fde1b8 100644 --- a/backend/services/mtp/ws/cmd/ws/main.go +++ b/backend/services/mtp/ws/cmd/ws/main.go @@ -7,6 +7,7 @@ import ( "syscall" "github.com/OktopUSP/oktopus/ws/internal/config" + "github.com/OktopUSP/oktopus/ws/internal/nats" "github.com/OktopUSP/oktopus/ws/internal/ws" ) @@ -19,7 +20,9 @@ func main() { // Locks app running until it receives a stop command as Ctrl+C. signal.Notify(done, syscall.SIGINT) - ws.StartNewServer(conf) + _, kv := nats.StartNatsClient(conf.Nats) + + ws.StartNewServer(conf, kv) <-done diff --git a/backend/services/mtp/ws/go.mod b/backend/services/mtp/ws/go.mod index a42cdbf..1d82ba9 100644 --- a/backend/services/mtp/ws/go.mod +++ b/backend/services/mtp/ws/go.mod @@ -9,4 +9,13 @@ require ( google.golang.org/protobuf v1.32.0 ) -require golang.org/x/net v0.17.0 // indirect +require ( + github.com/klauspost/compress v1.17.2 // indirect + github.com/nats-io/nats.go v1.34.1 // indirect + github.com/nats-io/nkeys v0.4.7 // indirect + github.com/nats-io/nuid v1.0.1 // indirect + golang.org/x/crypto v0.18.0 // indirect + golang.org/x/net v0.17.0 // indirect + golang.org/x/sys v0.16.0 // indirect + golang.org/x/text v0.14.0 // indirect +) diff --git a/backend/services/mtp/ws/go.sum b/backend/services/mtp/ws/go.sum index a9eb360..a79c4e4 100644 --- a/backend/services/mtp/ws/go.sum +++ b/backend/services/mtp/ws/go.sum @@ -6,8 +6,22 @@ github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/ github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= +github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4= +github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/nats-io/nats.go v1.34.1 h1:syWey5xaNHZgicYBemv0nohUPPmaLteiBEUT6Q5+F/4= +github.com/nats-io/nats.go v1.34.1/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= +github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= +github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc= +golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= +golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I= diff --git a/backend/services/mtp/ws/internal/config/config.go b/backend/services/mtp/ws/internal/config/config.go index 9aebb3f..840c0e6 100644 --- a/backend/services/mtp/ws/internal/config/config.go +++ b/backend/services/mtp/ws/internal/config/config.go @@ -2,6 +2,7 @@ package config import ( + "context" "flag" "log" "os" @@ -13,9 +14,16 @@ import ( type Config struct { Port string // server port: e.g. ":8080" Auth bool // server auth enable/disable - Token string // controller auth token ControllerEID string // controller endpoint id Tls bool // enable/diable websockets server tls + Nats Nats +} + +type Nats struct { + Url string + Name string + VerifyCertificates bool + Ctx context.Context } func NewConfig() Config { @@ -33,8 +41,10 @@ func NewConfig() Config { */ /* ------------------------------ define flags ------------------------------ */ + natsUrl := flag.String("nats_url", lookupEnvOrString("NATS_URL", "nats://localhost:4222"), "url for nats server") + natsName := flag.String("nats_name", lookupEnvOrString("NATS_NAME", "ws-adapter"), "name for nats client") + natsVerifyCertificates := flag.Bool("nats_verify_certificates", lookupEnvOrBool("NATS_VERIFY_CERTIFICATES", false), "verify validity of certificates from nats server") flPort := flag.String("port", lookupEnvOrString("SERVER_PORT", ":8080"), "Server port") - flToken := flag.String("token", lookupEnvOrString("SERVER_AUTH_TOKEN", ""), "Controller auth token") flAuth := flag.Bool("auth", lookupEnvOrBool("SERVER_AUTH_ENABLE", false), "Server auth enable/disable") flControllerEid := flag.String("controller-eid", lookupEnvOrString("CONTROLLER_EID", "oktopusController"), "Controller eid") flTls := flag.Bool("tls", lookupEnvOrBool("SERVER_TLS_ENABLE", false), "Enable/diable websockets server tls") @@ -47,12 +57,19 @@ func NewConfig() Config { os.Exit(0) } + ctx := context.TODO() + return Config{ Port: *flPort, - Token: *flToken, Auth: *flAuth, ControllerEID: *flControllerEid, Tls: *flTls, + Nats: Nats{ + Url: *natsUrl, + Name: *natsName, + VerifyCertificates: *natsVerifyCertificates, + Ctx: ctx, + }, } } diff --git a/backend/services/mtp/ws/internal/nats/nats.go b/backend/services/mtp/ws/internal/nats/nats.go new file mode 100644 index 0000000..a5a557d --- /dev/null +++ b/backend/services/mtp/ws/internal/nats/nats.go @@ -0,0 +1,74 @@ +package nats + +import ( + "log" + "time" + + "github.com/OktopUSP/oktopus/ws/internal/config" + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" +) + +const ( + BUCKET_NAME = "devices-auth" + BUCKET_DESCRIPTION = "Devices authentication" +) + +func StartNatsClient(c config.Nats) (*nats.Conn, jetstream.KeyValue) { + + var ( + nc *nats.Conn + err error + ) + + opts := defineOptions(c) + + log.Printf("Connecting to NATS server %s", c.Url) + + for { + nc, err = nats.Connect(c.Url, opts...) + if err != nil { + time.Sleep(5 * time.Second) + continue + } + break + } + log.Printf("Successfully connected to NATS server %s", c.Url) + + js, err := jetstream.New(nc) + if err != nil { + log.Fatalf("Failed to create JetStream client: %v", err) + } + + kv, err := js.CreateOrUpdateKeyValue(c.Ctx, jetstream.KeyValueConfig{ + Bucket: BUCKET_NAME, + Description: BUCKET_DESCRIPTION, + }) + if err != nil { + log.Fatalf("Failed to create KeyValue store: %v", err) + } + + return nc, kv +} + +func defineOptions(c config.Nats) []nats.Option { + var opts []nats.Option + + opts = append(opts, nats.Name(c.Name)) + opts = append(opts, nats.MaxReconnects(-1)) + opts = append(opts, nats.ReconnectWait(5*time.Second)) + opts = append(opts, nats.DisconnectErrHandler(func(nc *nats.Conn, err error) { + log.Printf("Got disconnected! Reason: %q\n", err) + })) + opts = append(opts, nats.ReconnectHandler(func(nc *nats.Conn) { + log.Printf("Got reconnected to %v!\n", nc.ConnectedUrl()) + })) + opts = append(opts, nats.ClosedHandler(func(nc *nats.Conn) { + log.Printf("Connection closed. Reason: %q\n", nc.LastError()) + })) + if c.VerifyCertificates { + opts = append(opts, nats.RootCAs()) + } + + return opts +} diff --git a/backend/services/mtp/ws/internal/ws/handler/client.go b/backend/services/mtp/ws/internal/ws/handler/client.go index e2f297e..f26c8cb 100644 --- a/backend/services/mtp/ws/internal/ws/handler/client.go +++ b/backend/services/mtp/ws/internal/ws/handler/client.go @@ -7,7 +7,9 @@ import ( "time" "github.com/OktopUSP/oktopus/ws/internal/usp_record" + "github.com/gorilla/mux" "github.com/gorilla/websocket" + "github.com/nats-io/nats.go/jetstream" "google.golang.org/protobuf/proto" ) @@ -148,10 +150,26 @@ func (c *Client) writePump() { } // Handle USP Controller events -func ServeController(w http.ResponseWriter, r *http.Request, token, cEID string, authEnable bool) { +func ServeController( + w http.ResponseWriter, + r *http.Request, + cEID string, + authEnable bool, + kv jetstream.KeyValue, +) { if authEnable { + entry, err := kv.Get(r.Context(), cEID) + if err != nil { + if err == jetstream.ErrKeyNotFound { + w.WriteHeader(http.StatusUnauthorized) + return + } + log.Println("Nats kv error:", err) + w.Write([]byte("Nats kv error:" + err.Error())) + return + } recv_token := r.URL.Query().Get("token") - if recv_token != token { + if recv_token != string(entry.Value()) { w.WriteHeader(http.StatusUnauthorized) w.Write([]byte("Unauthorized")) return @@ -172,7 +190,13 @@ func ServeController(w http.ResponseWriter, r *http.Request, token, cEID string, } // Handle USP Agent events, cEID = controller endpoint id -func ServeAgent(w http.ResponseWriter, r *http.Request, cEID string) { +func ServeAgent( + w http.ResponseWriter, + r *http.Request, + cEID string, + kv jetstream.KeyValue, + authEnable bool, +) { header := http.Header{ "Sec-Websocket-Protocol": {uspVersion}, @@ -187,6 +211,24 @@ func ServeAgent(w http.ResponseWriter, r *http.Request, cEID string) { return } + if authEnable { + entry, err := kv.Get(r.Context(), deviceid) + if err != nil { + if err == jetstream.ErrKeyNotFound { + w.WriteHeader(http.StatusUnauthorized) + return + } + log.Println("Nats kv error:", err) + w.Write([]byte("Nats kv error:" + err.Error())) + return + } + + if mux.Vars(r)["passwd"] != string(entry.Value()) { + w.WriteHeader(http.StatusUnauthorized) + return + } + } + conn, err := upgrader.Upgrade(w, r, header) if err != nil { log.Println(err) diff --git a/backend/services/mtp/ws/internal/ws/ws.go b/backend/services/mtp/ws/internal/ws/ws.go index 08e4eb4..d08ef44 100644 --- a/backend/services/mtp/ws/internal/ws/ws.go +++ b/backend/services/mtp/ws/internal/ws/ws.go @@ -9,19 +9,23 @@ import ( "github.com/OktopUSP/oktopus/ws/internal/config" "github.com/OktopUSP/oktopus/ws/internal/ws/handler" "github.com/gorilla/mux" + "github.com/nats-io/nats.go/jetstream" ) // Starts New Websockets Server -func StartNewServer(c config.Config) { +func StartNewServer(c config.Config, kv jetstream.KeyValue) { // Initialize handlers of websockets events go handler.InitHandlers(c.ControllerEID) r := mux.NewRouter() + r.HandleFunc("/ws/agent/{passwd}", func(w http.ResponseWriter, r *http.Request) { + handler.ServeAgent(w, r, c.ControllerEID, kv, c.Auth) + }) r.HandleFunc("/ws/agent", func(w http.ResponseWriter, r *http.Request) { - handler.ServeAgent(w, r, c.ControllerEID) + handler.ServeAgent(w, r, c.ControllerEID, kv, c.Auth) }) r.HandleFunc("/ws/controller", func(w http.ResponseWriter, r *http.Request) { - handler.ServeController(w, r, c.Token, c.ControllerEID, c.Auth) + handler.ServeController(w, r, c.ControllerEID, c.Auth, kv) }) go func() {