gigafibre-fsm/services/roster-solver/solver.py
louispaulb f4138cdd75 Roster AI (planification) + prise de rendez-vous client
Solveur OR-Tools (services/roster-solver) : couverture, compétences,
équité, coût chargé, cadence/efficacité, capacité-par-job ; contraintes
dures/souples façon Timefold.

Hub (lib/roster.js) : génération via solveur, publication par réécriture
de semaine (anti-doublons), demande (effectif ou nb de jobs), cadence/coût/
compétences par tech, pause, congés (Tech Availability + approbation),
booking (slots roster-aware / fit 3-dispos / confirm) + portail public /book.
Réessai sur serialization failures frappe_pg ; appels ERP séquentiels.

Ops : page Planification (grille compacte « J8 », multi-shift, drag-select
+ undo/redo, modèles de semaine, éditeur cadence&coût, congés, SMS opt-in),
page Rendez-vous (répartiteur), jobColor tech en pause → tickets rouges.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-03 16:42:44 -04:00

225 lines
10 KiB
Python

"""
Roster AI — solveur d'optimisation d'horaires (OR-Tools CP-SAT).
Modèle de contraintes "façon Timefold" : contraintes DURES (faisabilité) +
contraintes SOUPLES (objectif pondéré). Agnostique du backend : on lui passe
des techniciens + disponibilités + besoins de couverture, il rend des
assignations optimales (+ un rapport de couverture dispo-vs-requis).
Entrée / sortie : voir README.md et sample_request.json.
"""
from __future__ import annotations
from datetime import date, timedelta
from ortools.sat.python import cp_model
FR_ABBR = ["lun", "mar", "mer", "jeu", "ven", "sam", "dim"] # date.weekday(): lun=0..dim=6
def _parse_date(s: str) -> date:
y, m, d = (int(x) for x in s.split("-"))
return date(y, m, d)
def fr_day(d: date) -> str:
return FR_ABBR[d.weekday()]
def solve_roster(req: dict) -> dict:
"""Résout un horaire. `req` = payload décrit dans le README. Retourne un dict."""
# ── Entrées ──────────────────────────────────────────────────────────────
horizon = req["horizon"]
start = _parse_date(horizon["start"])
n_days = int(horizon["days"])
days = [start + timedelta(days=i) for i in range(n_days)]
day_set = {d.isoformat() for d in days}
shifts = {s["id"]: s for s in req["shift_templates"]}
techs = req["technicians"]
tech_by_id = {t["id"]: t for t in techs}
W = {
"uncovered": 1000, # pénalité par poste non couvert (priorité #1)
"assignment": 8, # pénalité par assignation → couvrir le requis SANS sur-staffer
"efficiency": 3, # préférer les techs plus rapides (facteur temps bas)
"fairness": 5, # écart d'heures max-min
"cost": 1, # coût horaire
"preference": 8, # travailler un jour "préféré off"
"continuity": 4, # bonus si la zone == zone d'attache du tech
**(req.get("weights") or {}),
}
# ── Slots de couverture (date, shift, zone, requis, compétences) ──────────
# On ne garde que les slots dont la date est dans l'horizon et le shift connu.
slots = []
for c in req.get("coverage", []):
if c["date"] not in day_set or c["shift"] not in shifts:
continue
slots.append({
"date": c["date"],
"shift": c["shift"],
"zone": c.get("zone", ""),
"required": int(c.get("required") or 1),
"skills": set(c.get("required_skills") or []),
"hours": int(shifts[c["shift"]].get("hours") or 8),
})
model = cp_model.CpModel()
# ── Variables de décision : y[(tech_id, slot_idx)] ∈ {0,1} ────────────────
# On ne crée la variable QUE si le tech est qualifié (compétences) ET
# disponible (date pas dans unavailable). Élaguer réduit la taille du modèle.
y = {}
for ti, t in enumerate(techs):
tskills = set(t.get("skills", []))
unavailable = set(t.get("unavailable", []))
for si, slot in enumerate(slots):
if slot["date"] in unavailable:
continue
if not slot["skills"].issubset(tskills):
continue
y[(t["id"], si)] = model.NewBoolVar(f"y_{t['id']}_{si}")
# ── Contrainte DURE : ≤ 1 shift par tech par jour ─────────────────────────
for t in techs:
by_date = {}
for si, slot in enumerate(slots):
if (t["id"], si) in y:
by_date.setdefault(slot["date"], []).append(y[(t["id"], si)])
for _d, vars_ in by_date.items():
if len(vars_) > 1:
model.AddAtMostOne(vars_)
# ── Heures + jours travaillés par tech (pour DURES max + SOUPLE équité) ───
tech_hours = {}
for t in techs:
terms = []
days_vars = []
for si, slot in enumerate(slots):
if (t["id"], si) in y:
terms.append(slot["hours"] * y[(t["id"], si)])
days_vars.append(y[(t["id"], si)])
h = model.NewIntVar(0, 24 * n_days, f"hours_{t['id']}")
model.Add(h == (sum(terms) if terms else 0)) # parenthèses: sinon model.Add(0) quand terms vide
tech_hours[t["id"]] = h
# DURE : max heures / semaine
if t.get("max_hours_week") is not None:
model.Add(h <= int(t["max_hours_week"]))
# DURE : max jours (≤1/jour, donc somme des y = nb de jours)
if t.get("max_days") is not None and days_vars:
model.Add(sum(days_vars) <= int(t["max_days"]))
# ── Couverture SOUPLE : shortfall[slot] = postes manquants (≥0) ───────────
shortfalls = []
coverage_report = []
for si, slot in enumerate(slots):
assigned_vars = [y[(t["id"], si)] for t in techs if (t["id"], si) in y]
short = model.NewIntVar(0, slot["required"], f"short_{si}")
# sum(assigned) + short >= required → short absorbe le manque
model.Add(sum(assigned_vars) + short >= slot["required"])
shortfalls.append(short)
coverage_report.append((si, slot, assigned_vars, short))
# ── Équité SOUPLE : minimiser (max_h - min_h) parmi les techs actifs ──────
# On borne max_h et min_h sur tous les techs qui ont au moins une variable.
active = [t["id"] for t in techs if any((t["id"], si) in y for si in range(len(slots)))]
spread = model.NewIntVar(0, 24 * n_days, "spread")
if active:
max_h = model.NewIntVar(0, 24 * n_days, "max_h")
min_h = model.NewIntVar(0, 24 * n_days, "min_h")
for tid in active:
model.Add(max_h >= tech_hours[tid])
model.Add(min_h <= tech_hours[tid])
model.Add(spread == max_h - min_h)
else:
model.Add(spread == 0)
# ── Objectif pondéré ──────────────────────────────────────────────────────
obj = []
# 1) couverture (priorité écrasante)
for short in shortfalls:
obj.append(W["uncovered"] * short)
# 2) équité
obj.append(W["fairness"] * spread)
# 3) coût + 4) préférences + 5) continuité de zone
for ti, t in enumerate(techs):
cost_h = int(t.get("cost_per_h") or 0)
pref_off = set(t.get("preferred_off", []))
zone_home = t.get("zone_home")
tf = int(round((float(t.get("time_factor") or 1.0)) * 10)) # 10=normal, 11=+10% lent, 9=-10% rapide
for si, slot in enumerate(slots):
v = y.get((t["id"], si))
if v is None:
continue
# pénalité par assignation : couvrir le requis sans sur-staffer
obj.append(W["assignment"] * v)
# préférer les techs plus rapides (facteur temps bas)
obj.append(W["efficiency"] * tf * v)
# coût
if cost_h:
obj.append(W["cost"] * cost_h * slot["hours"] * v)
# préférence : pénalité si le jour est "préféré off"
if fr_day(_parse_date(slot["date"])) in pref_off:
obj.append(W["preference"] * v)
# continuité de zone : bonus (= pénalité négative) si même zone d'attache
if zone_home and slot["zone"] == zone_home:
obj.append(-W["continuity"] * v)
model.Minimize(sum(obj))
# ── Résolution ────────────────────────────────────────────────────────────
solver = cp_model.CpSolver()
solver.parameters.max_time_in_seconds = float(req.get("max_seconds") or 10)
solver.parameters.num_search_workers = 8
status = solver.Solve(model)
status_name = solver.StatusName(status)
if status not in (cp_model.OPTIMAL, cp_model.FEASIBLE):
return {"status": status_name, "assignments": [], "coverage_report": [],
"tech_hours": {}, "objective": None,
"message": "Aucune solution (contraintes dures incompatibles)."}
# ── Construire le résultat ────────────────────────────────────────────────
assignments = []
for ti, t in enumerate(techs):
for si, slot in enumerate(slots):
v = y.get((t["id"], si))
if v is not None and solver.Value(v) == 1:
assignments.append({
"tech": t["id"], "tech_name": t.get("name", t["id"]),
"date": slot["date"], "shift": slot["shift"],
"shift_name": shifts[slot["shift"]].get("name", slot["shift"]),
"zone": slot["zone"], "hours": slot["hours"],
})
cov = []
for si, slot, assigned_vars, short in coverage_report:
n_assigned = sum(solver.Value(v) for v in assigned_vars)
cov.append({
"date": slot["date"], "shift": slot["shift"], "zone": slot["zone"],
"required": slot["required"], "assigned": int(n_assigned),
"shortfall": int(solver.Value(short)),
})
hours_out = {tid: int(solver.Value(h)) for tid, h in tech_hours.items()}
# Explications lisibles (pour l'UI)
explanations = []
for tid in sorted(hours_out, key=lambda k: -hours_out[k]):
n_shifts = sum(1 for a in assignments if a["tech"] == tid)
if n_shifts:
explanations.append(f"{tech_by_id[tid].get('name', tid)} : {n_shifts} shift(s), {hours_out[tid]} h")
total_short = sum(c["shortfall"] for c in cov)
if total_short:
explanations.insert(0, f"⚠️ {total_short} poste(s) non couvert(s) — effectif insuffisant ce(s) jour(s).")
return {
"status": status_name,
"assignments": assignments,
"coverage_report": cov,
"tech_hours": hours_out,
"objective": solver.ObjectiveValue(),
"spread_hours": int(solver.Value(spread)),
"total_shortfall": total_short,
"explanations": explanations,
"solve_ms": int(solver.WallTime() * 1000),
}