From 34876efe02e379d3b19d158c18ec4078958bc7da Mon Sep 17 00:00:00 2001 From: elkowar <5300871+elkowar@users.noreply.github.com> Date: Wed, 6 Jan 2021 19:09:08 +0100 Subject: [PATCH] Slight cleanup --- src/ipc_server.rs | 68 +++++++++++++++++++++++++++++++++++++++++++++++ src/main.rs | 1 + src/server.rs | 67 +++------------------------------------------- 3 files changed, 72 insertions(+), 64 deletions(-) create mode 100644 src/ipc_server.rs diff --git a/src/ipc_server.rs b/src/ipc_server.rs new file mode 100644 index 0000000..c9e6681 --- /dev/null +++ b/src/ipc_server.rs @@ -0,0 +1,68 @@ +use crate::{app, opts}; +use anyhow::*; +use std::time::Duration; +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + sync::mpsc::*, +}; + +pub async fn run_server(evt_send: UnboundedSender) -> Result<()> { + let listener = tokio::net::UnixListener::bind(&*crate::IPC_SOCKET_PATH)?; + log::info!("IPC server initialized"); + crate::loop_select_exiting! { + connection = listener.accept() => match connection { + Ok((stream, _addr)) => { + let evt_send = evt_send.clone(); + tokio::spawn(async move { + let result = handle_connection(stream, evt_send.clone()).await; + crate::print_result_err!("while handling IPC connection with client", result); + }); + }, + Err(e) => eprintln!("Failed to connect to client: {:?}", e), + } + } + Ok(()) +} + +/// Handle a single IPC connection from start to end. +async fn handle_connection(mut stream: tokio::net::UnixStream, evt_send: UnboundedSender) -> Result<()> { + let (mut stream_read, mut stream_write) = stream.split(); + + let action: opts::ActionWithServer = read_action_from_stream(&mut stream_read).await?; + + log::info!("received command from IPC: {:?}", &action); + + let (command, maybe_response_recv) = action.into_eww_command(); + + evt_send.send(command)?; + + if let Some(mut response_recv) = maybe_response_recv { + log::info!("Waiting for response for IPC client"); + if let Ok(Some(response)) = tokio::time::timeout(Duration::from_millis(100), response_recv.recv()).await { + let result = &stream_write.write_all(response.as_bytes()).await; + crate::print_result_err!("sending text response to ipc client", &result); + } + } + stream_write.shutdown().await?; + Ok(()) +} + +/// Read a single message from a unix stream, and parses it into a `ActionWithServer` +/// The format here requires the first 4 bytes to be the size of the rest of the message (in big-endian), followed by the rest of the message. +async fn read_action_from_stream(stream_read: &'_ mut tokio::net::unix::ReadHalf<'_>) -> Result { + let mut message_byte_length = [0u8; 4]; + stream_read + .read_exact(&mut message_byte_length) + .await + .context("Failed to read message size header in IPC message")?; + let message_byte_length = u32::from_be_bytes(message_byte_length); + let mut raw_message = Vec::::with_capacity(message_byte_length as usize); + while raw_message.len() < message_byte_length as usize { + stream_read + .read_buf(&mut raw_message) + .await + .context("Failed to read actual IPC message")?; + } + + bincode::deserialize(&raw_message).context("Failed to parse client message") +} diff --git a/src/main.rs b/src/main.rs index ca2712c..6ac6623 100644 --- a/src/main.rs +++ b/src/main.rs @@ -17,6 +17,7 @@ pub mod application_lifecycle; pub mod client; pub mod config; pub mod eww_state; +pub mod ipc_server; pub mod opts; pub mod script_var_handler; pub mod server; diff --git a/src/server.rs b/src/server.rs index 268e883..b8692fb 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,16 +1,12 @@ -use crate::{app, config, eww_state::*, opts, script_var_handler, try_logging_errors, util}; +use crate::{app, config, eww_state::*, ipc_server, script_var_handler, try_logging_errors, util}; use anyhow::*; use futures_util::StreamExt; use std::{ collections::HashMap, os::unix::io::AsRawFd, path::{Path, PathBuf}, - time::Duration, -}; -use tokio::{ - io::{AsyncReadExt, AsyncWriteExt}, - sync::mpsc::*, }; +use tokio::sync::mpsc::*; pub fn initialize_server() -> Result<()> { do_detach()?; @@ -85,7 +81,7 @@ fn init_async_part(config_file_path: PathBuf, scss_file_path: PathBuf, ui_send: let ipc_server_join_handle = { let ui_send = ui_send.clone(); - tokio::spawn(async move { run_ipc_server(ui_send).await }) + tokio::spawn(async move { ipc_server::run_server(ui_send).await }) }; let forward_exit_to_app_handle = { @@ -107,63 +103,6 @@ fn init_async_part(config_file_path: PathBuf, scss_file_path: PathBuf, ui_send: }); } -async fn run_ipc_server(evt_send: UnboundedSender) -> Result<()> { - let listener = tokio::net::UnixListener::bind(&*crate::IPC_SOCKET_PATH)?; - log::info!("IPC server initialized"); - crate::loop_select_exiting! { - connection = listener.accept() => match connection { - Ok((stream, _addr)) => { - let evt_send = evt_send.clone(); - tokio::spawn(async move { - let result = handle_connection(stream, evt_send.clone()).await; - crate::print_result_err!("while handling IPC connection with client", result); - }); - }, - Err(e) => eprintln!("Failed to connect to client: {:?}", e), - } - } - Ok(()) -} - -/// Handle a single IPC connection from start to end. -async fn handle_connection(mut stream: tokio::net::UnixStream, evt_send: UnboundedSender) -> Result<()> { - let (mut stream_read, mut stream_write) = stream.split(); - - let action: opts::ActionWithServer = { - let mut message_byte_length = [0u8; 4]; - stream_read - .read_exact(&mut message_byte_length) - .await - .context("Failed to read message size header in IPC message")?; - let message_byte_length = u32::from_be_bytes(message_byte_length); - let mut raw_message = Vec::::with_capacity(message_byte_length as usize); - while raw_message.len() < message_byte_length as usize { - stream_read - .read_buf(&mut raw_message) - .await - .context("Failed to read actual IPC message")?; - } - - bincode::deserialize(&raw_message).context("Failed to parse client message")? - }; - - log::info!("received command from IPC: {:?}", &action); - - let (command, maybe_response_recv) = action.into_eww_command(); - - evt_send.send(command)?; - - if let Some(mut response_recv) = maybe_response_recv { - log::info!("Waiting for response for IPC client"); - if let Ok(Some(response)) = tokio::time::timeout(Duration::from_millis(100), response_recv.recv()).await { - let result = &stream_write.write_all(response.as_bytes()).await; - crate::print_result_err!("sending text response to ipc client", &result); - } - } - stream_write.shutdown().await?; - Ok(()) -} - /// Watch configuration files for changes, sending reload events to the eww app when the files change. async fn run_filewatch>( config_file_path: P,