Solveur OR-Tools (services/roster-solver) : couverture, compétences, équité, coût chargé, cadence/efficacité, capacité-par-job ; contraintes dures/souples façon Timefold. Hub (lib/roster.js) : génération via solveur, publication par réécriture de semaine (anti-doublons), demande (effectif ou nb de jobs), cadence/coût/ compétences par tech, pause, congés (Tech Availability + approbation), booking (slots roster-aware / fit 3-dispos / confirm) + portail public /book. Réessai sur serialization failures frappe_pg ; appels ERP séquentiels. Ops : page Planification (grille compacte « J8 », multi-shift, drag-select + undo/redo, modèles de semaine, éditeur cadence&coût, congés, SMS opt-in), page Rendez-vous (répartiteur), jobColor tech en pause → tickets rouges. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
225 lines
10 KiB
Python
225 lines
10 KiB
Python
"""
|
|
Roster AI — solveur d'optimisation d'horaires (OR-Tools CP-SAT).
|
|
|
|
Modèle de contraintes "façon Timefold" : contraintes DURES (faisabilité) +
|
|
contraintes SOUPLES (objectif pondéré). Agnostique du backend : on lui passe
|
|
des techniciens + disponibilités + besoins de couverture, il rend des
|
|
assignations optimales (+ un rapport de couverture dispo-vs-requis).
|
|
|
|
Entrée / sortie : voir README.md et sample_request.json.
|
|
"""
|
|
from __future__ import annotations
|
|
from datetime import date, timedelta
|
|
from ortools.sat.python import cp_model
|
|
|
|
FR_ABBR = ["lun", "mar", "mer", "jeu", "ven", "sam", "dim"] # date.weekday(): lun=0..dim=6
|
|
|
|
|
|
def _parse_date(s: str) -> date:
|
|
y, m, d = (int(x) for x in s.split("-"))
|
|
return date(y, m, d)
|
|
|
|
|
|
def fr_day(d: date) -> str:
|
|
return FR_ABBR[d.weekday()]
|
|
|
|
|
|
def solve_roster(req: dict) -> dict:
|
|
"""Résout un horaire. `req` = payload décrit dans le README. Retourne un dict."""
|
|
# ── Entrées ──────────────────────────────────────────────────────────────
|
|
horizon = req["horizon"]
|
|
start = _parse_date(horizon["start"])
|
|
n_days = int(horizon["days"])
|
|
days = [start + timedelta(days=i) for i in range(n_days)]
|
|
day_set = {d.isoformat() for d in days}
|
|
|
|
shifts = {s["id"]: s for s in req["shift_templates"]}
|
|
techs = req["technicians"]
|
|
tech_by_id = {t["id"]: t for t in techs}
|
|
|
|
W = {
|
|
"uncovered": 1000, # pénalité par poste non couvert (priorité #1)
|
|
"assignment": 8, # pénalité par assignation → couvrir le requis SANS sur-staffer
|
|
"efficiency": 3, # préférer les techs plus rapides (facteur temps bas)
|
|
"fairness": 5, # écart d'heures max-min
|
|
"cost": 1, # coût horaire
|
|
"preference": 8, # travailler un jour "préféré off"
|
|
"continuity": 4, # bonus si la zone == zone d'attache du tech
|
|
**(req.get("weights") or {}),
|
|
}
|
|
|
|
# ── Slots de couverture (date, shift, zone, requis, compétences) ──────────
|
|
# On ne garde que les slots dont la date est dans l'horizon et le shift connu.
|
|
slots = []
|
|
for c in req.get("coverage", []):
|
|
if c["date"] not in day_set or c["shift"] not in shifts:
|
|
continue
|
|
slots.append({
|
|
"date": c["date"],
|
|
"shift": c["shift"],
|
|
"zone": c.get("zone", "—"),
|
|
"required": int(c.get("required") or 1),
|
|
"skills": set(c.get("required_skills") or []),
|
|
"hours": int(shifts[c["shift"]].get("hours") or 8),
|
|
})
|
|
|
|
model = cp_model.CpModel()
|
|
|
|
# ── Variables de décision : y[(tech_id, slot_idx)] ∈ {0,1} ────────────────
|
|
# On ne crée la variable QUE si le tech est qualifié (compétences) ET
|
|
# disponible (date pas dans unavailable). Élaguer réduit la taille du modèle.
|
|
y = {}
|
|
for ti, t in enumerate(techs):
|
|
tskills = set(t.get("skills", []))
|
|
unavailable = set(t.get("unavailable", []))
|
|
for si, slot in enumerate(slots):
|
|
if slot["date"] in unavailable:
|
|
continue
|
|
if not slot["skills"].issubset(tskills):
|
|
continue
|
|
y[(t["id"], si)] = model.NewBoolVar(f"y_{t['id']}_{si}")
|
|
|
|
# ── Contrainte DURE : ≤ 1 shift par tech par jour ─────────────────────────
|
|
for t in techs:
|
|
by_date = {}
|
|
for si, slot in enumerate(slots):
|
|
if (t["id"], si) in y:
|
|
by_date.setdefault(slot["date"], []).append(y[(t["id"], si)])
|
|
for _d, vars_ in by_date.items():
|
|
if len(vars_) > 1:
|
|
model.AddAtMostOne(vars_)
|
|
|
|
# ── Heures + jours travaillés par tech (pour DURES max + SOUPLE équité) ───
|
|
tech_hours = {}
|
|
for t in techs:
|
|
terms = []
|
|
days_vars = []
|
|
for si, slot in enumerate(slots):
|
|
if (t["id"], si) in y:
|
|
terms.append(slot["hours"] * y[(t["id"], si)])
|
|
days_vars.append(y[(t["id"], si)])
|
|
h = model.NewIntVar(0, 24 * n_days, f"hours_{t['id']}")
|
|
model.Add(h == (sum(terms) if terms else 0)) # parenthèses: sinon model.Add(0) quand terms vide
|
|
tech_hours[t["id"]] = h
|
|
# DURE : max heures / semaine
|
|
if t.get("max_hours_week") is not None:
|
|
model.Add(h <= int(t["max_hours_week"]))
|
|
# DURE : max jours (≤1/jour, donc somme des y = nb de jours)
|
|
if t.get("max_days") is not None and days_vars:
|
|
model.Add(sum(days_vars) <= int(t["max_days"]))
|
|
|
|
# ── Couverture SOUPLE : shortfall[slot] = postes manquants (≥0) ───────────
|
|
shortfalls = []
|
|
coverage_report = []
|
|
for si, slot in enumerate(slots):
|
|
assigned_vars = [y[(t["id"], si)] for t in techs if (t["id"], si) in y]
|
|
short = model.NewIntVar(0, slot["required"], f"short_{si}")
|
|
# sum(assigned) + short >= required → short absorbe le manque
|
|
model.Add(sum(assigned_vars) + short >= slot["required"])
|
|
shortfalls.append(short)
|
|
coverage_report.append((si, slot, assigned_vars, short))
|
|
|
|
# ── Équité SOUPLE : minimiser (max_h - min_h) parmi les techs actifs ──────
|
|
# On borne max_h et min_h sur tous les techs qui ont au moins une variable.
|
|
active = [t["id"] for t in techs if any((t["id"], si) in y for si in range(len(slots)))]
|
|
spread = model.NewIntVar(0, 24 * n_days, "spread")
|
|
if active:
|
|
max_h = model.NewIntVar(0, 24 * n_days, "max_h")
|
|
min_h = model.NewIntVar(0, 24 * n_days, "min_h")
|
|
for tid in active:
|
|
model.Add(max_h >= tech_hours[tid])
|
|
model.Add(min_h <= tech_hours[tid])
|
|
model.Add(spread == max_h - min_h)
|
|
else:
|
|
model.Add(spread == 0)
|
|
|
|
# ── Objectif pondéré ──────────────────────────────────────────────────────
|
|
obj = []
|
|
# 1) couverture (priorité écrasante)
|
|
for short in shortfalls:
|
|
obj.append(W["uncovered"] * short)
|
|
# 2) équité
|
|
obj.append(W["fairness"] * spread)
|
|
# 3) coût + 4) préférences + 5) continuité de zone
|
|
for ti, t in enumerate(techs):
|
|
cost_h = int(t.get("cost_per_h") or 0)
|
|
pref_off = set(t.get("preferred_off", []))
|
|
zone_home = t.get("zone_home")
|
|
tf = int(round((float(t.get("time_factor") or 1.0)) * 10)) # 10=normal, 11=+10% lent, 9=-10% rapide
|
|
for si, slot in enumerate(slots):
|
|
v = y.get((t["id"], si))
|
|
if v is None:
|
|
continue
|
|
# pénalité par assignation : couvrir le requis sans sur-staffer
|
|
obj.append(W["assignment"] * v)
|
|
# préférer les techs plus rapides (facteur temps bas)
|
|
obj.append(W["efficiency"] * tf * v)
|
|
# coût
|
|
if cost_h:
|
|
obj.append(W["cost"] * cost_h * slot["hours"] * v)
|
|
# préférence : pénalité si le jour est "préféré off"
|
|
if fr_day(_parse_date(slot["date"])) in pref_off:
|
|
obj.append(W["preference"] * v)
|
|
# continuité de zone : bonus (= pénalité négative) si même zone d'attache
|
|
if zone_home and slot["zone"] == zone_home:
|
|
obj.append(-W["continuity"] * v)
|
|
model.Minimize(sum(obj))
|
|
|
|
# ── Résolution ────────────────────────────────────────────────────────────
|
|
solver = cp_model.CpSolver()
|
|
solver.parameters.max_time_in_seconds = float(req.get("max_seconds") or 10)
|
|
solver.parameters.num_search_workers = 8
|
|
status = solver.Solve(model)
|
|
status_name = solver.StatusName(status)
|
|
|
|
if status not in (cp_model.OPTIMAL, cp_model.FEASIBLE):
|
|
return {"status": status_name, "assignments": [], "coverage_report": [],
|
|
"tech_hours": {}, "objective": None,
|
|
"message": "Aucune solution (contraintes dures incompatibles)."}
|
|
|
|
# ── Construire le résultat ────────────────────────────────────────────────
|
|
assignments = []
|
|
for ti, t in enumerate(techs):
|
|
for si, slot in enumerate(slots):
|
|
v = y.get((t["id"], si))
|
|
if v is not None and solver.Value(v) == 1:
|
|
assignments.append({
|
|
"tech": t["id"], "tech_name": t.get("name", t["id"]),
|
|
"date": slot["date"], "shift": slot["shift"],
|
|
"shift_name": shifts[slot["shift"]].get("name", slot["shift"]),
|
|
"zone": slot["zone"], "hours": slot["hours"],
|
|
})
|
|
|
|
cov = []
|
|
for si, slot, assigned_vars, short in coverage_report:
|
|
n_assigned = sum(solver.Value(v) for v in assigned_vars)
|
|
cov.append({
|
|
"date": slot["date"], "shift": slot["shift"], "zone": slot["zone"],
|
|
"required": slot["required"], "assigned": int(n_assigned),
|
|
"shortfall": int(solver.Value(short)),
|
|
})
|
|
|
|
hours_out = {tid: int(solver.Value(h)) for tid, h in tech_hours.items()}
|
|
|
|
# Explications lisibles (pour l'UI)
|
|
explanations = []
|
|
for tid in sorted(hours_out, key=lambda k: -hours_out[k]):
|
|
n_shifts = sum(1 for a in assignments if a["tech"] == tid)
|
|
if n_shifts:
|
|
explanations.append(f"{tech_by_id[tid].get('name', tid)} : {n_shifts} shift(s), {hours_out[tid]} h")
|
|
total_short = sum(c["shortfall"] for c in cov)
|
|
if total_short:
|
|
explanations.insert(0, f"⚠️ {total_short} poste(s) non couvert(s) — effectif insuffisant ce(s) jour(s).")
|
|
|
|
return {
|
|
"status": status_name,
|
|
"assignments": assignments,
|
|
"coverage_report": cov,
|
|
"tech_hours": hours_out,
|
|
"objective": solver.ObjectiveValue(),
|
|
"spread_hours": int(solver.Value(spread)),
|
|
"total_shortfall": total_short,
|
|
"explanations": explanations,
|
|
"solve_ms": int(solver.WallTime() * 1000),
|
|
}
|