| from typing import List, Tuple |
| import requests |
| import time |
| import json |
| import re |
| import ast |
| import gradio as gr |
| |
| from scrape_and_format_hf_mcp_servers import scrape_and_format_hf_mcp_servers, SORT_OPTIONS |
|
|
| |
|
|
|
|
def _hub_url_from_space_host(space_url: str) -> str:
    """Map a ``*.hf.space`` URL back to its huggingface.co Spaces page URL.

    Returns "unknown" when the URL has no ``hf.space`` subdomain or when the
    subdomain has no hyphen separating the owner from the space name.
    """
    host_match = re.search(r'https?://([^./]+)\.hf\.space', space_url)
    if not host_match:
        return "unknown"
    # The subdomain is "<owner>-<space>". Owners may contain hyphens too, so
    # splitting on the first hyphen is a best guess, not a guarantee.
    owner, hyphen, space_name = host_match.group(1).partition('-')
    if not hyphen:
        return "unknown"
    return f"https://huggingface.co/spaces/{owner}/{space_name}"


def _space_subdomain(owner: str, space_name: str) -> str:
    """Build the ``hf.space`` subdomain for an owner/space pair.

    The subdomain has to be a valid DNS label, so it is lower-cased and every
    character outside ``[a-z0-9-]`` (such as ``_`` or ``.``) becomes ``-``.
    """
    return re.sub(r'[^a-z0-9-]', '-', f"{owner}-{space_name}".lower())


def parse_huggingface_url(url: str) -> str:
    """
    Parse various Hugging Face URL formats and extract space info.

    Args:
        url (str): Can be any HF Space URL format:
            - https://huggingface.co/spaces/{username}/{space-name}
            - https://{username}-{space-name}.hf.space
            - https://{username}-{space-name}.hf.space/gradio_api/mcp/sse
            Any other URL is treated as the base URL of a server and the MCP
            SSE path is appended to it.

    Returns:
        str: JSON string with the keys ``original_url``, ``hf_spaces_url``,
        ``space_url``, ``mcp_endpoint`` and ``is_valid``. Parts that cannot
        be derived are the literal string "unknown"; ``is_valid`` is true
        whenever an MCP endpoint could be determined.
    """
    mcp_suffix = '/gradio_api/mcp/sse'
    url = url.strip().rstrip('/')

    if mcp_suffix in url:
        # Direct MCP endpoint: strip the suffix to recover the space URL.
        space_url = url.replace(mcp_suffix, '')
        mcp_endpoint = url
        hf_spaces_url = _hub_url_from_space_host(space_url)

    elif '.hf.space' in url:
        # Direct space URL.
        space_url = url
        mcp_endpoint = f"{url}{mcp_suffix}"
        hf_spaces_url = _hub_url_from_space_host(url)

    elif 'huggingface.co/spaces/' in url:
        # Hub page URL; stop at '/', '?' or '#' so trailing paths, queries
        # and fragments do not leak into the space name.
        spaces_match = re.search(r'huggingface\.co/spaces/([^/?#]+)/([^/?#]+)', url)
        if spaces_match:
            username, space_name = spaces_match.groups()
            # Canonical form, so callers can safely append paths to it.
            hf_spaces_url = f"https://huggingface.co/spaces/{username}/{space_name}"
            space_url = f"https://{_space_subdomain(username, space_name)}.hf.space"
            mcp_endpoint = f"{space_url}{mcp_suffix}"
        else:
            hf_spaces_url = url
            space_url = "unknown"
            mcp_endpoint = "unknown"

    else:
        # Unrecognised format: assume it is the base URL of an MCP server.
        space_url = url
        mcp_endpoint = f"{url}{mcp_suffix}"
        hf_spaces_url = "unknown"

    result = {
        "original_url": url,
        "hf_spaces_url": hf_spaces_url,
        "space_url": space_url,
        "mcp_endpoint": mcp_endpoint,
        "is_valid": mcp_endpoint != "unknown"
    }

    return json.dumps(result, indent=2)
|
|
def parse_huggingface_url_with_summary(url: str) -> tuple:
    """Parse a URL and return ``(markdown_summary, json_string)``.

    A blank or whitespace-only URL short-circuits with a "no URL" message and
    an empty JSON object; otherwise the JSON produced by
    ``parse_huggingface_url`` is returned together with its markdown rendering.
    """
    if url.strip() == "":
        return ("# ❌ No URL Provided\n\nPlease enter a URL to parse.", "{}")

    raw_json = parse_huggingface_url(url)
    summary = format_url_summary(json.loads(raw_json))
    return (summary, raw_json)
