From a2f9ad2b835f252bc849e9a1d78145cc7b92eb8a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gerg=C5=91=20S=C3=A1lyi?= Date: Sun, 13 Apr 2025 22:42:49 +0200 Subject: [PATCH] Change main event loop to use poll instead of the mio dependency --- Cargo.lock | 36 ++--------- Cargo.toml | 3 +- src/compositors.rs | 16 ++--- src/main.rs | 141 ++++++++++++++++++++---------------------- src/poll.rs | 149 +++++++++++++++++++++++++++++++++++++++++++++ src/signal.rs | 21 +++---- 6 files changed, 237 insertions(+), 129 deletions(-) create mode 100644 src/poll.rs diff --git a/Cargo.lock b/Cargo.lock index 1e52215..d38bd36 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -65,7 +65,7 @@ version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "79947af37f4177cfead1110013d678905c37501914fba0efea834c3fe9a8d60c" dependencies = [ - "windows-sys 0.59.0", + "windows-sys", ] [[package]] @@ -76,7 +76,7 @@ checksum = "ca3534e77181a9cc07539ad51f2141fe32f6c3ffd4df76db8ad92346b003ae4e" dependencies = [ "anstyle", "once_cell", - "windows-sys 0.59.0", + "windows-sys", ] [[package]] @@ -357,7 +357,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "976dd42dc7e85965fe702eb8164f21f450704bdde31faefd6471dba214cb594e" dependencies = [ "libc", - "windows-sys 0.59.0", + "windows-sys", ] [[package]] @@ -595,18 +595,6 @@ dependencies = [ "simd-adler32", ] -[[package]] -name = "mio" -version = "1.0.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd" -dependencies = [ - "libc", - "log", - "wasi", - "windows-sys 0.52.0", -] - [[package]] name = "mp4parse" version = "0.17.0" @@ -632,7 +620,6 @@ dependencies = [ "image", "libc", "log", - "mio", "niri-ipc", "rustix", "serde", @@ -832,7 +819,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.59.0", + "windows-sys", ] [[package]] @@ -1076,12 +1063,6 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" -[[package]] -name = "wasi" -version = "0.11.0+wasi-snapshot-preview1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" - [[package]] name = "wayland-backend" version = "0.3.8" @@ -1180,15 +1161,6 @@ version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "53a85b86a771b1c87058196170769dd264f66c0782acf1ae6cc51bfd64b39082" -[[package]] -name = "windows-sys" -version = "0.52.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" -dependencies = [ - "windows-targets", -] - [[package]] name = "windows-sys" version = "0.59.0" diff --git a/Cargo.toml b/Cargo.toml index 0981da3..372d2e9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,8 +20,7 @@ env_logger = "0.11.3" fast_image_resize = "5.0.0" libc = "0.2.171" log = "0.4.21" -mio = { version = "1.0.2", features = ["os-ext", "os-poll"] } -rustix = "0.38.44" +rustix = {version = "0.38.44", features = ["event", "pipe"] } serde = { version = "1.0.219", features = ["derive"] } serde_json = "1.0.140" swayipc = "3.0.2" diff --git a/src/compositors.rs b/src/compositors.rs index 3eeafa5..3181d45 100644 --- a/src/compositors.rs +++ b/src/compositors.rs @@ -2,15 +2,17 @@ mod hyprland; mod niri; mod sway; -use std::{env, os::unix::ffi::OsStrExt}; - -use log::{debug, warn}; -use mio::Waker; use std::{ + env, + os::unix::ffi::OsStrExt, sync::{mpsc::Sender, Arc}, thread::spawn, }; +use log::{debug, warn}; + +use crate::poll::Waker; + #[derive(Clone, Copy, Debug, clap::ValueEnum)] pub enum Compositor { Hyprland, @@ -88,7 +90,7 @@ impl EventSender { fn send(&self, workspace: WorkspaceVisible) { self.tx.send(workspace).unwrap(); - self.waker.wake().unwrap(); + self.waker.wake(); } } @@ -157,7 +159,7 @@ impl ConnectionTask { }) .unwrap(); - self.waker.wake().unwrap(); + self.waker.wake(); } } @@ -170,7 +172,7 @@ impl ConnectionTask { }) .unwrap(); - self.waker.wake().unwrap(); + self.waker.wake(); } } } diff --git a/src/main.rs b/src/main.rs index ddab02a..4e75ca3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,12 +1,13 @@ mod compositors; mod cli; mod image; +mod poll; mod signal; mod wayland; use std::{ io, - os::fd::AsRawFd, + os::fd::AsFd, path::{Path, PathBuf}, sync::{ Arc, @@ -15,10 +16,10 @@ use std::{ }; use clap::Parser; -use log::{debug, error, info}; -use mio::{ - Events, Interest, Poll, Token, Waker, - unix::SourceFd, +use log::{debug, error, info, warn}; +use rustix::{ + event::{poll, PollFd, PollFlags}, + io::retry_on_intr, }; use smithay_client_toolkit::{ compositor::CompositorState, @@ -39,6 +40,7 @@ use smithay_client_toolkit::reexports::protocols use crate::{ cli::{Cli, PixelFormat}, compositors::{Compositor, ConnectionTask, WorkspaceVisible}, + poll::{Poll, Waker}, signal::SignalPipe, wayland::BackgroundLayer, }; @@ -120,9 +122,8 @@ fn run() -> anyhow::Result<()> { .bind_one(&qh, 1..=1, ()).expect("wp_viewporter not available"); // Sync tools for sway ipc tasks - let mut poll = Poll::new().unwrap(); - let waker = Arc::new(Waker::new(poll.registry(), SWAY).unwrap()); let (tx, rx) = channel(); + let waker = Arc::new(Waker::new().unwrap()); let compositor = cli.compositor .or_else(Compositor::from_env) @@ -156,60 +157,31 @@ fn run() -> anyhow::Result<()> { // Main event loop // ******************************** - let mut events = Events::with_capacity(16); - - const WAYLAND: Token = Token(0); - let read_guard = event_queue.prepare_read().unwrap(); - let wayland_socket_fd = read_guard.connection_fd().as_raw_fd(); - poll.registry().register( - &mut SourceFd(&wayland_socket_fd), - WAYLAND, - Interest::READABLE - ).unwrap(); - drop(read_guard); - - const SWAY: Token = Token(1); - ConnectionTask::spawn_subscribe_event_loop(compositor, tx, waker); - - const SIGNAL: Token = Token(2); - let signal_pipe = match SignalPipe::new() { - Ok(signal_pipe) => { - poll.registry().register( - &mut SourceFd(&signal_pipe.as_raw_fd()), - SIGNAL, - Interest::READABLE - ).unwrap(); - Some(signal_pipe) - }, - Err(e) => { - error!("Failed to set up signal handling: {e}"); - None - } - }; + let mut poll = Poll::with_capacity(3); + let token_wayland = poll.add_readable(&conn); + ConnectionTask::spawn_subscribe_event_loop(compositor, tx, waker.clone()); + let token_compositor = poll.add_readable(&waker); + let signal_pipe = SignalPipe::new() + .map_err(|e| error!("Failed to set up signal handling: {e}")) + .ok(); + let token_signal = signal_pipe.as_ref().map(|pipe| poll.add_readable(pipe)); loop { - event_queue.flush().unwrap(); - event_queue.dispatch_pending(&mut state).unwrap(); - let mut read_guard_option = Some(event_queue.prepare_read().unwrap()); - - if let Err(poll_error) = poll.poll(&mut events, None) { - if poll_error.kind() == io::ErrorKind::Interrupted { - continue; - } - else { - panic!("Main event loop poll failed: {:?}", poll_error); - } + flush_blocking(&event_queue); + let read_guard = ensure_prepare_read(&mut state, &mut event_queue); + poll.poll().expect("Main event loop poll failed"); + if poll.ready(token_wayland) { + handle_wayland_event(&mut state, &mut event_queue, read_guard); + } else { + drop(read_guard); } - - for event in events.iter() { - match event.token() { - WAYLAND => handle_wayland_event( - &mut state, - &mut read_guard_option, - &mut event_queue - ), - SWAY => handle_sway_event(&mut state, &rx), - SIGNAL => match signal_pipe.as_ref().unwrap().read() { + if poll.ready(token_compositor) { + waker.read(); + handle_sway_event(&mut state, &rx); + } + if let Some(token_signal) = token_signal { + if poll.ready(token_signal) { + match signal_pipe.as_ref().unwrap().read() { Err(e) => error!("Failed to read the signal pipe: {e}"), Ok(signal_flags) => { if let Some(signal) = signal_flags.any_termination() { @@ -222,33 +194,50 @@ fn run() -> anyhow::Result<()> { reserved for future functionality"); } }, - }, - _ => unreachable!() + } } } } } +fn flush_blocking(event_queue: &EventQueue) { + loop { + let result = event_queue.flush(); + if result.is_ok() { return } + if let Err(WaylandError::Io(io_error)) = &result { + if io_error.kind() == io::ErrorKind::WouldBlock { + warn!("Wayland flush needs to block"); + let mut poll_fds = [PollFd::from_borrowed_fd( + event_queue.as_fd(), + PollFlags::OUT, + )]; + retry_on_intr(|| poll(&mut poll_fds, -1)).unwrap(); + continue + } + } + result.expect("Failed to flush Wayland event queue"); + } +} + +fn ensure_prepare_read( + state: &mut State, + event_queue: &mut EventQueue +) -> ReadEventsGuard { + loop { + if let Some(guard) = event_queue.prepare_read() { return guard } + event_queue.dispatch_pending(state) + .expect("Failed to dispatch pending Wayland events"); + } +} + fn handle_wayland_event( state: &mut State, - read_guard_option: &mut Option, event_queue: &mut EventQueue, + read_guard: ReadEventsGuard, ) { - if let Some(read_guard) = read_guard_option.take() { - if let Err(e) = read_guard.read() { - // WouldBlock is normal here because of epoll false wakeups - if let WaylandError::Io(ref io_err) = e { - if io_err.kind() == io::ErrorKind::WouldBlock { - return; - } - } - panic!("Failed to read Wayland events: {}", e) - } - - if let Err(e) = event_queue.dispatch_pending(state) { - panic!("Failed to dispatch pending Wayland events: {}", e); - } - } + read_guard.read().expect("Failed to read Wayland events"); + event_queue.dispatch_pending(state) + .expect("Failed to dispatch pending Wayland events"); } fn handle_sway_event( diff --git a/src/poll.rs b/src/poll.rs new file mode 100644 index 0000000..47fcc80 --- /dev/null +++ b/src/poll.rs @@ -0,0 +1,149 @@ +use std::{ + io, + marker::PhantomData, + mem::MaybeUninit, + os::fd::{BorrowedFd, OwnedFd}, +}; + +use rustix::{ + event::{PollFd, PollFlags, poll}, + fd::AsFd, + fs::{fcntl_setfl, OFlags}, + io::{Errno, fcntl_setfd, FdFlags, read_uninit, retry_on_intr, write}, + pipe::pipe, +}; + +pub struct Poll<'fd> { + poll_fds: Vec>, +} + +impl<'fd> Poll<'fd> { + pub fn with_capacity(capacity: usize) -> Self { + Poll { poll_fds: Vec::with_capacity(capacity) } + } + + pub fn add_readable(&mut self, fd: &'fd impl AsFd) -> Token<'fd> { + let index = self.poll_fds.len(); + self.poll_fds.push(PollFd::new(fd, PollFlags::IN)); + Token { index, marker: PhantomData } + } + + pub fn poll(&mut self) -> io::Result<()> { + let events_count = retry_on_intr(|| poll(&mut self.poll_fds, -1))?; + assert_ne!(events_count, 0); + Ok(()) + } + + pub fn ready(&mut self, token: Token) -> bool { + let revents = self.poll_fds[token.index].revents(); + assert!(!revents.intersects(PollFlags::NVAL)); + !revents.is_empty() + } +} + +#[derive(Clone, Copy)] +pub struct Token<'a> { + index: usize, + marker: PhantomData> +} + +pub enum Waker { + Eventfd { fd: OwnedFd }, + Pipe { read_half: OwnedFd, write_half: OwnedFd }, +} + +impl Waker { + pub fn new() -> io::Result { + #[cfg(any( + target_os = "linux", + target_os = "android", + target_os = "freebsd", + target_os = "illumos", + ))] { + use rustix::event::{EventfdFlags, eventfd}; + if let Ok(fd) = eventfd( + 0, + EventfdFlags::CLOEXEC | EventfdFlags::NONBLOCK + ) { + return Ok(Waker::Eventfd { fd }); + } + } + let (read_half, write_half) = pipe_cloexec_nonblock()?; + Ok(Waker::Pipe { read_half, write_half }) + } + + pub fn wake(&self) { + match self { + Waker::Eventfd { fd } => assert_ok_or_wouldblock( + write(fd, &1u64.to_ne_bytes()) + ), + Waker::Pipe { write_half, .. } => assert_ok_or_wouldblock( + write(write_half, &[0u8]) + ), + } + } + + pub fn read(&self) { + match self { + Waker::Eventfd { fd } => assert_ok_or_wouldblock( + read_uninit(fd, &mut [MaybeUninit::::uninit(); 8]) + ), + Waker::Pipe { read_half, .. } => assert_ok_or_wouldblock( + clear_pipe(read_half) + ), + } + } +} + +impl AsFd for Waker { + fn as_fd(&self) -> BorrowedFd { + match self { + Waker::Eventfd { fd } => fd.as_fd(), + Waker::Pipe { read_half, .. } => read_half.as_fd(), + } + } +} + +pub fn pipe_cloexec_nonblock() -> io::Result<(OwnedFd, OwnedFd)> { + #[cfg(any( + target_os = "linux", + target_os = "android", + target_os = "freebsd", + target_os = "netbsd", + target_os = "openbsd", + target_os = "dragonfly", + target_os = "illumos", + target_os = "redox", + ))] { + use rustix::pipe::{PipeFlags, pipe_with}; + if let Ok(ret) = pipe_with(PipeFlags::CLOEXEC | PipeFlags::NONBLOCK) { + return Ok(ret) + } + } + let (read_half, write_half) = pipe()?; + fcntl_setfd(&read_half, FdFlags::CLOEXEC)?; + fcntl_setfd(&write_half, FdFlags::CLOEXEC)?; + fcntl_setfl(&read_half, OFlags::NONBLOCK)?; + fcntl_setfl(&write_half, OFlags::NONBLOCK)?; + Ok((read_half, write_half)) +} + +fn clear_pipe(read_half: impl AsFd) -> Result<(), Errno> { + const LEN: usize = 256; + let mut buf = [MaybeUninit::::uninit(); LEN]; + loop { + match read_uninit(&read_half, &mut buf) { + Ok((slice, _)) => if slice.len() < LEN { return Ok(()) }, + Err(e) => return Err(e), + } + } +} + +#[track_caller] +fn assert_ok_or_wouldblock(result: Result) { + match result { + #[allow(unreachable_patterns)] + Ok(_) | Err(Errno::AGAIN) | Err(Errno::WOULDBLOCK) => (), + Err(e) => panic!("{e}"), + } +} diff --git a/src/signal.rs b/src/signal.rs index 8daff26..b6c2d3c 100644 --- a/src/signal.rs +++ b/src/signal.rs @@ -2,7 +2,7 @@ use std::{ ffi::c_int, io, mem::{ManuallyDrop, MaybeUninit}, - os::fd::{AsRawFd, FromRawFd, OwnedFd, RawFd}, + os::fd::{AsRawFd, BorrowedFd, FromRawFd, OwnedFd}, ptr, sync::atomic::{AtomicI32, Ordering::Relaxed}, }; @@ -13,11 +13,12 @@ use libc::{ sigaction, sigemptyset, signal, sigset_t, write, }; use rustix::{ - fs::{fcntl_setfl, OFlags}, - io::{fcntl_setfd, FdFlags, read_uninit}, - pipe::pipe, + fd::AsFd, + io::read_uninit, }; +use crate::poll::pipe_cloexec_nonblock; + const TERM_SIGNALS: [c_int; 3] = [SIGHUP, SIGINT, SIGTERM]; const OTHER_SIGNALS: [c_int; 2] = [SIGUSR1, SIGUSR2]; @@ -36,11 +37,7 @@ pub struct SignalPipe { impl SignalPipe { pub fn new() -> io::Result { unsafe { - let (read_half, write_half) = pipe()?; - fcntl_setfd(&read_half, FdFlags::CLOEXEC)?; - fcntl_setfd(&write_half, FdFlags::CLOEXEC)?; - fcntl_setfl(&read_half, OFlags::NONBLOCK)?; - fcntl_setfl(&write_half, OFlags::NONBLOCK)?; + let (read_half, write_half) = pipe_cloexec_nonblock()?; PIPE_FD.compare_exchange( -1, write_half.as_raw_fd(), @@ -95,9 +92,9 @@ impl Drop for SignalPipe { } } -impl AsRawFd for SignalPipe { - fn as_raw_fd(&self) -> RawFd { - self.read_half.as_raw_fd() +impl AsFd for SignalPipe { + fn as_fd(&self) -> BorrowedFd { + self.read_half.as_fd() } }