import uuid from datetime import datetime, timezone from functools import partial from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select from ..auth import verify_token from ..database import get_db, AsyncSessionLocal from ..models import Job from ..storage import upload_audio from ..services import tts, job_repo from ..services.tts import VOICE_BY_LANGUAGE from .. import worker router = APIRouter(prefix="/jobs", tags=["jobs"]) class JobResponse(BaseModel): id: uuid.UUID status: str source_language: str target_language: str complexity_level: str created_at: datetime started_at: datetime | None = None completed_at: datetime | None = None # only present on success generated_text: str | None = None translated_text: str | None = None input_summary: str | None = None audio_url: str | None = None # only present on failure error_message: str | None = None model_config = { "from_attributes": True } class JobSummary(BaseModel): id: uuid.UUID status: str created_at: datetime class JobListResponse(BaseModel): jobs: list[JobSummary] model_config = { "from_attributes": True } @router.get("/", response_model=JobListResponse) async def get_jobs( db: AsyncSession = Depends(get_db), _: dict = Depends(verify_token) ) -> JobListResponse: try: result = await db.execute(select(Job).order_by(Job.created_at.desc())) jobs = result.scalars().all() return { "jobs": jobs } except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @router.get("/{job_id}", response_model=JobResponse) async def get_job( job_id: str, db: AsyncSession = Depends(get_db), _: dict = Depends(verify_token), ) -> JobResponse: try: uid = uuid.UUID(job_id) except ValueError: raise HTTPException(status_code=400, detail="Invalid job ID format") job: Job | None = await db.get(Job, uid) if job is None: raise HTTPException(status_code=404, detail="Job not found") response = JobResponse( id=str(job.id), status=job.status, source_language=job.source_language, target_language=job.target_language, complexity_level=job.complexity_level, created_at=job.created_at, started_at=job.started_at, completed_at=job.completed_at, ) if job.status == "succeeded": response.generated_text = job.generated_text response.translated_text = job.translated_text response.input_summary = job.input_summary response.audio_url = job.audio_url elif job.status == "failed": response.error_message = job.error_message return response async def _run_regenerate_audio(job_id: uuid.UUID) -> None: async with AsyncSessionLocal() as db: job = await db.get(Job, job_id) await job_repo.mark_processing(db, job) try: voice = VOICE_BY_LANGUAGE.get(job.target_language, "Kore") wav_bytes = await tts.generate_audio(job.generated_text, voice) audio_key = f"audio/{job_id}.wav" upload_audio(audio_key, wav_bytes) await job_repo.mark_succeeded(db, job, audio_key) except Exception as exc: await job_repo.mark_failed(db, job, str(exc)) @router.post("/{job_id}/regenerate-audio", status_code=202) async def regenerate_audio( job_id: str, db: AsyncSession = Depends(get_db), token_data: dict = Depends(verify_token), ) -> dict: try: uid = uuid.UUID(job_id) except ValueError: raise HTTPException(status_code=400, detail="Invalid job ID format") job: Job | None = await db.get(Job, uid) if job is None: raise HTTPException(status_code=404, detail="Job not found") if str(job.user_id) != token_data["sub"]: raise HTTPException(status_code=403, detail="Not authorized to modify this job") if not job.generated_text: raise HTTPException(status_code=400, detail="Job has no generated text to synthesize") if job.audio_url: raise HTTPException(status_code=409, detail="Job already has audio") if job.status == "processing": raise HTTPException(status_code=409, detail="Job is already processing") await worker.enqueue(partial(_run_regenerate_audio, uid)) return {"job_id": job_id}