feat: [api] Add choose your own adventure functionality
Some checks are pending
/ test (push) Waiting to run

This commit is contained in:
wilson 2026-05-03 17:17:47 +01:00
parent 65b30753f0
commit 8b687e9737
10 changed files with 1792 additions and 0 deletions

View file

@ -0,0 +1,72 @@
from dataclasses import dataclass
from datetime import datetime
@dataclass
class Adventure:
id: str
user_id: str
status: str # 'awaiting_first_entry' | 'active' | 'complete' | 'error'
language: str
source_language: str
competencies: list[str]
max_entry_count: int
entry_story_text_target_length: dict # {"min": int, "max": int}
title: str
description: str | None
plot_summary: str | None
genres: list[str]
setting: list[str]
vibes: list[str]
protagonist: list[str]
created_at: datetime
deleted_at: datetime | None
@dataclass
class AdventureEntry:
id: str
adventure_id: str
generated_from_choice_id: str | None
status: str # 'generating' | 'complete' | 'error'
entry_index: int
story_text: str | None
gamemaster_notes: str | None
llm_data: dict | None
created_at: datetime
@dataclass
class AdventureEntryPossibleChoice:
id: str
entry_id: str
index: int
label: str
text: str
@dataclass
class AdventureEntryPossibleChoiceDecision:
id: str
choice_id: str
user_id: str
created_at: datetime
@dataclass
class AdventureEntryTranslation:
id: str
entry_id: str
component_type: str
target_language: str
translated_text: str
@dataclass
class AdventureEntryAudio:
id: str
entry_id: str
component_type: str
tts_provider: str
tts_options: dict | None
file_name: str

View file

@ -0,0 +1,267 @@
import logging
import uuid
from ...outbound.anthropic.adventure_prompts import (
build_conversation_messages,
build_entry_system_prompt,
build_title_system_prompt,
build_title_user_message,
parse_entry_response,
parse_title_response,
)
from ...outbound.anthropic.anthropic_client import AnthropicClient
from ...outbound.deepl.deepl_client import DeepLClient
from ...outbound.gemini.gemini_client import GeminiClient
from ...outbound.postgres.repositories.adventure_repository import (
PostgresAdventureEntryAudioRepository,
PostgresAdventureEntryChoiceRepository,
PostgresAdventureEntryDecisionRepository,
PostgresAdventureEntryRepository,
PostgresAdventureEntryTranslationRepository,
PostgresAdventureRepository,
)
from ...storage import upload_audio
from ..models.adventure import Adventure, AdventureEntry, AdventureEntryPossibleChoiceDecision
from ...languages import SUPPORTED_LANGUAGES
logger = logging.getLogger(__name__)
class AdventureService:
def __init__(
self,
adventure_repo: PostgresAdventureRepository,
entry_repo: PostgresAdventureEntryRepository,
choice_repo: PostgresAdventureEntryChoiceRepository,
decision_repo: PostgresAdventureEntryDecisionRepository,
translation_repo: PostgresAdventureEntryTranslationRepository,
audio_repo: PostgresAdventureEntryAudioRepository,
anthropic_client: AnthropicClient,
deepl_client: DeepLClient,
gemini_client: GeminiClient,
) -> None:
self.adventure_repo = adventure_repo
self.entry_repo = entry_repo
self.choice_repo = choice_repo
self.decision_repo = decision_repo
self.translation_repo = translation_repo
self.audio_repo = audio_repo
self.anthropic_client = anthropic_client
self.deepl_client = deepl_client
self.gemini_client = gemini_client
async def create_adventure_for_user(
self,
user_id: uuid.UUID,
language: str,
source_language: str,
competencies: list[str],
genres: list[str],
setting: list[str],
vibes: list[str],
protagonist: list[str],
max_entry_count: int = 6,
) -> tuple[Adventure, AdventureEntry]:
"""Creates the adventure and a placeholder for the first entry.
Returns (adventure, first_entry) so the caller can enqueue pipeline work.
"""
adventure = await self.adventure_repo.create(
user_id=user_id,
language=language,
source_language=source_language,
competencies=competencies,
genres=genres,
setting=setting,
vibes=vibes,
protagonist=protagonist,
max_entry_count=max_entry_count,
entry_story_text_target_length={"min": 700, "max": 800},
)
first_entry = await self.entry_repo.create(
adventure_id=uuid.UUID(adventure.id),
entry_index=0,
generated_from_choice_id=None,
)
return adventure, first_entry
async def record_decision_and_prepare_next_entry(
self,
adventure_id: uuid.UUID,
choice_id: uuid.UUID,
user_id: uuid.UUID,
) -> tuple[AdventureEntryPossibleChoiceDecision, AdventureEntry]:
"""Validates, records the player's decision, and creates the next entry placeholder.
Returns (decision, next_entry) so the caller can enqueue pipeline work.
Raises ValueError with keys:
'adventure_not_found' missing or not owned by this user
'adventure_not_active' e.g. complete or still generating
'choice_not_found' choice id unknown
'choice_not_in_adventure' choice belongs to a different adventure
'decision_already_made' player already chose on this entry
"""
adventure = await self.adventure_repo.get_by_id(adventure_id)
if adventure is None or adventure.user_id != str(user_id):
raise ValueError("adventure_not_found")
if adventure.status != "active":
raise ValueError("adventure_not_active")
choice = await self.choice_repo.get_by_id(choice_id)
if choice is None:
raise ValueError("choice_not_found")
entry = await self.entry_repo.get_by_id(uuid.UUID(choice.entry_id))
if entry is None or entry.adventure_id != str(adventure_id):
raise ValueError("choice_not_in_adventure")
existing = await self.decision_repo.get_for_entry_and_user(
entry_id=uuid.UUID(entry.id), user_id=user_id
)
if existing is not None:
raise ValueError("decision_already_made")
decision = await self.decision_repo.create(choice_id=choice_id, user_id=user_id)
next_entry = await self.entry_repo.create(
adventure_id=adventure_id,
entry_index=entry.entry_index + 1,
generated_from_choice_id=choice_id,
)
return decision, next_entry
async def run_entry_pipeline(
self,
adventure_id: uuid.UUID,
entry_id: uuid.UUID,
) -> None:
"""Full entry generation pipeline. Called from the worker queue.
Sequence: LLM generation parse persist translate TTS
adventure title (first entry only) update adventure status.
On any error the entry and adventure are marked 'error'.
"""
try:
adventure = await self.adventure_repo.get_by_id(adventure_id)
assert adventure is not None, f"Adventure {adventure_id} not found"
all_entries = await self.entry_repo.list_for_adventure(adventure_id)
current_entry = next(e for e in all_entries if e.id == str(entry_id))
is_first_entry = current_entry.entry_index == 0
is_final_entry = current_entry.entry_index + 1 == adventure.max_entry_count
prior_entries = await self._load_prior_entries_with_metadata(
all_entries=[e for e in all_entries if e.entry_index < current_entry.entry_index],
)
language_name = SUPPORTED_LANGUAGES.get(adventure.language, adventure.language)
competency = adventure.competencies[0] if adventure.competencies else "B1"
system_prompt = build_entry_system_prompt(
language_name=language_name,
competency=competency,
max_entry_count=adventure.max_entry_count,
min_length=adventure.entry_story_text_target_length.get("min", 700),
max_length=adventure.entry_story_text_target_length.get("max", 800),
)
messages = build_conversation_messages(
genres=adventure.genres,
setting=adventure.setting,
vibes=adventure.vibes,
protagonist=adventure.protagonist,
prior_entries=prior_entries,
)
raw_text, usage_dict = await self.anthropic_client.complete(
system_prompt=system_prompt,
messages=messages,
max_tokens=2048,
)
story_text, choices_parsed, gm_notes = parse_entry_response(raw_text)
await self.entry_repo.update_content(
entry_id=entry_id,
story_text=story_text,
gamemaster_notes=gm_notes,
llm_data=usage_dict,
status="complete",
)
if not is_final_entry:
await self.choice_repo.create_many(
entry_id=entry_id,
choices=[(i, label, text) for i, (label, text) in enumerate(choices_parsed)],
)
translated = await self.deepl_client.translate(story_text, adventure.source_language)
await self.translation_repo.create(
entry_id=entry_id,
component_type="story_text",
target_language=adventure.source_language,
translated_text=translated,
)
voice = self.gemini_client.get_voice_by_language(adventure.language)
wav_bytes = await self.gemini_client.generate_audio(story_text, voice)
audio_key = f"adventure-audio/{entry_id}.wav"
upload_audio(audio_key, wav_bytes)
await self.audio_repo.create(
entry_id=entry_id,
component_type="story_text",
tts_provider="google_gemini",
tts_options={"voice": voice},
file_name=audio_key,
)
if is_first_entry:
title_system = build_title_system_prompt()
title_user = build_title_user_message(story_text, language_name, adventure.genres)
title_raw, _ = await self.anthropic_client.complete(
system_prompt=title_system,
messages=[{"role": "user", "content": title_user}],
max_tokens=200,
)
title, description = parse_title_response(title_raw)
await self.adventure_repo.update_title_and_description(
adventure_id=adventure_id, title=title, description=description
)
new_status = "complete" if is_final_entry else "active"
await self.adventure_repo.update_status(adventure_id=adventure_id, status=new_status)
except Exception:
logger.exception("Entry pipeline failed for entry %s", entry_id)
try:
await self.entry_repo.update_status(entry_id=entry_id, status="error")
await self.adventure_repo.update_status(adventure_id=adventure_id, status="error")
except Exception:
logger.exception("Failed to mark entry/adventure as error")
async def _load_prior_entries_with_metadata(
self,
all_entries: list[AdventureEntry],
) -> list[tuple[AdventureEntry, list, str | None]]:
"""Load choices for each prior entry and determine which choice was made.
Returns a list of (entry, choices, chosen_label_or_None) tuples ready for
build_conversation_messages().
"""
sorted_entries = sorted(all_entries, key=lambda e: e.entry_index)
result = []
for i, entry in enumerate(sorted_entries):
choices = await self.choice_repo.list_for_entry(uuid.UUID(entry.id))
chosen_label: str | None = None
if i + 1 < len(sorted_entries):
next_entry = sorted_entries[i + 1]
if next_entry.generated_from_choice_id:
chosen = next(
(c for c in choices if c.id == next_entry.generated_from_choice_id),
None,
)
if chosen:
chosen_label = chosen.label
result.append((entry, choices, chosen_label))
return result

