Wayland: VAAPI H.264 + in-process HTTP server, drop ffmpeg

The previous ffmpeg-as-HTTP-server pipeline shape held back two
improvements at once. ffmpeg as the runtime server lost a one-shot
`-listen 1` accept to a probe-and-discard health check, and forced
us to size analyze/probe budgets carefully so ffmpeg would serve
before our deadline. Replacing it with a small tokio task that
accepts once, drains the HTTP request, writes a fixed 200 OK, then
`tokio::io::copy`s gst stdout to the socket removes all of that.

VAAPI H.264 (vah264enc) drops CPU encode from ~50% of a core to
single-digit %. An earlier attempt at vaav1enc had to be abandoned:
libavformat cannot demux AV1-in-MPEG-TS with the custom mapping
even with a 20MB probe budget — mpv reports video=eof. H.264 keeps
the hardware win on the well-trodden demuxer path.

scripts/smoke-pipeline.sh mirrors the runtime pipeline with
videotestsrc/audiotestsrc into a file and asserts that mpv reports
`video=playing` (not video=eof). The naive --frames=10 check was
a false positive when no video stream is recognized; the verbose
grep is the real gate.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-18 02:29:56 -04:00
parent 75a01a361e
commit 7b8b6bcd0c
3 changed files with 150 additions and 93 deletions
+53
View File
@@ -0,0 +1,53 @@
#!/usr/bin/env bash
# Pipeline-only smoke test: mirrors host/wayland.rs's gst pipeline but
# substitutes videotestsrc/audiotestsrc for pipewiresrc/pulsesrc and writes
# to a file instead of fdsink, so it runs without the portal/PipeWire and
# without an HTTP/iroh client. Catches VAAPI H.264 encode + mpegtsmux + AAC
# mux regressions in ~5 seconds without any user interaction.
#
# Requires on PATH: gst-launch-1.0 (with vah264enc and avenc_aac), ffprobe, mpv.
#
# Usage: scripts/smoke-pipeline.sh
# Exits 0 on pass, non-zero with diagnostic on fail.
set -euo pipefail

OUT="${TMPDIR:-/tmp}/pixelpass-smoke-$$.ts"
# Always remove the capture file, whichever way the script exits.
trap 'rm -f "$OUT"' EXIT

echo "[smoke] running gst pipeline (videotestsrc + audiotestsrc, ~2s) -> $OUT"
# 60 video buffers at 30 fps = 2 s of video; the audio branch is sized to
# roughly the same duration (assuming audiotestsrc's default buffer size).
gst-launch-1.0 -q \
    mpegtsmux name=mux \
        ! queue ! filesink location="$OUT" \
    videotestsrc num-buffers=60 is-live=false \
        ! video/x-raw,width=640,height=360,framerate=30/1 \
        ! videoconvert ! video/x-raw,format=NV12 \
        ! vah264enc rate-control=cbr bitrate=2000 key-int-max=60 \
        ! h264parse config-interval=-1 \
        ! video/x-h264,stream-format=byte-stream,alignment=au \
        ! mux. \
    audiotestsrc num-buffers=94 is-live=false \
        ! audioconvert ! audioresample \
        ! audio/x-raw,rate=48000,channels=2 \
        ! avenc_aac bitrate=128000 ! aacparse ! mux.

[[ -s "$OUT" ]] || { echo "[smoke] FAIL: output file empty or missing"; exit 1; }
echo "[smoke] output size: $(stat -c %s "$OUT") bytes"

echo "[smoke] ffprobe stream listing"
ffprobe -v error -show_entries stream=index,codec_type,codec_name "$OUT" \
    | sed 's/^/ /'

# Real gate: mpv must actually decode video. The previous --frames=10 check was
# a false positive — if there's no video stream, --frames silently does nothing
# and mpv exits 0 after playing the audio. We instead search mpv -v output for
# "video=playing"; "video=eof" means mpv saw no video track.
echo "[smoke] mpv video-decode probe (-v, asserting 'video=playing')"

