language-learning-app/api/app/domain/services/summarise_service.py

149 lines
5.4 KiB
Python

import asyncio
import random
import re
import uuid
from typing import Any, Callable, Coroutine
import anthropic
from sqlalchemy.ext.asyncio import AsyncSession
from ...outbound.postgres.repositories import summarise_job_repository
from ...outbound.postgres.repositories.translated_article_repository import TranslatedArticleRepository
from ...outbound.anthropic.anthropic_client import AnthropicClient
from ...outbound.deepgram.deepgram_client import LocalDeepgramClient
from ...outbound.deepl.deepl_client import DeepLClient
from ...outbound.gemini.gemini_client import GeminiClient
from ...outbound.spacy.spacy_client import SpacyClient
from ...storage import upload_audio
from ...languages import SUPPORTED_LANGUAGES
_ANTHROPIC_RETRYABLE = (
anthropic.RateLimitError,
anthropic.InternalServerError,
anthropic.APITimeoutError,
anthropic.APIConnectionError,
)
_MAX_RETRIES = 4
_BASE_DELAY = 1.0
_MAX_DELAY = 60.0
async def _anthropic_with_backoff(
coro_fn: Callable[..., Coroutine[Any, Any, Any]],
*args: Any,
**kwargs: Any,
) -> Any:
for attempt in range(_MAX_RETRIES + 1):
try:
return await coro_fn(*args, **kwargs)
except _ANTHROPIC_RETRYABLE as exc:
if attempt == _MAX_RETRIES:
raise
retry_after: float | None = None
if isinstance(exc, anthropic.RateLimitError):
raw = exc.response.headers.get("retry-after")
if raw is not None:
retry_after = float(raw)
if retry_after is None:
retry_after = min(_BASE_DELAY * (2 ** attempt), _MAX_DELAY)
jittered = retry_after * (0.8 + random.random() * 0.4)
await asyncio.sleep(jittered)
class SummariseService:
def __init__(
self,
anthropic_client: AnthropicClient,
deepgram_client: LocalDeepgramClient,
deepl_client: DeepLClient,
gemini_client: GeminiClient,
spacy_client: SpacyClient,
) -> None:
self.anthropic_client = anthropic_client
self.deepgram_client = deepgram_client
self.deepl_client = deepl_client
self.gemini_client = gemini_client
self.spacy_client = spacy_client
def _first_heading(self, md: str) -> str | None:
m = re.search(r'^#{1,2}\s+(.+)', md, re.MULTILINE)
return m.group(1).strip() if m else None
def _split_title_and_body(self, text: str) -> tuple[str, str]:
"""Splits the text into a title (first heading) and body (the rest)."""
title = self._first_heading(text) or ""
body = text[len(title):].lstrip() if title else text
if title == "":
title = "Untitled Article"
return title, body
async def run(
self,
db: AsyncSession,
job_id: uuid.UUID,
article_id: uuid.UUID,
source_language: str,
target_language: str,
complexity_level: str,
input_texts: list[str],
) -> None:
article_repo = TranslatedArticleRepository(db)
job = await summarise_job_repository.get_by_id(db, job_id)
await summarise_job_repository.mark_processing(db, job)
try:
language_name = SUPPORTED_LANGUAGES[target_language]
source_material = "\n\n".join(input_texts[:3])
generated_text = await _anthropic_with_backoff(
self.anthropic_client.generate_summary_text,
content_to_summarise=source_material,
complexity_level=complexity_level,
from_language=language_name,
to_language=language_name,
length_preference="200-400 words",
)
generated_title, generated_text_without_title = self._split_title_and_body(generated_text)
await article_repo.update_content(
article_id,
target_title=generated_title,
target_body=generated_text_without_title,
source_title="",
source_body="",
)
translated_text = await self.deepl_client.translate(generated_text, source_language)
translated_title, translated_text_without_title = self._split_title_and_body(translated_text)
await article_repo.update_content(
article_id,
target_title=generated_title,
target_body=generated_text_without_title,
source_title=translated_title,
source_body=translated_text_without_title,
)
target_pos_data = self.spacy_client.get_parts_of_speech(generated_text_without_title, target_language)
source_pos_data = self.spacy_client.get_parts_of_speech(translated_text_without_title, source_language)
await article_repo.update_pos(article_id, target_pos_data, source_pos_data)
voice = self.gemini_client.get_voice_by_language(target_language)
wav_bytes = await self.gemini_client.generate_audio(generated_text, voice)
audio_key = f"audio/{job_id}.wav"
upload_audio(audio_key, wav_bytes)
transcript = await self.deepgram_client.transcribe_bytes(wav_bytes, target_language)
await article_repo.update_audio(article_id, audio_key, transcript)
await summarise_job_repository.mark_succeeded(db, job)
except Exception as exc:
await summarise_job_repository.mark_failed(db, job, str(exc))