From 04bc0a808a9a493003db5695d696cb610c2a2dca Mon Sep 17 00:00:00 2001 From: Mollusk Date: Sat, 30 May 2026 22:01:42 -0400 Subject: [PATCH] perf(friends): fire share-campaign rounds concurrently MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit run_share tried offline peers one at a time, so a single unreachable friend's ~10s control-plane connect timeout serialised the whole round (N offline peers → up to N×10s per round). Spawn each round's sends into a JoinSet and collect as they finish: a round now takes ~one timeout regardless of how many friends are offline. Delivery receipts are still emitted one-per-peer as each ACK lands; the code is shared across tasks via an Arc instead of re-cloning the payload per peer per round. Co-Authored-By: Claude Opus 4.8 --- src/gui/presence.rs | 55 +++++++++++++++++++++++++++++++++++---------- 1 file changed, 43 insertions(+), 12 deletions(-) diff --git a/src/gui/presence.rs b/src/gui/presence.rs index ff0dd00..d951453 100644 --- a/src/gui/presence.rs +++ b/src/gui/presence.rs @@ -10,6 +10,7 @@ //! message wakes the loop even while the window is hidden to the tray — the same //! trick the headless-child reader uses. +use std::sync::Arc; use std::sync::mpsc::{self, Receiver}; use std::thread; @@ -232,32 +233,62 @@ fn run( /// [`SHARE_RETRY`] until all are delivered (or the task is aborted by a /// StartShare/StopShare). Emits one [`PresenceEvent::ShareDelivered`] per peer /// the moment its ACK comes back — that ACK *is* the delivery signal. +/// +/// Each round fires all still-pending peers **concurrently**, so a single +/// offline friend's ~10s connect timeout doesn't serialise the whole round +/// (which it did when peers were tried one at a time). async fn run_share( ep: Endpoint, msg: ControlMsg, mut pending: Vec, ui: tmpsc::Sender, ) { + // The code is immutable for the campaign's life; share it across the + // per-peer tasks via an `Arc` rather than re-cloning the payload each round. + let msg = Arc::new(msg); while !pending.is_empty() { - let mut still = Vec::new(); + let mut round = tokio::task::JoinSet::new(); for peer in pending { - match control::send(&ep, peer, &msg).await { - Ok(()) => { - tracing::info!(%peer, "presence: shared code delivered"); - if ui - .send(PresenceEvent::ShareDelivered { peer }) - .await - .is_err() - { - return; // UI gone — nothing left to report to + let ep = ep.clone(); + let msg = Arc::clone(&msg); + round.spawn(async move { + match control::send(&ep, peer, &msg).await { + Ok(()) => (peer, true), + Err(e) => { + tracing::debug!(%peer, "presence: share not yet delivered: {e:#}"); + (peer, false) } } + }); + } + + let mut still = Vec::new(); + while let Some(joined) = round.join_next().await { + let (peer, delivered) = match joined { + Ok(outcome) => outcome, + // A send task panicking is unexpected; log and drop that peer + // from the campaign rather than abort the whole round. (A + // campaign-level abort drops this future entirely — we never + // observe that as a JoinError here.) Err(e) => { - tracing::debug!(%peer, "presence: share not yet delivered: {e:#}"); - still.push(peer); + tracing::warn!("presence: share task failed: {e}"); + continue; } + }; + if delivered { + tracing::info!(%peer, "presence: shared code delivered"); + if ui + .send(PresenceEvent::ShareDelivered { peer }) + .await + .is_err() + { + return; // UI gone — nothing left to report to + } + } else { + still.push(peer); } } + if still.is_empty() { break; }