gigafibre-fsm/erpnext/flow_scheduler.py
louispaulb 41d9b5f316 feat: flow editor, Gemini QR scanner with offline queue, dispatch planning v2
Major additions accumulated over 9 days — single commit per request.

Flow editor (new):
- Generic visual editor for step trees, usable by project wizard + agent flows
- PROJECT_KINDS / AGENT_KINDS catalogs decouple UI from domain
- Drag-and-drop reorder via vuedraggable with scope isolation per peer group
- Chain-aware depends_on rewrite on reorder (sequential only — DAGs preserved)
- Variable picker with per-applies_to catalog (Customer / Quotation /
  Service Contract / Issue / Subscription), insert + copy-clipboard modes
- trigger_condition helper with domain-specific JSONLogic examples
- Global FlowEditorDialog mounted once in MainLayout, Odoo inline pattern
- Server: targo-hub flow-runtime.js, flow-api.js, flow-templates.js
- ERPNext: Flow Template/Run doctypes, scheduler, 5 seeded system templates
- depends_on chips resolve to step labels instead of opaque "s4" ids
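The chain-aware rewrite can be sketched as follows — a hypothetical helper (names and the step-dict shape are assumptions, not the actual editor code) showing the rule "rewrite `depends_on` only when the peer group was a pure sequential chain; leave DAGs untouched":

```python
def rewrite_chain_depends(steps):
    """Rewrite depends_on after a drag-and-drop reorder, but only when the
    group was a pure sequential chain; anything with branching (a DAG) is
    preserved as-is. `steps` is the post-reorder list of dicts with
    'id' and 'depends_on' (list of step ids) keys."""
    ids = [s["id"] for s in steps]
    deps = {s["id"]: set(s.get("depends_on") or []) for s in steps}
    # A sequential chain: every step has at most one dependency, and
    # exactly one step (the head) has none.
    is_chain = (
        all(len(d) <= 1 for d in deps.values())
        and sum(len(d) for d in deps.values()) == len(steps) - 1
    )
    if not is_chain:
        return steps  # DAG: preserve existing edges
    for i, step in enumerate(steps):
        step["depends_on"] = [] if i == 0 else [ids[i - 1]]
    return steps
```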

QR/OCR scanner (field app):
- Camera capture → Gemini Vision via targo-hub with 8s timeout
- IndexedDB offline queue retries photos when signal returns
- Watcher merges late-arriving scan results into the live UI

Dispatch:
- Planning mode (draft → publish) with offer pool for unassigned jobs
- Shared presets, recurrence selector, suggested-slots dialog
- PublishScheduleModal, unassign confirmation

Ops app:
- ClientDetailPage composables extraction (useClientData, useDeviceStatus,
  useWifiDiagnostic, useModemDiagnostic)
- Project wizard: shared detail sections, wizard catalog/publish composables
- Address pricing composable + pricing-mock data
- Settings redesign hosting flow templates

Targo-hub:
- Contract acceptance (JWT residential + DocuSeal commercial tracks)
- Referral system
- Modem-bridge diagnostic normalizer
- Device extractors consolidated

Migration scripts:
- Invoice/quote print format setup, Jinja rendering
- Additional import + fix scripts (reversals, dates, customers, payments)

Docs:
- Consolidated: old scattered MDs → HANDOFF, ARCHITECTURE, DATA_AND_FLOWS,
  FLOW_EDITOR_ARCHITECTURE, BILLING_AND_PAYMENTS, CPE_MANAGEMENT,
  APP_DESIGN_GUIDELINES
- Archived legacy wizard PHP for reference
- STATUS snapshots for 2026-04-18/19

Cleanup:
- Removed ~40 generated PDFs/HTMLs (invoice_preview*, rendered_jinja*)
- .gitignore now covers invoice preview output + nested .DS_Store

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-22 10:44:17 -04:00


"""
flow_scheduler.py — Frappe scheduler tick for delayed Flow steps.
Purpose
───────
The Flow Runtime in targo-hub stores delayed steps as `Flow Step Pending`
rows with a `trigger_at` timestamp. This script (run every minute by a
Frappe cron hook) fetches rows whose `trigger_at <= now()` and nudges the
hub to advance the owning Flow Run.
How it's hooked
───────────────
In ERPNext, add to hooks.py (or scheduler config):
scheduler_events = {
"cron": {
"* * * * *": [
"path.to.flow_scheduler.tick",
],
},
}
Or invoke manually from bench console for testing:
docker exec -u frappe erpnext-backend-1 bash -c \\
'cd /home/frappe/frappe-bench/sites && \\
/home/frappe/frappe-bench/env/bin/python -c \\
"import frappe; frappe.init(site=\\"erp.gigafibre.ca\\"); frappe.connect(); \\
from flow_scheduler import tick; tick()"'
Behaviour
─────────
- Atomic status flip: `pending` → `running` under a transaction so two
ticks can't double-fire.
- POSTs to `HUB_URL/flow/complete` with `{run, step_id}`. Hub fires the
kind handler and advances the run.
- On hub error: retry_count += 1, status stays `pending` until 5 retries
exhausted (then marked `failed`).
- On success: status = `completed`.
Idempotent: safe to call multiple times — only truly due rows are fired.
"""
import os
import json
import urllib.request
import urllib.error
import frappe

# ---------------------------------------------------------------------------
# Config — these default to the Docker compose network hostname.
# Override via environment variables in the ERPNext container if the hub
# URL changes.
# ---------------------------------------------------------------------------
HUB_URL = os.environ.get("HUB_URL", "http://targo-hub:3300")
INTERNAL_TOKEN = os.environ.get("HUB_INTERNAL_TOKEN", "")
MAX_RETRIES = 5
BATCH_LIMIT = 50  # safety net: fire at most N pending steps per tick


# ---------------------------------------------------------------------------
# Public entry point — Frappe cron calls this
# ---------------------------------------------------------------------------
def tick():
    """Process all due pending steps. Called every minute by Frappe cron."""
    due = _claim_due_rows()
    if not due:
        return
    frappe.logger().info(f"[flow_scheduler] claimed {len(due)} due rows")
    for row in due:
        _fire_row(row)
# ---------------------------------------------------------------------------
# Claim phase: atomically flip due rows from pending → running
# ---------------------------------------------------------------------------
def _claim_due_rows():
    """
    Return all Flow Step Pending rows where:
        status = 'pending' AND trigger_at <= now()
    Flips them to 'running' in the same transaction to prevent double-firing.
    """
    now = frappe.utils.now()
    # Locking read: FOR UPDATE holds row locks until commit, so a concurrent
    # tick blocks here and, once we commit, re-reads the rows as 'running'
    # and skips them. A plain get_all() + bulk UPDATE would let two ticks
    # select the same rows and double-fire, since neither checks who won.
    rows = frappe.db.sql(
        """SELECT name, flow_run, step_id, retry_count
           FROM `tabFlow Step Pending`
           WHERE status='pending' AND trigger_at <= %s
           ORDER BY trigger_at ASC
           LIMIT %s
           FOR UPDATE""",
        (now, BATCH_LIMIT),
        as_dict=True,
    )
    if not rows:
        frappe.db.commit()  # release locks even when nothing is due
        return []
    names = [r["name"] for r in rows]
    frappe.db.sql(
        """UPDATE `tabFlow Step Pending`
           SET status='running', modified=%s, modified_by=%s
           WHERE name IN %s AND status='pending'""",
        (now, "Administrator", tuple(names)),
    )
    frappe.db.commit()
    return rows
# ---------------------------------------------------------------------------
# Fire phase: POST to Hub /flow/complete
# ---------------------------------------------------------------------------
def _fire_row(row):
    """Send a completion event to the Hub. Update row status on response."""
    payload = {
        "run": row["flow_run"],
        "step_id": row["step_id"],
        "result": {"fired_by": "scheduler", "fired_at": frappe.utils.now()},
    }
    try:
        _post_hub("/flow/complete", payload)
        _mark_done(row["name"])
    except Exception as e:  # noqa: BLE001
        _handle_failure(row, str(e))


def _post_hub(path, body):
    data = json.dumps(body).encode("utf-8")
    req = urllib.request.Request(
        HUB_URL + path,
        data=data,
        method="POST",
        headers={"Content-Type": "application/json"},
    )
    if INTERNAL_TOKEN:
        req.add_header("Authorization", "Bearer " + INTERNAL_TOKEN)
    # Note: urlopen raises urllib.error.HTTPError for 4xx/5xx before we get
    # here, so the status check below is a defensive fallback only.
    with urllib.request.urlopen(req, timeout=15) as resp:
        text = resp.read().decode("utf-8")
        if resp.status >= 400:
            raise RuntimeError(f"Hub HTTP {resp.status}: {text[:200]}")
        return text
def _mark_done(row_name):
    frappe.db.set_value(
        "Flow Step Pending",
        row_name,
        {
            "status": "completed",
            "executed_at": frappe.utils.now(),
            "last_error": "",
        },
        update_modified=False,
    )
    frappe.db.commit()


def _handle_failure(row, err_msg):
    retries = int(row.get("retry_count") or 0) + 1
    final = retries >= MAX_RETRIES
    frappe.db.set_value(
        "Flow Step Pending",
        row["name"],
        {
            "status": "failed" if final else "pending",
            "retry_count": retries,
            "last_error": err_msg[:500],
        },
        update_modified=False,
    )
    frappe.db.commit()
    frappe.logger().error(
        f"[flow_scheduler] row={row['name']} retry={retries} err={err_msg[:200]}"
    )