diff --git a/src/host/audio.rs b/src/host/audio.rs index 35a51d2..fea2db1 100644 --- a/src/host/audio.rs +++ b/src/host/audio.rs @@ -1,50 +1,60 @@ -//! Per-app audio routing scaffolding. +//! Per-app audio routing. //! -//! Session 1: owns a per-PID PipeWire null-sink and a loopback that -//! mirrors the default sink's monitor into it, so the gst pipeline can -//! `pulsesrc` from `pixelpass_capture_.monitor` and still hear all -//! system audio. No per-stream filtering yet — that lands in session 3, -//! at which point the loopback gets dropped once any stream is being -//! routed exclusively. +//! Two cooperating layers: //! -//! Module lifecycle is via `pactl load-module` / `pactl unload-module` -//! shell-outs. PipeWire offers a Rust binding (already in deps) but -//! null-sink + loopback are one-shot graph mutations with no event -//! subscription; the shell-out is simpler and avoids dragging a -//! pipewire MainLoop thread into the picture until session 3 actually -//! needs it for stream events. +//! - **Null-sink + loopback** (pactl shell-out): a per-PID null-sink +//! `pixelpass_capture_` plus a `module-loopback` that mirrors the +//! default sink's monitor into it. gst captures from the null-sink's +//! monitor, so the viewer hears whatever the user hears — by default. +//! +//! - **Per-stream rerouting** (libpipewire on a dedicated OS thread): +//! when [`HostOpts::app`] is set, a [`StreamRouter`] subscribes to the +//! PipeWire registry, finds `Stream/Output/Audio` nodes whose +//! `application.name` matches the filter, and writes +//! `target.object` to the "default" metadata so WirePlumber reroutes +//! them to our null-sink. Once at least one stream is actually routed, +//! the loopback is unloaded — otherwise the viewer would hear the +//! filtered audio twice (once via the routed stream, once via the +//! default-sink monitor loopback). +//! +//! pactl is the right tool for the one-shot null-sink/loopback graph +//! mutations. libpipewire is dragged in only when per-stream filtering +//! is requested, because that needs registry-event subscription. use anyhow::{Context, Result, bail}; +use std::cell::RefCell; use std::collections::BTreeMap; use std::process::Command; +use std::rc::Rc; +use std::sync::{Arc, Mutex}; +use std::thread::JoinHandle; -/// Owns the pactl module IDs for the null-sink and its loopback. Drop -/// unloads both as a backstop; prefer calling [`Routing::shutdown`] -/// explicitly so failures get logged. +use crate::cli::HostOpts; + +/// Owns the pactl-loaded modules plus, when filtering is active, the +/// libpipewire stream-router thread. Drop unloads modules as a backstop; +/// prefer [`Routing::shutdown`] explicitly so failures get logged. pub struct Routing { sink_module: Option, - loopback_module: Option, + /// Shared with the event task so it can `take()` and unload on the + /// first successful route. `Routing::shutdown` unloads whatever + /// remains. + loopback_module: Arc>>, sink_name: String, + stream_router: Option, + event_task: Option>, } impl Routing { - /// Create a per-PID null-sink and mirror the default sink's monitor - /// into it. Returns once both modules are loaded. - pub fn start() -> Result { + /// Create the per-PID null-sink + loopback. If `opts.app` is set, + /// also spawn the libpipewire thread that reroutes matching streams. + pub async fn start(opts: &HostOpts) -> Result { let pid = std::process::id(); let sink_name = format!("pixelpass_capture_{pid}"); - let sink_module = load_module(&[ - "module-null-sink", - &format!("sink_name={sink_name}"), - ]) - .context("failed to load module-null-sink")?; - - let mut routing = Self { - sink_module: Some(sink_module), - loopback_module: None, - sink_name: sink_name.clone(), - }; + let sink_module = + load_module(&["module-null-sink", &format!("sink_name={sink_name}")]) + .context("failed to load module-null-sink")?; // 20ms loopback latency keeps the mirrored audio tight; pactl's // default of 200ms is enough to be perceptible. @@ -55,7 +65,6 @@ impl Routing { "latency_msec=20", ]) .context("failed to load module-loopback (null-sink will be cleaned up on Drop)")?; - routing.loopback_module = Some(loopback_module); tracing::info!( sink_module, @@ -63,6 +72,38 @@ impl Routing { %sink_name, "audio routing: null-sink + loopback ready" ); + + let loopback_arc = Arc::new(Mutex::new(Some(loopback_module))); + let mut routing = Self { + sink_module: Some(sink_module), + loopback_module: Arc::clone(&loopback_arc), + sink_name: sink_name.clone(), + stream_router: None, + event_task: None, + }; + + if let Some(app) = &opts.app { + let (router, mut event_rx) = StreamRouter::spawn(app.clone(), sink_name.clone())?; + let loopback_for_task = Arc::clone(&loopback_arc); + let event_task = tokio::spawn(async move { + while let Some(ev) = event_rx.recv().await { + match ev { + Event::FirstRoutedStream => { + let mid = loopback_for_task.lock().unwrap().take(); + if let Some(id) = mid { + tracing::info!( + "audio routing: first stream routed → unloading default-sink loopback" + ); + unload_module(id); + } + } + } + } + }); + routing.stream_router = Some(router); + routing.event_task = Some(event_task); + } + Ok(routing) } @@ -70,10 +111,17 @@ impl Routing { &self.sink_name } - /// Unload loopback first, then the null-sink. Order matters: PipeWire - /// can leave zombie links if you destroy a sink with active inputs. + /// Stop the stream router (if any), then unload loopback (if still + /// loaded), then unload the null-sink. Order matters: PipeWire can + /// leave zombie links if you destroy a sink with active inputs. pub fn shutdown(mut self) { - if let Some(id) = self.loopback_module.take() { + if let Some(router) = self.stream_router.take() { + router.shutdown(); + } + if let Some(task) = self.event_task.take() { + task.abort(); + } + if let Some(id) = self.loopback_module.lock().unwrap().take() { unload_module(id); } if let Some(id) = self.sink_module.take() { @@ -84,7 +132,13 @@ impl Routing { impl Drop for Routing { fn drop(&mut self) { - if let Some(id) = self.loopback_module.take() { + if let Some(router) = self.stream_router.take() { + router.shutdown(); + } + if let Some(task) = self.event_task.take() { + task.abort(); + } + if let Some(id) = self.loopback_module.lock().unwrap().take() { unload_module(id); } if let Some(id) = self.sink_module.take() { @@ -93,6 +147,10 @@ impl Drop for Routing { } } +// ────────────────────────────────────────────────────────────────────── +// App enumeration (interactive picker source) +// ────────────────────────────────────────────────────────────────────── + /// One deduplicated app currently producing audio. The picker in /// interactive mode shows these as the per-app capture choices. #[derive(Debug, Clone)] @@ -103,8 +161,6 @@ pub struct App { /// Enumerate apps currently sending audio to any sink, deduplicated by /// `application.name`. Returns an empty Vec if nothing is playing. -/// Per-stream routing isn't wired yet — session 2 just plumbs the -/// selection through `HostOpts.app`; routing lands in session 3. pub fn list_playing_apps() -> Result> { let output = Command::new("pactl") .args(["-f", "json", "list", "sink-inputs"]) @@ -148,6 +204,10 @@ struct SinkInputProperties { application_name: Option, } +// ────────────────────────────────────────────────────────────────────── +// pactl module helpers +// ────────────────────────────────────────────────────────────────────── + fn load_module(args: &[&str]) -> Result { let output = Command::new("pactl") .arg("load-module") @@ -186,7 +246,233 @@ fn unload_module(id: u32) { ); } Err(e) => { - tracing::warn!(module = id, "audio routing: failed to run pactl unload-module: {e}"); + tracing::warn!( + module = id, + "audio routing: failed to run pactl unload-module: {e}" + ); } } } + +// ────────────────────────────────────────────────────────────────────── +// Per-stream routing (libpipewire thread) +// ────────────────────────────────────────────────────────────────────── + +/// Command from tokio → libpipewire thread. +enum Cmd { + /// Clear `target.object` for everything we routed, then quit the + /// MainLoop so the thread joins. + Shutdown, +} + +/// Event from libpipewire thread → tokio. Currently a single variant — +/// session 4 will add `LastRoutedStreamGone` to drive loopback recreate. +enum Event { + /// At least one stream is now routed to our sink. Receiver unloads + /// the default-sink loopback so the filtered audio isn't doubled. + FirstRoutedStream, +} + +/// Handle to the libpipewire stream-router thread. +pub struct StreamRouter { + cmd_tx: pipewire::channel::Sender, + thread: Option>, +} + +impl StreamRouter { + /// Spawn the libpipewire thread. Returns the router handle and the + /// event receiver tokio side polls. + fn spawn( + filter_name: String, + sink_name: String, + ) -> Result<(Self, tokio::sync::mpsc::UnboundedReceiver)> { + let (cmd_tx, cmd_rx) = pipewire::channel::channel::(); + let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel::(); + + let thread = std::thread::Builder::new() + .name("pixelpass-pw-router".to_string()) + .spawn(move || { + if let Err(e) = run_router(filter_name, sink_name, cmd_rx, event_tx) { + tracing::warn!("audio routing: libpipewire thread exited with error: {e:#}"); + } + }) + .context("failed to spawn libpipewire router thread")?; + + Ok(( + Self { + cmd_tx, + thread: Some(thread), + }, + event_rx, + )) + } + + fn shutdown(mut self) { + // Best-effort: if the send fails the thread is already gone. + let _ = self.cmd_tx.send(Cmd::Shutdown); + if let Some(t) = self.thread.take() + && let Err(e) = t.join() + { + tracing::warn!("audio routing: pw thread join failed: {e:?}"); + } + } +} + +/// Body of the libpipewire thread. Owns MainLoop, registry listener, and +/// all PipeWire proxies for the duration of the routing session. +fn run_router( + filter_name: String, + sink_name: String, + cmd_rx: pipewire::channel::Receiver, + event_tx: tokio::sync::mpsc::UnboundedSender, +) -> Result<()> { + use pipewire::{self as pw, types::ObjectType}; + + let main_loop = pw::main_loop::MainLoopRc::new(None) + .context("pw main loop construction failed")?; + let context = pw::context::ContextRc::new(&main_loop, None) + .context("pw context construction failed")?; + let core = context + .connect_rc(None) + .context("pw core connect failed (is the daemon running?)")?; + let registry = core + .get_registry_rc() + .context("pw get_registry failed")?; + + let state = Rc::new(RefCell::new(RouterState { + sink_serial: None, + default_metadata: None, + routed_node_ids: Vec::new(), + pending: Vec::new(), + })); + + // Cmd handler: clear metadata for routed streams, then quit. + let main_loop_for_cmd = main_loop.clone(); + let state_for_cmd = Rc::clone(&state); + let _cmd_recv = cmd_rx.attach(main_loop.loop_(), move |cmd| match cmd { + Cmd::Shutdown => { + let s = state_for_cmd.borrow(); + if let Some(meta) = &s.default_metadata { + for &nid in &s.routed_node_ids { + meta.set_property(nid, "target.object", None, None); + } + if !s.routed_node_ids.is_empty() { + tracing::info!( + n = s.routed_node_ids.len(), + "audio routing: cleared target.object on routed streams before quitting" + ); + } + } + main_loop_for_cmd.quit(); + } + }); + + let filter_lower = filter_name.to_ascii_lowercase(); + let sink_name_owned = sink_name.clone(); + let registry_weak = registry.downgrade(); + let state_for_reg = Rc::clone(&state); + let event_tx_for_reg = event_tx.clone(); + + let _reg_listener = registry + .add_listener_local() + .global(move |obj| { + let Some(reg) = registry_weak.upgrade() else { return }; + match obj.type_ { + ObjectType::Node => { + let Some(props) = obj.props.as_ref() else { return }; + if props.get("node.name") == Some(sink_name_owned.as_str()) { + if let Some(serial) = props + .get("object.serial") + .and_then(|s| s.parse::().ok()) + { + state_for_reg.borrow_mut().sink_serial = Some(serial); + tracing::info!( + serial, + "audio routing: pixelpass sink registered" + ); + try_flush(&state_for_reg, &event_tx_for_reg); + } + return; + } + if props.get("media.class") != Some("Stream/Output/Audio") { + return; + } + let Some(app) = props.get("application.name") else { return }; + if !app.eq_ignore_ascii_case(&filter_lower) { + return; + } + tracing::info!( + node_id = obj.id, + %app, + "audio routing: matched stream, queued for route" + ); + state_for_reg.borrow_mut().pending.push(obj.id); + try_flush(&state_for_reg, &event_tx_for_reg); + } + ObjectType::Metadata => { + let Some(props) = obj.props.as_ref() else { return }; + if props.get("metadata.name") != Some("default") { + return; + } + let metadata: pw::metadata::Metadata = match reg.bind(obj) { + Ok(m) => m, + Err(e) => { + tracing::warn!("audio routing: bind default metadata failed: {e}"); + return; + } + }; + state_for_reg.borrow_mut().default_metadata = Some(metadata); + tracing::info!("audio routing: default metadata bound"); + try_flush(&state_for_reg, &event_tx_for_reg); + } + _ => {} + } + }) + .register(); + + tracing::info!(filter = %filter_name, "audio routing: pw thread running"); + main_loop.run(); + tracing::info!("audio routing: pw thread exiting"); + Ok(()) +} + +struct RouterState { + sink_serial: Option, + default_metadata: Option, + routed_node_ids: Vec, + pending: Vec, +} + +/// Drain pending streams to the sink, but only once both prerequisites +/// (sink serial known + default metadata bound) are in place. Emits +/// `FirstRoutedStream` exactly once — when routed count crosses 0→N. +fn try_flush( + state: &Rc>, + event_tx: &tokio::sync::mpsc::UnboundedSender, +) { + let mut s = state.borrow_mut(); + let Some(serial) = s.sink_serial else { return }; + if s.default_metadata.is_none() { + return; + } + if s.pending.is_empty() { + return; + } + let was_empty = s.routed_node_ids.is_empty(); + let serial_str = serial.to_string(); + let pending = std::mem::take(&mut s.pending); + if let Some(meta) = &s.default_metadata { + for nid in &pending { + meta.set_property(*nid, "target.object", Some("Spa:Id"), Some(&serial_str)); + tracing::info!( + node_id = *nid, + sink_serial = serial, + "audio routing: stream routed to pixelpass sink" + ); + } + } + s.routed_node_ids.extend(pending); + if was_empty && !s.routed_node_ids.is_empty() { + let _ = event_tx.send(Event::FirstRoutedStream); + } +} diff --git a/src/host/mod.rs b/src/host/mod.rs index 653caad..9820e6d 100644 --- a/src/host/mod.rs +++ b/src/host/mod.rs @@ -317,11 +317,8 @@ fn copy_to_clipboard(text: &str) -> bool { fn capture_summary(opts: &HostOpts) -> String { let mut bits = vec![if opts.window { "window" } else { "fullscreen" }.to_string()]; - // Per-stream routing isn't wired yet (session 3 owns activation), so - // even when an app is selected the audio capture is system-wide. Keep - // the choice visible in the banner but label it honestly. if let Some(app) = &opts.app { - bits.push(format!("system-audio (app pick saved: {app})")); + bits.push(format!("app-audio={app}")); } else { bits.push("system-audio".to_string()); } diff --git a/src/host/wayland.rs b/src/host/wayland.rs index aebdbc0..916f0a1 100644 --- a/src/host/wayland.rs +++ b/src/host/wayland.rs @@ -119,13 +119,19 @@ pub async fn start(opts: &HostOpts) -> Result { let key_interval = (opts.framerate * 2).to_string(); let bitrate = opts.bitrate.to_string(); - // PIXELPASS_AUDIO_VIA_NULL_SINK=1 routes audio through a per-PID - // PipeWire null-sink + default-sink loopback instead of tapping the - // default sink's monitor directly. Same audio captured either way at - // this stage; the null-sink path is groundwork for per-app filtering - // in a follow-up. - let audio_routing = if std::env::var_os("PIXELPASS_AUDIO_VIA_NULL_SINK").is_some() { - Some(Routing::start().context("audio routing setup failed")?) + // Audio routing activates when either: + // - `opts.app` is set (per-stream rerouting to a per-PID null-sink), + // - or `PIXELPASS_AUDIO_VIA_NULL_SINK=1` is set (no app filter, just + // captures everything via the null-sink → useful for development + // and dogfooding the loopback path before app filtering is picked). + let routing_requested = + opts.app.is_some() || std::env::var_os("PIXELPASS_AUDIO_VIA_NULL_SINK").is_some(); + let audio_routing = if routing_requested { + Some( + Routing::start(opts) + .await + .context("audio routing setup failed")?, + ) } else { None }; diff --git a/src/interactive.rs b/src/interactive.rs index 16b00cc..85b6e84 100644 --- a/src/interactive.rs +++ b/src/interactive.rs @@ -40,11 +40,6 @@ pub async fn run(cli: Cli) -> Result<()> { /// producing audio (deduped by `application.name`); user picks one or /// the "all system audio" default. Bypassed when `--app NAME` was given /// on the CLI. -/// -/// Per-stream routing isn't wired yet — session 3 owns activation. For -/// now the picker plumbs the choice into `HostOpts.app` and the host -/// banner records it, but audio capture is still system-wide. The -/// explainer beneath the prompt sets that expectation honestly. fn pick_app(theme: &ColorfulTheme) -> Result> { let apps = match host::audio::list_playing_apps() { Ok(a) => a, @@ -61,8 +56,6 @@ fn pick_app(theme: &ColorfulTheme) -> Result> { eprintln!("No other apps are currently producing audio."); eprintln!("Start your game / music / call first if you want to pick it specifically."); } - eprintln!("Note: per-app routing isn't live yet — all choices currently capture"); - eprintln!("system audio. The selection is saved for when routing ships."); eprintln!(); let mut items = vec!["Capture all system audio (default)".to_string()]; diff --git a/src/main.rs b/src/main.rs index 4e91275..4ff0406 100644 --- a/src/main.rs +++ b/src/main.rs @@ -16,6 +16,10 @@ async fn main() -> Result<()> { let cli = Cli::parse(); init_tracing(cli.verbose); + // libpipewire requires global init before any pw_* call. Idempotent; + // safe to call even when the per-app audio thread never spawns. + pipewire::init(); + if cli.repair { return repair::run().await; }