diff --git a/backend/services/controller/cmd/oktopus/main.go b/backend/services/controller/cmd/oktopus/main.go index f56aa37..0423df9 100755 --- a/backend/services/controller/cmd/oktopus/main.go +++ b/backend/services/controller/cmd/oktopus/main.go @@ -5,6 +5,7 @@ package main import ( "context" "flag" + "github.com/leandrofars/oktopus/internal/db" "log" "os" "os/signal" @@ -35,6 +36,7 @@ func main() { flBrokerPassword := flag.String("P", "", "Mqtt broker password") flBrokerClientId := flag.String("i", "", "A clientid for the Mqtt connection") flBrokerQos := flag.Int("q", 2, "Quality of service of mqtt messages delivery") + flAddrDB := flag.String("mongo", "mongodb://localhost:27017/", "MongoDB URI") flHelp := flag.Bool("help", false, "Help") flag.Parse() @@ -47,8 +49,8 @@ func main() { This context suppress our needs, but we can use a more sofisticate approach with cancel and timeout options passing it through paho mqtt functions. */ - ctx := context.Background() - + ctx, cancel := context.WithCancel(context.Background()) + database := db.NewDatabase(ctx, *flAddrDB) /* If you want to use another message protocol just make it implement Broker interface. */ @@ -63,11 +65,13 @@ func main() { SubTopic: *flSubTopic, DevicesTopic: *flDevicesTopic, CA: *flTlsCert, + DB: database, } mtp.MtpService(&mqttClient, done) <-done + cancel() log.Println("(⌐■_■) Oktopus is out!") diff --git a/backend/services/controller/go.mod b/backend/services/controller/go.mod index e2dc489..e8d211b 100755 --- a/backend/services/controller/go.mod +++ b/backend/services/controller/go.mod @@ -2,9 +2,22 @@ module github.com/leandrofars/oktopus go 1.18 -require google.golang.org/protobuf v1.28.1 +require ( + github.com/eclipse/paho.golang v0.10.0 + go.mongodb.org/mongo-driver v1.11.3 + google.golang.org/protobuf v1.28.1 +) require ( - github.com/eclipse/paho.golang v0.10.0 // indirect - golang.org/x/sync v0.0.0-20201207232520-09787c993a3a // indirect + github.com/golang/snappy v0.0.1 // indirect + github.com/klauspost/compress v1.13.6 // indirect + github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/xdg-go/pbkdf2 v1.0.0 // indirect + github.com/xdg-go/scram v1.1.1 // indirect + github.com/xdg-go/stringprep v1.0.3 // indirect + github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect + golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d // indirect + golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect + golang.org/x/text v0.3.7 // indirect ) diff --git a/backend/services/controller/internal/db/db.go b/backend/services/controller/internal/db/db.go new file mode 100644 index 0000000..5f217f7 --- /dev/null +++ b/backend/services/controller/internal/db/db.go @@ -0,0 +1,28 @@ +package db + +import ( + "context" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "log" +) + +type Database struct { + devices *mongo.Collection + ctx context.Context +} + +func NewDatabase(ctx context.Context, mongoUri string) Database { + var db Database + clientOptions := options.Client().ApplyURI(mongoUri) + client, err := mongo.Connect(ctx, clientOptions) + if err != nil { + log.Fatal(err) + } + + log.Println("Connected to MongoDB-->", mongoUri) + devices := client.Database("oktopus").Collection("devices") + db.devices = devices + db.ctx = ctx + return db +} diff --git a/backend/services/controller/internal/db/device.go b/backend/services/controller/internal/db/device.go new file mode 100644 index 0000000..6883b2c --- /dev/null +++ b/backend/services/controller/internal/db/device.go @@ -0,0 +1,21 @@ +package db + +type Device struct { + Model string + Customer string + Vendor string + Version string +} + +func (d *Database) CreateDevice(device Device) error { + _, err := d.devices.InsertOne(d.ctx, device, nil) + return err +} + +func (d *Database) RetrieveDevice() { + +} + +func (d *Database) DeleteDevice() { + +} diff --git a/backend/services/controller/internal/mqtt/mqtt-server.go b/backend/services/controller/internal/mqtt/mqtt-server.go deleted file mode 100755 index 40583aa..0000000 --- a/backend/services/controller/internal/mqtt/mqtt-server.go +++ /dev/null @@ -1,30 +0,0 @@ -/* - Runs MQTT broker trough a Docker container. - Better approach would be to use docker api to Go language, but os/exec lib is already enough for our purpose, - since it's more convenient and easier to use docker shell commands, and it's already a start point. -*/ -package mqtt - -import ( - "log" - "os/exec" -) - -// Get Mqtt Broker up and running -func StartMqttBroker() { - - //TODO: Start Container through Docker SDK for GO, eliminating docker-compose and shell comands. - //TODO: Create Broker with user, password and CA certificate. - //TODO: Set MQTTv5 CONNACK packet with topic for agent to use. - - cmd := exec.Command("sudo", "docker", "compose", "-f", "internal/mosquitto/docker-compose.yml", "up", "-d") - - err := cmd.Run() - - if err != nil { - log.Fatal(err.Error()) - return - } - - log.Println("Broker Mqtt Up and Running!") -} diff --git a/backend/services/controller/internal/mqtt/mqtt-client.go b/backend/services/controller/internal/mqtt/mqtt.go similarity index 89% rename from backend/services/controller/internal/mqtt/mqtt-client.go rename to backend/services/controller/internal/mqtt/mqtt.go index 9df597f..a30b206 100644 --- a/backend/services/controller/internal/mqtt/mqtt-client.go +++ b/backend/services/controller/internal/mqtt/mqtt.go @@ -4,18 +4,17 @@ import ( "context" "crypto/tls" "crypto/x509" + "github.com/eclipse/paho.golang/paho" + "github.com/leandrofars/oktopus/internal/db" usp_msg "github.com/leandrofars/oktopus/internal/usp_message" "github.com/leandrofars/oktopus/internal/usp_record" + "github.com/leandrofars/oktopus/internal/utils" "google.golang.org/protobuf/proto" "io/ioutil" "log" "net" "strings" "sync" - "time" - - "github.com/eclipse/paho.golang/paho" - "github.com/leandrofars/oktopus/internal/utils" ) type Mqtt struct { @@ -29,6 +28,7 @@ type Mqtt struct { SubTopic string DevicesTopic string CA string + DB db.Database } var c *paho.Client @@ -53,10 +53,6 @@ func (m *Mqtt) Connect() { // Sets global client to be used by other mqtt functions c = clientConfig - if conn.ReasonCode != 0 { - log.Fatalf("Failed to connect to %s : %d - %s", m.Addr, conn.ReasonCode, conn.Properties.ReasonString) - } - log.Printf("Connected to broker--> %s:%s", m.Addr, m.Port) } @@ -238,8 +234,12 @@ func (m *Mqtt) handleNewDevice(deviceMac string) { Request: &usp_msg.Request{ ReqType: &usp_msg.Request_Get{ Get: &usp_msg.Get{ - ParamPaths: []string{"Device.DeviceInfo."}, - MaxDepth: 1, + ParamPaths: []string{ + "Device.DeviceInfo.Manufacturer", + "Device.DeviceInfo.ModelName", + "Device.DeviceInfo.SoftwareVersion", + }, + MaxDepth: 1, }, }, }, @@ -263,7 +263,6 @@ func (m *Mqtt) handleNewDevice(deviceMac string) { if err != nil { log.Fatalln("Failed to encode address book:", err) } - time.Sleep(5 * time.Second) m.Publish(tr369Message, "oktopus/v1/agent/"+deviceMac, "oktopus/v1/controller/"+deviceMac) } @@ -280,5 +279,17 @@ func (m *Mqtt) handleDevicesResponse(p []byte) { log.Fatal(err) } - log.Printf("Received a usp_message: %s\n", message.String()) + var device db.Device + msg := message.Body.MsgBody.(*usp_msg.Body_Response).Response.GetGetResp() + + device.Vendor = msg.ReqPathResults[0].ResolvedPathResults[0].ResultParams["Manufacturer"] + device.Model = msg.ReqPathResults[1].ResolvedPathResults[0].ResultParams["ModelName"] + device.Version = msg.ReqPathResults[2].ResolvedPathResults[0].ResultParams["SoftwareVersion"] + + err = m.DB.CreateDevice(device) + if err != nil { + log.Fatal(err) + } + + log.Printf("New device saved at database") }