From 8d32ded412ad2cc46071b06a1035edb3e2a77299 Mon Sep 17 00:00:00 2001 From: Mollusk Date: Fri, 22 May 2026 05:11:48 -0400 Subject: [PATCH] host/audio: per-PID PipeWire null-sink + loopback scaffolding MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Session 1 of the per-app audio routing feature. Adds host/audio.rs with a Routing struct that owns the lifecycle of two pactl-loaded modules: a per-PID null-sink (pixelpass_capture_) and a loopback mirroring @DEFAULT_SINK@.monitor into it at 20ms latency. Activated by PIXELPASS_AUDIO_VIA_NULL_SINK=1 — kept hidden behind an env var because without per-stream filtering (session 3) the user-facing behavior of --app foo would be identical to no flag, which would mislead users about what the flag does. When the env var is set, wayland::start substitutes the gst pulsesrc device from {DEFAULT_SINK}.monitor to pixelpass_capture_.monitor; audio still works end-to-end via the loopback. CaptureHandle owns the Routing alongside gst and serve; teardown order is gst → audio → serve so streams unlink from the null-sink before the sink is destroyed. Lifecycle is via pactl shell-outs rather than pipewire-rs. Null-sink + loopback are one-shot graph mutations with no event subscription; the libpipewire route would mean dragging a MainLoop thread in for no benefit until session 3 needs stream events. Known cosmetic: the null-sink appears in Plasma's audio mixer as a user-facing volume slider. Pactl's sink_properties= quoting is fiddly enough that the device.hidden=true fix is parked for a follow-up. Co-Authored-By: Claude Opus 4.7 --- src/host/audio.rs | 136 ++++++++++++++++++++++++++++++++++++++++++++ src/host/mod.rs | 1 + src/host/wayland.rs | 32 +++++++++-- 3 files changed, 164 insertions(+), 5 deletions(-) create mode 100644 src/host/audio.rs diff --git a/src/host/audio.rs b/src/host/audio.rs new file mode 100644 index 0000000..2e5b35a --- /dev/null +++ b/src/host/audio.rs @@ -0,0 +1,136 @@ +//! Per-app audio routing scaffolding. +//! +//! Session 1: owns a per-PID PipeWire null-sink and a loopback that +//! mirrors the default sink's monitor into it, so the gst pipeline can +//! `pulsesrc` from `pixelpass_capture_.monitor` and still hear all +//! system audio. No per-stream filtering yet — that lands in session 3, +//! at which point the loopback gets dropped once any stream is being +//! routed exclusively. +//! +//! Module lifecycle is via `pactl load-module` / `pactl unload-module` +//! shell-outs. PipeWire offers a Rust binding (already in deps) but +//! null-sink + loopback are one-shot graph mutations with no event +//! subscription; the shell-out is simpler and avoids dragging a +//! pipewire MainLoop thread into the picture until session 3 actually +//! needs it for stream events. + +use anyhow::{Context, Result, bail}; +use std::process::Command; + +/// Owns the pactl module IDs for the null-sink and its loopback. Drop +/// unloads both as a backstop; prefer calling [`Routing::shutdown`] +/// explicitly so failures get logged. +pub struct Routing { + sink_module: Option, + loopback_module: Option, + sink_name: String, +} + +impl Routing { + /// Create a per-PID null-sink and mirror the default sink's monitor + /// into it. Returns once both modules are loaded. + pub fn start() -> Result { + let pid = std::process::id(); + let sink_name = format!("pixelpass_capture_{pid}"); + + let sink_module = load_module(&[ + "module-null-sink", + &format!("sink_name={sink_name}"), + ]) + .context("failed to load module-null-sink")?; + + let mut routing = Self { + sink_module: Some(sink_module), + loopback_module: None, + sink_name: sink_name.clone(), + }; + + // 20ms loopback latency keeps the mirrored audio tight; pactl's + // default of 200ms is enough to be perceptible. + let loopback_module = load_module(&[ + "module-loopback", + "source=@DEFAULT_SINK@.monitor", + &format!("sink={sink_name}"), + "latency_msec=20", + ]) + .context("failed to load module-loopback (null-sink will be cleaned up on Drop)")?; + routing.loopback_module = Some(loopback_module); + + tracing::info!( + sink_module, + loopback_module, + %sink_name, + "audio routing: null-sink + loopback ready" + ); + Ok(routing) + } + + pub fn sink_name(&self) -> &str { + &self.sink_name + } + + /// Unload loopback first, then the null-sink. Order matters: PipeWire + /// can leave zombie links if you destroy a sink with active inputs. + pub fn shutdown(mut self) { + if let Some(id) = self.loopback_module.take() { + unload_module(id); + } + if let Some(id) = self.sink_module.take() { + unload_module(id); + } + } +} + +impl Drop for Routing { + fn drop(&mut self) { + if let Some(id) = self.loopback_module.take() { + unload_module(id); + } + if let Some(id) = self.sink_module.take() { + unload_module(id); + } + } +} + +fn load_module(args: &[&str]) -> Result { + let output = Command::new("pactl") + .arg("load-module") + .args(args) + .output() + .context("failed to run pactl load-module")?; + if !output.status.success() { + bail!( + "pactl load-module failed: {}", + String::from_utf8_lossy(&output.stderr).trim() + ); + } + let id_str = String::from_utf8(output.stdout) + .context("pactl returned non-UTF-8")? + .trim() + .to_string(); + id_str + .parse::() + .with_context(|| format!("pactl returned unexpected module ID: {id_str:?}")) +} + +fn unload_module(id: u32) { + let result = Command::new("pactl") + .arg("unload-module") + .arg(id.to_string()) + .output(); + match result { + Ok(output) if output.status.success() => { + tracing::info!(module = id, "audio routing: unloaded pactl module"); + } + Ok(output) => { + tracing::warn!( + module = id, + stderr = %String::from_utf8_lossy(&output.stderr).trim(), + "audio routing: pactl unload-module exited non-zero" + ); + } + Err(e) => { + tracing::warn!(module = id, "audio routing: failed to run pactl unload-module: {e}"); + } + } +} diff --git a/src/host/mod.rs b/src/host/mod.rs index 6bfbac7..0a02665 100644 --- a/src/host/mod.rs +++ b/src/host/mod.rs @@ -1,3 +1,4 @@ +mod audio; mod capture; mod serve; mod wayland; diff --git a/src/host/wayland.rs b/src/host/wayland.rs index 0589873..aebdbc0 100644 --- a/src/host/wayland.rs +++ b/src/host/wayland.rs @@ -19,11 +19,13 @@ use std::time::Duration; use tokio::process::{Child, Command}; use tokio::time::timeout; +use super::audio::Routing; use super::serve::Serve; use crate::cli::HostOpts; pub struct CaptureHandle { gst: Option, + audio: Option, serve: Option, } @@ -36,8 +38,9 @@ impl CaptureHandle { } /// Graceful teardown: SIGTERM gst, give it ~1s to exit, then SIGKILL, - /// then tear down the serve layer. The serve reader will see EOF on - /// gst stdout and exit on its own; serve.shutdown() is the backstop. + /// unload audio routing (if any), then tear down the serve layer. + /// The serve reader will see EOF on gst stdout and exit on its own; + /// serve.shutdown() is the backstop. pub async fn shutdown(mut self) { if let Some(child) = self.gst.as_mut() && let Some(pid) = child.id() @@ -48,6 +51,9 @@ impl CaptureHandle { let _ = timeout(Duration::from_millis(1000), child.wait()).await; let _ = child.start_kill(); } + if let Some(audio) = self.audio.take() { + audio.shutdown(); + } if let Some(serve) = self.serve.take() { serve.shutdown().await; } @@ -59,7 +65,7 @@ impl Drop for CaptureHandle { if let Some(child) = self.gst.as_mut() { let _ = child.start_kill(); } - // Serve's own Drop aborts its tasks. + // Routing's and Serve's own Drop impls handle the rest. } } @@ -112,8 +118,23 @@ pub async fn start(opts: &HostOpts) -> Result { // no codec assumptions. let key_interval = (opts.framerate * 2).to_string(); let bitrate = opts.bitrate.to_string(); - let audio_monitor = default_audio_monitor().await?; - let audio_device = format!("device={audio_monitor}"); + + // PIXELPASS_AUDIO_VIA_NULL_SINK=1 routes audio through a per-PID + // PipeWire null-sink + default-sink loopback instead of tapping the + // default sink's monitor directly. Same audio captured either way at + // this stage; the null-sink path is groundwork for per-app filtering + // in a follow-up. + let audio_routing = if std::env::var_os("PIXELPASS_AUDIO_VIA_NULL_SINK").is_some() { + Some(Routing::start().context("audio routing setup failed")?) + } else { + None + }; + let audio_device = if let Some(r) = &audio_routing { + format!("device={}.monitor", r.sink_name()) + } else { + let default = default_audio_monitor().await?; + format!("device={default}") + }; let mut gst_cmd = Command::new("gst-launch-1.0"); gst_cmd .args([ @@ -196,6 +217,7 @@ pub async fn start(opts: &HostOpts) -> Result { Ok(CaptureHandle { gst: Some(gst), + audio: audio_routing, serve: Some(serve), }) }