Spaces:
Sleeping
Sleeping
| """ | |
| CoJournalist Data - Swiss Parliamentary Data & Statistics Chatbot | |
| Powered by Llama-3.1-70B-Instruct with OpenParlData and BFS MCP | |
| """ | |
| import os | |
| import json | |
| import tempfile | |
| from datetime import datetime | |
| from pathlib import Path | |
| import gradio as gr | |
| from huggingface_hub import InferenceClient | |
| from dotenv import load_dotenv | |
| from mcp_integration import execute_mcp_query, execute_mcp_query_bfs | |
| import asyncio | |
| from usage_tracker import UsageTracker | |
| from typing import Any | |
| from ui.helpers import prefer_language, strip_html, pick_external_url | |
| from datasets.parliament.constants import OPENPARLDATA_EXAMPLES, TOOL_PARAMS as PARLIAMENT_TOOL_PARAMS | |
| from datasets.bfs.constants import BFS_EXAMPLES | |
# Load environment variables (python-dotenv)
load_dotenv()

# Directory containing the system prompt files, located next to this module
PROMPTS_DIR = Path(__file__).parent.joinpath("prompts")
def load_prompt(dataset_name: str) -> str:
    """Return the system prompt stored in ``<PROMPTS_DIR>/<dataset_name>.txt``.

    Args:
        dataset_name: Base name (without extension) of the prompt file.

    Returns:
        The file content decoded as UTF-8.

    Raises:
        FileNotFoundError: If the prompt file does not exist.
    """
    path = PROMPTS_DIR.joinpath(dataset_name + ".txt")
    if path.exists():
        return path.read_text(encoding="utf-8")
    raise FileNotFoundError(f"Prompt file not found: {path}")
# Read both dataset prompts once, at import time
PARLIAMENT_PROMPT = load_prompt("parliament")
BFS_PROMPT = load_prompt("bfs")

# Set up the Hugging Face Inference Client; a missing token only triggers a warning
HF_TOKEN = os.environ.get("HF_TOKEN")
if HF_TOKEN is None or HF_TOKEN == "":
    print("Warning: HF_TOKEN not found. Please set it in .env file or Hugging Face Space secrets.")
client = InferenceClient(token=HF_TOKEN)
def translate_to_german(text: str) -> str:
    """Translate search keywords into German via the chat model.

    The input is stripped first. Blank input is returned (stripped) without
    calling the model. Any exception raised while calling the model or reading
    its reply is logged to stdout and the stripped original text is returned;
    the same fallback applies when the model replies with an empty string.

    Args:
        text: User-facing keywords in any language.

    Returns:
        The German keywords, or the stripped original text as a fallback.
    """
    source_text = text.strip()
    if source_text == "":
        return source_text

    system_message = {
        "role": "system",
        "content": "Du bist ein präziser Übersetzer ins Deutsche.",
    }
    user_message = {
        "role": "user",
        "content": (
            "Übersetze die folgenden Suchbegriffe ins Deutsche. "
            "Gib nur die deutschen Stichwörter zurück, ohne Zusatztext.\n"
            f"Original: {source_text}"
        ),
    }

    try:
        completion = client.chat_completion(
            model="meta-llama/Llama-3.1-70B-Instruct",
            messages=[system_message, user_message],
            max_tokens=64,
            temperature=0.0,
        )
        german = completion.choices[0].message.content.strip()
    except Exception as exc:
        print(f"⚠️ [translate_to_german] Translation failed ({exc}); falling back to original text.")
        return source_text

    # An empty model reply is treated like a failed translation
    return german if german else source_text
