149 lines
5.4 KiB
Python
149 lines
5.4 KiB
Python
import asyncio
|
|
import random
|
|
import re
|
|
import uuid
|
|
from typing import Any, Callable, Coroutine
|
|
|
|
import anthropic
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from ...outbound.postgres.repositories import summarise_job_repository
|
|
from ...outbound.postgres.repositories.translated_article_repository import TranslatedArticleRepository
|
|
from ...outbound.anthropic.anthropic_client import AnthropicClient
|
|
from ...outbound.deepgram.deepgram_client import LocalDeepgramClient
|
|
from ...outbound.deepl.deepl_client import DeepLClient
|
|
from ...outbound.gemini.gemini_client import GeminiClient
|
|
from ...outbound.spacy.spacy_client import SpacyClient
|
|
from ...storage import upload_audio
|
|
from ...languages import SUPPORTED_LANGUAGES
|
|
|
|
|
|
|
|
# Anthropic SDK exception types that _anthropic_with_backoff catches and
# retries; anything else propagates to the caller on the first failure.
_ANTHROPIC_RETRYABLE = (
    anthropic.RateLimitError,
    anthropic.InternalServerError,
    anthropic.APITimeoutError,
    anthropic.APIConnectionError,
)

# Number of retries after the initial attempt (so up to _MAX_RETRIES + 1 calls).
_MAX_RETRIES = 4
# Starting delay, in seconds, for exponential backoff (doubled per attempt).
_BASE_DELAY = 1.0
# Upper bound, in seconds, on the computed exponential backoff delay.
_MAX_DELAY = 60.0
|
|
|
|
|
|
async def _anthropic_with_backoff(
    coro_fn: Callable[..., Coroutine[Any, Any, Any]],
    *args: Any,
    **kwargs: Any,
) -> Any:
    """Await ``coro_fn(*args, **kwargs)``, retrying retryable Anthropic errors.

    The call is attempted up to ``_MAX_RETRIES + 1`` times. Between attempts
    the coroutine sleeps for either the server-provided ``retry-after`` value
    (rate-limit errors only) or an exponential backoff capped at
    ``_MAX_DELAY``, in both cases scaled by a random factor in [0.8, 1.2).

    Args:
        coro_fn: Callable returning an awaitable; invoked afresh per attempt.
        *args: Positional arguments forwarded to ``coro_fn``.
        **kwargs: Keyword arguments forwarded to ``coro_fn``.

    Returns:
        Whatever ``coro_fn`` returns on the first successful attempt.

    Raises:
        The last retryable exception once all attempts are exhausted; any
        exception not listed in ``_ANTHROPIC_RETRYABLE`` propagates
        immediately.
    """
    for attempt in range(_MAX_RETRIES + 1):
        try:
            return await coro_fn(*args, **kwargs)
        except _ANTHROPIC_RETRYABLE as exc:
            if attempt == _MAX_RETRIES:
                # Out of retries: surface the original error unchanged.
                raise

            delay: float | None = None
            if isinstance(exc, anthropic.RateLimitError):
                raw = exc.response.headers.get("retry-after")
                if raw is not None:
                    # Retry-After may legally be an HTTP-date or otherwise
                    # unparseable; a conversion error here must not replace
                    # the retryable error and abort the retry loop.
                    try:
                        parsed = float(raw)
                    except (TypeError, ValueError):
                        parsed = None
                    # Reject NaN, infinities and negative values as well
                    # (NaN fails both comparisons).
                    if parsed is not None and 0 <= parsed < float("inf"):
                        delay = parsed

            if delay is None:
                delay = min(_BASE_DELAY * (2 ** attempt), _MAX_DELAY)

            # +/-20% jitter so concurrent callers do not retry in lockstep.
            await asyncio.sleep(delay * (0.8 + random.random() * 0.4))
