From 06a28f6b60b6310f8e84ab0cc2bac7ef29b72563 Mon Sep 17 00:00:00 2001 From: leandrofars Date: Sun, 31 Mar 2024 16:07:59 -0300 Subject: [PATCH] fix(ws): missing code because of gitignore | closes #206 --- backend/services/mtp/ws/.gitignore | 1 - backend/services/mtp/ws/cmd/ws/main.go | 27 +++ .../mtp/ws/internal/ws/handler/client.go | 219 ++++++++++++++++++ .../mtp/ws/internal/ws/handler/hub.go | 151 ++++++++++++ backend/services/mtp/ws/internal/ws/ws.go | 42 ++++ 5 files changed, 439 insertions(+), 1 deletion(-) create mode 100644 backend/services/mtp/ws/cmd/ws/main.go create mode 100644 backend/services/mtp/ws/internal/ws/handler/client.go create mode 100644 backend/services/mtp/ws/internal/ws/handler/hub.go create mode 100644 backend/services/mtp/ws/internal/ws/ws.go diff --git a/backend/services/mtp/ws/.gitignore b/backend/services/mtp/ws/.gitignore index bb855f3..ad0ec79 100644 --- a/backend/services/mtp/ws/.gitignore +++ b/backend/services/mtp/ws/.gitignore @@ -1,3 +1,2 @@ .env.local -ws *.pem \ No newline at end of file diff --git a/backend/services/mtp/ws/cmd/ws/main.go b/backend/services/mtp/ws/cmd/ws/main.go new file mode 100644 index 0000000..2a49c94 --- /dev/null +++ b/backend/services/mtp/ws/cmd/ws/main.go @@ -0,0 +1,27 @@ +package main + +import ( + "log" + "os" + "os/signal" + "syscall" + + "github.com/OktopUSP/oktopus/ws/internal/config" + "github.com/OktopUSP/oktopus/ws/internal/ws" +) + +func main() { + + done := make(chan os.Signal, 1) + + conf := config.NewConfig() + + // Locks app running until it receives a stop command as Ctrl+C. + signal.Notify(done, syscall.SIGINT) + + ws.StartNewServer(conf) + + <-done + + log.Println("(⌐■_■) Websockets server is out!") +} diff --git a/backend/services/mtp/ws/internal/ws/handler/client.go b/backend/services/mtp/ws/internal/ws/handler/client.go new file mode 100644 index 0000000..e2f297e --- /dev/null +++ b/backend/services/mtp/ws/internal/ws/handler/client.go @@ -0,0 +1,219 @@ +package handler + +import ( + "log" + "net/http" + "strings" + "time" + + "github.com/OktopUSP/oktopus/ws/internal/usp_record" + "github.com/gorilla/websocket" + "google.golang.org/protobuf/proto" +) + +const ( + // Time allowed to write a message to the peer. + writeWait = 30 * time.Second + + // Time allowed to read the next pong message from the peer. + pongWait = 10 * time.Second + + // Send pings to peer with this period. Must be less than pongWait. + pingPeriod = (pongWait * 9) / 10 + + // Maximum message size allowed from peer. + //maxMessageSize = 512 + + // Websockets version of the protocol + wsVersion = "13" + + // USP specification version + uspVersion = "v1.usp" +) + +var ( + newline = []byte{'\n'} + space = []byte{' '} + upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + CheckOrigin: func(r *http.Request) bool { return true }, + } +) + +// Client is a middleman between the websocket connection and the hub. +type Client struct { + hub *Hub + + //Websockets client endpoint id, eid follows usp specification + eid string + + // The websocket connection. + conn *websocket.Conn + + // Buffered channel of outbound messages. + send chan message +} + +// readPump pumps messages from the websocket connection to the hub. +// +// The application runs readPump in a per-connection goroutine. The application +// ensures that there is at most one reader on a connection by executing all +// reads from this goroutine. +// cEID = controller endpoint id +func (c *Client) readPump(cEID string) { + defer func() { + c.hub.unregister <- c + c.conn.Close() + }() + //c.conn.SetReadLimit(maxMessageSize) + c.conn.SetReadDeadline(time.Now().Add(pongWait)) + c.conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil }) + for { + _, data, err := c.conn.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { + log.Printf("error: %v", err) + } + break + } + message := constructMsg(cEID, c.eid, data) + c.hub.broadcast <- message + } +} + +func constructMsg(eid string, from string, data []byte) message { + if eid == "" { + var record usp_record.Record + err := proto.Unmarshal(data, &record) + if err != nil { + log.Println(err) + } + eid = record.ToId + } + return message{ + eid: eid, + from: from, + data: data, + msgType: websocket.BinaryMessage, + } +} + +// writePump pumps messages from the hub to the websocket connection. +// +// A goroutine running writePump is started for each connection. The +// application ensures that there is at most one writer to a connection by +// executing all writes from this goroutine. +func (c *Client) writePump() { + ticker := time.NewTicker(pingPeriod) + defer func() { + ticker.Stop() + c.conn.Close() + }() + for { + select { + case message, ok := <-c.send: + c.conn.SetWriteDeadline(time.Now().Add(writeWait)) + if !ok { + // The hub closed the channel. + log.Println("The hub closed the channel of", c.eid) + c.conn.WriteMessage(websocket.CloseMessage, []byte{}) + return + } + + w, err := c.conn.NextWriter(message.msgType) + if err != nil { + return + } + w.Write(message.data) + + // Add queued messages to the current websocket message. + n := len(c.send) + for i := 0; i < n; i++ { + w.Write(newline) + send := <-c.send + w.Write(send.data) + } + + if err := w.Close(); err != nil { + return + } + case <-ticker.C: + c.conn.SetWriteDeadline(time.Now().Add(writeWait)) + if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil { + return + } + } + } +} + +// Handle USP Controller events +func ServeController(w http.ResponseWriter, r *http.Request, token, cEID string, authEnable bool) { + if authEnable { + recv_token := r.URL.Query().Get("token") + if recv_token != token { + w.WriteHeader(http.StatusUnauthorized) + w.Write([]byte("Unauthorized")) + return + } + } + + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + log.Println(err) + return + } + + client := &Client{hub: hub, eid: cEID, conn: conn, send: make(chan message)} + client.hub.register <- client + + go client.writePump() + go client.readPump("") +} + +// Handle USP Agent events, cEID = controller endpoint id +func ServeAgent(w http.ResponseWriter, r *http.Request, cEID string) { + + header := http.Header{ + "Sec-Websocket-Protocol": {uspVersion}, + "Sec-Websocket-Version": {wsVersion}, + } + + deviceid := extractDeviceId(r.Header) + if deviceid == "" { + w.WriteHeader(http.StatusBadRequest) + log.Println("Device id not found") + w.Write([]byte("Device id not found")) + return + } + + conn, err := upgrader.Upgrade(w, r, header) + if err != nil { + log.Println(err) + return + } + + client := &Client{hub: hub, eid: deviceid, conn: conn, send: make(chan message)} + client.hub.register <- client + + // Allow collection of memory referenced by the caller by doing all work in + // new goroutines. + go client.writePump() + go client.readPump(cEID) +} + +// gets device id from websockets header +func extractDeviceId(header http.Header) string { + + // Header must be like that: bbf-usp-protocol; eid="" is the same ar the record.FromId/record.ToId + // log.Println("Header sec-websocket-extensions:", header.Get("sec-websocket-extensions")) + wsHeaderExtension := header.Get("sec-websocket-extensions") + + // Split the input string by double quotes + deviceid := strings.Split(wsHeaderExtension, "\"") + if len(deviceid) < 2 { + return "" + } + + return deviceid[1] +} diff --git a/backend/services/mtp/ws/internal/ws/handler/hub.go b/backend/services/mtp/ws/internal/ws/handler/hub.go new file mode 100644 index 0000000..d4f5070 --- /dev/null +++ b/backend/services/mtp/ws/internal/ws/handler/hub.go @@ -0,0 +1,151 @@ +package handler + +import ( + "encoding/json" + "log" + + "github.com/gorilla/websocket" +) + +// Keeps the content and the destination of a websockets message +type message struct { + // Websockets client endpoint id, eid follows usp specification. + // This field is needed for us to know which agent or controller + // the message is intended to be delivered to. + eid string + data []byte + msgType int + from string +} + +// Hub maintains the set of active clients and broadcasts messages to the +// clients. +type Hub struct { + // Registered clients. + clients map[string]*Client + + // Inbound messages from the clients. + broadcast chan message + + // Register requests from the clients. + register chan *Client + + // Unregister requests from clients. + unregister chan *Client +} + +const ( + OFFLINE = "0" + ONLINE = "1" +) + +type deviceStatus struct { + Eid string + Status string +} + +// Global hub instance +var hub *Hub + +// Controller Endpoint ID +var ceid string + +func InitHandlers(eid string) { + ceid = eid + log.Println("New hub, Controller eid:", ceid) + hub = newHub() + hub.run() +} + +func newHub() *Hub { + return &Hub{ + broadcast: make(chan message), + register: make(chan *Client), + unregister: make(chan *Client), + clients: make(map[string]*Client), + } +} + +func (h *Hub) run() { + for { + select { + case client := <-h.register: + // register new eid + h.clients[client.eid] = client + if client.eid != ceid{ + log.Printf("New device connected: %s", client.eid) + data, _ := json.Marshal(deviceStatus{client.eid, ONLINE}) + msg := message{ + from: "WS server", + eid: ceid, + data: data, + msgType: websocket.TextMessage, + } + log.Printf("%++v", msg) + if c, ok := h.clients[msg.eid]; ok { + select { + // send message to receiver client + case c.send <- msg: + log.Printf("Sent a message %s --> %s", msg.from, msg.eid) + default: + // in case the msg sending fails, close the client connection + // because it means that the client is no longer active + log.Printf("Failed to send a msg to %s, disconnecting client...", msg.eid) + close(c.send) + delete(h.clients, c.eid) + } + } + }else{ + log.Printf("New controller connected: %s", client.eid) + } + + case client := <-h.unregister: + // verify if eid exists + if _, ok := h.clients[client.eid]; ok { + // delete eid from map of connections + delete(h.clients, client.eid) + // close client messages receiving channel + close(client.send) + } + log.Println("Disconnected client", client.eid) + data, _ := json.Marshal(deviceStatus{client.eid, OFFLINE}) + msg := message{ + from: "WS server", + eid: ceid, + data: data, + msgType: websocket.TextMessage, + } + if c, ok := h.clients[msg.eid]; ok { + select { + // send message to receiver client + case c.send <- msg: + log.Printf("Sent a message %s --> %s", msg.from, msg.eid) + default: + // in case the msg sending fails, close the client connection + // because it means that the client is no longer active + log.Printf("Failed to send a msg to %s, disconnecting client...", msg.eid) + close(c.send) + delete(h.clients, c.eid) + } + } + case message := <-h.broadcast: + log.Println("send message to", message.eid) + // verify if eid exists + if c, ok := h.clients[message.eid]; ok { + select { + // send message to receiver client + case c.send <- message: + log.Printf("Sent a message %s --> %s", message.from, message.eid) + default: + // in case the message sending fails, close the client connection + // because it means that the client is no longer active + log.Printf("Failed to send a message to %s, disconnecting client...", message.eid) + close(c.send) + delete(h.clients, c.eid) + } + } else { + log.Printf("Message receiver not found: %s", message.eid) + } + } + } +} diff --git a/backend/services/mtp/ws/internal/ws/ws.go b/backend/services/mtp/ws/internal/ws/ws.go new file mode 100644 index 0000000..08e4eb4 --- /dev/null +++ b/backend/services/mtp/ws/internal/ws/ws.go @@ -0,0 +1,42 @@ +package ws + +// Websockets server implementation inspired by https://github.com/gorilla/websocket/tree/main/examples/chat + +import ( + "log" + "net/http" + + "github.com/OktopUSP/oktopus/ws/internal/config" + "github.com/OktopUSP/oktopus/ws/internal/ws/handler" + "github.com/gorilla/mux" +) + +// Starts New Websockets Server +func StartNewServer(c config.Config) { + // Initialize handlers of websockets events + go handler.InitHandlers(c.ControllerEID) + + r := mux.NewRouter() + r.HandleFunc("/ws/agent", func(w http.ResponseWriter, r *http.Request) { + handler.ServeAgent(w, r, c.ControllerEID) + }) + r.HandleFunc("/ws/controller", func(w http.ResponseWriter, r *http.Request) { + handler.ServeController(w, r, c.Token, c.ControllerEID, c.Auth) + }) + + go func() { + if c.Tls { + log.Println("Websockets server running with TLS") + err := http.ListenAndServeTLS(c.Port, "cert.pem", "key.pem", r) + if err != nil { + log.Fatal("ListenAndServeTLS: ", err) + } + } else { + log.Println("Websockets server running at port", c.Port) + err := http.ListenAndServe(c.Port, r) + if err != nil { + log.Fatal("ListenAndServe: ", err) + } + } + }() +}