Wayland: VAAPI H.264 + in-process HTTP server, drop ffmpeg
The previous ffmpeg-as-HTTP-server pipeline shape held back two improvements at once. ffmpeg as the runtime server lost a one-shot `-listen 1` accept to a probe-and-discard health check, and forced us to size analyze/probe budgets carefully so ffmpeg would serve before our deadline. Replacing it with a small tokio task that accepts once, drains the HTTP request, writes a fixed 200 OK, then `tokio::io::copy`s gst stdout to the socket removes all of that. VAAPI H.264 (vah264enc) drops CPU encode from ~50% of a core to single-digit %. An earlier attempt at vaav1enc had to be abandoned: libavformat cannot demux AV1-in-MPEG-TS with the custom mapping even with a 20MB probe budget — mpv reports video=eof. H.264 keeps the hardware win on the well-trodden demuxer path. scripts/smoke-pipeline.sh mirrors the runtime pipeline with videotestsrc/audiotestsrc into a file and asserts that mpv reports `video=playing` (not video=eof). The naive --frames=10 check was a false positive when no video stream is recognized; the verbose grep is the real gate. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Executable
+53
@@ -0,0 +1,53 @@
|
||||
#!/usr/bin/env bash
# Pipeline-only smoke test: mirrors host/wayland.rs's gst pipeline but
# substitutes videotestsrc/audiotestsrc for pipewiresrc/pulsesrc and writes
# to a file instead of fdsink, so it runs without the portal/PipeWire and
# without an HTTP/iroh client. Catches VAAPI H.264 encode + mpegtsmux + AAC
# mux regressions in ~5 seconds without any user interaction.
#
# Usage: scripts/smoke-pipeline.sh
# Exits 0 on pass, non-zero with diagnostic on fail.

set -euo pipefail

OUT="${TMPDIR:-/tmp}/pixelpass-smoke-$$.ts"
trap 'rm -f "$OUT"' EXIT

echo "[smoke] running gst pipeline (videotestsrc + audiotestsrc, ~2s) -> $OUT"
gst-launch-1.0 -q \
    mpegtsmux name=mux \
    ! queue ! filesink location="$OUT" \
    videotestsrc num-buffers=60 is-live=false \
    ! video/x-raw,width=640,height=360,framerate=30/1 \
    ! videoconvert ! video/x-raw,format=NV12 \
    ! vah264enc rate-control=cbr bitrate=2000 key-int-max=60 \
    ! h264parse config-interval=-1 \
    ! video/x-h264,stream-format=byte-stream,alignment=au \
    ! mux. \
    audiotestsrc num-buffers=94 is-live=false \
    ! audioconvert ! audioresample \
    ! audio/x-raw,rate=48000,channels=2 \
    ! avenc_aac bitrate=128000 ! aacparse ! mux.

[[ -s "$OUT" ]] || { echo "[smoke] FAIL: output file empty or missing"; exit 1; }
echo "[smoke] output size: $(stat -c %s "$OUT") bytes"

echo "[smoke] ffprobe stream listing"
ffprobe -v error -show_entries stream=index,codec_type,codec_name "$OUT" \
    | sed 's/^/ /'

# Real gate: mpv must actually decode video. The previous --frames=10 check was
# a false positive — if there's no video stream, --frames silently does nothing
# and mpv exits 0 after playing the audio. We instead grep mpv -v output for
# "video=playing"; "video=eof" means mpv saw no video track.
echo "[smoke] mpv video-decode probe (-v, asserting 'video=playing')"

# Capture mpv's exit status instead of letting `set -e` abort on it, so a
# crashing mpv still reaches the FAIL branch below and prints its log.
MPV_STATUS=0
MPV_LOG=$(mpv --no-config -v --vo=null --ao=null --frames=10 "$OUT" 2>&1) || MPV_STATUS=$?

# grep exits 1 when nothing matches; under `set -e -o pipefail` that would kill
# the script right here, before the diagnostic below. This line is purely
# informational, so tolerate "no match".
{ grep -E 'video=(playing|eof)' <<<"$MPV_LOG" || true; } | sed 's/^/ /'

# Here-string rather than `echo | grep -q`: grep -q exits on the first match,
# and with pipefail a SIGPIPE'd echo would turn a real match into a failure.
if [[ "$MPV_STATUS" -eq 0 ]] && grep -q 'video=playing' <<<"$MPV_LOG"; then
    echo "[smoke] PASS — pipeline produces an mpv-decodable video + audio stream"
else
    echo "[smoke] FAIL: mpv did not decode video (saw video=eof or no video state)"
    echo "[smoke] mpv exit status: $MPV_STATUS"
    echo "[smoke] last 30 lines of mpv output:"
    tail -30 <<<"$MPV_LOG" | sed 's/^/ /'
    exit 1
fi
|
||||
+7
-9
@@ -5,12 +5,11 @@ use std::process::Command;
|
||||
use crate::common::display::DisplayServer;
|
||||
|
||||
pub fn check_host_binaries(display: DisplayServer) -> Result<()> {
|
||||
require("ffmpeg")?;
|
||||
if display == DisplayServer::Wayland {
|
||||
require("gst-launch-1.0")?;
|
||||
require("gst-inspect-1.0")?;
|
||||
require_gst_element("pipewiresrc")?;
|
||||
require_gst_element("x264enc")?;
|
||||
require_gst_element("vah264enc")?;
|
||||
require_gst_element("h264parse")?;
|
||||
require_gst_element("mpegtsmux")?;
|
||||
require_gst_element("pulsesrc")?;
|
||||
@@ -56,7 +55,6 @@ fn which(bin: &str) -> Option<PathBuf> {
|
||||
fn install_hint_for_bin(bin: &str) -> String {
|
||||
let distro = detect_distro();
|
||||
let pkg = match bin {
|
||||
"ffmpeg" => "ffmpeg",
|
||||
"gst-launch-1.0" | "gst-inspect-1.0" => match distro.as_deref() {
|
||||
Some("arch" | "cachyos" | "manjaro" | "endeavouros") => "gstreamer gst-plugins-base",
|
||||
Some("debian" | "ubuntu" | "pop" | "linuxmint") => "gstreamer1.0-tools",
|
||||
@@ -78,12 +76,12 @@ fn install_hint_for_gst_element(name: &str) -> String {
|
||||
Some("opensuse" | "opensuse-tumbleweed" | "opensuse-leap") => "pipewire-gstreamer",
|
||||
_ => "the GStreamer PipeWire plugin",
|
||||
},
|
||||
"x264enc" => match distro.as_deref() {
|
||||
Some("arch" | "cachyos" | "manjaro" | "endeavouros") => "gst-plugins-ugly",
|
||||
Some("debian" | "ubuntu" | "pop" | "linuxmint") => "gstreamer1.0-plugins-ugly",
|
||||
Some("fedora" | "nobara") => "gstreamer1-plugins-ugly",
|
||||
Some("opensuse" | "opensuse-tumbleweed" | "opensuse-leap") => "gstreamer-plugins-ugly",
|
||||
_ => "the GStreamer x264 plugin",
|
||||
"vah264enc" => match distro.as_deref() {
|
||||
Some("arch" | "cachyos" | "manjaro" | "endeavouros") => "gst-plugin-va",
|
||||
Some("debian" | "ubuntu" | "pop" | "linuxmint") => "gstreamer1.0-plugins-bad",
|
||||
Some("fedora" | "nobara") => "gstreamer1-plugins-bad-free",
|
||||
Some("opensuse" | "opensuse-tumbleweed" | "opensuse-leap") => "gstreamer-plugins-bad",
|
||||
_ => "the GStreamer VA-API plugin (requires an H.264-capable GPU; almost all modern GPUs)",
|
||||
},
|
||||
"h264parse" | "mpegtsmux" | "aacparse" => match distro.as_deref() {
|
||||
Some("arch" | "cachyos" | "manjaro" | "endeavouros") => "gst-plugins-bad",
|
||||
|
||||
+90
-84
@@ -1,5 +1,7 @@
|
||||
//! Wayland capture: ashpd ScreenCast portal → PipeWire fd → gst-launch
|
||||
//! pipewiresrc → fdsink stdout → ffmpeg stdin → MPEG-TS over `-listen 1` HTTP.
|
||||
//! pipewiresrc → MPEG-TS on gst stdout → in-process HTTP server bound on a
|
||||
//! random localhost port. The host bridge TCP-connects to that server and
|
||||
//! pumps bytes to QUIC.
|
||||
|
||||
use anyhow::{Context, Result, bail};
|
||||
use ashpd::{
|
||||
@@ -12,11 +14,13 @@ use ashpd::{
|
||||
use nix::fcntl::{FcntlArg, FdFlag, fcntl};
|
||||
use nix::sys::signal::{Signal, kill};
|
||||
use nix::unistd::{Pid, close};
|
||||
use std::net::TcpListener as StdTcpListener;
|
||||
use std::os::fd::{AsFd, IntoRawFd, OwnedFd, RawFd};
|
||||
use std::process::Stdio;
|
||||
use std::time::Duration;
|
||||
use tokio::process::{Child, Command};
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::net::{TcpListener, TcpStream};
|
||||
use tokio::process::{Child, ChildStdout, Command};
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::time::{Instant, sleep, timeout};
|
||||
|
||||
use crate::cli::HostOpts;
|
||||
@@ -24,7 +28,7 @@ use crate::cli::HostOpts;
|
||||
pub struct CaptureHandle {
|
||||
port: u16,
|
||||
gst: Option<Child>,
|
||||
ffmpeg: Option<Child>,
|
||||
server: Option<JoinHandle<()>>,
|
||||
}
|
||||
|
||||
impl CaptureHandle {
|
||||
@@ -32,32 +36,32 @@ impl CaptureHandle {
|
||||
self.port
|
||||
}
|
||||
|
||||
/// Graceful teardown: SIGTERM both children, give them ~1s to exit, then
|
||||
/// SIGKILL. Call this before dropping; Drop only fires the kill backstop.
|
||||
/// Graceful teardown: SIGTERM gst, give it ~1s to exit, then SIGKILL, then
|
||||
/// abort the HTTP server task. Call this before dropping; Drop only fires
|
||||
/// the kill backstop.
|
||||
pub async fn shutdown(mut self) {
|
||||
for opt in [&mut self.ffmpeg, &mut self.gst] {
|
||||
if let Some(child) = opt.as_mut()
|
||||
&& let Some(pid) = child.id()
|
||||
{
|
||||
let _ = kill(Pid::from_raw(pid as i32), Signal::SIGTERM);
|
||||
}
|
||||
if let Some(child) = self.gst.as_mut()
|
||||
&& let Some(pid) = child.id()
|
||||
{
|
||||
let _ = kill(Pid::from_raw(pid as i32), Signal::SIGTERM);
|
||||
}
|
||||
|
||||
for opt in [&mut self.ffmpeg, &mut self.gst] {
|
||||
if let Some(child) = opt.as_mut() {
|
||||
let _ = timeout(Duration::from_millis(1000), child.wait()).await;
|
||||
let _ = child.start_kill();
|
||||
}
|
||||
if let Some(child) = self.gst.as_mut() {
|
||||
let _ = timeout(Duration::from_millis(1000), child.wait()).await;
|
||||
let _ = child.start_kill();
|
||||
}
|
||||
if let Some(task) = self.server.take() {
|
||||
task.abort();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for CaptureHandle {
|
||||
fn drop(&mut self) {
|
||||
for opt in [&mut self.ffmpeg, &mut self.gst] {
|
||||
if let Some(child) = opt.as_mut() {
|
||||
let _ = child.start_kill();
|
||||
}
|
||||
if let Some(child) = self.gst.as_mut() {
|
||||
let _ = child.start_kill();
|
||||
}
|
||||
if let Some(task) = self.server.as_ref() {
|
||||
task.abort();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -105,13 +109,16 @@ pub async fn start(opts: &HostOpts) -> Result<CaptureHandle> {
|
||||
clear_cloexec(&pw_fd)?;
|
||||
let raw_fd: RawFd = pw_fd.into_raw_fd();
|
||||
|
||||
// 2. Reserve a localhost port for ffmpeg's HTTP listener.
|
||||
let port = pick_random_port()?;
|
||||
// 2. Bind the in-process HTTP listener on a random localhost port.
|
||||
let listener = TcpListener::bind("127.0.0.1:0")
|
||||
.await
|
||||
.context("could not bind local capture HTTP listener")?;
|
||||
let port = listener.local_addr()?.port();
|
||||
|
||||
// 3. Spawn gst-launch with the full pipeline: video AND audio captured,
|
||||
// encoded, and muxed into MPEG-TS inside gst. ffmpeg becomes a dumb
|
||||
// pass-through HTTP server (`-c copy`), which avoids ffmpeg's input
|
||||
// analysis stalls and timestamp-generation guesswork.
|
||||
// encoded, and muxed into MPEG-TS inside gst. Output goes to stdout,
|
||||
// which we pipe straight to our HTTP server task — no demux/remux,
|
||||
// no codec assumptions.
|
||||
let key_interval = (opts.framerate * 2).to_string();
|
||||
let bitrate = opts.bitrate.to_string();
|
||||
let mut gst_cmd = Command::new("gst-launch-1.0");
|
||||
@@ -135,16 +142,13 @@ pub async fn start(opts: &HostOpts) -> Result<CaptureHandle> {
|
||||
"!",
|
||||
"videoconvert",
|
||||
"!",
|
||||
"video/x-raw,format=I420",
|
||||
"video/x-raw,format=NV12",
|
||||
"!",
|
||||
"x264enc",
|
||||
"tune=zerolatency",
|
||||
"speed-preset=ultrafast",
|
||||
"vah264enc",
|
||||
"rate-control=cbr",
|
||||
&format!("bitrate={bitrate}"),
|
||||
&format!("key-int-max={key_interval}"),
|
||||
"!",
|
||||
"video/x-h264,profile=baseline",
|
||||
"!",
|
||||
"h264parse",
|
||||
"config-interval=-1",
|
||||
"!",
|
||||
@@ -184,59 +188,61 @@ pub async fn start(opts: &HostOpts) -> Result<CaptureHandle> {
|
||||
.stdout
|
||||
.take()
|
||||
.context("gst-launch-1.0 stdout pipe unavailable")?;
|
||||
let ffmpeg_stdin: Stdio = gst_stdout
|
||||
.try_into()
|
||||
.context("could not convert gst stdout into ffmpeg stdin")?;
|
||||
|
||||
// 4. ffmpeg: re-mux pre-encoded H.264 + add pulse audio → MPEG-TS HTTP.
|
||||
// Width/height/framerate are embedded in the h264 stream; ffmpeg
|
||||
// doesn't need our portal-reported dimensions.
|
||||
let url = format!("http://127.0.0.1:{port}");
|
||||
let _ = (w, h);
|
||||
|
||||
let ffmpeg = Command::new("ffmpeg")
|
||||
.stdin(ffmpeg_stdin)
|
||||
.stdout(Stdio::null())
|
||||
.stderr(Stdio::inherit())
|
||||
.args([
|
||||
"-loglevel",
|
||||
"warning",
|
||||
"-fflags",
|
||||
"nobuffer+discardcorrupt+genpts",
|
||||
"-flags",
|
||||
"low_delay",
|
||||
"-analyzeduration",
|
||||
"0",
|
||||
"-probesize",
|
||||
"32",
|
||||
"-f",
|
||||
"mpegts",
|
||||
"-i",
|
||||
"pipe:0",
|
||||
"-c",
|
||||
"copy",
|
||||
"-f",
|
||||
"mpegts",
|
||||
"-listen",
|
||||
"1",
|
||||
&url,
|
||||
])
|
||||
.spawn()
|
||||
.context("failed to spawn ffmpeg")?;
|
||||
// 4. Spawn the HTTP server task. It owns the listener + gst stdout: it
|
||||
// accepts one client (the host's bridge socket via connect_to_capture),
|
||||
// drains the HTTP request, writes a fixed MPEG-TS response, then
|
||||
// copies gst stdout to the socket forever.
|
||||
let server = tokio::spawn(serve_capture(listener, gst_stdout));
|
||||
|
||||
Ok(CaptureHandle {
|
||||
port,
|
||||
gst: Some(gst),
|
||||
ffmpeg: Some(ffmpeg),
|
||||
server: Some(server),
|
||||
})
|
||||
}
|
||||
|
||||
fn pick_random_port() -> Result<u16> {
|
||||
let listener = StdTcpListener::bind("127.0.0.1:0")
|
||||
.context("could not pick a local ephemeral port")?;
|
||||
let port = listener.local_addr()?.port();
|
||||
drop(listener);
|
||||
Ok(port)
|
||||
async fn serve_capture(listener: TcpListener, mut gst_stdout: ChildStdout) {
|
||||
let mut sock = match listener.accept().await {
|
||||
Ok((s, _)) => s,
|
||||
Err(e) => {
|
||||
tracing::warn!("capture HTTP accept failed: {e}");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
if !drain_http_request(&mut sock).await {
|
||||
return;
|
||||
}
|
||||
|
||||
const RESPONSE: &[u8] = b"HTTP/1.1 200 OK\r\n\
|
||||
Content-Type: video/mp2t\r\n\
|
||||
Cache-Control: no-cache, no-store\r\n\
|
||||
Connection: close\r\n\
|
||||
\r\n";
|
||||
if sock.write_all(RESPONSE).await.is_err() {
|
||||
return;
|
||||
}
|
||||
|
||||
let _ = tokio::io::copy(&mut gst_stdout, &mut sock).await;
|
||||
}
|
||||
|
||||
async fn drain_http_request(sock: &mut TcpStream) -> bool {
|
||||
let mut buf = [0u8; 1024];
|
||||
let mut total = Vec::with_capacity(512);
|
||||
loop {
|
||||
match sock.read(&mut buf).await {
|
||||
Ok(0) => return false,
|
||||
Ok(n) => total.extend_from_slice(&buf[..n]),
|
||||
Err(_) => return false,
|
||||
}
|
||||
if total.windows(4).any(|w| w == b"\r\n\r\n") {
|
||||
return true;
|
||||
}
|
||||
if total.len() > 16 * 1024 {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn clear_cloexec(fd: &impl AsFd) -> Result<()> {
|
||||
@@ -247,18 +253,18 @@ fn clear_cloexec(fd: &impl AsFd) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Connect to ffmpeg's `-listen 1` HTTP listener, retrying until it's up or we
|
||||
/// time out. Returns the connected socket — `-listen 1` is a one-shot listener
|
||||
/// so this stream IS the bridge socket; don't probe and discard.
|
||||
pub async fn connect_to_capture(port: u16, max_wait: Duration) -> Result<tokio::net::TcpStream> {
|
||||
/// Connect to the in-process capture HTTP listener, retrying until it's up or
|
||||
/// we time out. Returns the connected socket — the listener accepts exactly
|
||||
/// one connection (the bridge socket), so this stream IS the bridge socket.
|
||||
pub async fn connect_to_capture(port: u16, max_wait: Duration) -> Result<TcpStream> {
|
||||
let deadline = Instant::now() + max_wait;
|
||||
loop {
|
||||
match tokio::net::TcpStream::connect(("127.0.0.1", port)).await {
|
||||
match TcpStream::connect(("127.0.0.1", port)).await {
|
||||
Ok(stream) => return Ok(stream),
|
||||
Err(_) if Instant::now() < deadline => {
|
||||
sleep(Duration::from_millis(50)).await;
|
||||
}
|
||||
Err(e) => bail!("ffmpeg HTTP listener never came up on 127.0.0.1:{port}: {e}"),
|
||||
Err(e) => bail!("capture HTTP listener never came up on 127.0.0.1:{port}: {e}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user