Start architecting things into domain driven design / hexagonal architecture

This commit is contained in:
wilson 2026-03-25 21:10:10 +00:00
parent 6f5012cb6c
commit 271519204c
30 changed files with 439 additions and 276 deletions

View file

@ -6,8 +6,10 @@ from sqlalchemy import pool
from sqlalchemy.ext.asyncio import async_engine_from_config from sqlalchemy.ext.asyncio import async_engine_from_config
from app.config import settings from app.config import settings
from app.database import Base from app.outbound.postgres.database import Base
import app.models # noqa: F401 — register all models with Base.metadata
import app.outbound.postgres.entities.summarise_job_entity
import app.outbound.postgres.entities.user_entity
config = context.config config = context.config
config.set_main_option("sqlalchemy.url", settings.database_url) config.set_main_option("sqlalchemy.url", settings.database_url)

View file

View file

@ -0,0 +1,25 @@
from dataclasses import dataclass
from datetime import datetime, timezone
@dataclass
class SummariseJob:
id: str
user_id: str
status: str
source_language: str
target_language: str
complexity_level: str
input_summary: str
generated_text: str
translated_text: str
error_message: str
audio_url: str
created_at: datetime
started_at: datetime | None = None
completed_at: datetime | None = None
updated_at: datetime = None
def __post_init__(self):
if self.updated_at is None:
self.updated_at = datetime.now(timezone.utc)

View file

View file

@ -0,0 +1 @@
# TODO: Implement this service, taking the code currently placed in app/routes/api/generation.py

View file

View file

View file

@ -0,0 +1,74 @@
import asyncio
import anthropic
class AnthropicClient():
def __init__(self, api_key: str):
self._client = anthropic.Anthropic(api_key=api_key)
@classmethod
def new(cls, api_key: str) -> "AnthropicClient":
return cls(api_key)
def _create_summarise_text_system_prompt(
self,
complexity_level: str,
from_language: str,
to_language: str,
length_preference="200-400 words",
) -> str:
return (
f"You are a language learning content creator.\n"
f"The user will provide input, you will generate an engaging realistic summary text in {to_language} "
f"at {complexity_level} proficiency level (CEFR scale).\n\n"
f"The text you generate will:\n"
f"- Contain ONLY the generated text in {to_language}.\n"
f"- Be appropriate for a {complexity_level} {to_language} speaker.\n"
f"- Never generate inappropriate (hateful, sexual, violent) content. It is preferable to return no text than to generate such content.\n"
f"- Speak directly to the reader/listener, adopting the tone and style of a semi-formal news reporter or podcaster.\n"
f"- Where appropriate (fluency level, content), use a small number of idiomatic expressions.\n"
f"- Be formatted in markdown with paragraphs and line breaks.\n"
f"- Be {length_preference} long.\n"
f"- Be inspired by the following source material "
f"(but written originally in {from_language}):\n\n"
)
def _create_prompt_summarise_text(
self,
source_material: str,
) -> str:
return (
f"Source material follows: \n\n"
f"{source_material}"
)
async def generate_summary_text(
self,
content_to_summarise: str,
complexity_level: str,
from_language: str,
to_language: str,
length_preference="200-400 words") -> str:
"""Generate text using Anthropic."""
def _call() -> str:
message = self._client.messages.create(
model="claude-sonnet-4-6",
max_tokens=1024,
system=self._create_summarise_text_system_prompt(
complexity_level=complexity_level,
from_language=from_language,
to_language=to_language,
length_preference=length_preference,
),
messages=[
{
"role": "user",
"content": self._create_prompt_summarise_text(
content_to_summarise
)
}
],
)
return message.content[0].text
return await asyncio.to_thread(_call)

View file

View file

