#!/usr/bin/env python3 """ OpenParlData MCP Server This MCP server provides access to Swiss parliamentary data from the OpenParlData API. It enables searching and retrieving information about parliamentarians, votes, motions, and other parliamentary activities across Swiss federal, cantonal, and municipal levels. Note: This is based on typical parliamentary API endpoints as the actual OpenParlData API documentation was not accessible at the time of creation. """ import os import json from datetime import datetime from typing import Optional, List, Dict, Any from enum import Enum import httpx from mcp.server.fastmcp import FastMCP from pydantic import BaseModel, Field, field_validator, ConfigDict # Initialize the MCP server mcp = FastMCP("openparldata_mcp") # Constants API_BASE_URL = "https://api.openparldata.ch/v1" CHARACTER_LIMIT = 25000 DEFAULT_LIMIT = 20 MAX_LIMIT = 100 # Enums for validation class Language(str, Enum): DE = "de" FR = "fr" IT = "it" EN = "en" class ParliamentLevel(str, Enum): FEDERAL = "federal" CANTONAL = "cantonal" MUNICIPAL = "municipal" class VoteType(str, Enum): FINAL = "final" DETAIL = "detail" OVERALL = "overall" class ResponseFormat(str, Enum): JSON = "json" MARKDOWN = "markdown" # Pydantic models for input validation class SearchParliamentariansInput(BaseModel): """Input for searching parliamentarians.""" model_config = ConfigDict(str_strip_whitespace=True, validate_assignment=True, extra='forbid') query: Optional[str] = Field(None, description="Search query for name or party", min_length=1, max_length=100) canton: Optional[str] = Field(None, description="Canton code (e.g., 'ZH', 'BE', 'GE')", pattern="^[A-Z]{2}$") party: Optional[str] = Field(None, description="Party abbreviation (e.g., 'SP', 'SVP', 'FDP')") active_only: bool = Field(True, description="Only return active parliamentarians") level: Optional[ParliamentLevel] = Field(None, description="Parliament level filter") language: Language = Field(Language.EN, description="Response language") limit: int = Field(DEFAULT_LIMIT, description="Maximum results to return", ge=1, le=MAX_LIMIT) offset: int = Field(0, description="Pagination offset", ge=0) response_format: ResponseFormat = Field(ResponseFormat.MARKDOWN, description="Response format") class GetParliamentarianInput(BaseModel): """Input for getting a specific parliamentarian's details.""" model_config = ConfigDict(str_strip_whitespace=True, validate_assignment=True, extra='forbid') person_id: str = Field(..., description="Unique ID of the parliamentarian", min_length=1, max_length=50) include_votes: bool = Field(False, description="Include recent voting history") include_motions: bool = Field(False, description="Include submitted motions") language: Language = Field(Language.EN, description="Response language") response_format: ResponseFormat = Field(ResponseFormat.MARKDOWN, description="Response format") class SearchVotesInput(BaseModel): """Input for searching parliamentary votes.""" model_config = ConfigDict(str_strip_whitespace=True, validate_assignment=True, extra='forbid') query: Optional[str] = Field(None, description="Search query for vote title or description", min_length=1, max_length=200) date_from: Optional[str] = Field(None, description="Start date (ISO format: YYYY-MM-DD)", pattern="^\\d{4}-\\d{2}-\\d{2}$") date_to: Optional[str] = Field(None, description="End date (ISO format: YYYY-MM-DD)", pattern="^\\d{4}-\\d{2}-\\d{2}$") parliament_id: Optional[str] = Field(None, description="Filter by parliament ID") vote_type: Optional[VoteType] = Field(None, description="Type of vote") level: Optional[ParliamentLevel] = Field(None, description="Parliament level") language: Language = Field(Language.EN, description="Response language") limit: int = Field(DEFAULT_LIMIT, description="Maximum results to return", ge=1, le=MAX_LIMIT) offset: int = Field(0, description="Pagination offset", ge=0) response_format: ResponseFormat = Field(ResponseFormat.MARKDOWN, description="Response format") class GetVoteDetailsInput(BaseModel): """Input for getting detailed vote information.""" model_config = ConfigDict(str_strip_whitespace=True, validate_assignment=True, extra='forbid') vote_id: str = Field(..., description="Unique vote identifier", min_length=1, max_length=50) include_individual_votes: bool = Field(False, description="Include how each parliamentarian voted") language: Language = Field(Language.EN, description="Response language") response_format: ResponseFormat = Field(ResponseFormat.MARKDOWN, description="Response format") class SearchMotionsInput(BaseModel): """Input for searching parliamentary motions and proposals.""" model_config = ConfigDict(str_strip_whitespace=True, validate_assignment=True, extra='forbid') query: Optional[str] = Field(None, description="Search query for motion text", min_length=1, max_length=200) submitter_id: Optional[str] = Field(None, description="Filter by submitter's ID") status: Optional[str] = Field(None, description="Motion status (e.g., 'pending', 'accepted', 'rejected')") date_from: Optional[str] = Field(None, description="Start date (ISO format)", pattern="^\\d{4}-\\d{2}-\\d{2}$") date_to: Optional[str] = Field(None, description="End date (ISO format)", pattern="^\\d{4}-\\d{2}-\\d{2}$") level: Optional[ParliamentLevel] = Field(None, description="Parliament level") language: Language = Field(Language.EN, description="Response language") limit: int = Field(DEFAULT_LIMIT, description="Maximum results", ge=1, le=MAX_LIMIT) offset: int = Field(0, description="Pagination offset", ge=0) response_format: ResponseFormat = Field(ResponseFormat.MARKDOWN, description="Response format") class SearchDebatesInput(BaseModel): """Input for searching parliamentary debates.""" model_config = ConfigDict(str_strip_whitespace=True, validate_assignment=True, extra='forbid') query: Optional[str] = Field(None, description="Search query for debate content", min_length=1, max_length=200) date_from: Optional[str] = Field(None, description="Start date (ISO format)", pattern="^\\d{4}-\\d{2}-\\d{2}$") date_to: Optional[str] = Field(None, description="End date (ISO format)", pattern="^\\d{4}-\\d{2}-\\d{2}$") speaker_id: Optional[str] = Field(None, description="Filter by speaker's ID") topic: Optional[str] = Field(None, description="Topic or theme filter") parliament_id: Optional[str] = Field(None, description="Parliament identifier") level: Optional[ParliamentLevel] = Field(None, description="Parliament level") language: Language = Field(Language.EN, description="Response language") limit: int = Field(DEFAULT_LIMIT, description="Maximum results", ge=1, le=MAX_LIMIT) offset: int = Field(0, description="Pagination offset", ge=0) response_format: ResponseFormat = Field(ResponseFormat.MARKDOWN, description="Response format") class SearchMeetingsInput(BaseModel): """Input for searching parliamentary meetings and sessions.""" model_config = ConfigDict(str_strip_whitespace=True, validate_assignment=True, extra='forbid') query: Optional[str] = Field(None, description="Search query for meeting name or description", min_length=1, max_length=200) date_from: Optional[str] = Field(None, description="Start date (ISO format: YYYY-MM-DD)", pattern="^\\d{4}-\\d{2}-\\d{2}$") date_to: Optional[str] = Field(None, description="End date (ISO format: YYYY-MM-DD)", pattern="^\\d{4}-\\d{2}-\\d{2}$") body_key: Optional[str] = Field(None, description="Filter by body key (e.g., 'ZH')") level: Optional[ParliamentLevel] = Field(None, description="Parliament level") language: Language = Field(Language.EN, description="Response language") limit: int = Field(DEFAULT_LIMIT, description="Maximum results", ge=1, le=MAX_LIMIT) offset: int = Field(0, description="Pagination offset", ge=0) response_format: ResponseFormat = Field(ResponseFormat.MARKDOWN, description="Response format") # Helper functions def truncate_response(content: str, limit: int = CHARACTER_LIMIT) -> str: """Truncate response if it exceeds the character limit, but never break JSON payloads.""" stripped = content.lstrip() if stripped.startswith("{") or stripped.startswith("["): return content # Preserve JSON so downstream parsers receive valid payloads if len(content) <= limit: return content return content[:limit] + "\n\n... [Response truncated due to size limit. Use pagination parameters to retrieve more data.]" def format_date(date_str: str) -> str: """Format ISO date string to human-readable format.""" try: dt = datetime.fromisoformat(date_str.replace('Z', '+00:00')) return dt.strftime("%B %d, %Y") except: return date_str def _extract_multilingual_text(obj: Dict[str, Any] | None, preferred_lang: str = "de") -> str | None: """Helper to extract a text value from multilingual dictionaries.""" if not isinstance(obj, dict): return obj lang_order = [preferred_lang, "de", "fr", "it", "rm", "en"] for lang in lang_order: value = obj.get(lang) if value: return value return next(iter(obj.values()), None) def format_parliamentarian_markdown(person: Dict[str, Any]) -> str: """Format parliamentarian data as markdown.""" preferred_lang = "de" fullname = person.get("fullname") if not fullname: firstname = person.get("firstname", "") lastname = person.get("lastname", "") fullname = f"{firstname} {lastname}".strip() or "Parliamentarian" lines = [f"## {fullname}"] party_obj = person.get("party") or person.get("party_harmonized") party = _extract_multilingual_text(party_obj, preferred_lang) if not party: party = person.get("party_name") if party: lines.append(f"**Party:** {party}") body_key = person.get("body_key") if body_key: lines.append(f"**Body:** {body_key}") electoral_district = _extract_multilingual_text(person.get("electoral_district"), preferred_lang) if electoral_district: lines.append(f"**District:** {electoral_district}") status = person.get("active") if status is not None: lines.append(f"**Status:** {'Active' if status else 'Inactive'}") if person.get("email"): lines.append(f"**Email:** {person['email']}") if person.get("phone"): lines.append(f"**Phone:** {person['phone']}") website = _extract_multilingual_text(person.get("website_parliament_url"), preferred_lang) if website: lines.append(f"**Official Page:** {website}") # Append recent affairs when available (requires expand=affairs) affairs = person.get("affairs") if isinstance(affairs, list) and affairs: lines.append("\n### Recent Affairs") for affair in affairs[:5]: title = _extract_multilingual_text(affair.get("title"), preferred_lang) or affair.get("title") number = affair.get("number") begin_date = affair.get("begin_date") summary = f"- {title}" if title else "- Affair" if number: summary += f" ({number})" if begin_date: summary += f" – {begin_date[:10]}" lines.append(summary) # Append recent votes when available (requires expand=votes) votes = person.get("votes") if isinstance(votes, list) and votes: lines.append("\n### Recent Votes") for vote in votes[:5]: voting = vote.get("voting") date = None title = None if isinstance(voting, dict): date = voting.get("date") title = _extract_multilingual_text(voting.get("title"), preferred_lang) decision = vote.get("decision") or vote.get("value") summary = "- Vote" if date: summary += f" on {date[:10]}" if title: summary += f": {title}" if decision: summary += f" → {decision}" lines.append(summary) return "\n".join(lines) def format_vote_markdown(vote: Dict[str, Any]) -> str: """Format vote data as markdown.""" lines = [ f"## {vote.get('title', 'Vote')}", f"**Date:** {format_date(vote.get('date', ''))}", f"**Type:** {vote.get('type', 'N/A')}", f"**Result:** {vote.get('result', 'N/A')}", ] if vote.get('yes_count') is not None: lines.extend([ f"\n### Vote Count", f"- **Yes:** {vote['yes_count']}", f"- **No:** {vote.get('no_count', 0)}", f"- **Abstentions:** {vote.get('abstention_count', 0)}", ]) if vote.get('description'): lines.extend(["\n### Description", vote['description']]) return "\n".join(lines) def format_motion_markdown(motion: Dict[str, Any]) -> str: """Format motion data as markdown.""" lines = [ f"## {motion.get('title', 'Motion')}", f"**Submitted:** {format_date(motion.get('submission_date', ''))}", f"**Submitter:** {motion.get('submitter_name', 'N/A')}", f"**Status:** {motion.get('status', 'N/A')}", ] if motion.get('text'): lines.extend(["\n### Motion Text", motion['text'][:500] + ("..." if len(motion['text']) > 500 else "")]) if motion.get('response'): lines.extend(["\n### Government Response", motion['response'][:500] + ("..." if len(motion['response']) > 500 else "")]) return "\n".join(lines) # Mock API functions (replace with actual API calls when API is accessible) async def make_api_request(endpoint: str, params: Dict[str, Any]) -> Dict[str, Any]: """ Make an API request to OpenParlData. Now using the real OpenParlData API! """ # Clean up params - remove None values clean_params = {k: v for k, v in params.items() if v is not None} # Ensure endpoint has trailing slash (API requires it) if not endpoint.endswith('/'): endpoint = endpoint + '/' try: async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as client: response = await client.get(f"{API_BASE_URL}{endpoint}", params=clean_params) response.raise_for_status() return response.json() except httpx.HTTPError as e: return { "status": "error", "message": f"API request failed: {str(e)}", "endpoint": endpoint, "params": clean_params } except Exception as e: return { "status": "error", "message": f"Unexpected error: {str(e)}", "endpoint": endpoint, "params": clean_params } # Tool implementations @mcp.tool( name="openparldata_search_parliamentarians", annotations={ "title": "Search Parliamentarians", "readOnlyHint": True, "destructiveHint": False, "idempotentHint": True, "openWorldHint": True } ) async def search_parliamentarians(params: SearchParliamentariansInput) -> str: """ Search for parliamentarians across Swiss federal, cantonal, and municipal levels. Returns a list of parliamentarians matching the search criteria, including their party affiliation, canton, and current status. Examples: - Search for all active federal parliamentarians from Zurich - Find all SP party members in cantonal parliaments - Search for a specific person by name """ request_params = { "search": params.query, "active": params.active_only, "lang": params.language.value, "lang_fallback": "de,fr,it", "search_language": params.language.value, "limit": params.limit, "offset": params.offset, "sort_by": "-id" } # Add canton filter if provided if params.canton: request_params["body_key"] = params.canton # Add party filter if provided if params.party: request_params["party"] = params.party # Remove None values request_params = {k: v for k, v in request_params.items() if v is not None} try: result = await make_api_request("/persons", request_params) if params.response_format == ResponseFormat.JSON: # Use ensure_ascii=False to properly handle special characters in text content return truncate_response(json.dumps(result, indent=2, ensure_ascii=False)) # Format as markdown if result.get("status") == "error": return f"# API Error\n\n{result.get('message', 'Unknown error')}\n\n**Endpoint:** {result.get('endpoint', 'N/A')}" # Format actual data from OpenParlData API lines = ["# Parliamentarians Search Results\n"] if isinstance(result, dict) and "data" in result: items = result["data"] meta = result.get("meta", {}) total = meta.get("total_records", len(items)) if not items: return "# No Results\n\nNo parliamentarians found matching your criteria." for person in items: lines.append(f"## {person.get('firstname', '')} {person.get('lastname', '')}") if person.get('party'): lines.append(f"**Party:** {person['party']}") if person.get('body_key'): lines.append(f"**Region:** {person['body_key']}") if person.get('active') is not None: lines.append(f"**Status:** {'Active' if person['active'] else 'Inactive'}") lines.append("\n---\n") lines.append(f"\n**Showing {len(items)} of {total} results**") if meta.get("has_more"): lines.append(f"\nMore results available. Use offset={params.offset + params.limit}") return truncate_response("\n".join(lines)) except Exception as e: return f"Error searching parliamentarians: {str(e)}" @mcp.tool( name="openparldata_get_parliamentarian", annotations={ "title": "Get Parliamentarian Details", "readOnlyHint": True, "destructiveHint": False, "idempotentHint": True, "openWorldHint": True } ) async def get_parliamentarian(params: GetParliamentarianInput) -> str: """ Get detailed information about a specific parliamentarian. Returns comprehensive information including biographical data, party membership, committee assignments, and optionally their voting history and submitted motions. """ expand_relations: list[str] = [] request_params: Dict[str, Any] = { "lang": params.language.value, "lang_fallback": "de,fr,it", "lang_format": "nested", } if params.include_votes: expand_relations.append("votes") request_params["votes_limit"] = 25 if params.include_motions: expand_relations.append("affairs") request_params["affairs_limit"] = 25 if expand_relations: request_params["expand"] = ",".join(expand_relations) try: result = await make_api_request(f"/persons/{params.person_id}", request_params) if params.response_format == ResponseFormat.JSON: # Use ensure_ascii=False to properly handle special characters in text content return truncate_response(json.dumps(result, indent=2, ensure_ascii=False)) if result.get("mock_data"): return f"# OpenParlData API Status\n\n{result['message']}\n\n**Person ID:** {params.person_id}" # Format actual data when available return truncate_response(format_parliamentarian_markdown(result)) except Exception as e: return f"Error getting parliamentarian details: {str(e)}" @mcp.tool( name="openparldata_search_votes", annotations={ "title": "Search Parliamentary Votes", "readOnlyHint": True, "destructiveHint": False, "idempotentHint": True, "openWorldHint": True } ) async def search_votes(params: SearchVotesInput) -> str: """ Search for parliamentary votes across different levels of Swiss government. Returns vote records including titles, dates, results, and vote counts. Can filter by date range, parliament, and vote type. """ request_params = { "search": params.query, "date_from": params.date_from, "date_to": params.date_to, "limit": params.limit, "offset": params.offset, "sort_by": "-date", "lang": params.language.value, "lang_fallback": "de,fr,it", "search_language": params.language.value, "search_mode": "natural", } request_params = {k: v for k, v in request_params.items() if v is not None} try: result = await make_api_request("/votings", request_params) if params.response_format == ResponseFormat.JSON: # Use ensure_ascii=False to properly handle special characters in text content return truncate_response(json.dumps(result, indent=2, ensure_ascii=False)) if result.get("status") == "error": return f"# API Error\n\n{result.get('message', 'Unknown error')}\n\n**Endpoint:** {result.get('endpoint', 'N/A')}" lines = ["# Parliamentary Votes Search Results\n"] if isinstance(result, dict) and "data" in result: items = result["data"] meta = result.get("meta", {}) total = meta.get("total_records", len(items)) if not items: return "# No Results\n\nNo votes found matching your criteria." for voting in items: # Handle multilingual title title_obj = voting.get('title', {}) if isinstance(title_obj, dict): title = title_obj.get('de') or title_obj.get('fr') or title_obj.get('it') or title_obj.get('en', 'Untitled Vote') else: title = title_obj or 'Untitled Vote' lines.append(f"## {title}") if voting.get('date'): lines.append(f"**Date:** {voting['date']}") if voting.get('body_key'): lines.append(f"**Parliament:** {voting['body_key']}") if voting.get('results_yes') is not None: lines.append(f"**Yes:** {voting.get('results_yes', 0)} | **No:** {voting.get('results_no', 0)} | **Abstentions:** {voting.get('results_abstention', 0)}") # Show affair title if available affair_title_obj = voting.get('affair_title', {}) if isinstance(affair_title_obj, dict): affair_title = affair_title_obj.get('de') or affair_title_obj.get('fr') or affair_title_obj.get('it') or affair_title_obj.get('en') if affair_title: lines.append(f"*Related to: {affair_title}*") lines.append("\n---\n") lines.append(f"\n**Showing {len(items)} of {total} results**") if meta.get("has_more"): lines.append(f"\nMore results available.") return truncate_response("\n".join(lines)) except Exception as e: return f"Error searching votes: {str(e)}" @mcp.tool( name="openparldata_get_vote_details", annotations={ "title": "Get Vote Details", "readOnlyHint": True, "destructiveHint": False, "idempotentHint": True, "openWorldHint": True } ) async def get_vote_details(params: GetVoteDetailsInput) -> str: """ Get detailed information about a specific parliamentary vote. Returns comprehensive vote information including the proposal text, voting results, and optionally how each parliamentarian voted. """ request_params = { "include_individual": params.include_individual_votes, "lang": params.language.value } try: result = await make_api_request(f"/votes/{params.vote_id}", request_params) if params.response_format == ResponseFormat.JSON: # Use ensure_ascii=False to properly handle special characters in text content return truncate_response(json.dumps(result, indent=2, ensure_ascii=False)) if result.get("mock_data"): return f"# OpenParlData API Status\n\n{result['message']}\n\n**Vote ID:** {params.vote_id}" return truncate_response(format_vote_markdown(result)) except Exception as e: return f"Error getting vote details: {str(e)}" @mcp.tool( name="openparldata_search_motions", annotations={ "title": "Search Parliamentary Motions", "readOnlyHint": True, "destructiveHint": False, "idempotentHint": True, "openWorldHint": True } ) async def search_motions(params: SearchMotionsInput) -> str: """ Search for parliamentary affairs (motions, postulates, interpellations, and initiatives). Returns affair records including titles, submitters, dates, status, and text. Can filter by submitter, status, and date range. Note: Uses the /affairs endpoint which encompasses all types of parliamentary affairs. """ request_params: Dict[str, Any] = { "search": params.query, "submitter_id": params.submitter_id, "begin_date_from": params.date_from, "begin_date_to": params.date_to, "level": params.level.value if params.level else None, "lang": params.language.value, "lang_fallback": "de,fr,it", "search_language": params.language.value, "limit": params.limit, "offset": params.offset, "sort_by": "-begin_date" } if params.status: status_str = str(params.status) if status_str.isdigit(): request_params["state_external_id"] = status_str else: request_params["state_name"] = status_str request_params = {k: v for k, v in request_params.items() if v is not None} try: result = await make_api_request("/affairs", request_params) if params.response_format == ResponseFormat.JSON: # Use ensure_ascii=False to properly handle special characters in text content return truncate_response(json.dumps(result, indent=2, ensure_ascii=False)) if result.get("mock_data"): return f"# OpenParlData API Status\n\n{result['message']}\n\n**Endpoint:** {result['endpoint']}" lines = ["# Parliamentary Motions Search Results\n"] if result.get("data"): for motion in result["data"]: lines.append(format_motion_markdown(motion)) lines.append("\n---\n") return truncate_response("\n".join(lines)) except Exception as e: return f"Error searching motions: {str(e)}" @mcp.tool( name="openparldata_search_debates", annotations={ "title": "Search Parliamentary Debates", "readOnlyHint": True, "destructiveHint": False, "idempotentHint": True, "openWorldHint": True } ) async def search_debates(params: SearchDebatesInput) -> str: """ Search parliamentary speeches (debate contributions). Returns speech records including speakers, dates, and speech text excerpts. Can search by content, speaker, date range, and topic. Note: Uses the /speeches endpoint which contains individual speech contributions. """ request_params = { # NOTE: "search" parameter causes 500 errors on /speeches endpoint - removed # "search": params.query, "date_from": params.date_from, "date_to": params.date_to, "speaker_id": params.speaker_id, "topic": params.topic, "parliament_id": params.parliament_id, "level": params.level.value if params.level else None, "lang": params.language.value, "lang_fallback": "de,fr,it", # "search_language": params.language.value, # Only used with search # "search_mode": "natural", # Only used with search "expand": "person,affair,meeting", "limit": params.limit, "offset": params.offset } request_params = {k: v for k, v in request_params.items() if v is not None} try: result = await make_api_request("/speeches", request_params) if params.response_format == ResponseFormat.JSON: # Use ensure_ascii=False to properly handle special characters in text content return truncate_response(json.dumps(result, indent=2, ensure_ascii=False)) if result.get("mock_data"): return f"# OpenParlData API Status\n\n{result['message']}\n\n**Endpoint:** {result['endpoint']}" lines = ["# Parliamentary Debates Search Results\n"] if result.get("data"): for debate in result["data"]: lines.extend([ f"## {debate.get('title', 'Debate')}", f"**Date:** {format_date(debate.get('date', ''))}", f"**Parliament:** {debate.get('parliament_name', 'N/A')}", f"**Topic:** {debate.get('topic', 'N/A')}", ]) if debate.get('speakers'): lines.append("\n### Speakers") for speaker in debate['speakers'][:5]: lines.append(f"- {speaker}") if debate.get('excerpt'): lines.extend(["\n### Excerpt", debate['excerpt'][:500] + "..."]) lines.append("\n---\n") return truncate_response("\n".join(lines)) except Exception as e: return f"Error searching debates: {str(e)}" @mcp.tool( name="openparldata_search_meetings", annotations={ "title": "Search Parliamentary Meetings", "readOnlyHint": True, "destructiveHint": False, "idempotentHint": True, "openWorldHint": True } ) async def search_meetings(params: SearchMeetingsInput) -> str: """ Search for parliamentary meetings and sessions. Returns meeting metadata including titles, dates, locations, and related bodies. """ request_params = { "search": params.query, "body_key": params.body_key, "level": params.level.value if params.level else None, "lang": params.language.value, "lang_fallback": "de,fr,it", "limit": params.limit, "offset": params.offset, "sort_by": "-begin_date" } request_params = {k: v for k, v in request_params.items() if v is not None} try: result = await make_api_request("/meetings", request_params) if result.get("status") == "error": return f"# API Error\n\n{result.get('message', 'Unknown error')}\n\n**Endpoint:** {result.get('endpoint', 'N/A')}" data = result.get("data", []) # Optional client-side date filtering since API lacks date filters filtered_data = [] for item in data: begin_date = item.get("begin_date") if begin_date and (params.date_from or params.date_to): try: ts = datetime.fromisoformat(begin_date.replace("Z", "+00:00")) except ValueError: ts = None if ts: if params.date_from: start = datetime.fromisoformat(params.date_from) if ts.date() < start.date(): continue if params.date_to: end = datetime.fromisoformat(params.date_to) if ts.date() > end.date(): continue filtered_data.append(item) # Replace data with filtered list for downstream consumers if filtered_data is not data: result = dict(result) result["data"] = filtered_data meta = dict(result.get("meta", {})) meta["filtered_count"] = len(filtered_data) result["meta"] = meta if params.response_format == ResponseFormat.JSON: return truncate_response(json.dumps(result, indent=2, ensure_ascii=False)) # Markdown formatting lines = ["# Parliamentary Meetings\n"] for meeting in filtered_data: name = meeting.get("name") or {} title = name.get("de") if isinstance(name, dict) else name or "Meeting" lines.append(f"## {title}") if meeting.get("begin_date"): lines.append(f"**Start:** {format_date(meeting['begin_date'])}") if meeting.get("end_date"): lines.append(f"**End:** {format_date(meeting['end_date'])}") if meeting.get("location"): lines.append(f"**Location:** {meeting['location']}") if meeting.get("body_key"): lines.append(f"**Body:** {meeting['body_key']}") if meeting.get("url_external"): url = meeting["url_external"].get("de") if isinstance(meeting["url_external"], dict) else meeting["url_external"] if url: lines.append(f"[External Link]({url})") lines.append("\n---\n") if len(lines) == 1: lines.append("No meetings found for the provided filters.") return truncate_response("\n".join(lines)) except Exception as e: return f"Error searching meetings: {str(e)}" # Main execution if __name__ == "__main__": # Run FastMCP server (synchronous, blocking call) mcp.run()