Introduce IpcReceiverWIthContext and move ipc stuff to common/ipc.rs

Fall back to /tmp/zellij-{uid} directory if runtime_dir is not available.
Use serialize_into() to avoid Vec allocation.
And some cleanup
This commit is contained in:
Kunal Mohan 2021-05-03 17:23:32 +05:30
parent c6f93ba0d2
commit ea552d71e4
5 changed files with 121 additions and 82 deletions

View file

@ -1,7 +1,13 @@
//! IPC stuff for starting to split things into a client and server model. //! IPC stuff for starting to split things into a client and server model.
use crate::common::errors::{get_current_ctx, ErrorContext};
use interprocess::local_socket::LocalSocketStream;
use nix::unistd::dup;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::collections::HashSet; use std::collections::HashSet;
use std::io::{self, Write};
use std::marker::PhantomData;
use std::os::unix::io::{AsRawFd, FromRawFd};
type SessionId = u64; type SessionId = u64;
@ -45,3 +51,69 @@ pub enum _ServerToClientMsg {
// A list of sessions // A list of sessions
SessionList(HashSet<Session>), SessionList(HashSet<Session>),
} }
/// Sends messages on a stream socket, along with an [`ErrorContext`].
pub struct IpcSenderWithContext<T: Serialize> {
sender: io::BufWriter<LocalSocketStream>,
_phantom: PhantomData<T>,
}
impl<T: Serialize> IpcSenderWithContext<T> {
/// Returns a sender to the given [LocalSocketStream](interprocess::local_socket::LocalSocketStream).
pub fn new(sender: LocalSocketStream) -> Self {
Self {
sender: io::BufWriter::new(sender),
_phantom: PhantomData,
}
}
/// Sends an event, along with the current [`ErrorContext`], on this [`IpcSenderWithContext`]'s socket.
pub fn send(&mut self, msg: T) {
let err_ctx = get_current_ctx();
bincode::serialize_into(&mut self.sender, &(msg, err_ctx)).unwrap();
self.sender.flush().unwrap();
}
/// Returns an [`IpcReceiverWithContext`] with the same socket as this sender.
pub fn get_receiver<F>(&self) -> IpcReceiverWithContext<F>
where
F: for<'de> Deserialize<'de> + Serialize,
{
let sock_fd = self.sender.get_ref().as_raw_fd();
let dup_sock = dup(sock_fd).unwrap();
let socket = unsafe { LocalSocketStream::from_raw_fd(dup_sock) };
IpcReceiverWithContext::new(socket)
}
}
/// Receives messages on a stream socket, along with an [`ErrorContext`].
pub struct IpcReceiverWithContext<T> {
receiver: io::BufReader<LocalSocketStream>,
_phantom: PhantomData<T>,
}
impl<T> IpcReceiverWithContext<T>
where
T: for<'de> Deserialize<'de> + Serialize,
{
/// Returns a receiver to the given [LocalSocketStream](interprocess::local_socket::LocalSocketStream).
pub fn new(receiver: LocalSocketStream) -> Self {
Self {
receiver: io::BufReader::new(receiver),
_phantom: PhantomData,
}
}
/// Receives an event, along with the current [`ErrorContext`], on this [`IpcReceiverWithContext`]'s socket.
pub fn recv(&mut self) -> (T, ErrorContext) {
bincode::deserialize_from(&mut self.receiver).unwrap()
}
/// Returns an [`IpcSenderWithContext`] with the same socket as this receiver.
pub fn get_sender<F: Serialize>(&self) -> IpcSenderWithContext<F> {
let sock_fd = self.receiver.get_ref().as_raw_fd();
let dup_sock = dup(sock_fd).unwrap();
let socket = unsafe { LocalSocketStream::from_raw_fd(dup_sock) };
IpcSenderWithContext::new(socket)
}
}

View file

