# Baktabek's picture
# Upload folder using huggingface_hub
# e12568c verified
"""
Infrastructure - PostgreSQL Repository Implementation
"""
from typing import List, Optional
from uuid import UUID

from sqlalchemy import delete, select
from sqlalchemy.ext.asyncio import AsyncSession

from app.domain.entities import Document, DocumentChunk, DocumentStatus, DocumentType
from app.domain.interfaces import IChunkRepository, IDocumentRepository
from app.infrastructure.database.models import DocumentChunkModel, DocumentModel
class PostgresDocumentRepository(IDocumentRepository):
    """PostgreSQL implementation of the document repository.

    Maps ``Document`` entities to ``DocumentModel`` rows through an async
    SQLAlchemy session. Every mutating method commits the session itself;
    when a commit fails the session is rolled back before the error is
    re-raised.
    """

    def __init__(self, session: AsyncSession):
        """Keep the session used for every query issued by this repository.

        Args:
            session: Async SQLAlchemy session. This class never closes it.
        """
        self.session = session

    async def create(self, document: Document) -> Document:
        """Insert a new document row and return the stored document.

        Args:
            document: Entity to persist. ``file_type`` and ``status`` are
                stored as their enum ``.value``.

        Returns:
            A ``Document`` rebuilt from the row after it was committed.
        """
        model = DocumentModel(
            id=document.id,
            title=document.title,
            filename=document.filename,
            file_type=document.file_type.value,
            file_size=document.file_size,
            storage_path=document.storage_path,
            department=document.department,
            status=document.status.value,
            upload_session_id=document.upload_session_id,
            uploaded_at=document.uploaded_at,
            indexed_at=document.indexed_at,
            doc_metadata=document.metadata,
        )
        self.session.add(model)
        await self._commit()
        # Re-read the row so columns that were not set above (created_at,
        # updated_at) are loaded before mapping back to an entity.
        await self.session.refresh(model)
        return self._to_entity(model)

    async def get_by_id(self, document_id: UUID) -> Optional[Document]:
        """Return the document with the given ID, or ``None`` if absent.

        Args:
            document_id: Primary key of the document.
        """
        model = await self._get_model(document_id)
        return self._to_entity(model) if model else None

    async def update(self, document: Document) -> Document:
        """Persist the mutable fields of an existing document.

        Only ``title``, ``status``, ``indexed_at``, ``metadata`` and
        ``updated_at`` are copied onto the row; every other column is left
        untouched.

        Args:
            document: Entity carrying the new values; matched by ``id``.

        Returns:
            A ``Document`` rebuilt from the row after it was committed.

        Raises:
            ValueError: If no row exists for ``document.id``.
        """
        model = await self._get_model(document.id)
        if not model:
            raise ValueError(f"Document {document.id} not found")

        model.title = document.title
        model.status = document.status.value
        model.indexed_at = document.indexed_at
        model.doc_metadata = document.metadata
        model.updated_at = document.updated_at

        await self._commit()
        await self.session.refresh(model)
        return self._to_entity(model)

    async def list_by_department(
        self, department: str, skip: int = 0, limit: int = 100
    ) -> List[Document]:
        """List a department's documents, newest first, one page at a time.

        Args:
            department: Department value to filter on (exact match).
            skip: Number of rows to skip (SQL ``OFFSET``).
            limit: Maximum number of rows to return (SQL ``LIMIT``).

        Returns:
            Documents ordered by ``created_at`` descending.
        """
        stmt = (
            select(DocumentModel)
            .where(DocumentModel.department == department)
            # The primary key is a tie-breaker: without it, rows sharing a
            # created_at value have no defined order and can be repeated or
            # skipped between consecutive OFFSET/LIMIT pages.
            .order_by(DocumentModel.created_at.desc(), DocumentModel.id)
            .offset(skip)
            .limit(limit)
        )
        result = await self.session.execute(stmt)
        return [self._to_entity(model) for model in result.scalars().all()]

    async def delete(self, document_id: UUID) -> bool:
        """Delete the document with the given ID.

        Args:
            document_id: Primary key of the document.

        Returns:
            ``True`` if a row was deleted, ``False`` if none was found.
        """
        model = await self._get_model(document_id)
        if not model:
            return False

        await self.session.delete(model)
        await self._commit()
        return True

    async def _get_model(self, document_id: UUID) -> Optional[DocumentModel]:
        """Load the ``DocumentModel`` row for ``document_id``, if any."""
        result = await self.session.execute(
            select(DocumentModel).where(DocumentModel.id == document_id)
        )
        return result.scalar_one_or_none()

    async def _commit(self) -> None:
        """Commit the session, rolling it back first if the commit fails."""
        try:
            await self.session.commit()
        except Exception:
            # After a failed flush/commit the session refuses further work
            # until it is rolled back; reset it and let the caller see the
            # original error.
            await self.session.rollback()
            raise

    def _to_entity(self, model: DocumentModel) -> Document:
        """Convert a ``DocumentModel`` row into a ``Document`` entity.

        ``file_type`` and ``status`` are turned back into their enums, and
        the ``doc_metadata`` column becomes the entity's ``metadata``.
        """
        return Document(
            id=model.id,
            title=model.title,
            filename=model.filename,
            file_type=DocumentType(model.file_type),
            file_size=model.file_size,
            storage_path=model.storage_path,
            department=model.department,
            status=DocumentStatus(model.status),
            upload_session_id=model.upload_session_id,
            uploaded_at=model.uploaded_at,
            indexed_at=model.indexed_at,
            metadata=model.doc_metadata,
            created_at=model.created_at,
            updated_at=model.updated_at,
        )
