fix(router): gracefully handle client crashes (#1710)

* fix(router): gracefully handle client crashes

* style(comments): remove unused
This commit is contained in:
Aram Drevekenin 2022-09-02 15:30:43 +02:00 committed by GitHub
parent 93f0f783b8
commit d68d407d26
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 90 additions and 27 deletions

View file

@ -54,7 +54,7 @@ fn assert_socket(name: &str) -> bool {
match LocalSocketStream::connect(path) { match LocalSocketStream::connect(path) {
Ok(stream) => { Ok(stream) => {
let mut sender = IpcSenderWithContext::new(stream); let mut sender = IpcSenderWithContext::new(stream);
sender.send(ClientToServerMsg::ConnStatus); let _ = sender.send(ClientToServerMsg::ConnStatus);
let mut receiver: IpcReceiverWithContext<ServerToClientMsg> = sender.get_receiver(); let mut receiver: IpcReceiverWithContext<ServerToClientMsg> = sender.get_receiver();
match receiver.recv() { match receiver.recv() {
Some((ServerToClientMsg::Connected, _)) => true, Some((ServerToClientMsg::Connected, _)) => true,
@ -115,7 +115,7 @@ pub(crate) fn kill_session(name: &str) {
let path = &*ZELLIJ_SOCK_DIR.join(name); let path = &*ZELLIJ_SOCK_DIR.join(name);
match LocalSocketStream::connect(path) { match LocalSocketStream::connect(path) {
Ok(stream) => { Ok(stream) => {
IpcSenderWithContext::new(stream).send(ClientToServerMsg::KillSession); let _ = IpcSenderWithContext::new(stream).send(ClientToServerMsg::KillSession);
}, },
Err(e) => { Err(e) => {
eprintln!("Error occurred: {:?}", e); eprintln!("Error occurred: {:?}", e);

View file

@ -145,7 +145,9 @@ impl ClientOsApi for ClientOsInputOutput {
} }
fn send_to_server(&self, msg: ClientToServerMsg) { fn send_to_server(&self, msg: ClientToServerMsg) {
self.send_instructions_to_server // TODO: handle the error here, right now we silently ignore it
let _ = self
.send_instructions_to_server
.lock() .lock()
.unwrap() .unwrap()
.as_mut() .as_mut()

View file

@ -8,7 +8,7 @@ pub(crate) fn kill_session(name: &str) {
let path = &*ZELLIJ_SOCK_DIR.join(name); let path = &*ZELLIJ_SOCK_DIR.join(name);
match LocalSocketStream::connect(path) { match LocalSocketStream::connect(path) {
Ok(stream) => { Ok(stream) => {
IpcSenderWithContext::new(stream).send(ClientToServerMsg::KillSession); let _ = IpcSenderWithContext::new(stream).send(ClientToServerMsg::KillSession);
}, },
Err(e) => { Err(e) => {
eprintln!("Error occurred: {:?}", e); eprintln!("Error occurred: {:?}", e);

View file

@ -132,6 +132,16 @@ macro_rules! remove_client {
}; };
} }
macro_rules! send_to_client {
($client_id:expr, $os_input:expr, $msg:expr, $session_state:expr) => {
let send_to_client_res = $os_input.send_to_client($client_id, $msg);
if let Err(_) = send_to_client_res {
// failed to send to client, remove it
remove_client!($client_id, $os_input, $session_state);
}
};
}
#[derive(Clone, Debug, PartialEq)] #[derive(Clone, Debug, PartialEq)]
pub(crate) struct SessionState { pub(crate) struct SessionState {
clients: HashMap<ClientId, Option<Size>>, clients: HashMap<ClientId, Option<Size>>,
@ -392,15 +402,26 @@ pub fn start_server(mut os_input: Box<dyn ServerOsApi>, socket_path: PathBuf) {
Event::ModeUpdate(mode_info), Event::ModeUpdate(mode_info),
)) ))
.unwrap(); .unwrap();
os_input.send_to_client(client_id, ServerToClientMsg::SwitchToMode(mode)); send_to_client!(
client_id,
os_input,
ServerToClientMsg::SwitchToMode(mode),
session_state
);
}, },
ServerInstruction::UnblockInputThread => { ServerInstruction::UnblockInputThread => {
for client_id in session_state.read().unwrap().clients.keys() { for client_id in session_state.read().unwrap().clients.keys() {
os_input.send_to_client(*client_id, ServerToClientMsg::UnblockInputThread); send_to_client!(
*client_id,
os_input,
ServerToClientMsg::UnblockInputThread,
session_state
);
} }
}, },
ServerInstruction::ClientExit(client_id) => { ServerInstruction::ClientExit(client_id) => {
os_input.send_to_client(client_id, ServerToClientMsg::Exit(ExitReason::Normal)); let _ =
os_input.send_to_client(client_id, ServerToClientMsg::Exit(ExitReason::Normal));
remove_client!(client_id, os_input, session_state); remove_client!(client_id, os_input, session_state);
if let Some(min_size) = session_state.read().unwrap().min_client_terminal_size() { if let Some(min_size) = session_state.read().unwrap().min_client_terminal_size() {
session_data session_data
@ -465,14 +486,16 @@ pub fn start_server(mut os_input: Box<dyn ServerOsApi>, socket_path: PathBuf) {
ServerInstruction::KillSession => { ServerInstruction::KillSession => {
let client_ids = session_state.read().unwrap().client_ids(); let client_ids = session_state.read().unwrap().client_ids();
for client_id in client_ids { for client_id in client_ids {
os_input.send_to_client(client_id, ServerToClientMsg::Exit(ExitReason::Normal)); let _ = os_input
.send_to_client(client_id, ServerToClientMsg::Exit(ExitReason::Normal));
remove_client!(client_id, os_input, session_state); remove_client!(client_id, os_input, session_state);
} }
break; break;
}, },
ServerInstruction::DetachSession(client_ids) => { ServerInstruction::DetachSession(client_ids) => {
for client_id in client_ids { for client_id in client_ids {
os_input.send_to_client(client_id, ServerToClientMsg::Exit(ExitReason::Normal)); let _ = os_input
.send_to_client(client_id, ServerToClientMsg::Exit(ExitReason::Normal));
remove_client!(client_id, os_input, session_state); remove_client!(client_id, os_input, session_state);
if let Some(min_size) = session_state.read().unwrap().min_client_terminal_size() if let Some(min_size) = session_state.read().unwrap().min_client_terminal_size()
{ {
@ -509,14 +532,16 @@ pub fn start_server(mut os_input: Box<dyn ServerOsApi>, socket_path: PathBuf) {
// If `None`- Send an exit instruction. This is the case when a user closes the last Tab/Pane. // If `None`- Send an exit instruction. This is the case when a user closes the last Tab/Pane.
if let Some(output) = &serialized_output { if let Some(output) = &serialized_output {
for (client_id, client_render_instruction) in output.iter() { for (client_id, client_render_instruction) in output.iter() {
os_input.send_to_client( send_to_client!(
*client_id, *client_id,
os_input,
ServerToClientMsg::Render(client_render_instruction.clone()), ServerToClientMsg::Render(client_render_instruction.clone()),
session_state
); );
} }
} else { } else {
for client_id in client_ids { for client_id in client_ids {
os_input let _ = os_input
.send_to_client(client_id, ServerToClientMsg::Exit(ExitReason::Normal)); .send_to_client(client_id, ServerToClientMsg::Exit(ExitReason::Normal));
remove_client!(client_id, os_input, session_state); remove_client!(client_id, os_input, session_state);
} }
@ -526,7 +551,7 @@ pub fn start_server(mut os_input: Box<dyn ServerOsApi>, socket_path: PathBuf) {
ServerInstruction::Error(backtrace) => { ServerInstruction::Error(backtrace) => {
let client_ids = session_state.read().unwrap().client_ids(); let client_ids = session_state.read().unwrap().client_ids();
for client_id in client_ids { for client_id in client_ids {
os_input.send_to_client( let _ = os_input.send_to_client(
client_id, client_id,
ServerToClientMsg::Exit(ExitReason::Error(backtrace.clone())), ServerToClientMsg::Exit(ExitReason::Error(backtrace.clone())),
); );
@ -535,7 +560,7 @@ pub fn start_server(mut os_input: Box<dyn ServerOsApi>, socket_path: PathBuf) {
break; break;
}, },
ServerInstruction::ConnStatus(client_id) => { ServerInstruction::ConnStatus(client_id) => {
os_input.send_to_client(client_id, ServerToClientMsg::Connected); let _ = os_input.send_to_client(client_id, ServerToClientMsg::Connected);
remove_client!(client_id, os_input, session_state); remove_client!(client_id, os_input, session_state);
}, },
ServerInstruction::ActiveClients(client_id) => { ServerInstruction::ActiveClients(client_id) => {
@ -545,7 +570,12 @@ pub fn start_server(mut os_input: Box<dyn ServerOsApi>, socket_path: PathBuf) {
client_ids, client_ids,
client_id client_id
); );
os_input.send_to_client(client_id, ServerToClientMsg::ActiveClients(client_ids)); send_to_client!(
client_id,
os_input,
ServerToClientMsg::ActiveClients(client_ids),
session_state
);
}, },
} }
} }

View file

@ -316,7 +316,11 @@ pub trait ServerOsApi: Send + Sync {
fn force_kill(&self, pid: Pid) -> Result<(), nix::Error>; fn force_kill(&self, pid: Pid) -> Result<(), nix::Error>;
/// Returns a [`Box`] pointer to this [`ServerOsApi`] struct. /// Returns a [`Box`] pointer to this [`ServerOsApi`] struct.
fn box_clone(&self) -> Box<dyn ServerOsApi>; fn box_clone(&self) -> Box<dyn ServerOsApi>;
fn send_to_client(&self, client_id: ClientId, msg: ServerToClientMsg); fn send_to_client(
&self,
client_id: ClientId,
msg: ServerToClientMsg,
) -> Result<(), &'static str>;
fn new_client( fn new_client(
&mut self, &mut self,
client_id: ClientId, client_id: ClientId,
@ -373,9 +377,15 @@ impl ServerOsApi for ServerOsInputOutput {
let _ = kill(pid, Some(Signal::SIGKILL)); let _ = kill(pid, Some(Signal::SIGKILL));
Ok(()) Ok(())
} }
fn send_to_client(&self, client_id: ClientId, msg: ServerToClientMsg) { fn send_to_client(
&self,
client_id: ClientId,
msg: ServerToClientMsg,
) -> Result<(), &'static str> {
if let Some(sender) = self.client_senders.lock().unwrap().get_mut(&client_id) { if let Some(sender) = self.client_senders.lock().unwrap().get_mut(&client_id) {
sender.send(msg); sender.send(msg)
} else {
Ok(())
} }
} }
fn new_client( fn new_client(

View file

@ -514,10 +514,15 @@ pub(crate) fn route_thread_main(
let client_id = maybe_client_id.unwrap_or(client_id); let client_id = maybe_client_id.unwrap_or(client_id);
if let Some(rlocked_sessions) = rlocked_sessions.as_ref() { if let Some(rlocked_sessions) = rlocked_sessions.as_ref() {
if let Action::SwitchToMode(input_mode) = action { if let Action::SwitchToMode(input_mode) = action {
os_input.send_to_client( let send_res = os_input.send_to_client(
client_id, client_id,
ServerToClientMsg::SwitchToMode(input_mode), ServerToClientMsg::SwitchToMode(input_mode),
); );
if send_res.is_err() {
let _ = to_server
.send(ServerInstruction::RemoveClient(client_id));
return true;
}
} }
if route_action( if route_action(
action, action,
@ -642,7 +647,7 @@ pub(crate) fn route_thread_main(
}, },
None => { None => {
log::error!("Received empty message from client"); log::error!("Received empty message from client");
os_input.send_to_client( let _ = os_input.send_to_client(
client_id, client_id,
ServerToClientMsg::Exit(ExitReason::Error( ServerToClientMsg::Exit(ExitReason::Error(
"Received empty message".to_string(), "Received empty message".to_string(),

View file

@ -72,7 +72,11 @@ impl ServerOsApi for FakeInputOutput {
fn box_clone(&self) -> Box<dyn ServerOsApi> { fn box_clone(&self) -> Box<dyn ServerOsApi> {
Box::new((*self).clone()) Box::new((*self).clone())
} }
fn send_to_client(&self, _client_id: ClientId, _msg: ServerToClientMsg) { fn send_to_client(
&self,
_client_id: ClientId,
_msg: ServerToClientMsg,
) -> Result<(), &'static str> {
unimplemented!() unimplemented!()
} }
fn new_client( fn new_client(

View file

@ -63,7 +63,11 @@ impl ServerOsApi for FakeInputOutput {
fn box_clone(&self) -> Box<dyn ServerOsApi> { fn box_clone(&self) -> Box<dyn ServerOsApi> {
Box::new((*self).clone()) Box::new((*self).clone())
} }
fn send_to_client(&self, _client_id: ClientId, _msg: ServerToClientMsg) { fn send_to_client(
&self,
_client_id: ClientId,
_msg: ServerToClientMsg,
) -> Result<(), &'static str> {
unimplemented!() unimplemented!()
} }
fn new_client( fn new_client(

View file

@ -59,7 +59,11 @@ impl ServerOsApi for FakeInputOutput {
fn box_clone(&self) -> Box<dyn ServerOsApi> { fn box_clone(&self) -> Box<dyn ServerOsApi> {
Box::new((*self).clone()) Box::new((*self).clone())
} }
fn send_to_client(&self, _client_id: ClientId, _msg: ServerToClientMsg) { fn send_to_client(
&self,
_client_id: ClientId,
_msg: ServerToClientMsg,
) -> Result<(), &'static str> {
unimplemented!() unimplemented!()
} }
fn new_client( fn new_client(

View file

@ -156,12 +156,16 @@ impl<T: Serialize> IpcSenderWithContext<T> {
} }
/// Sends an event, along with the current [`ErrorContext`], on this [`IpcSenderWithContext`]'s socket. /// Sends an event, along with the current [`ErrorContext`], on this [`IpcSenderWithContext`]'s socket.
pub fn send(&mut self, msg: T) { pub fn send(&mut self, msg: T) -> Result<(), &'static str> {
let err_ctx = get_current_ctx(); let err_ctx = get_current_ctx();
rmp_serde::encode::write(&mut self.sender, &(msg, err_ctx)).unwrap(); if rmp_serde::encode::write(&mut self.sender, &(msg, err_ctx)).is_err() {
// TODO: unwrapping here can cause issues when the server disconnects which we don't mind Err("Failed to send message to client")
// do we need to handle errors here in other cases? } else {
let _ = self.sender.flush(); // TODO: unwrapping here can cause issues when the server disconnects which we don't mind
// do we need to handle errors here in other cases?
let _ = self.sender.flush();
Ok(())
}
} }
/// Returns an [`IpcReceiverWithContext`] with the same socket as this sender. /// Returns an [`IpcReceiverWithContext`] with the same socket as this sender.