mirror of
https://github.com/myronblair/kino-app
synced 2026-06-30 17:50:16 -05:00
335 lines
14 KiB
Python
335 lines
14 KiB
Python
"""
|
|
Phase 2 backend tests for Kino: profiles, parental control, settings,
|
|
TMDB/Radarr unconfigured paths, subtitles, HLS endpoint contract.
|
|
"""
|
|
import os
|
|
import io
|
|
import uuid
|
|
import pytest
|
|
import requests
|
|
|
|
# Root of the backend under test, read from REACT_APP_BACKEND_URL with any
# trailing slashes removed; it is the empty string when the variable is unset.
BASE_URL = os.getenv("REACT_APP_BACKEND_URL", "").rstrip("/")

# Credentials the admin-scoped fixtures log in with.
ADMIN_EMAIL = "admin@kino.local"
ADMIN_PASSWORD = "kino-admin-2026"
|
|
|
|
|
|
# ---------- Fixtures ----------
|
|
@pytest.fixture(scope="module")
def api():
    """Yield one ``requests.Session`` shared by every test in this module.

    The session is used as a context manager so it is closed, and its pooled
    connections released, once the module's tests have finished.
    """
    with requests.Session() as session:
        yield session
|
|
|
|
|
|
@pytest.fixture(scope="module")
def admin_token(api):
    """Log in with ADMIN_EMAIL / ADMIN_PASSWORD and return the access token.

    Returns:
        The ``access_token`` field of the login response.
    """
    r = api.post(f"{BASE_URL}/api/auth/login",
                 json={"email": ADMIN_EMAIL, "password": ADMIN_PASSWORD})
    # Include the response body so a failed login explains itself.
    assert r.status_code == 200, r.text
    return r.json()["access_token"]
|
|
|
|
|
|
@pytest.fixture(scope="module")
def admin_headers(admin_token):
    """Return the ``Authorization`` header dict carrying the admin bearer token."""
    bearer = "Bearer {}".format(admin_token)
    return {"Authorization": bearer}
|
|
|
|
|
|
@pytest.fixture(scope="module")
def member(api):
    """Create a fresh member for profile-scoped tests.

    A random suffix keeps the e-mail address unique per run.

    Returns:
        A ``(access_token, user_id)`` tuple taken from the register response.
    """
    email = f"TEST_p2_{uuid.uuid4().hex[:8]}@kino.local"
    r = api.post(f"{BASE_URL}/api/auth/register",
                 json={"email": email, "password": "pass1234", "name": "P2 User"})
    # Include the response body so a failed registration explains itself.
    assert r.status_code == 200, r.text
    d = r.json()
    return d["access_token"], d["user"]["id"]
|
|
|
|
|
|
@pytest.fixture(scope="module")
def member_headers(member):
    """Return the ``Authorization`` header dict for the module's test member."""
    token, _user_id = member
    return {"Authorization": "Bearer {}".format(token)}
|
|
|
|
|
|
# ---------- PROFILES ----------
|
|
class TestProfiles:
    """Profile endpoints: default profile, create, update, 5-profile cap, delete.

    The tests share the module-scoped member, so profiles created by one test
    are still present when the next one runs.
    """

    def test_default_profile_auto_created(self, api, member_headers):
        """A newly registered member already has a profile named after the account."""
        r = api.get(f"{BASE_URL}/api/profiles", headers=member_headers)
        assert r.status_code == 200
        profiles = r.json()
        assert isinstance(profiles, list)
        assert len(profiles) >= 1
        # "P2 User" is the name the `member` fixture registers with.
        assert profiles[0]["name"] == "P2 User"
        assert profiles[0]["max_rating"] == "NR"
        assert "id" in profiles[0]

    def test_create_profile(self, api, member_headers):
        """POST /api/profiles echoes the new profile and it appears in the listing."""
        r = api.post(f"{BASE_URL}/api/profiles",
                     json={"name": "TEST_Kid", "is_kids": True,
                           "max_rating": "PG", "avatar_color": "#22aa55"},
                     headers=member_headers)
        assert r.status_code == 200, r.text
        p = r.json()
        assert p["name"] == "TEST_Kid"
        assert p["is_kids"] is True
        assert p["max_rating"] == "PG"
        # persistence
        listing = api.get(f"{BASE_URL}/api/profiles", headers=member_headers).json()
        assert any(x["id"] == p["id"] for x in listing)

    def test_update_profile(self, api, member_headers):
        """PATCH /api/profiles/{id} returns the updated name and max_rating."""
        # create then patch
        created = api.post(f"{BASE_URL}/api/profiles",
                           json={"name": "TEST_PatchMe"}, headers=member_headers)
        # Fail here with the server's message rather than with a KeyError below.
        assert created.status_code == 200, created.text
        c = created.json()
        r = api.patch(f"{BASE_URL}/api/profiles/{c['id']}",
                      json={"name": "TEST_Patched", "max_rating": "PG-13"},
                      headers=member_headers)
        assert r.status_code == 200
        assert r.json()["name"] == "TEST_Patched"
        assert r.json()["max_rating"] == "PG-13"

    def test_max_5_profiles(self, api, member_headers):
        """Creating a sixth profile is rejected with 400."""
        # current count
        existing = api.get(f"{BASE_URL}/api/profiles", headers=member_headers).json()
        # create up to 5
        for i in range(max(0, 5 - len(existing))):
            filler = api.post(f"{BASE_URL}/api/profiles",
                              json={"name": f"TEST_P{i}"}, headers=member_headers)
            # If a filler is refused the cap is not reached and the 400 below
            # would be meaningless.
            assert filler.status_code == 200, filler.text
        # 6th must fail
        r = api.post(f"{BASE_URL}/api/profiles",
                     json={"name": "TEST_Overflow"}, headers=member_headers)
        assert r.status_code == 400

    def test_cannot_delete_last_profile(self, api):
        """Deleting a user's only profile is rejected with 400."""
        # fresh user with single profile
        email = f"TEST_solo_{uuid.uuid4().hex[:6]}@kino.local"
        reg = api.post(f"{BASE_URL}/api/auth/register",
                       json={"email": email, "password": "pass1234", "name": "Solo"})
        assert reg.status_code == 200, reg.text
        d = reg.json()
        h = {"Authorization": f"Bearer {d['access_token']}"}
        plist = api.get(f"{BASE_URL}/api/profiles", headers=h).json()
        assert len(plist) == 1
        r = api.delete(f"{BASE_URL}/api/profiles/{plist[0]['id']}", headers=h)
        assert r.status_code == 400

    def test_delete_profile_cleans_watchlist(self, api, member_headers):
        """A profile that owns a watchlist entry can be deleted and disappears."""
        # ensure room (we may be at 5-profile cap from prior test)
        existing = api.get(f"{BASE_URL}/api/profiles", headers=member_headers).json()
        # delete any TEST_P* profiles to free a slot
        for x in existing:
            if x["name"].startswith("TEST_P") and len(existing) > 1:
                api.delete(f"{BASE_URL}/api/profiles/{x['id']}", headers=member_headers)
                # Rebinding `existing` does not disturb the loop, which keeps
                # iterating the list object it started with.
                existing = api.get(f"{BASE_URL}/api/profiles", headers=member_headers).json()
                if len(existing) < 5:
                    break
        p = api.post(f"{BASE_URL}/api/profiles",
                     json={"name": "TEST_Del"}, headers=member_headers).json()
        assert "id" in p, f"profile create failed: {p}"
        movies = api.get(f"{BASE_URL}/api/movies", headers=member_headers).json()
        assert movies, "catalogue is empty; need at least one movie to watchlist"
        mid = movies[0]["id"]
        h = {**member_headers, "X-Profile-Id": p["id"]}
        api.post(f"{BASE_URL}/api/watchlist/{mid}", headers=h)
        wl = api.get(f"{BASE_URL}/api/watchlist", headers=h).json()
        assert any(x["id"] == mid for x in wl)
        r = api.delete(f"{BASE_URL}/api/profiles/{p['id']}", headers=member_headers)
        assert r.status_code == 200
        # The deleted profile must no longer be listed.
        remaining = api.get(f"{BASE_URL}/api/profiles", headers=member_headers).json()
        assert all(x["id"] != p["id"] for x in remaining)
        # NOTE(review): removal of the profile's watchlist rows is not checked;
        # nothing visible here shows how the API answers for a deleted
        # X-Profile-Id, so that assertion still needs adding.
