mirror of
https://github.com/myronblair/kino
synced 2026-06-30 17:50:29 -05:00
auto-commit for df4b0748-985b-4592-8c48-1e56102f3613
This commit is contained in:
@@ -0,0 +1,295 @@
|
||||
"""
|
||||
Backend pytest suite for Kino Personal Media Server.
|
||||
Tests auth, movies, watchlist, progress, requests, and stream auth.
|
||||
"""
|
||||
import os
|
||||
import time
|
||||
import uuid
|
||||
import pytest
|
||||
import requests
|
||||
|
||||
BASE_URL = os.environ.get("REACT_APP_BACKEND_URL", "https://streamhoard.preview.emergentagent.com").rstrip("/")
|
||||
ADMIN_EMAIL = "admin@kino.local"
|
||||
ADMIN_PASSWORD = "kino-admin-2026"
|
||||
|
||||
|
||||
# ---------- Fixtures ----------
|
||||
@pytest.fixture(scope="session")
|
||||
def api():
|
||||
s = requests.Session()
|
||||
s.headers.update({"Content-Type": "application/json"})
|
||||
return s
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def admin_token(api):
|
||||
r = api.post(f"{BASE_URL}/api/auth/login", json={"email": ADMIN_EMAIL, "password": ADMIN_PASSWORD})
|
||||
assert r.status_code == 200, f"admin login failed: {r.status_code} {r.text}"
|
||||
data = r.json()
|
||||
assert data["user"]["is_admin"] is True
|
||||
return data["access_token"]
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def member_token(api):
|
||||
email = f"TEST_user_{uuid.uuid4().hex[:8]}@kino.local"
|
||||
r = api.post(f"{BASE_URL}/api/auth/register", json={"email": email, "password": "pass1234", "name": "Test User"})
|
||||
assert r.status_code == 200, f"register failed: {r.text}"
|
||||
data = r.json()
|
||||
return data["access_token"], data["user"]
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def admin_headers(admin_token):
|
||||
return {"Authorization": f"Bearer {admin_token}"}
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def member_headers(member_token):
|
||||
tok, _ = member_token
|
||||
return {"Authorization": f"Bearer {tok}"}
|
||||
|
||||
|
||||
# ---------- Health ----------
|
||||
class TestHealth:
|
||||
def test_root(self, api):
|
||||
r = api.get(f"{BASE_URL}/api/")
|
||||
assert r.status_code == 200
|
||||
assert r.json().get("status") == "ok"
|
||||
|
||||
|
||||
# ---------- Auth ----------
|
||||
class TestAuth:
|
||||
def test_admin_login(self, api):
|
||||
r = api.post(f"{BASE_URL}/api/auth/login", json={"email": ADMIN_EMAIL, "password": ADMIN_PASSWORD})
|
||||
assert r.status_code == 200
|
||||
d = r.json()
|
||||
assert "access_token" in d and isinstance(d["access_token"], str)
|
||||
assert d["user"]["email"] == ADMIN_EMAIL
|
||||
assert d["user"]["is_admin"] is True
|
||||
|
||||
def test_login_invalid(self, api):
|
||||
r = api.post(f"{BASE_URL}/api/auth/login", json={"email": ADMIN_EMAIL, "password": "wrong"})
|
||||
assert r.status_code == 401
|
||||
|
||||
def test_register_and_me(self, api):
|
||||
email = f"TEST_reg_{uuid.uuid4().hex[:8]}@kino.local"
|
||||
r = api.post(f"{BASE_URL}/api/auth/register", json={"email": email, "password": "pass1234", "name": "Reg User"})
|
||||
assert r.status_code == 200
|
||||
d = r.json()
|
||||
token = d["access_token"]
|
||||
assert d["user"]["email"] == email.lower()
|
||||
assert d["user"]["is_admin"] is False
|
||||
# /me
|
||||
r2 = api.get(f"{BASE_URL}/api/auth/me", headers={"Authorization": f"Bearer {token}"})
|
||||
assert r2.status_code == 200
|
||||
assert r2.json()["email"] == email.lower()
|
||||
|
||||
def test_register_duplicate(self, api):
|
||||
email = f"TEST_dup_{uuid.uuid4().hex[:8]}@kino.local"
|
||||
api.post(f"{BASE_URL}/api/auth/register", json={"email": email, "password": "pass1234", "name": "U"})
|
||||
r = api.post(f"{BASE_URL}/api/auth/register", json={"email": email, "password": "pass1234", "name": "U"})
|
||||
assert r.status_code == 400
|
||||
|
||||
def test_me_no_token(self, api):
|
||||
r = api.get(f"{BASE_URL}/api/auth/me")
|
||||
assert r.status_code == 401
|
||||
|
||||
|
||||
# ---------- Movies (public) ----------
|
||||
class TestMovies:
|
||||
def test_list_movies_seeded(self, api):
|
||||
r = api.get(f"{BASE_URL}/api/movies")
|
||||
assert r.status_code == 200
|
||||
movies = r.json()
|
||||
assert isinstance(movies, list)
|
||||
assert len(movies) >= 10
|
||||
# No _id leaked
|
||||
assert all("_id" not in m for m in movies)
|
||||
# Required fields
|
||||
m = movies[0]
|
||||
for f in ("id", "title", "video_url", "storage_type"):
|
||||
assert f in m
|
||||
|
||||
def test_featured(self, api):
|
||||
r = api.get(f"{BASE_URL}/api/movies/featured")
|
||||
assert r.status_code == 200
|
||||
m = r.json()
|
||||
assert m["title"] == "Big Buck Bunny"
|
||||
assert "_id" not in m
|
||||
|
||||
def test_genres(self, api):
|
||||
r = api.get(f"{BASE_URL}/api/movies/genres")
|
||||
assert r.status_code == 200
|
||||
genres = r.json()
|
||||
assert isinstance(genres, list)
|
||||
assert len(genres) > 0
|
||||
assert genres == sorted(genres)
|
||||
|
||||
def test_get_movie_404(self, api):
|
||||
r = api.get(f"{BASE_URL}/api/movies/nonexistent-id-xyz")
|
||||
assert r.status_code == 404
|
||||
|
||||
def test_get_movie_by_id(self, api):
|
||||
movies = api.get(f"{BASE_URL}/api/movies").json()
|
||||
mid = movies[0]["id"]
|
||||
r = api.get(f"{BASE_URL}/api/movies/{mid}")
|
||||
assert r.status_code == 200
|
||||
assert r.json()["id"] == mid
|
||||
|
||||
def test_search_query(self, api):
|
||||
r = api.get(f"{BASE_URL}/api/movies", params={"q": "bunny"})
|
||||
assert r.status_code == 200
|
||||
results = r.json()
|
||||
assert any("bunny" in m["title"].lower() for m in results)
|
||||
|
||||
|
||||
# ---------- Movies admin CRUD ----------
|
||||
class TestMoviesAdmin:
|
||||
def test_create_requires_admin(self, api, member_headers):
|
||||
r = api.post(f"{BASE_URL}/api/movies", json={"title": "X"}, headers=member_headers)
|
||||
assert r.status_code == 403
|
||||
|
||||
def test_create_update_delete(self, api, admin_headers):
|
||||
payload = {
|
||||
"title": "TEST_AdminMovie",
|
||||
"description": "test",
|
||||
"year": 2024,
|
||||
"genres": ["Test"],
|
||||
"video_url": "https://example.com/x.mp4",
|
||||
"storage_type": "external",
|
||||
"featured": False,
|
||||
}
|
||||
r = api.post(f"{BASE_URL}/api/movies", json=payload, headers=admin_headers)
|
||||
assert r.status_code == 200, r.text
|
||||
created = r.json()
|
||||
mid = created["id"]
|
||||
assert created["title"] == "TEST_AdminMovie"
|
||||
|
||||
# GET to verify persistence
|
||||
g = api.get(f"{BASE_URL}/api/movies/{mid}")
|
||||
assert g.status_code == 200
|
||||
assert g.json()["title"] == "TEST_AdminMovie"
|
||||
|
||||
# PATCH
|
||||
u = api.patch(f"{BASE_URL}/api/movies/{mid}", json={"title": "TEST_AdminMovieUpdated"}, headers=admin_headers)
|
||||
assert u.status_code == 200
|
||||
assert u.json()["title"] == "TEST_AdminMovieUpdated"
|
||||
|
||||
# GET re-verify
|
||||
g2 = api.get(f"{BASE_URL}/api/movies/{mid}")
|
||||
assert g2.json()["title"] == "TEST_AdminMovieUpdated"
|
||||
|
||||
# DELETE
|
||||
d = api.delete(f"{BASE_URL}/api/movies/{mid}", headers=admin_headers)
|
||||
assert d.status_code == 200
|
||||
|
||||
# Verify 404
|
||||
g3 = api.get(f"{BASE_URL}/api/movies/{mid}")
|
||||
assert g3.status_code == 404
|
||||
|
||||
|
||||
# ---------- Watchlist ----------
|
||||
class TestWatchlist:
|
||||
def test_watchlist_flow(self, api, member_headers):
|
||||
movies = api.get(f"{BASE_URL}/api/movies").json()
|
||||
mid = movies[0]["id"]
|
||||
|
||||
r = api.post(f"{BASE_URL}/api/watchlist/{mid}", headers=member_headers)
|
||||
assert r.status_code == 200
|
||||
|
||||
r2 = api.get(f"{BASE_URL}/api/watchlist", headers=member_headers)
|
||||
assert r2.status_code == 200
|
||||
wl = r2.json()
|
||||
assert any(m["id"] == mid for m in wl)
|
||||
|
||||
# Idempotent add
|
||||
r3 = api.post(f"{BASE_URL}/api/watchlist/{mid}", headers=member_headers)
|
||||
assert r3.status_code == 200
|
||||
|
||||
d = api.delete(f"{BASE_URL}/api/watchlist/{mid}", headers=member_headers)
|
||||
assert d.status_code == 200
|
||||
|
||||
r4 = api.get(f"{BASE_URL}/api/watchlist", headers=member_headers)
|
||||
assert all(m["id"] != mid for m in r4.json())
|
||||
|
||||
def test_watchlist_unauth(self, api):
|
||||
r = api.get(f"{BASE_URL}/api/watchlist")
|
||||
assert r.status_code == 401
|
||||
|
||||
|
||||
# ---------- Progress ----------
|
||||
class TestProgress:
|
||||
def test_progress_upsert_and_continue(self, api, member_headers):
|
||||
movies = api.get(f"{BASE_URL}/api/movies").json()
|
||||
mid = movies[1]["id"]
|
||||
r = api.post(f"{BASE_URL}/api/progress",
|
||||
json={"movie_id": mid, "position_seconds": 30, "duration_seconds": 600},
|
||||
headers=member_headers)
|
||||
assert r.status_code == 200
|
||||
|
||||
r2 = api.get(f"{BASE_URL}/api/progress/continue", headers=member_headers)
|
||||
assert r2.status_code == 200
|
||||
rows = r2.json()
|
||||
assert any(m["id"] == mid for m in rows)
|
||||
# progress object attached
|
||||
target = next(m for m in rows if m["id"] == mid)
|
||||
assert target["progress"]["position_seconds"] == 30
|
||||
|
||||
# update
|
||||
r3 = api.post(f"{BASE_URL}/api/progress",
|
||||
json={"movie_id": mid, "position_seconds": 90, "duration_seconds": 600},
|
||||
headers=member_headers)
|
||||
assert r3.status_code == 200
|
||||
r4 = api.get(f"{BASE_URL}/api/progress/{mid}", headers=member_headers)
|
||||
assert r4.json()["position_seconds"] == 90
|
||||
|
||||
|
||||
# ---------- Requests ----------
|
||||
class TestRequests:
|
||||
def test_submit_and_admin_list(self, api, member_headers, admin_headers):
|
||||
payload = {"title": "TEST_Req_Movie", "year": 2023, "notes": "please add"}
|
||||
r = api.post(f"{BASE_URL}/api/requests", json=payload, headers=member_headers)
|
||||
assert r.status_code == 200
|
||||
req = r.json()
|
||||
rid = req["id"]
|
||||
assert req["status"] == "pending"
|
||||
|
||||
mine = api.get(f"{BASE_URL}/api/requests/mine", headers=member_headers)
|
||||
assert mine.status_code == 200
|
||||
assert any(x["id"] == rid for x in mine.json())
|
||||
|
||||
# Member cannot list all
|
||||
forbid = api.get(f"{BASE_URL}/api/requests", headers=member_headers)
|
||||
assert forbid.status_code == 403
|
||||
|
||||
# Admin lists
|
||||
all_r = api.get(f"{BASE_URL}/api/requests", headers=admin_headers)
|
||||
assert all_r.status_code == 200
|
||||
assert any(x["id"] == rid for x in all_r.json())
|
||||
|
||||
# Admin updates
|
||||
upd = api.patch(f"{BASE_URL}/api/requests/{rid}", json={"status": "fulfilled"}, headers=admin_headers)
|
||||
assert upd.status_code == 200
|
||||
assert upd.json()["status"] == "fulfilled"
|
||||
|
||||
|
||||
# ---------- Streaming ----------
|
||||
class TestStream:
|
||||
def test_stream_no_token(self, api):
|
||||
movies = api.get(f"{BASE_URL}/api/movies").json()
|
||||
mid = movies[0]["id"]
|
||||
r = api.get(f"{BASE_URL}/api/stream/{mid}", allow_redirects=False)
|
||||
assert r.status_code == 401
|
||||
|
||||
def test_stream_external_returns_400(self, api, admin_token):
|
||||
# All seeded movies are storage_type=external -> should 400
|
||||
movies = api.get(f"{BASE_URL}/api/movies").json()
|
||||
mid = movies[0]["id"]
|
||||
r = api.get(f"{BASE_URL}/api/stream/{mid}", params={"auth": admin_token}, allow_redirects=False)
|
||||
assert r.status_code == 400
|
||||
|
||||
def test_upload_unauth(self, api):
|
||||
# Just verify rejection without token; do NOT upload large file
|
||||
r = requests.post(f"{BASE_URL}/api/upload/video", data={"title": "X"}, files={"file": ("x.mp4", b"x", "video/mp4")})
|
||||
assert r.status_code in (401, 403, 422)
|
||||
Reference in New Issue
Block a user