Fix listen vars not being cleaned up when reloading config (#538)
This commit is contained in:
parent
1d794de86c
commit
17d91f8a44
2 changed files with 76 additions and 19 deletions
|
@ -372,7 +372,8 @@ impl App {
|
||||||
log::info!("Reloading windows");
|
log::info!("Reloading windows");
|
||||||
|
|
||||||
self.script_var_handler.stop_all();
|
self.script_var_handler.stop_all();
|
||||||
self.script_var_handler = script_var_handler::init(self.app_evt_send.clone());
|
let old_handler = std::mem::replace(&mut self.script_var_handler, script_var_handler::init(self.app_evt_send.clone()));
|
||||||
|
old_handler.join_thread();
|
||||||
|
|
||||||
log::trace!("loading config: {:#?}", config);
|
log::trace!("loading config: {:#?}", config);
|
||||||
|
|
||||||
|
|
|
@ -24,8 +24,7 @@ use yuck::config::script_var_definition::{ListenScriptVar, PollScriptVar, Script
|
||||||
/// the script var execution.
|
/// the script var execution.
|
||||||
pub fn init(evt_send: UnboundedSender<DaemonCommand>) -> ScriptVarHandlerHandle {
|
pub fn init(evt_send: UnboundedSender<DaemonCommand>) -> ScriptVarHandlerHandle {
|
||||||
let (msg_send, mut msg_recv) = tokio::sync::mpsc::unbounded_channel();
|
let (msg_send, mut msg_recv) = tokio::sync::mpsc::unbounded_channel();
|
||||||
let handle = ScriptVarHandlerHandle { msg_send };
|
let thread_handle = std::thread::spawn(move || {
|
||||||
std::thread::spawn(move || {
|
|
||||||
let rt = tokio::runtime::Runtime::new().expect("Failed to initialize tokio runtime for script var handlers");
|
let rt = tokio::runtime::Runtime::new().expect("Failed to initialize tokio runtime for script var handlers");
|
||||||
rt.block_on(async {
|
rt.block_on(async {
|
||||||
let _: Result<_> = try {
|
let _: Result<_> = try {
|
||||||
|
@ -39,10 +38,10 @@ pub fn init(evt_send: UnboundedSender<DaemonCommand>) -> ScriptVarHandlerHandle
|
||||||
handler.add(var).await;
|
handler.add(var).await;
|
||||||
}
|
}
|
||||||
ScriptVarHandlerMsg::Stop(name) => {
|
ScriptVarHandlerMsg::Stop(name) => {
|
||||||
handler.stop_for_variable(&name)?;
|
handler.stop_for_variable(&name).await?;
|
||||||
}
|
}
|
||||||
ScriptVarHandlerMsg::StopAll => {
|
ScriptVarHandlerMsg::StopAll => {
|
||||||
handler.stop_all();
|
handler.stop_all().await;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
@ -51,12 +50,14 @@ pub fn init(evt_send: UnboundedSender<DaemonCommand>) -> ScriptVarHandlerHandle
|
||||||
};
|
};
|
||||||
})
|
})
|
||||||
});
|
});
|
||||||
|
let handle = ScriptVarHandlerHandle { msg_send, thread_handle };
|
||||||
handle
|
handle
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Handle to the script-var handling system.
|
/// Handle to the script-var handling system.
|
||||||
pub struct ScriptVarHandlerHandle {
|
pub struct ScriptVarHandlerHandle {
|
||||||
msg_send: UnboundedSender<ScriptVarHandlerMsg>,
|
msg_send: UnboundedSender<ScriptVarHandlerMsg>,
|
||||||
|
thread_handle: std::thread::JoinHandle<()>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ScriptVarHandlerHandle {
|
impl ScriptVarHandlerHandle {
|
||||||
|
@ -85,6 +86,10 @@ impl ScriptVarHandlerHandle {
|
||||||
self.msg_send.send(ScriptVarHandlerMsg::StopAll)
|
self.msg_send.send(ScriptVarHandlerMsg::StopAll)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn join_thread(self) {
|
||||||
|
let _ = self.thread_handle.join();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Message enum used by the ScriptVarHandlerHandle to communicate to the ScriptVarHandler
|
/// Message enum used by the ScriptVarHandlerHandle to communicate to the ScriptVarHandler
|
||||||
|
@ -110,17 +115,17 @@ impl ScriptVarHandler {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Stop the handler that is responsible for a given variable.
|
/// Stop the handler that is responsible for a given variable.
|
||||||
fn stop_for_variable(&mut self, name: &VarName) -> Result<()> {
|
async fn stop_for_variable(&mut self, name: &VarName) -> Result<()> {
|
||||||
log::debug!("Stopping script var process for variable {}", name);
|
log::debug!("Stopping script var process for variable {}", name);
|
||||||
self.listen_handler.stop_for_variable(name);
|
self.listen_handler.stop_for_variable(name).await;
|
||||||
self.poll_handler.stop_for_variable(name);
|
self.poll_handler.stop_for_variable(name);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// stop all running scripts and schedules
|
/// stop all running scripts and schedules
|
||||||
fn stop_all(&mut self) {
|
async fn stop_all(&mut self) {
|
||||||
log::debug!("Stopping script-var-handlers");
|
log::debug!("Stopping script-var-handlers");
|
||||||
self.listen_handler.stop_all();
|
self.listen_handler.stop_all().await;
|
||||||
self.poll_handler.stop_all();
|
self.poll_handler.stop_all();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -197,7 +202,7 @@ impl Drop for PollVarHandler {
|
||||||
|
|
||||||
struct ListenVarHandler {
|
struct ListenVarHandler {
|
||||||
evt_send: UnboundedSender<DaemonCommand>,
|
evt_send: UnboundedSender<DaemonCommand>,
|
||||||
listen_process_handles: HashMap<VarName, CancellationToken>,
|
listen_process_handles: HashMap<VarName, cancellation::AwaitableCancelationSender>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ListenVarHandler {
|
impl ListenVarHandler {
|
||||||
|
@ -216,8 +221,8 @@ impl ListenVarHandler {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
let cancellation_token = CancellationToken::new();
|
let (cancel_send, mut cancel_recv) = cancellation::create();
|
||||||
self.listen_process_handles.insert(var.name.clone(), cancellation_token.clone());
|
self.listen_process_handles.insert(var.name.clone(), cancel_send.clone());
|
||||||
|
|
||||||
let evt_send = self.evt_send.clone();
|
let evt_send = self.evt_send.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
|
@ -235,9 +240,13 @@ impl ListenVarHandler {
|
||||||
};
|
};
|
||||||
let mut stdout_lines = BufReader::new(handle.stdout.take().unwrap()).lines();
|
let mut stdout_lines = BufReader::new(handle.stdout.take().unwrap()).lines();
|
||||||
let mut stderr_lines = BufReader::new(handle.stderr.take().unwrap()).lines();
|
let mut stderr_lines = BufReader::new(handle.stderr.take().unwrap()).lines();
|
||||||
|
let mut completion_notify = None;
|
||||||
crate::loop_select_exiting! {
|
crate::loop_select_exiting! {
|
||||||
_ = handle.wait() => break,
|
_ = handle.wait() => break,
|
||||||
_ = cancellation_token.cancelled() => break,
|
notify = cancel_recv.wait_for_cancel() => {
|
||||||
|
completion_notify = notify;
|
||||||
|
break;
|
||||||
|
}
|
||||||
Ok(Some(line)) = stdout_lines.next_line() => {
|
Ok(Some(line)) = stdout_lines.next_line() => {
|
||||||
let new_value = DynVal::from_string(line.to_owned());
|
let new_value = DynVal::from_string(line.to_owned());
|
||||||
evt_send.send(DaemonCommand::UpdateVars(vec![(var.name.to_owned(), new_value)]))?;
|
evt_send.send(DaemonCommand::UpdateVars(vec![(var.name.to_owned(), new_value)]))?;
|
||||||
|
@ -246,32 +255,42 @@ impl ListenVarHandler {
|
||||||
log::warn!("stderr of `{}`: {}", var.name, line);
|
log::warn!("stderr of `{}`: {}", var.name, line);
|
||||||
}
|
}
|
||||||
else => break,
|
else => break,
|
||||||
}
|
};
|
||||||
terminate_handle(handle).await;
|
terminate_handle(handle).await;
|
||||||
|
|
||||||
|
if let Some(completion_notify) = completion_notify {
|
||||||
|
completion_notify.completed().await;
|
||||||
|
}
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
fn stop_for_variable(&mut self, name: &VarName) {
|
async fn stop_for_variable(&mut self, name: &VarName) {
|
||||||
if let Some(token) = self.listen_process_handles.remove(name) {
|
if let Some(token) = self.listen_process_handles.remove(name) {
|
||||||
log::debug!("stopped listen-var {}", name);
|
log::debug!("stopped listen-var {}", name);
|
||||||
token.cancel();
|
token.cancel().await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn stop_all(&mut self) {
|
async fn stop_all(&mut self) {
|
||||||
self.listen_process_handles.drain().for_each(|(_, token)| token.cancel());
|
for (_, token) in self.listen_process_handles.drain() {
|
||||||
|
token.cancel().await;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Drop for ListenVarHandler {
|
impl Drop for ListenVarHandler {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
self.stop_all();
|
let rt = tokio::runtime::Runtime::new().expect("Failed to initialize tokio runtime for script var handlers");
|
||||||
|
rt.block_on(async {
|
||||||
|
self.stop_all().await;
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn terminate_handle(mut child: tokio::process::Child) {
|
async fn terminate_handle(mut child: tokio::process::Child) {
|
||||||
if let Some(id) = child.id() {
|
if let Some(id) = child.id() {
|
||||||
|
log::debug!("Killing process with id {}", id);
|
||||||
let _ = signal::killpg(Pid::from_raw(id as i32), signal::SIGTERM);
|
let _ = signal::killpg(Pid::from_raw(id as i32), signal::SIGTERM);
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
_ = child.wait() => { },
|
_ = child.wait() => { },
|
||||||
|
@ -283,3 +302,40 @@ async fn terminate_handle(mut child: tokio::process::Child) {
|
||||||
let _ = child.kill().await;
|
let _ = child.kill().await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Especially for listenvars, we want to make sure that the scripts are actually
|
||||||
|
// cancelled before we kill the tokio task that they run in.
|
||||||
|
// for that, we need to wait for the completion of the cancel itself
|
||||||
|
/// Provides a CancellationToken-like object that allows to wait for completion of the cancellation.
|
||||||
|
mod cancellation {
|
||||||
|
pub(super) struct CancelCompletionNotifier(tokio::sync::mpsc::Sender<()>);
|
||||||
|
impl CancelCompletionNotifier {
|
||||||
|
pub async fn completed(self) {
|
||||||
|
crate::print_result_err!("Sending cancellation completion", self.0.send(()).await);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) struct AwaitableCancelationReceiver(tokio::sync::mpsc::Receiver<CancelCompletionNotifier>);
|
||||||
|
|
||||||
|
impl AwaitableCancelationReceiver {
|
||||||
|
pub(super) async fn wait_for_cancel(&mut self) -> Option<CancelCompletionNotifier> {
|
||||||
|
self.0.recv().await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub(super) struct AwaitableCancelationSender(tokio::sync::mpsc::Sender<CancelCompletionNotifier>);
|
||||||
|
impl AwaitableCancelationSender {
|
||||||
|
pub(super) async fn cancel(&self) {
|
||||||
|
let (send, mut recv) = tokio::sync::mpsc::channel(1);
|
||||||
|
if self.0.send(CancelCompletionNotifier(send)).await.is_ok() {
|
||||||
|
let _ = recv.recv().await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) fn create() -> (AwaitableCancelationSender, AwaitableCancelationReceiver) {
|
||||||
|
let (send, recv) = tokio::sync::mpsc::channel(1);
|
||||||
|
(AwaitableCancelationSender(send), AwaitableCancelationReceiver(recv))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue