diff --git a/crates/eww/src/app.rs b/crates/eww/src/app.rs index 2e9aad9..d415509 100644 --- a/crates/eww/src/app.rs +++ b/crates/eww/src/app.rs @@ -372,7 +372,8 @@ impl App { log::info!("Reloading windows"); self.script_var_handler.stop_all(); - self.script_var_handler = script_var_handler::init(self.app_evt_send.clone()); + let old_handler = std::mem::replace(&mut self.script_var_handler, script_var_handler::init(self.app_evt_send.clone())); + old_handler.join_thread(); log::trace!("loading config: {:#?}", config); diff --git a/crates/eww/src/script_var_handler.rs b/crates/eww/src/script_var_handler.rs index 3bc7541..fde341e 100644 --- a/crates/eww/src/script_var_handler.rs +++ b/crates/eww/src/script_var_handler.rs @@ -24,8 +24,7 @@ use yuck::config::script_var_definition::{ListenScriptVar, PollScriptVar, Script /// the script var execution. pub fn init(evt_send: UnboundedSender) -> ScriptVarHandlerHandle { let (msg_send, mut msg_recv) = tokio::sync::mpsc::unbounded_channel(); - let handle = ScriptVarHandlerHandle { msg_send }; - std::thread::spawn(move || { + let thread_handle = std::thread::spawn(move || { let rt = tokio::runtime::Runtime::new().expect("Failed to initialize tokio runtime for script var handlers"); rt.block_on(async { let _: Result<_> = try { @@ -39,10 +38,10 @@ pub fn init(evt_send: UnboundedSender) -> ScriptVarHandlerHandle handler.add(var).await; } ScriptVarHandlerMsg::Stop(name) => { - handler.stop_for_variable(&name)?; + handler.stop_for_variable(&name).await?; } ScriptVarHandlerMsg::StopAll => { - handler.stop_all(); + handler.stop_all().await; break; } }, @@ -51,12 +50,14 @@ pub fn init(evt_send: UnboundedSender) -> ScriptVarHandlerHandle }; }) }); + let handle = ScriptVarHandlerHandle { msg_send, thread_handle }; handle } /// Handle to the script-var handling system. pub struct ScriptVarHandlerHandle { msg_send: UnboundedSender, + thread_handle: std::thread::JoinHandle<()>, } impl ScriptVarHandlerHandle { @@ -85,6 +86,10 @@ impl ScriptVarHandlerHandle { self.msg_send.send(ScriptVarHandlerMsg::StopAll) ); } + + pub fn join_thread(self) { + let _ = self.thread_handle.join(); + } } /// Message enum used by the ScriptVarHandlerHandle to communicate to the ScriptVarHandler @@ -110,17 +115,17 @@ impl ScriptVarHandler { } /// Stop the handler that is responsible for a given variable. - fn stop_for_variable(&mut self, name: &VarName) -> Result<()> { + async fn stop_for_variable(&mut self, name: &VarName) -> Result<()> { log::debug!("Stopping script var process for variable {}", name); - self.listen_handler.stop_for_variable(name); + self.listen_handler.stop_for_variable(name).await; self.poll_handler.stop_for_variable(name); Ok(()) } /// stop all running scripts and schedules - fn stop_all(&mut self) { + async fn stop_all(&mut self) { log::debug!("Stopping script-var-handlers"); - self.listen_handler.stop_all(); + self.listen_handler.stop_all().await; self.poll_handler.stop_all(); } } @@ -197,7 +202,7 @@ impl Drop for PollVarHandler { struct ListenVarHandler { evt_send: UnboundedSender, - listen_process_handles: HashMap, + listen_process_handles: HashMap, } impl ListenVarHandler { @@ -216,8 +221,8 @@ impl ListenVarHandler { return; } - let cancellation_token = CancellationToken::new(); - self.listen_process_handles.insert(var.name.clone(), cancellation_token.clone()); + let (cancel_send, mut cancel_recv) = cancellation::create(); + self.listen_process_handles.insert(var.name.clone(), cancel_send.clone()); let evt_send = self.evt_send.clone(); tokio::spawn(async move { @@ -235,9 +240,13 @@ impl ListenVarHandler { }; let mut stdout_lines = BufReader::new(handle.stdout.take().unwrap()).lines(); let mut stderr_lines = BufReader::new(handle.stderr.take().unwrap()).lines(); + let mut completion_notify = None; crate::loop_select_exiting! { _ = handle.wait() => break, - _ = cancellation_token.cancelled() => break, + notify = cancel_recv.wait_for_cancel() => { + completion_notify = notify; + break; + } Ok(Some(line)) = stdout_lines.next_line() => { let new_value = DynVal::from_string(line.to_owned()); evt_send.send(DaemonCommand::UpdateVars(vec![(var.name.to_owned(), new_value)]))?; @@ -246,32 +255,42 @@ impl ListenVarHandler { log::warn!("stderr of `{}`: {}", var.name, line); } else => break, - } + }; terminate_handle(handle).await; + + if let Some(completion_notify) = completion_notify { + completion_notify.completed().await; + } }); }); } - fn stop_for_variable(&mut self, name: &VarName) { + async fn stop_for_variable(&mut self, name: &VarName) { if let Some(token) = self.listen_process_handles.remove(name) { log::debug!("stopped listen-var {}", name); - token.cancel(); + token.cancel().await; } } - fn stop_all(&mut self) { - self.listen_process_handles.drain().for_each(|(_, token)| token.cancel()); + async fn stop_all(&mut self) { + for (_, token) in self.listen_process_handles.drain() { + token.cancel().await; + } } } impl Drop for ListenVarHandler { fn drop(&mut self) { - self.stop_all(); + let rt = tokio::runtime::Runtime::new().expect("Failed to initialize tokio runtime for script var handlers"); + rt.block_on(async { + self.stop_all().await; + }); } } async fn terminate_handle(mut child: tokio::process::Child) { if let Some(id) = child.id() { + log::debug!("Killing process with id {}", id); let _ = signal::killpg(Pid::from_raw(id as i32), signal::SIGTERM); tokio::select! { _ = child.wait() => { }, @@ -283,3 +302,40 @@ async fn terminate_handle(mut child: tokio::process::Child) { let _ = child.kill().await; } } + +// Especially for listenvars, we want to make sure that the scripts are actually +// cancelled before we kill the tokio task that they run in. +// for that, we need to wait for the completion of the cancel itself +/// Provides a CancellationToken-like object that allows to wait for completion of the cancellation. +mod cancellation { + pub(super) struct CancelCompletionNotifier(tokio::sync::mpsc::Sender<()>); + impl CancelCompletionNotifier { + pub async fn completed(self) { + crate::print_result_err!("Sending cancellation completion", self.0.send(()).await); + } + } + + pub(super) struct AwaitableCancelationReceiver(tokio::sync::mpsc::Receiver); + + impl AwaitableCancelationReceiver { + pub(super) async fn wait_for_cancel(&mut self) -> Option { + self.0.recv().await + } + } + + #[derive(Clone)] + pub(super) struct AwaitableCancelationSender(tokio::sync::mpsc::Sender); + impl AwaitableCancelationSender { + pub(super) async fn cancel(&self) { + let (send, mut recv) = tokio::sync::mpsc::channel(1); + if self.0.send(CancelCompletionNotifier(send)).await.is_ok() { + let _ = recv.recv().await; + } + } + } + + pub(super) fn create() -> (AwaitableCancelationSender, AwaitableCancelationReceiver) { + let (send, recv) = tokio::sync::mpsc::channel(1); + (AwaitableCancelationSender(send), AwaitableCancelationReceiver(recv)) + } +}