Working Wayland end-to-end: gst owns the pipeline, ffmpeg just serves
Moves the full capture+encode+mux pipeline into gst-launch, leaving
ffmpeg as a thin HTTP server. Verified end-to-end on KDE Plasma 6
Wayland: screencast portal → mpv mirror-tunnel rendering in real time.
Pipeline:
pipewiresrc(do-timestamp) → videoconvert → x264enc (zerolatency
ultrafast) → h264parse(config-interval=-1) → byte-stream caps →
mpegtsmux ← (aacparse ← avenc_aac ← audioconvert ← pulsesrc) →
fdsink fd=1
ffmpeg -fflags nobuffer+discardcorrupt+genpts -flags low_delay
-analyzeduration 0 -probesize 32 -f mpegts -i pipe:0 -c copy
-f mpegts -listen 1 http://127.0.0.1:N
Why each piece is load-bearing (do not relitigate without cause):
- x264enc + h264parse + byte-stream caps: raw video over a pipe hits
stride/format negotiation problems (green screens with mis-aligned
rows). Encoding inside gst sidesteps that entirely.
- mpegtsmux inside gst: H.264 Annex B carries no timestamps. Without
a container, ffmpeg sees "Timestamps are unset" and downstream
muxing breaks. mpegts in gst preserves pipewiresrc's clock.
- byte-stream + alignment=au caps: h264parse defaults to AVC format
(length-prefixed NALUs) for some downstreams; ffmpeg's mpegts
demuxer needs Annex B start codes.
- audio in gst (pulsesrc + avenc_aac): keeping ffmpeg as a pure
passthrough (`-c copy`) avoids ffmpeg's audio-input dependency
delaying HTTP serving until both inputs are ready.
- `-analyzeduration 0 -probesize 32`: stop ffmpeg from buffering 5MB
/ 5s of input before deciding it understands the stream.
- Also fixes a separate one-shot bug from earlier: the previous
health-probe in wait_for_listener consumed ffmpeg's single
`-listen 1` accept slot, so the actual bridge connect hit
Connection refused. Replaced with connect_to_capture which
returns the bridge socket.
Adds dep checks for pipewiresrc, x264enc, h264parse, mpegtsmux,
pulsesrc, avenc_aac, aacparse with per-distro install hints.
Known gap: VLC currently shows a green screen against the stream
even though mpv works fine. Likely VLC-specific demuxer/latency
settings, not a pipeline correctness issue — to investigate as a
follow-up. mpv is the recommended client either way.
This commit is contained in:
@@ -10,6 +10,12 @@ pub fn check_host_binaries(display: DisplayServer) -> Result<()> {
|
|||||||
require("gst-launch-1.0")?;
|
require("gst-launch-1.0")?;
|
||||||
require("gst-inspect-1.0")?;
|
require("gst-inspect-1.0")?;
|
||||||
require_gst_element("pipewiresrc")?;
|
require_gst_element("pipewiresrc")?;
|
||||||
|
require_gst_element("x264enc")?;
|
||||||
|
require_gst_element("h264parse")?;
|
||||||
|
require_gst_element("mpegtsmux")?;
|
||||||
|
require_gst_element("pulsesrc")?;
|
||||||
|
require_gst_element("avenc_aac")?;
|
||||||
|
require_gst_element("aacparse")?;
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@@ -72,6 +78,34 @@ fn install_hint_for_gst_element(name: &str) -> String {
|
|||||||
Some("opensuse" | "opensuse-tumbleweed" | "opensuse-leap") => "pipewire-gstreamer",
|
Some("opensuse" | "opensuse-tumbleweed" | "opensuse-leap") => "pipewire-gstreamer",
|
||||||
_ => "the GStreamer PipeWire plugin",
|
_ => "the GStreamer PipeWire plugin",
|
||||||
},
|
},
|
||||||
|
"x264enc" => match distro.as_deref() {
|
||||||
|
Some("arch" | "cachyos" | "manjaro" | "endeavouros") => "gst-plugins-ugly",
|
||||||
|
Some("debian" | "ubuntu" | "pop" | "linuxmint") => "gstreamer1.0-plugins-ugly",
|
||||||
|
Some("fedora" | "nobara") => "gstreamer1-plugins-ugly",
|
||||||
|
Some("opensuse" | "opensuse-tumbleweed" | "opensuse-leap") => "gstreamer-plugins-ugly",
|
||||||
|
_ => "the GStreamer x264 plugin",
|
||||||
|
},
|
||||||
|
"h264parse" | "mpegtsmux" | "aacparse" => match distro.as_deref() {
|
||||||
|
Some("arch" | "cachyos" | "manjaro" | "endeavouros") => "gst-plugins-bad",
|
||||||
|
Some("debian" | "ubuntu" | "pop" | "linuxmint") => "gstreamer1.0-plugins-bad",
|
||||||
|
Some("fedora" | "nobara") => "gstreamer1-plugins-bad-free",
|
||||||
|
Some("opensuse" | "opensuse-tumbleweed" | "opensuse-leap") => "gstreamer-plugins-bad",
|
||||||
|
_ => "the GStreamer plugins-bad set",
|
||||||
|
},
|
||||||
|
"pulsesrc" => match distro.as_deref() {
|
||||||
|
Some("arch" | "cachyos" | "manjaro" | "endeavouros") => "gst-plugins-good",
|
||||||
|
Some("debian" | "ubuntu" | "pop" | "linuxmint") => "gstreamer1.0-pulseaudio",
|
||||||
|
Some("fedora" | "nobara") => "gstreamer1-plugins-good",
|
||||||
|
Some("opensuse" | "opensuse-tumbleweed" | "opensuse-leap") => "gstreamer-plugins-good",
|
||||||
|
_ => "the GStreamer PulseAudio plugin",
|
||||||
|
},
|
||||||
|
"avenc_aac" => match distro.as_deref() {
|
||||||
|
Some("arch" | "cachyos" | "manjaro" | "endeavouros") => "gst-libav",
|
||||||
|
Some("debian" | "ubuntu" | "pop" | "linuxmint") => "gstreamer1.0-libav",
|
||||||
|
Some("fedora" | "nobara") => "gstreamer1-libav",
|
||||||
|
Some("opensuse" | "opensuse-tumbleweed" | "opensuse-leap") => "gstreamer-libav",
|
||||||
|
_ => "the GStreamer libav (avenc) plugin",
|
||||||
|
},
|
||||||
_ => name,
|
_ => name,
|
||||||
};
|
};
|
||||||
install_command(&distro, pkg)
|
install_command(&distro, pkg)
|
||||||
|
|||||||
+1
-3
@@ -5,7 +5,6 @@ use anyhow::{Result, bail};
|
|||||||
use iroh::Endpoint;
|
use iroh::Endpoint;
|
||||||
use iroh::endpoint::{Connection, presets};
|
use iroh::endpoint::{Connection, presets};
|
||||||
use iroh_tickets::endpoint::EndpointTicket;
|
use iroh_tickets::endpoint::EndpointTicket;
|
||||||
use tokio::net::TcpStream;
|
|
||||||
use tokio_util::sync::CancellationToken;
|
use tokio_util::sync::CancellationToken;
|
||||||
|
|
||||||
use crate::cli::HostOpts;
|
use crate::cli::HostOpts;
|
||||||
@@ -73,8 +72,7 @@ async fn handle_peer(
|
|||||||
|
|
||||||
let capture_handle = capture::spawn(display, opts).await?;
|
let capture_handle = capture::spawn(display, opts).await?;
|
||||||
let port = capture_handle.local_port();
|
let port = capture_handle.local_port();
|
||||||
wayland::wait_for_listener(port, std::time::Duration::from_secs(5)).await?;
|
let tcp = wayland::connect_to_capture(port, std::time::Duration::from_secs(5)).await?;
|
||||||
let tcp = TcpStream::connect(("127.0.0.1", port)).await?;
|
|
||||||
|
|
||||||
let bridge = crate::common::tunnel::bridge(quic_send, quic_recv, tcp);
|
let bridge = crate::common::tunnel::bridge(quic_send, quic_recv, tcp);
|
||||||
|
|
||||||
|
|||||||
+82
-60
@@ -98,6 +98,7 @@ pub async fn start(opts: &HostOpts) -> Result<CaptureHandle> {
|
|||||||
.context("portal returned a stream with no size — pipewiresrc can't infer dimensions")?;
|
.context("portal returned a stream with no size — pipewiresrc can't infer dimensions")?;
|
||||||
|
|
||||||
let pw_fd: OwnedFd = proxy.open_pipe_wire_remote(&session).await?;
|
let pw_fd: OwnedFd = proxy.open_pipe_wire_remote(&session).await?;
|
||||||
|
tracing::info!(node_id, width = w, height = h, "portal handshake complete");
|
||||||
// The fd is CLOEXEC by default; the gst child needs to inherit it across
|
// The fd is CLOEXEC by default; the gst child needs to inherit it across
|
||||||
// exec. We then leak it via into_raw_fd so its lifetime spans the spawn,
|
// exec. We then leak it via into_raw_fd so its lifetime spans the spawn,
|
||||||
// and close the parent's copy once gst is running.
|
// and close the parent's copy once gst is running.
|
||||||
@@ -107,28 +108,76 @@ pub async fn start(opts: &HostOpts) -> Result<CaptureHandle> {
|
|||||||
// 2. Reserve a localhost port for ffmpeg's HTTP listener.
|
// 2. Reserve a localhost port for ffmpeg's HTTP listener.
|
||||||
let port = pick_random_port()?;
|
let port = pick_random_port()?;
|
||||||
|
|
||||||
// 3. Spawn gst-launch → raw NV12 on stdout.
|
// 3. Spawn gst-launch with the full pipeline: video AND audio captured,
|
||||||
let mut gst = Command::new("gst-launch-1.0")
|
// encoded, and muxed into MPEG-TS inside gst. ffmpeg becomes a dumb
|
||||||
|
// pass-through HTTP server (`-c copy`), which avoids ffmpeg's input
|
||||||
|
// analysis stalls and timestamp-generation guesswork.
|
||||||
|
let key_interval = (opts.framerate * 2).to_string();
|
||||||
|
let bitrate = opts.bitrate.to_string();
|
||||||
|
let mut gst_cmd = Command::new("gst-launch-1.0");
|
||||||
|
gst_cmd
|
||||||
.args([
|
.args([
|
||||||
"-q",
|
// muxer + sink
|
||||||
"pipewiresrc",
|
"mpegtsmux",
|
||||||
&format!("fd={raw_fd}"),
|
"name=mux",
|
||||||
&format!("path={node_id}"),
|
|
||||||
"!",
|
"!",
|
||||||
"videoconvert",
|
"queue",
|
||||||
"!",
|
|
||||||
"video/x-raw,format=NV12",
|
|
||||||
"!",
|
"!",
|
||||||
"fdsink",
|
"fdsink",
|
||||||
"fd=1",
|
"fd=1",
|
||||||
|
// video branch
|
||||||
|
"pipewiresrc",
|
||||||
|
&format!("fd={raw_fd}"),
|
||||||
|
&format!("path={node_id}"),
|
||||||
|
"do-timestamp=true",
|
||||||
|
"!",
|
||||||
|
"queue",
|
||||||
|
"!",
|
||||||
|
"videoconvert",
|
||||||
|
"!",
|
||||||
|
"video/x-raw,format=I420",
|
||||||
|
"!",
|
||||||
|
"x264enc",
|
||||||
|
"tune=zerolatency",
|
||||||
|
"speed-preset=ultrafast",
|
||||||
|
&format!("bitrate={bitrate}"),
|
||||||
|
&format!("key-int-max={key_interval}"),
|
||||||
|
"!",
|
||||||
|
"video/x-h264,profile=baseline",
|
||||||
|
"!",
|
||||||
|
"h264parse",
|
||||||
|
"config-interval=-1",
|
||||||
|
"!",
|
||||||
|
"video/x-h264,stream-format=byte-stream,alignment=au",
|
||||||
|
"!",
|
||||||
|
"mux.",
|
||||||
|
// audio branch
|
||||||
|
"pulsesrc",
|
||||||
|
"do-timestamp=true",
|
||||||
|
"!",
|
||||||
|
"queue",
|
||||||
|
"!",
|
||||||
|
"audioconvert",
|
||||||
|
"!",
|
||||||
|
"audioresample",
|
||||||
|
"!",
|
||||||
|
"audio/x-raw,rate=48000,channels=2",
|
||||||
|
"!",
|
||||||
|
"avenc_aac",
|
||||||
|
"bitrate=128000",
|
||||||
|
"!",
|
||||||
|
"aacparse",
|
||||||
|
"!",
|
||||||
|
"mux.",
|
||||||
])
|
])
|
||||||
.stdin(Stdio::null())
|
.stdin(Stdio::null())
|
||||||
.stdout(Stdio::piped())
|
.stdout(Stdio::piped())
|
||||||
.stderr(Stdio::inherit())
|
.stderr(Stdio::inherit());
|
||||||
.spawn()
|
if std::env::var_os("PIXELPASS_GST_DEBUG").is_some() {
|
||||||
.context("failed to spawn gst-launch-1.0")?;
|
gst_cmd.env("GST_DEBUG", "3");
|
||||||
|
}
|
||||||
|
let mut gst = gst_cmd.spawn().context("failed to spawn gst-launch-1.0")?;
|
||||||
// Parent no longer needs the pipewire fd — gst inherited its own copy.
|
// Parent no longer needs the pipewire fd — gst inherited its own copy.
|
||||||
// Ignore close errors; the worst case is a leaked fd until our exit.
|
|
||||||
let _ = close(raw_fd);
|
let _ = close(raw_fd);
|
||||||
|
|
||||||
let gst_stdout = gst
|
let gst_stdout = gst
|
||||||
@@ -139,12 +188,11 @@ pub async fn start(opts: &HostOpts) -> Result<CaptureHandle> {
|
|||||||
.try_into()
|
.try_into()
|
||||||
.context("could not convert gst stdout into ffmpeg stdin")?;
|
.context("could not convert gst stdout into ffmpeg stdin")?;
|
||||||
|
|
||||||
// 4. Spawn ffmpeg consuming gst stdout, producing MPEG-TS HTTP.
|
// 4. ffmpeg: re-mux pre-encoded H.264 + add pulse audio → MPEG-TS HTTP.
|
||||||
|
// Width/height/framerate are embedded in the h264 stream; ffmpeg
|
||||||
|
// doesn't need our portal-reported dimensions.
|
||||||
let url = format!("http://127.0.0.1:{port}");
|
let url = format!("http://127.0.0.1:{port}");
|
||||||
let bitrate_arg = format!("{}k", opts.bitrate);
|
let _ = (w, h);
|
||||||
let video_size = format!("{w}x{h}");
|
|
||||||
let framerate = opts.framerate.to_string();
|
|
||||||
let gop = opts.framerate.to_string();
|
|
||||||
|
|
||||||
let ffmpeg = Command::new("ffmpeg")
|
let ffmpeg = Command::new("ffmpeg")
|
||||||
.stdin(ffmpeg_stdin)
|
.stdin(ffmpeg_stdin)
|
||||||
@@ -153,44 +201,20 @@ pub async fn start(opts: &HostOpts) -> Result<CaptureHandle> {
|
|||||||
.args([
|
.args([
|
||||||
"-loglevel",
|
"-loglevel",
|
||||||
"warning",
|
"warning",
|
||||||
"-f",
|
|
||||||
"rawvideo",
|
|
||||||
"-pix_fmt",
|
|
||||||
"nv12",
|
|
||||||
"-video_size",
|
|
||||||
&video_size,
|
|
||||||
"-framerate",
|
|
||||||
&framerate,
|
|
||||||
"-i",
|
|
||||||
"pipe:0",
|
|
||||||
"-f",
|
|
||||||
"pulse",
|
|
||||||
"-i",
|
|
||||||
"default",
|
|
||||||
"-c:v",
|
|
||||||
"libx264",
|
|
||||||
"-preset",
|
|
||||||
"ultrafast",
|
|
||||||
"-tune",
|
|
||||||
"zerolatency",
|
|
||||||
"-bf",
|
|
||||||
"0",
|
|
||||||
"-g",
|
|
||||||
&gop,
|
|
||||||
"-b:v",
|
|
||||||
&bitrate_arg,
|
|
||||||
"-maxrate",
|
|
||||||
&bitrate_arg,
|
|
||||||
"-bufsize",
|
|
||||||
&bitrate_arg,
|
|
||||||
"-c:a",
|
|
||||||
"aac",
|
|
||||||
"-b:a",
|
|
||||||
"128k",
|
|
||||||
"-fflags",
|
"-fflags",
|
||||||
"nobuffer",
|
"nobuffer+discardcorrupt+genpts",
|
||||||
"-flags",
|
"-flags",
|
||||||
"low_delay",
|
"low_delay",
|
||||||
|
"-analyzeduration",
|
||||||
|
"0",
|
||||||
|
"-probesize",
|
||||||
|
"32",
|
||||||
|
"-f",
|
||||||
|
"mpegts",
|
||||||
|
"-i",
|
||||||
|
"pipe:0",
|
||||||
|
"-c",
|
||||||
|
"copy",
|
||||||
"-f",
|
"-f",
|
||||||
"mpegts",
|
"mpegts",
|
||||||
"-listen",
|
"-listen",
|
||||||
@@ -223,16 +247,14 @@ fn clear_cloexec(fd: &impl AsFd) -> Result<()> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Wait until ffmpeg's `-listen 1` server actually accepts a TCP connection,
|
/// Connect to ffmpeg's `-listen 1` HTTP listener, retrying until it's up or we
|
||||||
/// or time out. Returns Ok(()) on success.
|
/// time out. Returns the connected socket — `-listen 1` is a one-shot listener
|
||||||
pub async fn wait_for_listener(port: u16, max_wait: Duration) -> Result<()> {
|
/// so this stream IS the bridge socket; don't probe and discard.
|
||||||
|
pub async fn connect_to_capture(port: u16, max_wait: Duration) -> Result<tokio::net::TcpStream> {
|
||||||
let deadline = Instant::now() + max_wait;
|
let deadline = Instant::now() + max_wait;
|
||||||
loop {
|
loop {
|
||||||
match tokio::net::TcpStream::connect(("127.0.0.1", port)).await {
|
match tokio::net::TcpStream::connect(("127.0.0.1", port)).await {
|
||||||
Ok(stream) => {
|
Ok(stream) => return Ok(stream),
|
||||||
drop(stream);
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
Err(_) if Instant::now() < deadline => {
|
Err(_) if Instant::now() < deadline => {
|
||||||
sleep(Duration::from_millis(50)).await;
|
sleep(Duration::from_millis(50)).await;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user