import os
# Typing helpers for request/response payloads
from typing import Any, Dict
from dotenv import load_dotenv
from flask import Flask, jsonify, render_template, request
# Load environment variables from .env file
load_dotenv()
# Proactively disable ChromaDB telemetry via environment variables so
# the library doesn't attempt to call external PostHog telemetry endpoints.
# This helps avoid noisy errors in server logs (Render may not expose
# the expected device files or telemetry endpoints).
os.environ.setdefault("ANONYMIZED_TELEMETRY", "False")
os.environ.setdefault("CHROMA_TELEMETRY", "False")
# Attempt to configure chromadb and monkeypatch any telemetry capture
# functions to be no-ops. Some chromadb versions call posthog.capture
# with a different signature which can raise exceptions during runtime
# (observed on Render as: capture() takes 1 positional argument but 3 were given).
try:
import chromadb
try:
chromadb.configure(anonymized_telemetry=False) # type: ignore
except Exception:
# Non-fatal: continue and still try to neutralize telemetry functions
pass
# Defensive monkeypatch: if the telemetry client exists, replace capture
# with a safe no-op that accepts any args/kwargs to avoid signature issues.
try:
from chromadb.telemetry.product import posthog as _posthog # type: ignore
# Replace module-level capture and Posthog.capture if present
if hasattr(_posthog, "capture"):
setattr(_posthog, "capture", lambda *args, **kwargs: None)
if hasattr(_posthog, "Posthog") and hasattr(_posthog.Posthog, "capture"):
setattr(_posthog.Posthog, "capture", lambda *args, **kwargs: None)
except Exception:
# If telemetry internals aren't present or change across versions, ignore
pass
except Exception:
# chromadb not installed or import failed; continue without telemetry
pass
app = Flask(__name__)
@app.route("/")
def index():
"""
Renders the chat interface.
"""
return render_template("chat.html")
@app.route("/health")
def health():
"""
    Basic liveness check; see /chat/health for the full RAG pipeline status.
"""
return jsonify({"status": "ok"}), 200
@app.route("/ingest", methods=["POST"])
def ingest():
"""Endpoint to trigger document ingestion with embeddings"""
try:
from src.config import (
CORPUS_DIRECTORY,
DEFAULT_CHUNK_SIZE,
DEFAULT_OVERLAP,
RANDOM_SEED,
)
from src.ingestion.ingestion_pipeline import IngestionPipeline
# Get optional parameters from request
        # get_json(silent=True) returns None instead of raising on a missing
        # or malformed JSON body
        data: Dict[str, Any] = request.get_json(silent=True) or {}
store_embeddings: bool = bool(data.get("store_embeddings", True))
pipeline = IngestionPipeline(
chunk_size=DEFAULT_CHUNK_SIZE,
overlap=DEFAULT_OVERLAP,
seed=RANDOM_SEED,
store_embeddings=store_embeddings,
)
result = pipeline.process_directory_with_embeddings(CORPUS_DIRECTORY)
# Create response with enhanced information
response: Dict[str, Any] = {
"status": result["status"],
"chunks_processed": result["chunks_processed"],
"files_processed": result["files_processed"],
"embeddings_stored": result["embeddings_stored"],
"store_embeddings": result["store_embeddings"],
"message": (
f"Successfully processed {result['chunks_processed']} chunks "
f"from {result['files_processed']} files"
),
}
# Include failed files info if any
if result["failed_files"]:
response["failed_files"] = result["failed_files"]
failed_count = len(result["failed_files"])
response["warnings"] = f"{failed_count} files failed to process"
return jsonify(response)
except Exception as e:
return jsonify({"status": "error", "message": str(e)}), 500
@app.route("/search", methods=["POST"])
def search():
"""
Endpoint to perform semantic search on ingested documents.
Accepts JSON requests with query text and optional parameters.
Returns semantically similar document chunks.
"""
try:
# Validate request contains JSON data
if not request.is_json:
return (
jsonify(
{
"status": "error",
"message": "Content-Type must be application/json",
}
),
400,
)
data = request.get_json()
# Validate required query parameter
query = data.get("query")
if query is None:
return (
jsonify({"status": "error", "message": "Query parameter is required"}),
400,
)
if not isinstance(query, str) or not query.strip():
return (
jsonify(
{"status": "error", "message": "Query must be a non-empty string"}
),
400,
)
# Extract optional parameters with defaults
top_k = data.get("top_k", 5)
threshold = data.get("threshold", 0.3)
# Validate parameters
if not isinstance(top_k, int) or top_k <= 0:
return (
jsonify(
{"status": "error", "message": "top_k must be a positive integer"}
),
400,
)
if not isinstance(threshold, (int, float)) or not (0.0 <= threshold <= 1.0):
return (
jsonify(
{
"status": "error",
"message": "threshold must be a number between 0 and 1",
}
),
400,
)
# Initialize search components
from src.config import COLLECTION_NAME, VECTOR_DB_PERSIST_PATH
from src.embedding.embedding_service import EmbeddingService
from src.search.search_service import SearchService
from src.vector_store.vector_db import VectorDatabase
vector_db = VectorDatabase(VECTOR_DB_PERSIST_PATH, COLLECTION_NAME)
embedding_service = EmbeddingService()
search_service = SearchService(vector_db, embedding_service)
# Perform search
results = search_service.search(
query=query.strip(), top_k=top_k, threshold=threshold
)
# Format response
response: Dict[str, Any] = {
"status": "success",
"query": query.strip(),
"results_count": len(results),
"results": results,
}
return jsonify(response)
except ValueError as e:
return jsonify({"status": "error", "message": str(e)}), 400
except Exception as e:
return jsonify({"status": "error", "message": f"Search failed: {str(e)}"}), 500
@app.route("/chat/suggestions")
def get_query_suggestions():
"""
Get query suggestions based on available documents.
Returns a list of suggested queries based on the most common topics
in the document corpus.
"""
try:
        # In a real implementation these might come from analytics or document
        # metadata; for now, return a static list based on our corpus.
