From 75a01a361ec955dfc824808ab9d478f061ddaf2a Mon Sep 17 00:00:00 2001 From: Mollusk Date: Fri, 15 May 2026 16:43:23 -0400 Subject: [PATCH] Working Wayland end-to-end: gst owns the pipeline, ffmpeg just serves MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Moves the full capture+encode+mux pipeline into gst-launch, leaving ffmpeg as a thin HTTP server. Verified end-to-end on KDE Plasma 6 Wayland: screencast portal → mpv mirror-tunnel rendering in real time. Pipeline: pipewiresrc(do-timestamp) → videoconvert → x264enc (zerolatency ultrafast) → h264parse(config-interval=-1) → byte-stream caps → mpegtsmux ← (aacparse ← avenc_aac ← audioconvert ← pulsesrc) → fdsink fd=1 ffmpeg -fflags nobuffer+discardcorrupt+genpts -flags low_delay -analyzeduration 0 -probesize 32 -f mpegts -i pipe:0 -c copy -f mpegts -listen 1 http://127.0.0.1:N Why each piece is load-bearing (do not relitigate without cause): - x264enc + h264parse + byte-stream caps: raw video over a pipe hits stride/format negotiation problems (green screens with mis-aligned rows). Encoding inside gst sidesteps that entirely. - mpegtsmux inside gst: H.264 Annex B carries no timestamps. Without a container, ffmpeg sees "Timestamps are unset" and downstream muxing breaks. mpegts in gst preserves pipewiresrc's clock. - byte-stream + alignment=au caps: h264parse defaults to AVC format (length-prefixed NALUs) for some downstreams; ffmpeg's mpegts demuxer needs Annex B start codes. - audio in gst (pulsesrc + avenc_aac): keeping ffmpeg as a pure passthrough (`-c copy`) avoids ffmpeg's audio-input dependency delaying HTTP serving until both inputs are ready. - `-analyzeduration 0 -probesize 32`: stop ffmpeg from buffering 5MB / 5s of input before deciding it understands the stream. - Also fixes a separate one-shot bug from earlier: the previous health-probe in wait_for_listener consumed ffmpeg's single `-listen 1` accept slot, so the actual bridge connect hit Connection refused. Replaced with connect_to_capture which returns the bridge socket. Adds dep checks for pipewiresrc, x264enc, h264parse, mpegtsmux, pulsesrc, avenc_aac, aacparse with per-distro install hints. Known gap: VLC currently shows a green screen against the stream even though mpv works fine. Likely VLC-specific demuxer/latency settings, not a pipeline correctness issue — to investigate as a follow-up. mpv is the recommended client either way. --- src/common/deps.rs | 34 +++++++++++ src/host/mod.rs | 4 +- src/host/wayland.rs | 142 +++++++++++++++++++++++++------------------- 3 files changed, 117 insertions(+), 63 deletions(-) diff --git a/src/common/deps.rs b/src/common/deps.rs index 8bc7f00..77be2a4 100644 --- a/src/common/deps.rs +++ b/src/common/deps.rs @@ -10,6 +10,12 @@ pub fn check_host_binaries(display: DisplayServer) -> Result<()> { require("gst-launch-1.0")?; require("gst-inspect-1.0")?; require_gst_element("pipewiresrc")?; + require_gst_element("x264enc")?; + require_gst_element("h264parse")?; + require_gst_element("mpegtsmux")?; + require_gst_element("pulsesrc")?; + require_gst_element("avenc_aac")?; + require_gst_element("aacparse")?; } Ok(()) } @@ -72,6 +78,34 @@ fn install_hint_for_gst_element(name: &str) -> String { Some("opensuse" | "opensuse-tumbleweed" | "opensuse-leap") => "pipewire-gstreamer", _ => "the GStreamer PipeWire plugin", }, + "x264enc" => match distro.as_deref() { + Some("arch" | "cachyos" | "manjaro" | "endeavouros") => "gst-plugins-ugly", + Some("debian" | "ubuntu" | "pop" | "linuxmint") => "gstreamer1.0-plugins-ugly", + Some("fedora" | "nobara") => "gstreamer1-plugins-ugly", + Some("opensuse" | "opensuse-tumbleweed" | "opensuse-leap") => "gstreamer-plugins-ugly", + _ => "the GStreamer x264 plugin", + }, + "h264parse" | "mpegtsmux" | "aacparse" => match distro.as_deref() { + Some("arch" | "cachyos" | "manjaro" | "endeavouros") => "gst-plugins-bad", + Some("debian" | "ubuntu" | "pop" | "linuxmint") => "gstreamer1.0-plugins-bad", + Some("fedora" | "nobara") => "gstreamer1-plugins-bad-free", + Some("opensuse" | "opensuse-tumbleweed" | "opensuse-leap") => "gstreamer-plugins-bad", + _ => "the GStreamer plugins-bad set", + }, + "pulsesrc" => match distro.as_deref() { + Some("arch" | "cachyos" | "manjaro" | "endeavouros") => "gst-plugins-good", + Some("debian" | "ubuntu" | "pop" | "linuxmint") => "gstreamer1.0-pulseaudio", + Some("fedora" | "nobara") => "gstreamer1-plugins-good", + Some("opensuse" | "opensuse-tumbleweed" | "opensuse-leap") => "gstreamer-plugins-good", + _ => "the GStreamer PulseAudio plugin", + }, + "avenc_aac" => match distro.as_deref() { + Some("arch" | "cachyos" | "manjaro" | "endeavouros") => "gst-libav", + Some("debian" | "ubuntu" | "pop" | "linuxmint") => "gstreamer1.0-libav", + Some("fedora" | "nobara") => "gstreamer1-libav", + Some("opensuse" | "opensuse-tumbleweed" | "opensuse-leap") => "gstreamer-libav", + _ => "the GStreamer libav (avenc) plugin", + }, _ => name, }; install_command(&distro, pkg) diff --git a/src/host/mod.rs b/src/host/mod.rs index 6deb2bd..25d776b 100644 --- a/src/host/mod.rs +++ b/src/host/mod.rs @@ -5,7 +5,6 @@ use anyhow::{Result, bail}; use iroh::Endpoint; use iroh::endpoint::{Connection, presets}; use iroh_tickets::endpoint::EndpointTicket; -use tokio::net::TcpStream; use tokio_util::sync::CancellationToken; use crate::cli::HostOpts; @@ -73,8 +72,7 @@ async fn handle_peer( let capture_handle = capture::spawn(display, opts).await?; let port = capture_handle.local_port(); - wayland::wait_for_listener(port, std::time::Duration::from_secs(5)).await?; - let tcp = TcpStream::connect(("127.0.0.1", port)).await?; + let tcp = wayland::connect_to_capture(port, std::time::Duration::from_secs(5)).await?; let bridge = crate::common::tunnel::bridge(quic_send, quic_recv, tcp); diff --git a/src/host/wayland.rs b/src/host/wayland.rs index 4bf100e..abe7a1c 100644 --- a/src/host/wayland.rs +++ b/src/host/wayland.rs @@ -98,6 +98,7 @@ pub async fn start(opts: &HostOpts) -> Result { .context("portal returned a stream with no size — pipewiresrc can't infer dimensions")?; let pw_fd: OwnedFd = proxy.open_pipe_wire_remote(&session).await?; + tracing::info!(node_id, width = w, height = h, "portal handshake complete"); // The fd is CLOEXEC by default; the gst child needs to inherit it across // exec. We then leak it via into_raw_fd so its lifetime spans the spawn, // and close the parent's copy once gst is running. @@ -107,28 +108,76 @@ pub async fn start(opts: &HostOpts) -> Result { // 2. Reserve a localhost port for ffmpeg's HTTP listener. let port = pick_random_port()?; - // 3. Spawn gst-launch → raw NV12 on stdout. - let mut gst = Command::new("gst-launch-1.0") + // 3. Spawn gst-launch with the full pipeline: video AND audio captured, + // encoded, and muxed into MPEG-TS inside gst. ffmpeg becomes a dumb + // pass-through HTTP server (`-c copy`), which avoids ffmpeg's input + // analysis stalls and timestamp-generation guesswork. + let key_interval = (opts.framerate * 2).to_string(); + let bitrate = opts.bitrate.to_string(); + let mut gst_cmd = Command::new("gst-launch-1.0"); + gst_cmd .args([ - "-q", - "pipewiresrc", - &format!("fd={raw_fd}"), - &format!("path={node_id}"), + // muxer + sink + "mpegtsmux", + "name=mux", "!", - "videoconvert", - "!", - "video/x-raw,format=NV12", + "queue", "!", "fdsink", "fd=1", + // video branch + "pipewiresrc", + &format!("fd={raw_fd}"), + &format!("path={node_id}"), + "do-timestamp=true", + "!", + "queue", + "!", + "videoconvert", + "!", + "video/x-raw,format=I420", + "!", + "x264enc", + "tune=zerolatency", + "speed-preset=ultrafast", + &format!("bitrate={bitrate}"), + &format!("key-int-max={key_interval}"), + "!", + "video/x-h264,profile=baseline", + "!", + "h264parse", + "config-interval=-1", + "!", + "video/x-h264,stream-format=byte-stream,alignment=au", + "!", + "mux.", + // audio branch + "pulsesrc", + "do-timestamp=true", + "!", + "queue", + "!", + "audioconvert", + "!", + "audioresample", + "!", + "audio/x-raw,rate=48000,channels=2", + "!", + "avenc_aac", + "bitrate=128000", + "!", + "aacparse", + "!", + "mux.", ]) .stdin(Stdio::null()) .stdout(Stdio::piped()) - .stderr(Stdio::inherit()) - .spawn() - .context("failed to spawn gst-launch-1.0")?; + .stderr(Stdio::inherit()); + if std::env::var_os("PIXELPASS_GST_DEBUG").is_some() { + gst_cmd.env("GST_DEBUG", "3"); + } + let mut gst = gst_cmd.spawn().context("failed to spawn gst-launch-1.0")?; // Parent no longer needs the pipewire fd — gst inherited its own copy. - // Ignore close errors; the worst case is a leaked fd until our exit. let _ = close(raw_fd); let gst_stdout = gst @@ -139,12 +188,11 @@ pub async fn start(opts: &HostOpts) -> Result { .try_into() .context("could not convert gst stdout into ffmpeg stdin")?; - // 4. Spawn ffmpeg consuming gst stdout, producing MPEG-TS HTTP. + // 4. ffmpeg: re-mux pre-encoded H.264 + add pulse audio → MPEG-TS HTTP. + // Width/height/framerate are embedded in the h264 stream; ffmpeg + // doesn't need our portal-reported dimensions. let url = format!("http://127.0.0.1:{port}"); - let bitrate_arg = format!("{}k", opts.bitrate); - let video_size = format!("{w}x{h}"); - let framerate = opts.framerate.to_string(); - let gop = opts.framerate.to_string(); + let _ = (w, h); let ffmpeg = Command::new("ffmpeg") .stdin(ffmpeg_stdin) @@ -153,44 +201,20 @@ pub async fn start(opts: &HostOpts) -> Result { .args([ "-loglevel", "warning", - "-f", - "rawvideo", - "-pix_fmt", - "nv12", - "-video_size", - &video_size, - "-framerate", - &framerate, - "-i", - "pipe:0", - "-f", - "pulse", - "-i", - "default", - "-c:v", - "libx264", - "-preset", - "ultrafast", - "-tune", - "zerolatency", - "-bf", - "0", - "-g", - &gop, - "-b:v", - &bitrate_arg, - "-maxrate", - &bitrate_arg, - "-bufsize", - &bitrate_arg, - "-c:a", - "aac", - "-b:a", - "128k", "-fflags", - "nobuffer", + "nobuffer+discardcorrupt+genpts", "-flags", "low_delay", + "-analyzeduration", + "0", + "-probesize", + "32", + "-f", + "mpegts", + "-i", + "pipe:0", + "-c", + "copy", "-f", "mpegts", "-listen", @@ -223,16 +247,14 @@ fn clear_cloexec(fd: &impl AsFd) -> Result<()> { Ok(()) } -/// Wait until ffmpeg's `-listen 1` server actually accepts a TCP connection, -/// or time out. Returns Ok(()) on success. -pub async fn wait_for_listener(port: u16, max_wait: Duration) -> Result<()> { +/// Connect to ffmpeg's `-listen 1` HTTP listener, retrying until it's up or we +/// time out. Returns the connected socket — `-listen 1` is a one-shot listener +/// so this stream IS the bridge socket; don't probe and discard. +pub async fn connect_to_capture(port: u16, max_wait: Duration) -> Result { let deadline = Instant::now() + max_wait; loop { match tokio::net::TcpStream::connect(("127.0.0.1", port)).await { - Ok(stream) => { - drop(stream); - return Ok(()); - } + Ok(stream) => return Ok(stream), Err(_) if Instant::now() < deadline => { sleep(Duration::from_millis(50)).await; }