# File: language-learning-app/api/app/outbound/postgres/repositories/article_repository.py

import logging
import uuid
from datetime import datetime, timezone
from typing import Protocol
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from ....domain.models.article import (
Article,
ArticleOwnership,
ArticleOwnershipRoleEnum,
ArticleTypeEnum,
)
from ..entities.article_entities import ArticleEntity, ArticleOwnershipEntity
class ArticleRepository(Protocol):
    """Structural interface for storing and retrieving articles.

    Any object that exposes these coroutine methods with compatible
    signatures satisfies the protocol; inheriting from it is not required.
    """

    async def create(
        self,
        article_type: ArticleTypeEnum,
        language: str,
        target_complexity: str,
        title: str,
        text: str,
    ) -> Article:
        """Create an article from the given fields and return it."""
        ...

    async def get_by_id(self, article_id: uuid.UUID) -> Article | None:
        """Return the article identified by ``article_id``, or ``None``."""
        ...

    async def update_title_and_text(
        self,
        id: uuid.UUID,
        title: str,
        text: str,
    ) -> Article:
        """Set the title and text of article ``id`` and return the article."""
        ...

    async def update_linguistic_data(
        self,
        id: uuid.UUID,
        linguistic_data: dict,
    ) -> Article:
        """Set the linguistic data of article ``id`` and return the article."""
        ...

    async def update_audio_key(
        self,
        id: uuid.UUID,
        audio_key: str,
    ) -> Article:
        """Set the audio key of article ``id`` and return the article."""
        ...

    async def get_non_deleted_articles_for_owner(
        self,
        owner_id: uuid.UUID,
    ) -> list[Article]:
        """Return the non-deleted articles belonging to ``owner_id``."""
        ...
class ArticleOwnershipRepository(Protocol):
    """Structural interface for storing and retrieving article ownerships.

    Any object that exposes these coroutine methods with compatible
    signatures satisfies the protocol; inheriting from it is not required.
    """

    async def create(
        self,
        article_id: uuid.UUID,
        ownership_role: ArticleOwnershipRoleEnum,
        user_id: uuid.UUID,
    ) -> ArticleOwnership:
        """Create an ownership linking ``user_id`` to ``article_id``."""
        ...

    async def get_by_article_id(
        self,
        article_id: uuid.UUID,
    ) -> list[ArticleOwnership]:
        """Return the ownerships recorded for ``article_id``."""
        ...
def _article_to_model(entity: ArticleEntity) -> Article:
    """Translate an ``ArticleEntity`` into the ``Article`` domain model.

    The id is stringified and the stored article type is coerced into
    ``ArticleTypeEnum``; every other field is copied across unchanged.
    """
    # Converted fields are named up front, in the same order the
    # constructor call originally evaluated them.
    article_id = str(entity.id)
    kind = ArticleTypeEnum(entity.article_type)
    return Article(
        id=article_id,
        article_type=kind,
        language=entity.language,
        target_complexity=entity.target_complexity,
        title=entity.title,
        text=entity.text,
        audio_key=entity.audio_key,
        text_linguistic_data=entity.text_linguistic_data,
        created_at=entity.created_at,
        published_at=entity.published_at,
        deleted_at=entity.deleted_at,
    )
def _ownership_to_model(entity: ArticleOwnershipEntity) -> ArticleOwnership:
    """Translate an ``ArticleOwnershipEntity`` into ``ArticleOwnership``.

    The three identifiers are stringified and the stored role is coerced
    into ``ArticleOwnershipRoleEnum``; the timestamps are copied unchanged.
    """
    fields = {
        "id": str(entity.id),
        "article_id": str(entity.article_id),
        "ownership_role": ArticleOwnershipRoleEnum(entity.ownership_role),
        "user_id": str(entity.user_id),
        "created_at": entity.created_at,
        "deleted_at": entity.deleted_at,
    }
    return ArticleOwnership(**fields)
# Module-level logger named after this module; PostgresArticleRepository
# uses it to report the outcome of its update methods.
logger = logging.getLogger(__name__)
class ArticleNotFoundError(RuntimeError):
    """Signal that an update targeted an article id with no matching row.

    Derives from ``RuntimeError`` on purpose: the update methods used to hit
    a bare ``raise`` with no active exception, which Python reports as a
    ``RuntimeError``. Callers that already catch ``RuntimeError`` keep
    working, but now receive a message naming the operation and the id.
    """

    def __init__(self, operation: str, article_id: uuid.UUID) -> None:
        super().__init__(
            f"{operation} failed, cannot find article with id '{article_id}'"
        )
        self.operation = operation
        self.article_id = article_id


