Slight cleanup

This commit is contained in:
elkowar 2021-01-06 19:09:08 +01:00
parent 018bfee24c
commit 34876efe02
3 changed files with 72 additions and 64 deletions

68
src/ipc_server.rs Normal file
View file

@ -0,0 +1,68 @@
use crate::{app, opts};
use anyhow::*;
use std::time::Duration;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
sync::mpsc::*,
};
pub async fn run_server(evt_send: UnboundedSender<app::EwwCommand>) -> Result<()> {
let listener = tokio::net::UnixListener::bind(&*crate::IPC_SOCKET_PATH)?;
log::info!("IPC server initialized");
crate::loop_select_exiting! {
connection = listener.accept() => match connection {
Ok((stream, _addr)) => {
let evt_send = evt_send.clone();
tokio::spawn(async move {
let result = handle_connection(stream, evt_send.clone()).await;
crate::print_result_err!("while handling IPC connection with client", result);
});
},
Err(e) => eprintln!("Failed to connect to client: {:?}", e),
}
}
Ok(())
}
/// Handle a single IPC connection from start to end.
async fn handle_connection(mut stream: tokio::net::UnixStream, evt_send: UnboundedSender<app::EwwCommand>) -> Result<()> {
let (mut stream_read, mut stream_write) = stream.split();
let action: opts::ActionWithServer = read_action_from_stream(&mut stream_read).await?;
log::info!("received command from IPC: {:?}", &action);
let (command, maybe_response_recv) = action.into_eww_command();
evt_send.send(command)?;
if let Some(mut response_recv) = maybe_response_recv {
log::info!("Waiting for response for IPC client");
if let Ok(Some(response)) = tokio::time::timeout(Duration::from_millis(100), response_recv.recv()).await {
let result = &stream_write.write_all(response.as_bytes()).await;
crate::print_result_err!("sending text response to ipc client", &result);
}
}
stream_write.shutdown().await?;
Ok(())
}
/// Read a single message from a unix stream, and parses it into a `ActionWithServer`
/// The format here requires the first 4 bytes to be the size of the rest of the message (in big-endian), followed by the rest of the message.
async fn read_action_from_stream(stream_read: &'_ mut tokio::net::unix::ReadHalf<'_>) -> Result<opts::ActionWithServer> {
let mut message_byte_length = [0u8; 4];
stream_read
.read_exact(&mut message_byte_length)
.await
.context("Failed to read message size header in IPC message")?;
let message_byte_length = u32::from_be_bytes(message_byte_length);
let mut raw_message = Vec::<u8>::with_capacity(message_byte_length as usize);
while raw_message.len() < message_byte_length as usize {
stream_read
.read_buf(&mut raw_message)
.await
.context("Failed to read actual IPC message")?;
}
bincode::deserialize(&raw_message).context("Failed to parse client message")
}

View file

@ -17,6 +17,7 @@ pub mod application_lifecycle;
pub mod client;
pub mod config;
pub mod eww_state;
pub mod ipc_server;
pub mod opts;
pub mod script_var_handler;
pub mod server;

View file

@ -1,16 +1,12 @@
use crate::{app, config, eww_state::*, opts, script_var_handler, try_logging_errors, util};
use crate::{app, config, eww_state::*, ipc_server, script_var_handler, try_logging_errors, util};
use anyhow::*;
use futures_util::StreamExt;
use std::{
collections::HashMap,
os::unix::io::AsRawFd,
path::{Path, PathBuf},
time::Duration,
};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
sync::mpsc::*,
};
use tokio::sync::mpsc::*;
pub fn initialize_server() -> Result<()> {
do_detach()?;
@ -85,7 +81,7 @@ fn init_async_part(config_file_path: PathBuf, scss_file_path: PathBuf, ui_send:
let ipc_server_join_handle = {
let ui_send = ui_send.clone();
tokio::spawn(async move { run_ipc_server(ui_send).await })
tokio::spawn(async move { ipc_server::run_server(ui_send).await })
};
let forward_exit_to_app_handle = {
@ -107,63 +103,6 @@ fn init_async_part(config_file_path: PathBuf, scss_file_path: PathBuf, ui_send:
});
}
async fn run_ipc_server(evt_send: UnboundedSender<app::EwwCommand>) -> Result<()> {
let listener = tokio::net::UnixListener::bind(&*crate::IPC_SOCKET_PATH)?;
log::info!("IPC server initialized");
crate::loop_select_exiting! {
connection = listener.accept() => match connection {
Ok((stream, _addr)) => {
let evt_send = evt_send.clone();
tokio::spawn(async move {
let result = handle_connection(stream, evt_send.clone()).await;
crate::print_result_err!("while handling IPC connection with client", result);
});
},
Err(e) => eprintln!("Failed to connect to client: {:?}", e),
}
}
Ok(())
}
/// Handle a single IPC connection from start to end.
async fn handle_connection(mut stream: tokio::net::UnixStream, evt_send: UnboundedSender<app::EwwCommand>) -> Result<()> {
let (mut stream_read, mut stream_write) = stream.split();
let action: opts::ActionWithServer = {
let mut message_byte_length = [0u8; 4];
stream_read
.read_exact(&mut message_byte_length)
.await
.context("Failed to read message size header in IPC message")?;
let message_byte_length = u32::from_be_bytes(message_byte_length);
let mut raw_message = Vec::<u8>::with_capacity(message_byte_length as usize);
while raw_message.len() < message_byte_length as usize {
stream_read
.read_buf(&mut raw_message)
.await
.context("Failed to read actual IPC message")?;
}
bincode::deserialize(&raw_message).context("Failed to parse client message")?
};
log::info!("received command from IPC: {:?}", &action);
let (command, maybe_response_recv) = action.into_eww_command();
evt_send.send(command)?;
if let Some(mut response_recv) = maybe_response_recv {
log::info!("Waiting for response for IPC client");
if let Ok(Some(response)) = tokio::time::timeout(Duration::from_millis(100), response_recv.recv()).await {
let result = &stream_write.write_all(response.as_bytes()).await;
crate::print_result_err!("sending text response to ipc client", &result);
}
}
stream_write.shutdown().await?;
Ok(())
}
/// Watch configuration files for changes, sending reload events to the eww app when the files change.
async fn run_filewatch<P: AsRef<Path>>(
config_file_path: P,