diff --git a/api/alembic/versions/20260329_0006_separate_job_and_article.py b/api/alembic/versions/20260329_0006_separate_job_and_article.py new file mode 100644 index 0000000..47ff762 --- /dev/null +++ b/api/alembic/versions/20260329_0006_separate_job_and_article.py @@ -0,0 +1,74 @@ +"""separate job orchestration from article content + +Revision ID: 0006 +Revises: 0005 +Create Date: 2026-03-29 + +""" +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op +from sqlalchemy.dialects import postgresql + +revision: str = "0006" +down_revision: Union[str, None] = "0005" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # Make article content fields nullable — they are now filled in step-by-step + op.alter_column("translated_articles", "source_title", nullable=True) + op.alter_column("translated_articles", "source_body", nullable=True) + op.alter_column("translated_articles", "target_title", nullable=True) + op.alter_column("translated_articles", "target_body", nullable=True) + + # Add source_body_pos to translated_articles + op.add_column( + "translated_articles", + sa.Column("source_body_pos", postgresql.JSONB(), nullable=True), + ) + + # Add FK from jobs to the article they produce + op.add_column( + "jobs", + sa.Column( + "translated_article_id", + postgresql.UUID(as_uuid=True), + sa.ForeignKey("translated_articles.id"), + nullable=True, + ), + ) + + # Drop content columns that now live on translated_articles + op.drop_column("jobs", "source_language") + op.drop_column("jobs", "target_language") + op.drop_column("jobs", "complexity_level") + op.drop_column("jobs", "input_summary") + op.drop_column("jobs", "generated_text") + op.drop_column("jobs", "translated_text") + op.drop_column("jobs", "audio_url") + op.drop_column("jobs", "source_pos_data") + op.drop_column("jobs", "target_pos_data") + op.drop_column("jobs", "audio_transcript") + + +def 
downgrade() -> None: + op.add_column("jobs", sa.Column("audio_transcript", postgresql.JSONB(), nullable=True)) + op.add_column("jobs", sa.Column("target_pos_data", postgresql.JSONB(), nullable=True)) + op.add_column("jobs", sa.Column("source_pos_data", postgresql.JSONB(), nullable=True)) + op.add_column("jobs", sa.Column("audio_url", sa.Text(), nullable=True)) + op.add_column("jobs", sa.Column("translated_text", sa.Text(), nullable=True)) + op.add_column("jobs", sa.Column("generated_text", sa.Text(), nullable=True)) + op.add_column("jobs", sa.Column("input_summary", sa.Text(), nullable=True)) + op.add_column("jobs", sa.Column("complexity_level", sa.String(5), nullable=False, server_default="B1")) + op.add_column("jobs", sa.Column("target_language", sa.String(10), nullable=False, server_default="fr")) + op.add_column("jobs", sa.Column("source_language", sa.String(10), nullable=False, server_default="en")) + + op.drop_column("jobs", "translated_article_id") + + # Drop the column that upgrade() added, so downgrade is a true inverse + op.drop_column("translated_articles", "source_body_pos") + + op.alter_column("translated_articles", "target_body", nullable=False) + op.alter_column("translated_articles", "target_title", nullable=False) + op.alter_column("translated_articles", "source_body", nullable=False) + op.alter_column("translated_articles", "source_title", nullable=False) diff --git a/api/app/domain/models/summarise_job.py b/api/app/domain/models/summarise_job.py index 2fd8868..f6e0afc 100644 --- a/api/app/domain/models/summarise_job.py +++ b/api/app/domain/models/summarise_job.py @@ -7,17 +7,8 @@ class SummariseJob: id: str user_id: str status: str - source_language: str - target_language: str - complexity_level: str - input_summary: str - generated_text: str - translated_text: str - error_message: str - audio_url: str - source_pos_data: dict | None - target_pos_data: dict | None - audio_transcript: dict | None + translated_article_id: str | None + error_message: str | None created_at: datetime started_at: datetime | None = None completed_at: datetime | None = None diff --git 
a/api/app/domain/models/translated_article.py b/api/app/domain/models/translated_article.py index 2800839..a666ddd 100644 --- a/api/app/domain/models/translated_article.py +++ b/api/app/domain/models/translated_article.py @@ -7,12 +7,14 @@ class TranslatedArticle: id: str published_at: datetime source_language: str - source_title: str - source_body: str target_language: str target_complexities: list[str] - target_title: str - target_body: str + # Content fields — filled in step-by-step during generation + source_title: str | None + source_body: str | None + source_body_pos: dict | None + target_title: str | None + target_body: str | None audio_url: str | None target_body_pos: dict | None target_body_transcript: dict | None diff --git a/api/app/domain/services/article_service.py b/api/app/domain/services/article_service.py deleted file mode 100644 index f525e6e..0000000 --- a/api/app/domain/services/article_service.py +++ /dev/null @@ -1,19 +0,0 @@ -import re -from sqlalchemy.ext.asyncio import AsyncSession - -from ..models.summarise_job import SummariseJob -from ..models.translated_article import TranslatedArticle -from ...outbound.postgres.repositories.translated_article_repository import TranslatedArticleRepository - -def first_heading(md: str) -> str | None: - m = re.search(r'^#{1,2}\s+(.+)', md, re.MULTILINE) - return m.group(1).strip() if m else None - -class ArticleService: - def __init__(self, db: AsyncSession) -> None: - self.translated_articles_repository = TranslatedArticleRepository(db) - - async def get_all_articles(self, target_language: str) -> list[TranslatedArticle]: - """Fetch all translated articles""" - articles = await self.translated_articles_repository.list_all(target_language) - return articles \ No newline at end of file diff --git a/api/app/domain/services/summarise_service.py b/api/app/domain/services/summarise_service.py new file mode 100644 index 0000000..b866c51 --- /dev/null +++ b/api/app/domain/services/summarise_service.py @@ -0,0 
+1,149 @@ +import asyncio +import random +import re +import uuid +from typing import Any, Callable, Coroutine + +import anthropic +from sqlalchemy.ext.asyncio import AsyncSession + +from ...outbound.postgres.repositories import summarise_job_repository +from ...outbound.postgres.repositories.translated_article_repository import TranslatedArticleRepository +from ...outbound.anthropic.anthropic_client import AnthropicClient +from ...outbound.deepgram.deepgram_client import LocalDeepgramClient +from ...outbound.deepl.deepl_client import DeepLClient +from ...outbound.gemini.gemini_client import GeminiClient +from ...outbound.spacy.spacy_client import SpacyClient +from ...storage import upload_audio +from ...languages import SUPPORTED_LANGUAGES + + + +_ANTHROPIC_RETRYABLE = ( + anthropic.RateLimitError, + anthropic.InternalServerError, + anthropic.APITimeoutError, + anthropic.APIConnectionError, +) +_MAX_RETRIES = 4 +_BASE_DELAY = 1.0 +_MAX_DELAY = 60.0 + + +async def _anthropic_with_backoff( + coro_fn: Callable[..., Coroutine[Any, Any, Any]], + *args: Any, + **kwargs: Any, +) -> Any: + for attempt in range(_MAX_RETRIES + 1): + try: + return await coro_fn(*args, **kwargs) + except _ANTHROPIC_RETRYABLE as exc: + if attempt == _MAX_RETRIES: + raise + retry_after: float | None = None + if isinstance(exc, anthropic.RateLimitError): + raw = exc.response.headers.get("retry-after") + if raw is not None: + retry_after = float(raw) + if retry_after is None: + retry_after = min(_BASE_DELAY * (2 ** attempt), _MAX_DELAY) + jittered = retry_after * (0.8 + random.random() * 0.4) + await asyncio.sleep(jittered) + + +class SummariseService: + def __init__( + self, + anthropic_client: AnthropicClient, + deepgram_client: LocalDeepgramClient, + deepl_client: DeepLClient, + gemini_client: GeminiClient, + spacy_client: SpacyClient, + ) -> None: + self.anthropic_client = anthropic_client + self.deepgram_client = deepgram_client + self.deepl_client = deepl_client + self.gemini_client = 
gemini_client + self.spacy_client = spacy_client + + def _first_heading(self, md: str) -> str | None: + m = re.search(r'^#{1,2}\s+(.+)', md, re.MULTILINE) + return m.group(1).strip() if m else None + + def _split_title_and_body(self, text: str) -> tuple[str, str]: + """Splits the text into a title (first heading) and body (the rest).""" + m = re.search(r'^#{1,2}\s+(.+)', text, re.MULTILINE) + if m is None: + return "Untitled Article", text + # Remove the entire heading line ('#' marker included) — slicing off only + # len(title) characters would leave part of the heading in the body. + body = (text[:m.start()] + text[m.end():]).lstrip() + return m.group(1).strip(), body + + async def run( + self, + db: AsyncSession, + job_id: uuid.UUID, + article_id: uuid.UUID, + source_language: str, + target_language: str, + complexity_level: str, + input_texts: list[str], + ) -> None: + article_repo = TranslatedArticleRepository(db) + job = await summarise_job_repository.get_by_id(db, job_id) + await summarise_job_repository.mark_processing(db, job) + + try: + language_name = SUPPORTED_LANGUAGES[target_language] + source_material = "\n\n".join(input_texts[:3]) + + generated_text = await _anthropic_with_backoff( + self.anthropic_client.generate_summary_text, + content_to_summarise=source_material, + complexity_level=complexity_level, + from_language=language_name, + to_language=language_name, + length_preference="200-400 words", + ) + + generated_title, generated_text_without_title = self._split_title_and_body(generated_text) + + await article_repo.update_content( + article_id, + target_title=generated_title, + target_body=generated_text_without_title, + source_title="", + source_body="", + ) + + translated_text = await self.deepl_client.translate(generated_text, source_language) + + translated_title, translated_text_without_title = self._split_title_and_body(translated_text) + + await article_repo.update_content( + article_id, + target_title=generated_title, + target_body=generated_text_without_title, + source_title=translated_title, + source_body=translated_text_without_title, + ) + + target_pos_data = 
self.spacy_client.get_parts_of_speech(generated_text_without_title, target_language) + source_pos_data = self.spacy_client.get_parts_of_speech(translated_text_without_title, source_language) + + await article_repo.update_pos(article_id, target_pos_data, source_pos_data) + + voice = self.gemini_client.get_voice_by_language(target_language) + wav_bytes = await self.gemini_client.generate_audio(generated_text, voice) + audio_key = f"audio/{job_id}.wav" + upload_audio(audio_key, wav_bytes) + + transcript = await self.deepgram_client.transcribe_bytes(wav_bytes, target_language) + + await article_repo.update_audio(article_id, audio_key, transcript) + + await summarise_job_repository.mark_succeeded(db, job) + + except Exception as exc: + await summarise_job_repository.mark_failed(db, job, str(exc)) diff --git a/api/app/outbound/anthropic/anthropic_client.py b/api/app/outbound/anthropic/anthropic_client.py index fe4958b..cd7f600 100644 --- a/api/app/outbound/anthropic/anthropic_client.py +++ b/api/app/outbound/anthropic/anthropic_client.py @@ -19,15 +19,15 @@ class AnthropicClient(): ) -> str: return ( f"You are a language learning content creator.\n" - f"You generate markdown summaries of user-provided content.\n" + f"You generate original, level-appropriate content from a source.\n" + f"The content will be spoken aloud in {to_language}, write it accordingly.\n" f"You will provide content in {to_language} at {complexity_level} proficiency level on the CEFR scale.\n" f"The text you generate will:\n" f"- Contain ONLY the generated summary text in {to_language}.\n" - f"- Never contain inappropriate (hateful, sexual, violent) content. 
It is preferable to return no text than to generate such content.\n" f"- Speak directly to the reader/listener, adopting the tone and style of a semi-formal news reporter or podcaster.\n" - f"- Where appropriate (fluency level, content), use a small number of idiomatic expressions.\n" - f"- Where appropriate use at least one additional tense, beyond the default of the content.\n" - f"- Be formatted in markdown. Contain a single level 1 header (#) at the top, followed by paragraphs and line breaks.\n" + f"- Occasionally, where natural, include idiomatic expressions appropriate to {complexity_level} level.\n" + f"- Vary tense usage naturally — do not restrict the piece to a single tense.\n" + f"- Contain only plain text. The piece should start with a title prefaced like a level-1 markdown title (#), but all other text should be plain. \n" f"- Be around {length_preference} long.\n" f"- Be inspired by the content, but not the tone, of the source material." ) diff --git a/api/app/outbound/deepgram/deepgram_client.py b/api/app/outbound/deepgram/deepgram_client.py index cb81a14..8efd748 100644 --- a/api/app/outbound/deepgram/deepgram_client.py +++ b/api/app/outbound/deepgram/deepgram_client.py @@ -1,4 +1,5 @@ -import asyncio +import json + from deepgram import ( AsyncDeepgramClient, ) @@ -15,7 +16,7 @@ class LocalDeepgramClient: utterances=True, smart_format=True, ) - return response.results.json() + return json.loads(response.results.json()) async def transcribe_local_file(self, local_file_path: str, language_code: str) -> dict: with open(local_file_path, "rb") as audio_file: diff --git a/api/app/outbound/postgres/entities/summarise_job_entity.py b/api/app/outbound/postgres/entities/summarise_job_entity.py index ee43640..9d19b1d 100644 --- a/api/app/outbound/postgres/entities/summarise_job_entity.py +++ b/api/app/outbound/postgres/entities/summarise_job_entity.py @@ -3,10 +3,11 @@ from datetime import datetime, timezone from sqlalchemy import String, Text, DateTime, 
ForeignKey from sqlalchemy.orm import Mapped, mapped_column -from sqlalchemy.dialects.postgresql import UUID, JSONB +from sqlalchemy.dialects.postgresql import UUID from ..database import Base + class SummariseJobEntity(Base): __tablename__ = "jobs" @@ -17,17 +18,10 @@ class SummariseJobEntity(Base): UUID(as_uuid=True), ForeignKey("users.id"), nullable=True, index=True ) status: Mapped[str] = mapped_column(String(20), nullable=False, default="pending") - source_language: Mapped[str] = mapped_column(String(10), nullable=False, default="en") - target_language: Mapped[str] = mapped_column(String(10), nullable=False) - complexity_level: Mapped[str] = mapped_column(String(5), nullable=False) - input_summary: Mapped[str | None] = mapped_column(Text, nullable=True) - generated_text: Mapped[str | None] = mapped_column(Text, nullable=True) - translated_text: Mapped[str | None] = mapped_column(Text, nullable=True) + translated_article_id: Mapped[uuid.UUID | None] = mapped_column( + UUID(as_uuid=True), ForeignKey("translated_articles.id"), nullable=True + ) error_message: Mapped[str | None] = mapped_column(Text, nullable=True) - audio_url: Mapped[str | None] = mapped_column(Text, nullable=True) - source_pos_data: Mapped[dict | None] = mapped_column(JSONB, nullable=True) - target_pos_data: Mapped[dict | None] = mapped_column(JSONB, nullable=True) - audio_transcript: Mapped[dict | None] = mapped_column(JSONB, nullable=True) created_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), default=lambda: datetime.now(timezone.utc), diff --git a/api/app/outbound/postgres/entities/translated_article_entity.py b/api/app/outbound/postgres/entities/translated_article_entity.py index 9d14eb2..95073f6 100644 --- a/api/app/outbound/postgres/entities/translated_article_entity.py +++ b/api/app/outbound/postgres/entities/translated_article_entity.py @@ -19,12 +19,14 @@ class TranslatedArticleEntity(Base): default=lambda: datetime.now(timezone.utc), ) source_language: Mapped[str] = 
mapped_column(String(10), nullable=False) - source_title: Mapped[str] = mapped_column(Text, nullable=False) - source_body: Mapped[str] = mapped_column(Text, nullable=False) target_language: Mapped[str] = mapped_column(String(10), nullable=False) target_complexities: Mapped[list[str]] = mapped_column(ARRAY(String(5)), nullable=False) - target_title: Mapped[str] = mapped_column(Text, nullable=False) - target_body: Mapped[str] = mapped_column(Text, nullable=False) + # Content fields — nullable, filled in step-by-step during generation + source_title: Mapped[str | None] = mapped_column(Text, nullable=True) + source_body: Mapped[str | None] = mapped_column(Text, nullable=True) + source_body_pos: Mapped[dict | None] = mapped_column(JSONB, nullable=True) + target_title: Mapped[str | None] = mapped_column(Text, nullable=True) + target_body: Mapped[str | None] = mapped_column(Text, nullable=True) audio_url: Mapped[str | None] = mapped_column(Text, nullable=True) target_body_pos: Mapped[dict | None] = mapped_column(JSONB, nullable=True) target_body_transcript: Mapped[dict | None] = mapped_column(JSONB, nullable=True) diff --git a/api/app/outbound/postgres/repositories/summarise_job_repository.py b/api/app/outbound/postgres/repositories/summarise_job_repository.py index bda311a..fbc081b 100644 --- a/api/app/outbound/postgres/repositories/summarise_job_repository.py +++ b/api/app/outbound/postgres/repositories/summarise_job_repository.py @@ -7,69 +7,32 @@ from sqlalchemy.ext.asyncio import AsyncSession from ..entities.summarise_job_entity import SummariseJobEntity from ....domain.models.summarise_job import SummariseJob -class PostgresSummariseJobRepository: - def __init__(self, db: AsyncSession): - self.db = db - async def list_all(self) -> list[SummariseJob]: - result = self.db.execute( - select(SummariseJobEntity).order_by(SummariseJobEntity.created_at.desc()) - ) +def _to_model(entity: SummariseJobEntity) -> SummariseJob: + return SummariseJob( + id=str(entity.id), + 
user_id=str(entity.user_id), + status=entity.status, + translated_article_id=str(entity.translated_article_id) if entity.translated_article_id else None, + error_message=entity.error_message, + created_at=entity.created_at, + started_at=entity.started_at, + completed_at=entity.completed_at, + ) - return list(result.scalars().all()).map(self.entity_to_model) - async def get_by_audio_url( - self, - audio_url: str - ) -> SummariseJob | None: - result = await self.db.execute( - select(SummariseJobEntity).where( - SummariseJobEntity.audio_url == audio_url - ) - ) - - return self.entity_to_model(result.scalar_one_or_none()) - - def entity_to_model(self, entity: SummariseJobEntity | None) -> SummariseJob: - if entity is None: - return None - - return SummariseJob( - id=str(entity.id), - user_id=str(entity.user_id), - status=entity.status, - source_language=entity.source_language, - target_language=entity.target_language, - complexity_level=entity.complexity_level, - input_summary=entity.input_summary, - generated_text=entity.generated_text, - translated_text=entity.translated_text, - error_message=entity.error_message, - audio_url=entity.audio_url, - source_pos_data=entity.source_pos_data, - target_pos_data=entity.target_pos_data, - audio_transcript=entity.audio_transcript, - created_at=entity.created_at, - started_at=entity.started_at, - completed_at=entity.completed_at, - ) - -async def update(db: AsyncSession, job: SummariseJobEntity) -> None: +async def _commit(db: AsyncSession) -> None: await db.commit() async def create( db: AsyncSession, user_id: uuid.UUID, - source_language: str, - target_language: str, - complexity_level: str, + translated_article_id: uuid.UUID, ) -> SummariseJobEntity: job = SummariseJobEntity( user_id=user_id, - source_language=source_language, - target_language=target_language, - complexity_level=complexity_level, + translated_article_id=translated_article_id, ) db.add(job) await db.commit() @@ -92,58 +55,17 @@ async def mark_processing(db: 
AsyncSession, job: SummariseJobEntity) -> None: job.status = "processing" job.started_at = datetime.now(timezone.utc) job.error_message = None - await update(db, job) + await _commit(db) -async def save_generated_text( - db: AsyncSession, - job: SummariseJobEntity, - generated_text: str, - input_summary: str, -) -> None: - job.generated_text = generated_text - job.input_summary = input_summary - await update(db, job) - - -async def save_translated_text( - db: AsyncSession, - job: SummariseJobEntity, - translated_text: str, -) -> None: - job.translated_text = translated_text - await update(db, job) - - -async def save_pos_data( - db: AsyncSession, - job: SummariseJobEntity, - source_pos_data: dict, - target_pos_data: dict, -) -> None: - job.source_pos_data = source_pos_data - job.target_pos_data = target_pos_data - await update(db, job) - - -async def save_audio_transcript( - db: AsyncSession, - job: SummariseJobEntity, - audio_transcript: dict, -) -> None: - job.audio_transcript = audio_transcript - await update(db, job) - - -async def mark_succeeded(db: AsyncSession, job: SummariseJobEntity, audio_url: str) -> None: +async def mark_succeeded(db: AsyncSession, job: SummariseJobEntity) -> None: job.status = "succeeded" - job.audio_url = audio_url job.completed_at = datetime.now(timezone.utc) - await update(db, job) + await _commit(db) async def mark_failed(db: AsyncSession, job: SummariseJobEntity, error: str) -> None: job.status = "failed" job.error_message = error job.completed_at = datetime.now(timezone.utc) - await update(db, job) + await _commit(db) diff --git a/api/app/outbound/postgres/repositories/translated_article_repository.py b/api/app/outbound/postgres/repositories/translated_article_repository.py index daf9513..ac7d2a8 100644 --- a/api/app/outbound/postgres/repositories/translated_article_repository.py +++ b/api/app/outbound/postgres/repositories/translated_article_repository.py @@ -5,6 +5,7 @@ from sqlalchemy import select from sqlalchemy.ext.asyncio 
import AsyncSession from ..entities.translated_article_entity import TranslatedArticleEntity +from ..entities.summarise_job_entity import SummariseJobEntity from ....domain.models.translated_article import TranslatedArticle @@ -17,10 +18,11 @@ class TranslatedArticleRepository: id=str(entity.id), published_at=entity.published_at, source_language=entity.source_language, - source_title=entity.source_title, - source_body=entity.source_body, target_language=entity.target_language, target_complexities=list(entity.target_complexities), + source_title=entity.source_title, + source_body=entity.source_body, + source_body_pos=entity.source_body_pos, target_title=entity.target_title, target_body=entity.target_body, audio_url=entity.audio_url, @@ -31,42 +33,93 @@ class TranslatedArticleRepository: async def create( self, source_language: str, - source_title: str, - source_body: str, target_language: str, target_complexities: list[str], - target_title: str, - target_body: str, - audio_url: str | None, - target_body_pos: dict | None, - target_body_transcript: dict | None, ) -> TranslatedArticle: entity = TranslatedArticleEntity( published_at=datetime.now(timezone.utc), source_language=source_language, - source_title=source_title, - source_body=source_body, target_language=target_language, target_complexities=target_complexities, - target_title=target_title, - target_body=target_body, - audio_url=audio_url, - target_body_pos=target_body_pos, - target_body_transcript=target_body_transcript, ) self.db.add(entity) await self.db.commit() await self.db.refresh(entity) return self._to_model(entity) - async def list_all(self, target_language: str) -> list[TranslatedArticle]: + async def update_content( + self, + article_id: uuid.UUID, + target_title: str, + target_body: str, + source_title: str, + source_body: str, + ) -> None: + entity = await self.db.get(TranslatedArticleEntity, article_id) + entity.target_title = target_title + entity.target_body = target_body + entity.source_title = 
source_title + entity.source_body = source_body + await self.db.commit() + + async def update_pos(self, article_id: uuid.UUID, target_body_pos: dict, source_body_pos: dict) -> None: + entity = await self.db.get(TranslatedArticleEntity, article_id) + entity.target_body_pos = target_body_pos + entity.source_body_pos = source_body_pos + await self.db.commit() + + async def update_audio( + self, + article_id: uuid.UUID, + audio_url: str, + target_body_transcript: dict, + ) -> None: + entity = await self.db.get(TranslatedArticleEntity, article_id) + entity.audio_url = audio_url + entity.target_body_transcript = target_body_transcript + await self.db.commit() + + async def get_by_audio_url(self, audio_url: str) -> TranslatedArticle | None: + result = await self.db.execute( + select(TranslatedArticleEntity).where( + TranslatedArticleEntity.audio_url == audio_url + ) + ) + entity = result.scalar_one_or_none() + return self._to_model(entity) if entity else None + + async def list_complete(self, target_language: str) -> list[TranslatedArticle]: + """Return articles that are fully generated (all content fields set, job succeeded).""" result = await self.db.execute( select(TranslatedArticleEntity) - .where(TranslatedArticleEntity.target_language == target_language) + .join( + SummariseJobEntity, + SummariseJobEntity.translated_article_id == TranslatedArticleEntity.id, + ) + .where( + TranslatedArticleEntity.target_language == target_language, + SummariseJobEntity.status == "succeeded", + TranslatedArticleEntity.target_body.is_not(None), + TranslatedArticleEntity.source_body.is_not(None), + ) .order_by(TranslatedArticleEntity.published_at.desc()) ) return [self._to_model(e) for e in result.scalars().all()] - async def get_by_id(self, article_id: uuid.UUID) -> TranslatedArticle | None: - entity = await self.db.get(TranslatedArticleEntity, article_id) + async def get_complete_by_id(self, article_id: uuid.UUID) -> TranslatedArticle | None: + """Return the article only if fully 
generated (all content fields set, job succeeded).""" + result = await self.db.execute( + select(TranslatedArticleEntity) + .join( + SummariseJobEntity, + SummariseJobEntity.translated_article_id == TranslatedArticleEntity.id, + ) + .where( + TranslatedArticleEntity.id == article_id, + SummariseJobEntity.status == "succeeded", + TranslatedArticleEntity.target_body.is_not(None), + TranslatedArticleEntity.source_body.is_not(None), + ) + ) + entity = result.scalar_one_or_none() return self._to_model(entity) if entity else None diff --git a/api/app/outbound/spacy/spacy_client.py b/api/app/outbound/spacy/spacy_client.py index 93200ed..3283b65 100644 --- a/api/app/outbound/spacy/spacy_client.py +++ b/api/app/outbound/spacy/spacy_client.py @@ -29,18 +29,31 @@ class SpacyClient: return self._cache[language] def get_parts_of_speech(self, text: str, language: str) -> dict: + """Use SpaCy to get parts of speech for the given text and language, + broken down by sentences and then by tokens.""" nlp = self._get_nlp(language) doc = nlp(text) - tokens = [ + + sentences = [ { - "text": token.text, - "lemma": token.lemma_, - "pos": token.pos_, - "tag": token.tag_, - "dep": token.dep_, - "is_stop": token.is_stop, + "text": sent.text, + "tokens": [ + { + "text": token.text, + "lemma": token.lemma_, + "type": token.ent_type_ if token.ent_type_ else None, + "pos": token.pos_, + "tag": token.tag_, + "dep": token.dep_, + "is_stop": token.is_stop, + "is_punct": token.is_punct, + "is_alpha": token.is_alpha, + + } + for token in sent + if not token.is_space + ], } - for token in doc - if not token.is_space + for sent in doc.sents ] - return {"language": language, "tokens": tokens} + return {"language": language, "sentences": sentences} diff --git a/api/app/routers/api/generation.py b/api/app/routers/api/generation.py index d721256..88c63b0 100644 --- a/api/app/routers/api/generation.py +++ b/api/app/routers/api/generation.py @@ -1,64 +1,26 @@ -import asyncio -import random import uuid from 
functools import partial -from typing import Any, Callable, Coroutine -import anthropic from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel from sqlalchemy.ext.asyncio import AsyncSession from ...languages import SUPPORTED_LANGUAGES, SUPPORTED_LEVELS from ...auth import require_admin -from ...storage import upload_audio from ...outbound.postgres.database import get_db, AsyncSessionLocal from ...outbound.postgres.repositories import summarise_job_repository from ...outbound.postgres.repositories.translated_article_repository import TranslatedArticleRepository -from ...domain.services.article_service import first_heading from ...outbound.anthropic.anthropic_client import AnthropicClient from ...outbound.deepgram.deepgram_client import LocalDeepgramClient from ...outbound.deepl.deepl_client import DeepLClient from ...outbound.gemini.gemini_client import GeminiClient from ...outbound.spacy.spacy_client import SpacyClient +from ...domain.services.summarise_service import SummariseService from ...config import settings from ... 
async def _run_generation(
    job_id: uuid.UUID,
    article_id: uuid.UUID,
    request: GenerationRequest,
) -> None:
    """Background task: run the full summarise pipeline for one job.

    Builds the outbound clients from application settings, opens a fresh
    DB session (this runs outside the request scope, so the request-scoped
    session cannot be reused), and delegates to ``SummariseService.run``
    for the given job/article pair.
    """
    anthropic_client = AnthropicClient.new(settings.anthropic_api_key)
    deepgram_client = LocalDeepgramClient(settings.deepgram_api_key)
    deepl_client = DeepLClient(settings.deepl_api_key)
    gemini_client = GeminiClient(settings.gemini_api_key)
    summarise_service = SummariseService(
        anthropic_client=anthropic_client,
        deepgram_client=deepgram_client,
        deepl_client=deepl_client,
        gemini_client=gemini_client,
        spacy_client=SpacyClient(),
    )
    async with AsyncSessionLocal() as db:
        await summarise_service.run(
            db=db,
            job_id=job_id,
            article_id=article_id,
            source_language=request.source_language,
            target_language=request.target_language,
            complexity_level=request.complexity_level,
            input_texts=request.input_texts,
        )
class JobResponse(BaseModel):
    """Job detail payload returned by the jobs API (``get_job``)."""

    id: uuid.UUID
    status: str
    # Article this job populates; nullable — presumably None for jobs created
    # before the job/article split. TODO confirm whether old rows are backfilled.
    translated_article_id: uuid.UUID | None = None
    created_at: datetime
    started_at: datetime | None = None
    completed_at: datetime | None = None
    # Only meaningful when status == "failed".
    error_message: str | None = None

    # Allow construction directly from the ORM entity via attribute access.
    model_config = {"from_attributes": True}
async def _run_regenerate_audio(job_id: uuid.UUID) -> None:
    """Background task: synthesise audio for the article linked to *job_id*.

    Marks the job processing, generates speech for the article's target
    body via Gemini, uploads the WAV under ``audio/{job_id}.wav``, records
    the key on the article, and marks the job succeeded; any exception
    instead marks the job failed with the exception text.
    """
    gemini_client = GeminiClient(settings.gemini_api_key)
    async with AsyncSessionLocal() as db:
        job = await summarise_job_repository.get_by_id(db, job_id)
        article_repo = TranslatedArticleRepository(db)
        # NOTE(review): the endpoint validates translated_article_id before
        # enqueueing, but it is not re-checked here — confirm the job cannot
        # lose its article between enqueue and execution.
        article_entity = await db.get(TranslatedArticleEntity, job.translated_article_id)
        await summarise_job_repository.mark_processing(db, job)

        try:
            voice = gemini_client.get_voice_by_language(article_entity.target_language)
            wav_bytes = await gemini_client.generate_audio(article_entity.target_body, voice)
            audio_key = f"audio/{job_id}.wav"
            upload_audio(audio_key, wav_bytes)
            # The pre-existing transcript is written back unchanged — the new
            # audio is NOT re-transcribed here, so an old transcript may no
            # longer align with the regenerated audio. TODO confirm intended.
            await article_repo.update_audio(
                article_entity.id,
                audio_url=audio_key,
                target_body_transcript=article_entity.target_body_transcript,
            )
            await summarise_job_repository.mark_succeeded(db, job)
        except Exception as exc:
            await summarise_job_repository.mark_failed(db, job, str(exc))
article_entity.audio_url: raise HTTPException(status_code=409, detail="Job already has audio") if job.status == "processing": - raise HTTPException( - status_code=409, detail="Job is already processing") + raise HTTPException(status_code=409, detail="Job is already processing") await worker.enqueue(partial(_run_regenerate_audio, uid)) return {"job_id": job_id} diff --git a/api/app/routers/bff/articles.py b/api/app/routers/bff/articles.py index 0c32c06..e829b85 100644 --- a/api/app/routers/bff/articles.py +++ b/api/app/routers/bff/articles.py @@ -7,7 +7,6 @@ from sqlalchemy.ext.asyncio import AsyncSession from ...auth import verify_token from ...config import settings -from ...domain.services.article_service import ArticleService from ...outbound.postgres.database import get_db from ...outbound.postgres.repositories.translated_article_repository import TranslatedArticleRepository @@ -34,12 +33,13 @@ class ArticleDetail(BaseModel): source_language: str source_title: str source_body: str + source_body_pos: dict target_language: str target_complexities: list[str] target_title: str target_body: str target_audio_url: str | None - target_body_pos: dict | None + target_body_pos: dict target_body_transcript: dict | None @@ -55,8 +55,7 @@ async def list_articles( db: AsyncSession = Depends(get_db), _: dict = Depends(verify_token), ) -> ArticleListResponse: - service = ArticleService(TranslatedArticleRepository(db)) - articles = await service.get_all_articles(target_language=target_language) + articles = await TranslatedArticleRepository(db).list_complete(target_language=target_language) return ArticleListResponse( articles=[ ArticleItem( @@ -84,7 +83,7 @@ async def get_article( except ValueError: raise HTTPException(status_code=400, detail="Invalid article ID") - article = await TranslatedArticleRepository(db).get_by_id(uid) + article = await TranslatedArticleRepository(db).get_complete_by_id(uid) if article is None: raise HTTPException(status_code=404, detail="Article not 
found") @@ -94,6 +93,7 @@ async def get_article( source_language=article.source_language, source_title=article.source_title, source_body=article.source_body, + source_body_pos=article.source_body_pos, target_language=article.target_language, target_complexities=article.target_complexities, target_title=article.target_title, diff --git a/api/app/routers/media.py b/api/app/routers/media.py index d6d6c58..21ea447 100644 --- a/api/app/routers/media.py +++ b/api/app/routers/media.py @@ -1,13 +1,10 @@ -import uuid - from fastapi import APIRouter, Depends, HTTPException from fastapi.responses import Response from sqlalchemy.ext.asyncio import AsyncSession from botocore.exceptions import ClientError -from ..auth import verify_token from ..outbound.postgres.database import get_db -from ..outbound.postgres.repositories.summarise_job_repository import PostgresSummariseJobRepository +from ..outbound.postgres.repositories.translated_article_repository import TranslatedArticleRepository from ..storage import download_audio router = APIRouter(prefix="/media", tags=["media"]) @@ -18,10 +15,9 @@ async def get_media_file( filename: str, db: AsyncSession = Depends(get_db), ) -> Response: - repository = PostgresSummariseJobRepository(db) - job = await repository.get_by_audio_url(filename) + article = await TranslatedArticleRepository(db).get_by_audio_url(filename) - if job is None: + if article is None: raise HTTPException(status_code=404, detail="File not found") try: