Files
pixelpass/src/host/mod.rs
T
mollusk e7ded10db8 feat(output): --output json machine-readable event stream
Adds common/output.rs: a process-global JSON-lines emitter for
non-interactive front-ends. With --output json, host and viewer emit one
JSON object per line on stdout (ticket, host_info, viewer_count, capture
start/stop, viewer_refused, connected), flushed per line; the human banner
and tracing logs stay on stderr so the two never interleave. No-op when the
flag is absent, so call sites emit unconditionally.

This is the shell-out counterpart to an in-process event channel: the
upcoming --gui front-end re-execs this binary as `pixelpass --host
--output json` and parses these lines to drive its window. serde_json was
already in the tree from the bandwidth pre-flight.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-24 16:17:38 -04:00

396 lines
14 KiB
Rust

pub mod audio;
mod capture;
mod pipeline;
mod quality;
mod serve;
mod wayland;
mod x11;
use anyhow::{Result, bail};
use iroh::endpoint::{Connection, presets};
use iroh::{Endpoint, EndpointAddr};
use iroh_tickets::endpoint::EndpointTicket;
use std::time::Duration;
use tokio::sync::{mpsc, oneshot};
use tokio_util::sync::CancellationToken;
use crate::cli::HostOpts;
use crate::common::{
alpn::ALPN, bandwidth, config, config::BandwidthStatus, deps, display::DisplayServer, output,
signal, tunnel,
};
use self::pipeline::CaptureHandle;
use self::quality::EffectiveQuality;
/// Messages from per-viewer tasks to the capture supervisor.
///
/// A peer task sends `AddViewer` once when it starts. If (and only if) the
/// supervisor admits it, the task sends exactly one `RemoveViewer` when its
/// session ends, whatever the reason, so the supervisor's count stays
/// balanced.
enum SupervisorMsg {
/// A new viewer wants in. Supervisor replies with the local capture
/// HTTP port to connect to, or an error string if the host is full or
/// capture spawn failed.
///
/// The reply travels over the enclosed one-shot channel; a refused viewer
/// is not counted and must not send `RemoveViewer`.
AddViewer(oneshot::Sender<Result<u16, String>>),
/// A viewer's session ended. Supervisor decrements the count and tears
/// down capture if it just hit zero.
RemoveViewer,
}
/// Runs the host side of a session until cancelled.
///
/// Resolves the display server, quality and viewer cap, binds the endpoint,
/// publishes the (relay-only) ticket to the banner and the JSON event
/// stream, then accepts viewers until Ctrl+C. On the way out it waits for
/// the capture supervisor to finish before closing the endpoint.
///
/// # Errors
///
/// Fails if the host dependency check fails, no display server could be
/// detected, the resolved viewer cap is zero, or the endpoint cannot bind.
pub async fn run(opts: HostOpts) -> Result<()> {
    let display = DisplayServer::resolve(opts.display_server);
    deps::check_host_binaries(display, &opts)?;
    if display == DisplayServer::Unknown {
        bail!(
            "could not detect display server (WAYLAND_DISPLAY / DISPLAY / XDG_SESSION_TYPE all unset).\n\
             Use --display-server wayland|x11 to override."
        );
    }

    // Resolve quality first: Auto sizes its bandwidth budget against the viewer
    // cap the host will honor. To avoid a circular dependency (the auto-derived
    // cap itself depends on bitrate), Auto sizes against the user's explicit
    // --max-viewers when given, else a single viewer. The resulting effective
    // bitrate then feeds the cap resolution below.
    let sizing_viewers = opts.max_viewers.filter(|&n| n > 0).unwrap_or(1);
    let quality = quality::resolve(&opts, sizing_viewers);
    let resolution = resolve_max_viewers(&opts, quality.bitrate);
    if resolution.value == 0 {
        bail!("--max-viewers must be at least 1");
    }

    let cancel = signal::install_ctrl_c();
    let endpoint = Endpoint::builder(presets::N0)
        .alpns(vec![ALPN.to_vec()])
        .bind()
        .await?;

    // Relay-only ticket: wait for the home relay to connect, then keep only
    // the endpoint id + relay URL and drop the direct IP candidates. The relay
    // coordinates hole-punching to a direct path right after connect, so this
    // doesn't change whether peers can reach each other — it just keeps the
    // ticket short (~140 vs ~320 chars) and stops it from leaking LAN /
    // Docker-bridge addresses to whoever receives the ticket. Awaiting online()
    // first guarantees the relay URL is actually present (addr() right after
    // bind can return before the relay handshake completes); the 15s cap means
    // a relay outage degrades to a possibly-incomplete ticket rather than a hang
    // (n0 DNS discovery still resolves the id in that case).
    if tokio::time::timeout(Duration::from_secs(15), endpoint.online())
        .await
        .is_err()
    {
        tracing::warn!("home relay not connected within 15s; ticket may be incomplete");
    }
    let addr = endpoint.addr();
    let relay_only =
        EndpointAddr::new(addr.id).with_addrs(addr.addrs.iter().filter(|a| a.is_relay()).cloned());
    let ticket = EndpointTicket::new(relay_only);
    let ticket_str = ticket.to_string();

    // Human-facing banner goes to stderr; machine-readable events go through
    // `output::emit`.
    let clipboard_ok = opts.interactive && copy_to_clipboard(&ticket_str);
    print_host_banner(&ticket, display, &opts, &quality, &resolution, clipboard_ok);
    output::emit(output::Event::Ticket { value: &ticket_str });

    let display_str = format!("{display:?}");
    let capture = capture_summary(&opts);
    let dims = quality.dimensions_summary();
    let cap_source = resolution.source.label();
    output::emit(output::Event::HostInfo {
        display_server: &display_str,
        capture: &capture,
        quality: &quality.label,
        dimensions: &dims,
        hw_encode: !opts.no_hwencode,
        max_viewers: resolution.value,
        max_viewers_source: &cap_source,
    });

    let (sup_tx, sup_rx) = mpsc::channel::<SupervisorMsg>(16);
    // `opts` and `quality` are not needed here any more, so hand them to the
    // supervisor by value instead of cloning.
    let supervisor = tokio::spawn(supervise(
        opts,
        quality,
        display,
        resolution.value,
        sup_rx,
    ));

    // The accept loop owns this sender; when the loop returns the sender is
    // dropped, and the supervisor exits once every peer task has dropped its
    // clone as well.
    accept_loop(&endpoint, sup_tx, cancel).await;

    // A join error means the supervisor panicked or was aborted; surface it
    // instead of exiting as if shutdown had been clean.
    if let Err(e) = supervisor.await {
        tracing::warn!("capture supervisor task ended abnormally: {e}");
    }
    endpoint.close().await;
    Ok(())
}
async fn accept_loop(
endpoint: &Endpoint,
sup_tx: mpsc::Sender<SupervisorMsg>,
cancel: CancellationToken,
) {
loop {
tokio::select! {
_ = cancel.cancelled() => {
tracing::info!("cancellation requested — closing accept loop");
return;
}
accepted = endpoint.accept() => {
let Some(incoming) = accepted else {
tracing::info!("endpoint stopped accepting connections");
return;
};
let conn = match incoming.await {
Ok(c) => c,
Err(e) => {
tracing::warn!("incoming connection failed: {e:#}");
continue;
}
};
let sup_tx = sup_tx.clone();
let cancel = cancel.clone();
tokio::spawn(handle_peer(conn, sup_tx, cancel));
}
}
}
}
/// Serves a single viewer connection from admission to disconnect.
///
/// Asks the supervisor for a slot (and the local capture port), waits for
/// the viewer to open a bidirectional stream, connects to the local capture
/// endpoint and bridges the two until either side ends or `cancel` fires.
/// Once admitted, every exit path sends exactly one `RemoveViewer` so the
/// supervisor's count stays balanced.
async fn handle_peer(
    conn: Connection,
    sup_tx: mpsc::Sender<SupervisorMsg>,
    cancel: CancellationToken,
) {
    let remote = conn.remote_id();

    let (reply_tx, reply_rx) = oneshot::channel();
    if sup_tx.send(SupervisorMsg::AddViewer(reply_tx)).await.is_err() {
        tracing::warn!(%remote, "supervisor channel closed; dropping peer");
        return;
    }
    let port = match reply_rx.await {
        Ok(Ok(p)) => p,
        Ok(Err(reason)) => {
            tracing::warn!(%remote, %reason, "refusing viewer");
            eprintln!("[pixelpass] refusing viewer {remote}: {reason}");
            return;
        }
        Err(_) => {
            tracing::warn!(%remote, "supervisor reply dropped; dropping peer");
            return;
        }
    };

    // From here on the viewer holds a slot. Wait for its stream, but stay
    // responsive to shutdown: a peer that connects and never opens a stream
    // would otherwise keep this task (and its supervisor sender) alive and
    // stall host shutdown.
    let opened = tokio::select! {
        res = conn.accept_bi() => res,
        _ = cancel.cancelled() => {
            tracing::info!(%remote, "cancellation before stream was opened");
            let _ = sup_tx.send(SupervisorMsg::RemoveViewer).await;
            return;
        }
    };
    let (quic_send, quic_recv) = match opened {
        Ok(s) => s,
        Err(e) => {
            tracing::warn!(%remote, "accept_bi failed: {e:#}");
            let _ = sup_tx.send(SupervisorMsg::RemoveViewer).await;
            return;
        }
    };
    eprintln!("[pixelpass] viewer connected: {remote}");

    let tcp = match serve::connect_to_capture(port, Duration::from_secs(5)).await {
        Ok(t) => t,
        Err(e) => {
            tracing::warn!(%remote, "connect_to_capture failed: {e:#}");
            let _ = sup_tx.send(SupervisorMsg::RemoveViewer).await;
            return;
        }
    };

    let bridge = tunnel::bridge(quic_send, quic_recv, tcp);
    tokio::select! {
        res = bridge => match res {
            Ok(()) => tracing::info!(%remote, "bridge closed cleanly"),
            Err(e) => tracing::info!(%remote, "bridge ended: {e:#}"),
        },
        _ = cancel.cancelled() => {
            tracing::info!(%remote, "cancellation during stream");
        }
    }

    eprintln!("[pixelpass] viewer disconnected: {remote}");
    let _ = sup_tx.send(SupervisorMsg::RemoveViewer).await;
}
/// Owns the single shared CaptureHandle and the active viewer count. Spawns
/// capture lazily on the first AddViewer; tears it down when the count drops
/// back to zero. Enforces the max-viewers cap by refusing AddViewer when
/// the count is already at the cap.
///
/// Every refusal — host full or capture failing to start — is reported both
/// to the requesting peer task and as a `ViewerRefused` event, so consumers
/// of the event stream see the same outcome the peer does. The loop ends
/// when all senders for `rx` have been dropped; any capture still running at
/// that point is shut down.
async fn supervise(
    opts: HostOpts,
    quality: EffectiveQuality,
    display: DisplayServer,
    max_viewers: u32,
    mut rx: mpsc::Receiver<SupervisorMsg>,
) {
    let mut handle: Option<CaptureHandle> = None;
    let mut count: u32 = 0;

    while let Some(msg) = rx.recv().await {
        match msg {
            SupervisorMsg::AddViewer(reply) => {
                if count >= max_viewers {
                    let reason =
                        format!("host is full ({count} of {max_viewers} viewers connected)");
                    output::emit(output::Event::ViewerRefused { reason: &reason });
                    let _ = reply.send(Err(reason));
                    continue;
                }

                // Reuse the running capture if there is one; otherwise this
                // is the first viewer and capture has to be started now.
                let port = if let Some(h) = &handle {
                    h.local_port()
                } else {
                    tracing::info!("first viewer arriving — spawning capture");
                    match capture::spawn(display, &opts, &quality).await {
                        Ok(h) => {
                            let port = h.local_port();
                            handle = Some(h);
                            output::emit(output::Event::Capture {
                                state: output::CaptureState::Started,
                            });
                            port
                        }
                        Err(e) => {
                            // The viewer is turned away, so report it the
                            // same way as the host-full refusal above.
                            let reason = format!("capture spawn failed: {e:#}");
                            output::emit(output::Event::ViewerRefused { reason: &reason });
                            let _ = reply.send(Err(reason));
                            continue;
                        }
                    }
                };

                count += 1;
                let _ = reply.send(Ok(port));
                output::emit(output::Event::ViewerCount { active: count, max: max_viewers });
                tracing::info!(active = count, cap = max_viewers, "viewer joined");
            }
            SupervisorMsg::RemoveViewer => {
                // Saturating so a stray extra RemoveViewer cannot underflow.
                count = count.saturating_sub(1);
                output::emit(output::Event::ViewerCount { active: count, max: max_viewers });
                tracing::info!(active = count, cap = max_viewers, "viewer left");
                if count == 0
                    && let Some(h) = handle.take()
                {
                    tracing::info!("last viewer left — tearing down capture");
                    h.shutdown().await;
                    output::emit(output::Event::Capture {
                        state: output::CaptureState::Stopped,
                    });
                }
            }
        }
    }

    if let Some(h) = handle.take() {
        tracing::info!("host shutdown — tearing down capture");
        h.shutdown().await;
        output::emit(output::Event::Capture {
            state: output::CaptureState::Stopped,
        });
    }
}
/// Prints the human-readable host banner to stderr.
///
/// Shows the resolved display server, capture mode, quality, encoder choice
/// and viewer cap, followed by the share ticket and a short usage note.
/// `clipboard_ok` selects the wording: when the ticket was copied to the
/// clipboard the banner says so, otherwise it asks the user to share the
/// ticket shown below. Spacer rows inside the box carry the left border so
/// the frame stays unbroken.
fn print_host_banner(
    ticket: &EndpointTicket,
    display: DisplayServer,
    opts: &HostOpts,
    quality: &EffectiveQuality,
    resolution: &MaxViewersResolution,
    clipboard_ok: bool,
) {
    let hw_encode = if opts.no_hwencode { "off (software x264)" } else { "on (VAAPI H.264)" };

    eprintln!();
    eprintln!("┌─ PixelPass · host ─────────────────────────────────────────");
    eprintln!("│ display server : {display:?}");
    eprintln!("│ capture : {}", capture_summary(opts));
    eprintln!("│ quality : {}{}", quality.label, quality.dimensions_summary());
    eprintln!("│ ({})", quality.note);
    eprintln!("│ hw encode : {hw_encode}");
    eprintln!("│ max viewers : {} ({})", resolution.value, resolution.source.label());
    eprintln!("│");
    if clipboard_ok {
        eprintln!("│ Your share code has been copied to your clipboard.");
        eprintln!("│ Send it to your viewer(s). (If clipboard didn't work, the");
        eprintln!("│ code is also shown below for manual copy.)");
    } else {
        eprintln!("│ Share this ticket with your viewer(s):");
    }
    eprintln!("│");
    eprintln!("│ pixelpass {ticket}");
    eprintln!("│");
    eprintln!("│ Capture starts when the first viewer connects, runs while");
    eprintln!("│ any viewer is connected, and tears down when the last one");
    eprintln!("│ leaves. Press Ctrl+C to stop the host entirely.");
    eprintln!("└────────────────────────────────────────────────────────────");
    eprintln!();
}
/// How we arrived at the final viewer cap. Surfaced in the banner so the
/// user can tell at a glance whether the number is what they specified,
/// what their measured upstream supports, or just the fallback default.
struct MaxViewersResolution {
/// The viewer cap the host will enforce. The host refuses to start when
/// this is zero.
value: u32,
/// Where `value` came from; rendered next to the cap in the banner and
/// reported in the host-info event.
source: MaxViewersSource,
}
/// Origin of the viewer cap, used to explain the number to the user.
enum MaxViewersSource {
    /// The cap was given explicitly via --max-viewers.
    UserFlag,
    /// The cap was computed from the stored bandwidth measurement; carries
    /// the upstream figure (in Mbps) it was computed from.
    BandwidthMeasurement { safe_mbps: f64 },
    /// Neither a flag nor a measurement was available, so the built-in
    /// default applies.
    DefaultFallback,
}

