msse-ai-engineering / tests /test_ingestion /test_ingestion_pipeline.py
Tobias Pasquale
style: Fix code formatting and linting issues for CI/CD compliance
7793bb6
raw
history blame
6.12 kB
import os
import tempfile
from pathlib import Path
import pytest
from src.ingestion.ingestion_pipeline import IngestionPipeline
def test_full_ingestion_pipeline():
    """Test the complete ingestion pipeline end-to-end.

    Writes one ``.txt`` and one ``.md`` document into a temporary directory,
    runs ``process_directory`` over it, and checks that every returned chunk
    carries the expected keys and that both input files contributed chunks.
    """
    with tempfile.TemporaryDirectory() as temp_dir:
        # Create test files
        txt_file = Path(temp_dir) / "policy1.txt"
        md_file = Path(temp_dir) / "policy2.md"
        txt_file.write_text(
            "This is a text policy document with important information."
        )
        md_file.write_text("# Markdown Policy\n\nThis is markdown content.")

        # Initialize pipeline and process the whole directory
        pipeline = IngestionPipeline(chunk_size=50, overlap=10, seed=42)
        results = pipeline.process_directory(temp_dir)

        assert len(results) >= 2  # At least one result per file

        # Verify structure of every chunk
        for result in results:
            assert "content" in result
            assert "metadata" in result
            assert "chunk_id" in result["metadata"]
            assert "filename" in result["metadata"]

        # The count check alone is not enough: the text file is longer than
        # chunk_size, so it may produce two chunks by itself and hide a
        # silently skipped markdown file. Require chunks from both inputs.
        processed_files = {result["metadata"]["filename"] for result in results}
        assert processed_files == {txt_file.name, md_file.name}
def test_pipeline_reproducibility():
    """Two pipelines built with the same settings must emit the same chunks."""
    settings = {"chunk_size": 100, "overlap": 20, "seed": 42}

    with tempfile.TemporaryDirectory() as workdir:
        sample = Path(workdir) / "test.txt"
        sample.write_text("Test content for reproducibility. " * 20)

        # Build both pipelines before running either one, then run them in order.
        first, second = (IngestionPipeline(**settings) for _ in range(2))
        first_run = first.process_directory(workdir)
        second_run = second.process_directory(workdir)

        assert len(first_run) == len(second_run)

        # Compare the runs chunk by chunk: same text and same identifier.
        for left, right in zip(first_run, second_run):
            assert left["content"] == right["content"]
            assert left["metadata"]["chunk_id"] == right["metadata"]["chunk_id"]
def test_pipeline_with_real_corpus():
    """Run the pipeline over the ``synthetic_policies`` corpus, if present.

    The test is skipped when the corpus directory cannot be found relative to
    the current working directory.
    """
    pipeline = IngestionPipeline(chunk_size=1000, overlap=200, seed=42)

    corpus_dir = "synthetic_policies"
    corpus_available = Path(corpus_dir).exists()
    if not corpus_available:
        pytest.skip(
            "Corpus directory not found - test requires synthetic_policies/"
        )

    chunks = pipeline.process_directory(corpus_dir)

    # The corpus is expected to hold 22 documents, so there should be
    # more than 20 chunks in total.
    assert len(chunks) > 20

    # Every chunk needs the full metadata set and must come from markdown.
    for chunk in chunks:
        assert "content" in chunk
        assert "metadata" in chunk

        meta = chunk["metadata"]
        assert "chunk_id" in meta
        assert "filename" in meta
        assert "file_type" in meta
        assert meta["file_type"] == "md"
        assert "chunk_index" in meta
def test_pipeline_error_handling():
    """A directory mixing supported and unsupported files yields only valid chunks."""
    with tempfile.TemporaryDirectory() as workdir:
        root = Path(workdir)

        # One markdown file the pipeline should accept ...
        supported = root / "valid.md"
        # ... and one file with an extension it is expected to ignore.
        unsupported = root / "invalid.xyz"

        supported.write_text("# Valid Policy\n\nThis is valid content.")
        unsupported.write_text("This file has unsupported format.")

        pipeline = IngestionPipeline(chunk_size=100, overlap=20, seed=42)
        chunks = pipeline.process_directory(workdir)

        # At least one chunk must come back, and none from the unsupported file.
        assert len(chunks) >= 1
        for chunk in chunks:
            source_name = chunk["metadata"]["filename"]
            assert source_name == "valid.md"
def test_pipeline_single_file():
    """Process one markdown file directly through ``process_file``."""
    pipeline = IngestionPipeline(chunk_size=100, overlap=20, seed=42)

    body = "# Test Policy\n\n" + "Content section. " * 20
    with tempfile.NamedTemporaryFile(mode="w", suffix=".md", delete=False) as handle:
        handle.write(body)
        temp_path = handle.name

    try:
        chunks = pipeline.process_file(temp_path)

        # The document is longer than chunk_size, so it must be split.
        assert len(chunks) > 1

        # Every chunk should point back at the same source file.
        expected_name = Path(temp_path).name
        for chunk in chunks:
            meta = chunk["metadata"]
            assert meta["filename"] == expected_name
            assert meta["file_type"] == "md"
            assert "chunk_index" in meta
    finally:
        # delete=False above means the file has to be removed by hand.
        os.unlink(temp_path)
def test_pipeline_empty_directory():
    """An empty directory must produce zero chunks."""
    with tempfile.TemporaryDirectory() as empty_dir:
        chunks = IngestionPipeline(
            chunk_size=100, overlap=20, seed=42
        ).process_directory(empty_dir)

        # Nothing to ingest, so nothing should come back.
        chunk_count = len(chunks)
        assert chunk_count == 0
def test_pipeline_nonexistent_directory():
    """Pointing the pipeline at a missing directory raises ``FileNotFoundError``."""
    missing_dir = "/nonexistent/directory"
    pipeline = IngestionPipeline(chunk_size=100, overlap=20, seed=42)

    # Only the processing call sits inside the raises-context.
    with pytest.raises(FileNotFoundError):
        pipeline.process_directory(missing_dir)
def test_pipeline_configuration():
    """Different chunk sizes change both the chunk count and the chunk length."""
    pipeline_small = IngestionPipeline(chunk_size=50, overlap=10, seed=42)
    pipeline_large = IngestionPipeline(chunk_size=200, overlap=50, seed=42)

    with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False) as handle:
        # 26 characters repeated 30 times gives 780 characters in total.
        content = "Policy content goes here. " * 30
        handle.write(content)
        temp_path = handle.name

    try:
        small_chunks = pipeline_small.process_file(temp_path)
        large_chunks = pipeline_large.process_file(temp_path)

        # A smaller chunk size has to split the same text into more pieces.
        assert len(small_chunks) > len(large_chunks)

        # No chunk may exceed the size configured for its pipeline.
        for limit, chunks in ((50, small_chunks), (200, large_chunks)):
            for chunk in chunks:
                assert len(chunk["content"]) <= limit
    finally:
        # delete=False above means the file has to be removed by hand.
        os.unlink(temp_path)