Spaces:
Sleeping
Sleeping
| """ | |
| Infrastructure - PostgreSQL Repository Implementation | |
| """ | |
| from typing import List, Optional | |
| from uuid import UUID | |
| from sqlalchemy import select | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from app.domain.entities import Document, DocumentChunk, DocumentStatus, DocumentType | |
| from app.domain.interfaces import IChunkRepository, IDocumentRepository | |
| from app.infrastructure.database.models import DocumentChunkModel, DocumentModel | |
class PostgresDocumentRepository(IDocumentRepository):
    """PostgreSQL implementation of the document repository.

    Each mutating method (``create``, ``update``, ``delete``) commits on the
    injected session. If that commit fails the session is rolled back before
    the error propagates, so the same session object stays usable.
    """

    def __init__(self, session: AsyncSession):
        """Keep a reference to the async session used for every query.

        Args:
            session: SQLAlchemy async session. The repository commits and
                rolls back on it but never closes it.
        """
        self.session = session

    async def create(self, document: Document) -> Document:
        """Insert a new document row and return the persisted entity.

        Args:
            document: Domain entity to persist.

        Returns:
            A fresh entity built from the row after it was committed and
            refreshed from the database.

        Raises:
            Exception: Whatever the commit raised; the session has been
                rolled back by the time it propagates.
        """
        model = DocumentModel(
            id=document.id,
            title=document.title,
            filename=document.filename,
            # Enums are stored by their underlying value.
            file_type=document.file_type.value,
            file_size=document.file_size,
            storage_path=document.storage_path,
            department=document.department,
            status=document.status.value,
            upload_session_id=document.upload_session_id,
            uploaded_at=document.uploaded_at,
            indexed_at=document.indexed_at,
            # The entity calls this ``metadata``; the model column is
            # ``doc_metadata``.
            doc_metadata=document.metadata,
        )
        self.session.add(model)
        await self._commit()
        # Re-read the row so attributes expired by the commit are loaded
        # before the entity is built.
        await self.session.refresh(model)
        return self._to_entity(model)

    async def get_by_id(self, document_id: UUID) -> Optional[Document]:
        """Return the document with the given ID, or ``None`` if absent.

        Args:
            document_id: Primary key of the document.
        """
        model = await self._get_model(document_id)
        return self._to_entity(model) if model is not None else None

    async def update(self, document: Document) -> Document:
        """Persist the mutable fields of an existing document.

        Only ``title``, ``status``, ``indexed_at``, ``metadata`` and
        ``updated_at`` are written; all other columns are left untouched.

        Args:
            document: Entity carrying the new values; ``document.id`` selects
                the row.

        Returns:
            The entity rebuilt from the refreshed row.

        Raises:
            ValueError: If no row with ``document.id`` exists.
            Exception: Whatever the commit raised; the session has been
                rolled back by the time it propagates.
        """
        model = await self._get_model(document.id)
        if model is None:
            raise ValueError(f"Document {document.id} not found")

        model.title = document.title
        model.status = document.status.value
        model.indexed_at = document.indexed_at
        model.doc_metadata = document.metadata
        model.updated_at = document.updated_at

        await self._commit()
        await self.session.refresh(model)
        return self._to_entity(model)

    async def list_by_department(
        self, department: str, skip: int = 0, limit: int = 100
    ) -> List[Document]:
        """List a department's documents, newest first, one page at a time.

        Args:
            department: Department value to filter on (exact match).
            skip: Number of rows to skip (SQL OFFSET).
            limit: Maximum number of rows to return (SQL LIMIT).

        Returns:
            Entities ordered by ``created_at`` descending.
        """
        statement = (
            select(DocumentModel)
            .where(DocumentModel.department == department)
            # OFFSET/LIMIT paging needs a total order: without a unique
            # tie-breaker, rows sharing a ``created_at`` may be repeated or
            # skipped between pages.
            .order_by(DocumentModel.created_at.desc(), DocumentModel.id)
            .offset(skip)
            .limit(limit)
        )
        result = await self.session.execute(statement)
        return [self._to_entity(model) for model in result.scalars().all()]

    async def delete(self, document_id: UUID) -> bool:
        """Delete a document by ID.

        Args:
            document_id: Primary key of the document.

        Returns:
            ``True`` if a row was found and deleted, ``False`` if no such
            row exists.

        Raises:
            Exception: Whatever the commit raised; the session has been
                rolled back by the time it propagates.
        """
        model = await self._get_model(document_id)
        if model is None:
            return False

        await self.session.delete(model)
        await self._commit()
        return True

    async def _get_model(self, document_id: UUID) -> Optional[DocumentModel]:
        """Fetch the ORM row for ``document_id``, or ``None`` if it is missing."""
        result = await self.session.execute(
            select(DocumentModel).where(DocumentModel.id == document_id)
        )
        return result.scalar_one_or_none()

    async def _commit(self) -> None:
        """Commit the session, rolling it back if the commit fails."""
        try:
            await self.session.commit()
        except Exception:
            # After a failed flush the session rejects further work until
            # rollback() is called; reset it here, then surface the error.
            await self.session.rollback()
            raise

    def _to_entity(self, model: DocumentModel) -> Document:
        """Convert an ORM row into a domain ``Document``.

        Stored ``file_type`` and ``status`` values are turned back into their
        enums, and ``doc_metadata`` is exposed as ``metadata``.
        """
        return Document(
            id=model.id,
            title=model.title,
            filename=model.filename,
            file_type=DocumentType(model.file_type),
            file_size=model.file_size,
            storage_path=model.storage_path,
            department=model.department,
            status=DocumentStatus(model.status),
            upload_session_id=model.upload_session_id,
            uploaded_at=model.uploaded_at,
            indexed_at=model.indexed_at,
            metadata=model.doc_metadata,
            created_at=model.created_at,
            updated_at=model.updated_at,
        )
