More work done
This commit is contained in:
parent
542dd62d7b
commit
b4880a55b8
9 changed files with 2508 additions and 133 deletions
2348
Cargo.lock
generated
Normal file
2348
Cargo.lock
generated
Normal file
File diff suppressed because it is too large
Load diff
|
@ -46,12 +46,13 @@ simple-signal = "1.1"
|
|||
dashmap = "3.11"
|
||||
unescape = "0.1"
|
||||
|
||||
tokio = { version = "0.3", features = ["full"] }
|
||||
tokio = { version = "1.0", features = ["full"] }
|
||||
tokio-stream = "0.1"
|
||||
async-stream = "0.3"
|
||||
futures-core = "0.3"
|
||||
futures-util = "0.3"
|
||||
inotify = "0.9"
|
||||
tokio-util = "0.6"
|
||||
|
||||
[dev-dependencies]
|
||||
pretty_assertions = "0.6.1"
|
||||
|
|
14
src/app.rs
14
src/app.rs
|
@ -31,8 +31,8 @@ pub enum EwwCommand {
|
|||
},
|
||||
KillServer,
|
||||
CloseAll,
|
||||
PrintState(crossbeam_channel::Sender<String>),
|
||||
PrintDebug(crossbeam_channel::Sender<String>),
|
||||
PrintState(tokio::sync::mpsc::UnboundedSender<String>),
|
||||
PrintDebug(tokio::sync::mpsc::UnboundedSender<String>),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
|
@ -56,7 +56,7 @@ pub struct App {
|
|||
pub css_provider: gtk::CssProvider,
|
||||
pub app_evt_send: UnboundedSender<EwwCommand>,
|
||||
#[debug_stub = "ScriptVarHandler(...)"]
|
||||
pub script_var_handler: ScriptVarHandler,
|
||||
pub script_var_handler: ScriptVarHandlerHandle,
|
||||
}
|
||||
|
||||
impl App {
|
||||
|
@ -80,7 +80,7 @@ impl App {
|
|||
log::info!("Received kill command, stopping server!");
|
||||
self.script_var_handler.stop_all();
|
||||
self.windows.drain().for_each(|(_, w)| w.close());
|
||||
script_var_process::on_application_death();
|
||||
// script_var_process::on_application_death();
|
||||
std::process::exit(0);
|
||||
}
|
||||
EwwCommand::CloseAll => {
|
||||
|
@ -140,11 +140,7 @@ impl App {
|
|||
.filter(|var| !currently_used_vars.contains(*var))
|
||||
{
|
||||
println!("stopping for {}", &unused_var);
|
||||
let result = self.script_var_handler.stop_for_variable(unused_var);
|
||||
crate::print_result_err!(
|
||||
"While stopping script-var processes while cleaning up after the last window referencing them closed",
|
||||
&result
|
||||
);
|
||||
self.script_var_handler.stop_for_variable(unused_var.clone());
|
||||
}
|
||||
|
||||
window.close();
|
||||
|
|
|
@ -29,7 +29,9 @@ pub fn forward_command_to_server(mut stream: UnixStream, action: opts::ActionWit
|
|||
|
||||
let mut buf = String::new();
|
||||
stream.set_read_timeout(Some(std::time::Duration::from_millis(100)))?;
|
||||
stream.read_to_string(&mut buf)?;
|
||||
stream
|
||||
.read_to_string(&mut buf)
|
||||
.context("Error reading response from server")?;
|
||||
if !buf.is_empty() {
|
||||
println!("{}", buf);
|
||||
}
|
||||
|
|
|
@ -6,7 +6,7 @@ use crate::ensure_xml_tag_is;
|
|||
|
||||
use super::*;
|
||||
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||
pub struct PollScriptVar {
|
||||
pub name: VarName,
|
||||
pub command: String,
|
||||
|
@ -19,13 +19,13 @@ impl PollScriptVar {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||
pub struct TailScriptVar {
|
||||
pub name: VarName,
|
||||
pub command: String,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||
pub enum ScriptVar {
|
||||
Poll(PollScriptVar),
|
||||
Tail(TailScriptVar),
|
||||
|
|
|
@ -126,7 +126,7 @@ fn parse_var_update_arg(s: &str) -> Result<(VarName, PrimitiveValue)> {
|
|||
}
|
||||
|
||||
impl ActionWithServer {
|
||||
pub fn into_eww_command(self) -> (app::EwwCommand, Option<crossbeam_channel::Receiver<String>>) {
|
||||
pub fn into_eww_command(self) -> (app::EwwCommand, Option<tokio::sync::mpsc::UnboundedReceiver<String>>) {
|
||||
let command = match self {
|
||||
ActionWithServer::Daemon => app::EwwCommand::NoOp,
|
||||
ActionWithServer::Update { mappings } => app::EwwCommand::UpdateVars(mappings.into_iter().collect()),
|
||||
|
@ -145,11 +145,11 @@ impl ActionWithServer {
|
|||
ActionWithServer::KillServer => app::EwwCommand::KillServer,
|
||||
ActionWithServer::CloseAll => app::EwwCommand::CloseAll,
|
||||
ActionWithServer::ShowState => {
|
||||
let (send, recv) = crossbeam_channel::unbounded();
|
||||
let (send, recv) = tokio::sync::mpsc::unbounded_channel();
|
||||
return (app::EwwCommand::PrintState(send), Some(recv));
|
||||
}
|
||||
ActionWithServer::ShowDebug => {
|
||||
let (send, recv) = crossbeam_channel::unbounded();
|
||||
let (send, recv) = tokio::sync::mpsc::unbounded_channel();
|
||||
return (app::EwwCommand::PrintDebug(send), Some(recv));
|
||||
}
|
||||
};
|
||||
|
|
|
@ -1,8 +1,4 @@
|
|||
use std::{
|
||||
collections::HashMap,
|
||||
sync::{Arc, RwLock},
|
||||
time::Duration,
|
||||
};
|
||||
use std::collections::HashMap;
|
||||
|
||||
use crate::{
|
||||
app, config,
|
||||
|
@ -11,30 +7,78 @@ use crate::{
|
|||
use anyhow::*;
|
||||
use app::EwwCommand;
|
||||
|
||||
use dashmap::DashMap;
|
||||
use std::io::BufRead;
|
||||
use tokio::{
|
||||
io::{AsyncBufReadExt, BufReader},
|
||||
sync::mpsc::UnboundedSender,
|
||||
};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use self::script_var_process::ScriptVarProcess;
|
||||
use tokio::sync::mpsc::UnboundedSender;
|
||||
#[derive(Debug, Eq, PartialEq)]
|
||||
enum ScriptVarHandlerMsg {
|
||||
AddVar(config::ScriptVar),
|
||||
Stop(VarName),
|
||||
StopAll,
|
||||
}
|
||||
|
||||
pub struct ScriptVarHandlerHandle {
|
||||
msg_send: UnboundedSender<ScriptVarHandlerMsg>,
|
||||
}
|
||||
|
||||
impl ScriptVarHandlerHandle {
|
||||
pub fn add(&self, script_var: config::ScriptVar) {
|
||||
self.msg_send.send(ScriptVarHandlerMsg::AddVar(script_var)).unwrap();
|
||||
}
|
||||
|
||||
pub fn stop_for_variable(&self, name: VarName) {
|
||||
self.msg_send.send(ScriptVarHandlerMsg::Stop(name)).unwrap();
|
||||
}
|
||||
|
||||
pub fn stop_all(&self) {
|
||||
self.msg_send.send(ScriptVarHandlerMsg::StopAll).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
/// Handler that manages running and updating [ScriptVar]s
|
||||
pub struct ScriptVarHandler {
|
||||
struct ScriptVarHandler {
|
||||
tail_handler: TailVarHandler,
|
||||
poll_handler: PollVarHandler,
|
||||
}
|
||||
|
||||
impl ScriptVarHandler {
|
||||
pub fn new(evt_send: UnboundedSender<EwwCommand>) -> Result<Self> {
|
||||
Ok(ScriptVarHandler {
|
||||
tail_handler: TailVarHandler::new(evt_send.clone())?,
|
||||
poll_handler: PollVarHandler::new(evt_send)?,
|
||||
pub fn init(evt_send: UnboundedSender<EwwCommand>) -> Result<ScriptVarHandlerHandle> {
|
||||
let (msg_send, mut msg_recv) = tokio::sync::mpsc::unbounded_channel();
|
||||
let handle = ScriptVarHandlerHandle { msg_send };
|
||||
std::thread::spawn(move || {
|
||||
let rt = tokio::runtime::Runtime::new().unwrap();
|
||||
rt.block_on(async {
|
||||
let _: Result<_> = try {
|
||||
let mut handler = ScriptVarHandler {
|
||||
tail_handler: TailVarHandler::new(evt_send.clone())?,
|
||||
poll_handler: PollVarHandler::new(evt_send)?,
|
||||
};
|
||||
while let Some(msg) = msg_recv.recv().await {
|
||||
match msg {
|
||||
ScriptVarHandlerMsg::AddVar(var) => {
|
||||
handler.add(var).await;
|
||||
}
|
||||
ScriptVarHandlerMsg::Stop(name) => {
|
||||
handler.stop_for_variable(&name)?;
|
||||
}
|
||||
ScriptVarHandlerMsg::StopAll => {
|
||||
handler.stop_all();
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
})
|
||||
}
|
||||
});
|
||||
Ok(handle)
|
||||
}
|
||||
|
||||
pub fn add(&mut self, script_var: config::ScriptVar) {
|
||||
impl ScriptVarHandler {
|
||||
pub async fn add(&mut self, script_var: config::ScriptVar) {
|
||||
match script_var {
|
||||
config::ScriptVar::Poll(var) => self.poll_handler.start(&var),
|
||||
config::ScriptVar::Tail(var) => self.tail_handler.start(&var),
|
||||
config::ScriptVar::Poll(var) => self.poll_handler.start(var).await,
|
||||
config::ScriptVar::Tail(var) => self.tail_handler.start(var).await,
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -62,136 +106,103 @@ impl Drop for ScriptVarHandler {
|
|||
|
||||
struct PollVarHandler {
|
||||
evt_send: UnboundedSender<EwwCommand>,
|
||||
poll_handles: HashMap<VarName, scheduled_executor::executor::TaskHandle>,
|
||||
poll_executor: scheduled_executor::CoreExecutor,
|
||||
poll_handles: HashMap<VarName, CancellationToken>,
|
||||
}
|
||||
|
||||
impl PollVarHandler {
|
||||
fn new(evt_send: UnboundedSender<EwwCommand>) -> Result<Self> {
|
||||
Ok(PollVarHandler {
|
||||
let handler = PollVarHandler {
|
||||
evt_send,
|
||||
poll_handles: HashMap::new(),
|
||||
poll_executor: scheduled_executor::CoreExecutor::new()?,
|
||||
})
|
||||
};
|
||||
Ok(handler)
|
||||
}
|
||||
|
||||
fn start(&mut self, var: &config::PollScriptVar) {
|
||||
async fn start(&mut self, var: config::PollScriptVar) {
|
||||
log::debug!("starting poll var {}", &var.name);
|
||||
let cancellation_token = CancellationToken::new();
|
||||
self.poll_handles.insert(var.name.clone(), cancellation_token.clone());
|
||||
let evt_send = self.evt_send.clone();
|
||||
let handle = self.poll_executor.schedule_fixed_interval(
|
||||
Duration::from_secs(0),
|
||||
var.interval,
|
||||
glib::clone!(@strong var => move |_| {
|
||||
let result: Result<_> = try {
|
||||
evt_send.send(app::EwwCommand::UpdateVars(vec![(var.name.clone(), var.run_once()?)]))?;
|
||||
};
|
||||
crate::print_result_err!("while running script-var command", &result);
|
||||
}),
|
||||
);
|
||||
self.poll_handles.insert(var.name.clone(), handle);
|
||||
tokio::spawn(async move {
|
||||
crate::loop_select! {
|
||||
_ = cancellation_token.cancelled() => break,
|
||||
_ = tokio::time::sleep(var.interval) => {
|
||||
let result: Result<_> = try {
|
||||
evt_send.send(app::EwwCommand::UpdateVars(vec![(var.name.clone(), var.run_once()?)]))?;
|
||||
};
|
||||
crate::print_result_err!("while running script-var command", &result);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
pub fn stop_for_variable(&mut self, name: &VarName) -> Result<()> {
|
||||
if let Some(handle) = self.poll_handles.remove(name) {
|
||||
if let Some(token) = self.poll_handles.remove(name) {
|
||||
log::debug!("stopped poll var {}", name);
|
||||
handle.stop();
|
||||
token.cancel()
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn stop_all(&mut self) {
|
||||
self.poll_handles.drain().for_each(|(_, handle)| handle.stop());
|
||||
self.poll_handles.drain().for_each(|(_, token)| token.cancel());
|
||||
}
|
||||
}
|
||||
|
||||
struct TailVarHandler {
|
||||
evt_send: UnboundedSender<EwwCommand>,
|
||||
tail_handler_thread: Option<stoppable_thread::StoppableHandle<()>>,
|
||||
tail_process_handles: Arc<DashMap<VarName, script_var_process::ScriptVarProcess>>,
|
||||
tail_sources: Arc<RwLock<popol::Sources<VarName>>>,
|
||||
tail_process_handles: HashMap<VarName, CancellationToken>,
|
||||
}
|
||||
|
||||
impl TailVarHandler {
|
||||
fn new(evt_send: UnboundedSender<EwwCommand>) -> Result<Self> {
|
||||
let mut handler = TailVarHandler {
|
||||
let handler = TailVarHandler {
|
||||
evt_send,
|
||||
tail_handler_thread: None,
|
||||
tail_process_handles: Arc::new(DashMap::new()),
|
||||
tail_sources: Arc::new(RwLock::new(popol::Sources::new())),
|
||||
tail_process_handles: HashMap::new(),
|
||||
};
|
||||
handler.setup_tail_tasks()?;
|
||||
Ok(handler)
|
||||
}
|
||||
|
||||
fn setup_tail_tasks(&mut self) -> Result<()> {
|
||||
log::info!("initializing handler for tail script vars");
|
||||
async fn start(&mut self, var: config::TailScriptVar) {
|
||||
log::debug!("starting poll var {}", &var.name);
|
||||
let cancellation_token = CancellationToken::new();
|
||||
self.tail_process_handles.insert(var.name.clone(), cancellation_token.clone());
|
||||
|
||||
let mut events = popol::Events::<VarName>::new();
|
||||
let evt_send = self.evt_send.clone();
|
||||
|
||||
// TODO all of this is rather ugly
|
||||
let script_var_processes = self.tail_process_handles.clone();
|
||||
let sources = self.tail_sources.clone();
|
||||
let thread_handle = stoppable_thread::spawn(move |stopped| {
|
||||
while !stopped.get() {
|
||||
let result: Result<_> = try {
|
||||
{
|
||||
let _ = sources
|
||||
.write()
|
||||
.unwrap()
|
||||
.wait_timeout(&mut events, std::time::Duration::from_millis(50));
|
||||
}
|
||||
for (var_name, event) in events.iter() {
|
||||
if event.readable {
|
||||
let mut handle = script_var_processes
|
||||
.get_mut(var_name)
|
||||
.with_context(|| format!("No command output handle found for variable '{}'", var_name))?;
|
||||
let mut buffer = String::new();
|
||||
handle.stdout_reader.read_line(&mut buffer)?;
|
||||
evt_send.send(EwwCommand::UpdateVars(vec![(
|
||||
var_name.to_owned(),
|
||||
PrimitiveValue::from_string(buffer.trim_matches('\n').to_owned()),
|
||||
)]))?;
|
||||
} else if event.hangup {
|
||||
script_var_processes.remove(var_name);
|
||||
sources.write().unwrap().unregister(var_name);
|
||||
}
|
||||
}
|
||||
};
|
||||
crate::print_result_err!("in script-var tail handler thread", &result);
|
||||
tokio::spawn(async move {
|
||||
let mut handle = tokio::process::Command::new("sh")
|
||||
.args(&["-c", &var.command])
|
||||
.stdout(std::process::Stdio::piped())
|
||||
.stderr(std::process::Stdio::inherit())
|
||||
.stdin(std::process::Stdio::null())
|
||||
.spawn()
|
||||
.unwrap();
|
||||
let mut stdout_lines = BufReader::new(handle.stdout.take().unwrap()).lines();
|
||||
crate::loop_select! {
|
||||
_ = handle.wait() => break,
|
||||
_ = cancellation_token.cancelled() => break,
|
||||
line = stdout_lines.next_line() => match line {
|
||||
Ok(Some(line)) => {
|
||||
let new_value = PrimitiveValue::from_string(line.to_owned());
|
||||
evt_send.send(EwwCommand::UpdateVars(vec![(var.name.to_owned(), new_value)])).unwrap();
|
||||
},
|
||||
Ok(None) => break,
|
||||
Err(_e) => break,
|
||||
}
|
||||
}
|
||||
for process in script_var_processes.iter() {
|
||||
crate::print_result_err!("While killing tail-var process at the end of tail task", &process.kill());
|
||||
}
|
||||
script_var_processes.clear();
|
||||
});
|
||||
self.tail_handler_thread = Some(thread_handle);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn start(&mut self, var: &config::TailScriptVar) {
|
||||
match ScriptVarProcess::run(&var.command) {
|
||||
Ok(process) => {
|
||||
self.tail_sources.write().unwrap().register(
|
||||
var.name.clone(),
|
||||
process.stdout_reader.get_ref(),
|
||||
popol::interest::READ,
|
||||
);
|
||||
self.tail_process_handles.insert(var.name.clone(), process);
|
||||
}
|
||||
Err(err) => eprintln!("Failed to launch script-var command for tail: {:?}", err),
|
||||
}
|
||||
}
|
||||
|
||||
fn stop_for_variable(&mut self, name: &VarName) -> Result<()> {
|
||||
if let Some((_, process)) = self.tail_process_handles.remove(name) {
|
||||
if let Some(token) = self.tail_process_handles.remove(name) {
|
||||
log::debug!("stopped tail var {}", name);
|
||||
process.kill()?;
|
||||
token.cancel();
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn stop_all(&mut self) {
|
||||
self.tail_handler_thread.take().map(|handle| handle.stop());
|
||||
self.tail_process_handles.drain().for_each(|(_, token)| token.cancel());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
use crate::{app, config, eww_state::*, opts, script_var_handler, try_logging_errors, util};
|
||||
use anyhow::*;
|
||||
use futures_util::StreamExt;
|
||||
use std::{collections::HashMap, os::unix::io::AsRawFd, path::Path};
|
||||
use std::{collections::HashMap, os::unix::io::AsRawFd, path::Path, time::Duration};
|
||||
use tokio::{
|
||||
io::{AsyncReadExt, AsyncWriteExt},
|
||||
sync::mpsc::*,
|
||||
|
@ -16,7 +16,7 @@ pub fn initialize_server(should_detach: bool, action: opts::ActionWithServer) ->
|
|||
|
||||
simple_signal::set_handler(&[simple_signal::Signal::Int, simple_signal::Signal::Term], |_| {
|
||||
println!("Shutting down eww daemon...");
|
||||
script_var_handler::script_var_process::on_application_death();
|
||||
// script_var_handler::script_var_process::on_application_death();
|
||||
std::process::exit(0);
|
||||
});
|
||||
|
||||
|
@ -35,7 +35,7 @@ pub fn initialize_server(should_detach: bool, action: opts::ActionWithServer) ->
|
|||
let glib_context = glib::MainContext::default();
|
||||
|
||||
log::info!("Initializing script var handler");
|
||||
let script_var_handler = script_var_handler::ScriptVarHandler::new(ui_send.clone())?;
|
||||
let script_var_handler = script_var_handler::init(ui_send.clone())?;
|
||||
|
||||
let mut app = app::App {
|
||||
eww_state: EwwState::from_default_vars(eww_config.generate_initial_state()?),
|
||||
|
@ -59,14 +59,9 @@ pub fn initialize_server(should_detach: bool, action: opts::ActionWithServer) ->
|
|||
let (command, maybe_response_recv) = action.into_eww_command();
|
||||
app.handle_command(command);
|
||||
|
||||
// print out the response of this initial command, if there is any
|
||||
if let Some(response_recv) = maybe_response_recv {
|
||||
if let Ok(response) = response_recv.recv_timeout(std::time::Duration::from_millis(100)) {
|
||||
println!("{}", response);
|
||||
}
|
||||
}
|
||||
std::thread::spawn(move || {
|
||||
let rt = tokio::runtime::Builder::new_multi_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("Failed to initialize tokio runtime");
|
||||
rt.block_on(async {
|
||||
|
@ -76,11 +71,20 @@ pub fn initialize_server(should_detach: bool, action: opts::ActionWithServer) ->
|
|||
};
|
||||
let ipc_server_join_handle = tokio::spawn(async move { async_server(ui_send.clone()).await });
|
||||
|
||||
tokio::spawn(async {
|
||||
// print out the response of this initial command, if there is any
|
||||
if let Some(mut response_recv) = maybe_response_recv {
|
||||
if let Ok(Some(response)) = tokio::time::timeout(Duration::from_millis(100), response_recv.recv()).await {
|
||||
println!("{}", response);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let result = tokio::try_join!(filewatch_join_handle, ipc_server_join_handle);
|
||||
if let Err(e) = result {
|
||||
eprintln!("Eww exiting with error: {:?}", e);
|
||||
}
|
||||
});
|
||||
})
|
||||
});
|
||||
|
||||
glib_context.spawn_local(async move {
|
||||
|
@ -122,12 +126,14 @@ async fn handle_connection(mut stream: tokio::net::UnixStream, evt_send: Unbound
|
|||
|
||||
evt_send.send(command)?;
|
||||
|
||||
if let Some(response_recv) = maybe_response_recv {
|
||||
if let Ok(response) = response_recv.recv_timeout(std::time::Duration::from_millis(100)) {
|
||||
if let Some(mut response_recv) = maybe_response_recv {
|
||||
log::info!("Waiting for response for IPC client");
|
||||
if let Ok(Some(response)) = tokio::time::timeout(Duration::from_millis(100), response_recv.recv()).await {
|
||||
let result = &stream_write.write_all(response.as_bytes()).await;
|
||||
crate::print_result_err!("Sending text response to ipc client", &result);
|
||||
}
|
||||
}
|
||||
stream_write.shutdown().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
11
src/util.rs
11
src/util.rs
|
@ -39,6 +39,17 @@ macro_rules! print_result_err {
|
|||
}};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! loop_select {
|
||||
($($body:tt)*) => {
|
||||
loop {
|
||||
::tokio::select! {
|
||||
$($body)*
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// read an scss file, replace all environment variable references within it and
|
||||
/// then parse it into css.
|
||||
pub fn parse_scss_from_file<P: AsRef<Path>>(path: P) -> Result<String> {
|
||||
|
|
Loading…
Add table
Reference in a new issue