Files
pixelpass/src/host/wayland.rs
T
mollusk 3a551c2287 Wayland capture: ashpd portal → gst-launch → ffmpeg → MPEG-TS
Implements the Wayland host pipeline from plan §4.5:

  ashpd ScreenCast portal
    -> CreateSession + SelectSources + Start + OpenPipeWireRemote
    -> (pipewire fd, node_id, width, height)
  gst-launch-1.0 pipewiresrc fd=N path=NODE_ID ! videoconvert
    ! video/x-raw,format=NV12 ! fdsink fd=1
  ffmpeg
    -f rawvideo -pix_fmt nv12 -video_size WxH -i pipe:0
    -f pulse -i default
    -c:v libx264 -preset ultrafast -tune zerolatency
    -c:a aac -f mpegts -listen 1 http://127.0.0.1:<rand>

Phase 1 ships software x264 per plan §7; VAAPI is Phase 2.

src/host/wayland.rs is the new module. capture.rs becomes a thin
dispatcher with a CaptureHandle enum (Wayland today, X11 next).
host/mod.rs swaps the 150ms sleep for a poll-until-listener-ready
helper, and calls handle.shutdown().await for an orderly SIGTERM /
1s grace / SIGKILL teardown. The Drop impl is the panic backstop.

The pipewire fd handoff clears CLOEXEC before gst-launch spawn and
closes the parent's copy of the raw fd after the child has it.

Also deletes the empty src/host/tunnel.rs and src/viewer/tunnel.rs
placeholder files — the generic bridge in common/tunnel.rs is doing
the work, and there's no host- or viewer-specific tunnel concern
worth a module yet.
2026-05-15 15:22:35 -04:00

243 lines
7.1 KiB
Rust