@ -0,0 +1,38 @@
import httpx
DEEPL_API_URL = "https://api-free.deepl.com/v2/translate"
DEEPL_LANGUAGE_CODES = {
"en": "EN-GB",
"fr": "FR",
"es": "ES",
"it": "IT",
"de": "DE",
}
class DeepLClient():
def __init__(self, api_key: str):
self._api_key = api_key
def can_translate_to(self, the_language: str) -> bool:
return the_language in DEEPL_LANGUAGE_CODES
async def translate(self, text: str, to_language: str, context: str | None = None) -> str:
target_lang_code = DEEPL_LANGUAGE_CODES[to_language]
async with httpx.AsyncClient() as client:
response = await client.post(
DEEPL_API_URL,
headers={
"Authorization": f"DeepL-Auth-Key {self._api_key}",
"Content-Type": "application/json",
},
json={
"text": [text],
"target_lang": target_lang_code,
"context": context or None,
},
)
response.raise_for_status()
data = response.json()
return data["translations"][0]["text"]

View file

View file

@ -0,0 +1,52 @@
import asyncio
from google import genai
from google.genai import types as genai_types
from ...storage import pcm_to_wav
VOICE_BY_LANGUAGE: dict[str, str] = {
"fr": "Kore",
"es": "Charon",
"it": "Aoede",
"de": "Fenrir",
"en": "Kore",
}
class GeminiClient():
"""Communicate with Google's Gemini LLM"""
def __init__(self, api_key: str):
self._api_key = api_key
def get_voice_by_language(self, target_language: str) -> str:
possible_voice = VOICE_BY_LANGUAGE.get(target_language)
if not possible_voice:
raise ValueError(f"No voice found for language: {target_language}")
return possible_voice
async def generate_audio(self, text: str, voice: str) -> bytes:
"""Generate TTS audio and return WAV bytes."""
def _call() -> bytes:
client = genai.Client(api_key=self._api_key)
response = client.models.generate_content(
model="gemini-2.5-flash-preview-tts",
contents=text,
config=genai_types.GenerateContentConfig(
response_modalities=["AUDIO"],
speech_config=genai_types.SpeechConfig(
voice_config=genai_types.VoiceConfig(
prebuilt_voice_config=genai_types.PrebuiltVoiceConfig(
voice_name=voice,
)
)
),
),
)
pcm_data = response.candidates[0].content.parts[0].inline_data.data
return pcm_to_wav(pcm_data)
return await asyncio.to_thread(_call)

View file

View file

@ -1,7 +1,7 @@
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase from sqlalchemy.orm import DeclarativeBase
from .config import settings from ...config import settings
engine = create_async_engine(settings.database_url) engine = create_async_engine(settings.database_url)
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False) AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)

View file

@ -1,31 +1,13 @@
import uuid import uuid
from datetime import datetime, timezone from datetime import datetime, timezone
from sqlalchemy import String, Text, DateTime, Boolean, ForeignKey from sqlalchemy import String, Text, DateTime, ForeignKey
from sqlalchemy.orm import Mapped, mapped_column from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy.dialects.postgresql import UUID from sqlalchemy.dialects.postgresql import UUID
from .database import Base from ..database import Base
class SummariseJobEntity(Base):
class User(Base):
__tablename__ = "users"
id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True), primary_key=True, default=uuid.uuid4
)
email: Mapped[str] = mapped_column(String(255), nullable=False, unique=True, index=True)
hashed_password: Mapped[str] = mapped_column(String(255), nullable=False)
is_active: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True)
# TODO(email-verification): set to False and require verification once transactional email is implemented
is_email_verified: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
default=lambda: datetime.now(timezone.utc),
)
class Job(Base):
__tablename__ = "jobs" __tablename__ = "jobs"
id: Mapped[uuid.UUID] = mapped_column( id: Mapped[uuid.UUID] = mapped_column(

View file

@ -0,0 +1,27 @@
import uuid
from datetime import datetime, timezone
from sqlalchemy import String, Text, DateTime, Boolean, ForeignKey
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy.dialects.postgresql import UUID
from ..database import Base
class User(Base):
__tablename__ = "users"
id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True), primary_key=True, default=uuid.uuid4
)
email: Mapped[str] = mapped_column(String(255), nullable=False, unique=True, index=True)
hashed_password: Mapped[str] = mapped_column(String(255), nullable=False)
is_active: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True)
# TODO(email-verification): set to False and require verification once transactional email is implemented
is_email_verified: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
default=lambda: datetime.now(timezone.utc),
)

