"""JWT auth helpers using bcrypt + python-jose.""" import os from datetime import datetime, timezone, timedelta from typing import Optional import bcrypt from fastapi import Depends, HTTPException, status from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from jose import jwt, JWTError JWT_SECRET = os.environ.get("JWT_SECRET", "dev-secret") JWT_ALG = os.environ.get("JWT_ALG", "HS256") JWT_EXPIRE_HOURS = int(os.environ.get("JWT_EXPIRE_HOURS", "720")) bearer = HTTPBearer(auto_error=False) def hash_password(plain: str) -> str: return bcrypt.hashpw(plain.encode("utf-8"), bcrypt.gensalt()).decode("utf-8") def verify_password(plain: str, hashed: str) -> bool: try: return bcrypt.checkpw(plain.encode("utf-8"), hashed.encode("utf-8")) except Exception: return False def create_token(user_id: str, is_admin: bool = False) -> str: payload = { "sub": user_id, "is_admin": is_admin, "iat": datetime.now(timezone.utc), "exp": datetime.now(timezone.utc) + timedelta(hours=JWT_EXPIRE_HOURS), } return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALG) def decode_token(token: str) -> dict: try: return jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALG]) except JWTError as e: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=f"Invalid token: {e}") async def get_current_user_id( creds: Optional[HTTPAuthorizationCredentials] = Depends(bearer), auth_query: Optional[str] = None, ) -> str: """Reads bearer token; supports `?auth=` query for video tags.""" token = None if creds and creds.credentials: token = creds.credentials if not token: raise HTTPException(status_code=401, detail="Not authenticated") payload = decode_token(token) return payload["sub"] def token_from_query_or_header(authorization: Optional[str], auth: Optional[str]) -> str: """Helper for endpoints that accept token via Authorization header OR ?auth= query.""" if authorization and authorization.lower().startswith("bearer "): return authorization.split(" ", 1)[1].strip() if auth: return auth raise HTTPException(status_code=401, detail="Not authenticated") def require_user_from_any(authorization: Optional[str], auth: Optional[str]) -> dict: token = token_from_query_or_header(authorization, auth) return decode_token(token)