//! Wayland capture: ashpd ScreenCast portal → PipeWire fd → gst-launch //! pipewiresrc → MPEG-TS on gst stdout → in-process HTTP server bound on a //! random localhost port. The host bridge TCP-connects to that server and //! pumps bytes to QUIC. use anyhow::{Context, Result, bail}; use ashpd::{ WindowIdentifier, desktop::{ PersistMode, screencast::{CursorMode, Screencast, SourceType}, }, }; use nix::fcntl::{FcntlArg, FdFlag, fcntl}; use nix::sys::signal::{Signal, kill}; use nix::unistd::{Pid, close}; use std::os::fd::{AsFd, IntoRawFd, OwnedFd, RawFd}; use std::process::Stdio; use std::time::Duration; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::{TcpListener, TcpStream}; use tokio::process::{Child, ChildStdout, Command}; use tokio::task::JoinHandle; use tokio::time::{Instant, sleep, timeout}; use crate::cli::HostOpts; pub struct CaptureHandle { port: u16, gst: Option, server: Option>, } impl CaptureHandle { pub fn local_port(&self) -> u16 { self.port } /// Graceful teardown: SIGTERM gst, give it ~1s to exit, then SIGKILL, then /// abort the HTTP server task. Call this before dropping; Drop only fires /// the kill backstop. pub async fn shutdown(mut self) { if let Some(child) = self.gst.as_mut() && let Some(pid) = child.id() { let _ = kill(Pid::from_raw(pid as i32), Signal::SIGTERM); } if let Some(child) = self.gst.as_mut() { let _ = timeout(Duration::from_millis(1000), child.wait()).await; let _ = child.start_kill(); } if let Some(task) = self.server.take() { task.abort(); } } } impl Drop for CaptureHandle { fn drop(&mut self) { if let Some(child) = self.gst.as_mut() { let _ = child.start_kill(); } if let Some(task) = self.server.as_ref() { task.abort(); } } } pub async fn start(opts: &HostOpts) -> Result { // 1. Negotiate the screencast session with the portal. let proxy = Screencast::new() .await .context("could not reach the xdg-desktop-portal ScreenCast interface")?; let session = proxy.create_session().await?; let source = if opts.window { SourceType::Window } else { SourceType::Monitor }; proxy .select_sources( &session, CursorMode::Embedded, source.into(), false, None, PersistMode::DoNot, ) .await .context("select_sources failed")?; let response = proxy .start(&session, &WindowIdentifier::default()) .await .context("portal Start failed (did the user cancel the picker?)")? .response()?; let stream = response .streams() .first() .context("portal returned no screencast streams")?; let node_id = stream.pipe_wire_node_id(); let (w, h) = stream .size() .context("portal returned a stream with no size — pipewiresrc can't infer dimensions")?; let pw_fd: OwnedFd = proxy.open_pipe_wire_remote(&session).await?; tracing::info!(node_id, width = w, height = h, "portal handshake complete"); // The fd is CLOEXEC by default; the gst child needs to inherit it across // exec. We then leak it via into_raw_fd so its lifetime spans the spawn, // and close the parent's copy once gst is running. clear_cloexec(&pw_fd)?; let raw_fd: RawFd = pw_fd.into_raw_fd(); // 2. Bind the in-process HTTP listener on a random localhost port. let listener = TcpListener::bind("127.0.0.1:0") .await .context("could not bind local capture HTTP listener")?; let port = listener.local_addr()?.port(); // 3. Spawn gst-launch with the full pipeline: video AND audio captured, // encoded, and muxed into MPEG-TS inside gst. Output goes to stdout, // which we pipe straight to our HTTP server task — no demux/remux, // no codec assumptions. let key_interval = (opts.framerate * 2).to_string(); let bitrate = opts.bitrate.to_string(); let audio_monitor = default_audio_monitor().await?; let audio_device = format!("device={audio_monitor}"); let mut gst_cmd = Command::new("gst-launch-1.0"); gst_cmd .args([ // muxer + sink "mpegtsmux", "name=mux", "!", "queue", "!", "fdsink", "fd=1", // video branch — videorate caps to 30fps so we don't ship at the // monitor's refresh rate (e.g. 180Hz) and pile up frames in mpv's // demuxer queue faster than realtime. "pipewiresrc", &format!("fd={raw_fd}"), &format!("path={node_id}"), "do-timestamp=true", "!", "videorate", "!", &format!("video/x-raw,framerate={}/1", opts.framerate), "!", "queue", "!", "videoconvert", "!", "video/x-raw,format=NV12", "!", "vah264enc", "rate-control=cbr", &format!("bitrate={bitrate}"), &format!("key-int-max={key_interval}"), "!", "h264parse", "config-interval=-1", "!", "video/x-h264,stream-format=byte-stream,alignment=au", "!", "mux.", // audio branch — capture the default sink's MONITOR (system audio // out), not the default source (which is the mic). "pulsesrc", &audio_device, "do-timestamp=true", "!", "queue", "!", "audioconvert", "!", "audioresample", "!", "audio/x-raw,rate=48000,channels=2", "!", "avenc_aac", "bitrate=128000", "!", "aacparse", "!", "mux.", ]) .stdin(Stdio::null()) .stdout(Stdio::piped()) .stderr(Stdio::inherit()); if std::env::var_os("PIXELPASS_GST_DEBUG").is_some() { gst_cmd.env("GST_DEBUG", "3"); } let mut gst = gst_cmd.spawn().context("failed to spawn gst-launch-1.0")?; // Parent no longer needs the pipewire fd — gst inherited its own copy. let _ = close(raw_fd); let gst_stdout = gst .stdout .take() .context("gst-launch-1.0 stdout pipe unavailable")?; // 4. Spawn the HTTP server task. It owns the listener + gst stdout: it // accepts one client (the host's bridge socket via connect_to_capture), // drains the HTTP request, writes a fixed MPEG-TS response, then // copies gst stdout to the socket forever. let server = tokio::spawn(serve_capture(listener, gst_stdout)); Ok(CaptureHandle { port, gst: Some(gst), server: Some(server), }) } async fn serve_capture(listener: TcpListener, mut gst_stdout: ChildStdout) { let mut sock = match listener.accept().await { Ok((s, _)) => s, Err(e) => { tracing::warn!("capture HTTP accept failed: {e}"); return; } }; if !drain_http_request(&mut sock).await { return; } const RESPONSE: &[u8] = b"HTTP/1.1 200 OK\r\n\ Content-Type: video/mp2t\r\n\ Cache-Control: no-cache, no-store\r\n\ Connection: close\r\n\ \r\n"; if sock.write_all(RESPONSE).await.is_err() { return; } let _ = tokio::io::copy(&mut gst_stdout, &mut sock).await; } async fn drain_http_request(sock: &mut TcpStream) -> bool { let mut buf = [0u8; 1024]; let mut total = Vec::with_capacity(512); loop { match sock.read(&mut buf).await { Ok(0) => return false, Ok(n) => total.extend_from_slice(&buf[..n]), Err(_) => return false, } if total.windows(4).any(|w| w == b"\r\n\r\n") { return true; } if total.len() > 16 * 1024 { return false; } } } fn clear_cloexec(fd: &impl AsFd) -> Result<()> { let flags_int = fcntl(fd.as_fd(), FcntlArg::F_GETFD).context("F_GETFD on pipewire fd")?; let mut flags = FdFlag::from_bits_truncate(flags_int); flags.remove(FdFlag::FD_CLOEXEC); fcntl(fd.as_fd(), FcntlArg::F_SETFD(flags)).context("F_SETFD on pipewire fd")?; Ok(()) } /// Connect to the in-process capture HTTP listener, retrying until it's up or /// we time out. Returns the connected socket — the listener accepts exactly /// one connection (the bridge socket), so this stream IS the bridge socket. pub async fn connect_to_capture(port: u16, max_wait: Duration) -> Result { let deadline = Instant::now() + max_wait; loop { match TcpStream::connect(("127.0.0.1", port)).await { Ok(stream) => return Ok(stream), Err(_) if Instant::now() < deadline => { sleep(Duration::from_millis(50)).await; } Err(e) => bail!("capture HTTP listener never came up on 127.0.0.1:{port}: {e}"), } } } async fn default_audio_monitor() -> Result { let output = Command::new("pactl") .arg("get-default-sink") .output() .await .context("failed to run `pactl get-default-sink` (install pulseaudio-utils or pipewire-pulse)")?; if !output.status.success() { bail!( "pactl get-default-sink failed: {}", String::from_utf8_lossy(&output.stderr).trim() ); } let sink = String::from_utf8(output.stdout) .context("default sink name was not UTF-8")? .trim() .to_string(); if sink.is_empty() { bail!("pactl get-default-sink returned no name (is a sound server running?)"); } Ok(format!("{sink}.monitor")) }