|
|
def format_url_summary(parsed_info: dict) -> str:
    """Generate markdown summary for URL parsing results.

    Args:
        parsed_info (dict): Mapping with the keys ``original_url``,
            ``hf_spaces_url``, ``space_url``, ``mcp_endpoint`` and
            ``is_valid``; URL fields that could not be derived hold the
            string "unknown".

    Returns:
        str: Markdown report. For a valid URL it lists the derived URLs and
        ends with a ready-to-copy MCP client configuration in JSON.
    """
    original = parsed_info['original_url']
    parts = [
        "# 🔍 URL Parser Results\n\n",
        f"**Original URL:** [{original}]({original})\n\n",
    ]

    if not parsed_info['is_valid']:
        parts.append("❌ **Status:** Invalid URL format\n\n")
        parts.append("Could not parse the provided URL. Please check the format.\n")
        return "".join(parts)

    parts.append("✅ **Status:** Valid URL format\n\n")
    parts.append("## 📋 Extracted URLs\n\n")
    if parsed_info['hf_spaces_url'] != "unknown":
        parts.append(f"- **HF Spaces URL:** [{parsed_info['hf_spaces_url']}]({parsed_info['hf_spaces_url']})\n")
    if parsed_info['space_url'] != "unknown":
        parts.append(f"- **Space URL:** [{parsed_info['space_url']}]({parsed_info['space_url']})\n")
    if parsed_info['mcp_endpoint'] != "unknown":
        parts.append(f"- **MCP Endpoint:** [{parsed_info['mcp_endpoint']}]({parsed_info['mcp_endpoint']})\n\n")

    # Serialise with json.dumps rather than assembling the snippet by hand so
    # quotes or backslashes in the URL cannot produce invalid JSON.
    client_config = {
        "mcpServers": {
            "gradio_server": {
                "url": parsed_info["mcp_endpoint"]
            }
        }
    }
    parts.append("## ⚙️ MCP Client Configuration\n\n")
    parts.append("Copy this configuration for your MCP client:\n\n")
    parts.append("```json\n")
    parts.append(json.dumps(client_config, indent=2))
    parts.append("\n```\n")

    return "".join(parts)
|
|
def _probe_url(target_url: str, timeout: float = 8, stream: bool = False) -> dict:
    """Issue one GET request and describe the outcome as a health record.

    The record has the keys ``url``, ``status_code``, ``response_time_ms``
    and ``accessible`` (true only for HTTP 200); when the request raises, it
    also has ``error`` and ``status_code`` is None.
    """
    started = time.time()
    try:
        # Using the response as a context manager guarantees the connection
        # is released. This matters for stream=True on an SSE endpoint, whose
        # body never ends and would otherwise keep the socket open.
        with requests.get(target_url, timeout=timeout, stream=stream) as response:
            elapsed_ms = round((time.time() - started) * 1000, 2)
            status_code = response.status_code
    except Exception as exc:  # best-effort probe: any failure means "not accessible"
        return {
            "url": target_url,
            "status_code": None,
            "response_time_ms": round((time.time() - started) * 1000, 2),
            "accessible": False,
            "error": str(exc)
        }

    return {
        "url": target_url,
        "status_code": status_code,
        "response_time_ms": elapsed_ms,
        "accessible": status_code == 200
    }


def check_single_server_health(url: str) -> tuple:
    """
    Check health of a single MCP server from any URL format.

    The space page (when known) and the MCP SSE endpoint are each probed with
    one GET request, and the two outcomes are combined into an overall status:
    "healthy", "mcp_only", "space_only" or "unreachable".

    Args:
        url (str): Any supported HF Space URL format

    Returns:
        tuple: (markdown_summary, json_data)
    """
    if not url.strip():
        return "# ❌ No URL Provided\n\nPlease enter a URL to check.", "{}"

    parsed_info = json.loads(parse_huggingface_url(url))

    if not parsed_info["is_valid"]:
        result = {
            "original_url": url,
            "status": "invalid_url",
            "error": "Could not parse URL format",
            "parsed_info": parsed_info
        }
        md = "# ❌ Health Check Failed\n\nCould not parse URL format. Please check the URL."
        return md, json.dumps(result, indent=2)

    results = {
        "original_url": url,
        "parsed_info": parsed_info,
        "space_health": None,
        "mcp_health": None,
        "overall_status": "unknown"
    }

    # Probe the space page itself when its URL could be derived.
    if parsed_info["space_url"] != "unknown":
        results["space_health"] = _probe_url(parsed_info["space_url"])

    # Probe the SSE endpoint; stream=True so the open-ended body is not read.
    results["mcp_health"] = _probe_url(parsed_info["mcp_endpoint"], stream=True)

    # A space that was not probed does not count against the server.
    space_ok = results["space_health"] is None or results["space_health"]["accessible"]
    mcp_ok = results["mcp_health"]["accessible"]

    if mcp_ok and space_ok:
        results["overall_status"] = "healthy"
    elif mcp_ok:
        results["overall_status"] = "mcp_only"
    elif space_ok:
        results["overall_status"] = "space_only"
    else:
        results["overall_status"] = "unreachable"

    md = format_health_summary(results)

    return md, json.dumps(results, indent=2)
