mashrur950's picture
Add reverse geocoding functionality and enhance driver details retrieval with location information
18f4b6b
raw
history blame
70.8 kB
"""
Tool definitions and execution handlers for FleetMind chat
Simulates MCP tools using Claude's tool calling feature
"""
import sys
from pathlib import Path
from datetime import datetime, timedelta
import logging
# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from database.connection import execute_write, execute_query
from chat.geocoding import GeocodingService
logger = logging.getLogger(__name__)
# Initialize geocoding service
geocoding_service = GeocodingService()
# Tool schemas for Claude.
# Each entry describes one tool: its "name" (the value execute_tool dispatches
# on), a natural-language "description", and an "input_schema" object listing
# the accepted properties and which of them are required.
TOOLS_SCHEMA = [
    # --- Geocoding ---
    {
        "name": "geocode_address",
        "description": "Convert a delivery address to GPS coordinates and validate the address format. Use this before creating an order to ensure the address is valid.",
        "input_schema": {
            "type": "object",
            "properties": {
                "address": {
                    "type": "string",
                    "description": "The full delivery address to geocode (e.g., '123 Main St, San Francisco, CA')"
                }
            },
            "required": ["address"]
        }
    },
    # --- Creation tools ---
    {
        "name": "create_order",
        "description": "Create a new delivery order in the database. Only call this after geocoding the address successfully.",
        "input_schema": {
            "type": "object",
            "properties": {
                "customer_name": {
                    "type": "string",
                    "description": "Full name of the customer"
                },
                "customer_phone": {
                    "type": "string",
                    "description": "Customer phone number (optional)"
                },
                "customer_email": {
                    "type": "string",
                    "description": "Customer email address (optional)"
                },
                "delivery_address": {
                    "type": "string",
                    "description": "Full delivery address"
                },
                "delivery_lat": {
                    "type": "number",
                    "description": "Latitude from geocoding"
                },
                "delivery_lng": {
                    "type": "number",
                    "description": "Longitude from geocoding"
                },
                "time_window_end": {
                    "type": "string",
                    "description": "Delivery deadline in ISO format (e.g., '2025-11-13T17:00:00'). If not specified by user, default to 6 hours from now."
                },
                "priority": {
                    "type": "string",
                    "enum": ["standard", "express", "urgent"],
                    "description": "Delivery priority. Default to 'standard' unless user specifies urgent/express."
                },
                "special_instructions": {
                    "type": "string",
                    "description": "Any special delivery instructions (optional)"
                },
                "weight_kg": {
                    "type": "number",
                    "description": "Package weight in kilograms (optional, default to 5.0)"
                }
            },
            "required": ["customer_name", "delivery_address", "delivery_lat", "delivery_lng"]
        }
    },
    {
        "name": "create_driver",
        "description": "Create a new delivery driver in the database. Use this to onboard new drivers to the fleet.",
        "input_schema": {
            "type": "object",
            "properties": {
                "name": {
                    "type": "string",
                    "description": "Full name of the driver"
                },
                "phone": {
                    "type": "string",
                    "description": "Driver phone number (optional)"
                },
                "email": {
                    "type": "string",
                    "description": "Driver email address (optional)"
                },
                "vehicle_type": {
                    "type": "string",
                    "description": "Type of vehicle: van, truck, car, motorcycle (default: van)"
                },
                "vehicle_plate": {
                    "type": "string",
                    "description": "Vehicle license plate number (optional)"
                },
                "capacity_kg": {
                    "type": "number",
                    "description": "Vehicle cargo capacity in kilograms (default: 1000.0)"
                },
                "capacity_m3": {
                    "type": "number",
                    "description": "Vehicle cargo volume in cubic meters (default: 12.0)"
                },
                "skills": {
                    "type": "array",
                    "description": "List of driver skills/certifications: refrigerated, medical_certified, fragile_handler, overnight, express_delivery",
                    "items": {
                        "type": "string"
                    }
                },
                "status": {
                    "type": "string",
                    "enum": ["active", "busy", "offline", "unavailable"],
                    "description": "Driver status (default: active)"
                }
            },
            "required": ["name"]
        }
    },
    # --- Order queries ---
    {
        "name": "count_orders",
        "description": "Count total orders in the database with optional filters. Use this when user asks 'how many orders', 'fetch orders', or wants to know order statistics.",
        "input_schema": {
            "type": "object",
            "properties": {
                "status": {
                    "type": "string",
                    "enum": ["pending", "assigned", "in_transit", "delivered", "failed", "cancelled"],
                    "description": "Filter by order status (optional)"
                },
                "priority": {
                    "type": "string",
                    "enum": ["standard", "express", "urgent"],
                    "description": "Filter by priority level (optional)"
                },
                "payment_status": {
                    "type": "string",
                    "enum": ["pending", "paid", "cod"],
                    "description": "Filter by payment status (optional)"
                },
                "assigned_driver_id": {
                    "type": "string",
                    "description": "Filter by assigned driver ID (optional)"
                },
                "is_fragile": {
                    "type": "boolean",
                    "description": "Filter fragile packages only (optional)"
                },
                "requires_signature": {
                    "type": "boolean",
                    "description": "Filter orders requiring signature (optional)"
                },
                "requires_cold_storage": {
                    "type": "boolean",
                    "description": "Filter orders requiring cold storage (optional)"
                }
            },
            "required": []
        }
    },
    {
        "name": "fetch_orders",
        "description": "Fetch orders from the database with optional filters, pagination, and sorting. Use after counting to show specific number of orders.",
        "input_schema": {
            "type": "object",
            "properties": {
                "limit": {
                    "type": "integer",
                    "description": "Number of orders to fetch (default: 10, max: 100)"
                },
                "offset": {
                    "type": "integer",
                    "description": "Number of orders to skip for pagination (default: 0)"
                },
                "status": {
                    "type": "string",
                    "enum": ["pending", "assigned", "in_transit", "delivered", "failed", "cancelled"],
                    "description": "Filter by order status (optional)"
                },
                "priority": {
                    "type": "string",
                    "enum": ["standard", "express", "urgent"],
                    "description": "Filter by priority level (optional)"
                },
                "payment_status": {
                    "type": "string",
                    "enum": ["pending", "paid", "cod"],
                    "description": "Filter by payment status (optional)"
                },
                "assigned_driver_id": {
                    "type": "string",
                    "description": "Filter by assigned driver ID (optional)"
                },
                "is_fragile": {
                    "type": "boolean",
                    "description": "Filter fragile packages only (optional)"
                },
                "requires_signature": {
                    "type": "boolean",
                    "description": "Filter orders requiring signature (optional)"
                },
                "requires_cold_storage": {
                    "type": "boolean",
                    "description": "Filter orders requiring cold storage (optional)"
                },
                "sort_by": {
                    "type": "string",
                    "enum": ["created_at", "priority", "time_window_start"],
                    "description": "Field to sort by (default: created_at)"
                },
                "sort_order": {
                    "type": "string",
                    "enum": ["ASC", "DESC"],
                    "description": "Sort order (default: DESC for newest first)"
                }
            },
            "required": []
        }
    },
    {
        "name": "get_order_details",
        "description": "Get complete details of a specific order by order ID. Use when user asks 'tell me about order X' or wants detailed information about a specific order.",
        "input_schema": {
            "type": "object",
            "properties": {
                "order_id": {
                    "type": "string",
                    "description": "The order ID to fetch details for (e.g., 'ORD-20251114163800')"
                }
            },
            "required": ["order_id"]
        }
    },
    {
        "name": "search_orders",
        "description": "Search for orders by customer name, email, phone, or order ID pattern. Use when user provides partial information to find orders.",
        "input_schema": {
            "type": "object",
            "properties": {
                "search_term": {
                    "type": "string",
                    "description": "Search term to match against customer_name, customer_email, customer_phone, or order_id"
                }
            },
            "required": ["search_term"]
        }
    },
    {
        "name": "get_incomplete_orders",
        "description": "Get all orders that are not yet completed (excludes delivered and cancelled orders). Shortcut for finding orders in progress (pending, assigned, in_transit).",
        "input_schema": {
            "type": "object",
            "properties": {
                "limit": {
                    "type": "integer",
                    "description": "Number of orders to fetch (default: 20)"
                }
            },
            "required": []
        }
    },
    # --- Driver queries ---
    {
        "name": "count_drivers",
        "description": "Count total drivers in the database with optional filters. Use this when user asks 'how many drivers', 'show drivers', or wants driver statistics.",
        "input_schema": {
            "type": "object",
            "properties": {
                "status": {
                    "type": "string",
                    "enum": ["active", "busy", "offline", "unavailable"],
                    "description": "Filter by driver status (optional)"
                },
                "vehicle_type": {
                    "type": "string",
                    "description": "Filter by vehicle type: van, truck, car, motorcycle, etc. (optional)"
                }
            },
            "required": []
        }
    },
    {
        "name": "fetch_drivers",
        "description": "Fetch drivers from the database with optional filters, pagination, and sorting. Use after counting to show specific number of drivers.",
        "input_schema": {
            "type": "object",
            "properties": {
                "limit": {
                    "type": "integer",
                    "description": "Number of drivers to fetch (default: 10, max: 100)"
                },
                "offset": {
                    "type": "integer",
                    "description": "Number of drivers to skip for pagination (default: 0)"
                },
                "status": {
                    "type": "string",
                    "enum": ["active", "busy", "offline", "unavailable"],
                    "description": "Filter by driver status (optional)"
                },
                "vehicle_type": {
                    "type": "string",
                    "description": "Filter by vehicle type: van, truck, car, motorcycle, etc. (optional)"
                },
                "sort_by": {
                    "type": "string",
                    "enum": ["name", "status", "created_at", "last_location_update"],
                    "description": "Field to sort by (default: name)"
                },
                "sort_order": {
                    "type": "string",
                    "enum": ["ASC", "DESC"],
                    "description": "Sort order (default: ASC for alphabetical)"
                }
            },
            "required": []
        }
    },
    {
        "name": "get_driver_details",
        "description": "Get complete details of a specific driver by driver ID, including current location (latitude, longitude, and human-readable address), contact info, vehicle details, status, and skills. Use when user asks about a driver's location, coordinates, position, or any other driver information.",
        "input_schema": {
            "type": "object",
            "properties": {
                "driver_id": {
                    "type": "string",
                    "description": "The driver ID to fetch details for (e.g., 'DRV-20251114163800')"
                }
            },
            "required": ["driver_id"]
        }
    },
    {
        "name": "search_drivers",
        "description": "Search for drivers by name, email, phone, vehicle plate, or driver ID pattern. Use when user provides partial information to find drivers.",
        "input_schema": {
            "type": "object",
            "properties": {
                "search_term": {
                    "type": "string",
                    "description": "Search term to match against name, email, phone, vehicle_plate, or driver_id"
                }
            },
            "required": ["search_term"]
        }
    },
    {
        "name": "get_available_drivers",
        "description": "Get all drivers that are available for assignment (active or offline status, excludes busy and unavailable). Shortcut for finding drivers ready for dispatch.",
        "input_schema": {
            "type": "object",
            "properties": {
                "limit": {
                    "type": "integer",
                    "description": "Number of drivers to fetch (default: 20)"
                }
            },
            "required": []
        }
    },
    # --- Update / delete tools ---
    {
        "name": "update_order",
        "description": "Update an existing order's details. You can update any combination of fields. Only provide the fields you want to change.",
        "input_schema": {
            "type": "object",
            "properties": {
                "order_id": {
                    "type": "string",
                    "description": "Order ID to update (e.g., 'ORD-20250114123456')"
                },
                "customer_name": {
                    "type": "string",
                    "description": "Updated customer name"
                },
                "customer_phone": {
                    "type": "string",
                    "description": "Updated customer phone number"
                },
                "customer_email": {
                    "type": "string",
                    "description": "Updated customer email address"
                },
                "delivery_address": {
                    "type": "string",
                    "description": "Updated delivery address"
                },
                "delivery_lat": {
                    "type": "number",
                    "description": "Updated delivery latitude (required if updating address)"
                },
                "delivery_lng": {
                    "type": "number",
                    "description": "Updated delivery longitude (required if updating address)"
                },
                "status": {
                    "type": "string",
                    "description": "Updated order status",
                    "enum": ["pending", "assigned", "in_transit", "delivered", "failed", "cancelled"]
                },
                "priority": {
                    "type": "string",
                    "description": "Updated priority level",
                    "enum": ["standard", "express", "urgent"]
                },
                "special_instructions": {
                    "type": "string",
                    "description": "Updated special delivery instructions"
                },
                "time_window_end": {
                    "type": "string",
                    "description": "Updated delivery deadline (ISO format datetime)"
                },
                "payment_status": {
                    "type": "string",
                    "description": "Updated payment status",
                    "enum": ["pending", "paid", "cod"]
                },
                "weight_kg": {
                    "type": "number",
                    "description": "Updated package weight in kilograms"
                },
                "order_value": {
                    "type": "number",
                    "description": "Updated order value in currency"
                }
            },
            "required": ["order_id"]
        }
    },
    {
        "name": "delete_order",
        "description": "Permanently delete an order from the database. This action cannot be undone. Use with caution.",
        "input_schema": {
            "type": "object",
            "properties": {
                "order_id": {
                    "type": "string",
                    "description": "Order ID to delete (e.g., 'ORD-20250114123456')"
                },
                "confirm": {
                    "type": "boolean",
                    "description": "Must be set to true to confirm deletion"
                }
            },
            "required": ["order_id", "confirm"]
        }
    },
    {
        "name": "update_driver",
        "description": "Update an existing driver's details. You can update any combination of fields. Only provide the fields you want to change.",
        "input_schema": {
            "type": "object",
            "properties": {
                "driver_id": {
                    "type": "string",
                    "description": "Driver ID to update (e.g., 'DRV-20250114123456')"
                },
                "name": {
                    "type": "string",
                    "description": "Updated driver name"
                },
                "phone": {
                    "type": "string",
                    "description": "Updated phone number"
                },
                "email": {
                    "type": "string",
                    "description": "Updated email address"
                },
                "status": {
                    "type": "string",
                    "description": "Updated driver status",
                    "enum": ["active", "busy", "offline", "unavailable"]
                },
                "vehicle_type": {
                    "type": "string",
                    "description": "Updated vehicle type"
                },
                "vehicle_plate": {
                    "type": "string",
                    "description": "Updated vehicle license plate"
                },
                "capacity_kg": {
                    "type": "number",
                    "description": "Updated cargo capacity in kilograms"
                },
                "capacity_m3": {
                    "type": "number",
                    "description": "Updated cargo capacity in cubic meters"
                },
                "skills": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "Updated list of driver skills/certifications"
                },
                "current_lat": {
                    "type": "number",
                    "description": "Updated current latitude"
                },
                "current_lng": {
                    "type": "number",
                    "description": "Updated current longitude"
                }
            },
            "required": ["driver_id"]
        }
    },
    {
        "name": "delete_driver",
        "description": "Permanently delete a driver from the database. This action cannot be undone. Use with caution.",
        "input_schema": {
            "type": "object",
            "properties": {
                "driver_id": {
                    "type": "string",
                    "description": "Driver ID to delete (e.g., 'DRV-20250114123456')"
                },
                "confirm": {
                    "type": "boolean",
                    "description": "Must be set to true to confirm deletion"
                }
            },
            "required": ["driver_id", "confirm"]
        }
    }
]
def execute_tool(tool_name: str, tool_input: dict) -> dict:
    """
    Dispatch a tool call to the handler registered for the given tool name.

    Args:
        tool_name: Name of the tool to execute
        tool_input: Tool input parameters, handed to the handler unchanged

    Returns:
        The handler's result dict. An unrecognised tool name, or any
        exception raised by the handler, produces
        {"success": False, "error": <message>} instead.
    """
    # Each handler is wrapped in a lambda so its name is looked up only when
    # that particular tool is requested.
    dispatch = {
        "geocode_address": lambda: handle_geocode_address(tool_input),
        "create_order": lambda: handle_create_order(tool_input),
        "create_driver": lambda: handle_create_driver(tool_input),
        "count_orders": lambda: handle_count_orders(tool_input),
        "fetch_orders": lambda: handle_fetch_orders(tool_input),
        "get_order_details": lambda: handle_get_order_details(tool_input),
        "search_orders": lambda: handle_search_orders(tool_input),
        "get_incomplete_orders": lambda: handle_get_incomplete_orders(tool_input),
        "count_drivers": lambda: handle_count_drivers(tool_input),
        "fetch_drivers": lambda: handle_fetch_drivers(tool_input),
        "get_driver_details": lambda: handle_get_driver_details(tool_input),
        "search_drivers": lambda: handle_search_drivers(tool_input),
        "get_available_drivers": lambda: handle_get_available_drivers(tool_input),
        "update_order": lambda: handle_update_order(tool_input),
        "delete_order": lambda: handle_delete_order(tool_input),
        "update_driver": lambda: handle_update_driver(tool_input),
        "delete_driver": lambda: handle_delete_driver(tool_input),
    }

    try:
        run_handler = dispatch.get(tool_name)
        if run_handler is None:
            return {
                "success": False,
                "error": f"Unknown tool: {tool_name}"
            }
        return run_handler()
    except Exception as e:
        # Handler failures are reported to the caller as a result dict
        # rather than propagated.
        logger.error(f"Tool execution error ({tool_name}): {e}")
        return {
            "success": False,
            "error": str(e)
        }