# Record mpv's exit status instead of letting `set -e` abort on it, so a
# crashing or erroring mpv still reaches the diagnostic branch below.
mpv_status=0
MPV_LOG=$(mpv --no-config -v --vo=null --ao=null --frames=10 "$OUT" 2>&1) || mpv_status=$?

# Informational only. grep exits 1 when neither state was logged, which under
# `pipefail` + `set -e` would kill the script before the FAIL report — hence
# the `|| true`.
grep -E 'video=(playing|eof)' <<<"$MPV_LOG" | sed 's/^/ /' || true

# In-shell substring match rather than `echo … | grep -q`: with `pipefail`,
# grep -q exiting on the first match can SIGPIPE the writer and turn a genuine
# match into a failing pipeline status.
if [[ "$mpv_status" -eq 0 && "$MPV_LOG" == *"video=playing"* ]]; then
    echo "[smoke] PASS — pipeline produces an mpv-decodable video + audio stream"
else
    echo "[smoke] FAIL: mpv did not decode video (saw video=eof or no video state)"
    echo "[smoke] mpv exit status: $mpv_status"
    echo "[smoke] last 30 lines of mpv output:"
    tail -30 <<<"$MPV_LOG" | sed 's/^/ /'
    exit 1
fi
+7 -9
View File
@@ -5,12 +5,11 @@ use std::process::Command;
use crate::common::display::DisplayServer; use crate::common::display::DisplayServer;
pub fn check_host_binaries(display: DisplayServer) -> Result<()> { pub fn check_host_binaries(display: DisplayServer) -> Result<()> {
require("ffmpeg")?;
if display == DisplayServer::Wayland { if display == DisplayServer::Wayland {
require("gst-launch-1.0")?; require("gst-launch-1.0")?;
require("gst-inspect-1.0")?; require("gst-inspect-1.0")?;
require_gst_element("pipewiresrc")?; require_gst_element("pipewiresrc")?;
require_gst_element("x264enc")?; require_gst_element("vah264enc")?;
require_gst_element("h264parse")?; require_gst_element("h264parse")?;
require_gst_element("mpegtsmux")?; require_gst_element("mpegtsmux")?;
require_gst_element("pulsesrc")?; require_gst_element("pulsesrc")?;
@@ -56,7 +55,6 @@ fn which(bin: &str) -> Option<PathBuf> {
fn install_hint_for_bin(bin: &str) -> String { fn install_hint_for_bin(bin: &str) -> String {
let distro = detect_distro(); let distro = detect_distro();
let pkg = match bin { let pkg = match bin {
"ffmpeg" => "ffmpeg",
"gst-launch-1.0" | "gst-inspect-1.0" => match distro.as_deref() { "gst-launch-1.0" | "gst-inspect-1.0" => match distro.as_deref() {
Some("arch" | "cachyos" | "manjaro" | "endeavouros") => "gstreamer gst-plugins-base", Some("arch" | "cachyos" | "manjaro" | "endeavouros") => "gstreamer gst-plugins-base",
Some("debian" | "ubuntu" | "pop" | "linuxmint") => "gstreamer1.0-tools", Some("debian" | "ubuntu" | "pop" | "linuxmint") => "gstreamer1.0-tools",
@@ -78,12 +76,12 @@ fn install_hint_for_gst_element(name: &str) -> String {
Some("opensuse" | "opensuse-tumbleweed" | "opensuse-leap") => "pipewire-gstreamer", Some("opensuse" | "opensuse-tumbleweed" | "opensuse-leap") => "pipewire-gstreamer",
_ => "the GStreamer PipeWire plugin", _ => "the GStreamer PipeWire plugin",
}, },
"x264enc" => match distro.as_deref() { "vah264enc" => match distro.as_deref() {
Some("arch" | "cachyos" | "manjaro" | "endeavouros") => "gst-plugins-ugly", Some("arch" | "cachyos" | "manjaro" | "endeavouros") => "gst-plugin-va",
Some("debian" | "ubuntu" | "pop" | "linuxmint") => "gstreamer1.0-plugins-ugly", Some("debian" | "ubuntu" | "pop" | "linuxmint") => "gstreamer1.0-plugins-bad",
Some("fedora" | "nobara") => "gstreamer1-plugins-ugly", Some("fedora" | "nobara") => "gstreamer1-plugins-bad-free",
Some("opensuse" | "opensuse-tumbleweed" | "opensuse-leap") => "gstreamer-plugins-ugly", Some("opensuse" | "opensuse-tumbleweed" | "opensuse-leap") => "gstreamer-plugins-bad",
_ => "the GStreamer x264 plugin", _ => "the GStreamer VA-API plugin (requires an H.264-capable GPU; almost all modern GPUs)",
}, },
"h264parse" | "mpegtsmux" | "aacparse" => match distro.as_deref() { "h264parse" | "mpegtsmux" | "aacparse" => match distro.as_deref() {
Some("arch" | "cachyos" | "manjaro" | "endeavouros") => "gst-plugins-bad", Some("arch" | "cachyos" | "manjaro" | "endeavouros") => "gst-plugins-bad",
+90 -84
View File
@@ -1,5 +1,7 @@
//! Wayland capture: ashpd ScreenCast portal → PipeWire fd → gst-launch //! Wayland capture: ashpd ScreenCast portal → PipeWire fd → gst-launch
//! pipewiresrc → fdsink stdout → ffmpeg stdin → MPEG-TS over `-listen 1` HTTP. //! pipewiresrc → MPEG-TS on gst stdout → in-process HTTP server bound on a
//! random localhost port. The host bridge TCP-connects to that server and
//! pumps bytes to QUIC.
use anyhow::{Context, Result, bail}; use anyhow::{Context, Result, bail};
use ashpd::{ use ashpd::{
@@ -12,11 +14,13 @@ use ashpd::{
use nix::fcntl::{FcntlArg, FdFlag, fcntl}; use nix::fcntl::{FcntlArg, FdFlag, fcntl};
use nix::sys::signal::{Signal, kill}; use nix::sys::signal::{Signal, kill};
use nix::unistd::{Pid, close}; use nix::unistd::{Pid, close};
use std::net::TcpListener as StdTcpListener;
use std::os::fd::{AsFd, IntoRawFd, OwnedFd, RawFd}; use std::os::fd::{AsFd, IntoRawFd, OwnedFd, RawFd};
use std::process::Stdio; use std::process::Stdio;
use std::time::Duration; use std::time::Duration;
use tokio::process::{Child, Command}; use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::process::{Child, ChildStdout, Command};
use tokio::task::JoinHandle;
use tokio::time::{Instant, sleep, timeout}; use tokio::time::{Instant, sleep, timeout};
use crate::cli::HostOpts; use crate::cli::HostOpts;
@@ -24,7 +28,7 @@ use crate::cli::HostOpts;
pub struct CaptureHandle { pub struct CaptureHandle {
port: u16, port: u16,
gst: Option<Child>, gst: Option<Child>,
ffmpeg: Option<Child>, server: Option<JoinHandle<()>>,
} }
impl CaptureHandle { impl CaptureHandle {
@@ -32,32 +36,32 @@ impl CaptureHandle {
self.port self.port
} }
/// Graceful teardown: SIGTERM both children, give them ~1s to exit, then /// Graceful teardown: SIGTERM gst, give it ~1s to exit, then SIGKILL, then
/// SIGKILL. Call this before dropping; Drop only fires the kill backstop. /// abort the HTTP server task. Call this before dropping; Drop only fires
/// the kill backstop.
pub async fn shutdown(mut self) { pub async fn shutdown(mut self) {
for opt in [&mut self.ffmpeg, &mut self.gst] { if let Some(child) = self.gst.as_mut()
if let Some(child) = opt.as_mut() && let Some(pid) = child.id()
&& let Some(pid) = child.id() {
{ let _ = kill(Pid::from_raw(pid as i32), Signal::SIGTERM);
let _ = kill(Pid::from_raw(pid as i32), Signal::SIGTERM);
}
} }
if let Some(child) = self.gst.as_mut() {
for opt in [&mut self.ffmpeg, &mut self.gst] { let _ = timeout(Duration::from_millis(1000), child.wait()).await;
if let Some(child) = opt.as_mut() { let _ = child.start_kill();
let _ = timeout(Duration::from_millis(1000), child.wait()).await; }
let _ = child.start_kill(); if let Some(task) = self.server.take() {
} task.abort();
} }
} }
} }
impl Drop for CaptureHandle { impl Drop for CaptureHandle {
fn drop(&mut self) { fn drop(&mut self) {
for opt in [&mut self.ffmpeg, &mut self.gst] { if let Some(child) = self.gst.as_mut() {
if let Some(child) = opt.as_mut() { let _ = child.start_kill();
let _ = child.start_kill(); }
} if let Some(task) = self.server.as_ref() {
task.abort();
} }
} }
} }
@@ -105,13 +109,16 @@ pub async fn start(opts: &HostOpts) -> Result<CaptureHandle> {
clear_cloexec(&pw_fd)?; clear_cloexec(&pw_fd)?;
let raw_fd: RawFd = pw_fd.into_raw_fd(); let raw_fd: RawFd = pw_fd.into_raw_fd();
// 2. Reserve a localhost port for ffmpeg's HTTP listener. // 2. Bind the in-process HTTP listener on a random localhost port.
let port = pick_random_port()?; let listener = TcpListener::bind("127.0.0.1:0")
.await
.context("could not bind local capture HTTP listener")?;
let port = listener.local_addr()?.port();
// 3. Spawn gst-launch with the full pipeline: video AND audio captured, // 3. Spawn gst-launch with the full pipeline: video AND audio captured,
// encoded, and muxed into MPEG-TS inside gst. ffmpeg becomes a dumb // encoded, and muxed into MPEG-TS inside gst. Output goes to stdout,
// pass-through HTTP server (`-c copy`), which avoids ffmpeg's input // which we pipe straight to our HTTP server task — no demux/remux,
// analysis stalls and timestamp-generation guesswork. // no codec assumptions.
let key_interval = (opts.framerate * 2).to_string(); let key_interval = (opts.framerate * 2).to_string();
let bitrate = opts.bitrate.to_string(); let bitrate = opts.bitrate.to_string();
let mut gst_cmd = Command::new("gst-launch-1.0"); let mut gst_cmd = Command::new("gst-launch-1.0");
@@ -135,16 +142,13 @@ pub async fn start(opts: &HostOpts) -> Result<CaptureHandle> {
"!", "!",
"videoconvert", "videoconvert",
"!", "!",
"video/x-raw,format=I420", "video/x-raw,format=NV12",
"!", "!",
"x264enc", "vah264enc",
"tune=zerolatency", "rate-control=cbr",
"speed-preset=ultrafast",
&format!("bitrate={bitrate}"), &format!("bitrate={bitrate}"),
&format!("key-int-max={key_interval}"), &format!("key-int-max={key_interval}"),
"!", "!",
"video/x-h264,profile=baseline",
"!",
"h264parse", "h264parse",
"config-interval=-1", "config-interval=-1",
"!", "!",
@@ -184,59 +188,61 @@ pub async fn start(opts: &HostOpts) -> Result<CaptureHandle> {
.stdout .stdout
.take() .take()
.context("gst-launch-1.0 stdout pipe unavailable")?; .context("gst-launch-1.0 stdout pipe unavailable")?;
let ffmpeg_stdin: Stdio = gst_stdout
.try_into()
.context("could not convert gst stdout into ffmpeg stdin")?;
// 4. ffmpeg: re-mux pre-encoded H.264 + add pulse audio → MPEG-TS HTTP. // 4. Spawn the HTTP server task. It owns the listener + gst stdout: it
// Width/height/framerate are embedded in the h264 stream; ffmpeg // accepts one client (the host's bridge socket via connect_to_capture),
// doesn't need our portal-reported dimensions. // drains the HTTP request, writes a fixed MPEG-TS response, then
let url = format!("http://127.0.0.1:{port}"); // copies gst stdout to the socket forever.
let _ = (w, h); let server = tokio::spawn(serve_capture(listener, gst_stdout));
let ffmpeg = Command::new("ffmpeg")
.stdin(ffmpeg_stdin)
.stdout(Stdio::null())
.stderr(Stdio::inherit())
.args([
"-loglevel",
"warning",
"-fflags",
"nobuffer+discardcorrupt+genpts",
"-flags",
"low_delay",
"-analyzeduration",
"0",
"-probesize",
"32",
"-f",
"mpegts",
"-i",
"pipe:0",
"-c",
"copy",
"-f",
"mpegts",
"-listen",
"1",
&url,
])
.spawn()
.context("failed to spawn ffmpeg")?;
Ok(CaptureHandle { Ok(CaptureHandle {
port, port,
gst: Some(gst), gst: Some(gst),
ffmpeg: Some(ffmpeg), server: Some(server),
}) })
} }
fn pick_random_port() -> Result<u16> { async fn serve_capture(listener: TcpListener, mut gst_stdout: ChildStdout) {
let listener = StdTcpListener::bind("127.0.0.1:0") let mut sock = match listener.accept().await {
.context("could not pick a local ephemeral port")?; Ok((s, _)) => s,
let port = listener.local_addr()?.port(); Err(e) => {
drop(listener); tracing::warn!("capture HTTP accept failed: {e}");
Ok(port) return;
}
};
if !drain_http_request(&mut sock).await {
return;
}
const RESPONSE: &[u8] = b"HTTP/1.1 200 OK\r\n\
Content-Type: video/mp2t\r\n\
Cache-Control: no-cache, no-store\r\n\
Connection: close\r\n\
\r\n";
if sock.write_all(RESPONSE).await.is_err() {
return;
}
let _ = tokio::io::copy(&mut gst_stdout, &mut sock).await;
}
async fn drain_http_request(sock: &mut TcpStream) -> bool {
let mut buf = [0u8; 1024];
let mut total = Vec::with_capacity(512);
loop {
match sock.read(&mut buf).await {
Ok(0) => return false,
Ok(n) => total.extend_from_slice(&buf[..n]),
Err(_) => return false,
}
if total.windows(4).any(|w| w == b"\r\n\r\n") {
return true;
}
if total.len() > 16 * 1024 {
return false;
}
}
} }
fn clear_cloexec(fd: &impl AsFd) -> Result<()> { fn clear_cloexec(fd: &impl AsFd) -> Result<()> {
@@ -247,18 +253,18 @@ fn clear_cloexec(fd: &impl AsFd) -> Result<()> {
Ok(()) Ok(())
} }
/// Connect to ffmpeg's `-listen 1` HTTP listener, retrying until it's up or we /// Connect to the in-process capture HTTP listener, retrying until it's up or
/// time out. Returns the connected socket — `-listen 1` is a one-shot listener /// we time out. Returns the connected socket — the listener accepts exactly
/// so this stream IS the bridge socket; don't probe and discard. /// one connection (the bridge socket), so this stream IS the bridge socket.
pub async fn connect_to_capture(port: u16, max_wait: Duration) -> Result<tokio::net::TcpStream> { pub async fn connect_to_capture(port: u16, max_wait: Duration) -> Result<TcpStream> {
let deadline = Instant::now() + max_wait; let deadline = Instant::now() + max_wait;
loop { loop {
match tokio::net::TcpStream::connect(("127.0.0.1", port)).await { match TcpStream::connect(("127.0.0.1", port)).await {
Ok(stream) => return Ok(stream), Ok(stream) => return Ok(stream),
Err(_) if Instant::now() < deadline => { Err(_) if Instant::now() < deadline => {
sleep(Duration::from_millis(50)).await; sleep(Duration::from_millis(50)).await;
} }
Err(e) => bail!("ffmpeg HTTP listener never came up on 127.0.0.1:{port}: {e}"), Err(e) => bail!("capture HTTP listener never came up on 127.0.0.1:{port}: {e}"),
} }
} }
} }