|
|
|
|
|
|
# ---------- Parental control ----------
|
|
class TestParentalControl:
    """Rating-cap behaviour of a PG profile selected via ``X-Profile-Id``."""

    # Ratings a PG-capped profile is expected to be served.
    ALLOWED_FOR_PG = ("G", "PG", "NR")

    def _kid_profile(self, api, headers):
        """Return a profile whose max_rating is "PG", creating one if none exists."""
        # find or create a PG kid profile
        plist = api.get(f"{BASE_URL}/api/profiles", headers=headers).json()
        for p in plist:
            if p.get("max_rating") == "PG":
                return p
        created = api.post(f"{BASE_URL}/api/profiles",
                           json={"name": "TEST_KidsRC", "is_kids": True,
                                 "max_rating": "PG"}, headers=headers).json()
        # Callers index ["id"]; report a refused create here instead of
        # letting them hit a KeyError.
        assert "id" in created, f"kid profile create failed: {created}"
        return created

    def test_movies_filtered_by_kids_profile(self, api, member_headers):
        """The movie listing for a PG profile holds only G/PG/NR titles."""
        kid = self._kid_profile(api, member_headers)
        h = {**member_headers, "X-Profile-Id": kid["id"]}
        r = api.get(f"{BASE_URL}/api/movies", headers=h)
        assert r.status_code == 200
        movies = r.json()
        # only G/PG/NR allowed
        for m in movies:
            assert m["rating"] in self.ALLOWED_FOR_PG, f"Disallowed rating: {m['rating']}"

    def test_featured_respects_cap(self, api, member_headers):
        """The featured movie for a PG profile is G/PG/NR, or absent (404)."""
        kid = self._kid_profile(api, member_headers)
        h = {**member_headers, "X-Profile-Id": kid["id"]}
        r = api.get(f"{BASE_URL}/api/movies/featured", headers=h)
        # may return 200 or 404 if no featured G/PG/NR exists; both acceptable
        assert r.status_code in (200, 404)
        if r.status_code == 200:
            assert r.json()["rating"] in self.ALLOWED_FOR_PG

    def test_watchlist_profile_scoped(self, api, member_headers):
        """An entry added under one profile is not visible from another."""
        # Use default (unrestricted) profile to add, kid profile should not see
        plist = api.get(f"{BASE_URL}/api/profiles", headers=member_headers).json()
        default = next((p for p in plist if p["max_rating"] == "NR"), plist[0])
        kid = self._kid_profile(api, member_headers)
        movies = api.get(f"{BASE_URL}/api/movies", headers=member_headers).json()
        # The fallback below evaluates movies[0] eagerly, so guard the empty case.
        assert movies, "catalogue is empty; need at least one movie to watchlist"
        # find an R-rated or other distinct movie
        target = next((m for m in movies if m["rating"] not in self.ALLOWED_FOR_PG),
                      movies[0])
        mid = target["id"]
        h_def = {**member_headers, "X-Profile-Id": default["id"]}
        api.post(f"{BASE_URL}/api/watchlist/{mid}", headers=h_def)
        wl_def = api.get(f"{BASE_URL}/api/watchlist", headers=h_def).json()
        assert any(m["id"] == mid for m in wl_def)
        h_kid = {**member_headers, "X-Profile-Id": kid["id"]}
        wl_kid = api.get(f"{BASE_URL}/api/watchlist", headers=h_kid).json()
        assert all(m["id"] != mid for m in wl_kid)
