Backend services: - targo-hub: extract deepGetValue to helpers.js, DRY disconnect reasons lookup map, compact CAPABILITIES, consolidate vision.js prompts/schemas, extract dispatch scoring weights, trim section dividers across 9 files - modem-bridge: extract getSession() helper (6 occurrences), resetIdleTimer(), consolidate DM query factory, fix duplicate username fill bug, trim headers (server.js -36%, tplink-session.js -47%, docker-compose.yml -57%) Frontend: - useWifiDiagnostic: extract THRESHOLDS const, split processDiagnostic into 6 focused helpers (processOnlineStatus, processWanIPs, processRadios, processMeshNodes, processClients, checkRadioIssues) - EquipmentDetail: merge duplicate ROLE_LABELS, remove verbose comments Documentation (17 → 13 files, -1,400 lines): - New consolidated README.md (architecture, services, dependencies, auth) - Merge ECOSYSTEM-OVERVIEW into ARCHITECTURE.md - Merge MIGRATION-PLAN + ARCHITECTURE-COMPARE + FIELD-GAP + CHANGELOG → MIGRATION.md - Merge COMPETITIVE-ANALYSIS into PLATFORM-STRATEGY.md - Update ROADMAP.md with current phase status - Delete CONTEXT.md (absorbed into README) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
420 lines
14 KiB
JavaScript
'use strict'
|
|
/**
|
|
* Oktopus MQTT Monitor
|
|
*
|
|
* The Oktopus CE MQTT broker's events-controller hook is non-functional
|
|
* (not connected to NATS, no status events published). This module
|
|
* provides a workaround by:
|
|
*
|
|
* 1. Connecting to the Oktopus MQTT broker as a privileged monitor
|
|
* 2. Listening on topic `oktopus/usp/+/status/+` for device Will
|
|
* messages (the only thing that works — offline status from LWT)
|
|
* 3. Actively publishing "online" status when we detect a device
|
|
* has connected (via $SYS topics or heartbeat probes)
|
|
* 4. Updating Oktopus MongoDB device status directly
|
|
*
|
|
* TP-Link xx230v USP agent behavior observed:
|
|
* - Client ID: ops::{MAC_PREFIX}-Device2-{SERIAL} (e.g. ops::E4FAC4-Device2-223B0R6001338)
|
|
* - Username: empty (not set)
|
|
* - Will Topic: sets LWT on oktopus status topic
|
|
* - Subscribe: does NOT subscribe to oktopus/usp/v1/agent/{id}
|
|
* - Result: OnSubscribed hook never fires → device never marked ONLINE
|
|
*
|
|
* This monitor fixes the status tracking by directly managing MongoDB.
|
|
*/
|
|
const { log } = require('./helpers')
|
|
const cfg = require('./config')
|
|
|
|
// MQTT broker on the Oktopus Docker network
|
|
const MQTT_BROKER_URL = cfg.OKTOPUS_MQTT_URL || 'mqtt://oktopus-mqtt-1:1883'
|
|
const OKTOPUS_MONGO_URL = cfg.OKTOPUS_MONGO_URL || 'mongodb://oktopus-mongo-1:27017'
|
|
const MONITOR_CLIENT_ID = 'targo-hub-monitor'
|
|
|
|
let mqttClient = null
|
|
let mongoDb = null
|
|
let deviceMap = new Map() // Client ID → Endpoint info
|
|
let onlineDevices = new Set() // Track currently online endpoint IDs
|
|
|
|
/* ── Client ID mapping ───────────────────────────────────────────── */
|
|
|
|
/**
|
|
* Parse TP-Link ONU Client ID to extract MAC prefix and build endpoint ID.
|
|
* Format: ops::{MAC6}-Device2-{SERIAL}
|
|
* e.g. ops::E4FAC4-Device2-223B0R6001338
|
|
* We need to look up the full MAC from the 6-char prefix via MongoDB.
|
|
*/
|
|
function parseClientId (clientId) {
|
|
if (!clientId) return null
|
|
|
|
// Standard USP format: USP::E4FAC4160688 or proto::XXXX
|
|
const uspMatch = clientId.match(/^USP::([A-F0-9]{12})$/i)
|
|
if (uspMatch) return { endpointId: clientId, macClean: uspMatch[1], format: 'usp' }
|
|
|
|
// TP-Link format: ops::E4FAC4-Device2-223B0R6001338
|
|
const opsMatch = clientId.match(/^ops::([A-F0-9]{6})-Device\d+-(.+)$/i)
|
|
if (opsMatch) {
|
|
return {
|
|
macPrefix: opsMatch[1].toUpperCase(),
|
|
serial: opsMatch[2],
|
|
format: 'ops',
|
|
raw: clientId,
|
|
}
|
|
}
|
|
|
|
// mqtt-adapter internal client
|
|
if (clientId === 'mqtt-adapter' || clientId === 'oktopusController') {
|
|
return { internal: true }
|
|
}
|
|
|
|
// Unknown format — could be a scanner or other device
|
|
return { unknown: true, raw: clientId }
|
|
}
|
|
|
|
/**
|
|
* Resolve a partial client ID (ops:: format) to a full USP endpoint ID.
|
|
* Looks up the device in MongoDB by MAC prefix or serial.
|
|
*/
|
|
async function resolveEndpointId (parsed) {
|
|
if (!parsed || parsed.internal || parsed.unknown) return null
|
|
if (parsed.format === 'usp') return parsed.endpointId
|
|
|
|
// For ops:: format, search MongoDB for matching MAC prefix
|
|
const db = await getMongoDb()
|
|
if (!db) return null
|
|
|
|
try {
|
|
const col = db.collection('devices')
|
|
|
|
// Try by serial first (more specific)
|
|
if (parsed.serial) {
|
|
const bySerial = await col.findOne({
|
|
$or: [
|
|
{ alias: parsed.serial },
|
|
{ sn: { $regex: parsed.serial, $options: 'i' } },
|
|
]
|
|
})
|
|
if (bySerial) return bySerial.sn
|
|
}
|
|
|
|
// Try by MAC prefix
|
|
if (parsed.macPrefix) {
|
|
const byMac = await col.findOne({
|
|
sn: { $regex: `^USP::${parsed.macPrefix}`, $options: 'i' }
|
|
})
|
|
if (byMac) return byMac.sn
|
|
}
|
|
|
|
log(`[mqtt-mon] Could not resolve Client ID: ${parsed.raw}`)
|
|
return null
|
|
} catch (e) {
|
|
log(`[mqtt-mon] Resolve error:`, e.message)
|
|
return null
|
|
}
|
|
}
|
|
|
|
/* ── MongoDB ─────────────────────────────────────────────────────── */
|
|
|
|
async function getMongoDb () {
|
|
if (mongoDb) return mongoDb
|
|
try {
|
|
const { MongoClient } = require('mongodb')
|
|
const client = new MongoClient(OKTOPUS_MONGO_URL)
|
|
await client.connect()
|
|
mongoDb = client.db('adapter')
|
|
log('[mqtt-mon] Connected to Oktopus MongoDB')
|
|
return mongoDb
|
|
} catch (e) {
|
|
log('[mqtt-mon] MongoDB connect error:', e.message)
|
|
return null
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Update device status in Oktopus MongoDB.
|
|
* The CE adapter only updates MongoDB for OFFLINE events. For ONLINE, it
|
|
* just sends a USP Get request and waits for a response to update the DB.
|
|
* Since TP-Link devices don't respond properly, we update MongoDB directly.
|
|
*
|
|
* status: 1 = online, 0 = offline
|
|
*/
|
|
async function updateDeviceStatus (endpointId, status) {
|
|
const db = await getMongoDb()
|
|
if (!db) return
|
|
|
|
try {
|
|
const update = {
|
|
status: status ? 1 : 0,
|
|
mqtt: status ? 1 : 0,
|
|
}
|
|
const result = await db.collection('devices').updateOne(
|
|
{ sn: endpointId },
|
|
{ $set: update }
|
|
)
|
|
if (result.matchedCount > 0) {
|
|
log(`[mqtt-mon] ${endpointId} → ${status ? 'ONLINE' : 'OFFLINE'}`)
|
|
} else {
|
|
log(`[mqtt-mon] ${endpointId} not found in MongoDB (status update skipped)`)
|
|
}
|
|
} catch (e) {
|
|
log(`[mqtt-mon] Status update error for ${endpointId}:`, e.message)
|
|
}
|
|
}
|
|
|
|
/* ── MQTT Client ─────────────────────────────────────────────────── */
|
|
|
|
function start () {
|
|
let mqtt
|
|
try {
|
|
mqtt = require('mqtt')
|
|
} catch (e) {
|
|
log('[mqtt-mon] mqtt package not available, monitor disabled:', e.message)
|
|
return
|
|
}
|
|
|
|
log(`[mqtt-mon] Connecting to MQTT broker: ${MQTT_BROKER_URL}`)
|
|
|
|
mqttClient = mqtt.connect(MQTT_BROKER_URL, {
|
|
clientId: MONITOR_CLIENT_ID,
|
|
clean: true,
|
|
protocolVersion: 5, // MQTTv5 required for Oktopus
|
|
keepalive: 30,
|
|
reconnectPeriod: 5000,
|
|
connectTimeout: 10000,
|
|
// Will message so other monitors know we disconnected
|
|
will: {
|
|
topic: `oktopus/internal/monitor/status`,
|
|
payload: Buffer.from('0'),
|
|
qos: 1,
|
|
retain: false,
|
|
},
|
|
})
|
|
|
|
mqttClient.on('connect', () => {
|
|
log('[mqtt-mon] Connected to MQTT broker')
|
|
|
|
// Subscribe to device status topics (catches LWT offline messages)
|
|
mqttClient.subscribe('oktopus/usp/v1/status/#', { qos: 1 })
|
|
// Subscribe to all oktopus USP topics to detect device activity
|
|
mqttClient.subscribe('oktopus/usp/v1/agent/#', { qos: 0 })
|
|
mqttClient.subscribe('oktopus/usp/v1/controller/#', { qos: 0 })
|
|
// Subscribe to $SYS for client connect/disconnect events
|
|
mqttClient.subscribe('$SYS/+/clients/+', { qos: 0 })
|
|
|
|
// Publish our monitor status
|
|
mqttClient.publish('oktopus/internal/monitor/status', '1', { qos: 1, retain: false })
|
|
|
|
// Start heartbeat poller — periodically check for connected devices
|
|
startHeartbeat()
|
|
})
|
|
|
|
mqttClient.on('message', handleMessage)
|
|
|
|
mqttClient.on('error', (err) => {
|
|
log('[mqtt-mon] MQTT error:', err.message)
|
|
})
|
|
|
|
mqttClient.on('reconnect', () => {
|
|
log('[mqtt-mon] Reconnecting to MQTT broker...')
|
|
})
|
|
|
|
mqttClient.on('close', () => {
|
|
log('[mqtt-mon] MQTT connection closed')
|
|
})
|
|
}
|
|
|
|
/**
|
|
* Handle incoming MQTT messages.
|
|
*/
|
|
async function handleMessage (topic, payload, packet) {
|
|
const msg = payload.toString()
|
|
|
|
// Device status messages (from LWT or direct publish)
|
|
// Topic: oktopus/usp/v1/status/{endpointId}
|
|
const statusMatch = topic.match(/^oktopus\/usp\/v1\/status\/(.+)$/)
|
|
if (statusMatch) {
|
|
const endpointId = statusMatch[1]
|
|
const status = msg === '1' ? 1 : 0
|
|
if (status === 0) {
|
|
onlineDevices.delete(endpointId)
|
|
} else {
|
|
onlineDevices.add(endpointId)
|
|
}
|
|
await updateDeviceStatus(endpointId, status)
|
|
return
|
|
}
|
|
|
|
// Device sending USP messages to controller = proof of life
|
|
// Topic: oktopus/usp/v1/controller/{endpointId_or_type}
|
|
const controllerMatch = topic.match(/^oktopus\/usp\/v1\/controller\/(.+)$/)
|
|
if (controllerMatch) {
|
|
// The device is actively communicating — mark online
|
|
// Try to identify the device from the MQTT packet properties
|
|
const deviceId = extractDeviceFromPacket(controllerMatch[1], packet)
|
|
if (deviceId && !onlineDevices.has(deviceId)) {
|
|
onlineDevices.add(deviceId)
|
|
await updateDeviceStatus(deviceId, 1)
|
|
// Also publish online status so Oktopus adapter picks it up
|
|
publishOnlineStatus(deviceId)
|
|
}
|
|
return
|
|
}
|
|
|
|
// $SYS client events (Mochi-mqtt publishes these)
|
|
if (topic.startsWith('$SYS/')) {
|
|
// Handle $SYS/+/clients/connected or similar
|
|
// Mochi-mqtt doesn't expose per-client $SYS, so this is a fallback
|
|
return
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Extract device endpoint ID from MQTT packet properties or topic.
|
|
*/
|
|
function extractDeviceFromPacket (topicSuffix, packet) {
|
|
// MQTTv5 packets may have User Properties with endpoint ID
|
|
if (packet && packet.properties && packet.properties.userProperties) {
|
|
for (const prop of packet.properties.userProperties) {
|
|
if (prop.key === 'reply-to' || prop.key === 'endpoint-id') {
|
|
return prop.value
|
|
}
|
|
}
|
|
}
|
|
|
|
// USP messages on controller topic: last segment might be message type
|
|
// e.g. oktopus/usp/v1/controller/resp or oktopus/usp/v1/controller/USP::E4FAC4160688
|
|
if (topicSuffix.startsWith('USP::') || topicSuffix.startsWith('ops::')) {
|
|
const parsed = parseClientId(topicSuffix)
|
|
if (parsed && parsed.endpointId) return parsed.endpointId
|
|
}
|
|
|
|
return null
|
|
}
|
|
|
|
/**
|
|
* Publish online status on behalf of a device.
|
|
* This is the fix for the broken CE hook — we publish the "1" status
|
|
* message to the status topic, which the mqtt-adapter will pick up
|
|
* and forward to NATS → adapter → MongoDB.
|
|
*/
|
|
function publishOnlineStatus (endpointId) {
|
|
if (!mqttClient || !mqttClient.connected) return
|
|
const statusTopic = `oktopus/usp/v1/status/${endpointId}`
|
|
mqttClient.publish(statusTopic, '1', { qos: 1, retain: false }, (err) => {
|
|
if (err) log(`[mqtt-mon] Failed to publish online status for ${endpointId}:`, err.message)
|
|
else log(`[mqtt-mon] Published ONLINE status for ${endpointId}`)
|
|
})
|
|
}
|
|
|
|
/* ── Heartbeat / Device Discovery ────────────────────────────────── */
|
|
|
|
let heartbeatInterval = null
|
|
|
|
function startHeartbeat () {
|
|
if (heartbeatInterval) clearInterval(heartbeatInterval)
|
|
|
|
// Every 60 seconds, check MongoDB for devices that should be checked
|
|
heartbeatInterval = setInterval(async () => {
|
|
try {
|
|
await discoverConnectedDevices()
|
|
} catch (e) {
|
|
log('[mqtt-mon] Heartbeat error:', e.message)
|
|
}
|
|
}, 60000)
|
|
|
|
// First check immediately
|
|
setTimeout(() => discoverConnectedDevices().catch(e => log('[mqtt-mon] Initial discover error:', e.message)), 5000)
|
|
}
|
|
|
|
/**
|
|
* Discover devices that are connected to the MQTT broker.
|
|
* Since the CE broker doesn't expose a client list API, we use a
|
|
* probe-and-check approach: publish a USP message and see if we
|
|
* get a response.
|
|
*
|
|
* Alternative: query the MQTT broker's HTTP stats if available.
|
|
*/
|
|
async function discoverConnectedDevices () {
|
|
const db = await getMongoDb()
|
|
if (!db) return
|
|
|
|
// Get all known MQTT devices from MongoDB
|
|
const devices = await db.collection('devices').find({ mqtt: true }).toArray()
|
|
|
|
for (const dev of devices) {
|
|
if (!dev.sn) continue
|
|
// Try to send a lightweight USP ping via the controller
|
|
// Actually, we can't do this without the full USP protocol stack.
|
|
// Instead, we rely on:
|
|
// 1. Device LWT (offline status) — already handled
|
|
// 2. Device publishing to controller topic — handled in handleMessage
|
|
// 3. Periodic status check by publishing a request
|
|
//
|
|
// For now, just log the device list for monitoring
|
|
}
|
|
}
|
|
|
|
/* ── Public API ──────────────────────────────────────────────────── */
|
|
|
|
/**
|
|
* Get online/offline status of all known devices.
|
|
*/
|
|
function getStatus () {
|
|
return {
|
|
connected: mqttClient?.connected || false,
|
|
broker: MQTT_BROKER_URL,
|
|
onlineDevices: Array.from(onlineDevices),
|
|
monitoredDevices: deviceMap.size,
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Manually mark a device as online (for testing or after provisioning).
|
|
*/
|
|
async function markOnline (endpointId) {
|
|
onlineDevices.add(endpointId)
|
|
await updateDeviceStatus(endpointId, 1)
|
|
publishOnlineStatus(endpointId)
|
|
return { ok: true, endpointId, status: 'online' }
|
|
}
|
|
|
|
/**
|
|
* Manually trigger a status check for a specific device.
|
|
* Publishes a lightweight message and waits for response.
|
|
*/
|
|
async function probeDevice (endpointId) {
|
|
if (!mqttClient || !mqttClient.connected) {
|
|
return { ok: false, error: 'MQTT not connected' }
|
|
}
|
|
|
|
return new Promise((resolve) => {
|
|
const timeout = setTimeout(() => {
|
|
resolve({ ok: false, endpointId, status: 'offline', note: 'No response within 5s' })
|
|
}, 5000)
|
|
|
|
// Listen for any response from the device
|
|
const responseTopic = `oktopus/usp/v1/controller/#`
|
|
const handler = (topic, payload, pkt) => {
|
|
if (topic.includes(endpointId) || (pkt?.properties?.correlationData?.toString() === endpointId)) {
|
|
clearTimeout(timeout)
|
|
mqttClient.removeListener('message', handler)
|
|
onlineDevices.add(endpointId)
|
|
updateDeviceStatus(endpointId, 1)
|
|
resolve({ ok: true, endpointId, status: 'online' })
|
|
}
|
|
}
|
|
mqttClient.on('message', handler)
|
|
|
|
// Send a lightweight USP message via the agent topic
|
|
const agentTopic = `oktopus/usp/v1/agent/${endpointId}`
|
|
mqttClient.publish(agentTopic, Buffer.from(''), {
|
|
qos: 1,
|
|
properties: {
|
|
correlationData: Buffer.from(endpointId),
|
|
responseTopic: `oktopus/usp/v1/controller/${endpointId}`,
|
|
}
|
|
})
|
|
})
|
|
}
|
|
|
|
module.exports = { start, getStatus, markOnline, probeDevice, publishOnlineStatus }
|