Use ipmpsc crate for IPC

This commit is contained in:
Kunal Mohan 2021-02-18 11:27:19 +05:30
parent 1ee86f9a77
commit 77682d9ab5
6 changed files with 276 additions and 181 deletions

237
Cargo.lock generated
View file

@ -234,6 +234,15 @@ dependencies = [
"wyz",
]
[[package]]
name = "block-buffer"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4"
dependencies = [
"generic-array",
]
[[package]]
name = "blocking"
version = "1.0.2"
@ -284,6 +293,19 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chrono"
version = "0.4.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "670ad68c9088c2a963aaa298cb369688cf3f9465ce5e2d4ca10e6e0098a1ce73"
dependencies = [
"libc",
"num-integer",
"num-traits",
"time",
"winapi",
]
[[package]]
name = "clap"
version = "2.33.3"
@ -338,6 +360,18 @@ dependencies = [
"winapi",
]
[[package]]
name = "const_fn"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28b9d6de7f49e22cf97ad17fc4036ece69300032f45f78f30b4a4482cdc3f4a6"
[[package]]
name = "cpuid-bool"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8aebca1129a03dc6dc2b127edd729435bbc4a37e1d5f4d7513165089ceb02634"
[[package]]
name = "cranelift-bforest"
version = "0.68.0"
@ -502,6 +536,15 @@ dependencies = [
"syn",
]
[[package]]
name = "digest"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3dd60d1080a57a05ab032377049e0591415d2b31afd7028356dbf3cc6dcb066"
dependencies = [
"generic-array",
]
[[package]]
name = "directories-next"
version = "2.0.0"
@ -721,6 +764,16 @@ dependencies = [
"serde",
]
[[package]]
name = "generic-array"
version = "0.14.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "501466ecc8a30d1d3b7fc9229b122b2ce8ed6e9d9223f1138d4babb253e51817"
dependencies = [
"typenum",
"version_check",
]
[[package]]
name = "getopts"
version = "0.2.21"
@ -806,6 +859,12 @@ dependencies = [
"libc",
]
[[package]]
name = "hex"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "644f9158b2f133fd50f5fb3242878846d9eb792e445c893805ff0e3824006e35"
[[package]]
name = "ident_case"
version = "1.0.1"
@ -847,29 +906,6 @@ dependencies = [
"cfg-if 1.0.0",
]
[[package]]
name = "interprocess"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1c58ec7fbda1df9a93f587b780659db3c99f61f4be27f9c82c9b37684ffd0366"
dependencies = [
"blocking",
"cfg-if 1.0.0",
"futures",
"intmap",
"libc",
"once_cell",
"spinning",
"thiserror",
"winapi",
]
[[package]]
name = "intmap"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e50930385956f6c4a0b99f3dd654adcc40788456c36e17c5b20e1d1ceb523ec6"
[[package]]
name = "inventory"
version = "0.1.10"
@ -892,6 +928,24 @@ dependencies = [
"syn",
]
[[package]]
name = "ipmpsc"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e36cf1ebb87bae3dbbf0a91b80463831de213b2921f28c325b22026f318f17a3"
dependencies = [
"bincode",
"hex",
"libc",
"memmap",
"serde",
"sha2",
"tempfile",
"thiserror",
"vergen",
"winapi",
]
[[package]]
name = "itoa"
version = "0.4.7"
@ -963,15 +1017,6 @@ version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fb9b38af92608140b86b693604b9ffcc5824240a484d1ecd4795bacb2fe88f3"
[[package]]
name = "lock_api"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a3c91c24eae6777794bb1997ad98bbb87daf92890acab859f7eaa4320333176"
dependencies = [
"scopeguard",
]
[[package]]
name = "log"
version = "0.4.14"
@ -997,6 +1042,16 @@ version = "2.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ee1c47aaa256ecabcaea351eae4a9b01ef39ed810004e298d2511ed284b1525"
[[package]]
name = "memmap"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6585fd95e7bb50d6cc31e20d4cf9afb4e2ba16c5846fc76793f11218da9c475b"
dependencies = [
"libc",
"winapi",
]
[[package]]
name = "memmap2"
version = "0.2.2"
@ -1056,6 +1111,25 @@ dependencies = [
"version_check",
]
[[package]]
name = "num-integer"
version = "0.1.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2cc698a63b549a70bc047073d2949cce27cd1c7b0a4a862d08a8031bc2801db"
dependencies = [
"autocfg",
"num-traits",
]
[[package]]
name = "num-traits"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a64b1ec5cda2586e284722486d802acf1f7dbdc623e2bfc57e65ca1cd099290"
dependencies = [
"autocfg",
]
[[package]]
name = "num_cpus"
version = "1.13.0"
@ -1094,12 +1168,27 @@ version = "1.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af8b08b04175473088b46763e51ee54da5f9a164bc162f615b91bc179dbf15a3"
[[package]]
name = "opaque-debug"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5"
[[package]]
name = "parking"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72"
[[package]]
name = "pest"
version = "2.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10f4872ae94d7b90ae48754df22fd42ad52ce740b8f370b03da4835417403e53"
dependencies = [
"ucd-trie",
]
[[package]]
name = "pin-project-lite"
version = "0.2.6"
@ -1338,6 +1427,15 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
[[package]]
name = "rustc_version"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0dfe2087c51c460008730de8b57e6a320782fbfb312e1f4d520e6c6fae155ee"
dependencies = [
"semver",
]
[[package]]
name = "ryu"
version = "1.0.5"
@ -1350,6 +1448,24 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
name = "semver"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f301af10236f6df4160f7c3f04eec6dbc70ace82d23326abad5edee88801c6b6"
dependencies = [
"semver-parser",
]
[[package]]
name = "semver-parser"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00b0bef5b7f9e0df16536d3961cfb6e84331c065b4066afb39768d0e319411f7"
dependencies = [
"pest",
]
[[package]]
name = "serde"
version = "1.0.125"
@ -1402,6 +1518,19 @@ dependencies = [
"yaml-rust",
]
[[package]]
name = "sha2"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa827a14b29ab7f44778d14a88d3cb76e949c45083f7dbfa507d0cb699dc12de"
dependencies = [
"block-buffer",
"cfg-if 1.0.0",
"cpuid-bool",
"digest",
"opaque-debug",
]
[[package]]
name = "signal-hook"
version = "0.3.8"
@ -1439,25 +1568,6 @@ version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e"
[[package]]
name = "socket2"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e3dfc207c526015c632472a77be09cf1b6e46866581aecae5cc38fb4235dea2"
dependencies = [
"libc",
"winapi",
]
[[package]]
name = "spinning"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d4f0e86297cad2658d92a707320d87bf4e6ae1050287f51d19b67ef3f153a7b"
dependencies = [
"lock_api",
]
[[package]]
name = "stable_deref_trait"
version = "1.2.0"
@ -1702,6 +1812,12 @@ dependencies = [
"lazy_static",
]
[[package]]
name = "typenum"
version = "1.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "373c8a200f9e67a0c95e62a4f52fbf80c23b4381c05a17845531982fa99e6b33"
[[package]]
name = "typetag"
version = "0.1.7"
@ -1726,6 +1842,12 @@ dependencies = [
"syn",
]
[[package]]
name = "ucd-trie"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56dee185309b50d1f11bfedef0fe6d036842e3fb77413abef29f8f8d1c5d4c1c"
[[package]]
name = "unicode-segmentation"
version = "1.7.1"
@ -1792,6 +1914,17 @@ version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191"
[[package]]
name = "vergen"
version = "3.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7141e445af09c8919f1d5f8a20dae0b20c3b57a45dee0d5823c6ed5d237f15a"
dependencies = [
"bitflags",
"chrono",
"rustc_version",
]
[[package]]
name = "version_check"
version = "0.9.3"
@ -2202,7 +2335,7 @@ dependencies = [
"directories-next",
"futures",
"insta",
"interprocess",
"ipmpsc",
"lazy_static",
"libc",
"nix",

View file

@ -17,6 +17,7 @@ backtrace = "0.3.55"
bincode = "1.3.1"
directories-next = "2.0"
futures = "0.3.5"
ipmpsc = "0.5.0"
libc = "0.2"
nix = "0.19.1"
nom = "6.0.1"
@ -35,10 +36,6 @@ strum = "0.20.0"
lazy_static = "1.4.0"
wasmer = "1.0.0"
wasmer-wasi = "1.0.0"
interprocess = "1.0.1"
colors-transform = "0.2.5"
zellij-tile = { path = "zellij-tile/", version = "1.1.0" }
zellij-tile-extra = { path = "zellij-tile-extra/", version="1.0.0" }
[dependencies.async-std]
version = "1.3.0"

View file

@ -18,7 +18,7 @@ use std::{collections::HashMap, fs};
use crate::panes::PaneId;
use directories_next::ProjectDirs;
use input::handler::InputMode;
use interprocess::local_socket::LocalSocketStream;
use ipmpsc::{Sender as IpcSender, SharedRingBuffer};
use serde::{Deserialize, Serialize};
use termion::input::TermRead;
use wasm_vm::PluginEnv;
@ -132,37 +132,32 @@ thread_local!(
/// stack in the form of an [`ErrorContext`].
static OPENCALLS: RefCell<ErrorContext> = RefCell::default()
);
#[derive(Clone)]
pub struct IpcSenderWithContext {
err_ctx: ErrorContext,
sender: BufWriter<LocalSocketStream>,
sender: IpcSender,
}
impl IpcSenderWithContext {
pub fn new() -> Self {
pub fn new(buffer: SharedRingBuffer) -> Self {
Self {
err_ctx: ErrorContext::new(),
sender: BufWriter::new(LocalSocketStream::connect(ZELLIJ_IPC_PIPE).unwrap()),
sender: IpcSender::new(buffer),
}
}
// This is expensive. Use this only if a buffer is not available.
// Otherwise clone the buffer and use `new()`
pub fn to_server() -> Self {
Self::new(SharedRingBuffer::open(ZELLIJ_IPC_PIPE).unwrap())
}
pub fn update(&mut self, ctx: ErrorContext) {
self.err_ctx = ctx;
}
pub fn send(&mut self, msg: ServerInstruction) -> std::io::Result<()> {
let command = bincode::serialize(&(self.err_ctx, msg)).unwrap();
let x = self.sender.write_all(&command);
self.sender.flush();
x
}
}
impl std::clone::Clone for IpcSenderWithContext {
fn clone(&self) -> Self {
Self {
err_ctx: self.err_ctx,
sender: BufWriter::new(LocalSocketStream::connect(ZELLIJ_IPC_PIPE).unwrap()),
}
pub fn send<T: Serialize>(&mut self, msg: T) -> ipmpsc::Result<()> {
self.sender.send(&(self.err_ctx, msg))
}
}
@ -215,7 +210,7 @@ pub fn start(mut os_input: Box<dyn OsApi>, opts: CliArgs, config: Config) {
let send_app_instructions =
SenderWithContext::new(SenderType::SyncSender(send_app_instructions));
let pty_thread = start_server(
let ipc_thread = start_server(
os_input.clone(),
opts.clone(),
command_is_executing.clone(),
@ -555,7 +550,7 @@ pub fn start(mut os_input: Box<dyn OsApi>, opts: CliArgs, config: Config) {
}
});
let mut send_server_instructions = IpcSenderWithContext::new();
let mut send_server_instructions = IpcSenderWithContext::to_server();
#[warn(clippy::never_loop)]
loop {
@ -572,7 +567,7 @@ pub fn start(mut os_input: Box<dyn OsApi>, opts: CliArgs, config: Config) {
}
AppInstruction::Error(backtrace) => {
let _ = send_server_instructions.send(ServerInstruction::Quit);
//let _ = pty_thread.join();
//let _ = ipc_thread.join();
let _ = send_screen_instructions.send(ScreenInstruction::Quit);
let _ = screen_thread.join();
let _ = send_plugin_instructions.send(PluginInstruction::Quit);
@ -599,7 +594,7 @@ pub fn start(mut os_input: Box<dyn OsApi>, opts: CliArgs, config: Config) {
}
let _ = send_server_instructions.send(ServerInstruction::Quit);
//let _ = pty_thread.join().unwrap();
//let _ = ipc_thread.join().unwrap();
let _ = send_screen_instructions.send(ScreenInstruction::Quit);
screen_thread.join().unwrap();
let _ = send_plugin_instructions.send(PluginInstruction::Quit);

View file

@ -193,12 +193,16 @@ pub struct PtyBus {
pub send_server_instructions: IpcSenderWithContext,
}
fn stream_terminal_bytes(pid: RawFd, os_input: Box<dyn OsApi>, debug: bool) -> JoinHandle<()> {
fn stream_terminal_bytes(
pid: RawFd,
os_input: Box<dyn OsApi>,
mut send_server_instructions: IpcSenderWithContext,
debug: bool,
) -> JoinHandle<()> {
let mut err_ctx = OPENCALLS.with(|ctx| *ctx.borrow());
task::spawn({
async move {
err_ctx.add_call(ContextType::AsyncTask);
let mut send_server_instructions = IpcSenderWithContext::new();
send_server_instructions.update(err_ctx);
let mut vte_parser = vte::Parser::new();
let mut vte_event_sender = VteEventSender::new(pid, send_server_instructions.clone());
@ -286,8 +290,12 @@ impl PtyBus {
pub fn spawn_terminal(&mut self, file_to_open: Option<PathBuf>) -> RawFd {
let (pid_primary, pid_secondary): (RawFd, RawFd) =
self.os_input.spawn_terminal(file_to_open);
let task_handle =
stream_terminal_bytes(pid_primary, self.os_input.clone(), self.debug_to_file);
let task_handle = stream_terminal_bytes(
pid_primary,
self.os_input.clone(),
self.send_server_instructions.clone(),
self.debug_to_file,
);
self.task_handles.insert(pid_primary, task_handle);
self.id_to_child_pid.insert(pid_primary, pid_secondary);
pid_primary
@ -307,7 +315,12 @@ impl PtyBus {
)))
.unwrap();
for id in new_pane_pids {
let task_handle = stream_terminal_bytes(id, self.os_input.clone(), self.debug_to_file);
let task_handle = stream_terminal_bytes(
id,
self.os_input.clone(),
self.send_server_instructions.clone(),
self.debug_to_file,
);
self.task_handles.insert(id, task_handle);
}
}

View file

@ -62,13 +62,13 @@ pub fn main() {
if let Some(split_dir) = opts.split {
match split_dir {
'h' => {
let mut send_server_instructions = IpcSenderWithContext::new();
let mut send_server_instructions = IpcSenderWithContext::to_server();
send_server_instructions
.send(ServerInstruction::SplitHorizontally)
.unwrap();
}
'v' => {
let mut send_server_instructions = IpcSenderWithContext::new();
let mut send_server_instructions = IpcSenderWithContext::to_server();
send_server_instructions
.send(ServerInstruction::SplitVertically)
.unwrap();
@ -76,12 +76,12 @@ pub fn main() {
_ => {}
};
} else if opts.move_focus {
let mut send_server_instructions = IpcSenderWithContext::new();
let mut send_server_instructions = IpcSenderWithContext::to_server();
send_server_instructions
.send(ServerInstruction::MoveFocus)
.unwrap();
} else if let Some(file_to_open) = opts.open_file {
let mut send_server_instructions = IpcSenderWithContext::new();
let mut send_server_instructions = IpcSenderWithContext::to_server();
send_server_instructions
.send(ServerInstruction::OpenFile(file_to_open))
.unwrap();

View file

@ -12,7 +12,7 @@ use crate::pty_bus::{PtyBus, PtyInstruction};
use crate::screen::ScreenInstruction;
use crate::utils::consts::ZELLIJ_IPC_PIPE;
use crate::wasm_vm::PluginInstruction;
use interprocess::local_socket::{LocalSocketListener, LocalSocketStream};
use ipmpsc::{Receiver, SharedRingBuffer};
use std::io::{BufReader, Read};
use std::path::PathBuf;
use std::sync::mpsc::channel;
@ -32,8 +32,7 @@ pub fn start_server(
);
std::fs::remove_file(ZELLIJ_IPC_PIPE).ok();
let listener =
LocalSocketListener::bind(ZELLIJ_IPC_PIPE).expect("could not listen on ipc socket");
let server_buffer = SharedRingBuffer::create(ZELLIJ_IPC_PIPE, 8192).unwrap();
// Don't use default layouts in tests, but do everywhere else
#[cfg(not(test))]
@ -42,7 +41,7 @@ pub fn start_server(
let default_layout = None;
let maybe_layout = opts.layout.or(default_layout);
let send_server_instructions = IpcSenderWithContext::new();
let send_server_instructions = IpcSenderWithContext::new(server_buffer.clone());
let mut pty_bus = PtyBus::new(
receive_pty_instructions,
@ -120,55 +119,10 @@ pub fn start_server(
thread::Builder::new()
.name("ipc_server".to_string())
.spawn({
move || {
let mut km = 0;
for stream in listener.incoming() {
match stream {
Ok(stream) => {
let send_app_instructions = send_app_instructions.clone();
let send_pty_instructions = send_pty_instructions.clone();
let nm = format!("{}", km);
thread::Builder::new()
.name(nm)
.spawn(move || {
handle_stream(
send_pty_instructions,
send_app_instructions,
stream,
km,
);
})
.unwrap();
km += 1;
}
Err(err) => {
panic!("err {:?}", err);
}
}
}
}
})
.unwrap();
pty_thread
}
fn handle_stream(
mut send_pty_instructions: SenderWithContext<PtyInstruction>,
mut send_app_instructions: SenderWithContext<AppInstruction>,
mut stream: LocalSocketStream,
km: u32,
) {
let mut reader = BufReader::new(stream);
let mut buffer = [0; 65535]; // TODO: more accurate
loop {
let bytes = reader
.read(&mut buffer)
.expect("failed to parse ipc message");
let recv_server_instructions = Receiver::new(server_buffer);
move || loop {
let (mut err_ctx, decoded): (ErrorContext, ServerInstruction) =
match bincode::deserialize(&buffer[..bytes]) {
Ok(d) => d,
Err(_) => break,
};
recv_server_instructions.recv().unwrap();
err_ctx.add_call(ContextType::IPCServer);
send_pty_instructions.update(err_ctx);
send_app_instructions.update(err_ctx);
@ -210,8 +164,11 @@ fn handle_stream(
}
ServerInstruction::Quit => {
let _ = send_pty_instructions.send(PtyInstruction::Quit);
let _ = pty_thread.join();
break;
}
}
}
})
.unwrap()
}