Spaces:
Sleeping
Sleeping
File size: 3,291 Bytes
81d39a3 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 |
#!/usr/bin/env python3
"""
Test script for Swiss BFS MCP Server
Demonstrates direct API usage (not MCP protocol)
"""
import asyncio
import httpx
import json
# Base URL of the BFS PxWeb API (v1); every request in this script is built from it.
BASE_URL = "https://www.pxweb.bfs.admin.ch/api/v1"
async def _fetch_json(client, url, payload=None):
    """Request *url* and return its decoded JSON body.

    Sends a GET when *payload* is None, otherwise POSTs *payload* as JSON.

    Raises:
        httpx.HTTPStatusError: the server answered with a 4xx/5xx status.
        ValueError: the response body is not valid JSON.
    """
    if payload is None:
        response = await client.get(url)
    else:
        response = await client.post(url, json=payload)
    # Without this check an error page that happens to be JSON would be
    # reported as if it were real data.
    response.raise_for_status()
    return response.json()


def _print_listing(entries, limit):
    """Print ``id: text`` for the dict entries among the first *limit* items."""
    for entry in entries[:limit]:
        if isinstance(entry, dict):
            print(f" - {entry.get('id', 'N/A')}: {entry.get('text', 'N/A')}")


async def test_api():
    """Test the BFS API directly to verify functionality.

    Runs four independent checks against ``BASE_URL`` and prints the outcome
    of each one:

    1. list the root categories,
    2. fetch the metadata of one population datacube,
    3. POST a query against that datacube,
    4. list the ``px-x-05`` (price statistics) category.

    Every check is best-effort: any failure (network error, HTTP error
    status, malformed JSON, unexpected payload shape) is printed as
    ``Error: ...`` and the remaining checks still run. Returns None.
    """
    headers = {
        "User-Agent": "Mozilla/5.0 (compatible; BFS-Test/1.0)",
        "Accept": "application/json",
        "Accept-Language": "en,de,fr,it",
    }
    datacube_id = "px-x-0102030000_101"
    # The same endpoint serves metadata (GET) and data queries (POST).
    datacube_url = f"{BASE_URL}/en/{datacube_id}/{datacube_id}.px"
    # NOTE(review): "top" with ["3"] presumably selects the three most recent
    # values of the "Jahr" (year) variable -- confirm against the PxWeb docs.
    query = {
        "query": [
            {
                "code": "Jahr",
                "selection": {
                    "filter": "top",
                    "values": ["3"],
                },
            },
        ],
        "response": {
            "format": "json",
        },
    }

    async with httpx.AsyncClient(timeout=30.0, headers=headers) as client:
        print("=" * 60)
        print("Swiss BFS API Test")
        print("=" * 60)

        # 1. Test getting root categories
        print("\n1. Getting root categories...")
        try:
            data = await _fetch_json(client, f"{BASE_URL}/en")
            print(f"Found {len(data)} main categories")
            _print_listing(data, 5)
        except Exception as e:  # best-effort step: report and continue
            print(f"Error: {e}")

        # 2. Test getting metadata for population datacube
        print("\n2. Getting metadata for population datacube...")
        try:
            metadata = await _fetch_json(client, datacube_url)
            print(f"Datacube: {metadata.get('title', 'N/A')}")
            if "variables" in metadata:
                print("Variables available:")
                for var in metadata["variables"]:
                    print(f" - {var.get('code', 'N/A')}: {var.get('text', 'N/A')}")
        except Exception as e:  # best-effort step: report and continue
            print(f"Error: {e}")

        # 3. Test querying recent data
        print("\n3. Querying recent population data...")
        try:
            data = await _fetch_json(client, datacube_url, payload=query)
            print("Successfully retrieved data")
            print(f"Response keys: {list(data.keys())}")
        except Exception as e:  # best-effort step: report and continue
            print(f"Error: {e}")

        # 4. Test browsing other categories
        print("\n4. Browsing price statistics category...")
        try:
            data = await _fetch_json(client, f"{BASE_URL}/en/px-x-05")
            print(f"Found {len(data)} items in price statistics")
            _print_listing(data, 3)
        except Exception as e:  # best-effort step: report and continue
            print(f"Error: {e}")

        print("\n" + "=" * 60)
        print("Test completed")
        print("=" * 60)
# Script entry point: drive the async smoke test to completion when this file
# is executed directly (nothing runs on import).
if __name__ == "__main__":
    asyncio.run(test_api())
|