suggestions = [
"What is our remote work policy?",
"How do I request time off?",
"What are our information security guidelines?",
"How does our expense reimbursement work?",
"Tell me about our diversity and inclusion policy",
"What's the process for employee performance reviews?",
"How do I report an emergency at work?",
"What professional development opportunities are available?",
]
return jsonify({"status": "success", "suggestions": suggestions})
except Exception as e:
return (
jsonify(
{
"status": "error",
"message": f"Failed to retrieve suggestions: {str(e)}",
}
),
500,
)
@app.route("/chat/feedback", methods=["POST"])
def submit_feedback():
"""
Submit feedback for a specific chat message.
Collects user feedback on answer quality and relevance.
"""
try:
# Get the feedback data from the request
        # get_json(silent=True) avoids an unhandled error when the request is
        # not JSON; the falsy result is rejected with a 400 below
        feedback_data = request.get_json(silent=True)
if not feedback_data:
return (
jsonify({"status": "error", "message": "No feedback data provided"}),
400,
)
# Validate the required fields
required_fields = ["conversation_id", "message_id", "feedback_type"]
for field in required_fields:
if field not in feedback_data:
return (
jsonify(
{
"status": "error",
"message": f"Missing required field: {field}",
}
),
400,
)
        # Log the feedback for now; a production system would persist it to a database
        app.logger.info("Received feedback: %s", feedback_data)
# Return a success response
return jsonify(
{
"status": "success",
"message": "Feedback received",
"feedback": feedback_data,
}
)
except Exception as e:
print(f"Error processing feedback: {str(e)}")
return (
jsonify(
{"status": "error", "message": f"Error processing feedback: {str(e)}"}
),
500,
)
@app.route("/chat/source/<source_id>")
def get_source_document(source_id: str):
"""
Get source document content by ID.
Returns the content and metadata of a source document
referenced in chat responses.
"""
try:
        # In a real implementation this content would come from the vector
        # store; here we serve hardcoded mock data keyed by source ID.
