feat:save devices at database
This commit is contained in:
parent
ec64cd9679
commit
45364aec39
|
|
@ -5,6 +5,7 @@ package main
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"flag"
|
"flag"
|
||||||
|
"github.com/leandrofars/oktopus/internal/db"
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
|
|
@ -35,6 +36,7 @@ func main() {
|
||||||
flBrokerPassword := flag.String("P", "", "Mqtt broker password")
|
flBrokerPassword := flag.String("P", "", "Mqtt broker password")
|
||||||
flBrokerClientId := flag.String("i", "", "A clientid for the Mqtt connection")
|
flBrokerClientId := flag.String("i", "", "A clientid for the Mqtt connection")
|
||||||
flBrokerQos := flag.Int("q", 2, "Quality of service of mqtt messages delivery")
|
flBrokerQos := flag.Int("q", 2, "Quality of service of mqtt messages delivery")
|
||||||
|
flAddrDB := flag.String("mongo", "mongodb://localhost:27017/", "MongoDB URI")
|
||||||
flHelp := flag.Bool("help", false, "Help")
|
flHelp := flag.Bool("help", false, "Help")
|
||||||
|
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
|
|
@ -47,8 +49,8 @@ func main() {
|
||||||
This context suppress our needs, but we can use a more sofisticate
|
This context suppress our needs, but we can use a more sofisticate
|
||||||
approach with cancel and timeout options passing it through paho mqtt functions.
|
approach with cancel and timeout options passing it through paho mqtt functions.
|
||||||
*/
|
*/
|
||||||
ctx := context.Background()
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
database := db.NewDatabase(ctx, *flAddrDB)
|
||||||
/*
|
/*
|
||||||
If you want to use another message protocol just make it implement Broker interface.
|
If you want to use another message protocol just make it implement Broker interface.
|
||||||
*/
|
*/
|
||||||
|
|
@ -63,11 +65,13 @@ func main() {
|
||||||
SubTopic: *flSubTopic,
|
SubTopic: *flSubTopic,
|
||||||
DevicesTopic: *flDevicesTopic,
|
DevicesTopic: *flDevicesTopic,
|
||||||
CA: *flTlsCert,
|
CA: *flTlsCert,
|
||||||
|
DB: database,
|
||||||
}
|
}
|
||||||
|
|
||||||
mtp.MtpService(&mqttClient, done)
|
mtp.MtpService(&mqttClient, done)
|
||||||
|
|
||||||
<-done
|
<-done
|
||||||
|
cancel()
|
||||||
|
|
||||||
log.Println("(⌐■_■) Oktopus is out!")
|
log.Println("(⌐■_■) Oktopus is out!")
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -2,9 +2,22 @@ module github.com/leandrofars/oktopus
|
||||||
|
|
||||||
go 1.18
|
go 1.18
|
||||||
|
|
||||||
require google.golang.org/protobuf v1.28.1
|
require (
|
||||||
|
github.com/eclipse/paho.golang v0.10.0
|
||||||
|
go.mongodb.org/mongo-driver v1.11.3
|
||||||
|
google.golang.org/protobuf v1.28.1
|
||||||
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/eclipse/paho.golang v0.10.0 // indirect
|
github.com/golang/snappy v0.0.1 // indirect
|
||||||
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a // indirect
|
github.com/klauspost/compress v1.13.6 // indirect
|
||||||
|
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect
|
||||||
|
github.com/pkg/errors v0.9.1 // indirect
|
||||||
|
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
|
||||||
|
github.com/xdg-go/scram v1.1.1 // indirect
|
||||||
|
github.com/xdg-go/stringprep v1.0.3 // indirect
|
||||||
|
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
|
||||||
|
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d // indirect
|
||||||
|
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
|
||||||
|
golang.org/x/text v0.3.7 // indirect
|
||||||
)
|
)
|
||||||
|
|
|
||||||
28
backend/services/controller/internal/db/db.go
Normal file
28
backend/services/controller/internal/db/db.go
Normal file
|
|
@ -0,0 +1,28 @@
|
||||||
|
package db
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"go.mongodb.org/mongo-driver/mongo"
|
||||||
|
"go.mongodb.org/mongo-driver/mongo/options"
|
||||||
|
"log"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Database struct {
|
||||||
|
devices *mongo.Collection
|
||||||
|
ctx context.Context
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewDatabase(ctx context.Context, mongoUri string) Database {
|
||||||
|
var db Database
|
||||||
|
clientOptions := options.Client().ApplyURI(mongoUri)
|
||||||
|
client, err := mongo.Connect(ctx, clientOptions)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Println("Connected to MongoDB-->", mongoUri)
|
||||||
|
devices := client.Database("oktopus").Collection("devices")
|
||||||
|
db.devices = devices
|
||||||
|
db.ctx = ctx
|
||||||
|
return db
|
||||||
|
}
|
||||||
21
backend/services/controller/internal/db/device.go
Normal file
21
backend/services/controller/internal/db/device.go
Normal file
|
|
@ -0,0 +1,21 @@
|
||||||
|
package db
|
||||||
|
|
||||||
|
type Device struct {
|
||||||
|
Model string
|
||||||
|
Customer string
|
||||||
|
Vendor string
|
||||||
|
Version string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *Database) CreateDevice(device Device) error {
|
||||||
|
_, err := d.devices.InsertOne(d.ctx, device, nil)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *Database) RetrieveDevice() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *Database) DeleteDevice() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -1,30 +0,0 @@
|
||||||
/*
|
|
||||||
Runs MQTT broker trough a Docker container.
|
|
||||||
Better approach would be to use docker api to Go language, but os/exec lib is already enough for our purpose,
|
|
||||||
since it's more convenient and easier to use docker shell commands, and it's already a start point.
|
|
||||||
*/
|
|
||||||
package mqtt
|
|
||||||
|
|
||||||
import (
|
|
||||||
"log"
|
|
||||||
"os/exec"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Get Mqtt Broker up and running
|
|
||||||
func StartMqttBroker() {
|
|
||||||
|
|
||||||
//TODO: Start Container through Docker SDK for GO, eliminating docker-compose and shell comands.
|
|
||||||
//TODO: Create Broker with user, password and CA certificate.
|
|
||||||
//TODO: Set MQTTv5 CONNACK packet with topic for agent to use.
|
|
||||||
|
|
||||||
cmd := exec.Command("sudo", "docker", "compose", "-f", "internal/mosquitto/docker-compose.yml", "up", "-d")
|
|
||||||
|
|
||||||
err := cmd.Run()
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
log.Fatal(err.Error())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Println("Broker Mqtt Up and Running!")
|
|
||||||
}
|
|
||||||
|
|
@ -4,18 +4,17 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"crypto/x509"
|
"crypto/x509"
|
||||||
|
"github.com/eclipse/paho.golang/paho"
|
||||||
|
"github.com/leandrofars/oktopus/internal/db"
|
||||||
usp_msg "github.com/leandrofars/oktopus/internal/usp_message"
|
usp_msg "github.com/leandrofars/oktopus/internal/usp_message"
|
||||||
"github.com/leandrofars/oktopus/internal/usp_record"
|
"github.com/leandrofars/oktopus/internal/usp_record"
|
||||||
|
"github.com/leandrofars/oktopus/internal/utils"
|
||||||
"google.golang.org/protobuf/proto"
|
"google.golang.org/protobuf/proto"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"log"
|
"log"
|
||||||
"net"
|
"net"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/eclipse/paho.golang/paho"
|
|
||||||
"github.com/leandrofars/oktopus/internal/utils"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type Mqtt struct {
|
type Mqtt struct {
|
||||||
|
|
@ -29,6 +28,7 @@ type Mqtt struct {
|
||||||
SubTopic string
|
SubTopic string
|
||||||
DevicesTopic string
|
DevicesTopic string
|
||||||
CA string
|
CA string
|
||||||
|
DB db.Database
|
||||||
}
|
}
|
||||||
|
|
||||||
var c *paho.Client
|
var c *paho.Client
|
||||||
|
|
@ -53,10 +53,6 @@ func (m *Mqtt) Connect() {
|
||||||
// Sets global client to be used by other mqtt functions
|
// Sets global client to be used by other mqtt functions
|
||||||
c = clientConfig
|
c = clientConfig
|
||||||
|
|
||||||
if conn.ReasonCode != 0 {
|
|
||||||
log.Fatalf("Failed to connect to %s : %d - %s", m.Addr, conn.ReasonCode, conn.Properties.ReasonString)
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Printf("Connected to broker--> %s:%s", m.Addr, m.Port)
|
log.Printf("Connected to broker--> %s:%s", m.Addr, m.Port)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -238,7 +234,11 @@ func (m *Mqtt) handleNewDevice(deviceMac string) {
|
||||||
Request: &usp_msg.Request{
|
Request: &usp_msg.Request{
|
||||||
ReqType: &usp_msg.Request_Get{
|
ReqType: &usp_msg.Request_Get{
|
||||||
Get: &usp_msg.Get{
|
Get: &usp_msg.Get{
|
||||||
ParamPaths: []string{"Device.DeviceInfo."},
|
ParamPaths: []string{
|
||||||
|
"Device.DeviceInfo.Manufacturer",
|
||||||
|
"Device.DeviceInfo.ModelName",
|
||||||
|
"Device.DeviceInfo.SoftwareVersion",
|
||||||
|
},
|
||||||
MaxDepth: 1,
|
MaxDepth: 1,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
|
@ -263,7 +263,6 @@ func (m *Mqtt) handleNewDevice(deviceMac string) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalln("Failed to encode address book:", err)
|
log.Fatalln("Failed to encode address book:", err)
|
||||||
}
|
}
|
||||||
time.Sleep(5 * time.Second)
|
|
||||||
m.Publish(tr369Message, "oktopus/v1/agent/"+deviceMac, "oktopus/v1/controller/"+deviceMac)
|
m.Publish(tr369Message, "oktopus/v1/agent/"+deviceMac, "oktopus/v1/controller/"+deviceMac)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -280,5 +279,17 @@ func (m *Mqtt) handleDevicesResponse(p []byte) {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Printf("Received a usp_message: %s\n", message.String())
|
var device db.Device
|
||||||
|
msg := message.Body.MsgBody.(*usp_msg.Body_Response).Response.GetGetResp()
|
||||||
|
|
||||||
|
device.Vendor = msg.ReqPathResults[0].ResolvedPathResults[0].ResultParams["Manufacturer"]
|
||||||
|
device.Model = msg.ReqPathResults[1].ResolvedPathResults[0].ResultParams["ModelName"]
|
||||||
|
device.Version = msg.ReqPathResults[2].ResolvedPathResults[0].ResultParams["SoftwareVersion"]
|
||||||
|
|
||||||
|
err = m.DB.CreateDevice(device)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("New device saved at database")
|
||||||
}
|
}
|
||||||
Loading…
Reference in New Issue
Block a user