feat: device auth as rbac via nats keyvalue store | closes #212

This commit is contained in:
leandrofars 2024-04-12 19:26:48 -03:00
parent 468a74ea88
commit 4047e1db0b
30 changed files with 590 additions and 121 deletions

View File

@ -20,13 +20,13 @@ func main() {
c := config.NewConfig()
js, nc := nats.StartNatsClient(c.Nats)
js, nc, kv := nats.StartNatsClient(c.Nats)
bridge := bridge.NewBridge(js, nc)
db := db.NewDatabase(c.Mongo.Ctx, c.Mongo.Uri)
api := api.NewApi(c.RestApi, js, nc, bridge, db)
api := api.NewApi(c.RestApi, js, nc, bridge, db, kv)
api.StartApi()
<-done

View File

@ -22,6 +22,7 @@ type Api struct {
nc *nats.Conn
bridge bridge.Bridge
db db.Database
kv jetstream.KeyValue
ctx context.Context
}
@ -32,7 +33,7 @@ const (
AdminUser
)
func NewApi(c config.RestApi, js jetstream.JetStream, nc *nats.Conn, bridge bridge.Bridge, d db.Database) Api {
func NewApi(c config.RestApi, js jetstream.JetStream, nc *nats.Conn, bridge bridge.Bridge, d db.Database, kv jetstream.KeyValue) Api {
return Api{
port: c.Port,
js: js,
@ -40,6 +41,7 @@ func NewApi(c config.RestApi, js jetstream.JetStream, nc *nats.Conn, bridge brid
ctx: c.Ctx,
bridge: bridge,
db: d,
kv: kv,
}
}
@ -51,6 +53,7 @@ func (a *Api) StartApi() {
authentication.HandleFunc("/admin/register", a.registerAdminUser).Methods("POST")
authentication.HandleFunc("/admin/exists", a.adminUserExists).Methods("GET")
iot := r.PathPrefix("/api/device").Subrouter()
iot.HandleFunc("/auth", a.deviceAuth).Methods("GET", "PUT", "DELETE")
iot.HandleFunc("", a.retrieveDevices).Methods("GET")
iot.HandleFunc("/{id}", a.retrieveDevices).Methods("GET")
iot.HandleFunc("/{sn}/{mtp}/get", a.deviceGetMsg).Methods("PUT")
@ -77,10 +80,6 @@ func (a *Api) StartApi() {
return middleware.Middleware(handler)
})
// mtp.Use(func(handler http.Handler) http.Handler {
// return middleware.Middleware(handler)
// })
dash.Use(func(handler http.Handler) http.Handler {
return middleware.Middleware(handler)
})

View File

@ -6,6 +6,8 @@ import (
"net/http"
"strconv"
"github.com/leandrofars/oktopus/internal/utils"
"github.com/nats-io/nats.go/jetstream"
"go.mongodb.org/mongo-driver/bson"
)
@ -102,3 +104,105 @@ func (a *Api) retrieveDevices(w http.ResponseWriter, r *http.Request) {
log.Println(err)
}
}
type DeviceAuth struct {
User string `json:"id"`
Password string `json:"password"`
}
func (a *Api) deviceAuth(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodGet {
id := r.URL.Query().Get("id")
if id != "" {
entry, err := a.kv.Get(r.Context(), id)
if err != nil {
if err == jetstream.ErrKeyNotFound {
w.WriteHeader(http.StatusNotFound)
return
}
w.WriteHeader(http.StatusInternalServerError)
utils.MarshallEncoder(err, w)
return
}
utils.MarshallEncoder(map[string]string{
id: string(entry.Value()),
}, w)
return
}
entries, err := a.kv.ListKeys(r.Context(), jetstream.IgnoreDeletes())
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
utils.MarshallEncoder(err, w)
return
}
listOfKeys := make(map[string]string)
keys := entries.Keys()
for key := range keys {
entry, err := a.kv.Get(r.Context(), key)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
utils.MarshallEncoder(err, w)
return
}
/*listOfKeys = append(listOfKeys, map[string]string{
key: string(entry.Value()),
})*/
listOfKeys[key] = string(entry.Value())
}
utils.MarshallEncoder(listOfKeys, w)
} else if r.Method == http.MethodDelete {
id := r.URL.Query().Get("id")
if id != "" {
err := a.kv.Purge(r.Context(), id)
if err != nil {
if err == jetstream.ErrKeyNotFound {
w.WriteHeader(http.StatusNotFound)
return
}
w.WriteHeader(http.StatusInternalServerError)
utils.MarshallEncoder(err, w)
return
}
return
}
w.WriteHeader(http.StatusBadRequest)
utils.MarshallEncoder("No id provided", w)
} else if r.Method == http.MethodPut {
var deviceAuth DeviceAuth
err := utils.MarshallDecoder(&deviceAuth, r.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
utils.MarshallEncoder(err, w)
return
}
if deviceAuth.User != "" {
_, err := a.kv.PutString(r.Context(), deviceAuth.User, deviceAuth.Password)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
utils.MarshallEncoder(err, w)
return
}
return
}
w.WriteHeader(http.StatusBadRequest)
utils.MarshallEncoder("device must have a user", w)
} else {
log.Println("Unknown method used in device auth api")
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
}

