package mqtt import ( "context" "log" "net/url" "strconv" "strings" "sync" "time" "github.com/eclipse/paho.golang/autopaho" "github.com/eclipse/paho.golang/paho" "github.com/leandrofars/oktopus/internal/db" "github.com/leandrofars/oktopus/internal/mtp/handler" usp_msg "github.com/leandrofars/oktopus/internal/usp_message" "github.com/leandrofars/oktopus/internal/usp_record" "github.com/leandrofars/oktopus/internal/utils" "google.golang.org/protobuf/proto" ) type Mqtt struct { Addr string Port string Id string User string Passwd string Ctx context.Context QoS int SubTopic string DevicesTopic string TLS bool DB db.Database MsgQueue map[string](chan usp_msg.Msg) QMutex *sync.Mutex } const ( ONLINE = iota OFFLINE ) var c *autopaho.ConnectionManager /* ------------------- Implementations of broker interface ------------------ */ func (m *Mqtt) Connect() { broker, _ := url.Parse("tcp://" + m.Addr + ":" + m.Port) status := make(chan *paho.Publish) controller := make(chan *paho.Publish) apiMsg := make(chan *paho.Publish) go m.messageHandler(status, controller, apiMsg) pahoClientConfig := m.buildClientConfig(status, controller, apiMsg) autopahoClientConfig := autopaho.ClientConfig{ BrokerUrls: []*url.URL{broker}, KeepAlive: 30, ConnectRetryDelay: 5 * time.Second, ConnectTimeout: 5 * time.Second, OnConnectionUp: func(cm *autopaho.ConnectionManager, connAck *paho.Connack) { log.Printf("Connected to MQTT broker--> %s:%s", m.Addr, m.Port) m.Subscribe() }, OnConnectError: func(err error) { log.Printf("Error while attempting connection: %s\n", err) }, ClientConfig: *pahoClientConfig, } if m.User != "" && m.Passwd != "" { autopahoClientConfig.SetUsernamePassword(m.User, []byte(m.Passwd)) } log.Println("MQTT client id:", pahoClientConfig.ClientID) log.Println("MQTT username:", m.User) log.Println("MQTT password:", m.Passwd) cm, err := autopaho.NewConnection(m.Ctx, autopahoClientConfig) if err != nil { log.Fatalln(err) } c = cm } func (m *Mqtt) Disconnect() { log.Println("Disconnecting from MQTT broker...") err := c.Disconnect(m.Ctx) if err != nil { log.Fatalf("failed to send Disconnect: %s", err) } } func (m *Mqtt) Subscribe() { if _, err := c.Subscribe(m.Ctx, &paho.Subscribe{ Subscriptions: map[string]paho.SubscribeOptions{ m.SubTopic: {QoS: byte(m.QoS)}, m.DevicesTopic: {QoS: byte(m.QoS)}, "oktopus/+/api/+": {QoS: byte(m.QoS)}, }, }); err != nil { log.Fatalln(err) } log.Printf("Subscribed to %s", m.SubTopic) log.Printf("Subscribed to %s", m.DevicesTopic) log.Printf("Subscribed to %s", "oktopus/+/api/+") } func (m *Mqtt) Publish(msg []byte, topic, respTopic string, retain bool) { if _, err := c.Publish(context.Background(), &paho.Publish{ Topic: topic, QoS: byte(m.QoS), Retain: retain, Payload: msg, Properties: &paho.PublishProperties{ ResponseTopic: respTopic, }, }); err != nil { log.Println("error sending message:", err) } log.Printf("Published to %s", topic) } /* -------------------------------------------------------------------------- */ func (m *Mqtt) buildClientConfig(status, controller, apiMsg chan *paho.Publish) *paho.ClientConfig { log.Println("Starting new MQTT client") singleHandler := paho.NewSingleHandlerRouter(func(p *paho.Publish) { if strings.Contains(p.Topic, "status") { status <- p } else if strings.Contains(p.Topic, "controller") { controller <- p } else if strings.Contains(p.Topic, "api") { apiMsg <- p } else { log.Println("No handler for topic: ", p.Topic) } }) clientConfig := paho.ClientConfig{} clientConfig = paho.ClientConfig{ //Conn: conn, Router: singleHandler, OnServerDisconnect: func(d *paho.Disconnect) { if d.Properties != nil { log.Printf("Requested disconnect: %s\n , properties reason: %s\n", clientConfig.ClientID, d.Properties.ReasonString) } else { log.Printf("Requested disconnect; %s reason code: %d\n", clientConfig.ClientID, d.ReasonCode) } }, OnClientError: func(err error) { log.Println(err) }, } if m.Id != "" { clientConfig.ClientID = m.Id } else { mac, err := utils.GetMacAddr() if err != nil { log.Fatal(err) } clientConfig.ClientID = mac[0] } return &clientConfig } func (m *Mqtt) messageHandler(status, controller, apiMsg chan *paho.Publish) { for { select { case d := <-status: paths := strings.Split(d.Topic, "/") device := paths[len(paths)-1] payload, err := strconv.Atoi(string(d.Payload)) if err != nil { log.Println("Status topic payload message type error") log.Fatalln(err) } if payload == ONLINE { log.Println("Device connected:", device) tr369Message := handler.HandleNewDevice(device) m.Publish(tr369Message, "oktopus/v1/agent/"+device, "oktopus/v1/controller/"+device, false) //m.deleteRetainedMessage(d, device) } else if payload == OFFLINE { log.Println("Device disconnected:1", device) m.handleDevicesDisconnect(device) //m.deleteRetainedMessage(d, device) } else { log.Println("Status topic payload message type error") } case c := <-controller: topic := c.Topic sn := strings.Split(topic, "/") device := handler.HandleNewDevicesResponse(c.Payload, sn[3], db.MQTT) err := m.DB.CreateDevice(device) if err != nil { log.Fatal(err) } case api := <-apiMsg: log.Println("Handle api request") m.handleApiRequest(api.Payload) } } } //TODO: handle device status at mochi redis //func (m *Mqtt) deleteRetainedMessage(message *paho.Publish, deviceMac string) { // m.Publish([]byte(""), "oktopus/v1/status/"+deviceMac, "", true) // log.Println("Message contains the retain flag, deleting it, as it's already received") //} func (m *Mqtt) handleApiRequest(api []byte) { var record usp_record.Record err := proto.Unmarshal(api, &record) if err != nil { log.Println(err) } var msg usp_msg.Msg err = proto.Unmarshal(record.GetNoSessionContext().Payload, &msg) if err != nil { log.Println(err) } if _, ok := m.MsgQueue[msg.Header.MsgId]; ok { //m.QMutex.Lock() m.MsgQueue[msg.Header.MsgId] <- msg //m.QMutex.Unlock() } else { log.Printf("Message answer to request %s arrived too late", msg.Header.MsgId) } } func (m *Mqtt) handleDevicesDisconnect(p string) { // Update status of device at database err := m.DB.UpdateStatus(p, db.Offline, db.MQTT) if err != nil { log.Fatal(err) } } /* func (m *Mqtt) Request(msg []byte, msgType usp_msg.Header_MsgType, pubTopic string, respTopic string) { m.Publish(msg, pubTopic, respTopic) }*/