|
|
|
|
|
|
# ---------- Settings (admin) ----------
|
|
class TestSettings:
    """Admin-only /api/settings endpoint: read, access control, update."""

    def test_get_settings(self, api, admin_headers):
        """GET /api/settings exposes the two ``*_configured`` flags to an admin."""
        r = api.get(f"{BASE_URL}/api/settings", headers=admin_headers)
        assert r.status_code == 200
        s = r.json()
        assert "tmdb_configured" in s and "radarr_configured" in s
        assert isinstance(s["tmdb_configured"], bool)

    def test_settings_member_forbidden(self, api, member_headers):
        """A regular member gets 403 on GET /api/settings."""
        r = api.get(f"{BASE_URL}/api/settings", headers=member_headers)
        assert r.status_code == 403

    def test_put_settings_persists(self, api, admin_headers):
        """PUT /api/settings flips ``tmdb_configured`` and a later GET agrees."""
        # snapshot current state
        before = api.get(f"{BASE_URL}/api/settings", headers=admin_headers).json()
        try:
            # set empty -> tmdb_configured False
            r = api.put(f"{BASE_URL}/api/settings",
                        json={"tmdb_api_key": "", "radarr_url": "", "radarr_api_key": ""},
                        headers=admin_headers)
            assert r.status_code == 200
            assert r.json()["tmdb_configured"] is False
            assert r.json()["radarr_configured"] is False
            # set a fake TMDB key (won't be hit) — verify configured flips true
            r2 = api.put(f"{BASE_URL}/api/settings",
                         json={"tmdb_api_key": "TEST_FAKE_KEY", "radarr_url": "",
                               "radarr_api_key": ""}, headers=admin_headers)
            assert r2.status_code == 200
            assert r2.json()["tmdb_configured"] is True
            # Re-read so the flag is checked on a fresh GET, not just in the
            # PUT response.
            after = api.get(f"{BASE_URL}/api/settings", headers=admin_headers).json()
            assert after["tmdb_configured"] is True
        finally:
            # Runs even when an assertion above fails, so the fake key never
            # stays configured for later tests.
            # NOTE(review): if GET does not return the key fields, each .get()
            # falls back to "" and this resets to empty rather than restoring.
            api.put(f"{BASE_URL}/api/settings",
                    json={"tmdb_api_key": before.get("tmdb_api_key", ""),
                          "radarr_url": before.get("radarr_url", ""),
                          "radarr_api_key": before.get("radarr_api_key", "")},
                    headers=admin_headers)
|
|
|
|
|
|
# ---------- TMDB / Radarr unconfigured ----------
|
|
class TestExternalUnconfigured:
    """TMDB and Radarr endpoints answer 400 while no integration is configured."""

    @pytest.fixture(autouse=True)
    def _clear_integrations(self, api, admin_headers):
        """Blank the TMDB/Radarr settings before every test in this class.

        This lets each test run alone or in any order instead of relying on
        an earlier test having cleared the settings.
        """
        r = api.put(f"{BASE_URL}/api/settings",
                    json={"tmdb_api_key": "", "radarr_url": "", "radarr_api_key": ""},
                    headers=admin_headers)
        assert r.status_code == 200, r.text

    def test_tmdb_search_no_key(self, api, admin_headers):
        """TMDB search without an API key is rejected with 400."""
        r = api.get(f"{BASE_URL}/api/tmdb/search",
                    params={"q": "matrix"}, headers=admin_headers)
        assert r.status_code == 400

    def test_radarr_test_unconfigured(self, api, admin_headers):
        """The Radarr connection test is rejected with 400 when unconfigured."""
        r = api.post(f"{BASE_URL}/api/radarr/test", headers=admin_headers)
        assert r.status_code == 400

    def test_radarr_movies_unconfigured(self, api, admin_headers):
        """The Radarr movie listing is rejected with 400 when unconfigured."""
        r = api.get(f"{BASE_URL}/api/radarr/movies", headers=admin_headers)
        assert r.status_code == 400
|
|
|
|
|
|
# ---------- HLS / Transcode contract ----------
|
|
class TestHLS:
    """Contract checks for the transcode trigger and the HLS playlist endpoint."""

    def test_transcode_external_400(self, api, admin_headers):
        """Requesting a transcode of an external-storage movie returns 400."""
        movies = api.get(f"{BASE_URL}/api/movies", headers=admin_headers).json()
        ext = next((m for m in movies if m["storage_type"] == "external"), None)
        # Without a default, next() would surface a bare StopIteration.
        assert ext is not None, "catalogue has no external-storage movie to test with"
        r = api.post(f"{BASE_URL}/api/movies/{ext['id']}/transcode",
                     headers=admin_headers)
        assert r.status_code == 400

    def test_hls_serve_404_no_files(self, api, admin_token, admin_headers):
        """An authenticated playlist request for the first listed movie is 404."""
        movies = api.get(f"{BASE_URL}/api/movies", headers=admin_headers).json()
        mid = movies[0]["id"]
        # The token travels in the `auth` query parameter, not in a header.
        r = api.get(f"{BASE_URL}/api/movies/{mid}/hls/playlist.m3u8",
                    params={"auth": admin_token})
        assert r.status_code == 404

    def test_hls_serve_unauth(self, api, admin_headers):
        """A playlist request carrying no credentials at all returns 401."""
        movies = api.get(f"{BASE_URL}/api/movies", headers=admin_headers).json()
        mid = movies[0]["id"]
        # Sent outside the shared session: a Session replays any cookies it
        # has collected, and `api` performed the logins. A bare request keeps
        # this call credential-free whether or not the backend sets cookies,
        # and matches how the subtitle no-auth check is issued.
        r = requests.get(f"{BASE_URL}/api/movies/{mid}/hls/playlist.m3u8")
        assert r.status_code == 401
