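"""End-to-end tests for the document ingestion pipeline.

Covers directory and single-file processing, reproducibility across runs
with a fixed seed, error handling for unsupported formats, and chunk-size
configuration.
"""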
import os
import tempfile
from pathlib import Path
import pytest
from src.ingestion.ingestion_pipeline import IngestionPipeline
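# The tests below exercise IngestionPipeline through the interface inferred
# from its usage here; the actual signatures live in
# src/ingestion/ingestion_pipeline.py and may differ in detail:
#
#     pipeline = IngestionPipeline(chunk_size=100, overlap=20, seed=42)
#     chunks = pipeline.process_directory("corpus_dir/")  # list of chunk dicts
#     chunks = pipeline.process_file("policy.md")         # list of chunk dicts
#
# Each chunk dict carries a "content" string and a "metadata" dict with
# "chunk_id", "chunk_index", "filename", and "file_type" keys.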
def test_full_ingestion_pipeline():
"""Test the complete ingestion pipeline end-to-end"""
# Create temporary test documents
with tempfile.TemporaryDirectory() as temp_dir:
# Create test files
txt_file = Path(temp_dir) / "policy1.txt"
md_file = Path(temp_dir) / "policy2.md"
txt_file.write_text(
"This is a text policy document with important information."
)
md_file.write_text("# Markdown Policy\n\nThis is markdown content.")
# Initialize pipeline
pipeline = IngestionPipeline(chunk_size=50, overlap=10, seed=42)
# Process documents
results = pipeline.process_directory(temp_dir)
assert len(results) >= 2 # At least one result per file
# Verify structure
for result in results:
assert "content" in result
assert "metadata" in result
assert "chunk_id" in result["metadata"]
assert "filename" in result["metadata"]
def test_pipeline_reproducibility():
"""Test that pipeline produces consistent results"""
with tempfile.TemporaryDirectory() as temp_dir:
test_file = Path(temp_dir) / "test.txt"
test_file.write_text("Test content for reproducibility. " * 20)
pipeline1 = IngestionPipeline(chunk_size=100, overlap=20, seed=42)
pipeline2 = IngestionPipeline(chunk_size=100, overlap=20, seed=42)
results1 = pipeline1.process_directory(temp_dir)
results2 = pipeline2.process_directory(temp_dir)
assert len(results1) == len(results2)
for r1, r2 in zip(results1, results2):
assert r1["content"] == r2["content"]
assert r1["metadata"]["chunk_id"] == r2["metadata"]["chunk_id"]
def test_pipeline_with_real_corpus():
"""Test pipeline with actual policy documents"""
pipeline = IngestionPipeline(chunk_size=1000, overlap=200, seed=42)
# Process just one real document to verify it works
corpus_dir = "synthetic_policies"
# Check if corpus directory exists
if not Path(corpus_dir).exists():
pytest.skip("Corpus directory not found - test requires synthetic_policies/")
results = pipeline.process_directory(corpus_dir)
# Should process all 22 documents
assert len(results) > 20 # Should have many chunks from 22 documents
# Verify all results have proper structure
for result in results:
assert "content" in result
assert "metadata" in result
assert "chunk_id" in result["metadata"]
assert "filename" in result["metadata"]
assert "file_type" in result["metadata"]
assert result["metadata"]["file_type"] == "md"
assert "chunk_index" in result["metadata"]
def test_pipeline_error_handling():
"""Test pipeline handles errors gracefully"""
with tempfile.TemporaryDirectory() as temp_dir:
# Create valid and invalid files
valid_file = Path(temp_dir) / "valid.md"
invalid_file = Path(temp_dir) / "invalid.xyz"
valid_file.write_text("# Valid Policy\n\nThis is valid content.")
invalid_file.write_text("This file has unsupported format.")
pipeline = IngestionPipeline(chunk_size=100, overlap=20, seed=42)
# Should process valid file and skip invalid one
results = pipeline.process_directory(temp_dir)
# Should only get results from valid file
assert len(results) >= 1
# All results should be from valid file
for result in results:
assert result["metadata"]["filename"] == "valid.md"
def test_pipeline_single_file():
"""Test processing a single file"""
pipeline = IngestionPipeline(chunk_size=100, overlap=20, seed=42)
with tempfile.NamedTemporaryFile(mode="w", suffix=".md", delete=False) as f:
f.write("# Test Policy\n\n" + "Content section. " * 20)
temp_path = f.name
try:
results = pipeline.process_file(temp_path)
# Should get multiple chunks due to length
assert len(results) > 1
# All chunks should have same filename
filename = Path(temp_path).name
for result in results:
assert result["metadata"]["filename"] == filename
assert result["metadata"]["file_type"] == "md"
assert "chunk_index" in result["metadata"]
finally:
os.unlink(temp_path)
def test_pipeline_empty_directory():
"""Test pipeline with empty directory"""
with tempfile.TemporaryDirectory() as temp_dir:
pipeline = IngestionPipeline(chunk_size=100, overlap=20, seed=42)
results = pipeline.process_directory(temp_dir)
# Should return empty list for empty directory
assert len(results) == 0
def test_pipeline_nonexistent_directory():
"""Test pipeline with non-existent directory"""
pipeline = IngestionPipeline(chunk_size=100, overlap=20, seed=42)
with pytest.raises(FileNotFoundError):
pipeline.process_directory("/nonexistent/directory")
def test_pipeline_configuration():
"""Test pipeline configuration options"""
# Test different configurations
pipeline_small = IngestionPipeline(chunk_size=50, overlap=10, seed=42)
pipeline_large = IngestionPipeline(chunk_size=200, overlap=50, seed=42)
with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False) as f:
content = "Policy content goes here. " * 30 # 780 characters
f.write(content)
temp_path = f.name
try:
results_small = pipeline_small.process_file(temp_path)
results_large = pipeline_large.process_file(temp_path)
# Small chunks should create more chunks
assert len(results_small) > len(results_large)
# All chunks should respect size limits
for result in results_small:
assert len(result["content"]) <= 50
for result in results_large:
assert len(result["content"]) <= 200
finally:
os.unlink(temp_path)
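# To run this suite (assuming pytest is installed and the working directory is
# the repo root, so that synthetic_policies/ is found by the real-corpus test,
# which otherwise skips itself):
#
#     pytest -v tests/test_ingestion_pipeline.py
#
# The file path above is illustrative; adjust it to wherever this module lives.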