diff --git a/backend/.gitignore b/backend/.gitignore index 7e6433d..555aa08 100644 --- a/backend/.gitignore +++ b/backend/.gitignore @@ -8,4 +8,8 @@ main *.out go.work *.log -*.db \ No newline at end of file +*.db +*.crt +*.key +*.csr +*.db.new \ No newline at end of file diff --git a/backend/cmd/oktopus/main.go b/backend/cmd/oktopus/main.go index ce016d3..3d6c14f 100755 --- a/backend/cmd/oktopus/main.go +++ b/backend/cmd/oktopus/main.go @@ -1,34 +1,39 @@ -// Made by Leandro Antônio Farias Machado (leandrofars@gmail.com) +// Made by Leandro Antônio Farias Machado package main import ( + "context" "flag" "log" "os" "os/signal" "syscall" - "github.com/eclipse/paho.golang/paho" "github.com/leandrofars/oktopus/internal/mqtt" + "github.com/leandrofars/oktopus/internal/mtp" ) const VERSION = "0.0.1" func main() { done := make(chan os.Signal, 1) + + // Locks app running until it receives a stop command as Ctrl+C. signal.Notify(done, syscall.SIGINT, syscall.SIGTERM) log.Println("Starting Oktopus Project TR-369 Controller Version:", VERSION) - fl_broker := flag.Bool("mosquitto", false, "Defines if mosquitto container must run or not") + fl_broker := flag.Bool("m", false, "Defines if mosquitto container must run or not") // fl_endpointId := flag.String("endpoint_id", "proto::oktopus-controller", "Defines the enpoint id the Agent must trust on.") - // fl_sub_topic := flag.String("sub_topic", "oktopus/v1/agent", "That's the topic agent must publish to, and the controller keeps on listening.") + fl_sub_topic := flag.String("s", "oktopus/v1/agent", "That's the topic agent must publish to, and the controller keeps on listening.") // fl_pub_topic := flag.String("pub_topic", "oktopus/v1/controller", "That's the topic controller must publish to, and the agent keeps on listening.") - fl_broker_addr := flag.String("broker_addr", "localhost", "Mqtt broker adrress") - fl_broker_port := flag.String("broker_port", "1883", "Mqtt broker port") - fl_broker_username := flag.String("broker_user", "", "Mqtt broker username") - fl_broker_password := flag.String("password", "", "Mqtt broker password") - fl_broker_clientid := flag.String("clientid", "", "A clientid for the Mqtt connection") + fl_broker_addr := flag.String("a", "localhost", "Mqtt broker adrress") + fl_broker_port := flag.String("p", "1883", "Mqtt broker port") + fl_tls_cert := flag.String("ca", "", "TLS ca certificate") + fl_broker_username := flag.String("u", "", "Mqtt broker username") + fl_broker_password := flag.String("P", "", "Mqtt broker password") + fl_broker_clientid := flag.String("i", "", "A clientid for the Mqtt connection") + fl_broker_qos := flag.Int("q", 2, "Quality of service of mqtt messages delivery") fl_help := flag.Bool("help", false, "Help") flag.Parse() @@ -41,24 +46,31 @@ func main() { log.Println("Starting Mqtt Broker") mqtt.StartMqttBroker() } + /* + This context suppress our needs, but we can use a more sofisticade + approach with cancel and timeout options passing it through paho mqtt functions. + */ + ctx := context.Background() - newClient := mqtt.StartMqttClient(fl_broker_addr, fl_broker_port) + /* + If you want to use another message protocol just make it implement Broker interface. + */ + mqttClient := mqtt.Mqtt{ + Addr: *fl_broker_addr, + Port: *fl_broker_port, + Id: *fl_broker_clientid, + User: *fl_broker_username, + Passwd: *fl_broker_password, + Ctx: ctx, + QoS: *fl_broker_qos, + SubTopic: *fl_sub_topic, + CA: *fl_tls_cert, + } - newConnection := mqtt.StartNewConnection(*fl_broker_clientid, *fl_broker_username, *fl_broker_password) - - mqtt.ConnectMqttBroker(newClient, newConnection, fl_broker_addr) + mtp.MtpService(&mqttClient, done) <-done - log.Println("Disconnecting broker") - if newClient != nil { - d := &paho.Disconnect{ReasonCode: 0} - err := newClient.Disconnect(d) - if err != nil { - log.Fatalf("failed to send Disconnect: %s", err) - } - } - log.Println("(⌐■_■) Oktopus is out!") } diff --git a/backend/internal/mqtt/docker-compose.yml b/backend/internal/mqtt/docker-compose.yml index d0fd3f0..3aa7f56 100755 --- a/backend/internal/mqtt/docker-compose.yml +++ b/backend/internal/mqtt/docker-compose.yml @@ -8,5 +8,6 @@ services: - ./mosquitto/config:/mosquitto/config - ./mosquitto/data:/mosquitto/data - ./mosquitto/log:/mosquitto/log + - ./mosquitto/creds:/mosquitto/creds ports: - - 1883:1883 \ No newline at end of file + - 8883:8883 \ No newline at end of file diff --git a/backend/internal/mqtt/mosquitto/config/mosquitto.conf b/backend/internal/mqtt/mosquitto/config/mosquitto.conf index bd4eba0..399e22c 100755 --- a/backend/internal/mqtt/mosquitto/config/mosquitto.conf +++ b/backend/internal/mqtt/mosquitto/config/mosquitto.conf @@ -1,6 +1,12 @@ allow_anonymous true -listener 1883 +listener 8883 persistence true persistence_location /mosquitto/data connection_messages true log_dest file /mosquitto/log/mosquitto.log +cafile /mosquitto/creds/ca.crt +certfile /mosquitto/creds/server.crt +keyfile /mosquitto/creds/server.key + +# require_certificate true +# use_identity_as_username true \ No newline at end of file diff --git a/backend/internal/mqtt/mosquitto/creds/doc.md b/backend/internal/mqtt/mosquitto/creds/doc.md new file mode 100644 index 0000000..2aebd93 --- /dev/null +++ b/backend/internal/mqtt/mosquitto/creds/doc.md @@ -0,0 +1 @@ +Inside this folder you must save mosquitto ssl certificates. Those are sensitive information! do not share. You can follow this guide to generate necessary certificates: https://mosquitto.org/man/mosquitto-tls-7.html or that one from Mr. Steve which i found more useful and complete http://www.steves-internet-guide.com/mosquitto-tls/. It is important to use broker with ssl to encrypt messages and to avoid someone bad intentioned to read messages exchange between the server and IoTs. \ No newline at end of file diff --git a/backend/internal/mqtt/mqtt-client.go b/backend/internal/mqtt/mqtt-client.go index e724185..ee41f2a 100644 --- a/backend/internal/mqtt/mqtt-client.go +++ b/backend/internal/mqtt/mqtt-client.go @@ -2,19 +2,87 @@ package mqtt import ( "context" + "crypto/tls" + "crypto/x509" + "io/ioutil" "log" "net" + "sync" "github.com/eclipse/paho.golang/paho" "github.com/leandrofars/oktopus/internal/utils" ) -func StartMqttClient(addr, port *string) *paho.Client { +type Mqtt struct { + Addr string + Port string + Id string + User string + Passwd string + Ctx context.Context + QoS int + SubTopic string + CA string +} - conn, err := net.Dial("tcp", *addr+":"+*port) +var c *paho.Client + +/* ------------------- Implementations of broker interface ------------------ */ + +func (m *Mqtt) Connect() { + clientConfig := startClient(m.Addr, m.Port, m.CA, m.Ctx) + connParameters := startConnection(m.Id, m.User, m.Passwd) + + conn, err := clientConfig.Connect(m.Ctx, &connParameters) if err != nil { log.Fatal(err) } + // Sets global client to be used by other mqtt functions + c = clientConfig + + if conn.ReasonCode != 0 { + log.Fatalf("Failed to connect to %s : %d - %s", m.Addr, conn.ReasonCode, conn.Properties.ReasonString) + } + + log.Printf("Connected to broker--> %s:%s", m.Addr, m.Port) +} + +func (m *Mqtt) Disconnect() { + d := &paho.Disconnect{ReasonCode: 0} + err := c.Disconnect(d) + if err != nil { + log.Fatalf("failed to send Disconnect: %s", err) + } +} + +func (m *Mqtt) Subscribe() { + if _, err := c.Subscribe(m.Ctx, &paho.Subscribe{ + Subscriptions: map[string]paho.SubscribeOptions{ + m.SubTopic: {QoS: byte(m.QoS), NoLocal: true}, + }, + }); err != nil { + log.Fatalln(err) + } + + log.Printf("Subscribed to %s", m.SubTopic) +} + +/* -------------------------------------------------------------------------- */ + +func startClient(addr string, port string, tlsCa string, ctx context.Context) *paho.Client { + + if tlsCa != "" { + conn := conntWithTls(tlsCa, addr+":"+port, ctx) + clientConfig := paho.ClientConfig{ + Conn: conn, + } + return paho.NewClient(clientConfig) + } + + conn, err := net.Dial("tcp", addr+":"+port) + if err != nil { + log.Println(err) + } clientConfig := paho.ClientConfig{ Conn: conn, @@ -23,14 +91,57 @@ func StartMqttClient(addr, port *string) *paho.Client { return paho.NewClient(clientConfig) } -func StartNewConnection(id, user, pass string) paho.Connect { +func conntWithTls(tlsCa, address string, ctx context.Context) net.Conn { + ca, err := ioutil.ReadFile(tlsCa) + if err != nil { + log.Fatal(err) + } + + roots := x509.NewCertPool() + ok := roots.AppendCertsFromPEM(ca) + if !ok { + panic("failed to parse root certificate") + } + + config := &tls.Config{ + // After going to cloud, certificates must match names and we must take this option below + InsecureSkipVerify: true, + RootCAs: roots, + } + + d := tls.Dialer{ + Config: config, + } + + conn, err := d.DialContext(ctx, "tcp", address) + if err != nil { + log.Fatal(err) + } + + conn = newThreadSafeConnection(conn) + + return conn +} + +// Custom net.Conn with thread safety +func newThreadSafeConnection(c net.Conn) net.Conn { + type threadSafeConn struct { + net.Conn + sync.Locker + } + + return &threadSafeConn{ + Conn: c, + Locker: &sync.Mutex{}, + } +} + +func startConnection(id, user, pass string) paho.Connect { connParameters := paho.Connect{ KeepAlive: 30, ClientID: id, CleanStart: true, - Username: user, - Password: []byte(pass), } if id != "" { @@ -51,16 +162,4 @@ func StartNewConnection(id, user, pass string) paho.Connect { } return connParameters - -} - -func ConnectMqttBroker(c *paho.Client, cp paho.Connect, addr *string) { - conn, err := c.Connect(context.Background(), &cp) - if err != nil { - log.Fatal(err) - } - - if conn.ReasonCode != 0 { - log.Fatalf("Failed to connect to %s : %d - %s", *addr, conn.ReasonCode, conn.Properties.ReasonString) - } } diff --git a/backend/internal/mtp/mtp.go b/backend/internal/mtp/mtp.go new file mode 100644 index 0000000..d910f31 --- /dev/null +++ b/backend/internal/mtp/mtp.go @@ -0,0 +1,37 @@ +// Defines an interface to be implemented by the choosen MTP. +package mtp + +import ( + "log" + "os" +) + +/* + Message Transfer Protocol layer, which can use WebSockets, MQTT, COAP or STOMP; as defined in tr369 protocol. + It was made thinking in a broker architeture instead of a server-client p2p. +*/ +type Broker interface { + Connect() + Disconnect() + // Publish() + Subscribe() +} + +// Not used, since we are using a broker solution with MQTT. +type P2P interface { +} + +// Start the service which enable the communication with IoTs (MTP protocol layer). +func MtpService(b Broker, done chan os.Signal) { + b.Connect() + go func() { + for range done { + b.Disconnect() + log.Println("Successfully disconnected to broker!") + + // Receives signal and then replicates it to the rest of the app. + done <- os.Interrupt + } + }() + b.Subscribe() +}