Merge pull request #32 from leandrofars/dev

Entrega 3
This commit is contained in:
Leandro Antônio Farias Machado 2023-04-26 01:24:37 -03:00 committed by GitHub
commit b09eb054e6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 350 additions and 52 deletions

View File

@ -7,9 +7,11 @@ import (
"flag"
"github.com/leandrofars/oktopus/internal/api"
"github.com/leandrofars/oktopus/internal/db"
usp_msg "github.com/leandrofars/oktopus/internal/usp_message"
"log"
"os"
"os/signal"
"sync"
"syscall"
"github.com/leandrofars/oktopus/internal/mqtt"
@ -29,6 +31,7 @@ func main() {
log.Println("Starting Oktopus Project TR-369 Controller Version:", VERSION)
// fl_endpointId := flag.String("endpoint_id", "proto::oktopus-controller", "Defines the enpoint id the Agent must trust on.")
flDevicesTopic := flag.String("d", "oktopus/devices", "That's the topic mqtt broker end new devices info.")
flDisconTopic := flag.String("dis", "oktopus/disconnect", "It's where disconnected IoTs are known.")
flSubTopic := flag.String("sub", "oktopus/+/controller/+", "That's the topic agent must publish to, and the controller keeps on listening.")
flBrokerAddr := flag.String("a", "localhost", "Mqtt broker adrress")
flBrokerPort := flag.String("p", "1883", "Mqtt broker port")
@ -53,25 +56,30 @@ func main() {
*/
ctx, cancel := context.WithCancel(context.Background())
database := db.NewDatabase(ctx, *flAddrDB)
apiMsgQueue := make(map[string](chan usp_msg.Msg))
var m sync.Mutex
/*
If you want to use another message protocol just make it implement Broker interface.
*/
mqttClient := mqtt.Mqtt{
Addr: *flBrokerAddr,
Port: *flBrokerPort,
Id: *flBrokerClientId,
User: *flBrokerUsername,
Passwd: *flBrokerPassword,
Ctx: ctx,
QoS: *flBrokerQos,
SubTopic: *flSubTopic,
DevicesTopic: *flDevicesTopic,
CA: *flTlsCert,
DB: database,
Addr: *flBrokerAddr,
Port: *flBrokerPort,
Id: *flBrokerClientId,
User: *flBrokerUsername,
Passwd: *flBrokerPassword,
Ctx: ctx,
QoS: *flBrokerQos,
SubTopic: *flSubTopic,
DevicesTopic: *flDevicesTopic,
DisconnectTopic: *flDisconTopic,
CA: *flTlsCert,
DB: database,
MsgQueue: apiMsgQueue,
QMutex: &m,
}
mtp.MtpService(&mqttClient, done)
a := api.NewApi(*flApiPort, database)
a := api.NewApi(*flApiPort, database, &mqttClient, apiMsgQueue, &m)
api.StartApi(a)
<-done

View File

@ -10,6 +10,7 @@ require (
require (
github.com/golang/snappy v0.0.1 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/gorilla/mux v1.8.0 // indirect
github.com/klauspost/compress v1.13.6 // indirect
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect

View File

@ -9,6 +9,8 @@ github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEW
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=

View File

@ -4,20 +4,32 @@ import (
"encoding/json"
"github.com/gorilla/mux"
"github.com/leandrofars/oktopus/internal/db"
"github.com/leandrofars/oktopus/internal/mtp"
usp_msg "github.com/leandrofars/oktopus/internal/usp_message"
"github.com/leandrofars/oktopus/internal/utils"
"go.mongodb.org/mongo-driver/mongo"
"google.golang.org/protobuf/proto"
"log"
"net/http"
"sync"
"time"
)
type Api struct {
Port string
Db db.Database
Port string
Db db.Database
Broker mtp.Broker
MsgQueue map[string](chan usp_msg.Msg)
QMutex *sync.Mutex
}
func NewApi(port string, db db.Database) Api {
func NewApi(port string, db db.Database, b mtp.Broker, msgQueue map[string](chan usp_msg.Msg), m *sync.Mutex) Api {
return Api{
Port: port,
Db: db,
Port: port,
Db: db,
Broker: b,
MsgQueue: msgQueue,
QMutex: m,
}
}
@ -28,7 +40,7 @@ func StartApi(a Api) {
return
})
r.HandleFunc("/devices", a.retrieveDevices)
//r.HandleFunc("/devices/{sn}", a.devicesMessaging)
r.HandleFunc("/devices/{sn}/get", a.deviceGetMsg)
srv := &http.Server{
Addr: "0.0.0.0:" + a.Port,
@ -61,3 +73,61 @@ func (a *Api) retrieveDevices(w http.ResponseWriter, r *http.Request) {
return
}
func (a *Api) deviceGetMsg(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
sn := vars["sn"]
_, err := a.Db.RetrieveDevice(sn)
if err != nil {
if err == mongo.ErrNoDocuments {
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode("No device with serial number " + sn + " was found")
return
}
w.WriteHeader(http.StatusInternalServerError)
return
}
var receiver usp_msg.Get
//data := []byte(`{"param_paths": {'Device.DeviceInfo.'},"max_depth": 2}`)
//data := []byte("'opa'")
//var jsonBlob = []byte(`{
// "param_paths": ["Device.DeviceInfo.","Device.ManagementServer."],
// "max_depth": 2
//}`)
err = json.NewDecoder(r.Body).Decode(receiver)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusBadRequest)
return
}
msg := utils.NewGetMsg(receiver)
encodedMsg, err := proto.Marshal(&msg)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusBadRequest)
return
}
record := utils.NewUspRecord(encodedMsg, sn)
tr369Message, err := proto.Marshal(&record)
if err != nil {
log.Fatalln("Failed to encode tr369 record:", err)
}
//a.Broker.Request(tr369Message, usp_msg.Header_GET, "oktopus/v1/agent/"+sn, "oktopus/v1/get/"+sn)
a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg)
a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn)
select {
case msg := <-a.MsgQueue[msg.Header.MsgId]:
log.Printf("Received Msg")
json.NewEncoder(w).Encode(msg)
return
case <-time.After(time.Second * 5):
log.Printf("Request Timed Out")
w.WriteHeader(http.StatusGatewayTimeout)
json.NewEncoder(w).Encode("Request Timed Out")
return
}
}

