feat(ws+controller): basic implementation

This commit is contained in:
leandrofars 2024-02-10 10:04:31 -03:00
parent f31e5159c9
commit 7d1df69a69
20 changed files with 8785 additions and 155 deletions

View File

@ -8,7 +8,6 @@ import (
"time"
"github.com/leandrofars/oktopus/internal/db"
"github.com/leandrofars/oktopus/internal/utils"
)
type StatusCount struct {
@ -23,6 +22,7 @@ type GeneralInfo struct {
VendorsCount []db.VendorsCount
}
// TODO: fix when mqtt broker is not set don't break api
func (a *Api) generalInfo(w http.ResponseWriter, r *http.Request) {
var result GeneralInfo
@ -49,10 +49,10 @@ func (a *Api) generalInfo(w http.ResponseWriter, r *http.Request) {
}
for _, v := range statuscount {
switch v.Status {
case utils.Online:
switch db.Status(v.Status) {
case db.Online:
result.StatusCount.Online = v.Count
case utils.Offline:
case db.Offline:
result.StatusCount.Offline = v.Count
}
}
@ -120,10 +120,10 @@ func (a *Api) statusInfo(w http.ResponseWriter, r *http.Request) {
var status StatusCount
for _, v := range vendors {
switch v.Status {
case utils.Online:
switch db.Status(v.Status) {
case db.Online:
status.Online = v.Count
case utils.Offline:
case db.Offline:
status.Offline = v.Count
}
}

View File

@ -2,11 +2,14 @@ package db
import (
"context"
"log"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"log"
)
//TODO: create another package fo structs and interfaces
type Database struct {
devices *mongo.Collection
users *mongo.Collection

View File

@ -17,6 +17,14 @@ const (
WEBSOCKETS
)
type Status uint8
const (
Offline Status = iota
Associating
Online
)
type Device struct {
SN string
Model string
@ -24,14 +32,38 @@ type Device struct {
Vendor string
Version string
ProductClass string
Status uint8
MTP []map[string]string
Status Status
Mqtt Status
Stomp Status
Websockets Status
}
// TODO: don't change device status of other MTP
func (d *Database) CreateDevice(device Device) error {
var result bson.M
var deviceExistent Device
/* ------------------ Do not overwrite status of other mtp ------------------ */
err := d.devices.FindOne(d.ctx, bson.D{{"sn", device.SN}}, nil).Decode(&deviceExistent)
if err != nil && err != mongo.ErrNoDocuments {
log.Println(err)
return err
}
if deviceExistent.Mqtt == Online {
device.Mqtt = Online
}
if deviceExistent.Stomp == Online {
device.Stomp = Online
}
if deviceExistent.Websockets == Online {
device.Websockets = Online
}
/* -------------------------------------------------------------------------- */
opts := options.FindOneAndReplace().SetUpsert(true)
err := d.devices.FindOneAndReplace(d.ctx, bson.D{{"sn", device.SN}}, device, opts).Decode(&result)
err = d.devices.FindOneAndReplace(d.ctx, bson.D{{"sn", device.SN}}, device, opts).Decode(&result)
if err != nil {
if err == mongo.ErrNoDocuments {
log.Printf("New device %s added to database", device.SN)

View File

@ -4,13 +4,57 @@ import (
"log"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
)
// TODO: fix this function to also change device status at different mtp
func (d *Database) UpdateStatus(sn string, status uint8) error {
var result bson.M
err := d.devices.FindOneAndUpdate(d.ctx, bson.D{{"sn", sn}}, bson.D{{"$set", bson.D{{"status", status}}}}).Decode(&result)
func (d *Database) UpdateStatus(sn string, status Status, mtp MTP) error {
var result Device
err := d.devices.FindOne(d.ctx, bson.D{{"sn", sn}}, nil).Decode(&result)
if err != nil {
log.Println(err)
}
//TODO: abolish this logic, find another approach, microservices design maybe?
/*
In case the device status is online, we must check if the mtp
changing is going to affect the global status. In case it does,
we must update the global status accordingly.
*/
/*
mix the existent device status to the updated one
*/
switch mtp {
case MQTT:
result.Mqtt = status
case STOMP:
result.Stomp = status
case WEBSOCKETS:
result.Websockets = status
}
/*
check if the global status needs update
*/
var globalStatus primitive.E
if result.Mqtt == Offline && result.Stomp == Offline && result.Websockets == Offline {
globalStatus = primitive.E{"status", Offline}
}
if result.Mqtt == Online || result.Stomp == Online || result.Websockets == Online {
globalStatus = primitive.E{"status", Online}
}
_, err = d.devices.UpdateOne(d.ctx, bson.D{{"sn", sn}}, bson.D{
{
"$set", bson.D{
{mtp.String(), status},
globalStatus,
},
},
})
if err != nil {
if err == mongo.ErrNoDocuments {
log.Printf("Device %s is not mapped into database", sn)

View File

@ -12,6 +12,7 @@ import (
"github.com/eclipse/paho.golang/autopaho"
"github.com/eclipse/paho.golang/paho"
"github.com/leandrofars/oktopus/internal/db"
"github.com/leandrofars/oktopus/internal/mtp/handler"
usp_msg "github.com/leandrofars/oktopus/internal/usp_message"
"github.com/leandrofars/oktopus/internal/usp_record"
"github.com/leandrofars/oktopus/internal/utils"
@ -184,7 +185,8 @@ func (m *Mqtt) messageHandler(status, controller, apiMsg chan *paho.Publish) {
}
if payload == ONLINE {
log.Println("Device connected:", device)
m.handleNewDevice(device)
tr369Message := handler.HandleNewDevice(device)
m.Publish(tr369Message, "oktopus/v1/agent/"+device, "oktopus/v1/controller/"+device, false)
//m.deleteRetainedMessage(d, device)
} else if payload == OFFLINE {
log.Println("Device disconnected:1", device)
@ -196,7 +198,11 @@ func (m *Mqtt) messageHandler(status, controller, apiMsg chan *paho.Publish) {
case c := <-controller:
topic := c.Topic
sn := strings.Split(topic, "/")
m.handleNewDevicesResponse(c.Payload, sn[3])
device := handler.HandleNewDevicesResponse(c.Payload, sn[3], db.MQTT)
err := m.DB.CreateDevice(device)
if err != nil {
log.Fatal(err)
}
case api := <-apiMsg:
log.Println("Handle api request")
m.handleApiRequest(api.Payload)
@ -232,79 +238,9 @@ func (m *Mqtt) handleApiRequest(api []byte) {
}
}
func (m *Mqtt) handleNewDevice(deviceMac string) {
payload := usp_msg.Msg{
Header: &usp_msg.Header{
MsgId: "uniqueIdentifierForThismessage",
MsgType: usp_msg.Header_GET,
},
Body: &usp_msg.Body{
MsgBody: &usp_msg.Body_Request{
Request: &usp_msg.Request{
ReqType: &usp_msg.Request_Get{
Get: &usp_msg.Get{
ParamPaths: []string{
"Device.DeviceInfo.Manufacturer",
"Device.DeviceInfo.ModelName",
"Device.DeviceInfo.SoftwareVersion",
"Device.DeviceInfo.SerialNumber",
"Device.DeviceInfo.ProductClass",
},
MaxDepth: 1,
},
},
},
},
},
}
teste, _ := proto.Marshal(&payload)
record := utils.NewUspRecord(teste, deviceMac)
tr369Message, err := proto.Marshal(&record)
if err != nil {
log.Fatalln("Failed to encode tr369 record:", err)
}
m.Publish(tr369Message, "oktopus/v1/agent/"+deviceMac, "oktopus/v1/controller/"+deviceMac, false)
}
func (m *Mqtt) handleNewDevicesResponse(p []byte, sn string) {
var record usp_record.Record
var message usp_msg.Msg
err := proto.Unmarshal(p, &record)
if err != nil {
log.Fatal(err)
}
err = proto.Unmarshal(record.GetNoSessionContext().Payload, &message)
if err != nil {
log.Fatal(err)
}
var device db.Device
msg := message.Body.MsgBody.(*usp_msg.Body_Response).Response.GetGetResp()
device.Vendor = msg.ReqPathResults[0].ResolvedPathResults[0].ResultParams["Manufacturer"]
device.Model = msg.ReqPathResults[1].ResolvedPathResults[0].ResultParams["ModelName"]
device.Version = msg.ReqPathResults[2].ResolvedPathResults[0].ResultParams["SoftwareVersion"]
device.ProductClass = msg.ReqPathResults[4].ResolvedPathResults[0].ResultParams["ProductClass"]
device.SN = sn
mtp := map[string]string{
db.MQTT.String(): "online",
}
device.MTP = append(device.MTP, mtp)
device.Status = utils.Online
err = m.DB.CreateDevice(device)
if err != nil {
log.Fatal(err)
}
}
func (m *Mqtt) handleDevicesDisconnect(p string) {
// Update status of device at database
err := m.DB.UpdateStatus(p, utils.Offline)
err := m.DB.UpdateStatus(p, db.Offline, db.MQTT)
if err != nil {
log.Fatal(err)
}

View File

@ -0,0 +1,70 @@
package handler
import (
"log"
"github.com/leandrofars/oktopus/internal/db"
usp_msg "github.com/leandrofars/oktopus/internal/usp_message"
"github.com/leandrofars/oktopus/internal/usp_record"
"github.com/leandrofars/oktopus/internal/utils"
"google.golang.org/protobuf/proto"
)
func HandleNewDevice(deviceMac string) []byte {
payload := utils.NewGetMsg(usp_msg.Get{
ParamPaths: []string{
"Device.DeviceInfo.Manufacturer",
"Device.DeviceInfo.ModelName",
"Device.DeviceInfo.SoftwareVersion",
"Device.DeviceInfo.SerialNumber",
"Device.DeviceInfo.ProductClass",
},
MaxDepth: 1,
})
teste, _ := proto.Marshal(&payload)
record := utils.NewUspRecord(teste, deviceMac)
tr369Message, err := proto.Marshal(&record)
if err != nil {
log.Fatalln("Failed to encode tr369 record:", err)
}
return tr369Message
}
func HandleNewDevicesResponse(p []byte, sn string, mtp db.MTP) db.Device {
var record usp_record.Record
var message usp_msg.Msg
err := proto.Unmarshal(p, &record)
if err != nil {
log.Fatal(err)
}
err = proto.Unmarshal(record.GetNoSessionContext().Payload, &message)
if err != nil {
log.Fatal(err)
}
var device db.Device
msg := message.Body.MsgBody.(*usp_msg.Body_Response).Response.GetGetResp()
device.Vendor = msg.ReqPathResults[0].ResolvedPathResults[0].ResultParams["Manufacturer"]
device.Model = msg.ReqPathResults[1].ResolvedPathResults[0].ResultParams["ModelName"]
device.Version = msg.ReqPathResults[2].ResolvedPathResults[0].ResultParams["SoftwareVersion"]
device.ProductClass = msg.ReqPathResults[4].ResolvedPathResults[0].ResultParams["ProductClass"]
device.SN = sn
switch db.MTP(mtp) {
case db.MQTT:
device.Mqtt = db.Online
case db.WEBSOCKETS:
device.Websockets = db.Online
case db.STOMP:
device.Stomp = db.Online
}
device.Status = db.Online
return device
}

View File

@ -1,18 +1,14 @@
package utils
import (
"net"
"github.com/google/uuid"
usp_msg "github.com/leandrofars/oktopus/internal/usp_message"
"github.com/leandrofars/oktopus/internal/usp_record"
"net"
)
// Status are saved at database as numbers
const (
Online = iota
Associating
Offline
)
//TODO: change usp utils related to another package
// Get interfaces MACs, and the first interface MAC is gonna be used as mqtt clientId
func GetMacAddr() ([]string, error) {

View File

@ -2,10 +2,18 @@ package ws
import (
"context"
"encoding/json"
"log"
"reflect"
"sync"
"time"
"github.com/gorilla/websocket"
"github.com/leandrofars/oktopus/internal/db"
"github.com/leandrofars/oktopus/internal/mtp/handler"
"github.com/leandrofars/oktopus/internal/usp_record"
"google.golang.org/protobuf/proto"
)
type Ws struct {
@ -16,25 +24,37 @@ type Ws struct {
Auth bool
TLS bool
Ctx context.Context
NewDeviceQueue map[string]string
NewDevQMutex *sync.Mutex
DB db.Database
}
const (
WS_CONNECTION_RETRY = 10 * time.Second
)
const (
OFFLINE = "0"
ONLINE = "1"
)
type deviceStatus struct {
Eid string
Status string
}
// Global Websocket connection used in this package
var wsConn *websocket.Conn
func (w *Ws) Connect() {
var wsUrl string
// communication with devices
wsUrl := "ws://" + w.Addr + ":" + w.Port + w.Route
if w.Auth {
log.Println("WS token:", w.Token)
// e.g. ws://localhost:8080/ws/controller?token=123456
wsUrl = "ws://" + w.Addr + ":" + w.Port + w.Route + "?token=" + w.Token
} else {
// e.g. ws://localhost:8080/ws/controller
wsUrl = "ws://" + w.Addr + ":" + w.Port + w.Route
wsUrl = wsUrl + "?token=" + w.Token
}
// Keeps trying to connect to the WS endpoint until it succeeds or receives a stop signal
@ -74,18 +94,108 @@ func (w *Ws) Disconnect() {
/* -------------------------------------------------------------------------- */
func (w *Ws) Subscribe() {
var m sync.Mutex
w.NewDevQMutex = &m
w.NewDeviceQueue = make(map[string]string)
for {
_, message, err := wsConn.ReadMessage()
//TODO: deal with message in new go routine
msgType, wsMsg, err := wsConn.ReadMessage()
if err != nil {
log.Println("read:", err)
return
}
log.Printf("recv: %s", message)
if msgType == websocket.TextMessage {
var deviceStatus deviceStatus
err = json.Unmarshal(wsMsg, &deviceStatus)
if err != nil {
log.Println("Websockets Text Message is not about devices status")
}
log.Println("Received device status message")
var status db.Status
switch deviceStatus.Status {
case ONLINE:
status = db.Online
case OFFLINE:
status = db.Offline
default:
log.Println("Invalid device status")
return
}
w.DB.UpdateStatus(deviceStatus.Eid, status, db.WEBSOCKETS)
//TODO: return error 1003 to device
//TODO: get status messages
continue
}
//log.Printf("binary data: %s", string(wsMsg))
//TODO: if error at processing message return error 1003 to devicec
//TODO: deal with received messages in parallel
var record usp_record.Record
//var message usp_msg.Msg
err = proto.Unmarshal(wsMsg, &record)
if err != nil {
log.Println(err)
}
connRecord := &usp_record.Record_WebsocketConnect{
WebsocketConnect: &usp_record.WebSocketConnectRecord{},
}
noSessionRecord := &usp_record.Record_NoSessionContext{
NoSessionContext: &usp_record.NoSessionContextRecord{},
}
//log.Printf("Record Type: %++v", record.RecordType)
deviceId := record.FromId
// New Device Handler
if reflect.TypeOf(record.RecordType) == reflect.TypeOf(connRecord) {
log.Println("Websocket new device:", deviceId)
tr369Message := handler.HandleNewDevice(deviceId)
w.NewDevQMutex.Lock()
w.NewDeviceQueue[deviceId] = ""
w.NewDevQMutex.Unlock()
w.Publish(tr369Message, "", "", false)
continue
}
//TODO: see what type of message was received
if reflect.TypeOf(record.RecordType) == reflect.TypeOf(noSessionRecord) {
//log.Printf("Websocket device %s message", record.FromId)
// New device answer
if _, ok := w.NewDeviceQueue[deviceId]; ok {
log.Printf("New device %s response", deviceId)
device := handler.HandleNewDevicesResponse(wsMsg, deviceId, db.WEBSOCKETS)
w.NewDevQMutex.Lock()
delete(w.NewDeviceQueue, deviceId)
w.NewDevQMutex.Unlock()
w.DB.CreateDevice(device)
if err != nil {
log.Fatal(err)
}
continue
}
//TODO: send message to Api Msg Queue
}
//log.Printf("recv: %++v", record)
}
}
func (w *Ws) Publish(msg []byte, topic, respTopic string, retain bool) {
err := wsConn.WriteMessage(websocket.TextMessage, msg)
err := wsConn.WriteMessage(websocket.BinaryMessage, msg)
if err != nil {
log.Println("write:", err)
return

View File

@ -1,6 +1,7 @@
package server
import (
"log"
"net"
"strings"
"time"
@ -50,6 +51,8 @@ func (proc *requestProcessor) Serve(l net.Listener) error {
} else {
topic := proc.tm.Find(r.Sub.Destination())
topic.Subscribe(r.Sub)
//TODO: if subscribed to oktopus/v1/agent send online message to ...status topic
log.Println(r.Sub.Destination())
}
case client.UnsubscribeOp:

View File

@ -1,2 +1,4 @@
SERVER_PORT=""
SERVER_AUTH_TOKEN=""
CONTROLLER_EID=""
SERVER_AUTH_ENABLE=""

View File

@ -6,6 +6,7 @@ require (
github.com/gorilla/mux v1.8.1
github.com/gorilla/websocket v1.5.1
github.com/joho/godotenv v1.5.1
google.golang.org/protobuf v1.32.0
)
require golang.org/x/net v0.17.0 // indirect

View File

@ -1,3 +1,5 @@
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY=
github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ=
github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY=
@ -6,3 +8,7 @@ github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I=
google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=

View File

@ -12,7 +12,9 @@ import (
type Config struct {
Port string // server port: e.g. ":8080"
Auth bool // server auth enable/disable
Token string // controller auth token
ControllerEID string // controller endpoint id
}
func NewConfig() Config {
@ -32,6 +34,8 @@ func NewConfig() Config {
/* ------------------------------ define flags ------------------------------ */
flPort := flag.String("port", lookupEnvOrString("SERVER_PORT", ":8080"), "Server port")
flToken := flag.String("token", lookupEnvOrString("SERVER_AUTH_TOKEN", ""), "Controller auth token")
flAuth := flag.Bool("auth", lookupEnvOrBool("SERVER_AUTH_ENABLE", false), "Server auth enable/disable")
flControllerEid := flag.String("controller-eid", lookupEnvOrString("CONTROLLER_EID", "oktopusController"), "Controller eid")
flHelp := flag.Bool("help", false, "Help")
flag.Parse()
/* -------------------------------------------------------------------------- */
@ -44,6 +48,8 @@ func NewConfig() Config {
return Config{
Port: *flPort,
Token: *flToken,
Auth: *flAuth,
ControllerEID: *flControllerEid,
}
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,530 @@
syntax = "proto3";
//**************************************************************************
// TR-369 USP Message Protocol Buffer Schema
//
// Copyright (c) 2017-2018, Broadband Forum
//
// The undersigned members have elected to grant the copyright to
// their contributed material used in this software:
// Copyright (c) 2017-2018 ARRIS Enterprises, LLC.
//
// Redistribution and use in source and binary forms, with or
// without modification, are permitted provided that the following
// conditions are met:
//
// 1. Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following
// disclaimer in the documentation and/or other materials
// provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its
// contributors may be used to endorse or promote products
// derived from this software without specific prior written
// permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
// CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
// NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
// CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
// ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
// The above license is used as a license under copyright only.
// Please reference the Forum IPR Policy for patent licensing terms
// <https://www.broadband-forum.org/ipr-policy>.
//
// Any moral rights which are necessary to exercise under the above
// license grant are also deemed granted under this license.
//
// | Version | Name | Date |
// | TR-369 1.0.0 | User Services Platform | APR, 2018 |
// | TR-369 1.0.1 | User Services Platform | JUN, 2018 |
// | TR-369 1.0.2 | User Services Platform | OCT, 2018 |
// | TR-369 1.1 | User Services Platform | SEP, 2019 |
//
// BBF software release registry: http://www.broadband-forum.org/software
//**************************************************************************
package usp;
option go_package="./usp-msg";
message Msg {
Header header = 1; // Make required in the protocol
Body body = 2; // Make required in the protocol
}
message Header {
string msg_id = 1; // Make required in the protocol
MsgType msg_type = 2; // Make required in the protocol
enum MsgType {
ERROR = 0;
GET = 1;
GET_RESP = 2;
NOTIFY = 3;
SET = 4;
SET_RESP = 5;
OPERATE = 6;
OPERATE_RESP = 7;
ADD = 8;
ADD_RESP = 9;
DELETE = 10;
DELETE_RESP = 11;
GET_SUPPORTED_DM = 12;
GET_SUPPORTED_DM_RESP = 13;
GET_INSTANCES = 14;
GET_INSTANCES_RESP = 15;
NOTIFY_RESP = 16;
GET_SUPPORTED_PROTO = 17;
GET_SUPPORTED_PROTO_RESP = 18;
}
}
message Body {
oneof msg_body {
Request request = 1;
Response response = 2;
Error error = 3;
}
}
message Request {
oneof req_type {
Get get = 1;
GetSupportedDM get_supported_dm = 2;
GetInstances get_instances = 3;
Set set = 4;
Add add = 5;
Delete delete = 6;
Operate operate = 7;
Notify notify = 8;
GetSupportedProtocol get_supported_protocol = 9;
}
}
message Response {
oneof resp_type {
GetResp get_resp = 1;
GetSupportedDMResp get_supported_dm_resp = 2;
GetInstancesResp get_instances_resp = 3;
SetResp set_resp = 4;
AddResp add_resp = 5;
DeleteResp delete_resp = 6;
OperateResp operate_resp = 7;
NotifyResp notify_resp = 8;
GetSupportedProtocolResp get_supported_protocol_resp = 9;
}
}
message Error {
fixed32 err_code = 1;
string err_msg = 2;
repeated ParamError param_errs = 3;
message ParamError {
string param_path = 1;
fixed32 err_code = 2;
string err_msg = 3;
}
}
message Get {
repeated string param_paths = 1;
fixed32 max_depth = 2;
}
message GetResp {
repeated RequestedPathResult req_path_results = 1;
message RequestedPathResult {
string requested_path = 1;
fixed32 err_code = 2;
string err_msg = 3;
repeated ResolvedPathResult resolved_path_results = 4;
}
message ResolvedPathResult {
string resolved_path = 1;
map<string, string> result_params = 2;
}
}
message GetSupportedDM {
repeated string obj_paths = 1;
bool first_level_only = 2;
bool return_commands = 3;
bool return_events = 4;
bool return_params = 5;
}
message GetSupportedDMResp {
repeated RequestedObjectResult req_obj_results = 1;
message RequestedObjectResult {
string req_obj_path = 1;
fixed32 err_code = 2;
string err_msg = 3;
string data_model_inst_uri = 4;
repeated SupportedObjectResult supported_objs = 5;
}
message SupportedObjectResult {
string supported_obj_path = 1;
ObjAccessType access = 2;
bool is_multi_instance = 3;
repeated SupportedCommandResult supported_commands = 4;
repeated SupportedEventResult supported_events = 5;
repeated SupportedParamResult supported_params = 6;
repeated string divergent_paths = 7;
}
message SupportedParamResult {
string param_name = 1;
ParamAccessType access = 2;
ParamValueType value_type = 3;
ValueChangeType value_change = 4;
}
message SupportedCommandResult {
string command_name = 1;
repeated string input_arg_names = 2;
repeated string output_arg_names = 3;
CmdType command_type = 4;
}
message SupportedEventResult {
string event_name = 1;
repeated string arg_names = 2;
}
enum ParamAccessType {
PARAM_READ_ONLY = 0;
PARAM_READ_WRITE = 1;
PARAM_WRITE_ONLY = 2;
}
enum ObjAccessType {
OBJ_READ_ONLY = 0;
OBJ_ADD_DELETE = 1;
OBJ_ADD_ONLY = 2;
OBJ_DELETE_ONLY = 3;
}
enum ParamValueType {
PARAM_UNKNOWN = 0;
PARAM_BASE_64 = 1;
PARAM_BOOLEAN = 2;
PARAM_DATE_TIME = 3;
PARAM_DECIMAL = 4;
PARAM_HEX_BINARY = 5;
PARAM_INT = 6;
PARAM_LONG = 7;
PARAM_STRING = 8;
PARAM_UNSIGNED_INT = 9;
PARAM_UNSIGNED_LONG = 10;
}
enum ValueChangeType {
VALUE_CHANGE_UNKNOWN = 0;
VALUE_CHANGE_ALLOWED = 1;
VALUE_CHANGE_WILL_IGNORE = 2;
}
enum CmdType {
CMD_UNKNOWN = 0;
CMD_SYNC = 1;
CMD_ASYNC = 2;
}
}
message GetInstances {
repeated string obj_paths = 1;
bool first_level_only = 2;
}
message GetInstancesResp {
repeated RequestedPathResult req_path_results = 1;
message RequestedPathResult {
string requested_path = 1;
fixed32 err_code = 2;
string err_msg = 3;
repeated CurrInstance curr_insts = 4;
}
message CurrInstance {
string instantiated_obj_path = 1;
map<string, string> unique_keys = 2;
}
}
message GetSupportedProtocol {
string controller_supported_protocol_versions = 1;
}
message GetSupportedProtocolResp {
string agent_supported_protocol_versions = 1;
}
message Add {
bool allow_partial = 1;
repeated CreateObject create_objs = 2;
message CreateObject {
string obj_path = 1;
repeated CreateParamSetting param_settings = 2;
}
message CreateParamSetting {
string param = 1;
string value = 2;
bool required = 3;
}
}
message AddResp {
repeated CreatedObjectResult created_obj_results = 1;
message CreatedObjectResult {
string requested_path = 1;
OperationStatus oper_status = 2;
message OperationStatus {
oneof oper_status {
OperationFailure oper_failure = 1;
OperationSuccess oper_success = 2;
}
message OperationFailure {
fixed32 err_code = 1;
string err_msg = 2;
}
message OperationSuccess {
string instantiated_path = 1;
repeated ParameterError param_errs = 2;
map<string, string> unique_keys = 3;
}
}
}
message ParameterError {
string param = 1;
fixed32 err_code = 2;
string err_msg = 3;
}
}
message Delete {
bool allow_partial = 1;
repeated string obj_paths = 2;
}
message DeleteResp {
repeated DeletedObjectResult deleted_obj_results = 1;
message DeletedObjectResult {
string requested_path = 1;
OperationStatus oper_status = 2;
message OperationStatus {
oneof oper_status {
OperationFailure oper_failure = 1;
OperationSuccess oper_success = 2;
}
message OperationFailure {
fixed32 err_code = 1;
string err_msg = 2;
}
message OperationSuccess {
repeated string affected_paths = 1;
repeated UnaffectedPathError unaffected_path_errs = 2;
}
}
}
message UnaffectedPathError {
string unaffected_path = 1;
fixed32 err_code = 2;
string err_msg = 3;
}
}
message Set {
bool allow_partial = 1;
repeated UpdateObject update_objs = 2;
message UpdateObject {
string obj_path = 1;
repeated UpdateParamSetting param_settings = 2;
}
message UpdateParamSetting {
string param = 1;
string value = 2;
bool required = 3;
}
}
message SetResp {
repeated UpdatedObjectResult updated_obj_results = 1;
message UpdatedObjectResult {
string requested_path = 1;
OperationStatus oper_status = 2;
message OperationStatus {
oneof oper_status {
OperationFailure oper_failure = 1;
OperationSuccess oper_success = 2;
}
message OperationFailure {
fixed32 err_code = 1;
string err_msg = 2;
repeated UpdatedInstanceFailure updated_inst_failures = 3;
}
message OperationSuccess {
repeated UpdatedInstanceResult updated_inst_results = 1;
}
}
}
message UpdatedInstanceFailure {
string affected_path = 1;
repeated ParameterError param_errs = 2;
}
message UpdatedInstanceResult {
string affected_path = 1;
repeated ParameterError param_errs = 2;
map<string, string> updated_params = 3;
}
message ParameterError {
string param = 1;
fixed32 err_code = 2;
string err_msg = 3;
}
}
message Operate {
string command = 1;
string command_key = 2;
bool send_resp = 3;
map<string, string> input_args = 4;
}
message OperateResp {
repeated OperationResult operation_results = 1;
message OperationResult {
string executed_command = 1;
oneof operation_resp {
string req_obj_path = 2;
OutputArgs req_output_args = 3;
CommandFailure cmd_failure = 4;
}
message OutputArgs {
map<string, string> output_args = 1;
}
message CommandFailure {
fixed32 err_code = 1;
string err_msg = 2;
}
}
}
message Notify {
string subscription_id = 1;
bool send_resp = 2;
oneof notification {
Event event = 3;
ValueChange value_change = 4;
ObjectCreation obj_creation = 5;
ObjectDeletion obj_deletion = 6;
OperationComplete oper_complete = 7;
OnBoardRequest on_board_req = 8;
}
message Event {
string obj_path = 1;
string event_name = 2;
map<string, string> params = 3;
}
message ValueChange {
string param_path = 1;
string param_value = 2;
}
message ObjectCreation {
string obj_path = 1;
map<string, string> unique_keys = 2;
}
message ObjectDeletion {
string obj_path = 1;
}
message OperationComplete {
string obj_path = 1;
string command_name = 2;
string command_key = 3;
oneof operation_resp {
OutputArgs req_output_args = 4;
CommandFailure cmd_failure = 5;
}
message OutputArgs {
map<string, string> output_args = 1;
}
message CommandFailure {
fixed32 err_code = 1;
string err_msg = 2;
}
}
message OnBoardRequest {
string oui = 1;
string product_class = 2;
string serial_number = 3;
string agent_supported_protocol_versions = 4;
}
}
message NotifyResp {
string subscription_id = 1;
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,134 @@
syntax = "proto3";
//**************************************************************************
// TR-369 USP Record Protocol Buffer Schema
//
// Copyright (c) 2017-2018, Broadband Forum
//
// The undersigned members have elected to grant the copyright to
// their contributed material used in this software:
// Copyright (c) 2017-2018 ARRIS Enterprises, LLC.
//
// Redistribution and use in source and binary forms, with or
// without modification, are permitted provided that the following
// conditions are met:
//
// 1. Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following
// disclaimer in the documentation and/or other materials
// provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its
// contributors may be used to endorse or promote products
// derived from this software without specific prior written
// permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
// CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
// NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
// CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
// ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
// The above license is used as a license under copyright only.
// Please reference the Forum IPR Policy for patent licensing terms
// <https://www.broadband-forum.org/ipr-policy>.
//
// Any moral rights which are necessary to exercise under the above
// license grant are also deemed granted under this license.
//
//
// | Version | Name | Date |
// | TR-369 1.0.0 | User Services Platform | APR, 2018 |
// | TR-369 1.0.1 | User Services Platform | JUN, 2018 |
// | TR-369 1.0.2 | User Services Platform | OCT, 2018 |
// | TR-369 1.1 | User Services Platform | SEP, 2019 |
//
// BBF software release registry: http://www.broadband-forum.org/software
//**************************************************************************
package usp_record;
option go_package="./usp-record";
message Record {
string version = 1;
string to_id = 2;
string from_id = 3;
PayloadSecurity payload_security = 4;
bytes mac_signature = 5; //MAC or Signature
bytes sender_cert = 6;
oneof record_type {
NoSessionContextRecord no_session_context = 7;
SessionContextRecord session_context = 8;
WebSocketConnectRecord websocket_connect = 9;
MQTTConnectRecord mqtt_connect = 10;
STOMPConnectRecord stomp_connect = 11;
DisconnectRecord disconnect = 12;
}
enum PayloadSecurity {
PLAINTEXT = 0;
TLS12 = 1;
}
}
message NoSessionContextRecord {
bytes payload = 2;
}
message SessionContextRecord {
uint64 session_id = 1;
uint64 sequence_id = 2;
uint64 expected_id = 3;
uint64 retransmit_id = 4;
PayloadSARState payload_sar_state = 5;
PayloadSARState payloadrec_sar_state = 6;
repeated bytes payload = 7;
enum PayloadSARState {
NONE = 0; //No segmentation
BEGIN = 1; //Begin segmentation
INPROCESS = 2; //Segmentation in process
COMPLETE = 3; //Segmentation is complete
}
}
message WebSocketConnectRecord {
// An empty message
}
message MQTTConnectRecord {
MQTTVersion version = 1;
string subscribed_topic = 2;
enum MQTTVersion {
V3_1_1 = 0; // Represents MQTT v3.1.1, a.k.a. v4 in the MQTT Spec
V5 = 1;
}
}
message STOMPConnectRecord {
STOMPVersion version = 1;
string subscribed_destination = 2;
enum STOMPVersion {
V1_2 = 0;
}
}
message DisconnectRecord {
string reason = 1;
fixed32 reason_code = 2;
}

View File

@ -1,27 +1,28 @@
package handler
import (
"bytes"
"log"
"net/http"
"strings"
"time"
"github.com/OktopUSP/oktopus/ws/internal/usp_record"
"github.com/gorilla/websocket"
"google.golang.org/protobuf/proto"
)
const (
// Time allowed to write a message to the peer.
writeWait = 10 * time.Second
writeWait = 30 * time.Second
// Time allowed to read the next pong message from the peer.
pongWait = 60 * time.Second
pongWait = 10 * time.Second
// Send pings to peer with this period. Must be less than pongWait.
pingPeriod = (pongWait * 9) / 10
// Maximum message size allowed from peer.
maxMessageSize = 512
//maxMessageSize = 512
// Websockets version of the protocol
wsVersion = "13"
@ -51,7 +52,7 @@ type Client struct {
conn *websocket.Conn
// Buffered channel of outbound messages.
send chan []byte
send chan message
}
// readPump pumps messages from the websocket connection to the hub.
@ -59,12 +60,13 @@ type Client struct {
// The application runs readPump in a per-connection goroutine. The application
// ensures that there is at most one reader on a connection by executing all
// reads from this goroutine.
func (c *Client) readPump() {
// cEID = controller endpoint id
func (c *Client) readPump(cEID string) {
defer func() {
c.hub.unregister <- c
c.conn.Close()
}()
c.conn.SetReadLimit(maxMessageSize)
//c.conn.SetReadLimit(maxMessageSize)
c.conn.SetReadDeadline(time.Now().Add(pongWait))
c.conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })
for {
@ -75,15 +77,28 @@ func (c *Client) readPump() {
}
break
}
data = bytes.TrimSpace(bytes.Replace(data, newline, space, -1))
message := message{
eid: "oktopusController",
data: data,
}
message := constructMsg(cEID, c.eid, data)
c.hub.broadcast <- message
}
}
func constructMsg(eid string, from string, data []byte) message {
if eid == "" {
var record usp_record.Record
err := proto.Unmarshal(data, &record)
if err != nil {
log.Println(err)
}
eid = record.ToId
}
return message{
eid: eid,
from: from,
data: data,
msgType: websocket.BinaryMessage,
}
}
// writePump pumps messages from the hub to the websocket connection.
//
// A goroutine running writePump is started for each connection. The
@ -101,21 +116,23 @@ func (c *Client) writePump() {
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if !ok {
// The hub closed the channel.
log.Println("The hub closed the channel of", c.eid)
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
w, err := c.conn.NextWriter(websocket.TextMessage)
w, err := c.conn.NextWriter(message.msgType)
if err != nil {
return
}
w.Write(message)
w.Write(message.data)
// Add queued messages to the current websocket message.
n := len(c.send)
for i := 0; i < n; i++ {
w.Write(newline)
w.Write(<-c.send)
send := <-c.send
w.Write(send.data)
}
if err := w.Close(); err != nil {
@ -131,23 +148,31 @@ func (c *Client) writePump() {
}
// Handle USP Controller events
func ServeController(w http.ResponseWriter, r *http.Request, token string) {
func ServeController(w http.ResponseWriter, r *http.Request, token, cEID string, authEnable bool) {
if authEnable {
recv_token := r.URL.Query().Get("token")
if recv_token != token {
w.WriteHeader(http.StatusUnauthorized)
w.Write([]byte("Unauthorized"))
return
}
}
_, err := upgrader.Upgrade(w, r, nil)
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println(err)
return
}
client := &Client{hub: hub, eid: cEID, conn: conn, send: make(chan message)}
client.hub.register <- client
go client.writePump()
go client.readPump("")
}
// Handle USP Agent events
func ServeAgent(w http.ResponseWriter, r *http.Request) {
// Handle USP Agent events, cEID = controller endpoint id
func ServeAgent(w http.ResponseWriter, r *http.Request, cEID string) {
//TODO: find out a way to authenticate agents
@ -159,6 +184,7 @@ func ServeAgent(w http.ResponseWriter, r *http.Request) {
deviceid := extractDeviceId(r.Header)
if deviceid == "" {
w.WriteHeader(http.StatusBadRequest)
log.Println("Device id not found")
w.Write([]byte("Device id not found"))
return
}
@ -169,24 +195,28 @@ func ServeAgent(w http.ResponseWriter, r *http.Request) {
return
}
client := &Client{hub: hub, eid: deviceid, conn: conn, send: make(chan []byte, 256)}
client := &Client{hub: hub, eid: deviceid, conn: conn, send: make(chan message)}
client.hub.register <- client
// Allow collection of memory referenced by the caller by doing all work in
// new goroutines.
go client.writePump()
go client.readPump()
//TODO: get cEID from device message toId record field (must refact nice part of the code for this to be dynamic)
go client.readPump(cEID)
}
// gets device id from websockets header
func extractDeviceId(header http.Header) string {
// Header must be like that: bbf-usp-protocol; eid="<endpoint-id>"
log.Println("Header sec-websocket-extensions:", header.Get("sec-websocket-extensions"))
// Header must be like that: bbf-usp-protocol; eid="<endpoint-id>" <endpoint-id> is the same ar the record.FromId/record.ToId
// log.Println("Header sec-websocket-extensions:", header.Get("sec-websocket-extensions"))
wsHeaderExtension := header.Get("sec-websocket-extensions")
// Split the input string by double quotes
deviceid := strings.Split(wsHeaderExtension, "\"")
if len(deviceid) < 2 {
return ""
}
return deviceid[1]
}

View File

@ -1,6 +1,11 @@
package handler
import "log"
import (
"encoding/json"
"log"
"github.com/gorilla/websocket"
)
// Keeps the content and the destination of a websockets message
type message struct {
@ -9,6 +14,8 @@ type message struct {
// the message is intended to be delivered to.
eid string
data []byte
msgType int
from string
}
// Hub maintains the set of active clients and broadcasts messages to the
@ -27,10 +34,25 @@ type Hub struct {
unregister chan *Client
}
const (
OFFLINE = "0"
ONLINE = "1"
)
type deviceStatus struct {
Eid string
Status string
}
// Global hub instance
var hub *Hub
func InitHandlers() {
// Controller Endpoint ID
var ceid string
func InitHandlers(eid string) {
ceid = eid
log.Println("New hub, Controller eid:", ceid)
hub = newHub()
hub.run()
}
@ -54,19 +76,42 @@ func (h *Hub) run() {
case client := <-h.unregister:
// verify if eid exists
if _, ok := h.clients[client.eid]; ok {
// delete eid form map of connections
// delete eid from map of connections
delete(h.clients, client.eid)
// close client messages receiving channel
close(client.send)
}
log.Println("Disconnected client", client.eid)
case message := <-h.broadcast:
data, _ := json.Marshal(deviceStatus{client.eid, OFFLINE})
msg := message{
from: "WS server",
eid: ceid,
data: data,
msgType: websocket.TextMessage,
}
log.Printf("%++v", msg)
//TODO: set this snippet of code as a function to avoid repetition
if c, ok := h.clients[msg.eid]; ok {
select {
// send message to receiver client
case c.send <- msg:
log.Printf("Sent a message %s --> %s", msg.from, msg.eid)
default:
// in case the msg sending fails, close the client connection
// because it means that the client is no longer active
log.Printf("Failed to send a msg to %s, disconnecting client...", msg.eid)
close(c.send)
delete(h.clients, c.eid)
}
}
case message := <-h.broadcast: //TODO: ver a conexão de quem está enviando
log.Println("send message to", message.eid)
// verify if eid exists
if c, ok := h.clients[message.eid]; ok {
select {
// send message to receiver client
case c.send <- message.data:
log.Printf("Sent a message to %s", message.eid)
case c.send <- message:
log.Printf("Sent a message %s --> %s", message.from, message.eid)
default:
// in case the message sending fails, close the client connection
// because it means that the client is no longer active
@ -75,6 +120,7 @@ func (h *Hub) run() {
delete(h.clients, c.eid)
}
} else {
//TODO: create queue for receiver while the client is not online
log.Printf("Message receiver not found: %s", message.eid)
}
}

View File

@ -14,14 +14,14 @@ import (
// Starts New Websockets Server
func StartNewServer(c config.Config) {
// Initialize handlers of websockets events
go handler.InitHandlers()
go handler.InitHandlers(c.ControllerEID)
r := mux.NewRouter()
r.HandleFunc("/ws/agent", func(w http.ResponseWriter, r *http.Request) {
handler.ServeAgent(w, r)
handler.ServeAgent(w, r, c.ControllerEID)
})
r.HandleFunc("/ws/controller", func(w http.ResponseWriter, r *http.Request) {
handler.ServeController(w, r, c.Token)
handler.ServeController(w, r, c.Token, c.ControllerEID, c.Auth)
})
log.Println("Websockets server running")