Phase 1 foundation: CLI, iroh tunnel, lazy capture wiring

Scaffolding for PixelPass per ~/Documents/p2p-screenshare-plan.md §7
"Phase 1 — MVP". The full QUIC tunnel handshake works end-to-end
(verified locally: host generates ticket, viewer dials through iroh's
relay, open_bi succeeds, lazy capture is wired correctly).

What's implemented:

- Cargo project with deps locked: iroh 1.0.0-rc.0, iroh-tickets,
  tokio, clap, ashpd, pipewire-rs, x11rb, ashpd, anyhow, thiserror,
  tracing, nix, directories, uuid.
- src/cli.rs: complete clap surface per plan §6 (--window, --app,
  --mic, --display-server, --bitrate, --framerate, --no-hwencode,
  --low-latency, --port, --verbose, --repair).
- Mode dispatch in main.rs: EndpointTicket::from_str is the
  authoritative check; no regex / heuristics.
- common/display.rs: WAYLAND_DISPLAY → DISPLAY → XDG_SESSION_TYPE
  precedence with --display-server override.
- common/deps.rs: per-distro install hints (pacman/apt/dnf/zypper)
  parsing /etc/os-release.
- common/alpn.rs: ALPN = b"pixelpass/0".
- common/tunnel.rs: generic bidirectional bridge between an iroh
  bi-stream and any AsyncRead+AsyncWrite (typically a TCP socket).
- common/signal.rs: ctrl-c -> CancellationToken; second ctrl-c hard
  exit.
- host/mod.rs: build Endpoint, generate ticket, print banner, await
  first peer (lazy — no ffmpeg until peer connects), accept_bi,
  spawn capture, bridge to localhost ffmpeg HTTP listener.
- host/capture.rs: stub returning Phase-2 error; the place X11
  x11grab and Wayland ashpd+gst pipelines will land.
- viewer/mod.rs: Endpoint, connect with ALPN, open_bi, TcpListener
  on 127.0.0.1, print copy-ready mpv/vlc commands, bridge.
- repair.rs: stub for --repair PipeWire scan.

