Merge pull request #246 from OktopUSP/dev

Beta Kubernetes Deployment + Initial CWMP
This commit is contained in:
Leandro Antônio Farias Machado 2024-04-22 13:26:17 -03:00 committed by GitHub
commit 6cb5b43787
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
64 changed files with 3448 additions and 134 deletions

View File

@ -53,7 +53,7 @@ This repository aims to promote the development of a multi-vendor management pla
<ul><li><h4>Infrastructure:</h4></li></ul> <ul><li><h4>Infrastructure:</h4></li></ul>
![image](https://github.com/OktopUSP/oktopus/assets/83298718/67873f6c-d3db-4045-8569-7e1135fc5fa7) ![image](https://github.com/OktopUSP/oktopus/assets/83298718/aa6feb7f-ac32-465c-b166-aa6b3ee5b68a)
<ul> <ul>
<li> <li>
@ -68,13 +68,15 @@ This repository aims to promote the development of a multi-vendor management pla
<ul> <ul>
<li> <li>
<h4>Quick start:</h4> <h4>Quick start:</h4>
Run app using Docker: Run app using <u><b>Docker Compose</b></u>:
<pre> <pre>
user@user-laptop:~$ cd oktopus/deploy/compose user@user-laptop:~$ cd oktopus/deploy/compose
user@user-laptop:~/oktopus/deploy/compose$ COMPOSE_PROFILES=nats,controller,mqtt,stomp,ws,adapter,frontend,portainer docker compose up -d user@user-laptop:~/oktopus/deploy/compose$ COMPOSE_PROFILES=nats,controller,mqtt,stomp,ws,adapter,frontend,portainer docker compose up -d
</pre> </pre>
Oktopus deployment in <u><b>Kubernetes</b></u> still is in beta phase: <a href="https://github.com/OktopUSP/oktopus/blob/main/deploy/kubernetes/README.md"> Instructions for Kubernetes deployment</a><p></p>
UI will open at port 3000: UI will open at port 3000:
<img src="https://github.com/OktopUSP/oktopus/assets/83298718/65f7c5b9-c08d-479a-8a13-fdfc634b5ca2"/> <img src="https://github.com/OktopUSP/oktopus/assets/83298718/65f7c5b9-c08d-479a-8a13-fdfc634b5ca2"/>
</li> </li>
<li> <li>
<h4>Device test agent (obuspa):</h4> <h4>Device test agent (obuspa):</h4>

View File

1
backend/services/acs/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
*.local

View File

@ -0,0 +1,97 @@
# Moses ACS [![Build Status](https://travis-ci.org/lucacervasio/mosesacs.svg?branch=master)](https://travis-ci.org/lucacervasio/mosesacs)
An ACS in Go for provisioning CPEs, suitable for test purposes or production deployment.
## Getting started
Install the package:
go get oktopUSP/backend/services/acs
Run daemon:
mosesacs -d
Connect to it and get a cli:
mosesacs
Congratulations, you've connected to the daemon via websocket. Now you can issue commands via CLI or browse the embedded webserver at http://localhost:9292/www
## Compatibility on ARM
Moses is built on purpose only with dependencies in pure GO. So it runs on ARM processors with no issues. We tested it on QNAP devices and Raspberry for remote control.
## CLI commands
### 1. `list`: list CPEs
example:
```
moses@localhost:9292/> list
cpe list
CPE A54FD with OUI 006754
```
### 2. `readMib SERIAL LEAF/SUBTREE`: read a specific leaf or a subtree
example:
```
moses@localhost:9292/> readMib A54FD Device.
Received an Inform from [::1]:58582 (3191 bytes) with SerialNumber A54FD and EventCodes 6 CONNECTION REQUEST
InternetGatewayDevice.Time.NTPServer1 : pool.ntp.org
InternetGatewayDevice.Time.CurrentLocalTime : 2014-07-11T09:08:25
InternetGatewayDevice.Time.LocalTimeZone : +00:00
InternetGatewayDevice.Time.LocalTimeZoneName : Greenwich Mean Time : Dublin
InternetGatewayDevice.Time.DaylightSavingsUsed : 0
```
### 3. `writeMib SERIAL LEAF VALUE`: issue a SetParameterValues and write a value into a leaf
example:
```
moses@localhost:9292/> writeMib A54FD InternetGatewayDevice.Time.Enable false
Received an Inform from [::1]:58582 (3191 bytes) with SerialNumber A54FD and EventCodes 6 CONNECTION REQUEST
```
### 4. `GetParameterNames SERIAL LEAF/SUBTREE`: issue a GetParameterNames and get all leaves/objects at first level
example:
```
moses@localhost:9292/> GetParameterNames A54FD InternetGatewayDevice.
Received an Inform from [::1]:55385 (3119 bytes) with SerialNumber A54FD and EventCodes 6 CONNECTION REQUEST
InternetGatewayDevice.LANDeviceNumberOfEntries : 0
InternetGatewayDevice.WANDeviceNumberOfEntries : 0
InternetGatewayDevice.DeviceInfo. : 0
InternetGatewayDevice.ManagementServer. : 0
InternetGatewayDevice.Time. : 0
InternetGatewayDevice.Layer3Forwarding. : 0
InternetGatewayDevice.LANDevice. : 0
InternetGatewayDevice.WANDevice. : 0
InternetGatewayDevice.X_00507F_InternetAcc. : 0
InternetGatewayDevice.X_00507F_LAN. : 0
InternetGatewayDevice.X_00507F_NAT. : 0
InternetGatewayDevice.X_00507F_VLAN. : 0
InternetGatewayDevice.X_00507F_Firewall. : 0
InternetGatewayDevice.X_00507F_Applications. : 0
InternetGatewayDevice.X_00507F_System. : 0
InternetGatewayDevice.X_00507F_Status. : 0
InternetGatewayDevice.X_00507F_Diagnostics. : 0
```
## Services exposed
Moses exposes three services:
- http://localhost:9292/acs is the endpoint for the CPEs to connect
- http://localhost:9292/www is the embedded webserver to control your CPEs
- ws://localhost:9292/ws is the websocket endpoint used by the cli to issue commands. Read about the API specification if you want to build a custom frontend which interacts with mosesacs daemon.

View File

@ -0,0 +1,8 @@
FROM golang:1.22.2@sha256:450e3822c7a135e1463cd83e51c8e2eb03b86a02113c89424e6f0f8344bb4168 as builder
WORKDIR /app
COPY ../ .
RUN CGO_ENABLED=0 GOOS=linux go build -o acs cmd/acs/main.go
FROM alpine:3.14@sha256:0f2d5c38dd7a4f4f733e688e3a6733cb5ab1ac6e3cb4603a5dd564e5bfb80eed
COPY --from=builder /app/acs /
ENTRYPOINT ["/acs"]

View File

@ -0,0 +1,61 @@
.PHONY: help build push start stop release remove delete run logs bash
DOCKER_USER ?= oktopusp
DOCKER_APP ?= acs
DOCKER_TAG ?= $(shell git log --format="%h" -n 1)
CONTAINER_SHELL ?= /bin/sh
.DEFAULT_GOAL := help
help:
@echo "Makefile arguments:"
@echo ""
@echo "DOCKER_USER - docker user to build image"
@echo "DOCKER_APP - docker image name"
@echo "DOCKER_TAG - docker image tag"
@echo "CONTAINER_SHELL - container shell e.g:'/bin/bash'"
@echo ""
@echo "Makefile commands:"
@echo ""
@echo "build - docker image build"
@echo "push - push docker iamge to registry"
@echo "run - create and start docker container with the image"
@echo "start - start existent docker container with the image"
@echo "stop - stop docker container running the image"
@echo "remove - remove docker container running the image"
@echo "delete - delete docker image"
@echo "logs - show logs of docker container"
@echo "bash - access container shell"
@echo "release - tag image as latest and push to registry"
build:
@docker build -t ${DOCKER_USER}/${DOCKER_APP}:${DOCKER_TAG} -f Dockerfile ../
run:
@docker run -d --name ${DOCKER_USER}-${DOCKER_APP} ${DOCKER_USER}/${DOCKER_APP}:${DOCKER_TAG}
stop:
@docker stop ${DOCKER_USER}-${DOCKER_APP}
remove: stop
@docker rm ${DOCKER_USER}-${DOCKER_APP}
delete:
@docker rmi ${DOCKER_USER}/${DOCKER_APP}:${DOCKER_TAG}
start:
@docker start ${DOCKER_USER}-${DOCKER_APP}
push:
@docker push ${DOCKER_USER}/${DOCKER_APP}:${DOCKER_TAG}
logs:
@docker logs -f ${DOCKER_USER}-${DOCKER_APP}
bash:
@docker exec -it ${DOCKER_USER}-${DOCKER_APP} ${CONTAINER_SHELL}
release: build
@docker push ${DOCKER_USER}/${DOCKER_APP}:${DOCKER_TAG}
@docker tag ${DOCKER_USER}/${DOCKER_APP}:${DOCKER_TAG} ${DOCKER_USER}/${DOCKER_APP}:latest
@docker push ${DOCKER_USER}/${DOCKER_APP}:latest

View File

@ -0,0 +1,27 @@
package main
import (
"oktopUSP/backend/services/acs/internal/bridge"
"oktopUSP/backend/services/acs/internal/config"
"oktopUSP/backend/services/acs/internal/nats"
"oktopUSP/backend/services/acs/internal/server"
"oktopUSP/backend/services/acs/internal/server/handler"
)
func main() {
c := config.NewConfig()
natsActions := nats.StartNatsClient(c.Nats)
h := handler.NewHandler(natsActions.Publish, natsActions.Subscribe)
b := bridge.NewBridge(
natsActions.Publish,
natsActions.Subscribe,
h,
)
b.StartBridge()
server.Run(c.Acs, natsActions, h)
}

View File

@ -0,0 +1,20 @@
module oktopUSP/backend/services/acs
go 1.22.2
require (
github.com/google/uuid v1.6.0
github.com/joho/godotenv v1.5.1
github.com/nats-io/nats.go v1.34.1
github.com/oleiade/lane v1.0.1
golang.org/x/net v0.24.0
)
require (
github.com/klauspost/compress v1.17.2 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/sys v0.19.0 // indirect
golang.org/x/text v0.14.0 // indirect
)

View File

@ -0,0 +1,22 @@
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/nats-io/nats.go v1.34.1 h1:syWey5xaNHZgicYBemv0nohUPPmaLteiBEUT6Q5+F/4=
github.com/nats-io/nats.go v1.34.1/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/oleiade/lane v1.0.1 h1:hXofkn7GEOubzTwNpeL9MaNy8WxolCYb9cInAIeqShU=
github.com/oleiade/lane v1.0.1/go.mod h1:IyTkraa4maLfjq/GmHR+Dxb4kCMtEGeb+qmhlrQ5Mk4=
golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30=
golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M=
golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w=
golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8=
golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o=
golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=

View File

@ -0,0 +1,112 @@
package auth
import (
"crypto/md5"
"crypto/rand"
"encoding/base64"
"fmt"
"net/http"
"net/url"
"strings"
)
type myjar struct {
jar map[string][]*http.Cookie
}
func (p *myjar) SetCookies(u *url.URL, cookies []*http.Cookie) {
p.jar[u.Host] = cookies
}
func (p *myjar) Cookies(u *url.URL) []*http.Cookie {
return p.jar[u.Host]
}
func Auth(username string, password string, uri string) (bool, error) {
client := &http.Client{}
jar := &myjar{}
jar.jar = make(map[string][]*http.Cookie)
client.Jar = jar
var req *http.Request
var resp *http.Response
var err error
req, err = http.NewRequest("GET", uri, nil)
resp, err = client.Do(req)
if err != nil {
return false, err
}
// if resp.StatusCode == 401 {
// var authorization map[string]string = DigestAuthParams(resp)
// realmHeader := authorization["realm"]
// qopHeader := authorization["qop"]
// nonceHeader := authorization["nonce"]
// opaqueHeader := authorization["opaque"]
// realm := realmHeader
// // A1
// h := md5.New()
// A1 := fmt.Sprintf("%s:%s:%s", username, realm, password)
// io.WriteString(h, A1)
// HA1 := fmt.Sprintf("%x", h.Sum(nil))
// // A2
// h = md5.New()
// A2 := fmt.Sprintf("GET:%s", "/auth")
// io.WriteString(h, A2)
// HA2 := fmt.Sprintf("%x", h.Sum(nil))
// // response
// cnonce := RandomKey()
// response := H(strings.Join([]string{HA1, nonceHeader, "00000001", cnonce, qopHeader, HA2}, ":"))
// // now make header
// AuthHeader := fmt.Sprintf(`Digest username="%s", realm="%s", nonce="%s", uri="%s", cnonce="%s", nc=00000001, qop=%s, response="%s", opaque="%s", algorithm=MD5`,
// username, realmHeader, nonceHeader, "/auth", cnonce, qopHeader, response, opaqueHeader)
// req.Header.Set("Authorization", AuthHeader)
// resp, err = client.Do(req)
// } else {
// return false, fmt.Errorf("response status code should have been 401, it was %v", resp.StatusCode)
// }
return resp.StatusCode == 200, err
}
/*
Parse Authorization header from the http.Request. Returns a map of
auth parameters or nil if the header is not a valid parsable Digest
auth header.
*/
func DigestAuthParams(r *http.Response) map[string]string {
s := strings.SplitN(r.Header.Get("Www-Authenticate"), " ", 2)
if len(s) != 2 || s[0] != "Digest" {
return nil
}
result := map[string]string{}
for _, kv := range strings.Split(s[1], ",") {
parts := strings.SplitN(kv, "=", 2)
if len(parts) != 2 {
continue
}
result[strings.Trim(parts[0], "\" ")] = strings.Trim(parts[1], "\" ")
}
return result
}
func RandomKey() string {
k := make([]byte, 12)
for bytes := 0; bytes < len(k); {
n, err := rand.Read(k[bytes:])
if err != nil {
panic("rand.Read() failed")
}
bytes += n
}
return base64.StdEncoding.EncodeToString(k)
}
/*
H function for MD5 algorithm (returns a lower-case hex MD5 digest)
*/
func H(data string) string {
digest := md5.New()
digest.Write([]byte(data))
return fmt.Sprintf("%x", digest.Sum(nil))
}

View File

@ -0,0 +1,112 @@
package bridge
import (
"encoding/json"
"log"
"net/http"
"oktopUSP/backend/services/acs/internal/server/handler"
"strings"
"time"
"github.com/google/uuid"
"github.com/nats-io/nats.go"
)
type Bridge struct {
pub func(string, []byte) error
sub func(string, func(*nats.Msg)) error
cpes map[string]handler.CPE
h *handler.Handler
}
type msgAnswer struct {
Code int
Msg any
}
const DEVICE_ANSWER_TIMEOUT = 5 * time.Second
func NewBridge(
pub func(string, []byte) error,
sub func(string, func(*nats.Msg)) error,
h *handler.Handler,
) *Bridge {
return &Bridge{
pub: pub,
sub: sub,
cpes: h.Cpes,
h: h,
}
}
func (b *Bridge) StartBridge() {
b.sub(handler.NATS_CWMP_ADAPTER_SUBJECT_PREFIX+"*.api", func(msg *nats.Msg) {
log.Printf("Received message: %s", string(msg.Data))
log.Printf("Subject: %s", msg.Subject)
log.Printf("Reply: %s", msg.Reply)
device := getDeviceFromSubject(msg.Subject)
cpe, ok := b.cpes[device]
if !ok {
log.Printf("Device %s not found", device)
respondMsg(msg.Respond, http.StatusNotFound, "Device not found")
return
}
if cpe.Queue.Size() > 0 {
log.Printf("Device %s is busy", device)
respondMsg(msg.Respond, http.StatusConflict, "Device is busy")
return
}
deviceAnswer := make(chan []byte)
defer close(deviceAnswer)
cpe.Queue.Enqueue(handler.Request{ //TODO: pass user and password too
Id: uuid.NewString(),
CwmpMsg: msg.Data,
Callback: deviceAnswer,
})
err := b.h.ConnectionRequest(cpe)
if err != nil {
log.Println("Failed to do connection request", err)
cpe.Queue.Dequeue()
respondMsg(msg.Respond, http.StatusBadRequest, err.Error())
return
}
//req := cpe.Queue.Dequeue().(handler.Request)
//cpe.Waiting = &req
select {
case response := <-deviceAnswer:
log.Println("Received response from device: ", string(response))
respondMsg(msg.Respond, http.StatusOK, response)
case <-time.After(DEVICE_ANSWER_TIMEOUT):
log.Println("Device response timed out")
respondMsg(msg.Respond, http.StatusRequestTimeout, "Request timeout")
}
})
}
func respondMsg(respond func(data []byte) error, code int, msgData any) {
msg, err := json.Marshal(msgAnswer{
Code: code,
Msg: msgData,
})
if err != nil {
log.Printf("Failed to marshal message: %q", err)
respond([]byte(err.Error()))
return
}
respond(msg)
//log.Println("Responded with message: ", string(msg))
}
func getDeviceFromSubject(subject string) string {
paths := strings.Split(subject, ".")
device := paths[len(paths)-2]
return device
}

View File

@ -0,0 +1,111 @@
package config
import (
"context"
"flag"
"log"
"os"
"strconv"
"github.com/joho/godotenv"
)
const LOCAL_ENV = ".env.local"
type Nats struct {
Url string
Name string
VerifyCertificates bool
Ctx context.Context
}
type Acs struct {
Port string
Tls bool
TlsPort bool
NoTls bool
Username string
Password string
Route string
}
type Config struct {
Acs Acs
Nats Nats
}
func NewConfig() *Config {
loadEnvVariables()
log.SetFlags(log.LstdFlags | log.Lshortfile)
natsUrl := flag.String("nats_url", lookupEnvOrString("NATS_URL", "nats://localhost:4222"), "url for nats server")
natsName := flag.String("nats_name", lookupEnvOrString("NATS_NAME", "adapter"), "name for nats client")
natsVerifyCertificates := flag.Bool("nats_verify_certificates", lookupEnvOrBool("NATS_VERIFY_CERTIFICATES", false), "verify validity of certificates from nats server")
acsPort := flag.String("acs_port", lookupEnvOrString("ACS_PORT", ":9292"), "port for acs server")
acsRoute := flag.String("acs_route", lookupEnvOrString("ACS_ROUTE", "/acs"), "route for acs server")
flHelp := flag.Bool("help", false, "Help")
/*
App variables priority:
1º - Flag through command line.
2º - Env variables.
3º - Default flag value.
*/
flag.Parse()
if *flHelp {
flag.Usage()
os.Exit(0)
}
ctx := context.TODO()
return &Config{
Nats: Nats{
Url: *natsUrl,
Name: *natsName,
VerifyCertificates: *natsVerifyCertificates,
Ctx: ctx,
},
Acs: Acs{
Port: *acsPort,
Route: *acsRoute,
},
}
}
func loadEnvVariables() {
err := godotenv.Load()
if _, err := os.Stat(LOCAL_ENV); err == nil {
_ = godotenv.Overload(LOCAL_ENV)
log.Printf("Loaded variables from '%s'", LOCAL_ENV)
return
}
if err != nil {
log.Println("Error to load environment variables:", err)
} else {
log.Println("Loaded variables from '.env'")
}
}
func lookupEnvOrString(key string, defaultVal string) string {
if val, _ := os.LookupEnv(key); val != "" {
return val
}
return defaultVal
}
func lookupEnvOrBool(key string, defaultVal bool) bool {
if val, _ := os.LookupEnv(key); val != "" {
v, err := strconv.ParseBool(val)
if err != nil {
log.Fatalf("LookupEnvOrBool[%s]: %v", key, err)
}
return v
}
return defaultVal
}

View File

@ -0,0 +1,542 @@
package cwmp
import (
"crypto/rand"
"encoding/xml"
"fmt"
"strconv"
"strings"
"time"
)
type SoapEnvelope struct {
XMLName xml.Name
Header SoapHeader
Body SoapBody
}
type SoapHeader struct {
Id string `xml:"ID"`
}
type SoapBody struct {
CWMPMessage CWMPMessage `xml:",any"`
}
type CWMPMessage struct {
XMLName xml.Name
}
type EventStruct struct {
EventCode string
CommandKey string
}
type ParameterValueStruct struct {
Name string
Value string
}
type ParameterInfoStruct struct {
Name string
Writable string
}
type SetParameterValues_ struct {
ParameterList []ParameterValueStruct `xml:"Body>SetParameterValues>ParameterList>ParameterValueStruct"`
ParameterKey string `xml:"Body>SetParameterValues>ParameterKey>string"`
}
type GetParameterValues_ struct {
ParameterNames []string `xml:"Body>GetParameterValues>ParameterNames>string"`
}
type GetParameterNames_ struct {
ParameterPath []string `xml:"Body>GetParameterNames>ParameterPath"`
NextLevel string `xml:"Body>GetParameterNames>NextLevel"`
}
type GetParameterValuesResponse struct {
ParameterList []ParameterValueStruct `xml:"Body>GetParameterValuesResponse>ParameterList>ParameterValueStruct"`
}
type GetParameterNamesResponse struct {
ParameterList []ParameterInfoStruct `xml:"Body>GetParameterNamesResponse>ParameterList>ParameterInfoStruct"`
}
type CWMPInform struct {
DeviceId DeviceID `xml:"Body>Inform>DeviceId"`
Events []EventStruct `xml:"Body>Inform>Event>EventStruct"`
ParameterList []ParameterValueStruct `xml:"Body>Inform>ParameterList>ParameterValueStruct"`
}
func (s *SoapEnvelope) KindOf() string {
return s.Body.CWMPMessage.XMLName.Local
}
func (i *CWMPInform) GetEvents() string {
res := ""
for idx := range i.Events {
res += i.Events[idx].EventCode
}
return res
}
func (i *CWMPInform) GetConnectionRequest() string {
for idx := range i.ParameterList {
// valid condition for both tr98 and tr181
if strings.HasSuffix(i.ParameterList[idx].Name, "Device.ManagementServer.ConnectionRequestURL") {
return i.ParameterList[idx].Value
}
}
return ""
}
func (i *CWMPInform) GetSoftwareVersion() string {
for idx := range i.ParameterList {
if strings.HasSuffix(i.ParameterList[idx].Name, "Device.DeviceInfo.SoftwareVersion") {
return i.ParameterList[idx].Value
}
}
return ""
}
func (i *CWMPInform) GetHardwareVersion() string {
for idx := range i.ParameterList {
if strings.HasSuffix(i.ParameterList[idx].Name, "Device.DeviceInfo.HardwareVersion") {
return i.ParameterList[idx].Value
}
}
return ""
}
func (i *CWMPInform) GetDataModelType() string {
if strings.HasPrefix(i.ParameterList[0].Name, "InternetGatewayDevice") {
return "TR098"
} else if strings.HasPrefix(i.ParameterList[0].Name, "Device") {
return "TR181"
}
return ""
}
type DeviceID struct {
Manufacturer string
OUI string
SerialNumber string
ProductClass string
}
func InformResponse(mustUnderstand string) string {
mustUnderstandHeader := ""
if mustUnderstand != "" {
mustUnderstandHeader = `<cwmp:ID soap:mustUnderstand="1">` + mustUnderstand + `</cwmp:ID>`
}
return `<?xml version="1.0" encoding="UTF-8"?>
<soap:Envelope xmlns:soapenc="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:cwmp="urn:dslforum-org:cwmp-1-0" xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:schemaLocation="urn:dslforum-org:cwmp-1-0 ..\schemas\wt121.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<soap:Header>` + mustUnderstandHeader + `</soap:Header>
<soap:Body soap:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<cwmp:InformResponse>
<MaxEnvelopes>1</MaxEnvelopes>
</cwmp:InformResponse>
</soap:Body>
</soap:Envelope>`
}
func GetParameterValues(leaf string) string {
return `<?xml version="1.0" encoding="UTF-8"?>
<soap:Envelope xmlns:soapenc="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:cwmp="urn:dslforum-org:cwmp-1-0" xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:schemaLocation="urn:dslforum-org:cwmp-1-0 ..\schemas\wt121.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<soap:Header/>
<soap:Body soap:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<cwmp:GetParameterValues>
<ParameterNames>
<string>` + leaf + `</string>
</ParameterNames>
</cwmp:GetParameterValues>
</soap:Body>
</soap:Envelope>`
}
func GetParameterMultiValues(leaves []string) string {
msg := `<?xml version="1.0" encoding="UTF-8"?>
<soap:Envelope xmlns:soapenc="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:cwmp="urn:dslforum-org:cwmp-1-0" xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:schemaLocation="urn:dslforum-org:cwmp-1-0 ..\schemas\wt121.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<soap:Header/>
<soap:Body soap:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<cwmp:GetParameterValues>
<ParameterNames>`
for idx := range leaves {
msg += `<string>` + leaves[idx] + `</string>`
}
msg += `</ParameterNames>
</cwmp:GetParameterValues>
</soap:Body>
</soap:Envelope>`
return msg
}
func SetParameterValues(leaf string, value string) string {
return `<?xml version="1.0" encoding="UTF-8"?>
<soap:Envelope xmlns:soapenc="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:cwmp="urn:dslforum-org:cwmp-1-0" xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:schemaLocation="urn:dslforum-org:cwmp-1-0 ..\schemas\wt121.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<soap:Header/>
<soap:Body soap:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<cwmp:SetParameterValues>
<ParameterList soapenc:arrayType="cwmp:ParameterValueStruct[1]">
<ParameterValueStruct>
<Name>` + leaf + `</Name>
<Value>` + value + `</Value>
</ParameterValueStruct>
</ParameterList>
<ParameterKey>LC1309` + randToken() + `</ParameterKey>
</cwmp:SetParameterValues>
</soap:Body>
</soap:Envelope>`
}
func randToken() string {
b := make([]byte, 8)
rand.Read(b)
return fmt.Sprintf("%x", b)
}
func SetParameterMultiValues(data map[string]string) string {
msg := `<?xml version="1.0" encoding="UTF-8"?>
<soap:Envelope xmlns:soapenc="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:cwmp="urn:dslforum-org:cwmp-1-0" xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:schemaLocation="urn:dslforum-org:cwmp-1-0 ..\schemas\wt121.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<soap:Header/>
<soap:Body soap:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<cwmp:SetParameterValues>
<ParameterList soapenc:arrayType="cwmp:ParameterValueStruct[` + string(len(data)) + `]">`
for key, value := range data {
msg += `<ParameterValueStruct>
<Name>` + key + `</Name>
<Value>` + value + `</Value>
</ParameterValueStruct>`
}
msg += `</ParameterList>
<ParameterKey>LC1309` + randToken() + `</ParameterKey>
</cwmp:SetParameterValues>
</soap:Body>
</soap:Envelope>`
return msg
}
func GetParameterNames(leaf string, nextlevel int) string {
return `<?xml version="1.0" encoding="UTF-8"?>
<soap:Envelope xmlns:soapenc="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:cwmp="urn:dslforum-org:cwmp-1-0" xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:schemaLocation="urn:dslforum-org:cwmp-1-0 ..\schemas\wt121.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<soap:Header/>
<soap:Body soap:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<cwmp:GetParameterNames>
<ParameterPath>` + leaf + `</ParameterPath>
<NextLevel>` + strconv.Itoa(nextlevel) + `</NextLevel>
</cwmp:GetParameterNames>
</soap:Body>
</soap:Envelope>`
}
func FactoryReset() string {
return `<?xml version="1.0" encoding="UTF-8"?>
<soap:Envelope xmlns:soapenc="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:cwmp="urn:dslforum-org:cwmp-1-0" xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:schemaLocation="urn:dslforum-org:cwmp-1-0 ..\schemas\wt121.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<soap:Header/>
<soap:Body soap:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<cwmp:FactoryReset/>
</soap:Body>
</soap:Envelope>`
}
func Download(filetype, url, username, password, filesize string) string {
// 3 Vendor Configuration File
// 1 Firmware Upgrade Image
return `<?xml version="1.0" encoding="UTF-8"?>
<soap:Envelope xmlns:soapenc="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:cwmp="urn:dslforum-org:cwmp-1-0" xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:schemaLocation="urn:dslforum-org:cwmp-1-0 ..\schemas\wt121.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<soap:Header/>
<soap:Body soap:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<cwmp:Download>
<CommandKey>MSDWK</CommandKey>
<FileType>` + filetype + `</FileType>
<URL>` + url + `</URL>
<Username>` + username + `</Username>
<Password>` + password + `</Password>
<FileSize>` + filesize + `</FileSize>
<TargetFileName></TargetFileName>
<DelaySeconds>0</DelaySeconds>
<SuccessURL></SuccessURL>
<FailureURL></FailureURL>
</cwmp:Download>
</soap:Body>
</soap:Envelope>`
}
func CancelTransfer() string {
return `<?xml version="1.0" encoding="UTF-8"?>
<soap:Envelope xmlns:soapenc="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:cwmp="urn:dslforum-org:cwmp-1-0" xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:schemaLocation="urn:dslforum-org:cwmp-1-0 ..\schemas\wt121.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<soap:Header/>
<soap:Body soap:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<cwmp:CancelTransfer>
<CommandKey></CommandKey>
<cwmp:CancelTransfer/>
</soap:Body>
</soap:Envelope>`
}
type TimeWindowStruct struct {
WindowStart string
WindowEnd string
WindowMode string
UserMessage string
MaxRetries string
}
func (window *TimeWindowStruct) String() string {
return `<TimeWindowStruct>
<WindowStart>` + window.WindowStart + `</WindowStart>
<WindowEnd>` + window.WindowEnd + `</WindowEnd>
<WindowMode>` + window.WindowMode + `</WindowMode>
<UserMessage>` + window.UserMessage + `</UserMessage>
<MaxRetries>` + window.MaxRetries + `</MaxRetries>
</TimeWindowStruct>`
}
func ScheduleDownload(filetype, url, username, password, filesize string, windowslist []fmt.Stringer) string {
ret := `<?xml version="1.0" encoding="UTF-8"?>
<soap:Envelope xmlns:soapenc="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:cwmp="urn:dslforum-org:cwmp-1-0" xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:schemaLocation="urn:dslforum-org:cwmp-1-0 ..\schemas\wt121.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<soap:Header/>
<soap:Body soap:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<cwmp:ScheduleDownload>
<CommandKey>MSDWK</CommandKey>
<FileType>` + filetype + `</FileType>
<URL>` + url + `</URL>
<Username>` + username + `</Username>
<Password>` + password + `</Password>
<FileSize>` + filesize + `</FileSize>
<TargetFileName></TargetFileName>
<TimeWindowList>`
for _, op := range windowslist {
ret += op.String()
}
ret += `</TimeWindowList>
</cwmp:ScheduleDownload>
</soap:Body>
</soap:Envelope>`
return ret
}
type InstallOpStruct struct {
Url string
Uuid string
Username string
Password string
ExecutionEnvironment string
}
func (op *InstallOpStruct) String() string {
return `<InstallOpStruct>
<URL>` + op.Url + `</URL>
<UUID>` + op.Uuid + `</UUID>
<Username>` + op.Username + `</Username>
<Password>` + op.Password + `</Password>
<ExecutionEnvRef>` + op.ExecutionEnvironment + `</ExecutionEnvRef>
</InstallOpStruct>`
}
type UpdateOpStruct struct {
Uuid string
Version string
Url string
Username string
Password string
}
func (op *UpdateOpStruct) String() string {
return `<UpdateOpStruct>
<UUID>` + op.Uuid + `</UUID>
<Version>` + op.Version + `</Version>
<URL>` + op.Url + `</URL>
<Username>` + op.Username + `</Username>
<Password>` + op.Password + `</Password>
</UpdateOpStruct>`
}
type UninstallOpStruct struct {
Uuid string
Version string
ExecutionEnvironment string
}
func (op *UninstallOpStruct) String() string {
return `<UninstallOpStruct>
<UUID>` + op.Uuid + `</UUID>
<Version>` + op.Version + `</Version>
<ExecutionEnvRef>` + op.ExecutionEnvironment + `</ExecutionEnvRef>
</UninstallOpStruct>`
}
func ChangeDuState(ops []fmt.Stringer) string {
ret := `<?xml version="1.0" encoding="UTF-8"?>
<soap:Envelope xmlns:soapenc="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:cwmp="urn:dslforum-org:cwmp-1-0" xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:schemaLocation="urn:dslforum-org:cwmp-1-0 ..\schemas\wt121.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<soap:Header/>
<soap:Body soap:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<cmwp:ChangeDUState>
<Operations>`
for _, op := range ops {
ret += op.String()
}
ret += `</Operations>
<CommandKey></CommandKey>
</cmwp:ChangeDUState>
</soap:Body>
</soap:Envelope>`
return ret
}
// CPE side
func Inform(serial string) string {
return `<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:soap-enc="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:cwmp="urn:dslforum-org:cwmp-1-0"><soap:Header><cwmp:ID soap:mustUnderstand="1">5058</cwmp:ID></soap:Header>
<soap:Body><cwmp:Inform><DeviceId><Manufacturer>ADB Broadband</Manufacturer>
<OUI>0013C8</OUI>
<ProductClass>VV5522</ProductClass>
<SerialNumber>` + serial + `</SerialNumber>
</DeviceId>
<Event soap-enc:arrayType="cwmp:EventStruct[1]">
<EventStruct><EventCode>6 CONNECTION REQUEST</EventCode>
<CommandKey></CommandKey>
</EventStruct>
</Event>
<MaxEnvelopes>1</MaxEnvelopes>
<CurrentTime>` + time.Now().Format(time.RFC3339) + `</CurrentTime>
<RetryCount>0</RetryCount>
<ParameterList soap-enc:arrayType="cwmp:ParameterValueStruct[8]">
<ParameterValueStruct><Name>InternetGatewayDevice.ManagementServer.ConnectionRequestURL</Name>
<Value xsi:type="xsd:string">http://104.199.175.27:7547/ConnectionRequest-` + serial + `</Value>
</ParameterValueStruct>
<ParameterValueStruct><Name>InternetGatewayDevice.ManagementServer.ParameterKey</Name>
<Value xsi:type="xsd:string"></Value>
</ParameterValueStruct>
<ParameterValueStruct><Name>InternetGatewayDevice.DeviceSummary</Name>
<Value xsi:type="xsd:string">InternetGatewayDevice:1.2[](Baseline:1,EthernetLAN:1,WiFiLAN:1,ADSLWAN:1,EthernetWAN:1,QoS:1,QoSDynamicFlow:1,Bridging:1,Time:1,IPPing:1,TraceRoute:1,DeviceAssociation:1,UDPConnReq:1),VoiceService:1.0[1](TAEndpoint:1,SIPEndpoint:1)</Value>
</ParameterValueStruct>
<ParameterValueStruct><Name>InternetGatewayDevice.DeviceInfo.HardwareVersion</Name>
<Value xsi:type="xsd:string">` + serial + `</Value>
</ParameterValueStruct>
<ParameterValueStruct><Name>InternetGatewayDevice.DeviceInfo.ProvisioningCode</Name>
<Value xsi:type="xsd:string">ABCD</Value>
</ParameterValueStruct>
<ParameterValueStruct><Name>InternetGatewayDevice.DeviceInfo.SoftwareVersion</Name>
<Value xsi:type="xsd:string">4.0.8.17785</Value>
</ParameterValueStruct>
<ParameterValueStruct><Name>InternetGatewayDevice.DeviceInfo.SpecVersion</Name>
<Value xsi:type="xsd:string">1.0</Value>
</ParameterValueStruct>
<ParameterValueStruct><Name>InternetGatewayDevice.WANDevice.1.WANConnectionDevice.1.WANIPConnection.1.ExternalIPAddress</Name>
<Value xsi:type="xsd:string">12.0.0.10</Value>
</ParameterValueStruct>
</ParameterList>
</cwmp:Inform>
</soap:Body></soap:Envelope>`
}
/*
func BuildGetParameterValuesResponse(serial string, leaves GetParameterValues_) string {
ret := `<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:soap-enc="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:cwmp="urn:dslforum-org:cwmp-1-0">
<soap:Header><cwmp:ID soap:mustUnderstand="1">3</cwmp:ID></soap:Header>
<soap:Body><cwmp:GetParameterValuesResponse>`
db, _ := sqlite3.Open("/tmp/cpe.db")
n_leaves := 0
var temp string
for _, leaf := range leaves.ParameterNames {
sql := "select key, value, tipo from params where key like '" + leaf + "%'"
for s, err := db.Query(sql); err == nil; err = s.Next() {
n_leaves++
var key string
var value string
var tipo string
s.Scan(&key, &value, &tipo)
temp += `<ParameterValueStruct>
<Name>` + key + `</Name>
<Value xsi:type="` + tipo + `">` + value + `</Value>
</ParameterValueStruct>`
}
}
ret += `<ParameterList soap-enc:arrayType="cwmp:ParameterValueStruct[` + strconv.Itoa(n_leaves) + `]">`
ret += temp
ret += `</ParameterList></cwmp:GetParameterValuesResponse></soap:Body></soap:Envelope>`
return ret
}
func BuildGetParameterNamesResponse(serial string, leaves GetParameterNames_) string {
ret := `<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:soap-enc="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:cwmp="urn:dslforum-org:cwmp-1-0">
<soap:Header><cwmp:ID soap:mustUnderstand="1">69</cwmp:ID></soap:Header>
<soap:Body><cwmp:GetParameterNamesResponse>`
db, _ := sqlite3.Open("/tmp/cpe.db")
obj := make(map[string]bool)
var temp string
for _, leaf := range leaves.ParameterPath {
fmt.Println(leaf)
sql := "select key, value, tipo from params where key like '" + leaf + "%'"
for s, err := db.Query(sql); err == nil; err = s.Next() {
var key string
var value string
var tipo string
s.Scan(&key, &value, &tipo)
var sp = strings.Split(strings.Split(key, leaf)[1], ".")
nextlevel, _ := strconv.Atoi(leaves.NextLevel)
if nextlevel == 0 {
root := leaf
obj[root] = true
for idx := range sp {
if idx == len(sp)-1 {
root = root + sp[idx]
} else {
root = root + sp[idx] + "."
}
obj[root] = true
}
} else {
if !obj[sp[0]] {
if len(sp) > 1 {
obj[leaf+sp[0]+"."] = true
} else {
obj[leaf+sp[0]] = true
}
}
}
}
}
for o := range obj {
temp += `<ParameterInfoStruct>
<Name>` + o + `</Name>
<Writable>true</Writable>
</ParameterInfoStruct>`
}
fmt.Println(len(obj))
ret += `<ParameterList soap-enc:arrayType="cwmp:ParameterInfoStruct[]">`
ret += temp
ret += `</ParameterList></cwmp:GetParameterNamesResponse></soap:Body></soap:Envelope>`
return ret
}
*/

View File

@ -0,0 +1,149 @@
package nats
import (
"context"
"errors"
"log"
"oktopUSP/backend/services/acs/internal/config"
"time"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)
const (
CWMP_STREAM_NAME = "cwmp"
)
type NatsActions struct {
Publish func(string, []byte) error
Subscribe func(string, func(*nats.Msg)) error
}
func StartNatsClient(c config.Nats) NatsActions {
var (
nc *nats.Conn
err error
)
opts := defineOptions(c)
log.Printf("Connecting to NATS server %s", c.Url)
for {
nc, err = nats.Connect(c.Url, opts...)
if err != nil {
time.Sleep(5 * time.Second)
continue
}
break
}
log.Printf("Successfully connected to NATS server %s", c.Url)
js, err := jetstream.New(nc)
if err != nil {
log.Fatalf("Failed to create JetStream client: %v", err)
}
streams := defineStreams()
err = createStreams(c.Ctx, js, streams)
if err != nil {
log.Fatalf("Failed to create Consumer: %v", err)
}
consumers := defineConsumers()
err = createConsumers(c.Ctx, js, consumers)
if err != nil {
log.Fatalf("Failed to create Consumer: %v", err)
}
return NatsActions{
Publish: publisher(js),
Subscribe: subscriber(nc),
}
}
func subscriber(nc *nats.Conn) func(string, func(*nats.Msg)) error {
return func(subject string, handler func(*nats.Msg)) error {
_, err := nc.Subscribe(subject, handler)
if err != nil {
log.Printf("error to subscribe to subject %s error: %q", subject, err)
}
return err
}
}
func publisher(js jetstream.JetStream) func(string, []byte) error {
return func(subject string, payload []byte) error {
_, err := js.PublishAsync(subject, payload)
if err != nil {
log.Printf("error to send jetstream message: %q", err)
}
return err
}
}
func createStreams(ctx context.Context, js jetstream.JetStream, streams []string) error {
for _, stream := range streams {
_, err := js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
Name: stream,
Description: "Stream for " + stream + " messages",
Subjects: []string{stream + ".>"},
Retention: jetstream.InterestPolicy,
})
if err != nil {
return errors.New(err.Error() + " | consumer:" + stream)
}
}
return nil
}
func createConsumers(ctx context.Context, js jetstream.JetStream, consumers []string) error {
for _, consumer := range consumers {
_, err := js.CreateOrUpdateConsumer(ctx, consumer, jetstream.ConsumerConfig{
Name: consumer,
Description: "Consumer for " + consumer + " messages",
AckPolicy: jetstream.AckExplicitPolicy,
Durable: consumer,
})
if err != nil {
return err
}
}
return nil
}
func defineStreams() []string {
return []string{
CWMP_STREAM_NAME,
}
}
func defineConsumers() []string {
return []string{
CWMP_STREAM_NAME,
}
}
func defineOptions(c config.Nats) []nats.Option {
var opts []nats.Option
opts = append(opts, nats.Name(c.Name))
opts = append(opts, nats.MaxReconnects(-1))
opts = append(opts, nats.ReconnectWait(5*time.Second))
opts = append(opts, nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
log.Printf("Got disconnected! Reason: %q\n", err)
}))
opts = append(opts, nats.ReconnectHandler(func(nc *nats.Conn) {
log.Printf("Got reconnected to %v!\n", nc.ConnectedUrl())
}))
opts = append(opts, nats.ClosedHandler(func(nc *nats.Conn) {
log.Printf("Connection closed. Reason: %q\n", nc.LastError())
}))
if c.VerifyCertificates {
opts = append(opts, nats.RootCAs())
}
return opts
}

