diff --git a/src/client.rs b/src/client.rs index de43899..299176e 100644 --- a/src/client.rs +++ b/src/client.rs @@ -22,13 +22,24 @@ pub fn handle_client_only_action(action: ActionClientOnly) -> Result<()> { pub fn forward_command_to_server(mut stream: UnixStream, action: opts::ActionWithServer) -> Result<()> { log::info!("Forwarding options to server"); - stream.set_nonblocking(false)?; stream - .write_all(&bincode::serialize(&action)?) + .set_nonblocking(false) + .context("Failed to set stream to non-blocking")?; + + let message_bytes = bincode::serialize(&action)?; + + stream + .write(&(message_bytes.len() as u32).to_be_bytes()) + .context("Failed to send command size header to IPC stream")?; + + stream + .write_all(&message_bytes) .context("Failed to write command to IPC stream")?; let mut buf = String::new(); - stream.set_read_timeout(Some(std::time::Duration::from_millis(100)))?; + stream + .set_read_timeout(Some(std::time::Duration::from_millis(100))) + .context("Failed to set read timeout")?; stream .read_to_string(&mut buf) .context("Error reading response from server")?; diff --git a/src/main.rs b/src/main.rs index faffbe0..48095cc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -54,11 +54,13 @@ fn main() { opts::Action::WithServer(action) => { log::info!("Trying to find server process"); if let Ok(stream) = net::UnixStream::connect(&*IPC_SOCKET_PATH) { + log::info!("Connected to eww server."); client::forward_command_to_server(stream, action).context("Error while forwarding command to server")?; } else if action.needs_server_running() { println!("No eww server running"); } else { log::info!("No server running, initializing server..."); + let _ = std::fs::remove_file(&*crate::IPC_SOCKET_PATH); server::initialize_server(opts.should_detach, action)?; } } diff --git a/src/server.rs b/src/server.rs index 6ba07ac..e1a0d25 100644 --- a/src/server.rs +++ b/src/server.rs @@ -13,8 +13,6 @@ use tokio::{ }; pub fn initialize_server(should_detach: bool, action: opts::ActionWithServer) -> Result<()> { - let _ = std::fs::remove_file(&*crate::IPC_SOCKET_PATH); - if should_detach { do_detach()?; } @@ -133,9 +131,16 @@ fn init_async_part( async fn run_ipc_server(evt_send: UnboundedSender) -> Result<()> { let listener = tokio::net::UnixListener::bind(&*crate::IPC_SOCKET_PATH)?; + log::info!("IPC server initialized"); crate::loop_select_exiting! { connection = listener.accept() => match connection { - Ok((stream, _addr)) => handle_connection(stream, evt_send.clone()).await?, + Ok((stream, _addr)) => { + let evt_send = evt_send.clone(); + tokio::spawn(async move { + let result = handle_connection(stream, evt_send.clone()).await; + crate::print_result_err!("while handling IPC connection with client", result); + }); + }, Err(e) => eprintln!("Failed to connect to client: {:?}", e), } } @@ -147,8 +152,20 @@ async fn handle_connection(mut stream: tokio::net::UnixStream, evt_send: Unbound let (mut stream_read, mut stream_write) = stream.split(); let action: opts::ActionWithServer = { - let mut raw_message = Vec::::new(); - stream_read.read_to_end(&mut raw_message).await?; + let mut message_byte_length = [0u8; 4]; + stream_read + .read_exact(&mut message_byte_length) + .await + .context("Failed to read message size header in IPC message")?; + let message_byte_length = u32::from_be_bytes(message_byte_length); + let mut raw_message = Vec::::with_capacity(message_byte_length as usize); + while raw_message.len() < message_byte_length as usize { + stream_read + .read_buf(&mut raw_message) + .await + .context("Failed to read actual IPC message")?; + } + bincode::deserialize(&raw_message).context("Failed to parse client message")? }; @@ -162,7 +179,7 @@ async fn handle_connection(mut stream: tokio::net::UnixStream, evt_send: Unbound log::info!("Waiting for response for IPC client"); if let Ok(Some(response)) = tokio::time::timeout(Duration::from_millis(100), response_recv.recv()).await { let result = &stream_write.write_all(response.as_bytes()).await; - crate::print_result_err!("Sending text response to ipc client", &result); + crate::print_result_err!("sending text response to ipc client", &result); } } stream_write.shutdown().await?;