pw_utils: Extract shared state to CastInner

This commit is contained in:
Ivan Molodetskikh
2025-08-02 13:29:08 +03:00
parent 53b7c08363
commit 2e3935d77d
2 changed files with 76 additions and 54 deletions

View File

@@ -5002,7 +5002,7 @@ impl Niri {
let mut casts = mem::take(&mut self.casts);
for cast in &mut casts {
if !cast.is_active.get() {
if !cast.is_active() {
continue;
}
@@ -5058,7 +5058,7 @@ impl Niri {
let mut casts = mem::take(&mut self.casts);
for cast in &mut casts {
if !cast.is_active.get() {
if !cast.is_active() {
continue;
}

View File

@@ -1,4 +1,4 @@
use std::cell::{Cell, RefCell};
use std::cell::RefCell;
use std::collections::HashMap;
use std::io::Cursor;
use std::iter::zip;
@@ -69,18 +69,25 @@ pub struct Cast {
pub stream_id: usize,
pub stream: Stream,
_listener: StreamListener<()>,
pub is_active: Rc<Cell<bool>>,
pub target: CastTarget,
pub dynamic_target: bool,
formats: FormatSet,
state: Rc<RefCell<CastState>>,
refresh: Rc<Cell<u32>>,
offer_alpha: bool,
pub cursor_mode: CursorMode,
pub last_frame_time: Duration,
min_time_between_frames: Rc<Cell<Duration>>,
dmabufs: Rc<RefCell<HashMap<i64, Dmabuf>>>,
scheduled_redraw: Option<RegistrationToken>,
inner: Rc<RefCell<CastInner>>,
}
/// Mutable `Cast` state shared with PipeWire callbacks.
#[derive(Debug)]
struct CastInner {
is_active: bool,
node_id: Option<u32>,
state: CastState,
refresh: u32,
min_time_between_frames: Duration,
dmabufs: HashMap<i64, Dmabuf>,
}
#[allow(clippy::large_enum_variant)]
@@ -214,29 +221,32 @@ impl PipeWire {
let stream = Stream::new(&self.core, "niri-screen-cast-src", Properties::new())
.context("error creating Stream")?;
// Like in good old wayland-rs times...
let node_id = Rc::new(Cell::new(None));
let is_active = Rc::new(Cell::new(false));
let min_time_between_frames = Rc::new(Cell::new(Duration::ZERO));
let dmabufs = Rc::new(RefCell::new(HashMap::new()));
let refresh = Rc::new(Cell::new(refresh));
let pending_size = Size::from((size.w as u32, size.h as u32));
let state = Rc::new(RefCell::new(CastState::ResizePending { pending_size }));
// Like in good old wayland-rs times...
let inner = Rc::new(RefCell::new(CastInner {
is_active: false,
node_id: None,
state: CastState::ResizePending { pending_size },
refresh,
min_time_between_frames: Duration::ZERO,
dmabufs: HashMap::new(),
}));
let listener = stream
.add_local_listener_with_user_data(())
.state_changed({
let is_active = is_active.clone();
let inner = inner.clone();
let stop_cast = stop_cast.clone();
move |stream, (), old, new| {
debug!("pw stream: state changed: {old:?} -> {new:?}");
let mut inner = inner.borrow_mut();
match new {
StreamState::Paused => {
if node_id.get().is_none() {
if inner.node_id.is_none() {
let id = stream.node_id();
node_id.set(Some(id));
inner.node_id = Some(id);
debug!("pw stream: sending signal with {id}");
let _span = tracy_client::span!("sending PipeWireStreamAdded");
@@ -254,33 +264,33 @@ impl PipeWire {
});
}
is_active.set(false);
inner.is_active = false;
}
StreamState::Error(_) => {
if is_active.get() {
is_active.set(false);
if inner.is_active {
inner.is_active = false;
stop_cast();
}
}
StreamState::Unconnected => (),
StreamState::Connecting => (),
StreamState::Streaming => {
is_active.set(true);
inner.is_active = true;
redraw();
}
}
}
})
.param_changed({
let min_time_between_frames = min_time_between_frames.clone();
let inner = inner.clone();
let stop_cast = stop_cast.clone();
let state = state.clone();
let gbm = gbm.clone();
let formats = formats.clone();
let refresh = refresh.clone();
move |stream, (), id, pod| {
let id = ParamType::from_raw(id);
trace!(?id, "pw stream: param_changed");
let mut inner = inner.borrow_mut();
let inner = &mut *inner;
if id != ParamType::Format {
return;
@@ -306,7 +316,7 @@ impl PipeWire {
let format_size = Size::from((format.size().width, format.size().height));
let mut state = state.borrow_mut();
let state = &mut inner.state;
if format_size != state.expected_format_size() {
if !matches!(&*state, CastState::ResizePending { .. }) {
warn!("pw stream: wrong size, but we're not resizing");
@@ -329,7 +339,7 @@ impl PipeWire {
let min_frame_time = Duration::from_micros(
1_000_000 * u64::from(max_frame_rate.denom) / u64::from(max_frame_rate.num),
);
min_time_between_frames.set(min_frame_time);
inner.min_time_between_frames = min_frame_time;
let object = pod.as_object().unwrap();
let Some(prop_modifier) =
@@ -396,7 +406,7 @@ impl PipeWire {
let o1 = make_video_params(
&fixated_format,
format_size,
refresh.get(),
inner.refresh,
format_has_alpha,
);
let pod1 = make_pod(&mut b1, o1);
@@ -404,7 +414,7 @@ impl PipeWire {
let o2 = make_video_params(
&formats,
format_size,
refresh.get(),
inner.refresh,
format_has_alpha,
);
let mut params = [pod1, make_pod(&mut b2, o2)];
@@ -552,16 +562,17 @@ impl PipeWire {
}
})
.add_buffer({
let dmabufs = dmabufs.clone();
let inner = inner.clone();
let stop_cast = stop_cast.clone();
let state = state.clone();
move |stream, (), buffer| {
let mut inner = inner.borrow_mut();
let (size, alpha, modifier) = if let CastState::Ready {
size,
alpha,
modifier,
..
} = &*state.borrow()
} = &inner.state
{
(*size, *alpha, *modifier)
} else {
@@ -610,20 +621,21 @@ impl PipeWire {
}
let fd = (*(*spa_buffer).datas).fd;
assert!(dmabufs.borrow_mut().insert(fd, dmabuf).is_none());
assert!(inner.dmabufs.insert(fd, dmabuf).is_none());
}
// During size re-negotiation, the stream sometimes just keeps running, in
// which case we may need to force a redraw once we got a newly sized buffer.
if dmabufs.borrow().len() == 1 && stream.state() == StreamState::Streaming {
if inner.dmabufs.len() == 1 && stream.state() == StreamState::Streaming {
redraw_();
}
}
})
.remove_buffer({
let dmabufs = dmabufs.clone();
let inner = inner.clone();
move |_stream, (), buffer| {
trace!("pw stream: remove_buffer");
let mut inner = inner.borrow_mut();
unsafe {
let spa_buffer = (*buffer).buffer;
@@ -631,7 +643,7 @@ impl PipeWire {
assert!((*spa_buffer).n_datas > 0);
let fd = (*spa_data).fd;
dmabufs.borrow_mut().remove(&fd);
inner.dmabufs.remove(&fd);
}
}
})
@@ -641,7 +653,7 @@ impl PipeWire {
trace!("starting pw stream with size={pending_size:?}, refresh={refresh:?}");
let params;
make_params!(params, &formats, pending_size, refresh.get(), alpha);
make_params!(params, &formats, pending_size, refresh, alpha);
stream
.connect(
Direction::Output,
@@ -656,29 +668,31 @@ impl PipeWire {
stream_id,
stream,
_listener: listener,
is_active,
target,
dynamic_target,
formats,
state,
refresh,
offer_alpha: alpha,
cursor_mode,
last_frame_time: Duration::ZERO,
min_time_between_frames,
dmabufs,
scheduled_redraw: None,
inner,
};
Ok(cast)
}
}
impl Cast {
pub fn is_active(&self) -> bool {
self.inner.borrow().is_active
}
pub fn ensure_size(&self, size: Size<i32, Physical>) -> anyhow::Result<CastSizeChange> {
let mut inner = self.inner.borrow_mut();
let new_size = Size::from((size.w as u32, size.h as u32));
let mut state = self.state.borrow_mut();
if matches!(&*state, CastState::Ready { size, .. } if *size == new_size) {
let state = &mut inner.state;
if matches!(state, CastState::Ready { size, .. } if *size == new_size) {
return Ok(CastSizeChange::Ready);
}
@@ -699,7 +713,7 @@ impl Cast {
params,
&self.formats,
new_size,
self.refresh.get(),
inner.refresh,
self.offer_alpha
);
self.stream
@@ -710,15 +724,17 @@ impl Cast {
}
pub fn set_refresh(&mut self, refresh: u32) -> anyhow::Result<()> {
if self.refresh.get() == refresh {
let mut inner = self.inner.borrow_mut();
if inner.refresh == refresh {
return Ok(());
}
let _span = tracy_client::span!("Cast::set_refresh");
debug!("cast FPS changed, updating stream FPS");
self.refresh.set(refresh);
inner.refresh = refresh;
let size = self.state.borrow().expected_format_size();
let size = inner.state.expected_format_size();
let params;
make_params!(params, &self.formats, size, refresh, self.offer_alpha);
self.stream
@@ -729,8 +745,10 @@ impl Cast {
}
fn compute_extra_delay(&self, target_frame_time: Duration) -> Duration {
let inner = self.inner.borrow();
let last = self.last_frame_time;
let min = self.min_time_between_frames.get();
let min = inner.min_time_between_frames;
if last.is_zero() {
trace!(?target_frame_time, ?last, "last is zero, recording");
@@ -828,7 +846,9 @@ impl Cast {
scale: Scale<f64>,
wait_for_sync: bool,
) -> bool {
let CastState::Ready { damage_tracker, .. } = &mut *self.state.borrow_mut() else {
let mut inner = self.inner.borrow_mut();
let CastState::Ready { damage_tracker, .. } = &mut inner.state else {
error!("cast must be in Ready state to render");
return false;
};
@@ -855,7 +875,7 @@ impl Cast {
};
let fd = buffer.datas_mut()[0].as_raw().fd;
let dmabuf = &self.dmabufs.borrow()[&fd];
let dmabuf = &inner.dmabufs[&fd];
match render_to_dmabuf(
renderer,
@@ -909,8 +929,10 @@ impl Cast {
renderer: &mut GlesRenderer,
wait_for_sync: bool,
) -> bool {
let mut inner = self.inner.borrow_mut();
// Clear out the damage tracker if we're in Ready state.
if let CastState::Ready { damage_tracker, .. } = &mut *self.state.borrow_mut() {
if let CastState::Ready { damage_tracker, .. } = &mut inner.state {
*damage_tracker = None;
};
@@ -920,7 +942,7 @@ impl Cast {
};
let fd = buffer.datas_mut()[0].as_raw().fd;
let dmabuf = &self.dmabufs.borrow()[&fd];
let dmabuf = &inner.dmabufs[&fd];
match clear_dmabuf(renderer, dmabuf.clone()) {
Ok(sync_point) => {