feat(friends): push share codes to friends on hosting + receive bell (phase 4)

The payoff phase: on Start-Hosting, auto-push the wrapped share code to the
selected accepted friends, and surface codes friends push us in a bell.

Sending (host side):
- A share-target picker on the host form lists accepted friends as checkboxes;
  selection is stored as the *exclusion* set so the default ships to everyone
  and a friend added mid-session is included automatically.
- When the child reports its ticket, the wrapped code is pushed to the selected
  friends, gated by FriendStore::is_accepted.
- Delivery is online-now + retry-while-hosting: the presence service runs an
  abortable share campaign that retries offline friends every 5s until they're
  reached or hosting stops. The control-plane ACK is the delivered/failed
  signal; each success emits a ShareDelivered receipt.
- The running host screen shows a live "delivered ✓ / offline, retrying" row
  per targeted friend.

Receiving (viewer side):
- The previously-stubbed ShareCode handler now honours codes from accepted
  friends only, records a notice (deduped per friend), and fires a desktop
  notification.
- A top-right bell with a white-on-red badge counts pending notices; its panel
  lets you Watch a code (opens the viewer with it prefilled) or dismiss it.

Presence service refactor: the fire-and-forget Outbound becomes a Command enum
(Send / StartShare / StopShare) and the UI now drains a PresenceEvent enum
(Message / ShareDelivered) over one unified async→sync bridge.

