Use IPC for Server to client as well

Add router thread
This commit is contained in:
Kunal Mohan 2021-02-18 13:29:11 +05:30
parent 831a02b9c0
commit 5ece7f44cc
6 changed files with 98 additions and 74 deletions

View file

@ -194,7 +194,7 @@ pub enum ScreenContext {
MoveFocusDown, MoveFocusDown,
MoveFocusUp, MoveFocusUp,
MoveFocusRight, MoveFocusRight,
Quit, Exit,
ScrollUp, ScrollUp,
ScrollDown, ScrollDown,
PageScrollUp, PageScrollUp,
@ -239,7 +239,7 @@ impl From<&ScreenInstruction> for ScreenContext {
ScreenInstruction::MoveFocusDown => ScreenContext::MoveFocusDown, ScreenInstruction::MoveFocusDown => ScreenContext::MoveFocusDown,
ScreenInstruction::MoveFocusUp => ScreenContext::MoveFocusUp, ScreenInstruction::MoveFocusUp => ScreenContext::MoveFocusUp,
ScreenInstruction::MoveFocusRight => ScreenContext::MoveFocusRight, ScreenInstruction::MoveFocusRight => ScreenContext::MoveFocusRight,
ScreenInstruction::Quit => ScreenContext::Quit, ScreenInstruction::Exit => ScreenContext::Exit,
ScreenInstruction::ScrollUp => ScreenContext::ScrollUp, ScreenInstruction::ScrollUp => ScreenContext::ScrollUp,
ScreenInstruction::ScrollDown => ScreenContext::ScrollDown, ScreenInstruction::ScrollDown => ScreenContext::ScrollDown,
ScreenInstruction::PageScrollUp => ScreenContext::PageScrollUp, ScreenInstruction::PageScrollUp => ScreenContext::PageScrollUp,
@ -276,7 +276,7 @@ pub enum PtyContext {
NewTab, NewTab,
ClosePane, ClosePane,
CloseTab, CloseTab,
Quit, Exit,
} }
impl From<&PtyInstruction> for PtyContext { impl From<&PtyInstruction> for PtyContext {
@ -288,7 +288,7 @@ impl From<&PtyInstruction> for PtyContext {
PtyInstruction::ClosePane(_) => PtyContext::ClosePane, PtyInstruction::ClosePane(_) => PtyContext::ClosePane,
PtyInstruction::CloseTab(_) => PtyContext::CloseTab, PtyInstruction::CloseTab(_) => PtyContext::CloseTab,
PtyInstruction::NewTab => PtyContext::NewTab, PtyInstruction::NewTab => PtyContext::NewTab,
PtyInstruction::Quit => PtyContext::Quit, PtyInstruction::Exit => PtyContext::Exit,
} }
} }
} }
@ -304,7 +304,7 @@ pub enum PluginContext {
Update, Update,
Render, Render,
Unload, Unload,
Quit, Exit,
} }
impl From<&PluginInstruction> for PluginContext { impl From<&PluginInstruction> for PluginContext {
@ -314,7 +314,7 @@ impl From<&PluginInstruction> for PluginContext {
PluginInstruction::Update(..) => PluginContext::Update, PluginInstruction::Update(..) => PluginContext::Update,
PluginInstruction::Render(..) => PluginContext::Render, PluginInstruction::Render(..) => PluginContext::Render,
PluginInstruction::Unload(_) => PluginContext::Unload, PluginInstruction::Unload(_) => PluginContext::Unload,
PluginInstruction::Quit => PluginContext::Quit, PluginInstruction::Exit => PluginContext::Exit,
} }
} }
} }

View file

