""" Tool definitions and execution handlers for FleetMind chat Simulates MCP tools using Claude's tool calling feature """ import sys from pathlib import Path from datetime import datetime, timedelta import logging # Add parent directory to path sys.path.insert(0, str(Path(__file__).parent.parent)) from database.connection import execute_write, execute_query from chat.geocoding import GeocodingService logger = logging.getLogger(__name__) # Initialize geocoding service geocoding_service = GeocodingService() # Tool schemas for Claude TOOLS_SCHEMA = [ { "name": "geocode_address", "description": "Convert a delivery address to GPS coordinates and validate the address format. Use this before creating an order to ensure the address is valid.", "input_schema": { "type": "object", "properties": { "address": { "type": "string", "description": "The full delivery address to geocode (e.g., '123 Main St, San Francisco, CA')" } }, "required": ["address"] } }, { "name": "create_order", "description": "Create a new delivery order in the database. Only call this after geocoding the address successfully.", "input_schema": { "type": "object", "properties": { "customer_name": { "type": "string", "description": "Full name of the customer" }, "customer_phone": { "type": "string", "description": "Customer phone number (optional)" }, "customer_email": { "type": "string", "description": "Customer email address (optional)" }, "delivery_address": { "type": "string", "description": "Full delivery address" }, "delivery_lat": { "type": "number", "description": "Latitude from geocoding" }, "delivery_lng": { "type": "number", "description": "Longitude from geocoding" }, "time_window_end": { "type": "string", "description": "Delivery deadline in ISO format (e.g., '2025-11-13T17:00:00'). If not specified by user, default to 6 hours from now." }, "priority": { "type": "string", "enum": ["standard", "express", "urgent"], "description": "Delivery priority. Default to 'standard' unless user specifies urgent/express." }, "special_instructions": { "type": "string", "description": "Any special delivery instructions (optional)" }, "weight_kg": { "type": "number", "description": "Package weight in kilograms (optional, default to 5.0)" } }, "required": ["customer_name", "delivery_address", "delivery_lat", "delivery_lng"] } }, { "name": "create_driver", "description": "Create a new delivery driver in the database. Use this to onboard new drivers to the fleet.", "input_schema": { "type": "object", "properties": { "name": { "type": "string", "description": "Full name of the driver" }, "phone": { "type": "string", "description": "Driver phone number (optional)" }, "email": { "type": "string", "description": "Driver email address (optional)" }, "vehicle_type": { "type": "string", "description": "Type of vehicle: van, truck, car, motorcycle (default: van)" }, "vehicle_plate": { "type": "string", "description": "Vehicle license plate number (optional)" }, "capacity_kg": { "type": "number", "description": "Vehicle cargo capacity in kilograms (default: 1000.0)" }, "capacity_m3": { "type": "number", "description": "Vehicle cargo volume in cubic meters (default: 12.0)" }, "skills": { "type": "array", "description": "List of driver skills/certifications: refrigerated, medical_certified, fragile_handler, overnight, express_delivery", "items": { "type": "string" } }, "status": { "type": "string", "enum": ["active", "busy", "offline", "unavailable"], "description": "Driver status (default: active)" } }, "required": ["name"] } }, { "name": "count_orders", "description": "Count total orders in the database with optional filters. Use this when user asks 'how many orders', 'fetch orders', or wants to know order statistics.", "input_schema": { "type": "object", "properties": { "status": { "type": "string", "enum": ["pending", "assigned", "in_transit", "delivered", "failed", "cancelled"], "description": "Filter by order status (optional)" }, "priority": { "type": "string", "enum": ["standard", "express", "urgent"], "description": "Filter by priority level (optional)" }, "payment_status": { "type": "string", "enum": ["pending", "paid", "cod"], "description": "Filter by payment status (optional)" }, "assigned_driver_id": { "type": "string", "description": "Filter by assigned driver ID (optional)" }, "is_fragile": { "type": "boolean", "description": "Filter fragile packages only (optional)" }, "requires_signature": { "type": "boolean", "description": "Filter orders requiring signature (optional)" }, "requires_cold_storage": { "type": "boolean", "description": "Filter orders requiring cold storage (optional)" } }, "required": [] } }, { "name": "fetch_orders", "description": "Fetch orders from the database with optional filters, pagination, and sorting. Use after counting to show specific number of orders.", "input_schema": { "type": "object", "properties": { "limit": { "type": "integer", "description": "Number of orders to fetch (default: 10, max: 100)" }, "offset": { "type": "integer", "description": "Number of orders to skip for pagination (default: 0)" }, "status": { "type": "string", "enum": ["pending", "assigned", "in_transit", "delivered", "failed", "cancelled"], "description": "Filter by order status (optional)" }, "priority": { "type": "string", "enum": ["standard", "express", "urgent"], "description": "Filter by priority level (optional)" }, "payment_status": { "type": "string", "enum": ["pending", "paid", "cod"], "description": "Filter by payment status (optional)" }, "assigned_driver_id": { "type": "string", "description": "Filter by assigned driver ID (optional)" }, "is_fragile": { "type": "boolean", "description": "Filter fragile packages only (optional)" }, "requires_signature": { "type": "boolean", "description": "Filter orders requiring signature (optional)" }, "requires_cold_storage": { "type": "boolean", "description": "Filter orders requiring cold storage (optional)" }, "sort_by": { "type": "string", "enum": ["created_at", "priority", "time_window_start"], "description": "Field to sort by (default: created_at)" }, "sort_order": { "type": "string", "enum": ["ASC", "DESC"], "description": "Sort order (default: DESC for newest first)" } }, "required": [] } }, { "name": "get_order_details", "description": "Get complete details of a specific order by order ID. Use when user asks 'tell me about order X' or wants detailed information about a specific order.", "input_schema": { "type": "object", "properties": { "order_id": { "type": "string", "description": "The order ID to fetch details for (e.g., 'ORD-20251114163800')" } }, "required": ["order_id"] } }, { "name": "search_orders", "description": "Search for orders by customer name, email, phone, or order ID pattern. Use when user provides partial information to find orders.", "input_schema": { "type": "object", "properties": { "search_term": { "type": "string", "description": "Search term to match against customer_name, customer_email, customer_phone, or order_id" } }, "required": ["search_term"] } }, { "name": "get_incomplete_orders", "description": "Get all orders that are not yet completed (excludes delivered and cancelled orders). Shortcut for finding orders in progress (pending, assigned, in_transit).", "input_schema": { "type": "object", "properties": { "limit": { "type": "integer", "description": "Number of orders to fetch (default: 20)" } }, "required": [] } }, { "name": "count_drivers", "description": "Count total drivers in the database with optional filters. Use this when user asks 'how many drivers', 'show drivers', or wants driver statistics.", "input_schema": { "type": "object", "properties": { "status": { "type": "string", "enum": ["active", "busy", "offline", "unavailable"], "description": "Filter by driver status (optional)" }, "vehicle_type": { "type": "string", "description": "Filter by vehicle type: van, truck, car, motorcycle, etc. (optional)" } }, "required": [] } }, { "name": "fetch_drivers", "description": "Fetch drivers from the database with optional filters, pagination, and sorting. Use after counting to show specific number of drivers.", "input_schema": { "type": "object", "properties": { "limit": { "type": "integer", "description": "Number of drivers to fetch (default: 10, max: 100)" }, "offset": { "type": "integer", "description": "Number of drivers to skip for pagination (default: 0)" }, "status": { "type": "string", "enum": ["active", "busy", "offline", "unavailable"], "description": "Filter by driver status (optional)" }, "vehicle_type": { "type": "string", "description": "Filter by vehicle type: van, truck, car, motorcycle, etc. (optional)" }, "sort_by": { "type": "string", "enum": ["name", "status", "created_at", "last_location_update"], "description": "Field to sort by (default: name)" }, "sort_order": { "type": "string", "enum": ["ASC", "DESC"], "description": "Sort order (default: ASC for alphabetical)" } }, "required": [] } }, { "name": "get_driver_details", "description": "Get complete details of a specific driver by driver ID, including current location (latitude, longitude, and human-readable address), contact info, vehicle details, status, and skills. Use when user asks about a driver's location, coordinates, position, or any other driver information.", "input_schema": { "type": "object", "properties": { "driver_id": { "type": "string", "description": "The driver ID to fetch details for (e.g., 'DRV-20251114163800')" } }, "required": ["driver_id"] } }, { "name": "search_drivers", "description": "Search for drivers by name, email, phone, vehicle plate, or driver ID pattern. Use when user provides partial information to find drivers.", "input_schema": { "type": "object", "properties": { "search_term": { "type": "string", "description": "Search term to match against name, email, phone, vehicle_plate, or driver_id" } }, "required": ["search_term"] } }, { "name": "get_available_drivers", "description": "Get all drivers that are available for assignment (active or offline status, excludes busy and unavailable). Shortcut for finding drivers ready for dispatch.", "input_schema": { "type": "object", "properties": { "limit": { "type": "integer", "description": "Number of drivers to fetch (default: 20)" } }, "required": [] } }, { "name": "update_order", "description": "Update an existing order's details. You can update any combination of fields. Only provide the fields you want to change.", "input_schema": { "type": "object", "properties": { "order_id": { "type": "string", "description": "Order ID to update (e.g., 'ORD-20250114123456')" }, "customer_name": { "type": "string", "description": "Updated customer name" }, "customer_phone": { "type": "string", "description": "Updated customer phone number" }, "customer_email": { "type": "string", "description": "Updated customer email address" }, "delivery_address": { "type": "string", "description": "Updated delivery address" }, "delivery_lat": { "type": "number", "description": "Updated delivery latitude (required if updating address)" }, "delivery_lng": { "type": "number", "description": "Updated delivery longitude (required if updating address)" }, "status": { "type": "string", "description": "Updated order status", "enum": ["pending", "assigned", "in_transit", "delivered", "failed", "cancelled"] }, "priority": { "type": "string", "description": "Updated priority level", "enum": ["standard", "express", "urgent"] }, "special_instructions": { "type": "string", "description": "Updated special delivery instructions" }, "time_window_end": { "type": "string", "description": "Updated delivery deadline (ISO format datetime)" }, "payment_status": { "type": "string", "description": "Updated payment status", "enum": ["pending", "paid", "cod"] }, "weight_kg": { "type": "number", "description": "Updated package weight in kilograms" }, "order_value": { "type": "number", "description": "Updated order value in currency" } }, "required": ["order_id"] } }, { "name": "delete_order", "description": "Permanently delete an order from the database. This action cannot be undone. Use with caution.", "input_schema": { "type": "object", "properties": { "order_id": { "type": "string", "description": "Order ID to delete (e.g., 'ORD-20250114123456')" }, "confirm": { "type": "boolean", "description": "Must be set to true to confirm deletion" } }, "required": ["order_id", "confirm"] } }, { "name": "update_driver", "description": "Update an existing driver's details. You can update any combination of fields. Only provide the fields you want to change.", "input_schema": { "type": "object", "properties": { "driver_id": { "type": "string", "description": "Driver ID to update (e.g., 'DRV-20250114123456')" }, "name": { "type": "string", "description": "Updated driver name" }, "phone": { "type": "string", "description": "Updated phone number" }, "email": { "type": "string", "description": "Updated email address" }, "status": { "type": "string", "description": "Updated driver status", "enum": ["active", "busy", "offline", "unavailable"] }, "vehicle_type": { "type": "string", "description": "Updated vehicle type" }, "vehicle_plate": { "type": "string", "description": "Updated vehicle license plate" }, "capacity_kg": { "type": "number", "description": "Updated cargo capacity in kilograms" }, "capacity_m3": { "type": "number", "description": "Updated cargo capacity in cubic meters" }, "skills": { "type": "array", "items": {"type": "string"}, "description": "Updated list of driver skills/certifications" }, "current_lat": { "type": "number", "description": "Updated current latitude" }, "current_lng": { "type": "number", "description": "Updated current longitude" } }, "required": ["driver_id"] } }, { "name": "delete_driver", "description": "Permanently delete a driver from the database. This action cannot be undone. Use with caution.", "input_schema": { "type": "object", "properties": { "driver_id": { "type": "string", "description": "Driver ID to delete (e.g., 'DRV-20250114123456')" }, "confirm": { "type": "boolean", "description": "Must be set to true to confirm deletion" } }, "required": ["driver_id", "confirm"] } } ] def execute_tool(tool_name: str, tool_input: dict) -> dict: """ Route tool execution to appropriate handler Args: tool_name: Name of the tool to execute tool_input: Tool input parameters Returns: Dict with tool execution results """ try: if tool_name == "geocode_address": return handle_geocode_address(tool_input) elif tool_name == "create_order": return handle_create_order(tool_input) elif tool_name == "create_driver": return handle_create_driver(tool_input) elif tool_name == "count_orders": return handle_count_orders(tool_input) elif tool_name == "fetch_orders": return handle_fetch_orders(tool_input) elif tool_name == "get_order_details": return handle_get_order_details(tool_input) elif tool_name == "search_orders": return handle_search_orders(tool_input) elif tool_name == "get_incomplete_orders": return handle_get_incomplete_orders(tool_input) elif tool_name == "count_drivers": return handle_count_drivers(tool_input) elif tool_name == "fetch_drivers": return handle_fetch_drivers(tool_input) elif tool_name == "get_driver_details": return handle_get_driver_details(tool_input) elif tool_name == "search_drivers": return handle_search_drivers(tool_input) elif tool_name == "get_available_drivers": return handle_get_available_drivers(tool_input) elif tool_name == "update_order": return handle_update_order(tool_input) elif tool_name == "delete_order": return handle_delete_order(tool_input) elif tool_name == "update_driver": return handle_update_driver(tool_input) elif tool_name == "delete_driver": return handle_delete_driver(tool_input) else: return { "success": False, "error": f"Unknown tool: {tool_name}" } except Exception as e: logger.error(f"Tool execution error ({tool_name}): {e}") return { "success": False, "error": str(e) } def handle_geocode_address(tool_input: dict) -> dict: """ Execute geocoding tool Args: tool_input: Dict with 'address' key Returns: Geocoding result """ address = tool_input.get("address", "") if not address: return { "success": False, "error": "Address is required" } logger.info(f"Geocoding address: {address}") result = geocoding_service.geocode(address) return { "success": True, "latitude": result["lat"], "longitude": result["lng"], "formatted_address": result["formatted_address"], "confidence": result["confidence"], "message": f"Address geocoded successfully ({result['confidence']})" } def handle_create_order(tool_input: dict) -> dict: """ Execute order creation tool Args: tool_input: Dict with order fields Returns: Order creation result """ # Extract fields with defaults customer_name = tool_input.get("customer_name") customer_phone = tool_input.get("customer_phone") customer_email = tool_input.get("customer_email") delivery_address = tool_input.get("delivery_address") delivery_lat = tool_input.get("delivery_lat") delivery_lng = tool_input.get("delivery_lng") priority = tool_input.get("priority", "standard") special_instructions = tool_input.get("special_instructions") weight_kg = tool_input.get("weight_kg", 5.0) # Validate required fields if not all([customer_name, delivery_address, delivery_lat, delivery_lng]): return { "success": False, "error": "Missing required fields: customer_name, delivery_address, delivery_lat, delivery_lng" } # Generate order ID now = datetime.now() order_id = f"ORD-{now.strftime('%Y%m%d%H%M%S')}" # Handle time window time_window_end_str = tool_input.get("time_window_end") if time_window_end_str: try: time_window_end = datetime.fromisoformat(time_window_end_str.replace('Z', '+00:00')) except: time_window_end = now + timedelta(hours=6) else: time_window_end = now + timedelta(hours=6) time_window_start = now + timedelta(hours=2) # Insert into database query = """ INSERT INTO orders ( order_id, customer_name, customer_phone, customer_email, delivery_address, delivery_lat, delivery_lng, time_window_start, time_window_end, priority, weight_kg, status, special_instructions ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """ params = ( order_id, customer_name, customer_phone, customer_email, delivery_address, delivery_lat, delivery_lng, time_window_start, time_window_end, priority, weight_kg, "pending", special_instructions ) try: execute_write(query, params) logger.info(f"Order created: {order_id}") return { "success": True, "order_id": order_id, "status": "pending", "customer": customer_name, "address": delivery_address, "deadline": time_window_end.strftime("%Y-%m-%d %H:%M"), "priority": priority, "message": f"Order {order_id} created successfully!" } except Exception as e: logger.error(f"Database error creating order: {e}") return { "success": False, "error": f"Failed to create order: {str(e)}" } def handle_create_driver(tool_input: dict) -> dict: """ Execute driver creation tool Args: tool_input: Dict with driver fields Returns: Driver creation result """ # Extract fields with defaults name = tool_input.get("name") phone = tool_input.get("phone") email = tool_input.get("email") vehicle_type = tool_input.get("vehicle_type", "van") vehicle_plate = tool_input.get("vehicle_plate") capacity_kg = tool_input.get("capacity_kg", 1000.0) capacity_m3 = tool_input.get("capacity_m3", 12.0) # Convert skills to regular list (handles protobuf RepeatedComposite) skills_raw = tool_input.get("skills", []) skills = list(skills_raw) if skills_raw else [] status = tool_input.get("status", "active") # Validate required fields if not name: return { "success": False, "error": "Missing required field: name" } # Generate driver ID now = datetime.now() driver_id = f"DRV-{now.strftime('%Y%m%d%H%M%S')}" # Default location (San Francisco) current_lat = tool_input.get("current_lat", 37.7749) current_lng = tool_input.get("current_lng", -122.4194) # Insert into database query = """ INSERT INTO drivers ( driver_id, name, phone, email, current_lat, current_lng, last_location_update, status, vehicle_type, vehicle_plate, capacity_kg, capacity_m3, skills ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """ # Convert skills list to JSON import json skills_json = json.dumps(skills) if skills else json.dumps([]) params = ( driver_id, name, phone, email, current_lat, current_lng, now, status, vehicle_type, vehicle_plate, capacity_kg, capacity_m3, skills_json ) try: execute_write(query, params) logger.info(f"Driver created: {driver_id}") return { "success": True, "driver_id": driver_id, "name": name, "status": status, "vehicle_type": vehicle_type, "vehicle_plate": vehicle_plate, "capacity_kg": capacity_kg, "skills": skills, "message": f"Driver {driver_id} ({name}) created successfully!" } except Exception as e: logger.error(f"Database error creating driver: {e}") return { "success": False, "error": f"Failed to create driver: {str(e)}" } def handle_update_order(tool_input: dict) -> dict: """ Execute order update tool Args: tool_input: Dict with order_id and fields to update Returns: Update result """ import json order_id = tool_input.get("order_id") # Validate required field if not order_id: return { "success": False, "error": "Missing required field: order_id" } # Check if order exists check_query = "SELECT order_id FROM orders WHERE order_id = %s" existing = execute_query(check_query, (order_id,)) if not existing: return { "success": False, "error": f"Order {order_id} not found" } # Auto-geocode if delivery address is updated without coordinates if "delivery_address" in tool_input and ("delivery_lat" not in tool_input or "delivery_lng" not in tool_input): from chat.geocoding import GeocodingService geocoding_service = GeocodingService() try: geocode_result = geocoding_service.geocode(tool_input["delivery_address"]) tool_input["delivery_lat"] = geocode_result["lat"] tool_input["delivery_lng"] = geocode_result["lng"] logger.info(f"Auto-geocoded delivery address: {geocode_result['formatted_address']}") except Exception as e: logger.warning(f"Failed to geocode address, skipping coordinate update: {e}") # Build UPDATE query dynamically based on provided fields update_fields = [] params = [] # Map of field names to their database columns updateable_fields = { "customer_name": "customer_name", "customer_phone": "customer_phone", "customer_email": "customer_email", "delivery_address": "delivery_address", "delivery_lat": "delivery_lat", "delivery_lng": "delivery_lng", "status": "status", "priority": "priority", "special_instructions": "special_instructions", "time_window_end": "time_window_end", "payment_status": "payment_status", "weight_kg": "weight_kg", "order_value": "order_value" } for field, column in updateable_fields.items(): if field in tool_input: update_fields.append(f"{column} = %s") params.append(tool_input[field]) if not update_fields: return { "success": False, "error": "No fields provided to update" } # Always update the updated_at timestamp update_fields.append("updated_at = %s") params.append(datetime.now()) # Add order_id for WHERE clause params.append(order_id) # Execute update query = f""" UPDATE orders SET {', '.join(update_fields)} WHERE order_id = %s """ try: execute_write(query, tuple(params)) logger.info(f"Order updated: {order_id}") return { "success": True, "order_id": order_id, "updated_fields": list(updateable_fields.keys() & tool_input.keys()), "message": f"Order {order_id} updated successfully!" } except Exception as e: logger.error(f"Database error updating order: {e}") return { "success": False, "error": f"Failed to update order: {str(e)}" } def handle_delete_order(tool_input: dict) -> dict: """ Execute order deletion tool Args: tool_input: Dict with order_id and confirm flag Returns: Deletion result """ order_id = tool_input.get("order_id") confirm = tool_input.get("confirm", False) # Validate required fields if not order_id: return { "success": False, "error": "Missing required field: order_id" } if not confirm: return { "success": False, "error": "Deletion not confirmed. Set confirm=true to proceed." } # Check if order exists check_query = "SELECT order_id, status FROM orders WHERE order_id = %s" existing = execute_query(check_query, (order_id,)) if not existing: return { "success": False, "error": f"Order {order_id} not found" } # Delete the order query = "DELETE FROM orders WHERE order_id = %s" try: execute_write(query, (order_id,)) logger.info(f"Order deleted: {order_id}") return { "success": True, "order_id": order_id, "message": f"Order {order_id} has been permanently deleted." } except Exception as e: logger.error(f"Database error deleting order: {e}") return { "success": False, "error": f"Failed to delete order: {str(e)}" } def handle_update_driver(tool_input: dict) -> dict: """ Execute driver update tool Args: tool_input: Dict with driver_id and fields to update Returns: Update result """ import json driver_id = tool_input.get("driver_id") # Validate required field if not driver_id: return { "success": False, "error": "Missing required field: driver_id" } # Check if driver exists check_query = "SELECT driver_id FROM drivers WHERE driver_id = %s" existing = execute_query(check_query, (driver_id,)) if not existing: return { "success": False, "error": f"Driver {driver_id} not found" } # Build UPDATE query dynamically based on provided fields update_fields = [] params = [] # Map of field names to their database columns updateable_fields = { "name": "name", "phone": "phone", "email": "email", "status": "status", "vehicle_type": "vehicle_type", "vehicle_plate": "vehicle_plate", "capacity_kg": "capacity_kg", "capacity_m3": "capacity_m3", "current_lat": "current_lat", "current_lng": "current_lng" } for field, column in updateable_fields.items(): if field in tool_input: update_fields.append(f"{column} = %s") params.append(tool_input[field]) # Handle skills array specially (convert to JSON) if "skills" in tool_input: skills = list(tool_input.get("skills", [])) update_fields.append("skills = %s") params.append(json.dumps(skills)) if not update_fields: return { "success": False, "error": "No fields provided to update" } # Always update the updated_at timestamp update_fields.append("updated_at = %s") params.append(datetime.now()) # Update location timestamp if lat/lng changed if "current_lat" in tool_input or "current_lng" in tool_input: update_fields.append("last_location_update = %s") params.append(datetime.now()) # Add driver_id for WHERE clause params.append(driver_id) # Execute update query = f""" UPDATE drivers SET {', '.join(update_fields)} WHERE driver_id = %s """ try: execute_write(query, tuple(params)) logger.info(f"Driver updated: {driver_id}") updated_list = list(updateable_fields.keys() & tool_input.keys()) if "skills" in tool_input: updated_list.append("skills") return { "success": True, "driver_id": driver_id, "updated_fields": updated_list, "message": f"Driver {driver_id} updated successfully!" } except Exception as e: logger.error(f"Database error updating driver: {e}") return { "success": False, "error": f"Failed to update driver: {str(e)}" } def handle_delete_driver(tool_input: dict) -> dict: """ Execute driver deletion tool Args: tool_input: Dict with driver_id and confirm flag Returns: Deletion result """ driver_id = tool_input.get("driver_id") confirm = tool_input.get("confirm", False) # Validate required fields if not driver_id: return { "success": False, "error": "Missing required field: driver_id" } if not confirm: return { "success": False, "error": "Deletion not confirmed. Set confirm=true to proceed." } # Check if driver exists check_query = "SELECT driver_id, name FROM drivers WHERE driver_id = %s" existing = execute_query(check_query, (driver_id,)) if not existing: return { "success": False, "error": f"Driver {driver_id} not found" } driver_name = existing[0]["name"] # Delete the driver query = "DELETE FROM drivers WHERE driver_id = %s" try: execute_write(query, (driver_id,)) logger.info(f"Driver deleted: {driver_id}") return { "success": True, "driver_id": driver_id, "message": f"Driver {driver_id} ({driver_name}) has been permanently deleted." } except Exception as e: logger.error(f"Database error deleting driver: {e}") return { "success": False, "error": f"Failed to delete driver: {str(e)}" } def handle_count_orders(tool_input: dict) -> dict: """ Execute count orders tool Args: tool_input: Dict with optional filter fields Returns: Order count result with breakdown """ # Build WHERE clause based on filters where_clauses = [] params = [] if "status" in tool_input: where_clauses.append("status = %s") params.append(tool_input["status"]) if "priority" in tool_input: where_clauses.append("priority = %s") params.append(tool_input["priority"]) if "payment_status" in tool_input: where_clauses.append("payment_status = %s") params.append(tool_input["payment_status"]) if "assigned_driver_id" in tool_input: where_clauses.append("assigned_driver_id = %s") params.append(tool_input["assigned_driver_id"]) if "is_fragile" in tool_input: where_clauses.append("is_fragile = %s") params.append(tool_input["is_fragile"]) if "requires_signature" in tool_input: where_clauses.append("requires_signature = %s") params.append(tool_input["requires_signature"]) if "requires_cold_storage" in tool_input: where_clauses.append("requires_cold_storage = %s") params.append(tool_input["requires_cold_storage"]) where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else "" # Total count query count_query = f"SELECT COUNT(*) as total FROM orders{where_sql}" # Breakdown by status query breakdown_query = f""" SELECT status, COUNT(*) as count FROM orders{where_sql} GROUP BY status ORDER BY count DESC """ # Breakdown by priority query priority_query = f""" SELECT priority, COUNT(*) as count FROM orders{where_sql} GROUP BY priority ORDER BY CASE priority WHEN 'urgent' THEN 1 WHEN 'express' THEN 2 WHEN 'standard' THEN 3 END """ try: # Execute queries total_result = execute_query(count_query, tuple(params) if params else None) total = total_result[0]['total'] if total_result else 0 status_result = execute_query(breakdown_query, tuple(params) if params else None) priority_result = execute_query(priority_query, tuple(params) if params else None) # Format breakdown status_breakdown = {row['status']: row['count'] for row in status_result} if status_result else {} priority_breakdown = {row['priority']: row['count'] for row in priority_result} if priority_result else {} logger.info(f"Counted orders: {total} total") return { "success": True, "total": total, "status_breakdown": status_breakdown, "priority_breakdown": priority_breakdown, "message": f"Found {total} order(s)" } except Exception as e: logger.error(f"Database error counting orders: {e}") return { "success": False, "error": f"Failed to count orders: {str(e)}" } def handle_fetch_orders(tool_input: dict) -> dict: """ Execute fetch orders tool Args: tool_input: Dict with filter, pagination, and sorting options Returns: List of orders matching criteria """ # Extract pagination and sorting limit = min(tool_input.get("limit", 10), 100) # Cap at 100 offset = tool_input.get("offset", 0) sort_by = tool_input.get("sort_by", "created_at") sort_order = tool_input.get("sort_order", "DESC") # Build WHERE clause based on filters where_clauses = [] params = [] if "status" in tool_input: where_clauses.append("status = %s") params.append(tool_input["status"]) if "priority" in tool_input: where_clauses.append("priority = %s") params.append(tool_input["priority"]) if "payment_status" in tool_input: where_clauses.append("payment_status = %s") params.append(tool_input["payment_status"]) if "assigned_driver_id" in tool_input: where_clauses.append("assigned_driver_id = %s") params.append(tool_input["assigned_driver_id"]) if "is_fragile" in tool_input: where_clauses.append("is_fragile = %s") params.append(tool_input["is_fragile"]) if "requires_signature" in tool_input: where_clauses.append("requires_signature = %s") params.append(tool_input["requires_signature"]) if "requires_cold_storage" in tool_input: where_clauses.append("requires_cold_storage = %s") params.append(tool_input["requires_cold_storage"]) where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else "" # Build query query = f""" SELECT order_id, customer_name, customer_phone, customer_email, delivery_address, delivery_lat, delivery_lng, time_window_start, time_window_end, priority, weight_kg, volume_m3, special_instructions, status, assigned_driver_id, created_at, updated_at, delivered_at, order_value, payment_status, requires_signature, is_fragile, requires_cold_storage FROM orders {where_sql} ORDER BY {sort_by} {sort_order} LIMIT %s OFFSET %s """ params.extend([limit, offset]) try: results = execute_query(query, tuple(params)) if not results: return { "success": True, "orders": [], "count": 0, "message": "No orders found matching criteria" } # Format orders for readability orders = [] for row in results: order = { "order_id": row['order_id'], "customer": { "name": row['customer_name'], "phone": row['customer_phone'], "email": row['customer_email'] }, "delivery": { "address": row['delivery_address'], "latitude": float(row['delivery_lat']) if row['delivery_lat'] else None, "longitude": float(row['delivery_lng']) if row['delivery_lng'] else None }, "time_window": { "start": str(row['time_window_start']) if row['time_window_start'] else None, "end": str(row['time_window_end']) if row['time_window_end'] else None }, "details": { "priority": row['priority'], "status": row['status'], "weight_kg": float(row['weight_kg']) if row['weight_kg'] else None, "volume_m3": float(row['volume_m3']) if row['volume_m3'] else None, "special_instructions": row['special_instructions'] }, "flags": { "requires_signature": row['requires_signature'], "is_fragile": row['is_fragile'], "requires_cold_storage": row['requires_cold_storage'] }, "payment": { "order_value": float(row['order_value']) if row['order_value'] else None, "payment_status": row['payment_status'] }, "assigned_driver_id": row['assigned_driver_id'], "timestamps": { "created_at": str(row['created_at']), "updated_at": str(row['updated_at']) if row['updated_at'] else None, "delivered_at": str(row['delivered_at']) if row['delivered_at'] else None } } orders.append(order) logger.info(f"Fetched {len(orders)} orders") return { "success": True, "orders": orders, "count": len(orders), "message": f"Retrieved {len(orders)} order(s)" } except Exception as e: logger.error(f"Database error fetching orders: {e}") return { "success": False, "error": f"Failed to fetch orders: {str(e)}" } def handle_get_order_details(tool_input: dict) -> dict: """ Execute get order details tool Args: tool_input: Dict with order_id Returns: Complete order details """ order_id = tool_input.get("order_id") if not order_id: return { "success": False, "error": "order_id is required" } query = """ SELECT order_id, customer_name, customer_phone, customer_email, pickup_address, pickup_lat, pickup_lng, delivery_address, delivery_lat, delivery_lng, time_window_start, time_window_end, priority, weight_kg, volume_m3, special_instructions, status, assigned_driver_id, created_at, updated_at, delivered_at, order_value, payment_status, requires_signature, is_fragile, requires_cold_storage FROM orders WHERE order_id = %s """ try: results = execute_query(query, (order_id,)) if not results: return { "success": False, "error": f"Order {order_id} not found" } row = results[0] order = { "order_id": row['order_id'], "customer": { "name": row['customer_name'], "phone": row['customer_phone'], "email": row['customer_email'] }, "pickup": { "address": row['pickup_address'], "latitude": float(row['pickup_lat']) if row['pickup_lat'] else None, "longitude": float(row['pickup_lng']) if row['pickup_lng'] else None } if row['pickup_address'] else None, "delivery": { "address": row['delivery_address'], "latitude": float(row['delivery_lat']) if row['delivery_lat'] else None, "longitude": float(row['delivery_lng']) if row['delivery_lng'] else None }, "time_window": { "start": str(row['time_window_start']) if row['time_window_start'] else None, "end": str(row['time_window_end']) if row['time_window_end'] else None }, "details": { "priority": row['priority'], "status": row['status'], "weight_kg": float(row['weight_kg']) if row['weight_kg'] else None, "volume_m3": float(row['volume_m3']) if row['volume_m3'] else None, "special_instructions": row['special_instructions'] }, "flags": { "requires_signature": row['requires_signature'], "is_fragile": row['is_fragile'], "requires_cold_storage": row['requires_cold_storage'] }, "payment": { "order_value": float(row['order_value']) if row['order_value'] else None, "payment_status": row['payment_status'] }, "assigned_driver_id": row['assigned_driver_id'], "timestamps": { "created_at": str(row['created_at']), "updated_at": str(row['updated_at']) if row['updated_at'] else None, "delivered_at": str(row['delivered_at']) if row['delivered_at'] else None } } logger.info(f"Retrieved details for order: {order_id}") return { "success": True, "order": order, "message": f"Order {order_id} details retrieved" } except Exception as e: logger.error(f"Database error getting order details: {e}") return { "success": False, "error": f"Failed to get order details: {str(e)}" } def handle_search_orders(tool_input: dict) -> dict: """ Execute search orders tool Args: tool_input: Dict with search_term Returns: List of matching orders """ search_term = tool_input.get("search_term", "").strip() if not search_term: return { "success": False, "error": "search_term is required" } query = """ SELECT order_id, customer_name, customer_phone, customer_email, delivery_address, priority, status, created_at FROM orders WHERE order_id ILIKE %s OR customer_name ILIKE %s OR customer_email ILIKE %s OR customer_phone ILIKE %s ORDER BY created_at DESC LIMIT 50 """ search_pattern = f"%{search_term}%" params = (search_pattern, search_pattern, search_pattern, search_pattern) try: results = execute_query(query, params) if not results: return { "success": True, "orders": [], "count": 0, "message": f"No orders found matching '{search_term}'" } orders = [] for row in results: orders.append({ "order_id": row['order_id'], "customer_name": row['customer_name'], "customer_phone": row['customer_phone'], "customer_email": row['customer_email'], "delivery_address": row['delivery_address'], "priority": row['priority'], "status": row['status'], "created_at": str(row['created_at']) }) logger.info(f"Search '{search_term}' found {len(orders)} orders") return { "success": True, "orders": orders, "count": len(orders), "message": f"Found {len(orders)} order(s) matching '{search_term}'" } except Exception as e: logger.error(f"Database error searching orders: {e}") return { "success": False, "error": f"Failed to search orders: {str(e)}" } def handle_get_incomplete_orders(tool_input: dict) -> dict: """ Execute get incomplete orders tool Args: tool_input: Dict with optional limit Returns: List of incomplete orders (pending, assigned, in_transit) """ limit = min(tool_input.get("limit", 20), 100) query = """ SELECT order_id, customer_name, delivery_address, priority, status, time_window_end, created_at, assigned_driver_id FROM orders WHERE status IN ('pending', 'assigned', 'in_transit') ORDER BY CASE priority WHEN 'urgent' THEN 1 WHEN 'express' THEN 2 WHEN 'standard' THEN 3 END, time_window_end ASC LIMIT %s """ try: results = execute_query(query, (limit,)) if not results: return { "success": True, "orders": [], "count": 0, "message": "No incomplete orders found" } orders = [] for row in results: orders.append({ "order_id": row['order_id'], "customer_name": row['customer_name'], "delivery_address": row['delivery_address'], "priority": row['priority'], "status": row['status'], "time_window_end": str(row['time_window_end']) if row['time_window_end'] else None, "created_at": str(row['created_at']), "assigned_driver_id": row['assigned_driver_id'] }) logger.info(f"Retrieved {len(orders)} incomplete orders") return { "success": True, "orders": orders, "count": len(orders), "message": f"Found {len(orders)} incomplete order(s)" } except Exception as e: logger.error(f"Database error getting incomplete orders: {e}") return { "success": False, "error": f"Failed to get incomplete orders: {str(e)}" } def handle_count_drivers(tool_input: dict) -> dict: """ Execute count drivers tool Args: tool_input: Dict with optional filter fields Returns: Driver count result with breakdown """ # Build WHERE clause based on filters where_clauses = [] params = [] if "status" in tool_input: where_clauses.append("status = %s") params.append(tool_input["status"]) if "vehicle_type" in tool_input: where_clauses.append("vehicle_type = %s") params.append(tool_input["vehicle_type"]) where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else "" # Total count query count_query = f"SELECT COUNT(*) as total FROM drivers{where_sql}" # Breakdown by status query status_query = f""" SELECT status, COUNT(*) as count FROM drivers{where_sql} GROUP BY status ORDER BY count DESC """ # Breakdown by vehicle type query vehicle_query = f""" SELECT vehicle_type, COUNT(*) as count FROM drivers{where_sql} GROUP BY vehicle_type ORDER BY count DESC """ try: # Execute queries total_result = execute_query(count_query, tuple(params) if params else None) total = total_result[0]['total'] if total_result else 0 status_result = execute_query(status_query, tuple(params) if params else None) vehicle_result = execute_query(vehicle_query, tuple(params) if params else None) # Format breakdown status_breakdown = {row['status']: row['count'] for row in status_result} if status_result else {} vehicle_breakdown = {row['vehicle_type']: row['count'] for row in vehicle_result if row['vehicle_type']} if vehicle_result else {} logger.info(f"Counted drivers: {total} total") return { "success": True, "total": total, "status_breakdown": status_breakdown, "vehicle_breakdown": vehicle_breakdown, "message": f"Found {total} driver(s)" } except Exception as e: logger.error(f"Database error counting drivers: {e}") return { "success": False, "error": f"Failed to count drivers: {str(e)}" } def handle_fetch_drivers(tool_input: dict) -> dict: """ Execute fetch drivers tool Args: tool_input: Dict with filter, pagination, and sorting options Returns: List of drivers matching criteria """ # Extract pagination and sorting limit = min(tool_input.get("limit", 10), 100) # Cap at 100 offset = tool_input.get("offset", 0) sort_by = tool_input.get("sort_by", "name") sort_order = tool_input.get("sort_order", "ASC") # Build WHERE clause based on filters where_clauses = [] params = [] if "status" in tool_input: where_clauses.append("status = %s") params.append(tool_input["status"]) if "vehicle_type" in tool_input: where_clauses.append("vehicle_type = %s") params.append(tool_input["vehicle_type"]) where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else "" # Build query query = f""" SELECT driver_id, name, phone, email, current_lat, current_lng, last_location_update, status, vehicle_type, vehicle_plate, capacity_kg, capacity_m3, skills, created_at, updated_at FROM drivers {where_sql} ORDER BY {sort_by} {sort_order} LIMIT %s OFFSET %s """ params.extend([limit, offset]) try: results = execute_query(query, tuple(params)) if not results: return { "success": True, "drivers": [], "count": 0, "message": "No drivers found matching criteria" } # Format drivers for readability drivers = [] for row in results: # Parse skills JSON if present skills = [] if row['skills']: try: import json skills = json.loads(row['skills']) if isinstance(row['skills'], str) else row['skills'] except: skills = [] driver = { "driver_id": row['driver_id'], "name": row['name'], "contact": { "phone": row['phone'], "email": row['email'] }, "location": { "latitude": float(row['current_lat']) if row['current_lat'] else None, "longitude": float(row['current_lng']) if row['current_lng'] else None, "last_update": str(row['last_location_update']) if row['last_location_update'] else None }, "status": row['status'], "vehicle": { "type": row['vehicle_type'], "plate": row['vehicle_plate'], "capacity_kg": float(row['capacity_kg']) if row['capacity_kg'] else None, "capacity_m3": float(row['capacity_m3']) if row['capacity_m3'] else None }, "skills": skills, "timestamps": { "created_at": str(row['created_at']), "updated_at": str(row['updated_at']) if row['updated_at'] else None } } drivers.append(driver) logger.info(f"Fetched {len(drivers)} drivers") return { "success": True, "drivers": drivers, "count": len(drivers), "message": f"Retrieved {len(drivers)} driver(s)" } except Exception as e: logger.error(f"Database error fetching drivers: {e}") return { "success": False, "error": f"Failed to fetch drivers: {str(e)}" } def handle_get_driver_details(tool_input: dict) -> dict: """ Execute get driver details tool Args: tool_input: Dict with driver_id Returns: Complete driver details """ driver_id = tool_input.get("driver_id") if not driver_id: return { "success": False, "error": "driver_id is required" } query = """ SELECT driver_id, name, phone, email, current_lat, current_lng, last_location_update, status, vehicle_type, vehicle_plate, capacity_kg, capacity_m3, skills, created_at, updated_at FROM drivers WHERE driver_id = %s """ try: results = execute_query(query, (driver_id,)) if not results: return { "success": False, "error": f"Driver {driver_id} not found" } row = results[0] # Parse skills JSON if present skills = [] if row['skills']: try: import json skills = json.loads(row['skills']) if isinstance(row['skills'], str) else row['skills'] except: skills = [] # Reverse geocode location to get address location_address = None if row['current_lat'] and row['current_lng']: try: from chat.geocoding import GeocodingService geocoding_service = GeocodingService() reverse_result = geocoding_service.reverse_geocode( float(row['current_lat']), float(row['current_lng']) ) location_address = reverse_result.get('formatted_address', None) logger.info(f"Reverse geocoded driver location: {location_address}") except Exception as e: logger.warning(f"Failed to reverse geocode driver location: {e}") location_address = None driver = { "driver_id": row['driver_id'], "name": row['name'], "contact": { "phone": row['phone'], "email": row['email'] }, "location": { "latitude": float(row['current_lat']) if row['current_lat'] else None, "longitude": float(row['current_lng']) if row['current_lng'] else None, "address": location_address, "last_update": str(row['last_location_update']) if row['last_location_update'] else None }, "status": row['status'], "vehicle": { "type": row['vehicle_type'], "plate": row['vehicle_plate'], "capacity_kg": float(row['capacity_kg']) if row['capacity_kg'] else None, "capacity_m3": float(row['capacity_m3']) if row['capacity_m3'] else None }, "skills": skills, "timestamps": { "created_at": str(row['created_at']), "updated_at": str(row['updated_at']) if row['updated_at'] else None } } logger.info(f"Retrieved details for driver: {driver_id}") return { "success": True, "driver": driver, "message": f"Driver {driver_id} details retrieved" } except Exception as e: logger.error(f"Database error getting driver details: {e}") return { "success": False, "error": f"Failed to get driver details: {str(e)}" } def handle_search_drivers(tool_input: dict) -> dict: """ Execute search drivers tool Args: tool_input: Dict with search_term Returns: List of matching drivers """ search_term = tool_input.get("search_term", "").strip() if not search_term: return { "success": False, "error": "search_term is required" } query = """ SELECT driver_id, name, phone, email, vehicle_type, vehicle_plate, status, created_at FROM drivers WHERE driver_id ILIKE %s OR name ILIKE %s OR email ILIKE %s OR phone ILIKE %s OR vehicle_plate ILIKE %s ORDER BY name ASC LIMIT 50 """ search_pattern = f"%{search_term}%" params = (search_pattern, search_pattern, search_pattern, search_pattern, search_pattern) try: results = execute_query(query, params) if not results: return { "success": True, "drivers": [], "count": 0, "message": f"No drivers found matching '{search_term}'" } drivers = [] for row in results: drivers.append({ "driver_id": row['driver_id'], "name": row['name'], "phone": row['phone'], "email": row['email'], "vehicle_type": row['vehicle_type'], "vehicle_plate": row['vehicle_plate'], "status": row['status'], "created_at": str(row['created_at']) }) logger.info(f"Search '{search_term}' found {len(drivers)} drivers") return { "success": True, "drivers": drivers, "count": len(drivers), "message": f"Found {len(drivers)} driver(s) matching '{search_term}'" } except Exception as e: logger.error(f"Database error searching drivers: {e}") return { "success": False, "error": f"Failed to search drivers: {str(e)}" } def handle_get_available_drivers(tool_input: dict) -> dict: """ Execute get available drivers tool Args: tool_input: Dict with optional limit Returns: List of available drivers (active or offline) """ limit = min(tool_input.get("limit", 20), 100) query = """ SELECT driver_id, name, phone, vehicle_type, vehicle_plate, current_lat, current_lng, last_location_update, status, capacity_kg, capacity_m3, skills FROM drivers WHERE status IN ('active', 'offline') ORDER BY CASE status WHEN 'active' THEN 1 WHEN 'offline' THEN 2 END, name ASC LIMIT %s """ try: results = execute_query(query, (limit,)) if not results: return { "success": True, "drivers": [], "count": 0, "message": "No available drivers found" } drivers = [] for row in results: # Parse skills JSON if present skills = [] if row['skills']: try: import json skills = json.loads(row['skills']) if isinstance(row['skills'], str) else row['skills'] except: skills = [] drivers.append({ "driver_id": row['driver_id'], "name": row['name'], "phone": row['phone'], "location": { "latitude": float(row['current_lat']) if row['current_lat'] else None, "longitude": float(row['current_lng']) if row['current_lng'] else None, "last_update": str(row['last_location_update']) if row['last_location_update'] else None }, "status": row['status'], "vehicle": { "type": row['vehicle_type'], "plate": row['vehicle_plate'], "capacity_kg": float(row['capacity_kg']) if row['capacity_kg'] else None, "capacity_m3": float(row['capacity_m3']) if row['capacity_m3'] else None }, "skills": skills }) logger.info(f"Retrieved {len(drivers)} available drivers") return { "success": True, "drivers": drivers, "count": len(drivers), "message": f"Found {len(drivers)} available driver(s)" } except Exception as e: logger.error(f"Database error getting available drivers: {e}") return { "success": False, "error": f"Failed to get available drivers: {str(e)}" } def get_tools_list() -> list: """Get list of available tools""" return [tool["name"] for tool in TOOLS_SCHEMA] def get_tool_description(tool_name: str) -> str: """Get description for a specific tool""" for tool in TOOLS_SCHEMA: if tool["name"] == tool_name: return tool["description"] return ""