language-learning-app/api/docs/technical-design-queue.md
wilson fecb5839ea
feats: use Procrastinate for persistant jobs; try using Gemini for text
generation
2026-05-27 18:45:52 +01:00


Technical Design: Persistent Job Queue with Procrastinate

Status: Draft (drafted by LLM, reviewed by human developer).

Scope: Migration of the adventure entry pipeline from an in-process asyncio.Queue to Procrastinate (PostgreSQL-backed), plus groundwork for future scheduled jobs.


Problem

app/worker.py wraps a plain asyncio.Queue that runs inside the API process. Its limitations:

  • No persistence. Any enqueued jobs are silently lost if the API process restarts or is redeployed.
  • No retry. A transient failure (network blip calling Anthropic/DeepL/Gemini) permanently sets the entry status to 'error'.
  • No scheduling. We want to run nightly jobs (e.g. news digest generation) on a cron trigger.
  • Contention. Long-running LLM + TTS + NLP pipelines share the same process and event loop as the HTTP API.
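The persistence problem can be reproduced without any of the project's code. The sketch below is a stdlib-only illustration, not the contents of app/worker.py: `simulate_restart` is a hypothetical name, and the "restart" is simulated by cancelling the worker task part-way through the backlog.

```python
import asyncio


async def simulate_restart(jobs: list[str], restart_after: int) -> tuple[list[str], list[str]]:
    """Process jobs from an in-memory queue, then 'restart' after N of them.

    Returns (processed, lost). restart_after must be between 1 and len(jobs).
    """
    queue: asyncio.Queue[str] = asyncio.Queue()
    for job in jobs:
        queue.put_nowait(job)

    processed: list[str] = []
    restart = asyncio.Event()

    async def worker_loop() -> None:
        while True:
            job = await queue.get()
            processed.append(job)
            if len(processed) == restart_after:
                restart.set()  # the redeploy happens at this point
            await asyncio.sleep(0)  # yield to the event loop, as real I/O would

    worker_task = asyncio.create_task(worker_loop())
    await restart.wait()
    worker_task.cancel()  # process shutdown: the in-memory queue dies with it
    try:
        await worker_task
    except asyncio.CancelledError:
        pass

    # Whatever is still in the queue was never written anywhere durable.
    lost = [queue.get_nowait() for _ in range(queue.qsize())]
    return processed, lost


if __name__ == "__main__":
    done, lost = asyncio.run(simulate_restart(["entry-1", "entry-2", "entry-3"], 1))
    print("processed:", done, "lost:", lost)
```

With a PostgreSQL-backed queue the unprocessed rows would still be in the jobs table after the restart, which is the property this migration is after.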

Solution: Procrastinate + separate worker container

Procrastinate is a Python asyncio task queue backed by PostgreSQL. It uses LISTEN/NOTIFY for fast job dispatch with a polling fallback. No new infrastructure is needed — the existing PostgreSQL instance is the queue backing store.

A dedicated worker Docker container is added to every compose file. It shares the same Docker image as api (same ./api build context) but runs a different command. Both containers connect to the same PostgreSQL instance.

┌─────────────────────┐   defer_async()   ┌───────────────────────┐
│  api  (FastAPI)     │ ────────────────→ │  PostgreSQL           │
│  port 8000          │                   │  procrastinate_jobs   │
└─────────────────────┘                   │  procrastinate_events │
                                          │                       │
┌─────────────────────┐  LISTEN/NOTIFY +  │                       │
│  worker             │ ←──────────────── │                       │
│  (Procrastinate)    │  polling fallback │                       │
└─────────────────────┘                   └───────────────────────┘
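The "notify with polling fallback" dispatch shown in the diagram can be illustrated with a small stdlib sketch. This is not Procrastinate's implementation: the `asyncio.Event` stands in for a PostgreSQL NOTIFY arriving on a LISTEN channel, and `wait_for_work` and `polling_interval` are hypothetical names.

```python
import asyncio


async def wait_for_work(notified: asyncio.Event, polling_interval: float) -> str:
    """Block until a notification arrives or the polling interval elapses.

    Returns "notify" or "poll" so the caller can see why it woke up. In both
    cases the worker would then query the jobs table for work.
    """
    try:
        await asyncio.wait_for(notified.wait(), timeout=polling_interval)
    except asyncio.TimeoutError:
        return "poll"  # no notification seen: fall back to checking the table
    notified.clear()
    return "notify"


async def demo() -> list[str]:
    notified = asyncio.Event()
    notified.set()  # a job was deferred just before the worker started waiting
    first = await wait_for_work(notified, polling_interval=5.0)
    second = await wait_for_work(notified, polling_interval=0.01)
    return [first, second]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The fallback matters because notifications are only delivered to connections that are listening at the time; a worker that was disconnected still finds pending jobs on its next poll.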

Files Changed

New: api/app/tasks.py

Single source of truth for all task definitions. Both the API (to defer tasks) and the worker (to execute them) import this module.

import uuid
import logging
import procrastinate
from procrastinate.contrib.sqlalchemy import SQLAlchemyAsyncConnector

from .config import settings
from .outbound.postgres.database import engine, AsyncSessionLocal
from .outbound.anthropic.anthropic_client import AnthropicClient
from .outbound.deepl.deepl_client import DeepLClient
from .outbound.gemini.gemini_client import GeminiClient
from .outbound.spacy.spacy_client import SpacyClient
from .outbound.postgres.repositories.adventure_repository import (
    PostgresAdventureRepository,
    PostgresAdventureEntryRepository,
    PostgresAdventureEntryChoiceRepository,
    PostgresAdventureEntryDecisionRepository,
    PostgresAdventureEntryTranslationRepository,
    PostgresAdventureEntryAudioRepository,
)
from .domain.services.adventure_service import AdventureService

logger = logging.getLogger(__name__)

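# NOTE (verify before implementing): the SQLAlchemy connector documented by
# Procrastinate is SQLAlchemyPsycopg2Connector, which is synchronous and meant
# for deferring only. If SQLAlchemyAsyncConnector is not available in the pinned
# version, the worker needs an async connector such as procrastinate.PsycopgConnector.
procrastinate_app = procrastinate.App(
    connector=SQLAlchemyAsyncConnector(engine)
)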


def _make_adventure_service(db) -> AdventureService:
    """
    Moved here from adventures.py so the worker can construct the service
    without importing the router module.
    """
    if settings.stub_generation:
        from .routers.api.adventures import (   # avoid circular import at module level
            _StubAnthropicClient, _StubDeepLClient, _StubGeminiClient, _StubSpacyClient
        )
        anthropic = _StubAnthropicClient()
        deepl    = _StubDeepLClient()
        gemini   = _StubGeminiClient()
        spacy    = _StubSpacyClient()
    else:
        anthropic = AnthropicClient.new(settings.anthropic_api_key)
        deepl     = DeepLClient(settings.deepl_api_key)
        gemini    = GeminiClient(settings.gemini_api_key)
        spacy     = SpacyClient()

    return AdventureService(
        adventure_repo=PostgresAdventureRepository(db),
        entry_repo=PostgresAdventureEntryRepository(db),
        choice_repo=PostgresAdventureEntryChoiceRepository(db),
        decision_repo=PostgresAdventureEntryDecisionRepository(db),
        translation_repo=PostgresAdventureEntryTranslationRepository(db),
        audio_repo=PostgresAdventureEntryAudioRepository(db),
        anthropic_client=anthropic,
        deepl_client=deepl,
        gemini_client=gemini,
        spacy_client=spacy,
    )


@procrastinate_app.task(queue="adventure_pipeline")
async def generate_adventure_entry(
    adventure_id: str, entry_id: str, user_id: str
) -> None:
    async with AsyncSessionLocal() as db:
        service = _make_adventure_service(db)
        await service.run_entry_pipeline(
            uuid.UUID(adventure_id),
            uuid.UUID(entry_id),
            uuid.UUID(user_id),
        )

EDITOR'S NOTE: There is no good reason for the stubs to live in routers.api.adventures, where they risk circular imports. They should move to the matching outbound.SERVICE_NAME package (e.g. outbound.bunny.stub_bunny_client), which also means updating the dependencies in the API router.

Notes on retry strategy:
run_entry_pipeline currently catches all exceptions internally and writes status='error' to the DB — it never raises. From Procrastinate's point of view the job always succeeds, so retry is not wired up in this first pass. This preserves the existing behaviour exactly.
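Why the catch-all defeats retries can be shown with a stdlib-only sketch. Nothing here is Procrastinate's API: `run_with_retry` is a hypothetical stand-in for a queue's retry loop, and the two pipeline functions are simplified stand-ins for run_entry_pipeline with and without its internal exception handler.

```python
import asyncio


class TransientError(Exception):
    """Stand-in for a network blip when calling an external API."""


def make_flaky_call(failures: int):
    """Return an async call that fails `failures` times, then succeeds."""
    remaining = failures

    async def call() -> str:
        nonlocal remaining
        if remaining > 0:
            remaining -= 1
            raise TransientError("temporary failure")
        return "ok"

    return call


async def swallowing_pipeline(call, statuses: list[str]) -> None:
    try:
        await call()
        statuses.append("complete")
    except Exception:
        statuses.append("error")  # the queue never sees the failure


async def propagating_pipeline(call, statuses: list[str]) -> None:
    await call()  # exceptions reach the caller, so a retry is possible
    statuses.append("complete")


async def run_with_retry(job, max_attempts: int) -> int:
    """Run job() up to max_attempts (>= 1) times; return the attempts used."""
    for attempt in range(1, max_attempts + 1):
        try:
            await job()
            return attempt
        except TransientError:
            if attempt == max_attempts:
                raise
    raise ValueError("max_attempts must be at least 1")


async def demo(pipeline) -> tuple[int, list[str]]:
    statuses: list[str] = []
    call = make_flaky_call(failures=1)
    attempts = await run_with_retry(lambda: pipeline(call, statuses), max_attempts=3)
    return attempts, statuses
```

With one transient failure, the swallowing variant finishes after a single attempt with status 'error', while the propagating variant is retried and ends as 'complete'.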

A follow-up improvement (out of scope here) would be to remove the internal catch-all, let exceptions propagate, and configure:

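@procrastinate_app.task(
    queue="adventure_pipeline",
    retry=procrastinate.RetryStrategy(
        max_attempts=3,
        wait=30,  # constant 30 s between attempts
        # linear_wait / exponential_wait give growing delays instead; jitter is
        # not a built-in RetryStrategy parameter and would need a custom strategy.
    ),
)

with the 'error' status written only after all attempts are exhausted. The mechanism for this is still to be decided: the on_abort hook originally proposed here should be checked against the Procrastinate API before relying on it. One alternative is to inspect context.job.attempts inside the task (with pass_context=True) and write the status before the final re-raise.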


New: api/app/worker_main.py

The worker process entrypoint. The Docker command points here.

import asyncio
import logging
from . import tasks  # side-effect: registers all task definitions

logger = logging.getLogger(__name__)


async def _run() -> None:
    async with tasks.procrastinate_app.open_async():
        logger.info("Procrastinate worker started, queue=adventure_pipeline")
        await tasks.procrastinate_app.run_worker_async(
            queues=["adventure_pipeline"],
            concurrency=4,
        )


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    asyncio.run(_run())

Run command (in docker-compose): python -m app.worker_main


New: Alembic migration YYYYMMDD_0019_add_procrastinate_schema.py

Procrastinate manages its own schema independently, but we embed it in Alembic to keep all DB changes in one place and ensure it runs as part of alembic upgrade head.

from alembic import op
from procrastinate.schema import SchemaManager


def upgrade() -> None:
    # SchemaManager.get_schema() returns Procrastinate's bundled schema SQL as a string.
    # The same SQL can be printed with
    #   procrastinate --app=app.tasks.procrastinate_app schema --read
    # and pasted here instead, which pins the DDL to the version installed today.
    op.execute(SchemaManager.get_schema())


def downgrade() -> None:
    # Only valid if the objects were created in a dedicated "procrastinate" schema.
    # By default they land in the connection's current schema (usually public),
    # in which case drop the procrastinate_* tables, types and functions individually.
    op.execute("DROP SCHEMA procrastinate CASCADE;")

Alternative (simpler): Add await tasks.procrastinate_app.schema_manager.apply_schema_async() in worker_main.py before starting the worker. This applies Procrastinate's own bundled schema; whether it can safely be re-run against an already-initialised database should be verified before relying on it at every startup. It's less consistent with the project's Alembic-only convention but simpler to maintain as Procrastinate is upgraded.

REVIEW NOTE - yes, let's stick to Alembic, two solutions for migration management would add complexity.


Modified: api/app/main.py

Remove the worker_loop asyncio task; open the Procrastinate connector in lifespan so that defer_async calls from API routes work.

# Before
from . import worker

@asynccontextmanager
async def lifespan(app: FastAPI):
    init_storage()
    setup_observability(app)
    worker_task = asyncio.create_task(worker.worker_loop())
    yield
    worker_task.cancel()
    try:
        await worker_task
    except asyncio.CancelledError:
        pass

# After
from . import tasks

@asynccontextmanager
async def lifespan(app: FastAPI):
    async with tasks.procrastinate_app.open_async():
        init_storage()
        setup_observability(app)
        yield

The import asyncio line can be removed from main.py if it has no other uses after this change.


Modified: api/app/routers/api/adventures.py

Two changes:

  1. Remove the _make_service, _run_entry_pipeline_task, and stub client definitions: the service factory moves to tasks.py as _make_adventure_service, and the stubs move out of the router (see the editor's note above). If any remaining code in the router still needs a service instance, keep _make_service as a thin shim that delegates to tasks._make_adventure_service. The can_translate_to check in create_adventure is not such a case: it needs only a DeepLClient instance, which is already built inline, so no change is needed there.

  2. Replace worker.enqueue(...) with defer_async in the two endpoints that trigger pipeline work:

POST /adventures (create_adventure)

# Before
await worker.enqueue(
    partial(
        _run_entry_pipeline_task, uuid.UUID(adventure.id), uuid.UUID(first_entry.id), user_id
    )
)

# After
await tasks.generate_adventure_entry.defer_async(
    adventure_id=str(adventure.id),
    entry_id=str(first_entry.id),
    user_id=str(user_id),
)

POST /adventures/{adventure_id}/decisions (record_decision)

# Before
await worker.enqueue(
    partial(
        _run_entry_pipeline_task,
        uuid.UUID(next_entry.adventure_id),
        uuid.UUID(next_entry.id),
        user_id,
    )
)

# After
await tasks.generate_adventure_entry.defer_async(
    adventure_id=str(next_entry.adventure_id),
    entry_id=str(next_entry.id),
    user_id=str(user_id),
)

Procrastinate task arguments must be JSON-serialisable. uuid.UUID objects are converted to str at the call site; the task function converts them back with uuid.UUID(...).
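The round trip described above can be checked in isolation. The sketch below uses only the standard library; `to_job_args` and `from_job_args` are hypothetical helper names that mirror what the call site and the task body do, not functions that exist in the codebase.

```python
import json
import uuid


def to_job_args(adventure_id: uuid.UUID, entry_id: uuid.UUID, user_id: uuid.UUID) -> str:
    """Serialise task arguments the way a JSON-backed queue has to store them."""
    return json.dumps(
        {
            "adventure_id": str(adventure_id),
            "entry_id": str(entry_id),
            "user_id": str(user_id),
        }
    )


def from_job_args(payload: str) -> tuple[uuid.UUID, uuid.UUID, uuid.UUID]:
    """Rebuild the UUID objects inside the task, as generate_adventure_entry does."""
    args = json.loads(payload)
    return (
        uuid.UUID(args["adventure_id"]),
        uuid.UUID(args["entry_id"]),
        uuid.UUID(args["user_id"]),
    )


if __name__ == "__main__":
    ids = (uuid.uuid4(), uuid.uuid4(), uuid.uuid4())
    assert from_job_args(to_job_args(*ids)) == ids

    # Passing a raw UUID is what the str(...) conversion at the call site avoids.
    try:
        json.dumps({"adventure_id": ids[0]})
    except TypeError as exc:
        print("not JSON-serialisable:", exc)
```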


Deleted: api/app/worker.py

Removed entirely once the migration is complete. No other callers exist outside adventures.py and main.py.


Docker Compose Changes

docker-compose.yml (base / local dev)

Add a worker service after api:

worker:
  build: ./api
  volumes:
    - ./api:/app:z # hot-reload on code change (same as api)
  command: python -m app.worker_main
  environment:
    DATABASE_URL: postgresql+asyncpg://${POSTGRES_USER:-langlearn}:${POSTGRES_PASSWORD}@db:5432/${POSTGRES_DB:-langlearn}
    JWT_SECRET: ${JWT_SECRET}
    ANTHROPIC_API_KEY: ${ANTHROPIC_API_KEY}
    DEEPL_API_KEY: ${DEEPL_API_KEY}
    DEEPGRAM_API_KEY: ${DEEPGRAM_API_KEY}
    GEMINI_API_KEY: ${GEMINI_API_KEY}
    PYTHONPATH: /app
    STORAGE_ENDPOINT_URL: http://storage:9000
    STORAGE_ACCESS_KEY: ${STORAGE_ACCESS_KEY:-langlearn}
    STORAGE_SECRET_KEY: ${STORAGE_SECRET_KEY}
    STORAGE_BUCKET: ${STORAGE_BUCKET:-langlearn}
    OTEL_SERVICE_NAME: ${OTEL_SERVICE_NAME:-language-learning-worker}
  depends_on:
    db:
      condition: service_healthy
    storage:
      condition: service_healthy
  restart: unless-stopped

The worker does not need ports, the Prometheus exporter config, or API_BASE_URL. OTEL service name is changed so traces are distinguishable in Grafana.

docker-compose-dev.yml

Same addition as above. The volumes: - ./api:/app:z mount makes code changes visible inside the container, but python -m app.worker_main does not reload automatically (unlike uvicorn). For local dev, restart the worker container after code changes: docker compose restart worker.

If hot-reload matters during development, the command can be wrapped with watchfiles:

command: watchfiles --filter python "python -m app.worker_main" /app

(This requires watchfiles in the Python dependencies.)

docker-compose-prod.yml

Add a worker service. The production command does not run alembic upgrade head because migrations are already applied by the api container's startup command. The worker just starts:

worker:
  build: ./api
  command: python -m app.worker_main
  environment:
    DATABASE_URL: postgresql+asyncpg://${POSTGRES_USER:-langlearn}:${POSTGRES_PASSWORD}@db:5432/${POSTGRES_DB:-langlearn}
    JWT_SECRET: ${JWT_SECRET}
    ANTHROPIC_API_KEY: ${ANTHROPIC_API_KEY}
    DEEPL_API_KEY: ${DEEPL_API_KEY}
    DEEPGRAM_API_KEY: ${DEEPGRAM_API_KEY}
    GEMINI_API_KEY: ${GEMINI_API_KEY}
    PYTHONPATH: /app
    STORAGE_PROVIDER: bunny
    BUNNY_ZONE: ${BUNNY_ZONE}
    BUNNY_API_KEY: ${BUNNY_API_KEY}
    BUNNY_CDN_BASE_URL: ${BUNNY_CDN_BASE_URL}
    BUNNY_TOKEN_AUTH_KEY: ${BUNNY_TOKEN_AUTH_KEY}
    BUNNY_STORAGE_ENDPOINT: ${BUNNY_STORAGE_ENDPOINT}
    OTEL_SERVICE_NAME: ${OTEL_SERVICE_NAME:-language-learning-worker}
  depends_on:
    db:
      condition: service_healthy
  restart: unless-stopped
  deploy:
    resources:
      limits:
        cpus: "1"
        memory: 1G

The worker depends on db only (no storage healthcheck needed since storage is Bunny CDN in prod, not a local container).

docker-compose.test.yml

Add a worker service. Critically, it must receive STUB_GENERATION: "true" so it uses stub clients, matching what the API does in tests.

worker:
  build: ./api
  command: python -m app.worker_main
  environment:
    DATABASE_URL: postgresql+asyncpg://langlearn_test:testpassword@db:5432/langlearn_test
    JWT_SECRET: test-jwt-secret-not-for-production
    ANTHROPIC_API_KEY: test-key
    DEEPL_API_KEY: test-key
    DEEPGRAM_API_KEY: test-key
    GEMINI_API_KEY: test-key
    STORAGE_ENDPOINT_URL: http://storage:9000
    STORAGE_ACCESS_KEY: langlearn_test
    STORAGE_SECRET_KEY: testpassword123
    STORAGE_BUCKET: langlearn-test
    PYTHONPATH: /app
    STUB_GENERATION: "true"
  depends_on:
    db:
      condition: service_healthy
    storage:
      condition: service_healthy

No healthcheck needed — the worker has no HTTP endpoint, and if it starts late, pending jobs simply wait in the queue until it picks them up.


Testing

Integration tests — no changes required

tests/test_adventures.py already polls with _wait_for_adventure_status(client, id, "active", timeout=30). This pattern is compatible with the worker being a separate process: the test enqueues a job, the worker processes it asynchronously, and the test polls until the adventure status reflects completion.

With stub generation, the pipeline completes in milliseconds. The 30-second timeout is more than sufficient even accounting for worker container startup time.

The one risk is a startup race: if the first test creates an adventure before the worker container has opened its Procrastinate connection, the job sits in the queue unprocessed until the worker is ready. Since docker compose up --wait waits for containers with healthchecks to pass (i.e. api is healthy before tests run), and the worker starts immediately after db is healthy (a prerequisite already met before api is healthy), the worker will typically be ready before the first test fires. No action needed, but if this proves flaky in CI, adding a short pg_isready-style healthcheck to the worker is the fix.
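The polling pattern the tests rely on can be sketched as follows. The real _wait_for_adventure_status is not shown in this document, so `wait_for_status`, `fetch_status`, and the `interval` parameter are hypothetical; only the shape (poll until the expected status or a timeout) is taken from the description above.

```python
import asyncio
import time
from typing import Awaitable, Callable


async def wait_for_status(
    fetch_status: Callable[[], Awaitable[str]],
    expected: str,
    timeout: float = 30.0,
    interval: float = 0.5,
) -> bool:
    """Poll fetch_status() until it returns `expected` or `timeout` elapses.

    Returns True on success and False on timeout. A job that sits in the queue
    while the worker is still starting simply costs a few extra polls.
    """
    deadline = time.monotonic() + timeout
    while True:
        if await fetch_status() == expected:
            return True
        if time.monotonic() >= deadline:
            return False
        await asyncio.sleep(interval)
```

In a test, `fetch_status` would wrap the GET request for the adventure and return its status field. Because the helper only observes the database state through the API, it behaves the same whether the job ran in-process or in a separate worker container.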

What the tests implicitly verify after migration

  • POST /adventures returns 201 and adventure status is 'awaiting_first_entry'
  • Worker picks up the job and calls run_entry_pipeline
  • Adventure transitions to 'active', entry to 'complete' (polled via _wait_for_adventure_status) ✓
  • Decision endpoint triggers a second pipeline job ✓

The existing tests cover all of this already. No new tests are required for the migration itself, though a test that verifies behaviour on worker restart (jobs not lost) would be a nice addition.


Implementation Order

  1. DONE: Add procrastinate (and procrastinate[sqlalchemy]) to api/pyproject.toml / requirements.
  2. Write Alembic migration for Procrastinate schema. Run it locally.
  3. Move stub services into their app/outbound directories.
  4. Create app/tasks.py with the procrastinate_app instance and the generate_adventure_entry task.
  5. Create app/worker_main.py.
  6. Modify app/main.py: remove worker_loop, add procrastinate_app.open_async() to lifespan.
  7. Modify app/routers/api/adventures.py: replace worker.enqueue(...) with tasks.generate_adventure_entry.defer_async(...).
  8. Add worker service to all four compose files.
  9. Run the test suite: docker compose -f docker-compose.test.yml up --build --wait -d && pytest.
  10. Delete app/worker.py.

Steps 1 to 5 can be done before the API is switched over to the new queue, so the worker side can be tested end-to-end before cutting over.


Open Questions

  1. [ANSWERED] Stub client placement. The stub classes inside adventures.py need to be reachable from tasks.py. The proposal above lazy-imports them; the cleaner fix is to extract them into the outbound packages (for example app/outbound/stubs.py, or one stub module per service as the editor's note suggests). Doing this in the same PR widens the scope slightly, but it is worth it to avoid the circular-import smell. Answer: as noted above, this small refactor makes sense, so do it.

  2. Worker concurrency. concurrency=4 is a placeholder. Adventure pipeline jobs are I/O-heavy (network calls), not CPU-heavy, so higher concurrency is fine. Tune based on Anthropic/DeepL API rate limits.

  3. Procrastinate schema management. Procrastinate versions its own schema separately from Alembic and ships SQL migration scripts with each release (procrastinate --app=app.tasks.procrastinate_app schema --migrations-path prints where they are installed). When upgrading Procrastinate in future, apply the new scripts, ideally wrapped in an Alembic migration. Don't forget this step.

  4. Observability. Procrastinate emits structured log lines per job. These will appear in Loki automatically. A future improvement would be to add the job_id to the OpenTelemetry trace context inside generate_adventure_entry.
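For open question 2, the effect of a concurrency cap on simultaneous outbound calls can be illustrated with a stdlib sketch. This is not how Procrastinate implements its concurrency setting; `run_jobs` is a hypothetical helper and the sleep stands in for an Anthropic or DeepL request.

```python
import asyncio


async def run_jobs(n_jobs: int, concurrency: int) -> int:
    """Run n_jobs I/O-bound jobs with at most `concurrency` in flight.

    Returns the peak number of jobs that were running at the same time, which
    is also the peak number of simultaneous calls to the external APIs.
    """
    semaphore = asyncio.Semaphore(concurrency)
    in_flight = 0
    peak = 0

    async def job() -> None:
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stand-in for a network call
            in_flight -= 1

    await asyncio.gather(*(job() for _ in range(n_jobs)))
    return peak


if __name__ == "__main__":
    print("peak with concurrency=4:", asyncio.run(run_jobs(10, 4)))
```

Because the jobs spend their time waiting on the network, raising the cap costs little CPU; the practical ceiling is whatever the upstream rate limits tolerate.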


Future: Scheduled Jobs (News Digest)

With Procrastinate in place, cron-style jobs are first-class citizens. Once tasks.py exists, adding a nightly job is:

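@procrastinate_app.periodic(cron="0 2 * * *")   # 2am daily UTC
@procrastinate_app.task   # its queue must be one the worker listens on
async def generate_nightly_news_digest(timestamp: int) -> None:
    # Procrastinate passes the scheduled time as a Unix timestamp.
    async with AsyncSessionLocal() as db:
        await NewsDigestService(db, ...).run()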

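The worker process runs periodic tasks automatically; no additional scheduler container is needed. Procrastinate records each scheduled run in the procrastinate_periodic_defers table, so a given run is deferred only once even when several workers are running. A run missed while the worker was down is caught up only if the worker comes back within the periodic max_delay window (10 minutes by default; confirm against the pinned Procrastinate version). Older runs are skipped.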
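To make the schedule concrete, the sketch below computes when a daily 02:00 UTC job next fires. It uses only the standard library and does not parse cron expressions or call Procrastinate; `next_daily_run` is a hypothetical helper, and reading "0 2 * * *" as 02:00 UTC follows the comment in the snippet above.

```python
from datetime import datetime, time, timedelta, timezone


def next_daily_run(now: datetime, hour: int = 2) -> datetime:
    """Return the next time strictly after `now` at hour:00 UTC.

    `now` must be timezone-aware.
    """
    now_utc = now.astimezone(timezone.utc)
    candidate = datetime.combine(now_utc.date(), time(hour, tzinfo=timezone.utc))
    if candidate <= now_utc:
        candidate += timedelta(days=1)  # today's slot has already passed
    return candidate


if __name__ == "__main__":
    now = datetime(2026, 5, 27, 17, 45, tzinfo=timezone.utc)
    print(next_daily_run(now).isoformat())
```

A worker that starts at 17:45 UTC would therefore defer the first digest job a little over eight hours later.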