View File

@ -0,0 +1,164 @@
package handler
import (
"encoding/json"
"encoding/xml"
"fmt"
"io/ioutil"
"log"
"net/http"
"oktopUSP/backend/services/acs/internal/auth"
"oktopUSP/backend/services/acs/internal/cwmp"
"time"
"github.com/oleiade/lane"
)
func (h *Handler) CwmpHandler(w http.ResponseWriter, r *http.Request) {
log.Printf("--> Connection from %s", r.RemoteAddr)
defer r.Body.Close()
defer log.Printf("<-- Connection from %s closed", r.RemoteAddr)
tmp, _ := ioutil.ReadAll(r.Body)
body := string(tmp)
var envelope cwmp.SoapEnvelope
xml.Unmarshal(tmp, &envelope)
messageType := envelope.Body.CWMPMessage.XMLName.Local
log.Println("messageType: ", messageType)
var cpe CPE
var exists bool
w.Header().Set("Server", "Oktopus "+Version)
if messageType != "Inform" {
if cookie, err := r.Cookie("oktopus"); err == nil {
if cpe, exists = h.Cpes[cookie.Value]; !exists {
log.Printf("CPE with serial number %s not found", cookie.Value)
}
log.Printf("CPE with serial number %s found", cookie.Value)
} else {
fmt.Println("cookie 'oktopus' missing")
w.WriteHeader(401)
return
}
}
if messageType == "Inform" {
var Inform cwmp.CWMPInform
xml.Unmarshal(tmp, &Inform)
var addr string
if r.Header.Get("X-Real-Ip") != "" {
addr = r.Header.Get("X-Real-Ip")
} else {
addr = r.RemoteAddr
}
sn := Inform.DeviceId.SerialNumber
if _, exists := h.Cpes[sn]; !exists {
log.Println("New device: " + sn)
h.Cpes[sn] = CPE{
SerialNumber: sn,
LastConnection: time.Now().UTC(),
SoftwareVersion: Inform.GetSoftwareVersion(),
HardwareVersion: Inform.GetHardwareVersion(),
ExternalIPAddress: addr,
ConnectionRequestURL: Inform.GetConnectionRequest(),
OUI: Inform.DeviceId.OUI,
Queue: lane.NewQueue(),
DataModel: Inform.GetDataModelType(),
}
go h.handleCpeStatus(sn)
h.pub(NATS_CWMP_SUBJECT_PREFIX+sn+".info", tmp)
}
obj := h.Cpes[sn]
cpe := &obj
cpe.LastConnection = time.Now().UTC()
log.Printf("Received an Inform from device %s withEventCodes %s", addr, Inform.GetEvents())
expiration := time.Now().AddDate(0, 0, 1)
cookie := http.Cookie{Name: "oktopus", Value: sn, Expires: expiration}
http.SetCookie(w, &cookie)
data, _ := xml.Marshal(cwmp.InformResponse(envelope.Header.Id))
w.Write(data)
} else if messageType == "TransferComplete" {
} else if messageType == "GetRPC" {
} else {
if len(body) == 0 {
log.Println("Got Empty Post")
}
if cpe.Waiting != nil {
log.Println("CPE was waiting for a response, now received something")
var e cwmp.SoapEnvelope
xml.Unmarshal([]byte(body), &e)
log.Println("Kind of envelope: ", e.KindOf())
if e.KindOf() == "GetParameterNamesResponse" {
// var envelope cwmp.GetParameterNamesResponse
// xml.Unmarshal([]byte(body), &envelope)
// msg := new(NatsSendMessage)
// msg.MsgType = "GetParameterNamesResponse"
// msg.Data, _ = json.Marshal(envelope)
log.Println("Receive GetParameterNamesResponse from CPE:", cpe.SerialNumber)
cpe.Waiting.Callback <- tmp
} else if e.KindOf() == "GetParameterValuesResponse" {
var envelope cwmp.GetParameterValuesResponse
xml.Unmarshal([]byte(body), &envelope)
msg := new(NatsSendMessage)
msg.MsgType = "GetParameterValuesResponse"
msg.Data, _ = json.Marshal(envelope)
cpe.Waiting.Callback <- tmp
} else {
log.Println("Unknown message type")
cpe.Waiting.Callback <- tmp
}
cpe.Waiting = nil
} else {
log.Println("CPE was not waiting for a response")
}
log.Printf("CPE %s Queue size: %d", cpe.SerialNumber, cpe.Queue.Size())
if cpe.Queue.Size() > 0 {
req := cpe.Queue.Dequeue().(Request)
cpe.Waiting = &req
log.Println("Sending request to CPE:", req.Id)
w.Header().Set("Connection", "keep-alive")
w.Write(req.CwmpMsg)
} else {
w.Header().Set("Connection", "close")
w.WriteHeader(204)
}
}
h.Cpes[cpe.SerialNumber] = cpe
}
func (h *Handler) ConnectionRequest(cpe CPE) error {
log.Println("--> ConnectionRequest, CPE: ", cpe.SerialNumber)
// log.Println("ConnectionRequestURL: ", cpe.ConnectionRequestURL)
// log.Println("ConnectionRequestUsername: ", cpe.Username)
// log.Println("ConnectionRequestPassword: ", cpe.Password)
ok, err := auth.Auth("", "", cpe.ConnectionRequestURL)
if !ok {
log.Println("Error while authenticating to CPE: ", err)
}
return err
}

