diff --git a/src/common/alpn.rs b/src/common/alpn.rs index abb9ece..f708d5c 100644 --- a/src/common/alpn.rs +++ b/src/common/alpn.rs @@ -1,5 +1,14 @@ -/// ALPN identifying the pixelpass wire protocol on the iroh tunnel. +/// ALPN identifying the pixelpass video wire protocol on the iroh tunnel. /// /// Bump the version suffix whenever the wire format changes. Today the wire is /// "raw MPEG-TS bytes copied bidirectionally," so bumps will be rare. pub const ALPN: &[u8] = b"pixelpass/0"; + +/// ALPN for the friends control plane — the always-on presence endpoint that +/// carries friend requests and shared codes between peers' GUIs. Separate from +/// [`ALPN`] so the same machine can run a control endpoint and a video endpoint +/// without their accept loops colliding, and so a control dial never lands on a +/// bare video host (which doesn't speak this protocol). GUI-only, like the rest +/// of the friends stack. +#[cfg(feature = "gui")] +pub const CONTROL_ALPN: &[u8] = b"pixelpass/ctrl/0"; diff --git a/src/common/control.rs b/src/common/control.rs new file mode 100644 index 0000000..161a25e --- /dev/null +++ b/src/common/control.rs @@ -0,0 +1,261 @@ +//! Friends control-plane protocol and service. +//! +//! This is the always-on presence channel that rides the [`CONTROL_ALPN`] +//! endpoint (bound with the persistent identity — see +//! [`super::endpoint::bind_control`]). It's how two peers' GUIs exchange friend +//! requests and pushed share-codes, independent of any video session. +//! +//! Wire shape: **one message per connection.** The sender opens a bi-stream, +//! writes the JSON-encoded [`ControlMsg`], and finishes its send side (EOF +//! delimits the message — no length framing needed). The receiver reads to EOF, +//! parses, hands the message up, then writes a one-byte [`ACK`] back so the +//! sender knows it was delivered *and* parsed. That delivery signal is what +//! lets the host-side code-push queue (a later phase) tell "sent" from "friend +//! was offline." A friend's *reply* (accept/decline) is a separate later +//! connection in the other direction, because acceptance can happen minutes +//! after the request — not a response on the same stream. + +use std::time::Duration; + +use anyhow::{Context, Result, bail}; +use iroh::endpoint::{Incoming, VarInt}; +use iroh::{Endpoint, EndpointAddr, EndpointId}; +use serde::{Deserialize, Serialize}; +use tokio::sync::mpsc; + +use super::alpn::CONTROL_ALPN; + +/// Upper bound on a single control message. Generous for a display name plus a +/// share-code ticket (~150 chars); rejects a peer trying to make us buffer a +/// huge blob. +const MAX_MSG: usize = 64 * 1024; + +/// One-byte application acknowledgement the receiver returns once it has parsed +/// a message. ASCII ACK (0x06). +const ACK: &[u8] = b"\x06"; + +/// Bound on each phase of the send handshake, so a half-dead peer or relay +/// can't park a sender (or an inbound handler) forever. +const IO_TIMEOUT: Duration = Duration::from_secs(10); + +/// A message on the friends control plane. +/// +/// `#[serde(tag = "type")]` keeps the JSON self-describing and lets us add +/// variants without breaking older peers (an unknown tag fails to parse and is +/// logged, rather than being silently misread as another variant). +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum ControlMsg { + /// "I'm online; here's my current display name." A presence/name refresh. + Hello { name: String }, + /// Ask the recipient to become friends. + FriendRequest { name: String }, + /// Accept a request the recipient previously sent us. + FriendAccept { name: String }, + /// Decline a pending request, or cancel an outgoing one. + FriendDecline, + /// A host pushing a freshly generated share-code to an accepted friend. + ShareCode { name: String, ticket: String }, +} + +/// A received control message, paired with the *authenticated* sender id (the +/// connection's verified remote public key — not a value the peer can spoof in +/// the payload, which is why no variant carries a sender id). +#[derive(Debug, Clone)] +pub struct Inbound { + pub from: EndpointId, + pub msg: ControlMsg, +} + +fn encode(msg: &ControlMsg) -> Result> { + serde_json::to_vec(msg).context("failed to encode control message") +} + +fn decode(bytes: &[u8]) -> Result { + serde_json::from_slice(bytes).context("failed to decode control message") +} + +/// Deliver one message to `peer` over `endpoint`, returning once the recipient +/// has acknowledged it. An error means it was *not* delivered (peer offline, +/// unreachable, or rejected the stream) — the caller can queue and retry. +/// +/// `peer` is usually a bare [`EndpointId`] — friends store only the stable id, +/// and n0 DNS discovery resolves it to a live address. The full [`EndpointAddr`] +/// form exists for callers that already hold one (and for hermetic tests). +// +// Lands ahead of its caller: the outbound paths (friend requests, code pushes) +// are wired into the GUI in Phase 3/4. The loopback test exercises it now. +#[allow(dead_code)] +pub async fn send( + endpoint: &Endpoint, + peer: impl Into, + msg: &ControlMsg, +) -> Result<()> { + let payload = encode(msg)?; + let conn = tokio::time::timeout(IO_TIMEOUT, endpoint.connect(peer, CONTROL_ALPN)) + .await + .context("timed out connecting to peer")? + .context("failed to connect to peer")?; + + let io = async { + let (mut send, mut recv) = conn + .open_bi() + .await + .context("failed to open control stream")?; + send.write_all(&payload) + .await + .context("failed to write control message")?; + send.finish().context("failed to finish control stream")?; + // Read the peer's ACK. read_to_end returns once the peer finishes its + // send side, so this also serves as "the peer is done with us." + let ack = recv + .read_to_end(ACK.len() + 1) + .await + .context("peer closed the control stream without acknowledging")?; + if ack != ACK { + bail!( + "peer sent an unexpected acknowledgement ({} bytes)", + ack.len() + ); + } + Ok(()) + }; + + let result = tokio::time::timeout(IO_TIMEOUT, io) + .await + .context("timed out sending control message")?; + // Clean close so the peer's `closed().await` returns promptly either way. + conn.close(VarInt::from_u32(0), b"done"); + result +} + +/// Run the control-plane accept loop, forwarding every received message to +/// `tx`. Returns when the endpoint stops accepting (i.e. it was closed). +pub async fn serve(endpoint: Endpoint, tx: mpsc::Sender) { + while let Some(incoming) = endpoint.accept().await { + let tx = tx.clone(); + tokio::spawn(async move { + if let Err(e) = handle(incoming, &tx).await { + tracing::warn!("control: inbound connection failed: {e:#}"); + } + }); + } + tracing::info!("control: endpoint stopped accepting"); +} + +async fn handle(incoming: Incoming, tx: &mpsc::Sender) -> Result<()> { + let conn = incoming + .await + .context("inbound control connection failed")?; + let from = conn.remote_id(); + + let msg = async { + let (mut send, mut recv) = conn + .accept_bi() + .await + .context("failed to accept control stream")?; + let bytes = recv + .read_to_end(MAX_MSG) + .await + .context("failed to read control message")?; + let msg = decode(&bytes)?; + // ACK only after a successful parse, so the sender's delivery signal + // means "received and understood." + send.write_all(ACK).await.context("failed to write ack")?; + send.finish().context("failed to finish ack stream")?; + Ok::<_, anyhow::Error>(msg) + }; + + let msg = tokio::time::timeout(IO_TIMEOUT, msg) + .await + .context("timed out reading control message")??; + + // Wait (briefly) for the sender's close so our ACK flushes before the + // connection is dropped at the end of this scope. + let _ = tokio::time::timeout(IO_TIMEOUT, conn.closed()).await; + + tx.send(Inbound { from, msg }) + .await + .map_err(|_| anyhow::anyhow!("control: receiver dropped"))?; + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn control_msg_round_trips() { + let cases = [ + ControlMsg::Hello { + name: "alice".into(), + }, + ControlMsg::FriendRequest { name: "bob".into() }, + ControlMsg::FriendAccept { + name: "carol".into(), + }, + ControlMsg::FriendDecline, + ControlMsg::ShareCode { + name: "dave".into(), + ticket: "endpointaa…".into(), + }, + ]; + for msg in cases { + let bytes = encode(&msg).unwrap(); + assert_eq!(decode(&bytes).unwrap(), msg); + } + } + + #[test] + fn unknown_tag_is_rejected() { + assert!(decode(br#"{"type":"nonsense"}"#).is_err()); + } + + /// Bind a control-plane endpoint with a *fresh* random key, so two of them + /// in one test get distinct ids (two real machines each have their own + /// persistent key; `bind_control` would give both the same one here, and + /// iroh refuses "connecting to ourself"). + async fn bind_test_control() -> Endpoint { + iroh::Endpoint::builder(iroh::endpoint::presets::N0) + .secret_key(iroh::SecretKey::generate()) + .alpns(vec![CONTROL_ALPN.to_vec()]) + .bind() + .await + .unwrap() + } + + /// End-to-end over two real iroh endpoints on this machine. Ignored by + /// default — it binds endpoints and waits on the relay, so it's slow and + /// network-dependent. Run with `cargo test -- --ignored control`. + #[tokio::test] + #[ignore = "binds real iroh endpoints; run on demand"] + async fn loopback_delivers_and_acks() { + let server = bind_test_control().await; + let client = bind_test_control().await; + // Connect by full addr so the test doesn't depend on DNS discovery. + server.online().await; + client.online().await; + let server_addr = server.addr(); + + let (tx, mut rx) = mpsc::channel(4); + let server_ep = server.clone(); + let serve_task = tokio::spawn(async move { serve(server_ep, tx).await }); + + let msg = ControlMsg::FriendRequest { + name: "tester".into(), + }; + // Full addr (not just the id) so the test doesn't depend on DNS discovery. + send(&client, server_addr.clone(), &msg).await.unwrap(); + + let got = tokio::time::timeout(Duration::from_secs(15), rx.recv()) + .await + .expect("no inbound within 15s") + .expect("channel closed"); + assert_eq!(got.msg, msg); + assert_eq!(got.from, client.addr().id); + + server.close().await; + client.close().await; + serve_task.abort(); + } +} diff --git a/src/common/endpoint.rs b/src/common/endpoint.rs index 6fb21f6..49caaeb 100644 --- a/src/common/endpoint.rs +++ b/src/common/endpoint.rs @@ -1,6 +1,19 @@ -//! Shared iroh endpoint construction for the host and viewer. +//! Shared iroh endpoint construction. //! -//! Both sides bind an endpoint with the same ALPN; the only knob is the relay. +//! Two planes, two identities: +//! +//! * The **video** plane (host/viewer sessions) binds with an *ephemeral* +//! keypair — a fresh `EndpointId` per run. Each session is a throwaway tunnel, +//! and keeping its id ephemeral means a screen-share leaks no stable +//! fingerprint. +//! * The **control** plane (the always-on friends presence service) binds with +//! the machine's *persistent* identity (see [`identity`]), so peers can find +//! and recognise each other across launches. +//! +//! They must use different identities because both can be live at once on the +//! same machine (the GUI's control endpoint while a host session runs), and +//! iroh routes by `EndpointId` — two live endpoints sharing one id would make +//! relay delivery ambiguous. use std::str::FromStr; @@ -9,7 +22,6 @@ use iroh::endpoint::presets; use iroh::{Endpoint, RelayMap, RelayMode, RelayUrl}; use super::alpn::ALPN; -use super::identity; /// Environment variable consulted when `--relay` isn't passed. Lets the GUI's /// child processes and scripted runs inherit a relay choice without a flag. @@ -18,11 +30,14 @@ pub const RELAY_ENV: &str = "PIXELPASS_RELAY"; /// Resolve the relay override: explicit `--relay` wins, else `PIXELPASS_RELAY`, /// else `None` (use the bundled defaults). pub fn relay_override(flag: Option<&str>) -> Option { - flag.map(str::to_owned) - .or_else(|| std::env::var(RELAY_ENV).ok().filter(|s| !s.trim().is_empty())) + flag.map(str::to_owned).or_else(|| { + std::env::var(RELAY_ENV) + .ok() + .filter(|s| !s.trim().is_empty()) + }) } -/// Bind the iroh endpoint with our ALPN. +/// Bind a **video-plane** endpoint (host/viewer) with an ephemeral identity. /// /// With no `relay` override we use [`presets::N0`] — n0 DNS discovery, the /// library's default relays, and the chosen crypto provider. With an override @@ -30,21 +45,40 @@ pub fn relay_override(flag: Option<&str>) -> Option { /// [`RelayMode::Custom`]; this is how a user gets off the rc's bundled /// (canary-grade) relays or points at a self-hosted one. Discovery is /// unchanged, so peers still resolve each other by endpoint id. -/// -/// The endpoint is bound with the machine's persistent identity (see -/// [`identity`]), so its `EndpointId` is stable across launches and roles — -/// the property the friends system is built on. pub async fn bind(relay: Option<&str>) -> Result { - let secret_key = identity::load_or_create()?; - let mut builder = Endpoint::builder(presets::N0) - .secret_key(secret_key) - .alpns(vec![ALPN.to_vec()]); + // No `secret_key` set → iroh mints a fresh ephemeral keypair for this run. + bind_with(relay, None, ALPN).await +} + +/// Bind the **control-plane** endpoint with the machine's persistent identity +/// (see [`super::identity`]) and the friends [`super::alpn::CONTROL_ALPN`]. Its +/// `EndpointId` is the stable id friends know you by. +#[cfg(feature = "gui")] +pub async fn bind_control(relay: Option<&str>) -> Result { + let secret_key = super::identity::load_or_create()?; + bind_with(relay, Some(secret_key), super::alpn::CONTROL_ALPN).await +} + +/// Shared builder: optional persistent key (None → ephemeral) + the plane's ALPN. +async fn bind_with( + relay: Option<&str>, + key: Option, + alpn: &[u8], +) -> Result { + let mut builder = Endpoint::builder(presets::N0).alpns(vec![alpn.to_vec()]); + if let Some(key) = key { + builder = builder.secret_key(key); + } if let Some(url) = relay { - let url = RelayUrl::from_str(url) - .with_context(|| format!("invalid relay URL {url:?} (expected e.g. https://relay.example/)"))?; + let url = RelayUrl::from_str(url).with_context(|| { + format!("invalid relay URL {url:?} (expected e.g. https://relay.example/)") + })?; builder = builder.relay_mode(RelayMode::Custom(RelayMap::from(url))); } - builder.bind().await.context("failed to bind the iroh endpoint") + builder + .bind() + .await + .context("failed to bind the iroh endpoint") } diff --git a/src/common/identity.rs b/src/common/identity.rs index 01e8cb5..609c396 100644 --- a/src/common/identity.rs +++ b/src/common/identity.rs @@ -63,8 +63,7 @@ pub fn save(key: &SecretKey) -> Result<()> { let parent = path .parent() .context("identity path has no parent directory")?; - fs::create_dir_all(parent) - .with_context(|| format!("failed to create {}", parent.display()))?; + fs::create_dir_all(parent).with_context(|| format!("failed to create {}", parent.display()))?; let tmp = parent.join(format!(".identity.key.tmp.{}", std::process::id())); { @@ -95,7 +94,7 @@ fn encode_hex(bytes: &[u8]) -> String { } fn decode_hex(s: &str) -> Result> { - if s.len() % 2 != 0 { + if !s.len().is_multiple_of(2) { bail!("hex string has an odd length"); } (0..s.len()) diff --git a/src/common/mod.rs b/src/common/mod.rs index 9179857..0fefb0a 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -1,10 +1,16 @@ pub mod alpn; pub mod bandwidth; pub mod config; +// The friends stack (persistent identity + control plane) is GUI-only — a +// headless CLI host runs no presence service — so it's gated with the feature +// that pulls the rest of the GUI, keeping the headless build lean. +#[cfg(feature = "gui")] +pub mod control; pub mod deps; -pub mod endpoint; -pub mod identity; pub mod display; +pub mod endpoint; +#[cfg(feature = "gui")] +pub mod identity; pub mod output; pub mod process; pub mod signal; diff --git a/src/gui/mod.rs b/src/gui/mod.rs index af0cc53..399f536 100644 --- a/src/gui/mod.rs +++ b/src/gui/mod.rs @@ -37,6 +37,7 @@ //! dropped and no egui frame is running. mod child; +mod presence; mod theme; mod tray; @@ -65,6 +66,7 @@ use winit::raw_window_handle::HasWindowHandle as _; use winit::window::{Window, WindowAttributes, WindowId}; use self::child::{ChildEvent, ChildProc}; +use self::presence::PresenceHandle; use self::tray::{TrayAction, TrayHandle, TrayStatus}; /// Initial / minimum window size, in logical points. Initial height fits the @@ -601,6 +603,9 @@ pub fn run(relay: Option) -> anyhow::Result<()> { }; // The tray runs on its own thread and wakes us via the proxy. let tray = tray::start(proxy.clone()); + // The friends presence service runs on its own thread too, waking us when + // a control message arrives. + let presence = presence::start(waker.clone(), relay.clone()); let gui_settings = crate::common::config::load() .map(|c| c.gui) .unwrap_or_default(); @@ -626,6 +631,7 @@ pub fn run(relay: Option) -> anyhow::Result<()> { status: None, }, waker, + presence, }; let mut app = App { state, @@ -880,6 +886,10 @@ struct PixelPassApp { theme: ThemeState, /// Wakes the winit loop when a spawned child emits/exits. waker: Waker, + /// The always-on friends presence service (control-plane endpoint). `None` + /// if it couldn't start (no identity), in which case friends features are + /// simply absent. + presence: Option, } /// The active theme plus the Settings picker/editor working state. @@ -956,9 +966,22 @@ impl PixelPassApp { fn tick(&mut self) { self.pump_host_events(); self.pump_viewer_events(); + self.pump_presence_events(); self.sync_tray_status(); } + /// Drain inbound control-plane messages. Phase 3 turns these into friend-list + /// state, in-app notifications, and the bell badge; for now they're logged so + /// the control plane is observable end-to-end. + fn pump_presence_events(&mut self) { + let Some(presence) = &self.presence else { + return; + }; + for inbound in presence.drain() { + tracing::info!(from = %inbound.from, msg = ?inbound.msg, "presence: control message"); + } + } + /// Render the current screen. Called from inside the egui frame. fn draw(&mut self, ui: &mut egui::Ui) { self.handle_keys(ui); diff --git a/src/gui/presence.rs b/src/gui/presence.rs new file mode 100644 index 0000000..a617150 --- /dev/null +++ b/src/gui/presence.rs @@ -0,0 +1,105 @@ +//! The always-on friends presence service. +//! +//! A control-plane iroh endpoint ([`endpoint::bind_control`]) that lives for the +//! whole GUI session on its own thread with a current-thread tokio runtime — the +//! GUI is a synchronous winit/egui loop, so iroh's async work can't run on it +//! (the same reason [`super::tray`] has its own thread + runtime). +//! +//! Inbound control messages are forwarded over a std mpsc channel the UI drains +//! each [`super::PixelPassApp::tick`]; the [`Waker`] is pinged on arrival so a +//! message wakes the loop even while the window is hidden to the tray — the same +//! trick the headless-child reader uses. + +use std::sync::mpsc::{self, Receiver}; +use std::thread; + +use iroh::EndpointId; +use tokio::sync::mpsc as tmpsc; + +use super::Waker; +use crate::common::{ + control::{self, Inbound}, + endpoint, identity, +}; + +/// Handle the GUI holds for the presence service. Dropping it doesn't stop the +/// service (the thread is detached; the endpoint closes when the process exits) +/// — it just stops the UI from draining inbound messages. +pub struct PresenceHandle { + /// Inbound control messages, drained by [`PresenceHandle::drain`] each tick. + rx: Receiver, +} + +impl PresenceHandle { + /// Pull every control message received since the last call. Collected by the + /// caller so it can take `&mut self` while handling them. + pub fn drain(&self) -> Vec { + std::iter::from_fn(|| self.rx.try_recv().ok()).collect() + } +} + +/// Start the presence service. Returns `None` if the persistent identity can't +/// be loaded — the GUI then simply runs without friends features rather than +/// refusing to start. The endpoint binds asynchronously on the spawned thread; +/// our id is known immediately because it derives from the saved key, so we can +/// fail-fast and log it without waiting on the relay handshake. +pub fn start(waker: Waker, relay: Option) -> Option { + let id: EndpointId = match identity::load_or_create() { + Ok(key) => key.public(), + Err(e) => { + tracing::warn!("presence: no identity, friends features disabled: {e:#}"); + return None; + } + }; + tracing::info!(%id, "presence: starting control service"); + + let (tx, rx) = mpsc::channel::(); + thread::Builder::new() + .name("pixelpass-presence".into()) + .spawn(move || run(relay, id, tx, waker)) + .map_err(|e| tracing::warn!("presence: could not spawn service thread: {e}")) + .ok()?; + + Some(PresenceHandle { rx }) +} + +/// Thread body: a current-thread tokio runtime that binds the control endpoint, +/// runs the accept loop, and bridges inbound messages to the UI channel. +fn run(relay: Option, id: EndpointId, tx: mpsc::Sender, waker: Waker) { + let rt = match tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + { + Ok(rt) => rt, + Err(e) => { + tracing::error!("presence: failed to build runtime: {e}"); + return; + } + }; + + rt.block_on(async move { + let ep = match endpoint::bind_control(relay.as_deref()).await { + Ok(ep) => ep, + Err(e) => { + tracing::error!("presence: failed to bind control endpoint: {e:#}"); + return; + } + }; + tracing::info!(%id, "presence: control endpoint online"); + + // Bridge the async accept loop to the sync UI channel, waking the loop + // on each message so it lands even while hidden to the tray. + let (itx, mut irx) = tmpsc::channel::(32); + let forward = tokio::spawn(async move { + while let Some(inbound) = irx.recv().await { + if tx.send(inbound).is_err() { + break; // UI gone + } + waker.wake(); + } + }); + + control::serve(ep, itx).await; + forward.abort(); + }); +}