fix(router): retry messages when server is not ready (#1651)

* fix(router): retry messages when server is not ready

* style(fmt): rustfmt
This commit is contained in:
Aram Drevekenin 2022-08-11 14:35:15 +02:00 committed by GitHub
parent dba5dcbd83
commit e910db9bad
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -445,6 +445,22 @@ fn route_action(
should_break should_break
} }
macro_rules! send_to_screen_or_retry_queue {
($rlocked_sessions:expr, $message:expr, $instruction: expr, $retry_queue:expr) => {{
match $rlocked_sessions.as_ref() {
Some(session_metadata) => {
session_metadata.senders.send_to_screen($message).unwrap();
},
None => {
log::warn!("Server not ready, trying to place instruction in retry queue...");
if let Some(retry_queue) = $retry_queue.as_mut() {
retry_queue.push($instruction);
}
},
}
}};
}
pub(crate) fn route_thread_main( pub(crate) fn route_thread_main(
session_data: Arc<RwLock<Option<SessionMetaData>>>, session_data: Arc<RwLock<Option<SessionMetaData>>>,
session_state: Arc<RwLock<SessionState>>, session_state: Arc<RwLock<SessionState>>,
@ -453,132 +469,148 @@ pub(crate) fn route_thread_main(
mut receiver: IpcReceiverWithContext<ClientToServerMsg>, mut receiver: IpcReceiverWithContext<ClientToServerMsg>,
client_id: ClientId, client_id: ClientId,
) { ) {
loop { let mut retry_queue = vec![];
'route_loop: loop {
match receiver.recv() { match receiver.recv() {
Some((instruction, err_ctx)) => { Some((instruction, err_ctx)) => {
err_ctx.update_thread_ctx(); err_ctx.update_thread_ctx();
let rlocked_sessions = session_data.read().unwrap(); let rlocked_sessions = session_data.read().unwrap();
let handle_instruction = |instruction: ClientToServerMsg,
match instruction { mut retry_queue: Option<&mut Vec<ClientToServerMsg>>|
ClientToServerMsg::Action(action, maybe_client_id) => { -> bool {
let client_id = maybe_client_id.unwrap_or(client_id); let mut should_break = false;
if let Some(rlocked_sessions) = rlocked_sessions.as_ref() { match instruction {
if let Action::SwitchToMode(input_mode) = action { ClientToServerMsg::Action(action, maybe_client_id) => {
os_input.send_to_client( let client_id = maybe_client_id.unwrap_or(client_id);
if let Some(rlocked_sessions) = rlocked_sessions.as_ref() {
if let Action::SwitchToMode(input_mode) = action {
os_input.send_to_client(
client_id,
ServerToClientMsg::SwitchToMode(input_mode),
);
}
if route_action(
action,
rlocked_sessions,
&*os_input,
&to_server,
client_id, client_id,
ServerToClientMsg::SwitchToMode(input_mode), ) {
); should_break = true;
}
} }
if route_action( },
action, ClientToServerMsg::TerminalResize(new_size) => {
session_state
.write()
.unwrap()
.set_client_size(client_id, new_size);
let min_size = session_state
.read()
.unwrap()
.min_client_terminal_size()
.unwrap();
rlocked_sessions
.as_ref()
.unwrap()
.senders
.send_to_screen(ScreenInstruction::TerminalResize(min_size))
.unwrap();
},
ClientToServerMsg::TerminalPixelDimensions(pixel_dimensions) => {
// this is experimental, please be cautious implementing this elsewhere
send_to_screen_or_retry_queue!(
rlocked_sessions, rlocked_sessions,
&*os_input, ScreenInstruction::TerminalPixelDimensions(pixel_dimensions),
&to_server, instruction,
client_id, retry_queue
) { );
break; },
} ClientToServerMsg::BackgroundColor(background_color_instruction) => {
} rlocked_sessions
}, .as_ref()
ClientToServerMsg::TerminalResize(new_size) => { .unwrap()
session_state .senders
.write() .send_to_screen(ScreenInstruction::TerminalBackgroundColor(
.unwrap() background_color_instruction,
.set_client_size(client_id, new_size); ))
let min_size = session_state .unwrap();
.read() },
.unwrap() ClientToServerMsg::ForegroundColor(foreground_color_instruction) => {
.min_client_terminal_size() rlocked_sessions
.unwrap(); .as_ref()
rlocked_sessions .unwrap()
.as_ref() .senders
.unwrap() .send_to_screen(ScreenInstruction::TerminalForegroundColor(
.senders foreground_color_instruction,
.send_to_screen(ScreenInstruction::TerminalResize(min_size)) ))
.unwrap(); .unwrap();
}, },
ClientToServerMsg::TerminalPixelDimensions(pixel_dimensions) => { ClientToServerMsg::ColorRegisters(color_registers) => {
rlocked_sessions rlocked_sessions
.as_ref() .as_ref()
.unwrap() .unwrap()
.senders .senders
.send_to_screen(ScreenInstruction::TerminalPixelDimensions( .send_to_screen(ScreenInstruction::TerminalColorRegisters(
pixel_dimensions, color_registers,
)) ))
.unwrap(); .unwrap();
}, },
ClientToServerMsg::BackgroundColor(background_color_instruction) => { ClientToServerMsg::NewClient(
rlocked_sessions
.as_ref()
.unwrap()
.senders
.send_to_screen(ScreenInstruction::TerminalBackgroundColor(
background_color_instruction,
))
.unwrap();
},
ClientToServerMsg::ForegroundColor(foreground_color_instruction) => {
rlocked_sessions
.as_ref()
.unwrap()
.senders
.send_to_screen(ScreenInstruction::TerminalForegroundColor(
foreground_color_instruction,
))
.unwrap();
},
ClientToServerMsg::ColorRegisters(color_registers) => {
rlocked_sessions
.as_ref()
.unwrap()
.senders
.send_to_screen(ScreenInstruction::TerminalColorRegisters(
color_registers,
))
.unwrap();
},
ClientToServerMsg::NewClient(
client_attributes,
cli_args,
opts,
layout,
plugin_config,
) => {
let new_client_instruction = ServerInstruction::NewClient(
client_attributes, client_attributes,
cli_args, cli_args,
opts, opts,
layout, layout,
client_id,
plugin_config, plugin_config,
); ) => {
to_server.send(new_client_instruction).unwrap(); let new_client_instruction = ServerInstruction::NewClient(
}, client_attributes,
ClientToServerMsg::AttachClient(client_attributes, opts) => { cli_args,
let attach_client_instruction = opts,
ServerInstruction::AttachClient(client_attributes, opts, client_id); layout,
to_server.send(attach_client_instruction).unwrap(); client_id,
}, plugin_config,
ClientToServerMsg::ClientExited => { );
// we don't unwrap this because we don't really care if there's an error here (eg. to_server.send(new_client_instruction).unwrap();
// if the main server thread exited before this router thread did) },
let _ = to_server.send(ServerInstruction::RemoveClient(client_id)); ClientToServerMsg::AttachClient(client_attributes, opts) => {
break; let attach_client_instruction =
}, ServerInstruction::AttachClient(client_attributes, opts, client_id);
ClientToServerMsg::KillSession => { to_server.send(attach_client_instruction).unwrap();
to_server.send(ServerInstruction::KillSession).unwrap(); },
}, ClientToServerMsg::ClientExited => {
ClientToServerMsg::ConnStatus => { // we don't unwrap this because we don't really care if there's an error here (eg.
let _ = to_server.send(ServerInstruction::ConnStatus(client_id)); // if the main server thread exited before this router thread did)
break; let _ = to_server.send(ServerInstruction::RemoveClient(client_id));
}, return true;
ClientToServerMsg::DetachSession(client_id) => { },
let _ = to_server.send(ServerInstruction::DetachSession(client_id)); ClientToServerMsg::KillSession => {
break; to_server.send(ServerInstruction::KillSession).unwrap();
}, },
ClientToServerMsg::ListClients => { ClientToServerMsg::ConnStatus => {
let _ = to_server.send(ServerInstruction::ActiveClients(client_id)); let _ = to_server.send(ServerInstruction::ConnStatus(client_id));
}, should_break = true;
},
ClientToServerMsg::DetachSession(client_id) => {
let _ = to_server.send(ServerInstruction::DetachSession(client_id));
should_break = true;
},
ClientToServerMsg::ListClients => {
let _ = to_server.send(ServerInstruction::ActiveClients(client_id));
},
}
should_break
};
for instruction_to_retry in retry_queue.drain(..) {
log::warn!("Server ready, retrying sending instruction.");
let should_break = handle_instruction(instruction_to_retry, None);
if should_break {
break 'route_loop;
}
}
let should_break = handle_instruction(instruction, Some(&mut retry_queue));
if should_break {
break 'route_loop;
} }
}, },
None => { None => {