fix(ipc): empty ipc msg crash (#1351)
* fix(ipc): recover from corrupted channel state * style(fmt): rustfmt
This commit is contained in:
parent
90da35f4e6
commit
7ba49658f7
6 changed files with 116 additions and 85 deletions
|
|
@ -58,10 +58,13 @@ fn assert_socket(name: &str) -> bool {
|
||||||
Ok(stream) => {
|
Ok(stream) => {
|
||||||
let mut sender = IpcSenderWithContext::new(stream);
|
let mut sender = IpcSenderWithContext::new(stream);
|
||||||
sender.send(ClientToServerMsg::ConnStatus);
|
sender.send(ClientToServerMsg::ConnStatus);
|
||||||
|
|
||||||
let mut receiver: IpcReceiverWithContext<ServerToClientMsg> = sender.get_receiver();
|
let mut receiver: IpcReceiverWithContext<ServerToClientMsg> = sender.get_receiver();
|
||||||
let (instruction, _) = receiver.recv();
|
match receiver.recv() {
|
||||||
matches!(instruction, ServerToClientMsg::Connected)
|
Some((instruction, _)) => {
|
||||||
|
matches!(instruction, ServerToClientMsg::Connected)
|
||||||
|
}
|
||||||
|
None => false,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Err(e) if e.kind() == io::ErrorKind::ConnectionRefused => {
|
Err(e) if e.kind() == io::ErrorKind::ConnectionRefused => {
|
||||||
drop(fs::remove_file(path));
|
drop(fs::remove_file(path));
|
||||||
|
|
|
||||||
|
|
@ -270,14 +270,23 @@ pub fn start_client(
|
||||||
let os_input = os_input.clone();
|
let os_input = os_input.clone();
|
||||||
let mut should_break = false;
|
let mut should_break = false;
|
||||||
move || loop {
|
move || loop {
|
||||||
let (instruction, err_ctx) = os_input.recv_from_server();
|
match os_input.recv_from_server() {
|
||||||
err_ctx.update_thread_ctx();
|
Some((instruction, err_ctx)) => {
|
||||||
if let ServerToClientMsg::Exit(_) = instruction {
|
err_ctx.update_thread_ctx();
|
||||||
should_break = true;
|
if let ServerToClientMsg::Exit(_) = instruction {
|
||||||
}
|
should_break = true;
|
||||||
send_client_instructions.send(instruction.into()).unwrap();
|
}
|
||||||
if should_break {
|
send_client_instructions.send(instruction.into()).unwrap();
|
||||||
break;
|
if should_break {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
send_client_instructions
|
||||||
|
.send(ClientInstruction::UnblockInputThread)
|
||||||
|
.unwrap();
|
||||||
|
log::error!("Received empty message from server");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
|
||||||
|
|
@ -93,7 +93,7 @@ pub trait ClientOsApi: Send + Sync {
|
||||||
fn send_to_server(&self, msg: ClientToServerMsg);
|
fn send_to_server(&self, msg: ClientToServerMsg);
|
||||||
/// Receives a message on client-side IPC channel
|
/// Receives a message on client-side IPC channel
|
||||||
// This should be called from the client-side router thread only.
|
// This should be called from the client-side router thread only.
|
||||||
fn recv_from_server(&self) -> (ServerToClientMsg, ErrorContext);
|
fn recv_from_server(&self) -> Option<(ServerToClientMsg, ErrorContext)>;
|
||||||
fn handle_signals(&self, sigwinch_cb: Box<dyn Fn()>, quit_cb: Box<dyn Fn()>);
|
fn handle_signals(&self, sigwinch_cb: Box<dyn Fn()>, quit_cb: Box<dyn Fn()>);
|
||||||
/// Establish a connection with the server socket.
|
/// Establish a connection with the server socket.
|
||||||
fn connect_to_server(&self, path: &Path);
|
fn connect_to_server(&self, path: &Path);
|
||||||
|
|
@ -144,7 +144,7 @@ impl ClientOsApi for ClientOsInputOutput {
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.send(msg);
|
.send(msg);
|
||||||
}
|
}
|
||||||
fn recv_from_server(&self) -> (ServerToClientMsg, ErrorContext) {
|
fn recv_from_server(&self) -> Option<(ServerToClientMsg, ErrorContext)> {
|
||||||
self.receive_instructions_from_server
|
self.receive_instructions_from_server
|
||||||
.lock()
|
.lock()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
|
|
|
||||||
|
|
@ -151,7 +151,7 @@ impl ClientOsApi for FakeClientOsApi {
|
||||||
command_is_executing.unblock_input_thread();
|
command_is_executing.unblock_input_thread();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
fn recv_from_server(&self) -> (ServerToClientMsg, ErrorContext) {
|
fn recv_from_server(&self) -> Option<(ServerToClientMsg, ErrorContext)> {
|
||||||
unimplemented!()
|
unimplemented!()
|
||||||
}
|
}
|
||||||
fn handle_signals(&self, _sigwinch_cb: Box<dyn Fn()>, _quit_cb: Box<dyn Fn()>) {
|
fn handle_signals(&self, _sigwinch_cb: Box<dyn Fn()>, _quit_cb: Box<dyn Fn()>) {
|
||||||
|
|
|
||||||
|
|
@ -397,81 +397,97 @@ pub(crate) fn route_thread_main(
|
||||||
client_id: ClientId,
|
client_id: ClientId,
|
||||||
) {
|
) {
|
||||||
loop {
|
loop {
|
||||||
let (instruction, err_ctx) = receiver.recv();
|
match receiver.recv() {
|
||||||
err_ctx.update_thread_ctx();
|
Some((instruction, err_ctx)) => {
|
||||||
let rlocked_sessions = session_data.read().unwrap();
|
err_ctx.update_thread_ctx();
|
||||||
|
let rlocked_sessions = session_data.read().unwrap();
|
||||||
|
|
||||||
match instruction {
|
match instruction {
|
||||||
ClientToServerMsg::Action(action) => {
|
ClientToServerMsg::Action(action) => {
|
||||||
if let Some(rlocked_sessions) = rlocked_sessions.as_ref() {
|
if let Some(rlocked_sessions) = rlocked_sessions.as_ref() {
|
||||||
if let Action::SwitchToMode(input_mode) = action {
|
if let Action::SwitchToMode(input_mode) = action {
|
||||||
os_input
|
os_input.send_to_client(
|
||||||
.send_to_client(client_id, ServerToClientMsg::SwitchToMode(input_mode));
|
client_id,
|
||||||
|
ServerToClientMsg::SwitchToMode(input_mode),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
if route_action(
|
||||||
|
action,
|
||||||
|
rlocked_sessions,
|
||||||
|
&*os_input,
|
||||||
|
&to_server,
|
||||||
|
client_id,
|
||||||
|
) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if route_action(action, rlocked_sessions, &*os_input, &to_server, client_id) {
|
ClientToServerMsg::TerminalResize(new_size) => {
|
||||||
|
session_state
|
||||||
|
.write()
|
||||||
|
.unwrap()
|
||||||
|
.set_client_size(client_id, new_size);
|
||||||
|
let min_size = session_state
|
||||||
|
.read()
|
||||||
|
.unwrap()
|
||||||
|
.min_client_terminal_size()
|
||||||
|
.unwrap();
|
||||||
|
rlocked_sessions
|
||||||
|
.as_ref()
|
||||||
|
.unwrap()
|
||||||
|
.senders
|
||||||
|
.send_to_screen(ScreenInstruction::TerminalResize(min_size))
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
ClientToServerMsg::TerminalPixelDimensions(pixel_dimensions) => {
|
||||||
|
rlocked_sessions
|
||||||
|
.as_ref()
|
||||||
|
.unwrap()
|
||||||
|
.senders
|
||||||
|
.send_to_screen(ScreenInstruction::TerminalPixelDimensions(
|
||||||
|
pixel_dimensions,
|
||||||
|
))
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
ClientToServerMsg::NewClient(
|
||||||
|
client_attributes,
|
||||||
|
cli_args,
|
||||||
|
opts,
|
||||||
|
layout,
|
||||||
|
plugin_config,
|
||||||
|
) => {
|
||||||
|
let new_client_instruction = ServerInstruction::NewClient(
|
||||||
|
client_attributes,
|
||||||
|
cli_args,
|
||||||
|
opts,
|
||||||
|
layout,
|
||||||
|
client_id,
|
||||||
|
plugin_config,
|
||||||
|
);
|
||||||
|
to_server.send(new_client_instruction).unwrap();
|
||||||
|
}
|
||||||
|
ClientToServerMsg::AttachClient(client_attributes, opts) => {
|
||||||
|
let attach_client_instruction =
|
||||||
|
ServerInstruction::AttachClient(client_attributes, opts, client_id);
|
||||||
|
to_server.send(attach_client_instruction).unwrap();
|
||||||
|
}
|
||||||
|
ClientToServerMsg::ClientExited => {
|
||||||
|
// we don't unwrap this because we don't really care if there's an error here (eg.
|
||||||
|
// if the main server thread exited before this router thread did)
|
||||||
|
let _ = to_server.send(ServerInstruction::RemoveClient(client_id));
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
ClientToServerMsg::KillSession => {
|
||||||
|
to_server.send(ServerInstruction::KillSession).unwrap();
|
||||||
|
}
|
||||||
|
ClientToServerMsg::ConnStatus => {
|
||||||
|
let _ = to_server.send(ServerInstruction::ConnStatus(client_id));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ClientToServerMsg::TerminalResize(new_size) => {
|
None => {
|
||||||
session_state
|
log::error!("Received empty message from client");
|
||||||
.write()
|
|
||||||
.unwrap()
|
|
||||||
.set_client_size(client_id, new_size);
|
|
||||||
let min_size = session_state
|
|
||||||
.read()
|
|
||||||
.unwrap()
|
|
||||||
.min_client_terminal_size()
|
|
||||||
.unwrap();
|
|
||||||
rlocked_sessions
|
|
||||||
.as_ref()
|
|
||||||
.unwrap()
|
|
||||||
.senders
|
|
||||||
.send_to_screen(ScreenInstruction::TerminalResize(min_size))
|
|
||||||
.unwrap();
|
|
||||||
}
|
|
||||||
ClientToServerMsg::TerminalPixelDimensions(pixel_dimensions) => {
|
|
||||||
rlocked_sessions
|
|
||||||
.as_ref()
|
|
||||||
.unwrap()
|
|
||||||
.senders
|
|
||||||
.send_to_screen(ScreenInstruction::TerminalPixelDimensions(pixel_dimensions))
|
|
||||||
.unwrap();
|
|
||||||
}
|
|
||||||
ClientToServerMsg::NewClient(
|
|
||||||
client_attributes,
|
|
||||||
cli_args,
|
|
||||||
opts,
|
|
||||||
layout,
|
|
||||||
plugin_config,
|
|
||||||
) => {
|
|
||||||
let new_client_instruction = ServerInstruction::NewClient(
|
|
||||||
client_attributes,
|
|
||||||
cli_args,
|
|
||||||
opts,
|
|
||||||
layout,
|
|
||||||
client_id,
|
|
||||||
plugin_config,
|
|
||||||
);
|
|
||||||
to_server.send(new_client_instruction).unwrap();
|
|
||||||
}
|
|
||||||
ClientToServerMsg::AttachClient(client_attributes, opts) => {
|
|
||||||
let attach_client_instruction =
|
|
||||||
ServerInstruction::AttachClient(client_attributes, opts, client_id);
|
|
||||||
to_server.send(attach_client_instruction).unwrap();
|
|
||||||
}
|
|
||||||
ClientToServerMsg::ClientExited => {
|
|
||||||
// we don't unwrap this because we don't really care if there's an error here (eg.
|
|
||||||
// if the main server thread exited before this router thread did)
|
|
||||||
let _ = to_server.send(ServerInstruction::RemoveClient(client_id));
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
ClientToServerMsg::KillSession => {
|
|
||||||
to_server.send(ServerInstruction::KillSession).unwrap();
|
|
||||||
}
|
|
||||||
ClientToServerMsg::ConnStatus => {
|
|
||||||
let _ = to_server.send(ServerInstruction::ConnStatus(client_id));
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -184,8 +184,11 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Receives an event, along with the current [`ErrorContext`], on this [`IpcReceiverWithContext`]'s socket.
|
/// Receives an event, along with the current [`ErrorContext`], on this [`IpcReceiverWithContext`]'s socket.
|
||||||
pub fn recv(&mut self) -> (T, ErrorContext) {
|
pub fn recv(&mut self) -> Option<(T, ErrorContext)> {
|
||||||
bincode::deserialize_from(&mut self.receiver).unwrap()
|
match bincode::deserialize_from(&mut self.receiver) {
|
||||||
|
Ok(msg) => Some(msg),
|
||||||
|
Err(_) => None,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns an [`IpcSenderWithContext`] with the same socket as this receiver.
|
/// Returns an [`IpcSenderWithContext`] with the same socket as this receiver.
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue