feat: initial http bulk data collector

This commit is contained in:
leandrofars 2024-05-13 16:37:22 -03:00
parent 29375417ae
commit 145c5e9426
20 changed files with 567 additions and 6 deletions

1
backend/services/bulkdata/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
/.env.local

View File

View File

@ -0,0 +1,8 @@
FROM golang:1.22@sha256:82e07063a1ac3ee59e6f38b1222e32ce88469e4431ff6496cc40fb9a0fc18229 as builder
WORKDIR /app
COPY ../ .
RUN CGO_ENABLED=0 GOOS=linux go build -o bulkdata cmd/bulkdata/main.go
FROM alpine:3.14@sha256:0f2d5c38dd7a4f4f733e688e3a6733cb5ab1ac6e3cb4603a5dd564e5bfb80eed
COPY --from=builder /app/bulkdata /
ENTRYPOINT ["/bulkdata"]

View File

@ -0,0 +1,61 @@
.PHONY: help build push start stop release remove delete run logs bash
DOCKER_USER ?= oktopusp
DOCKER_APP ?= bulkdata
DOCKER_TAG ?= $(shell git log --format="%h" -n 1)
CONTAINER_SHELL ?= /bin/sh
.DEFAULT_GOAL := help
help:
@echo "Makefile arguments:"
@echo ""
@echo "DOCKER_USER - docker user to build image"
@echo "DOCKER_APP - docker image name"
@echo "DOCKER_TAG - docker image tag"
@echo "CONTAINER_SHELL - container shell e.g:'/bin/bash'"
@echo ""
@echo "Makefile commands:"
@echo ""
@echo "build - docker image build"
@echo "push - push docker iamge to registry"
@echo "run - create and start docker container with the image"
@echo "start - start existent docker container with the image"
@echo "stop - stop docker container running the image"
@echo "remove - remove docker container running the image"
@echo "delete - delete docker image"
@echo "logs - show logs of docker container"
@echo "bash - access container shell"
@echo "release - tag image as latest and push to registry"
build:
@docker build -t ${DOCKER_USER}/${DOCKER_APP}:${DOCKER_TAG} -f Dockerfile ../
run:
@docker run -d --name ${DOCKER_USER}-${DOCKER_APP} ${DOCKER_USER}/${DOCKER_APP}:${DOCKER_TAG}
stop:
@docker stop ${DOCKER_USER}-${DOCKER_APP}
remove: stop
@docker rm ${DOCKER_USER}-${DOCKER_APP}
delete:
@docker rmi ${DOCKER_USER}/${DOCKER_APP}:${DOCKER_TAG}
start:
@docker start ${DOCKER_USER}-${DOCKER_APP}
push:
@docker push ${DOCKER_USER}/${DOCKER_APP}:${DOCKER_TAG}
logs:
@docker logs -f ${DOCKER_USER}-${DOCKER_APP}
bash:
@docker exec -it ${DOCKER_USER}-${DOCKER_APP} ${CONTAINER_SHELL}
release: build
@docker push ${DOCKER_USER}/${DOCKER_APP}:${DOCKER_TAG}
@docker tag ${DOCKER_USER}/${DOCKER_APP}:${DOCKER_TAG} ${DOCKER_USER}/${DOCKER_APP}:latest
@docker push ${DOCKER_USER}/${DOCKER_APP}:latest

View File

@ -0,0 +1,29 @@
package main
import (
"log"
"os"
"os/signal"
"syscall"
"github.com/oktopUSP/backend/services/bulkdata/internal/api"
"github.com/oktopUSP/backend/services/bulkdata/internal/config"
"github.com/oktopUSP/backend/services/bulkdata/internal/nats"
)
func main() {
done := make(chan os.Signal, 1)
signal.Notify(done, syscall.SIGINT)
c := config.NewConfig()
js, nc, kv := nats.StartNatsClient(c.Nats)
server := api.NewApi(c.RestApi, js, nc, kv)
server.StartApi()
<-done
log.Println("bulk data collector is saying adios ...")
}

View File

@ -0,0 +1,19 @@
module github.com/oktopUSP/backend/services/bulkdata
go 1.22.3
require (
github.com/gorilla/mux v1.8.1
github.com/joho/godotenv v1.5.1
github.com/nats-io/nats.go v1.34.1
github.com/rs/cors v1.11.0
)
require (
github.com/klauspost/compress v1.17.2 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
golang.org/x/crypto v0.18.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/text v0.14.0 // indirect
)

View File

