mirror of
https://github.com/myronblair/kino
synced 2026-06-30 17:50:29 -05:00
73 lines
2.4 KiB
Python
73 lines
2.4 KiB
Python
"""JWT auth helpers using bcrypt + python-jose."""
|
|
import os
|
|
from datetime import datetime, timezone, timedelta
|
|
from typing import Optional
|
|
|
|
import bcrypt
|
|
from fastapi import Depends, HTTPException, status
|
|
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
|
from jose import jwt, JWTError
|
|
|
|
|
|
JWT_SECRET = os.environ.get("JWT_SECRET", "dev-secret")
|
|
JWT_ALG = os.environ.get("JWT_ALG", "HS256")
|
|
JWT_EXPIRE_HOURS = int(os.environ.get("JWT_EXPIRE_HOURS", "720"))
|
|
|
|
bearer = HTTPBearer(auto_error=False)
|
|
|
|
|
|
def hash_password(plain: str) -> str:
|
|
return bcrypt.hashpw(plain.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")
|
|
|
|
|
|
def verify_password(plain: str, hashed: str) -> bool:
|
|
try:
|
|
return bcrypt.checkpw(plain.encode("utf-8"), hashed.encode("utf-8"))
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
def create_token(user_id: str, is_admin: bool = False) -> str:
|
|
payload = {
|
|
"sub": user_id,
|
|
"is_admin": is_admin,
|
|
"iat": datetime.now(timezone.utc),
|
|
"exp": datetime.now(timezone.utc) + timedelta(hours=JWT_EXPIRE_HOURS),
|
|
}
|
|
return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALG)
|
|
|
|
|
|
def decode_token(token: str) -> dict:
|
|
try:
|
|
return jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALG])
|
|
except JWTError as e:
|
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=f"Invalid token: {e}")
|
|
|
|
|
|
async def get_current_user_id(
|
|
creds: Optional[HTTPAuthorizationCredentials] = Depends(bearer),
|
|
auth_query: Optional[str] = None,
|
|
) -> str:
|
|
"""Reads bearer token; supports `?auth=` query for video tags."""
|
|
token = None
|
|
if creds and creds.credentials:
|
|
token = creds.credentials
|
|
if not token:
|
|
raise HTTPException(status_code=401, detail="Not authenticated")
|
|
payload = decode_token(token)
|
|
return payload["sub"]
|
|
|
|
|
|
def token_from_query_or_header(authorization: Optional[str], auth: Optional[str]) -> str:
|
|
"""Helper for endpoints that accept token via Authorization header OR ?auth= query."""
|
|
if authorization and authorization.lower().startswith("bearer "):
|
|
return authorization.split(" ", 1)[1].strip()
|
|
if auth:
|
|
return auth
|
|
raise HTTPException(status_code=401, detail="Not authenticated")
|
|
|
|
|
|
def require_user_from_any(authorization: Optional[str], auth: Optional[str]) -> dict:
|
|
token = token_from_query_or_header(authorization, auth)
|
|
return decode_token(token)
|