# msse-ai-engineering: src/ingestion/ingestion_pipeline.py
import time
from pathlib import Path
from typing import Any, Dict, List, Optional
from ..embedding.embedding_service import EmbeddingService
from ..vector_store.vector_db import VectorDatabase
from .document_chunker import DocumentChunker
from .document_parser import DocumentParser
class IngestionPipeline:
"""Complete ingestion pipeline for processing document corpus with embeddings"""
def __init__(
self,
chunk_size: int = 1000,
overlap: int = 200,
seed: int = 42,
store_embeddings: bool = True,
vector_db: Optional[VectorDatabase] = None,
embedding_service: Optional[EmbeddingService] = None,
):
"""
Initialize the ingestion pipeline
Args:
chunk_size: Size of text chunks
overlap: Overlap between chunks
seed: Random seed for reproducibility
store_embeddings: Whether to generate and store embeddings
vector_db: Vector database instance for storing embeddings
embedding_service: Embedding service for generating embeddings
"""
self.parser = DocumentParser()
self.chunker = DocumentChunker(
chunk_size=chunk_size, overlap=overlap, seed=seed
)
self.seed = seed
self.store_embeddings = store_embeddings
# Initialize embedding components if storing embeddings
if store_embeddings:
self.embedding_service = embedding_service or EmbeddingService()
if vector_db is None:
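                # Deferred import: the config module is only needed (and
                # loaded) when a default database has to be constructed here.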
from ..config import COLLECTION_NAME, VECTOR_DB_PERSIST_PATH
self.vector_db = VectorDatabase(
persist_path=VECTOR_DB_PERSIST_PATH, collection_name=COLLECTION_NAME
)
else:
self.vector_db = vector_db
else:
self.embedding_service = None
self.vector_db = None
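
    # Illustrative ways to construct the pipeline (argument values are
    # examples, not requirements):
    #   IngestionPipeline()                        # defaults, embeddings on
    #   IngestionPipeline(store_embeddings=False)  # parse/chunk only
    #   IngestionPipeline(vector_db=db, embedding_service=svc)  # injected deps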
def process_directory(self, directory_path: str) -> List[Dict[str, Any]]:
"""
        Process all supported documents in a directory (backward-compatible
        variant: returns a plain chunk list rather than a statistics dict)
Args:
directory_path: Path to directory containing documents
Returns:
List of processed chunks with metadata
"""
directory = Path(directory_path)
if not directory.exists():
raise FileNotFoundError(f"Directory not found: {directory_path}")
all_chunks = []
# Process each supported file
for file_path in directory.iterdir():
if (
file_path.is_file()
and file_path.suffix.lower() in self.parser.SUPPORTED_FORMATS
):
try:
chunks = self.process_file(str(file_path))
all_chunks.extend(chunks)
except Exception as e:
print(f"Warning: Failed to process {file_path}: {e}")
continue
return all_chunks
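
    # Illustrative call (the path is hypothetical). Each returned chunk is a
    # dict with "content" and "metadata" keys, and the metadata carries the
    # "chunk_id" used later for vector storage:
    #   chunks = pipeline.process_directory("data/corpus")
    #   text = chunks[0]["content"]
    #   cid = chunks[0]["metadata"]["chunk_id"]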
def process_directory_with_embeddings(self, directory_path: str) -> Dict[str, Any]:
"""
Process all supported documents in a directory with embeddings and enhanced
reporting
Args:
directory_path: Path to directory containing documents
Returns:
Dictionary with processing results and statistics
"""
        start_time = time.time()
directory = Path(directory_path)
if not directory.exists():
raise FileNotFoundError(f"Directory not found: {directory_path}")
all_chunks = []
processed_files = 0
failed_files = []
embeddings_stored = 0
# Process each supported file
for file_path in directory.iterdir():
if (
file_path.is_file()
and file_path.suffix.lower() in self.parser.SUPPORTED_FORMATS
):
try:
chunks = self.process_file(str(file_path))
all_chunks.extend(chunks)
processed_files += 1
except Exception as e:
print(f"Warning: Failed to process {file_path}: {e}")
failed_files.append({"file": str(file_path), "error": str(e)})
continue
# Generate and store embeddings if enabled
if (
self.store_embeddings
and all_chunks
and self.embedding_service
and self.vector_db
):
try:
embeddings_stored = self._store_embeddings_batch(all_chunks)
except Exception as e:
print(f"Warning: Failed to store embeddings: {e}")
return {
"status": "success",
"chunks_processed": len(all_chunks),
"files_processed": processed_files,
"failed_files": failed_files,
"embeddings_stored": embeddings_stored,
"store_embeddings": self.store_embeddings,
"processing_time_seconds": time.time() - start_time,
"chunks": all_chunks, # Include chunks for backward compatibility
}
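
    # Illustrative use of the returned summary (the path is hypothetical):
    #   report = pipeline.process_directory_with_embeddings("data/corpus")
    #   for failure in report["failed_files"]:
    #       print(failure["file"], failure["error"])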
def process_file(self, file_path: str) -> List[Dict[str, Any]]:
"""
Process a single file through the complete pipeline
Args:
file_path: Path to the file to process
Returns:
List of chunks from the file
"""
# Parse document
parsed_doc = self.parser.parse_document(file_path)
# Chunk the document
chunks = self.chunker.chunk_document(
parsed_doc["content"], parsed_doc["metadata"]
)
return chunks
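
    # Illustrative single-file call (the path is hypothetical); the file suffix
    # must be one of DocumentParser.SUPPORTED_FORMATS:
    #   chunks = pipeline.process_file("data/report.pdf")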
def _store_embeddings_batch(self, chunks: List[Dict[str, Any]]) -> int:
"""
Generate embeddings and store chunks in vector database
Args:
chunks: List of text chunks with metadata
Returns:
Number of embeddings stored successfully
"""
if not self.embedding_service or not self.vector_db:
return 0
stored_count = 0
batch_size = 32 # Process in batches for memory efficiency
for i in range(0, len(chunks), batch_size):
batch = chunks[i : i + batch_size]
try:
# Extract texts and prepare data for vector storage
texts = [chunk["content"] for chunk in batch]
chunk_ids = [chunk["metadata"]["chunk_id"] for chunk in batch]
metadatas = [chunk["metadata"] for chunk in batch]
# Generate embeddings for the batch
embeddings = self.embedding_service.embed_texts(texts)
# Store in vector database
self.vector_db.add_embeddings(
embeddings=embeddings,
chunk_ids=chunk_ids,
documents=texts,
metadatas=metadatas,
)
stored_count += len(batch)
print(
f"Stored embeddings for batch {i // batch_size + 1}: "
f"{len(batch)} chunks"
)
except Exception as e:
print(f"Warning: Failed to store batch {i // batch_size + 1}: {e}")
continue
return stored_count
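

# Minimal usage sketch, assuming a local "data/corpus" directory exists and the
# default EmbeddingService / VectorDatabase configuration is reachable; the
# path and the printed fields are illustrative. Because this module uses
# relative imports, run it as a module rather than as a script
# (e.g. python -m src.ingestion.ingestion_pipeline).
if __name__ == "__main__":
    pipeline = IngestionPipeline(chunk_size=1000, overlap=200)
    result = pipeline.process_directory_with_embeddings("data/corpus")
    print(f"Files processed:   {result['files_processed']}")
    print(f"Chunks produced:   {result['chunks_processed']}")
    print(f"Embeddings stored: {result['embeddings_stored']}")
    print(f"Elapsed seconds:   {result['processing_time_seconds']:.2f}")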