diff --git a/backend/services/controller/cmd/oktopus/main.go b/backend/services/controller/cmd/oktopus/main.go index 3978781..f56aa37 100755 --- a/backend/services/controller/cmd/oktopus/main.go +++ b/backend/services/controller/cmd/oktopus/main.go @@ -25,10 +25,9 @@ func main() { log.SetFlags(log.LstdFlags | log.Lshortfile) log.Println("Starting Oktopus Project TR-369 Controller Version:", VERSION) - //flBroker := flag.Bool("m", false, "Defines if mosquitto container must run or not") // fl_endpointId := flag.String("endpoint_id", "proto::oktopus-controller", "Defines the enpoint id the Agent must trust on.") - flSubTopic := flag.String("s", "oktopus/+/agent/+", "That's the topic agent must publish to, and the controller keeps on listening.") - // fl_pub_topic := flag.String("pub_topic", "oktopus/v1/controller", "That's the topic controller must publish to, and the agent keeps on listening.") + flDevicesTopic := flag.String("d", "oktopus/devices", "That's the topic mqtt broker end new devices info.") + flSubTopic := flag.String("sub", "oktopus/+/controller/+", "That's the topic agent must publish to, and the controller keeps on listening.") flBrokerAddr := flag.String("a", "localhost", "Mqtt broker adrress") flBrokerPort := flag.String("p", "1883", "Mqtt broker port") flTlsCert := flag.String("ca", "", "TLS ca certificate") @@ -44,10 +43,6 @@ func main() { flag.Usage() os.Exit(0) } - //if *flBroker { - // log.Println("Starting Mqtt Broker") - // mqtt.StartMqttBroker() - //} /* This context suppress our needs, but we can use a more sofisticate approach with cancel and timeout options passing it through paho mqtt functions. @@ -58,15 +53,16 @@ func main() { If you want to use another message protocol just make it implement Broker interface. */ mqttClient := mqtt.Mqtt{ - Addr: *flBrokerAddr, - Port: *flBrokerPort, - Id: *flBrokerClientId, - User: *flBrokerUsername, - Passwd: *flBrokerPassword, - Ctx: ctx, - QoS: *flBrokerQos, - SubTopic: *flSubTopic, - CA: *flTlsCert, + Addr: *flBrokerAddr, + Port: *flBrokerPort, + Id: *flBrokerClientId, + User: *flBrokerUsername, + Passwd: *flBrokerPassword, + Ctx: ctx, + QoS: *flBrokerQos, + SubTopic: *flSubTopic, + DevicesTopic: *flDevicesTopic, + CA: *flTlsCert, } mtp.MtpService(&mqttClient, done) diff --git a/backend/services/controller/internal/mqtt/mqtt-client.go b/backend/services/controller/internal/mqtt/mqtt-client.go index 0a49591..9df597f 100644 --- a/backend/services/controller/internal/mqtt/mqtt-client.go +++ b/backend/services/controller/internal/mqtt/mqtt-client.go @@ -4,25 +4,31 @@ import ( "context" "crypto/tls" "crypto/x509" + usp_msg "github.com/leandrofars/oktopus/internal/usp_message" + "github.com/leandrofars/oktopus/internal/usp_record" + "google.golang.org/protobuf/proto" "io/ioutil" "log" "net" + "strings" "sync" + "time" "github.com/eclipse/paho.golang/paho" "github.com/leandrofars/oktopus/internal/utils" ) type Mqtt struct { - Addr string - Port string - Id string - User string - Passwd string - Ctx context.Context - QoS int - SubTopic string - CA string + Addr string + Port string + Id string + User string + Passwd string + Ctx context.Context + QoS int + SubTopic string + DevicesTopic string + CA string } var c *paho.Client @@ -30,15 +36,20 @@ var c *paho.Client /* ------------------- Implementations of broker interface ------------------ */ func (m *Mqtt) Connect() { - msgChan := make(chan *paho.Publish) - go messageHandler(msgChan) - clientConfig := startClient(m.Addr, m.Port, m.CA, m.Ctx, msgChan) + devices := make(chan *paho.Publish) + controller := make(chan *paho.Publish) + go m.messageHandler(devices, controller) + clientConfig := m.startClient(devices, controller) connParameters := startConnection(m.Id, m.User, m.Passwd) conn, err := clientConfig.Connect(m.Ctx, &connParameters) if err != nil { log.Println(err) } + if conn.ReasonCode != 0 { + log.Fatalf("Failed to connect to %s : %d - %s", m.Addr+m.Port, conn.ReasonCode, conn.Properties.ReasonString) + } + // Sets global client to be used by other mqtt functions c = clientConfig @@ -60,24 +71,48 @@ func (m *Mqtt) Disconnect() { func (m *Mqtt) Subscribe() { if _, err := c.Subscribe(m.Ctx, &paho.Subscribe{ Subscriptions: map[string]paho.SubscribeOptions{ - m.SubTopic: {QoS: byte(m.QoS), NoLocal: true}, + m.SubTopic: {QoS: byte(m.QoS), NoLocal: true}, + m.DevicesTopic: {QoS: byte(m.QoS), NoLocal: true}, }, }); err != nil { log.Fatalln(err) } log.Printf("Subscribed to %s", m.SubTopic) + log.Printf("Subscribed to %s", m.DevicesTopic) +} + +func (m *Mqtt) Publish(msg []byte, topic, respTopic string) { + if _, err := c.Publish(context.Background(), &paho.Publish{ + Topic: topic, + QoS: byte(m.QoS), + Retain: false, + Payload: msg, + Properties: &paho.PublishProperties{ + ResponseTopic: respTopic, + }, + }); err != nil { + log.Println("error sending message:", err) + } + + log.Printf("Published to %s", topic) } /* -------------------------------------------------------------------------- */ -func startClient(addr string, port string, tlsCa string, ctx context.Context, msgChan chan *paho.Publish) *paho.Client { - singleHandler := paho.NewSingleHandlerRouter(func(m *paho.Publish) { - msgChan <- m +func (m *Mqtt) startClient(devices, controller chan *paho.Publish) *paho.Client { + singleHandler := paho.NewSingleHandlerRouter(func(p *paho.Publish) { + if p.Topic == m.DevicesTopic { + devices <- p + } else if strings.Contains(p.Topic, "controller") { + controller <- p + } else { + log.Println("No handler for topic: ", p.Topic) + } }) - if tlsCa != "" { - conn := connWithTls(tlsCa, addr+":"+port, ctx) + if m.CA != "" { + conn := connWithTls(m.CA, m.Addr+":"+m.Port, m.Ctx) clientConfig := paho.ClientConfig{ Conn: conn, Router: singleHandler, @@ -91,7 +126,7 @@ func startClient(addr string, port string, tlsCa string, ctx context.Context, ms return paho.NewClient(clientConfig) } - conn, err := net.Dial("tcp", addr+":"+port) + conn, err := net.Dial("tcp", m.Addr+":"+m.Port) if err != nil { log.Println(err) } @@ -179,8 +214,71 @@ func startConnection(id, user, pass string) paho.Connect { return connParameters } -func messageHandler(msg chan *paho.Publish) { - for m := range msg { - log.Println("Received message:", string(m.Payload)) +func (m *Mqtt) messageHandler(devices, controller chan *paho.Publish) { + for { + select { + case d := <-devices: + payload := string(d.Payload) + log.Println("New device: ", payload) + m.handleNewDevice(payload) + case c := <-controller: + m.handleDevicesResponse(c.Payload) + } } } + +func (m *Mqtt) handleNewDevice(deviceMac string) { + payload := usp_msg.Msg{ + Header: &usp_msg.Header{ + MsgId: "uniqueIdentifierForThismessage", + MsgType: usp_msg.Header_GET, + }, + Body: &usp_msg.Body{ + MsgBody: &usp_msg.Body_Request{ + Request: &usp_msg.Request{ + ReqType: &usp_msg.Request_Get{ + Get: &usp_msg.Get{ + ParamPaths: []string{"Device.DeviceInfo."}, + MaxDepth: 1, + }, + }, + }, + }, + }, + } + teste, _ := proto.Marshal(&payload) + record := usp_record.Record{ + Version: "0.1", + ToId: deviceMac, + FromId: "leleco", + PayloadSecurity: usp_record.Record_PLAINTEXT, + RecordType: &usp_record.Record_NoSessionContext{ + NoSessionContext: &usp_record.NoSessionContextRecord{ + Payload: teste, + }, + }, + } + + tr369Message, err := proto.Marshal(&record) + if err != nil { + log.Fatalln("Failed to encode address book:", err) + } + time.Sleep(5 * time.Second) + m.Publish(tr369Message, "oktopus/v1/agent/"+deviceMac, "oktopus/v1/controller/"+deviceMac) +} + +func (m *Mqtt) handleDevicesResponse(p []byte) { + var record usp_record.Record + var message usp_msg.Msg + + err := proto.Unmarshal(p, &record) + if err != nil { + log.Fatal(err) + } + err = proto.Unmarshal(record.GetNoSessionContext().Payload, &message) + if err != nil { + log.Fatal(err) + } + + log.Printf("Received a usp_message: %s\n", message.String()) +} diff --git a/backend/services/controller/internal/mtp/mtp.go b/backend/services/controller/internal/mtp/mtp.go index d910f31..5ba3583 100644 --- a/backend/services/controller/internal/mtp/mtp.go +++ b/backend/services/controller/internal/mtp/mtp.go @@ -13,7 +13,7 @@ import ( type Broker interface { Connect() Disconnect() - // Publish() + Publish(msg []byte, topic, respTopic string) Subscribe() } diff --git a/backend/services/mochi/img.png b/backend/services/mochi/img.png new file mode 100644 index 0000000..1e2f50a Binary files /dev/null and b/backend/services/mochi/img.png differ