|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def solve_week( |
|
|
week_idx: int, |
|
|
start_date: datetime.date, |
|
|
available_staff: list[str], |
|
|
cumulative_shifts: dict[str, int], |
|
|
names_all: list[str] |
|
|
) -> tuple[dict, dict]: |
|
|
""" |
|
|
Returns (schedule_week, weekly_counts) for 7 days. |
|
|
schedule_week: {d: {"day":[], "night":[]}} for d in 0..6 (relative) |
|
|
""" |
|
|
|
|
|
idx_map = {name: i for i, name in enumerate(available_staff)} |
|
|
N = len(available_staff) |
|
|
if N < 5: |
|
|
raise ValueError("At least 5 staff must be available.") |
|
|
|
|
|
|
|
|
full_names = available_staff + [f"Vacant_{i}" for i in range(9 - N)] |
|
|
SHIFT_IDX = {"day": 0, "night": 1} |
|
|
DAYS = 7 |
|
|
WEEKDAY_REL = {0,1,2,3,4} |
|
|
model = cp_model.CpModel() |
|
|
x = {} |
|
|
for p, d, s in itertools.product(range(9), range(DAYS), range(2)): |
|
|
x[p, d, s] = model.NewBoolVar(f"x_{p}_{d}_{s}") |
|
|
|
|
|
|
|
|
for d in range(DAYS): |
|
|
req = (3, 1) if d in WEEKDAY_REL else (1, 1) |
|
|
model.Add(sum(x[p, d, SHIFT_IDX["day"]] for p in range(9)) == req[0]) |
|
|
model.Add(sum(x[p, d, SHIFT_IDX["night"]] for p in range(9)) == req[1]) |
|
|
|
|
|
|
|
|
for p, d in itertools.product(range(9), range(DAYS)): |
|
|
model.Add(x[p, d, SHIFT_IDX["day"]] + x[p, d, SHIFT_IDX["night"]] <= 1) |
|
|
for p in range(9): |
|
|
for d in range(DAYS - 1): |
|
|
model.Add( |
|
|
x[p, d, SHIFT_IDX["day"]] + |
|
|
x[p, d, SHIFT_IDX["night"]] + |
|
|
x[p, d+1, SHIFT_IDX["day"]] + |
|
|
x[p, d+1, SHIFT_IDX["night"]] <= 1 |
|
|
) |
|
|
|
|
|
for p in range(9): |
|
|
model.Add(sum(x[p, d, s] for d in (5, 6) for s in range(2)) <= 1) |
|
|
|
|
|
|
|
|
for p in range(N, 9): |
|
|
for d, s in itertools.product(range(DAYS), range(2)): |
|
|
model.Add(x[p, d, s] == 0) |
|
|
|
|
|
|
|
|
|
|
|
total_needed = sum(3+1 for _ in range(5)) + sum(1+1 for _ in range(2)) |
|
|
target_per_person = total_needed / len(names_all) |
|
|
week_shifts = {} |
|
|
for p, name in enumerate(available_staff): |
|
|
var = model.NewIntVar(0, 3, f"weekshift_{p}") |
|
|
model.Add(var == sum(x[p, d, s] for d in range(DAYS) for s in range(2))) |
|
|
week_shifts[name] = var |
|
|
|
|
|
|
|
|
max_dev = model.NewIntVar(0, 3, "max_dev") |
|
|
for name in available_staff: |
|
|
total_after = cumulative_shifts.get(name, 0) + week_shifts[name] |
|
|
|
|
|
ideal = 16 / 6 |
|
|
|
|
|
pos = model.NewIntVar(0, 10, f"pos_{name}") |
|
|
neg = model.NewIntVar(0, 10, f"neg_{name}") |
|
|
model.Add(total_after - round(ideal * 10) == pos - neg).OnlyEnforceIf(model.NewBoolVar("")) |
|
|
|
|
|
model.Add(week_shifts[name] >= 2) |
|
|
model.Add(week_shifts[name] <= 3) |
|
|
model.Minimize(max_dev) |
|
|
|
|
|
|
|
|
solver = cp_model.CpSolver() |
|
|
solver.parameters.max_time_in_seconds = 20.0 |
|
|
solver.parameters.num_search_workers = 4 |
|
|
|
|
|
if solver.Solve(model) not in (cp_model.OPTIMAL, cp_model.FEASIBLE): |
|
|
raise RuntimeError(f"Week {week_idx+1} infeasible with given availability.") |
|
|
|
|
|
|
|
|
schedule_week = {} |
|
|
weekly_counts = {name: 0 for name in available_staff} |
|
|
for d in range(7): |
|
|
day_staff = [full_names[p] for p in range(9) |
|
|
if solver.Value(x[p, d, SHIFT_IDX["day"]]) and not full_names[p].startswith("Vacant_")] |
|
|
night_staff = [full_names[p] for p in range(9) |
|
|
if solver.Value(x[p, d, SHIFT_IDX["night"]]) and not full_names[p].startswith("Vacant_")] |
|
|
schedule_week[d] = {"day": day_staff, "night": night_staff} |
|
|
for name in day_staff + night_staff: |
|
|
weekly_counts[name] += 1 |
|
|
|
|
|
return schedule_week, weekly_counts |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if "cumulative_shifts" not in st.session_state: |
|
|
st.session_state.cumulative_shifts = {} |
|
|
if "roster_weekly" not in st.session_state: |
|
|
st.session_state.roster_weekly = {} |
|
|
|
|
|
|
|
|
st.header("3. Weekly Availability (Holiday/Mission)") |
|
|
st.markdown("Mark unavailable staff for each week (max 4 per week).") |
|
|
|
|
|
|
|
|
avail_matrix = {} |
|
|
cols = st.columns(6) |
|
|
for w in range(6): |
|
|
with cols[w]: |
|
|
st.subheader(f"Week {w+1}") |
|
|
available = [] |
|
|
for i, name in enumerate([n for n in st.session_state.names if n]): |
|
|
if st.checkbox(f"{name}", value=True, key=f"avail_w{w}_p{i}"): |
|
|
available.append(name) |
|
|
if len(available) < 5: |
|
|
st.error("β οΈ At least 5 must be available.") |
|
|
avail_matrix[w] = available |
|
|
|
|
|
if st.button("π Generate Rolling Roster", type="primary"): |
|
|
try: |
|
|
names_all = [n for n in st.session_state.names if n] |
|
|
start = st.session_state.start_date |
|
|
cum_shifts = st.session_state.cumulative_shifts.copy() |
|
|
weekly_schedules = {} |
|
|
|
|
|
for w in range(6): |
|
|
week_start = start + timedelta(weeks=w) |
|
|
avail = avail_matrix[w] |
|
|
schedule_w, counts_w = solve_week(w, week_start, avail, cum_shifts, names_all) |
|
|
|
|
|
for name, cnt in counts_w.items(): |
|
|
cum_shifts[name] = cum_shifts.get(name, 0) + cnt |
|
|
|
|
|
abs_schedule = {} |
|
|
for d_rel, shifts in schedule_w.items(): |
|
|
d_abs = w * 7 + d_rel |
|
|
abs_schedule[d_abs] = shifts |
|
|
weekly_schedules[w] = abs_schedule |
|
|
|
|
|
st.session_state.cumulative_shifts = cum_shifts |
|
|
st.session_state.roster_weekly = weekly_schedules |
|
|
st.session_state.roster_ready = True |
|
|
st.success("β
Rolling roster generated!") |
|
|
|
|
|
except Exception as e: |
|
|
st.error(f"Generation failed: {e}") |
|
|
|
|
|
|
|
|
if st.session_state.roster_ready: |
|
|
|
|
|
full_sched = {} |
|
|
for w_sched in st.session_state.roster_weekly.values(): |
|
|
full_sched.update(w_sched) |
|
|
|
|
|
if st.session_state.user_role == "manager": |
|
|
|
|
|
rows = [] |
|
|
for d in range(42): |
|
|
dt = st.session_state.start_date + timedelta(days=d) |
|
|
wd = ["Mon","Tue","Wed","Thu","Fri","Sat","Sun"][d%7] |
|
|
week = d//7 + 1 |
|
|
typ = "WD" if (d%7)<5 else "WE" |
|
|
rows.append({ |
|
|
"Week": f"W{week}", |
|
|
"Date": dt.strftime("%Y-%m-%d"), |
|
|
"Day": wd, |
|
|
"Type": typ, |
|
|
"Day Shift": ", ".join(full_sched.get(d, {}).get("day", [])), |
|
|
"Night Shift": ", ".join(full_sched.get(d, {}).get("night", [])), |
|
|
}) |
|
|
df = pd.DataFrame(rows) |
|
|
st.dataframe(df, use_container_width=True, hide_index=True) |
|
|
|
|
|
|
|
|
st.subheader("π Cumulative Shifts (So Far)") |
|
|
summ = [] |
|
|
for name in [n for n in st.session_state.names if n]: |
|
|
summ.append({"Staff": name, "Total": st.session_state.cumulative_shifts.get(name, 0)}) |
|
|
st.dataframe(pd.DataFrame(summ), use_container_width=True, hide_index=True) |
|
|
|
|
|
else: |
|
|
|
|
|
email = st.session_state.staff_email |
|
|
names = [n for n in st.session_state.names if n] |
|
|
emails = [e for i, e in enumerate(st.session_state.emails) if st.session_state.names[i]] |
|
|
staff_name = None |
|
|
if email in emails: |
|
|
staff_name = names[emails.index(email)] |
|
|
if staff_name: |
|
|
my_shifts = [] |
|
|
for d in range(42): |
|
|
dt = st.session_state.start_date + timedelta(days=d) |
|
|
if staff_name in full_sched.get(d, {}).get("day", []): |
|
|
my_shifts.append({"Date": dt.strftime("%Y-%m-%d"), "Shift": "Day"}) |
|
|
if staff_name in full_sched.get(d, {}).get("night", []): |
|
|
my_shifts.append({"Date": dt.strftime("%Y-%m-%d"), "Shift": "Night"}) |
|
|
st.dataframe(pd.DataFrame(my_shifts), use_container_width=True, hide_index=True) |
|
|
else: |
|
|
st.warning("Enter an email matching a staff member.") |