@ -0,0 +1,20 @@
github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY=
github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ=
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/nats-io/nats.go v1.34.1 h1:syWey5xaNHZgicYBemv0nohUPPmaLteiBEUT6Q5+F/4=
github.com/nats-io/nats.go v1.34.1/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/rs/cors v1.11.0 h1:0B9GE/r9Bc2UxRMMtymBkHTenPkHDv0CW4Y98GBY+po=
github.com/rs/cors v1.11.0/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU=
golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc=
golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=

View File

@ -0,0 +1,58 @@
package api
import (
"log"
"net/http"
"time"
"github.com/gorilla/mux"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
"github.com/oktopUSP/backend/services/bulkdata/internal/api/cors"
"github.com/oktopUSP/backend/services/bulkdata/internal/api/handler"
"github.com/oktopUSP/backend/services/bulkdata/internal/api/middleware"
"github.com/oktopUSP/backend/services/bulkdata/internal/config"
)
type Api struct {
port string
handler handler.Handler
}
const REQUEST_TIMEOUT = time.Second * 30
func NewApi(c config.RestApi, js jetstream.JetStream, nc *nats.Conn, kv jetstream.KeyValue) Api {
return Api{
port: c.Port,
handler: handler.NewHandler(js, nc, kv),
}
}
func (a *Api) StartApi() {
r := mux.NewRouter()
r.HandleFunc("/healthcheck", a.handler.Healthcheck).Methods("GET")
r.HandleFunc("/", a.handler.Data).Methods("POST")
/* ----- Middleware for requests which requires user to be authenticated ---- */
r.Use(func(handler http.Handler) http.Handler {
return middleware.Middleware(handler)
})
/* -------------------------------------------------------------------------- */
corsOpts := cors.GetCorsConfig()
srv := &http.Server{
Addr: "0.0.0.0:" + a.port,
WriteTimeout: time.Second * 60,
ReadTimeout: time.Second * 60,
IdleTimeout: time.Second * 60,
Handler: corsOpts.Handler(r),
}
go func() {
if err := srv.ListenAndServe(); err != nil {
log.Println(err)
}
}()
log.Println("Running Bulk Data Collector HTTP Server at port", a.port)
}

View File

@ -0,0 +1,39 @@
package cors
import (
"log"
"net/http"
"os"
"strings"
"github.com/rs/cors"
)
func GetCorsConfig() cors.Cors {
allowedOrigins := getCorsEnvConfig()
log.Println("API CORS - AllowedOrigins:", allowedOrigins)
return *cors.New(cors.Options{
AllowedOrigins: allowedOrigins,
AllowedMethods: []string{
http.MethodGet,
http.MethodPost,
http.MethodPut,
http.MethodPatch,
http.MethodDelete,
http.MethodOptions,
http.MethodHead,
},
AllowedHeaders: []string{
"*", //or you can your header key values which you are using in your application
},
})
}
func getCorsEnvConfig() []string {
val, _ := os.LookupEnv("REST_API_CORS")
if val == "" {
return []string{"*"}
}
return strings.Split(val, ",")
}

View File

@ -0,0 +1,31 @@
package handler
import (
"encoding/json"
"log"
"net/http"
)
func (h *Handler) Data(w http.ResponseWriter, r *http.Request) {
oui := r.URL.Query().Get("oui")
pc := r.URL.Query().Get("pc")
sn := r.URL.Query().Get("sn")
eid := r.URL.Query().Get("eid")
log.Println("oui: ", oui)
log.Println("pc: ", pc)
log.Println("sn: ", sn)
log.Println("eid: ", eid)
var body map[string]interface{}
err := json.NewDecoder(r.Body).Decode(&body)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusBadRequest)
return
}
log.Println("Body: ", body)
}

View File

@ -0,0 +1,23 @@
package handler
import (
"context"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)
type Handler struct {
js jetstream.JetStream
nc *nats.Conn
kv jetstream.KeyValue
ctx context.Context
}
func NewHandler(js jetstream.JetStream, nc *nats.Conn, kv jetstream.KeyValue) Handler {
return Handler{
js: js,
nc: nc,
kv: kv,
}
}

View File

@ -0,0 +1,7 @@
package handler
import "net/http"
func (h *Handler) Healthcheck(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("I'm Alive"))
}

View File

@ -0,0 +1,26 @@
package middleware
import (
"net/http"
)
func Middleware(next http.Handler) http.Handler {
return http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
// tokenString := r.Header.Get("Authorization")
// if tokenString == "" {
// w.WriteHeader(http.StatusUnauthorized)
// return
// }
// email, err := auth.ValidateToken(tokenString)
// if err != nil {
// w.WriteHeader(http.StatusUnauthorized)
// return
// }
//ctx := context.WithValue(r.Context(), "email", email)
next.ServeHTTP(w, r.WithContext(r.Context()))
},
)
}

