"""End-to-end tests for the document ingestion pipeline."""

import os
import tempfile
from pathlib import Path

import pytest

from src.ingestion.ingestion_pipeline import IngestionPipeline


def _write_named_temp_file(suffix, text):
    """Write *text* to a new temporary file ending in *suffix*.

    The file is created with ``delete=False`` so it survives being closed;
    the caller is responsible for removing it.

    Returns:
        The path of the created file as a string.
    """
    with tempfile.NamedTemporaryFile(mode="w", suffix=suffix, delete=False) as handle:
        handle.write(text)
        return handle.name


def test_full_ingestion_pipeline():
    """Ingest a directory holding one .txt and one .md file and check the
    shape of every returned chunk."""
    with tempfile.TemporaryDirectory() as workdir:
        root = Path(workdir)
        text_doc = root / "policy1.txt"
        markdown_doc = root / "policy2.md"
        text_doc.write_text(
            "This is a text policy document with important information."
        )
        markdown_doc.write_text("# Markdown Policy\n\nThis is markdown content.")

        ingester = IngestionPipeline(chunk_size=50, overlap=10, seed=42)
        chunks = ingester.process_directory(workdir)

        # Two input files were written, so at least two chunks are expected.
        assert len(chunks) >= 2

        for chunk in chunks:
            for key in ("content", "metadata"):
                assert key in chunk
            for key in ("chunk_id", "filename"):
                assert key in chunk["metadata"]


def test_pipeline_reproducibility():
    """Two pipelines built with the same settings must return the same chunk
    contents and chunk ids for the same input directory."""
    with tempfile.TemporaryDirectory() as workdir:
        sample = Path(workdir) / "test.txt"
        sample.write_text("Test content for reproducibility. " * 20)

        first = IngestionPipeline(chunk_size=100, overlap=20, seed=42)
        second = IngestionPipeline(chunk_size=100, overlap=20, seed=42)

        first_run = first.process_directory(workdir)
        second_run = second.process_directory(workdir)

        # Compare lengths first: zip() would silently stop at the shorter run.
        assert len(first_run) == len(second_run)

        for left, right in zip(first_run, second_run):
            assert left["content"] == right["content"]
            assert left["metadata"]["chunk_id"] == right["metadata"]["chunk_id"]


def test_pipeline_with_real_corpus():
    """Ingest the ``synthetic_policies`` directory and validate chunk metadata.

    The directory is looked up relative to the current working directory; the
    test is skipped when it does not exist.
    """
    ingester = IngestionPipeline(chunk_size=1000, overlap=200, seed=42)

    corpus_dir = "synthetic_policies"
    if not Path(corpus_dir).exists():
        pytest.skip("Corpus directory not found - test requires synthetic_policies/")

    chunks = ingester.process_directory(corpus_dir)

    # The corpus is expected to produce more than 20 chunks in total.
    assert len(chunks) > 20

    for chunk in chunks:
        for key in ("content", "metadata"):
            assert key in chunk
        metadata = chunk["metadata"]
        for key in ("chunk_id", "filename", "file_type"):
            assert key in metadata
        assert metadata["file_type"] == "md"
        assert "chunk_index" in metadata


def test_pipeline_error_handling():
    """A directory mixing a .md file with an unsupported .xyz file must yield
    chunks from the .md file only."""
    with tempfile.TemporaryDirectory() as workdir:
        root = Path(workdir)
        supported = root / "valid.md"
        unsupported = root / "invalid.xyz"
        supported.write_text("# Valid Policy\n\nThis is valid content.")
        unsupported.write_text("This file has unsupported format.")

        ingester = IngestionPipeline(chunk_size=100, overlap=20, seed=42)
        chunks = ingester.process_directory(workdir)

        # The supported file must contribute something...
        assert len(chunks) >= 1

        # ...and nothing may originate from the unsupported one.
        for chunk in chunks:
            assert chunk["metadata"]["filename"] == "valid.md"


def test_pipeline_single_file():
    """``process_file`` on one markdown file returns several chunks that all
    carry that file's name and type."""
    ingester = IngestionPipeline(chunk_size=100, overlap=20, seed=42)
    source_path = _write_named_temp_file(
        ".md", "# Test Policy\n\n" + "Content section. " * 20
    )

    try:
        chunks = ingester.process_file(source_path)

        # The text is longer than chunk_size, so more than one chunk is expected.
        assert len(chunks) > 1

        expected_name = Path(source_path).name
        for chunk in chunks:
            metadata = chunk["metadata"]
            assert metadata["filename"] == expected_name
            assert metadata["file_type"] == "md"
            assert "chunk_index" in metadata
    finally:
        # The temp file was created with delete=False; remove it explicitly.
        os.unlink(source_path)


def test_pipeline_empty_directory():
    """An empty directory produces no chunks."""
    with tempfile.TemporaryDirectory() as workdir:
        ingester = IngestionPipeline(chunk_size=100, overlap=20, seed=42)
        chunks = ingester.process_directory(workdir)

        assert len(chunks) == 0


def test_pipeline_nonexistent_directory():
    """A missing directory makes ``process_directory`` raise FileNotFoundError."""
    ingester = IngestionPipeline(chunk_size=100, overlap=20, seed=42)

    with pytest.raises(FileNotFoundError):
        ingester.process_directory("/nonexistent/directory")


def test_pipeline_configuration():
    """A smaller ``chunk_size`` yields more chunks, and no chunk's content
    exceeds the configured size."""
    small = IngestionPipeline(chunk_size=50, overlap=10, seed=42)
    large = IngestionPipeline(chunk_size=200, overlap=50, seed=42)

    # 26 characters repeated 30 times: 780 characters of input.
    source_path = _write_named_temp_file(".txt", "Policy content goes here. " * 30)

    try:
        small_chunks = small.process_file(source_path)
        large_chunks = large.process_file(source_path)

        assert len(small_chunks) > len(large_chunks)

        for limit, chunks in ((50, small_chunks), (200, large_chunks)):
            for chunk in chunks:
                assert len(chunk["content"]) <= limit
    finally:
        # The temp file was created with delete=False; remove it explicitly.
        os.unlink(source_path)