API: Add the part_of_speech and transcript generation in the summary job

This commit is contained in:
wilson 2026-03-27 09:45:44 +00:00
parent 3d5551c3d9
commit 746aa5f382
9 changed files with 140 additions and 21 deletions

View file

@ -0,0 +1,29 @@
"""add source_pos_data, target_pos_data, audio_transcript to jobs
Revision ID: 0003
Revises: 0002
Create Date: 2026-03-27
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql
revision: str = "0003"
down_revision: Union[str, None] = "0002"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
    """Add nullable JSONB columns for POS analysis and the audio transcript to `jobs`."""
    new_columns = ("source_pos_data", "target_pos_data", "audio_transcript")
    for column_name in new_columns:
        op.add_column("jobs", sa.Column(column_name, postgresql.JSONB(), nullable=True))
def downgrade() -> None:
    """Drop the POS/transcript columns, in reverse order of their addition."""
    for column_name in ("audio_transcript", "target_pos_data", "source_pos_data"):
        op.drop_column("jobs", column_name)

View file

@ -6,6 +6,7 @@ class Settings(BaseSettings):
jwt_secret: str
anthropic_api_key: str
deepl_api_key: str
deepgram_api_key: str
gemini_api_key: str
storage_endpoint_url: str
storage_access_key: str

View file

@ -15,6 +15,9 @@ class SummariseJob:
translated_text: str
error_message: str
audio_url: str
source_pos_data: dict | None
target_pos_data: dict | None
audio_transcript: dict | None
created_at: datetime
started_at: datetime | None = None
completed_at: datetime | None = None

View file

@ -19,18 +19,17 @@ class AnthropicClient():
) -> str:
return (
f"You are a language learning content creator.\n"
f"The user will provide input, you will generate an engaging realistic summary text in {to_language} "
f"at {complexity_level} proficiency level (CEFR scale).\n\n"
f"You generate markdown summaries of user-provided content.\n"
f"You will provide content in {to_language} at {complexity_level} proficiency level on the CEFR scale.\n"
f"The text you generate will:\n"
f"- Contain ONLY the generated text in {to_language}.\n"
f"- Be appropriate for a {complexity_level} {to_language} speaker.\n"
f"- Never generate inappropriate (hateful, sexual, violent) content. It is preferable to return no text than to generate such content.\n"
f"- Contain ONLY the generated summary text in {to_language}.\n"
f"- Never contain inappropriate (hateful, sexual, violent) content. It is preferable to return no text than to generate such content.\n"
f"- Speak directly to the reader/listener, adopting the tone and style of a semi-formal news reporter or podcaster.\n"
f"- Where appropriate (fluency level, content), use a small number of idiomatic expressions.\n"
f"- Be formatted in markdown with paragraphs and line breaks.\n"
f"- Be {length_preference} long.\n"
f"- Be inspired by the following source material "
f"(but written originally in {from_language}):\n\n"
f"- Where appropriate use at least one additional tense, beyond the default of the content.\n"
f"- Be formatted in markdown. Contain a single level 1 header (#) at the top, followed by paragraphs and line breaks.\n"
f"- Be around {length_preference} long.\n"
f"- Be inspired by the content, but not the tone, of the source material."
)
def _create_prompt_summarise_text(

View file

@ -7,15 +7,18 @@ class LocalDeepgramClient:
def __init__(self, api_key: str):
self.deepgram_client = AsyncDeepgramClient(api_key=api_key)
async def transcribe_local_file(self, local_file_path: str, language_code: str):
async def transcribe_bytes(self, audio_bytes: bytes, language_code: str) -> dict:
    """Transcribe raw audio bytes with Deepgram and return the results payload.

    Uses the nova-3 model with utterances and smart formatting enabled.
    NOTE(review): return type hint says ``dict`` but ``results.json()`` in many
    SDKs yields a JSON string — confirm against the Deepgram SDK version in use.
    """
    transcription_options = {
        "model": "nova-3",
        "language": language_code,
        "utterances": True,
        "smart_format": True,
    }
    response = await self.deepgram_client.listen.v1.media.transcribe_file(
        request=audio_bytes,
        **transcription_options,
    )
    return response.results.json()
async def transcribe_local_file(self, local_file_path: str, language_code: str) -> dict:
    """Read a local audio file and transcribe it via ``transcribe_bytes``.

    Fixes two defects in the previous revision: leftover duplicated lines from
    the pre-refactor implementation, and the open file handle being passed to
    ``transcribe_bytes``, which expects raw ``bytes`` — the file content must
    be read first.
    """
    # Context manager guarantees the handle is closed before the (potentially
    # slow) network call begins.
    with open(local_file_path, "rb") as audio_file:
        audio_bytes = audio_file.read()
    return await self.transcribe_bytes(audio_bytes, language_code)

View file

@ -3,7 +3,7 @@ from datetime import datetime, timezone
from sqlalchemy import String, Text, DateTime, ForeignKey
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.dialects.postgresql import UUID, JSONB
from ..database import Base
@ -25,6 +25,9 @@ class SummariseJobEntity(Base):
translated_text: Mapped[str | None] = mapped_column(Text, nullable=True)
error_message: Mapped[str | None] = mapped_column(Text, nullable=True)
audio_url: Mapped[str | None] = mapped_column(Text, nullable=True)
source_pos_data: Mapped[dict | None] = mapped_column(JSONB, nullable=True)
target_pos_data: Mapped[dict | None] = mapped_column(JSONB, nullable=True)
audio_transcript: Mapped[dict | None] = mapped_column(JSONB, nullable=True)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
default=lambda: datetime.now(timezone.utc),

View file

@ -46,6 +46,9 @@ class PostgresSummariseJobRepository:
translated_text=entity.translated_text,
error_message=entity.error_message,
audio_url=entity.audio_url,
source_pos_data=entity.source_pos_data,
target_pos_data=entity.target_pos_data,
audio_transcript=entity.audio_transcript,
created_at=entity.created_at,
started_at=entity.started_at,
completed_at=entity.completed_at,
@ -112,6 +115,26 @@ async def save_translated_text(
await update(db, job)
async def save_pos_data(
    db: AsyncSession,
    job: SummariseJobEntity,
    source_pos_data: dict,
    target_pos_data: dict,
) -> None:
    """Persist part-of-speech analysis for both source and target text on the job."""
    job.source_pos_data, job.target_pos_data = source_pos_data, target_pos_data
    await update(db, job)
async def save_audio_transcript(
    db: AsyncSession,
    job: SummariseJobEntity,
    audio_transcript: dict,
) -> None:
    """Attach the Deepgram transcript payload to the job and persist it."""
    setattr(job, "audio_transcript", audio_transcript)
    await update(db, job)
async def mark_succeeded(db: AsyncSession, job: SummariseJobEntity, audio_url: str) -> None:
job.status = "succeeded"
job.audio_url = audio_url

View file

@ -1,6 +1,10 @@
import asyncio
import random
import uuid
from functools import partial
from typing import Any, Callable, Coroutine
import anthropic
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession
@ -11,13 +15,48 @@ from ...storage import upload_audio
from ...outbound.postgres.database import get_db, AsyncSessionLocal
from ...outbound.postgres.repositories import summarise_job_repository
from ...outbound.anthropic.anthropic_client import AnthropicClient
from ...outbound.deepgram.deepgram_client import LocalDeepgramClient
from ...outbound.deepl.deepl_client import DeepLClient
from ...outbound.gemini.gemini_client import GeminiClient
from ...outbound.spacy.spacy_client import SpacyClient
from ...config import settings
from ... import worker
router = APIRouter(prefix="/generate", tags=["api"])
_ANTHROPIC_RETRYABLE = (
anthropic.RateLimitError,
anthropic.InternalServerError,
anthropic.APITimeoutError,
anthropic.APIConnectionError,
)
_MAX_RETRIES = 4
_BASE_DELAY = 1.0 # seconds
_MAX_DELAY = 60.0 # seconds
async def _anthropic_with_backoff(
    coro_fn: Callable[..., Coroutine[Any, Any, Any]],
    *args: Any,
    **kwargs: Any,
) -> Any:
    """Await ``coro_fn(*args, **kwargs)``, retrying transient Anthropic failures.

    Retries up to ``_MAX_RETRIES`` times on the error types listed in
    ``_ANTHROPIC_RETRYABLE``, honoring a numeric ``retry-after`` header on
    rate-limit responses and otherwise using capped exponential backoff with
    ±20% jitter. The last failure is re-raised unchanged.

    Fixes: ``Retry-After`` may legally be an HTTP-date rather than seconds
    (RFC 9110), so ``float(raw)`` could raise ``ValueError``; and a
    server-supplied delay was previously used uncapped.
    """
    for attempt in range(_MAX_RETRIES + 1):
        try:
            return await coro_fn(*args, **kwargs)
        except _ANTHROPIC_RETRYABLE as exc:
            if attempt == _MAX_RETRIES:
                raise
            retry_after: float | None = None
            if isinstance(exc, anthropic.RateLimitError):
                raw = exc.response.headers.get("retry-after")
                if raw is not None:
                    try:
                        retry_after = float(raw)
                    except ValueError:
                        # Retry-After was an HTTP-date (or garbage); fall back
                        # to exponential backoff below.
                        retry_after = None
            if retry_after is None or retry_after < 0:
                retry_after = _BASE_DELAY * (2 ** attempt)
            # Cap even server-supplied delays so one response can't stall us.
            retry_after = min(retry_after, _MAX_DELAY)
            # ±20 % jitter to spread out concurrent retries
            jittered = retry_after * (0.8 + random.random() * 0.4)
            await asyncio.sleep(jittered)
    raise AssertionError("unreachable: loop always returns or raises")
class GenerationRequest(BaseModel):
target_language: str
@ -32,8 +71,10 @@ class GenerationResponse(BaseModel):
async def _run_generation(job_id: uuid.UUID, request: GenerationRequest) -> None:
anthropic_client = AnthropicClient.new(settings.anthropic_api_key)
deepgram_client = LocalDeepgramClient(settings.deepgram_api_key)
deepl_client = DeepLClient(settings.deepl_api_key)
gemini_client = GeminiClient(settings.gemini_api_key)
spacy_client = SpacyClient()
async with AsyncSessionLocal() as db:
job = await summarise_job_repository.get_by_id(db, job_id)
@ -44,7 +85,8 @@ async def _run_generation(job_id: uuid.UUID, request: GenerationRequest) -> None
source_material = "\n\n".join(request.input_texts[:3])
generated_text = await anthropic_client.generate_summary_text(
generated_text = await _anthropic_with_backoff(
anthropic_client.generate_summary_text,
content_to_summarise=source_material,
complexity_level=request.complexity_level,
from_language=language_name,
@ -60,11 +102,19 @@ async def _run_generation(job_id: uuid.UUID, request: GenerationRequest) -> None
await summarise_job_repository.save_translated_text(db, job, translated_text)
target_pos_data = spacy_client.get_parts_of_speech(generated_text, request.target_language)
source_pos_data = spacy_client.get_parts_of_speech(translated_text, request.source_language)
await summarise_job_repository.save_pos_data(db, job, source_pos_data, target_pos_data)
voice = gemini_client.get_voice_by_language(request.target_language)
wav_bytes = await gemini_client.generate_audio(generated_text, voice)
audio_key = f"audio/{job_id}.wav"
upload_audio(audio_key, wav_bytes)
transcript = await deepgram_client.transcribe_bytes(wav_bytes, request.target_language)
await summarise_job_repository.save_audio_transcript(db, job, transcript)
await summarise_job_repository.mark_succeeded(db, job, audio_key)
except Exception as exc:

View file

@ -28,9 +28,12 @@ class JobResponse(BaseModel):
completed_at: datetime | None = None
# only present on success
generated_text: str | None = None
generated_text_pos: dict | None = None
translated_text: str | None = None
translated_text_pos: dict | None = None
input_summary: str | None = None
audio_url: str | None = None
audio_transcript: dict | None = None
# only present on failure
error_message: str | None = None
model_config = {"from_attributes": True}
@ -40,6 +43,8 @@ class JobSummary(BaseModel):
id: uuid.UUID
status: str
created_at: datetime
completed_at: datetime | None = None
error_message: str | None = None
class JobListResponse(BaseModel):
@ -85,9 +90,12 @@ async def get_job(
if job.status == "succeeded":
response.generated_text = job.generated_text
response.generated_text_pos = job.target_pos_data
response.translated_text = job.translated_text
response.translated_text_pos = job.source_pos_data
response.input_summary = job.input_summary
response.audio_url = job.audio_url
response.audio_transcript = job.audio_transcript
elif job.status == "failed":
response.error_message = job.error_message