language-learning-app/api/app/routers/auth.py

79 lines
2.4 KiB
Python
Raw Normal View History

from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel, EmailStr
from sqlalchemy import select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from ..auth import create_access_token, hash_password, verify_password
from ..database import get_db
from ..models import User
router = APIRouter(prefix="/auth", tags=["auth"])
class RegisterRequest(BaseModel):
email: EmailStr
password: str
class LoginRequest(BaseModel):
email: EmailStr
password: str
class TokenResponse(BaseModel):
access_token: str
token_type: str = "bearer"
@router.post("/register", status_code=status.HTTP_201_CREATED)
async def register(body: RegisterRequest, db: AsyncSession = Depends(get_db)):
user = User(
email=body.email,
hashed_password=hash_password(body.password),
)
db.add(user)
try:
await db.commit()
await db.refresh(user)
except IntegrityError:
await db.rollback()
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Email already registered",
)
# TODO(email-verification): send verification email here once transactional
# email is implemented. Set is_email_verified=False on the User model and
# require verification before allowing login.
return {"id": str(user.id), "email": user.email}
@router.post("/login", response_model=TokenResponse)
async def login(body: LoginRequest, db: AsyncSession = Depends(get_db)):
result = await db.execute(select(User).where(User.email == body.email))
user = result.scalar_one_or_none()
if user is None or not verify_password(body.password, user.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid email or password",
)
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Account disabled",
)
# TODO(email-verification): uncomment once email verification is in place
# if not user.is_email_verified:
# raise HTTPException(
# status_code=status.HTTP_403_FORBIDDEN,
# detail="Email address not verified",
# )
token = create_access_token(str(user.id), user.email)
return TokenResponse(access_token=token)