|
|
def _format_probe_section(title: str, probe: dict) -> str:
    """Render one probe record (space or MCP endpoint) as a markdown section."""
    status_icon = "✅" if probe["accessible"] else "❌"
    # Failed probes store status_code=None, so a dict default would never
    # apply; substitute the placeholder explicitly.
    status_code = probe.get("status_code")
    section = f"## {title} {status_icon}\n\n"
    section += f"- **URL:** [{probe['url']}]({probe['url']})\n"
    section += f"- **Status Code:** {'N/A' if status_code is None else status_code}\n"
    section += f"- **Response Time:** {probe['response_time_ms']}ms\n"
    if "error" in probe:
        section += f"- **Error:** {probe['error']}\n"
    return section


def format_health_summary(results: dict) -> str:
    """Generate markdown summary for health check results.

    Args:
        results (dict): Mapping with ``overall_status``, ``space_health``
            (a probe record or None) and ``mcp_health`` (a probe record).
            Each probe record has ``url``, ``status_code``,
            ``response_time_ms``, ``accessible`` and optionally ``error``.

    Returns:
        str: Markdown report. When the MCP endpoint is accessible it ends
        with a JSON client configuration pointing at that endpoint.
    """
    status_icons = {
        "healthy": "🟢",
        "mcp_only": "🟡",
        "space_only": "🟠",
        "unreachable": "🔴"
    }

    icon = status_icons.get(results["overall_status"], "❓")
    md = f"# {icon} Server Health Report\n\n"
    md += f"**Overall Status:** {results['overall_status'].replace('_', ' ').title()}\n\n"

    if results["space_health"]:
        md += _format_probe_section("🌐 Space Health", results["space_health"])
        md += "\n"

    mh = results["mcp_health"]
    md += _format_probe_section("🔧 MCP Endpoint Health", mh)

    if mh["accessible"]:
        # json.dumps keeps the snippet valid JSON whatever the URL contains.
        client_config = {
            "mcpServers": {
                "gradio_server": {
                    "url": mh["url"]
                }
            }
        }
        md += "\n## ⚙️ MCP Client Configuration\n\n"
        md += "Add this to your MCP client config:\n\n"
        md += "```json\n"
        md += json.dumps(client_config, indent=2)
        md += "\n```\n"

    return md
|
|
def extract_functions_from_source(source_code: str) -> List[Tuple[str, str, List[str]]]:
    """
    Extract function definitions, docstrings, and parameters from Python source code using AST.

    Every ``def`` and ``async def`` in the module is reported, including
    methods and nested functions, in the order ``ast.walk`` visits them. Only
    regular positional parameters are listed (no ``*args``, keyword-only or
    ``**kwargs`` names).

    Args:
        source_code (str): Python source code to analyze

    Returns:
        List[Tuple[str, str, List[str]]]: List of (function_name, docstring, parameters).
        Functions without a docstring get "No docstring available". Source
        that cannot be parsed yields an empty list.
    """
    try:
        tree = ast.parse(source_code)
    except (SyntaxError, ValueError, TypeError, RecursionError):
        # Remote files are untrusted input: unparsable source means "no
        # functions found" rather than an error for the caller.
        return []

    return [
        (
            node.name,
            ast.get_docstring(node) or "No docstring available",
            [arg.arg for arg in node.args.args],
        )
        for node in ast.walk(tree)
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef))
    ]
