diff --git a/scripts/smoke-pipeline.sh b/scripts/smoke-pipeline.sh new file mode 100755 index 0000000..cdacb8f --- /dev/null +++ b/scripts/smoke-pipeline.sh @@ -0,0 +1,53 @@ +#!/usr/bin/env bash +# Pipeline-only smoke test: mirrors host/wayland.rs's gst pipeline but +# substitutes videotestsrc/audiotestsrc for pipewiresrc/pulsesrc and writes +# to a file instead of fdsink, so it runs without the portal/PipeWire and +# without an HTTP/iroh client. Catches H.264 encode + mpegtsmux + AAC mux +# regressions in ~5 seconds without any user interaction. +# +# Usage: scripts/smoke-pipeline.sh +# Exits 0 on pass, non-zero with diagnostic on fail. + +set -euo pipefail + +OUT="${TMPDIR:-/tmp}/pixelpass-smoke-$$.ts" +trap 'rm -f "$OUT"' EXIT + +echo "[smoke] running gst pipeline (videotestsrc + audiotestsrc, ~2s) -> $OUT" +gst-launch-1.0 -q \ + mpegtsmux name=mux \ + ! queue ! filesink location="$OUT" \ + videotestsrc num-buffers=60 is-live=false \ + ! video/x-raw,width=640,height=360,framerate=30/1 \ + ! videoconvert ! video/x-raw,format=NV12 \ + ! vah264enc rate-control=cbr bitrate=2000 key-int-max=60 \ + ! h264parse config-interval=-1 \ + ! video/x-h264,stream-format=byte-stream,alignment=au \ + ! mux. \ + audiotestsrc num-buffers=94 is-live=false \ + ! audioconvert ! audioresample \ + ! audio/x-raw,rate=48000,channels=2 \ + ! avenc_aac bitrate=128000 ! aacparse ! mux. + +[[ -s "$OUT" ]] || { echo "[smoke] FAIL: output file empty or missing"; exit 1; } +echo "[smoke] output size: $(stat -c %s "$OUT") bytes" + +echo "[smoke] ffprobe stream listing" +ffprobe -v error -show_entries stream=index,codec_type,codec_name "$OUT" \ + | sed 's/^/ /' + +# Real gate: mpv must actually decode video. The previous --frames=10 check was +# a false positive — if there's no video stream, --frames silently does nothing +# and mpv exits 0 after playing the audio. We instead grep mpv -v output for +# "video=playing"; "video=eof" means mpv saw no video track.
+echo "[smoke] mpv video-decode probe (-v, asserting 'video=playing')" +MPV_LOG=$(mpv --no-config -v --vo=null --ao=null --frames=10 "$OUT" 2>&1) || { echo "[smoke] FAIL: mpv exited with status $?"; echo "$MPV_LOG" | tail -30 | sed 's/^/ /'; exit 1; } # errexit would otherwise abort here with no diagnostic +echo "$MPV_LOG" | grep -E 'video=(playing|eof)' | sed 's/^/ /' || true # grep exits 1 on no match; under pipefail+errexit that would skip the FAIL branch below +if grep -q 'video=playing' <<<"$MPV_LOG"; then # here-string: no producer to take SIGPIPE when grep -q exits early (false FAIL under pipefail) + echo "[smoke] PASS — pipeline produces an mpv-decodable video + audio stream" +else + echo "[smoke] FAIL: mpv did not decode video (saw video=eof or no video state)" + echo "[smoke] last 30 lines of mpv output:" + echo "$MPV_LOG" | tail -30 | sed 's/^/ /' + exit 1 +fi diff --git a/src/common/deps.rs b/src/common/deps.rs index 77be2a4..8c89d84 100644 --- a/src/common/deps.rs +++ b/src/common/deps.rs @@ -5,12 +5,11 @@ use std::process::Command; use crate::common::display::DisplayServer; pub fn check_host_binaries(display: DisplayServer) -> Result<()> { - require("ffmpeg")?; if display == DisplayServer::Wayland { require("gst-launch-1.0")?; require("gst-inspect-1.0")?; require_gst_element("pipewiresrc")?; - require_gst_element("x264enc")?; + require_gst_element("vah264enc")?; require_gst_element("h264parse")?; require_gst_element("mpegtsmux")?; require_gst_element("pulsesrc")?; @@ -56,7 +55,6 @@ fn which(bin: &str) -> Option { fn install_hint_for_bin(bin: &str) -> String { let distro = detect_distro(); let pkg = match bin { - "ffmpeg" => "ffmpeg", "gst-launch-1.0" | "gst-inspect-1.0" => match distro.as_deref() { Some("arch" | "cachyos" | "manjaro" | "endeavouros") => "gstreamer gst-plugins-base", Some("debian" | "ubuntu" | "pop" | "linuxmint") => "gstreamer1.0-tools", @@ -78,12 +76,12 @@ fn install_hint_for_gst_element(name: &str) -> String { Some("opensuse" | "opensuse-tumbleweed" | "opensuse-leap") => "pipewire-gstreamer", _ => "the GStreamer PipeWire plugin", }, - "x264enc" => match distro.as_deref() { - Some("arch" | "cachyos" | "manjaro" | "endeavouros") => "gst-plugins-ugly", - Some("debian" | "ubuntu" | "pop" | "linuxmint") => "gstreamer1.0-plugins-ugly", - Some("fedora" | "nobara") =>
"gstreamer1-plugins-ugly", - Some("opensuse" | "opensuse-tumbleweed" | "opensuse-leap") => "gstreamer-plugins-ugly", - _ => "the GStreamer x264 plugin", + "vah264enc" => match distro.as_deref() { + Some("arch" | "cachyos" | "manjaro" | "endeavouros") => "gst-plugin-va", + Some("debian" | "ubuntu" | "pop" | "linuxmint") => "gstreamer1.0-plugins-bad", + Some("fedora" | "nobara") => "gstreamer1-plugins-bad-free", + Some("opensuse" | "opensuse-tumbleweed" | "opensuse-leap") => "gstreamer-plugins-bad", + _ => "the GStreamer VA-API plugin (requires an H.264-capable GPU; almost all modern GPUs)", }, "h264parse" | "mpegtsmux" | "aacparse" => match distro.as_deref() { Some("arch" | "cachyos" | "manjaro" | "endeavouros") => "gst-plugins-bad", diff --git a/src/host/wayland.rs b/src/host/wayland.rs index abe7a1c..7d17677 100644 --- a/src/host/wayland.rs +++ b/src/host/wayland.rs @@ -1,5 +1,7 @@ //! Wayland capture: ashpd ScreenCast portal → PipeWire fd → gst-launch -//! pipewiresrc → fdsink stdout → ffmpeg stdin → MPEG-TS over `-listen 1` HTTP. +//! pipewiresrc → MPEG-TS on gst stdout → in-process HTTP server bound on a +//! random localhost port. The host bridge TCP-connects to that server and +//! pumps bytes to QUIC. 
use anyhow::{Context, Result, bail}; use ashpd::{ @@ -12,11 +14,13 @@ use ashpd::{ use nix::fcntl::{FcntlArg, FdFlag, fcntl}; use nix::sys::signal::{Signal, kill}; use nix::unistd::{Pid, close}; -use std::net::TcpListener as StdTcpListener; use std::os::fd::{AsFd, IntoRawFd, OwnedFd, RawFd}; use std::process::Stdio; use std::time::Duration; -use tokio::process::{Child, Command}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::{TcpListener, TcpStream}; +use tokio::process::{Child, ChildStdout, Command}; +use tokio::task::JoinHandle; use tokio::time::{Instant, sleep, timeout}; use crate::cli::HostOpts; @@ -24,7 +28,7 @@ use crate::cli::HostOpts; pub struct CaptureHandle { port: u16, gst: Option, - ffmpeg: Option, + server: Option>, } impl CaptureHandle { @@ -32,32 +36,32 @@ impl CaptureHandle { self.port } - /// Graceful teardown: SIGTERM both children, give them ~1s to exit, then - /// SIGKILL. Call this before dropping; Drop only fires the kill backstop. + /// Graceful teardown: SIGTERM gst, give it ~1s to exit, then SIGKILL, then + /// abort the HTTP server task. Call this before dropping; Drop only fires + /// the kill backstop. 
pub async fn shutdown(mut self) { - for opt in [&mut self.ffmpeg, &mut self.gst] { - if let Some(child) = opt.as_mut() - && let Some(pid) = child.id() - { - let _ = kill(Pid::from_raw(pid as i32), Signal::SIGTERM); - } + if let Some(child) = self.gst.as_mut() + && let Some(pid) = child.id() + { + let _ = kill(Pid::from_raw(pid as i32), Signal::SIGTERM); } - - for opt in [&mut self.ffmpeg, &mut self.gst] { - if let Some(child) = opt.as_mut() { - let _ = timeout(Duration::from_millis(1000), child.wait()).await; - let _ = child.start_kill(); - } + if let Some(child) = self.gst.as_mut() { + let _ = timeout(Duration::from_millis(1000), child.wait()).await; + let _ = child.start_kill(); + } + if let Some(task) = self.server.take() { + task.abort(); } } } impl Drop for CaptureHandle { fn drop(&mut self) { - for opt in [&mut self.ffmpeg, &mut self.gst] { - if let Some(child) = opt.as_mut() { - let _ = child.start_kill(); - } + if let Some(child) = self.gst.as_mut() { + let _ = child.start_kill(); + } + if let Some(task) = self.server.as_ref() { + task.abort(); } } } @@ -105,13 +109,16 @@ pub async fn start(opts: &HostOpts) -> Result { clear_cloexec(&pw_fd)?; let raw_fd: RawFd = pw_fd.into_raw_fd(); - // 2. Reserve a localhost port for ffmpeg's HTTP listener. - let port = pick_random_port()?; + // 2. Bind the in-process HTTP listener on a random localhost port. + let listener = TcpListener::bind("127.0.0.1:0") + .await + .context("could not bind local capture HTTP listener")?; + let port = listener.local_addr()?.port(); // 3. Spawn gst-launch with the full pipeline: video AND audio captured, - // encoded, and muxed into MPEG-TS inside gst. ffmpeg becomes a dumb - // pass-through HTTP server (`-c copy`), which avoids ffmpeg's input - // analysis stalls and timestamp-generation guesswork. + // encoded, and muxed into MPEG-TS inside gst. Output goes to stdout, + // which we pipe straight to our HTTP server task — no demux/remux, + // no codec assumptions. 
let key_interval = (opts.framerate * 2).to_string(); let bitrate = opts.bitrate.to_string(); let mut gst_cmd = Command::new("gst-launch-1.0"); @@ -135,16 +142,13 @@ pub async fn start(opts: &HostOpts) -> Result { "!", "videoconvert", "!", - "video/x-raw,format=I420", + "video/x-raw,format=NV12", "!", - "x264enc", - "tune=zerolatency", - "speed-preset=ultrafast", + "vah264enc", + "rate-control=cbr", &format!("bitrate={bitrate}"), &format!("key-int-max={key_interval}"), "!", - "video/x-h264,profile=baseline", - "!", "h264parse", "config-interval=-1", "!", @@ -184,59 +188,61 @@ pub async fn start(opts: &HostOpts) -> Result { .stdout .take() .context("gst-launch-1.0 stdout pipe unavailable")?; - let ffmpeg_stdin: Stdio = gst_stdout - .try_into() - .context("could not convert gst stdout into ffmpeg stdin")?; - // 4. ffmpeg: re-mux pre-encoded H.264 + add pulse audio → MPEG-TS HTTP. - // Width/height/framerate are embedded in the h264 stream; ffmpeg - // doesn't need our portal-reported dimensions. - let url = format!("http://127.0.0.1:{port}"); - let _ = (w, h); - - let ffmpeg = Command::new("ffmpeg") - .stdin(ffmpeg_stdin) - .stdout(Stdio::null()) - .stderr(Stdio::inherit()) - .args([ - "-loglevel", - "warning", - "-fflags", - "nobuffer+discardcorrupt+genpts", - "-flags", - "low_delay", - "-analyzeduration", - "0", - "-probesize", - "32", - "-f", - "mpegts", - "-i", - "pipe:0", - "-c", - "copy", - "-f", - "mpegts", - "-listen", - "1", - &url, - ]) - .spawn() - .context("failed to spawn ffmpeg")?; + // 4. Spawn the HTTP server task. It owns the listener + gst stdout: it + // accepts one client (the host's bridge socket via connect_to_capture), + // drains the HTTP request, writes a fixed MPEG-TS response, then + // copies gst stdout to the socket forever. 
+ let server = tokio::spawn(serve_capture(listener, gst_stdout)); Ok(CaptureHandle { port, gst: Some(gst), - ffmpeg: Some(ffmpeg), + server: Some(server), }) } -fn pick_random_port() -> Result { - let listener = StdTcpListener::bind("127.0.0.1:0") - .context("could not pick a local ephemeral port")?; - let port = listener.local_addr()?.port(); - drop(listener); - Ok(port) +async fn serve_capture(listener: TcpListener, mut gst_stdout: ChildStdout) { + let mut sock = match listener.accept().await { + Ok((s, _)) => s, + Err(e) => { + tracing::warn!("capture HTTP accept failed: {e}"); + return; + } + }; + + if !drain_http_request(&mut sock).await { + return; + } + + const RESPONSE: &[u8] = b"HTTP/1.1 200 OK\r\n\ + Content-Type: video/mp2t\r\n\ + Cache-Control: no-cache, no-store\r\n\ + Connection: close\r\n\ + \r\n"; + if sock.write_all(RESPONSE).await.is_err() { + return; + } + + let _ = tokio::io::copy(&mut gst_stdout, &mut sock).await; +} + +async fn drain_http_request(sock: &mut TcpStream) -> bool { + let mut buf = [0u8; 1024]; + let mut total = Vec::with_capacity(512); + loop { + match sock.read(&mut buf).await { + Ok(0) => return false, + Ok(n) => total.extend_from_slice(&buf[..n]), + Err(_) => return false, + } + if total.windows(4).any(|w| w == b"\r\n\r\n") { + return true; + } + if total.len() > 16 * 1024 { + return false; + } + } } fn clear_cloexec(fd: &impl AsFd) -> Result<()> { @@ -247,18 +253,18 @@ fn clear_cloexec(fd: &impl AsFd) -> Result<()> { Ok(()) } -/// Connect to ffmpeg's `-listen 1` HTTP listener, retrying until it's up or we -/// time out. Returns the connected socket — `-listen 1` is a one-shot listener -/// so this stream IS the bridge socket; don't probe and discard. -pub async fn connect_to_capture(port: u16, max_wait: Duration) -> Result { +/// Connect to the in-process capture HTTP listener, retrying until it's up or +/// we time out. 
Returns the connected socket — the listener accepts exactly +/// one connection (the bridge socket), so this stream IS the bridge socket. +pub async fn connect_to_capture(port: u16, max_wait: Duration) -> Result { let deadline = Instant::now() + max_wait; loop { - match tokio::net::TcpStream::connect(("127.0.0.1", port)).await { + match TcpStream::connect(("127.0.0.1", port)).await { Ok(stream) => return Ok(stream), Err(_) if Instant::now() < deadline => { sleep(Duration::from_millis(50)).await; } - Err(e) => bail!("ffmpeg HTTP listener never came up on 127.0.0.1:{port}: {e}"), + Err(e) => bail!("capture HTTP listener never came up on 127.0.0.1:{port}: {e}"), } } }