gigafibre-fsm/services/targo-hub/lib/oktopus-mqtt.js
louispaulb 607ea54b5c refactor: reduce token count, DRY code, consolidate docs
Backend services:
- targo-hub: extract deepGetValue to helpers.js, DRY disconnect reasons
  lookup map, compact CAPABILITIES, consolidate vision.js prompts/schemas,
  extract dispatch scoring weights, trim section dividers across 9 files
- modem-bridge: extract getSession() helper (6 occurrences), resetIdleTimer(),
  consolidate DM query factory, fix duplicate username fill bug, trim headers
  (server.js -36%, tplink-session.js -47%, docker-compose.yml -57%)

Frontend:
- useWifiDiagnostic: extract THRESHOLDS const, split processDiagnostic into
  6 focused helpers (processOnlineStatus, processWanIPs, processRadios,
  processMeshNodes, processClients, checkRadioIssues)
- EquipmentDetail: merge duplicate ROLE_LABELS, remove verbose comments

Documentation (17 → 13 files, -1,400 lines):
- New consolidated README.md (architecture, services, dependencies, auth)
- Merge ECOSYSTEM-OVERVIEW into ARCHITECTURE.md
- Merge MIGRATION-PLAN + ARCHITECTURE-COMPARE + FIELD-GAP + CHANGELOG → MIGRATION.md
- Merge COMPETITIVE-ANALYSIS into PLATFORM-STRATEGY.md
- Update ROADMAP.md with current phase status
- Delete CONTEXT.md (absorbed into README)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-13 08:39:58 -04:00

420 lines
14 KiB
JavaScript

'use strict' // Strict mode for this CommonJS module (it uses require / module.exports).
/**
* Oktopus MQTT Monitor
*
* The Oktopus CE MQTT broker's events-controller hook is non-functional
* (not connected to NATS, no status events published). This module
* provides a workaround by:
*
* 1. Connecting to the Oktopus MQTT broker as a privileged monitor
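* 2. Listening on topic `oktopus/usp/v1/status/#` for device Will
* messages (the only thing that works — offline status from LWT)
* 3. Actively publishing "online" status when we detect that a device
* has connected (currently inferred from its traffic on the
* `oktopus/usp/v1/controller/#` topic; the $SYS and heartbeat hooks
* are placeholders that do not yet mark devices online)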
* 4. Updating Oktopus MongoDB device status directly
*
* TP-Link xx230v USP agent behavior observed:
* - Client ID: ops::{MAC_PREFIX}-Device2-{SERIAL} (e.g. ops::E4FAC4-Device2-223B0R6001338)
* - Username: empty (not set)
* - Will Topic: sets LWT on oktopus status topic
* - Subscribe: does NOT subscribe to oktopus/usp/v1/agent/{id}
* - Result: OnSubscribed hook never fires → device never marked ONLINE
*
* This monitor fixes the status tracking by directly managing MongoDB.
*/
const { log } = require('./helpers')
const cfg = require('./config')
// MQTT broker on the Oktopus Docker network (overridable via config).
const MQTT_BROKER_URL = cfg.OKTOPUS_MQTT_URL || 'mqtt://oktopus-mqtt-1:1883'
// Oktopus MongoDB server; getMongoDb() opens its 'adapter' database.
const OKTOPUS_MONGO_URL = cfg.OKTOPUS_MONGO_URL || 'mongodb://oktopus-mongo-1:27017'
// Client ID this monitor uses on the broker.
const MONITOR_CLIENT_ID = 'targo-hub-monitor'
// MQTT client created by start(); stays null until then (or if `mqtt` is missing).
let mqttClient = null
// Cached MongoDB Db handle, set on the first successful connection.
let mongoDb = null
// Client ID → Endpoint info.
// NOTE(review): nothing in this module populates deviceMap, so
// getStatus().monitoredDevices is always 0.
const deviceMap = new Map()
// Endpoint IDs currently believed to be online.
const onlineDevices = new Set()
/* ── Client ID mapping ───────────────────────────────────────────── */
/**
 * Classify an MQTT client ID seen on the Oktopus broker.
 *
 * Recognised shapes:
 *   - `USP::{12 hex}` (case-insensitive)
 *       → { endpointId: clientId, macClean: <the 12 hex chars as written>, format: 'usp' }
 *   - `ops::{6 hex}-Device{n}-{serial}` (TP-Link), e.g. ops::E4FAC4-Device2-223B0R6001338
 *       → { macPrefix: <6 hex, upper-cased>, serial, format: 'ops', raw: clientId }
 *     The full endpoint ID must still be looked up (see resolveEndpointId).
 *   - 'mqtt-adapter' or 'oktopusController' → { internal: true }
 *   - anything else (scanners, other devices) → { unknown: true, raw: clientId }
 *
 * @param {string} clientId
 * @returns {object|null} null when clientId is empty or missing
 */
function parseClientId (clientId) {
  if (!clientId) return null

  const usp = clientId.match(/^USP::([A-F0-9]{12})$/i)
  if (usp) {
    return { endpointId: clientId, macClean: usp[1], format: 'usp' }
  }

  const ops = clientId.match(/^ops::([A-F0-9]{6})-Device\d+-(.+)$/i)
  if (ops) {
    const [, prefix, serial] = ops
    return { macPrefix: prefix.toUpperCase(), serial, format: 'ops', raw: clientId }
  }

  // Known non-device clients connected to the same broker.
  const isInternal = ['mqtt-adapter', 'oktopusController'].includes(clientId)
  return isInternal ? { internal: true } : { unknown: true, raw: clientId }
}
/**
 * Escape every RegExp metacharacter in `text` so it matches literally when
 * embedded in a MongoDB `$regex`.
 */
function escapeRegex (text) {
  return String(text).replace(/[.*+?^${}()|[\]\\]/g, '\\$&')
}
/**
 * Resolve a parsed client ID (output of parseClientId) to a full USP endpoint ID.
 *
 * - internal/unknown/missing → null
 * - 'usp' format → its endpointId as-is
 * - 'ops' format → looked up in the Oktopus `devices` collection, first by
 *   serial (alias equal to it, or sn containing it, case-insensitive), then by
 *   an sn starting with `USP::{macPrefix}`.
 *
 * Lookup failures and database errors are logged and yield null.
 * NOTE(review): no caller of this function is visible in this module.
 *
 * @param {object|null} parsed
 * @returns {Promise<string|null>} the device's `sn`, or null
 */
async function resolveEndpointId (parsed) {
  if (!parsed || parsed.internal || parsed.unknown) return null
  if (parsed.format === 'usp') return parsed.endpointId
  const db = await getMongoDb()
  if (!db) return null
  try {
    const col = db.collection('devices')
    // Serial is the more specific key, so try it first. It comes straight from
    // the device's MQTT client ID, so escape it before using it as a pattern:
    // characters such as '.', '(' or '+' would otherwise change the match or
    // make the pattern invalid.
    if (parsed.serial) {
      const bySerial = await col.findOne({
        $or: [
          { alias: parsed.serial },
          { sn: { $regex: escapeRegex(parsed.serial), $options: 'i' } },
        ],
      })
      if (bySerial) return bySerial.sn
    }
    if (parsed.macPrefix) {
      const byMac = await col.findOne({
        sn: { $regex: `^USP::${escapeRegex(parsed.macPrefix)}`, $options: 'i' },
      })
      if (byMac) return byMac.sn
    }
    log(`[mqtt-mon] Could not resolve Client ID: ${parsed.raw}`)
    return null
  } catch (e) {
    log(`[mqtt-mon] Resolve error:`, e.message)
    return null
  }
}
/* ── MongoDB ─────────────────────────────────────────────────────── */
// In-flight connection attempt, shared by concurrent getMongoDb() callers.
let mongoConnecting = null
/**
 * Return the cached Oktopus MongoDB `adapter` database handle, connecting on
 * first use.
 *
 * Concurrent callers share one connection attempt, so calls that arrive
 * before the first connect completes do not each open (and leak) their own
 * MongoClient. A failed attempt is logged, its client closed, and the next
 * call retries.
 *
 * @returns {Promise<object|null>} the Db handle, or null when the driver is
 *   missing or the connection fails
 */
async function getMongoDb () {
  if (mongoDb) return mongoDb
  if (!mongoConnecting) {
    mongoConnecting = connectMongo().finally(() => { mongoConnecting = null })
  }
  return mongoConnecting
}
/** Open a MongoClient and cache its `adapter` db in `mongoDb`; null on failure. */
async function connectMongo () {
  let client = null
  try {
    const { MongoClient } = require('mongodb')
    client = new MongoClient(OKTOPUS_MONGO_URL)
    await client.connect()
    mongoDb = client.db('adapter')
    log('[mqtt-mon] Connected to Oktopus MongoDB')
    return mongoDb
  } catch (e) {
    log('[mqtt-mon] MongoDB connect error:', e.message)
    if (client) {
      // Best-effort cleanup so a failed attempt does not keep resources open.
      Promise.resolve().then(() => client.close()).catch(() => {})
    }
    return null
  }
}
/**
 * Write a device's online/offline state straight into Oktopus MongoDB.
 *
 * The CE adapter only updates MongoDB for OFFLINE events. For ONLINE, it
 * just sends a USP Get request and waits for a response to update the DB.
 * Since TP-Link devices don't respond properly, we update MongoDB directly.
 *
 * Sets both `status` and `mqtt` (1 = online, 0 = offline) on the `devices`
 * document whose `sn` equals endpointId. Errors are logged, never thrown; a
 * missing database connection makes this a no-op.
 *
 * @param {string} endpointId - USP endpoint ID (the document's `sn`)
 * @param {number|boolean} status - truthy → online (1), falsy → offline (0)
 */
async function updateDeviceStatus (endpointId, status) {
  const db = await getMongoDb()
  if (!db) return
  try {
    const update = {
      status: status ? 1 : 0,
      mqtt: status ? 1 : 0,
    }
    const result = await db.collection('devices').updateOne(
      { sn: endpointId },
      { $set: update }
    )
    if (result.matchedCount > 0) {
      log(`[mqtt-mon] ${endpointId} → ${status ? 'ONLINE' : 'OFFLINE'}`)
    } else {
      log(`[mqtt-mon] ${endpointId} not found in MongoDB (status update skipped)`)
    }
  } catch (e) {
    log(`[mqtt-mon] Status update error for ${endpointId}:`, e.message)
  }
}
/* ── MQTT Client ─────────────────────────────────────────────────── */
/**
 * Connect the monitor to the Oktopus MQTT broker and register its handlers.
 *
 * On every 'connect' event (the initial connect and each reconnect) it
 * subscribes to the device status, agent, controller and $SYS topics,
 * announces itself on `oktopus/internal/monitor/status`, and (re)starts the
 * heartbeat. Without the optional `mqtt` package it only logs and returns.
 *
 * Calling start() again while a client exists is a no-op: a second client
 * using the same MONITOR_CLIENT_ID would make the broker drop the first one,
 * since an MQTT broker keeps a single connection per client ID.
 */
function start () {
  if (mqttClient) {
    log('[mqtt-mon] Monitor already started, ignoring start()')
    return
  }
  let mqtt
  try {
    mqtt = require('mqtt')
  } catch (e) {
    log('[mqtt-mon] mqtt package not available, monitor disabled:', e.message)
    return
  }
  log(`[mqtt-mon] Connecting to MQTT broker: ${MQTT_BROKER_URL}`)
  mqttClient = mqtt.connect(MQTT_BROKER_URL, {
    clientId: MONITOR_CLIENT_ID,
    clean: true,
    protocolVersion: 5, // MQTTv5 required for Oktopus
    keepalive: 30,
    reconnectPeriod: 5000,
    connectTimeout: 10000,
    // Will message so other monitors know we disconnected
    will: {
      topic: `oktopus/internal/monitor/status`,
      payload: Buffer.from('0'),
      qos: 1,
      retain: false,
    },
  })
  // [topic filter, QoS] pairs subscribed on every connect.
  const subscriptions = [
    ['oktopus/usp/v1/status/#', 1], // device status messages (catches LWT offline messages)
    ['oktopus/usp/v1/agent/#', 0], // USP traffic towards agents
    ['oktopus/usp/v1/controller/#', 0], // USP traffic towards the controller (proof of life)
    ['$SYS/+/clients/+', 0], // broker client connect/disconnect events, if published
  ]
  mqttClient.on('connect', () => {
    log('[mqtt-mon] Connected to MQTT broker')
    for (const [topic, qos] of subscriptions) {
      mqttClient.subscribe(topic, { qos }, (err) => {
        if (err) log(`[mqtt-mon] Subscribe to ${topic} failed:`, err.message)
      })
    }
    // Publish our monitor status
    mqttClient.publish('oktopus/internal/monitor/status', '1', { qos: 1, retain: false })
    // Start heartbeat poller — periodically check for connected devices
    startHeartbeat()
  })
  mqttClient.on('message', handleMessage)
  mqttClient.on('error', (err) => {
    log('[mqtt-mon] MQTT error:', err.message)
  })
  mqttClient.on('reconnect', () => {
    log('[mqtt-mon] Reconnecting to MQTT broker...')
  })
  mqttClient.on('close', () => {
    log('[mqtt-mon] MQTT connection closed')
  })
}
/**
 * Handle an incoming MQTT message (the client's 'message' listener).
 *
 * - `oktopus/usp/v1/status/{endpointId}`: payload '1' marks the device
 *   online, anything else offline (LWT payloads arrive here); MongoDB is
 *   updated either way.
 * - `oktopus/usp/v1/controller/{suffix}`: traffic towards the controller is
 *   proof of life. A device that can be identified from the packet and is
 *   not yet known online is marked online in memory and MongoDB, and its
 *   online status is published.
 * - anything else (including $SYS) is ignored.
 *
 * Never rejects: the MQTT client does not await its listeners, so an error
 * escaping here would become an unhandled promise rejection. Errors are
 * logged instead.
 *
 * @param {string} topic
 * @param {Buffer} payload
 * @param {object} [packet] - raw packet; MQTTv5 properties are read from it
 */
async function handleMessage (topic, payload, packet) {
  try {
    const msg = payload.toString()
    // Device status messages (from LWT or direct publish)
    const statusMatch = topic.match(/^oktopus\/usp\/v1\/status\/(.+)$/)
    if (statusMatch) {
      const endpointId = statusMatch[1]
      const status = msg === '1' ? 1 : 0
      if (status === 0) {
        onlineDevices.delete(endpointId)
      } else {
        onlineDevices.add(endpointId)
      }
      await updateDeviceStatus(endpointId, status)
      return
    }
    // Device sending USP messages to controller = proof of life
    const controllerMatch = topic.match(/^oktopus\/usp\/v1\/controller\/(.+)$/)
    if (controllerMatch) {
      const deviceId = extractDeviceFromPacket(controllerMatch[1], packet)
      if (deviceId && !onlineDevices.has(deviceId)) {
        onlineDevices.add(deviceId)
        await updateDeviceStatus(deviceId, 1)
        // Also publish online status so Oktopus adapter picks it up
        publishOnlineStatus(deviceId)
      }
    }
    // $SYS/... client events are received but ignored: per the original note,
    // Mochi-mqtt does not expose per-client $SYS topics.
  } catch (e) {
    log(`[mqtt-mon] Error handling message on ${topic}:`, e.message)
  }
}
/**
 * Work out which device endpoint ID an MQTT message belongs to.
 *
 * Checks, in order:
 *  1. MQTTv5 user properties `reply-to` / `endpoint-id` (first match wins;
 *     if the property was repeated, its first value is used);
 *  2. a topic suffix that is itself a `USP::…` client ID.
 * `ops::…` suffixes are passed to parseClientId too, which currently returns
 * no endpointId for them, so they yield null.
 *
 * @param {string} topicSuffix - topic part after `oktopus/usp/v1/controller/`
 * @param {object} [packet] - raw packet as delivered by the MQTT client
 * @returns {string|null}
 */
function extractDeviceFromPacket (topicSuffix, packet) {
  const userProps = packet?.properties?.userProperties
  if (userProps) {
    // mqtt.js (via mqtt-packet) delivers v5 user properties as a plain object
    // ({ key: value }, with repeated keys collapsed into an array), which a
    // `for…of` loop cannot iterate. Also accept an array of { key, value }.
    const entries = Array.isArray(userProps)
      ? userProps.map((prop) => [prop && prop.key, prop && prop.value])
      : Object.entries(userProps)
    for (const [key, value] of entries) {
      if (key === 'reply-to' || key === 'endpoint-id') {
        return Array.isArray(value) ? value[0] : value
      }
    }
  }
  // e.g. oktopus/usp/v1/controller/USP::E4FAC4160688 (vs. a message type like .../resp)
  if (topicSuffix.startsWith('USP::') || topicSuffix.startsWith('ops::')) {
    const parsed = parseClientId(topicSuffix)
    if (parsed && parsed.endpointId) return parsed.endpointId
  }
  return null
}
/**
 * Publish an online status ('1', QoS 1, not retained) on behalf of a device,
 * on `oktopus/usp/v1/status/{endpointId}`.
 *
 * Works around the broken CE hook: the mqtt-adapter is expected to pick the
 * message up and forward it to NATS → adapter → MongoDB.
 * Does nothing when the monitor has no connected client; the publish result
 * is only logged.
 *
 * @param {string} endpointId
 */
function publishOnlineStatus (endpointId) {
  if (!mqttClient?.connected) return
  const topic = `oktopus/usp/v1/status/${endpointId}`
  const onAck = (err) => {
    if (!err) {
      log(`[mqtt-mon] Published ONLINE status for ${endpointId}`)
      return
    }
    log(`[mqtt-mon] Failed to publish online status for ${endpointId}:`, err.message)
  }
  mqttClient.publish(topic, '1', { qos: 1, retain: false }, onAck)
}
/* ── Heartbeat / Device Discovery ────────────────────────────────── */
// Handle of the periodic discovery timer (null until startHeartbeat runs).
let heartbeatInterval = null
/**
 * (Re)start the periodic device-discovery loop.
 *
 * Clears any previous interval, then runs discoverConnectedDevices() every
 * 60 s, plus once about 5 s from now. Failures are logged, never thrown.
 * NOTE(review): the one-off 5 s timer is not tracked, so restarting several
 * times in quick succession queues several initial runs.
 */
function startHeartbeat () {
  if (heartbeatInterval) clearInterval(heartbeatInterval)

  const runPeriodic = async () => {
    try {
      await discoverConnectedDevices()
    } catch (e) {
      log('[mqtt-mon] Heartbeat error:', e.message)
    }
  }
  heartbeatInterval = setInterval(runPeriodic, 60 * 1000)

  const runInitial = () => discoverConnectedDevices()
    .catch((e) => log('[mqtt-mon] Initial discover error:', e.message))
  setTimeout(runInitial, 5 * 1000)
}
/**
 * List the devices that Oktopus MongoDB currently records as MQTT-connected.
 *
 * Active probing is not implemented: sending a lightweight USP ping would
 * need the full USP protocol stack. Online/offline tracking instead relies on
 *   1. device LWT (offline) messages, and
 *   2. device traffic on the controller topic,
 * both handled in handleMessage. This function only reads the current list;
 * the heartbeat calls it periodically and ignores the result.
 *
 * @returns {Promise<string[]>} `sn` of every matching device; [] when
 *   MongoDB is unavailable
 */
async function discoverConnectedDevices () {
  const db = await getMongoDb()
  if (!db) return []
  // updateDeviceStatus() stores `mqtt` as the number 0/1, and MongoDB
  // equality is type-sensitive (`{ mqtt: true }` never matches 1), so accept
  // both the numeric and the boolean form.
  const devices = await db.collection('devices')
    .find({ mqtt: { $in: [1, true] } }, { projection: { sn: 1 } })
    .toArray()
  return devices.map((dev) => dev.sn).filter(Boolean)
}
/* ── Public API ──────────────────────────────────────────────────── */
/**
 * Snapshot of the monitor's state.
 *
 * @returns {{connected: boolean, broker: string, onlineDevices: string[], monitoredDevices: number}}
 *   broker connection flag (false when no client exists), broker URL, endpoint
 *   IDs currently believed online, and the size of deviceMap
 */
function getStatus () {
  const connected = (mqttClient && mqttClient.connected) || false
  const snapshot = {
    connected,
    broker: MQTT_BROKER_URL,
    onlineDevices: [...onlineDevices],
    monitoredDevices: deviceMap.size,
  }
  return snapshot
}
/**
 * Mark a device online by hand (e.g. for testing or after provisioning).
 *
 * Records it as online in memory, writes status 1 to MongoDB, then publishes
 * its online status on the broker.
 *
 * @param {string} endpointId
 * @returns {Promise<{ok: true, endpointId: string, status: 'online'}>}
 */
async function markOnline (endpointId) {
  const ONLINE = 1
  onlineDevices.add(endpointId)
  await updateDeviceStatus(endpointId, ONLINE)
  publishOnlineStatus(endpointId)
  const result = { ok: true, endpointId, status: 'online' }
  return result
}
/**
 * Probe a device over MQTT and report whether it answered.
 *
 * Publishes an empty message to `oktopus/usp/v1/agent/{endpointId}` with MQTTv5
 * correlationData = endpointId and responseTopic =
 * `oktopus/usp/v1/controller/{endpointId}`. It then waits for traffic on
 * `oktopus/usp/v1/controller/…` that either names the endpoint in its topic
 * or carries the same correlationData. On a reply the device is recorded
 * online in memory and in MongoDB.
 *
 * NOTE(review): the probe payload is empty rather than an encoded USP record;
 * confirm an agent actually replies to it.
 *
 * @param {string} endpointId - USP endpoint ID, e.g. USP::E4FAC4160688
 * @param {number} [timeoutMs=5000] - how long to wait for a reply
 * @returns {Promise<object>}
 *   `{ ok: true, endpointId, status: 'online' }` on a reply;
 *   `{ ok: false, endpointId, status: 'offline', note }` on timeout;
 *   `{ ok: false, endpointId, error }` if the publish fails;
 *   `{ ok: false, error: 'MQTT not connected' }` when there is no live client.
 */
async function probeDevice (endpointId, timeoutMs = 5000) {
  const client = mqttClient
  if (!client || !client.connected) {
    return { ok: false, error: 'MQTT not connected' }
  }
  const controllerPrefix = 'oktopus/usp/v1/controller/'
  return new Promise((resolve) => {
    let settled = false
    let timer = null
    // Single exit point: always stop the timer and detach the listener, so a
    // probe that times out does not leave a 'message' handler behind.
    const finish = (result) => {
      if (settled) return
      settled = true
      clearTimeout(timer)
      client.removeListener('message', handler)
      resolve(result)
    }
    const handler = (topic, payload, pkt) => {
      // Only replies on the controller topic count. This client is also
      // subscribed to oktopus/usp/v1/agent/#, so the broker delivers our own
      // probe back to us — with the endpoint in its topic and the same
      // correlationData — and that echo must not be taken for an answer.
      if (!topic.startsWith(controllerPrefix)) return
      const fromDevice = topic.includes(endpointId) ||
        pkt?.properties?.correlationData?.toString() === endpointId
      if (!fromDevice) return
      onlineDevices.add(endpointId)
      // updateDeviceStatus logs its own failures; not awaited so the probe
      // resolves as soon as the reply arrives.
      void updateDeviceStatus(endpointId, 1)
      finish({ ok: true, endpointId, status: 'online' })
    }
    timer = setTimeout(() => {
      finish({ ok: false, endpointId, status: 'offline', note: `No response within ${timeoutMs / 1000}s` })
    }, timeoutMs)
    client.on('message', handler)
    // Send a lightweight USP message via the agent topic
    client.publish(`oktopus/usp/v1/agent/${endpointId}`, Buffer.from(''), {
      qos: 1,
      properties: {
        correlationData: Buffer.from(endpointId),
        responseTopic: `oktopus/usp/v1/controller/${endpointId}`,
      },
    }, (err) => {
      if (err) finish({ ok: false, endpointId, error: err.message })
    })
  })
}
// Public API. Everything else in this module (client-ID parsing, MongoDB
// access, message handling, heartbeat) is internal.
module.exports = { start, getStatus, markOnline, probeDevice, publishOnlineStatus }