|
|
def discover_server_tools(url: str) -> tuple:
    """
    Discover available MCP tools from a server.

    Discovery works by downloading the space's ``app.py`` from the Hub and
    listing the functions it defines. Every attempt, including skipped or
    failed ones, is recorded in ``discovery_methods`` so an empty result can
    be explained.

    Args:
        url (str): Any supported HF Space URL format to discover tools from

    Returns:
        tuple: (markdown_summary, json_data)
    """
    if not url.strip():
        return "# ❌ No URL Provided\n\nPlease enter a URL to discover tools.", "{}"

    parsed_info = json.loads(parse_huggingface_url(url))

    if not parsed_info["is_valid"]:
        result = {
            "original_url": url,
            "status": "invalid_url",
            "error": "Could not parse URL format"
        }
        md = "# ❌ Tools Discovery Failed\n\nCould not parse URL format."
        return md, json.dumps(result, indent=2)

    tools = []
    discovery_methods = []

    hub_url = parsed_info["hf_spaces_url"]
    if hub_url == "unknown":
        discovery_methods.append(
            "Skipped app.py analysis: Hugging Face Spaces URL could not be determined"
        )
    else:
        try:
            response = requests.get(f"{hub_url}/raw/main/app.py", timeout=10)
            if response.status_code == 200:
                tools.extend(
                    {
                        "name": func_name,
                        "description": docstring,
                        "parameters": params,
                        "source": "app.py_analysis"
                    }
                    for func_name, docstring, params in extract_functions_from_source(response.text)
                )
                discovery_methods.append("Analyzed app.py source code")
            else:
                # Private/missing spaces previously failed silently here.
                discovery_methods.append(f"Could not fetch app.py (HTTP {response.status_code})")
        except Exception as e:  # best-effort: report the failure instead of raising
            discovery_methods.append(f"Failed to analyze app.py: {str(e)}")

    result = {
        "original_url": url,
        "status": "success" if tools else "no_tools_found",
        "tools": tools,
        "tool_count": len(tools),
        "tool_names": [tool["name"] for tool in tools],
        "mcp_endpoint": parsed_info["mcp_endpoint"],
        "discovery_methods": discovery_methods
    }

    if not tools:
        result["message"] = "No tools discovered. Server may not expose MCP tools or may be private."

    md = format_tools_summary(result)

    return md, json.dumps(result, indent=2)
|
|
def format_tools_summary(result: dict) -> str:
    """Render a tools-discovery result as a markdown report.

    A result whose ``status`` is "success" gets a numbered list of tools
    (descriptions are cut to 200 characters followed by "..."); any other
    status gets a list of likely causes. The discovery methods, when present
    and non-empty, are appended in both cases.
    """
    sections = ["# 🔧 Tools Discovery Report\n\n"]

    if result["status"] != "success":
        sections.append("❌ **Status:** No tools found\n\n")
        sections.append("This could mean:\n")
        sections.append("- The server doesn't expose MCP tools\n")
        sections.append("- The server is private or requires authentication\n")
        sections.append("- The server is not running\n\n")
    else:
        sections.append(f"✅ **Status:** Found {result['tool_count']} tools\n\n")
        sections.append("## 🛠️ Available Tools\n\n")
        for position, entry in enumerate(result["tools"], 1):
            description = entry['description']
            ellipsis = '...' if len(description) > 200 else ''
            sections.append(f"### {position}. {entry['name']}\n")
            sections.append(f"**Description:** {description[:200]}{ellipsis}\n")
            sections.append(f"**Parameters:** {', '.join(entry['parameters'])}\n\n")

    methods_used = result.get("discovery_methods")
    if methods_used:
        sections.append("## 🔍 Discovery Methods Used\n\n")
        sections.extend(f"- {method}\n" for method in methods_used)

    return "".join(sections)
|
|
def monitor_multiple_servers(urls_text: str) -> tuple:
    """
    Monitor health and tools of multiple MCP servers simultaneously.

    Servers are checked one after another. A failure while checking one
    server is recorded for that server (status "error") and does not stop
    the remaining checks.

    Args:
        urls_text (str): Newline-separated list of URLs to monitor

    Returns:
        tuple: (markdown_summary, json_data)
    """
    # Blank lines and surrounding whitespace are ignored. A single guard is
    # enough: text with any non-whitespace character always yields a URL.
    urls = [line.strip() for line in urls_text.split('\n') if line.strip()]

    if not urls:
        result = {
            "error": "No URLs provided",
            "servers": [],
            "total_servers": 0
        }
        md = "# ❌ No URLs Provided\n\nPlease enter URLs to monitor."
        return md, json.dumps(result, indent=2)

    results = []

    for i, url in enumerate(urls, 1):
        print(f"🔍 Checking server {i}/{len(urls)}: {url}")

        try:
            _, health_json = check_single_server_health(url)
            health_data = json.loads(health_json)

            _, tools_json = discover_server_tools(url)
            tools_data = json.loads(tools_json)

            results.append({
                "url": url,
                "health": health_data,
                "tools": tools_data,
                "combined_status": health_data.get("overall_status", "unknown")
            })

        except Exception as e:  # keep monitoring the remaining servers
            print(f"❌ Error checking {url}: {str(e)}")
            results.append({
                "url": url,
                "health": {"error": str(e)},
                "tools": {"error": str(e)},
                "combined_status": "error"
            })

    final_result = {
        "servers": results,
        "total_servers": len(urls),
        "timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
    }

    md = format_multiple_servers_summary(final_result)

    return md, json.dumps(final_result, indent=2)
