"""
flow_scheduler.py — Frappe scheduler tick for delayed Flow steps.

Purpose
───────
The Flow Runtime in targo-hub stores delayed steps as `Flow Step Pending`
rows with a `trigger_at` timestamp. This script (run every minute by a Frappe
cron hook) fetches rows whose `trigger_at <= now()` and nudges the hub to
advance the owning Flow Run.

How it's hooked
───────────────
In ERPNext, add to hooks.py (or the scheduler config):

    scheduler_events = {
        "cron": {
            "* * * * *": [
                "path.to.flow_scheduler.tick",
            ],
        },
    }

Or invoke it manually from the bench console for testing:

    docker exec -u frappe erpnext-backend-1 bash -c \\
      'cd /home/frappe/frappe-bench/sites && \\
       /home/frappe/frappe-bench/env/bin/python -c \\
       "import frappe; frappe.init(site=\\"erp.gigafibre.ca\\"); frappe.connect(); \\
        from flow_scheduler import tick; tick()"'

Behaviour
─────────
- Claim: up to BATCH_LIMIT due rows (status `pending`, `trigger_at <= now`,
  oldest first) are flipped `pending` → `running` and committed before
  anything is sent to the hub. The UPDATE only touches rows that are still
  `pending`.
- Fire: POSTs `{run, step_id, result}` to `HUB_URL/flow/complete`; the hub
  fires the kind handler and advances the run.
- On a failed POST: retry_count += 1 and the row goes back to `pending`
  (its `trigger_at` is unchanged, so it is retried on the next tick). Once
  MAX_RETRIES (5) failures are reached, it is marked `failed` instead.
- On success: status = `completed`, `executed_at` is set and `last_error`
  is cleared.

Each call only selects rows that are `pending` and already due, so extra
calls never pick up rows scheduled for the future.
"""

import os
import json
import time  # NOTE(review): not referenced anywhere in the visible module
import urllib.request
import urllib.error

import frappe

# ---------------------------------------------------------------------------
# Config — HUB_URL defaults to the hub's Docker compose network hostname.
# Override via the HUB_URL / HUB_INTERNAL_TOKEN environment variables in the
# ERPNext container if the hub moves or requires a bearer token.
# --------------------------------------------------------------------------- HUB_URL = os.environ.get("HUB_URL", "http://targo-hub:3300") INTERNAL_TOKEN = os.environ.get("HUB_INTERNAL_TOKEN", "") MAX_RETRIES = 5 BATCH_LIMIT = 50 # safety net: fire at most N pending steps per tick # --------------------------------------------------------------------------- # Public entry point — Frappe cron calls this # --------------------------------------------------------------------------- def tick(): """Process all due pending steps. Called every minute by Frappe cron.""" due = _claim_due_rows() if not due: return frappe.logger().info(f"[flow_scheduler] claimed {len(due)} due rows") for row in due: _fire_row(row) # --------------------------------------------------------------------------- # Claim phase: atomically flip due rows from pending → running # --------------------------------------------------------------------------- def _claim_due_rows(): """ Return all Flow Step Pending rows where: status = 'pending' AND trigger_at <= now() Flips them to 'running' in the same transaction to prevent double-firing. """ now = frappe.utils.now() rows = frappe.get_all( "Flow Step Pending", filters={ "status": "pending", "trigger_at": ["<=", now], }, fields=["name", "flow_run", "step_id", "retry_count"], limit=BATCH_LIMIT, order_by="trigger_at asc", ) if not rows: return [] # Optimistic claim: update in bulk. If it fails (concurrent tick), we # simply re-query; PostgreSQL serialisation will prevent double-runs. 
names = [r["name"] for r in rows] frappe.db.sql( """UPDATE `tabFlow Step Pending` SET status='running', modified=%s, modified_by=%s WHERE name IN %s AND status='pending'""", (now, "Administrator", tuple(names)), ) frappe.db.commit() return rows # --------------------------------------------------------------------------- # Fire phase: POST to Hub /flow/complete # --------------------------------------------------------------------------- def _fire_row(row): """Send a completion event to the Hub. Update row status on response.""" payload = { "run": row["flow_run"], "step_id": row["step_id"], "result": {"fired_by": "scheduler", "fired_at": frappe.utils.now()}, } try: _post_hub("/flow/complete", payload) _mark_done(row["name"]) except Exception as e: # noqa: BLE001 _handle_failure(row, str(e)) def _post_hub(path, body): data = json.dumps(body).encode("utf-8") req = urllib.request.Request( HUB_URL + path, data=data, method="POST", headers={"Content-Type": "application/json"}, ) if INTERNAL_TOKEN: req.add_header("Authorization", "Bearer " + INTERNAL_TOKEN) with urllib.request.urlopen(req, timeout=15) as resp: body = resp.read().decode("utf-8") if resp.status >= 400: raise RuntimeError(f"Hub HTTP {resp.status}: {body[:200]}") return body def _mark_done(row_name): frappe.db.set_value( "Flow Step Pending", row_name, { "status": "completed", "executed_at": frappe.utils.now(), "last_error": "", }, update_modified=False, ) frappe.db.commit() def _handle_failure(row, err_msg): retries = int(row.get("retry_count") or 0) + 1 final = retries >= MAX_RETRIES frappe.db.set_value( "Flow Step Pending", row["name"], { "status": "failed" if final else "pending", "retry_count": retries, "last_error": err_msg[:500], }, update_modified=False, ) frappe.db.commit() frappe.logger().error( f"[flow_scheduler] row={row['name']} retry={retries} err={err_msg[:200]}" )