diff --git a/src/common/deps.rs b/src/common/deps.rs index 8bc7f00..77be2a4 100644 --- a/src/common/deps.rs +++ b/src/common/deps.rs @@ -10,6 +10,12 @@ pub fn check_host_binaries(display: DisplayServer) -> Result<()> { require("gst-launch-1.0")?; require("gst-inspect-1.0")?; require_gst_element("pipewiresrc")?; + require_gst_element("x264enc")?; + require_gst_element("h264parse")?; + require_gst_element("mpegtsmux")?; + require_gst_element("pulsesrc")?; + require_gst_element("avenc_aac")?; + require_gst_element("aacparse")?; } Ok(()) } @@ -72,6 +78,34 @@ fn install_hint_for_gst_element(name: &str) -> String { Some("opensuse" | "opensuse-tumbleweed" | "opensuse-leap") => "pipewire-gstreamer", _ => "the GStreamer PipeWire plugin", }, + "x264enc" => match distro.as_deref() { + Some("arch" | "cachyos" | "manjaro" | "endeavouros") => "gst-plugins-ugly", + Some("debian" | "ubuntu" | "pop" | "linuxmint") => "gstreamer1.0-plugins-ugly", + Some("fedora" | "nobara") => "gstreamer1-plugins-ugly", + Some("opensuse" | "opensuse-tumbleweed" | "opensuse-leap") => "gstreamer-plugins-ugly", + _ => "the GStreamer x264 plugin", + }, + "h264parse" | "mpegtsmux" | "aacparse" => match distro.as_deref() { + Some("arch" | "cachyos" | "manjaro" | "endeavouros") => "gst-plugins-bad", + Some("debian" | "ubuntu" | "pop" | "linuxmint") => "gstreamer1.0-plugins-bad", + Some("fedora" | "nobara") => "gstreamer1-plugins-bad-free", + Some("opensuse" | "opensuse-tumbleweed" | "opensuse-leap") => "gstreamer-plugins-bad", + _ => "the GStreamer plugins-bad set", + }, + "pulsesrc" => match distro.as_deref() { + Some("arch" | "cachyos" | "manjaro" | "endeavouros") => "gst-plugins-good", + Some("debian" | "ubuntu" | "pop" | "linuxmint") => "gstreamer1.0-pulseaudio", + Some("fedora" | "nobara") => "gstreamer1-plugins-good", + Some("opensuse" | "opensuse-tumbleweed" | "opensuse-leap") => "gstreamer-plugins-good", + _ => "the GStreamer PulseAudio plugin", + }, + "avenc_aac" => match distro.as_deref() { + Some("arch" | "cachyos" | "manjaro" | "endeavouros") => "gst-libav", + Some("debian" | "ubuntu" | "pop" | "linuxmint") => "gstreamer1.0-libav", + Some("fedora" | "nobara") => "gstreamer1-libav", + Some("opensuse" | "opensuse-tumbleweed" | "opensuse-leap") => "gstreamer-libav", + _ => "the GStreamer libav (avenc) plugin", + }, _ => name, }; install_command(&distro, pkg) diff --git a/src/host/mod.rs b/src/host/mod.rs index 6deb2bd..25d776b 100644 --- a/src/host/mod.rs +++ b/src/host/mod.rs @@ -5,7 +5,6 @@ use anyhow::{Result, bail}; use iroh::Endpoint; use iroh::endpoint::{Connection, presets}; use iroh_tickets::endpoint::EndpointTicket; -use tokio::net::TcpStream; use tokio_util::sync::CancellationToken; use crate::cli::HostOpts; @@ -73,8 +72,7 @@ async fn handle_peer( let capture_handle = capture::spawn(display, opts).await?; let port = capture_handle.local_port(); - wayland::wait_for_listener(port, std::time::Duration::from_secs(5)).await?; - let tcp = TcpStream::connect(("127.0.0.1", port)).await?; + let tcp = wayland::connect_to_capture(port, std::time::Duration::from_secs(5)).await?; let bridge = crate::common::tunnel::bridge(quic_send, quic_recv, tcp); diff --git a/src/host/wayland.rs b/src/host/wayland.rs index 4bf100e..abe7a1c 100644 --- a/src/host/wayland.rs +++ b/src/host/wayland.rs @@ -98,6 +98,7 @@ pub async fn start(opts: &HostOpts) -> Result { .context("portal returned a stream with no size — pipewiresrc can't infer dimensions")?; let pw_fd: OwnedFd = proxy.open_pipe_wire_remote(&session).await?; + tracing::info!(node_id, width = w, height = h, "portal handshake complete"); // The fd is CLOEXEC by default; the gst child needs to inherit it across // exec. We then leak it via into_raw_fd so its lifetime spans the spawn, // and close the parent's copy once gst is running. @@ -107,28 +108,76 @@ pub async fn start(opts: &HostOpts) -> Result { // 2. Reserve a localhost port for ffmpeg's HTTP listener. let port = pick_random_port()?; - // 3. Spawn gst-launch → raw NV12 on stdout. - let mut gst = Command::new("gst-launch-1.0") + // 3. Spawn gst-launch with the full pipeline: video AND audio captured, + // encoded, and muxed into MPEG-TS inside gst. ffmpeg becomes a dumb + // pass-through HTTP server (`-c copy`), which avoids ffmpeg's input + // analysis stalls and timestamp-generation guesswork. + let key_interval = (opts.framerate * 2).to_string(); + let bitrate = opts.bitrate.to_string(); + let mut gst_cmd = Command::new("gst-launch-1.0"); + gst_cmd .args([ - "-q", - "pipewiresrc", - &format!("fd={raw_fd}"), - &format!("path={node_id}"), + // muxer + sink + "mpegtsmux", + "name=mux", "!", - "videoconvert", - "!", - "video/x-raw,format=NV12", + "queue", "!", "fdsink", "fd=1", + // video branch + "pipewiresrc", + &format!("fd={raw_fd}"), + &format!("path={node_id}"), + "do-timestamp=true", + "!", + "queue", + "!", + "videoconvert", + "!", + "video/x-raw,format=I420", + "!", + "x264enc", + "tune=zerolatency", + "speed-preset=ultrafast", + &format!("bitrate={bitrate}"), + &format!("key-int-max={key_interval}"), + "!", + "video/x-h264,profile=baseline", + "!", + "h264parse", + "config-interval=-1", + "!", + "video/x-h264,stream-format=byte-stream,alignment=au", + "!", + "mux.", + // audio branch + "pulsesrc", + "do-timestamp=true", + "!", + "queue", + "!", + "audioconvert", + "!", + "audioresample", + "!", + "audio/x-raw,rate=48000,channels=2", + "!", + "avenc_aac", + "bitrate=128000", + "!", + "aacparse", + "!", + "mux.", ]) .stdin(Stdio::null()) .stdout(Stdio::piped()) - .stderr(Stdio::inherit()) - .spawn() - .context("failed to spawn gst-launch-1.0")?; + .stderr(Stdio::inherit()); + if std::env::var_os("PIXELPASS_GST_DEBUG").is_some() { + gst_cmd.env("GST_DEBUG", "3"); + } + let mut gst = gst_cmd.spawn().context("failed to spawn gst-launch-1.0")?; // Parent no longer needs the pipewire fd — gst inherited its own copy. - // Ignore close errors; the worst case is a leaked fd until our exit. let _ = close(raw_fd); let gst_stdout = gst @@ -139,12 +188,11 @@ pub async fn start(opts: &HostOpts) -> Result { .try_into() .context("could not convert gst stdout into ffmpeg stdin")?; - // 4. Spawn ffmpeg consuming gst stdout, producing MPEG-TS HTTP. + // 4. ffmpeg: re-mux pre-encoded H.264 + add pulse audio → MPEG-TS HTTP. + // Width/height/framerate are embedded in the h264 stream; ffmpeg + // doesn't need our portal-reported dimensions. let url = format!("http://127.0.0.1:{port}"); - let bitrate_arg = format!("{}k", opts.bitrate); - let video_size = format!("{w}x{h}"); - let framerate = opts.framerate.to_string(); - let gop = opts.framerate.to_string(); + let _ = (w, h); let ffmpeg = Command::new("ffmpeg") .stdin(ffmpeg_stdin) @@ -153,44 +201,20 @@ pub async fn start(opts: &HostOpts) -> Result { .args([ "-loglevel", "warning", - "-f", - "rawvideo", - "-pix_fmt", - "nv12", - "-video_size", - &video_size, - "-framerate", - &framerate, - "-i", - "pipe:0", - "-f", - "pulse", - "-i", - "default", - "-c:v", - "libx264", - "-preset", - "ultrafast", - "-tune", - "zerolatency", - "-bf", - "0", - "-g", - &gop, - "-b:v", - &bitrate_arg, - "-maxrate", - &bitrate_arg, - "-bufsize", - &bitrate_arg, - "-c:a", - "aac", - "-b:a", - "128k", "-fflags", - "nobuffer", + "nobuffer+discardcorrupt+genpts", "-flags", "low_delay", + "-analyzeduration", + "0", + "-probesize", + "32", + "-f", + "mpegts", + "-i", + "pipe:0", + "-c", + "copy", "-f", "mpegts", "-listen", @@ -223,16 +247,14 @@ fn clear_cloexec(fd: &impl AsFd) -> Result<()> { Ok(()) } -/// Wait until ffmpeg's `-listen 1` server actually accepts a TCP connection, -/// or time out. Returns Ok(()) on success. -pub async fn wait_for_listener(port: u16, max_wait: Duration) -> Result<()> { +/// Connect to ffmpeg's `-listen 1` HTTP listener, retrying until it's up or we +/// time out. Returns the connected socket — `-listen 1` is a one-shot listener +/// so this stream IS the bridge socket; don't probe and discard. +pub async fn connect_to_capture(port: u16, max_wait: Duration) -> Result { let deadline = Instant::now() + max_wait; loop { match tokio::net::TcpStream::connect(("127.0.0.1", port)).await { - Ok(stream) => { - drop(stream); - return Ok(()); - } + Ok(stream) => return Ok(stream), Err(_) if Instant::now() < deadline => { sleep(Duration::from_millis(50)).await; }