//! Per-app audio routing.
//!
//! Three cooperating layers:
//!
//! - **Null-sink + loopback** (pactl shell-out): a per-PID null-sink
//!   `pixelpass_capture_<pid>` plus a `module-loopback` that mirrors the
//!   default sink's monitor into it. gst captures from the null-sink's
//!   monitor, so the viewer hears whatever the user hears — by default.
//!
//! - **Per-stream rerouting** (libpipewire on a dedicated OS thread):
//!   when [`HostOpts::app`] is set, a [`StreamRouter`] subscribes to the
//!   PipeWire registry, finds `Stream/Output/Audio` nodes whose
//!   `application.name` matches the filter, and writes
//!   `target.object` to the "default" metadata so WirePlumber reroutes
//!   them to our null-sink. Once at least one stream is actually routed,
//!   the loopback is unloaded — otherwise the viewer would hear the
//!   filtered audio twice (once via the routed stream, once via the
//!   default-sink monitor loopback).
//!
//! - **Local monitor** (pactl shell-out, app mode only): rerouting *moves*
//!   the chosen app off the sharer's speakers into the null-sink, so without
//!   this the sharer would go deaf to the very content they're sharing. We
//!   mirror the null-sink's monitor back to `@DEFAULT_SINK@` so the sharer
//!   hears it too. Only the chosen app is in the null-sink — never the
//!   desktop/call — so this can't echo back into the capture. It is loaded on
//!   the first routed stream (after the default-sink loopback is gone, so the
//!   two never coexist and feed back) and unloaded when the app stops.
//!
//! pactl is the right tool for the one-shot null-sink/loopback graph
//! mutations. libpipewire is dragged in only when per-stream filtering
//! is requested, because that needs registry-event subscription.
use anyhow::{Context, Result, bail}; use std::cell::RefCell; use std::collections::BTreeMap; use std::process::Command; use std::rc::Rc; use std::sync::{Arc, Mutex}; use std::thread::JoinHandle; use crate::cli::HostOpts; /// Owns the pactl-loaded modules plus, when filtering is active, the /// libpipewire stream-router thread. Drop unloads modules as a backstop; /// prefer [`Routing::shutdown`] explicitly so failures get logged. pub struct Routing { sink_module: Option, /// Shared with the event task so it can `take()` and unload on the /// first successful route. `Routing::shutdown` unloads whatever /// remains. loopback_module: Arc>>, /// The `null-sink.monitor → @DEFAULT_SINK@` loopback that lets the sharer /// hear the routed app. Shared with the event task, which loads it on the /// first routed stream and unloads it when the app stops. `None` outside /// app mode and whenever no app is currently routed. local_monitor_module: Arc>>, sink_name: String, stream_router: Option, event_task: Option>, } impl Routing { /// Create the per-PID null-sink + loopback. If `opts.app` is set, /// also spawn the libpipewire thread that reroutes matching streams. pub async fn start(opts: &HostOpts) -> Result { let pid = std::process::id(); let sink_name = format!("pixelpass_capture_{pid}"); let sink_module = load_module(&["module-null-sink", &format!("sink_name={sink_name}")]) .context("failed to load module-null-sink")?; // In strict per-app mode we never mirror the default sink: the viewer // must hear *only* the chosen app, never the whole desktop (which would // leak e.g. a voice call the sharer is in back to viewers — the echo // bug A23). Without strict mode (whole-desktop share, or best-effort // app filtering) we load the monitor loopback so the viewer hears // system audio immediately and during any gap before the app routes. // 20ms loopback latency keeps the mirrored audio tight; pactl's // default of 200ms is enough to be perceptible. 
let strict_app = opts.app.is_some() && opts.strict_audio; let loopback_module = if strict_app { None } else { Some( load_module(&[ "module-loopback", "source=@DEFAULT_SINK@.monitor", &format!("sink={sink_name}"), "latency_msec=20", ]) .context("failed to load module-loopback (null-sink cleaned up on Drop)")?, ) }; tracing::info!( sink_module, ?loopback_module, strict_app, %sink_name, "audio routing: null-sink ready (loopback skipped in strict app mode)" ); let loopback_arc = Arc::new(Mutex::new(loopback_module)); let local_monitor_arc = Arc::new(Mutex::new(None)); let mut routing = Self { sink_module: Some(sink_module), loopback_module: Arc::clone(&loopback_arc), local_monitor_module: Arc::clone(&local_monitor_arc), sink_name: sink_name.clone(), stream_router: None, event_task: None, }; if let Some(app) = &opts.app { let (router, mut event_rx) = StreamRouter::spawn(app.clone(), sink_name.clone())?; let loopback_for_task = Arc::clone(&loopback_arc); let local_monitor_for_task = Arc::clone(&local_monitor_arc); let sink_name_for_task = sink_name.clone(); let strict = opts.strict_audio; let event_task = tokio::spawn(async move { use crate::common::output::{self, AppAudioState}; while let Some(ev) = event_rx.recv().await { match ev { Event::FirstRoutedStream => { let mid = loopback_for_task.lock().unwrap().take(); if let Some(id) = mid { tracing::info!( "audio routing: first stream routed → unloading default-sink loopback" ); unload_module(id); } // Mirror the routed app back to the sharer's own // speakers so they hear the content they're sharing. // Loaded *after* the default-sink loopback is gone so // the two never coexist (which would feed back), and // sourced from the null-sink monitor — the chosen app // only, never the desktop/call — so it can't echo into // the capture. 
if local_monitor_for_task.lock().unwrap().is_none() { match load_module(&[ "module-loopback", &format!("source={sink_name_for_task}.monitor"), "sink=@DEFAULT_SINK@", "latency_msec=20", ]) { Ok(id) => { tracing::info!( module = id, "audio routing: local monitor loaded (sharer hears the shared app)" ); *local_monitor_for_task.lock().unwrap() = Some(id); } Err(e) => tracing::warn!( "audio routing: failed to load local monitor loopback: {e:#}" ), } } // Tell the front-end the chosen app's audio is live. output::emit(output::Event::AppAudio { state: AppAudioState::Routed, }); } Event::LastRoutedStreamGone => { // Routed app exited/paused mid-session. Notify the // front-end either way; the recovery differs by mode. output::emit(output::Event::AppAudio { state: AppAudioState::Lost, }); // The shared app is gone, so its null-sink is silent: // stop mirroring it to the sharer's speakers. Re-loads // on the next FirstRoutedStream if the app resumes. if let Some(id) = local_monitor_for_task.lock().unwrap().take() { tracing::info!( module = id, "audio routing: last routed stream gone → unloading local monitor" ); unload_module(id); } if strict { // Strict mode: do NOT restore the whole-desktop // loopback. Viewers hear silence until the app // produces audio again — never the rest of the // desktop (call included). tracing::info!( "audio routing: strict mode — last routed stream gone, leaving viewers silent" ); continue; } // Best-effort mode: restore the default-sink loopback // so the viewer hears system audio again instead of // silence. 
if loopback_for_task.lock().unwrap().is_some() { continue; } tracing::info!( "audio routing: last routed stream gone → restoring default-sink loopback" ); match load_module(&[ "module-loopback", "source=@DEFAULT_SINK@.monitor", &format!("sink={sink_name_for_task}"), "latency_msec=20", ]) { Ok(id) => { *loopback_for_task.lock().unwrap() = Some(id); } Err(e) => { tracing::warn!( "audio routing: failed to re-load loopback: {e:#}" ); } } } } } }); routing.stream_router = Some(router); routing.event_task = Some(event_task); } // Strict per-app mode suppresses the default-sink loopback, so until the // chosen app's first stream routes the viewer hears *silence*. Emit an // initial `lost` at capture start (capture is lazy — this runs on the // first viewer) so the front-end can warn from the outset rather than // only after an app that *was* routed later stops (audit A23 P2/F1): // `LastRoutedStreamGone`→`lost` never fires for an app that never routed. if let Some(state) = initial_app_audio_state(opts) { crate::common::output::emit(crate::common::output::Event::AppAudio { state }); } Ok(routing) } pub fn sink_name(&self) -> &str { &self.sink_name } /// Stop the stream router (if any), then unload loopback (if still /// loaded), then unload the null-sink. Order matters: PipeWire can /// leave zombie links if you destroy a sink with active inputs. /// /// Every step is a `take()`, so this is idempotent — `Drop` calls it again /// as a backstop and the second run is a no-op. fn cleanup(&mut self) { if let Some(router) = self.stream_router.take() { router.shutdown(); } if let Some(task) = self.event_task.take() { task.abort(); } if let Some(id) = self.loopback_module.lock().unwrap().take() { unload_module(id); } // Unload the local monitor before the null-sink it reads from, so the // sink has no active loopback reader when it's destroyed. 
if let Some(id) = self.local_monitor_module.lock().unwrap().take() { unload_module(id); } if let Some(id) = self.sink_module.take() { unload_module(id); } } /// Consume the routing and tear it all down now. `Drop` is the backstop; /// the real work lives in [`cleanup`](Self::cleanup). pub fn shutdown(mut self) { self.cleanup(); } } impl Drop for Routing { fn drop(&mut self) { self.cleanup(); } } /// The app-audio state to announce at capture start, if any. Only strict per-app /// mode warrants one: there the loopback is suppressed, so the viewer hears /// silence until the chosen app's first stream routes — surface that as an /// initial `lost`. In every other mode (whole-desktop, or best-effort app /// filtering) the loopback keeps audio flowing from the outset, so there is no /// initial gap to report. Pure: no I/O, so the emit decision is unit-testable. pub(super) fn initial_app_audio_state( opts: &HostOpts, ) -> Option { (opts.app.is_some() && opts.strict_audio).then_some(crate::common::output::AppAudioState::Lost) } // ────────────────────────────────────────────────────────────────────── // App enumeration (interactive picker source) // ────────────────────────────────────────────────────────────────────── /// One deduplicated app currently producing audio. The picker in /// interactive mode shows these as the per-app capture choices. #[derive(Debug, Clone)] pub struct App { pub name: String, pub stream_count: u32, } /// Enumerate apps currently sending audio to any sink, deduplicated by /// `application.name`. Returns an empty Vec if nothing is playing. 
pub fn list_playing_apps() -> Result> { let output = Command::new("pactl") .args(["-f", "json", "list", "sink-inputs"]) .output() .context("failed to run `pactl -f json list sink-inputs`")?; if !output.status.success() { bail!( "pactl list sink-inputs failed: {}", String::from_utf8_lossy(&output.stderr).trim() ); } parse_sink_inputs(&output.stdout) } fn parse_sink_inputs(stdout: &[u8]) -> Result> { let entries: Vec = serde_json::from_slice(stdout).context("pactl returned unparseable JSON")?; let mut counts: BTreeMap = BTreeMap::new(); for entry in entries { let Some(name) = entry.properties.application_name else { continue; }; let trimmed = name.trim(); if trimmed.is_empty() { continue; } *counts.entry(trimmed.to_string()).or_insert(0) += 1; } Ok(counts .into_iter() .map(|(name, stream_count)| App { name, stream_count }) .collect()) } #[derive(serde::Deserialize)] struct SinkInput { properties: SinkInputProperties, } #[derive(serde::Deserialize)] struct SinkInputProperties { #[serde(rename = "application.name")] application_name: Option, } // ────────────────────────────────────────────────────────────────────── // pactl module helpers // ────────────────────────────────────────────────────────────────────── fn load_module(args: &[&str]) -> Result { let output = Command::new("pactl") .arg("load-module") .args(args) .output() .context("failed to run pactl load-module")?; if !output.status.success() { bail!( "pactl load-module failed: {}", String::from_utf8_lossy(&output.stderr).trim() ); } let id_str = String::from_utf8(output.stdout) .context("pactl returned non-UTF-8")? 
.trim() .to_string(); id_str .parse::() .with_context(|| format!("pactl returned unexpected module ID: {id_str:?}")) } fn unload_module(id: u32) { let result = Command::new("pactl") .arg("unload-module") .arg(id.to_string()) .output(); match result { Ok(output) if output.status.success() => { tracing::info!(module = id, "audio routing: unloaded pactl module"); } Ok(output) => { tracing::warn!( module = id, stderr = %String::from_utf8_lossy(&output.stderr).trim(), "audio routing: pactl unload-module exited non-zero" ); } Err(e) => { tracing::warn!( module = id, "audio routing: failed to run pactl unload-module: {e}" ); } } } // ────────────────────────────────────────────────────────────────────── // Per-stream routing (libpipewire thread) // ────────────────────────────────────────────────────────────────────── /// Command from tokio → libpipewire thread. enum Cmd { /// Clear `target.object` for everything we routed, then quit the /// MainLoop so the thread joins. Shutdown, } /// Event from libpipewire thread → tokio. The pair drives loopback /// oscillation: unload on `FirstRoutedStream`, re-load on /// `LastRoutedStreamGone`. Both fire on count-transitions (0→N and N→0 /// respectively), not on every change. enum Event { /// At least one stream is now routed to our sink. Receiver unloads /// the default-sink loopback so the filtered audio isn't doubled. FirstRoutedStream, /// The last routed stream just disappeared (app closed, paused, /// switched output). Receiver re-loads the default-sink loopback so /// the viewer doesn't go silent. LastRoutedStreamGone, } /// Handle to the libpipewire stream-router thread. pub struct StreamRouter { cmd_tx: pipewire::channel::Sender, thread: Option>, } impl StreamRouter { /// Spawn the libpipewire thread. Returns the router handle and the /// event receiver tokio side polls. 
fn spawn( filter_name: String, sink_name: String, ) -> Result<(Self, tokio::sync::mpsc::UnboundedReceiver)> { let (cmd_tx, cmd_rx) = pipewire::channel::channel::(); let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel::(); let thread = std::thread::Builder::new() .name("pixelpass-pw-router".to_string()) .spawn(move || { if let Err(e) = run_router(filter_name, sink_name, cmd_rx, event_tx) { tracing::warn!("audio routing: libpipewire thread exited with error: {e:#}"); } }) .context("failed to spawn libpipewire router thread")?; Ok(( Self { cmd_tx, thread: Some(thread), }, event_rx, )) } fn shutdown(mut self) { // Best-effort: if the send fails the thread is already gone. let _ = self.cmd_tx.send(Cmd::Shutdown); if let Some(t) = self.thread.take() && let Err(e) = t.join() { tracing::warn!("audio routing: pw thread join failed: {e:?}"); } } } /// Body of the libpipewire thread. Owns MainLoop, registry listener, and /// all PipeWire proxies for the duration of the routing session. fn run_router( filter_name: String, sink_name: String, cmd_rx: pipewire::channel::Receiver, event_tx: tokio::sync::mpsc::UnboundedSender, ) -> Result<()> { use pipewire::{self as pw, types::ObjectType}; let main_loop = pw::main_loop::MainLoopRc::new(None).context("pw main loop construction failed")?; let context = pw::context::ContextRc::new(&main_loop, None).context("pw context construction failed")?; let core = context .connect_rc(None) .context("pw core connect failed (is the daemon running?)")?; let registry = core.get_registry_rc().context("pw get_registry failed")?; let state = Rc::new(RefCell::new(RouterState { sink_serial: None, default_metadata: None, routed_node_ids: Vec::new(), pending: Vec::new(), })); // Cmd handler: clear metadata for routed streams, then quit. 
let main_loop_for_cmd = main_loop.clone(); let state_for_cmd = Rc::clone(&state); let _cmd_recv = cmd_rx.attach(main_loop.loop_(), move |cmd| match cmd { Cmd::Shutdown => { let s = state_for_cmd.borrow(); if let Some(meta) = &s.default_metadata { for &nid in &s.routed_node_ids { meta.set_property(nid, "target.object", None, None); } if !s.routed_node_ids.is_empty() { tracing::info!( n = s.routed_node_ids.len(), "audio routing: cleared target.object on routed streams before quitting" ); } } main_loop_for_cmd.quit(); } }); let filter_lower = filter_name.to_ascii_lowercase(); let sink_name_owned = sink_name.clone(); let registry_weak = registry.downgrade(); let state_for_reg = Rc::clone(&state); let event_tx_for_reg = event_tx.clone(); let state_for_remove = Rc::clone(&state); let event_tx_for_remove = event_tx.clone(); let _reg_listener = registry .add_listener_local() .global(move |obj| { let Some(reg) = registry_weak.upgrade() else { return; }; match obj.type_ { ObjectType::Node => { let Some(props) = obj.props.as_ref() else { return; }; if props.get("node.name") == Some(sink_name_owned.as_str()) { if let Some(serial) = props .get("object.serial") .and_then(|s| s.parse::().ok()) { state_for_reg.borrow_mut().sink_serial = Some(serial); tracing::info!(serial, "audio routing: pixelpass sink registered"); try_flush(&state_for_reg, &event_tx_for_reg); } return; } if props.get("media.class") != Some("Stream/Output/Audio") { return; } let Some(app) = props.get("application.name") else { return; }; if !app.eq_ignore_ascii_case(&filter_lower) { return; } tracing::info!( node_id = obj.id, %app, "audio routing: matched stream, queued for route" ); state_for_reg.borrow_mut().pending.push(obj.id); try_flush(&state_for_reg, &event_tx_for_reg); } ObjectType::Metadata => { let Some(props) = obj.props.as_ref() else { return; }; if props.get("metadata.name") != Some("default") { return; } let metadata: pw::metadata::Metadata = match reg.bind(obj) { Ok(m) => m, Err(e) => { 
tracing::warn!("audio routing: bind default metadata failed: {e}"); return; } }; state_for_reg.borrow_mut().default_metadata = Some(metadata); tracing::info!("audio routing: default metadata bound"); try_flush(&state_for_reg, &event_tx_for_reg); } _ => {} } }) .global_remove(move |id| { handle_global_remove(&state_for_remove, &event_tx_for_remove, id); }) .register(); tracing::info!(filter = %filter_name, "audio routing: pw thread running"); main_loop.run(); tracing::info!("audio routing: pw thread exiting"); Ok(()) } struct RouterState { sink_serial: Option, default_metadata: Option, routed_node_ids: Vec, pending: Vec, } /// Drop the vanished node from `routed_node_ids` and `pending`. If it /// was the last routed stream, emit `LastRoutedStreamGone` so the /// tokio side restores the default-sink loopback. fn handle_global_remove( state: &Rc>, event_tx: &tokio::sync::mpsc::UnboundedSender, id: u32, ) { let mut s = state.borrow_mut(); let was_routed = !s.routed_node_ids.is_empty(); s.routed_node_ids.retain(|&x| x != id); s.pending.retain(|&x| x != id); if was_routed && s.routed_node_ids.is_empty() { tracing::info!( node_id = id, "audio routing: last routed stream disappeared" ); let _ = event_tx.send(Event::LastRoutedStreamGone); } } /// Drain pending streams to the sink, but only once both prerequisites /// (sink serial known + default metadata bound) are in place. Emits /// `FirstRoutedStream` when routed count crosses 0→N (so it fires /// each time the count comes back up from zero, not just the first /// time — pairs with `LastRoutedStreamGone` to oscillate the loopback). 
fn try_flush( state: &Rc>, event_tx: &tokio::sync::mpsc::UnboundedSender, ) { let mut s = state.borrow_mut(); let Some(serial) = s.sink_serial else { return }; if s.default_metadata.is_none() { return; } if s.pending.is_empty() { return; } let was_empty = s.routed_node_ids.is_empty(); let serial_str = serial.to_string(); let pending = std::mem::take(&mut s.pending); if let Some(meta) = &s.default_metadata { for nid in &pending { meta.set_property(*nid, "target.object", Some("Spa:Id"), Some(&serial_str)); tracing::info!( node_id = *nid, sink_serial = serial, "audio routing: stream routed to pixelpass sink" ); } } s.routed_node_ids.extend(pending); if was_empty && !s.routed_node_ids.is_empty() { let _ = event_tx.send(Event::FirstRoutedStream); } }