Wayland: VAAPI H.264 + in-process HTTP server, drop ffmpeg

The previous ffmpeg-as-HTTP-server pipeline shape held back two
improvements at once. ffmpeg as the runtime server lost a one-shot
`-listen 1` accept to a probe-and-discard health check, and forced
us to size analyze/probe budgets carefully so ffmpeg would serve
before our deadline. Replacing it with a small tokio task that
accepts once, drains the HTTP request, writes a fixed 200 OK, then
`tokio::io::copy`s gst stdout to the socket removes all of that.

VAAPI H.264 (vah264enc) drops CPU encode from ~50% of a core to
single-digit %. An earlier attempt at vaav1enc had to be abandoned:
libavformat cannot demux AV1-in-MPEG-TS with the custom mapping
even with a 20MB probe budget — mpv reports video=eof. H.264 keeps
the hardware win on the well-trodden demuxer path.

scripts/smoke-pipeline.sh mirrors the runtime pipeline with
videotestsrc/audiotestsrc into a file and asserts that mpv reports
`video=playing` (not video=eof). The naive --frames=10 check was
a false positive when no video stream is recognized; the verbose
grep is the real gate.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-18 02:29:56 -04:00
parent 75a01a361e
commit 7b8b6bcd0c
3 changed files with 150 additions and 93 deletions
+53
View File
@@ -0,0 +1,53 @@
#!/usr/bin/env bash
# Pipeline-only smoke test: mirrors host/wayland.rs's gst pipeline but
# substitutes videotestsrc/audiotestsrc for pipewiresrc/pulsesrc and writes
# to a file instead of fdsink, so it runs without the portal/PipeWire and
# without an HTTP/iroh client. Catches AV1 encode + mpegtsmux + AAC mux
# regressions in ~5 seconds without any user interaction.
#
# Usage: scripts/smoke-pipeline.sh
# Exits 0 on pass, non-zero with diagnostic on fail.
set -euo pipefail
OUT="${TMPDIR:-/tmp}/pixelpass-smoke-$$.ts"
trap 'rm -f "$OUT"' EXIT
echo "[smoke] running gst pipeline (videotestsrc + audiotestsrc, ~2s) -> $OUT"
gst-launch-1.0 -q \
mpegtsmux name=mux \
! queue ! filesink location="$OUT" \
videotestsrc num-buffers=60 is-live=false \
! video/x-raw,width=640,height=360,framerate=30/1 \
! videoconvert ! video/x-raw,format=NV12 \
! vah264enc rate-control=cbr bitrate=2000 key-int-max=60 \
! h264parse config-interval=-1 \
! video/x-h264,stream-format=byte-stream,alignment=au \
! mux. \
audiotestsrc num-buffers=94 is-live=false \
! audioconvert ! audioresample \
! audio/x-raw,rate=48000,channels=2 \
! avenc_aac bitrate=128000 ! aacparse ! mux.
[[ -s "$OUT" ]] || { echo "[smoke] FAIL: output file empty or missing"; exit 1; }
echo "[smoke] output size: $(stat -c %s "$OUT") bytes"
echo "[smoke] ffprobe stream listing"
ffprobe -v error -show_entries stream=index,codec_type,codec_name "$OUT" \
| sed 's/^/ /'
# Real gate: mpv must actually decode video. The previous --frames=10 check was
# a false positive — if there's no video stream, --frames silently does nothing
# and mpv exits 0 after playing the audio. We instead grep mpv -v output for
# "video=playing"; "video=eof" means mpv saw no video track.
echo "[smoke] mpv video-decode probe (-v, asserting 'video=playing')"
MPV_LOG=$(mpv --no-config -v --vo=null --ao=null --frames=10 "$OUT" 2>&1)
echo "$MPV_LOG" | grep -E 'video=(playing|eof)' | sed 's/^/ /'
if echo "$MPV_LOG" | grep -q 'video=playing'; then
echo "[smoke] PASS — pipeline produces an mpv-decodable video + audio stream"
else
echo "[smoke] FAIL: mpv did not decode video (saw video=eof or no video state)"
echo "[smoke] last 30 lines of mpv output:"
echo "$MPV_LOG" | tail -30 | sed 's/^/ /'
exit 1
fi
+7 -9
View File
@@ -5,12 +5,11 @@ use std::process::Command;
use crate::common::display::DisplayServer;
pub fn check_host_binaries(display: DisplayServer) -> Result<()> {
require("ffmpeg")?;
if display == DisplayServer::Wayland {
require("gst-launch-1.0")?;
require("gst-inspect-1.0")?;
require_gst_element("pipewiresrc")?;
require_gst_element("x264enc")?;
require_gst_element("vah264enc")?;
require_gst_element("h264parse")?;
require_gst_element("mpegtsmux")?;
require_gst_element("pulsesrc")?;
@@ -56,7 +55,6 @@ fn which(bin: &str) -> Option<PathBuf> {
fn install_hint_for_bin(bin: &str) -> String {
let distro = detect_distro();
let pkg = match bin {
"ffmpeg" => "ffmpeg",
"gst-launch-1.0" | "gst-inspect-1.0" => match distro.as_deref() {
Some("arch" | "cachyos" | "manjaro" | "endeavouros") => "gstreamer gst-plugins-base",
Some("debian" | "ubuntu" | "pop" | "linuxmint") => "gstreamer1.0-tools",
@@ -78,12 +76,12 @@ fn install_hint_for_gst_element(name: &str) -> String {
Some("opensuse" | "opensuse-tumbleweed" | "opensuse-leap") => "pipewire-gstreamer",
_ => "the GStreamer PipeWire plugin",
},
"x264enc" => match distro.as_deref() {
Some("arch" | "cachyos" | "manjaro" | "endeavouros") => "gst-plugins-ugly",
Some("debian" | "ubuntu" | "pop" | "linuxmint") => "gstreamer1.0-plugins-ugly",
Some("fedora" | "nobara") => "gstreamer1-plugins-ugly",
Some("opensuse" | "opensuse-tumbleweed" | "opensuse-leap") => "gstreamer-plugins-ugly",
_ => "the GStreamer x264 plugin",
"vah264enc" => match distro.as_deref() {
Some("arch" | "cachyos" | "manjaro" | "endeavouros") => "gst-plugin-va",
Some("debian" | "ubuntu" | "pop" | "linuxmint") => "gstreamer1.0-plugins-bad",
Some("fedora" | "nobara") => "gstreamer1-plugins-bad-free",
Some("opensuse" | "opensuse-tumbleweed" | "opensuse-leap") => "gstreamer-plugins-bad",
_ => "the GStreamer VA-API plugin (requires an H.264-capable GPU; almost all modern GPUs)",
},
"h264parse" | "mpegtsmux" | "aacparse" => match distro.as_deref() {
Some("arch" | "cachyos" | "manjaro" | "endeavouros") => "gst-plugins-bad",
+90 -84
View File
@@ -1,5 +1,7 @@
//! Wayland capture: ashpd ScreenCast portal → PipeWire fd → gst-launch
//! pipewiresrc → fdsink stdout → ffmpeg stdin → MPEG-TS over `-listen 1` HTTP.
//! pipewiresrc → MPEG-TS on gst stdout → in-process HTTP server bound on a
//! random localhost port. The host bridge TCP-connects to that server and
//! pumps bytes to QUIC.
use anyhow::{Context, Result, bail};
use ashpd::{
@@ -12,11 +14,13 @@ use ashpd::{
use nix::fcntl::{FcntlArg, FdFlag, fcntl};
use nix::sys::signal::{Signal, kill};
use nix::unistd::{Pid, close};
use std::net::TcpListener as StdTcpListener;
use std::os::fd::{AsFd, IntoRawFd, OwnedFd, RawFd};
use std::process::Stdio;
use std::time::Duration;
use tokio::process::{Child, Command};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::process::{Child, ChildStdout, Command};
use tokio::task::JoinHandle;
use tokio::time::{Instant, sleep, timeout};
use crate::cli::HostOpts;
@@ -24,7 +28,7 @@ use crate::cli::HostOpts;
pub struct CaptureHandle {
port: u16,
gst: Option<Child>,
ffmpeg: Option<Child>,
server: Option<JoinHandle<()>>,
}
impl CaptureHandle {
@@ -32,32 +36,32 @@ impl CaptureHandle {
self.port
}
/// Graceful teardown: SIGTERM both children, give them ~1s to exit, then
/// SIGKILL. Call this before dropping; Drop only fires the kill backstop.
/// Graceful teardown: SIGTERM gst, give it ~1s to exit, then SIGKILL, then
/// abort the HTTP server task. Call this before dropping; Drop only fires
/// the kill backstop.
pub async fn shutdown(mut self) {
for opt in [&mut self.ffmpeg, &mut self.gst] {
if let Some(child) = opt.as_mut()
&& let Some(pid) = child.id()
{
let _ = kill(Pid::from_raw(pid as i32), Signal::SIGTERM);
}
if let Some(child) = self.gst.as_mut()
&& let Some(pid) = child.id()
{
let _ = kill(Pid::from_raw(pid as i32), Signal::SIGTERM);
}
for opt in [&mut self.ffmpeg, &mut self.gst] {
if let Some(child) = opt.as_mut() {
let _ = timeout(Duration::from_millis(1000), child.wait()).await;
let _ = child.start_kill();
}
if let Some(child) = self.gst.as_mut() {
let _ = timeout(Duration::from_millis(1000), child.wait()).await;
let _ = child.start_kill();
}
if let Some(task) = self.server.take() {
task.abort();
}
}
}
impl Drop for CaptureHandle {
fn drop(&mut self) {
for opt in [&mut self.ffmpeg, &mut self.gst] {
if let Some(child) = opt.as_mut() {
let _ = child.start_kill();
}
if let Some(child) = self.gst.as_mut() {
let _ = child.start_kill();
}
if let Some(task) = self.server.as_ref() {
task.abort();
}
}
}
@@ -105,13 +109,16 @@ pub async fn start(opts: &HostOpts) -> Result<CaptureHandle> {
clear_cloexec(&pw_fd)?;
let raw_fd: RawFd = pw_fd.into_raw_fd();
// 2. Reserve a localhost port for ffmpeg's HTTP listener.
let port = pick_random_port()?;
// 2. Bind the in-process HTTP listener on a random localhost port.
let listener = TcpListener::bind("127.0.0.1:0")
.await
.context("could not bind local capture HTTP listener")?;
let port = listener.local_addr()?.port();
// 3. Spawn gst-launch with the full pipeline: video AND audio captured,
// encoded, and muxed into MPEG-TS inside gst. ffmpeg becomes a dumb
// pass-through HTTP server (`-c copy`), which avoids ffmpeg's input
// analysis stalls and timestamp-generation guesswork.
// encoded, and muxed into MPEG-TS inside gst. Output goes to stdout,
// which we pipe straight to our HTTP server task — no demux/remux,
// no codec assumptions.
let key_interval = (opts.framerate * 2).to_string();
let bitrate = opts.bitrate.to_string();
let mut gst_cmd = Command::new("gst-launch-1.0");
@@ -135,16 +142,13 @@ pub async fn start(opts: &HostOpts) -> Result<CaptureHandle> {
"!",
"videoconvert",
"!",
"video/x-raw,format=I420",
"video/x-raw,format=NV12",
"!",
"x264enc",
"tune=zerolatency",
"speed-preset=ultrafast",
"vah264enc",
"rate-control=cbr",
&format!("bitrate={bitrate}"),
&format!("key-int-max={key_interval}"),
"!",
"video/x-h264,profile=baseline",
"!",
"h264parse",
"config-interval=-1",
"!",
@@ -184,59 +188,61 @@ pub async fn start(opts: &HostOpts) -> Result<CaptureHandle> {
.stdout
.take()
.context("gst-launch-1.0 stdout pipe unavailable")?;
let ffmpeg_stdin: Stdio = gst_stdout
.try_into()
.context("could not convert gst stdout into ffmpeg stdin")?;
// 4. ffmpeg: re-mux pre-encoded H.264 + add pulse audio → MPEG-TS HTTP.
// Width/height/framerate are embedded in the h264 stream; ffmpeg
// doesn't need our portal-reported dimensions.
let url = format!("http://127.0.0.1:{port}");
let _ = (w, h);
let ffmpeg = Command::new("ffmpeg")
.stdin(ffmpeg_stdin)
.stdout(Stdio::null())
.stderr(Stdio::inherit())
.args([
"-loglevel",
"warning",
"-fflags",
"nobuffer+discardcorrupt+genpts",
"-flags",
"low_delay",
"-analyzeduration",
"0",
"-probesize",
"32",
"-f",
"mpegts",
"-i",
"pipe:0",
"-c",
"copy",
"-f",
"mpegts",
"-listen",
"1",
&url,
])
.spawn()
.context("failed to spawn ffmpeg")?;
// 4. Spawn the HTTP server task. It owns the listener + gst stdout: it
// accepts one client (the host's bridge socket via connect_to_capture),
// drains the HTTP request, writes a fixed MPEG-TS response, then
// copies gst stdout to the socket forever.
let server = tokio::spawn(serve_capture(listener, gst_stdout));
Ok(CaptureHandle {
port,
gst: Some(gst),
ffmpeg: Some(ffmpeg),
server: Some(server),
})
}
fn pick_random_port() -> Result<u16> {
let listener = StdTcpListener::bind("127.0.0.1:0")
.context("could not pick a local ephemeral port")?;
let port = listener.local_addr()?.port();
drop(listener);
Ok(port)
async fn serve_capture(listener: TcpListener, mut gst_stdout: ChildStdout) {
let mut sock = match listener.accept().await {
Ok((s, _)) => s,
Err(e) => {
tracing::warn!("capture HTTP accept failed: {e}");
return;
}
};
if !drain_http_request(&mut sock).await {
return;
}
const RESPONSE: &[u8] = b"HTTP/1.1 200 OK\r\n\
Content-Type: video/mp2t\r\n\
Cache-Control: no-cache, no-store\r\n\
Connection: close\r\n\
\r\n";
if sock.write_all(RESPONSE).await.is_err() {
return;
}
let _ = tokio::io::copy(&mut gst_stdout, &mut sock).await;
}
async fn drain_http_request(sock: &mut TcpStream) -> bool {
let mut buf = [0u8; 1024];
let mut total = Vec::with_capacity(512);
loop {
match sock.read(&mut buf).await {
Ok(0) => return false,
Ok(n) => total.extend_from_slice(&buf[..n]),
Err(_) => return false,
}
if total.windows(4).any(|w| w == b"\r\n\r\n") {
return true;
}
if total.len() > 16 * 1024 {
return false;
}
}
}
fn clear_cloexec(fd: &impl AsFd) -> Result<()> {
@@ -247,18 +253,18 @@ fn clear_cloexec(fd: &impl AsFd) -> Result<()> {
Ok(())
}
/// Connect to ffmpeg's `-listen 1` HTTP listener, retrying until it's up or we
/// time out. Returns the connected socket — `-listen 1` is a one-shot listener
/// so this stream IS the bridge socket; don't probe and discard.
pub async fn connect_to_capture(port: u16, max_wait: Duration) -> Result<tokio::net::TcpStream> {
/// Connect to the in-process capture HTTP listener, retrying until it's up or
/// we time out. Returns the connected socket — the listener accepts exactly
/// one connection (the bridge socket), so this stream IS the bridge socket.
pub async fn connect_to_capture(port: u16, max_wait: Duration) -> Result<TcpStream> {
let deadline = Instant::now() + max_wait;
loop {
match tokio::net::TcpStream::connect(("127.0.0.1", port)).await {
match TcpStream::connect(("127.0.0.1", port)).await {
Ok(stream) => return Ok(stream),
Err(_) if Instant::now() < deadline => {
sleep(Duration::from_millis(50)).await;
}
Err(e) => bail!("ffmpeg HTTP listener never came up on 127.0.0.1:{port}: {e}"),
Err(e) => bail!("capture HTTP listener never came up on 127.0.0.1:{port}: {e}"),
}
}
}