# app/catalog.py
"""In-memory access to the cafe menu catalog and its per-category field schema."""
from __future__ import annotations

import json
import os
import re
from typing import Dict, Any, List, Optional, Tuple

# In-memory singleton; populated lazily by load_catalog().
_CATALOG: Optional[Dict[str, Any]] = None

# Compiled once: matches any run of whitespace so it can be collapsed.
_WHITESPACE_RE = re.compile(r"\s+")


def _norm(s: str) -> str:
    """Return *s* trimmed, lower-cased and with whitespace runs collapsed.

    ``None`` and the empty string both normalize to ``""``.
    """
    return _WHITESPACE_RE.sub(" ", (s or "").strip().lower())


def get_catalog_path() -> str:
    """Resolve the catalog file path.

    Resolution order:
      1) the ``CAFE_CATALOG_PATH`` environment variable, when set and non-empty
      2) ``data/menu_catalog.json`` next to this module's parent directory
    """
    env_path = os.getenv("CAFE_CATALOG_PATH")
    if env_path:
        return env_path
    here = os.path.dirname(os.path.abspath(__file__))
    root = os.path.dirname(here)
    return os.path.join(root, "data", "menu_catalog.json")


def _load_from_disk(path: str) -> Dict[str, Any]:
    """Read and validate the catalog JSON stored at *path*.

    Raises:
        FileNotFoundError: *path* does not exist.
        ValueError: the file is not valid JSON, or it lacks the expected
            top-level shape (``items`` list and ``schema`` dict).
    """
    # Open directly instead of checking os.path.exists() first: one syscall
    # less, and no window in which the file can vanish between check and open.
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        raise FileNotFoundError(
            f"Catalog not found at {path}. "
            "Set CAFE_CATALOG_PATH in your .env or place data/menu_catalog.json."
        ) from None
    except json.JSONDecodeError as e:
        raise ValueError(f"Catalog JSON invalid at {path}: {e}") from e

    # Shape checks: every lookup below relies on these keys and types, so a
    # malformed catalog is rejected here rather than failing later.
    if not isinstance(data, dict) or "items" not in data or "schema" not in data:
        raise ValueError("Catalog must contain top-level keys: 'items' and 'schema'.")
    if not isinstance(data["items"], list):
        raise ValueError("'items' must be a list.")
    if not isinstance(data["schema"], dict):
        raise ValueError("'schema' must be a dict.")
    return data


def load_catalog(force_reload: bool = False) -> Dict[str, Any]:
    """Return the catalog, reading it from disk on first use.

    Args:
        force_reload: when true, re-read the file even if a catalog is cached.

    Raises:
        FileNotFoundError, ValueError: propagated from ``_load_from_disk``.
    """
    global _CATALOG
    if _CATALOG is None or force_reload:
        _CATALOG = _load_from_disk(get_catalog_path())
    return _CATALOG


def find_item_by_name(name: str) -> Optional[Dict[str, Any]]:
    """Find a catalog item by its name or one of its aliases.

    Matching is case- and whitespace-insensitive. An exact match on any
    item's name or alias always wins; only when there is none does the first
    item whose name or alias *contains* the query get returned. Without that
    priority a query such as "pizza" would resolve to an earlier
    "Margherita Pizza" even though an item named "Pizza" exists.

    Returns:
        The matching item dict, or ``None`` for a blank query or no match.
    """
    query = _norm(name)
    if not query:
        return None

    first_partial: Optional[Dict[str, Any]] = None
    for item in load_catalog()["items"]:
        labels = [_norm(item.get("name", ""))]
        # Optional alias list support: ["alias1", "alias2"]
        labels.extend(_norm(alias) for alias in item.get("aliases") or [])
        if query in labels:
            return item
        if first_partial is None and any(query in label for label in labels):
            first_partial = item
    return first_partial


def find_item_by_sku(sku: str) -> Optional[Dict[str, Any]]:
    """Return the item whose ``sku`` equals *sku* (compared as stripped strings).

    Returns ``None`` for a blank SKU or when no item matches.
    """
    target = (sku or "").strip()
    if not target:
        return None
    for item in load_catalog()["items"]:
        if str(item.get("sku", "")).strip() == target:
            return item
    return None