|
|
def format_multiple_servers_summary(result: dict) -> str:
    """Generate markdown summary for multiple servers monitoring.

    Args:
        result (dict): Mapping with ``servers`` (list of per-server records
            holding ``url``, ``combined_status`` and ``tools``),
            ``total_servers`` and ``timestamp``.

    Returns:
        str: Markdown report with one section per server followed by totals.
        The success rate is shown whenever at least one server was
        monitored, including when none of them is healthy (0.0%).
    """
    md = "# 📊 Multiple Servers Monitor Report\n\n"
    md += f"**Total Servers:** {result['total_servers']}\n"
    md += f"**Timestamp:** {result['timestamp']}\n\n"

    healthy_count = 0
    total_tools = 0

    for i, server in enumerate(result["servers"], 1):
        status = server.get("combined_status", "unknown")
        if status == "healthy":
            healthy_count += 1

        # Error records carry no tool_count; count them as zero tools.
        tools_count = server.get("tools", {}).get("tool_count", 0)
        total_tools += tools_count

        status_icon = "🟢" if status == "healthy" else "🔴"
        md += f"## {status_icon} Server {i}\n\n"
        md += f"**URL:** [{server['url']}]({server['url']})\n"
        md += f"**Status:** {status.replace('_', ' ').title()}\n"
        md += f"**Tools Found:** {tools_count}\n\n"

    md += "## 📈 Summary\n\n"
    md += f"- **Healthy Servers:** {healthy_count}/{result['total_servers']}\n"
    md += f"- **Total Tools Available:** {total_tools}\n"

    # Guard the divisor, not the numerator, so a 0% rate is still reported.
    if result['total_servers'] > 0:
        md += f"- **Success Rate:** {round(healthy_count / result['total_servers'] * 100, 1)}%\n"

    return md
