Files
kino-app/backend/tests/test_phase2.py
T
2026-04-29 16:01:20 +00:00

335 lines
14 KiB
Python

"""
Phase 2 backend tests for Kino: profiles, parental control, settings,
TMDB/Radarr unconfigured paths, subtitles, HLS endpoint contract.
"""
import os
import io
import uuid
import pytest
import requests
# Root URL of the backend under test, read from REACT_APP_BACKEND_URL with
# trailing "/" characters stripped; this is the empty string when the
# variable is unset.
BASE_URL = os.environ.get("REACT_APP_BACKEND_URL", "").rstrip("/")
# Credentials the admin_token fixture posts to /api/auth/login.
# NOTE(review): presumably a seeded development admin account — confirm.
ADMIN_EMAIL = "admin@kino.local"
ADMIN_PASSWORD = "kino-admin-2026"
# ---------- Fixtures ----------
@pytest.fixture(scope="module")
def api():
    """Yield one ``requests.Session`` shared by every test in this module.

    The session is used as a context manager so that it is closed at module
    teardown and its pooled connections are released, instead of being left
    open until garbage collection.
    """
    with requests.Session() as session:
        yield session
@pytest.fixture(scope="module")
def admin_token(api):
    """Log in with the admin credentials and return the access token.

    Fails the dependent tests if the login endpoint does not answer 200.
    """
    response = api.post(
        f"{BASE_URL}/api/auth/login",
        json={"email": ADMIN_EMAIL, "password": ADMIN_PASSWORD},
    )
    # Include the response body so a failed login is diagnosable from the
    # pytest report alone.
    assert response.status_code == 200, response.text
    return response.json()["access_token"]
@pytest.fixture(scope="module")
def admin_headers(admin_token):
    """Build the Authorization header dict carrying the admin bearer token."""
    bearer = f"Bearer {admin_token}"
    return {"Authorization": bearer}
@pytest.fixture(scope="module")
def member(api):
    """Register a fresh member for profile-scoped tests.

    Returns:
        A ``(access_token, user_id)`` tuple taken from the register response.
    """
    # A random suffix keeps the e-mail unique across runs.
    email = f"TEST_p2_{uuid.uuid4().hex[:8]}@kino.local"
    response = api.post(
        f"{BASE_URL}/api/auth/register",
        json={"email": email, "password": "pass1234", "name": "P2 User"},
    )
    # Include the response body so a failed registration is diagnosable.
    assert response.status_code == 200, response.text
    payload = response.json()
    return payload["access_token"], payload["user"]["id"]
@pytest.fixture(scope="module")
def member_headers(member):
    """Build the Authorization header dict for the registered test member."""
    access_token, _user_id = member
    return {"Authorization": f"Bearer {access_token}"}
# ---------- PROFILES ----------
class TestProfiles:
    """Profile endpoints: default profile, create, update, cap and delete.

    Tests share the module-scoped member, so profiles created by one test
    are still present when the next one runs.
    """

    def test_default_profile_auto_created(self, api, member_headers):
        """The registered member already has a profile named after the user."""
        r = api.get(f"{BASE_URL}/api/profiles", headers=member_headers)
        assert r.status_code == 200, r.text
        profiles = r.json()
        assert isinstance(profiles, list)
        assert len(profiles) >= 1
        first = profiles[0]
        assert first["name"] == "P2 User"
        assert first["max_rating"] == "NR"
        assert "id" in first

    def test_create_profile(self, api, member_headers):
        """A created profile echoes its fields and shows up in the listing."""
        r = api.post(
            f"{BASE_URL}/api/profiles",
            json={"name": "TEST_Kid", "is_kids": True,
                  "max_rating": "PG", "avatar_color": "#22aa55"},
            headers=member_headers,
        )
        assert r.status_code == 200, r.text
        p = r.json()
        assert p["name"] == "TEST_Kid"
        assert p["is_kids"] is True
        assert p["max_rating"] == "PG"
        # persistence
        listing = api.get(f"{BASE_URL}/api/profiles", headers=member_headers).json()
        assert any(x["id"] == p["id"] for x in listing)

    def test_update_profile(self, api, member_headers):
        """PATCH changes the name and max_rating of an existing profile."""
        # create then patch
        created = api.post(
            f"{BASE_URL}/api/profiles",
            json={"name": "TEST_PatchMe"},
            headers=member_headers,
        )
        # Check the setup step so a failed create is reported as such rather
        # than as a KeyError on the missing "id".
        assert created.status_code == 200, created.text
        c = created.json()
        r = api.patch(
            f"{BASE_URL}/api/profiles/{c['id']}",
            json={"name": "TEST_Patched", "max_rating": "PG-13"},
            headers=member_headers,
        )
        assert r.status_code == 200, r.text
        patched = r.json()
        assert patched["name"] == "TEST_Patched"
        assert patched["max_rating"] == "PG-13"

    def test_max_5_profiles(self, api, member_headers):
        """Creating a sixth profile is rejected with 400."""
        # current count
        existing = api.get(f"{BASE_URL}/api/profiles", headers=member_headers).json()
        # create up to 5
        for i in range(max(0, 5 - len(existing))):
            filler = api.post(
                f"{BASE_URL}/api/profiles",
                json={"name": f"TEST_P{i}"},
                headers=member_headers,
            )
            # The cap is only really exercised if every filler was created.
            assert filler.status_code == 200, filler.text
        # 6th must fail
        r = api.post(
            f"{BASE_URL}/api/profiles",
            json={"name": "TEST_Overflow"},
            headers=member_headers,
        )
        assert r.status_code == 400, r.text

    def test_cannot_delete_last_profile(self, api):
        """Deleting a user's only profile is rejected with 400."""
        # fresh user with single profile
        email = f"TEST_solo_{uuid.uuid4().hex[:6]}@kino.local"
        reg = api.post(
            f"{BASE_URL}/api/auth/register",
            json={"email": email, "password": "pass1234", "name": "Solo"},
        )
        assert reg.status_code == 200, reg.text
        d = reg.json()
        h = {"Authorization": f"Bearer {d['access_token']}"}
        plist = api.get(f"{BASE_URL}/api/profiles", headers=h).json()
        assert len(plist) == 1
        r = api.delete(f"{BASE_URL}/api/profiles/{plist[0]['id']}", headers=h)
        assert r.status_code == 400, r.text

    def test_delete_profile_cleans_watchlist(self, api, member_headers):
        """A profile holding a watchlist entry can be deleted.

        NOTE(review): only the delete status and the profile's absence from
        the listing are asserted; removal of the watchlist rows themselves is
        not observable through the calls made here.
        """
        # ensure room (we may be at 5-profile cap from prior test)
        existing = api.get(f"{BASE_URL}/api/profiles", headers=member_headers).json()
        # delete any TEST_P* profiles to free a slot; the loop walks the
        # initial snapshot while `existing` is refreshed after each delete
        for x in list(existing):
            if x["name"].startswith("TEST_P") and len(existing) > 1:
                api.delete(f"{BASE_URL}/api/profiles/{x['id']}", headers=member_headers)
                existing = api.get(f"{BASE_URL}/api/profiles", headers=member_headers).json()
                if len(existing) < 5:
                    break
        p = api.post(
            f"{BASE_URL}/api/profiles",
            json={"name": "TEST_Del"},
            headers=member_headers,
        ).json()
        assert "id" in p, f"profile create failed: {p}"
        movies = api.get(f"{BASE_URL}/api/movies", headers=member_headers).json()
        mid = movies[0]["id"]
        h = {**member_headers, "X-Profile-Id": p["id"]}
        api.post(f"{BASE_URL}/api/watchlist/{mid}", headers=h)
        wl = api.get(f"{BASE_URL}/api/watchlist", headers=h).json()
        assert any(x["id"] == mid for x in wl)
        r = api.delete(f"{BASE_URL}/api/profiles/{p['id']}", headers=member_headers)
        assert r.status_code == 200, r.text
        # The deleted profile must no longer be listed.
        remaining = api.get(f"{BASE_URL}/api/profiles", headers=member_headers).json()
        assert all(x["id"] != p["id"] for x in remaining)
# ---------- Parental control ----------
class TestParentalControl:
    """Rating caps and per-profile scoping driven by the X-Profile-Id header."""

    def _kid_profile(self, api, headers):
        """Return the first profile capped at PG, creating one if none exists."""
        profiles = api.get(f"{BASE_URL}/api/profiles", headers=headers).json()
        found = next(
            (prof for prof in profiles if prof.get("max_rating") == "PG"),
            None,
        )
        if found is not None:
            return found
        payload = {"name": "TEST_KidsRC", "is_kids": True, "max_rating": "PG"}
        created = api.post(f"{BASE_URL}/api/profiles", json=payload, headers=headers)
        return created.json()

    def test_movies_filtered_by_kids_profile(self, api, member_headers):
        """The movie list for a PG profile contains only G, PG or NR titles."""
        kid = self._kid_profile(api, member_headers)
        scoped = {**member_headers, "X-Profile-Id": kid["id"]}
        resp = api.get(f"{BASE_URL}/api/movies", headers=scoped)
        assert resp.status_code == 200
        allowed = ("G", "PG", "NR")
        for m in resp.json():
            assert m["rating"] in allowed, f"Disallowed rating: {m['rating']}"

    def test_featured_respects_cap(self, api, member_headers):
        """The featured movie, when one is returned, honours the PG cap."""
        kid = self._kid_profile(api, member_headers)
        scoped = {**member_headers, "X-Profile-Id": kid["id"]}
        resp = api.get(f"{BASE_URL}/api/movies/featured", headers=scoped)
        # 404 is accepted too: there may be no featured G/PG/NR title.
        status = resp.status_code
        assert status in (200, 404)
        if status != 200:
            return
        assert resp.json()["rating"] in ("G", "PG", "NR")

    def test_watchlist_profile_scoped(self, api, member_headers):
        """An entry added under one profile is absent from another's list."""
        # Add with the default (unrestricted) profile; the kid profile must
        # not see the entry.
        profiles = api.get(f"{BASE_URL}/api/profiles", headers=member_headers).json()
        unrestricted = [prof for prof in profiles if prof["max_rating"] == "NR"]
        default = unrestricted[0] if unrestricted else profiles[0]
        kid = self._kid_profile(api, member_headers)
        catalog = api.get(f"{BASE_URL}/api/movies", headers=member_headers).json()
        # Prefer a title rated outside G/PG/NR; otherwise take the first one.
        target = next(
            (film for film in catalog if film["rating"] not in ("G", "PG", "NR")),
            catalog[0],
        )
        movie_id = target["id"]

        default_headers = {**member_headers, "X-Profile-Id": default["id"]}
        api.post(f"{BASE_URL}/api/watchlist/{movie_id}", headers=default_headers)
        default_list = api.get(f"{BASE_URL}/api/watchlist", headers=default_headers).json()
        assert any(entry["id"] == movie_id for entry in default_list)

        kid_headers = {**member_headers, "X-Profile-Id": kid["id"]}
        kid_list = api.get(f"{BASE_URL}/api/watchlist", headers=kid_headers).json()
        assert not any(entry["id"] == movie_id for entry in kid_list)
