From 54ebe96ca1d3aea0b1066d05ed7c751c8b86796d Mon Sep 17 00:00:00 2001 From: Mollusk Date: Fri, 22 May 2026 16:29:28 -0400 Subject: [PATCH] host/audio: oscillate loopback on stream lifecycle (session 4 of 4) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Subscribe registry.global_remove so we know when routed stream nodes vanish; drop them from routed_node_ids and emit LastRoutedStreamGone on the N→0 transition. Tokio side re-runs `pactl load-module module-loopback` with the same args as start, restoring the default-sink monitor mirror so the viewer hears system audio again instead of going silent when the routed app exits mid-session. FirstRoutedStream now fires on every 0→N transition (not just the first), so the pair oscillates cleanly: each app open/close cycle unloads → re-loads the loopback. Verified cross-machine 2026-05-22 16:29 EDT — host with Strawberry picked, laptop viewer over mpv with YouTube playing on host as a control. Strawberry audible on laptop, YouTube silent (route active). Quit Strawberry → YouTube became audible (loopback restored). Reopened Strawberry → routed again, YouTube dropped out (loopback unloaded). Clean Ctrl+C teardown. Co-Authored-By: Claude Opus 4.7 --- src/host/audio.rs | 67 ++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 64 insertions(+), 3 deletions(-) diff --git a/src/host/audio.rs b/src/host/audio.rs index fea2db1..518339e 100644 --- a/src/host/audio.rs +++ b/src/host/audio.rs @@ -85,6 +85,7 @@ impl Routing { if let Some(app) = &opts.app { let (router, mut event_rx) = StreamRouter::spawn(app.clone(), sink_name.clone())?; let loopback_for_task = Arc::clone(&loopback_arc); + let sink_name_for_task = sink_name.clone(); let event_task = tokio::spawn(async move { while let Some(ev) = event_rx.recv().await { match ev { @@ -97,6 +98,32 @@ impl Routing { unload_module(id); } } + Event::LastRoutedStreamGone => { + // Routed app exited mid-session. Restore the + // default-sink loopback so the viewer hears + // system audio again instead of silence. + if loopback_for_task.lock().unwrap().is_some() { + continue; + } + tracing::info!( + "audio routing: last routed stream gone → restoring default-sink loopback" + ); + match load_module(&[ + "module-loopback", + "source=@DEFAULT_SINK@.monitor", + &format!("sink={sink_name_for_task}"), + "latency_msec=20", + ]) { + Ok(id) => { + *loopback_for_task.lock().unwrap() = Some(id); + } + Err(e) => { + tracing::warn!( + "audio routing: failed to re-load loopback: {e:#}" + ); + } + } + } } } }); @@ -265,12 +292,18 @@ enum Cmd { Shutdown, } -/// Event from libpipewire thread → tokio. Currently a single variant — -/// session 4 will add `LastRoutedStreamGone` to drive loopback recreate. +/// Event from libpipewire thread → tokio. The pair drives loopback +/// oscillation: unload on `FirstRoutedStream`, re-load on +/// `LastRoutedStreamGone`. Both fire on count-transitions (0→N and N→0 +/// respectively), not on every change. enum Event { /// At least one stream is now routed to our sink. Receiver unloads /// the default-sink loopback so the filtered audio isn't doubled. FirstRoutedStream, + /// The last routed stream just disappeared (app closed, paused, + /// switched output). Receiver re-loads the default-sink loopback so + /// the viewer doesn't go silent. + LastRoutedStreamGone, } /// Handle to the libpipewire stream-router thread. @@ -372,6 +405,8 @@ fn run_router( let registry_weak = registry.downgrade(); let state_for_reg = Rc::clone(&state); let event_tx_for_reg = event_tx.clone(); + let state_for_remove = Rc::clone(&state); + let event_tx_for_remove = event_tx.clone(); let _reg_listener = registry .add_listener_local() @@ -428,6 +463,9 @@ fn run_router( _ => {} } }) + .global_remove(move |id| { + handle_global_remove(&state_for_remove, &event_tx_for_remove, id); + }) .register(); tracing::info!(filter = %filter_name, "audio routing: pw thread running"); @@ -443,9 +481,32 @@ struct RouterState { pending: Vec, } +/// Drop the vanished node from `routed_node_ids` and `pending`. If it +/// was the last routed stream, emit `LastRoutedStreamGone` so the +/// tokio side restores the default-sink loopback. +fn handle_global_remove( + state: &Rc>, + event_tx: &tokio::sync::mpsc::UnboundedSender, + id: u32, +) { + let mut s = state.borrow_mut(); + let was_routed = !s.routed_node_ids.is_empty(); + s.routed_node_ids.retain(|&x| x != id); + s.pending.retain(|&x| x != id); + if was_routed && s.routed_node_ids.is_empty() { + tracing::info!( + node_id = id, + "audio routing: last routed stream disappeared" + ); + let _ = event_tx.send(Event::LastRoutedStreamGone); + } +} + /// Drain pending streams to the sink, but only once both prerequisites /// (sink serial known + default metadata bound) are in place. Emits -/// `FirstRoutedStream` exactly once — when routed count crosses 0→N. +/// `FirstRoutedStream` when routed count crosses 0→N (so it fires +/// each time the count comes back up from zero, not just the first +/// time — pairs with `LastRoutedStreamGone` to oscillate the loopback). fn try_flush( state: &Rc>, event_tx: &tokio::sync::mpsc::UnboundedSender,