fix(pty): paste freeze with large amounts of text (#1383)
add pty writer thread to avoid screen thread blocking on unistd::write
This commit is contained in:
parent
69e570cf71
commit
e8f9559062
5 changed files with 101 additions and 11 deletions
|
|
@ -5,6 +5,7 @@ pub mod tab;
|
||||||
|
|
||||||
mod logging_pipe;
|
mod logging_pipe;
|
||||||
mod pty;
|
mod pty;
|
||||||
|
mod pty_writer;
|
||||||
mod route;
|
mod route;
|
||||||
mod screen;
|
mod screen;
|
||||||
mod thread_bus;
|
mod thread_bus;
|
||||||
|
|
@ -12,6 +13,7 @@ mod ui;
|
||||||
mod wasm_vm;
|
mod wasm_vm;
|
||||||
|
|
||||||
use log::info;
|
use log::info;
|
||||||
|
use pty_writer::{pty_writer_main, PtyWriteInstruction};
|
||||||
use std::collections::{HashMap, HashSet};
|
use std::collections::{HashMap, HashSet};
|
||||||
use std::{
|
use std::{
|
||||||
path::PathBuf,
|
path::PathBuf,
|
||||||
|
|
@ -106,6 +108,7 @@ pub(crate) struct SessionMetaData {
|
||||||
screen_thread: Option<thread::JoinHandle<()>>,
|
screen_thread: Option<thread::JoinHandle<()>>,
|
||||||
pty_thread: Option<thread::JoinHandle<()>>,
|
pty_thread: Option<thread::JoinHandle<()>>,
|
||||||
wasm_thread: Option<thread::JoinHandle<()>>,
|
wasm_thread: Option<thread::JoinHandle<()>>,
|
||||||
|
pty_writer_thread: Option<thread::JoinHandle<()>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Drop for SessionMetaData {
|
impl Drop for SessionMetaData {
|
||||||
|
|
@ -116,6 +119,7 @@ impl Drop for SessionMetaData {
|
||||||
let _ = self.screen_thread.take().unwrap().join();
|
let _ = self.screen_thread.take().unwrap().join();
|
||||||
let _ = self.pty_thread.take().unwrap().join();
|
let _ = self.pty_thread.take().unwrap().join();
|
||||||
let _ = self.wasm_thread.take().unwrap().join();
|
let _ = self.wasm_thread.take().unwrap().join();
|
||||||
|
let _ = self.pty_writer_thread.take().unwrap().join();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -583,6 +587,10 @@ fn init_session(
|
||||||
let (to_pty, pty_receiver): ChannelWithContext<PtyInstruction> = channels::unbounded();
|
let (to_pty, pty_receiver): ChannelWithContext<PtyInstruction> = channels::unbounded();
|
||||||
let to_pty = SenderWithContext::new(to_pty);
|
let to_pty = SenderWithContext::new(to_pty);
|
||||||
|
|
||||||
|
let (to_pty_writer, pty_writer_receiver): ChannelWithContext<PtyWriteInstruction> =
|
||||||
|
channels::unbounded();
|
||||||
|
let to_pty_writer = SenderWithContext::new(to_pty_writer);
|
||||||
|
|
||||||
// Determine and initialize the data directory
|
// Determine and initialize the data directory
|
||||||
let data_dir = opts.data_dir.unwrap_or_else(get_default_data_dir);
|
let data_dir = opts.data_dir.unwrap_or_else(get_default_data_dir);
|
||||||
|
|
||||||
|
|
@ -607,6 +615,7 @@ fn init_session(
|
||||||
None,
|
None,
|
||||||
Some(&to_plugin),
|
Some(&to_plugin),
|
||||||
Some(&to_server),
|
Some(&to_server),
|
||||||
|
Some(&to_pty_writer),
|
||||||
Some(os_input.clone()),
|
Some(os_input.clone()),
|
||||||
),
|
),
|
||||||
opts.debug,
|
opts.debug,
|
||||||
|
|
@ -625,6 +634,7 @@ fn init_session(
|
||||||
Some(&to_pty),
|
Some(&to_pty),
|
||||||
Some(&to_plugin),
|
Some(&to_plugin),
|
||||||
Some(&to_server),
|
Some(&to_server),
|
||||||
|
Some(&to_pty_writer),
|
||||||
Some(os_input.clone()),
|
Some(os_input.clone()),
|
||||||
);
|
);
|
||||||
let max_panes = opts.max_panes;
|
let max_panes = opts.max_panes;
|
||||||
|
|
@ -644,6 +654,7 @@ fn init_session(
|
||||||
Some(&to_pty),
|
Some(&to_pty),
|
||||||
Some(&to_plugin),
|
Some(&to_plugin),
|
||||||
None,
|
None,
|
||||||
|
Some(&to_pty_writer),
|
||||||
None,
|
None,
|
||||||
);
|
);
|
||||||
let store = Store::default();
|
let store = Store::default();
|
||||||
|
|
@ -651,11 +662,29 @@ fn init_session(
|
||||||
move || wasm_thread_main(plugin_bus, store, data_dir, plugins.unwrap_or_default())
|
move || wasm_thread_main(plugin_bus, store, data_dir, plugins.unwrap_or_default())
|
||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
let pty_writer_thread = thread::Builder::new()
|
||||||
|
.name("pty_writer".to_string())
|
||||||
|
.spawn({
|
||||||
|
let pty_writer_bus = Bus::new(
|
||||||
|
vec![pty_writer_receiver],
|
||||||
|
Some(&to_screen),
|
||||||
|
Some(&to_pty),
|
||||||
|
Some(&to_plugin),
|
||||||
|
Some(&to_server),
|
||||||
|
None,
|
||||||
|
Some(os_input.clone()),
|
||||||
|
);
|
||||||
|
|| pty_writer_main(pty_writer_bus)
|
||||||
|
})
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
SessionMetaData {
|
SessionMetaData {
|
||||||
senders: ThreadSenders {
|
senders: ThreadSenders {
|
||||||
to_screen: Some(to_screen),
|
to_screen: Some(to_screen),
|
||||||
to_pty: Some(to_pty),
|
to_pty: Some(to_pty),
|
||||||
to_plugin: Some(to_plugin),
|
to_plugin: Some(to_plugin),
|
||||||
|
to_pty_writer: Some(to_pty_writer),
|
||||||
to_server: None,
|
to_server: None,
|
||||||
should_silently_fail: false,
|
should_silently_fail: false,
|
||||||
},
|
},
|
||||||
|
|
@ -665,5 +694,6 @@ fn init_session(
|
||||||
screen_thread: Some(screen_thread),
|
screen_thread: Some(screen_thread),
|
||||||
pty_thread: Some(pty_thread),
|
pty_thread: Some(pty_thread),
|
||||||
wasm_thread: Some(wasm_thread),
|
wasm_thread: Some(wasm_thread),
|
||||||
|
pty_writer_thread: Some(pty_writer_thread),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
34
zellij-server/src/pty_writer.rs
Normal file
34
zellij-server/src/pty_writer.rs
Normal file
|
|
@ -0,0 +1,34 @@
|
||||||
|
use zellij_utils::errors::{ContextType, PtyWriteContext};
|
||||||
|
|
||||||
|
use crate::thread_bus::Bus;
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub(crate) enum PtyWriteInstruction {
|
||||||
|
Write(Vec<u8>, i32),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<&PtyWriteInstruction> for PtyWriteContext {
|
||||||
|
fn from(tty_write_instruction: &PtyWriteInstruction) -> Self {
|
||||||
|
match *tty_write_instruction {
|
||||||
|
PtyWriteInstruction::Write(..) => PtyWriteContext::Write,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn pty_writer_main(bus: Bus<PtyWriteInstruction>) {
|
||||||
|
loop {
|
||||||
|
let (event, mut err_ctx) = bus.recv().expect("failed to receive event on channel");
|
||||||
|
err_ctx.add_call(ContextType::PtyWrite((&event).into()));
|
||||||
|
let os_input = bus.os_input.clone().unwrap();
|
||||||
|
match event {
|
||||||
|
PtyWriteInstruction::Write(bytes, terminal_id) => {
|
||||||
|
if let Err(e) = os_input.write_to_tty_stdin(terminal_id, &bytes) {
|
||||||
|
log::error!("failed to write to terminal: {}", e);
|
||||||
|
}
|
||||||
|
if let Err(e) = os_input.tcdrain(terminal_id) {
|
||||||
|
log::error!("failed to drain terminal: {}", e);
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -9,6 +9,7 @@ use zellij_tile::prelude::Style;
|
||||||
use zellij_utils::position::{Column, Line};
|
use zellij_utils::position::{Column, Line};
|
||||||
use zellij_utils::{position::Position, serde, zellij_tile};
|
use zellij_utils::{position::Position, serde, zellij_tile};
|
||||||
|
|
||||||
|
use crate::pty_writer::PtyWriteInstruction;
|
||||||
use crate::screen::CopyOptions;
|
use crate::screen::CopyOptions;
|
||||||
use crate::ui::pane_boundaries_frame::FrameParams;
|
use crate::ui::pane_boundaries_frame::FrameParams;
|
||||||
|
|
||||||
|
|
@ -869,15 +870,13 @@ impl Tab {
|
||||||
.get(&pane_id)
|
.get(&pane_id)
|
||||||
.unwrap_or_else(|| self.tiled_panes.get_pane(pane_id).unwrap());
|
.unwrap_or_else(|| self.tiled_panes.get_pane(pane_id).unwrap());
|
||||||
let adjusted_input = active_terminal.adjust_input_to_terminal(input_bytes);
|
let adjusted_input = active_terminal.adjust_input_to_terminal(input_bytes);
|
||||||
if let Err(e) = self
|
|
||||||
.os_api
|
self.senders
|
||||||
.write_to_tty_stdin(active_terminal_id, &adjusted_input)
|
.send_to_pty_writer(PtyWriteInstruction::Write(
|
||||||
{
|
adjusted_input,
|
||||||
log::error!("failed to write to terminal: {}", e);
|
active_terminal_id,
|
||||||
}
|
))
|
||||||
if let Err(e) = self.os_api.tcdrain(active_terminal_id) {
|
.unwrap();
|
||||||
log::error!("failed to drain terminal: {}", e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
PaneId::Plugin(pid) => {
|
PaneId::Plugin(pid) => {
|
||||||
for key in parse_keys(&input_bytes) {
|
for key in parse_keys(&input_bytes) {
|
||||||
|
|
|
||||||
|
|
@ -1,8 +1,8 @@
|
||||||
//! Definitions and helpers for sending and receiving messages between threads.
|
//! Definitions and helpers for sending and receiving messages between threads.
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
os_input_output::ServerOsApi, pty::PtyInstruction, screen::ScreenInstruction,
|
os_input_output::ServerOsApi, pty::PtyInstruction, pty_writer::PtyWriteInstruction,
|
||||||
wasm_vm::PluginInstruction, ServerInstruction,
|
screen::ScreenInstruction, wasm_vm::PluginInstruction, ServerInstruction,
|
||||||
};
|
};
|
||||||
use zellij_utils::{channels, channels::SenderWithContext, errors::ErrorContext};
|
use zellij_utils::{channels, channels::SenderWithContext, errors::ErrorContext};
|
||||||
|
|
||||||
|
|
@ -13,6 +13,7 @@ pub(crate) struct ThreadSenders {
|
||||||
pub to_pty: Option<SenderWithContext<PtyInstruction>>,
|
pub to_pty: Option<SenderWithContext<PtyInstruction>>,
|
||||||
pub to_plugin: Option<SenderWithContext<PluginInstruction>>,
|
pub to_plugin: Option<SenderWithContext<PluginInstruction>>,
|
||||||
pub to_server: Option<SenderWithContext<ServerInstruction>>,
|
pub to_server: Option<SenderWithContext<ServerInstruction>>,
|
||||||
|
pub to_pty_writer: Option<SenderWithContext<PtyWriteInstruction>>,
|
||||||
// this is a convenience for the unit tests
|
// this is a convenience for the unit tests
|
||||||
// it's not advisable to set it to true in production code
|
// it's not advisable to set it to true in production code
|
||||||
pub should_silently_fail: bool,
|
pub should_silently_fail: bool,
|
||||||
|
|
@ -82,6 +83,22 @@ impl ThreadSenders {
|
||||||
self.to_server.as_ref().unwrap().send(instruction)
|
self.to_server.as_ref().unwrap().send(instruction)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
pub fn send_to_pty_writer(
|
||||||
|
&self,
|
||||||
|
instruction: PtyWriteInstruction,
|
||||||
|
) -> Result<(), channels::SendError<(PtyWriteInstruction, ErrorContext)>> {
|
||||||
|
if self.should_silently_fail {
|
||||||
|
let _ = self
|
||||||
|
.to_pty_writer
|
||||||
|
.as_ref()
|
||||||
|
.map(|sender| sender.send(instruction))
|
||||||
|
.unwrap_or_else(|| Ok(()));
|
||||||
|
Ok(())
|
||||||
|
} else {
|
||||||
|
self.to_pty_writer.as_ref().unwrap().send(instruction)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[allow(unused)]
|
#[allow(unused)]
|
||||||
pub fn silently_fail_on_send(mut self) -> Self {
|
pub fn silently_fail_on_send(mut self) -> Self {
|
||||||
// this is mostly used for the tests, see struct
|
// this is mostly used for the tests, see struct
|
||||||
|
|
@ -105,6 +122,7 @@ impl<T> Bus<T> {
|
||||||
to_pty: Option<&SenderWithContext<PtyInstruction>>,
|
to_pty: Option<&SenderWithContext<PtyInstruction>>,
|
||||||
to_plugin: Option<&SenderWithContext<PluginInstruction>>,
|
to_plugin: Option<&SenderWithContext<PluginInstruction>>,
|
||||||
to_server: Option<&SenderWithContext<ServerInstruction>>,
|
to_server: Option<&SenderWithContext<ServerInstruction>>,
|
||||||
|
to_pty_writer: Option<&SenderWithContext<PtyWriteInstruction>>,
|
||||||
os_input: Option<Box<dyn ServerOsApi>>,
|
os_input: Option<Box<dyn ServerOsApi>>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Bus {
|
Bus {
|
||||||
|
|
@ -114,6 +132,7 @@ impl<T> Bus<T> {
|
||||||
to_pty: to_pty.cloned(),
|
to_pty: to_pty.cloned(),
|
||||||
to_plugin: to_plugin.cloned(),
|
to_plugin: to_plugin.cloned(),
|
||||||
to_server: to_server.cloned(),
|
to_server: to_server.cloned(),
|
||||||
|
to_pty_writer: to_pty_writer.cloned(),
|
||||||
should_silently_fail: false,
|
should_silently_fail: false,
|
||||||
},
|
},
|
||||||
os_input: os_input.clone(),
|
os_input: os_input.clone(),
|
||||||
|
|
@ -129,6 +148,7 @@ impl<T> Bus<T> {
|
||||||
to_pty: None,
|
to_pty: None,
|
||||||
to_plugin: None,
|
to_plugin: None,
|
||||||
to_server: None,
|
to_server: None,
|
||||||
|
to_pty_writer: None,
|
||||||
should_silently_fail: true,
|
should_silently_fail: true,
|
||||||
},
|
},
|
||||||
os_input: None,
|
os_input: None,
|
||||||
|
|
|
||||||
|
|
@ -182,6 +182,7 @@ pub enum ContextType {
|
||||||
IPCServer(ServerContext),
|
IPCServer(ServerContext),
|
||||||
StdinHandler,
|
StdinHandler,
|
||||||
AsyncTask,
|
AsyncTask,
|
||||||
|
PtyWrite(PtyWriteContext),
|
||||||
/// An empty, placeholder call. This should be thought of as representing no call at all.
|
/// An empty, placeholder call. This should be thought of as representing no call at all.
|
||||||
/// A call stack representation filled with these is the representation of an empty call stack.
|
/// A call stack representation filled with these is the representation of an empty call stack.
|
||||||
Empty,
|
Empty,
|
||||||
|
|
@ -197,6 +198,7 @@ impl Display for ContextType {
|
||||||
ContextType::IPCServer(c) => Some(("ipc_server:", format!("{:?}", c))),
|
ContextType::IPCServer(c) => Some(("ipc_server:", format!("{:?}", c))),
|
||||||
ContextType::StdinHandler => Some(("stdin_handler_thread:", "AcceptInput".to_string())),
|
ContextType::StdinHandler => Some(("stdin_handler_thread:", "AcceptInput".to_string())),
|
||||||
ContextType::AsyncTask => Some(("stream_terminal_bytes:", "AsyncTask".to_string())),
|
ContextType::AsyncTask => Some(("stream_terminal_bytes:", "AsyncTask".to_string())),
|
||||||
|
ContextType::PtyWrite(c) => Some(("pty_writer_thread:", format!("{:?}", c))),
|
||||||
ContextType::Empty => None,
|
ContextType::Empty => None,
|
||||||
} {
|
} {
|
||||||
write!(f, "{} {}", left.purple(), right.green())
|
write!(f, "{} {}", left.purple(), right.green())
|
||||||
|
|
@ -337,3 +339,8 @@ pub enum ServerContext {
|
||||||
AttachClient,
|
AttachClient,
|
||||||
ConnStatus,
|
ConnStatus,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
|
||||||
|
pub enum PtyWriteContext {
|
||||||
|
Write,
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue