"""
Phase 2 backend tests for Kino: profiles, parental control, settings,
TMDB/Radarr unconfigured paths, subtitles, HLS endpoint contract.

These are integration tests: every test issues real HTTP requests against the
backend whose base URL is read from the ``REACT_APP_BACKEND_URL`` environment
variable.
"""
import os
import io
import uuid

import pytest
import requests

BASE_URL = os.environ.get("REACT_APP_BACKEND_URL", "").rstrip("/")
ADMIN_EMAIL = "admin@kino.local"
ADMIN_PASSWORD = "kino-admin-2026"


# ---------- Fixtures ----------
@pytest.fixture(scope="module")
def api():
    """Return one ``requests.Session`` shared by all tests in this module."""
    s = requests.Session()
    return s


@pytest.fixture(scope="module")
def admin_token(api):
    """Log in with the admin credentials and return the access token."""
    r = api.post(
        f"{BASE_URL}/api/auth/login",
        json={"email": ADMIN_EMAIL, "password": ADMIN_PASSWORD},
    )
    assert r.status_code == 200
    return r.json()["access_token"]


@pytest.fixture(scope="module")
def admin_headers(admin_token):
    """Return the bearer ``Authorization`` header dict for the admin token."""
    return {"Authorization": f"Bearer {admin_token}"}


@pytest.fixture(scope="module")
def member(api):
    """Create a fresh member for profile-scoped tests.

    Returns a ``(access_token, user_id)`` tuple for the newly registered user.
    """
    email = f"TEST_p2_{uuid.uuid4().hex[:8]}@kino.local"
    r = api.post(
        f"{BASE_URL}/api/auth/register",
        json={"email": email, "password": "pass1234", "name": "P2 User"},
    )
    assert r.status_code == 200
    d = r.json()
    return d["access_token"], d["user"]["id"]


@pytest.fixture(scope="module")
def member_headers(member):
    """Return the bearer ``Authorization`` header dict for the test member."""
    tok, _ = member
    return {"Authorization": f"Bearer {tok}"}


# ---------- PROFILES ----------
class TestProfiles:
    """Profile CRUD, the 5-profile cap, and last-profile protection."""

    def test_default_profile_auto_created(self, api, member_headers):
        """A newly registered member already has a profile named after them."""
        r = api.get(f"{BASE_URL}/api/profiles", headers=member_headers)
        assert r.status_code == 200
        profiles = r.json()
        assert isinstance(profiles, list)
        assert len(profiles) >= 1
        assert profiles[0]["name"] == "P2 User"
        assert profiles[0]["max_rating"] == "NR"
        assert "id" in profiles[0]

    def test_create_profile(self, api, member_headers):
        """Creating a profile echoes its fields and it shows up in the listing."""
        r = api.post(
            f"{BASE_URL}/api/profiles",
            json={
                "name": "TEST_Kid",
                "is_kids": True,
                "max_rating": "PG",
                "avatar_color": "#22aa55",
            },
            headers=member_headers,
        )
        assert r.status_code == 200, r.text
        p = r.json()
        assert p["name"] == "TEST_Kid"
        assert p["is_kids"] is True
        assert p["max_rating"] == "PG"
        # persistence
        listing = api.get(f"{BASE_URL}/api/profiles", headers=member_headers).json()
        assert any(x["id"] == p["id"] for x in listing)

    def test_update_profile(self, api, member_headers):
        """PATCH updates the name and max_rating of an existing profile."""
        # create then patch
        c = api.post(
            f"{BASE_URL}/api/profiles",
            json={"name": "TEST_PatchMe"},
            headers=member_headers,
        ).json()
        # Fail with the server's payload instead of a bare KeyError below.
        assert "id" in c, f"profile create failed: {c}"
        r = api.patch(
            f"{BASE_URL}/api/profiles/{c['id']}",
            json={"name": "TEST_Patched", "max_rating": "PG-13"},
            headers=member_headers,
        )
        assert r.status_code == 200
        assert r.json()["name"] == "TEST_Patched"
        assert r.json()["max_rating"] == "PG-13"

    def test_max_5_profiles(self, api, member_headers):
        """Once the member has 5 profiles, one more creation is rejected."""
        # current count
        existing = api.get(f"{BASE_URL}/api/profiles", headers=member_headers).json()
        # create up to 5
        for i in range(max(0, 5 - len(existing))):
            api.post(
                f"{BASE_URL}/api/profiles",
                json={"name": f"TEST_P{i}"},
                headers=member_headers,
            )
        # 6th must fail
        r = api.post(
            f"{BASE_URL}/api/profiles",
            json={"name": "TEST_Overflow"},
            headers=member_headers,
        )
        assert r.status_code == 400

    def test_cannot_delete_last_profile(self, api):
        """Deleting the only profile of a user is rejected with 400."""
        # fresh user with single profile
        email = f"TEST_solo_{uuid.uuid4().hex[:6]}@kino.local"
        d = api.post(
            f"{BASE_URL}/api/auth/register",
            json={"email": email, "password": "pass1234", "name": "Solo"},
        ).json()
        h = {"Authorization": f"Bearer {d['access_token']}"}
        plist = api.get(f"{BASE_URL}/api/profiles", headers=h).json()
        assert len(plist) == 1
        r = api.delete(f"{BASE_URL}/api/profiles/{plist[0]['id']}", headers=h)
        assert r.status_code == 400

    def test_delete_profile_cleans_watchlist(self, api, member_headers):
        """A profile that has a watchlist entry can be deleted.

        NOTE(review): only the 200 status of the delete is asserted; the
        watchlist cleanup named in the test title is not checked afterwards.
        """
        # ensure room (we may be at 5-profile cap from prior test)
        candidates = api.get(f"{BASE_URL}/api/profiles", headers=member_headers).json()
        remaining = candidates
        # delete any TEST_P* profiles to free a slot; iterate the initial
        # snapshot while `remaining` tracks the refreshed server-side list
        for x in candidates:
            if x["name"].startswith("TEST_P") and len(remaining) > 1:
                api.delete(f"{BASE_URL}/api/profiles/{x['id']}", headers=member_headers)
                remaining = api.get(
                    f"{BASE_URL}/api/profiles", headers=member_headers
                ).json()
                if len(remaining) < 5:
                    break
        p = api.post(
            f"{BASE_URL}/api/profiles",
            json={"name": "TEST_Del"},
            headers=member_headers,
        ).json()
        assert "id" in p, f"profile create failed: {p}"
        movies = api.get(f"{BASE_URL}/api/movies", headers=member_headers).json()
        mid = movies[0]["id"]
        h = {**member_headers, "X-Profile-Id": p["id"]}
        api.post(f"{BASE_URL}/api/watchlist/{mid}", headers=h)
        wl = api.get(f"{BASE_URL}/api/watchlist", headers=h).json()
        assert any(x["id"] == mid for x in wl)
        r = api.delete(f"{BASE_URL}/api/profiles/{p['id']}", headers=member_headers)
        assert r.status_code == 200