View File

@ -13,6 +13,7 @@ type Device struct {
Customer string
Vendor string
Version string
Status uint8
}
func (d *Database) CreateDevice(device Device) error {
@ -45,6 +46,16 @@ func (d *Database) RetrieveDevices() ([]Device, error) {
return results, nil
}
func (d *Database) RetrieveDevice(sn string) (Device, error) {
var result Device
//TODO: filter devices by user ownership
err := d.devices.FindOne(d.ctx, bson.D{{"sn", sn}}, nil).Decode(&result)
if err != nil {
log.Println(err)
}
return result, err
}
func (d *Database) DeleteDevice() {
}

View File

@ -0,0 +1,21 @@
package db
import (
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"log"
)
func (d *Database) UpdateStatus(sn string, status uint8) error {
var result bson.M
err := d.devices.FindOneAndUpdate(d.ctx, bson.D{{"sn", sn}}, bson.D{{"$set", bson.D{{"status", status}}}}).Decode(&result)
if err != nil {
if err == mongo.ErrNoDocuments {
log.Printf("Device %s is not mapped into database", sn)
return nil
}
log.Println(err)
}
log.Printf("%s is now offline.", sn)
return err
}

View File

@ -18,17 +18,20 @@ import (
)
type Mqtt struct {
Addr string
Port string
Id string
User string
Passwd string
Ctx context.Context
QoS int
SubTopic string
DevicesTopic string
CA string
DB db.Database
Addr string
Port string
Id string
User string
Passwd string
Ctx context.Context
QoS int
SubTopic string
DevicesTopic string
DisconnectTopic string
CA string
DB db.Database
MsgQueue map[string](chan usp_msg.Msg)
QMutex *sync.Mutex
}
var c *paho.Client
@ -38,8 +41,10 @@ var c *paho.Client
func (m *Mqtt) Connect() {
devices := make(chan *paho.Publish)
controller := make(chan *paho.Publish)
go m.messageHandler(devices, controller)
clientConfig := m.startClient(devices, controller)
disconnect := make(chan *paho.Publish)
apiMsg := make(chan *paho.Publish)
go m.messageHandler(devices, controller, disconnect, apiMsg)
clientConfig := m.startClient(devices, controller, disconnect, apiMsg)
connParameters := startConnection(m.Id, m.User, m.Passwd)
conn, err := clientConfig.Connect(m.Ctx, &connParameters)
@ -67,8 +72,10 @@ func (m *Mqtt) Disconnect() {
func (m *Mqtt) Subscribe() {
if _, err := c.Subscribe(m.Ctx, &paho.Subscribe{
Subscriptions: map[string]paho.SubscribeOptions{
m.SubTopic: {QoS: byte(m.QoS), NoLocal: true},
m.DevicesTopic: {QoS: byte(m.QoS), NoLocal: true},
m.SubTopic: {QoS: byte(m.QoS), NoLocal: true},
m.DevicesTopic: {QoS: byte(m.QoS), NoLocal: true},
m.DisconnectTopic: {QoS: byte(m.QoS), NoLocal: true},
"oktopus/+/api/+": {QoS: byte(m.QoS), NoLocal: true},
},
}); err != nil {
log.Fatalln(err)
@ -76,6 +83,8 @@ func (m *Mqtt) Subscribe() {
log.Printf("Subscribed to %s", m.SubTopic)
log.Printf("Subscribed to %s", m.DevicesTopic)
log.Printf("Subscribed to %s", m.DisconnectTopic)
}
func (m *Mqtt) Publish(msg []byte, topic, respTopic string) {
@ -96,12 +105,16 @@ func (m *Mqtt) Publish(msg []byte, topic, respTopic string) {
/* -------------------------------------------------------------------------- */
func (m *Mqtt) startClient(devices, controller chan *paho.Publish) *paho.Client {
func (m *Mqtt) startClient(devices, controller, disconnect, apiMsg chan *paho.Publish) *paho.Client {
singleHandler := paho.NewSingleHandlerRouter(func(p *paho.Publish) {
if p.Topic == m.DevicesTopic {
devices <- p
} else if strings.Contains(p.Topic, "controller") {
controller <- p
} else if p.Topic == m.DisconnectTopic {
disconnect <- p
} else if strings.Contains(p.Topic, "api") {
apiMsg <- p
} else {
log.Println("No handler for topic: ", p.Topic)
}
@ -210,7 +223,7 @@ func startConnection(id, user, pass string) paho.Connect {
return connParameters
}
func (m *Mqtt) messageHandler(devices, controller chan *paho.Publish) {
func (m *Mqtt) messageHandler(devices, controller, disconnect, apiMsg chan *paho.Publish) {
for {
select {
case d := <-devices:
@ -218,11 +231,43 @@ func (m *Mqtt) messageHandler(devices, controller chan *paho.Publish) {
log.Println("New device: ", payload)
m.handleNewDevice(payload)
case c := <-controller:
m.handleDevicesResponse(c.Payload)
topic := c.Topic
sn := strings.Split(topic, "/")
m.handleNewDevicesResponse(c.Payload, sn[3])
case dis := <-disconnect:
payload := string(dis.Payload)
log.Println("Device disconnected: ", payload)
m.handleDevicesDisconnect(payload)
case api := <-apiMsg:
log.Println("Handle api request")
m.handleApiRequest(api.Payload)
}
}
}
func (m *Mqtt) handleApiRequest(api []byte) {
var record usp_record.Record
err := proto.Unmarshal(api, &record)
if err != nil {
log.Println(err)
}
//TODO: verify record operation type
var msg usp_msg.Msg
err = proto.Unmarshal(record.GetNoSessionContext().Payload, &msg)
if err != nil {
log.Println(err)
}
if _, ok := m.MsgQueue[msg.Header.MsgId]; ok {
//m.QMutex.Lock()
m.MsgQueue[msg.Header.MsgId] <- msg
//m.QMutex.Unlock()
} else {
log.Printf("Message answer to request %s arrived too late", msg.Header.MsgId)
}
}
func (m *Mqtt) handleNewDevice(deviceMac string) {
payload := usp_msg.Msg{
Header: &usp_msg.Header{
@ -248,26 +293,16 @@ func (m *Mqtt) handleNewDevice(deviceMac string) {
},
}
teste, _ := proto.Marshal(&payload)
record := usp_record.Record{
Version: "0.1",
ToId: deviceMac,
FromId: "leleco",
PayloadSecurity: usp_record.Record_PLAINTEXT,
RecordType: &usp_record.Record_NoSessionContext{
NoSessionContext: &usp_record.NoSessionContextRecord{
Payload: teste,
},
},
}
record := utils.NewUspRecord(teste, deviceMac)
tr369Message, err := proto.Marshal(&record)
if err != nil {
log.Fatalln("Failed to encode address book:", err)
log.Fatalln("Failed to encode tr369 record:", err)
}
m.Publish(tr369Message, "oktopus/v1/agent/"+deviceMac, "oktopus/v1/controller/"+deviceMac)
}
func (m *Mqtt) handleDevicesResponse(p []byte) {
func (m *Mqtt) handleNewDevicesResponse(p []byte, sn string) {
var record usp_record.Record
var message usp_msg.Msg
@ -286,10 +321,24 @@ func (m *Mqtt) handleDevicesResponse(p []byte) {
device.Vendor = msg.ReqPathResults[0].ResolvedPathResults[0].ResultParams["Manufacturer"]
device.Model = msg.ReqPathResults[1].ResolvedPathResults[0].ResultParams["ModelName"]
device.Version = msg.ReqPathResults[2].ResolvedPathResults[0].ResultParams["SoftwareVersion"]
device.SN = msg.ReqPathResults[3].ResolvedPathResults[0].ResultParams["SerialNumber"]
device.SN = sn
device.Status = utils.Online
err = m.DB.CreateDevice(device)
if err != nil {
log.Fatal(err)
}
}
func (m *Mqtt) handleDevicesDisconnect(p string) {
// Update status of device at database
err := m.DB.UpdateStatus(p, utils.Offline)
if err != nil {
log.Fatal(err)
}
}
/*
func (m *Mqtt) Request(msg []byte, msgType usp_msg.Header_MsgType, pubTopic string, respTopic string) {
m.Publish(msg, pubTopic, respTopic)
}*/

View File

@ -0,0 +1,52 @@
package mqtt
//
//import (
// usp_msg "github.com/leandrofars/oktopus/internal/usp_message"
// "github.com/leandrofars/oktopus/internal/usp_record"
// "google.golang.org/protobuf/proto"
// "log"
//)
//
//func SendGetMsg(sn string) {
// payload := usp_msg.Msg{
// Header: &usp_msg.Header{
// MsgId: "uniqueIdentifierForThismessage",
// MsgType: usp_msg.Header_GET,
// },
// Body: &usp_msg.Body{
// MsgBody: &usp_msg.Body_Request{
// Request: &usp_msg.Request{
// ReqType: &usp_msg.Request_Get{
// Get: &usp_msg.Get{
// ParamPaths: []string{
// "Device.DeviceInfo.Manufacturer",
// "Device.DeviceInfo.ModelName",
// "Device.DeviceInfo.SoftwareVersion",
// },
// MaxDepth: 1,
// },
// },
// },
// },
// },
// }
// teste, _ := proto.Marshal(&payload)
// record := usp_record.Record{
// Version: "0.1",
// ToId: sn,
// FromId: "leleco",
// PayloadSecurity: usp_record.Record_PLAINTEXT,
// RecordType: &usp_record.Record_NoSessionContext{
// NoSessionContext: &usp_record.NoSessionContextRecord{
// Payload: teste,
// },
// },
// }
//
// tr369Message, err := proto.Marshal(&record)
// if err != nil {
// log.Fatalln("Failed to encode address book:", err)
// }
// m.Publish(tr369Message, "oktopus/v1/agent/"+deviceMac, "oktopus/v1/controller/"+deviceMac)
//}

View File

@ -15,6 +15,11 @@ type Broker interface {
Disconnect()
Publish(msg []byte, topic, respTopic string)
Subscribe()
/*
At request method we're able to send a message to a topic
and wait until we have a response (in the same topic).
*/
//Request(msg []byte, msgType usp_msg.Header_MsgType, pubTopic string, subTopic string)
}
// Not used, since we are using a broker solution with MQTT.

View File

@ -1,9 +1,19 @@
package utils
import (
"github.com/google/uuid"
usp_msg "github.com/leandrofars/oktopus/internal/usp_message"
"github.com/leandrofars/oktopus/internal/usp_record"
"net"
)
//Status are saved at database as numbers
const (
Online = iota
Associating
Offline
)
// Get interfaces MACs, and the first interface MAC is gonna be used as mqtt clientId
func GetMacAddr() ([]string, error) {
ifas, err := net.Interfaces()
@ -19,3 +29,35 @@ func GetMacAddr() ([]string, error) {
}
return as, nil
}
func NewUspRecord(p []byte, toId string) usp_record.Record {
return usp_record.Record{
Version: "0.1",
ToId: toId,
FromId: "leleco",
PayloadSecurity: usp_record.Record_PLAINTEXT,
RecordType: &usp_record.Record_NoSessionContext{
NoSessionContext: &usp_record.NoSessionContextRecord{
Payload: p,
},
},
}
}
func NewGetMsg(getStuff usp_msg.Get) usp_msg.Msg {
return usp_msg.Msg{
Header: &usp_msg.Header{
MsgId: uuid.NewString(),
MsgType: usp_msg.Header_GET,
},
Body: &usp_msg.Body{
MsgBody: &usp_msg.Body_Request{
Request: &usp_msg.Request{
ReqType: &usp_msg.Request_Get{
Get: &getStuff,
},
},
},
},
}
}

View File

@ -32,14 +32,16 @@
"username": "leandro",
"filters": {
"oktopus/+/agent/+": 1,
"oktopus/+/controller/+": 2
"oktopus/+/controller/+": 2,
"oktopus/+/get/+": 2
}
},
{
"username": "steve",
"filters": {
"oktopus/+/agent/+": 1,
"oktopus/+/controller/+": 2
"oktopus/+/controller/+": 2,
"oktopus/+/get/+": 2
}
},
{

View File

@ -15,8 +15,10 @@ import (
"strings"
"syscall"
rv8 "github.com/go-redis/redis/v8"
"github.com/mochi-co/mqtt/v2"
"github.com/mochi-co/mqtt/v2/hooks/auth"
"github.com/mochi-co/mqtt/v2/hooks/storage/redis"
"github.com/mochi-co/mqtt/v2/listeners"
)
@ -34,6 +36,7 @@ var server = mqtt.New(&mqtt.Options{
func main() {
tcpAddr := flag.String("tcp", ":1883", "network address for TCP listener")
redisAddr := flag.String("redis", "172.17.0.2:6379", "host address of redis db")
wsAddr := flag.String("ws", "", "network address for Websocket listener")
infoAddr := flag.String("info", "", "network address for web info dashboard listener")
path := flag.String("path", "", "path to data auth file")
@ -98,6 +101,17 @@ func main() {
log.Fatal(err)
}
err = server.AddHook(new(redis.Hook), &redis.Options{
Options: &rv8.Options{
Addr: *redisAddr, // default redis address
Password: "", // your password
DB: 0, // your redis db
},
})
if err != nil {
log.Fatal(err)
}
go func() {
err := server.Serve()
if err != nil {
@ -123,6 +137,7 @@ func (h *MyHook) ID() string {
func (h *MyHook) Provides(b byte) bool {
return bytes.Contains([]byte{
mqtt.OnSubscribed,
mqtt.OnDisconnect,
}, []byte{b})
}
@ -131,6 +146,26 @@ func (h *MyHook) Init(config any) error {
return nil
}
func (h *MyHook) Red(config any) error {
h.Log.Info().Msg("initialised")
return nil
}
func (h *MyHook) OnDisconnect(cl *mqtt.Client, err error, expire bool) {
var clUser string
if len(cl.Properties.Props.User) > 0 {
clUser = cl.Properties.Props.User[0].Val
}
if clUser != "" {
sn := strings.Split(clUser, "-")
err := server.Publish("oktopus/disconnect", []byte(sn[1]), false, 1)
if err != nil {
log.Println("server publish error: ", err)
}
}
}
func (h *MyHook) OnSubscribed(cl *mqtt.Client, pk packets.Packet, reasonCodes []byte) {
// Verifies if it's a device who is subscribed
if strings.Contains(pk.Filters[0].Filter, "oktopus/v1/agent") {