from typing import Union
source_map: Dict[str, Dict[str, Union[str, Dict[str, str]]]] = {
"remote_work": {
"content": (
"# Remote Work Policy\n\n"
"Employees may work remotely up to 3 days per week with manager"
" approval."
),
"metadata": {
"filename": "remote_work_policy.md",
"last_updated": "2025-09-15",
},
},
"pto": {
"content": (
"# PTO Policy\n\n"
"Full-time employees receive 20 days of PTO annually, accrued"
" monthly."
),
"metadata": {"filename": "pto_policy.md", "last_updated": "2025-08-20"},
},
"security": {
"content": (
"# Information Security Policy\n\n"
"All employees must use company-approved devices and software"
" for work tasks."
),
"metadata": {
"filename": "information_security_policy.md",
"last_updated": "2025-10-01",
},
},
"expense": {
"content": (
"# Expense Reimbursement\n\n"
"Submit all expense reports within 30 days of incurring"
" the expense."
),
"metadata": {
"filename": "expense_reimbursement_policy.md",
"last_updated": "2025-07-10",
},
},
}
# Try to find the source in our mock data
if source_id in source_map:
source_data: Dict[str, Union[str, Dict[str, str]]] = source_map[source_id]
return jsonify(
{
"status": "success",
"source_id": source_id,
"content": source_data["content"],
"metadata": source_data["metadata"],
}
)
else:
# If we don't find it, return a generic response
return (
jsonify(
{
"status": "error",
"message": f"Source document with ID {source_id} not found",
}
),
404,
)
except Exception as e:
return (
jsonify(
{
"status": "error",
"message": f"Failed to retrieve source document: {str(e)}",
}
),
500,
)
@app.route("/chat", methods=["POST"])
def chat():
"""
Endpoint for conversational RAG interactions.
Accepts JSON requests with user messages and returns AI-generated
responses based on corporate policy documents.
"""
try:
# Validate request contains JSON data
if not request.is_json:
return (
jsonify(
{
"status": "error",
"message": "Content-Type must be application/json",
}
),
400,
)
data = request.get_json()
# Validate required message parameter
message = data.get("message")
if message is None:
return (
jsonify(
{"status": "error", "message": "message parameter is required"}
),
400,
)
if not isinstance(message, str) or not message.strip():
return (
jsonify(
{"status": "error", "message": "message must be a non-empty string"}
),
400,
)
# Extract optional parameters
conversation_id = data.get("conversation_id")
include_sources = data.get("include_sources", True)
include_debug = data.get("include_debug", False)
# Initialize RAG pipeline components
try:
from src.config import COLLECTION_NAME, VECTOR_DB_PERSIST_PATH
from src.embedding.embedding_service import EmbeddingService
from src.llm.llm_service import LLMService
from src.rag.rag_pipeline import RAGPipeline
from src.rag.response_formatter import ResponseFormatter
from src.search.search_service import SearchService
from src.vector_store.vector_db import VectorDatabase
# Initialize services
vector_db = VectorDatabase(VECTOR_DB_PERSIST_PATH, COLLECTION_NAME)
embedding_service = EmbeddingService()
search_service = SearchService(vector_db, embedding_service)
# Initialize LLM service from environment
llm_service = LLMService.from_environment()
# Initialize RAG pipeline
rag_pipeline = RAGPipeline(search_service, llm_service)
# Initialize response formatter
formatter = ResponseFormatter()
except ValueError as e:
return (
jsonify(
{
"status": "error",
"message": f"LLM service configuration error: {str(e)}",
"details": (
"Please ensure OPENROUTER_API_KEY or GROQ_API_KEY "
"environment variables are set"
),
}
),
503,
)
except Exception as e:
return (
jsonify(
{
"status": "error",
"message": f"Service initialization failed: {str(e)}",
}
),
500,
)
# Generate RAG response
rag_response = rag_pipeline.generate_answer(message.strip())
# Format response for API
if include_sources:
formatted_response = formatter.format_api_response(
rag_response, include_debug
)
else:
formatted_response = formatter.format_chat_response(
rag_response, conversation_id, include_sources=False
)
return jsonify(formatted_response)
except Exception as e:
return (
jsonify({"status": "error", "message": f"Chat request failed: {str(e)}"}),
500,
)
@app.route("/conversations", methods=["GET"])
def get_conversations():
"""
Get a list of all conversations for the current user.
Returns conversation IDs, titles, and timestamps.
"""
# In a production system, you'd retrieve these from a database
# For now, we'll create some mock data
conversations = [
{
"id": "conv-123456",
"title": "HR Policy Questions",
"timestamp": "2025-10-15T14:30:00Z",
"preview": "What is our remote work policy?",
},
{
"id": "conv-789012",
"title": "Project Planning Queries",
"timestamp": "2025-10-14T09:15:00Z",
"preview": "How do we handle project kickoffs?",
},
{
"id": "conv-345678",
"title": "Security Compliance",
"timestamp": "2025-10-12T16:45:00Z",
"preview": "What are our password requirements?",
},
]
return jsonify({"status": "success", "conversations": conversations})
@app.route("/conversations/<conversation_id>", methods=["GET"])
def get_conversation(conversation_id: str):
"""
Get the full content of a specific conversation.
Returns all messages in the conversation.
"""
try:
# In a production system, you'd retrieve this from a database
# For now, we'll create some mock data based on the ID
# Mock conversation data
        from typing import List, Union

        # Annotate once so each branch can assign without re-declaring the type
        messages: List[Dict[str, Union[str, List[Dict[str, str]]]]]
        if conversation_id == "conv-123456":
            messages = [
{
"id": "msg-111",
"role": "user",
"content": "What is our remote work policy?",
"timestamp": "2025-10-15T14:30:00Z",
},
{
"id": "msg-112",
"role": "assistant",
"content": (
"According to our remote work policy, employees may work "
"up to 3 days per week with manager approval. You need to "
"coordinate with your team to ensure adequate in-office "
"coverage."
),
"timestamp": "2025-10-15T14:30:15Z",
"sources": [{"id": "remote_work", "title": "Remote Work Policy"}],
},
]
elif conversation_id == "conv-789012":
            messages = [
{
"id": "msg-221",
"role": "user",
"content": "How do we handle project kickoffs?",
"timestamp": "2025-10-14T09:15:00Z",
},
{
"id": "msg-222",
"role": "assistant",
"content": (
"Our project kickoff procedure includes a meeting with all "
"stakeholders, defining project scope and goals, establishing "
"communication channels, and setting up the initial project "
"timeline."
),
"timestamp": "2025-10-14T09:15:30Z",
"sources": [
{"id": "project_kickoff", "title": "Project Kickoff Procedure"}
],
},
]
elif conversation_id == "conv-345678":
            messages = [
{
"id": "msg-331",
"role": "user",
"content": "What are our password requirements?",
"timestamp": "2025-10-12T16:45:00Z",
},
{
"id": "msg-332",
"role": "assistant",
"content": (
"Our security policy requires passwords to be at least "
"12 characters long with a mix of uppercase letters, "
"lowercase letters, numbers, and special characters. "
"Passwords must be changed every 90 days and cannot be "
"reused for 12 cycles."
),
"timestamp": "2025-10-12T16:45:20Z",
"sources": [
{"id": "security", "title": "Information Security Policy"}
],
},
]
else:
return (
jsonify(
{
"status": "error",
"message": f"Conversation {conversation_id} not found",
}
),
404,
)
return jsonify(
{
"status": "success",
"conversation_id": conversation_id,
"messages": messages,
}
)
except Exception as e:
return (
jsonify(
{
"status": "error",
"message": f"Error retrieving conversation: {str(e)}",
}
),
500,
)
@app.route("/chat/health", methods=["GET"])
def chat_health():
"""
Health check endpoint for RAG chat functionality.
    Returns the status of all RAG pipeline components: 200 when the pipeline
    is healthy or degraded (still functional), 503 when it is unavailable.
"""
try:
from src.config import COLLECTION_NAME, VECTOR_DB_PERSIST_PATH
from src.embedding.embedding_service import EmbeddingService
from src.llm.llm_service import LLMService
from src.rag.rag_pipeline import RAGPipeline
from src.rag.response_formatter import ResponseFormatter
from src.search.search_service import SearchService
from src.vector_store.vector_db import VectorDatabase
# Initialize services for health check
vector_db = VectorDatabase(VECTOR_DB_PERSIST_PATH, COLLECTION_NAME)
embedding_service = EmbeddingService()
search_service = SearchService(vector_db, embedding_service)
        llm_service = LLMService.from_environment()
        rag_pipeline = RAGPipeline(search_service, llm_service)
        formatter = ResponseFormatter()
        # Perform health check
        health_data = rag_pipeline.health_check()
        health_response = formatter.create_health_response(health_data)
        # Determine HTTP status based on health
        if health_data.get("pipeline") == "healthy":
            return jsonify(health_response), 200
        elif health_data.get("pipeline") == "degraded":
            return jsonify(health_response), 200  # Degraded but still functional
        else:
            return jsonify(health_response), 503  # Service unavailable
except ValueError as e:
# Specific handling for LLM configuration errors
return (
jsonify(
{
"status": "error",
"message": f"LLM configuration error: {str(e)}",
"health": {
"pipeline_status": "unhealthy",
"components": {
"llm_service": {
"status": "unconfigured",
"error": str(e),
}
},
},
}
),
503,
)
except Exception as e:
return (
jsonify({"status": "error", "message": f"Health check failed: {str(e)}"}),
500,
)
if __name__ == "__main__":
    port = int(os.environ.get("PORT", 8080))
    # Enable the debugger only when explicitly requested; hardcoding debug=True
    # would expose the interactive Werkzeug debugger in production.
    debug = os.environ.get("FLASK_DEBUG", "false").lower() in ("1", "true")
    app.run(debug=debug, host="0.0.0.0", port=port)