View File

@ -0,0 +1,73 @@
package handler
import (
"encoding/json"
"time"
"github.com/nats-io/nats.go"
"github.com/oleiade/lane"
)
const Version = "1.0.0"
type Request struct {
Id string
User string
Password string
CwmpMsg []byte
Callback chan []byte
}
type CPE struct {
SerialNumber string
Manufacturer string
OUI string
ConnectionRequestURL string
SoftwareVersion string
ExternalIPAddress string
Queue *lane.Queue
Waiting *Request
HardwareVersion string
LastConnection time.Time
DataModel string
Username string
Password string
}
type Message struct {
SerialNumber string
Message string
}
type WsMessage struct {
Cmd string
}
type NatsSendMessage struct {
MsgType string
Data json.RawMessage
}
type MsgCPEs struct {
CPES map[string]CPE
}
type Handler struct {
pub func(string, []byte) error
sub func(string, func(*nats.Msg)) error
Cpes map[string]CPE
}
const (
NATS_CWMP_SUBJECT_PREFIX = "cwmp.v1."
NATS_CWMP_ADAPTER_SUBJECT_PREFIX = "cwmp-adapter.v1."
NATS_ADAPTER_SUBJECT_PREFIX = "adapter.v1."
)
func NewHandler(pub func(string, []byte) error, sub func(string, func(*nats.Msg)) error) *Handler {
return &Handler{
pub: pub,
sub: sub,
Cpes: make(map[string]CPE),
}
}

