fix(plugins): synchronize event batch handling (#3367)

* fix(plugins): synchronize event batch handling

* style(fmt): rustfmt

* fix(tests): graceful shutdown for async tasks
This commit is contained in:
Aram Drevekenin 2024-05-22 11:09:10 +02:00 committed by GitHub
parent 64a5ac095c
commit fda5ab1830
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -563,38 +563,38 @@ impl WasmBridge {
.contains_key(&plugin_id)
})
.collect();
task::spawn({
let mut updates = updates.clone();
let senders = self.senders.clone();
let s = shutdown_sender.clone();
async move {
let _s = s;
for (pid, cid, event) in updates.drain(..) {
for (plugin_id, client_id, running_plugin, subscriptions) in &plugins_to_update {
for (plugin_id, client_id, running_plugin, subscriptions) in &plugins_to_update
{
let subs = subscriptions.lock().unwrap().clone();
// FIXME: This is very janky... Maybe I should write my own macro for Event -> EventType?
let event_type =
EventType::from_str(&event.to_string()).with_context(err_context)?;
if (subs.contains(&event_type) || event_type == EventType::PermissionRequestResult)
&& Self::message_is_directed_at_plugin(pid, cid, plugin_id, client_id)
if let Ok(event_type) = EventType::from_str(&event.to_string()) {
if (subs.contains(&event_type)
|| event_type == EventType::PermissionRequestResult)
&& Self::message_is_directed_at_plugin(
pid, cid, plugin_id, client_id,
)
{
task::spawn({
let senders = self.senders.clone();
let running_plugin = running_plugin.clone();
let event = event.clone();
let plugin_id = *plugin_id;
let client_id = *client_id;
let _s = shutdown_sender.clone();
async move {
let mut running_plugin = running_plugin.lock().unwrap();
let mut plugin_render_assets = vec![];
let _s = _s; // guard to allow the task to complete before cleanup/shutdown
match apply_event_to_plugin(
plugin_id,
client_id,
*plugin_id,
*client_id,
&mut running_plugin,
&event,
&mut plugin_render_assets,
senders.clone(),
) {
Ok(()) => {
let _ = senders.send_to_screen(ScreenInstruction::PluginBytes(
plugin_render_assets,
));
let _ = senders.send_to_screen(
ScreenInstruction::PluginBytes(plugin_render_assets),
);
},
Err(e) => {
log::error!("{:?}", e);
@ -604,16 +604,21 @@ impl WasmBridge {
format!("{:?}", e).replace("\n", "\n\r");
handle_plugin_crash(
plugin_id,
*plugin_id,
stringified_error,
senders.clone(),
);
},
}
}
}
}
}
}
});
}
}
// loop once more to update the cached events for the pending plugins (probably currently
// being loaded, we'll send them these events when they load)
for (pid, _cid, event) in updates.drain(..) {
for (plugin_id, cached_events) in self.cached_events_for_pending_plugins.iter_mut() {
if pid.is_none() || pid.as_ref() == Some(plugin_id) {
cached_events.push(EventOrPipeMessage::Event(event.clone()));