diff --git a/Cargo.lock b/Cargo.lock index 74374912..33c76296 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -776,11 +776,12 @@ checksum = "3c877555693c14d2f84191cfd3ad8582790fc52b5e2274b40b59cf5f5cea25c7" [[package]] name = "dialoguer" -version = "0.10.1" +version = "0.10.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8c8ae48e400addc32a8710c8d62d55cb84249a7d58ac4cd959daecfbaddc545" +checksum = "59c6f2989294b9a498d3ad5491a79c6deb604617378e1cdc4bfc1c1361fe2f87" dependencies = [ "console", + "shell-words", "tempfile", "zeroize", ] @@ -1019,6 +1020,15 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" +[[package]] +name = "fixture-plugin-for-tests" +version = "0.1.0" +dependencies = [ + "serde", + "serde_json", + "zellij-tile", +] + [[package]] name = "fnv" version = "1.0.7" @@ -1071,6 +1081,15 @@ dependencies = [ "waker-fn", ] +[[package]] +name = "fuzzy-matcher" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54614a3312934d066701a80f20f15fa3b56d67ac7722b39eea5b4c9dd1d66c94" +dependencies = [ + "thread_local", +] + [[package]] name = "generational-arena" version = "0.2.8" @@ -2443,6 +2462,15 @@ version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f3f6f92acf49d1b98f7a81226834412ada05458b7364277387724a237f062695" +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + [[package]] name = "scopeguard" version = "1.1.0" @@ -2596,6 +2624,12 @@ dependencies = [ "opaque-debug 0.3.0", ] +[[package]] +name = "shell-words" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24188a676b6ae68c3b2cb3a01be17fbf7240ce009799bb56d5b1409051e78fde" + [[package]] name = "shellexpand" version = "3.0.0" @@ -2798,8 +2832,14 @@ checksum = "213701ba3370744dcd1a12960caa4843b3d68b4d1c0a5d575e0d65b2ee9d16c0" name = "strider" version = "0.2.0" dependencies = [ + "ansi_term", "colored", + "fuzzy-matcher", "pretty-bytes", + "serde", + "serde_json", + "unicode-width", + "walkdir", "zellij-tile", ] @@ -3066,6 +3106,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "thread_local" +version = "1.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fdd6f064ccff2d6567adcb3873ca630700f00b5ad3f060c25b5dcfd9a4ce152" +dependencies = [ + "cfg-if 1.0.0", + "once_cell", +] + [[package]] name = "time" version = "0.1.44" @@ -3371,6 +3421,16 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" +[[package]] +name = "walkdir" +version = "2.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "36df944cda56c7d8d8b7496af378e6b16de9284591917d307c9b4d313c44e698" +dependencies = [ + "same-file", + "winapi-util", +] + [[package]] name = "wasi" version = "0.9.0+wasi-snapshot-preview1" @@ -4212,6 +4272,7 @@ dependencies = [ "sixel-image", "sixel-tokenizer", "sysinfo", + "tempfile", "typetag", "unicode-width", "url", @@ -4278,6 +4339,7 @@ dependencies = [ "thiserror", "unicode-width", "url", + "uuid", "vte 0.11.0", ] diff --git a/Cargo.toml b/Cargo.toml index 1c16269f..1ce85c77 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,6 +34,7 @@ members = [ "default-plugins/status-bar", "default-plugins/strider", "default-plugins/tab-bar", + "default-plugins/fixture-plugin-for-tests", "zellij-client", "zellij-server", "zellij-utils", diff --git a/default-plugins/fixture-plugin-for-tests/.cargo/config.toml b/default-plugins/fixture-plugin-for-tests/.cargo/config.toml new file mode 100644 index 00000000..bc255e30 --- /dev/null +++ b/default-plugins/fixture-plugin-for-tests/.cargo/config.toml @@ -0,0 +1,2 @@ +[build] +target = "wasm32-wasi" \ No newline at end of file diff --git a/default-plugins/fixture-plugin-for-tests/Cargo.toml b/default-plugins/fixture-plugin-for-tests/Cargo.toml new file mode 100644 index 00000000..39211c34 --- /dev/null +++ b/default-plugins/fixture-plugin-for-tests/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "fixture-plugin-for-tests" +version = "0.1.0" +authors = ["Aram Drevekenin "] +edition = "2021" +license = "MIT" + +[dependencies] +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +zellij-tile = { path = "../../zellij-tile" } diff --git a/default-plugins/fixture-plugin-for-tests/LICENSE.md b/default-plugins/fixture-plugin-for-tests/LICENSE.md new file mode 120000 index 00000000..f0608a63 --- /dev/null +++ b/default-plugins/fixture-plugin-for-tests/LICENSE.md @@ -0,0 +1 @@ +../../LICENSE.md \ No newline at end of file diff --git a/default-plugins/fixture-plugin-for-tests/src/main.rs b/default-plugins/fixture-plugin-for-tests/src/main.rs new file mode 100644 index 00000000..124b2dc7 --- /dev/null +++ b/default-plugins/fixture-plugin-for-tests/src/main.rs @@ -0,0 +1,78 @@ +use serde::{Deserialize, Serialize}; +use zellij_tile::prelude::*; + +// This is a fixture plugin used only for tests in Zellij +// it is not (and should not!) be included in the mainline executable +// it's included here for convenience so that it will be built by the CI + +#[derive(Default)] +struct State { + received_events: Vec, + received_payload: Option, +} + +#[derive(Default, Serialize, Deserialize)] +struct TestWorker { + number_of_messages_received: usize, +} + +impl<'de> ZellijWorker<'de> for TestWorker { + fn on_message(&mut self, message: String, payload: String) { + if message == "ping" { + self.number_of_messages_received += 1; + post_message_to_plugin( + "pong".into(), + format!( + "{}, received {} messages", + payload, self.number_of_messages_received + ), + ); + } + } +} + +register_plugin!(State); +register_worker!(TestWorker, test_worker); + +impl ZellijPlugin for State { + fn load(&mut self) { + subscribe(&[ + EventType::InputReceived, + EventType::SystemClipboardFailure, + EventType::CustomMessage, + ]); + } + + fn update(&mut self, event: Event) -> bool { + match &event { + Event::CustomMessage(message, payload) => { + if message == "pong" { + self.received_payload = Some(payload.clone()); + } + }, + Event::SystemClipboardFailure => { + // this is just to trigger the worker message + post_message_to( + "test", + "ping".to_owned(), + "gimme_back_my_payload".to_owned(), + ); + }, + _ => {}, + } + let should_render = true; + self.received_events.push(event); + should_render + } + + fn render(&mut self, rows: usize, cols: usize) { + if let Some(payload) = self.received_payload.as_ref() { + println!("Payload from worker: {:?}", payload); + } else { + println!( + "Rows: {:?}, Cols: {:?}, Received events: {:?}", + rows, cols, self.received_events + ); + } + } +} diff --git a/default-plugins/strider/Cargo.toml b/default-plugins/strider/Cargo.toml index f7801bfa..d45a8ff2 100644 --- a/default-plugins/strider/Cargo.toml +++ b/default-plugins/strider/Cargo.toml @@ -10,3 +10,9 @@ license = "MIT" colored = "2.0.0" zellij-tile = { path = "../../zellij-tile" } pretty-bytes = "0.2.2" +walkdir = "2.3.3" +fuzzy-matcher = "0.3.7" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +unicode-width = "0.1.8" +ansi_term = "0.12.1" diff --git a/default-plugins/strider/src/main.rs b/default-plugins/strider/src/main.rs index 6868c670..4f299c50 100644 --- a/default-plugins/strider/src/main.rs +++ b/default-plugins/strider/src/main.rs @@ -1,16 +1,28 @@ +mod search; mod state; use colored::*; -use state::{refresh_directory, FsEntry, State}; +use search::{ResultsOfSearch, SearchWorker}; +use serde_json; +use state::{refresh_directory, FsEntry, State, CURRENT_SEARCH_TERM}; use std::{cmp::min, time::Instant}; use zellij_tile::prelude::*; register_plugin!(State); +register_worker!(SearchWorker, search_worker); impl ZellijPlugin for State { fn load(&mut self) { refresh_directory(self); - subscribe(&[EventType::Key, EventType::Mouse]); + self.loading = true; + subscribe(&[ + EventType::Key, + EventType::Mouse, + EventType::CustomMessage, + EventType::Timer, + ]); + post_message_to("search", String::from("scan_folder"), String::new()); + set_timeout(0.5); // for displaying loading animation } fn update(&mut self, event: Event) -> bool { @@ -22,26 +34,101 @@ impl ZellijPlugin for State { }; self.ev_history.push_back((event.clone(), Instant::now())); match event { + Event::Timer(_elapsed) => { + should_render = true; + if self.loading { + set_timeout(0.5); + if self.loading_animation_offset == u8::MAX { + self.loading_animation_offset = 0; + } else { + self.loading_animation_offset = + self.loading_animation_offset.saturating_add(1); + } + } + }, + Event::CustomMessage(message, payload) => match message.as_str() { + "update_search_results" => { + if let Ok(mut results_of_search) = + serde_json::from_str::(&payload) + { + if Some(results_of_search.search_term) == self.search_term { + self.search_results = + results_of_search.search_results.drain(..).collect(); + should_render = true; + } + } + }, + "done_scanning_folder" => { + self.loading = false; + should_render = true; + }, + _ => {}, + }, Event::Key(key) => match key { + // modes: + // 1. typing_search_term + // 2. exploring_search_results + // 3. normal + Key::Esc | Key::Char('\n') if self.typing_search_term() => { + self.accept_search_term(); + }, + _ if self.typing_search_term() => { + self.append_to_search_term(key); + if let Some(search_term) = self.search_term.as_ref() { + std::fs::write(CURRENT_SEARCH_TERM, search_term.as_bytes()).unwrap(); + post_message_to( + "search", + String::from("search"), + String::from(&self.search_term.clone().unwrap()), + ); + } + should_render = true; + }, + Key::Esc if self.exploring_search_results() => { + self.stop_exploring_search_results(); + should_render = true; + }, + Key::Char('/') => { + self.start_typing_search_term(); + should_render = true; + }, + Key::Esc => { + self.stop_typing_search_term(); + should_render = true; + }, Key::Up | Key::Char('k') => { - let currently_selected = self.selected(); - *self.selected_mut() = self.selected().saturating_sub(1); - if currently_selected != self.selected() { + if self.exploring_search_results() { + self.move_search_selection_up(); should_render = true; + } else { + let currently_selected = self.selected(); + *self.selected_mut() = self.selected().saturating_sub(1); + if currently_selected != self.selected() { + should_render = true; + } } }, Key::Down | Key::Char('j') => { - let currently_selected = self.selected(); - let next = self.selected().saturating_add(1); - *self.selected_mut() = min(self.files.len().saturating_sub(1), next); - if currently_selected != self.selected() { + if self.exploring_search_results() { + self.move_search_selection_down(); should_render = true; + } else { + let currently_selected = self.selected(); + let next = self.selected().saturating_add(1); + *self.selected_mut() = min(self.files.len().saturating_sub(1), next); + if currently_selected != self.selected() { + should_render = true; + } } }, Key::Right | Key::Char('\n') | Key::Char('l') if !self.files.is_empty() => { + if self.exploring_search_results() { + self.open_search_result(); + } else { + self.traverse_dir_or_open_file(); + self.ev_history.clear(); + } should_render = true; - self.traverse_dir_or_open_file(); - self.ev_history.clear(); }, Key::Left | Key::Char('h') => { if self.path.components().count() > 2 { @@ -111,6 +198,10 @@ impl ZellijPlugin for State { } fn render(&mut self, rows: usize, cols: usize) { + if self.typing_search_term() || self.exploring_search_results() { + return self.render_search(rows, cols); + } + for i in 0..rows { if self.selected() < self.scroll() { *self.scroll_mut() = self.selected(); diff --git a/default-plugins/strider/src/search.rs b/default-plugins/strider/src/search.rs new file mode 100644 index 00000000..299882ea --- /dev/null +++ b/default-plugins/strider/src/search.rs @@ -0,0 +1,415 @@ +use crate::state::{State, CURRENT_SEARCH_TERM, ROOT}; + +use unicode_width::UnicodeWidthStr; +use zellij_tile::prelude::*; + +use fuzzy_matcher::skim::SkimMatcherV2; +use fuzzy_matcher::FuzzyMatcher; +use serde::{Deserialize, Serialize}; +use walkdir::WalkDir; + +use std::io::{self, BufRead}; + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub enum SearchResult { + File { + path: String, + score: i64, + indices: Vec, + }, + LineInFile { + path: String, + line: String, + line_number: usize, + score: i64, + indices: Vec, + }, +} + +impl SearchResult { + pub fn new_file_name(score: i64, indices: Vec, path: String) -> Self { + SearchResult::File { + path, + score, + indices, + } + } + pub fn new_file_line( + score: i64, + indices: Vec, + path: String, + line: String, + line_number: usize, + ) -> Self { + SearchResult::LineInFile { + path, + score, + indices, + line, + line_number, + } + } + pub fn score(&self) -> i64 { + match self { + SearchResult::File { score, .. } => *score, + SearchResult::LineInFile { score, .. } => *score, + } + } + pub fn rendered_height(&self) -> usize { + match self { + SearchResult::File { .. } => 1, + SearchResult::LineInFile { .. } => 2, + } + } + pub fn render(&self, max_width: usize, is_selected: bool) -> String { + let green_code = 154; + let orange_code = 166; + let bold_code = "\u{1b}[1m"; + let green_foreground = format!("\u{1b}[38;5;{}m", green_code); + let orange_foreground = format!("\u{1b}[38;5;{}m", orange_code); + let reset_code = "\u{1b}[m"; + let max_width = max_width.saturating_sub(3); // for the UI left line separator + match self { + SearchResult::File { path, indices, .. } => { + if is_selected { + let line = self.render_line_with_indices( + path, + indices, + max_width, + None, + Some(green_code), + true, + ); + format!("{} | {}{}", green_foreground, reset_code, line) + } else { + let line = + self.render_line_with_indices(path, indices, max_width, None, None, true); + format!(" | {}", line) + } + }, + SearchResult::LineInFile { + path, + line, + line_number, + indices, + .. + } => { + if is_selected { + let first_line = self.render_line_with_indices( + path, + &vec![], + max_width, + None, + Some(green_code), + true, + ); + let line_indication_text = format!("{}-> {}", bold_code, line_number); + let line_indication = format!( + "{}{}{}", + orange_foreground, line_indication_text, reset_code + ); // TODO: also truncate + let second_line = self.render_line_with_indices( + line, + indices, + max_width.saturating_sub(line_indication_text.width()), + None, + Some(orange_code), + false, + ); + format!( + " {}│{} {}\n {}│{} {} {}", + green_foreground, + reset_code, + first_line, + green_foreground, + reset_code, + line_indication, + second_line + ) + } else { + let first_line = + self.render_line_with_indices(path, &vec![], max_width, None, None, true); // TODO: + let line_indication_text = format!("{}-> {}", bold_code, line_number); + let second_line = self.render_line_with_indices( + line, + indices, + max_width.saturating_sub(line_indication_text.width()), + None, + None, + false, + ); + format!( + " │ {}\n │ {} {}", + first_line, line_indication_text, second_line + ) + } + }, + } + } + fn render_line_with_indices( + &self, + line_to_render: &String, + indices: &Vec, + max_width: usize, + background_color: Option, + foreground_color: Option, + is_bold: bool, + ) -> String { + // TODO: get these from Zellij + let reset_code = "\u{1b}[m"; + let underline_code = "\u{1b}[4m"; + let foreground_color = foreground_color + .map(|c| format!("\u{1b}[38;5;{}m", c)) + .unwrap_or_else(|| format!("")); + let background_color = background_color + .map(|c| format!("\u{1b}[48;5;{}m", c)) + .unwrap_or_else(|| format!("")); + let bold = if is_bold { "\u{1b}[1m" } else { "" }; + let non_index_character_style = format!("{}{}{}", background_color, foreground_color, bold); + let index_character_style = format!( + "{}{}{}{}", + background_color, foreground_color, bold, underline_code + ); + + let mut truncate_start_position = None; + let mut truncate_end_position = None; + if line_to_render.width() > max_width { + let length_of_each_half = max_width.saturating_sub(4) / 2; + truncate_start_position = Some(length_of_each_half); + truncate_end_position = + Some(line_to_render.width().saturating_sub(length_of_each_half)); + } + let mut first_half = format!("{}", reset_code); + let mut second_half = format!("{}", reset_code); + for (i, character) in line_to_render.chars().enumerate() { + if (truncate_start_position.is_none() && truncate_end_position.is_none()) + || Some(i) < truncate_start_position + { + if indices.contains(&i) { + first_half.push_str(&index_character_style); + first_half.push(character); + first_half.push_str(reset_code); + } else { + first_half.push_str(&non_index_character_style); + first_half.push(character); + first_half.push_str(reset_code); + } + } else if Some(i) > truncate_end_position { + if indices.contains(&i) { + second_half.push_str(&index_character_style); + second_half.push(character); + second_half.push_str(reset_code); + } else { + second_half.push_str(&non_index_character_style); + second_half.push(character); + second_half.push_str(reset_code); + } + } + } + if let Some(_truncate_start_position) = truncate_start_position { + format!( + "{}{}{}[..]{}{}{}", + first_half, reset_code, foreground_color, reset_code, second_half, reset_code + ) + } else { + format!("{}{}", first_half, reset_code) + } + } +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct ResultsOfSearch { + pub search_term: String, + pub search_results: Vec, +} + +impl ResultsOfSearch { + pub fn new(search_term: String, search_results: Vec) -> Self { + ResultsOfSearch { + search_term, + search_results, + } + } + pub fn limit_search_results(mut self, max_results: usize) -> Self { + self.search_results + .sort_by(|a, b| b.score().cmp(&a.score())); + self.search_results = if self.search_results.len() > max_results { + self.search_results.drain(..max_results).collect() + } else { + self.search_results.drain(..).collect() + }; + self + } +} + +#[derive(Default, Serialize, Deserialize)] +pub struct SearchWorker { + pub search_paths: Vec, + pub search_file_contents: Vec<(String, usize, String)>, // file_name, line_number, line + skip_hidden_files: bool, +} + +impl<'de> ZellijWorker<'de> for SearchWorker { + // TODO: handle out of order messages, likely when rendering + fn on_message(&mut self, message: String, payload: String) { + match message.as_str() { + // TODO: deserialize to type + "scan_folder" => { + self.populate_search_paths(); + post_message_to_plugin("done_scanning_folder".into(), "".into()); + }, + "search" => { + let search_term = payload; + let (search_term, matches) = self.search(search_term); + let search_results = + ResultsOfSearch::new(search_term, matches).limit_search_results(100); + post_message_to_plugin( + "update_search_results".into(), + serde_json::to_string(&search_results).unwrap(), + ); + }, + "skip_hidden_files" => match serde_json::from_str::(&payload) { + Ok(should_skip_hidden_files) => { + self.skip_hidden_files = should_skip_hidden_files; + }, + Err(e) => { + eprintln!("Failed to deserialize payload: {:?}", e); + }, + }, + _ => {}, + } + } +} + +impl SearchWorker { + fn search(&mut self, search_term: String) -> (String, Vec) { + if self.search_paths.is_empty() { + self.populate_search_paths(); + } + let mut matches = vec![]; + let mut matcher = SkimMatcherV2::default().use_cache(true).element_limit(100); // TODO: no hard + // coded limit! + self.search_file_names(&search_term, &mut matcher, &mut matches); + self.search_file_contents(&search_term, &mut matcher, &mut matches); + + // if the search term changed before we finished, let's search again! + if let Ok(current_search_term) = std::fs::read(CURRENT_SEARCH_TERM) { + let current_search_term = String::from_utf8_lossy(¤t_search_term); // TODO: not lossy, search can be lots of stuff + if current_search_term != search_term { + return self.search(current_search_term.into()); + } + } + (search_term, matches) + } + fn populate_search_paths(&mut self) { + for entry in WalkDir::new(ROOT).into_iter().filter_map(|e| e.ok()) { + if self.skip_hidden_files + && entry + .file_name() + .to_str() + .map(|s| s.starts_with('.')) + .unwrap_or(false) + { + continue; + } + let file_path = entry.path().display().to_string(); + + if entry.metadata().unwrap().is_file() { + if let Ok(file) = std::fs::File::open(&file_path) { + let lines = io::BufReader::new(file).lines(); + for (index, line) in lines.enumerate() { + match line { + Ok(line) => { + self.search_file_contents.push(( + file_path.clone(), + index + 1, + line, + )); + }, + Err(_) => { + break; // probably a binary file, skip it + }, + } + } + } + } + + self.search_paths.push(file_path); + } + } + fn search_file_names( + &self, + search_term: &str, + matcher: &mut SkimMatcherV2, + matches: &mut Vec, + ) { + for entry in &self.search_paths { + if let Some((score, indices)) = matcher.fuzzy_indices(&entry, &search_term) { + matches.push(SearchResult::new_file_name( + score, + indices, + entry.to_owned(), + )); + } + } + } + fn search_file_contents( + &self, + search_term: &str, + matcher: &mut SkimMatcherV2, + matches: &mut Vec, + ) { + for (file_name, line_number, line_entry) in &self.search_file_contents { + if let Some((score, indices)) = matcher.fuzzy_indices(&line_entry, &search_term) { + matches.push(SearchResult::new_file_line( + score, + indices, + file_name.clone(), + line_entry.clone(), + *line_number, + )); + } + } + } +} + +impl State { + pub fn render_search(&mut self, rows: usize, cols: usize) { + if let Some(search_term) = self.search_term.as_ref() { + let mut to_render = String::new(); + to_render.push_str(&format!( + " \u{1b}[38;5;51;1mSEARCH:\u{1b}[m {}\n", + search_term + )); + let mut rows_left_to_render = rows.saturating_sub(3); + if self.loading && self.search_results.is_empty() { + to_render.push_str(&self.render_loading()); + } + for (i, result) in self + .search_results + .iter() + .enumerate() + .take(rows.saturating_sub(3)) + { + let result_height = result.rendered_height(); + if result_height + 1 > rows_left_to_render { + break; + } + rows_left_to_render -= result_height; + rows_left_to_render -= 1; // space between + let is_selected = i == self.selected_search_result; + let rendered_result = result.render(cols, is_selected); + to_render.push_str(&format!("\n{}\n", rendered_result)); + } + print!("{}", to_render); + } + } + pub fn render_loading(&self) -> String { + let mut rendered = String::from("Scanning folder"); + let dot_count = self.loading_animation_offset % 4; + for _ in 0..dot_count { + rendered.push('.'); + } + rendered + } +} diff --git a/default-plugins/strider/src/state.rs b/default-plugins/strider/src/state.rs index 05ede0df..7ced4932 100644 --- a/default-plugins/strider/src/state.rs +++ b/default-plugins/strider/src/state.rs @@ -1,3 +1,4 @@ +use crate::search::SearchResult; use pretty_bytes::converter as pb; use std::{ collections::{HashMap, VecDeque}, @@ -7,7 +8,9 @@ use std::{ }; use zellij_tile::prelude::*; -const ROOT: &str = "/host"; +pub const ROOT: &str = "/host"; +pub const CURRENT_SEARCH_TERM: &str = "/data/current_search_term"; + #[derive(Default)] pub struct State { pub path: PathBuf, @@ -15,9 +18,66 @@ pub struct State { pub cursor_hist: HashMap, pub hide_hidden_files: bool, pub ev_history: VecDeque<(Event, Instant)>, // stores last event, can be expanded in future + pub search_paths: Vec, + pub search_term: Option, + pub search_results: Vec, + pub loading: bool, + pub loading_animation_offset: u8, + pub typing_search_term: bool, + pub exploring_search_results: bool, + pub selected_search_result: usize, } impl State { + pub fn append_to_search_term(&mut self, key: Key) { + match key { + Key::Char(character) => { + if let Some(search_term) = self.search_term.as_mut() { + search_term.push(character); + } + }, + Key::Backspace => { + if let Some(search_term) = self.search_term.as_mut() { + search_term.pop(); + if search_term.len() == 0 { + self.search_term = None; + self.typing_search_term = false; + } + } + }, + _ => {}, + } + } + pub fn accept_search_term(&mut self) { + self.typing_search_term = false; + self.exploring_search_results = true; + } + pub fn typing_search_term(&self) -> bool { + self.typing_search_term + } + pub fn exploring_search_results(&self) -> bool { + self.exploring_search_results + } + pub fn stop_exploring_search_results(&mut self) { + self.exploring_search_results = false; + } + pub fn start_typing_search_term(&mut self) { + if self.search_term.is_none() { + self.search_term = Some(String::new()); + } + self.typing_search_term = true; + } + pub fn stop_typing_search_term(&mut self) { + self.typing_search_term = true; + } + pub fn move_search_selection_up(&mut self) { + self.selected_search_result = self.selected_search_result.saturating_sub(1); + } + pub fn move_search_selection_down(&mut self) { + if self.selected_search_result < self.search_results.len() { + self.selected_search_result = self.selected_search_result.saturating_add(1); + } + } pub fn selected_mut(&mut self) -> &mut usize { &mut self.cursor_hist.entry(self.path.clone()).or_default().0 } @@ -44,6 +104,32 @@ impl State { } } } + pub fn open_search_result(&mut self) { + match self.search_results.get(self.selected_search_result) { + Some(SearchResult::File { + path, + score, + indices, + }) => { + let file_path = PathBuf::from(path); + open_file(file_path.strip_prefix(ROOT).unwrap()); + }, + Some(SearchResult::LineInFile { + path, + score, + indices, + line, + line_number, + }) => { + let file_path = PathBuf::from(path); + open_file_with_line(file_path.strip_prefix(ROOT).unwrap(), *line_number); + // open_file_with_line(&file_path, *line_number); // TODO: no!! + }, + None => { + eprintln!("Search result not found"); + }, + } + } } #[derive(PartialEq, Eq, PartialOrd, Ord, Clone)] diff --git a/xtask/src/ci.rs b/xtask/src/ci.rs index e34f517e..99f195da 100644 --- a/xtask/src/ci.rs +++ b/xtask/src/ci.rs @@ -4,7 +4,10 @@ use crate::{ flags::{self, CiCmd, Cross, E2e}, }; use anyhow::Context; -use std::{ffi::OsString, path::PathBuf}; +use std::{ + ffi::OsString, + path::{Path, PathBuf}, +}; use xshell::{cmd, Shell}; pub fn main(sh: &Shell, flags: flags::Ci) -> anyhow::Result<()> { @@ -105,13 +108,27 @@ fn e2e_test(sh: &Shell, args: Vec) -> anyhow::Result<()> { // set --no-default-features so the test binary gets built with the plugins from assets/plugins that just got built crate::cargo() .and_then(|cargo| { + // e2e tests cmd!( sh, "{cargo} test --no-default-features -- --ignored --nocapture --test-threads 1" ) - .args(args) + .args(args.clone()) .run() - .map_err(anyhow::Error::new) + .map_err(anyhow::Error::new)?; + + // plugin system tests are run here because they're medium-slow + let _pd = sh.push_dir(Path::new("zellij-server")); + println!(""); + let msg = format!(">> Testing Plugin System"); + crate::status(&msg); + println!("{}", msg); + + cmd!(sh, "{cargo} test -- --ignored --nocapture --test-threads 1") + .args(args.clone()) + .run() + .with_context(|| format!("Failed to run tests for the Plugin System"))?; + Ok(()) }) .context(err_context) } diff --git a/xtask/src/main.rs b/xtask/src/main.rs index e394f843..c6e75118 100644 --- a/xtask/src/main.rs +++ b/xtask/src/main.rs @@ -34,6 +34,7 @@ lazy_static::lazy_static! { WorkspaceMember{crate_name: "default-plugins/status-bar", build: true}, WorkspaceMember{crate_name: "default-plugins/strider", build: true}, WorkspaceMember{crate_name: "default-plugins/tab-bar", build: true}, + WorkspaceMember{crate_name: "default-plugins/fixture-plugin-for-tests", build: true}, WorkspaceMember{crate_name: "zellij-utils", build: false}, WorkspaceMember{crate_name: "zellij-tile-utils", build: false}, WorkspaceMember{crate_name: "zellij-tile", build: false}, diff --git a/zellij-server/Cargo.toml b/zellij-server/Cargo.toml index 0fa9fe1e..725b36fc 100644 --- a/zellij-server/Cargo.toml +++ b/zellij-server/Cargo.toml @@ -35,6 +35,8 @@ semver = "0.11.0" [dev-dependencies] insta = "1.6.0" +tempfile = "3.2.0" +wasmer = { version = "2.3.0", features = [ "singlepass" ] } [features] singlepass = ["wasmer/singlepass"] diff --git a/zellij-server/src/logging_pipe.rs b/zellij-server/src/logging_pipe.rs index 6081b283..4f35db4f 100644 --- a/zellij-server/src/logging_pipe.rs +++ b/zellij-server/src/logging_pipe.rs @@ -3,6 +3,7 @@ use std::{ io::{Read, Seek, Write}, }; +use crate::plugins::PluginId; use log::{debug, error}; use wasmer_wasi::{WasiFile, WasiFsError}; use zellij_utils::{errors::prelude::*, serde}; @@ -17,11 +18,11 @@ const ZELLIJ_MAX_PIPE_BUFFER_SIZE: usize = 16_384; pub struct LoggingPipe { buffer: VecDeque, plugin_name: String, - plugin_id: u32, + plugin_id: PluginId, } impl LoggingPipe { - pub fn new(plugin_name: &str, plugin_id: u32) -> LoggingPipe { + pub fn new(plugin_name: &str, plugin_id: PluginId) -> LoggingPipe { LoggingPipe { buffer: VecDeque::new(), plugin_name: String::from(plugin_name), diff --git a/zellij-server/src/plugins/mod.rs b/zellij-server/src/plugins/mod.rs index aaf6164b..9384b3f6 100644 --- a/zellij-server/src/plugins/mod.rs +++ b/zellij-server/src/plugins/mod.rs @@ -22,6 +22,8 @@ use zellij_utils::{ pane_size::Size, }; +pub type PluginId = u32; + #[derive(Clone, Debug)] pub enum PluginInstruction { Load( @@ -32,8 +34,8 @@ pub enum PluginInstruction { ClientId, Size, ), - Update(Vec<(Option, Option, Event)>), // Focused plugin / broadcast, client_id, event data - Unload(u32), // plugin_id + Update(Vec<(Option, Option, Event)>), // Focused plugin / broadcast, client_id, event data + Unload(PluginId), // plugin_id Reload( Option, // should float Option, // pane title @@ -41,7 +43,7 @@ pub enum PluginInstruction { usize, // tab index Size, ), - Resize(u32, usize, usize), // plugin_id, columns, rows + Resize(PluginId, usize, usize), // plugin_id, columns, rows AddClient(ClientId), RemoveClient(ClientId), NewTab( @@ -52,7 +54,23 @@ pub enum PluginInstruction { usize, // tab_index ClientId, ), - ApplyCachedEvents(Vec), // a list of plugin id + ApplyCachedEvents(Vec), + ApplyCachedWorkerMessages(PluginId), + PostMessagesToPluginWorker( + PluginId, + ClientId, + String, // worker name + Vec<( + String, // serialized message name + String, // serialized payload + )>, + ), + PostMessageToPlugin( + PluginId, + ClientId, + String, // serialized message + String, // serialized payload + ), Exit, } @@ -69,6 +87,13 @@ impl From<&PluginInstruction> for PluginContext { PluginInstruction::RemoveClient(_) => PluginContext::RemoveClient, PluginInstruction::NewTab(..) => PluginContext::NewTab, PluginInstruction::ApplyCachedEvents(..) => PluginContext::ApplyCachedEvents, + PluginInstruction::ApplyCachedWorkerMessages(..) => { + PluginContext::ApplyCachedWorkerMessages + }, + PluginInstruction::PostMessagesToPluginWorker(..) => { + PluginContext::PostMessageToPluginWorker + }, + PluginInstruction::PostMessageToPlugin(..) => PluginContext::PostMessageToPlugin, } } } @@ -163,7 +188,7 @@ pub(crate) fn plugin_thread_main( tab_index, client_id, ) => { - let mut plugin_ids: HashMap> = HashMap::new(); + let mut plugin_ids: HashMap> = HashMap::new(); let mut extracted_run_instructions = tab_layout .clone() .unwrap_or_else(|| layout.new_tab().0) @@ -199,6 +224,30 @@ pub(crate) fn plugin_thread_main( PluginInstruction::ApplyCachedEvents(plugin_id) => { wasm_bridge.apply_cached_events(plugin_id)?; }, + PluginInstruction::ApplyCachedWorkerMessages(plugin_id) => { + wasm_bridge.apply_cached_worker_messages(plugin_id)?; + }, + PluginInstruction::PostMessagesToPluginWorker( + plugin_id, + client_id, + worker_name, + messages, + ) => { + wasm_bridge.post_messages_to_plugin_worker( + plugin_id, + client_id, + worker_name, + messages, + )?; + }, + PluginInstruction::PostMessageToPlugin(plugin_id, client_id, message, payload) => { + let updates = vec![( + Some(plugin_id), + Some(client_id), + Event::CustomMessage(message, payload), + )]; + wasm_bridge.update_plugins(updates)?; + }, PluginInstruction::Exit => { wasm_bridge.cleanup(); break; @@ -218,3 +267,7 @@ pub(crate) fn plugin_thread_main( }) .context("failed to cleanup plugin data directory") } + +#[path = "./unit/plugin_tests.rs"] +#[cfg(test)] +mod plugin_tests; diff --git a/zellij-server/src/plugins/plugin_loader.rs b/zellij-server/src/plugins/plugin_loader.rs index 86afd8ca..91eb6c93 100644 --- a/zellij-server/src/plugins/plugin_loader.rs +++ b/zellij-server/src/plugins/plugin_loader.rs @@ -1,5 +1,8 @@ -use crate::plugins::plugin_map::{PluginEnv, PluginMap, RunningPlugin, Subscriptions}; +use crate::plugins::plugin_map::{ + PluginEnv, PluginMap, RunningPlugin, RunningWorker, Subscriptions, +}; use crate::plugins::zellij_exports::{wasi_read_string, zellij_exports}; +use crate::plugins::PluginId; use highway::{HighwayHash, PortableHash}; use log::info; use semver::Version; @@ -19,7 +22,7 @@ use crate::{ }; use zellij_utils::{ - consts::{VERSION, ZELLIJ_CACHE_DIR, ZELLIJ_TMP_DIR}, + consts::{VERSION, ZELLIJ_CACHE_DIR, ZELLIJ_SESSION_CACHE_DIR, ZELLIJ_TMP_DIR}, errors::prelude::*, input::plugins::PluginConfig, pane_size::Size, @@ -152,7 +155,7 @@ pub struct PluginLoader<'a> { plugin_path: PathBuf, loading_indication: &'a mut LoadingIndication, senders: ThreadSenders, - plugin_id: u32, + plugin_id: PluginId, client_id: ClientId, store: Store, plugin: PluginConfig, @@ -165,7 +168,7 @@ pub struct PluginLoader<'a> { impl<'a> PluginLoader<'a> { pub fn reload_plugin_from_memory( - plugin_id: u32, + plugin_id: PluginId, plugin_dir: PathBuf, plugin_cache: Arc>>, senders: ThreadSenders, @@ -194,9 +197,7 @@ impl<'a> PluginLoader<'a> { )?; plugin_loader .load_module_from_memory() - .and_then(|module| { - plugin_loader.create_plugin_instance_environment_and_subscriptions(module) - }) + .and_then(|module| plugin_loader.create_plugin_environment(module)) .and_then(|(instance, plugin_env, subscriptions)| { plugin_loader.load_plugin_instance( &instance, @@ -214,7 +215,7 @@ impl<'a> PluginLoader<'a> { } pub fn start_plugin( - plugin_id: u32, + plugin_id: PluginId, client_id: ClientId, plugin: &PluginConfig, tab_index: usize, @@ -227,7 +228,7 @@ impl<'a> PluginLoader<'a> { connected_clients: Arc>>, loading_indication: &mut LoadingIndication, ) -> Result<()> { - let err_context = || format!("failed to start plugin {plugin:#?} for client {client_id}"); + let err_context = || format!("failed to start plugin {plugin_id} for client {client_id}"); let mut plugin_loader = PluginLoader::new( &plugin_cache, loading_indication, @@ -244,9 +245,7 @@ impl<'a> PluginLoader<'a> { .load_module_from_memory() .or_else(|_e| plugin_loader.load_module_from_hd_cache()) .or_else(|_e| plugin_loader.compile_module()) - .and_then(|module| { - plugin_loader.create_plugin_instance_environment_and_subscriptions(module) - }) + .and_then(|module| plugin_loader.create_plugin_environment(module)) .and_then(|(instance, plugin_env, subscriptions)| { plugin_loader.load_plugin_instance( &instance, @@ -276,7 +275,7 @@ impl<'a> PluginLoader<'a> { loading_indication: &mut LoadingIndication, ) -> Result<()> { let mut new_plugins = HashSet::new(); - for (&(plugin_id, _), _) in &*plugin_map.lock().unwrap() { + for plugin_id in plugin_map.lock().unwrap().plugin_ids() { new_plugins.insert((plugin_id, client_id)); } for (plugin_id, existing_client_id) in new_plugins { @@ -292,9 +291,7 @@ impl<'a> PluginLoader<'a> { )?; plugin_loader .load_module_from_memory() - .and_then(|module| { - plugin_loader.create_plugin_instance_environment_and_subscriptions(module) - }) + .and_then(|module| plugin_loader.create_plugin_environment(module)) .and_then(|(instance, plugin_env, subscriptions)| { plugin_loader.load_plugin_instance( &instance, @@ -309,7 +306,7 @@ impl<'a> PluginLoader<'a> { } pub fn reload_plugin( - plugin_id: u32, + plugin_id: PluginId, plugin_dir: PathBuf, plugin_cache: Arc>>, senders: ThreadSenders, @@ -339,9 +336,7 @@ impl<'a> PluginLoader<'a> { )?; plugin_loader .compile_module() - .and_then(|module| { - plugin_loader.create_plugin_instance_environment_and_subscriptions(module) - }) + .and_then(|module| plugin_loader.create_plugin_environment(module)) .and_then(|(instance, plugin_env, subscriptions)| { plugin_loader.load_plugin_instance( &instance, @@ -361,7 +356,7 @@ impl<'a> PluginLoader<'a> { plugin_cache: &Arc>>, loading_indication: &'a mut LoadingIndication, senders: &ThreadSenders, - plugin_id: u32, + plugin_id: PluginId, client_id: ClientId, store: &Store, plugin: PluginConfig, @@ -369,7 +364,9 @@ impl<'a> PluginLoader<'a> { tab_index: usize, size: Size, ) -> Result { - let plugin_own_data_dir = ZELLIJ_CACHE_DIR.join(Url::from(&plugin.location).to_string()); + let plugin_own_data_dir = ZELLIJ_SESSION_CACHE_DIR + .join(Url::from(&plugin.location).to_string()) + .join(format!("{}-{}", plugin_id, client_id)); create_plugin_fs_entries(&plugin_own_data_dir)?; let plugin_path = plugin.path.clone(); Ok(PluginLoader { @@ -393,16 +390,16 @@ impl<'a> PluginLoader<'a> { plugin_map: &Arc>, loading_indication: &'a mut LoadingIndication, senders: &ThreadSenders, - plugin_id: u32, + plugin_id: PluginId, client_id: ClientId, store: &Store, plugin_dir: &'a PathBuf, ) -> Result { let err_context = || "Failed to find existing plugin"; - let (running_plugin, _subscriptions) = { + let (running_plugin, _subscriptions, _workers) = { let mut plugin_map = plugin_map.lock().unwrap(); plugin_map - .remove(&(plugin_id, client_id)) + .remove_single_plugin(plugin_id, client_id) .with_context(err_context)? }; let running_plugin = running_plugin.lock().unwrap(); @@ -431,19 +428,17 @@ impl<'a> PluginLoader<'a> { plugin_map: &Arc>, loading_indication: &'a mut LoadingIndication, senders: &ThreadSenders, - plugin_id: u32, + plugin_id: PluginId, client_id: ClientId, store: &Store, plugin_dir: &'a PathBuf, ) -> Result { let err_context = || "Failed to find existing plugin"; - let (running_plugin, _subscriptions) = { + let running_plugin = { let plugin_map = plugin_map.lock().unwrap(); plugin_map - .iter() - .find(|((p_id, _c_id), _)| p_id == &plugin_id) + .get_running_plugin(plugin_id, None) .with_context(err_context)? - .1 .clone() }; let running_plugin = running_plugin.lock().unwrap(); @@ -553,49 +548,13 @@ impl<'a> PluginLoader<'a> { .with_context(err_context)?; Ok(module) } - pub fn create_plugin_instance_environment_and_subscriptions( + pub fn create_plugin_environment( &mut self, module: Module, ) -> Result<(Instance, PluginEnv, Arc>)> { - let err_context = || { - format!( - "Failed to create instance and plugin env for plugin {}", - self.plugin_id - ) - }; - let mut wasi_env = WasiState::new("Zellij") - .env("CLICOLOR_FORCE", "1") - .map_dir("/host", ".") - .and_then(|wasi| wasi.map_dir("/data", &self.plugin_own_data_dir)) - .and_then(|wasi| wasi.map_dir("/tmp", ZELLIJ_TMP_DIR.as_path())) - .and_then(|wasi| { - wasi.stdin(Box::new(Pipe::new())) - .stdout(Box::new(Pipe::new())) - .stderr(Box::new(LoggingPipe::new( - &self.plugin.location.to_string(), - self.plugin_id, - ))) - .finalize() - }) - .with_context(err_context)?; - let wasi = wasi_env.import_object(&module).with_context(err_context)?; - - let mut mut_plugin = self.plugin.clone(); - mut_plugin.set_tab_index(self.tab_index); - let plugin_env = PluginEnv { - plugin_id: self.plugin_id, - client_id: self.client_id, - plugin: mut_plugin, - senders: self.senders.clone(), - wasi_env, - plugin_own_data_dir: self.plugin_own_data_dir.clone(), - tab_index: self.tab_index, - }; - - let subscriptions = Arc::new(Mutex::new(HashSet::new())); - let zellij = zellij_exports(&self.store, &plugin_env, &subscriptions); - let instance = - Instance::new(&module, &zellij.chain_back(wasi)).with_context(err_context)?; + let err_context = || format!("Failed to create environment for plugin"); + let (instance, plugin_env, subscriptions) = + self.create_plugin_instance_env_and_subscriptions(&module)?; assert_plugin_version(&instance, &plugin_env).with_context(err_context)?; // Only do an insert when everything went well! let cloned_plugin = self.plugin.clone(); @@ -605,6 +564,26 @@ impl<'a> PluginLoader<'a> { .insert(cloned_plugin.path, module); Ok((instance, plugin_env, subscriptions)) } + pub fn create_plugin_instance_and_wasi_env_for_worker( + &mut self, + ) -> Result<(Instance, PluginEnv)> { + let err_context = || { + format!( + "Failed to create instance and plugin env for worker {}", + self.plugin_id + ) + }; + let module = self + .plugin_cache + .lock() + .unwrap() + .get(&self.plugin.path) + .with_context(err_context)? + .clone(); + let (instance, plugin_env, _subscriptions) = + self.create_plugin_instance_env_and_subscriptions(&module)?; + Ok((instance, plugin_env)) + } pub fn load_plugin_instance( &mut self, instance: &Instance, @@ -621,11 +600,35 @@ impl<'a> PluginLoader<'a> { self.senders, self.plugin_id ); - let load_function = instance + let start_function = instance .exports .get_function("_start") .with_context(err_context)?; - // This eventually calls the `.load()` method + let load_function = instance + .exports + .get_function("load") + .with_context(err_context)?; + let mut workers = HashMap::new(); + for (function_name, _exported_function) in instance.exports.iter().functions() { + if function_name.ends_with("_worker") { + let plugin_config = self.plugin.clone(); + let (instance, plugin_env) = + self.create_plugin_instance_and_wasi_env_for_worker()?; + + let start_function_for_worker = instance + .exports + .get_function("_start") + .with_context(err_context)?; + start_function_for_worker + .call(&[]) + .with_context(err_context)?; + + let worker = + RunningWorker::new(instance, &function_name, plugin_config, plugin_env); + workers.insert(function_name.into(), Arc::new(Mutex::new(worker))); + } + } + start_function.call(&[]).with_context(err_context)?; load_function.call(&[]).with_context(err_context)?; display_loading_stage!( indicate_starting_plugin_success, @@ -640,16 +643,16 @@ impl<'a> PluginLoader<'a> { self.plugin_id ); plugin_map.lock().unwrap().insert( - (self.plugin_id, self.client_id), - ( - Arc::new(Mutex::new(RunningPlugin::new( - main_user_instance, - main_user_env, - self.size.rows, - self.size.cols, - ))), - subscriptions.clone(), - ), + self.plugin_id, + self.client_id, + Arc::new(Mutex::new(RunningPlugin::new( + main_user_instance, + main_user_env, + self.size.rows, + self.size.cols, + ))), + subscriptions.clone(), + workers, ); display_loading_stage!( indicate_writing_plugin_to_cache_success, @@ -672,6 +675,10 @@ impl<'a> PluginLoader<'a> { self.plugin_id ); for client_id in connected_clients { + if client_id == &self.client_id { + // don't reload the plugin once more for ourselves + continue; + } let mut loading_indication = LoadingIndication::new("".into()); let mut plugin_loader_for_client = PluginLoader::new_from_different_client_id( &self.plugin_cache.clone(), @@ -685,10 +692,7 @@ impl<'a> PluginLoader<'a> { )?; plugin_loader_for_client .load_module_from_memory() - .and_then(|module| { - plugin_loader_for_client - .create_plugin_instance_environment_and_subscriptions(module) - }) + .and_then(|module| plugin_loader_for_client.create_plugin_environment(module)) .and_then(|(instance, plugin_env, subscriptions)| { plugin_loader_for_client.load_plugin_instance( &instance, @@ -730,6 +734,51 @@ impl<'a> PluginLoader<'a> { }, } } + fn create_plugin_instance_env_and_subscriptions( + &self, + module: &Module, + ) -> Result<(Instance, PluginEnv, Arc>)> { + let err_context = || { + format!( + "Failed to create instance, plugin env and subscriptions for plugin {}", + self.plugin_id + ) + }; + let mut wasi_env = WasiState::new("Zellij") + .env("CLICOLOR_FORCE", "1") + .map_dir("/host", ".") + .and_then(|wasi| wasi.map_dir("/data", &self.plugin_own_data_dir)) + .and_then(|wasi| wasi.map_dir("/tmp", ZELLIJ_TMP_DIR.as_path())) + .and_then(|wasi| { + wasi.stdin(Box::new(Pipe::new())) + .stdout(Box::new(Pipe::new())) + .stderr(Box::new(LoggingPipe::new( + &self.plugin.location.to_string(), + self.plugin_id, + ))) + .finalize() + }) + .with_context(err_context)?; + let wasi = wasi_env.import_object(&module).with_context(err_context)?; + + let mut mut_plugin = self.plugin.clone(); + mut_plugin.set_tab_index(self.tab_index); + let plugin_env = PluginEnv { + plugin_id: self.plugin_id, + client_id: self.client_id, + plugin: mut_plugin, + senders: self.senders.clone(), + wasi_env, + plugin_own_data_dir: self.plugin_own_data_dir.clone(), + tab_index: self.tab_index, + }; + + let subscriptions = Arc::new(Mutex::new(HashSet::new())); + let zellij = zellij_exports(&self.store, &plugin_env, &subscriptions); + let instance = + Instance::new(&module, &zellij.chain_back(wasi)).with_context(err_context)?; + Ok((instance, plugin_env, subscriptions)) + } } fn create_plugin_fs_entries(plugin_own_data_dir: &PathBuf) -> Result<()> { diff --git a/zellij-server/src/plugins/plugin_map.rs b/zellij-server/src/plugins/plugin_map.rs index 4cb5ebc9..0c3df931 100644 --- a/zellij-server/src/plugins/plugin_map.rs +++ b/zellij-server/src/plugins/plugin_map.rs @@ -1,4 +1,6 @@ -use crate::plugins::wasm_bridge::PluginId; +use crate::plugins::plugin_loader::{PluginLoader, VersionMismatchError}; +use crate::plugins::zellij_exports::wasi_write_object; +use crate::plugins::PluginId; use std::{ collections::{HashMap, HashSet}, path::PathBuf, @@ -9,20 +11,183 @@ use wasmer_wasi::WasiEnv; use crate::{thread_bus::ThreadSenders, ClientId}; -use zellij_utils::{data::EventType, input::plugins::PluginConfig}; +use zellij_utils::errors::prelude::*; +use zellij_utils::{ + consts::VERSION, data::EventType, input::layout::RunPluginLocation, + input::plugins::PluginConfig, +}; // the idea here is to provide atomicity when adding/removing plugins from the map (eg. when a new // client connects) but to also allow updates/renders not to block each other // so when adding/removing from the map - everything is halted, that's life // but when cloning the internal RunningPlugin and Subscriptions atomics, we can call methods on // them without blocking other instances -pub type PluginMap = - HashMap<(PluginId, ClientId), (Arc>, Arc>)>; +#[derive(Default)] +pub struct PluginMap { + plugin_assets: HashMap< + (PluginId, ClientId), + ( + Arc>, + Arc>, + HashMap>>, + ), + >, +} + +impl PluginMap { + pub fn remove_plugins( + &mut self, + pid: PluginId, + ) -> Vec<( + Arc>, + Arc>, + HashMap>>, + )> { + let mut removed = vec![]; + let ids_in_plugin_map: Vec<(PluginId, ClientId)> = + self.plugin_assets.keys().copied().collect(); + for (plugin_id, client_id) in ids_in_plugin_map { + if pid == plugin_id { + if let Some(plugin_asset) = self.plugin_assets.remove(&(plugin_id, client_id)) { + removed.push(plugin_asset); + } + } + } + removed + } + pub fn remove_single_plugin( + &mut self, + plugin_id: PluginId, + client_id: ClientId, + ) -> Option<( + Arc>, + Arc>, + HashMap>>, + )> { + self.plugin_assets.remove(&(plugin_id, client_id)) + } + pub fn plugin_ids(&self) -> Vec { + let mut unique_plugins: HashSet = self + .plugin_assets + .keys() + .map(|(plugin_id, _client_id)| *plugin_id) + .collect(); + unique_plugins.drain().into_iter().collect() + } + pub fn running_plugins(&mut self) -> Vec<(PluginId, ClientId, Arc>)> { + self.plugin_assets + .iter() + .map(|((plugin_id, client_id), (running_plugin, _, _))| { + (*plugin_id, *client_id, running_plugin.clone()) + }) + .collect() + } + pub fn running_plugins_and_subscriptions( + &mut self, + ) -> Vec<( + PluginId, + ClientId, + Arc>, + Arc>, + )> { + self.plugin_assets + .iter() + .map( + |((plugin_id, client_id), (running_plugin, subscriptions, _))| { + ( + *plugin_id, + *client_id, + running_plugin.clone(), + subscriptions.clone(), + ) + }, + ) + .collect() + } + pub fn get_running_plugin_and_subscriptions( + &self, + plugin_id: PluginId, + client_id: ClientId, + ) -> Option<(Arc>, Arc>)> { + self.plugin_assets.get(&(plugin_id, client_id)).and_then( + |(running_plugin, subscriptions, _)| { + Some((running_plugin.clone(), subscriptions.clone())) + }, + ) + } + pub fn get_running_plugin( + &self, + plugin_id: PluginId, + client_id: Option, + ) -> Option>> { + match client_id { + Some(client_id) => self + .plugin_assets + .get(&(plugin_id, client_id)) + .and_then(|(running_plugin, _, _)| Some(running_plugin.clone())), + None => self + .plugin_assets + .iter() + .find(|((p_id, _), _)| *p_id == plugin_id) + .and_then(|(_, (running_plugin, _, _))| Some(running_plugin.clone())), + } + } + pub fn clone_worker( + &self, + plugin_id: PluginId, + client_id: ClientId, + worker_name: &str, + ) -> Option>> { + self.plugin_assets + .iter() + .find(|((p_id, c_id), _)| p_id == &plugin_id && c_id == &client_id) + .and_then(|(_, (_running_plugin, _subscriptions, workers))| { + if let Some(worker) = workers.get(&format!("{}_worker", worker_name)) { + Some(worker.clone()) + } else { + None + } + }) + .clone() + } + pub fn all_plugin_ids_for_plugin_location( + &self, + plugin_location: &RunPluginLocation, + ) -> Result> { + let err_context = || format!("Failed to get plugin ids for location {plugin_location}"); + let plugin_ids: Vec = self + .plugin_assets + .iter() + .filter(|(_, (running_plugin, _subscriptions, _workers))| { + &running_plugin.lock().unwrap().plugin_env.plugin.location == plugin_location + }) + .map(|((plugin_id, _client_id), _)| *plugin_id) + .collect(); + if plugin_ids.is_empty() { + return Err(ZellijError::PluginDoesNotExist).with_context(err_context); + } + Ok(plugin_ids) + } + pub fn insert( + &mut self, + plugin_id: PluginId, + client_id: ClientId, + running_plugin: Arc>, + subscriptions: Arc>, + running_workers: HashMap>>, + ) { + self.plugin_assets.insert( + (plugin_id, client_id), + (running_plugin, subscriptions, running_workers), + ); + } +} + pub type Subscriptions = HashSet; #[derive(Clone)] pub struct PluginEnv { - pub plugin_id: u32, + pub plugin_id: PluginId, pub plugin: PluginConfig, pub senders: ThreadSenders, pub wasi_env: WasiEnv, @@ -69,7 +234,6 @@ impl RunningPlugin { } } pub fn next_event_id(&mut self, atomic_event: AtomicEvent) -> usize { - // TODO: probably not usize... let current_event_id = *self.next_event_ids.get(&atomic_event).unwrap_or(&0); if current_event_id < usize::MAX { let next_event_id = current_event_id + 1; @@ -92,3 +256,53 @@ impl RunningPlugin { } } } + +pub struct RunningWorker { + pub instance: Instance, + pub name: String, + pub plugin_config: PluginConfig, + pub plugin_env: PluginEnv, +} + +impl RunningWorker { + pub fn new( + instance: Instance, + name: &str, + plugin_config: PluginConfig, + plugin_env: PluginEnv, + ) -> Self { + RunningWorker { + instance, + name: name.into(), + plugin_config, + plugin_env, + } + } + pub fn send_message(&self, message: String, payload: String) -> Result<()> { + let err_context = || format!("Failed to send message to worker"); + + let work_function = self + .instance + .exports + .get_function(&self.name) + .with_context(err_context)?; + wasi_write_object(&self.plugin_env.wasi_env, &(message, payload)) + .with_context(err_context)?; + work_function.call(&[]).or_else::(|e| { + match e.downcast::() { + Ok(_) => panic!( + "{}", + anyError::new(VersionMismatchError::new( + VERSION, + "Unavailable", + &self.plugin_config.path, + self.plugin_config.is_builtin(), + )) + ), + Err(e) => Err(e).with_context(err_context), + } + })?; + + Ok(()) + } +} diff --git a/zellij-server/src/plugins/unit/plugin_tests.rs b/zellij-server/src/plugins/unit/plugin_tests.rs new file mode 100644 index 00000000..d1e79a5f --- /dev/null +++ b/zellij-server/src/plugins/unit/plugin_tests.rs @@ -0,0 +1,320 @@ +use super::plugin_thread_main; +use crate::screen::ScreenInstruction; +use crate::{channels::SenderWithContext, thread_bus::Bus, ServerInstruction}; +use insta::assert_snapshot; +use std::path::PathBuf; +use tempfile::tempdir; +use wasmer::Store; +use zellij_utils::data::Event; +use zellij_utils::errors::ErrorContext; +use zellij_utils::input::layout::{Layout, RunPlugin, RunPluginLocation}; +use zellij_utils::input::plugins::PluginsConfig; +use zellij_utils::lazy_static::lazy_static; +use zellij_utils::pane_size::Size; + +use crate::background_jobs::BackgroundJob; +use crate::pty_writer::PtyWriteInstruction; +use std::env::set_var; +use std::sync::{Arc, Mutex}; + +use crate::{plugins::PluginInstruction, pty::PtyInstruction}; + +use zellij_utils::channels::{self, ChannelWithContext, Receiver}; + +macro_rules! log_actions_in_thread { + ( $arc_mutex_log:expr, $exit_event:path, $receiver:expr, $exit_after_count:expr ) => { + std::thread::Builder::new() + .name("logger thread".to_string()) + .spawn({ + let log = $arc_mutex_log.clone(); + let mut exit_event_count = 0; + move || loop { + let (event, _err_ctx) = $receiver + .recv() + .expect("failed to receive event on channel"); + match event { + $exit_event(..) => { + exit_event_count += 1; + log.lock().unwrap().push(event); + if exit_event_count == $exit_after_count { + break; + } + }, + _ => { + log.lock().unwrap().push(event); + }, + } + } + }) + .unwrap() + }; +} + +fn create_plugin_thread() -> ( + SenderWithContext, + Receiver<(ScreenInstruction, ErrorContext)>, + Box, +) { + let (to_server, _server_receiver): ChannelWithContext = + channels::bounded(50); + let to_server = SenderWithContext::new(to_server); + + let (to_screen, screen_receiver): ChannelWithContext = channels::unbounded(); + let to_screen = SenderWithContext::new(to_screen); + + let (to_plugin, plugin_receiver): ChannelWithContext = channels::unbounded(); + let to_plugin = SenderWithContext::new(to_plugin); + let (to_pty, _pty_receiver): ChannelWithContext = channels::unbounded(); + let to_pty = SenderWithContext::new(to_pty); + + let (to_pty_writer, _pty_writer_receiver): ChannelWithContext = + channels::unbounded(); + let to_pty_writer = SenderWithContext::new(to_pty_writer); + + let (to_background_jobs, _background_jobs_receiver): ChannelWithContext = + channels::unbounded(); + let to_background_jobs = SenderWithContext::new(to_background_jobs); + + let plugin_bus = Bus::new( + vec![plugin_receiver], + Some(&to_screen), + Some(&to_pty), + Some(&to_plugin), + Some(&to_server), + Some(&to_pty_writer), + Some(&to_background_jobs), + None, + ) + .should_silently_fail(); + let store = Store::new(&wasmer::Universal::new(wasmer::Singlepass::default()).engine()); + let data_dir = PathBuf::from(tempdir().unwrap().path()); + let _plugin_thread = std::thread::Builder::new() + .name("plugin_thread".to_string()) + .spawn(move || { + set_var("ZELLIJ_SESSION_NAME", "zellij-test"); + plugin_thread_main( + plugin_bus, + store, + data_dir, + PluginsConfig::default(), + Box::new(Layout::default()), + ) + .expect("TEST") + }) + .unwrap(); + let teardown = { + let to_plugin = to_plugin.clone(); + move || { + let _ = to_pty.send(PtyInstruction::Exit); + let _ = to_pty_writer.send(PtyWriteInstruction::Exit); + let _ = to_screen.send(ScreenInstruction::Exit); + let _ = to_server.send(ServerInstruction::KillSession); + let _ = to_plugin.send(PluginInstruction::Exit); + } + }; + (to_plugin, screen_receiver, Box::new(teardown)) +} + +lazy_static! { + static ref PLUGIN_FIXTURE: String = format!( + // to populate this file, make sure to run the build-e2e CI job + // (or compile the fixture plugin and copy the resulting .wasm blob to the below location) + "{}/../target/e2e-data/plugins/fixture-plugin-for-tests.wasm", + std::env::var_os("CARGO_MANIFEST_DIR") + .unwrap() + .to_string_lossy() + ); +} + +#[test] +#[ignore] +pub fn load_new_plugin_from_hd() { + // here we load our fixture plugin into the plugin thread, and then send it an update message + // expecting tha thte plugin will log the received event and render it later after the update + // message (this is what the fixture plugin does) + // we then listen on our mock screen receiver to make sure we got a PluginBytes instruction + // that contains said render, and assert against it + let (plugin_thread_sender, screen_receiver, mut teardown) = create_plugin_thread(); + let plugin_should_float = Some(false); + let plugin_title = Some("test_plugin".to_owned()); + let run_plugin = RunPlugin { + _allow_exec_host_cmd: false, + location: RunPluginLocation::File(PathBuf::from(&*PLUGIN_FIXTURE)), + }; + let tab_index = 1; + let client_id = 1; + let size = Size { + cols: 121, + rows: 20, + }; + let received_screen_instructions = Arc::new(Mutex::new(vec![])); + let screen_thread = log_actions_in_thread!( + received_screen_instructions, + ScreenInstruction::PluginBytes, + screen_receiver, + 2 + ); + + let _ = plugin_thread_sender.send(PluginInstruction::AddClient(client_id)); + let _ = plugin_thread_sender.send(PluginInstruction::Load( + plugin_should_float, + plugin_title, + run_plugin, + tab_index, + client_id, + size, + )); + let _ = plugin_thread_sender.send(PluginInstruction::Update(vec![( + None, + Some(client_id), + Event::InputReceived, + )])); // will be cached and sent to the plugin once it's loaded + screen_thread.join().unwrap(); // this might take a while if the cache is cold + teardown(); + let plugin_bytes_event = received_screen_instructions + .lock() + .unwrap() + .iter() + .find_map(|i| { + if let ScreenInstruction::PluginBytes(plugin_bytes) = i { + for (plugin_id, client_id, plugin_bytes) in plugin_bytes { + let plugin_bytes = String::from_utf8_lossy(plugin_bytes).to_string(); + if plugin_bytes.contains("InputReceived") { + return Some((*plugin_id, *client_id, plugin_bytes)); + } + } + } + None + }); + assert_snapshot!(format!("{:#?}", plugin_bytes_event)); +} + +#[test] +#[ignore] +pub fn plugin_workers() { + let (plugin_thread_sender, screen_receiver, mut teardown) = create_plugin_thread(); + let plugin_should_float = Some(false); + let plugin_title = Some("test_plugin".to_owned()); + let run_plugin = RunPlugin { + _allow_exec_host_cmd: false, + location: RunPluginLocation::File(PathBuf::from(&*PLUGIN_FIXTURE)), + }; + let tab_index = 1; + let client_id = 1; + let size = Size { + cols: 121, + rows: 20, + }; + let received_screen_instructions = Arc::new(Mutex::new(vec![])); + let screen_thread = log_actions_in_thread!( + received_screen_instructions, + ScreenInstruction::PluginBytes, + screen_receiver, + 3 + ); + + let _ = plugin_thread_sender.send(PluginInstruction::AddClient(client_id)); + let _ = plugin_thread_sender.send(PluginInstruction::Load( + plugin_should_float, + plugin_title, + run_plugin, + tab_index, + client_id, + size, + )); + // we send a SystemClipboardFailure to trigger the custom handler in the fixture plugin that + // will send a message to the worker and in turn back to the plugin to be rendered, so we know + // that this cycle is working + let _ = plugin_thread_sender.send(PluginInstruction::Update(vec![( + None, + Some(client_id), + Event::SystemClipboardFailure, + )])); // will be cached and sent to the plugin once it's loaded + screen_thread.join().unwrap(); // this might take a while if the cache is cold + teardown(); + let plugin_bytes_event = received_screen_instructions + .lock() + .unwrap() + .iter() + .find_map(|i| { + if let ScreenInstruction::PluginBytes(plugin_bytes) = i { + for (plugin_id, client_id, plugin_bytes) in plugin_bytes { + let plugin_bytes = String::from_utf8_lossy(plugin_bytes).to_string(); + if plugin_bytes.contains("Payload from worker") { + return Some((*plugin_id, *client_id, plugin_bytes)); + } + } + } + None + }); + assert_snapshot!(format!("{:#?}", plugin_bytes_event)); +} + +#[test] +#[ignore] +pub fn plugin_workers_persist_state() { + let (plugin_thread_sender, screen_receiver, mut teardown) = create_plugin_thread(); + let plugin_should_float = Some(false); + let plugin_title = Some("test_plugin".to_owned()); + let run_plugin = RunPlugin { + _allow_exec_host_cmd: false, + location: RunPluginLocation::File(PathBuf::from(&*PLUGIN_FIXTURE)), + }; + let tab_index = 1; + let client_id = 1; + let size = Size { + cols: 121, + rows: 20, + }; + let received_screen_instructions = Arc::new(Mutex::new(vec![])); + let screen_thread = log_actions_in_thread!( + received_screen_instructions, + ScreenInstruction::PluginBytes, + screen_receiver, + 5 + ); + + let _ = plugin_thread_sender.send(PluginInstruction::AddClient(client_id)); + let _ = plugin_thread_sender.send(PluginInstruction::Load( + plugin_should_float, + plugin_title, + run_plugin, + tab_index, + client_id, + size, + )); + // we send a SystemClipboardFailure to trigger the custom handler in the fixture plugin that + // will send a message to the worker and in turn back to the plugin to be rendered, so we know + // that this cycle is working + // we do this a second time so that the worker will log the first message on its own state and + // then send us the "received 2 messages" indication we check for below, letting us know it + // managed to persist its own state and act upon it + let _ = plugin_thread_sender.send(PluginInstruction::Update(vec![( + None, + Some(client_id), + Event::SystemClipboardFailure, + )])); + let _ = plugin_thread_sender.send(PluginInstruction::Update(vec![( + None, + Some(client_id), + Event::SystemClipboardFailure, + )])); + screen_thread.join().unwrap(); // this might take a while if the cache is cold + teardown(); + let plugin_bytes_event = received_screen_instructions + .lock() + .unwrap() + .iter() + .find_map(|i| { + if let ScreenInstruction::PluginBytes(plugin_bytes) = i { + for (plugin_id, client_id, plugin_bytes) in plugin_bytes { + let plugin_bytes = String::from_utf8_lossy(plugin_bytes).to_string(); + if plugin_bytes.contains("received 2 messages") { + return Some((*plugin_id, *client_id, plugin_bytes)); + } + } + } + None + }); + assert_snapshot!(format!("{:#?}", plugin_bytes_event)); +} diff --git a/zellij-server/src/plugins/unit/snapshots/zellij_server__plugins__plugin_tests__load_new_plugin_from_hd.snap b/zellij-server/src/plugins/unit/snapshots/zellij_server__plugins__plugin_tests__load_new_plugin_from_hd.snap new file mode 100644 index 00000000..18c7e537 --- /dev/null +++ b/zellij-server/src/plugins/unit/snapshots/zellij_server__plugins__plugin_tests__load_new_plugin_from_hd.snap @@ -0,0 +1,12 @@ +--- +source: zellij-server/src/plugins/./unit/plugin_tests.rs +assertion_line: 744 +expression: "format!(\"{:#?}\", plugin_bytes_event)" +--- +Some( + ( + 0, + 1, + "Rows: 20, Cols: 121, Received events: [InputReceived]\n\r", + ), +) diff --git a/zellij-server/src/plugins/unit/snapshots/zellij_server__plugins__plugin_tests__plugin_workers.snap b/zellij-server/src/plugins/unit/snapshots/zellij_server__plugins__plugin_tests__plugin_workers.snap new file mode 100644 index 00000000..91417b76 --- /dev/null +++ b/zellij-server/src/plugins/unit/snapshots/zellij_server__plugins__plugin_tests__plugin_workers.snap @@ -0,0 +1,12 @@ +--- +source: zellij-server/src/plugins/./unit/plugin_tests.rs +assertion_line: 250 +expression: "format!(\"{:#?}\", plugin_bytes_event)" +--- +Some( + ( + 0, + 1, + "Payload from worker: \"gimme_back_my_payload, received 1 messages\"\n\r", + ), +) diff --git a/zellij-server/src/plugins/unit/snapshots/zellij_server__plugins__plugin_tests__plugin_workers_persist_state.snap b/zellij-server/src/plugins/unit/snapshots/zellij_server__plugins__plugin_tests__plugin_workers_persist_state.snap new file mode 100644 index 00000000..51693505 --- /dev/null +++ b/zellij-server/src/plugins/unit/snapshots/zellij_server__plugins__plugin_tests__plugin_workers_persist_state.snap @@ -0,0 +1,12 @@ +--- +source: zellij-server/src/plugins/./unit/plugin_tests.rs +assertion_line: 319 +expression: "format!(\"{:#?}\", plugin_bytes_event)" +--- +Some( + ( + 0, + 1, + "Payload from worker: \"gimme_back_my_payload, received 2 messages\"\n\r", + ), +) diff --git a/zellij-server/src/plugins/wasm_bridge.rs b/zellij-server/src/plugins/wasm_bridge.rs index 45a516d3..0015552c 100644 --- a/zellij-server/src/plugins/wasm_bridge.rs +++ b/zellij-server/src/plugins/wasm_bridge.rs @@ -1,14 +1,15 @@ -use super::PluginInstruction; +use super::{PluginId, PluginInstruction}; use crate::plugins::plugin_loader::{PluginLoader, VersionMismatchError}; -use crate::plugins::plugin_map::{AtomicEvent, PluginEnv, PluginMap}; +use crate::plugins::plugin_map::{ + AtomicEvent, PluginEnv, PluginMap, RunningPlugin, RunningWorker, Subscriptions, +}; use crate::plugins::zellij_exports::{wasi_read_string, wasi_write_object}; use log::info; use std::{ collections::{HashMap, HashSet}, - fmt::Display, path::PathBuf, str::FromStr, - sync::{Arc, Mutex}, + sync::{Arc, Mutex, TryLockError}, }; use wasmer::{Instance, Module, Store, Value}; use zellij_utils::async_std::task::{self, JoinHandle}; @@ -22,7 +23,6 @@ use zellij_utils::{ consts::VERSION, data::{Event, EventType}, errors::prelude::*, - errors::ZellijError, input::{ layout::{RunPlugin, RunPluginLocation}, plugins::PluginsConfig, @@ -30,7 +30,7 @@ use zellij_utils::{ pane_size::Size, }; -pub type PluginId = u32; +const RETRY_INTERVAL_MS: u64 = 100; pub struct WasmBridge { connected_clients: Arc>>, @@ -40,10 +40,14 @@ pub struct WasmBridge { plugin_dir: PathBuf, plugin_cache: Arc>>, plugin_map: Arc>, - next_plugin_id: u32, - cached_events_for_pending_plugins: HashMap>, // u32 is the plugin id - cached_resizes_for_pending_plugins: HashMap, // (rows, columns) - loading_plugins: HashMap<(u32, RunPlugin), JoinHandle<()>>, // plugin_id to join-handle + next_plugin_id: PluginId, + cached_events_for_pending_plugins: HashMap>, + cached_resizes_for_pending_plugins: HashMap, // (rows, columns) + cached_worker_messages: HashMap>, // Vec + loading_plugins: HashMap<(PluginId, RunPlugin), JoinHandle<()>>, // plugin_id to join-handle pending_plugin_reloads: HashSet, } @@ -54,7 +58,7 @@ impl WasmBridge { store: Store, plugin_dir: PathBuf, ) -> Self { - let plugin_map = Arc::new(Mutex::new(HashMap::new())); + let plugin_map = Arc::new(Mutex::new(PluginMap::default())); let connected_clients: Arc>> = Arc::new(Mutex::new(vec![])); let plugin_cache: Arc>> = Arc::new(Mutex::new(HashMap::new())); @@ -69,6 +73,7 @@ impl WasmBridge { next_plugin_id: 0, cached_events_for_pending_plugins: HashMap::new(), cached_resizes_for_pending_plugins: HashMap::new(), + cached_worker_messages: HashMap::new(), loading_plugins: HashMap::new(), pending_plugin_reloads: HashSet::new(), } @@ -79,7 +84,7 @@ impl WasmBridge { tab_index: usize, size: Size, client_id: Option, - ) -> Result { + ) -> Result { // returns the plugin id let err_context = move || format!("failed to load plugin"); @@ -152,14 +157,14 @@ impl WasmBridge { self.next_plugin_id += 1; Ok(plugin_id) } - pub fn unload_plugin(&mut self, pid: u32) -> Result<()> { + pub fn unload_plugin(&mut self, pid: PluginId) -> Result<()> { info!("Bye from plugin {}", &pid); - // TODO: remove plugin's own data directory let mut plugin_map = self.plugin_map.lock().unwrap(); - let ids_in_plugin_map: Vec<(u32, ClientId)> = plugin_map.keys().copied().collect(); - for (plugin_id, client_id) in ids_in_plugin_map { - if pid == plugin_id { - drop(plugin_map.remove(&(plugin_id, client_id))); + for (running_plugin, _, _) in plugin_map.remove_plugins(pid) { + let running_plugin = running_plugin.lock().unwrap(); + let cache_dir = running_plugin.plugin_env.plugin_own_data_dir.clone(); + if let Err(e) = std::fs::remove_dir_all(cache_dir) { + log::error!("Failed to remove cache dir for plugin: {:?}", e); } } Ok(()) @@ -268,18 +273,29 @@ impl WasmBridge { Err(e) => Err(e), } } - pub fn resize_plugin(&mut self, pid: u32, new_columns: usize, new_rows: usize) -> Result<()> { + pub fn resize_plugin( + &mut self, + pid: PluginId, + new_columns: usize, + new_rows: usize, + ) -> Result<()> { let err_context = move || format!("failed to resize plugin {pid}"); - for ((plugin_id, client_id), (running_plugin, _subscriptions)) in - self.plugin_map.lock().unwrap().iter_mut() - { - if self - .cached_resizes_for_pending_plugins - .contains_key(&plugin_id) - { - continue; - } - if *plugin_id == pid { + + let plugins_to_resize: Vec<(PluginId, ClientId, Arc>)> = self + .plugin_map + .lock() + .unwrap() + .running_plugins() + .iter() + .cloned() + .filter(|(plugin_id, _client_id, _running_plugin)| { + !self + .cached_resizes_for_pending_plugins + .contains_key(&plugin_id) + }) + .collect(); + for (plugin_id, client_id, running_plugin) in plugins_to_resize { + if plugin_id == pid { let event_id = running_plugin .lock() .unwrap() @@ -287,8 +303,8 @@ impl WasmBridge { task::spawn({ let senders = self.senders.clone(); let running_plugin = running_plugin.clone(); - let plugin_id = *plugin_id; - let client_id = *client_id; + let plugin_id = plugin_id; + let client_id = client_id; async move { let mut running_plugin = running_plugin.lock().unwrap(); if running_plugin.apply_event_id(AtomicEvent::Resize, event_id) { @@ -339,34 +355,46 @@ impl WasmBridge { } pub fn update_plugins( &mut self, - mut updates: Vec<(Option, Option, Event)>, + mut updates: Vec<(Option, Option, Event)>, ) -> Result<()> { let err_context = || "failed to update plugin state".to_string(); - for (pid, cid, event) in updates.drain(..) { - for (&(plugin_id, client_id), (running_plugin, subscriptions)) in - &*self.plugin_map.lock().unwrap() - { - if self + let plugins_to_update: Vec<( + PluginId, + ClientId, + Arc>, + Arc>, + )> = self + .plugin_map + .lock() + .unwrap() + .running_plugins_and_subscriptions() + .iter() + .cloned() + .filter(|(plugin_id, _client_id, _running_plugin, _subscriptions)| { + !&self .cached_events_for_pending_plugins .contains_key(&plugin_id) - { - continue; - } + }) + .collect(); + for (pid, cid, event) in updates.drain(..) { + for (plugin_id, client_id, running_plugin, subscriptions) in &plugins_to_update { let subs = subscriptions.lock().unwrap().clone(); // FIXME: This is very janky... Maybe I should write my own macro for Event -> EventType? let event_type = EventType::from_str(&event.to_string()).with_context(err_context)?; if subs.contains(&event_type) && ((pid.is_none() && cid.is_none()) - || (pid.is_none() && cid == Some(client_id)) - || (cid.is_none() && pid == Some(plugin_id)) - || (cid == Some(client_id) && pid == Some(plugin_id))) + || (pid.is_none() && cid == Some(*client_id)) + || (cid.is_none() && pid == Some(*plugin_id)) + || (cid == Some(*client_id) && pid == Some(*plugin_id))) { task::spawn({ let senders = self.senders.clone(); let running_plugin = running_plugin.clone(); let event = event.clone(); + let plugin_id = *plugin_id; + let client_id = *client_id; async move { let running_plugin = running_plugin.lock().unwrap(); let mut plugin_bytes = vec![]; @@ -401,7 +429,7 @@ impl WasmBridge { } Ok(()) } - pub fn apply_cached_events(&mut self, plugin_ids: Vec) -> Result<()> { + pub fn apply_cached_events(&mut self, plugin_ids: Vec) -> Result<()> { let mut applied_plugin_paths = HashSet::new(); for plugin_id in plugin_ids { self.apply_cached_events_and_resizes_for_plugin(plugin_id)?; @@ -428,6 +456,10 @@ impl WasmBridge { for (_plugin_id, loading_plugin_task) in self.loading_plugins.drain() { drop(loading_plugin_task.cancel()); } + let plugin_ids = self.plugin_map.lock().unwrap().plugin_ids(); + for plugin_id in &plugin_ids { + drop(self.unload_plugin(*plugin_id)); + } } fn run_plugin_of_plugin_id(&self, plugin_id: PluginId) -> Option<&RunPlugin> { self.loading_plugins @@ -450,7 +482,7 @@ impl WasmBridge { .plugin_map .lock() .unwrap() - .get_mut(&(plugin_id, *client_id)) + .get_running_plugin_and_subscriptions(plugin_id, *client_id) { let subs = subscriptions.lock().unwrap().clone(); for event in events.clone() { @@ -494,6 +526,23 @@ impl WasmBridge { if let Some((rows, columns)) = self.cached_resizes_for_pending_plugins.remove(&plugin_id) { self.resize_plugin(plugin_id, columns, rows)?; } + self.apply_cached_worker_messages(plugin_id)?; + Ok(()) + } + pub fn apply_cached_worker_messages(&mut self, plugin_id: PluginId) -> Result<()> { + if let Some(mut messages) = self.cached_worker_messages.remove(&plugin_id) { + let mut worker_messages: HashMap<(ClientId, String), Vec<(String, String)>> = + HashMap::new(); + for (client_id, worker_name, message, payload) in messages.drain(..) { + worker_messages + .entry((client_id, worker_name)) + .or_default() + .push((message, payload)); + } + for ((client_id, worker_name), messages) in worker_messages.drain() { + self.post_messages_to_plugin_worker(plugin_id, client_id, worker_name, messages)?; + } + } Ok(()) } fn plugin_is_currently_being_loaded(&self, plugin_location: &RunPluginLocation) -> bool { @@ -506,34 +555,20 @@ impl WasmBridge { &self, plugin_location: &RunPluginLocation, ) -> Result> { - let err_context = || format!("Failed to get plugin ids for location {plugin_location}"); - let plugin_ids: Vec = self - .plugin_map + self.plugin_map .lock() .unwrap() - .iter() - .filter(|(_, (running_plugin, _subscriptions))| { - &running_plugin.lock().unwrap().plugin_env.plugin.location == plugin_location - // TODO: - // better - }) - .map(|((plugin_id, _client_id), _)| *plugin_id) - .collect(); - if plugin_ids.is_empty() { - return Err(ZellijError::PluginDoesNotExist).with_context(err_context); - } - Ok(plugin_ids) + .all_plugin_ids_for_plugin_location(plugin_location) } fn size_of_plugin_id(&self, plugin_id: PluginId) -> Option<(usize, usize)> { // (rows/colums) self.plugin_map .lock() .unwrap() - .iter() - .find(|((p_id, _client_id), _)| *p_id == plugin_id) - .map(|(_, (running_plugin, _subscriptions))| { - let running_plugin = running_plugin.lock().unwrap(); - (running_plugin.rows, running_plugin.columns) + .get_running_plugin(plugin_id, None) + .map(|r| { + let r = r.lock().unwrap(); + (r.rows, r.columns) }) } fn start_plugin_loading_indication( @@ -553,6 +588,51 @@ impl WasmBridge { .send_to_background_jobs(BackgroundJob::AnimatePluginLoading(*plugin_id)); } } + pub fn post_messages_to_plugin_worker( + &mut self, + plugin_id: PluginId, + client_id: ClientId, + worker_name: String, + mut messages: Vec<(String, String)>, + ) -> Result<()> { + let worker = + self.plugin_map + .lock() + .unwrap() + .clone_worker(plugin_id, client_id, &worker_name); + let mut cache_messages = || { + for (message, payload) in messages.drain(..) { + self.cached_worker_messages + .entry(plugin_id) + .or_default() + .push((client_id, worker_name.clone(), message, payload)); + } + }; + match worker { + Some(worker) => { + let worker_is_busy = { worker.try_lock().is_err() }; + if worker_is_busy { + // most messages will be caught here, we do this once before the async task to + // bulk most messages together and prevent them from cascading + cache_messages(); + } else { + async_send_messages_to_worker( + self.senders.clone(), + messages, + worker, + plugin_id, + client_id, + worker_name, + ); + } + }, + None => { + log::warn!("Worker {worker_name} not found, placing message in cache"); + cache_messages(); + }, + } + Ok(()) + } } fn handle_plugin_successful_loading(senders: &ThreadSenders, plugin_id: PluginId) { @@ -564,11 +644,11 @@ fn handle_plugin_loading_failure( senders: &ThreadSenders, plugin_id: PluginId, loading_indication: &mut LoadingIndication, - error: impl Display, + error: impl std::fmt::Debug, ) { - log::error!("{}", error); + log::error!("{:?}", error); let _ = senders.send_to_background_jobs(BackgroundJob::StopPluginLoadingAnimation(plugin_id)); - loading_indication.indicate_loading_error(error.to_string()); + loading_indication.indicate_loading_error(format!("{:?}", error)); let _ = senders.send_to_screen(ScreenInstruction::UpdatePluginLoadingStage( plugin_id, loading_indication.clone(), @@ -576,14 +656,14 @@ fn handle_plugin_loading_failure( } pub fn apply_event_to_plugin( - plugin_id: u32, + plugin_id: PluginId, client_id: ClientId, instance: &Instance, plugin_env: &PluginEnv, event: &Event, rows: usize, columns: usize, - plugin_bytes: &mut Vec<(u32, ClientId, Vec)>, + plugin_bytes: &mut Vec<(PluginId, ClientId, Vec)>, ) -> Result<()> { let err_context = || format!("Failed to apply event to plugin {plugin_id}"); let update = instance @@ -627,3 +707,53 @@ pub fn apply_event_to_plugin( } Ok(()) } + +fn async_send_messages_to_worker( + senders: ThreadSenders, + mut messages: Vec<(String, String)>, + worker: Arc>, + plugin_id: PluginId, + client_id: ClientId, + worker_name: String, +) { + task::spawn({ + async move { + match worker.try_lock() { + Ok(worker) => { + for (message, payload) in messages.drain(..) { + worker.send_message(message, payload).ok(); + } + let _ = senders + .send_to_plugin(PluginInstruction::ApplyCachedWorkerMessages(plugin_id)); + }, + Err(TryLockError::WouldBlock) => { + task::spawn({ + async move { + log::warn!( + "Worker {} busy, retrying sending message after: {}ms", + worker_name, + RETRY_INTERVAL_MS + ); + task::sleep(std::time::Duration::from_millis(RETRY_INTERVAL_MS)).await; + let _ = senders.send_to_plugin( + PluginInstruction::PostMessagesToPluginWorker( + plugin_id, + client_id, + worker_name, + messages, + ), + ); + } + }); + }, + Err(e) => { + log::error!( + "Failed to send message to worker \"{}\": {:?}", + worker_name, + e + ); + }, + } + } + }); +} diff --git a/zellij-server/src/plugins/zellij_exports.rs b/zellij-server/src/plugins/zellij_exports.rs index cc0321ab..3515cb34 100644 --- a/zellij-server/src/plugins/zellij_exports.rs +++ b/zellij-server/src/plugins/zellij_exports.rs @@ -50,10 +50,13 @@ pub fn zellij_exports( host_get_plugin_ids, host_get_zellij_version, host_open_file, + host_open_file_with_line, host_switch_tab_to, host_set_timeout, host_exec_cmd, host_report_panic, + host_post_message_to, + host_post_message_to_plugin, } } @@ -171,6 +174,27 @@ fn host_open_file(env: &ForeignFunctionEnv) { .non_fatal(); } +fn host_open_file_with_line(env: &ForeignFunctionEnv) { + wasi_read_object::<(PathBuf, usize)>(&env.plugin_env.wasi_env) + .and_then(|(path, line)| { + env.plugin_env + .senders + .send_to_pty(PtyInstruction::SpawnTerminal( + Some(TerminalAction::OpenFile(path, Some(line), None)), // TODO: add cwd + None, + None, + ClientOrTabIndex::TabIndex(env.plugin_env.tab_index), + )) + }) + .with_context(|| { + format!( + "failed to open file on host from plugin {}", + env.plugin_env.name() + ) + }) + .non_fatal(); +} + fn host_switch_tab_to(env: &ForeignFunctionEnv, tab_idx: u32) { env.plugin_env .senders @@ -201,6 +225,7 @@ fn host_set_timeout(env: &ForeignFunctionEnv, secs: f64) { let update_target = Some(env.plugin_env.plugin_id); let client_id = env.plugin_env.client_id; let plugin_name = env.plugin_env.name(); + // TODO: we should really use an async task for this thread::spawn(move || { let start_time = Instant::now(); thread::sleep(Duration::from_secs_f64(secs)); @@ -257,6 +282,38 @@ fn host_exec_cmd(env: &ForeignFunctionEnv) { .non_fatal(); } +fn host_post_message_to(env: &ForeignFunctionEnv) { + wasi_read_object::<(String, String, String)>(&env.plugin_env.wasi_env) + .and_then(|(worker_name, message, payload)| { + env.plugin_env + .senders + .send_to_plugin(PluginInstruction::PostMessagesToPluginWorker( + env.plugin_env.plugin_id, + env.plugin_env.client_id, + worker_name, + vec![(message, payload)], + )) + }) + .with_context(|| format!("failed to post message to worker {}", env.plugin_env.name())) + .fatal(); +} + +fn host_post_message_to_plugin(env: &ForeignFunctionEnv) { + wasi_read_object::<(String, String)>(&env.plugin_env.wasi_env) + .and_then(|(message, payload)| { + env.plugin_env + .senders + .send_to_plugin(PluginInstruction::PostMessageToPlugin( + env.plugin_env.plugin_id, + env.plugin_env.client_id, + message, + payload, + )) + }) + .with_context(|| format!("failed to post message to plugin {}", env.plugin_env.name())) + .fatal(); +} + // Custom panic handler for plugins. // // This is called when a panic occurs in a plugin. Since most panics will likely originate in the diff --git a/zellij-tile/src/lib.rs b/zellij-tile/src/lib.rs index 800b6d68..685de4af 100644 --- a/zellij-tile/src/lib.rs +++ b/zellij-tile/src/lib.rs @@ -1,6 +1,7 @@ pub mod prelude; pub mod shim; +use serde::{Deserialize, Serialize}; use zellij_utils::data::Event; #[allow(unused_variables)] @@ -12,6 +13,12 @@ pub trait ZellijPlugin { fn render(&mut self, rows: usize, cols: usize) {} } +#[allow(unused_variables)] +// TODO: can we get rid of the lifetime? maybe with generics? +pub trait ZellijWorker<'de>: Default + Serialize + Deserialize<'de> { + fn on_message(&mut self, message: String, payload: String) {} +} + pub const PLUGIN_MISMATCH: &str = "An error occured in a plugin while receiving an Event from zellij. This means that the plugins aren't compatible with the current zellij version. @@ -36,7 +43,10 @@ macro_rules! register_plugin { std::panic::set_hook(Box::new(|info| { report_panic(info); })); + } + #[no_mangle] + fn load() { STATE.with(|state| { state.borrow_mut().load(); }); @@ -65,3 +75,61 @@ macro_rules! register_plugin { } }; } + +#[macro_export] +macro_rules! register_worker { + ($worker:ty, $worker_name:ident) => { + #[no_mangle] + pub fn $worker_name() { + use serde_json::*; + let worker_display_name = std::stringify!($worker_name); + + // read message from STDIN + let (message, payload): (String, String) = $crate::shim::object_from_stdin() + .unwrap_or_else(|e| { + eprintln!( + "Failed to deserialize message to worker \"{}\": {:?}", + worker_display_name, e + ); + Default::default() + }); + + // read previous worker state from HD if it exists + let mut worker_instance = match std::fs::read(&format!("/data/{}", worker_display_name)) + .map_err(|e| format!("Failed to read file: {:?}", e)) + .and_then(|s| { + serde_json::from_str::<$worker>(&String::from_utf8_lossy(&s)) + .map_err(|e| format!("Failed to deserialize: {:?}", e)) + }) { + Ok(s) => s, + Err(e) => { + eprintln!( + "Failed to read existing state ({:?}), creating new state for worker", + e + ); + <$worker>::default() + }, + }; + + // invoke worker + worker_instance.on_message(message, payload); + + // persist worker state to HD for next run + match serde_json::to_string(&worker_instance) + .map_err(|e| format!("Failed to serialize worker state")) + .and_then(|serialized_state| { + std::fs::write( + &format!("/data/{}", worker_display_name), + serialized_state.as_bytes(), + ) + .map_err(|e| format!("Failed to persist state to HD: {:?}", e)) + }) { + Ok(()) => {}, + Err(e) => eprintln!( + "Failed to serialize and persist worker state to hd: {:?}", + e + ), + } + } + }; +} diff --git a/zellij-tile/src/shim.rs b/zellij-tile/src/shim.rs index 0aa8d3e0..f1c1fb54 100644 --- a/zellij-tile/src/shim.rs +++ b/zellij-tile/src/shim.rs @@ -39,6 +39,11 @@ pub fn open_file(path: &Path) { unsafe { host_open_file() }; } +pub fn open_file_with_line(path: &Path, line: usize) { + object_to_stdout(&(path, line)); + unsafe { host_open_file_with_line() }; +} + pub fn switch_tab_to(tab_idx: u32) { unsafe { host_switch_tab_to(tab_idx) }; } @@ -74,6 +79,24 @@ pub fn object_to_stdout(object: &impl Serialize) { println!("{}", serde_json::to_string(object).unwrap()); } +#[doc(hidden)] +pub fn post_message_to(worker_name: &str, message: String, payload: String) { + match serde_json::to_string(&(worker_name, message, payload)) { + Ok(serialized) => println!("{}", serialized), + Err(e) => eprintln!("Failed to serialize message: {:?}", e), + } + unsafe { host_post_message_to() }; +} + +#[doc(hidden)] +pub fn post_message_to_plugin(message: String, payload: String) { + match serde_json::to_string(&(message, payload)) { + Ok(serialized) => println!("{}", serialized), + Err(e) => eprintln!("Failed to serialize message: {:?}", e), + } + unsafe { host_post_message_to_plugin() }; +} + #[link(wasm_import_module = "zellij")] extern "C" { fn host_subscribe(); @@ -82,8 +105,11 @@ extern "C" { fn host_get_plugin_ids(); fn host_get_zellij_version(); fn host_open_file(); + fn host_open_file_with_line(); fn host_switch_tab_to(tab_idx: u32); fn host_set_timeout(secs: f64); fn host_exec_cmd(); fn host_report_panic(); + fn host_post_message_to(); + fn host_post_message_to_plugin(); } diff --git a/zellij-utils/Cargo.toml b/zellij-utils/Cargo.toml index 95f8b82a..f12d8c78 100644 --- a/zellij-utils/Cargo.toml +++ b/zellij-utils/Cargo.toml @@ -39,6 +39,7 @@ regex = "1.5.5" tempfile = "3.2.0" kdl = { version = "4.5.0", features = ["span"] } shellexpand = "3.0.0" +uuid = { version = "0.8.2", features = ["serde", "v4"] } #[cfg(not(target_family = "wasm"))] [target.'cfg(not(target_family = "wasm"))'.dependencies] diff --git a/zellij-utils/assets/plugins/compact-bar.wasm b/zellij-utils/assets/plugins/compact-bar.wasm index d59cb92b..3145fe1d 100755 Binary files a/zellij-utils/assets/plugins/compact-bar.wasm and b/zellij-utils/assets/plugins/compact-bar.wasm differ diff --git a/zellij-utils/assets/plugins/fixture-plugin-for-tests.wasm b/zellij-utils/assets/plugins/fixture-plugin-for-tests.wasm new file mode 100755 index 00000000..a5421d72 Binary files /dev/null and b/zellij-utils/assets/plugins/fixture-plugin-for-tests.wasm differ diff --git a/zellij-utils/assets/plugins/status-bar.wasm b/zellij-utils/assets/plugins/status-bar.wasm index 63907aed..8f7ed782 100755 Binary files a/zellij-utils/assets/plugins/status-bar.wasm and b/zellij-utils/assets/plugins/status-bar.wasm differ diff --git a/zellij-utils/assets/plugins/strider.wasm b/zellij-utils/assets/plugins/strider.wasm index 39909aaa..929f5e8c 100755 Binary files a/zellij-utils/assets/plugins/strider.wasm and b/zellij-utils/assets/plugins/strider.wasm differ diff --git a/zellij-utils/assets/plugins/tab-bar.wasm b/zellij-utils/assets/plugins/tab-bar.wasm index 5f59fe4f..a8742f67 100755 Binary files a/zellij-utils/assets/plugins/tab-bar.wasm and b/zellij-utils/assets/plugins/tab-bar.wasm differ diff --git a/zellij-utils/src/consts.rs b/zellij-utils/src/consts.rs index b851b152..047cf986 100644 --- a/zellij-utils/src/consts.rs +++ b/zellij-utils/src/consts.rs @@ -5,6 +5,7 @@ use directories_next::ProjectDirs; use lazy_static::lazy_static; use once_cell::sync::OnceCell; use std::path::PathBuf; +use uuid::Uuid; pub const ZELLIJ_CONFIG_FILE_ENV: &str = "ZELLIJ_CONFIG_FILE"; pub const ZELLIJ_CONFIG_DIR_ENV: &str = "ZELLIJ_CONFIG_DIR"; @@ -29,6 +30,10 @@ lazy_static! { pub static ref ZELLIJ_PROJ_DIR: ProjectDirs = ProjectDirs::from("org", "Zellij Contributors", "Zellij").unwrap(); pub static ref ZELLIJ_CACHE_DIR: PathBuf = ZELLIJ_PROJ_DIR.cache_dir().to_path_buf(); + pub static ref ZELLIJ_SESSION_CACHE_DIR: PathBuf = ZELLIJ_PROJ_DIR + .cache_dir() + .to_path_buf() + .join(format!("{}", Uuid::new_v4())); pub static ref ZELLIJ_DEFAULT_THEMES: Themes = { let mut default_themes = Themes::default(); diff --git a/zellij-utils/src/data.rs b/zellij-utils/src/data.rs index 6da375f5..285532cc 100644 --- a/zellij-utils/src/data.rs +++ b/zellij-utils/src/data.rs @@ -469,6 +469,10 @@ pub enum Event { SystemClipboardFailure, InputReceived, Visible(bool), + CustomMessage( + String, // message + String, // payload + ), } /// Describes the different input modes, which change the way that keystrokes will be interpreted. diff --git a/zellij-utils/src/errors.rs b/zellij-utils/src/errors.rs index 8b49112b..74395e57 100644 --- a/zellij-utils/src/errors.rs +++ b/zellij-utils/src/errors.rs @@ -365,6 +365,9 @@ pub enum PluginContext { RemoveClient, NewTab, ApplyCachedEvents, + ApplyCachedWorkerMessages, + PostMessageToPluginWorker, + PostMessageToPlugin, } /// Stack call representations corresponding to the different types of [`ClientInstruction`]s.