def handle_geocode_address(tool_input: dict) -> dict:
    """
    Geocode the address supplied in the tool input.

    Args:
        tool_input: Dict with an 'address' key

    Returns:
        On success, a dict with latitude, longitude, formatted_address,
        confidence and a message, all taken from the geocoding service
        result. If the address is missing or empty, a failure dict.
    """
    address = tool_input.get("address", "")
    if address:
        logger.info(f"Geocoding address: {address}")
        geo = geocoding_service.geocode(address)
        response = {
            "success": True,
            "latitude": geo["lat"],
            "longitude": geo["lng"],
            "formatted_address": geo["formatted_address"],
            "confidence": geo["confidence"],
        }
        response["message"] = f"Address geocoded successfully ({geo['confidence']})"
        return response

    return {
        "success": False,
        "error": "Address is required"
    }
def handle_create_order(tool_input: dict) -> dict:
    """
    Create a new order row from the tool input.

    Args:
        tool_input: Dict with order fields. customer_name, delivery_address,
            delivery_lat and delivery_lng are required; priority defaults to
            "standard" and weight_kg to 5.0. time_window_end is an optional
            ISO-format string.

    Returns:
        On success, a dict with the generated order_id, status, customer,
        address, deadline, priority and a message. On validation or database
        failure, {"success": False, "error": <message>}.
    """
    # Extract fields with defaults
    customer_name = tool_input.get("customer_name")
    customer_phone = tool_input.get("customer_phone")
    customer_email = tool_input.get("customer_email")
    delivery_address = tool_input.get("delivery_address")
    delivery_lat = tool_input.get("delivery_lat")
    delivery_lng = tool_input.get("delivery_lng")
    priority = tool_input.get("priority", "standard")
    special_instructions = tool_input.get("special_instructions")
    weight_kg = tool_input.get("weight_kg", 5.0)

    # Validate required fields. Coordinates are compared against None rather
    # than tested for truthiness: 0.0 is a valid latitude/longitude.
    if (
        not customer_name
        or not delivery_address
        or delivery_lat is None
        or delivery_lng is None
    ):
        return {
            "success": False,
            "error": "Missing required fields: customer_name, delivery_address, delivery_lat, delivery_lng"
        }

    # Generate order ID from the current timestamp (one-second resolution)
    now = datetime.now()
    order_id = f"ORD-{now.strftime('%Y%m%d%H%M%S')}"

    # Handle time window: fall back to six hours from now when no deadline is
    # given or the given one cannot be parsed.
    time_window_end = now + timedelta(hours=6)
    time_window_end_str = tool_input.get("time_window_end")
    if time_window_end_str:
        try:
            time_window_end = datetime.fromisoformat(time_window_end_str.replace('Z', '+00:00'))
        except (AttributeError, TypeError, ValueError) as e:
            logger.warning(
                f"Invalid time_window_end {time_window_end_str!r}, using default deadline: {e}"
            )
    # NOTE(review): the window start is always now + 2h and is not checked
    # against the deadline above - confirm this is intended.
    time_window_start = now + timedelta(hours=2)

    # Insert into database
    query = """
INSERT INTO orders (
order_id, customer_name, customer_phone, customer_email,
delivery_address, delivery_lat, delivery_lng,
time_window_start, time_window_end,
priority, weight_kg, status, special_instructions
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
    params = (
        order_id,
        customer_name,
        customer_phone,
        customer_email,
        delivery_address,
        delivery_lat,
        delivery_lng,
        time_window_start,
        time_window_end,
        priority,
        weight_kg,
        "pending",
        special_instructions
    )
    try:
        execute_write(query, params)
        logger.info(f"Order created: {order_id}")
        return {
            "success": True,
            "order_id": order_id,
            "status": "pending",
            "customer": customer_name,
            "address": delivery_address,
            "deadline": time_window_end.strftime("%Y-%m-%d %H:%M"),
            "priority": priority,
            "message": f"Order {order_id} created successfully!"
        }
    except Exception as e:
        logger.error(f"Database error creating order: {e}")
        return {
            "success": False,
            "error": f"Failed to create order: {str(e)}"
        }
def handle_create_driver(tool_input: dict) -> dict:
    """
    Create a new driver row from the tool input.

    Args:
        tool_input: Dict with driver fields; only 'name' is required

    Returns:
        On success, a dict echoing the generated driver_id and the stored
        name, status, vehicle and skills. On validation or database failure,
        {"success": False, "error": <message>}.
    """
    import json

    name = tool_input.get("name")
    if not name:
        return {
            "success": False,
            "error": "Missing required field: name"
        }

    # Optional fields and their defaults
    phone = tool_input.get("phone")
    email = tool_input.get("email")
    vehicle_type = tool_input.get("vehicle_type", "van")
    vehicle_plate = tool_input.get("vehicle_plate")
    capacity_kg = tool_input.get("capacity_kg", 1000.0)
    capacity_m3 = tool_input.get("capacity_m3", 12.0)
    status = tool_input.get("status", "active")

    # Convert skills to a regular list (handles protobuf RepeatedComposite);
    # a missing or empty value becomes []
    raw_skills = tool_input.get("skills", [])
    skills = list(raw_skills) if raw_skills else []

    # Driver ID is derived from the creation timestamp
    created = datetime.now()
    driver_id = f"DRV-{created.strftime('%Y%m%d%H%M%S')}"

    # Default location (San Francisco)
    current_lat = tool_input.get("current_lat", 37.7749)
    current_lng = tool_input.get("current_lng", -122.4194)

    insert_sql = """