# ---------- Settings (admin) ----------
class TestSettings:
    """Admin settings endpoint: read, access control and update."""

    def test_get_settings(self, api, admin_headers):
        """GET exposes the tmdb_configured / radarr_configured flags."""
        r = api.get(f"{BASE_URL}/api/settings", headers=admin_headers)
        assert r.status_code == 200, r.text
        s = r.json()
        assert "tmdb_configured" in s and "radarr_configured" in s
        assert isinstance(s["tmdb_configured"], bool)

    def test_settings_member_forbidden(self, api, member_headers):
        """A non-admin member gets 403 on the settings endpoint."""
        r = api.get(f"{BASE_URL}/api/settings", headers=member_headers)
        assert r.status_code == 403, r.text

    def test_put_settings_persists(self, api, admin_headers):
        """PUT flips tmdb_configured according to whether a key is set.

        The prior settings are restored in a ``finally`` block so a failing
        assertion cannot leave the fake TMDB key configured for later tests.
        """
        # snapshot current state
        before = api.get(f"{BASE_URL}/api/settings", headers=admin_headers).json()
        # Any key missing from the snapshot falls back to "" (restore to
        # empty, which avoids hitting TMDB in other tests).
        restore_payload = {
            "tmdb_api_key": before.get("tmdb_api_key", ""),
            "radarr_url": before.get("radarr_url", ""),
            "radarr_api_key": before.get("radarr_api_key", ""),
        }
        try:
            # set empty -> tmdb_configured False
            r = api.put(
                f"{BASE_URL}/api/settings",
                json={"tmdb_api_key": "", "radarr_url": "", "radarr_api_key": ""},
                headers=admin_headers,
            )
            assert r.status_code == 200, r.text
            assert r.json()["tmdb_configured"] is False
            assert r.json()["radarr_configured"] is False
            # set a fake TMDB key (won't be hit) — verify configured flips true
            r2 = api.put(
                f"{BASE_URL}/api/settings",
                json={"tmdb_api_key": "TEST_FAKE_KEY", "radarr_url": "",
                      "radarr_api_key": ""},
                headers=admin_headers,
            )
            assert r2.status_code == 200, r2.text
            assert r2.json()["tmdb_configured"] is True
        finally:
            api.put(
                f"{BASE_URL}/api/settings",
                json=restore_payload,
                headers=admin_headers,
            )
# ---------- TMDB / Radarr unconfigured ----------
class TestExternalUnconfigured:
    """TMDB and Radarr endpoints answer 400 while no integration is set up.

    Every test clears the integration settings itself, so the tests pass
    regardless of execution order or selection (e.g. ``pytest -k radarr``).
    """

    def _clear_integrations(self, api, admin_headers):
        """Blank the TMDB and Radarr settings via the admin settings endpoint."""
        r = api.put(
            f"{BASE_URL}/api/settings",
            json={"tmdb_api_key": "", "radarr_url": "", "radarr_api_key": ""},
            headers=admin_headers,
        )
        assert r.status_code == 200, r.text

    def test_tmdb_search_no_key(self, api, admin_headers):
        """TMDB search without an API key is rejected with 400."""
        self._clear_integrations(api, admin_headers)
        r = api.get(
            f"{BASE_URL}/api/tmdb/search",
            params={"q": "matrix"},
            headers=admin_headers,
        )
        assert r.status_code == 400, r.text

    def test_radarr_test_unconfigured(self, api, admin_headers):
        """The Radarr connection test is rejected with 400 when unconfigured."""
        self._clear_integrations(api, admin_headers)
        r = api.post(f"{BASE_URL}/api/radarr/test", headers=admin_headers)
        assert r.status_code == 400, r.text

    def test_radarr_movies_unconfigured(self, api, admin_headers):
        """Listing Radarr movies is rejected with 400 when unconfigured."""
        self._clear_integrations(api, admin_headers)
        r = api.get(f"{BASE_URL}/api/radarr/movies", headers=admin_headers)
        assert r.status_code == 400, r.text
# ---------- HLS / Transcode contract ----------
class TestHLS:
    """Status-code contract of the transcode and HLS playlist endpoints."""

    def test_transcode_external_400(self, api, admin_headers):
        """Requesting a transcode of an externally stored movie yields 400."""
        catalog = api.get(f"{BASE_URL}/api/movies", headers=admin_headers).json()
        # next() without a default: the test errors if no external movie exists.
        external = next(
            filter(lambda film: film["storage_type"] == "external", catalog)
        )
        url = f"{BASE_URL}/api/movies/{external['id']}/transcode"
        resp = api.post(url, headers=admin_headers)
        assert resp.status_code == 400

    def test_hls_serve_404_no_files(self, api, admin_token, admin_headers):
        """The playlist of the first listed movie answers 404 with query auth."""
        catalog = api.get(f"{BASE_URL}/api/movies", headers=admin_headers).json()
        first_id = catalog[0]["id"]
        playlist_url = f"{BASE_URL}/api/movies/{first_id}/hls/playlist.m3u8"
        # The token travels in the "auth" query parameter, not in a header.
        resp = api.get(playlist_url, params={"auth": admin_token})
        assert resp.status_code == 404

    def test_hls_serve_unauth(self, api, admin_headers):
        """The playlist endpoint answers 401 when no credentials are sent."""
        catalog = api.get(f"{BASE_URL}/api/movies", headers=admin_headers).json()
        first_id = catalog[0]["id"]
        playlist_url = f"{BASE_URL}/api/movies/{first_id}/hls/playlist.m3u8"
        resp = api.get(playlist_url)
        assert resp.status_code == 401