def _schema_fields(category: str, key: str) -> List[str]:
    """Return a copy of the list stored under *key* in *category*'s schema."""
    schema = load_catalog()["schema"].get(category) or {}
    return list(schema.get(key) or [])


def required_fields_for_category(category: str) -> List[str]:
    """Return the category's ``required_fields`` (empty if unknown or unset)."""
    return _schema_fields(category, "required_fields")


def optional_fields_for_category(category: str) -> List[str]:
    """Return the category's ``optional_fields`` (empty if unknown or unset)."""
    return _schema_fields(category, "optional_fields")


def _resolve_item(order_item: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Look up the catalog item for an order line: by SKU first, then by name."""
    item: Optional[Dict[str, Any]] = None
    if order_item.get("sku"):
        item = find_item_by_sku(str(order_item["sku"]))
    if not item and order_item.get("name"):
        item = find_item_by_name(str(order_item["name"]))
    return item


def _has_value(value: Any) -> bool:
    """Return False for ``None`` and blank strings, True for anything else."""
    if value is None:
        return False
    if isinstance(value, str):
        return bool(value.strip())
    return True


def _is_valid_qty(value: Any) -> bool:
    """Return True when *value* converts to an integer that is at least 1."""
    try:
        return int(value) >= 1
    except (TypeError, ValueError, OverflowError):
        # None, non-numeric text, NaN and infinity are all "no usable quantity".
        return False


def compute_missing_fields(order_item: Dict[str, Any]) -> List[str]:
    """List the required fields that *order_item* does not yet provide.

    order_item example:
        {"name": "Margherita Pizza", "qty": 2, "size": "large", ...}

    The item is resolved against the catalog and its category's
    ``required_fields`` are checked in schema order, so the result is
    deterministic. A field counts as provided only when its value is not
    ``None`` or blank; ``qty`` must additionally convert to an integer >= 1.

    Returns:
        ``["name"]`` when the item cannot be resolved, otherwise the missing
        required field names in schema order (possibly empty).

    Raises:
        KeyError: the resolved catalog item has no ``category`` key.
    """
    item = _resolve_item(order_item)
    if not item:
        return ["name"]  # the item itself is not known yet

    required = required_fields_for_category(item["category"])
    missing: List[str] = []
    # dict.fromkeys drops duplicate field names while keeping schema order.
    for field in dict.fromkeys(required):
        value = order_item.get(field)
        provided = _is_valid_qty(value) if field == "qty" else _has_value(value)
        if not provided:
            missing.append(field)
    return missing


def friendly_requirements_prompt(order_item: Dict[str, Any]) -> str:
    """Build a user-facing sentence describing what an order line needs.

    The text names the category's required fields (or asks for a quantity
    when there are none), appends the optional fields, and lists the choices
    of every item option flagged ``required``.

    Raises:
        KeyError: the resolved catalog item lacks ``category`` or ``name``.
    """
    item = _resolve_item(order_item)
    if not item:
        return "Which item would you like to order?"

    category = item["category"]
    req = required_fields_for_category(category)
    opt = optional_fields_for_category(category)

    opt_txt = f" Optional: {', '.join(opt)}." if opt else ""
    if req:
        parts = [f"I need {', '.join(req)} for {item['name']}.{opt_txt}"]
    else:
        parts = [f"Please specify quantity for {item['name']}.{opt_txt}"]

    # Also list choices for required options (e.g., size choices).
    choice_bits: List[str] = []
    for key, spec in (item.get("options") or {}).items():
        # Skip malformed specs instead of failing on .get().
        if not isinstance(spec, dict) or not spec.get("required"):
            continue
        choices = spec.get("choices") or []
        if choices:
            # str() lets numeric choices (e.g. sizes in inches) be joined too.
            choice_bits.append(f"{key}: {', '.join(map(str, choices))}")
    if choice_bits:
        parts.append("Choices → " + " | ".join(choice_bits))
    return " ".join(parts)