diff --git a/backend/services/ws/internal/ws/handler/client.go b/backend/services/ws/internal/ws/handler/client.go new file mode 100644 index 0000000..0e6032f --- /dev/null +++ b/backend/services/ws/internal/ws/handler/client.go @@ -0,0 +1,173 @@ +package handler + +import ( + "bytes" + "log" + "net/http" + "strings" + "time" + + "github.com/gorilla/websocket" +) + +const ( + // Time allowed to write a message to the peer. + writeWait = 10 * time.Second + + // Time allowed to read the next pong message from the peer. + pongWait = 60 * time.Second + + // Send pings to peer with this period. Must be less than pongWait. + pingPeriod = (pongWait * 9) / 10 + + // Maximum message size allowed from peer. + maxMessageSize = 512 + + // Websockets version of the protocol + wsVersion = "13" + + // USP specification version + uspVersion = "v1.usp" +) + +var ( + newline = []byte{'\n'} + space = []byte{' '} + upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + CheckOrigin: func(r *http.Request) bool { return true }, + } +) + +// Client is a middleman between the websocket connection and the hub. +type Client struct { + hub *Hub + + //Websockets client endpoint id, eid follows usp specification + eid string + + // The websocket connection. + conn *websocket.Conn + + // Buffered channel of outbound messages. + send chan []byte +} + +// readPump pumps messages from the websocket connection to the hub. +// +// The application runs readPump in a per-connection goroutine. The application +// ensures that there is at most one reader on a connection by executing all +// reads from this goroutine. +func (c *Client) readPump() { + defer func() { + c.hub.unregister <- c + c.conn.Close() + }() + c.conn.SetReadLimit(maxMessageSize) + c.conn.SetReadDeadline(time.Now().Add(pongWait)) + c.conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil }) + for { + _, data, err := c.conn.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { + log.Printf("error: %v", err) + } + break + } + data = bytes.TrimSpace(bytes.Replace(data, newline, space, -1)) + message := message{ + eid: "oktopusController", + data: data, + } + c.hub.broadcast <- message + } +} + +// writePump pumps messages from the hub to the websocket connection. +// +// A goroutine running writePump is started for each connection. The +// application ensures that there is at most one writer to a connection by +// executing all writes from this goroutine. +func (c *Client) writePump() { + ticker := time.NewTicker(pingPeriod) + defer func() { + ticker.Stop() + c.conn.Close() + }() + for { + select { + case message, ok := <-c.send: + c.conn.SetWriteDeadline(time.Now().Add(writeWait)) + if !ok { + // The hub closed the channel. + c.conn.WriteMessage(websocket.CloseMessage, []byte{}) + return + } + + w, err := c.conn.NextWriter(websocket.TextMessage) + if err != nil { + return + } + w.Write(message) + + // Add queued messages to the current websocket message. + n := len(c.send) + for i := 0; i < n; i++ { + w.Write(newline) + w.Write(<-c.send) + } + + if err := w.Close(); err != nil { + return + } + case <-ticker.C: + c.conn.SetWriteDeadline(time.Now().Add(writeWait)) + if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil { + return + } + } + } +} + +// Handle USP Agent events +func ServeAgent(w http.ResponseWriter, r *http.Request) { + header := http.Header{ + "Sec-Websocket-Protocol": {uspVersion}, + "Sec-Websocket-Version": {wsVersion}, + } + + deviceid := extractDeviceId(r.Header) + if deviceid == "" { + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte("Device id not found")) + return + } + + conn, err := upgrader.Upgrade(w, r, header) + if err != nil { + log.Println(err) + return + } + + client := &Client{hub: hub, eid: deviceid, conn: conn, send: make(chan []byte, 256)} + client.hub.register <- client + + // Allow collection of memory referenced by the caller by doing all work in + // new goroutines. + go client.writePump() + go client.readPump() +} + +// gets device id from websockets header +func extractDeviceId(header http.Header) string { + + // Header must be like that: bbf-usp-protocol; eid="" + log.Println("Header sec-websocket-extensions:", header.Get("sec-websocket-extensions")) + wsHeaderExtension := header.Get("sec-websocket-extensions") + + // Split the input string by double quotes + deviceid := strings.Split(wsHeaderExtension, "\"") + + return deviceid[1] +} diff --git a/backend/services/ws/internal/ws/handler/hub.go b/backend/services/ws/internal/ws/handler/hub.go new file mode 100644 index 0000000..e56cdbe --- /dev/null +++ b/backend/services/ws/internal/ws/handler/hub.go @@ -0,0 +1,82 @@ +package handler + +import "log" + +// Keeps the content and the destination of a websockets message +type message struct { + // Websockets client endpoint id, eid follows usp specification. + // This field is needed for us to know which agent or controller + // the message is intended to be delivered to. + eid string + data []byte +} + +// Hub maintains the set of active clients and broadcasts messages to the +// clients. +type Hub struct { + // Registered clients. + clients map[string]*Client + + // Inbound messages from the clients. + broadcast chan message + + // Register requests from the clients. + register chan *Client + + // Unregister requests from clients. + unregister chan *Client +} + +// Global hub instance +var hub *Hub + +func InitHandlers() { + hub = newHub() + hub.run() +} + +func newHub() *Hub { + return &Hub{ + broadcast: make(chan message), + register: make(chan *Client), + unregister: make(chan *Client), + clients: make(map[string]*Client), + } +} + +func (h *Hub) run() { + for { + select { + case client := <-h.register: + // register new eid + h.clients[client.eid] = client + log.Printf("New client connected: %s", client.eid) + case client := <-h.unregister: + // verify if eid exists + if _, ok := h.clients[client.eid]; ok { + // delete eid form map of connections + delete(h.clients, client.eid) + // close client messages receiving channel + close(client.send) + } + log.Println("Disconnected client", client.eid) + case message := <-h.broadcast: + // verify if eid exists + if c, ok := h.clients[message.eid]; ok { + select { + // send message to receiver client + case c.send <- message.data: + log.Printf("Sent a message to %s", message.eid) + default: + // in case the message sending fails, close the client connection + // because it means that the client is no longer active + log.Printf("Failed to send a message to %s, disconnecting client...", message.eid) + close(c.send) + delete(h.clients, c.eid) + } + } else { + log.Printf("Message receiver not found: %s", message.eid) + } + } + } +} diff --git a/backend/services/ws/internal/ws/ws.go b/backend/services/ws/internal/ws/ws.go index 894a98c..4e72d47 100644 --- a/backend/services/ws/internal/ws/ws.go +++ b/backend/services/ws/internal/ws/ws.go @@ -1,47 +1,33 @@ package ws +// Websockets server implementation inspired by https://github.com/gorilla/websocket/tree/main/examples/chat + import ( "log" "net/http" + "github.com/OktopUSP/oktopus/ws/internal/ws/handler" "github.com/gorilla/mux" - "github.com/gorilla/websocket" ) -var upgrader = websocket.Upgrader{ - ReadBufferSize: 1024, - WriteBufferSize: 1024, - CheckOrigin: func(r *http.Request) bool { return true }, -} - +// Starts New Websockets Server func StartNewServer() { + // Initialize handlers of websockets events + go handler.InitHandlers() + r := mux.NewRouter() - r.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) { - - header := http.Header{ - "Sec-Websocket-Protocol": {"v1.usp"}, - "Sec-Websocket-Version": {"13"}, - } - - conn, err := upgrader.Upgrade(w, r, header) - if err != nil { - log.Println(err) - } - for { - _, p, err := conn.ReadMessage() - if err != nil { - log.Println("Error to read message:", err) - return - } - log.Println("Message", string(p)) - } + r.HandleFunc("/ws/agent", func(w http.ResponseWriter, r *http.Request) { + handler.ServeAgent(w, r) + }) + r.HandleFunc("/ws/controller", func(w http.ResponseWriter, r *http.Request) { + //TODO: Implement controller handler }) log.Println("Websockets server running") + // Blocks application running until it receives a KILL signal err := http.ListenAndServe(":8080", r) if err != nil { log.Fatal("ListenAndServe: ", err) } - }