diff --git a/src/cli.rs b/src/cli.rs index bec26a6..b1d7188 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -45,6 +45,11 @@ pub struct Cli { #[arg(long)] pub low_latency: bool, + /// Maximum number of concurrent viewers. Additional connections are + /// politely refused with a "host full" message. + #[arg(long, default_value_t = 2)] + pub max_viewers: u32, + // ── viewer options ──────────────────────────────────────────────── /// Local TCP port for the viewer to expose (default: random). #[arg(long, default_value_t = 0)] @@ -76,6 +81,7 @@ pub struct HostOpts { pub framerate: u32, pub no_hwencode: bool, pub low_latency: bool, + pub max_viewers: u32, pub interactive: bool, } @@ -96,6 +102,7 @@ impl Cli { framerate: self.framerate, no_hwencode: self.no_hwencode, low_latency: self.low_latency, + max_viewers: self.max_viewers, interactive, } } diff --git a/src/host/mod.rs b/src/host/mod.rs index bd31079..e7adf01 100644 --- a/src/host/mod.rs +++ b/src/host/mod.rs @@ -5,10 +5,25 @@ use anyhow::{Result, bail}; use iroh::Endpoint; use iroh::endpoint::{Connection, presets}; use iroh_tickets::endpoint::EndpointTicket; +use std::time::Duration; +use tokio::sync::{mpsc, oneshot}; use tokio_util::sync::CancellationToken; use crate::cli::HostOpts; -use crate::common::{alpn::ALPN, deps, display::DisplayServer, signal}; +use crate::common::{alpn::ALPN, deps, display::DisplayServer, signal, tunnel}; + +use self::capture::CaptureHandle; + +/// Messages from per-viewer tasks to the capture supervisor. +enum SupervisorMsg { + /// A new viewer wants in. Supervisor replies with the local capture + /// HTTP port to connect to, or an error string if the host is full or + /// capture spawn failed. + AddViewer(oneshot::Sender>), + /// A viewer's session ended. Supervisor decrements the count and tears + /// down capture if it just hit zero. + RemoveViewer, +} pub async fn run(opts: HostOpts) -> Result<()> { let display = DisplayServer::resolve(opts.display_server); @@ -21,6 +36,10 @@ pub async fn run(opts: HostOpts) -> Result<()> { ); } + if opts.max_viewers == 0 { + bail!("--max-viewers must be at least 1"); + } + let cancel = signal::install_ctrl_c(); let endpoint = Endpoint::builder(presets::N0) @@ -33,65 +52,168 @@ pub async fn run(opts: HostOpts) -> Result<()> { let clipboard_ok = opts.interactive && copy_to_clipboard(&ticket.to_string()); print_host_banner(&ticket, display, &opts, clipboard_ok); - let result = accept_loop(&endpoint, display, &opts, cancel.clone()).await; + let (sup_tx, sup_rx) = mpsc::channel::(16); + let supervisor = tokio::spawn(supervise(opts.clone(), display, sup_rx)); + + accept_loop(&endpoint, sup_tx.clone(), cancel.clone()).await; + + drop(sup_tx); + let _ = supervisor.await; endpoint.close().await; - result + Ok(()) } async fn accept_loop( endpoint: &Endpoint, - display: DisplayServer, - opts: &HostOpts, + sup_tx: mpsc::Sender, cancel: CancellationToken, -) -> Result<()> { - tokio::select! { - _ = cancel.cancelled() => { - tracing::info!("cancellation requested before any peer connected"); - Ok(()) - } - accepted = endpoint.accept() => { - let Some(incoming) = accepted else { - bail!("endpoint stopped accepting connections"); - }; - let conn = incoming.await?; - let remote = conn.remote_id(); - tracing::info!(%remote, "peer connected"); - eprintln!("\n[pixelpass] peer connected: {remote}\n"); - handle_peer(conn, display, opts, cancel).await +) { + loop { + tokio::select! { + _ = cancel.cancelled() => { + tracing::info!("cancellation requested — closing accept loop"); + return; + } + accepted = endpoint.accept() => { + let Some(incoming) = accepted else { + tracing::info!("endpoint stopped accepting connections"); + return; + }; + let conn = match incoming.await { + Ok(c) => c, + Err(e) => { + tracing::warn!("incoming connection failed: {e:#}"); + continue; + } + }; + let sup_tx = sup_tx.clone(); + let cancel = cancel.clone(); + tokio::spawn(handle_peer(conn, sup_tx, cancel)); + } } } } async fn handle_peer( conn: Connection, - display: DisplayServer, - opts: &HostOpts, + sup_tx: mpsc::Sender, cancel: CancellationToken, -) -> Result<()> { - let (quic_send, quic_recv) = conn.accept_bi().await?; +) { + let remote = conn.remote_id(); - let capture_handle = capture::spawn(display, opts).await?; - let port = capture_handle.local_port(); - let tcp = wayland::connect_to_capture(port, std::time::Duration::from_secs(5)).await?; + let (reply_tx, reply_rx) = oneshot::channel(); + if sup_tx.send(SupervisorMsg::AddViewer(reply_tx)).await.is_err() { + tracing::warn!(%remote, "supervisor channel closed; dropping peer"); + return; + } + let port = match reply_rx.await { + Ok(Ok(p)) => p, + Ok(Err(reason)) => { + tracing::warn!(%remote, %reason, "refusing viewer"); + eprintln!("[pixelpass] refusing viewer {remote}: {reason}"); + return; + } + Err(_) => { + tracing::warn!(%remote, "supervisor reply dropped; dropping peer"); + return; + } + }; - let bridge = crate::common::tunnel::bridge(quic_send, quic_recv, tcp); + let (quic_send, quic_recv) = match conn.accept_bi().await { + Ok(s) => s, + Err(e) => { + tracing::warn!(%remote, "accept_bi failed: {e:#}"); + let _ = sup_tx.send(SupervisorMsg::RemoveViewer).await; + return; + } + }; + eprintln!("[pixelpass] viewer connected: {remote}"); + + let tcp = match wayland::connect_to_capture(port, Duration::from_secs(5)).await { + Ok(t) => t, + Err(e) => { + tracing::warn!(%remote, "connect_to_capture failed: {e:#}"); + let _ = sup_tx.send(SupervisorMsg::RemoveViewer).await; + return; + } + }; + + let bridge = tunnel::bridge(quic_send, quic_recv, tcp); tokio::select! { res = bridge => { if let Err(e) = res { - tracing::warn!("bridge ended with error: {e:#}"); + tracing::warn!(%remote, "bridge ended with error: {e:#}"); } else { - tracing::info!("bridge closed cleanly"); + tracing::info!(%remote, "bridge closed cleanly"); } } _ = cancel.cancelled() => { - tracing::info!("cancellation requested during stream"); + tracing::info!(%remote, "cancellation during stream"); } } - capture_handle.shutdown().await; - Ok(()) + eprintln!("[pixelpass] viewer disconnected: {remote}"); + let _ = sup_tx.send(SupervisorMsg::RemoveViewer).await; +} + +/// Owns the single shared CaptureHandle and the active viewer count. Spawns +/// capture lazily on the first AddViewer; tears it down when the count drops +/// back to zero. Enforces the --max-viewers cap by refusing AddViewer when +/// the count is already at the cap. +async fn supervise( + opts: HostOpts, + display: DisplayServer, + mut rx: mpsc::Receiver, +) { + let mut handle: Option = None; + let mut count: u32 = 0; + + while let Some(msg) = rx.recv().await { + match msg { + SupervisorMsg::AddViewer(reply) => { + if count >= opts.max_viewers { + let _ = reply.send(Err(format!( + "host is full ({} of {} viewers connected)", + count, opts.max_viewers + ))); + continue; + } + + if handle.is_none() { + tracing::info!("first viewer arriving — spawning capture"); + match capture::spawn(display, &opts).await { + Ok(h) => handle = Some(h), + Err(e) => { + let _ = reply.send(Err(format!("capture spawn failed: {e:#}"))); + continue; + } + } + } + + let port = handle.as_ref().expect("handle was just set").local_port(); + count += 1; + let _ = reply.send(Ok(port)); + tracing::info!(active = count, cap = opts.max_viewers, "viewer joined"); + } + SupervisorMsg::RemoveViewer => { + count = count.saturating_sub(1); + tracing::info!(active = count, cap = opts.max_viewers, "viewer left"); + if count == 0 + && let Some(h) = handle.take() + { + tracing::info!("last viewer left — tearing down capture"); + h.shutdown().await; + } + } + } + } + + if let Some(h) = handle.take() { + tracing::info!("host shutdown — tearing down capture"); + h.shutdown().await; + } } fn print_host_banner( @@ -106,19 +228,21 @@ fn print_host_banner( eprintln!("│ capture : {}", capture_summary(opts)); eprintln!("│ bitrate / fps : {} kbps @ {} fps", opts.bitrate, opts.framerate); eprintln!("│ hw encode : {}", if opts.no_hwencode { "off" } else { "auto (VAAPI if available)" }); + eprintln!("│ max viewers : {}", opts.max_viewers); eprintln!("│"); if clipboard_ok { eprintln!("│ Your share code has been copied to your clipboard."); - eprintln!("│ Send it to your viewer. (If clipboard didn't work, the"); + eprintln!("│ Send it to your viewer(s). (If clipboard didn't work, the"); eprintln!("│ code is also shown below for manual copy.)"); } else { - eprintln!("│ Share this ticket with your viewer:"); + eprintln!("│ Share this ticket with your viewer(s):"); } eprintln!("│"); eprintln!("│ pixelpass {ticket}"); eprintln!("│"); - eprintln!("│ Capture will not start until the viewer connects."); - eprintln!("│ Press Ctrl+C to stop."); + eprintln!("│ Capture starts when the first viewer connects, runs while"); + eprintln!("│ any viewer is connected, and tears down when the last one"); + eprintln!("│ leaves. Press Ctrl+C to stop the host entirely."); eprintln!("└────────────────────────────────────────────────────────────"); eprintln!(); } diff --git a/src/host/wayland.rs b/src/host/wayland.rs index bb30d0e..f3e14d1 100644 --- a/src/host/wayland.rs +++ b/src/host/wayland.rs @@ -1,7 +1,10 @@ //! Wayland capture: ashpd ScreenCast portal → PipeWire fd → gst-launch //! pipewiresrc → MPEG-TS on gst stdout → in-process HTTP server bound on a -//! random localhost port. The host bridge TCP-connects to that server and -//! pumps bytes to QUIC. +//! random localhost port. One gst child feeds a tokio::sync::broadcast channel; +//! the HTTP listener accepts multiple connections and each one drains its own +//! fresh broadcast::Receiver — so a single capture pipeline fans out to N +//! concurrent viewers. Slow consumers see Lagged and skip ahead; the MPEG-TS +//! stream resyncs at the next keyframe. use anyhow::{Context, Result, bail}; use ashpd::{ @@ -16,18 +19,30 @@ use nix::sys::signal::{Signal, kill}; use nix::unistd::{Pid, close}; use std::os::fd::{AsFd, IntoRawFd, OwnedFd, RawFd}; use std::process::Stdio; +use std::sync::Arc; use std::time::Duration; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::{TcpListener, TcpStream}; use tokio::process::{Child, ChildStdout, Command}; +use tokio::sync::broadcast; use tokio::task::JoinHandle; use tokio::time::{Instant, sleep, timeout}; use crate::cli::HostOpts; +/// Broadcast-channel capacity in chunks. Each chunk is up to 64 KiB from gst +/// stdout, so 16 chunks ≈ 1 MiB ≈ ~2 s of buffered jitter at the default +/// 4 Mbps bitrate. A viewer that falls behind by more than this gets Lagged +/// and skips ahead — MPEG-TS resyncs at the next keyframe. +const FANOUT_CAPACITY: usize = 16; + +/// Size of each chunk read from gst stdout. +const READ_CHUNK: usize = 64 * 1024; + pub struct CaptureHandle { port: u16, gst: Option, + reader: Option>, server: Option>, } @@ -37,8 +52,8 @@ impl CaptureHandle { } /// Graceful teardown: SIGTERM gst, give it ~1s to exit, then SIGKILL, then - /// abort the HTTP server task. Call this before dropping; Drop only fires - /// the kill backstop. + /// abort the reader + accept-loop tasks. Call this before dropping; Drop + /// only fires the kill backstop. pub async fn shutdown(mut self) { if let Some(child) = self.gst.as_mut() && let Some(pid) = child.id() @@ -49,6 +64,9 @@ impl CaptureHandle { let _ = timeout(Duration::from_millis(1000), child.wait()).await; let _ = child.start_kill(); } + if let Some(task) = self.reader.take() { + task.abort(); + } if let Some(task) = self.server.take() { task.abort(); } @@ -60,6 +78,9 @@ impl Drop for CaptureHandle { if let Some(child) = self.gst.as_mut() { let _ = child.start_kill(); } + if let Some(task) = self.reader.as_ref() { + task.abort(); + } if let Some(task) = self.server.as_ref() { task.abort(); } @@ -199,28 +220,68 @@ pub async fn start(opts: &HostOpts) -> Result { .take() .context("gst-launch-1.0 stdout pipe unavailable")?; - // 4. Spawn the HTTP server task. It owns the listener + gst stdout: it - // accepts one client (the host's bridge socket via connect_to_capture), - // drains the HTTP request, writes a fixed MPEG-TS response, then - // copies gst stdout to the socket forever. - let server = tokio::spawn(serve_capture(listener, gst_stdout)); + // 4. Set up the broadcast fanout. The reader task pumps gst stdout chunks + // into the channel; the accept-loop task spawns one sender task per + // accepted TCP connection, each draining a fresh broadcast::Receiver. + let (tx, _) = broadcast::channel::>>(FANOUT_CAPACITY); + + let reader = tokio::spawn(pump_gst_to_broadcast(gst_stdout, tx.clone())); + let server = tokio::spawn(run_accept_loop(listener, tx)); Ok(CaptureHandle { port, gst: Some(gst), + reader: Some(reader), server: Some(server), }) } -async fn serve_capture(listener: TcpListener, mut gst_stdout: ChildStdout) { - let mut sock = match listener.accept().await { - Ok((s, _)) => s, - Err(e) => { - tracing::warn!("capture HTTP accept failed: {e}"); - return; +/// Reads gst's stdout in chunks and broadcasts each to all current subscribers. +/// `broadcast::send` returns Err when there are no receivers; we ignore it and +/// keep reading so gst doesn't backpressure waiting for a viewer. +async fn pump_gst_to_broadcast( + mut gst_stdout: ChildStdout, + tx: broadcast::Sender>>, +) { + let mut buf = vec![0u8; READ_CHUNK]; + loop { + match gst_stdout.read(&mut buf).await { + Ok(0) => { + tracing::info!("gst stdout EOF — fanout reader exiting"); + return; + } + Ok(n) => { + let chunk = Arc::new(buf[..n].to_vec()); + let _ = tx.send(chunk); + } + Err(e) => { + tracing::warn!("gst stdout read error: {e}"); + return; + } } - }; + } +} +/// Accepts TCP connections on the local capture port forever. Each accepted +/// connection becomes its own viewer-serving task with a private receiver. +async fn run_accept_loop(listener: TcpListener, tx: broadcast::Sender>>) { + loop { + let sock = match listener.accept().await { + Ok((s, _)) => s, + Err(e) => { + tracing::warn!("capture HTTP accept failed: {e}"); + return; + } + }; + let rx = tx.subscribe(); + tokio::spawn(serve_one_viewer(sock, rx)); + } +} + +/// Drains the HTTP request, writes a fixed 200 OK, then pumps broadcast +/// chunks to the socket until the channel closes or the socket errors out. +/// On Lagged (slow consumer), skip ahead — MPEG-TS recovers at next keyframe. +async fn serve_one_viewer(mut sock: TcpStream, mut rx: broadcast::Receiver>>) { if !drain_http_request(&mut sock).await { return; } @@ -234,7 +295,23 @@ async fn serve_capture(listener: TcpListener, mut gst_stdout: ChildStdout) { return; } - let _ = tokio::io::copy(&mut gst_stdout, &mut sock).await; + loop { + match rx.recv().await { + Ok(chunk) => { + if sock.write_all(&chunk).await.is_err() { + return; + } + } + Err(broadcast::error::RecvError::Lagged(skipped)) => { + tracing::warn!( + skipped, + "viewer fanout lagged — MPEG-TS will resync at next keyframe" + ); + continue; + } + Err(broadcast::error::RecvError::Closed) => return, + } + } } async fn drain_http_request(sock: &mut TcpStream) -> bool {