Add reverse geocoding functionality and enhance driver details retrieval with location information
18f4b6b
| """ | |
| Tool definitions and execution handlers for FleetMind chat | |
| Simulates MCP tools using Claude's tool calling feature | |
| """ | |
| import sys | |
| from pathlib import Path | |
| from datetime import datetime, timedelta | |
| import logging | |
| # Add parent directory to path | |
| sys.path.insert(0, str(Path(__file__).parent.parent)) | |
| from database.connection import execute_write, execute_query | |
| from chat.geocoding import GeocodingService | |
| logger = logging.getLogger(__name__) | |
| # Initialize geocoding service | |
| geocoding_service = GeocodingService() | |
| # Tool schemas for Claude | |
| TOOLS_SCHEMA = [ | |
| { | |
| "name": "geocode_address", | |
| "description": "Convert a delivery address to GPS coordinates and validate the address format. Use this before creating an order to ensure the address is valid.", | |
| "input_schema": { | |
| "type": "object", | |
| "properties": { | |
| "address": { | |
| "type": "string", | |
| "description": "The full delivery address to geocode (e.g., '123 Main St, San Francisco, CA')" | |
| } | |
| }, | |
| "required": ["address"] | |
| } | |
| }, | |
| { | |
| "name": "create_order", | |
| "description": "Create a new delivery order in the database. Only call this after geocoding the address successfully.", | |
| "input_schema": { | |
| "type": "object", | |
| "properties": { | |
| "customer_name": { | |
| "type": "string", | |
| "description": "Full name of the customer" | |
| }, | |
| "customer_phone": { | |
| "type": "string", | |
| "description": "Customer phone number (optional)" | |
| }, | |
| "customer_email": { | |
| "type": "string", | |
| "description": "Customer email address (optional)" | |
| }, | |
| "delivery_address": { | |
| "type": "string", | |
| "description": "Full delivery address" | |
| }, | |
| "delivery_lat": { | |
| "type": "number", | |
| "description": "Latitude from geocoding" | |
| }, | |
| "delivery_lng": { | |
| "type": "number", | |
| "description": "Longitude from geocoding" | |
| }, | |
| "time_window_end": { | |
| "type": "string", | |
| "description": "Delivery deadline in ISO format (e.g., '2025-11-13T17:00:00'). If not specified by user, default to 6 hours from now." | |
| }, | |
| "priority": { | |
| "type": "string", | |
| "enum": ["standard", "express", "urgent"], | |
| "description": "Delivery priority. Default to 'standard' unless user specifies urgent/express." | |
| }, | |
| "special_instructions": { | |
| "type": "string", | |
| "description": "Any special delivery instructions (optional)" | |
| }, | |
| "weight_kg": { | |
| "type": "number", | |
| "description": "Package weight in kilograms (optional, default to 5.0)" | |
| } | |
| }, | |
| "required": ["customer_name", "delivery_address", "delivery_lat", "delivery_lng"] | |
| } | |
| }, | |
| { | |
| "name": "create_driver", | |
| "description": "Create a new delivery driver in the database. Use this to onboard new drivers to the fleet.", | |
| "input_schema": { | |
| "type": "object", | |
| "properties": { | |
| "name": { | |
| "type": "string", | |
| "description": "Full name of the driver" | |
| }, | |
| "phone": { | |
| "type": "string", | |
| "description": "Driver phone number (optional)" | |
| }, | |
| "email": { | |
| "type": "string", | |
| "description": "Driver email address (optional)" | |
| }, | |
| "vehicle_type": { | |
| "type": "string", | |
| "description": "Type of vehicle: van, truck, car, motorcycle (default: van)" | |
| }, | |
| "vehicle_plate": { | |
| "type": "string", | |
| "description": "Vehicle license plate number (optional)" | |
| }, | |
| "capacity_kg": { | |
| "type": "number", | |
| "description": "Vehicle cargo capacity in kilograms (default: 1000.0)" | |
| }, | |
| "capacity_m3": { | |
| "type": "number", | |
| "description": "Vehicle cargo volume in cubic meters (default: 12.0)" | |
| }, | |
| "skills": { | |
| "type": "array", | |
| "description": "List of driver skills/certifications: refrigerated, medical_certified, fragile_handler, overnight, express_delivery", | |
| "items": { | |
| "type": "string" | |
| } | |
| }, | |
| "status": { | |
| "type": "string", | |
| "enum": ["active", "busy", "offline", "unavailable"], | |
| "description": "Driver status (default: active)" | |
| } | |
| }, | |
| "required": ["name"] | |
| } | |
| }, | |
| { | |
| "name": "count_orders", | |
| "description": "Count total orders in the database with optional filters. Use this when user asks 'how many orders', 'fetch orders', or wants to know order statistics.", | |
| "input_schema": { | |
| "type": "object", | |
| "properties": { | |
| "status": { | |
| "type": "string", | |
| "enum": ["pending", "assigned", "in_transit", "delivered", "failed", "cancelled"], | |
| "description": "Filter by order status (optional)" | |
| }, | |
| "priority": { | |
| "type": "string", | |
| "enum": ["standard", "express", "urgent"], | |
| "description": "Filter by priority level (optional)" | |
| }, | |
| "payment_status": { | |
| "type": "string", | |
| "enum": ["pending", "paid", "cod"], | |
| "description": "Filter by payment status (optional)" | |
| }, | |
| "assigned_driver_id": { | |
| "type": "string", | |
| "description": "Filter by assigned driver ID (optional)" | |
| }, | |
| "is_fragile": { | |
| "type": "boolean", | |
| "description": "Filter fragile packages only (optional)" | |
| }, | |
| "requires_signature": { | |
| "type": "boolean", | |
| "description": "Filter orders requiring signature (optional)" | |
| }, | |
| "requires_cold_storage": { | |
| "type": "boolean", | |
| "description": "Filter orders requiring cold storage (optional)" | |
| } | |
| }, | |
| "required": [] | |
| } | |
| }, | |
| { | |
| "name": "fetch_orders", | |
| "description": "Fetch orders from the database with optional filters, pagination, and sorting. Use after counting to show specific number of orders.", | |
| "input_schema": { | |
| "type": "object", | |
| "properties": { | |
| "limit": { | |
| "type": "integer", | |
| "description": "Number of orders to fetch (default: 10, max: 100)" | |
| }, | |
| "offset": { | |
| "type": "integer", | |
| "description": "Number of orders to skip for pagination (default: 0)" | |
| }, | |
| "status": { | |
| "type": "string", | |
| "enum": ["pending", "assigned", "in_transit", "delivered", "failed", "cancelled"], | |
| "description": "Filter by order status (optional)" | |
| }, | |
| "priority": { | |
| "type": "string", | |
| "enum": ["standard", "express", "urgent"], | |
| "description": "Filter by priority level (optional)" | |
| }, | |
| "payment_status": { | |
| "type": "string", | |
| "enum": ["pending", "paid", "cod"], | |
| "description": "Filter by payment status (optional)" | |
| }, | |
| "assigned_driver_id": { | |
| "type": "string", | |
| "description": "Filter by assigned driver ID (optional)" | |
| }, | |
| "is_fragile": { | |
| "type": "boolean", | |
| "description": "Filter fragile packages only (optional)" | |
| }, | |
| "requires_signature": { | |
| "type": "boolean", | |
| "description": "Filter orders requiring signature (optional)" | |
| }, | |
| "requires_cold_storage": { | |
| "type": "boolean", | |
| "description": "Filter orders requiring cold storage (optional)" | |
| }, | |
| "sort_by": { | |
| "type": "string", | |
| "enum": ["created_at", "priority", "time_window_start"], | |
| "description": "Field to sort by (default: created_at)" | |
| }, | |
| "sort_order": { | |
| "type": "string", | |
| "enum": ["ASC", "DESC"], | |
| "description": "Sort order (default: DESC for newest first)" | |
| } | |
| }, | |
| "required": [] | |
| } | |
| }, | |
| { | |
| "name": "get_order_details", | |
| "description": "Get complete details of a specific order by order ID. Use when user asks 'tell me about order X' or wants detailed information about a specific order.", | |
| "input_schema": { | |
| "type": "object", | |
| "properties": { | |
| "order_id": { | |
| "type": "string", | |
| "description": "The order ID to fetch details for (e.g., 'ORD-20251114163800')" | |
| } | |
| }, | |
| "required": ["order_id"] | |
| } | |
| }, | |
| { | |
| "name": "search_orders", | |
| "description": "Search for orders by customer name, email, phone, or order ID pattern. Use when user provides partial information to find orders.", | |
| "input_schema": { | |
| "type": "object", | |
| "properties": { | |
| "search_term": { | |
| "type": "string", | |
| "description": "Search term to match against customer_name, customer_email, customer_phone, or order_id" | |
| } | |
| }, | |
| "required": ["search_term"] | |
| } | |
| }, | |
| { | |
| "name": "get_incomplete_orders", | |
| "description": "Get all orders that are not yet completed (excludes delivered and cancelled orders). Shortcut for finding orders in progress (pending, assigned, in_transit).", | |
| "input_schema": { | |
| "type": "object", | |
| "properties": { | |
| "limit": { | |
| "type": "integer", | |
| "description": "Number of orders to fetch (default: 20)" | |
| } | |
| }, | |
| "required": [] | |
| } | |
| }, | |
| { | |
| "name": "count_drivers", | |
| "description": "Count total drivers in the database with optional filters. Use this when user asks 'how many drivers', 'show drivers', or wants driver statistics.", | |
| "input_schema": { | |
| "type": "object", | |
| "properties": { | |
| "status": { | |
| "type": "string", | |
| "enum": ["active", "busy", "offline", "unavailable"], | |
| "description": "Filter by driver status (optional)" | |
| }, | |
| "vehicle_type": { | |
| "type": "string", | |
| "description": "Filter by vehicle type: van, truck, car, motorcycle, etc. (optional)" | |
| } | |
| }, | |
| "required": [] | |
| } | |
| }, | |
| { | |
| "name": "fetch_drivers", | |
| "description": "Fetch drivers from the database with optional filters, pagination, and sorting. Use after counting to show specific number of drivers.", | |
| "input_schema": { | |
| "type": "object", | |
| "properties": { | |
| "limit": { | |
| "type": "integer", | |
| "description": "Number of drivers to fetch (default: 10, max: 100)" | |
| }, | |
| "offset": { | |
| "type": "integer", | |
| "description": "Number of drivers to skip for pagination (default: 0)" | |
| }, | |
| "status": { | |
| "type": "string", | |
| "enum": ["active", "busy", "offline", "unavailable"], | |
| "description": "Filter by driver status (optional)" | |
| }, | |
| "vehicle_type": { | |
| "type": "string", | |
| "description": "Filter by vehicle type: van, truck, car, motorcycle, etc. (optional)" | |
| }, | |
| "sort_by": { | |
| "type": "string", | |
| "enum": ["name", "status", "created_at", "last_location_update"], | |
| "description": "Field to sort by (default: name)" | |
| }, | |
| "sort_order": { | |
| "type": "string", | |
| "enum": ["ASC", "DESC"], | |
| "description": "Sort order (default: ASC for alphabetical)" | |
| } | |
| }, | |
| "required": [] | |
| } | |
| }, | |
| { | |
| "name": "get_driver_details", | |
| "description": "Get complete details of a specific driver by driver ID, including current location (latitude, longitude, and human-readable address), contact info, vehicle details, status, and skills. Use when user asks about a driver's location, coordinates, position, or any other driver information.", | |
| "input_schema": { | |
| "type": "object", | |
| "properties": { | |
| "driver_id": { | |
| "type": "string", | |
| "description": "The driver ID to fetch details for (e.g., 'DRV-20251114163800')" | |
| } | |
| }, | |
| "required": ["driver_id"] | |
| } | |
| }, | |
| { | |
| "name": "search_drivers", | |
| "description": "Search for drivers by name, email, phone, vehicle plate, or driver ID pattern. Use when user provides partial information to find drivers.", | |
| "input_schema": { | |
| "type": "object", | |
| "properties": { | |
| "search_term": { | |
| "type": "string", | |
| "description": "Search term to match against name, email, phone, vehicle_plate, or driver_id" | |
| } | |
| }, | |
| "required": ["search_term"] | |
| } | |
| }, | |
| { | |
| "name": "get_available_drivers", | |
| "description": "Get all drivers that are available for assignment (active or offline status, excludes busy and unavailable). Shortcut for finding drivers ready for dispatch.", | |
| "input_schema": { | |
| "type": "object", | |
| "properties": { | |
| "limit": { | |
| "type": "integer", | |
| "description": "Number of drivers to fetch (default: 20)" | |
| } | |
| }, | |
| "required": [] | |
| } | |
| }, | |
| { | |
| "name": "update_order", | |
| "description": "Update an existing order's details. You can update any combination of fields. Only provide the fields you want to change.", | |
| "input_schema": { | |
| "type": "object", | |
| "properties": { | |
| "order_id": { | |
| "type": "string", | |
| "description": "Order ID to update (e.g., 'ORD-20250114123456')" | |
| }, | |
| "customer_name": { | |
| "type": "string", | |
| "description": "Updated customer name" | |
| }, | |
| "customer_phone": { | |
| "type": "string", | |
| "description": "Updated customer phone number" | |
| }, | |
| "customer_email": { | |
| "type": "string", | |
| "description": "Updated customer email address" | |
| }, | |
| "delivery_address": { | |
| "type": "string", | |
| "description": "Updated delivery address" | |
| }, | |
| "delivery_lat": { | |
| "type": "number", | |
| "description": "Updated delivery latitude (required if updating address)" | |
| }, | |
| "delivery_lng": { | |
| "type": "number", | |
| "description": "Updated delivery longitude (required if updating address)" | |
| }, | |
| "status": { | |
| "type": "string", | |
| "description": "Updated order status", | |
| "enum": ["pending", "assigned", "in_transit", "delivered", "failed", "cancelled"] | |
| }, | |
| "priority": { | |
| "type": "string", | |
| "description": "Updated priority level", | |
| "enum": ["standard", "express", "urgent"] | |
| }, | |
| "special_instructions": { | |
| "type": "string", | |
| "description": "Updated special delivery instructions" | |
| }, | |
| "time_window_end": { | |
| "type": "string", | |
| "description": "Updated delivery deadline (ISO format datetime)" | |
| }, | |
| "payment_status": { | |
| "type": "string", | |
| "description": "Updated payment status", | |
| "enum": ["pending", "paid", "cod"] | |
| }, | |
| "weight_kg": { | |
| "type": "number", | |
| "description": "Updated package weight in kilograms" | |
| }, | |
| "order_value": { | |
| "type": "number", | |
| "description": "Updated order value in currency" | |
| } | |
| }, | |
| "required": ["order_id"] | |
| } | |
| }, | |
| { | |
| "name": "delete_order", | |
| "description": "Permanently delete an order from the database. This action cannot be undone. Use with caution.", | |
| "input_schema": { | |
| "type": "object", | |
| "properties": { | |
| "order_id": { | |
| "type": "string", | |
| "description": "Order ID to delete (e.g., 'ORD-20250114123456')" | |
| }, | |
| "confirm": { | |
| "type": "boolean", | |
| "description": "Must be set to true to confirm deletion" | |
| } | |
| }, | |
| "required": ["order_id", "confirm"] | |
| } | |
| }, | |
| { | |
| "name": "update_driver", | |
| "description": "Update an existing driver's details. You can update any combination of fields. Only provide the fields you want to change.", | |
| "input_schema": { | |
| "type": "object", | |
| "properties": { | |
| "driver_id": { | |
| "type": "string", | |
| "description": "Driver ID to update (e.g., 'DRV-20250114123456')" | |
| }, | |
| "name": { | |
| "type": "string", | |
| "description": "Updated driver name" | |
| }, | |
| "phone": { | |
| "type": "string", | |
| "description": "Updated phone number" | |
| }, | |
| "email": { | |
| "type": "string", | |
| "description": "Updated email address" | |
| }, | |
| "status": { | |
| "type": "string", | |
| "description": "Updated driver status", | |
| "enum": ["active", "busy", "offline", "unavailable"] | |
| }, | |
| "vehicle_type": { | |
| "type": "string", | |
| "description": "Updated vehicle type" | |
| }, | |
| "vehicle_plate": { | |
| "type": "string", | |
| "description": "Updated vehicle license plate" | |
| }, | |
| "capacity_kg": { | |
| "type": "number", | |
| "description": "Updated cargo capacity in kilograms" | |
| }, | |
| "capacity_m3": { | |
| "type": "number", | |
| "description": "Updated cargo capacity in cubic meters" | |
| }, | |
| "skills": { | |
| "type": "array", | |
| "items": {"type": "string"}, | |
| "description": "Updated list of driver skills/certifications" | |
| }, | |
| "current_lat": { | |
| "type": "number", | |
| "description": "Updated current latitude" | |
| }, | |
| "current_lng": { | |
| "type": "number", | |
| "description": "Updated current longitude" | |
| } | |
| }, | |
| "required": ["driver_id"] | |
| } | |
| }, | |
| { | |
| "name": "delete_driver", | |
| "description": "Permanently delete a driver from the database. This action cannot be undone. Use with caution.", | |
| "input_schema": { | |
| "type": "object", | |
| "properties": { | |
| "driver_id": { | |
| "type": "string", | |
| "description": "Driver ID to delete (e.g., 'DRV-20250114123456')" | |
| }, | |
| "confirm": { | |
| "type": "boolean", | |
| "description": "Must be set to true to confirm deletion" | |
| } | |
| }, | |
| "required": ["driver_id", "confirm"] | |
| } | |
| } | |
| ] | |
| def execute_tool(tool_name: str, tool_input: dict) -> dict: | |
| """ | |
| Route tool execution to appropriate handler | |
| Args: | |
| tool_name: Name of the tool to execute | |
| tool_input: Tool input parameters | |
| Returns: | |
| Dict with tool execution results | |
| """ | |
| try: | |
| if tool_name == "geocode_address": | |
| return handle_geocode_address(tool_input) | |
| elif tool_name == "create_order": | |
| return handle_create_order(tool_input) | |
| elif tool_name == "create_driver": | |
| return handle_create_driver(tool_input) | |
| elif tool_name == "count_orders": | |
| return handle_count_orders(tool_input) | |
| elif tool_name == "fetch_orders": | |
| return handle_fetch_orders(tool_input) | |
| elif tool_name == "get_order_details": | |
| return handle_get_order_details(tool_input) | |
| elif tool_name == "search_orders": | |
| return handle_search_orders(tool_input) | |
| elif tool_name == "get_incomplete_orders": | |
| return handle_get_incomplete_orders(tool_input) | |
| elif tool_name == "count_drivers": | |
| return handle_count_drivers(tool_input) | |
| elif tool_name == "fetch_drivers": | |
| return handle_fetch_drivers(tool_input) | |
| elif tool_name == "get_driver_details": | |
| return handle_get_driver_details(tool_input) | |
| elif tool_name == "search_drivers": | |
| return handle_search_drivers(tool_input) | |
| elif tool_name == "get_available_drivers": | |
| return handle_get_available_drivers(tool_input) | |
| elif tool_name == "update_order": | |
| return handle_update_order(tool_input) | |
| elif tool_name == "delete_order": | |
| return handle_delete_order(tool_input) | |
| elif tool_name == "update_driver": | |
| return handle_update_driver(tool_input) | |
| elif tool_name == "delete_driver": | |
| return handle_delete_driver(tool_input) | |
| else: | |
| return { | |
| "success": False, | |
| "error": f"Unknown tool: {tool_name}" | |
| } | |
| except Exception as e: | |
| logger.error(f"Tool execution error ({tool_name}): {e}") | |
| return { | |
| "success": False, | |
| "error": str(e) | |
| } | |
| def handle_geocode_address(tool_input: dict) -> dict: | |
| """ | |
| Execute geocoding tool | |
| Args: | |
| tool_input: Dict with 'address' key | |
| Returns: | |
| Geocoding result | |
| """ | |
| address = tool_input.get("address", "") | |
| if not address: | |
| return { | |
| "success": False, | |
| "error": "Address is required" | |
| } | |
| logger.info(f"Geocoding address: {address}") | |
| result = geocoding_service.geocode(address) | |
| return { | |
| "success": True, | |
| "latitude": result["lat"], | |
| "longitude": result["lng"], | |
| "formatted_address": result["formatted_address"], | |
| "confidence": result["confidence"], | |
| "message": f"Address geocoded successfully ({result['confidence']})" | |
| } | |
| def handle_create_order(tool_input: dict) -> dict: | |
| """ | |
| Execute order creation tool | |
| Args: | |
| tool_input: Dict with order fields | |
| Returns: | |
| Order creation result | |
| """ | |
| # Extract fields with defaults | |
| customer_name = tool_input.get("customer_name") | |
| customer_phone = tool_input.get("customer_phone") | |
| customer_email = tool_input.get("customer_email") | |
| delivery_address = tool_input.get("delivery_address") | |
| delivery_lat = tool_input.get("delivery_lat") | |
| delivery_lng = tool_input.get("delivery_lng") | |
| priority = tool_input.get("priority", "standard") | |
| special_instructions = tool_input.get("special_instructions") | |
| weight_kg = tool_input.get("weight_kg", 5.0) | |
| # Validate required fields | |
| if not all([customer_name, delivery_address, delivery_lat, delivery_lng]): | |
| return { | |
| "success": False, | |
| "error": "Missing required fields: customer_name, delivery_address, delivery_lat, delivery_lng" | |
| } | |
| # Generate order ID | |
| now = datetime.now() | |
| order_id = f"ORD-{now.strftime('%Y%m%d%H%M%S')}" | |
| # Handle time window | |
| time_window_end_str = tool_input.get("time_window_end") | |
| if time_window_end_str: | |
| try: | |
| time_window_end = datetime.fromisoformat(time_window_end_str.replace('Z', '+00:00')) | |
| except: | |
| time_window_end = now + timedelta(hours=6) | |
| else: | |
| time_window_end = now + timedelta(hours=6) | |
| time_window_start = now + timedelta(hours=2) | |
| # Insert into database | |
| query = """ | |
| INSERT INTO orders ( | |
| order_id, customer_name, customer_phone, customer_email, | |
| delivery_address, delivery_lat, delivery_lng, | |
| time_window_start, time_window_end, | |
| priority, weight_kg, status, special_instructions | |
| ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) | |
| """ | |
| params = ( | |
| order_id, | |
| customer_name, | |
| customer_phone, | |
| customer_email, | |
| delivery_address, | |
| delivery_lat, | |
| delivery_lng, | |
| time_window_start, | |
| time_window_end, | |
| priority, | |
| weight_kg, | |
| "pending", | |
| special_instructions | |
| ) | |
| try: | |
| execute_write(query, params) | |
| logger.info(f"Order created: {order_id}") | |
| return { | |
| "success": True, | |
| "order_id": order_id, | |
| "status": "pending", | |
| "customer": customer_name, | |
| "address": delivery_address, | |
| "deadline": time_window_end.strftime("%Y-%m-%d %H:%M"), | |
| "priority": priority, | |
| "message": f"Order {order_id} created successfully!" | |
| } | |
| except Exception as e: | |
| logger.error(f"Database error creating order: {e}") | |
| return { | |
| "success": False, | |
| "error": f"Failed to create order: {str(e)}" | |
| } | |
| def handle_create_driver(tool_input: dict) -> dict: | |
| """ | |
| Execute driver creation tool | |
| Args: | |
| tool_input: Dict with driver fields | |
| Returns: | |
| Driver creation result | |
| """ | |
| # Extract fields with defaults | |
| name = tool_input.get("name") | |
| phone = tool_input.get("phone") | |
| email = tool_input.get("email") | |
| vehicle_type = tool_input.get("vehicle_type", "van") | |
| vehicle_plate = tool_input.get("vehicle_plate") | |
| capacity_kg = tool_input.get("capacity_kg", 1000.0) | |
| capacity_m3 = tool_input.get("capacity_m3", 12.0) | |
| # Convert skills to regular list (handles protobuf RepeatedComposite) | |
| skills_raw = tool_input.get("skills", []) | |
| skills = list(skills_raw) if skills_raw else [] | |
| status = tool_input.get("status", "active") | |
| # Validate required fields | |
| if not name: | |
| return { | |
| "success": False, | |
| "error": "Missing required field: name" | |
| } | |
| # Generate driver ID | |
| now = datetime.now() | |
| driver_id = f"DRV-{now.strftime('%Y%m%d%H%M%S')}" | |
| # Default location (San Francisco) | |
| current_lat = tool_input.get("current_lat", 37.7749) | |
| current_lng = tool_input.get("current_lng", -122.4194) | |
| # Insert into database | |
| query = """ | |
| INSERT INTO drivers ( | |
| driver_id, name, phone, email, | |
| current_lat, current_lng, last_location_update, | |
| status, vehicle_type, vehicle_plate, | |
| capacity_kg, capacity_m3, skills | |
| ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) | |
| """ | |
| # Convert skills list to JSON | |
| import json | |
| skills_json = json.dumps(skills) if skills else json.dumps([]) | |
| params = ( | |
| driver_id, | |
| name, | |
| phone, | |
| email, | |
| current_lat, | |
| current_lng, | |
| now, | |
| status, | |
| vehicle_type, | |
| vehicle_plate, | |
| capacity_kg, | |
| capacity_m3, | |
| skills_json | |
| ) | |
| try: | |
| execute_write(query, params) | |
| logger.info(f"Driver created: {driver_id}") | |
| return { | |
| "success": True, | |
| "driver_id": driver_id, | |
| "name": name, | |
| "status": status, | |
| "vehicle_type": vehicle_type, | |
| "vehicle_plate": vehicle_plate, | |
| "capacity_kg": capacity_kg, | |
| "skills": skills, | |
| "message": f"Driver {driver_id} ({name}) created successfully!" | |
| } | |
| except Exception as e: | |
| logger.error(f"Database error creating driver: {e}") | |
| return { | |
| "success": False, | |
| "error": f"Failed to create driver: {str(e)}" | |
| } | |
| def handle_update_order(tool_input: dict) -> dict: | |
| """ | |
| Execute order update tool | |
| Args: | |
| tool_input: Dict with order_id and fields to update | |
| Returns: | |
| Update result | |
| """ | |
| import json | |
| order_id = tool_input.get("order_id") | |
| # Validate required field | |
| if not order_id: | |
| return { | |
| "success": False, | |
| "error": "Missing required field: order_id" | |
| } | |
| # Check if order exists | |
| check_query = "SELECT order_id FROM orders WHERE order_id = %s" | |
| existing = execute_query(check_query, (order_id,)) | |
| if not existing: | |
| return { | |
| "success": False, | |
| "error": f"Order {order_id} not found" | |
| } | |
| # Auto-geocode if delivery address is updated without coordinates | |
| if "delivery_address" in tool_input and ("delivery_lat" not in tool_input or "delivery_lng" not in tool_input): | |
| from chat.geocoding import GeocodingService | |
| geocoding_service = GeocodingService() | |
| try: | |
| geocode_result = geocoding_service.geocode(tool_input["delivery_address"]) | |
| tool_input["delivery_lat"] = geocode_result["lat"] | |
| tool_input["delivery_lng"] = geocode_result["lng"] | |
| logger.info(f"Auto-geocoded delivery address: {geocode_result['formatted_address']}") | |
| except Exception as e: | |
| logger.warning(f"Failed to geocode address, skipping coordinate update: {e}") | |
| # Build UPDATE query dynamically based on provided fields | |
| update_fields = [] | |
| params = [] | |
| # Map of field names to their database columns | |
| updateable_fields = { | |
| "customer_name": "customer_name", | |
| "customer_phone": "customer_phone", | |
| "customer_email": "customer_email", | |
| "delivery_address": "delivery_address", | |
| "delivery_lat": "delivery_lat", | |
| "delivery_lng": "delivery_lng", | |
| "status": "status", | |
| "priority": "priority", | |
| "special_instructions": "special_instructions", | |
| "time_window_end": "time_window_end", | |
| "payment_status": "payment_status", | |
| "weight_kg": "weight_kg", | |
| "order_value": "order_value" | |
| } | |
| for field, column in updateable_fields.items(): | |
| if field in tool_input: | |
| update_fields.append(f"{column} = %s") | |
| params.append(tool_input[field]) | |
| if not update_fields: | |
| return { | |
| "success": False, | |
| "error": "No fields provided to update" | |
| } | |
| # Always update the updated_at timestamp | |
| update_fields.append("updated_at = %s") | |
| params.append(datetime.now()) | |
| # Add order_id for WHERE clause | |
| params.append(order_id) | |
| # Execute update | |
| query = f""" | |
| UPDATE orders | |
| SET {', '.join(update_fields)} | |
| WHERE order_id = %s | |
| """ | |
| try: | |
| execute_write(query, tuple(params)) | |
| logger.info(f"Order updated: {order_id}") | |
| return { | |
| "success": True, | |
| "order_id": order_id, | |
| "updated_fields": list(updateable_fields.keys() & tool_input.keys()), | |
| "message": f"Order {order_id} updated successfully!" | |
| } | |
| except Exception as e: | |
| logger.error(f"Database error updating order: {e}") | |
| return { | |
| "success": False, | |
| "error": f"Failed to update order: {str(e)}" | |
| } | |
| def handle_delete_order(tool_input: dict) -> dict: | |
| """ | |
| Execute order deletion tool | |
| Args: | |
| tool_input: Dict with order_id and confirm flag | |
| Returns: | |
| Deletion result | |
| """ | |
| order_id = tool_input.get("order_id") | |
| confirm = tool_input.get("confirm", False) | |
| # Validate required fields | |
| if not order_id: | |
| return { | |
| "success": False, | |
| "error": "Missing required field: order_id" | |
| } | |
| if not confirm: | |
| return { | |
| "success": False, | |
| "error": "Deletion not confirmed. Set confirm=true to proceed." | |
| } | |
| # Check if order exists | |
| check_query = "SELECT order_id, status FROM orders WHERE order_id = %s" | |
| existing = execute_query(check_query, (order_id,)) | |
| if not existing: | |
| return { | |
| "success": False, | |
| "error": f"Order {order_id} not found" | |
| } | |
| # Delete the order | |
| query = "DELETE FROM orders WHERE order_id = %s" | |
| try: | |
| execute_write(query, (order_id,)) | |
| logger.info(f"Order deleted: {order_id}") | |
| return { | |
| "success": True, | |
| "order_id": order_id, | |
| "message": f"Order {order_id} has been permanently deleted." | |
| } | |
| except Exception as e: | |
| logger.error(f"Database error deleting order: {e}") | |
| return { | |
| "success": False, | |
| "error": f"Failed to delete order: {str(e)}" | |
| } | |
| def handle_update_driver(tool_input: dict) -> dict: | |
| """ | |
| Execute driver update tool | |
| Args: | |
| tool_input: Dict with driver_id and fields to update | |
| Returns: | |
| Update result | |
| """ | |
| import json | |
| driver_id = tool_input.get("driver_id") | |
| # Validate required field | |
| if not driver_id: | |
| return { | |
| "success": False, | |
| "error": "Missing required field: driver_id" | |
| } | |
| # Check if driver exists | |
| check_query = "SELECT driver_id FROM drivers WHERE driver_id = %s" | |
| existing = execute_query(check_query, (driver_id,)) | |
| if not existing: | |
| return { | |
| "success": False, | |
| "error": f"Driver {driver_id} not found" | |
| } | |
| # Build UPDATE query dynamically based on provided fields | |
| update_fields = [] | |
| params = [] | |
| # Map of field names to their database columns | |
| updateable_fields = { | |
| "name": "name", | |
| "phone": "phone", | |
| "email": "email", | |
| "status": "status", | |
| "vehicle_type": "vehicle_type", | |
| "vehicle_plate": "vehicle_plate", | |
| "capacity_kg": "capacity_kg", | |
| "capacity_m3": "capacity_m3", | |
| "current_lat": "current_lat", | |
| "current_lng": "current_lng" | |
| } | |
| for field, column in updateable_fields.items(): | |
| if field in tool_input: | |
| update_fields.append(f"{column} = %s") | |
| params.append(tool_input[field]) | |
| # Handle skills array specially (convert to JSON) | |
| if "skills" in tool_input: | |
| skills = list(tool_input.get("skills", [])) | |
| update_fields.append("skills = %s") | |
| params.append(json.dumps(skills)) | |
| if not update_fields: | |
| return { | |
| "success": False, | |
| "error": "No fields provided to update" | |
| } | |
| # Always update the updated_at timestamp | |
| update_fields.append("updated_at = %s") | |
| params.append(datetime.now()) | |
| # Update location timestamp if lat/lng changed | |
| if "current_lat" in tool_input or "current_lng" in tool_input: | |
| update_fields.append("last_location_update = %s") | |
| params.append(datetime.now()) | |
| # Add driver_id for WHERE clause | |
| params.append(driver_id) | |
| # Execute update | |
| query = f""" | |
| UPDATE drivers | |
| SET {', '.join(update_fields)} | |
| WHERE driver_id = %s | |
| """ | |
| try: | |
| execute_write(query, tuple(params)) | |
| logger.info(f"Driver updated: {driver_id}") | |
| updated_list = list(updateable_fields.keys() & tool_input.keys()) | |
| if "skills" in tool_input: | |
| updated_list.append("skills") | |
| return { | |
| "success": True, | |
| "driver_id": driver_id, | |
| "updated_fields": updated_list, | |
| "message": f"Driver {driver_id} updated successfully!" | |
| } | |
| except Exception as e: | |
| logger.error(f"Database error updating driver: {e}") | |
| return { | |
| "success": False, | |
| "error": f"Failed to update driver: {str(e)}" | |
| } | |
| def handle_delete_driver(tool_input: dict) -> dict: | |
| """ | |
| Execute driver deletion tool | |
| Args: | |
| tool_input: Dict with driver_id and confirm flag | |
| Returns: | |
| Deletion result | |
| """ | |
| driver_id = tool_input.get("driver_id") | |
| confirm = tool_input.get("confirm", False) | |
| # Validate required fields | |
| if not driver_id: | |
| return { | |
| "success": False, | |
| "error": "Missing required field: driver_id" | |
| } | |
| if not confirm: | |
| return { | |
| "success": False, | |
| "error": "Deletion not confirmed. Set confirm=true to proceed." | |
| } | |
| # Check if driver exists | |
| check_query = "SELECT driver_id, name FROM drivers WHERE driver_id = %s" | |
| existing = execute_query(check_query, (driver_id,)) | |
| if not existing: | |
| return { | |
| "success": False, | |
| "error": f"Driver {driver_id} not found" | |
| } | |
| driver_name = existing[0]["name"] | |
| # Delete the driver | |
| query = "DELETE FROM drivers WHERE driver_id = %s" | |
| try: | |
| execute_write(query, (driver_id,)) | |
| logger.info(f"Driver deleted: {driver_id}") | |
| return { | |
| "success": True, | |
| "driver_id": driver_id, | |
| "message": f"Driver {driver_id} ({driver_name}) has been permanently deleted." | |
| } | |
| except Exception as e: | |
| logger.error(f"Database error deleting driver: {e}") | |
| return { | |
| "success": False, | |
| "error": f"Failed to delete driver: {str(e)}" | |
| } | |
| def handle_count_orders(tool_input: dict) -> dict: | |
| """ | |
| Execute count orders tool | |
| Args: | |
| tool_input: Dict with optional filter fields | |
| Returns: | |
| Order count result with breakdown | |
| """ | |
| # Build WHERE clause based on filters | |
| where_clauses = [] | |
| params = [] | |
| if "status" in tool_input: | |
| where_clauses.append("status = %s") | |
| params.append(tool_input["status"]) | |
| if "priority" in tool_input: | |
| where_clauses.append("priority = %s") | |
| params.append(tool_input["priority"]) | |
| if "payment_status" in tool_input: | |
| where_clauses.append("payment_status = %s") | |
| params.append(tool_input["payment_status"]) | |
| if "assigned_driver_id" in tool_input: | |
| where_clauses.append("assigned_driver_id = %s") | |
| params.append(tool_input["assigned_driver_id"]) | |
| if "is_fragile" in tool_input: | |
| where_clauses.append("is_fragile = %s") | |
| params.append(tool_input["is_fragile"]) | |
| if "requires_signature" in tool_input: | |
| where_clauses.append("requires_signature = %s") | |
| params.append(tool_input["requires_signature"]) | |
| if "requires_cold_storage" in tool_input: | |
| where_clauses.append("requires_cold_storage = %s") | |
| params.append(tool_input["requires_cold_storage"]) | |
| where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else "" | |
| # Total count query | |
| count_query = f"SELECT COUNT(*) as total FROM orders{where_sql}" | |
| # Breakdown by status query | |
| breakdown_query = f""" | |
| SELECT status, COUNT(*) as count | |
| FROM orders{where_sql} | |
| GROUP BY status | |
| ORDER BY count DESC | |
| """ | |
| # Breakdown by priority query | |
| priority_query = f""" | |
| SELECT priority, COUNT(*) as count | |
| FROM orders{where_sql} | |
| GROUP BY priority | |
| ORDER BY CASE priority | |
| WHEN 'urgent' THEN 1 | |
| WHEN 'express' THEN 2 | |
| WHEN 'standard' THEN 3 | |
| END | |
| """ | |
| try: | |
| # Execute queries | |
| total_result = execute_query(count_query, tuple(params) if params else None) | |
| total = total_result[0]['total'] if total_result else 0 | |
| status_result = execute_query(breakdown_query, tuple(params) if params else None) | |
| priority_result = execute_query(priority_query, tuple(params) if params else None) | |
| # Format breakdown | |
| status_breakdown = {row['status']: row['count'] for row in status_result} if status_result else {} | |
| priority_breakdown = {row['priority']: row['count'] for row in priority_result} if priority_result else {} | |
| logger.info(f"Counted orders: {total} total") | |
| return { | |
| "success": True, | |
| "total": total, | |
| "status_breakdown": status_breakdown, | |
| "priority_breakdown": priority_breakdown, | |
| "message": f"Found {total} order(s)" | |
| } | |
| except Exception as e: | |
| logger.error(f"Database error counting orders: {e}") | |
| return { | |
| "success": False, | |
| "error": f"Failed to count orders: {str(e)}" | |
| } | |
| def handle_fetch_orders(tool_input: dict) -> dict: | |
| """ | |
| Execute fetch orders tool | |
| Args: | |
| tool_input: Dict with filter, pagination, and sorting options | |
| Returns: | |
| List of orders matching criteria | |
| """ | |
| # Extract pagination and sorting | |
| limit = min(tool_input.get("limit", 10), 100) # Cap at 100 | |
| offset = tool_input.get("offset", 0) | |
| sort_by = tool_input.get("sort_by", "created_at") | |
| sort_order = tool_input.get("sort_order", "DESC") | |
| # Build WHERE clause based on filters | |
| where_clauses = [] | |
| params = [] | |
| if "status" in tool_input: | |
| where_clauses.append("status = %s") | |
| params.append(tool_input["status"]) | |
| if "priority" in tool_input: | |
| where_clauses.append("priority = %s") | |
| params.append(tool_input["priority"]) | |
| if "payment_status" in tool_input: | |
| where_clauses.append("payment_status = %s") | |
| params.append(tool_input["payment_status"]) | |
| if "assigned_driver_id" in tool_input: | |
| where_clauses.append("assigned_driver_id = %s") | |
| params.append(tool_input["assigned_driver_id"]) | |
| if "is_fragile" in tool_input: | |
| where_clauses.append("is_fragile = %s") | |
| params.append(tool_input["is_fragile"]) | |
| if "requires_signature" in tool_input: | |
| where_clauses.append("requires_signature = %s") | |
| params.append(tool_input["requires_signature"]) | |
| if "requires_cold_storage" in tool_input: | |
| where_clauses.append("requires_cold_storage = %s") | |
| params.append(tool_input["requires_cold_storage"]) | |
| where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else "" | |
| # Build query | |
| query = f""" | |
| SELECT | |
| order_id, customer_name, customer_phone, customer_email, | |
| delivery_address, delivery_lat, delivery_lng, | |
| time_window_start, time_window_end, | |
| priority, weight_kg, volume_m3, special_instructions, | |
| status, assigned_driver_id, | |
| created_at, updated_at, delivered_at, | |
| order_value, payment_status, | |
| requires_signature, is_fragile, requires_cold_storage | |
| FROM orders | |
| {where_sql} | |
| ORDER BY {sort_by} {sort_order} | |
| LIMIT %s OFFSET %s | |
| """ | |
| params.extend([limit, offset]) | |
| try: | |
| results = execute_query(query, tuple(params)) | |
| if not results: | |
| return { | |
| "success": True, | |
| "orders": [], | |
| "count": 0, | |
| "message": "No orders found matching criteria" | |
| } | |
| # Format orders for readability | |
| orders = [] | |
| for row in results: | |
| order = { | |
| "order_id": row['order_id'], | |
| "customer": { | |
| "name": row['customer_name'], | |
| "phone": row['customer_phone'], | |
| "email": row['customer_email'] | |
| }, | |
| "delivery": { | |
| "address": row['delivery_address'], | |
| "latitude": float(row['delivery_lat']) if row['delivery_lat'] else None, | |
| "longitude": float(row['delivery_lng']) if row['delivery_lng'] else None | |
| }, | |
| "time_window": { | |
| "start": str(row['time_window_start']) if row['time_window_start'] else None, | |
| "end": str(row['time_window_end']) if row['time_window_end'] else None | |
| }, | |
| "details": { | |
| "priority": row['priority'], | |
| "status": row['status'], | |
| "weight_kg": float(row['weight_kg']) if row['weight_kg'] else None, | |
| "volume_m3": float(row['volume_m3']) if row['volume_m3'] else None, | |
| "special_instructions": row['special_instructions'] | |
| }, | |
| "flags": { | |
| "requires_signature": row['requires_signature'], | |
| "is_fragile": row['is_fragile'], | |
| "requires_cold_storage": row['requires_cold_storage'] | |
| }, | |
| "payment": { | |
| "order_value": float(row['order_value']) if row['order_value'] else None, | |
| "payment_status": row['payment_status'] | |
| }, | |
| "assigned_driver_id": row['assigned_driver_id'], | |
| "timestamps": { | |
| "created_at": str(row['created_at']), | |
| "updated_at": str(row['updated_at']) if row['updated_at'] else None, | |
| "delivered_at": str(row['delivered_at']) if row['delivered_at'] else None | |
| } | |
| } | |
| orders.append(order) | |
| logger.info(f"Fetched {len(orders)} orders") | |
| return { | |
| "success": True, | |
| "orders": orders, | |
| "count": len(orders), | |
| "message": f"Retrieved {len(orders)} order(s)" | |
| } | |
| except Exception as e: | |
| logger.error(f"Database error fetching orders: {e}") | |
| return { | |
| "success": False, | |
| "error": f"Failed to fetch orders: {str(e)}" | |
| } | |
| def handle_get_order_details(tool_input: dict) -> dict: | |
| """ | |
| Execute get order details tool | |
| Args: | |
| tool_input: Dict with order_id | |
| Returns: | |
| Complete order details | |
| """ | |
| order_id = tool_input.get("order_id") | |
| if not order_id: | |
| return { | |
| "success": False, | |
| "error": "order_id is required" | |
| } | |
| query = """ | |
| SELECT | |
| order_id, customer_name, customer_phone, customer_email, | |
| pickup_address, pickup_lat, pickup_lng, | |
| delivery_address, delivery_lat, delivery_lng, | |
| time_window_start, time_window_end, | |
| priority, weight_kg, volume_m3, special_instructions, | |
| status, assigned_driver_id, | |
| created_at, updated_at, delivered_at, | |
| order_value, payment_status, | |
| requires_signature, is_fragile, requires_cold_storage | |
| FROM orders | |
| WHERE order_id = %s | |
| """ | |
| try: | |
| results = execute_query(query, (order_id,)) | |
| if not results: | |
| return { | |
| "success": False, | |
| "error": f"Order {order_id} not found" | |
| } | |
| row = results[0] | |
| order = { | |
| "order_id": row['order_id'], | |
| "customer": { | |
| "name": row['customer_name'], | |
| "phone": row['customer_phone'], | |
| "email": row['customer_email'] | |
| }, | |
| "pickup": { | |
| "address": row['pickup_address'], | |
| "latitude": float(row['pickup_lat']) if row['pickup_lat'] else None, | |
| "longitude": float(row['pickup_lng']) if row['pickup_lng'] else None | |
| } if row['pickup_address'] else None, | |
| "delivery": { | |
| "address": row['delivery_address'], | |
| "latitude": float(row['delivery_lat']) if row['delivery_lat'] else None, | |
| "longitude": float(row['delivery_lng']) if row['delivery_lng'] else None | |
| }, | |
| "time_window": { | |
| "start": str(row['time_window_start']) if row['time_window_start'] else None, | |
| "end": str(row['time_window_end']) if row['time_window_end'] else None | |
| }, | |
| "details": { | |
| "priority": row['priority'], | |
| "status": row['status'], | |
| "weight_kg": float(row['weight_kg']) if row['weight_kg'] else None, | |
| "volume_m3": float(row['volume_m3']) if row['volume_m3'] else None, | |
| "special_instructions": row['special_instructions'] | |
| }, | |
| "flags": { | |
| "requires_signature": row['requires_signature'], | |
| "is_fragile": row['is_fragile'], | |
| "requires_cold_storage": row['requires_cold_storage'] | |
| }, | |
| "payment": { | |
| "order_value": float(row['order_value']) if row['order_value'] else None, | |
| "payment_status": row['payment_status'] | |
| }, | |
| "assigned_driver_id": row['assigned_driver_id'], | |
| "timestamps": { | |
| "created_at": str(row['created_at']), | |
| "updated_at": str(row['updated_at']) if row['updated_at'] else None, | |
| "delivered_at": str(row['delivered_at']) if row['delivered_at'] else None | |
| } | |
| } | |
| logger.info(f"Retrieved details for order: {order_id}") | |
| return { | |
| "success": True, | |
| "order": order, | |
| "message": f"Order {order_id} details retrieved" | |
| } | |
| except Exception as e: | |
| logger.error(f"Database error getting order details: {e}") | |
| return { | |
| "success": False, | |
| "error": f"Failed to get order details: {str(e)}" | |
| } | |
| def handle_search_orders(tool_input: dict) -> dict: | |
| """ | |
| Execute search orders tool | |
| Args: | |
| tool_input: Dict with search_term | |
| Returns: | |
| List of matching orders | |
| """ | |
| search_term = tool_input.get("search_term", "").strip() | |
| if not search_term: | |
| return { | |
| "success": False, | |
| "error": "search_term is required" | |
| } | |
| query = """ | |
| SELECT | |
| order_id, customer_name, customer_phone, customer_email, | |
| delivery_address, priority, status, created_at | |
| FROM orders | |
| WHERE | |
| order_id ILIKE %s OR | |
| customer_name ILIKE %s OR | |
| customer_email ILIKE %s OR | |
| customer_phone ILIKE %s | |
| ORDER BY created_at DESC | |
| LIMIT 50 | |
| """ | |
| search_pattern = f"%{search_term}%" | |
| params = (search_pattern, search_pattern, search_pattern, search_pattern) | |
| try: | |
| results = execute_query(query, params) | |
| if not results: | |
| return { | |
| "success": True, | |
| "orders": [], | |
| "count": 0, | |
| "message": f"No orders found matching '{search_term}'" | |
| } | |
| orders = [] | |
| for row in results: | |
| orders.append({ | |
| "order_id": row['order_id'], | |
| "customer_name": row['customer_name'], | |
| "customer_phone": row['customer_phone'], | |
| "customer_email": row['customer_email'], | |
| "delivery_address": row['delivery_address'], | |
| "priority": row['priority'], | |
| "status": row['status'], | |
| "created_at": str(row['created_at']) | |
| }) | |
| logger.info(f"Search '{search_term}' found {len(orders)} orders") | |
| return { | |
| "success": True, | |
| "orders": orders, | |
| "count": len(orders), | |
| "message": f"Found {len(orders)} order(s) matching '{search_term}'" | |
| } | |
| except Exception as e: | |
| logger.error(f"Database error searching orders: {e}") | |
| return { | |
| "success": False, | |
| "error": f"Failed to search orders: {str(e)}" | |
| } | |
| def handle_get_incomplete_orders(tool_input: dict) -> dict: | |
| """ | |
| Execute get incomplete orders tool | |
| Args: | |
| tool_input: Dict with optional limit | |
| Returns: | |
| List of incomplete orders (pending, assigned, in_transit) | |
| """ | |
| limit = min(tool_input.get("limit", 20), 100) | |
| query = """ | |
| SELECT | |
| order_id, customer_name, delivery_address, | |
| priority, status, time_window_end, created_at, | |
| assigned_driver_id | |
| FROM orders | |
| WHERE status IN ('pending', 'assigned', 'in_transit') | |
| ORDER BY | |
| CASE priority | |
| WHEN 'urgent' THEN 1 | |
| WHEN 'express' THEN 2 | |
| WHEN 'standard' THEN 3 | |
| END, | |
| time_window_end ASC | |
| LIMIT %s | |
| """ | |
| try: | |
| results = execute_query(query, (limit,)) | |
| if not results: | |
| return { | |
| "success": True, | |
| "orders": [], | |
| "count": 0, | |
| "message": "No incomplete orders found" | |
| } | |
| orders = [] | |
| for row in results: | |
| orders.append({ | |
| "order_id": row['order_id'], | |
| "customer_name": row['customer_name'], | |
| "delivery_address": row['delivery_address'], | |
| "priority": row['priority'], | |
| "status": row['status'], | |
| "time_window_end": str(row['time_window_end']) if row['time_window_end'] else None, | |
| "created_at": str(row['created_at']), | |
| "assigned_driver_id": row['assigned_driver_id'] | |
| }) | |
| logger.info(f"Retrieved {len(orders)} incomplete orders") | |
| return { | |
| "success": True, | |
| "orders": orders, | |
| "count": len(orders), | |
| "message": f"Found {len(orders)} incomplete order(s)" | |
| } | |
| except Exception as e: | |
| logger.error(f"Database error getting incomplete orders: {e}") | |
| return { | |
| "success": False, | |
| "error": f"Failed to get incomplete orders: {str(e)}" | |
| } | |
| def handle_count_drivers(tool_input: dict) -> dict: | |
| """ | |
| Execute count drivers tool | |
| Args: | |
| tool_input: Dict with optional filter fields | |
| Returns: | |
| Driver count result with breakdown | |
| """ | |
| # Build WHERE clause based on filters | |
| where_clauses = [] | |
| params = [] | |
| if "status" in tool_input: | |
| where_clauses.append("status = %s") | |
| params.append(tool_input["status"]) | |
| if "vehicle_type" in tool_input: | |
| where_clauses.append("vehicle_type = %s") | |
| params.append(tool_input["vehicle_type"]) | |
| where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else "" | |
| # Total count query | |
| count_query = f"SELECT COUNT(*) as total FROM drivers{where_sql}" | |
| # Breakdown by status query | |
| status_query = f""" | |
| SELECT status, COUNT(*) as count | |
| FROM drivers{where_sql} | |
| GROUP BY status | |
| ORDER BY count DESC | |
| """ | |
| # Breakdown by vehicle type query | |
| vehicle_query = f""" | |
| SELECT vehicle_type, COUNT(*) as count | |
| FROM drivers{where_sql} | |
| GROUP BY vehicle_type | |
| ORDER BY count DESC | |
| """ | |
| try: | |
| # Execute queries | |
| total_result = execute_query(count_query, tuple(params) if params else None) | |
| total = total_result[0]['total'] if total_result else 0 | |
| status_result = execute_query(status_query, tuple(params) if params else None) | |
| vehicle_result = execute_query(vehicle_query, tuple(params) if params else None) | |
| # Format breakdown | |
| status_breakdown = {row['status']: row['count'] for row in status_result} if status_result else {} | |
| vehicle_breakdown = {row['vehicle_type']: row['count'] for row in vehicle_result if row['vehicle_type']} if vehicle_result else {} | |
| logger.info(f"Counted drivers: {total} total") | |
| return { | |
| "success": True, | |
| "total": total, | |
| "status_breakdown": status_breakdown, | |
| "vehicle_breakdown": vehicle_breakdown, | |
| "message": f"Found {total} driver(s)" | |
| } | |
| except Exception as e: | |
| logger.error(f"Database error counting drivers: {e}") | |
| return { | |
| "success": False, | |
| "error": f"Failed to count drivers: {str(e)}" | |
| } | |
| def handle_fetch_drivers(tool_input: dict) -> dict: | |
| """ | |
| Execute fetch drivers tool | |
| Args: | |
| tool_input: Dict with filter, pagination, and sorting options | |
| Returns: | |
| List of drivers matching criteria | |
| """ | |
| # Extract pagination and sorting | |
| limit = min(tool_input.get("limit", 10), 100) # Cap at 100 | |
| offset = tool_input.get("offset", 0) | |
| sort_by = tool_input.get("sort_by", "name") | |
| sort_order = tool_input.get("sort_order", "ASC") | |
| # Build WHERE clause based on filters | |
| where_clauses = [] | |
| params = [] | |
| if "status" in tool_input: | |
| where_clauses.append("status = %s") | |
| params.append(tool_input["status"]) | |
| if "vehicle_type" in tool_input: | |
| where_clauses.append("vehicle_type = %s") | |
| params.append(tool_input["vehicle_type"]) | |
| where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else "" | |
| # Build query | |
| query = f""" | |
| SELECT | |
| driver_id, name, phone, email, | |
| current_lat, current_lng, last_location_update, | |
| status, vehicle_type, vehicle_plate, | |
| capacity_kg, capacity_m3, skills, | |
| created_at, updated_at | |
| FROM drivers | |
| {where_sql} | |
| ORDER BY {sort_by} {sort_order} | |
| LIMIT %s OFFSET %s | |
| """ | |
| params.extend([limit, offset]) | |
| try: | |
| results = execute_query(query, tuple(params)) | |
| if not results: | |
| return { | |
| "success": True, | |
| "drivers": [], | |
| "count": 0, | |
| "message": "No drivers found matching criteria" | |
| } | |
| # Format drivers for readability | |
| drivers = [] | |
| for row in results: | |
| # Parse skills JSON if present | |
| skills = [] | |
| if row['skills']: | |
| try: | |
| import json | |
| skills = json.loads(row['skills']) if isinstance(row['skills'], str) else row['skills'] | |
| except: | |
| skills = [] | |
| driver = { | |
| "driver_id": row['driver_id'], | |
| "name": row['name'], | |
| "contact": { | |
| "phone": row['phone'], | |
| "email": row['email'] | |
| }, | |
| "location": { | |
| "latitude": float(row['current_lat']) if row['current_lat'] else None, | |
| "longitude": float(row['current_lng']) if row['current_lng'] else None, | |
| "last_update": str(row['last_location_update']) if row['last_location_update'] else None | |
| }, | |
| "status": row['status'], | |
| "vehicle": { | |
| "type": row['vehicle_type'], | |
| "plate": row['vehicle_plate'], | |
| "capacity_kg": float(row['capacity_kg']) if row['capacity_kg'] else None, | |
| "capacity_m3": float(row['capacity_m3']) if row['capacity_m3'] else None | |
| }, | |
| "skills": skills, | |
| "timestamps": { | |
| "created_at": str(row['created_at']), | |
| "updated_at": str(row['updated_at']) if row['updated_at'] else None | |
| } | |
| } | |
| drivers.append(driver) | |
| logger.info(f"Fetched {len(drivers)} drivers") | |
| return { | |
| "success": True, | |
| "drivers": drivers, | |
| "count": len(drivers), | |
| "message": f"Retrieved {len(drivers)} driver(s)" | |
| } | |
| except Exception as e: | |
| logger.error(f"Database error fetching drivers: {e}") | |
| return { | |
| "success": False, | |
| "error": f"Failed to fetch drivers: {str(e)}" | |
| } | |
| def handle_get_driver_details(tool_input: dict) -> dict: | |
| """ | |
| Execute get driver details tool | |
| Args: | |
| tool_input: Dict with driver_id | |
| Returns: | |
| Complete driver details | |
| """ | |
| driver_id = tool_input.get("driver_id") | |
| if not driver_id: | |
| return { | |
| "success": False, | |
| "error": "driver_id is required" | |
| } | |
| query = """ | |
| SELECT | |
| driver_id, name, phone, email, | |
| current_lat, current_lng, last_location_update, | |
| status, vehicle_type, vehicle_plate, | |
| capacity_kg, capacity_m3, skills, | |
| created_at, updated_at | |
| FROM drivers | |
| WHERE driver_id = %s | |
| """ | |
| try: | |
| results = execute_query(query, (driver_id,)) | |
| if not results: | |
| return { | |
| "success": False, | |
| "error": f"Driver {driver_id} not found" | |
| } | |
| row = results[0] | |
| # Parse skills JSON if present | |
| skills = [] | |
| if row['skills']: | |
| try: | |
| import json | |
| skills = json.loads(row['skills']) if isinstance(row['skills'], str) else row['skills'] | |
| except: | |
| skills = [] | |
| # Reverse geocode location to get address | |
| location_address = None | |
| if row['current_lat'] and row['current_lng']: | |
| try: | |
| from chat.geocoding import GeocodingService | |
| geocoding_service = GeocodingService() | |
| reverse_result = geocoding_service.reverse_geocode( | |
| float(row['current_lat']), | |
| float(row['current_lng']) | |
| ) | |
| location_address = reverse_result.get('formatted_address', None) | |
| logger.info(f"Reverse geocoded driver location: {location_address}") | |
| except Exception as e: | |
| logger.warning(f"Failed to reverse geocode driver location: {e}") | |
| location_address = None | |
| driver = { | |
| "driver_id": row['driver_id'], | |
| "name": row['name'], | |
| "contact": { | |
| "phone": row['phone'], | |
| "email": row['email'] | |
| }, | |
| "location": { | |
| "latitude": float(row['current_lat']) if row['current_lat'] else None, | |
| "longitude": float(row['current_lng']) if row['current_lng'] else None, | |
| "address": location_address, | |
| "last_update": str(row['last_location_update']) if row['last_location_update'] else None | |
| }, | |
| "status": row['status'], | |
| "vehicle": { | |
| "type": row['vehicle_type'], | |
| "plate": row['vehicle_plate'], | |
| "capacity_kg": float(row['capacity_kg']) if row['capacity_kg'] else None, | |
| "capacity_m3": float(row['capacity_m3']) if row['capacity_m3'] else None | |
| }, | |
| "skills": skills, | |
| "timestamps": { | |
| "created_at": str(row['created_at']), | |
| "updated_at": str(row['updated_at']) if row['updated_at'] else None | |
| } | |
| } | |
| logger.info(f"Retrieved details for driver: {driver_id}") | |
| return { | |
| "success": True, | |
| "driver": driver, | |
| "message": f"Driver {driver_id} details retrieved" | |
| } | |
| except Exception as e: | |
| logger.error(f"Database error getting driver details: {e}") | |
| return { | |
| "success": False, | |
| "error": f"Failed to get driver details: {str(e)}" | |
| } | |
| def handle_search_drivers(tool_input: dict) -> dict: | |
| """ | |
| Execute search drivers tool | |
| Args: | |
| tool_input: Dict with search_term | |
| Returns: | |
| List of matching drivers | |
| """ | |
| search_term = tool_input.get("search_term", "").strip() | |
| if not search_term: | |
| return { | |
| "success": False, | |
| "error": "search_term is required" | |
| } | |
| query = """ | |
| SELECT | |
| driver_id, name, phone, email, | |
| vehicle_type, vehicle_plate, status, created_at | |
| FROM drivers | |
| WHERE | |
| driver_id ILIKE %s OR | |
| name ILIKE %s OR | |
| email ILIKE %s OR | |
| phone ILIKE %s OR | |
| vehicle_plate ILIKE %s | |
| ORDER BY name ASC | |
| LIMIT 50 | |
| """ | |
| search_pattern = f"%{search_term}%" | |
| params = (search_pattern, search_pattern, search_pattern, search_pattern, search_pattern) | |
| try: | |
| results = execute_query(query, params) | |
| if not results: | |
| return { | |
| "success": True, | |
| "drivers": [], | |
| "count": 0, | |
| "message": f"No drivers found matching '{search_term}'" | |
| } | |
| drivers = [] | |
| for row in results: | |
| drivers.append({ | |
| "driver_id": row['driver_id'], | |
| "name": row['name'], | |
| "phone": row['phone'], | |
| "email": row['email'], | |
| "vehicle_type": row['vehicle_type'], | |
| "vehicle_plate": row['vehicle_plate'], | |
| "status": row['status'], | |
| "created_at": str(row['created_at']) | |
| }) | |
| logger.info(f"Search '{search_term}' found {len(drivers)} drivers") | |
| return { | |
| "success": True, | |
| "drivers": drivers, | |
| "count": len(drivers), | |
| "message": f"Found {len(drivers)} driver(s) matching '{search_term}'" | |
| } | |
| except Exception as e: | |
| logger.error(f"Database error searching drivers: {e}") | |
| return { | |
| "success": False, | |
| "error": f"Failed to search drivers: {str(e)}" | |
| } | |
| def handle_get_available_drivers(tool_input: dict) -> dict: | |
| """ | |
| Execute get available drivers tool | |
| Args: | |
| tool_input: Dict with optional limit | |
| Returns: | |
| List of available drivers (active or offline) | |
| """ | |
| limit = min(tool_input.get("limit", 20), 100) | |
| query = """ | |
| SELECT | |
| driver_id, name, phone, vehicle_type, vehicle_plate, | |
| current_lat, current_lng, last_location_update, | |
| status, capacity_kg, capacity_m3, skills | |
| FROM drivers | |
| WHERE status IN ('active', 'offline') | |
| ORDER BY | |
| CASE status | |
| WHEN 'active' THEN 1 | |
| WHEN 'offline' THEN 2 | |
| END, | |
| name ASC | |
| LIMIT %s | |
| """ | |
| try: | |
| results = execute_query(query, (limit,)) | |
| if not results: | |
| return { | |
| "success": True, | |
| "drivers": [], | |
| "count": 0, | |
| "message": "No available drivers found" | |
| } | |
| drivers = [] | |
| for row in results: | |
| # Parse skills JSON if present | |
| skills = [] | |
| if row['skills']: | |
| try: | |
| import json | |
| skills = json.loads(row['skills']) if isinstance(row['skills'], str) else row['skills'] | |
| except: | |
| skills = [] | |
| drivers.append({ | |
| "driver_id": row['driver_id'], | |
| "name": row['name'], | |
| "phone": row['phone'], | |
| "location": { | |
| "latitude": float(row['current_lat']) if row['current_lat'] else None, | |
| "longitude": float(row['current_lng']) if row['current_lng'] else None, | |
| "last_update": str(row['last_location_update']) if row['last_location_update'] else None | |
| }, | |
| "status": row['status'], | |
| "vehicle": { | |
| "type": row['vehicle_type'], | |
| "plate": row['vehicle_plate'], | |
| "capacity_kg": float(row['capacity_kg']) if row['capacity_kg'] else None, | |
| "capacity_m3": float(row['capacity_m3']) if row['capacity_m3'] else None | |
| }, | |
| "skills": skills | |
| }) | |
| logger.info(f"Retrieved {len(drivers)} available drivers") | |
| return { | |
| "success": True, | |
| "drivers": drivers, | |
| "count": len(drivers), | |
| "message": f"Found {len(drivers)} available driver(s)" | |
| } | |
| except Exception as e: | |
| logger.error(f"Database error getting available drivers: {e}") | |
| return { | |
| "success": False, | |
| "error": f"Failed to get available drivers: {str(e)}" | |
| } | |
| def get_tools_list() -> list: | |
| """Get list of available tools""" | |
| return [tool["name"] for tool in TOOLS_SCHEMA] | |
| def get_tool_description(tool_name: str) -> str: | |
| """Get description for a specific tool""" | |
| for tool in TOOLS_SCHEMA: | |
| if tool["name"] == tool_name: | |
| return tool["description"] | |
| return "" | |