| class DatasetEngine: | |
| """Dataset-specific orchestrator for LLM prompting and tool execution.""" | |
| def __init__( | |
| self, | |
| name: str, | |
| display_name: str, | |
| system_prompt: str, | |
| routing_instruction: str, | |
| allowed_tools: set[str], | |
| ): | |
| self.name = name | |
| self.display_name = display_name | |
| self.system_prompt = system_prompt | |
| self.routing_instruction = routing_instruction | |
| self.allowed_tools = allowed_tools | |
| self._last_request: dict[str, Any] | None = None | |
| def build_messages(self, user_message: str, language_label: str, language_code: str) -> list[dict]: | |
| """Construct chat completion messages with dataset-specific guardrails.""" | |
| routing_guardrails = ( | |
| f"TARGET_DATA_SOURCE: {self.display_name}\n" | |
| f"{self.routing_instruction}\n" | |
| 'If the request requires a different data source, respond with ' | |
| '{"response": "Explain that the other dataset should be selected in the app."}' | |
| ) | |
| # Get current date for dynamic date handling | |
| current_date = datetime.now().strftime("%Y-%m-%d") | |
| return [ | |
| {"role": "system", "content": self.system_prompt}, | |
| {"role": "system", "content": routing_guardrails}, | |
| { | |
| "role": "user", | |
| "content": ( | |
| f"Current date: {current_date}\n" | |
| f"Selected dataset: {self.display_name}\n" | |
| f"Language preference: {language_label} ({language_code})\n" | |
| f"Question: {user_message}" | |
| ), | |
| }, | |
| ] | |
| def _parse_model_response(raw_response: str) -> dict: | |
| """Parse JSON (with cleanup) returned by the LLM.""" | |
| clean_response = raw_response.strip() | |
| if clean_response.startswith("```json"): | |
| clean_response = clean_response[7:] | |
| if clean_response.startswith("```"): | |
| clean_response = clean_response[3:] | |
| if clean_response.endswith("```"): | |
| clean_response = clean_response[:-3] | |
| clean_response = clean_response.strip() | |
| json_start_candidates = [] | |
| for ch in ("{", "["): | |
| idx = clean_response.find(ch) | |
| if idx != -1: | |
| json_start_candidates.append(idx) | |
| if json_start_candidates: | |
| clean_response = clean_response[min(json_start_candidates):] | |
| return json.loads(clean_response) | |
| def query_model(self, user_message: str, language_label: str, language_code: str) -> dict: | |
| """Call the LLM with dataset-constrained instructions.""" | |
| try: | |
| messages = self.build_messages(user_message, language_label, language_code) | |
| response = client.chat_completion( | |
| model="meta-llama/Llama-3.1-70B-Instruct", | |
| messages=messages, | |
| max_tokens=500, | |
| temperature=0.3, | |
| ) | |
| assistant_message = response.choices[0].message.content | |
| return self._parse_model_response(assistant_message) | |
| except json.JSONDecodeError: | |
| # Surface malformed responses to the user so they can retry. | |
| return {"response": assistant_message} | |
| except Exception as exc: | |
| return {"error": f"Error querying model: {str(exc)}"} | |
| def execute_tool( | |
| self, | |
| user_message: str, | |
| tool_name: str, | |
| arguments: dict, | |
| show_debug: bool, | |
| ) -> tuple[str, str | None]: | |
| """Run the MCP tool for the dataset.""" | |
| raise NotImplementedError("execute_tool must be implemented by subclasses.") | |
| def sanitize_arguments(self, tool_name: str, arguments: dict) -> dict: | |
| """ | |
| Sanitize and validate tool arguments before execution. | |
| Args: | |
| tool_name: Name of the tool being called | |
| arguments: Raw arguments from LLM | |
| Returns: | |
| Sanitized arguments dict with proper types and valid values | |
| """ | |
| raise NotImplementedError("sanitize_arguments must be implemented by subclasses.") | |
| def _compose_response_text( | |
| self, | |
| explanation: str, | |
| debug_info: str | None, | |
| show_debug: bool, | |
| body: str, | |
| ) -> str: | |
| parts = [] | |
| if explanation: | |
| parts.append(f"*{explanation}*") | |
| if show_debug and debug_info: | |
| parts.append(f"### 🔧 Debug Information\n{debug_info}\n\n---") | |
| parts.append(body) | |
| return "\n\n".join(parts) | |
| def postprocess_tool_response( | |
| self, | |
| *, | |
| response: str, | |
| tool_name: str, | |
| explanation: str, | |
| debug_info: str | None, | |
| show_debug: bool, | |
| language_code: str, | |
| ) -> tuple[str, str | None, dict, list]: | |
| """Default dataset response handler.""" | |
| body = f"### 📊 Results\n{response}" | |
| final_response = self._compose_response_text(explanation, debug_info, show_debug, body) | |
| return final_response, None, {}, [] | |
| def respond( | |
| self, | |
| user_message: str, | |
| language_label: str, | |
| language_code: str, | |
| show_debug: bool, | |
| ) -> tuple[str, str | None, dict, list]: | |
| """Entry point used by the Gradio handler.""" | |
| model_response = self.query_model(user_message, language_label, language_code) | |
| if "response" in model_response: | |
| return model_response["response"], None, {}, [] | |
| if "error" in model_response: | |
| return f"❌ {model_response['error']}", None, {}, [] | |
| tool_name = model_response.get("tool") | |
| arguments = model_response.get("arguments") | |
| if not tool_name or not isinstance(arguments, dict): | |
| return ( | |
| "I couldn't determine how to process your request. Please try rephrasing your question.", | |
| None, | |
| {}, | |
| [], | |
| ) | |
| if tool_name not in self.allowed_tools: | |
| allowed_list = ", ".join(sorted(self.allowed_tools)) | |
| warning = ( | |
| f"❌ Tool '{tool_name}' is not available for {self.display_name}. " | |
| f"Allowed tools: {allowed_list}. Please adjust your request." | |
| ) | |
| return warning, None, {}, [] | |
| if "language" not in arguments: | |
| arguments["language"] = language_code | |
| # Force JSON response format for parliament tools to ensure consistent card rendering | |
| if isinstance(self, ParliamentEngine): | |
| arguments["response_format"] = "json" | |
| # Sanitize arguments before execution | |
| arguments = self.sanitize_arguments(tool_name, arguments) | |
| print(f"✅ [DatasetEngine] Sanitized arguments: {arguments}") | |
| # Remember latest request context for downstream post-processing | |
| self._last_request = { | |
| "tool": tool_name, | |
| "arguments": dict(arguments), | |
| } | |
| explanation = model_response.get("explanation", "") | |
| response, debug_info = self.execute_tool(user_message, tool_name, arguments, show_debug) | |
| return self.postprocess_tool_response( | |
| response=response, | |
| tool_name=tool_name, | |
| explanation=explanation, | |
| debug_info=debug_info, | |
| show_debug=show_debug, | |
| language_code=language_code, | |
| ) | |
class ParliamentEngine(DatasetEngine):
    """Engine for the OpenParlData (Swiss parliament) tools."""

    # Optional string filters that are dropped when the LLM sends them empty
    _OPTIONAL_STRING_PARAMS = frozenset({
        "canton",
        "party",
        "parliament_id",
        "vote_type",
        "submitter_id",
        "speaker_id",
        "topic",
        "status",
        "body_key",
        "level",
    })

    # Status values that mean "no filter"
    _NON_SPECIFIC_STATUSES = frozenset({"", "all", "any", "*", "none"})

    # English status keywords -> German status labels sent to the tool
    _STATUS_MAP = {
        "pending": "Eingereicht",
        "submitted": "Eingereicht",
        "in_progress": "Eingereicht",
        "open": "Eingereicht",
        "accepted": "Angenommen",
        "approved": "Angenommen",
        "rejected": "Abgelehnt",
        "declined": "Abgelehnt",
        "completed": "Erledigt",
        "closed": "Erledigt",
    }

    _SUPPORTED_LANGUAGES = frozenset({"DE", "FR", "IT", "EN"})

    # String spellings of a boolean "false" that LLMs commonly emit
    _FALSE_STRINGS = frozenset({"", "false", "0", "no", "n", "off", "none", "null"})

    def __init__(self):
        super().__init__(
            name="parliament",
            display_name="Swiss Parliament Data (OpenParlData)",
            system_prompt=PARLIAMENT_PROMPT,
            routing_instruction="Use only tools that begin with 'openparldata_'. Never mention BFS tools.",
            allowed_tools={
                "openparldata_search_parliamentarians",
                "openparldata_search_votes",
                "openparldata_search_motions",
                "openparldata_search_debates",
                "openparldata_search_meetings",
            },
        )

    @staticmethod
    def _coerce_int(value, *, default: int, minimum: int, maximum: int | None = None) -> int:
        """Convert ``value`` to an int clamped to [minimum, maximum]; ``default`` if not convertible."""
        try:
            number = int(value)
        except (ValueError, TypeError, OverflowError):
            return default
        number = max(minimum, number)
        return number if maximum is None else min(maximum, number)

    @classmethod
    def _coerce_bool(cls, value) -> bool:
        """Convert ``value`` to bool, reading strings such as "false" or "0" as False."""
        if isinstance(value, str):
            return value.strip().lower() not in cls._FALSE_STRINGS
        return bool(value)

    @classmethod
    def _normalize_status(cls, value) -> str | None:
        """Map a status filter to the value sent to the tool, or None to drop it.

        Numeric strings pass through, English keywords are mapped to their German
        label, and German labels are accepted case-insensitively.
        """
        status_val = str(value).strip().lower()
        if status_val in cls._NON_SPECIFIC_STATUSES:
            print("⚠️ [ParliamentEngine] Removing non-specific status filter")
            return None
        if status_val.isdigit():
            return status_val
        mapped = cls._STATUS_MAP.get(status_val)
        if mapped:
            return mapped
        # The model sometimes already answers with the German label itself
        for label in cls._STATUS_MAP.values():
            if label.lower() == status_val:
                return label
        print(f"⚠️ [ParliamentEngine] Unknown status '{value}' dropped")
        return None

    @staticmethod
    def _parliamentarian_query(value):
        """Reduce a capitalised multi-word name to its last token for broader matching."""
        tokens = str(value).replace(",", " ").split()
        if len(tokens) >= 2 and all(tok[0].isupper() for tok in tokens):
            # Use last token (family name) for broader matching
            return tokens[-1]
        return value

    def sanitize_arguments(self, tool_name: str, arguments: dict) -> dict:
        """Sanitize arguments for OpenParlData tools.

        Unknown parameters and empty optional strings are dropped, numeric and
        boolean parameters are coerced, and enum-like parameters are normalized.
        English requests are forced to German, and for German requests the
        ``query``/``topic`` texts are translated via ``translate_to_german``.

        Args:
            tool_name: Name of the OpenParlData tool being called.
            arguments: Raw arguments from the LLM.

        Returns:
            A new dict holding only valid, normalized arguments.
        """
        sanitized = {}
        valid_params = PARLIAMENT_TOOL_PARAMS.get(tool_name, set())
        requested_language = str(arguments.get("language", "")).strip().lower()

        for key, value in arguments.items():
            # Skip extra fields not in the tool schema
            if key not in valid_params:
                print(f"⚠️ [ParliamentEngine] Skipping invalid parameter '{key}' for {tool_name}")
                continue

            # Normalize strings and drop empty values for optional params
            if isinstance(value, str):
                value = value.strip()
                if value == "" and key in self._OPTIONAL_STRING_PARAMS:
                    print(f"⚠️ [ParliamentEngine] Dropping empty string for '{key}'")
                    continue

            if key == "limit":
                sanitized[key] = self._coerce_int(value, default=20, minimum=1, maximum=100)
            elif key == "offset":
                sanitized[key] = self._coerce_int(value, default=0, minimum=0)
            elif key == "language":
                # Validate language enum (case-insensitive), defaulting to English
                lang_upper = str(value).upper()
                sanitized[key] = lang_upper.lower() if lang_upper in self._SUPPORTED_LANGUAGES else "en"
            elif key == "active_only":
                sanitized[key] = self._coerce_bool(value)
            elif key == "status":
                status = self._normalize_status(value)
                if status is not None:
                    sanitized[key] = status
            elif key == "body_key":
                sanitized[key] = str(value).upper()
            elif key == "level":
                sanitized[key] = str(value).lower()
            elif key == "query" and tool_name == "openparldata_search_parliamentarians":
                sanitized[key] = self._parliamentarian_query(value)
            else:
                # Keep other values as-is
                sanitized[key] = value

        # Enforce German language for English UI users. This also covers an
        # unrecognised language value that was defaulted to English above.
        if requested_language == "en" or sanitized.get("language") == "en":
            sanitized["language"] = "de"

        # Translate key textual filters into German for better recall.
        # NOTE(review): this also translates the family name kept for
        # parliamentarian searches — confirm that is intended.
        if sanitized.get("language") == "de":
            for text_key in ("query", "topic"):
                text_value = str(sanitized.get(text_key, "")).strip()
                if text_value:
                    sanitized[text_key] = translate_to_german(text_value) or text_value

        # Never send a blank query string
        if "query" in sanitized and not str(sanitized["query"]).strip():
            sanitized.pop("query", None)

        return sanitized

    def execute_tool(
        self,
        user_message: str,
        tool_name: str,
        arguments: dict,
        show_debug: bool,
    ) -> tuple[str, str | None]:
        """Log the call and run the OpenParlData MCP query to completion.

        Returns:
            The result of ``execute_mcp_query``, unpacked by callers as
            ``(response, debug_info)``.
        """
        # DEBUG: Capture arguments before MCP call
        print("\n🔍 [ParliamentEngine] execute_tool called:")
        print(f" Tool: {tool_name}")
        print(f" Arguments: {arguments}")
        print(f" Argument types: { {k: type(v).__name__ for k, v in arguments.items()} }")
        return asyncio.run(execute_mcp_query(user_message, tool_name, arguments, show_debug))

    def postprocess_tool_response(
        self,
        *,
        response: str,
        tool_name: str,
        explanation: str,
        debug_info: str | None,
        show_debug: bool,
        language_code: str,
    ) -> tuple[str, str | None, dict, str]:
        """Return a placeholder message plus the raw tool response for later parsing.

        The raw response is passed through unchanged as the fourth element
        instead of being embedded in the message text.
        """
        body = "Searching parliament data..."
        final_response = self._compose_response_text(explanation, debug_info, show_debug, body)
        return final_response, None, {}, response
