oktopus/backend/services/controller/internal/bridge/bridge.go
2024-03-23 16:06:56 -03:00

175 lines
3.9 KiB
Go

package bridge
import (
"encoding/json"
"errors"
"log"
"net/http"
"time"
"github.com/leandrofars/oktopus/internal/entity"
local "github.com/leandrofars/oktopus/internal/nats"
"github.com/leandrofars/oktopus/internal/utils"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)
// Sentinel errors returned by the NATS request helpers in this file.
var (
	// errNatsMsgReceivedWithErrorData is returned by NatsReq when the reply
	// could only be decoded as an error answer (string message plus code).
	errNatsMsgReceivedWithErrorData = errors.New("Nats message received with error data")

	// errNatsRequestTimeout is returned by NatsUspInteraction when no reply
	// arrives within local.NATS_REQUEST_TIMEOUT.
	errNatsRequestTimeout = errors.New("Nats message response timeout")
)
// Bridge holds the JetStream handle and the NATS connection it was built
// with (see NewBridge).
type Bridge struct {
// js is the JetStream handle supplied to NewBridge.
js jetstream.JetStream
// nc is the NATS connection supplied to NewBridge.
nc *nats.Conn
}
// NewBridge returns a Bridge value that stores the given JetStream handle
// and NATS connection.
func NewBridge(js jetstream.JetStream, nc *nats.Conn) Bridge {
	return Bridge{js: js, nc: nc}
}
// NatsUspInteraction publishes body on pubSubj and waits for a single reply
// on subSubj.
//
// The subscription on subSubj is created before publishing so that a fast
// reply cannot be missed, and it is always removed before the function
// returns.
//
// On failure an HTTP error response is written to w and a non-nil error is
// returned:
//   - 500 when subscribing or publishing fails (the NATS error is returned);
//   - 504 when no reply arrives within local.NATS_REQUEST_TIMEOUT
//     (errNatsRequestTimeout is returned).
//
// On success nothing is written to w and the raw reply payload is returned.
func NatsUspInteraction(
	subSubj, pubSubj string,
	body []byte,
	w http.ResponseWriter,
	nc *nats.Conn,
) ([]byte, error) {
	log.Println("Sending usp message")
	log.Println("subSubj: ", subSubj)
	log.Println("pubSubj: ", pubSubj)

	ch := make(chan *nats.Msg, 64)
	sub, err := nc.ChanSubscribe(subSubj, ch)
	if err != nil {
		log.Println(err)
		w.WriteHeader(http.StatusInternalServerError)
		w.Write(utils.Marshall("Error to communicate with nats: " + err.Error()))
		return []byte{}, err
	}
	// Without this the subscription would stay registered on the connection
	// after every call.
	defer func() {
		if unsubErr := sub.Unsubscribe(); unsubErr != nil {
			log.Println("Error to unsubscribe from "+subSubj+": ", unsubErr)
		}
	}()

	if err = nc.Publish(pubSubj, body); err != nil {
		log.Println(err)
		w.WriteHeader(http.StatusInternalServerError)
		w.Write(utils.Marshall("Error to communicate with nats: " + err.Error()))
		return nil, err
	}

	// Wait in the calling goroutine: w is only touched while the handler is
	// still running and no background goroutine can be left blocked.
	timer := time.NewTimer(local.NATS_REQUEST_TIMEOUT)
	defer timer.Stop()

	select {
	case msg := <-ch:
		log.Println("Received an usp message response")
		return msg.Data, nil
	case <-timer.C:
		log.Println("usp message response timeout")
		w.WriteHeader(http.StatusGatewayTimeout)
		w.Write(utils.Marshall("usp message response timeout"))
		return nil, errNatsRequestTimeout
	}
}
// NatsCustomReq publishes body on pubSubj, waits for a single reply on
// subSubj and decodes that reply as JSON into a value of type T.
//
// The subscription on subSubj is created before publishing and is always
// removed before the function returns.
//
// On failure an HTTP error response is written to w and a non-nil error is
// returned:
//   - 500 when subscribing or publishing fails (the NATS error is returned);
//   - 500 with the raw reply as body when the reply is not valid JSON for T
//     (the decoding error is returned);
//   - 504 when no reply arrives within local.NATS_REQUEST_TIMEOUT
//     (errNatsRequestTimeout is returned).
//
// On success nothing is written to w and the decoded T value is returned as
// the interface{} result.
func NatsCustomReq[T entity.DataType](
	subSubj, pubSubj string,
	body []byte,
	w http.ResponseWriter,
	nc *nats.Conn,
) (interface{}, error) {
	ch := make(chan *nats.Msg, 64)
	sub, err := nc.ChanSubscribe(subSubj, ch)
	if err != nil {
		log.Println(err)
		w.WriteHeader(http.StatusInternalServerError)
		w.Write(utils.Marshall("Error to communicate with nats: " + err.Error()))
		return nil, err
	}
	// Without this the subscription would stay registered on the connection
	// after every call.
	defer func() {
		if unsubErr := sub.Unsubscribe(); unsubErr != nil {
			log.Println("Error to unsubscribe from "+subSubj+": ", unsubErr)
		}
	}()

	// The request has to go out before waiting, otherwise no reply can ever
	// arrive and the wait always ends in a timeout.
	if err = nc.Publish(pubSubj, body); err != nil {
		log.Println(err)
		w.WriteHeader(http.StatusInternalServerError)
		w.Write(utils.Marshall("Error to communicate with nats: " + err.Error()))
		return nil, err
	}

	timer := time.NewTimer(local.NATS_REQUEST_TIMEOUT)
	defer timer.Stop()

	select {
	case msg := <-ch:
		log.Println("Received an api message response")
		var answer T
		if err = json.Unmarshal(msg.Data, &answer); err != nil {
			log.Println(err)
			w.WriteHeader(http.StatusInternalServerError)
			w.Write(msg.Data)
			return nil, err
		}
		return answer, nil
	case <-timer.C:
		log.Println("Api message response timeout")
		w.WriteHeader(http.StatusGatewayTimeout)
		w.Write(utils.Marshall("Api message response timeout"))
		return nil, errNatsRequestTimeout
	}
}
// NatsReq sends body as a request on subj and decodes the reply.
//
// It handles the NATS round trip and checks whether the received data is an
// error answer:
//   - If the request itself fails, 500 is written to w and the NATS error is
//     returned.
//   - If the reply decodes into entity.MsgAnswer[T], it is returned and
//     nothing is written to w.
//   - Otherwise the reply is decoded as entity.MsgAnswer[*string]. When that
//     works, the answer's code and message are written to w and
//     errNatsMsgReceivedWithErrorData is returned. A code that net/http would
//     reject (outside 100-999) is replaced by 500.
//   - If neither decoding works, 500 is written to w with the raw reply as
//     body and the decoding error is returned.
func NatsReq[T entity.DataType](
	subj string,
	body []byte,
	w http.ResponseWriter,
	nc *nats.Conn,
) (*entity.MsgAnswer[T], error) {
	msg, err := nc.Request(subj, body, local.NATS_REQUEST_TIMEOUT)
	if err != nil {
		log.Println(err)
		w.WriteHeader(http.StatusInternalServerError)
		w.Write(utils.Marshall("Error to communicate with nats: " + err.Error()))
		return nil, err
	}

	var answer *entity.MsgAnswer[T]
	if err = json.Unmarshal(msg.Data, &answer); err == nil {
		return answer, nil
	}

	// The payload does not fit T; check whether it is an error answer whose
	// message is a plain string.
	var errMsg *entity.MsgAnswer[*string]
	if err = json.Unmarshal(msg.Data, &errMsg); err != nil {
		log.Println("Bad answer message formatting: ", err.Error())
		w.WriteHeader(http.StatusInternalServerError)
		w.Write(msg.Data)
		return nil, err
	}

	// Guard the pointer dereferences so a missing message cannot panic.
	var (
		text     string
		received int
	)
	if errMsg != nil {
		received = errMsg.Code
		if errMsg.Msg != nil {
			text = *errMsg.Msg
		}
	}
	log.Printf("Error message received, msg: %s, code: %d", text, received)

	// http.ResponseWriter.WriteHeader panics on codes outside 100-999 (for
	// example 0 when the answer carries no code), so fall back to 500.
	status := received
	if status < 100 || status > 999 {
		status = http.StatusInternalServerError
	}
	w.WriteHeader(status)
	w.Write(utils.Marshall(text))
	return nil, errNatsMsgReceivedWithErrorData
}