feat(host): X11 capture backend + shared pipeline extraction
Extract the display-agnostic encode/mux tail out of wayland.rs into a new host/pipeline.rs: CaptureHandle + lifecycle, audio routing setup, the gst arg builder, the spawn, and Serve::bind now live there. Backends supply only their video-source element args plus a post-spawn hook (Wayland uses it to close its leaked pipewire fd; X11 passes a no-op). capture.rs collapses to a thin dispatcher; its CaptureHandle enum is gone. Add host/x11.rs: ximagesrc (use-damage=false show-pointer=true), whole root window by default or a single window via --window (xwininfo click-picker → xid). x11rb reads geometry for an info log, justifying the previously-vestigial dep. No portal, no fd dance — capture starts silently when the first viewer connects (the ticket is the access control). Viewer is display-agnostic and unchanged. Wire --no-hwencode for real (was a no-op): the shared tail now selects x264enc(tune=zerolatency,ultrafast)/I420 vs vah264enc/NV12 and switches the videoconvert target format to match. Applies to both backends. deps.rs: check_host_binaries now takes &HostOpts and checks shared elements for both backends, encoder by --no-hwencode, source per backend (pipewiresrc/ximagesrc), and xwininfo only when X11 + --window. Install hints added for x264enc, ximagesrc, xwininfo. Verified: warning-free build; smoke test still passes (tail unchanged); ximagesrc + both encoder tails produce mpv-decodable H.264 against an Xwayland root. Interactive cross-machine end-to-end pending. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
+19
-193
@@ -1,8 +1,9 @@
|
||||
//! Wayland capture: ashpd ScreenCast portal → PipeWire fd → gst-launch.
|
||||
//! Builds the gst pipeline that produces MPEG-TS on stdout, then hands
|
||||
//! that stdout to [`super::serve::Serve`] which handles the HTTP fanout.
|
||||
//! Wayland capture: ashpd ScreenCast portal → PipeWire fd → `pipewiresrc`.
|
||||
//! This module owns only the portal handshake and the source-element args;
|
||||
//! the shared encode/mux tail, gst spawn, and serving live in
|
||||
//! [`super::pipeline`].
|
||||
|
||||
use anyhow::{Context, Result, bail};
|
||||
use anyhow::{Context, Result};
|
||||
use ashpd::{
|
||||
WindowIdentifier,
|
||||
desktop::{
|
||||
@@ -11,64 +12,12 @@ use ashpd::{
|
||||
},
|
||||
};
|
||||
use nix::fcntl::{FcntlArg, FdFlag, fcntl};
|
||||
use nix::sys::signal::{Signal, kill};
|
||||
use nix::unistd::{Pid, close};
|
||||
use nix::unistd::close;
|
||||
use std::os::fd::{AsFd, IntoRawFd, OwnedFd, RawFd};
|
||||
use std::process::Stdio;
|
||||
use std::time::Duration;
|
||||
use tokio::process::{Child, Command};
|
||||
use tokio::time::timeout;
|
||||
|
||||
use super::audio::Routing;
|
||||
use super::serve::Serve;
|
||||
use super::pipeline::{self, CaptureHandle};
|
||||
use crate::cli::HostOpts;
|
||||
|
||||
pub struct CaptureHandle {
|
||||
gst: Option<Child>,
|
||||
audio: Option<Routing>,
|
||||
serve: Option<Serve>,
|
||||
}
|
||||
|
||||
impl CaptureHandle {
|
||||
pub fn local_port(&self) -> u16 {
|
||||
self.serve
|
||||
.as_ref()
|
||||
.expect("serve is always Some until shutdown")
|
||||
.local_port()
|
||||
}
|
||||
|
||||
/// Graceful teardown: SIGTERM gst, give it ~1s to exit, then SIGKILL,
|
||||
/// unload audio routing (if any), then tear down the serve layer.
|
||||
/// The serve reader will see EOF on gst stdout and exit on its own;
|
||||
/// serve.shutdown() is the backstop.
|
||||
pub async fn shutdown(mut self) {
|
||||
if let Some(child) = self.gst.as_mut()
|
||||
&& let Some(pid) = child.id()
|
||||
{
|
||||
let _ = kill(Pid::from_raw(pid as i32), Signal::SIGTERM);
|
||||
}
|
||||
if let Some(child) = self.gst.as_mut() {
|
||||
let _ = timeout(Duration::from_millis(1000), child.wait()).await;
|
||||
let _ = child.start_kill();
|
||||
}
|
||||
if let Some(audio) = self.audio.take() {
|
||||
audio.shutdown();
|
||||
}
|
||||
if let Some(serve) = self.serve.take() {
|
||||
serve.shutdown().await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for CaptureHandle {
|
||||
fn drop(&mut self) {
|
||||
if let Some(child) = self.gst.as_mut() {
|
||||
let _ = child.start_kill();
|
||||
}
|
||||
// Routing's and Serve's own Drop impls handle the rest.
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn start(opts: &HostOpts) -> Result<CaptureHandle> {
|
||||
// 1. Negotiate the screencast session with the portal.
|
||||
let proxy = Screencast::new()
|
||||
@@ -108,124 +57,23 @@ pub async fn start(opts: &HostOpts) -> Result<CaptureHandle> {
|
||||
tracing::info!(node_id, width = w, height = h, "portal handshake complete");
|
||||
// The fd is CLOEXEC by default; the gst child needs to inherit it across
|
||||
// exec. We then leak it via into_raw_fd so its lifetime spans the spawn,
|
||||
// and close the parent's copy once gst is running.
|
||||
// and close the parent's copy once gst is running (the pipeline's
|
||||
// after_spawn hook below).
|
||||
clear_cloexec(&pw_fd)?;
|
||||
let raw_fd: RawFd = pw_fd.into_raw_fd();
|
||||
|
||||
// 2. Spawn gst-launch with the full pipeline: video AND audio captured,
|
||||
// encoded, and muxed into MPEG-TS inside gst. Output goes to stdout,
|
||||
// which the serve layer pipes to its HTTP fanout — no demux/remux,
|
||||
// no codec assumptions.
|
||||
let key_interval = (opts.framerate * 2).to_string();
|
||||
let bitrate = opts.bitrate.to_string();
|
||||
let source_args = vec![
|
||||
"pipewiresrc".to_string(),
|
||||
format!("fd={raw_fd}"),
|
||||
format!("path={node_id}"),
|
||||
"do-timestamp=true".to_string(),
|
||||
];
|
||||
|
||||
// Audio routing activates when either:
|
||||
// - `opts.app` is set (per-stream rerouting to a per-PID null-sink),
|
||||
// - or `PIXELPASS_AUDIO_VIA_NULL_SINK=1` is set (no app filter, just
|
||||
// captures everything via the null-sink → useful for development
|
||||
// and dogfooding the loopback path before app filtering is picked).
|
||||
let routing_requested =
|
||||
opts.app.is_some() || std::env::var_os("PIXELPASS_AUDIO_VIA_NULL_SINK").is_some();
|
||||
let audio_routing = if routing_requested {
|
||||
Some(
|
||||
Routing::start(opts)
|
||||
.await
|
||||
.context("audio routing setup failed")?,
|
||||
)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let audio_device = if let Some(r) = &audio_routing {
|
||||
format!("device={}.monitor", r.sink_name())
|
||||
} else {
|
||||
let default = default_audio_monitor().await?;
|
||||
format!("device={default}")
|
||||
};
|
||||
let mut gst_cmd = Command::new("gst-launch-1.0");
|
||||
gst_cmd
|
||||
.args([
|
||||
// muxer + sink
|
||||
"mpegtsmux",
|
||||
"name=mux",
|
||||
"!",
|
||||
"queue",
|
||||
"!",
|
||||
"fdsink",
|
||||
"fd=1",
|
||||
// video branch — videorate caps to 30fps so we don't ship at the
|
||||
// monitor's refresh rate (e.g. 180Hz) and pile up frames in mpv's
|
||||
// demuxer queue faster than realtime.
|
||||
"pipewiresrc",
|
||||
&format!("fd={raw_fd}"),
|
||||
&format!("path={node_id}"),
|
||||
"do-timestamp=true",
|
||||
"!",
|
||||
"videorate",
|
||||
"!",
|
||||
&format!("video/x-raw,framerate={}/1", opts.framerate),
|
||||
"!",
|
||||
"queue",
|
||||
"!",
|
||||
"videoconvert",
|
||||
"!",
|
||||
"video/x-raw,format=NV12",
|
||||
"!",
|
||||
"vah264enc",
|
||||
"rate-control=cbr",
|
||||
&format!("bitrate={bitrate}"),
|
||||
&format!("key-int-max={key_interval}"),
|
||||
"!",
|
||||
"h264parse",
|
||||
"config-interval=-1",
|
||||
"!",
|
||||
"video/x-h264,stream-format=byte-stream,alignment=au",
|
||||
"!",
|
||||
"mux.",
|
||||
// audio branch — capture the default sink's MONITOR (system audio
|
||||
// out), not the default source (which is the mic).
|
||||
"pulsesrc",
|
||||
&audio_device,
|
||||
"do-timestamp=true",
|
||||
"!",
|
||||
"queue",
|
||||
"!",
|
||||
"audioconvert",
|
||||
"!",
|
||||
"audioresample",
|
||||
"!",
|
||||
"audio/x-raw,rate=48000,channels=2",
|
||||
"!",
|
||||
"avenc_aac",
|
||||
"bitrate=128000",
|
||||
"!",
|
||||
"aacparse",
|
||||
"!",
|
||||
"mux.",
|
||||
])
|
||||
.stdin(Stdio::null())
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::inherit());
|
||||
if std::env::var_os("PIXELPASS_GST_DEBUG").is_some() {
|
||||
gst_cmd.env("GST_DEBUG", "3");
|
||||
}
|
||||
let mut gst = gst_cmd.spawn().context("failed to spawn gst-launch-1.0")?;
|
||||
// Parent no longer needs the pipewire fd — gst inherited its own copy.
|
||||
let _ = close(raw_fd);
|
||||
|
||||
let gst_stdout = gst
|
||||
.stdout
|
||||
.take()
|
||||
.context("gst-launch-1.0 stdout pipe unavailable")?;
|
||||
|
||||
// 3. Hand stdout to the serve layer, which binds the localhost HTTP
|
||||
// listener and runs the broadcast fanout.
|
||||
let serve = Serve::bind(gst_stdout).await?;
|
||||
|
||||
Ok(CaptureHandle {
|
||||
gst: Some(gst),
|
||||
audio: audio_routing,
|
||||
serve: Some(serve),
|
||||
pipeline::spawn(opts, source_args, move || {
|
||||
// Parent no longer needs the pipewire fd — gst inherited its own copy.
|
||||
let _ = close(raw_fd);
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
fn clear_cloexec(fd: &impl AsFd) -> Result<()> {
|
||||
@@ -235,25 +83,3 @@ fn clear_cloexec(fd: &impl AsFd) -> Result<()> {
|
||||
fcntl(fd.as_fd(), FcntlArg::F_SETFD(flags)).context("F_SETFD on pipewire fd")?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn default_audio_monitor() -> Result<String> {
|
||||
let output = Command::new("pactl")
|
||||
.arg("get-default-sink")
|
||||
.output()
|
||||
.await
|
||||
.context("failed to run `pactl get-default-sink` (install pulseaudio-utils or pipewire-pulse)")?;
|
||||
if !output.status.success() {
|
||||
bail!(
|
||||
"pactl get-default-sink failed: {}",
|
||||
String::from_utf8_lossy(&output.stderr).trim()
|
||||
);
|
||||
}
|
||||
let sink = String::from_utf8(output.stdout)
|
||||
.context("default sink name was not UTF-8")?
|
||||
.trim()
|
||||
.to_string();
|
||||
if sink.is_empty() {
|
||||
bail!("pactl get-default-sink returned no name (is a sound server running?)");
|
||||
}
|
||||
Ok(format!("{sink}.monitor"))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user