class PostgresChunkRepository(IChunkRepository):
    """PostgreSQL implementation of the chunk repository.

    Maps ``DocumentChunk`` entities to ``DocumentChunkModel`` rows through
    an async SQLAlchemy session. Every mutating method commits the session
    itself; when a commit fails the session is rolled back before the error
    is re-raised.
    """

    def __init__(self, session: AsyncSession):
        """Keep the session used for every query issued by this repository.

        Args:
            session: Async SQLAlchemy session. This class never closes it.
        """
        self.session = session

    async def create_bulk(self, chunks: List[DocumentChunk]) -> List[DocumentChunk]:
        """Insert several chunks in a single commit.

        Args:
            chunks: Entities to persist.

        Returns:
            The same ``chunks`` list that was passed in. The entities are not
            rebuilt from the database, so columns that are not set here
            (such as ``created_at``) are not copied back onto them.
        """
        models = [
            DocumentChunkModel(
                id=chunk.id,
                document_id=chunk.document_id,
                chunk_index=chunk.chunk_index,
                content=chunk.content,
                token_count=chunk.token_count,
                vector_id=chunk.vector_id,
                chunk_metadata=chunk.metadata,
            )
            for chunk in chunks
        ]
        self.session.add_all(models)
        await self._commit()
        return chunks

    async def get_by_document_id(self, document_id: UUID) -> List[DocumentChunk]:
        """Return all chunks of a document, ordered by ``chunk_index``.

        Args:
            document_id: ID of the owning document.
        """
        stmt = (
            select(DocumentChunkModel)
            .where(DocumentChunkModel.document_id == document_id)
            .order_by(DocumentChunkModel.chunk_index)
        )
        result = await self.session.execute(stmt)
        return [self._to_entity(model) for model in result.scalars().all()]

    async def delete_by_document_id(self, document_id: UUID) -> int:
        """Delete every chunk that belongs to a document.

        Args:
            document_id: ID of the owning document.

        Returns:
            Number of rows deleted.
        """
        # One bulk DELETE instead of loading each row (content included)
        # only to delete it individually.
        # NOTE(review): a bulk DELETE does not run ORM-level relationship
        # cascades; confirm DocumentChunkModel has no dependents relying on
        # them.
        result = await self.session.execute(
            delete(DocumentChunkModel).where(
                DocumentChunkModel.document_id == document_id
            )
        )
        await self._commit()
        return result.rowcount

    async def _commit(self) -> None:
        """Commit the session, rolling it back first if the commit fails."""
        try:
            await self.session.commit()
        except Exception:
            # After a failed flush/commit the session refuses further work
            # until it is rolled back; reset it and let the caller see the
            # original error.
            await self.session.rollback()
            raise

    def _to_entity(self, model: DocumentChunkModel) -> DocumentChunk:
        """Convert a ``DocumentChunkModel`` row into a ``DocumentChunk``.

        The ``chunk_metadata`` column becomes the entity's ``metadata``.
        """
        return DocumentChunk(
            id=model.id,
            document_id=model.document_id,
            chunk_index=model.chunk_index,
            content=model.content,
            token_count=model.token_count,
            vector_id=model.vector_id,
            metadata=model.chunk_metadata,
            created_at=model.created_at,
        )