class BFSEngine(DatasetEngine):
    """Engine for the BFS (Swiss statistics) tools."""

    # Valid parameter names per tool
    TOOL_PARAMS = {
        "bfs_search": {
            "keywords", "language"  # NO format parameter!
        },
        "bfs_query_data": {
            "datacube_id", "filters", "format", "language"
        },
    }

    # Normalized (upper-case, underscores) format names -> values sent to the tool
    _FORMAT_MAP = {
        "CSV": "csv",
        "JSON": "json",
        "JSON_STAT": "json-stat",
        "JSON_STAT2": "json-stat2",
        "PX": "px",
    }

    def __init__(self):
        super().__init__(
            name="statistics",
            display_name="Swiss Statistics (BFS)",
            system_prompt=BFS_PROMPT,
            routing_instruction="Use only tools that begin with 'bfs_'. Never mention OpenParlData tools.",
            allowed_tools={
                "bfs_search",
                "bfs_query_data",
            },
        )

    def sanitize_arguments(self, tool_name: str, arguments: dict) -> dict:
        """Sanitize arguments for BFS tools.

        Parameters not listed in ``TOOL_PARAMS`` for the tool are dropped, the
        language is validated (default "en") and, for ``bfs_query_data``, the
        format is normalized with "csv" as default.
        """
        sanitized = {}
        valid_params = self.TOOL_PARAMS.get(tool_name, set())

        for key, value in arguments.items():
            # Skip extra fields not in the tool schema
            if key not in valid_params:
                print(f"⚠️ [BFSEngine] Skipping invalid parameter '{key}' for {tool_name}")
                continue

            if key == "language":
                # Validate language enum (case-insensitive), defaulting to English
                lang_upper = str(value).upper()
                sanitized[key] = lang_upper.lower() if lang_upper in ("DE", "FR", "IT", "EN") else "en"
            elif key == "format":
                # Validate and normalize format enum (only for bfs_query_data)
                if tool_name == "bfs_query_data":
                    format_upper = str(value).upper().replace("-", "_")
                    sanitized[key] = self._FORMAT_MAP.get(format_upper, "csv")  # Default to CSV
            else:
                # Keep other values as-is
                sanitized[key] = value

        # Add default format for bfs_query_data if not present
        if tool_name == "bfs_query_data" and "format" not in sanitized:
            sanitized["format"] = "csv"

        return sanitized

    def execute_tool(
        self,
        user_message: str,
        tool_name: str,
        arguments: dict,
        show_debug: bool,
    ) -> tuple[str, str | None]:
        """Log the call and run the BFS MCP query to completion.

        Returns:
            The result of ``execute_mcp_query_bfs``, unpacked by callers as
            ``(response, debug_info)``.
        """
        # DEBUG: Capture arguments after sanitization
        print("\n🔍 [BFSEngine] execute_tool called:")
        print(f" Tool: {tool_name}")
        print(f" Arguments (sanitized): {arguments}")
        print(f" Argument types: { {k: type(v).__name__ for k, v in arguments.items()} }")
        return asyncio.run(execute_mcp_query_bfs(user_message, tool_name, arguments, show_debug))

    @staticmethod
    def _parse_datacube_choices(response: str) -> tuple[dict, list]:
        """Extract selectable datacubes from a search response.

        A datacube entry is a line of the form ``<n>. **<datacube_id>**``. Its
        description is taken from the following line, or from the line after
        that when the following line is blank or starts with '↳'.

        This is a static method because it is invoked through ``self`` with a
        single argument.

        Returns:
            ``(label -> datacube_id mapping, list of labels in order)``.
        """
        import re

        entry_pattern = re.compile(r'^\s*\d+\.\s+\*\*([^*]+)\*\*\s*$')
        datacube_map: dict[str, str] = {}
        datacube_choices: list[str] = []
        lines = response.split('\n')

        for i, line in enumerate(lines):
            match = entry_pattern.search(line)
            if not match:
                continue
            datacube_id = match.group(1).strip()
            description = datacube_id
            if i + 1 < len(lines):
                next_line = lines[i + 1].strip()
                if next_line and not next_line.startswith('↳'):
                    description = next_line
                elif i + 2 < len(lines):
                    description = lines[i + 2].strip() or datacube_id
            # Keep labels short enough for the selection widget
            if len(description) > 80:
                description = description[:77] + "..."
            label = f"{description} ({datacube_id})"
            datacube_choices.append(label)
            datacube_map[label] = datacube_id

        return datacube_map, datacube_choices

    @staticmethod
    def _detect_csv(response: str) -> bool:
        """Heuristically decide whether ``response`` is CSV data.

        Requires at least two lines, a comma in each of the first two lines and
        no error phrase within the first 200 characters.
        """
        lines = response.strip().split('\n')
        if len(lines) < 2:
            return False
        if ',' not in lines[0] or ',' not in lines[1]:
            return False
        prefix = response.lower()[:200]
        error_tokens = ["error", "no data", "no datacubes found", "try broader"]
        return not any(token in prefix for token in error_tokens)

    @staticmethod
    def _write_csv(response: str) -> tuple[str, int]:
        """Write CSV text to a uniquely named temp file.

        A unique file name keeps requests arriving within the same second from
        overwriting each other's download.

        Returns:
            ``(file path, number of data rows)``; the header line and blank
            lines are not counted as data rows.
        """
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        with tempfile.NamedTemporaryFile(
            mode="w",
            encoding="utf-8",
            newline="",  # write the response verbatim, without newline translation
            prefix=f"bfs_data_{timestamp}_",
            suffix=".csv",
            delete=False,  # the file must outlive this call so it can be downloaded
        ) as handle:
            handle.write(response)
        non_empty_lines = sum(1 for line in response.splitlines() if line.strip())
        return handle.name, max(0, non_empty_lines - 1)

    def postprocess_tool_response(
        self,
        *,
        response: str,
        tool_name: str,
        explanation: str,
        debug_info: str | None,
        show_debug: bool,
        language_code: str,
    ) -> tuple[str, str | None, dict, list]:
        """Turn a BFS tool response into UI content.

        CSV data from ``bfs_query_data`` is saved to a temp file offered for
        download. Search results listing datacubes become selectable choices.
        Everything else is shown verbatim.

        Returns:
            ``(markdown text, CSV path or None, label -> datacube_id map, labels)``.
        """
        csv_file_path = None
        datacube_map: dict[str, str] = {}
        datacube_choices: list[str] = []

        if tool_name == "bfs_query_data" and self._detect_csv(response):
            csv_file_path, rows = self._write_csv(response)
            body = (
                "### 📊 Data Ready\n"
                f"✅ CSV file generated with {rows} rows\n\n"
                "💾 **Download your data using the button below**"
            )
        else:
            if tool_name == "bfs_search" and "matching datacube" in response.lower():
                datacube_map, datacube_choices = self._parse_datacube_choices(response)
            if datacube_choices:
                # Show a short prompt instead of the full listing; the search
                # term is extracted from the model's explanation when possible.
                import re

                match = re.search(r'related to (.+)', explanation, re.IGNORECASE)
                search_term = (match.group(1).strip() if match else "") or "your search"
                body = f"### 📊 Available Datasets\n\nHere is the data available for **{search_term}**. Please select a dataset below to download:"
            else:
                # No datacubes found (or a different tool): show the full response
                body = f"### 📊 Results\n{response}"

        final_response = self._compose_response_text(explanation, debug_info, show_debug, body)
        return final_response, csv_file_path, datacube_map, datacube_choices

    def fetch_datacube_data(
        self,
        datacube_id: str,
        language_code: str,
        show_debug: bool,
    ) -> tuple[str, str | None]:
        """Download one datacube as CSV.

        Returns:
            ``(markdown message, CSV path)`` on success, or
            ``(error message, None)`` when the response is not CSV.
        """
        # Go through the same sanitizer as LLM-driven calls so the language is
        # validated and the CSV format is requested explicitly.
        arguments = self.sanitize_arguments(
            "bfs_query_data",
            {"datacube_id": datacube_id, "language": language_code},
        )
        response, debug_info = self.execute_tool(
            user_message=f"Get data for datacube {datacube_id}",
            tool_name="bfs_query_data",
            arguments=arguments,
            show_debug=show_debug,
        )

        if not self._detect_csv(response):
            return f"❌ Error retrieving data:\n\n{response}", None

        csv_file_path, rows = self._write_csv(response)
        message = (
            "### 📊 Data Ready\n"
            f"✅ CSV file generated with {rows} rows for datacube: `{datacube_id}`\n\n"
            "💾 **Download your data using the button below**"
        )
        if show_debug and debug_info:
            message = f"### 🔧 Debug Information\n{debug_info}\n\n---\n\n{message}"
        return message, csv_file_path
