Implement the event stream IPC

This commit is contained in:
Ivan Molodetskikh
2024-06-20 12:04:10 +03:00
parent 8eb34b2e18
commit 30b213601a
12 changed files with 827 additions and 104 deletions
+98
View File
@@ -9,6 +9,8 @@ use serde::{Deserialize, Serialize};
mod socket;
pub use socket::{Socket, SOCKET_PATH_ENV};
pub mod state;
/// Request from client to niri.
#[derive(Debug, Serialize, Deserialize, Clone)]
#[cfg_attr(feature = "json-schema", derive(schemars::JsonSchema))]
@@ -38,6 +40,11 @@ pub enum Request {
FocusedOutput,
/// Request information about the keyboard layout.
KeyboardLayouts,
/// Start continuously receiving events from the compositor.
///
/// The compositor should reply with `Reply::Ok(Response::Handled)`, then continuously send
/// [`Event`]s, one per line.
EventStream,
/// Respond with an error (for testing error handling).
ReturnError,
}
@@ -536,10 +543,18 @@ pub enum Transform {
#[derive(Serialize, Deserialize, Debug, Clone)]
#[cfg_attr(feature = "json-schema", derive(schemars::JsonSchema))]
pub struct Window {
/// Unique id of this window.
pub id: u64,
/// Title, if set.
pub title: Option<String>,
/// Application ID, if set.
pub app_id: Option<String>,
/// Id of the workspace this window is on, if any.
pub workspace_id: Option<u64>,
/// Whether this window is currently focused.
///
/// There can be either one focused window or zero (e.g. when a layer-shell surface has focus).
pub is_focused: bool,
}
/// Output configuration change result.
@@ -556,6 +571,10 @@ pub enum OutputConfigChanged {
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "json-schema", derive(schemars::JsonSchema))]
pub struct Workspace {
/// Unique id of this workspace.
///
/// This id remains constant regardless of the workspace moving around and across monitors.
pub id: u64,
/// Index of the workspace on its monitor.
///
/// This is the same index you can use for requests like `niri msg action focus-workspace`.
@@ -567,7 +586,15 @@ pub struct Workspace {
/// Can be `None` if no outputs are currently connected.
pub output: Option<String>,
/// Whether the workspace is currently active on its output.
///
/// Every output has one active workspace, the one that is currently visible on that output.
pub is_active: bool,
/// Whether the workspace is currently focused.
///
/// There's only one focused workspace across all outputs.
pub is_focused: bool,
/// Id of the active window on this workspace, if any.
pub active_window_id: Option<u64>,
}
/// Configured keyboard layouts.
@@ -580,6 +607,77 @@ pub struct KeyboardLayouts {
pub current_idx: u8,
}
/// A compositor event.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[cfg_attr(feature = "json-schema", derive(schemars::JsonSchema))]
pub enum Event {
/// The workspace configuration has changed.
WorkspacesChanged {
/// The new workspace configuration.
///
/// This configuration completely replaces the previous configuration. I.e. if any
/// workspaces are missing from here, then they were deleted.
workspaces: Vec<Workspace>,
},
/// A workspace was activated on an output.
///
/// This doesn't always mean the workspace became focused, just that it's now the active
/// workspace on its output. All other workspaces on the same output become inactive.
WorkspaceActivated {
/// Id of the newly active workspace.
id: u64,
/// Whether this workspace also became focused.
///
/// If `true`, this is now the single focused workspace. All other workspaces are no longer
/// focused, but they may remain active on their respective outputs.
focused: bool,
},
/// An active window changed on a workspace.
WorkspaceActiveWindowChanged {
/// Id of the workspace on which the active window changed.
workspace_id: u64,
/// Id of the new active window, if any.
active_window_id: Option<u64>,
},
/// The window configuration has changed.
WindowsChanged {
/// The new window configuration.
///
/// This configuration completely replaces the previous configuration. I.e. if any windows
/// are missing from here, then they were closed.
windows: Vec<Window>,
},
/// A new toplevel window was opened, or an existing toplevel window changed.
WindowOpenedOrChanged {
/// The new or updated window.
///
/// If the window is focused, all other windows are no longer focused.
window: Window,
},
/// A toplevel window was closed.
WindowClosed {
/// Id of the removed window.
id: u64,
},
/// Window focus changed.
///
/// All other windows are no longer focused.
WindowFocusChanged {
/// Id of the newly focused window, or `None` if no window is now focused.
id: Option<u64>,
},
/// The configured keyboard layouts have changed.
KeyboardLayoutsChanged {
/// The new keyboard layout configuration.
keyboard_layouts: KeyboardLayouts,
},
/// The keyboard layout switched.
KeyboardLayoutSwitched {
/// Index of the newly active layout.
idx: u8,
},
}
impl FromStr for WorkspaceReferenceArg {
type Err = &'static str;
+15 -3
View File
@@ -6,7 +6,7 @@ use std::net::Shutdown;
use std::os::unix::net::UnixStream;
use std::path::Path;
use crate::{Reply, Request};
use crate::{Event, Reply, Request};
/// Name of the environment variable containing the niri IPC socket path.
pub const SOCKET_PATH_ENV: &str = "NIRI_SOCKET";
@@ -47,7 +47,11 @@ impl Socket {
/// * `Ok(Ok(response))`: successful [`Response`](crate::Response) from niri
/// * `Ok(Err(message))`: error message from niri
/// * `Err(error)`: error communicating with niri
pub fn send(self, request: Request) -> io::Result<Reply> {
///
/// This method also returns a blocking function that you can call to keep reading [`Event`]s
/// after requesting an [`EventStream`][Request::EventStream]. This function is not useful
/// otherwise.
pub fn send(self, request: Request) -> io::Result<(Reply, impl FnMut() -> io::Result<Event>)> {
let Self { mut stream } = self;
let mut buf = serde_json::to_string(&request).unwrap();
@@ -60,6 +64,14 @@ impl Socket {
reader.read_line(&mut buf)?;
let reply = serde_json::from_str(&buf)?;
Ok(reply)
let events = move || {
buf.clear();
reader.read_line(&mut buf)?;
let event = serde_json::from_str(&buf)?;
Ok(event)
};
Ok((reply, events))
}
}
+188
View File
@@ -0,0 +1,188 @@
//! Helpers for keeping track of the event stream state.
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use crate::{Event, KeyboardLayouts, Window, Workspace};
/// Part of the state communicated via the event stream.
pub trait EventStreamStatePart {
/// Returns a sequence of events that replicates this state from default initialization.
fn replicate(&self) -> Vec<Event>;
/// Applies the event to this state.
///
/// Returns `None` after applying the event, and `Some(event)` if the event is ignored by this
/// part of the state.
fn apply(&mut self, event: Event) -> Option<Event>;
}
/// The full state communicated over the event stream.
///
/// Different parts of the state are not guaranteed to be consistent across every single event
/// sent by niri. For example, you may receive the first [`Event::WindowOpenedOrChanged`] for a
/// just-opened window *after* an [`Event::WorkspaceActiveWindowChanged`] for that window. Between
/// these two events, the workspace active window id refers to a window that does not yet exist in
/// the windows state part.
#[derive(Debug, Default)]
pub struct EventStreamState {
/// State of workspaces.
pub workspaces: WorkspacesState,
/// State of workspaces.
pub windows: WindowsState,
/// State of the keyboard layouts.
pub keyboard_layouts: KeyboardLayoutsState,
}
/// The workspaces state communicated over the event stream.
#[derive(Debug, Default)]
pub struct WorkspacesState {
/// Map from a workspace id to the workspace.
pub workspaces: HashMap<u64, Workspace>,
}
/// The windows state communicated over the event stream.
#[derive(Debug, Default)]
pub struct WindowsState {
/// Map from a window id to the window.
pub windows: HashMap<u64, Window>,
}
/// The keyboard layout state communicated over the event stream.
#[derive(Debug, Default)]
pub struct KeyboardLayoutsState {
/// Configured keyboard layouts.
pub keyboard_layouts: Option<KeyboardLayouts>,
}
impl EventStreamStatePart for EventStreamState {
fn replicate(&self) -> Vec<Event> {
let mut events = Vec::new();
events.extend(self.workspaces.replicate());
events.extend(self.windows.replicate());
events.extend(self.keyboard_layouts.replicate());
events
}
fn apply(&mut self, event: Event) -> Option<Event> {
let event = self.workspaces.apply(event)?;
let event = self.windows.apply(event)?;
let event = self.keyboard_layouts.apply(event)?;
Some(event)
}
}
impl EventStreamStatePart for WorkspacesState {
fn replicate(&self) -> Vec<Event> {
let workspaces = self.workspaces.values().cloned().collect();
vec![Event::WorkspacesChanged { workspaces }]
}
fn apply(&mut self, event: Event) -> Option<Event> {
match event {
Event::WorkspacesChanged { workspaces } => {
self.workspaces = workspaces.into_iter().map(|ws| (ws.id, ws)).collect();
}
Event::WorkspaceActivated { id, focused } => {
let ws = self.workspaces.get(&id);
let ws = ws.expect("activated workspace was missing from the map");
let output = ws.output.clone();
for ws in self.workspaces.values_mut() {
let got_activated = ws.id == id;
if ws.output == output {
ws.is_active = got_activated;
}
if focused {
ws.is_focused = got_activated;
}
}
}
Event::WorkspaceActiveWindowChanged {
workspace_id,
active_window_id,
} => {
let ws = self.workspaces.get_mut(&workspace_id);
let ws = ws.expect("changed workspace was missing from the map");
ws.active_window_id = active_window_id;
}
event => return Some(event),
}
None
}
}
impl EventStreamStatePart for WindowsState {
fn replicate(&self) -> Vec<Event> {
let windows = self.windows.values().cloned().collect();
vec![Event::WindowsChanged { windows }]
}
fn apply(&mut self, event: Event) -> Option<Event> {
match event {
Event::WindowsChanged { windows } => {
self.windows = windows.into_iter().map(|win| (win.id, win)).collect();
}
Event::WindowOpenedOrChanged { window } => {
let (id, is_focused) = match self.windows.entry(window.id) {
Entry::Occupied(mut entry) => {
let entry = entry.get_mut();
*entry = window;
(entry.id, entry.is_focused)
}
Entry::Vacant(entry) => {
let entry = entry.insert(window);
(entry.id, entry.is_focused)
}
};
if is_focused {
for win in self.windows.values_mut() {
if win.id != id {
win.is_focused = false;
}
}
}
}
Event::WindowClosed { id } => {
let win = self.windows.remove(&id);
win.expect("closed window was missing from the map");
}
Event::WindowFocusChanged { id } => {
for win in self.windows.values_mut() {
win.is_focused = Some(win.id) == id;
}
}
event => return Some(event),
}
None
}
}
impl EventStreamStatePart for KeyboardLayoutsState {
fn replicate(&self) -> Vec<Event> {
if let Some(keyboard_layouts) = self.keyboard_layouts.clone() {
vec![Event::KeyboardLayoutsChanged { keyboard_layouts }]
} else {
vec![]
}
}
fn apply(&mut self, event: Event) -> Option<Event> {
match event {
Event::KeyboardLayoutsChanged { keyboard_layouts } => {
self.keyboard_layouts = Some(keyboard_layouts);
}
Event::KeyboardLayoutSwitched { idx } => {
let kb = self.keyboard_layouts.as_mut();
let kb = kb.expect("keyboard layouts must be set before a layout can be switched");
kb.current_idx = idx;
}
event => return Some(event),
}
None
}
}
+2
View File
@@ -88,6 +88,8 @@ pub enum Msg {
},
/// Get the configured keyboard layouts.
KeyboardLayouts,
/// Start continuously receiving events from the compositor.
EventStream,
/// Print the version of the running niri instance.
Version,
/// Request an error from the running niri instance.
+11 -6
View File
@@ -16,7 +16,7 @@ use smithay::backend::input::{
TabletToolProximityEvent, TabletToolTipEvent, TabletToolTipState, TouchEvent,
};
use smithay::backend::libinput::LibinputInputBackend;
use smithay::input::keyboard::{keysyms, FilterResult, Keysym, ModifiersState};
use smithay::input::keyboard::{keysyms, FilterResult, Keysym, ModifiersState, XkbContextHandler};
use smithay::input::pointer::{
AxisFrame, ButtonEvent, CursorIcon, CursorImageStatus, Focus, GestureHoldBeginEvent,
GestureHoldEndEvent, GesturePinchBeginEvent, GesturePinchEndEvent, GesturePinchUpdateEvent,
@@ -539,13 +539,18 @@ impl State {
}
}
Action::SwitchLayout(action) => {
self.niri.seat.get_keyboard().unwrap().with_xkb_state(
self,
|mut state| match action {
let keyboard = &self.niri.seat.get_keyboard().unwrap();
let new_idx = keyboard.with_xkb_state(self, |mut state| {
match action {
LayoutSwitchTarget::Next => state.cycle_next_layout(),
LayoutSwitchTarget::Prev => state.cycle_prev_layout(),
},
);
};
state.active_layout().0
});
if let Some(server) = &self.niri.ipc_server {
server.keyboard_layout_switched(new_idx as u8);
}
}
Action::MoveColumnLeft => {
self.niri.layout.move_left();
+61 -3
View File
@@ -1,7 +1,7 @@
use anyhow::{anyhow, bail, Context};
use niri_ipc::{
KeyboardLayouts, LogicalOutput, Mode, Output, OutputConfigChanged, Request, Response, Socket,
Transform,
Event, KeyboardLayouts, LogicalOutput, Mode, Output, OutputConfigChanged, Request, Response,
Socket, Transform,
};
use serde_json::json;
@@ -21,12 +21,13 @@ pub fn handle_msg(msg: Msg, json: bool) -> anyhow::Result<()> {
},
Msg::Workspaces => Request::Workspaces,
Msg::KeyboardLayouts => Request::KeyboardLayouts,
Msg::EventStream => Request::EventStream,
Msg::RequestError => Request::ReturnError,
};
let socket = Socket::connect().context("error connecting to the niri socket")?;
let reply = socket
let (reply, mut read_event) = socket
.send(request)
.context("error communicating with niri")?;
@@ -37,6 +38,7 @@ pub fn handle_msg(msg: Msg, json: bool) -> anyhow::Result<()> {
Socket::connect()
.and_then(|socket| socket.send(Request::Version))
.ok()
.map(|(reply, _read_event)| reply)
}
_ => None,
};
@@ -261,6 +263,62 @@ pub fn handle_msg(msg: Msg, json: bool) -> anyhow::Result<()> {
println!("{is_active}{idx} {name}");
}
}
Msg::EventStream => {
let Response::Handled = response else {
bail!("unexpected response: expected Handled, got {response:?}");
};
if !json {
println!("Started reading events.");
}
loop {
let event = read_event().context("error reading event from niri")?;
if json {
let event = serde_json::to_string(&event).context("error formatting event")?;
println!("{event}");
continue;
}
match event {
Event::WorkspacesChanged { workspaces } => {
println!("Workspaces changed: {workspaces:?}");
}
Event::WorkspaceActivated { id, focused } => {
let word = if focused { "focused" } else { "activated" };
println!("Workspace {word}: {id}");
}
Event::WorkspaceActiveWindowChanged {
workspace_id,
active_window_id,
} => {
println!(
"Workspace {workspace_id}: \
active window changed to {active_window_id:?}"
);
}
Event::WindowsChanged { windows } => {
println!("Windows changed: {windows:?}");
}
Event::WindowOpenedOrChanged { window } => {
println!("Window opened or changed: {window:?}");
}
Event::WindowClosed { id } => {
println!("Window closed: {id}");
}
Event::WindowFocusChanged { id } => {
println!("Window focus changed: {id:?}");
}
Event::KeyboardLayoutsChanged { keyboard_layouts } => {
println!("Keyboard layouts changed: {keyboard_layouts:?}");
}
Event::KeyboardLayoutSwitched { idx } => {
println!("Keyboard layout switched: {idx}");
}
}
}
}
}
Ok(())
+371 -45
View File
@@ -1,15 +1,20 @@
use std::cell::RefCell;
use std::collections::HashSet;
use std::os::unix::net::{UnixListener, UnixStream};
use std::path::PathBuf;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
use std::{env, io, process};
use anyhow::Context;
use async_channel::{Receiver, Sender, TrySendError};
use calloop::futures::Scheduler;
use calloop::io::Async;
use directories::BaseDirs;
use futures_util::io::{AsyncReadExt, BufReader};
use futures_util::{AsyncBufReadExt, AsyncWriteExt};
use niri_ipc::{KeyboardLayouts, OutputConfigChanged, Reply, Request, Response};
use smithay::desktop::Window;
use futures_util::{select_biased, AsyncBufReadExt, AsyncWrite, AsyncWriteExt, FutureExt as _};
use niri_ipc::state::{EventStreamState, EventStreamStatePart as _};
use niri_ipc::{Event, KeyboardLayouts, OutputConfigChanged, Reply, Request, Response, Workspace};
use smithay::input::keyboard::XkbContextHandler;
use smithay::reexports::calloop::generic::Generic;
use smithay::reexports::calloop::{Interest, LoopHandle, Mode, PostAction};
@@ -18,17 +23,38 @@ use smithay::wayland::compositor::with_states;
use smithay::wayland::shell::xdg::XdgToplevelSurfaceData;
use crate::backend::IpcOutputMap;
use crate::layout::workspace::WorkspaceId;
use crate::niri::State;
use crate::utils::version;
use crate::window::Mapped;
// If an event stream client fails to read events fast enough that we accumulate more than this
// number in our buffer, we drop that event stream client.
const EVENT_STREAM_BUFFER_SIZE: usize = 64;
pub struct IpcServer {
pub socket_path: PathBuf,
event_streams: Rc<RefCell<Vec<EventStreamSender>>>,
event_stream_state: Rc<RefCell<EventStreamState>>,
}
struct ClientCtx {
event_loop: LoopHandle<'static, State>,
scheduler: Scheduler<()>,
ipc_outputs: Arc<Mutex<IpcOutputMap>>,
ipc_focused_window: Arc<Mutex<Option<Window>>>,
event_streams: Rc<RefCell<Vec<EventStreamSender>>>,
event_stream_state: Rc<RefCell<EventStreamState>>,
}
struct EventStreamClient {
events: Receiver<Event>,
disconnect: Receiver<()>,
write: Box<dyn AsyncWrite + Unpin>,
}
struct EventStreamSender {
events: Sender<Event>,
disconnect: Sender<()>,
}
impl IpcServer {
@@ -60,7 +86,43 @@ impl IpcServer {
})
.unwrap();
Ok(Self { socket_path })
Ok(Self {
socket_path,
event_streams: Rc::new(RefCell::new(Vec::new())),
event_stream_state: Rc::new(RefCell::new(EventStreamState::default())),
})
}
fn send_event(&self, event: Event) {
let mut streams = self.event_streams.borrow_mut();
let mut to_remove = Vec::new();
for (idx, stream) in streams.iter_mut().enumerate() {
match stream.events.try_send(event.clone()) {
Ok(()) => (),
Err(TrySendError::Closed(_)) => to_remove.push(idx),
Err(TrySendError::Full(_)) => {
warn!(
"disconnecting IPC event stream client \
because it is reading events too slowly"
);
to_remove.push(idx);
}
}
}
for idx in to_remove.into_iter().rev() {
let stream = streams.swap_remove(idx);
let _ = stream.disconnect.send_blocking(());
}
}
pub fn keyboard_layout_switched(&self, new_idx: u8) {
let mut state = self.event_stream_state.borrow_mut();
let state = &mut state.keyboard_layouts;
let event = Event::KeyboardLayoutSwitched { idx: new_idx };
state.apply(event.clone());
self.send_event(event);
}
}
@@ -90,10 +152,14 @@ fn on_new_ipc_client(state: &mut State, stream: UnixStream) {
}
};
let ipc_server = state.niri.ipc_server.as_ref().unwrap();
let ctx = ClientCtx {
event_loop: state.niri.event_loop.clone(),
scheduler: state.niri.scheduler.clone(),
ipc_outputs: state.backend.ipc_outputs(),
ipc_focused_window: state.niri.ipc_focused_window.clone(),
event_streams: ipc_server.event_streams.clone(),
event_stream_state: ipc_server.event_stream_state.clone(),
};
let future = async move {
@@ -106,7 +172,7 @@ fn on_new_ipc_client(state: &mut State, stream: UnixStream) {
}
}
async fn handle_client(ctx: ClientCtx, stream: Async<'_, UnixStream>) -> anyhow::Result<()> {
async fn handle_client(ctx: ClientCtx, stream: Async<'static, UnixStream>) -> anyhow::Result<()> {
let (read, mut write) = stream.split();
let mut buf = String::new();
@@ -120,6 +186,7 @@ async fn handle_client(ctx: ClientCtx, stream: Async<'_, UnixStream>) -> anyhow:
.context("error parsing request")
.map_err(|err| err.to_string());
let requested_error = matches!(request, Ok(Request::ReturnError));
let requested_event_stream = matches!(request, Ok(Request::EventStream));
let reply = match request {
Ok(request) => process(&ctx, request).await,
@@ -136,6 +203,46 @@ async fn handle_client(ctx: ClientCtx, stream: Async<'_, UnixStream>) -> anyhow:
buf.push(b'\n');
write.write_all(&buf).await.context("error writing reply")?;
if requested_event_stream {
let (events_tx, events_rx) = async_channel::bounded(EVENT_STREAM_BUFFER_SIZE);
let (disconnect_tx, disconnect_rx) = async_channel::bounded(1);
// Spawn a task for the client.
let client = EventStreamClient {
events: events_rx,
disconnect: disconnect_rx,
write: Box::new(write) as _,
};
let future = async move {
if let Err(err) = handle_event_stream_client(client).await {
warn!("error handling IPC event stream client: {err:?}");
}
};
if let Err(err) = ctx.scheduler.schedule(future) {
warn!("error scheduling IPC event stream future: {err:?}");
}
// Send the initial state.
{
let state = ctx.event_stream_state.borrow();
for event in state.replicate() {
events_tx
.try_send(event)
.expect("initial event burst had more events than buffer size");
}
}
// Add it to the list.
{
let mut streams = ctx.event_streams.borrow_mut();
let sender = EventStreamSender {
events: events_tx,
disconnect: disconnect_tx,
};
streams.push(sender);
}
}
Ok(())
}
@@ -149,23 +256,9 @@ async fn process(ctx: &ClientCtx, request: Request) -> Reply {
Response::Outputs(outputs.collect())
}
Request::FocusedWindow => {
let window = ctx.ipc_focused_window.lock().unwrap().clone();
let window = window.map(|window| {
let wl_surface = window.toplevel().expect("no X11 support").wl_surface();
with_states(wl_surface, |states| {
let role = states
.data_map
.get::<XdgToplevelSurfaceData>()
.unwrap()
.lock()
.unwrap();
niri_ipc::Window {
title: role.title.clone(),
app_id: role.app_id.clone(),
}
})
});
let state = ctx.event_stream_state.borrow();
let windows = &state.windows.windows;
let window = windows.values().find(|win| win.is_focused).cloned();
Response::FocusedWindow(window)
}
Request::Action(action) => {
@@ -202,13 +295,8 @@ async fn process(ctx: &ClientCtx, request: Request) -> Reply {
Response::OutputConfigChanged(response)
}
Request::Workspaces => {
let (tx, rx) = async_channel::bounded(1);
ctx.event_loop.insert_idle(move |state| {
let workspaces = state.niri.layout.ipc_workspaces();
let _ = tx.send_blocking(workspaces);
});
let result = rx.recv().await;
let workspaces = result.map_err(|_| String::from("error getting workspace info"))?;
let state = ctx.event_stream_state.borrow();
let workspaces = state.workspaces.workspaces.values().cloned().collect();
Response::Workspaces(workspaces)
}
Request::FocusedOutput => {
@@ -238,23 +326,261 @@ async fn process(ctx: &ClientCtx, request: Request) -> Reply {
Response::FocusedOutput(output)
}
Request::KeyboardLayouts => {
let (tx, rx) = async_channel::bounded(1);
ctx.event_loop.insert_idle(move |state| {
let keyboard = state.niri.seat.get_keyboard().unwrap();
let layout = keyboard.with_xkb_state(state, |context| {
let layouts = context.keymap().layouts();
KeyboardLayouts {
names: layouts.map(str::to_owned).collect(),
current_idx: context.active_layout().0 as u8,
}
});
let _ = tx.send_blocking(layout);
});
let result = rx.recv().await;
let layout = result.map_err(|_| String::from("error getting layout info"))?;
let state = ctx.event_stream_state.borrow();
let layout = state.keyboard_layouts.keyboard_layouts.clone();
let layout = layout.expect("keyboard layouts should be set at startup");
Response::KeyboardLayouts(layout)
}
Request::EventStream => Response::Handled,
};
Ok(response)
}
async fn handle_event_stream_client(client: EventStreamClient) -> anyhow::Result<()> {
let EventStreamClient {
events,
disconnect,
mut write,
} = client;
while let Ok(event) = events.recv().await {
let mut buf = serde_json::to_vec(&event).context("error formatting event")?;
buf.push(b'\n');
let res = select_biased! {
_ = disconnect.recv().fuse() => return Ok(()),
res = write.write_all(&buf).fuse() => res,
};
match res {
Ok(()) => (),
// Normal client disconnection.
Err(err) if err.kind() == io::ErrorKind::BrokenPipe => return Ok(()),
res @ Err(_) => res.context("error writing event")?,
}
}
Ok(())
}
fn make_ipc_window(mapped: &Mapped, workspace_id: Option<WorkspaceId>) -> niri_ipc::Window {
let wl_surface = mapped.toplevel().wl_surface();
with_states(wl_surface, |states| {
let role = states
.data_map
.get::<XdgToplevelSurfaceData>()
.unwrap()
.lock()
.unwrap();
niri_ipc::Window {
id: u64::from(mapped.id().get()),
title: role.title.clone(),
app_id: role.app_id.clone(),
workspace_id: workspace_id.map(|id| u64::from(id.0)),
is_focused: mapped.is_focused(),
}
})
}
impl State {
pub fn ipc_keyboard_layouts_changed(&mut self) {
let keyboard = self.niri.seat.get_keyboard().unwrap();
let keyboard_layouts = keyboard.with_xkb_state(self, |context| {
let layouts = context.keymap().layouts();
KeyboardLayouts {
names: layouts.map(str::to_owned).collect(),
current_idx: context.active_layout().0 as u8,
}
});
let Some(server) = &self.niri.ipc_server else {
return;
};
let mut state = server.event_stream_state.borrow_mut();
let state = &mut state.keyboard_layouts;
let event = Event::KeyboardLayoutsChanged { keyboard_layouts };
state.apply(event.clone());
server.send_event(event);
}
pub fn ipc_refresh_layout(&mut self) {
self.ipc_refresh_workspaces();
self.ipc_refresh_windows();
}
fn ipc_refresh_workspaces(&mut self) {
let Some(server) = &self.niri.ipc_server else {
return;
};
let _span = tracy_client::span!("State::ipc_refresh_workspaces");
let mut state = server.event_stream_state.borrow_mut();
let state = &mut state.workspaces;
let mut events = Vec::new();
let layout = &self.niri.layout;
let focused_ws_id = layout.active_workspace().map(|ws| u64::from(ws.id().0));
// Check for workspace changes.
let mut seen = HashSet::new();
let mut need_workspaces_changed = false;
for (mon, ws_idx, ws) in layout.workspaces() {
let id = u64::from(ws.id().0);
seen.insert(id);
let Some(ipc_ws) = state.workspaces.get(&id) else {
// A new workspace was added.
need_workspaces_changed = true;
break;
};
// Check for any changes that we can't signal as individual events.
let output_name = mon.map(|mon| mon.output_name());
if ipc_ws.idx != u8::try_from(ws_idx + 1).unwrap_or(u8::MAX)
|| ipc_ws.name != ws.name
|| ipc_ws.output.as_ref() != output_name
{
need_workspaces_changed = true;
break;
}
let active_window_id = ws.active_window().map(|win| u64::from(win.id().get()));
if ipc_ws.active_window_id != active_window_id {
events.push(Event::WorkspaceActiveWindowChanged {
workspace_id: id,
active_window_id,
});
}
// Check if this workspace became focused.
let is_focused = Some(id) == focused_ws_id;
if is_focused && !ipc_ws.is_focused {
events.push(Event::WorkspaceActivated { id, focused: true });
continue;
}
// Check if this workspace became active.
let is_active = mon.map_or(false, |mon| mon.active_workspace_idx == ws_idx);
if is_active && !ipc_ws.is_active {
events.push(Event::WorkspaceActivated { id, focused: false });
}
}
// Check if any workspaces were removed.
if !need_workspaces_changed && state.workspaces.keys().any(|id| !seen.contains(id)) {
need_workspaces_changed = true;
}
if need_workspaces_changed {
events.clear();
let workspaces = layout
.workspaces()
.map(|(mon, ws_idx, ws)| {
let id = u64::from(ws.id().0);
Workspace {
id,
idx: u8::try_from(ws_idx + 1).unwrap_or(u8::MAX),
name: ws.name.clone(),
output: mon.map(|mon| mon.output_name().clone()),
is_active: mon.map_or(false, |mon| mon.active_workspace_idx == ws_idx),
is_focused: Some(id) == focused_ws_id,
active_window_id: ws.active_window().map(|win| u64::from(win.id().get())),
}
})
.collect();
events.push(Event::WorkspacesChanged { workspaces });
}
for event in events {
state.apply(event.clone());
server.send_event(event);
}
}
fn ipc_refresh_windows(&mut self) {
let Some(server) = &self.niri.ipc_server else {
return;
};
let _span = tracy_client::span!("State::ipc_refresh_windows");
let mut state = server.event_stream_state.borrow_mut();
let state = &mut state.windows;
let mut events = Vec::new();
let layout = &self.niri.layout;
// Check for window changes.
let mut seen = HashSet::new();
let mut focused_id = None;
layout.with_windows(|mapped, _, ws_id| {
let id = u64::from(mapped.id().get());
seen.insert(id);
if mapped.is_focused() {
focused_id = Some(id);
}
let Some(ipc_win) = state.windows.get(&id) else {
let window = make_ipc_window(mapped, Some(ws_id));
events.push(Event::WindowOpenedOrChanged { window });
return;
};
let workspace_id = Some(u64::from(ws_id.0));
let mut changed = ipc_win.workspace_id != workspace_id;
let wl_surface = mapped.toplevel().wl_surface();
changed |= with_states(wl_surface, |states| {
let role = states
.data_map
.get::<XdgToplevelSurfaceData>()
.unwrap()
.lock()
.unwrap();
ipc_win.title != role.title || ipc_win.app_id != role.app_id
});
if changed {
let window = make_ipc_window(mapped, Some(ws_id));
events.push(Event::WindowOpenedOrChanged { window });
return;
}
if mapped.is_focused() && !ipc_win.is_focused {
events.push(Event::WindowFocusChanged { id: Some(id) });
}
});
// Check for closed windows.
let mut ipc_focused_id = None;
for (id, ipc_win) in &state.windows {
if !seen.contains(id) {
events.push(Event::WindowClosed { id: *id });
}
if ipc_win.is_focused {
ipc_focused_id = Some(id);
}
}
// Extra check for focus becoming None, since the checks above only work for focus becoming
// a different window.
if focused_id.is_none() && ipc_focused_id.is_some() {
events.push(Event::WindowFocusChanged { id: None });
}
for event in events {
state.apply(event.clone());
server.send_event(event);
}
}
}
+32 -32
View File
@@ -42,6 +42,7 @@ use smithay::backend::renderer::gles::{GlesRenderer, GlesTexture};
use smithay::output::{self, Output};
use smithay::reexports::wayland_server::protocol::wl_surface::WlSurface;
use smithay::utils::{Logical, Point, Scale, Serial, Size, Transform};
use workspace::WorkspaceId;
pub use self::monitor::MonitorRenderElement;
use self::monitor::{Monitor, WorkspaceSwitch};
@@ -1094,13 +1095,13 @@ impl<W: LayoutElement> Layout<W> {
mon.workspaces.iter().flat_map(|ws| ws.windows())
}
pub fn with_windows(&self, mut f: impl FnMut(&W, Option<&Output>)) {
pub fn with_windows(&self, mut f: impl FnMut(&W, Option<&Output>, WorkspaceId)) {
match &self.monitor_set {
MonitorSet::Normal { monitors, .. } => {
for mon in monitors {
for ws in &mon.workspaces {
for win in ws.windows() {
f(win, Some(&mon.output));
f(win, Some(&mon.output), ws.id());
}
}
}
@@ -1108,7 +1109,7 @@ impl<W: LayoutElement> Layout<W> {
MonitorSet::NoOutputs { workspaces } => {
for ws in workspaces {
for win in ws.windows() {
f(win, None);
f(win, None, ws.id());
}
}
}
@@ -2484,39 +2485,38 @@ impl<W: LayoutElement> Layout<W> {
}
}
pub fn ipc_workspaces(&self) -> Vec<niri_ipc::Workspace> {
pub fn workspaces(
&self,
) -> impl Iterator<Item = (Option<&Monitor<W>>, usize, &Workspace<W>)> + '_ {
let iter_normal;
let iter_no_outputs;
match &self.monitor_set {
MonitorSet::Normal {
monitors,
primary_idx: _,
active_monitor_idx: _,
} => {
let mut workspaces = Vec::new();
MonitorSet::Normal { monitors, .. } => {
let it = monitors.iter().flat_map(|mon| {
mon.workspaces
.iter()
.enumerate()
.map(move |(idx, ws)| (Some(mon), idx, ws))
});
for monitor in monitors {
for (idx, workspace) in monitor.workspaces.iter().enumerate() {
workspaces.push(niri_ipc::Workspace {
idx: u8::try_from(idx + 1).unwrap_or(u8::MAX),
name: workspace.name.clone(),
output: Some(monitor.output.name()),
is_active: monitor.active_workspace_idx == idx,
})
}
}
workspaces
iter_normal = Some(it);
iter_no_outputs = None;
}
MonitorSet::NoOutputs { workspaces } => {
let it = workspaces
.iter()
.enumerate()
.map(|(idx, ws)| (None, idx, ws));
iter_normal = None;
iter_no_outputs = Some(it);
}
MonitorSet::NoOutputs { workspaces } => workspaces
.iter()
.enumerate()
.map(|(idx, ws)| niri_ipc::Workspace {
idx: u8::try_from(idx + 1).unwrap_or(u8::MAX),
name: ws.name.clone(),
output: None,
is_active: false,
})
.collect(),
}
let iter_normal = iter_normal.into_iter().flatten();
let iter_no_outputs = iter_no_outputs.into_iter().flatten();
iter_normal.chain(iter_no_outputs)
}
}
+10 -1
View File
@@ -123,7 +123,7 @@ pub struct OutputId(String);
static WORKSPACE_ID_COUNTER: IdCounter = IdCounter::new();
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct WorkspaceId(u32);
pub struct WorkspaceId(pub u32);
impl WorkspaceId {
fn next() -> WorkspaceId {
@@ -528,6 +528,15 @@ impl<W: LayoutElement> Workspace<W> {
self.output.as_ref()
}
pub fn active_window(&self) -> Option<&W> {
if self.columns.is_empty() {
return None;
}
let col = &self.columns[self.active_column_idx];
Some(col.tiles[col.active_tile_idx].window())
}
pub fn set_output(&mut self, output: Option<Output>) {
if self.output == output {
return;
+18 -13
View File
@@ -291,7 +291,6 @@ pub struct Niri {
pub ipc_server: Option<IpcServer>,
pub ipc_outputs_changed: bool,
pub ipc_focused_window: Arc<Mutex<Option<Window>>>,
// Casts are dropped before PipeWire to prevent a double-free (yay).
pub casts: Vec<Cast>,
@@ -502,7 +501,12 @@ impl State {
let mut niri = Niri::new(config.clone(), event_loop, stop_signal, display, &backend);
backend.init(&mut niri);
Ok(Self { backend, niri })
let mut state = Self { backend, niri };
// Initialize some IPC server state.
state.ipc_keyboard_layouts_changed();
Ok(state)
}
pub fn refresh_and_flush_clients(&mut self) {
@@ -538,6 +542,7 @@ impl State {
foreign_toplevel::refresh(self);
self.niri.refresh_window_rules();
self.refresh_ipc_outputs();
self.ipc_refresh_layout();
#[cfg(feature = "xdp-gnome-screencast")]
self.niri.refresh_mapped_cast_outputs();
@@ -839,8 +844,6 @@ impl State {
focus
);
let mut newly_focused_window = None;
// Tell the windows their new focus state for window rule purposes.
if let KeyboardFocus::Layout {
surface: Some(surface),
@@ -856,12 +859,9 @@ impl State {
{
if let Some((mapped, _)) = self.niri.layout.find_window_and_output_mut(surface) {
mapped.set_is_focused(true);
newly_focused_window = Some(mapped.window.clone());
}
}
*self.niri.ipc_focused_window.lock().unwrap() = newly_focused_window;
if let Some(grab) = self.niri.popup_grab.as_mut() {
if Some(&grab.root) != focus.surface() {
trace!(
@@ -911,6 +911,10 @@ impl State {
keyboard.with_xkb_state(self, |mut context| {
context.set_layout(new_layout);
});
if let Some(server) = &self.niri.ipc_server {
server.keyboard_layout_switched(new_layout.0 as u8);
}
}
}
@@ -1076,6 +1080,8 @@ impl State {
if let Err(err) = keyboard.set_xkb_config(self, xkb.to_xkb_config()) {
warn!("error updating xkb config: {err:?}");
}
self.ipc_keyboard_layouts_changed();
}
if libinput_config_changed {
@@ -1372,7 +1378,7 @@ impl State {
}
StreamTargetId::Window { id } => {
let mut window = None;
self.niri.layout.with_windows(|mapped, _| {
self.niri.layout.with_windows(|mapped, _, _| {
if u64::from(mapped.id().get()) != id {
return;
}
@@ -1489,7 +1495,7 @@ impl State {
let mut windows = HashMap::new();
self.niri.layout.with_windows(|mapped, _| {
self.niri.layout.with_windows(|mapped, _, _| {
let wl_surface = mapped
.window
.toplevel()
@@ -1843,7 +1849,6 @@ impl Niri {
ipc_server,
ipc_outputs_changed: false,
ipc_focused_window: Arc::new(Mutex::new(None)),
pipewire,
casts: vec![],
@@ -2811,7 +2816,7 @@ impl Niri {
let mut seen = HashSet::new();
let mut output_changed = vec![];
self.layout.with_windows(|mapped, output| {
self.layout.with_windows(|mapped, output, _| {
seen.insert(mapped.window.clone());
let Some(output) = output else {
@@ -3510,7 +3515,7 @@ impl Niri {
let frame_callback_time = get_monotonic_time();
self.layout.with_windows(|mapped, _| {
self.layout.with_windows(|mapped, _, _| {
mapped.window.send_frame(
output,
frame_callback_time,
@@ -3753,7 +3758,7 @@ impl Niri {
let _span = tracy_client::span!("Niri::render_window_for_screen_cast");
let mut window = None;
self.layout.with_windows(|mapped, _| {
self.layout.with_windows(|mapped, _, _| {
if u64::from(mapped.id().get()) != window_id {
return;
}
+1 -1
View File
@@ -95,7 +95,7 @@ pub fn refresh(state: &mut State) {
// Save the focused window for last, this way when the focus changes, we will first deactivate
// the previous window and only then activate the newly focused window.
let mut focused = None;
state.niri.layout.with_windows(|mapped, output| {
state.niri.layout.with_windows(|mapped, output, _| {
let wl_surface = mapped.toplevel().wl_surface();
with_states(wl_surface, |states| {
+20
View File
@@ -11,6 +11,26 @@ The communication over the IPC socket happens in JSON.
> If you're getting parsing errors from `niri msg` after upgrading niri, make sure that you've restarted niri itself.
> You might be trying to run a newer `niri msg` against an older `niri` compositor.
### Event Stream
<sup>Since: 0.1.9</sup>
While most niri IPC requests return a single response, the event stream request will make niri continuously stream events into the IPC connection until it is closed.
This is useful for implementing various bars and indicators that update as soon as something happens, without continuous polling.
The event stream IPC is designed to give you the complete current state up-front, then follow up with updates to that state.
This way, your state can never "desync" from niri, and you don't need to make any other IPC information requests.
Where reasonable, event stream state updates are atomic, though this is not always the case.
For example, a window may end up with a workspace id for a workspace that had already been removed.
This can happen if the corresponding workspaces-changed event arrives before the corresponding window-changed event.
To get a taste of the events, run `niri msg event-stream`.
Though, this is more of a debug function than anything.
You can get raw events from `niri msg --json event-stream`, or by connecting to the niri socket and requesting an event stream manually.
You can find the full list of events along with documentation in the [niri-ipc sub-crate](./niri-ipc/).
### Backwards Compatibility
The JSON output *should* remain stable, as in: