perf(friends): fire share-campaign rounds concurrently
run_share tried offline peers one at a time, so a single unreachable friend's ~10s control-plane connect timeout serialised the whole round (N offline peers → up to N×10s per round). Spawn each round's sends into a JoinSet and collect as they finish: a round now takes ~one timeout regardless of how many friends are offline. Delivery receipts are still emitted one-per-peer as each ACK lands; the code is shared across tasks via an Arc instead of re-cloning the payload per peer per round. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
+37
-6
@@ -10,6 +10,7 @@
|
|||||||
//! message wakes the loop even while the window is hidden to the tray — the same
|
//! message wakes the loop even while the window is hidden to the tray — the same
|
||||||
//! trick the headless-child reader uses.
|
//! trick the headless-child reader uses.
|
||||||
|
|
||||||
|
use std::sync::Arc;
|
||||||
use std::sync::mpsc::{self, Receiver};
|
use std::sync::mpsc::{self, Receiver};
|
||||||
use std::thread;
|
use std::thread;
|
||||||
|
|
||||||
@@ -232,17 +233,49 @@ fn run(
|
|||||||
/// [`SHARE_RETRY`] until all are delivered (or the task is aborted by a
|
/// [`SHARE_RETRY`] until all are delivered (or the task is aborted by a
|
||||||
/// StartShare/StopShare). Emits one [`PresenceEvent::ShareDelivered`] per peer
|
/// StartShare/StopShare). Emits one [`PresenceEvent::ShareDelivered`] per peer
|
||||||
/// the moment its ACK comes back — that ACK *is* the delivery signal.
|
/// the moment its ACK comes back — that ACK *is* the delivery signal.
|
||||||
|
///
|
||||||
|
/// Each round fires all still-pending peers **concurrently**, so a single
|
||||||
|
/// offline friend's ~10s connect timeout doesn't serialise the whole round
|
||||||
|
/// (which it did when peers were tried one at a time).
|
||||||
async fn run_share(
|
async fn run_share(
|
||||||
ep: Endpoint,
|
ep: Endpoint,
|
||||||
msg: ControlMsg,
|
msg: ControlMsg,
|
||||||
mut pending: Vec<EndpointId>,
|
mut pending: Vec<EndpointId>,
|
||||||
ui: tmpsc::Sender<PresenceEvent>,
|
ui: tmpsc::Sender<PresenceEvent>,
|
||||||
) {
|
) {
|
||||||
|
// The code is immutable for the campaign's life; share it across the
|
||||||
|
// per-peer tasks via an `Arc` rather than re-cloning the payload each round.
|
||||||
|
let msg = Arc::new(msg);
|
||||||
while !pending.is_empty() {
|
while !pending.is_empty() {
|
||||||
let mut still = Vec::new();
|
let mut round = tokio::task::JoinSet::new();
|
||||||
for peer in pending {
|
for peer in pending {
|
||||||
|
let ep = ep.clone();
|
||||||
|
let msg = Arc::clone(&msg);
|
||||||
|
round.spawn(async move {
|
||||||
match control::send(&ep, peer, &msg).await {
|
match control::send(&ep, peer, &msg).await {
|
||||||
Ok(()) => {
|
Ok(()) => (peer, true),
|
||||||
|
Err(e) => {
|
||||||
|
tracing::debug!(%peer, "presence: share not yet delivered: {e:#}");
|
||||||
|
(peer, false)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut still = Vec::new();
|
||||||
|
while let Some(joined) = round.join_next().await {
|
||||||
|
let (peer, delivered) = match joined {
|
||||||
|
Ok(outcome) => outcome,
|
||||||
|
// A send task panicking is unexpected; log and drop that peer
|
||||||
|
// from the campaign rather than abort the whole round. (A
|
||||||
|
// campaign-level abort drops this future entirely — we never
|
||||||
|
// observe that as a JoinError here.)
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!("presence: share task failed: {e}");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
if delivered {
|
||||||
tracing::info!(%peer, "presence: shared code delivered");
|
tracing::info!(%peer, "presence: shared code delivered");
|
||||||
if ui
|
if ui
|
||||||
.send(PresenceEvent::ShareDelivered { peer })
|
.send(PresenceEvent::ShareDelivered { peer })
|
||||||
@@ -251,13 +284,11 @@ async fn run_share(
|
|||||||
{
|
{
|
||||||
return; // UI gone — nothing left to report to
|
return; // UI gone — nothing left to report to
|
||||||
}
|
}
|
||||||
}
|
} else {
|
||||||
Err(e) => {
|
|
||||||
tracing::debug!(%peer, "presence: share not yet delivered: {e:#}");
|
|
||||||
still.push(peer);
|
still.push(peer);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
if still.is_empty() {
|
if still.is_empty() {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user