language-learning-app/api/app/routers/jobs.py
2026-03-18 20:55:02 +00:00

89 lines
2.5 KiB
Python

import logging
import uuid
from datetime import datetime

from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from ..auth import verify_token
from ..database import get_db
from ..models import Job

# Router for the job endpoints: every route registered on it is served under
# the "/jobs" prefix and tagged "jobs".
router = APIRouter(prefix="/jobs", tags=["jobs"])
class JobResponse(BaseModel):
    # Detailed view of a single job, used as the response model of
    # GET /jobs/{job_id}.
    #
    # from_attributes lets the model be validated from attribute-bearing
    # objects as well as from plain mappings.
    model_config = {"from_attributes": True}

    # Identity, request parameters and lifecycle timestamps.
    id: uuid.UUID
    status: str
    source_language: str
    target_language: str
    complexity_level: str
    created_at: datetime
    started_at: datetime | None = None
    completed_at: datetime | None = None

    # Result fields: get_job fills these only when status == "succeeded";
    # otherwise they keep their None default.
    generated_text: str | None = None
    translated_text: str | None = None
    input_summary: str | None = None

    # Failure detail: get_job fills this only when status == "failed".
    error_message: str | None = None
class JobSummary(BaseModel):
    # Compact per-job entry used in job listings (the element type of
    # JobListResponse.jobs).
    #
    # Model config is not inherited from the model that nests this one, so
    # from_attributes is declared here as well. It allows a summary to be
    # validated directly from an attribute-bearing object, the same way
    # JobResponse can be.
    model_config = {"from_attributes": True}

    id: uuid.UUID
    status: str
    created_at: datetime
class JobListResponse(BaseModel):
    # Envelope used as the response model of GET /jobs/: the job summaries
    # are wrapped under a single "jobs" key.
    model_config = {"from_attributes": True}

    jobs: list[JobSummary]
@router.get("/", response_model=JobListResponse)
async def get_jobs(
    db: AsyncSession = Depends(get_db),
    _: dict = Depends(verify_token),
) -> JobListResponse:
    """List every job, newest first.

    Only the summary fields (id, status, created_at) are returned per job.

    Args:
        db: Async database session supplied by the ``get_db`` dependency.
        _: Result of the ``verify_token`` dependency. Unused in the body;
            declared so the dependency runs for this route.

    Returns:
        A ``JobListResponse`` holding one ``JobSummary`` per job, ordered by
        ``created_at`` descending.

    Raises:
        HTTPException: 500 with a generic message if the query fails.
    """
    try:
        result = await db.execute(select(Job).order_by(Job.created_at.desc()))
        jobs = result.scalars().all()
    except Exception as exc:
        # Record the real cause server-side. The client only gets a generic
        # message so that driver/SQL details are not exposed in the response.
        logging.getLogger(__name__).exception("Failed to list jobs")
        raise HTTPException(
            status_code=500, detail="Failed to retrieve jobs"
        ) from exc

    # Build the declared return type explicitly instead of handing back a raw
    # dict, so the function matches its own annotation.
    return JobListResponse(
        jobs=[
            JobSummary(id=job.id, status=job.status, created_at=job.created_at)
            for job in jobs
        ]
    )
@router.get("/{job_id}", response_model=JobResponse)
async def get_job(
    job_id: str,
    db: AsyncSession = Depends(get_db),
    _: dict = Depends(verify_token),
) -> JobResponse:
    """Fetch a single job by its ID.

    The result fields (generated_text, translated_text, input_summary) are
    populated only when the job's status is "succeeded", and error_message
    only when it is "failed". For any other status they all remain null.

    Args:
        job_id: String form of the job's UUID, taken from the URL path.
        db: Async database session supplied by the ``get_db`` dependency.
        _: Result of the ``verify_token`` dependency. Unused in the body;
            declared so the dependency runs for this route.

    Returns:
        The ``JobResponse`` for the requested job.

    Raises:
        HTTPException: 400 if ``job_id`` is not a valid UUID string, 404 if
            no job with that ID exists.
    """
    try:
        uid = uuid.UUID(job_id)
    except ValueError:
        # The parse failure is fully described by the 400 response; suppress
        # the implicit "during handling of the above exception" context.
        raise HTTPException(
            status_code=400, detail="Invalid job ID format"
        ) from None

    job: Job | None = await db.get(Job, uid)
    if job is None:
        raise HTTPException(status_code=404, detail="Job not found")

    response = JobResponse(
        id=job.id,
        status=job.status,
        source_language=job.source_language,
        target_language=job.target_language,
        complexity_level=job.complexity_level,
        created_at=job.created_at,
        started_at=job.started_at,
        completed_at=job.completed_at,
    )

    # Outcome-specific fields are attached after construction so that they
    # stay None unless the job reached the matching terminal status.
    if job.status == "succeeded":
        response.generated_text = job.generated_text
        response.translated_text = job.translated_text
        response.input_summary = job.input_summary
    elif job.status == "failed":
        response.error_message = job.error_message

    return response