use better names for senders, receivers and threads

This commit is contained in:
Kunal Mohan 2021-04-30 20:40:32 +05:30
parent 913697b144
commit c6f93ba0d2
5 changed files with 91 additions and 64 deletions

View file

@ -105,7 +105,7 @@ pub fn start_client(mut os_input: Box<dyn ClientOsApi>, opts: CliArgs) {
let os_input = os_input.clone(); let os_input = os_input.clone();
move || { move || {
loop { loop {
let (instruction, mut err_ctx) = os_input.client_recv(); let (instruction, mut err_ctx) = os_input.recv_from_server();
err_ctx.add_call(ContextType::Client(ClientContext::from(&instruction))); err_ctx.add_call(ContextType::Client(ClientContext::from(&instruction)));
if let ClientInstruction::Exit = instruction { if let ClientInstruction::Exit = instruction {
break; break;

View file

@ -85,7 +85,6 @@ lazy_static! {
.unwrap_or_else(|| project_dir.cache_dir()); .unwrap_or_else(|| project_dir.cache_dir());
std::fs::create_dir_all(ipc_dir).unwrap(); std::fs::create_dir_all(ipc_dir).unwrap();
let session_name = names::Generator::default().next().unwrap(); let session_name = names::Generator::default().next().unwrap();
let x = ipc_dir.join(session_name); ipc_dir.join(session_name)
x
}; };
} }

View file

@ -186,8 +186,8 @@ impl<T: Serialize> IpcSenderWithContext<T> {
#[derive(Clone)] #[derive(Clone)]
pub struct ServerOsInputOutput { pub struct ServerOsInputOutput {
orig_termios: Arc<Mutex<termios::Termios>>, orig_termios: Arc<Mutex<termios::Termios>>,
recv_socket: Option<Arc<Mutex<io::BufReader<LocalSocketStream>>>>, receive_instructions_from_client: Option<Arc<Mutex<io::BufReader<LocalSocketStream>>>>,
sender_socket: Arc<Mutex<Option<IpcSenderWithContext<ClientInstruction>>>>, send_instructions_to_client: Arc<Mutex<Option<IpcSenderWithContext<ClientInstruction>>>>,
} }
/// The `ServerOsApi` trait represents an abstract interface to the features of an operating system that /// The `ServerOsApi` trait represents an abstract interface to the features of an operating system that
@ -211,7 +211,7 @@ pub trait ServerOsApi: Send + Sync {
/// Returns a [`Box`] pointer to this [`ServerOsApi`] struct. /// Returns a [`Box`] pointer to this [`ServerOsApi`] struct.
fn box_clone(&self) -> Box<dyn ServerOsApi>; fn box_clone(&self) -> Box<dyn ServerOsApi>;
/// Receives a message on server-side IPC channel /// Receives a message on server-side IPC channel
fn server_recv(&self) -> (ServerInstruction, ErrorContext); fn recv_from_client(&self) -> (ServerInstruction, ErrorContext);
/// Sends a message to client /// Sends a message to client
fn send_to_client(&self, msg: ClientInstruction); fn send_to_client(&self, msg: ClientInstruction);
/// Adds a sender to client /// Adds a sender to client
@ -251,11 +251,19 @@ impl ServerOsApi for ServerOsInputOutput {
waitpid(Pid::from_raw(pid), None).unwrap(); waitpid(Pid::from_raw(pid), None).unwrap();
Ok(()) Ok(())
} }
fn server_recv(&self) -> (ServerInstruction, ErrorContext) { fn recv_from_client(&self) -> (ServerInstruction, ErrorContext) {
bincode::deserialize_from(&mut *self.recv_socket.as_ref().unwrap().lock().unwrap()).unwrap() bincode::deserialize_from(
&mut *self
.receive_instructions_from_client
.as_ref()
.unwrap()
.lock()
.unwrap(),
)
.unwrap()
} }
fn send_to_client(&self, msg: ClientInstruction) { fn send_to_client(&self, msg: ClientInstruction) {
self.sender_socket self.send_instructions_to_client
.lock() .lock()
.unwrap() .unwrap()
.as_mut() .as_mut()
@ -264,9 +272,9 @@ impl ServerOsApi for ServerOsInputOutput {
.unwrap(); .unwrap();
} }
fn add_client_sender(&mut self) { fn add_client_sender(&mut self) {
assert!(self.sender_socket.lock().unwrap().is_none()); assert!(self.send_instructions_to_client.lock().unwrap().is_none());
let sock_fd = self let sock_fd = self
.recv_socket .receive_instructions_from_client
.as_ref() .as_ref()
.unwrap() .unwrap()
.lock() .lock()
@ -275,10 +283,12 @@ impl ServerOsApi for ServerOsInputOutput {
.as_raw_fd(); .as_raw_fd();
let dup_fd = unistd::dup(sock_fd).unwrap(); let dup_fd = unistd::dup(sock_fd).unwrap();
let dup_sock = unsafe { LocalSocketStream::from_raw_fd(dup_fd) }; let dup_sock = unsafe { LocalSocketStream::from_raw_fd(dup_fd) };
*self.sender_socket.lock().unwrap() = Some(IpcSenderWithContext::new(dup_sock)); *self.send_instructions_to_client.lock().unwrap() =
Some(IpcSenderWithContext::new(dup_sock));
} }
fn update_receiver(&mut self, stream: LocalSocketStream) { fn update_receiver(&mut self, stream: LocalSocketStream) {
self.recv_socket = Some(Arc::new(Mutex::new(io::BufReader::new(stream)))); self.receive_instructions_from_client =
Some(Arc::new(Mutex::new(io::BufReader::new(stream))));
} }
} }
@ -293,16 +303,16 @@ pub fn get_server_os_input() -> ServerOsInputOutput {
let orig_termios = Arc::new(Mutex::new(current_termios)); let orig_termios = Arc::new(Mutex::new(current_termios));
ServerOsInputOutput { ServerOsInputOutput {
orig_termios, orig_termios,
recv_socket: None, receive_instructions_from_client: None,
sender_socket: Arc::new(Mutex::new(None)), send_instructions_to_client: Arc::new(Mutex::new(None)),
} }
} }
#[derive(Clone)] #[derive(Clone)]
pub struct ClientOsInputOutput { pub struct ClientOsInputOutput {
orig_termios: Arc<Mutex<termios::Termios>>, orig_termios: Arc<Mutex<termios::Termios>>,
server_sender: Arc<Mutex<Option<IpcSenderWithContext<ServerInstruction>>>>, send_instructions_to_server: Arc<Mutex<Option<IpcSenderWithContext<ServerInstruction>>>>,
receiver: Arc<Mutex<Option<io::BufReader<LocalSocketStream>>>>, receive_instructions_from_server: Arc<Mutex<Option<io::BufReader<LocalSocketStream>>>>,
} }
/// The `ClientOsApi` trait represents an abstract interface to the features of an operating system that /// The `ClientOsApi` trait represents an abstract interface to the features of an operating system that
@ -326,7 +336,7 @@ pub trait ClientOsApi: Send + Sync {
fn send_to_server(&self, msg: ServerInstruction); fn send_to_server(&self, msg: ServerInstruction);
/// Receives a message on client-side IPC channel /// Receives a message on client-side IPC channel
// This should be called from the client-side router thread only. // This should be called from the client-side router thread only.
fn client_recv(&self) -> (ClientInstruction, ErrorContext); fn recv_from_server(&self) -> (ClientInstruction, ErrorContext);
fn receive_sigwinch(&self, cb: Box<dyn Fn()>); fn receive_sigwinch(&self, cb: Box<dyn Fn()>);
/// Establish a connection with the server socket. /// Establish a connection with the server socket.
fn connect_to_server(&self); fn connect_to_server(&self);
@ -360,7 +370,7 @@ impl ClientOsApi for ClientOsInputOutput {
Box::new(stdout) Box::new(stdout)
} }
fn send_to_server(&self, msg: ServerInstruction) { fn send_to_server(&self, msg: ServerInstruction) {
self.server_sender self.send_instructions_to_server
.lock() .lock()
.unwrap() .unwrap()
.as_mut() .as_mut()
@ -368,8 +378,16 @@ impl ClientOsApi for ClientOsInputOutput {
.send(msg) .send(msg)
.unwrap(); .unwrap();
} }
fn client_recv(&self) -> (ClientInstruction, ErrorContext) { fn recv_from_server(&self) -> (ClientInstruction, ErrorContext) {
bincode::deserialize_from(&mut self.receiver.lock().unwrap().as_mut().unwrap()).unwrap() bincode::deserialize_from(
&mut self
.receive_instructions_from_server
.lock()
.unwrap()
.as_mut()
.unwrap(),
)
.unwrap()
} }
fn receive_sigwinch(&self, cb: Box<dyn Fn()>) { fn receive_sigwinch(&self, cb: Box<dyn Fn()>) {
let mut signals = Signals::new(&[SIGWINCH, SIGTERM, SIGINT, SIGQUIT]).unwrap(); let mut signals = Signals::new(&[SIGWINCH, SIGTERM, SIGINT, SIGQUIT]).unwrap();
@ -397,8 +415,8 @@ impl ClientOsApi for ClientOsInputOutput {
let dup_fd = unistd::dup(sock_fd).unwrap(); let dup_fd = unistd::dup(sock_fd).unwrap();
let receiver = unsafe { LocalSocketStream::from_raw_fd(dup_fd) }; let receiver = unsafe { LocalSocketStream::from_raw_fd(dup_fd) };
let sender = IpcSenderWithContext::new(socket); let sender = IpcSenderWithContext::new(socket);
*self.server_sender.lock().unwrap() = Some(sender); *self.send_instructions_to_server.lock().unwrap() = Some(sender);
*self.receiver.lock().unwrap() = Some(io::BufReader::new(receiver)); *self.receive_instructions_from_server.lock().unwrap() = Some(io::BufReader::new(receiver));
} }
} }
@ -413,7 +431,7 @@ pub fn get_client_os_input() -> ClientOsInputOutput {
let orig_termios = Arc::new(Mutex::new(current_termios)); let orig_termios = Arc::new(Mutex::new(current_termios));
ClientOsInputOutput { ClientOsInputOutput {
orig_termios, orig_termios,
server_sender: Arc::new(Mutex::new(None)), send_instructions_to_server: Arc::new(Mutex::new(None)),
receiver: Arc::new(Mutex::new(None)), receive_instructions_from_server: Arc::new(Mutex::new(None)),
} }
} }

View file

@ -78,7 +78,9 @@ pub fn start_server(os_input: Box<dyn ServerOsApi>, opts: CliArgs) -> thread::Jo
send_server_instructions.clone(), send_server_instructions.clone(),
); );
#[cfg(not(test))] #[cfg(not(test))]
let _ = thread::Builder::new().name("listener".to_string()).spawn({ let _ = thread::Builder::new()
.name("server_listener".to_string())
.spawn({
let os_input = os_input.clone(); let os_input = os_input.clone();
let sessions = sessions.clone(); let sessions = sessions.clone();
let send_server_instructions = send_server_instructions.clone(); let send_server_instructions = send_server_instructions.clone();
@ -103,7 +105,7 @@ pub fn start_server(os_input: Box<dyn ServerOsApi>, opts: CliArgs) -> thread::Jo
}); });
thread::Builder::new() thread::Builder::new()
.name("ipc_server".to_string()) .name("server_thread".to_string())
.spawn({ .spawn({
move || loop { move || loop {
let (instruction, mut err_ctx) = receive_server_instructions.recv().unwrap(); let (instruction, mut err_ctx) = receive_server_instructions.recv().unwrap();
@ -151,9 +153,9 @@ fn handle_client(
send_server_instructions: SenderWithContext<ServerInstruction>, send_server_instructions: SenderWithContext<ServerInstruction>,
) { ) {
thread::Builder::new() thread::Builder::new()
.name("router".to_string()) .name("server_router".to_string())
.spawn(move || loop { .spawn(move || loop {
let (instruction, mut err_ctx) = os_input.server_recv(); let (instruction, mut err_ctx) = os_input.recv_from_client();
err_ctx.add_call(ContextType::IPCServer(ServerContext::from(&instruction))); err_ctx.add_call(ContextType::IPCServer(ServerContext::from(&instruction)));
let rlocked_sessions = sessions.read().unwrap(); let rlocked_sessions = sessions.read().unwrap();
match instruction { match instruction {

View file

@ -75,10 +75,10 @@ pub struct FakeInputOutput {
win_sizes: Arc<Mutex<HashMap<RawFd, PositionAndSize>>>, win_sizes: Arc<Mutex<HashMap<RawFd, PositionAndSize>>>,
possible_tty_inputs: HashMap<u16, Bytes>, possible_tty_inputs: HashMap<u16, Bytes>,
last_snapshot_time: Arc<Mutex<Instant>>, last_snapshot_time: Arc<Mutex<Instant>>,
client_sender: SenderWithContext<ClientInstruction>, send_instructions_to_client: SenderWithContext<ClientInstruction>,
client_receiver: Arc<Mutex<mpsc::Receiver<(ClientInstruction, ErrorContext)>>>, receive_instructions_from_server: Arc<Mutex<mpsc::Receiver<(ClientInstruction, ErrorContext)>>>,
server_sender: SenderWithContext<ServerInstruction>, send_instructions_to_server: SenderWithContext<ServerInstruction>,
server_receiver: Arc<Mutex<mpsc::Receiver<(ServerInstruction, ErrorContext)>>>, receive_instructions_from_client: Arc<Mutex<mpsc::Receiver<(ServerInstruction, ErrorContext)>>>,
should_trigger_sigwinch: Arc<(Mutex<bool>, Condvar)>, should_trigger_sigwinch: Arc<(Mutex<bool>, Condvar)>,
sigwinch_event: Option<PositionAndSize>, sigwinch_event: Option<PositionAndSize>,
} }
@ -90,10 +90,10 @@ impl FakeInputOutput {
let stdout_writer = FakeStdoutWriter::new(last_snapshot_time.clone()); let stdout_writer = FakeStdoutWriter::new(last_snapshot_time.clone());
let (client_sender, client_receiver): ChannelWithContext<ClientInstruction> = let (client_sender, client_receiver): ChannelWithContext<ClientInstruction> =
mpsc::channel(); mpsc::channel();
let client_sender = SenderWithContext::new(SenderType::Sender(client_sender)); let send_instructions_to_client = SenderWithContext::new(SenderType::Sender(client_sender));
let (server_sender, server_receiver): ChannelWithContext<ServerInstruction> = let (server_sender, server_receiver): ChannelWithContext<ServerInstruction> =
mpsc::channel(); mpsc::channel();
let server_sender = SenderWithContext::new(SenderType::Sender(server_sender)); let send_instructions_to_server = SenderWithContext::new(SenderType::Sender(server_sender));
win_sizes.insert(0, winsize); // 0 is the current terminal win_sizes.insert(0, winsize); // 0 is the current terminal
FakeInputOutput { FakeInputOutput {
@ -106,10 +106,10 @@ impl FakeInputOutput {
io_events: Arc::new(Mutex::new(vec![])), io_events: Arc::new(Mutex::new(vec![])),
win_sizes: Arc::new(Mutex::new(win_sizes)), win_sizes: Arc::new(Mutex::new(win_sizes)),
possible_tty_inputs: get_possible_tty_inputs(), possible_tty_inputs: get_possible_tty_inputs(),
server_receiver: Arc::new(Mutex::new(server_receiver)), receive_instructions_from_client: Arc::new(Mutex::new(server_receiver)),
server_sender, send_instructions_to_server,
client_receiver: Arc::new(Mutex::new(client_receiver)), receive_instructions_from_server: Arc::new(Mutex::new(client_receiver)),
client_sender, send_instructions_to_client,
should_trigger_sigwinch: Arc::new((Mutex::new(false), Condvar::new())), should_trigger_sigwinch: Arc::new((Mutex::new(false), Condvar::new())),
sigwinch_event: None, sigwinch_event: None,
} }
@ -195,10 +195,14 @@ impl ClientOsApi for FakeInputOutput {
Box::new(self.stdout_writer.clone()) Box::new(self.stdout_writer.clone())
} }
fn send_to_server(&self, msg: ServerInstruction) { fn send_to_server(&self, msg: ServerInstruction) {
self.server_sender.send(msg).unwrap(); self.send_instructions_to_server.send(msg).unwrap();
} }
fn client_recv(&self) -> (ClientInstruction, ErrorContext) { fn recv_from_server(&self) -> (ClientInstruction, ErrorContext) {
self.client_receiver.lock().unwrap().recv().unwrap() self.receive_instructions_from_server
.lock()
.unwrap()
.recv()
.unwrap()
} }
fn receive_sigwinch(&self, cb: Box<dyn Fn()>) { fn receive_sigwinch(&self, cb: Box<dyn Fn()>) {
if self.sigwinch_event.is_some() { if self.sigwinch_event.is_some() {
@ -273,11 +277,15 @@ impl ServerOsApi for FakeInputOutput {
self.io_events.lock().unwrap().push(IoEvent::Kill(fd)); self.io_events.lock().unwrap().push(IoEvent::Kill(fd));
Ok(()) Ok(())
} }
fn server_recv(&self) -> (ServerInstruction, ErrorContext) { fn recv_from_client(&self) -> (ServerInstruction, ErrorContext) {
self.server_receiver.lock().unwrap().recv().unwrap() self.receive_instructions_from_client
.lock()
.unwrap()
.recv()
.unwrap()
} }
fn send_to_client(&self, msg: ClientInstruction) { fn send_to_client(&self, msg: ClientInstruction) {
self.client_sender.send(msg).unwrap(); self.send_instructions_to_client.send(msg).unwrap();
} }
fn add_client_sender(&mut self) {} fn add_client_sender(&mut self) {}
fn update_receiver(&mut self, _stream: LocalSocketStream) {} fn update_receiver(&mut self, _stream: LocalSocketStream) {}