Compare commits

...

2 Commits

Author SHA1 Message Date
Ivan Molodetskikh ce76877b04 pw_utils: Implement explicit sync
Largely following the Mutter implementation:
https://gitlab.gnome.org/GNOME/mutter/-/merge_requests/3876
2025-07-20 11:24:44 +03:00
Ivan Molodetskikh 62b8b11909 pw_utils: Add clarifying comments on maxsize and size 2025-07-20 09:41:46 +03:00
+551 -100
View File
@@ -1,9 +1,10 @@
use std::cell::{Cell, RefCell}; use std::cell::{Cell, RefCell};
use std::collections::HashMap; use std::collections::HashMap;
use std::io::Cursor; use std::io::{self, Cursor};
use std::iter::zip; use std::iter::zip;
use std::mem; use std::mem;
use std::os::fd::{AsFd, AsRawFd, BorrowedFd}; use std::os::fd::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
use std::ptr::NonNull;
use std::rc::Rc; use std::rc::Rc;
use std::time::Duration; use std::time::Duration;
@@ -28,6 +29,7 @@ use pipewire::spa::utils::{
}; };
use pipewire::spa::{self}; use pipewire::spa::{self};
use pipewire::stream::{Stream, StreamFlags, StreamListener, StreamState}; use pipewire::stream::{Stream, StreamFlags, StreamListener, StreamState};
use pipewire::sys::{pw_buffer, pw_stream_queue_buffer};
use smithay::backend::allocator::dmabuf::{AsDmabuf, Dmabuf}; use smithay::backend::allocator::dmabuf::{AsDmabuf, Dmabuf};
use smithay::backend::allocator::format::FormatSet; use smithay::backend::allocator::format::FormatSet;
use smithay::backend::allocator::gbm::{GbmBuffer, GbmBufferFlags, GbmDevice}; use smithay::backend::allocator::gbm::{GbmBuffer, GbmBufferFlags, GbmDevice};
@@ -36,9 +38,11 @@ use smithay::backend::drm::DrmDeviceFd;
use smithay::backend::renderer::damage::OutputDamageTracker; use smithay::backend::renderer::damage::OutputDamageTracker;
use smithay::backend::renderer::element::RenderElement; use smithay::backend::renderer::element::RenderElement;
use smithay::backend::renderer::gles::GlesRenderer; use smithay::backend::renderer::gles::GlesRenderer;
use smithay::backend::renderer::sync::SyncPoint;
use smithay::output::{Output, OutputModeSource}; use smithay::output::{Output, OutputModeSource};
use smithay::reexports::calloop::generic::Generic; use smithay::reexports::calloop::generic::Generic;
use smithay::reexports::calloop::{Interest, LoopHandle, Mode, PostAction}; use smithay::reexports::calloop::{Interest, LoopHandle, Mode, PostAction};
use smithay::reexports::drm::control::{syncobj, Device as _};
use smithay::reexports::gbm::Modifier; use smithay::reexports::gbm::Modifier;
use smithay::utils::{Physical, Scale, Size, Transform}; use smithay::utils::{Physical, Scale, Size, Transform};
use zbus::object_server::SignalEmitter; use zbus::object_server::SignalEmitter;
@@ -51,6 +55,47 @@ use crate::utils::get_monotonic_time;
// Give a 0.1 ms allowance for presentation time errors. // Give a 0.1 ms allowance for presentation time errors.
const CAST_DELAY_ALLOWANCE: Duration = Duration::from_micros(100); const CAST_DELAY_ALLOWANCE: Duration = Duration::from_micros(100);
// Added in PipeWire 1.2.0.
#[allow(non_upper_case_globals)]
const SPA_META_SyncTimeline: spa_meta_type = 9;
#[allow(non_upper_case_globals)]
const SPA_PARAM_BUFFERS_metaType: spa_param_buffers = 7;
#[allow(non_upper_case_globals)]
const SPA_DATA_SyncObj: spa_data_type = 5;
#[allow(non_camel_case_types)]
#[repr(C)]
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
struct spa_meta_sync_timeline {
pub flags: u32,
pub padding: u32,
pub acquire_point: u64,
pub release_point: u64,
}
/// A map of syncobj fd => handle for proper Drop.
struct SyncobjMap {
gbm: GbmDevice<DrmDeviceFd>,
map: HashMap<RawFd, syncobj::Handle>,
}
impl Drop for SyncobjMap {
fn drop(&mut self) {
if !self.map.is_empty() {
debug!("dropping syncobjs on an abruptly stopped cast");
for (fd, syncobj) in self.map.drain() {
unsafe {
if let Err(err) = self.gbm.destroy_syncobj(syncobj) {
warn!("error destroying syncobj: {err:?}");
}
drop(OwnedFd::from_raw_fd(fd));
}
}
}
}
}
pub struct PipeWire { pub struct PipeWire {
_context: Context, _context: Context,
pub core: Core, pub core: Core,
@@ -80,6 +125,11 @@ pub struct Cast {
pub last_frame_time: Duration, pub last_frame_time: Duration,
min_time_between_frames: Rc<Cell<Duration>>, min_time_between_frames: Rc<Cell<Duration>>,
dmabufs: Rc<RefCell<HashMap<i64, Dmabuf>>>, dmabufs: Rc<RefCell<HashMap<i64, Dmabuf>>>,
syncobjs: Rc<RefCell<SyncobjMap>>,
// Buffers we dequeued from PipeWire that are waiting for their release sync point to be
// signalled before we can use them.
dequeued_buffers: Rc<RefCell<Vec<NonNull<pw_buffer>>>>,
gbm: GbmDevice<DrmDeviceFd>,
scheduled_redraw: Option<RegistrationToken>, scheduled_redraw: Option<RegistrationToken>,
} }
@@ -219,6 +269,12 @@ impl PipeWire {
let is_active = Rc::new(Cell::new(false)); let is_active = Rc::new(Cell::new(false));
let min_time_between_frames = Rc::new(Cell::new(Duration::ZERO)); let min_time_between_frames = Rc::new(Cell::new(Duration::ZERO));
let dmabufs = Rc::new(RefCell::new(HashMap::new())); let dmabufs = Rc::new(RefCell::new(HashMap::new()));
let syncobjs = SyncobjMap {
gbm: gbm.clone(),
map: HashMap::new(),
};
let syncobjs = Rc::new(RefCell::new(syncobjs));
let dequeued_buffers = Rc::new(RefCell::new(Vec::new()));
let refresh = Rc::new(Cell::new(refresh)); let refresh = Rc::new(Cell::new(refresh));
let pending_size = Size::from((size.w as u32, size.h as u32)); let pending_size = Size::from((size.w as u32, size.h as u32));
@@ -493,37 +549,20 @@ impl PipeWire {
} }
}; };
// const BPP: u32 = 4; let o1 = make_buffers_params(plane_count, true);
// let stride = format.size().width * BPP; // Fallback without SyncTimeline.
// let size = stride * format.size().height; let o2 = make_buffers_params(plane_count, false);
let o1 = pod::object!( let o3 = pod::object!(
SpaTypes::ObjectParamBuffers, SpaTypes::ObjectParamMeta,
ParamType::Buffers, ParamType::Meta,
Property::new( Property::new(
SPA_PARAM_BUFFERS_buffers, SPA_PARAM_META_type,
pod::Value::Choice(ChoiceValue::Int(Choice( pod::Value::Id(spa::utils::Id(SPA_META_SyncTimeline))
ChoiceFlags::empty(),
ChoiceEnum::Range {
default: 16,
min: 2,
max: 16
}
))),
), ),
Property::new(SPA_PARAM_BUFFERS_blocks, pod::Value::Int(plane_count)),
// Property::new(SPA_PARAM_BUFFERS_size, pod::Value::Int(size as i32)),
// Property::new(SPA_PARAM_BUFFERS_stride, pod::Value::Int(stride as i32)),
// Property::new(SPA_PARAM_BUFFERS_align, pod::Value::Int(16)),
Property::new( Property::new(
SPA_PARAM_BUFFERS_dataType, SPA_PARAM_META_size,
pod::Value::Choice(ChoiceValue::Int(Choice( pod::Value::Int(size_of::<spa_meta_sync_timeline>() as i32)
ChoiceFlags::empty(),
ChoiceEnum::Flags {
default: 1 << DataType::DmaBuf.as_raw(),
flags: vec![1 << DataType::DmaBuf.as_raw()],
},
))),
), ),
); );
@@ -539,10 +578,14 @@ impl PipeWire {
// pod::Value::Int(size_of::<spa_meta_header>() as i32) // pod::Value::Int(size_of::<spa_meta_header>() as i32)
// ), // ),
// ); // );
let mut b1 = vec![]; let mut b1 = vec![];
// let mut b2 = vec![]; let mut b2 = vec![];
let mut b3 = vec![];
let mut params = [ let mut params = [
make_pod(&mut b1, o1), // make_pod(&mut b2, o2) make_pod(&mut b1, o1),
make_pod(&mut b2, o2),
make_pod(&mut b3, o3),
]; ];
if let Err(err) = stream.update_params(&mut params) { if let Err(err) = stream.update_params(&mut params) {
@@ -552,7 +595,9 @@ impl PipeWire {
} }
}) })
.add_buffer({ .add_buffer({
let gbm = gbm.clone();
let dmabufs = dmabufs.clone(); let dmabufs = dmabufs.clone();
let syncobjs = syncobjs.clone();
let stop_cast = stop_cast.clone(); let stop_cast = stop_cast.clone();
let state = state.clone(); let state = state.clone();
move |stream, (), buffer| { move |stream, (), buffer| {
@@ -592,14 +637,28 @@ impl PipeWire {
} }
}; };
let plane_count = dmabuf.num_planes(); let have_sync_timeline = !spa_buffer_find_meta_data(
assert_eq!((*spa_buffer).n_datas as usize, plane_count); spa_buffer,
SPA_META_SyncTimeline,
mem::size_of::<spa_meta_sync_timeline>(),
)
.is_null();
let mut expected_n_datas = dmabuf.num_planes();
if have_sync_timeline {
expected_n_datas += 2;
}
assert_eq!((*spa_buffer).n_datas as usize, expected_n_datas);
for (i, fd) in dmabuf.handles().enumerate() { for (i, fd) in dmabuf.handles().enumerate() {
let spa_data = (*spa_buffer).datas.add(i); let spa_data = (*spa_buffer).datas.add(i);
assert!((*spa_data).type_ & (1 << DataType::DmaBuf.as_raw()) > 0); assert!((*spa_data).type_ & (1 << DataType::DmaBuf.as_raw()) > 0);
(*spa_data).type_ = DataType::DmaBuf.as_raw(); (*spa_data).type_ = DataType::DmaBuf.as_raw();
// With DMA-BUFs, consumers should ignore the maxsize field, and
// producers are allowed to set it to 0.
//
// https://docs.pipewire.org/page_dma_buf.html
(*spa_data).maxsize = 1; (*spa_data).maxsize = 1;
(*spa_data).fd = fd.as_raw_fd() as i64; (*spa_data).fd = fd.as_raw_fd() as i64;
(*spa_data).flags = SPA_DATA_FLAG_READWRITE; (*spa_data).flags = SPA_DATA_FLAG_READWRITE;
@@ -607,6 +666,12 @@ impl PipeWire {
let fd = (*(*spa_buffer).datas).fd; let fd = (*(*spa_buffer).datas).fd;
assert!(dmabufs.borrow_mut().insert(fd, dmabuf).is_none()); assert!(dmabufs.borrow_mut().insert(fd, dmabuf).is_none());
let syncobjs = &mut *syncobjs.borrow_mut();
if let Err(err) = maybe_create_syncobj(&gbm, spa_buffer, &mut syncobjs.map)
{
warn!("error filling syncobj buffer data: {err:?}");
};
} }
// During size re-negotiation, the stream sometimes just keeps running, in // During size re-negotiation, the stream sometimes just keeps running, in
@@ -618,6 +683,9 @@ impl PipeWire {
}) })
.remove_buffer({ .remove_buffer({
let dmabufs = dmabufs.clone(); let dmabufs = dmabufs.clone();
let syncobjs = syncobjs.clone();
let dequeued_buffers = dequeued_buffers.clone();
let gbm = gbm.clone();
move |_stream, (), buffer| { move |_stream, (), buffer| {
trace!("pw stream: remove_buffer"); trace!("pw stream: remove_buffer");
@@ -627,7 +695,29 @@ impl PipeWire {
assert!((*spa_buffer).n_datas > 0); assert!((*spa_buffer).n_datas > 0);
let fd = (*spa_data).fd; let fd = (*spa_data).fd;
dmabufs.borrow_mut().remove(&fd); if let Some(dmabuf) = dmabufs.borrow_mut().remove(&fd) {
let have_sync_timeline = !spa_buffer_find_meta_data(
spa_buffer,
SPA_META_SyncTimeline,
mem::size_of::<spa_meta_sync_timeline>(),
)
.is_null();
let mut expected_n_datas = dmabuf.num_planes();
if have_sync_timeline {
expected_n_datas += 2;
}
assert_eq!((*spa_buffer).n_datas as usize, expected_n_datas);
let syncobjs = &mut *syncobjs.borrow_mut();
maybe_remove_syncobj(&gbm, spa_buffer, &mut syncobjs.map);
dequeued_buffers
.borrow_mut()
.retain(|buf: &NonNull<_>| buf.as_ptr() != buffer);
} else {
error!("missing dmabuf in remove_buffer()");
}
} }
} }
}) })
@@ -663,6 +753,9 @@ impl PipeWire {
last_frame_time: Duration::ZERO, last_frame_time: Duration::ZERO,
min_time_between_frames, min_time_between_frames,
dmabufs, dmabufs,
syncobjs,
dequeued_buffers,
gbm,
scheduled_redraw: None, scheduled_redraw: None,
}; };
Ok(cast) Ok(cast)
@@ -816,6 +909,33 @@ impl Cast {
} }
} }
fn dequeue_available_buffer(&mut self) -> Option<NonNull<pw_buffer>> {
let mut syncobjs = self.syncobjs.borrow_mut();
let syncobjs = &mut syncobjs.map;
unsafe {
// Check if any already-dequeued buffers are ready.
let mut dequeued_buffers = self.dequeued_buffers.borrow_mut();
for (i, buffer) in dequeued_buffers.iter().enumerate() {
if can_reuse_pw_buffer(&self.gbm, *buffer, syncobjs) {
debug!("buffer is now ready, yielding");
return Some(dequeued_buffers.remove(i));
}
}
while let Some(buffer) = NonNull::new(self.stream.dequeue_raw_buffer()) {
if can_reuse_pw_buffer(&self.gbm, buffer, syncobjs) {
return Some(buffer);
}
debug!("buffer isn't ready yet, storing");
dequeued_buffers.push(buffer);
}
}
None
}
pub fn dequeue_buffer_and_render( pub fn dequeue_buffer_and_render(
&mut self, &mut self,
renderer: &mut GlesRenderer, renderer: &mut GlesRenderer,
@@ -824,7 +944,8 @@ impl Cast {
scale: Scale<f64>, scale: Scale<f64>,
wait_for_sync: bool, wait_for_sync: bool,
) -> bool { ) -> bool {
let CastState::Ready { damage_tracker, .. } = &mut *self.state.borrow_mut() else { let mut state = self.state.borrow_mut();
let CastState::Ready { damage_tracker, .. } = &mut *state else {
error!("cast must be in Ready state to render"); error!("cast must be in Ready state to render");
return false; return false;
}; };
@@ -844,50 +965,75 @@ impl Cast {
trace!("no damage, skipping frame"); trace!("no damage, skipping frame");
return false; return false;
} }
drop(state);
let Some(mut buffer) = self.stream.dequeue_buffer() else { unsafe {
warn!("no available buffer in pw stream, skipping frame"); let Some(pw_buffer) = self.dequeue_available_buffer() else {
return false; warn!("no available buffer in pw stream, skipping frame");
}; return false;
};
let pw_buffer = pw_buffer.as_ptr();
let fd = buffer.datas_mut()[0].as_raw().fd; let spa_buffer = (*pw_buffer).buffer;
let dmabuf = &self.dmabufs.borrow()[&fd]; let fd = (*(*spa_buffer).datas).fd;
let dmabuf = &self.dmabufs.borrow()[&fd];
match render_to_dmabuf( match render_to_dmabuf(
renderer, renderer,
dmabuf.clone(), dmabuf.clone(),
size, size,
scale, scale,
Transform::Normal, Transform::Normal,
elements.iter().rev(), elements.iter().rev(),
) { ) {
Ok(sync_point) => { Ok(sync_point) => {
// FIXME: implement PipeWire explicit sync, and at the very least async wait. // FIXME: implement PipeWire explicit sync, and at the very least async wait.
if wait_for_sync { if wait_for_sync {
let _span = tracy_client::span!("wait for completion"); let _span = tracy_client::span!("wait for completion");
if let Err(err) = sync_point.wait() { if let Err(err) = sync_point.wait() {
warn!("error waiting for pw frame completion: {err:?}"); warn!("error waiting for pw frame completion: {err:?}");
}
} }
let syncobjs = &mut *self.syncobjs.borrow_mut();
if let Err(err) =
maybe_set_sync_points(&self.gbm, spa_buffer, &mut syncobjs.map, &sync_point)
{
warn!("error setting sync point: {err:?}");
};
}
Err(err) => {
warn!("error rendering to dmabuf: {err:?}");
return_unused_buffer(&self.stream, pw_buffer);
return false;
} }
} }
Err(err) => {
warn!("error rendering to dmabuf: {err:?}"); for (i, (stride, offset)) in zip(dmabuf.strides(), dmabuf.offsets()).enumerate() {
return false; let spa_data = (*spa_buffer).datas.add(i);
let chunk = (*spa_data).chunk;
// With DMA-BUFs, consumers should ignore the size field, and producers are allowed
// to set it to 0.
//
// https://docs.pipewire.org/page_dma_buf.html
//
// However, OBS checks for size != 0 as a workaround for old compositor versions,
// so we set it to 1.
(*chunk).size = 1;
// Clear the corrupted flag we may have set before.
(*chunk).flags = SPA_CHUNK_FLAG_NONE as i32;
(*chunk).stride = stride as i32;
(*chunk).offset = offset;
trace!(
"pw buffer: fd = {}, stride = {stride}, offset = {offset}",
(*spa_data).fd
);
} }
}
for (data, (stride, offset)) in pw_stream_queue_buffer(self.stream.as_raw_ptr(), pw_buffer);
zip(buffer.datas_mut(), zip(dmabuf.strides(), dmabuf.offsets()))
{
let chunk = data.chunk_mut();
*chunk.size_mut() = 1;
*chunk.stride_mut() = stride as i32;
*chunk.offset_mut() = offset;
trace!(
"pw buffer: fd = {}, stride = {stride}, offset = {offset}",
data.as_raw().fd
);
} }
true true
@@ -903,42 +1049,66 @@ impl Cast {
*damage_tracker = None; *damage_tracker = None;
}; };
let Some(mut buffer) = self.stream.dequeue_buffer() else { unsafe {
warn!("no available buffer in pw stream, skipping clear"); let Some(pw_buffer) = self.dequeue_available_buffer() else {
return false; warn!("no available buffer in pw stream, skipping clear");
}; return false;
};
let pw_buffer = pw_buffer.as_ptr();
let fd = buffer.datas_mut()[0].as_raw().fd; let spa_buffer = (*pw_buffer).buffer;
let dmabuf = &self.dmabufs.borrow()[&fd]; let fd = (*(*spa_buffer).datas).fd;
let dmabuf = &self.dmabufs.borrow()[&fd];
match clear_dmabuf(renderer, dmabuf.clone()) { match clear_dmabuf(renderer, dmabuf.clone()) {
Ok(sync_point) => { Ok(sync_point) => {
// FIXME: implement PipeWire explicit sync, and at the very least async wait. // FIXME: implement PipeWire explicit sync, and at the very least async wait.
if wait_for_sync { if wait_for_sync {
let _span = tracy_client::span!("wait for completion"); let _span = tracy_client::span!("wait for completion");
if let Err(err) = sync_point.wait() { if let Err(err) = sync_point.wait() {
warn!("error waiting for pw frame completion: {err:?}"); warn!("error waiting for pw frame completion: {err:?}");
}
} }
let syncobjs = &mut *self.syncobjs.borrow_mut();
if let Err(err) =
maybe_set_sync_points(&self.gbm, spa_buffer, &mut syncobjs.map, &sync_point)
{
warn!("error setting sync point: {err:?}");
};
}
Err(err) => {
warn!("error clearing dmabuf: {err:?}");
return_unused_buffer(&self.stream, pw_buffer);
return false;
} }
} }
Err(err) => {
warn!("error clearing dmabuf: {err:?}"); for (i, (stride, offset)) in zip(dmabuf.strides(), dmabuf.offsets()).enumerate() {
return false; let spa_data = (*spa_buffer).datas.add(i);
let chunk = (*spa_data).chunk;
// With DMA-BUFs, consumers should ignore the size field, and producers are allowed
// to set it to 0.
//
// https://docs.pipewire.org/page_dma_buf.html
//
// However, OBS checks for size != 0 as a workaround for old compositor versions,
// so we set it to 1.
(*chunk).size = 1;
// Clear the corrupted flag we may have set before.
(*chunk).flags = SPA_CHUNK_FLAG_NONE as i32;
(*chunk).stride = stride as i32;
(*chunk).offset = offset;
trace!(
"pw buffer: fd = {}, stride = {stride}, offset = {offset}",
(*spa_data).fd
);
} }
}
for (data, (stride, offset)) in pw_stream_queue_buffer(self.stream.as_raw_ptr(), pw_buffer);
zip(buffer.datas_mut(), zip(dmabuf.strides(), dmabuf.offsets()))
{
let chunk = data.chunk_mut();
*chunk.size_mut() = 1;
*chunk.stride_mut() = stride as i32;
*chunk.offset_mut() = offset;
trace!(
"pw buffer: fd = {}, stride = {stride}, offset = {offset}",
data.as_raw().fd
);
} }
true true
@@ -1042,6 +1212,52 @@ fn make_video_params(
) )
} }
fn make_buffers_params(mut plane_count: i32, sync_timeline: bool) -> pod::Object {
if sync_timeline {
// Two extra file descriptors for acquire and release.
plane_count += 2;
}
let mut object = pod::object!(
SpaTypes::ObjectParamBuffers,
ParamType::Buffers,
Property::new(
SPA_PARAM_BUFFERS_buffers,
pod::Value::Choice(ChoiceValue::Int(Choice(
ChoiceFlags::empty(),
ChoiceEnum::Range {
default: 16,
min: 2,
max: 16
}
))),
),
Property::new(SPA_PARAM_BUFFERS_blocks, pod::Value::Int(plane_count)),
Property::new(
SPA_PARAM_BUFFERS_dataType,
pod::Value::Choice(ChoiceValue::Int(Choice(
ChoiceFlags::empty(),
ChoiceEnum::Flags {
default: 1 << DataType::DmaBuf.as_raw(),
flags: vec![1 << DataType::DmaBuf.as_raw()],
},
))),
),
);
if sync_timeline {
// TODO: do we need to gate this behind runtime check for PW 1.2.0? What happens on older
// PW?
object.properties.push(Property {
key: SPA_PARAM_BUFFERS_metaType,
flags: PropertyFlags::MANDATORY,
value: pod::Value::Int(1 << SPA_META_SyncTimeline),
});
}
object
}
fn make_pod(buffer: &mut Vec<u8>, object: pod::Object) -> &Pod { fn make_pod(buffer: &mut Vec<u8>, object: pod::Object) -> &Pod {
PodSerializer::serialize(Cursor::new(&mut *buffer), &pod::Value::Object(object)).unwrap(); PodSerializer::serialize(Cursor::new(&mut *buffer), &pod::Value::Object(object)).unwrap();
Pod::from_bytes(buffer).unwrap() Pod::from_bytes(buffer).unwrap()
@@ -1111,3 +1327,238 @@ fn allocate_dmabuf(
.context("error exporting GBM buffer object as dmabuf")?; .context("error exporting GBM buffer object as dmabuf")?;
Ok(dmabuf) Ok(dmabuf)
} }
unsafe fn maybe_create_syncobj(
gbm: &GbmDevice<DrmDeviceFd>,
spa_buffer: *mut spa_buffer,
syncobjs: &mut HashMap<RawFd, syncobj::Handle>,
) -> anyhow::Result<()> {
unsafe {
let sync_timeline: *mut spa_meta_sync_timeline = spa_buffer_find_meta_data(
spa_buffer,
SPA_META_SyncTimeline,
mem::size_of::<spa_meta_sync_timeline>(),
)
.cast();
if sync_timeline.is_null() {
return Ok(());
}
let syncobj = gbm
.create_syncobj(false)
.context("error creating syncobj")?;
let fd = match gbm.syncobj_to_fd(syncobj, false) {
Ok(x) => x,
Err(err) => {
let _ = gbm.destroy_syncobj(syncobj);
return Err(err).context("error exporting syncobj to fd");
}
};
debug!("filling syncobj fd={fd:?}");
let n_datas = (*spa_buffer).n_datas as usize;
assert!(n_datas >= 2);
let acquire_data = (*spa_buffer).datas.add(n_datas - 2);
(*acquire_data).type_ = SPA_DATA_SyncObj;
(*acquire_data).flags = SPA_DATA_FLAG_READABLE;
(*acquire_data).fd = i64::from(fd.as_raw_fd());
let release_data = (*spa_buffer).datas.add(n_datas - 1);
(*release_data).type_ = SPA_DATA_SyncObj;
(*release_data).flags = SPA_DATA_FLAG_READABLE;
(*release_data).fd = i64::from(fd.as_raw_fd());
syncobjs.insert(fd.into_raw_fd(), syncobj);
Ok(())
}
}
unsafe fn maybe_remove_syncobj(
gbm: &GbmDevice<DrmDeviceFd>,
spa_buffer: *mut spa_buffer,
syncobjs: &mut HashMap<RawFd, syncobj::Handle>,
) {
unsafe {
let sync_timeline: *mut spa_meta_sync_timeline = spa_buffer_find_meta_data(
spa_buffer,
SPA_META_SyncTimeline,
mem::size_of::<spa_meta_sync_timeline>(),
)
.cast();
if sync_timeline.is_null() {
return;
}
let n_datas = (*spa_buffer).n_datas as usize;
assert!(n_datas >= 2);
let acquire_data = (*spa_buffer).datas.add(n_datas - 2);
let fd = (*acquire_data).fd as RawFd;
debug!("removing syncobj fd={fd:?}");
let Some(syncobj) = syncobjs.remove(&fd) else {
error!("missing syncobj in remove_buffer()");
return;
};
if let Err(err) = gbm.destroy_syncobj(syncobj) {
warn!("error destroying syncobj: {err:?}");
}
drop(OwnedFd::from_raw_fd(fd));
}
}
unsafe fn maybe_set_sync_points(
gbm: &GbmDevice<DrmDeviceFd>,
spa_buffer: *mut spa_buffer,
syncobjs: &mut HashMap<RawFd, syncobj::Handle>,
sync_point: &SyncPoint,
) -> anyhow::Result<()> {
unsafe {
let sync_timeline: *mut spa_meta_sync_timeline = spa_buffer_find_meta_data(
spa_buffer,
SPA_META_SyncTimeline,
mem::size_of::<spa_meta_sync_timeline>(),
)
.cast();
if sync_timeline.is_null() {
return Ok(());
}
// At this point, we must ensure that our syncobj contains a fence, since clients can do a
// blocking wait until the fence is available (OBS does this).
// TODO
let n_datas = (*spa_buffer).n_datas as usize;
assert!(n_datas >= 2);
let acquire_data = (*spa_buffer).datas.add(n_datas - 2);
let fd = (*acquire_data).fd as RawFd;
let Some(syncobj) = syncobjs.get(&fd) else {
error!("missing syncobj in maybe_set_sync_points()");
return Ok(());
};
let Some(sync_fd) = sync_point.export() else {
debug!("have sync_timeline but no sync_fd to export");
return Ok(());
};
let acquire_point = (*sync_timeline).release_point + 1;
// Import sync_fd into our syncobj at the correct point.
let tmp = gbm
.create_syncobj(false)
.context("error creating temp syncobj")?;
let res = drm_import_sync_file(gbm, tmp, sync_fd.as_fd())
.context("error importing sync_fd to temp syncobj");
let res = if res.is_ok() {
gbm.syncobj_timeline_transfer(tmp, *syncobj, 0, acquire_point)
.context("error transferring sync point")
} else {
res
};
let _ = gbm.destroy_syncobj(tmp);
let () = res?;
(*sync_timeline).acquire_point = acquire_point;
(*sync_timeline).release_point = acquire_point + 1;
debug!("set sync timeline fd={fd:?} to acquire={acquire_point}");
Ok(())
}
}
// Our own version until drm-ffi is fixed:
// https://github.com/Smithay/drm-rs/issues/224
unsafe fn drm_import_sync_file(
gbm: &GbmDevice<DrmDeviceFd>,
syncobj: syncobj::Handle,
sync_file: BorrowedFd,
) -> io::Result<()> {
use drm_ffi::drm_sys::*;
use rustix::ioctl::{self, ioctl, Opcode, Updater};
use smithay::reexports::rustix;
unsafe fn fd_to_handle(fd: BorrowedFd, data: &mut drm_syncobj_handle) -> io::Result<()> {
const OPCODE: Opcode =
ioctl::opcode::read_write::<drm_syncobj_handle>(DRM_IOCTL_BASE, 0xC2);
Ok(ioctl(fd, Updater::<OPCODE, drm_syncobj_handle>::new(data))?)
}
let mut args = drm_syncobj_handle {
handle: u32::from(syncobj),
flags: DRM_SYNCOBJ_FD_TO_HANDLE_FLAGS_IMPORT_SYNC_FILE,
fd: sync_file.as_raw_fd(),
pad: 0,
};
unsafe { fd_to_handle(gbm.as_fd(), &mut args) }
}
unsafe fn can_reuse_pw_buffer(
gbm: &GbmDevice<DrmDeviceFd>,
pw_buffer: NonNull<pw_buffer>,
syncobjs: &mut HashMap<RawFd, syncobj::Handle>,
) -> bool {
unsafe {
let spa_buffer = (*pw_buffer.as_ptr()).buffer;
let sync_timeline: *mut spa_meta_sync_timeline = spa_buffer_find_meta_data(
spa_buffer,
SPA_META_SyncTimeline,
mem::size_of::<spa_meta_sync_timeline>(),
)
.cast();
if sync_timeline.is_null() {
// No explicit sync, can always reuse.
return true;
}
let n_datas = (*spa_buffer).n_datas as usize;
assert!(n_datas >= 2);
let release_data = (*spa_buffer).datas.add(n_datas - 1);
let fd = (*release_data).fd as RawFd;
let Some(syncobj) = syncobjs.get(&fd) else {
error!("missing syncobj in can_reuse_pw_buffer()");
return false;
};
let mut points = [0];
if let Err(err) = gbm.syncobj_timeline_query(&[*syncobj], &mut points, false) {
warn!("error querying timeline signaled point: {err:?}");
return false;
}
// For fresh buffers, this will return 0 and the condition will work out to true.
let latest_signaled_point = points[0];
debug!(
"latest signaled point for fd={fd:?} is {latest_signaled_point}; release point is {}",
(*sync_timeline).release_point
);
latest_signaled_point >= (*sync_timeline).release_point
}
}
unsafe fn return_unused_buffer(stream: &Stream, pw_buffer: *mut pw_buffer) {
// pw_stream_return_buffer() requires too new PipeWire (1.4.0). So, mark as
// corrupted and queue.
let spa_buffer = (*pw_buffer).buffer;
let chunk = (*(*spa_buffer).datas).chunk;
(*chunk).size = 0;
(*chunk).flags = SPA_CHUNK_FLAG_CORRUPTED as i32;
pw_stream_queue_buffer(stream.as_raw_ptr(), pw_buffer);
}