fix(cwmp): device status and message handler

This commit is contained in:
leandrofars 2024-05-21 19:27:55 -03:00
parent 59d99f9ff2
commit 6829755345
8 changed files with 103 additions and 51 deletions

View File

@ -7,7 +7,6 @@ require (
github.com/joho/godotenv v1.5.1
github.com/nats-io/nats.go v1.34.1
github.com/oleiade/lane v1.0.1
golang.org/x/net v0.24.0
)
require (

View File

@ -14,8 +14,6 @@ github.com/oleiade/lane v1.0.1 h1:hXofkn7GEOubzTwNpeL9MaNy8WxolCYb9cInAIeqShU=
github.com/oleiade/lane v1.0.1/go.mod h1:IyTkraa4maLfjq/GmHR+Dxb4kCMtEGeb+qmhlrQ5Mk4=
golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30=
golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M=
golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w=
golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8=
golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o=
golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=

View File

@ -5,9 +5,12 @@ import (
"crypto/rand"
"encoding/base64"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"strings"
"time"
)
type myjar struct {
@ -22,51 +25,81 @@ func (p *myjar) Cookies(u *url.URL) []*http.Cookie {
return p.jar[u.Host]
}
func Auth(username string, password string, uri string) (bool, error) {
client := &http.Client{}
func Auth(username string, password string, url string) (bool, error) {
client := &http.Client{Timeout: time.Second * 1}
jar := &myjar{}
jar.jar = make(map[string][]*http.Cookie)
client.Jar = jar
var req *http.Request
var resp *http.Response
var err error
req, err = http.NewRequest("GET", uri, nil)
req, err = http.NewRequest("GET", url, nil)
if err != nil {
return false, err
}
req.Header.Set("Connection", "keep-alive")
req.Header.Del("Accept-Encoding")
resp, err = client.Do(req)
if err != nil {
return false, err
}
// if resp.StatusCode == 401 {
// var authorization map[string]string = DigestAuthParams(resp)
// realmHeader := authorization["realm"]
// qopHeader := authorization["qop"]
// nonceHeader := authorization["nonce"]
// opaqueHeader := authorization["opaque"]
// realm := realmHeader
// // A1
// h := md5.New()
// A1 := fmt.Sprintf("%s:%s:%s", username, realm, password)
// io.WriteString(h, A1)
// HA1 := fmt.Sprintf("%x", h.Sum(nil))
// // A2
// h = md5.New()
// A2 := fmt.Sprintf("GET:%s", "/auth")
// io.WriteString(h, A2)
// HA2 := fmt.Sprintf("%x", h.Sum(nil))
defer resp.Body.Close()
io.Copy(ioutil.Discard, resp.Body)
// // response
// cnonce := RandomKey()
// response := H(strings.Join([]string{HA1, nonceHeader, "00000001", cnonce, qopHeader, HA2}, ":"))
if resp.StatusCode == 401 {
var authorization map[string]string = DigestAuthParams(resp)
realmHeader := authorization["realm"]
qopHeader := authorization["qop"]
nonceHeader := authorization["nonce"]
opaqueHeader := authorization["opaque"]
realm := realmHeader
// // now make header
// AuthHeader := fmt.Sprintf(`Digest username="%s", realm="%s", nonce="%s", uri="%s", cnonce="%s", nc=00000001, qop=%s, response="%s", opaque="%s", algorithm=MD5`,
// username, realmHeader, nonceHeader, "/auth", cnonce, qopHeader, response, opaqueHeader)
// req.Header.Set("Authorization", AuthHeader)
// resp, err = client.Do(req)
// } else {
// return false, fmt.Errorf("response status code should have been 401, it was %v", resp.StatusCode)
// }
return resp.StatusCode == 200, err
uri := "/"
if surl := strings.Split(url, "/"); len(surl) == 4 {
uri = "/" + surl[3]
}
// A1
h := md5.New()
A1 := fmt.Sprintf("%s:%s:%s", username, realm, password)
io.WriteString(h, A1)
HA1 := fmt.Sprintf("%x", h.Sum(nil))
// A2
h = md5.New()
A2 := fmt.Sprintf("GET:%s", uri)
io.WriteString(h, A2)
HA2 := fmt.Sprintf("%x", h.Sum(nil))
// response
cnonce := RandomKey()
response := H(strings.Join([]string{HA1, nonceHeader, "00000001", cnonce, qopHeader, HA2}, ":"))
// now make header
AuthHeader := fmt.Sprintf(`Digest username="%s",realm="%s",nonce="%s",uri="%s",qop=%s,nc=00000001,cnonce="%s",response="%s",opaque="%s",algorithm=MD5`,
username, realmHeader, nonceHeader, uri, qopHeader, cnonce, response, opaqueHeader)
req.Header.Set("Authorization", AuthHeader)
resp, err = client.Do(req)
if err != nil {
return false, err
}
io.Copy(ioutil.Discard, resp.Body)
if resp.StatusCode == 401 {
return false, fmt.Errorf("Authentication error!")
}
return true, err
} else if resp.StatusCode == 200 {
return true, err
}
return false, fmt.Errorf("Response status code not expected")
}
/*
@ -90,6 +123,7 @@ func DigestAuthParams(r *http.Response) map[string]string {
}
return result
}
func RandomKey() string {
k := make([]byte, 12)
for bytes := 0; bytes < len(k); {

View File

@ -40,8 +40,9 @@ func NewBridge(
}
func (b *Bridge) StartBridge() {
b.sub(handler.NATS_CWMP_ADAPTER_SUBJECT_PREFIX+"*.api", func(msg *nats.Msg) {
log.Printf("Received message: %s", string(msg.Data))
//log.Printf("Received message: %s", string(msg.Data))
log.Printf("Subject: %s", msg.Subject)
log.Printf("Reply: %s", msg.Reply)
@ -53,6 +54,8 @@ func (b *Bridge) StartBridge() {
return
}
if cpe.Queue.Size() > 0 {
log.Println("Queue size: ", cpe.Queue.Size())
log.Println("Queue data: ", cpe.Queue)
log.Printf("Device %s is busy", device)
respondMsg(msg.Respond, http.StatusConflict, "Device is busy")
return
@ -78,6 +81,8 @@ func (b *Bridge) StartBridge() {
//req := cpe.Queue.Dequeue().(handler.Request)
//cpe.Waiting = &req
defer cpe.Queue.Dequeue()
select {
case response := <-deviceAnswer:
log.Println("Received response from device: ", string(response))

View File

@ -30,6 +30,8 @@ type Acs struct {
Password string
Route string
DebugMode bool
ConnReqUsername string
ConnReqPassword string
}
type Config struct {
@ -47,6 +49,8 @@ func NewConfig() *Config {
natsVerifyCertificates := flag.Bool("nats_verify_certificates", lookupEnvOrBool("NATS_VERIFY_CERTIFICATES", false), "verify validity of certificates from nats server")
acsPort := flag.String("acs_port", lookupEnvOrString("ACS_PORT", ":9292"), "port for acs server")
acsRoute := flag.String("acs_route", lookupEnvOrString("ACS_ROUTE", "/acs"), "route for acs server")
connReqUser := flag.String("connrq_user", lookupEnvOrString("CONN_RQ_USER", ""), "Connection Request Username")
connReqPasswd := flag.String("connrq_passwd", lookupEnvOrString("CONN_RQ_PASSWD", ""), "Connection Request Password")
acsKeepAliveInterval := flag.Int("acs_keep_alive_interval", lookupEnvOrInt("KEEP_ALIVE_INTERVAL", 300), "keep alive interval in seconds for acs server")
cwmpDebugMode := flag.Bool("debug_mode", lookupEnvOrBool("CWMP_DEBUG", false), "enable or disable cwmp logs in debug mode")
flHelp := flag.Bool("help", false, "Help")
@ -77,8 +81,10 @@ func NewConfig() *Config {
Acs: Acs{
Port: *acsPort,
Route: *acsRoute,
KeepAliveInterval: time.Duration(*acsKeepAliveInterval),
KeepAliveInterval: time.Duration(*acsKeepAliveInterval) * time.Second,
DebugMode: *cwmpDebugMode,
ConnReqUsername: *connReqUser,
ConnReqPassword: *connReqPasswd,
},
}
}

View File

@ -69,7 +69,7 @@ func (h *Handler) CwmpHandler(w http.ResponseWriter, r *http.Request) {
log.Println("New device: " + sn)
h.Cpes[sn] = CPE{
SerialNumber: sn,
LastConnection: time.Now().UTC(),
LastConnection: time.Now(),
SoftwareVersion: Inform.GetSoftwareVersion(),
HardwareVersion: Inform.GetHardwareVersion(),
ExternalIPAddress: addr,
@ -78,12 +78,11 @@ func (h *Handler) CwmpHandler(w http.ResponseWriter, r *http.Request) {
Queue: lane.NewQueue(),
DataModel: Inform.GetDataModelType(),
}
go h.handleCpeStatus(sn)
h.pub(NATS_CWMP_SUBJECT_PREFIX+sn+".info", tmp)
}
obj := h.Cpes[sn]
cpe := &obj
cpe.LastConnection = time.Now().UTC()
cpe.LastConnection = time.Now()
log.Printf("Received an Inform from device %s withEventCodes %s", addr, Inform.GetEvents())
@ -91,8 +90,8 @@ func (h *Handler) CwmpHandler(w http.ResponseWriter, r *http.Request) {
cookie := http.Cookie{Name: "oktopus", Value: sn, Expires: expiration}
http.SetCookie(w, &cookie)
data, _ := xml.Marshal(cwmp.InformResponse(envelope.Header.Id))
w.Write(data)
//data, _ := xml.Marshal(cwmp.InformResponse(envelope.Header.Id))
fmt.Fprintf(w, cwmp.InformResponse(envelope.Header.Id))
} else if messageType == "TransferComplete" {
} else if messageType == "GetRPC" {
@ -160,9 +159,11 @@ func (h *Handler) ConnectionRequest(cpe CPE) error {
// log.Println("ConnectionRequestUsername: ", cpe.Username)
// log.Println("ConnectionRequestPassword: ", cpe.Password)
ok, err := auth.Auth("", "", cpe.ConnectionRequestURL)
ok, err := auth.Auth(h.acsConfig.ConnReqUsername, h.acsConfig.ConnReqPassword, cpe.ConnectionRequestURL)
if !ok {
cpe.Queue.Dequeue()
log.Println("Error while authenticating to CPE: ", err)
}
log.Println("<-- Successfully authenticated to CPE", cpe.SerialNumber)
return err
}

View File

@ -5,14 +5,21 @@ import (
"time"
)
func (h *Handler) handleCpeStatus(cpe string) {
func (h *Handler) HandleCpeStatus() {
for {
for cpe := range h.Cpes {
if cpe == "" {
continue
}
log.Println("Checking CPE " + cpe + " status")
if time.Since(h.Cpes[cpe].LastConnection) > h.acsConfig.KeepAliveInterval {
log.Printf("LastConnection: %s, KeepAliveInterval: %s", h.Cpes[cpe].LastConnection, h.acsConfig.KeepAliveInterval)
log.Println("CPE", cpe, "is offline")
h.pub("cwmp.v1."+cpe+".status", []byte("0"))
delete(h.Cpes, cpe)
break
}
time.Sleep(h.acsConfig.KeepAliveInterval)
}
log.Println("CPE", cpe, "is offline")
h.pub("cwmp.v1."+cpe+".status", []byte("0"))
time.Sleep(10 * time.Second)
}
}

View File

@ -12,6 +12,8 @@ import (
func Run(c config.Acs, natsActions nats.NatsActions, h *handler.Handler) {
http.HandleFunc(c.Route, h.CwmpHandler)
go h.HandleCpeStatus()
log.Printf("ACS running at %s%s", c.Port, c.Route)
err := http.ListenAndServe(c.Port, nil)