Fix IPC streams not working correctly

This commit is contained in:
elkowar 2021-01-03 12:50:37 +01:00 committed by ElKowar
parent 05d9320598
commit 6df15379bb
3 changed files with 39 additions and 9 deletions

View file

@ -22,13 +22,24 @@ pub fn handle_client_only_action(action: ActionClientOnly) -> Result<()> {
pub fn forward_command_to_server(mut stream: UnixStream, action: opts::ActionWithServer) -> Result<()> { pub fn forward_command_to_server(mut stream: UnixStream, action: opts::ActionWithServer) -> Result<()> {
log::info!("Forwarding options to server"); log::info!("Forwarding options to server");
stream.set_nonblocking(false)?;
stream stream
.write_all(&bincode::serialize(&action)?) .set_nonblocking(false)
.context("Failed to set stream to non-blocking")?;
let message_bytes = bincode::serialize(&action)?;
stream
.write(&(message_bytes.len() as u32).to_be_bytes())
.context("Failed to send command size header to IPC stream")?;
stream
.write_all(&message_bytes)
.context("Failed to write command to IPC stream")?; .context("Failed to write command to IPC stream")?;
let mut buf = String::new(); let mut buf = String::new();
stream.set_read_timeout(Some(std::time::Duration::from_millis(100)))?; stream
.set_read_timeout(Some(std::time::Duration::from_millis(100)))
.context("Failed to set read timeout")?;
stream stream
.read_to_string(&mut buf) .read_to_string(&mut buf)
.context("Error reading response from server")?; .context("Error reading response from server")?;

View file

@ -54,11 +54,13 @@ fn main() {
opts::Action::WithServer(action) => { opts::Action::WithServer(action) => {
log::info!("Trying to find server process"); log::info!("Trying to find server process");
if let Ok(stream) = net::UnixStream::connect(&*IPC_SOCKET_PATH) { if let Ok(stream) = net::UnixStream::connect(&*IPC_SOCKET_PATH) {
log::info!("Connected to eww server.");
client::forward_command_to_server(stream, action).context("Error while forwarding command to server")?; client::forward_command_to_server(stream, action).context("Error while forwarding command to server")?;
} else if action.needs_server_running() { } else if action.needs_server_running() {
println!("No eww server running"); println!("No eww server running");
} else { } else {
log::info!("No server running, initializing server..."); log::info!("No server running, initializing server...");
let _ = std::fs::remove_file(&*crate::IPC_SOCKET_PATH);
server::initialize_server(opts.should_detach, action)?; server::initialize_server(opts.should_detach, action)?;
} }
} }

View file

@ -13,8 +13,6 @@ use tokio::{
}; };
pub fn initialize_server(should_detach: bool, action: opts::ActionWithServer) -> Result<()> { pub fn initialize_server(should_detach: bool, action: opts::ActionWithServer) -> Result<()> {
let _ = std::fs::remove_file(&*crate::IPC_SOCKET_PATH);
if should_detach { if should_detach {
do_detach()?; do_detach()?;
} }
@ -133,9 +131,16 @@ fn init_async_part(
async fn run_ipc_server(evt_send: UnboundedSender<app::EwwCommand>) -> Result<()> { async fn run_ipc_server(evt_send: UnboundedSender<app::EwwCommand>) -> Result<()> {
let listener = tokio::net::UnixListener::bind(&*crate::IPC_SOCKET_PATH)?; let listener = tokio::net::UnixListener::bind(&*crate::IPC_SOCKET_PATH)?;
log::info!("IPC server initialized");
crate::loop_select_exiting! { crate::loop_select_exiting! {
connection = listener.accept() => match connection { connection = listener.accept() => match connection {
Ok((stream, _addr)) => handle_connection(stream, evt_send.clone()).await?, Ok((stream, _addr)) => {
let evt_send = evt_send.clone();
tokio::spawn(async move {
let result = handle_connection(stream, evt_send.clone()).await;
crate::print_result_err!("while handling IPC connection with client", result);
});
},
Err(e) => eprintln!("Failed to connect to client: {:?}", e), Err(e) => eprintln!("Failed to connect to client: {:?}", e),
} }
} }
@ -147,8 +152,20 @@ async fn handle_connection(mut stream: tokio::net::UnixStream, evt_send: Unbound
let (mut stream_read, mut stream_write) = stream.split(); let (mut stream_read, mut stream_write) = stream.split();
let action: opts::ActionWithServer = { let action: opts::ActionWithServer = {
let mut raw_message = Vec::<u8>::new(); let mut message_byte_length = [0u8; 4];
stream_read.read_to_end(&mut raw_message).await?; stream_read
.read_exact(&mut message_byte_length)
.await
.context("Failed to read message size header in IPC message")?;
let message_byte_length = u32::from_be_bytes(message_byte_length);
let mut raw_message = Vec::<u8>::with_capacity(message_byte_length as usize);
while raw_message.len() < message_byte_length as usize {
stream_read
.read_buf(&mut raw_message)
.await
.context("Failed to read actual IPC message")?;
}
bincode::deserialize(&raw_message).context("Failed to parse client message")? bincode::deserialize(&raw_message).context("Failed to parse client message")?
}; };
@ -162,7 +179,7 @@ async fn handle_connection(mut stream: tokio::net::UnixStream, evt_send: Unbound
log::info!("Waiting for response for IPC client"); log::info!("Waiting for response for IPC client");
if let Ok(Some(response)) = tokio::time::timeout(Duration::from_millis(100), response_recv.recv()).await { if let Ok(Some(response)) = tokio::time::timeout(Duration::from_millis(100), response_recv.recv()).await {
let result = &stream_write.write_all(response.as_bytes()).await; let result = &stream_write.write_all(response.as_bytes()).await;
crate::print_result_err!("Sending text response to ipc client", &result); crate::print_result_err!("sending text response to ipc client", &result);
} }
} }
stream_write.shutdown().await?; stream_write.shutdown().await?;