host/audio: oscillate loopback on stream lifecycle (session 4 of 4)

Subscribe registry.global_remove so we know when routed stream nodes
vanish; drop them from routed_node_ids and emit LastRoutedStreamGone
on the N→0 transition. Tokio side re-runs `pactl load-module
module-loopback` with the same args as start, restoring the
default-sink monitor mirror so the viewer hears system audio again
instead of going silent when the routed app exits mid-session.

FirstRoutedStream now fires on every 0→N transition (not just the
first), so the pair oscillates cleanly: each app open/close cycle
unloads → re-loads the loopback.

Verified cross-machine 2026-05-22 16:29 EDT — host with Strawberry
picked, laptop viewer over mpv with YouTube playing on host as a
control. Strawberry audible on laptop, YouTube silent (route active).
Quit Strawberry → YouTube became audible (loopback restored).
Reopened Strawberry → routed again, YouTube dropped out (loopback
unloaded). Clean Ctrl+C teardown.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-22 16:29:28 -04:00
parent a144665f41
commit 54ebe96ca1
+64 -3
View File
@@ -85,6 +85,7 @@ impl Routing {
if let Some(app) = &opts.app {
let (router, mut event_rx) = StreamRouter::spawn(app.clone(), sink_name.clone())?;
let loopback_for_task = Arc::clone(&loopback_arc);
let sink_name_for_task = sink_name.clone();
let event_task = tokio::spawn(async move {
while let Some(ev) = event_rx.recv().await {
match ev {
@@ -97,6 +98,32 @@ impl Routing {
unload_module(id);
}
}
Event::LastRoutedStreamGone => {
// Routed app exited mid-session. Restore the
// default-sink loopback so the viewer hears
// system audio again instead of silence.
if loopback_for_task.lock().unwrap().is_some() {
continue;
}
tracing::info!(
"audio routing: last routed stream gone → restoring default-sink loopback"
);
match load_module(&[
"module-loopback",
"source=@DEFAULT_SINK@.monitor",
&format!("sink={sink_name_for_task}"),
"latency_msec=20",
]) {
Ok(id) => {
*loopback_for_task.lock().unwrap() = Some(id);
}
Err(e) => {
tracing::warn!(
"audio routing: failed to re-load loopback: {e:#}"
);
}
}
}
}
}
});
@@ -265,12 +292,18 @@ enum Cmd {
Shutdown,
}
/// Event from libpipewire thread → tokio. Currently a single variant —
/// session 4 will add `LastRoutedStreamGone` to drive loopback recreate.
/// Event from libpipewire thread → tokio. The pair drives loopback
/// oscillation: unload on `FirstRoutedStream`, re-load on
/// `LastRoutedStreamGone`. Both fire on count-transitions (0→N and N→0
/// respectively), not on every change.
enum Event {
/// At least one stream is now routed to our sink. Receiver unloads
/// the default-sink loopback so the filtered audio isn't doubled.
FirstRoutedStream,
/// The last routed stream just disappeared (app closed, paused,
/// switched output). Receiver re-loads the default-sink loopback so
/// the viewer doesn't go silent.
LastRoutedStreamGone,
}
/// Handle to the libpipewire stream-router thread.
@@ -372,6 +405,8 @@ fn run_router(
let registry_weak = registry.downgrade();
let state_for_reg = Rc::clone(&state);
let event_tx_for_reg = event_tx.clone();
let state_for_remove = Rc::clone(&state);
let event_tx_for_remove = event_tx.clone();
let _reg_listener = registry
.add_listener_local()
@@ -428,6 +463,9 @@ fn run_router(
_ => {}
}
})
.global_remove(move |id| {
handle_global_remove(&state_for_remove, &event_tx_for_remove, id);
})
.register();
tracing::info!(filter = %filter_name, "audio routing: pw thread running");
@@ -443,9 +481,32 @@ struct RouterState {
pending: Vec<u32>,
}
/// Drop the vanished node from `routed_node_ids` and `pending`. If it
/// was the last routed stream, emit `LastRoutedStreamGone` so the
/// tokio side restores the default-sink loopback.
fn handle_global_remove(
state: &Rc<RefCell<RouterState>>,
event_tx: &tokio::sync::mpsc::UnboundedSender<Event>,
id: u32,
) {
let mut s = state.borrow_mut();
let was_routed = !s.routed_node_ids.is_empty();
s.routed_node_ids.retain(|&x| x != id);
s.pending.retain(|&x| x != id);
if was_routed && s.routed_node_ids.is_empty() {
tracing::info!(
node_id = id,
"audio routing: last routed stream disappeared"
);
let _ = event_tx.send(Event::LastRoutedStreamGone);
}
}
/// Drain pending streams to the sink, but only once both prerequisites
/// (sink serial known + default metadata bound) are in place. Emits
/// `FirstRoutedStream` exactly once — when routed count crosses 0→N.
/// `FirstRoutedStream` when routed count crosses 0→N (so it fires
/// each time the count comes back up from zero, not just the first
/// time — pairs with `LastRoutedStreamGone` to oscillate the loopback).
fn try_flush(
state: &Rc<RefCell<RouterState>>,
event_tx: &tokio::sync::mpsc::UnboundedSender<Event>,