# One engine instance per selectable dataset, keyed by dataset identifier
DATASET_ENGINES: dict[str, DatasetEngine] = dict(
    parliament=ParliamentEngine(),
    statistics=BFSEngine(),
)

# Usage tracker configured with a limit of 50 requests per day
tracker = UsageTracker(daily_limit=50)

# Language labels shown in the UI -> language codes
LANGUAGES = dict(
    [
        ("English", "en"),
        ("Deutsch", "de"),
        ("Français", "fr"),
        ("Italiano", "it"),
    ]
)
# Constants imported from datasets/ modules above
def chat_response(message: str, history: list, language: str, show_debug: bool, dataset: str = "parliament") -> tuple[str, str | None, dict, list]:
    """
    Route a chat message to the engine registered for ``dataset``.

    ``language`` is a label from ``LANGUAGES``; unknown labels fall back to
    "en". ``history`` is accepted but not used. Any exception is converted
    into an error message with empty extras.
    """
    try:
        selected_engine = DATASET_ENGINES.get(dataset)
        if selected_engine:
            code = LANGUAGES.get(language, "en")
            return selected_engine.respond(message, language, code, show_debug)
        return f"❌ Unknown dataset selected: {dataset}", None, {}, []
    except Exception as e:
        return f"❌ An error occurred: {str(e)}", None, {}, []
