Tobias Pasquale committed on
Commit 5abed81 · 2 Parents: ca68eb2 57abf2f

Merge pull request #27 from sethmcknight/feat/enhanced-ingestion-pipeline

.gitignore CHANGED
@@ -36,3 +36,6 @@ planning/
 *.log
 *.tmp
 .env.local
+
+# Vector Database (ChromaDB data)
+data/chroma_db/
CHANGELOG.md CHANGED
@@ -19,6 +19,56 @@ Each entry includes:
 
 ---
 
+### 2025-10-17 - Enhanced Ingestion Pipeline with Embeddings Integration
+
+**Entry #019** | **Action Type**: CREATE/UPDATE | **Component**: Enhanced Ingestion Pipeline | **Issue**: #21
+
+- **Files Changed**:
+  - `src/ingestion/ingestion_pipeline.py` (ENHANCED) - Added embedding integration and enhanced reporting
+  - `app.py` (UPDATED) - Enhanced /ingest endpoint with configurable embedding storage
+  - `tests/test_ingestion/test_enhanced_ingestion_pipeline.py` (NEW) - Comprehensive test suite for enhanced functionality
+  - `tests/test_enhanced_app.py` (NEW) - Flask endpoint tests for enhanced ingestion
+- **Implementation Details**:
+  - **Core Features**: Embeddings integration with configurable on/off, batch processing with 32-item batches, enhanced API response with statistics
+  - **Backward Compatibility**: Maintained original `process_directory()` method for existing tests, added new `process_directory_with_embeddings()` method
+  - **API Enhancement**: /ingest endpoint accepts `{"store_embeddings": true/false}` parameter; enhanced response includes files_processed, embeddings_stored, failed_files
+  - **Error Handling**: Comprehensive error handling with graceful degradation, detailed failure reporting per file and batch
+  - **Batch Processing**: Memory-efficient 32-chunk batches for embedding generation, progress reporting during processing
+  - **Integration**: Seamless integration with existing EmbeddingService and VectorDatabase components
+- **Test Coverage**:
+  - ✅ 14/14 enhanced ingestion tests passing (100% success rate)
+    - Unit tests with mocked embedding services (4 tests)
+    - Integration tests with real components (4 tests)
+    - Backward compatibility validation (2 tests)
+    - Flask endpoint testing (4 tests)
+  - ✅ All existing tests maintained backward compatibility (8/8 passing)
+- **Quality Assurance**:
+  - ✅ Comprehensive error handling with graceful degradation
+  - ✅ Memory-efficient batch processing implementation
+  - ✅ Backward compatibility maintained for existing API
+  - ✅ Enhanced reporting and statistics generation
+- **Performance**:
+  - Batch processing: 32 chunks per batch for memory efficiency
+  - Progress reporting: real-time batch processing updates
+  - Error resilience: continues processing despite individual file/batch failures
+- **Flask API Enhancement**:
+  - Enhanced /ingest endpoint with JSON parameter support
+  - Configurable embedding storage: `{"store_embeddings": true/false}`
+  - Enhanced response format with comprehensive statistics
+  - Backward compatible with existing clients
+- **Dependencies**:
+  - Builds on existing EmbeddingService and VectorDatabase (Phase 2A)
+  - Integrates with SearchService for complete RAG pipeline
+  - Maintains compatibility with existing ingestion components
+- **CI/CD**: ✅ All 71 tests pass, including new enhanced functionality
+- **Notes**:
+  - Addresses GitHub Issue #21 requirements completely
+  - Maintains full backward compatibility while adding enhanced features
+  - Ready for integration with SearchService and upcoming /search endpoint
+  - Sets foundation for complete RAG pipeline implementation
+
+---
+
 ## Changelog Entries
 
 ### 2025-12-28 - Phase 2B SearchService Implementation
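
As a quick illustration of the changelog entry above, the sketch below shows how a client might call the enhanced /ingest endpoint with embedding storage toggled off. The endpoint path and the store_embeddings parameter come from this change; the localhost URL is an assumption about a local dev server.

    # Minimal client sketch; http://localhost:5000 is an assumed local dev address
    import requests

    resp = requests.post(
        "http://localhost:5000/ingest",
        json={"store_embeddings": False},  # parameter introduced by this PR; defaults to True
    )
    # Response is expected to carry status, chunks_processed, files_processed, embeddings_stored
    print(resp.json())
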
app.py CHANGED
@@ -1,4 +1,4 @@
-from flask import Flask, jsonify, render_template
+from flask import Flask, jsonify, render_template, request
 
 app = Flask(__name__)
 
@@ -21,7 +21,7 @@ def health():
 
 @app.route("/ingest", methods=["POST"])
 def ingest():
-    """Endpoint to trigger document ingestion"""
+    """Endpoint to trigger document ingestion with embeddings"""
     try:
         from src.config import (
             CORPUS_DIRECTORY,
@@ -31,23 +31,138 @@ def ingest():
         )
         from src.ingestion.ingestion_pipeline import IngestionPipeline
 
+        # Get optional parameters from request
+        data = request.get_json() if request.is_json else {}
+        store_embeddings = data.get("store_embeddings", True)
+
         pipeline = IngestionPipeline(
-            chunk_size=DEFAULT_CHUNK_SIZE, overlap=DEFAULT_OVERLAP, seed=RANDOM_SEED
+            chunk_size=DEFAULT_CHUNK_SIZE,
+            overlap=DEFAULT_OVERLAP,
+            seed=RANDOM_SEED,
+            store_embeddings=store_embeddings,
         )
 
-        chunks = pipeline.process_directory(CORPUS_DIRECTORY)
+        result = pipeline.process_directory_with_embeddings(CORPUS_DIRECTORY)
 
-        return jsonify(
-            {
-                "status": "success",
-                "chunks_processed": len(chunks),
-                "message": f"Successfully processed {len(chunks)} chunks",
-            }
-        )
+        # Create response with enhanced information
+        response = {
+            "status": result["status"],
+            "chunks_processed": result["chunks_processed"],
+            "files_processed": result["files_processed"],
+            "embeddings_stored": result["embeddings_stored"],
+            "store_embeddings": result["store_embeddings"],
+            "message": (
+                f"Successfully processed {result['chunks_processed']} chunks "
+                f"from {result['files_processed']} files"
+            ),
+        }
+
+        # Include failed files info if any
+        if result["failed_files"]:
+            response["failed_files"] = result["failed_files"]
+            failed_count = len(result["failed_files"])
+            response["warnings"] = f"{failed_count} files failed to process"
+
+        return jsonify(response)
 
     except Exception as e:
         return jsonify({"status": "error", "message": str(e)}), 500
 
 
+@app.route("/search", methods=["POST"])
+def search():
+    """
+    Endpoint to perform semantic search on ingested documents.
+
+    Accepts JSON requests with query text and optional parameters.
+    Returns semantically similar document chunks.
+    """
+    try:
+        # Validate request contains JSON data
+        if not request.is_json:
+            return (
+                jsonify(
+                    {
+                        "status": "error",
+                        "message": "Content-Type must be application/json",
+                    }
+                ),
+                400,
+            )
+
+        data = request.get_json()
+
+        # Validate required query parameter
+        query = data.get("query")
+        if query is None:
+            return (
+                jsonify({"status": "error", "message": "Query parameter is required"}),
+                400,
+            )
+
+        if not isinstance(query, str) or not query.strip():
+            return (
+                jsonify(
+                    {"status": "error", "message": "Query must be a non-empty string"}
+                ),
+                400,
+            )
+
+        # Extract optional parameters with defaults
+        top_k = data.get("top_k", 5)
+        threshold = data.get("threshold", 0.3)
+
+        # Validate parameters
+        if not isinstance(top_k, int) or top_k <= 0:
+            return (
+                jsonify(
+                    {"status": "error", "message": "top_k must be a positive integer"}
+                ),
+                400,
+            )
+
+        if not isinstance(threshold, (int, float)) or not (0.0 <= threshold <= 1.0):
+            return (
+                jsonify(
+                    {
+                        "status": "error",
+                        "message": "threshold must be a number between 0 and 1",
+                    }
+                ),
+                400,
+            )
+
+        # Initialize search components
+        from src.config import COLLECTION_NAME, VECTOR_DB_PERSIST_PATH
+        from src.embedding.embedding_service import EmbeddingService
+        from src.search.search_service import SearchService
+        from src.vector_store.vector_db import VectorDatabase
+
+        vector_db = VectorDatabase(VECTOR_DB_PERSIST_PATH, COLLECTION_NAME)
+        embedding_service = EmbeddingService()
+        search_service = SearchService(vector_db, embedding_service)
+
+        # Perform search
+        results = search_service.search(
+            query=query.strip(), top_k=top_k, threshold=threshold
+        )
+
+        # Format response
+        response = {
+            "status": "success",
+            "query": query.strip(),
+            "results_count": len(results),
+            "results": results,
+        }
+
+        return jsonify(response)
+
+    except ValueError as e:
+        return jsonify({"status": "error", "message": str(e)}), 400
+
+    except Exception as e:
+        return jsonify({"status": "error", "message": f"Search failed: {str(e)}"}), 500
+
+
 if __name__ == "__main__":
     app.run(debug=True)
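
For illustration, a minimal client-side sketch of calling the new /search endpoint added above. The request and response fields follow the handler and the endpoint tests in this PR; the host and port are assumptions about a local dev server.

    # Minimal client sketch; http://localhost:5000 is an assumed local dev address
    import requests

    resp = requests.post(
        "http://localhost:5000/search",
        json={"query": "remote work policy", "top_k": 3, "threshold": 0.3},
    )
    data = resp.json()
    for hit in data.get("results", []):
        # per the endpoint tests, each result carries chunk_id, content, similarity_score, metadata
        print(hit["chunk_id"], hit["similarity_score"])
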
src/ingestion/ingestion_pipeline.py CHANGED
@@ -1,14 +1,24 @@
 from pathlib import Path
-from typing import Any, Dict, List
+from typing import Any, Dict, List, Optional
 
+from ..embedding.embedding_service import EmbeddingService
+from ..vector_store.vector_db import VectorDatabase
 from .document_chunker import DocumentChunker
 from .document_parser import DocumentParser
 
 
 class IngestionPipeline:
-    """Complete ingestion pipeline for processing document corpus"""
-
-    def __init__(self, chunk_size: int = 1000, overlap: int = 200, seed: int = 42):
+    """Complete ingestion pipeline for processing document corpus with embeddings"""
+
+    def __init__(
+        self,
+        chunk_size: int = 1000,
+        overlap: int = 200,
+        seed: int = 42,
+        store_embeddings: bool = True,
+        vector_db: Optional[VectorDatabase] = None,
+        embedding_service: Optional[EmbeddingService] = None,
+    ):
         """
         Initialize the ingestion pipeline
 
@@ -16,16 +26,35 @@ class IngestionPipeline:
             chunk_size: Size of text chunks
             overlap: Overlap between chunks
             seed: Random seed for reproducibility
+            store_embeddings: Whether to generate and store embeddings
+            vector_db: Vector database instance for storing embeddings
+            embedding_service: Embedding service for generating embeddings
         """
         self.parser = DocumentParser()
         self.chunker = DocumentChunker(
             chunk_size=chunk_size, overlap=overlap, seed=seed
         )
         self.seed = seed
+        self.store_embeddings = store_embeddings
+
+        # Initialize embedding components if storing embeddings
+        if store_embeddings:
+            self.embedding_service = embedding_service or EmbeddingService()
+            if vector_db is None:
+                from ..config import COLLECTION_NAME, VECTOR_DB_PERSIST_PATH
+
+                self.vector_db = VectorDatabase(
+                    persist_path=VECTOR_DB_PERSIST_PATH, collection_name=COLLECTION_NAME
+                )
+            else:
+                self.vector_db = vector_db
+        else:
+            self.embedding_service = None
+            self.vector_db = None
 
     def process_directory(self, directory_path: str) -> List[Dict[str, Any]]:
         """
-        Process all supported documents in a directory
+        Process all supported documents in a directory (backward compatible)
 
         Args:
             directory_path: Path to directory containing documents
@@ -54,6 +83,63 @@ class IngestionPipeline:
 
         return all_chunks
 
+    def process_directory_with_embeddings(self, directory_path: str) -> Dict[str, Any]:
+        """
+        Process all supported documents in a directory with embeddings and enhanced
+        reporting
+
+        Args:
+            directory_path: Path to directory containing documents
+
+        Returns:
+            Dictionary with processing results and statistics
+        """
+        directory = Path(directory_path)
+        if not directory.exists():
+            raise FileNotFoundError(f"Directory not found: {directory_path}")
+
+        all_chunks = []
+        processed_files = 0
+        failed_files = []
+        embeddings_stored = 0
+
+        # Process each supported file
+        for file_path in directory.iterdir():
+            if (
+                file_path.is_file()
+                and file_path.suffix.lower() in self.parser.SUPPORTED_FORMATS
+            ):
+                try:
+                    chunks = self.process_file(str(file_path))
+                    all_chunks.extend(chunks)
+                    processed_files += 1
+                except Exception as e:
+                    print(f"Warning: Failed to process {file_path}: {e}")
+                    failed_files.append({"file": str(file_path), "error": str(e)})
+                    continue
+
+        # Generate and store embeddings if enabled
+        if (
+            self.store_embeddings
+            and all_chunks
+            and self.embedding_service
+            and self.vector_db
+        ):
+            try:
+                embeddings_stored = self._store_embeddings_batch(all_chunks)
+            except Exception as e:
+                print(f"Warning: Failed to store embeddings: {e}")
+
+        return {
+            "status": "success",
+            "chunks_processed": len(all_chunks),
+            "files_processed": processed_files,
+            "failed_files": failed_files,
+            "embeddings_stored": embeddings_stored,
+            "store_embeddings": self.store_embeddings,
+            "chunks": all_chunks,  # Include chunks for backward compatibility
+        }
+
     def process_file(self, file_path: str) -> List[Dict[str, Any]]:
         """
         Process a single file through the complete pipeline
@@ -73,3 +159,51 @@
         )
 
         return chunks
+
+    def _store_embeddings_batch(self, chunks: List[Dict[str, Any]]) -> int:
+        """
+        Generate embeddings and store chunks in vector database
+
+        Args:
+            chunks: List of text chunks with metadata
+
+        Returns:
+            Number of embeddings stored successfully
+        """
+        if not self.embedding_service or not self.vector_db:
+            return 0
+
+        stored_count = 0
+        batch_size = 32  # Process in batches for memory efficiency
+
+        for i in range(0, len(chunks), batch_size):
+            batch = chunks[i : i + batch_size]
+
+            try:
+                # Extract texts and prepare data for vector storage
+                texts = [chunk["content"] for chunk in batch]
+                chunk_ids = [chunk["metadata"]["chunk_id"] for chunk in batch]
+                metadatas = [chunk["metadata"] for chunk in batch]
+
+                # Generate embeddings for the batch
+                embeddings = self.embedding_service.embed_texts(texts)
+
+                # Store in vector database
+                self.vector_db.add_embeddings(
+                    embeddings=embeddings,
+                    chunk_ids=chunk_ids,
+                    documents=texts,
+                    metadatas=metadatas,
+                )
+
+                stored_count += len(batch)
+                print(
+                    f"Stored embeddings for batch {i // batch_size + 1}: "
+                    f"{len(batch)} chunks"
+                )
+
+            except Exception as e:
+                print(f"Warning: Failed to store batch {i // batch_size + 1}: {e}")
+                continue
+
+        return stored_count
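
A minimal sketch of driving the enhanced pipeline directly from Python, using the constructor arguments and report keys added above. The corpus path "data/corpus" is an assumption; any directory of supported .md/.txt files works.

    # Minimal usage sketch; "data/corpus" is an assumed corpus location
    from src.ingestion.ingestion_pipeline import IngestionPipeline

    pipeline = IngestionPipeline(chunk_size=1000, overlap=200, seed=42, store_embeddings=True)
    report = pipeline.process_directory_with_embeddings("data/corpus")
    print(report["files_processed"], report["chunks_processed"], report["embeddings_stored"])
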
tests/test_app.py CHANGED
@@ -1,3 +1,5 @@
+import json
+
 import pytest
 
 from app import app as flask_app
@@ -38,3 +40,123 @@ def test_ingest_endpoint_exists():
     response = client.post("/ingest")
     # Should not be 404 (not found)
     assert response.status_code != 404
+
+
+class TestSearchEndpoint:
+    """Test cases for the /search endpoint"""
+
+    def test_search_endpoint_valid_request(self, client):
+        """Test search endpoint with valid request"""
+        request_data = {"query": "remote work policy", "top_k": 3, "threshold": 0.3}
+
+        response = client.post(
+            "/search", data=json.dumps(request_data), content_type="application/json"
+        )
+
+        assert response.status_code == 200
+        data = response.get_json()
+
+        assert data["status"] == "success"
+        assert data["query"] == "remote work policy"
+        assert "results_count" in data
+        assert "results" in data
+        assert isinstance(data["results"], list)
+
+    def test_search_endpoint_minimal_request(self, client):
+        """Test search endpoint with minimal request (only query)"""
+        request_data = {"query": "employee benefits"}
+
+        response = client.post(
+            "/search", data=json.dumps(request_data), content_type="application/json"
+        )
+
+        assert response.status_code == 200
+        data = response.get_json()
+
+        assert data["status"] == "success"
+        assert data["query"] == "employee benefits"
+
+    def test_search_endpoint_missing_query(self, client):
+        """Test search endpoint with missing query parameter"""
+        request_data = {"top_k": 5}
+
+        response = client.post(
+            "/search", data=json.dumps(request_data), content_type="application/json"
+        )
+
+        assert response.status_code == 400
+        data = response.get_json()
+
+        assert data["status"] == "error"
+        assert "Query parameter is required" in data["message"]
+
+    def test_search_endpoint_empty_query(self, client):
+        """Test search endpoint with empty query"""
+        request_data = {"query": ""}
+
+        response = client.post(
+            "/search", data=json.dumps(request_data), content_type="application/json"
+        )
+
+        assert response.status_code == 400
+        data = response.get_json()
+
+        assert data["status"] == "error"
+        assert "non-empty string" in data["message"]
+
+    def test_search_endpoint_invalid_top_k(self, client):
+        """Test search endpoint with invalid top_k parameter"""
+        request_data = {"query": "test query", "top_k": -1}
+
+        response = client.post(
+            "/search", data=json.dumps(request_data), content_type="application/json"
+        )
+
+        assert response.status_code == 400
+        data = response.get_json()
+
+        assert data["status"] == "error"
+        assert "positive integer" in data["message"]
+
+    def test_search_endpoint_invalid_threshold(self, client):
+        """Test search endpoint with invalid threshold parameter"""
+        request_data = {"query": "test query", "threshold": 1.5}
+
+        response = client.post(
+            "/search", data=json.dumps(request_data), content_type="application/json"
+        )
+
+        assert response.status_code == 400
+        data = response.get_json()
+
+        assert data["status"] == "error"
+        assert "between 0 and 1" in data["message"]
+
+    def test_search_endpoint_non_json_request(self, client):
+        """Test search endpoint with non-JSON request"""
+        response = client.post("/search", data="not json", content_type="text/plain")
+
+        assert response.status_code == 400
+        data = response.get_json()
+
+        assert data["status"] == "error"
+        assert "application/json" in data["message"]
+
+    def test_search_endpoint_result_structure(self, client):
+        """Test that search results have the correct structure"""
+        request_data = {"query": "policy"}
+
+        response = client.post(
+            "/search", data=json.dumps(request_data), content_type="application/json"
+        )
+
+        assert response.status_code == 200
+        data = response.get_json()
+
+        if data["results_count"] > 0:
+            result = data["results"][0]
+            assert "chunk_id" in result
+            assert "content" in result
+            assert "similarity_score" in result
+            assert "metadata" in result
+            assert isinstance(result["similarity_score"], (int, float))
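
The TestSearchEndpoint cases above rely on a `client` fixture that is not part of this diff. A minimal sketch of what such a fixture typically looks like follows; this is an assumption, since the existing fixture definition (likely in the current test module or a conftest.py) is not shown here.

    # Assumed fixture shape; the real fixture lives in the existing tests, not in this PR
    import pytest

    from app import app as flask_app


    @pytest.fixture
    def client():
        """Yield a Flask test client with TESTING enabled."""
        flask_app.config["TESTING"] = True
        with flask_app.test_client() as test_client:
            yield test_client
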
tests/test_enhanced_app.py ADDED
@@ -0,0 +1,103 @@
+"""
+Test for enhanced ingestion Flask endpoint
+"""
+
+import json
+import tempfile
+import unittest
+from pathlib import Path
+from unittest.mock import patch
+
+from app import app
+
+
+class TestEnhancedIngestionEndpoint(unittest.TestCase):
+    """Test cases for enhanced ingestion Flask endpoint"""
+
+    def setUp(self):
+        """Set up test fixtures"""
+        app.config["TESTING"] = True
+        self.app = app.test_client()
+
+        # Create temporary directory and files for testing
+        self.temp_dir = tempfile.mkdtemp()
+        self.test_dir = Path(self.temp_dir)
+
+        self.test_file = self.test_dir / "test.md"
+        self.test_file.write_text(
+            "# Test Document\n\nThis is test content for enhanced ingestion."
+        )
+
+    def test_ingest_endpoint_with_embeddings_default(self):
+        """Test ingestion endpoint with default embeddings enabled"""
+        with patch("src.config.CORPUS_DIRECTORY", str(self.test_dir)):
+            response = self.app.post("/ingest")
+
+            self.assertEqual(response.status_code, 200)
+            data = json.loads(response.data)
+
+            # Check enhanced response structure
+            self.assertEqual(data["status"], "success")
+            self.assertIn("chunks_processed", data)
+            self.assertIn("files_processed", data)
+            self.assertIn("embeddings_stored", data)
+            self.assertIn("store_embeddings", data)
+            self.assertTrue(data["store_embeddings"])  # Default is True
+            self.assertGreater(data["chunks_processed"], 0)
+            self.assertGreater(data["files_processed"], 0)
+
+    def test_ingest_endpoint_with_embeddings_disabled(self):
+        """Test ingestion endpoint with embeddings disabled"""
+        with patch("src.config.CORPUS_DIRECTORY", str(self.test_dir)):
+            response = self.app.post(
+                "/ingest",
+                data=json.dumps({"store_embeddings": False}),
+                content_type="application/json",
+            )
+
+            self.assertEqual(response.status_code, 200)
+            data = json.loads(response.data)
+
+            # Check response structure with embeddings disabled
+            self.assertEqual(data["status"], "success")
+            self.assertIn("chunks_processed", data)
+            self.assertIn("files_processed", data)
+            self.assertIn("embeddings_stored", data)
+            self.assertIn("store_embeddings", data)
+            self.assertFalse(data["store_embeddings"])
+            self.assertEqual(data["embeddings_stored"], 0)
+            self.assertGreater(data["chunks_processed"], 0)
+            self.assertGreater(data["files_processed"], 0)
+
+    def test_ingest_endpoint_with_no_json(self):
+        """Test ingestion endpoint with no JSON payload (should default to
+        embeddings enabled)"""
+        with patch("src.config.CORPUS_DIRECTORY", str(self.test_dir)):
+            response = self.app.post("/ingest")
+
+            self.assertEqual(response.status_code, 200)
+            data = json.loads(response.data)
+
+            # Should default to embeddings enabled
+            self.assertTrue(data["store_embeddings"])
+
+    def test_ingest_endpoint_error_handling(self):
+        """Test ingestion endpoint error handling"""
+        with patch("src.config.CORPUS_DIRECTORY", "/nonexistent/directory"):
+            response = self.app.post("/ingest")
+
+            self.assertEqual(response.status_code, 500)
+            data = json.loads(response.data)
+
+            self.assertEqual(data["status"], "error")
+            self.assertIn("message", data)
+
+    def tearDown(self):
+        """Clean up test fixtures"""
+        import shutil
+
+        shutil.rmtree(self.temp_dir, ignore_errors=True)
+
+
+if __name__ == "__main__":
+    unittest.main()
tests/test_ingestion/test_enhanced_ingestion_pipeline.py ADDED
@@ -0,0 +1,231 @@
+"""
+Tests for enhanced ingestion pipeline with embeddings
+"""
+
+import tempfile
+import unittest
+from pathlib import Path
+from unittest.mock import Mock, patch
+
+from src.ingestion.ingestion_pipeline import IngestionPipeline
+
+
+class TestEnhancedIngestionPipeline(unittest.TestCase):
+    """Test cases for enhanced IngestionPipeline with embeddings"""
+
+    def setUp(self):
+        """Set up test fixtures"""
+        self.temp_dir = tempfile.mkdtemp()
+        self.test_dir = Path(self.temp_dir)
+
+        # Create test files
+        self.test_file1 = self.test_dir / "test1.md"
+        self.test_file1.write_text(
+            "# Test Document 1\n\nThis is test content for document 1."
+        )
+
+        self.test_file2 = self.test_dir / "test2.txt"
+        self.test_file2.write_text("This is test content for document 2.")
+
+        # Create an unsupported file (should be skipped)
+        self.test_file3 = self.test_dir / "test3.pdf"
+        self.test_file3.write_text("PDF content")
+
+    def test_initialization_without_embeddings(self):
+        """Test pipeline initialization without embeddings"""
+        pipeline = IngestionPipeline(store_embeddings=False)
+
+        self.assertIsNotNone(pipeline.parser)
+        self.assertIsNotNone(pipeline.chunker)
+        self.assertFalse(pipeline.store_embeddings)
+        self.assertIsNone(pipeline.embedding_service)
+        self.assertIsNone(pipeline.vector_db)
+
+    def test_initialization_with_embeddings(self):
+        """Test pipeline initialization with embeddings"""
+        pipeline = IngestionPipeline(store_embeddings=True)
+
+        self.assertIsNotNone(pipeline.parser)
+        self.assertIsNotNone(pipeline.chunker)
+        self.assertTrue(pipeline.store_embeddings)
+        self.assertIsNotNone(pipeline.embedding_service)
+        self.assertIsNotNone(pipeline.vector_db)
+
+    def test_initialization_with_custom_components(self):
+        """Test pipeline initialization with custom embedding components"""
+        mock_embedding_service = Mock()
+        mock_vector_db = Mock()
+
+        pipeline = IngestionPipeline(
+            store_embeddings=True,
+            embedding_service=mock_embedding_service,
+            vector_db=mock_vector_db,
+        )
+
+        self.assertEqual(pipeline.embedding_service, mock_embedding_service)
+        self.assertEqual(pipeline.vector_db, mock_vector_db)
+
+    def test_process_directory_without_embeddings(self):
+        """Test directory processing without embeddings"""
+        pipeline = IngestionPipeline(store_embeddings=False)
+        result = pipeline.process_directory_with_embeddings(str(self.test_dir))
+
+        # Check response structure
+        self.assertIsInstance(result, dict)
+        self.assertEqual(result["status"], "success")
+        self.assertGreater(result["chunks_processed"], 0)
+        self.assertEqual(result["files_processed"], 2)  # Only .md and .txt files
+        self.assertEqual(result["embeddings_stored"], 0)
+        self.assertFalse(result["store_embeddings"])
+        self.assertIn("chunks", result)
+
+    @patch("src.ingestion.ingestion_pipeline.VectorDatabase")
+    @patch("src.ingestion.ingestion_pipeline.EmbeddingService")
+    def test_process_directory_with_embeddings(
+        self, mock_embedding_service_class, mock_vector_db_class
+    ):
+        """Test directory processing with embeddings"""
+        # Mock the classes to return mock instances
+        mock_embedding_service = Mock()
+        mock_vector_db = Mock()
+        mock_embedding_service_class.return_value = mock_embedding_service
+        mock_vector_db_class.return_value = mock_vector_db
+
+        # Configure mock embedding service
+        mock_embedding_service.embed_texts.return_value = [
+            [0.1, 0.2, 0.3],
+            [0.4, 0.5, 0.6],
+        ]
+
+        # Configure mock vector database
+        mock_vector_db.add_embeddings.return_value = True
+
+        pipeline = IngestionPipeline(store_embeddings=True)
+        result = pipeline.process_directory_with_embeddings(str(self.test_dir))
+
+        # Check response structure
+        self.assertIsInstance(result, dict)
+        self.assertEqual(result["status"], "success")
+        self.assertGreater(result["chunks_processed"], 0)
+        self.assertEqual(result["files_processed"], 2)
+        self.assertGreater(result["embeddings_stored"], 0)
+        self.assertTrue(result["store_embeddings"])
+
+        # Verify embedding service was called
+        mock_embedding_service.embed_texts.assert_called()
+        mock_vector_db.add_embeddings.assert_called()
+
+    def test_process_directory_nonexistent(self):
+        """Test processing non-existent directory"""
+        pipeline = IngestionPipeline(store_embeddings=False)
+
+        with self.assertRaises(FileNotFoundError):
+            pipeline.process_directory("/nonexistent/directory")
+
+    def test_store_embeddings_batch_without_components(self):
+        """Test batch embedding storage without embedding components"""
+        pipeline = IngestionPipeline(store_embeddings=False)
+
+        chunks = [
+            {
+                "content": "Test content 1",
+                "metadata": {"chunk_id": "test1", "source": "test1.txt"},
+            }
+        ]
+
+        result = pipeline._store_embeddings_batch(chunks)
+        self.assertEqual(result, 0)
+
+    @patch("src.ingestion.ingestion_pipeline.VectorDatabase")
+    @patch("src.ingestion.ingestion_pipeline.EmbeddingService")
+    def test_store_embeddings_batch_success(
+        self, mock_embedding_service_class, mock_vector_db_class
+    ):
+        """Test successful batch embedding storage"""
+        # Mock the classes to return mock instances
+        mock_embedding_service = Mock()
+        mock_vector_db = Mock()
+        mock_embedding_service_class.return_value = mock_embedding_service
+        mock_vector_db_class.return_value = mock_vector_db
+
+        # Configure mocks
+        mock_embedding_service.embed_texts.return_value = [
+            [0.1, 0.2, 0.3],
+            [0.4, 0.5, 0.6],
+        ]
+        mock_vector_db.add_embeddings.return_value = True
+
+        pipeline = IngestionPipeline(store_embeddings=True)
+
+        chunks = [
+            {
+                "content": "Test content 1",
+                "metadata": {"chunk_id": "test1", "source": "test1.txt"},
+            },
+            {
+                "content": "Test content 2",
+                "metadata": {"chunk_id": "test2", "source": "test2.txt"},
+            },
+        ]
+
+        result = pipeline._store_embeddings_batch(chunks)
+        self.assertEqual(result, 2)
+
+        # Verify method calls
+        mock_embedding_service.embed_texts.assert_called_once_with(
+            ["Test content 1", "Test content 2"]
+        )
+        mock_vector_db.add_embeddings.assert_called_once()
+
+    @patch("src.ingestion.ingestion_pipeline.VectorDatabase")
+    @patch("src.ingestion.ingestion_pipeline.EmbeddingService")
+    def test_store_embeddings_batch_error_handling(
+        self, mock_embedding_service_class, mock_vector_db_class
+    ):
+        """Test error handling in batch embedding storage"""
+        # Mock the classes to return mock instances
+        mock_embedding_service = Mock()
+        mock_vector_db = Mock()
+        mock_embedding_service_class.return_value = mock_embedding_service
+        mock_vector_db_class.return_value = mock_vector_db
+
+        # Configure embedding service to raise an error
+        mock_embedding_service.embed_texts.side_effect = Exception("Embedding error")
+
+        pipeline = IngestionPipeline(store_embeddings=True)
+
+        chunks = [
+            {
+                "content": "Test content 1",
+                "metadata": {"chunk_id": "test1", "source": "test1.txt"},
+            }
+        ]
+
+        # Should handle error gracefully and return 0
+        result = pipeline._store_embeddings_batch(chunks)
+        self.assertEqual(result, 0)
+
+    def test_backward_compatibility(self):
+        """Test that enhanced pipeline maintains backward compatibility"""
+        pipeline = IngestionPipeline(store_embeddings=False)
+        result = pipeline.process_directory(str(self.test_dir))
+
+        # Should return list for backward compatibility
+        self.assertIsInstance(result, list)
+        self.assertGreater(len(result), 0)
+
+        # First chunk should have expected structure
+        chunk = result[0]
+        self.assertIn("content", chunk)
+        self.assertIn("metadata", chunk)
+        self.assertIn("chunk_id", chunk["metadata"])
+
+    def tearDown(self):
+        """Clean up test fixtures"""
+        import shutil
+
+        shutil.rmtree(self.temp_dir, ignore_errors=True)
+
+
+if __name__ == "__main__":
+    unittest.main()