from datetime import datetime, timedelta, timezone from fastapi import Depends, HTTPException, status from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials import jwt from passlib.context import CryptContext from .config import settings security = HTTPBearer() pwd_context = CryptContext( schemes=["pbkdf2_sha256"], deprecated="auto", ) TOKEN_EXPIRY_HOURS = 24 def hash_password(password: str) -> str: return pwd_context.hash(password) def verify_password(plain: str, hashed: str) -> bool: return pwd_context.verify(plain, hashed) def create_access_token(user_id: str, email: str, is_admin: bool = False) -> str: payload = { "sub": user_id, "email": email, "is_admin": is_admin, "exp": datetime.now(timezone.utc) + timedelta(hours=TOKEN_EXPIRY_HOURS), } return jwt.encode(payload, settings.jwt_secret, algorithm="HS256") def verify_token( credentials: HTTPAuthorizationCredentials = Depends(security), ) -> dict: try: payload = jwt.decode( credentials.credentials, settings.jwt_secret, algorithms=["HS256"], ) return payload except jwt.InvalidTokenError: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or expired token", ) def _admin_emails() -> frozenset[str]: return frozenset( e.strip() for e in settings.admin_user_emails.split(",") if e.strip() ) def require_admin(token_data: dict = Depends(verify_token)) -> dict: if not token_data.get("is_admin"): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Admin access required", ) return token_data