# Load custom CSS. The stylesheet is resolved relative to this module rather
# than the current working directory, and decoded explicitly as UTF-8 so the
# result does not depend on the platform's default encoding.
custom_css = (Path(__file__).parent / "ui" / "styles.css").read_text(encoding="utf-8")
| # Build Gradio interface | |
| with gr.Blocks(css=custom_css, title="Swiss and European Government Data LLM") as demo: | |
| # State to track datacube search results | |
| datacube_state = gr.State({}) # Maps display text → datacube_id | |
| # State to track parliament cards | |
| parliament_cards_state = gr.State([]) # List of card dicts | |
| parliament_page_state = gr.State(1) # Current page number | |
| gr.Markdown( | |
| """ | |
| <div class="chatbot-header"> | |
| <h1>🇨🇭 Swiss & European Government Data LLM</h1> | |
| <p>Explore Swiss parliament records and BFS statistics, with more datasets on the way.</p> | |
| </div> | |
| """ | |
| ) | |
| with gr.Row(): | |
| with gr.Column(scale=3): | |
| # Simple query input form | |
| with gr.Row(): | |
| msg = gr.Textbox( | |
| placeholder="Ask a question about Swiss parliamentary data or statistics...", | |
| show_label=False, | |
| scale=4, | |
| container=False | |
| ) | |
| submit = gr.Button("🔍 Search", variant="primary", scale=1) | |
| # Status/explanation text | |
| status_text = gr.Markdown("", visible=False) | |
| # CSV download file component | |
| download_file = gr.File( | |
| label="📥 Download Data", | |
| visible=False, | |
| interactive=False | |
| ) | |
| # Datacube selection (hidden by default, shown when search returns results) | |
| with gr.Row(visible=False) as datacube_selection_row: | |
| with gr.Column(scale=4): | |
| datacube_radio = gr.Radio( | |
| label="📋 Select Datacube for Download", | |
| choices=[], | |
| visible=True | |
| ) | |
| with gr.Column(scale=1): | |
| get_data_btn = gr.Button("📥 Get Data", variant="primary", size="lg") | |
| # Parliament cards display (hidden by default, shown when parliament results return) | |
| with gr.Column(visible=False) as parliament_cards_row: | |
| parliament_cards_html = gr.HTML("") | |
| with gr.Row(): | |
| prev_page_btn = gr.Button("◀ Previous", size="sm") | |
| page_info = gr.Markdown("Page 1") | |
| next_page_btn = gr.Button("Next ▶", size="sm") | |
| with gr.Column(scale=1): | |
| gr.Markdown("### ⚙️ Settings") | |
| dataset = gr.Radio( | |
| choices=[ | |
| "Swiss Parliament Data", | |
| "Swiss Statistics (BFS)" | |
| ], | |
| value="Swiss Parliament Data", | |
| label="Data Source", | |
| info="Choose which API to query" | |
| ) | |
| gr.HTML( | |
| """ | |
| <div class="coming-soon-row"> | |
| <span class="coming-soon-pill">ParlTalk • Coming Soon</span> | |
| <span class="coming-soon-pill">Eurostat • Coming Soon</span> | |
| </div> | |
| """ | |
| ) | |
| language = gr.Radio( | |
| choices=list(LANGUAGES.keys()), | |
| value="English", | |
| label="Language", | |
| info="Select response language" | |
| ) | |
| # Example queries display | |
| gr.Markdown("### 💡 Example Queries") | |
| examples_display = gr.Markdown() | |
def ensure_message_history(history):
    """Convert chat history into a list of ``{"role", "content"}`` dicts.

    Dict entries are kept when they carry a truthy ``role``; their content is
    stringified (``None`` becomes ""). Two-item tuples/lists are read as
    ``(user, assistant)`` pairs, skipping ``None`` members. Anything else is
    ignored. Empty or missing history yields an empty list.
    """

    def _as_messages(entry):
        # Yield zero, one or two normalized messages for a single history entry
        if isinstance(entry, dict):
            role = entry.get("role")
            if role:
                raw = entry.get("content", "")
                yield {"role": role, "content": str(raw) if raw is not None else ""}
        elif isinstance(entry, (tuple, list)) and len(entry) == 2:
            for role, text in zip(("user", "assistant"), entry):
                if text is not None:
                    yield {"role": role, "content": str(text)}

    return [message for entry in (history or []) for message in _as_messages(entry)]
def create_examples_text(dataset_choice: str, language: str) -> str:
    """Build a markdown bullet list of example queries.

    BFS examples are used for "Swiss Statistics (BFS)"; every other choice gets
    the OpenParlData examples. Examples are looked up by language code, falling
    back to the English set.
    """
    code = LANGUAGES.get(language, "en")
    if dataset_choice == "Swiss Statistics (BFS)":
        catalogue = BFS_EXAMPLES
    else:
        catalogue = OPENPARLDATA_EXAMPLES
    chosen = catalogue.get(code, catalogue["en"])
    bullets = [f"- {example}" for example in chosen]
    return "\n".join(bullets)
