diff --git a/backend/services/acs/go.mod b/backend/services/acs/go.mod index f57be7f..b15b275 100644 --- a/backend/services/acs/go.mod +++ b/backend/services/acs/go.mod @@ -7,7 +7,6 @@ require ( github.com/joho/godotenv v1.5.1 github.com/nats-io/nats.go v1.34.1 github.com/oleiade/lane v1.0.1 - golang.org/x/net v0.24.0 ) require ( diff --git a/backend/services/acs/go.sum b/backend/services/acs/go.sum index c223b41..3035182 100644 --- a/backend/services/acs/go.sum +++ b/backend/services/acs/go.sum @@ -14,8 +14,6 @@ github.com/oleiade/lane v1.0.1 h1:hXofkn7GEOubzTwNpeL9MaNy8WxolCYb9cInAIeqShU= github.com/oleiade/lane v1.0.1/go.mod h1:IyTkraa4maLfjq/GmHR+Dxb4kCMtEGeb+qmhlrQ5Mk4= golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30= golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M= -golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w= -golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8= golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= diff --git a/backend/services/acs/internal/auth/auth.go b/backend/services/acs/internal/auth/auth.go index 3b20e03..d38ad15 100644 --- a/backend/services/acs/internal/auth/auth.go +++ b/backend/services/acs/internal/auth/auth.go @@ -5,9 +5,12 @@ import ( "crypto/rand" "encoding/base64" "fmt" + "io" + "io/ioutil" "net/http" "net/url" "strings" + "time" ) type myjar struct { @@ -22,51 +25,81 @@ func (p *myjar) Cookies(u *url.URL) []*http.Cookie { return p.jar[u.Host] } -func Auth(username string, password string, uri string) (bool, error) { - client := &http.Client{} +func Auth(username string, password string, url string) (bool, error) { + client := &http.Client{Timeout: time.Second * 1} jar := &myjar{} jar.jar = make(map[string][]*http.Cookie) client.Jar = jar var req *http.Request var resp *http.Response var err error - req, err = http.NewRequest("GET", uri, nil) + + req, err = http.NewRequest("GET", url, nil) + if err != nil { + return false, err + } + + req.Header.Set("Connection", "keep-alive") + req.Header.Del("Accept-Encoding") + resp, err = client.Do(req) if err != nil { return false, err } - // if resp.StatusCode == 401 { - // var authorization map[string]string = DigestAuthParams(resp) - // realmHeader := authorization["realm"] - // qopHeader := authorization["qop"] - // nonceHeader := authorization["nonce"] - // opaqueHeader := authorization["opaque"] - // realm := realmHeader - // // A1 - // h := md5.New() - // A1 := fmt.Sprintf("%s:%s:%s", username, realm, password) - // io.WriteString(h, A1) - // HA1 := fmt.Sprintf("%x", h.Sum(nil)) - // // A2 - // h = md5.New() - // A2 := fmt.Sprintf("GET:%s", "/auth") - // io.WriteString(h, A2) - // HA2 := fmt.Sprintf("%x", h.Sum(nil)) + defer resp.Body.Close() + io.Copy(ioutil.Discard, resp.Body) - // // response - // cnonce := RandomKey() - // response := H(strings.Join([]string{HA1, nonceHeader, "00000001", cnonce, qopHeader, HA2}, ":")) + if resp.StatusCode == 401 { + var authorization map[string]string = DigestAuthParams(resp) + realmHeader := authorization["realm"] + qopHeader := authorization["qop"] + nonceHeader := authorization["nonce"] + opaqueHeader := authorization["opaque"] + realm := realmHeader - // // now make header - // AuthHeader := fmt.Sprintf(`Digest username="%s", realm="%s", nonce="%s", uri="%s", cnonce="%s", nc=00000001, qop=%s, response="%s", opaque="%s", algorithm=MD5`, - // username, realmHeader, nonceHeader, "/auth", cnonce, qopHeader, response, opaqueHeader) - // req.Header.Set("Authorization", AuthHeader) - // resp, err = client.Do(req) - // } else { - // return false, fmt.Errorf("response status code should have been 401, it was %v", resp.StatusCode) - // } - return resp.StatusCode == 200, err + uri := "/" + if surl := strings.Split(url, "/"); len(surl) == 4 { + uri = "/" + surl[3] + } + + // A1 + h := md5.New() + A1 := fmt.Sprintf("%s:%s:%s", username, realm, password) + io.WriteString(h, A1) + HA1 := fmt.Sprintf("%x", h.Sum(nil)) + + // A2 + h = md5.New() + A2 := fmt.Sprintf("GET:%s", uri) + io.WriteString(h, A2) + HA2 := fmt.Sprintf("%x", h.Sum(nil)) + + // response + cnonce := RandomKey() + response := H(strings.Join([]string{HA1, nonceHeader, "00000001", cnonce, qopHeader, HA2}, ":")) + + // now make header + AuthHeader := fmt.Sprintf(`Digest username="%s",realm="%s",nonce="%s",uri="%s",qop=%s,nc=00000001,cnonce="%s",response="%s",opaque="%s",algorithm=MD5`, + username, realmHeader, nonceHeader, uri, qopHeader, cnonce, response, opaqueHeader) + req.Header.Set("Authorization", AuthHeader) + + resp, err = client.Do(req) + if err != nil { + return false, err + } + + io.Copy(ioutil.Discard, resp.Body) + if resp.StatusCode == 401 { + return false, fmt.Errorf("Authentication error!") + } + + return true, err + } else if resp.StatusCode == 200 { + return true, err + } + + return false, fmt.Errorf("Response status code not expected") } /* @@ -90,6 +123,7 @@ func DigestAuthParams(r *http.Response) map[string]string { } return result } + func RandomKey() string { k := make([]byte, 12) for bytes := 0; bytes < len(k); { diff --git a/backend/services/acs/internal/bridge/bridge.go b/backend/services/acs/internal/bridge/bridge.go index 9eda542..fdf5097 100644 --- a/backend/services/acs/internal/bridge/bridge.go +++ b/backend/services/acs/internal/bridge/bridge.go @@ -40,8 +40,9 @@ func NewBridge( } func (b *Bridge) StartBridge() { + b.sub(handler.NATS_CWMP_ADAPTER_SUBJECT_PREFIX+"*.api", func(msg *nats.Msg) { - log.Printf("Received message: %s", string(msg.Data)) + //log.Printf("Received message: %s", string(msg.Data)) log.Printf("Subject: %s", msg.Subject) log.Printf("Reply: %s", msg.Reply) @@ -53,6 +54,8 @@ func (b *Bridge) StartBridge() { return } if cpe.Queue.Size() > 0 { + log.Println("Queue size: ", cpe.Queue.Size()) + log.Println("Queue data: ", cpe.Queue) log.Printf("Device %s is busy", device) respondMsg(msg.Respond, http.StatusConflict, "Device is busy") return @@ -78,6 +81,8 @@ func (b *Bridge) StartBridge() { //req := cpe.Queue.Dequeue().(handler.Request) //cpe.Waiting = &req + defer cpe.Queue.Dequeue() + select { case response := <-deviceAnswer: log.Println("Received response from device: ", string(response)) diff --git a/backend/services/acs/internal/config/config.go b/backend/services/acs/internal/config/config.go index 79b228b..51815bf 100644 --- a/backend/services/acs/internal/config/config.go +++ b/backend/services/acs/internal/config/config.go @@ -30,6 +30,8 @@ type Acs struct { Password string Route string DebugMode bool + ConnReqUsername string + ConnReqPassword string } type Config struct { @@ -47,6 +49,8 @@ func NewConfig() *Config { natsVerifyCertificates := flag.Bool("nats_verify_certificates", lookupEnvOrBool("NATS_VERIFY_CERTIFICATES", false), "verify validity of certificates from nats server") acsPort := flag.String("acs_port", lookupEnvOrString("ACS_PORT", ":9292"), "port for acs server") acsRoute := flag.String("acs_route", lookupEnvOrString("ACS_ROUTE", "/acs"), "route for acs server") + connReqUser := flag.String("connrq_user", lookupEnvOrString("CONN_RQ_USER", ""), "Connection Request Username") + connReqPasswd := flag.String("connrq_passwd", lookupEnvOrString("CONN_RQ_PASSWD", ""), "Connection Request Password") acsKeepAliveInterval := flag.Int("acs_keep_alive_interval", lookupEnvOrInt("KEEP_ALIVE_INTERVAL", 300), "keep alive interval in seconds for acs server") cwmpDebugMode := flag.Bool("debug_mode", lookupEnvOrBool("CWMP_DEBUG", false), "enable or disable cwmp logs in debug mode") flHelp := flag.Bool("help", false, "Help") @@ -77,8 +81,10 @@ func NewConfig() *Config { Acs: Acs{ Port: *acsPort, Route: *acsRoute, - KeepAliveInterval: time.Duration(*acsKeepAliveInterval), + KeepAliveInterval: time.Duration(*acsKeepAliveInterval) * time.Second, DebugMode: *cwmpDebugMode, + ConnReqUsername: *connReqUser, + ConnReqPassword: *connReqPasswd, }, } } diff --git a/backend/services/acs/internal/server/handler/cwmp.go b/backend/services/acs/internal/server/handler/cwmp.go index bc8bff6..1b0c984 100644 --- a/backend/services/acs/internal/server/handler/cwmp.go +++ b/backend/services/acs/internal/server/handler/cwmp.go @@ -69,7 +69,7 @@ func (h *Handler) CwmpHandler(w http.ResponseWriter, r *http.Request) { log.Println("New device: " + sn) h.Cpes[sn] = CPE{ SerialNumber: sn, - LastConnection: time.Now().UTC(), + LastConnection: time.Now(), SoftwareVersion: Inform.GetSoftwareVersion(), HardwareVersion: Inform.GetHardwareVersion(), ExternalIPAddress: addr, @@ -78,12 +78,11 @@ func (h *Handler) CwmpHandler(w http.ResponseWriter, r *http.Request) { Queue: lane.NewQueue(), DataModel: Inform.GetDataModelType(), } - go h.handleCpeStatus(sn) h.pub(NATS_CWMP_SUBJECT_PREFIX+sn+".info", tmp) } obj := h.Cpes[sn] cpe := &obj - cpe.LastConnection = time.Now().UTC() + cpe.LastConnection = time.Now() log.Printf("Received an Inform from device %s withEventCodes %s", addr, Inform.GetEvents()) @@ -91,8 +90,8 @@ func (h *Handler) CwmpHandler(w http.ResponseWriter, r *http.Request) { cookie := http.Cookie{Name: "oktopus", Value: sn, Expires: expiration} http.SetCookie(w, &cookie) - data, _ := xml.Marshal(cwmp.InformResponse(envelope.Header.Id)) - w.Write(data) + //data, _ := xml.Marshal(cwmp.InformResponse(envelope.Header.Id)) + fmt.Fprintf(w, cwmp.InformResponse(envelope.Header.Id)) } else if messageType == "TransferComplete" { } else if messageType == "GetRPC" { @@ -156,13 +155,15 @@ func (h *Handler) CwmpHandler(w http.ResponseWriter, r *http.Request) { func (h *Handler) ConnectionRequest(cpe CPE) error { log.Println("--> ConnectionRequest, CPE: ", cpe.SerialNumber) - // log.Println("ConnectionRequestURL: ", cpe.ConnectionRequestURL) + // log.Println("ConnectionRequestURL: ", cpe.ConnectionRequestURL) // log.Println("ConnectionRequestUsername: ", cpe.Username) // log.Println("ConnectionRequestPassword: ", cpe.Password) - ok, err := auth.Auth("", "", cpe.ConnectionRequestURL) + ok, err := auth.Auth(h.acsConfig.ConnReqUsername, h.acsConfig.ConnReqPassword, cpe.ConnectionRequestURL) if !ok { + cpe.Queue.Dequeue() log.Println("Error while authenticating to CPE: ", err) } + log.Println("<-- Successfully authenticated to CPE", cpe.SerialNumber) return err } diff --git a/backend/services/acs/internal/server/handler/status.go b/backend/services/acs/internal/server/handler/status.go index 8334b31..9cd3c91 100644 --- a/backend/services/acs/internal/server/handler/status.go +++ b/backend/services/acs/internal/server/handler/status.go @@ -5,14 +5,21 @@ import ( "time" ) -func (h *Handler) handleCpeStatus(cpe string) { +func (h *Handler) HandleCpeStatus() { for { - if time.Since(h.Cpes[cpe].LastConnection) > h.acsConfig.KeepAliveInterval { - delete(h.Cpes, cpe) - break + for cpe := range h.Cpes { + if cpe == "" { + continue + } + log.Println("Checking CPE " + cpe + " status") + if time.Since(h.Cpes[cpe].LastConnection) > h.acsConfig.KeepAliveInterval { + log.Printf("LastConnection: %s, KeepAliveInterval: %s", h.Cpes[cpe].LastConnection, h.acsConfig.KeepAliveInterval) + log.Println("CPE", cpe, "is offline") + h.pub("cwmp.v1."+cpe+".status", []byte("0")) + delete(h.Cpes, cpe) + break + } } - time.Sleep(h.acsConfig.KeepAliveInterval) + time.Sleep(10 * time.Second) } - log.Println("CPE", cpe, "is offline") - h.pub("cwmp.v1."+cpe+".status", []byte("0")) } diff --git a/backend/services/acs/internal/server/server.go b/backend/services/acs/internal/server/server.go index 8022e2a..98970b8 100644 --- a/backend/services/acs/internal/server/server.go +++ b/backend/services/acs/internal/server/server.go @@ -12,6 +12,8 @@ import ( func Run(c config.Acs, natsActions nats.NatsActions, h *handler.Handler) { http.HandleFunc(c.Route, h.CwmpHandler) + go h.HandleCpeStatus() + log.Printf("ACS running at %s%s", c.Port, c.Route) err := http.ListenAndServe(c.Port, nil)