# ---------- Parental control ----------
class TestParentalControl:
    """Rating caps and watchlist isolation for a PG-capped profile."""

    def _kid_profile(self, api, headers):
        """Return an existing profile capped at PG, creating one if none exists."""
        # find or create a PG kid profile
        plist = api.get(f"{BASE_URL}/api/profiles", headers=headers).json()
        for p in plist:
            if p.get("max_rating") == "PG":
                return p
        return api.post(
            f"{BASE_URL}/api/profiles",
            json={"name": "TEST_KidsRC", "is_kids": True, "max_rating": "PG"},
            headers=headers,
        ).json()

    def test_movies_filtered_by_kids_profile(self, api, member_headers):
        """The movie listing for a PG profile contains only G/PG/NR titles."""
        kid = self._kid_profile(api, member_headers)
        h = {**member_headers, "X-Profile-Id": kid["id"]}
        r = api.get(f"{BASE_URL}/api/movies", headers=h)
        assert r.status_code == 200
        movies = r.json()
        # only G/PG/NR allowed
        for m in movies:
            assert m["rating"] in ("G", "PG", "NR"), f"Disallowed rating: {m['rating']}"

    def test_featured_respects_cap(self, api, member_headers):
        """The featured movie, when present, is within the PG profile's cap."""
        kid = self._kid_profile(api, member_headers)
        h = {**member_headers, "X-Profile-Id": kid["id"]}
        r = api.get(f"{BASE_URL}/api/movies/featured", headers=h)
        # may return 200 or 404 if no featured G/PG/NR exists; both acceptable
        assert r.status_code in (200, 404)
        if r.status_code == 200:
            assert r.json()["rating"] in ("G", "PG", "NR")

    def test_watchlist_profile_scoped(self, api, member_headers):
        """An entry added under one profile is not visible from another."""
        # Use default (unrestricted) profile to add, kid profile should not see
        plist = api.get(f"{BASE_URL}/api/profiles", headers=member_headers).json()
        default = next((p for p in plist if p["max_rating"] == "NR"), plist[0])
        kid = self._kid_profile(api, member_headers)
        movies = api.get(f"{BASE_URL}/api/movies", headers=member_headers).json()
        # find an R-rated or other distinct movie
        target = next(
            (m for m in movies if m["rating"] not in ("G", "PG", "NR")), movies[0]
        )
        mid = target["id"]
        h_def = {**member_headers, "X-Profile-Id": default["id"]}
        api.post(f"{BASE_URL}/api/watchlist/{mid}", headers=h_def)
        wl_def = api.get(f"{BASE_URL}/api/watchlist", headers=h_def).json()
        assert any(m["id"] == mid for m in wl_def)
        h_kid = {**member_headers, "X-Profile-Id": kid["id"]}
        wl_kid = api.get(f"{BASE_URL}/api/watchlist", headers=h_kid).json()
        assert all(m["id"] != mid for m in wl_kid)


