host/serve: extract HTTP fanout from wayland.rs
The broadcast fanout, supervisor-facing listener bind, accept loop, and per-viewer drain were all sitting inside host/wayland.rs even though none of it is Wayland-specific. Move them to host/serve.rs so the X11 backend can share the same serving layer with a one-line constructor call instead of copy-pasting (and drifting on) the fanout code. No behavior change. Wayland's CaptureHandle now wraps a serve::Serve instead of owning the listener/reader/server fields directly; gst pipeline construction is unchanged. connect_to_capture moves alongside Serve since it pairs with it. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
+2
-1
@@ -1,4 +1,5 @@
|
|||||||
mod capture;
|
mod capture;
|
||||||
|
mod serve;
|
||||||
mod wayland;
|
mod wayland;
|
||||||
|
|
||||||
use anyhow::{Result, bail};
|
use anyhow::{Result, bail};
|
||||||
@@ -135,7 +136,7 @@ async fn handle_peer(
|
|||||||
|
|
||||||
eprintln!("[pixelpass] viewer connected: {remote}");
|
eprintln!("[pixelpass] viewer connected: {remote}");
|
||||||
|
|
||||||
let tcp = match wayland::connect_to_capture(port, Duration::from_secs(5)).await {
|
let tcp = match serve::connect_to_capture(port, Duration::from_secs(5)).await {
|
||||||
Ok(t) => t,
|
Ok(t) => t,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
tracing::warn!(%remote, "connect_to_capture failed: {e:#}");
|
tracing::warn!(%remote, "connect_to_capture failed: {e:#}");
|
||||||
|
|||||||
@@ -0,0 +1,191 @@
|
|||||||
|
//! Display-server-agnostic serving layer: takes a capture child's stdout
|
||||||
|
//! producing MPEG-TS bytes and fans them out to N concurrent HTTP viewers
|
||||||
|
//! on a localhost port. One reader task pumps stdout chunks into a
|
||||||
|
//! tokio::sync::broadcast channel; the accept loop spawns one drain task
|
||||||
|
//! per accepted TCP connection. Slow consumers see Lagged and skip ahead;
|
||||||
|
//! MPEG-TS resyncs at the next keyframe.
|
||||||
|
//!
|
||||||
|
//! Backends (host/wayland.rs, future host/x11.rs) build their own gst
|
||||||
|
//! pipeline and hand the resulting ChildStdout to [`Serve::bind`].
|
||||||
|
|
||||||
|
use anyhow::{Context, Result, bail};
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::time::Duration;
|
||||||
|
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||||
|
use tokio::net::{TcpListener, TcpStream};
|
||||||
|
use tokio::process::ChildStdout;
|
||||||
|
use tokio::sync::broadcast;
|
||||||
|
use tokio::task::JoinHandle;
|
||||||
|
use tokio::time::{Instant, sleep};
|
||||||
|
|
||||||
|
/// Broadcast-channel capacity in chunks. Each chunk is up to 64 KiB from
|
||||||
|
/// the capture child's stdout, so 16 chunks ≈ 1 MiB ≈ ~2 s of buffered
|
||||||
|
/// jitter at typical bitrates. A viewer that falls behind by more than
|
||||||
|
/// this gets Lagged and skips ahead — MPEG-TS recovers at the next
|
||||||
|
/// keyframe.
|
||||||
|
const FANOUT_CAPACITY: usize = 16;
|
||||||
|
|
||||||
|
/// Size of each chunk read from the capture child's stdout.
|
||||||
|
const READ_CHUNK: usize = 64 * 1024;
|
||||||
|
|
||||||
|
/// Owns the localhost HTTP listener and the two long-running tasks that
|
||||||
|
/// pump bytes from a capture child to all connected viewers.
|
||||||
|
pub struct Serve {
|
||||||
|
port: u16,
|
||||||
|
reader: Option<JoinHandle<()>>,
|
||||||
|
server: Option<JoinHandle<()>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Serve {
|
||||||
|
/// Bind a localhost listener on a random port, set up the broadcast
|
||||||
|
/// fanout, and spawn the reader + accept-loop tasks. The provided
|
||||||
|
/// `stdout` is assumed to produce MPEG-TS bytes.
|
||||||
|
pub async fn bind(stdout: ChildStdout) -> Result<Self> {
|
||||||
|
let listener = TcpListener::bind("127.0.0.1:0")
|
||||||
|
.await
|
||||||
|
.context("could not bind local capture HTTP listener")?;
|
||||||
|
let port = listener.local_addr()?.port();
|
||||||
|
|
||||||
|
let (tx, _) = broadcast::channel::<Arc<Vec<u8>>>(FANOUT_CAPACITY);
|
||||||
|
let reader = tokio::spawn(pump_to_broadcast(stdout, tx.clone()));
|
||||||
|
let server = tokio::spawn(run_accept_loop(listener, tx));
|
||||||
|
|
||||||
|
Ok(Self {
|
||||||
|
port,
|
||||||
|
reader: Some(reader),
|
||||||
|
server: Some(server),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn local_port(&self) -> u16 {
|
||||||
|
self.port
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Abort the reader and accept-loop tasks. Backends typically call this
|
||||||
|
/// after killing their capture child so the reader sees stdout EOF and
|
||||||
|
/// exits on its own; the abort is a backstop.
|
||||||
|
pub async fn shutdown(mut self) {
|
||||||
|
if let Some(task) = self.reader.take() {
|
||||||
|
task.abort();
|
||||||
|
}
|
||||||
|
if let Some(task) = self.server.take() {
|
||||||
|
task.abort();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for Serve {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
if let Some(task) = self.reader.as_ref() {
|
||||||
|
task.abort();
|
||||||
|
}
|
||||||
|
if let Some(task) = self.server.as_ref() {
|
||||||
|
task.abort();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Connect to the local capture HTTP listener, retrying until it's up or
|
||||||
|
/// we time out. Returns the connected socket — the bridge layer pipes
|
||||||
|
/// QUIC↔this socket once it's open.
|
||||||
|
pub async fn connect_to_capture(port: u16, max_wait: Duration) -> Result<TcpStream> {
|
||||||
|
let deadline = Instant::now() + max_wait;
|
||||||
|
loop {
|
||||||
|
match TcpStream::connect(("127.0.0.1", port)).await {
|
||||||
|
Ok(stream) => return Ok(stream),
|
||||||
|
Err(_) if Instant::now() < deadline => {
|
||||||
|
sleep(Duration::from_millis(50)).await;
|
||||||
|
}
|
||||||
|
Err(e) => bail!("capture HTTP listener never came up on 127.0.0.1:{port}: {e}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Read the capture child's stdout in chunks and broadcast each to all
|
||||||
|
/// current subscribers. `broadcast::send` returns Err when there are no
|
||||||
|
/// receivers; we ignore it so the capture child isn't backpressured
|
||||||
|
/// waiting for a viewer.
|
||||||
|
async fn pump_to_broadcast(mut stdout: ChildStdout, tx: broadcast::Sender<Arc<Vec<u8>>>) {
|
||||||
|
let mut buf = vec![0u8; READ_CHUNK];
|
||||||
|
loop {
|
||||||
|
match stdout.read(&mut buf).await {
|
||||||
|
Ok(0) => {
|
||||||
|
tracing::info!("capture stdout EOF — fanout reader exiting");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Ok(n) => {
|
||||||
|
let chunk = Arc::new(buf[..n].to_vec());
|
||||||
|
let _ = tx.send(chunk);
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!("capture stdout read error: {e}");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run_accept_loop(listener: TcpListener, tx: broadcast::Sender<Arc<Vec<u8>>>) {
|
||||||
|
loop {
|
||||||
|
let sock = match listener.accept().await {
|
||||||
|
Ok((s, _)) => s,
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!("capture HTTP accept failed: {e}");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let rx = tx.subscribe();
|
||||||
|
tokio::spawn(serve_one_viewer(sock, rx));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn serve_one_viewer(mut sock: TcpStream, mut rx: broadcast::Receiver<Arc<Vec<u8>>>) {
|
||||||
|
if !drain_http_request(&mut sock).await {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const RESPONSE: &[u8] = b"HTTP/1.1 200 OK\r\n\
|
||||||
|
Content-Type: video/mp2t\r\n\
|
||||||
|
Cache-Control: no-cache, no-store\r\n\
|
||||||
|
Connection: close\r\n\
|
||||||
|
\r\n";
|
||||||
|
if sock.write_all(RESPONSE).await.is_err() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
loop {
|
||||||
|
match rx.recv().await {
|
||||||
|
Ok(chunk) => {
|
||||||
|
if sock.write_all(&chunk).await.is_err() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(broadcast::error::RecvError::Lagged(skipped)) => {
|
||||||
|
tracing::warn!(
|
||||||
|
skipped,
|
||||||
|
"viewer fanout lagged — MPEG-TS will resync at next keyframe"
|
||||||
|
);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
Err(broadcast::error::RecvError::Closed) => return,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn drain_http_request(sock: &mut TcpStream) -> bool {
|
||||||
|
let mut buf = [0u8; 1024];
|
||||||
|
let mut total = Vec::with_capacity(512);
|
||||||
|
loop {
|
||||||
|
match sock.read(&mut buf).await {
|
||||||
|
Ok(0) => return false,
|
||||||
|
Ok(n) => total.extend_from_slice(&buf[..n]),
|
||||||
|
Err(_) => return false,
|
||||||
|
}
|
||||||
|
if total.windows(4).any(|w| w == b"\r\n\r\n") {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if total.len() > 16 * 1024 {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
+23
-171
@@ -1,10 +1,6 @@
|
|||||||
//! Wayland capture: ashpd ScreenCast portal → PipeWire fd → gst-launch
|
//! Wayland capture: ashpd ScreenCast portal → PipeWire fd → gst-launch.
|
||||||
//! pipewiresrc → MPEG-TS on gst stdout → in-process HTTP server bound on a
|
//! Builds the gst pipeline that produces MPEG-TS on stdout, then hands
|
||||||
//! random localhost port. One gst child feeds a tokio::sync::broadcast channel;
|
//! that stdout to [`super::serve::Serve`] which handles the HTTP fanout.
|
||||||
//! the HTTP listener accepts multiple connections and each one drains its own
|
|
||||||
//! fresh broadcast::Receiver — so a single capture pipeline fans out to N
|
|
||||||
//! concurrent viewers. Slow consumers see Lagged and skip ahead; the MPEG-TS
|
|
||||||
//! stream resyncs at the next keyframe.
|
|
||||||
|
|
||||||
use anyhow::{Context, Result, bail};
|
use anyhow::{Context, Result, bail};
|
||||||
use ashpd::{
|
use ashpd::{
|
||||||
@@ -19,41 +15,29 @@ use nix::sys::signal::{Signal, kill};
|
|||||||
use nix::unistd::{Pid, close};
|
use nix::unistd::{Pid, close};
|
||||||
use std::os::fd::{AsFd, IntoRawFd, OwnedFd, RawFd};
|
use std::os::fd::{AsFd, IntoRawFd, OwnedFd, RawFd};
|
||||||
use std::process::Stdio;
|
use std::process::Stdio;
|
||||||
use std::sync::Arc;
|
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
use tokio::process::{Child, Command};
|
||||||
use tokio::net::{TcpListener, TcpStream};
|
use tokio::time::timeout;
|
||||||
use tokio::process::{Child, ChildStdout, Command};
|
|
||||||
use tokio::sync::broadcast;
|
|
||||||
use tokio::task::JoinHandle;
|
|
||||||
use tokio::time::{Instant, sleep, timeout};
|
|
||||||
|
|
||||||
|
use super::serve::Serve;
|
||||||
use crate::cli::HostOpts;
|
use crate::cli::HostOpts;
|
||||||
|
|
||||||
/// Broadcast-channel capacity in chunks. Each chunk is up to 64 KiB from gst
|
|
||||||
/// stdout, so 16 chunks ≈ 1 MiB ≈ ~2 s of buffered jitter at the default
|
|
||||||
/// 4 Mbps bitrate. A viewer that falls behind by more than this gets Lagged
|
|
||||||
/// and skips ahead — MPEG-TS resyncs at the next keyframe.
|
|
||||||
const FANOUT_CAPACITY: usize = 16;
|
|
||||||
|
|
||||||
/// Size of each chunk read from gst stdout.
|
|
||||||
const READ_CHUNK: usize = 64 * 1024;
|
|
||||||
|
|
||||||
pub struct CaptureHandle {
|
pub struct CaptureHandle {
|
||||||
port: u16,
|
|
||||||
gst: Option<Child>,
|
gst: Option<Child>,
|
||||||
reader: Option<JoinHandle<()>>,
|
serve: Option<Serve>,
|
||||||
server: Option<JoinHandle<()>>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl CaptureHandle {
|
impl CaptureHandle {
|
||||||
pub fn local_port(&self) -> u16 {
|
pub fn local_port(&self) -> u16 {
|
||||||
self.port
|
self.serve
|
||||||
|
.as_ref()
|
||||||
|
.expect("serve is always Some until shutdown")
|
||||||
|
.local_port()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Graceful teardown: SIGTERM gst, give it ~1s to exit, then SIGKILL, then
|
/// Graceful teardown: SIGTERM gst, give it ~1s to exit, then SIGKILL,
|
||||||
/// abort the reader + accept-loop tasks. Call this before dropping; Drop
|
/// then tear down the serve layer. The serve reader will see EOF on
|
||||||
/// only fires the kill backstop.
|
/// gst stdout and exit on its own; serve.shutdown() is the backstop.
|
||||||
pub async fn shutdown(mut self) {
|
pub async fn shutdown(mut self) {
|
||||||
if let Some(child) = self.gst.as_mut()
|
if let Some(child) = self.gst.as_mut()
|
||||||
&& let Some(pid) = child.id()
|
&& let Some(pid) = child.id()
|
||||||
@@ -64,11 +48,8 @@ impl CaptureHandle {
|
|||||||
let _ = timeout(Duration::from_millis(1000), child.wait()).await;
|
let _ = timeout(Duration::from_millis(1000), child.wait()).await;
|
||||||
let _ = child.start_kill();
|
let _ = child.start_kill();
|
||||||
}
|
}
|
||||||
if let Some(task) = self.reader.take() {
|
if let Some(serve) = self.serve.take() {
|
||||||
task.abort();
|
serve.shutdown().await;
|
||||||
}
|
|
||||||
if let Some(task) = self.server.take() {
|
|
||||||
task.abort();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -78,12 +59,7 @@ impl Drop for CaptureHandle {
|
|||||||
if let Some(child) = self.gst.as_mut() {
|
if let Some(child) = self.gst.as_mut() {
|
||||||
let _ = child.start_kill();
|
let _ = child.start_kill();
|
||||||
}
|
}
|
||||||
if let Some(task) = self.reader.as_ref() {
|
// Serve's own Drop aborts its tasks.
|
||||||
task.abort();
|
|
||||||
}
|
|
||||||
if let Some(task) = self.server.as_ref() {
|
|
||||||
task.abort();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -130,15 +106,9 @@ pub async fn start(opts: &HostOpts) -> Result<CaptureHandle> {
|
|||||||
clear_cloexec(&pw_fd)?;
|
clear_cloexec(&pw_fd)?;
|
||||||
let raw_fd: RawFd = pw_fd.into_raw_fd();
|
let raw_fd: RawFd = pw_fd.into_raw_fd();
|
||||||
|
|
||||||
// 2. Bind the in-process HTTP listener on a random localhost port.
|
// 2. Spawn gst-launch with the full pipeline: video AND audio captured,
|
||||||
let listener = TcpListener::bind("127.0.0.1:0")
|
|
||||||
.await
|
|
||||||
.context("could not bind local capture HTTP listener")?;
|
|
||||||
let port = listener.local_addr()?.port();
|
|
||||||
|
|
||||||
// 3. Spawn gst-launch with the full pipeline: video AND audio captured,
|
|
||||||
// encoded, and muxed into MPEG-TS inside gst. Output goes to stdout,
|
// encoded, and muxed into MPEG-TS inside gst. Output goes to stdout,
|
||||||
// which we pipe straight to our HTTP server task — no demux/remux,
|
// which the serve layer pipes to its HTTP fanout — no demux/remux,
|
||||||
// no codec assumptions.
|
// no codec assumptions.
|
||||||
let key_interval = (opts.framerate * 2).to_string();
|
let key_interval = (opts.framerate * 2).to_string();
|
||||||
let bitrate = opts.bitrate.to_string();
|
let bitrate = opts.bitrate.to_string();
|
||||||
@@ -220,118 +190,16 @@ pub async fn start(opts: &HostOpts) -> Result<CaptureHandle> {
|
|||||||
.take()
|
.take()
|
||||||
.context("gst-launch-1.0 stdout pipe unavailable")?;
|
.context("gst-launch-1.0 stdout pipe unavailable")?;
|
||||||
|
|
||||||
// 4. Set up the broadcast fanout. The reader task pumps gst stdout chunks
|
// 3. Hand stdout to the serve layer, which binds the localhost HTTP
|
||||||
// into the channel; the accept-loop task spawns one sender task per
|
// listener and runs the broadcast fanout.
|
||||||
// accepted TCP connection, each draining a fresh broadcast::Receiver.
|
let serve = Serve::bind(gst_stdout).await?;
|
||||||
let (tx, _) = broadcast::channel::<Arc<Vec<u8>>>(FANOUT_CAPACITY);
|
|
||||||
|
|
||||||
let reader = tokio::spawn(pump_gst_to_broadcast(gst_stdout, tx.clone()));
|
|
||||||
let server = tokio::spawn(run_accept_loop(listener, tx));
|
|
||||||
|
|
||||||
Ok(CaptureHandle {
|
Ok(CaptureHandle {
|
||||||
port,
|
|
||||||
gst: Some(gst),
|
gst: Some(gst),
|
||||||
reader: Some(reader),
|
serve: Some(serve),
|
||||||
server: Some(server),
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reads gst's stdout in chunks and broadcasts each to all current subscribers.
|
|
||||||
/// `broadcast::send` returns Err when there are no receivers; we ignore it and
|
|
||||||
/// keep reading so gst doesn't backpressure waiting for a viewer.
|
|
||||||
async fn pump_gst_to_broadcast(
|
|
||||||
mut gst_stdout: ChildStdout,
|
|
||||||
tx: broadcast::Sender<Arc<Vec<u8>>>,
|
|
||||||
) {
|
|
||||||
let mut buf = vec![0u8; READ_CHUNK];
|
|
||||||
loop {
|
|
||||||
match gst_stdout.read(&mut buf).await {
|
|
||||||
Ok(0) => {
|
|
||||||
tracing::info!("gst stdout EOF — fanout reader exiting");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
Ok(n) => {
|
|
||||||
let chunk = Arc::new(buf[..n].to_vec());
|
|
||||||
let _ = tx.send(chunk);
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
tracing::warn!("gst stdout read error: {e}");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Accepts TCP connections on the local capture port forever. Each accepted
|
|
||||||
/// connection becomes its own viewer-serving task with a private receiver.
|
|
||||||
async fn run_accept_loop(listener: TcpListener, tx: broadcast::Sender<Arc<Vec<u8>>>) {
|
|
||||||
loop {
|
|
||||||
let sock = match listener.accept().await {
|
|
||||||
Ok((s, _)) => s,
|
|
||||||
Err(e) => {
|
|
||||||
tracing::warn!("capture HTTP accept failed: {e}");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
let rx = tx.subscribe();
|
|
||||||
tokio::spawn(serve_one_viewer(sock, rx));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Drains the HTTP request, writes a fixed 200 OK, then pumps broadcast
|
|
||||||
/// chunks to the socket until the channel closes or the socket errors out.
|
|
||||||
/// On Lagged (slow consumer), skip ahead — MPEG-TS recovers at next keyframe.
|
|
||||||
async fn serve_one_viewer(mut sock: TcpStream, mut rx: broadcast::Receiver<Arc<Vec<u8>>>) {
|
|
||||||
if !drain_http_request(&mut sock).await {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
const RESPONSE: &[u8] = b"HTTP/1.1 200 OK\r\n\
|
|
||||||
Content-Type: video/mp2t\r\n\
|
|
||||||
Cache-Control: no-cache, no-store\r\n\
|
|
||||||
Connection: close\r\n\
|
|
||||||
\r\n";
|
|
||||||
if sock.write_all(RESPONSE).await.is_err() {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
loop {
|
|
||||||
match rx.recv().await {
|
|
||||||
Ok(chunk) => {
|
|
||||||
if sock.write_all(&chunk).await.is_err() {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(broadcast::error::RecvError::Lagged(skipped)) => {
|
|
||||||
tracing::warn!(
|
|
||||||
skipped,
|
|
||||||
"viewer fanout lagged — MPEG-TS will resync at next keyframe"
|
|
||||||
);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
Err(broadcast::error::RecvError::Closed) => return,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn drain_http_request(sock: &mut TcpStream) -> bool {
|
|
||||||
let mut buf = [0u8; 1024];
|
|
||||||
let mut total = Vec::with_capacity(512);
|
|
||||||
loop {
|
|
||||||
match sock.read(&mut buf).await {
|
|
||||||
Ok(0) => return false,
|
|
||||||
Ok(n) => total.extend_from_slice(&buf[..n]),
|
|
||||||
Err(_) => return false,
|
|
||||||
}
|
|
||||||
if total.windows(4).any(|w| w == b"\r\n\r\n") {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
if total.len() > 16 * 1024 {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn clear_cloexec(fd: &impl AsFd) -> Result<()> {
|
fn clear_cloexec(fd: &impl AsFd) -> Result<()> {
|
||||||
let flags_int = fcntl(fd.as_fd(), FcntlArg::F_GETFD).context("F_GETFD on pipewire fd")?;
|
let flags_int = fcntl(fd.as_fd(), FcntlArg::F_GETFD).context("F_GETFD on pipewire fd")?;
|
||||||
let mut flags = FdFlag::from_bits_truncate(flags_int);
|
let mut flags = FdFlag::from_bits_truncate(flags_int);
|
||||||
@@ -340,22 +208,6 @@ fn clear_cloexec(fd: &impl AsFd) -> Result<()> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Connect to the in-process capture HTTP listener, retrying until it's up or
|
|
||||||
/// we time out. Returns the connected socket — the listener accepts exactly
|
|
||||||
/// one connection (the bridge socket), so this stream IS the bridge socket.
|
|
||||||
pub async fn connect_to_capture(port: u16, max_wait: Duration) -> Result<TcpStream> {
|
|
||||||
let deadline = Instant::now() + max_wait;
|
|
||||||
loop {
|
|
||||||
match TcpStream::connect(("127.0.0.1", port)).await {
|
|
||||||
Ok(stream) => return Ok(stream),
|
|
||||||
Err(_) if Instant::now() < deadline => {
|
|
||||||
sleep(Duration::from_millis(50)).await;
|
|
||||||
}
|
|
||||||
Err(e) => bail!("capture HTTP listener never came up on 127.0.0.1:{port}: {e}"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn default_audio_monitor() -> Result<String> {
|
async fn default_audio_monitor() -> Result<String> {
|
||||||
let output = Command::new("pactl")
|
let output = Command::new("pactl")
|
||||||
.arg("get-default-sink")
|
.arg("get-default-sink")
|
||||||
|
|||||||
Reference in New Issue
Block a user