This commit is contained in:
elkowar 2021-01-02 23:10:20 +01:00 committed by ElKowar
parent b9b8a77cf4
commit 05d9320598
5 changed files with 126 additions and 91 deletions

7
Cargo.lock generated
View file

@ -412,6 +412,7 @@ dependencies = [
"tokio 1.0.1", "tokio 1.0.1",
"tokio-stream", "tokio-stream",
"tokio-util", "tokio-util",
"unescape",
] ]
[[package]] [[package]]
@ -2229,6 +2230,12 @@ dependencies = [
"serde", "serde",
] ]
[[package]]
name = "unescape"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ccb97dac3243214f8d8507998906ca3e2e0b900bf9bf4870477f125b82e68f6e"
[[package]] [[package]]
name = "unicode-segmentation" name = "unicode-segmentation"
version = "1.7.1" version = "1.7.1"

View file

@ -1,5 +1,5 @@
use crate::{ use crate::{
application_lifecycle, config, config,
config::{window_definition::WindowName, AnchorPoint, WindowStacking}, config::{window_definition::WindowName, AnchorPoint, WindowStacking},
eww_state, eww_state,
script_var_handler::*, script_var_handler::*,
@ -80,7 +80,7 @@ impl App {
EwwCommand::KillServer => { EwwCommand::KillServer => {
log::info!("Received kill command, stopping server!"); log::info!("Received kill command, stopping server!");
self.stop_application(); self.stop_application();
crate::application_lifecycle::send_exit()?; let _ = crate::application_lifecycle::send_exit();
} }
EwwCommand::CloseAll => { EwwCommand::CloseAll => {
log::info!("Received close command, closing all windows"); log::info!("Received close command, closing all windows");

View file

@ -15,11 +15,11 @@ use tokio_util::sync::CancellationToken;
/// Initialize the script var handler, and return a handle to that handler, which can be used to control /// Initialize the script var handler, and return a handle to that handler, which can be used to control
/// the script var execution. /// the script var execution.
pub fn init(evt_send: UnboundedSender<EwwCommand>) -> Result<ScriptVarHandlerHandle> { pub fn init(evt_send: UnboundedSender<EwwCommand>) -> ScriptVarHandlerHandle {
let (msg_send, mut msg_recv) = tokio::sync::mpsc::unbounded_channel(); let (msg_send, mut msg_recv) = tokio::sync::mpsc::unbounded_channel();
let handle = ScriptVarHandlerHandle { msg_send }; let handle = ScriptVarHandlerHandle { msg_send };
std::thread::spawn(move || { std::thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().unwrap(); let rt = tokio::runtime::Runtime::new().expect("Failed to initialize tokio runtime for script var handlers");
rt.block_on(async { rt.block_on(async {
let _: Result<_> = try { let _: Result<_> = try {
let mut handler = ScriptVarHandler { let mut handler = ScriptVarHandler {
@ -43,7 +43,7 @@ pub fn init(evt_send: UnboundedSender<EwwCommand>) -> Result<ScriptVarHandlerHan
}; };
}) })
}); });
Ok(handle) handle
} }
/// Handle to the script-var handling system. /// Handle to the script-var handling system.
@ -54,17 +54,26 @@ pub struct ScriptVarHandlerHandle {
impl ScriptVarHandlerHandle { impl ScriptVarHandlerHandle {
/// Add a new script-var that should be executed. /// Add a new script-var that should be executed.
pub fn add(&self, script_var: config::ScriptVar) { pub fn add(&self, script_var: config::ScriptVar) {
self.msg_send.send(ScriptVarHandlerMsg::AddVar(script_var)).unwrap(); crate::print_result_err!(
"while forwarding instruction to script-var handler",
self.msg_send.send(ScriptVarHandlerMsg::AddVar(script_var))
);
} }
/// Stop the execution of a specific script-var. /// Stop the execution of a specific script-var.
pub fn stop_for_variable(&self, name: VarName) { pub fn stop_for_variable(&self, name: VarName) {
self.msg_send.send(ScriptVarHandlerMsg::Stop(name)).unwrap(); crate::print_result_err!(
"while forwarding instruction to script-var handler",
self.msg_send.send(ScriptVarHandlerMsg::Stop(name)),
);
} }
/// Stop the execution of all script-vars. /// Stop the execution of all script-vars.
pub fn stop_all(&self) { pub fn stop_all(&self) {
self.msg_send.send(ScriptVarHandlerMsg::StopAll).unwrap(); crate::print_result_err!(
"while forwarding instruction to script-var handler",
self.msg_send.send(ScriptVarHandlerMsg::StopAll)
);
} }
} }
@ -177,24 +186,25 @@ impl TailVarHandler {
let evt_send = self.evt_send.clone(); let evt_send = self.evt_send.clone();
tokio::spawn(async move { tokio::spawn(async move {
crate::try_logging_errors!(format!("Executing tail var command {}", &var.command) => {
let mut handle = tokio::process::Command::new("sh") let mut handle = tokio::process::Command::new("sh")
.args(&["-c", &var.command]) .args(&["-c", &var.command])
.stdout(std::process::Stdio::piped()) .stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::inherit()) .stderr(std::process::Stdio::inherit())
.stdin(std::process::Stdio::null()) .stdin(std::process::Stdio::null())
.spawn() .spawn()?;
.unwrap();
let mut stdout_lines = BufReader::new(handle.stdout.take().unwrap()).lines(); let mut stdout_lines = BufReader::new(handle.stdout.take().unwrap()).lines();
crate::loop_select_exiting! { crate::loop_select_exiting! {
_ = handle.wait() => break, _ = handle.wait() => break,
_ = cancellation_token.cancelled() => break, _ = cancellation_token.cancelled() => break,
Ok(Some(line)) = stdout_lines.next_line() => { Ok(Some(line)) = stdout_lines.next_line() => {
let new_value = PrimitiveValue::from_string(line.to_owned()); let new_value = PrimitiveValue::from_string(line.to_owned());
evt_send.send(EwwCommand::UpdateVars(vec![(var.name.to_owned(), new_value)])).unwrap(); evt_send.send(EwwCommand::UpdateVars(vec![(var.name.to_owned(), new_value)]))?;
} }
else => break, else => break,
} }
handle.kill().await.unwrap(); let _ = handle.kill().await;
});
}); });
} }

View file

@ -1,7 +1,12 @@
use crate::{app, config, eww_state::*, opts, script_var_handler, try_logging_errors, util}; use crate::{app, config, eww_state::*, opts, script_var_handler, try_logging_errors, util};
use anyhow::*; use anyhow::*;
use futures_util::StreamExt; use futures_util::StreamExt;
use std::{collections::HashMap, os::unix::io::AsRawFd, path::Path, time::Duration}; use std::{
collections::HashMap,
os::unix::io::AsRawFd,
path::{Path, PathBuf},
time::Duration,
};
use tokio::{ use tokio::{
io::{AsyncReadExt, AsyncWriteExt}, io::{AsyncReadExt, AsyncWriteExt},
sync::mpsc::*, sync::mpsc::*,
@ -16,7 +21,10 @@ pub fn initialize_server(should_detach: bool, action: opts::ActionWithServer) ->
simple_signal::set_handler(&[simple_signal::Signal::Int, simple_signal::Signal::Term], move |_| { simple_signal::set_handler(&[simple_signal::Signal::Int, simple_signal::Signal::Term], move |_| {
println!("Shutting down eww daemon..."); println!("Shutting down eww daemon...");
crate::application_lifecycle::send_exit().unwrap(); if let Err(e) = crate::application_lifecycle::send_exit() {
eprintln!("Failed to send application shutdown event to workers: {:?}", e);
std::process::exit(1);
}
}); });
let config_file_path = crate::CONFIG_DIR.join("eww.xml"); let config_file_path = crate::CONFIG_DIR.join("eww.xml");
@ -31,10 +39,9 @@ pub fn initialize_server(should_detach: bool, action: opts::ActionWithServer) ->
gtk::init()?; gtk::init()?;
let (ui_send, mut ui_recv) = tokio::sync::mpsc::unbounded_channel(); let (ui_send, mut ui_recv) = tokio::sync::mpsc::unbounded_channel();
let glib_context = glib::MainContext::default();
log::info!("Initializing script var handler"); log::info!("Initializing script var handler");
let script_var_handler = script_var_handler::init(ui_send.clone())?; let script_var_handler = script_var_handler::init(ui_send.clone());
let mut app = app::App { let mut app = app::App {
eww_state: EwwState::from_default_vars(eww_config.generate_initial_state()?), eww_state: EwwState::from_default_vars(eww_config.generate_initial_state()?),
@ -58,49 +65,10 @@ pub fn initialize_server(should_detach: bool, action: opts::ActionWithServer) ->
let (command, maybe_response_recv) = action.into_eww_command(); let (command, maybe_response_recv) = action.into_eww_command();
app.handle_command(command); app.handle_command(command);
std::thread::spawn(move || { // initialize all the handlers and tasks running asyncronously
let rt = tokio::runtime::Builder::new_multi_thread() init_async_part(config_file_path, scss_file_path, maybe_response_recv, ui_send);
.enable_all()
.build()
.expect("Failed to initialize tokio runtime");
rt.block_on(async {
// print out the response of this initial command, if there is any
tokio::spawn(async {
if let Some(mut response_recv) = maybe_response_recv {
if let Ok(Some(response)) = tokio::time::timeout(Duration::from_millis(100), response_recv.recv()).await {
println!("{}", response);
}
}
});
let filewatch_join_handle = { glib::MainContext::default().spawn_local(async move {
let ui_send = ui_send.clone();
tokio::spawn(async move { run_filewatch_thingy(config_file_path, scss_file_path, ui_send).await })
};
let ipc_server_join_handle = {
let ui_send = ui_send.clone();
tokio::spawn(async move { async_server(ui_send).await })
};
let forward_lifecycle_to_app_handle = {
let ui_send = ui_send.clone();
tokio::spawn(async move {
while let Ok(()) = crate::application_lifecycle::recv_exit().await {
let _ = ui_send.send(app::EwwCommand::KillServer);
}
})
};
let result = tokio::try_join!(filewatch_join_handle, ipc_server_join_handle, forward_lifecycle_to_app_handle);
if let Err(e) = result {
eprintln!("Eww exiting with error: {:?}", e);
}
})
});
glib_context.spawn_local(async move {
while let Some(event) = ui_recv.recv().await { while let Some(event) = ui_recv.recv().await {
app.handle_command(event); app.handle_command(event);
} }
@ -112,7 +80,58 @@ pub fn initialize_server(should_detach: bool, action: opts::ActionWithServer) ->
Ok(()) Ok(())
} }
async fn async_server(evt_send: UnboundedSender<app::EwwCommand>) -> Result<()> { fn init_async_part(
config_file_path: PathBuf,
scss_file_path: PathBuf,
maybe_response_recv: Option<UnboundedReceiver<String>>,
ui_send: UnboundedSender<app::EwwCommand>,
) {
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.expect("Failed to initialize tokio runtime");
rt.block_on(async {
// TODO This really does not belong here at all :<
// print out the response of this initial command, if there is any
tokio::spawn(async {
if let Some(mut response_recv) = maybe_response_recv {
if let Ok(Some(response)) = tokio::time::timeout(Duration::from_millis(100), response_recv.recv()).await {
println!("{}", response);
}
}
});
let filewatch_join_handle = {
let ui_send = ui_send.clone();
tokio::spawn(async move { run_filewatch(config_file_path, scss_file_path, ui_send).await })
};
let ipc_server_join_handle = {
let ui_send = ui_send.clone();
tokio::spawn(async move { run_ipc_server(ui_send).await })
};
let forward_exit_to_app_handle = {
let ui_send = ui_send.clone();
tokio::spawn(async move {
// Wait for application exit event
let _ = crate::application_lifecycle::recv_exit().await;
// Then forward that to the application
let _ = ui_send.send(app::EwwCommand::KillServer);
})
};
let result = tokio::try_join!(filewatch_join_handle, ipc_server_join_handle, forward_exit_to_app_handle);
if let Err(e) = result {
eprintln!("Eww exiting with error: {:?}", e);
}
})
});
}
async fn run_ipc_server(evt_send: UnboundedSender<app::EwwCommand>) -> Result<()> {
let listener = tokio::net::UnixListener::bind(&*crate::IPC_SOCKET_PATH)?; let listener = tokio::net::UnixListener::bind(&*crate::IPC_SOCKET_PATH)?;
crate::loop_select_exiting! { crate::loop_select_exiting! {
connection = listener.accept() => match connection { connection = listener.accept() => match connection {
@ -123,6 +142,7 @@ async fn async_server(evt_send: UnboundedSender<app::EwwCommand>) -> Result<()>
Ok(()) Ok(())
} }
/// Handle a single IPC connection from start to end.
async fn handle_connection(mut stream: tokio::net::UnixStream, evt_send: UnboundedSender<app::EwwCommand>) -> Result<()> { async fn handle_connection(mut stream: tokio::net::UnixStream, evt_send: UnboundedSender<app::EwwCommand>) -> Result<()> {
let (mut stream_read, mut stream_write) = stream.split(); let (mut stream_read, mut stream_write) = stream.split();
@ -149,7 +169,8 @@ async fn handle_connection(mut stream: tokio::net::UnixStream, evt_send: Unbound
Ok(()) Ok(())
} }
async fn run_filewatch_thingy<P: AsRef<Path>>( /// Watch configuration files for changes, sending reload events to the eww app when the files change.
async fn run_filewatch<P: AsRef<Path>>(
config_file_path: P, config_file_path: P,
scss_file_path: P, scss_file_path: P,
evt_send: UnboundedSender<app::EwwCommand>, evt_send: UnboundedSender<app::EwwCommand>,
@ -167,29 +188,26 @@ async fn run_filewatch_thingy<P: AsRef<Path>>(
crate::loop_select_exiting! { crate::loop_select_exiting! {
Some(Ok(event)) = event_stream.next() => { Some(Ok(event)) = event_stream.next() => {
if event.wd == config_file_descriptor {
try_logging_errors!("handling change of config file" => { try_logging_errors!("handling change of config file" => {
if event.wd == config_file_descriptor {
log::info!("Reloading eww configuration"); log::info!("Reloading eww configuration");
let new_eww_config = config::EwwConfig::read_from_file(config_file_path.as_ref())?; let new_eww_config = config::EwwConfig::read_from_file(config_file_path.as_ref())?;
evt_send.send(app::EwwCommand::ReloadConfig(new_eww_config))?; evt_send.send(app::EwwCommand::ReloadConfig(new_eww_config))?;
});
} else if event.wd == scss_file_descriptor { } else if event.wd == scss_file_descriptor {
try_logging_errors!("handling change of scss file" => {
log::info!("reloading eww css file"); log::info!("reloading eww css file");
let eww_css = crate::util::parse_scss_from_file(scss_file_path.as_ref())?; let eww_css = crate::util::parse_scss_from_file(scss_file_path.as_ref())?;
evt_send.send(app::EwwCommand::ReloadCss(eww_css))?; evt_send.send(app::EwwCommand::ReloadCss(eww_css))?;
});
} else { } else {
eprintln!("Got inotify event for unknown thing: {:?}", event); eprintln!("Got inotify event for unknown thing: {:?}", event);
} }
});
} }
else => break, else => break,
} }
Ok(()) Ok(())
} }
/// detach the process from the terminal, also redirecting stdout and stderr to /// detach the process from the terminal, also redirecting stdout and stderr to LOG_FILE
/// LOG_FILE
fn do_detach() -> Result<()> { fn do_detach() -> Result<()> {
// detach from terminal // detach from terminal
match unsafe { nix::unistd::fork()? } { match unsafe { nix::unistd::fork()? } {

View file

@ -22,7 +22,7 @@ macro_rules! impl_try_from {
#[macro_export] #[macro_export]
macro_rules! try_logging_errors { macro_rules! try_logging_errors {
($context:literal => $code:block) => {{ ($context:expr => $code:block) => {{
let result: Result<_> = try { $code }; let result: Result<_> = try { $code };
if let Err(err) = result { if let Err(err) = result {
eprintln!("[{}:{}] Error while {}: {:?}", ::std::file!(), ::std::line!(), $context, err); eprintln!("[{}:{}] Error while {}: {:?}", ::std::file!(), ::std::line!(), $context, err);