perf(terminal): better responsiveness (#1585)
* performance(pty): only buffer terminal bytes when screen thread is backed up * style(fmt): rustfmt
This commit is contained in:
parent
b4cf0e7a81
commit
2d2bbbd6c3
3 changed files with 191 additions and 126 deletions
|
|
@ -8,6 +8,7 @@ mod pty;
|
||||||
mod pty_writer;
|
mod pty_writer;
|
||||||
mod route;
|
mod route;
|
||||||
mod screen;
|
mod screen;
|
||||||
|
mod terminal_bytes;
|
||||||
mod thread_bus;
|
mod thread_bus;
|
||||||
mod ui;
|
mod ui;
|
||||||
mod wasm_vm;
|
mod wasm_vm;
|
||||||
|
|
|
||||||
|
|
@ -1,31 +1,18 @@
|
||||||
|
use crate::terminal_bytes::TerminalBytes;
|
||||||
use crate::{
|
use crate::{
|
||||||
os_input_output::{AsyncReader, ServerOsApi},
|
panes::PaneId, screen::ScreenInstruction, thread_bus::Bus, wasm_vm::PluginInstruction,
|
||||||
panes::PaneId,
|
|
||||||
screen::ScreenInstruction,
|
|
||||||
thread_bus::{Bus, ThreadSenders},
|
|
||||||
wasm_vm::PluginInstruction,
|
|
||||||
ClientId, ServerInstruction,
|
ClientId, ServerInstruction,
|
||||||
};
|
};
|
||||||
use async_std::{
|
use async_std::task::{self, JoinHandle};
|
||||||
future::timeout as async_timeout,
|
use std::{collections::HashMap, env, os::unix::io::RawFd, path::PathBuf};
|
||||||
task::{self, JoinHandle},
|
|
||||||
};
|
|
||||||
use std::{
|
|
||||||
collections::HashMap,
|
|
||||||
env,
|
|
||||||
os::unix::io::RawFd,
|
|
||||||
path::PathBuf,
|
|
||||||
time::{Duration, Instant},
|
|
||||||
};
|
|
||||||
use zellij_utils::nix::unistd::Pid;
|
use zellij_utils::nix::unistd::Pid;
|
||||||
use zellij_utils::{
|
use zellij_utils::{
|
||||||
async_std,
|
async_std,
|
||||||
errors::{get_current_ctx, ContextType, PtyContext},
|
errors::{ContextType, PtyContext},
|
||||||
input::{
|
input::{
|
||||||
command::{RunCommand, TerminalAction},
|
command::{RunCommand, TerminalAction},
|
||||||
layout::{Layout, LayoutFromYaml, Run, TabLayout},
|
layout::{Layout, LayoutFromYaml, Run, TabLayout},
|
||||||
},
|
},
|
||||||
logging::debug_to_file,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
pub type VteBytes = Vec<u8>;
|
pub type VteBytes = Vec<u8>;
|
||||||
|
|
@ -198,100 +185,6 @@ pub(crate) fn pty_thread_main(mut pty: Pty, layout: Box<LayoutFromYaml>) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
enum ReadResult {
|
|
||||||
Ok(usize),
|
|
||||||
Timeout,
|
|
||||||
Err(std::io::Error),
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<std::io::Result<usize>> for ReadResult {
|
|
||||||
fn from(e: std::io::Result<usize>) -> ReadResult {
|
|
||||||
match e {
|
|
||||||
Err(e) => ReadResult::Err(e),
|
|
||||||
Ok(n) => ReadResult::Ok(n),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn deadline_read(
|
|
||||||
reader: &mut dyn AsyncReader,
|
|
||||||
deadline: Option<Instant>,
|
|
||||||
buf: &mut [u8],
|
|
||||||
) -> ReadResult {
|
|
||||||
if let Some(deadline) = deadline {
|
|
||||||
let timeout = deadline.checked_duration_since(Instant::now());
|
|
||||||
if let Some(timeout) = timeout {
|
|
||||||
match async_timeout(timeout, reader.read(buf)).await {
|
|
||||||
Ok(res) => res.into(),
|
|
||||||
_ => ReadResult::Timeout,
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// deadline has already elapsed
|
|
||||||
ReadResult::Timeout
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
reader.read(buf).await.into()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn async_send_to_screen(senders: ThreadSenders, screen_instruction: ScreenInstruction) {
|
|
||||||
task::spawn_blocking(move || senders.send_to_screen(screen_instruction))
|
|
||||||
.await
|
|
||||||
.unwrap()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn stream_terminal_bytes(
|
|
||||||
pid: RawFd,
|
|
||||||
senders: ThreadSenders,
|
|
||||||
os_input: Box<dyn ServerOsApi>,
|
|
||||||
debug: bool,
|
|
||||||
) -> JoinHandle<()> {
|
|
||||||
let mut err_ctx = get_current_ctx();
|
|
||||||
task::spawn({
|
|
||||||
async move {
|
|
||||||
err_ctx.add_call(ContextType::AsyncTask);
|
|
||||||
|
|
||||||
// After a successful read, we keep on reading additional data up to a duration of
|
|
||||||
// `RENDER_PAUSE`. This is in order to batch up PtyBytes before rendering them.
|
|
||||||
// Once `render_deadline` has elapsed, we send Render.
|
|
||||||
const RENDER_PAUSE: Duration = Duration::from_millis(30);
|
|
||||||
let mut render_deadline = None;
|
|
||||||
// Keep track of the last render time so we can render immediately if something shows
|
|
||||||
// up after a period of inactivity. This reduces input latency perception.
|
|
||||||
let mut last_render = Instant::now();
|
|
||||||
|
|
||||||
let mut buf = [0u8; 65536];
|
|
||||||
let mut async_reader = os_input.async_file_reader(pid);
|
|
||||||
loop {
|
|
||||||
match deadline_read(async_reader.as_mut(), render_deadline, &mut buf).await {
|
|
||||||
ReadResult::Ok(0) | ReadResult::Err(_) => break, // EOF or error
|
|
||||||
ReadResult::Timeout => {
|
|
||||||
async_send_to_screen(senders.clone(), ScreenInstruction::Render).await;
|
|
||||||
// next read does not need a deadline as we just rendered everything
|
|
||||||
render_deadline = None;
|
|
||||||
last_render = Instant::now();
|
|
||||||
},
|
|
||||||
ReadResult::Ok(n_bytes) => {
|
|
||||||
let bytes = &buf[..n_bytes];
|
|
||||||
if debug {
|
|
||||||
let _ = debug_to_file(bytes, pid);
|
|
||||||
}
|
|
||||||
async_send_to_screen(
|
|
||||||
senders.clone(),
|
|
||||||
ScreenInstruction::PtyBytes(pid, bytes.to_vec()),
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
// if we already have a render_deadline we keep it, otherwise we set it
|
|
||||||
// to RENDER_PAUSE since the last time we rendered.
|
|
||||||
render_deadline.get_or_insert(last_render + RENDER_PAUSE);
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
async_send_to_screen(senders.clone(), ScreenInstruction::Render).await;
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Pty {
|
impl Pty {
|
||||||
pub fn new(
|
pub fn new(
|
||||||
bus: Bus<PtyInstruction>,
|
bus: Bus<PtyInstruction>,
|
||||||
|
|
@ -361,13 +254,18 @@ impl Pty {
|
||||||
.as_mut()
|
.as_mut()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.spawn_terminal(terminal_action, quit_cb, self.default_editor.clone())?;
|
.spawn_terminal(terminal_action, quit_cb, self.default_editor.clone())?;
|
||||||
let task_handle = stream_terminal_bytes(
|
let terminal_bytes = task::spawn({
|
||||||
pid_primary,
|
let senders = self.bus.senders.clone();
|
||||||
self.bus.senders.clone(),
|
let os_input = self.bus.os_input.as_ref().unwrap().clone();
|
||||||
self.bus.os_input.as_ref().unwrap().clone(),
|
let debug_to_file = self.debug_to_file;
|
||||||
self.debug_to_file,
|
async move {
|
||||||
);
|
TerminalBytes::new(pid_primary, senders, os_input, debug_to_file)
|
||||||
self.task_handles.insert(pid_primary, task_handle);
|
.listen()
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
self.task_handles.insert(pid_primary, terminal_bytes);
|
||||||
self.id_to_child_pid.insert(pid_primary, child_fd);
|
self.id_to_child_pid.insert(pid_primary, child_fd);
|
||||||
Ok(pid_primary)
|
Ok(pid_primary)
|
||||||
}
|
}
|
||||||
|
|
@ -425,13 +323,17 @@ impl Pty {
|
||||||
))
|
))
|
||||||
.unwrap();
|
.unwrap();
|
||||||
for id in new_pane_pids {
|
for id in new_pane_pids {
|
||||||
let task_handle = stream_terminal_bytes(
|
let terminal_bytes = task::spawn({
|
||||||
id,
|
let senders = self.bus.senders.clone();
|
||||||
self.bus.senders.clone(),
|
let os_input = self.bus.os_input.as_ref().unwrap().clone();
|
||||||
self.bus.os_input.as_ref().unwrap().clone(),
|
let debug_to_file = self.debug_to_file;
|
||||||
self.debug_to_file,
|
async move {
|
||||||
);
|
TerminalBytes::new(id, senders, os_input, debug_to_file)
|
||||||
self.task_handles.insert(id, task_handle);
|
.listen()
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
self.task_handles.insert(id, terminal_bytes);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pub fn close_pane(&mut self, id: PaneId) {
|
pub fn close_pane(&mut self, id: PaneId) {
|
||||||
|
|
|
||||||
162
zellij-server/src/terminal_bytes.rs
Normal file
162
zellij-server/src/terminal_bytes.rs
Normal file
|
|
@ -0,0 +1,162 @@
|
||||||
|
use crate::{
|
||||||
|
os_input_output::{AsyncReader, ServerOsApi},
|
||||||
|
screen::ScreenInstruction,
|
||||||
|
thread_bus::ThreadSenders,
|
||||||
|
};
|
||||||
|
use async_std::{future::timeout as async_timeout, task};
|
||||||
|
use std::{
|
||||||
|
os::unix::io::RawFd,
|
||||||
|
time::{Duration, Instant},
|
||||||
|
};
|
||||||
|
use zellij_utils::{
|
||||||
|
async_std,
|
||||||
|
errors::{get_current_ctx, ContextType},
|
||||||
|
logging::debug_to_file,
|
||||||
|
};
|
||||||
|
|
||||||
|
enum ReadResult {
|
||||||
|
Ok(usize),
|
||||||
|
Timeout,
|
||||||
|
Err(std::io::Error),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<std::io::Result<usize>> for ReadResult {
|
||||||
|
fn from(e: std::io::Result<usize>) -> ReadResult {
|
||||||
|
match e {
|
||||||
|
Err(e) => ReadResult::Err(e),
|
||||||
|
Ok(n) => ReadResult::Ok(n),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) struct TerminalBytes {
|
||||||
|
pid: RawFd,
|
||||||
|
senders: ThreadSenders,
|
||||||
|
async_reader: Box<dyn AsyncReader>,
|
||||||
|
debug: bool,
|
||||||
|
render_deadline: Option<Instant>,
|
||||||
|
backed_up: bool,
|
||||||
|
minimum_render_send_time: Option<Duration>,
|
||||||
|
buffering_pause: Duration,
|
||||||
|
last_render: Instant,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TerminalBytes {
|
||||||
|
pub fn new(
|
||||||
|
pid: RawFd,
|
||||||
|
senders: ThreadSenders,
|
||||||
|
os_input: Box<dyn ServerOsApi>,
|
||||||
|
debug: bool,
|
||||||
|
) -> Self {
|
||||||
|
TerminalBytes {
|
||||||
|
pid,
|
||||||
|
senders,
|
||||||
|
debug,
|
||||||
|
async_reader: os_input.async_file_reader(pid),
|
||||||
|
render_deadline: None,
|
||||||
|
backed_up: false,
|
||||||
|
minimum_render_send_time: None,
|
||||||
|
buffering_pause: Duration::from_millis(30),
|
||||||
|
last_render: Instant::now(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub async fn listen(&mut self) {
|
||||||
|
// This function reads bytes from the pty and then sends them as
|
||||||
|
// ScreenInstruction::PtyBytes to screen to be parsed there
|
||||||
|
// We also send a separate instruction to Screen to render as ScreenInstruction::Render
|
||||||
|
//
|
||||||
|
// We endeavour to send a Render instruction to screen immediately after having send bytes
|
||||||
|
// to parse - this is so that the rendering is quick and smooth. However, this can cause
|
||||||
|
// latency if the screen is backed up. For this reason, if we detect a peak in the time it
|
||||||
|
// takes to send the render instruction, we assume the screen thread is backed up and so
|
||||||
|
// only send a render instruction sparingly, giving screen time to process bytes and render
|
||||||
|
// while still allowing the user to see an indication that things are happening (the
|
||||||
|
// sparing render instructions)
|
||||||
|
let mut err_ctx = get_current_ctx();
|
||||||
|
err_ctx.add_call(ContextType::AsyncTask);
|
||||||
|
let mut buf = [0u8; 65536];
|
||||||
|
loop {
|
||||||
|
match self.deadline_read(&mut buf).await {
|
||||||
|
// match deadline_read(async_reader.as_mut(), self.render_deadline, &mut buf).await {
|
||||||
|
ReadResult::Ok(0) | ReadResult::Err(_) => break, // EOF or error
|
||||||
|
ReadResult::Timeout => {
|
||||||
|
let time_to_send_render =
|
||||||
|
self.async_send_to_screen(ScreenInstruction::Render).await;
|
||||||
|
self.update_render_send_time(time_to_send_render);
|
||||||
|
// next read does not need a deadline as we just rendered everything
|
||||||
|
self.render_deadline = None;
|
||||||
|
self.last_render = Instant::now();
|
||||||
|
},
|
||||||
|
ReadResult::Ok(n_bytes) => {
|
||||||
|
let bytes = &buf[..n_bytes];
|
||||||
|
if self.debug {
|
||||||
|
let _ = debug_to_file(bytes, self.pid);
|
||||||
|
}
|
||||||
|
self.async_send_to_screen(ScreenInstruction::PtyBytes(
|
||||||
|
self.pid,
|
||||||
|
bytes.to_vec(),
|
||||||
|
))
|
||||||
|
.await;
|
||||||
|
if !self.backed_up {
|
||||||
|
// we're not backed up, let's send an immediate render instruction
|
||||||
|
let time_to_send_render =
|
||||||
|
self.async_send_to_screen(ScreenInstruction::Render).await;
|
||||||
|
self.update_render_send_time(time_to_send_render);
|
||||||
|
}
|
||||||
|
// if we already have a render_deadline we keep it, otherwise we set it
|
||||||
|
// to buffering_pause since the last time we rendered.
|
||||||
|
self.render_deadline
|
||||||
|
.get_or_insert(self.last_render + self.buffering_pause);
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
self.async_send_to_screen(ScreenInstruction::Render).await;
|
||||||
|
}
|
||||||
|
async fn async_send_to_screen(&self, screen_instruction: ScreenInstruction) -> Duration {
|
||||||
|
// returns the time it blocked the thread for
|
||||||
|
let sent_at = Instant::now();
|
||||||
|
let senders = self.senders.clone();
|
||||||
|
task::spawn_blocking(move || senders.send_to_screen(screen_instruction))
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
sent_at.elapsed()
|
||||||
|
}
|
||||||
|
fn update_render_send_time(&mut self, time_to_send_render: Duration) {
|
||||||
|
match self.minimum_render_send_time.as_mut() {
|
||||||
|
Some(minimum_render_time) => {
|
||||||
|
if time_to_send_render < *minimum_render_time {
|
||||||
|
*minimum_render_time = time_to_send_render;
|
||||||
|
}
|
||||||
|
if time_to_send_render > *minimum_render_time * 10 {
|
||||||
|
// sending the render instruction took an especially long time, we can safely
|
||||||
|
// assume the screen thread is backed up and we should only send render
|
||||||
|
// instructions sparingly
|
||||||
|
self.backed_up = true;
|
||||||
|
} else if time_to_send_render < *minimum_render_time * 5 {
|
||||||
|
// the screen thread is not backed up, we atomically unset the backed_up
|
||||||
|
// indication
|
||||||
|
self.backed_up = false;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
None => {
|
||||||
|
self.minimum_render_send_time = Some(time_to_send_render);
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
async fn deadline_read(&mut self, buf: &mut [u8]) -> ReadResult {
|
||||||
|
if let Some(deadline) = self.render_deadline {
|
||||||
|
let timeout = deadline.checked_duration_since(Instant::now());
|
||||||
|
if let Some(timeout) = timeout {
|
||||||
|
match async_timeout(timeout, self.async_reader.read(buf)).await {
|
||||||
|
Ok(res) => res.into(),
|
||||||
|
_ => ReadResult::Timeout,
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// deadline has already elapsed
|
||||||
|
ReadResult::Timeout
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
self.async_reader.read(buf).await.into()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Add table
Reference in a new issue