| # Helper functions imported from ui.helpers | |
| def build_parliament_card(item: dict, lang_code: str) -> dict: | |
| """Normalize OpenParlData rows into unified card metadata.""" | |
| card = { | |
| "title": "Untitled", | |
| "url": "#", | |
| "date": "", | |
| "category": "Result", | |
| "summary": "" | |
| } | |
| if not isinstance(item, dict): | |
| return card | |
| # People directory | |
| if any(key in item for key in ("firstname", "lastname", "fullname")): | |
| card["category"] = "Person" | |
| fullname = item.get("fullname") or f"{item.get('firstname', '')} {item.get('lastname', '')}".strip() | |
| card["title"] = fullname or "Parliamentarian" | |
| website = prefer_language(item.get("website_parliament_url"), lang_code) | |
| card["url"] = website or item.get("url_api", "#") | |
| party_display = None | |
| if item.get("party"): | |
| party_display = prefer_language(item.get("party"), lang_code) | |
| if not party_display and isinstance(item["party"], dict): | |
| party_display = prefer_language(item["party"], "de") | |
| if not party_display and item.get("party_harmonized"): | |
| party_display = prefer_language(item.get("party_harmonized"), lang_code) | |
| body_key = item.get("body_key") | |
| summary_parts = [] | |
| if party_display: | |
| summary_parts.append(f"Party: {party_display}") | |
| if body_key: | |
| summary_parts.append(f"Body: {body_key}") | |
| if summary_parts: | |
| card["summary"] = " · ".join(summary_parts) | |
| updated = item.get("updated_at") or item.get("created_at") | |
| if updated: | |
| card["date"] = updated[:10] | |
| return card | |
| # Meetings | |
| if item.get("begin_date") and (item.get("name") or item.get("location") or item.get("type") == "meeting"): | |
| card["category"] = "Meeting" | |
| card["title"] = prefer_language(item.get("name"), lang_code) or item.get("number") or "Meeting" | |
| card["date"] = (item.get("begin_date") or "")[:10] | |
| card["url"] = prefer_language(item.get("url_external"), lang_code) or item.get("url_api", "#") | |
| details = [] | |
| if item.get("location"): | |
| details.append(item["location"]) | |
| if item.get("body_key"): | |
| details.append(f"Body: {item['body_key']}") | |
| if item.get("number"): | |
| details.append(f"Meeting #{item['number']}") | |
| if details: | |
| card["summary"] = " · ".join(details) | |
| return card | |
| # Votes | |
| if "results_yes" in item or "results_no" in item: | |
| card["category"] = "Vote" | |
| card["title"] = prefer_language(item.get("title"), lang_code) or "Vote" | |
| card["date"] = (item.get("date") or "")[:10] | |
| card["url"] = prefer_language(item.get("url_external"), lang_code) or item.get("url_api", "#") | |
| affair_title = prefer_language(item.get("affair_title"), lang_code) | |
| if affair_title: | |
| card["summary"] = affair_title | |
| else: | |
| totals = [] | |
| if item.get("results_yes") is not None: | |
| totals.append(f"Yes {item.get('results_yes')}") | |
| if item.get("results_no") is not None: | |
| totals.append(f"No {item.get('results_no')}") | |
| if item.get("results_abstention") is not None: | |
| totals.append(f"Abst {item.get('results_abstention')}") | |
| if totals: | |
| card["summary"] = " · ".join(totals) | |
| return card | |
| # Affairs / motions | |
| if "type_name" in item or "number" in item or "state_name" in item: | |
| card["category"] = "Affair" | |
| card["title"] = prefer_language(item.get("title"), lang_code) or item.get("number") or "Affair" | |
| card["url"] = prefer_language(item.get("url_external"), lang_code) or item.get("url_api", "#") | |
| begin = item.get("begin_date") or item.get("created_at") | |
| if begin: | |
| card["date"] = begin[:10] | |
| details = [] | |
| type_name = prefer_language(item.get("type_name"), lang_code) | |
| state_name = prefer_language(item.get("state_name"), lang_code) | |
| if type_name: | |
| details.append(type_name) | |
| if state_name: | |
| details.append(state_name) | |
| if item.get("number"): | |
| details.append(item["number"]) | |
| if details: | |
| card["summary"] = " · ".join(details) | |
| return card | |
| # Speeches / debates | |
| if any(key in item for key in ("transcript", "speech_text", "speech_text_content", "speaker_name", "person_name", "person")): | |
| card["category"] = "Speech" | |
| # Extract person from nested expand structure: person = {"data": [...], "meta": {...}} | |
| person_data = item.get("person", {}) | |
| if isinstance(person_data, dict) and "data" in person_data and person_data["data"]: | |
| person = person_data["data"][0] | |
| elif isinstance(person_data, dict): | |
| person = person_data | |
| else: | |
| person = {} | |
| speaker = ( | |
| prefer_language(person.get("fullname"), lang_code) | |
| or prefer_language(item.get("person_name"), lang_code) | |
| or person.get("fullname") | |
| or item.get("speaker_name") | |
| ) | |
| # Extract affair from nested expand structure | |
| affair_data = item.get("affair", {}) | |
| if isinstance(affair_data, dict) and "data" in affair_data and affair_data["data"]: | |
| affair = affair_data["data"][0] | |
| elif isinstance(affair_data, dict): | |
| affair = affair_data | |
| else: | |
| affair = {} | |
| affair_title = prefer_language(affair.get("title"), lang_code) | |
| card["title"] = ( | |
| prefer_language(item.get("title"), lang_code) | |
| or affair_title | |
| or (f"Rede von {speaker}" if speaker else "Rede") | |
| ) | |
| card["date"] = (item.get("date") or item.get("date_start") or "")[:10] | |
| # Extract meeting from nested expand structure | |
| meeting_data = item.get("meeting") | |
| if isinstance(meeting_data, dict) and "data" in meeting_data and meeting_data["data"]: | |
| meeting = meeting_data["data"][0] | |
| else: | |
| meeting = {} | |
| # Speeches use "url" field (plain string), not "url_external" (dict) | |
| external_url = pick_external_url( | |
| item.get("url"), # Speeches have direct url field | |
| item.get("url_external"), | |
| affair.get("url_external") if isinstance(affair, dict) else None, | |
| meeting.get("url_external") if isinstance(meeting, dict) else None, | |
| ) | |
| # Never use url_api for clickable links | |
| card["url"] = external_url or "#" | |
| text_content = item.get("speech_text_content") | |
| summary = None | |
| if isinstance(text_content, dict): | |
| summary = prefer_language(text_content, lang_code) or prefer_language(text_content, "de") | |
| elif isinstance(text_content, str): | |
| summary = text_content | |
| elif item.get("transcript"): | |
| summary = item.get("transcript") | |
| elif item.get("speech_text"): | |
| summary = item.get("speech_text") | |
| if summary: | |
| summary = strip_html(summary)[:200] | |
| summary_parts = [] | |
| if speaker: | |
| summary_parts.append(speaker) | |
| if summary: | |
| summary_parts.append(summary) | |
| if affair_title and affair_title != card["title"]: | |
| summary_parts.append(affair_title) | |
| if summary_parts: | |
| card["summary"] = " — ".join(summary_parts[:2]) | |
| return card | |
| # Fallback generic | |
| if item.get("title"): | |
| card["title"] = prefer_language(item.get("title"), lang_code) or item["title"] | |
| external = prefer_language(item.get("url_external"), lang_code) | |
| card["url"] = external or item.get("url_api", "#") | |
| if item.get("date"): | |
| card["date"] = item["date"][:10] | |
| return card | |
def render_parliament_cards(cards: list[dict], page: int, items_per_page: int = 10) -> tuple[str, str, int, bool]:
    """Render one page of parliament result cards as an HTML fragment.

    Args:
        cards: Card dicts. The keys read are "title", "url", "date",
            "category" and "summary"; all of them are optional.
        page: 1-based page number. Out-of-range values are clamped.
        items_per_page: Cards per page. Values below 1 are treated as 1.

    Returns:
        A ``(cards_html, page_info, page, show_pagination)`` tuple: the HTML
        for the requested page, a human-readable "Page X of Y" label, the
        clamped page number, and whether more than one page exists.
    """
    # Function-scope import keeps this block self-contained.
    from html import escape

    if not cards:
        return "", "No results", 1, False

    items_per_page = max(1, items_per_page)  # guard against division by zero
    total_pages = (len(cards) + items_per_page - 1) // items_per_page
    page = max(1, min(page, total_pages))  # Clamp page to valid range
    show_pagination = len(cards) > items_per_page
    start_idx = (page - 1) * items_per_page
    page_cards = cards[start_idx:start_idx + items_per_page]

    # Collect fragments and join once instead of growing a string in a loop.
    parts = ['<div style="display: flex; flex-direction: column; gap: 15px;">']
    for card in page_cards:
        title = str(card.get("title") or "Untitled")
        # Truncate BEFORE escaping so an HTML entity is never cut in half.
        if len(title) > 120:
            title = title[:117] + "..."

        # Card fields originate from an external API: escape everything that
        # is interpolated into markup or into an attribute value.
        title = escape(title)
        url = escape(str(card.get("url") or "#"), quote=True)
        date = escape(str(card.get("date") or ""))
        category = escape(str(card.get("category") or "Result"))
        summary = escape(str(card.get("summary") or ""))

        date_badge = (
            '<span style="background: #e0e0e0; padding: 4px 8px; border-radius: 4px; '
            f'font-size: 12px; color: #666;">{date}</span>'
            if date
            else ''
        )
        summary_html = (
            f'<p style="margin: 0; color: #555; font-size: 13px;">{summary}</p>'
            if summary
            else ''
        )
        parts.append(f'''
<a href="{url}" target="_blank" style="text-decoration: none; display: block;" rel="noopener noreferrer">
<div class="parliament-card">
<div style="display: flex; justify-content: space-between; align-items: start; gap: 12px;">
<div style="display: flex; flex-direction: column; gap: 6px; flex: 1;">
<span class="category-badge">{category}</span>
<h3 style="margin: 0; color: #333; font-size: 16px;">{title}</h3>
{summary_html}
</div>
{date_badge}
</div>
</div>
</a>
''')
    parts.append('</div>')

    page_info = f"Page {page} of {total_pages} ({len(cards)} total results)"
    return "".join(parts), page_info, page, show_pagination