View file

@ -0,0 +1,158 @@
"""
Pure functions that translate adventure domain objects into LLM inputs and
parse LLM outputs back into domain values.
Nothing in this module makes network calls or holds state. The service layer
loads the data; these functions do the translation.
"""
import re
from ...domain.models.adventure import AdventureEntry, AdventureEntryPossibleChoice
def build_entry_system_prompt(
language_name: str,
competency: str,
max_entry_count: int,
min_length: int,
max_length: int,
) -> str:
halfway = max(1, max_entry_count // 2)
return (
f"You are an experienced tabletop game master running a single-player one-shot campaign "
f"in a \"choose your own adventure\" format.\n\n"
f"You are helping the player learn {language_name}. Your writing respects their "
f"intelligence, avoids too many clichés, delivers satisfying plot beats, and reads naturally.\n\n"
f"The session is {max_entry_count} turns. Each turn: you write a story passage, then offer "
f"4 numbered choices. The player replies with their choice; you continue accordingly. "
f"By turn {max_entry_count} there needs to be a clear end. As the player's choices reveal "
f"their character, weave those details back into the story. "
f"Don't railroad them until at least turn {halfway}.\n\n"
f"Rules:\n"
f"- Write entirely in {language_name} at {competency} level on the CEFR scale. "
f"No markdown — plaintext only.\n"
f"- Your response MUST be in exactly three parts, each separated by a line containing "
f"only \"-----\".\n"
f"- Part 1: the story entry, {min_length}{max_length} words, speaking directly to the player.\n"
f"- Part 2: exactly 4 numbered player options, one per line, labelled \"1.\", \"2.\", \"3.\", \"4.\".\n"
f"- Part 3: GM notes to your future self (hidden from the player). "
f"If no notes, write \"no notes\".\n"
f"- Your first message must establish: who the player is, the setting, and the broad direction.\n"
f"- No sexual content or graphic violence. Romance, threat, and adventure are fine (12-certificate)."
)
def build_title_system_prompt() -> str:
return (
"You are a creative writing assistant. Given the opening passage of a choose-your-own-adventure "
"story, generate a short title and a one-sentence description for it.\n\n"
"Respond with exactly two lines of plain text:\n"
"Line 1: the title (max 60 characters, no quotes or labels)\n"
"Line 2: the description (max 200 characters, no quotes or labels)"
)
def build_initial_user_message(
genres: list[str],
setting: list[str],
vibes: list[str],
protagonist: list[str],
) -> str:
return (
"Please begin the adventure with the following details:\n"
f"- Genre: {', '.join(genres)}\n"
f"- Setting: {', '.join(setting)}\n"
f"- Vibes: {', '.join(vibes)}\n"
f"- Protagonist: {', '.join(protagonist)}"
)
def build_title_user_message(
first_entry_text: str,
language_name: str,
genres: list[str],
) -> str:
return (
f"This is the opening passage of a {', '.join(genres)} adventure written in {language_name}:\n\n"
f"{first_entry_text}"
)
def reconstruct_assistant_message(
entry: AdventureEntry,
choices: list[AdventureEntryPossibleChoice],
) -> str:
"""Rebuild the original three-part LLM response from stored entry data."""
options_block = "\n".join(
f"{c.label}. {c.text}" for c in sorted(choices, key=lambda c: c.index)
)
gm_block = entry.gamemaster_notes or "no notes"
return f"{entry.story_text}\n-----\n{options_block}\n-----\n{gm_block}"
def build_conversation_messages(
genres: list[str],
setting: list[str],
vibes: list[str],
protagonist: list[str],
prior_entries: list[tuple[AdventureEntry, list[AdventureEntryPossibleChoice], str | None]],
) -> list[dict]:
"""Build the full messages array for an Anthropic API call.
prior_entries is a list of (entry, choices_for_that_entry, chosen_label_or_None).
The chosen label is the label of the option the player picked to advance past that entry.
For the most recent completed entry it will be None (no choice made yet).
"""
messages: list[dict] = [
{"role": "user", "content": build_initial_user_message(genres, setting, vibes, protagonist)}
]
for entry, choices, chosen_label in prior_entries:
messages.append(
{"role": "assistant", "content": reconstruct_assistant_message(entry, choices)}
)
if chosen_label is not None:
messages.append({"role": "user", "content": chosen_label})
return messages
def parse_entry_response(text: str) -> tuple[str, list[tuple[str, str]], str]:
"""Parse a three-part LLM entry response.
Returns (story_text, choices, gm_notes).
choices is a list of (label, text) pairs e.g. [("1", "Go into the house"), ...].
Raises ValueError if the format cannot be parsed.
"""
parts = text.split("\n-----\n")
if len(parts) < 3:
parts = text.split("-----\n")
if len(parts) < 3:
raise ValueError(f"LLM response has {len(parts)} section(s); expected 3")
story_text = parts[0].strip()
options_raw = parts[1].strip()
gm_notes = "\n-----\n".join(parts[2:]).strip()
choices: list[tuple[str, str]] = []
for line in options_raw.splitlines():
line = line.strip()
if not line:
continue
m = re.match(r"^(\d+)[.)]\s+(.+)$", line)
if m:
choices.append((m.group(1), m.group(2).strip()))
if not choices:
raise ValueError("No choices parsed from LLM response options section")
return story_text, choices, gm_notes
def parse_title_response(text: str) -> tuple[str, str]:
"""Parse a two-line title/description response.
Returns (title, description). Falls back gracefully if only one line is present.
"""
lines = [l.strip() for l in text.strip().splitlines() if l.strip()]
title = lines[0][:60] if lines else "Untitled Adventure"
description = lines[1][:200] if len(lines) > 1 else ""
return title, description

View file

@ -41,6 +41,35 @@ class AnthropicClient():
f"{source_material}"
)
async def complete(
self,
system_prompt: str,
messages: list[dict],
model: str = "claude-sonnet-4-6",
max_tokens: int = 2048,
) -> tuple[str, dict]:
"""Generic text completion.
Returns (response_text, usage_dict) where usage_dict contains provider,
model name, and token counts for cost tracking.
"""
def _call() -> tuple[str, dict]:
message = self._client.messages.create(
model=model,
max_tokens=max_tokens,
system=system_prompt,
messages=messages,
)
usage = {
"provider": "anthropic",
"model": model,
"input_tokens": message.usage.input_tokens,
"output_tokens": message.usage.output_tokens,
}
return message.content[0].text, usage
return await asyncio.to_thread(_call)
async def generate_summary_text(
self,
content_to_summarise: str,

View file

@ -0,0 +1,144 @@
import uuid
from datetime import datetime, timezone
from sqlalchemy import DateTime, ForeignKey, Integer, Text, UniqueConstraint
from sqlalchemy.dialects.postgresql import JSONB, UUID
from sqlalchemy.orm import Mapped, mapped_column
from ..database import Base
class AdventureEntity(Base):
__tablename__ = "choose_your_own_adventure"
id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
user_id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True), ForeignKey("users.id", ondelete="CASCADE"), nullable=False, index=True
)
status: Mapped[str] = mapped_column(Text, nullable=False, default="awaiting_first_entry")
language: Mapped[str] = mapped_column(Text, nullable=False)
source_language: Mapped[str] = mapped_column(Text, nullable=False)
competencies: Mapped[list] = mapped_column(JSONB, nullable=False, default=list)
max_entry_count: Mapped[int] = mapped_column(Integer, nullable=False, default=6)
entry_story_text_target_length: Mapped[dict] = mapped_column(
JSONB, nullable=False, default=lambda: {"min": 700, "max": 800}
)
title: Mapped[str] = mapped_column(Text, nullable=False, default="Untitled adventure")
description: Mapped[str | None] = mapped_column(Text, nullable=True)
plot_summary: Mapped[str | None] = mapped_column(Text, nullable=True)
genres: Mapped[list] = mapped_column(JSONB, nullable=False, default=list)
setting: Mapped[list] = mapped_column(JSONB, nullable=False, default=list)
vibes: Mapped[list] = mapped_column(JSONB, nullable=False, default=list)
protagonist: Mapped[list] = mapped_column(JSONB, nullable=False, default=list)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, default=lambda: datetime.now(timezone.utc)
)
deleted_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
class AdventureEntryEntity(Base):
__tablename__ = "choose_your_own_adventure_entry"
__table_args__ = (
UniqueConstraint("adventure_id", "entry_index", name="uq_cyoa_entry_adventure_index"),
)
id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
adventure_id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True),
ForeignKey("choose_your_own_adventure.id", ondelete="CASCADE"),
nullable=False,
index=True,
)
generated_from_choice_id: Mapped[uuid.UUID | None] = mapped_column(
UUID(as_uuid=True),
ForeignKey(
"choose_your_own_adventure_entry_possible_choice.id", ondelete="SET NULL"
),
nullable=True,
)
status: Mapped[str] = mapped_column(Text, nullable=False, default="generating")
entry_index: Mapped[int] = mapped_column(Integer, nullable=False)
story_text: Mapped[str | None] = mapped_column(Text, nullable=True)
gamemaster_notes: Mapped[str | None] = mapped_column(Text, nullable=True)
llm_data: Mapped[dict | None] = mapped_column(JSONB, nullable=True)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, default=lambda: datetime.now(timezone.utc)
)
class AdventureEntryPossibleChoiceEntity(Base):
__tablename__ = "choose_your_own_adventure_entry_possible_choice"
__table_args__ = (
UniqueConstraint("entry_id", "index", name="uq_cyoa_choice_entry_index"),
)
id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
entry_id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True),
ForeignKey("choose_your_own_adventure_entry.id", ondelete="CASCADE"),
nullable=False,
index=True,
)
index: Mapped[int] = mapped_column(Integer, nullable=False)
label: Mapped[str] = mapped_column(Text, nullable=False)
text: Mapped[str] = mapped_column(Text, nullable=False)
class AdventureEntryPossibleChoiceDecisionEntity(Base):
__tablename__ = "choose_your_own_adventure_entry_possible_choice_decision"
id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
choice_id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True),
ForeignKey(
"choose_your_own_adventure_entry_possible_choice.id", ondelete="CASCADE"
),
nullable=False,
index=True,
)
user_id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True), ForeignKey("users.id", ondelete="CASCADE"), nullable=False, index=True
)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, default=lambda: datetime.now(timezone.utc)
)
class AdventureEntryTranslationEntity(Base):
__tablename__ = "choose_your_own_adventure_entry_translation"
__table_args__ = (
UniqueConstraint(
"entry_id", "component_type", "target_language",
name="uq_cyoa_translation_entry_component_lang",
),
)
id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
entry_id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True),
ForeignKey("choose_your_own_adventure_entry.id", ondelete="CASCADE"),
nullable=False,
index=True,
)
component_type: Mapped[str] = mapped_column(Text, nullable=False, default="story_text")
target_language: Mapped[str] = mapped_column(Text, nullable=False)
translated_text: Mapped[str] = mapped_column(Text, nullable=False)
class AdventureEntryAudioEntity(Base):
__tablename__ = "choose_your_own_adventure_entry_audio"
__table_args__ = (
UniqueConstraint("entry_id", "component_type", name="uq_cyoa_audio_entry_component"),
)
id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
entry_id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True),
ForeignKey("choose_your_own_adventure_entry.id", ondelete="CASCADE"),
nullable=False,
index=True,
)
component_type: Mapped[str] = mapped_column(Text, nullable=False, default="story_text")
tts_provider: Mapped[str] = mapped_column(Text, nullable=False, default="google_gemini")
tts_options: Mapped[dict | None] = mapped_column(JSONB, nullable=True)
file_name: Mapped[str] = mapped_column(Text, nullable=False)

