Your reading library
- -- Articles & reading -
+Your recent content
Browse your library of French articles and generated readings. Tap any word for a definition @@ -49,24 +44,6 @@
- -
diff --git a/Makefile b/Makefile index 35c94b5..9c38aad 100644 --- a/Makefile +++ b/Makefile @@ -1,13 +1,13 @@ .PHONY: down build up logs shell lock migrate migration import-dictionary run-prod-locally -build: - docker compose build --no-cache - build-dev: docker compose -f docker-compose-dev.yml --env-file .env build --no-cache - + +build-prod: + docker compose -f docker-compose-prod.yml --env-file .env build --no-cache + up-dev: - docker compose -f docker-compose-dev.yml --env-file .env up -d + docker compose -f docker-compose-dev.yml --env-file .env up -d up-prod: docker compose -f docker-compose-prod.yml --env-file .env.prod up -d diff --git a/api/Dockerfile b/api/Dockerfile index 15ffb9e..3472942 100644 --- a/api/Dockerfile +++ b/api/Dockerfile @@ -2,15 +2,22 @@ FROM python:3.13-slim WORKDIR /app +# install pq (libpq5) - a driver for postgres +RUN apt-get update && apt-get install -y --no-install-recommends \ + libpq5 \ + && rm -rf /var/lib/apt/lists/* + # Install uv for fast, reproducible installs -RUN pip install --no-cache-dir uv alembic +RUN pip install --no-cache-dir uv alembic procrastinate psycopg2-binary # Install Python dependencies from pyproject.toml COPY pyproject.toml . -RUN uv pip install --system --no-cache . +RUN --mount=type=cache,target=/root/.cache/pip \ + uv pip install --no-cache-dir --system --requirements pyproject.toml # Download spaCy language models -RUN python -m spacy download en_core_web_sm && \ +RUN --mount=type=cache,target=/root/.cache/pip \ + python -m spacy download en_core_web_sm && \ python -m spacy download fr_core_news_sm && \ python -m spacy download es_core_news_sm && \ python -m spacy download it_core_news_sm && \ diff --git a/api/alembic/versions/20260523_0019_add_procrastinate_schema.py b/api/alembic/versions/20260523_0019_add_procrastinate_schema.py new file mode 100644 index 0000000..7801741 --- /dev/null +++ b/api/alembic/versions/20260523_0019_add_procrastinate_schema.py @@ -0,0 +1,56 @@ +"""add procrastinate schema + +Revision ID: 0019 +Revises: 0018 +Create Date: 2026-05-23 +""" +from typing import Sequence, Union + +from alembic import op + +revision: str = "0019" +down_revision: Union[str, None] = "0018" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + from procrastinate.schema import SchemaManager + from app.config import settings + import psycopg + + schema_sql = SchemaManager.get_schema() + # asyncpg (used by SQLAlchemy async) cannot execute multi-statement DDL via + # its prepare() path, so we open a direct psycopg connection instead. + with psycopg.connect(settings.procrastinate_database_url, autocommit=True) as conn: + conn.execute(schema_sql) + + +def downgrade() -> None: + op.execute(""" + DROP TABLE IF EXISTS procrastinate_events CASCADE; + DROP TABLE IF EXISTS procrastinate_periodic_defers CASCADE; + DROP TABLE IF EXISTS procrastinate_jobs CASCADE; + DROP TABLE IF EXISTS procrastinate_workers CASCADE; + DROP FUNCTION IF EXISTS procrastinate_defer_jobs_v1 CASCADE; + DROP FUNCTION IF EXISTS procrastinate_defer_periodic_job_v2 CASCADE; + DROP FUNCTION IF EXISTS procrastinate_fetch_job_v2 CASCADE; + DROP FUNCTION IF EXISTS procrastinate_finish_job_v1 CASCADE; + DROP FUNCTION IF EXISTS procrastinate_cancel_job_v1 CASCADE; + DROP FUNCTION IF EXISTS procrastinate_retry_job_v1 CASCADE; + DROP FUNCTION IF EXISTS procrastinate_retry_job_v2 CASCADE; + DROP FUNCTION IF EXISTS procrastinate_notify_queue_job_inserted_v1 CASCADE; + DROP FUNCTION IF EXISTS procrastinate_notify_queue_abort_job_v1 CASCADE; + DROP FUNCTION IF EXISTS procrastinate_trigger_function_status_events_insert_v1 CASCADE; + DROP FUNCTION IF EXISTS procrastinate_trigger_function_status_events_update_v1 CASCADE; + DROP FUNCTION IF EXISTS procrastinate_trigger_function_scheduled_events_v1 CASCADE; + DROP FUNCTION IF EXISTS procrastinate_trigger_abort_requested_events_procedure_v1 CASCADE; + DROP FUNCTION IF EXISTS procrastinate_unlink_periodic_defers_v1 CASCADE; + DROP FUNCTION IF EXISTS procrastinate_register_worker_v1 CASCADE; + DROP FUNCTION IF EXISTS procrastinate_unregister_worker_v1 CASCADE; + DROP FUNCTION IF EXISTS procrastinate_update_heartbeat_v1 CASCADE; + DROP FUNCTION IF EXISTS procrastinate_prune_stalled_workers_v1 CASCADE; + DROP TYPE IF EXISTS procrastinate_job_to_defer_v1 CASCADE; + DROP TYPE IF EXISTS procrastinate_job_event_type CASCADE; + DROP TYPE IF EXISTS procrastinate_job_status CASCADE; + """) diff --git a/api/app/config.py b/api/app/config.py index f70cca8..b40fe85 100644 --- a/api/app/config.py +++ b/api/app/config.py @@ -3,6 +3,7 @@ from pydantic_settings import BaseSettings class Settings(BaseSettings): database_url: str + procrastinate_database_url: str jwt_secret: str anthropic_api_key: str deepl_api_key: str @@ -26,6 +27,7 @@ class Settings(BaseSettings): bunny_token_auth_key: str = "" bunny_storage_endpoint: str = "" stub_generation: bool = False + story_generation_api_provider: str = "gemini" # "gemini" | "anthropic" model_config = {"env_file": ".env"} diff --git a/api/app/domain/models/gen_ai.py b/api/app/domain/models/gen_ai.py new file mode 100644 index 0000000..63bd27d --- /dev/null +++ b/api/app/domain/models/gen_ai.py @@ -0,0 +1,19 @@ +from dataclasses import dataclass +from datetime import datetime +from typing import Protocol + + +@dataclass +class GenAiChatMessage: + actor: str # 'user' | 'agent' + content: str + + +class GenerativeAiClient(Protocol): + async def complete( + self, + system_prompt: str, + messages: list[GenAiChatMessage], + model: str = "", + max_tokens: int = 2048, + ) -> tuple[str, dict]: ... diff --git a/api/app/domain/services/adventure_service.py b/api/app/domain/services/adventure_service.py index f78b9fa..07c1995 100644 --- a/api/app/domain/services/adventure_service.py +++ b/api/app/domain/services/adventure_service.py @@ -3,6 +3,8 @@ import logging import time import uuid +from app.domain.models.gen_ai import GenAiChatMessage, GenerativeAiClient + from ...languages import SUPPORTED_LANGUAGES from ...outbound.anthropic.adventure_prompts import ( build_conversation_messages, @@ -49,7 +51,7 @@ class AdventureService: decision_repo: PostgresAdventureEntryDecisionRepository, translation_repo: PostgresAdventureEntryTranslationRepository, audio_repo: PostgresAdventureEntryAudioRepository, - anthropic_client: AnthropicClient, + gen_ai_client: GenerativeAiClient, deepl_client: DeepLClient, gemini_client: GeminiClient, spacy_client: SpacyClient, @@ -60,7 +62,7 @@ class AdventureService: self.decision_repo = decision_repo self.translation_repo = translation_repo self.audio_repo = audio_repo - self.anthropic_client = anthropic_client + self.gen_ai_client = gen_ai_client self.deepl_client = deepl_client self.gemini_client = gemini_client self.spacy_client = spacy_client @@ -173,17 +175,20 @@ class AdventureService: is_first_entry = current_entry.entry_index == 0 is_final_entry = current_entry.entry_index + 1 == adventure.max_entry_count - prior_entries_with_possible_choices = await self._load_possible_choices_for_entries( - all_entries=[ - e for e in all_entries if e.entry_index < current_entry.entry_index - ], - user_id=user_id, - ) - prior_decisions = await self.decision_repo.list_for_adventure_and_user( - adventure_id=adventure_id, user_id=user_id + prior_entries_with_possible_choices = ( + await self._load_possible_choices_for_entries( + all_entries=[ + e + for e in all_entries + if e.entry_index < current_entry.entry_index + ], + user_id=user_id, + ) ) - language_name = SUPPORTED_LANGUAGES.get(adventure.language, adventure.language) + language_name = SUPPORTED_LANGUAGES.get( + adventure.language, adventure.language + ) competency = adventure.competencies[0] if adventure.competencies else "B1" system_prompt = build_entry_system_prompt( language_name=language_name, @@ -198,11 +203,12 @@ class AdventureService: vibes=adventure.vibes, protagonist=adventure.protagonist, prior_entries_with_choices=prior_entries_with_possible_choices, + max_entry_count=adventure.max_entry_count, ) # ── LLM generation ────────────────────────────────────────────── t0 = time.monotonic() - raw_text, usage_dict = await self.anthropic_client.complete( + raw_text, usage_dict = await self.gen_ai_client.complete( system_prompt=system_prompt, messages=messages, max_tokens=2048, @@ -308,8 +314,12 @@ class AdventureService: # ── TTS ────────────────────────────────────────────────────────── t0 = time.monotonic() voice = self.gemini_client.get_voice_by_language(adventure.language) - story_text_with_tag = "[like a dungeons and dragons gamemaster] " + story_text - wav_bytes = await self.gemini_client.generate_audio(story_text_with_tag, voice) + story_text_with_tag = ( + "[like a dungeons and dragons gamemaster] " + story_text + ) + wav_bytes = await self.gemini_client.generate_audio( + story_text_with_tag, voice + ) timing_tts = time.monotonic() - t0 # ── File upload ─────────────────────────────────────────────────── @@ -344,13 +354,16 @@ class AdventureService: # ── Adventure title (first entry only) ──────────────────────────── if is_first_entry: - title_system = build_title_system_prompt() - title_user = build_title_user_message( - story_text, language_name, adventure.genres, gm_notes - ) - title_raw, _ = await self.anthropic_client.complete( - system_prompt=title_system, - messages=[{"role": "user", "content": title_user}], + title_raw, _ = await self.gen_ai_client.complete( + system_prompt=build_title_system_prompt(), + messages=[ + GenAiChatMessage( + actor="user", + content=build_title_user_message( + story_text, language_name, adventure.genres, gm_notes + ), + ) + ], max_tokens=200, ) title, description = parse_title_response(title_raw) @@ -384,10 +397,12 @@ class AdventureService: build_conversation_messages(). """ result = [] - + for entry in sorted(all_entries, key=lambda e: e.entry_index): choices = await self.choice_repo.list_for_entry(uuid.UUID(entry.id)) - decision = await self.decision_repo.get_for_entry_and_user(entry_id = uuid.UUID(entry.id), user_id=user_id) + decision = await self.decision_repo.get_for_entry_and_user( + entry_id=uuid.UUID(entry.id), user_id=user_id + ) selected_choice_id = decision.choice_id if decision else None result.append((entry, choices, selected_choice_id)) return result diff --git a/api/app/main.py b/api/app/main.py index 4db2caa..8449d92 100644 --- a/api/app/main.py +++ b/api/app/main.py @@ -1,29 +1,25 @@ import asyncio from contextlib import asynccontextmanager -from prometheus_fastapi_instrumentator import Instrumentator -from .routers.api import generation, pos -from fastapi import FastAPI -from .routers.api import jobs +from fastapi import FastAPI +from prometheus_fastapi_instrumentator import Instrumentator + +from .observability import setup_observability +from .outbound.storage_factory import init_storage from .routers import media as media_router from .routers.api.main import api_router from .routers.bff.main import bff_router -from .outbound.storage_factory import init_storage -from .observability import setup_observability -from . import worker - +from .tasks.app import procrastinate_app @asynccontextmanager async def lifespan(app: FastAPI): - init_storage() - setup_observability(app) - worker_task = asyncio.create_task(worker.worker_loop()) - yield - worker_task.cancel() - try: - await worker_task - except asyncio.CancelledError: - pass + async with procrastinate_app.open_async(): + worker = asyncio.create_task( + procrastinate_app.run_worker_async(install_signal_handlers=False) + ) + init_storage() + setup_observability(app) + yield app = FastAPI(title="Language Learning API", lifespan=lifespan) @@ -33,6 +29,7 @@ app.include_router(bff_router) app.include_router(media_router.router) Instrumentator().instrument(app).expose(app, should_gzip=True) + @app.get("/health") async def health() -> dict: return {"status": "ok"} diff --git a/api/app/outbound/anthropic/adventure_prompts.py b/api/app/outbound/anthropic/adventure_prompts.py index d119cef..1abca25 100644 --- a/api/app/outbound/anthropic/adventure_prompts.py +++ b/api/app/outbound/anthropic/adventure_prompts.py @@ -5,9 +5,16 @@ parse LLM outputs back into domain values. Nothing in this module makes network calls or holds state. The service layer loads the data; these functions do the translation. """ + import re -from ...domain.models.adventure import AdventureEntry, AdventureEntryPossibleChoice, AdventureEntryPossibleChoiceDecision +from app.domain.models.gen_ai import GenAiChatMessage + +from ...domain.models.adventure import ( + AdventureEntry, + AdventureEntryPossibleChoice, + AdventureEntryPossibleChoiceDecision, +) def build_entry_system_prompt( @@ -19,42 +26,46 @@ def build_entry_system_prompt( ) -> str: halfway = max(1, max_entry_count // 2) return ( - f"You are a game master running a single-player choose-your-own-adventure story " - f"to help the player practise {language_name}, write like a native. \n\n" - f"The session is {max_entry_count} turns. Deliver a satisfying narrative arc: " - f"establish, complicate, escalate, resolve. Don't force convergence until turn {halfway}. " - f"By turn {max_entry_count} the story must conclude clearly. " - f"Track the character the player is building through their choices and reflect it back.\n\n" - f"Write with economy and confidence. Favour scene over summary. " - f"Use dialogue to reveal character rather than reporting what was said. " - f"Resist the urge to over-explain — trust the player.\n\n" - f"Format — your response must have exactly three parts, each separated by a line containing only \"-----\":\n" + f"You are a game master running a single-player TTRPG-like experience " + f"to help the player practise {language_name}\n\n" + f"The narrative will last {max_entry_count} entries, so make them count. " + f"Narratively, you are encouraged to respond to the player's pace - players who " + f"want a narrative-driven piece should be indulged in dialogue and backstory; players " + f"who escalate or investigate get heightened stakes. Don't pre-plan the ending or style from the beginning. " + f"Plot twists are okay, but it's not a melodrama. By turn {max_entry_count} the story must conclude clearly. " + f"Write like a native {language_name} writer, write with economy and confidence. Favour scene over summary. " + f"Show, don't tell - tell the player what they notice (see, think, glimpse, remember, etc.) " + f"Trust and respect the player's intelligence, resist formulaic or random options and outcomes.\n\n" + f"When generating the options for the player, tailor them to the scenario and character that is emerging, " + f"don't present 4 scattered, random options every time. Later in a narrative, the options should be more similar.\n\n " + f'Format — your response must have exactly three parts, each separated by a line containing only "-----":\n' f"Part 1: story passage, {min_length}–{max_length} words, in second person, " f"written entirely in {language_name} at {competency} CEFR level. Plaintext only, no markdown.\n" - f"Part 2: exactly 4 numbered options (\"1.\" through \"4.\"), one per line, in {language_name}.\n" - f"Part 3: GM notes — three lines only:\n" - f" Character: one sentence on what this player's choices reveal about them. When empty, write 'None'.\n" - f" Threads: unresolved plot points or planted details that should pay off later.\n" - f" Next beat: what the next turn needs to do narratively.\n" - f" Do not describe unchosen options or recap what just happened.\n\n" + f'Part 2: exactly 4 numbered options ("1." through "4."), one per line, in {language_name}.\n' + f"Part 3: GM notes. These won't be shown to the player. Use these to help future LLM calls generate " + f"a better TTRPG experience. This may include hidden details, juicy resolutions or twists, your thoughts " + "on the type of options to generate/avoid, or anything that might pay off later. GM notes can be empty.\n" f"No sexual content or graphic violence. Romance, threat, and adventure are fine. " f"12-certificate." ) + """ SECTION: Title generation prompts """ + def build_title_system_prompt() -> str: return ( "You are a creative writing assistant. Given the opening passage of a choose-your-own-adventure " - "story, generate a short title and a one-sentence description for it.\n\n" + "story, and the Game Master's notes, generate a title and a one-sentence description for it.\n\n" "Respond with exactly two lines of plain text:\n" - "Line 1: the title (max 60 characters, no quotes or labels)\n" + "Line 1: the title (max 12 words)\n" "Line 2: the description (max 200 characters, no quotes or labels)\n\n" "Avoid the following tropes: 'The secret of [noun]', 'The [noun] of [noun]'" ) - + + def build_title_user_message( first_entry_text: str, language_name: str, @@ -67,7 +78,8 @@ def build_title_user_message( f"The gamemaster has provided the following (hidden from the player) notes. " f"Consider using non-spoiler details:\n{gamemaster_notes}" ) - + + def parse_title_response(text: str) -> tuple[str, str]: """Parse a two-line title/description response. @@ -107,39 +119,59 @@ def reconstruct_assistant_message( return f"{entry.story_text}\n-----\n{options_block}\n-----\n{gm_block}" +def reconstruct_choice_message( + choice_label: str, choice_index: int, max_entry_count: int +) -> str: + return f"Choice: {choice_label}. Please use this to generate entry {choice_index + 1} of {max_entry_count}." + + def build_conversation_messages( genres: list[str], setting: list[str], vibes: list[str], protagonist: list[str], - prior_entries_with_choices: list[tuple[AdventureEntry, list[AdventureEntryPossibleChoice], str | None]], -) -> list[dict]: + prior_entries_with_choices: list[ + tuple[AdventureEntry, list[AdventureEntryPossibleChoice], str | None] + ], + max_entry_count: int, +) -> list[GenAiChatMessage]: """Build the full messages array for an Anthropic API call. prior_entries is a list of (entry, choices_for_that_entry, selected_choice_id). The chosen label is the label of the option the player picked to advance past that entry. For the most recent completed entry it will be None (no choice made yet). """ - messages: list[dict] = [ - {"role": "user", "content": build_initial_user_message(genres, setting, vibes, protagonist)} - ] - for entry, choices, selected_choice_id in prior_entries_with_choices: - + first_message = GenAiChatMessage( + actor="user", + content=build_initial_user_message(genres, setting, vibes, protagonist), + ) + messages: list[GenAiChatMessage] = [first_message] + + for index, (entry, choices, selected_choice_id) in enumerate( + prior_entries_with_choices + ): chosen_choice = next((c for c in choices if c.id == selected_choice_id), None) - + if selected_choice_id is None or chosen_choice is None: - # We have a problem, no decision was recorded for this entry + # We have a problem, no decision was recorded for this entry print(f"Warning: no decision found for entry {entry.id}") continue - - - + messages.append( - {"role": "assistant", "content": reconstruct_assistant_message(entry, choices)} + GenAiChatMessage( + actor="agent", content=reconstruct_assistant_message(entry, choices) + ) ) - - messages.append({"role": "user", "content": chosen_choice.label}) - + + messages.append( + GenAiChatMessage( + actor="user", + content=reconstruct_choice_message( + chosen_choice.label, index, max_entry_count + ), + ) + ) + return messages @@ -173,6 +205,3 @@ def parse_entry_response(text: str) -> tuple[str, list[tuple[str, str]], str]: raise ValueError("No choices parsed from LLM response options section") return story_text, choices, gm_notes - - - diff --git a/api/app/outbound/anthropic/anthropic_client.py b/api/app/outbound/anthropic/anthropic_client.py index e34c9a3..948c0f3 100644 --- a/api/app/outbound/anthropic/anthropic_client.py +++ b/api/app/outbound/anthropic/anthropic_client.py @@ -1,8 +1,11 @@ import asyncio + import anthropic +from app.domain.models.gen_ai import GenAiChatMessage -class AnthropicClient(): + +class AnthropicClient: def __init__(self, api_key: str): self._client = anthropic.Anthropic(api_key=api_key) @@ -11,11 +14,11 @@ class AnthropicClient(): return cls(api_key) def _create_summarise_text_system_prompt( - self, - complexity_level: str, - from_language: str, - to_language: str, - length_preference="200-400 words", + self, + complexity_level: str, + from_language: str, + to_language: str, + length_preference="200-400 words", ) -> str: return ( f"You are a language learning content creator.\n" @@ -33,18 +36,23 @@ class AnthropicClient(): ) def _create_prompt_summarise_text( - self, - source_material: str, + self, + source_material: str, ) -> str: - return ( - f"Source material follows: \n\n" - f"{source_material}" - ) + return f"Source material follows: \n\n{source_material}" + + def _messages_to_anthropic_messages( + self, messages: list[GenAiChatMessage] + ) -> list[dict]: + def transform(message: GenAiChatMessage) -> dict: + return {"role": message.actor, "content": message.content} + + return list(map(transform, messages)) async def complete( self, system_prompt: str, - messages: list[dict], + messages: list[GenAiChatMessage], model: str = "claude-sonnet-4-6", max_tokens: int = 2048, ) -> tuple[str, dict]: @@ -53,12 +61,13 @@ class AnthropicClient(): Returns (response_text, usage_dict) where usage_dict contains provider, model name, and token counts for cost tracking. """ + def _call() -> tuple[str, dict]: message = self._client.messages.create( model=model, max_tokens=max_tokens, system=system_prompt, - messages=messages, + messages=self._messages_to_anthropic_messages(messages), ) usage = { "provider": "anthropic", @@ -71,13 +80,15 @@ class AnthropicClient(): return await asyncio.to_thread(_call) async def generate_summary_text( - self, - content_to_summarise: str, - complexity_level: str, - from_language: str, - to_language: str, - length_preference="200-400 words") -> str: + self, + content_to_summarise: str, + complexity_level: str, + from_language: str, + to_language: str, + length_preference="200-400 words", + ) -> str: """Generate text using Anthropic.""" + def _call() -> str: message = self._client.messages.create( model="claude-sonnet-4-6", @@ -93,7 +104,7 @@ class AnthropicClient(): "role": "user", "content": self._create_prompt_summarise_text( content_to_summarise - ) + ), } ], ) diff --git a/api/app/outbound/gemini/gemini_client.py b/api/app/outbound/gemini/gemini_client.py index f25e112..413457c 100644 --- a/api/app/outbound/gemini/gemini_client.py +++ b/api/app/outbound/gemini/gemini_client.py @@ -1,10 +1,13 @@ import asyncio import io +import json import wave from google import genai from google.genai import types as genai_types +from app.domain.models.gen_ai import GenAiChatMessage + def _pcm_to_wav(pcm_data: bytes, sample_rate: int = 24000) -> bytes: buf = io.BytesIO() @@ -15,6 +18,7 @@ def _pcm_to_wav(pcm_data: bytes, sample_rate: int = 24000) -> bytes: wf.writeframes(pcm_data) return buf.getvalue() + VOICE_BY_LANGUAGE: dict[str, str] = { "fr": "Kore", "es": "Charon", @@ -24,11 +28,16 @@ VOICE_BY_LANGUAGE: dict[str, str] = { } -class GeminiClient(): +class GeminiClient: """Communicate with Google's Gemini LLM""" + def __init__(self, api_key: str): self._api_key = api_key + @classmethod + def new(cls, api_key: str) -> "GeminiClient": + return GeminiClient(api_key) + def get_voice_by_language(self, target_language: str) -> str: possible_voice = VOICE_BY_LANGUAGE.get(target_language) @@ -37,9 +46,55 @@ class GeminiClient(): return possible_voice + def _make_gemini_messags( + self, messages: list[GenAiChatMessage] + ) -> list[genai_types.Content]: + def transform(message: GenAiChatMessage) -> genai_types.Content: + role_name = "model" + + if message.actor == "user": + role_name = "user" + + return genai_types.Content( + role=role_name, + parts=[genai_types.Part.from_text(text=message.content)], + ) + + return list(map(transform, messages)) + + async def complete( + self, + system_prompt: str, + messages: list[GenAiChatMessage], + model: str = "gemini-3.1-flash-lite", + max_tokens: int = 2048, + ) -> tuple[str, dict]: + client = genai.Client(api_key=self._api_key) + + contents = self._make_gemini_messags(messages) + + response = client.models.generate_content( + model=model, + contents=contents, + config=genai_types.GenerateContentConfig( + system_instruction=system_prompt, + temperature=1.5, + top_p=0.95, + max_output_tokens=max_tokens, + ), + ) + + response_text = response.candidates[0].content.parts[0].text + response_metadata = { + "model": model, + "total_token_count": response.usage_metadata.total_token_count, + } + + return response_text, response_metadata async def generate_audio(self, text: str, voice: str) -> bytes: """Generate TTS audio and return WAV bytes.""" + def _call() -> bytes: client = genai.Client(api_key=self._api_key) response = client.models.generate_content( diff --git a/api/app/outbound/postgres/database.py b/api/app/outbound/postgres/database.py index 24c20ad..fd9a3fd 100644 --- a/api/app/outbound/postgres/database.py +++ b/api/app/outbound/postgres/database.py @@ -1,16 +1,29 @@ -from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker +from collections.abc import AsyncGenerator + +from sqlalchemy.ext.asyncio import ( + AsyncEngine, + AsyncSession, + async_sessionmaker, + create_async_engine, +) from sqlalchemy.orm import DeclarativeBase from ...config import settings -engine = create_async_engine(settings.database_url) -AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False) +engine: AsyncEngine = create_async_engine( + settings.database_url, + pool_pre_ping=True, +) +AsyncSessionLocal = async_sessionmaker[AsyncSession]( + engine, + expire_on_commit=False, +) class Base(DeclarativeBase): pass -async def get_db() -> AsyncSession: +async def get_db() -> AsyncGenerator[AsyncSession, None]: async with AsyncSessionLocal() as session: yield session diff --git a/api/app/outbound/stubs.py b/api/app/outbound/stubs.py new file mode 100644 index 0000000..db069e6 --- /dev/null +++ b/api/app/outbound/stubs.py @@ -0,0 +1,60 @@ +import io +import wave + + +class StubAnthropicClient: + async def complete( + self, + system_prompt: str, + messages: list[dict], + model: str = "claude-sonnet-4-6", + max_tokens: int = 2048, + ) -> tuple[str, dict]: + usage = { + "provider": "stub", + "model": "stub", + "input_tokens": 0, + "output_tokens": 0, + } + if "game master" in system_prompt.lower(): + return ( + "Vous vous retrouvez dans une ruelle sombre de Paris. " + "Une silhouette mystérieuse s'approche lentement.\n" + "-----\n" + "1. Suivez la silhouette dans l'obscurité\n" + "2. Restez dans l'ombre et observez\n" + "3. Demandez de l'aide à voix haute\n" + "4. Courez vers la lumière au bout de la ruelle\n" + "-----\n" + "no notes" + ), usage + return "La Nuit Parisienne\nUne aventure mystérieuse dans les rues sombres de Paris.", usage + + +class StubDeepLClient: + def can_translate_to(self, lang: str) -> bool: + return True + + async def translate( + self, text: str, to_language: str, context: str | None = None + ) -> str: + return f"[STUB] {text[:120]}" + + +class StubGeminiClient: + def get_voice_by_language(self, lang: str) -> str: + return "Stub" + + async def generate_audio(self, text: str, voice: str) -> bytes: + buf = io.BytesIO() + with wave.open(buf, "wb") as wf: + wf.setnchannels(1) + wf.setsampwidth(2) + wf.setframerate(24000) + wf.writeframes(b"\x00" * 480) + return buf.getvalue() + + +class StubSpacyClient: + def get_parts_of_speech(self, text: str, language: str) -> dict: + return {"language": language, "sentences": [{"text": text, "tokens": []}]} diff --git a/api/app/routers/api/adventures.py b/api/app/routers/api/adventures.py index 256acea..575cab9 100644 --- a/api/app/routers/api/adventures.py +++ b/api/app/routers/api/adventures.py @@ -1,13 +1,9 @@ -import io import uuid -import wave -from functools import partial from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel from sqlalchemy.ext.asyncio import AsyncSession -from ... import worker from ...auth import verify_token from ...config import settings from ...domain.services.adventure_service import AdventureService @@ -15,8 +11,7 @@ from ...languages import SUPPORTED_LANGUAGES from ...outbound.anthropic.anthropic_client import AnthropicClient from ...outbound.deepl.deepl_client import DeepLClient from ...outbound.gemini.gemini_client import GeminiClient -from ...outbound.postgres.database import AsyncSessionLocal, get_db -from ...outbound.spacy.spacy_client import SpacyClient +from ...outbound.postgres.database import get_db from ...outbound.postgres.repositories.adventure_repository import ( PostgresAdventureEntryAudioRepository, PostgresAdventureEntryChoiceRepository, @@ -25,116 +20,18 @@ from ...outbound.postgres.repositories.adventure_repository import ( PostgresAdventureEntryTranslationRepository, PostgresAdventureRepository, ) +from ...outbound.spacy.spacy_client import SpacyClient +from ...outbound.stubs import ( + StubAnthropicClient, + StubDeepLClient, + StubGeminiClient, + StubSpacyClient, +) +from ...tasks import generate_adventure_entry router = APIRouter(prefix="/adventures", tags=["adventures"]) -# --------------------------------------------------------------------------- -# Stub clients for the test environment (STUB_GENERATION=true) -# --------------------------------------------------------------------------- - -_STUB_ENTRY_RESPONSE = ( - "Vous vous retrouvez dans une ruelle sombre de Paris. " - "Une silhouette mystérieuse s'approche lentement.\n" - "-----\n" - "1. Suivez la silhouette dans l'obscurité\n" - "2. Restez dans l'ombre et observez\n" - "3. Demandez de l'aide à voix haute\n" - "4. Courez vers la lumière au bout de la ruelle\n" - "-----\n" - "no notes" -) -_STUB_TITLE_RESPONSE = ( - "La Nuit Parisienne\nUne aventure mystérieuse dans les rues sombres de Paris." -) - - -class _StubAnthropicClient: - async def complete( - self, - system_prompt: str, - messages: list[dict], - model: str = "claude-sonnet-4-6", - max_tokens: int = 2048, - ) -> tuple[str, dict]: - usage = { - "provider": "stub", - "model": "stub", - "input_tokens": 0, - "output_tokens": 0, - } - if "game master" in system_prompt.lower(): - return _STUB_ENTRY_RESPONSE, usage - return _STUB_TITLE_RESPONSE, usage - - -class _StubDeepLClient: - def can_translate_to(self, lang: str) -> bool: - return True - - async def translate( - self, text: str, to_language: str, context: str | None = None - ) -> str: - return f"[STUB] {text[:120]}" - - -class _StubGeminiClient: - def get_voice_by_language(self, lang: str) -> str: - return "Stub" - - async def generate_audio(self, text: str, voice: str) -> bytes: - buf = io.BytesIO() - with wave.open(buf, "wb") as wf: - wf.setnchannels(1) - wf.setsampwidth(2) - wf.setframerate(24000) - wf.writeframes(b"\x00" * 480) - return buf.getvalue() - - -class _StubSpacyClient: - def get_parts_of_speech(self, text: str, language: str) -> dict: - return {"language": language, "sentences": [{"text": text, "tokens": []}]} - - -# --------------------------------------------------------------------------- -# Service factory -# --------------------------------------------------------------------------- - - -def _make_service(db: AsyncSession) -> AdventureService: - if settings.stub_generation: - anthropic = _StubAnthropicClient() # type: ignore[assignment] - deepl = _StubDeepLClient() # type: ignore[assignment] - gemini = _StubGeminiClient() # type: ignore[assignment] - spacy = _StubSpacyClient() # type: ignore[assignment] - else: - anthropic = AnthropicClient.new(settings.anthropic_api_key) - deepl = DeepLClient(settings.deepl_api_key) - gemini = GeminiClient(settings.gemini_api_key) - spacy = SpacyClient() - - return AdventureService( - adventure_repo=PostgresAdventureRepository(db), - entry_repo=PostgresAdventureEntryRepository(db), - choice_repo=PostgresAdventureEntryChoiceRepository(db), - decision_repo=PostgresAdventureEntryDecisionRepository(db), - translation_repo=PostgresAdventureEntryTranslationRepository(db), - audio_repo=PostgresAdventureEntryAudioRepository(db), - anthropic_client=anthropic, - deepl_client=deepl, - gemini_client=gemini, - spacy_client=spacy, - ) - - -async def _run_entry_pipeline_task( - adventure_id: uuid.UUID, entry_id: uuid.UUID, user_id: uuid.UUID -) -> None: - async with AsyncSessionLocal() as db: - await _make_service(db).run_entry_pipeline(adventure_id, entry_id, user_id) - - # --------------------------------------------------------------------------- # Request / response models # --------------------------------------------------------------------------- @@ -216,6 +113,31 @@ class EntryDetailResponse(BaseModel): # --------------------------------------------------------------------------- +def _service(db: AsyncSession) -> AdventureService: + if settings.stub_generation: + anthropic = StubAnthropicClient() # type: ignore[assignment] + deepl = StubDeepLClient() # type: ignore[assignment] + gemini = StubGeminiClient() # type: ignore[assignment] + spacy = StubSpacyClient() # type: ignore[assignment] + else: + anthropic = AnthropicClient.new(settings.anthropic_api_key) + deepl = DeepLClient(settings.deepl_api_key) + gemini = GeminiClient(settings.gemini_api_key) + spacy = SpacyClient() + return AdventureService( + adventure_repo=PostgresAdventureRepository(db), + entry_repo=PostgresAdventureEntryRepository(db), + choice_repo=PostgresAdventureEntryChoiceRepository(db), + decision_repo=PostgresAdventureEntryDecisionRepository(db), + translation_repo=PostgresAdventureEntryTranslationRepository(db), + audio_repo=PostgresAdventureEntryAudioRepository(db), + gen_ai_client=anthropic, + deepl_client=deepl, + gemini_client=gemini, + spacy_client=spacy, + ) + + def _to_adventure_response(adventure) -> AdventureResponse: return AdventureResponse( id=adventure.id, @@ -262,9 +184,9 @@ async def create_adventure( ) deepl_client = ( - DeepLClient(settings.deepl_api_key) - if not settings.stub_generation - else _StubDeepLClient() + StubDeepLClient() + if settings.stub_generation + else DeepLClient(settings.deepl_api_key) ) # type: ignore[assignment] if not deepl_client.can_translate_to(body.source_language): raise HTTPException( @@ -274,7 +196,7 @@ async def create_adventure( # Word count is e.g. "100-200 Words", convert to a tuple of ints (100, 200) try: - word_count_range = tuple( + word_count_range = list( int(x.strip().split()[0]) for x in body.entry_word_count_range.split("-") ) if len(word_count_range) != 2 or word_count_range[0] >= word_count_range[1]: @@ -285,7 +207,7 @@ async def create_adventure( detail="Invalid entry_word_count_range. Expected format 'min-max Words', e.g. '100-200 Words'", ) - adventure, first_entry = await _make_service(db).create_adventure_for_user( + adventure, first_entry = await _service(db).create_adventure_for_user( user_id=user_id, language=body.language, source_language=body.source_language, @@ -297,10 +219,10 @@ async def create_adventure( max_entry_count=body.max_entry_count, entry_word_count_range=word_count_range, ) - await worker.enqueue( - partial( - _run_entry_pipeline_task, uuid.UUID(adventure.id), uuid.UUID(first_entry.id), user_id - ) + await generate_adventure_entry.defer_async( + adventure_id=str(adventure.id), + entry_id=str(first_entry.id), + user_id=str(user_id), ) return _to_adventure_response(adventure) @@ -361,7 +283,7 @@ async def record_decision( raise HTTPException(status_code=400, detail="Invalid choice_id") try: - decision, next_entry = await _make_service( + decision, next_entry = await _service( db ).record_decision_and_prepare_next_entry( adventure_id=_parse_adventure_id(adventure_id), @@ -380,13 +302,10 @@ async def record_decision( raise HTTPException(status_code=409, detail="decision_already_made") raise HTTPException(status_code=400, detail=key) - await worker.enqueue( - partial( - _run_entry_pipeline_task, - uuid.UUID(next_entry.adventure_id), - uuid.UUID(next_entry.id), - user_id, - ) + await generate_adventure_entry.defer_async( + adventure_id=str(next_entry.adventure_id), + entry_id=str(next_entry.id), + user_id=str(user_id), ) return DecisionResponse( id=decision.id, diff --git a/api/app/routers/api/generation.py b/api/app/routers/api/generation.py index 88c63b0..400294b 100644 --- a/api/app/routers/api/generation.py +++ b/api/app/routers/api/generation.py @@ -1,23 +1,15 @@ import uuid -from functools import partial from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel from sqlalchemy.ext.asyncio import AsyncSession -from ...languages import SUPPORTED_LANGUAGES, SUPPORTED_LEVELS from ...auth import require_admin -from ...outbound.postgres.database import get_db, AsyncSessionLocal +from ...languages import SUPPORTED_LANGUAGES, SUPPORTED_LEVELS +from ...outbound.postgres.database import get_db from ...outbound.postgres.repositories import summarise_job_repository from ...outbound.postgres.repositories.translated_article_repository import TranslatedArticleRepository -from ...outbound.anthropic.anthropic_client import AnthropicClient -from ...outbound.deepgram.deepgram_client import LocalDeepgramClient -from ...outbound.deepl.deepl_client import DeepLClient -from ...outbound.gemini.gemini_client import GeminiClient -from ...outbound.spacy.spacy_client import SpacyClient -from ...domain.services.summarise_service import SummariseService -from ...config import settings -from ... import worker +from ...tasks import summarise_article router = APIRouter(prefix="/generate", tags=["api"]) @@ -33,30 +25,6 @@ class GenerationResponse(BaseModel): job_id: str -async def _run_generation( - job_id: uuid.UUID, - article_id: uuid.UUID, - request: GenerationRequest, -) -> None: - service = SummariseService( - anthropic_client=AnthropicClient.new(settings.anthropic_api_key), - deepgram_client=LocalDeepgramClient(settings.deepgram_api_key), - deepl_client=DeepLClient(settings.deepl_api_key), - gemini_client=GeminiClient(settings.gemini_api_key), - spacy_client=SpacyClient(), - ) - async with AsyncSessionLocal() as db: - await service.run( - db=db, - job_id=job_id, - article_id=article_id, - source_language=request.source_language, - target_language=request.target_language, - complexity_level=request.complexity_level, - input_texts=request.input_texts, - ) - - @router.post("", response_model=GenerationResponse, status_code=202) async def create_generation_job( request: GenerationRequest, @@ -88,6 +56,13 @@ async def create_generation_job( translated_article_id=uuid.UUID(article.id), ) - await worker.enqueue(partial(_run_generation, job.id, uuid.UUID(article.id), request)) + await summarise_article.defer_async( + job_id=str(job.id), + article_id=str(article.id), + source_language=request.source_language, + target_language=request.target_language, + complexity_level=request.complexity_level, + input_texts=request.input_texts, + ) return GenerationResponse(job_id=str(job.id)) diff --git a/api/app/routers/api/jobs.py b/api/app/routers/api/jobs.py index 9e89d74..b16c3ec 100644 --- a/api/app/routers/api/jobs.py +++ b/api/app/routers/api/jobs.py @@ -1,20 +1,15 @@ import uuid from datetime import datetime -from functools import partial from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel from sqlalchemy.ext.asyncio import AsyncSession from ...auth import require_admin -from ...outbound.postgres.database import get_db, AsyncSessionLocal -from ...outbound.postgres.repositories import summarise_job_repository -from ...outbound.postgres.repositories.translated_article_repository import TranslatedArticleRepository +from ...outbound.postgres.database import get_db from ...outbound.postgres.entities.translated_article_entity import TranslatedArticleEntity -from ...outbound.gemini.gemini_client import GeminiClient -from ...outbound.storage_client import get_storage_client -from ...config import settings -from ... import worker +from ...outbound.postgres.repositories import summarise_job_repository +from ...tasks import regenerate_audio_for_job router = APIRouter(prefix="/jobs", dependencies=[Depends(require_admin)]) @@ -80,31 +75,6 @@ async def get_job( ) -async def _run_regenerate_audio(job_id: uuid.UUID) -> None: - gemini_client = GeminiClient(settings.gemini_api_key) - async with AsyncSessionLocal() as db: - job = await summarise_job_repository.get_by_id(db, job_id) - article_repo = TranslatedArticleRepository(db) - article_entity = await db.get(TranslatedArticleEntity, job.translated_article_id) - await summarise_job_repository.mark_processing(db, job) - - try: - voice = gemini_client.get_voice_by_language(article_entity.target_language) - wav_bytes = await gemini_client.generate_audio(article_entity.target_body, voice) - audio_key = f"audio/{job_id}.wav" - get_storage_client().upload(audio_key, wav_bytes) - - await article_repo.update_audio( - article_entity.id, - audio_url=audio_key, - target_body_transcript=article_entity.target_body_transcript, - ) - await summarise_job_repository.mark_succeeded(db, job) - - except Exception as exc: - await summarise_job_repository.mark_failed(db, job, str(exc)) - - @router.post("/{job_id}/regenerate-audio", status_code=202) async def regenerate_audio( job_id: str, @@ -137,5 +107,5 @@ async def regenerate_audio( if job.status == "processing": raise HTTPException(status_code=409, detail="Job is already processing") - await worker.enqueue(partial(_run_regenerate_audio, uid)) + await regenerate_audio_for_job.defer_async(job_id=str(uid)) return {"job_id": job_id} diff --git a/api/app/tasks/__init__.py b/api/app/tasks/__init__.py new file mode 100644 index 0000000..9146c4f --- /dev/null +++ b/api/app/tasks/__init__.py @@ -0,0 +1,11 @@ +from .app import procrastinate_app +from .adventure import generate_adventure_entry +from .summarise import summarise_article +from .regenerate_audio import regenerate_audio_for_job + +__all__ = [ + "procrastinate_app", + "generate_adventure_entry", + "summarise_article", + "regenerate_audio_for_job", +] diff --git a/api/app/tasks/adventure.py b/api/app/tasks/adventure.py new file mode 100644 index 0000000..2dbd4d9 --- /dev/null +++ b/api/app/tasks/adventure.py @@ -0,0 +1,75 @@ +import logging +import uuid + +from ..config import settings +from ..domain.services.adventure_service import AdventureService +from ..outbound.anthropic.anthropic_client import AnthropicClient +from ..outbound.deepl.deepl_client import DeepLClient +from ..outbound.gemini.gemini_client import GeminiClient +from ..outbound.postgres.database import AsyncSessionLocal +from ..outbound.postgres.repositories.adventure_repository import ( + PostgresAdventureEntryAudioRepository, + PostgresAdventureEntryChoiceRepository, + PostgresAdventureEntryDecisionRepository, + PostgresAdventureEntryRepository, + PostgresAdventureEntryTranslationRepository, + PostgresAdventureRepository, +) +from ..outbound.spacy.spacy_client import SpacyClient +from ..outbound.stubs import ( + StubAnthropicClient, + StubDeepLClient, + StubGeminiClient, + StubSpacyClient, +) +from .app import procrastinate_app + +logger = logging.getLogger(__name__) + + +def _make_adventure_service(db) -> AdventureService: + if settings.stub_generation: + anthropic = StubAnthropicClient() # type: ignore[assignment] + gemini = StubAnthropicClient() # type: ignore[assignment] + deepl = StubDeepLClient() # type: ignore[assignment] + gemini = StubGeminiClient() # type: ignore[assignment] + spacy = StubSpacyClient() # type: ignore[assignment] + else: + anthropic = AnthropicClient.new(settings.anthropic_api_key) + gemini = GeminiClient.new(settings.gemini_api_key) + deepl = DeepLClient(settings.deepl_api_key) + gemini = GeminiClient(settings.gemini_api_key) + spacy = SpacyClient() + + gen_ai_client = gemini + if settings.story_generation_api_provider == "anthropic": + gen_ai_client = anthropic + + return AdventureService( + adventure_repo=PostgresAdventureRepository(db), + entry_repo=PostgresAdventureEntryRepository(db), + choice_repo=PostgresAdventureEntryChoiceRepository(db), + decision_repo=PostgresAdventureEntryDecisionRepository(db), + translation_repo=PostgresAdventureEntryTranslationRepository(db), + audio_repo=PostgresAdventureEntryAudioRepository(db), + gen_ai_client=gen_ai_client, + deepl_client=deepl, + gemini_client=gemini, + spacy_client=spacy, + ) + + +@procrastinate_app.task(queue="adventure_pipeline") +async def generate_adventure_entry( + adventure_id: str, entry_id: str, user_id: str +) -> None: + print( + f"Starting task for adventure_id={adventure_id}, entry_id={entry_id}, user_id={user_id}" + ) + async with AsyncSessionLocal() as db: + service = _make_adventure_service(db) + await service.run_entry_pipeline( + uuid.UUID(adventure_id), + uuid.UUID(entry_id), + uuid.UUID(user_id), + ) diff --git a/api/app/tasks/app.py b/api/app/tasks/app.py new file mode 100644 index 0000000..e42e11c --- /dev/null +++ b/api/app/tasks/app.py @@ -0,0 +1,19 @@ +from procrastinate import App, PsycopgConnector + +from ..config import settings + + +procrastinate_app = App( + connector=PsycopgConnector(conninfo=settings.procrastinate_database_url), + import_paths=[ + "app.tasks.adventure", + "app.tasks.regenerate_audio", + "app.tasks.summarise", + ], + ) + +if __name__ == "__main__": + procrastinate_app.run_worker( + queues=["adventure_pipeline", "default"] + ) + diff --git a/api/app/tasks/regenerate_audio.py b/api/app/tasks/regenerate_audio.py new file mode 100644 index 0000000..b446eb8 --- /dev/null +++ b/api/app/tasks/regenerate_audio.py @@ -0,0 +1,40 @@ +import logging +import uuid + +from ..config import settings +from ..outbound.gemini.gemini_client import GeminiClient +from ..outbound.postgres.database import AsyncSessionLocal +from ..outbound.postgres.entities.translated_article_entity import TranslatedArticleEntity +from ..outbound.postgres.repositories import summarise_job_repository +from ..outbound.postgres.repositories.translated_article_repository import TranslatedArticleRepository +from ..outbound.storage_client import get_storage_client +from .app import procrastinate_app + +logger = logging.getLogger(__name__) + + +@procrastinate_app.task(queue="default") +async def regenerate_audio_for_job(job_id: str) -> None: + uid = uuid.UUID(job_id) + gemini_client = GeminiClient(settings.gemini_api_key) + async with AsyncSessionLocal() as db: + job = await summarise_job_repository.get_by_id(db, uid) + article_repo = TranslatedArticleRepository(db) + article_entity = await db.get(TranslatedArticleEntity, job.translated_article_id) + await summarise_job_repository.mark_processing(db, job) + + try: + voice = gemini_client.get_voice_by_language(article_entity.target_language) + wav_bytes = await gemini_client.generate_audio(article_entity.target_body, voice) + audio_key = f"audio/{job_id}.wav" + get_storage_client().upload(audio_key, wav_bytes) + + await article_repo.update_audio( + article_entity.id, + audio_url=audio_key, + target_body_transcript=article_entity.target_body_transcript, + ) + await summarise_job_repository.mark_succeeded(db, job) + + except Exception as exc: + await summarise_job_repository.mark_failed(db, job, str(exc)) diff --git a/api/app/tasks/summarise.py b/api/app/tasks/summarise.py new file mode 100644 index 0000000..50f2ca3 --- /dev/null +++ b/api/app/tasks/summarise.py @@ -0,0 +1,45 @@ +import logging +import uuid + +from ..config import settings +from ..domain.services.summarise_service import SummariseService +from ..outbound.anthropic.anthropic_client import AnthropicClient +from ..outbound.deepgram.deepgram_client import LocalDeepgramClient +from ..outbound.deepl.deepl_client import DeepLClient +from ..outbound.gemini.gemini_client import GeminiClient +from ..outbound.postgres.database import AsyncSessionLocal +from ..outbound.spacy.spacy_client import SpacyClient +from .app import procrastinate_app + +logger = logging.getLogger(__name__) + + +def _make_summarise_service() -> SummariseService: + return SummariseService( + anthropic_client=AnthropicClient.new(settings.anthropic_api_key), + deepgram_client=LocalDeepgramClient(settings.deepgram_api_key), + deepl_client=DeepLClient(settings.deepl_api_key), + gemini_client=GeminiClient(settings.gemini_api_key), + spacy_client=SpacyClient(), + ) + + +@procrastinate_app.task(queue="default") +async def summarise_article( + job_id: str, + article_id: str, + source_language: str, + target_language: str, + complexity_level: str, + input_texts: list[str], +) -> None: + async with AsyncSessionLocal() as db: + await _make_summarise_service().run( + db=db, + job_id=uuid.UUID(job_id), + article_id=uuid.UUID(article_id), + source_language=source_language, + target_language=target_language, + complexity_level=complexity_level, + input_texts=input_texts, + ) diff --git a/api/app/worker.py b/api/app/worker.py deleted file mode 100644 index c0f82ef..0000000 --- a/api/app/worker.py +++ /dev/null @@ -1,22 +0,0 @@ -import asyncio -import logging -from typing import Awaitable, Callable - -logger = logging.getLogger(__name__) - -_queue: asyncio.Queue[Callable[[], Awaitable[None]]] = asyncio.Queue() - - -async def enqueue(task: Callable[[], Awaitable[None]]) -> None: - await _queue.put(task) - - -async def worker_loop() -> None: - while True: - task = await _queue.get() - try: - await task() - except Exception: - logger.exception("Unhandled error in worker task") - finally: - _queue.task_done() diff --git a/api/docs/technical-design-queue.md b/api/docs/technical-design-queue.md new file mode 100644 index 0000000..4159ab9 --- /dev/null +++ b/api/docs/technical-design-queue.md @@ -0,0 +1,466 @@ +# Technical Design: Persistent Job Queue with Procrastinate + +**Status:** Draft — drafted by LLM, reviewed by human developer. +**Scope:** Migration of adventure entry pipeline from in-process `asyncio.Queue` to Procrastinate (PostgreSQL-backed), plus groundwork for future scheduled jobs. + +--- + +## Problem + +`app/worker.py` is a plain `asyncio.Queue` running inside the API process. Its limitations: + +- **No persistence.** Any enqueued jobs are silently lost if the API process restarts or is redeployed. +- **No retry.** A transient failure (network blip calling Anthropic/DeepL/Gemini) permanently sets the entry status to `'error'`. +- **No scheduling.** We want to run nightly jobs (e.g. news digest generation) on a cron trigger. +- **Contention.** Long-running LLM + TTS + NLP pipelines share the same process and event loop as the HTTP API. + +--- + +## Solution: Procrastinate + separate worker container + +[Procrastinate](https://procrastinate.readthedocs.io) is a Python asyncio task queue backed by PostgreSQL. It uses `LISTEN/NOTIFY` for fast job dispatch with a polling fallback. No new infrastructure is needed — the existing PostgreSQL instance is the queue backing store. + +A dedicated `worker` Docker container is added to every compose file. It shares the same Docker image as `api` (same `./api` build context) but runs a different command. Both containers connect to the same PostgreSQL instance. + +``` +┌─────────────────────┐ defer_async() ┌──────────────────────┐ +│ api (FastAPI) │ ────────────────→ │ PostgreSQL │ +│ port 8000 │ │ procrastinate_jobs │ +└─────────────────────┘ │ procrastinate_events │ + └──────────────────────┘ +┌─────────────────────┐ LISTEN/NOTIFY + │ +│ worker │ ←──────────────── │ +│ (Procrastinate) │ polling fallback │ +└─────────────────────┘ └ +``` + +--- + +## Files Changed + +### New: `api/app/tasks.py` + +Single source of truth for all task definitions. Both the API (to _defer_ tasks) and the worker (to _execute_ them) import this module. + +```python +import uuid +import logging +import procrastinate +from procrastinate.contrib.sqlalchemy import SQLAlchemyAsyncConnector + +from .config import settings +from .outbound.postgres.database import engine, AsyncSessionLocal +from .outbound.anthropic.anthropic_client import AnthropicClient +from .outbound.deepl.deepl_client import DeepLClient +from .outbound.gemini.gemini_client import GeminiClient +from .outbound.spacy.spacy_client import SpacyClient +from .outbound.postgres.repositories.adventure_repository import ( + PostgresAdventureRepository, + PostgresAdventureEntryRepository, + PostgresAdventureEntryChoiceRepository, + PostgresAdventureEntryDecisionRepository, + PostgresAdventureEntryTranslationRepository, + PostgresAdventureEntryAudioRepository, +) +from .domain.services.adventure_service import AdventureService + +logger = logging.getLogger(__name__) + +procrastinate_app = procrastinate.App( + connector=SQLAlchemyAsyncConnector(engine) +) + + +def _make_adventure_service(db) -> AdventureService: + """ + Moved here from adventures.py so the worker can construct the service + without importing the router module. + """ + if settings.stub_generation: + from .routers.api.adventures import ( # avoid circular import at module level + _StubAnthropicClient, _StubDeepLClient, _StubGeminiClient, _StubSpacyClient + ) + anthropic = _StubAnthropicClient() + deepl = _StubDeepLClient() + gemini = _StubGeminiClient() + spacy = _StubSpacyClient() + else: + anthropic = AnthropicClient.new(settings.anthropic_api_key) + deepl = DeepLClient(settings.deepl_api_key) + gemini = GeminiClient(settings.gemini_api_key) + spacy = SpacyClient() + + return AdventureService( + adventure_repo=PostgresAdventureRepository(db), + entry_repo=PostgresAdventureEntryRepository(db), + choice_repo=PostgresAdventureEntryChoiceRepository(db), + decision_repo=PostgresAdventureEntryDecisionRepository(db), + translation_repo=PostgresAdventureEntryTranslationRepository(db), + audio_repo=PostgresAdventureEntryAudioRepository(db), + anthropic_client=anthropic, + deepl_client=deepl, + gemini_client=gemini, + spacy_client=spacy, + ) + + +@procrastinate_app.task(queue="adventure_pipeline") +async def generate_adventure_entry( + adventure_id: str, entry_id: str, user_id: str +) -> None: + async with AsyncSessionLocal() as db: + service = _make_adventure_service(db) + await service.run_entry_pipeline( + uuid.UUID(adventure_id), + uuid.UUID(entry_id), + uuid.UUID(user_id), + ) +``` + +**EDITOR'S NOTE** There's no good reason why the stubs should live in `routers.api.adventures`, and therefore risk circular dependencies. They should be moved to the `outbound.SERVICE_NAME` (e.g. `outbound.bunny.stub_bunny_client`). This will involve updating the dependencies in the API router. + +**Notes on retry strategy:** +`run_entry_pipeline` currently catches all exceptions internally and writes `status='error'` to the DB — it never raises. From Procrastinate's point of view the job always succeeds, so retry is not wired up in this first pass. This preserves the existing behaviour exactly. + +A follow-up improvement (out of scope here) would be to remove the internal catch-all, let exceptions propagate, and configure: + +```python +@procrastinate_app.task( + queue="adventure_pipeline", + retry=procrastinate.RetryStrategy( + max_attempts=3, + wait_minimum=30, + wait_multiplier=2, + wait_jitter=30, + ), +) +``` + +with an `on_abort` hook responsible for writing the `'error'` status after all attempts are exhausted. + +--- + +### New: `api/app/worker_main.py` + +The worker process entrypoint. The Docker `command` points here. + +```python +import asyncio +import logging +from . import tasks # side-effect: registers all task definitions + +logger = logging.getLogger(__name__) + + +async def _run() -> None: + async with tasks.procrastinate_app.open_async(): + logger.info("Procrastinate worker started, queue=adventure_pipeline") + await tasks.procrastinate_app.run_worker_async( + queues=["adventure_pipeline"], + concurrency=4, + ) + + +if __name__ == "__main__": + logging.basicConfig(level=logging.INFO) + asyncio.run(_run()) +``` + +Run command (in docker-compose): `python -m app.worker_main` + +--- + +### New: Alembic migration `YYYYMMDD_0019_add_procrastinate_schema.py` + +Procrastinate manages its own schema independently, but we embed it in Alembic to keep all DB changes in one place and ensure it runs as part of `alembic upgrade head`. + +```python +from alembic import op +import procrastinate.contrib.sqlalchemy + +def upgrade() -> None: + # Procrastinate provides its DDL via the schema manager. + # Run: `procrastinate schema --app=app.tasks.procrastinate_app print-sql` + # to get the SQL and paste it here, or use: + op.execute(procrastinate.contrib.sqlalchemy.SQLAlchemyAsyncConnector.get_schema_manager().get_schema()) + +def downgrade() -> None: + op.execute("DROP SCHEMA procrastinate CASCADE;") + # or the equivalent table-by-table drops if procrastinate uses public schema +``` + +**Alternative (simpler):** Add `await tasks.procrastinate_app.schema.apply_schema_async()` in `worker_main.py` before starting the worker. This runs Procrastinate's own migration tool, which is idempotent. It's less consistent with the project's Alembic-only convention but simpler to maintain as Procrastinate is upgraded. + +**REVIEW NOTE** - yes, let's stick to Alembic, two solutions for migration management would add complexity. + +--- + +### Modified: `api/app/main.py` + +Remove the `worker_loop` asyncio task; open the Procrastinate connector in lifespan so that `defer_async` calls from API routes work. + +```python +# Before +from . import worker + +@asynccontextmanager +async def lifespan(app: FastAPI): + init_storage() + setup_observability(app) + worker_task = asyncio.create_task(worker.worker_loop()) + yield + worker_task.cancel() + try: + await worker_task + except asyncio.CancelledError: + pass + +# After +from . import tasks + +@asynccontextmanager +async def lifespan(app: FastAPI): + async with tasks.procrastinate_app.open_async(): + init_storage() + setup_observability(app) + yield +``` + +The `import asyncio` line can be removed from `main.py` if it has no other uses after this change. + +--- + +### Modified: `api/app/routers/api/adventures.py` + +Two changes: + +1. **Remove** the `_make_service`, `_run_entry_pipeline_task`, and stub client definitions (they move to `tasks.py`). Keep `_make_service` as a thin shim that delegates to `tasks._make_adventure_service` if any remaining synchronous use in the router still needs it (e.g. the `can_translate_to` check in `create_adventure` — this needs a `DeepLClient` instance, which is currently built inline anyway, so no change needed there). + +2. **Replace `worker.enqueue(...)` with `defer_async`** in the two endpoints that trigger pipeline work: + +**`POST /adventures` (create_adventure)** + +```python +# Before +await worker.enqueue( + partial( + _run_entry_pipeline_task, uuid.UUID(adventure.id), uuid.UUID(first_entry.id), user_id + ) +) + +# After +await tasks.generate_adventure_entry.defer_async( + adventure_id=str(adventure.id), + entry_id=str(first_entry.id), + user_id=str(user_id), +) +``` + +**`POST /adventures/{adventure_id}/decisions` (record_decision)** + +```python +# Before +await worker.enqueue( + partial( + _run_entry_pipeline_task, + uuid.UUID(next_entry.adventure_id), + uuid.UUID(next_entry.id), + user_id, + ) +) + +# After +await tasks.generate_adventure_entry.defer_async( + adventure_id=str(next_entry.adventure_id), + entry_id=str(next_entry.id), + user_id=str(user_id), +) +``` + +Procrastinate task arguments must be JSON-serialisable. `uuid.UUID` objects are converted to `str` at the call site; the task function converts them back with `uuid.UUID(...)`. + +--- + +### Deleted: `api/app/worker.py` + +Removed entirely once the migration is complete. No other callers exist outside `adventures.py` and `main.py`. + +--- + +## Docker Compose Changes + +### `docker-compose.yml` (base / local dev) + +Add a `worker` service after `api`: + +```yaml +worker: + build: ./api + volumes: + - ./api:/app:z # hot-reload on code change (same as api) + command: python -m worker.main + environment: + DATABASE_URL: postgresql+asyncpg://${POSTGRES_USER:-langlearn}:${POSTGRES_PASSWORD}@db:5432/${POSTGRES_DB:-langlearn} + JWT_SECRET: ${JWT_SECRET} + ANTHROPIC_API_KEY: ${ANTHROPIC_API_KEY} + DEEPL_API_KEY: ${DEEPL_API_KEY} + DEEPGRAM_API_KEY: ${DEEPGRAM_API_KEY} + GEMINI_API_KEY: ${GEMINI_API_KEY} + PYTHONPATH: /app + STORAGE_ENDPOINT_URL: http://storage:9000 + STORAGE_ACCESS_KEY: ${STORAGE_ACCESS_KEY:-langlearn} + STORAGE_SECRET_KEY: ${STORAGE_SECRET_KEY} + STORAGE_BUCKET: ${STORAGE_BUCKET:-langlearn} + OTEL_SERVICE_NAME: ${OTEL_SERVICE_NAME:-language-learning-worker} + depends_on: + db: + condition: service_healthy + storage: + condition: service_healthy + restart: unless-stopped +``` + +The worker does not need `ports`, the Prometheus exporter config, or `API_BASE_URL`. OTEL service name is changed so traces are distinguishable in Grafana. + +### `docker-compose-dev.yml` + +Same addition as above. The `volumes: - ./api:/app:z` mount means worker code reloads on save — but note that `python -m worker.main` does **not** hot-reload automatically (unlike uvicorn). For local dev, just restart the worker container after code changes: `docker compose restart worker`. + +If hot-reload matters during development, the command can be wrapped with `watchfiles`: + +```yaml +command: watchfiles --filter python "python -m worker.main" /app +``` + +(This requires `watchfiles` in the Python dependencies.) + +### `docker-compose-prod.yml` + +Add a `worker` service. The production command does _not_ run `alembic upgrade head` because migrations are already applied by the `api` container's startup command. The worker just starts: + +```yaml +worker: + build: ./api + command: python -m worker.main + environment: + DATABASE_URL: postgresql+asyncpg://${POSTGRES_USER:-langlearn}:${POSTGRES_PASSWORD}@db:5432/${POSTGRES_DB:-langlearn} + JWT_SECRET: ${JWT_SECRET} + ANTHROPIC_API_KEY: ${ANTHROPIC_API_KEY} + DEEPL_API_KEY: ${DEEPL_API_KEY} + DEEPGRAM_API_KEY: ${DEEPGRAM_API_KEY} + GEMINI_API_KEY: ${GEMINI_API_KEY} + PYTHONPATH: /app + STORAGE_PROVIDER: bunny + BUNNY_ZONE: ${BUNNY_ZONE} + BUNNY_API_KEY: ${BUNNY_API_KEY} + BUNNY_CDN_BASE_URL: ${BUNNY_CDN_BASE_URL} + BUNNY_TOKEN_AUTH_KEY: ${BUNNY_TOKEN_AUTH_KEY} + BUNNY_STORAGE_ENDPOINT: ${BUNNY_STORAGE_ENDPOINT} + OTEL_SERVICE_NAME: ${OTEL_SERVICE_NAME:-language-learning-worker} + depends_on: + db: + condition: service_healthy + restart: unless-stopped + deploy: + resources: + limits: + cpus: "1" + memory: 1G +``` + +The worker depends on `db` only (no `storage` healthcheck needed since storage is Bunny CDN in prod, not a local container). + +### `docker-compose.test.yml` + +Add a `worker` service. Critically, it must receive `STUB_GENERATION: "true"` so it uses stub clients, matching what the API does in tests. + +```yaml +worker: + build: ./api + command: python -m worker.main + environment: + DATABASE_URL: postgresql+asyncpg://langlearn_test:testpassword@db:5432/langlearn_test + JWT_SECRET: test-jwt-secret-not-for-production + ANTHROPIC_API_KEY: test-key + DEEPL_API_KEY: test-key + DEEPGRAM_API_KEY: test-key + GEMINI_API_KEY: test-key + STORAGE_ENDPOINT_URL: http://storage:9000 + STORAGE_ACCESS_KEY: langlearn_test + STORAGE_SECRET_KEY: testpassword123 + STORAGE_BUCKET: langlearn-test + PYTHONPATH: /app + STUB_GENERATION: "true" + depends_on: + db: + condition: service_healthy + storage: + condition: service_healthy +``` + +No healthcheck needed — the worker has no HTTP endpoint, and if it starts late, pending jobs simply wait in the queue until it picks them up. + +--- + +## Testing + +### Integration tests — no changes required + +`tests/test_adventures.py` already polls with `_wait_for_adventure_status(client, id, "active", timeout=30)`. This pattern is compatible with the worker being a separate process: the test enqueues a job, the worker processes it asynchronously, and the test polls until the adventure status reflects completion. + +With stub generation, the pipeline completes in milliseconds. The 30-second timeout is more than sufficient even accounting for worker container startup time. + +The one risk is a **startup race**: if the first test creates an adventure before the worker container has opened its Procrastinate connection, the job sits in the queue unprocessed until the worker is ready. Since `docker compose up --wait` waits for containers with healthchecks to pass (i.e. `api` is healthy before tests run), and the worker starts immediately after `db` is healthy (a prerequisite already met before `api` is healthy), the worker will typically be ready before the first test fires. No action needed, but if this proves flaky in CI, adding a short `pg_isready`-style healthcheck to the worker is the fix. + +### What the tests implicitly verify after migration + +- `POST /adventures` returns `201` and adventure status is `'awaiting_first_entry'` ✓ +- Worker picks up the job and calls `run_entry_pipeline` ✓ +- Adventure transitions to `'active'`, entry to `'complete'` (polled via `_wait_for_adventure_status`) ✓ +- Decision endpoint triggers a second pipeline job ✓ + +The existing tests cover all of this already. No new tests are required for the migration itself, though a test that verifies behaviour on worker restart (jobs not lost) would be a nice addition. + +--- + +## Implementation Order + +1. DONE: Add `procrastinate` (and `procrastinate[sqlalchemy]`) to `api/pyproject.toml` / `requirements`. +2. Write Alembic migration for Procrastinate schema. Run it locally. +3. Move stub services into their `app/outbound` directories +4. Create `app/tasks.py` with the `procrastinate_app` instance and the `generate_adventure_entry` task. +5. Create `app/worker_main.py`. +6. Modify `app/main.py`: remove `worker_loop`, add `procrastinate_app.open_async()` to lifespan. +7. Modify `app/routers/api/adventures.py`: replace `worker.enqueue(...)` with `tasks.generate_adventure_entry.defer_async(...)`. +8. Add `worker` service to all four compose files. +9. Run the test suite: `docker compose -f docker-compose.test.yml up --build --wait -d && pytest`. +10. Delete `app/worker.py`. + +Steps 1–4 can be done before touching the API, so the migration can be tested end-to-end before cutting over. + +--- + +## Open Questions + +1. [ANSWERED] As mentioned above, so this small refactor, it makes sense. **Stub client placement.** The stub classes inside `adventures.py` need to be reachable from `tasks.py`. The proposal above lazy-imports them; the cleaner fix is to extract them to `app/outbound/stubs.py`. Doing this in the same PR keeps scope small but is worth doing if it avoids the circular-import smell. + +2. **Worker concurrency.** `concurrency=4` is a placeholder. Adventure pipeline jobs are I/O-heavy (network calls), not CPU-heavy, so higher concurrency is fine. Tune based on Anthropic/DeepL API rate limits. + +3. **Procrastinate schema management.** Procrastinate has its own versioned migration system (separate from Alembic). When upgrading Procrastinate in future, run `procrastinate schema --app=app.tasks.procrastinate_app migrate` (or wrap it in an Alembic migration). Don't forget this step. + +4. **Observability.** Procrastinate emits structured log lines per job. These will appear in Loki automatically. A future improvement would be to add the `job_id` to the OpenTelemetry trace context inside `generate_adventure_entry`. + +--- + +## Future: Scheduled Jobs (News Digest) + +With Procrastinate in place, cron-style jobs are first-class citizens. Once `tasks.py` exists, adding a nightly job is: + +```python +@procrastinate_app.periodic(cron="0 2 * * *") # 2am daily UTC +async def generate_nightly_news_digest() -> None: + async with AsyncSessionLocal() as db: + await NewsDigestService(db, ...).run() +``` + +The worker process runs periodic tasks automatically; no additional scheduler container is needed. Procrastinate tracks the last fire time in the `procrastinate_periodic_defers` table, so missed runs (e.g. worker was down) fire once on the next startup. diff --git a/api/pyproject.toml b/api/pyproject.toml index 61c056a..c5ecabd 100644 --- a/api/pyproject.toml +++ b/api/pyproject.toml @@ -14,7 +14,7 @@ dependencies = [ "email-validator>=2.0.0", "alembic>=1.13.0", "pydantic-settings>=2.0.0", - "google-genai>=1.0.0", + "google-genai>=2.6.0", "boto3>=1.35.0", "httpx>=0.28.1", "deepgram-sdk>=6.1.0", @@ -25,6 +25,8 @@ dependencies = [ "opentelemetry-exporter-prometheus>=0.63b1", "prometheus-client>=0.25.0", "prometheus-fastapi-instrumentator>=7.1.0", + "procrastinate>=3.8.1", + "watchfiles>=1.0.0", ] [build-system] diff --git a/api/uv.lock b/api/uv.lock index 33d7800..8488733 100644 --- a/api/uv.lock +++ b/api/uv.lock @@ -65,6 +65,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/da/42/e921fccf5015463e32a3cf6ee7f980a6ed0f395ceeaa45060b61d86486c2/anyio-4.13.0-py3-none-any.whl", hash = "sha256:08b310f9e24a9594186fd75b4f73f4a4152069e3853f1ed8bfbf58369f4ad708", size = 114353, upload-time = "2026-03-24T12:59:08.246Z" }, ] +[[package]] +name = "asgiref" +version = "3.11.1" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/63/40/f03da1264ae8f7cfdbf9146542e5e7e8100a4c66ab48e791df9a03d3f6c0/asgiref-3.11.1.tar.gz", hash = "sha256:5f184dc43b7e763efe848065441eac62229c9f7b0475f41f80e207a114eda4ce", size = 38550, upload-time = "2026-02-03T13:30:14.33Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/5c/0a/a72d10ed65068e115044937873362e6e32fab1b7dce0046aeb224682c989/asgiref-3.11.1-py3-none-any.whl", hash = "sha256:e8667a091e69529631969fd45dc268fa79b99c92c5fcdda727757e52146ec133", size = 24345, upload-time = "2026-02-03T13:30:13.039Z" }, +] + [[package]] name = "asyncpg" version = "0.31.0" @@ -81,6 +90,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/91/66/b25ccb84a246b470eb943b0107c07edcae51804912b824054b3413995a10/asyncpg-0.31.0-cp313-cp313-win_amd64.whl", hash = "sha256:dc5f2fa9916f292e5c5c8b2ac2813763bcd7f58e130055b4ad8a0531314201ab", size = 596569, upload-time = "2025-11-24T23:26:16.189Z" }, ] +[[package]] +name = "attrs" +version = "26.1.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/9a/8e/82a0fe20a541c03148528be8cac2408564a6c9a0cc7e9171802bc1d26985/attrs-26.1.0.tar.gz", hash = "sha256:d03ceb89cb322a8fd706d4fb91940737b6642aa36998fe130a9bc96c985eff32", size = 952055, upload-time = "2026-03-19T14:22:25.026Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/64/b4/17d4b0b2a2dc85a6df63d1157e028ed19f90d4cd97c36717afef2bc2f395/attrs-26.1.0-py3-none-any.whl", hash = "sha256:c647aa4a12dfbad9333ca4e71fe62ddc36f4e63b2d260a37a8b83d2f043ac309", size = 67548, upload-time = "2026-03-19T14:22:23.645Z" }, +] + [[package]] name = "blis" version = "1.3.3" @@ -232,6 +250,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/8d/e4/d66708bdf0d92fb4d49b22cdff4b10cec38aca5dcd7e81d909bb55c65cd7/confection-1.3.3-py3-none-any.whl", hash = "sha256:b9fef9ee84b237ef4611ec3eb5797b70e13063e6310ad9f15536373f5e313c82", size = 35902, upload-time = "2026-03-24T18:45:22.664Z" }, ] +[[package]] +name = "croniter" +version = "6.2.2" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "python-dateutil" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/df/de/5832661ed55107b8a09af3f0a2e71e0957226a59eb1dcf0a445cce6daf20/croniter-6.2.2.tar.gz", hash = "sha256:ba60832a5ec8e12e51b8691c3309a113d1cf6526bdf1a48150ce8ec7a532d0ab", size = 113762, upload-time = "2026-03-15T08:43:48.112Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/d0/39/783980e78cb92c2d7bdb1fc7dbc86e94ccc6d58224d76a7f1f51b6c51e30/croniter-6.2.2-py3-none-any.whl", hash = "sha256:a5d17b1060974d36251ea4faf388233eca8acf0d09cbd92d35f4c4ac8f279960", size = 45422, upload-time = "2026-03-15T08:43:46.626Z" }, +] + [[package]] name = "cryptography" version = "46.0.6" @@ -554,12 +584,21 @@ dependencies = [ { name = "fastapi" }, { name = "google-genai" }, { name = "httpx" }, + { name = "opentelemetry-api" }, + { name = "opentelemetry-exporter-prometheus" }, + { name = "opentelemetry-instrumentation-fastapi" }, + { name = "opentelemetry-instrumentation-logging" }, + { name = "opentelemetry-sdk" }, { name = "passlib" }, + { name = "procrastinate" }, + { name = "prometheus-client" }, + { name = "prometheus-fastapi-instrumentator" }, { name = "pydantic-settings" }, { name = "pyjwt" }, { name = "spacy" }, { name = "sqlalchemy", extra = ["asyncio"] }, { name = "uvicorn", extra = ["standard"] }, + { name = "watchfiles" }, ] [package.dev-dependencies] @@ -580,12 +619,21 @@ requires-dist = [ { name = "fastapi", specifier = ">=0.115.0" }, { name = "google-genai", specifier = ">=1.0.0" }, { name = "httpx", specifier = ">=0.28.1" }, + { name = "opentelemetry-api", specifier = ">=1.42.1" }, + { name = "opentelemetry-exporter-prometheus", specifier = ">=0.63b1" }, + { name = "opentelemetry-instrumentation-fastapi", specifier = ">=0.63b1" }, + { name = "opentelemetry-instrumentation-logging", specifier = ">=0.63b1" }, + { name = "opentelemetry-sdk", specifier = ">=1.42.1" }, { name = "passlib", specifier = ">=1.7.4" }, + { name = "procrastinate", specifier = ">=3.8.1" }, + { name = "prometheus-client", specifier = ">=0.25.0" }, + { name = "prometheus-fastapi-instrumentator", specifier = ">=7.1.0" }, { name = "pydantic-settings", specifier = ">=2.0.0" }, { name = "pyjwt", specifier = ">=2.10.0" }, { name = "spacy", specifier = ">=3.8.0" }, { name = "sqlalchemy", extras = ["asyncio"], specifier = ">=2.0.0" }, { name = "uvicorn", extras = ["standard"], specifier = ">=0.30.0" }, + { name = "watchfiles", specifier = ">=1.0.0" }, ] [package.metadata.requires-dev] @@ -711,6 +759,129 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/97/6a/7e345032cc60501721ef94e0e30b60f6b0bd601f9174ebd36389a2b86d40/numpy-2.4.4-cp313-cp313t-win_arm64.whl", hash = "sha256:0dfd3f9d3adbe2920b68b5cd3d51444e13a10792ec7154cd0a2f6e74d4ab3233", size = 10292002, upload-time = "2026-03-29T13:20:25.909Z" }, ] +[[package]] +name = "opentelemetry-api" +version = "1.42.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/b4/1c/125e1c936c0873796771b7f04f6c93b9f1bf5d424cea90fda94a99f61da8/opentelemetry_api-1.42.1.tar.gz", hash = "sha256:56c63bea9f77b62856be8c47600474acad853b2924b99b1687c4cb6297166716", size = 72296, upload-time = "2026-05-21T16:32:49.335Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/a3/ca/9520cc1f3dfbbd03ac5903bbf55833e257bc64b1cf30fa8b0d6df374d821/opentelemetry_api-1.42.1-py3-none-any.whl", hash = "sha256:51a69edacadbc03a8950ace1c4c21099cacc538820ac2c9e36277e78cebba714", size = 61311, upload-time = "2026-05-21T16:32:28.822Z" }, +] + +[[package]] +name = "opentelemetry-exporter-prometheus" +version = "0.63b1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "opentelemetry-api" }, + { name = "opentelemetry-sdk" }, + { name = "prometheus-client" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/ac/2a/dfeddff262b12eff0c72f4ad9e258aab8889f48c4dc1417a0377a13bc427/opentelemetry_exporter_prometheus-0.63b1.tar.gz", hash = "sha256:31902e22c89431058a95b6dcdb644f9309f226aa4872cc755f0a780d2895e97f", size = 15234, upload-time = "2026-05-21T16:32:57.797Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/2f/ec/d7c7435e9000fb69837cf7753b7cbbbdeb5d0585203daf1b6ebf8fa93e02/opentelemetry_exporter_prometheus-0.63b1-py3-none-any.whl", hash = "sha256:0efd00aa6b1939345ddcc6de141b83ebffa2b4401a37a68f880e54217602701d", size = 12466, upload-time = "2026-05-21T16:32:36.622Z" }, +] + +[[package]] +name = "opentelemetry-instrumentation" +version = "0.63b1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "opentelemetry-api" }, + { name = "opentelemetry-semantic-conventions" }, + { name = "packaging" }, + { name = "wrapt" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/da/6d/4de72d97ff54db1ed270c7a59c9b904b917c0ac7af429c086c388b824ddb/opentelemetry_instrumentation-0.63b1.tar.gz", hash = "sha256:32368d6ae52c8de20aa790a6ad86b10a76f09956092337ae37d675773990e541", size = 41081, upload-time = "2026-05-21T16:36:14.206Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/35/a1/9314e621c143e4d82a5bf7a43c2ff7a745d31023506336857607c8c543cc/opentelemetry_instrumentation-0.63b1-py3-none-any.whl", hash = "sha256:f1986716d52cc316ea5f60189098726a9071d8ecc0eee96c9ed110be08bade9c", size = 35577, upload-time = "2026-05-21T16:34:56.818Z" }, +] + +[[package]] +name = "opentelemetry-instrumentation-asgi" +version = "0.63b1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "asgiref" }, + { name = "opentelemetry-api" }, + { name = "opentelemetry-instrumentation" }, + { name = "opentelemetry-semantic-conventions" }, + { name = "opentelemetry-util-http" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/a0/b5/7ea3a9fd1b80e89786c14250bfaecf32a753c3fd08232690f4da8dc16e29/opentelemetry_instrumentation_asgi-0.63b1.tar.gz", hash = "sha256:267b422416d768f3c7f4054883b41d9c3a7c943d86d20032b738c99a3dbb5862", size = 26151, upload-time = "2026-05-21T16:36:18.368Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/57/7e/83986f27b421de04fab1e1a84e892621dac42e6432a9c66779505f4d1381/opentelemetry_instrumentation_asgi-0.63b1-py3-none-any.whl", hash = "sha256:1a22453dfa965f14799b10a674b8acbcb897a8a75c79136060af54214cc7886e", size = 15906, upload-time = "2026-05-21T16:35:04.162Z" }, +] + +[[package]] +name = "opentelemetry-instrumentation-fastapi" +version = "0.63b1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "opentelemetry-api" }, + { name = "opentelemetry-instrumentation" }, + { name = "opentelemetry-instrumentation-asgi" }, + { name = "opentelemetry-semantic-conventions" }, + { name = "opentelemetry-util-http" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/32/d6/0c128fac2e34b7d526a8d3c6edc45b875a97f8a987861b00511151b6337d/opentelemetry_instrumentation_fastapi-0.63b1.tar.gz", hash = "sha256:cc42dff56c96d0a2921510c4abab2a4c2e27fe64b26dc1254727fb550df100ba", size = 25387, upload-time = "2026-05-21T16:36:32.071Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/b1/3d/2eae63f13f36d7a8ab5bf03d06ecaf169c2069b524547f24947be6d92094/opentelemetry_instrumentation_fastapi-0.63b1-py3-none-any.whl", hash = "sha256:52ee2cde9a2ac094bdd45d79f85860e03a972928a2553006071fe61d94cf7281", size = 12795, upload-time = "2026-05-21T16:35:28.68Z" }, +] + +[[package]] +name = "opentelemetry-instrumentation-logging" +version = "0.63b1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "opentelemetry-api" }, + { name = "opentelemetry-instrumentation" }, + { name = "opentelemetry-semantic-conventions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/d1/cf/119381b1ae446fb07921a452e3a8e1887aa87f9856225f9829958dc20063/opentelemetry_instrumentation_logging-0.63b1.tar.gz", hash = "sha256:aa57d1bcb8931186b5dde565e9c17c572cf02412572d962da5b1a17ee5637d2c", size = 19823, upload-time = "2026-05-21T16:36:37.276Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/b6/71/1ba447311adf33023be14a1a309852c4cf74219f095d0055a54c1824d9ff/opentelemetry_instrumentation_logging-0.63b1-py3-none-any.whl", hash = "sha256:6b3aac8d18bc897468814d5ce4ed00f9d43588c583b4ba2288267e191b96d944", size = 15993, upload-time = "2026-05-21T16:35:35.851Z" }, +] + +[[package]] +name = "opentelemetry-sdk" +version = "1.42.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "opentelemetry-api" }, + { name = "opentelemetry-semantic-conventions" }, + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/40/f7/b390bd9bfd703bf98a68fea1f27786c6872331fd617164a54b8a59bdc008/opentelemetry_sdk-1.42.1.tar.gz", hash = "sha256:8c834e8f8c9ba4171d4ec843d0cb8a67e4c7394d3f9e9297e582cbd9456ddbf7", size = 239262, upload-time = "2026-05-21T16:33:04.641Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/8f/6b/4287766cfbde577ae2272e8884abac325aeaac0d64f41c61d5b8cc595105/opentelemetry_sdk-1.42.1-py3-none-any.whl", hash = "sha256:083cd4bbfaa5aa7b5a9e552430d9951219967cfb27aa61feb13a77aba1fc839d", size = 170907, upload-time = "2026-05-21T16:32:45.894Z" }, +] + +[[package]] +name = "opentelemetry-semantic-conventions" +version = "0.63b1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "opentelemetry-api" }, + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/93/99/4d7dd6df64795951413ce6e815f8cf1eb191daf7196ae86574589643d5f3/opentelemetry_semantic_conventions-0.63b1.tar.gz", hash = "sha256:3daf963611334b365e98a57438183eb012d3bfb40b2d931a9af613476b8701a9", size = 148340, upload-time = "2026-05-21T16:33:05.455Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/cb/7a/7fe66f5f3682b1dd47d88cc4e11f1c6c0966b737de2d16671146e23c39a5/opentelemetry_semantic_conventions-0.63b1-py3-none-any.whl", hash = "sha256:dfe5ef4dee82586b746f522b818ceb298d00b3d59f660042bd79404bff8d0682", size = 203713, upload-time = "2026-05-21T16:32:47.016Z" }, +] + +[[package]] +name = "opentelemetry-util-http" +version = "0.63b1" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/6c/d8/7bf5e4cec0578ac3c28c18eb7b88f34279139cbc8c568d6aa02b9c5ae53e/opentelemetry_util_http-0.63b1.tar.gz", hash = "sha256:ba1268f00922ee522dba2ae38458060f99486e7385a8056985901ca9685adfff", size = 11102, upload-time = "2026-05-21T16:36:56.675Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/e5/f1/34e047e8f6a3c67e5220acf1af7b9f62868c25d77791bca74457bd2180a6/opentelemetry_util_http-0.63b1-py3-none-any.whl", hash = "sha256:6284194028c59cd439f8acfe388145069a6127f11dc077e1344a2094adacc3f8", size = 8205, upload-time = "2026-05-21T16:36:09.736Z" }, +] + [[package]] name = "packaging" version = "26.0" @@ -758,6 +929,75 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/ef/a7/32a4903019d936a2316fdd330bedddac287ac26326107d24fb76a1fbc60a/preshed-3.0.13-cp313-cp313-win_arm64.whl", hash = "sha256:35d6c5acb3ee3b12b87a551913063f0cec784055c2af16e028c19fe875f079d0", size = 108497, upload-time = "2026-03-23T08:56:55.816Z" }, ] +[[package]] +name = "procrastinate" +version = "3.8.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "asgiref" }, + { name = "attrs" }, + { name = "croniter" }, + { name = "packaging" }, + { name = "psycopg", extra = ["pool"] }, + { name = "python-dateutil" }, + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/8f/cd/cbb88b0f19fa94e8a610af2fd3844e96b70591f4263ef4c36f10e4ebe4e2/procrastinate-3.8.1.tar.gz", hash = "sha256:cf7f11dfd4247daa166e9b61a211f9d5b70512d86eccc2bf4298f6ad182a32fa", size = 85343, upload-time = "2026-04-08T06:24:21.385Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/c5/ef/05a54e7ef9328d3d91a1a3b84ccf08a578128a48c57cd1117d1fbd8e6f17/procrastinate-3.8.1-py3-none-any.whl", hash = "sha256:67db4e9f0243c45775c02a0090fb3bfc7877d496e6b279d960d9ad4b1fa2f185", size = 148736, upload-time = "2026-04-08T06:24:19.754Z" }, +] + +[[package]] +name = "prometheus-client" +version = "0.25.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/1b/fb/d9aa83ffe43ce1f19e557c0971d04b90561b0cfd50762aafb01968285553/prometheus_client-0.25.0.tar.gz", hash = "sha256:5e373b75c31afb3c86f1a52fa1ad470c9aace18082d39ec0d2f918d11cc9ba28", size = 86035, upload-time = "2026-04-09T19:53:42.359Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/8d/9b/d4b1e644385499c8346fa9b622a3f030dce14cd6ef8a1871c221a17a67e7/prometheus_client-0.25.0-py3-none-any.whl", hash = "sha256:d5aec89e349a6ec230805d0df882f3807f74fd6c1a2fa86864e3c2279059fed1", size = 64154, upload-time = "2026-04-09T19:53:41.324Z" }, +] + +[[package]] +name = "prometheus-fastapi-instrumentator" +version = "7.1.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "prometheus-client" }, + { name = "starlette" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/69/6d/24d53033cf93826aa7857699a4450c1c67e5b9c710e925b1ed2b320c04df/prometheus_fastapi_instrumentator-7.1.0.tar.gz", hash = "sha256:be7cd61eeea4e5912aeccb4261c6631b3f227d8924542d79eaf5af3f439cbe5e", size = 20220, upload-time = "2025-03-19T19:35:05.351Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/27/72/0824c18f3bc75810f55dacc2dd933f6ec829771180245ae3cc976195dec0/prometheus_fastapi_instrumentator-7.1.0-py3-none-any.whl", hash = "sha256:978130f3c0bb7b8ebcc90d35516a6fe13e02d2eb358c8f83887cdef7020c31e9", size = 19296, upload-time = "2025-03-19T19:35:04.323Z" }, +] + +[[package]] +name = "psycopg" +version = "3.3.4" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "tzdata", marker = "sys_platform == 'win32'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/db/2f/cb91e5502ec9de1de6f1b76cfbf69531932725361168bb06963620c77e2e/psycopg-3.3.4.tar.gz", hash = "sha256:e21207764952cff81b6b8bdacad9a3939f2793367fdac2987b3aac36a651b5bc", size = 165799, upload-time = "2026-05-01T23:31:55.179Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/5c/e0/7b3dee031daae7743609ce3c746565d4a3ed7c2c186479eb48e34e838c64/psycopg-3.3.4-py3-none-any.whl", hash = "sha256:b6bbc25ccf05c8fad3b061d9db2ef0909a555171b84b07f29458a447253d679a", size = 213001, upload-time = "2026-05-01T23:20:50.816Z" }, +] + +[package.optional-dependencies] +pool = [ + { name = "psycopg-pool" }, +] + +[[package]] +name = "psycopg-pool" +version = "3.3.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/90/82/7a23d26039827ecd4ebe93905651029ddd307c5182ad59296dfb6f67b528/psycopg_pool-3.3.1.tar.gz", hash = "sha256:b10b10b7a175d5cc1592147dc5b7eec8a9e0834eb3ed2c4a92c858e2f51eb63c", size = 31661, upload-time = "2026-05-01T23:31:59.809Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/37/ed/89c2c620af0e1660354cd8aabf9f5b21f911597ce22acb37c805d6c86bc8/psycopg_pool-3.3.1-py3-none-any.whl", hash = "sha256:2af5b432941c4c9ad5c87b3fa410aec910ec8f7c122855897983a06c45f2e4b5", size = 40023, upload-time = "2026-05-01T23:31:53.136Z" }, +] + [[package]] name = "pyasn1" version = "0.6.3" @@ -1120,14 +1360,14 @@ wheels = [ [[package]] name = "starlette" -version = "1.0.0" +version = "0.52.1" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "anyio" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/81/69/17425771797c36cded50b7fe44e850315d039f28b15901ab44839e70b593/starlette-1.0.0.tar.gz", hash = "sha256:6a4beaf1f81bb472fd19ea9b918b50dc3a77a6f2e190a12954b25e6ed5eea149", size = 2655289, upload-time = "2026-03-22T18:29:46.779Z" } +sdist = { url = "https://files.pythonhosted.org/packages/c4/68/79977123bb7be889ad680d79a40f339082c1978b5cfcf62c2d8d196873ac/starlette-0.52.1.tar.gz", hash = "sha256:834edd1b0a23167694292e94f597773bc3f89f362be6effee198165a35d62933", size = 2653702, upload-time = "2026-01-18T13:34:11.062Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/0b/c9/584bc9651441b4ba60cc4d557d8a547b5aff901af35bda3a4ee30c819b82/starlette-1.0.0-py3-none-any.whl", hash = "sha256:d3ec55e0bb321692d275455ddfd3df75fff145d009685eb40dc91fc66b03d38b", size = 72651, upload-time = "2026-03-22T18:29:45.111Z" }, + { url = "https://files.pythonhosted.org/packages/81/0d/13d1d239a25cbfb19e740db83143e95c772a1fe10202dda4b76792b114dd/starlette-0.52.1-py3-none-any.whl", hash = "sha256:0029d43eb3d273bc4f83a08720b4912ea4b071087a3b48db01b7c839f7954d74", size = 74272, upload-time = "2026-01-18T13:34:09.188Z" }, ] [[package]] @@ -1217,6 +1457,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/dc/9b/47798a6c91d8bdb567fe2698fe81e0c6b7cb7ef4d13da4114b41d239f65d/typing_inspection-0.4.2-py3-none-any.whl", hash = "sha256:4ed1cacbdc298c220f1bd249ed5287caa16f34d44ef4e9c3d0cbad5b521545e7", size = 14611, upload-time = "2025-10-01T02:14:40.154Z" }, ] +[[package]] +name = "tzdata" +version = "2026.2" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/ba/19/1b9b0e29f30c6d35cb345486df41110984ea67ae69dddbc0e8a100999493/tzdata-2026.2.tar.gz", hash = "sha256:9173fde7d80d9018e02a662e168e5a2d04f87c41ea174b139fbef642eda62d10", size = 198254, upload-time = "2026-04-24T15:22:08.651Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/ce/e4/dccd7f47c4b64213ac01ef921a1337ee6e30e8c6466046018326977efd95/tzdata-2026.2-py2.py3-none-any.whl", hash = "sha256:bbe9af844f658da81a5f95019480da3a89415801f6cc966806612cc7169bffe7", size = 349321, upload-time = "2026-04-24T15:22:05.876Z" }, +] + [[package]] name = "urllib3" version = "2.6.3" diff --git a/api/worker/main.py b/api/worker/main.py new file mode 100644 index 0000000..ac84773 --- /dev/null +++ b/api/worker/main.py @@ -0,0 +1,8 @@ +# Importing the tasks package registers all tasks with the procrastinate app. +from app.tasks import procrastinate_app as app + +if __name__ == "__main__": + print("Starting worker...") + app.run_worker(queues=["adventure_pipeline", "default"], name="worker-1") +else: + print("not starting worker, since __name__ is not '__main__'") \ No newline at end of file diff --git a/docker-compose-dev.yml b/docker-compose-dev.yml index cb4df06..f14d1b0 100644 --- a/docker-compose-dev.yml +++ b/docker-compose-dev.yml @@ -42,9 +42,10 @@ services: - ./api:/app:z ports: - "${API_PORT:-8000}:8000" - command: uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload + command: sh -c "alembic upgrade head && uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload" environment: DATABASE_URL: postgresql+asyncpg://${POSTGRES_USER:-langlearn}:${POSTGRES_PASSWORD}@db:5432/${POSTGRES_DB:-langlearn} + PROCRASTINATE_DATABASE_URL: postgresql://${POSTGRES_USER:-langlearn}:${POSTGRES_PASSWORD}@db:5432/${POSTGRES_DB:-langlearn} ADMIN_USER_EMAILS: ${ADMIN_USER_EMAILS:-wilson@thomaswilson.xyz} API_BASE_URL: ${API_BASE_URL:-http://localhost:8000} JWT_SECRET: ${JWT_SECRET} @@ -69,6 +70,34 @@ services: condition: service_healthy restart: unless-stopped + worker: + build: ./api + volumes: + - ./api:/app:z + command: watchfiles --filter python "python -m worker.main" /app + environment: + DATABASE_URL: postgresql+asyncpg://${POSTGRES_USER:-langlearn}:${POSTGRES_PASSWORD}@db:5432/${POSTGRES_DB:-langlearn} + PROCRASTINATE_DATABASE_URL: postgresql://${POSTGRES_USER:-langlearn}:${POSTGRES_PASSWORD}@db:5432/${POSTGRES_DB:-langlearn} + JWT_SECRET: ${JWT_SECRET} + ANTHROPIC_API_KEY: ${ANTHROPIC_API_KEY} + DEEPL_API_KEY: ${DEEPL_API_KEY} + DEEPGRAM_API_KEY: ${DEEPGRAM_API_KEY} + GEMINI_API_KEY: ${GEMINI_API_KEY} + STORY_GENERATION_API_PROVIDER: ${STORY_GENERATION_API_PROVIDER:-"anthropic"} + PYTHONPATH: /app + STORAGE_PROVIDER: local + STORAGE_ENDPOINT_URL: http://storage:9000 + STORAGE_ACCESS_KEY: ${STORAGE_ACCESS_KEY:-langlearn} + STORAGE_SECRET_KEY: ${STORAGE_SECRET_KEY} + STORAGE_BUCKET: ${STORAGE_BUCKET:-langlearn} + OTEL_SERVICE_NAME: ${OTEL_SERVICE_NAME:-language-learning-worker} + depends_on: + db: + condition: service_healthy + storage: + condition: service_healthy + restart: unless-stopped + frontend: build: context: ./frontend diff --git a/docker-compose-prod.yml b/docker-compose-prod.yml index c2b3157..235d5ee 100644 --- a/docker-compose-prod.yml +++ b/docker-compose-prod.yml @@ -65,6 +65,34 @@ services: cpus: "1" memory: 1G + worker: + build: ./api + command: python -m worker.main + environment: + DATABASE_URL: postgresql+asyncpg://${POSTGRES_USER:-langlearn}:${POSTGRES_PASSWORD}@db:5432/${POSTGRES_DB:-langlearn} + JWT_SECRET: ${JWT_SECRET} + ANTHROPIC_API_KEY: ${ANTHROPIC_API_KEY} + DEEPL_API_KEY: ${DEEPL_API_KEY} + DEEPGRAM_API_KEY: ${DEEPGRAM_API_KEY} + GEMINI_API_KEY: ${GEMINI_API_KEY} + PYTHONPATH: /app + STORAGE_PROVIDER: bunny + BUNNY_ZONE: ${BUNNY_ZONE} + BUNNY_API_KEY: ${BUNNY_API_KEY} + BUNNY_CDN_BASE_URL: ${BUNNY_CDN_BASE_URL} + BUNNY_TOKEN_AUTH_KEY: ${BUNNY_TOKEN_AUTH_KEY} + BUNNY_STORAGE_ENDPOINT: ${BUNNY_STORAGE_ENDPOINT} + OTEL_SERVICE_NAME: ${OTEL_SERVICE_NAME:-language-learning-worker} + depends_on: + db: + condition: service_healthy + restart: unless-stopped + deploy: + resources: + limits: + cpus: "1" + memory: 1G + frontend: build: context: ./frontend diff --git a/docker-compose.test.yml b/docker-compose.test.yml index 809170f..d27ecc9 100644 --- a/docker-compose.test.yml +++ b/docker-compose.test.yml @@ -22,7 +22,11 @@ services: tmpfs: - /data healthcheck: - test: ["CMD-SHELL", "curl -sf http://localhost:9000/minio/health/live || exit 1"] + test: + [ + "CMD-SHELL", + "curl -sf http://localhost:9000/minio/health/live || exit 1", + ] interval: 5s timeout: 5s retries: 10 @@ -53,8 +57,30 @@ services: healthcheck: test: - "CMD-SHELL" - - "python -c \"import urllib.request; urllib.request.urlopen('http://localhost:8000/health')\"" + - 'python -c "import urllib.request; urllib.request.urlopen(''http://localhost:8000/health'')"' interval: 5s timeout: 5s retries: 20 start_period: 10s + + worker: + build: ./api + command: python -m worker.main + environment: + DATABASE_URL: postgresql+asyncpg://langlearn_test:testpassword@db:5432/langlearn_test + JWT_SECRET: test-jwt-secret-not-for-production + ANTHROPIC_API_KEY: test-key + DEEPL_API_KEY: test-key + DEEPGRAM_API_KEY: test-key + GEMINI_API_KEY: test-key + STORAGE_ENDPOINT_URL: http://storage:9000 + STORAGE_ACCESS_KEY: langlearn_test + STORAGE_SECRET_KEY: testpassword123 + STORAGE_BUCKET: langlearn-test + PYTHONPATH: /app + STUB_GENERATION: "true" + depends_on: + db: + condition: service_healthy + storage: + condition: service_healthy diff --git a/docker-compose.yml b/docker-compose.yml index 4f530a7..1be0e6f 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -27,20 +27,25 @@ services: volumes: - storagedata:/data healthcheck: - test: ["CMD-SHELL", "curl -sf http://localhost:9000/minio/health/live || exit 1"] + test: + [ + "CMD-SHELL", + "curl -sf http://localhost:9000/minio/health/live || exit 1", + ] interval: 5s timeout: 5s retries: 10 api: build: ./api - volumes: + volumes: - ./api:/app:z ports: - "${API_PORT:-8000}:8000" command: uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload environment: DATABASE_URL: postgresql+asyncpg://${POSTGRES_USER:-langlearn}:${POSTGRES_PASSWORD}@db:5432/${POSTGRES_DB:-langlearn} + PROCRASTINATEDATABASE_URL: postgresql://${POSTGRES_USER:-langlearn}:${POSTGRES_PASSWORD}@db:5432/${POSTGRES_DB:-langlearn} ADMIN_USER_EMAILS: ${ADMIN_USER_EMAILS:-wilson@thomaswilson.xyz} API_BASE_URL: ${API_BASE_URL:-http://localhost:8000} JWT_SECRET: ${JWT_SECRET} @@ -64,6 +69,32 @@ services: condition: service_healthy restart: unless-stopped + worker: + build: ./api + volumes: + - ./api:/app:z + command: watchfiles --filter python "python -m worker.main" /app + environment: + DATABASE_URL: postgresql+asyncpg://${POSTGRES_USER:-langlearn}:${POSTGRES_PASSWORD}@db:5432/${POSTGRES_DB:-langlearn} + PROCRASTINATEDATABASE_URL: postgresql://${POSTGRES_USER:-langlearn}:${POSTGRES_PASSWORD}@db:5432/${POSTGRES_DB:-langlearn} + JWT_SECRET: ${JWT_SECRET} + ANTHROPIC_API_KEY: ${ANTHROPIC_API_KEY} + DEEPL_API_KEY: ${DEEPL_API_KEY} + DEEPGRAM_API_KEY: ${DEEPGRAM_API_KEY} + GEMINI_API_KEY: ${GEMINI_API_KEY} + PYTHONPATH: /app + STORAGE_ENDPOINT_URL: http://storage:9000 + STORAGE_ACCESS_KEY: ${STORAGE_ACCESS_KEY:-langlearn} + STORAGE_SECRET_KEY: ${STORAGE_SECRET_KEY} + STORAGE_BUCKET: ${STORAGE_BUCKET:-langlearn} + OTEL_SERVICE_NAME: ${OTEL_SERVICE_NAME:-language-learning-worker} + depends_on: + db: + condition: service_healthy + storage: + condition: service_healthy + restart: unless-stopped + frontend: build: context: ./frontend diff --git a/frontend/src/routes/app/+page.svelte b/frontend/src/routes/app/+page.svelte index 9fb5c45..fee76c3 100644 --- a/frontend/src/routes/app/+page.svelte +++ b/frontend/src/routes/app/+page.svelte @@ -23,18 +23,13 @@
Your reading library
- -Browse your library of French articles and generated readings. Tap any word for a definition @@ -49,24 +44,6 @@