diff --git a/api/alembic/versions/20260327_0003_add_pos_and_transcript_to_jobs.py b/api/alembic/versions/20260327_0003_add_pos_and_transcript_to_jobs.py new file mode 100644 index 0000000..dbb644c --- /dev/null +++ b/api/alembic/versions/20260327_0003_add_pos_and_transcript_to_jobs.py @@ -0,0 +1,29 @@ +"""add source_pos_data, target_pos_data, audio_transcript to jobs + +Revision ID: 0003 +Revises: 0002 +Create Date: 2026-03-27 + +""" +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op +from sqlalchemy.dialects import postgresql + +revision: str = "0003" +down_revision: Union[str, None] = "0002" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.add_column("jobs", sa.Column("source_pos_data", postgresql.JSONB(), nullable=True)) + op.add_column("jobs", sa.Column("target_pos_data", postgresql.JSONB(), nullable=True)) + op.add_column("jobs", sa.Column("audio_transcript", postgresql.JSONB(), nullable=True)) + + +def downgrade() -> None: + op.drop_column("jobs", "audio_transcript") + op.drop_column("jobs", "target_pos_data") + op.drop_column("jobs", "source_pos_data") diff --git a/api/app/config.py b/api/app/config.py index de2b88a..56d091d 100644 --- a/api/app/config.py +++ b/api/app/config.py @@ -6,6 +6,7 @@ class Settings(BaseSettings): jwt_secret: str anthropic_api_key: str deepl_api_key: str + deepgram_api_key: str gemini_api_key: str storage_endpoint_url: str storage_access_key: str diff --git a/api/app/domain/models/summarise_job.py b/api/app/domain/models/summarise_job.py index c7a6db2..2fd8868 100644 --- a/api/app/domain/models/summarise_job.py +++ b/api/app/domain/models/summarise_job.py @@ -15,6 +15,9 @@ class SummariseJob: translated_text: str error_message: str audio_url: str + source_pos_data: dict | None + target_pos_data: dict | None + audio_transcript: dict | None created_at: datetime started_at: datetime | None = None completed_at: datetime | None = None diff --git a/api/app/outbound/anthropic/anthropic_client.py b/api/app/outbound/anthropic/anthropic_client.py index 0fd6e9e..fe4958b 100644 --- a/api/app/outbound/anthropic/anthropic_client.py +++ b/api/app/outbound/anthropic/anthropic_client.py @@ -19,18 +19,17 @@ class AnthropicClient(): ) -> str: return ( f"You are a language learning content creator.\n" - f"The user will provide input, you will generate an engaging realistic summary text in {to_language} " - f"at {complexity_level} proficiency level (CEFR scale).\n\n" + f"You generate markdown summaries of user-provided content.\n" + f"You will provide content in {to_language} at {complexity_level} proficiency level on the CEFR scale.\n" f"The text you generate will:\n" - f"- Contain ONLY the generated text in {to_language}.\n" - f"- Be appropriate for a {complexity_level} {to_language} speaker.\n" - f"- Never generate inappropriate (hateful, sexual, violent) content. It is preferable to return no text than to generate such content.\n" + f"- Contain ONLY the generated summary text in {to_language}.\n" + f"- Never contain inappropriate (hateful, sexual, violent) content. It is preferable to return no text than to generate such content.\n" f"- Speak directly to the reader/listener, adopting the tone and style of a semi-formal news reporter or podcaster.\n" f"- Where appropriate (fluency level, content), use a small number of idiomatic expressions.\n" - f"- Be formatted in markdown with paragraphs and line breaks.\n" - f"- Be {length_preference} long.\n" - f"- Be inspired by the following source material " - f"(but written originally in {from_language}):\n\n" + f"- Where appropriate use at least one additional tense, beyond the default of the content.\n" + f"- Be formatted in markdown. Contain a single level 1 header (#) at the top, followed by paragraphs and line breaks.\n" + f"- Be around {length_preference} long.\n" + f"- Be inspired by the content, but not the tone, of the source material." ) def _create_prompt_summarise_text( diff --git a/api/app/outbound/deepgram/deepgram_client.py b/api/app/outbound/deepgram/deepgram_client.py index 5ffc266..cb81a14 100644 --- a/api/app/outbound/deepgram/deepgram_client.py +++ b/api/app/outbound/deepgram/deepgram_client.py @@ -7,15 +7,18 @@ class LocalDeepgramClient: def __init__(self, api_key: str): self.deepgram_client = AsyncDeepgramClient(api_key=api_key) - async def transcribe_local_file(self, local_file_path: str, language_code: str): + async def transcribe_bytes(self, audio_bytes: bytes, language_code: str) -> dict: + response = await self.deepgram_client.listen.v1.media.transcribe_file( + request=audio_bytes, + model="nova-3", + language=language_code, + utterances=True, + smart_format=True, + ) + return response.results.json() + + async def transcribe_local_file(self, local_file_path: str, language_code: str) -> dict: with open(local_file_path, "rb") as audio_file: - response = await self.deepgram_client.listen.v1.media.transcribe_file( - request=audio_file.read(), - model="nova-3", - language=language_code, - utterances=True, - smart_format=True, - ) + return await self.transcribe_bytes(audio_file, language_code) - return response.results diff --git a/api/app/outbound/postgres/entities/summarise_job_entity.py b/api/app/outbound/postgres/entities/summarise_job_entity.py index f3d5411..ee43640 100644 --- a/api/app/outbound/postgres/entities/summarise_job_entity.py +++ b/api/app/outbound/postgres/entities/summarise_job_entity.py @@ -3,7 +3,7 @@ from datetime import datetime, timezone from sqlalchemy import String, Text, DateTime, ForeignKey from sqlalchemy.orm import Mapped, mapped_column -from sqlalchemy.dialects.postgresql import UUID +from sqlalchemy.dialects.postgresql import UUID, JSONB from ..database import Base @@ -25,6 +25,9 @@ class SummariseJobEntity(Base): translated_text: Mapped[str | None] = mapped_column(Text, nullable=True) error_message: Mapped[str | None] = mapped_column(Text, nullable=True) audio_url: Mapped[str | None] = mapped_column(Text, nullable=True) + source_pos_data: Mapped[dict | None] = mapped_column(JSONB, nullable=True) + target_pos_data: Mapped[dict | None] = mapped_column(JSONB, nullable=True) + audio_transcript: Mapped[dict | None] = mapped_column(JSONB, nullable=True) created_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), default=lambda: datetime.now(timezone.utc), diff --git a/api/app/outbound/postgres/repositories/summarise_job_repository.py b/api/app/outbound/postgres/repositories/summarise_job_repository.py index a0c67d5..bda311a 100644 --- a/api/app/outbound/postgres/repositories/summarise_job_repository.py +++ b/api/app/outbound/postgres/repositories/summarise_job_repository.py @@ -46,6 +46,9 @@ class PostgresSummariseJobRepository: translated_text=entity.translated_text, error_message=entity.error_message, audio_url=entity.audio_url, + source_pos_data=entity.source_pos_data, + target_pos_data=entity.target_pos_data, + audio_transcript=entity.audio_transcript, created_at=entity.created_at, started_at=entity.started_at, completed_at=entity.completed_at, @@ -112,6 +115,26 @@ async def save_translated_text( await update(db, job) +async def save_pos_data( + db: AsyncSession, + job: SummariseJobEntity, + source_pos_data: dict, + target_pos_data: dict, +) -> None: + job.source_pos_data = source_pos_data + job.target_pos_data = target_pos_data + await update(db, job) + + +async def save_audio_transcript( + db: AsyncSession, + job: SummariseJobEntity, + audio_transcript: dict, +) -> None: + job.audio_transcript = audio_transcript + await update(db, job) + + async def mark_succeeded(db: AsyncSession, job: SummariseJobEntity, audio_url: str) -> None: job.status = "succeeded" job.audio_url = audio_url diff --git a/api/app/routers/api/generation.py b/api/app/routers/api/generation.py index 93beb06..41f4705 100644 --- a/api/app/routers/api/generation.py +++ b/api/app/routers/api/generation.py @@ -1,6 +1,10 @@ +import asyncio +import random import uuid from functools import partial +from typing import Any, Callable, Coroutine +import anthropic from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel from sqlalchemy.ext.asyncio import AsyncSession @@ -11,13 +15,48 @@ from ...storage import upload_audio from ...outbound.postgres.database import get_db, AsyncSessionLocal from ...outbound.postgres.repositories import summarise_job_repository from ...outbound.anthropic.anthropic_client import AnthropicClient +from ...outbound.deepgram.deepgram_client import LocalDeepgramClient from ...outbound.deepl.deepl_client import DeepLClient -from ...outbound.gemini.gemini_client import GeminiClient +from ...outbound.gemini.gemini_client import GeminiClient +from ...outbound.spacy.spacy_client import SpacyClient from ...config import settings from ... import worker router = APIRouter(prefix="/generate", tags=["api"]) +_ANTHROPIC_RETRYABLE = ( + anthropic.RateLimitError, + anthropic.InternalServerError, + anthropic.APITimeoutError, + anthropic.APIConnectionError, +) +_MAX_RETRIES = 4 +_BASE_DELAY = 1.0 # seconds +_MAX_DELAY = 60.0 # seconds + + +async def _anthropic_with_backoff( + coro_fn: Callable[..., Coroutine[Any, Any, Any]], + *args: Any, + **kwargs: Any, +) -> Any: + for attempt in range(_MAX_RETRIES + 1): + try: + return await coro_fn(*args, **kwargs) + except _ANTHROPIC_RETRYABLE as exc: + if attempt == _MAX_RETRIES: + raise + retry_after: float | None = None + if isinstance(exc, anthropic.RateLimitError): + raw = exc.response.headers.get("retry-after") + if raw is not None: + retry_after = float(raw) + if retry_after is None: + retry_after = min(_BASE_DELAY * (2 ** attempt), _MAX_DELAY) + # ±20 % jitter to spread out concurrent retries + jittered = retry_after * (0.8 + random.random() * 0.4) + await asyncio.sleep(jittered) + class GenerationRequest(BaseModel): target_language: str @@ -32,8 +71,10 @@ class GenerationResponse(BaseModel): async def _run_generation(job_id: uuid.UUID, request: GenerationRequest) -> None: anthropic_client = AnthropicClient.new(settings.anthropic_api_key) + deepgram_client = LocalDeepgramClient(settings.deepgram_api_key) deepl_client = DeepLClient(settings.deepl_api_key) gemini_client = GeminiClient(settings.gemini_api_key) + spacy_client = SpacyClient() async with AsyncSessionLocal() as db: job = await summarise_job_repository.get_by_id(db, job_id) @@ -44,7 +85,8 @@ async def _run_generation(job_id: uuid.UUID, request: GenerationRequest) -> None source_material = "\n\n".join(request.input_texts[:3]) - generated_text = await anthropic_client.generate_summary_text( + generated_text = await _anthropic_with_backoff( + anthropic_client.generate_summary_text, content_to_summarise=source_material, complexity_level=request.complexity_level, from_language=language_name, @@ -60,11 +102,19 @@ async def _run_generation(job_id: uuid.UUID, request: GenerationRequest) -> None await summarise_job_repository.save_translated_text(db, job, translated_text) + target_pos_data = spacy_client.get_parts_of_speech(generated_text, request.target_language) + source_pos_data = spacy_client.get_parts_of_speech(translated_text, request.source_language) + + await summarise_job_repository.save_pos_data(db, job, source_pos_data, target_pos_data) + voice = gemini_client.get_voice_by_language(request.target_language) wav_bytes = await gemini_client.generate_audio(generated_text, voice) audio_key = f"audio/{job_id}.wav" upload_audio(audio_key, wav_bytes) + transcript = await deepgram_client.transcribe_bytes(wav_bytes, request.target_language) + await summarise_job_repository.save_audio_transcript(db, job, transcript) + await summarise_job_repository.mark_succeeded(db, job, audio_key) except Exception as exc: diff --git a/api/app/routers/api/jobs.py b/api/app/routers/api/jobs.py index 56b9934..8cd1798 100644 --- a/api/app/routers/api/jobs.py +++ b/api/app/routers/api/jobs.py @@ -28,9 +28,12 @@ class JobResponse(BaseModel): completed_at: datetime | None = None # only present on success generated_text: str | None = None + generated_text_pos: dict | None = None translated_text: str | None = None + translated_text_pos: dict | None = None input_summary: str | None = None audio_url: str | None = None + audio_transcript: dict | None = None # only present on failure error_message: str | None = None model_config = {"from_attributes": True} @@ -40,6 +43,8 @@ class JobSummary(BaseModel): id: uuid.UUID status: str created_at: datetime + completed_at: datetime | None = None + error_message: str | None = None class JobListResponse(BaseModel): @@ -85,9 +90,12 @@ async def get_job( if job.status == "succeeded": response.generated_text = job.generated_text + response.generated_text_pos = job.target_pos_data response.translated_text = job.translated_text + response.translated_text_pos = job.source_pos_data response.input_summary = job.input_summary response.audio_url = job.audio_url + response.audio_transcript = job.audio_transcript elif job.status == "failed": response.error_message = job.error_message