class PostgresChunkRepository(IChunkRepository):
    """PostgreSQL implementation of the chunk repository.

    Mutating methods commit on the injected session. If that commit fails
    the session is rolled back before the error propagates, so the same
    session object stays usable.
    """

    def __init__(self, session: AsyncSession):
        """Keep a reference to the async session used for every query.

        Args:
            session: SQLAlchemy async session. The repository commits and
                rolls back on it but never closes it.
        """
        self.session = session

    async def create_bulk(self, chunks: List[DocumentChunk]) -> List[DocumentChunk]:
        """Insert several chunks in a single commit.

        Args:
            chunks: Domain chunks to persist.

        Returns:
            The same ``chunks`` list that was passed in. The rows are not
            re-read, so any value the database fills in is not reflected on
            the returned objects.

        Raises:
            Exception: Whatever the commit raised; the session has been
                rolled back by the time it propagates.
        """
        models = [
            DocumentChunkModel(
                id=chunk.id,
                document_id=chunk.document_id,
                chunk_index=chunk.chunk_index,
                content=chunk.content,
                token_count=chunk.token_count,
                vector_id=chunk.vector_id,
                # The entity calls this ``metadata``; the model column is
                # ``chunk_metadata``.
                chunk_metadata=chunk.metadata,
            )
            for chunk in chunks
        ]
        self.session.add_all(models)
        await self._commit()
        return chunks

    async def get_by_document_id(self, document_id: UUID) -> List[DocumentChunk]:
        """Return every chunk of a document, ordered by ``chunk_index``.

        Args:
            document_id: ID of the owning document.
        """
        result = await self.session.execute(
            select(DocumentChunkModel)
            .where(DocumentChunkModel.document_id == document_id)
            .order_by(DocumentChunkModel.chunk_index)
        )
        return [self._to_entity(model) for model in result.scalars().all()]

    async def delete_by_document_id(self, document_id: UUID) -> int:
        """Delete every chunk belonging to a document.

        Args:
            document_id: ID of the owning document.

        Returns:
            Number of chunk rows that were deleted (``0`` if none matched).

        Raises:
            Exception: Whatever the commit raised; the session has been
                rolled back by the time it propagates.
        """
        result = await self.session.execute(
            select(DocumentChunkModel).where(
                DocumentChunkModel.document_id == document_id
            )
        )
        models = result.scalars().all()
        # Rows are deleted through the ORM one instance at a time.
        # NOTE(review): a single bulk DELETE would avoid loading the rows,
        # but would skip any ORM-level cascades - confirm none are needed
        # before changing this.
        for model in models:
            await self.session.delete(model)
        await self._commit()
        return len(models)

    async def _commit(self) -> None:
        """Commit the session, rolling it back if the commit fails."""
        try:
            await self.session.commit()
        except Exception:
            # After a failed flush the session rejects further work until
            # rollback() is called; reset it here, then surface the error.
            await self.session.rollback()
            raise

    def _to_entity(self, model: DocumentChunkModel) -> DocumentChunk:
        """Convert an ORM row into a domain ``DocumentChunk``.

        ``chunk_metadata`` on the model is exposed as ``metadata``.
        """
        return DocumentChunk(
            id=model.id,
            document_id=model.document_id,
            chunk_index=model.chunk_index,
            content=model.content,
            token_count=model.token_count,
            vector_id=model.vector_id,
            metadata=model.chunk_metadata,
            created_at=model.created_at,
        )