Files
2026-04-29 16:21:32 +00:00

161 lines
5.7 KiB
Python

"""HLS transcoding via ffmpeg.
Two modes:
- quick (stream-copy): instant, single bitrate, no quality loss. Source must be H.264/AAC.
- abr (adaptive): re-encode to multiple bitrates with master playlist for ABR streaming.
"""
import asyncio
import json
import logging
import shutil
from pathlib import Path
from typing import Optional, List, Tuple, Callable, Awaitable
logger = logging.getLogger("kino.transcode")
async def probe_video(source: Path) -> Tuple[int, int]:
    """Probe the first video stream of *source* with ffprobe.

    Returns ``(width, height)``. Any failure (ffprobe cannot be launched,
    it exits non-zero, or its output cannot be parsed) yields ``(0, 0)``.
    """
    pipe = asyncio.subprocess.PIPE
    try:
        ffprobe_cmd = (
            "ffprobe", "-v", "error", "-select_streams", "v:0",
            "-show_entries", "stream=width,height", "-of", "json", str(source),
        )
        proc = await asyncio.create_subprocess_exec(*ffprobe_cmd, stdout=pipe, stderr=pipe)
        raw_out, _ = await proc.communicate()
        if proc.returncode == 0:
            report = json.loads(raw_out.decode("utf-8", errors="ignore") or "{}")
            # Missing or empty "streams" falls back to one empty stream record.
            first_stream = (report.get("streams") or [{}])[0]
            width = int(first_stream.get("width") or 0)
            height = int(first_stream.get("height") or 0)
            return (width, height)
    except Exception:
        # Best-effort probe: every error maps to the (0, 0) sentinel below.
        pass
    return (0, 0)
# Encoding ladder for ABR output, listed from the tallest to the shortest rendition.
# Each entry is (height, video_bitrate, max_bitrate, buffer, audio_bitrate).
# The bitrate strings are handed to ffmpeg as-is (-b:v, -maxrate, -bufsize, -b:a).
# NOTE: variants_for_source() relies on the ordering: index 2 is its 480p
# default and the last entry is its smallest-rendition fallback.
ALL_VARIANTS: List[Tuple[int, str, str, str, str]] = [
    (1080, "5000k", "5500k", "7500k", "192k"),
    (720, "2800k", "3000k", "4200k", "128k"),
    (480, "1400k", "1500k", "2100k", "96k"),
    (360, "800k", "856k", "1200k", "64k"),
]
def variants_for_source(src_height: int) -> List[Tuple[int, str, str, str, str]]:
    """Select the ladder entries whose height does not exceed the source.

    An unknown height (``<= 0``) gets the single 480p entry as a safe
    default. A source shorter than every entry gets the smallest entry, so
    the result is never empty.
    """
    if src_height <= 0:
        return [ALL_VARIANTS[2]]  # safe default 480p
    fitting = []
    for variant in ALL_VARIANTS:
        if variant[0] <= src_height:
            fitting.append(variant)
    return fitting if fitting else [ALL_VARIANTS[-1]]
async def transcode_quick(source: Path, out_dir: Path, on_status: Callable[..., Awaitable]) -> None:
    """Remux *source* into single-rate HLS without re-encoding.

    Video and audio are stream-copied into 6-second segments under
    *out_dir*; the entry playlist is ``playlist.m3u8``. A missing source is
    reported through ``on_status("failed", error=...)`` and nothing is run.
    """
    if not source.is_file():
        await on_status("failed", error=f"Source missing: {source}")
        return

    out_dir.mkdir(parents=True, exist_ok=True)
    entry_name = "playlist.m3u8"

    input_args = ["ffmpeg", "-y", "-i", str(source)]
    copy_args = [
        "-c:v", "copy",
        "-c:a", "copy",
        "-bsf:v", "h264_mp4toannexb",
    ]
    hls_args = [
        "-f", "hls",
        "-hls_time", "6",
        "-hls_list_size", "0",
        "-hls_playlist_type", "vod",
        "-hls_segment_filename", str(out_dir / "seg_%04d.ts"),
    ]
    ffmpeg_cmd = input_args + copy_args + hls_args + [str(out_dir / entry_name)]
    await _run(ffmpeg_cmd, on_status, out_dir, entry_name)
