mirror of
https://github.com/myronblair/kino-app
synced 2026-06-30 17:50:16 -05:00
296 lines
11 KiB
Python
296 lines
11 KiB
Python
"""
|
|
Backend pytest suite for Kino Personal Media Server.
|
|
Tests auth, movies, watchlist, progress, requests, and stream auth.
|
|
"""
|
|
import os
|
|
import time
|
|
import uuid
|
|
import pytest
|
|
import requests
|
|
|
|
BASE_URL = os.environ.get("REACT_APP_BACKEND_URL", "https://streamhoard.preview.emergentagent.com").rstrip("/")
|
|
ADMIN_EMAIL = "admin@kino.local"
|
|
ADMIN_PASSWORD = "kino-admin-2026"
|
|
|
|
|
|
# ---------- Fixtures ----------
|
|
@pytest.fixture(scope="session")
|
|
def api():
|
|
s = requests.Session()
|
|
s.headers.update({"Content-Type": "application/json"})
|
|
return s
|
|
|
|
|
|
@pytest.fixture(scope="session")
|
|
def admin_token(api):
|
|
r = api.post(f"{BASE_URL}/api/auth/login", json={"email": ADMIN_EMAIL, "password": ADMIN_PASSWORD})
|
|
assert r.status_code == 200, f"admin login failed: {r.status_code} {r.text}"
|
|
data = r.json()
|
|
assert data["user"]["is_admin"] is True
|
|
return data["access_token"]
|
|
|
|
|
|
@pytest.fixture(scope="session")
|
|
def member_token(api):
|
|
email = f"TEST_user_{uuid.uuid4().hex[:8]}@kino.local"
|
|
r = api.post(f"{BASE_URL}/api/auth/register", json={"email": email, "password": "pass1234", "name": "Test User"})
|
|
assert r.status_code == 200, f"register failed: {r.text}"
|
|
data = r.json()
|
|
return data["access_token"], data["user"]
|
|
|
|
|
|
@pytest.fixture
|
|
def admin_headers(admin_token):
|
|
return {"Authorization": f"Bearer {admin_token}"}
|
|
|
|
|
|
@pytest.fixture
|
|
def member_headers(member_token):
|
|
tok, _ = member_token
|
|
return {"Authorization": f"Bearer {tok}"}
|
|
|
|
|
|
# ---------- Health ----------
|
|
class TestHealth:
|
|
def test_root(self, api):
|
|
r = api.get(f"{BASE_URL}/api/")
|
|
assert r.status_code == 200
|
|
assert r.json().get("status") == "ok"
|
|
|
|
|
|
# ---------- Auth ----------
|
|
class TestAuth:
|
|
def test_admin_login(self, api):
|
|
r = api.post(f"{BASE_URL}/api/auth/login", json={"email": ADMIN_EMAIL, "password": ADMIN_PASSWORD})
|
|
assert r.status_code == 200
|
|
d = r.json()
|
|
assert "access_token" in d and isinstance(d["access_token"], str)
|
|
assert d["user"]["email"] == ADMIN_EMAIL
|
|
assert d["user"]["is_admin"] is True
|
|
|
|
def test_login_invalid(self, api):
|
|
r = api.post(f"{BASE_URL}/api/auth/login", json={"email": ADMIN_EMAIL, "password": "wrong"})
|
|
assert r.status_code == 401
|
|
|
|
def test_register_and_me(self, api):
|
|
email = f"TEST_reg_{uuid.uuid4().hex[:8]}@kino.local"
|
|
r = api.post(f"{BASE_URL}/api/auth/register", json={"email": email, "password": "pass1234", "name": "Reg User"})
|
|
assert r.status_code == 200
|
|
d = r.json()
|
|
token = d["access_token"]
|
|
assert d["user"]["email"] == email.lower()
|
|
assert d["user"]["is_admin"] is False
|
|
# /me
|
|
r2 = api.get(f"{BASE_URL}/api/auth/me", headers={"Authorization": f"Bearer {token}"})
|
|
assert r2.status_code == 200
|
|
assert r2.json()["email"] == email.lower()
|
|
|
|
def test_register_duplicate(self, api):
|
|
email = f"TEST_dup_{uuid.uuid4().hex[:8]}@kino.local"
|
|
api.post(f"{BASE_URL}/api/auth/register", json={"email": email, "password": "pass1234", "name": "U"})
|
|
r = api.post(f"{BASE_URL}/api/auth/register", json={"email": email, "password": "pass1234", "name": "U"})
|
|
assert r.status_code == 400
|
|
|
|
def test_me_no_token(self, api):
|
|
r = api.get(f"{BASE_URL}/api/auth/me")
|
|
assert r.status_code == 401
|
|
|
|
|
|
# ---------- Movies (public) ----------
|
|
class TestMovies:
|
|
def test_list_movies_seeded(self, api, admin_headers):
|
|
r = api.get(f"{BASE_URL}/api/movies", headers=admin_headers)
|
|
assert r.status_code == 200
|
|
movies = r.json()
|
|
assert isinstance(movies, list)
|
|
assert len(movies) >= 10
|
|
# No _id leaked
|
|
assert all("_id" not in m for m in movies)
|
|
# Required fields
|
|
m = movies[0]
|
|
for f in ("id", "title", "video_url", "storage_type"):
|
|
assert f in m
|
|
|
|
def test_featured(self, api, admin_headers):
|
|
r = api.get(f"{BASE_URL}/api/movies/featured", headers=admin_headers)
|
|
assert r.status_code == 200
|
|
m = r.json()
|
|
assert m["title"] == "Big Buck Bunny"
|
|
assert "_id" not in m
|
|
|
|
def test_genres(self, api):
|
|
r = api.get(f"{BASE_URL}/api/movies/genres")
|
|
assert r.status_code == 200
|
|
genres = r.json()
|
|
assert isinstance(genres, list)
|
|
assert len(genres) > 0
|
|
assert genres == sorted(genres)
|
|
|
|
def test_get_movie_404(self, api):
|
|
r = api.get(f"{BASE_URL}/api/movies/nonexistent-id-xyz")
|
|
assert r.status_code == 404
|
|
|
|
def test_get_movie_by_id(self, api, admin_headers):
|
|
movies = api.get(f"{BASE_URL}/api/movies", headers=admin_headers).json()
|
|
mid = movies[0]["id"]
|
|
r = api.get(f"{BASE_URL}/api/movies/{mid}")
|
|
assert r.status_code == 200
|
|
assert r.json()["id"] == mid
|
|
|
|
def test_search_query(self, api, admin_headers):
|
|
r = api.get(f"{BASE_URL}/api/movies", params={"q": "bunny"}, headers=admin_headers)
|
|
assert r.status_code == 200
|
|
results = r.json()
|
|
assert any("bunny" in m["title"].lower() for m in results)
|
|
|
|
|
|
# ---------- Movies admin CRUD ----------
|
|
class TestMoviesAdmin:
|
|
def test_create_requires_admin(self, api, member_headers):
|
|
r = api.post(f"{BASE_URL}/api/movies", json={"title": "X"}, headers=member_headers)
|
|
assert r.status_code == 403
|
|
|
|
def test_create_update_delete(self, api, admin_headers):
|
|
payload = {
|
|
"title": "TEST_AdminMovie",
|
|
"description": "test",
|
|
"year": 2024,
|
|
"genres": ["Test"],
|
|
"video_url": "https://example.com/x.mp4",
|
|
"storage_type": "external",
|
|
"featured": False,
|
|
}
|
|
r = api.post(f"{BASE_URL}/api/movies", json=payload, headers=admin_headers)
|
|
assert r.status_code == 200, r.text
|
|
created = r.json()
|
|
mid = created["id"]
|
|
assert created["title"] == "TEST_AdminMovie"
|
|
|
|
# GET to verify persistence
|
|
g = api.get(f"{BASE_URL}/api/movies/{mid}")
|
|
assert g.status_code == 200
|
|
assert g.json()["title"] == "TEST_AdminMovie"
|
|
|
|
# PATCH
|
|
u = api.patch(f"{BASE_URL}/api/movies/{mid}", json={"title": "TEST_AdminMovieUpdated"}, headers=admin_headers)
|
|
assert u.status_code == 200
|
|
assert u.json()["title"] == "TEST_AdminMovieUpdated"
|
|
|
|
# GET re-verify
|
|
g2 = api.get(f"{BASE_URL}/api/movies/{mid}")
|
|
assert g2.json()["title"] == "TEST_AdminMovieUpdated"
|
|
|
|
# DELETE
|
|
d = api.delete(f"{BASE_URL}/api/movies/{mid}", headers=admin_headers)
|
|
assert d.status_code == 200
|
|
|
|
# Verify 404
|
|
g3 = api.get(f"{BASE_URL}/api/movies/{mid}")
|
|
assert g3.status_code == 404
|
|
|
|
|
|
# ---------- Watchlist ----------
|
|
class TestWatchlist:
|
|
def test_watchlist_flow(self, api, member_headers, admin_headers):
|
|
movies = api.get(f"{BASE_URL}/api/movies", headers=admin_headers).json()
|
|
mid = movies[0]["id"]
|
|
|
|
r = api.post(f"{BASE_URL}/api/watchlist/{mid}", headers=member_headers)
|
|
assert r.status_code == 200
|
|
|
|
r2 = api.get(f"{BASE_URL}/api/watchlist", headers=member_headers)
|
|
assert r2.status_code == 200
|
|
wl = r2.json()
|
|
assert any(m["id"] == mid for m in wl)
|
|
|
|
# Idempotent add
|
|
r3 = api.post(f"{BASE_URL}/api/watchlist/{mid}", headers=member_headers)
|
|
assert r3.status_code == 200
|
|
|
|
d = api.delete(f"{BASE_URL}/api/watchlist/{mid}", headers=member_headers)
|
|
assert d.status_code == 200
|
|
|
|
r4 = api.get(f"{BASE_URL}/api/watchlist", headers=member_headers)
|
|
assert all(m["id"] != mid for m in r4.json())
|
|
|
|
def test_watchlist_unauth(self, api):
|
|
r = api.get(f"{BASE_URL}/api/watchlist")
|
|
assert r.status_code == 401
|
|
|
|
|
|
# ---------- Progress ----------
|
|
class TestProgress:
|
|
def test_progress_upsert_and_continue(self, api, member_headers, admin_headers):
|
|
movies = api.get(f"{BASE_URL}/api/movies", headers=admin_headers).json()
|
|
mid = movies[1]["id"]
|
|
r = api.post(f"{BASE_URL}/api/progress",
|
|
json={"movie_id": mid, "position_seconds": 30, "duration_seconds": 600},
|
|
headers=member_headers)
|
|
assert r.status_code == 200
|
|
|
|
r2 = api.get(f"{BASE_URL}/api/progress/continue", headers=member_headers)
|
|
assert r2.status_code == 200
|
|
rows = r2.json()
|
|
assert any(m["id"] == mid for m in rows)
|
|
# progress object attached
|
|
target = next(m for m in rows if m["id"] == mid)
|
|
assert target["progress"]["position_seconds"] == 30
|
|
|
|
# update
|
|
r3 = api.post(f"{BASE_URL}/api/progress",
|
|
json={"movie_id": mid, "position_seconds": 90, "duration_seconds": 600},
|
|
headers=member_headers)
|
|
assert r3.status_code == 200
|
|
r4 = api.get(f"{BASE_URL}/api/progress/{mid}", headers=member_headers)
|
|
assert r4.json()["position_seconds"] == 90
|
|
|
|
|
|
# ---------- Requests ----------
|
|
class TestRequests:
|
|
def test_submit_and_admin_list(self, api, member_headers, admin_headers):
|
|
payload = {"title": "TEST_Req_Movie", "year": 2023, "notes": "please add"}
|
|
r = api.post(f"{BASE_URL}/api/requests", json=payload, headers=member_headers)
|
|
assert r.status_code == 200
|
|
req = r.json()
|
|
rid = req["id"]
|
|
assert req["status"] == "pending"
|
|
|
|
mine = api.get(f"{BASE_URL}/api/requests/mine", headers=member_headers)
|
|
assert mine.status_code == 200
|
|
assert any(x["id"] == rid for x in mine.json())
|
|
|
|
# Member cannot list all
|
|
forbid = api.get(f"{BASE_URL}/api/requests", headers=member_headers)
|
|
assert forbid.status_code == 403
|
|
|
|
# Admin lists
|
|
all_r = api.get(f"{BASE_URL}/api/requests", headers=admin_headers)
|
|
assert all_r.status_code == 200
|
|
assert any(x["id"] == rid for x in all_r.json())
|
|
|
|
# Admin updates
|
|
upd = api.patch(f"{BASE_URL}/api/requests/{rid}", json={"status": "fulfilled"}, headers=admin_headers)
|
|
assert upd.status_code == 200
|
|
assert upd.json()["status"] == "fulfilled"
|
|
|
|
|
|
# ---------- Streaming ----------
|
|
class TestStream:
|
|
def test_stream_no_token(self, api, admin_headers):
|
|
movies = api.get(f"{BASE_URL}/api/movies", headers=admin_headers).json()
|
|
mid = movies[0]["id"]
|
|
r = api.get(f"{BASE_URL}/api/stream/{mid}", allow_redirects=False)
|
|
assert r.status_code == 401
|
|
|
|
def test_stream_external_returns_400(self, api, admin_token, admin_headers):
|
|
# All seeded movies are storage_type=external -> should 400
|
|
movies = api.get(f"{BASE_URL}/api/movies", headers=admin_headers).json()
|
|
mid = movies[0]["id"]
|
|
r = api.get(f"{BASE_URL}/api/stream/{mid}", params={"auth": admin_token}, allow_redirects=False)
|
|
assert r.status_code == 400
|
|
|
|
def test_upload_unauth(self, api):
|
|
# Just verify rejection without token; do NOT upload large file
|
|
r = requests.post(f"{BASE_URL}/api/upload/video", data={"title": "X"}, files={"file": ("x.mp4", b"x", "video/mp4")})
|
|
assert r.status_code in (401, 403, 422)
|