from pathlib import Path
from typing import Any, Dict, List, Optional

from ..embedding.embedding_service import EmbeddingService
from ..vector_store.vector_db import VectorDatabase
from .document_chunker import DocumentChunker
from .document_parser import DocumentParser


class IngestionPipeline:
    """Complete ingestion pipeline for processing a document corpus with embeddings"""

    def __init__(
        self,
        chunk_size: int = 1000,
        overlap: int = 200,
        seed: int = 42,
        store_embeddings: bool = True,
        vector_db: Optional[VectorDatabase] = None,
        embedding_service: Optional[EmbeddingService] = None,
    ):
        """
        Initialize the ingestion pipeline

        Args:
            chunk_size: Size of text chunks
            overlap: Overlap between chunks
            seed: Random seed for reproducibility
            store_embeddings: Whether to generate and store embeddings
            vector_db: Vector database instance for storing embeddings
            embedding_service: Embedding service for generating embeddings
        """
        self.parser = DocumentParser()
        self.chunker = DocumentChunker(
            chunk_size=chunk_size, overlap=overlap, seed=seed
        )
        self.seed = seed
        self.store_embeddings = store_embeddings

        # Initialize embedding components if storing embeddings
        if store_embeddings:
            self.embedding_service = embedding_service or EmbeddingService()
            if vector_db is None:
                from ..config import COLLECTION_NAME, VECTOR_DB_PERSIST_PATH

                self.vector_db = VectorDatabase(
                    persist_path=VECTOR_DB_PERSIST_PATH,
                    collection_name=COLLECTION_NAME,
                )
            else:
                self.vector_db = vector_db
        else:
            self.embedding_service = None
            self.vector_db = None

    def process_directory(self, directory_path: str) -> List[Dict[str, Any]]:
        """
        Process all supported documents in a directory (backward compatible)

        Args:
            directory_path: Path to directory containing documents

        Returns:
            List of processed chunks with metadata
        """
        directory = Path(directory_path)
        if not directory.exists():
            raise FileNotFoundError(f"Directory not found: {directory_path}")

        all_chunks = []

        # Process each supported file
        for file_path in directory.iterdir():
            if (
                file_path.is_file()
                and file_path.suffix.lower() in self.parser.SUPPORTED_FORMATS
            ):
                try:
                    chunks = self.process_file(str(file_path))
                    all_chunks.extend(chunks)
                except Exception as e:
                    print(f"Warning: Failed to process {file_path}: {e}")
                    continue

        return all_chunks

    def process_directory_with_embeddings(self, directory_path: str) -> Dict[str, Any]:
        """
        Process all supported documents in a directory with embeddings and enhanced reporting

        Args:
            directory_path: Path to directory containing documents

        Returns:
            Dictionary with processing results and statistics
        """
        import time

        start_time = time.time()

        directory = Path(directory_path)
        if not directory.exists():
            raise FileNotFoundError(f"Directory not found: {directory_path}")

        all_chunks = []
        processed_files = 0
        failed_files = []
        embeddings_stored = 0

        # Process each supported file
        for file_path in directory.iterdir():
            if (
                file_path.is_file()
                and file_path.suffix.lower() in self.parser.SUPPORTED_FORMATS
            ):
                try:
                    chunks = self.process_file(str(file_path))
                    all_chunks.extend(chunks)
                    processed_files += 1
                except Exception as e:
                    print(f"Warning: Failed to process {file_path}: {e}")
                    failed_files.append({"file": str(file_path), "error": str(e)})
                    continue

        # Generate and store embeddings if enabled
        if (
            self.store_embeddings
            and all_chunks
            and self.embedding_service
            and self.vector_db
        ):
            try:
                embeddings_stored = self._store_embeddings_batch(all_chunks)
            except Exception as e:
                print(f"Warning: Failed to store embeddings: {e}")

        return {
            "status": "success",
            "chunks_processed": len(all_chunks),
            "files_processed": processed_files,
            "failed_files": failed_files,
            "embeddings_stored": embeddings_stored,
            "store_embeddings": self.store_embeddings,
            "processing_time_seconds": time.time() - start_time,
            "chunks": all_chunks,  # Include chunks for backward compatibility
        }

    def process_file(self, file_path: str) -> List[Dict[str, Any]]:
        """
        Process a single file through the complete pipeline

        Args:
            file_path: Path to the file to process

        Returns:
            List of chunks from the file
        """
        # Parse document
        parsed_doc = self.parser.parse_document(file_path)

        # Chunk the document
        chunks = self.chunker.chunk_document(
            parsed_doc["content"], parsed_doc["metadata"]
        )

        return chunks

    def _store_embeddings_batch(self, chunks: List[Dict[str, Any]]) -> int:
        """
        Generate embeddings and store chunks in the vector database

        Args:
            chunks: List of text chunks with metadata

        Returns:
            Number of embeddings stored successfully
        """
        if not self.embedding_service or not self.vector_db:
            return 0

        stored_count = 0
        batch_size = 32  # Process in batches for memory efficiency

        for i in range(0, len(chunks), batch_size):
            batch = chunks[i : i + batch_size]

            try:
                # Extract texts and prepare data for vector storage
                texts = [chunk["content"] for chunk in batch]
                chunk_ids = [chunk["metadata"]["chunk_id"] for chunk in batch]
                metadatas = [chunk["metadata"] for chunk in batch]

                # Generate embeddings for the batch
                embeddings = self.embedding_service.embed_texts(texts)

                # Store in vector database
                self.vector_db.add_embeddings(
                    embeddings=embeddings,
                    chunk_ids=chunk_ids,
                    documents=texts,
                    metadatas=metadatas,
                )

                stored_count += len(batch)
                print(
                    f"Stored embeddings for batch {i // batch_size + 1}: "
                    f"{len(batch)} chunks"
                )
            except Exception as e:
                print(f"Warning: Failed to store batch {i // batch_size + 1}: {e}")
                continue

        return stored_count
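

# Usage sketch (illustrative only, not part of the module): shows how the
# pipeline above might be driven from application code. The package path in
# the import, the "docs/" directory, and the default EmbeddingService /
# VectorDatabase wiring via ..config are assumptions; the keys read from the
# result dict mirror the report returned by process_directory_with_embeddings.
#
#     from myapp.ingestion.ingestion_pipeline import IngestionPipeline  # hypothetical package path
#
#     pipeline = IngestionPipeline(chunk_size=1000, overlap=200, store_embeddings=True)
#     report = pipeline.process_directory_with_embeddings("docs/")
#     print(f"files: {report['files_processed']}, "
#           f"chunks: {report['chunks_processed']}, "
#           f"embeddings stored: {report['embeddings_stored']}")
#
# For a parse-and-chunk-only run (no embeddings), the backward-compatible
# process_directory("docs/") returns just the list of chunk dicts.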