pw_utils: Implement explicit sync

Largely following the Mutter implementation:
https://gitlab.gnome.org/GNOME/mutter/-/merge_requests/3876
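
In short: when the consumer supports it, each PipeWire buffer carries two extra SPA_DATA_SyncObj datas (both referring to one DRM timeline syncobj fd) plus a spa_meta_sync_timeline meta with acquire/release points. A rough sketch of the handshake from the consumer's side, for context (wait_point/signal_point are placeholder names, not real drm-rs calls; this commit implements the producer side):

// Consumer side of the explicit-sync handshake (pseudocode):
// wait_point(syncobj, meta.acquire_point);   // block until the frame is rendered
// read_frame(dmabuf);
// signal_point(syncobj, meta.release_point); // hand the buffer back to the producer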
Author: Ivan Molodetskikh
Date: 2025-07-18 23:07:22 +03:00
Parent: 62b8b11909
Commit: ce76877b04

@@ -1,9 +1,10 @@
use std::cell::{Cell, RefCell};
use std::collections::HashMap;
use std::io::Cursor;
use std::io::{self, Cursor};
use std::iter::zip;
use std::mem;
use std::os::fd::{AsFd, AsRawFd, BorrowedFd};
use std::os::fd::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
use std::ptr::NonNull;
use std::rc::Rc;
use std::time::Duration;
@@ -28,6 +29,7 @@ use pipewire::spa::utils::{
};
use pipewire::spa::{self};
use pipewire::stream::{Stream, StreamFlags, StreamListener, StreamState};
use pipewire::sys::{pw_buffer, pw_stream_queue_buffer};
use smithay::backend::allocator::dmabuf::{AsDmabuf, Dmabuf};
use smithay::backend::allocator::format::FormatSet;
use smithay::backend::allocator::gbm::{GbmBuffer, GbmBufferFlags, GbmDevice};
@@ -36,9 +38,11 @@ use smithay::backend::drm::DrmDeviceFd;
use smithay::backend::renderer::damage::OutputDamageTracker;
use smithay::backend::renderer::element::RenderElement;
use smithay::backend::renderer::gles::GlesRenderer;
use smithay::backend::renderer::sync::SyncPoint;
use smithay::output::{Output, OutputModeSource};
use smithay::reexports::calloop::generic::Generic;
use smithay::reexports::calloop::{Interest, LoopHandle, Mode, PostAction};
use smithay::reexports::drm::control::{syncobj, Device as _};
use smithay::reexports::gbm::Modifier;
use smithay::utils::{Physical, Scale, Size, Transform};
use zbus::object_server::SignalEmitter;
@@ -51,6 +55,47 @@ use crate::utils::get_monotonic_time;
// Give a 0.1 ms allowance for presentation time errors.
const CAST_DELAY_ALLOWANCE: Duration = Duration::from_micros(100);
// Added in PipeWire 1.2.0.
#[allow(non_upper_case_globals)]
const SPA_META_SyncTimeline: spa_meta_type = 9;
#[allow(non_upper_case_globals)]
const SPA_PARAM_BUFFERS_metaType: spa_param_buffers = 7;
#[allow(non_upper_case_globals)]
const SPA_DATA_SyncObj: spa_data_type = 5;
#[allow(non_camel_case_types)]
#[repr(C)]
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
struct spa_meta_sync_timeline {
pub flags: u32,
pub padding: u32,
pub acquire_point: u64,
pub release_point: u64,
}
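This mirrors PipeWire's C struct spa_meta_sync_timeline, added in 1.2.0. Since the struct is declared by hand rather than generated from the C headers, a layout assertion like the following (not part of the commit) can guard against drift:

// Two u32s plus two u64s, repr(C): 24 bytes, matching the C definition.
const _: () = assert!(std::mem::size_of::<spa_meta_sync_timeline>() == 24);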
/// A map of syncobj fd => handle for proper Drop.
struct SyncobjMap {
gbm: GbmDevice<DrmDeviceFd>,
map: HashMap<RawFd, syncobj::Handle>,
}
impl Drop for SyncobjMap {
fn drop(&mut self) {
if !self.map.is_empty() {
debug!("dropping syncobjs on an abruptly stopped cast");
for (fd, syncobj) in self.map.drain() {
unsafe {
if let Err(err) = self.gbm.destroy_syncobj(syncobj) {
warn!("error destroying syncobj: {err:?}");
}
drop(OwnedFd::from_raw_fd(fd));
}
}
}
}
}
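The map keys are raw fds, so ownership is by convention: maybe_create_syncobj() below leaks each fd into the key via into_raw_fd(), and this Drop (like maybe_remove_syncobj()) re-wraps it with OwnedFd::from_raw_fd() so it is closed exactly once. The convention in isolation, as a hypothetical helper the commit inlines:

// Hypothetical: insert_owned() is not in the commit; it shows the fd handoff.
fn insert_owned(map: &mut HashMap<RawFd, syncobj::Handle>, fd: OwnedFd, handle: syncobj::Handle) {
    // The map takes ownership of the fd here; dropping the OwnedFd early
    // would close it while PipeWire still refers to it.
    map.insert(fd.into_raw_fd(), handle);
}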
pub struct PipeWire {
_context: Context,
pub core: Core,
@@ -80,6 +125,11 @@ pub struct Cast {
pub last_frame_time: Duration,
min_time_between_frames: Rc<Cell<Duration>>,
dmabufs: Rc<RefCell<HashMap<i64, Dmabuf>>>,
syncobjs: Rc<RefCell<SyncobjMap>>,
// Buffers we dequeued from PipeWire that are waiting for their release sync point to be
// signalled before we can use them.
dequeued_buffers: Rc<RefCell<Vec<NonNull<pw_buffer>>>>,
gbm: GbmDevice<DrmDeviceFd>,
scheduled_redraw: Option<RegistrationToken>,
}
@@ -219,6 +269,12 @@ impl PipeWire {
let is_active = Rc::new(Cell::new(false));
let min_time_between_frames = Rc::new(Cell::new(Duration::ZERO));
let dmabufs = Rc::new(RefCell::new(HashMap::new()));
let syncobjs = SyncobjMap {
gbm: gbm.clone(),
map: HashMap::new(),
};
let syncobjs = Rc::new(RefCell::new(syncobjs));
let dequeued_buffers = Rc::new(RefCell::new(Vec::new()));
let refresh = Rc::new(Cell::new(refresh));
let pending_size = Size::from((size.w as u32, size.h as u32));
@@ -493,37 +549,20 @@ impl PipeWire {
}
};
// const BPP: u32 = 4;
// let stride = format.size().width * BPP;
// let size = stride * format.size().height;
let o1 = make_buffers_params(plane_count, true);
// Fallback without SyncTimeline.
let o2 = make_buffers_params(plane_count, false);
let o1 = pod::object!(
SpaTypes::ObjectParamBuffers,
ParamType::Buffers,
let o3 = pod::object!(
SpaTypes::ObjectParamMeta,
ParamType::Meta,
Property::new(
SPA_PARAM_BUFFERS_buffers,
pod::Value::Choice(ChoiceValue::Int(Choice(
ChoiceFlags::empty(),
ChoiceEnum::Range {
default: 16,
min: 2,
max: 16
}
))),
SPA_PARAM_META_type,
pod::Value::Id(spa::utils::Id(SPA_META_SyncTimeline))
),
Property::new(SPA_PARAM_BUFFERS_blocks, pod::Value::Int(plane_count)),
// Property::new(SPA_PARAM_BUFFERS_size, pod::Value::Int(size as i32)),
// Property::new(SPA_PARAM_BUFFERS_stride, pod::Value::Int(stride as i32)),
// Property::new(SPA_PARAM_BUFFERS_align, pod::Value::Int(16)),
Property::new(
SPA_PARAM_BUFFERS_dataType,
pod::Value::Choice(ChoiceValue::Int(Choice(
ChoiceFlags::empty(),
ChoiceEnum::Flags {
default: 1 << DataType::DmaBuf.as_raw(),
flags: vec![1 << DataType::DmaBuf.as_raw()],
},
))),
SPA_PARAM_META_size,
pod::Value::Int(size_of::<spa_meta_sync_timeline>() as i32)
),
);
@@ -539,10 +578,14 @@ impl PipeWire {
// pod::Value::Int(size_of::<spa_meta_header>() as i32)
// ),
// );
let mut b1 = vec![];
// let mut b2 = vec![];
let mut b2 = vec![];
let mut b3 = vec![];
let mut params = [
make_pod(&mut b1, o1), // make_pod(&mut b2, o2)
make_pod(&mut b1, o1),
make_pod(&mut b2, o2),
make_pod(&mut b3, o3),
];
if let Err(err) = stream.update_params(&mut params) {
@@ -552,7 +595,9 @@ impl PipeWire {
}
})
.add_buffer({
let gbm = gbm.clone();
let dmabufs = dmabufs.clone();
let syncobjs = syncobjs.clone();
let stop_cast = stop_cast.clone();
let state = state.clone();
move |stream, (), buffer| {
@@ -592,8 +637,18 @@ impl PipeWire {
}
};
let plane_count = dmabuf.num_planes();
assert_eq!((*spa_buffer).n_datas as usize, plane_count);
let have_sync_timeline = !spa_buffer_find_meta_data(
spa_buffer,
SPA_META_SyncTimeline,
mem::size_of::<spa_meta_sync_timeline>(),
)
.is_null();
let mut expected_n_datas = dmabuf.num_planes();
if have_sync_timeline {
expected_n_datas += 2;
}
assert_eq!((*spa_buffer).n_datas as usize, expected_n_datas);
for (i, fd) in dmabuf.handles().enumerate() {
let spa_data = (*spa_buffer).datas.add(i);
@@ -611,6 +666,12 @@ impl PipeWire {
let fd = (*(*spa_buffer).datas).fd;
assert!(dmabufs.borrow_mut().insert(fd, dmabuf).is_none());
let syncobjs = &mut *syncobjs.borrow_mut();
if let Err(err) = maybe_create_syncobj(&gbm, spa_buffer, &mut syncobjs.map)
{
warn!("error filling syncobj buffer data: {err:?}");
};
}
// During size re-negotiation, the stream sometimes just keeps running, in
@@ -622,6 +683,9 @@ impl PipeWire {
})
.remove_buffer({
let dmabufs = dmabufs.clone();
let syncobjs = syncobjs.clone();
let dequeued_buffers = dequeued_buffers.clone();
let gbm = gbm.clone();
move |_stream, (), buffer| {
trace!("pw stream: remove_buffer");
@@ -631,7 +695,29 @@ impl PipeWire {
assert!((*spa_buffer).n_datas > 0);
let fd = (*spa_data).fd;
dmabufs.borrow_mut().remove(&fd);
if let Some(dmabuf) = dmabufs.borrow_mut().remove(&fd) {
let have_sync_timeline = !spa_buffer_find_meta_data(
spa_buffer,
SPA_META_SyncTimeline,
mem::size_of::<spa_meta_sync_timeline>(),
)
.is_null();
let mut expected_n_datas = dmabuf.num_planes();
if have_sync_timeline {
expected_n_datas += 2;
}
assert_eq!((*spa_buffer).n_datas as usize, expected_n_datas);
let syncobjs = &mut *syncobjs.borrow_mut();
maybe_remove_syncobj(&gbm, spa_buffer, &mut syncobjs.map);
dequeued_buffers
.borrow_mut()
.retain(|buf: &NonNull<_>| buf.as_ptr() != buffer);
} else {
error!("missing dmabuf in remove_buffer()");
}
}
}
})
@@ -667,6 +753,9 @@ impl PipeWire {
last_frame_time: Duration::ZERO,
min_time_between_frames,
dmabufs,
syncobjs,
dequeued_buffers,
gbm,
scheduled_redraw: None,
};
Ok(cast)
@@ -820,6 +909,33 @@ impl Cast {
}
}
fn dequeue_available_buffer(&mut self) -> Option<NonNull<pw_buffer>> {
let mut syncobjs = self.syncobjs.borrow_mut();
let syncobjs = &mut syncobjs.map;
unsafe {
// Check if any already-dequeued buffers are ready.
let mut dequeued_buffers = self.dequeued_buffers.borrow_mut();
for (i, buffer) in dequeued_buffers.iter().enumerate() {
if can_reuse_pw_buffer(&self.gbm, *buffer, syncobjs) {
debug!("buffer is now ready, yielding");
return Some(dequeued_buffers.remove(i));
}
}
while let Some(buffer) = NonNull::new(self.stream.dequeue_raw_buffer()) {
if can_reuse_pw_buffer(&self.gbm, buffer, syncobjs) {
return Some(buffer);
}
debug!("buffer isn't ready yet, storing");
dequeued_buffers.push(buffer);
}
}
None
}
pub fn dequeue_buffer_and_render(
&mut self,
renderer: &mut GlesRenderer,
@@ -828,7 +944,8 @@ impl Cast {
scale: Scale<f64>,
wait_for_sync: bool,
) -> bool {
let CastState::Ready { damage_tracker, .. } = &mut *self.state.borrow_mut() else {
let mut state = self.state.borrow_mut();
let CastState::Ready { damage_tracker, .. } = &mut *state else {
error!("cast must be in Ready state to render");
return false;
};
@@ -848,57 +965,75 @@ impl Cast {
trace!("no damage, skipping frame");
return false;
}
drop(state);
let Some(mut buffer) = self.stream.dequeue_buffer() else {
warn!("no available buffer in pw stream, skipping frame");
return false;
};
unsafe {
let Some(pw_buffer) = self.dequeue_available_buffer() else {
warn!("no available buffer in pw stream, skipping frame");
return false;
};
let pw_buffer = pw_buffer.as_ptr();
let fd = buffer.datas_mut()[0].as_raw().fd;
let dmabuf = &self.dmabufs.borrow()[&fd];
let spa_buffer = (*pw_buffer).buffer;
let fd = (*(*spa_buffer).datas).fd;
let dmabuf = &self.dmabufs.borrow()[&fd];
match render_to_dmabuf(
renderer,
dmabuf.clone(),
size,
scale,
Transform::Normal,
elements.iter().rev(),
) {
Ok(sync_point) => {
// FIXME: implement PipeWire explicit sync, and at the very least async wait.
if wait_for_sync {
let _span = tracy_client::span!("wait for completion");
if let Err(err) = sync_point.wait() {
warn!("error waiting for pw frame completion: {err:?}");
match render_to_dmabuf(
renderer,
dmabuf.clone(),
size,
scale,
Transform::Normal,
elements.iter().rev(),
) {
Ok(sync_point) => {
// FIXME: implement PipeWire explicit sync, and at the very least async wait.
if wait_for_sync {
let _span = tracy_client::span!("wait for completion");
if let Err(err) = sync_point.wait() {
warn!("error waiting for pw frame completion: {err:?}");
}
}
let syncobjs = &mut *self.syncobjs.borrow_mut();
if let Err(err) =
maybe_set_sync_points(&self.gbm, spa_buffer, &mut syncobjs.map, &sync_point)
{
warn!("error setting sync point: {err:?}");
};
}
Err(err) => {
warn!("error rendering to dmabuf: {err:?}");
return_unused_buffer(&self.stream, pw_buffer);
return false;
}
}
Err(err) => {
warn!("error rendering to dmabuf: {err:?}");
return false;
for (i, (stride, offset)) in zip(dmabuf.strides(), dmabuf.offsets()).enumerate() {
let spa_data = (*spa_buffer).datas.add(i);
let chunk = (*spa_data).chunk;
// With DMA-BUFs, consumers should ignore the size field, and producers are allowed
// to set it to 0.
//
// https://docs.pipewire.org/page_dma_buf.html
//
// However, OBS checks for size != 0 as a workaround for old compositor versions,
// so we set it to 1.
(*chunk).size = 1;
// Clear the corrupted flag we may have set before.
(*chunk).flags = SPA_CHUNK_FLAG_NONE as i32;
(*chunk).stride = stride as i32;
(*chunk).offset = offset;
trace!(
"pw buffer: fd = {}, stride = {stride}, offset = {offset}",
(*spa_data).fd
);
}
}
for (data, (stride, offset)) in
zip(buffer.datas_mut(), zip(dmabuf.strides(), dmabuf.offsets()))
{
let chunk = data.chunk_mut();
// With DMA-BUFs, consumers should ignore the size field, and producers are allowed to
// set it to 0.
//
// https://docs.pipewire.org/page_dma_buf.html
//
// However, OBS checks for size != 0 as a workaround for old compositor versions,
// so we set it to 1.
*chunk.size_mut() = 1;
*chunk.stride_mut() = stride as i32;
*chunk.offset_mut() = offset;
trace!(
"pw buffer: fd = {}, stride = {stride}, offset = {offset}",
data.as_raw().fd
);
pw_stream_queue_buffer(self.stream.as_raw_ptr(), pw_buffer);
}
true
@@ -914,49 +1049,66 @@ impl Cast {
*damage_tracker = None;
};
let Some(mut buffer) = self.stream.dequeue_buffer() else {
warn!("no available buffer in pw stream, skipping clear");
return false;
};
unsafe {
let Some(pw_buffer) = self.dequeue_available_buffer() else {
warn!("no available buffer in pw stream, skipping clear");
return false;
};
let pw_buffer = pw_buffer.as_ptr();
let fd = buffer.datas_mut()[0].as_raw().fd;
let dmabuf = &self.dmabufs.borrow()[&fd];
let spa_buffer = (*pw_buffer).buffer;
let fd = (*(*spa_buffer).datas).fd;
let dmabuf = &self.dmabufs.borrow()[&fd];
match clear_dmabuf(renderer, dmabuf.clone()) {
Ok(sync_point) => {
// FIXME: implement PipeWire explicit sync, and at the very least async wait.
if wait_for_sync {
let _span = tracy_client::span!("wait for completion");
if let Err(err) = sync_point.wait() {
warn!("error waiting for pw frame completion: {err:?}");
match clear_dmabuf(renderer, dmabuf.clone()) {
Ok(sync_point) => {
// FIXME: implement PipeWire explicit sync, and at the very least async wait.
if wait_for_sync {
let _span = tracy_client::span!("wait for completion");
if let Err(err) = sync_point.wait() {
warn!("error waiting for pw frame completion: {err:?}");
}
}
let syncobjs = &mut *self.syncobjs.borrow_mut();
if let Err(err) =
maybe_set_sync_points(&self.gbm, spa_buffer, &mut syncobjs.map, &sync_point)
{
warn!("error setting sync point: {err:?}");
};
}
Err(err) => {
warn!("error clearing dmabuf: {err:?}");
return_unused_buffer(&self.stream, pw_buffer);
return false;
}
}
Err(err) => {
warn!("error clearing dmabuf: {err:?}");
return false;
for (i, (stride, offset)) in zip(dmabuf.strides(), dmabuf.offsets()).enumerate() {
let spa_data = (*spa_buffer).datas.add(i);
let chunk = (*spa_data).chunk;
// With DMA-BUFs, consumers should ignore the size field, and producers are allowed
// to set it to 0.
//
// https://docs.pipewire.org/page_dma_buf.html
//
// However, OBS checks for size != 0 as a workaround for old compositor versions,
// so we set it to 1.
(*chunk).size = 1;
// Clear the corrupted flag we may have set before.
(*chunk).flags = SPA_CHUNK_FLAG_NONE as i32;
(*chunk).stride = stride as i32;
(*chunk).offset = offset;
trace!(
"pw buffer: fd = {}, stride = {stride}, offset = {offset}",
(*spa_data).fd
);
}
}
for (data, (stride, offset)) in
zip(buffer.datas_mut(), zip(dmabuf.strides(), dmabuf.offsets()))
{
let chunk = data.chunk_mut();
// With DMA-BUFs, consumers should ignore the size field, and producers are allowed to
// set it to 0.
//
// https://docs.pipewire.org/page_dma_buf.html
//
// However, OBS checks for size != 0 as a workaround for old compositor versions,
// so we set it to 1.
*chunk.size_mut() = 1;
*chunk.stride_mut() = stride as i32;
*chunk.offset_mut() = offset;
trace!(
"pw buffer: fd = {}, stride = {stride}, offset = {offset}",
data.as_raw().fd
);
pw_stream_queue_buffer(self.stream.as_raw_ptr(), pw_buffer);
}
true
@@ -1060,6 +1212,52 @@ fn make_video_params(
)
}
fn make_buffers_params(mut plane_count: i32, sync_timeline: bool) -> pod::Object {
if sync_timeline {
// Two extra file descriptors for acquire and release.
plane_count += 2;
}
let mut object = pod::object!(
SpaTypes::ObjectParamBuffers,
ParamType::Buffers,
Property::new(
SPA_PARAM_BUFFERS_buffers,
pod::Value::Choice(ChoiceValue::Int(Choice(
ChoiceFlags::empty(),
ChoiceEnum::Range {
default: 16,
min: 2,
max: 16
}
))),
),
Property::new(SPA_PARAM_BUFFERS_blocks, pod::Value::Int(plane_count)),
Property::new(
SPA_PARAM_BUFFERS_dataType,
pod::Value::Choice(ChoiceValue::Int(Choice(
ChoiceFlags::empty(),
ChoiceEnum::Flags {
default: 1 << DataType::DmaBuf.as_raw(),
flags: vec![1 << DataType::DmaBuf.as_raw()],
},
))),
),
);
if sync_timeline {
// TODO: do we need to gate this behind a runtime check for PW 1.2.0? What
// happens on older PW?
object.properties.push(Property {
key: SPA_PARAM_BUFFERS_metaType,
flags: PropertyFlags::MANDATORY,
value: pod::Value::Int(1 << SPA_META_SyncTimeline),
});
}
object
}
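metaType is a mask of spa_meta_type bits (hence the 1 << SPA_META_SyncTimeline above), and PropertyFlags::MANDATORY makes consumers that don't understand SyncTimeline reject this variant, so negotiation falls back to the plain o2 params. A sanity check of the bit value (not in the commit):

#[test]
fn sync_timeline_meta_bit() {
    // SPA_META_SyncTimeline = 9, so the mandatory metaType mask is bit 9.
    assert_eq!(1 << SPA_META_SyncTimeline, 0x200);
}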
fn make_pod(buffer: &mut Vec<u8>, object: pod::Object) -> &Pod {
PodSerializer::serialize(Cursor::new(&mut *buffer), &pod::Value::Object(object)).unwrap();
Pod::from_bytes(buffer).unwrap()
@@ -1129,3 +1327,238 @@ fn allocate_dmabuf(
.context("error exporting GBM buffer object as dmabuf")?;
Ok(dmabuf)
}
unsafe fn maybe_create_syncobj(
gbm: &GbmDevice<DrmDeviceFd>,
spa_buffer: *mut spa_buffer,
syncobjs: &mut HashMap<RawFd, syncobj::Handle>,
) -> anyhow::Result<()> {
unsafe {
let sync_timeline: *mut spa_meta_sync_timeline = spa_buffer_find_meta_data(
spa_buffer,
SPA_META_SyncTimeline,
mem::size_of::<spa_meta_sync_timeline>(),
)
.cast();
if sync_timeline.is_null() {
return Ok(());
}
let syncobj = gbm
.create_syncobj(false)
.context("error creating syncobj")?;
let fd = match gbm.syncobj_to_fd(syncobj, false) {
Ok(x) => x,
Err(err) => {
let _ = gbm.destroy_syncobj(syncobj);
return Err(err).context("error exporting syncobj to fd");
}
};
debug!("filling syncobj fd={fd:?}");
let n_datas = (*spa_buffer).n_datas as usize;
assert!(n_datas >= 2);
let acquire_data = (*spa_buffer).datas.add(n_datas - 2);
(*acquire_data).type_ = SPA_DATA_SyncObj;
(*acquire_data).flags = SPA_DATA_FLAG_READABLE;
(*acquire_data).fd = i64::from(fd.as_raw_fd());
let release_data = (*spa_buffer).datas.add(n_datas - 1);
(*release_data).type_ = SPA_DATA_SyncObj;
(*release_data).flags = SPA_DATA_FLAG_READABLE;
(*release_data).fd = i64::from(fd.as_raw_fd());
syncobjs.insert(fd.into_raw_fd(), syncobj);
Ok(())
}
}
unsafe fn maybe_remove_syncobj(
gbm: &GbmDevice<DrmDeviceFd>,
spa_buffer: *mut spa_buffer,
syncobjs: &mut HashMap<RawFd, syncobj::Handle>,
) {
unsafe {
let sync_timeline: *mut spa_meta_sync_timeline = spa_buffer_find_meta_data(
spa_buffer,
SPA_META_SyncTimeline,
mem::size_of::<spa_meta_sync_timeline>(),
)
.cast();
if sync_timeline.is_null() {
return;
}
let n_datas = (*spa_buffer).n_datas as usize;
assert!(n_datas >= 2);
let acquire_data = (*spa_buffer).datas.add(n_datas - 2);
let fd = (*acquire_data).fd as RawFd;
debug!("removing syncobj fd={fd:?}");
let Some(syncobj) = syncobjs.remove(&fd) else {
error!("missing syncobj in remove_buffer()");
return;
};
if let Err(err) = gbm.destroy_syncobj(syncobj) {
warn!("error destroying syncobj: {err:?}");
}
drop(OwnedFd::from_raw_fd(fd));
}
}
unsafe fn maybe_set_sync_points(
gbm: &GbmDevice<DrmDeviceFd>,
spa_buffer: *mut spa_buffer,
syncobjs: &mut HashMap<RawFd, syncobj::Handle>,
sync_point: &SyncPoint,
) -> anyhow::Result<()> {
unsafe {
let sync_timeline: *mut spa_meta_sync_timeline = spa_buffer_find_meta_data(
spa_buffer,
SPA_META_SyncTimeline,
mem::size_of::<spa_meta_sync_timeline>(),
)
.cast();
if sync_timeline.is_null() {
return Ok(());
}
// At this point, we must ensure that our syncobj contains a fence, since clients can do a
// blocking wait until the fence is available (OBS does this).
// TODO
let n_datas = (*spa_buffer).n_datas as usize;
assert!(n_datas >= 2);
let acquire_data = (*spa_buffer).datas.add(n_datas - 2);
let fd = (*acquire_data).fd as RawFd;
let Some(syncobj) = syncobjs.get(&fd) else {
error!("missing syncobj in maybe_set_sync_points()");
return Ok(());
};
let Some(sync_fd) = sync_point.export() else {
debug!("have sync_timeline but no sync_fd to export");
return Ok(());
};
let acquire_point = (*sync_timeline).release_point + 1;
// Import sync_fd into our syncobj at the correct point.
let tmp = gbm
.create_syncobj(false)
.context("error creating temp syncobj")?;
let res = drm_import_sync_file(gbm, tmp, sync_fd.as_fd())
.context("error importing sync_fd to temp syncobj");
let res = if res.is_ok() {
gbm.syncobj_timeline_transfer(tmp, *syncobj, 0, acquire_point)
.context("error transferring sync point")
} else {
res
};
let _ = gbm.destroy_syncobj(tmp);
let () = res?;
(*sync_timeline).acquire_point = acquire_point;
(*sync_timeline).release_point = acquire_point + 1;
debug!("set sync timeline fd={fd:?} to acquire={acquire_point}");
Ok(())
}
}
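Each frame thus advances a buffer's timeline by two: the producer signals odd points once rendering completes, the consumer signals even points when it is done reading. The scheme in isolation (an illustration, not code from the commit):

/// Next (acquire, release) pair given a buffer's previous release point.
fn next_points(prev_release: u64) -> (u64, u64) {
    let acquire = prev_release + 1; // signaled by us once the GPU work lands
    let release = acquire + 1; // signaled by the consumer when done reading
    (acquire, release)
}

#[test]
fn point_progression() {
    assert_eq!(next_points(0), (1, 2)); // fresh buffer, first frame
    assert_eq!(next_points(2), (3, 4)); // second frame
}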
// Our own version until drm-ffi is fixed:
// https://github.com/Smithay/drm-rs/issues/224
unsafe fn drm_import_sync_file(
gbm: &GbmDevice<DrmDeviceFd>,
syncobj: syncobj::Handle,
sync_file: BorrowedFd,
) -> io::Result<()> {
use drm_ffi::drm_sys::*;
use rustix::ioctl::{self, ioctl, Opcode, Updater};
use smithay::reexports::rustix;
unsafe fn fd_to_handle(fd: BorrowedFd, data: &mut drm_syncobj_handle) -> io::Result<()> {
const OPCODE: Opcode =
ioctl::opcode::read_write::<drm_syncobj_handle>(DRM_IOCTL_BASE, 0xC2);
Ok(ioctl(fd, Updater::<OPCODE, drm_syncobj_handle>::new(data))?)
}
let mut args = drm_syncobj_handle {
handle: u32::from(syncobj),
flags: DRM_SYNCOBJ_FD_TO_HANDLE_FLAGS_IMPORT_SYNC_FILE,
fd: sync_file.as_raw_fd(),
pad: 0,
};
unsafe { fd_to_handle(gbm.as_fd(), &mut args) }
}
unsafe fn can_reuse_pw_buffer(
gbm: &GbmDevice<DrmDeviceFd>,
pw_buffer: NonNull<pw_buffer>,
syncobjs: &mut HashMap<RawFd, syncobj::Handle>,
) -> bool {
unsafe {
let spa_buffer = (*pw_buffer.as_ptr()).buffer;
let sync_timeline: *mut spa_meta_sync_timeline = spa_buffer_find_meta_data(
spa_buffer,
SPA_META_SyncTimeline,
mem::size_of::<spa_meta_sync_timeline>(),
)
.cast();
if sync_timeline.is_null() {
// No explicit sync, can always reuse.
return true;
}
let n_datas = (*spa_buffer).n_datas as usize;
assert!(n_datas >= 2);
let release_data = (*spa_buffer).datas.add(n_datas - 1);
let fd = (*release_data).fd as RawFd;
let Some(syncobj) = syncobjs.get(&fd) else {
error!("missing syncobj in can_reuse_pw_buffer()");
return false;
};
let mut points = [0];
if let Err(err) = gbm.syncobj_timeline_query(&[*syncobj], &mut points, false) {
warn!("error querying timeline signaled point: {err:?}");
return false;
}
// For fresh buffers, this will return 0 and the condition will work out to true.
let latest_signaled_point = points[0];
debug!(
"latest signaled point for fd={fd:?} is {latest_signaled_point}; release point is {}",
(*sync_timeline).release_point
);
latest_signaled_point >= (*sync_timeline).release_point
}
}
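The readiness check is non-blocking: it only compares the latest signaled point against the buffer's release point. Reduced to its essentials (illustration only):

// A fresh buffer has both points at 0, so it is trivially ready.
fn is_ready(latest_signaled: u64, release_point: u64) -> bool {
    latest_signaled >= release_point
}
// is_ready(0, 0) == true; is_ready(1, 2) == false while the consumer reads.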
unsafe fn return_unused_buffer(stream: &Stream, pw_buffer: *mut pw_buffer) {
// pw_stream_return_buffer() requires PipeWire 1.4.0, which is newer than we
// can depend on. So, mark the buffer as corrupted and queue it instead.
let spa_buffer = (*pw_buffer).buffer;
let chunk = (*(*spa_buffer).datas).chunk;
(*chunk).size = 0;
(*chunk).flags = SPA_CHUNK_FLAG_CORRUPTED as i32;
pw_stream_queue_buffer(stream.as_raw_ptr(), pw_buffer);
}
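Once PipeWire 1.4.0 can be required, this workaround should reduce to a single call, assuming the sys bindings expose it (hypothetical, untested):

// Hypothetical replacement, needs PipeWire >= 1.4.0:
// pw_stream_return_buffer(stream.as_raw_ptr(), pw_buffer);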