"""
Infrastructure - PostgreSQL Repository Implementation
"""

from typing import List, Optional
from uuid import UUID

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.domain.entities import Document, DocumentChunk, DocumentStatus, DocumentType
from app.domain.interfaces import IChunkRepository, IDocumentRepository
from app.infrastructure.database.models import DocumentChunkModel, DocumentModel


class PostgresDocumentRepository(IDocumentRepository):
    """PostgreSQL implementation of document repository.

    Every mutating method commits the supplied ``AsyncSession`` before
    returning, so each call is its own unit of work.
    """

    def __init__(self, session: AsyncSession):
        self.session = session

    async def create(self, document: Document) -> Document:
        """Persist a new document and return it as stored.

        The row is refreshed after commit so database-populated columns
        (e.g. ``created_at`` / ``updated_at``) are reflected in the result.
        """
        model = DocumentModel(
            id=document.id,
            title=document.title,
            filename=document.filename,
            file_type=document.file_type.value,
            file_size=document.file_size,
            storage_path=document.storage_path,
            department=document.department,
            status=document.status.value,
            upload_session_id=document.upload_session_id,
            uploaded_at=document.uploaded_at,
            indexed_at=document.indexed_at,
            doc_metadata=document.metadata,
        )
        self.session.add(model)
        await self.session.commit()
        await self.session.refresh(model)
        return self._to_entity(model)

    async def get_by_id(self, document_id: UUID) -> Optional[Document]:
        """Return the document with ``document_id``, or ``None`` if absent."""
        result = await self.session.execute(
            select(DocumentModel).where(DocumentModel.id == document_id)
        )
        model = result.scalar_one_or_none()
        return self._to_entity(model) if model else None

    async def update(self, document: Document) -> Document:
        """Update the mutable fields of an existing document.

        Only ``title``, ``status``, ``indexed_at``, ``metadata`` and
        ``updated_at`` are written; all other columns are left untouched.

        Raises:
            ValueError: if no document with ``document.id`` exists.
        """
        result = await self.session.execute(
            select(DocumentModel).where(DocumentModel.id == document.id)
        )
        model = result.scalar_one_or_none()
        if not model:
            raise ValueError(f"Document {document.id} not found")

        model.title = document.title
        model.status = document.status.value
        model.indexed_at = document.indexed_at
        model.doc_metadata = document.metadata
        model.updated_at = document.updated_at

        await self.session.commit()
        await self.session.refresh(model)
        return self._to_entity(model)

    async def list_by_department(
        self, department: str, skip: int = 0, limit: int = 100
    ) -> List[Document]:
        """List a department's documents, newest first, one page at a time.

        Args:
            department: Department whose documents are returned.
            skip: Number of rows to skip (OFFSET).
            limit: Maximum number of rows to return (LIMIT).
        """
        result = await self.session.execute(
            select(DocumentModel)
            .where(DocumentModel.department == department)
            # ``created_at`` alone is not unique; without a unique tiebreaker
            # PostgreSQL may order tied rows differently between queries, so
            # OFFSET/LIMIT pages could repeat or skip documents.
            .order_by(DocumentModel.created_at.desc(), DocumentModel.id)
            .offset(skip)
            .limit(limit)
        )
        models = result.scalars().all()
        return [self._to_entity(model) for model in models]

    async def delete(self, document_id: UUID) -> bool:
        """Delete a document; return ``True`` if it existed, else ``False``."""
        result = await self.session.execute(
            select(DocumentModel).where(DocumentModel.id == document_id)
        )
        model = result.scalar_one_or_none()
        if not model:
            return False

        await self.session.delete(model)
        await self.session.commit()
        return True

    def _to_entity(self, model: DocumentModel) -> Document:
        """Convert an ORM row into a domain ``Document``.

        ``file_type`` and ``status`` are stored as raw values and converted
        back into their ``DocumentType`` / ``DocumentStatus`` enums here.
        """
        return Document(
            id=model.id,
            title=model.title,
            filename=model.filename,
            file_type=DocumentType(model.file_type),
            file_size=model.file_size,
            storage_path=model.storage_path,
            department=model.department,
            status=DocumentStatus(model.status),
            upload_session_id=model.upload_session_id,
            uploaded_at=model.uploaded_at,
            indexed_at=model.indexed_at,
            metadata=model.doc_metadata,
            created_at=model.created_at,
            updated_at=model.updated_at,
        )


class PostgresChunkRepository(IChunkRepository):
    """PostgreSQL implementation of chunk repository.

    Every mutating method commits the supplied ``AsyncSession`` before
    returning.
    """

    def __init__(self, session: AsyncSession):
        self.session = session

    async def create_bulk(self, chunks: List[DocumentChunk]) -> List[DocumentChunk]:
        """Insert ``chunks`` in a single commit and return them.

        NOTE: the returned list is the input list itself; rows are not
        refreshed, so any database-populated columns (e.g. ``created_at``)
        are not reloaded into the returned entities.
        """
        models = [
            DocumentChunkModel(
                id=chunk.id,
                document_id=chunk.document_id,
                chunk_index=chunk.chunk_index,
                content=chunk.content,
                token_count=chunk.token_count,
                vector_id=chunk.vector_id,
                chunk_metadata=chunk.metadata,
            )
            for chunk in chunks
        ]
        self.session.add_all(models)
        await self.session.commit()
        return chunks

    async def get_by_document_id(self, document_id: UUID) -> List[DocumentChunk]:
        """Return all chunks of a document ordered by ``chunk_index``."""
        result = await self.session.execute(
            select(DocumentChunkModel)
            .where(DocumentChunkModel.document_id == document_id)
            .order_by(DocumentChunkModel.chunk_index)
        )
        models = result.scalars().all()
        return [self._to_entity(model) for model in models]

    async def delete_by_document_id(self, document_id: UUID) -> int:
        """Delete every chunk of a document and return how many were removed.

        Rows are loaded and deleted one object at a time through the ORM
        session (not via a bulk DELETE statement), then committed together.
        """
        result = await self.session.execute(
            select(DocumentChunkModel).where(DocumentChunkModel.document_id == document_id)
        )
        models = result.scalars().all()

        for model in models:
            await self.session.delete(model)

        await self.session.commit()
        return len(models)

    def _to_entity(self, model: DocumentChunkModel) -> DocumentChunk:
        """Convert an ORM row into a domain ``DocumentChunk``."""
        return DocumentChunk(
            id=model.id,
            document_id=model.document_id,
            chunk_index=model.chunk_index,
            content=model.content,
            token_count=model.token_count,
            vector_id=model.vector_id,
            metadata=model.chunk_metadata,
            created_at=model.created_at,
        )