|
|
|
|
|
|
class SummariseService:
    """Runs the summarise pipeline for a single job.

    The pipeline generates a summary with Anthropic, translates it with
    DeepL, tags parts of speech with spaCy, synthesises audio with Gemini,
    transcribes that audio with Deepgram, and persists each stage through
    ``TranslatedArticleRepository`` / ``summarise_job_repository``.
    """

    # First level-1 or level-2 Markdown heading; group 1 is the heading text.
    _HEADING_RE = re.compile(r'^#{1,2}\s+(.+)', re.MULTILINE)

    def __init__(
        self,
        anthropic_client: AnthropicClient,
        deepgram_client: LocalDeepgramClient,
        deepl_client: DeepLClient,
        gemini_client: GeminiClient,
        spacy_client: SpacyClient,
    ) -> None:
        """Store the outbound clients used by :meth:`run`."""
        self.anthropic_client = anthropic_client
        self.deepgram_client = deepgram_client
        self.deepl_client = deepl_client
        self.gemini_client = gemini_client
        self.spacy_client = spacy_client

    def _first_heading(self, md: str) -> str | None:
        """Return the text of the first ``#``/``##`` heading in *md*, or None."""
        m = self._HEADING_RE.search(md)
        return m.group(1).strip() if m else None

    def _split_title_and_body(self, text: str) -> tuple[str, str]:
        """Split *text* into a title (first heading) and body (the rest).

        The heading line itself, including its ``#`` markers, is removed
        from the body. Any text before the heading is kept. When no usable
        heading exists the title is ``"Untitled Article"`` and the body is
        *text* unchanged.

        Returns:
            A ``(title, body)`` tuple.
        """
        match = self._HEADING_RE.search(text)
        title = match.group(1).strip() if match else ""
        if not title:
            return "Untitled Article", text

        # Cut by the match span rather than by len(title): the heading text
        # excludes the leading '#' markers and need not start at offset 0.
        body = (text[:match.start()] + text[match.end():]).lstrip()
        return title, body

    async def run(
        self,
        db: AsyncSession,
        job_id: uuid.UUID,
        article_id: uuid.UUID,
        source_language: str,
        target_language: str,
        complexity_level: str,
        input_texts: list[str],
    ) -> None:
        """Execute the full summarise pipeline for one job.

        The job is marked as processing, each stage's output is persisted as
        soon as it is available, and the job is finally marked succeeded. Any
        ``Exception`` raised by a stage is caught and recorded on the job via
        ``mark_failed`` instead of propagating.

        Args:
            db: Session used for all repository calls.
            job_id: Job to update; also used to build the audio storage key.
            article_id: Article row that receives content, POS and audio.
            source_language: Language passed to DeepL for the translation and
                to spaCy for the translated body.
            target_language: Key into ``SUPPORTED_LANGUAGES``; also used for
                spaCy, voice selection and transcription of generated text.
            complexity_level: Forwarded to the summary generator.
            input_texts: Source texts; only the first three are used.
        """
        article_repo = TranslatedArticleRepository(db)
        job = await summarise_job_repository.get_by_id(db, job_id)
        await summarise_job_repository.mark_processing(db, job)

        try:
            # An unsupported target_language raises KeyError here and is
            # therefore recorded as a job failure below.
            language_name = SUPPORTED_LANGUAGES[target_language]
            source_material = "\n\n".join(input_texts[:3])

            # NOTE(review): from_language and to_language both receive the
            # target language name - confirm this is intended.
            generated_text = await _anthropic_with_backoff(
                self.anthropic_client.generate_summary_text,
                content_to_summarise=source_material,
                complexity_level=complexity_level,
                from_language=language_name,
                to_language=language_name,
                length_preference="200-400 words",
            )

            generated_title, generated_text_without_title = self._split_title_and_body(generated_text)

            # Persist the generated content first, with empty source fields,
            # before the translation step runs.
            await article_repo.update_content(
                article_id,
                target_title=generated_title,
                target_body=generated_text_without_title,
                source_title="",
                source_body="",
            )

            translated_text = await self.deepl_client.translate(generated_text, source_language)

            translated_title, translated_text_without_title = self._split_title_and_body(translated_text)

            await article_repo.update_content(
                article_id,
                target_title=generated_title,
                target_body=generated_text_without_title,
                source_title=translated_title,
                source_body=translated_text_without_title,
            )

            target_pos_data = self.spacy_client.get_parts_of_speech(generated_text_without_title, target_language)
            source_pos_data = self.spacy_client.get_parts_of_speech(translated_text_without_title, source_language)

            await article_repo.update_pos(article_id, target_pos_data, source_pos_data)

            voice = self.gemini_client.get_voice_by_language(target_language)
            wav_bytes = await self.gemini_client.generate_audio(generated_text, voice)
            audio_key = f"audio/{job_id}.wav"
            upload_audio(audio_key, wav_bytes)

            transcript = await self.deepgram_client.transcribe_bytes(wav_bytes, target_language)

            await article_repo.update_audio(article_id, audio_key, transcript)

            await summarise_job_repository.mark_succeeded(db, job)

        except Exception as exc:
            # Deliberate boundary: the failure is stored on the job record
            # rather than re-raised to the caller.
            await summarise_job_repository.mark_failed(db, job, str(exc))
|