mashrur950's picture
feat: Initialize FleetMind MCP Server with core functionalities
0a743f9
raw
history blame
30.5 kB
"""
FleetMind Dispatch Coordinator - MCP Server
Industry-standard Model Context Protocol server for delivery dispatch management
Provides 18 AI tools for order and driver management via standardized MCP protocol.
Compatible with Claude Desktop, Continue, Cline, and all MCP clients.
"""
import os
import sys
import json
import logging
from pathlib import Path
from typing import Literal
from datetime import datetime
# Add project root to path
sys.path.insert(0, str(Path(__file__).parent))
from fastmcp import FastMCP
# Import existing services (unchanged)
from chat.geocoding import GeocodingService
from database.connection import execute_query, execute_write, test_connection
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('logs/fleetmind_mcp.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# ============================================================================
# MCP SERVER INITIALIZATION
# ============================================================================
mcp = FastMCP(
name="FleetMind Dispatch Coordinator",
version="1.0.0"
)
# Initialize shared services
logger.info("Initializing FleetMind MCP Server...")
geocoding_service = GeocodingService()
logger.info(f"Geocoding Service: {geocoding_service.get_status()}")
# Test database connection
try:
test_connection()
logger.info("Database: Connected to PostgreSQL")
except Exception as e:
logger.error(f"Database: Connection failed - {e}")
# ============================================================================
# MCP RESOURCES
# ============================================================================
@mcp.resource("orders://all")
def get_orders_resource() -> str:
"""
Real-time orders dataset for AI context.
Returns all orders from the last 30 days.
Returns:
JSON string containing orders array with key fields:
- order_id, customer_name, delivery_address
- status, priority, created_at, assigned_driver_id
"""
try:
query = """
SELECT order_id, customer_name, delivery_address,
status, priority, created_at, assigned_driver_id
FROM orders
WHERE created_at > NOW() - INTERVAL '30 days'
ORDER BY created_at DESC
LIMIT 1000
"""
orders = execute_query(query)
logger.info(f"Resource orders://all - Retrieved {len(orders) if orders else 0} orders")
return json.dumps(orders, default=str, indent=2)
except Exception as e:
logger.error(f"Resource orders://all failed: {e}")
return json.dumps({"error": str(e)})
@mcp.resource("drivers://all")
def get_drivers_resource() -> str:
"""
Real-time drivers dataset for AI context.
Returns all drivers with current locations and status.
Returns:
JSON string containing drivers array with key fields:
- driver_id, name, status, vehicle_type, vehicle_plate
- current_lat, current_lng, last_location_update
"""
try:
query = """
SELECT driver_id, name, status, vehicle_type, vehicle_plate,
current_lat, current_lng, last_location_update
FROM drivers
ORDER BY name ASC
"""
drivers = execute_query(query)
logger.info(f"Resource drivers://all - Retrieved {len(drivers) if drivers else 0} drivers")
return json.dumps(drivers, default=str, indent=2)
except Exception as e:
logger.error(f"Resource drivers://all failed: {e}")
return json.dumps({"error": str(e)})
# ============================================================================
# MCP PROMPTS (Workflows)
# ============================================================================
# TODO: Add prompts once FastMCP prompt API is confirmed
# Prompts will provide guided workflows for:
# - create_order_workflow: Interactive order creation with validation
# - assign_driver_workflow: Smart driver assignment with route optimization
# - order_status_check: Quick order status queries
# ============================================================================
# MCP TOOLS - ORDER CREATION & VALIDATION
# ============================================================================
@mcp.tool()
def geocode_address(address: str) -> dict:
"""
Convert a delivery address to GPS coordinates and validate the address format.
Use this before creating an order to ensure the address is valid.
Args:
address: The full delivery address to geocode (e.g., '123 Main St, San Francisco, CA')
Returns:
dict: {
success: bool,
latitude: float,
longitude: float,
formatted_address: str,
confidence: str (high/medium/low),
message: str
}
"""
from chat.tools import handle_geocode_address
logger.info(f"Tool: geocode_address('{address}')")
return handle_geocode_address({"address": address})
@mcp.tool()
def calculate_route(
origin: str,
destination: str,
mode: Literal["driving", "walking", "bicycling", "transit"] = "driving",
alternatives: bool = False,
include_steps: bool = False
) -> dict:
"""
Calculate the shortest route between two locations, including distance, duration, and turn-by-turn directions.
Supports both addresses and GPS coordinates.
Args:
origin: Starting location - either full address or coordinates as 'lat,lng'
destination: Destination location - either full address or coordinates as 'lat,lng'
mode: Travel mode for route calculation (default: driving)
alternatives: Return multiple route options if available (default: false)
include_steps: Include turn-by-turn navigation steps in response (default: false)
Returns:
dict: {
success: bool,
origin: str,
destination: str,
distance: {meters: int, text: str},
duration: {seconds: int, text: str},
mode: str,
route_summary: str,
confidence: str,
steps: list (if include_steps=True)
}
"""
from chat.tools import handle_calculate_route
logger.info(f"Tool: calculate_route('{origin}' -> '{destination}', mode={mode})")
return handle_calculate_route({
"origin": origin,
"destination": destination,
"mode": mode,
"alternatives": alternatives,
"include_steps": include_steps
})
@mcp.tool()
def create_order(
customer_name: str,
delivery_address: str,
delivery_lat: float,
delivery_lng: float,
customer_phone: str | None = None,
customer_email: str | None = None,
priority: Literal["standard", "express", "urgent"] = "standard",
weight_kg: float = 5.0,
special_instructions: str | None = None,
time_window_end: str | None = None
) -> dict:
"""
Create a new delivery order in the database. Only call this after geocoding the address successfully.
Args:
customer_name: Full name of the customer
delivery_address: Complete delivery address
delivery_lat: Latitude from geocoding
delivery_lng: Longitude from geocoding
customer_phone: Customer phone number (optional)
customer_email: Customer email address (optional)
priority: Delivery priority level (default: standard)
weight_kg: Package weight in kilograms (default: 5.0)
special_instructions: Special delivery instructions (optional)
time_window_end: Delivery deadline in ISO format (default: 6 hours from now)
Returns:
dict: {
success: bool,
order_id: str,
status: str,
customer: str,
address: str,
deadline: str,
priority: str,
message: str
}
"""
from chat.tools import handle_create_order
logger.info(f"Tool: create_order(customer='{customer_name}', address='{delivery_address}')")
return handle_create_order({
"customer_name": customer_name,
"delivery_address": delivery_address,
"delivery_lat": delivery_lat,
"delivery_lng": delivery_lng,
"customer_phone": customer_phone,
"customer_email": customer_email,
"priority": priority,
"weight_kg": weight_kg,
"special_instructions": special_instructions,
"time_window_end": time_window_end
})
# ============================================================================
# MCP TOOLS - ORDER QUERYING
# ============================================================================
@mcp.tool()
def count_orders(
status: Literal["pending", "assigned", "in_transit", "delivered", "failed", "cancelled"] | None = None,
priority: Literal["standard", "express", "urgent"] | None = None,
payment_status: Literal["pending", "paid", "cod"] | None = None,
assigned_driver_id: str | None = None,
is_fragile: bool | None = None,
requires_signature: bool | None = None,
requires_cold_storage: bool | None = None
) -> dict:
"""
Count total orders in the database with optional filters.
Use this when user asks 'how many orders', 'fetch orders', or wants to know order statistics.
Args:
status: Filter by order status (optional)
priority: Filter by priority level (optional)
payment_status: Filter by payment status (optional)
assigned_driver_id: Filter by assigned driver ID (optional)
is_fragile: Filter fragile packages only (optional)
requires_signature: Filter orders requiring signature (optional)
requires_cold_storage: Filter orders requiring cold storage (optional)
Returns:
dict: {
success: bool,
total: int,
status_breakdown: dict,
priority_breakdown: dict,
message: str
}
"""
from chat.tools import handle_count_orders
logger.info(f"Tool: count_orders(status={status}, priority={priority})")
tool_input = {}
if status is not None:
tool_input["status"] = status
if priority is not None:
tool_input["priority"] = priority
if payment_status is not None:
tool_input["payment_status"] = payment_status
if assigned_driver_id is not None:
tool_input["assigned_driver_id"] = assigned_driver_id
if is_fragile is not None:
tool_input["is_fragile"] = is_fragile
if requires_signature is not None:
tool_input["requires_signature"] = requires_signature
if requires_cold_storage is not None:
tool_input["requires_cold_storage"] = requires_cold_storage
return handle_count_orders(tool_input)
@mcp.tool()
def fetch_orders(
limit: int = 10,
offset: int = 0,
status: Literal["pending", "assigned", "in_transit", "delivered", "failed", "cancelled"] | None = None,
priority: Literal["standard", "express", "urgent"] | None = None,
payment_status: Literal["pending", "paid", "cod"] | None = None,
assigned_driver_id: str | None = None,
is_fragile: bool | None = None,
requires_signature: bool | None = None,
requires_cold_storage: bool | None = None,
sort_by: Literal["created_at", "priority", "time_window_start"] = "created_at",
sort_order: Literal["ASC", "DESC"] = "DESC"
) -> dict:
"""
Fetch orders from the database with optional filters, pagination, and sorting.
Use after counting to show specific number of orders.
Args:
limit: Number of orders to fetch (default: 10, max: 100)
offset: Number of orders to skip for pagination (default: 0)
status: Filter by order status (optional)
priority: Filter by priority level (optional)
payment_status: Filter by payment status (optional)
assigned_driver_id: Filter by assigned driver ID (optional)
is_fragile: Filter fragile packages only (optional)
requires_signature: Filter orders requiring signature (optional)
requires_cold_storage: Filter orders requiring cold storage (optional)
sort_by: Field to sort by (default: created_at)
sort_order: Sort order (default: DESC for newest first)
Returns:
dict: {
success: bool,
orders: list[dict],
count: int,
message: str
}
"""
from chat.tools import handle_fetch_orders
logger.info(f"Tool: fetch_orders(limit={limit}, offset={offset}, status={status})")
tool_input = {
"limit": limit,
"offset": offset,
"sort_by": sort_by,
"sort_order": sort_order
}
if status is not None:
tool_input["status"] = status
if priority is not None:
tool_input["priority"] = priority
if payment_status is not None:
tool_input["payment_status"] = payment_status
if assigned_driver_id is not None:
tool_input["assigned_driver_id"] = assigned_driver_id
if is_fragile is not None:
tool_input["is_fragile"] = is_fragile
if requires_signature is not None:
tool_input["requires_signature"] = requires_signature
if requires_cold_storage is not None:
tool_input["requires_cold_storage"] = requires_cold_storage
return handle_fetch_orders(tool_input)
@mcp.tool()
def get_order_details(order_id: str) -> dict:
"""
Get complete details of a specific order by order ID.
Use when user asks 'tell me about order X' or wants detailed information about a specific order.
Args:
order_id: The order ID to fetch details for (e.g., 'ORD-20251114163800')
Returns:
dict: {
success: bool,
order: dict (with all 26 fields),
message: str
}
"""
from chat.tools import handle_get_order_details
logger.info(f"Tool: get_order_details(order_id='{order_id}')")
return handle_get_order_details({"order_id": order_id})
@mcp.tool()
def search_orders(search_term: str) -> dict:
"""
Search for orders by customer name, email, phone, or order ID pattern.
Use when user provides partial information to find orders.
Args:
search_term: Search term to match against customer_name, customer_email, customer_phone, or order_id
Returns:
dict: {
success: bool,
orders: list[dict],
count: int,
message: str
}
"""
from chat.tools import handle_search_orders
logger.info(f"Tool: search_orders(search_term='{search_term}')")
return handle_search_orders({"search_term": search_term})
@mcp.tool()
def get_incomplete_orders(limit: int = 20) -> dict:
"""
Get all orders that are not yet completed (excludes delivered and cancelled orders).
Shortcut for finding orders in progress (pending, assigned, in_transit).
Args:
limit: Number of orders to fetch (default: 20)
Returns:
dict: {
success: bool,
orders: list[dict],
count: int,
message: str
}
"""
from chat.tools import handle_get_incomplete_orders
logger.info(f"Tool: get_incomplete_orders(limit={limit})")
return handle_get_incomplete_orders({"limit": limit})
# ============================================================================
# MCP TOOLS - ORDER MANAGEMENT
# ============================================================================
@mcp.tool()
def update_order(
order_id: str,
customer_name: str | None = None,
customer_phone: str | None = None,
customer_email: str | None = None,
delivery_address: str | None = None,
delivery_lat: float | None = None,
delivery_lng: float | None = None,
status: Literal["pending", "assigned", "in_transit", "delivered", "failed", "cancelled"] | None = None,
priority: Literal["standard", "express", "urgent"] | None = None,
special_instructions: str | None = None,
time_window_end: str | None = None,
payment_status: Literal["pending", "paid", "cod"] | None = None,
weight_kg: float | None = None,
order_value: float | None = None
) -> dict:
"""
Update an existing order's details. You can update any combination of fields.
Only provide the fields you want to change. Auto-geocodes if delivery_address updated without coordinates.
Args:
order_id: Order ID to update (e.g., 'ORD-20250114123456')
customer_name: Updated customer name (optional)
customer_phone: Updated customer phone number (optional)
customer_email: Updated customer email address (optional)
delivery_address: Updated delivery address (optional)
delivery_lat: Updated delivery latitude (required if updating address) (optional)
delivery_lng: Updated delivery longitude (required if updating address) (optional)
status: Updated order status (optional)
priority: Updated priority level (optional)
special_instructions: Updated special delivery instructions (optional)
time_window_end: Updated delivery deadline (ISO format datetime) (optional)
payment_status: Updated payment status (optional)
weight_kg: Updated package weight in kilograms (optional)
order_value: Updated order value in currency (optional)
Returns:
dict: {
success: bool,
order_id: str,
updated_fields: list[str],
message: str
}
"""
from chat.tools import handle_update_order
logger.info(f"Tool: update_order(order_id='{order_id}')")
tool_input = {"order_id": order_id}
if customer_name is not None:
tool_input["customer_name"] = customer_name
if customer_phone is not None:
tool_input["customer_phone"] = customer_phone
if customer_email is not None:
tool_input["customer_email"] = customer_email
if delivery_address is not None:
tool_input["delivery_address"] = delivery_address
if delivery_lat is not None:
tool_input["delivery_lat"] = delivery_lat
if delivery_lng is not None:
tool_input["delivery_lng"] = delivery_lng
if status is not None:
tool_input["status"] = status
if priority is not None:
tool_input["priority"] = priority
if special_instructions is not None:
tool_input["special_instructions"] = special_instructions
if time_window_end is not None:
tool_input["time_window_end"] = time_window_end
if payment_status is not None:
tool_input["payment_status"] = payment_status
if weight_kg is not None:
tool_input["weight_kg"] = weight_kg
if order_value is not None:
tool_input["order_value"] = order_value
return handle_update_order(tool_input)
@mcp.tool()
def delete_order(order_id: str, confirm: bool) -> dict:
"""
Permanently delete an order from the database. This action cannot be undone. Use with caution.
Args:
order_id: Order ID to delete (e.g., 'ORD-20250114123456')
confirm: Must be set to true to confirm deletion
Returns:
dict: {
success: bool,
order_id: str,
message: str
}
"""
from chat.tools import handle_delete_order
logger.info(f"Tool: delete_order(order_id='{order_id}', confirm={confirm})")
return handle_delete_order({"order_id": order_id, "confirm": confirm})
# ============================================================================
# MCP TOOLS - DRIVER CREATION
# ============================================================================
@mcp.tool()
def create_driver(
name: str,
phone: str | None = None,
email: str | None = None,
vehicle_type: str = "van",
vehicle_plate: str | None = None,
capacity_kg: float = 1000.0,
capacity_m3: float = 12.0,
skills: list[str] | None = None,
status: Literal["active", "busy", "offline", "unavailable"] = "active"
) -> dict:
"""
Create a new delivery driver in the database. Use this to onboard new drivers to the fleet.
Args:
name: Full name of the driver
phone: Driver phone number (optional)
email: Driver email address (optional)
vehicle_type: Type of vehicle: van, truck, car, motorcycle (default: van)
vehicle_plate: Vehicle license plate number (optional)
capacity_kg: Vehicle cargo capacity in kilograms (default: 1000.0)
capacity_m3: Vehicle cargo volume in cubic meters (default: 12.0)
skills: List of driver skills/certifications: refrigerated, medical_certified, fragile_handler, overnight, express_delivery (optional)
status: Driver status (default: active)
Returns:
dict: {
success: bool,
driver_id: str,
name: str,
status: str,
vehicle_type: str,
vehicle_plate: str,
capacity_kg: float,
skills: list[str],
message: str
}
"""
from chat.tools import handle_create_driver
logger.info(f"Tool: create_driver(name='{name}', vehicle_type='{vehicle_type}')")
return handle_create_driver({
"name": name,
"phone": phone,
"email": email,
"vehicle_type": vehicle_type,
"vehicle_plate": vehicle_plate,
"capacity_kg": capacity_kg,
"capacity_m3": capacity_m3,
"skills": skills or [],
"status": status
})
# ============================================================================
# MCP TOOLS - DRIVER QUERYING
# ============================================================================
@mcp.tool()
def count_drivers(
status: Literal["active", "busy", "offline", "unavailable"] | None = None,
vehicle_type: str | None = None
) -> dict:
"""
Count total drivers in the database with optional filters.
Use this when user asks 'how many drivers', 'show drivers', or wants driver statistics.
Args:
status: Filter by driver status (optional)
vehicle_type: Filter by vehicle type: van, truck, car, motorcycle, etc. (optional)
Returns:
dict: {
success: bool,
total: int,
status_breakdown: dict,
vehicle_breakdown: dict,
message: str
}
"""
from chat.tools import handle_count_drivers
logger.info(f"Tool: count_drivers(status={status}, vehicle_type={vehicle_type})")
tool_input = {}
if status is not None:
tool_input["status"] = status
if vehicle_type is not None:
tool_input["vehicle_type"] = vehicle_type
return handle_count_drivers(tool_input)
@mcp.tool()
def fetch_drivers(
limit: int = 10,
offset: int = 0,
status: Literal["active", "busy", "offline", "unavailable"] | None = None,
vehicle_type: str | None = None,
sort_by: Literal["name", "status", "created_at", "last_location_update"] = "name",
sort_order: Literal["ASC", "DESC"] = "ASC"
) -> dict:
"""
Fetch drivers from the database with optional filters, pagination, and sorting.
Use after counting to show specific number of drivers.
Args:
limit: Number of drivers to fetch (default: 10, max: 100)
offset: Number of drivers to skip for pagination (default: 0)
status: Filter by driver status (optional)
vehicle_type: Filter by vehicle type: van, truck, car, motorcycle, etc. (optional)
sort_by: Field to sort by (default: name)
sort_order: Sort order (default: ASC for alphabetical)
Returns:
dict: {
success: bool,
drivers: list[dict],
count: int,
message: str
}
"""
from chat.tools import handle_fetch_drivers
logger.info(f"Tool: fetch_drivers(limit={limit}, offset={offset}, status={status})")
tool_input = {
"limit": limit,
"offset": offset,
"sort_by": sort_by,
"sort_order": sort_order
}
if status is not None:
tool_input["status"] = status
if vehicle_type is not None:
tool_input["vehicle_type"] = vehicle_type
return handle_fetch_drivers(tool_input)
@mcp.tool()
def get_driver_details(driver_id: str) -> dict:
"""
Get complete details of a specific driver by driver ID, including current location
(latitude, longitude, and human-readable address via reverse geocoding), contact info,
vehicle details, status, and skills. Use when user asks about a driver's location,
coordinates, position, or any other driver information.
Args:
driver_id: The driver ID to fetch details for (e.g., 'DRV-20251114163800')
Returns:
dict: {
success: bool,
driver: dict (with all fields including reverse-geocoded location address),
message: str
}
"""
from chat.tools import handle_get_driver_details
logger.info(f"Tool: get_driver_details(driver_id='{driver_id}')")
return handle_get_driver_details({"driver_id": driver_id})
@mcp.tool()
def search_drivers(search_term: str) -> dict:
"""
Search for drivers by name, email, phone, vehicle plate, or driver ID pattern.
Use when user provides partial information to find drivers.
Args:
search_term: Search term to match against name, email, phone, vehicle_plate, or driver_id
Returns:
dict: {
success: bool,
drivers: list[dict],
count: int,
message: str
}
"""
from chat.tools import handle_search_drivers
logger.info(f"Tool: search_drivers(search_term='{search_term}')")
return handle_search_drivers({"search_term": search_term})
@mcp.tool()
def get_available_drivers(limit: int = 20) -> dict:
"""
Get all drivers that are available for assignment (active or offline status, excludes busy and unavailable).
Shortcut for finding drivers ready for dispatch.
Args:
limit: Number of drivers to fetch (default: 20)
Returns:
dict: {
success: bool,
drivers: list[dict],
count: int,
message: str
}
"""
from chat.tools import handle_get_available_drivers
logger.info(f"Tool: get_available_drivers(limit={limit})")
return handle_get_available_drivers({"limit": limit})
# ============================================================================
# MCP TOOLS - DRIVER MANAGEMENT
# ============================================================================
@mcp.tool()
def update_driver(
driver_id: str,
name: str | None = None,
phone: str | None = None,
email: str | None = None,
status: Literal["active", "busy", "offline", "unavailable"] | None = None,
vehicle_type: str | None = None,
vehicle_plate: str | None = None,
capacity_kg: float | None = None,
capacity_m3: float | None = None,
skills: list[str] | None = None,
current_lat: float | None = None,
current_lng: float | None = None
) -> dict:
"""
Update an existing driver's details. You can update any combination of fields.
Only provide the fields you want to change. Auto-updates last_location_update if coordinates changed.
Args:
driver_id: Driver ID to update (e.g., 'DRV-20250114123456')
name: Updated driver name (optional)
phone: Updated phone number (optional)
email: Updated email address (optional)
status: Updated driver status (optional)
vehicle_type: Updated vehicle type (optional)
vehicle_plate: Updated vehicle license plate (optional)
capacity_kg: Updated cargo capacity in kilograms (optional)
capacity_m3: Updated cargo capacity in cubic meters (optional)
skills: Updated list of driver skills/certifications (optional)
current_lat: Updated current latitude (optional)
current_lng: Updated current longitude (optional)
Returns:
dict: {
success: bool,
driver_id: str,
updated_fields: list[str],
message: str
}
"""
from chat.tools import handle_update_driver
logger.info(f"Tool: update_driver(driver_id='{driver_id}')")
tool_input = {"driver_id": driver_id}
if name is not None:
tool_input["name"] = name
if phone is not None:
tool_input["phone"] = phone
if email is not None:
tool_input["email"] = email
if status is not None:
tool_input["status"] = status
if vehicle_type is not None:
tool_input["vehicle_type"] = vehicle_type
if vehicle_plate is not None:
tool_input["vehicle_plate"] = vehicle_plate
if capacity_kg is not None:
tool_input["capacity_kg"] = capacity_kg
if capacity_m3 is not None:
tool_input["capacity_m3"] = capacity_m3
if skills is not None:
tool_input["skills"] = skills
if current_lat is not None:
tool_input["current_lat"] = current_lat
if current_lng is not None:
tool_input["current_lng"] = current_lng
return handle_update_driver(tool_input)
@mcp.tool()
def delete_driver(driver_id: str, confirm: bool) -> dict:
"""
Permanently delete a driver from the database. This action cannot be undone. Use with caution.
Args:
driver_id: Driver ID to delete (e.g., 'DRV-20250114123456')
confirm: Must be set to true to confirm deletion
Returns:
dict: {
success: bool,
driver_id: str,
message: str
}
"""
from chat.tools import handle_delete_driver
logger.info(f"Tool: delete_driver(driver_id='{driver_id}', confirm={confirm})")
return handle_delete_driver({"driver_id": driver_id, "confirm": confirm})
# ============================================================================
# MAIN ENTRY POINT
# ============================================================================
if __name__ == "__main__":
logger.info("=" * 60)
logger.info("FleetMind MCP Server v1.0.0")
logger.info("=" * 60)
logger.info(f"Geocoding: {geocoding_service.get_status()}")
logger.info("Tools: 18 tools registered")
logger.info("Resources: 2 resources available")
logger.info("Prompts: 3 workflow templates")
logger.info("Starting MCP server...")
mcp.run()