@ -12,11 +12,8 @@ pub mod wasm_vm;
use crate::panes::PaneId; use crate::panes::PaneId;
use crate::server::ServerInstruction; use crate::server::ServerInstruction;
use async_std::task_local; use async_std::task_local;
use directories_next::ProjectDirs;
use errors::{get_current_ctx, ErrorContext}; use errors::{get_current_ctx, ErrorContext};
use lazy_static::lazy_static;
use std::cell::RefCell; use std::cell::RefCell;
use std::path::PathBuf;
use std::sync::mpsc; use std::sync::mpsc;
/// An [MPSC](mpsc) asynchronous channel with added error context. /// An [MPSC](mpsc) asynchronous channel with added error context.
@ -76,15 +73,3 @@ task_local! {
/// stack in the form of an [`ErrorContext`]. /// stack in the form of an [`ErrorContext`].
static ASYNCOPENCALLS: RefCell<ErrorContext> = RefCell::default() static ASYNCOPENCALLS: RefCell<ErrorContext> = RefCell::default()
} }
lazy_static! {
pub static ref ZELLIJ_IPC_PIPE: PathBuf = {
let project_dir = ProjectDirs::from("org", "Zellij Contributors", "Zellij").unwrap();
let ipc_dir = project_dir
.runtime_dir()
.unwrap_or_else(|| project_dir.cache_dir());
std::fs::create_dir_all(ipc_dir).unwrap();
let session_name = names::Generator::default().next().unwrap();
ipc_dir.join(session_name)
};
}

View file

