File size: 6,123 Bytes
ffa0f3d
7793bb6
ffa0f3d
7793bb6
 
 
ffa0f3d
 
7793bb6
ffa0f3d
 
 
 
 
 
 
7793bb6
 
 
 
ffa0f3d
7793bb6
ffa0f3d
 
7793bb6
ffa0f3d
 
7793bb6
ffa0f3d
7793bb6
ffa0f3d
 
7793bb6
 
 
 
 
ffa0f3d
 
 
 
 
 
7793bb6
ffa0f3d
 
7793bb6
ffa0f3d
 
7793bb6
ffa0f3d
7793bb6
ffa0f3d
7793bb6
 
 
ffa0f3d
 
 
 
7793bb6
ffa0f3d
 
7793bb6
ffa0f3d
 
 
7793bb6
ffa0f3d
7793bb6
ffa0f3d
 
7793bb6
ffa0f3d
 
7793bb6
 
 
 
 
 
 
 
ffa0f3d
 
 
 
 
 
 
7793bb6
ffa0f3d
 
7793bb6
ffa0f3d
7793bb6
ffa0f3d
 
7793bb6
ffa0f3d
 
7793bb6
ffa0f3d
 
7793bb6
 
ffa0f3d
 
 
 
7793bb6
 
ffa0f3d
 
7793bb6
ffa0f3d
 
7793bb6
ffa0f3d
 
7793bb6
ffa0f3d
 
 
7793bb6
 
 
 
ffa0f3d
 
 
7793bb6
ffa0f3d
 
 
 
7793bb6
ffa0f3d
7793bb6
ffa0f3d
 
 
7793bb6
ffa0f3d
 
 
7793bb6
ffa0f3d
 
 
7793bb6
ffa0f3d
 
 
 
 
7793bb6
 
ffa0f3d
 
 
7793bb6
ffa0f3d
 
 
7793bb6
ffa0f3d
 
7793bb6
ffa0f3d
 
7793bb6
 
ffa0f3d
7793bb6
 
ffa0f3d
7793bb6
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
import os
import tempfile
from pathlib import Path

import pytest

from src.ingestion.ingestion_pipeline import IngestionPipeline


def test_full_ingestion_pipeline():
    """End-to-end check: the pipeline chunks a mixed .txt/.md directory."""
    with tempfile.TemporaryDirectory() as workdir:
        # Seed the directory with one plain-text and one markdown document.
        root = Path(workdir)
        (root / "policy1.txt").write_text(
            "This is a text policy document with important information."
        )
        (root / "policy2.md").write_text("# Markdown Policy\n\nThis is markdown content.")

        pipeline = IngestionPipeline(chunk_size=50, overlap=10, seed=42)
        chunks = pipeline.process_directory(workdir)

        # Two input files should yield at least two chunks in total.
        assert len(chunks) >= 2

        # Every chunk carries content plus the expected metadata keys.
        for chunk in chunks:
            assert "content" in chunk
            assert "metadata" in chunk
            meta = chunk["metadata"]
            assert "chunk_id" in meta
            assert "filename" in meta


def test_pipeline_reproducibility():
    """Two identically-configured pipelines must emit identical chunks."""
    with tempfile.TemporaryDirectory() as temp_dir:
        (Path(temp_dir) / "test.txt").write_text(
            "Test content for reproducibility. " * 20
        )

        # Run the same directory through two pipelines built with the same
        # chunking parameters and seed.
        first, second = (
            IngestionPipeline(chunk_size=100, overlap=20, seed=42).process_directory(
                temp_dir
            )
            for _ in range(2)
        )

        assert len(first) == len(second)
        for a, b in zip(first, second):
            assert a["content"] == b["content"]
            assert a["metadata"]["chunk_id"] == b["metadata"]["chunk_id"]


def test_pipeline_with_real_corpus():
    """Test pipeline with actual policy documents.

    Processes the whole synthetic_policies/ corpus and verifies chunk
    count and metadata structure. Skipped when the corpus is absent.
    """
    corpus_dir = "synthetic_policies"

    # Guard clause first: skip before constructing the pipeline so no
    # setup work is wasted when the corpus directory is missing.
    if not Path(corpus_dir).exists():
        pytest.skip("Corpus directory not found - test requires synthetic_policies/")

    pipeline = IngestionPipeline(chunk_size=1000, overlap=200, seed=42)
    results = pipeline.process_directory(corpus_dir)

    # 22 source documents should produce well over 20 chunks.
    assert len(results) > 20

    # Every chunk must expose the full metadata contract; the corpus is
    # markdown-only, so file_type is always "md".
    for result in results:
        assert "content" in result
        assert "metadata" in result
        meta = result["metadata"]
        assert "chunk_id" in meta
        assert "filename" in meta
        assert "file_type" in meta
        assert meta["file_type"] == "md"
        assert "chunk_index" in meta


def test_pipeline_error_handling():
    """Unsupported file types are skipped instead of aborting the run."""
    with tempfile.TemporaryDirectory() as temp_dir:
        base = Path(temp_dir)
        # One supported markdown file and one with an unknown extension.
        (base / "valid.md").write_text("# Valid Policy\n\nThis is valid content.")
        (base / "invalid.xyz").write_text("This file has unsupported format.")

        pipeline = IngestionPipeline(chunk_size=100, overlap=20, seed=42)
        results = pipeline.process_directory(temp_dir)

        # The markdown file alone should contribute at least one chunk,
        # and nothing from the unsupported file should appear.
        assert len(results) >= 1
        assert all(r["metadata"]["filename"] == "valid.md" for r in results)


def test_pipeline_single_file():
    """process_file splits one long markdown file into tagged chunks."""
    pipeline = IngestionPipeline(chunk_size=100, overlap=20, seed=42)

    # delete=False so the file survives the context manager and can be
    # re-opened by the pipeline; it is removed explicitly in the finally.
    with tempfile.NamedTemporaryFile(mode="w", suffix=".md", delete=False) as handle:
        handle.write("# Test Policy\n\n" + "Content section. " * 20)
        temp_path = handle.name

    try:
        chunks = pipeline.process_file(temp_path)

        # Content is longer than chunk_size, so multiple chunks must result.
        assert len(chunks) > 1

        expected_name = Path(temp_path).name
        for chunk in chunks:
            meta = chunk["metadata"]
            assert meta["filename"] == expected_name
            assert meta["file_type"] == "md"
            assert "chunk_index" in meta
    finally:
        os.unlink(temp_path)


def test_pipeline_empty_directory():
    """A directory containing no files yields an empty result list."""
    pipeline = IngestionPipeline(chunk_size=100, overlap=20, seed=42)
    with tempfile.TemporaryDirectory() as empty_dir:
        assert len(pipeline.process_directory(empty_dir)) == 0


def test_pipeline_nonexistent_directory():
    """Processing a path that does not exist raises FileNotFoundError."""
    with pytest.raises(FileNotFoundError):
        IngestionPipeline(chunk_size=100, overlap=20, seed=42).process_directory(
            "/nonexistent/directory"
        )


def test_pipeline_configuration():
    """Smaller chunk_size produces more chunks, and all chunks obey the limit."""
    # Keyed by chunk_size so the size-limit check below can reuse the key.
    pipelines = {
        50: IngestionPipeline(chunk_size=50, overlap=10, seed=42),
        200: IngestionPipeline(chunk_size=200, overlap=50, seed=42),
    }

    with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False) as handle:
        handle.write("Policy content goes here. " * 30)  # 26 chars x 30 = 780
        temp_path = handle.name

    try:
        outputs = {
            size: pipe.process_file(temp_path) for size, pipe in pipelines.items()
        }

        # The 50-char configuration must split the text into more pieces
        # than the 200-char one.
        assert len(outputs[50]) > len(outputs[200])

        # No chunk may exceed its pipeline's configured size limit.
        for size, chunks in outputs.items():
            for chunk in chunks:
                assert len(chunk["content"]) <= size
    finally:
        os.unlink(temp_path)