diff --git a/src/host/mod.rs b/src/host/mod.rs index fe29b2a..f0a9eb6 100644 --- a/src/host/mod.rs +++ b/src/host/mod.rs @@ -1,4 +1,5 @@ mod capture; +mod serve; mod wayland; use anyhow::{Result, bail}; @@ -135,7 +136,7 @@ async fn handle_peer( eprintln!("[pixelpass] viewer connected: {remote}"); - let tcp = match wayland::connect_to_capture(port, Duration::from_secs(5)).await { + let tcp = match serve::connect_to_capture(port, Duration::from_secs(5)).await { Ok(t) => t, Err(e) => { tracing::warn!(%remote, "connect_to_capture failed: {e:#}"); diff --git a/src/host/serve.rs b/src/host/serve.rs new file mode 100644 index 0000000..74ce058 --- /dev/null +++ b/src/host/serve.rs @@ -0,0 +1,191 @@ +//! Display-server-agnostic serving layer: takes a capture child's stdout +//! producing MPEG-TS bytes and fans them out to N concurrent HTTP viewers +//! on a localhost port. One reader task pumps stdout chunks into a +//! tokio::sync::broadcast channel; the accept loop spawns one drain task +//! per accepted TCP connection. Slow consumers see Lagged and skip ahead; +//! MPEG-TS resyncs at the next keyframe. +//! +//! Backends (host/wayland.rs, future host/x11.rs) build their own gst +//! pipeline and hand the resulting ChildStdout to [`Serve::bind`]. + +use anyhow::{Context, Result, bail}; +use std::sync::Arc; +use std::time::Duration; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::{TcpListener, TcpStream}; +use tokio::process::ChildStdout; +use tokio::sync::broadcast; +use tokio::task::JoinHandle; +use tokio::time::{Instant, sleep}; + +/// Broadcast-channel capacity in chunks. Each chunk is up to 64 KiB from +/// the capture child's stdout, so 16 chunks ≈ 1 MiB ≈ ~2 s of buffered +/// jitter at typical bitrates. A viewer that falls behind by more than +/// this gets Lagged and skips ahead — MPEG-TS recovers at the next +/// keyframe. +const FANOUT_CAPACITY: usize = 16; + +/// Size of each chunk read from the capture child's stdout. +const READ_CHUNK: usize = 64 * 1024; + +/// Owns the localhost HTTP listener and the two long-running tasks that +/// pump bytes from a capture child to all connected viewers. +pub struct Serve { + port: u16, + reader: Option>, + server: Option>, +} + +impl Serve { + /// Bind a localhost listener on a random port, set up the broadcast + /// fanout, and spawn the reader + accept-loop tasks. The provided + /// `stdout` is assumed to produce MPEG-TS bytes. + pub async fn bind(stdout: ChildStdout) -> Result { + let listener = TcpListener::bind("127.0.0.1:0") + .await + .context("could not bind local capture HTTP listener")?; + let port = listener.local_addr()?.port(); + + let (tx, _) = broadcast::channel::>>(FANOUT_CAPACITY); + let reader = tokio::spawn(pump_to_broadcast(stdout, tx.clone())); + let server = tokio::spawn(run_accept_loop(listener, tx)); + + Ok(Self { + port, + reader: Some(reader), + server: Some(server), + }) + } + + pub fn local_port(&self) -> u16 { + self.port + } + + /// Abort the reader and accept-loop tasks. Backends typically call this + /// after killing their capture child so the reader sees stdout EOF and + /// exits on its own; the abort is a backstop. + pub async fn shutdown(mut self) { + if let Some(task) = self.reader.take() { + task.abort(); + } + if let Some(task) = self.server.take() { + task.abort(); + } + } +} + +impl Drop for Serve { + fn drop(&mut self) { + if let Some(task) = self.reader.as_ref() { + task.abort(); + } + if let Some(task) = self.server.as_ref() { + task.abort(); + } + } +} + +/// Connect to the local capture HTTP listener, retrying until it's up or +/// we time out. Returns the connected socket — the bridge layer pipes +/// QUIC↔this socket once it's open. +pub async fn connect_to_capture(port: u16, max_wait: Duration) -> Result { + let deadline = Instant::now() + max_wait; + loop { + match TcpStream::connect(("127.0.0.1", port)).await { + Ok(stream) => return Ok(stream), + Err(_) if Instant::now() < deadline => { + sleep(Duration::from_millis(50)).await; + } + Err(e) => bail!("capture HTTP listener never came up on 127.0.0.1:{port}: {e}"), + } + } +} + +/// Read the capture child's stdout in chunks and broadcast each to all +/// current subscribers. `broadcast::send` returns Err when there are no +/// receivers; we ignore it so the capture child isn't backpressured +/// waiting for a viewer. +async fn pump_to_broadcast(mut stdout: ChildStdout, tx: broadcast::Sender>>) { + let mut buf = vec![0u8; READ_CHUNK]; + loop { + match stdout.read(&mut buf).await { + Ok(0) => { + tracing::info!("capture stdout EOF — fanout reader exiting"); + return; + } + Ok(n) => { + let chunk = Arc::new(buf[..n].to_vec()); + let _ = tx.send(chunk); + } + Err(e) => { + tracing::warn!("capture stdout read error: {e}"); + return; + } + } + } +} + +async fn run_accept_loop(listener: TcpListener, tx: broadcast::Sender>>) { + loop { + let sock = match listener.accept().await { + Ok((s, _)) => s, + Err(e) => { + tracing::warn!("capture HTTP accept failed: {e}"); + return; + } + }; + let rx = tx.subscribe(); + tokio::spawn(serve_one_viewer(sock, rx)); + } +} + +async fn serve_one_viewer(mut sock: TcpStream, mut rx: broadcast::Receiver>>) { + if !drain_http_request(&mut sock).await { + return; + } + + const RESPONSE: &[u8] = b"HTTP/1.1 200 OK\r\n\ + Content-Type: video/mp2t\r\n\ + Cache-Control: no-cache, no-store\r\n\ + Connection: close\r\n\ + \r\n"; + if sock.write_all(RESPONSE).await.is_err() { + return; + } + + loop { + match rx.recv().await { + Ok(chunk) => { + if sock.write_all(&chunk).await.is_err() { + return; + } + } + Err(broadcast::error::RecvError::Lagged(skipped)) => { + tracing::warn!( + skipped, + "viewer fanout lagged — MPEG-TS will resync at next keyframe" + ); + continue; + } + Err(broadcast::error::RecvError::Closed) => return, + } + } +} + +async fn drain_http_request(sock: &mut TcpStream) -> bool { + let mut buf = [0u8; 1024]; + let mut total = Vec::with_capacity(512); + loop { + match sock.read(&mut buf).await { + Ok(0) => return false, + Ok(n) => total.extend_from_slice(&buf[..n]), + Err(_) => return false, + } + if total.windows(4).any(|w| w == b"\r\n\r\n") { + return true; + } + if total.len() > 16 * 1024 { + return false; + } + } +} diff --git a/src/host/wayland.rs b/src/host/wayland.rs index f3e14d1..0589873 100644 --- a/src/host/wayland.rs +++ b/src/host/wayland.rs @@ -1,10 +1,6 @@ -//! Wayland capture: ashpd ScreenCast portal → PipeWire fd → gst-launch -//! pipewiresrc → MPEG-TS on gst stdout → in-process HTTP server bound on a -//! random localhost port. One gst child feeds a tokio::sync::broadcast channel; -//! the HTTP listener accepts multiple connections and each one drains its own -//! fresh broadcast::Receiver — so a single capture pipeline fans out to N -//! concurrent viewers. Slow consumers see Lagged and skip ahead; the MPEG-TS -//! stream resyncs at the next keyframe. +//! Wayland capture: ashpd ScreenCast portal → PipeWire fd → gst-launch. +//! Builds the gst pipeline that produces MPEG-TS on stdout, then hands +//! that stdout to [`super::serve::Serve`] which handles the HTTP fanout. use anyhow::{Context, Result, bail}; use ashpd::{ @@ -19,41 +15,29 @@ use nix::sys::signal::{Signal, kill}; use nix::unistd::{Pid, close}; use std::os::fd::{AsFd, IntoRawFd, OwnedFd, RawFd}; use std::process::Stdio; -use std::sync::Arc; use std::time::Duration; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use tokio::net::{TcpListener, TcpStream}; -use tokio::process::{Child, ChildStdout, Command}; -use tokio::sync::broadcast; -use tokio::task::JoinHandle; -use tokio::time::{Instant, sleep, timeout}; +use tokio::process::{Child, Command}; +use tokio::time::timeout; +use super::serve::Serve; use crate::cli::HostOpts; -/// Broadcast-channel capacity in chunks. Each chunk is up to 64 KiB from gst -/// stdout, so 16 chunks ≈ 1 MiB ≈ ~2 s of buffered jitter at the default -/// 4 Mbps bitrate. A viewer that falls behind by more than this gets Lagged -/// and skips ahead — MPEG-TS resyncs at the next keyframe. -const FANOUT_CAPACITY: usize = 16; - -/// Size of each chunk read from gst stdout. -const READ_CHUNK: usize = 64 * 1024; - pub struct CaptureHandle { - port: u16, gst: Option, - reader: Option>, - server: Option>, + serve: Option, } impl CaptureHandle { pub fn local_port(&self) -> u16 { - self.port + self.serve + .as_ref() + .expect("serve is always Some until shutdown") + .local_port() } - /// Graceful teardown: SIGTERM gst, give it ~1s to exit, then SIGKILL, then - /// abort the reader + accept-loop tasks. Call this before dropping; Drop - /// only fires the kill backstop. + /// Graceful teardown: SIGTERM gst, give it ~1s to exit, then SIGKILL, + /// then tear down the serve layer. The serve reader will see EOF on + /// gst stdout and exit on its own; serve.shutdown() is the backstop. pub async fn shutdown(mut self) { if let Some(child) = self.gst.as_mut() && let Some(pid) = child.id() @@ -64,11 +48,8 @@ impl CaptureHandle { let _ = timeout(Duration::from_millis(1000), child.wait()).await; let _ = child.start_kill(); } - if let Some(task) = self.reader.take() { - task.abort(); - } - if let Some(task) = self.server.take() { - task.abort(); + if let Some(serve) = self.serve.take() { + serve.shutdown().await; } } } @@ -78,12 +59,7 @@ impl Drop for CaptureHandle { if let Some(child) = self.gst.as_mut() { let _ = child.start_kill(); } - if let Some(task) = self.reader.as_ref() { - task.abort(); - } - if let Some(task) = self.server.as_ref() { - task.abort(); - } + // Serve's own Drop aborts its tasks. } } @@ -130,15 +106,9 @@ pub async fn start(opts: &HostOpts) -> Result { clear_cloexec(&pw_fd)?; let raw_fd: RawFd = pw_fd.into_raw_fd(); - // 2. Bind the in-process HTTP listener on a random localhost port. - let listener = TcpListener::bind("127.0.0.1:0") - .await - .context("could not bind local capture HTTP listener")?; - let port = listener.local_addr()?.port(); - - // 3. Spawn gst-launch with the full pipeline: video AND audio captured, + // 2. Spawn gst-launch with the full pipeline: video AND audio captured, // encoded, and muxed into MPEG-TS inside gst. Output goes to stdout, - // which we pipe straight to our HTTP server task — no demux/remux, + // which the serve layer pipes to its HTTP fanout — no demux/remux, // no codec assumptions. let key_interval = (opts.framerate * 2).to_string(); let bitrate = opts.bitrate.to_string(); @@ -220,118 +190,16 @@ pub async fn start(opts: &HostOpts) -> Result { .take() .context("gst-launch-1.0 stdout pipe unavailable")?; - // 4. Set up the broadcast fanout. The reader task pumps gst stdout chunks - // into the channel; the accept-loop task spawns one sender task per - // accepted TCP connection, each draining a fresh broadcast::Receiver. - let (tx, _) = broadcast::channel::>>(FANOUT_CAPACITY); - - let reader = tokio::spawn(pump_gst_to_broadcast(gst_stdout, tx.clone())); - let server = tokio::spawn(run_accept_loop(listener, tx)); + // 3. Hand stdout to the serve layer, which binds the localhost HTTP + // listener and runs the broadcast fanout. + let serve = Serve::bind(gst_stdout).await?; Ok(CaptureHandle { - port, gst: Some(gst), - reader: Some(reader), - server: Some(server), + serve: Some(serve), }) } -/// Reads gst's stdout in chunks and broadcasts each to all current subscribers. -/// `broadcast::send` returns Err when there are no receivers; we ignore it and -/// keep reading so gst doesn't backpressure waiting for a viewer. -async fn pump_gst_to_broadcast( - mut gst_stdout: ChildStdout, - tx: broadcast::Sender>>, -) { - let mut buf = vec![0u8; READ_CHUNK]; - loop { - match gst_stdout.read(&mut buf).await { - Ok(0) => { - tracing::info!("gst stdout EOF — fanout reader exiting"); - return; - } - Ok(n) => { - let chunk = Arc::new(buf[..n].to_vec()); - let _ = tx.send(chunk); - } - Err(e) => { - tracing::warn!("gst stdout read error: {e}"); - return; - } - } - } -} - -/// Accepts TCP connections on the local capture port forever. Each accepted -/// connection becomes its own viewer-serving task with a private receiver. -async fn run_accept_loop(listener: TcpListener, tx: broadcast::Sender>>) { - loop { - let sock = match listener.accept().await { - Ok((s, _)) => s, - Err(e) => { - tracing::warn!("capture HTTP accept failed: {e}"); - return; - } - }; - let rx = tx.subscribe(); - tokio::spawn(serve_one_viewer(sock, rx)); - } -} - -/// Drains the HTTP request, writes a fixed 200 OK, then pumps broadcast -/// chunks to the socket until the channel closes or the socket errors out. -/// On Lagged (slow consumer), skip ahead — MPEG-TS recovers at next keyframe. -async fn serve_one_viewer(mut sock: TcpStream, mut rx: broadcast::Receiver>>) { - if !drain_http_request(&mut sock).await { - return; - } - - const RESPONSE: &[u8] = b"HTTP/1.1 200 OK\r\n\ - Content-Type: video/mp2t\r\n\ - Cache-Control: no-cache, no-store\r\n\ - Connection: close\r\n\ - \r\n"; - if sock.write_all(RESPONSE).await.is_err() { - return; - } - - loop { - match rx.recv().await { - Ok(chunk) => { - if sock.write_all(&chunk).await.is_err() { - return; - } - } - Err(broadcast::error::RecvError::Lagged(skipped)) => { - tracing::warn!( - skipped, - "viewer fanout lagged — MPEG-TS will resync at next keyframe" - ); - continue; - } - Err(broadcast::error::RecvError::Closed) => return, - } - } -} - -async fn drain_http_request(sock: &mut TcpStream) -> bool { - let mut buf = [0u8; 1024]; - let mut total = Vec::with_capacity(512); - loop { - match sock.read(&mut buf).await { - Ok(0) => return false, - Ok(n) => total.extend_from_slice(&buf[..n]), - Err(_) => return false, - } - if total.windows(4).any(|w| w == b"\r\n\r\n") { - return true; - } - if total.len() > 16 * 1024 { - return false; - } - } -} - fn clear_cloexec(fd: &impl AsFd) -> Result<()> { let flags_int = fcntl(fd.as_fd(), FcntlArg::F_GETFD).context("F_GETFD on pipewire fd")?; let mut flags = FdFlag::from_bits_truncate(flags_int); @@ -340,22 +208,6 @@ fn clear_cloexec(fd: &impl AsFd) -> Result<()> { Ok(()) } -/// Connect to the in-process capture HTTP listener, retrying until it's up or -/// we time out. Returns the connected socket — the listener accepts exactly -/// one connection (the bridge socket), so this stream IS the bridge socket. -pub async fn connect_to_capture(port: u16, max_wait: Duration) -> Result { - let deadline = Instant::now() + max_wait; - loop { - match TcpStream::connect(("127.0.0.1", port)).await { - Ok(stream) => return Ok(stream), - Err(_) if Instant::now() < deadline => { - sleep(Duration::from_millis(50)).await; - } - Err(e) => bail!("capture HTTP listener never came up on 127.0.0.1:{port}: {e}"), - } - } -} - async fn default_audio_monitor() -> Result { let output = Command::new("pactl") .arg("get-default-sink")