|
|
def validate_mcp_endpoint(url: str) -> tuple:
    """
    Validate that a URL is a working MCP endpoint by checking its schema.

    The MCP endpoint is derived from the URL, its trailing ``/sse`` is
    swapped for ``/schema``, and that schema URL is fetched. The endpoint
    counts as valid when the schema request returns HTTP 200 with a JSON body.

    Args:
        url (str): URL to validate as MCP endpoint (can be space URL or direct MCP endpoint)

    Returns:
        tuple: (markdown_summary, json_data)
    """
    if url.strip() == "":
        return "# ❌ No URL Provided\n\nPlease enter a URL to validate.", "{}"

    def describe_tools(entries):
        # Summarise a collection of tool records by count and name.
        return {
            "tool_count": len(entries),
            "tool_names": [entry.get("name") for entry in entries]
        }

    parsed = json.loads(parse_huggingface_url(url))

    report = {
        "original_url": url,
        "is_valid_mcp": False,
        "mcp_endpoint_url": parsed.get("mcp_endpoint"),
        "mcp_schema_url": None,
        "connection_config": None,
        "error": None,
        "schema_details": None
    }

    # Guard: no usable endpoint could be derived from the URL.
    if report["mcp_endpoint_url"] == "unknown" or not parsed["is_valid"]:
        report["error"] = "Invalid URL format or could not determine MCP endpoint."
        summary = f"# ❌ Invalid URL\n\nCould not parse the provided URL format to find an MCP endpoint: `{url}`"
        return summary, json.dumps(report, indent=2)

    endpoint = report["mcp_endpoint_url"]

    # Guard: the schema URL is derived by swapping a trailing "/sse".
    if not endpoint.endswith("/sse"):
        report["error"] = f"MCP endpoint does not end with /sse, cannot determine schema URL: {endpoint}"
        summary = f"# ⚠️ MCP Validation Warning\n\nCould not determine schema URL from MCP endpoint: `{endpoint}`. Validation might be incomplete."
        return summary, json.dumps(report, indent=2)

    schema_url = endpoint[:-4] + "/schema"
    report["mcp_schema_url"] = schema_url

    print(f"ℹ️ Validating MCP: Original URL='{url}', Endpoint='{endpoint}', Schema='{schema_url}'")

    try:
        response = requests.get(schema_url, timeout=10, headers={'User-Agent': 'MCP-Validator/1.0'})
        http_status = response.status_code
        report["schema_http_status"] = http_status

        if http_status == 200:
            try:
                payload = response.json()
                report["is_valid_mcp"] = True
                report["connection_config"] = {
                    "mcpServers": {
                        "gradio_server": {
                            "url": endpoint
                        }
                    }
                }

                if isinstance(payload, dict) and "tools" in payload:
                    report["schema_details"] = describe_tools(payload["tools"])
                elif isinstance(payload, list):
                    report["schema_details"] = describe_tools(payload)
                else:
                    report["schema_details"] = "Schema format not recognized or no tools found."
                print(f"✅ MCP Schema valid for {schema_url}")

            except json.JSONDecodeError:
                report["error"] = "Schema endpoint returned 200 OK, but response is not valid JSON."
                print(f"❌ MCP Schema JSON decode error for {schema_url}")
            except Exception as json_error:
                report["error"] = f"Schema endpoint returned 200 OK, but error processing JSON: {str(json_error)}"
                print(f"❌ MCP Schema JSON processing error for {schema_url}: {str(json_error)}")
        elif http_status in (401, 403):
            report["error"] = f"Schema endpoint access denied (HTTP {http_status}). Private space may require auth token."
            print(f"⚠️ MCP Schema access denied for {schema_url} (HTTP {http_status})")
        else:
            report["error"] = f"Schema endpoint returned HTTP {http_status}."
            print(f"❌ MCP Schema request failed for {schema_url} (HTTP {http_status})")

    except requests.exceptions.Timeout:
        report["error"] = f"Request to schema endpoint timed out: {schema_url}"
        print(f"❌ MCP Schema request timeout for {schema_url}")
    except requests.exceptions.RequestException as request_error:
        report["error"] = f"Request to schema endpoint failed: {str(request_error)}"
        print(f"❌ MCP Schema request failed for {schema_url}: {str(request_error)}")
    except Exception as unexpected:
        report["error"] = f"An unexpected error occurred during validation: {str(unexpected)}"
        print(f"❌ Unexpected error during MCP validation for {schema_url}: {str(unexpected)}")

    return format_validation_summary(report), json.dumps(report, indent=2)
|
|
def format_validation_summary(result: dict) -> str:
    """Render an MCP validation result as a markdown report.

    The report opens with the original URL and, when present, the attempted
    endpoint and schema URLs. A valid result then shows the schema details
    and a JSON client configuration; an invalid one shows the failure reason
    and troubleshooting tips. The schema HTTP status is appended when it was
    recorded.
    """
    # NOTE(review): nesting below was reconstructed from a source whose
    # indentation was lost; confirm the placement of the blank line after the
    # schema details and of the trailing HTTP-status line.
    chunks = ["# ✅ MCP Endpoint Validation\n\n"]
    chunks.append(f"**Original URL:** [{result['original_url']}]({result['original_url']})\n\n")

    if result.get('mcp_endpoint_url'):
        chunks.append(f"**Attempted MCP Endpoint:** [{result['mcp_endpoint_url']}]({result['mcp_endpoint_url']})\n\n")
    if result.get('mcp_schema_url'):
        chunks.append(f"**Attempted MCP Schema URL:** [{result['mcp_schema_url']}]({result['mcp_schema_url']})\n\n\n")

    if not result["is_valid_mcp"]:
        chunks.append("## ❌ **Status: Invalid or Inaccessible MCP Endpoint**\n\n")
        if result.get("error"):
            chunks.append(f"**Reason:** {result['error']}\n\n")
        else:
            chunks.append("Could not confirm MCP functionality.\n\n")

        chunks.append("### 💡 Troubleshooting Tips:\n")
        chunks.append("- Ensure the URL is correct and the Hugging Face Space is running.\n")
        chunks.append("- Verify the Space has `mcp_server=True` in its `launch()` method (if it's a Gradio app).\n")
        chunks.append("- For private Spaces, your MCP client might need an `Authorization: Bearer <HF_TOKEN>` header.\n")
        chunks.append("- Check the Space logs for any errors if you own the Space.\n")
    else:
        chunks.append("## ✅ **Status: Valid MCP Endpoint**\n\n")
        chunks.append("The server appears to be a functional MCP endpoint based on schema accessibility.\n\n")

        if result.get("schema_details"):
            details = result["schema_details"]
            chunks.append("### 📋 Schema Details:\n")
            if not isinstance(details, dict):
                # Free-text note when the schema layout was not recognised.
                chunks.append(f"- {details}\n")
            else:
                chunks.append(f"- **Tools Found:** {details.get('tool_count', 'N/A')}\n\n")
                if details.get('tool_names'):
                    chunks.append("- **Tool Names:**\n")
                    chunks.extend(f" - {tool_name}\n" for tool_name in details['tool_names'])
            chunks.append("\n")

        chunks.append("### 🔧 Configuration for MCP Client\n\n")
        chunks.append("You can likely use the following configuration (ensure the key like `gradio_server` is appropriate for your client):\n")
        chunks.append("```json\n")
        chunks.append(json.dumps(result["connection_config"], indent=2))
        chunks.append("\n```\n")

    if result.get("schema_http_status"):
        chunks.append(f"\n**Schema HTTP Status:** {result['schema_http_status']}\n")

    return "".join(chunks)
