Spaces:
Sleeping
Sleeping
| # app/catalog.py | |
| from __future__ import annotations | |
| import json | |
| import os | |
| import re | |
| from typing import Dict, Any, List, Optional, Tuple | |
| # In-memory singleton | |
| _CATALOG: Optional[Dict[str, Any]] = None | |
| def _norm(s: str) -> str: | |
| """lightweight normalization for fuzzy-ish equality""" | |
| return re.sub(r"\s+", " ", (s or "").strip().lower()) | |
| def get_catalog_path() -> str: | |
| """ | |
| Resolve catalog path in this order: | |
| 1) ENV CAFE_CATALOG_PATH | |
| 2) repo-relative data/menu_catalog.json (current default) | |
| """ | |
| env_path = os.getenv("CAFE_CATALOG_PATH") | |
| if env_path: | |
| return env_path | |
| here = os.path.dirname(os.path.abspath(__file__)) | |
| root = os.path.dirname(here) | |
| return os.path.join(root, "data", "menu_catalog.json") | |
| def _load_from_disk(path: str) -> Dict[str, Any]: | |
| if not os.path.exists(path): | |
| raise FileNotFoundError( | |
| f"Catalog not found at {path}. " | |
| "Set CAFE_CATALOG_PATH in your .env or place data/menu_catalog.json." | |
| ) | |
| with open(path, "r", encoding="utf-8") as f: | |
| try: | |
| data = json.load(f) | |
| except json.JSONDecodeError as e: | |
| raise ValueError(f"Catalog JSON invalid at {path}: {e}") from e | |
| # quick shape checks (non-fatal, but helpful) | |
| if not isinstance(data, dict) or "items" not in data or "schema" not in data: | |
| raise ValueError("Catalog must contain top-level keys: 'items' and 'schema'.") | |
| if not isinstance(data["items"], list): | |
| raise ValueError("'items' must be a list.") | |
| if not isinstance(data["schema"], dict): | |
| raise ValueError("'schema' must be a dict.") | |
| return data | |
| def load_catalog(force_reload: bool = False) -> Dict[str, Any]: | |
| global _CATALOG | |
| if _CATALOG is not None and not force_reload: | |
| return _CATALOG | |
| path = get_catalog_path() | |
| _CATALOG = _load_from_disk(path) | |
| return _CATALOG | |
| def find_item_by_name(name: str) -> Optional[Dict[str, Any]]: | |
| c = load_catalog() | |
| q = _norm(name) | |
| if not q: | |
| return None | |
| for it in c["items"]: | |
| nm = _norm(it.get("name", "")) | |
| if q == nm or q in nm: | |
| return it | |
| # optional alias list support: ["alias1", "alias2"] | |
| for alias in it.get("aliases", []) or []: | |
| if q == _norm(alias) or q in _norm(alias): | |
| return it | |
| return None | |
| def find_item_by_sku(sku: str) -> Optional[Dict[str, Any]]: | |
| c = load_catalog() | |
| target = (sku or "").strip() | |
| if not target: | |
| return None | |
| for it in c["items"]: | |
| if str(it.get("sku", "")).strip() == target: | |
| return it | |
| return None | |
| def required_fields_for_category(category: str) -> List[str]: | |
| c = load_catalog() | |
| schema = c["schema"].get(category) or {} | |
| rf = schema.get("required_fields") or [] | |
| return list(rf) | |
| def optional_fields_for_category(category: str) -> List[str]: | |
| c = load_catalog() | |
| schema = c["schema"].get(category) or {} | |
| of = schema.get("optional_fields") or [] | |
| return list(of) | |
| def _resolve_item(order_item: Dict[str, Any]) -> Optional[Dict[str, Any]]: | |
| it: Optional[Dict[str, Any]] = None | |
| if order_item.get("sku"): | |
| it = find_item_by_sku(str(order_item["sku"])) | |
| if not it and order_item.get("name"): | |
| it = find_item_by_name(str(order_item["name"])) | |
| return it | |
| def compute_missing_fields(order_item: Dict[str, Any]) -> List[str]: | |
| """ | |
| order_item example: | |
| {"name": "Margherita Pizza", "qty": 2, "size": "large", ...} | |
| Uses catalog schema to see which fields are missing. | |
| """ | |
| it = _resolve_item(order_item) | |
| if not it: | |
| return ["name"] # we don’t even know the item yet | |
| category = it["category"] | |
| req = set(required_fields_for_category(category)) | |
| present = set(k for k in order_item.keys() if k in req or k in {"qty", "name", "sku"}) | |
| # qty normalization: consider qty present if >= 1 | |
| if "qty" in req: | |
| try: | |
| q = int(order_item.get("qty", 0)) | |
| except Exception: | |
| q = 0 | |
| if q >= 1: | |
| present.add("qty") | |
| # else leave "qty" missing | |
| else: | |
| # even if qty isn't required, count it as present if provided | |
| if order_item.get("qty") is not None: | |
| present.add("qty") | |
| missing = [f for f in req if f not in present] | |
| return missing | |
| def friendly_requirements_prompt(order_item: Dict[str, Any]) -> str: | |
| it = _resolve_item(order_item) | |
| if not it: | |
| return "Which item would you like to order?" | |
| category = it["category"] | |
| req = required_fields_for_category(category) | |
| opt = optional_fields_for_category(category) | |
| parts: List[str] = [] | |
| opt_txt = f" Optional: {', '.join(opt)}." if opt else "" | |
| if req: | |
| parts.append(f"I need {', '.join(req)} for {it['name']}.{opt_txt}") | |
| else: | |
| parts.append(f"Please specify quantity for {it['name']}.{opt_txt}") | |
| # Also list choices for required options (e.g., size choices) | |
| opts = it.get("options") or {} | |
| choice_bits: List[str] = [] | |
| for k, spec in opts.items(): | |
| if spec.get("required"): | |
| choices = spec.get("choices") or [] | |
| if choices: | |
| choice_bits.append(f"{k}: {', '.join(choices)}") | |
| if choice_bits: | |
| parts.append("Choices → " + " | ".join(choice_bits)) | |
| return " ".join(parts) |