@ -5,20 +5,21 @@ use nix::sys::signal::{kill, Signal};
use nix::sys::termios; use nix::sys::termios;
use nix::sys::wait::waitpid; use nix::sys::wait::waitpid;
use nix::unistd::{self, ForkResult, Pid}; use nix::unistd::{self, ForkResult, Pid};
use serde::Serialize;
use signal_hook::{consts::signal::*, iterator::Signals}; use signal_hook::{consts::signal::*, iterator::Signals};
use std::env; use std::env;
use std::io; use std::io;
use std::io::prelude::*; use std::io::prelude::*;
use std::marker::PhantomData; use std::os::unix::io::RawFd;
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
use std::path::PathBuf; use std::path::PathBuf;
use std::process::{Child, Command}; use std::process::{Child, Command};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use crate::client::ClientInstruction; use crate::client::ClientInstruction;
use crate::common::ZELLIJ_IPC_PIPE; use crate::common::{
use crate::errors::{get_current_ctx, ErrorContext}; ipc::{IpcReceiverWithContext, IpcSenderWithContext},
utils::consts::ZELLIJ_IPC_PIPE,
};
use crate::errors::ErrorContext;
use crate::panes::PositionAndSize; use crate::panes::PositionAndSize;
use crate::server::ServerInstruction; use crate::server::ServerInstruction;
@ -158,35 +159,10 @@ fn spawn_terminal(file_to_open: Option<PathBuf>, orig_termios: termios::Termios)
(pid_primary, pid_secondary) (pid_primary, pid_secondary)
} }
/// Sends messages on an [ipmpsc](ipmpsc) channel, along with an [`ErrorContext`].
struct IpcSenderWithContext<T: Serialize> {
sender: io::BufWriter<LocalSocketStream>,
_phantom: PhantomData<T>,
}
impl<T: Serialize> IpcSenderWithContext<T> {
/// Returns a sender to the given [SharedRingBuffer](ipmpsc::SharedRingBuffer).
fn new(sender: LocalSocketStream) -> Self {
Self {
sender: io::BufWriter::new(sender),
_phantom: PhantomData,
}
}
/// Sends an event, along with the current [`ErrorContext`], on this
/// [`IpcSenderWithContext`]'s channel.
fn send(&mut self, msg: T) -> Result<(), std::io::Error> {
let err_ctx = get_current_ctx();
self.sender
.write_all(&bincode::serialize(&(msg, err_ctx)).unwrap())?;
self.sender.flush()
}
}
#[derive(Clone)] #[derive(Clone)]
pub struct ServerOsInputOutput { pub struct ServerOsInputOutput {
orig_termios: Arc<Mutex<termios::Termios>>, orig_termios: Arc<Mutex<termios::Termios>>,
receive_instructions_from_client: Option<Arc<Mutex<io::BufReader<LocalSocketStream>>>>, receive_instructions_from_client: Option<Arc<Mutex<IpcReceiverWithContext<ServerInstruction>>>>,
send_instructions_to_client: Arc<Mutex<Option<IpcSenderWithContext<ClientInstruction>>>>, send_instructions_to_client: Arc<Mutex<Option<IpcSenderWithContext<ClientInstruction>>>>,
} }
@ -252,15 +228,12 @@ impl ServerOsApi for ServerOsInputOutput {
Ok(()) Ok(())
} }
fn recv_from_client(&self) -> (ServerInstruction, ErrorContext) { fn recv_from_client(&self) -> (ServerInstruction, ErrorContext) {
bincode::deserialize_from( self.receive_instructions_from_client
&mut *self .as_ref()
.receive_instructions_from_client .unwrap()
.as_ref() .lock()
.unwrap() .unwrap()
.lock() .recv()
.unwrap(),
)
.unwrap()
} }
fn send_to_client(&self, msg: ClientInstruction) { fn send_to_client(&self, msg: ClientInstruction) {
self.send_instructions_to_client self.send_instructions_to_client
@ -268,27 +241,22 @@ impl ServerOsApi for ServerOsInputOutput {
.unwrap() .unwrap()
.as_mut() .as_mut()
.unwrap() .unwrap()
.send(msg) .send(msg);
.unwrap();
} }
fn add_client_sender(&mut self) { fn add_client_sender(&mut self) {
assert!(self.send_instructions_to_client.lock().unwrap().is_none()); assert!(self.send_instructions_to_client.lock().unwrap().is_none());
let sock_fd = self let sender = self
.receive_instructions_from_client .receive_instructions_from_client
.as_ref() .as_ref()
.unwrap() .unwrap()
.lock() .lock()
.unwrap() .unwrap()
.get_ref() .get_sender();
.as_raw_fd(); *self.send_instructions_to_client.lock().unwrap() = Some(sender);
let dup_fd = unistd::dup(sock_fd).unwrap();
let dup_sock = unsafe { LocalSocketStream::from_raw_fd(dup_fd) };
*self.send_instructions_to_client.lock().unwrap() =
Some(IpcSenderWithContext::new(dup_sock));
} }
fn update_receiver(&mut self, stream: LocalSocketStream) { fn update_receiver(&mut self, stream: LocalSocketStream) {
self.receive_instructions_from_client = self.receive_instructions_from_client =
Some(Arc::new(Mutex::new(io::BufReader::new(stream)))); Some(Arc::new(Mutex::new(IpcReceiverWithContext::new(stream))));
} }
} }
@ -312,7 +280,7 @@ pub fn get_server_os_input() -> ServerOsInputOutput {
pub struct ClientOsInputOutput { pub struct ClientOsInputOutput {
orig_termios: Arc<Mutex<termios::Termios>>, orig_termios: Arc<Mutex<termios::Termios>>,
send_instructions_to_server: Arc<Mutex<Option<IpcSenderWithContext<ServerInstruction>>>>, send_instructions_to_server: Arc<Mutex<Option<IpcSenderWithContext<ServerInstruction>>>>,
receive_instructions_from_server: Arc<Mutex<Option<io::BufReader<LocalSocketStream>>>>, receive_instructions_from_server: Arc<Mutex<Option<IpcReceiverWithContext<ClientInstruction>>>>,
} }
/// The `ClientOsApi` trait represents an abstract interface to the features of an operating system that /// The `ClientOsApi` trait represents an abstract interface to the features of an operating system that
@ -375,19 +343,15 @@ impl ClientOsApi for ClientOsInputOutput {
.unwrap() .unwrap()
.as_mut() .as_mut()
.unwrap() .unwrap()
.send(msg) .send(msg);
.unwrap();
} }
fn recv_from_server(&self) -> (ClientInstruction, ErrorContext) { fn recv_from_server(&self) -> (ClientInstruction, ErrorContext) {
bincode::deserialize_from( self.receive_instructions_from_server
&mut self .lock()
.receive_instructions_from_server .unwrap()
.lock() .as_mut()
.unwrap() .unwrap()
.as_mut() .recv()
.unwrap(),
)
.unwrap()
} }
fn receive_sigwinch(&self, cb: Box<dyn Fn()>) { fn receive_sigwinch(&self, cb: Box<dyn Fn()>) {
let mut signals = Signals::new(&[SIGWINCH, SIGTERM, SIGINT, SIGQUIT]).unwrap(); let mut signals = Signals::new(&[SIGWINCH, SIGTERM, SIGINT, SIGQUIT]).unwrap();
@ -411,12 +375,10 @@ impl ClientOsApi for ClientOsInputOutput {
LocalSocketStream::connect(ZELLIJ_IPC_PIPE.clone()).unwrap() LocalSocketStream::connect(ZELLIJ_IPC_PIPE.clone()).unwrap()
} }
}; };
let sock_fd = socket.as_raw_fd();
let dup_fd = unistd::dup(sock_fd).unwrap();
let receiver = unsafe { LocalSocketStream::from_raw_fd(dup_fd) };
let sender = IpcSenderWithContext::new(socket); let sender = IpcSenderWithContext::new(socket);
let receiver = sender.get_receiver();
*self.send_instructions_to_server.lock().unwrap() = Some(sender); *self.send_instructions_to_server.lock().unwrap() = Some(sender);
*self.receive_instructions_from_server.lock().unwrap() = Some(io::BufReader::new(receiver)); *self.receive_instructions_from_server.lock().unwrap() = Some(receiver);
} }
} }

