feat: move article content ownership from jobs to translated_articles

This commit is contained in:
wilson 2026-03-30 07:11:32 +01:00
parent c0539bcf59
commit 8252b6fcf0
16 changed files with 431 additions and 341 deletions

View file

@@ -0,0 +1,74 @@
"""separate job orchestration from article content
Revision ID: 0006
Revises: 0005
Create Date: 2026-03-29
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql
revision: str = "0006"
down_revision: Union[str, None] = "0005"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# Make article content fields nullable — they are now filled in step-by-step
op.alter_column("translated_articles", "source_title", nullable=True)
op.alter_column("translated_articles", "source_body", nullable=True)
op.alter_column("translated_articles", "target_title", nullable=True)
op.alter_column("translated_articles", "target_body", nullable=True)
# Add source_body_pos to translated_articles
op.add_column(
"translated_articles",
sa.Column("source_body_pos", postgresql.JSONB(), nullable=True),
)
# Add FK from jobs to the article they produce
op.add_column(
"jobs",
sa.Column(
"translated_article_id",
postgresql.UUID(as_uuid=True),
sa.ForeignKey("translated_articles.id"),
nullable=True,
),
)
# Drop content columns that now live on translated_articles (existing values are not backfilled)
op.drop_column("jobs", "source_language")
op.drop_column("jobs", "target_language")
op.drop_column("jobs", "complexity_level")
op.drop_column("jobs", "input_summary")
op.drop_column("jobs", "generated_text")
op.drop_column("jobs", "translated_text")
op.drop_column("jobs", "audio_url")
op.drop_column("jobs", "source_pos_data")
op.drop_column("jobs", "target_pos_data")
op.drop_column("jobs", "audio_transcript")
def downgrade() -> None:
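# NOTE: the job columns are re-added without their original data, and the
# NOT NULL alters below will fail if any article rows still contain NULLs.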
op.add_column("jobs", sa.Column("audio_transcript", postgresql.JSONB(), nullable=True))
op.add_column("jobs", sa.Column("target_pos_data", postgresql.JSONB(), nullable=True))
op.add_column("jobs", sa.Column("source_pos_data", postgresql.JSONB(), nullable=True))
op.add_column("jobs", sa.Column("audio_url", sa.Text(), nullable=True))
op.add_column("jobs", sa.Column("translated_text", sa.Text(), nullable=True))
op.add_column("jobs", sa.Column("generated_text", sa.Text(), nullable=True))
op.add_column("jobs", sa.Column("input_summary", sa.Text(), nullable=True))
op.add_column("jobs", sa.Column("complexity_level", sa.String(5), nullable=False, server_default="B1"))
op.add_column("jobs", sa.Column("target_language", sa.String(10), nullable=False, server_default="fr"))
op.add_column("jobs", sa.Column("source_language", sa.String(10), nullable=False, server_default="en"))
op.drop_column("jobs", "translated_article_id")
op.alter_column("translated_articles", "target_body", nullable=False)
op.alter_column("translated_articles", "target_title", nullable=False)
op.alter_column("translated_articles", "source_body", nullable=False)
op.alter_column("translated_articles", "source_title", nullable=False)

View file

@@ -7,17 +8,8 @@ class SummariseJob:
id: str
user_id: str
status: str
source_language: str
target_language: str
complexity_level: str
input_summary: str
generated_text: str
translated_text: str
error_message: str
audio_url: str
source_pos_data: dict | None
target_pos_data: dict | None
audio_transcript: dict | None
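# Jobs now carry orchestration state only; generated content lives on TranslatedArticle.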
translated_article_id: str | None
error_message: str | None
created_at: datetime
started_at: datetime | None = None
completed_at: datetime | None = None

View file

@@ -7,12 +7,14 @@ class TranslatedArticle:
id: str
published_at: datetime
source_language: str
source_title: str
source_body: str
target_language: str
target_complexities: list[str]
target_title: str
target_body: str
# Content fields — filled in step-by-step during generation
source_title: str | None
source_body: str | None
source_body_pos: dict | None
target_title: str | None
target_body: str | None
audio_url: str | None
target_body_pos: dict | None
target_body_transcript: dict | None

View file

@@ -1,19 +0,0 @@
import re
from sqlalchemy.ext.asyncio import AsyncSession
from ..models.summarise_job import SummariseJob
from ..models.translated_article import TranslatedArticle
from ...outbound.postgres.repositories.translated_article_repository import TranslatedArticleRepository
def first_heading(md: str) -> str | None:
m = re.search(r'^#{1,2}\s+(.+)', md, re.MULTILINE)
return m.group(1).strip() if m else None
class ArticleService:
def __init__(self, db: AsyncSession) -> None:
self.translated_articles_repository = TranslatedArticleRepository(db)
async def get_all_articles(self, target_language: str) -> list[TranslatedArticle]:
"""Fetch all translated articles"""
articles = await self.translated_articles_repository.list_all(target_language)
return articles

View file

@@ -0,0 +1,149 @@
import asyncio
import random
import re
import uuid
from typing import Any, Callable, Coroutine
import anthropic
from sqlalchemy.ext.asyncio import AsyncSession
from ...outbound.postgres.repositories import summarise_job_repository
from ...outbound.postgres.repositories.translated_article_repository import TranslatedArticleRepository
from ...outbound.anthropic.anthropic_client import AnthropicClient
from ...outbound.deepgram.deepgram_client import LocalDeepgramClient
from ...outbound.deepl.deepl_client import DeepLClient
from ...outbound.gemini.gemini_client import GeminiClient
from ...outbound.spacy.spacy_client import SpacyClient
from ...storage import upload_audio
from ...languages import SUPPORTED_LANGUAGES
_ANTHROPIC_RETRYABLE = (
anthropic.RateLimitError,
anthropic.InternalServerError,
anthropic.APITimeoutError,
anthropic.APIConnectionError,
)
_MAX_RETRIES = 4
_BASE_DELAY = 1.0  # seconds
_MAX_DELAY = 60.0  # seconds
async def _anthropic_with_backoff(
coro_fn: Callable[..., Coroutine[Any, Any, Any]],
*args: Any,
**kwargs: Any,
) -> Any:
for attempt in range(_MAX_RETRIES + 1):
try:
return await coro_fn(*args, **kwargs)
except _ANTHROPIC_RETRYABLE as exc:
if attempt == _MAX_RETRIES:
raise
retry_after: float | None = None
if isinstance(exc, anthropic.RateLimitError):
raw = exc.response.headers.get("retry-after")
if raw is not None:
retry_after = float(raw)
if retry_after is None:
retry_after = min(_BASE_DELAY * (2 ** attempt), _MAX_DELAY)
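# ±20% jitter to spread out concurrent retries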
jittered = retry_after * (0.8 + random.random() * 0.4)
await asyncio.sleep(jittered)
class SummariseService:
def __init__(
self,
anthropic_client: AnthropicClient,
deepgram_client: LocalDeepgramClient,
deepl_client: DeepLClient,
gemini_client: GeminiClient,
spacy_client: SpacyClient,
) -> None:
self.anthropic_client = anthropic_client
self.deepgram_client = deepgram_client
self.deepl_client = deepl_client
self.gemini_client = gemini_client
self.spacy_client = spacy_client
def _first_heading(self, md: str) -> str | None:
m = re.search(r'^#{1,2}\s+(.+)', md, re.MULTILINE)
return m.group(1).strip() if m else None
def _split_title_and_body(self, text: str) -> tuple[str, str]:
"""Split the text into a title (first heading) and body (the rest)."""
title = self._first_heading(text)
if title is None:
return "Untitled Article", text
# Remove the entire heading line rather than slicing off len(title)
# characters, so the "# " prefix cannot leak into the body.
body = re.sub(r"^#{1,2}\s+.+$", "", text, count=1, flags=re.MULTILINE).lstrip()
return title, body
async def run(
self,
db: AsyncSession,
job_id: uuid.UUID,
article_id: uuid.UUID,
source_language: str,
target_language: str,
complexity_level: str,
input_texts: list[str],
) -> None:
article_repo = TranslatedArticleRepository(db)
job = await summarise_job_repository.get_by_id(db, job_id)
await summarise_job_repository.mark_processing(db, job)
try:
language_name = SUPPORTED_LANGUAGES[target_language]
source_material = "\n\n".join(input_texts[:3])
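# Step 1: generate the article text in the target language (with retry/backoff).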
generated_text = await _anthropic_with_backoff(
self.anthropic_client.generate_summary_text,
content_to_summarise=source_material,
complexity_level=complexity_level,
from_language=language_name,
to_language=language_name,
length_preference="200-400 words",
)
generated_title, generated_text_without_title = self._split_title_and_body(generated_text)
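# Persist the target text right away; the source fields are filled in after translation.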
await article_repo.update_content(
article_id,
target_title=generated_title,
target_body=generated_text_without_title,
source_title="",
source_body="",
)
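# Step 2: translate into the source language and store both versions.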
translated_text = await self.deepl_client.translate(generated_text, source_language)
translated_title, translated_text_without_title = self._split_title_and_body(translated_text)
await article_repo.update_content(
article_id,
target_title=generated_title,
target_body=generated_text_without_title,
source_title=translated_title,
source_body=translated_text_without_title,
)
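# Step 3: part-of-speech tag both the target and source texts.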
target_pos_data = self.spacy_client.get_parts_of_speech(generated_text_without_title, target_language)
source_pos_data = self.spacy_client.get_parts_of_speech(translated_text_without_title, source_language)
await article_repo.update_pos(article_id, target_pos_data, source_pos_data)
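# Step 4: synthesise audio, upload it, and store the transcript.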
voice = self.gemini_client.get_voice_by_language(target_language)
wav_bytes = await self.gemini_client.generate_audio(generated_text, voice)
audio_key = f"audio/{job_id}.wav"
upload_audio(audio_key, wav_bytes)
transcript = await self.deepgram_client.transcribe_bytes(wav_bytes, target_language)
await article_repo.update_audio(article_id, audio_key, transcript)
await summarise_job_repository.mark_succeeded(db, job)
except Exception as exc:
await summarise_job_repository.mark_failed(db, job, str(exc))

View file

@@ -19,15 +19,15 @@ class AnthropicClient():
) -> str:
return (
f"You are a language learning content creator.\n"
f"You generate markdown summaries of user-provided content.\n"
f"You generate original, level-appropriate content from a source.\n"
f"The content will be spoken aloud in {to_language}, write it accordingly.\n"
f"You will provide content in {to_language} at {complexity_level} proficiency level on the CEFR scale.\n"
f"The text you generate will:\n"
f"- Contain ONLY the generated summary text in {to_language}.\n"
f"- Never contain inappropriate (hateful, sexual, violent) content. It is preferable to return no text than to generate such content.\n"
f"- Speak directly to the reader/listener, adopting the tone and style of a semi-formal news reporter or podcaster.\n"
f"- Where appropriate (fluency level, content), use a small number of idiomatic expressions.\n"
f"- Where appropriate use at least one additional tense, beyond the default of the content.\n"
f"- Be formatted in markdown. Contain a single level 1 header (#) at the top, followed by paragraphs and line breaks.\n"
f"- Occasionally, where natural, include idiomatic expressions appropriate to {complexity_level} level.\n"
f"- Vary tense usage naturally — do not restrict the piece to a single tense.\n"
f"- Contain only plain text. The piece should start with a title prefaced like a level-1 markdown title (#), but all other text should be plain. \n"
f"- Be around {length_preference} long.\n"
f"- Be inspired by the content, but not the tone, of the source material."
)

View file

@@ -1,4 +1,5 @@
import asyncio
import json
from deepgram import (
AsyncDeepgramClient,
)
@@ -15,7 +16,7 @@ class LocalDeepgramClient:
utterances=True,
smart_format=True,
)
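# Parse the SDK's JSON string into a plain dict, matching the declared return type.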
return response.results.json()
return json.loads(response.results.json())
async def transcribe_local_file(self, local_file_path: str, language_code: str) -> dict:
with open(local_file_path, "rb") as audio_file:

View file

@@ -3,10 +3,11 @@ from datetime import datetime, timezone
from sqlalchemy import String, Text, DateTime, ForeignKey
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy.dialects.postgresql import UUID, JSONB
from sqlalchemy.dialects.postgresql import UUID
from ..database import Base
class SummariseJobEntity(Base):
__tablename__ = "jobs"
@@ -17,17 +18,10 @@ class SummariseJobEntity(Base):
UUID(as_uuid=True), ForeignKey("users.id"), nullable=True, index=True
)
status: Mapped[str] = mapped_column(String(20), nullable=False, default="pending")
source_language: Mapped[str] = mapped_column(String(10), nullable=False, default="en")
target_language: Mapped[str] = mapped_column(String(10), nullable=False)
complexity_level: Mapped[str] = mapped_column(String(5), nullable=False)
input_summary: Mapped[str | None] = mapped_column(Text, nullable=True)
generated_text: Mapped[str | None] = mapped_column(Text, nullable=True)
translated_text: Mapped[str | None] = mapped_column(Text, nullable=True)
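# Each job links to the article it produces; content now lives on that row.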
translated_article_id: Mapped[uuid.UUID | None] = mapped_column(
UUID(as_uuid=True), ForeignKey("translated_articles.id"), nullable=True
)
error_message: Mapped[str | None] = mapped_column(Text, nullable=True)
audio_url: Mapped[str | None] = mapped_column(Text, nullable=True)
source_pos_data: Mapped[dict | None] = mapped_column(JSONB, nullable=True)
target_pos_data: Mapped[dict | None] = mapped_column(JSONB, nullable=True)
audio_transcript: Mapped[dict | None] = mapped_column(JSONB, nullable=True)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
default=lambda: datetime.now(timezone.utc),

View file

@@ -19,12 +19,14 @@ class TranslatedArticleEntity(Base):
default=lambda: datetime.now(timezone.utc),
)
source_language: Mapped[str] = mapped_column(String(10), nullable=False)
source_title: Mapped[str] = mapped_column(Text, nullable=False)
source_body: Mapped[str] = mapped_column(Text, nullable=False)
target_language: Mapped[str] = mapped_column(String(10), nullable=False)
target_complexities: Mapped[list[str]] = mapped_column(ARRAY(String(5)), nullable=False)
target_title: Mapped[str] = mapped_column(Text, nullable=False)
target_body: Mapped[str] = mapped_column(Text, nullable=False)
# Content fields — nullable, filled in step-by-step during generation
source_title: Mapped[str | None] = mapped_column(Text, nullable=True)
source_body: Mapped[str | None] = mapped_column(Text, nullable=True)
source_body_pos: Mapped[dict | None] = mapped_column(JSONB, nullable=True)
target_title: Mapped[str | None] = mapped_column(Text, nullable=True)
target_body: Mapped[str | None] = mapped_column(Text, nullable=True)
audio_url: Mapped[str | None] = mapped_column(Text, nullable=True)
target_body_pos: Mapped[dict | None] = mapped_column(JSONB, nullable=True)
target_body_transcript: Mapped[dict | None] = mapped_column(JSONB, nullable=True)

View file

@@ -7,69 +7,32 @@ from sqlalchemy.ext.asyncio import AsyncSession
from ..entities.summarise_job_entity import SummariseJobEntity
from ....domain.models.summarise_job import SummariseJob
class PostgresSummariseJobRepository:
def __init__(self, db: AsyncSession):
self.db = db
async def list_all(self) -> list[SummariseJob]:
result = self.db.execute(
select(SummariseJobEntity).order_by(SummariseJobEntity.created_at.desc())
)
return list(result.scalars().all()).map(self.entity_to_model)
async def get_by_audio_url(
self,
audio_url: str
) -> SummariseJob | None:
result = await self.db.execute(
select(SummariseJobEntity).where(
SummariseJobEntity.audio_url == audio_url
)
)
return self.entity_to_model(result.scalar_one_or_none())
def entity_to_model(self, entity: SummariseJobEntity | None) -> SummariseJob:
if entity is None:
return None
def _to_model(entity: SummariseJobEntity) -> SummariseJob:
return SummariseJob(
id=str(entity.id),
user_id=str(entity.user_id),
status=entity.status,
source_language=entity.source_language,
target_language=entity.target_language,
complexity_level=entity.complexity_level,
input_summary=entity.input_summary,
generated_text=entity.generated_text,
translated_text=entity.translated_text,
translated_article_id=str(entity.translated_article_id) if entity.translated_article_id else None,
error_message=entity.error_message,
audio_url=entity.audio_url,
source_pos_data=entity.source_pos_data,
target_pos_data=entity.target_pos_data,
audio_transcript=entity.audio_transcript,
created_at=entity.created_at,
started_at=entity.started_at,
completed_at=entity.completed_at,
)
async def update(db: AsyncSession, job: SummariseJobEntity) -> None:
async def _commit(db: AsyncSession) -> None:
await db.commit()
async def create(
db: AsyncSession,
user_id: uuid.UUID,
source_language: str,
target_language: str,
complexity_level: str,
translated_article_id: uuid.UUID,
) -> SummariseJobEntity:
job = SummariseJobEntity(
user_id=user_id,
source_language=source_language,
target_language=target_language,
complexity_level=complexity_level,
translated_article_id=translated_article_id,
)
db.add(job)
await db.commit()
@@ -92,58 +55,17 @@ async def mark_processing(db: AsyncSession, job: SummariseJobEntity) -> None:
job.status = "processing"
job.started_at = datetime.now(timezone.utc)
job.error_message = None
await update(db, job)
await _commit(db)
async def save_generated_text(
db: AsyncSession,
job: SummariseJobEntity,
generated_text: str,
input_summary: str,
) -> None:
job.generated_text = generated_text
job.input_summary = input_summary
await update(db, job)
async def save_translated_text(
db: AsyncSession,
job: SummariseJobEntity,
translated_text: str,
) -> None:
job.translated_text = translated_text
await update(db, job)
async def save_pos_data(
db: AsyncSession,
job: SummariseJobEntity,
source_pos_data: dict,
target_pos_data: dict,
) -> None:
job.source_pos_data = source_pos_data
job.target_pos_data = target_pos_data
await update(db, job)
async def save_audio_transcript(
db: AsyncSession,
job: SummariseJobEntity,
audio_transcript: dict,
) -> None:
job.audio_transcript = audio_transcript
await update(db, job)
async def mark_succeeded(db: AsyncSession, job: SummariseJobEntity, audio_url: str) -> None:
async def mark_succeeded(db: AsyncSession, job: SummariseJobEntity) -> None:
job.status = "succeeded"
job.audio_url = audio_url
job.completed_at = datetime.now(timezone.utc)
await update(db, job)
await _commit(db)
async def mark_failed(db: AsyncSession, job: SummariseJobEntity, error: str) -> None:
job.status = "failed"
job.error_message = error
job.completed_at = datetime.now(timezone.utc)
await update(db, job)
await _commit(db)

View file

@@ -5,6 +5,7 @@ from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from ..entities.translated_article_entity import TranslatedArticleEntity
from ..entities.summarise_job_entity import SummariseJobEntity
from ....domain.models.translated_article import TranslatedArticle
@@ -17,10 +18,11 @@ class TranslatedArticleRepository:
id=str(entity.id),
published_at=entity.published_at,
source_language=entity.source_language,
source_title=entity.source_title,
source_body=entity.source_body,
target_language=entity.target_language,
target_complexities=list(entity.target_complexities),
source_title=entity.source_title,
source_body=entity.source_body,
source_body_pos=entity.source_body_pos,
target_title=entity.target_title,
target_body=entity.target_body,
audio_url=entity.audio_url,
@@ -31,42 +33,93 @@ class TranslatedArticleRepository:
async def create(
self,
source_language: str,
source_title: str,
source_body: str,
target_language: str,
target_complexities: list[str],
target_title: str,
target_body: str,
audio_url: str | None,
target_body_pos: dict | None,
target_body_transcript: dict | None,
) -> TranslatedArticle:
entity = TranslatedArticleEntity(
published_at=datetime.now(timezone.utc),
source_language=source_language,
source_title=source_title,
source_body=source_body,
target_language=target_language,
target_complexities=target_complexities,
target_title=target_title,
target_body=target_body,
audio_url=audio_url,
target_body_pos=target_body_pos,
target_body_transcript=target_body_transcript,
)
self.db.add(entity)
await self.db.commit()
await self.db.refresh(entity)
return self._to_model(entity)
async def list_all(self, target_language: str) -> list[TranslatedArticle]:
async def update_content(
self,
article_id: uuid.UUID,
target_title: str,
target_body: str,
source_title: str,
source_body: str,
) -> None:
entity = await self.db.get(TranslatedArticleEntity, article_id)
entity.target_title = target_title
entity.target_body = target_body
entity.source_title = source_title
entity.source_body = source_body
await self.db.commit()
async def update_pos(self, article_id: uuid.UUID, target_body_pos: dict, source_body_pos: dict) -> None:
entity = await self.db.get(TranslatedArticleEntity, article_id)
entity.target_body_pos = target_body_pos
entity.source_body_pos = source_body_pos
await self.db.commit()
async def update_audio(
self,
article_id: uuid.UUID,
audio_url: str,
target_body_transcript: dict,
) -> None:
entity = await self.db.get(TranslatedArticleEntity, article_id)
entity.audio_url = audio_url
entity.target_body_transcript = target_body_transcript
await self.db.commit()
async def get_by_audio_url(self, audio_url: str) -> TranslatedArticle | None:
result = await self.db.execute(
select(TranslatedArticleEntity).where(
TranslatedArticleEntity.audio_url == audio_url
)
)
entity = result.scalar_one_or_none()
return self._to_model(entity) if entity else None
async def list_complete(self, target_language: str) -> list[TranslatedArticle]:
"""Return articles that are fully generated (all content fields set, job succeeded)."""
result = await self.db.execute(
select(TranslatedArticleEntity)
.where(TranslatedArticleEntity.target_language == target_language)
.join(
SummariseJobEntity,
SummariseJobEntity.translated_article_id == TranslatedArticleEntity.id,
)
.where(
TranslatedArticleEntity.target_language == target_language,
SummariseJobEntity.status == "succeeded",
TranslatedArticleEntity.target_body.is_not(None),
TranslatedArticleEntity.source_body.is_not(None),
)
.order_by(TranslatedArticleEntity.published_at.desc())
)
return [self._to_model(e) for e in result.scalars().all()]
async def get_by_id(self, article_id: uuid.UUID) -> TranslatedArticle | None:
entity = await self.db.get(TranslatedArticleEntity, article_id)
async def get_complete_by_id(self, article_id: uuid.UUID) -> TranslatedArticle | None:
"""Return the article only if fully generated (all content fields set, job succeeded)."""
result = await self.db.execute(
select(TranslatedArticleEntity)
.join(
SummariseJobEntity,
SummariseJobEntity.translated_article_id == TranslatedArticleEntity.id,
)
.where(
TranslatedArticleEntity.id == article_id,
SummariseJobEntity.status == "succeeded",
TranslatedArticleEntity.target_body.is_not(None),
TranslatedArticleEntity.source_body.is_not(None),
)
)
entity = result.scalar_one_or_none()
return self._to_model(entity) if entity else None

View file

@@ -29,18 +29,31 @@ class SpacyClient:
return self._cache[language]
def get_parts_of_speech(self, text: str, language: str) -> dict:
"""Use SpaCy to get parts of speech for the given text and language,
broken down by sentences and then by tokens."""
nlp = self._get_nlp(language)
doc = nlp(text)
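# Returns {"language": ..., "sentences": [{"text": ..., "tokens": [...]}]}.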
tokens = [
sentences = [
{
"text": sent.text,
"tokens": [
{
"text": token.text,
"lemma": token.lemma_,
"type": token.ent_type_ if token.ent_type_ else None,
"pos": token.pos_,
"tag": token.tag_,
"dep": token.dep_,
"is_stop": token.is_stop,
"is_punct": token.is_punct,
"is_alpha": token.is_alpha,
}
for token in doc
for token in sent
if not token.is_space
],
}
for sent in doc.sents
]
return {"language": language, "tokens": tokens}
return {"language": language, "sentences": sentences}

View file

@@ -1,64 +1,26 @@
import asyncio
import random
import uuid
from functools import partial
from typing import Any, Callable, Coroutine
import anthropic
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession
from ...languages import SUPPORTED_LANGUAGES, SUPPORTED_LEVELS
from ...auth import require_admin
from ...storage import upload_audio
from ...outbound.postgres.database import get_db, AsyncSessionLocal
from ...outbound.postgres.repositories import summarise_job_repository
from ...outbound.postgres.repositories.translated_article_repository import TranslatedArticleRepository
from ...domain.services.article_service import first_heading
from ...outbound.anthropic.anthropic_client import AnthropicClient
from ...outbound.deepgram.deepgram_client import LocalDeepgramClient
from ...outbound.deepl.deepl_client import DeepLClient
from ...outbound.gemini.gemini_client import GeminiClient
from ...outbound.spacy.spacy_client import SpacyClient
from ...domain.services.summarise_service import SummariseService
from ...config import settings
from ... import worker
router = APIRouter(prefix="/generate", tags=["api"])
_ANTHROPIC_RETRYABLE = (
anthropic.RateLimitError,
anthropic.InternalServerError,
anthropic.APITimeoutError,
anthropic.APIConnectionError,
)
_MAX_RETRIES = 4
_BASE_DELAY = 1.0 # seconds
_MAX_DELAY = 60.0 # seconds
async def _anthropic_with_backoff(
coro_fn: Callable[..., Coroutine[Any, Any, Any]],
*args: Any,
**kwargs: Any,
) -> Any:
for attempt in range(_MAX_RETRIES + 1):
try:
return await coro_fn(*args, **kwargs)
except _ANTHROPIC_RETRYABLE as exc:
if attempt == _MAX_RETRIES:
raise
retry_after: float | None = None
if isinstance(exc, anthropic.RateLimitError):
raw = exc.response.headers.get("retry-after")
if raw is not None:
retry_after = float(raw)
if retry_after is None:
retry_after = min(_BASE_DELAY * (2 ** attempt), _MAX_DELAY)
# ±20 % jitter to spread out concurrent retries
jittered = retry_after * (0.8 + random.random() * 0.4)
await asyncio.sleep(jittered)
class GenerationRequest(BaseModel):
target_language: str
@@ -71,70 +33,29 @@ class GenerationResponse(BaseModel):
job_id: str
async def _run_generation(job_id: uuid.UUID, request: GenerationRequest) -> None:
anthropic_client = AnthropicClient.new(settings.anthropic_api_key)
deepgram_client = LocalDeepgramClient(settings.deepgram_api_key)
deepl_client = DeepLClient(settings.deepl_api_key)
gemini_client = GeminiClient(settings.gemini_api_key)
spacy_client = SpacyClient()
async def _run_generation(
job_id: uuid.UUID,
article_id: uuid.UUID,
request: GenerationRequest,
) -> None:
service = SummariseService(
anthropic_client=AnthropicClient.new(settings.anthropic_api_key),
deepgram_client=LocalDeepgramClient(settings.deepgram_api_key),
deepl_client=DeepLClient(settings.deepl_api_key),
gemini_client=GeminiClient(settings.gemini_api_key),
spacy_client=SpacyClient(),
)
async with AsyncSessionLocal() as db:
job = await summarise_job_repository.get_by_id(db, job_id)
await summarise_job_repository.mark_processing(db, job)
try:
language_name = SUPPORTED_LANGUAGES[request.target_language]
source_material = "\n\n".join(request.input_texts[:3])
generated_text = await _anthropic_with_backoff(
anthropic_client.generate_summary_text,
content_to_summarise=source_material,
complexity_level=request.complexity_level,
from_language=language_name,
to_language=language_name,
length_preference="200-400 words",
)
await summarise_job_repository.save_generated_text(
db, job, generated_text, source_material[:500]
)
translated_text = await deepl_client.translate(generated_text, request.source_language)
await summarise_job_repository.save_translated_text(db, job, translated_text)
target_pos_data = spacy_client.get_parts_of_speech(generated_text, request.target_language)
source_pos_data = spacy_client.get_parts_of_speech(translated_text, request.source_language)
await summarise_job_repository.save_pos_data(db, job, source_pos_data, target_pos_data)
voice = gemini_client.get_voice_by_language(request.target_language)
wav_bytes = await gemini_client.generate_audio(generated_text, voice)
audio_key = f"audio/{job_id}.wav"
upload_audio(audio_key, wav_bytes)
transcript = await deepgram_client.transcribe_bytes(wav_bytes, request.target_language)
await summarise_job_repository.save_audio_transcript(db, job, transcript)
await summarise_job_repository.mark_succeeded(db, job, audio_key)
await TranslatedArticleRepository(db).create(
await service.run(
db=db,
job_id=job_id,
article_id=article_id,
source_language=request.source_language,
source_title=first_heading(translated_text) or "",
source_body=translated_text,
target_language=request.target_language,
target_complexities=[request.complexity_level],
target_title=first_heading(generated_text) or "",
target_body=generated_text,
audio_url=audio_key,
target_body_pos=target_pos_data,
target_body_transcript=transcript,
complexity_level=request.complexity_level,
input_texts=request.input_texts,
)
except Exception as exc:
await summarise_job_repository.mark_failed(db, job, str(exc))
@router.post("", response_model=GenerationResponse, status_code=202)
async def create_generation_job(
@@ -155,14 +76,18 @@ async def create_generation_job(
f"Supported: {sorted(SUPPORTED_LEVELS)}",
)
article = await TranslatedArticleRepository(db).create(
source_language=request.source_language,
target_language=request.target_language,
target_complexities=[request.complexity_level],
)
job = await summarise_job_repository.create(
db,
user_id=uuid.UUID(token_data["sub"]),
source_language=request.source_language,
target_language=request.target_language,
complexity_level=request.complexity_level,
translated_article_id=uuid.UUID(article.id),
)
await worker.enqueue(partial(_run_generation, job.id, request))
await worker.enqueue(partial(_run_generation, job.id, uuid.UUID(article.id), request))
return GenerationResponse(job_id=str(job.id))

View file

@@ -1,5 +1,5 @@
import uuid
from datetime import datetime, timezone
from datetime import datetime
from functools import partial
from fastapi import APIRouter, Depends, HTTPException
@@ -9,6 +9,8 @@ from sqlalchemy.ext.asyncio import AsyncSession
from ...auth import require_admin
from ...outbound.postgres.database import get_db, AsyncSessionLocal
from ...outbound.postgres.repositories import summarise_job_repository
from ...outbound.postgres.repositories.translated_article_repository import TranslatedArticleRepository
from ...outbound.postgres.entities.translated_article_entity import TranslatedArticleEntity
from ...outbound.gemini.gemini_client import GeminiClient
from ...storage import upload_audio
from ...config import settings
@@ -20,21 +22,10 @@ router = APIRouter(prefix="/jobs", dependencies=[Depends(require_admin)])
class JobResponse(BaseModel):
id: uuid.UUID
status: str
source_language: str
target_language: str
complexity_level: str
translated_article_id: uuid.UUID | None = None
created_at: datetime
started_at: datetime | None = None
completed_at: datetime | None = None
# only present on success
generated_text: str | None = None
generated_text_pos: dict | None = None
translated_text: str | None = None
translated_text_pos: dict | None = None
input_summary: str | None = None
audio_url: str | None = None
audio_transcript: dict | None = None
# only present on failure
error_message: str | None = None
model_config = {"from_attributes": True}
@@ -45,6 +36,7 @@ class JobSummary(BaseModel):
created_at: datetime
completed_at: datetime | None = None
error_message: str | None = None
model_config = {"from_attributes": True}
class JobListResponse(BaseModel):
@@ -77,44 +69,37 @@ async def get_job(
if job is None:
raise HTTPException(status_code=404, detail="Job not found")
response = JobResponse(
id=str(job.id),
return JobResponse(
id=job.id,
status=job.status,
source_language=job.source_language,
target_language=job.target_language,
complexity_level=job.complexity_level,
translated_article_id=job.translated_article_id,
created_at=job.created_at,
started_at=job.started_at,
completed_at=job.completed_at,
error_message=job.error_message,
)
if job.status == "succeeded":
response.generated_text = job.generated_text
response.generated_text_pos = job.target_pos_data
response.translated_text = job.translated_text
response.translated_text_pos = job.source_pos_data
response.input_summary = job.input_summary
response.audio_url = job.audio_url
response.audio_transcript = job.audio_transcript
elif job.status == "failed":
response.error_message = job.error_message
return response
async def _run_regenerate_audio(job_id: uuid.UUID) -> None:
gemini_client = GeminiClient(settings.gemini_api_key)
async with AsyncSessionLocal() as db:
job = await summarise_job_repository.get_by_id(db, job_id)
article_repo = TranslatedArticleRepository(db)
article_entity = await db.get(TranslatedArticleEntity, job.translated_article_id)
await summarise_job_repository.mark_processing(db, job)
try:
voice = gemini_client.get_voice_by_language(job.target_language)
wav_bytes = await gemini_client.generate_audio(job.generated_text, voice)
voice = gemini_client.get_voice_by_language(article_entity.target_language)
wav_bytes = await gemini_client.generate_audio(article_entity.target_body, voice)
audio_key = f"audio/{job_id}.wav"
upload_audio(audio_key, wav_bytes)
await summarise_job_repository.mark_succeeded(db, job, audio_key)
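# Reuse the same audio key; the stored transcript is carried over unchanged.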
await article_repo.update_audio(
article_entity.id,
audio_url=audio_key,
target_body_transcript=article_entity.target_body_transcript,
)
await summarise_job_repository.mark_succeeded(db, job)
except Exception as exc:
await summarise_job_repository.mark_failed(db, job, str(exc))
@@ -136,19 +121,21 @@ async def regenerate_audio(
raise HTTPException(status_code=404, detail="Job not found")
if str(job.user_id) != token_data["sub"]:
raise HTTPException(
status_code=403, detail="Not authorized to modify this job")
raise HTTPException(status_code=403, detail="Not authorized to modify this job")
if not job.generated_text:
raise HTTPException(
status_code=400, detail="Job has no generated text to synthesize")
if job.translated_article_id is None:
raise HTTPException(status_code=400, detail="Job has no associated article")
if job.audio_url:
article_entity = await db.get(TranslatedArticleEntity, job.translated_article_id)
if not article_entity or not article_entity.target_body:
raise HTTPException(status_code=400, detail="Job has no generated text to synthesize")
if article_entity.audio_url:
raise HTTPException(status_code=409, detail="Job already has audio")
if job.status == "processing":
raise HTTPException(
status_code=409, detail="Job is already processing")
raise HTTPException(status_code=409, detail="Job is already processing")
await worker.enqueue(partial(_run_regenerate_audio, uid))
return {"job_id": job_id}

View file

@@ -7,7 +7,6 @@ from sqlalchemy.ext.asyncio import AsyncSession
from ...auth import verify_token
from ...config import settings
from ...domain.services.article_service import ArticleService
from ...outbound.postgres.database import get_db
from ...outbound.postgres.repositories.translated_article_repository import TranslatedArticleRepository
@@ -34,12 +33,13 @@ class ArticleDetail(BaseModel):
source_language: str
source_title: str
source_body: str
source_body_pos: dict
target_language: str
target_complexities: list[str]
target_title: str
target_body: str
target_audio_url: str | None
target_body_pos: dict | None
target_body_pos: dict
target_body_transcript: dict | None
@@ -55,8 +55,7 @@ async def list_articles(
db: AsyncSession = Depends(get_db),
_: dict = Depends(verify_token),
) -> ArticleListResponse:
service = ArticleService(TranslatedArticleRepository(db))
articles = await service.get_all_articles(target_language=target_language)
articles = await TranslatedArticleRepository(db).list_complete(target_language=target_language)
return ArticleListResponse(
articles=[
ArticleItem(
@@ -84,7 +83,7 @@ async def get_article(
except ValueError:
raise HTTPException(status_code=400, detail="Invalid article ID")
article = await TranslatedArticleRepository(db).get_by_id(uid)
article = await TranslatedArticleRepository(db).get_complete_by_id(uid)
if article is None:
raise HTTPException(status_code=404, detail="Article not found")
@@ -94,6 +93,7 @@ async def get_article(
source_language=article.source_language,
source_title=article.source_title,
source_body=article.source_body,
source_body_pos=article.source_body_pos,
target_language=article.target_language,
target_complexities=article.target_complexities,
target_title=article.target_title,

View file

@@ -1,13 +1,10 @@
import uuid
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import Response
from sqlalchemy.ext.asyncio import AsyncSession
from botocore.exceptions import ClientError
from ..auth import verify_token
from ..outbound.postgres.database import get_db
from ..outbound.postgres.repositories.summarise_job_repository import PostgresSummariseJobRepository
from ..outbound.postgres.repositories.translated_article_repository import TranslatedArticleRepository
from ..storage import download_audio
router = APIRouter(prefix="/media", tags=["media"])
@@ -18,10 +15,9 @@ async def get_media_file(
filename: str,
db: AsyncSession = Depends(get_db),
) -> Response:
repository = PostgresSummariseJobRepository(db)
job = await repository.get_by_audio_url(filename)
article = await TranslatedArticleRepository(db).get_by_audio_url(filename)
if job is None:
if article is None:
raise HTTPException(status_code=404, detail="File not found")
try: