fix(router): deadlock when unblocking input thread (#3281)

This commit is contained in:
Aram Drevekenin 2024-04-19 18:29:49 +02:00 committed by GitHub
parent 07dddc60fc
commit b02e026e35
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -264,6 +264,9 @@ impl SessionState {
pub fn client_ids(&self) -> Vec<ClientId> { pub fn client_ids(&self) -> Vec<ClientId> {
self.clients.keys().copied().collect() self.clients.keys().copied().collect()
} }
pub fn get_pipe(&self, pipe_name: &str) -> Option<ClientId> {
self.pipes.get(pipe_name).copied()
}
pub fn active_clients_are_connected(&self) -> bool { pub fn active_clients_are_connected(&self) -> bool {
let ids_of_pipe_clients: HashSet<ClientId> = self.pipes.values().copied().collect(); let ids_of_pipe_clients: HashSet<ClientId> = self.pipes.values().copied().collect();
let mut active_clients_connected = false; let mut active_clients_connected = false;
@ -516,9 +519,10 @@ pub fn start_server(mut os_input: Box<dyn ServerOsApi>, socket_path: PathBuf) {
); );
}, },
ServerInstruction::UnblockInputThread => { ServerInstruction::UnblockInputThread => {
for client_id in session_state.read().unwrap().clients.keys() { let client_ids = session_state.read().unwrap().client_ids();
for client_id in client_ids {
send_to_client!( send_to_client!(
*client_id, client_id,
os_input, os_input,
ServerToClientMsg::UnblockInputThread, ServerToClientMsg::UnblockInputThread,
session_state session_state
@ -526,10 +530,11 @@ pub fn start_server(mut os_input: Box<dyn ServerOsApi>, socket_path: PathBuf) {
} }
}, },
ServerInstruction::UnblockCliPipeInput(pipe_name) => { ServerInstruction::UnblockCliPipeInput(pipe_name) => {
match session_state.read().unwrap().pipes.get(&pipe_name) { let pipe = session_state.read().unwrap().get_pipe(&pipe_name);
match pipe {
Some(client_id) => { Some(client_id) => {
send_to_client!( send_to_client!(
*client_id, client_id,
os_input, os_input,
ServerToClientMsg::UnblockCliPipeInput(pipe_name.clone()), ServerToClientMsg::UnblockCliPipeInput(pipe_name.clone()),
session_state session_state
@ -537,9 +542,10 @@ pub fn start_server(mut os_input: Box<dyn ServerOsApi>, socket_path: PathBuf) {
}, },
None => { None => {
// send to all clients, this pipe might not have been associated yet // send to all clients, this pipe might not have been associated yet
for client_id in session_state.read().unwrap().clients.keys() { let client_ids = session_state.read().unwrap().client_ids();
for client_id in client_ids {
send_to_client!( send_to_client!(
*client_id, client_id,
os_input, os_input,
ServerToClientMsg::UnblockCliPipeInput(pipe_name.clone()), ServerToClientMsg::UnblockCliPipeInput(pipe_name.clone()),
session_state session_state
@ -549,10 +555,11 @@ pub fn start_server(mut os_input: Box<dyn ServerOsApi>, socket_path: PathBuf) {
} }
}, },
ServerInstruction::CliPipeOutput(pipe_name, output) => { ServerInstruction::CliPipeOutput(pipe_name, output) => {
match session_state.read().unwrap().pipes.get(&pipe_name) { let pipe = session_state.read().unwrap().get_pipe(&pipe_name);
match pipe {
Some(client_id) => { Some(client_id) => {
send_to_client!( send_to_client!(
*client_id, client_id,
os_input, os_input,
ServerToClientMsg::CliPipeOutput(pipe_name.clone(), output.clone()), ServerToClientMsg::CliPipeOutput(pipe_name.clone(), output.clone()),
session_state session_state
@ -560,9 +567,10 @@ pub fn start_server(mut os_input: Box<dyn ServerOsApi>, socket_path: PathBuf) {
}, },
None => { None => {
// send to all clients, this pipe might not have been associated yet // send to all clients, this pipe might not have been associated yet
for client_id in session_state.read().unwrap().clients.keys() { let client_ids = session_state.read().unwrap().client_ids();
for client_id in client_ids {
send_to_client!( send_to_client!(
*client_id, client_id,
os_input, os_input,
ServerToClientMsg::CliPipeOutput(pipe_name.clone(), output.clone()), ServerToClientMsg::CliPipeOutput(pipe_name.clone(), output.clone()),
session_state session_state