//! Wayland capture: ashpd ScreenCast portal → PipeWire fd → gst-launch
//! pipewiresrc → fdsink stdout → ffmpeg stdin → MPEG-TS over `-listen 1` HTTP.
use anyhow::{Context, Result, bail};
use ashpd::{
WindowIdentifier,
desktop::{
PersistMode,
screencast::{CursorMode, Screencast, SourceType},
},
};
use nix::fcntl::{FcntlArg, FdFlag, fcntl};
use nix::sys::signal::{Signal, kill};
use nix::unistd::{Pid, close};
use std::net::TcpListener as StdTcpListener;
use std::os::fd::{AsFd, IntoRawFd, OwnedFd, RawFd};
use std::process::Stdio;
use std::time::Duration;
use tokio::process::{Child, Command};
use tokio::time::{Instant, sleep, timeout};
use crate::cli::HostOpts;
pub struct CaptureHandle {
port: u16,
gst: Option<Child>,
ffmpeg: Option<Child>,
}
impl CaptureHandle {
pub fn local_port(&self) -> u16 {
self.port
}
/// Graceful teardown: SIGTERM both children, give them ~1s to exit, then
/// SIGKILL. Call this before dropping; Drop only fires the kill backstop.
pub async fn shutdown(mut self) {
for opt in [&mut self.ffmpeg, &mut self.gst] {
if let Some(child) = opt.as_mut()
&& let Some(pid) = child.id()
{
let _ = kill(Pid::from_raw(pid as i32), Signal::SIGTERM);
}
}
for opt in [&mut self.ffmpeg, &mut self.gst] {
if let Some(child) = opt.as_mut() {
let _ = timeout(Duration::from_millis(1000), child.wait()).await;
let _ = child.start_kill();
}
}
}
}
impl Drop for CaptureHandle {
fn drop(&mut self) {
for opt in [&mut self.ffmpeg, &mut self.gst] {
if let Some(child) = opt.as_mut() {
let _ = child.start_kill();
}
}
}
}
pub async fn start(opts: &HostOpts) -> Result<CaptureHandle> {
// 1. Negotiate the screencast session with the portal.
let proxy = Screencast::new()
.await
.context("could not reach the xdg-desktop-portal ScreenCast interface")?;
let session = proxy.create_session().await?;
let source = if opts.window { SourceType::Window } else { SourceType::Monitor };
proxy
.select_sources(
&session,
CursorMode::Embedded,
source.into(),
false,
None,
PersistMode::DoNot,
)
.await
.context("select_sources failed")?;
let response = proxy
.start(&session, &WindowIdentifier::default())
.await
.context("portal Start failed (did the user cancel the picker?)")?
.response()?;
let stream = response
.streams()
.first()
.context("portal returned no screencast streams")?;
let node_id = stream.pipe_wire_node_id();
let (w, h) = stream
.size()
.context("portal returned a stream with no size — pipewiresrc can't infer dimensions")?;
let pw_fd: OwnedFd = proxy.open_pipe_wire_remote(&session).await?;
// The fd is CLOEXEC by default; the gst child needs to inherit it across
// exec. We then leak it via into_raw_fd so its lifetime spans the spawn,
// and close the parent's copy once gst is running.
clear_cloexec(&pw_fd)?;
let raw_fd: RawFd = pw_fd.into_raw_fd();
// 2. Reserve a localhost port for ffmpeg's HTTP listener.
let port = pick_random_port()?;
// 3. Spawn gst-launch → raw NV12 on stdout.
let mut gst = Command::new("gst-launch-1.0")
.args([
"-q",
"pipewiresrc",
&format!("fd={raw_fd}"),
&format!("path={node_id}"),
"!",
"videoconvert",
"!",
"video/x-raw,format=NV12",
"!",
"fdsink",
"fd=1",
])
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::inherit())
.spawn()
.context("failed to spawn gst-launch-1.0")?;
// Parent no longer needs the pipewire fd — gst inherited its own copy.
// Ignore close errors; the worst case is a leaked fd until our exit.
let _ = close(raw_fd);
let gst_stdout = gst
.stdout
.take()
.context("gst-launch-1.0 stdout pipe unavailable")?;
let ffmpeg_stdin: Stdio = gst_stdout
.try_into()
.context("could not convert gst stdout into ffmpeg stdin")?;
// 4. Spawn ffmpeg consuming gst stdout, producing MPEG-TS HTTP.
let url = format!("http://127.0.0.1:{port}");
let bitrate_arg = format!("{}k", opts.bitrate);
let video_size = format!("{w}x{h}");
let framerate = opts.framerate.to_string();
let gop = opts.framerate.to_string();
let ffmpeg = Command::new("ffmpeg")
.stdin(ffmpeg_stdin)
.stdout(Stdio::null())
.stderr(Stdio::inherit())
.args([
"-loglevel",
"warning",
"-f",
"rawvideo",
"-pix_fmt",
"nv12",
"-video_size",
&video_size,
"-framerate",
&framerate,
"-i",
"pipe:0",
"-f",
"pulse",
"-i",
"default",
"-c:v",
"libx264",
"-preset",
"ultrafast",
"-tune",
"zerolatency",
"-bf",
"0",
"-g",
&gop,
"-b:v",
&bitrate_arg,
"-maxrate",
&bitrate_arg,
"-bufsize",
&bitrate_arg,
"-c:a",
"aac",
"-b:a",
"128k",
"-fflags",
"nobuffer",
"-flags",
"low_delay",
"-f",
"mpegts",
"-listen",
"1",
&url,
])
.spawn()
.context("failed to spawn ffmpeg")?;
Ok(CaptureHandle {
port,
gst: Some(gst),
ffmpeg: Some(ffmpeg),
})
}
fn pick_random_port() -> Result<u16> {
let listener = StdTcpListener::bind("127.0.0.1:0")
.context("could not pick a local ephemeral port")?;
let port = listener.local_addr()?.port();
drop(listener);
Ok(port)
}
fn clear_cloexec(fd: &impl AsFd) -> Result<()> {
let flags_int = fcntl(fd.as_fd(), FcntlArg::F_GETFD).context("F_GETFD on pipewire fd")?;
let mut flags = FdFlag::from_bits_truncate(flags_int);
flags.remove(FdFlag::FD_CLOEXEC);
fcntl(fd.as_fd(), FcntlArg::F_SETFD(flags)).context("F_SETFD on pipewire fd")?;
Ok(())
}
/// Wait until ffmpeg's `-listen 1` server actually accepts a TCP connection,
/// or time out. Returns Ok(()) on success.
pub async fn wait_for_listener(port: u16, max_wait: Duration) -> Result<()> {
let deadline = Instant::now() + max_wait;
loop {
match tokio::net::TcpStream::connect(("127.0.0.1", port)).await {
Ok(stream) => {
drop(stream);
return Ok(());
}
Err(_) if Instant::now() < deadline => {
sleep(Duration::from_millis(50)).await;
}
Err(e) => bail!("ffmpeg HTTP listener never came up on 127.0.0.1:{port}: {e}"),
}
}
}