# Technical Design: Persistent Job Queue with Procrastinate
**Status:** Draft — drafted by LLM, reviewed by human developer.
**Scope:** Migration of adventure entry pipeline from in-process `asyncio.Queue` to Procrastinate (PostgreSQL-backed), plus groundwork for future scheduled jobs.
---
## Problem
`app/worker.py` is a plain `asyncio.Queue` running inside the API process. Its limitations:
- **No persistence.** Any enqueued jobs are silently lost if the API process restarts or is redeployed.
- **No retry.** A transient failure (network blip calling Anthropic/DeepL/Gemini) permanently sets the entry status to `'error'`.
- **No scheduling.** We want to run nightly jobs (e.g. news digest generation) on a cron trigger.
- **Contention.** Long-running LLM + TTS + NLP pipelines share the same process and event loop as the HTTP API.
---
## Solution: Procrastinate + separate worker container
[Procrastinate](https://procrastinate.readthedocs.io) is a Python asyncio task queue backed by PostgreSQL. It uses `LISTEN/NOTIFY` for fast job dispatch with a polling fallback. No new infrastructure is needed — the existing PostgreSQL instance is the queue backing store.
A dedicated `worker` Docker container is added to every compose file. It shares the same Docker image as `api` (same `./api` build context) but runs a different command. Both containers connect to the same PostgreSQL instance.
```
┌─────────────────────┐  defer_async()    ┌──────────────────────┐
│ api (FastAPI)       │ ────────────────→ │ PostgreSQL           │
│ port 8000           │                   │ procrastinate_jobs   │
└─────────────────────┘                   │ procrastinate_events │
                                          └──────────┬───────────┘
┌─────────────────────┐  LISTEN/NOTIFY +             │
│ worker              │ ←────────────────────────────┘
│ (Procrastinate)     │  polling fallback
└─────────────────────┘
```
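
The dispatch behaviour in the diagram (wake on a notification, otherwise fall back to polling) can be illustrated with a small in-process simulation. This is only a sketch of the idea using `asyncio.Event`; it does not use Procrastinate or PostgreSQL, and `wait_for_job_signal` and `demo` are hypothetical names introduced for illustration. The first wake-up in `demo` comes from the timeout (a poll), the second from the notification.

```python
import asyncio


async def wait_for_job_signal(notified: asyncio.Event, polling_interval: float) -> str:
    """Wait for a notification, falling back to a poll after `polling_interval` seconds.

    Returns "notify" if the event fired first, "poll" if the timeout elapsed.
    """
    try:
        await asyncio.wait_for(notified.wait(), timeout=polling_interval)
    except asyncio.TimeoutError:
        return "poll"
    notified.clear()
    return "notify"


async def demo() -> list[str]:
    notified = asyncio.Event()
    wakeups = []

    # No notification arrives: the worker wakes up through the polling fallback.
    wakeups.append(await wait_for_job_signal(notified, polling_interval=0.05))

    # A notification arrives well before the timeout: the worker wakes up early.
    asyncio.get_running_loop().call_later(0.01, notified.set)
    wakeups.append(await wait_for_job_signal(notified, polling_interval=5.0))
    return wakeups
```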
---
## Files Changed
### New: `api/app/tasks.py`
Single source of truth for all task definitions. Both the API (to _defer_ tasks) and the worker (to _execute_ them) import this module.
```python
import logging
import uuid

import procrastinate
from procrastinate.contrib.sqlalchemy import SQLAlchemyAsyncConnector

from .config import settings
from .outbound.postgres.database import engine, AsyncSessionLocal
from .outbound.anthropic.anthropic_client import AnthropicClient
from .outbound.deepl.deepl_client import DeepLClient
from .outbound.gemini.gemini_client import GeminiClient
from .outbound.spacy.spacy_client import SpacyClient
from .outbound.postgres.repositories.adventure_repository import (
    PostgresAdventureRepository,
    PostgresAdventureEntryRepository,
    PostgresAdventureEntryChoiceRepository,
    PostgresAdventureEntryDecisionRepository,
    PostgresAdventureEntryTranslationRepository,
    PostgresAdventureEntryAudioRepository,
)
from .domain.services.adventure_service import AdventureService

logger = logging.getLogger(__name__)

# NOTE: confirm this connector class against the installed Procrastinate release.
# The worker needs an async connector; the one Procrastinate documents for that
# is `procrastinate.PsycopgConnector`.
procrastinate_app = procrastinate.App(
    connector=SQLAlchemyAsyncConnector(engine)
)


def _make_adventure_service(db) -> AdventureService:
    """
    Moved here from adventures.py so the worker can construct the service
    without importing the router module.
    """
    if settings.stub_generation:
        from .routers.api.adventures import (  # avoid circular import at module level
            _StubAnthropicClient, _StubDeepLClient, _StubGeminiClient, _StubSpacyClient
        )
        anthropic = _StubAnthropicClient()
        deepl = _StubDeepLClient()
        gemini = _StubGeminiClient()
        spacy = _StubSpacyClient()
    else:
        anthropic = AnthropicClient.new(settings.anthropic_api_key)
        deepl = DeepLClient(settings.deepl_api_key)
        gemini = GeminiClient(settings.gemini_api_key)
        spacy = SpacyClient()
    return AdventureService(
        adventure_repo=PostgresAdventureRepository(db),
        entry_repo=PostgresAdventureEntryRepository(db),
        choice_repo=PostgresAdventureEntryChoiceRepository(db),
        decision_repo=PostgresAdventureEntryDecisionRepository(db),
        translation_repo=PostgresAdventureEntryTranslationRepository(db),
        audio_repo=PostgresAdventureEntryAudioRepository(db),
        anthropic_client=anthropic,
        deepl_client=deepl,
        gemini_client=gemini,
        spacy_client=spacy,
    )


@procrastinate_app.task(queue="adventure_pipeline")
async def generate_adventure_entry(
    adventure_id: str, entry_id: str, user_id: str
) -> None:
    # Arguments arrive as strings (JSON-serialised); convert back to UUIDs here.
    async with AsyncSessionLocal() as db:
        service = _make_adventure_service(db)
        await service.run_entry_pipeline(
            uuid.UUID(adventure_id),
            uuid.UUID(entry_id),
            uuid.UUID(user_id),
        )
```
**EDITOR'S NOTE** There's no good reason for the stubs to live in `routers.api.adventures`, where they risk circular imports. They should move to the matching `outbound.SERVICE_NAME` package (e.g. `outbound.bunny.stub_bunny_client`). This will involve updating the dependencies in the API router.
**Notes on retry strategy:**
`run_entry_pipeline` currently catches all exceptions internally and writes `status='error'` to the DB — it never raises. From Procrastinate's point of view the job always succeeds, so retry is not wired up in this first pass. This preserves the existing behaviour exactly.
A follow-up improvement (out of scope here) would be to remove the internal catch-all, let exceptions propagate, and configure:
```python
@procrastinate_app.task(
    queue="adventure_pipeline",
    retry=procrastinate.RetryStrategy(
        max_attempts=3,
        wait=30,             # fixed base delay (seconds) before each retry
        exponential_wait=2,  # adds a delay that grows exponentially per attempt
    ),
)
```
with an `on_abort` hook responsible for writing the `'error'` status after all attempts are exhausted.
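
Why the internal catch-all disables retries can be shown without Procrastinate at all. The sketch below is a plain-Python illustration, not the project's code: `run_with_retries` stands in for a queue's retry loop, and `swallowing_pipeline` / `propagating_pipeline` are hypothetical stand-ins for the current and the proposed behaviour of `run_entry_pipeline`. With the swallowing variant the retry loop sees success on the first attempt and records `'error'`; with the propagating variant the second attempt succeeds.

```python
from typing import Callable


class TransientError(Exception):
    """Stand-in for a network blip when calling an external API."""


def run_with_retries(job: Callable[[], None], max_attempts: int) -> int:
    """Call `job` until it returns without raising; return the attempts used.

    Re-raises the last TransientError once `max_attempts` is exhausted.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            job()
            return attempt
        except TransientError:
            if attempt == max_attempts:
                raise
    raise ValueError("max_attempts must be >= 1")


def make_flaky_call(failures: int) -> Callable[[], None]:
    """Return a callable that raises TransientError `failures` times, then succeeds."""
    remaining = {"n": failures}

    def call() -> None:
        if remaining["n"] > 0:
            remaining["n"] -= 1
            raise TransientError("simulated blip")

    return call


def swallowing_pipeline(call: Callable[[], None], statuses: list[str]) -> None:
    """Current behaviour: catch everything and record 'error'. Never raises."""
    try:
        call()
        statuses.append("complete")
    except Exception:
        statuses.append("error")


def propagating_pipeline(call: Callable[[], None], statuses: list[str]) -> None:
    """Proposed behaviour: let the exception reach the retry loop."""
    call()
    statuses.append("complete")
```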
---
### New: `api/app/worker_main.py`
The worker process entrypoint. The Docker `command` points here.
```python
import asyncio
import logging

from . import tasks  # side-effect: registers all task definitions

logger = logging.getLogger(__name__)


async def _run() -> None:
    async with tasks.procrastinate_app.open_async():
        logger.info("Procrastinate worker started, queue=adventure_pipeline")
        await tasks.procrastinate_app.run_worker_async(
            queues=["adventure_pipeline"],
            concurrency=4,
        )


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    asyncio.run(_run())
```
Run command (in docker-compose): `python -m app.worker_main`
---
### New: Alembic migration `YYYYMMDD_0019_add_procrastinate_schema.py`
Procrastinate manages its own schema independently, but we embed it in Alembic to keep all DB changes in one place and ensure it runs as part of `alembic upgrade head`.
```python
from alembic import op
from procrastinate.schema import SchemaManager


def upgrade() -> None:
    # Procrastinate ships its DDL as a single SQL script. To inspect it, run
    #   procrastinate --app=app.tasks.procrastinate_app schema --read
    # and paste the output here, or load it from the installed package:
    op.execute(SchemaManager.get_schema())


def downgrade() -> None:
    op.execute("DROP SCHEMA procrastinate CASCADE;")
    # or the equivalent table-by-table drops if procrastinate uses public schema
```
**Alternative (simpler):** Call `await tasks.procrastinate_app.schema_manager.apply_schema_async()` in `worker_main.py` before starting the worker. This applies Procrastinate's bundled schema directly. It is less consistent with the project's Alembic-only convention, and because it is aimed at first-time setup rather than upgrades, it would need a guard against re-running on an already-initialised database.
**REVIEW NOTE** - yes, let's stick to Alembic, two solutions for migration management would add complexity.
---
### Modified: `api/app/main.py`
Remove the `worker_loop` asyncio task; open the Procrastinate connector in lifespan so that `defer_async` calls from API routes work.
```python
# Before
from . import worker


@asynccontextmanager
async def lifespan(app: FastAPI):
    init_storage()
    setup_observability(app)
    worker_task = asyncio.create_task(worker.worker_loop())
    yield
    worker_task.cancel()
    try:
        await worker_task
    except asyncio.CancelledError:
        pass


# After
from . import tasks


@asynccontextmanager
async def lifespan(app: FastAPI):
    async with tasks.procrastinate_app.open_async():
        init_storage()
        setup_observability(app)
        yield
```
The `import asyncio` line can be removed from `main.py` if it has no other uses after this change.
---
### Modified: `api/app/routers/api/adventures.py`
Two changes:
1. **Remove** `_run_entry_pipeline_task` and the stub client definitions, and move the body of `_make_service` to `tasks.py` as `_make_adventure_service` (the stubs themselves move under `app/outbound`, per the editor's note above). If anything left in the router still needs `_make_service`, keep it as a thin shim that delegates to `tasks._make_adventure_service`. The `can_translate_to` check in `create_adventure` is not affected: it needs a `DeepLClient` instance, but that is already built inline.
2. **Replace `worker.enqueue(...)` with `defer_async`** in the two endpoints that trigger pipeline work:
**`POST /adventures` (create_adventure)**
```python
# Before
await worker.enqueue(
    partial(
        _run_entry_pipeline_task, uuid.UUID(adventure.id), uuid.UUID(first_entry.id), user_id
    )
)

# After
await tasks.generate_adventure_entry.defer_async(
    adventure_id=str(adventure.id),
    entry_id=str(first_entry.id),
    user_id=str(user_id),
)
```
**`POST /adventures/{adventure_id}/decisions` (record_decision)**
```python
# Before
await worker.enqueue(
    partial(
        _run_entry_pipeline_task,
        uuid.UUID(next_entry.adventure_id),
        uuid.UUID(next_entry.id),
        user_id,
    )
)

# After
await tasks.generate_adventure_entry.defer_async(
    adventure_id=str(next_entry.adventure_id),
    entry_id=str(next_entry.id),
    user_id=str(user_id),
)
```
Procrastinate task arguments must be JSON-serialisable. `uuid.UUID` objects are converted to `str` at the call site; the task function converts them back with `uuid.UUID(...)`.
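
The string round trip described above can be checked in isolation with the standard library. This is a minimal sketch; `to_job_kwargs` and `from_job_kwargs` are hypothetical helper names, not part of the codebase. Passing raw `uuid.UUID` objects to `json.dumps` raises `TypeError`, which is why the conversion happens at the call site.

```python
import json
import uuid


def to_job_kwargs(
    adventure_id: uuid.UUID, entry_id: uuid.UUID, user_id: uuid.UUID
) -> dict[str, str]:
    """Convert UUIDs to strings so the payload is JSON-serialisable."""
    return {
        "adventure_id": str(adventure_id),
        "entry_id": str(entry_id),
        "user_id": str(user_id),
    }


def from_job_kwargs(kwargs: dict[str, str]) -> tuple[uuid.UUID, uuid.UUID, uuid.UUID]:
    """Convert the stored strings back to UUIDs inside the task."""
    return (
        uuid.UUID(kwargs["adventure_id"]),
        uuid.UUID(kwargs["entry_id"]),
        uuid.UUID(kwargs["user_id"]),
    )


def roundtrip_through_json(kwargs: dict[str, str]) -> dict[str, str]:
    """Simulate the payload being stored as JSON and read back."""
    return json.loads(json.dumps(kwargs))
```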
---
### Deleted: `api/app/worker.py`
Removed entirely once the migration is complete. No other callers exist outside `adventures.py` and `main.py`.
---
## Docker Compose Changes
### `docker-compose.yml` (base / local dev)
Add a `worker` service after `api`:
```yaml
  worker:
    build: ./api
    volumes:
      - ./api:/app:z  # same source mount as api
    command: python -m app.worker_main
    environment:
      DATABASE_URL: postgresql+asyncpg://${POSTGRES_USER:-langlearn}:${POSTGRES_PASSWORD}@db:5432/${POSTGRES_DB:-langlearn}
      JWT_SECRET: ${JWT_SECRET}
      ANTHROPIC_API_KEY: ${ANTHROPIC_API_KEY}
      DEEPL_API_KEY: ${DEEPL_API_KEY}
      DEEPGRAM_API_KEY: ${DEEPGRAM_API_KEY}
      GEMINI_API_KEY: ${GEMINI_API_KEY}
      PYTHONPATH: /app
      STORAGE_ENDPOINT_URL: http://storage:9000
      STORAGE_ACCESS_KEY: ${STORAGE_ACCESS_KEY:-langlearn}
      STORAGE_SECRET_KEY: ${STORAGE_SECRET_KEY}
      STORAGE_BUCKET: ${STORAGE_BUCKET:-langlearn}
      OTEL_SERVICE_NAME: ${OTEL_SERVICE_NAME:-language-learning-worker}
    depends_on:
      db:
        condition: service_healthy
      storage:
        condition: service_healthy
    restart: unless-stopped
```
The worker does not need `ports`, the Prometheus exporter config, or `API_BASE_URL`. OTEL service name is changed so traces are distinguishable in Grafana.
### `docker-compose-dev.yml`
Same addition as above. The `volumes: - ./api:/app:z` mount makes code changes visible inside the container, but `python -m app.worker_main` does **not** hot-reload automatically (unlike uvicorn). For local dev, restart the worker container after code changes: `docker compose restart worker`.
If hot-reload matters during development, the command can be wrapped with `watchfiles`:
```yaml
    command: watchfiles --filter python "python -m app.worker_main" /app
```
(This requires `watchfiles` in the Python dependencies.)
### `docker-compose-prod.yml`
Add a `worker` service. The production command does _not_ run `alembic upgrade head` because migrations are already applied by the `api` container's startup command. The worker just starts:
```yaml
  worker:
    build: ./api
    command: python -m app.worker_main
    environment:
      DATABASE_URL: postgresql+asyncpg://${POSTGRES_USER:-langlearn}:${POSTGRES_PASSWORD}@db:5432/${POSTGRES_DB:-langlearn}
      JWT_SECRET: ${JWT_SECRET}
      ANTHROPIC_API_KEY: ${ANTHROPIC_API_KEY}
      DEEPL_API_KEY: ${DEEPL_API_KEY}
      DEEPGRAM_API_KEY: ${DEEPGRAM_API_KEY}
      GEMINI_API_KEY: ${GEMINI_API_KEY}
      PYTHONPATH: /app
      STORAGE_PROVIDER: bunny
      BUNNY_ZONE: ${BUNNY_ZONE}
      BUNNY_API_KEY: ${BUNNY_API_KEY}
      BUNNY_CDN_BASE_URL: ${BUNNY_CDN_BASE_URL}
      BUNNY_TOKEN_AUTH_KEY: ${BUNNY_TOKEN_AUTH_KEY}
      BUNNY_STORAGE_ENDPOINT: ${BUNNY_STORAGE_ENDPOINT}
      OTEL_SERVICE_NAME: ${OTEL_SERVICE_NAME:-language-learning-worker}
    depends_on:
      db:
        condition: service_healthy
    restart: unless-stopped
    deploy:
      resources:
        limits:
          cpus: "1"
          memory: 1G
```
The worker depends on `db` only (no `storage` healthcheck needed since storage is Bunny CDN in prod, not a local container).
### `docker-compose.test.yml`
Add a `worker` service. Critically, it must receive `STUB_GENERATION: "true"` so it uses stub clients, matching what the API does in tests.
```yaml
  worker:
    build: ./api
    command: python -m app.worker_main
    environment:
      DATABASE_URL: postgresql+asyncpg://langlearn_test:testpassword@db:5432/langlearn_test
      JWT_SECRET: test-jwt-secret-not-for-production
      ANTHROPIC_API_KEY: test-key
      DEEPL_API_KEY: test-key
      DEEPGRAM_API_KEY: test-key
      GEMINI_API_KEY: test-key
      STORAGE_ENDPOINT_URL: http://storage:9000
      STORAGE_ACCESS_KEY: langlearn_test
      STORAGE_SECRET_KEY: testpassword123
      STORAGE_BUCKET: langlearn-test
      PYTHONPATH: /app
      STUB_GENERATION: "true"
    depends_on:
      db:
        condition: service_healthy
      storage:
        condition: service_healthy
```
No healthcheck needed — the worker has no HTTP endpoint, and if it starts late, pending jobs simply wait in the queue until it picks them up.
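
The reason the worker needs `STUB_GENERATION` is that it builds its own clients from its own environment; nothing is inherited from the `api` container. A minimal sketch of that selection logic follows. All class and function names here are hypothetical, and the parsing of the flag is an assumption: the project's real settings class may accept different spellings.

```python
from typing import Mapping, Protocol


class TranslationClient(Protocol):
    def translate(self, text: str, target_lang: str) -> str: ...


class StubTranslationClient:
    """Deterministic stand-in that never touches the network."""

    def translate(self, text: str, target_lang: str) -> str:
        return f"[{target_lang}] {text}"


class RealTranslationClient:
    """Placeholder for a client that would call the external API."""

    def __init__(self, api_key: str) -> None:
        self.api_key = api_key

    def translate(self, text: str, target_lang: str) -> str:
        raise NotImplementedError("would call the external translation API")


def stub_generation_enabled(env: Mapping[str, str]) -> bool:
    # Assumption: common truthy spellings are accepted; the real settings
    # class may parse the flag differently.
    return env.get("STUB_GENERATION", "false").strip().lower() in {"1", "true", "yes"}


def make_translation_client(env: Mapping[str, str]) -> TranslationClient:
    """Pick the stub or the real client based on this process's environment."""
    if stub_generation_enabled(env):
        return StubTranslationClient()
    return RealTranslationClient(env["DEEPL_API_KEY"])
```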
---
## Testing
### Integration tests — no changes required
`tests/test_adventures.py` already polls with `_wait_for_adventure_status(client, id, "active", timeout=30)`. This pattern is compatible with the worker being a separate process: the test enqueues a job, the worker processes it asynchronously, and the test polls until the adventure status reflects completion.
With stub generation, the pipeline completes in milliseconds. The 30-second timeout is more than sufficient even accounting for worker container startup time.
The one risk is a **startup race**: if the first test creates an adventure before the worker container has opened its Procrastinate connection, the job sits in the queue unprocessed until the worker is ready. Since `docker compose up --wait` waits for containers with healthchecks to pass (i.e. `api` is healthy before tests run), and the worker starts immediately after `db` is healthy (a prerequisite already met before `api` is healthy), the worker will typically be ready before the first test fires. No action needed, but if this proves flaky in CI, adding a short `pg_isready`-style healthcheck to the worker is the fix.
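
For reference, the polling pattern the tests rely on looks roughly like the sketch below. It is not the actual `_wait_for_adventure_status` helper, whose implementation is not shown in this document; `wait_for_status` and its `fetch_status` callback are hypothetical. The key property is that a late-starting worker only delays the moment the expected status appears, as long as that happens within the timeout.

```python
import asyncio
import time
from typing import Awaitable, Callable


async def wait_for_status(
    fetch_status: Callable[[], Awaitable[str]],
    expected: str,
    timeout: float,
    interval: float = 0.1,
) -> None:
    """Poll `fetch_status` until it returns `expected` or `timeout` seconds pass."""
    deadline = time.monotonic() + timeout
    while True:
        status = await fetch_status()
        if status == expected:
            return
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"status was {status!r}, expected {expected!r} after {timeout}s"
            )
        await asyncio.sleep(interval)
```

In a real test, `fetch_status` would wrap a `GET` on the adventure and return its status field.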
### What the tests implicitly verify after migration
- `POST /adventures` returns `201` and adventure status is `'awaiting_first_entry'`
- Worker picks up the job and calls `run_entry_pipeline`
- Adventure transitions to `'active'`, entry to `'complete'` (polled via `_wait_for_adventure_status`) ✓
- Decision endpoint triggers a second pipeline job ✓
The existing tests cover all of this already. No new tests are required for the migration itself, though a test that verifies behaviour on worker restart (jobs not lost) would be a nice addition.
---
## Implementation Order
1. DONE: Add `procrastinate` (and `procrastinate[sqlalchemy]`) to `api/pyproject.toml` / `requirements`.
2. Write Alembic migration for Procrastinate schema. Run it locally.
3. Move stub services into their `app/outbound` directories.
4. Create `app/tasks.py` with the `procrastinate_app` instance and the `generate_adventure_entry` task.
5. Create `app/worker_main.py`.
6. Modify `app/main.py`: remove `worker_loop`, add `procrastinate_app.open_async()` to lifespan.
7. Modify `app/routers/api/adventures.py`: replace `worker.enqueue(...)` with `tasks.generate_adventure_entry.defer_async(...)`.
8. Add `worker` service to all four compose files.
9. Run the test suite: `docker compose -f docker-compose.test.yml up --build --wait -d && pytest`.
10. Delete `app/worker.py`.
Steps 1-4 can be done before touching the API, so the migration can be tested end-to-end before cutting over.
---
## Open Questions
1. [ANSWERED] **Stub client placement.** The stub classes inside `adventures.py` need to be reachable from `tasks.py`. The proposal above lazy-imports them; the cleaner fix is to extract them to `app/outbound/stubs.py`. Doing this in the same PR widens the scope slightly but is worth it to avoid the circular-import smell. Answer: yes, this small refactor makes sense; see the editor's note above for the preferred location (one stub module per `outbound` service package).
2. **Worker concurrency.** `concurrency=4` is a placeholder. Adventure pipeline jobs are I/O-heavy (network calls), not CPU-heavy, so higher concurrency is fine. Tune based on Anthropic/DeepL API rate limits.
3. **Procrastinate schema management.** Procrastinate versions its schema separately from Alembic and ships SQL migration scripts with each release. When upgrading Procrastinate in future, locate the scripts for the new version (`procrastinate --app=app.tasks.procrastinate_app schema --migrations-path` prints their directory) and apply them, ideally wrapped in an Alembic migration. Don't forget this step.
4. **Observability.** Procrastinate emits structured log lines per job. These will appear in Loki automatically. A future improvement would be to add the `job_id` to the OpenTelemetry trace context inside `generate_adventure_entry`.
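
To make the concurrency question (item 2) concrete: a worker concurrency of N means at most N pipelines are in flight at once, which in turn bounds the number of simultaneous calls to the external APIs. The sketch below simulates that bound with `asyncio.Semaphore`; it does not use Procrastinate, and `run_limited` is a hypothetical name. It returns the peak number of jobs that were in flight together, which never exceeds the limit.

```python
import asyncio


async def run_limited(n_jobs: int, limit: int) -> int:
    """Run `n_jobs` fake I/O-bound jobs with at most `limit` in flight.

    Returns the peak number of jobs that were running at the same time.
    """
    semaphore = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def job() -> None:
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stand-in for a network call
            in_flight -= 1

    await asyncio.gather(*(job() for _ in range(n_jobs)))
    return peak
```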
---
## Future: Scheduled Jobs (News Digest)
With Procrastinate in place, cron-style jobs are first-class citizens. Once `tasks.py` exists, adding a nightly job is:
```python
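@procrastinate_app.periodic(cron="0 2 * * *")  # 2am daily UTC
@procrastinate_app.task  # the worker must also listen on this task's queue
async def generate_nightly_news_digest(timestamp: int) -> None:
    # Procrastinate passes the scheduled time as a Unix timestamp.
    async with AsyncSessionLocal() as db:
        await NewsDigestService(db, ...).run()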
```
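The worker process runs periodic tasks automatically; no additional scheduler container is needed. Procrastinate records each deferral in the `procrastinate_periodic_defers` table, which prevents duplicate runs when several workers are up. A run missed while the worker was down is deferred on the next startup only if the worker comes back within the periodic `max_delay` window (ten minutes by default in current releases, configurable through the app's `periodic_defaults`); later than that, the run is skipped.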
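
To make the schedule concrete, the sketch below computes when `0 2 * * *` next fires for a given instant. It is a standard-library illustration of the cron expression only, not how Procrastinate evaluates schedules; `next_2am_utc` is a hypothetical helper and expects a timezone-aware `datetime`.

```python
from datetime import datetime, timedelta, timezone


def next_2am_utc(now: datetime) -> datetime:
    """Return the next 02:00 UTC strictly after `now` (cron `0 2 * * *`).

    `now` must be timezone-aware; it is converted to UTC first.
    """
    now = now.astimezone(timezone.utc)
    candidate = now.replace(hour=2, minute=0, second=0, microsecond=0)
    if candidate <= now:
        candidate += timedelta(days=1)
    return candidate
```

For example, at 01:30 UTC the next run is the same day at 02:00, while at exactly 02:00 UTC it is the following day.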