language-learning-app/api/app/routers/auth.py

74 lines
2.3 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel, EmailStr
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from ..auth import create_access_token, hash_password, verify_password
from ..outbound.postgres.database import get_db
from ..outbound.postgres.repositories import user_repository
# Router collecting the authentication endpoints; every route registered on it
# is mounted under the "/auth" prefix and carries the "auth" tag.
router = APIRouter(prefix="/auth", tags=["auth"])
# Request payload accepted by the registration endpoint.
class RegisterRequest(BaseModel):
    # Address for the new account, declared as pydantic's EmailStr type.
    email: EmailStr
    # Password as submitted by the client; the register handler passes it
    # through hash_password before handing it to the repository.
    password: str
# Credentials payload accepted by the login endpoint.
class LoginRequest(BaseModel):
    # Address used to look up the account, declared as pydantic's EmailStr type.
    email: EmailStr
    # Password as submitted by the client; the login handler checks it
    # against the stored hash with verify_password.
    password: str
# Response body returned by the login endpoint.
class TokenResponse(BaseModel):
    # Token string produced by create_access_token.
    access_token: str
    # Token scheme reported to the client; defaults to "bearer".
    token_type: str = "bearer"
@router.post("/register", status_code=status.HTTP_201_CREATED)
async def register(body: RegisterRequest, db: AsyncSession = Depends(get_db)):
    """Create a user account from an email address and password.

    Args:
        body: Registration payload carrying the email and password.
        db: Database session injected through ``get_db``.

    Returns:
        A dict with the new user's ``id`` (as a string) and ``email``.

    Raises:
        HTTPException: 409 when creating the user raises ``IntegrityError``,
            which is reported to the client as the email already being
            registered.
    """
    # Hash outside the try block so only the repository call is guarded.
    hashed_password = hash_password(body.password)
    try:
        user = await user_repository.create(
            db,
            email=body.email,
            hashed_password=hashed_password,
        )
    except IntegrityError as exc:
        # Roll back the failed transaction before reporting the conflict.
        await db.rollback()
        # Chain explicitly so the database error stays attached as __cause__.
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Email already registered",
        ) from exc
    # TODO(email-verification): send verification email here once transactional
    # email is implemented. Set is_email_verified=False on the User model and
    # require verification before allowing login.
    return {"id": str(user.id), "email": user.email}
@router.post("/login", response_model=TokenResponse)
async def login(body: LoginRequest, db: AsyncSession = Depends(get_db)):
    """Exchange an email/password pair for a bearer access token.

    Args:
        body: Credentials submitted by the client.
        db: Database session injected through ``get_db``.

    Returns:
        A ``TokenResponse`` holding the token built by ``create_access_token``
        from the user's id and email.

    Raises:
        HTTPException: 401 when no user matches the email or the password
            fails verification; 403 when the matched user is not active.
    """
    user = await user_repository.get_by_email(db, body.email)
    # An unknown email and a wrong password produce the same response, so the
    # reply itself does not distinguish between the two cases.
    if user is None or not verify_password(body.password, user.hashed_password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid email or password",
            # A 401 response is required to carry a WWW-Authenticate challenge.
            headers={"WWW-Authenticate": "Bearer"},
        )
    # The active flag is only checked once the password has been verified.
    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Account disabled",
        )
    # TODO(email-verification): once email verification is in place, reject
    # users whose is_email_verified flag is false with a 403 response whose
    # detail is "Email address not verified".
    token = create_access_token(str(user.id), user.email)
    return TokenResponse(access_token=token)