View File

@ -18,9 +18,11 @@ const (
NATS_WS_SUBJECT_PREFIX = "ws.usp.v1."
NATS_WS_ADAPTER_SUBJECT_PREFIX = "ws-adapter.usp.v1."
DEVICE_SUBJECT_PREFIX = "device.usp.v1."
BUCKET_NAME = "devices-auth"
BUCKET_DESCRIPTION = "Devices authentication"
)
func StartNatsClient(c config.Nats) (jetstream.JetStream, *nats.Conn) {
func StartNatsClient(c config.Nats) (jetstream.JetStream, *nats.Conn, jetstream.KeyValue) {
var (
nc *nats.Conn
@ -46,7 +48,15 @@ func StartNatsClient(c config.Nats) (jetstream.JetStream, *nats.Conn) {
log.Fatalf("Failed to create JetStream client: %v", err)
}
return js, nc
kv, err := js.CreateOrUpdateKeyValue(c.Ctx, jetstream.KeyValueConfig{
Bucket: BUCKET_NAME,
Description: BUCKET_DESCRIPTION,
})
if err != nil {
log.Fatalf("Failed to create KeyValue store: %v", err)
}
return js, nc, kv
}
func defineOptions(c config.Nats) []nats.Option {

View File

@ -22,9 +22,11 @@ func MarshallEncoder(data any, w io.Writer) {
}
}
func MarshallDecoder(data any, r io.Reader) {
func MarshallDecoder(data any, r io.Reader) error {
err := json.NewDecoder(r).Decode(data)
if err != nil {
log.Printf("Error to decode message into json: %q", err)
}
return err
}

View File

@ -1 +1,2 @@
MONGO_URI="mongodb://172.16.235.2:27017"
MONGO_URI="mongodb://localhost:27017"
CONTROLLER_PASSWORD="test123"

View File

@ -20,11 +20,11 @@ func main() {
c := config.NewConfig()
js, nc := nats.StartNatsClient(c.Nats)
js, nc := nats.StartNatsClient(c.Nats, c.Controller)
db := db.NewDatabase(c.Mongo.Ctx, c.Mongo.Uri)
handler := handler.NewHandler(nc, js, db, c.ControllerId)
handler := handler.NewHandler(nc, js, db, c.Controller.ControllerId)
events.StartEventsListener(c.Nats.Ctx, js, handler)

View File

@ -24,10 +24,15 @@ type Mongo struct {
Ctx context.Context
}
type Controller struct {
ControllerId string
ControllerPasswd string
}
type Config struct {
Nats Nats
Mongo Mongo
ControllerId string
Nats Nats
Mongo Mongo
Controller Controller
}
func NewConfig() *Config {
@ -40,6 +45,7 @@ func NewConfig() *Config {
natsVerifyCertificates := flag.Bool("nats_verify_certificates", lookupEnvOrBool("NATS_VERIFY_CERTIFICATES", false), "verify validity of certificates from nats server")
mongoUri := flag.String("mongo_uri", lookupEnvOrString("MONGO_URI", "mongodb://localhost:27017"), "uri for mongodb server")
controllerId := flag.String("controller_id", lookupEnvOrString("CONTROLLER_ID", "oktopusController"), "usp controller endpoint id")
controllerPassword := flag.String("controller_passwd", lookupEnvOrString("CONTROLLER_PASSWORD", ""), "usp controller endpoint password to connect to")
flHelp := flag.Bool("help", false, "Help")
/*
@ -69,7 +75,10 @@ func NewConfig() *Config {
Uri: *mongoUri,
Ctx: ctx,
},
ControllerId: *controllerId,
Controller: Controller{
ControllerId: *controllerId,
ControllerPasswd: *controllerPassword,
},
}
}

View File

@ -12,16 +12,18 @@ import (
)
const (
MQTT_STREAM_NAME = "mqtt"
WS_STREAM_NAME = "ws"
STOMP_STREAM_NAME = "stomp"
LORA_STREAM_NAME = "lora"
OPC_STREAM_NAME = "opc"
ADAPTER_SUBJECT = "adapter" + USP_SUBJECT
USP_SUBJECT = ".usp.v1."
MQTT_STREAM_NAME = "mqtt"
WS_STREAM_NAME = "ws"
STOMP_STREAM_NAME = "stomp"
LORA_STREAM_NAME = "lora"
OPC_STREAM_NAME = "opc"
ADAPTER_SUBJECT = "adapter" + USP_SUBJECT
USP_SUBJECT = ".usp.v1."
BUCKET_NAME = "devices-auth"
BUCKET_DESCRIPTION = "Devices authentication"
)
func StartNatsClient(c config.Nats) (jetstream.JetStream, *nats.Conn) {
func StartNatsClient(c config.Nats, controller config.Controller) (jetstream.JetStream, *nats.Conn) {
var (
nc *nats.Conn
@ -59,9 +61,26 @@ func StartNatsClient(c config.Nats) (jetstream.JetStream, *nats.Conn) {
log.Fatalf("Failed to create Consumer: %v", err)
}
createKeyValueStores(c.Ctx, js, controller)
return js, nc
}
func createKeyValueStores(ctx context.Context, js jetstream.JetStream, control config.Controller) {
kv, err := js.CreateOrUpdateKeyValue(ctx, jetstream.KeyValueConfig{
Bucket: BUCKET_NAME,
Description: BUCKET_DESCRIPTION,
})
if err != nil {
log.Fatalf("Failed to create KeyValue store: %v", err)
}
_, err = kv.PutString(ctx, control.ControllerId, control.ControllerPasswd)
if err != nil {
log.Printf("Failed to create KeyValue store: %v", err)
}
}
func createStreams(ctx context.Context, js jetstream.JetStream, streams []string) error {
for _, stream := range streams {
_, err := js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{

View File

View File

@ -0,0 +1 @@
.env.local

View File

@ -43,7 +43,7 @@ func NewConfig() *Config {
natsVerifyCertificates := flag.Bool("nats_verify_certificates", lookupEnvOrBool("NATS_VERIFY_CERTIFICATES", false), "verify validity of certificates from nats server")
mqttUrl := flag.String("mqtt_url", lookupEnvOrString("MQTT_URL", "tcp://localhost:1883"), "url for mqtt server")
mqttClientId := flag.String("mqtt_client_id", lookupEnvOrString("MQTT_CLIENT_ID", "mqtt-adapter"), "client id for mqtt")
mqttUsername := flag.String("mqtt_username", lookupEnvOrString("MQTT_USERNAME", ""), "username for mqtt")
mqttUsername := flag.String("mqtt_username", lookupEnvOrString("MQTT_USERNAME", "oktopusController"), "username for mqtt")
mqttPassword := flag.String("mqtt_password", lookupEnvOrString("MQTT_PASSWORD", ""), "password for mqtt")
mqttQos := flag.Int("mqtt_qos", lookupEnvOrInt("MQTT_QOS", 1), "quality of service for mqtt")
flHelp := flag.Bool("help", false, "Help")

View File

@ -1,4 +1,5 @@
- This MQTT broker runs with https://github.com/mochi-mqtt/server
- It's compatible with MQTTv5 and MQTTv3.1.1
- At authentication, the device is only allowed to subscribe and publish to topics that finish with their username
For more info access https://github.com/mochi-mqtt/server and/or execute "go run cmd/mqtt/main.go --help"

View File

@ -1,54 +0,0 @@
{
"auth": [
{
"username": "leandro",
"password": "leandro",
"allow": true
},
{
"username": "steve",
"password": "steve",
"allow": true
},
{
"username": "root",
"password": "root",
"allow": true
},
{
"remote": "*",
"allow": false
},
{
"remote": "*",
"allow": false
}
],
"acl": [
{
"remote": "*"
},
{
"username": "leandro",
"filters": {
"oktopus/+/agent/+": 1,
"oktopus/+/controller/+": 2,
"oktopus/+/get/+": 2
}
},
{
"username": "steve",
"filters": {
"oktopus/+/agent/+": 1,
"oktopus/+/controller/+": 2,
"oktopus/+/get/+": 2
}
},
{
"username": "root",
"filters": {
"#": 3
}
}
]
}

View File

@ -1 +0,0 @@
go run . -path auth.json

View File

@ -14,9 +14,15 @@ require (
github.com/google/uuid v1.6.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/joho/godotenv v1.5.1 // indirect
github.com/klauspost/compress v1.17.2 // indirect
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/nats-io/nats.go v1.34.1 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/rs/xid v1.4.0 // indirect
golang.org/x/sys v0.5.0 // indirect
golang.org/x/crypto v0.18.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/text v0.14.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

View File

@ -17,12 +17,20 @@ github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/ad
github.com/jinzhu/copier v0.3.5 h1:GlvfUwHk62RokgqVNvYsku0TATCF7bAHVwEXoBh3iJg=
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/mattn/go-colorable v0.1.12 h1:jF+Du6AlPIjs2BiUiQlKOX0rt3SujHxPnksPKZbaA40=
github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4=
github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9Y=
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
github.com/mochi-co/mqtt/v2 v2.2.16 h1:CBqbxFhExzASNjj4BjSei0hYY1F5N5IeDqNVhjN+tp8=
github.com/mochi-co/mqtt/v2 v2.2.16/go.mod h1:MDMTThFgWj/LjJ6wc51bP5l4xnJG/ahpc9tR9vZVf8Q=
github.com/nats-io/nats.go v1.34.1 h1:syWey5xaNHZgicYBemv0nohUPPmaLteiBEUT6Q5+F/4=
github.com/nats-io/nats.go v1.34.1/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
@ -34,12 +42,18 @@ github.com/rs/zerolog v1.29.1 h1:cO+d60CHkknCbvzEWxP0S9K6KqyTjrCNUy1LdQLCGPc=
github.com/rs/zerolog v1.29.1/go.mod h1:Le6ESbR7hc+DP6Lt1THiV8CQSdkkNrd3R0XbEgp3ZBU=
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 h1:k/gmLsJDWwWqbLCur2yWnJzwQEKRcAHXo6seXGuSwWw=
golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc=
golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg=
golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=

View File

@ -1,6 +1,7 @@
package config
import (
"context"
"flag"
"log"
"os"
@ -16,7 +17,7 @@ type Config struct {
Tls bool
Fullchain string
Privkey string
AuthFile string
AuthEnable bool
RedisEnable bool
RedisAddr string
RedisPassword string
@ -25,9 +26,15 @@ type Config struct {
HttpEnable bool
HttpPort string
LogLevel int
Nats Nats
}
//TODO: debug websocket and http listeners
type Nats struct {
Url string
Name string
VerifyCertificates bool
Ctx context.Context
}
func NewConfig() Config {
@ -44,15 +51,18 @@ func NewConfig() Config {
tls := flag.Bool("mqtt_tls", lookupEnvOrBool("MQTT_TLS", false), "enable/disable TLS")
fullchain := flag.String("full_chain_path", lookupEnvOrString("FULL_CHAIN_PATH", ""), "path to fullchain.pem certificate")
privkey := flag.String("private_key_path", lookupEnvOrString("PRIVATE_KEY_PATH", ""), "path to privkey.pem certificate")
authFile := flag.String("auth_file_path", lookupEnvOrString("AUTH_FILE_PATH", ""), "path to MQTT RBAC auth file")
authEnable := flag.Bool("auth_enable", lookupEnvOrBool("AUTH_ENABLE", false), "enable authentication")
redisEnable := flag.Bool("redis_enable", lookupEnvOrBool("REDIS_ENABLE", true), "enable/disable Redis db")
redisAddr := flag.String("redis_addr", lookupEnvOrString("REDIS_ADDR", "localhost:6379"), "address of redis db")
redisAddr := flag.String("redis_addr", lookupEnvOrString("REDIS_ADDR", ""), "address of redis db")
redisPassword := flag.String("redis_passwd", lookupEnvOrString("REDIS_PASSWD", ""), "redis db password")
wsEnable := flag.Bool("ws_enable", lookupEnvOrBool("WS_ENABLE", false), "enable/disable Websocket listener")
wsPort := flag.String("ws_port", lookupEnvOrString("WS_PORT", ":80"), "port for Websocket listener")
httpEnable := flag.Bool("http_enable", lookupEnvOrBool("HTTP_ENABLE", false), "enable/disable HTTP listener of mqtt metrics")
httpPort := flag.String("http_port", lookupEnvOrString("HTTP_PORT", ":8080"), "port for HTTP listener of mqtt metrics")
logLevel := flag.Int("log_level", lookupEnvOrInt("LOG_LEVEL", 1), "0=DEBUG, 1=INFO, 2=WARNING, 3=ERROR")
natsUrl := flag.String("nats_url", lookupEnvOrString("NATS_URL", "nats://localhost:4222"), "url for nats server")
natsName := flag.String("nats_name", lookupEnvOrString("NATS_NAME", "adapter"), "name for nats client")
natsVerifyCertificates := flag.Bool("nats_verify_certificates", lookupEnvOrBool("NATS_VERIFY_CERTIFICATES", false), "verify validity of certificates from nats server")
flag.Parse()
flHelp := flag.Bool("help", false, "Help")
@ -62,12 +72,14 @@ func NewConfig() Config {
os.Exit(0)
}
ctx := context.TODO()
conf := Config{
MqttPort: *mqttPort,
Tls: *tls,
Fullchain: *fullchain,
Privkey: *privkey,
AuthFile: *authFile,
AuthEnable: *authEnable,
RedisEnable: *redisEnable,
RedisAddr: *redisAddr,
RedisPassword: *redisPassword,
@ -76,6 +88,12 @@ func NewConfig() Config {
HttpEnable: *httpEnable,
HttpPort: *httpPort,
LogLevel: *logLevel,
Nats: Nats{
Url: *natsUrl,
Name: *natsName,
VerifyCertificates: *natsVerifyCertificates,
Ctx: ctx,
},
}
conf.validate()

View File

@ -52,17 +52,18 @@ func StartServers(c config.Config) {
func newMqttServer(c config.Config) *broker.Mqtt {
return &broker.Mqtt{
Port: c.MqttPort,
Tls: c.Tls,
Fullchain: c.Fullchain,
Privkey: c.Privkey,
AuthFile: c.AuthFile,
Port: c.MqttPort,
Tls: c.Tls,
Fullchain: c.Fullchain,
Privkey: c.Privkey,
AuthEnable: c.AuthEnable,
Redis: broker.Redis{
RedisEnable: c.RedisEnable,
RedisAddr: c.RedisAddr,
RedisPassword: c.RedisPassword,
},
LogLevel: c.LogLevel,
Nats: c.Nats,
}
}

View File

@ -1,18 +1,28 @@
package mqtt
import (
"broker/internal/config"
"broker/internal/nats"
"bytes"
"context"
"log"
"strings"
"github.com/mochi-co/mqtt/v2"
"github.com/mochi-co/mqtt/v2/hooks/auth"
"github.com/mochi-co/mqtt/v2/packets"
"github.com/nats-io/nats.go/jetstream"
)
type MyHook struct {
mqtt.HookBase
}
type NatsAuthHook struct {
mqtt.HookBase
kv jetstream.KeyValue
}
func (h *MyHook) ID() string {
return "events-controller"
}
@ -86,3 +96,91 @@ func (h *MyHook) OnPacketEncode(cl *mqtt.Client, pk packets.Packet) packets.Pack
return pk
}
func (h *NatsAuthHook) ID() string {
return "device-auth"
}
func (h *NatsAuthHook) Provides(b byte) bool {
return bytes.Contains([]byte{
mqtt.OnConnectAuthenticate,
mqtt.OnACLCheck,
}, []byte{b})
}
func (h *NatsAuthHook) Init(c any) error {
_, _, kv := nats.StartNatsClient(c.(config.Nats))
h.kv = kv
h.Log.Info().Msg("initialised device auth nats hook")
return nil
}
func (h *NatsAuthHook) OnConnectAuthenticate(cl *mqtt.Client, pk packets.Packet) bool {
username := string(pk.Connect.Username)
entry, err := h.kv.Get(context.TODO(), username)
if err != nil {
if err == jetstream.ErrKeyNotFound {
log.Println("user access not found, blocked user:", username)
return false
}
log.Println("error getting key value: ", err)
return false
}
if bytes.Equal(entry.Value(), pk.Connect.Password) {
return true
}
return false
}
func (h *NatsAuthHook) OnACLCheck(cl *mqtt.Client, topic string, write bool) bool {
username := string(cl.Properties.Username)
_, err := h.kv.Get(context.TODO(), username)
if err != nil {
if err == jetstream.ErrKeyNotFound {
log.Println("user access not found, blocked user:", username)
return false
}
log.Println("error getting key value: ", err)
return false
}
if username == "oktopusController" {
return true
}
if !write {
deviceAllowedTopics := []string{
"oktopus/usp/v1/agent/" + username,
}
for _, allowedTopic := range deviceAllowedTopics {
_, ok := auth.MatchTopic(allowedTopic, topic)
if ok {
return true
}
}
return false
}
if write {
deviceAllowedTopics := []string{
"oktopus/usp/v1/controller",
"oktopus/usp/v1/api/" + username,
}
for _, allowedTopic := range deviceAllowedTopics {
_, ok := auth.MatchTopic(allowedTopic, topic)
if ok {
return true
}
}
}
return false
}

View File

@ -1,10 +1,10 @@
package mqtt
import (
"broker/internal/config"
"crypto/tls"
"io/ioutil"
"log"
"os"
rv8 "github.com/go-redis/redis/v8"
"github.com/google/uuid"
@ -20,13 +20,14 @@ var (
)
type Mqtt struct {
Port string
Tls bool
Fullchain string
Privkey string
AuthFile string
Redis Redis
LogLevel int
Port string
Tls bool
Fullchain string
Privkey string
Redis Redis
LogLevel int
Nats config.Nats
AuthEnable bool
}
type Redis struct {
@ -38,7 +39,7 @@ type Redis struct {
func (m *Mqtt) Start(mqttServer *mqtt.Server) {
defineSeverLog(mqttServer, m.LogLevel)
defineServerAuth(mqttServer, m.AuthFile)
defineServerAuth(mqttServer, m.Nats, m.AuthEnable)
server = mqttServer
@ -112,15 +113,9 @@ func defineServerTls(fullchain, privkey string) *listeners.Config {
return nil
}
func defineServerAuth(server *mqtt.Server, authFile string) {
if authFile != "" {
data, err := os.ReadFile(authFile)
if err != nil {
log.Fatal(err)
}
err = server.AddHook(new(auth.Hook), &auth.Options{
Data: data,
})
func defineServerAuth(server *mqtt.Server, natsConfig config.Nats, authEnable bool) {
if authEnable {
err := server.AddHook(new(NatsAuthHook), natsConfig)
if err != nil {
log.Fatal(err)
}

View File

@ -0,0 +1,82 @@
package nats
import (
"broker/internal/config"
"log"
"time"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)
const (
NATS_ACCOUNT_SUBJ_PREFIX = "account-manager.v1."
NATS_REQUEST_TIMEOUT = 10 * time.Second
NATS_MQTT_SUBJECT_PREFIX = "mqtt.usp.v1."
NATS_MQTT_ADAPTER_SUBJECT_PREFIX = "mqtt-adapter.usp.v1."
NATS_ADAPTER_SUBJECT = "adapter.usp.v1."
NATS_WS_SUBJECT_PREFIX = "ws.usp.v1."
NATS_WS_ADAPTER_SUBJECT_PREFIX = "ws-adapter.usp.v1."
DEVICE_SUBJECT_PREFIX = "device.usp.v1."
BUCKET_NAME = "devices-auth"
BUCKET_DESCRIPTION = "Devices authentication"
)
func StartNatsClient(c config.Nats) (jetstream.JetStream, *nats.Conn, jetstream.KeyValue) {
var (
nc *nats.Conn
err error
)
opts := defineOptions(c)
log.Printf("Connecting to NATS server %s", c.Url)
for {
nc, err = nats.Connect(c.Url, opts...)
if err != nil {
time.Sleep(5 * time.Second)
continue
}
break
}
log.Printf("Successfully connected to NATS server %s", c.Url)
js, err := jetstream.New(nc)
if err != nil {
log.Fatalf("Failed to create JetStream client: %v", err)
}
kv, err := js.CreateOrUpdateKeyValue(c.Ctx, jetstream.KeyValueConfig{
Bucket: BUCKET_NAME,
Description: BUCKET_DESCRIPTION,
})
if err != nil {
log.Fatalf("Failed to create KeyValue store: %v", err)
}
return js, nc, kv
}
func defineOptions(c config.Nats) []nats.Option {
var opts []nats.Option
opts = append(opts, nats.Name(c.Name))
opts = append(opts, nats.MaxReconnects(-1))
opts = append(opts, nats.ReconnectWait(5*time.Second))
opts = append(opts, nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
log.Printf("Got disconnected! Reason: %q\n", err)
}))
opts = append(opts, nats.ReconnectHandler(func(nc *nats.Conn) {
log.Printf("Got reconnected to %v!\n", nc.ConnectedUrl())
}))
opts = append(opts, nats.ClosedHandler(func(nc *nats.Conn) {
log.Printf("Connection closed. Reason: %q\n", nc.LastError())
}))
if c.VerifyCertificates {
opts = append(opts, nats.RootCAs())
}
return opts
}

View File

@ -0,0 +1 @@
.env.local

View File

@ -7,6 +7,7 @@ import (
"syscall"
"github.com/OktopUSP/oktopus/ws/internal/config"
"github.com/OktopUSP/oktopus/ws/internal/nats"
"github.com/OktopUSP/oktopus/ws/internal/ws"
)
@ -19,7 +20,9 @@ func main() {
// Locks app running until it receives a stop command as Ctrl+C.
signal.Notify(done, syscall.SIGINT)
ws.StartNewServer(conf)
_, kv := nats.StartNatsClient(conf.Nats)
ws.StartNewServer(conf, kv)
<-done

View File

@ -9,4 +9,13 @@ require (
google.golang.org/protobuf v1.32.0
)
require golang.org/x/net v0.17.0 // indirect
require (
github.com/klauspost/compress v1.17.2 // indirect
github.com/nats-io/nats.go v1.34.1 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
golang.org/x/crypto v0.18.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/text v0.14.0 // indirect
)

View File

@ -6,8 +6,22 @@ github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/
github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY=
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/nats-io/nats.go v1.34.1 h1:syWey5xaNHZgicYBemv0nohUPPmaLteiBEUT6Q5+F/4=
github.com/nats-io/nats.go v1.34.1/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc=
golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg=
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I=

View File

@ -2,6 +2,7 @@
package config
import (
"context"
"flag"
"log"
"os"
@ -13,9 +14,16 @@ import (
type Config struct {
Port string // server port: e.g. ":8080"
Auth bool // server auth enable/disable
Token string // controller auth token
ControllerEID string // controller endpoint id
Tls bool // enable/diable websockets server tls
Nats Nats
}
type Nats struct {
Url string
Name string
VerifyCertificates bool
Ctx context.Context
}
func NewConfig() Config {
@ -33,8 +41,10 @@ func NewConfig() Config {
*/
/* ------------------------------ define flags ------------------------------ */
natsUrl := flag.String("nats_url", lookupEnvOrString("NATS_URL", "nats://localhost:4222"), "url for nats server")
natsName := flag.String("nats_name", lookupEnvOrString("NATS_NAME", "ws-adapter"), "name for nats client")
natsVerifyCertificates := flag.Bool("nats_verify_certificates", lookupEnvOrBool("NATS_VERIFY_CERTIFICATES", false), "verify validity of certificates from nats server")
flPort := flag.String("port", lookupEnvOrString("SERVER_PORT", ":8080"), "Server port")
flToken := flag.String("token", lookupEnvOrString("SERVER_AUTH_TOKEN", ""), "Controller auth token")
flAuth := flag.Bool("auth", lookupEnvOrBool("SERVER_AUTH_ENABLE", false), "Server auth enable/disable")
flControllerEid := flag.String("controller-eid", lookupEnvOrString("CONTROLLER_EID", "oktopusController"), "Controller eid")
flTls := flag.Bool("tls", lookupEnvOrBool("SERVER_TLS_ENABLE", false), "Enable/diable websockets server tls")
@ -47,12 +57,19 @@ func NewConfig() Config {
os.Exit(0)
}
ctx := context.TODO()
return Config{
Port: *flPort,
Token: *flToken,
Auth: *flAuth,
ControllerEID: *flControllerEid,
Tls: *flTls,
Nats: Nats{
Url: *natsUrl,
Name: *natsName,
VerifyCertificates: *natsVerifyCertificates,
Ctx: ctx,
},
}
}

View File

@ -0,0 +1,74 @@
package nats
import (
"log"
"time"
"github.com/OktopUSP/oktopus/ws/internal/config"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)
const (
BUCKET_NAME = "devices-auth"
BUCKET_DESCRIPTION = "Devices authentication"
)
func StartNatsClient(c config.Nats) (*nats.Conn, jetstream.KeyValue) {
var (
nc *nats.Conn
err error
)
opts := defineOptions(c)
log.Printf("Connecting to NATS server %s", c.Url)
for {
nc, err = nats.Connect(c.Url, opts...)
if err != nil {
time.Sleep(5 * time.Second)
continue
}
break
}
log.Printf("Successfully connected to NATS server %s", c.Url)
js, err := jetstream.New(nc)
if err != nil {
log.Fatalf("Failed to create JetStream client: %v", err)
}
kv, err := js.CreateOrUpdateKeyValue(c.Ctx, jetstream.KeyValueConfig{
Bucket: BUCKET_NAME,
Description: BUCKET_DESCRIPTION,
})
if err != nil {
log.Fatalf("Failed to create KeyValue store: %v", err)
}
return nc, kv
}
func defineOptions(c config.Nats) []nats.Option {
var opts []nats.Option
opts = append(opts, nats.Name(c.Name))
opts = append(opts, nats.MaxReconnects(-1))
opts = append(opts, nats.ReconnectWait(5*time.Second))
opts = append(opts, nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
log.Printf("Got disconnected! Reason: %q\n", err)
}))
opts = append(opts, nats.ReconnectHandler(func(nc *nats.Conn) {
log.Printf("Got reconnected to %v!\n", nc.ConnectedUrl())
}))
opts = append(opts, nats.ClosedHandler(func(nc *nats.Conn) {
log.Printf("Connection closed. Reason: %q\n", nc.LastError())
}))
if c.VerifyCertificates {
opts = append(opts, nats.RootCAs())
}
return opts
}

View File

@ -7,7 +7,9 @@ import (
"time"
"github.com/OktopUSP/oktopus/ws/internal/usp_record"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
"github.com/nats-io/nats.go/jetstream"
"google.golang.org/protobuf/proto"
)
@ -148,10 +150,26 @@ func (c *Client) writePump() {
}
// Handle USP Controller events
func ServeController(w http.ResponseWriter, r *http.Request, token, cEID string, authEnable bool) {
func ServeController(
w http.ResponseWriter,
r *http.Request,
cEID string,
authEnable bool,
kv jetstream.KeyValue,
) {
if authEnable {
entry, err := kv.Get(r.Context(), cEID)
if err != nil {
if err == jetstream.ErrKeyNotFound {
w.WriteHeader(http.StatusUnauthorized)
return
}
log.Println("Nats kv error:", err)
w.Write([]byte("Nats kv error:" + err.Error()))
return
}
recv_token := r.URL.Query().Get("token")
if recv_token != token {
if recv_token != string(entry.Value()) {
w.WriteHeader(http.StatusUnauthorized)
w.Write([]byte("Unauthorized"))
return
@ -172,7 +190,13 @@ func ServeController(w http.ResponseWriter, r *http.Request, token, cEID string,
}
// Handle USP Agent events, cEID = controller endpoint id
func ServeAgent(w http.ResponseWriter, r *http.Request, cEID string) {
func ServeAgent(
w http.ResponseWriter,
r *http.Request,
cEID string,
kv jetstream.KeyValue,
authEnable bool,
) {
header := http.Header{
"Sec-Websocket-Protocol": {uspVersion},
@ -187,6 +211,24 @@ func ServeAgent(w http.ResponseWriter, r *http.Request, cEID string) {
return
}
if authEnable {
entry, err := kv.Get(r.Context(), deviceid)
if err != nil {
if err == jetstream.ErrKeyNotFound {
w.WriteHeader(http.StatusUnauthorized)
return
}
log.Println("Nats kv error:", err)
w.Write([]byte("Nats kv error:" + err.Error()))
return
}
if mux.Vars(r)["passwd"] != string(entry.Value()) {
w.WriteHeader(http.StatusUnauthorized)
return
}
}
conn, err := upgrader.Upgrade(w, r, header)
if err != nil {
log.Println(err)

View File

@ -9,19 +9,23 @@ import (
"github.com/OktopUSP/oktopus/ws/internal/config"
"github.com/OktopUSP/oktopus/ws/internal/ws/handler"
"github.com/gorilla/mux"
"github.com/nats-io/nats.go/jetstream"
)
// Starts New Websockets Server
func StartNewServer(c config.Config) {
func StartNewServer(c config.Config, kv jetstream.KeyValue) {
// Initialize handlers of websockets events
go handler.InitHandlers(c.ControllerEID)
r := mux.NewRouter()
r.HandleFunc("/ws/agent/{passwd}", func(w http.ResponseWriter, r *http.Request) {
handler.ServeAgent(w, r, c.ControllerEID, kv, c.Auth)
})
r.HandleFunc("/ws/agent", func(w http.ResponseWriter, r *http.Request) {
handler.ServeAgent(w, r, c.ControllerEID)
handler.ServeAgent(w, r, c.ControllerEID, kv, c.Auth)
})
r.HandleFunc("/ws/controller", func(w http.ResponseWriter, r *http.Request) {
handler.ServeController(w, r, c.Token, c.ControllerEID, c.Auth)
handler.ServeController(w, r, c.ControllerEID, c.Auth, kv)
})
go func() {