Tests: +2 for the share-target selection rule (43 gui pass). clippy + fmt clean
on both feature sets; smoke-launch shows the control endpoint online, no panic.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-05-30 17:01:19 -04:00
parent 9e839ca452
commit 1a746461b4
2 changed files with 476 additions and 49 deletions
+325 -12
View File
@@ -66,11 +66,13 @@ use winit::event_loop::{ActiveEventLoop, ControlFlow, EventLoop, EventLoopProxy}
use winit::raw_window_handle::HasWindowHandle as _;
use winit::window::{Window, WindowAttributes, WindowId};
use std::collections::{BTreeMap, BTreeSet};
use self::child::{ChildEvent, ChildProc};
use self::presence::PresenceHandle;
use self::presence::{PresenceEvent, PresenceHandle};
use self::tray::{TrayAction, TrayHandle, TrayStatus};
use crate::common::control::ControlMsg;
use crate::common::friends::FriendState;
use crate::common::friends::{FriendState, FriendStore};
/// Initial / minimum window size, in logical points. Initial height fits the
/// host screen (ticket + Copy + QR + Stop) without needing to scroll on a 1080p
@@ -643,6 +645,10 @@ pub fn run(relay: Option<String>) -> anyhow::Result<()> {
friends,
display_name: gui_settings.display_name,
met: Vec::new(),
share_excluded: BTreeSet::new(),
share_status: BTreeMap::new(),
notices: Vec::new(),
show_notices: false,
};
let mut app = App {
state,
@@ -707,6 +713,21 @@ fn control_hello(name: &str) -> ControlMsg {
}
}
/// The accepted friends a host's share push targets: every accepted friend the
/// user hasn't excluded on the host form. A free function (over the store + the
/// exclusion set) so the selection rule is unit-testable without a live UI.
fn selected_share_targets(
friends: &FriendStore,
excluded: &BTreeSet<iroh::EndpointId>,
) -> Vec<iroh::EndpointId> {
friends
.friends
.iter()
.filter(|f| f.state == FriendState::Accepted && !excluded.contains(&f.id))
.map(|f| f.id)
.collect()
}
/// Fire a desktop notification, on a detached thread so the D-Bus round-trip
/// can't stall the egui frame. Best-effort: with no notification daemon it
/// just does nothing. (notify-rust talks D-Bus via pure-Rust zbus, so this
@@ -935,6 +956,20 @@ struct PixelPassApp {
/// Peers met this session (over a connection) who aren't yet in the friends
/// list — drives the "add friend" offer. Session-scoped, not persisted.
met: Vec<MetPeer>,
/// Accepted friends *excluded* from the host's share push, toggled on the
/// host form. Stored as the exclusion (not the inclusion) so the default —
/// an empty set — means "share with everyone," and a friend added mid-session
/// is included automatically. Session-scoped.
share_excluded: BTreeSet<iroh::EndpointId>,
/// Delivery state for the current host session's share campaign: a friend is
/// present once targeted, `true` once their ACK arrives. Drives the live
/// "delivered / retrying" list on the running host screen. Cleared on stop.
share_status: BTreeMap<iroh::EndpointId, bool>,
/// Share codes friends have pushed to us, awaiting the user — the bell badge.
/// Deduped by sender (a friend re-hosting replaces their stale code).
notices: Vec<ShareNotice>,
/// Whether the bell's notification list is currently expanded.
show_notices: bool,
}
/// A peer encountered this session but not yet befriended.
@@ -944,6 +979,17 @@ struct MetPeer {
name: String,
}
/// A share code a friend pushed to us over the control plane, shown in the bell
/// panel until the user watches or dismisses it.
struct ShareNotice {
/// The friend who shared — the dedupe key (one live notice per friend).
from: iroh::EndpointId,
/// Their display name, for the panel row.
name: String,
/// The share code to drop into the viewer when "Watch" is clicked.
code: String,
}
/// The active theme plus the Settings picker/editor working state.
struct ThemeState {
/// Currently applied theme (persisted by name in the config).
@@ -1022,22 +1068,33 @@ impl PixelPassApp {
self.sync_tray_status();
}
/// Drain inbound control-plane messages and fold them into the friends list
/// and the session's met-peers, queueing any replies. Collected up front so
/// the presence borrow is released before we mutate `self` / re-borrow it to
/// send. (Phase 4 adds the bell badge + ShareCode handling.)
/// Drain presence-service events — inbound control messages and share-code
/// delivery receipts — folding them into the friends list, the session's
/// met-peers, the bell notices, and the live share status, queueing any
/// replies. Collected up front so the presence borrow is released before we
/// mutate `self` / re-borrow it to send.
fn pump_presence_events(&mut self) {
let Some(inbound) = self.presence.as_ref().map(|p| p.drain()) else {
let Some(events) = self.presence.as_ref().map(|p| p.drain()) else {
return;
};
if inbound.is_empty() {
if events.is_empty() {
return;
}
let my_name = self.display_name.clone();
let mut outbox: Vec<(iroh::EndpointId, ControlMsg)> = Vec::new();
let mut store_changed = false;
for inb in inbound {
for event in events {
let inb = match event {
PresenceEvent::ShareDelivered { peer } => {
// A code we pushed reached this friend — flip their row.
if let Some(s) = self.share_status.get_mut(&peer) {
*s = true;
}
continue;
}
PresenceEvent::Message(inb) => inb,
};
let from = inb.from;
match inb.msg {
ControlMsg::Hello { name } => {
@@ -1088,10 +1145,19 @@ impl PixelPassApp {
}
}
ControlMsg::ShareCode { name, ticket } => {
// Phase 4 turns this into the bell badge + an in-app notice.
// For now, only honour codes from accepted friends and log.
// Only accepted friends may push us a code a stranger's is
// ignored, so the control plane can't be used to spam viewers.
if self.friends.is_accepted(&from) {
tracing::info!(from = %from, %name, "presence: friend shared a code: {ticket}");
// Keep the stored name fresh from the live push.
if let Some(f) = self.friends.find_mut(&from) {
f.name = name.clone();
store_changed = true;
}
self.push_notice(from, name.clone(), ticket);
notify(
"PixelPass — a friend is sharing",
format!("{name} is sharing their screen. Open PixelPass to watch."),
);
} else {
tracing::warn!(from = %from, "presence: ignoring ShareCode from a non-friend");
}
@@ -1138,6 +1204,43 @@ impl PixelPassApp {
}
}
/// Record a share code a friend pushed us, replacing any prior notice from
/// the same friend (their previous code is stale once they re-host).
fn push_notice(&mut self, from: iroh::EndpointId, name: String, code: String) {
if let Some(n) = self.notices.iter_mut().find(|n| n.from == from) {
n.name = name;
n.code = code;
} else {
self.notices.push(ShareNotice { from, name, code });
}
}
/// Begin pushing the current host share code to the selected accepted
/// friends. Called once the child reports its ticket (so `share_code` is
/// set). Seeds the per-friend status map to "retrying" and hands the campaign
/// to the presence service, which delivers now and retries offline friends
/// while we host.
fn begin_share(&mut self) {
self.share_status.clear();
let Some(code) = self.host.share_code.clone() else {
return;
};
let targets = selected_share_targets(&self.friends, &self.share_excluded);
if targets.is_empty() {
return;
}
self.share_status = targets.iter().map(|id| (*id, false)).collect();
if let Some(p) = &self.presence {
p.start_share(
ControlMsg::ShareCode {
name: self.display_name.clone(),
ticket: code,
},
targets,
);
}
}
/// Send a friend request to a peer (or accept theirs if they already asked),
/// persisting the new state and notifying the peer over the control plane.
fn request_friend(&mut self, id: iroh::EndpointId, name: String) {
@@ -1348,6 +1451,98 @@ impl PixelPassApp {
Screen::Settings => self.settings(ui),
Screen::Shortcuts => self.shortcuts(ui),
}
// The notification bell floats over every screen (an overlay Area, so it
// doesn't disturb the per-screen layouts), drawn last to sit on top.
self.notification_bell(ui);
}
/// A bell in the top-right corner with a red badge counting share codes
/// friends have pushed us. Clicking it toggles a panel listing them, each
/// openable straight into the viewer. An overlay so it rides above whichever
/// screen is showing without fighting its scroll areas or back buttons.
fn notification_bell(&mut self, ui: &mut egui::Ui) {
let count = self.notices.len();
egui::Area::new(egui::Id::new("notif_bell"))
.anchor(egui::Align2::RIGHT_TOP, egui::vec2(-10.0, 8.0))
.order(egui::Order::Foreground)
.show(ui.ctx(), |ui| {
ui.horizontal(|ui| {
if count > 0 {
ui.label(
egui::RichText::new(format!(" {count} "))
.color(egui::Color32::WHITE)
.background_color(egui::Color32::from_rgb(200, 40, 40))
.strong(),
);
}
if ui
.button("🔔")
.on_hover_text("Screen codes shared by friends")
.clicked()
{
self.show_notices = !self.show_notices;
}
});
});
if self.show_notices {
self.notification_panel(ui);
}
}
/// The dropdown listing pushed share codes. Each row offers Watch (open it in
/// the viewer) and Dismiss; collected first so the list isn't mutated mid-draw.
fn notification_panel(&mut self, ui: &mut egui::Ui) {
let mut watch: Option<String> = None;
let mut dismiss: Option<iroh::EndpointId> = None;
let mut clear_all = false;
egui::Area::new(egui::Id::new("notif_panel"))
.anchor(egui::Align2::RIGHT_TOP, egui::vec2(-10.0, 40.0))
.order(egui::Order::Foreground)
.show(ui.ctx(), |ui| {
egui::Frame::popup(ui.style()).show(ui, |ui| {
ui.set_max_width(280.0);
ui.label(egui::RichText::new("Shared screen codes").strong());
ui.separator();
if self.notices.is_empty() {
ui.label(egui::RichText::new("No codes from friends right now.").weak());
return;
}
for n in &self.notices {
ui.horizontal_wrapped(|ui| {
ui.label(egui::RichText::new(&n.name).strong());
ui.label(egui::RichText::new("is sharing their screen").weak());
});
ui.horizontal(|ui| {
if ui.button("▶ Watch").clicked() {
watch = Some(n.code.clone());
dismiss = Some(n.from);
}
if ui.small_button("Dismiss").clicked() {
dismiss = Some(n.from);
}
});
ui.separator();
}
if ui.small_button("Clear all").clicked() {
clear_all = true;
}
});
});
if clear_all {
self.notices.clear();
self.show_notices = false;
}
if let Some(id) = dismiss {
self.notices.retain(|n| n.from != id);
}
if let Some(code) = watch {
self.viewer.ticket_input = code;
self.viewer.focus_ticket = true;
self.screen = Screen::Viewer;
self.show_notices = false;
}
}
/// Window-focused keyboard shortcuts (mirrored on the Shortcuts screen).
@@ -1781,6 +1976,47 @@ impl PixelPassApp {
});
}
/// The "share my code with these friends" picker on the host form. Lists the
/// accepted friends as checkboxes (ticked = will receive the code when I
/// start). Selection is the *exclusion* set, so the default ships the code to
/// everyone. Hidden when there's no presence service or no accepted friends.
fn share_picker(&mut self, ui: &mut egui::Ui) {
if self.presence.is_none() {
return;
}
let accepted: Vec<(iroh::EndpointId, String)> = self
.friends
.friends
.iter()
.filter(|f| f.state == FriendState::Accepted)
.map(|f| (f.id, f.name.clone()))
.collect();
if accepted.is_empty() {
return;
}
ui.add_space(12.0);
ui.separator();
ui.label("Auto-share my code with:");
for (id, name) in &accepted {
let mut on = !self.share_excluded.contains(id);
if ui.checkbox(&mut on, name.as_str()).changed() {
if on {
self.share_excluded.remove(id);
} else {
self.share_excluded.insert(*id);
}
}
}
ui.label(
egui::RichText::new(
"Ticked friends get this session's code automatically — even if \
they're offline now (we keep retrying while you host).",
)
.small()
.weak(),
);
}
fn host_form(&mut self, ui: &mut egui::Ui) {
if let Some(err) = &self.host.error {
ui.colored_label(self.theme.active.error, err);
@@ -1821,6 +2057,8 @@ impl PixelPassApp {
ui.end_row();
});
self.share_picker(ui);
ui.add_space(16.0);
if ui
.add_sized([160.0, 36.0], egui::Button::new("Start hosting"))
@@ -1972,6 +2210,7 @@ impl PixelPassApp {
ui.colored_label(self.theme.active.warning, format!("{reason}"));
}
self.share_status_list(ui);
self.friend_offers(ui);
ui.add_space(16.0);
@@ -1983,6 +2222,34 @@ impl PixelPassApp {
}
}
/// Live status of the host's share push: each targeted friend with a
/// delivered ✓ or a "retrying" marker (offline friends are chased until they
/// come online or the session stops). Empty — and so hidden — when no friends
/// were selected to share with.
fn share_status_list(&mut self, ui: &mut egui::Ui) {
if self.share_status.is_empty() {
return;
}
ui.add_space(10.0);
ui.separator();
ui.label("Shared this code with:");
for (id, delivered) in &self.share_status {
let name = self
.friends
.find(id)
.map(|f| f.name.clone())
.unwrap_or_else(|| short_id(&id.to_string()));
ui.horizontal(|ui| {
ui.label(format!("{name}"));
if *delivered {
ui.colored_label(self.theme.active.success, "✓ delivered");
} else {
ui.colored_label(self.theme.active.waiting, "… offline, retrying");
}
});
}
}
fn start_host(&mut self) {
self.host.error = None;
self.host.last_refusal = None;
@@ -1996,6 +2263,12 @@ impl PixelPassApp {
self.host.viewers.clear();
self.host.qr_texture = None;
self.met.clear();
// Any previous campaign is stale; the new session's Ticket event will
// start a fresh one once its code arrives.
self.share_status.clear();
if let Some(p) = &self.presence {
p.stop_share();
}
let mut args = vec![
"--host".to_string(),
@@ -2035,6 +2308,11 @@ impl PixelPassApp {
self.host.viewers.clear();
self.host.qr_texture = None;
self.met.clear();
// The code is no longer valid, so stop chasing offline friends with it.
self.share_status.clear();
if let Some(p) = &self.presence {
p.stop_share();
}
}
/// Drain the host child's event channel into state, and detect an
@@ -2083,6 +2361,8 @@ impl PixelPassApp {
self.host.copied = set_clipboard(&share_code);
self.host.ticket = Some(value);
self.host.share_code = Some(share_code);
// Now that there's a code, push it to the selected friends.
self.begin_share();
}
ChildEvent::HostInfo {
display_server,
@@ -2428,4 +2708,37 @@ mod tests {
assert_eq!(short_id("abc"), "abc");
assert_eq!(short_id("0123456789ab"), "0123456789ab"); // exactly 12, no ellipsis
}
fn id() -> iroh::EndpointId {
iroh::SecretKey::generate().public()
}
#[test]
fn share_targets_are_accepted_friends_minus_excluded() {
let mut friends = FriendStore::default();
let alice = id();
let bob = id();
let pending = id();
friends.upsert(alice, "Alice".into(), FriendState::Accepted);
friends.upsert(bob, "Bob".into(), FriendState::Accepted);
friends.upsert(pending, "Pat".into(), FriendState::PendingIncoming);
// No exclusions → every accepted friend, and never a pending one.
let all = selected_share_targets(&friends, &BTreeSet::new());
assert_eq!(all.len(), 2);
assert!(all.contains(&alice) && all.contains(&bob));
assert!(!all.contains(&pending));
// Excluding Bob drops only Bob.
let excluded: BTreeSet<_> = [bob].into_iter().collect();
let some = selected_share_targets(&friends, &excluded);
assert_eq!(some, vec![alice]);
}
#[test]
fn share_targets_empty_without_accepted_friends() {
let mut friends = FriendStore::default();
friends.upsert(id(), "Out".into(), FriendState::PendingOutgoing);
assert!(selected_share_targets(&friends, &BTreeSet::new()).is_empty());
}
}
+151 -37
View File
@@ -13,7 +13,7 @@
use std::sync::mpsc::{self, Receiver};
use std::thread;
use iroh::EndpointId;
use iroh::{Endpoint, EndpointId};
use tokio::sync::mpsc as tmpsc;
use super::Waker;
@@ -22,12 +22,39 @@ use crate::common::{
endpoint, identity,
};
/// An outbound control message the UI asks the service to deliver.
struct Outbound {
peer: EndpointId,
msg: ControlMsg,
/// A command the UI hands the presence service over [`PresenceHandle`].
enum Command {
/// Deliver one message, once, fire-and-forget (friend request/accept/decline
/// and the presence `Hello`). A failure is logged, not retried.
Send { peer: EndpointId, msg: ControlMsg },
/// Begin — or replace — a share campaign: push `msg` (a
/// [`ControlMsg::ShareCode`]) to every peer in `peers`, retrying the ones
/// that are offline until they're reached or the campaign is stopped. Each
/// success emits a [`PresenceEvent::ShareDelivered`]. Replaces any campaign
/// already running (a fresh host session supersedes the previous code).
StartShare {
msg: ControlMsg,
peers: Vec<EndpointId>,
},
/// Stop the active share campaign — the host stopped or left the screen, so
/// the perishable code is no longer valid and offline friends shouldn't keep
/// being chased.
StopShare,
}
/// Something the service surfaces to the UI, drained each tick.
pub enum PresenceEvent {
/// A control message arrived from a peer.
Message(Inbound),
/// A share-campaign code reached `peer` (its ACK came back). Lets the host
/// screen flip that friend's row from "retrying" to "delivered."
ShareDelivered { peer: EndpointId },
}
/// How long to wait before re-attempting delivery to friends who were offline
/// on the previous round of a share campaign.
const SHARE_RETRY: std::time::Duration = std::time::Duration::from_secs(5);
/// Handle the GUI holds for the presence service. Dropping it doesn't stop the
/// service (the thread is detached; the endpoint closes when the process exits)
/// — it just stops the UI from draining inbound messages.
@@ -35,11 +62,12 @@ pub struct PresenceHandle {
/// Our stable control-plane id — what friends know us by, and what we embed
/// in a wrapped share code so a viewer can find us.
id: EndpointId,
/// Inbound control messages, drained by [`PresenceHandle::drain`] each tick.
rx: Receiver<Inbound>,
/// Outbound requests handed to the service thread. Unbounded tokio sender so
/// the sync UI can enqueue without blocking or being inside the runtime.
out_tx: tmpsc::UnboundedSender<Outbound>,
/// Service events (inbound messages + share receipts), drained by
/// [`PresenceHandle::drain`] each tick.
rx: Receiver<PresenceEvent>,
/// Commands handed to the service thread. Unbounded tokio sender so the sync
/// UI can enqueue without blocking or being inside the runtime.
out_tx: tmpsc::UnboundedSender<Command>,
}
impl PresenceHandle {
@@ -48,18 +76,33 @@ impl PresenceHandle {
self.id
}
/// Pull every control message received since the last call. Collected by the
/// Pull every service event received since the last call. Collected by the
/// caller so it can take `&mut self` while handling them.
pub fn drain(&self) -> Vec<Inbound> {
pub fn drain(&self) -> Vec<PresenceEvent> {
std::iter::from_fn(|| self.rx.try_recv().ok()).collect()
}
/// Enqueue a message for delivery to `peer`. Fire-and-forget from the UI's
/// view; the service connects, delivers, and logs a failure (Phase 4 adds a
/// retry queue). A send error here only means the service thread is gone.
/// Enqueue a one-shot message for delivery to `peer`. Fire-and-forget from
/// the UI's view; the service connects, delivers, and logs a failure. A send
/// error here only means the service thread is gone.
pub fn send(&self, peer: EndpointId, msg: ControlMsg) {
if self.out_tx.send(Outbound { peer, msg }).is_err() {
tracing::warn!("presence: service thread gone; dropping outbound message");
self.command(Command::Send { peer, msg });
}
/// Begin (or replace) a share campaign pushing `msg` to `peers`, retrying
/// offline friends until [`PresenceHandle::stop_share`] or the next call.
pub fn start_share(&self, msg: ControlMsg, peers: Vec<EndpointId>) {
self.command(Command::StartShare { msg, peers });
}
/// Stop the active share campaign (host stopped — the code is now stale).
pub fn stop_share(&self) {
self.command(Command::StopShare);
}
fn command(&self, cmd: Command) {
if self.out_tx.send(cmd).is_err() {
tracing::warn!("presence: service thread gone; dropping command");
}
}
}
@@ -79,8 +122,8 @@ pub fn start(waker: Waker, relay: Option<String>) -> Option<PresenceHandle> {
};
tracing::info!(%id, "presence: starting control service");
let (tx, rx) = mpsc::channel::<Inbound>();
let (out_tx, out_rx) = tmpsc::unbounded_channel::<Outbound>();
let (tx, rx) = mpsc::channel::<PresenceEvent>();
let (out_tx, out_rx) = tmpsc::unbounded_channel::<Command>();
thread::Builder::new()
.name("pixelpass-presence".into())
.spawn(move || run(relay, id, tx, out_rx, waker))
@@ -96,8 +139,8 @@ pub fn start(waker: Waker, relay: Option<String>) -> Option<PresenceHandle> {
fn run(
relay: Option<String>,
id: EndpointId,
tx: mpsc::Sender<Inbound>,
mut out_rx: tmpsc::UnboundedReceiver<Outbound>,
tx: mpsc::Sender<PresenceEvent>,
mut out_rx: tmpsc::UnboundedReceiver<Command>,
waker: Waker,
) {
let rt = match tokio::runtime::Builder::new_current_thread()
@@ -121,34 +164,105 @@ fn run(
};
tracing::info!(%id, "presence: control endpoint online");
// Bridge the async accept loop to the sync UI channel, waking the loop
// on each message so it lands even while hidden to the tray.
let (itx, mut irx) = tmpsc::channel::<Inbound>(32);
// One async→sync bridge for *everything* the UI sees: every producer
// (the accept loop and the share campaign) pushes a `PresenceEvent` into
// `ui_tx`; this task drains it onto the std channel and wakes the loop so
// the event lands even while the window is hidden to the tray.
let (ui_tx, mut ui_rx) = tmpsc::channel::<PresenceEvent>(64);
let forward = tokio::spawn(async move {
while let Some(inbound) = irx.recv().await {
if tx.send(inbound).is_err() {
while let Some(event) = ui_rx.recv().await {
if tx.send(event).is_err() {
break; // UI gone
}
waker.wake();
}
});
// Deliver outbound messages the UI enqueues, each on its own task so a
// slow/offline peer doesn't hold up the others.
let send_ep = ep.clone();
let sender = tokio::spawn(async move {
while let Some(out) = out_rx.recv().await {
let ep = send_ep.clone();
tokio::spawn(async move {
if let Err(e) = control::send(&ep, out.peer, &out.msg).await {
tracing::warn!(peer = %out.peer, "presence: outbound send failed: {e:#}");
// Wrap inbound control messages as events and feed the bridge.
let (itx, mut irx) = tmpsc::channel::<Inbound>(32);
let inbound_ui = ui_tx.clone();
let inbound = tokio::spawn(async move {
while let Some(msg) = irx.recv().await {
if inbound_ui.send(PresenceEvent::Message(msg)).await.is_err() {
break;
}
}
});
// Handle UI commands: one-shot sends each on their own task, and a single
// abortable share campaign (StartShare replaces it, StopShare cancels it).
let cmd_ep = ep.clone();
let commands = tokio::spawn(async move {
let mut share: Option<tokio::task::JoinHandle<()>> = None;
while let Some(cmd) = out_rx.recv().await {
match cmd {
Command::Send { peer, msg } => {
let ep = cmd_ep.clone();
tokio::spawn(async move {
if let Err(e) = control::send(&ep, peer, &msg).await {
tracing::warn!(%peer, "presence: outbound send failed: {e:#}");
}
});
}
});
Command::StartShare { msg, peers } => {
if let Some(t) = share.take() {
t.abort();
}
let ep = cmd_ep.clone();
let ui = ui_tx.clone();
share = Some(tokio::spawn(run_share(ep, msg, peers, ui)));
}
Command::StopShare => {
if let Some(t) = share.take() {
t.abort();
}
}
}
}
});
control::serve(ep, itx).await;
forward.abort();
sender.abort();
inbound.abort();
commands.abort();
});
}
/// Push `msg` to every peer in `peers`, retrying the ones that are offline every
/// [`SHARE_RETRY`] until all are delivered (or the task is aborted by a
/// StartShare/StopShare). Emits one [`PresenceEvent::ShareDelivered`] per peer
/// the moment its ACK comes back — that ACK *is* the delivery signal.
async fn run_share(
ep: Endpoint,
msg: ControlMsg,
mut pending: Vec<EndpointId>,
ui: tmpsc::Sender<PresenceEvent>,
) {
while !pending.is_empty() {
let mut still = Vec::new();
for peer in pending {
match control::send(&ep, peer, &msg).await {
Ok(()) => {
tracing::info!(%peer, "presence: shared code delivered");
if ui
.send(PresenceEvent::ShareDelivered { peer })
.await
.is_err()
{
return; // UI gone — nothing left to report to
}
}
Err(e) => {
tracing::debug!(%peer, "presence: share not yet delivered: {e:#}");
still.push(peer);
}
}
}
if still.is_empty() {
break;
}
pending = still;
tokio::time::sleep(SHARE_RETRY).await;
}
tracing::info!("presence: share campaign complete");
}