View File

@ -0,0 +1,105 @@
package config
import (
"context"
"flag"
"log"
"os"
"strconv"
"github.com/joho/godotenv"
)
const LOCAL_ENV = ".env.local"
type Nats struct {
Url string
Name string
VerifyCertificates bool
Ctx context.Context
}
type RestApi struct {
Port string
Ctx context.Context
}
type Config struct {
RestApi RestApi
Nats Nats
}
func NewConfig() *Config {
loadEnvVariables()
log.SetFlags(log.LstdFlags | log.Lshortfile)
natsUrl := flag.String("nats_url", lookupEnvOrString("NATS_URL", "nats://localhost:4222"), "url for nats server")
natsName := flag.String("nats_name", lookupEnvOrString("NATS_NAME", "adapter"), "name for nats client")
natsVerifyCertificates := flag.Bool("nats_verify_certificates", lookupEnvOrBool("NATS_VERIFY_CERTIFICATES", false), "verify validity of certificates from nats server")
flApiPort := flag.String("api_port", lookupEnvOrString("REST_API_PORT", "4000"), "Rest api port")
flHelp := flag.Bool("help", false, "Help")
/*
App variables priority:
1º - Flag through command line.
2º - Env variables.
3º - Default flag value.
*/
flag.Parse()
if *flHelp {
flag.Usage()
os.Exit(0)
}
ctx := context.TODO()
return &Config{
RestApi: RestApi{
Port: *flApiPort,
Ctx: ctx,
},
Nats: Nats{
Url: *natsUrl,
Name: *natsName,
VerifyCertificates: *natsVerifyCertificates,
Ctx: ctx,
},
}
}
func loadEnvVariables() {
err := godotenv.Load()
if _, err := os.Stat(LOCAL_ENV); err == nil {
_ = godotenv.Overload(LOCAL_ENV)
log.Printf("Loaded variables from '%s'", LOCAL_ENV)
return
}
if err != nil {
log.Println("Error to load environment variables:", err)
} else {
log.Println("Loaded variables from '.env'")
}
}
func lookupEnvOrString(key string, defaultVal string) string {
if val, _ := os.LookupEnv(key); val != "" {
return val
}
return defaultVal
}
func lookupEnvOrBool(key string, defaultVal bool) bool {
if val, _ := os.LookupEnv(key); val != "" {
v, err := strconv.ParseBool(val)
if err != nil {
log.Fatalf("LookupEnvOrBool[%s]: %v", key, err)
}
return v
}
return defaultVal
}

View File

@ -0,0 +1,74 @@
package nats
import (
"log"
"time"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
"github.com/oktopUSP/backend/services/bulkdata/internal/config"
)
const (
BUCKET_NAME = "devices-auth"
BUCKET_DESCRIPTION = "Devices authentication"
)
func StartNatsClient(c config.Nats) (jetstream.JetStream, *nats.Conn, jetstream.KeyValue) {
var (
nc *nats.Conn
err error
)
opts := defineOptions(c)
log.Printf("Connecting to NATS server %s", c.Url)
for {
nc, err = nats.Connect(c.Url, opts...)
if err != nil {
time.Sleep(5 * time.Second)
continue
}
break
}
log.Printf("Successfully connected to NATS server %s", c.Url)
js, err := jetstream.New(nc)
if err != nil {
log.Fatalf("Failed to create JetStream client: %v", err)
}
kv, err := js.CreateOrUpdateKeyValue(c.Ctx, jetstream.KeyValueConfig{
Bucket: BUCKET_NAME,
Description: BUCKET_DESCRIPTION,
})
if err != nil {
log.Fatalf("Failed to create KeyValue store: %v", err)
}
return js, nc, kv
}
func defineOptions(c config.Nats) []nats.Option {
var opts []nats.Option
opts = append(opts, nats.Name(c.Name))
opts = append(opts, nats.MaxReconnects(-1))
opts = append(opts, nats.ReconnectWait(5*time.Second))
opts = append(opts, nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
log.Printf("Got disconnected! Reason: %q\n", err)
}))
opts = append(opts, nats.ReconnectHandler(func(nc *nats.Conn) {
log.Printf("Got reconnected to %v!\n", nc.ConnectedUrl())
}))
opts = append(opts, nats.ClosedHandler(func(nc *nats.Conn) {
log.Printf("Connection closed. Reason: %q\n", nc.LastError())
}))
if c.VerifyCertificates {
opts = append(opts, nats.RootCAs())
}
return opts
}

