// Source: niri/src/ipc/server.rs (2024-11-02 10:53:55 +03:00; 589 lines, 20 KiB, Rust)

use std::cell::RefCell;
use std::collections::HashSet;
use std::os::unix::net::{UnixListener, UnixStream};
use std::path::PathBuf;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
use std::{env, io, process};
use anyhow::Context;
use async_channel::{Receiver, Sender, TrySendError};
use calloop::futures::Scheduler;
use calloop::io::Async;
use directories::BaseDirs;
use futures_util::io::{AsyncReadExt, BufReader};
use futures_util::{select_biased, AsyncBufReadExt, AsyncWrite, AsyncWriteExt, FutureExt as _};
use niri_config::OutputName;
use niri_ipc::state::{EventStreamState, EventStreamStatePart as _};
use niri_ipc::{Event, KeyboardLayouts, OutputConfigChanged, Reply, Request, Response, Workspace};
use smithay::reexports::calloop::generic::Generic;
use smithay::reexports::calloop::{Interest, LoopHandle, Mode, PostAction};
use smithay::reexports::rustix::fs::unlink;
use crate::backend::IpcOutputMap;
use crate::layout::workspace::WorkspaceId;
use crate::niri::State;
use crate::utils::{version, with_toplevel_role};
use crate::window::Mapped;
// Capacity of the per-client event channel created for each event stream client.
//
// If an event stream client fails to read events fast enough that we accumulate more than this
// number in our buffer, we drop that event stream client. The initial state burst sent to a new
// event stream client must also fit into this buffer.
const EVENT_STREAM_BUFFER_SIZE: usize = 64;
/// Server side of the IPC socket.
///
/// Holds the path of the listening socket together with the state that is shared with the
/// per-client tasks.
pub struct IpcServer {
/// Filesystem path of the listening Unix socket.
pub socket_path: PathBuf,
/// Sending halves for every currently registered event stream client.
event_streams: Rc<RefCell<Vec<EventStreamSender>>>,
/// State that events are applied to before being broadcast; it is replicated to new event stream
/// clients and read to answer state queries.
event_stream_state: Rc<RefCell<EventStreamState>>,
}
/// Handles that a single client task needs in order to process its request.
struct ClientCtx {
/// Event loop handle, used to queue idle callbacks that run with access to `State`.
event_loop: LoopHandle<'static, State>,
/// Scheduler used to spawn the follow-up task for event stream clients.
scheduler: Scheduler<()>,
/// Output information shared by the backend.
ipc_outputs: Arc<Mutex<IpcOutputMap>>,
/// List of registered event stream clients, shared with `IpcServer`.
event_streams: Rc<RefCell<Vec<EventStreamSender>>>,
/// Event stream state, shared with `IpcServer`.
event_stream_state: Rc<RefCell<EventStreamState>>,
}
/// Client-task side of an event stream connection.
struct EventStreamClient {
/// Events queued for this client, to be written to its socket.
events: Receiver<Event>,
/// Signal that the server has dropped this client and the task should stop.
disconnect: Receiver<()>,
/// Write half of the client's socket.
write: Box<dyn AsyncWrite + Unpin>,
}
/// Server side of an event stream connection; the counterpart of `EventStreamClient`.
struct EventStreamSender {
/// Channel used to queue events for the client.
events: Sender<Event>,
/// Channel used to tell the client task to stop when the client is dropped.
disconnect: Sender<()>,
}
impl IpcServer {
    /// Binds the IPC socket and registers it with `event_loop` so that incoming clients are
    /// accepted and handed to `on_new_ipc_client`.
    ///
    /// The socket lives in the directory returned by `socket_dir` and its file name contains
    /// `wayland_socket_name` and the id of the current process.
    ///
    /// # Errors
    ///
    /// Returns an error if the socket cannot be bound or cannot be made non-blocking.
    ///
    /// # Panics
    ///
    /// Panics if the listener cannot be inserted into the event loop.
    pub fn start(
        event_loop: &LoopHandle<'static, State>,
        wayland_socket_name: &str,
    ) -> anyhow::Result<Self> {
        let _span = tracy_client::span!("Ipc::start");

        let socket_name = format!("niri.{wayland_socket_name}.{}.sock", process::id());
        let socket_path = socket_dir().join(socket_name);

        let listener = UnixListener::bind(&socket_path).context("error binding socket")?;
        listener
            .set_nonblocking(true)
            .context("error setting socket to non-blocking")?;

        let accept_source = Generic::new(listener, Interest::READ, Mode::Level);
        event_loop
            .insert_source(accept_source, |_, socket, state| {
                let (stream, _addr) = match socket.accept() {
                    Ok(connection) => connection,
                    // Nothing to accept right now; keep the source registered.
                    Err(err) if err.kind() == io::ErrorKind::WouldBlock => {
                        return Ok(PostAction::Continue);
                    }
                    Err(err) => return Err(err),
                };

                on_new_ipc_client(state, stream);
                Ok(PostAction::Continue)
            })
            .unwrap();

        let event_streams = Rc::new(RefCell::new(Vec::new()));
        let event_stream_state = Rc::new(RefCell::new(EventStreamState::default()));

        Ok(Self {
            socket_path,
            event_streams,
            event_stream_state,
        })
    }