View File

@ -0,0 +1,24 @@
package handler
import (
"log"
"time"
)
// TODO: make these consts dynamic via config
const (
CHECK_STATUS_INTERVAL = 10 * time.Second
KEEP_ALIVE_INTERVAL = 600 * time.Second
)
func (h *Handler) handleCpeStatus(cpe string) {
for {
if time.Since(h.Cpes[cpe].LastConnection) > KEEP_ALIVE_INTERVAL {
delete(h.Cpes, cpe)
break
}
time.Sleep(CHECK_STATUS_INTERVAL)
}
log.Println("CPE", cpe, "is offline")
h.pub("cwmp.v1."+cpe+".status", []byte("0"))
}

View File

@ -0,0 +1,22 @@
package server
import (
"log"
"net/http"
"oktopUSP/backend/services/acs/internal/config"
"oktopUSP/backend/services/acs/internal/nats"
"oktopUSP/backend/services/acs/internal/server/handler"
"os"
)
func Run(c config.Acs, natsActions nats.NatsActions, h *handler.Handler) {
http.HandleFunc(c.Route, h.CwmpHandler)
log.Printf("ACS running at %s%s", c.Port, c.Route)
err := http.ListenAndServe(c.Port, nil)
if err != nil {
log.Fatal(err)
os.Exit(1)
}
}

View File

@ -57,6 +57,7 @@ func (a *Api) StartApi() {
authentication.HandleFunc("/admin/exists", a.adminUserExists).Methods("GET") authentication.HandleFunc("/admin/exists", a.adminUserExists).Methods("GET")
iot := r.PathPrefix("/api/device").Subrouter() iot := r.PathPrefix("/api/device").Subrouter()
iot.HandleFunc("/auth", a.deviceAuth).Methods("GET", "PUT", "DELETE") iot.HandleFunc("/auth", a.deviceAuth).Methods("GET", "PUT", "DELETE")
iot.HandleFunc("/cwmp/{sn}/getParameterNames", a.cwmpGetParameterNamesMsg).Methods("GET", "PUT", "DELETE")
iot.HandleFunc("", a.retrieveDevices).Methods("GET") iot.HandleFunc("", a.retrieveDevices).Methods("GET")
iot.HandleFunc("/{id}", a.retrieveDevices).Methods("GET") iot.HandleFunc("/{id}", a.retrieveDevices).Methods("GET")
iot.HandleFunc("/{sn}/{mtp}/get", a.deviceGetMsg).Methods("PUT") iot.HandleFunc("/{sn}/{mtp}/get", a.deviceGetMsg).Methods("PUT")

View File

@ -0,0 +1,48 @@
package api
import (
"encoding/json"
"encoding/xml"
"io"
"net/http"
"github.com/leandrofars/oktopus/internal/bridge"
"github.com/leandrofars/oktopus/internal/cwmp"
"github.com/leandrofars/oktopus/internal/nats"
"github.com/leandrofars/oktopus/internal/utils"
)
func (a *Api) cwmpGetParameterNamesMsg(w http.ResponseWriter, r *http.Request) {
sn := getSerialNumberFromRequest(r)
payload, err := io.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write(utils.Marshall(err.Error()))
return
}
data, err := bridge.NatsCwmpInteraction(
nats.NATS_CWMP_ADAPTER_SUBJECT_PREFIX+sn+".api",
payload,
w,
a.nc,
)
if err != nil {
return
}
var response cwmp.GetParameterNamesResponse
err = xml.Unmarshal(data, &response)
if err != nil {
err = json.Unmarshal(data, &response)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write(utils.Marshall(err))
return
}
return
}
w.Write(data)
}

View File

@ -204,3 +204,45 @@ func NatsReqWithoutHttpSet[T entity.DataType](
return answer, nil return answer, nil
} }
func NatsCwmpInteraction(
subj string,
body []byte,
w http.ResponseWriter,
nc *nats.Conn,
) ([]byte, error) {
log.Println("Sending cwmp message")
log.Println("Subject: ", subj)
var answer entity.MsgAnswer[[]byte]
msg, err := nc.Request(subj, body, local.NATS_REQUEST_TIMEOUT)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusInternalServerError)
w.Write(utils.Marshall("Error to communicate with nats: " + err.Error()))
return nil, err
}
err = json.Unmarshal(msg.Data, &answer)
if err != nil {
var errMsg *entity.MsgAnswer[*string]
err = json.Unmarshal(msg.Data, &errMsg)
if err != nil {
log.Println("Bad answer message formatting: ", err.Error())
w.WriteHeader(http.StatusInternalServerError)
w.Write(msg.Data)
return nil, err
}
log.Printf("Error message received, msg: %s, code: %d", *errMsg.Msg, errMsg.Code)
w.WriteHeader(errMsg.Code)
w.Write(utils.Marshall(*errMsg.Msg))
return nil, errNatsMsgReceivedWithErrorData
}
return answer.Msg, nil
}

View File

