* xtask/run: Use varargs when run with `-data-dir`
  Previously any additional arguments passed on the command line were ignored. Now they are appended to `cargo run ...` as documented.
* server/os_i_o: Improve error message when IPC dies
  Display the last send/recv error to the user instead of a generic "Buffer full" message.
* server/lib: Log error in `send_to_client!`
  This way we know when an error occurred while trying to send a message to the client. The most likely cause is that the client buffer filled up, so no new messages can be sent. We still disconnect the client as before, but now we also write a log message that explains the situation.
* utils/channel: Apply rustfmt
* server/lib: Detect when client responds too slowly
  Log a message before disconnecting it.
* server/os_i_o: Add retry queue to client senders
  The queue is allocated on demand and stores `ServerToClientMsg` while the regular IPC channel is full. It acts as a dynamic buffer that holds messages until the client hopefully catches up. Also write a message to the log when the client is recognized to be too slow in handling server messages.
* server: apply rustfmt
* utils/ipc: Add session name to "Disconnect" error
* utils/ipc: Fix error message indent
* server/os_i_o: Undo IPC channel extension via `Vec`
  Drastically increase the IPC message queue size instead. Measurements didn't show a drastic increase in RAM usage caused by this, and it is a much easier fix for the problem at hand.
* CHANGELOG: Add PR #2068
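The "client too slow" handling described above can be sketched as follows. This is a minimal illustration, not the server's actual code: it uses `std::sync::mpsc::sync_channel` as a stand-in for the IPC message queue, and the names `Delivery` and `try_deliver` as well as the tiny queue size are hypothetical.

```rust
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};

/// Hypothetical outcome of trying to hand one message to a client.
#[derive(Debug, PartialEq)]
enum Delivery {
    Sent,
    ClientTooSlow,
    ClientGone,
}

/// Tries to enqueue `msg` without blocking. A full queue is treated as
/// "the client is too slow"; the caller is expected to disconnect it.
fn try_deliver(sender: &SyncSender<String>, msg: String) -> Delivery {
    match sender.try_send(msg) {
        Ok(()) => Delivery::Sent,
        Err(TrySendError::Full(_)) => {
            // Log the reason instead of failing silently.
            eprintln!("client too slow: message queue is full, disconnecting");
            Delivery::ClientTooSlow
        },
        Err(TrySendError::Disconnected(_)) => {
            eprintln!("client went away before the message could be sent");
            Delivery::ClientGone
        },
    }
}

fn main() {
    // A deliberately tiny queue so it fills up after two messages.
    let (tx, rx) = sync_channel::<String>(2);

    assert_eq!(try_deliver(&tx, "a".to_string()), Delivery::Sent);
    assert_eq!(try_deliver(&tx, "b".to_string()), Delivery::Sent);
    // Nobody has read from `rx` yet, so the third message does not fit.
    assert_eq!(try_deliver(&tx, "c".to_string()), Delivery::ClientTooSlow);

    // Once the client drains a message and then drops its end, sends fail.
    assert_eq!(rx.recv().unwrap(), "a");
    drop(rx);
    assert_eq!(try_deliver(&tx, "d".to_string()), Delivery::ClientGone);
}
```

A larger queue capacity makes the `ClientTooSlow` branch rarer at the cost of memory, which is the trade-off the last `server/os_i_o` entry refers to.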
44 lines
1.6 KiB
Rust
//! Definitions and helpers for sending and receiving messages between threads.

use async_std::task_local;
use std::cell::RefCell;

use crate::errors::{get_current_ctx, ErrorContext};
pub use crossbeam::channel::{
    bounded, unbounded, Receiver, RecvError, Select, SendError, Sender, TrySendError,
};

/// A [crossbeam](crossbeam::channel) multi-producer multi-consumer channel with added
/// error context.
pub type ChannelWithContext<T> = (Sender<(T, ErrorContext)>, Receiver<(T, ErrorContext)>);

/// Sends messages on a [crossbeam](crossbeam::channel) channel, along with the
/// [`ErrorContext`] that was current when the message was sent.
#[derive(Clone)]
pub struct SenderWithContext<T> {
    sender: Sender<(T, ErrorContext)>,
}

impl<T: Clone> SenderWithContext<T> {
    /// Wraps an existing channel [`Sender`].
    pub fn new(sender: Sender<(T, ErrorContext)>) -> Self {
        Self { sender }
    }

    /// Sends an event, along with the current [`ErrorContext`], on this
    /// [`SenderWithContext`]'s channel.
    pub fn send(&self, event: T) -> Result<(), SendError<(T, ErrorContext)>> {
        let err_ctx = get_current_ctx();
        self.sender.send((event, err_ctx))
    }
}

thread_local!(
    /// A key to some thread local storage (TLS) that holds a representation of the thread's call
    /// stack in the form of an [`ErrorContext`].
    pub static OPENCALLS: RefCell<ErrorContext> = RefCell::default()
);

task_local! {
    /// A key to some task local storage that holds a representation of the task's call
    /// stack in the form of an [`ErrorContext`].
    pub static ASYNCOPENCALLS: RefCell<ErrorContext> = RefCell::default()
}
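
The listing above depends on `crate::errors` and on crossbeam, so it cannot be run in isolation. The following self-contained sketch shows the same pattern using only the standard library. It rests on several assumptions: `ErrorContext` is replaced by a simplified stand-in that records call names, `get_current_ctx` is assumed to clone the thread-local context, and `std::sync::mpsc` is swapped in for crossbeam.

```rust
use std::cell::RefCell;
use std::sync::mpsc::{channel, SendError, Sender};
use std::thread;

/// Simplified stand-in for the crate's `ErrorContext` (assumption).
#[derive(Clone, Debug, Default, PartialEq)]
struct ErrorContext {
    calls: Vec<String>,
}

thread_local! {
    /// Per-thread context, playing the role of `OPENCALLS`.
    static OPENCALLS: RefCell<ErrorContext> = RefCell::default();
}

/// Stand-in for `get_current_ctx` (assumption): clones this thread's context.
fn get_current_ctx() -> ErrorContext {
    OPENCALLS.with(|ctx| ctx.borrow().clone())
}

/// Same shape as the `SenderWithContext` above, built on `std::sync::mpsc`.
#[derive(Clone)]
struct SenderWithContext<T> {
    sender: Sender<(T, ErrorContext)>,
}

impl<T> SenderWithContext<T> {
    fn new(sender: Sender<(T, ErrorContext)>) -> Self {
        Self { sender }
    }

    /// Attaches the sending thread's context to every event.
    fn send(&self, event: T) -> Result<(), SendError<(T, ErrorContext)>> {
        self.sender.send((event, get_current_ctx()))
    }
}

fn main() {
    let (tx, rx) = channel::<(&'static str, ErrorContext)>();
    let sender = SenderWithContext::new(tx);

    let worker = thread::spawn(move || {
        // Record where we are before sending, as the real code would.
        OPENCALLS.with(|ctx| ctx.borrow_mut().calls.push("worker".to_string()));
        sender.send("hello").unwrap();
    });
    worker.join().unwrap();

    // The receiver gets the event together with the sender's context.
    let (event, ctx) = rx.recv().unwrap();
    assert_eq!(event, "hello");
    assert_eq!(ctx.calls, vec!["worker".to_string()]);
}
```

Because the context travels with each message, the receiving thread can report where a failing event originated without sharing any state with the sender.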