import asyncio import logging import time import uuid from ...languages import SUPPORTED_LANGUAGES from ...outbound.anthropic.adventure_prompts import ( build_conversation_messages, build_entry_system_prompt, build_title_system_prompt, build_title_user_message, parse_entry_response, parse_title_response, ) from ...outbound.anthropic.anthropic_client import AnthropicClient from ...outbound.deepl.deepl_client import DeepLClient from ...outbound.gemini.gemini_client import GeminiClient from ...outbound.postgres.repositories.adventure_repository import ( PostgresAdventureEntryAudioRepository, PostgresAdventureEntryChoiceRepository, PostgresAdventureEntryDecisionRepository, PostgresAdventureEntryRepository, PostgresAdventureEntryTranslationRepository, PostgresAdventureRepository, ) from ...outbound.spacy.spacy_client import SpacyClient from ...storage import upload_audio from ..models.adventure import ( Adventure, AdventureEntry, AdventureEntryPossibleChoice, AdventureEntryPossibleChoiceDecision, ) logger = logging.getLogger(__name__) def _base_language(lang: str) -> str: """Normalise a language tag like 'en-US' or 'EN' to a bare ISO 639-1 code.""" return lang.split("-")[0].split("_")[0].lower() class AdventureService: def __init__( self, adventure_repo: PostgresAdventureRepository, entry_repo: PostgresAdventureEntryRepository, choice_repo: PostgresAdventureEntryChoiceRepository, decision_repo: PostgresAdventureEntryDecisionRepository, translation_repo: PostgresAdventureEntryTranslationRepository, audio_repo: PostgresAdventureEntryAudioRepository, anthropic_client: AnthropicClient, deepl_client: DeepLClient, gemini_client: GeminiClient, spacy_client: SpacyClient, ) -> None: self.adventure_repo = adventure_repo self.entry_repo = entry_repo self.choice_repo = choice_repo self.decision_repo = decision_repo self.translation_repo = translation_repo self.audio_repo = audio_repo self.anthropic_client = anthropic_client self.deepl_client = deepl_client self.gemini_client = gemini_client self.spacy_client = spacy_client async def create_adventure_for_user( self, user_id: uuid.UUID, language: str, source_language: str, competencies: list[str], genres: list[str], setting: list[str], vibes: list[str], protagonist: list[str], entry_word_count_range: list[int], max_entry_count: int = 6, ) -> tuple[Adventure, AdventureEntry]: """Creates the adventure and a placeholder for the first entry. Returns (adventure, first_entry) so the caller can enqueue pipeline work. """ adventure = await self.adventure_repo.create( user_id=user_id, language=language, source_language=source_language, competencies=competencies, genres=genres, setting=setting, vibes=vibes, protagonist=protagonist, max_entry_count=max_entry_count, entry_story_text_target_length={ "min": entry_word_count_range[0], "max": entry_word_count_range[1], }, ) first_entry = await self.entry_repo.create( adventure_id=uuid.UUID(adventure.id), entry_index=0, generated_from_choice_id=None, ) return adventure, first_entry async def record_decision_and_prepare_next_entry( self, adventure_id: uuid.UUID, choice_id: uuid.UUID, user_id: uuid.UUID, ) -> tuple[AdventureEntryPossibleChoiceDecision, AdventureEntry]: """Validates, records the player's decision, and creates the next entry placeholder. Returns (decision, next_entry) so the caller can enqueue pipeline work. Raises ValueError with keys: 'adventure_not_found' — missing or not owned by this user 'adventure_not_active' — e.g. complete or still generating 'choice_not_found' — choice id unknown 'choice_not_in_adventure' — choice belongs to a different adventure 'decision_already_made' — player already chose on this entry """ adventure = await self.adventure_repo.get_by_id(adventure_id) if adventure is None or adventure.user_id != str(user_id): raise ValueError("adventure_not_found") if adventure.status != "active": raise ValueError("adventure_not_active") choice = await self.choice_repo.get_by_id(choice_id) if choice is None: raise ValueError("choice_not_found") entry = await self.entry_repo.get_by_id(uuid.UUID(choice.entry_id)) if entry is None or entry.adventure_id != str(adventure_id): raise ValueError("choice_not_in_adventure") existing = await self.decision_repo.get_for_entry_and_user( entry_id=uuid.UUID(entry.id), user_id=user_id ) if existing is not None: raise ValueError("decision_already_made") decision = await self.decision_repo.create(choice_id=choice_id, user_id=user_id) next_entry = await self.entry_repo.create( adventure_id=adventure_id, entry_index=entry.entry_index + 1, generated_from_choice_id=choice_id, ) return decision, next_entry async def run_entry_pipeline( self, adventure_id: uuid.UUID, entry_id: uuid.UUID, user_id: uuid.UUID, ) -> None: """Full entry generation pipeline. Called from the worker queue. Sequence: LLM generation → parse → persist → NLP + per-sentence translation → TTS → adventure title (first entry only) → update adventure status. On any error the entry and adventure are marked 'error'. """ try: adventure = await self.adventure_repo.get_by_id(adventure_id) assert adventure is not None, f"Adventure {adventure_id} not found" all_entries = await self.entry_repo.list_for_adventure(adventure_id) current_entry = next(e for e in all_entries if e.id == str(entry_id)) is_first_entry = current_entry.entry_index == 0 is_final_entry = current_entry.entry_index + 1 == adventure.max_entry_count prior_entries_with_possible_choices = await self._load_possible_choices_for_entries( all_entries=[ e for e in all_entries if e.entry_index < current_entry.entry_index ], user_id=user_id, ) prior_decisions = await self.decision_repo.list_for_adventure_and_user( adventure_id=adventure_id, user_id=user_id ) language_name = SUPPORTED_LANGUAGES.get(adventure.language, adventure.language) competency = adventure.competencies[0] if adventure.competencies else "B1" system_prompt = build_entry_system_prompt( language_name=language_name, competency=competency, max_entry_count=adventure.max_entry_count, min_length=adventure.entry_story_text_target_length.get("min", 700), max_length=adventure.entry_story_text_target_length.get("max", 800), ) messages = build_conversation_messages( genres=adventure.genres, setting=adventure.setting, vibes=adventure.vibes, protagonist=adventure.protagonist, prior_entries_with_choices=prior_entries_with_possible_choices, ) # ── LLM generation ────────────────────────────────────────────── t0 = time.monotonic() raw_text, usage_dict = await self.anthropic_client.complete( system_prompt=system_prompt, messages=messages, max_tokens=2048, ) timing_text_generation = time.monotonic() - t0 story_text, choices_parsed, gm_notes = parse_entry_response(raw_text) await self.entry_repo.update_content( entry_id=entry_id, story_text=story_text, gamemaster_notes=gm_notes, llm_data=usage_dict, status="complete", ) if not is_final_entry: await self.choice_repo.create_many( entry_id=entry_id, choices=[ (i, label, text) for i, (label, text) in enumerate(choices_parsed) ], ) # ── Per-sentence NLP + translation ─────────────────────────────── target_lang = _base_language(adventure.language) source_lang = _base_language(adventure.source_language) timing_nlp = 0.0 timing_translations = 0.0 paragraphs = [p.strip() for p in story_text.split("\n\n") if p.strip()] linguistic_paragraphs = [] for para_idx, paragraph_text in enumerate(paragraphs): t0 = time.monotonic() target_nlp = await asyncio.to_thread( self.spacy_client.get_parts_of_speech, paragraph_text, target_lang ) timing_nlp += time.monotonic() - t0 linguistic_sentences = [] translated_sentence_texts = [] for sent_idx, target_sent in enumerate(target_nlp["sentences"]): t0 = time.monotonic() translated_sentence = await self.deepl_client.translate( target_sent["text"], adventure.source_language ) timing_translations += time.monotonic() - t0 t0 = time.monotonic() source_nlp = await asyncio.to_thread( self.spacy_client.get_parts_of_speech, translated_sentence, source_lang, ) timing_nlp += time.monotonic() - t0 source_tokens = ( source_nlp["sentences"][0]["tokens"] if source_nlp["sentences"] else [] ) translated_sentence_texts.append(translated_sentence) linguistic_sentences.append( { "index": sent_idx, "source_text": translated_sentence, "target_text": target_sent["text"], "source_tokens": source_tokens, "target_tokens": target_sent["tokens"], } ) translated_paragraph = " ".join(translated_sentence_texts) linguistic_paragraphs.append( { "index": para_idx, "source_text": translated_paragraph, "target_text": paragraph_text, "sentences": linguistic_sentences, } ) story_text_linguistic_data = { "source_language": source_lang, "target_language": target_lang, "paragraphs": linguistic_paragraphs, } full_translated_text = "\n\n".join( p["source_text"] for p in linguistic_paragraphs ) await self.translation_repo.create( entry_id=entry_id, component_type="story_text", target_language=adventure.source_language, translated_text=full_translated_text, ) # ── TTS ────────────────────────────────────────────────────────── t0 = time.monotonic() voice = self.gemini_client.get_voice_by_language(adventure.language) story_text_with_tag = "[like a dungeons and dragons gamemaster] " + story_text wav_bytes = await self.gemini_client.generate_audio(story_text_with_tag, voice) timing_tts = time.monotonic() - t0 # ── File upload ─────────────────────────────────────────────────── t0 = time.monotonic() audio_key = f"adventure-audio/{entry_id}.wav" upload_audio(audio_key, wav_bytes) timing_file_uploading = time.monotonic() - t0 await self.audio_repo.create( entry_id=entry_id, component_type="story_text", tts_provider="google_gemini", tts_options={"voice": voice}, file_name=audio_key, ) # ── Persist linguistic data + timing ───────────────────────────── pipeline_timing = { "durations": { "text_generation": round(timing_text_generation, 1), "translations_total": round(timing_translations, 1), "nlp_total": round(timing_nlp, 1), "tts": round(timing_tts, 1), "file_uploading": round(timing_file_uploading, 1), } } await self.entry_repo.update_linguistic_data( entry_id=entry_id, story_text_linguistic_data=story_text_linguistic_data, pipeline_timing=pipeline_timing, ) # ── Adventure title (first entry only) ──────────────────────────── if is_first_entry: title_system = build_title_system_prompt() title_user = build_title_user_message( story_text, language_name, adventure.genres, gm_notes ) title_raw, _ = await self.anthropic_client.complete( system_prompt=title_system, messages=[{"role": "user", "content": title_user}], max_tokens=200, ) title, description = parse_title_response(title_raw) await self.adventure_repo.update_title_and_description( adventure_id=adventure_id, title=title, description=description ) new_status = "complete" if is_final_entry else "active" await self.adventure_repo.update_status( adventure_id=adventure_id, status=new_status ) except Exception: logger.exception("Entry pipeline failed for entry %s", entry_id) try: await self.entry_repo.update_status(entry_id=entry_id, status="error") await self.adventure_repo.update_status( adventure_id=adventure_id, status="error" ) except Exception: logger.exception("Failed to mark entry/adventure as error") async def _load_possible_choices_for_entries( self, all_entries: list[AdventureEntry], user_id: uuid.UUID, ) -> list[tuple[AdventureEntry, list[AdventureEntryPossibleChoice], str | None]]: """Load choices for each prior entry and determine which choice was made. Returns a list of (entry, choices, selected_choice_id) tuples ready for build_conversation_messages(). """ result = [] for entry in sorted(all_entries, key=lambda e: e.entry_index): choices = await self.choice_repo.list_for_entry(uuid.UUID(entry.id)) decision = await self.decision_repo.get_for_entry_and_user(entry_id = uuid.UUID(entry.id), user_id=user_id) selected_choice_id = decision.choice_id if decision else None result.append((entry, choices, selected_choice_id)) return result