pw_utils: Switch to using raw pw_buffers

We're gonna need to store these in the future, and the lifetime on the pw-rs
Buffer prevents us from easily doing that. Besides, we'll need access to
metadata which pw-rs doesn't expose yet.
This commit is contained in:
Ivan Molodetskikh
2025-08-02 23:05:05 +03:00
parent 2e3935d77d
commit 8fba696d8e

View File

@@ -4,6 +4,7 @@ use std::io::Cursor;
use std::iter::zip;
use std::mem;
use std::os::fd::{AsFd, AsRawFd, BorrowedFd};
use std::ptr::NonNull;
use std::rc::Rc;
use std::time::Duration;
@@ -28,6 +29,7 @@ use pipewire::spa::utils::{
};
use pipewire::spa::{self};
use pipewire::stream::{Stream, StreamFlags, StreamListener, StreamState};
use pipewire::sys::{pw_buffer, pw_stream_queue_buffer};
use smithay::backend::allocator::dmabuf::{AsDmabuf, Dmabuf};
use smithay::backend::allocator::format::FormatSet;
use smithay::backend::allocator::gbm::{GbmBuffer, GbmBufferFlags, GbmDevice};
@@ -838,6 +840,10 @@ impl Cast {
}
}
fn dequeue_available_buffer(&mut self) -> Option<NonNull<pw_buffer>> {
unsafe { NonNull::new(self.stream.dequeue_raw_buffer()) }
}
pub fn dequeue_buffer_and_render(
&mut self,
renderer: &mut GlesRenderer,
@@ -868,57 +874,70 @@ impl Cast {
trace!("no damage, skipping frame");
return false;
}
drop(inner);
let Some(mut buffer) = self.stream.dequeue_buffer() else {
let Some(pw_buffer) = self.dequeue_available_buffer() else {
warn!("no available buffer in pw stream, skipping frame");
return false;
};
let buffer = pw_buffer.as_ptr();
let fd = buffer.datas_mut()[0].as_raw().fd;
let dmabuf = &inner.dmabufs[&fd];
let inner = self.inner.borrow_mut();
unsafe {
let spa_buffer = (*buffer).buffer;
match render_to_dmabuf(
renderer,
dmabuf.clone(),
size,
scale,
Transform::Normal,
elements.iter().rev(),
) {
Ok(sync_point) => {
// FIXME: implement PipeWire explicit sync, and at the very least async wait.
if wait_for_sync {
let _span = tracy_client::span!("wait for completion");
if let Err(err) = sync_point.wait() {
warn!("error waiting for pw frame completion: {err:?}");
let fd = (*(*spa_buffer).datas).fd;
let dmabuf = &inner.dmabufs[&fd];
match render_to_dmabuf(
renderer,
dmabuf.clone(),
size,
scale,
Transform::Normal,
elements.iter().rev(),
) {
Ok(sync_point) => {
// FIXME: implement PipeWire explicit sync, and at the very least async wait.
if wait_for_sync {
let _span = tracy_client::span!("wait for completion");
if let Err(err) = sync_point.wait() {
warn!("error waiting for pw frame completion: {err:?}");
}
}
}
Err(err) => {
warn!("error rendering to dmabuf: {err:?}");
return_unused_buffer(&self.stream, pw_buffer);
return false;
}
}
Err(err) => {
warn!("error rendering to dmabuf: {err:?}");
return false;
for (i, (stride, offset)) in zip(dmabuf.strides(), dmabuf.offsets()).enumerate() {
let spa_data = (*spa_buffer).datas.add(i);
let chunk = (*spa_data).chunk;
// With DMA-BUFs, consumers should ignore the size field, and producers are allowed
// to set it to 0.
//
// https://docs.pipewire.org/page_dma_buf.html
//
// However, OBS checks for size != 0 as a workaround for old compositor versions,
// so we set it to 1.
(*chunk).size = 1;
// Clear the corrupted flag we may have set before.
(*chunk).flags = SPA_CHUNK_FLAG_NONE as i32;
(*chunk).stride = stride as i32;
(*chunk).offset = offset;
trace!(
"pw buffer: fd = {}, stride = {stride}, offset = {offset}",
(*spa_data).fd
);
}
}
for (data, (stride, offset)) in
zip(buffer.datas_mut(), zip(dmabuf.strides(), dmabuf.offsets()))
{
let chunk = data.chunk_mut();
// With DMA-BUFs, consumers should ignore the size field, and producers are allowed to
// set it to 0.
//
// https://docs.pipewire.org/page_dma_buf.html
//
// However, OBS checks for size != 0 as a workaround for old compositor versions,
// so we set it to 1.
*chunk.size_mut() = 1;
*chunk.stride_mut() = stride as i32;
*chunk.offset_mut() = offset;
trace!(
"pw buffer: fd = {}, stride = {stride}, offset = {offset}",
data.as_raw().fd
);
pw_stream_queue_buffer(self.stream.as_raw_ptr(), buffer);
}
true
@@ -935,50 +954,63 @@ impl Cast {
if let CastState::Ready { damage_tracker, .. } = &mut inner.state {
*damage_tracker = None;
};
drop(inner);
let Some(mut buffer) = self.stream.dequeue_buffer() else {
warn!("no available buffer in pw stream, skipping clear");
let Some(pw_buffer) = self.dequeue_available_buffer() else {
warn!("no available buffer in pw stream, skipping frame");
return false;
};
let buffer = pw_buffer.as_ptr();
let fd = buffer.datas_mut()[0].as_raw().fd;
let dmabuf = &inner.dmabufs[&fd];
let inner = self.inner.borrow_mut();
unsafe {
let spa_buffer = (*buffer).buffer;
match clear_dmabuf(renderer, dmabuf.clone()) {
Ok(sync_point) => {
// FIXME: implement PipeWire explicit sync, and at the very least async wait.
if wait_for_sync {
let _span = tracy_client::span!("wait for completion");
if let Err(err) = sync_point.wait() {
warn!("error waiting for pw frame completion: {err:?}");
let fd = (*(*spa_buffer).datas).fd;
let dmabuf = &inner.dmabufs[&fd];
match clear_dmabuf(renderer, dmabuf.clone()) {
Ok(sync_point) => {
// FIXME: implement PipeWire explicit sync, and at the very least async wait.
if wait_for_sync {
let _span = tracy_client::span!("wait for completion");
if let Err(err) = sync_point.wait() {
warn!("error waiting for pw frame completion: {err:?}");
}
}
}
Err(err) => {
warn!("error clearing dmabuf: {err:?}");
return_unused_buffer(&self.stream, pw_buffer);
return false;
}
}
Err(err) => {
warn!("error clearing dmabuf: {err:?}");
return false;
for (i, (stride, offset)) in zip(dmabuf.strides(), dmabuf.offsets()).enumerate() {
let spa_data = (*spa_buffer).datas.add(i);
let chunk = (*spa_data).chunk;
// With DMA-BUFs, consumers should ignore the size field, and producers are allowed
// to set it to 0.
//
// https://docs.pipewire.org/page_dma_buf.html
//
// However, OBS checks for size != 0 as a workaround for old compositor versions,
// so we set it to 1.
(*chunk).size = 1;
// Clear the corrupted flag we may have set before.
(*chunk).flags = SPA_CHUNK_FLAG_NONE as i32;
(*chunk).stride = stride as i32;
(*chunk).offset = offset;
trace!(
"pw buffer: fd = {}, stride = {stride}, offset = {offset}",
(*spa_data).fd
);
}
}
for (data, (stride, offset)) in
zip(buffer.datas_mut(), zip(dmabuf.strides(), dmabuf.offsets()))
{
let chunk = data.chunk_mut();
// With DMA-BUFs, consumers should ignore the size field, and producers are allowed to
// set it to 0.
//
// https://docs.pipewire.org/page_dma_buf.html
//
// However, OBS checks for size != 0 as a workaround for old compositor versions,
// so we set it to 1.
*chunk.size_mut() = 1;
*chunk.stride_mut() = stride as i32;
*chunk.offset_mut() = offset;
trace!(
"pw buffer: fd = {}, stride = {stride}, offset = {offset}",
data.as_raw().fd
);
pw_stream_queue_buffer(self.stream.as_raw_ptr(), buffer);
}
true
@@ -1151,3 +1183,15 @@ fn allocate_dmabuf(
.context("error exporting GBM buffer object as dmabuf")?;
Ok(dmabuf)
}
unsafe fn return_unused_buffer(stream: &Stream, pw_buffer: NonNull<pw_buffer>) {
// pw_stream_return_buffer() requires too new PipeWire (1.4.0). So, mark as
// corrupted and queue.
let pw_buffer = pw_buffer.as_ptr();
let spa_buffer = (*pw_buffer).buffer;
let chunk = (*(*spa_buffer).datas).chunk;
// Some (older?) consumers will check for size == 0 instead of the CORRUPTED flag.
(*chunk).size = 0;
(*chunk).flags = SPA_CHUNK_FLAG_CORRUPTED as i32;
pw_stream_queue_buffer(stream.as_raw_ptr(), pw_buffer);
}