diff --git a/src/common/mod.rs b/src/common/mod.rs index 9a7e40e4..9cafb5d6 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -18,7 +18,6 @@ use std::{collections::HashMap, fs}; use crate::panes::PaneId; use directories_next::ProjectDirs; use input::handler::InputMode; -use ipmpsc::{Receiver as IpcReceiver, Sender as IpcSender, SharedRingBuffer}; use serde::{Deserialize, Serialize}; use termion::input::TermRead; use wasm_vm::PluginEnv; @@ -34,13 +33,10 @@ use input::handler::input_loop; use os_input_output::{ClientOsApi, ServerOsApi, ServerOsApiInstruction}; use pty_bus::PtyInstruction; use screen::{Screen, ScreenInstruction}; -use serde::{Deserialize, Serialize}; -use setup::install; -use utils::consts::ZELLIJ_IPC_PIPE; -use wasm_vm::{wasi_read_string, wasi_write_object, zellij_exports, PluginEnv, PluginInstruction}; -use wasmer::{ChainableNamedResolver, Instance, Module, Store, Value}; -use wasmer_wasi::{Pipe, WasiState}; -use zellij_tile::data::{EventType, InputMode, ModeInfo}; +use utils::consts::ZELLIJ_ROOT_PLUGIN_DIR; +use wasm_vm::{ + wasi_stdout, wasi_write_string, zellij_imports, EventType, PluginInputType, PluginInstruction, +}; pub const IPC_BUFFER_SIZE: u32 = 8192; @@ -139,34 +135,6 @@ thread_local!( /// stack in the form of an [`ErrorContext`]. static OPENCALLS: RefCell = RefCell::default() ); -#[derive(Clone)] -pub struct IpcSenderWithContext { - err_ctx: ErrorContext, - sender: IpcSender, -} - -impl IpcSenderWithContext { - pub fn new(buffer: SharedRingBuffer) -> Self { - Self { - err_ctx: ErrorContext::new(), - sender: IpcSender::new(buffer), - } - } - - // This is expensive. Use this only if a buffer is not available. - // Otherwise clone the buffer and use `new()` - pub fn to_server() -> Self { - Self::new(SharedRingBuffer::open(ZELLIJ_IPC_PIPE).unwrap()) - } - - pub fn update(&mut self, ctx: ErrorContext) { - self.err_ctx = ctx; - } - - pub fn send(&mut self, msg: T) -> ipmpsc::Result<()> { - self.sender.send(&(msg, self.err_ctx)) - } -} task_local! { /// A key to some task local storage that holds a representation of the task's call @@ -207,7 +175,7 @@ pub fn start( opts: CliArgs, server_os_input: Box, ) { - let ipc_thread = start_server(server_os_input.clone(), opts.clone()); + let ipc_thread = start_server(server_os_input, opts.clone()); let take_snapshot = "\u{1b}[?1049h"; os_input.unset_raw_mode(0); @@ -237,12 +205,7 @@ pub fn start( let mut send_app_instructions = SenderWithContext::new(err_ctx, SenderType::SyncSender(send_app_instructions)); - let (client_buffer_path, client_buffer) = - SharedRingBuffer::create_temp(IPC_BUFFER_SIZE).unwrap(); - let mut send_server_instructions = os_input.get_server_sender().unwrap(); - send_server_instructions - .send(ServerInstruction::NewClient(client_buffer_path)) - .unwrap(); + os_input.notify_server(); #[cfg(not(test))] std::panic::set_hook({ @@ -571,10 +534,9 @@ pub fn start( let router_thread = thread::Builder::new() .name("router".to_string()) .spawn({ - let recv_client_instructions = IpcReceiver::new(client_buffer); + let os_input = os_input.clone(); move || loop { - let (instruction, err_ctx): (ClientInstruction, ErrorContext) = - recv_client_instructions.recv().unwrap(); + let (instruction, err_ctx) = os_input.client_recv(); send_app_instructions.update(err_ctx); match instruction { ClientInstruction::Exit => break, @@ -596,13 +558,13 @@ pub fn start( err_ctx.add_call(ContextType::App(AppContext::from(&app_instruction))); send_screen_instructions.update(err_ctx); - send_server_instructions.update(err_ctx); + os_input.update_senders(err_ctx); match app_instruction { AppInstruction::GetState(state_tx) => drop(state_tx.send(app_state.clone())), AppInstruction::SetState(state) => app_state = state, AppInstruction::Exit => break, AppInstruction::Error(backtrace) => { - let _ = send_server_instructions.send(ServerInstruction::Exit); + let _ = os_input.send_to_server(ServerInstruction::Exit); let _ = send_screen_instructions.send(ScreenInstruction::Exit); let _ = send_plugin_instructions.send(PluginInstruction::Exit); let _ = screen_thread.join(); @@ -625,20 +587,16 @@ pub fn start( send_plugin_instructions.send(instruction).unwrap(); } AppInstruction::ToPty(instruction) => { - let _ = send_server_instructions - .send(ServerInstruction::ToPty(instruction)) - .unwrap(); + let _ = os_input.send_to_server(ServerInstruction::ToPty(instruction)); } AppInstruction::OsApi(instruction) => { - let _ = send_server_instructions - .send(ServerInstruction::OsApi(instruction)) - .unwrap(); + let _ = os_input.send_to_server(ServerInstruction::OsApi(instruction)); } AppInstruction::DoneClosingPane => command_is_executing.done_closing_pane(), } } - let _ = send_server_instructions.send(ServerInstruction::Exit); + let _ = os_input.send_to_server(ServerInstruction::Exit); let _ = send_screen_instructions.send(ScreenInstruction::Exit); let _ = send_plugin_instructions.send(PluginInstruction::Exit); screen_thread.join().unwrap(); diff --git a/src/common/os_input_output.rs b/src/common/os_input_output.rs index b97834b5..6585251c 100644 --- a/src/common/os_input_output.rs +++ b/src/common/os_input_output.rs @@ -1,7 +1,4 @@ -use crate::common::{IpcSenderWithContext, IPC_BUFFER_SIZE}; -use crate::panes::PositionAndSize; -use crate::utils::consts::ZELLIJ_IPC_PIPE; -use ipmpsc::{Receiver as IpcReceiver, Result as IpcResult, SharedRingBuffer}; +use ipmpsc::{Receiver as IpcReceiver, Sender as IpcSender, SharedRingBuffer}; use nix::fcntl::{fcntl, FcntlArg, OFlag}; use nix::pty::{forkpty, Winsize}; use nix::sys::signal::{kill, Signal}; @@ -13,11 +10,17 @@ use serde::{Deserialize, Serialize}; use std::env; use std::io; use std::io::prelude::*; +use std::marker::PhantomData; use std::os::unix::io::RawFd; use std::path::PathBuf; use std::process::{Child, Command}; use std::sync::{Arc, Mutex}; +use crate::common::{ClientInstruction, ServerInstruction, IPC_BUFFER_SIZE}; +use crate::errors::ErrorContext; +use crate::panes::PositionAndSize; +use crate::utils::consts::ZELLIJ_IPC_PIPE; + fn into_raw_mode(pid: RawFd) { let mut tio = termios::tcgetattr(pid).expect("could not get terminal attribute"); termios::cfmakeraw(&mut tio); @@ -154,10 +157,37 @@ fn spawn_terminal(file_to_open: Option, orig_termios: termios::Termios) (pid_primary, pid_secondary) } +#[derive(Clone)] +struct IpcSenderWithContext { + err_ctx: ErrorContext, + sender: IpcSender, + _phantom: PhantomData, +} + +impl IpcSenderWithContext { + fn new(buffer: SharedRingBuffer) -> Self { + Self { + err_ctx: ErrorContext::new(), + sender: IpcSender::new(buffer), + _phantom: PhantomData, + } + } + + fn update(&mut self, ctx: ErrorContext) { + self.err_ctx = ctx; + } + + fn send(&mut self, msg: T) -> ipmpsc::Result<()> { + self.sender.send(&(msg, self.err_ctx)) + } +} + #[derive(Clone)] pub struct ServerOsInputOutput { orig_termios: Arc>, - server_buffer: SharedRingBuffer, + server_sender: IpcSenderWithContext, + server_receiver: Arc>, + client_sender: Option>, } /// The `ServerOsApi` trait represents an abstract interface to the features of an operating system that @@ -180,11 +210,16 @@ pub trait ServerOsApi: Send + Sync { fn kill(&mut self, pid: RawFd) -> Result<(), nix::Error>; /// Returns a [`Box`] pointer to this [`OsApi`] struct. fn box_clone(&self) -> Box; - /// Returns the receiver of ServerInstructions. - // Should be called by server once only. - fn get_server_receiver(&self) -> IpcReceiver; - /// Returns a sender to the Server. - fn get_server_sender(&self) -> IpcSenderWithContext; + /// Sends a message to the server. + fn send_to_server(&mut self, msg: ServerInstruction); + /// Receives a message on server-side IPC channel + fn server_recv(&self) -> (ServerInstruction, ErrorContext); + /// Sends a message to client + fn send_to_client(&mut self, msg: ClientInstruction); + /// Adds a sender to client + fn add_client_sender(&mut self, buffer_path: String); + /// Update ErrorContext of senders + fn update_senders(&mut self, new_ctx: ErrorContext); } impl ServerOsApi for ServerOsInputOutput { @@ -212,11 +247,24 @@ impl ServerOsApi for ServerOsInputOutput { waitpid(Pid::from_raw(pid), None).unwrap(); Ok(()) } - fn get_server_receiver(&self) -> IpcReceiver { - IpcReceiver::new(self.server_buffer.clone()) + fn send_to_server(&mut self, msg: ServerInstruction) { + self.server_sender.send(msg).unwrap(); } - fn get_server_sender(&self) -> IpcSenderWithContext { - IpcSenderWithContext::new(self.server_buffer.clone()) + fn server_recv(&self) -> (ServerInstruction, ErrorContext) { + self.server_receiver.lock().unwrap().recv().unwrap() + } + fn send_to_client(&mut self, msg: ClientInstruction) { + self.client_sender.as_mut().unwrap().send(msg).unwrap(); + } + fn add_client_sender(&mut self, buffer_path: String) { + let buffer = SharedRingBuffer::open(buffer_path.as_str()).unwrap(); + self.client_sender = Some(IpcSenderWithContext::new(buffer)); + } + fn update_senders(&mut self, new_ctx: ErrorContext) { + self.server_sender.update(new_ctx); + if let Some(ref mut s) = self.client_sender { + s.update(new_ctx); + } } } @@ -230,9 +278,13 @@ pub fn get_server_os_input() -> ServerOsInputOutput { let current_termios = termios::tcgetattr(0).unwrap(); let orig_termios = Arc::new(Mutex::new(current_termios)); let server_buffer = SharedRingBuffer::create(ZELLIJ_IPC_PIPE, IPC_BUFFER_SIZE).unwrap(); + let server_sender = IpcSenderWithContext::new(server_buffer.clone()); + let server_receiver = Arc::new(Mutex::new(IpcReceiver::new(server_buffer.clone()))); ServerOsInputOutput { orig_termios, - server_buffer, + server_sender, + server_receiver, + client_sender: None, } } @@ -248,6 +300,9 @@ pub enum ServerOsApiInstruction { #[derive(Clone)] pub struct ClientOsInputOutput { orig_termios: Arc>, + server_sender: IpcSenderWithContext, + client_buffer_path: String, + client_receiver: Arc>, } /// The `ClientOsApi` trait represents an abstract interface to the features of an operating system that @@ -267,8 +322,14 @@ pub trait ClientOsApi: Send + Sync { fn read_from_stdin(&self) -> Vec; /// Returns a [`Box`] pointer to this [`OsApi`] struct. fn box_clone(&self) -> Box; - /// Returns a sender to the Server. - fn get_server_sender(&self) -> IpcResult; + /// Sends a message to the server. + fn send_to_server(&mut self, msg: ServerInstruction); + /// Update ErrorContext of senders + fn update_senders(&mut self, new_ctx: ErrorContext); + /// Receives a message on client-side IPC channel + fn client_recv(&self) -> (ClientInstruction, ErrorContext); + /// Notify server of new client + fn notify_server(&mut self); } impl ClientOsApi for ClientOsInputOutput { @@ -298,9 +359,19 @@ impl ClientOsApi for ClientOsInputOutput { let stdout = ::std::io::stdout(); Box::new(stdout) } - fn get_server_sender(&self) -> IpcResult { - let buffer = SharedRingBuffer::open(ZELLIJ_IPC_PIPE)?; - Ok(IpcSenderWithContext::new(buffer)) + fn send_to_server(&mut self, msg: ServerInstruction) { + self.server_sender.send(msg).unwrap(); + } + fn update_senders(&mut self, new_ctx: ErrorContext) { + self.server_sender.update(new_ctx); + } + fn notify_server(&mut self) { + self.send_to_server(ServerInstruction::NewClient( + self.client_buffer_path.clone(), + )); + } + fn client_recv(&self) -> (ClientInstruction, ErrorContext) { + self.client_receiver.lock().unwrap().recv().unwrap() } } @@ -313,5 +384,15 @@ impl Clone for Box { pub fn get_client_os_input() -> ClientOsInputOutput { let current_termios = termios::tcgetattr(0).unwrap(); let orig_termios = Arc::new(Mutex::new(current_termios)); - ClientOsInputOutput { orig_termios } + let server_buffer = SharedRingBuffer::open(ZELLIJ_IPC_PIPE).unwrap(); + let server_sender = IpcSenderWithContext::new(server_buffer); + let (client_buffer_path, client_buffer) = + SharedRingBuffer::create_temp(IPC_BUFFER_SIZE).unwrap(); + let client_receiver = Arc::new(Mutex::new(IpcReceiver::new(client_buffer.clone()))); + ClientOsInputOutput { + orig_termios, + server_sender, + client_buffer_path, + client_receiver, + } } diff --git a/src/common/pty_bus.rs b/src/common/pty_bus.rs index a0eda375..84a06e80 100644 --- a/src/common/pty_bus.rs +++ b/src/common/pty_bus.rs @@ -10,7 +10,7 @@ use ::vte; use serde::{Deserialize, Serialize}; use std::path::PathBuf; -use super::{IpcSenderWithContext, ScreenInstruction, OPENCALLS}; +use super::{ScreenInstruction, OPENCALLS}; use crate::layout::Layout; use crate::os_input_output::ServerOsApi; use crate::utils::logging::debug_to_file; @@ -81,94 +81,83 @@ pub enum VteEvent { struct VteEventSender { id: RawFd, - send_server_instructions: IpcSenderWithContext, + os_input: Box, } impl VteEventSender { - pub fn new(id: RawFd, send_server_instructions: IpcSenderWithContext) -> Self { - VteEventSender { - id, - send_server_instructions, - } + pub fn new(id: RawFd, os_input: Box) -> Self { + VteEventSender { id, os_input } } } impl vte::Perform for VteEventSender { fn print(&mut self, c: char) { - self.send_server_instructions - .send(ServerInstruction::ToScreen(ScreenInstruction::Pty( + self.os_input + .send_to_server(ServerInstruction::ToScreen(ScreenInstruction::Pty( self.id, VteEvent::Print(c), - ))) - .unwrap(); + ))); } fn execute(&mut self, byte: u8) { - self.send_server_instructions - .send(ServerInstruction::ToScreen(ScreenInstruction::Pty( + self.os_input + .send_to_server(ServerInstruction::ToScreen(ScreenInstruction::Pty( self.id, VteEvent::Execute(byte), - ))) - .unwrap(); + ))); } fn hook(&mut self, params: &[i64], intermediates: &[u8], ignore: bool, c: char) { let params = params.iter().copied().collect(); let intermediates = intermediates.iter().copied().collect(); - self.send_server_instructions - .send(ServerInstruction::ToScreen(ScreenInstruction::Pty( + self.os_input + .send_to_server(ServerInstruction::ToScreen(ScreenInstruction::Pty( self.id, VteEvent::Hook(params, intermediates, ignore, c), - ))) - .unwrap(); + ))); } fn put(&mut self, byte: u8) { - self.send_server_instructions - .send(ServerInstruction::ToScreen(ScreenInstruction::Pty( + self.os_input + .send_to_server(ServerInstruction::ToScreen(ScreenInstruction::Pty( self.id, VteEvent::Put(byte), - ))) - .unwrap(); + ))); } fn unhook(&mut self) { - self.send_server_instructions - .send(ServerInstruction::ToScreen(ScreenInstruction::Pty( + self.os_input + .send_to_server(ServerInstruction::ToScreen(ScreenInstruction::Pty( self.id, VteEvent::Unhook, - ))) - .unwrap(); + ))); } fn osc_dispatch(&mut self, params: &[&[u8]], bell_terminated: bool) { let params = params.iter().map(|p| p.to_vec()).collect(); - self.send_server_instructions - .send(ServerInstruction::ToScreen(ScreenInstruction::Pty( + self.os_input + .send_to_server(ServerInstruction::ToScreen(ScreenInstruction::Pty( self.id, VteEvent::OscDispatch(params, bell_terminated), - ))) - .unwrap(); + ))); } fn csi_dispatch(&mut self, params: &[i64], intermediates: &[u8], ignore: bool, c: char) { let params = params.iter().copied().collect(); let intermediates = intermediates.iter().copied().collect(); - self.send_server_instructions - .send(ServerInstruction::ToScreen(ScreenInstruction::Pty( + self.os_input + .send_to_server(ServerInstruction::ToScreen(ScreenInstruction::Pty( self.id, VteEvent::CsiDispatch(params, intermediates, ignore, c), - ))) - .unwrap(); + ))); } fn esc_dispatch(&mut self, intermediates: &[u8], ignore: bool, byte: u8) { let intermediates = intermediates.iter().copied().collect(); - self.send_server_instructions - .send(ServerInstruction::ToScreen(ScreenInstruction::Pty( + self.os_input + .send_to_server(ServerInstruction::ToScreen(ScreenInstruction::Pty( self.id, VteEvent::EscDispatch(intermediates, ignore, byte), - ))) - .unwrap(); + ))); } } @@ -187,26 +176,24 @@ pub enum PtyInstruction { pub struct PtyBus { pub receive_pty_instructions: Receiver<(PtyInstruction, ErrorContext)>, pub id_to_child_pid: HashMap, - os_input: Box, + pub os_input: Box, debug_to_file: bool, task_handles: HashMap>, - pub send_server_instructions: IpcSenderWithContext, } fn stream_terminal_bytes( pid: RawFd, - os_input: Box, - mut send_server_instructions: IpcSenderWithContext, + mut os_input: Box, debug: bool, ) -> JoinHandle<()> { let mut err_ctx = OPENCALLS.with(|ctx| *ctx.borrow()); task::spawn({ async move { err_ctx.add_call(ContextType::AsyncTask); - send_server_instructions.update(err_ctx); + os_input.update_senders(err_ctx); let mut vte_parser = vte::Parser::new(); - let mut vte_event_sender = VteEventSender::new(pid, send_server_instructions.clone()); - let mut terminal_bytes = ReadFromPid::new(&pid, os_input); + let mut vte_event_sender = VteEventSender::new(pid, os_input.clone()); + let mut terminal_bytes = ReadFromPid::new(&pid, os_input.clone()); let mut last_byte_receive_time: Option = None; let mut pending_render = false; @@ -231,9 +218,9 @@ fn stream_terminal_bytes( Some(receive_time) => { if receive_time.elapsed() > max_render_pause { pending_render = false; - send_server_instructions - .send(ServerInstruction::ToScreen(ScreenInstruction::Render)) - .unwrap(); + os_input.send_to_server(ServerInstruction::ToScreen( + ScreenInstruction::Render, + )); last_byte_receive_time = Some(Instant::now()); } else { pending_render = true; @@ -247,26 +234,21 @@ fn stream_terminal_bytes( } else { if pending_render { pending_render = false; - send_server_instructions - .send(ServerInstruction::ToScreen(ScreenInstruction::Render)) - .unwrap(); + os_input + .send_to_server(ServerInstruction::ToScreen(ScreenInstruction::Render)); } last_byte_receive_time = None; task::sleep(::std::time::Duration::from_millis(10)).await; } } - send_server_instructions - .send(ServerInstruction::ToScreen(ScreenInstruction::Render)) - .unwrap(); + os_input.send_to_server(ServerInstruction::ToScreen(ScreenInstruction::Render)); #[cfg(not(test))] // this is a little hacky, and is because the tests end the file as soon as // we read everything, rather than hanging until there is new data // a better solution would be to fix the test fakes, but this will do for now - send_server_instructions - .send(ServerInstruction::ToScreen(ScreenInstruction::ClosePane( - PaneId::Terminal(pid), - ))) - .unwrap(); + os_input.send_to_server(ServerInstruction::ToScreen(ScreenInstruction::ClosePane( + PaneId::Terminal(pid), + ))); } }) } @@ -275,7 +257,6 @@ impl PtyBus { pub fn new( receive_pty_instructions: Receiver<(PtyInstruction, ErrorContext)>, os_input: Box, - send_server_instructions: IpcSenderWithContext, debug_to_file: bool, ) -> Self { PtyBus { @@ -284,18 +265,13 @@ impl PtyBus { id_to_child_pid: HashMap::new(), debug_to_file, task_handles: HashMap::new(), - send_server_instructions, } } pub fn spawn_terminal(&mut self, file_to_open: Option) -> RawFd { let (pid_primary, pid_secondary): (RawFd, RawFd) = self.os_input.spawn_terminal(file_to_open); - let task_handle = stream_terminal_bytes( - pid_primary, - self.os_input.clone(), - self.send_server_instructions.clone(), - self.debug_to_file, - ); + let task_handle = + stream_terminal_bytes(pid_primary, self.os_input.clone(), self.debug_to_file); self.task_handles.insert(pid_primary, task_handle); self.id_to_child_pid.insert(pid_primary, pid_secondary); pid_primary @@ -309,18 +285,12 @@ impl PtyBus { self.id_to_child_pid.insert(pid_primary, pid_secondary); new_pane_pids.push(pid_primary); } - self.send_server_instructions - .send(ServerInstruction::ToScreen(ScreenInstruction::ApplyLayout( + self.os_input + .send_to_server(ServerInstruction::ToScreen(ScreenInstruction::ApplyLayout( (layout_path, new_pane_pids.clone()), - ))) - .unwrap(); + ))); for id in new_pane_pids { - let task_handle = stream_terminal_bytes( - id, - self.os_input.clone(), - self.send_server_instructions.clone(), - self.debug_to_file, - ); + let task_handle = stream_terminal_bytes(id, self.os_input.clone(), self.debug_to_file); self.task_handles.insert(id, task_handle); } } @@ -335,9 +305,8 @@ impl PtyBus { }); } PaneId::Plugin(pid) => self - .send_server_instructions - .send(ServerInstruction::ClosePluginPane(pid)) - .unwrap(), + .os_input + .send_to_server(ServerInstruction::ClosePluginPane(pid)), } } pub fn close_tab(&mut self, ids: Vec) { diff --git a/src/main.rs b/src/main.rs index e6b94064..4a89470c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,7 +6,7 @@ mod server; use client::{boundaries, layout, panes, tab}; use common::{ command_is_executing, errors, os_input_output, pty_bus, screen, start, utils, wasm_vm, - IpcSenderWithContext, ServerInstruction, + ServerInstruction, }; use directories_next::ProjectDirs; @@ -14,7 +14,7 @@ use structopt::StructOpt; use crate::cli::CliArgs; use crate::command_is_executing::CommandIsExecuting; -use crate::os_input_output::{get_client_os_input, get_server_os_input}; +use crate::os_input_output::{get_client_os_input, get_server_os_input, ClientOsApi}; use crate::pty_bus::VteEvent; use crate::utils::{ consts::{ZELLIJ_TMP_DIR, ZELLIJ_TMP_LOG_DIR}, @@ -63,29 +63,17 @@ pub fn main() { if let Some(split_dir) = opts.split { match split_dir { 'h' => { - let mut send_server_instructions = IpcSenderWithContext::to_server(); - send_server_instructions - .send(ServerInstruction::SplitHorizontally) - .unwrap(); + get_client_os_input().send_to_server(ServerInstruction::SplitHorizontally); } 'v' => { - let mut send_server_instructions = IpcSenderWithContext::to_server(); - send_server_instructions - .send(ServerInstruction::SplitVertically) - .unwrap(); + get_client_os_input().send_to_server(ServerInstruction::SplitVertically); } _ => {} }; } else if opts.move_focus { - let mut send_server_instructions = IpcSenderWithContext::to_server(); - send_server_instructions - .send(ServerInstruction::MoveFocus) - .unwrap(); + get_client_os_input().send_to_server(ServerInstruction::MoveFocus); } else if let Some(file_to_open) = opts.open_file { - let mut send_server_instructions = IpcSenderWithContext::to_server(); - send_server_instructions - .send(ServerInstruction::OpenFile(file_to_open)) - .unwrap(); + get_client_os_input().send_to_server(ServerInstruction::OpenFile(file_to_open)); } else { let server_os_input = get_server_os_input(); let os_input = get_client_os_input(); diff --git a/src/server/mod.rs b/src/server/mod.rs index b902ea27..45c0db39 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -1,19 +1,17 @@ use crate::cli::CliArgs; use crate::common::{ - ChannelWithContext, ClientInstruction, IpcSenderWithContext, SenderType, SenderWithContext, - ServerInstruction, + ChannelWithContext, ClientInstruction, SenderType, SenderWithContext, ServerInstruction, }; use crate::errors::{ContextType, ErrorContext, OsContext, PtyContext, ServerContext}; use crate::os_input_output::{ServerOsApi, ServerOsApiInstruction}; use crate::panes::PaneId; use crate::pty_bus::{PtyBus, PtyInstruction}; use crate::screen::ScreenInstruction; -use ipmpsc::SharedRingBuffer; use std::path::PathBuf; use std::sync::mpsc::channel; use std::thread; -pub fn start_server(os_input: Box, opts: CliArgs) -> thread::JoinHandle<()> { +pub fn start_server(mut os_input: Box, opts: CliArgs) -> thread::JoinHandle<()> { let (send_pty_instructions, receive_pty_instructions): ChannelWithContext = channel(); let mut send_pty_instructions = SenderWithContext::new( @@ -36,14 +34,7 @@ pub fn start_server(os_input: Box, opts: CliArgs) -> thread::Jo let default_layout = None; let maybe_layout = opts.layout.or(default_layout); - let send_server_instructions = os_input.get_server_sender(); - - let mut pty_bus = PtyBus::new( - receive_pty_instructions, - os_input.clone(), - send_server_instructions, - opts.debug, - ); + let mut pty_bus = PtyBus::new(receive_pty_instructions, os_input.clone(), opts.debug); let pty_thread = thread::Builder::new() .name("pty".to_string()) @@ -56,55 +47,43 @@ pub fn start_server(os_input: Box, opts: CliArgs) -> thread::Jo match event { PtyInstruction::SpawnTerminal(file_to_open) => { let pid = pty_bus.spawn_terminal(file_to_open); - pty_bus - .send_server_instructions - .send(ServerInstruction::ToScreen(ScreenInstruction::NewPane( - PaneId::Terminal(pid), - ))) - .unwrap(); + pty_bus.os_input.send_to_server(ServerInstruction::ToScreen( + ScreenInstruction::NewPane(PaneId::Terminal(pid)), + )); } PtyInstruction::SpawnTerminalVertically(file_to_open) => { let pid = pty_bus.spawn_terminal(file_to_open); - pty_bus - .send_server_instructions - .send(ServerInstruction::ToScreen( - ScreenInstruction::VerticalSplit(PaneId::Terminal(pid)), - )) - .unwrap(); + pty_bus.os_input.send_to_server(ServerInstruction::ToScreen( + ScreenInstruction::VerticalSplit(PaneId::Terminal(pid)), + )); } PtyInstruction::SpawnTerminalHorizontally(file_to_open) => { let pid = pty_bus.spawn_terminal(file_to_open); - pty_bus - .send_server_instructions - .send(ServerInstruction::ToScreen( - ScreenInstruction::HorizontalSplit(PaneId::Terminal(pid)), - )) - .unwrap(); + pty_bus.os_input.send_to_server(ServerInstruction::ToScreen( + ScreenInstruction::HorizontalSplit(PaneId::Terminal(pid)), + )); } PtyInstruction::NewTab => { if let Some(layout) = maybe_layout.clone() { pty_bus.spawn_terminals_for_layout(layout); } else { let pid = pty_bus.spawn_terminal(None); - pty_bus - .send_server_instructions - .send(ServerInstruction::ToScreen(ScreenInstruction::NewTab(pid))) - .unwrap(); + pty_bus.os_input.send_to_server(ServerInstruction::ToScreen( + ScreenInstruction::NewTab(pid), + )); } } PtyInstruction::ClosePane(id) => { pty_bus.close_pane(id); pty_bus - .send_server_instructions - .send(ServerInstruction::DoneClosingPane) - .unwrap(); + .os_input + .send_to_server(ServerInstruction::DoneClosingPane); } PtyInstruction::CloseTab(ids) => { pty_bus.close_tab(ids); pty_bus - .send_server_instructions - .send(ServerInstruction::DoneClosingPane) - .unwrap(); + .os_input + .send_to_server(ServerInstruction::DoneClosingPane); } PtyInstruction::Exit => { break; @@ -142,19 +121,12 @@ pub fn start_server(os_input: Box, opts: CliArgs) -> thread::Jo thread::Builder::new() .name("ipc_server".to_string()) .spawn({ - let recv_server_instructions = os_input.get_server_receiver(); - // Fixme: We cannot use uninitialised sender, therefore this Vec. - // For now, We make sure that the first message is `NewClient` so there are no out of bound panics. - let mut send_client_instructions: Vec = Vec::with_capacity(1); move || loop { - let (instruction, mut err_ctx): (ServerInstruction, ErrorContext) = - recv_server_instructions.recv().unwrap(); + let (instruction, mut err_ctx) = os_input.server_recv(); err_ctx.add_call(ContextType::IPCServer(ServerContext::from(&instruction))); send_pty_instructions.update(err_ctx); send_os_instructions.update(err_ctx); - if send_client_instructions.len() == 1 { - send_client_instructions[0].update(err_ctx); - } + os_input.update_senders(err_ctx); match instruction { ServerInstruction::OpenFile(file_name) => { @@ -174,43 +146,35 @@ pub fn start_server(os_input: Box, opts: CliArgs) -> thread::Jo .unwrap(); } ServerInstruction::MoveFocus => { - send_client_instructions[0] - .send(ClientInstruction::ToScreen(ScreenInstruction::MoveFocus)) - .unwrap(); + os_input.send_to_client(ClientInstruction::ToScreen( + ScreenInstruction::MoveFocus, + )); } ServerInstruction::NewClient(buffer_path) => { send_pty_instructions.send(PtyInstruction::NewTab).unwrap(); - send_client_instructions.push(IpcSenderWithContext::new( - SharedRingBuffer::open(&buffer_path).unwrap(), - )); + os_input.add_client_sender(buffer_path); } ServerInstruction::ToPty(instr) => { send_pty_instructions.send(instr).unwrap(); } ServerInstruction::ToScreen(instr) => { - send_client_instructions[0] - .send(ClientInstruction::ToScreen(instr)) - .unwrap(); + os_input.send_to_client(ClientInstruction::ToScreen(instr)); } ServerInstruction::OsApi(instr) => { send_os_instructions.send(instr).unwrap(); } ServerInstruction::DoneClosingPane => { - send_client_instructions[0] - .send(ClientInstruction::DoneClosingPane) - .unwrap(); + os_input.send_to_client(ClientInstruction::DoneClosingPane); } ServerInstruction::ClosePluginPane(pid) => { - send_client_instructions[0] - .send(ClientInstruction::ClosePluginPane(pid)) - .unwrap(); + os_input.send_to_client(ClientInstruction::ClosePluginPane(pid)); } ServerInstruction::Exit => { let _ = send_pty_instructions.send(PtyInstruction::Exit); let _ = send_os_instructions.send(ServerOsApiInstruction::Exit); let _ = pty_thread.join(); let _ = os_thread.join(); - let _ = send_client_instructions[0].send(ClientInstruction::Exit); + let _ = os_input.send_to_client(ClientInstruction::Exit); break; } } diff --git a/src/tests/fakes.rs b/src/tests/fakes.rs index 80cf67b5..42993614 100644 --- a/src/tests/fakes.rs +++ b/src/tests/fakes.rs @@ -1,21 +1,22 @@ use crate::panes::PositionAndSize; -use ipmpsc::{Receiver as IpcReceiver, Result as IpcResult, SharedRingBuffer}; use std::collections::{HashMap, VecDeque}; use std::io::Write; use std::os::unix::io::RawFd; use std::path::PathBuf; -use std::sync::{Arc, Condvar, Mutex}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{mpsc, Arc, Mutex}; use std::time::{Duration, Instant}; -use crate::common::{IpcSenderWithContext, IPC_BUFFER_SIZE}; +use crate::common::{ + ChannelWithContext, ClientInstruction, SenderType, SenderWithContext, ServerInstruction, +}; +use crate::errors::ErrorContext; use crate::os_input_output::{ClientOsApi, ServerOsApi}; use crate::tests::possible_tty_inputs::{get_possible_tty_inputs, Bytes}; use crate::utils::shared::default_palette; use zellij_tile::data::Palette; -use crate::tests::utils::commands::{QUIT, SLEEP}; - -const MIN_TIME_BETWEEN_SNAPSHOTS: Duration = Duration::from_millis(500); +const MIN_TIME_BETWEEN_SNAPSHOTS: Duration = Duration::from_millis(100); #[derive(Clone)] pub enum IoEvent { @@ -76,7 +77,10 @@ pub struct FakeInputOutput { possible_tty_inputs: HashMap, last_snapshot_time: Arc>, started_reading_from_pty: Arc, - server_buffer: SharedRingBuffer, + client_sender: SenderWithContext, + client_receiver: Arc>>, + server_sender: SenderWithContext, + server_receiver: Arc>>, } impl FakeInputOutput { @@ -84,7 +88,14 @@ impl FakeInputOutput { let mut win_sizes = HashMap::new(); let last_snapshot_time = Arc::new(Mutex::new(Instant::now())); let stdout_writer = FakeStdoutWriter::new(last_snapshot_time.clone()); - let (_, server_buffer) = SharedRingBuffer::create_temp(IPC_BUFFER_SIZE).unwrap(); + let (client_sender, client_receiver): ChannelWithContext = + mpsc::channel(); + let client_sender = + SenderWithContext::new(ErrorContext::new(), SenderType::Sender(client_sender)); + let (server_sender, server_receiver): ChannelWithContext = + mpsc::channel(); + let server_sender = + SenderWithContext::new(ErrorContext::new(), SenderType::Sender(server_sender)); win_sizes.insert(0, winsize); // 0 is the current terminal FakeInputOutput { @@ -98,7 +109,10 @@ impl FakeInputOutput { win_sizes: Arc::new(Mutex::new(win_sizes)), possible_tty_inputs: get_possible_tty_inputs(), started_reading_from_pty: Arc::new(AtomicBool::new(false)), - server_buffer, + server_receiver: Arc::new(Mutex::new(server_receiver)), + server_sender, + client_receiver: Arc::new(Mutex::new(client_receiver)), + client_sender, } } pub fn with_tty_inputs(mut self, tty_inputs: HashMap) -> Self { @@ -166,8 +180,18 @@ impl ClientOsApi for FakeInputOutput { fn get_stdout_writer(&self) -> Box { Box::new(self.stdout_writer.clone()) } - fn get_server_sender(&self) -> IpcResult { - Ok(IpcSenderWithContext::new(self.server_buffer.clone())) + fn send_to_server(&mut self, msg: ServerInstruction) { + self.server_sender.send(msg).unwrap(); + } + fn update_senders(&mut self, new_ctx: ErrorContext) { + self.server_sender.update(new_ctx); + self.client_sender.update(new_ctx); + } + fn notify_server(&mut self) { + ClientOsApi::send_to_server(self, ServerInstruction::NewClient("zellij".into())); + } + fn client_recv(&self) -> (ClientInstruction, ErrorContext) { + self.client_receiver.lock().unwrap().recv().unwrap() } } @@ -229,11 +253,19 @@ impl ServerOsApi for FakeInputOutput { self.io_events.lock().unwrap().push(IoEvent::Kill(fd)); Ok(()) } - fn get_server_receiver(&self) -> IpcReceiver { - IpcReceiver::new(self.server_buffer.clone()) + fn send_to_server(&mut self, msg: ServerInstruction) { + self.server_sender.send(msg).unwrap(); } - fn get_server_sender(&self) -> IpcSenderWithContext { - IpcSenderWithContext::new(self.server_buffer.clone()) + fn server_recv(&self) -> (ServerInstruction, ErrorContext) { + self.server_receiver.lock().unwrap().recv().unwrap() + } + fn send_to_client(&mut self, msg: ClientInstruction) { + self.client_sender.send(msg).unwrap(); + } + fn add_client_sender(&mut self, _buffer_path: String) {} + fn update_senders(&mut self, new_ctx: ErrorContext) { + self.server_sender.update(new_ctx); + self.client_sender.update(new_ctx); } fn load_palette(&self) -> Palette { default_palette()