View file

@ -1,5 +1,25 @@
//! Zellij program-wide constants. //! Zellij program-wide constants.
use directories_next::ProjectDirs;
use lazy_static::lazy_static;
use nix::unistd::Uid;
use std::path::PathBuf;
pub const ZELLIJ_TMP_DIR: &str = "/tmp/zellij"; pub const ZELLIJ_TMP_DIR: &str = "/tmp/zellij";
pub const ZELLIJ_TMP_LOG_DIR: &str = "/tmp/zellij/zellij-log"; pub const ZELLIJ_TMP_LOG_DIR: &str = "/tmp/zellij/zellij-log";
pub const ZELLIJ_TMP_LOG_FILE: &str = "/tmp/zellij/zellij-log/log.txt"; pub const ZELLIJ_TMP_LOG_FILE: &str = "/tmp/zellij/zellij-log/log.txt";
lazy_static! {
static ref UID: Uid = Uid::current();
pub static ref SESSION_NAME: String = names::Generator::default().next().unwrap();
pub static ref ZELLIJ_IPC_PIPE: PathBuf = {
let project_dir = ProjectDirs::from("org", "Zellij Contributors", "Zellij").unwrap();
let mut ipc_dir = project_dir
.runtime_dir()
.map(|p| p.to_owned())
.unwrap_or_else(|| PathBuf::from("/tmp/zellij-".to_string() + &format!("{}", *UID)));
std::fs::create_dir_all(&ipc_dir).unwrap();
ipc_dir.push(&*SESSION_NAME);
ipc_dir
};
}

View file

@ -16,7 +16,6 @@ use zellij_tile::data::{Event, EventType, ModeInfo};
use crate::cli::CliArgs; use crate::cli::CliArgs;
use crate::client::ClientInstruction; use crate::client::ClientInstruction;
use crate::common::ZELLIJ_IPC_PIPE;
use crate::common::{ use crate::common::{
errors::{ContextType, PluginContext, PtyContext, ScreenContext, ServerContext}, errors::{ContextType, PluginContext, PtyContext, ScreenContext, ServerContext},
input::actions::{Action, Direction}, input::actions::{Action, Direction},
@ -24,6 +23,7 @@ use crate::common::{
os_input_output::ServerOsApi, os_input_output::ServerOsApi,
pty_bus::{PtyBus, PtyInstruction}, pty_bus::{PtyBus, PtyInstruction},
screen::{Screen, ScreenInstruction}, screen::{Screen, ScreenInstruction},
utils::consts::ZELLIJ_IPC_PIPE,
wasm_vm::{wasi_stdout, wasi_write_string, zellij_imports, PluginEnv, PluginInstruction}, wasm_vm::{wasi_stdout, wasi_write_string, zellij_imports, PluginEnv, PluginInstruction},
ChannelWithContext, SenderType, SenderWithContext, ChannelWithContext, SenderType, SenderWithContext,
}; };