language-learning-app/api/app/routers/api/jobs.py
2026-03-21 20:47:15 +00:00

147 lines
4.4 KiB
Python

import uuid
from datetime import datetime, timezone
from functools import partial
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from ...auth import verify_token
from ...database import get_db, AsyncSessionLocal
from ...models import Job
from ...storage import upload_audio
from ...services import tts, job_repo
from ...services.tts import VOICE_BY_LANGUAGE
from ... import worker
router = APIRouter(prefix="/jobs", dependencies=[Depends(verify_token)])
class JobResponse(BaseModel):
id: uuid.UUID
status: str
source_language: str
target_language: str
complexity_level: str
created_at: datetime
started_at: datetime | None = None
completed_at: datetime | None = None
# only present on success
generated_text: str | None = None
translated_text: str | None = None
input_summary: str | None = None
audio_url: str | None = None
# only present on failure
error_message: str | None = None
model_config = {"from_attributes": True}
class JobSummary(BaseModel):
id: uuid.UUID
status: str
created_at: datetime
class JobListResponse(BaseModel):
jobs: list[JobSummary]
model_config = {"from_attributes": True}
@router.get("/", response_model=JobListResponse)
async def get_jobs(
db: AsyncSession = Depends(get_db),
) -> JobListResponse:
try:
result = await db.execute(select(Job).order_by(Job.created_at.desc()))
jobs = result.scalars().all()
return {"jobs": jobs}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/{job_id}", response_model=JobResponse)
async def get_job(
job_id: str,
db: AsyncSession = Depends(get_db),
) -> JobResponse:
try:
uid = uuid.UUID(job_id)
except ValueError:
raise HTTPException(status_code=400, detail="Invalid job ID format")
job: Job | None = await db.get(Job, uid)
if job is None:
raise HTTPException(status_code=404, detail="Job not found")
response = JobResponse(
id=str(job.id),
status=job.status,
source_language=job.source_language,
target_language=job.target_language,
complexity_level=job.complexity_level,
created_at=job.created_at,
started_at=job.started_at,
completed_at=job.completed_at,
)
if job.status == "succeeded":
response.generated_text = job.generated_text
response.translated_text = job.translated_text
response.input_summary = job.input_summary
response.audio_url = job.audio_url
elif job.status == "failed":
response.error_message = job.error_message
return response
async def _run_regenerate_audio(job_id: uuid.UUID) -> None:
async with AsyncSessionLocal() as db:
job = await db.get(Job, job_id)
await job_repo.mark_processing(db, job)
try:
voice = VOICE_BY_LANGUAGE.get(job.target_language, "Kore")
wav_bytes = await tts.generate_audio(job.generated_text, voice)
audio_key = f"audio/{job_id}.wav"
upload_audio(audio_key, wav_bytes)
await job_repo.mark_succeeded(db, job, audio_key)
except Exception as exc:
await job_repo.mark_failed(db, job, str(exc))
@router.post("/{job_id}/regenerate-audio", status_code=202)
async def regenerate_audio(
job_id: str,
db: AsyncSession = Depends(get_db),
token_data: dict = Depends(verify_token),
) -> dict:
try:
uid = uuid.UUID(job_id)
except ValueError:
raise HTTPException(status_code=400, detail="Invalid job ID format")
job: Job | None = await db.get(Job, uid)
if job is None:
raise HTTPException(status_code=404, detail="Job not found")
if str(job.user_id) != token_data["sub"]:
raise HTTPException(
status_code=403, detail="Not authorized to modify this job")
if not job.generated_text:
raise HTTPException(
status_code=400, detail="Job has no generated text to synthesize")
if job.audio_url:
raise HTTPException(status_code=409, detail="Job already has audio")
if job.status == "processing":
raise HTTPException(
status_code=409, detail="Job is already processing")
await worker.enqueue(partial(_run_regenerate_audio, uid))
return {"job_id": job_id}