from flask import Flask, jsonify, render_template, request

# Disable ChromaDB anonymized telemetry for local development so the
# library doesn't attempt to call external PostHog telemetry endpoints.
# This avoids noisy errors in server logs and respects developer privacy.
try:
    import chromadb

    # Turn off anonymized telemetry (the chromadb package defaults this to True)
    chromadb.configure(anonymized_telemetry=False)
except Exception:
    # If chromadb isn't installed in this environment yet, ignore silently.
    pass

app = Flask(__name__)


@app.route("/")
def index():
    """
    Renders the main page.
    """
    return render_template("index.html")


@app.route("/health")
def health():
    """
    Health check endpoint.
    """
    return jsonify({"status": "ok"}), 200


@app.route("/ingest", methods=["POST"])
def ingest():
    """Endpoint to trigger document ingestion with embeddings."""
    try:
        from src.config import (
            CORPUS_DIRECTORY,
            DEFAULT_CHUNK_SIZE,
            DEFAULT_OVERLAP,
            RANDOM_SEED,
        )
        from src.ingestion.ingestion_pipeline import IngestionPipeline

        # Optional parameters from the request body. silent=True tolerates a
        # missing or malformed JSON body instead of letting get_json() raise
        # (which the generic handler below would report as a 500).
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        store_embeddings = data.get("store_embeddings", True)

        pipeline = IngestionPipeline(
            chunk_size=DEFAULT_CHUNK_SIZE,
            overlap=DEFAULT_OVERLAP,
            seed=RANDOM_SEED,
            store_embeddings=store_embeddings,
        )
        result = pipeline.process_directory_with_embeddings(CORPUS_DIRECTORY)

        # Create response with enhanced information
        response = {
            "status": result["status"],
            "chunks_processed": result["chunks_processed"],
            "files_processed": result["files_processed"],
            "embeddings_stored": result["embeddings_stored"],
            "store_embeddings": result["store_embeddings"],
            "message": (
                f"Successfully processed {result['chunks_processed']} chunks "
                f"from {result['files_processed']} files"
            ),
        }

        # Include failed-files info if any
        if result["failed_files"]:
            response["failed_files"] = result["failed_files"]
            failed_count = len(result["failed_files"])
            response["warnings"] = f"{failed_count} files failed to process"

        return jsonify(response)
    except Exception as e:
        return jsonify({"status": "error", "message": str(e)}), 500


@app.route("/search", methods=["POST"])
def search():
    """
    Endpoint to perform semantic search on ingested documents.
    Accepts JSON requests with query text and optional parameters.
    Returns semantically similar document chunks.
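
    Example request (illustrative; the query text is hypothetical, and top_k
    and threshold fall back to 5 and 0.3 when omitted):

        POST /search
        Content-Type: application/json

        {"query": "What is the remote work policy?", "top_k": 5, "threshold": 0.3}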
""" try: # Validate request contains JSON data if not request.is_json: return ( jsonify( { "status": "error", "message": "Content-Type must be application/json", } ), 400, ) data = request.get_json() # Validate required query parameter query = data.get("query") if query is None: return ( jsonify({"status": "error", "message": "Query parameter is required"}), 400, ) if not isinstance(query, str) or not query.strip(): return ( jsonify( {"status": "error", "message": "Query must be a non-empty string"} ), 400, ) # Extract optional parameters with defaults top_k = data.get("top_k", 5) threshold = data.get("threshold", 0.3) # Validate parameters if not isinstance(top_k, int) or top_k <= 0: return ( jsonify( {"status": "error", "message": "top_k must be a positive integer"} ), 400, ) if not isinstance(threshold, (int, float)) or not (0.0 <= threshold <= 1.0): return ( jsonify( { "status": "error", "message": "threshold must be a number between 0 and 1", } ), 400, ) # Initialize search components from src.config import COLLECTION_NAME, VECTOR_DB_PERSIST_PATH from src.embedding.embedding_service import EmbeddingService from src.search.search_service import SearchService from src.vector_store.vector_db import VectorDatabase vector_db = VectorDatabase(VECTOR_DB_PERSIST_PATH, COLLECTION_NAME) embedding_service = EmbeddingService() search_service = SearchService(vector_db, embedding_service) # Perform search results = search_service.search( query=query.strip(), top_k=top_k, threshold=threshold ) # Format response response = { "status": "success", "query": query.strip(), "results_count": len(results), "results": results, } return jsonify(response) except ValueError as e: return jsonify({"status": "error", "message": str(e)}), 400 except Exception as e: return jsonify({"status": "error", "message": f"Search failed: {str(e)}"}), 500 @app.route("/chat", methods=["POST"]) def chat(): """ Endpoint for conversational RAG interactions. Accepts JSON requests with user messages and returns AI-generated responses based on corporate policy documents. 
""" try: # Validate request contains JSON data if not request.is_json: return ( jsonify( { "status": "error", "message": "Content-Type must be application/json", } ), 400, ) data = request.get_json() # Validate required message parameter message = data.get("message") if message is None: return ( jsonify( {"status": "error", "message": "message parameter is required"} ), 400, ) if not isinstance(message, str) or not message.strip(): return ( jsonify( {"status": "error", "message": "message must be a non-empty string"} ), 400, ) # Extract optional parameters conversation_id = data.get("conversation_id") include_sources = data.get("include_sources", True) include_debug = data.get("include_debug", False) # Initialize RAG pipeline components try: from src.config import COLLECTION_NAME, VECTOR_DB_PERSIST_PATH from src.embedding.embedding_service import EmbeddingService from src.llm.llm_service import LLMService from src.rag.rag_pipeline import RAGPipeline from src.rag.response_formatter import ResponseFormatter from src.search.search_service import SearchService from src.vector_store.vector_db import VectorDatabase # Initialize services vector_db = VectorDatabase(VECTOR_DB_PERSIST_PATH, COLLECTION_NAME) embedding_service = EmbeddingService() search_service = SearchService(vector_db, embedding_service) # Initialize LLM service from environment llm_service = LLMService.from_environment() # Initialize RAG pipeline rag_pipeline = RAGPipeline(search_service, llm_service) # Initialize response formatter formatter = ResponseFormatter() except ValueError as e: return ( jsonify( { "status": "error", "message": f"LLM service configuration error: {str(e)}", "details": ( "Please ensure OPENROUTER_API_KEY or GROQ_API_KEY " "environment variables are set" ), } ), 503, ) except Exception as e: return ( jsonify( { "status": "error", "message": f"Service initialization failed: {str(e)}", } ), 500, ) # Generate RAG response rag_response = rag_pipeline.generate_answer(message.strip()) # Format response for API if include_sources: formatted_response = formatter.format_api_response( rag_response, include_debug ) else: formatted_response = formatter.format_chat_response( rag_response, conversation_id, include_sources=False ) return jsonify(formatted_response) except Exception as e: return ( jsonify({"status": "error", "message": f"Chat request failed: {str(e)}"}), 500, ) @app.route("/chat/health", methods=["GET"]) def chat_health(): """ Health check endpoint for RAG chat functionality. Returns the status of all RAG pipeline components. 
""" try: from src.config import COLLECTION_NAME, VECTOR_DB_PERSIST_PATH from src.embedding.embedding_service import EmbeddingService from src.llm.llm_service import LLMService from src.rag.rag_pipeline import RAGPipeline from src.rag.response_formatter import ResponseFormatter from src.search.search_service import SearchService from src.vector_store.vector_db import VectorDatabase # Initialize services for health check vector_db = VectorDatabase(VECTOR_DB_PERSIST_PATH, COLLECTION_NAME) embedding_service = EmbeddingService() search_service = SearchService(vector_db, embedding_service) try: llm_service = LLMService.from_environment() rag_pipeline = RAGPipeline(search_service, llm_service) formatter = ResponseFormatter() # Perform health check health_data = rag_pipeline.health_check() health_response = formatter.create_health_response(health_data) # Determine HTTP status based on health if health_data.get("pipeline") == "healthy": return jsonify(health_response), 200 elif health_data.get("pipeline") == "degraded": return jsonify(health_response), 200 # Still functional else: return jsonify(health_response), 503 # Service unavailable except ValueError as e: return ( jsonify( { "status": "error", "message": f"LLM configuration error: {str(e)}", "health": { "pipeline_status": "unhealthy", "components": { "llm_service": { "status": "unconfigured", "error": str(e), } }, }, } ), 503, ) except Exception as e: return ( jsonify({"status": "error", "message": f"Health check failed: {str(e)}"}), 500, ) if __name__ == "__main__": app.run(debug=True)