View file

@ -0,0 +1,403 @@
import uuid
from datetime import datetime, timezone
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from ....domain.models.adventure import (
Adventure,
AdventureEntry,
AdventureEntryAudio,
AdventureEntryPossibleChoice,
AdventureEntryPossibleChoiceDecision,
AdventureEntryTranslation,
)
from ..entities.adventure_entities import (
AdventureEntity,
AdventureEntryAudioEntity,
AdventureEntryEntity,
AdventureEntryPossibleChoiceDecisionEntity,
AdventureEntryPossibleChoiceEntity,
AdventureEntryTranslationEntity,
)
def _to_adventure(e: AdventureEntity) -> Adventure:
return Adventure(
id=str(e.id),
user_id=str(e.user_id),
status=e.status,
language=e.language,
source_language=e.source_language,
competencies=e.competencies,
max_entry_count=e.max_entry_count,
entry_story_text_target_length=e.entry_story_text_target_length,
title=e.title,
description=e.description,
plot_summary=e.plot_summary,
genres=e.genres,
setting=e.setting,
vibes=e.vibes,
protagonist=e.protagonist,
created_at=e.created_at,
deleted_at=e.deleted_at,
)
def _to_entry(e: AdventureEntryEntity) -> AdventureEntry:
return AdventureEntry(
id=str(e.id),
adventure_id=str(e.adventure_id),
generated_from_choice_id=str(e.generated_from_choice_id) if e.generated_from_choice_id else None,
status=e.status,
entry_index=e.entry_index,
story_text=e.story_text,
gamemaster_notes=e.gamemaster_notes,
llm_data=e.llm_data,
created_at=e.created_at,
)
def _to_choice(e: AdventureEntryPossibleChoiceEntity) -> AdventureEntryPossibleChoice:
return AdventureEntryPossibleChoice(
id=str(e.id),
entry_id=str(e.entry_id),
index=e.index,
label=e.label,
text=e.text,
)
def _to_decision(e: AdventureEntryPossibleChoiceDecisionEntity) -> AdventureEntryPossibleChoiceDecision:
return AdventureEntryPossibleChoiceDecision(
id=str(e.id),
choice_id=str(e.choice_id),
user_id=str(e.user_id),
created_at=e.created_at,
)
def _to_translation(e: AdventureEntryTranslationEntity) -> AdventureEntryTranslation:
return AdventureEntryTranslation(
id=str(e.id),
entry_id=str(e.entry_id),
component_type=e.component_type,
target_language=e.target_language,
translated_text=e.translated_text,
)
def _to_audio(e: AdventureEntryAudioEntity) -> AdventureEntryAudio:
return AdventureEntryAudio(
id=str(e.id),
entry_id=str(e.entry_id),
component_type=e.component_type,
tts_provider=e.tts_provider,
tts_options=e.tts_options,
file_name=e.file_name,
)
class PostgresAdventureRepository:
def __init__(self, db: AsyncSession) -> None:
self.db = db
async def create(
self,
user_id: uuid.UUID,
language: str,
source_language: str,
competencies: list[str],
genres: list[str],
setting: list[str],
vibes: list[str],
protagonist: list[str],
max_entry_count: int,
entry_story_text_target_length: dict,
) -> Adventure:
entity = AdventureEntity(
user_id=user_id,
language=language,
source_language=source_language,
competencies=competencies,
genres=genres,
setting=setting,
vibes=vibes,
protagonist=protagonist,
max_entry_count=max_entry_count,
entry_story_text_target_length=entry_story_text_target_length,
)
self.db.add(entity)
await self.db.commit()
await self.db.refresh(entity)
return _to_adventure(entity)
async def get_by_id(self, adventure_id: uuid.UUID) -> Adventure | None:
result = await self.db.execute(
select(AdventureEntity).where(AdventureEntity.id == adventure_id)
)
entity = result.scalar_one_or_none()
return _to_adventure(entity) if entity else None
async def list_for_user(self, user_id: uuid.UUID) -> list[Adventure]:
result = await self.db.execute(
select(AdventureEntity)
.where(AdventureEntity.user_id == user_id, AdventureEntity.deleted_at.is_(None))
.order_by(AdventureEntity.created_at.desc())
)
return [_to_adventure(e) for e in result.scalars().all()]
async def update_status(self, adventure_id: uuid.UUID, status: str) -> Adventure:
result = await self.db.execute(
select(AdventureEntity).where(AdventureEntity.id == adventure_id)
)
entity = result.scalar_one()
entity.status = status
await self.db.commit()
await self.db.refresh(entity)
return _to_adventure(entity)
async def update_title_and_description(
self, adventure_id: uuid.UUID, title: str, description: str
) -> Adventure:
result = await self.db.execute(
select(AdventureEntity).where(AdventureEntity.id == adventure_id)
)
entity = result.scalar_one()
entity.title = title
entity.description = description
await self.db.commit()
await self.db.refresh(entity)
return _to_adventure(entity)
async def soft_delete(self, adventure_id: uuid.UUID) -> Adventure:
result = await self.db.execute(
select(AdventureEntity).where(AdventureEntity.id == adventure_id)
)
entity = result.scalar_one()
entity.deleted_at = datetime.now(timezone.utc)
await self.db.commit()
await self.db.refresh(entity)
return _to_adventure(entity)
class PostgresAdventureEntryRepository:
def __init__(self, db: AsyncSession) -> None:
self.db = db
async def create(
self,
adventure_id: uuid.UUID,
entry_index: int,
generated_from_choice_id: uuid.UUID | None,
) -> AdventureEntry:
entity = AdventureEntryEntity(
adventure_id=adventure_id,
entry_index=entry_index,
generated_from_choice_id=generated_from_choice_id,
)
self.db.add(entity)
await self.db.commit()
await self.db.refresh(entity)
return _to_entry(entity)
async def get_by_id(self, entry_id: uuid.UUID) -> AdventureEntry | None:
result = await self.db.execute(
select(AdventureEntryEntity).where(AdventureEntryEntity.id == entry_id)
)
entity = result.scalar_one_or_none()
return _to_entry(entity) if entity else None
async def list_for_adventure(self, adventure_id: uuid.UUID) -> list[AdventureEntry]:
result = await self.db.execute(
select(AdventureEntryEntity)
.where(AdventureEntryEntity.adventure_id == adventure_id)
.order_by(AdventureEntryEntity.entry_index.asc())
)
return [_to_entry(e) for e in result.scalars().all()]
async def update_content(
self,
entry_id: uuid.UUID,
story_text: str,
gamemaster_notes: str,
llm_data: dict,
status: str,
) -> AdventureEntry:
result = await self.db.execute(
select(AdventureEntryEntity).where(AdventureEntryEntity.id == entry_id)
)
entity = result.scalar_one()
entity.story_text = story_text
entity.gamemaster_notes = gamemaster_notes
entity.llm_data = llm_data
entity.status = status
await self.db.commit()
await self.db.refresh(entity)
return _to_entry(entity)
async def update_status(self, entry_id: uuid.UUID, status: str) -> AdventureEntry:
result = await self.db.execute(
select(AdventureEntryEntity).where(AdventureEntryEntity.id == entry_id)
)
entity = result.scalar_one()
entity.status = status
await self.db.commit()
await self.db.refresh(entity)
return _to_entry(entity)
async def count_complete(self, adventure_id: uuid.UUID) -> int:
result = await self.db.execute(
select(func.count()).select_from(AdventureEntryEntity).where(
AdventureEntryEntity.adventure_id == adventure_id,
AdventureEntryEntity.status == "complete",
)
)
return result.scalar_one()
class PostgresAdventureEntryChoiceRepository:
def __init__(self, db: AsyncSession) -> None:
self.db = db
async def create_many(
self,
entry_id: uuid.UUID,
choices: list[tuple[int, str, str]], # (index, label, text)
) -> list[AdventureEntryPossibleChoice]:
entities = [
AdventureEntryPossibleChoiceEntity(
entry_id=entry_id, index=index, label=label, text=text
)
for index, label, text in choices
]
for e in entities:
self.db.add(e)
await self.db.commit()
for e in entities:
await self.db.refresh(e)
return [_to_choice(e) for e in entities]
async def get_by_id(self, choice_id: uuid.UUID) -> AdventureEntryPossibleChoice | None:
result = await self.db.execute(
select(AdventureEntryPossibleChoiceEntity).where(
AdventureEntryPossibleChoiceEntity.id == choice_id
)
)
entity = result.scalar_one_or_none()
return _to_choice(entity) if entity else None
async def list_for_entry(self, entry_id: uuid.UUID) -> list[AdventureEntryPossibleChoice]:
result = await self.db.execute(
select(AdventureEntryPossibleChoiceEntity)
.where(AdventureEntryPossibleChoiceEntity.entry_id == entry_id)
.order_by(AdventureEntryPossibleChoiceEntity.index.asc())
)
return [_to_choice(e) for e in result.scalars().all()]
class PostgresAdventureEntryDecisionRepository:
def __init__(self, db: AsyncSession) -> None:
self.db = db
async def create(
self, choice_id: uuid.UUID, user_id: uuid.UUID
) -> AdventureEntryPossibleChoiceDecision:
entity = AdventureEntryPossibleChoiceDecisionEntity(
choice_id=choice_id, user_id=user_id
)
self.db.add(entity)
await self.db.commit()
await self.db.refresh(entity)
return _to_decision(entity)
async def get_for_entry_and_user(
self, entry_id: uuid.UUID, user_id: uuid.UUID
) -> AdventureEntryPossibleChoiceDecision | None:
result = await self.db.execute(
select(AdventureEntryPossibleChoiceDecisionEntity)
.join(
AdventureEntryPossibleChoiceEntity,
AdventureEntryPossibleChoiceDecisionEntity.choice_id
== AdventureEntryPossibleChoiceEntity.id,
)
.where(
AdventureEntryPossibleChoiceEntity.entry_id == entry_id,
AdventureEntryPossibleChoiceDecisionEntity.user_id == user_id,
)
)
entity = result.scalar_one_or_none()
return _to_decision(entity) if entity else None
class PostgresAdventureEntryTranslationRepository:
def __init__(self, db: AsyncSession) -> None:
self.db = db
async def create(
self,
entry_id: uuid.UUID,
component_type: str,
target_language: str,
translated_text: str,
) -> AdventureEntryTranslation:
entity = AdventureEntryTranslationEntity(
entry_id=entry_id,
component_type=component_type,
target_language=target_language,
translated_text=translated_text,
)
self.db.add(entity)
await self.db.commit()
await self.db.refresh(entity)
return _to_translation(entity)
async def get_for_entry(
self, entry_id: uuid.UUID, component_type: str, target_language: str
) -> AdventureEntryTranslation | None:
result = await self.db.execute(
select(AdventureEntryTranslationEntity).where(
AdventureEntryTranslationEntity.entry_id == entry_id,
AdventureEntryTranslationEntity.component_type == component_type,
AdventureEntryTranslationEntity.target_language == target_language,
)
)
entity = result.scalar_one_or_none()
return _to_translation(entity) if entity else None
class PostgresAdventureEntryAudioRepository:
def __init__(self, db: AsyncSession) -> None:
self.db = db
async def create(
self,
entry_id: uuid.UUID,
component_type: str,
tts_provider: str,
tts_options: dict,
file_name: str,
) -> AdventureEntryAudio:
entity = AdventureEntryAudioEntity(
entry_id=entry_id,
component_type=component_type,
tts_provider=tts_provider,
tts_options=tts_options,
file_name=file_name,
)
self.db.add(entity)
await self.db.commit()
await self.db.refresh(entity)
return _to_audio(entity)
async def get_for_entry(
self, entry_id: uuid.UUID, component_type: str
) -> AdventureEntryAudio | None:
result = await self.db.execute(
select(AdventureEntryAudioEntity).where(
AdventureEntryAudioEntity.entry_id == entry_id,
AdventureEntryAudioEntity.component_type == component_type,
)
)
entity = result.scalar_one_or_none()
return _to_audio(entity) if entity else None

