language-learning-app/api/app/outbound/postgres/repositories/article_repository.py
2026-06-02 21:02:50 +01:00

206 lines
6.6 KiB
Python

import logging
import uuid
from datetime import datetime, timezone
from typing import Protocol
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from ....domain.models.article import (
Article,
ArticleOwnership,
ArticleOwnershipRoleEnum,
ArticleTypeEnum,
)
from ..entities.article_entities import ArticleEntity, ArticleOwnershipEntity
class ArticleRepository(Protocol):
    """Structural interface for article persistence.

    Any object exposing these coroutine methods with matching signatures
    satisfies the protocol; no inheritance is required.
    """

    async def create(
        self,
        article_type: ArticleTypeEnum,
        language: str,
        target_complexity: str,
        title: str,
        text: str,
    ) -> Article:
        """Store a new article built from the given fields and return it."""

    async def get_by_id(self, article_id: uuid.UUID) -> Article | None:
        """Return the article identified by ``article_id``, or ``None``."""

    async def update_title_and_text(
        self,
        id: uuid.UUID,
        title: str,
        text: str,
    ) -> Article:
        """Replace the article's title and text; return the updated article."""

    async def update_linguistic_data(
        self,
        id: uuid.UUID,
        linguistic_data: dict,
    ) -> Article:
        """Replace the article's linguistic data; return the updated article."""

    async def update_audio_key(self, id: uuid.UUID, audio_key: str) -> Article:
        """Set the article's audio key; return the updated article."""

    async def get_non_deleted_articles_for_owner(
        self,
        owner_id: uuid.UUID,
    ) -> list[Article]:
        """Return the non-deleted articles owned by ``owner_id``."""
class ArticleOwnershipRepository(Protocol):
    """Structural interface for persisting article ownership records."""

    async def create(
        self,
        article_id: uuid.UUID,
        ownership_role: ArticleOwnershipRoleEnum,
        user_id: uuid.UUID,
    ) -> ArticleOwnership:
        """Store a new ownership linking ``user_id`` to ``article_id``."""

    async def get_by_article_id(
        self,
        article_id: uuid.UUID,
    ) -> list[ArticleOwnership]:
        """Return the ownership records attached to ``article_id``."""
def _article_to_model(entity: ArticleEntity) -> Article:
    """Translate an ``ArticleEntity`` into the domain ``Article`` model.

    The id is rendered as ``str`` and the stored article type is wrapped in
    ``ArticleTypeEnum``; all remaining fields are copied across unchanged.
    """
    # Fields that need a conversion on their way into the domain model.
    converted = {
        "id": str(entity.id),
        "article_type": ArticleTypeEnum(entity.article_type),
    }
    # Fields that are handed over exactly as stored on the entity.
    passthrough_fields = (
        "language",
        "target_complexity",
        "title",
        "text",
        "audio_key",
        "text_linguistic_data",
        "created_at",
        "published_at",
        "deleted_at",
    )
    copied = {name: getattr(entity, name) for name in passthrough_fields}
    return Article(**converted, **copied)
def _ownership_to_model(entity: ArticleOwnershipEntity) -> ArticleOwnership:
    """Translate an ``ArticleOwnershipEntity`` into ``ArticleOwnership``.

    All three identifiers are rendered as ``str`` and the stored role is
    wrapped in ``ArticleOwnershipRoleEnum``; timestamps are copied unchanged.
    """
    ownership_id = str(entity.id)
    article_id = str(entity.article_id)
    role = ArticleOwnershipRoleEnum(entity.ownership_role)
    user_id = str(entity.user_id)
    return ArticleOwnership(
        id=ownership_id,
        article_id=article_id,
        ownership_role=role,
        user_id=user_id,
        created_at=entity.created_at,
        deleted_at=entity.deleted_at,
    )