@ -0,0 +1,542 @@
package cwmp
import (
"crypto/rand"
"encoding/xml"
"fmt"
"strconv"
"strings"
"time"
)
type SoapEnvelope struct {
XMLName xml.Name
Header SoapHeader
Body SoapBody
}
type SoapHeader struct {
Id string `xml:"ID"`
}
type SoapBody struct {
CWMPMessage CWMPMessage `xml:",any"`
}
type CWMPMessage struct {
XMLName xml.Name
}
type EventStruct struct {
EventCode string
CommandKey string
}
type ParameterValueStruct struct {
Name string
Value string
}
type ParameterInfoStruct struct {
Name string
Writable string
}
type SetParameterValues_ struct {
ParameterList []ParameterValueStruct `xml:"Body>SetParameterValues>ParameterList>ParameterValueStruct"`
ParameterKey string `xml:"Body>SetParameterValues>ParameterKey>string"`
}
type GetParameterValues_ struct {
ParameterNames []string `xml:"Body>GetParameterValues>ParameterNames>string"`
}
type GetParameterNames_ struct {
ParameterPath []string `xml:"Body>GetParameterNames>ParameterPath"`
NextLevel string `xml:"Body>GetParameterNames>NextLevel"`
}
type GetParameterValuesResponse struct {
ParameterList []ParameterValueStruct `xml:"Body>GetParameterValuesResponse>ParameterList>ParameterValueStruct"`
}
type GetParameterNamesResponse struct {
ParameterList []ParameterInfoStruct `xml:"Body>GetParameterNamesResponse>ParameterList>ParameterInfoStruct"`
}
type CWMPInform struct {
DeviceId DeviceID `xml:"Body>Inform>DeviceId"`
Events []EventStruct `xml:"Body>Inform>Event>EventStruct"`
ParameterList []ParameterValueStruct `xml:"Body>Inform>ParameterList>ParameterValueStruct"`
}
func (s *SoapEnvelope) KindOf() string {
return s.Body.CWMPMessage.XMLName.Local
}
func (i *CWMPInform) GetEvents() string {
res := ""
for idx := range i.Events {
res += i.Events[idx].EventCode
}
return res
}
func (i *CWMPInform) GetConnectionRequest() string {
for idx := range i.ParameterList {
// valid condition for both tr98 and tr181
if strings.HasSuffix(i.ParameterList[idx].Name, "Device.ManagementServer.ConnectionRequestURL") {
return i.ParameterList[idx].Value
}
}
return ""
}
func (i *CWMPInform) GetSoftwareVersion() string {
for idx := range i.ParameterList {
if strings.HasSuffix(i.ParameterList[idx].Name, "Device.DeviceInfo.SoftwareVersion") {
return i.ParameterList[idx].Value
}
}
return ""
}
func (i *CWMPInform) GetHardwareVersion() string {
for idx := range i.ParameterList {
if strings.HasSuffix(i.ParameterList[idx].Name, "Device.DeviceInfo.HardwareVersion") {
return i.ParameterList[idx].Value
}
}
return ""
}
func (i *CWMPInform) GetDataModelType() string {
if strings.HasPrefix(i.ParameterList[0].Name, "InternetGatewayDevice") {
return "TR098"
} else if strings.HasPrefix(i.ParameterList[0].Name, "Device") {
return "TR181"
}
return ""
}
type DeviceID struct {
Manufacturer string
OUI string
SerialNumber string
ProductClass string
}
func InformResponse(mustUnderstand string) string {
mustUnderstandHeader := ""
if mustUnderstand != "" {
mustUnderstandHeader = `<cwmp:ID soap:mustUnderstand="1">` + mustUnderstand + `</cwmp:ID>`
}
return `<?xml version="1.0" encoding="UTF-8"?>
<soap:Envelope xmlns:soapenc="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:cwmp="urn:dslforum-org:cwmp-1-0" xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:schemaLocation="urn:dslforum-org:cwmp-1-0 ..\schemas\wt121.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<soap:Header>` + mustUnderstandHeader + `</soap:Header>
<soap:Body soap:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<cwmp:InformResponse>
<MaxEnvelopes>1</MaxEnvelopes>
</cwmp:InformResponse>
</soap:Body>
</soap:Envelope>`
}
func GetParameterValues(leaf string) string {
return `<?xml version="1.0" encoding="UTF-8"?>
<soap:Envelope xmlns:soapenc="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:cwmp="urn:dslforum-org:cwmp-1-0" xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:schemaLocation="urn:dslforum-org:cwmp-1-0 ..\schemas\wt121.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<soap:Header/>
<soap:Body soap:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<cwmp:GetParameterValues>
<ParameterNames>
<string>` + leaf + `</string>
</ParameterNames>
</cwmp:GetParameterValues>
</soap:Body>
</soap:Envelope>`
}
func GetParameterMultiValues(leaves []string) string {
msg := `<?xml version="1.0" encoding="UTF-8"?>
<soap:Envelope xmlns:soapenc="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:cwmp="urn:dslforum-org:cwmp-1-0" xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:schemaLocation="urn:dslforum-org:cwmp-1-0 ..\schemas\wt121.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<soap:Header/>
<soap:Body soap:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<cwmp:GetParameterValues>
<ParameterNames>`
for idx := range leaves {
msg += `<string>` + leaves[idx] + `</string>`
}
msg += `</ParameterNames>
</cwmp:GetParameterValues>
</soap:Body>
</soap:Envelope>`
return msg
}
func SetParameterValues(leaf string, value string) string {
return `<?xml version="1.0" encoding="UTF-8"?>
<soap:Envelope xmlns:soapenc="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:cwmp="urn:dslforum-org:cwmp-1-0" xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:schemaLocation="urn:dslforum-org:cwmp-1-0 ..\schemas\wt121.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<soap:Header/>
<soap:Body soap:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<cwmp:SetParameterValues>
<ParameterList soapenc:arrayType="cwmp:ParameterValueStruct[1]">
<ParameterValueStruct>
<Name>` + leaf + `</Name>
<Value>` + value + `</Value>
</ParameterValueStruct>
</ParameterList>
<ParameterKey>LC1309` + randToken() + `</ParameterKey>
</cwmp:SetParameterValues>
</soap:Body>
</soap:Envelope>`
}
func randToken() string {
b := make([]byte, 8)
rand.Read(b)
return fmt.Sprintf("%x", b)
}
func SetParameterMultiValues(data map[string]string) string {
msg := `<?xml version="1.0" encoding="UTF-8"?>
<soap:Envelope xmlns:soapenc="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:cwmp="urn:dslforum-org:cwmp-1-0" xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:schemaLocation="urn:dslforum-org:cwmp-1-0 ..\schemas\wt121.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<soap:Header/>
<soap:Body soap:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<cwmp:SetParameterValues>
<ParameterList soapenc:arrayType="cwmp:ParameterValueStruct[` + string(len(data)) + `]">`
for key, value := range data {
msg += `<ParameterValueStruct>
<Name>` + key + `</Name>
<Value>` + value + `</Value>
</ParameterValueStruct>`
}
msg += `</ParameterList>
<ParameterKey>LC1309` + randToken() + `</ParameterKey>
</cwmp:SetParameterValues>
</soap:Body>
</soap:Envelope>`
return msg
}
func GetParameterNames(leaf string, nextlevel int) string {
return `<?xml version="1.0" encoding="UTF-8"?>
<soap:Envelope xmlns:soapenc="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:cwmp="urn:dslforum-org:cwmp-1-0" xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:schemaLocation="urn:dslforum-org:cwmp-1-0 ..\schemas\wt121.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<soap:Header/>
<soap:Body soap:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<cwmp:GetParameterNames>
<ParameterPath>` + leaf + `</ParameterPath>
<NextLevel>` + strconv.Itoa(nextlevel) + `</NextLevel>
</cwmp:GetParameterNames>
</soap:Body>
</soap:Envelope>`
}
func FactoryReset() string {
return `<?xml version="1.0" encoding="UTF-8"?>
<soap:Envelope xmlns:soapenc="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:cwmp="urn:dslforum-org:cwmp-1-0" xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:schemaLocation="urn:dslforum-org:cwmp-1-0 ..\schemas\wt121.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<soap:Header/>
<soap:Body soap:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<cwmp:FactoryReset/>
</soap:Body>
</soap:Envelope>`
}
func Download(filetype, url, username, password, filesize string) string {
// 3 Vendor Configuration File
// 1 Firmware Upgrade Image
return `<?xml version="1.0" encoding="UTF-8"?>
<soap:Envelope xmlns:soapenc="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:cwmp="urn:dslforum-org:cwmp-1-0" xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:schemaLocation="urn:dslforum-org:cwmp-1-0 ..\schemas\wt121.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<soap:Header/>
<soap:Body soap:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<cwmp:Download>
<CommandKey>MSDWK</CommandKey>
<FileType>` + filetype + `</FileType>
<URL>` + url + `</URL>
<Username>` + username + `</Username>
<Password>` + password + `</Password>
<FileSize>` + filesize + `</FileSize>
<TargetFileName></TargetFileName>
<DelaySeconds>0</DelaySeconds>
<SuccessURL></SuccessURL>
<FailureURL></FailureURL>
</cwmp:Download>
</soap:Body>
</soap:Envelope>`
}
func CancelTransfer() string {
return `<?xml version="1.0" encoding="UTF-8"?>
<soap:Envelope xmlns:soapenc="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:cwmp="urn:dslforum-org:cwmp-1-0" xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:schemaLocation="urn:dslforum-org:cwmp-1-0 ..\schemas\wt121.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<soap:Header/>
<soap:Body soap:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<cwmp:CancelTransfer>
<CommandKey></CommandKey>
<cwmp:CancelTransfer/>
</soap:Body>
</soap:Envelope>`
}
type TimeWindowStruct struct {
WindowStart string
WindowEnd string
WindowMode string
UserMessage string
MaxRetries string
}
func (window *TimeWindowStruct) String() string {
return `<TimeWindowStruct>
<WindowStart>` + window.WindowStart + `</WindowStart>
<WindowEnd>` + window.WindowEnd + `</WindowEnd>
<WindowMode>` + window.WindowMode + `</WindowMode>
<UserMessage>` + window.UserMessage + `</UserMessage>
<MaxRetries>` + window.MaxRetries + `</MaxRetries>
</TimeWindowStruct>`
}
func ScheduleDownload(filetype, url, username, password, filesize string, windowslist []fmt.Stringer) string {
ret := `<?xml version="1.0" encoding="UTF-8"?>
<soap:Envelope xmlns:soapenc="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:cwmp="urn:dslforum-org:cwmp-1-0" xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:schemaLocation="urn:dslforum-org:cwmp-1-0 ..\schemas\wt121.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<soap:Header/>
<soap:Body soap:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<cwmp:ScheduleDownload>
<CommandKey>MSDWK</CommandKey>
<FileType>` + filetype + `</FileType>
<URL>` + url + `</URL>
<Username>` + username + `</Username>
<Password>` + password + `</Password>
<FileSize>` + filesize + `</FileSize>
<TargetFileName></TargetFileName>
<TimeWindowList>`
for _, op := range windowslist {
ret += op.String()
}
ret += `</TimeWindowList>
</cwmp:ScheduleDownload>
</soap:Body>
</soap:Envelope>`
return ret
}
type InstallOpStruct struct {
Url string
Uuid string
Username string
Password string
ExecutionEnvironment string
}
func (op *InstallOpStruct) String() string {
return `<InstallOpStruct>
<URL>` + op.Url + `</URL>
<UUID>` + op.Uuid + `</UUID>
<Username>` + op.Username + `</Username>
<Password>` + op.Password + `</Password>
<ExecutionEnvRef>` + op.ExecutionEnvironment + `</ExecutionEnvRef>
</InstallOpStruct>`
}
type UpdateOpStruct struct {
Uuid string
Version string
Url string
Username string
Password string
}
func (op *UpdateOpStruct) String() string {
return `<UpdateOpStruct>
<UUID>` + op.Uuid + `</UUID>
<Version>` + op.Version + `</Version>
<URL>` + op.Url + `</URL>
<Username>` + op.Username + `</Username>
<Password>` + op.Password + `</Password>
</UpdateOpStruct>`
}
type UninstallOpStruct struct {
Uuid string
Version string
ExecutionEnvironment string
}
func (op *UninstallOpStruct) String() string {
return `<UninstallOpStruct>
<UUID>` + op.Uuid + `</UUID>
<Version>` + op.Version + `</Version>
<ExecutionEnvRef>` + op.ExecutionEnvironment + `</ExecutionEnvRef>
</UninstallOpStruct>`
}
func ChangeDuState(ops []fmt.Stringer) string {
ret := `<?xml version="1.0" encoding="UTF-8"?>
<soap:Envelope xmlns:soapenc="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:cwmp="urn:dslforum-org:cwmp-1-0" xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:schemaLocation="urn:dslforum-org:cwmp-1-0 ..\schemas\wt121.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<soap:Header/>
<soap:Body soap:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<cmwp:ChangeDUState>
<Operations>`
for _, op := range ops {
ret += op.String()
}
ret += `</Operations>
<CommandKey></CommandKey>
</cmwp:ChangeDUState>
</soap:Body>
</soap:Envelope>`
return ret
}
// CPE side
func Inform(serial string) string {
return `<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:soap-enc="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:cwmp="urn:dslforum-org:cwmp-1-0"><soap:Header><cwmp:ID soap:mustUnderstand="1">5058</cwmp:ID></soap:Header>
<soap:Body><cwmp:Inform><DeviceId><Manufacturer>ADB Broadband</Manufacturer>
<OUI>0013C8</OUI>
<ProductClass>VV5522</ProductClass>
<SerialNumber>` + serial + `</SerialNumber>
</DeviceId>
<Event soap-enc:arrayType="cwmp:EventStruct[1]">
<EventStruct><EventCode>6 CONNECTION REQUEST</EventCode>
<CommandKey></CommandKey>
</EventStruct>
</Event>
<MaxEnvelopes>1</MaxEnvelopes>
<CurrentTime>` + time.Now().Format(time.RFC3339) + `</CurrentTime>
<RetryCount>0</RetryCount>
<ParameterList soap-enc:arrayType="cwmp:ParameterValueStruct[8]">
<ParameterValueStruct><Name>InternetGatewayDevice.ManagementServer.ConnectionRequestURL</Name>
<Value xsi:type="xsd:string">http://104.199.175.27:7547/ConnectionRequest-` + serial + `</Value>
</ParameterValueStruct>
<ParameterValueStruct><Name>InternetGatewayDevice.ManagementServer.ParameterKey</Name>
<Value xsi:type="xsd:string"></Value>
</ParameterValueStruct>
<ParameterValueStruct><Name>InternetGatewayDevice.DeviceSummary</Name>
<Value xsi:type="xsd:string">InternetGatewayDevice:1.2[](Baseline:1,EthernetLAN:1,WiFiLAN:1,ADSLWAN:1,EthernetWAN:1,QoS:1,QoSDynamicFlow:1,Bridging:1,Time:1,IPPing:1,TraceRoute:1,DeviceAssociation:1,UDPConnReq:1),VoiceService:1.0[1](TAEndpoint:1,SIPEndpoint:1)</Value>
</ParameterValueStruct>
<ParameterValueStruct><Name>InternetGatewayDevice.DeviceInfo.HardwareVersion</Name>
<Value xsi:type="xsd:string">` + serial + `</Value>
</ParameterValueStruct>
<ParameterValueStruct><Name>InternetGatewayDevice.DeviceInfo.ProvisioningCode</Name>
<Value xsi:type="xsd:string">ABCD</Value>
</ParameterValueStruct>
<ParameterValueStruct><Name>InternetGatewayDevice.DeviceInfo.SoftwareVersion</Name>
<Value xsi:type="xsd:string">4.0.8.17785</Value>
</ParameterValueStruct>
<ParameterValueStruct><Name>InternetGatewayDevice.DeviceInfo.SpecVersion</Name>
<Value xsi:type="xsd:string">1.0</Value>
</ParameterValueStruct>
<ParameterValueStruct><Name>InternetGatewayDevice.WANDevice.1.WANConnectionDevice.1.WANIPConnection.1.ExternalIPAddress</Name>
<Value xsi:type="xsd:string">12.0.0.10</Value>
</ParameterValueStruct>
</ParameterList>
</cwmp:Inform>
</soap:Body></soap:Envelope>`
}
/*
func BuildGetParameterValuesResponse(serial string, leaves GetParameterValues_) string {
ret := `<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:soap-enc="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:cwmp="urn:dslforum-org:cwmp-1-0">
<soap:Header><cwmp:ID soap:mustUnderstand="1">3</cwmp:ID></soap:Header>
<soap:Body><cwmp:GetParameterValuesResponse>`
db, _ := sqlite3.Open("/tmp/cpe.db")
n_leaves := 0
var temp string
for _, leaf := range leaves.ParameterNames {
sql := "select key, value, tipo from params where key like '" + leaf + "%'"
for s, err := db.Query(sql); err == nil; err = s.Next() {
n_leaves++
var key string
var value string
var tipo string
s.Scan(&key, &value, &tipo)
temp += `<ParameterValueStruct>
<Name>` + key + `</Name>
<Value xsi:type="` + tipo + `">` + value + `</Value>
</ParameterValueStruct>`
}
}
ret += `<ParameterList soap-enc:arrayType="cwmp:ParameterValueStruct[` + strconv.Itoa(n_leaves) + `]">`
ret += temp
ret += `</ParameterList></cwmp:GetParameterValuesResponse></soap:Body></soap:Envelope>`
return ret
}
func BuildGetParameterNamesResponse(serial string, leaves GetParameterNames_) string {
ret := `<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:soap-enc="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:cwmp="urn:dslforum-org:cwmp-1-0">
<soap:Header><cwmp:ID soap:mustUnderstand="1">69</cwmp:ID></soap:Header>
<soap:Body><cwmp:GetParameterNamesResponse>`
db, _ := sqlite3.Open("/tmp/cpe.db")
obj := make(map[string]bool)
var temp string
for _, leaf := range leaves.ParameterPath {
fmt.Println(leaf)
sql := "select key, value, tipo from params where key like '" + leaf + "%'"
for s, err := db.Query(sql); err == nil; err = s.Next() {
var key string
var value string
var tipo string
s.Scan(&key, &value, &tipo)
var sp = strings.Split(strings.Split(key, leaf)[1], ".")
nextlevel, _ := strconv.Atoi(leaves.NextLevel)
if nextlevel == 0 {
root := leaf
obj[root] = true
for idx := range sp {
if idx == len(sp)-1 {
root = root + sp[idx]
} else {
root = root + sp[idx] + "."
}
obj[root] = true
}
} else {
if !obj[sp[0]] {
if len(sp) > 1 {
obj[leaf+sp[0]+"."] = true
} else {
obj[leaf+sp[0]] = true
}
}
}
}
}
for o := range obj {
temp += `<ParameterInfoStruct>
<Name>` + o + `</Name>
<Writable>true</Writable>
</ParameterInfoStruct>`
}
fmt.Println(len(obj))
ret += `<ParameterList soap-enc:arrayType="cwmp:ParameterInfoStruct[]">`
ret += temp
ret += `</ParameterList></cwmp:GetParameterNamesResponse></soap:Body></soap:Envelope>`
return ret
}
*/

View File

