import os
import tempfile
from pathlib import Path

import pytest

from src.ingestion.ingestion_pipeline import IngestionPipeline


def test_full_ingestion_pipeline():
    """Test the complete ingestion pipeline end-to-end"""
    # Create temporary test documents
    with tempfile.TemporaryDirectory() as temp_dir:
        # Create test files
        txt_file = Path(temp_dir) / "policy1.txt"
        md_file = Path(temp_dir) / "policy2.md"
        txt_file.write_text(
            "This is a text policy document with important information."
        )
        md_file.write_text("# Markdown Policy\n\nThis is markdown content.")

        # Initialize pipeline
        pipeline = IngestionPipeline(chunk_size=50, overlap=10, seed=42)

        # Process documents
        results = pipeline.process_directory(temp_dir)
        assert len(results) >= 2  # At least one result per file

        # Verify structure
        for result in results:
            assert "content" in result
            assert "metadata" in result
            assert "chunk_id" in result["metadata"]
            assert "filename" in result["metadata"]


def test_pipeline_reproducibility():
    """Test that pipeline produces consistent results"""
    with tempfile.TemporaryDirectory() as temp_dir:
        test_file = Path(temp_dir) / "test.txt"
        test_file.write_text("Test content for reproducibility. " * 20)

        pipeline1 = IngestionPipeline(chunk_size=100, overlap=20, seed=42)
        pipeline2 = IngestionPipeline(chunk_size=100, overlap=20, seed=42)

        results1 = pipeline1.process_directory(temp_dir)
        results2 = pipeline2.process_directory(temp_dir)

        assert len(results1) == len(results2)
        for r1, r2 in zip(results1, results2):
            assert r1["content"] == r2["content"]
            assert r1["metadata"]["chunk_id"] == r2["metadata"]["chunk_id"]


def test_pipeline_with_real_corpus():
    """Test pipeline with actual policy documents"""
    pipeline = IngestionPipeline(chunk_size=1000, overlap=200, seed=42)

    # Process the real corpus to verify the pipeline handles it
    corpus_dir = "synthetic_policies"

    # Check if corpus directory exists
    if not Path(corpus_dir).exists():
        pytest.skip("Corpus directory not found - test requires synthetic_policies/")

    results = pipeline.process_directory(corpus_dir)

    # Should process all 22 documents and produce many chunks
    assert len(results) > 20

    # Verify all results have proper structure
    for result in results:
        assert "content" in result
        assert "metadata" in result
        assert "chunk_id" in result["metadata"]
        assert "filename" in result["metadata"]
        assert "file_type" in result["metadata"]
        assert result["metadata"]["file_type"] == "md"
        assert "chunk_index" in result["metadata"]


def test_pipeline_error_handling():
    """Test pipeline handles errors gracefully"""
    with tempfile.TemporaryDirectory() as temp_dir:
        # Create valid and invalid files
        valid_file = Path(temp_dir) / "valid.md"
        invalid_file = Path(temp_dir) / "invalid.xyz"
        valid_file.write_text("# Valid Policy\n\nThis is valid content.")
        invalid_file.write_text("This file has unsupported format.")

        pipeline = IngestionPipeline(chunk_size=100, overlap=20, seed=42)

        # Should process the valid file and skip the invalid one
        results = pipeline.process_directory(temp_dir)

        # Should only get results from the valid file
        assert len(results) >= 1

        # All results should be from the valid file
        for result in results:
            assert result["metadata"]["filename"] == "valid.md"


def test_pipeline_single_file():
    """Test processing a single file"""
    pipeline = IngestionPipeline(chunk_size=100, overlap=20, seed=42)

    with tempfile.NamedTemporaryFile(mode="w", suffix=".md", delete=False) as f:
        f.write("# Test Policy\n\n" + "Content section. " * 20)
        temp_path = f.name

    try:
        results = pipeline.process_file(temp_path)

        # Should get multiple chunks due to length
        assert len(results) > 1

        # All chunks should have the same filename
        filename = Path(temp_path).name
        for result in results:
            assert result["metadata"]["filename"] == filename
            assert result["metadata"]["file_type"] == "md"
            assert "chunk_index" in result["metadata"]
    finally:
        os.unlink(temp_path)


def test_pipeline_empty_directory():
    """Test pipeline with empty directory"""
    with tempfile.TemporaryDirectory() as temp_dir:
        pipeline = IngestionPipeline(chunk_size=100, overlap=20, seed=42)
        results = pipeline.process_directory(temp_dir)

        # Should return an empty list for an empty directory
        assert len(results) == 0


def test_pipeline_nonexistent_directory():
    """Test pipeline with non-existent directory"""
    pipeline = IngestionPipeline(chunk_size=100, overlap=20, seed=42)

    with pytest.raises(FileNotFoundError):
        pipeline.process_directory("/nonexistent/directory")


def test_pipeline_configuration():
    """Test pipeline configuration options"""
    # Test different configurations
    pipeline_small = IngestionPipeline(chunk_size=50, overlap=10, seed=42)
    pipeline_large = IngestionPipeline(chunk_size=200, overlap=50, seed=42)

    with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False) as f:
        content = "Policy content goes here. " * 30  # 780 characters
        f.write(content)
        temp_path = f.name

    try:
        results_small = pipeline_small.process_file(temp_path)
        results_large = pipeline_large.process_file(temp_path)

        # Small chunks should create more chunks
        assert len(results_small) > len(results_large)

        # All chunks should respect size limits
        for result in results_small:
            assert len(result["content"]) <= 50
        for result in results_large:
            assert len(result["content"]) <= 200
    finally:
        os.unlink(temp_path)
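

# ---------------------------------------------------------------------------
# Illustrative sketch only: the tests above import the real implementation
# from src.ingestion.ingestion_pipeline, and the stub below is NOT that code.
# It simply restates the interface contract the assertions rely on
# (constructor arguments, process_file / process_directory, and the chunk
# dict shape). The sliding-window chunking and hash-derived chunk IDs are
# assumptions chosen to keep the sketch deterministic and self-contained; a
# real pipeline would likely split on sentence or section boundaries.
# ---------------------------------------------------------------------------
import hashlib


class _SketchIngestionPipeline:
    """Minimal, hypothetical stand-in for the pipeline these tests exercise."""

    SUPPORTED = {".txt": "txt", ".md": "md"}  # assumed supported extensions

    def __init__(self, chunk_size: int, overlap: int, seed: int):
        self.chunk_size = chunk_size
        self.overlap = overlap
        self.seed = seed  # folded into chunk IDs so repeated runs match

    def process_file(self, path) -> list:
        path = Path(path)
        text = path.read_text()
        file_type = self.SUPPORTED[path.suffix.lower()]
        step = max(self.chunk_size - self.overlap, 1)
        chunks = []
        for index, start in enumerate(range(0, len(text), step)):
            content = text[start : start + self.chunk_size]
            # Deterministic ID from seed, filename, position, and content
            chunk_id = hashlib.sha256(
                f"{self.seed}:{path.name}:{index}:{content}".encode()
            ).hexdigest()[:16]
            chunks.append(
                {
                    "content": content,
                    "metadata": {
                        "chunk_id": chunk_id,
                        "filename": path.name,
                        "file_type": file_type,
                        "chunk_index": index,
                    },
                }
            )
        return chunks

    def process_directory(self, directory) -> list:
        directory = Path(directory)
        if not directory.exists():
            raise FileNotFoundError(f"Directory not found: {directory}")
        results = []
        # Sorted traversal keeps output order stable; unsupported files skipped
        for path in sorted(directory.iterdir()):
            if path.suffix.lower() in self.SUPPORTED:
                results.extend(self.process_file(path))
        return results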