class PostgresArticleRepository:
    """Article repository backed by an async SQLAlchemy session.

    Every write method commits on the injected session itself.
    """

    def __init__(self, db: AsyncSession) -> None:
        """Store the session that all queries and commits go through."""
        self.db = db

    async def create(
        self,
        article_type: ArticleTypeEnum,
        language: str,
        target_complexity: str,
        title: str,
        text: str,
    ) -> Article:
        """Insert a new article row and return it as a domain model.

        ``created_at`` is set to the current UTC time. The entity is
        refreshed after the commit so the returned model reflects the
        stored row.
        """
        entity = ArticleEntity(
            article_type=article_type.value,
            language=language,
            target_complexity=target_complexity,
            title=title,
            text=text,
            created_at=datetime.now(timezone.utc),
        )
        self.db.add(entity)
        await self.db.commit()
        await self.db.refresh(entity)
        return _article_to_model(entity)

    async def get_by_id(self, article_id: uuid.UUID) -> Article | None:
        """Return the article with ``article_id``, or ``None`` if absent.

        No ``deleted_at`` filter is applied here.
        """
        result = await self.db.execute(
            select(ArticleEntity).where(ArticleEntity.id == article_id)
        )
        entity = result.scalar_one_or_none()
        return _article_to_model(entity) if entity else None

    async def get_non_deleted_articles_for_owner(
        self, owner_id: uuid.UUID
    ) -> list[Article]:
        """Return the articles that ``owner_id`` owns and that are not deleted.

        An article is included only when both the article row and the
        ownership row have ``deleted_at`` unset and the ownership role is
        ``owner``.
        """
        result = await self.db.execute(
            select(ArticleEntity)
            .join(
                ArticleOwnershipEntity,
                ArticleEntity.id == ArticleOwnershipEntity.article_id,
            )
            .where(
                ArticleOwnershipEntity.user_id == owner_id,
                ArticleOwnershipEntity.deleted_at.is_(None),
                ArticleEntity.deleted_at.is_(None),
                ArticleOwnershipEntity.ownership_role
                == ArticleOwnershipRoleEnum.owner.value,
            )
        )
        return [_article_to_model(entity) for entity in result.scalars().all()]

    async def update_title_and_text(
        self, id: uuid.UUID, title: str, text: str
    ) -> Article:
        """Overwrite the title and text of article ``id``.

        Returns:
            The updated article as a domain model.

        Raises:
            ArticleNotFoundError: If no article has the given id.
        """
        entity = await self._get_entity_or_raise("update_title_and_text", id)
        entity.title = title
        entity.text = text
        return await self._commit_and_map("update_title_and_text", id, entity)

    async def update_linguistic_data(
        self, id: uuid.UUID, linguistic_data: dict
    ) -> Article:
        """Overwrite the stored linguistic data of article ``id``.

        Returns:
            The updated article as a domain model.

        Raises:
            ArticleNotFoundError: If no article has the given id.
        """
        entity = await self._get_entity_or_raise("update_linguistic_data", id)
        entity.text_linguistic_data = linguistic_data
        return await self._commit_and_map("update_linguistic_data", id, entity)

    async def update_audio_key(self, id: uuid.UUID, audio_key: str) -> Article:
        """Overwrite the audio key of article ``id``.

        Returns:
            The updated article as a domain model.

        Raises:
            ArticleNotFoundError: If no article has the given id.
        """
        entity = await self._get_entity_or_raise("update_audio_key", id)
        entity.audio_key = audio_key
        return await self._commit_and_map("update_audio_key", id, entity)

    async def _get_entity_or_raise(
        self, operation: str, id: uuid.UUID
    ) -> ArticleEntity:
        """Load the article row for ``id`` or fail with a descriptive error."""
        result = await self.db.execute(
            select(ArticleEntity).where(ArticleEntity.id == id)
        )
        entity = result.scalar_one_or_none()
        if entity is None:
            logger.error(
                "%s failed, cannot find article with id '%s'", operation, id
            )
            # A bare `raise` here has no active exception to re-raise and
            # would surface as an unrelated RuntimeError; raise explicitly.
            raise ArticleNotFoundError(operation, id)
        return entity

    async def _commit_and_map(
        self, operation: str, id: uuid.UUID, entity: ArticleEntity
    ) -> Article:
        """Commit pending changes, reload the row and map it to ``Article``."""
        await self.db.commit()
        # Same commit-then-refresh sequence as create(): if the session
        # expires attributes on commit (SQLAlchemy's default), mapping the
        # entity without a refresh would need an implicit lazy load, which
        # an AsyncSession cannot perform.
        await self.db.refresh(entity)
        logger.info("%s for article '%s' successful", operation, id)
        return _article_to_model(entity)
class PostgresArticleOwnershipRepository:
    """Article-ownership repository backed by an async SQLAlchemy session."""

    def __init__(self, db: AsyncSession) -> None:
        """Keep the session used for every query and commit."""
        self.db = db

    async def create(
        self,
        article_id: uuid.UUID,
        ownership_role: ArticleOwnershipRoleEnum,
        user_id: uuid.UUID,
    ) -> ArticleOwnership:
        """Insert an ownership row and return it as a domain model.

        ``created_at`` is set to the current UTC time, and the row is
        refreshed after the commit before being mapped.
        """
        ownership_row = ArticleOwnershipEntity(
            article_id=article_id,
            ownership_role=ownership_role.value,
            user_id=user_id,
            created_at=datetime.now(timezone.utc),
        )
        session = self.db
        session.add(ownership_row)
        await session.commit()
        await session.refresh(ownership_row)
        return _ownership_to_model(ownership_row)

    async def get_by_article_id(self, article_id: uuid.UUID) -> list[ArticleOwnership]:
        """Return every ownership row stored for ``article_id``.

        No ``deleted_at`` filter is applied here.
        """
        statement = select(ArticleOwnershipEntity).where(
            ArticleOwnershipEntity.article_id == article_id
        )
        rows = (await self.db.execute(statement)).scalars().all()
        return [_ownership_to_model(row) for row in rows]