"""HLS transcoding via ffmpeg. Two modes: - quick (stream-copy): instant, single bitrate, no quality loss. Source must be H.264/AAC. - abr (adaptive): re-encode to multiple bitrates with master playlist for ABR streaming. """ import asyncio import json import logging import shutil from pathlib import Path from typing import Optional, List, Tuple, Callable, Awaitable logger = logging.getLogger("kino.transcode") async def probe_video(source: Path) -> Tuple[int, int]: """Return (width, height) using ffprobe; (0, 0) on failure.""" try: proc = await asyncio.create_subprocess_exec( "ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "stream=width,height", "-of", "json", str(source), stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, _ = await proc.communicate() if proc.returncode != 0: return (0, 0) data = json.loads(stdout.decode("utf-8", errors="ignore") or "{}") s = (data.get("streams") or [{}])[0] return (int(s.get("width") or 0), int(s.get("height") or 0)) except Exception: return (0, 0) # (height, video_bitrate, max_bitrate, buffer, audio_bitrate) ALL_VARIANTS = [ (1080, "5000k", "5500k", "7500k", "192k"), (720, "2800k", "3000k", "4200k", "128k"), (480, "1400k", "1500k", "2100k", "96k"), (360, "800k", "856k", "1200k", "64k"), ] def variants_for_source(src_height: int) -> List[Tuple[int, str, str, str, str]]: """Pick variants ≤ source height. Always include at least one (smallest).""" if src_height <= 0: return [ALL_VARIANTS[2]] # safe default 480p chosen = [v for v in ALL_VARIANTS if v[0] <= src_height] if not chosen: chosen = [ALL_VARIANTS[-1]] return chosen async def transcode_quick(source: Path, out_dir: Path, on_status: Callable[..., Awaitable]) -> None: """Stream-copy to single-rate HLS. Output filename: playlist.m3u8.""" if not source.is_file(): await on_status("failed", error=f"Source missing: {source}") return out_dir.mkdir(parents=True, exist_ok=True) cmd = [ "ffmpeg", "-y", "-i", str(source), "-c:v", "copy", "-c:a", "copy", "-bsf:v", "h264_mp4toannexb", "-f", "hls", "-hls_time", "6", "-hls_list_size", "0", "-hls_playlist_type", "vod", "-hls_segment_filename", str(out_dir / "seg_%04d.ts"), str(out_dir / "playlist.m3u8"), ] await _run(cmd, on_status, out_dir, "playlist.m3u8") async def transcode_abr(source: Path, out_dir: Path, on_status: Callable[..., Awaitable]) -> None: """Multi-bitrate ABR HLS. Output entry filename: master.m3u8.""" if not source.is_file(): await on_status("failed", error=f"Source missing: {source}") return out_dir.mkdir(parents=True, exist_ok=True) _, height = await probe_video(source) variants = variants_for_source(height) n = len(variants) # Build filter graph splits = "".join(f"[v{i}]" for i in range(n)) fc_parts = [f"[0:v]split={n}{splits}"] for i, (h, *_rest) in enumerate(variants): fc_parts.append(f"[v{i}]scale=w=-2:h={h}[v{i}out]") filter_complex = ";".join(fc_parts) cmd: List[str] = ["ffmpeg", "-y", "-i", str(source), "-filter_complex", filter_complex] for i, (_h, vb, maxr, buf, _ab) in enumerate(variants): cmd += [ "-map", f"[v{i}out]", f"-c:v:{i}", "libx264", f"-preset:v:{i}", "veryfast", f"-profile:v:{i}", "main", f"-pix_fmt:v:{i}", "yuv420p", f"-b:v:{i}", vb, f"-maxrate:v:{i}", maxr, f"-bufsize:v:{i}", buf, f"-g", "48", f"-keyint_min", "48", f"-sc_threshold", "0", ] # Audio: same source mapped N times, one per variant for i in range(n): cmd += ["-map", "a:0?"] for i, (*_v, ab) in enumerate(variants): cmd += [f"-c:a:{i}", "aac", f"-b:a:{i}", ab, f"-ac:a:{i}", "2"] var_stream_map = " ".join(f"v:{i},a:{i}" for i in range(n)) cmd += [ "-f", "hls", "-hls_time", "6", "-hls_playlist_type", "vod", "-hls_flags", "independent_segments", "-hls_segment_filename", str(out_dir / "v%v" / "seg_%04d.ts"), "-master_pl_name", "master.m3u8", "-var_stream_map", var_stream_map, str(out_dir / "v%v" / "playlist.m3u8"), ] # Pre-create variant subdirs (some ffmpeg builds need them) for i in range(n): (out_dir / f"v{i}").mkdir(exist_ok=True) await _run(cmd, on_status, out_dir, "master.m3u8") async def _run(cmd: List[str], on_status, out_dir: Path, entry_filename: str) -> None: await on_status("running") try: proc = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) _stdout, stderr = await proc.communicate() if proc.returncode != 0: err = stderr.decode("utf-8", errors="ignore")[-500:] logger.error(f"ffmpeg failed: {err}") await on_status("failed", error=err[:200]) shutil.rmtree(out_dir, ignore_errors=True) return await on_status("done", entry=entry_filename) except FileNotFoundError: await on_status("failed", error="ffmpeg not installed") except Exception as e: logger.exception("transcode crashed") await on_status("failed", error=str(e)) shutil.rmtree(out_dir, ignore_errors=True) def srt_to_vtt(srt_text: str) -> str: lines = srt_text.replace("\r\n", "\n").split("\n") out = ["WEBVTT", ""] for line in lines: if "-->" in line: out.append(line.replace(",", ".")) else: out.append(line) return "\n".join(out)