Add router thread on server side as well

This commit is contained in:
Kunal Mohan 2021-03-24 15:34:53 +05:30
parent 90982c3e47
commit e30ec5745e
7 changed files with 183 additions and 109 deletions

View file

@ -387,6 +387,7 @@ pub enum ServerContext {
OsApi,
DoneClosingPane,
ClosePluginPane,
ClientExit,
Exit,
}
@ -403,6 +404,7 @@ impl From<&ServerInstruction> for ServerContext {
ServerInstruction::OsApi(_) => ServerContext::OsApi,
ServerInstruction::DoneClosingPane => ServerContext::DoneClosingPane,
ServerInstruction::ClosePluginPane(_) => ServerContext::ClosePluginPane,
ServerInstruction::ClientExit => ServerContext::ClientExit,
ServerInstruction::Exit => ServerContext::Exit,
}
}

View file

@ -26,7 +26,7 @@ use wasmer_wasi::{Pipe, WasiState};
use crate::cli::CliArgs;
use crate::layout::Layout;
use crate::server::start_server;
use crate::server::{start_server, ServerInstruction};
use command_is_executing::CommandIsExecuting;
use errors::{AppContext, ContextType, ErrorContext, PluginContext, ScreenContext};
use input::handler::input_loop;
@ -38,21 +38,7 @@ use wasm_vm::{
wasi_stdout, wasi_write_string, zellij_imports, EventType, PluginInputType, PluginInstruction,
};
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum ServerInstruction {
OpenFile(PathBuf),
SplitHorizontally,
SplitVertically,
MoveFocus,
NewClient(String),
ToPty(PtyInstruction),
ToScreen(ScreenInstruction),
OsApi(ServerOsApiInstruction),
DoneClosingPane,
ClosePluginPane(u32),
Exit,
}
/// Instructions sent from server to client
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum ClientInstruction {
ToScreen(ScreenInstruction),
@ -134,13 +120,7 @@ thread_local!(
static OPENCALLS: RefCell<ErrorContext> = RefCell::default()
);
task_local! {
/// A key to some task local storage that holds a representation of the task's call
/// stack in the form of an [`ErrorContext`].
static ASYNCOPENCALLS: RefCell<ErrorContext> = RefCell::default()
}
/// Instructions related to the entire application.
/// Instructions related to the client-side application.
#[derive(Clone)]
pub enum AppInstruction {
Exit,
@ -571,7 +551,7 @@ pub fn start(
AppInstruction::SetState(state) => app_state = state,
AppInstruction::Exit => break,
AppInstruction::Error(backtrace) => {
let _ = os_input.send_to_server(ServerInstruction::Exit);
let _ = os_input.send_to_server(ServerInstruction::ClientExit);
let _ = send_screen_instructions.send(ScreenInstruction::Exit);
let _ = send_plugin_instructions.send(PluginInstruction::Exit);
let _ = screen_thread.join();
@ -603,7 +583,7 @@ pub fn start(
}
}
let _ = os_input.send_to_server(ServerInstruction::Exit);
let _ = os_input.send_to_server(ServerInstruction::ClientExit);
let _ = send_screen_instructions.send(ScreenInstruction::Exit);
let _ = send_plugin_instructions.send(PluginInstruction::Exit);
screen_thread.join().unwrap();

View file

@ -16,9 +16,10 @@ use std::path::PathBuf;
use std::process::{Child, Command};
use std::sync::{Arc, Mutex};
use crate::common::{ClientInstruction, ServerInstruction};
use crate::common::ClientInstruction;
use crate::errors::ErrorContext;
use crate::panes::PositionAndSize;
use crate::server::ServerInstruction;
use crate::utils::consts::ZELLIJ_IPC_PIPE;
const IPC_BUFFER_SIZE: u32 = 8192;
@ -221,8 +222,8 @@ pub trait ServerOsApi: Send + Sync {
fn kill(&mut self, pid: RawFd) -> Result<(), nix::Error>;
/// Returns a [`Box`] pointer to this [`ServerOsApi`] struct.
fn box_clone(&self) -> Box<dyn ServerOsApi>;
/// Sends a message to the server.
fn send_to_server(&mut self, msg: ServerInstruction);
/// Sends an `Exit` message to the server router thread.
fn server_exit(&mut self);
/// Receives a message on server-side IPC channel
fn server_recv(&self) -> (ServerInstruction, ErrorContext);
/// Sends a message to client
@ -258,8 +259,8 @@ impl ServerOsApi for ServerOsInputOutput {
waitpid(Pid::from_raw(pid), None).unwrap();
Ok(())
}
fn send_to_server(&mut self, msg: ServerInstruction) {
self.server_sender.send(msg).unwrap();
fn server_exit(&mut self) {
self.server_sender.send(ServerInstruction::Exit).unwrap();
}
fn server_recv(&self) -> (ServerInstruction, ErrorContext) {
self.server_receiver.lock().unwrap().recv().unwrap()

View file

@ -10,14 +10,14 @@ use ::vte;
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use super::{ScreenInstruction, OPENCALLS};
use super::{ScreenInstruction, SenderWithContext, OPENCALLS};
use crate::layout::Layout;
use crate::os_input_output::ServerOsApi;
use crate::utils::logging::debug_to_file;
use crate::{
common::ServerInstruction,
errors::{ContextType, ErrorContext},
panes::PaneId,
server::ServerInstruction,
};
pub struct ReadFromPid {
@ -81,83 +81,94 @@ pub enum VteEvent {
struct VteEventSender {
id: RawFd,
os_input: Box<dyn ServerOsApi>,
send_server_instructions: SenderWithContext<ServerInstruction>,
}
impl VteEventSender {
pub fn new(id: RawFd, os_input: Box<dyn ServerOsApi>) -> Self {
VteEventSender { id, os_input }
pub fn new(id: RawFd, send_server_instructions: SenderWithContext<ServerInstruction>) -> Self {
VteEventSender {
id,
send_server_instructions,
}
}
}
impl vte::Perform for VteEventSender {
fn print(&mut self, c: char) {
self.os_input
.send_to_server(ServerInstruction::ToScreen(ScreenInstruction::Pty(
self.send_server_instructions
.send(ServerInstruction::ToScreen(ScreenInstruction::Pty(
self.id,
VteEvent::Print(c),
)));
)))
.unwrap();
}
fn execute(&mut self, byte: u8) {
self.os_input
.send_to_server(ServerInstruction::ToScreen(ScreenInstruction::Pty(
self.send_server_instructions
.send(ServerInstruction::ToScreen(ScreenInstruction::Pty(
self.id,
VteEvent::Execute(byte),
)));
)))
.unwrap();
}
fn hook(&mut self, params: &[i64], intermediates: &[u8], ignore: bool, c: char) {
let params = params.iter().copied().collect();
let intermediates = intermediates.iter().copied().collect();
self.os_input
.send_to_server(ServerInstruction::ToScreen(ScreenInstruction::Pty(
self.send_server_instructions
.send(ServerInstruction::ToScreen(ScreenInstruction::Pty(
self.id,
VteEvent::Hook(params, intermediates, ignore, c),
)));
)))
.unwrap();
}
fn put(&mut self, byte: u8) {
self.os_input
.send_to_server(ServerInstruction::ToScreen(ScreenInstruction::Pty(
self.send_server_instructions
.send(ServerInstruction::ToScreen(ScreenInstruction::Pty(
self.id,
VteEvent::Put(byte),
)));
)))
.unwrap();
}
fn unhook(&mut self) {
self.os_input
.send_to_server(ServerInstruction::ToScreen(ScreenInstruction::Pty(
self.send_server_instructions
.send(ServerInstruction::ToScreen(ScreenInstruction::Pty(
self.id,
VteEvent::Unhook,
)));
)))
.unwrap();
}
fn osc_dispatch(&mut self, params: &[&[u8]], bell_terminated: bool) {
let params = params.iter().map(|p| p.to_vec()).collect();
self.os_input
.send_to_server(ServerInstruction::ToScreen(ScreenInstruction::Pty(
self.send_server_instructions
.send(ServerInstruction::ToScreen(ScreenInstruction::Pty(
self.id,
VteEvent::OscDispatch(params, bell_terminated),
)));
)))
.unwrap();
}
fn csi_dispatch(&mut self, params: &[i64], intermediates: &[u8], ignore: bool, c: char) {
let params = params.iter().copied().collect();
let intermediates = intermediates.iter().copied().collect();
self.os_input
.send_to_server(ServerInstruction::ToScreen(ScreenInstruction::Pty(
self.send_server_instructions
.send(ServerInstruction::ToScreen(ScreenInstruction::Pty(
self.id,
VteEvent::CsiDispatch(params, intermediates, ignore, c),
)));
)))
.unwrap();
}
fn esc_dispatch(&mut self, intermediates: &[u8], ignore: bool, byte: u8) {
let intermediates = intermediates.iter().copied().collect();
self.os_input
.send_to_server(ServerInstruction::ToScreen(ScreenInstruction::Pty(
self.send_server_instructions
.send(ServerInstruction::ToScreen(ScreenInstruction::Pty(
self.id,
VteEvent::EscDispatch(intermediates, ignore, byte),
)));
)))
.unwrap();
}
}
@ -176,23 +187,25 @@ pub enum PtyInstruction {
pub struct PtyBus {
pub receive_pty_instructions: Receiver<(PtyInstruction, ErrorContext)>,
pub id_to_child_pid: HashMap<RawFd, RawFd>,
pub os_input: Box<dyn ServerOsApi>,
pub send_server_instructions: SenderWithContext<ServerInstruction>,
os_input: Box<dyn ServerOsApi>,
debug_to_file: bool,
task_handles: HashMap<RawFd, JoinHandle<()>>,
}
fn stream_terminal_bytes(
pid: RawFd,
mut os_input: Box<dyn ServerOsApi>,
os_input: Box<dyn ServerOsApi>,
mut send_server_instructions: SenderWithContext<ServerInstruction>,
debug: bool,
) -> JoinHandle<()> {
let mut err_ctx = OPENCALLS.with(|ctx| *ctx.borrow());
task::spawn({
async move {
err_ctx.add_call(ContextType::AsyncTask);
os_input.update_senders(err_ctx);
send_server_instructions.update(err_ctx);
let mut vte_parser = vte::Parser::new();
let mut vte_event_sender = VteEventSender::new(pid, os_input.clone());
let mut vte_event_sender = VteEventSender::new(pid, send_server_instructions.clone());
let mut terminal_bytes = ReadFromPid::new(&pid, os_input.clone());
let mut last_byte_receive_time: Option<Instant> = None;
@ -218,9 +231,9 @@ fn stream_terminal_bytes(
Some(receive_time) => {
if receive_time.elapsed() > max_render_pause {
pending_render = false;
os_input.send_to_server(ServerInstruction::ToScreen(
ScreenInstruction::Render,
));
send_server_instructions
.send(ServerInstruction::ToScreen(ScreenInstruction::Render))
.unwrap();
last_byte_receive_time = Some(Instant::now());
} else {
pending_render = true;
@ -234,21 +247,26 @@ fn stream_terminal_bytes(
} else {
if pending_render {
pending_render = false;
os_input
.send_to_server(ServerInstruction::ToScreen(ScreenInstruction::Render));
send_server_instructions
.send(ServerInstruction::ToScreen(ScreenInstruction::Render))
.unwrap();
}
last_byte_receive_time = None;
task::sleep(::std::time::Duration::from_millis(10)).await;
}
}
os_input.send_to_server(ServerInstruction::ToScreen(ScreenInstruction::Render));
send_server_instructions
.send(ServerInstruction::ToScreen(ScreenInstruction::Render))
.unwrap();
#[cfg(not(test))]
// this is a little hacky, and is because the tests end the file as soon as
// we read everything, rather than hanging until there is new data
// a better solution would be to fix the test fakes, but this will do for now
os_input.send_to_server(ServerInstruction::ToScreen(ScreenInstruction::ClosePane(
PaneId::Terminal(pid),
)));
send_server_instructions
.send(ServerInstruction::ToScreen(ScreenInstruction::ClosePane(
PaneId::Terminal(pid),
)))
.unwrap();
}
})
}
@ -257,12 +275,14 @@ impl PtyBus {
pub fn new(
receive_pty_instructions: Receiver<(PtyInstruction, ErrorContext)>,
os_input: Box<dyn ServerOsApi>,
send_server_instructions: SenderWithContext<ServerInstruction>,
debug_to_file: bool,
) -> Self {
PtyBus {
receive_pty_instructions,
os_input,
id_to_child_pid: HashMap::new(),
send_server_instructions,
debug_to_file,
task_handles: HashMap::new(),
}
@ -270,8 +290,12 @@ impl PtyBus {
pub fn spawn_terminal(&mut self, file_to_open: Option<PathBuf>) -> RawFd {
let (pid_primary, pid_secondary): (RawFd, RawFd) =
self.os_input.spawn_terminal(file_to_open);
let task_handle =
stream_terminal_bytes(pid_primary, self.os_input.clone(), self.debug_to_file);
let task_handle = stream_terminal_bytes(
pid_primary,
self.os_input.clone(),
self.send_server_instructions.clone(),
self.debug_to_file,
);
self.task_handles.insert(pid_primary, task_handle);
self.id_to_child_pid.insert(pid_primary, pid_secondary);
pid_primary
@ -285,12 +309,18 @@ impl PtyBus {
self.id_to_child_pid.insert(pid_primary, pid_secondary);
new_pane_pids.push(pid_primary);
}
self.os_input
.send_to_server(ServerInstruction::ToScreen(ScreenInstruction::ApplyLayout(
self.send_server_instructions
.send(ServerInstruction::ToScreen(ScreenInstruction::ApplyLayout(
(layout_path, new_pane_pids.clone()),
)));
)))
.unwrap();
for id in new_pane_pids {
let task_handle = stream_terminal_bytes(id, self.os_input.clone(), self.debug_to_file);
let task_handle = stream_terminal_bytes(
id,
self.os_input.clone(),
self.send_server_instructions.clone(),
self.debug_to_file,
);
self.task_handles.insert(id, task_handle);
}
}
@ -305,8 +335,9 @@ impl PtyBus {
});
}
PaneId::Plugin(pid) => self
.os_input
.send_to_server(ServerInstruction::ClosePluginPane(pid)),
.send_server_instructions
.send(ServerInstruction::ClosePluginPane(pid))
.unwrap(),
}
}
pub fn close_tab(&mut self, ids: Vec<PaneId>) {

View file

@ -6,9 +6,9 @@ mod server;
use client::{boundaries, layout, panes, tab};
use common::{
command_is_executing, errors, os_input_output, pty_bus, screen, start, utils, wasm_vm,
ServerInstruction,
};
use directories_next::ProjectDirs;
use server::ServerInstruction;
use structopt::StructOpt;

View file

@ -1,16 +1,33 @@
use crate::cli::CliArgs;
use crate::common::{
ChannelWithContext, ClientInstruction, SenderType, SenderWithContext, ServerInstruction,
};
use crate::common::{ChannelWithContext, ClientInstruction, SenderType, SenderWithContext};
use crate::errors::{ContextType, ErrorContext, OsContext, PtyContext, ServerContext};
use crate::os_input_output::{ServerOsApi, ServerOsApiInstruction};
use crate::panes::PaneId;
use crate::pty_bus::{PtyBus, PtyInstruction};
use crate::screen::ScreenInstruction;
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use std::sync::mpsc::channel;
use std::thread;
/// Instructions related to server-side application including the
/// ones sent by client to server
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum ServerInstruction {
OpenFile(PathBuf),
SplitHorizontally,
SplitVertically,
MoveFocus,
NewClient(String),
ToPty(PtyInstruction),
ToScreen(ScreenInstruction),
OsApi(ServerOsApiInstruction),
DoneClosingPane,
ClosePluginPane(u32),
ClientExit,
Exit,
}
pub fn start_server(mut os_input: Box<dyn ServerOsApi>, opts: CliArgs) -> thread::JoinHandle<()> {
let (send_pty_instructions, receive_pty_instructions): ChannelWithContext<PtyInstruction> =
channel();
@ -27,6 +44,14 @@ pub fn start_server(mut os_input: Box<dyn ServerOsApi>, opts: CliArgs) -> thread
SenderType::Sender(send_os_instructions),
);
let (send_server_instructions, receive_server_instructions): ChannelWithContext<
ServerInstruction,
> = channel();
let mut send_server_instructions = SenderWithContext::new(
ErrorContext::new(),
SenderType::Sender(send_server_instructions),
);
// Don't use default layouts in tests, but do everywhere else
#[cfg(not(test))]
let default_layout = Some(PathBuf::from("default"));
@ -34,7 +59,12 @@ pub fn start_server(mut os_input: Box<dyn ServerOsApi>, opts: CliArgs) -> thread
let default_layout = None;
let maybe_layout = opts.layout.or(default_layout);
let mut pty_bus = PtyBus::new(receive_pty_instructions, os_input.clone(), opts.debug);
let mut pty_bus = PtyBus::new(
receive_pty_instructions,
os_input.clone(),
send_server_instructions.clone(),
opts.debug,
);
let pty_thread = thread::Builder::new()
.name("pty".to_string())
@ -47,43 +77,55 @@ pub fn start_server(mut os_input: Box<dyn ServerOsApi>, opts: CliArgs) -> thread
match event {
PtyInstruction::SpawnTerminal(file_to_open) => {
let pid = pty_bus.spawn_terminal(file_to_open);
pty_bus.os_input.send_to_server(ServerInstruction::ToScreen(
ScreenInstruction::NewPane(PaneId::Terminal(pid)),
));
pty_bus
.send_server_instructions
.send(ServerInstruction::ToScreen(ScreenInstruction::NewPane(
PaneId::Terminal(pid),
)))
.unwrap();
}
PtyInstruction::SpawnTerminalVertically(file_to_open) => {
let pid = pty_bus.spawn_terminal(file_to_open);
pty_bus.os_input.send_to_server(ServerInstruction::ToScreen(
ScreenInstruction::VerticalSplit(PaneId::Terminal(pid)),
));
pty_bus
.send_server_instructions
.send(ServerInstruction::ToScreen(
ScreenInstruction::VerticalSplit(PaneId::Terminal(pid)),
))
.unwrap();
}
PtyInstruction::SpawnTerminalHorizontally(file_to_open) => {
let pid = pty_bus.spawn_terminal(file_to_open);
pty_bus.os_input.send_to_server(ServerInstruction::ToScreen(
ScreenInstruction::HorizontalSplit(PaneId::Terminal(pid)),
));
pty_bus
.send_server_instructions
.send(ServerInstruction::ToScreen(
ScreenInstruction::HorizontalSplit(PaneId::Terminal(pid)),
))
.unwrap();
}
PtyInstruction::NewTab => {
if let Some(layout) = maybe_layout.clone() {
pty_bus.spawn_terminals_for_layout(layout);
} else {
let pid = pty_bus.spawn_terminal(None);
pty_bus.os_input.send_to_server(ServerInstruction::ToScreen(
ScreenInstruction::NewTab(pid),
));
pty_bus
.send_server_instructions
.send(ServerInstruction::ToScreen(ScreenInstruction::NewTab(pid)))
.unwrap();
}
}
PtyInstruction::ClosePane(id) => {
pty_bus.close_pane(id);
pty_bus
.os_input
.send_to_server(ServerInstruction::DoneClosingPane);
.send_server_instructions
.send(ServerInstruction::DoneClosingPane)
.unwrap();
}
PtyInstruction::CloseTab(ids) => {
pty_bus.close_tab(ids);
pty_bus
.os_input
.send_to_server(ServerInstruction::DoneClosingPane);
.send_server_instructions
.send(ServerInstruction::DoneClosingPane)
.unwrap();
}
PtyInstruction::Exit => {
break;
@ -118,16 +160,32 @@ pub fn start_server(mut os_input: Box<dyn ServerOsApi>, opts: CliArgs) -> thread
})
.unwrap();
let router_thread = thread::Builder::new()
.name("server_router".to_string())
.spawn({
let os_input = os_input.clone();
move || loop {
let (instruction, err_ctx) = os_input.server_recv();
send_server_instructions.update(err_ctx);
match instruction {
ServerInstruction::Exit => break,
_ => {
send_server_instructions.send(instruction).unwrap();
}
}
}
})
.unwrap();
thread::Builder::new()
.name("ipc_server".to_string())
.spawn({
move || loop {
let (instruction, mut err_ctx) = os_input.server_recv();
let (instruction, mut err_ctx) = receive_server_instructions.recv().unwrap();
err_ctx.add_call(ContextType::IPCServer(ServerContext::from(&instruction)));
send_pty_instructions.update(err_ctx);
send_os_instructions.update(err_ctx);
os_input.update_senders(err_ctx);
match instruction {
ServerInstruction::OpenFile(file_name) => {
let path = PathBuf::from(file_name);
@ -169,14 +227,17 @@ pub fn start_server(mut os_input: Box<dyn ServerOsApi>, opts: CliArgs) -> thread
ServerInstruction::ClosePluginPane(pid) => {
os_input.send_to_client(ClientInstruction::ClosePluginPane(pid));
}
ServerInstruction::Exit => {
ServerInstruction::ClientExit => {
let _ = send_pty_instructions.send(PtyInstruction::Exit);
let _ = send_os_instructions.send(ServerOsApiInstruction::Exit);
os_input.server_exit();
let _ = pty_thread.join();
let _ = os_thread.join();
let _ = router_thread.join();
let _ = os_input.send_to_client(ClientInstruction::Exit);
break;
}
_ => {}
}
}
})

View file

@ -7,11 +7,10 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{mpsc, Arc, Mutex};
use std::time::{Duration, Instant};
use crate::common::{
ChannelWithContext, ClientInstruction, SenderType, SenderWithContext, ServerInstruction,
};
use crate::common::{ChannelWithContext, ClientInstruction, SenderType, SenderWithContext};
use crate::errors::ErrorContext;
use crate::os_input_output::{ClientOsApi, ServerOsApi};
use crate::server::ServerInstruction;
use crate::tests::possible_tty_inputs::{get_possible_tty_inputs, Bytes};
use crate::utils::shared::default_palette;
use zellij_tile::data::Palette;
@ -256,8 +255,8 @@ impl ServerOsApi for FakeInputOutput {
self.io_events.lock().unwrap().push(IoEvent::Kill(fd));
Ok(())
}
fn send_to_server(&mut self, msg: ServerInstruction) {
self.server_sender.send(msg).unwrap();
fn server_exit(&mut self) {
self.server_sender.send(ServerInstruction::Exit).unwrap();
}
fn server_recv(&self) -> (ServerInstruction, ErrorContext) {
self.server_receiver.lock().unwrap().recv().unwrap()