def _filter_cards_by_date_window(cards, date_from, date_to):
    """Keep only cards whose "date" lies inside [date_from, date_to] (inclusive).

    Bounds are "YYYY-MM-DD" strings. A bound that is missing or cannot be
    parsed is ignored (with a log line) instead of raising, so a malformed
    tool argument cannot abort the rest of the card post-processing.
    Cards without a parseable date are dropped.
    """
    def parse_bound(raw, label):
        if not raw:
            return None
        try:
            return datetime.strptime(raw, "%Y-%m-%d").date()
        except (TypeError, ValueError):
            print(f"⚠️ [respond] Ignoring unparseable {label}: {raw!r}")
            return None

    # Parse the bounds once instead of once per card.
    start = parse_bound(date_from, "date_from")
    end = parse_bound(date_to, "date_to")
    if start is None and end is None:
        return list(cards)

    def within_window(date_value):
        if not date_value:
            return False
        try:
            card_date = datetime.fromisoformat(date_value).date()
        except (TypeError, ValueError):
            try:
                # Fallback accepts non zero-padded dates such as "2024-1-5".
                card_date = datetime.strptime(date_value, "%Y-%m-%d").date()
            except (TypeError, ValueError):
                return False
        if start is not None and card_date < start:
            return False
        if end is not None and card_date > end:
            return False
        return True

    return [card for card in cards if within_window(card.get("date"))]


# Handle message submission
def respond(message, language, dataset_choice, current_datacube_state, current_parliament_cards, current_page, request: gr.Request):
    """Handle a submitted query and compute the updates for all bound outputs.

    Args:
        message: Text entered by the user.
        language: Selected UI language label (looked up in ``LANGUAGES``).
        dataset_choice: Selected dataset label; unknown labels fall back to
            the parliament dataset.
        current_datacube_state: Existing datacube mapping, passed through
            unless a statistics search replaces it.
        current_parliament_cards: Existing card list, passed through unless
            new cards are produced.
        current_page: Existing page number, passed through unless new cards
            are produced.
        request: Gradio request, used to derive the rate-limit key.

    Returns:
        A 14-tuple, in this order: cleared textbox value, status update,
        download file value, download visibility update, datacube state,
        datacube radio update, datacube row visibility update, parliament
        cards state, parliament page state, cards HTML, page info text,
        cards row visibility update, previous-button update, next-button
        update.
    """
    show_debug = False  # Debug mode disabled in UI

    def outputs(
        status,
        *,
        show_status=True,
        csv_path=None,
        datacube_state=current_datacube_state,
        datacube_choices=None,
        cards=current_parliament_cards,
        page=current_page,
        cards_html="",
        page_info="",
        show_cards=False,
        show_pagination=None,
    ):
        """Build the 14-tuple once; callers override only what changes."""
        status_update = gr.update(value=status, visible=True) if show_status else gr.update(visible=False)
        radio_update = gr.update() if datacube_choices is None else gr.update(choices=datacube_choices, value=None)
        if show_pagination is None:
            prev_update, next_update = gr.update(), gr.update()
        else:
            prev_update = gr.update(visible=show_pagination)
            next_update = gr.update(visible=show_pagination)
        return (
            "",
            status_update,
            csv_path,
            gr.update(visible=csv_path is not None),
            datacube_state,
            radio_update,
            gr.update(visible=datacube_choices is not None),
            cards,
            page,
            cards_html,
            page_info,
            gr.update(visible=show_cards),
            prev_update,
            next_update,
        )

    if not message or not message.strip():
        return outputs(None, show_status=False)

    # Check usage limit
    # `request.client` can be None, so never dereference `.host` blindly.
    # NOTE(review): behind a reverse proxy this is presumably the proxy
    # address, which would make all users share one quota — confirm.
    client_info = getattr(request, "client", None)
    user_id = getattr(client_info, "host", None) or "unknown"
    if not tracker.check_limit(user_id):
        status_msg = (
            "⚠️ **Daily request limit reached.** You have used all 50 requests for today. "
            "Please try again tomorrow.\n\nThis limit helps us keep the service free and available for everyone."
        )
        return outputs(status_msg)

    # Map dataset choice to engine type
    dataset_map = {
        "Swiss Parliament Data": "parliament",
        "Swiss Statistics (BFS)": "statistics"
    }
    dataset_type = dataset_map.get(dataset_choice, "parliament")

    # Get bot response (returns tuple with optional CSV file and results data).
    # An empty, throw-away chat history is passed for the API call.
    bot_message, csv_file, datacube_map, results_data = chat_response(
        message, [], language, show_debug, dataset_type
    )

    engine_instance = DATASET_ENGINES.get(dataset_type)
    last_request = getattr(engine_instance, "_last_request", None) if engine_instance else None
    is_meeting_search = bool(last_request) and last_request.get("tool") == "openparldata_search_meetings"

    # Parse JSON and extract cards for Parliament dataset
    parliament_cards: list[dict] = []
    if dataset_type == "parliament" and results_data and isinstance(results_data, str):
        try:
            print("\n🔍 [respond] Parsing JSON results_data...")
            data = json.loads(results_data, strict=False)
            print("✅ [respond] JSON parsed successfully")

            if isinstance(data, dict) and data.get("status") == "error":
                error_msg = data.get("message") or data.get("detail") or "Die OpenParlData-API meldet einen Fehler."
                endpoint = data.get("endpoint")
                if endpoint:
                    error_msg += f"\n\nEndpoint: `{endpoint}`"
                return outputs(f"❌ {error_msg}")

            if isinstance(data, dict) and isinstance(data.get("data"), list):
                items = data["data"]
                print(f"✅ [respond] Found data array with {len(items)} items")
                lang_code = LANGUAGES.get(language, "en")

                # Filter out error objects before building cards
                valid_items = [
                    item for item in items
                    if isinstance(item, dict) and item.get("status") != "error"
                ]
                if len(valid_items) < len(items):
                    print(f"⚠️ [respond] Filtered out {len(items) - len(valid_items)} error objects")
                # Plain loop on purpose: cards built before a failure are kept.
                for item in valid_items:
                    parliament_cards.append(build_parliament_card(item, lang_code))

                # Optional date filtering for meetings (client-side)
                if is_meeting_search:
                    args = last_request.get("arguments") or {}
                    date_from = args.get("date_from")
                    date_to = args.get("date_to")
                    if date_from or date_to:
                        before = len(parliament_cards)
                        parliament_cards = _filter_cards_by_date_window(parliament_cards, date_from, date_to)
                        print(f"✅ [respond] Filtered meetings by date window ({before} → {len(parliament_cards)})")

                # Limit display to avoid overwhelming the UI
                MAX_RESULTS = 50
                truncated = False
                if len(parliament_cards) > MAX_RESULTS:
                    print(f"⚠️ [respond] Truncating card list from {len(parliament_cards)} to {MAX_RESULTS}")
                    parliament_cards = parliament_cards[:MAX_RESULTS]
                    truncated = True

                if parliament_cards:
                    # "meta" may be absent or null; fall back to the shown count.
                    meta = data.get("meta")
                    total = (meta.get("total_records") if isinstance(meta, dict) else None) or len(parliament_cards)
                    display_count = len(parliament_cards)
                    bot_message = f"**Found {total} result(s).** Showing {display_count} items below:"
                    if lang_code == "en":
                        bot_message += "\n\n*Note: English content is not available from the API. Results are displayed in German.*"
                    if truncated:
                        bot_message += f"\n\n_Only the first {MAX_RESULTS} items are displayed. Refine your search for more specific results._"
                elif is_meeting_search:
                    bot_message = "No meetings found that match the requested filters. Try adjusting the date range or search keywords."
            else:
                print("❌ [respond] Data structure does not contain a 'data' array.")
        except json.JSONDecodeError as e:
            print(f"❌ [respond] JSON parsing failed: {e}")
        except Exception as e:
            # Best effort: card extraction must never break the chat reply.
            print(f"❌ [respond] Unexpected error during card extraction: {e}")

    # Handle parliament cards (for Parliament dataset)
    if dataset_type == "parliament" and parliament_cards:
        cards_html, page_info, page_num, show_pagination = render_parliament_cards(parliament_cards, 1)
        return outputs(
            bot_message,
            cards=parliament_cards,
            page=page_num,
            cards_html=cards_html,
            page_info=page_info,
            show_cards=True,
            show_pagination=show_pagination,
        )

    # Handle datacube search results (for BFS dataset)
    if dataset_type == "statistics" and results_data:
        return outputs(bot_message, datacube_state=datacube_map, datacube_choices=results_data)

    # Handle CSV download
    if csv_file:
        return outputs(bot_message, csv_path=csv_file)

    return outputs(bot_message)