View file

@ -0,0 +1,94 @@
import uuid
from datetime import datetime, timezone
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from ..entities.summarise_job_entity import SummariseJobEntity
async def update(db: AsyncSession, job: SummariseJobEntity) -> None:
await db.commit()
async def create(
db: AsyncSession,
user_id: uuid.UUID,
source_language: str,
target_language: str,
complexity_level: str,
) -> SummariseJobEntity:
job = SummariseJobEntity(
user_id=user_id,
source_language=source_language,
target_language=target_language,
complexity_level=complexity_level,
)
db.add(job)
await db.commit()
await db.refresh(job)
return job
async def get_by_id(db: AsyncSession, job_id: uuid.UUID) -> SummariseJobEntity | None:
return await db.get(SummariseJobEntity, job_id)
async def list_all(db: AsyncSession) -> list[SummariseJobEntity]:
result = await db.execute(
select(SummariseJobEntity).order_by(SummariseJobEntity.created_at.desc())
)
return list(result.scalars().all())
async def get_by_audio_url_and_user(
db: AsyncSession, audio_url: str, user_id: uuid.UUID
) -> SummariseJobEntity | None:
result = await db.execute(
select(SummariseJobEntity).where(
SummariseJobEntity.audio_url == audio_url,
SummariseJobEntity.user_id == user_id,
)
)
return result.scalar_one_or_none()
async def mark_processing(db: AsyncSession, job: SummariseJobEntity) -> None:
job.status = "processing"
job.started_at = datetime.now(timezone.utc)
job.error_message = None
await update(db, job)
async def save_generated_text(
db: AsyncSession,
job: SummariseJobEntity,
generated_text: str,
input_summary: str,
) -> None:
job.generated_text = generated_text
job.input_summary = input_summary
await update(db, job)
async def save_translated_text(
db: AsyncSession,
job: SummariseJobEntity,
translated_text: str,
) -> None:
job.translated_text = translated_text
await update(db, job)
async def mark_succeeded(db: AsyncSession, job: SummariseJobEntity, audio_url: str) -> None:
job.status = "succeeded"
job.audio_url = audio_url
job.completed_at = datetime.now(timezone.utc)
await update(db, job)
async def mark_failed(db: AsyncSession, job: SummariseJobEntity, error: str) -> None:
job.status = "failed"
job.error_message = error
job.completed_at = datetime.now(timezone.utc)
await update(db, job)

View file

@ -0,0 +1,17 @@
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from ..entities.user_entity import User
async def create(db: AsyncSession, email: str, hashed_password: str) -> User:
user = User(email=email, hashed_password=hashed_password)
db.add(user)
await db.commit()
await db.refresh(user)
return user
async def get_by_email(db: AsyncSession, email: str) -> User | None:
result = await db.execute(select(User).where(User.email == email))
return result.scalar_one_or_none()

View file