# ---------- Settings (admin) ----------
class TestSettings:
    """Admin-only settings endpoint: read, access control, and update."""

    def test_get_settings(self, api, admin_headers):
        """Admin GET returns the tmdb/radarr ``*_configured`` flags."""
        r = api.get(f"{BASE_URL}/api/settings", headers=admin_headers)
        assert r.status_code == 200
        s = r.json()
        assert "tmdb_configured" in s and "radarr_configured" in s
        assert isinstance(s["tmdb_configured"], bool)

    def test_settings_member_forbidden(self, api, member_headers):
        """A non-admin member gets 403 from the settings endpoint."""
        r = api.get(f"{BASE_URL}/api/settings", headers=member_headers)
        assert r.status_code == 403

    def test_put_settings_persists(self, api, admin_headers):
        """PUT toggles ``tmdb_configured`` according to the submitted key."""
        # snapshot current state
        before = api.get(f"{BASE_URL}/api/settings", headers=admin_headers).json()
        try:
            # set empty -> tmdb_configured False
            r = api.put(
                f"{BASE_URL}/api/settings",
                json={"tmdb_api_key": "", "radarr_url": "", "radarr_api_key": ""},
                headers=admin_headers,
            )
            assert r.status_code == 200
            assert r.json()["tmdb_configured"] is False
            assert r.json()["radarr_configured"] is False
            # set a fake TMDB key (won't be hit) — verify configured flips true
            r2 = api.put(
                f"{BASE_URL}/api/settings",
                json={
                    "tmdb_api_key": "TEST_FAKE_KEY",
                    "radarr_url": "",
                    "radarr_api_key": "",
                },
                headers=admin_headers,
            )
            assert r2.status_code == 200
            assert r2.json()["tmdb_configured"] is True
        finally:
            # restore to empty (avoid hitting TMDB in other tests); done in
            # `finally` so a failed assertion above cannot leave the fake key
            # configured on the shared backend.
            # NOTE(review): if GET /api/settings does not echo the raw keys,
            # every .get() falls back to "" and this resets rather than
            # restores — confirm against the endpoint.
            api.put(
                f"{BASE_URL}/api/settings",
                json={
                    "tmdb_api_key": before.get("tmdb_api_key", ""),
                    "radarr_url": before.get("radarr_url", ""),
                    "radarr_api_key": before.get("radarr_api_key", ""),
                },
                headers=admin_headers,
            )


# ---------- TMDB / Radarr unconfigured ----------
class TestExternalUnconfigured:
    """TMDB and Radarr endpoints answer 400 while no credentials are set."""

    def test_tmdb_search_no_key(self, api, admin_headers):
        """TMDB search is rejected with 400 after the settings are cleared."""
        # ensure unconfigured
        api.put(
            f"{BASE_URL}/api/settings",
            json={"tmdb_api_key": "", "radarr_url": "", "radarr_api_key": ""},
            headers=admin_headers,
        )
        r = api.get(
            f"{BASE_URL}/api/tmdb/search", params={"q": "matrix"}, headers=admin_headers
        )
        assert r.status_code == 400

    def test_radarr_test_unconfigured(self, api, admin_headers):
        """The Radarr connection test is rejected with 400."""
        r = api.post(f"{BASE_URL}/api/radarr/test", headers=admin_headers)
        assert r.status_code == 400

    def test_radarr_movies_unconfigured(self, api, admin_headers):
        """The Radarr movie listing is rejected with 400."""
        r = api.get(f"{BASE_URL}/api/radarr/movies", headers=admin_headers)
        assert r.status_code == 400


# ---------- HLS / Transcode contract ----------
class TestHLS:
    """Status-code contract of the transcode and HLS playlist endpoints."""

    def test_transcode_external_400(self, api, admin_headers):
        """Requesting a transcode of an externally stored movie returns 400."""
        movies = api.get(f"{BASE_URL}/api/movies", headers=admin_headers).json()
        # Default of None turns a bare StopIteration into a readable failure.
        ext = next((m for m in movies if m["storage_type"] == "external"), None)
        assert ext is not None, "no movie with storage_type 'external' in catalogue"
        r = api.post(
            f"{BASE_URL}/api/movies/{ext['id']}/transcode", headers=admin_headers
        )
        assert r.status_code == 400

    def test_hls_serve_404_no_files(self, api, admin_token, admin_headers):
        """With query-string auth, a movie without HLS output returns 404."""
        movies = api.get(f"{BASE_URL}/api/movies", headers=admin_headers).json()
        mid = movies[0]["id"]
        r = api.get(
            f"{BASE_URL}/api/movies/{mid}/hls/playlist.m3u8",
            params={"auth": admin_token},
        )
        assert r.status_code == 404

    def test_hls_serve_unauth(self, api, admin_headers):
        """Without any credentials the playlist endpoint returns 401."""
        movies = api.get(f"{BASE_URL}/api/movies", headers=admin_headers).json()
        mid = movies[0]["id"]
        r = api.get(f"{BASE_URL}/api/movies/{mid}/hls/playlist.m3u8")
        assert r.status_code == 401


