""" Tests for enhanced ingestion pipeline with embeddings """ import tempfile import unittest from pathlib import Path from unittest.mock import Mock, patch from src.ingestion.ingestion_pipeline import IngestionPipeline class TestEnhancedIngestionPipeline(unittest.TestCase): """Test cases for enhanced IngestionPipeline with embeddings""" def setUp(self): """Set up test fixtures""" self.temp_dir = tempfile.mkdtemp() self.test_dir = Path(self.temp_dir) # Create test files self.test_file1 = self.test_dir / "test1.md" self.test_file1.write_text( "# Test Document 1\n\nThis is test content for document 1." ) self.test_file2 = self.test_dir / "test2.txt" self.test_file2.write_text("This is test content for document 2.") # Create an unsupported file (should be skipped) self.test_file3 = self.test_dir / "test3.pdf" self.test_file3.write_text("PDF content") def test_initialization_without_embeddings(self): """Test pipeline initialization without embeddings""" pipeline = IngestionPipeline(store_embeddings=False) self.assertIsNotNone(pipeline.parser) self.assertIsNotNone(pipeline.chunker) self.assertFalse(pipeline.store_embeddings) self.assertIsNone(pipeline.embedding_service) self.assertIsNone(pipeline.vector_db) def test_initialization_with_embeddings(self): """Test pipeline initialization with embeddings""" pipeline = IngestionPipeline(store_embeddings=True) self.assertIsNotNone(pipeline.parser) self.assertIsNotNone(pipeline.chunker) self.assertTrue(pipeline.store_embeddings) self.assertIsNotNone(pipeline.embedding_service) self.assertIsNotNone(pipeline.vector_db) def test_initialization_with_custom_components(self): """Test pipeline initialization with custom embedding components""" mock_embedding_service = Mock() mock_vector_db = Mock() pipeline = IngestionPipeline( store_embeddings=True, embedding_service=mock_embedding_service, vector_db=mock_vector_db, ) self.assertEqual(pipeline.embedding_service, mock_embedding_service) self.assertEqual(pipeline.vector_db, mock_vector_db) def test_process_directory_without_embeddings(self): """Test directory processing without embeddings""" pipeline = IngestionPipeline(store_embeddings=False) result = pipeline.process_directory_with_embeddings(str(self.test_dir)) # Check response structure self.assertIsInstance(result, dict) self.assertEqual(result["status"], "success") self.assertGreater(result["chunks_processed"], 0) self.assertEqual(result["files_processed"], 2) # Only .md and .txt files self.assertEqual(result["embeddings_stored"], 0) self.assertFalse(result["store_embeddings"]) self.assertIn("chunks", result) @patch("src.ingestion.ingestion_pipeline.VectorDatabase") @patch("src.ingestion.ingestion_pipeline.EmbeddingService") def test_process_directory_with_embeddings( self, mock_embedding_service_class, mock_vector_db_class ): """Test directory processing with embeddings""" # Mock the classes to return mock instances mock_embedding_service = Mock() mock_vector_db = Mock() mock_embedding_service_class.return_value = mock_embedding_service mock_vector_db_class.return_value = mock_vector_db # Configure mock embedding service mock_embedding_service.embed_texts.return_value = [ [0.1, 0.2, 0.3], [0.4, 0.5, 0.6], ] # Configure mock vector database mock_vector_db.add_embeddings.return_value = True pipeline = IngestionPipeline(store_embeddings=True) result = pipeline.process_directory_with_embeddings(str(self.test_dir)) # Check response structure self.assertIsInstance(result, dict) self.assertEqual(result["status"], "success") self.assertGreater(result["chunks_processed"], 0) self.assertEqual(result["files_processed"], 2) self.assertGreater(result["embeddings_stored"], 0) self.assertTrue(result["store_embeddings"]) # Verify embedding service was called mock_embedding_service.embed_texts.assert_called() mock_vector_db.add_embeddings.assert_called() def test_process_directory_nonexistent(self): """Test processing non-existent directory""" pipeline = IngestionPipeline(store_embeddings=False) with self.assertRaises(FileNotFoundError): pipeline.process_directory("/nonexistent/directory") def test_store_embeddings_batch_without_components(self): """Test batch embedding storage without embedding components""" pipeline = IngestionPipeline(store_embeddings=False) chunks = [ { "content": "Test content 1", "metadata": {"chunk_id": "test1", "source": "test1.txt"}, } ] result = pipeline._store_embeddings_batch(chunks) self.assertEqual(result, 0) @patch("src.ingestion.ingestion_pipeline.VectorDatabase") @patch("src.ingestion.ingestion_pipeline.EmbeddingService") def test_store_embeddings_batch_success( self, mock_embedding_service_class, mock_vector_db_class ): """Test successful batch embedding storage""" # Mock the classes to return mock instances mock_embedding_service = Mock() mock_vector_db = Mock() mock_embedding_service_class.return_value = mock_embedding_service mock_vector_db_class.return_value = mock_vector_db # Configure mocks mock_embedding_service.embed_texts.return_value = [ [0.1, 0.2, 0.3], [0.4, 0.5, 0.6], ] mock_vector_db.add_embeddings.return_value = True pipeline = IngestionPipeline(store_embeddings=True) chunks = [ { "content": "Test content 1", "metadata": {"chunk_id": "test1", "source": "test1.txt"}, }, { "content": "Test content 2", "metadata": {"chunk_id": "test2", "source": "test2.txt"}, }, ] result = pipeline._store_embeddings_batch(chunks) self.assertEqual(result, 2) # Verify method calls mock_embedding_service.embed_texts.assert_called_once_with( ["Test content 1", "Test content 2"] ) mock_vector_db.add_embeddings.assert_called_once() @patch("src.ingestion.ingestion_pipeline.VectorDatabase") @patch("src.ingestion.ingestion_pipeline.EmbeddingService") def test_store_embeddings_batch_error_handling( self, mock_embedding_service_class, mock_vector_db_class ): """Test error handling in batch embedding storage""" # Mock the classes to return mock instances mock_embedding_service = Mock() mock_vector_db = Mock() mock_embedding_service_class.return_value = mock_embedding_service mock_vector_db_class.return_value = mock_vector_db # Configure embedding service to raise an error mock_embedding_service.embed_texts.side_effect = Exception("Embedding error") pipeline = IngestionPipeline(store_embeddings=True) chunks = [ { "content": "Test content 1", "metadata": {"chunk_id": "test1", "source": "test1.txt"}, } ] # Should handle error gracefully and return 0 result = pipeline._store_embeddings_batch(chunks) self.assertEqual(result, 0) def test_backward_compatibility(self): """Test that enhanced pipeline maintains backward compatibility""" pipeline = IngestionPipeline(store_embeddings=False) result = pipeline.process_directory(str(self.test_dir)) # Should return list for backward compatibility self.assertIsInstance(result, list) self.assertGreater(len(result), 0) # First chunk should have expected structure chunk = result[0] self.assertIn("content", chunk) self.assertIn("metadata", chunk) self.assertIn("chunk_id", chunk["metadata"]) def tearDown(self): """Clean up test fixtures""" import shutil shutil.rmtree(self.temp_dir, ignore_errors=True) if __name__ == "__main__": unittest.main()