From a144665f4107ff0f8d5984e7e9063fcaaf15c50b Mon Sep 17 00:00:00 2001 From: Mollusk Date: Fri, 22 May 2026 15:56:44 -0400 Subject: [PATCH] host/audio: per-stream routing via libpipewire (session 3 of 4) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When opts.app is set, a dedicated OS thread runs a libpipewire MainLoop, subscribes to the registry, and writes target.object to the "default" metadata so WirePlumber reroutes matching streams to our per-PID null-sink. Activation is now opts.app.is_some() OR the existing PIXELPASS_AUDIO_VIA_NULL_SINK env var (kept for no-filter dogfooding). Threading: tokio side spawns a std::thread; the two sides bridge via pipewire::channel for cmd→thread (Shutdown) and tokio::sync::mpsc for event→tokio (FirstRoutedStream). Cross-thread quit goes through the libpipewire channel so MainLoop is only mutated from its own thread. Shutdown clears target.object on every routed stream before quitting so WirePlumber doesn't log orphans. Routing decisions: - Filter is case-insensitive equality on application.name (predictable; no surprise matches from substring). - target.object is written as Spa:Id with the sink's object.serial. - Default-sink loopback stays loaded until the first stream is actually routed — avoids viewer silence if the user picks an app that isn't producing sound yet. On first route, the event task takes() the loopback module ID and unloads it. Session 2 picker explainer + (app pick saved: ...) banner softening both removed; banner is back to plain app-audio=NAME. Verified end-to-end cross-machine: desktop host with Strawberry selected, laptop viewer over mpv. Strawberry audible on the laptop; YouTube playback started on the desktop was NOT audible on the laptop. Routing isolates the filtered app. Session 4 still open: recreate loopback when the last filtered stream disappears (avoid silence), handle app-disappears-mid-session, multi-instance, --repair coupling for orphan sink cleanup. Co-Authored-By: Claude Opus 4.7 --- src/host/audio.rs | 364 +++++++++++++++++++++++++++++++++++++++----- src/host/mod.rs | 5 +- src/host/wayland.rs | 20 ++- src/interactive.rs | 7 - src/main.rs | 4 + 5 files changed, 343 insertions(+), 57 deletions(-) diff --git a/src/host/audio.rs b/src/host/audio.rs index 35a51d2..fea2db1 100644 --- a/src/host/audio.rs +++ b/src/host/audio.rs @@ -1,50 +1,60 @@ -//! Per-app audio routing scaffolding. +//! Per-app audio routing. //! -//! Session 1: owns a per-PID PipeWire null-sink and a loopback that -//! mirrors the default sink's monitor into it, so the gst pipeline can -//! `pulsesrc` from `pixelpass_capture_.monitor` and still hear all -//! system audio. No per-stream filtering yet — that lands in session 3, -//! at which point the loopback gets dropped once any stream is being -//! routed exclusively. +//! Two cooperating layers: //! -//! Module lifecycle is via `pactl load-module` / `pactl unload-module` -//! shell-outs. PipeWire offers a Rust binding (already in deps) but -//! null-sink + loopback are one-shot graph mutations with no event -//! subscription; the shell-out is simpler and avoids dragging a -//! pipewire MainLoop thread into the picture until session 3 actually -//! needs it for stream events. +//! - **Null-sink + loopback** (pactl shell-out): a per-PID null-sink +//! `pixelpass_capture_` plus a `module-loopback` that mirrors the +//! default sink's monitor into it. gst captures from the null-sink's +//! monitor, so the viewer hears whatever the user hears — by default. +//! +//! - **Per-stream rerouting** (libpipewire on a dedicated OS thread): +//! when [`HostOpts::app`] is set, a [`StreamRouter`] subscribes to the +//! PipeWire registry, finds `Stream/Output/Audio` nodes whose +//! `application.name` matches the filter, and writes +//! `target.object` to the "default" metadata so WirePlumber reroutes +//! them to our null-sink. Once at least one stream is actually routed, +//! the loopback is unloaded — otherwise the viewer would hear the +//! filtered audio twice (once via the routed stream, once via the +//! default-sink monitor loopback). +//! +//! pactl is the right tool for the one-shot null-sink/loopback graph +//! mutations. libpipewire is dragged in only when per-stream filtering +//! is requested, because that needs registry-event subscription. use anyhow::{Context, Result, bail}; +use std::cell::RefCell; use std::collections::BTreeMap; use std::process::Command; +use std::rc::Rc; +use std::sync::{Arc, Mutex}; +use std::thread::JoinHandle; -/// Owns the pactl module IDs for the null-sink and its loopback. Drop -/// unloads both as a backstop; prefer calling [`Routing::shutdown`] -/// explicitly so failures get logged. +use crate::cli::HostOpts; + +/// Owns the pactl-loaded modules plus, when filtering is active, the +/// libpipewire stream-router thread. Drop unloads modules as a backstop; +/// prefer [`Routing::shutdown`] explicitly so failures get logged. pub struct Routing { sink_module: Option, - loopback_module: Option, + /// Shared with the event task so it can `take()` and unload on the + /// first successful route. `Routing::shutdown` unloads whatever + /// remains. + loopback_module: Arc>>, sink_name: String, + stream_router: Option, + event_task: Option>, } impl Routing { - /// Create a per-PID null-sink and mirror the default sink's monitor - /// into it. Returns once both modules are loaded. - pub fn start() -> Result { + /// Create the per-PID null-sink + loopback. If `opts.app` is set, + /// also spawn the libpipewire thread that reroutes matching streams. + pub async fn start(opts: &HostOpts) -> Result { let pid = std::process::id(); let sink_name = format!("pixelpass_capture_{pid}"); - let sink_module = load_module(&[ - "module-null-sink", - &format!("sink_name={sink_name}"), - ]) - .context("failed to load module-null-sink")?; - - let mut routing = Self { - sink_module: Some(sink_module), - loopback_module: None, - sink_name: sink_name.clone(), - }; + let sink_module = + load_module(&["module-null-sink", &format!("sink_name={sink_name}")]) + .context("failed to load module-null-sink")?; // 20ms loopback latency keeps the mirrored audio tight; pactl's // default of 200ms is enough to be perceptible. @@ -55,7 +65,6 @@ impl Routing { "latency_msec=20", ]) .context("failed to load module-loopback (null-sink will be cleaned up on Drop)")?; - routing.loopback_module = Some(loopback_module); tracing::info!( sink_module, @@ -63,6 +72,38 @@ impl Routing { %sink_name, "audio routing: null-sink + loopback ready" ); + + let loopback_arc = Arc::new(Mutex::new(Some(loopback_module))); + let mut routing = Self { + sink_module: Some(sink_module), + loopback_module: Arc::clone(&loopback_arc), + sink_name: sink_name.clone(), + stream_router: None, + event_task: None, + }; + + if let Some(app) = &opts.app { + let (router, mut event_rx) = StreamRouter::spawn(app.clone(), sink_name.clone())?; + let loopback_for_task = Arc::clone(&loopback_arc); + let event_task = tokio::spawn(async move { + while let Some(ev) = event_rx.recv().await { + match ev { + Event::FirstRoutedStream => { + let mid = loopback_for_task.lock().unwrap().take(); + if let Some(id) = mid { + tracing::info!( + "audio routing: first stream routed → unloading default-sink loopback" + ); + unload_module(id); + } + } + } + } + }); + routing.stream_router = Some(router); + routing.event_task = Some(event_task); + } + Ok(routing) } @@ -70,10 +111,17 @@ impl Routing { &self.sink_name } - /// Unload loopback first, then the null-sink. Order matters: PipeWire - /// can leave zombie links if you destroy a sink with active inputs. + /// Stop the stream router (if any), then unload loopback (if still + /// loaded), then unload the null-sink. Order matters: PipeWire can + /// leave zombie links if you destroy a sink with active inputs. pub fn shutdown(mut self) { - if let Some(id) = self.loopback_module.take() { + if let Some(router) = self.stream_router.take() { + router.shutdown(); + } + if let Some(task) = self.event_task.take() { + task.abort(); + } + if let Some(id) = self.loopback_module.lock().unwrap().take() { unload_module(id); } if let Some(id) = self.sink_module.take() { @@ -84,7 +132,13 @@ impl Routing { impl Drop for Routing { fn drop(&mut self) { - if let Some(id) = self.loopback_module.take() { + if let Some(router) = self.stream_router.take() { + router.shutdown(); + } + if let Some(task) = self.event_task.take() { + task.abort(); + } + if let Some(id) = self.loopback_module.lock().unwrap().take() { unload_module(id); } if let Some(id) = self.sink_module.take() { @@ -93,6 +147,10 @@ impl Drop for Routing { } } +// ────────────────────────────────────────────────────────────────────── +// App enumeration (interactive picker source) +// ────────────────────────────────────────────────────────────────────── + /// One deduplicated app currently producing audio. The picker in /// interactive mode shows these as the per-app capture choices. #[derive(Debug, Clone)] @@ -103,8 +161,6 @@ pub struct App { /// Enumerate apps currently sending audio to any sink, deduplicated by /// `application.name`. Returns an empty Vec if nothing is playing. -/// Per-stream routing isn't wired yet — session 2 just plumbs the -/// selection through `HostOpts.app`; routing lands in session 3. pub fn list_playing_apps() -> Result> { let output = Command::new("pactl") .args(["-f", "json", "list", "sink-inputs"]) @@ -148,6 +204,10 @@ struct SinkInputProperties { application_name: Option, } +// ────────────────────────────────────────────────────────────────────── +// pactl module helpers +// ────────────────────────────────────────────────────────────────────── + fn load_module(args: &[&str]) -> Result { let output = Command::new("pactl") .arg("load-module") @@ -186,7 +246,233 @@ fn unload_module(id: u32) { ); } Err(e) => { - tracing::warn!(module = id, "audio routing: failed to run pactl unload-module: {e}"); + tracing::warn!( + module = id, + "audio routing: failed to run pactl unload-module: {e}" + ); } } } + +// ────────────────────────────────────────────────────────────────────── +// Per-stream routing (libpipewire thread) +// ────────────────────────────────────────────────────────────────────── + +/// Command from tokio → libpipewire thread. +enum Cmd { + /// Clear `target.object` for everything we routed, then quit the + /// MainLoop so the thread joins. + Shutdown, +} + +/// Event from libpipewire thread → tokio. Currently a single variant — +/// session 4 will add `LastRoutedStreamGone` to drive loopback recreate. +enum Event { + /// At least one stream is now routed to our sink. Receiver unloads + /// the default-sink loopback so the filtered audio isn't doubled. + FirstRoutedStream, +} + +/// Handle to the libpipewire stream-router thread. +pub struct StreamRouter { + cmd_tx: pipewire::channel::Sender, + thread: Option>, +} + +impl StreamRouter { + /// Spawn the libpipewire thread. Returns the router handle and the + /// event receiver tokio side polls. + fn spawn( + filter_name: String, + sink_name: String, + ) -> Result<(Self, tokio::sync::mpsc::UnboundedReceiver)> { + let (cmd_tx, cmd_rx) = pipewire::channel::channel::(); + let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel::(); + + let thread = std::thread::Builder::new() + .name("pixelpass-pw-router".to_string()) + .spawn(move || { + if let Err(e) = run_router(filter_name, sink_name, cmd_rx, event_tx) { + tracing::warn!("audio routing: libpipewire thread exited with error: {e:#}"); + } + }) + .context("failed to spawn libpipewire router thread")?; + + Ok(( + Self { + cmd_tx, + thread: Some(thread), + }, + event_rx, + )) + } + + fn shutdown(mut self) { + // Best-effort: if the send fails the thread is already gone. + let _ = self.cmd_tx.send(Cmd::Shutdown); + if let Some(t) = self.thread.take() + && let Err(e) = t.join() + { + tracing::warn!("audio routing: pw thread join failed: {e:?}"); + } + } +} + +/// Body of the libpipewire thread. Owns MainLoop, registry listener, and +/// all PipeWire proxies for the duration of the routing session. +fn run_router( + filter_name: String, + sink_name: String, + cmd_rx: pipewire::channel::Receiver, + event_tx: tokio::sync::mpsc::UnboundedSender, +) -> Result<()> { + use pipewire::{self as pw, types::ObjectType}; + + let main_loop = pw::main_loop::MainLoopRc::new(None) + .context("pw main loop construction failed")?; + let context = pw::context::ContextRc::new(&main_loop, None) + .context("pw context construction failed")?; + let core = context + .connect_rc(None) + .context("pw core connect failed (is the daemon running?)")?; + let registry = core + .get_registry_rc() + .context("pw get_registry failed")?; + + let state = Rc::new(RefCell::new(RouterState { + sink_serial: None, + default_metadata: None, + routed_node_ids: Vec::new(), + pending: Vec::new(), + })); + + // Cmd handler: clear metadata for routed streams, then quit. + let main_loop_for_cmd = main_loop.clone(); + let state_for_cmd = Rc::clone(&state); + let _cmd_recv = cmd_rx.attach(main_loop.loop_(), move |cmd| match cmd { + Cmd::Shutdown => { + let s = state_for_cmd.borrow(); + if let Some(meta) = &s.default_metadata { + for &nid in &s.routed_node_ids { + meta.set_property(nid, "target.object", None, None); + } + if !s.routed_node_ids.is_empty() { + tracing::info!( + n = s.routed_node_ids.len(), + "audio routing: cleared target.object on routed streams before quitting" + ); + } + } + main_loop_for_cmd.quit(); + } + }); + + let filter_lower = filter_name.to_ascii_lowercase(); + let sink_name_owned = sink_name.clone(); + let registry_weak = registry.downgrade(); + let state_for_reg = Rc::clone(&state); + let event_tx_for_reg = event_tx.clone(); + + let _reg_listener = registry + .add_listener_local() + .global(move |obj| { + let Some(reg) = registry_weak.upgrade() else { return }; + match obj.type_ { + ObjectType::Node => { + let Some(props) = obj.props.as_ref() else { return }; + if props.get("node.name") == Some(sink_name_owned.as_str()) { + if let Some(serial) = props + .get("object.serial") + .and_then(|s| s.parse::().ok()) + { + state_for_reg.borrow_mut().sink_serial = Some(serial); + tracing::info!( + serial, + "audio routing: pixelpass sink registered" + ); + try_flush(&state_for_reg, &event_tx_for_reg); + } + return; + } + if props.get("media.class") != Some("Stream/Output/Audio") { + return; + } + let Some(app) = props.get("application.name") else { return }; + if !app.eq_ignore_ascii_case(&filter_lower) { + return; + } + tracing::info!( + node_id = obj.id, + %app, + "audio routing: matched stream, queued for route" + ); + state_for_reg.borrow_mut().pending.push(obj.id); + try_flush(&state_for_reg, &event_tx_for_reg); + } + ObjectType::Metadata => { + let Some(props) = obj.props.as_ref() else { return }; + if props.get("metadata.name") != Some("default") { + return; + } + let metadata: pw::metadata::Metadata = match reg.bind(obj) { + Ok(m) => m, + Err(e) => { + tracing::warn!("audio routing: bind default metadata failed: {e}"); + return; + } + }; + state_for_reg.borrow_mut().default_metadata = Some(metadata); + tracing::info!("audio routing: default metadata bound"); + try_flush(&state_for_reg, &event_tx_for_reg); + } + _ => {} + } + }) + .register(); + + tracing::info!(filter = %filter_name, "audio routing: pw thread running"); + main_loop.run(); + tracing::info!("audio routing: pw thread exiting"); + Ok(()) +} + +struct RouterState { + sink_serial: Option, + default_metadata: Option, + routed_node_ids: Vec, + pending: Vec, +} + +/// Drain pending streams to the sink, but only once both prerequisites +/// (sink serial known + default metadata bound) are in place. Emits +/// `FirstRoutedStream` exactly once — when routed count crosses 0→N. +fn try_flush( + state: &Rc>, + event_tx: &tokio::sync::mpsc::UnboundedSender, +) { + let mut s = state.borrow_mut(); + let Some(serial) = s.sink_serial else { return }; + if s.default_metadata.is_none() { + return; + } + if s.pending.is_empty() { + return; + } + let was_empty = s.routed_node_ids.is_empty(); + let serial_str = serial.to_string(); + let pending = std::mem::take(&mut s.pending); + if let Some(meta) = &s.default_metadata { + for nid in &pending { + meta.set_property(*nid, "target.object", Some("Spa:Id"), Some(&serial_str)); + tracing::info!( + node_id = *nid, + sink_serial = serial, + "audio routing: stream routed to pixelpass sink" + ); + } + } + s.routed_node_ids.extend(pending); + if was_empty && !s.routed_node_ids.is_empty() { + let _ = event_tx.send(Event::FirstRoutedStream); + } +} diff --git a/src/host/mod.rs b/src/host/mod.rs index 653caad..9820e6d 100644 --- a/src/host/mod.rs +++ b/src/host/mod.rs @@ -317,11 +317,8 @@ fn copy_to_clipboard(text: &str) -> bool { fn capture_summary(opts: &HostOpts) -> String { let mut bits = vec![if opts.window { "window" } else { "fullscreen" }.to_string()]; - // Per-stream routing isn't wired yet (session 3 owns activation), so - // even when an app is selected the audio capture is system-wide. Keep - // the choice visible in the banner but label it honestly. if let Some(app) = &opts.app { - bits.push(format!("system-audio (app pick saved: {app})")); + bits.push(format!("app-audio={app}")); } else { bits.push("system-audio".to_string()); } diff --git a/src/host/wayland.rs b/src/host/wayland.rs index aebdbc0..916f0a1 100644 --- a/src/host/wayland.rs +++ b/src/host/wayland.rs @@ -119,13 +119,19 @@ pub async fn start(opts: &HostOpts) -> Result { let key_interval = (opts.framerate * 2).to_string(); let bitrate = opts.bitrate.to_string(); - // PIXELPASS_AUDIO_VIA_NULL_SINK=1 routes audio through a per-PID - // PipeWire null-sink + default-sink loopback instead of tapping the - // default sink's monitor directly. Same audio captured either way at - // this stage; the null-sink path is groundwork for per-app filtering - // in a follow-up. - let audio_routing = if std::env::var_os("PIXELPASS_AUDIO_VIA_NULL_SINK").is_some() { - Some(Routing::start().context("audio routing setup failed")?) + // Audio routing activates when either: + // - `opts.app` is set (per-stream rerouting to a per-PID null-sink), + // - or `PIXELPASS_AUDIO_VIA_NULL_SINK=1` is set (no app filter, just + // captures everything via the null-sink → useful for development + // and dogfooding the loopback path before app filtering is picked). + let routing_requested = + opts.app.is_some() || std::env::var_os("PIXELPASS_AUDIO_VIA_NULL_SINK").is_some(); + let audio_routing = if routing_requested { + Some( + Routing::start(opts) + .await + .context("audio routing setup failed")?, + ) } else { None }; diff --git a/src/interactive.rs b/src/interactive.rs index 16b00cc..85b6e84 100644 --- a/src/interactive.rs +++ b/src/interactive.rs @@ -40,11 +40,6 @@ pub async fn run(cli: Cli) -> Result<()> { /// producing audio (deduped by `application.name`); user picks one or /// the "all system audio" default. Bypassed when `--app NAME` was given /// on the CLI. -/// -/// Per-stream routing isn't wired yet — session 3 owns activation. For -/// now the picker plumbs the choice into `HostOpts.app` and the host -/// banner records it, but audio capture is still system-wide. The -/// explainer beneath the prompt sets that expectation honestly. fn pick_app(theme: &ColorfulTheme) -> Result> { let apps = match host::audio::list_playing_apps() { Ok(a) => a, @@ -61,8 +56,6 @@ fn pick_app(theme: &ColorfulTheme) -> Result> { eprintln!("No other apps are currently producing audio."); eprintln!("Start your game / music / call first if you want to pick it specifically."); } - eprintln!("Note: per-app routing isn't live yet — all choices currently capture"); - eprintln!("system audio. The selection is saved for when routing ships."); eprintln!(); let mut items = vec!["Capture all system audio (default)".to_string()]; diff --git a/src/main.rs b/src/main.rs index 4e91275..4ff0406 100644 --- a/src/main.rs +++ b/src/main.rs @@ -16,6 +16,10 @@ async fn main() -> Result<()> { let cli = Cli::parse(); init_tracing(cli.verbose); + // libpipewire requires global init before any pw_* call. Idempotent; + // safe to call even when the per-app audio thread never spawns. + pipewire::init(); + if cli.repair { return repair::run().await; }