|
|
|
|
|
|
# ---------- Subtitles ----------
|
|
class TestSubtitles:
    """Subtitle upload, listing, serving, SRT conversion and access control."""

    @pytest.fixture(scope="class")
    def movie_id(self, api, admin_headers):
        """Return the id of the first movie in the admin's listing."""
        return api.get(f"{BASE_URL}/api/movies", headers=admin_headers).json()[0]["id"]

    def test_upload_vtt(self, api, admin_headers, admin_token, movie_id):
        """A VTT upload can be listed, served (header and query auth), and deleted."""
        vtt = b"WEBVTT\n\n00:00:01.000 --> 00:00:02.000\nHello"
        files = {"file": ("test.vtt", io.BytesIO(vtt), "text/vtt")}
        data = {"language": "en", "label": "TEST_English", "is_default": "false"}
        r = requests.post(f"{BASE_URL}/api/movies/{movie_id}/subtitles",
                          headers=admin_headers, files=files, data=data)
        assert r.status_code == 200, r.text
        sub = r.json()
        sid = sub["id"]
        try:
            assert sub["movie_id"] == movie_id
            assert sub["language"] == "en"

            # list
            ls = api.get(f"{BASE_URL}/api/movies/{movie_id}/subtitles",
                         headers=admin_headers)
            assert ls.status_code == 200
            assert any(x["id"] == sid for x in ls.json())

            # serve via header auth
            f = requests.get(f"{BASE_URL}/api/subtitles/{sid}/file",
                             headers=admin_headers)
            assert f.status_code == 200
            assert "text/vtt" in f.headers.get("content-type", "")
            assert b"WEBVTT" in f.content

            # serve via query auth
            f2 = requests.get(f"{BASE_URL}/api/subtitles/{sid}/file",
                              params={"auth": admin_token})
            assert f2.status_code == 200

            # serve without auth
            f3 = requests.get(f"{BASE_URL}/api/subtitles/{sid}/file")
            assert f3.status_code == 401
        finally:
            # delete -- issued even when a check above fails, so the uploaded
            # subtitle is not left behind.
            d = requests.delete(f"{BASE_URL}/api/subtitles/{sid}",
                                headers=admin_headers)
        assert d.status_code == 200
        # verify gone
        ls2 = api.get(f"{BASE_URL}/api/movies/{movie_id}/subtitles",
                      headers=admin_headers).json()
        assert all(x["id"] != sid for x in ls2)

    def test_upload_srt_converts(self, api, admin_headers, movie_id):
        """An uploaded SRT file is served back as WebVTT."""
        srt = b"1\n00:00:01,000 --> 00:00:02,000\nHi from SRT\n"
        files = {"file": ("test.srt", io.BytesIO(srt), "text/plain")}
        data = {"language": "fr", "label": "TEST_French"}
        r = requests.post(f"{BASE_URL}/api/movies/{movie_id}/subtitles",
                          headers=admin_headers, files=files, data=data)
        assert r.status_code == 200, r.text
        sid = r.json()["id"]
        try:
            f = requests.get(f"{BASE_URL}/api/subtitles/{sid}/file",
                             headers=admin_headers)
            assert f.status_code == 200
            assert b"WEBVTT" in f.content  # converted
        finally:
            # cleanup -- runs even if the checks above fail
            requests.delete(f"{BASE_URL}/api/subtitles/{sid}", headers=admin_headers)

    def test_upload_subtitle_member_forbidden(self, api, member_headers, movie_id):
        """A non-admin member gets 403 when uploading a subtitle."""
        files = {"file": ("x.vtt", b"WEBVTT\n\n", "text/vtt")}
        r = requests.post(f"{BASE_URL}/api/movies/{movie_id}/subtitles",
                          headers=member_headers, files=files,
                          data={"language": "en"})
        assert r.status_code == 403
|