Add some async job infrastructure.

pull/398/head
Joe Neeman 3 years ago committed by Blaž Hrastnik
parent c9be480bf8
commit d64d75e724

@ -6,6 +6,7 @@ use crate::{
args::Args, args::Args,
compositor::Compositor, compositor::Compositor,
config::Config, config::Config,
job::Jobs,
keymap::Keymaps, keymap::Keymaps,
ui::{self, Spinner}, ui::{self, Spinner},
}; };
@ -31,13 +32,6 @@ use crossterm::{
use futures_util::{future, stream::FuturesUnordered}; use futures_util::{future, stream::FuturesUnordered};
type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;
pub type LspCallback =
BoxFuture<Result<Box<dyn FnOnce(&mut Editor, &mut Compositor) + Send>, anyhow::Error>>;
pub type LspCallbacks = FuturesUnordered<LspCallback>;
pub type LspCallbackWrapper = Box<dyn FnOnce(&mut Editor, &mut Compositor) + Send>;
pub struct Application { pub struct Application {
compositor: Compositor, compositor: Compositor,
editor: Editor, editor: Editor,
@ -48,7 +42,7 @@ pub struct Application {
theme_loader: Arc<theme::Loader>, theme_loader: Arc<theme::Loader>,
syn_loader: Arc<syntax::Loader>, syn_loader: Arc<syntax::Loader>,
callbacks: LspCallbacks, jobs: Jobs,
lsp_progress: LspProgressMap, lsp_progress: LspProgressMap,
} }
@ -120,7 +114,7 @@ impl Application {
theme_loader, theme_loader,
syn_loader, syn_loader,
callbacks: FuturesUnordered::new(), jobs: Jobs::new(),
lsp_progress: LspProgressMap::new(), lsp_progress: LspProgressMap::new(),
}; };
@ -130,11 +124,11 @@ impl Application {
fn render(&mut self) { fn render(&mut self) {
let editor = &mut self.editor; let editor = &mut self.editor;
let compositor = &mut self.compositor; let compositor = &mut self.compositor;
let callbacks = &mut self.callbacks; let jobs = &mut self.jobs;
let mut cx = crate::compositor::Context { let mut cx = crate::compositor::Context {
editor, editor,
callbacks, jobs,
scroll: None, scroll: None,
}; };
@ -148,6 +142,7 @@ impl Application {
loop { loop {
if self.editor.should_close() { if self.editor.should_close() {
self.jobs.finish();
break; break;
} }
@ -172,27 +167,18 @@ impl Application {
} }
self.render(); self.render();
} }
Some(callback) = &mut self.callbacks.next() => { Some(callback) = self.jobs.next() => {
self.handle_language_server_callback(callback) self.jobs.handle_callback(&mut self.editor, &mut self.compositor, callback);
self.render();
} }
} }
} }
} }
pub fn handle_language_server_callback(
&mut self,
callback: Result<LspCallbackWrapper, anyhow::Error>,
) {
if let Ok(callback) = callback {
// TODO: handle Err()
callback(&mut self.editor, &mut self.compositor);
self.render();
}
}
pub fn handle_terminal_events(&mut self, event: Option<Result<Event, crossterm::ErrorKind>>) { pub fn handle_terminal_events(&mut self, event: Option<Result<Event, crossterm::ErrorKind>>) {
let mut cx = crate::compositor::Context { let mut cx = crate::compositor::Context {
editor: &mut self.editor, editor: &mut self.editor,
callbacks: &mut self.callbacks, jobs: &mut self.jobs,
scroll: None, scroll: None,
}; };
// Handle key events // Handle key events

@ -35,8 +35,8 @@ use crate::{
ui::{self, Completion, Picker, Popup, Prompt, PromptEvent}, ui::{self, Completion, Picker, Popup, Prompt, PromptEvent},
}; };
use crate::application::{LspCallback, LspCallbackWrapper, LspCallbacks}; use crate::job::{self, Job, JobFuture, Jobs};
use futures_util::FutureExt; use futures_util::{FutureExt, TryFutureExt};
use std::{fmt, future::Future, path::Display, str::FromStr}; use std::{fmt, future::Future, path::Display, str::FromStr};
use std::{ use std::{
@ -54,7 +54,7 @@ pub struct Context<'a> {
pub callback: Option<crate::compositor::Callback>, pub callback: Option<crate::compositor::Callback>,
pub on_next_key_callback: Option<Box<dyn FnOnce(&mut Context, KeyEvent)>>, pub on_next_key_callback: Option<Box<dyn FnOnce(&mut Context, KeyEvent)>>,
pub callbacks: &'a mut LspCallbacks, pub jobs: &'a mut Jobs,
} }
impl<'a> Context<'a> { impl<'a> Context<'a> {
@ -85,13 +85,13 @@ impl<'a> Context<'a> {
let callback = Box::pin(async move { let callback = Box::pin(async move {
let json = call.await?; let json = call.await?;
let response = serde_json::from_value(json)?; let response = serde_json::from_value(json)?;
let call: LspCallbackWrapper = let call: job::Callback =
Box::new(move |editor: &mut Editor, compositor: &mut Compositor| { Box::new(move |editor: &mut Editor, compositor: &mut Compositor| {
callback(editor, compositor, response) callback(editor, compositor, response)
}); });
Ok(call) Ok(call)
}); });
self.callbacks.push(callback); self.jobs.callback(callback);
} }
/// Returns 1 if no explicit count was provided /// Returns 1 if no explicit count was provided
@ -1149,7 +1149,7 @@ mod cmd {
path: Option<P>, path: Option<P>,
) -> Result<tokio::task::JoinHandle<Result<(), anyhow::Error>>, anyhow::Error> { ) -> Result<tokio::task::JoinHandle<Result<(), anyhow::Error>>, anyhow::Error> {
use anyhow::anyhow; use anyhow::anyhow;
let callbacks = &mut cx.callbacks; let jobs = &mut cx.jobs;
let (view, doc) = current!(cx.editor); let (view, doc) = current!(cx.editor);
if let Some(path) = path { if let Some(path) = path {
@ -1168,7 +1168,7 @@ mod cmd {
doc.format().map(|fmt| { doc.format().map(|fmt| {
let shared = fmt.shared(); let shared = fmt.shared();
let callback = make_format_callback(doc.id(), doc.version(), true, shared.clone()); let callback = make_format_callback(doc.id(), doc.version(), true, shared.clone());
callbacks.push(callback); jobs.callback(callback);
shared shared
}) })
} else { } else {
@ -1178,8 +1178,12 @@ mod cmd {
} }
fn write(cx: &mut compositor::Context, args: &[&str], event: PromptEvent) { fn write(cx: &mut compositor::Context, args: &[&str], event: PromptEvent) {
if let Err(e) = write_impl(cx, args.first()) { match write_impl(cx, args.first()) {
cx.editor.set_error(e.to_string()); Err(e) => cx.editor.set_error(e.to_string()),
Ok(handle) => {
cx.jobs
.add(Job::new(handle.unwrap_or_else(|e| Err(e.into()))).wait_before_exiting());
}
}; };
} }
@ -1192,7 +1196,7 @@ mod cmd {
if let Some(format) = doc.format() { if let Some(format) = doc.format() {
let callback = make_format_callback(doc.id(), doc.version(), false, format); let callback = make_format_callback(doc.id(), doc.version(), false, format);
cx.callbacks.push(callback); cx.jobs.callback(callback);
} }
} }
@ -1918,10 +1922,10 @@ fn make_format_callback(
doc_version: i32, doc_version: i32,
set_unmodified: bool, set_unmodified: bool,
format: impl Future<Output = helix_lsp::util::LspFormatting> + Send + 'static, format: impl Future<Output = helix_lsp::util::LspFormatting> + Send + 'static,
) -> LspCallback { ) -> impl Future<Output = anyhow::Result<job::Callback>> {
Box::pin(async move { async move {
let format = format.await; let format = format.await;
let call: LspCallbackWrapper = let call: job::Callback =
Box::new(move |editor: &mut Editor, compositor: &mut Compositor| { Box::new(move |editor: &mut Editor, compositor: &mut Compositor| {
let view_id = view!(editor).id; let view_id = view!(editor).id;
if let Some(doc) = editor.document_mut(doc_id) { if let Some(doc) = editor.document_mut(doc_id) {
@ -1937,7 +1941,7 @@ fn make_format_callback(
} }
}); });
Ok(call) Ok(call)
}) }
} }
enum Open { enum Open {

@ -26,12 +26,12 @@ pub enum EventResult {
use helix_view::Editor; use helix_view::Editor;
use crate::application::LspCallbacks; use crate::job::Jobs;
pub struct Context<'a> { pub struct Context<'a> {
pub editor: &'a mut Editor, pub editor: &'a mut Editor,
pub scroll: Option<usize>, pub scroll: Option<usize>,
pub callbacks: &'a mut LspCallbacks, pub jobs: &'a mut Jobs,
} }
pub trait Component: Any + AnyComponent { pub trait Component: Any + AnyComponent {

@ -0,0 +1,100 @@
use helix_view::Editor;
use crate::compositor::Compositor;
use futures_util::future::{self, BoxFuture, Future, FutureExt};
use futures_util::stream::{self, FuturesUnordered, Select, StreamExt};
pub type Callback = Box<dyn FnOnce(&mut Editor, &mut Compositor) + Send>;
pub type JobFuture = BoxFuture<'static, anyhow::Result<Option<Callback>>>;
pub struct Job {
pub future: BoxFuture<'static, anyhow::Result<Option<Callback>>>,
/// Do we need to wait for this job to finish before exiting?
pub wait: bool,
}
#[derive(Default)]
pub struct Jobs {
futures: FuturesUnordered<JobFuture>,
/// These are the ones that need to complete before we exit.
wait_futures: FuturesUnordered<JobFuture>,
}
impl Job {
pub fn new<F: Future<Output = anyhow::Result<()>> + Send + 'static>(f: F) -> Job {
Job {
future: f.map(|r| r.map(|()| None)).boxed(),
wait: false,
}
}
pub fn with_callback<F: Future<Output = anyhow::Result<Callback>> + Send + 'static>(
f: F,
) -> Job {
Job {
future: f.map(|r| r.map(|x| Some(x))).boxed(),
wait: false,
}
}
pub fn wait_before_exiting(mut self) -> Job {
self.wait = true;
self
}
}
impl Jobs {
pub fn new() -> Jobs {
Jobs::default()
}
pub fn spawn<F: Future<Output = anyhow::Result<()>> + Send + 'static>(&mut self, f: F) {
self.add(Job::new(f));
}
pub fn callback<F: Future<Output = anyhow::Result<Callback>> + Send + 'static>(
&mut self,
f: F,
) {
self.add(Job::with_callback(f));
}
pub fn handle_callback(
&mut self,
editor: &mut Editor,
compositor: &mut Compositor,
call: anyhow::Result<Option<Callback>>,
) {
match call {
Ok(None) => {}
Ok(Some(call)) => {
call(editor, compositor);
}
Err(e) => {
editor.set_error(format!("Async job failed: {}", e));
}
}
}
pub fn next<'a>(
&'a mut self,
) -> impl Future<Output = Option<anyhow::Result<Option<Callback>>>> + 'a {
future::select(self.futures.next(), self.wait_futures.next())
.map(|either| either.factor_first().0)
}
pub fn add(&mut self, j: Job) {
if j.wait {
self.wait_futures.push(j.future);
} else {
self.futures.push(j.future);
}
}
/// Blocks until all the jobs that need to be waited on are done.
pub fn finish(&mut self) {
let wait_futures = std::mem::take(&mut self.wait_futures);
helix_lsp::block_on(wait_futures.for_each(|_| future::ready(())));
}
}

@ -8,5 +8,6 @@ pub mod args;
pub mod commands; pub mod commands;
pub mod compositor; pub mod compositor;
pub mod config; pub mod config;
pub mod job;
pub mod keymap; pub mod keymap;
pub mod ui; pub mod ui;

@ -621,7 +621,7 @@ impl Component for EditorView {
count: None, count: None,
callback: None, callback: None,
on_next_key_callback: None, on_next_key_callback: None,
callbacks: cx.callbacks, jobs: cx.jobs,
}; };
if let Some(on_next_key) = self.on_next_key.take() { if let Some(on_next_key) = self.on_next_key.take() {
@ -639,7 +639,7 @@ impl Component for EditorView {
// use a fake context here // use a fake context here
let mut cx = Context { let mut cx = Context {
editor: cxt.editor, editor: cxt.editor,
callbacks: cxt.callbacks, jobs: cxt.jobs,
scroll: None, scroll: None,
}; };
let res = completion.handle_event(event, &mut cx); let res = completion.handle_event(event, &mut cx);

Loading…
Cancel
Save