feat(controller): usp message via websockets

This commit is contained in:
leandrofars 2024-02-19 22:28:29 -03:00
parent f4a8f42dbe
commit f49fa5b3bf
6 changed files with 62 additions and 21 deletions

View File

@ -179,6 +179,8 @@ func main() {
InsecureSkipVerify: *flWsSkipVerify,
DB: database,
Ctx: ctx,
MsgQueue: apiMsgQueue,
QMutex: &m,
}
wsDone = make(chan os.Signal, 1)
@ -192,7 +194,7 @@ func main() {
wg.Wait()
a := api.NewApi(*flApiPort, database, &mqttClient, apiMsgQueue, &m)
a := api.NewApi(*flApiPort, database, &mqttClient, apiMsgQueue, &m, wsClient) //TODO: websockets instance
api.StartApi(a)
<-done

View File

@ -12,19 +12,19 @@ import (
"github.com/leandrofars/oktopus/internal/api/middleware"
"github.com/leandrofars/oktopus/internal/db"
"github.com/leandrofars/oktopus/internal/mqtt"
"github.com/leandrofars/oktopus/internal/mtp"
usp_msg "github.com/leandrofars/oktopus/internal/usp_message"
"github.com/leandrofars/oktopus/internal/utils"
"github.com/leandrofars/oktopus/internal/ws"
"google.golang.org/protobuf/proto"
)
type Api struct {
Port string
Db db.Database
Broker mtp.Broker
MsgQueue map[string](chan usp_msg.Msg)
QMutex *sync.Mutex
Mqtt mqtt.Mqtt
Port string
Db db.Database
MsgQueue map[string](chan usp_msg.Msg)
QMutex *sync.Mutex
Mqtt *mqtt.Mqtt
Websockets *ws.Ws
}
const REQUEST_TIMEOUT = time.Second * 30
@ -34,14 +34,14 @@ const (
AdminUser
)
func NewApi(port string, db db.Database, mqtt *mqtt.Mqtt, msgQueue map[string](chan usp_msg.Msg), m *sync.Mutex) Api {
func NewApi(port string, db db.Database, mqtt *mqtt.Mqtt, msgQueue map[string](chan usp_msg.Msg), m *sync.Mutex, w ws.Ws) Api {
return Api{
Port: port,
Db: db,
Broker: mqtt,
MsgQueue: msgQueue,
QMutex: m,
Mqtt: *mqtt,
Port: port,
Db: db,
MsgQueue: msgQueue,
QMutex: m,
Mqtt: mqtt,
Websockets: &w,
}
}
@ -133,8 +133,14 @@ func (a *Api) uspCall(msg usp_msg.Msg, sn string, w http.ResponseWriter, device
a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg)
a.QMutex.Unlock()
log.Println("Sending Msg:", msg.Header.MsgId)
//TODO: Check what MTP the device is connected to
a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn, false)
if device.Mqtt == db.Online {
a.Mqtt.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn, false)
} else if device.Websockets == db.Online {
a.Websockets.Publish(tr369Message, "", "", false)
} else if device.Stomp == db.Online {
//TODO: send stomp message
}
select {
case msg := <-a.MsgQueue[msg.Header.MsgId]:

View File

@ -232,6 +232,7 @@ func (a *Api) deviceUpdateMsg(w http.ResponseWriter, r *http.Request) {
a.uspCall(msg, sn, w, device)
}
// TODO: react this function, return err and deal with it in the caller, remove header superfluos
func (a *Api) deviceExists(sn string, w http.ResponseWriter) db.Device {
device, err := a.Db.RetrieveDevice(sn)
if err != nil {

View File

@ -7,6 +7,7 @@ import (
"time"
"github.com/gorilla/mux"
"github.com/leandrofars/oktopus/internal/db"
usp_msg "github.com/leandrofars/oktopus/internal/usp_message"
"github.com/leandrofars/oktopus/internal/utils"
"google.golang.org/protobuf/proto"
@ -51,7 +52,14 @@ func (a *Api) deviceFwUpdate(w http.ResponseWriter, r *http.Request) {
a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg)
a.QMutex.Unlock()
log.Println("Sending Msg:", msg.Header.MsgId)
a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn, false)
if device.Mqtt == db.Online {
a.Mqtt.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn, false)
} else if device.Websockets == db.Online {
a.Websockets.Publish(tr369Message, "", "", false)
} else if device.Stomp == db.Online {
//TODO: send stomp message
}
var getMsgAnswer *usp_msg.GetResp

View File

@ -10,6 +10,7 @@ import (
"github.com/gorilla/mux"
"github.com/leandrofars/oktopus/internal/db"
usp_msg "github.com/leandrofars/oktopus/internal/usp_message"
"github.com/leandrofars/oktopus/internal/utils"
"google.golang.org/protobuf/proto"
@ -31,7 +32,7 @@ type WiFi struct {
func (a *Api) deviceWifi(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
sn := vars["sn"]
a.deviceExists(sn, w)
device := a.deviceExists(sn, w)
if r.Method == http.MethodGet {
msg := utils.NewGetMsg(usp_msg.Get{
@ -69,7 +70,14 @@ func (a *Api) deviceWifi(w http.ResponseWriter, r *http.Request) {
a.MsgQueue[msg.Header.MsgId] = make(chan usp_msg.Msg)
a.QMutex.Unlock()
log.Println("Sending Msg:", msg.Header.MsgId)
a.Broker.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn, false)
if device.Mqtt == db.Online {
a.Mqtt.Publish(tr369Message, "oktopus/v1/agent/"+sn, "oktopus/v1/api/"+sn, false)
} else if device.Websockets == db.Online {
a.Websockets.Publish(tr369Message, "", "", false)
} else if device.Stomp == db.Online {
//TODO: send stomp message
}
//TODO: verify in protocol and in other models, the Device.Wifi parameters. Maybe in the future, to use SSIDReference from AccessPoint
select {

View File

@ -12,6 +12,7 @@ import (
"github.com/gorilla/websocket"
"github.com/leandrofars/oktopus/internal/db"
"github.com/leandrofars/oktopus/internal/mtp/handler"
usp_msg "github.com/leandrofars/oktopus/internal/usp_message"
"github.com/leandrofars/oktopus/internal/usp_record"
"google.golang.org/protobuf/proto"
@ -29,6 +30,8 @@ type Ws struct {
NewDeviceQueue map[string]string
NewDevQMutex *sync.Mutex
DB db.Database
MsgQueue map[string](chan usp_msg.Msg)
QMutex *sync.Mutex
}
const (
@ -204,7 +207,20 @@ func (w *Ws) Subscribe() {
continue
}
//TODO: send message to Api Msg Queue
log.Println("Handle api request")
var msg usp_msg.Msg
err = proto.Unmarshal(record.GetNoSessionContext().Payload, &msg)
if err != nil {
log.Println(err)
continue
}
if _, ok := w.MsgQueue[msg.Header.MsgId]; ok {
//m.QMutex.Lock()
w.MsgQueue[msg.Header.MsgId] <- msg
//m.QMutex.Unlock()
} else {
log.Printf("Message answer to request %s arrived too late", msg.Header.MsgId)
}
}