diff --git a/src/common/output.rs b/src/common/output.rs index 58df24d..511e69c 100644 --- a/src/common/output.rs +++ b/src/common/output.rs @@ -22,7 +22,11 @@ pub fn set_json(enabled: bool) { JSON_ENABLED.store(enabled, Ordering::Relaxed); } -fn json_enabled() -> bool { +/// Whether the JSON event stream is on — i.e. we're being driven by a +/// machine front-end (the `--gui` shell-out) rather than a human terminal. +/// Gates features that only make sense under that front-end, like the +/// stdin command channel the host reads `kick` requests from. +pub fn json_enabled() -> bool { JSON_ENABLED.load(Ordering::Relaxed) } @@ -42,8 +46,12 @@ pub enum Event<'a> { max_viewers: u32, max_viewers_source: &'a str, }, - /// Active viewer count changed. - ViewerCount { active: u32, max: u32 }, + /// A viewer joined. `id` is the viewer's endpoint id; `active` is the new + /// total after the join. + ViewerJoined { id: &'a str, active: u32, max: u32 }, + /// A viewer left — disconnected on their own or kicked by the host. `id` + /// is the viewer's endpoint id; `active` is the new total after. + ViewerLeft { id: &'a str, active: u32, max: u32 }, /// Capture pipeline lifecycle (spawned on first viewer, torn down on last). Capture { state: CaptureState }, /// A viewer was turned away (host full, or capture spawn failed). diff --git a/src/gui/child.rs b/src/gui/child.rs index 2a5a50a..cc267b5 100644 --- a/src/gui/child.rs +++ b/src/gui/child.rs @@ -6,8 +6,8 @@ //! egui app drains each frame. stderr is captured into a small ring so a //! failed launch (e.g. a missing gst plugin) can be surfaced in the window. -use std::io::{BufRead, BufReader}; -use std::process::{Child, Command, Stdio}; +use std::io::{BufRead, BufReader, Write}; +use std::process::{Child, ChildStdin, Command, Stdio}; use std::sync::mpsc::Receiver; use std::sync::{Arc, Mutex}; use std::time::Duration; @@ -35,7 +35,13 @@ pub enum ChildEvent { max_viewers: u32, max_viewers_source: String, }, - ViewerCount { + ViewerJoined { + id: String, + active: u32, + max: u32, + }, + ViewerLeft { + id: String, active: u32, max: u32, }, @@ -63,6 +69,9 @@ pub struct ChildProc { child: Child, pub rx: Receiver, stderr_tail: Arc>>, + /// Write end of the child's stdin, for the line-based command channel + /// (see [`ChildProc::send_command`]). `None` once it's been closed. + stdin: Option, } impl ChildProc { @@ -72,11 +81,14 @@ impl ChildProc { let exe = std::env::current_exe()?; let mut child = Command::new(exe) .args(args) - .stdin(Stdio::null()) + // Piped so we can send line commands (e.g. `kick `); the host + // only reads it when driven this way (`--output json`). + .stdin(Stdio::piped()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .spawn()?; + let stdin = child.stdin.take(); let (tx, rx) = std::sync::mpsc::channel(); let stdout = child.stdout.take().expect("stdout piped"); std::thread::spawn(move || { @@ -114,9 +126,23 @@ impl ChildProc { child, rx, stderr_tail, + stdin, }) } + /// Send one newline-terminated command to the child over its stdin (the + /// host parses these as `kick `). Best-effort: a closed pipe + /// (child already gone) just drops the command. + pub fn send_command(&mut self, cmd: &str) { + let Some(stdin) = self.stdin.as_mut() else { + return; + }; + if let Err(e) = writeln!(stdin, "{cmd}") { + tracing::warn!("failed to send command to host child: {e}"); + self.stdin = None; // pipe is dead; stop trying + } + } + /// Whether the child is still running. pub fn is_alive(&mut self) -> bool { matches!(self.child.try_wait(), Ok(None)) @@ -205,10 +231,14 @@ mod tests { } #[test] - fn viewer_count_round_trips() { + fn viewer_join_leave_round_trip() { assert!(matches!( - parse(Event::ViewerCount { active: 2, max: 4 }), - ChildEvent::ViewerCount { active: 2, max: 4 } + parse(Event::ViewerJoined { id: "nodeXYZ", active: 2, max: 4 }), + ChildEvent::ViewerJoined { id, active: 2, max: 4 } if id == "nodeXYZ" + )); + assert!(matches!( + parse(Event::ViewerLeft { id: "nodeXYZ", active: 1, max: 4 }), + ChildEvent::ViewerLeft { id, active: 1, max: 4 } if id == "nodeXYZ" )); } diff --git a/src/gui/mod.rs b/src/gui/mod.rs index dd7519b..63ebb74 100644 --- a/src/gui/mod.rs +++ b/src/gui/mod.rs @@ -168,6 +168,9 @@ struct HostState { copied: bool, last_refusal: Option, error: Option, + /// Endpoint ids of the currently-connected viewers, in arrival order. + /// Drives the per-viewer list and its Kick buttons. + viewers: Vec, } impl Default for HostState { @@ -186,6 +189,7 @@ impl Default for HostState { copied: false, last_refusal: None, error: None, + viewers: Vec::new(), } } } @@ -362,6 +366,23 @@ impl PixelPassApp { ui.add_space(6.0); ui.label(format!("Viewers: {} / {}", self.host.active, self.host.max)); + // Per-viewer list with a Kick button each. Collect the click first so + // we're not borrowing self.host.viewers while we reach for the child. + let mut kick: Option = None; + for id in &self.host.viewers { + ui.horizontal(|ui| { + ui.label(format!("• endpoint {}", short_id(id))); + if ui.small_button("Kick").clicked() { + kick = Some(id.clone()); + } + }); + } + if let Some(id) = kick + && let Some(p) = &mut self.host.proc + { + p.send_command(&format!("kick {id}")); + } + if let Some(info) = &self.host.info { ui.label( egui::RichText::new(format!("{} · {}", info.display, info.capture)) @@ -452,6 +473,7 @@ impl PixelPassApp { self.host.max = 0; self.host.capturing = false; self.host.copied = false; + self.host.viewers.clear(); let mut args = vec![ "--host".to_string(), @@ -483,6 +505,7 @@ impl PixelPassApp { self.host.capturing = false; self.host.ticket = None; self.host.copied = false; + self.host.viewers.clear(); } /// Drain the host child's event channel into state, and detect an @@ -543,9 +566,17 @@ impl PixelPassApp { cap_source: max_viewers_source, }); } - ChildEvent::ViewerCount { active, max } => { + ChildEvent::ViewerJoined { id, active, max } => { self.host.active = active; self.host.max = max; + if !self.host.viewers.contains(&id) { + self.host.viewers.push(id); + } + } + ChildEvent::ViewerLeft { id, active, max } => { + self.host.active = active; + self.host.max = max; + self.host.viewers.retain(|v| v != &id); } ChildEvent::Capture { state } => { self.host.capturing = matches!(state, child::CaptureState::Started); diff --git a/src/host/mod.rs b/src/host/mod.rs index 28cdbb4..44f2eb4 100644 --- a/src/host/mod.rs +++ b/src/host/mod.rs @@ -10,6 +10,7 @@ use anyhow::{Result, bail}; use iroh::endpoint::{Connection, presets}; use iroh::{Endpoint, EndpointAddr}; use iroh_tickets::endpoint::EndpointTicket; +use std::collections::HashMap; use std::time::Duration; use tokio::sync::{mpsc, oneshot}; use tokio_util::sync::CancellationToken; @@ -23,15 +24,28 @@ use crate::common::{ use self::pipeline::CaptureHandle; use self::quality::EffectiveQuality; -/// Messages from per-viewer tasks to the capture supervisor. +/// Messages from per-viewer tasks (and the GUI command channel) to the +/// capture supervisor. +// The shared `Viewer` suffix is the point — these are all viewer lifecycle +// messages — so keep the descriptive names. +#[allow(clippy::enum_variant_names)] enum SupervisorMsg { - /// A new viewer wants in. Supervisor replies with the local capture - /// HTTP port to connect to, or an error string if the host is full or - /// capture spawn failed. - AddViewer(oneshot::Sender>), + /// A new viewer wants in. Supervisor replies with the local capture HTTP + /// port to connect to, or an error string if the host is full or capture + /// spawn failed. `cancel` is the viewer's own token — the supervisor keeps + /// it so a later `KickViewer` can tear this viewer's stream down. + AddViewer { + id: String, + cancel: CancellationToken, + reply: oneshot::Sender>, + }, /// A viewer's session ended. Supervisor decrements the count and tears /// down capture if it just hit zero. - RemoveViewer, + RemoveViewer { id: String }, + /// Host asked (via the GUI command channel) to disconnect a viewer by + /// endpoint id. Cancels that viewer's token; the normal teardown path then + /// emits the `ViewerLeft`. + KickViewer { id: String }, } pub async fn run(opts: HostOpts) -> Result<()> { @@ -113,6 +127,15 @@ pub async fn run(opts: HostOpts) -> Result<()> { sup_rx, )); + // Command channel for the GUI front-end: read `kick ` lines + // off stdin. Only when machine-driven (`--output json`) — a human host has + // nothing to type here, and we don't want to swallow terminal input. Runs + // on a plain OS thread (not a tokio task) so a read parked on stdin can't + // hold up runtime shutdown on Ctrl+C; the thread dies with the process. + if output::json_enabled() { + spawn_kick_listener(sup_tx.clone()); + } + accept_loop(&endpoint, sup_tx.clone(), cancel.clone()).await; drop(sup_tx); @@ -159,9 +182,18 @@ async fn handle_peer( cancel: CancellationToken, ) { let remote = conn.remote_id(); + let id = remote.to_string(); + // This viewer's own kill switch: the supervisor holds a clone so a `kick` + // can cancel it, and the stream select! below watches it. + let peer_cancel = CancellationToken::new(); let (reply_tx, reply_rx) = oneshot::channel(); - if sup_tx.send(SupervisorMsg::AddViewer(reply_tx)).await.is_err() { + let add = SupervisorMsg::AddViewer { + id: id.clone(), + cancel: peer_cancel.clone(), + reply: reply_tx, + }; + if sup_tx.send(add).await.is_err() { tracing::warn!(%remote, "supervisor channel closed; dropping peer"); return; } @@ -182,7 +214,7 @@ async fn handle_peer( Ok(s) => s, Err(e) => { tracing::warn!(%remote, "accept_bi failed: {e:#}"); - let _ = sup_tx.send(SupervisorMsg::RemoveViewer).await; + let _ = sup_tx.send(SupervisorMsg::RemoveViewer { id }).await; return; } }; @@ -193,7 +225,7 @@ async fn handle_peer( Ok(t) => t, Err(e) => { tracing::warn!(%remote, "connect_to_capture failed: {e:#}"); - let _ = sup_tx.send(SupervisorMsg::RemoveViewer).await; + let _ = sup_tx.send(SupervisorMsg::RemoveViewer { id }).await; return; } }; @@ -207,10 +239,34 @@ async fn handle_peer( _ = cancel.cancelled() => { tracing::info!(%remote, "cancellation during stream"); } + _ = peer_cancel.cancelled() => { + tracing::info!(%remote, "kicked by host"); + } } eprintln!("[pixelpass] viewer disconnected: {remote}"); - let _ = sup_tx.send(SupervisorMsg::RemoveViewer).await; + let _ = sup_tx.send(SupervisorMsg::RemoveViewer { id }).await; +} + +/// Read `kick ` lines off stdin and forward them to the +/// supervisor. Runs on a detached OS thread (see the call site for why). Ends +/// when stdin hits EOF (the GUI closed the pipe) or the supervisor is gone. +fn spawn_kick_listener(sup_tx: mpsc::Sender) { + use std::io::BufRead; + std::thread::spawn(move || { + let stdin = std::io::stdin(); + for line in stdin.lock().lines().map_while(Result::ok) { + let Some(id) = line.trim().strip_prefix("kick ") else { + continue; + }; + let msg = SupervisorMsg::KickViewer { id: id.trim().to_string() }; + // blocking_send is valid here: this is a plain thread, not inside + // the tokio runtime. An Err means the supervisor closed — stop. + if sup_tx.blocking_send(msg).is_err() { + break; + } + } + }); } /// Owns the single shared CaptureHandle and the active viewer count. Spawns @@ -225,11 +281,15 @@ async fn supervise( mut rx: mpsc::Receiver, ) { let mut handle: Option = None; - let mut count: u32 = 0; + // Active viewers, keyed by endpoint id, holding each one's kill switch. + // The count is just `viewers.len()`. (A given endpoint connecting twice is + // a non-case here: each viewer process uses a fresh ephemeral identity.) + let mut viewers: HashMap = HashMap::new(); while let Some(msg) = rx.recv().await { match msg { - SupervisorMsg::AddViewer(reply) => { + SupervisorMsg::AddViewer { id, cancel, reply } => { + let count = viewers.len() as u32; if count >= max_viewers { let reason = format!("host is full ({count} of {max_viewers} viewers connected)"); @@ -255,16 +315,22 @@ async fn supervise( } let port = handle.as_ref().expect("handle was just set").local_port(); - count += 1; + viewers.insert(id.clone(), cancel); + let active = viewers.len() as u32; let _ = reply.send(Ok(port)); - output::emit(output::Event::ViewerCount { active: count, max: max_viewers }); - tracing::info!(active = count, cap = max_viewers, "viewer joined"); + output::emit(output::Event::ViewerJoined { id: &id, active, max: max_viewers }); + tracing::info!(active, cap = max_viewers, "viewer joined"); } - SupervisorMsg::RemoveViewer => { - count = count.saturating_sub(1); - output::emit(output::Event::ViewerCount { active: count, max: max_viewers }); - tracing::info!(active = count, cap = max_viewers, "viewer left"); - if count == 0 + SupervisorMsg::RemoveViewer { id } => { + // A given viewer task only ever sends RemoveViewer once, but the + // map remove is the source of truth either way. + if viewers.remove(&id).is_none() { + continue; + } + let active = viewers.len() as u32; + output::emit(output::Event::ViewerLeft { id: &id, active, max: max_viewers }); + tracing::info!(active, cap = max_viewers, "viewer left"); + if active == 0 && let Some(h) = handle.take() { tracing::info!("last viewer left — tearing down capture"); @@ -274,6 +340,17 @@ async fn supervise( }); } } + SupervisorMsg::KickViewer { id } => { + match viewers.get(&id) { + // Cancel the viewer's token; its handle_peer select! wakes, + // sends RemoveViewer, and the leave is emitted there. + Some(cancel) => { + tracing::info!(%id, "kicking viewer"); + cancel.cancel(); + } + None => tracing::debug!(%id, "kick for unknown/already-gone viewer"), + } + } } }