@ -3,7 +3,7 @@ package entity
import "time" import "time"
type DataType interface { type DataType interface {
[]map[string]interface{} | *string | Device | int64 | []Device | []VendorsCount | []ProductClassCount | []StatusCount | time.Duration []map[string]interface{} | *string | Device | int64 | []Device | []VendorsCount | []ProductClassCount | []StatusCount | time.Duration | []byte
} }
type MsgAnswer[T DataType] struct { type MsgAnswer[T DataType] struct {

View File

@ -19,8 +19,10 @@ const (
NATS_WS_ADAPTER_SUBJECT_PREFIX = "ws-adapter.usp.v1." NATS_WS_ADAPTER_SUBJECT_PREFIX = "ws-adapter.usp.v1."
NATS_STOMP_ADAPTER_SUBJECT_PREFIX = "stomp-adapter.usp.v1." NATS_STOMP_ADAPTER_SUBJECT_PREFIX = "stomp-adapter.usp.v1."
DEVICE_SUBJECT_PREFIX = "device.usp.v1." DEVICE_SUBJECT_PREFIX = "device.usp.v1."
DEVICE_CWMP_SUBJECT_PREFIX = "device.cwmp.v1."
BUCKET_NAME = "devices-auth" BUCKET_NAME = "devices-auth"
BUCKET_DESCRIPTION = "Devices authentication" BUCKET_DESCRIPTION = "Devices authentication"
NATS_CWMP_ADAPTER_SUBJECT_PREFIX = "cwmp-adapter.v1."
) )
func StartNatsClient(c config.Nats) (jetstream.JetStream, *nats.Conn, jetstream.KeyValue) { func StartNatsClient(c config.Nats) (jetstream.JetStream, *nats.Conn, jetstream.KeyValue) {

View File

@ -1,2 +0,0 @@
MONGO_URI="mongodb://localhost:27017"
CONTROLLER_PASSWORD="test123"

View File

@ -0,0 +1 @@
*.local

View File

@ -1,2 +1,2 @@
- Abstracts all other mtps existence - Abstracts all other mtps and cwmp/usp existence
- Saves devices info and status - Saves devices info and status

View File

@ -9,7 +9,8 @@ import (
"github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/config" "github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/config"
"github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/db" "github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/db"
"github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/events" "github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/events"
"github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/events/handler" "github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/events/cwmp_handler"
"github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/events/usp_handler"
"github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/nats" "github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/nats"
"github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/reqs" "github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/reqs"
) )
@ -24,9 +25,10 @@ func main() {
db := db.NewDatabase(c.Mongo.Ctx, c.Mongo.Uri) db := db.NewDatabase(c.Mongo.Ctx, c.Mongo.Uri)
handler := handler.NewHandler(nc, js, db, c.Controller.ControllerId) usp_handler := usp_handler.NewHandler(nc, js, db, c.Controller.ControllerId)
cwmp_handler := cwmp_handler.NewHandler(nc, js, db, c.Controller.ControllerId)
events.StartEventsListener(c.Nats.Ctx, js, handler) events.StartEventsListener(c.Nats.Ctx, js, usp_handler, cwmp_handler)
reqs.StartRequestsListener(c.Nats.Ctx, nc, db) reqs.StartRequestsListener(c.Nats.Ctx, nc, db)

View File

@ -0,0 +1,500 @@
package cwmp
import (
"crypto/rand"
"encoding/xml"
"fmt"
"strconv"
"strings"
)
type MsgType string
const (
INFORM = "Inform"
)
type SoapEnvelope struct {
XMLName xml.Name
Header SoapHeader
Body SoapBody
}
type SoapHeader struct {
Id string `xml:"ID"`
}
type SoapBody struct {
CWMPMessage CWMPMessage `xml:",any"`
}
type CWMPMessage struct {
XMLName xml.Name
}
type EventStruct struct {
EventCode string
CommandKey string
}
type ParameterValueStruct struct {
Name string
Value string
}
type ParameterInfoStruct struct {
Name string
Writable string
}
type SetParameterValues_ struct {
ParameterList []ParameterValueStruct `xml:"Body>SetParameterValues>ParameterList>ParameterValueStruct"`
ParameterKey string `xml:"Body>SetParameterValues>ParameterKey>string"`
}
type GetParameterValues_ struct {
ParameterNames []string `xml:"Body>GetParameterValues>ParameterNames>string"`
}
type GetParameterNames_ struct {
ParameterPath []string `xml:"Body>GetParameterNames>ParameterPath"`
NextLevel string `xml:"Body>GetParameterNames>NextLevel"`
}
type GetParameterValuesResponse struct {
ParameterList []ParameterValueStruct `xml:"Body>GetParameterValuesResponse>ParameterList>ParameterValueStruct"`
}
type GetParameterNamesResponse struct {
ParameterList []ParameterInfoStruct `xml:"Body>GetParameterNamesResponse>ParameterList>ParameterInfoStruct"`
}
type CWMPInform struct {
DeviceId DeviceID `xml:"Body>Inform>DeviceId"`
Events []EventStruct `xml:"Body>Inform>Event>EventStruct"`
ParameterList []ParameterValueStruct `xml:"Body>Inform>ParameterList>ParameterValueStruct"`
}
func (s *SoapEnvelope) KindOf() string {
return s.Body.CWMPMessage.XMLName.Local
}
func (i *CWMPInform) GetEvents() string {
res := ""
for idx := range i.Events {
res += i.Events[idx].EventCode
}
return res
}
func (i *CWMPInform) GetConnectionRequest() string {
for idx := range i.ParameterList {
// valid condition for both tr98 and tr181
if strings.HasSuffix(i.ParameterList[idx].Name, "Device.ManagementServer.ConnectionRequestURL") {
return i.ParameterList[idx].Value
}
}
return ""
}
func (i *CWMPInform) GetSoftwareVersion() string {
for idx := range i.ParameterList {
if strings.HasSuffix(i.ParameterList[idx].Name, "Device.DeviceInfo.SoftwareVersion") {
return i.ParameterList[idx].Value
}
}
return ""
}
func (i *CWMPInform) GetHardwareVersion() string {
for idx := range i.ParameterList {
if strings.HasSuffix(i.ParameterList[idx].Name, "Device.DeviceInfo.HardwareVersion") {
return i.ParameterList[idx].Value
}
}
return ""
}
func (i *CWMPInform) GetDataModelType() string {
if strings.HasPrefix(i.ParameterList[0].Name, "InternetGatewayDevice") {
return "TR098"
} else if strings.HasPrefix(i.ParameterList[0].Name, "Device") {
return "TR181"
}
return ""
}
type DeviceID struct {
Manufacturer string
OUI string
SerialNumber string
ProductClass string
}
func InformResponse(mustUnderstand string) string {
mustUnderstandHeader := ""
if mustUnderstand != "" {
mustUnderstandHeader = `<cwmp:ID soap:mustUnderstand="1">` + mustUnderstand + `</cwmp:ID>`
}
return `<?xml version="1.0" encoding="UTF-8"?>
<soap:Envelope xmlns:soapenc="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:cwmp="urn:dslforum-org:cwmp-1-0" xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:schemaLocation="urn:dslforum-org:cwmp-1-0 ..\schemas\wt121.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<soap:Header>` + mustUnderstandHeader + `</soap:Header>
<soap:Body soap:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<cwmp:InformResponse>
<MaxEnvelopes>1</MaxEnvelopes>
</cwmp:InformResponse>
</soap:Body>
</soap:Envelope>`
}
func GetParameterValues(leaf string) string {
return `<?xml version="1.0" encoding="UTF-8"?>
<soap:Envelope xmlns:soapenc="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:cwmp="urn:dslforum-org:cwmp-1-0" xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:schemaLocation="urn:dslforum-org:cwmp-1-0 ..\schemas\wt121.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<soap:Header/>
<soap:Body soap:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<cwmp:GetParameterValues>
<ParameterNames>
<string>` + leaf + `</string>
</ParameterNames>
</cwmp:GetParameterValues>
</soap:Body>
</soap:Envelope>`
}
func GetParameterMultiValues(leaves []string) string {
msg := `<?xml version="1.0" encoding="UTF-8"?>
<soap:Envelope xmlns:soapenc="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:cwmp="urn:dslforum-org:cwmp-1-0" xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:schemaLocation="urn:dslforum-org:cwmp-1-0 ..\schemas\wt121.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<soap:Header/>
<soap:Body soap:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<cwmp:GetParameterValues>
<ParameterNames>`
for idx := range leaves {
msg += `<string>` + leaves[idx] + `</string>`
}
msg += `</ParameterNames>
</cwmp:GetParameterValues>
</soap:Body>
</soap:Envelope>`
return msg
}
func SetParameterValues(leaf string, value string) string {
return `<?xml version="1.0" encoding="UTF-8"?>
<soap:Envelope xmlns:soapenc="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:cwmp="urn:dslforum-org:cwmp-1-0" xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:schemaLocation="urn:dslforum-org:cwmp-1-0 ..\schemas\wt121.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<soap:Header/>
<soap:Body soap:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<cwmp:SetParameterValues>
<ParameterList soapenc:arrayType="cwmp:ParameterValueStruct[1]">
<ParameterValueStruct>
<Name>` + leaf + `</Name>
<Value>` + value + `</Value>
</ParameterValueStruct>
</ParameterList>
<ParameterKey>LC1309` + randToken() + `</ParameterKey>
</cwmp:SetParameterValues>
</soap:Body>
</soap:Envelope>`
}
func randToken() string {
b := make([]byte, 8)
rand.Read(b)
return fmt.Sprintf("%x", b)
}
func SetParameterMultiValues(data map[string]string) string {
msg := `<?xml version="1.0" encoding="UTF-8"?>
<soap:Envelope xmlns:soapenc="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:cwmp="urn:dslforum-org:cwmp-1-0" xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:schemaLocation="urn:dslforum-org:cwmp-1-0 ..\schemas\wt121.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<soap:Header/>
<soap:Body soap:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<cwmp:SetParameterValues>
<ParameterList soapenc:arrayType="cwmp:ParameterValueStruct[` + string(len(data)) + `]">`
for key, value := range data {
msg += `<ParameterValueStruct>
<Name>` + key + `</Name>
<Value>` + value + `</Value>
</ParameterValueStruct>`
}
msg += `</ParameterList>
<ParameterKey>LC1309` + randToken() + `</ParameterKey>
</cwmp:SetParameterValues>
</soap:Body>
</soap:Envelope>`
return msg
}
func GetParameterNames(leaf string, nextlevel int) string {
return `<?xml version="1.0" encoding="UTF-8"?>
<soap:Envelope xmlns:soapenc="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:cwmp="urn:dslforum-org:cwmp-1-0" xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:schemaLocation="urn:dslforum-org:cwmp-1-0 ..\schemas\wt121.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<soap:Header/>
<soap:Body soap:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<cwmp:GetParameterNames>
<ParameterPath>` + leaf + `</ParameterPath>
<NextLevel>` + strconv.Itoa(nextlevel) + `</NextLevel>
</cwmp:GetParameterNames>
</soap:Body>
</soap:Envelope>`
}
func FactoryReset() string {
return `<?xml version="1.0" encoding="UTF-8"?>
<soap:Envelope xmlns:soapenc="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:cwmp="urn:dslforum-org:cwmp-1-0" xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:schemaLocation="urn:dslforum-org:cwmp-1-0 ..\schemas\wt121.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<soap:Header/>
<soap:Body soap:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<cwmp:FactoryReset/>
</soap:Body>
</soap:Envelope>`
}
func Download(filetype, url, username, password, filesize string) string {
// 3 Vendor Configuration File
// 1 Firmware Upgrade Image
return `<?xml version="1.0" encoding="UTF-8"?>
<soap:Envelope xmlns:soapenc="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:cwmp="urn:dslforum-org:cwmp-1-0" xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:schemaLocation="urn:dslforum-org:cwmp-1-0 ..\schemas\wt121.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<soap:Header/>
<soap:Body soap:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<cwmp:Download>
<CommandKey>MSDWK</CommandKey>
<FileType>` + filetype + `</FileType>
<URL>` + url + `</URL>
<Username>` + username + `</Username>
<Password>` + password + `</Password>
<FileSize>` + filesize + `</FileSize>
<TargetFileName></TargetFileName>
<DelaySeconds>0</DelaySeconds>
<SuccessURL></SuccessURL>
<FailureURL></FailureURL>
</cwmp:Download>
</soap:Body>
</soap:Envelope>`
}
func CancelTransfer() string {
return `<?xml version="1.0" encoding="UTF-8"?>
<soap:Envelope xmlns:soapenc="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:cwmp="urn:dslforum-org:cwmp-1-0" xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:schemaLocation="urn:dslforum-org:cwmp-1-0 ..\schemas\wt121.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<soap:Header/>
<soap:Body soap:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<cwmp:CancelTransfer>
<CommandKey></CommandKey>
<cwmp:CancelTransfer/>
</soap:Body>
</soap:Envelope>`
}
type TimeWindowStruct struct {
WindowStart string
WindowEnd string
WindowMode string
UserMessage string
MaxRetries string
}
func (window *TimeWindowStruct) String() string {
return `<TimeWindowStruct>
<WindowStart>` + window.WindowStart + `</WindowStart>
<WindowEnd>` + window.WindowEnd + `</WindowEnd>
<WindowMode>` + window.WindowMode + `</WindowMode>
<UserMessage>` + window.UserMessage + `</UserMessage>
<MaxRetries>` + window.MaxRetries + `</MaxRetries>
</TimeWindowStruct>`
}
func ScheduleDownload(filetype, url, username, password, filesize string, windowslist []fmt.Stringer) string {
ret := `<?xml version="1.0" encoding="UTF-8"?>
<soap:Envelope xmlns:soapenc="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:cwmp="urn:dslforum-org:cwmp-1-0" xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:schemaLocation="urn:dslforum-org:cwmp-1-0 ..\schemas\wt121.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<soap:Header/>
<soap:Body soap:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<cwmp:ScheduleDownload>
<CommandKey>MSDWK</CommandKey>
<FileType>` + filetype + `</FileType>
<URL>` + url + `</URL>
<Username>` + username + `</Username>
<Password>` + password + `</Password>
<FileSize>` + filesize + `</FileSize>
<TargetFileName></TargetFileName>
<TimeWindowList>`
for _, op := range windowslist {
ret += op.String()
}
ret += `</TimeWindowList>
</cwmp:ScheduleDownload>
</soap:Body>
</soap:Envelope>`
return ret
}
type InstallOpStruct struct {
Url string
Uuid string
Username string
Password string
ExecutionEnvironment string
}
func (op *InstallOpStruct) String() string {
return `<InstallOpStruct>
<URL>` + op.Url + `</URL>
<UUID>` + op.Uuid + `</UUID>
<Username>` + op.Username + `</Username>
<Password>` + op.Password + `</Password>
<ExecutionEnvRef>` + op.ExecutionEnvironment + `</ExecutionEnvRef>
</InstallOpStruct>`
}
type UpdateOpStruct struct {
Uuid string
Version string
Url string
Username string
Password string
}
func (op *UpdateOpStruct) String() string {
return `<UpdateOpStruct>
<UUID>` + op.Uuid + `</UUID>
<Version>` + op.Version + `</Version>
<URL>` + op.Url + `</URL>
<Username>` + op.Username + `</Username>
<Password>` + op.Password + `</Password>
</UpdateOpStruct>`
}
type UninstallOpStruct struct {
Uuid string
Version string
ExecutionEnvironment string
}
func (op *UninstallOpStruct) String() string {
return `<UninstallOpStruct>
<UUID>` + op.Uuid + `</UUID>
<Version>` + op.Version + `</Version>
<ExecutionEnvRef>` + op.ExecutionEnvironment + `</ExecutionEnvRef>
</UninstallOpStruct>`
}
func ChangeDuState(ops []fmt.Stringer) string {
ret := `<?xml version="1.0" encoding="UTF-8"?>
<soap:Envelope xmlns:soapenc="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:cwmp="urn:dslforum-org:cwmp-1-0" xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:schemaLocation="urn:dslforum-org:cwmp-1-0 ..\schemas\wt121.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<soap:Header/>
<soap:Body soap:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<cmwp:ChangeDUState>
<Operations>`
for _, op := range ops {
ret += op.String()
}
ret += `</Operations>
<CommandKey></CommandKey>
</cmwp:ChangeDUState>
</soap:Body>
</soap:Envelope>`
return ret
}
/*
func BuildGetParameterValuesResponse(serial string, leaves GetParameterValues_) string {
ret := `<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:soap-enc="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:cwmp="urn:dslforum-org:cwmp-1-0">
<soap:Header><cwmp:ID soap:mustUnderstand="1">3</cwmp:ID></soap:Header>
<soap:Body><cwmp:GetParameterValuesResponse>`
db, _ := sqlite3.Open("/tmp/cpe.db")
n_leaves := 0
var temp string
for _, leaf := range leaves.ParameterNames {
sql := "select key, value, tipo from params where key like '" + leaf + "%'"
for s, err := db.Query(sql); err == nil; err = s.Next() {
n_leaves++
var key string
var value string
var tipo string
s.Scan(&key, &value, &tipo)
temp += `<ParameterValueStruct>
<Name>` + key + `</Name>
<Value xsi:type="` + tipo + `">` + value + `</Value>
</ParameterValueStruct>`
}
}
ret += `<ParameterList soap-enc:arrayType="cwmp:ParameterValueStruct[` + strconv.Itoa(n_leaves) + `]">`
ret += temp
ret += `</ParameterList></cwmp:GetParameterValuesResponse></soap:Body></soap:Envelope>`
return ret
}
func BuildGetParameterNamesResponse(serial string, leaves GetParameterNames_) string {
ret := `<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:soap-enc="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:cwmp="urn:dslforum-org:cwmp-1-0">
<soap:Header><cwmp:ID soap:mustUnderstand="1">69</cwmp:ID></soap:Header>
<soap:Body><cwmp:GetParameterNamesResponse>`
db, _ := sqlite3.Open("/tmp/cpe.db")
obj := make(map[string]bool)
var temp string
for _, leaf := range leaves.ParameterPath {
fmt.Println(leaf)
sql := "select key, value, tipo from params where key like '" + leaf + "%'"
for s, err := db.Query(sql); err == nil; err = s.Next() {
var key string
var value string
var tipo string
s.Scan(&key, &value, &tipo)
var sp = strings.Split(strings.Split(key, leaf)[1], ".")
nextlevel, _ := strconv.Atoi(leaves.NextLevel)
if nextlevel == 0 {
root := leaf
obj[root] = true
for idx := range sp {
if idx == len(sp)-1 {
root = root + sp[idx]
} else {
root = root + sp[idx] + "."
}
obj[root] = true
}
} else {
if !obj[sp[0]] {
if len(sp) > 1 {
obj[leaf+sp[0]+"."] = true
} else {
obj[leaf+sp[0]] = true
}
}
}
}
}
for o := range obj {
temp += `<ParameterInfoStruct>
<Name>` + o + `</Name>
<Writable>true</Writable>
</ParameterInfoStruct>`
}
fmt.Println(len(obj))
ret += `<ParameterList soap-enc:arrayType="cwmp:ParameterInfoStruct[]">`
ret += temp
ret += `</ParameterList></cwmp:GetParameterNamesResponse></soap:Body></soap:Envelope>`
return ret
}
*/

View File

@ -15,6 +15,7 @@ const (
MQTT MQTT
STOMP STOMP
WEBSOCKETS WEBSOCKETS
CWMP
) )
type Status uint8 type Status uint8
@ -36,6 +37,7 @@ type Device struct {
Mqtt Status Mqtt Status
Stomp Status Stomp Status
Websockets Status Websockets Status
Cwmp Status
} }
func (d *Database) CreateDevice(device Device) error { func (d *Database) CreateDevice(device Device) error {
@ -57,8 +59,11 @@ func (d *Database) CreateDevice(device Device) error {
if deviceExistent.Websockets == Online { if deviceExistent.Websockets == Online {
device.Websockets = Online device.Websockets = Online
} }
if deviceExistent.Cwmp == Online {
device.Cwmp = Online
}
} else { } else {
if err != nil && err != mongo.ErrNoDocuments { if err != mongo.ErrNoDocuments {
log.Println(err) log.Println(err)
return err return err
} }
@ -70,7 +75,7 @@ func (d *Database) CreateDevice(device Device) error {
// transaction. // transaction.
opts := options.FindOneAndReplace().SetUpsert(true) opts := options.FindOneAndReplace().SetUpsert(true)
err = d.devices.FindOneAndReplace(d.ctx, bson.D{{"sn", device.SN}}, device, opts).Decode(&result) err := d.devices.FindOneAndReplace(d.ctx, bson.D{{"sn", device.SN}}, device, opts).Decode(&result)
if err != nil { if err != nil {
if err == mongo.ErrNoDocuments { if err == mongo.ErrNoDocuments {
log.Printf("New device %s added to database", device.SN) log.Printf("New device %s added to database", device.SN)
@ -161,6 +166,8 @@ func (m MTP) String() string {
return "stomp" return "stomp"
case WEBSOCKETS: case WEBSOCKETS:
return "websockets" return "websockets"
case CWMP:
return "cwmp"
} }
return "unknown" return "unknown"
} }

View File

@ -35,16 +35,18 @@ func (d *Database) UpdateStatus(sn string, status Status, mtp MTP) error {
result.Stomp = status result.Stomp = status
case WEBSOCKETS: case WEBSOCKETS:
result.Websockets = status result.Websockets = status
case CWMP:
result.Cwmp = status
} }
/* /*
check if the global status needs update check if the global status needs update
*/ */
var globalStatus primitive.E var globalStatus primitive.E
if result.Mqtt == Offline && result.Stomp == Offline && result.Websockets == Offline { if result.Mqtt == Offline && result.Stomp == Offline && result.Websockets == Offline && result.Cwmp == Offline {
globalStatus = primitive.E{"status", Offline} globalStatus = primitive.E{"status", Offline}
} }
if result.Mqtt == Online || result.Stomp == Online || result.Websockets == Online { if result.Mqtt == Online || result.Stomp == Online || result.Websockets == Online || result.Cwmp == Online {
globalStatus = primitive.E{"status", Online} globalStatus = primitive.E{"status", Online}
} }

View File

@ -0,0 +1,28 @@
package cwmp_handler
import (
"github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/db"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)
const (
OFFLINE = iota
ONLINE
)
type Handler struct {
nc *nats.Conn
js jetstream.JetStream
db db.Database
cid string
}
func NewHandler(nc *nats.Conn, js jetstream.JetStream, d db.Database, cid string) Handler {
return Handler{
nc: nc,
js: js,
db: d,
cid: cid,
}
}

View File

@ -0,0 +1,40 @@
package cwmp_handler
import (
"encoding/xml"
"log"
"github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/cwmp"
"github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/db"
)
func (h *Handler) HandleDeviceInfo(device string, data []byte, ack func()) {
defer ack()
log.Printf("Device %s info", device)
deviceInfo := parseDeviceInfoMsg(data)
err := h.db.CreateDevice(deviceInfo)
if err != nil {
log.Printf("Failed to create device: %v", err)
}
}
func parseDeviceInfoMsg(data []byte) db.Device {
var inform cwmp.CWMPInform
err := xml.Unmarshal(data, &inform)
if err != nil {
log.Println("Error unmarshalling xml:", err)
}
var device db.Device
device.Vendor = inform.DeviceId.Manufacturer
device.Model = ""
device.Version = inform.GetSoftwareVersion()
device.ProductClass = inform.DeviceId.ProductClass
device.SN = inform.DeviceId.SerialNumber
device.Cwmp = db.Online
device.Status = db.Online
return device
}

View File

@ -0,0 +1,36 @@
package cwmp_handler
import (
"log"
"strconv"
"github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/db"
)
func (h *Handler) HandleDeviceStatus(device, subject string, data []byte, ack func()) {
defer ack()
payload, err := strconv.Atoi(string(data))
if err != nil {
log.Printf("Status subject payload message error %q", err)
}
switch payload {
case OFFLINE:
h.deviceOffline(device)
default:
ignoreMsg(subject, "status", data)
}
}
func (h *Handler) deviceOffline(device string) {
log.Printf("Device %s is offline", device)
err := h.db.UpdateStatus(device, db.Offline, db.CWMP)
if err != nil {
log.Fatal(err)
}
}
func ignoreMsg(subject, ctx string, data []byte) {
log.Printf("Unknown message of %s received, subject: %s, payload: %s. Ignored...", ctx, subject, string(data))
}

View File

@ -5,16 +5,17 @@ import (
"log" "log"
"strings" "strings"
"github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/events/handler" "github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/events/cwmp_handler"
"github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/events/usp_handler"
"github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/nats" "github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/nats"
"github.com/nats-io/nats.go/jetstream" "github.com/nats-io/nats.go/jetstream"
) )
func StartEventsListener(ctx context.Context, js jetstream.JetStream, h handler.Handler) { func StartEventsListener(ctx context.Context, js jetstream.JetStream, uspHandler usp_handler.Handler, cwmpHandler cwmp_handler.Handler) {
log.Println("Listening for nats events") log.Println("Listening for nats events")
events := []string{ uspEvents := []string{
nats.MQTT_STREAM_NAME, nats.MQTT_STREAM_NAME,
nats.WS_STREAM_NAME, nats.WS_STREAM_NAME,
nats.STOMP_STREAM_NAME, nats.STOMP_STREAM_NAME,
@ -22,8 +23,8 @@ func StartEventsListener(ctx context.Context, js jetstream.JetStream, h handler.
nats.OPC_STREAM_NAME, nats.OPC_STREAM_NAME,
} }
for _, event := range events { for _, uspEvent := range uspEvents {
go func() { go func(event string) {
consumer, err := js.Consumer(ctx, event, event) consumer, err := js.Consumer(ctx, event, event)
if err != nil { if err != nil {
log.Fatalf("Failed to get consumer: %v", err) log.Fatalf("Failed to get consumer: %v", err)
@ -50,14 +51,57 @@ func StartEventsListener(ctx context.Context, js jetstream.JetStream, h handler.
switch msgType { switch msgType {
case "status": case "status":
h.HandleDeviceStatus(device, msg.Subject(), data, event, func() { msg.Ack() }) uspHandler.HandleDeviceStatus(device, msg.Subject(), data, event, func() { msg.Ack() })
case "info": case "info":
h.HandleDeviceInfo(device, msg.Subject(), data, event, func() { msg.Ack() }) uspHandler.HandleDeviceInfo(device, msg.Subject(), data, event, func() { msg.Ack() })
default: default:
log.Printf("Unknown message type received, subject: %s", msg.Subject()) log.Printf("Unknown message type received, subject: %s", msg.Subject())
msg.Ack() msg.Ack()
} }
} }
}() }(uspEvent)
}
cwmpEvents := []string{
nats.CWMP_STREAM_NAME,
}
for _, cwmpEvent := range cwmpEvents {
go func(event string) {
consumer, err := js.Consumer(ctx, event, event)
if err != nil {
log.Fatalf("Failed to get consumer: %v", err)
}
messages, err := consumer.Messages()
if err != nil {
log.Fatalf("Failed to get consumer messages: %v", err)
}
defer messages.Stop()
for {
msg, err := messages.Next()
if err != nil {
log.Println("Error to get next message:", err)
continue
}
data := msg.Data()
log.Printf("Received message, subject: %s", msg.Subject())
subject := strings.Split(msg.Subject(), ".")
msgType := subject[len(subject)-1]
device := subject[len(subject)-2]
switch msgType {
case "status":
cwmpHandler.HandleDeviceStatus(device, msg.Subject(), data, func() { msg.Ack() })
case "info":
cwmpHandler.HandleDeviceInfo(device, data, func() { msg.Ack() })
default:
log.Printf("Unknown message type received, subject: %s", msg.Subject())
msg.Ack()
}
}
}(cwmpEvent)
} }
} }

View File

@ -1,72 +0,0 @@
package handler
import (
"log"
"strconv"
"github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/db"
"github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/usp"
"github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/usp/usp_msg"
"google.golang.org/protobuf/proto"
)
func (h *Handler) HandleDeviceStatus(device, subject string, data []byte, mtp string, ack func()) {
defer ack()
payload, err := strconv.Atoi(string(data))
if err != nil {
log.Printf("Status subject payload message error %q", err)
}
switch payload {
case ONLINE:
h.deviceOnline(device, mtp)
case OFFLINE:
h.deviceOffline(device, mtp)
default:
ignoreMsg(subject, "status", data)
}
}
func (h *Handler) deviceOnline(device, mtp string) {
log.Printf("Device %s is online", device)
msg := usp.NewGetMsg(usp_msg.Get{
ParamPaths: []string{
"Device.DeviceInfo.Manufacturer",
"Device.DeviceInfo.ModelName",
"Device.DeviceInfo.SoftwareVersion",
"Device.DeviceInfo.SerialNumber",
"Device.DeviceInfo.ProductClass",
},
MaxDepth: 1,
})
payload, _ := proto.Marshal(&msg)
record := usp.NewUspRecord(payload, device, h.cid)
tr369Message, err := proto.Marshal(&record)
if err != nil {
log.Fatalln("Failed to encode tr369 record:", err)
}
err = h.nc.Publish(mtp+"-adapter.usp.v1."+device+".info", tr369Message)
if err != nil {
log.Printf("Failed to publish online device message: %v", err)
}
}
func (h *Handler) deviceOffline(device, mtp string) {
log.Printf("Device %s is offline", device)
mtpLayer := getMtp(mtp)
err := h.db.UpdateStatus(device, db.Offline, mtpLayer)
if err != nil {
log.Fatal(err)
}
}
func ignoreMsg(subject, ctx string, data []byte) {
log.Printf("Unknown message of %s received, subject: %s, payload: %s. Ignored...", ctx, subject, string(data))
}

View File

@ -1,4 +1,4 @@
package handler package usp_handler
import ( import (
"github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/db" "github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/db"

View File

@ -1,4 +1,4 @@
package handler package usp_handler
import ( import (
"log" "log"

View File

@ -0,0 +1,38 @@
package usp_handler
import (
"log"
"strconv"
"github.com/OktopUSP/oktopus/backend/services/mtp/adapter/internal/db"
)
func (h *Handler) HandleDeviceStatus(device, subject string, data []byte, mtp string, ack func()) {
defer ack()
payload, err := strconv.Atoi(string(data))
if err != nil {
log.Printf("Status subject payload message error %q", err)
}
switch payload {
case OFFLINE:
h.deviceOffline(device, mtp)
default:
ignoreMsg(subject, "status", data)
}
}
func (h *Handler) deviceOffline(device, mtp string) {
log.Printf("Device %s is offline", device)
mtpLayer := getMtp(mtp)
err := h.db.UpdateStatus(device, db.Offline, mtpLayer)
if err != nil {
log.Fatal(err)
}
}
func ignoreMsg(subject, ctx string, data []byte) {
log.Printf("Unknown message of %s received, subject: %s, payload: %s. Ignored...", ctx, subject, string(data))
}

View File

@ -17,6 +17,7 @@ const (
STOMP_STREAM_NAME = "stomp" STOMP_STREAM_NAME = "stomp"
LORA_STREAM_NAME = "lora" LORA_STREAM_NAME = "lora"
OPC_STREAM_NAME = "opc" OPC_STREAM_NAME = "opc"
CWMP_STREAM_NAME = "cwmp"
ADAPTER_SUBJECT = "adapter" + USP_SUBJECT ADAPTER_SUBJECT = "adapter" + USP_SUBJECT
USP_SUBJECT = ".usp.v1." USP_SUBJECT = ".usp.v1."
BUCKET_NAME = "devices-auth" BUCKET_NAME = "devices-auth"
@ -118,6 +119,7 @@ func defineStreams() []string {
STOMP_STREAM_NAME, STOMP_STREAM_NAME,
LORA_STREAM_NAME, LORA_STREAM_NAME,
OPC_STREAM_NAME, OPC_STREAM_NAME,
CWMP_STREAM_NAME,
} }
} }
@ -128,6 +130,7 @@ func defineConsumers() []string {
STOMP_STREAM_NAME, STOMP_STREAM_NAME,
LORA_STREAM_NAME, LORA_STREAM_NAME,
OPC_STREAM_NAME, OPC_STREAM_NAME,
CWMP_STREAM_NAME,
} }
} }

View File

@ -24,6 +24,7 @@ build: build-frontend build-backend
build-backend: build-backend:
@make build -C ../backend/services/controller/build/ DOCKER_USER=${DOCKER_USER} @make build -C ../backend/services/controller/build/ DOCKER_USER=${DOCKER_USER}
@make build -C ../backend/services/acs/build/ DOCKER_USER=${DOCKER_USER}
@make build -C ../backend/services/utils/socketio/build/ DOCKER_USER=${DOCKER_USER} @make build -C ../backend/services/utils/socketio/build/ DOCKER_USER=${DOCKER_USER}
@make build -C ../backend/services/mtp/adapter/build/ DOCKER_USER=${DOCKER_USER} @make build -C ../backend/services/mtp/adapter/build/ DOCKER_USER=${DOCKER_USER}
@make build -C ../backend/services/mtp/ws-adapter/build/ DOCKER_USER=${DOCKER_USER} @make build -C ../backend/services/mtp/ws-adapter/build/ DOCKER_USER=${DOCKER_USER}

View File

@ -1,2 +1,3 @@
REDIS_ENABLE=false REDIS_ENABLE=false
REDIS_ADDR=redis_usp:6379 REDIS_ADDR=redis_usp:6379
NATS_URL=nats://msg_broker:4222

1
deploy/compose/.env.ws Normal file
View File

@ -0,0 +1 @@
NATS_URL=nats://msg_broker:4222

View File

@ -2,3 +2,5 @@ portainer_data/*
!portainer_data/.gitkeep !portainer_data/.gitkeep
mongo_data/* mongo_data/*
!mongo_data/.gitkeep !mongo_data/.gitkeep
nats_data/*
!nats_data/.gitkeep

View File

@ -8,6 +8,8 @@ services:
- 4222:4222 - 4222:4222
- 8222:8222 - 8222:8222
command: -n oktopus -m 8222 -js command: -n oktopus -m 8222 -js
volumes:
- ./nats_data:/tmp/nats/jetstream
networks: networks:
usp_network: usp_network:
ipv4_address: 172.16.235.2 ipv4_address: 172.16.235.2
@ -64,6 +66,8 @@ services:
container_name: websockets container_name: websockets
ports: ports:
- 8080:8080 - 8080:8080
env_file:
- .env.ws
networks: networks:
usp_network: usp_network:
ipv4_address: 172.16.235.7 ipv4_address: 172.16.235.7

View File

View File

@ -0,0 +1,91 @@
# Oktopus Kubernetes
## Requirements
Kubernetes 1.28+
### Standalone Installation
Single Node:
* 8 vCPUs
* 8 GB RAM
# Installation
## Download Files
```shell
git clone https://github.com/OktopUSP/oktopus
export DEPLOYMENT_PATH=oktopus/deploy/kubernetes
```
## MongoBD
```shell
# Mongo DB Operator at mongodb namespace
helm repo add mongodb https://mongodb.github.io/helm-charts
helm install community-operator mongodb/community-operator --namespace mongodb --create-namespace
# Mongo DB ReplicaSet
export DEPLOYMENT_PATH=oktopus/deploy/kubernetes
kubectl apply -f $DEPLOYMENT_PATH/mongodb.yaml -n mongodb
# Check Installation
kubectl get pods -n mongodb
```
## NATS Server
```shell
# Download the NATS charts
helm repo add nats https://nats-io.github.io/k8s/helm/charts/
# Install NATS with Jetstream Enabled
helm install nats nats/nats --set config.jetstream.enabled=true
```
## Oktopus
<b>Node Ports</b>
For this deployment, we are not using a load balancer and kubernetes is deployed on-premises so we are using Nodeports to insource the client traffic into cluster. below the ports set on deployment files:
1. MQTT broker service (mqtt-svc): 30000
2. Frontend (frontend-svc): 30001
3. SocketIO: (socketio-svc): 30002
4. Controller (controller-svc): 30003
5. WebSocket (ws-svc): 30005
Before deploying the files, edit the frontend.yaml file to set the correct enviroment variables:
```yaml
env:
- name: NEXT_PUBLIC_REST_ENDPOINT
value: "<FRONTEND_IP>:30003"
- name: NEXT_PUBLIC_WS_ENDPOINT
value: "<FRONTEND_IP>:30005"
```
```shell
kubectl apply -f $DEPLOYMENT_PATH/mqtt.yaml
kubectl apply -f $DEPLOYMENT_PATH/mqtt-adapter.yaml
kubectl apply -f $DEPLOYMENT_PATH/adapter.yaml
kubectl apply -f $DEPLOYMENT_PATH/controller.yaml
kubectl apply -f $DEPLOYMENT_PATH/socketio.yaml
kubectl apply -f $DEPLOYMENT_PATH/frontend.yaml
kubectl apply -f $DEPLOYMENT_PATH/ws.yaml
kubectl apply -f $DEPLOYMENT_PATH/ws-adapter.yaml
```
### Checking cluster status:
```shell
kubectl get pods
kubectl get svc
```

View File

@ -0,0 +1,39 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: adapter
spec:
replicas: 1
selector:
matchLabels:
app: adapter
strategy:
type: Recreate # Specify the Recreate strategy
template:
metadata:
labels:
app: adapter
spec:
containers:
- name: adapter
image: oktopusp/adapter:latest
resources:
requests:
memory: 64Mi
cpu: 0.1
limits:
memory: 128Mi
cpu: 0.2
imagePullPolicy: IfNotPresent
env:
- name: NATS_URL
value: "nats://nats:4222"
- name: NATS_NAME
value: "adapter"
- name: NATS_VERIFY_CERTIFICATES
value: "false"
- name: MONGO_URI
value: "mongodb://oktopusp:oktopusp@mongodb-0.mongodb-svc.mongodb.svc.cluster.local:27017,mongodb-1.mongodb-svc.mongodb.svc.cluster.local:27017,mongodb-2.mongodb-svc.mongodb.svc.cluster.local:27017/adapter?replicaSet=mongodb&ssl=false"
- name: CONTROLLER_ID
value: "oktopusController"

View File

@ -1,21 +0,0 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: backend
labels:
app: backend
spec:
replicas: 1
selector:
matchLabels:
app: backend
template:
metadata:
labels:
app: backend
spec:
containers:
- name: backend
image: backend:latest
ports:
- containerPort: 8080

View File

@ -0,0 +1,53 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: controller
spec:
replicas: 1
selector:
matchLabels:
app: controller
strategy:
type: Recreate
template:
metadata:
labels:
app: controller
spec:
containers:
- name: controller
image: oktopusp/controller:latest
resources:
# requests:
# memory: 64Mi
# cpu: 0.5
# limits:
# memory: 256Mi
# cpu: 1
imagePullPolicy: IfNotPresent
env:
- name: NATS_URL
value: "nats://nats:4222"
- name: NATS_NAME
value: "adapter"
- name: NATS_VERIFY_CERTIFICATES
value: "false"
- name: MONGO_URI
value: "mongodb://oktopusp:oktopusp@mongodb-0.mongodb-svc.mongodb.svc.cluster.local:27017,mongodb-1.mongodb-svc.mongodb.svc.cluster.local:27017,mongodb-2.mongodb-svc.mongodb.svc.cluster.local:27017/?replicaSet=mongodb&ssl=false"
- name: REST_API_PORT
value: "8000"
---
apiVersion: v1
kind: Service
metadata:
name: controller-svc
spec:
selector:
app: controller
ports:
- protocol: TCP
port: 8000
targetPort: 8000
nodePort: 30003
type: NodePort

View File

@ -0,0 +1,48 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: frontend
spec:
replicas: 1
selector:
matchLabels:
app: frontend
strategy:
type: Recreate # Specify the Recreate strategy
template:
metadata:
labels:
app: frontend
spec:
containers:
- name: frontend
image: oktopusp/frontend:latest
resources:
requests:
memory: 64Mi
cpu: 100m
limits:
memory: 256Mi
cpu: 200m
ports:
- containerPort: 3000
imagePullPolicy: IfNotPresent
env:
- name: NEXT_PUBLIC_REST_ENDPOINT
value: "192.168.1.130:30003"
- name: NEXT_PUBLIC_WS_ENDPOINT
value: "192.168.1.130:30005"
---
apiVersion: v1
kind: Service
metadata:
name: frontend-svc
spec:
selector:
app: frontend
ports:
- protocol: TCP
port: 3000
targetPort: 3000
nodePort: 30001
type: NodePort

View File

@ -0,0 +1,7 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: haproxy-kubernetes-ingress
namespace: haproxy-controller
data:
syslog-server: "address:stdout, format: raw, facility:daemon"

View File

@ -0,0 +1,8 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: haproxy-tcp
namespace: default
data:
1883: # Port where the frontend is going to listen to.
default/mqtt-svc:1883 # Kubernetes service in the format NS/ServiceName:ServicePort

View File

@ -0,0 +1,8 @@
controller:
service:
tcpPorts:
- name: mqtt
port: 1883
targetPort: 1883
extraArgs:
- --configmap-tcp-services=default/haproxy-tcp

View File

@ -0,0 +1,38 @@
---
apiVersion: mongodbcommunity.mongodb.com/v1
kind: MongoDBCommunity
metadata:
name: mongodb
spec:
members: 3
type: ReplicaSet
version: "6.0.5"
security:
authentication:
modes: ["SCRAM"]
users:
- name: oktopusp
db: admin
passwordSecretRef: # a reference to the secret that will be used to generate the user's password
name: mongo-secret
roles:
- name: clusterAdmin
db: admin
- name: userAdminAnyDatabase
db: admin
- name: readWriteAnyDatabase
db: admin
scramCredentialsSecretName: my-scram
additionalMongodConfig:
storage.wiredTiger.engineConfig.journalCompressor: zlib
# the user credentials will be generated from this secret
# once the credentials are generated, this secret is no longer required
---
apiVersion: v1
kind: Secret
metadata:
name: mongo-secret
type: Opaque
stringData:
password: oktopusp

View File

@ -0,0 +1,44 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: mqtt-adapter
spec:
replicas: 1
selector:
matchLabels:
app: mqtt-adapter
template:
metadata:
labels:
app: mqtt-adapter
spec:
containers:
- name: mqtt-adapter
image: oktopusp/mqtt-adapter:latest
resources:
requests:
memory: 64Mi
cpu: 0.1
limits:
memory: 128Mi
cpu: 0.2
imagePullPolicy: IfNotPresent
env:
- name: NATS_URL
value: "nats:4222"
- name: NATS_NAME
value: "mqtt-adapter"
- name: NATS_VERIFY_CERTIFICATES
value: "false"
- name: MQTT_URL
value: "tcp://mqtt:1883"
- name: MQTT_CLIENT_ID
value: "mqtt-adapter"
- name: MQTT_USERNAME
value: ""
- name: MQTT_PASSWORD
value: ""
- name: MQTT_QOS
value: "1"
- name: MQTT_SERVICE_HOST
value: "mqtt"

View File

@ -0,0 +1,48 @@
apiVersion: v1
kind: Service
metadata:
name: mqtt-svc
spec:
selector:
app: mqtt
ports:
- protocol: TCP
port: 1883
targetPort: 1883
nodePort: 30000
type: NodePort
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: mqtt
spec:
replicas: 1
selector:
matchLabels:
app: mqtt
template:
metadata:
labels:
app: mqtt
spec:
containers:
- name: mqtt
image: rogersacchelli/mqtt:latest
ports:
- containerPort: 1883
resources:
requests:
memory: 64Mi
cpu: 100m
limits:
memory: 256Mi
cpu: 200m
imagePullPolicy: IfNotPresent
env:
- name: MQTT_PORT
value: ":1883"
- name: MQTT_TLS
value: "false"
- name: LOG_LEVEL
value: "0" # 0 - DEBUG

View File

@ -0,0 +1,35 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: socketio
spec:
replicas: 1
selector:
matchLabels:
app: socketio
template:
metadata:
labels:
app: socketio
spec:
containers:
- name: socketio
image: oktopusp/socketio:latest
imagePullPolicy: IfNotPresent
env:
- name: NATS_URL
value: "nats:4222"
---
apiVersion: v1
kind: Service
metadata:
name: socketio-svc
spec:
selector:
app: socketio
ports:
- protocol: TCP
port: 5000
targetPort: 5000
nodePort: 30002
type: NodePort

View File

@ -0,0 +1,46 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: ws-adapter
spec:
replicas: 1
selector:
matchLabels:
app: ws-adapter
template:
metadata:
labels:
app: ws-adapter
spec:
containers:
- name: ws-adapter
image: oktopusp/ws-adapter:latest
resources:
#requests:
# memory: 64Mi
# cpu: 0.1
#limits:
# memory: 256Mi
# cpu: 0.2
imagePullPolicy: IfNotPresent
env:
- name: NATS_URL
value: "nats://nats:4222"
- name: NATS_NAME
value: "ws-adapter"
- name: NATS_VERIFY_CERTIFICATES
value: "0" # 0 - DEBUG
- name: WS_TOKEN
value: ""
- name: WS_AUTH_ENABLE
value: "false"
- name: WS_ADDR
value: "ws-svc"
- name: WS_PORT
value: ":8080"
- name: WS_ROUTE
value: "/ws/controller"
- name: WS_TLS_ENABLE
value: "false"
- name: WS_SKIP_TLS_VERIFY
value: "false"

52
deploy/kubernetes/ws.yaml Normal file
View File

@ -0,0 +1,52 @@
apiVersion: v1
kind: Service
metadata:
name: ws-svc
spec:
selector:
app: ws
ports:
- protocol: TCP
port: 8080
targetPort: 8080
nodePort: 30005
type: NodePort
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: ws
spec:
replicas: 1
selector:
matchLabels:
app: ws
template:
metadata:
labels:
app: ws
spec:
containers:
- name: ws
image: oktopusp/ws:latest
ports:
- containerPort: 8080
resources:
requests:
memory: 64Mi
cpu: 100m
limits:
memory: 256Mi
cpu: 200m
imagePullPolicy: IfNotPresent
env:
- name: SERVER_PORT
value: ":8080"
- name: SERVER_AUTH_TOKEN
value: ""
- name: SERVER_AUTH_ENABLE
value: "false"
- name: CONTROLLER_EID
value: "oktopusController"
- name: SERVER_TLS_ENABLE
value: "false"

View File

@ -1,17 +1,17 @@
# ----------------------------- Local Environment ---------------------------- # # ----------------------------- Local Environment ---------------------------- #
NEXT_PUBLIC_REST_ENPOINT="http://localhost:8000/api" NEXT_PUBLIC_REST_ENDPOINT="http://localhost:8000/api"
NEXT_PUBLIC_WS_ENPOINT="http://localhost:5000/" NEXT_PUBLIC_WS_ENDPOINT="http://localhost:5000/"
# ---------------------------------------------------------------------------- # # ---------------------------------------------------------------------------- #
# -------------------------- Production Environment -------------------------- # # -------------------------- Production Environment -------------------------- #
#NEXT_PUBLIC_REST_ENPOINT="https://demo.oktopus.app.br/api" #NEXT_PUBLIC_REST_ENDPOINT="https://demo.oktopus.app.br/api"
#NEXT_PUBLIC_WS_ENPOINT="https://demo.oktopus.app.br/" #NEXT_PUBLIC_WS_ENDPOINT="https://demo.oktopus.app.br/"
# ---------------------------------------------------------------------------- # # ---------------------------------------------------------------------------- #
# ---------------------------- Mocked Environment ---------------------------- # # ---------------------------- Mocked Environment ---------------------------- #
#NEXT_PUBLIC_REST_ENPOINT="https://d9962fd9-2464-4a30-9a86-a15a04b57ad0.mock.pstmn.io" #NEXT_PUBLIC_REST_ENDPOINT="https://d9962fd9-2464-4a30-9a86-a15a04b57ad0.mock.pstmn.io"
# ---------------------------------------------------------------------------- # # ---------------------------------------------------------------------------- #

View File

@ -2,11 +2,11 @@ FROM node:16.20.2-alpine as builder
WORKDIR /app WORKDIR /app
COPY ../ . COPY ../ ./
RUN npm install RUN npm install
RUN NEXT_PUBLIC_REST_ENPOINT=REST_API_URL NEXT_PUBLIC_WS_ENPOINT=WS_URL npm run build RUN NEXT_PUBLIC_REST_ENDPOINT=REST_API_URL NEXT_PUBLIC_WS_ENDPOINT=WS_URL npm run build
RUN ls -la && echo "Listing directory contents done" RUN ls -la && echo "Listing directory contents done"

View File

@ -3,14 +3,14 @@ set -Ex
function apply_path { function apply_path {
echo "Check that we have NEXT_PUBLIC_REST_ENPOINT vars" echo "Check that we have NEXT_PUBLIC_REST_ENDPOINT vars"
test -n "$NEXT_PUBLIC_REST_ENPOINT" test -n "$NEXT_PUBLIC_REST_ENDPOINT"
echo "Check that we have NEXT_PUBLIC_WS_ENPOINT vars" echo "Check that we have NEXT_PUBLIC_WS_ENDPOINT vars"
test -n "$NEXT_PUBLIC_WS_ENPOINT" test -n "$NEXT_PUBLIC_WS_ENDPOINT"
find /app/.next \( -type d -name .git -prune \) -o -type f -print0 | xargs -0 sed -i "s#REST_API_URL#$NEXT_PUBLIC_REST_ENPOINT#g" find /app/.next \( -type d -name .git -prune \) -o -type f -print0 | xargs -0 sed -i "s#REST_API_URL#$NEXT_PUBLIC_REST_ENDPOINT#g"
find /app/.next \( -type d -name .git -prune \) -o -type f -print0 | xargs -0 sed -i "s#WS_URL#$NEXT_PUBLIC_WS_ENPOINT#g" find /app/.next \( -type d -name .git -prune \) -o -type f -print0 | xargs -0 sed -i "s#WS_URL#$NEXT_PUBLIC_WS_ENDPOINT#g"
} }
apply_path apply_path