import os # Import type annotations from typing import Any, Dict from dotenv import load_dotenv from flask import Flask, jsonify, render_template, request # Load environment variables from .env file load_dotenv() # Proactively disable ChromaDB telemetry via environment variables so # the library doesn't attempt to call external PostHog telemetry endpoints. # This helps avoid noisy errors in server logs (Render may not expose # the expected device files or telemetry endpoints). os.environ.setdefault("ANONYMIZED_TELEMETRY", "False") os.environ.setdefault("CHROMA_TELEMETRY", "False") # Attempt to configure chromadb and monkeypatch any telemetry capture # functions to be no-ops. Some chromadb versions call posthog.capture # with a different signature which can raise exceptions during runtime # (observed on Render as: capture() takes 1 positional argument but 3 were given). try: import chromadb try: chromadb.configure(anonymized_telemetry=False) # type: ignore except Exception: # Non-fatal: continue and still try to neutralize telemetry functions pass # Defensive monkeypatch: if the telemetry client exists, replace capture # with a safe no-op that accepts any args/kwargs to avoid signature issues. try: from chromadb.telemetry.product import posthog as _posthog # type: ignore # Replace module-level capture and Posthog.capture if present if hasattr(_posthog, "capture"): setattr(_posthog, "capture", lambda *args, **kwargs: None) if hasattr(_posthog, "Posthog") and hasattr(_posthog.Posthog, "capture"): setattr(_posthog.Posthog, "capture", lambda *args, **kwargs: None) except Exception: # If telemetry internals aren't present or change across versions, ignore pass except Exception: # chromadb not installed or import failed; continue without telemetry pass app = Flask(__name__) @app.route("/") def index(): """ Renders the chat interface. """ return render_template("chat.html") @app.route("/health") def health(): """ Health check endpoint. """ return jsonify({"status": "ok"}), 200 @app.route("/ingest", methods=["POST"]) def ingest(): """Endpoint to trigger document ingestion with embeddings""" try: from src.config import ( CORPUS_DIRECTORY, DEFAULT_CHUNK_SIZE, DEFAULT_OVERLAP, RANDOM_SEED, ) from src.ingestion.ingestion_pipeline import IngestionPipeline # Get optional parameters from request data: Dict[str, Any] = request.get_json() if request.is_json else {} store_embeddings: bool = bool(data.get("store_embeddings", True)) pipeline = IngestionPipeline( chunk_size=DEFAULT_CHUNK_SIZE, overlap=DEFAULT_OVERLAP, seed=RANDOM_SEED, store_embeddings=store_embeddings, ) result = pipeline.process_directory_with_embeddings(CORPUS_DIRECTORY) # Create response with enhanced information response: Dict[str, Any] = { "status": result["status"], "chunks_processed": result["chunks_processed"], "files_processed": result["files_processed"], "embeddings_stored": result["embeddings_stored"], "store_embeddings": result["store_embeddings"], "message": ( f"Successfully processed {result['chunks_processed']} chunks " f"from {result['files_processed']} files" ), } # Include failed files info if any if result["failed_files"]: response["failed_files"] = result["failed_files"] failed_count = len(result["failed_files"]) response["warnings"] = f"{failed_count} files failed to process" return jsonify(response) except Exception as e: return jsonify({"status": "error", "message": str(e)}), 500 @app.route("/search", methods=["POST"]) def search(): """ Endpoint to perform semantic search on ingested documents. Accepts JSON requests with query text and optional parameters. Returns semantically similar document chunks. """ try: # Validate request contains JSON data if not request.is_json: return ( jsonify( { "status": "error", "message": "Content-Type must be application/json", } ), 400, ) data = request.get_json() # Validate required query parameter query = data.get("query") if query is None: return ( jsonify({"status": "error", "message": "Query parameter is required"}), 400, ) if not isinstance(query, str) or not query.strip(): return ( jsonify( {"status": "error", "message": "Query must be a non-empty string"} ), 400, ) # Extract optional parameters with defaults top_k = data.get("top_k", 5) threshold = data.get("threshold", 0.3) # Validate parameters if not isinstance(top_k, int) or top_k <= 0: return ( jsonify( {"status": "error", "message": "top_k must be a positive integer"} ), 400, ) if not isinstance(threshold, (int, float)) or not (0.0 <= threshold <= 1.0): return ( jsonify( { "status": "error", "message": "threshold must be a number between 0 and 1", } ), 400, ) # Initialize search components from src.config import COLLECTION_NAME, VECTOR_DB_PERSIST_PATH from src.embedding.embedding_service import EmbeddingService from src.search.search_service import SearchService from src.vector_store.vector_db import VectorDatabase vector_db = VectorDatabase(VECTOR_DB_PERSIST_PATH, COLLECTION_NAME) embedding_service = EmbeddingService() search_service = SearchService(vector_db, embedding_service) # Perform search results = search_service.search( query=query.strip(), top_k=top_k, threshold=threshold ) # Format response response: Dict[str, Any] = { "status": "success", "query": query.strip(), "results_count": len(results), "results": results, } return jsonify(response) except ValueError as e: return jsonify({"status": "error", "message": str(e)}), 400 except Exception as e: return jsonify({"status": "error", "message": f"Search failed: {str(e)}"}), 500 @app.route("/chat/suggestions") def get_query_suggestions(): """ Get query suggestions based on available documents. Returns a list of suggested queries based on the most common topics in the document corpus. """ try: # In a real implementation, these might come from analytics or document metadata # For now, we'll return a static list of suggestions based on our corpus suggestions = [ "What is our remote work policy?", "How do I request time off?", "What are our information security guidelines?", "How does our expense reimbursement work?", "Tell me about our diversity and inclusion policy", "What's the process for employee performance reviews?", "How do I report an emergency at work?", "What professional development opportunities are available?", ] return jsonify({"status": "success", "suggestions": suggestions}) except Exception as e: return ( jsonify( { "status": "error", "message": f"Failed to retrieve suggestions: {str(e)}", } ), 500, ) @app.route("/chat/feedback", methods=["POST"]) def submit_feedback(): """ Submit feedback for a specific chat message. Collects user feedback on answer quality and relevance. """ try: # Get the feedback data from the request feedback_data = request.json if not feedback_data: return ( jsonify({"status": "error", "message": "No feedback data provided"}), 400, ) # Validate the required fields required_fields = ["conversation_id", "message_id", "feedback_type"] for field in required_fields: if field not in feedback_data: return ( jsonify( { "status": "error", "message": f"Missing required field: {field}", } ), 400, ) # Log the feedback for now # In a production system, you'd save this to a database print(f"Received feedback: {feedback_data}") # Return a success response return jsonify( { "status": "success", "message": "Feedback received", "feedback": feedback_data, } ) except Exception as e: print(f"Error processing feedback: {str(e)}") return ( jsonify( {"status": "error", "message": f"Error processing feedback: {str(e)}"} ), 500, ) @app.route("/chat/source/") def get_source_document(source_id: str): """ Get source document content by ID. Returns the content and metadata of a source document referenced in chat responses. """ try: # In a real implementation, you'd retrieve this from your vector store # For this implementation, we'll use a simplified approach with mock data # We'll use hardcoded mock data instead of actual imports # Map of source IDs to policy content # In a real implementation, this would come from your vector store from typing import Union source_map: Dict[str, Dict[str, Union[str, Dict[str, str]]]] = { "remote_work": { "content": ( "# Remote Work Policy\n\n" "Employees may work remotely up to 3 days per week with manager" " approval." ), "metadata": { "filename": "remote_work_policy.md", "last_updated": "2025-09-15", }, }, "pto": { "content": ( "# PTO Policy\n\n" "Full-time employees receive 20 days of PTO annually, accrued" " monthly." ), "metadata": {"filename": "pto_policy.md", "last_updated": "2025-08-20"}, }, "security": { "content": ( "# Information Security Policy\n\n" "All employees must use company-approved devices and software" " for work tasks." ), "metadata": { "filename": "information_security_policy.md", "last_updated": "2025-10-01", }, }, "expense": { "content": ( "# Expense Reimbursement\n\n" "Submit all expense reports within 30 days of incurring" " the expense." ), "metadata": { "filename": "expense_reimbursement_policy.md", "last_updated": "2025-07-10", }, }, } # Try to find the source in our mock data if source_id in source_map: source_data: Dict[str, Union[str, Dict[str, str]]] = source_map[source_id] return jsonify( { "status": "success", "source_id": source_id, "content": source_data["content"], "metadata": source_data["metadata"], } ) else: # If we don't find it, return a generic response return ( jsonify( { "status": "error", "message": f"Source document with ID {source_id} not found", } ), 404, ) except Exception as e: return ( jsonify( { "status": "error", "message": f"Failed to retrieve source document: {str(e)}", } ), 500, ) @app.route("/chat", methods=["POST"]) def chat(): """ Endpoint for conversational RAG interactions. Accepts JSON requests with user messages and returns AI-generated responses based on corporate policy documents. """ try: # Validate request contains JSON data if not request.is_json: return ( jsonify( { "status": "error", "message": "Content-Type must be application/json", } ), 400, ) data = request.get_json() # Validate required message parameter message = data.get("message") if message is None: return ( jsonify( {"status": "error", "message": "message parameter is required"} ), 400, ) if not isinstance(message, str) or not message.strip(): return ( jsonify( {"status": "error", "message": "message must be a non-empty string"} ), 400, ) # Extract optional parameters conversation_id = data.get("conversation_id") include_sources = data.get("include_sources", True) include_debug = data.get("include_debug", False) # Initialize RAG pipeline components try: from src.config import COLLECTION_NAME, VECTOR_DB_PERSIST_PATH from src.embedding.embedding_service import EmbeddingService from src.llm.llm_service import LLMService from src.rag.rag_pipeline import RAGPipeline from src.rag.response_formatter import ResponseFormatter from src.search.search_service import SearchService from src.vector_store.vector_db import VectorDatabase # Initialize services vector_db = VectorDatabase(VECTOR_DB_PERSIST_PATH, COLLECTION_NAME) embedding_service = EmbeddingService() search_service = SearchService(vector_db, embedding_service) # Initialize LLM service from environment llm_service = LLMService.from_environment() # Initialize RAG pipeline rag_pipeline = RAGPipeline(search_service, llm_service) # Initialize response formatter formatter = ResponseFormatter() except ValueError as e: return ( jsonify( { "status": "error", "message": f"LLM service configuration error: {str(e)}", "details": ( "Please ensure OPENROUTER_API_KEY or GROQ_API_KEY " "environment variables are set" ), } ), 503, ) except Exception as e: return ( jsonify( { "status": "error", "message": f"Service initialization failed: {str(e)}", } ), 500, ) # Generate RAG response rag_response = rag_pipeline.generate_answer(message.strip()) # Format response for API if include_sources: formatted_response = formatter.format_api_response( rag_response, include_debug ) else: formatted_response = formatter.format_chat_response( rag_response, conversation_id, include_sources=False ) return jsonify(formatted_response) except Exception as e: return ( jsonify({"status": "error", "message": f"Chat request failed: {str(e)}"}), 500, ) @app.route("/conversations", methods=["GET"]) def get_conversations(): """ Get a list of all conversations for the current user. Returns conversation IDs, titles, and timestamps. """ # In a production system, you'd retrieve these from a database # For now, we'll create some mock data conversations = [ { "id": "conv-123456", "title": "HR Policy Questions", "timestamp": "2025-10-15T14:30:00Z", "preview": "What is our remote work policy?", }, { "id": "conv-789012", "title": "Project Planning Queries", "timestamp": "2025-10-14T09:15:00Z", "preview": "How do we handle project kickoffs?", }, { "id": "conv-345678", "title": "Security Compliance", "timestamp": "2025-10-12T16:45:00Z", "preview": "What are our password requirements?", }, ] return jsonify({"status": "success", "conversations": conversations}) @app.route("/conversations/", methods=["GET"]) def get_conversation(conversation_id: str): """ Get the full content of a specific conversation. Returns all messages in the conversation. """ try: # In a production system, you'd retrieve this from a database # For now, we'll create some mock data based on the ID # Mock conversation data if conversation_id == "conv-123456": from typing import List, Union messages: List[Dict[str, Union[str, List[Dict[str, str]]]]] = [ { "id": "msg-111", "role": "user", "content": "What is our remote work policy?", "timestamp": "2025-10-15T14:30:00Z", }, { "id": "msg-112", "role": "assistant", "content": ( "According to our remote work policy, employees may work " "up to 3 days per week with manager approval. You need to " "coordinate with your team to ensure adequate in-office " "coverage." ), "timestamp": "2025-10-15T14:30:15Z", "sources": [{"id": "remote_work", "title": "Remote Work Policy"}], }, ] elif conversation_id == "conv-789012": messages: List[Dict[str, Union[str, List[Dict[str, str]]]]] = [ { "id": "msg-221", "role": "user", "content": "How do we handle project kickoffs?", "timestamp": "2025-10-14T09:15:00Z", }, { "id": "msg-222", "role": "assistant", "content": ( "Our project kickoff procedure includes a meeting with all " "stakeholders, defining project scope and goals, establishing " "communication channels, and setting up the initial project " "timeline." ), "timestamp": "2025-10-14T09:15:30Z", "sources": [ {"id": "project_kickoff", "title": "Project Kickoff Procedure"} ], }, ] elif conversation_id == "conv-345678": messages: List[Dict[str, Union[str, List[Dict[str, str]]]]] = [ { "id": "msg-331", "role": "user", "content": "What are our password requirements?", "timestamp": "2025-10-12T16:45:00Z", }, { "id": "msg-332", "role": "assistant", "content": ( "Our security policy requires passwords to be at least " "12 characters long with a mix of uppercase letters, " "lowercase letters, numbers, and special characters. " "Passwords must be changed every 90 days and cannot be " "reused for 12 cycles." ), "timestamp": "2025-10-12T16:45:20Z", "sources": [ {"id": "security", "title": "Information Security Policy"} ], }, ] else: return ( jsonify( { "status": "error", "message": f"Conversation {conversation_id} not found", } ), 404, ) return jsonify( { "status": "success", "conversation_id": conversation_id, "messages": messages, } ) except Exception as e: return ( jsonify( { "status": "error", "message": f"Error retrieving conversation: {str(e)}", } ), 500, ) @app.route("/chat/health", methods=["GET"]) def chat_health(): """ Health check endpoint for RAG chat functionality. Returns the status of all RAG pipeline components. """ try: from src.config import COLLECTION_NAME, VECTOR_DB_PERSIST_PATH from src.embedding.embedding_service import EmbeddingService from src.llm.llm_service import LLMService from src.rag.rag_pipeline import RAGPipeline from src.rag.response_formatter import ResponseFormatter from src.search.search_service import SearchService from src.vector_store.vector_db import VectorDatabase # Initialize services for health check vector_db = VectorDatabase(VECTOR_DB_PERSIST_PATH, COLLECTION_NAME) embedding_service = EmbeddingService() search_service = SearchService(vector_db, embedding_service) try: llm_service = LLMService.from_environment() rag_pipeline = RAGPipeline(search_service, llm_service) formatter = ResponseFormatter() # Perform health check health_data = rag_pipeline.health_check() health_response = formatter.create_health_response(health_data) # Determine HTTP status based on health if health_data.get("pipeline") == "healthy": return jsonify(health_response), 200 elif health_data.get("pipeline") == "degraded": return jsonify(health_response), 200 # Still functional else: return jsonify(health_response), 503 # Service unavailable except ValueError as e: return ( jsonify( { "status": "error", "message": f"LLM configuration error: {str(e)}", "health": { "pipeline_status": "unhealthy", "components": { "llm_service": { "status": "unconfigured", "error": str(e), } }, }, } ), 503, ) except ValueError as e: # Specific handling for LLM configuration errors return ( jsonify( { "status": "error", "message": f"LLM configuration error: {str(e)}", "health": { "pipeline_status": "unhealthy", "components": { "llm_service": { "status": "unconfigured", "error": str(e), } }, }, } ), 503, ) except Exception as e: return ( jsonify({"status": "error", "message": f"Health check failed: {str(e)}"}), 500, ) if __name__ == "__main__": port = int(os.environ.get("PORT", 8080)) app.run(debug=True, host="0.0.0.0", port=port)