@ -7,12 +7,13 @@ from sqlalchemy.ext.asyncio import AsyncSession
from ...languages import SUPPORTED_LANGUAGES, SUPPORTED_LEVELS from ...languages import SUPPORTED_LANGUAGES, SUPPORTED_LEVELS
from ...auth import verify_token from ...auth import verify_token
from ...database import get_db, AsyncSessionLocal
from ...models import Job
from ...storage import upload_audio from ...storage import upload_audio
from ...services import llm, tts, job_repo from ...outbound.postgres.database import get_db, AsyncSessionLocal
from ...services.tts import VOICE_BY_LANGUAGE from ...outbound.postgres.repositories import summarise_job_repository
from ...services.translate import translate from ...outbound.anthropic.anthropic_client import AnthropicClient
from ...outbound.deepl.deepl_client import DeepLClient
from ...outbound.gemini.gemini_client import GeminiClient
from ...config import settings
from ... import worker from ... import worker
router = APIRouter(prefix="/generate", tags=["api"]) router = APIRouter(prefix="/generate", tags=["api"])
@ -31,16 +32,20 @@ class GenerationResponse(BaseModel):
async def _run_generation(job_id: uuid.UUID, request: GenerationRequest) -> None: async def _run_generation(job_id: uuid.UUID, request: GenerationRequest) -> None:
anthropic_client = AnthropicClient.new(settings.anthropic_api_key)
deepl_client = DeepLClient(settings.deepl_api_key)
gemini_client = GeminiClient(settings.gemini_api_key)
async with AsyncSessionLocal() as db: async with AsyncSessionLocal() as db:
job = await db.get(Job, job_id) job = await summarise_job_repository.get_by_id(db, job_id)
await job_repo.mark_processing(db, job) await summarise_job_repository.mark_processing(db, job)
try: try:
language_name = SUPPORTED_LANGUAGES[request.target_language] language_name = SUPPORTED_LANGUAGES[request.target_language]
source_material = "\n\n".join(request.input_texts[:3]) source_material = "\n\n".join(request.input_texts[:3])
generated_text = await llm.generate_summary_text( generated_text = await anthropic_client.generate_summary_text(
content_to_summarise=source_material, content_to_summarise=source_material,
complexity_level=request.complexity_level, complexity_level=request.complexity_level,
from_language=language_name, from_language=language_name,
@ -48,26 +53,23 @@ async def _run_generation(job_id: uuid.UUID, request: GenerationRequest) -> None
length_preference="200-400 words", length_preference="200-400 words",
) )
await job_repo.save_generated_text( await summarise_job_repository.save_generated_text(
db, job, generated_text, source_material[:500] db, job, generated_text, source_material[:500]
) )
translated_text = await translate(generated_text, request.source_language) translated_text = await deepl_client.translate(generated_text, request.source_language)
# Save LLM results before attempting TTS so they're preserved on failure await summarise_job_repository.save_translated_text(db, job, translated_text)
await job_repo.save_generated_text(
db, job, generated_text
)
voice = VOICE_BY_LANGUAGE.get(request.target_language, "Kore") voice = gemini_client.get_voice_by_language(request.target_language)
wav_bytes = await tts.generate_audio(generated_text, voice) wav_bytes = await gemini_client.generate_audio(generated_text, voice)
audio_key = f"audio/{job_id}.wav" audio_key = f"audio/{job_id}.wav"
upload_audio(audio_key, wav_bytes) upload_audio(audio_key, wav_bytes)
await job_repo.mark_succeeded(db, job, audio_key) await summarise_job_repository.mark_succeeded(db, job, audio_key)
except Exception as exc: except Exception as exc:
await job_repo.mark_failed(db, job, str(exc)) await summarise_job_repository.mark_failed(db, job, str(exc))
@router.post("", response_model=GenerationResponse, status_code=202) @router.post("", response_model=GenerationResponse, status_code=202)
@ -89,15 +91,13 @@ async def create_generation_job(
f"Supported: {sorted(SUPPORTED_LEVELS)}", f"Supported: {sorted(SUPPORTED_LEVELS)}",
) )
job = Job( job = await summarise_job_repository.create(
db,
user_id=uuid.UUID(token_data["sub"]), user_id=uuid.UUID(token_data["sub"]),
source_language=request.source_language, source_language=request.source_language,
target_language=request.target_language, target_language=request.target_language,
complexity_level=request.complexity_level, complexity_level=request.complexity_level,
) )
db.add(job)
await db.commit()
await db.refresh(job)
await worker.enqueue(partial(_run_generation, job.id, request)) await worker.enqueue(partial(_run_generation, job.id, request))

