import logging import uuid from ...languages import SUPPORTED_LANGUAGES from ...outbound.anthropic.adventure_prompts import ( build_conversation_messages, build_entry_system_prompt, build_title_system_prompt, build_title_user_message, parse_entry_response, parse_title_response, ) from ...outbound.anthropic.anthropic_client import AnthropicClient from ...outbound.deepl.deepl_client import DeepLClient from ...outbound.gemini.gemini_client import GeminiClient from ...outbound.postgres.repositories.adventure_repository import ( PostgresAdventureEntryAudioRepository, PostgresAdventureEntryChoiceRepository, PostgresAdventureEntryDecisionRepository, PostgresAdventureEntryRepository, PostgresAdventureEntryTranslationRepository, PostgresAdventureRepository, ) from ...storage import upload_audio from ..models.adventure import ( Adventure, AdventureEntry, AdventureEntryPossibleChoiceDecision, ) logger = logging.getLogger(__name__) class AdventureService: def __init__( self, adventure_repo: PostgresAdventureRepository, entry_repo: PostgresAdventureEntryRepository, choice_repo: PostgresAdventureEntryChoiceRepository, decision_repo: PostgresAdventureEntryDecisionRepository, translation_repo: PostgresAdventureEntryTranslationRepository, audio_repo: PostgresAdventureEntryAudioRepository, anthropic_client: AnthropicClient, deepl_client: DeepLClient, gemini_client: GeminiClient, ) -> None: self.adventure_repo = adventure_repo self.entry_repo = entry_repo self.choice_repo = choice_repo self.decision_repo = decision_repo self.translation_repo = translation_repo self.audio_repo = audio_repo self.anthropic_client = anthropic_client self.deepl_client = deepl_client self.gemini_client = gemini_client async def create_adventure_for_user( self, user_id: uuid.UUID, language: str, source_language: str, competencies: list[str], genres: list[str], setting: list[str], vibes: list[str], protagonist: list[str], entry_word_count_range: list[int], max_entry_count: int = 6, ) -> tuple[Adventure, AdventureEntry]: """Creates the adventure and a placeholder for the first entry. Returns (adventure, first_entry) so the caller can enqueue pipeline work. """ adventure = await self.adventure_repo.create( user_id=user_id, language=language, source_language=source_language, competencies=competencies, genres=genres, setting=setting, vibes=vibes, protagonist=protagonist, max_entry_count=max_entry_count, entry_story_text_target_length={ "min": entry_word_count_range[0], "max": entry_word_count_range[1], }, ) first_entry = await self.entry_repo.create( adventure_id=uuid.UUID(adventure.id), entry_index=0, generated_from_choice_id=None, ) return adventure, first_entry async def record_decision_and_prepare_next_entry( self, adventure_id: uuid.UUID, choice_id: uuid.UUID, user_id: uuid.UUID, ) -> tuple[AdventureEntryPossibleChoiceDecision, AdventureEntry]: """Validates, records the player's decision, and creates the next entry placeholder. Returns (decision, next_entry) so the caller can enqueue pipeline work. Raises ValueError with keys: 'adventure_not_found' — missing or not owned by this user 'adventure_not_active' — e.g. complete or still generating 'choice_not_found' — choice id unknown 'choice_not_in_adventure' — choice belongs to a different adventure 'decision_already_made' — player already chose on this entry """ adventure = await self.adventure_repo.get_by_id(adventure_id) if adventure is None or adventure.user_id != str(user_id): raise ValueError("adventure_not_found") if adventure.status != "active": raise ValueError("adventure_not_active") choice = await self.choice_repo.get_by_id(choice_id) if choice is None: raise ValueError("choice_not_found") entry = await self.entry_repo.get_by_id(uuid.UUID(choice.entry_id)) if entry is None or entry.adventure_id != str(adventure_id): raise ValueError("choice_not_in_adventure") existing = await self.decision_repo.get_for_entry_and_user( entry_id=uuid.UUID(entry.id), user_id=user_id ) if existing is not None: raise ValueError("decision_already_made") decision = await self.decision_repo.create(choice_id=choice_id, user_id=user_id) next_entry = await self.entry_repo.create( adventure_id=adventure_id, entry_index=entry.entry_index + 1, generated_from_choice_id=choice_id, ) return decision, next_entry async def run_entry_pipeline( self, adventure_id: uuid.UUID, entry_id: uuid.UUID, ) -> None: """Full entry generation pipeline. Called from the worker queue. Sequence: LLM generation → parse → persist → translate → TTS → adventure title (first entry only) → update adventure status. On any error the entry and adventure are marked 'error'. """ try: adventure = await self.adventure_repo.get_by_id(adventure_id) assert adventure is not None, f"Adventure {adventure_id} not found" all_entries = await self.entry_repo.list_for_adventure(adventure_id) current_entry = next(e for e in all_entries if e.id == str(entry_id)) is_first_entry = current_entry.entry_index == 0 is_final_entry = current_entry.entry_index + 1 == adventure.max_entry_count prior_entries = await self._load_prior_entries_with_metadata( all_entries=[ e for e in all_entries if e.entry_index < current_entry.entry_index ], ) language_name = SUPPORTED_LANGUAGES.get( adventure.language, adventure.language ) competency = adventure.competencies[0] if adventure.competencies else "B1" system_prompt = build_entry_system_prompt( language_name=language_name, competency=competency, max_entry_count=adventure.max_entry_count, min_length=adventure.entry_story_text_target_length.get("min", 700), max_length=adventure.entry_story_text_target_length.get("max", 800), ) messages = build_conversation_messages( genres=adventure.genres, setting=adventure.setting, vibes=adventure.vibes, protagonist=adventure.protagonist, prior_entries=prior_entries, ) raw_text, usage_dict = await self.anthropic_client.complete( system_prompt=system_prompt, messages=messages, max_tokens=2048, ) story_text, choices_parsed, gm_notes = parse_entry_response(raw_text) await self.entry_repo.update_content( entry_id=entry_id, story_text=story_text, gamemaster_notes=gm_notes, llm_data=usage_dict, status="complete", ) if not is_final_entry: await self.choice_repo.create_many( entry_id=entry_id, choices=[ (i, label, text) for i, (label, text) in enumerate(choices_parsed) ], ) translated = await self.deepl_client.translate( story_text, adventure.source_language ) await self.translation_repo.create( entry_id=entry_id, component_type="story_text", target_language=adventure.source_language, translated_text=translated, ) voice = self.gemini_client.get_voice_by_language(adventure.language) wav_bytes = await self.gemini_client.generate_audio(story_text, voice) audio_key = f"adventure-audio/{entry_id}.wav" upload_audio(audio_key, wav_bytes) await self.audio_repo.create( entry_id=entry_id, component_type="story_text", tts_provider="google_gemini", tts_options={"voice": voice}, file_name=audio_key, ) if is_first_entry: title_system = build_title_system_prompt() title_user = build_title_user_message( story_text, language_name, adventure.genres ) title_raw, _ = await self.anthropic_client.complete( system_prompt=title_system, messages=[{"role": "user", "content": title_user}], max_tokens=200, ) title, description = parse_title_response(title_raw) await self.adventure_repo.update_title_and_description( adventure_id=adventure_id, title=title, description=description ) new_status = "complete" if is_final_entry else "active" await self.adventure_repo.update_status( adventure_id=adventure_id, status=new_status ) except Exception: logger.exception("Entry pipeline failed for entry %s", entry_id) try: await self.entry_repo.update_status(entry_id=entry_id, status="error") await self.adventure_repo.update_status( adventure_id=adventure_id, status="error" ) except Exception: logger.exception("Failed to mark entry/adventure as error") async def _load_prior_entries_with_metadata( self, all_entries: list[AdventureEntry], ) -> list[tuple[AdventureEntry, list, str | None]]: """Load choices for each prior entry and determine which choice was made. Returns a list of (entry, choices, chosen_label_or_None) tuples ready for build_conversation_messages(). """ sorted_entries = sorted(all_entries, key=lambda e: e.entry_index) result = [] for i, entry in enumerate(sorted_entries): choices = await self.choice_repo.list_for_entry(uuid.UUID(entry.id)) chosen_label: str | None = None if i + 1 < len(sorted_entries): next_entry = sorted_entries[i + 1] if next_entry.generated_from_choice_id: chosen = next( ( c for c in choices if c.id == next_entry.generated_from_choice_id ), None, ) if chosen: chosen_label = chosen.label result.append((entry, choices, chosen_label)) return result