Use interprocess crate for IPC

This commit is contained in:
Kunal Mohan 2021-04-28 19:00:09 +05:30
parent 9fc1f0038e
commit b7aa3fc21a
8 changed files with 217 additions and 329 deletions

179
Cargo.lock generated
View file

@ -234,15 +234,6 @@ dependencies = [
"wyz", "wyz",
] ]
[[package]]
name = "block-buffer"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4"
dependencies = [
"generic-array",
]
[[package]] [[package]]
name = "blocking" name = "blocking"
version = "1.0.2" version = "1.0.2"
@ -293,19 +284,6 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chrono"
version = "0.4.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "670ad68c9088c2a963aaa298cb369688cf3f9465ce5e2d4ca10e6e0098a1ce73"
dependencies = [
"libc",
"num-integer",
"num-traits",
"time",
"winapi",
]
[[package]] [[package]]
name = "clap" name = "clap"
version = "2.33.3" version = "2.33.3"
@ -354,12 +332,6 @@ dependencies = [
"winapi", "winapi",
] ]
[[package]]
name = "cpuid-bool"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8aebca1129a03dc6dc2b127edd729435bbc4a37e1d5f4d7513165089ceb02634"
[[package]] [[package]]
name = "cranelift-bforest" name = "cranelift-bforest"
version = "0.68.0" version = "0.68.0"
@ -524,15 +496,6 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "digest"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3dd60d1080a57a05ab032377049e0591415d2b31afd7028356dbf3cc6dcb066"
dependencies = [
"generic-array",
]
[[package]] [[package]]
name = "directories-next" name = "directories-next"
version = "2.0.0" version = "2.0.0"
@ -752,16 +715,6 @@ dependencies = [
"serde", "serde",
] ]
[[package]]
name = "generic-array"
version = "0.14.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "501466ecc8a30d1d3b7fc9229b122b2ce8ed6e9d9223f1138d4babb253e51817"
dependencies = [
"typenum",
"version_check",
]
[[package]] [[package]]
name = "getopts" name = "getopts"
version = "0.2.21" version = "0.2.21"
@ -847,12 +800,6 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "hex"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
[[package]] [[package]]
name = "ident_case" name = "ident_case"
version = "1.0.1" version = "1.0.1"
@ -939,24 +886,6 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "ipmpsc"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e36cf1ebb87bae3dbbf0a91b80463831de213b2921f28c325b22026f318f17a3"
dependencies = [
"bincode",
"hex",
"libc",
"memmap",
"serde",
"sha2",
"tempfile",
"thiserror",
"vergen",
"winapi",
]
[[package]] [[package]]
name = "itoa" name = "itoa"
version = "0.4.7" version = "0.4.7"
@ -1062,16 +991,6 @@ version = "2.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ee1c47aaa256ecabcaea351eae4a9b01ef39ed810004e298d2511ed284b1525" checksum = "0ee1c47aaa256ecabcaea351eae4a9b01ef39ed810004e298d2511ed284b1525"
[[package]]
name = "memmap"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6585fd95e7bb50d6cc31e20d4cf9afb4e2ba16c5846fc76793f11218da9c475b"
dependencies = [
"libc",
"winapi",
]
[[package]] [[package]]
name = "memmap2" name = "memmap2"
version = "0.2.2" version = "0.2.2"
@ -1141,25 +1060,6 @@ dependencies = [
"version_check", "version_check",
] ]
[[package]]
name = "num-integer"
version = "0.1.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2cc698a63b549a70bc047073d2949cce27cd1c7b0a4a862d08a8031bc2801db"
dependencies = [
"autocfg",
"num-traits",
]
[[package]]
name = "num-traits"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a64b1ec5cda2586e284722486d802acf1f7dbdc623e2bfc57e65ca1cd099290"
dependencies = [
"autocfg",
]
[[package]] [[package]]
name = "num_cpus" name = "num_cpus"
version = "1.13.0" version = "1.13.0"
@ -1198,27 +1098,12 @@ version = "1.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af8b08b04175473088b46763e51ee54da5f9a164bc162f615b91bc179dbf15a3" checksum = "af8b08b04175473088b46763e51ee54da5f9a164bc162f615b91bc179dbf15a3"
[[package]]
name = "opaque-debug"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5"
[[package]] [[package]]
name = "parking" name = "parking"
version = "2.0.0" version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72" checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72"
[[package]]
name = "pest"
version = "2.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10f4872ae94d7b90ae48754df22fd42ad52ce740b8f370b03da4835417403e53"
dependencies = [
"ucd-trie",
]
[[package]] [[package]]
name = "pin-project-lite" name = "pin-project-lite"
version = "0.2.6" version = "0.2.6"
@ -1457,15 +1342,6 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
[[package]]
name = "rustc_version"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0dfe2087c51c460008730de8b57e6a320782fbfb312e1f4d520e6c6fae155ee"
dependencies = [
"semver",
]
[[package]] [[package]]
name = "ryu" name = "ryu"
version = "1.0.5" version = "1.0.5"
@ -1478,24 +1354,6 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
name = "semver"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f301af10236f6df4160f7c3f04eec6dbc70ace82d23326abad5edee88801c6b6"
dependencies = [
"semver-parser",
]
[[package]]
name = "semver-parser"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00b0bef5b7f9e0df16536d3961cfb6e84331c065b4066afb39768d0e319411f7"
dependencies = [
"pest",
]
[[package]] [[package]]
name = "serde" name = "serde"
version = "1.0.125" version = "1.0.125"
@ -1548,19 +1406,6 @@ dependencies = [
"yaml-rust", "yaml-rust",
] ]
[[package]]
name = "sha2"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa827a14b29ab7f44778d14a88d3cb76e949c45083f7dbfa507d0cb699dc12de"
dependencies = [
"block-buffer",
"cfg-if 1.0.0",
"cpuid-bool",
"digest",
"opaque-debug",
]
[[package]] [[package]]
name = "signal-hook" name = "signal-hook"
version = "0.3.8" version = "0.3.8"
@ -1861,12 +1706,6 @@ dependencies = [
"lazy_static", "lazy_static",
] ]
[[package]]
name = "typenum"
version = "1.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "879f6906492a7cd215bfa4cf595b600146ccfac0c79bcbd1f3000162af5e8b06"
[[package]] [[package]]
name = "typetag" name = "typetag"
version = "0.1.7" version = "0.1.7"
@ -1891,12 +1730,6 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "ucd-trie"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56dee185309b50d1f11bfedef0fe6d036842e3fb77413abef29f8f8d1c5d4c1c"
[[package]] [[package]]
name = "unicode-segmentation" name = "unicode-segmentation"
version = "1.7.1" version = "1.7.1"
@ -1963,17 +1796,6 @@ version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191" checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191"
[[package]]
name = "vergen"
version = "3.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7141e445af09c8919f1d5f8a20dae0b20c3b57a45dee0d5823c6ed5d237f15a"
dependencies = [
"bitflags",
"chrono",
"rustc_version",
]
[[package]] [[package]]
name = "version_check" name = "version_check"
version = "0.9.3" version = "0.9.3"
@ -2384,7 +2206,6 @@ dependencies = [
"futures", "futures",
"insta", "insta",
"interprocess", "interprocess",
"ipmpsc",
"lazy_static", "lazy_static",
"libc", "libc",
"nix", "nix",

View file

@ -17,7 +17,6 @@ backtrace = "0.3.55"
bincode = "1.3.1" bincode = "1.3.1"
directories-next = "2.0" directories-next = "2.0"
futures = "0.3.5" futures = "0.3.5"
ipmpsc = "0.5.0"
libc = "0.2" libc = "0.2"
nix = "0.19.1" nix = "0.19.1"
nom = "6.0.1" nom = "6.0.1"
@ -36,7 +35,7 @@ strum = "0.20.0"
lazy_static = "1.4.0" lazy_static = "1.4.0"
wasmer = "1.0.0" wasmer = "1.0.0"
wasmer-wasi = "1.0.0" wasmer-wasi = "1.0.0"
interprocess = "1.0.1" interprocess = "1.1.1"
zellij-tile = { path = "zellij-tile/", version = "0.5.0" } zellij-tile = { path = "zellij-tile/", version = "0.5.0" }
[dependencies.async-std] [dependencies.async-std]

View file

@ -49,6 +49,8 @@ pub fn start_client(mut os_input: Box<dyn ClientOsApi>, opts: CliArgs) {
let mut command_is_executing = CommandIsExecuting::new(); let mut command_is_executing = CommandIsExecuting::new();
let full_screen_ws = os_input.get_terminal_size_using_fd(0); let full_screen_ws = os_input.get_terminal_size_using_fd(0);
os_input.connect_to_server();
os_input.send_to_server(ServerInstruction::NewClient(full_screen_ws));
os_input.set_raw_mode(0); os_input.set_raw_mode(0);
let (send_client_instructions, receive_client_instructions): SyncChannelWithContext< let (send_client_instructions, receive_client_instructions): SyncChannelWithContext<
@ -57,8 +59,6 @@ pub fn start_client(mut os_input: Box<dyn ClientOsApi>, opts: CliArgs) {
let send_client_instructions = let send_client_instructions =
SenderWithContext::new(SenderType::SyncSender(send_client_instructions)); SenderWithContext::new(SenderType::SyncSender(send_client_instructions));
os_input.connect_to_server(full_screen_ws);
#[cfg(not(test))] #[cfg(not(test))]
std::panic::set_hook({ std::panic::set_hook({
use crate::errors::handle_panic; use crate::errors::handle_panic;

View file

@ -357,7 +357,6 @@ pub enum ServerContext {
DoneOpeningNewPane, DoneOpeningNewPane,
DoneUpdatingTabs, DoneUpdatingTabs,
ClientExit, ClientExit,
Exit,
} }
impl From<&ServerInstruction> for ServerContext { impl From<&ServerInstruction> for ServerContext {
@ -375,7 +374,6 @@ impl From<&ServerInstruction> for ServerContext {
ServerInstruction::DoneOpeningNewPane => ServerContext::DoneOpeningNewPane, ServerInstruction::DoneOpeningNewPane => ServerContext::DoneOpeningNewPane,
ServerInstruction::DoneUpdatingTabs => ServerContext::DoneUpdatingTabs, ServerInstruction::DoneUpdatingTabs => ServerContext::DoneUpdatingTabs,
ServerInstruction::ClientExit => ServerContext::ClientExit, ServerInstruction::ClientExit => ServerContext::ClientExit,
ServerInstruction::Exit => ServerContext::Exit,
} }
} }
} }

View file

@ -1,18 +1,17 @@
use ipmpsc::{Receiver as IpcReceiver, Sender as IpcSender, SharedRingBuffer}; use interprocess::local_socket::LocalSocketStream;
use nix::fcntl::{fcntl, FcntlArg, OFlag}; use nix::fcntl::{fcntl, FcntlArg, OFlag};
use nix::pty::{forkpty, Winsize}; use nix::pty::{forkpty, Winsize};
use nix::sys::signal::{kill, Signal}; use nix::sys::signal::{kill, Signal};
use nix::sys::termios; use nix::sys::termios;
use nix::sys::wait::waitpid; use nix::sys::wait::waitpid;
use nix::unistd; use nix::unistd::{self, ForkResult, Pid};
use nix::unistd::{ForkResult, Pid};
use serde::Serialize; use serde::Serialize;
use signal_hook::{consts::signal::*, iterator::Signals}; use signal_hook::{consts::signal::*, iterator::Signals};
use std::env; use std::env;
use std::io; use std::io;
use std::io::prelude::*; use std::io::prelude::*;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::os::unix::io::RawFd; use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
use std::path::PathBuf; use std::path::PathBuf;
use std::process::{Child, Command}; use std::process::{Child, Command};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
@ -23,7 +22,7 @@ use crate::panes::PositionAndSize;
use crate::server::ServerInstruction; use crate::server::ServerInstruction;
use crate::utils::consts::ZELLIJ_IPC_PIPE; use crate::utils::consts::ZELLIJ_IPC_PIPE;
const IPC_BUFFER_SIZE: u32 = 8388608; const IPC_BUFFER_SIZE: usize = 262144;
fn into_raw_mode(pid: RawFd) { fn into_raw_mode(pid: RawFd) {
let mut tio = termios::tcgetattr(pid).expect("could not get terminal attribute"); let mut tio = termios::tcgetattr(pid).expect("could not get terminal attribute");
@ -162,35 +161,34 @@ fn spawn_terminal(file_to_open: Option<PathBuf>, orig_termios: termios::Termios)
} }
/// Sends messages on an [ipmpsc](ipmpsc) channel, along with an [`ErrorContext`]. /// Sends messages on an [ipmpsc](ipmpsc) channel, along with an [`ErrorContext`].
#[derive(Clone)]
struct IpcSenderWithContext<T: Serialize> { struct IpcSenderWithContext<T: Serialize> {
sender: IpcSender, sender: LocalSocketStream,
_phantom: PhantomData<T>, _phantom: PhantomData<T>,
} }
impl<T: Serialize> IpcSenderWithContext<T> { impl<T: Serialize> IpcSenderWithContext<T> {
/// Returns a sender to the given [SharedRingBuffer](ipmpsc::SharedRingBuffer). /// Returns a sender to the given [SharedRingBuffer](ipmpsc::SharedRingBuffer).
fn new(buffer: SharedRingBuffer) -> Self { fn new(sender: LocalSocketStream) -> Self {
Self { Self {
sender: IpcSender::new(buffer), sender,
_phantom: PhantomData, _phantom: PhantomData,
} }
} }
/// Sends an event, along with the current [`ErrorContext`], on this /// Sends an event, along with the current [`ErrorContext`], on this
/// [`IpcSenderWithContext`]'s channel. /// [`IpcSenderWithContext`]'s channel.
fn send(&self, msg: T) -> ipmpsc::Result<()> { fn send(&mut self, msg: T) -> Result<(), std::io::Error> {
let err_ctx = get_current_ctx(); let err_ctx = get_current_ctx();
self.sender.send(&(msg, err_ctx)) self.sender
.write_all(&bincode::serialize(&(msg, err_ctx)).unwrap())
} }
} }
#[derive(Clone)] #[derive(Clone)]
pub struct ServerOsInputOutput { pub struct ServerOsInputOutput {
orig_termios: Arc<Mutex<termios::Termios>>, orig_termios: Arc<Mutex<termios::Termios>>,
server_sender: IpcSenderWithContext<ServerInstruction>, recv_socket: Option<Arc<Mutex<LocalSocketStream>>>,
server_receiver: Arc<IpcReceiver>, // Should this be Arc<Mutex<_>> ? sender_socket: Arc<Mutex<Option<IpcSenderWithContext<ClientInstruction>>>>,
client_sender: Option<IpcSenderWithContext<ClientInstruction>>,
} }
/// The `ServerOsApi` trait represents an abstract interface to the features of an operating system that /// The `ServerOsApi` trait represents an abstract interface to the features of an operating system that
@ -213,14 +211,14 @@ pub trait ServerOsApi: Send + Sync {
fn kill(&mut self, pid: RawFd) -> Result<(), nix::Error>; fn kill(&mut self, pid: RawFd) -> Result<(), nix::Error>;
/// Returns a [`Box`] pointer to this [`ServerOsApi`] struct. /// Returns a [`Box`] pointer to this [`ServerOsApi`] struct.
fn box_clone(&self) -> Box<dyn ServerOsApi>; fn box_clone(&self) -> Box<dyn ServerOsApi>;
/// Sends an `Exit` message to the server router thread.
fn server_exit(&mut self);
/// Receives a message on server-side IPC channel /// Receives a message on server-side IPC channel
fn server_recv(&self) -> (ServerInstruction, ErrorContext); fn server_recv(&self) -> (ServerInstruction, ErrorContext);
/// Sends a message to client /// Sends a message to client
fn send_to_client(&self, msg: ClientInstruction); fn send_to_client(&self, msg: ClientInstruction);
/// Adds a sender to client /// Adds a sender to client
fn add_client_sender(&mut self, buffer_path: String); fn add_client_sender(&mut self);
/// Update the receiver socket for the client
fn update_receiver(&mut self, stream: LocalSocketStream);
} }
impl ServerOsApi for ServerOsInputOutput { impl ServerOsApi for ServerOsInputOutput {
@ -254,18 +252,42 @@ impl ServerOsApi for ServerOsInputOutput {
waitpid(Pid::from_raw(pid), None).unwrap(); waitpid(Pid::from_raw(pid), None).unwrap();
Ok(()) Ok(())
} }
fn server_exit(&mut self) {
self.server_sender.send(ServerInstruction::Exit).unwrap();
}
fn server_recv(&self) -> (ServerInstruction, ErrorContext) { fn server_recv(&self) -> (ServerInstruction, ErrorContext) {
self.server_receiver.recv().unwrap() let mut buf = [0; IPC_BUFFER_SIZE];
let bytes = self
.recv_socket
.as_ref()
.unwrap()
.lock()
.unwrap()
.read(&mut buf)
.unwrap();
bincode::deserialize(&buf[..bytes]).unwrap()
} }
fn send_to_client(&self, msg: ClientInstruction) { fn send_to_client(&self, msg: ClientInstruction) {
self.client_sender.as_ref().unwrap().send(msg).unwrap(); self.sender_socket
.lock()
.unwrap()
.as_mut()
.unwrap()
.send(msg)
.unwrap();
} }
fn add_client_sender(&mut self, buffer_path: String) { fn add_client_sender(&mut self) {
let buffer = SharedRingBuffer::open(buffer_path.as_str()).unwrap(); assert!(self.sender_socket.lock().unwrap().is_none());
self.client_sender = Some(IpcSenderWithContext::new(buffer)); let sock_fd = self
.recv_socket
.as_ref()
.unwrap()
.lock()
.unwrap()
.as_raw_fd();
let dup_fd = unistd::dup(sock_fd).unwrap();
let dup_sock = unsafe { LocalSocketStream::from_raw_fd(dup_fd) };
*self.sender_socket.lock().unwrap() = Some(IpcSenderWithContext::new(dup_sock));
}
fn update_receiver(&mut self, stream: LocalSocketStream) {
self.recv_socket = Some(Arc::new(Mutex::new(stream)));
} }
} }
@ -278,22 +300,18 @@ impl Clone for Box<dyn ServerOsApi> {
pub fn get_server_os_input() -> ServerOsInputOutput { pub fn get_server_os_input() -> ServerOsInputOutput {
let current_termios = termios::tcgetattr(0).unwrap(); let current_termios = termios::tcgetattr(0).unwrap();
let orig_termios = Arc::new(Mutex::new(current_termios)); let orig_termios = Arc::new(Mutex::new(current_termios));
let server_buffer = SharedRingBuffer::create(ZELLIJ_IPC_PIPE, IPC_BUFFER_SIZE).unwrap();
let server_sender = IpcSenderWithContext::new(server_buffer.clone());
let server_receiver = Arc::new(IpcReceiver::new(server_buffer));
ServerOsInputOutput { ServerOsInputOutput {
orig_termios, orig_termios,
server_sender, recv_socket: None,
server_receiver, sender_socket: Arc::new(Mutex::new(None)),
client_sender: None,
} }
} }
#[derive(Clone)] #[derive(Clone)]
pub struct ClientOsInputOutput { pub struct ClientOsInputOutput {
orig_termios: Arc<Mutex<termios::Termios>>, orig_termios: Arc<Mutex<termios::Termios>>,
server_sender: IpcSenderWithContext<ServerInstruction>, server_sender: Arc<Mutex<Option<IpcSenderWithContext<ServerInstruction>>>>,
client_receiver: Option<Arc<IpcReceiver>>, // Should this be Option<Arc<Mutex<_>>> ? receiver: Arc<Mutex<Option<LocalSocketStream>>>,
} }
/// The `ClientOsApi` trait represents an abstract interface to the features of an operating system that /// The `ClientOsApi` trait represents an abstract interface to the features of an operating system that
@ -318,9 +336,9 @@ pub trait ClientOsApi: Send + Sync {
/// Receives a message on client-side IPC channel /// Receives a message on client-side IPC channel
// This should be called from the client-side router thread only. // This should be called from the client-side router thread only.
fn client_recv(&self) -> (ClientInstruction, ErrorContext); fn client_recv(&self) -> (ClientInstruction, ErrorContext);
/// Setup the client IpcChannel and notify server of new client
fn connect_to_server(&mut self, full_screen_ws: PositionAndSize);
fn receive_sigwinch(&self, cb: Box<dyn Fn()>); fn receive_sigwinch(&self, cb: Box<dyn Fn()>);
/// Establish a connection with the server socket.
fn connect_to_server(&self);
} }
impl ClientOsApi for ClientOsInputOutput { impl ClientOsApi for ClientOsInputOutput {
@ -351,19 +369,25 @@ impl ClientOsApi for ClientOsInputOutput {
Box::new(stdout) Box::new(stdout)
} }
fn send_to_server(&self, msg: ServerInstruction) { fn send_to_server(&self, msg: ServerInstruction) {
self.server_sender.send(msg).unwrap(); self.server_sender
} .lock()
fn connect_to_server(&mut self, full_screen_ws: PositionAndSize) { .unwrap()
let (client_buffer_path, client_buffer) = .as_mut()
SharedRingBuffer::create_temp(IPC_BUFFER_SIZE).unwrap(); .unwrap()
self.client_receiver = Some(Arc::new(IpcReceiver::new(client_buffer))); .send(msg)
self.send_to_server(ServerInstruction::NewClient( .unwrap();
client_buffer_path,
full_screen_ws,
));
} }
fn client_recv(&self) -> (ClientInstruction, ErrorContext) { fn client_recv(&self) -> (ClientInstruction, ErrorContext) {
self.client_receiver.as_ref().unwrap().recv().unwrap() let mut buf = [0; IPC_BUFFER_SIZE];
let bytes = self
.receiver
.lock()
.unwrap()
.as_mut()
.unwrap()
.read(&mut buf)
.unwrap();
bincode::deserialize(&buf[..bytes]).unwrap()
} }
fn receive_sigwinch(&self, cb: Box<dyn Fn()>) { fn receive_sigwinch(&self, cb: Box<dyn Fn()>) {
let mut signals = Signals::new(&[SIGWINCH, SIGTERM, SIGINT, SIGQUIT]).unwrap(); let mut signals = Signals::new(&[SIGWINCH, SIGTERM, SIGINT, SIGQUIT]).unwrap();
@ -379,6 +403,15 @@ impl ClientOsApi for ClientOsInputOutput {
} }
} }
} }
fn connect_to_server(&self) {
let socket = LocalSocketStream::connect(ZELLIJ_IPC_PIPE).unwrap();
let sock_fd = socket.as_raw_fd();
let dup_fd = unistd::dup(sock_fd).unwrap();
let receiver = unsafe { LocalSocketStream::from_raw_fd(dup_fd) };
let sender = IpcSenderWithContext::new(socket);
*self.server_sender.lock().unwrap() = Some(sender);
*self.receiver.lock().unwrap() = Some(receiver);
}
} }
impl Clone for Box<dyn ClientOsApi> { impl Clone for Box<dyn ClientOsApi> {
@ -390,11 +423,9 @@ impl Clone for Box<dyn ClientOsApi> {
pub fn get_client_os_input() -> ClientOsInputOutput { pub fn get_client_os_input() -> ClientOsInputOutput {
let current_termios = termios::tcgetattr(0).unwrap(); let current_termios = termios::tcgetattr(0).unwrap();
let orig_termios = Arc::new(Mutex::new(current_termios)); let orig_termios = Arc::new(Mutex::new(current_termios));
let server_buffer = SharedRingBuffer::open(ZELLIJ_IPC_PIPE).unwrap();
let server_sender = IpcSenderWithContext::new(server_buffer);
ClientOsInputOutput { ClientOsInputOutput {
orig_termios, orig_termios,
server_sender, server_sender: Arc::new(Mutex::new(None)),
client_receiver: None, receiver: Arc::new(Mutex::new(None)),
} }
} }

View file

@ -60,23 +60,30 @@ pub fn main() {
if let Some(split_dir) = opts.split { if let Some(split_dir) = opts.split {
match split_dir { match split_dir {
'h' => { 'h' => {
get_client_os_input().send_to_server(ServerInstruction::SplitHorizontally); let os_input = get_client_os_input();
os_input.connect_to_server();
os_input.send_to_server(ServerInstruction::SplitHorizontally);
} }
'v' => { 'v' => {
get_client_os_input().send_to_server(ServerInstruction::SplitVertically); let os_input = get_client_os_input();
os_input.connect_to_server();
os_input.send_to_server(ServerInstruction::SplitVertically);
} }
_ => {} _ => {}
}; };
} else if opts.move_focus { } else if opts.move_focus {
get_client_os_input().send_to_server(ServerInstruction::MoveFocus);
} else if let Some(file_to_open) = opts.open_file {
get_client_os_input().send_to_server(ServerInstruction::OpenFile(file_to_open));
} else {
// Mind the order: server_os_input should be created before client_os_input
let server_os_input = get_server_os_input();
let os_input = get_client_os_input(); let os_input = get_client_os_input();
os_input.connect_to_server();
os_input.send_to_server(ServerInstruction::MoveFocus);
} else if let Some(file_to_open) = opts.open_file {
let os_input = get_client_os_input();
os_input.connect_to_server();
os_input.send_to_server(ServerInstruction::OpenFile(file_to_open));
} else {
atomic_create_dir(ZELLIJ_TMP_DIR).unwrap(); atomic_create_dir(ZELLIJ_TMP_DIR).unwrap();
atomic_create_dir(ZELLIJ_TMP_LOG_DIR).unwrap(); atomic_create_dir(ZELLIJ_TMP_LOG_DIR).unwrap();
let server_os_input = get_server_os_input();
let os_input = get_client_os_input();
start(Box::new(os_input), opts, Box::new(server_os_input)); start(Box::new(os_input), opts, Box::new(server_os_input));
} }
} }

View file

@ -1,4 +1,5 @@
use directories_next::ProjectDirs; use directories_next::ProjectDirs;
use interprocess::local_socket::LocalSocketListener;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::mpsc::channel; use std::sync::mpsc::channel;
@ -28,6 +29,7 @@ use crate::common::{
use crate::layout::Layout; use crate::layout::Layout;
use crate::panes::PaneId; use crate::panes::PaneId;
use crate::panes::PositionAndSize; use crate::panes::PositionAndSize;
use crate::utils::consts::ZELLIJ_IPC_PIPE;
/// Instructions related to server-side application including the /// Instructions related to server-side application including the
/// ones sent by client to server /// ones sent by client to server
@ -38,15 +40,13 @@ pub enum ServerInstruction {
SplitVertically, SplitVertically,
MoveFocus, MoveFocus,
TerminalResize(PositionAndSize), TerminalResize(PositionAndSize),
NewClient(String, PositionAndSize), NewClient(PositionAndSize),
Action(Action), Action(Action),
Render(Option<String>), Render(Option<String>),
DoneClosingPane, DoneClosingPane,
DoneOpeningNewPane, DoneOpeningNewPane,
DoneUpdatingTabs, DoneUpdatingTabs,
ClientExit, ClientExit,
// notify router thread to exit
Exit,
} }
struct SessionMetaData { struct SessionMetaData {
@ -69,71 +69,44 @@ impl Drop for SessionMetaData {
} }
} }
pub fn start_server(mut os_input: Box<dyn ServerOsApi>, opts: CliArgs) -> thread::JoinHandle<()> { pub fn start_server(os_input: Box<dyn ServerOsApi>, opts: CliArgs) -> thread::JoinHandle<()> {
let (send_server_instructions, receive_server_instructions): ChannelWithContext< let (send_server_instructions, receive_server_instructions): ChannelWithContext<
ServerInstruction, ServerInstruction,
> = channel(); > = channel();
let send_server_instructions = let send_server_instructions =
SenderWithContext::new(SenderType::Sender(send_server_instructions)); SenderWithContext::new(SenderType::Sender(send_server_instructions));
let sessions: Arc<RwLock<Option<SessionMetaData>>> = Arc::new(RwLock::new(None));
let sessions: Arc<RwLock<HashMap<String, SessionMetaData>>> = #[cfg(test)]
Arc::new(RwLock::new(HashMap::new())); handle_client(
// We handle only single client for now sessions.clone(),
let session: Arc<RwLock<String>> = Arc::new(RwLock::new("session1".into())); os_input.clone(),
let router_thread = thread::Builder::new() send_server_instructions.clone(),
.name("server_router".to_string()) );
.spawn({ #[cfg(not(test))]
let os_input = os_input.clone(); let _ = thread::Builder::new().name("listener".to_string()).spawn({
let session = session.clone(); let os_input = os_input.clone();
let sessions = sessions.clone(); let sessions = sessions.clone();
let send_server_instructions = send_server_instructions.clone(); let send_server_instructions = send_server_instructions.clone();
move || loop { move || {
let (instruction, mut err_ctx) = os_input.server_recv(); drop(std::fs::remove_file(ZELLIJ_IPC_PIPE));
err_ctx.add_call(ContextType::IPCServer(ServerContext::from(&instruction))); let listener = LocalSocketListener::bind(ZELLIJ_IPC_PIPE).unwrap();
let rlocked_session = &*session.read().unwrap(); for stream in listener.incoming() {
let rlocked_sessions = sessions.read().unwrap(); match stream {
match instruction { Ok(stream) => {
ServerInstruction::Exit => break, let mut os_input = os_input.clone();
ServerInstruction::OpenFile(file_name) => { os_input.update_receiver(stream);
rlocked_sessions[rlocked_session] let sessions = sessions.clone();
.send_pty_instructions let send_server_instructions = send_server_instructions.clone();
.send(PtyInstruction::SpawnTerminal(Some(file_name))) handle_client(sessions, os_input, send_server_instructions);
.unwrap();
} }
ServerInstruction::SplitHorizontally => { Err(err) => {
rlocked_sessions[rlocked_session] panic!("err {:?}", err);
.send_pty_instructions
.send(PtyInstruction::SpawnTerminalHorizontally(None))
.unwrap();
}
ServerInstruction::SplitVertically => {
rlocked_sessions[rlocked_session]
.send_pty_instructions
.send(PtyInstruction::SpawnTerminalVertically(None))
.unwrap();
}
ServerInstruction::MoveFocus => {
rlocked_sessions[rlocked_session]
.send_screen_instructions
.send(ScreenInstruction::FocusNextPane)
.unwrap();
}
ServerInstruction::Action(action) => {
route_action(action, &rlocked_sessions[rlocked_session]);
}
ServerInstruction::TerminalResize(new_size) => {
rlocked_sessions[rlocked_session]
.send_screen_instructions
.send(ScreenInstruction::TerminalResize(new_size))
.unwrap();
}
_ => {
send_server_instructions.send(instruction).unwrap();
} }
} }
} }
}) }
.unwrap(); });
thread::Builder::new() thread::Builder::new()
.name("ipc_server".to_string()) .name("ipc_server".to_string())
@ -142,24 +115,22 @@ pub fn start_server(mut os_input: Box<dyn ServerOsApi>, opts: CliArgs) -> thread
let (instruction, mut err_ctx) = receive_server_instructions.recv().unwrap(); let (instruction, mut err_ctx) = receive_server_instructions.recv().unwrap();
err_ctx.add_call(ContextType::IPCServer(ServerContext::from(&instruction))); err_ctx.add_call(ContextType::IPCServer(ServerContext::from(&instruction)));
match instruction { match instruction {
ServerInstruction::NewClient(buffer_path, full_screen_ws) => { ServerInstruction::NewClient(full_screen_ws) => {
let session_data = init_session( let session_data = init_session(
os_input.clone(), os_input.clone(),
opts.clone(), opts.clone(),
send_server_instructions.clone(), send_server_instructions.clone(),
full_screen_ws, full_screen_ws,
); );
drop( *sessions.write().unwrap() = Some(session_data);
sessions sessions
.write() .read()
.unwrap() .unwrap()
.insert(session.read().unwrap().clone(), session_data), .as_ref()
); .unwrap()
sessions.read().unwrap()[&*session.read().unwrap()]
.send_pty_instructions .send_pty_instructions
.send(PtyInstruction::NewTab) .send(PtyInstruction::NewTab)
.unwrap(); .unwrap();
os_input.add_client_sender(buffer_path);
} }
ServerInstruction::DoneClosingPane => { ServerInstruction::DoneClosingPane => {
os_input.send_to_client(ClientInstruction::DoneClosingPane); os_input.send_to_client(ClientInstruction::DoneClosingPane);
@ -171,16 +142,8 @@ pub fn start_server(mut os_input: Box<dyn ServerOsApi>, opts: CliArgs) -> thread
os_input.send_to_client(ClientInstruction::DoneUpdatingTabs); os_input.send_to_client(ClientInstruction::DoneUpdatingTabs);
} }
ServerInstruction::ClientExit => { ServerInstruction::ClientExit => {
drop( *sessions.write().unwrap() = None;
sessions
.write()
.unwrap()
.remove(&*session.read().unwrap())
.unwrap(),
);
os_input.send_to_client(ClientInstruction::Exit); os_input.send_to_client(ClientInstruction::Exit);
os_input.server_exit();
let _ = router_thread.join();
break; break;
} }
ServerInstruction::Render(output) => { ServerInstruction::Render(output) => {
@ -193,6 +156,81 @@ pub fn start_server(mut os_input: Box<dyn ServerOsApi>, opts: CliArgs) -> thread
.unwrap() .unwrap()
} }
fn handle_client(
sessions: Arc<RwLock<Option<SessionMetaData>>>,
mut os_input: Box<dyn ServerOsApi>,
send_server_instructions: SenderWithContext<ServerInstruction>,
) {
thread::Builder::new()
.name("router".to_string())
.spawn(move || loop {
let (instruction, mut err_ctx) = os_input.server_recv();
err_ctx.add_call(ContextType::IPCServer(ServerContext::from(&instruction)));
let rlocked_sessions = sessions.read().unwrap();
match instruction {
ServerInstruction::ClientExit => {
send_server_instructions.send(instruction).unwrap();
break;
}
ServerInstruction::OpenFile(file_name) => {
rlocked_sessions
.as_ref()
.unwrap()
.send_pty_instructions
.send(PtyInstruction::SpawnTerminal(Some(file_name)))
.unwrap();
break;
}
ServerInstruction::SplitHorizontally => {
rlocked_sessions
.as_ref()
.unwrap()
.send_pty_instructions
.send(PtyInstruction::SpawnTerminalHorizontally(None))
.unwrap();
break;
}
ServerInstruction::SplitVertically => {
rlocked_sessions
.as_ref()
.unwrap()
.send_pty_instructions
.send(PtyInstruction::SpawnTerminalVertically(None))
.unwrap();
break;
}
ServerInstruction::MoveFocus => {
rlocked_sessions
.as_ref()
.unwrap()
.send_screen_instructions
.send(ScreenInstruction::FocusNextPane)
.unwrap();
break;
}
ServerInstruction::Action(action) => {
route_action(action, rlocked_sessions.as_ref().unwrap());
}
ServerInstruction::TerminalResize(new_size) => {
rlocked_sessions
.as_ref()
.unwrap()
.send_screen_instructions
.send(ScreenInstruction::TerminalResize(new_size))
.unwrap();
}
ServerInstruction::NewClient(_) => {
os_input.add_client_sender();
send_server_instructions.send(instruction).unwrap();
}
_ => {
send_server_instructions.send(instruction).unwrap();
}
}
})
.unwrap();
}
fn init_session( fn init_session(
os_input: Box<dyn ServerOsApi>, os_input: Box<dyn ServerOsApi>,
opts: CliArgs, opts: CliArgs,

View file

@ -1,4 +1,5 @@
use crate::panes::PositionAndSize; use crate::panes::PositionAndSize;
use interprocess::local_socket::LocalSocketStream;
use std::collections::{HashMap, VecDeque}; use std::collections::{HashMap, VecDeque};
use std::io::Write; use std::io::Write;
use std::os::unix::io::RawFd; use std::os::unix::io::RawFd;
@ -196,12 +197,6 @@ impl ClientOsApi for FakeInputOutput {
fn send_to_server(&self, msg: ServerInstruction) { fn send_to_server(&self, msg: ServerInstruction) {
self.server_sender.send(msg).unwrap(); self.server_sender.send(msg).unwrap();
} }
fn connect_to_server(&mut self, full_screen_ws: PositionAndSize) {
ClientOsApi::send_to_server(
self,
ServerInstruction::NewClient("zellij".into(), full_screen_ws),
);
}
fn client_recv(&self) -> (ClientInstruction, ErrorContext) { fn client_recv(&self) -> (ClientInstruction, ErrorContext) {
self.client_receiver.lock().unwrap().recv().unwrap() self.client_receiver.lock().unwrap().recv().unwrap()
} }
@ -217,6 +212,7 @@ impl ClientOsApi for FakeInputOutput {
cb(); cb();
} }
} }
fn connect_to_server(&self) {}
} }
impl ServerOsApi for FakeInputOutput { impl ServerOsApi for FakeInputOutput {
@ -277,14 +273,12 @@ impl ServerOsApi for FakeInputOutput {
self.io_events.lock().unwrap().push(IoEvent::Kill(fd)); self.io_events.lock().unwrap().push(IoEvent::Kill(fd));
Ok(()) Ok(())
} }
fn server_exit(&mut self) {
self.server_sender.send(ServerInstruction::Exit).unwrap();
}
fn server_recv(&self) -> (ServerInstruction, ErrorContext) { fn server_recv(&self) -> (ServerInstruction, ErrorContext) {
self.server_receiver.lock().unwrap().recv().unwrap() self.server_receiver.lock().unwrap().recv().unwrap()
} }
fn send_to_client(&self, msg: ClientInstruction) { fn send_to_client(&self, msg: ClientInstruction) {
self.client_sender.send(msg).unwrap(); self.client_sender.send(msg).unwrap();
} }
fn add_client_sender(&mut self, _buffer_path: String) {} fn add_client_sender(&mut self) {}
fn update_receiver(&mut self, stream: LocalSocketStream) {}
} }