""" Roster AI — solveur d'optimisation d'horaires (OR-Tools CP-SAT). Modèle de contraintes "façon Timefold" : contraintes DURES (faisabilité) + contraintes SOUPLES (objectif pondéré). Agnostique du backend : on lui passe des techniciens + disponibilités + besoins de couverture, il rend des assignations optimales (+ un rapport de couverture dispo-vs-requis). Entrée / sortie : voir README.md et sample_request.json. """ from __future__ import annotations from datetime import date, timedelta from ortools.sat.python import cp_model FR_ABBR = ["lun", "mar", "mer", "jeu", "ven", "sam", "dim"] # date.weekday(): lun=0..dim=6 def _parse_date(s: str) -> date: y, m, d = (int(x) for x in s.split("-")) return date(y, m, d) def fr_day(d: date) -> str: return FR_ABBR[d.weekday()] def solve_roster(req: dict) -> dict: """Résout un horaire. `req` = payload décrit dans le README. Retourne un dict.""" # ── Entrées ────────────────────────────────────────────────────────────── horizon = req["horizon"] start = _parse_date(horizon["start"]) n_days = int(horizon["days"]) days = [start + timedelta(days=i) for i in range(n_days)] day_set = {d.isoformat() for d in days} shifts = {s["id"]: s for s in req["shift_templates"]} techs = req["technicians"] tech_by_id = {t["id"]: t for t in techs} W = { "uncovered": 1000, # pénalité par poste non couvert (priorité #1) "assignment": 8, # pénalité par assignation → couvrir le requis SANS sur-staffer "efficiency": 3, # préférer les techs plus rapides (facteur temps bas) "fairness": 5, # écart d'heures max-min "cost": 1, # coût horaire "preference": 8, # travailler un jour "préféré off" "continuity": 4, # bonus si la zone == zone d'attache du tech **(req.get("weights") or {}), } # ── Slots de couverture (date, shift, zone, requis, compétences) ────────── # On ne garde que les slots dont la date est dans l'horizon et le shift connu. slots = [] for c in req.get("coverage", []): if c["date"] not in day_set or c["shift"] not in shifts: continue slots.append({ "date": c["date"], "shift": c["shift"], "zone": c.get("zone", "—"), "required": int(c.get("required") or 1), "skills": set(c.get("required_skills") or []), "hours": int(shifts[c["shift"]].get("hours") or 8), }) model = cp_model.CpModel() # ── Variables de décision : y[(tech_id, slot_idx)] ∈ {0,1} ──────────────── # On ne crée la variable QUE si le tech est qualifié (compétences) ET # disponible (date pas dans unavailable). Élaguer réduit la taille du modèle. y = {} for ti, t in enumerate(techs): tskills = set(t.get("skills", [])) unavailable = set(t.get("unavailable", [])) for si, slot in enumerate(slots): if slot["date"] in unavailable: continue if not slot["skills"].issubset(tskills): continue y[(t["id"], si)] = model.NewBoolVar(f"y_{t['id']}_{si}") # ── Contrainte DURE : ≤ 1 shift par tech par jour ───────────────────────── for t in techs: by_date = {} for si, slot in enumerate(slots): if (t["id"], si) in y: by_date.setdefault(slot["date"], []).append(y[(t["id"], si)]) for _d, vars_ in by_date.items(): if len(vars_) > 1: model.AddAtMostOne(vars_) # ── Heures + jours travaillés par tech (pour DURES max + SOUPLE équité) ─── tech_hours = {} for t in techs: terms = [] days_vars = [] for si, slot in enumerate(slots): if (t["id"], si) in y: terms.append(slot["hours"] * y[(t["id"], si)]) days_vars.append(y[(t["id"], si)]) h = model.NewIntVar(0, 24 * n_days, f"hours_{t['id']}") model.Add(h == (sum(terms) if terms else 0)) # parenthèses: sinon model.Add(0) quand terms vide tech_hours[t["id"]] = h # DURE : max heures / semaine if t.get("max_hours_week") is not None: model.Add(h <= int(t["max_hours_week"])) # DURE : max jours (≤1/jour, donc somme des y = nb de jours) if t.get("max_days") is not None and days_vars: model.Add(sum(days_vars) <= int(t["max_days"])) # ── Couverture SOUPLE : shortfall[slot] = postes manquants (≥0) ─────────── shortfalls = [] coverage_report = [] for si, slot in enumerate(slots): assigned_vars = [y[(t["id"], si)] for t in techs if (t["id"], si) in y] short = model.NewIntVar(0, slot["required"], f"short_{si}") # sum(assigned) + short >= required → short absorbe le manque model.Add(sum(assigned_vars) + short >= slot["required"]) shortfalls.append(short) coverage_report.append((si, slot, assigned_vars, short)) # ── Équité SOUPLE : minimiser (max_h - min_h) parmi les techs actifs ────── # On borne max_h et min_h sur tous les techs qui ont au moins une variable. active = [t["id"] for t in techs if any((t["id"], si) in y for si in range(len(slots)))] spread = model.NewIntVar(0, 24 * n_days, "spread") if active: max_h = model.NewIntVar(0, 24 * n_days, "max_h") min_h = model.NewIntVar(0, 24 * n_days, "min_h") for tid in active: model.Add(max_h >= tech_hours[tid]) model.Add(min_h <= tech_hours[tid]) model.Add(spread == max_h - min_h) else: model.Add(spread == 0) # ── Objectif pondéré ────────────────────────────────────────────────────── obj = [] # 1) couverture (priorité écrasante) for short in shortfalls: obj.append(W["uncovered"] * short) # 2) équité obj.append(W["fairness"] * spread) # 3) coût + 4) préférences + 5) continuité de zone for ti, t in enumerate(techs): cost_h = int(t.get("cost_per_h") or 0) pref_off = set(t.get("preferred_off", [])) zone_home = t.get("zone_home") tf = int(round((float(t.get("time_factor") or 1.0)) * 10)) # 10=normal, 11=+10% lent, 9=-10% rapide for si, slot in enumerate(slots): v = y.get((t["id"], si)) if v is None: continue # pénalité par assignation : couvrir le requis sans sur-staffer obj.append(W["assignment"] * v) # préférer les techs plus rapides (facteur temps bas) obj.append(W["efficiency"] * tf * v) # coût if cost_h: obj.append(W["cost"] * cost_h * slot["hours"] * v) # préférence : pénalité si le jour est "préféré off" if fr_day(_parse_date(slot["date"])) in pref_off: obj.append(W["preference"] * v) # continuité de zone : bonus (= pénalité négative) si même zone d'attache if zone_home and slot["zone"] == zone_home: obj.append(-W["continuity"] * v) model.Minimize(sum(obj)) # ── Résolution ──────────────────────────────────────────────────────────── solver = cp_model.CpSolver() solver.parameters.max_time_in_seconds = float(req.get("max_seconds") or 10) solver.parameters.num_search_workers = 8 status = solver.Solve(model) status_name = solver.StatusName(status) if status not in (cp_model.OPTIMAL, cp_model.FEASIBLE): return {"status": status_name, "assignments": [], "coverage_report": [], "tech_hours": {}, "objective": None, "message": "Aucune solution (contraintes dures incompatibles)."} # ── Construire le résultat ──────────────────────────────────────────────── assignments = [] for ti, t in enumerate(techs): for si, slot in enumerate(slots): v = y.get((t["id"], si)) if v is not None and solver.Value(v) == 1: assignments.append({ "tech": t["id"], "tech_name": t.get("name", t["id"]), "date": slot["date"], "shift": slot["shift"], "shift_name": shifts[slot["shift"]].get("name", slot["shift"]), "zone": slot["zone"], "hours": slot["hours"], }) cov = [] for si, slot, assigned_vars, short in coverage_report: n_assigned = sum(solver.Value(v) for v in assigned_vars) cov.append({ "date": slot["date"], "shift": slot["shift"], "zone": slot["zone"], "required": slot["required"], "assigned": int(n_assigned), "shortfall": int(solver.Value(short)), }) hours_out = {tid: int(solver.Value(h)) for tid, h in tech_hours.items()} # Explications lisibles (pour l'UI) explanations = [] for tid in sorted(hours_out, key=lambda k: -hours_out[k]): n_shifts = sum(1 for a in assignments if a["tech"] == tid) if n_shifts: explanations.append(f"{tech_by_id[tid].get('name', tid)} : {n_shifts} shift(s), {hours_out[tid]} h") total_short = sum(c["shortfall"] for c in cov) if total_short: explanations.insert(0, f"⚠️ {total_short} poste(s) non couvert(s) — effectif insuffisant ce(s) jour(s).") return { "status": status_name, "assignments": assignments, "coverage_report": cov, "tech_hours": hours_out, "objective": solver.ObjectiveValue(), "spread_hours": int(solver.Value(spread)), "total_shortfall": total_short, "explanations": explanations, "solve_ms": int(solver.WallTime() * 1000), }