mirror of
https://github.com/niri-wm/niri.git
synced 2026-06-23 02:05:33 +07:00
ipc: support long living sockets
This commit is contained in:
committed by
Ivan Molodetskikh
parent
89b7423ee5
commit
f917932b3e
+73
-63
@@ -185,76 +185,86 @@ fn on_new_ipc_client(state: &mut State, stream: UnixStream) {
|
|||||||
|
|
||||||
async fn handle_client(ctx: ClientCtx, stream: Async<'static, UnixStream>) -> anyhow::Result<()> {
|
async fn handle_client(ctx: ClientCtx, stream: Async<'static, UnixStream>) -> anyhow::Result<()> {
|
||||||
let (read, mut write) = stream.split();
|
let (read, mut write) = stream.split();
|
||||||
let mut buf = String::new();
|
let mut read = BufReader::new(read);
|
||||||
|
|
||||||
// Read a single line to allow extensibility in the future to keep reading.
|
loop {
|
||||||
BufReader::new(read)
|
// Don't keep buf around to avoid clients wasting RAM by filling it with bogus data.
|
||||||
.read_line(&mut buf)
|
let mut buf = Vec::new();
|
||||||
.await
|
let res = read.read_until(b'\n', &mut buf).await;
|
||||||
.context("error reading request")?;
|
match res {
|
||||||
|
Ok(0) => return Ok(()),
|
||||||
let request = serde_json::from_str(&buf)
|
Ok(_) => (),
|
||||||
.context("error parsing request")
|
// Normal client disconnection.
|
||||||
.map_err(|err| err.to_string());
|
Err(err) if err.kind() == io::ErrorKind::BrokenPipe => return Ok(()),
|
||||||
let requested_error = matches!(request, Ok(Request::ReturnError));
|
Err(err) => {
|
||||||
let requested_event_stream = matches!(request, Ok(Request::EventStream));
|
return Err(err).context("error reading request");
|
||||||
|
|
||||||
let reply = match request {
|
|
||||||
Ok(request) => process(&ctx, request).await,
|
|
||||||
Err(err) => Err(err),
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Err(err) = &reply {
|
|
||||||
if !requested_error {
|
|
||||||
warn!("error processing IPC request: {err:?}");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut buf = serde_json::to_vec(&reply).context("error formatting reply")?;
|
|
||||||
buf.push(b'\n');
|
|
||||||
write.write_all(&buf).await.context("error writing reply")?;
|
|
||||||
|
|
||||||
if requested_event_stream {
|
|
||||||
let (events_tx, events_rx) = async_channel::bounded(EVENT_STREAM_BUFFER_SIZE);
|
|
||||||
let (disconnect_tx, disconnect_rx) = async_channel::bounded(1);
|
|
||||||
|
|
||||||
// Spawn a task for the client.
|
|
||||||
let client = EventStreamClient {
|
|
||||||
events: events_rx,
|
|
||||||
disconnect: disconnect_rx,
|
|
||||||
write: Box::new(write) as _,
|
|
||||||
};
|
|
||||||
let future = async move {
|
|
||||||
if let Err(err) = handle_event_stream_client(client).await {
|
|
||||||
warn!("error handling IPC event stream client: {err:?}");
|
|
||||||
}
|
|
||||||
};
|
|
||||||
if let Err(err) = ctx.scheduler.schedule(future) {
|
|
||||||
warn!("error scheduling IPC event stream future: {err:?}");
|
|
||||||
}
|
|
||||||
|
|
||||||
// Send the initial state.
|
|
||||||
{
|
|
||||||
let state = ctx.event_stream_state.borrow();
|
|
||||||
for event in state.replicate() {
|
|
||||||
events_tx
|
|
||||||
.try_send(event)
|
|
||||||
.expect("initial event burst had more events than buffer size");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add it to the list.
|
let request = serde_json::from_slice(&buf)
|
||||||
{
|
.context("error parsing request")
|
||||||
let mut streams = ctx.event_streams.borrow_mut();
|
.map_err(|err| err.to_string());
|
||||||
let sender = EventStreamSender {
|
let requested_error = matches!(request, Ok(Request::ReturnError));
|
||||||
events: events_tx,
|
let requested_event_stream = matches!(request, Ok(Request::EventStream));
|
||||||
disconnect: disconnect_tx,
|
|
||||||
|
let reply = match request {
|
||||||
|
Ok(request) => process(&ctx, request).await,
|
||||||
|
Err(err) => Err(err),
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Err(err) = &reply {
|
||||||
|
if !requested_error {
|
||||||
|
warn!("error processing IPC request: {err:?}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
buf.clear();
|
||||||
|
serde_json::to_writer(&mut buf, &reply).context("error formatting reply")?;
|
||||||
|
buf.push(b'\n');
|
||||||
|
write.write_all(&buf).await.context("error writing reply")?;
|
||||||
|
|
||||||
|
if requested_event_stream {
|
||||||
|
let (events_tx, events_rx) = async_channel::bounded(EVENT_STREAM_BUFFER_SIZE);
|
||||||
|
let (disconnect_tx, disconnect_rx) = async_channel::bounded(1);
|
||||||
|
|
||||||
|
// Spawn a task for the client.
|
||||||
|
let client = EventStreamClient {
|
||||||
|
events: events_rx,
|
||||||
|
disconnect: disconnect_rx,
|
||||||
|
write: Box::new(write) as _,
|
||||||
};
|
};
|
||||||
streams.push(sender);
|
let future = async move {
|
||||||
|
if let Err(err) = handle_event_stream_client(client).await {
|
||||||
|
warn!("error handling IPC event stream client: {err:?}");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
if let Err(err) = ctx.scheduler.schedule(future) {
|
||||||
|
warn!("error scheduling IPC event stream future: {err:?}");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send the initial state.
|
||||||
|
{
|
||||||
|
let state = ctx.event_stream_state.borrow();
|
||||||
|
for event in state.replicate() {
|
||||||
|
events_tx
|
||||||
|
.try_send(event)
|
||||||
|
.expect("initial event burst had more events than buffer size");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add it to the list.
|
||||||
|
{
|
||||||
|
let mut streams = ctx.event_streams.borrow_mut();
|
||||||
|
let sender = EventStreamSender {
|
||||||
|
events: events_tx,
|
||||||
|
disconnect: disconnect_tx,
|
||||||
|
};
|
||||||
|
streams.push(sender);
|
||||||
|
}
|
||||||
|
|
||||||
|
return Ok(());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn process(ctx: &ClientCtx, request: Request) -> Reply {
|
async fn process(ctx: &ClientCtx, request: Request) -> Reply {
|
||||||
|
|||||||
Reference in New Issue
Block a user