use helix_event::status::StatusMessage; use helix_event::{runtime_local, send_blocking}; use helix_view::Editor; use once_cell::sync::OnceCell; use crate::compositor::Compositor; use futures_util::future::{BoxFuture, Future, FutureExt}; use futures_util::stream::{FuturesUnordered, StreamExt}; use tokio::sync::mpsc::{channel, Receiver, Sender}; pub type EditorCompositorCallback = Box; pub type EditorCallback = Box; runtime_local! { static JOB_QUEUE: OnceCell> = OnceCell::new(); } pub async fn dispatch_callback(job: Callback) { let _ = JOB_QUEUE.wait().send(job).await; } pub async fn dispatch(job: impl FnOnce(&mut Editor, &mut Compositor) + Send + 'static) { let _ = JOB_QUEUE .wait() .send(Callback::EditorCompositor(Box::new(job))) .await; } pub fn dispatch_blocking(job: impl FnOnce(&mut Editor, &mut Compositor) + Send + 'static) { let jobs = JOB_QUEUE.wait(); send_blocking(jobs, Callback::EditorCompositor(Box::new(job))) } pub enum Callback { EditorCompositor(EditorCompositorCallback), Editor(EditorCallback), } pub type JobFuture = BoxFuture<'static, anyhow::Result>>; pub struct Job { pub future: BoxFuture<'static, anyhow::Result>>, /// Do we need to wait for this job to finish before exiting? pub wait: bool, } pub struct Jobs { /// jobs that need to complete before we exit. pub wait_futures: FuturesUnordered, pub callbacks: Receiver, pub status_messages: Receiver, } impl Job { pub fn new> + Send + 'static>(f: F) -> Self { Self { future: f.map(|r| r.map(|()| None)).boxed(), wait: false, } } pub fn with_callback> + Send + 'static>( f: F, ) -> Self { Self { future: f.map(|r| r.map(Some)).boxed(), wait: false, } } pub fn wait_before_exiting(mut self) -> Self { self.wait = true; self } } impl Jobs { #[allow(clippy::new_without_default)] pub fn new() -> Self { let (tx, rx) = channel(1024); let _ = JOB_QUEUE.set(tx); let status_messages = helix_event::status::setup(); Self { wait_futures: FuturesUnordered::new(), callbacks: rx, status_messages, } } pub fn spawn> + Send + 'static>(&mut self, f: F) { self.add(Job::new(f)); } pub fn callback> + Send + 'static>( &mut self, f: F, ) { self.add(Job::with_callback(f)); } pub fn handle_callback( &self, editor: &mut Editor, compositor: &mut Compositor, call: anyhow::Result>, ) { match call { Ok(None) => {} Ok(Some(call)) => match call { Callback::EditorCompositor(call) => call(editor, compositor), Callback::Editor(call) => call(editor), }, Err(e) => { editor.set_error(format!("Async job failed: {}", e)); } } } pub fn add(&self, j: Job) { if j.wait { self.wait_futures.push(j.future); } else { tokio::spawn(async move { match j.future.await { Ok(Some(cb)) => dispatch_callback(cb).await, Ok(None) => (), Err(err) => helix_event::status::report(err).await, } }); } } /// Blocks until all the jobs that need to be waited on are done. pub async fn finish( &mut self, editor: &mut Editor, mut compositor: Option<&mut Compositor>, ) -> anyhow::Result<()> { log::debug!("waiting on jobs..."); let mut wait_futures = std::mem::take(&mut self.wait_futures); while let (Some(job), tail) = wait_futures.into_future().await { match job { Ok(callback) => { wait_futures = tail; if let Some(callback) = callback { // clippy doesn't realize this is an error without the derefs #[allow(clippy::needless_option_as_deref)] match callback { Callback::EditorCompositor(call) if compositor.is_some() => { call(editor, compositor.as_deref_mut().unwrap()) } Callback::Editor(call) => call(editor), // skip callbacks for which we don't have the necessary references _ => (), } } } Err(e) => { self.wait_futures = tail; return Err(e); } } } Ok(()) } }