multi-viewer: broadcast fanout + supervisor lifecycle
One gst capture pipeline now fans out to N concurrent viewers via a tokio::sync::broadcast<Arc<Vec<u8>>>. The HTTP listener accepts forever; each accepted connection spawns a sender task draining its own broadcast::Receiver. Slow consumers see Lagged and skip ahead — MPEG-TS resyncs at the next keyframe. Host runtime is now lazy + sticky: a supervisor task owns the capture handle and viewer count. First viewer triggers capture::spawn; last viewer triggers shutdown. Subsequent reconnects re-trigger the portal dialog as expected. --max-viewers (default 2) caps concurrent viewers; additional connections get a "host is full" refusal and are dropped. Banner updated to reflect the new lifecycle and viewer cap. NOT YET RUNTIME-VERIFIED. cargo build is clean and the pipeline-level smoke test still passes, but the multi-viewer behavior (cap enforcement, lazy-sticky restart, concurrent fanout) requires manual end-to-end testing with the portal dialog + multiple mpv instances. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -45,6 +45,11 @@ pub struct Cli {
|
||||
#[arg(long)]
|
||||
pub low_latency: bool,
|
||||
|
||||
/// Maximum number of concurrent viewers. Additional connections are
|
||||
/// politely refused with a "host full" message.
|
||||
#[arg(long, default_value_t = 2)]
|
||||
pub max_viewers: u32,
|
||||
|
||||
// ── viewer options ────────────────────────────────────────────────
|
||||
/// Local TCP port for the viewer to expose (default: random).
|
||||
#[arg(long, default_value_t = 0)]
|
||||
@@ -76,6 +81,7 @@ pub struct HostOpts {
|
||||
pub framerate: u32,
|
||||
pub no_hwencode: bool,
|
||||
pub low_latency: bool,
|
||||
pub max_viewers: u32,
|
||||
pub interactive: bool,
|
||||
}
|
||||
|
||||
@@ -96,6 +102,7 @@ impl Cli {
|
||||
framerate: self.framerate,
|
||||
no_hwencode: self.no_hwencode,
|
||||
low_latency: self.low_latency,
|
||||
max_viewers: self.max_viewers,
|
||||
interactive,
|
||||
}
|
||||
}
|
||||
|
||||
+161
-37
@@ -5,10 +5,25 @@ use anyhow::{Result, bail};
|
||||
use iroh::Endpoint;
|
||||
use iroh::endpoint::{Connection, presets};
|
||||
use iroh_tickets::endpoint::EndpointTicket;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use crate::cli::HostOpts;
|
||||
use crate::common::{alpn::ALPN, deps, display::DisplayServer, signal};
|
||||
use crate::common::{alpn::ALPN, deps, display::DisplayServer, signal, tunnel};
|
||||
|
||||
use self::capture::CaptureHandle;
|
||||
|
||||
/// Messages from per-viewer tasks to the capture supervisor.
|
||||
enum SupervisorMsg {
|
||||
/// A new viewer wants in. Supervisor replies with the local capture
|
||||
/// HTTP port to connect to, or an error string if the host is full or
|
||||
/// capture spawn failed.
|
||||
AddViewer(oneshot::Sender<Result<u16, String>>),
|
||||
/// A viewer's session ended. Supervisor decrements the count and tears
|
||||
/// down capture if it just hit zero.
|
||||
RemoveViewer,
|
||||
}
|
||||
|
||||
pub async fn run(opts: HostOpts) -> Result<()> {
|
||||
let display = DisplayServer::resolve(opts.display_server);
|
||||
@@ -21,6 +36,10 @@ pub async fn run(opts: HostOpts) -> Result<()> {
|
||||
);
|
||||
}
|
||||
|
||||
if opts.max_viewers == 0 {
|
||||
bail!("--max-viewers must be at least 1");
|
||||
}
|
||||
|
||||
let cancel = signal::install_ctrl_c();
|
||||
|
||||
let endpoint = Endpoint::builder(presets::N0)
|
||||
@@ -33,65 +52,168 @@ pub async fn run(opts: HostOpts) -> Result<()> {
|
||||
let clipboard_ok = opts.interactive && copy_to_clipboard(&ticket.to_string());
|
||||
print_host_banner(&ticket, display, &opts, clipboard_ok);
|
||||
|
||||
let result = accept_loop(&endpoint, display, &opts, cancel.clone()).await;
|
||||
let (sup_tx, sup_rx) = mpsc::channel::<SupervisorMsg>(16);
|
||||
let supervisor = tokio::spawn(supervise(opts.clone(), display, sup_rx));
|
||||
|
||||
accept_loop(&endpoint, sup_tx.clone(), cancel.clone()).await;
|
||||
|
||||
drop(sup_tx);
|
||||
let _ = supervisor.await;
|
||||
|
||||
endpoint.close().await;
|
||||
result
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn accept_loop(
|
||||
endpoint: &Endpoint,
|
||||
display: DisplayServer,
|
||||
opts: &HostOpts,
|
||||
sup_tx: mpsc::Sender<SupervisorMsg>,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<()> {
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => {
|
||||
tracing::info!("cancellation requested before any peer connected");
|
||||
Ok(())
|
||||
}
|
||||
accepted = endpoint.accept() => {
|
||||
let Some(incoming) = accepted else {
|
||||
bail!("endpoint stopped accepting connections");
|
||||
};
|
||||
let conn = incoming.await?;
|
||||
let remote = conn.remote_id();
|
||||
tracing::info!(%remote, "peer connected");
|
||||
eprintln!("\n[pixelpass] peer connected: {remote}\n");
|
||||
handle_peer(conn, display, opts, cancel).await
|
||||
) {
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => {
|
||||
tracing::info!("cancellation requested — closing accept loop");
|
||||
return;
|
||||
}
|
||||
accepted = endpoint.accept() => {
|
||||
let Some(incoming) = accepted else {
|
||||
tracing::info!("endpoint stopped accepting connections");
|
||||
return;
|
||||
};
|
||||
let conn = match incoming.await {
|
||||
Ok(c) => c,
|
||||
Err(e) => {
|
||||
tracing::warn!("incoming connection failed: {e:#}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let sup_tx = sup_tx.clone();
|
||||
let cancel = cancel.clone();
|
||||
tokio::spawn(handle_peer(conn, sup_tx, cancel));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_peer(
|
||||
conn: Connection,
|
||||
display: DisplayServer,
|
||||
opts: &HostOpts,
|
||||
sup_tx: mpsc::Sender<SupervisorMsg>,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<()> {
|
||||
let (quic_send, quic_recv) = conn.accept_bi().await?;
|
||||
) {
|
||||
let remote = conn.remote_id();
|
||||
|
||||
let capture_handle = capture::spawn(display, opts).await?;
|
||||
let port = capture_handle.local_port();
|
||||
let tcp = wayland::connect_to_capture(port, std::time::Duration::from_secs(5)).await?;
|
||||
let (reply_tx, reply_rx) = oneshot::channel();
|
||||
if sup_tx.send(SupervisorMsg::AddViewer(reply_tx)).await.is_err() {
|
||||
tracing::warn!(%remote, "supervisor channel closed; dropping peer");
|
||||
return;
|
||||
}
|
||||
let port = match reply_rx.await {
|
||||
Ok(Ok(p)) => p,
|
||||
Ok(Err(reason)) => {
|
||||
tracing::warn!(%remote, %reason, "refusing viewer");
|
||||
eprintln!("[pixelpass] refusing viewer {remote}: {reason}");
|
||||
return;
|
||||
}
|
||||
Err(_) => {
|
||||
tracing::warn!(%remote, "supervisor reply dropped; dropping peer");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let bridge = crate::common::tunnel::bridge(quic_send, quic_recv, tcp);
|
||||
let (quic_send, quic_recv) = match conn.accept_bi().await {
|
||||
Ok(s) => s,
|
||||
Err(e) => {
|
||||
tracing::warn!(%remote, "accept_bi failed: {e:#}");
|
||||
let _ = sup_tx.send(SupervisorMsg::RemoveViewer).await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
eprintln!("[pixelpass] viewer connected: {remote}");
|
||||
|
||||
let tcp = match wayland::connect_to_capture(port, Duration::from_secs(5)).await {
|
||||
Ok(t) => t,
|
||||
Err(e) => {
|
||||
tracing::warn!(%remote, "connect_to_capture failed: {e:#}");
|
||||
let _ = sup_tx.send(SupervisorMsg::RemoveViewer).await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let bridge = tunnel::bridge(quic_send, quic_recv, tcp);
|
||||
tokio::select! {
|
||||
res = bridge => {
|
||||
if let Err(e) = res {
|
||||
tracing::warn!("bridge ended with error: {e:#}");
|
||||
tracing::warn!(%remote, "bridge ended with error: {e:#}");
|
||||
} else {
|
||||
tracing::info!("bridge closed cleanly");
|
||||
tracing::info!(%remote, "bridge closed cleanly");
|
||||
}
|
||||
}
|
||||
_ = cancel.cancelled() => {
|
||||
tracing::info!("cancellation requested during stream");
|
||||
tracing::info!(%remote, "cancellation during stream");
|
||||
}
|
||||
}
|
||||
|
||||
capture_handle.shutdown().await;
|
||||
Ok(())
|
||||
eprintln!("[pixelpass] viewer disconnected: {remote}");
|
||||
let _ = sup_tx.send(SupervisorMsg::RemoveViewer).await;
|
||||
}
|
||||
|
||||
/// Owns the single shared CaptureHandle and the active viewer count. Spawns
|
||||
/// capture lazily on the first AddViewer; tears it down when the count drops
|
||||
/// back to zero. Enforces the --max-viewers cap by refusing AddViewer when
|
||||
/// the count is already at the cap.
|
||||
async fn supervise(
|
||||
opts: HostOpts,
|
||||
display: DisplayServer,
|
||||
mut rx: mpsc::Receiver<SupervisorMsg>,
|
||||
) {
|
||||
let mut handle: Option<CaptureHandle> = None;
|
||||
let mut count: u32 = 0;
|
||||
|
||||
while let Some(msg) = rx.recv().await {
|
||||
match msg {
|
||||
SupervisorMsg::AddViewer(reply) => {
|
||||
if count >= opts.max_viewers {
|
||||
let _ = reply.send(Err(format!(
|
||||
"host is full ({} of {} viewers connected)",
|
||||
count, opts.max_viewers
|
||||
)));
|
||||
continue;
|
||||
}
|
||||
|
||||
if handle.is_none() {
|
||||
tracing::info!("first viewer arriving — spawning capture");
|
||||
match capture::spawn(display, &opts).await {
|
||||
Ok(h) => handle = Some(h),
|
||||
Err(e) => {
|
||||
let _ = reply.send(Err(format!("capture spawn failed: {e:#}")));
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let port = handle.as_ref().expect("handle was just set").local_port();
|
||||
count += 1;
|
||||
let _ = reply.send(Ok(port));
|
||||
tracing::info!(active = count, cap = opts.max_viewers, "viewer joined");
|
||||
}
|
||||
SupervisorMsg::RemoveViewer => {
|
||||
count = count.saturating_sub(1);
|
||||
tracing::info!(active = count, cap = opts.max_viewers, "viewer left");
|
||||
if count == 0
|
||||
&& let Some(h) = handle.take()
|
||||
{
|
||||
tracing::info!("last viewer left — tearing down capture");
|
||||
h.shutdown().await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(h) = handle.take() {
|
||||
tracing::info!("host shutdown — tearing down capture");
|
||||
h.shutdown().await;
|
||||
}
|
||||
}
|
||||
|
||||
fn print_host_banner(
|
||||
@@ -106,19 +228,21 @@ fn print_host_banner(
|
||||
eprintln!("│ capture : {}", capture_summary(opts));
|
||||
eprintln!("│ bitrate / fps : {} kbps @ {} fps", opts.bitrate, opts.framerate);
|
||||
eprintln!("│ hw encode : {}", if opts.no_hwencode { "off" } else { "auto (VAAPI if available)" });
|
||||
eprintln!("│ max viewers : {}", opts.max_viewers);
|
||||
eprintln!("│");
|
||||
if clipboard_ok {
|
||||
eprintln!("│ Your share code has been copied to your clipboard.");
|
||||
eprintln!("│ Send it to your viewer. (If clipboard didn't work, the");
|
||||
eprintln!("│ Send it to your viewer(s). (If clipboard didn't work, the");
|
||||
eprintln!("│ code is also shown below for manual copy.)");
|
||||
} else {
|
||||
eprintln!("│ Share this ticket with your viewer:");
|
||||
eprintln!("│ Share this ticket with your viewer(s):");
|
||||
}
|
||||
eprintln!("│");
|
||||
eprintln!("│ pixelpass {ticket}");
|
||||
eprintln!("│");
|
||||
eprintln!("│ Capture will not start until the viewer connects.");
|
||||
eprintln!("│ Press Ctrl+C to stop.");
|
||||
eprintln!("│ Capture starts when the first viewer connects, runs while");
|
||||
eprintln!("│ any viewer is connected, and tears down when the last one");
|
||||
eprintln!("│ leaves. Press Ctrl+C to stop the host entirely.");
|
||||
eprintln!("└────────────────────────────────────────────────────────────");
|
||||
eprintln!();
|
||||
}
|
||||
|
||||
+94
-17
@@ -1,7 +1,10 @@
|
||||
//! Wayland capture: ashpd ScreenCast portal → PipeWire fd → gst-launch
|
||||
//! pipewiresrc → MPEG-TS on gst stdout → in-process HTTP server bound on a
|
||||
//! random localhost port. The host bridge TCP-connects to that server and
|
||||
//! pumps bytes to QUIC.
|
||||
//! random localhost port. One gst child feeds a tokio::sync::broadcast channel;
|
||||
//! the HTTP listener accepts multiple connections and each one drains its own
|
||||
//! fresh broadcast::Receiver — so a single capture pipeline fans out to N
|
||||
//! concurrent viewers. Slow consumers see Lagged and skip ahead; the MPEG-TS
|
||||
//! stream resyncs at the next keyframe.
|
||||
|
||||
use anyhow::{Context, Result, bail};
|
||||
use ashpd::{
|
||||
@@ -16,18 +19,30 @@ use nix::sys::signal::{Signal, kill};
|
||||
use nix::unistd::{Pid, close};
|
||||
use std::os::fd::{AsFd, IntoRawFd, OwnedFd, RawFd};
|
||||
use std::process::Stdio;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::net::{TcpListener, TcpStream};
|
||||
use tokio::process::{Child, ChildStdout, Command};
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::time::{Instant, sleep, timeout};
|
||||
|
||||
use crate::cli::HostOpts;
|
||||
|
||||
/// Broadcast-channel capacity in chunks. Each chunk is up to 64 KiB from gst
|
||||
/// stdout, so 16 chunks ≈ 1 MiB ≈ ~2 s of buffered jitter at the default
|
||||
/// 4 Mbps bitrate. A viewer that falls behind by more than this gets Lagged
|
||||
/// and skips ahead — MPEG-TS resyncs at the next keyframe.
|
||||
const FANOUT_CAPACITY: usize = 16;
|
||||
|
||||
/// Size of each chunk read from gst stdout.
|
||||
const READ_CHUNK: usize = 64 * 1024;
|
||||
|
||||
pub struct CaptureHandle {
|
||||
port: u16,
|
||||
gst: Option<Child>,
|
||||
reader: Option<JoinHandle<()>>,
|
||||
server: Option<JoinHandle<()>>,
|
||||
}
|
||||
|
||||
@@ -37,8 +52,8 @@ impl CaptureHandle {
|
||||
}
|
||||
|
||||
/// Graceful teardown: SIGTERM gst, give it ~1s to exit, then SIGKILL, then
|
||||
/// abort the HTTP server task. Call this before dropping; Drop only fires
|
||||
/// the kill backstop.
|
||||
/// abort the reader + accept-loop tasks. Call this before dropping; Drop
|
||||
/// only fires the kill backstop.
|
||||
pub async fn shutdown(mut self) {
|
||||
if let Some(child) = self.gst.as_mut()
|
||||
&& let Some(pid) = child.id()
|
||||
@@ -49,6 +64,9 @@ impl CaptureHandle {
|
||||
let _ = timeout(Duration::from_millis(1000), child.wait()).await;
|
||||
let _ = child.start_kill();
|
||||
}
|
||||
if let Some(task) = self.reader.take() {
|
||||
task.abort();
|
||||
}
|
||||
if let Some(task) = self.server.take() {
|
||||
task.abort();
|
||||
}
|
||||
@@ -60,6 +78,9 @@ impl Drop for CaptureHandle {
|
||||
if let Some(child) = self.gst.as_mut() {
|
||||
let _ = child.start_kill();
|
||||
}
|
||||
if let Some(task) = self.reader.as_ref() {
|
||||
task.abort();
|
||||
}
|
||||
if let Some(task) = self.server.as_ref() {
|
||||
task.abort();
|
||||
}
|
||||
@@ -199,28 +220,68 @@ pub async fn start(opts: &HostOpts) -> Result<CaptureHandle> {
|
||||
.take()
|
||||
.context("gst-launch-1.0 stdout pipe unavailable")?;
|
||||
|
||||
// 4. Spawn the HTTP server task. It owns the listener + gst stdout: it
|
||||
// accepts one client (the host's bridge socket via connect_to_capture),
|
||||
// drains the HTTP request, writes a fixed MPEG-TS response, then
|
||||
// copies gst stdout to the socket forever.
|
||||
let server = tokio::spawn(serve_capture(listener, gst_stdout));
|
||||
// 4. Set up the broadcast fanout. The reader task pumps gst stdout chunks
|
||||
// into the channel; the accept-loop task spawns one sender task per
|
||||
// accepted TCP connection, each draining a fresh broadcast::Receiver.
|
||||
let (tx, _) = broadcast::channel::<Arc<Vec<u8>>>(FANOUT_CAPACITY);
|
||||
|
||||
let reader = tokio::spawn(pump_gst_to_broadcast(gst_stdout, tx.clone()));
|
||||
let server = tokio::spawn(run_accept_loop(listener, tx));
|
||||
|
||||
Ok(CaptureHandle {
|
||||
port,
|
||||
gst: Some(gst),
|
||||
reader: Some(reader),
|
||||
server: Some(server),
|
||||
})
|
||||
}
|
||||
|
||||
async fn serve_capture(listener: TcpListener, mut gst_stdout: ChildStdout) {
|
||||
let mut sock = match listener.accept().await {
|
||||
Ok((s, _)) => s,
|
||||
Err(e) => {
|
||||
tracing::warn!("capture HTTP accept failed: {e}");
|
||||
return;
|
||||
/// Reads gst's stdout in chunks and broadcasts each to all current subscribers.
|
||||
/// `broadcast::send` returns Err when there are no receivers; we ignore it and
|
||||
/// keep reading so gst doesn't backpressure waiting for a viewer.
|
||||
async fn pump_gst_to_broadcast(
|
||||
mut gst_stdout: ChildStdout,
|
||||
tx: broadcast::Sender<Arc<Vec<u8>>>,
|
||||
) {
|
||||
let mut buf = vec![0u8; READ_CHUNK];
|
||||
loop {
|
||||
match gst_stdout.read(&mut buf).await {
|
||||
Ok(0) => {
|
||||
tracing::info!("gst stdout EOF — fanout reader exiting");
|
||||
return;
|
||||
}
|
||||
Ok(n) => {
|
||||
let chunk = Arc::new(buf[..n].to_vec());
|
||||
let _ = tx.send(chunk);
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!("gst stdout read error: {e}");
|
||||
return;
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
/// Accepts TCP connections on the local capture port forever. Each accepted
|
||||
/// connection becomes its own viewer-serving task with a private receiver.
|
||||
async fn run_accept_loop(listener: TcpListener, tx: broadcast::Sender<Arc<Vec<u8>>>) {
|
||||
loop {
|
||||
let sock = match listener.accept().await {
|
||||
Ok((s, _)) => s,
|
||||
Err(e) => {
|
||||
tracing::warn!("capture HTTP accept failed: {e}");
|
||||
return;
|
||||
}
|
||||
};
|
||||
let rx = tx.subscribe();
|
||||
tokio::spawn(serve_one_viewer(sock, rx));
|
||||
}
|
||||
}
|
||||
|
||||
/// Drains the HTTP request, writes a fixed 200 OK, then pumps broadcast
|
||||
/// chunks to the socket until the channel closes or the socket errors out.
|
||||
/// On Lagged (slow consumer), skip ahead — MPEG-TS recovers at next keyframe.
|
||||
async fn serve_one_viewer(mut sock: TcpStream, mut rx: broadcast::Receiver<Arc<Vec<u8>>>) {
|
||||
if !drain_http_request(&mut sock).await {
|
||||
return;
|
||||
}
|
||||
@@ -234,7 +295,23 @@ async fn serve_capture(listener: TcpListener, mut gst_stdout: ChildStdout) {
|
||||
return;
|
||||
}
|
||||
|
||||
let _ = tokio::io::copy(&mut gst_stdout, &mut sock).await;
|
||||
loop {
|
||||
match rx.recv().await {
|
||||
Ok(chunk) => {
|
||||
if sock.write_all(&chunk).await.is_err() {
|
||||
return;
|
||||
}
|
||||
}
|
||||
Err(broadcast::error::RecvError::Lagged(skipped)) => {
|
||||
tracing::warn!(
|
||||
skipped,
|
||||
"viewer fanout lagged — MPEG-TS will resync at next keyframe"
|
||||
);
|
||||
continue;
|
||||
}
|
||||
Err(broadcast::error::RecvError::Closed) => return,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn drain_http_request(sock: &mut TcpStream) -> bool {
|
||||
|
||||
Reference in New Issue
Block a user