INSERT INTO drivers (
driver_id, name, phone, email,
current_lat, current_lng, last_location_update,
status, vehicle_type, vehicle_plate,
capacity_kg, capacity_m3, skills
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
    # Skills are stored as a JSON-encoded list
    row = (
        driver_id,
        name,
        phone,
        email,
        current_lat,
        current_lng,
        created,
        status,
        vehicle_type,
        vehicle_plate,
        capacity_kg,
        capacity_m3,
        json.dumps(skills),
    )

    try:
        execute_write(insert_sql, row)
        logger.info(f"Driver created: {driver_id}")
    except Exception as e:
        logger.error(f"Database error creating driver: {e}")
        return {
            "success": False,
            "error": f"Failed to create driver: {str(e)}"
        }

    return {
        "success": True,
        "driver_id": driver_id,
        "name": name,
        "status": status,
        "vehicle_type": vehicle_type,
        "vehicle_plate": vehicle_plate,
        "capacity_kg": capacity_kg,
        "skills": skills,
        "message": f"Driver {driver_id} ({name}) created successfully!"
    }
def handle_update_order(tool_input: dict) -> dict:
    """
    Update selected fields of an existing order.

    Args:
        tool_input: Dict with 'order_id' plus any of the updatable fields.
            The dict is not modified.

    Returns:
        On success, a dict with the order_id, the list of updated field
        names (in the order they are declared below) and a message. If the
        order_id is missing, the order does not exist, no updatable field
        was supplied, or the database write fails,
        {"success": False, "error": <message>}.
    """
    order_id = tool_input.get("order_id")

    # Validate required field
    if not order_id:
        return {
            "success": False,
            "error": "Missing required field: order_id"
        }

    # Check if order exists
    check_query = "SELECT order_id FROM orders WHERE order_id = %s"
    existing = execute_query(check_query, (order_id,))
    if not existing:
        return {
            "success": False,
            "error": f"Order {order_id} not found"
        }

    # Work on a shallow copy so auto-geocoded coordinates are never written
    # back into the caller's dict.
    fields = dict(tool_input)

    # Auto-geocode if delivery address is updated without both coordinates
    if "delivery_address" in fields and ("delivery_lat" not in fields or "delivery_lng" not in fields):
        try:
            geocode_result = geocoding_service.geocode(fields["delivery_address"])
            fields["delivery_lat"] = geocode_result["lat"]
            fields["delivery_lng"] = geocode_result["lng"]
            logger.info(f"Auto-geocoded delivery address: {geocode_result['formatted_address']}")
        except Exception as e:
            # Best effort: the address is still updated, coordinates are left as they were
            logger.warning(f"Failed to geocode address, skipping coordinate update: {e}")

    # Fields the caller may update; each maps to a column of the same name
    updateable_fields = (
        "customer_name",
        "customer_phone",
        "customer_email",
        "delivery_address",
        "delivery_lat",
        "delivery_lng",
        "status",
        "priority",
        "special_instructions",
        "time_window_end",
        "payment_status",
        "weight_kg",
        "order_value",
    )
    updated = [name for name in updateable_fields if name in fields]
    if not updated:
        return {
            "success": False,
            "error": "No fields provided to update"
        }

    # Build UPDATE query dynamically; values are always bound as parameters
    update_fields = [f"{column} = %s" for column in updated]
    params = [fields[name] for name in updated]

    # Always update the updated_at timestamp
    update_fields.append("updated_at = %s")
    params.append(datetime.now())

    # Add order_id for WHERE clause
    params.append(order_id)

    query = f"""
UPDATE orders
SET {', '.join(update_fields)}
WHERE order_id = %s
"""
    try:
        execute_write(query, tuple(params))
        logger.info(f"Order updated: {order_id}")
        return {
            "success": True,
            "order_id": order_id,
            "updated_fields": updated,
            "message": f"Order {order_id} updated successfully!"
        }
    except Exception as e:
        logger.error(f"Database error updating order: {e}")
        return {
            "success": False,
            "error": f"Failed to update order: {str(e)}"
        }
def handle_delete_order(tool_input: dict) -> dict:
    """
    Delete one order after an explicit confirmation.

    Args:
        tool_input: Dict with 'order_id' and a truthy 'confirm' flag

    Returns:
        A success dict with the order_id and a message, or
        {"success": False, "error": <message>} when the order_id is missing,
        the deletion is not confirmed, the order does not exist, or the
        database write fails.
    """
    target_id = tool_input.get("order_id")
    confirmed = tool_input.get("confirm", False)

    # Guard clauses: both inputs must be present before touching the database
    if not target_id:
        return {
            "success": False,
            "error": "Missing required field: order_id"
        }
    if not confirmed:
        return {
            "success": False,
            "error": "Deletion not confirmed. Set confirm=true to proceed."
        }

    # The order has to exist before it can be deleted
    lookup_sql = "SELECT order_id, status FROM orders WHERE order_id = %s"
    if not execute_query(lookup_sql, (target_id,)):
        return {
            "success": False,
            "error": f"Order {target_id} not found"
        }

    delete_sql = "DELETE FROM orders WHERE order_id = %s"
    try:
        execute_write(delete_sql, (target_id,))
        logger.info(f"Order deleted: {target_id}")
    except Exception as exc:
        logger.error(f"Database error deleting order: {exc}")
        return {
            "success": False,
            "error": f"Failed to delete order: {str(exc)}"
        }

    return {
        "success": True,
        "order_id": target_id,
        "message": f"Order {target_id} has been permanently deleted."
    }
def handle_update_driver(tool_input: dict) -> dict:
    """
    Update selected fields of an existing driver.

    Args:
        tool_input: Dict with 'driver_id' plus any of the updatable fields.
            'skills', when present, is stored as a JSON-encoded list; a None
            value is treated as an empty list.

    Returns:
        On success, a dict with the driver_id, the list of updated field
        names (declared order, 'skills' last) and a message. If the
        driver_id is missing, the driver does not exist, no updatable field
        was supplied, or the database write fails,
        {"success": False, "error": <message>}.
    """
    import json

    driver_id = tool_input.get("driver_id")

    # Validate required field
    if not driver_id:
        return {
            "success": False,
            "error": "Missing required field: driver_id"
        }

    # Check if driver exists
    check_query = "SELECT driver_id FROM drivers WHERE driver_id = %s"
    existing = execute_query(check_query, (driver_id,))
    if not existing:
        return {
            "success": False,
            "error": f"Driver {driver_id} not found"
        }

    # Plain fields the caller may update; each maps to a column of the same name
    updateable_fields = (
        "name",
        "phone",
        "email",
        "status",
        "vehicle_type",
        "vehicle_plate",
        "capacity_kg",
        "capacity_m3",
        "current_lat",
        "current_lng",
    )
    updated = [name for name in updateable_fields if name in tool_input]

    # Build UPDATE query dynamically; values are always bound as parameters
    update_fields = [f"{column} = %s" for column in updated]
    params = [tool_input[name] for name in updated]

    # Handle skills array specially (convert to JSON). "or []" keeps an
    # explicit None from raising, matching how driver creation treats it.
    if "skills" in tool_input:
        skills = list(tool_input["skills"] or [])
        update_fields.append("skills = %s")
        params.append(json.dumps(skills))
        updated.append("skills")

    if not update_fields:
        return {
            "success": False,
            "error": "No fields provided to update"
        }

    # One timestamp is shared so updated_at and last_location_update agree
    now = datetime.now()

    # Always update the updated_at timestamp
    update_fields.append("updated_at = %s")
    params.append(now)

    # Update location timestamp if lat/lng changed
    if "current_lat" in tool_input or "current_lng" in tool_input:
        update_fields.append("last_location_update = %s")
        params.append(now)

    # Add driver_id for WHERE clause
    params.append(driver_id)

    query = f"""
UPDATE drivers
SET {', '.join(update_fields)}
WHERE driver_id = %s
"""
    try:
        execute_write(query, tuple(params))
        logger.info(f"Driver updated: {driver_id}")
        return {
            "success": True,
            "driver_id": driver_id,
            "updated_fields": updated,
            "message": f"Driver {driver_id} updated successfully!"
        }
    except Exception as e:
        logger.error(f"Database error updating driver: {e}")
        return {
            "success": False,
            "error": f"Failed to update driver: {str(e)}"
        }
def handle_delete_driver(tool_input: dict) -> dict:
    """
    Delete one driver after an explicit confirmation.

    Args:
        tool_input: Dict with 'driver_id' and a truthy 'confirm' flag

    Returns:
        A success dict with the driver_id and a message naming the driver,
        or {"success": False, "error": <message>} when the driver_id is
        missing, the deletion is not confirmed, the driver does not exist,
        or the database write fails.
    """
    target_id = tool_input.get("driver_id")
    confirmed = tool_input.get("confirm", False)

    # Guard clauses: both inputs must be present before touching the database
    if not target_id:
        return {
            "success": False,
            "error": "Missing required field: driver_id"
        }
    if not confirmed:
        return {
            "success": False,
            "error": "Deletion not confirmed. Set confirm=true to proceed."
        }

    # Look the driver up first; the name is reused in the confirmation message
    lookup_sql = "SELECT driver_id, name FROM drivers WHERE driver_id = %s"
    rows = execute_query(lookup_sql, (target_id,))
    if not rows:
        return {
            "success": False,
            "error": f"Driver {target_id} not found"
        }
    driver_name = rows[0]["name"]

    delete_sql = "DELETE FROM drivers WHERE driver_id = %s"
    try:
        execute_write(delete_sql, (target_id,))
        logger.info(f"Driver deleted: {target_id}")
    except Exception as exc:
        logger.error(f"Database error deleting driver: {exc}")
        return {
            "success": False,
            "error": f"Failed to delete driver: {str(exc)}"
        }

    return {
        "success": True,
        "driver_id": target_id,
        "message": f"Driver {target_id} ({driver_name}) has been permanently deleted."
    }
def handle_count_orders(tool_input: dict) -> dict:
    """
    Count orders matching the supplied filters.

    Args:
        tool_input: Dict with optional filter fields; every filter present
            is applied as an equality condition, combined with AND

    Returns:
        On success, a dict with the total count, a per-status breakdown, a
        per-priority breakdown and a message. On database failure,
        {"success": False, "error": <message>}.
    """
    # Filters are applied in this fixed order; each name is also the column name
    filter_columns = (
        "status",
        "priority",
        "payment_status",
        "assigned_driver_id",
        "is_fragile",
        "requires_signature",
        "requires_cold_storage",
    )
    active = [column for column in filter_columns if column in tool_input]
    conditions = [f"{column} = %s" for column in active]
    values = [tool_input[column] for column in active]

    if conditions:
        where_sql = " WHERE " + " AND ".join(conditions)
    else:
        where_sql = ""

    # The same parameters are bound to all three queries below
    bound = tuple(values) if values else None

    # Total count query
    count_query = f"SELECT COUNT(*) as total FROM orders{where_sql}"
    # Breakdown by status query
    breakdown_query = f"""
SELECT status, COUNT(*) as count
FROM orders{where_sql}
GROUP BY status
ORDER BY count DESC
"""
    # Breakdown by priority query
    priority_query = f"""
SELECT priority, COUNT(*) as count
FROM orders{where_sql}
GROUP BY priority
ORDER BY CASE priority
WHEN 'urgent' THEN 1
WHEN 'express' THEN 2
WHEN 'standard' THEN 3
END
"""
    try:
        total_rows = execute_query(count_query, bound)
        if total_rows:
            total = total_rows[0]['total']
        else:
            total = 0
        status_rows = execute_query(breakdown_query, bound)
        priority_rows = execute_query(priority_query, bound)

        # An empty or missing result set yields an empty breakdown
        status_breakdown = {}
        if status_rows:
            for row in status_rows:
                status_breakdown[row['status']] = row['count']
        priority_breakdown = {}
        if priority_rows:
            for row in priority_rows:
                priority_breakdown[row['priority']] = row['count']

        logger.info(f"Counted orders: {total} total")
        return {
            "success": True,
            "total": total,
            "status_breakdown": status_breakdown,
            "priority_breakdown": priority_breakdown,
            "message": f"Found {total} order(s)"
        }
    except Exception as e:
        logger.error(f"Database error counting orders: {e}")
        return {
            "success": False,
            "error": f"Failed to count orders: {str(e)}"
        }
def handle_fetch_orders(tool_input: dict) -> dict:
    """
    Execute fetch orders tool

    Builds a filtered, sorted, paginated SELECT against the ``orders`` table
    and reshapes every returned row into a nested dict.

    Args:
        tool_input: Dict with filter, pagination, and sorting options.
            Filters (optional, matched by equality): status, priority,
            payment_status, assigned_driver_id, is_fragile,
            requires_signature, requires_cold_storage.
            Pagination: limit (default 10, capped at 100), offset (default 0).
            Sorting: sort_by (default "created_at", must be one of the
            selected columns), sort_order ("ASC" or "DESC", default "DESC").

    Returns:
        On success: {"success": True, "orders": [...], "count": int,
        "message": str}. On invalid input or a database failure:
        {"success": False, "error": str}.
    """
    # Column names cannot be passed as bound query parameters, so sort_by is
    # checked against this whitelist before being interpolated into the SQL.
    sortable_columns = (
        "order_id", "customer_name", "customer_phone", "customer_email",
        "delivery_address", "delivery_lat", "delivery_lng",
        "time_window_start", "time_window_end",
        "priority", "weight_kg", "volume_m3", "special_instructions",
        "status", "assigned_driver_id",
        "created_at", "updated_at", "delivered_at",
        "order_value", "payment_status",
        "requires_signature", "is_fragile", "requires_cold_storage",
    )
    filter_columns = (
        "status", "priority", "payment_status", "assigned_driver_id",
        "is_fragile", "requires_signature", "requires_cold_storage",
    )

    # Extract pagination; an explicit null falls back to the default value
    raw_limit = tool_input.get("limit")
    raw_offset = tool_input.get("offset")
    try:
        limit = min(int(10 if raw_limit is None else raw_limit), 100)  # Cap at 100
        offset = int(0 if raw_offset is None else raw_offset)
    except (TypeError, ValueError, OverflowError):
        return {
            "success": False,
            "error": "limit and offset must be integers"
        }
    if limit < 0 or offset < 0:
        return {
            "success": False,
            "error": "limit and offset must not be negative"
        }

    # Extract and validate sorting
    sort_by = tool_input.get("sort_by") or "created_at"
    if sort_by not in sortable_columns:
        return {
            "success": False,
            "error": f"Invalid sort_by '{sort_by}'. Allowed values: {', '.join(sortable_columns)}"
        }
    sort_order = str(tool_input.get("sort_order") or "DESC").strip().upper()
    if sort_order not in ("ASC", "DESC"):
        return {
            "success": False,
            "error": "Invalid sort_order. Use 'ASC' or 'DESC'"
        }

    # Build WHERE clause based on filters (values are always bound parameters)
    where_clauses = []
    params = []
    for column in filter_columns:
        if column in tool_input:
            where_clauses.append(f"{column} = %s")
            params.append(tool_input[column])
    where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else ""

    # Build query; sort_by and sort_order were validated above
    query = f"""
        SELECT
            order_id, customer_name, customer_phone, customer_email,
            delivery_address, delivery_lat, delivery_lng,
            time_window_start, time_window_end,
            priority, weight_kg, volume_m3, special_instructions,
            status, assigned_driver_id,
            created_at, updated_at, delivered_at,
            order_value, payment_status,
            requires_signature, is_fragile, requires_cold_storage
        FROM orders
        {where_sql}
        ORDER BY {sort_by} {sort_order}
        LIMIT %s OFFSET %s
    """
    params.extend([limit, offset])

    try:
        results = execute_query(query, tuple(params))
        if not results:
            return {
                "success": True,
                "orders": [],
                "count": 0,
                "message": "No orders found matching criteria"
            }

        # Format orders for readability. Optional values are tested with
        # "is not None" so that a legitimate 0 is not reported as missing.
        orders = []
        for row in results:
            order = {
                "order_id": row['order_id'],
                "customer": {
                    "name": row['customer_name'],
                    "phone": row['customer_phone'],
                    "email": row['customer_email']
                },
                "delivery": {
                    "address": row['delivery_address'],
                    "latitude": float(row['delivery_lat']) if row['delivery_lat'] is not None else None,
                    "longitude": float(row['delivery_lng']) if row['delivery_lng'] is not None else None
                },
                "time_window": {
                    "start": str(row['time_window_start']) if row['time_window_start'] is not None else None,
                    "end": str(row['time_window_end']) if row['time_window_end'] is not None else None
                },
                "details": {
                    "priority": row['priority'],
                    "status": row['status'],
                    "weight_kg": float(row['weight_kg']) if row['weight_kg'] is not None else None,
                    "volume_m3": float(row['volume_m3']) if row['volume_m3'] is not None else None,
                    "special_instructions": row['special_instructions']
                },
                "flags": {
                    "requires_signature": row['requires_signature'],
                    "is_fragile": row['is_fragile'],
                    "requires_cold_storage": row['requires_cold_storage']
                },
                "payment": {
                    "order_value": float(row['order_value']) if row['order_value'] is not None else None,
                    "payment_status": row['payment_status']
                },
                "assigned_driver_id": row['assigned_driver_id'],
                "timestamps": {
                    "created_at": str(row['created_at']),
                    "updated_at": str(row['updated_at']) if row['updated_at'] is not None else None,
                    "delivered_at": str(row['delivered_at']) if row['delivered_at'] is not None else None
                }
            }
            orders.append(order)

        logger.info(f"Fetched {len(orders)} orders")
        return {
            "success": True,
            "orders": orders,
            "count": len(orders),
            "message": f"Retrieved {len(orders)} order(s)"
        }
    except Exception as e:
        logger.error(f"Database error fetching orders: {e}")
        return {
            "success": False,
            "error": f"Failed to fetch orders: {str(e)}"
        }
def handle_get_order_details(tool_input: dict) -> dict:
    """
    Execute get order details tool

    Looks up a single row of the ``orders`` table by ``order_id`` and
    reshapes it into a nested dict (customer, pickup, delivery, time window,
    details, flags, payment, timestamps).

    Args:
        tool_input: Dict with order_id

    Returns:
        On success: {"success": True, "order": {...}, "message": str}.
        When order_id is missing, the order does not exist, or the query
        fails: {"success": False, "error": str}.
    """
    order_id = tool_input.get("order_id")
    if not order_id:
        return {
            "success": False,
            "error": "order_id is required"
        }

    query = """
        SELECT
            order_id, customer_name, customer_phone, customer_email,
            pickup_address, pickup_lat, pickup_lng,
            delivery_address, delivery_lat, delivery_lng,
            time_window_start, time_window_end,
            priority, weight_kg, volume_m3, special_instructions,
            status, assigned_driver_id,
            created_at, updated_at, delivered_at,
            order_value, payment_status,
            requires_signature, is_fragile, requires_cold_storage
        FROM orders
        WHERE order_id = %s
    """

    try:
        results = execute_query(query, (order_id,))
        if not results:
            return {
                "success": False,
                "error": f"Order {order_id} not found"
            }

        row = results[0]

        # The pickup section is only included when a pickup address is stored
        pickup = None
        if row['pickup_address']:
            pickup = {
                "address": row['pickup_address'],
                "latitude": float(row['pickup_lat']) if row['pickup_lat'] is not None else None,
                "longitude": float(row['pickup_lng']) if row['pickup_lng'] is not None else None
            }

        # Optional values are tested with "is not None" so that a legitimate
        # 0 (coordinate, weight, order value) is not reported as missing.
        order = {
            "order_id": row['order_id'],
            "customer": {
                "name": row['customer_name'],
                "phone": row['customer_phone'],
                "email": row['customer_email']
            },
            "pickup": pickup,
            "delivery": {
                "address": row['delivery_address'],
                "latitude": float(row['delivery_lat']) if row['delivery_lat'] is not None else None,
                "longitude": float(row['delivery_lng']) if row['delivery_lng'] is not None else None
            },
            "time_window": {
                "start": str(row['time_window_start']) if row['time_window_start'] is not None else None,
                "end": str(row['time_window_end']) if row['time_window_end'] is not None else None
            },
            "details": {
                "priority": row['priority'],
                "status": row['status'],
                "weight_kg": float(row['weight_kg']) if row['weight_kg'] is not None else None,
                "volume_m3": float(row['volume_m3']) if row['volume_m3'] is not None else None,
                "special_instructions": row['special_instructions']
            },
            "flags": {
                "requires_signature": row['requires_signature'],
                "is_fragile": row['is_fragile'],
                "requires_cold_storage": row['requires_cold_storage']
            },
            "payment": {
                "order_value": float(row['order_value']) if row['order_value'] is not None else None,
                "payment_status": row['payment_status']
            },
            "assigned_driver_id": row['assigned_driver_id'],
            "timestamps": {
                "created_at": str(row['created_at']),
                "updated_at": str(row['updated_at']) if row['updated_at'] is not None else None,
                "delivered_at": str(row['delivered_at']) if row['delivered_at'] is not None else None
            }
        }

        logger.info(f"Retrieved details for order: {order_id}")
        return {
            "success": True,
            "order": order,
            "message": f"Order {order_id} details retrieved"
        }
    except Exception as e:
        logger.error(f"Database error getting order details: {e}")
        return {
            "success": False,
            "error": f"Failed to get order details: {str(e)}"
        }
def handle_search_orders(tool_input: dict) -> dict:
    """
    Execute search orders tool

    Runs a case-insensitive substring match (ILIKE) of the search term
    against order_id, customer_name, customer_email and customer_phone,
    returning at most 50 orders, newest first.

    Args:
        tool_input: Dict with search_term

    Returns:
        On success: {"success": True, "orders": [...], "count": int,
        "message": str}. When search_term is missing/blank or the query
        fails: {"success": False, "error": str}.
    """
    # The term may arrive as an explicit null or as a non-string (e.g. a
    # numeric phone fragment); normalise before stripping so neither crashes.
    raw_term = tool_input.get("search_term")
    search_term = "" if raw_term is None else str(raw_term).strip()
    if not search_term:
        return {
            "success": False,
            "error": "search_term is required"
        }

    query = """
        SELECT
            order_id, customer_name, customer_phone, customer_email,
            delivery_address, priority, status, created_at
        FROM orders
        WHERE
            order_id ILIKE %s OR
            customer_name ILIKE %s OR
            customer_email ILIKE %s OR
            customer_phone ILIKE %s
        ORDER BY created_at DESC
        LIMIT 50
    """
    search_pattern = f"%{search_term}%"
    # One bound parameter per ILIKE placeholder
    params = (search_pattern,) * 4

    try:
        results = execute_query(query, params)
        if not results:
            return {
                "success": True,
                "orders": [],
                "count": 0,
                "message": f"No orders found matching '{search_term}'"
            }

        orders = [
            {
                "order_id": row['order_id'],
                "customer_name": row['customer_name'],
                "customer_phone": row['customer_phone'],
                "customer_email": row['customer_email'],
                "delivery_address": row['delivery_address'],
                "priority": row['priority'],
                "status": row['status'],
                "created_at": str(row['created_at'])
            }
            for row in results
        ]

        logger.info(f"Search '{search_term}' found {len(orders)} orders")
        return {
            "success": True,
            "orders": orders,
            "count": len(orders),
            "message": f"Found {len(orders)} order(s) matching '{search_term}'"
        }
    except Exception as e:
        logger.error(f"Database error searching orders: {e}")
        return {
            "success": False,
            "error": f"Failed to search orders: {str(e)}"
        }
def handle_get_incomplete_orders(tool_input: dict) -> dict:
    """
    Execute get incomplete orders tool

    Selects orders whose status is pending, assigned or in_transit, ordered
    by priority (urgent, express, standard) and then by the earliest
    time_window_end.

    Args:
        tool_input: Dict with optional limit (default 20, capped at 100)

    Returns:
        On success: {"success": True, "orders": [...], "count": int,
        "message": str}. On an invalid limit or a database failure:
        {"success": False, "error": str}.
    """
    # An explicit null falls back to the default; anything that is not an
    # integer is rejected here instead of raising outside the try block.
    raw_limit = tool_input.get("limit")
    try:
        limit = min(int(20 if raw_limit is None else raw_limit), 100)
    except (TypeError, ValueError, OverflowError):
        return {
            "success": False,
            "error": "limit must be an integer"
        }
    if limit < 0:
        return {
            "success": False,
            "error": "limit must not be negative"
        }

    query = """
        SELECT
            order_id, customer_name, delivery_address,
            priority, status, time_window_end, created_at,
            assigned_driver_id
        FROM orders
        WHERE status IN ('pending', 'assigned', 'in_transit')
        ORDER BY
            CASE priority
                WHEN 'urgent' THEN 1
                WHEN 'express' THEN 2
                WHEN 'standard' THEN 3
            END,
            time_window_end ASC
        LIMIT %s
    """

    try:
        results = execute_query(query, (limit,))
        if not results:
            return {
                "success": True,
                "orders": [],
                "count": 0,
                "message": "No incomplete orders found"
            }

        orders = [
            {
                "order_id": row['order_id'],
                "customer_name": row['customer_name'],
                "delivery_address": row['delivery_address'],
                "priority": row['priority'],
                "status": row['status'],
                "time_window_end": str(row['time_window_end']) if row['time_window_end'] else None,
                "created_at": str(row['created_at']),
                "assigned_driver_id": row['assigned_driver_id']
            }
            for row in results
        ]

        logger.info(f"Retrieved {len(orders)} incomplete orders")
        return {
            "success": True,
            "orders": orders,
            "count": len(orders),
            "message": f"Found {len(orders)} incomplete order(s)"
        }
    except Exception as e:
        logger.error(f"Database error getting incomplete orders: {e}")
        return {
            "success": False,
            "error": f"Failed to get incomplete orders: {str(e)}"
        }
def handle_count_drivers(tool_input: dict) -> dict:
    """
    Execute count drivers tool

    Counts rows of the ``drivers`` table, optionally restricted by status
    and/or vehicle_type, and also reports the count grouped by status and
    by vehicle type under the same restriction.

    Args:
        tool_input: Dict with optional filter fields (status, vehicle_type)

    Returns:
        Driver count result with breakdown, or a failure dict with an
        "error" message when a query raises.
    """
    # Collect one equality condition per supplied filter
    conditions = []
    values = []
    for field in ("status", "vehicle_type"):
        if field in tool_input:
            conditions.append(field + " = %s")
            values.append(tool_input[field])

    if conditions:
        where_sql = " WHERE " + " AND ".join(conditions)
    else:
        where_sql = ""

    # The same bound values are reused by all three queries
    query_args = tuple(values) if values else None

    # Total count query
    count_query = f"SELECT COUNT(*) as total FROM drivers{where_sql}"

    # Breakdown by status query
    status_query = f"""
        SELECT status, COUNT(*) as count
        FROM drivers{where_sql}
        GROUP BY status
        ORDER BY count DESC
    """

    # Breakdown by vehicle type query
    vehicle_query = f"""
        SELECT vehicle_type, COUNT(*) as count
        FROM drivers{where_sql}
        GROUP BY vehicle_type
        ORDER BY count DESC
    """

    try:
        # Execute queries
        total_rows = execute_query(count_query, query_args)
        if total_rows:
            total = total_rows[0]['total']
        else:
            total = 0
        status_rows = execute_query(status_query, query_args)
        vehicle_rows = execute_query(vehicle_query, query_args)

        # Format breakdown
        status_breakdown = {}
        for entry in status_rows or []:
            status_breakdown[entry['status']] = entry['count']

        # Rows without a vehicle type are left out of the breakdown
        vehicle_breakdown = {}
        for entry in vehicle_rows or []:
            if entry['vehicle_type']:
                vehicle_breakdown[entry['vehicle_type']] = entry['count']

        logger.info(f"Counted drivers: {total} total")
        return {
            "success": True,
            "total": total,
            "status_breakdown": status_breakdown,
            "vehicle_breakdown": vehicle_breakdown,
            "message": f"Found {total} driver(s)"
        }
    except Exception as e:
        logger.error(f"Database error counting drivers: {e}")
        return {
            "success": False,
            "error": f"Failed to count drivers: {str(e)}"
        }
def handle_fetch_drivers(tool_input: dict) -> dict:
    """
    Execute fetch drivers tool

    Builds a filtered, sorted, paginated SELECT against the ``drivers``
    table and reshapes every returned row into a nested dict.

    Args:
        tool_input: Dict with filter, pagination, and sorting options.
            Filters (optional, matched by equality): status, vehicle_type.
            Pagination: limit (default 10, capped at 100), offset (default 0).
            Sorting: sort_by (default "name", must be one of the selected
            columns), sort_order ("ASC" or "DESC", default "ASC").

    Returns:
        On success: {"success": True, "drivers": [...], "count": int,
        "message": str}. On invalid input or a database failure:
        {"success": False, "error": str}.
    """
    import json

    # Column names cannot be passed as bound query parameters, so sort_by is
    # checked against this whitelist before being interpolated into the SQL.
    sortable_columns = (
        "driver_id", "name", "phone", "email",
        "current_lat", "current_lng", "last_location_update",
        "status", "vehicle_type", "vehicle_plate",
        "capacity_kg", "capacity_m3", "skills",
        "created_at", "updated_at",
    )

    # Extract pagination; an explicit null falls back to the default value
    raw_limit = tool_input.get("limit")
    raw_offset = tool_input.get("offset")
    try:
        limit = min(int(10 if raw_limit is None else raw_limit), 100)  # Cap at 100
        offset = int(0 if raw_offset is None else raw_offset)
    except (TypeError, ValueError, OverflowError):
        return {
            "success": False,
            "error": "limit and offset must be integers"
        }
    if limit < 0 or offset < 0:
        return {
            "success": False,
            "error": "limit and offset must not be negative"
        }

    # Extract and validate sorting
    sort_by = tool_input.get("sort_by") or "name"
    if sort_by not in sortable_columns:
        return {
            "success": False,
            "error": f"Invalid sort_by '{sort_by}'. Allowed values: {', '.join(sortable_columns)}"
        }
    sort_order = str(tool_input.get("sort_order") or "ASC").strip().upper()
    if sort_order not in ("ASC", "DESC"):
        return {
            "success": False,
            "error": "Invalid sort_order. Use 'ASC' or 'DESC'"
        }

    # Build WHERE clause based on filters (values are always bound parameters)
    where_clauses = []
    params = []
    for column in ("status", "vehicle_type"):
        if column in tool_input:
            where_clauses.append(f"{column} = %s")
            params.append(tool_input[column])
    where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else ""

    # Build query; sort_by and sort_order were validated above
    query = f"""
        SELECT
            driver_id, name, phone, email,
            current_lat, current_lng, last_location_update,
            status, vehicle_type, vehicle_plate,
            capacity_kg, capacity_m3, skills,
            created_at, updated_at
        FROM drivers
        {where_sql}
        ORDER BY {sort_by} {sort_order}
        LIMIT %s OFFSET %s
    """
    params.extend([limit, offset])

    try:
        results = execute_query(query, tuple(params))
        if not results:
            return {
                "success": True,
                "drivers": [],
                "count": 0,
                "message": "No drivers found matching criteria"
            }

        # Format drivers for readability
        drivers = []
        for row in results:
            # Skills may arrive as a JSON string or as an already-decoded
            # value; text that is not valid JSON degrades to an empty list.
            skills = []
            if row['skills']:
                try:
                    skills = json.loads(row['skills']) if isinstance(row['skills'], str) else row['skills']
                except (TypeError, ValueError):
                    skills = []

            # Optional values are tested with "is not None" so that a
            # legitimate 0 is not reported as missing.
            driver = {
                "driver_id": row['driver_id'],
                "name": row['name'],
                "contact": {
                    "phone": row['phone'],
                    "email": row['email']
                },
                "location": {
                    "latitude": float(row['current_lat']) if row['current_lat'] is not None else None,
                    "longitude": float(row['current_lng']) if row['current_lng'] is not None else None,
                    "last_update": str(row['last_location_update']) if row['last_location_update'] is not None else None
                },
                "status": row['status'],
                "vehicle": {
                    "type": row['vehicle_type'],
                    "plate": row['vehicle_plate'],
                    "capacity_kg": float(row['capacity_kg']) if row['capacity_kg'] is not None else None,
                    "capacity_m3": float(row['capacity_m3']) if row['capacity_m3'] is not None else None
                },
                "skills": skills,
                "timestamps": {
                    "created_at": str(row['created_at']),
                    "updated_at": str(row['updated_at']) if row['updated_at'] is not None else None
                }
            }
            drivers.append(driver)

        logger.info(f"Fetched {len(drivers)} drivers")
        return {
            "success": True,
            "drivers": drivers,
            "count": len(drivers),
            "message": f"Retrieved {len(drivers)} driver(s)"
        }
    except Exception as e:
        logger.error(f"Database error fetching drivers: {e}")
        return {
            "success": False,
            "error": f"Failed to fetch drivers: {str(e)}"
        }
def handle_get_driver_details(tool_input: dict) -> dict:
    """
    Execute get driver details tool

    Looks up a single row of the ``drivers`` table by ``driver_id``. When
    the driver has coordinates, they are reverse geocoded (best effort) and
    the resulting address is included in the "location" section.

    Args:
        tool_input: Dict with driver_id

    Returns:
        On success: {"success": True, "driver": {...}, "message": str}.
        When driver_id is missing, the driver does not exist, or the query
        fails: {"success": False, "error": str}.
    """
    import json

    driver_id = tool_input.get("driver_id")
    if not driver_id:
        return {
            "success": False,
            "error": "driver_id is required"
        }

    query = """
        SELECT
            driver_id, name, phone, email,
            current_lat, current_lng, last_location_update,
            status, vehicle_type, vehicle_plate,
            capacity_kg, capacity_m3, skills,
            created_at, updated_at
        FROM drivers
        WHERE driver_id = %s
    """

    try:
        results = execute_query(query, (driver_id,))
        if not results:
            return {
                "success": False,
                "error": f"Driver {driver_id} not found"
            }

        row = results[0]

        # Skills may arrive as a JSON string or as an already-decoded value;
        # text that is not valid JSON degrades to an empty list.
        skills = []
        if row['skills']:
            try:
                skills = json.loads(row['skills']) if isinstance(row['skills'], str) else row['skills']
            except (TypeError, ValueError):
                skills = []

        # "is not None" keeps a legitimate 0 coordinate from being treated
        # as a missing one.
        latitude = float(row['current_lat']) if row['current_lat'] is not None else None
        longitude = float(row['current_lng']) if row['current_lng'] is not None else None

        # Reverse geocode location to get address. This is best effort: any
        # failure is logged and the address is simply left out.
        location_address = None
        if latitude is not None and longitude is not None:
            try:
                # Reuse the module-level geocoding service rather than
                # constructing a new one on every call.
                reverse_result = geocoding_service.reverse_geocode(latitude, longitude)
                location_address = reverse_result.get('formatted_address', None)
                logger.info(f"Reverse geocoded driver location: {location_address}")
            except Exception as e:
                logger.warning(f"Failed to reverse geocode driver location: {e}")
                location_address = None

        driver = {
            "driver_id": row['driver_id'],
            "name": row['name'],
            "contact": {
                "phone": row['phone'],
                "email": row['email']
            },
            "location": {
                "latitude": latitude,
                "longitude": longitude,
                "address": location_address,
                "last_update": str(row['last_location_update']) if row['last_location_update'] is not None else None
            },
            "status": row['status'],
            "vehicle": {
                "type": row['vehicle_type'],
                "plate": row['vehicle_plate'],
                "capacity_kg": float(row['capacity_kg']) if row['capacity_kg'] is not None else None,
                "capacity_m3": float(row['capacity_m3']) if row['capacity_m3'] is not None else None
            },
            "skills": skills,
            "timestamps": {
                "created_at": str(row['created_at']),
                "updated_at": str(row['updated_at']) if row['updated_at'] is not None else None
            }
        }

        logger.info(f"Retrieved details for driver: {driver_id}")
        return {
            "success": True,
            "driver": driver,
            "message": f"Driver {driver_id} details retrieved"
        }
    except Exception as e:
        logger.error(f"Database error getting driver details: {e}")
        return {
            "success": False,
            "error": f"Failed to get driver details: {str(e)}"
        }
def handle_search_drivers(tool_input: dict) -> dict:
    """
    Execute search drivers tool

    Runs a case-insensitive substring match (ILIKE) of the search term
    against driver_id, name, email, phone and vehicle_plate, returning at
    most 50 drivers ordered by name.

    Args:
        tool_input: Dict with search_term

    Returns:
        On success: {"success": True, "drivers": [...], "count": int,
        "message": str}. When search_term is missing/blank or the query
        fails: {"success": False, "error": str}.
    """
    # The term may arrive as an explicit null or as a non-string (e.g. a
    # numeric phone fragment); normalise before stripping so neither crashes.
    raw_term = tool_input.get("search_term")
    search_term = "" if raw_term is None else str(raw_term).strip()
    if not search_term:
        return {
            "success": False,
            "error": "search_term is required"
        }

    query = """
        SELECT
            driver_id, name, phone, email,
            vehicle_type, vehicle_plate, status, created_at
        FROM drivers
        WHERE
            driver_id ILIKE %s OR
            name ILIKE %s OR
            email ILIKE %s OR
            phone ILIKE %s OR
            vehicle_plate ILIKE %s
        ORDER BY name ASC
        LIMIT 50
    """
    search_pattern = f"%{search_term}%"
    # One bound parameter per ILIKE placeholder
    params = (search_pattern,) * 5

    try:
        results = execute_query(query, params)
        if not results:
            return {
                "success": True,
                "drivers": [],
                "count": 0,
                "message": f"No drivers found matching '{search_term}'"
            }

        drivers = [
            {
                "driver_id": row['driver_id'],
                "name": row['name'],
                "phone": row['phone'],
                "email": row['email'],
                "vehicle_type": row['vehicle_type'],
                "vehicle_plate": row['vehicle_plate'],
                "status": row['status'],
                "created_at": str(row['created_at'])
            }
            for row in results
        ]

        logger.info(f"Search '{search_term}' found {len(drivers)} drivers")
        return {
            "success": True,
            "drivers": drivers,
            "count": len(drivers),
            "message": f"Found {len(drivers)} driver(s) matching '{search_term}'"
        }
    except Exception as e:
        logger.error(f"Database error searching drivers: {e}")
        return {
            "success": False,
            "error": f"Failed to search drivers: {str(e)}"
        }
def handle_get_available_drivers(tool_input: dict) -> dict:
    """
    Execute get available drivers tool

    Selects drivers whose status is 'active' or 'offline', listing active
    drivers first and ordering by name within each status.

    Args:
        tool_input: Dict with optional limit (default 20, capped at 100)

    Returns:
        On success: {"success": True, "drivers": [...], "count": int,
        "message": str}. On an invalid limit or a database failure:
        {"success": False, "error": str}.
    """
    import json

    # An explicit null falls back to the default; anything that is not an
    # integer is rejected here instead of raising outside the try block.
    raw_limit = tool_input.get("limit")
    try:
        limit = min(int(20 if raw_limit is None else raw_limit), 100)
    except (TypeError, ValueError, OverflowError):
        return {
            "success": False,
            "error": "limit must be an integer"
        }
    if limit < 0:
        return {
            "success": False,
            "error": "limit must not be negative"
        }

    query = """
        SELECT
            driver_id, name, phone, vehicle_type, vehicle_plate,
            current_lat, current_lng, last_location_update,
            status, capacity_kg, capacity_m3, skills
        FROM drivers
        WHERE status IN ('active', 'offline')
        ORDER BY
            CASE status
                WHEN 'active' THEN 1
                WHEN 'offline' THEN 2
            END,
            name ASC
        LIMIT %s
    """

    try:
        results = execute_query(query, (limit,))
        if not results:
            return {
                "success": True,
                "drivers": [],
                "count": 0,
                "message": "No available drivers found"
            }

        drivers = []
        for row in results:
            # Skills may arrive as a JSON string or as an already-decoded
            # value; text that is not valid JSON degrades to an empty list.
            skills = []
            if row['skills']:
                try:
                    skills = json.loads(row['skills']) if isinstance(row['skills'], str) else row['skills']
                except (TypeError, ValueError):
                    skills = []

            # Optional values are tested with "is not None" so that a
            # legitimate 0 is not reported as missing.
            drivers.append({
                "driver_id": row['driver_id'],
                "name": row['name'],
                "phone": row['phone'],
                "location": {
                    "latitude": float(row['current_lat']) if row['current_lat'] is not None else None,
                    "longitude": float(row['current_lng']) if row['current_lng'] is not None else None,
                    "last_update": str(row['last_location_update']) if row['last_location_update'] is not None else None
                },
                "status": row['status'],
                "vehicle": {
                    "type": row['vehicle_type'],
                    "plate": row['vehicle_plate'],
                    "capacity_kg": float(row['capacity_kg']) if row['capacity_kg'] is not None else None,
                    "capacity_m3": float(row['capacity_m3']) if row['capacity_m3'] is not None else None
                },
                "skills": skills
            })

        logger.info(f"Retrieved {len(drivers)} available drivers")
        return {
            "success": True,
            "drivers": drivers,
            "count": len(drivers),
            "message": f"Found {len(drivers)} available driver(s)"
        }
    except Exception as e:
        logger.error(f"Database error getting available drivers: {e}")
        return {
            "success": False,
            "error": f"Failed to get available drivers: {str(e)}"
        }
def get_tools_list() -> list:
    """Return the "name" of every entry in TOOLS_SCHEMA, in schema order"""
    names = []
    for entry in TOOLS_SCHEMA:
        names.append(entry["name"])
    return names
def get_tool_description(tool_name: str) -> str:
    """Return the description of the first TOOLS_SCHEMA entry named tool_name, or "" if there is none"""
    matching_descriptions = (
        entry["description"]
        for entry in TOOLS_SCHEMA
        if entry["name"] == tool_name
    )
    # The generator is lazy, so only entries up to the first match are inspected
    return next(matching_descriptions, "")