View file

@ -0,0 +1,425 @@
import io
import uuid
import wave
from functools import partial
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession
from ...auth import verify_token
from ...config import settings
from ...domain.services.adventure_service import AdventureService
from ...languages import SUPPORTED_LANGUAGES
from ...outbound.anthropic.anthropic_client import AnthropicClient
from ...outbound.deepl.deepl_client import DeepLClient
from ...outbound.gemini.gemini_client import GeminiClient
from ...outbound.postgres.database import AsyncSessionLocal, get_db
from ...outbound.postgres.repositories.adventure_repository import (
PostgresAdventureEntryAudioRepository,
PostgresAdventureEntryChoiceRepository,
PostgresAdventureEntryDecisionRepository,
PostgresAdventureEntryRepository,
PostgresAdventureEntryTranslationRepository,
PostgresAdventureRepository,
)
from ... import worker
router = APIRouter(prefix="/adventures", tags=["adventures"])
# ---------------------------------------------------------------------------
# Stub clients for the test environment (STUB_GENERATION=true)
# ---------------------------------------------------------------------------
_STUB_ENTRY_RESPONSE = (
"Vous vous retrouvez dans une ruelle sombre de Paris. "
"Une silhouette mystérieuse s'approche lentement.\n"
"-----\n"
"1. Suivez la silhouette dans l'obscurité\n"
"2. Restez dans l'ombre et observez\n"
"3. Demandez de l'aide à voix haute\n"
"4. Courez vers la lumière au bout de la ruelle\n"
"-----\n"
"no notes"
)
_STUB_TITLE_RESPONSE = (
"La Nuit Parisienne\n"
"Une aventure mystérieuse dans les rues sombres de Paris."
)
class _StubAnthropicClient:
async def complete(
self,
system_prompt: str,
messages: list[dict],
model: str = "claude-sonnet-4-6",
max_tokens: int = 2048,
) -> tuple[str, dict]:
usage = {"provider": "stub", "model": "stub", "input_tokens": 0, "output_tokens": 0}
if "game master" in system_prompt.lower():
return _STUB_ENTRY_RESPONSE, usage
return _STUB_TITLE_RESPONSE, usage
class _StubDeepLClient:
def can_translate_to(self, lang: str) -> bool:
return True
async def translate(self, text: str, to_language: str, context: str | None = None) -> str:
return f"[STUB] {text[:120]}"
class _StubGeminiClient:
def get_voice_by_language(self, lang: str) -> str:
return "Stub"
async def generate_audio(self, text: str, voice: str) -> bytes:
buf = io.BytesIO()
with wave.open(buf, "wb") as wf:
wf.setnchannels(1)
wf.setsampwidth(2)
wf.setframerate(24000)
wf.writeframes(b"\x00" * 480)
return buf.getvalue()
# ---------------------------------------------------------------------------
# Service factory
# ---------------------------------------------------------------------------
def _make_service(db: AsyncSession) -> AdventureService:
if settings.stub_generation:
anthropic = _StubAnthropicClient() # type: ignore[assignment]
deepl = _StubDeepLClient() # type: ignore[assignment]
gemini = _StubGeminiClient() # type: ignore[assignment]
else:
anthropic = AnthropicClient.new(settings.anthropic_api_key)
deepl = DeepLClient(settings.deepl_api_key)
gemini = GeminiClient(settings.gemini_api_key)
return AdventureService(
adventure_repo=PostgresAdventureRepository(db),
entry_repo=PostgresAdventureEntryRepository(db),
choice_repo=PostgresAdventureEntryChoiceRepository(db),
decision_repo=PostgresAdventureEntryDecisionRepository(db),
translation_repo=PostgresAdventureEntryTranslationRepository(db),
audio_repo=PostgresAdventureEntryAudioRepository(db),
anthropic_client=anthropic,
deepl_client=deepl,
gemini_client=gemini,
)
async def _run_entry_pipeline_task(
adventure_id: uuid.UUID, entry_id: uuid.UUID
) -> None:
async with AsyncSessionLocal() as db:
await _make_service(db).run_entry_pipeline(adventure_id, entry_id)
# ---------------------------------------------------------------------------
# Request / response models
# ---------------------------------------------------------------------------
class CreateAdventureRequest(BaseModel):
language: str
source_language: str
competencies: list[str]
genres: list[str]
setting: list[str]
vibes: list[str]
protagonist: list[str]
max_entry_count: int = 6
class AdventureResponse(BaseModel):
id: str
user_id: str
status: str
language: str
source_language: str
competencies: list[str]
max_entry_count: int
title: str
description: str | None
genres: list[str]
setting: list[str]
vibes: list[str]
protagonist: list[str]
created_at: str
class CreateDecisionRequest(BaseModel):
choice_id: str
class DecisionResponse(BaseModel):
id: str
choice_id: str
user_id: str
created_at: str
class EntryResponse(BaseModel):
id: str
adventure_id: str
generated_from_choice_id: str | None
status: str
entry_index: int
story_text: str | None
created_at: str
class ChoiceResponse(BaseModel):
id: str
index: int
label: str
text: str
class EntryDetailResponse(BaseModel):
id: str
adventure_id: str
generated_from_choice_id: str | None
status: str
entry_index: int
story_text: str | None
created_at: str
choices: list[ChoiceResponse]
translation: str | None
audio_file_name: str | None
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _to_adventure_response(adventure) -> AdventureResponse:
return AdventureResponse(
id=adventure.id,
user_id=adventure.user_id,
status=adventure.status,
language=adventure.language,
source_language=adventure.source_language,
competencies=adventure.competencies,
max_entry_count=adventure.max_entry_count,
title=adventure.title,
description=adventure.description,
genres=adventure.genres,
setting=adventure.setting,
vibes=adventure.vibes,
protagonist=adventure.protagonist,
created_at=adventure.created_at.isoformat(),
)
def _parse_adventure_id(adventure_id: str) -> uuid.UUID:
try:
return uuid.UUID(adventure_id)
except ValueError:
raise HTTPException(status_code=400, detail="Invalid adventure_id")
# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------
@router.post("", response_model=AdventureResponse, status_code=201)
async def create_adventure(
body: CreateAdventureRequest,
db: AsyncSession = Depends(get_db),
token_data: dict = Depends(verify_token),
) -> AdventureResponse:
user_id = uuid.UUID(token_data["sub"])
if body.language not in SUPPORTED_LANGUAGES:
raise HTTPException(
status_code=400,
detail=f"Unsupported language '{body.language}'. Supported: {list(SUPPORTED_LANGUAGES)}",
)
deepl_client = DeepLClient(settings.deepl_api_key) if not settings.stub_generation else _StubDeepLClient() # type: ignore[assignment]
if not deepl_client.can_translate_to(body.source_language):
raise HTTPException(
status_code=400,
detail=f"Cannot translate to source language '{body.source_language}'",
)
adventure, first_entry = await _make_service(db).create_adventure_for_user(
user_id=user_id,
language=body.language,
source_language=body.source_language,
competencies=body.competencies,
genres=body.genres,
setting=body.setting,
vibes=body.vibes,
protagonist=body.protagonist,
max_entry_count=body.max_entry_count,
)
await worker.enqueue(
partial(_run_entry_pipeline_task, uuid.UUID(adventure.id), uuid.UUID(first_entry.id))
)
return _to_adventure_response(adventure)
@router.get("", response_model=list[AdventureResponse])
async def list_adventures(
db: AsyncSession = Depends(get_db),
token_data: dict = Depends(verify_token),
) -> list[AdventureResponse]:
user_id = uuid.UUID(token_data["sub"])
adventures = await PostgresAdventureRepository(db).list_for_user(user_id)
return [_to_adventure_response(a) for a in adventures]
@router.get("/{adventure_id}", response_model=AdventureResponse)
async def get_adventure(
adventure_id: str,
db: AsyncSession = Depends(get_db),
token_data: dict = Depends(verify_token),
) -> AdventureResponse:
user_id = uuid.UUID(token_data["sub"])
adventure = await PostgresAdventureRepository(db).get_by_id(_parse_adventure_id(adventure_id))
if adventure is None or adventure.user_id != str(user_id):
raise HTTPException(status_code=404, detail="Adventure not found")
return _to_adventure_response(adventure)
@router.delete("/{adventure_id}", status_code=204)
async def delete_adventure(
adventure_id: str,
db: AsyncSession = Depends(get_db),
token_data: dict = Depends(verify_token),
) -> None:
user_id = uuid.UUID(token_data["sub"])
repo = PostgresAdventureRepository(db)
adventure = await repo.get_by_id(_parse_adventure_id(adventure_id))
if adventure is None or adventure.user_id != str(user_id):
raise HTTPException(status_code=404, detail="Adventure not found")
await repo.soft_delete(uuid.UUID(adventure.id))
@router.post("/{adventure_id}/decisions", response_model=DecisionResponse, status_code=201)
async def record_decision(
adventure_id: str,
body: CreateDecisionRequest,
db: AsyncSession = Depends(get_db),
token_data: dict = Depends(verify_token),
) -> DecisionResponse:
user_id = uuid.UUID(token_data["sub"])
try:
choice_id = uuid.UUID(body.choice_id)
except ValueError:
raise HTTPException(status_code=400, detail="Invalid choice_id")
try:
decision, next_entry = await _make_service(db).record_decision_and_prepare_next_entry(
adventure_id=_parse_adventure_id(adventure_id),
choice_id=choice_id,
user_id=user_id,
)
except ValueError as exc:
key = str(exc)
if key == "adventure_not_found":
raise HTTPException(status_code=404, detail="Adventure not found")
if key == "adventure_not_active":
raise HTTPException(status_code=409, detail="adventure_not_active")
if key in ("choice_not_found", "choice_not_in_adventure"):
raise HTTPException(status_code=404, detail="Choice not found")
if key == "decision_already_made":
raise HTTPException(status_code=409, detail="decision_already_made")
raise HTTPException(status_code=400, detail=key)
await worker.enqueue(
partial(
_run_entry_pipeline_task,
uuid.UUID(next_entry.adventure_id),
uuid.UUID(next_entry.id),
)
)
return DecisionResponse(
id=decision.id,
choice_id=decision.choice_id,
user_id=decision.user_id,
created_at=decision.created_at.isoformat(),
)
@router.get("/{adventure_id}/entries", response_model=list[EntryResponse])
async def list_entries(
adventure_id: str,
db: AsyncSession = Depends(get_db),
token_data: dict = Depends(verify_token),
) -> list[EntryResponse]:
user_id = uuid.UUID(token_data["sub"])
adv_id = _parse_adventure_id(adventure_id)
adventure = await PostgresAdventureRepository(db).get_by_id(adv_id)
if adventure is None or adventure.user_id != str(user_id):
raise HTTPException(status_code=404, detail="Adventure not found")
entries = await PostgresAdventureEntryRepository(db).list_for_adventure(adv_id)
return [
EntryResponse(
id=e.id,
adventure_id=e.adventure_id,
generated_from_choice_id=e.generated_from_choice_id,
status=e.status,
entry_index=e.entry_index,
story_text=e.story_text,
created_at=e.created_at.isoformat(),
)
for e in entries
]
@router.get("/{adventure_id}/entries/{entry_id}", response_model=EntryDetailResponse)
async def get_entry(
adventure_id: str,
entry_id: str,
db: AsyncSession = Depends(get_db),
token_data: dict = Depends(verify_token),
) -> EntryDetailResponse:
user_id = uuid.UUID(token_data["sub"])
adv_id = _parse_adventure_id(adventure_id)
adventure = await PostgresAdventureRepository(db).get_by_id(adv_id)
if adventure is None or adventure.user_id != str(user_id):
raise HTTPException(status_code=404, detail="Adventure not found")
try:
eid = uuid.UUID(entry_id)
except ValueError:
raise HTTPException(status_code=400, detail="Invalid entry_id")
entry = await PostgresAdventureEntryRepository(db).get_by_id(eid)
if entry is None or entry.adventure_id != str(adv_id):
raise HTTPException(status_code=404, detail="Entry not found")
choices = await PostgresAdventureEntryChoiceRepository(db).list_for_entry(eid)
translation = await PostgresAdventureEntryTranslationRepository(db).get_for_entry(
entry_id=eid,
component_type="story_text",
target_language=adventure.source_language,
)
audio = await PostgresAdventureEntryAudioRepository(db).get_for_entry(
entry_id=eid, component_type="story_text"
)
return EntryDetailResponse(
id=entry.id,
adventure_id=entry.adventure_id,
generated_from_choice_id=entry.generated_from_choice_id,
status=entry.status,
entry_index=entry.entry_index,
story_text=entry.story_text,
created_at=entry.created_at.isoformat(),
choices=[
ChoiceResponse(id=c.id, index=c.index, label=c.label, text=c.text) for c in choices
],
translation=translation.translated_text if translation else None,
audio_file_name=audio.file_name if audio else None,
)

