from flask import Flask, jsonify, render_template, request

# Disable ChromaDB anonymized telemetry for local development so the
# library doesn't attempt to call external PostHog telemetry endpoints.
# This avoids noisy errors in server logs and respects developer privacy.
try:
    import chromadb

    # Turn off anonymized telemetry (the chromadb package defaults this to True)
    chromadb.configure(anonymized_telemetry=False)
except Exception:
    # If chromadb isn't installed in this environment yet, ignore silently.
    pass
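
# The same setting can usually be applied without code, since chromadb reads
# its Settings fields from the environment (an assumption to verify against
# the installed chromadb version; the module filename here is also assumed):
#
#     ANONYMIZED_TELEMETRY=False python app.py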

app = Flask(__name__)
@app.route("/")
def index():
"""
Renders the main page.
"""
return render_template("index.html")
@app.route("/health")
def health():
"""
Health check endpoint.
"""
return jsonify({"status": "ok"}), 200
@app.route("/ingest", methods=["POST"])
def ingest():
"""Endpoint to trigger document ingestion with embeddings"""
try:
from src.config import (
CORPUS_DIRECTORY,
DEFAULT_CHUNK_SIZE,
DEFAULT_OVERLAP,
RANDOM_SEED,
)
from src.ingestion.ingestion_pipeline import IngestionPipeline
# Get optional parameters from request
data = request.get_json() if request.is_json else {}
store_embeddings = data.get("store_embeddings", True)
pipeline = IngestionPipeline(
chunk_size=DEFAULT_CHUNK_SIZE,
overlap=DEFAULT_OVERLAP,
seed=RANDOM_SEED,
store_embeddings=store_embeddings,
)
result = pipeline.process_directory_with_embeddings(CORPUS_DIRECTORY)
# Create response with enhanced information
response = {
"status": result["status"],
"chunks_processed": result["chunks_processed"],
"files_processed": result["files_processed"],
"embeddings_stored": result["embeddings_stored"],
"store_embeddings": result["store_embeddings"],
"message": (
f"Successfully processed {result['chunks_processed']} chunks "
f"from {result['files_processed']} files"
),
}
# Include failed files info if any
if result["failed_files"]:
response["failed_files"] = result["failed_files"]
failed_count = len(result["failed_files"])
response["warnings"] = f"{failed_count} files failed to process"
return jsonify(response)
except Exception as e:
return jsonify({"status": "error", "message": str(e)}), 500
@app.route("/search", methods=["POST"])
def search():
"""
Endpoint to perform semantic search on ingested documents.
Accepts JSON requests with query text and optional parameters.
Returns semantically similar document chunks.
"""
try:
# Validate request contains JSON data
if not request.is_json:
return (
jsonify(
{
"status": "error",
"message": "Content-Type must be application/json",
}
),
400,
)
data = request.get_json()
# Validate required query parameter
query = data.get("query")
if query is None:
return (
jsonify({"status": "error", "message": "Query parameter is required"}),
400,
)
if not isinstance(query, str) or not query.strip():
return (
jsonify(
{"status": "error", "message": "Query must be a non-empty string"}
),
400,
)
# Extract optional parameters with defaults
top_k = data.get("top_k", 5)
threshold = data.get("threshold", 0.3)
# Validate parameters
if not isinstance(top_k, int) or top_k <= 0:
return (
jsonify(
{"status": "error", "message": "top_k must be a positive integer"}
),
400,
)
if not isinstance(threshold, (int, float)) or not (0.0 <= threshold <= 1.0):
return (
jsonify(
{
"status": "error",
"message": "threshold must be a number between 0 and 1",
}
),
400,
)
# Initialize search components
from src.config import COLLECTION_NAME, VECTOR_DB_PERSIST_PATH
from src.embedding.embedding_service import EmbeddingService
from src.search.search_service import SearchService
from src.vector_store.vector_db import VectorDatabase
vector_db = VectorDatabase(VECTOR_DB_PERSIST_PATH, COLLECTION_NAME)
embedding_service = EmbeddingService()
search_service = SearchService(vector_db, embedding_service)
# Perform search
results = search_service.search(
query=query.strip(), top_k=top_k, threshold=threshold
)
# Format response
response = {
"status": "success",
"query": query.strip(),
"results_count": len(results),
"results": results,
}
return jsonify(response)
except ValueError as e:
return jsonify({"status": "error", "message": str(e)}), 400
except Exception as e:
return jsonify({"status": "error", "message": f"Search failed: {str(e)}"}), 500
@app.route("/chat", methods=["POST"])
def chat():
"""
Endpoint for conversational RAG interactions.
Accepts JSON requests with user messages and returns AI-generated
responses based on corporate policy documents.
"""
try:
# Validate request contains JSON data
if not request.is_json:
return (
jsonify(
{
"status": "error",
"message": "Content-Type must be application/json",
}
),
400,
)
data = request.get_json()
# Validate required message parameter
message = data.get("message")
if message is None:
return (
jsonify(
{"status": "error", "message": "message parameter is required"}
),
400,
)
if not isinstance(message, str) or not message.strip():
return (
jsonify(
{"status": "error", "message": "message must be a non-empty string"}
),
400,
)
# Extract optional parameters
conversation_id = data.get("conversation_id")
include_sources = data.get("include_sources", True)
include_debug = data.get("include_debug", False)
# Initialize RAG pipeline components
try:
from src.config import COLLECTION_NAME, VECTOR_DB_PERSIST_PATH
from src.embedding.embedding_service import EmbeddingService
from src.llm.llm_service import LLMService
from src.rag.rag_pipeline import RAGPipeline
from src.rag.response_formatter import ResponseFormatter
from src.search.search_service import SearchService
from src.vector_store.vector_db import VectorDatabase
# Initialize services
vector_db = VectorDatabase(VECTOR_DB_PERSIST_PATH, COLLECTION_NAME)
embedding_service = EmbeddingService()
search_service = SearchService(vector_db, embedding_service)
# Initialize LLM service from environment
llm_service = LLMService.from_environment()
# Initialize RAG pipeline
rag_pipeline = RAGPipeline(search_service, llm_service)
# Initialize response formatter
formatter = ResponseFormatter()
except ValueError as e:
return (
jsonify(
{
"status": "error",
"message": f"LLM service configuration error: {str(e)}",
"details": (
"Please ensure OPENROUTER_API_KEY or GROQ_API_KEY "
"environment variables are set"
),
}
),
503,
)
except Exception as e:
return (
jsonify(
{
"status": "error",
"message": f"Service initialization failed: {str(e)}",
}
),
500,
)
# Generate RAG response
rag_response = rag_pipeline.generate_answer(message.strip())
# Format response for API
if include_sources:
formatted_response = formatter.format_api_response(
rag_response, include_debug
)
else:
formatted_response = formatter.format_chat_response(
rag_response, conversation_id, include_sources=False
)
return jsonify(formatted_response)
except Exception as e:
return (
jsonify({"status": "error", "message": f"Chat request failed: {str(e)}"}),
500,
)
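
# Illustrative chat request (a sketch; "message" is required and the other
# keys are the optional parameters read above; host/port assume a local
# development server, and the exact response shape depends on
# ResponseFormatter):
#
#     import requests
#     resp = requests.post(
#         "http://127.0.0.1:5000/chat",
#         json={
#             "message": "How many vacation days do employees get?",
#             "include_sources": True,
#             "include_debug": False,
#         },
#     )
#     print(resp.json())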
@app.route("/chat/health", methods=["GET"])
def chat_health():
"""
Health check endpoint for RAG chat functionality.
Returns the status of all RAG pipeline components.
"""
try:
from src.config import COLLECTION_NAME, VECTOR_DB_PERSIST_PATH
from src.embedding.embedding_service import EmbeddingService
from src.llm.llm_service import LLMService
from src.rag.rag_pipeline import RAGPipeline
from src.rag.response_formatter import ResponseFormatter
from src.search.search_service import SearchService
from src.vector_store.vector_db import VectorDatabase
# Initialize services for health check
vector_db = VectorDatabase(VECTOR_DB_PERSIST_PATH, COLLECTION_NAME)
embedding_service = EmbeddingService()
search_service = SearchService(vector_db, embedding_service)
try:
llm_service = LLMService.from_environment()
rag_pipeline = RAGPipeline(search_service, llm_service)
formatter = ResponseFormatter()
# Perform health check
health_data = rag_pipeline.health_check()
health_response = formatter.create_health_response(health_data)
# Determine HTTP status based on health
if health_data.get("pipeline") == "healthy":
return jsonify(health_response), 200
elif health_data.get("pipeline") == "degraded":
return jsonify(health_response), 200 # Still functional
else:
return jsonify(health_response), 503 # Service unavailable
except ValueError as e:
return (
jsonify(
{
"status": "error",
"message": f"LLM configuration error: {str(e)}",
"health": {
"pipeline_status": "unhealthy",
"components": {
"llm_service": {
"status": "unconfigured",
"error": str(e),
}
},
},
}
),
503,
)
except Exception as e:
return (
jsonify({"status": "error", "message": f"Health check failed: {str(e)}"}),
500,
)
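
# Illustrative pipeline health probe (a sketch; per the handler above, 200
# indicates a healthy or degraded-but-functional pipeline, while 503 indicates
# an unavailable or unconfigured one; host/port assume a local server):
#
#     import requests
#     resp = requests.get("http://127.0.0.1:5000/chat/health")
#     print(resp.status_code, resp.json())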


if __name__ == "__main__":
    app.run(debug=True)
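
# Note: debug=True runs Flask's development server with auto-reload and is not
# meant for production. A production deployment would typically sit behind a
# WSGI server instead, e.g. (assuming this module is app.py and gunicorn is
# installed):
#
#     gunicorn app:app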