feat(plugins): Plugin workers and strider (#2449)

* mvp of strider fuzzy find

* improve search ui

* various refactoringz

* moar refactoring

* even more refactoring

* tests and more refactoring

* refactor: remove unused stuff

* style(fmt): rustfmt

* debug ci

* debug ci

* correct path for plugin system tests

* fix plugin system ci tests

* remove debugging statements from test

* fix plugin worker persistence

* add test for plugin worker persistence

* style(fmt): rustfmt

* final cleanups

* remove outdated comments
This commit is contained in:
Aram Drevekenin 2023-05-16 12:47:18 +02:00 committed by GitHub
parent 4b7d7c34b7
commit 5fe4d60c22
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
34 changed files with 1926 additions and 186 deletions

66
Cargo.lock generated
View file

@ -776,11 +776,12 @@ checksum = "3c877555693c14d2f84191cfd3ad8582790fc52b5e2274b40b59cf5f5cea25c7"
[[package]]
name = "dialoguer"
version = "0.10.1"
version = "0.10.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d8c8ae48e400addc32a8710c8d62d55cb84249a7d58ac4cd959daecfbaddc545"
checksum = "59c6f2989294b9a498d3ad5491a79c6deb604617378e1cdc4bfc1c1361fe2f87"
dependencies = [
"console",
"shell-words",
"tempfile",
"zeroize",
]
@ -1019,6 +1020,15 @@ version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80"
[[package]]
name = "fixture-plugin-for-tests"
version = "0.1.0"
dependencies = [
"serde",
"serde_json",
"zellij-tile",
]
[[package]]
name = "fnv"
version = "1.0.7"
@ -1071,6 +1081,15 @@ dependencies = [
"waker-fn",
]
[[package]]
name = "fuzzy-matcher"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "54614a3312934d066701a80f20f15fa3b56d67ac7722b39eea5b4c9dd1d66c94"
dependencies = [
"thread_local",
]
[[package]]
name = "generational-arena"
version = "0.2.8"
@ -2443,6 +2462,15 @@ version = "1.0.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3f6f92acf49d1b98f7a81226834412ada05458b7364277387724a237f062695"
[[package]]
name = "same-file"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502"
dependencies = [
"winapi-util",
]
[[package]]
name = "scopeguard"
version = "1.1.0"
@ -2596,6 +2624,12 @@ dependencies = [
"opaque-debug 0.3.0",
]
[[package]]
name = "shell-words"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24188a676b6ae68c3b2cb3a01be17fbf7240ce009799bb56d5b1409051e78fde"
[[package]]
name = "shellexpand"
version = "3.0.0"
@ -2798,8 +2832,14 @@ checksum = "213701ba3370744dcd1a12960caa4843b3d68b4d1c0a5d575e0d65b2ee9d16c0"
name = "strider"
version = "0.2.0"
dependencies = [
"ansi_term",
"colored",
"fuzzy-matcher",
"pretty-bytes",
"serde",
"serde_json",
"unicode-width",
"walkdir",
"zellij-tile",
]
@ -3066,6 +3106,16 @@ dependencies = [
"winapi",
]
[[package]]
name = "thread_local"
version = "1.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3fdd6f064ccff2d6567adcb3873ca630700f00b5ad3f060c25b5dcfd9a4ce152"
dependencies = [
"cfg-if 1.0.0",
"once_cell",
]
[[package]]
name = "time"
version = "0.1.44"
@ -3371,6 +3421,16 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca"
[[package]]
name = "walkdir"
version = "2.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "36df944cda56c7d8d8b7496af378e6b16de9284591917d307c9b4d313c44e698"
dependencies = [
"same-file",
"winapi-util",
]
[[package]]
name = "wasi"
version = "0.9.0+wasi-snapshot-preview1"
@ -4212,6 +4272,7 @@ dependencies = [
"sixel-image",
"sixel-tokenizer",
"sysinfo",
"tempfile",
"typetag",
"unicode-width",
"url",
@ -4278,6 +4339,7 @@ dependencies = [
"thiserror",
"unicode-width",
"url",
"uuid",
"vte 0.11.0",
]

View file

@ -34,6 +34,7 @@ members = [
"default-plugins/status-bar",
"default-plugins/strider",
"default-plugins/tab-bar",
"default-plugins/fixture-plugin-for-tests",
"zellij-client",
"zellij-server",
"zellij-utils",

View file

@ -0,0 +1,2 @@
[build]
target = "wasm32-wasi"

View file

@ -0,0 +1,11 @@
[package]
name = "fixture-plugin-for-tests"
version = "0.1.0"
authors = ["Aram Drevekenin <aram@poor.dev>"]
edition = "2021"
license = "MIT"
[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
zellij-tile = { path = "../../zellij-tile" }

View file

@ -0,0 +1 @@
../../LICENSE.md

View file

@ -0,0 +1,78 @@
use serde::{Deserialize, Serialize};
use zellij_tile::prelude::*;
// This is a fixture plugin used only for tests in Zellij
// it is not (and should not!) be included in the mainline executable
// it's included here for convenience so that it will be built by the CI
#[derive(Default)]
struct State {
received_events: Vec<Event>,
received_payload: Option<String>,
}
#[derive(Default, Serialize, Deserialize)]
struct TestWorker {
number_of_messages_received: usize,
}
impl<'de> ZellijWorker<'de> for TestWorker {
fn on_message(&mut self, message: String, payload: String) {
if message == "ping" {
self.number_of_messages_received += 1;
post_message_to_plugin(
"pong".into(),
format!(
"{}, received {} messages",
payload, self.number_of_messages_received
),
);
}
}
}
register_plugin!(State);
register_worker!(TestWorker, test_worker);
impl ZellijPlugin for State {
fn load(&mut self) {
subscribe(&[
EventType::InputReceived,
EventType::SystemClipboardFailure,
EventType::CustomMessage,
]);
}
fn update(&mut self, event: Event) -> bool {
match &event {
Event::CustomMessage(message, payload) => {
if message == "pong" {
self.received_payload = Some(payload.clone());
}
},
Event::SystemClipboardFailure => {
// this is just to trigger the worker message
post_message_to(
"test",
"ping".to_owned(),
"gimme_back_my_payload".to_owned(),
);
},
_ => {},
}
let should_render = true;
self.received_events.push(event);
should_render
}
fn render(&mut self, rows: usize, cols: usize) {
if let Some(payload) = self.received_payload.as_ref() {
println!("Payload from worker: {:?}", payload);
} else {
println!(
"Rows: {:?}, Cols: {:?}, Received events: {:?}",
rows, cols, self.received_events
);
}
}
}

View file

@ -10,3 +10,9 @@ license = "MIT"
colored = "2.0.0"
zellij-tile = { path = "../../zellij-tile" }
pretty-bytes = "0.2.2"
walkdir = "2.3.3"
fuzzy-matcher = "0.3.7"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
unicode-width = "0.1.8"
ansi_term = "0.12.1"

View file

@ -1,16 +1,28 @@
mod search;
mod state;
use colored::*;
use state::{refresh_directory, FsEntry, State};
use search::{ResultsOfSearch, SearchWorker};
use serde_json;
use state::{refresh_directory, FsEntry, State, CURRENT_SEARCH_TERM};
use std::{cmp::min, time::Instant};
use zellij_tile::prelude::*;
register_plugin!(State);
register_worker!(SearchWorker, search_worker);
impl ZellijPlugin for State {
fn load(&mut self) {
refresh_directory(self);
subscribe(&[EventType::Key, EventType::Mouse]);
self.loading = true;
subscribe(&[
EventType::Key,
EventType::Mouse,
EventType::CustomMessage,
EventType::Timer,
]);
post_message_to("search", String::from("scan_folder"), String::new());
set_timeout(0.5); // for displaying loading animation
}
fn update(&mut self, event: Event) -> bool {
@ -22,26 +34,101 @@ impl ZellijPlugin for State {
};
self.ev_history.push_back((event.clone(), Instant::now()));
match event {
Event::Timer(_elapsed) => {
should_render = true;
if self.loading {
set_timeout(0.5);
if self.loading_animation_offset == u8::MAX {
self.loading_animation_offset = 0;
} else {
self.loading_animation_offset =
self.loading_animation_offset.saturating_add(1);
}
}
},
Event::CustomMessage(message, payload) => match message.as_str() {
"update_search_results" => {
if let Ok(mut results_of_search) =
serde_json::from_str::<ResultsOfSearch>(&payload)
{
if Some(results_of_search.search_term) == self.search_term {
self.search_results =
results_of_search.search_results.drain(..).collect();
should_render = true;
}
}
},
"done_scanning_folder" => {
self.loading = false;
should_render = true;
},
_ => {},
},
Event::Key(key) => match key {
// modes:
// 1. typing_search_term
// 2. exploring_search_results
// 3. normal
Key::Esc | Key::Char('\n') if self.typing_search_term() => {
self.accept_search_term();
},
_ if self.typing_search_term() => {
self.append_to_search_term(key);
if let Some(search_term) = self.search_term.as_ref() {
std::fs::write(CURRENT_SEARCH_TERM, search_term.as_bytes()).unwrap();
post_message_to(
"search",
String::from("search"),
String::from(&self.search_term.clone().unwrap()),
);
}
should_render = true;
},
Key::Esc if self.exploring_search_results() => {
self.stop_exploring_search_results();
should_render = true;
},
Key::Char('/') => {
self.start_typing_search_term();
should_render = true;
},
Key::Esc => {
self.stop_typing_search_term();
should_render = true;
},
Key::Up | Key::Char('k') => {
let currently_selected = self.selected();
*self.selected_mut() = self.selected().saturating_sub(1);
if currently_selected != self.selected() {
if self.exploring_search_results() {
self.move_search_selection_up();
should_render = true;
} else {
let currently_selected = self.selected();
*self.selected_mut() = self.selected().saturating_sub(1);
if currently_selected != self.selected() {
should_render = true;
}
}
},
Key::Down | Key::Char('j') => {
let currently_selected = self.selected();
let next = self.selected().saturating_add(1);
*self.selected_mut() = min(self.files.len().saturating_sub(1), next);
if currently_selected != self.selected() {
if self.exploring_search_results() {
self.move_search_selection_down();
should_render = true;
} else {
let currently_selected = self.selected();
let next = self.selected().saturating_add(1);
*self.selected_mut() = min(self.files.len().saturating_sub(1), next);
if currently_selected != self.selected() {
should_render = true;
}
}
},
Key::Right | Key::Char('\n') | Key::Char('l') if !self.files.is_empty() => {
if self.exploring_search_results() {
self.open_search_result();
} else {
self.traverse_dir_or_open_file();
self.ev_history.clear();
}
should_render = true;
self.traverse_dir_or_open_file();
self.ev_history.clear();
},
Key::Left | Key::Char('h') => {
if self.path.components().count() > 2 {
@ -111,6 +198,10 @@ impl ZellijPlugin for State {
}
fn render(&mut self, rows: usize, cols: usize) {
if self.typing_search_term() || self.exploring_search_results() {
return self.render_search(rows, cols);
}
for i in 0..rows {
if self.selected() < self.scroll() {
*self.scroll_mut() = self.selected();

View file

@ -0,0 +1,415 @@
use crate::state::{State, CURRENT_SEARCH_TERM, ROOT};
use unicode_width::UnicodeWidthStr;
use zellij_tile::prelude::*;
use fuzzy_matcher::skim::SkimMatcherV2;
use fuzzy_matcher::FuzzyMatcher;
use serde::{Deserialize, Serialize};
use walkdir::WalkDir;
use std::io::{self, BufRead};
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum SearchResult {
File {
path: String,
score: i64,
indices: Vec<usize>,
},
LineInFile {
path: String,
line: String,
line_number: usize,
score: i64,
indices: Vec<usize>,
},
}
impl SearchResult {
pub fn new_file_name(score: i64, indices: Vec<usize>, path: String) -> Self {
SearchResult::File {
path,
score,
indices,
}
}
pub fn new_file_line(
score: i64,
indices: Vec<usize>,
path: String,
line: String,
line_number: usize,
) -> Self {
SearchResult::LineInFile {
path,
score,
indices,
line,
line_number,
}
}
pub fn score(&self) -> i64 {
match self {
SearchResult::File { score, .. } => *score,
SearchResult::LineInFile { score, .. } => *score,
}
}
pub fn rendered_height(&self) -> usize {
match self {
SearchResult::File { .. } => 1,
SearchResult::LineInFile { .. } => 2,
}
}
pub fn render(&self, max_width: usize, is_selected: bool) -> String {
let green_code = 154;
let orange_code = 166;
let bold_code = "\u{1b}[1m";
let green_foreground = format!("\u{1b}[38;5;{}m", green_code);
let orange_foreground = format!("\u{1b}[38;5;{}m", orange_code);
let reset_code = "\u{1b}[m";
let max_width = max_width.saturating_sub(3); // for the UI left line separator
match self {
SearchResult::File { path, indices, .. } => {
if is_selected {
let line = self.render_line_with_indices(
path,
indices,
max_width,
None,
Some(green_code),
true,
);
format!("{} | {}{}", green_foreground, reset_code, line)
} else {
let line =
self.render_line_with_indices(path, indices, max_width, None, None, true);
format!(" | {}", line)
}
},
SearchResult::LineInFile {
path,
line,
line_number,
indices,
..
} => {
if is_selected {
let first_line = self.render_line_with_indices(
path,
&vec![],
max_width,
None,
Some(green_code),
true,
);
let line_indication_text = format!("{}-> {}", bold_code, line_number);
let line_indication = format!(
"{}{}{}",
orange_foreground, line_indication_text, reset_code
); // TODO: also truncate
let second_line = self.render_line_with_indices(
line,
indices,
max_width.saturating_sub(line_indication_text.width()),
None,
Some(orange_code),
false,
);
format!(
" {}│{} {}\n {}│{} {} {}",
green_foreground,
reset_code,
first_line,
green_foreground,
reset_code,
line_indication,
second_line
)
} else {
let first_line =
self.render_line_with_indices(path, &vec![], max_width, None, None, true); // TODO:
let line_indication_text = format!("{}-> {}", bold_code, line_number);
let second_line = self.render_line_with_indices(
line,
indices,
max_width.saturating_sub(line_indication_text.width()),
None,
None,
false,
);
format!(
" │ {}\n │ {} {}",
first_line, line_indication_text, second_line
)
}
},
}
}
fn render_line_with_indices(
&self,
line_to_render: &String,
indices: &Vec<usize>,
max_width: usize,
background_color: Option<usize>,
foreground_color: Option<usize>,
is_bold: bool,
) -> String {
// TODO: get these from Zellij
let reset_code = "\u{1b}[m";
let underline_code = "\u{1b}[4m";
let foreground_color = foreground_color
.map(|c| format!("\u{1b}[38;5;{}m", c))
.unwrap_or_else(|| format!(""));
let background_color = background_color
.map(|c| format!("\u{1b}[48;5;{}m", c))
.unwrap_or_else(|| format!(""));
let bold = if is_bold { "\u{1b}[1m" } else { "" };
let non_index_character_style = format!("{}{}{}", background_color, foreground_color, bold);
let index_character_style = format!(
"{}{}{}{}",
background_color, foreground_color, bold, underline_code
);
let mut truncate_start_position = None;
let mut truncate_end_position = None;
if line_to_render.width() > max_width {
let length_of_each_half = max_width.saturating_sub(4) / 2;
truncate_start_position = Some(length_of_each_half);
truncate_end_position =
Some(line_to_render.width().saturating_sub(length_of_each_half));
}
let mut first_half = format!("{}", reset_code);
let mut second_half = format!("{}", reset_code);
for (i, character) in line_to_render.chars().enumerate() {
if (truncate_start_position.is_none() && truncate_end_position.is_none())
|| Some(i) < truncate_start_position
{
if indices.contains(&i) {
first_half.push_str(&index_character_style);
first_half.push(character);
first_half.push_str(reset_code);
} else {
first_half.push_str(&non_index_character_style);
first_half.push(character);
first_half.push_str(reset_code);
}
} else if Some(i) > truncate_end_position {
if indices.contains(&i) {
second_half.push_str(&index_character_style);
second_half.push(character);
second_half.push_str(reset_code);
} else {
second_half.push_str(&non_index_character_style);
second_half.push(character);
second_half.push_str(reset_code);
}
}
}
if let Some(_truncate_start_position) = truncate_start_position {
format!(
"{}{}{}[..]{}{}{}",
first_half, reset_code, foreground_color, reset_code, second_half, reset_code
)
} else {
format!("{}{}", first_half, reset_code)
}
}
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ResultsOfSearch {
pub search_term: String,
pub search_results: Vec<SearchResult>,
}
impl ResultsOfSearch {
pub fn new(search_term: String, search_results: Vec<SearchResult>) -> Self {
ResultsOfSearch {
search_term,
search_results,
}
}
pub fn limit_search_results(mut self, max_results: usize) -> Self {
self.search_results
.sort_by(|a, b| b.score().cmp(&a.score()));
self.search_results = if self.search_results.len() > max_results {
self.search_results.drain(..max_results).collect()
} else {
self.search_results.drain(..).collect()
};
self
}
}
#[derive(Default, Serialize, Deserialize)]
pub struct SearchWorker {
pub search_paths: Vec<String>,
pub search_file_contents: Vec<(String, usize, String)>, // file_name, line_number, line
skip_hidden_files: bool,
}
impl<'de> ZellijWorker<'de> for SearchWorker {
// TODO: handle out of order messages, likely when rendering
fn on_message(&mut self, message: String, payload: String) {
match message.as_str() {
// TODO: deserialize to type
"scan_folder" => {
self.populate_search_paths();
post_message_to_plugin("done_scanning_folder".into(), "".into());
},
"search" => {
let search_term = payload;
let (search_term, matches) = self.search(search_term);
let search_results =
ResultsOfSearch::new(search_term, matches).limit_search_results(100);
post_message_to_plugin(
"update_search_results".into(),
serde_json::to_string(&search_results).unwrap(),
);
},
"skip_hidden_files" => match serde_json::from_str::<bool>(&payload) {
Ok(should_skip_hidden_files) => {
self.skip_hidden_files = should_skip_hidden_files;
},
Err(e) => {
eprintln!("Failed to deserialize payload: {:?}", e);
},
},
_ => {},
}
}
}
impl SearchWorker {
fn search(&mut self, search_term: String) -> (String, Vec<SearchResult>) {
if self.search_paths.is_empty() {
self.populate_search_paths();
}
let mut matches = vec![];
let mut matcher = SkimMatcherV2::default().use_cache(true).element_limit(100); // TODO: no hard
// coded limit!
self.search_file_names(&search_term, &mut matcher, &mut matches);
self.search_file_contents(&search_term, &mut matcher, &mut matches);
// if the search term changed before we finished, let's search again!
if let Ok(current_search_term) = std::fs::read(CURRENT_SEARCH_TERM) {
let current_search_term = String::from_utf8_lossy(&current_search_term); // TODO: not lossy, search can be lots of stuff
if current_search_term != search_term {
return self.search(current_search_term.into());
}
}
(search_term, matches)
}
fn populate_search_paths(&mut self) {
for entry in WalkDir::new(ROOT).into_iter().filter_map(|e| e.ok()) {
if self.skip_hidden_files
&& entry
.file_name()
.to_str()
.map(|s| s.starts_with('.'))
.unwrap_or(false)
{
continue;
}
let file_path = entry.path().display().to_string();
if entry.metadata().unwrap().is_file() {
if let Ok(file) = std::fs::File::open(&file_path) {
let lines = io::BufReader::new(file).lines();
for (index, line) in lines.enumerate() {
match line {
Ok(line) => {
self.search_file_contents.push((
file_path.clone(),
index + 1,
line,
));
},
Err(_) => {
break; // probably a binary file, skip it
},
}
}
}
}
self.search_paths.push(file_path);
}
}
fn search_file_names(
&self,
search_term: &str,
matcher: &mut SkimMatcherV2,
matches: &mut Vec<SearchResult>,
) {
for entry in &self.search_paths {
if let Some((score, indices)) = matcher.fuzzy_indices(&entry, &search_term) {
matches.push(SearchResult::new_file_name(
score,
indices,
entry.to_owned(),
));
}
}
}
fn search_file_contents(
&self,
search_term: &str,
matcher: &mut SkimMatcherV2,
matches: &mut Vec<SearchResult>,
) {
for (file_name, line_number, line_entry) in &self.search_file_contents {
if let Some((score, indices)) = matcher.fuzzy_indices(&line_entry, &search_term) {
matches.push(SearchResult::new_file_line(
score,
indices,
file_name.clone(),
line_entry.clone(),
*line_number,
));
}
}
}
}
impl State {
pub fn render_search(&mut self, rows: usize, cols: usize) {
if let Some(search_term) = self.search_term.as_ref() {
let mut to_render = String::new();
to_render.push_str(&format!(
" \u{1b}[38;5;51;1mSEARCH:\u{1b}[m {}\n",
search_term
));
let mut rows_left_to_render = rows.saturating_sub(3);
if self.loading && self.search_results.is_empty() {
to_render.push_str(&self.render_loading());
}
for (i, result) in self
.search_results
.iter()
.enumerate()
.take(rows.saturating_sub(3))
{
let result_height = result.rendered_height();
if result_height + 1 > rows_left_to_render {
break;
}
rows_left_to_render -= result_height;
rows_left_to_render -= 1; // space between
let is_selected = i == self.selected_search_result;
let rendered_result = result.render(cols, is_selected);
to_render.push_str(&format!("\n{}\n", rendered_result));
}
print!("{}", to_render);
}
}
pub fn render_loading(&self) -> String {
let mut rendered = String::from("Scanning folder");
let dot_count = self.loading_animation_offset % 4;
for _ in 0..dot_count {
rendered.push('.');
}
rendered
}
}

View file

@ -1,3 +1,4 @@
use crate::search::SearchResult;
use pretty_bytes::converter as pb;
use std::{
collections::{HashMap, VecDeque},
@ -7,7 +8,9 @@ use std::{
};
use zellij_tile::prelude::*;
const ROOT: &str = "/host";
pub const ROOT: &str = "/host";
pub const CURRENT_SEARCH_TERM: &str = "/data/current_search_term";
#[derive(Default)]
pub struct State {
pub path: PathBuf,
@ -15,9 +18,66 @@ pub struct State {
pub cursor_hist: HashMap<PathBuf, (usize, usize)>,
pub hide_hidden_files: bool,
pub ev_history: VecDeque<(Event, Instant)>, // stores last event, can be expanded in future
pub search_paths: Vec<String>,
pub search_term: Option<String>,
pub search_results: Vec<SearchResult>,
pub loading: bool,
pub loading_animation_offset: u8,
pub typing_search_term: bool,
pub exploring_search_results: bool,
pub selected_search_result: usize,
}
impl State {
pub fn append_to_search_term(&mut self, key: Key) {
match key {
Key::Char(character) => {
if let Some(search_term) = self.search_term.as_mut() {
search_term.push(character);
}
},
Key::Backspace => {
if let Some(search_term) = self.search_term.as_mut() {
search_term.pop();
if search_term.len() == 0 {
self.search_term = None;
self.typing_search_term = false;
}
}
},
_ => {},
}
}
pub fn accept_search_term(&mut self) {
self.typing_search_term = false;
self.exploring_search_results = true;
}
pub fn typing_search_term(&self) -> bool {
self.typing_search_term
}
pub fn exploring_search_results(&self) -> bool {
self.exploring_search_results
}
pub fn stop_exploring_search_results(&mut self) {
self.exploring_search_results = false;
}
pub fn start_typing_search_term(&mut self) {
if self.search_term.is_none() {
self.search_term = Some(String::new());
}
self.typing_search_term = true;
}
pub fn stop_typing_search_term(&mut self) {
self.typing_search_term = true;
}
pub fn move_search_selection_up(&mut self) {
self.selected_search_result = self.selected_search_result.saturating_sub(1);
}
pub fn move_search_selection_down(&mut self) {
if self.selected_search_result < self.search_results.len() {
self.selected_search_result = self.selected_search_result.saturating_add(1);
}
}
pub fn selected_mut(&mut self) -> &mut usize {
&mut self.cursor_hist.entry(self.path.clone()).or_default().0
}
@ -44,6 +104,32 @@ impl State {
}
}
}
pub fn open_search_result(&mut self) {
match self.search_results.get(self.selected_search_result) {
Some(SearchResult::File {
path,
score,
indices,
}) => {
let file_path = PathBuf::from(path);
open_file(file_path.strip_prefix(ROOT).unwrap());
},
Some(SearchResult::LineInFile {
path,
score,
indices,
line,
line_number,
}) => {
let file_path = PathBuf::from(path);
open_file_with_line(file_path.strip_prefix(ROOT).unwrap(), *line_number);
// open_file_with_line(&file_path, *line_number); // TODO: no!!
},
None => {
eprintln!("Search result not found");
},
}
}
}
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone)]

View file

@ -4,7 +4,10 @@ use crate::{
flags::{self, CiCmd, Cross, E2e},
};
use anyhow::Context;
use std::{ffi::OsString, path::PathBuf};
use std::{
ffi::OsString,
path::{Path, PathBuf},
};
use xshell::{cmd, Shell};
pub fn main(sh: &Shell, flags: flags::Ci) -> anyhow::Result<()> {
@ -105,13 +108,27 @@ fn e2e_test(sh: &Shell, args: Vec<OsString>) -> anyhow::Result<()> {
// set --no-default-features so the test binary gets built with the plugins from assets/plugins that just got built
crate::cargo()
.and_then(|cargo| {
// e2e tests
cmd!(
sh,
"{cargo} test --no-default-features -- --ignored --nocapture --test-threads 1"
)
.args(args)
.args(args.clone())
.run()
.map_err(anyhow::Error::new)
.map_err(anyhow::Error::new)?;
// plugin system tests are run here because they're medium-slow
let _pd = sh.push_dir(Path::new("zellij-server"));
println!("");
let msg = format!(">> Testing Plugin System");
crate::status(&msg);
println!("{}", msg);
cmd!(sh, "{cargo} test -- --ignored --nocapture --test-threads 1")
.args(args.clone())
.run()
.with_context(|| format!("Failed to run tests for the Plugin System"))?;
Ok(())
})
.context(err_context)
}

View file

@ -34,6 +34,7 @@ lazy_static::lazy_static! {
WorkspaceMember{crate_name: "default-plugins/status-bar", build: true},
WorkspaceMember{crate_name: "default-plugins/strider", build: true},
WorkspaceMember{crate_name: "default-plugins/tab-bar", build: true},
WorkspaceMember{crate_name: "default-plugins/fixture-plugin-for-tests", build: true},
WorkspaceMember{crate_name: "zellij-utils", build: false},
WorkspaceMember{crate_name: "zellij-tile-utils", build: false},
WorkspaceMember{crate_name: "zellij-tile", build: false},

View file

@ -35,6 +35,8 @@ semver = "0.11.0"
[dev-dependencies]
insta = "1.6.0"
tempfile = "3.2.0"
wasmer = { version = "2.3.0", features = [ "singlepass" ] }
[features]
singlepass = ["wasmer/singlepass"]

View file

@ -3,6 +3,7 @@ use std::{
io::{Read, Seek, Write},
};
use crate::plugins::PluginId;
use log::{debug, error};
use wasmer_wasi::{WasiFile, WasiFsError};
use zellij_utils::{errors::prelude::*, serde};
@ -17,11 +18,11 @@ const ZELLIJ_MAX_PIPE_BUFFER_SIZE: usize = 16_384;
pub struct LoggingPipe {
buffer: VecDeque<u8>,
plugin_name: String,
plugin_id: u32,
plugin_id: PluginId,
}
impl LoggingPipe {
pub fn new(plugin_name: &str, plugin_id: u32) -> LoggingPipe {
pub fn new(plugin_name: &str, plugin_id: PluginId) -> LoggingPipe {
LoggingPipe {
buffer: VecDeque::new(),
plugin_name: String::from(plugin_name),

View file

@ -22,6 +22,8 @@ use zellij_utils::{
pane_size::Size,
};
pub type PluginId = u32;
#[derive(Clone, Debug)]
pub enum PluginInstruction {
Load(
@ -32,8 +34,8 @@ pub enum PluginInstruction {
ClientId,
Size,
),
Update(Vec<(Option<u32>, Option<ClientId>, Event)>), // Focused plugin / broadcast, client_id, event data
Unload(u32), // plugin_id
Update(Vec<(Option<PluginId>, Option<ClientId>, Event)>), // Focused plugin / broadcast, client_id, event data
Unload(PluginId), // plugin_id
Reload(
Option<bool>, // should float
Option<String>, // pane title
@ -41,7 +43,7 @@ pub enum PluginInstruction {
usize, // tab index
Size,
),
Resize(u32, usize, usize), // plugin_id, columns, rows
Resize(PluginId, usize, usize), // plugin_id, columns, rows
AddClient(ClientId),
RemoveClient(ClientId),
NewTab(
@ -52,7 +54,23 @@ pub enum PluginInstruction {
usize, // tab_index
ClientId,
),
ApplyCachedEvents(Vec<u32>), // a list of plugin id
ApplyCachedEvents(Vec<PluginId>),
ApplyCachedWorkerMessages(PluginId),
PostMessagesToPluginWorker(
PluginId,
ClientId,
String, // worker name
Vec<(
String, // serialized message name
String, // serialized payload
)>,
),
PostMessageToPlugin(
PluginId,
ClientId,
String, // serialized message
String, // serialized payload
),
Exit,
}
@ -69,6 +87,13 @@ impl From<&PluginInstruction> for PluginContext {
PluginInstruction::RemoveClient(_) => PluginContext::RemoveClient,
PluginInstruction::NewTab(..) => PluginContext::NewTab,
PluginInstruction::ApplyCachedEvents(..) => PluginContext::ApplyCachedEvents,
PluginInstruction::ApplyCachedWorkerMessages(..) => {
PluginContext::ApplyCachedWorkerMessages
},
PluginInstruction::PostMessagesToPluginWorker(..) => {
PluginContext::PostMessageToPluginWorker
},
PluginInstruction::PostMessageToPlugin(..) => PluginContext::PostMessageToPlugin,
}
}
}
@ -163,7 +188,7 @@ pub(crate) fn plugin_thread_main(
tab_index,
client_id,
) => {
let mut plugin_ids: HashMap<RunPluginLocation, Vec<u32>> = HashMap::new();
let mut plugin_ids: HashMap<RunPluginLocation, Vec<PluginId>> = HashMap::new();
let mut extracted_run_instructions = tab_layout
.clone()
.unwrap_or_else(|| layout.new_tab().0)
@ -199,6 +224,30 @@ pub(crate) fn plugin_thread_main(
PluginInstruction::ApplyCachedEvents(plugin_id) => {
wasm_bridge.apply_cached_events(plugin_id)?;
},
PluginInstruction::ApplyCachedWorkerMessages(plugin_id) => {
wasm_bridge.apply_cached_worker_messages(plugin_id)?;
},
PluginInstruction::PostMessagesToPluginWorker(
plugin_id,
client_id,
worker_name,
messages,
) => {
wasm_bridge.post_messages_to_plugin_worker(
plugin_id,
client_id,
worker_name,
messages,
)?;
},
PluginInstruction::PostMessageToPlugin(plugin_id, client_id, message, payload) => {
let updates = vec![(
Some(plugin_id),
Some(client_id),
Event::CustomMessage(message, payload),
)];
wasm_bridge.update_plugins(updates)?;
},
PluginInstruction::Exit => {
wasm_bridge.cleanup();
break;
@ -218,3 +267,7 @@ pub(crate) fn plugin_thread_main(
})
.context("failed to cleanup plugin data directory")
}
#[path = "./unit/plugin_tests.rs"]
#[cfg(test)]
mod plugin_tests;

View file

@ -1,5 +1,8 @@
use crate::plugins::plugin_map::{PluginEnv, PluginMap, RunningPlugin, Subscriptions};
use crate::plugins::plugin_map::{
PluginEnv, PluginMap, RunningPlugin, RunningWorker, Subscriptions,
};
use crate::plugins::zellij_exports::{wasi_read_string, zellij_exports};
use crate::plugins::PluginId;
use highway::{HighwayHash, PortableHash};
use log::info;
use semver::Version;
@ -19,7 +22,7 @@ use crate::{
};
use zellij_utils::{
consts::{VERSION, ZELLIJ_CACHE_DIR, ZELLIJ_TMP_DIR},
consts::{VERSION, ZELLIJ_CACHE_DIR, ZELLIJ_SESSION_CACHE_DIR, ZELLIJ_TMP_DIR},
errors::prelude::*,
input::plugins::PluginConfig,
pane_size::Size,
@ -152,7 +155,7 @@ pub struct PluginLoader<'a> {
plugin_path: PathBuf,
loading_indication: &'a mut LoadingIndication,
senders: ThreadSenders,
plugin_id: u32,
plugin_id: PluginId,
client_id: ClientId,
store: Store,
plugin: PluginConfig,
@ -165,7 +168,7 @@ pub struct PluginLoader<'a> {
impl<'a> PluginLoader<'a> {
pub fn reload_plugin_from_memory(
plugin_id: u32,
plugin_id: PluginId,
plugin_dir: PathBuf,
plugin_cache: Arc<Mutex<HashMap<PathBuf, Module>>>,
senders: ThreadSenders,
@ -194,9 +197,7 @@ impl<'a> PluginLoader<'a> {
)?;
plugin_loader
.load_module_from_memory()
.and_then(|module| {
plugin_loader.create_plugin_instance_environment_and_subscriptions(module)
})
.and_then(|module| plugin_loader.create_plugin_environment(module))
.and_then(|(instance, plugin_env, subscriptions)| {
plugin_loader.load_plugin_instance(
&instance,
@ -214,7 +215,7 @@ impl<'a> PluginLoader<'a> {
}
pub fn start_plugin(
plugin_id: u32,
plugin_id: PluginId,
client_id: ClientId,
plugin: &PluginConfig,
tab_index: usize,
@ -227,7 +228,7 @@ impl<'a> PluginLoader<'a> {
connected_clients: Arc<Mutex<Vec<ClientId>>>,
loading_indication: &mut LoadingIndication,
) -> Result<()> {
let err_context = || format!("failed to start plugin {plugin:#?} for client {client_id}");
let err_context = || format!("failed to start plugin {plugin_id} for client {client_id}");
let mut plugin_loader = PluginLoader::new(
&plugin_cache,
loading_indication,
@ -244,9 +245,7 @@ impl<'a> PluginLoader<'a> {
.load_module_from_memory()
.or_else(|_e| plugin_loader.load_module_from_hd_cache())
.or_else(|_e| plugin_loader.compile_module())
.and_then(|module| {
plugin_loader.create_plugin_instance_environment_and_subscriptions(module)
})
.and_then(|module| plugin_loader.create_plugin_environment(module))
.and_then(|(instance, plugin_env, subscriptions)| {
plugin_loader.load_plugin_instance(
&instance,
@ -276,7 +275,7 @@ impl<'a> PluginLoader<'a> {
loading_indication: &mut LoadingIndication,
) -> Result<()> {
let mut new_plugins = HashSet::new();
for (&(plugin_id, _), _) in &*plugin_map.lock().unwrap() {
for plugin_id in plugin_map.lock().unwrap().plugin_ids() {
new_plugins.insert((plugin_id, client_id));
}
for (plugin_id, existing_client_id) in new_plugins {
@ -292,9 +291,7 @@ impl<'a> PluginLoader<'a> {
)?;
plugin_loader
.load_module_from_memory()
.and_then(|module| {
plugin_loader.create_plugin_instance_environment_and_subscriptions(module)
})
.and_then(|module| plugin_loader.create_plugin_environment(module))
.and_then(|(instance, plugin_env, subscriptions)| {
plugin_loader.load_plugin_instance(
&instance,
@ -309,7 +306,7 @@ impl<'a> PluginLoader<'a> {
}
pub fn reload_plugin(
plugin_id: u32,
plugin_id: PluginId,
plugin_dir: PathBuf,
plugin_cache: Arc<Mutex<HashMap<PathBuf, Module>>>,
senders: ThreadSenders,
@ -339,9 +336,7 @@ impl<'a> PluginLoader<'a> {
)?;
plugin_loader
.compile_module()
.and_then(|module| {
plugin_loader.create_plugin_instance_environment_and_subscriptions(module)
})
.and_then(|module| plugin_loader.create_plugin_environment(module))
.and_then(|(instance, plugin_env, subscriptions)| {
plugin_loader.load_plugin_instance(
&instance,
@ -361,7 +356,7 @@ impl<'a> PluginLoader<'a> {
plugin_cache: &Arc<Mutex<HashMap<PathBuf, Module>>>,
loading_indication: &'a mut LoadingIndication,
senders: &ThreadSenders,
plugin_id: u32,
plugin_id: PluginId,
client_id: ClientId,
store: &Store,
plugin: PluginConfig,
@ -369,7 +364,9 @@ impl<'a> PluginLoader<'a> {
tab_index: usize,
size: Size,
) -> Result<Self> {
let plugin_own_data_dir = ZELLIJ_CACHE_DIR.join(Url::from(&plugin.location).to_string());
let plugin_own_data_dir = ZELLIJ_SESSION_CACHE_DIR
.join(Url::from(&plugin.location).to_string())
.join(format!("{}-{}", plugin_id, client_id));
create_plugin_fs_entries(&plugin_own_data_dir)?;
let plugin_path = plugin.path.clone();
Ok(PluginLoader {
@ -393,16 +390,16 @@ impl<'a> PluginLoader<'a> {
plugin_map: &Arc<Mutex<PluginMap>>,
loading_indication: &'a mut LoadingIndication,
senders: &ThreadSenders,
plugin_id: u32,
plugin_id: PluginId,
client_id: ClientId,
store: &Store,
plugin_dir: &'a PathBuf,
) -> Result<Self> {
let err_context = || "Failed to find existing plugin";
let (running_plugin, _subscriptions) = {
let (running_plugin, _subscriptions, _workers) = {
let mut plugin_map = plugin_map.lock().unwrap();
plugin_map
.remove(&(plugin_id, client_id))
.remove_single_plugin(plugin_id, client_id)
.with_context(err_context)?
};
let running_plugin = running_plugin.lock().unwrap();
@ -431,19 +428,17 @@ impl<'a> PluginLoader<'a> {
plugin_map: &Arc<Mutex<PluginMap>>,
loading_indication: &'a mut LoadingIndication,
senders: &ThreadSenders,
plugin_id: u32,
plugin_id: PluginId,
client_id: ClientId,
store: &Store,
plugin_dir: &'a PathBuf,
) -> Result<Self> {
let err_context = || "Failed to find existing plugin";
let (running_plugin, _subscriptions) = {
let running_plugin = {
let plugin_map = plugin_map.lock().unwrap();
plugin_map
.iter()
.find(|((p_id, _c_id), _)| p_id == &plugin_id)
.get_running_plugin(plugin_id, None)
.with_context(err_context)?
.1
.clone()
};
let running_plugin = running_plugin.lock().unwrap();
@ -553,49 +548,13 @@ impl<'a> PluginLoader<'a> {
.with_context(err_context)?;
Ok(module)
}
pub fn create_plugin_instance_environment_and_subscriptions(
pub fn create_plugin_environment(
&mut self,
module: Module,
) -> Result<(Instance, PluginEnv, Arc<Mutex<Subscriptions>>)> {
let err_context = || {
format!(
"Failed to create instance and plugin env for plugin {}",
self.plugin_id
)
};
let mut wasi_env = WasiState::new("Zellij")
.env("CLICOLOR_FORCE", "1")
.map_dir("/host", ".")
.and_then(|wasi| wasi.map_dir("/data", &self.plugin_own_data_dir))
.and_then(|wasi| wasi.map_dir("/tmp", ZELLIJ_TMP_DIR.as_path()))
.and_then(|wasi| {
wasi.stdin(Box::new(Pipe::new()))
.stdout(Box::new(Pipe::new()))
.stderr(Box::new(LoggingPipe::new(
&self.plugin.location.to_string(),
self.plugin_id,
)))
.finalize()
})
.with_context(err_context)?;
let wasi = wasi_env.import_object(&module).with_context(err_context)?;
let mut mut_plugin = self.plugin.clone();
mut_plugin.set_tab_index(self.tab_index);
let plugin_env = PluginEnv {
plugin_id: self.plugin_id,
client_id: self.client_id,
plugin: mut_plugin,
senders: self.senders.clone(),
wasi_env,
plugin_own_data_dir: self.plugin_own_data_dir.clone(),
tab_index: self.tab_index,
};
let subscriptions = Arc::new(Mutex::new(HashSet::new()));
let zellij = zellij_exports(&self.store, &plugin_env, &subscriptions);
let instance =
Instance::new(&module, &zellij.chain_back(wasi)).with_context(err_context)?;
let err_context = || format!("Failed to create environment for plugin");
let (instance, plugin_env, subscriptions) =
self.create_plugin_instance_env_and_subscriptions(&module)?;
assert_plugin_version(&instance, &plugin_env).with_context(err_context)?;
// Only do an insert when everything went well!
let cloned_plugin = self.plugin.clone();
@ -605,6 +564,26 @@ impl<'a> PluginLoader<'a> {
.insert(cloned_plugin.path, module);
Ok((instance, plugin_env, subscriptions))
}
pub fn create_plugin_instance_and_wasi_env_for_worker(
&mut self,
) -> Result<(Instance, PluginEnv)> {
let err_context = || {
format!(
"Failed to create instance and plugin env for worker {}",
self.plugin_id
)
};
let module = self
.plugin_cache
.lock()
.unwrap()
.get(&self.plugin.path)
.with_context(err_context)?
.clone();
let (instance, plugin_env, _subscriptions) =
self.create_plugin_instance_env_and_subscriptions(&module)?;
Ok((instance, plugin_env))
}
pub fn load_plugin_instance(
&mut self,
instance: &Instance,
@ -621,11 +600,35 @@ impl<'a> PluginLoader<'a> {
self.senders,
self.plugin_id
);
let load_function = instance
let start_function = instance
.exports
.get_function("_start")
.with_context(err_context)?;
// This eventually calls the `.load()` method
let load_function = instance
.exports
.get_function("load")
.with_context(err_context)?;
let mut workers = HashMap::new();
for (function_name, _exported_function) in instance.exports.iter().functions() {
if function_name.ends_with("_worker") {
let plugin_config = self.plugin.clone();
let (instance, plugin_env) =
self.create_plugin_instance_and_wasi_env_for_worker()?;
let start_function_for_worker = instance
.exports
.get_function("_start")
.with_context(err_context)?;
start_function_for_worker
.call(&[])
.with_context(err_context)?;
let worker =
RunningWorker::new(instance, &function_name, plugin_config, plugin_env);
workers.insert(function_name.into(), Arc::new(Mutex::new(worker)));
}
}
start_function.call(&[]).with_context(err_context)?;
load_function.call(&[]).with_context(err_context)?;
display_loading_stage!(
indicate_starting_plugin_success,
@ -640,16 +643,16 @@ impl<'a> PluginLoader<'a> {
self.plugin_id
);
plugin_map.lock().unwrap().insert(
(self.plugin_id, self.client_id),
(
Arc::new(Mutex::new(RunningPlugin::new(
main_user_instance,
main_user_env,
self.size.rows,
self.size.cols,
))),
subscriptions.clone(),
),
self.plugin_id,
self.client_id,
Arc::new(Mutex::new(RunningPlugin::new(
main_user_instance,
main_user_env,
self.size.rows,
self.size.cols,
))),
subscriptions.clone(),
workers,
);
display_loading_stage!(
indicate_writing_plugin_to_cache_success,
@ -672,6 +675,10 @@ impl<'a> PluginLoader<'a> {
self.plugin_id
);
for client_id in connected_clients {
if client_id == &self.client_id {
// don't reload the plugin once more for ourselves
continue;
}
let mut loading_indication = LoadingIndication::new("".into());
let mut plugin_loader_for_client = PluginLoader::new_from_different_client_id(
&self.plugin_cache.clone(),
@ -685,10 +692,7 @@ impl<'a> PluginLoader<'a> {
)?;
plugin_loader_for_client
.load_module_from_memory()
.and_then(|module| {
plugin_loader_for_client
.create_plugin_instance_environment_and_subscriptions(module)
})
.and_then(|module| plugin_loader_for_client.create_plugin_environment(module))
.and_then(|(instance, plugin_env, subscriptions)| {
plugin_loader_for_client.load_plugin_instance(
&instance,
@ -730,6 +734,51 @@ impl<'a> PluginLoader<'a> {
},
}
}
fn create_plugin_instance_env_and_subscriptions(
&self,
module: &Module,
) -> Result<(Instance, PluginEnv, Arc<Mutex<Subscriptions>>)> {
let err_context = || {
format!(
"Failed to create instance, plugin env and subscriptions for plugin {}",
self.plugin_id
)
};
let mut wasi_env = WasiState::new("Zellij")
.env("CLICOLOR_FORCE", "1")
.map_dir("/host", ".")
.and_then(|wasi| wasi.map_dir("/data", &self.plugin_own_data_dir))
.and_then(|wasi| wasi.map_dir("/tmp", ZELLIJ_TMP_DIR.as_path()))
.and_then(|wasi| {
wasi.stdin(Box::new(Pipe::new()))
.stdout(Box::new(Pipe::new()))
.stderr(Box::new(LoggingPipe::new(
&self.plugin.location.to_string(),
self.plugin_id,
)))
.finalize()
})
.with_context(err_context)?;
let wasi = wasi_env.import_object(&module).with_context(err_context)?;
let mut mut_plugin = self.plugin.clone();
mut_plugin.set_tab_index(self.tab_index);
let plugin_env = PluginEnv {
plugin_id: self.plugin_id,
client_id: self.client_id,
plugin: mut_plugin,
senders: self.senders.clone(),
wasi_env,
plugin_own_data_dir: self.plugin_own_data_dir.clone(),
tab_index: self.tab_index,
};
let subscriptions = Arc::new(Mutex::new(HashSet::new()));
let zellij = zellij_exports(&self.store, &plugin_env, &subscriptions);
let instance =
Instance::new(&module, &zellij.chain_back(wasi)).with_context(err_context)?;
Ok((instance, plugin_env, subscriptions))
}
}
fn create_plugin_fs_entries(plugin_own_data_dir: &PathBuf) -> Result<()> {

View file

@ -1,4 +1,6 @@
use crate::plugins::wasm_bridge::PluginId;
use crate::plugins::plugin_loader::{PluginLoader, VersionMismatchError};
use crate::plugins::zellij_exports::wasi_write_object;
use crate::plugins::PluginId;
use std::{
collections::{HashMap, HashSet},
path::PathBuf,
@ -9,20 +11,183 @@ use wasmer_wasi::WasiEnv;
use crate::{thread_bus::ThreadSenders, ClientId};
use zellij_utils::{data::EventType, input::plugins::PluginConfig};
use zellij_utils::errors::prelude::*;
use zellij_utils::{
consts::VERSION, data::EventType, input::layout::RunPluginLocation,
input::plugins::PluginConfig,
};
// the idea here is to provide atomicity when adding/removing plugins from the map (eg. when a new
// client connects) but to also allow updates/renders not to block each other
// so when adding/removing from the map - everything is halted, that's life
// but when cloning the internal RunningPlugin and Subscriptions atomics, we can call methods on
// them without blocking other instances
pub type PluginMap =
HashMap<(PluginId, ClientId), (Arc<Mutex<RunningPlugin>>, Arc<Mutex<Subscriptions>>)>;
#[derive(Default)]
pub struct PluginMap {
plugin_assets: HashMap<
(PluginId, ClientId),
(
Arc<Mutex<RunningPlugin>>,
Arc<Mutex<Subscriptions>>,
HashMap<String, Arc<Mutex<RunningWorker>>>,
),
>,
}
impl PluginMap {
pub fn remove_plugins(
&mut self,
pid: PluginId,
) -> Vec<(
Arc<Mutex<RunningPlugin>>,
Arc<Mutex<Subscriptions>>,
HashMap<String, Arc<Mutex<RunningWorker>>>,
)> {
let mut removed = vec![];
let ids_in_plugin_map: Vec<(PluginId, ClientId)> =
self.plugin_assets.keys().copied().collect();
for (plugin_id, client_id) in ids_in_plugin_map {
if pid == plugin_id {
if let Some(plugin_asset) = self.plugin_assets.remove(&(plugin_id, client_id)) {
removed.push(plugin_asset);
}
}
}
removed
}
pub fn remove_single_plugin(
&mut self,
plugin_id: PluginId,
client_id: ClientId,
) -> Option<(
Arc<Mutex<RunningPlugin>>,
Arc<Mutex<Subscriptions>>,
HashMap<String, Arc<Mutex<RunningWorker>>>,
)> {
self.plugin_assets.remove(&(plugin_id, client_id))
}
pub fn plugin_ids(&self) -> Vec<PluginId> {
let mut unique_plugins: HashSet<PluginId> = self
.plugin_assets
.keys()
.map(|(plugin_id, _client_id)| *plugin_id)
.collect();
unique_plugins.drain().into_iter().collect()
}
pub fn running_plugins(&mut self) -> Vec<(PluginId, ClientId, Arc<Mutex<RunningPlugin>>)> {
self.plugin_assets
.iter()
.map(|((plugin_id, client_id), (running_plugin, _, _))| {
(*plugin_id, *client_id, running_plugin.clone())
})
.collect()
}
pub fn running_plugins_and_subscriptions(
&mut self,
) -> Vec<(
PluginId,
ClientId,
Arc<Mutex<RunningPlugin>>,
Arc<Mutex<Subscriptions>>,
)> {
self.plugin_assets
.iter()
.map(
|((plugin_id, client_id), (running_plugin, subscriptions, _))| {
(
*plugin_id,
*client_id,
running_plugin.clone(),
subscriptions.clone(),
)
},
)
.collect()
}
pub fn get_running_plugin_and_subscriptions(
&self,
plugin_id: PluginId,
client_id: ClientId,
) -> Option<(Arc<Mutex<RunningPlugin>>, Arc<Mutex<Subscriptions>>)> {
self.plugin_assets.get(&(plugin_id, client_id)).and_then(
|(running_plugin, subscriptions, _)| {
Some((running_plugin.clone(), subscriptions.clone()))
},
)
}
pub fn get_running_plugin(
&self,
plugin_id: PluginId,
client_id: Option<ClientId>,
) -> Option<Arc<Mutex<RunningPlugin>>> {
match client_id {
Some(client_id) => self
.plugin_assets
.get(&(plugin_id, client_id))
.and_then(|(running_plugin, _, _)| Some(running_plugin.clone())),
None => self
.plugin_assets
.iter()
.find(|((p_id, _), _)| *p_id == plugin_id)
.and_then(|(_, (running_plugin, _, _))| Some(running_plugin.clone())),
}
}
pub fn clone_worker(
&self,
plugin_id: PluginId,
client_id: ClientId,
worker_name: &str,
) -> Option<Arc<Mutex<RunningWorker>>> {
self.plugin_assets
.iter()
.find(|((p_id, c_id), _)| p_id == &plugin_id && c_id == &client_id)
.and_then(|(_, (_running_plugin, _subscriptions, workers))| {
if let Some(worker) = workers.get(&format!("{}_worker", worker_name)) {
Some(worker.clone())
} else {
None
}
})
.clone()
}
pub fn all_plugin_ids_for_plugin_location(
&self,
plugin_location: &RunPluginLocation,
) -> Result<Vec<PluginId>> {
let err_context = || format!("Failed to get plugin ids for location {plugin_location}");
let plugin_ids: Vec<PluginId> = self
.plugin_assets
.iter()
.filter(|(_, (running_plugin, _subscriptions, _workers))| {
&running_plugin.lock().unwrap().plugin_env.plugin.location == plugin_location
})
.map(|((plugin_id, _client_id), _)| *plugin_id)
.collect();
if plugin_ids.is_empty() {
return Err(ZellijError::PluginDoesNotExist).with_context(err_context);
}
Ok(plugin_ids)
}
pub fn insert(
&mut self,
plugin_id: PluginId,
client_id: ClientId,
running_plugin: Arc<Mutex<RunningPlugin>>,
subscriptions: Arc<Mutex<Subscriptions>>,
running_workers: HashMap<String, Arc<Mutex<RunningWorker>>>,
) {
self.plugin_assets.insert(
(plugin_id, client_id),
(running_plugin, subscriptions, running_workers),
);
}
}
pub type Subscriptions = HashSet<EventType>;
#[derive(Clone)]
pub struct PluginEnv {
pub plugin_id: u32,
pub plugin_id: PluginId,
pub plugin: PluginConfig,
pub senders: ThreadSenders,
pub wasi_env: WasiEnv,
@ -69,7 +234,6 @@ impl RunningPlugin {
}
}
pub fn next_event_id(&mut self, atomic_event: AtomicEvent) -> usize {
// TODO: probably not usize...
let current_event_id = *self.next_event_ids.get(&atomic_event).unwrap_or(&0);
if current_event_id < usize::MAX {
let next_event_id = current_event_id + 1;
@ -92,3 +256,53 @@ impl RunningPlugin {
}
}
}
pub struct RunningWorker {
pub instance: Instance,
pub name: String,
pub plugin_config: PluginConfig,
pub plugin_env: PluginEnv,
}
impl RunningWorker {
pub fn new(
instance: Instance,
name: &str,
plugin_config: PluginConfig,
plugin_env: PluginEnv,
) -> Self {
RunningWorker {
instance,
name: name.into(),
plugin_config,
plugin_env,
}
}
pub fn send_message(&self, message: String, payload: String) -> Result<()> {
let err_context = || format!("Failed to send message to worker");
let work_function = self
.instance
.exports
.get_function(&self.name)
.with_context(err_context)?;
wasi_write_object(&self.plugin_env.wasi_env, &(message, payload))
.with_context(err_context)?;
work_function.call(&[]).or_else::<anyError, _>(|e| {
match e.downcast::<serde_json::Error>() {
Ok(_) => panic!(
"{}",
anyError::new(VersionMismatchError::new(
VERSION,
"Unavailable",
&self.plugin_config.path,
self.plugin_config.is_builtin(),
))
),
Err(e) => Err(e).with_context(err_context),
}
})?;
Ok(())
}
}

View file

@ -0,0 +1,320 @@
use super::plugin_thread_main;
use crate::screen::ScreenInstruction;
use crate::{channels::SenderWithContext, thread_bus::Bus, ServerInstruction};
use insta::assert_snapshot;
use std::path::PathBuf;
use tempfile::tempdir;
use wasmer::Store;
use zellij_utils::data::Event;
use zellij_utils::errors::ErrorContext;
use zellij_utils::input::layout::{Layout, RunPlugin, RunPluginLocation};
use zellij_utils::input::plugins::PluginsConfig;
use zellij_utils::lazy_static::lazy_static;
use zellij_utils::pane_size::Size;
use crate::background_jobs::BackgroundJob;
use crate::pty_writer::PtyWriteInstruction;
use std::env::set_var;
use std::sync::{Arc, Mutex};
use crate::{plugins::PluginInstruction, pty::PtyInstruction};
use zellij_utils::channels::{self, ChannelWithContext, Receiver};
macro_rules! log_actions_in_thread {
( $arc_mutex_log:expr, $exit_event:path, $receiver:expr, $exit_after_count:expr ) => {
std::thread::Builder::new()
.name("logger thread".to_string())
.spawn({
let log = $arc_mutex_log.clone();
let mut exit_event_count = 0;
move || loop {
let (event, _err_ctx) = $receiver
.recv()
.expect("failed to receive event on channel");
match event {
$exit_event(..) => {
exit_event_count += 1;
log.lock().unwrap().push(event);
if exit_event_count == $exit_after_count {
break;
}
},
_ => {
log.lock().unwrap().push(event);
},
}
}
})
.unwrap()
};
}
fn create_plugin_thread() -> (
SenderWithContext<PluginInstruction>,
Receiver<(ScreenInstruction, ErrorContext)>,
Box<dyn FnMut()>,
) {
let (to_server, _server_receiver): ChannelWithContext<ServerInstruction> =
channels::bounded(50);
let to_server = SenderWithContext::new(to_server);
let (to_screen, screen_receiver): ChannelWithContext<ScreenInstruction> = channels::unbounded();
let to_screen = SenderWithContext::new(to_screen);
let (to_plugin, plugin_receiver): ChannelWithContext<PluginInstruction> = channels::unbounded();
let to_plugin = SenderWithContext::new(to_plugin);
let (to_pty, _pty_receiver): ChannelWithContext<PtyInstruction> = channels::unbounded();
let to_pty = SenderWithContext::new(to_pty);
let (to_pty_writer, _pty_writer_receiver): ChannelWithContext<PtyWriteInstruction> =
channels::unbounded();
let to_pty_writer = SenderWithContext::new(to_pty_writer);
let (to_background_jobs, _background_jobs_receiver): ChannelWithContext<BackgroundJob> =
channels::unbounded();
let to_background_jobs = SenderWithContext::new(to_background_jobs);
let plugin_bus = Bus::new(
vec![plugin_receiver],
Some(&to_screen),
Some(&to_pty),
Some(&to_plugin),
Some(&to_server),
Some(&to_pty_writer),
Some(&to_background_jobs),
None,
)
.should_silently_fail();
let store = Store::new(&wasmer::Universal::new(wasmer::Singlepass::default()).engine());
let data_dir = PathBuf::from(tempdir().unwrap().path());
let _plugin_thread = std::thread::Builder::new()
.name("plugin_thread".to_string())
.spawn(move || {
set_var("ZELLIJ_SESSION_NAME", "zellij-test");
plugin_thread_main(
plugin_bus,
store,
data_dir,
PluginsConfig::default(),
Box::new(Layout::default()),
)
.expect("TEST")
})
.unwrap();
let teardown = {
let to_plugin = to_plugin.clone();
move || {
let _ = to_pty.send(PtyInstruction::Exit);
let _ = to_pty_writer.send(PtyWriteInstruction::Exit);
let _ = to_screen.send(ScreenInstruction::Exit);
let _ = to_server.send(ServerInstruction::KillSession);
let _ = to_plugin.send(PluginInstruction::Exit);
}
};
(to_plugin, screen_receiver, Box::new(teardown))
}
lazy_static! {
static ref PLUGIN_FIXTURE: String = format!(
// to populate this file, make sure to run the build-e2e CI job
// (or compile the fixture plugin and copy the resulting .wasm blob to the below location)
"{}/../target/e2e-data/plugins/fixture-plugin-for-tests.wasm",
std::env::var_os("CARGO_MANIFEST_DIR")
.unwrap()
.to_string_lossy()
);
}
#[test]
#[ignore]
pub fn load_new_plugin_from_hd() {
// here we load our fixture plugin into the plugin thread, and then send it an update message
// expecting tha thte plugin will log the received event and render it later after the update
// message (this is what the fixture plugin does)
// we then listen on our mock screen receiver to make sure we got a PluginBytes instruction
// that contains said render, and assert against it
let (plugin_thread_sender, screen_receiver, mut teardown) = create_plugin_thread();
let plugin_should_float = Some(false);
let plugin_title = Some("test_plugin".to_owned());
let run_plugin = RunPlugin {
_allow_exec_host_cmd: false,
location: RunPluginLocation::File(PathBuf::from(&*PLUGIN_FIXTURE)),
};
let tab_index = 1;
let client_id = 1;
let size = Size {
cols: 121,
rows: 20,
};
let received_screen_instructions = Arc::new(Mutex::new(vec![]));
let screen_thread = log_actions_in_thread!(
received_screen_instructions,
ScreenInstruction::PluginBytes,
screen_receiver,
2
);
let _ = plugin_thread_sender.send(PluginInstruction::AddClient(client_id));
let _ = plugin_thread_sender.send(PluginInstruction::Load(
plugin_should_float,
plugin_title,
run_plugin,
tab_index,
client_id,
size,
));
let _ = plugin_thread_sender.send(PluginInstruction::Update(vec![(
None,
Some(client_id),
Event::InputReceived,
)])); // will be cached and sent to the plugin once it's loaded
screen_thread.join().unwrap(); // this might take a while if the cache is cold
teardown();
let plugin_bytes_event = received_screen_instructions
.lock()
.unwrap()
.iter()
.find_map(|i| {
if let ScreenInstruction::PluginBytes(plugin_bytes) = i {
for (plugin_id, client_id, plugin_bytes) in plugin_bytes {
let plugin_bytes = String::from_utf8_lossy(plugin_bytes).to_string();
if plugin_bytes.contains("InputReceived") {
return Some((*plugin_id, *client_id, plugin_bytes));
}
}
}
None
});
assert_snapshot!(format!("{:#?}", plugin_bytes_event));
}
#[test]
#[ignore]
pub fn plugin_workers() {
let (plugin_thread_sender, screen_receiver, mut teardown) = create_plugin_thread();
let plugin_should_float = Some(false);
let plugin_title = Some("test_plugin".to_owned());
let run_plugin = RunPlugin {
_allow_exec_host_cmd: false,
location: RunPluginLocation::File(PathBuf::from(&*PLUGIN_FIXTURE)),
};
let tab_index = 1;
let client_id = 1;
let size = Size {
cols: 121,
rows: 20,
};
let received_screen_instructions = Arc::new(Mutex::new(vec![]));
let screen_thread = log_actions_in_thread!(
received_screen_instructions,
ScreenInstruction::PluginBytes,
screen_receiver,
3
);
let _ = plugin_thread_sender.send(PluginInstruction::AddClient(client_id));
let _ = plugin_thread_sender.send(PluginInstruction::Load(
plugin_should_float,
plugin_title,
run_plugin,
tab_index,
client_id,
size,
));
// we send a SystemClipboardFailure to trigger the custom handler in the fixture plugin that
// will send a message to the worker and in turn back to the plugin to be rendered, so we know
// that this cycle is working
let _ = plugin_thread_sender.send(PluginInstruction::Update(vec![(
None,
Some(client_id),
Event::SystemClipboardFailure,
)])); // will be cached and sent to the plugin once it's loaded
screen_thread.join().unwrap(); // this might take a while if the cache is cold
teardown();
let plugin_bytes_event = received_screen_instructions
.lock()
.unwrap()
.iter()
.find_map(|i| {
if let ScreenInstruction::PluginBytes(plugin_bytes) = i {
for (plugin_id, client_id, plugin_bytes) in plugin_bytes {
let plugin_bytes = String::from_utf8_lossy(plugin_bytes).to_string();
if plugin_bytes.contains("Payload from worker") {
return Some((*plugin_id, *client_id, plugin_bytes));
}
}
}
None
});
assert_snapshot!(format!("{:#?}", plugin_bytes_event));
}
#[test]
#[ignore]
pub fn plugin_workers_persist_state() {
let (plugin_thread_sender, screen_receiver, mut teardown) = create_plugin_thread();
let plugin_should_float = Some(false);
let plugin_title = Some("test_plugin".to_owned());
let run_plugin = RunPlugin {
_allow_exec_host_cmd: false,
location: RunPluginLocation::File(PathBuf::from(&*PLUGIN_FIXTURE)),
};
let tab_index = 1;
let client_id = 1;
let size = Size {
cols: 121,
rows: 20,
};
let received_screen_instructions = Arc::new(Mutex::new(vec![]));
let screen_thread = log_actions_in_thread!(
received_screen_instructions,
ScreenInstruction::PluginBytes,
screen_receiver,
5
);
let _ = plugin_thread_sender.send(PluginInstruction::AddClient(client_id));
let _ = plugin_thread_sender.send(PluginInstruction::Load(
plugin_should_float,
plugin_title,
run_plugin,
tab_index,
client_id,
size,
));
// we send a SystemClipboardFailure to trigger the custom handler in the fixture plugin that
// will send a message to the worker and in turn back to the plugin to be rendered, so we know
// that this cycle is working
// we do this a second time so that the worker will log the first message on its own state and
// then send us the "received 2 messages" indication we check for below, letting us know it
// managed to persist its own state and act upon it
let _ = plugin_thread_sender.send(PluginInstruction::Update(vec![(
None,
Some(client_id),
Event::SystemClipboardFailure,
)]));
let _ = plugin_thread_sender.send(PluginInstruction::Update(vec![(
None,
Some(client_id),
Event::SystemClipboardFailure,
)]));
screen_thread.join().unwrap(); // this might take a while if the cache is cold
teardown();
let plugin_bytes_event = received_screen_instructions
.lock()
.unwrap()
.iter()
.find_map(|i| {
if let ScreenInstruction::PluginBytes(plugin_bytes) = i {
for (plugin_id, client_id, plugin_bytes) in plugin_bytes {
let plugin_bytes = String::from_utf8_lossy(plugin_bytes).to_string();
if plugin_bytes.contains("received 2 messages") {
return Some((*plugin_id, *client_id, plugin_bytes));
}
}
}
None
});
assert_snapshot!(format!("{:#?}", plugin_bytes_event));
}

View file

@ -0,0 +1,12 @@
---
source: zellij-server/src/plugins/./unit/plugin_tests.rs
assertion_line: 744
expression: "format!(\"{:#?}\", plugin_bytes_event)"
---
Some(
(
0,
1,
"Rows: 20, Cols: 121, Received events: [InputReceived]\n\r",
),
)

View file

@ -0,0 +1,12 @@
---
source: zellij-server/src/plugins/./unit/plugin_tests.rs
assertion_line: 250
expression: "format!(\"{:#?}\", plugin_bytes_event)"
---
Some(
(
0,
1,
"Payload from worker: \"gimme_back_my_payload, received 1 messages\"\n\r",
),
)

View file

@ -0,0 +1,12 @@
---
source: zellij-server/src/plugins/./unit/plugin_tests.rs
assertion_line: 319
expression: "format!(\"{:#?}\", plugin_bytes_event)"
---
Some(
(
0,
1,
"Payload from worker: \"gimme_back_my_payload, received 2 messages\"\n\r",
),
)

View file

@ -1,14 +1,15 @@
use super::PluginInstruction;
use super::{PluginId, PluginInstruction};
use crate::plugins::plugin_loader::{PluginLoader, VersionMismatchError};
use crate::plugins::plugin_map::{AtomicEvent, PluginEnv, PluginMap};
use crate::plugins::plugin_map::{
AtomicEvent, PluginEnv, PluginMap, RunningPlugin, RunningWorker, Subscriptions,
};
use crate::plugins::zellij_exports::{wasi_read_string, wasi_write_object};
use log::info;
use std::{
collections::{HashMap, HashSet},
fmt::Display,
path::PathBuf,
str::FromStr,
sync::{Arc, Mutex},
sync::{Arc, Mutex, TryLockError},
};
use wasmer::{Instance, Module, Store, Value};
use zellij_utils::async_std::task::{self, JoinHandle};
@ -22,7 +23,6 @@ use zellij_utils::{
consts::VERSION,
data::{Event, EventType},
errors::prelude::*,
errors::ZellijError,
input::{
layout::{RunPlugin, RunPluginLocation},
plugins::PluginsConfig,
@ -30,7 +30,7 @@ use zellij_utils::{
pane_size::Size,
};
pub type PluginId = u32;
const RETRY_INTERVAL_MS: u64 = 100;
pub struct WasmBridge {
connected_clients: Arc<Mutex<Vec<ClientId>>>,
@ -40,10 +40,14 @@ pub struct WasmBridge {
plugin_dir: PathBuf,
plugin_cache: Arc<Mutex<HashMap<PathBuf, Module>>>,
plugin_map: Arc<Mutex<PluginMap>>,
next_plugin_id: u32,
cached_events_for_pending_plugins: HashMap<u32, Vec<Event>>, // u32 is the plugin id
cached_resizes_for_pending_plugins: HashMap<u32, (usize, usize)>, // (rows, columns)
loading_plugins: HashMap<(u32, RunPlugin), JoinHandle<()>>, // plugin_id to join-handle
next_plugin_id: PluginId,
cached_events_for_pending_plugins: HashMap<PluginId, Vec<Event>>,
cached_resizes_for_pending_plugins: HashMap<PluginId, (usize, usize)>, // (rows, columns)
cached_worker_messages: HashMap<PluginId, Vec<(ClientId, String, String, String)>>, // Vec<clientid,
// worker_name,
// message,
// payload>
loading_plugins: HashMap<(PluginId, RunPlugin), JoinHandle<()>>, // plugin_id to join-handle
pending_plugin_reloads: HashSet<RunPlugin>,
}
@ -54,7 +58,7 @@ impl WasmBridge {
store: Store,
plugin_dir: PathBuf,
) -> Self {
let plugin_map = Arc::new(Mutex::new(HashMap::new()));
let plugin_map = Arc::new(Mutex::new(PluginMap::default()));
let connected_clients: Arc<Mutex<Vec<ClientId>>> = Arc::new(Mutex::new(vec![]));
let plugin_cache: Arc<Mutex<HashMap<PathBuf, Module>>> =
Arc::new(Mutex::new(HashMap::new()));
@ -69,6 +73,7 @@ impl WasmBridge {
next_plugin_id: 0,
cached_events_for_pending_plugins: HashMap::new(),
cached_resizes_for_pending_plugins: HashMap::new(),
cached_worker_messages: HashMap::new(),
loading_plugins: HashMap::new(),
pending_plugin_reloads: HashSet::new(),
}
@ -79,7 +84,7 @@ impl WasmBridge {
tab_index: usize,
size: Size,
client_id: Option<ClientId>,
) -> Result<u32> {
) -> Result<PluginId> {
// returns the plugin id
let err_context = move || format!("failed to load plugin");
@ -152,14 +157,14 @@ impl WasmBridge {
self.next_plugin_id += 1;
Ok(plugin_id)
}
pub fn unload_plugin(&mut self, pid: u32) -> Result<()> {
pub fn unload_plugin(&mut self, pid: PluginId) -> Result<()> {
info!("Bye from plugin {}", &pid);
// TODO: remove plugin's own data directory
let mut plugin_map = self.plugin_map.lock().unwrap();
let ids_in_plugin_map: Vec<(u32, ClientId)> = plugin_map.keys().copied().collect();
for (plugin_id, client_id) in ids_in_plugin_map {
if pid == plugin_id {
drop(plugin_map.remove(&(plugin_id, client_id)));
for (running_plugin, _, _) in plugin_map.remove_plugins(pid) {
let running_plugin = running_plugin.lock().unwrap();
let cache_dir = running_plugin.plugin_env.plugin_own_data_dir.clone();
if let Err(e) = std::fs::remove_dir_all(cache_dir) {
log::error!("Failed to remove cache dir for plugin: {:?}", e);
}
}
Ok(())
@ -268,18 +273,29 @@ impl WasmBridge {
Err(e) => Err(e),
}
}
pub fn resize_plugin(&mut self, pid: u32, new_columns: usize, new_rows: usize) -> Result<()> {
pub fn resize_plugin(
&mut self,
pid: PluginId,
new_columns: usize,
new_rows: usize,
) -> Result<()> {
let err_context = move || format!("failed to resize plugin {pid}");
for ((plugin_id, client_id), (running_plugin, _subscriptions)) in
self.plugin_map.lock().unwrap().iter_mut()
{
if self
.cached_resizes_for_pending_plugins
.contains_key(&plugin_id)
{
continue;
}
if *plugin_id == pid {
let plugins_to_resize: Vec<(PluginId, ClientId, Arc<Mutex<RunningPlugin>>)> = self
.plugin_map
.lock()
.unwrap()
.running_plugins()
.iter()
.cloned()
.filter(|(plugin_id, _client_id, _running_plugin)| {
!self
.cached_resizes_for_pending_plugins
.contains_key(&plugin_id)
})
.collect();
for (plugin_id, client_id, running_plugin) in plugins_to_resize {
if plugin_id == pid {
let event_id = running_plugin
.lock()
.unwrap()
@ -287,8 +303,8 @@ impl WasmBridge {
task::spawn({
let senders = self.senders.clone();
let running_plugin = running_plugin.clone();
let plugin_id = *plugin_id;
let client_id = *client_id;
let plugin_id = plugin_id;
let client_id = client_id;
async move {
let mut running_plugin = running_plugin.lock().unwrap();
if running_plugin.apply_event_id(AtomicEvent::Resize, event_id) {
@ -339,34 +355,46 @@ impl WasmBridge {
}
pub fn update_plugins(
&mut self,
mut updates: Vec<(Option<u32>, Option<ClientId>, Event)>,
mut updates: Vec<(Option<PluginId>, Option<ClientId>, Event)>,
) -> Result<()> {
let err_context = || "failed to update plugin state".to_string();
for (pid, cid, event) in updates.drain(..) {
for (&(plugin_id, client_id), (running_plugin, subscriptions)) in
&*self.plugin_map.lock().unwrap()
{
if self
let plugins_to_update: Vec<(
PluginId,
ClientId,
Arc<Mutex<RunningPlugin>>,
Arc<Mutex<Subscriptions>>,
)> = self
.plugin_map
.lock()
.unwrap()
.running_plugins_and_subscriptions()
.iter()
.cloned()
.filter(|(plugin_id, _client_id, _running_plugin, _subscriptions)| {
!&self
.cached_events_for_pending_plugins
.contains_key(&plugin_id)
{
continue;
}
})
.collect();
for (pid, cid, event) in updates.drain(..) {
for (plugin_id, client_id, running_plugin, subscriptions) in &plugins_to_update {
let subs = subscriptions.lock().unwrap().clone();
// FIXME: This is very janky... Maybe I should write my own macro for Event -> EventType?
let event_type =
EventType::from_str(&event.to_string()).with_context(err_context)?;
if subs.contains(&event_type)
&& ((pid.is_none() && cid.is_none())
|| (pid.is_none() && cid == Some(client_id))
|| (cid.is_none() && pid == Some(plugin_id))
|| (cid == Some(client_id) && pid == Some(plugin_id)))
|| (pid.is_none() && cid == Some(*client_id))
|| (cid.is_none() && pid == Some(*plugin_id))
|| (cid == Some(*client_id) && pid == Some(*plugin_id)))
{
task::spawn({
let senders = self.senders.clone();
let running_plugin = running_plugin.clone();
let event = event.clone();
let plugin_id = *plugin_id;
let client_id = *client_id;
async move {
let running_plugin = running_plugin.lock().unwrap();
let mut plugin_bytes = vec![];
@ -401,7 +429,7 @@ impl WasmBridge {
}
Ok(())
}
pub fn apply_cached_events(&mut self, plugin_ids: Vec<u32>) -> Result<()> {
pub fn apply_cached_events(&mut self, plugin_ids: Vec<PluginId>) -> Result<()> {
let mut applied_plugin_paths = HashSet::new();
for plugin_id in plugin_ids {
self.apply_cached_events_and_resizes_for_plugin(plugin_id)?;
@ -428,6 +456,10 @@ impl WasmBridge {
for (_plugin_id, loading_plugin_task) in self.loading_plugins.drain() {
drop(loading_plugin_task.cancel());
}
let plugin_ids = self.plugin_map.lock().unwrap().plugin_ids();
for plugin_id in &plugin_ids {
drop(self.unload_plugin(*plugin_id));
}
}
fn run_plugin_of_plugin_id(&self, plugin_id: PluginId) -> Option<&RunPlugin> {
self.loading_plugins
@ -450,7 +482,7 @@ impl WasmBridge {
.plugin_map
.lock()
.unwrap()
.get_mut(&(plugin_id, *client_id))
.get_running_plugin_and_subscriptions(plugin_id, *client_id)
{
let subs = subscriptions.lock().unwrap().clone();
for event in events.clone() {
@ -494,6 +526,23 @@ impl WasmBridge {
if let Some((rows, columns)) = self.cached_resizes_for_pending_plugins.remove(&plugin_id) {
self.resize_plugin(plugin_id, columns, rows)?;
}
self.apply_cached_worker_messages(plugin_id)?;
Ok(())
}
pub fn apply_cached_worker_messages(&mut self, plugin_id: PluginId) -> Result<()> {
if let Some(mut messages) = self.cached_worker_messages.remove(&plugin_id) {
let mut worker_messages: HashMap<(ClientId, String), Vec<(String, String)>> =
HashMap::new();
for (client_id, worker_name, message, payload) in messages.drain(..) {
worker_messages
.entry((client_id, worker_name))
.or_default()
.push((message, payload));
}
for ((client_id, worker_name), messages) in worker_messages.drain() {
self.post_messages_to_plugin_worker(plugin_id, client_id, worker_name, messages)?;
}
}
Ok(())
}
fn plugin_is_currently_being_loaded(&self, plugin_location: &RunPluginLocation) -> bool {
@ -506,34 +555,20 @@ impl WasmBridge {
&self,
plugin_location: &RunPluginLocation,
) -> Result<Vec<PluginId>> {
let err_context = || format!("Failed to get plugin ids for location {plugin_location}");
let plugin_ids: Vec<PluginId> = self
.plugin_map
self.plugin_map
.lock()
.unwrap()
.iter()
.filter(|(_, (running_plugin, _subscriptions))| {
&running_plugin.lock().unwrap().plugin_env.plugin.location == plugin_location
// TODO:
// better
})
.map(|((plugin_id, _client_id), _)| *plugin_id)
.collect();
if plugin_ids.is_empty() {
return Err(ZellijError::PluginDoesNotExist).with_context(err_context);
}
Ok(plugin_ids)
.all_plugin_ids_for_plugin_location(plugin_location)
}
fn size_of_plugin_id(&self, plugin_id: PluginId) -> Option<(usize, usize)> {
// (rows/colums)
self.plugin_map
.lock()
.unwrap()
.iter()
.find(|((p_id, _client_id), _)| *p_id == plugin_id)
.map(|(_, (running_plugin, _subscriptions))| {
let running_plugin = running_plugin.lock().unwrap();
(running_plugin.rows, running_plugin.columns)
.get_running_plugin(plugin_id, None)
.map(|r| {
let r = r.lock().unwrap();
(r.rows, r.columns)
})
}
fn start_plugin_loading_indication(
@ -553,6 +588,51 @@ impl WasmBridge {
.send_to_background_jobs(BackgroundJob::AnimatePluginLoading(*plugin_id));
}
}
pub fn post_messages_to_plugin_worker(
&mut self,
plugin_id: PluginId,
client_id: ClientId,
worker_name: String,
mut messages: Vec<(String, String)>,
) -> Result<()> {
let worker =
self.plugin_map
.lock()
.unwrap()
.clone_worker(plugin_id, client_id, &worker_name);
let mut cache_messages = || {
for (message, payload) in messages.drain(..) {
self.cached_worker_messages
.entry(plugin_id)
.or_default()
.push((client_id, worker_name.clone(), message, payload));
}
};
match worker {
Some(worker) => {
let worker_is_busy = { worker.try_lock().is_err() };
if worker_is_busy {
// most messages will be caught here, we do this once before the async task to
// bulk most messages together and prevent them from cascading
cache_messages();
} else {
async_send_messages_to_worker(
self.senders.clone(),
messages,
worker,
plugin_id,
client_id,
worker_name,
);
}
},
None => {
log::warn!("Worker {worker_name} not found, placing message in cache");
cache_messages();
},
}
Ok(())
}
}
fn handle_plugin_successful_loading(senders: &ThreadSenders, plugin_id: PluginId) {
@ -564,11 +644,11 @@ fn handle_plugin_loading_failure(
senders: &ThreadSenders,
plugin_id: PluginId,
loading_indication: &mut LoadingIndication,
error: impl Display,
error: impl std::fmt::Debug,
) {
log::error!("{}", error);
log::error!("{:?}", error);
let _ = senders.send_to_background_jobs(BackgroundJob::StopPluginLoadingAnimation(plugin_id));
loading_indication.indicate_loading_error(error.to_string());
loading_indication.indicate_loading_error(format!("{:?}", error));
let _ = senders.send_to_screen(ScreenInstruction::UpdatePluginLoadingStage(
plugin_id,
loading_indication.clone(),
@ -576,14 +656,14 @@ fn handle_plugin_loading_failure(
}
pub fn apply_event_to_plugin(
plugin_id: u32,
plugin_id: PluginId,
client_id: ClientId,
instance: &Instance,
plugin_env: &PluginEnv,
event: &Event,
rows: usize,
columns: usize,
plugin_bytes: &mut Vec<(u32, ClientId, Vec<u8>)>,
plugin_bytes: &mut Vec<(PluginId, ClientId, Vec<u8>)>,
) -> Result<()> {
let err_context = || format!("Failed to apply event to plugin {plugin_id}");
let update = instance
@ -627,3 +707,53 @@ pub fn apply_event_to_plugin(
}
Ok(())
}
fn async_send_messages_to_worker(
senders: ThreadSenders,
mut messages: Vec<(String, String)>,
worker: Arc<Mutex<RunningWorker>>,
plugin_id: PluginId,
client_id: ClientId,
worker_name: String,
) {
task::spawn({
async move {
match worker.try_lock() {
Ok(worker) => {
for (message, payload) in messages.drain(..) {
worker.send_message(message, payload).ok();
}
let _ = senders
.send_to_plugin(PluginInstruction::ApplyCachedWorkerMessages(plugin_id));
},
Err(TryLockError::WouldBlock) => {
task::spawn({
async move {
log::warn!(
"Worker {} busy, retrying sending message after: {}ms",
worker_name,
RETRY_INTERVAL_MS
);
task::sleep(std::time::Duration::from_millis(RETRY_INTERVAL_MS)).await;
let _ = senders.send_to_plugin(
PluginInstruction::PostMessagesToPluginWorker(
plugin_id,
client_id,
worker_name,
messages,
),
);
}
});
},
Err(e) => {
log::error!(
"Failed to send message to worker \"{}\": {:?}",
worker_name,
e
);
},
}
}
});
}

View file

@ -50,10 +50,13 @@ pub fn zellij_exports(
host_get_plugin_ids,
host_get_zellij_version,
host_open_file,
host_open_file_with_line,
host_switch_tab_to,
host_set_timeout,
host_exec_cmd,
host_report_panic,
host_post_message_to,
host_post_message_to_plugin,
}
}
@ -171,6 +174,27 @@ fn host_open_file(env: &ForeignFunctionEnv) {
.non_fatal();
}
fn host_open_file_with_line(env: &ForeignFunctionEnv) {
wasi_read_object::<(PathBuf, usize)>(&env.plugin_env.wasi_env)
.and_then(|(path, line)| {
env.plugin_env
.senders
.send_to_pty(PtyInstruction::SpawnTerminal(
Some(TerminalAction::OpenFile(path, Some(line), None)), // TODO: add cwd
None,
None,
ClientOrTabIndex::TabIndex(env.plugin_env.tab_index),
))
})
.with_context(|| {
format!(
"failed to open file on host from plugin {}",
env.plugin_env.name()
)
})
.non_fatal();
}
fn host_switch_tab_to(env: &ForeignFunctionEnv, tab_idx: u32) {
env.plugin_env
.senders
@ -201,6 +225,7 @@ fn host_set_timeout(env: &ForeignFunctionEnv, secs: f64) {
let update_target = Some(env.plugin_env.plugin_id);
let client_id = env.plugin_env.client_id;
let plugin_name = env.plugin_env.name();
// TODO: we should really use an async task for this
thread::spawn(move || {
let start_time = Instant::now();
thread::sleep(Duration::from_secs_f64(secs));
@ -257,6 +282,38 @@ fn host_exec_cmd(env: &ForeignFunctionEnv) {
.non_fatal();
}
fn host_post_message_to(env: &ForeignFunctionEnv) {
wasi_read_object::<(String, String, String)>(&env.plugin_env.wasi_env)
.and_then(|(worker_name, message, payload)| {
env.plugin_env
.senders
.send_to_plugin(PluginInstruction::PostMessagesToPluginWorker(
env.plugin_env.plugin_id,
env.plugin_env.client_id,
worker_name,
vec![(message, payload)],
))
})
.with_context(|| format!("failed to post message to worker {}", env.plugin_env.name()))
.fatal();
}
fn host_post_message_to_plugin(env: &ForeignFunctionEnv) {
wasi_read_object::<(String, String)>(&env.plugin_env.wasi_env)
.and_then(|(message, payload)| {
env.plugin_env
.senders
.send_to_plugin(PluginInstruction::PostMessageToPlugin(
env.plugin_env.plugin_id,
env.plugin_env.client_id,
message,
payload,
))
})
.with_context(|| format!("failed to post message to plugin {}", env.plugin_env.name()))
.fatal();
}
// Custom panic handler for plugins.
//
// This is called when a panic occurs in a plugin. Since most panics will likely originate in the

View file

@ -1,6 +1,7 @@
pub mod prelude;
pub mod shim;
use serde::{Deserialize, Serialize};
use zellij_utils::data::Event;
#[allow(unused_variables)]
@ -12,6 +13,12 @@ pub trait ZellijPlugin {
fn render(&mut self, rows: usize, cols: usize) {}
}
#[allow(unused_variables)]
// TODO: can we get rid of the lifetime? maybe with generics?
pub trait ZellijWorker<'de>: Default + Serialize + Deserialize<'de> {
fn on_message(&mut self, message: String, payload: String) {}
}
pub const PLUGIN_MISMATCH: &str =
"An error occured in a plugin while receiving an Event from zellij. This means
that the plugins aren't compatible with the current zellij version.
@ -36,7 +43,10 @@ macro_rules! register_plugin {
std::panic::set_hook(Box::new(|info| {
report_panic(info);
}));
}
#[no_mangle]
fn load() {
STATE.with(|state| {
state.borrow_mut().load();
});
@ -65,3 +75,61 @@ macro_rules! register_plugin {
}
};
}
#[macro_export]
macro_rules! register_worker {
($worker:ty, $worker_name:ident) => {
#[no_mangle]
pub fn $worker_name() {
use serde_json::*;
let worker_display_name = std::stringify!($worker_name);
// read message from STDIN
let (message, payload): (String, String) = $crate::shim::object_from_stdin()
.unwrap_or_else(|e| {
eprintln!(
"Failed to deserialize message to worker \"{}\": {:?}",
worker_display_name, e
);
Default::default()
});
// read previous worker state from HD if it exists
let mut worker_instance = match std::fs::read(&format!("/data/{}", worker_display_name))
.map_err(|e| format!("Failed to read file: {:?}", e))
.and_then(|s| {
serde_json::from_str::<$worker>(&String::from_utf8_lossy(&s))
.map_err(|e| format!("Failed to deserialize: {:?}", e))
}) {
Ok(s) => s,
Err(e) => {
eprintln!(
"Failed to read existing state ({:?}), creating new state for worker",
e
);
<$worker>::default()
},
};
// invoke worker
worker_instance.on_message(message, payload);
// persist worker state to HD for next run
match serde_json::to_string(&worker_instance)
.map_err(|e| format!("Failed to serialize worker state"))
.and_then(|serialized_state| {
std::fs::write(
&format!("/data/{}", worker_display_name),
serialized_state.as_bytes(),
)
.map_err(|e| format!("Failed to persist state to HD: {:?}", e))
}) {
Ok(()) => {},
Err(e) => eprintln!(
"Failed to serialize and persist worker state to hd: {:?}",
e
),
}
}
};
}

View file

@ -39,6 +39,11 @@ pub fn open_file(path: &Path) {
unsafe { host_open_file() };
}
pub fn open_file_with_line(path: &Path, line: usize) {
object_to_stdout(&(path, line));
unsafe { host_open_file_with_line() };
}
pub fn switch_tab_to(tab_idx: u32) {
unsafe { host_switch_tab_to(tab_idx) };
}
@ -74,6 +79,24 @@ pub fn object_to_stdout(object: &impl Serialize) {
println!("{}", serde_json::to_string(object).unwrap());
}
#[doc(hidden)]
pub fn post_message_to(worker_name: &str, message: String, payload: String) {
match serde_json::to_string(&(worker_name, message, payload)) {
Ok(serialized) => println!("{}", serialized),
Err(e) => eprintln!("Failed to serialize message: {:?}", e),
}
unsafe { host_post_message_to() };
}
#[doc(hidden)]
pub fn post_message_to_plugin(message: String, payload: String) {
match serde_json::to_string(&(message, payload)) {
Ok(serialized) => println!("{}", serialized),
Err(e) => eprintln!("Failed to serialize message: {:?}", e),
}
unsafe { host_post_message_to_plugin() };
}
#[link(wasm_import_module = "zellij")]
extern "C" {
fn host_subscribe();
@ -82,8 +105,11 @@ extern "C" {
fn host_get_plugin_ids();
fn host_get_zellij_version();
fn host_open_file();
fn host_open_file_with_line();
fn host_switch_tab_to(tab_idx: u32);
fn host_set_timeout(secs: f64);
fn host_exec_cmd();
fn host_report_panic();
fn host_post_message_to();
fn host_post_message_to_plugin();
}

View file

@ -39,6 +39,7 @@ regex = "1.5.5"
tempfile = "3.2.0"
kdl = { version = "4.5.0", features = ["span"] }
shellexpand = "3.0.0"
uuid = { version = "0.8.2", features = ["serde", "v4"] }
#[cfg(not(target_family = "wasm"))]
[target.'cfg(not(target_family = "wasm"))'.dependencies]

Binary file not shown.

View file

@ -5,6 +5,7 @@ use directories_next::ProjectDirs;
use lazy_static::lazy_static;
use once_cell::sync::OnceCell;
use std::path::PathBuf;
use uuid::Uuid;
pub const ZELLIJ_CONFIG_FILE_ENV: &str = "ZELLIJ_CONFIG_FILE";
pub const ZELLIJ_CONFIG_DIR_ENV: &str = "ZELLIJ_CONFIG_DIR";
@ -29,6 +30,10 @@ lazy_static! {
pub static ref ZELLIJ_PROJ_DIR: ProjectDirs =
ProjectDirs::from("org", "Zellij Contributors", "Zellij").unwrap();
pub static ref ZELLIJ_CACHE_DIR: PathBuf = ZELLIJ_PROJ_DIR.cache_dir().to_path_buf();
pub static ref ZELLIJ_SESSION_CACHE_DIR: PathBuf = ZELLIJ_PROJ_DIR
.cache_dir()
.to_path_buf()
.join(format!("{}", Uuid::new_v4()));
pub static ref ZELLIJ_DEFAULT_THEMES: Themes = {
let mut default_themes = Themes::default();

View file

@ -469,6 +469,10 @@ pub enum Event {
SystemClipboardFailure,
InputReceived,
Visible(bool),
CustomMessage(
String, // message
String, // payload
),
}
/// Describes the different input modes, which change the way that keystrokes will be interpreted.

View file

@ -365,6 +365,9 @@ pub enum PluginContext {
RemoveClient,
NewTab,
ApplyCachedEvents,
ApplyCachedWorkerMessages,
PostMessageToPluginWorker,
PostMessageToPlugin,
}
/// Stack call representations corresponding to the different types of [`ClientInstruction`]s.