commit
106ed28d50
|
|
@ -1,59 +1,23 @@
|
||||||
version: 2.1
|
version: 2.1
|
||||||
executors:
|
|
||||||
nodejs:
|
|
||||||
docker:
|
|
||||||
- image: cimg/node:18.17.1
|
|
||||||
golang:
|
|
||||||
docker:
|
|
||||||
- image: cimg/go:1.20.7
|
|
||||||
|
|
||||||
jobs:
|
jobs:
|
||||||
|
release:
|
||||||
build_and_deploy_controller:
|
docker:
|
||||||
executor: golang
|
- image: cimg/base:2022.09
|
||||||
|
auth:
|
||||||
|
username: $DOCKERHUB_USERNAME
|
||||||
|
password: $DOCKERHUB_PASSWORD
|
||||||
steps:
|
steps:
|
||||||
- checkout
|
- checkout
|
||||||
|
- setup_remote_docker
|
||||||
- run:
|
- run:
|
||||||
name: Build and Deploy Controller
|
name: Build and Push application Docker image
|
||||||
command: |
|
command: |
|
||||||
cd backend/services/controller && go build -o controller cmd/oktopus/main.go
|
echo $DOCKERHUB_PASSWORD | docker login -u $DOCKERHUB_USERNAME --password-stdin
|
||||||
scp -o StrictHostKeyChecking=no controller $SSH_USER@$SSH_HOST:/home/$SSH_USER
|
cd build && make release
|
||||||
ssh -o StrictHostKeyChecking=no $SSH_USER@$SSH_HOST "sudo mv controller /usr/bin/ && sudo systemctl restart controller"
|
|
||||||
|
|
||||||
build_and_deploy_mochi:
|
|
||||||
executor: golang
|
|
||||||
steps:
|
|
||||||
- checkout
|
|
||||||
- run:
|
|
||||||
name: Build and Deploy Mochi
|
|
||||||
command: |
|
|
||||||
cd backend/services/mochi/ && go build -o mochi cmd/main.go
|
|
||||||
scp -o StrictHostKeyChecking=no mochi $SSH_USER@$SSH_HOST:/home/$SSH_USER
|
|
||||||
ssh -o StrictHostKeyChecking=no $SSH_USER@$SSH_HOST "sudo mv mochi /usr/bin/ && sudo systemctl restart mochi"
|
|
||||||
|
|
||||||
build_and_deploy_frontend:
|
|
||||||
executor: nodejs
|
|
||||||
steps:
|
|
||||||
- checkout
|
|
||||||
- run:
|
|
||||||
name: Build and Deploy Frontend
|
|
||||||
command: |
|
|
||||||
cd frontend && npm i && npm run build
|
|
||||||
scp -o StrictHostKeyChecking=no -r .next/ $SSH_USER@$SSH_HOST:/home/$SSH_USER/oktopus/frontend
|
|
||||||
ssh -o StrictHostKeyChecking=no $SSH_USER@$SSH_HOST "pm2 restart oktopus"
|
|
||||||
|
|
||||||
workflows:
|
workflows:
|
||||||
build_and_deploy:
|
release:
|
||||||
jobs:
|
jobs:
|
||||||
- build_and_deploy_controller:
|
- release:
|
||||||
filters:
|
|
||||||
branches:
|
|
||||||
only: main
|
|
||||||
- build_and_deploy_mochi:
|
|
||||||
filters:
|
|
||||||
branches:
|
|
||||||
only: main
|
|
||||||
- build_and_deploy_frontend:
|
|
||||||
filters:
|
filters:
|
||||||
branches:
|
branches:
|
||||||
only: main
|
only: main
|
||||||
|
|
@ -20,13 +20,13 @@ func main() {
|
||||||
|
|
||||||
c := config.NewConfig()
|
c := config.NewConfig()
|
||||||
|
|
||||||
js, nc := nats.StartNatsClient(c.Nats)
|
js, nc, kv := nats.StartNatsClient(c.Nats)
|
||||||
|
|
||||||
bridge := bridge.NewBridge(js, nc)
|
bridge := bridge.NewBridge(js, nc)
|
||||||
|
|
||||||
db := db.NewDatabase(c.Mongo.Ctx, c.Mongo.Uri)
|
db := db.NewDatabase(c.Mongo.Ctx, c.Mongo.Uri)
|
||||||
|
|
||||||
api := api.NewApi(c.RestApi, js, nc, bridge, db)
|
api := api.NewApi(c.RestApi, js, nc, bridge, db, kv)
|
||||||
api.StartApi()
|
api.StartApi()
|
||||||
|
|
||||||
<-done
|
<-done
|
||||||
|
|
|
||||||
|
|
@ -22,6 +22,7 @@ type Api struct {
|
||||||
nc *nats.Conn
|
nc *nats.Conn
|
||||||
bridge bridge.Bridge
|
bridge bridge.Bridge
|
||||||
db db.Database
|
db db.Database
|
||||||
|
kv jetstream.KeyValue
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -32,7 +33,7 @@ const (
|
||||||
AdminUser
|
AdminUser
|
||||||
)
|
)
|
||||||
|
|
||||||
func NewApi(c config.RestApi, js jetstream.JetStream, nc *nats.Conn, bridge bridge.Bridge, d db.Database) Api {
|
func NewApi(c config.RestApi, js jetstream.JetStream, nc *nats.Conn, bridge bridge.Bridge, d db.Database, kv jetstream.KeyValue) Api {
|
||||||
return Api{
|
return Api{
|
||||||
port: c.Port,
|
port: c.Port,
|
||||||
js: js,
|
js: js,
|
||||||
|
|
@ -40,6 +41,7 @@ func NewApi(c config.RestApi, js jetstream.JetStream, nc *nats.Conn, bridge brid
|
||||||
ctx: c.Ctx,
|
ctx: c.Ctx,
|
||||||
bridge: bridge,
|
bridge: bridge,
|
||||||
db: d,
|
db: d,
|
||||||
|
kv: kv,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -51,6 +53,7 @@ func (a *Api) StartApi() {
|
||||||
authentication.HandleFunc("/admin/register", a.registerAdminUser).Methods("POST")
|
authentication.HandleFunc("/admin/register", a.registerAdminUser).Methods("POST")
|
||||||
authentication.HandleFunc("/admin/exists", a.adminUserExists).Methods("GET")
|
authentication.HandleFunc("/admin/exists", a.adminUserExists).Methods("GET")
|
||||||
iot := r.PathPrefix("/api/device").Subrouter()
|
iot := r.PathPrefix("/api/device").Subrouter()
|
||||||
|
iot.HandleFunc("/auth", a.deviceAuth).Methods("GET", "PUT", "DELETE")
|
||||||
iot.HandleFunc("", a.retrieveDevices).Methods("GET")
|
iot.HandleFunc("", a.retrieveDevices).Methods("GET")
|
||||||
iot.HandleFunc("/{id}", a.retrieveDevices).Methods("GET")
|
iot.HandleFunc("/{id}", a.retrieveDevices).Methods("GET")
|
||||||
iot.HandleFunc("/{sn}/{mtp}/get", a.deviceGetMsg).Methods("PUT")
|
iot.HandleFunc("/{sn}/{mtp}/get", a.deviceGetMsg).Methods("PUT")
|
||||||
|
|
@ -77,10 +80,6 @@ func (a *Api) StartApi() {
|
||||||
return middleware.Middleware(handler)
|
return middleware.Middleware(handler)
|
||||||
})
|
})
|
||||||
|
|
||||||
// mtp.Use(func(handler http.Handler) http.Handler {
|
|
||||||
// return middleware.Middleware(handler)
|
|
||||||
// })
|
|
||||||
|
|
||||||
dash.Use(func(handler http.Handler) http.Handler {
|
dash.Use(func(handler http.Handler) http.Handler {
|
||||||
return middleware.Middleware(handler)
|
return middleware.Middleware(handler)
|
||||||
})
|
})
|
||||||
|
|
|
||||||
|
|
@ -6,6 +6,8 @@ import (
|
||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
|
"github.com/leandrofars/oktopus/internal/utils"
|
||||||
|
"github.com/nats-io/nats.go/jetstream"
|
||||||
"go.mongodb.org/mongo-driver/bson"
|
"go.mongodb.org/mongo-driver/bson"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -102,3 +104,105 @@ func (a *Api) retrieveDevices(w http.ResponseWriter, r *http.Request) {
|
||||||
log.Println(err)
|
log.Println(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type DeviceAuth struct {
|
||||||
|
User string `json:"id"`
|
||||||
|
Password string `json:"password"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *Api) deviceAuth(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if r.Method == http.MethodGet {
|
||||||
|
|
||||||
|
id := r.URL.Query().Get("id")
|
||||||
|
if id != "" {
|
||||||
|
entry, err := a.kv.Get(r.Context(), id)
|
||||||
|
if err != nil {
|
||||||
|
if err == jetstream.ErrKeyNotFound {
|
||||||
|
w.WriteHeader(http.StatusNotFound)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
utils.MarshallEncoder(err, w)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
utils.MarshallEncoder(map[string]string{
|
||||||
|
id: string(entry.Value()),
|
||||||
|
}, w)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
entries, err := a.kv.ListKeys(r.Context(), jetstream.IgnoreDeletes())
|
||||||
|
if err != nil {
|
||||||
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
utils.MarshallEncoder(err, w)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
listOfKeys := make(map[string]string)
|
||||||
|
|
||||||
|
keys := entries.Keys()
|
||||||
|
for key := range keys {
|
||||||
|
entry, err := a.kv.Get(r.Context(), key)
|
||||||
|
if err != nil {
|
||||||
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
utils.MarshallEncoder(err, w)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
/*listOfKeys = append(listOfKeys, map[string]string{
|
||||||
|
key: string(entry.Value()),
|
||||||
|
})*/
|
||||||
|
listOfKeys[key] = string(entry.Value())
|
||||||
|
}
|
||||||
|
|
||||||
|
utils.MarshallEncoder(listOfKeys, w)
|
||||||
|
|
||||||
|
} else if r.Method == http.MethodDelete {
|
||||||
|
|
||||||
|
id := r.URL.Query().Get("id")
|
||||||
|
if id != "" {
|
||||||
|
err := a.kv.Purge(r.Context(), id)
|
||||||
|
if err != nil {
|
||||||
|
if err == jetstream.ErrKeyNotFound {
|
||||||
|
w.WriteHeader(http.StatusNotFound)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
utils.MarshallEncoder(err, w)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w.WriteHeader(http.StatusBadRequest)
|
||||||
|
utils.MarshallEncoder("No id provided", w)
|
||||||
|
|
||||||
|
} else if r.Method == http.MethodPut {
|
||||||
|
|
||||||
|
var deviceAuth DeviceAuth
|
||||||
|
|
||||||
|
err := utils.MarshallDecoder(&deviceAuth, r.Body)
|
||||||
|
if err != nil {
|
||||||
|
w.WriteHeader(http.StatusBadRequest)
|
||||||
|
utils.MarshallEncoder(err, w)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if deviceAuth.User != "" {
|
||||||
|
_, err := a.kv.PutString(r.Context(), deviceAuth.User, deviceAuth.Password)
|
||||||
|
if err != nil {
|
||||||
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
utils.MarshallEncoder(err, w)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
w.WriteHeader(http.StatusBadRequest)
|
||||||
|
utils.MarshallEncoder("device must have a user", w)
|
||||||
|
|
||||||
|
} else {
|
||||||
|
log.Println("Unknown method used in device auth api")
|
||||||
|
w.WriteHeader(http.StatusMethodNotAllowed)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -18,9 +18,11 @@ const (
|
||||||
NATS_WS_SUBJECT_PREFIX = "ws.usp.v1."
|
NATS_WS_SUBJECT_PREFIX = "ws.usp.v1."
|
||||||
NATS_WS_ADAPTER_SUBJECT_PREFIX = "ws-adapter.usp.v1."
|
NATS_WS_ADAPTER_SUBJECT_PREFIX = "ws-adapter.usp.v1."
|
||||||
DEVICE_SUBJECT_PREFIX = "device.usp.v1."
|
DEVICE_SUBJECT_PREFIX = "device.usp.v1."
|
||||||
|
BUCKET_NAME = "devices-auth"
|
||||||
|
BUCKET_DESCRIPTION = "Devices authentication"
|
||||||
)
|
)
|
||||||
|
|
||||||
func StartNatsClient(c config.Nats) (jetstream.JetStream, *nats.Conn) {
|
func StartNatsClient(c config.Nats) (jetstream.JetStream, *nats.Conn, jetstream.KeyValue) {
|
||||||
|
|
||||||
var (
|
var (
|
||||||
nc *nats.Conn
|
nc *nats.Conn
|
||||||
|
|
@ -46,7 +48,15 @@ func StartNatsClient(c config.Nats) (jetstream.JetStream, *nats.Conn) {
|
||||||
log.Fatalf("Failed to create JetStream client: %v", err)
|
log.Fatalf("Failed to create JetStream client: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return js, nc
|
kv, err := js.CreateOrUpdateKeyValue(c.Ctx, jetstream.KeyValueConfig{
|
||||||
|
Bucket: BUCKET_NAME,
|
||||||
|
Description: BUCKET_DESCRIPTION,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to create KeyValue store: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return js, nc, kv
|
||||||
}
|
}
|
||||||
|
|
||||||
func defineOptions(c config.Nats) []nats.Option {
|
func defineOptions(c config.Nats) []nats.Option {
|
||||||
|
|
|
||||||
|
|
@ -22,9 +22,11 @@ func MarshallEncoder(data any, w io.Writer) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func MarshallDecoder(data any, r io.Reader) {
|
func MarshallDecoder(data any, r io.Reader) error {
|
||||||
err := json.NewDecoder(r).Decode(data)
|
err := json.NewDecoder(r).Decode(data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("Error to decode message into json: %q", err)
|
log.Printf("Error to decode message into json: %q", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1 +1,2 @@
|
||||||
MONGO_URI="mongodb://172.16.235.2:27017"
|
MONGO_URI="mongodb://localhost:27017"
|
||||||
|
CONTROLLER_PASSWORD="test123"
|
||||||
|
|
|
||||||
|
|
@ -20,11 +20,11 @@ func main() {
|
||||||
|
|
||||||
c := config.NewConfig()
|
c := config.NewConfig()
|
||||||
|
|
||||||
js, nc := nats.StartNatsClient(c.Nats)
|
js, nc := nats.StartNatsClient(c.Nats, c.Controller)
|
||||||
|
|
||||||
db := db.NewDatabase(c.Mongo.Ctx, c.Mongo.Uri)
|
db := db.NewDatabase(c.Mongo.Ctx, c.Mongo.Uri)
|
||||||
|
|
||||||
handler := handler.NewHandler(nc, js, db, c.ControllerId)
|
handler := handler.NewHandler(nc, js, db, c.Controller.ControllerId)
|
||||||
|
|
||||||
events.StartEventsListener(c.Nats.Ctx, js, handler)
|
events.StartEventsListener(c.Nats.Ctx, js, handler)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -24,10 +24,15 @@ type Mongo struct {
|
||||||
Ctx context.Context
|
Ctx context.Context
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type Controller struct {
|
||||||
|
ControllerId string
|
||||||
|
ControllerPasswd string
|
||||||
|
}
|
||||||
|
|
||||||
type Config struct {
|
type Config struct {
|
||||||
Nats Nats
|
Nats Nats
|
||||||
Mongo Mongo
|
Mongo Mongo
|
||||||
ControllerId string
|
Controller Controller
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewConfig() *Config {
|
func NewConfig() *Config {
|
||||||
|
|
@ -40,6 +45,7 @@ func NewConfig() *Config {
|
||||||
natsVerifyCertificates := flag.Bool("nats_verify_certificates", lookupEnvOrBool("NATS_VERIFY_CERTIFICATES", false), "verify validity of certificates from nats server")
|
natsVerifyCertificates := flag.Bool("nats_verify_certificates", lookupEnvOrBool("NATS_VERIFY_CERTIFICATES", false), "verify validity of certificates from nats server")
|
||||||
mongoUri := flag.String("mongo_uri", lookupEnvOrString("MONGO_URI", "mongodb://localhost:27017"), "uri for mongodb server")
|
mongoUri := flag.String("mongo_uri", lookupEnvOrString("MONGO_URI", "mongodb://localhost:27017"), "uri for mongodb server")
|
||||||
controllerId := flag.String("controller_id", lookupEnvOrString("CONTROLLER_ID", "oktopusController"), "usp controller endpoint id")
|
controllerId := flag.String("controller_id", lookupEnvOrString("CONTROLLER_ID", "oktopusController"), "usp controller endpoint id")
|
||||||
|
controllerPassword := flag.String("controller_passwd", lookupEnvOrString("CONTROLLER_PASSWORD", ""), "usp controller endpoint password to connect to")
|
||||||
flHelp := flag.Bool("help", false, "Help")
|
flHelp := flag.Bool("help", false, "Help")
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
@ -69,7 +75,10 @@ func NewConfig() *Config {
|
||||||
Uri: *mongoUri,
|
Uri: *mongoUri,
|
||||||
Ctx: ctx,
|
Ctx: ctx,
|
||||||
},
|
},
|
||||||
ControllerId: *controllerId,
|
Controller: Controller{
|
||||||
|
ControllerId: *controllerId,
|
||||||
|
ControllerPasswd: *controllerPassword,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -12,16 +12,18 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
MQTT_STREAM_NAME = "mqtt"
|
MQTT_STREAM_NAME = "mqtt"
|
||||||
WS_STREAM_NAME = "ws"
|
WS_STREAM_NAME = "ws"
|
||||||
STOMP_STREAM_NAME = "stomp"
|
STOMP_STREAM_NAME = "stomp"
|
||||||
LORA_STREAM_NAME = "lora"
|
LORA_STREAM_NAME = "lora"
|
||||||
OPC_STREAM_NAME = "opc"
|
OPC_STREAM_NAME = "opc"
|
||||||
ADAPTER_SUBJECT = "adapter" + USP_SUBJECT
|
ADAPTER_SUBJECT = "adapter" + USP_SUBJECT
|
||||||
USP_SUBJECT = ".usp.v1."
|
USP_SUBJECT = ".usp.v1."
|
||||||
|
BUCKET_NAME = "devices-auth"
|
||||||
|
BUCKET_DESCRIPTION = "Devices authentication"
|
||||||
)
|
)
|
||||||
|
|
||||||
func StartNatsClient(c config.Nats) (jetstream.JetStream, *nats.Conn) {
|
func StartNatsClient(c config.Nats, controller config.Controller) (jetstream.JetStream, *nats.Conn) {
|
||||||
|
|
||||||
var (
|
var (
|
||||||
nc *nats.Conn
|
nc *nats.Conn
|
||||||
|
|
@ -59,9 +61,26 @@ func StartNatsClient(c config.Nats) (jetstream.JetStream, *nats.Conn) {
|
||||||
log.Fatalf("Failed to create Consumer: %v", err)
|
log.Fatalf("Failed to create Consumer: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
createKeyValueStores(c.Ctx, js, controller)
|
||||||
|
|
||||||
return js, nc
|
return js, nc
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func createKeyValueStores(ctx context.Context, js jetstream.JetStream, control config.Controller) {
|
||||||
|
kv, err := js.CreateOrUpdateKeyValue(ctx, jetstream.KeyValueConfig{
|
||||||
|
Bucket: BUCKET_NAME,
|
||||||
|
Description: BUCKET_DESCRIPTION,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to create KeyValue store: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = kv.PutString(ctx, control.ControllerId, control.ControllerPasswd)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Failed to create KeyValue store: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func createStreams(ctx context.Context, js jetstream.JetStream, streams []string) error {
|
func createStreams(ctx context.Context, js jetstream.JetStream, streams []string) error {
|
||||||
for _, stream := range streams {
|
for _, stream := range streams {
|
||||||
_, err := js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
|
_, err := js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
|
||||||
|
|
|
||||||
0
backend/services/mtp/mqtt-adapter/.env
Normal file
0
backend/services/mtp/mqtt-adapter/.env
Normal file
1
backend/services/mtp/mqtt-adapter/.gitignore
vendored
Normal file
1
backend/services/mtp/mqtt-adapter/.gitignore
vendored
Normal file
|
|
@ -0,0 +1 @@
|
||||||
|
.env.local
|
||||||
|
|
@ -43,7 +43,7 @@ func NewConfig() *Config {
|
||||||
natsVerifyCertificates := flag.Bool("nats_verify_certificates", lookupEnvOrBool("NATS_VERIFY_CERTIFICATES", false), "verify validity of certificates from nats server")
|
natsVerifyCertificates := flag.Bool("nats_verify_certificates", lookupEnvOrBool("NATS_VERIFY_CERTIFICATES", false), "verify validity of certificates from nats server")
|
||||||
mqttUrl := flag.String("mqtt_url", lookupEnvOrString("MQTT_URL", "tcp://localhost:1883"), "url for mqtt server")
|
mqttUrl := flag.String("mqtt_url", lookupEnvOrString("MQTT_URL", "tcp://localhost:1883"), "url for mqtt server")
|
||||||
mqttClientId := flag.String("mqtt_client_id", lookupEnvOrString("MQTT_CLIENT_ID", "mqtt-adapter"), "client id for mqtt")
|
mqttClientId := flag.String("mqtt_client_id", lookupEnvOrString("MQTT_CLIENT_ID", "mqtt-adapter"), "client id for mqtt")
|
||||||
mqttUsername := flag.String("mqtt_username", lookupEnvOrString("MQTT_USERNAME", ""), "username for mqtt")
|
mqttUsername := flag.String("mqtt_username", lookupEnvOrString("MQTT_USERNAME", "oktopusController"), "username for mqtt")
|
||||||
mqttPassword := flag.String("mqtt_password", lookupEnvOrString("MQTT_PASSWORD", ""), "password for mqtt")
|
mqttPassword := flag.String("mqtt_password", lookupEnvOrString("MQTT_PASSWORD", ""), "password for mqtt")
|
||||||
mqttQos := flag.Int("mqtt_qos", lookupEnvOrInt("MQTT_QOS", 1), "quality of service for mqtt")
|
mqttQos := flag.Int("mqtt_qos", lookupEnvOrInt("MQTT_QOS", 1), "quality of service for mqtt")
|
||||||
flHelp := flag.Bool("help", false, "Help")
|
flHelp := flag.Bool("help", false, "Help")
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,5 @@
|
||||||
- This MQTT broker runs with https://github.com/mochi-mqtt/server
|
- This MQTT broker runs with https://github.com/mochi-mqtt/server
|
||||||
- It's compatible with MQTTv5 and MQTTv3.1.1
|
- It's compatible with MQTTv5 and MQTTv3.1.1
|
||||||
|
- At authentication, the device is only allowed to subscribe and publish to topics that finish with their username
|
||||||
|
|
||||||
For more info access https://github.com/mochi-mqtt/server and/or execute "go run cmd/mqtt/main.go --help"
|
For more info access https://github.com/mochi-mqtt/server and/or execute "go run cmd/mqtt/main.go --help"
|
||||||
|
|
@ -1,54 +0,0 @@
|
||||||
{
|
|
||||||
"auth": [
|
|
||||||
{
|
|
||||||
"username": "leandro",
|
|
||||||
"password": "leandro",
|
|
||||||
"allow": true
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"username": "steve",
|
|
||||||
"password": "steve",
|
|
||||||
"allow": true
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"username": "root",
|
|
||||||
"password": "root",
|
|
||||||
"allow": true
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"remote": "*",
|
|
||||||
"allow": false
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"remote": "*",
|
|
||||||
"allow": false
|
|
||||||
}
|
|
||||||
],
|
|
||||||
"acl": [
|
|
||||||
{
|
|
||||||
"remote": "*"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"username": "leandro",
|
|
||||||
"filters": {
|
|
||||||
"oktopus/+/agent/+": 1,
|
|
||||||
"oktopus/+/controller/+": 2,
|
|
||||||
"oktopus/+/get/+": 2
|
|
||||||
}
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"username": "steve",
|
|
||||||
"filters": {
|
|
||||||
"oktopus/+/agent/+": 1,
|
|
||||||
"oktopus/+/controller/+": 2,
|
|
||||||
"oktopus/+/get/+": 2
|
|
||||||
}
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"username": "root",
|
|
||||||
"filters": {
|
|
||||||
"#": 3
|
|
||||||
}
|
|
||||||
}
|
|
||||||
]
|
|
||||||
}
|
|
||||||
|
|
@ -1 +0,0 @@
|
||||||
go run . -path auth.json
|
|
||||||
|
|
@ -14,9 +14,15 @@ require (
|
||||||
github.com/google/uuid v1.6.0 // indirect
|
github.com/google/uuid v1.6.0 // indirect
|
||||||
github.com/gorilla/websocket v1.5.0 // indirect
|
github.com/gorilla/websocket v1.5.0 // indirect
|
||||||
github.com/joho/godotenv v1.5.1 // indirect
|
github.com/joho/godotenv v1.5.1 // indirect
|
||||||
|
github.com/klauspost/compress v1.17.2 // indirect
|
||||||
github.com/mattn/go-colorable v0.1.12 // indirect
|
github.com/mattn/go-colorable v0.1.12 // indirect
|
||||||
github.com/mattn/go-isatty v0.0.14 // indirect
|
github.com/mattn/go-isatty v0.0.14 // indirect
|
||||||
|
github.com/nats-io/nats.go v1.34.1 // indirect
|
||||||
|
github.com/nats-io/nkeys v0.4.7 // indirect
|
||||||
|
github.com/nats-io/nuid v1.0.1 // indirect
|
||||||
github.com/rs/xid v1.4.0 // indirect
|
github.com/rs/xid v1.4.0 // indirect
|
||||||
golang.org/x/sys v0.5.0 // indirect
|
golang.org/x/crypto v0.18.0 // indirect
|
||||||
|
golang.org/x/sys v0.16.0 // indirect
|
||||||
|
golang.org/x/text v0.14.0 // indirect
|
||||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||||
)
|
)
|
||||||
|
|
|
||||||
|
|
@ -17,12 +17,20 @@ github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/ad
|
||||||
github.com/jinzhu/copier v0.3.5 h1:GlvfUwHk62RokgqVNvYsku0TATCF7bAHVwEXoBh3iJg=
|
github.com/jinzhu/copier v0.3.5 h1:GlvfUwHk62RokgqVNvYsku0TATCF7bAHVwEXoBh3iJg=
|
||||||
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
|
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
|
||||||
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
|
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
|
||||||
|
github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
|
||||||
|
github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
|
||||||
github.com/mattn/go-colorable v0.1.12 h1:jF+Du6AlPIjs2BiUiQlKOX0rt3SujHxPnksPKZbaA40=
|
github.com/mattn/go-colorable v0.1.12 h1:jF+Du6AlPIjs2BiUiQlKOX0rt3SujHxPnksPKZbaA40=
|
||||||
github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4=
|
github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4=
|
||||||
github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9Y=
|
github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9Y=
|
||||||
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
|
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
|
||||||
github.com/mochi-co/mqtt/v2 v2.2.16 h1:CBqbxFhExzASNjj4BjSei0hYY1F5N5IeDqNVhjN+tp8=
|
github.com/mochi-co/mqtt/v2 v2.2.16 h1:CBqbxFhExzASNjj4BjSei0hYY1F5N5IeDqNVhjN+tp8=
|
||||||
github.com/mochi-co/mqtt/v2 v2.2.16/go.mod h1:MDMTThFgWj/LjJ6wc51bP5l4xnJG/ahpc9tR9vZVf8Q=
|
github.com/mochi-co/mqtt/v2 v2.2.16/go.mod h1:MDMTThFgWj/LjJ6wc51bP5l4xnJG/ahpc9tR9vZVf8Q=
|
||||||
|
github.com/nats-io/nats.go v1.34.1 h1:syWey5xaNHZgicYBemv0nohUPPmaLteiBEUT6Q5+F/4=
|
||||||
|
github.com/nats-io/nats.go v1.34.1/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
|
||||||
|
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
|
||||||
|
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
|
||||||
|
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
|
||||||
|
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
|
||||||
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
|
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
|
||||||
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
|
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
|
||||||
github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
|
github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
|
||||||
|
|
@ -34,12 +42,18 @@ github.com/rs/zerolog v1.29.1 h1:cO+d60CHkknCbvzEWxP0S9K6KqyTjrCNUy1LdQLCGPc=
|
||||||
github.com/rs/zerolog v1.29.1/go.mod h1:Le6ESbR7hc+DP6Lt1THiV8CQSdkkNrd3R0XbEgp3ZBU=
|
github.com/rs/zerolog v1.29.1/go.mod h1:Le6ESbR7hc+DP6Lt1THiV8CQSdkkNrd3R0XbEgp3ZBU=
|
||||||
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
|
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
|
||||||
github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 h1:k/gmLsJDWwWqbLCur2yWnJzwQEKRcAHXo6seXGuSwWw=
|
github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 h1:k/gmLsJDWwWqbLCur2yWnJzwQEKRcAHXo6seXGuSwWw=
|
||||||
|
golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc=
|
||||||
|
golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg=
|
||||||
golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g=
|
golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g=
|
||||||
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU=
|
golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU=
|
||||||
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
|
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
|
||||||
|
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||||
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
|
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
|
||||||
|
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
|
||||||
|
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
|
||||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
|
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
|
||||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||||
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
|
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
package config
|
package config
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"flag"
|
"flag"
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
|
|
@ -16,7 +17,7 @@ type Config struct {
|
||||||
Tls bool
|
Tls bool
|
||||||
Fullchain string
|
Fullchain string
|
||||||
Privkey string
|
Privkey string
|
||||||
AuthFile string
|
AuthEnable bool
|
||||||
RedisEnable bool
|
RedisEnable bool
|
||||||
RedisAddr string
|
RedisAddr string
|
||||||
RedisPassword string
|
RedisPassword string
|
||||||
|
|
@ -25,9 +26,15 @@ type Config struct {
|
||||||
HttpEnable bool
|
HttpEnable bool
|
||||||
HttpPort string
|
HttpPort string
|
||||||
LogLevel int
|
LogLevel int
|
||||||
|
Nats Nats
|
||||||
}
|
}
|
||||||
|
|
||||||
//TODO: debug websocket and http listeners
|
type Nats struct {
|
||||||
|
Url string
|
||||||
|
Name string
|
||||||
|
VerifyCertificates bool
|
||||||
|
Ctx context.Context
|
||||||
|
}
|
||||||
|
|
||||||
func NewConfig() Config {
|
func NewConfig() Config {
|
||||||
|
|
||||||
|
|
@ -44,15 +51,18 @@ func NewConfig() Config {
|
||||||
tls := flag.Bool("mqtt_tls", lookupEnvOrBool("MQTT_TLS", false), "enable/disable TLS")
|
tls := flag.Bool("mqtt_tls", lookupEnvOrBool("MQTT_TLS", false), "enable/disable TLS")
|
||||||
fullchain := flag.String("full_chain_path", lookupEnvOrString("FULL_CHAIN_PATH", ""), "path to fullchain.pem certificate")
|
fullchain := flag.String("full_chain_path", lookupEnvOrString("FULL_CHAIN_PATH", ""), "path to fullchain.pem certificate")
|
||||||
privkey := flag.String("private_key_path", lookupEnvOrString("PRIVATE_KEY_PATH", ""), "path to privkey.pem certificate")
|
privkey := flag.String("private_key_path", lookupEnvOrString("PRIVATE_KEY_PATH", ""), "path to privkey.pem certificate")
|
||||||
authFile := flag.String("auth_file_path", lookupEnvOrString("AUTH_FILE_PATH", ""), "path to MQTT RBAC auth file")
|
authEnable := flag.Bool("auth_enable", lookupEnvOrBool("AUTH_ENABLE", false), "enable authentication")
|
||||||
redisEnable := flag.Bool("redis_enable", lookupEnvOrBool("REDIS_ENABLE", true), "enable/disable Redis db")
|
redisEnable := flag.Bool("redis_enable", lookupEnvOrBool("REDIS_ENABLE", true), "enable/disable Redis db")
|
||||||
redisAddr := flag.String("redis_addr", lookupEnvOrString("REDIS_ADDR", "localhost:6379"), "address of redis db")
|
redisAddr := flag.String("redis_addr", lookupEnvOrString("REDIS_ADDR", ""), "address of redis db")
|
||||||
redisPassword := flag.String("redis_passwd", lookupEnvOrString("REDIS_PASSWD", ""), "redis db password")
|
redisPassword := flag.String("redis_passwd", lookupEnvOrString("REDIS_PASSWD", ""), "redis db password")
|
||||||
wsEnable := flag.Bool("ws_enable", lookupEnvOrBool("WS_ENABLE", false), "enable/disable Websocket listener")
|
wsEnable := flag.Bool("ws_enable", lookupEnvOrBool("WS_ENABLE", false), "enable/disable Websocket listener")
|
||||||
wsPort := flag.String("ws_port", lookupEnvOrString("WS_PORT", ":80"), "port for Websocket listener")
|
wsPort := flag.String("ws_port", lookupEnvOrString("WS_PORT", ":80"), "port for Websocket listener")
|
||||||
httpEnable := flag.Bool("http_enable", lookupEnvOrBool("HTTP_ENABLE", false), "enable/disable HTTP listener of mqtt metrics")
|
httpEnable := flag.Bool("http_enable", lookupEnvOrBool("HTTP_ENABLE", false), "enable/disable HTTP listener of mqtt metrics")
|
||||||
httpPort := flag.String("http_port", lookupEnvOrString("HTTP_PORT", ":8080"), "port for HTTP listener of mqtt metrics")
|
httpPort := flag.String("http_port", lookupEnvOrString("HTTP_PORT", ":8080"), "port for HTTP listener of mqtt metrics")
|
||||||
logLevel := flag.Int("log_level", lookupEnvOrInt("LOG_LEVEL", 1), "0=DEBUG, 1=INFO, 2=WARNING, 3=ERROR")
|
logLevel := flag.Int("log_level", lookupEnvOrInt("LOG_LEVEL", 1), "0=DEBUG, 1=INFO, 2=WARNING, 3=ERROR")
|
||||||
|
natsUrl := flag.String("nats_url", lookupEnvOrString("NATS_URL", "nats://localhost:4222"), "url for nats server")
|
||||||
|
natsName := flag.String("nats_name", lookupEnvOrString("NATS_NAME", "adapter"), "name for nats client")
|
||||||
|
natsVerifyCertificates := flag.Bool("nats_verify_certificates", lookupEnvOrBool("NATS_VERIFY_CERTIFICATES", false), "verify validity of certificates from nats server")
|
||||||
|
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
flHelp := flag.Bool("help", false, "Help")
|
flHelp := flag.Bool("help", false, "Help")
|
||||||
|
|
@ -62,12 +72,14 @@ func NewConfig() Config {
|
||||||
os.Exit(0)
|
os.Exit(0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ctx := context.TODO()
|
||||||
|
|
||||||
conf := Config{
|
conf := Config{
|
||||||
MqttPort: *mqttPort,
|
MqttPort: *mqttPort,
|
||||||
Tls: *tls,
|
Tls: *tls,
|
||||||
Fullchain: *fullchain,
|
Fullchain: *fullchain,
|
||||||
Privkey: *privkey,
|
Privkey: *privkey,
|
||||||
AuthFile: *authFile,
|
AuthEnable: *authEnable,
|
||||||
RedisEnable: *redisEnable,
|
RedisEnable: *redisEnable,
|
||||||
RedisAddr: *redisAddr,
|
RedisAddr: *redisAddr,
|
||||||
RedisPassword: *redisPassword,
|
RedisPassword: *redisPassword,
|
||||||
|
|
@ -76,6 +88,12 @@ func NewConfig() Config {
|
||||||
HttpEnable: *httpEnable,
|
HttpEnable: *httpEnable,
|
||||||
HttpPort: *httpPort,
|
HttpPort: *httpPort,
|
||||||
LogLevel: *logLevel,
|
LogLevel: *logLevel,
|
||||||
|
Nats: Nats{
|
||||||
|
Url: *natsUrl,
|
||||||
|
Name: *natsName,
|
||||||
|
VerifyCertificates: *natsVerifyCertificates,
|
||||||
|
Ctx: ctx,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
conf.validate()
|
conf.validate()
|
||||||
|
|
|
||||||
|
|
@ -52,17 +52,18 @@ func StartServers(c config.Config) {
|
||||||
|
|
||||||
func newMqttServer(c config.Config) *broker.Mqtt {
|
func newMqttServer(c config.Config) *broker.Mqtt {
|
||||||
return &broker.Mqtt{
|
return &broker.Mqtt{
|
||||||
Port: c.MqttPort,
|
Port: c.MqttPort,
|
||||||
Tls: c.Tls,
|
Tls: c.Tls,
|
||||||
Fullchain: c.Fullchain,
|
Fullchain: c.Fullchain,
|
||||||
Privkey: c.Privkey,
|
Privkey: c.Privkey,
|
||||||
AuthFile: c.AuthFile,
|
AuthEnable: c.AuthEnable,
|
||||||
Redis: broker.Redis{
|
Redis: broker.Redis{
|
||||||
RedisEnable: c.RedisEnable,
|
RedisEnable: c.RedisEnable,
|
||||||
RedisAddr: c.RedisAddr,
|
RedisAddr: c.RedisAddr,
|
||||||
RedisPassword: c.RedisPassword,
|
RedisPassword: c.RedisPassword,
|
||||||
},
|
},
|
||||||
LogLevel: c.LogLevel,
|
LogLevel: c.LogLevel,
|
||||||
|
Nats: c.Nats,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,18 +1,28 @@
|
||||||
package mqtt
|
package mqtt
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"broker/internal/config"
|
||||||
|
"broker/internal/nats"
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"context"
|
||||||
"log"
|
"log"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/mochi-co/mqtt/v2"
|
"github.com/mochi-co/mqtt/v2"
|
||||||
|
"github.com/mochi-co/mqtt/v2/hooks/auth"
|
||||||
"github.com/mochi-co/mqtt/v2/packets"
|
"github.com/mochi-co/mqtt/v2/packets"
|
||||||
|
"github.com/nats-io/nats.go/jetstream"
|
||||||
)
|
)
|
||||||
|
|
||||||
type MyHook struct {
|
type MyHook struct {
|
||||||
mqtt.HookBase
|
mqtt.HookBase
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type NatsAuthHook struct {
|
||||||
|
mqtt.HookBase
|
||||||
|
kv jetstream.KeyValue
|
||||||
|
}
|
||||||
|
|
||||||
func (h *MyHook) ID() string {
|
func (h *MyHook) ID() string {
|
||||||
return "events-controller"
|
return "events-controller"
|
||||||
}
|
}
|
||||||
|
|
@ -86,3 +96,91 @@ func (h *MyHook) OnPacketEncode(cl *mqtt.Client, pk packets.Packet) packets.Pack
|
||||||
|
|
||||||
return pk
|
return pk
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (h *NatsAuthHook) ID() string {
|
||||||
|
return "device-auth"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *NatsAuthHook) Provides(b byte) bool {
|
||||||
|
return bytes.Contains([]byte{
|
||||||
|
mqtt.OnConnectAuthenticate,
|
||||||
|
mqtt.OnACLCheck,
|
||||||
|
}, []byte{b})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *NatsAuthHook) Init(c any) error {
|
||||||
|
|
||||||
|
_, _, kv := nats.StartNatsClient(c.(config.Nats))
|
||||||
|
h.kv = kv
|
||||||
|
|
||||||
|
h.Log.Info().Msg("initialised device auth nats hook")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *NatsAuthHook) OnConnectAuthenticate(cl *mqtt.Client, pk packets.Packet) bool {
|
||||||
|
|
||||||
|
username := string(pk.Connect.Username)
|
||||||
|
|
||||||
|
entry, err := h.kv.Get(context.TODO(), username)
|
||||||
|
if err != nil {
|
||||||
|
if err == jetstream.ErrKeyNotFound {
|
||||||
|
log.Println("user access not found, blocked user:", username)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
log.Println("error getting key value: ", err)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
if bytes.Equal(entry.Value(), pk.Connect.Password) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *NatsAuthHook) OnACLCheck(cl *mqtt.Client, topic string, write bool) bool {
|
||||||
|
|
||||||
|
username := string(cl.Properties.Username)
|
||||||
|
|
||||||
|
_, err := h.kv.Get(context.TODO(), username)
|
||||||
|
if err != nil {
|
||||||
|
if err == jetstream.ErrKeyNotFound {
|
||||||
|
log.Println("user access not found, blocked user:", username)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
log.Println("error getting key value: ", err)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
if username == "oktopusController" {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
if !write {
|
||||||
|
deviceAllowedTopics := []string{
|
||||||
|
"oktopus/usp/v1/agent/" + username,
|
||||||
|
}
|
||||||
|
for _, allowedTopic := range deviceAllowedTopics {
|
||||||
|
_, ok := auth.MatchTopic(allowedTopic, topic)
|
||||||
|
if ok {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
if write {
|
||||||
|
deviceAllowedTopics := []string{
|
||||||
|
"oktopus/usp/v1/controller",
|
||||||
|
"oktopus/usp/v1/api/" + username,
|
||||||
|
}
|
||||||
|
for _, allowedTopic := range deviceAllowedTopics {
|
||||||
|
_, ok := auth.MatchTopic(allowedTopic, topic)
|
||||||
|
if ok {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,10 +1,10 @@
|
||||||
package mqtt
|
package mqtt
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"broker/internal/config"
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"log"
|
"log"
|
||||||
"os"
|
|
||||||
|
|
||||||
rv8 "github.com/go-redis/redis/v8"
|
rv8 "github.com/go-redis/redis/v8"
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
|
|
@ -20,13 +20,14 @@ var (
|
||||||
)
|
)
|
||||||
|
|
||||||
type Mqtt struct {
|
type Mqtt struct {
|
||||||
Port string
|
Port string
|
||||||
Tls bool
|
Tls bool
|
||||||
Fullchain string
|
Fullchain string
|
||||||
Privkey string
|
Privkey string
|
||||||
AuthFile string
|
Redis Redis
|
||||||
Redis Redis
|
LogLevel int
|
||||||
LogLevel int
|
Nats config.Nats
|
||||||
|
AuthEnable bool
|
||||||
}
|
}
|
||||||
|
|
||||||
type Redis struct {
|
type Redis struct {
|
||||||
|
|
@ -38,7 +39,7 @@ type Redis struct {
|
||||||
func (m *Mqtt) Start(mqttServer *mqtt.Server) {
|
func (m *Mqtt) Start(mqttServer *mqtt.Server) {
|
||||||
|
|
||||||
defineSeverLog(mqttServer, m.LogLevel)
|
defineSeverLog(mqttServer, m.LogLevel)
|
||||||
defineServerAuth(mqttServer, m.AuthFile)
|
defineServerAuth(mqttServer, m.Nats, m.AuthEnable)
|
||||||
|
|
||||||
server = mqttServer
|
server = mqttServer
|
||||||
|
|
||||||
|
|
@ -112,15 +113,9 @@ func defineServerTls(fullchain, privkey string) *listeners.Config {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func defineServerAuth(server *mqtt.Server, authFile string) {
|
func defineServerAuth(server *mqtt.Server, natsConfig config.Nats, authEnable bool) {
|
||||||
if authFile != "" {
|
if authEnable {
|
||||||
data, err := os.ReadFile(authFile)
|
err := server.AddHook(new(NatsAuthHook), natsConfig)
|
||||||
if err != nil {
|
|
||||||
log.Fatal(err)
|
|
||||||
}
|
|
||||||
err = server.AddHook(new(auth.Hook), &auth.Options{
|
|
||||||
Data: data,
|
|
||||||
})
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
82
backend/services/mtp/mqtt/internal/nats/nats.go
Normal file
82
backend/services/mtp/mqtt/internal/nats/nats.go
Normal file
|
|
@ -0,0 +1,82 @@
|
||||||
|
package nats
|
||||||
|
|
||||||
|
import (
|
||||||
|
"broker/internal/config"
|
||||||
|
"log"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/nats-io/nats.go"
|
||||||
|
"github.com/nats-io/nats.go/jetstream"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
NATS_ACCOUNT_SUBJ_PREFIX = "account-manager.v1."
|
||||||
|
NATS_REQUEST_TIMEOUT = 10 * time.Second
|
||||||
|
NATS_MQTT_SUBJECT_PREFIX = "mqtt.usp.v1."
|
||||||
|
NATS_MQTT_ADAPTER_SUBJECT_PREFIX = "mqtt-adapter.usp.v1."
|
||||||
|
NATS_ADAPTER_SUBJECT = "adapter.usp.v1."
|
||||||
|
NATS_WS_SUBJECT_PREFIX = "ws.usp.v1."
|
||||||
|
NATS_WS_ADAPTER_SUBJECT_PREFIX = "ws-adapter.usp.v1."
|
||||||
|
DEVICE_SUBJECT_PREFIX = "device.usp.v1."
|
||||||
|
BUCKET_NAME = "devices-auth"
|
||||||
|
BUCKET_DESCRIPTION = "Devices authentication"
|
||||||
|
)
|
||||||
|
|
||||||
|
func StartNatsClient(c config.Nats) (jetstream.JetStream, *nats.Conn, jetstream.KeyValue) {
|
||||||
|
|
||||||
|
var (
|
||||||
|
nc *nats.Conn
|
||||||
|
err error
|
||||||
|
)
|
||||||
|
|
||||||
|
opts := defineOptions(c)
|
||||||
|
|
||||||
|
log.Printf("Connecting to NATS server %s", c.Url)
|
||||||
|
|
||||||
|
for {
|
||||||
|
nc, err = nats.Connect(c.Url, opts...)
|
||||||
|
if err != nil {
|
||||||
|
time.Sleep(5 * time.Second)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
log.Printf("Successfully connected to NATS server %s", c.Url)
|
||||||
|
|
||||||
|
js, err := jetstream.New(nc)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to create JetStream client: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
kv, err := js.CreateOrUpdateKeyValue(c.Ctx, jetstream.KeyValueConfig{
|
||||||
|
Bucket: BUCKET_NAME,
|
||||||
|
Description: BUCKET_DESCRIPTION,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to create KeyValue store: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return js, nc, kv
|
||||||
|
}
|
||||||
|
|
||||||
|
func defineOptions(c config.Nats) []nats.Option {
|
||||||
|
var opts []nats.Option
|
||||||
|
|
||||||
|
opts = append(opts, nats.Name(c.Name))
|
||||||
|
opts = append(opts, nats.MaxReconnects(-1))
|
||||||
|
opts = append(opts, nats.ReconnectWait(5*time.Second))
|
||||||
|
opts = append(opts, nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
|
||||||
|
log.Printf("Got disconnected! Reason: %q\n", err)
|
||||||
|
}))
|
||||||
|
opts = append(opts, nats.ReconnectHandler(func(nc *nats.Conn) {
|
||||||
|
log.Printf("Got reconnected to %v!\n", nc.ConnectedUrl())
|
||||||
|
}))
|
||||||
|
opts = append(opts, nats.ClosedHandler(func(nc *nats.Conn) {
|
||||||
|
log.Printf("Connection closed. Reason: %q\n", nc.LastError())
|
||||||
|
}))
|
||||||
|
if c.VerifyCertificates {
|
||||||
|
opts = append(opts, nats.RootCAs())
|
||||||
|
}
|
||||||
|
|
||||||
|
return opts
|
||||||
|
}
|
||||||
1
backend/services/mtp/ws-adapter/.gitignore
vendored
Normal file
1
backend/services/mtp/ws-adapter/.gitignore
vendored
Normal file
|
|
@ -0,0 +1 @@
|
||||||
|
.env.local
|
||||||
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"syscall"
|
"syscall"
|
||||||
|
|
||||||
"github.com/OktopUSP/oktopus/ws/internal/config"
|
"github.com/OktopUSP/oktopus/ws/internal/config"
|
||||||
|
"github.com/OktopUSP/oktopus/ws/internal/nats"
|
||||||
"github.com/OktopUSP/oktopus/ws/internal/ws"
|
"github.com/OktopUSP/oktopus/ws/internal/ws"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -19,7 +20,9 @@ func main() {
|
||||||
// Locks app running until it receives a stop command as Ctrl+C.
|
// Locks app running until it receives a stop command as Ctrl+C.
|
||||||
signal.Notify(done, syscall.SIGINT)
|
signal.Notify(done, syscall.SIGINT)
|
||||||
|
|
||||||
ws.StartNewServer(conf)
|
_, kv := nats.StartNatsClient(conf.Nats)
|
||||||
|
|
||||||
|
ws.StartNewServer(conf, kv)
|
||||||
|
|
||||||
<-done
|
<-done
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -9,4 +9,13 @@ require (
|
||||||
google.golang.org/protobuf v1.32.0
|
google.golang.org/protobuf v1.32.0
|
||||||
)
|
)
|
||||||
|
|
||||||
require golang.org/x/net v0.17.0 // indirect
|
require (
|
||||||
|
github.com/klauspost/compress v1.17.2 // indirect
|
||||||
|
github.com/nats-io/nats.go v1.34.1 // indirect
|
||||||
|
github.com/nats-io/nkeys v0.4.7 // indirect
|
||||||
|
github.com/nats-io/nuid v1.0.1 // indirect
|
||||||
|
golang.org/x/crypto v0.18.0 // indirect
|
||||||
|
golang.org/x/net v0.17.0 // indirect
|
||||||
|
golang.org/x/sys v0.16.0 // indirect
|
||||||
|
golang.org/x/text v0.14.0 // indirect
|
||||||
|
)
|
||||||
|
|
|
||||||
|
|
@ -6,8 +6,22 @@ github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/
|
||||||
github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY=
|
github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY=
|
||||||
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
|
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
|
||||||
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
|
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
|
||||||
|
github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
|
||||||
|
github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
|
||||||
|
github.com/nats-io/nats.go v1.34.1 h1:syWey5xaNHZgicYBemv0nohUPPmaLteiBEUT6Q5+F/4=
|
||||||
|
github.com/nats-io/nats.go v1.34.1/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
|
||||||
|
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
|
||||||
|
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
|
||||||
|
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
|
||||||
|
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
|
||||||
|
golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc=
|
||||||
|
golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg=
|
||||||
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
|
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
|
||||||
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
|
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
|
||||||
|
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
|
||||||
|
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||||
|
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
|
||||||
|
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
|
||||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
|
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
|
||||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||||
google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I=
|
google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I=
|
||||||
|
|
|
||||||
|
|
@ -2,6 +2,7 @@
|
||||||
package config
|
package config
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"flag"
|
"flag"
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
|
|
@ -13,9 +14,16 @@ import (
|
||||||
type Config struct {
|
type Config struct {
|
||||||
Port string // server port: e.g. ":8080"
|
Port string // server port: e.g. ":8080"
|
||||||
Auth bool // server auth enable/disable
|
Auth bool // server auth enable/disable
|
||||||
Token string // controller auth token
|
|
||||||
ControllerEID string // controller endpoint id
|
ControllerEID string // controller endpoint id
|
||||||
Tls bool // enable/diable websockets server tls
|
Tls bool // enable/diable websockets server tls
|
||||||
|
Nats Nats
|
||||||
|
}
|
||||||
|
|
||||||
|
type Nats struct {
|
||||||
|
Url string
|
||||||
|
Name string
|
||||||
|
VerifyCertificates bool
|
||||||
|
Ctx context.Context
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewConfig() Config {
|
func NewConfig() Config {
|
||||||
|
|
@ -33,8 +41,10 @@ func NewConfig() Config {
|
||||||
*/
|
*/
|
||||||
|
|
||||||
/* ------------------------------ define flags ------------------------------ */
|
/* ------------------------------ define flags ------------------------------ */
|
||||||
|
natsUrl := flag.String("nats_url", lookupEnvOrString("NATS_URL", "nats://localhost:4222"), "url for nats server")
|
||||||
|
natsName := flag.String("nats_name", lookupEnvOrString("NATS_NAME", "ws-adapter"), "name for nats client")
|
||||||
|
natsVerifyCertificates := flag.Bool("nats_verify_certificates", lookupEnvOrBool("NATS_VERIFY_CERTIFICATES", false), "verify validity of certificates from nats server")
|
||||||
flPort := flag.String("port", lookupEnvOrString("SERVER_PORT", ":8080"), "Server port")
|
flPort := flag.String("port", lookupEnvOrString("SERVER_PORT", ":8080"), "Server port")
|
||||||
flToken := flag.String("token", lookupEnvOrString("SERVER_AUTH_TOKEN", ""), "Controller auth token")
|
|
||||||
flAuth := flag.Bool("auth", lookupEnvOrBool("SERVER_AUTH_ENABLE", false), "Server auth enable/disable")
|
flAuth := flag.Bool("auth", lookupEnvOrBool("SERVER_AUTH_ENABLE", false), "Server auth enable/disable")
|
||||||
flControllerEid := flag.String("controller-eid", lookupEnvOrString("CONTROLLER_EID", "oktopusController"), "Controller eid")
|
flControllerEid := flag.String("controller-eid", lookupEnvOrString("CONTROLLER_EID", "oktopusController"), "Controller eid")
|
||||||
flTls := flag.Bool("tls", lookupEnvOrBool("SERVER_TLS_ENABLE", false), "Enable/diable websockets server tls")
|
flTls := flag.Bool("tls", lookupEnvOrBool("SERVER_TLS_ENABLE", false), "Enable/diable websockets server tls")
|
||||||
|
|
@ -47,12 +57,19 @@ func NewConfig() Config {
|
||||||
os.Exit(0)
|
os.Exit(0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ctx := context.TODO()
|
||||||
|
|
||||||
return Config{
|
return Config{
|
||||||
Port: *flPort,
|
Port: *flPort,
|
||||||
Token: *flToken,
|
|
||||||
Auth: *flAuth,
|
Auth: *flAuth,
|
||||||
ControllerEID: *flControllerEid,
|
ControllerEID: *flControllerEid,
|
||||||
Tls: *flTls,
|
Tls: *flTls,
|
||||||
|
Nats: Nats{
|
||||||
|
Url: *natsUrl,
|
||||||
|
Name: *natsName,
|
||||||
|
VerifyCertificates: *natsVerifyCertificates,
|
||||||
|
Ctx: ctx,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
74
backend/services/mtp/ws/internal/nats/nats.go
Normal file
74
backend/services/mtp/ws/internal/nats/nats.go
Normal file
|
|
@ -0,0 +1,74 @@
|
||||||
|
package nats
|
||||||
|
|
||||||
|
import (
|
||||||
|
"log"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/OktopUSP/oktopus/ws/internal/config"
|
||||||
|
"github.com/nats-io/nats.go"
|
||||||
|
"github.com/nats-io/nats.go/jetstream"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
BUCKET_NAME = "devices-auth"
|
||||||
|
BUCKET_DESCRIPTION = "Devices authentication"
|
||||||
|
)
|
||||||
|
|
||||||
|
func StartNatsClient(c config.Nats) (*nats.Conn, jetstream.KeyValue) {
|
||||||
|
|
||||||
|
var (
|
||||||
|
nc *nats.Conn
|
||||||
|
err error
|
||||||
|
)
|
||||||
|
|
||||||
|
opts := defineOptions(c)
|
||||||
|
|
||||||
|
log.Printf("Connecting to NATS server %s", c.Url)
|
||||||
|
|
||||||
|
for {
|
||||||
|
nc, err = nats.Connect(c.Url, opts...)
|
||||||
|
if err != nil {
|
||||||
|
time.Sleep(5 * time.Second)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
log.Printf("Successfully connected to NATS server %s", c.Url)
|
||||||
|
|
||||||
|
js, err := jetstream.New(nc)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to create JetStream client: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
kv, err := js.CreateOrUpdateKeyValue(c.Ctx, jetstream.KeyValueConfig{
|
||||||
|
Bucket: BUCKET_NAME,
|
||||||
|
Description: BUCKET_DESCRIPTION,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to create KeyValue store: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nc, kv
|
||||||
|
}
|
||||||
|
|
||||||
|
func defineOptions(c config.Nats) []nats.Option {
|
||||||
|
var opts []nats.Option
|
||||||
|
|
||||||
|
opts = append(opts, nats.Name(c.Name))
|
||||||
|
opts = append(opts, nats.MaxReconnects(-1))
|
||||||
|
opts = append(opts, nats.ReconnectWait(5*time.Second))
|
||||||
|
opts = append(opts, nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
|
||||||
|
log.Printf("Got disconnected! Reason: %q\n", err)
|
||||||
|
}))
|
||||||
|
opts = append(opts, nats.ReconnectHandler(func(nc *nats.Conn) {
|
||||||
|
log.Printf("Got reconnected to %v!\n", nc.ConnectedUrl())
|
||||||
|
}))
|
||||||
|
opts = append(opts, nats.ClosedHandler(func(nc *nats.Conn) {
|
||||||
|
log.Printf("Connection closed. Reason: %q\n", nc.LastError())
|
||||||
|
}))
|
||||||
|
if c.VerifyCertificates {
|
||||||
|
opts = append(opts, nats.RootCAs())
|
||||||
|
}
|
||||||
|
|
||||||
|
return opts
|
||||||
|
}
|
||||||
|
|
@ -7,7 +7,9 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/OktopUSP/oktopus/ws/internal/usp_record"
|
"github.com/OktopUSP/oktopus/ws/internal/usp_record"
|
||||||
|
"github.com/gorilla/mux"
|
||||||
"github.com/gorilla/websocket"
|
"github.com/gorilla/websocket"
|
||||||
|
"github.com/nats-io/nats.go/jetstream"
|
||||||
"google.golang.org/protobuf/proto"
|
"google.golang.org/protobuf/proto"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -148,10 +150,26 @@ func (c *Client) writePump() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Handle USP Controller events
|
// Handle USP Controller events
|
||||||
func ServeController(w http.ResponseWriter, r *http.Request, token, cEID string, authEnable bool) {
|
func ServeController(
|
||||||
|
w http.ResponseWriter,
|
||||||
|
r *http.Request,
|
||||||
|
cEID string,
|
||||||
|
authEnable bool,
|
||||||
|
kv jetstream.KeyValue,
|
||||||
|
) {
|
||||||
if authEnable {
|
if authEnable {
|
||||||
|
entry, err := kv.Get(r.Context(), cEID)
|
||||||
|
if err != nil {
|
||||||
|
if err == jetstream.ErrKeyNotFound {
|
||||||
|
w.WriteHeader(http.StatusUnauthorized)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
log.Println("Nats kv error:", err)
|
||||||
|
w.Write([]byte("Nats kv error:" + err.Error()))
|
||||||
|
return
|
||||||
|
}
|
||||||
recv_token := r.URL.Query().Get("token")
|
recv_token := r.URL.Query().Get("token")
|
||||||
if recv_token != token {
|
if recv_token != string(entry.Value()) {
|
||||||
w.WriteHeader(http.StatusUnauthorized)
|
w.WriteHeader(http.StatusUnauthorized)
|
||||||
w.Write([]byte("Unauthorized"))
|
w.Write([]byte("Unauthorized"))
|
||||||
return
|
return
|
||||||
|
|
@ -172,7 +190,13 @@ func ServeController(w http.ResponseWriter, r *http.Request, token, cEID string,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Handle USP Agent events, cEID = controller endpoint id
|
// Handle USP Agent events, cEID = controller endpoint id
|
||||||
func ServeAgent(w http.ResponseWriter, r *http.Request, cEID string) {
|
func ServeAgent(
|
||||||
|
w http.ResponseWriter,
|
||||||
|
r *http.Request,
|
||||||
|
cEID string,
|
||||||
|
kv jetstream.KeyValue,
|
||||||
|
authEnable bool,
|
||||||
|
) {
|
||||||
|
|
||||||
header := http.Header{
|
header := http.Header{
|
||||||
"Sec-Websocket-Protocol": {uspVersion},
|
"Sec-Websocket-Protocol": {uspVersion},
|
||||||
|
|
@ -187,6 +211,24 @@ func ServeAgent(w http.ResponseWriter, r *http.Request, cEID string) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if authEnable {
|
||||||
|
entry, err := kv.Get(r.Context(), deviceid)
|
||||||
|
if err != nil {
|
||||||
|
if err == jetstream.ErrKeyNotFound {
|
||||||
|
w.WriteHeader(http.StatusUnauthorized)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
log.Println("Nats kv error:", err)
|
||||||
|
w.Write([]byte("Nats kv error:" + err.Error()))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if mux.Vars(r)["passwd"] != string(entry.Value()) {
|
||||||
|
w.WriteHeader(http.StatusUnauthorized)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
conn, err := upgrader.Upgrade(w, r, header)
|
conn, err := upgrader.Upgrade(w, r, header)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println(err)
|
log.Println(err)
|
||||||
|
|
|
||||||
|
|
@ -9,19 +9,23 @@ import (
|
||||||
"github.com/OktopUSP/oktopus/ws/internal/config"
|
"github.com/OktopUSP/oktopus/ws/internal/config"
|
||||||
"github.com/OktopUSP/oktopus/ws/internal/ws/handler"
|
"github.com/OktopUSP/oktopus/ws/internal/ws/handler"
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
|
"github.com/nats-io/nats.go/jetstream"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Starts New Websockets Server
|
// Starts New Websockets Server
|
||||||
func StartNewServer(c config.Config) {
|
func StartNewServer(c config.Config, kv jetstream.KeyValue) {
|
||||||
// Initialize handlers of websockets events
|
// Initialize handlers of websockets events
|
||||||
go handler.InitHandlers(c.ControllerEID)
|
go handler.InitHandlers(c.ControllerEID)
|
||||||
|
|
||||||
r := mux.NewRouter()
|
r := mux.NewRouter()
|
||||||
|
r.HandleFunc("/ws/agent/{passwd}", func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
handler.ServeAgent(w, r, c.ControllerEID, kv, c.Auth)
|
||||||
|
})
|
||||||
r.HandleFunc("/ws/agent", func(w http.ResponseWriter, r *http.Request) {
|
r.HandleFunc("/ws/agent", func(w http.ResponseWriter, r *http.Request) {
|
||||||
handler.ServeAgent(w, r, c.ControllerEID)
|
handler.ServeAgent(w, r, c.ControllerEID, kv, c.Auth)
|
||||||
})
|
})
|
||||||
r.HandleFunc("/ws/controller", func(w http.ResponseWriter, r *http.Request) {
|
r.HandleFunc("/ws/controller", func(w http.ResponseWriter, r *http.Request) {
|
||||||
handler.ServeController(w, r, c.Token, c.ControllerEID, c.Auth)
|
handler.ServeController(w, r, c.ControllerEID, c.Auth, kv)
|
||||||
})
|
})
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,7 @@
|
||||||
.PHONY: help build
|
.PHONY: help build
|
||||||
|
|
||||||
.DEFAULT_GOAL := help
|
.DEFAULT_GOAL := help
|
||||||
DOCKER_USER ?= oktopus
|
DOCKER_USER ?= oktopusp
|
||||||
|
|
||||||
help:
|
help:
|
||||||
@echo "Makefile arguments:"
|
@echo "Makefile arguments:"
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user