View file

@ -10,6 +10,7 @@ from .learnable_languages import router as learnable_languages_router
from .vocab import router as vocab_router
from .packs import router as packs_router
from .admin.packs import router as admin_packs_router
from .adventures import router as adventures_router
from fastapi import APIRouter
@ -27,3 +28,4 @@ api_router.include_router(learnable_languages_router)
api_router.include_router(vocab_router)
api_router.include_router(packs_router)
api_router.include_router(admin_packs_router)
api_router.include_router(adventures_router)

View file

@ -44,6 +44,7 @@ services:
STORAGE_ACCESS_KEY: langlearn_test
STORAGE_SECRET_KEY: testpassword123
STORAGE_BUCKET: langlearn-test
STUB_GENERATION: "true"
depends_on:
db:
condition: service_healthy

291
tests/test_adventures.py Normal file
View file

@ -0,0 +1,291 @@
import time
import uuid
import httpx
import pytest
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _register_and_login(client: httpx.Client, email: str, password: str = "password123") -> str:
client.post("/auth/register", json={"email": email, "password": password})
resp = client.post("/auth/login", json={"email": email, "password": password})
return resp.json()["access_token"]
def _auth_client(client: httpx.Client, email: str) -> httpx.Client:
token = _register_and_login(client, email)
client.headers["Authorization"] = f"Bearer {token}"
return client
def _wait_for_adventure_status(
client: httpx.Client,
adventure_id: str,
expected_status: str,
timeout: int = 30,
) -> dict:
deadline = time.time() + timeout
while time.time() < deadline:
resp = client.get(f"/api/adventures/{adventure_id}")
assert resp.status_code == 200, resp.text
if resp.json()["status"] == expected_status:
return resp.json()
time.sleep(0.5)
raise TimeoutError(
f"Adventure {adventure_id} did not reach '{expected_status}' within {timeout}s. "
f"Last status: {client.get(f'/api/adventures/{adventure_id}').json().get('status')}"
)
_DEFAULT_ADVENTURE_BODY = {
"language": "fr",
"source_language": "en",
"competencies": ["B1"],
"genres": ["crime fiction"],
"setting": ["Paris", "city"],
"vibes": ["dark"],
"protagonist": ["male", "late-teens"],
}
@pytest.fixture
def user_client(client: httpx.Client) -> httpx.Client:
email = f"adventure-user-{uuid.uuid4()}@example.com"
return _auth_client(client, email)
@pytest.fixture
def second_user_client(client: httpx.Client) -> httpx.Client:
email = f"adventure-user2-{uuid.uuid4()}@example.com"
return _auth_client(client, email)
# ---------------------------------------------------------------------------
# Test 1: Adventure creation and first-entry pipeline
# ---------------------------------------------------------------------------
def test_create_adventure_generates_first_entry(user_client: httpx.Client) -> None:
resp = user_client.post("/api/adventures", json=_DEFAULT_ADVENTURE_BODY)
assert resp.status_code == 201
body = resp.json()
adventure_id = body["id"]
assert body["status"] == "awaiting_first_entry"
assert body["title"] == "Untitled adventure"
adventure = _wait_for_adventure_status(user_client, adventure_id, "active")
assert adventure["title"] != "Untitled adventure"
assert adventure["description"] is not None
entries_resp = user_client.get(f"/api/adventures/{adventure_id}/entries")
assert entries_resp.status_code == 200
entries = entries_resp.json()
assert len(entries) == 1
assert entries[0]["status"] == "complete"
assert entries[0]["entry_index"] == 0
assert entries[0]["story_text"] is not None
detail_resp = user_client.get(f"/api/adventures/{adventure_id}/entries/{entries[0]['id']}")
assert detail_resp.status_code == 200
detail = detail_resp.json()
assert len(detail["choices"]) == 4
assert detail["translation"] is not None
assert detail["audio_file_name"] is not None
# ---------------------------------------------------------------------------
# Test 2: Recording a decision generates the next entry
# ---------------------------------------------------------------------------
def test_record_decision_generates_next_entry(user_client: httpx.Client) -> None:
resp = user_client.post("/api/adventures", json=_DEFAULT_ADVENTURE_BODY)
adventure_id = resp.json()["id"]
_wait_for_adventure_status(user_client, adventure_id, "active")
entries = user_client.get(f"/api/adventures/{adventure_id}/entries").json()
detail = user_client.get(
f"/api/adventures/{adventure_id}/entries/{entries[0]['id']}"
).json()
choice_id = detail["choices"][0]["id"]
decision_resp = user_client.post(
f"/api/adventures/{adventure_id}/decisions", json={"choice_id": choice_id}
)
assert decision_resp.status_code == 201
assert decision_resp.json()["choice_id"] == choice_id
_wait_for_adventure_status(user_client, adventure_id, "active")
entries = user_client.get(f"/api/adventures/{adventure_id}/entries").json()
assert len(entries) == 2
second = next(e for e in entries if e["entry_index"] == 1)
assert second["status"] == "complete"
assert second["generated_from_choice_id"] == choice_id
# ---------------------------------------------------------------------------
# Test 3: Adventure completes after max_entry_count entries
# ---------------------------------------------------------------------------
def test_adventure_completes_at_max_entries(user_client: httpx.Client) -> None:
body = {**_DEFAULT_ADVENTURE_BODY, "max_entry_count": 2}
resp = user_client.post("/api/adventures", json=body)
adventure_id = resp.json()["id"]
_wait_for_adventure_status(user_client, adventure_id, "active")
entries = user_client.get(f"/api/adventures/{adventure_id}/entries").json()
detail = user_client.get(
f"/api/adventures/{adventure_id}/entries/{entries[0]['id']}"
).json()
choice_id = detail["choices"][0]["id"]
user_client.post(
f"/api/adventures/{adventure_id}/decisions", json={"choice_id": choice_id}
)
adventure = _wait_for_adventure_status(user_client, adventure_id, "complete")
assert adventure["status"] == "complete"
entries = user_client.get(f"/api/adventures/{adventure_id}/entries").json()
assert len(entries) == 2
final_detail = user_client.get(
f"/api/adventures/{adventure_id}/entries/{entries[1]['id']}"
).json()
assert final_detail["choices"] == []
# Decision on a complete adventure returns 409
extra = user_client.post(
f"/api/adventures/{adventure_id}/decisions", json={"choice_id": choice_id}
)
assert extra.status_code == 409
assert "not_active" in extra.json()["detail"]
# ---------------------------------------------------------------------------
# Test 4: Double-decision on the same entry is rejected
# ---------------------------------------------------------------------------
def test_cannot_make_second_decision_on_same_entry(user_client: httpx.Client) -> None:
resp = user_client.post("/api/adventures", json=_DEFAULT_ADVENTURE_BODY)
adventure_id = resp.json()["id"]
_wait_for_adventure_status(user_client, adventure_id, "active")
entries = user_client.get(f"/api/adventures/{adventure_id}/entries").json()
detail = user_client.get(
f"/api/adventures/{adventure_id}/entries/{entries[0]['id']}"
).json()
choice_id = detail["choices"][0]["id"]
other_choice_id = detail["choices"][1]["id"]
first = user_client.post(
f"/api/adventures/{adventure_id}/decisions", json={"choice_id": choice_id}
)
assert first.status_code == 201
second = user_client.post(
f"/api/adventures/{adventure_id}/decisions", json={"choice_id": other_choice_id}
)
assert second.status_code == 409
assert "decision_already_made" in second.json()["detail"]
# ---------------------------------------------------------------------------
# Test 5: User isolation — one user cannot see or interact with another's adventure
# ---------------------------------------------------------------------------
def test_user_cannot_access_another_users_adventure(
user_client: httpx.Client, second_user_client: httpx.Client
) -> None:
resp = user_client.post("/api/adventures", json=_DEFAULT_ADVENTURE_BODY)
adventure_id = resp.json()["id"]
assert second_user_client.get(f"/api/adventures/{adventure_id}").status_code == 404
assert second_user_client.get(f"/api/adventures/{adventure_id}/entries").status_code == 404
decision_resp = second_user_client.post(
f"/api/adventures/{adventure_id}/decisions",
json={"choice_id": str(uuid.uuid4())},
)
assert decision_resp.status_code == 404
# ---------------------------------------------------------------------------
# Test 6: Soft-delete removes adventure from list
# ---------------------------------------------------------------------------
def test_soft_delete_hides_adventure(user_client: httpx.Client) -> None:
resp = user_client.post("/api/adventures", json=_DEFAULT_ADVENTURE_BODY)
adventure_id = resp.json()["id"]
delete_resp = user_client.delete(f"/api/adventures/{adventure_id}")
assert delete_resp.status_code == 204
# Should no longer appear in the list
adventures = user_client.get("/api/adventures").json()
assert not any(a["id"] == adventure_id for a in adventures)
# Direct GET also 404s after deletion
assert user_client.get(f"/api/adventures/{adventure_id}").status_code == 404
# ---------------------------------------------------------------------------
# Unit-level: LLM response parser (no Docker / network required)
# ---------------------------------------------------------------------------
def _parse(text: str):
"""Local copy of the parser for isolated unit testing."""
import re
parts = text.split("\n-----\n")
if len(parts) < 3:
parts = text.split("-----\n")
if len(parts) < 3:
raise ValueError(f"LLM response has {len(parts)} section(s); expected 3")
story_text = parts[0].strip()
options_raw = parts[1].strip()
gm_notes = "\n-----\n".join(parts[2:]).strip()
choices = []
for line in options_raw.splitlines():
line = line.strip()
if not line:
continue
m = re.match(r"^(\d+)[.)]\s+(.+)$", line)
if m:
choices.append((m.group(1), m.group(2).strip()))
if not choices:
raise ValueError("No choices parsed from LLM response options section")
return story_text, choices, gm_notes
def test_parse_valid_three_section_response() -> None:
text = (
"Story text here.\n-----\n"
"1. Option one\n2. Option two\n3. Option three\n4. Option four\n"
"-----\nno notes"
)
story, choices, notes = _parse(text)
assert story == "Story text here."
assert len(choices) == 4
assert choices[0] == ("1", "Option one")
assert choices[3] == ("4", "Option four")
assert notes == "no notes"
def test_parse_with_parenthesis_delimiters() -> None:
text = "Story.\n-----\n1) First\n2) Second\n3) Third\n4) Fourth\n-----\nGM note."
_, choices, notes = _parse(text)
assert len(choices) == 4
assert choices[2] == ("3", "Third")
assert notes == "GM note."
def test_parse_missing_sections_raises() -> None:
with pytest.raises(ValueError, match="section"):
_parse("Only one section here, no separators at all.")