feat(gui): list connected viewers and let the host kick them

Track viewers by endpoint id instead of a bare count. The JSON event
stream gains viewer_joined / viewer_left (each carrying the id),
replacing viewer_count; active/max still ride along so the count
display is unchanged.

The host screen now renders one row per connected viewer with a Kick
button. Clicking it sends `kick <id>` to the headless child over a new
stdin command channel, which the host turns into a per-viewer
CancellationToken cancel; the existing teardown path then emits the
leave, so a kick and a self-disconnect look identical downstream.

The stdin channel only runs under --output json (the GUI shell-out) and
on a detached OS thread, so a read parked on stdin can't hold up the
host's Ctrl+C shutdown.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-25 15:27:49 -04:00
parent e8f86b0ac2
commit 24e0d0e799
4 changed files with 177 additions and 31 deletions
+11 -3
View File
@@ -22,7 +22,11 @@ pub fn set_json(enabled: bool) {
JSON_ENABLED.store(enabled, Ordering::Relaxed);
}
fn json_enabled() -> bool {
/// Whether the JSON event stream is on — i.e. we're being driven by a
/// machine front-end (the `--gui` shell-out) rather than a human terminal.
/// Gates features that only make sense under that front-end, like the
/// stdin command channel the host reads `kick` requests from.
pub fn json_enabled() -> bool {
JSON_ENABLED.load(Ordering::Relaxed)
}
@@ -42,8 +46,12 @@ pub enum Event<'a> {
max_viewers: u32,
max_viewers_source: &'a str,
},
/// Active viewer count changed.
ViewerCount { active: u32, max: u32 },
/// A viewer joined. `id` is the viewer's endpoint id; `active` is the new
/// total after the join.
ViewerJoined { id: &'a str, active: u32, max: u32 },
/// A viewer left — disconnected on their own or kicked by the host. `id`
/// is the viewer's endpoint id; `active` is the new total after.
ViewerLeft { id: &'a str, active: u32, max: u32 },
/// Capture pipeline lifecycle (spawned on first viewer, torn down on last).
Capture { state: CaptureState },
/// A viewer was turned away (host full, or capture spawn failed).
+37 -7
View File
@@ -6,8 +6,8 @@
//! egui app drains each frame. stderr is captured into a small ring so a
//! failed launch (e.g. a missing gst plugin) can be surfaced in the window.
use std::io::{BufRead, BufReader};
use std::process::{Child, Command, Stdio};
use std::io::{BufRead, BufReader, Write};
use std::process::{Child, ChildStdin, Command, Stdio};
use std::sync::mpsc::Receiver;
use std::sync::{Arc, Mutex};
use std::time::Duration;
@@ -35,7 +35,13 @@ pub enum ChildEvent {
max_viewers: u32,
max_viewers_source: String,
},
ViewerCount {
ViewerJoined {
id: String,
active: u32,
max: u32,
},
ViewerLeft {
id: String,
active: u32,
max: u32,
},
@@ -63,6 +69,9 @@ pub struct ChildProc {
child: Child,
pub rx: Receiver<ChildEvent>,
stderr_tail: Arc<Mutex<Vec<String>>>,
/// Write end of the child's stdin, for the line-based command channel
/// (see [`ChildProc::send_command`]). `None` once it's been closed.
stdin: Option<ChildStdin>,
}
impl ChildProc {
@@ -72,11 +81,14 @@ impl ChildProc {
let exe = std::env::current_exe()?;
let mut child = Command::new(exe)
.args(args)
.stdin(Stdio::null())
// Piped so we can send line commands (e.g. `kick <id>`); the host
// only reads it when driven this way (`--output json`).
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()?;
let stdin = child.stdin.take();
let (tx, rx) = std::sync::mpsc::channel();
let stdout = child.stdout.take().expect("stdout piped");
std::thread::spawn(move || {
@@ -114,9 +126,23 @@ impl ChildProc {
child,
rx,
stderr_tail,
stdin,
})
}
/// Send one newline-terminated command to the child over its stdin (the
/// host parses these as `kick <endpoint-id>`). Best-effort: a closed pipe
/// (child already gone) just drops the command.
pub fn send_command(&mut self, cmd: &str) {
let Some(stdin) = self.stdin.as_mut() else {
return;
};
if let Err(e) = writeln!(stdin, "{cmd}") {
tracing::warn!("failed to send command to host child: {e}");
self.stdin = None; // pipe is dead; stop trying
}
}
/// Whether the child is still running.
pub fn is_alive(&mut self) -> bool {
matches!(self.child.try_wait(), Ok(None))
@@ -205,10 +231,14 @@ mod tests {
}
#[test]
fn viewer_count_round_trips() {
fn viewer_join_leave_round_trip() {
assert!(matches!(
parse(Event::ViewerCount { active: 2, max: 4 }),
ChildEvent::ViewerCount { active: 2, max: 4 }
parse(Event::ViewerJoined { id: "nodeXYZ", active: 2, max: 4 }),
ChildEvent::ViewerJoined { id, active: 2, max: 4 } if id == "nodeXYZ"
));
assert!(matches!(
parse(Event::ViewerLeft { id: "nodeXYZ", active: 1, max: 4 }),
ChildEvent::ViewerLeft { id, active: 1, max: 4 } if id == "nodeXYZ"
));
}
+32 -1
View File
@@ -168,6 +168,9 @@ struct HostState {
copied: bool,
last_refusal: Option<String>,
error: Option<String>,
/// Endpoint ids of the currently-connected viewers, in arrival order.
/// Drives the per-viewer list and its Kick buttons.
viewers: Vec<String>,
}
impl Default for HostState {
@@ -186,6 +189,7 @@ impl Default for HostState {
copied: false,
last_refusal: None,
error: None,
viewers: Vec::new(),
}
}
}
@@ -362,6 +366,23 @@ impl PixelPassApp {
ui.add_space(6.0);
ui.label(format!("Viewers: {} / {}", self.host.active, self.host.max));
// Per-viewer list with a Kick button each. Collect the click first so
// we're not borrowing self.host.viewers while we reach for the child.
let mut kick: Option<String> = None;
for id in &self.host.viewers {
ui.horizontal(|ui| {
ui.label(format!("• endpoint {}", short_id(id)));
if ui.small_button("Kick").clicked() {
kick = Some(id.clone());
}
});
}
if let Some(id) = kick
&& let Some(p) = &mut self.host.proc
{
p.send_command(&format!("kick {id}"));
}
if let Some(info) = &self.host.info {
ui.label(
egui::RichText::new(format!("{} · {}", info.display, info.capture))
@@ -452,6 +473,7 @@ impl PixelPassApp {
self.host.max = 0;
self.host.capturing = false;
self.host.copied = false;
self.host.viewers.clear();
let mut args = vec![
"--host".to_string(),
@@ -483,6 +505,7 @@ impl PixelPassApp {
self.host.capturing = false;
self.host.ticket = None;
self.host.copied = false;
self.host.viewers.clear();
}
/// Drain the host child's event channel into state, and detect an
@@ -543,9 +566,17 @@ impl PixelPassApp {
cap_source: max_viewers_source,
});
}
ChildEvent::ViewerCount { active, max } => {
ChildEvent::ViewerJoined { id, active, max } => {
self.host.active = active;
self.host.max = max;
if !self.host.viewers.contains(&id) {
self.host.viewers.push(id);
}
}
ChildEvent::ViewerLeft { id, active, max } => {
self.host.active = active;
self.host.max = max;
self.host.viewers.retain(|v| v != &id);
}
ChildEvent::Capture { state } => {
self.host.capturing = matches!(state, child::CaptureState::Started);
+97 -20
View File
@@ -10,6 +10,7 @@ use anyhow::{Result, bail};
use iroh::endpoint::{Connection, presets};
use iroh::{Endpoint, EndpointAddr};
use iroh_tickets::endpoint::EndpointTicket;
use std::collections::HashMap;
use std::time::Duration;
use tokio::sync::{mpsc, oneshot};
use tokio_util::sync::CancellationToken;
@@ -23,15 +24,28 @@ use crate::common::{
use self::pipeline::CaptureHandle;
use self::quality::EffectiveQuality;
/// Messages from per-viewer tasks to the capture supervisor.
/// Messages from per-viewer tasks (and the GUI command channel) to the
/// capture supervisor.
// The shared `Viewer` suffix is the point — these are all viewer lifecycle
// messages — so keep the descriptive names.
#[allow(clippy::enum_variant_names)]
enum SupervisorMsg {
/// A new viewer wants in. Supervisor replies with the local capture
/// HTTP port to connect to, or an error string if the host is full or
/// capture spawn failed.
AddViewer(oneshot::Sender<Result<u16, String>>),
/// A new viewer wants in. Supervisor replies with the local capture HTTP
/// port to connect to, or an error string if the host is full or capture
/// spawn failed. `cancel` is the viewer's own token — the supervisor keeps
/// it so a later `KickViewer` can tear this viewer's stream down.
AddViewer {
id: String,
cancel: CancellationToken,
reply: oneshot::Sender<Result<u16, String>>,
},
/// A viewer's session ended. Supervisor decrements the count and tears
/// down capture if it just hit zero.
RemoveViewer,
RemoveViewer { id: String },
/// Host asked (via the GUI command channel) to disconnect a viewer by
/// endpoint id. Cancels that viewer's token; the normal teardown path then
/// emits the `ViewerLeft`.
KickViewer { id: String },
}
pub async fn run(opts: HostOpts) -> Result<()> {
@@ -113,6 +127,15 @@ pub async fn run(opts: HostOpts) -> Result<()> {
sup_rx,
));
// Command channel for the GUI front-end: read `kick <endpoint-id>` lines
// off stdin. Only when machine-driven (`--output json`) — a human host has
// nothing to type here, and we don't want to swallow terminal input. Runs
// on a plain OS thread (not a tokio task) so a read parked on stdin can't
// hold up runtime shutdown on Ctrl+C; the thread dies with the process.
if output::json_enabled() {
spawn_kick_listener(sup_tx.clone());
}
accept_loop(&endpoint, sup_tx.clone(), cancel.clone()).await;
drop(sup_tx);
@@ -159,9 +182,18 @@ async fn handle_peer(
cancel: CancellationToken,
) {
let remote = conn.remote_id();
let id = remote.to_string();
// This viewer's own kill switch: the supervisor holds a clone so a `kick`
// can cancel it, and the stream select! below watches it.
let peer_cancel = CancellationToken::new();
let (reply_tx, reply_rx) = oneshot::channel();
if sup_tx.send(SupervisorMsg::AddViewer(reply_tx)).await.is_err() {
let add = SupervisorMsg::AddViewer {
id: id.clone(),
cancel: peer_cancel.clone(),
reply: reply_tx,
};
if sup_tx.send(add).await.is_err() {
tracing::warn!(%remote, "supervisor channel closed; dropping peer");
return;
}
@@ -182,7 +214,7 @@ async fn handle_peer(
Ok(s) => s,
Err(e) => {
tracing::warn!(%remote, "accept_bi failed: {e:#}");
let _ = sup_tx.send(SupervisorMsg::RemoveViewer).await;
let _ = sup_tx.send(SupervisorMsg::RemoveViewer { id }).await;
return;
}
};
@@ -193,7 +225,7 @@ async fn handle_peer(
Ok(t) => t,
Err(e) => {
tracing::warn!(%remote, "connect_to_capture failed: {e:#}");
let _ = sup_tx.send(SupervisorMsg::RemoveViewer).await;
let _ = sup_tx.send(SupervisorMsg::RemoveViewer { id }).await;
return;
}
};
@@ -207,10 +239,34 @@ async fn handle_peer(
_ = cancel.cancelled() => {
tracing::info!(%remote, "cancellation during stream");
}
_ = peer_cancel.cancelled() => {
tracing::info!(%remote, "kicked by host");
}
}
eprintln!("[pixelpass] viewer disconnected: {remote}");
let _ = sup_tx.send(SupervisorMsg::RemoveViewer).await;
let _ = sup_tx.send(SupervisorMsg::RemoveViewer { id }).await;
}
/// Read `kick <endpoint-id>` lines off stdin and forward them to the
/// supervisor. Runs on a detached OS thread (see the call site for why). Ends
/// when stdin hits EOF (the GUI closed the pipe) or the supervisor is gone.
fn spawn_kick_listener(sup_tx: mpsc::Sender<SupervisorMsg>) {
use std::io::BufRead;
std::thread::spawn(move || {
let stdin = std::io::stdin();
for line in stdin.lock().lines().map_while(Result::ok) {
let Some(id) = line.trim().strip_prefix("kick ") else {
continue;
};
let msg = SupervisorMsg::KickViewer { id: id.trim().to_string() };
// blocking_send is valid here: this is a plain thread, not inside
// the tokio runtime. An Err means the supervisor closed — stop.
if sup_tx.blocking_send(msg).is_err() {
break;
}
}
});
}
/// Owns the single shared CaptureHandle and the active viewer count. Spawns
@@ -225,11 +281,15 @@ async fn supervise(
mut rx: mpsc::Receiver<SupervisorMsg>,
) {
let mut handle: Option<CaptureHandle> = None;
let mut count: u32 = 0;
// Active viewers, keyed by endpoint id, holding each one's kill switch.
// The count is just `viewers.len()`. (A given endpoint connecting twice is
// a non-case here: each viewer process uses a fresh ephemeral identity.)
let mut viewers: HashMap<String, CancellationToken> = HashMap::new();
while let Some(msg) = rx.recv().await {
match msg {
SupervisorMsg::AddViewer(reply) => {
SupervisorMsg::AddViewer { id, cancel, reply } => {
let count = viewers.len() as u32;
if count >= max_viewers {
let reason =
format!("host is full ({count} of {max_viewers} viewers connected)");
@@ -255,16 +315,22 @@ async fn supervise(
}
let port = handle.as_ref().expect("handle was just set").local_port();
count += 1;
viewers.insert(id.clone(), cancel);
let active = viewers.len() as u32;
let _ = reply.send(Ok(port));
output::emit(output::Event::ViewerCount { active: count, max: max_viewers });
tracing::info!(active = count, cap = max_viewers, "viewer joined");
output::emit(output::Event::ViewerJoined { id: &id, active, max: max_viewers });
tracing::info!(active, cap = max_viewers, "viewer joined");
}
SupervisorMsg::RemoveViewer => {
count = count.saturating_sub(1);
output::emit(output::Event::ViewerCount { active: count, max: max_viewers });
tracing::info!(active = count, cap = max_viewers, "viewer left");
if count == 0
SupervisorMsg::RemoveViewer { id } => {
// A given viewer task only ever sends RemoveViewer once, but the
// map remove is the source of truth either way.
if viewers.remove(&id).is_none() {
continue;
}
let active = viewers.len() as u32;
output::emit(output::Event::ViewerLeft { id: &id, active, max: max_viewers });
tracing::info!(active, cap = max_viewers, "viewer left");
if active == 0
&& let Some(h) = handle.take()
{
tracing::info!("last viewer left — tearing down capture");
@@ -274,6 +340,17 @@ async fn supervise(
});
}
}
SupervisorMsg::KickViewer { id } => {
match viewers.get(&id) {
// Cancel the viewer's token; its handle_peer select! wakes,
// sends RemoveViewer, and the leave is emitted there.
Some(cancel) => {
tracing::info!(%id, "kicking viewer");
cancel.cancel();
}
None => tracing::debug!(%id, "kick for unknown/already-gone viewer"),
}
}
}
}