@ -18,7 +18,7 @@ use std::{collections::HashMap, fs};
use crate::panes::PaneId; use crate::panes::PaneId;
use directories_next::ProjectDirs; use directories_next::ProjectDirs;
use input::handler::InputMode; use input::handler::InputMode;
use ipmpsc::{Sender as IpcSender, SharedRingBuffer}; use ipmpsc::{Receiver as IpcReceiver, Sender as IpcSender, SharedRingBuffer};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use termion::input::TermRead; use termion::input::TermRead;
use wasm_vm::PluginEnv; use wasm_vm::PluginEnv;
@ -48,10 +48,11 @@ pub enum ServerInstruction {
SplitHorizontally, SplitHorizontally,
SplitVertically, SplitVertically,
MoveFocus, MoveFocus,
NewClient(String),
ToPty(PtyInstruction), ToPty(PtyInstruction),
ToScreen(ScreenInstruction), ToScreen(ScreenInstruction),
ClosePluginPane(u32), ClosePluginPane(u32),
Quit, Exit,
} }
#[derive(Serialize, Deserialize, Debug, Clone)] #[derive(Serialize, Deserialize, Debug, Clone)]
@ -59,6 +60,7 @@ pub enum ClientInstruction {
ToScreen(ScreenInstruction), ToScreen(ScreenInstruction),
ClosePluginPane(u32), ClosePluginPane(u32),
Error(String), Error(String),
Exit,
} }
// FIXME: It would be good to add some more things to this over time // FIXME: It would be good to add some more things to this over time
@ -177,6 +179,19 @@ pub enum AppInstruction {
ToPlugin(PluginInstruction), ToPlugin(PluginInstruction),
} }
impl From<ClientInstruction> for AppInstruction {
fn from(item: ClientInstruction) -> Self {
match item {
ClientInstruction::ToScreen(s) => AppInstruction::ToScreen(s),
ClientInstruction::Error(e) => AppInstruction::Error(e),
ClientInstruction::ClosePluginPane(p) => {
AppInstruction::ToPlugin(PluginInstruction::Unload(p))
}
ClientInstruction::Exit => AppInstruction::Exit,
}
}
}
/// Start Zellij with the specified [`OsApi`] and command-line arguments. /// Start Zellij with the specified [`OsApi`] and command-line arguments.
// FIXME this should definitely be modularized and split into different functions. // FIXME this should definitely be modularized and split into different functions.
pub fn start(mut os_input: Box<dyn OsApi>, opts: CliArgs, config: Config) { pub fn start(mut os_input: Box<dyn OsApi>, opts: CliArgs, config: Config) {
@ -207,15 +222,16 @@ pub fn start(mut os_input: Box<dyn OsApi>, opts: CliArgs, config: Config) {
let (send_app_instructions, receive_app_instructions): SyncChannelWithContext<AppInstruction> = let (send_app_instructions, receive_app_instructions): SyncChannelWithContext<AppInstruction> =
mpsc::sync_channel(500); mpsc::sync_channel(500);
let send_app_instructions = let mut send_app_instructions =
SenderWithContext::new(SenderType::SyncSender(send_app_instructions)); SenderWithContext::new(err_ctx, SenderType::SyncSender(send_app_instructions));
let ipc_thread = start_server( let ipc_thread = start_server(os_input.clone(), opts.clone(), command_is_executing.clone());
os_input.clone(),
opts.clone(), let (client_buffer_path, client_buffer) = SharedRingBuffer::create_temp(8192).unwrap();
command_is_executing.clone(), let mut send_server_instructions = IpcSenderWithContext::to_server();
send_app_instructions.clone(), send_server_instructions
); .send(ServerInstruction::NewClient(client_buffer_path))
.unwrap();
#[cfg(not(test))] #[cfg(not(test))]
std::panic::set_hook({ std::panic::set_hook({
@ -405,26 +421,7 @@ pub fn start(mut os_input: Box<dyn OsApi>, opts: CliArgs, config: Config) {
screen.apply_layout(Layout::new(layout), new_pane_pids); screen.apply_layout(Layout::new(layout), new_pane_pids);
command_is_executing.done_opening_new_pane(); command_is_executing.done_opening_new_pane();
} }
ScreenInstruction::GoToTab(tab_index) => { ScreenInstruction::Exit => {
screen.go_to_tab(tab_index as usize)
}
ScreenInstruction::UpdateTabName(c) => {
screen.update_active_tab_name(c);
}
ScreenInstruction::TerminalResize => {
screen.resize_to_screen();
}
ScreenInstruction::ChangeMode(mode_info) => {
screen.change_mode(mode_info);
}
ScreenInstruction::ToggleActiveSyncPanes => {
screen
.get_active_tab_mut()
.unwrap()
.toggle_sync_panes_is_active();
screen.update_tabs();
}
ScreenInstruction::Quit => {
break; break;
} }
} }
@ -525,7 +522,7 @@ pub fn start(mut os_input: Box<dyn OsApi>, opts: CliArgs, config: Config) {
buf_tx.send(wasi_read_string(&plugin_env.wasi_env)).unwrap(); buf_tx.send(wasi_read_string(&plugin_env.wasi_env)).unwrap();
} }
PluginInstruction::Unload(pid) => drop(plugin_map.remove(&pid)), PluginInstruction::Unload(pid) => drop(plugin_map.remove(&pid)),
PluginInstruction::Quit => break, PluginInstruction::Exit => break,
} }
} }
}) })
@ -536,6 +533,7 @@ pub fn start(mut os_input: Box<dyn OsApi>, opts: CliArgs, config: Config) {
.spawn({ .spawn({
let send_screen_instructions = send_screen_instructions.clone(); let send_screen_instructions = send_screen_instructions.clone();
let send_plugin_instructions = send_plugin_instructions.clone(); let send_plugin_instructions = send_plugin_instructions.clone();
let send_app_instructions = send_app_instructions.clone();
let os_input = os_input.clone(); let os_input = os_input.clone();
let config = config; let config = config;
move || { move || {
@ -550,7 +548,25 @@ pub fn start(mut os_input: Box<dyn OsApi>, opts: CliArgs, config: Config) {
} }
}); });
let mut send_server_instructions = IpcSenderWithContext::to_server(); let router_thread = thread::Builder::new()
.name("router".to_string())
.spawn({
let recv_client_instructions = IpcReceiver::new(client_buffer);
move || loop {
let (err_ctx, instruction): (ErrorContext, ClientInstruction) =
recv_client_instructions.recv().unwrap();
send_app_instructions.update(err_ctx);
match instruction {
ClientInstruction::Exit => break,
_ => {
send_app_instructions
.send(AppInstruction::from(instruction))
.unwrap();
}
}
}
})
.unwrap();
#[warn(clippy::never_loop)] #[warn(clippy::never_loop)]
loop { loop {
@ -562,16 +578,17 @@ pub fn start(mut os_input: Box<dyn OsApi>, opts: CliArgs, config: Config) {
send_screen_instructions.update(err_ctx); send_screen_instructions.update(err_ctx);
send_server_instructions.update(err_ctx); send_server_instructions.update(err_ctx);
match app_instruction { match app_instruction {
AppInstruction::Exit => { AppInstruction::GetState(state_tx) => drop(state_tx.send(app_state.clone())),
break; AppInstruction::SetState(state) => app_state = state,
} AppInstruction::Exit => break,
AppInstruction::Error(backtrace) => { AppInstruction::Error(backtrace) => {
let _ = send_server_instructions.send(ServerInstruction::Quit); let _ = send_server_instructions.send(ServerInstruction::Exit);
let _ = send_screen_instructions.send(ScreenInstruction::Quit); let _ = send_screen_instructions.send(ScreenInstruction::Exit);
let _ = send_plugin_instructions.send(PluginInstruction::Quit); let _ = send_plugin_instructions.send(PluginInstruction::Exit);
let _ = screen_thread.join(); let _ = screen_thread.join();
let _ = wasm_thread.join(); let _ = wasm_thread.join();
let _ = ipc_thread.join(); let _ = ipc_thread.join();
//let _ = router_thread.join();
os_input.unset_raw_mode(0); os_input.unset_raw_mode(0);
let goto_start_of_last_line = format!("\u{1b}[{};{}H", full_screen_ws.rows, 1); let goto_start_of_last_line = format!("\u{1b}[{};{}H", full_screen_ws.rows, 1);
let error = format!("{}\n{}", goto_start_of_last_line, backtrace); let error = format!("{}\n{}", goto_start_of_last_line, backtrace);
@ -593,12 +610,13 @@ pub fn start(mut os_input: Box<dyn OsApi>, opts: CliArgs, config: Config) {
} }
} }
let _ = send_server_instructions.send(ServerInstruction::Quit); let _ = send_server_instructions.send(ServerInstruction::Exit);
let _ = send_screen_instructions.send(ScreenInstruction::Quit); let _ = send_screen_instructions.send(ScreenInstruction::Exit);
let _ = send_plugin_instructions.send(PluginInstruction::Quit); let _ = send_plugin_instructions.send(PluginInstruction::Exit);
screen_thread.join().unwrap(); screen_thread.join().unwrap();
wasm_thread.join().unwrap(); wasm_thread.join().unwrap();
ipc_thread.join().unwrap(); ipc_thread.join().unwrap();
router_thread.join().unwrap();
// cleanup(); // cleanup();
let reset_style = "\u{1b}[m"; let reset_style = "\u{1b}[m";

View file

@ -181,7 +181,7 @@ pub enum PtyInstruction {
NewTab, NewTab,
ClosePane(PaneId), ClosePane(PaneId),
CloseTab(Vec<PaneId>), CloseTab(Vec<PaneId>),
Quit, Exit,
} }
pub struct PtyBus { pub struct PtyBus {

View file

@ -36,7 +36,7 @@ pub enum ScreenInstruction {
MoveFocusDown, MoveFocusDown,
MoveFocusUp, MoveFocusUp,
MoveFocusRight, MoveFocusRight,
Quit, Exit,
ScrollUp, ScrollUp,
ScrollDown, ScrollDown,
PageScrollUp, PageScrollUp,

View file

@ -21,7 +21,7 @@ pub enum PluginInstruction {
Update(Option<u32>, Event), // Focused plugin / broadcast, event data Update(Option<u32>, Event), // Focused plugin / broadcast, event data
Render(Sender<String>, u32, usize, usize), // String buffer, plugin id, rows, cols Render(Sender<String>, u32, usize, usize), // String buffer, plugin id, rows, cols
Unload(u32), Unload(u32),
Quit, Exit,
} }
#[derive(WasmerEnv, Clone)] #[derive(WasmerEnv, Clone)]

View file

@ -1,19 +1,16 @@
use crate::cli::CliArgs; use crate::cli::CliArgs;
use crate::command_is_executing::CommandIsExecuting; use crate::command_is_executing::CommandIsExecuting;
use crate::common::{ use crate::common::{
AppInstruction, ChannelWithContext, IpcSenderWithContext, SenderType, SenderWithContext, ChannelWithContext, ClientInstruction, IpcSenderWithContext, SenderType, SenderWithContext,
ServerInstruction, ServerInstruction,
}; };
use crate::errors::{ContextType, ErrorContext, PtyContext}; use crate::errors::{ContextType, ErrorContext, PtyContext};
use crate::layout::Layout;
use crate::os_input_output::OsApi; use crate::os_input_output::OsApi;
use crate::panes::PaneId; use crate::panes::PaneId;
use crate::pty_bus::{PtyBus, PtyInstruction}; use crate::pty_bus::{PtyBus, PtyInstruction};
use crate::screen::ScreenInstruction; use crate::screen::ScreenInstruction;
use crate::utils::consts::ZELLIJ_IPC_PIPE; use crate::utils::consts::ZELLIJ_IPC_PIPE;
use crate::wasm_vm::PluginInstruction; use ipmpsc::{Receiver as IpcReceiver, SharedRingBuffer};
use ipmpsc::{Receiver, SharedRingBuffer};
use std::io::{BufReader, Read};
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::mpsc::channel; use std::sync::mpsc::channel;
use std::thread; use std::thread;
@ -22,7 +19,6 @@ pub fn start_server(
os_input: Box<dyn OsApi>, os_input: Box<dyn OsApi>,
opts: CliArgs, opts: CliArgs,
command_is_executing: CommandIsExecuting, command_is_executing: CommandIsExecuting,
mut send_app_instructions: SenderWithContext<AppInstruction>,
) -> thread::JoinHandle<()> { ) -> thread::JoinHandle<()> {
let (send_pty_instructions, receive_pty_instructions): ChannelWithContext<PtyInstruction> = let (send_pty_instructions, receive_pty_instructions): ChannelWithContext<PtyInstruction> =
channel(); channel();
@ -31,7 +27,6 @@ pub fn start_server(
SenderType::Sender(send_pty_instructions), SenderType::Sender(send_pty_instructions),
); );
std::fs::remove_file(ZELLIJ_IPC_PIPE).ok();
let server_buffer = SharedRingBuffer::create(ZELLIJ_IPC_PIPE, 8192).unwrap(); let server_buffer = SharedRingBuffer::create(ZELLIJ_IPC_PIPE, 8192).unwrap();
// Don't use default layouts in tests, but do everywhere else // Don't use default layouts in tests, but do everywhere else
@ -54,7 +49,6 @@ pub fn start_server(
.name("pty".to_string()) .name("pty".to_string())
.spawn({ .spawn({
let mut command_is_executing = command_is_executing.clone(); let mut command_is_executing = command_is_executing.clone();
send_pty_instructions.send(PtyInstruction::NewTab).unwrap();
move || loop { move || loop {
let (event, mut err_ctx) = pty_bus let (event, mut err_ctx) = pty_bus
.receive_pty_instructions .receive_pty_instructions
@ -108,7 +102,7 @@ pub fn start_server(
pty_bus.close_tab(ids); pty_bus.close_tab(ids);
command_is_executing.done_closing_pane(); command_is_executing.done_closing_pane();
} }
PtyInstruction::Quit => { PtyInstruction::Exit => {
break; break;
} }
} }
@ -119,15 +113,20 @@ pub fn start_server(
thread::Builder::new() thread::Builder::new()
.name("ipc_server".to_string()) .name("ipc_server".to_string())
.spawn({ .spawn({
let recv_server_instructions = Receiver::new(server_buffer); let recv_server_instructions = IpcReceiver::new(server_buffer);
// Fixme: We cannot use uninitialised sender, therefore this Vec.
// We make sure that the first message is `NewClient` so there are no out of bouns panics.
let mut send_client_instructions: Vec<IpcSenderWithContext> = Vec::with_capacity(1);
move || loop { move || loop {
let (mut err_ctx, decoded): (ErrorContext, ServerInstruction) = let (mut err_ctx, instruction): (ErrorContext, ServerInstruction) =
recv_server_instructions.recv().unwrap(); recv_server_instructions.recv().unwrap();
err_ctx.add_call(ContextType::IPCServer); err_ctx.add_call(ContextType::IPCServer);
send_pty_instructions.update(err_ctx); send_pty_instructions.update(err_ctx);
send_app_instructions.update(err_ctx); if send_client_instructions.len() == 1 {
send_client_instructions[0].update(err_ctx);
}
match decoded { match instruction {
ServerInstruction::OpenFile(file_name) => { ServerInstruction::OpenFile(file_name) => {
let path = PathBuf::from(file_name); let path = PathBuf::from(file_name);
send_pty_instructions send_pty_instructions
@ -145,26 +144,33 @@ pub fn start_server(
.unwrap(); .unwrap();
} }
ServerInstruction::MoveFocus => { ServerInstruction::MoveFocus => {
send_app_instructions send_client_instructions[0]
.send(AppInstruction::ToScreen(ScreenInstruction::MoveFocus)) .send(ClientInstruction::ToScreen(ScreenInstruction::MoveFocus))
.unwrap(); .unwrap();
} }
ServerInstruction::ToPty(instruction) => { ServerInstruction::NewClient(buffer_path) => {
send_pty_instructions.send(instruction).unwrap(); send_pty_instructions.send(PtyInstruction::NewTab).unwrap();
send_client_instructions.push(IpcSenderWithContext::new(
SharedRingBuffer::open(&buffer_path).unwrap(),
));
} }
ServerInstruction::ToScreen(instruction) => { ServerInstruction::ToPty(instr) => {
send_app_instructions send_pty_instructions.send(instr).unwrap();
.send(AppInstruction::ToScreen(instruction)) }
ServerInstruction::ToScreen(instr) => {
send_client_instructions[0]
.send(ClientInstruction::ToScreen(instr))
.unwrap(); .unwrap();
} }
ServerInstruction::ClosePluginPane(pid) => { ServerInstruction::ClosePluginPane(pid) => {
send_app_instructions send_client_instructions[0]
.send(AppInstruction::ToPlugin(PluginInstruction::Unload(pid))) .send(ClientInstruction::ClosePluginPane(pid))
.unwrap(); .unwrap();
} }
ServerInstruction::Quit => { ServerInstruction::Exit => {
let _ = send_pty_instructions.send(PtyInstruction::Quit); let _ = send_pty_instructions.send(PtyInstruction::Exit);
let _ = pty_thread.join(); let _ = pty_thread.join();
let _ = send_client_instructions[0].send(ClientInstruction::Exit);
break; break;
} }
} }