# ---------- Subtitles ----------
class TestSubtitles:
    """Subtitle upload, listing, serving, deletion and access control."""

    @pytest.fixture(scope="class")
    def movie_id(self, api, admin_headers):
        """Id of the first movie in the admin's movie listing."""
        return api.get(f"{BASE_URL}/api/movies", headers=admin_headers).json()[0]["id"]

    def test_upload_vtt(self, api, admin_headers, admin_token, movie_id):
        """Full lifecycle of a VTT subtitle: upload, list, serve, delete."""
        vtt = b"WEBVTT\n\n00:00:01.000 --> 00:00:02.000\nHello"
        files = {"file": ("test.vtt", io.BytesIO(vtt), "text/vtt")}
        data = {"language": "en", "label": "TEST_English", "is_default": "false"}
        r = requests.post(
            f"{BASE_URL}/api/movies/{movie_id}/subtitles",
            headers=admin_headers, files=files, data=data,
        )
        assert r.status_code == 200, r.text
        sub = r.json()
        sid = sub["id"]
        assert sub["movie_id"] == movie_id
        assert sub["language"] == "en"
        # list
        ls = api.get(
            f"{BASE_URL}/api/movies/{movie_id}/subtitles",
            headers=admin_headers,
        )
        assert ls.status_code == 200
        assert any(x["id"] == sid for x in ls.json())
        # serve via header auth
        f = requests.get(
            f"{BASE_URL}/api/subtitles/{sid}/file",
            headers=admin_headers,
        )
        assert f.status_code == 200
        assert "text/vtt" in f.headers.get("content-type", "")
        assert b"WEBVTT" in f.content
        # serve via query auth
        f2 = requests.get(
            f"{BASE_URL}/api/subtitles/{sid}/file",
            params={"auth": admin_token},
        )
        assert f2.status_code == 200
        # serve without auth
        f3 = requests.get(f"{BASE_URL}/api/subtitles/{sid}/file")
        assert f3.status_code == 401
        # delete
        d = requests.delete(
            f"{BASE_URL}/api/subtitles/{sid}",
            headers=admin_headers,
        )
        assert d.status_code == 200
        # verify gone
        ls2 = api.get(
            f"{BASE_URL}/api/movies/{movie_id}/subtitles",
            headers=admin_headers,
        ).json()
        assert all(x["id"] != sid for x in ls2)

    def test_upload_srt_converts(self, api, admin_headers, movie_id):
        """An uploaded SRT file is served back containing a WEBVTT header.

        The uploaded subtitle is removed in a ``finally`` block so a failing
        assertion does not leave it attached to the movie.
        """
        srt = b"1\n00:00:01,000 --> 00:00:02,000\nHi from SRT\n"
        files = {"file": ("test.srt", io.BytesIO(srt), "text/plain")}
        data = {"language": "fr", "label": "TEST_French"}
        r = requests.post(
            f"{BASE_URL}/api/movies/{movie_id}/subtitles",
            headers=admin_headers, files=files, data=data,
        )
        assert r.status_code == 200, r.text
        sid = r.json()["id"]
        try:
            f = requests.get(
                f"{BASE_URL}/api/subtitles/{sid}/file",
                headers=admin_headers,
            )
            assert f.status_code == 200
            assert b"WEBVTT" in f.content  # converted
        finally:
            # cleanup
            requests.delete(f"{BASE_URL}/api/subtitles/{sid}", headers=admin_headers)

    def test_upload_subtitle_member_forbidden(self, api, member_headers, movie_id):
        """A non-admin member uploading a subtitle gets 403."""
        files = {"file": ("x.vtt", b"WEBVTT\n\n", "text/vtt")}
        r = requests.post(
            f"{BASE_URL}/api/movies/{movie_id}/subtitles",
            headers=member_headers, files=files,
            data={"language": "en"},
        )
        assert r.status_code == 403, r.text