View File

@ -64,6 +64,7 @@ func (a *Api) StartApi() {
iot.HandleFunc("/{sn}/{mtp}/add", a.deviceCreateMsg).Methods("PUT")
iot.HandleFunc("/{sn}/{mtp}/del", a.deviceDeleteMsg).Methods("PUT")
iot.HandleFunc("/{sn}/{mtp}/set", a.deviceUpdateMsg).Methods("PUT")
iot.HandleFunc("/{sn}/{mtp}/notify", a.deviceNotifyMsg).Methods("PUT")
iot.HandleFunc("/{sn}/{mtp}/parameters", a.deviceGetSupportedParametersMsg).Methods("PUT")
iot.HandleFunc("/{sn}/{mtp}/instances", a.deviceGetParameterInstances).Methods("PUT")
iot.HandleFunc("/{sn}/{mtp}/operate", a.deviceOperateMsg).Methods("PUT")

View File

@ -1,8 +1,10 @@
package api
import (
"log"
"net/http"
"github.com/google/uuid"
"github.com/leandrofars/oktopus/internal/bridge"
local "github.com/leandrofars/oktopus/internal/nats"
"github.com/leandrofars/oktopus/internal/usp/usp_msg"
@ -165,6 +167,44 @@ func (a *Api) deviceOperateMsg(w http.ResponseWriter, r *http.Request) {
}
}
func (a *Api) deviceNotifyMsg(w http.ResponseWriter, r *http.Request) {
sn := getSerialNumberFromRequest(r)
mtp, err := getMtpFromRequest(r, w)
if err != nil {
return
}
if mtp == "" {
var ok bool
mtp, ok = deviceStateOK(w, a.nc, sn)
if !ok {
return
}
}
// var notify usp_msg.Notify
notify := usp_msg.Notify{
SubscriptionId: uuid.NewString(),
SendResp: true,
Notification: &usp_msg.Notify_Event_{
Event: &usp_msg.Notify_Event{
EventName: "Push!",
ObjPath: "Device.BulkData.Profile.1.",
},
},
}
log.Printf("Notify %s:", notify.String())
msg := usp_utils.NewNotifyMsg(notify)
err = sendUspMsg(msg, sn, w, a.nc, mtp)
if err != nil {
return
}
}
func (a *Api) deviceUpdateMsg(w http.ResponseWriter, r *http.Request) {
sn := getSerialNumberFromRequest(r)

View File

@ -145,3 +145,21 @@ func NewOperateMsg(getStuff usp_msg.Operate) usp_msg.Msg {
},
}
}
func NewNotifyMsg(notify usp_msg.Notify) usp_msg.Msg {
return usp_msg.Msg{
Header: &usp_msg.Header{
MsgId: uuid.NewString(),
MsgType: usp_msg.Header_NOTIFY,
},
Body: &usp_msg.Body{
MsgBody: &usp_msg.Body_Request{
Request: &usp_msg.Request{
ReqType: &usp_msg.Request_Notify{
Notify: &notify,
},
},
},
},
}
}

View File

@ -1,3 +1,3 @@
module.exports = {
reactStrictMode: true
reactStrictMode: false
};

View File

@ -655,7 +655,7 @@ const getDeviceParameterInstances = async (raw) =>{
let paramsInfo = {}
let commandsInfo = {}
let supportedParams = content.req_obj_results[0].supported_objs[0].supported_params
let supportedParams = content.req_obj_results[0].supported_objs[0].supported_params //TODO: fixme when more then one supported_objs
let supportedCommands = content.req_obj_results[0].supported_objs[0].supported_commands
let parametersToFetch = () => {
@ -725,7 +725,7 @@ const getDeviceParameterInstances = async (raw) =>{
// console.log(y.result_params[key])
// console.log({[key]:paramsInfo[key]})
console.log("Take a look here mate: ",{...paramsInfo[key], value: y.result_params[key]})
//console.log("Take a look here mate: ",{...paramsInfo[key], value: y.result_params[key]})
if (!values[y.resolved_path]){
values[y.resolved_path] = []
}
@ -751,12 +751,13 @@ const getDeviceParameterInstances = async (raw) =>{
})
}
console.log("values:", values)
//console.log("values:", values)
setDeviceParametersValue(values)
console.log("commands:", commandsInfo)
//console.log("commands:", commandsInfo)
setDeviceCommands(commandsInfo)
})
console.log("values:", values)
console.log("commands:", commandsInfo)
console.log("/-------------------------------------------------------/")
setDeviceParameters(content)
}else{