host/audio: per-PID PipeWire null-sink + loopback scaffolding
Session 1 of the per-app audio routing feature. Adds host/audio.rs
with a Routing struct that owns the lifecycle of two pactl-loaded
modules: a per-PID null-sink (pixelpass_capture_<pid>) and a loopback
mirroring @DEFAULT_SINK@.monitor into it at 20ms latency. Activated
by PIXELPASS_AUDIO_VIA_NULL_SINK=1 — kept hidden behind an env var
because without per-stream filtering (session 3) the user-facing
behavior of --app foo would be identical to no flag, which would
mislead users about what the flag does.
When the env var is set, wayland::start substitutes the gst pulsesrc
device from {DEFAULT_SINK}.monitor to pixelpass_capture_<pid>.monitor;
audio still works end-to-end via the loopback. CaptureHandle owns the
Routing alongside gst and serve; teardown order is gst → audio → serve
so streams unlink from the null-sink before the sink is destroyed.
Lifecycle is via pactl shell-outs rather than pipewire-rs. Null-sink
+ loopback are one-shot graph mutations with no event subscription;
the libpipewire route would mean dragging a MainLoop thread in for no
benefit until session 3 needs stream events.
Known cosmetic: the null-sink appears in Plasma's audio mixer as a
user-facing volume slider. Pactl's sink_properties= quoting is fiddly
enough that the device.hidden=true fix is parked for a follow-up.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,136 @@
|
|||||||
|
//! Per-app audio routing scaffolding.
|
||||||
|
//!
|
||||||
|
//! Session 1: owns a per-PID PipeWire null-sink and a loopback that
|
||||||
|
//! mirrors the default sink's monitor into it, so the gst pipeline can
|
||||||
|
//! `pulsesrc` from `pixelpass_capture_<pid>.monitor` and still hear all
|
||||||
|
//! system audio. No per-stream filtering yet — that lands in session 3,
|
||||||
|
//! at which point the loopback gets dropped once any stream is being
|
||||||
|
//! routed exclusively.
|
||||||
|
//!
|
||||||
|
//! Module lifecycle is via `pactl load-module` / `pactl unload-module`
|
||||||
|
//! shell-outs. PipeWire offers a Rust binding (already in deps) but
|
||||||
|
//! null-sink + loopback are one-shot graph mutations with no event
|
||||||
|
//! subscription; the shell-out is simpler and avoids dragging a
|
||||||
|
//! pipewire MainLoop thread into the picture until session 3 actually
|
||||||
|
//! needs it for stream events.
|
||||||
|
|
||||||
|
use anyhow::{Context, Result, bail};
|
||||||
|
use std::process::Command;
|
||||||
|
|
||||||
|
/// Owns the pactl module IDs for the null-sink and its loopback. Drop
|
||||||
|
/// unloads both as a backstop; prefer calling [`Routing::shutdown`]
|
||||||
|
/// explicitly so failures get logged.
|
||||||
|
pub struct Routing {
|
||||||
|
sink_module: Option<u32>,
|
||||||
|
loopback_module: Option<u32>,
|
||||||
|
sink_name: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Routing {
|
||||||
|
/// Create a per-PID null-sink and mirror the default sink's monitor
|
||||||
|
/// into it. Returns once both modules are loaded.
|
||||||
|
pub fn start() -> Result<Self> {
|
||||||
|
let pid = std::process::id();
|
||||||
|
let sink_name = format!("pixelpass_capture_{pid}");
|
||||||
|
|
||||||
|
let sink_module = load_module(&[
|
||||||
|
"module-null-sink",
|
||||||
|
&format!("sink_name={sink_name}"),
|
||||||
|
])
|
||||||
|
.context("failed to load module-null-sink")?;
|
||||||
|
|
||||||
|
let mut routing = Self {
|
||||||
|
sink_module: Some(sink_module),
|
||||||
|
loopback_module: None,
|
||||||
|
sink_name: sink_name.clone(),
|
||||||
|
};
|
||||||
|
|
||||||
|
// 20ms loopback latency keeps the mirrored audio tight; pactl's
|
||||||
|
// default of 200ms is enough to be perceptible.
|
||||||
|
let loopback_module = load_module(&[
|
||||||
|
"module-loopback",
|
||||||
|
"source=@DEFAULT_SINK@.monitor",
|
||||||
|
&format!("sink={sink_name}"),
|
||||||
|
"latency_msec=20",
|
||||||
|
])
|
||||||
|
.context("failed to load module-loopback (null-sink will be cleaned up on Drop)")?;
|
||||||
|
routing.loopback_module = Some(loopback_module);
|
||||||
|
|
||||||
|
tracing::info!(
|
||||||
|
sink_module,
|
||||||
|
loopback_module,
|
||||||
|
%sink_name,
|
||||||
|
"audio routing: null-sink + loopback ready"
|
||||||
|
);
|
||||||
|
Ok(routing)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn sink_name(&self) -> &str {
|
||||||
|
&self.sink_name
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Unload loopback first, then the null-sink. Order matters: PipeWire
|
||||||
|
/// can leave zombie links if you destroy a sink with active inputs.
|
||||||
|
pub fn shutdown(mut self) {
|
||||||
|
if let Some(id) = self.loopback_module.take() {
|
||||||
|
unload_module(id);
|
||||||
|
}
|
||||||
|
if let Some(id) = self.sink_module.take() {
|
||||||
|
unload_module(id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for Routing {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
if let Some(id) = self.loopback_module.take() {
|
||||||
|
unload_module(id);
|
||||||
|
}
|
||||||
|
if let Some(id) = self.sink_module.take() {
|
||||||
|
unload_module(id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn load_module(args: &[&str]) -> Result<u32> {
|
||||||
|
let output = Command::new("pactl")
|
||||||
|
.arg("load-module")
|
||||||
|
.args(args)
|
||||||
|
.output()
|
||||||
|
.context("failed to run pactl load-module")?;
|
||||||
|
if !output.status.success() {
|
||||||
|
bail!(
|
||||||
|
"pactl load-module failed: {}",
|
||||||
|
String::from_utf8_lossy(&output.stderr).trim()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
let id_str = String::from_utf8(output.stdout)
|
||||||
|
.context("pactl returned non-UTF-8")?
|
||||||
|
.trim()
|
||||||
|
.to_string();
|
||||||
|
id_str
|
||||||
|
.parse::<u32>()
|
||||||
|
.with_context(|| format!("pactl returned unexpected module ID: {id_str:?}"))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn unload_module(id: u32) {
|
||||||
|
let result = Command::new("pactl")
|
||||||
|
.arg("unload-module")
|
||||||
|
.arg(id.to_string())
|
||||||
|
.output();
|
||||||
|
match result {
|
||||||
|
Ok(output) if output.status.success() => {
|
||||||
|
tracing::info!(module = id, "audio routing: unloaded pactl module");
|
||||||
|
}
|
||||||
|
Ok(output) => {
|
||||||
|
tracing::warn!(
|
||||||
|
module = id,
|
||||||
|
stderr = %String::from_utf8_lossy(&output.stderr).trim(),
|
||||||
|
"audio routing: pactl unload-module exited non-zero"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!(module = id, "audio routing: failed to run pactl unload-module: {e}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,3 +1,4 @@
|
|||||||
|
mod audio;
|
||||||
mod capture;
|
mod capture;
|
||||||
mod serve;
|
mod serve;
|
||||||
mod wayland;
|
mod wayland;
|
||||||
|
|||||||
+27
-5
@@ -19,11 +19,13 @@ use std::time::Duration;
|
|||||||
use tokio::process::{Child, Command};
|
use tokio::process::{Child, Command};
|
||||||
use tokio::time::timeout;
|
use tokio::time::timeout;
|
||||||
|
|
||||||
|
use super::audio::Routing;
|
||||||
use super::serve::Serve;
|
use super::serve::Serve;
|
||||||
use crate::cli::HostOpts;
|
use crate::cli::HostOpts;
|
||||||
|
|
||||||
pub struct CaptureHandle {
|
pub struct CaptureHandle {
|
||||||
gst: Option<Child>,
|
gst: Option<Child>,
|
||||||
|
audio: Option<Routing>,
|
||||||
serve: Option<Serve>,
|
serve: Option<Serve>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -36,8 +38,9 @@ impl CaptureHandle {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Graceful teardown: SIGTERM gst, give it ~1s to exit, then SIGKILL,
|
/// Graceful teardown: SIGTERM gst, give it ~1s to exit, then SIGKILL,
|
||||||
/// then tear down the serve layer. The serve reader will see EOF on
|
/// unload audio routing (if any), then tear down the serve layer.
|
||||||
/// gst stdout and exit on its own; serve.shutdown() is the backstop.
|
/// The serve reader will see EOF on gst stdout and exit on its own;
|
||||||
|
/// serve.shutdown() is the backstop.
|
||||||
pub async fn shutdown(mut self) {
|
pub async fn shutdown(mut self) {
|
||||||
if let Some(child) = self.gst.as_mut()
|
if let Some(child) = self.gst.as_mut()
|
||||||
&& let Some(pid) = child.id()
|
&& let Some(pid) = child.id()
|
||||||
@@ -48,6 +51,9 @@ impl CaptureHandle {
|
|||||||
let _ = timeout(Duration::from_millis(1000), child.wait()).await;
|
let _ = timeout(Duration::from_millis(1000), child.wait()).await;
|
||||||
let _ = child.start_kill();
|
let _ = child.start_kill();
|
||||||
}
|
}
|
||||||
|
if let Some(audio) = self.audio.take() {
|
||||||
|
audio.shutdown();
|
||||||
|
}
|
||||||
if let Some(serve) = self.serve.take() {
|
if let Some(serve) = self.serve.take() {
|
||||||
serve.shutdown().await;
|
serve.shutdown().await;
|
||||||
}
|
}
|
||||||
@@ -59,7 +65,7 @@ impl Drop for CaptureHandle {
|
|||||||
if let Some(child) = self.gst.as_mut() {
|
if let Some(child) = self.gst.as_mut() {
|
||||||
let _ = child.start_kill();
|
let _ = child.start_kill();
|
||||||
}
|
}
|
||||||
// Serve's own Drop aborts its tasks.
|
// Routing's and Serve's own Drop impls handle the rest.
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -112,8 +118,23 @@ pub async fn start(opts: &HostOpts) -> Result<CaptureHandle> {
|
|||||||
// no codec assumptions.
|
// no codec assumptions.
|
||||||
let key_interval = (opts.framerate * 2).to_string();
|
let key_interval = (opts.framerate * 2).to_string();
|
||||||
let bitrate = opts.bitrate.to_string();
|
let bitrate = opts.bitrate.to_string();
|
||||||
let audio_monitor = default_audio_monitor().await?;
|
|
||||||
let audio_device = format!("device={audio_monitor}");
|
// PIXELPASS_AUDIO_VIA_NULL_SINK=1 routes audio through a per-PID
|
||||||
|
// PipeWire null-sink + default-sink loopback instead of tapping the
|
||||||
|
// default sink's monitor directly. Same audio captured either way at
|
||||||
|
// this stage; the null-sink path is groundwork for per-app filtering
|
||||||
|
// in a follow-up.
|
||||||
|
let audio_routing = if std::env::var_os("PIXELPASS_AUDIO_VIA_NULL_SINK").is_some() {
|
||||||
|
Some(Routing::start().context("audio routing setup failed")?)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
let audio_device = if let Some(r) = &audio_routing {
|
||||||
|
format!("device={}.monitor", r.sink_name())
|
||||||
|
} else {
|
||||||
|
let default = default_audio_monitor().await?;
|
||||||
|
format!("device={default}")
|
||||||
|
};
|
||||||
let mut gst_cmd = Command::new("gst-launch-1.0");
|
let mut gst_cmd = Command::new("gst-launch-1.0");
|
||||||
gst_cmd
|
gst_cmd
|
||||||
.args([
|
.args([
|
||||||
@@ -196,6 +217,7 @@ pub async fn start(opts: &HostOpts) -> Result<CaptureHandle> {
|
|||||||
|
|
||||||
Ok(CaptureHandle {
|
Ok(CaptureHandle {
|
||||||
gst: Some(gst),
|
gst: Some(gst),
|
||||||
|
audio: audio_routing,
|
||||||
serve: Some(serve),
|
serve: Some(serve),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user