|
|
def scrape_hf_spaces_with_progress(max_pages: int, sort_by: str) -> tuple:
    """Run the HF Spaces scraper and return ``(markdown, json_data)``.

    A ``sort_by`` value that is not a key of ``SORT_OPTIONS`` is replaced by
    "relevance" before the scraper is called.
    """
    chosen_sort = sort_by if sort_by in SORT_OPTIONS else "relevance"
    report_md, report_json = scrape_and_format_hf_mcp_servers(max_pages, chosen_sort)
    return report_md, report_json
|
|
| |
# Example server URLs, one per line.
DEFAULT_URLS = "\n".join((
    "https://huggingface.co/spaces/NLarchive/MCP-Server-Finder-Monitor",
    "https://huggingface.co/spaces/NLarchive/mcp-sentiment",
))
|
|
| |
|
|
| |
# Gradio UI: one tab per feature. Each tab wires a button to one of the
# functions above, which returns (markdown, json); the markdown goes to a
# visible Markdown component and the JSON to a hidden JSON component.
# NOTE(review): component nesting was reconstructed from a source whose
# indentation was lost — confirm the Row/Column layout against the running app.
with gr.Blocks(title="🚀 MCP Server Health Monitor") as demo:
    gr.Markdown("# 🚀 MCP Server Health Monitor")
    gr.Markdown("Find, Monitor and analyze Hugging Face Spaces configured as MCP servers")

    with gr.Tabs():

        # Tab 1: health check of a single server.
        with gr.Tab("🏥 Single Server Health"):
            gr.Markdown("### Check the health of a single MCP server")

            with gr.Row():
                single_url = gr.Textbox(
                    label="Server URL",
                    placeholder="Enter any HF Space URL format...",
                    value="https://huggingface.co/spaces/NLarchive/MCP-Server-Finder-Monitor"
                )
                check_health_btn = gr.Button("Check Health", variant="primary")

            health_output = gr.Markdown(label="Health Report")
            health_json = gr.JSON(label="Detailed Results", visible=False)

            check_health_btn.click(
                check_single_server_health,
                inputs=[single_url],
                outputs=[health_output, health_json]
            )

        # Tab 2: URL parsing only, no network requests.
        with gr.Tab("🔍 URL Parser"):
            gr.Markdown("### Parse and validate HuggingFace Space URLs")

            with gr.Row():
                parse_url = gr.Textbox(
                    label="URL to Parse",
                    placeholder="Enter any HF Space URL format...",
                    value="https://huggingface.co/spaces/NLarchive/MCP-Server-Finder-Monitor"
                )
                parse_btn = gr.Button("Parse URL", variant="primary")

            parse_output = gr.Markdown(label="Parsing Results")
            parse_json = gr.JSON(label="JSON Output", visible=False)

            parse_btn.click(
                parse_huggingface_url_with_summary,
                inputs=[parse_url],
                outputs=[parse_output, parse_json]
            )

        # Tab 3: tool discovery from the space's app.py.
        with gr.Tab("🛠️ Tools Discovery"):
            gr.Markdown("### Discover available MCP tools from a server")

            with gr.Row():
                tools_url = gr.Textbox(
                    label="Server URL",
                    placeholder="Enter HF Space URL...",
                    value="https://huggingface.co/spaces/NLarchive/MCP-Server-Finder-Monitor"
                )
                discover_btn = gr.Button("Discover Tools", variant="primary")

            tools_output = gr.Markdown(label="Tools Report")
            tools_json = gr.JSON(label="Tools Data", visible=False)

            discover_btn.click(
                discover_server_tools,
                inputs=[tools_url],
                outputs=[tools_output, tools_json]
            )

        # Tab 4: health + tools for several servers, one URL per line.
        with gr.Tab("📊 Multi-Server Monitor"):
            gr.Markdown("### Monitor multiple MCP servers simultaneously")

            multi_urls = gr.Textbox(
                label="Server URLs (one per line)",
                placeholder="Enter multiple URLs, one per line...",
                lines=8,
                value=DEFAULT_URLS
            )
            monitor_btn = gr.Button("Monitor All Servers", variant="primary")

            multi_output = gr.Markdown(label="Multi-Server Report")
            multi_json = gr.JSON(label="Detailed Results", visible=False)

            monitor_btn.click(
                monitor_multiple_servers,
                inputs=[multi_urls],
                outputs=[multi_output, multi_json]
            )

        # Tab 5: scraper for spaces tagged as MCP servers.
        with gr.Tab("🕷️ HF Spaces Scraper"):
            gr.Markdown("### Discover MCP servers on HuggingFace Spaces")
            gr.Markdown("Scrape HuggingFace to find all spaces tagged with 'mcp-server' using different sorting methods")

            with gr.Row():
                with gr.Column(scale=1):
                    max_pages = gr.Slider(
                        minimum=1,
                        maximum=50,
                        value=1,
                        step=1,
                        label="Maximum Pages to Scrape",
                        info="Each page contains ~24 spaces. Total pages available: ~48+"
                    )

                    # Dropdown choices are (label shown, key passed to the callback).
                    sort_choices = [(SORT_OPTIONS[key]["label"], key) for key in SORT_OPTIONS.keys()]
                    sort_dropdown = gr.Dropdown(
                        choices=sort_choices,
                        value="relevance",
                        label="Sort Method",
                        info="Choose how to sort the search results"
                    )

                with gr.Column(scale=1):
                    scrape_btn = gr.Button("🕷️ Scrape HF Spaces", variant="primary", size="lg")

            # Collapsed help text for the scraper options.
            with gr.Accordion("ℹ️ Scraping Information", open=False):
                gr.Markdown("""
                **Sort Methods Explained:**

                - **🎯 Relevance (Default):** HuggingFace's default relevance ranking
                - **📈 Trending:** Currently popular and active spaces
                - **❤️ Most Likes:** Spaces with the highest community appreciation
                - **🆕 Recently Created:** Newest spaces, great for discovering latest tools
                - **🔄 Recently Updated:** Recently modified spaces, likely actively maintained

                **Pagination Information:**
                - Each page contains approximately 24 spaces
                - Current total: 48+ pages available (and growing!)
                - The scraper will automatically stop if it encounters 3 consecutive empty pages
                - Different sort methods may reveal different sets of MCP servers

                **Tips:**
                - Start with 5-10 pages for a good sample
                - Try multiple sort methods for comprehensive discovery
                - Higher page counts will take longer but find more servers
                """)

            scrape_output = gr.Markdown(label="Scraping Results")
            scrape_json = gr.JSON(label="Scraped Data", visible=False)

            scrape_btn.click(
                scrape_hf_spaces_with_progress,
                inputs=[max_pages, sort_dropdown],
                outputs=[scrape_output, scrape_json]
            )

        # Tab 6: schema-based validation of an MCP endpoint.
        with gr.Tab("✅ MCP Validator"):
            gr.Markdown("### Validate MCP endpoint connectivity")

            with gr.Row():
                validate_url = gr.Textbox(
                    label="URL to Validate",
                    placeholder="Enter URL to validate as MCP endpoint...",
                    value="https://nlarchive-mcp-server-finder-monitor.hf.space/gradio_api/mcp/sse"
                )
                validate_btn = gr.Button("Validate Endpoint", variant="primary")

            validate_output = gr.Markdown(label="Validation Results")
            validate_json = gr.JSON(label="Validation Data", visible=False)

            validate_btn.click(
                validate_mcp_endpoint,
                inputs=[validate_url],
                outputs=[validate_output, validate_json]
            )


# Launch the app with the MCP server option enabled when run as a script.
if __name__ == "__main__":
    demo.launch(mcp_server=True)