# Module-level logger named after this module; PostgresArticleRepository
# reports the outcome of its update operations through it.
logger: logging.Logger = logging.getLogger(__name__)
class PostgresArticleRepository:
    """Article repository backed by a SQLAlchemy ``AsyncSession``.

    Every mutating method commits the session itself before returning.
    """

    def __init__(self, db: AsyncSession) -> None:
        """Keep a reference to the session used for all queries."""
        self.db = db

    async def create(
        self,
        article_type: ArticleTypeEnum,
        language: str,
        target_complexity: str,
        title: str,
        text: str,
    ) -> Article:
        """Insert a new article and return it as a domain model.

        ``created_at`` is set to the current UTC time. The entity is
        refreshed after the commit so the returned model reflects the
        stored row.
        """
        entity = ArticleEntity(
            article_type=article_type.value,
            language=language,
            target_complexity=target_complexity,
            title=title,
            text=text,
            created_at=datetime.now(timezone.utc),
        )
        self.db.add(entity)
        await self.db.commit()
        await self.db.refresh(entity)
        return _article_to_model(entity)

    async def get_by_id(self, article_id: uuid.UUID) -> Article | None:
        """Return the article with ``article_id``, or ``None`` if not found."""
        result = await self.db.execute(
            select(ArticleEntity).where(ArticleEntity.id == article_id)
        )
        entity = result.scalar_one_or_none()
        return _article_to_model(entity) if entity else None

    async def get_non_deleted_articles_for_owner(
        self, owner_id: uuid.UUID
    ) -> list[Article]:
        """Return the articles ``owner_id`` holds with the ``owner`` role.

        Rows are excluded when either the article or the ownership record
        has ``deleted_at`` set.
        """
        result = await self.db.execute(
            select(ArticleEntity)
            .join(
                ArticleOwnershipEntity,
                ArticleEntity.id == ArticleOwnershipEntity.article_id,
            )
            .where(
                ArticleOwnershipEntity.user_id == owner_id,
                ArticleOwnershipEntity.deleted_at.is_(None),
                ArticleEntity.deleted_at.is_(None),
                ArticleOwnershipEntity.ownership_role
                == ArticleOwnershipRoleEnum.owner.value,
            )
        )
        return [_article_to_model(entity) for entity in result.scalars().all()]

    async def _get_entity_or_raise(
        self, article_id: uuid.UUID, operation: str
    ) -> ArticleEntity:
        """Load the entity for ``article_id`` or log and raise if it is missing.

        ``operation`` is the calling method's name and is only used in the
        log and exception message.

        Raises:
            RuntimeError: if no article with ``article_id`` exists.
        """
        result = await self.db.execute(
            select(ArticleEntity).where(ArticleEntity.id == article_id)
        )
        entity = result.scalar_one_or_none()
        if entity is None:
            message = (
                f"{operation} failed, cannot find article with id '{article_id}'"
            )
            logger.error(message)
            # The previous bare ``raise`` had no active exception and therefore
            # surfaced as a context-free RuntimeError. The type is kept so
            # existing handlers still match, but the message now says why.
            raise RuntimeError(message)
        return entity

    async def _commit_update(
        self, entity: ArticleEntity, article_id: uuid.UUID, operation: str
    ) -> Article:
        """Commit pending changes on ``entity`` and return it as a model."""
        await self.db.commit()
        logger.info("%s for article '%s' successful", operation, article_id)
        # Refresh after the commit, as create() does, so mapping the entity
        # does not read expired attributes when the session is configured
        # with expire_on_commit=True.
        await self.db.refresh(entity)
        return _article_to_model(entity)

    async def update_title_and_text(
        self, id: uuid.UUID, title: str, text: str
    ) -> Article:
        """Replace the title and text of article ``id`` and commit.

        Returns:
            The updated article.

        Raises:
            RuntimeError: if no article with ``id`` exists.
        """
        entity = await self._get_entity_or_raise(id, "update_title_and_text")
        entity.title = title
        entity.text = text
        return await self._commit_update(entity, id, "update_title_and_text")

    async def update_linguistic_data(
        self, id: uuid.UUID, linguistic_data: dict
    ) -> Article:
        """Replace ``text_linguistic_data`` of article ``id`` and commit.

        Returns:
            The updated article.

        Raises:
            RuntimeError: if no article with ``id`` exists.
        """
        entity = await self._get_entity_or_raise(id, "update_linguistic_data")
        entity.text_linguistic_data = linguistic_data
        return await self._commit_update(entity, id, "update_linguistic_data")

    async def update_audio_key(self, id: uuid.UUID, audio_key: str) -> Article:
        """Set ``audio_key`` on article ``id`` and commit.

        Returns:
            The updated article.

        Raises:
            RuntimeError: if no article with ``id`` exists.
        """
        entity = await self._get_entity_or_raise(id, "update_audio_key")
        entity.audio_key = audio_key
        return await self._commit_update(entity, id, "update_audio_key")
class PostgresArticleOwnershipRepository:
    """Article-ownership repository backed by a SQLAlchemy ``AsyncSession``."""

    def __init__(self, db: AsyncSession) -> None:
        """Keep a reference to the session used for all queries."""
        self.db = db

    async def create(
        self,
        article_id: uuid.UUID,
        ownership_role: ArticleOwnershipRoleEnum,
        user_id: uuid.UUID,
    ) -> ArticleOwnership:
        """Insert an ownership record, commit, and return it as a model.

        ``created_at`` is set to the current UTC time; the row is refreshed
        after the commit before being mapped.
        """
        role_value = ownership_role.value
        created = datetime.now(timezone.utc)
        row = ArticleOwnershipEntity(
            article_id=article_id,
            ownership_role=role_value,
            user_id=user_id,
            created_at=created,
        )
        session = self.db
        session.add(row)
        await session.commit()
        await session.refresh(row)
        return _ownership_to_model(row)

    async def get_by_article_id(self, article_id: uuid.UUID) -> list[ArticleOwnership]:
        """Return every ownership record whose ``article_id`` matches."""
        query = select(ArticleOwnershipEntity).where(
            ArticleOwnershipEntity.article_id == article_id
        )
        outcome = await self.db.execute(query)
        rows = outcome.scalars().all()
        return list(map(_ownership_to_model, rows))