    /// Queues `event` for every registered event stream client.
    ///
    /// Clients whose channel is closed, or whose buffer is already full, are removed from the
    /// list and sent a disconnect signal.
    fn send_event(&self, event: Event) {
        let mut streams = self.event_streams.borrow_mut();

        // First pass: try to queue the event everywhere, remembering which clients to drop.
        let stale: Vec<usize> = streams
            .iter()
            .enumerate()
            .filter_map(|(idx, stream)| match stream.events.try_send(event.clone()) {
                Ok(()) => None,
                Err(TrySendError::Closed(_)) => Some(idx),
                Err(TrySendError::Full(_)) => {
                    warn!(
                        "disconnecting IPC event stream client \
                         because it is reading events too slowly"
                    );
                    Some(idx)
                }
            })
            .collect();

        // Second pass: remove from the highest index down, so that `swap_remove` only ever moves
        // elements which are staying in the list.
        for idx in stale.into_iter().rev() {
            let dropped = streams.swap_remove(idx);
            let _ = dropped.disconnect.send_blocking(());
        }
    }
}
impl Drop for IpcServer {
    /// Removes the socket file when the server goes away.
    fn drop(&mut self) {
        // Best effort: a failure to unlink is deliberately ignored.
        let _ = unlink(self.socket_path.as_path());
    }
}
/// Returns the directory in which the IPC socket is created.
///
/// This is the runtime directory reported by `BaseDirs` when there is one, and the system
/// temporary directory otherwise.
fn socket_dir() -> PathBuf {
    let base_dirs = BaseDirs::new();
    match base_dirs.as_ref().and_then(|dirs| dirs.runtime_dir()) {
        Some(runtime_dir) => runtime_dir.to_owned(),
        None => env::temp_dir(),
    }
}
fn on_new_ipc_client(state: &mut State, stream: UnixStream) {
let _span = tracy_client::span!("on_new_ipc_client");
trace!("new IPC client connected");
let stream = match state.niri.event_loop.adapt_io(stream) {
Ok(stream) => stream,
Err(err) => {
warn!("error making IPC stream async: {err:?}");
return;
}
};
let ipc_server = state.niri.ipc_server.as_ref().unwrap();
let ctx = ClientCtx {
event_loop: state.niri.event_loop.clone(),
scheduler: state.niri.scheduler.clone(),
ipc_outputs: state.backend.ipc_outputs(),
event_streams: ipc_server.event_streams.clone(),
event_stream_state: ipc_server.event_stream_state.clone(),
};
let future = async move {
if let Err(err) = handle_client(ctx, stream).await {
warn!("error handling IPC client: {err:?}");
}
};
if let Err(err) = state.niri.scheduler.schedule(future) {
warn!("error scheduling IPC stream future: {err:?}");
}
}
async fn handle_client(ctx: ClientCtx, stream: Async<'static, UnixStream>) -> anyhow::Result<()> {
let (read, mut write) = stream.split();
let mut buf = String::new();
// Read a single line to allow extensibility in the future to keep reading.
BufReader::new(read)
.read_line(&mut buf)
.await
.context("error reading request")?;
let request = serde_json::from_str(&buf)
.context("error parsing request")
.map_err(|err| err.to_string());
let requested_error = matches!(request, Ok(Request::ReturnError));
let requested_event_stream = matches!(request, Ok(Request::EventStream));
let reply = match request {
Ok(request) => process(&ctx, request).await,
Err(err) => Err(err),
};
if let Err(err) = &reply {
if !requested_error {
warn!("error processing IPC request: {err:?}");
}
}
let mut buf = serde_json::to_vec(&reply).context("error formatting reply")?;
buf.push(b'\n');
write.write_all(&buf).await.context("error writing reply")?;
if requested_event_stream {
let (events_tx, events_rx) = async_channel::bounded(EVENT_STREAM_BUFFER_SIZE);
let (disconnect_tx, disconnect_rx) = async_channel::bounded(1);
// Spawn a task for the client.
let client = EventStreamClient {
events: events_rx,
disconnect: disconnect_rx,
write: Box::new(write) as _,
};
let future = async move {
if let Err(err) = handle_event_stream_client(client).await {
warn!("error handling IPC event stream client: {err:?}");
}
};
if let Err(err) = ctx.scheduler.schedule(future) {
warn!("error scheduling IPC event stream future: {err:?}");
}
// Send the initial state.
{
let state = ctx.event_stream_state.borrow();
for event in state.replicate() {
events_tx
.try_send(event)
.expect("initial event burst had more events than buffer size");
}
}
// Add it to the list.
{
let mut streams = ctx.event_streams.borrow_mut();
let sender = EventStreamSender {
events: events_tx,
disconnect: disconnect_tx,
};
streams.push(sender);
}
}
Ok(())
}
async fn process(ctx: &ClientCtx, request: Request) -> Reply {
let response = match request {
Request::ReturnError => return Err(String::from("example compositor error")),
Request::Version => Response::Version(version()),
Request::Outputs => {
let ipc_outputs = ctx.ipc_outputs.lock().unwrap().clone();
let outputs = ipc_outputs.values().cloned().map(|o| (o.name.clone(), o));
Response::Outputs(outputs.collect())
}
Request::Workspaces => {
let state = ctx.event_stream_state.borrow();
let workspaces = state.workspaces.workspaces.values().cloned().collect();
Response::Workspaces(workspaces)
}
Request::Windows => {
let state = ctx.event_stream_state.borrow();
let windows = state.windows.windows.values().cloned().collect();
Response::Windows(windows)
}
Request::KeyboardLayouts => {
let state = ctx.event_stream_state.borrow();
let layout = state.keyboard_layouts.keyboard_layouts.clone();
let layout = layout.expect("keyboard layouts should be set at startup");
Response::KeyboardLayouts(layout)
}
Request::FocusedWindow => {
let state = ctx.event_stream_state.borrow();
let windows = &state.windows.windows;
let window = windows.values().find(|win| win.is_focused).cloned();
Response::FocusedWindow(window)
}
Request::Action(action) => {
let (tx, rx) = async_channel::bounded(1);
let action = niri_config::Action::from(action);
ctx.event_loop.insert_idle(move |state| {
state.do_action(action, false);
let _ = tx.send_blocking(());
});
// Wait until the action has been processed before returning. This is important for a
// few actions, for instance for DoScreenTransition this wait ensures that the screen
// contents were sampled into the texture.
let _ = rx.recv().await;
Response::Handled
}
Request::Output { output, action } => {
let ipc_outputs = ctx.ipc_outputs.lock().unwrap();
let found = ipc_outputs
.values()
.any(|o| OutputName::from_ipc_output(o).matches(&output));
let response = if found {
OutputConfigChanged::Applied
} else {
OutputConfigChanged::OutputWasMissing
};
drop(ipc_outputs);
ctx.event_loop.insert_idle(move |state| {
state.apply_transient_output_config(&output, action);
});
Response::OutputConfigChanged(response)
}
Request::FocusedOutput => {
let (tx, rx) = async_channel::bounded(1);
ctx.event_loop.insert_idle(move |state| {
let active_output = state
.niri
.layout
.active_output()
.map(|output| output.name());
let output = active_output.and_then(|active_output| {
state
.backend
.ipc_outputs()
.lock()
.unwrap()
.values()
.find(|o| o.name == active_output)
.cloned()
});
let _ = tx.send_blocking(output);
});
let result = rx.recv().await;
let output = result.map_err(|_| String::from("error getting active output info"))?;
Response::FocusedOutput(output)
}
Request::EventStream => Response::Handled,
};
Ok(response)
}
async fn handle_event_stream_client(client: EventStreamClient) -> anyhow::Result<()> {
let EventStreamClient {
events,
disconnect,
mut write,
} = client;
while let Ok(event) = events.recv().await {
let mut buf = serde_json::to_vec(&event).context("error formatting event")?;
buf.push(b'\n');
let res = select_biased! {
_ = disconnect.recv().fuse() => return Ok(()),
res = write.write_all(&buf).fuse() => res,
};
match res {
Ok(()) => (),
// Normal client disconnection.
Err(err) if err.kind() == io::ErrorKind::BrokenPipe => return Ok(()),
res @ Err(_) => res.context("error writing event")?,
}
}
Ok(())
}
/// Builds the IPC description of a mapped window.
///
/// The title and app id are read from the window's toplevel role; `workspace_id` is the
/// workspace the window is on, if any.
fn make_ipc_window(mapped: &Mapped, workspace_id: Option<WorkspaceId>) -> niri_ipc::Window {
    // Values that do not depend on the toplevel role are gathered up front.
    let id = mapped.id().get();
    let workspace_id = workspace_id.map(|ws_id| ws_id.get());
    let is_focused = mapped.is_focused();

    with_toplevel_role(mapped.toplevel(), |role| {
        let title = role.title.clone();
        let app_id = role.app_id.clone();
        niri_ipc::Window {
            id,
            title,
            app_id,
            workspace_id,
            is_focused,
        }
    })
}
// IPC refresh hooks. Each one derives events from the current compositor state, applies them to
// the shared event stream state and broadcasts them to event stream clients.
impl State {
/// Records the full set of keyboard layouts (names and active index) and broadcasts a
/// `KeyboardLayoutsChanged` event.
///
/// Panics if the seat has no keyboard or the xkb mutex is poisoned.
pub fn ipc_keyboard_layouts_changed(&mut self) {
let keyboard = self.niri.seat.get_keyboard().unwrap();
// The layouts are queried before the IPC server check below.
let keyboard_layouts = keyboard.with_xkb_state(self, |context| {
let xkb = context.xkb().lock().unwrap();
let layouts = xkb.layouts();
KeyboardLayouts {
names: layouts
.map(|layout| xkb.layout_name(layout).to_owned())
.collect(),
// NOTE(review): `as u8` truncates; presumably the layout index always fits — confirm.
current_idx: xkb.active_layout().0 as u8,
}
});
let Some(server) = &self.niri.ipc_server else {
return;
};
let mut state = server.event_stream_state.borrow_mut();
let state = &mut state.keyboard_layouts;
let event = Event::KeyboardLayoutsChanged { keyboard_layouts };
// Update our own copy of the state first, then notify the clients.
state.apply(event.clone());
server.send_event(event);
}
/// Broadcasts a `KeyboardLayoutSwitched` event if the active layout index differs from the
/// recorded one.
///
/// Panics if the seat has no keyboard, if the xkb mutex is poisoned, or if no keyboard layouts
/// have been recorded yet.
pub fn ipc_refresh_keyboard_layout_index(&mut self) {
let keyboard = self.niri.seat.get_keyboard().unwrap();
let idx = keyboard.with_xkb_state(self, |context| {
let xkb = context.xkb().lock().unwrap();
// NOTE(review): `as u8` truncates; presumably the layout index always fits — confirm.
xkb.active_layout().0 as u8
});
let Some(server) = &self.niri.ipc_server else {
return;
};
let mut state = server.event_stream_state.borrow_mut();
let state = &mut state.keyboard_layouts;
// Nothing to report if the index is unchanged.
if state.keyboard_layouts.as_ref().unwrap().current_idx == idx {
return;
}
let event = Event::KeyboardLayoutSwitched { idx };
state.apply(event.clone());
server.send_event(event);
}
/// Refreshes the IPC view of the layout: workspaces first, then windows.
pub fn ipc_refresh_layout(&mut self) {
self.ipc_refresh_workspaces();
self.ipc_refresh_windows();
}
/// Compares the layout's workspaces with the recorded IPC workspaces and emits events for the
/// differences.
///
/// Changes to the active window, focus or activation of a workspace are sent as individual
/// events. Any other difference (added or removed workspace, changed index, name or output)
/// replaces them with a single `WorkspacesChanged` event carrying the full list.
fn ipc_refresh_workspaces(&mut self) {
let Some(server) = &self.niri.ipc_server else {
return;
};
let _span = tracy_client::span!("State::ipc_refresh_workspaces");
let mut state = server.event_stream_state.borrow_mut();
let state = &mut state.workspaces;
let mut events = Vec::new();
let layout = &self.niri.layout;
let focused_ws_id = layout.active_workspace().map(|ws| ws.id().get());
// Check for workspace changes.
let mut seen = HashSet::new();
let mut need_workspaces_changed = false;
for (mon, ws_idx, ws) in layout.workspaces() {
let id = ws.id().get();
seen.insert(id);
let Some(ipc_ws) = state.workspaces.get(&id) else {
// A new workspace was added.
need_workspaces_changed = true;
break;
};
// Check for any changes that we can't signal as individual events.
let output_name = mon.map(|mon| mon.output_name());
// The IPC index is 1-based and saturates at `u8::MAX`.
if ipc_ws.idx != u8::try_from(ws_idx + 1).unwrap_or(u8::MAX)
|| ipc_ws.name.as_ref() != ws.name()
|| ipc_ws.output.as_ref() != output_name
{
need_workspaces_changed = true;
break;
}
let active_window_id = ws.active_window().map(|win| win.id().get());
if ipc_ws.active_window_id != active_window_id {
events.push(Event::WorkspaceActiveWindowChanged {
workspace_id: id,
active_window_id,
});
}
// Check if this workspace became focused.
let is_focused = Some(id) == focused_ws_id;
if is_focused && !ipc_ws.is_focused {
events.push(Event::WorkspaceActivated { id, focused: true });
// Skip the activation check below: at most one `WorkspaceActivated` is sent per workspace.
continue;
}
// Check if this workspace became active.
let is_active = mon.map_or(false, |mon| mon.active_workspace_idx() == ws_idx);
if is_active && !ipc_ws.is_active {
events.push(Event::WorkspaceActivated { id, focused: false });
}
}
// Check if any workspaces were removed.
if !need_workspaces_changed && state.workspaces.keys().any(|id| !seen.contains(id)) {
need_workspaces_changed = true;
}
if need_workspaces_changed {
// The full list supersedes any individual events collected so far.
events.clear();
let workspaces = layout
.workspaces()
.map(|(mon, ws_idx, ws)| {
let id = ws.id().get();
Workspace {
id,
idx: u8::try_from(ws_idx + 1).unwrap_or(u8::MAX),
name: ws.name().cloned(),
output: mon.map(|mon| mon.output_name().clone()),
is_active: mon.map_or(false, |mon| mon.active_workspace_idx() == ws_idx),
is_focused: Some(id) == focused_ws_id,
active_window_id: ws.active_window().map(|win| win.id().get()),
}
})
.collect();
events.push(Event::WorkspacesChanged { workspaces });
}
// Apply each event to our own copy of the state, then broadcast it.
for event in events {
state.apply(event.clone());
server.send_event(event);
}
}
/// Compares the layout's windows with the recorded IPC windows and emits events for opened,
/// changed and closed windows and for focus changes.
fn ipc_refresh_windows(&mut self) {
let Some(server) = &self.niri.ipc_server else {
return;
};
let _span = tracy_client::span!("State::ipc_refresh_windows");
let mut state = server.event_stream_state.borrow_mut();
let state = &mut state.windows;
let mut events = Vec::new();
let layout = &self.niri.layout;
// Check for window changes.
let mut seen = HashSet::new();
let mut focused_id = None;
layout.with_windows(|mapped, _, ws_id| {
let id = mapped.id().get();
seen.insert(id);
if mapped.is_focused() {
focused_id = Some(id);
}
let Some(ipc_win) = state.windows.get(&id) else {
// A window we have not recorded yet: report it as opened.
let window = make_ipc_window(mapped, ws_id);
events.push(Event::WindowOpenedOrChanged { window });
return;
};
let workspace_id = ws_id.map(|id| id.get());
let mut changed = ipc_win.workspace_id != workspace_id;
changed |= with_toplevel_role(mapped.toplevel(), |role| {
ipc_win.title != role.title || ipc_win.app_id != role.app_id
});
if changed {
// The full window is re-sent, which also carries its current focus state.
let window = make_ipc_window(mapped, ws_id);
events.push(Event::WindowOpenedOrChanged { window });
return;
}
if mapped.is_focused() && !ipc_win.is_focused {
events.push(Event::WindowFocusChanged { id: Some(id) });
}
});
// Check for closed windows.
let mut ipc_focused_id = None;
for (id, ipc_win) in &state.windows {
if !seen.contains(id) {
events.push(Event::WindowClosed { id: *id });
}
if ipc_win.is_focused {
ipc_focused_id = Some(id);
}
}
// Extra check for focus becoming None, since the checks above only work for focus becoming
// a different window.
if focused_id.is_none() && ipc_focused_id.is_some() {
events.push(Event::WindowFocusChanged { id: None });
}
// Apply each event to our own copy of the state, then broadcast it.
for event in events {
state.apply(event.clone());
server.send_event(event);
}
}
}