tunnel: keep data direction alive when reverse direction EOFs
The old tokio::select! tore down the whole bridge when EITHER direction's io::copy finished. For a one-way streaming workload the reverse direction carries only the initial HTTP GET; once mpv stops sending and the read half EOFs, the data direction got killed mid- stream and the host logged "bridge closed cleanly" while the user's video disappeared. Spawn the reverse direction as a detached task and `.await` only the data direction. When the data direction ends naturally, abort the reverse task. The function gains Send + 'static bounds on T, which TcpStream satisfies. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
+13
-16
@@ -5,30 +5,27 @@ use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
|
|||||||
/// Bridge an iroh bi-stream and a generic AsyncRead+AsyncWrite (typically a TCP
|
/// Bridge an iroh bi-stream and a generic AsyncRead+AsyncWrite (typically a TCP
|
||||||
/// socket) by copying bytes in both directions concurrently.
|
/// socket) by copying bytes in both directions concurrently.
|
||||||
///
|
///
|
||||||
/// Returns once either direction finishes or errors. The peer-side socket
|
/// The data direction (peer → quic on the host, quic → peer on the viewer)
|
||||||
/// halves are owned and dropped here on exit, ensuring FIN propagates.
|
/// is the only one whose completion ends the bridge. The reverse direction
|
||||||
|
/// carries the initial HTTP request and then nothing for a one-way streaming
|
||||||
|
/// workload; if its `io::copy` finishes (e.g. mpv half-closes its send side),
|
||||||
|
/// we must not tear down the data direction with it.
|
||||||
pub async fn bridge<T>(quic_send: SendStream, quic_recv: RecvStream, peer: T) -> Result<()>
|
pub async fn bridge<T>(quic_send: SendStream, quic_recv: RecvStream, peer: T) -> Result<()>
|
||||||
where
|
where
|
||||||
T: AsyncRead + AsyncWrite + Unpin,
|
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
||||||
{
|
{
|
||||||
let (mut peer_r, mut peer_w) = tokio::io::split(peer);
|
let (mut peer_r, mut peer_w) = tokio::io::split(peer);
|
||||||
let mut quic_send = quic_send;
|
let mut quic_send = quic_send;
|
||||||
let mut quic_recv = quic_recv;
|
let mut quic_recv = quic_recv;
|
||||||
|
|
||||||
let to_quic = async {
|
let reverse = tokio::spawn(async move {
|
||||||
let n = tokio::io::copy(&mut peer_r, &mut quic_send).await;
|
let _ = tokio::io::copy(&mut quic_recv, &mut peer_w).await;
|
||||||
let _ = quic_send.finish();
|
|
||||||
n
|
|
||||||
};
|
|
||||||
let to_peer = async {
|
|
||||||
let n = tokio::io::copy(&mut quic_recv, &mut peer_w).await;
|
|
||||||
let _ = peer_w.shutdown().await;
|
let _ = peer_w.shutdown().await;
|
||||||
n
|
});
|
||||||
};
|
|
||||||
|
|
||||||
tokio::select! {
|
let res = tokio::io::copy(&mut peer_r, &mut quic_send).await;
|
||||||
res = to_quic => { res?; }
|
let _ = quic_send.finish();
|
||||||
res = to_peer => { res?; }
|
reverse.abort();
|
||||||
}
|
res?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user