gigafibre-fsm/services/targo-hub/server.js
louispaulb 4693bcf60c feat: telephony UI, performance indexes, Twilio softphone, lazy-load invoices
- Add PostgreSQL performance indexes migration script (1000x faster queries)
  Sales Invoice: 1,248ms → 28ms, Payment Entry: 443ms → 31ms
  Indexes on customer/party columns for all major tables
- Disable 3CX poller (PBX_ENABLED flag, using Twilio instead)
- Add TelephonyPage: full CRUD UI for Routr/Fonoster resources
  (trunks, agents, credentials, numbers, domains, peers)
- Add PhoneModal + usePhone composable (Twilio WebRTC softphone)
- Lazy-load invoices/payments (initial 5, expand on demand)
- Parallelize all API calls in ClientDetailPage (no waterfall)
- Add targo-hub service (SSE relay, SMS, voice, telephony API)
- Customer portal: invoice detail, ticket detail, messages pages
- Remove dead Ollama nginx upstream

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-02 13:59:59 -04:00

1039 lines
37 KiB
JavaScript

/**
* targo-hub — Lightweight SSE relay for real-time Communication events.
*
* Endpoints:
* GET /sse?topics=customer:C-LPB4,customer:C-114796350603272
* → SSE stream, authenticated via X-Authentik-Email header (Traefik injects it)
*
* POST /broadcast
* → Push an event to all SSE clients subscribed to matching topics
* → Auth: Bearer token (HUB_INTERNAL_TOKEN)
* → Body: { topic, event, data }
*
* POST /webhook/twilio/sms-incoming
* → Receive Twilio inbound SMS, log to ERPNext, broadcast SSE
*
* POST /webhook/twilio/sms-status
* → Receive Twilio delivery status updates
*
* GET /health
* → Health check
*
* 3CX Call Log Poller:
* → Polls 3CX xAPI every 30s for new completed calls
* → Logs to ERPNext Communication, broadcasts SSE
*/
const http = require('http')
const https = require('https')
const { URL } = require('url')
// ── Config ──
const PORT = parseInt(process.env.PORT || '3300', 10)
const INTERNAL_TOKEN = process.env.HUB_INTERNAL_TOKEN || ''
const ERP_URL = process.env.ERP_URL || 'http://erpnext-backend:8000'
const ERP_TOKEN = process.env.ERP_TOKEN || ''
const TWILIO_ACCOUNT_SID = process.env.TWILIO_ACCOUNT_SID || ''
const TWILIO_AUTH_TOKEN = process.env.TWILIO_AUTH_TOKEN || ''
const TWILIO_FROM = process.env.TWILIO_FROM || ''
// 3CX Config (DISABLED by default — set PBX_ENABLED=1 to re-enable)
const PBX_ENABLED = process.env.PBX_ENABLED === '1'
const PBX_URL = process.env.PBX_URL || 'https://targopbx.3cx.ca'
const PBX_USER = process.env.PBX_USER || ''
const PBX_PASS = process.env.PBX_PASS || ''
const PBX_POLL_INTERVAL = parseInt(process.env.PBX_POLL_INTERVAL || '30000', 10) // 30s
// Twilio Voice Config
const TWILIO_API_KEY = process.env.TWILIO_API_KEY || ''
const TWILIO_API_SECRET = process.env.TWILIO_API_SECRET || ''
const TWILIO_TWIML_APP_SID = process.env.TWILIO_TWIML_APP_SID || ''
// Fonoster/Routr DB Config (direct PostgreSQL access)
const ROUTR_DB_URL = process.env.ROUTR_DB_URL || ''
const FNIDENTITY_DB_URL = process.env.FNIDENTITY_DB_URL || ''
// ── SSE Client Registry ──
// Map<topic, Set<{ res, email }>>
const subscribers = new Map()
let clientIdSeq = 0
function addClient (topics, res, email) {
const id = ++clientIdSeq
const client = { id, res, email, topics }
for (const t of topics) {
if (!subscribers.has(t)) subscribers.set(t, new Set())
subscribers.get(t).add(client)
}
// Remove on disconnect
res.on('close', () => {
for (const t of topics) {
const set = subscribers.get(t)
if (set) {
set.delete(client)
if (set.size === 0) subscribers.delete(t)
}
}
log(`SSE client #${id} disconnected (${email})`)
})
log(`SSE client #${id} connected (${email}) topics=[${topics.join(',')}]`)
return id
}
function broadcast (topic, event, data) {
const set = subscribers.get(topic)
if (!set || set.size === 0) return 0
const payload = `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`
let count = 0
for (const client of set) {
try {
client.res.write(payload)
count++
} catch {
// Client gone, will be cleaned up on close
}
}
return count
}
function broadcastAll (event, data) {
// Broadcast to ALL connected clients regardless of topic
const sent = new Set()
let count = 0
for (const [, set] of subscribers) {
for (const client of set) {
if (!sent.has(client.id)) {
sent.add(client.id)
try {
client.res.write(`event: ${event}\ndata: ${JSON.stringify(data)}\n\n`)
count++
} catch {}
}
}
}
return count
}
// ── ERPNext helpers ──
const ERP_SITE = process.env.ERP_SITE || 'erp.gigafibre.ca'
function erpFetch (path, opts = {}) {
const url = ERP_URL + path
return new Promise((resolve, reject) => {
const parsed = new URL(url)
const reqOpts = {
hostname: parsed.hostname,
port: parsed.port || 8000,
path: parsed.pathname + parsed.search,
method: opts.method || 'GET',
headers: {
Host: ERP_SITE, // Required: Frappe multi-tenant routing needs the site name
Authorization: 'token ' + ERP_TOKEN,
'Content-Type': 'application/json',
...opts.headers,
},
}
const req = http.request(reqOpts, (res) => {
let body = ''
res.on('data', c => body += c)
res.on('end', () => {
try { resolve({ status: res.statusCode, data: JSON.parse(body) }) }
catch { resolve({ status: res.statusCode, data: body }) }
})
})
req.on('error', reject)
if (opts.body) req.write(typeof opts.body === 'string' ? opts.body : JSON.stringify(opts.body))
req.end()
})
}
async function lookupCustomerByPhone (phone) {
// Normalize to last 10 digits
const digits = phone.replace(/\D/g, '').slice(-10)
const fields = JSON.stringify(['name', 'customer_name', 'cell_phone', 'tel_home', 'tel_office'])
// Search across all phone fields: cell_phone, tel_home, tel_office
for (const field of ['cell_phone', 'tel_home', 'tel_office']) {
const filters = JSON.stringify([[field, 'like', '%' + digits]])
const path = `/api/resource/Customer?filters=${encodeURIComponent(filters)}&fields=${encodeURIComponent(fields)}&limit_page_length=1`
try {
const res = await erpFetch(path)
if (res.status === 200 && res.data.data && res.data.data.length > 0) {
return res.data.data[0]
}
} catch (e) {
log('lookupCustomerByPhone error on ' + field + ':', e.message)
}
}
return null
}
async function createCommunication (fields) {
return erpFetch('/api/resource/Communication', {
method: 'POST',
body: JSON.stringify(fields),
})
}
// ── Request body parser ──
function parseBody (req) {
return new Promise((resolve, reject) => {
const chunks = []
req.on('data', c => chunks.push(c))
req.on('end', () => {
const raw = Buffer.concat(chunks).toString()
const ct = (req.headers['content-type'] || '').toLowerCase()
if (ct.includes('application/json')) {
try { resolve(JSON.parse(raw)) } catch { resolve({}) }
} else if (ct.includes('urlencoded')) {
// Twilio sends application/x-www-form-urlencoded
const params = new URLSearchParams(raw)
const obj = {}
for (const [k, v] of params) obj[k] = v
resolve(obj)
} else {
resolve(raw)
}
})
req.on('error', reject)
})
}
// ── Logging ──
function log (...args) {
console.log(`[${new Date().toISOString().slice(11, 19)}]`, ...args)
}
// ── HTTP Server ──
const server = http.createServer(async (req, res) => {
const url = new URL(req.url, `http://localhost:${PORT}`)
const path = url.pathname
const method = req.method
// CORS headers (for browser SSE + POST from ops/client apps)
res.setHeader('Access-Control-Allow-Origin', '*')
res.setHeader('Access-Control-Allow-Methods', 'GET, POST, PUT, DELETE, OPTIONS')
res.setHeader('Access-Control-Allow-Headers', 'Content-Type, Authorization, X-Authentik-Email, X-Authentik-Groups')
if (method === 'OPTIONS') {
res.writeHead(204)
return res.end()
}
try {
// ─── Health ───
if (path === '/health') {
const clientCount = new Set()
for (const [, set] of subscribers) for (const c of set) clientCount.add(c.id)
return json(res, 200, {
ok: true,
clients: clientCount.size,
topics: subscribers.size,
uptime: Math.floor(process.uptime()),
})
}
// ─── SSE endpoint ───
if (path === '/sse' && method === 'GET') {
const email = req.headers['x-authentik-email'] || 'anonymous'
const topicsParam = url.searchParams.get('topics') || ''
const topics = topicsParam.split(',').map(t => t.trim()).filter(Boolean)
if (!topics.length) {
return json(res, 400, { error: 'Missing topics parameter' })
}
// Set SSE headers
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
'X-Accel-Buffering': 'no', // Disable nginx/traefik buffering
})
// Send initial comment to establish connection
res.write(': connected\n\n')
const clientId = addClient(topics, res, email)
// Keepalive ping every 25s
const keepalive = setInterval(() => {
try { res.write(': ping\n\n') } catch { clearInterval(keepalive) }
}, 25000)
res.on('close', () => clearInterval(keepalive))
return
}
// ─── Broadcast (internal) ───
if (path === '/broadcast' && method === 'POST') {
// Auth check
const auth = req.headers.authorization || ''
if (INTERNAL_TOKEN && auth !== 'Bearer ' + INTERNAL_TOKEN) {
return json(res, 401, { error: 'Unauthorized' })
}
const body = await parseBody(req)
const { topic, event, data } = body
if (!topic || !event) {
return json(res, 400, { error: 'Missing topic or event' })
}
const count = broadcast(topic, event || 'message', data || {})
return json(res, 200, { ok: true, delivered: count })
}
// ─── Twilio SMS Incoming ───
if (path === '/webhook/twilio/sms-incoming' && method === 'POST') {
const body = await parseBody(req)
const from = body.From || ''
const to = body.To || ''
const text = body.Body || ''
const sid = body.MessageSid || ''
log(`SMS IN: ${from}${to}: ${text.substring(0, 50)}...`)
// Respond to Twilio immediately (they expect TwiML or 200)
res.writeHead(200, { 'Content-Type': 'text/xml' })
res.end('<Response></Response>')
// Process async: lookup customer, log, broadcast
setImmediate(async () => {
try {
const customer = await lookupCustomerByPhone(from)
const customerName = customer ? customer.name : null
const customerLabel = customer ? customer.customer_name : 'Inconnu'
// Log Communication in ERPNext
if (customerName) {
await createCommunication({
communication_type: 'Communication',
communication_medium: 'SMS',
sent_or_received: 'Received',
sender: 'sms@gigafibre.ca',
sender_full_name: customerLabel,
phone_no: from,
content: text,
subject: 'SMS from ' + from,
reference_doctype: 'Customer',
reference_name: customerName,
message_id: sid,
status: 'Open',
})
}
// Broadcast SSE event
const eventData = {
type: 'sms',
direction: 'in',
customer: customerName,
customer_name: customerLabel,
phone: from,
text,
sid,
ts: new Date().toISOString(),
}
let n = 0
if (customerName) {
n = broadcast('customer:' + customerName, 'message', eventData)
}
// Also broadcast to global topic for dispatch/monitoring
broadcastAll('sms-incoming', eventData)
log(`SMS logged: ${from}${customerName || 'UNKNOWN'} (${sid}) broadcast=${n}`)
} catch (e) {
log('SMS processing error:', e.message)
}
})
return
}
// ─── Twilio SMS Status ───
if (path === '/webhook/twilio/sms-status' && method === 'POST') {
const body = await parseBody(req)
const sid = body.MessageSid || body.SmsSid || ''
const status = body.MessageStatus || body.SmsStatus || ''
log(`SMS STATUS: ${sid}${status}`)
// Respond immediately
res.writeHead(200, { 'Content-Type': 'text/xml' })
res.end('<Response></Response>')
// Broadcast status update
setImmediate(() => {
broadcastAll('sms-status', { sid, status, ts: new Date().toISOString() })
})
return
}
// ─── Send SMS (outbound, called from ops/client apps) ───
if (path === '/send/sms' && method === 'POST') {
const body = await parseBody(req)
const { phone, message, customer } = body
if (!phone || !message) {
return json(res, 400, { error: 'Missing phone or message' })
}
// Send via Twilio
const twilioData = new URLSearchParams({
To: phone.startsWith('+') ? phone : '+' + phone,
From: TWILIO_FROM,
Body: message,
StatusCallback: (process.env.HUB_PUBLIC_URL || 'https://msg.gigafibre.ca') + '/webhook/twilio/sms-status',
})
const twilioRes = await new Promise((resolve, reject) => {
const authStr = Buffer.from(TWILIO_ACCOUNT_SID + ':' + TWILIO_AUTH_TOKEN).toString('base64')
const twilioReq = require('https').request({
hostname: 'api.twilio.com',
path: `/2010-04-01/Accounts/${TWILIO_ACCOUNT_SID}/Messages.json`,
method: 'POST',
headers: {
Authorization: 'Basic ' + authStr,
'Content-Type': 'application/x-www-form-urlencoded',
},
}, (res) => {
let body = ''
res.on('data', c => body += c)
res.on('end', () => {
try { resolve({ status: res.statusCode, data: JSON.parse(body) }) }
catch { resolve({ status: res.statusCode, data: body }) }
})
})
twilioReq.on('error', reject)
twilioReq.write(twilioData.toString())
twilioReq.end()
})
if (twilioRes.status >= 400) {
log('Twilio send error:', twilioRes.data)
return json(res, 502, { ok: false, error: 'Twilio error', details: twilioRes.data })
}
const sid = twilioRes.data.sid || ''
log(`SMS OUT: → ${phone}: ${message.substring(0, 50)}... (${sid})`)
// Log Communication in ERPNext
if (customer) {
await createCommunication({
communication_type: 'Communication',
communication_medium: 'SMS',
sent_or_received: 'Sent',
sender: 'sms@gigafibre.ca',
sender_full_name: 'Targo Ops',
phone_no: phone,
content: message,
subject: 'SMS to ' + phone,
reference_doctype: 'Customer',
reference_name: customer,
message_id: sid,
status: 'Linked',
})
// Broadcast SSE
broadcast('customer:' + customer, 'message', {
type: 'sms',
direction: 'out',
customer,
phone,
text: message,
sid,
ts: new Date().toISOString(),
})
}
return json(res, 200, { ok: true, sid })
}
// ─── Twilio Voice: Generate Access Token ───
if (path === '/voice/token' && method === 'GET') {
if (!TWILIO_API_KEY || !TWILIO_API_SECRET || !TWILIO_TWIML_APP_SID) {
return json(res, 503, { error: 'Twilio Voice not configured' })
}
const identity = url.searchParams.get('identity') || req.headers['x-authentik-email'] || 'ops-agent'
try {
const twilio = require('twilio')
const AccessToken = twilio.jwt.AccessToken
const VoiceGrant = AccessToken.VoiceGrant
const token = new AccessToken(
TWILIO_ACCOUNT_SID,
TWILIO_API_KEY,
TWILIO_API_SECRET,
{ identity, ttl: 3600 }
)
const grant = new VoiceGrant({
outgoingApplicationSid: TWILIO_TWIML_APP_SID,
incomingAllow: true,
})
token.addGrant(grant)
log(`Voice token generated for ${identity}`)
return json(res, 200, { token: token.toJwt(), identity })
} catch (e) {
log('Voice token error:', e.message)
return json(res, 500, { error: 'Token generation failed: ' + e.message })
}
}
// ─── Fonoster SIP: Get SIP config for WebRTC client ───
if (path === '/voice/sip-config' && method === 'GET') {
// Return SIP credentials for the browser client
// These are configured via env vars or fetched from Fonoster API
const sipConfig = {
wssUrl: process.env.SIP_WSS_URL || 'wss://voice.gigafibre.ca:5063',
domain: process.env.SIP_DOMAIN || 'voice.gigafibre.ca',
extension: process.env.SIP_EXTENSION || '1001',
authId: process.env.SIP_AUTH_ID || '1001',
authPassword: process.env.SIP_AUTH_PASSWORD || '',
displayName: process.env.SIP_DISPLAY_NAME || 'Targo Ops',
identity: process.env.SIP_EXTENSION || '1001',
}
return json(res, 200, sipConfig)
}
// ─── Twilio Voice: TwiML for outbound calls ───
if (path === '/voice/twiml' && method === 'POST') {
const body = await parseBody(req)
const to = body.To || body.phone || ''
log(`Voice TwiML: dialing ${to}`)
// Return TwiML to dial the number
const callerId = TWILIO_FROM || '+14382313838'
const twiml = `<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Dial callerId="${callerId}" answerOnBridge="true" timeout="30">
<Number statusCallbackEvent="initiated ringing answered completed"
statusCallback="${(process.env.HUB_PUBLIC_URL || 'https://msg.gigafibre.ca')}/voice/status"
statusCallbackMethod="POST">${to}</Number>
</Dial>
</Response>`
res.writeHead(200, { 'Content-Type': 'text/xml' })
return res.end(twiml)
}
// ─── Twilio Voice: Call status callback ───
if (path === '/voice/status' && method === 'POST') {
const body = await parseBody(req)
const callSid = body.CallSid || ''
const callStatus = body.CallStatus || ''
const to = body.To || ''
const from = body.From || ''
const duration = parseInt(body.CallDuration || '0', 10)
log(`Voice STATUS: ${callSid} ${from}${to} status=${callStatus} dur=${duration}s`)
res.writeHead(200, { 'Content-Type': 'text/xml' })
res.end('<Response></Response>')
// On completed, log to ERPNext and broadcast
if (callStatus === 'completed' && duration > 0) {
setImmediate(async () => {
try {
const phone = to.startsWith('client:') ? from : to
const customer = await lookupCustomerByPhone(phone)
const customerName = customer ? customer.name : null
if (customerName) {
const durationMin = Math.floor(duration / 60)
const durationSec = duration % 60
const durationStr = `${durationMin}m${durationSec.toString().padStart(2, '0')}s`
await createCommunication({
communication_type: 'Communication',
communication_medium: 'Phone',
sent_or_received: 'Sent',
sender: 'sms@gigafibre.ca',
sender_full_name: 'Targo Ops',
phone_no: phone,
content: `Appel vers ${phone} — Duree: ${durationStr}`,
subject: `Appel vers ${phone}`,
reference_doctype: 'Customer',
reference_name: customerName,
status: 'Linked',
})
broadcast('customer:' + customerName, 'call-event', {
type: 'call', event: 'completed', direction: 'out',
customer: customerName, phone, duration,
call_id: callSid, ts: new Date().toISOString(),
})
log(`Voice logged: ${phone}${customerName} (${durationStr})`)
}
} catch (e) {
log('Voice status processing error:', e.message)
}
})
}
return
}
// ─── 3CX Call Event Webhook (CRM Integration) ───
if (path === '/webhook/3cx/call-event' && method === 'POST') {
const body = await parseBody(req)
// 3CX CRM Integration sends: event_type, call_id, direction, caller, callee, ext, duration, status, etc.
const eventType = body.event_type || body.EventType || body.type || ''
const callId = body.call_id || body.CallId || body.id || ''
const direction = (body.direction || body.Direction || '').toLowerCase() // inbound / outbound
const caller = body.caller || body.Caller || body.from || ''
const callee = body.callee || body.Callee || body.to || ''
const ext = body.ext || body.Extension || body.extension || ''
const duration = parseInt(body.duration || body.Duration || '0', 10)
const status = body.status || body.Status || eventType
// Determine the remote phone number (not the extension)
const isOutbound = direction === 'outbound' || direction === 'out'
const remotePhone = isOutbound ? callee : caller
log(`3CX ${eventType}: ${caller}${callee} ext=${ext} dir=${direction} dur=${duration}s status=${status} (${callId})`)
// Respond immediately
json(res, 200, { ok: true })
// Process async: lookup customer, log communication, broadcast SSE
setImmediate(async () => {
try {
// Only log completed calls (not ringing/answered events) to avoid duplicates
const isCallEnd = ['ended', 'completed', 'hangup', 'Notified', 'missed'].some(
s => eventType.toLowerCase().includes(s.toLowerCase()) || status.toLowerCase().includes(s.toLowerCase())
)
const customer = remotePhone ? await lookupCustomerByPhone(remotePhone) : null
const customerName = customer ? customer.name : null
const customerLabel = customer ? customer.customer_name : 'Inconnu'
// Log Communication in ERPNext for ended/completed calls
if (isCallEnd && customerName && duration > 0) {
const durationMin = Math.floor(duration / 60)
const durationSec = duration % 60
const durationStr = `${durationMin}m${durationSec.toString().padStart(2, '0')}s`
await createCommunication({
communication_type: 'Communication',
communication_medium: 'Phone',
sent_or_received: isOutbound ? 'Sent' : 'Received',
sender: 'sms@gigafibre.ca',
sender_full_name: isOutbound ? 'Targo Ops' : customerLabel,
phone_no: remotePhone,
content: `Appel ${isOutbound ? 'sortant vers' : 'entrant de'} ${remotePhone} — Duree: ${durationStr}`,
subject: `Appel ${isOutbound ? 'vers' : 'de'} ${remotePhone}`,
reference_doctype: 'Customer',
reference_name: customerName,
status: 'Linked',
})
}
// Broadcast SSE event for ALL call events (ringing, answered, ended)
const eventData = {
type: 'call',
event: eventType,
direction: isOutbound ? 'out' : 'in',
customer: customerName,
customer_name: customerLabel,
phone: remotePhone,
extension: ext,
duration,
status,
call_id: callId,
ts: new Date().toISOString(),
}
let n = 0
if (customerName) {
n = broadcast('customer:' + customerName, 'call-event', eventData)
}
// Also broadcast globally for dispatch monitoring
broadcastAll('call-event', eventData)
log(`3CX logged: ${remotePhone}${customerName || 'UNKNOWN'} (${callId}) broadcast=${n}`)
} catch (e) {
log('3CX call processing error:', e.message)
}
})
return
}
// ─── Fonoster Telephony Management API ───
// All /telephony/* endpoints manage the Fonoster CPaaS (trunks, agents, credentials, numbers, domains)
if (path.startsWith('/telephony/')) {
return handleTelephony(req, res, method, path, url)
}
// ─── 404 ───
json(res, 404, { error: 'Not found' })
} catch (e) {
log('ERROR:', e.message)
json(res, 500, { error: 'Internal error' })
}
})
function json (res, status, data) {
res.writeHead(status, { 'Content-Type': 'application/json' })
res.end(JSON.stringify(data))
}
// ── Fonoster Telephony Management (Direct DB) ──
// Access Routr + Fonoster identity PostgreSQL databases directly
let routrPool = null
let identityPool = null
function getRoutrPool () {
if (!routrPool) {
const { Pool } = require('pg')
routrPool = new Pool({ connectionString: ROUTR_DB_URL, max: 3 })
routrPool.on('error', e => log('Routr DB pool error:', e.message))
}
return routrPool
}
function getIdentityPool () {
if (!identityPool) {
const { Pool } = require('pg')
identityPool = new Pool({ connectionString: FNIDENTITY_DB_URL, max: 3 })
identityPool.on('error', e => log('Identity DB pool error:', e.message))
}
return identityPool
}
// Routr resource tables
const ROUTR_TABLES = {
trunks: 'trunks',
agents: 'agents',
credentials: 'credentials',
numbers: 'numbers',
domains: 'domains',
acls: 'access_control_lists',
peers: 'peers',
}
// Identity resource tables
const IDENTITY_TABLES = {
workspaces: 'workspaces',
users: 'users',
}
async function handleTelephony (req, res, method, path, url) {
try {
const parts = path.replace('/telephony/', '').split('/').filter(Boolean)
const resource = parts[0]
const ref = parts[1] || null
// GET /telephony/overview — Dashboard summary
if (resource === 'overview' && method === 'GET') {
const rPool = getRoutrPool()
const iPool = getIdentityPool()
const [trunks, agents, creds, numbers, domains, peers, workspaces] = await Promise.all([
rPool.query('SELECT COUNT(*) FROM trunks'),
rPool.query('SELECT COUNT(*) FROM agents'),
rPool.query('SELECT COUNT(*) FROM credentials'),
rPool.query('SELECT COUNT(*) FROM numbers'),
rPool.query('SELECT COUNT(*) FROM domains'),
rPool.query('SELECT COUNT(*) FROM peers'),
iPool.query('SELECT COUNT(*) FROM workspaces'),
])
return json(res, 200, {
trunks: parseInt(trunks.rows[0].count),
agents: parseInt(agents.rows[0].count),
credentials: parseInt(creds.rows[0].count),
numbers: parseInt(numbers.rows[0].count),
domains: parseInt(domains.rows[0].count),
peers: parseInt(peers.rows[0].count),
workspaces: parseInt(workspaces.rows[0].count),
})
}
// Determine which database
const isIdentity = IDENTITY_TABLES[resource]
const tableName = IDENTITY_TABLES[resource] || ROUTR_TABLES[resource]
if (!tableName) {
return json(res, 404, { error: `Unknown resource: ${resource}. Available: overview, ${[...Object.keys(ROUTR_TABLES), ...Object.keys(IDENTITY_TABLES)].join(', ')}` })
}
const pool = isIdentity ? getIdentityPool() : getRoutrPool()
// GET /telephony/{resource} — List all
if (method === 'GET' && !ref) {
const limit = parseInt(url.searchParams.get('limit') || '100', 10)
const offset = parseInt(url.searchParams.get('offset') || '0', 10)
const result = await pool.query(
`SELECT * FROM ${tableName} ORDER BY created_at DESC LIMIT $1 OFFSET $2`,
[limit, offset]
)
const count = await pool.query(`SELECT COUNT(*) FROM ${tableName}`)
return json(res, 200, { items: result.rows, total: parseInt(count.rows[0].count, 10) })
}
// GET /telephony/{resource}/{ref} — Get by ref
if (method === 'GET' && ref) {
const result = await pool.query(`SELECT * FROM ${tableName} WHERE ref = $1`, [ref])
if (result.rows.length === 0) return json(res, 404, { error: 'Not found' })
return json(res, 200, result.rows[0])
}
// POST /telephony/{resource} — Create
if (method === 'POST' && !ref) {
const body = await parseBody(req)
// Generate ref if not provided
if (!body.ref) {
const crypto = require('crypto')
body.ref = crypto.randomUUID()
}
if (!body.api_version) body.api_version = 'v2'
if (!body.created_at) body.created_at = new Date()
if (!body.updated_at) body.updated_at = new Date()
const keys = Object.keys(body)
const values = Object.values(body)
const placeholders = keys.map((_, i) => `$${i + 1}`)
const query = `INSERT INTO ${tableName} (${keys.join(', ')}) VALUES (${placeholders.join(', ')}) RETURNING *`
const result = await pool.query(query, values)
log(`Telephony: created ${resource}/${result.rows[0].ref}`)
return json(res, 201, result.rows[0])
}
// PUT /telephony/{resource}/{ref} — Update
if (method === 'PUT' && ref) {
const body = await parseBody(req)
body.updated_at = new Date()
delete body.ref
delete body.created_at
const keys = Object.keys(body)
const values = Object.values(body)
const setClauses = keys.map((k, i) => `${k} = $${i + 1}`)
values.push(ref)
const query = `UPDATE ${tableName} SET ${setClauses.join(', ')} WHERE ref = $${values.length} RETURNING *`
const result = await pool.query(query, values)
if (result.rows.length === 0) return json(res, 404, { error: 'Not found' })
log(`Telephony: updated ${resource}/${ref}`)
return json(res, 200, result.rows[0])
}
// DELETE /telephony/{resource}/{ref} — Delete
if (method === 'DELETE' && ref) {
const result = await pool.query(`DELETE FROM ${tableName} WHERE ref = $1 RETURNING ref`, [ref])
if (result.rows.length === 0) return json(res, 404, { error: 'Not found' })
log(`Telephony: deleted ${resource}/${ref}`)
return json(res, 200, { ok: true, deleted: ref })
}
return json(res, 405, { error: 'Method not allowed' })
} catch (e) {
log('Telephony error:', e.message)
return json(res, 500, { error: e.message })
}
}
// ── 3CX Call Log Poller ──
// Polls 3CX xAPI for recently completed calls, logs new ones to ERPNext + SSE
let pbxToken = null
let pbxTokenExpiry = 0
const processedCallIds = new Set() // Track already-processed call IDs (in memory, survives poll cycles)
let pbxPollTimer = null
function httpsFetch (url, opts = {}) {
return new Promise((resolve, reject) => {
const parsed = new URL(url)
const reqOpts = {
hostname: parsed.hostname,
port: parsed.port || 443,
path: parsed.pathname + parsed.search,
method: opts.method || 'GET',
headers: opts.headers || {},
}
const req = https.request(reqOpts, (res) => {
let body = ''
res.on('data', c => body += c)
res.on('end', () => {
try { resolve({ status: res.statusCode, data: JSON.parse(body) }) }
catch { resolve({ status: res.statusCode, data: body }) }
})
})
req.on('error', reject)
if (opts.body) req.write(typeof opts.body === 'string' ? opts.body : JSON.stringify(opts.body))
req.end()
})
}
async function get3cxToken () {
if (pbxToken && Date.now() < pbxTokenExpiry - 60000) return pbxToken // 1min buffer
if (!PBX_USER || !PBX_PASS) return null
try {
const res = await httpsFetch(PBX_URL + '/webclient/api/Login/GetAccessToken', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ Username: PBX_USER, Password: PBX_PASS }),
})
if (res.data.Status === 'AuthSuccess') {
pbxToken = res.data.Token.access_token
pbxTokenExpiry = Date.now() + (res.data.Token.expires_in * 1000)
return pbxToken
}
log('3CX auth failed:', res.data.Status)
} catch (e) {
log('3CX auth error:', e.message)
}
return null
}
async function poll3cxCallLog () {
const token = await get3cxToken()
if (!token) return
try {
// Fetch last 20 calls from the last 5 minutes
const since = new Date(Date.now() - 5 * 60 * 1000).toISOString()
const url = `${PBX_URL}/xapi/v1/ReportCallLogData?$top=20&$orderby=StartTime%20desc&$filter=StartTime%20gt%20${encodeURIComponent(since)}`
const res = await httpsFetch(url, {
headers: { Authorization: 'Bearer ' + token },
})
if (res.status !== 200 || !res.data.value) return
for (const call of res.data.value) {
const callId = call.CdrId || call.CallHistoryId || ''
if (!callId || processedCallIds.has(callId)) continue
// Only process answered/completed calls (not segments still in progress)
if (!call.TalkingDuration && !call.Answered) continue
processedCallIds.add(callId)
// Parse call data
const direction = (call.Direction || '').toLowerCase().includes('inbound') ? 'in' : 'out'
const isOutbound = direction === 'out'
const sourcePhone = call.SourceCallerId || call.SourceDn || ''
const destPhone = call.DestinationCallerId || call.DestinationDn || ''
const remotePhone = isOutbound ? destPhone : sourcePhone
const ext = isOutbound ? call.SourceDn : call.DestinationDn
// Parse duration from ISO 8601 duration (PT1M30S) or seconds
let durationSec = 0
const td = call.TalkingDuration || ''
if (typeof td === 'string' && td.startsWith('PT')) {
const minMatch = td.match(/(\d+)M/)
const secMatch = td.match(/(\d+)S/)
const hrMatch = td.match(/(\d+)H/)
durationSec = (hrMatch ? parseInt(hrMatch[1]) * 3600 : 0) +
(minMatch ? parseInt(minMatch[1]) * 60 : 0) +
(secMatch ? parseInt(secMatch[1]) : 0)
} else if (typeof td === 'number') {
durationSec = td
}
// Skip very short/empty calls
if (durationSec < 3 && !call.Answered) continue
log(`3CX POLL: ${sourcePhone}${destPhone} dir=${direction} dur=${durationSec}s answered=${call.Answered} (${callId})`)
// Lookup customer and log
try {
const customer = remotePhone ? await lookupCustomerByPhone(remotePhone) : null
const customerName = customer ? customer.name : null
const customerLabel = customer ? customer.customer_name : 'Inconnu'
// Log Communication in ERPNext
if (customerName) {
const durationMin = Math.floor(durationSec / 60)
const durationS = durationSec % 60
const durationStr = `${durationMin}m${durationS.toString().padStart(2, '0')}s`
await createCommunication({
communication_type: 'Communication',
communication_medium: 'Phone',
sent_or_received: isOutbound ? 'Sent' : 'Received',
sender: 'sms@gigafibre.ca',
sender_full_name: isOutbound ? (call.SourceDisplayName || 'Targo Ops') : customerLabel,
phone_no: remotePhone,
content: `Appel ${isOutbound ? 'sortant vers' : 'entrant de'} ${remotePhone} — Duree: ${durationStr}${call.Answered ? '' : ' (manque)'}`,
subject: `Appel ${isOutbound ? 'vers' : 'de'} ${remotePhone}`,
reference_doctype: 'Customer',
reference_name: customerName,
status: 'Linked',
})
log(`3CX logged to ERPNext: ${remotePhone}${customerName} (${durationStr})`)
}
// Broadcast SSE
const eventData = {
type: 'call',
event: 'completed',
direction,
customer: customerName,
customer_name: customerLabel,
phone: remotePhone,
extension: ext,
duration: durationSec,
answered: call.Answered,
call_id: callId,
ts: call.StartTime || new Date().toISOString(),
}
if (customerName) {
broadcast('customer:' + customerName, 'call-event', eventData)
}
broadcastAll('call-event', eventData)
} catch (e) {
log('3CX poll processing error:', e.message)
}
}
// Prune old processed IDs (keep last 500)
if (processedCallIds.size > 500) {
const arr = [...processedCallIds]
arr.splice(0, arr.length - 500)
processedCallIds.clear()
arr.forEach(id => processedCallIds.add(id))
}
} catch (e) {
log('3CX poll error:', e.message)
}
}
function start3cxPoller () {
if (!PBX_ENABLED) {
log('3CX poller: DISABLED (PBX_ENABLED != 1, using Twilio instead)')
return
}
if (!PBX_USER || !PBX_PASS) {
log('3CX poller: DISABLED (no PBX_USER/PBX_PASS)')
return
}
log(`3CX poller: ENABLED (every ${PBX_POLL_INTERVAL / 1000}s) → ${PBX_URL}`)
// Initial poll after 5s
setTimeout(poll3cxCallLog, 5000)
pbxPollTimer = setInterval(poll3cxCallLog, PBX_POLL_INTERVAL)
}
server.listen(PORT, '0.0.0.0', () => {
log(`targo-hub listening on :${PORT}`)
log(` SSE: GET /sse?topics=customer:C-LPB4`)
log(` Broadcast: POST /broadcast`)
log(` SMS In: POST /webhook/twilio/sms-incoming`)
log(` SMS Out: POST /send/sms`)
log(` 3CX Call: POST /webhook/3cx/call-event`)
log(` Voice Tkn: GET /voice/token`)
log(` Voice TML: POST /voice/twiml`)
log(` Voice Sts: POST /voice/status`)
log(` Health: GET /health`)
log(` Telephony: GET|POST|PUT|DELETE /telephony/{trunks,agents,credentials,numbers,domains,acls,peers,workspaces,users}/{ref?}`)
log(` Telephony: GET /telephony/overview`)
log(` Routr DB: ${ROUTR_DB_URL.replace(/:[^:@]+@/, ':***@')}`)
// Start 3CX poller
start3cxPoller()
})