Files
2026-04-29 14:49:07 +00:00

73 lines
2.4 KiB
Python

"""JWT auth helpers using bcrypt + python-jose."""
import os
from datetime import datetime, timezone, timedelta
from typing import Optional
import bcrypt
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import jwt, JWTError
JWT_SECRET = os.environ.get("JWT_SECRET", "dev-secret")
JWT_ALG = os.environ.get("JWT_ALG", "HS256")
JWT_EXPIRE_HOURS = int(os.environ.get("JWT_EXPIRE_HOURS", "720"))
bearer = HTTPBearer(auto_error=False)
def hash_password(plain: str) -> str:
return bcrypt.hashpw(plain.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")
def verify_password(plain: str, hashed: str) -> bool:
try:
return bcrypt.checkpw(plain.encode("utf-8"), hashed.encode("utf-8"))
except Exception:
return False
def create_token(user_id: str, is_admin: bool = False) -> str:
payload = {
"sub": user_id,
"is_admin": is_admin,
"iat": datetime.now(timezone.utc),
"exp": datetime.now(timezone.utc) + timedelta(hours=JWT_EXPIRE_HOURS),
}
return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALG)
def decode_token(token: str) -> dict:
try:
return jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALG])
except JWTError as e:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=f"Invalid token: {e}")
async def get_current_user_id(
creds: Optional[HTTPAuthorizationCredentials] = Depends(bearer),
auth_query: Optional[str] = None,
) -> str:
"""Reads bearer token; supports `?auth=` query for video tags."""
token = None
if creds and creds.credentials:
token = creds.credentials
if not token:
raise HTTPException(status_code=401, detail="Not authenticated")
payload = decode_token(token)
return payload["sub"]
def token_from_query_or_header(authorization: Optional[str], auth: Optional[str]) -> str:
"""Helper for endpoints that accept token via Authorization header OR ?auth= query."""
if authorization and authorization.lower().startswith("bearer "):
return authorization.split(" ", 1)[1].strip()
if auth:
return auth
raise HTTPException(status_code=401, detail="Not authenticated")
def require_user_from_any(authorization: Optional[str], auth: Optional[str]) -> dict:
token = token_from_query_or_header(authorization, auth)
return decode_token(token)