impl MaxViewersSource {
    /// Short human-readable explanation of where the cap came from.
    fn label(&self) -> String {
        // Only the measurement variant needs formatting; the other two are
        // fixed text.
        let fixed = match self {
            Self::BandwidthMeasurement { safe_mbps } => {
                return format!("auto: {safe_mbps:.1} Mbps measured upstream");
            }
            Self::UserFlag => "user-specified",
            Self::DefaultFallback => {
                "default — run `pixelpass --reconfigure` for a connection-aware value"
            }
        };
        fixed.to_owned()
    }
}
/// Decides the viewer cap and records where it came from.
///
/// Precedence:
/// 1. an explicit `--max-viewers` value, passed through untouched (a zero is
///    left for the caller to reject, since that is a user error);
/// 2. a value derived from the saved bandwidth measurement and
///    `effective_bitrate`, when the config loads and holds a measurement;
/// 3. a fixed default of 2.
///
/// The bandwidth-derived value is clamped to at least 1. The caller rejects
/// a cap of zero with a message about `--max-viewers`, which would be
/// misleading for a user who never passed that flag.
fn resolve_max_viewers(opts: &HostOpts, effective_bitrate: u32) -> MaxViewersResolution {
    if let Some(n) = opts.max_viewers {
        return MaxViewersResolution {
            value: n,
            source: MaxViewersSource::UserFlag,
        };
    }

    // A config that fails to load is treated the same as "no measurement".
    if let Ok(cfg) = config::load()
        && cfg.bandwidth.status == BandwidthStatus::Measured
        && let Some(upstream) = cfg.bandwidth.upstream_mbps
    {
        let n = bandwidth::recommended_max_viewers(upstream, effective_bitrate).max(1);
        return MaxViewersResolution {
            value: n,
            source: MaxViewersSource::BandwidthMeasurement { safe_mbps: upstream },
        };
    }

    MaxViewersResolution {
        value: 2,
        source: MaxViewersSource::DefaultFallback,
    }
}
/// Best-effort copy of `text` to the system clipboard.
///
/// Returns `true` on success. On failure (no clipboard available, or the
/// write is rejected) the error is logged as a warning and `false` is
/// returned; it never propagates.
fn copy_to_clipboard(text: &str) -> bool {
    let outcome =
        arboard::Clipboard::new().and_then(|mut clipboard| clipboard.set_text(text.to_owned()));
    if let Err(e) = outcome {
        tracing::warn!("clipboard copy failed: {e}");
        return false;
    }
    true
}
/// One-line description of what is being captured, in the form
/// `<video> + <audio>`.
///
/// The video part is `window` or `fullscreen` depending on `opts.window`;
/// the audio part is `app-audio=<app>` when an app is selected and
/// `system-audio` otherwise.
fn capture_summary(opts: &HostOpts) -> String {
    let video = if opts.window { "window" } else { "fullscreen" };
    let audio = match &opts.app {
        Some(app) => format!("app-audio={app}"),
        None => String::from("system-audio"),
    };
    format!("{video} + {audio}")
}