From e30ec5745ef1f5526a05be1f49fed7b406558805 Mon Sep 17 00:00:00 2001 From: Kunal Mohan Date: Wed, 24 Mar 2021 15:34:53 +0530 Subject: [PATCH] Add router thread on server side as well --- src/common/errors.rs | 2 + src/common/mod.rs | 30 ++------ src/common/os_input_output.rs | 11 +-- src/common/pty_bus.rs | 131 +++++++++++++++++++++------------- src/main.rs | 2 +- src/server/mod.rs | 107 +++++++++++++++++++++------ src/tests/fakes.rs | 9 ++- 7 files changed, 183 insertions(+), 109 deletions(-) diff --git a/src/common/errors.rs b/src/common/errors.rs index 83527448..37729fbb 100644 --- a/src/common/errors.rs +++ b/src/common/errors.rs @@ -387,6 +387,7 @@ pub enum ServerContext { OsApi, DoneClosingPane, ClosePluginPane, + ClientExit, Exit, } @@ -403,6 +404,7 @@ impl From<&ServerInstruction> for ServerContext { ServerInstruction::OsApi(_) => ServerContext::OsApi, ServerInstruction::DoneClosingPane => ServerContext::DoneClosingPane, ServerInstruction::ClosePluginPane(_) => ServerContext::ClosePluginPane, + ServerInstruction::ClientExit => ServerContext::ClientExit, ServerInstruction::Exit => ServerContext::Exit, } } diff --git a/src/common/mod.rs b/src/common/mod.rs index cca8b0db..ef9fc47f 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -26,7 +26,7 @@ use wasmer_wasi::{Pipe, WasiState}; use crate::cli::CliArgs; use crate::layout::Layout; -use crate::server::start_server; +use crate::server::{start_server, ServerInstruction}; use command_is_executing::CommandIsExecuting; use errors::{AppContext, ContextType, ErrorContext, PluginContext, ScreenContext}; use input::handler::input_loop; @@ -38,21 +38,7 @@ use wasm_vm::{ wasi_stdout, wasi_write_string, zellij_imports, EventType, PluginInputType, PluginInstruction, }; -#[derive(Serialize, Deserialize, Debug, Clone)] -pub enum ServerInstruction { - OpenFile(PathBuf), - SplitHorizontally, - SplitVertically, - MoveFocus, - NewClient(String), - ToPty(PtyInstruction), - ToScreen(ScreenInstruction), - OsApi(ServerOsApiInstruction), - DoneClosingPane, - ClosePluginPane(u32), - Exit, -} - +/// Instructions sent from server to client #[derive(Serialize, Deserialize, Debug, Clone)] pub enum ClientInstruction { ToScreen(ScreenInstruction), @@ -134,13 +120,7 @@ thread_local!( static OPENCALLS: RefCell = RefCell::default() ); -task_local! { - /// A key to some task local storage that holds a representation of the task's call - /// stack in the form of an [`ErrorContext`]. - static ASYNCOPENCALLS: RefCell = RefCell::default() -} - -/// Instructions related to the entire application. +/// Instructions related to the client-side application. #[derive(Clone)] pub enum AppInstruction { Exit, @@ -571,7 +551,7 @@ pub fn start( AppInstruction::SetState(state) => app_state = state, AppInstruction::Exit => break, AppInstruction::Error(backtrace) => { - let _ = os_input.send_to_server(ServerInstruction::Exit); + let _ = os_input.send_to_server(ServerInstruction::ClientExit); let _ = send_screen_instructions.send(ScreenInstruction::Exit); let _ = send_plugin_instructions.send(PluginInstruction::Exit); let _ = screen_thread.join(); @@ -603,7 +583,7 @@ pub fn start( } } - let _ = os_input.send_to_server(ServerInstruction::Exit); + let _ = os_input.send_to_server(ServerInstruction::ClientExit); let _ = send_screen_instructions.send(ScreenInstruction::Exit); let _ = send_plugin_instructions.send(PluginInstruction::Exit); screen_thread.join().unwrap(); diff --git a/src/common/os_input_output.rs b/src/common/os_input_output.rs index ad772833..62006162 100644 --- a/src/common/os_input_output.rs +++ b/src/common/os_input_output.rs @@ -16,9 +16,10 @@ use std::path::PathBuf; use std::process::{Child, Command}; use std::sync::{Arc, Mutex}; -use crate::common::{ClientInstruction, ServerInstruction}; +use crate::common::ClientInstruction; use crate::errors::ErrorContext; use crate::panes::PositionAndSize; +use crate::server::ServerInstruction; use crate::utils::consts::ZELLIJ_IPC_PIPE; const IPC_BUFFER_SIZE: u32 = 8192; @@ -221,8 +222,8 @@ pub trait ServerOsApi: Send + Sync { fn kill(&mut self, pid: RawFd) -> Result<(), nix::Error>; /// Returns a [`Box`] pointer to this [`ServerOsApi`] struct. fn box_clone(&self) -> Box; - /// Sends a message to the server. - fn send_to_server(&mut self, msg: ServerInstruction); + /// Sends an `Exit` message to the server router thread. + fn server_exit(&mut self); /// Receives a message on server-side IPC channel fn server_recv(&self) -> (ServerInstruction, ErrorContext); /// Sends a message to client @@ -258,8 +259,8 @@ impl ServerOsApi for ServerOsInputOutput { waitpid(Pid::from_raw(pid), None).unwrap(); Ok(()) } - fn send_to_server(&mut self, msg: ServerInstruction) { - self.server_sender.send(msg).unwrap(); + fn server_exit(&mut self) { + self.server_sender.send(ServerInstruction::Exit).unwrap(); } fn server_recv(&self) -> (ServerInstruction, ErrorContext) { self.server_receiver.lock().unwrap().recv().unwrap() diff --git a/src/common/pty_bus.rs b/src/common/pty_bus.rs index 84a06e80..1d887ba2 100644 --- a/src/common/pty_bus.rs +++ b/src/common/pty_bus.rs @@ -10,14 +10,14 @@ use ::vte; use serde::{Deserialize, Serialize}; use std::path::PathBuf; -use super::{ScreenInstruction, OPENCALLS}; +use super::{ScreenInstruction, SenderWithContext, OPENCALLS}; use crate::layout::Layout; use crate::os_input_output::ServerOsApi; use crate::utils::logging::debug_to_file; use crate::{ - common::ServerInstruction, errors::{ContextType, ErrorContext}, panes::PaneId, + server::ServerInstruction, }; pub struct ReadFromPid { @@ -81,83 +81,94 @@ pub enum VteEvent { struct VteEventSender { id: RawFd, - os_input: Box, + send_server_instructions: SenderWithContext, } impl VteEventSender { - pub fn new(id: RawFd, os_input: Box) -> Self { - VteEventSender { id, os_input } + pub fn new(id: RawFd, send_server_instructions: SenderWithContext) -> Self { + VteEventSender { + id, + send_server_instructions, + } } } impl vte::Perform for VteEventSender { fn print(&mut self, c: char) { - self.os_input - .send_to_server(ServerInstruction::ToScreen(ScreenInstruction::Pty( + self.send_server_instructions + .send(ServerInstruction::ToScreen(ScreenInstruction::Pty( self.id, VteEvent::Print(c), - ))); + ))) + .unwrap(); } fn execute(&mut self, byte: u8) { - self.os_input - .send_to_server(ServerInstruction::ToScreen(ScreenInstruction::Pty( + self.send_server_instructions + .send(ServerInstruction::ToScreen(ScreenInstruction::Pty( self.id, VteEvent::Execute(byte), - ))); + ))) + .unwrap(); } fn hook(&mut self, params: &[i64], intermediates: &[u8], ignore: bool, c: char) { let params = params.iter().copied().collect(); let intermediates = intermediates.iter().copied().collect(); - self.os_input - .send_to_server(ServerInstruction::ToScreen(ScreenInstruction::Pty( + self.send_server_instructions + .send(ServerInstruction::ToScreen(ScreenInstruction::Pty( self.id, VteEvent::Hook(params, intermediates, ignore, c), - ))); + ))) + .unwrap(); } fn put(&mut self, byte: u8) { - self.os_input - .send_to_server(ServerInstruction::ToScreen(ScreenInstruction::Pty( + self.send_server_instructions + .send(ServerInstruction::ToScreen(ScreenInstruction::Pty( self.id, VteEvent::Put(byte), - ))); + ))) + .unwrap(); } fn unhook(&mut self) { - self.os_input - .send_to_server(ServerInstruction::ToScreen(ScreenInstruction::Pty( + self.send_server_instructions + .send(ServerInstruction::ToScreen(ScreenInstruction::Pty( self.id, VteEvent::Unhook, - ))); + ))) + .unwrap(); } fn osc_dispatch(&mut self, params: &[&[u8]], bell_terminated: bool) { let params = params.iter().map(|p| p.to_vec()).collect(); - self.os_input - .send_to_server(ServerInstruction::ToScreen(ScreenInstruction::Pty( + self.send_server_instructions + .send(ServerInstruction::ToScreen(ScreenInstruction::Pty( self.id, VteEvent::OscDispatch(params, bell_terminated), - ))); + ))) + .unwrap(); } fn csi_dispatch(&mut self, params: &[i64], intermediates: &[u8], ignore: bool, c: char) { let params = params.iter().copied().collect(); let intermediates = intermediates.iter().copied().collect(); - self.os_input - .send_to_server(ServerInstruction::ToScreen(ScreenInstruction::Pty( + self.send_server_instructions + .send(ServerInstruction::ToScreen(ScreenInstruction::Pty( self.id, VteEvent::CsiDispatch(params, intermediates, ignore, c), - ))); + ))) + .unwrap(); } fn esc_dispatch(&mut self, intermediates: &[u8], ignore: bool, byte: u8) { let intermediates = intermediates.iter().copied().collect(); - self.os_input - .send_to_server(ServerInstruction::ToScreen(ScreenInstruction::Pty( + self.send_server_instructions + .send(ServerInstruction::ToScreen(ScreenInstruction::Pty( self.id, VteEvent::EscDispatch(intermediates, ignore, byte), - ))); + ))) + .unwrap(); } } @@ -176,23 +187,25 @@ pub enum PtyInstruction { pub struct PtyBus { pub receive_pty_instructions: Receiver<(PtyInstruction, ErrorContext)>, pub id_to_child_pid: HashMap, - pub os_input: Box, + pub send_server_instructions: SenderWithContext, + os_input: Box, debug_to_file: bool, task_handles: HashMap>, } fn stream_terminal_bytes( pid: RawFd, - mut os_input: Box, + os_input: Box, + mut send_server_instructions: SenderWithContext, debug: bool, ) -> JoinHandle<()> { let mut err_ctx = OPENCALLS.with(|ctx| *ctx.borrow()); task::spawn({ async move { err_ctx.add_call(ContextType::AsyncTask); - os_input.update_senders(err_ctx); + send_server_instructions.update(err_ctx); let mut vte_parser = vte::Parser::new(); - let mut vte_event_sender = VteEventSender::new(pid, os_input.clone()); + let mut vte_event_sender = VteEventSender::new(pid, send_server_instructions.clone()); let mut terminal_bytes = ReadFromPid::new(&pid, os_input.clone()); let mut last_byte_receive_time: Option = None; @@ -218,9 +231,9 @@ fn stream_terminal_bytes( Some(receive_time) => { if receive_time.elapsed() > max_render_pause { pending_render = false; - os_input.send_to_server(ServerInstruction::ToScreen( - ScreenInstruction::Render, - )); + send_server_instructions + .send(ServerInstruction::ToScreen(ScreenInstruction::Render)) + .unwrap(); last_byte_receive_time = Some(Instant::now()); } else { pending_render = true; @@ -234,21 +247,26 @@ fn stream_terminal_bytes( } else { if pending_render { pending_render = false; - os_input - .send_to_server(ServerInstruction::ToScreen(ScreenInstruction::Render)); + send_server_instructions + .send(ServerInstruction::ToScreen(ScreenInstruction::Render)) + .unwrap(); } last_byte_receive_time = None; task::sleep(::std::time::Duration::from_millis(10)).await; } } - os_input.send_to_server(ServerInstruction::ToScreen(ScreenInstruction::Render)); + send_server_instructions + .send(ServerInstruction::ToScreen(ScreenInstruction::Render)) + .unwrap(); #[cfg(not(test))] // this is a little hacky, and is because the tests end the file as soon as // we read everything, rather than hanging until there is new data // a better solution would be to fix the test fakes, but this will do for now - os_input.send_to_server(ServerInstruction::ToScreen(ScreenInstruction::ClosePane( - PaneId::Terminal(pid), - ))); + send_server_instructions + .send(ServerInstruction::ToScreen(ScreenInstruction::ClosePane( + PaneId::Terminal(pid), + ))) + .unwrap(); } }) } @@ -257,12 +275,14 @@ impl PtyBus { pub fn new( receive_pty_instructions: Receiver<(PtyInstruction, ErrorContext)>, os_input: Box, + send_server_instructions: SenderWithContext, debug_to_file: bool, ) -> Self { PtyBus { receive_pty_instructions, os_input, id_to_child_pid: HashMap::new(), + send_server_instructions, debug_to_file, task_handles: HashMap::new(), } @@ -270,8 +290,12 @@ impl PtyBus { pub fn spawn_terminal(&mut self, file_to_open: Option) -> RawFd { let (pid_primary, pid_secondary): (RawFd, RawFd) = self.os_input.spawn_terminal(file_to_open); - let task_handle = - stream_terminal_bytes(pid_primary, self.os_input.clone(), self.debug_to_file); + let task_handle = stream_terminal_bytes( + pid_primary, + self.os_input.clone(), + self.send_server_instructions.clone(), + self.debug_to_file, + ); self.task_handles.insert(pid_primary, task_handle); self.id_to_child_pid.insert(pid_primary, pid_secondary); pid_primary @@ -285,12 +309,18 @@ impl PtyBus { self.id_to_child_pid.insert(pid_primary, pid_secondary); new_pane_pids.push(pid_primary); } - self.os_input - .send_to_server(ServerInstruction::ToScreen(ScreenInstruction::ApplyLayout( + self.send_server_instructions + .send(ServerInstruction::ToScreen(ScreenInstruction::ApplyLayout( (layout_path, new_pane_pids.clone()), - ))); + ))) + .unwrap(); for id in new_pane_pids { - let task_handle = stream_terminal_bytes(id, self.os_input.clone(), self.debug_to_file); + let task_handle = stream_terminal_bytes( + id, + self.os_input.clone(), + self.send_server_instructions.clone(), + self.debug_to_file, + ); self.task_handles.insert(id, task_handle); } } @@ -305,8 +335,9 @@ impl PtyBus { }); } PaneId::Plugin(pid) => self - .os_input - .send_to_server(ServerInstruction::ClosePluginPane(pid)), + .send_server_instructions + .send(ServerInstruction::ClosePluginPane(pid)) + .unwrap(), } } pub fn close_tab(&mut self, ids: Vec) { diff --git a/src/main.rs b/src/main.rs index 4a89470c..eef590a4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,9 +6,9 @@ mod server; use client::{boundaries, layout, panes, tab}; use common::{ command_is_executing, errors, os_input_output, pty_bus, screen, start, utils, wasm_vm, - ServerInstruction, }; use directories_next::ProjectDirs; +use server::ServerInstruction; use structopt::StructOpt; diff --git a/src/server/mod.rs b/src/server/mod.rs index 45c0db39..495cceb4 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -1,16 +1,33 @@ use crate::cli::CliArgs; -use crate::common::{ - ChannelWithContext, ClientInstruction, SenderType, SenderWithContext, ServerInstruction, -}; +use crate::common::{ChannelWithContext, ClientInstruction, SenderType, SenderWithContext}; use crate::errors::{ContextType, ErrorContext, OsContext, PtyContext, ServerContext}; use crate::os_input_output::{ServerOsApi, ServerOsApiInstruction}; use crate::panes::PaneId; use crate::pty_bus::{PtyBus, PtyInstruction}; use crate::screen::ScreenInstruction; +use serde::{Deserialize, Serialize}; use std::path::PathBuf; use std::sync::mpsc::channel; use std::thread; +/// Instructions related to server-side application including the +/// ones sent by client to server +#[derive(Serialize, Deserialize, Debug, Clone)] +pub enum ServerInstruction { + OpenFile(PathBuf), + SplitHorizontally, + SplitVertically, + MoveFocus, + NewClient(String), + ToPty(PtyInstruction), + ToScreen(ScreenInstruction), + OsApi(ServerOsApiInstruction), + DoneClosingPane, + ClosePluginPane(u32), + ClientExit, + Exit, +} + pub fn start_server(mut os_input: Box, opts: CliArgs) -> thread::JoinHandle<()> { let (send_pty_instructions, receive_pty_instructions): ChannelWithContext = channel(); @@ -27,6 +44,14 @@ pub fn start_server(mut os_input: Box, opts: CliArgs) -> thread SenderType::Sender(send_os_instructions), ); + let (send_server_instructions, receive_server_instructions): ChannelWithContext< + ServerInstruction, + > = channel(); + let mut send_server_instructions = SenderWithContext::new( + ErrorContext::new(), + SenderType::Sender(send_server_instructions), + ); + // Don't use default layouts in tests, but do everywhere else #[cfg(not(test))] let default_layout = Some(PathBuf::from("default")); @@ -34,7 +59,12 @@ pub fn start_server(mut os_input: Box, opts: CliArgs) -> thread let default_layout = None; let maybe_layout = opts.layout.or(default_layout); - let mut pty_bus = PtyBus::new(receive_pty_instructions, os_input.clone(), opts.debug); + let mut pty_bus = PtyBus::new( + receive_pty_instructions, + os_input.clone(), + send_server_instructions.clone(), + opts.debug, + ); let pty_thread = thread::Builder::new() .name("pty".to_string()) @@ -47,43 +77,55 @@ pub fn start_server(mut os_input: Box, opts: CliArgs) -> thread match event { PtyInstruction::SpawnTerminal(file_to_open) => { let pid = pty_bus.spawn_terminal(file_to_open); - pty_bus.os_input.send_to_server(ServerInstruction::ToScreen( - ScreenInstruction::NewPane(PaneId::Terminal(pid)), - )); + pty_bus + .send_server_instructions + .send(ServerInstruction::ToScreen(ScreenInstruction::NewPane( + PaneId::Terminal(pid), + ))) + .unwrap(); } PtyInstruction::SpawnTerminalVertically(file_to_open) => { let pid = pty_bus.spawn_terminal(file_to_open); - pty_bus.os_input.send_to_server(ServerInstruction::ToScreen( - ScreenInstruction::VerticalSplit(PaneId::Terminal(pid)), - )); + pty_bus + .send_server_instructions + .send(ServerInstruction::ToScreen( + ScreenInstruction::VerticalSplit(PaneId::Terminal(pid)), + )) + .unwrap(); } PtyInstruction::SpawnTerminalHorizontally(file_to_open) => { let pid = pty_bus.spawn_terminal(file_to_open); - pty_bus.os_input.send_to_server(ServerInstruction::ToScreen( - ScreenInstruction::HorizontalSplit(PaneId::Terminal(pid)), - )); + pty_bus + .send_server_instructions + .send(ServerInstruction::ToScreen( + ScreenInstruction::HorizontalSplit(PaneId::Terminal(pid)), + )) + .unwrap(); } PtyInstruction::NewTab => { if let Some(layout) = maybe_layout.clone() { pty_bus.spawn_terminals_for_layout(layout); } else { let pid = pty_bus.spawn_terminal(None); - pty_bus.os_input.send_to_server(ServerInstruction::ToScreen( - ScreenInstruction::NewTab(pid), - )); + pty_bus + .send_server_instructions + .send(ServerInstruction::ToScreen(ScreenInstruction::NewTab(pid))) + .unwrap(); } } PtyInstruction::ClosePane(id) => { pty_bus.close_pane(id); pty_bus - .os_input - .send_to_server(ServerInstruction::DoneClosingPane); + .send_server_instructions + .send(ServerInstruction::DoneClosingPane) + .unwrap(); } PtyInstruction::CloseTab(ids) => { pty_bus.close_tab(ids); pty_bus - .os_input - .send_to_server(ServerInstruction::DoneClosingPane); + .send_server_instructions + .send(ServerInstruction::DoneClosingPane) + .unwrap(); } PtyInstruction::Exit => { break; @@ -118,16 +160,32 @@ pub fn start_server(mut os_input: Box, opts: CliArgs) -> thread }) .unwrap(); + let router_thread = thread::Builder::new() + .name("server_router".to_string()) + .spawn({ + let os_input = os_input.clone(); + move || loop { + let (instruction, err_ctx) = os_input.server_recv(); + send_server_instructions.update(err_ctx); + match instruction { + ServerInstruction::Exit => break, + _ => { + send_server_instructions.send(instruction).unwrap(); + } + } + } + }) + .unwrap(); + thread::Builder::new() .name("ipc_server".to_string()) .spawn({ move || loop { - let (instruction, mut err_ctx) = os_input.server_recv(); + let (instruction, mut err_ctx) = receive_server_instructions.recv().unwrap(); err_ctx.add_call(ContextType::IPCServer(ServerContext::from(&instruction))); send_pty_instructions.update(err_ctx); send_os_instructions.update(err_ctx); os_input.update_senders(err_ctx); - match instruction { ServerInstruction::OpenFile(file_name) => { let path = PathBuf::from(file_name); @@ -169,14 +227,17 @@ pub fn start_server(mut os_input: Box, opts: CliArgs) -> thread ServerInstruction::ClosePluginPane(pid) => { os_input.send_to_client(ClientInstruction::ClosePluginPane(pid)); } - ServerInstruction::Exit => { + ServerInstruction::ClientExit => { let _ = send_pty_instructions.send(PtyInstruction::Exit); let _ = send_os_instructions.send(ServerOsApiInstruction::Exit); + os_input.server_exit(); let _ = pty_thread.join(); let _ = os_thread.join(); + let _ = router_thread.join(); let _ = os_input.send_to_client(ClientInstruction::Exit); break; } + _ => {} } } }) diff --git a/src/tests/fakes.rs b/src/tests/fakes.rs index 43d100bf..32dde38f 100644 --- a/src/tests/fakes.rs +++ b/src/tests/fakes.rs @@ -7,11 +7,10 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{mpsc, Arc, Mutex}; use std::time::{Duration, Instant}; -use crate::common::{ - ChannelWithContext, ClientInstruction, SenderType, SenderWithContext, ServerInstruction, -}; +use crate::common::{ChannelWithContext, ClientInstruction, SenderType, SenderWithContext}; use crate::errors::ErrorContext; use crate::os_input_output::{ClientOsApi, ServerOsApi}; +use crate::server::ServerInstruction; use crate::tests::possible_tty_inputs::{get_possible_tty_inputs, Bytes}; use crate::utils::shared::default_palette; use zellij_tile::data::Palette; @@ -256,8 +255,8 @@ impl ServerOsApi for FakeInputOutput { self.io_events.lock().unwrap().push(IoEvent::Kill(fd)); Ok(()) } - fn send_to_server(&mut self, msg: ServerInstruction) { - self.server_sender.send(msg).unwrap(); + fn server_exit(&mut self) { + self.server_sender.send(ServerInstruction::Exit).unwrap(); } fn server_recv(&self) -> (ServerInstruction, ErrorContext) { self.server_receiver.lock().unwrap().recv().unwrap()