roster / src /additional.py
nyimbi's picture
Create additional.py
a5e61d2 verified
raw
history blame
8.94 kB
# Add to top of roster_app_enterprise.py (after imports)
# ------------------------------
# Weekly Optimiser (7-day CP-SAT)
# ------------------------------
def solve_week(
week_idx: int,
start_date: datetime.date,
available_staff: list[str],
cumulative_shifts: dict[str, int], # current totals
names_all: list[str]
) -> tuple[dict, dict]:
"""
Returns (schedule_week, weekly_counts) for 7 days.
schedule_week: {d: {"day":[], "night":[]}} for d in 0..6 (relative)
"""
# Map name β†’ index for optimisation (only available staff + placeholders)
idx_map = {name: i for i, name in enumerate(available_staff)}
N = len(available_staff)
if N < 5:
raise ValueError("At least 5 staff must be available.")
# Pad to 9 with vacants (to reuse model structure)
full_names = available_staff + [f"Vacant_{i}" for i in range(9 - N)]
SHIFT_IDX = {"day": 0, "night": 1}
DAYS = 7
WEEKDAY_REL = {0,1,2,3,4} # Mon=0 relative to week start
model = cp_model.CpModel()
x = {}
for p, d, s in itertools.product(range(9), range(DAYS), range(2)):
x[p, d, s] = model.NewBoolVar(f"x_{p}_{d}_{s}")
# Coverage (7 days)
for d in range(DAYS):
req = (3, 1) if d in WEEKDAY_REL else (1, 1)
model.Add(sum(x[p, d, SHIFT_IDX["day"]] for p in range(9)) == req[0])
model.Add(sum(x[p, d, SHIFT_IDX["night"]] for p in range(9)) == req[1])
# Temporal constraints (relative days)
for p, d in itertools.product(range(9), range(DAYS)):
model.Add(x[p, d, SHIFT_IDX["day"]] + x[p, d, SHIFT_IDX["night"]] <= 1)
for p in range(9):
for d in range(DAYS - 1):
model.Add(
x[p, d, SHIFT_IDX["day"]] +
x[p, d, SHIFT_IDX["night"]] +
x[p, d+1, SHIFT_IDX["day"]] +
x[p, d+1, SHIFT_IDX["night"]] <= 1
)
# Weekend cap (d=5,6)
for p in range(9):
model.Add(sum(x[p, d, s] for d in (5, 6) for s in range(2)) <= 1)
# Availability: vacants must be 0
for p in range(N, 9): # vacants
for d, s in itertools.product(range(DAYS), range(2)):
model.Add(x[p, d, s] == 0)
# Fairness: minimize max deviation from target
# Target = avg of cumulative + weekly fair share
total_needed = sum(3+1 for _ in range(5)) + sum(1+1 for _ in range(2)) # 24 shifts/week
target_per_person = total_needed / len(names_all) # β‰ˆ2.67 β†’ use soft objective
week_shifts = {}
for p, name in enumerate(available_staff):
var = model.NewIntVar(0, 3, f"weekshift_{p}")
model.Add(var == sum(x[p, d, s] for d in range(DAYS) for s in range(2)))
week_shifts[name] = var
# Soft fairness: minimize max deviation
max_dev = model.NewIntVar(0, 3, "max_dev")
for name in available_staff:
total_after = cumulative_shifts.get(name, 0) + week_shifts[name]
# Deviation from ideal rolling mean (e.g., 16/6 β‰ˆ 2.67 per week)
ideal = 16 / 6 # 2.666...
# Linearize |total_after - ideal| via auxiliary vars
pos = model.NewIntVar(0, 10, f"pos_{name}")
neg = model.NewIntVar(0, 10, f"neg_{name}")
model.Add(total_after - round(ideal * 10) == pos - neg).OnlyEnforceIf(model.NewBoolVar("")) # skip exact
# Simpler: bound each person to 2 or 3 shifts/week
model.Add(week_shifts[name] >= 2)
model.Add(week_shifts[name] <= 3)
model.Minimize(max_dev)
# Solve
solver = cp_model.CpSolver()
solver.parameters.max_time_in_seconds = 20.0
solver.parameters.num_search_workers = 4
if solver.Solve(model) not in (cp_model.OPTIMAL, cp_model.FEASIBLE):
raise RuntimeError(f"Week {week_idx+1} infeasible with given availability.")
# Extract
schedule_week = {}
weekly_counts = {name: 0 for name in available_staff}
for d in range(7):
day_staff = [full_names[p] for p in range(9)
if solver.Value(x[p, d, SHIFT_IDX["day"]]) and not full_names[p].startswith("Vacant_")]
night_staff = [full_names[p] for p in range(9)
if solver.Value(x[p, d, SHIFT_IDX["night"]]) and not full_names[p].startswith("Vacant_")]
schedule_week[d] = {"day": day_staff, "night": night_staff}
for name in day_staff + night_staff:
weekly_counts[name] += 1
return schedule_week, weekly_counts
# ------------------------------
# Streamlit UI Additions
# ------------------------------
# Add to session state init (if not present)
if "cumulative_shifts" not in st.session_state:
st.session_state.cumulative_shifts = {}
if "roster_weekly" not in st.session_state:
st.session_state.roster_weekly = {} # week_idx β†’ schedule
# In Manager section, after staff input:
st.header("3. Weekly Availability (Holiday/Mission)")
st.markdown("Mark unavailable staff for each week (max 4 per week).")
# Weekly toggles
avail_matrix = {}
cols = st.columns(6)
for w in range(6):
with cols[w]:
st.subheader(f"Week {w+1}")
available = []
for i, name in enumerate([n for n in st.session_state.names if n]):
if st.checkbox(f"{name}", value=True, key=f"avail_w{w}_p{i}"):
available.append(name)
if len(available) < 5:
st.error("⚠️ At least 5 must be available.")
avail_matrix[w] = available
if st.button("πŸš€ Generate Rolling Roster", type="primary"):
try:
names_all = [n for n in st.session_state.names if n]
start = st.session_state.start_date
cum_shifts = st.session_state.cumulative_shifts.copy()
weekly_schedules = {}
for w in range(6):
week_start = start + timedelta(weeks=w)
avail = avail_matrix[w]
schedule_w, counts_w = solve_week(w, week_start, avail, cum_shifts, names_all)
# Update cumulative
for name, cnt in counts_w.items():
cum_shifts[name] = cum_shifts.get(name, 0) + cnt
# Store absolute-date schedule
abs_schedule = {}
for d_rel, shifts in schedule_w.items():
d_abs = w * 7 + d_rel
abs_schedule[d_abs] = shifts
weekly_schedules[w] = abs_schedule
st.session_state.cumulative_shifts = cum_shifts
st.session_state.roster_weekly = weekly_schedules
st.session_state.roster_ready = True
st.success("βœ… Rolling roster generated!")
except Exception as e:
st.error(f"Generation failed: {e}")
# Display logic (updated for weekly)
if st.session_state.roster_ready:
# Merge weekly schedules into full 42-day dict
full_sched = {}
for w_sched in st.session_state.roster_weekly.values():
full_sched.update(w_sched)
if st.session_state.user_role == "manager":
# Full roster table (same as before, using full_sched)
rows = []
for d in range(42):
dt = st.session_state.start_date + timedelta(days=d)
wd = ["Mon","Tue","Wed","Thu","Fri","Sat","Sun"][d%7]
week = d//7 + 1
typ = "WD" if (d%7)<5 else "WE"
rows.append({
"Week": f"W{week}",
"Date": dt.strftime("%Y-%m-%d"),
"Day": wd,
"Type": typ,
"Day Shift": ", ".join(full_sched.get(d, {}).get("day", [])),
"Night Shift": ", ".join(full_sched.get(d, {}).get("night", [])),
})
df = pd.DataFrame(rows)
st.dataframe(df, use_container_width=True, hide_index=True)
# Cumulative summary
st.subheader("πŸ“Š Cumulative Shifts (So Far)")
summ = []
for name in [n for n in st.session_state.names if n]:
summ.append({"Staff": name, "Total": st.session_state.cumulative_shifts.get(name, 0)})
st.dataframe(pd.DataFrame(summ), use_container_width=True, hide_index=True)
else:
# Staff view: filter full_sched
email = st.session_state.staff_email
names = [n for n in st.session_state.names if n]
emails = [e for i, e in enumerate(st.session_state.emails) if st.session_state.names[i]]
staff_name = None
if email in emails:
staff_name = names[emails.index(email)]
if staff_name:
my_shifts = []
for d in range(42):
dt = st.session_state.start_date + timedelta(days=d)
if staff_name in full_sched.get(d, {}).get("day", []):
my_shifts.append({"Date": dt.strftime("%Y-%m-%d"), "Shift": "Day"})
if staff_name in full_sched.get(d, {}).get("night", []):
my_shifts.append({"Date": dt.strftime("%Y-%m-%d"), "Shift": "Night"})
st.dataframe(pd.DataFrame(my_shifts), use_container_width=True, hide_index=True)
else:
st.warning("Enter an email matching a staff member.")