iroh 1.0-rc renamed Node* -> Endpoint* and moved EndpointTicket into
a sibling crate (iroh-tickets); no design impact. Plan still locked.
This commit is contained in:
2026-05-15 15:13:28 -04:00
commit 6ad92081aa
17 changed files with 5886 additions and 0 deletions
+103
View File
@@ -0,0 +1,103 @@
use clap::{Parser, ValueEnum};
#[derive(Parser, Debug)]
#[command(
name = "pixelpass",
version,
about = "P2P screen sharing over iroh + ffmpeg",
long_about = "Run with no arguments to host (prints a ticket to share). \
Pass a ticket to view."
)]
pub struct Cli {
/// iroh ticket. If present, runs as viewer. If absent, runs as host.
pub ticket: Option<String>,
// ── host options ──────────────────────────────────────────────────
/// Pick a single window instead of the whole screen.
#[arg(long)]
pub window: bool,
/// Capture only this app's audio (per-app PipeWire routing).
#[arg(long, value_name = "NAME")]
pub app: Option<String>,
/// Mix in the default microphone source.
#[arg(long)]
pub mic: bool,
/// Override display server autodetection.
#[arg(long, value_enum)]
pub display_server: Option<DisplayServerArg>,
/// Encode bitrate in kbps.
#[arg(long, default_value_t = 6000)]
pub bitrate: u32,
/// Capture framerate.
#[arg(long, default_value_t = 30)]
pub framerate: u32,
/// Disable VAAPI HW encode; force software x264.
#[arg(long)]
pub no_hwencode: bool,
/// Use low-latency SRT transport instead of HTTP MPEG-TS (Phase 2/3).
#[arg(long)]
pub low_latency: bool,
// ── viewer options ────────────────────────────────────────────────
/// Local TCP port for the viewer to expose (default: random).
#[arg(long, default_value_t = 0)]
pub port: u16,
// ── global ────────────────────────────────────────────────────────
/// Trace-level logging.
#[arg(long, short)]
pub verbose: bool,
/// Clean up orphaned PipeWire state from a crashed host run, then exit.
#[arg(long)]
pub repair: bool,
}
#[derive(ValueEnum, Clone, Copy, Debug)]
pub enum DisplayServerArg {
Wayland,
X11,
}
#[derive(Debug, Clone)]
pub struct HostOpts {
pub window: bool,
pub app: Option<String>,
pub mic: bool,
pub display_server: Option<DisplayServerArg>,
pub bitrate: u32,
pub framerate: u32,
pub no_hwencode: bool,
pub low_latency: bool,
}
#[derive(Debug, Clone)]
pub struct ViewerOpts {
pub port: u16,
}
impl Cli {
pub fn into_host_opts(self) -> HostOpts {
HostOpts {
window: self.window,
app: self.app,
mic: self.mic,
display_server: self.display_server,
bitrate: self.bitrate,
framerate: self.framerate,
no_hwencode: self.no_hwencode,
low_latency: self.low_latency,
}
}
pub fn into_viewer_opts(self) -> ViewerOpts {
ViewerOpts { port: self.port }
}
}
+5
View File
@@ -0,0 +1,5 @@
/// ALPN identifying the pixelpass wire protocol on the iroh tunnel.
///
/// Bump the version suffix whenever the wire format changes. Today the wire is
/// "raw MPEG-TS bytes copied bidirectionally," so bumps will be rare.
pub const ALPN: &[u8] = b"pixelpass/0";
+67
View File
@@ -0,0 +1,67 @@
use anyhow::{Result, bail};
use std::path::PathBuf;
use crate::common::display::DisplayServer;
pub fn check_host_binaries(display: DisplayServer) -> Result<()> {
require("ffmpeg")?;
if display == DisplayServer::Wayland {
require("gst-launch-1.0")?;
}
Ok(())
}
fn require(bin: &str) -> Result<PathBuf> {
match which(bin) {
Some(p) => Ok(p),
None => {
let hint = install_hint(bin);
bail!("`{bin}` not found on PATH.\n{hint}");
}
}
}
fn which(bin: &str) -> Option<PathBuf> {
let path = std::env::var_os("PATH")?;
for dir in std::env::split_paths(&path) {
let candidate = dir.join(bin);
if candidate.is_file() {
return Some(candidate);
}
}
None
}
fn install_hint(bin: &str) -> String {
let distro = detect_distro();
let pkg = match bin {
"ffmpeg" => "ffmpeg",
"gst-launch-1.0" => match distro.as_deref() {
Some("arch" | "cachyos" | "manjaro" | "endeavouros") => "gst-plugins-base gst-plugins-good",
Some("debian" | "ubuntu" | "pop" | "linuxmint") => "gstreamer1.0-tools gstreamer1.0-plugins-good",
Some("fedora" | "nobara") => "gstreamer1-plugins-good",
_ => "gstreamer (and tools / good-plugins)",
},
_ => bin,
};
let cmd = match distro.as_deref() {
Some("arch" | "cachyos" | "manjaro" | "endeavouros") => format!("sudo pacman -S {pkg}"),
Some("debian" | "ubuntu" | "pop" | "linuxmint") => format!("sudo apt install {pkg}"),
Some("fedora" | "nobara") => format!("sudo dnf install {pkg}"),
Some("opensuse" | "opensuse-tumbleweed" | "opensuse-leap") => format!("sudo zypper install {pkg}"),
_ => format!("install the `{pkg}` package via your distro's package manager"),
};
format!("Install hint: {cmd}")
}
fn detect_distro() -> Option<String> {
let contents = std::fs::read_to_string("/etc/os-release").ok()?;
for line in contents.lines() {
if let Some(rest) = line.strip_prefix("ID=") {
return Some(rest.trim_matches('"').to_lowercase());
}
}
None
}
+32
View File
@@ -0,0 +1,32 @@
use crate::cli::DisplayServerArg;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DisplayServer {
Wayland,
X11,
Unknown,
}
impl DisplayServer {
pub fn detect() -> Self {
if std::env::var_os("WAYLAND_DISPLAY").is_some() {
return DisplayServer::Wayland;
}
if std::env::var_os("DISPLAY").is_some() {
return DisplayServer::X11;
}
match std::env::var("XDG_SESSION_TYPE").as_deref() {
Ok("wayland") => DisplayServer::Wayland,
Ok("x11") => DisplayServer::X11,
_ => DisplayServer::Unknown,
}
}
pub fn resolve(override_: Option<DisplayServerArg>) -> Self {
match override_ {
Some(DisplayServerArg::Wayland) => DisplayServer::Wayland,
Some(DisplayServerArg::X11) => DisplayServer::X11,
None => DisplayServer::detect(),
}
}
}
+5
View File
@@ -0,0 +1,5 @@
pub mod alpn;
pub mod deps;
pub mod display;
pub mod signal;
pub mod tunnel;
+20
View File
@@ -0,0 +1,20 @@
use tokio_util::sync::CancellationToken;
/// Install a ctrl-c handler that triggers the returned token.
///
/// The first ctrl-c cancels gracefully; a second ctrl-c terminates the process.
pub fn install_ctrl_c() -> CancellationToken {
let token = CancellationToken::new();
let trigger = token.clone();
tokio::spawn(async move {
if tokio::signal::ctrl_c().await.is_ok() {
tracing::info!("ctrl-c received, shutting down");
trigger.cancel();
}
if tokio::signal::ctrl_c().await.is_ok() {
tracing::warn!("second ctrl-c — exiting now");
std::process::exit(130);
}
});
token
}
+34
View File
@@ -0,0 +1,34 @@
use anyhow::Result;
use iroh::endpoint::{RecvStream, SendStream};
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
/// Bridge an iroh bi-stream and a generic AsyncRead+AsyncWrite (typically a TCP
/// socket) by copying bytes in both directions concurrently.
///
/// Returns once either direction finishes or errors. The peer-side socket
/// halves are owned and dropped here on exit, ensuring FIN propagates.
pub async fn bridge<T>(quic_send: SendStream, quic_recv: RecvStream, peer: T) -> Result<()>
where
T: AsyncRead + AsyncWrite + Unpin,
{
let (mut peer_r, mut peer_w) = tokio::io::split(peer);
let mut quic_send = quic_send;
let mut quic_recv = quic_recv;
let to_quic = async {
let n = tokio::io::copy(&mut peer_r, &mut quic_send).await;
let _ = quic_send.finish();
n
};
let to_peer = async {
let n = tokio::io::copy(&mut quic_recv, &mut peer_w).await;
let _ = peer_w.shutdown().await;
n
};
tokio::select! {
res = to_quic => { res?; }
res = to_peer => { res?; }
}
Ok(())
}
+44
View File
@@ -0,0 +1,44 @@
//! Capture-and-encode pipeline.
//!
//! Phase 1 stub: the actual ffmpeg / gst-launch spawn lives here. Returns a
//! [`CaptureHandle`] whose `Drop` impl kills the children so a panic on the
//! bridge side can't leave a black-hole encoder running.
//!
//! Phase 2 will fill in X11 (x11grab) and Wayland (ashpd portal +
//! pipewiresrc → fdsink | ffmpeg) bodies; per-app PipeWire routing hangs off
//! the Wayland branch.
use anyhow::{Result, bail};
use crate::cli::HostOpts;
use crate::common::display::DisplayServer;
pub struct CaptureHandle {
port: u16,
// Future: child handles, PipewireState, etc.
}
impl CaptureHandle {
pub fn local_port(&self) -> u16 {
self.port
}
}
impl Drop for CaptureHandle {
fn drop(&mut self) {
// Future: SIGTERM children with a short grace period, then SIGKILL.
// Tear down any PipeWire null sinks / loopbacks created in spawn().
}
}
pub async fn spawn(display: DisplayServer, _opts: &HostOpts) -> Result<CaptureHandle> {
match display {
DisplayServer::X11 => {
bail!("X11 capture pipeline not yet implemented (Phase 2)");
}
DisplayServer::Wayland => {
bail!("Wayland capture pipeline not yet implemented (Phase 2)");
}
DisplayServer::Unknown => unreachable!("caller guarantees display != Unknown"),
}
}
+132
View File
@@ -0,0 +1,132 @@
mod capture;
mod tunnel;
use anyhow::{Result, bail};
use iroh::Endpoint;
use iroh::endpoint::{Connection, presets};
use iroh_tickets::endpoint::EndpointTicket;
use tokio::net::TcpStream;
use tokio_util::sync::CancellationToken;
use crate::cli::HostOpts;
use crate::common::{alpn::ALPN, deps, display::DisplayServer, signal};
pub async fn run(opts: HostOpts) -> Result<()> {
let display = DisplayServer::resolve(opts.display_server);
deps::check_host_binaries(display)?;
if display == DisplayServer::Unknown {
bail!(
"could not detect display server (WAYLAND_DISPLAY / DISPLAY / XDG_SESSION_TYPE all unset).\n\
Use --display-server wayland|x11 to override."
);
}
let cancel = signal::install_ctrl_c();
let endpoint = Endpoint::builder(presets::N0)
.alpns(vec![ALPN.to_vec()])
.bind()
.await?;
let addr = endpoint.addr();
let ticket = EndpointTicket::new(addr);
print_host_banner(&ticket, display, &opts);
let result = accept_loop(&endpoint, display, &opts, cancel.clone()).await;
endpoint.close().await;
result
}
async fn accept_loop(
endpoint: &Endpoint,
display: DisplayServer,
opts: &HostOpts,
cancel: CancellationToken,
) -> Result<()> {
tokio::select! {
_ = cancel.cancelled() => {
tracing::info!("cancellation requested before any peer connected");
Ok(())
}
accepted = endpoint.accept() => {
let Some(incoming) = accepted else {
bail!("endpoint stopped accepting connections");
};
let conn = incoming.await?;
let remote = conn.remote_id();
tracing::info!(%remote, "peer connected");
eprintln!("\n[pixelpass] peer connected: {remote}\n");
handle_peer(conn, display, opts, cancel).await
}
}
}
async fn handle_peer(
conn: Connection,
display: DisplayServer,
opts: &HostOpts,
cancel: CancellationToken,
) -> Result<()> {
let (quic_send, quic_recv) = conn.accept_bi().await?;
// Spawn ffmpeg (and on Wayland, the gst-launch bridge) listening on a
// random localhost port. Returns the port and a guard that kills the
// child(ren) on drop.
let capture_handle = capture::spawn(display, opts).await?;
let port = capture_handle.local_port();
// Give ffmpeg's `-listen 1` HTTP server a moment to bind before we dial.
tokio::time::sleep(std::time::Duration::from_millis(150)).await;
let tcp = TcpStream::connect(("127.0.0.1", port)).await?;
let bridge = crate::common::tunnel::bridge(quic_send, quic_recv, tcp);
tokio::select! {
res = bridge => {
if let Err(e) = res {
tracing::warn!("bridge ended with error: {e:#}");
} else {
tracing::info!("bridge closed cleanly");
}
}
_ = cancel.cancelled() => {
tracing::info!("cancellation requested during stream");
}
}
drop(capture_handle); // explicit teardown of ffmpeg/gst
Ok(())
}
fn print_host_banner(ticket: &EndpointTicket, display: DisplayServer, opts: &HostOpts) {
eprintln!();
eprintln!("┌─ PixelPass · host ─────────────────────────────────────────");
eprintln!("│ display server : {display:?}");
eprintln!("│ capture : {}", capture_summary(opts));
eprintln!("│ bitrate / fps : {} kbps @ {} fps", opts.bitrate, opts.framerate);
eprintln!("│ hw encode : {}", if opts.no_hwencode { "off" } else { "auto (VAAPI if available)" });
eprintln!("");
eprintln!("│ Share this ticket with your viewer:");
eprintln!("");
eprintln!("│ pixelpass {ticket}");
eprintln!("");
eprintln!("│ Capture will not start until the viewer connects.");
eprintln!("│ Press Ctrl+C to stop.");
eprintln!("└────────────────────────────────────────────────────────────");
eprintln!();
}
fn capture_summary(opts: &HostOpts) -> String {
let mut bits = vec![if opts.window { "window" } else { "fullscreen" }.to_string()];
if let Some(app) = &opts.app {
bits.push(format!("app-audio={app}"));
} else {
bits.push("system-audio".to_string());
}
if opts.mic {
bits.push("mic".to_string());
}
bits.join(" + ")
}
+3
View File
@@ -0,0 +1,3 @@
//! Host-side tunnel glue lives here once it gains host-specific concerns
//! (e.g. coordinating capture teardown with bridge shutdown). For now the
//! generic bridge in `common::tunnel` is sufficient.
+40
View File
@@ -0,0 +1,40 @@
mod cli;
mod common;
mod host;
mod repair;
mod viewer;
use anyhow::Result;
use clap::Parser;
use cli::Cli;
use iroh_tickets::endpoint::EndpointTicket;
use tracing_subscriber::EnvFilter;
#[tokio::main]
async fn main() -> Result<()> {
let cli = Cli::parse();
init_tracing(cli.verbose);
if cli.repair {
return repair::run().await;
}
match cli.ticket.as_deref() {
Some(s) => {
let ticket: EndpointTicket = s.parse().map_err(|e| {
anyhow::anyhow!(
"argument doesn't look like a pixelpass ticket ({e}).\n\
Run with no arguments to host, or pass a ticket to view."
)
})?;
viewer::run(ticket, cli.into_viewer_opts()).await
}
None => host::run(cli.into_host_opts()).await,
}
}
fn init_tracing(verbose: bool) {
let default = if verbose { "pixelpass=trace,iroh=info" } else { "pixelpass=info,iroh=warn" };
let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(default));
tracing_subscriber::fmt().with_env_filter(filter).with_target(false).init();
}
+12
View File
@@ -0,0 +1,12 @@
//! `--repair`: clean up any null sinks / loopbacks that a crashed pixelpass
//! host left behind. Phase 2 will scan PipeWire for nodes tagged with the
//! `pixelpass.session = <uuid>` property and destroy them.
use anyhow::Result;
pub async fn run() -> Result<()> {
eprintln!("[pixelpass] --repair: PipeWire scan not yet implemented (Phase 2).");
eprintln!(" Run `pactl list short sinks | grep pixelpass` to spot orphans,");
eprintln!(" and `pactl unload-module <id>` to remove them manually.");
Ok(())
}
+57
View File
@@ -0,0 +1,57 @@
mod tunnel;
use anyhow::Result;
use iroh::Endpoint;
use iroh::endpoint::presets;
use iroh_tickets::endpoint::EndpointTicket;
use tokio::net::TcpListener;
use crate::cli::ViewerOpts;
use crate::common::{alpn::ALPN, signal};
pub async fn run(ticket: EndpointTicket, opts: ViewerOpts) -> Result<()> {
let cancel = signal::install_ctrl_c();
let endpoint = Endpoint::builder(presets::N0)
.alpns(vec![ALPN.to_vec()])
.bind()
.await?;
let addr = ticket.endpoint_addr().clone();
tracing::info!(remote = %addr.id, "connecting to host");
let conn = endpoint.connect(addr, ALPN).await?;
let (quic_send, quic_recv) = conn.open_bi().await?;
let listener = TcpListener::bind(("127.0.0.1", opts.port)).await?;
let port = listener.local_addr()?.port();
print_viewer_banner(port);
let result = tokio::select! {
accepted = listener.accept() => {
let (tcp, peer) = accepted?;
tracing::info!(%peer, "local viewer connected");
crate::common::tunnel::bridge(quic_send, quic_recv, tcp).await
}
_ = cancel.cancelled() => {
tracing::info!("ctrl-c received before local viewer connected");
Ok(())
}
};
endpoint.close().await;
result
}
fn print_viewer_banner(port: u16) {
let url = format!("http://127.0.0.1:{port}");
eprintln!();
eprintln!("┌─ PixelPass · viewer ───────────────────────────────────────");
eprintln!("│ Connected to host. Open the stream in your player:");
eprintln!("");
eprintln!("│ mpv --profile=low-latency --untimed {url}");
eprintln!("│ vlc --network-caching=200 --live-caching=200 {url}");
eprintln!("");
eprintln!("│ Press Ctrl+C to disconnect.");
eprintln!("└────────────────────────────────────────────────────────────");
eprintln!();
}
+2
View File
@@ -0,0 +1,2 @@
//! Viewer-side tunnel glue lives here if/when it grows beyond the generic
//! bridge in `common::tunnel`. For Phase 1 the generic bridge is sufficient.