async def transcode_abr(source: Path, out_dir: Path, on_status: Callable[..., Awaitable]) -> None:
    """Re-encode *source* into multi-bitrate ABR HLS.

    The variant ladder is chosen from the probed source height. Each variant
    is written to ``out_dir/v<i>/playlist.m3u8`` and the entry file reported
    on success is ``master.m3u8``.

    Args:
        source: Input media file.
        out_dir: Output directory; created if missing.
        on_status: Async callback receiving status updates. A missing source
            is reported as ``on_status("failed", error=...)`` and nothing
            is run.
    """
    if not source.is_file():
        await on_status("failed", error=f"Source missing: {source}")
        return
    out_dir.mkdir(parents=True, exist_ok=True)

    _, height = await probe_video(source)
    variants = variants_for_source(height)
    n = len(variants)
    has_audio = await _source_has_audio(source)

    # Filter graph: split the decoded video once, then scale each branch.
    # w=-2 keeps the aspect ratio while forcing an even width for libx264.
    splits = "".join(f"[v{i}]" for i in range(n))
    fc_parts = [f"[0:v]split={n}{splits}"]
    for i, (h, *_rest) in enumerate(variants):
        fc_parts.append(f"[v{i}]scale=w=-2:h={h}[v{i}out]")
    filter_complex = ";".join(fc_parts)

    cmd: List[str] = ["ffmpeg", "-y", "-i", str(source), "-filter_complex", filter_complex]
    for i, (_h, vb, maxr, buf, _ab) in enumerate(variants):
        cmd += [
            "-map", f"[v{i}out]",
            f"-c:v:{i}", "libx264",
            f"-preset:v:{i}", "veryfast",
            f"-profile:v:{i}", "main",
            f"-pix_fmt:v:{i}", "yuv420p",
            f"-b:v:{i}", vb,
            f"-maxrate:v:{i}", maxr,
            f"-bufsize:v:{i}", buf,
        ]
    # Fixed GOP with scene-cut detection off, so keyframes line up across
    # variants. These options carry no stream specifier: pass them once.
    cmd += ["-g", "48", "-keyint_min", "48", "-sc_threshold", "0"]

    if has_audio:
        # Audio: same source stream mapped N times, one per variant.
        for _ in range(n):
            cmd += ["-map", "a:0?"]
        for i, (*_v, ab) in enumerate(variants):
            cmd += [f"-c:a:{i}", "aac", f"-b:a:{i}", ab, f"-ac:a:{i}", "2"]
        var_stream_map = " ".join(f"v:{i},a:{i}" for i in range(n))
    else:
        # Without an audio stream the optional "-map a:0?" maps nothing, and a
        # var_stream_map naming a:<i> would make the HLS muxer fail. Emit
        # video-only variants instead.
        var_stream_map = " ".join(f"v:{i}" for i in range(n))

    cmd += [
        "-f", "hls",
        "-hls_time", "6",
        "-hls_playlist_type", "vod",
        "-hls_flags", "independent_segments",
        "-hls_segment_filename", str(out_dir / "v%v" / "seg_%04d.ts"),
        "-master_pl_name", "master.m3u8",
        "-var_stream_map", var_stream_map,
        str(out_dir / "v%v" / "playlist.m3u8"),
    ]
    # Pre-create variant subdirs (some ffmpeg builds need them)
    for i in range(n):
        (out_dir / f"v{i}").mkdir(exist_ok=True)
    await _run(cmd, on_status, out_dir, "master.m3u8")


async def _source_has_audio(source: Path) -> bool:
    """Return False only when ffprobe positively reports no audio stream.

    Any probe failure returns True, so the caller keeps the original
    audio-mapping behaviour when nothing is known about the source.
    """
    try:
        proc = await asyncio.create_subprocess_exec(
            "ffprobe", "-v", "error", "-select_streams", "a",
            "-show_entries", "stream=index", "-of", "json", str(source),
            stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
        )
        stdout, _ = await proc.communicate()
        if proc.returncode != 0:
            return True
        data = json.loads(stdout.decode("utf-8", errors="ignore") or "{}")
        streams = data.get("streams")
        if not isinstance(streams, list):
            return True
        return len(streams) > 0
    except Exception:
        return True
async def _run(cmd: List[str], on_status: Callable[..., Awaitable], out_dir: Path, entry_filename: str) -> None:
    """Run a transcode command and report its lifecycle through *on_status*.

    Emits ``"running"`` first, then either ``"done"`` (with
    ``entry=entry_filename``) or ``"failed"`` (with a short ``error``
    string). On every failure path *out_dir* is removed so no partial
    output is left behind.

    Args:
        cmd: Full argv of the process to launch.
        on_status: Async callback receiving status updates.
        out_dir: Output directory; deleted on failure.
        entry_filename: Playlist name reported on success.

    Raises:
        asyncio.CancelledError: Re-raised after a still-running child
            process has been killed and its partial output removed.
    """
    await on_status("running")
    proc = None
    try:
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            # stdout is never read; discard it rather than buffer it in memory.
            stdout=asyncio.subprocess.DEVNULL,
            stderr=asyncio.subprocess.PIPE,
        )
        _stdout, stderr = await proc.communicate()
        if proc.returncode != 0:
            err = stderr.decode("utf-8", errors="ignore")[-500:]
            logger.error("ffmpeg failed: %s", err)
            # The end of stderr holds the actual error message; report that.
            await on_status("failed", error=err[-200:])
            shutil.rmtree(out_dir, ignore_errors=True)
            return
        await on_status("done", entry=entry_filename)
    except asyncio.CancelledError:
        # Do not leave an orphaned encoder behind when the task is cancelled.
        # Finished output is left alone if the process had already exited.
        if proc is not None and proc.returncode is None:
            try:
                proc.kill()
                await proc.wait()
            except ProcessLookupError:
                pass
            finally:
                shutil.rmtree(out_dir, ignore_errors=True)
        raise
    except FileNotFoundError:
        await on_status("failed", error="ffmpeg not installed")
        shutil.rmtree(out_dir, ignore_errors=True)
    except Exception as e:
        logger.exception("transcode crashed")
        await on_status("failed", error=str(e))
        shutil.rmtree(out_dir, ignore_errors=True)
def srt_to_vtt(srt_text: str) -> str:
    """Convert SubRip (SRT) subtitle text to WebVTT.

    Prepends the ``WEBVTT`` header and, on every cue timing line (a line
    containing ``-->``), swaps the SRT decimal comma for the dot WebVTT
    uses. All other lines are copied through unchanged.

    A leading byte-order mark is dropped, and CRLF or bare-CR line endings
    are normalised to LF before the text is split into lines.
    """
    # A BOM kept by the decoder would otherwise be glued to the first cue.
    if srt_text.startswith("\ufeff"):
        srt_text = srt_text[1:]
    # Bare "\r" endings must be split too; otherwise the whole file is one
    # line containing "-->" and every comma in it gets rewritten.
    normalized = srt_text.replace("\r\n", "\n").replace("\r", "\n")
    out = ["WEBVTT", ""]
    for line in normalized.split("\n"):
        out.append(line.replace(",", ".") if "-->" in line else line)
    return "\n".join(out)