# Handle parliament pagination
def prev_page(cards, current_page):
    """Step back one page of parliament results, never going below page 1.

    Returns the rendered HTML, the page-info label and the page number
    actually shown.
    """
    target_page = max(1, current_page - 1)
    rendered = render_parliament_cards(cards, target_page)
    # The renderer's fourth value (pagination visibility) is not needed here.
    html_fragment, info_label, shown_page = rendered[:3]
    return html_fragment, info_label, shown_page
def next_page(cards, current_page):
    """Advance one page of parliament results.

    Returns the rendered HTML, the page-info label and the page number
    actually shown. With no cards the current page number is returned
    unchanged alongside an empty fragment.
    """
    if not cards:
        return "", "No results", current_page
    # No local page-count arithmetic: render_parliament_cards clamps the
    # requested page to the valid range itself, so the page size is defined
    # in exactly one place (its `items_per_page` default).
    cards_html, page_info, page_num, _ = render_parliament_cards(cards, current_page + 1)
    return cards_html, page_info, page_num
# Handle "Get Data" button click for datacube selection
def fetch_datacube_data(selected_choice, current_datacube_state, language, request: gr.Request):
    """Download the datacube the user selected and expose it as a CSV file.

    Args:
        selected_choice: Label picked in the datacube radio group.
        current_datacube_state: Mapping from radio label to datacube ID.
        language: Selected UI language label (looked up in ``LANGUAGES``).
        request: Gradio request, used to derive the rate-limit key.

    Returns:
        A 4-tuple: status update, download file value (path or None),
        download visibility update, and an update hiding the datacube
        selection row.
    """
    show_debug = False  # Debug mode disabled in UI

    def outputs(status, csv_path=None):
        """Build the 4-tuple; the download is shown only when a file exists."""
        return (
            gr.update(value=status, visible=True),
            csv_path,
            gr.update(visible=csv_path is not None),
            gr.update(visible=False),
        )

    if not selected_choice or not current_datacube_state:
        return outputs("⚠️ Please select a datacube first.")

    # Check usage limit
    # `request.client` can be None, so never dereference `.host` blindly.
    client_info = getattr(request, "client", None)
    user_id = getattr(client_info, "host", None) or "unknown"
    if not tracker.check_limit(user_id):
        bot_message = (
            "⚠️ Daily request limit reached. You have used all 50 requests for today. "
            "Please try again tomorrow.\n\nThis limit helps us keep the service free and available for everyone."
        )
        return outputs(bot_message)

    # Get datacube ID from mapping
    datacube_id = current_datacube_state.get(selected_choice)
    if not datacube_id:
        return outputs("❌ Error: Could not find datacube ID for selected option.")

    # Get language code
    lang_code = LANGUAGES.get(language, "en")

    bfs_engine = DATASET_ENGINES.get("statistics")
    if not isinstance(bfs_engine, BFSEngine):
        return outputs("❌ Error: BFS engine unavailable.")

    bot_message, csv_file_path = bfs_engine.fetch_datacube_data(datacube_id, lang_code, show_debug)
    # A falsy path (None or "") means no file was produced.
    return outputs(bot_message, csv_file_path or None)
# Pressing Enter in the textbox and clicking the submit button both run
# `respond` with the same inputs and outputs. Fresh lists are built on each
# iteration so the two registrations never share a list object.
for respond_trigger in (msg.submit, submit.click):
    respond_trigger(
        respond,
        [msg, language, dataset, datacube_state, parliament_cards_state, parliament_page_state],
        [
            msg, status_text, download_file, download_file, datacube_state, datacube_radio,
            datacube_selection_row, parliament_cards_state, parliament_page_state,
            parliament_cards_html, page_info, parliament_cards_row,
            prev_page_btn, next_page_btn,
        ],
    )

# "Get Data" fetches the selected datacube. `download_file` is listed twice
# because the handler returns its value and its visibility separately.
get_data_btn.click(
    fetch_datacube_data,
    [datacube_radio, datacube_state, language],
    [status_text, download_file, download_file, datacube_selection_row],
)

# Pagination buttons re-render the stored cards for the neighbouring page.
for page_button, page_handler in ((prev_page_btn, prev_page), (next_page_btn, next_page)):
    page_button.click(
        page_handler,
        [parliament_cards_state, parliament_page_state],
        [parliament_cards_html, page_info, parliament_page_state],
    )

# Update examples when dataset or language changes, and once on initial load.
for examples_trigger in (dataset.change, language.change, demo.load):
    examples_trigger(
        create_examples_text,
        [dataset, language],
        [examples_display],
    )
# Page footer: data-source credits, the rate-limit notice and the model
# attribution, rendered as Markdown.
gr.Markdown(
    """
    ---
    **Data Sources:**
    - **Swiss Parliament Data:** with thanks to Christian, Florin and the many contributors for creating OpenParlData.ch, the model queries their API to retrieve parliamentary data
    - **Swiss Statistics (BFS):** Federal Statistical Office data via PxWeb API
    **Rate Limit:** 50 requests per day per user (shared across both datasets) to keep the service affordable and accessible.
    Powered by [Llama-3.1-8B-Instruct](https://huggingface.co/meta-llama/Llama-3.1-8B-Instruct) via HF Inference Providers and [Model Context Protocol (MCP)](https://modelcontextprotocol.io/)
    """
)
# Launch the app
# Only start the server when this file is executed as a script.
if __name__ == "__main__":
    demo.launch()