View file

@ -5,14 +5,13 @@ from functools import partial
from fastapi import APIRouter, Depends, HTTPException from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from ...auth import verify_token from ...auth import verify_token
from ...database import get_db, AsyncSessionLocal from ...outbound.postgres.database import get_db, AsyncSessionLocal
from ...models import Job from ...outbound.postgres.repositories import summarise_job_repository
from ...outbound.gemini.gemini_client import GeminiClient
from ...storage import upload_audio from ...storage import upload_audio
from ...services import tts, job_repo from ...config import settings
from ...services.tts import VOICE_BY_LANGUAGE
from ... import worker from ... import worker
router = APIRouter(prefix="/jobs", dependencies=[Depends(verify_token)]) router = APIRouter(prefix="/jobs", dependencies=[Depends(verify_token)])
@ -53,8 +52,7 @@ async def get_jobs(
db: AsyncSession = Depends(get_db), db: AsyncSession = Depends(get_db),
) -> JobListResponse: ) -> JobListResponse:
try: try:
result = await db.execute(select(Job).order_by(Job.created_at.desc())) jobs = await summarise_job_repository.list_all(db)
jobs = result.scalars().all()
return {"jobs": jobs} return {"jobs": jobs}
except Exception as e: except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
@ -70,7 +68,7 @@ async def get_job(
except ValueError: except ValueError:
raise HTTPException(status_code=400, detail="Invalid job ID format") raise HTTPException(status_code=400, detail="Invalid job ID format")
job: Job | None = await db.get(Job, uid) job = await summarise_job_repository.get_by_id(db, uid)
if job is None: if job is None:
raise HTTPException(status_code=404, detail="Job not found") raise HTTPException(status_code=404, detail="Job not found")
@ -97,20 +95,21 @@ async def get_job(
async def _run_regenerate_audio(job_id: uuid.UUID) -> None: async def _run_regenerate_audio(job_id: uuid.UUID) -> None:
gemini_client = GeminiClient(settings.gemini_api_key)
async with AsyncSessionLocal() as db: async with AsyncSessionLocal() as db:
job = await db.get(Job, job_id) job = await summarise_job_repository.get_by_id(db, job_id)
await job_repo.mark_processing(db, job) await summarise_job_repository.mark_processing(db, job)
try: try:
voice = VOICE_BY_LANGUAGE.get(job.target_language, "Kore") voice = gemini_client.get_voice_by_language(job.target_language)
wav_bytes = await tts.generate_audio(job.generated_text, voice) wav_bytes = await gemini_client.generate_audio(job.generated_text, voice)
audio_key = f"audio/{job_id}.wav" audio_key = f"audio/{job_id}.wav"
upload_audio(audio_key, wav_bytes) upload_audio(audio_key, wav_bytes)
await job_repo.mark_succeeded(db, job, audio_key) await summarise_job_repository.mark_succeeded(db, job, audio_key)
except Exception as exc: except Exception as exc:
await job_repo.mark_failed(db, job, str(exc)) await summarise_job_repository.mark_failed(db, job, str(exc))
@router.post("/{job_id}/regenerate-audio", status_code=202) @router.post("/{job_id}/regenerate-audio", status_code=202)
@ -124,7 +123,7 @@ async def regenerate_audio(
except ValueError: except ValueError:
raise HTTPException(status_code=400, detail="Invalid job ID format") raise HTTPException(status_code=400, detail="Invalid job ID format")
job: Job | None = await db.get(Job, uid) job = await summarise_job_repository.get_by_id(db, uid)
if job is None: if job is None:
raise HTTPException(status_code=404, detail="Job not found") raise HTTPException(status_code=404, detail="Job not found")

View file

@ -1,8 +1,9 @@
from fastapi import APIRouter, Depends, HTTPException from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel from pydantic import BaseModel
from ...outbound.deepl.deepl_client import DeepLClient, DEEPL_LANGUAGE_CODES
from ...auth import verify_token from ...auth import verify_token
from ...services.translate import DEEPL_LANGUAGE_CODES, translate from ...config import settings
router = APIRouter(prefix="/translate", router = APIRouter(prefix="/translate",
tags=["api", "translate"], tags=["api", "translate"],
@ -22,12 +23,16 @@ async def translate_text(
target_language: str, target_language: str,
context: str | None = None, context: str | None = None,
) -> TranslationResponse: ) -> TranslationResponse:
if target_language not in DEEPL_LANGUAGE_CODES: deepl_client = DeepLClient(settings.deepl_api_key)
if not deepl_client.can_translate(target_language):
raise HTTPException( raise HTTPException(
status_code=400, status_code=400,
detail=f"Unsupported target language '{target_language}'. Supported: {list(DEEPL_LANGUAGE_CODES)}", detail=f"Unsupported target language '{target_language}'. Supported: {list(DEEPL_LANGUAGE_CODES)}",
) )
translated = await translate(text, target_language, context)
translated = await deepl_client.translate(text, target_language, context)
return TranslationResponse( return TranslationResponse(
text=text, text=text,
target_language=target_language, target_language=target_language,

View file

@ -1,12 +1,11 @@
from fastapi import APIRouter, Depends, HTTPException, status from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel, EmailStr from pydantic import BaseModel, EmailStr
from sqlalchemy import select
from sqlalchemy.exc import IntegrityError from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from ..auth import create_access_token, hash_password, verify_password from ..auth import create_access_token, hash_password, verify_password
from ..database import get_db from ..outbound.postgres.database import get_db
from ..models import User from ..outbound.postgres.repositories import user_repository
router = APIRouter(prefix="/auth", tags=["auth"]) router = APIRouter(prefix="/auth", tags=["auth"])
@ -28,14 +27,12 @@ class TokenResponse(BaseModel):
@router.post("/register", status_code=status.HTTP_201_CREATED) @router.post("/register", status_code=status.HTTP_201_CREATED)
async def register(body: RegisterRequest, db: AsyncSession = Depends(get_db)): async def register(body: RegisterRequest, db: AsyncSession = Depends(get_db)):
user = User(
email=body.email,
hashed_password=hash_password(body.password),
)
db.add(user)
try: try:
await db.commit() user = await user_repository.create(
await db.refresh(user) db,
email=body.email,
hashed_password=hash_password(body.password),
)
except IntegrityError: except IntegrityError:
await db.rollback() await db.rollback()
raise HTTPException( raise HTTPException(
@ -52,8 +49,7 @@ async def register(body: RegisterRequest, db: AsyncSession = Depends(get_db)):
@router.post("/login", response_model=TokenResponse) @router.post("/login", response_model=TokenResponse)
async def login(body: LoginRequest, db: AsyncSession = Depends(get_db)): async def login(body: LoginRequest, db: AsyncSession = Depends(get_db)):
result = await db.execute(select(User).where(User.email == body.email)) user = await user_repository.get_by_email(db, body.email)
user = result.scalar_one_or_none()
if user is None or not verify_password(body.password, user.hashed_password): if user is None or not verify_password(body.password, user.hashed_password):
raise HTTPException( raise HTTPException(

View file

@ -2,13 +2,12 @@ import uuid
from fastapi import APIRouter, Depends, HTTPException from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import Response from fastapi.responses import Response
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from botocore.exceptions import ClientError from botocore.exceptions import ClientError
from ..auth import verify_token from ..auth import verify_token
from ..database import get_db from ..outbound.postgres.database import get_db
from ..models import Job from ..outbound.postgres.repositories import summarise_job_repository
from ..storage import download_audio from ..storage import download_audio
router = APIRouter(prefix="/media", tags=["media"]) router = APIRouter(prefix="/media", tags=["media"])
@ -22,10 +21,7 @@ async def get_media_file(
) -> Response: ) -> Response:
user_id = uuid.UUID(token_data["sub"]) user_id = uuid.UUID(token_data["sub"])
result = await db.execute( job = await summarise_job_repository.get_by_audio_url_and_user(db, filename, user_id)
select(Job).where(Job.audio_url == filename, Job.user_id == user_id)
)
job = result.scalar_one_or_none()
if job is None: if job is None:
raise HTTPException(status_code=404, detail="File not found") raise HTTPException(status_code=404, detail="File not found")

View file

@ -1,46 +0,0 @@
from datetime import datetime, timezone
from sqlalchemy.ext.asyncio import AsyncSession
from ..models import Job
async def mark_processing(db: AsyncSession, job: Job) -> None:
job.status = "processing"
job.started_at = datetime.now(timezone.utc)
job.error_message = None
await db.commit()
async def save_generated_text(
db: AsyncSession,
job: Job,
generated_text: str,
input_summary: str,
) -> None:
job.generated_text = generated_text
job.input_summary = input_summary
await db.commit()
async def save_translated_text(
db: AsyncSession,
job: Job,
translated_text: str,
) -> None:
job.translated_text = translated_text
await db.commit()
async def mark_succeeded(db: AsyncSession, job: Job, audio_url: str) -> None:
job.status = "succeeded"
job.audio_url = audio_url
job.completed_at = datetime.now(timezone.utc)
await db.commit()
async def mark_failed(db: AsyncSession, job: Job, error: str) -> None:
job.status = "failed"
job.error_message = error
job.completed_at = datetime.now(timezone.utc)
await db.commit()

View file

@ -1,75 +0,0 @@
import asyncio
import anthropic
from ..config import settings
def _create_anthropic_client() -> anthropic.Anthropic:
return anthropic.Anthropic(api_key=settings.anthropic_api_key)
def _create_system_prompt_summarise_text(
complexity_level: str,
from_language: str,
to_language: str,
length_preference="200-400 words",
) -> str:
return (
f"You are a language learning content creator.\n"
f"The user will provide input, you will generate an engaging realistic summary text in {to_language} "
f"at {complexity_level} proficiency level (CEFR scale).\n\n"
f"The text you generate will:\n"
f"- Contain ONLY the generated text in {to_language}.\n"
f"- Be appropriate for a {complexity_level} {to_language} speaker.\n"
f"- Never generate inappropriate (hateful, sexual, violent) content. It is preferable to return no text than to generate such content.\n"
f"- Speak directly to the reader/listener, adopting the tone and style of a semi-formal news reporter or podcaster.\n"
f"- Where appropriate (fluency level, content), use a small number of idiomatic expressions.\n"
f"- Be formatted in markdown with paragraphs and line breaks.\n"
f"- Be {length_preference} long.\n"
f"- Be inspired by the following source material "
f"(but written originally in {from_language}):\n\n"
)
def _create_prompt_summarise_text(
source_material: str,
) -> str:
return (
f"Source material follows: \n\n"
f"{source_material}"
)
async def generate_summary_text(
content_to_summarise: str,
complexity_level: str,
from_language: str,
to_language: str,
length_preference="200-400 words",) -> str:
"""Generate text using Anthropic."""
def _call() -> str:
client = _create_anthropic_client()
message = client.messages.create(
model="claude-sonnet-4-6",
max_tokens=1024,
messages=[
{
"role": "system",
"content": _create_system_prompt_summarise_text(
complexity_level=complexity_level,
from_language=from_language,
to_language=to_language,
length_preference=length_preference,
)
},
{
"role": "user",
"content": _create_prompt_summarise_text(
content_to_summarise
)
}
],
)
return message.content[0].text
return await asyncio.to_thread(_call)

View file

@ -1,33 +0,0 @@
import httpx
from ..config import settings
DEEPL_API_URL = "https://api-free.deepl.com/v2/translate"
DEEPL_LANGUAGE_CODES = {
"en": "EN-GB",
"fr": "FR",
"es": "ES",
"it": "IT",
"de": "DE",
}
async def translate(text: str, to_language: str, context: str | None = None) -> str:
target_lang_code = DEEPL_LANGUAGE_CODES[to_language]
async with httpx.AsyncClient() as client:
response = await client.post(
DEEPL_API_URL,
headers={
"Authorization": f"DeepL-Auth-Key {settings.deepl_api_key}",
"Content-Type": "application/json",
},
json={
"text": [text],
"target_lang": target_lang_code,
"context": context or None,
},
)
response.raise_for_status()
data = response.json()
return data["translations"][0]["text"]

View file

@ -1,39 +0,0 @@
import asyncio
from google import genai
from google.genai import types as genai_types
from ..config import settings
from ..storage import pcm_to_wav
VOICE_BY_LANGUAGE: dict[str, str] = {
"fr": "Kore",
"es": "Charon",
"it": "Aoede",
"de": "Fenrir",
"en": "Kore",
}
async def generate_audio(text: str, voice: str) -> bytes:
"""Generate TTS audio and return WAV bytes."""
def _call() -> bytes:
client = genai.Client(api_key=settings.gemini_api_key)
response = client.models.generate_content(
model="gemini-2.5-flash-preview-tts",
contents=text,
config=genai_types.GenerateContentConfig(
response_modalities=["AUDIO"],
speech_config=genai_types.SpeechConfig(
voice_config=genai_types.VoiceConfig(
prebuilt_voice_config=genai_types.PrebuiltVoiceConfig(
voice_name=voice,
)
)
),
),
)
pcm_data = response.candidates[0].content.parts[0].inline_data.data
return pcm_to_wav(pcm_data)
return await asyncio.to_thread(_call)

48
api/architecture.md Normal file
View file

@ -0,0 +1,48 @@
# Language Learning App API Architecture
This is a HTTP API, written in Python, using the Fastapi framework.
The code should be organised using both Domain Driven Design and Hexagonal Architecture principles.
## Domain
The `domain` directory contains the core logic of my application, not related to specific implementation details.
This is where all of the logic around the actual language learning lives.
### Models
In `app/domain/models` contains the core Domain Entities, i.e. classes that represent objects for core domain processes.
### Services
The `app/domain/services` directory contains modules that encapsulate the "orchestration" or "choreography" of other components of the system to achieve complex, domain actions.
For example:
- `TextGenerationService` details the step-by-step process of how a series of text is synthesised, audio is generated, parts of speech tagged, timed transcripts generated, etc. which is then used by the learner for language learning.
## Outbound
The `app/outbound` directory contains modules that allow this project to communicate with external systems.
### Repositories and persistence
This project uses SQLAlchemy for persistence and a postgresql database for storage. This is the same in local, deployed, and test environments.
Notably the `app/outbound/postgres` directory contains two modules:
- `entities` contains definitions of tables in the database. These are how the Domain Entities are serialised, but this code specifically is used for serialisation to, and de-serialisation from, persisted data and into Domain Entities from the `models` module.
- `repositories` module contains classes which manage transactions (e.g. CRUD operations) with the psotgresql database, managed through sqlite. Code in the repositories module are what allow Models to be persisted via Entities, allowing all other code to be free of implementation details.
### API Clients
Other modules in the wider `app/outbound` directory are about communicating with specific third-party APIs, notably through HTTP APIs authenticated with an authentication token.
These modules are modelled as Classes.
Example Api Clients in their own modules are:
- `AnthropicClient` to communicate with Anthorpic's LLM, i.e. Claude, to generate text and synthesis.
- `GeminiClient` to communicate with Google's Gemini for text-to-speech generation
- `DeepgramClient` for timestamped speech-to-text transcription