From 3a551c2287517510734bed10f6441154fc348f67 Mon Sep 17 00:00:00 2001 From: Mollusk Date: Fri, 15 May 2026 15:22:35 -0400 Subject: [PATCH] =?UTF-8?q?Wayland=20capture:=20ashpd=20portal=20=E2=86=92?= =?UTF-8?q?=20gst-launch=20=E2=86=92=20ffmpeg=20=E2=86=92=20MPEG-TS?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements the Wayland host pipeline from plan §4.5: ashpd ScreenCast portal -> CreateSession + SelectSources + Start + OpenPipeWireRemote -> (pipewire fd, node_id, width, height) gst-launch-1.0 pipewiresrc fd=N path=NODE_ID ! videoconvert ! video/x-raw,format=NV12 ! fdsink fd=1 ffmpeg -f rawvideo -pix_fmt nv12 -video_size WxH -i pipe:0 -f pulse -i default -c:v libx264 -preset ultrafast -tune zerolatency -c:a aac -f mpegts -listen 1 http://127.0.0.1: Phase 1 ships software x264 per plan §7; VAAPI is Phase 2. src/host/wayland.rs is the new module. capture.rs becomes a thin dispatcher with a CaptureHandle enum (Wayland today, X11 next). host/mod.rs swaps the 150ms sleep for a poll-until-listener-ready helper, and calls handle.shutdown().await for an orderly SIGTERM / 1s grace / SIGKILL teardown. The Drop impl is the panic backstop. The pipewire fd handoff clears CLOEXEC before gst-launch spawn and closes the parent's copy of the raw fd after the child has it. Also deletes the empty src/host/tunnel.rs and src/viewer/tunnel.rs placeholder files — the generic bridge in common/tunnel.rs is doing the work, and there's no host- or viewer-specific tunnel concern worth a module yet. --- src/host/capture.rs | 52 +++++----- src/host/mod.rs | 11 +- src/host/tunnel.rs | 3 - src/host/wayland.rs | 242 +++++++++++++++++++++++++++++++++++++++++++ src/viewer/mod.rs | 2 - src/viewer/tunnel.rs | 2 - 6 files changed, 269 insertions(+), 43 deletions(-) delete mode 100644 src/host/tunnel.rs create mode 100644 src/host/wayland.rs delete mode 100644 src/viewer/tunnel.rs diff --git a/src/host/capture.rs b/src/host/capture.rs index b8285d5..d1be31b 100644 --- a/src/host/capture.rs +++ b/src/host/capture.rs @@ -1,43 +1,39 @@ -//! Capture-and-encode pipeline. -//! -//! Phase 1 stub: the actual ffmpeg / gst-launch spawn lives here. Returns a -//! [`CaptureHandle`] whose `Drop` impl kills the children so a panic on the -//! bridge side can't leave a black-hole encoder running. -//! -//! Phase 2 will fill in X11 (x11grab) and Wayland (ashpd portal + -//! pipewiresrc → fdsink | ffmpeg) bodies; per-app PipeWire routing hangs off -//! the Wayland branch. +//! Capture dispatcher. Selects the per-display-server pipeline and returns its +//! [`CaptureHandle`]. Each backend owns its own children and tears them down +//! via its own Drop / shutdown. use anyhow::{Result, bail}; use crate::cli::HostOpts; use crate::common::display::DisplayServer; +use crate::host::wayland; -pub struct CaptureHandle { - port: u16, - // Future: child handles, PipewireState, etc. +pub enum CaptureHandle { + Wayland(wayland::CaptureHandle), } impl CaptureHandle { pub fn local_port(&self) -> u16 { - self.port - } -} - -impl Drop for CaptureHandle { - fn drop(&mut self) { - // Future: SIGTERM children with a short grace period, then SIGKILL. - // Tear down any PipeWire null sinks / loopbacks created in spawn(). - } -} - -pub async fn spawn(display: DisplayServer, _opts: &HostOpts) -> Result { - match display { - DisplayServer::X11 => { - bail!("X11 capture pipeline not yet implemented (Phase 2)"); + match self { + CaptureHandle::Wayland(h) => h.local_port(), } + } + + pub async fn shutdown(self) { + match self { + CaptureHandle::Wayland(h) => h.shutdown().await, + } + } +} + +pub async fn spawn(display: DisplayServer, opts: &HostOpts) -> Result { + match display { DisplayServer::Wayland => { - bail!("Wayland capture pipeline not yet implemented (Phase 2)"); + let h = wayland::start(opts).await?; + Ok(CaptureHandle::Wayland(h)) + } + DisplayServer::X11 => { + bail!("X11 capture pipeline not yet implemented (Phase 2 follow-up)"); } DisplayServer::Unknown => unreachable!("caller guarantees display != Unknown"), } diff --git a/src/host/mod.rs b/src/host/mod.rs index ee53a58..6deb2bd 100644 --- a/src/host/mod.rs +++ b/src/host/mod.rs @@ -1,5 +1,5 @@ mod capture; -mod tunnel; +mod wayland; use anyhow::{Result, bail}; use iroh::Endpoint; @@ -71,14 +71,9 @@ async fn handle_peer( ) -> Result<()> { let (quic_send, quic_recv) = conn.accept_bi().await?; - // Spawn ffmpeg (and on Wayland, the gst-launch bridge) listening on a - // random localhost port. Returns the port and a guard that kills the - // child(ren) on drop. let capture_handle = capture::spawn(display, opts).await?; let port = capture_handle.local_port(); - - // Give ffmpeg's `-listen 1` HTTP server a moment to bind before we dial. - tokio::time::sleep(std::time::Duration::from_millis(150)).await; + wayland::wait_for_listener(port, std::time::Duration::from_secs(5)).await?; let tcp = TcpStream::connect(("127.0.0.1", port)).await?; let bridge = crate::common::tunnel::bridge(quic_send, quic_recv, tcp); @@ -96,7 +91,7 @@ async fn handle_peer( } } - drop(capture_handle); // explicit teardown of ffmpeg/gst + capture_handle.shutdown().await; Ok(()) } diff --git a/src/host/tunnel.rs b/src/host/tunnel.rs deleted file mode 100644 index 331bd7b..0000000 --- a/src/host/tunnel.rs +++ /dev/null @@ -1,3 +0,0 @@ -//! Host-side tunnel glue lives here once it gains host-specific concerns -//! (e.g. coordinating capture teardown with bridge shutdown). For now the -//! generic bridge in `common::tunnel` is sufficient. diff --git a/src/host/wayland.rs b/src/host/wayland.rs new file mode 100644 index 0000000..4bf100e --- /dev/null +++ b/src/host/wayland.rs @@ -0,0 +1,242 @@ +//! Wayland capture: ashpd ScreenCast portal → PipeWire fd → gst-launch +//! pipewiresrc → fdsink stdout → ffmpeg stdin → MPEG-TS over `-listen 1` HTTP. + +use anyhow::{Context, Result, bail}; +use ashpd::{ + WindowIdentifier, + desktop::{ + PersistMode, + screencast::{CursorMode, Screencast, SourceType}, + }, +}; +use nix::fcntl::{FcntlArg, FdFlag, fcntl}; +use nix::sys::signal::{Signal, kill}; +use nix::unistd::{Pid, close}; +use std::net::TcpListener as StdTcpListener; +use std::os::fd::{AsFd, IntoRawFd, OwnedFd, RawFd}; +use std::process::Stdio; +use std::time::Duration; +use tokio::process::{Child, Command}; +use tokio::time::{Instant, sleep, timeout}; + +use crate::cli::HostOpts; + +pub struct CaptureHandle { + port: u16, + gst: Option, + ffmpeg: Option, +} + +impl CaptureHandle { + pub fn local_port(&self) -> u16 { + self.port + } + + /// Graceful teardown: SIGTERM both children, give them ~1s to exit, then + /// SIGKILL. Call this before dropping; Drop only fires the kill backstop. + pub async fn shutdown(mut self) { + for opt in [&mut self.ffmpeg, &mut self.gst] { + if let Some(child) = opt.as_mut() + && let Some(pid) = child.id() + { + let _ = kill(Pid::from_raw(pid as i32), Signal::SIGTERM); + } + } + + for opt in [&mut self.ffmpeg, &mut self.gst] { + if let Some(child) = opt.as_mut() { + let _ = timeout(Duration::from_millis(1000), child.wait()).await; + let _ = child.start_kill(); + } + } + } +} + +impl Drop for CaptureHandle { + fn drop(&mut self) { + for opt in [&mut self.ffmpeg, &mut self.gst] { + if let Some(child) = opt.as_mut() { + let _ = child.start_kill(); + } + } + } +} + +pub async fn start(opts: &HostOpts) -> Result { + // 1. Negotiate the screencast session with the portal. + let proxy = Screencast::new() + .await + .context("could not reach the xdg-desktop-portal ScreenCast interface")?; + let session = proxy.create_session().await?; + + let source = if opts.window { SourceType::Window } else { SourceType::Monitor }; + proxy + .select_sources( + &session, + CursorMode::Embedded, + source.into(), + false, + None, + PersistMode::DoNot, + ) + .await + .context("select_sources failed")?; + + let response = proxy + .start(&session, &WindowIdentifier::default()) + .await + .context("portal Start failed (did the user cancel the picker?)")? + .response()?; + + let stream = response + .streams() + .first() + .context("portal returned no screencast streams")?; + let node_id = stream.pipe_wire_node_id(); + let (w, h) = stream + .size() + .context("portal returned a stream with no size — pipewiresrc can't infer dimensions")?; + + let pw_fd: OwnedFd = proxy.open_pipe_wire_remote(&session).await?; + // The fd is CLOEXEC by default; the gst child needs to inherit it across + // exec. We then leak it via into_raw_fd so its lifetime spans the spawn, + // and close the parent's copy once gst is running. + clear_cloexec(&pw_fd)?; + let raw_fd: RawFd = pw_fd.into_raw_fd(); + + // 2. Reserve a localhost port for ffmpeg's HTTP listener. + let port = pick_random_port()?; + + // 3. Spawn gst-launch → raw NV12 on stdout. + let mut gst = Command::new("gst-launch-1.0") + .args([ + "-q", + "pipewiresrc", + &format!("fd={raw_fd}"), + &format!("path={node_id}"), + "!", + "videoconvert", + "!", + "video/x-raw,format=NV12", + "!", + "fdsink", + "fd=1", + ]) + .stdin(Stdio::null()) + .stdout(Stdio::piped()) + .stderr(Stdio::inherit()) + .spawn() + .context("failed to spawn gst-launch-1.0")?; + // Parent no longer needs the pipewire fd — gst inherited its own copy. + // Ignore close errors; the worst case is a leaked fd until our exit. + let _ = close(raw_fd); + + let gst_stdout = gst + .stdout + .take() + .context("gst-launch-1.0 stdout pipe unavailable")?; + let ffmpeg_stdin: Stdio = gst_stdout + .try_into() + .context("could not convert gst stdout into ffmpeg stdin")?; + + // 4. Spawn ffmpeg consuming gst stdout, producing MPEG-TS HTTP. + let url = format!("http://127.0.0.1:{port}"); + let bitrate_arg = format!("{}k", opts.bitrate); + let video_size = format!("{w}x{h}"); + let framerate = opts.framerate.to_string(); + let gop = opts.framerate.to_string(); + + let ffmpeg = Command::new("ffmpeg") + .stdin(ffmpeg_stdin) + .stdout(Stdio::null()) + .stderr(Stdio::inherit()) + .args([ + "-loglevel", + "warning", + "-f", + "rawvideo", + "-pix_fmt", + "nv12", + "-video_size", + &video_size, + "-framerate", + &framerate, + "-i", + "pipe:0", + "-f", + "pulse", + "-i", + "default", + "-c:v", + "libx264", + "-preset", + "ultrafast", + "-tune", + "zerolatency", + "-bf", + "0", + "-g", + &gop, + "-b:v", + &bitrate_arg, + "-maxrate", + &bitrate_arg, + "-bufsize", + &bitrate_arg, + "-c:a", + "aac", + "-b:a", + "128k", + "-fflags", + "nobuffer", + "-flags", + "low_delay", + "-f", + "mpegts", + "-listen", + "1", + &url, + ]) + .spawn() + .context("failed to spawn ffmpeg")?; + + Ok(CaptureHandle { + port, + gst: Some(gst), + ffmpeg: Some(ffmpeg), + }) +} + +fn pick_random_port() -> Result { + let listener = StdTcpListener::bind("127.0.0.1:0") + .context("could not pick a local ephemeral port")?; + let port = listener.local_addr()?.port(); + drop(listener); + Ok(port) +} + +fn clear_cloexec(fd: &impl AsFd) -> Result<()> { + let flags_int = fcntl(fd.as_fd(), FcntlArg::F_GETFD).context("F_GETFD on pipewire fd")?; + let mut flags = FdFlag::from_bits_truncate(flags_int); + flags.remove(FdFlag::FD_CLOEXEC); + fcntl(fd.as_fd(), FcntlArg::F_SETFD(flags)).context("F_SETFD on pipewire fd")?; + Ok(()) +} + +/// Wait until ffmpeg's `-listen 1` server actually accepts a TCP connection, +/// or time out. Returns Ok(()) on success. +pub async fn wait_for_listener(port: u16, max_wait: Duration) -> Result<()> { + let deadline = Instant::now() + max_wait; + loop { + match tokio::net::TcpStream::connect(("127.0.0.1", port)).await { + Ok(stream) => { + drop(stream); + return Ok(()); + } + Err(_) if Instant::now() < deadline => { + sleep(Duration::from_millis(50)).await; + } + Err(e) => bail!("ffmpeg HTTP listener never came up on 127.0.0.1:{port}: {e}"), + } + } +} diff --git a/src/viewer/mod.rs b/src/viewer/mod.rs index 036534d..bfb931f 100644 --- a/src/viewer/mod.rs +++ b/src/viewer/mod.rs @@ -1,5 +1,3 @@ -mod tunnel; - use anyhow::Result; use iroh::Endpoint; use iroh::endpoint::presets; diff --git a/src/viewer/tunnel.rs b/src/viewer/tunnel.rs deleted file mode 100644 index d9f3e73..0000000 --- a/src/viewer/tunnel.rs +++ /dev/null @@ -1,2 +0,0 @@ -//! Viewer-side tunnel glue lives here if/when it grows beyond the generic -//! bridge in `common::tunnel`. For Phase 1 the generic bridge is sufficient.