feat(friends): always-on control plane for the presence service (phase 2)
Stand up the friends control plane: a persistent-identity iroh endpoint that's online for the whole GUI session, separate from the ephemeral video sessions, ready to carry friend requests and pushed share-codes. Identity split by plane (common/endpoint.rs): the video plane (host/ viewer) goes back to ephemeral per-session keypairs, while the new bind_control() binds with the machine's persistent identity. They must differ — the GUI's control endpoint and a host's video endpoint can be live at once, and iroh routes by EndpointId, so a shared id would make relay delivery ambiguous. Bonus: a screen-share now leaks no stable id. common/control.rs — the protocol: a ControlMsg enum (Hello / Friend Request / FriendAccept / FriendDecline / ShareCode) with one-message- per-connection framing (EOF-delimited JSON) and a one-byte ACK the receiver returns only after a successful parse, so send() gets a real delivered/failed signal (the basis for the later code-push queue). The sender id is taken from the connection's verified remote key, never the payload. send() takes impl Into<EndpointAddr> so production dials a bare EndpointId (discovery resolves it) while tests use a full addr. gui/presence.rs — the service: a dedicated thread + current-thread tokio runtime (mirroring the tray) binds the control endpoint and runs the accept loop, bridging inbound messages to a std mpsc the UI drains each tick and pinging the Waker so they land even while hidden to the tray. The whole friends stack (identity, control, CONTROL_ALPN, bind_control) is gated behind the `gui` feature — a headless CLI host runs no presence service — keeping the headless build lean and warning-free. Verified: loopback test delivers a FriendRequest across two real iroh endpoints with the correct authenticated sender id; the live GUI binds its control endpoint on launch under the persistent identity. fmt + clippy clean on both feature sets; headless and gui test suites pass. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
+10
-1
@@ -1,5 +1,14 @@
|
||||
/// ALPN identifying the pixelpass wire protocol on the iroh tunnel.
|
||||
/// ALPN identifying the pixelpass video wire protocol on the iroh tunnel.
|
||||
///
|
||||
/// Bump the version suffix whenever the wire format changes. Today the wire is
|
||||
/// "raw MPEG-TS bytes copied bidirectionally," so bumps will be rare.
|
||||
pub const ALPN: &[u8] = b"pixelpass/0";
|
||||
|
||||
/// ALPN for the friends control plane — the always-on presence endpoint that
|
||||
/// carries friend requests and shared codes between peers' GUIs. Separate from
|
||||
/// [`ALPN`] so the same machine can run a control endpoint and a video endpoint
|
||||
/// without their accept loops colliding, and so a control dial never lands on a
|
||||
/// bare video host (which doesn't speak this protocol). GUI-only, like the rest
|
||||
/// of the friends stack.
|
||||
#[cfg(feature = "gui")]
|
||||
pub const CONTROL_ALPN: &[u8] = b"pixelpass/ctrl/0";
|
||||
|
||||
@@ -0,0 +1,261 @@
|
||||
//! Friends control-plane protocol and service.
|
||||
//!
|
||||
//! This is the always-on presence channel that rides the [`CONTROL_ALPN`]
|
||||
//! endpoint (bound with the persistent identity — see
|
||||
//! [`super::endpoint::bind_control`]). It's how two peers' GUIs exchange friend
|
||||
//! requests and pushed share-codes, independent of any video session.
|
||||
//!
|
||||
//! Wire shape: **one message per connection.** The sender opens a bi-stream,
|
||||
//! writes the JSON-encoded [`ControlMsg`], and finishes its send side (EOF
|
||||
//! delimits the message — no length framing needed). The receiver reads to EOF,
|
||||
//! parses, hands the message up, then writes a one-byte [`ACK`] back so the
|
||||
//! sender knows it was delivered *and* parsed. That delivery signal is what
|
||||
//! lets the host-side code-push queue (a later phase) tell "sent" from "friend
|
||||
//! was offline." A friend's *reply* (accept/decline) is a separate later
|
||||
//! connection in the other direction, because acceptance can happen minutes
|
||||
//! after the request — not a response on the same stream.
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::{Context, Result, bail};
|
||||
use iroh::endpoint::{Incoming, VarInt};
|
||||
use iroh::{Endpoint, EndpointAddr, EndpointId};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
use super::alpn::CONTROL_ALPN;
|
||||
|
||||
/// Upper bound on a single control message. Generous for a display name plus a
|
||||
/// share-code ticket (~150 chars); rejects a peer trying to make us buffer a
|
||||
/// huge blob.
|
||||
const MAX_MSG: usize = 64 * 1024;
|
||||
|
||||
/// One-byte application acknowledgement the receiver returns once it has parsed
|
||||
/// a message. ASCII ACK (0x06).
|
||||
const ACK: &[u8] = b"\x06";
|
||||
|
||||
/// Bound on each phase of the send handshake, so a half-dead peer or relay
|
||||
/// can't park a sender (or an inbound handler) forever.
|
||||
const IO_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
|
||||
/// A message on the friends control plane.
|
||||
///
|
||||
/// `#[serde(tag = "type")]` keeps the JSON self-describing and lets us add
|
||||
/// variants without breaking older peers (an unknown tag fails to parse and is
|
||||
/// logged, rather than being silently misread as another variant).
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[serde(tag = "type", rename_all = "snake_case")]
|
||||
pub enum ControlMsg {
|
||||
/// "I'm online; here's my current display name." A presence/name refresh.
|
||||
Hello { name: String },
|
||||
/// Ask the recipient to become friends.
|
||||
FriendRequest { name: String },
|
||||
/// Accept a request the recipient previously sent us.
|
||||
FriendAccept { name: String },
|
||||
/// Decline a pending request, or cancel an outgoing one.
|
||||
FriendDecline,
|
||||
/// A host pushing a freshly generated share-code to an accepted friend.
|
||||
ShareCode { name: String, ticket: String },
|
||||
}
|
||||
|
||||
/// A received control message, paired with the *authenticated* sender id (the
|
||||
/// connection's verified remote public key — not a value the peer can spoof in
|
||||
/// the payload, which is why no variant carries a sender id).
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Inbound {
|
||||
pub from: EndpointId,
|
||||
pub msg: ControlMsg,
|
||||
}
|
||||
|
||||
fn encode(msg: &ControlMsg) -> Result<Vec<u8>> {
|
||||
serde_json::to_vec(msg).context("failed to encode control message")
|
||||
}
|
||||
|
||||
fn decode(bytes: &[u8]) -> Result<ControlMsg> {
|
||||
serde_json::from_slice(bytes).context("failed to decode control message")
|
||||
}
|
||||
|
||||
/// Deliver one message to `peer` over `endpoint`, returning once the recipient
|
||||
/// has acknowledged it. An error means it was *not* delivered (peer offline,
|
||||
/// unreachable, or rejected the stream) — the caller can queue and retry.
|
||||
///
|
||||
/// `peer` is usually a bare [`EndpointId`] — friends store only the stable id,
|
||||
/// and n0 DNS discovery resolves it to a live address. The full [`EndpointAddr`]
|
||||
/// form exists for callers that already hold one (and for hermetic tests).
|
||||
//
|
||||
// Lands ahead of its caller: the outbound paths (friend requests, code pushes)
|
||||
// are wired into the GUI in Phase 3/4. The loopback test exercises it now.
|
||||
#[allow(dead_code)]
|
||||
pub async fn send(
|
||||
endpoint: &Endpoint,
|
||||
peer: impl Into<EndpointAddr>,
|
||||
msg: &ControlMsg,
|
||||
) -> Result<()> {
|
||||
let payload = encode(msg)?;
|
||||
let conn = tokio::time::timeout(IO_TIMEOUT, endpoint.connect(peer, CONTROL_ALPN))
|
||||
.await
|
||||
.context("timed out connecting to peer")?
|
||||
.context("failed to connect to peer")?;
|
||||
|
||||
let io = async {
|
||||
let (mut send, mut recv) = conn
|
||||
.open_bi()
|
||||
.await
|
||||
.context("failed to open control stream")?;
|
||||
send.write_all(&payload)
|
||||
.await
|
||||
.context("failed to write control message")?;
|
||||
send.finish().context("failed to finish control stream")?;
|
||||
// Read the peer's ACK. read_to_end returns once the peer finishes its
|
||||
// send side, so this also serves as "the peer is done with us."
|
||||
let ack = recv
|
||||
.read_to_end(ACK.len() + 1)
|
||||
.await
|
||||
.context("peer closed the control stream without acknowledging")?;
|
||||
if ack != ACK {
|
||||
bail!(
|
||||
"peer sent an unexpected acknowledgement ({} bytes)",
|
||||
ack.len()
|
||||
);
|
||||
}
|
||||
Ok(())
|
||||
};
|
||||
|
||||
let result = tokio::time::timeout(IO_TIMEOUT, io)
|
||||
.await
|
||||
.context("timed out sending control message")?;
|
||||
// Clean close so the peer's `closed().await` returns promptly either way.
|
||||
conn.close(VarInt::from_u32(0), b"done");
|
||||
result
|
||||
}
|
||||
|
||||
/// Run the control-plane accept loop, forwarding every received message to
|
||||
/// `tx`. Returns when the endpoint stops accepting (i.e. it was closed).
|
||||
pub async fn serve(endpoint: Endpoint, tx: mpsc::Sender<Inbound>) {
|
||||
while let Some(incoming) = endpoint.accept().await {
|
||||
let tx = tx.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = handle(incoming, &tx).await {
|
||||
tracing::warn!("control: inbound connection failed: {e:#}");
|
||||
}
|
||||
});
|
||||
}
|
||||
tracing::info!("control: endpoint stopped accepting");
|
||||
}
|
||||
|
||||
async fn handle(incoming: Incoming, tx: &mpsc::Sender<Inbound>) -> Result<()> {
|
||||
let conn = incoming
|
||||
.await
|
||||
.context("inbound control connection failed")?;
|
||||
let from = conn.remote_id();
|
||||
|
||||
let msg = async {
|
||||
let (mut send, mut recv) = conn
|
||||
.accept_bi()
|
||||
.await
|
||||
.context("failed to accept control stream")?;
|
||||
let bytes = recv
|
||||
.read_to_end(MAX_MSG)
|
||||
.await
|
||||
.context("failed to read control message")?;
|
||||
let msg = decode(&bytes)?;
|
||||
// ACK only after a successful parse, so the sender's delivery signal
|
||||
// means "received and understood."
|
||||
send.write_all(ACK).await.context("failed to write ack")?;
|
||||
send.finish().context("failed to finish ack stream")?;
|
||||
Ok::<_, anyhow::Error>(msg)
|
||||
};
|
||||
|
||||
let msg = tokio::time::timeout(IO_TIMEOUT, msg)
|
||||
.await
|
||||
.context("timed out reading control message")??;
|
||||
|
||||
// Wait (briefly) for the sender's close so our ACK flushes before the
|
||||
// connection is dropped at the end of this scope.
|
||||
let _ = tokio::time::timeout(IO_TIMEOUT, conn.closed()).await;
|
||||
|
||||
tx.send(Inbound { from, msg })
|
||||
.await
|
||||
.map_err(|_| anyhow::anyhow!("control: receiver dropped"))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn control_msg_round_trips() {
|
||||
let cases = [
|
||||
ControlMsg::Hello {
|
||||
name: "alice".into(),
|
||||
},
|
||||
ControlMsg::FriendRequest { name: "bob".into() },
|
||||
ControlMsg::FriendAccept {
|
||||
name: "carol".into(),
|
||||
},
|
||||
ControlMsg::FriendDecline,
|
||||
ControlMsg::ShareCode {
|
||||
name: "dave".into(),
|
||||
ticket: "endpointaa…".into(),
|
||||
},
|
||||
];
|
||||
for msg in cases {
|
||||
let bytes = encode(&msg).unwrap();
|
||||
assert_eq!(decode(&bytes).unwrap(), msg);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn unknown_tag_is_rejected() {
|
||||
assert!(decode(br#"{"type":"nonsense"}"#).is_err());
|
||||
}
|
||||
|
||||
/// Bind a control-plane endpoint with a *fresh* random key, so two of them
|
||||
/// in one test get distinct ids (two real machines each have their own
|
||||
/// persistent key; `bind_control` would give both the same one here, and
|
||||
/// iroh refuses "connecting to ourself").
|
||||
async fn bind_test_control() -> Endpoint {
|
||||
iroh::Endpoint::builder(iroh::endpoint::presets::N0)
|
||||
.secret_key(iroh::SecretKey::generate())
|
||||
.alpns(vec![CONTROL_ALPN.to_vec()])
|
||||
.bind()
|
||||
.await
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
/// End-to-end over two real iroh endpoints on this machine. Ignored by
|
||||
/// default — it binds endpoints and waits on the relay, so it's slow and
|
||||
/// network-dependent. Run with `cargo test -- --ignored control`.
|
||||
#[tokio::test]
|
||||
#[ignore = "binds real iroh endpoints; run on demand"]
|
||||
async fn loopback_delivers_and_acks() {
|
||||
let server = bind_test_control().await;
|
||||
let client = bind_test_control().await;
|
||||
// Connect by full addr so the test doesn't depend on DNS discovery.
|
||||
server.online().await;
|
||||
client.online().await;
|
||||
let server_addr = server.addr();
|
||||
|
||||
let (tx, mut rx) = mpsc::channel(4);
|
||||
let server_ep = server.clone();
|
||||
let serve_task = tokio::spawn(async move { serve(server_ep, tx).await });
|
||||
|
||||
let msg = ControlMsg::FriendRequest {
|
||||
name: "tester".into(),
|
||||
};
|
||||
// Full addr (not just the id) so the test doesn't depend on DNS discovery.
|
||||
send(&client, server_addr.clone(), &msg).await.unwrap();
|
||||
|
||||
let got = tokio::time::timeout(Duration::from_secs(15), rx.recv())
|
||||
.await
|
||||
.expect("no inbound within 15s")
|
||||
.expect("channel closed");
|
||||
assert_eq!(got.msg, msg);
|
||||
assert_eq!(got.from, client.addr().id);
|
||||
|
||||
server.close().await;
|
||||
client.close().await;
|
||||
serve_task.abort();
|
||||
}
|
||||
}
|
||||
+51
-17
@@ -1,6 +1,19 @@
|
||||
//! Shared iroh endpoint construction for the host and viewer.
|
||||
//! Shared iroh endpoint construction.
|
||||
//!
|
||||
//! Both sides bind an endpoint with the same ALPN; the only knob is the relay.
|
||||
//! Two planes, two identities:
|
||||
//!
|
||||
//! * The **video** plane (host/viewer sessions) binds with an *ephemeral*
|
||||
//! keypair — a fresh `EndpointId` per run. Each session is a throwaway tunnel,
|
||||
//! and keeping its id ephemeral means a screen-share leaks no stable
|
||||
//! fingerprint.
|
||||
//! * The **control** plane (the always-on friends presence service) binds with
|
||||
//! the machine's *persistent* identity (see [`identity`]), so peers can find
|
||||
//! and recognise each other across launches.
|
||||
//!
|
||||
//! They must use different identities because both can be live at once on the
|
||||
//! same machine (the GUI's control endpoint while a host session runs), and
|
||||
//! iroh routes by `EndpointId` — two live endpoints sharing one id would make
|
||||
//! relay delivery ambiguous.
|
||||
|
||||
use std::str::FromStr;
|
||||
|
||||
@@ -9,7 +22,6 @@ use iroh::endpoint::presets;
|
||||
use iroh::{Endpoint, RelayMap, RelayMode, RelayUrl};
|
||||
|
||||
use super::alpn::ALPN;
|
||||
use super::identity;
|
||||
|
||||
/// Environment variable consulted when `--relay` isn't passed. Lets the GUI's
|
||||
/// child processes and scripted runs inherit a relay choice without a flag.
|
||||
@@ -18,11 +30,14 @@ pub const RELAY_ENV: &str = "PIXELPASS_RELAY";
|
||||
/// Resolve the relay override: explicit `--relay` wins, else `PIXELPASS_RELAY`,
|
||||
/// else `None` (use the bundled defaults).
|
||||
pub fn relay_override(flag: Option<&str>) -> Option<String> {
|
||||
flag.map(str::to_owned)
|
||||
.or_else(|| std::env::var(RELAY_ENV).ok().filter(|s| !s.trim().is_empty()))
|
||||
flag.map(str::to_owned).or_else(|| {
|
||||
std::env::var(RELAY_ENV)
|
||||
.ok()
|
||||
.filter(|s| !s.trim().is_empty())
|
||||
})
|
||||
}
|
||||
|
||||
/// Bind the iroh endpoint with our ALPN.
|
||||
/// Bind a **video-plane** endpoint (host/viewer) with an ephemeral identity.
|
||||
///
|
||||
/// With no `relay` override we use [`presets::N0`] — n0 DNS discovery, the
|
||||
/// library's default relays, and the chosen crypto provider. With an override
|
||||
@@ -30,21 +45,40 @@ pub fn relay_override(flag: Option<&str>) -> Option<String> {
|
||||
/// [`RelayMode::Custom`]; this is how a user gets off the rc's bundled
|
||||
/// (canary-grade) relays or points at a self-hosted one. Discovery is
|
||||
/// unchanged, so peers still resolve each other by endpoint id.
|
||||
///
|
||||
/// The endpoint is bound with the machine's persistent identity (see
|
||||
/// [`identity`]), so its `EndpointId` is stable across launches and roles —
|
||||
/// the property the friends system is built on.
|
||||
pub async fn bind(relay: Option<&str>) -> Result<Endpoint> {
|
||||
let secret_key = identity::load_or_create()?;
|
||||
let mut builder = Endpoint::builder(presets::N0)
|
||||
.secret_key(secret_key)
|
||||
.alpns(vec![ALPN.to_vec()]);
|
||||
// No `secret_key` set → iroh mints a fresh ephemeral keypair for this run.
|
||||
bind_with(relay, None, ALPN).await
|
||||
}
|
||||
|
||||
/// Bind the **control-plane** endpoint with the machine's persistent identity
|
||||
/// (see [`super::identity`]) and the friends [`super::alpn::CONTROL_ALPN`]. Its
|
||||
/// `EndpointId` is the stable id friends know you by.
|
||||
#[cfg(feature = "gui")]
|
||||
pub async fn bind_control(relay: Option<&str>) -> Result<Endpoint> {
|
||||
let secret_key = super::identity::load_or_create()?;
|
||||
bind_with(relay, Some(secret_key), super::alpn::CONTROL_ALPN).await
|
||||
}
|
||||
|
||||
/// Shared builder: optional persistent key (None → ephemeral) + the plane's ALPN.
|
||||
async fn bind_with(
|
||||
relay: Option<&str>,
|
||||
key: Option<iroh::SecretKey>,
|
||||
alpn: &[u8],
|
||||
) -> Result<Endpoint> {
|
||||
let mut builder = Endpoint::builder(presets::N0).alpns(vec![alpn.to_vec()]);
|
||||
if let Some(key) = key {
|
||||
builder = builder.secret_key(key);
|
||||
}
|
||||
|
||||
if let Some(url) = relay {
|
||||
let url = RelayUrl::from_str(url)
|
||||
.with_context(|| format!("invalid relay URL {url:?} (expected e.g. https://relay.example/)"))?;
|
||||
let url = RelayUrl::from_str(url).with_context(|| {
|
||||
format!("invalid relay URL {url:?} (expected e.g. https://relay.example/)")
|
||||
})?;
|
||||
builder = builder.relay_mode(RelayMode::Custom(RelayMap::from(url)));
|
||||
}
|
||||
|
||||
builder.bind().await.context("failed to bind the iroh endpoint")
|
||||
builder
|
||||
.bind()
|
||||
.await
|
||||
.context("failed to bind the iroh endpoint")
|
||||
}
|
||||
|
||||
@@ -63,8 +63,7 @@ pub fn save(key: &SecretKey) -> Result<()> {
|
||||
let parent = path
|
||||
.parent()
|
||||
.context("identity path has no parent directory")?;
|
||||
fs::create_dir_all(parent)
|
||||
.with_context(|| format!("failed to create {}", parent.display()))?;
|
||||
fs::create_dir_all(parent).with_context(|| format!("failed to create {}", parent.display()))?;
|
||||
|
||||
let tmp = parent.join(format!(".identity.key.tmp.{}", std::process::id()));
|
||||
{
|
||||
@@ -95,7 +94,7 @@ fn encode_hex(bytes: &[u8]) -> String {
|
||||
}
|
||||
|
||||
fn decode_hex(s: &str) -> Result<Vec<u8>> {
|
||||
if s.len() % 2 != 0 {
|
||||
if !s.len().is_multiple_of(2) {
|
||||
bail!("hex string has an odd length");
|
||||
}
|
||||
(0..s.len())
|
||||
|
||||
+8
-2
@@ -1,10 +1,16 @@
|
||||
pub mod alpn;
|
||||
pub mod bandwidth;
|
||||
pub mod config;
|
||||
// The friends stack (persistent identity + control plane) is GUI-only — a
|
||||
// headless CLI host runs no presence service — so it's gated with the feature
|
||||
// that pulls the rest of the GUI, keeping the headless build lean.
|
||||
#[cfg(feature = "gui")]
|
||||
pub mod control;
|
||||
pub mod deps;
|
||||
pub mod endpoint;
|
||||
pub mod identity;
|
||||
pub mod display;
|
||||
pub mod endpoint;
|
||||
#[cfg(feature = "gui")]
|
||||
pub mod identity;
|
||||
pub mod output;
|
||||
pub mod process;
|
||||
pub mod signal;
|
||||
|
||||
@@ -37,6 +37,7 @@
|
||||
//! dropped and no egui frame is running.
|
||||
|
||||
mod child;
|
||||
mod presence;
|
||||
mod theme;
|
||||
mod tray;
|
||||
|
||||
@@ -65,6 +66,7 @@ use winit::raw_window_handle::HasWindowHandle as _;
|
||||
use winit::window::{Window, WindowAttributes, WindowId};
|
||||
|
||||
use self::child::{ChildEvent, ChildProc};
|
||||
use self::presence::PresenceHandle;
|
||||
use self::tray::{TrayAction, TrayHandle, TrayStatus};
|
||||
|
||||
/// Initial / minimum window size, in logical points. Initial height fits the
|
||||
@@ -601,6 +603,9 @@ pub fn run(relay: Option<String>) -> anyhow::Result<()> {
|
||||
};
|
||||
// The tray runs on its own thread and wakes us via the proxy.
|
||||
let tray = tray::start(proxy.clone());
|
||||
// The friends presence service runs on its own thread too, waking us when
|
||||
// a control message arrives.
|
||||
let presence = presence::start(waker.clone(), relay.clone());
|
||||
let gui_settings = crate::common::config::load()
|
||||
.map(|c| c.gui)
|
||||
.unwrap_or_default();
|
||||
@@ -626,6 +631,7 @@ pub fn run(relay: Option<String>) -> anyhow::Result<()> {
|
||||
status: None,
|
||||
},
|
||||
waker,
|
||||
presence,
|
||||
};
|
||||
let mut app = App {
|
||||
state,
|
||||
@@ -880,6 +886,10 @@ struct PixelPassApp {
|
||||
theme: ThemeState,
|
||||
/// Wakes the winit loop when a spawned child emits/exits.
|
||||
waker: Waker,
|
||||
/// The always-on friends presence service (control-plane endpoint). `None`
|
||||
/// if it couldn't start (no identity), in which case friends features are
|
||||
/// simply absent.
|
||||
presence: Option<PresenceHandle>,
|
||||
}
|
||||
|
||||
/// The active theme plus the Settings picker/editor working state.
|
||||
@@ -956,9 +966,22 @@ impl PixelPassApp {
|
||||
fn tick(&mut self) {
|
||||
self.pump_host_events();
|
||||
self.pump_viewer_events();
|
||||
self.pump_presence_events();
|
||||
self.sync_tray_status();
|
||||
}
|
||||
|
||||
/// Drain inbound control-plane messages. Phase 3 turns these into friend-list
|
||||
/// state, in-app notifications, and the bell badge; for now they're logged so
|
||||
/// the control plane is observable end-to-end.
|
||||
fn pump_presence_events(&mut self) {
|
||||
let Some(presence) = &self.presence else {
|
||||
return;
|
||||
};
|
||||
for inbound in presence.drain() {
|
||||
tracing::info!(from = %inbound.from, msg = ?inbound.msg, "presence: control message");
|
||||
}
|
||||
}
|
||||
|
||||
/// Render the current screen. Called from inside the egui frame.
|
||||
fn draw(&mut self, ui: &mut egui::Ui) {
|
||||
self.handle_keys(ui);
|
||||
|
||||
@@ -0,0 +1,105 @@
|
||||
//! The always-on friends presence service.
|
||||
//!
|
||||
//! A control-plane iroh endpoint ([`endpoint::bind_control`]) that lives for the
|
||||
//! whole GUI session on its own thread with a current-thread tokio runtime — the
|
||||
//! GUI is a synchronous winit/egui loop, so iroh's async work can't run on it
|
||||
//! (the same reason [`super::tray`] has its own thread + runtime).
|
||||
//!
|
||||
//! Inbound control messages are forwarded over a std mpsc channel the UI drains
|
||||
//! each [`super::PixelPassApp::tick`]; the [`Waker`] is pinged on arrival so a
|
||||
//! message wakes the loop even while the window is hidden to the tray — the same
|
||||
//! trick the headless-child reader uses.
|
||||
|
||||
use std::sync::mpsc::{self, Receiver};
|
||||
use std::thread;
|
||||
|
||||
use iroh::EndpointId;
|
||||
use tokio::sync::mpsc as tmpsc;
|
||||
|
||||
use super::Waker;
|
||||
use crate::common::{
|
||||
control::{self, Inbound},
|
||||
endpoint, identity,
|
||||
};
|
||||
|
||||
/// Handle the GUI holds for the presence service. Dropping it doesn't stop the
|
||||
/// service (the thread is detached; the endpoint closes when the process exits)
|
||||
/// — it just stops the UI from draining inbound messages.
|
||||
pub struct PresenceHandle {
|
||||
/// Inbound control messages, drained by [`PresenceHandle::drain`] each tick.
|
||||
rx: Receiver<Inbound>,
|
||||
}
|
||||
|
||||
impl PresenceHandle {
|
||||
/// Pull every control message received since the last call. Collected by the
|
||||
/// caller so it can take `&mut self` while handling them.
|
||||
pub fn drain(&self) -> Vec<Inbound> {
|
||||
std::iter::from_fn(|| self.rx.try_recv().ok()).collect()
|
||||
}
|
||||
}
|
||||
|
||||
/// Start the presence service. Returns `None` if the persistent identity can't
|
||||
/// be loaded — the GUI then simply runs without friends features rather than
|
||||
/// refusing to start. The endpoint binds asynchronously on the spawned thread;
|
||||
/// our id is known immediately because it derives from the saved key, so we can
|
||||
/// fail-fast and log it without waiting on the relay handshake.
|
||||
pub fn start(waker: Waker, relay: Option<String>) -> Option<PresenceHandle> {
|
||||
let id: EndpointId = match identity::load_or_create() {
|
||||
Ok(key) => key.public(),
|
||||
Err(e) => {
|
||||
tracing::warn!("presence: no identity, friends features disabled: {e:#}");
|
||||
return None;
|
||||
}
|
||||
};
|
||||
tracing::info!(%id, "presence: starting control service");
|
||||
|
||||
let (tx, rx) = mpsc::channel::<Inbound>();
|
||||
thread::Builder::new()
|
||||
.name("pixelpass-presence".into())
|
||||
.spawn(move || run(relay, id, tx, waker))
|
||||
.map_err(|e| tracing::warn!("presence: could not spawn service thread: {e}"))
|
||||
.ok()?;
|
||||
|
||||
Some(PresenceHandle { rx })
|
||||
}
|
||||
|
||||
/// Thread body: a current-thread tokio runtime that binds the control endpoint,
|
||||
/// runs the accept loop, and bridges inbound messages to the UI channel.
|
||||
fn run(relay: Option<String>, id: EndpointId, tx: mpsc::Sender<Inbound>, waker: Waker) {
|
||||
let rt = match tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
{
|
||||
Ok(rt) => rt,
|
||||
Err(e) => {
|
||||
tracing::error!("presence: failed to build runtime: {e}");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
rt.block_on(async move {
|
||||
let ep = match endpoint::bind_control(relay.as_deref()).await {
|
||||
Ok(ep) => ep,
|
||||
Err(e) => {
|
||||
tracing::error!("presence: failed to bind control endpoint: {e:#}");
|
||||
return;
|
||||
}
|
||||
};
|
||||
tracing::info!(%id, "presence: control endpoint online");
|
||||
|
||||
// Bridge the async accept loop to the sync UI channel, waking the loop
|
||||
// on each message so it lands even while hidden to the tray.
|
||||
let (itx, mut irx) = tmpsc::channel::<Inbound>(32);
|
||||
let forward = tokio::spawn(async move {
|
||||
while let Some(inbound) = irx.recv().await {
|
||||
if tx.send(inbound).is_err() {
|
||||
break; // UI gone
|
||||
}
|
||||
waker.wake();
|
||||
}
|
||||
});
|
||||
|
||||
control::serve(ep, itx).await;
|
||||
forward.abort();
|
||||
});
|
||||
}
|
||||
Reference in New Issue
Block a user