Spaces:
Sleeping
Sleeping
Seth McKnight
feat: Add specific error handling for LLM configuration in health checks (#55)
d14a473
import os

# Import type annotations
from typing import Any, Dict

from dotenv import load_dotenv
from flask import Flask, jsonify, render_template, request

# Load environment variables from .env file
load_dotenv()

# Proactively disable ChromaDB telemetry via environment variables so
# the library doesn't attempt to call external PostHog telemetry endpoints.
# This helps avoid noisy errors in server logs (Render may not expose
# the expected device files or telemetry endpoints).
# setdefault() is used so an operator can still override these explicitly.
os.environ.setdefault("ANONYMIZED_TELEMETRY", "False")
os.environ.setdefault("CHROMA_TELEMETRY", "False")

# Attempt to configure chromadb and monkeypatch any telemetry capture
# functions to be no-ops. Some chromadb versions call posthog.capture
# with a different signature which can raise exceptions during runtime
# (observed on Render as: capture() takes 1 positional argument but 3 were given).
try:
    import chromadb

    try:
        chromadb.configure(anonymized_telemetry=False)  # type: ignore
    except Exception:
        # Non-fatal: continue and still try to neutralize telemetry functions
        pass

    # Defensive monkeypatch: if the telemetry client exists, replace capture
    # with a safe no-op that accepts any args/kwargs to avoid signature issues.
    try:
        from chromadb.telemetry.product import posthog as _posthog  # type: ignore

        # Replace module-level capture and Posthog.capture if present
        if hasattr(_posthog, "capture"):
            setattr(_posthog, "capture", lambda *args, **kwargs: None)
        if hasattr(_posthog, "Posthog") and hasattr(_posthog.Posthog, "capture"):
            setattr(_posthog.Posthog, "capture", lambda *args, **kwargs: None)
    except Exception:
        # If telemetry internals aren't present or change across versions, ignore
        pass
except Exception:
    # chromadb not installed or import failed; continue without telemetry
    pass

app = Flask(__name__)
def index():
    """Serve the chat UI page.

    NOTE(review): no @app.route decorator is visible in this chunk — confirm
    the route is registered elsewhere (e.g. via app.add_url_rule).
    """
    return render_template("chat.html")
def health():
    """Basic liveness probe; always answers OK with HTTP 200."""
    payload = {"status": "ok"}
    return jsonify(payload), 200
def ingest():
    """Trigger document ingestion, optionally storing embeddings.

    Accepts an optional JSON body: {"store_embeddings": bool} (default True).
    Responds with a summary of processed chunks/files and, when applicable,
    the list of files that failed to process.
    """
    try:
        # Deferred imports: a broken pipeline dependency surfaces as a JSON
        # 500 on this endpoint instead of preventing app startup.
        from src.config import (
            CORPUS_DIRECTORY,
            DEFAULT_CHUNK_SIZE,
            DEFAULT_OVERLAP,
            RANDOM_SEED,
        )
        from src.ingestion.ingestion_pipeline import IngestionPipeline

        # Optional request parameters (body may be absent entirely)
        payload: Dict[str, Any] = request.get_json() if request.is_json else {}
        store_embeddings: bool = bool(payload.get("store_embeddings", True))

        pipeline = IngestionPipeline(
            chunk_size=DEFAULT_CHUNK_SIZE,
            overlap=DEFAULT_OVERLAP,
            seed=RANDOM_SEED,
            store_embeddings=store_embeddings,
        )
        result = pipeline.process_directory_with_embeddings(CORPUS_DIRECTORY)

        chunks = result["chunks_processed"]
        files = result["files_processed"]
        body: Dict[str, Any] = {
            "status": result["status"],
            "chunks_processed": chunks,
            "files_processed": files,
            "embeddings_stored": result["embeddings_stored"],
            "store_embeddings": result["store_embeddings"],
            "message": (
                f"Successfully processed {chunks} chunks "
                f"from {files} files"
            ),
        }

        # Surface partial failures without failing the whole request
        failed = result["failed_files"]
        if failed:
            body["failed_files"] = failed
            body["warnings"] = f"{len(failed)} files failed to process"
        return jsonify(body)
    except Exception as e:
        return jsonify({"status": "error", "message": str(e)}), 500
def search():
    """
    Endpoint to perform semantic search on ingested documents.

    Accepts JSON requests: {"query": str, "top_k": int = 5,
    "threshold": float = 0.3}. Returns semantically similar document
    chunks above the similarity threshold.

    Returns 400 on validation/value errors, 500 on unexpected failures.
    """
    try:
        # Validate request contains JSON data
        if not request.is_json:
            return (
                jsonify(
                    {
                        "status": "error",
                        "message": "Content-Type must be application/json",
                    }
                ),
                400,
            )
        data = request.get_json()

        # Validate required query parameter
        query = data.get("query")
        if query is None:
            return (
                jsonify({"status": "error", "message": "Query parameter is required"}),
                400,
            )
        if not isinstance(query, str) or not query.strip():
            return (
                jsonify(
                    {"status": "error", "message": "Query must be a non-empty string"}
                ),
                400,
            )

        # Extract optional parameters with defaults
        top_k = data.get("top_k", 5)
        threshold = data.get("threshold", 0.3)

        # Validate parameters. bool is a subclass of int in Python, so it is
        # excluded explicitly — otherwise JSON `true` would pass as top_k=1
        # or threshold=1.0 instead of being rejected.
        if isinstance(top_k, bool) or not isinstance(top_k, int) or top_k <= 0:
            return (
                jsonify(
                    {"status": "error", "message": "top_k must be a positive integer"}
                ),
                400,
            )
        if (
            isinstance(threshold, bool)
            or not isinstance(threshold, (int, float))
            or not (0.0 <= threshold <= 1.0)
        ):
            return (
                jsonify(
                    {
                        "status": "error",
                        "message": "threshold must be a number between 0 and 1",
                    }
                ),
                400,
            )

        # Initialize search components lazily so app startup never depends
        # on the vector store being reachable.
        from src.config import COLLECTION_NAME, VECTOR_DB_PERSIST_PATH
        from src.embedding.embedding_service import EmbeddingService
        from src.search.search_service import SearchService
        from src.vector_store.vector_db import VectorDatabase

        vector_db = VectorDatabase(VECTOR_DB_PERSIST_PATH, COLLECTION_NAME)
        embedding_service = EmbeddingService()
        search_service = SearchService(vector_db, embedding_service)

        # Perform search
        results = search_service.search(
            query=query.strip(), top_k=top_k, threshold=threshold
        )

        # Format response
        response: Dict[str, Any] = {
            "status": "success",
            "query": query.strip(),
            "results_count": len(results),
            "results": results,
        }
        return jsonify(response)
    except ValueError as e:
        # Value problems surfaced by the underlying services map to 400
        return jsonify({"status": "error", "message": str(e)}), 400
    except Exception as e:
        return jsonify({"status": "error", "message": f"Search failed: {str(e)}"}), 500
def get_query_suggestions():
    """Return suggested queries for the chat UI.

    The list is static for now, derived from the document corpus topics;
    a real implementation might build it from analytics or metadata.
    """
    try:
        canned_suggestions = [
            "What is our remote work policy?",
            "How do I request time off?",
            "What are our information security guidelines?",
            "How does our expense reimbursement work?",
            "Tell me about our diversity and inclusion policy",
            "What's the process for employee performance reviews?",
            "How do I report an emergency at work?",
            "What professional development opportunities are available?",
        ]
        return jsonify({"status": "success", "suggestions": canned_suggestions})
    except Exception as e:
        error_body = {
            "status": "error",
            "message": f"Failed to retrieve suggestions: {str(e)}",
        }
        return jsonify(error_body), 500
def submit_feedback():
    """Accept user feedback on a chat message.

    Requires a JSON body with conversation_id, message_id and feedback_type.
    Feedback is currently only logged; a production system would persist it.
    """
    try:
        payload = request.json
        if not payload:
            return (
                jsonify({"status": "error", "message": "No feedback data provided"}),
                400,
            )

        # Reject on the first required field that is absent
        missing = [
            name
            for name in ("conversation_id", "message_id", "feedback_type")
            if name not in payload
        ]
        if missing:
            return (
                jsonify(
                    {
                        "status": "error",
                        "message": f"Missing required field: {missing[0]}",
                    }
                ),
                400,
            )

        # Log-only for now; no database persistence yet
        print(f"Received feedback: {payload}")
        return jsonify(
            {
                "status": "success",
                "message": "Feedback received",
                "feedback": payload,
            }
        )
    except Exception as e:
        print(f"Error processing feedback: {str(e)}")
        return (
            jsonify(
                {"status": "error", "message": f"Error processing feedback: {str(e)}"}
            ),
            500,
        )
def get_source_document(source_id: str):
    """Return mock content and metadata for a cited source document.

    Looks the ID up in a hardcoded map standing in for the vector store;
    unknown IDs yield a 404.
    """
    try:
        from typing import Union

        # Hardcoded stand-in for vector-store retrieval
        source_map: Dict[str, Dict[str, Union[str, Dict[str, str]]]] = {
            "remote_work": {
                "content": (
                    "# Remote Work Policy\n\n"
                    "Employees may work remotely up to 3 days per week with manager"
                    " approval."
                ),
                "metadata": {
                    "filename": "remote_work_policy.md",
                    "last_updated": "2025-09-15",
                },
            },
            "pto": {
                "content": (
                    "# PTO Policy\n\n"
                    "Full-time employees receive 20 days of PTO annually, accrued"
                    " monthly."
                ),
                "metadata": {"filename": "pto_policy.md", "last_updated": "2025-08-20"},
            },
            "security": {
                "content": (
                    "# Information Security Policy\n\n"
                    "All employees must use company-approved devices and software"
                    " for work tasks."
                ),
                "metadata": {
                    "filename": "information_security_policy.md",
                    "last_updated": "2025-10-01",
                },
            },
            "expense": {
                "content": (
                    "# Expense Reimbursement\n\n"
                    "Submit all expense reports within 30 days of incurring"
                    " the expense."
                ),
                "metadata": {
                    "filename": "expense_reimbursement_policy.md",
                    "last_updated": "2025-07-10",
                },
            },
        }

        entry = source_map.get(source_id)
        if entry is None:
            # Unknown ID: respond with a not-found error
            return (
                jsonify(
                    {
                        "status": "error",
                        "message": f"Source document with ID {source_id} not found",
                    }
                ),
                404,
            )
        return jsonify(
            {
                "status": "success",
                "source_id": source_id,
                "content": entry["content"],
                "metadata": entry["metadata"],
            }
        )
    except Exception as e:
        return (
            jsonify(
                {
                    "status": "error",
                    "message": f"Failed to retrieve source document: {str(e)}",
                }
            ),
            500,
        )
def chat():
    """Conversational RAG endpoint.

    Accepts a JSON body with a user "message" (plus optional
    conversation_id, include_sources, include_debug) and returns an
    AI-generated answer grounded in corporate policy documents.
    Responds 400 on bad input, 503 when the LLM is unconfigured,
    500 on other failures.
    """

    def _bad_request(msg):
        # Uniform 400 response for request-validation failures
        return jsonify({"status": "error", "message": msg}), 400

    try:
        if not request.is_json:
            return _bad_request("Content-Type must be application/json")

        payload = request.get_json()

        user_message = payload.get("message")
        if user_message is None:
            return _bad_request("message parameter is required")
        if not isinstance(user_message, str) or not user_message.strip():
            return _bad_request("message must be a non-empty string")

        conversation_id = payload.get("conversation_id")
        include_sources = payload.get("include_sources", True)
        include_debug = payload.get("include_debug", False)

        # Build the RAG pipeline; initialization errors are reported
        # separately from answer-generation errors.
        try:
            from src.config import COLLECTION_NAME, VECTOR_DB_PERSIST_PATH
            from src.embedding.embedding_service import EmbeddingService
            from src.llm.llm_service import LLMService
            from src.rag.rag_pipeline import RAGPipeline
            from src.rag.response_formatter import ResponseFormatter
            from src.search.search_service import SearchService
            from src.vector_store.vector_db import VectorDatabase

            vector_db = VectorDatabase(VECTOR_DB_PERSIST_PATH, COLLECTION_NAME)
            embedding_service = EmbeddingService()
            search_service = SearchService(vector_db, embedding_service)
            # LLM credentials come from the environment
            llm_service = LLMService.from_environment()
            rag_pipeline = RAGPipeline(search_service, llm_service)
            formatter = ResponseFormatter()
        except ValueError as e:
            # Missing/invalid LLM configuration → service unavailable
            return (
                jsonify(
                    {
                        "status": "error",
                        "message": f"LLM service configuration error: {str(e)}",
                        "details": (
                            "Please ensure OPENROUTER_API_KEY or GROQ_API_KEY "
                            "environment variables are set"
                        ),
                    }
                ),
                503,
            )
        except Exception as e:
            return (
                jsonify(
                    {
                        "status": "error",
                        "message": f"Service initialization failed: {str(e)}",
                    }
                ),
                500,
            )

        # Generate and format the answer
        rag_response = rag_pipeline.generate_answer(user_message.strip())
        if include_sources:
            formatted = formatter.format_api_response(rag_response, include_debug)
        else:
            formatted = formatter.format_chat_response(
                rag_response, conversation_id, include_sources=False
            )
        return jsonify(formatted)
    except Exception as e:
        return (
            jsonify({"status": "error", "message": f"Chat request failed: {str(e)}"}),
            500,
        )
def get_conversations():
    """List conversations for the current user (mock data).

    A production system would read these from a database; for now the
    endpoint serves a fixed set of IDs, titles, timestamps and previews.
    """
    rows = [
        ("conv-123456", "HR Policy Questions", "2025-10-15T14:30:00Z",
         "What is our remote work policy?"),
        ("conv-789012", "Project Planning Queries", "2025-10-14T09:15:00Z",
         "How do we handle project kickoffs?"),
        ("conv-345678", "Security Compliance", "2025-10-12T16:45:00Z",
         "What are our password requirements?"),
    ]
    conversations = [
        {"id": cid, "title": title, "timestamp": ts, "preview": preview}
        for cid, title, ts, preview in rows
    ]
    return jsonify({"status": "success", "conversations": conversations})
def get_conversation(conversation_id: str):
    """Return the full message history of one conversation (mock data).

    Known IDs map to canned transcripts; unknown IDs yield a 404.
    """
    try:
        from typing import List, Union

        # Canned transcripts keyed by conversation ID; a production system
        # would load these from a database.
        transcripts: Dict[str, List[Dict[str, Union[str, List[Dict[str, str]]]]]] = {
            "conv-123456": [
                {
                    "id": "msg-111",
                    "role": "user",
                    "content": "What is our remote work policy?",
                    "timestamp": "2025-10-15T14:30:00Z",
                },
                {
                    "id": "msg-112",
                    "role": "assistant",
                    "content": (
                        "According to our remote work policy, employees may work "
                        "up to 3 days per week with manager approval. You need to "
                        "coordinate with your team to ensure adequate in-office "
                        "coverage."
                    ),
                    "timestamp": "2025-10-15T14:30:15Z",
                    "sources": [{"id": "remote_work", "title": "Remote Work Policy"}],
                },
            ],
            "conv-789012": [
                {
                    "id": "msg-221",
                    "role": "user",
                    "content": "How do we handle project kickoffs?",
                    "timestamp": "2025-10-14T09:15:00Z",
                },
                {
                    "id": "msg-222",
                    "role": "assistant",
                    "content": (
                        "Our project kickoff procedure includes a meeting with all "
                        "stakeholders, defining project scope and goals, establishing "
                        "communication channels, and setting up the initial project "
                        "timeline."
                    ),
                    "timestamp": "2025-10-14T09:15:30Z",
                    "sources": [
                        {"id": "project_kickoff", "title": "Project Kickoff Procedure"}
                    ],
                },
            ],
            "conv-345678": [
                {
                    "id": "msg-331",
                    "role": "user",
                    "content": "What are our password requirements?",
                    "timestamp": "2025-10-12T16:45:00Z",
                },
                {
                    "id": "msg-332",
                    "role": "assistant",
                    "content": (
                        "Our security policy requires passwords to be at least "
                        "12 characters long with a mix of uppercase letters, "
                        "lowercase letters, numbers, and special characters. "
                        "Passwords must be changed every 90 days and cannot be "
                        "reused for 12 cycles."
                    ),
                    "timestamp": "2025-10-12T16:45:20Z",
                    "sources": [
                        {"id": "security", "title": "Information Security Policy"}
                    ],
                },
            ],
        }

        messages = transcripts.get(conversation_id)
        if messages is None:
            return (
                jsonify(
                    {
                        "status": "error",
                        "message": f"Conversation {conversation_id} not found",
                    }
                ),
                404,
            )
        return jsonify(
            {
                "status": "success",
                "conversation_id": conversation_id,
                "messages": messages,
            }
        )
    except Exception as e:
        return (
            jsonify(
                {
                    "status": "error",
                    "message": f"Error retrieving conversation: {str(e)}",
                }
            ),
            500,
        )
def chat_health():
    """
    Health check endpoint for RAG chat functionality.

    Reports the status of all RAG pipeline components. Returns 200 when
    the pipeline is healthy or degraded (still functional), 503 when the
    LLM is unconfigured or the pipeline is down, 500 on unexpected errors.
    """

    def _llm_config_error(exc: ValueError):
        # Shared 503 payload for a missing/invalid LLM configuration.
        # This body was previously duplicated verbatim in two except
        # branches; both now delegate here.
        return (
            jsonify(
                {
                    "status": "error",
                    "message": f"LLM configuration error: {str(exc)}",
                    "health": {
                        "pipeline_status": "unhealthy",
                        "components": {
                            "llm_service": {
                                "status": "unconfigured",
                                "error": str(exc),
                            }
                        },
                    },
                }
            ),
            503,
        )

    try:
        from src.config import COLLECTION_NAME, VECTOR_DB_PERSIST_PATH
        from src.embedding.embedding_service import EmbeddingService
        from src.llm.llm_service import LLMService
        from src.rag.rag_pipeline import RAGPipeline
        from src.rag.response_formatter import ResponseFormatter
        from src.search.search_service import SearchService
        from src.vector_store.vector_db import VectorDatabase

        # Initialize services for health check
        vector_db = VectorDatabase(VECTOR_DB_PERSIST_PATH, COLLECTION_NAME)
        embedding_service = EmbeddingService()
        search_service = SearchService(vector_db, embedding_service)

        try:
            llm_service = LLMService.from_environment()
            rag_pipeline = RAGPipeline(search_service, llm_service)
            formatter = ResponseFormatter()

            # Perform health check and map pipeline state to HTTP status
            health_data = rag_pipeline.health_check()
            health_response = formatter.create_health_response(health_data)
            if health_data.get("pipeline") in ("healthy", "degraded"):
                # "degraded" is still functional, so it also reports 200
                return jsonify(health_response), 200
            return jsonify(health_response), 503  # Service unavailable
        except ValueError as e:
            return _llm_config_error(e)
    except ValueError as e:
        # ValueError raised outside the inner try (e.g. during service
        # construction) gets the same configuration-error payload.
        return _llm_config_error(e)
    except Exception as e:
        return (
            jsonify({"status": "error", "message": f"Health check failed: {str(e)}"}),
            500,
        )
if __name__ == "__main__":
    # Honor the platform-assigned $PORT (Render/Heroku convention);
    # fall back to 8080 for local development runs.
    port = int(os.environ.get("PORT", 8080))
    # NOTE(review): debug=True enables the Werkzeug debugger/reloader and
    # must not be used in production — confirm this path is dev-only.
    app.run(debug=True, host="0.0.0.0", port=port)