# ---------- Subtitles ----------
class TestSubtitles:
    """Subtitle upload, listing, serving, deletion, and access control."""

    @pytest.fixture(scope="class")
    def movie_id(self, api, admin_headers):
        """Return the id of the first movie in the admin's movie listing."""
        return api.get(f"{BASE_URL}/api/movies", headers=admin_headers).json()[0]["id"]

    def test_upload_vtt(self, api, admin_headers, admin_token, movie_id):
        """Full lifecycle of a VTT subtitle: upload, list, serve, delete."""
        vtt = b"WEBVTT\n\n00:00:01.000 --> 00:00:02.000\nHello"
        files = {"file": ("test.vtt", io.BytesIO(vtt), "text/vtt")}
        data = {"language": "en", "label": "TEST_English", "is_default": "false"}
        r = requests.post(
            f"{BASE_URL}/api/movies/{movie_id}/subtitles",
            headers=admin_headers,
            files=files,
            data=data,
        )
        assert r.status_code == 200, r.text
        sub = r.json()
        sid = sub["id"]
        # Tracks whether the delete request below was issued, so the
        # `finally` clause only cleans up when an earlier assertion failed.
        delete_issued = False
        try:
            assert sub["movie_id"] == movie_id
            assert sub["language"] == "en"
            # list
            ls = api.get(
                f"{BASE_URL}/api/movies/{movie_id}/subtitles", headers=admin_headers
            )
            assert ls.status_code == 200
            assert any(x["id"] == sid for x in ls.json())
            # serve via header auth
            f = requests.get(
                f"{BASE_URL}/api/subtitles/{sid}/file", headers=admin_headers
            )
            assert f.status_code == 200
            assert "text/vtt" in f.headers.get("content-type", "")
            assert b"WEBVTT" in f.content
            # serve via query auth
            f2 = requests.get(
                f"{BASE_URL}/api/subtitles/{sid}/file", params={"auth": admin_token}
            )
            assert f2.status_code == 200
            # serve without auth
            f3 = requests.get(f"{BASE_URL}/api/subtitles/{sid}/file")
            assert f3.status_code == 401
            # delete
            d = requests.delete(
                f"{BASE_URL}/api/subtitles/{sid}", headers=admin_headers
            )
            delete_issued = True
            assert d.status_code == 200
            # verify gone
            ls2 = api.get(
                f"{BASE_URL}/api/movies/{movie_id}/subtitles", headers=admin_headers
            ).json()
            assert all(x["id"] != sid for x in ls2)
        finally:
            if not delete_issued:
                # best-effort cleanup so a failed assertion does not leave
                # the uploaded subtitle on the backend
                requests.delete(
                    f"{BASE_URL}/api/subtitles/{sid}", headers=admin_headers
                )

    def test_upload_srt_converts(self, api, admin_headers, movie_id):
        """An uploaded SRT file is served back containing a WEBVTT marker."""
        srt = b"1\n00:00:01,000 --> 00:00:02,000\nHi from SRT\n"
        files = {"file": ("test.srt", io.BytesIO(srt), "text/plain")}
        data = {"language": "fr", "label": "TEST_French"}
        r = requests.post(
            f"{BASE_URL}/api/movies/{movie_id}/subtitles",
            headers=admin_headers,
            files=files,
            data=data,
        )
        assert r.status_code == 200
        sid = r.json()["id"]
        try:
            f = requests.get(
                f"{BASE_URL}/api/subtitles/{sid}/file", headers=admin_headers
            )
            assert f.status_code == 200
            assert b"WEBVTT" in f.content  # converted
        finally:
            # cleanup (runs even when an assertion above fails)
            requests.delete(f"{BASE_URL}/api/subtitles/{sid}", headers=admin_headers)

    def test_upload_subtitle_member_forbidden(self, api, member_headers, movie_id):
        """A non-admin member gets 403 when uploading a subtitle."""
        files = {"file": ("x.vtt", b"WEBVTT\n\n", "text/vtt")}
        r = requests.post(
            f"{BASE_URL}/api/movies/{movie_id}/subtitles",
            headers=member_headers,
            files=files,
            data={"language": "en"},
        )
        assert r.status_code == 403