chore(write): serialize write operations within a Document

The way that document writes are handled are by submitting them to the
async job pool, which are all executed opportunistically out of order. It
was discovered that this can lead to write inconsistencies when there
are multiple writes to the same file in quick succession.

This seeks to fix this problem by removing document writes from the
general pool of jobs and into its own specialized event. Now when a
user submits a write with one of the write commands, a request is simply
queued up in a new mpsc channel that each Document makes to handle its own
writes. This way, if multiple writes are submitted on the same document,
they are executed in order, while still allowing concurrent writes for
different documents.
pull/2267/head
Skyler Hawthorne 3 years ago
parent faf0c521d1
commit d706194597

@ -6,7 +6,14 @@ use helix_core::{
pos_at_coords, syntax, Selection,
};
use helix_lsp::{lsp, util::lsp_pos_to_pos, LspProgressMap};
use helix_view::{align_view, editor::ConfigEvent, theme, tree::Layout, Align, Editor};
use helix_view::{
align_view,
document::DocumentSaveEventResult,
editor::{ConfigEvent, EditorEvent},
theme,
tree::Layout,
Align, Editor,
};
use serde_json::json;
use crate::{
@ -19,7 +26,7 @@ use crate::{
ui::{self, overlay::overlayed},
};
use log::{error, warn};
use log::{debug, error, warn};
use std::{
io::{stdin, stdout, Write},
sync::Arc,
@ -294,26 +301,6 @@ impl Application {
Some(signal) = self.signals.next() => {
self.handle_signals(signal).await;
}
Some((id, call)) = self.editor.language_servers.incoming.next() => {
self.handle_language_server_message(call, id).await;
// limit render calls for fast language server messages
let last = self.editor.language_servers.incoming.is_empty();
if last || self.last_render.elapsed() > LSP_DEADLINE {
self.render();
self.last_render = Instant::now();
}
}
Some(payload) = self.editor.debugger_events.next() => {
let needs_render = self.editor.handle_debugger_message(payload).await;
if needs_render {
self.render();
}
}
Some(config_event) = self.editor.config_events.1.recv() => {
self.handle_config_events(config_event);
self.render();
}
Some(callback) = self.jobs.futures.next() => {
self.jobs.handle_callback(&mut self.editor, &mut self.compositor, callback);
self.render();
@ -322,20 +309,47 @@ impl Application {
self.jobs.handle_callback(&mut self.editor, &mut self.compositor, callback);
self.render();
}
_ = &mut self.editor.idle_timer => {
// idle timeout
self.editor.clear_idle_timer();
self.handle_idle_timeout();
event = self.editor.wait_event() => {
match event {
EditorEvent::DocumentSave(event) => {
self.handle_document_write(event);
self.render();
}
EditorEvent::ConfigEvent(event) => {
self.handle_config_events(event);
self.render();
}
EditorEvent::LanguageServerMessage((id, call)) => {
self.handle_language_server_message(call, id).await;
// limit render calls for fast language server messages
let last = self.editor.language_servers.incoming.is_empty();
if last || self.last_render.elapsed() > LSP_DEADLINE {
self.render();
self.last_render = Instant::now();
}
}
EditorEvent::DebuggerEvent(payload) => {
let needs_render = self.editor.handle_debugger_message(payload).await;
if needs_render {
self.render();
}
}
EditorEvent::IdleTimer => {
self.editor.clear_idle_timer();
self.handle_idle_timeout();
#[cfg(feature = "integration")]
{
idle_handled = true;
#[cfg(feature = "integration")]
{
idle_handled = true;
}
}
}
}
}
// for integration tests only, reset the idle timer after every
// event to make a signal when test events are done processing
// event to signal when test events are done processing
#[cfg(feature = "integration")]
{
if idle_handled {
@ -446,6 +460,46 @@ impl Application {
}
}
pub fn handle_document_write(&mut self, doc_save_event: DocumentSaveEventResult) {
if let Err(err) = doc_save_event {
self.editor.set_error(err.to_string());
return;
}
let doc_save_event = doc_save_event.unwrap();
let doc = self.editor.document_mut(doc_save_event.doc_id);
if doc.is_none() {
warn!(
"received document saved event for non-existent doc id: {}",
doc_save_event.doc_id
);
return;
}
let doc = doc.unwrap();
debug!(
"document {:?} saved with revision {}",
doc.path(),
doc_save_event.revision
);
doc.set_last_saved_revision(doc_save_event.revision);
let lines = doc.text().len_lines();
let bytes = doc.text().len_bytes();
let path_str = doc
.path()
.expect("document written without path")
.to_string_lossy()
.into_owned();
self.editor
.set_status(format!("'{}' written, {}L {}B", path_str, lines, bytes));
}
pub fn handle_terminal_events(&mut self, event: Result<CrosstermEvent, crossterm::ErrorKind>) {
let mut cx = crate::compositor::Context {
editor: &mut self.editor,
@ -866,11 +920,28 @@ impl Application {
self.event_loop(input_stream).await;
let err = self.close().await.err();
let mut save_errs = Vec::new();
for doc in self.editor.documents_mut() {
if let Some(Err(err)) = doc.close().await {
save_errs.push((
doc.path()
.map(|path| path.to_string_lossy().into_owned())
.unwrap_or_else(|| "".into()),
err,
));
}
}
let close_err = self.close().await.err();
restore_term()?;
if let Some(err) = err {
for (path, err) in save_errs {
self.editor.exit_code = 1;
eprintln!("Error closing '{}': {}", path, err);
}
if let Some(err) = close_err {
self.editor.exit_code = 1;
eprintln!("Error: {}", err);
}

@ -51,7 +51,7 @@ use crate::{
ui::{self, overlay::overlayed, FilePicker, Picker, Popup, Prompt, PromptEvent},
};
use crate::job::{self, Job, Jobs};
use crate::job::{self, Jobs};
use futures_util::{FutureExt, StreamExt};
use std::{collections::HashMap, fmt, future::Future};
use std::{collections::HashSet, num::NonZeroUsize};

@ -77,7 +77,9 @@ fn buffer_close_by_ids_impl(
let (modified_ids, modified_names): (Vec<_>, Vec<_>) = doc_ids
.iter()
.filter_map(|&doc_id| {
if let Err(CloseError::BufferModified(name)) = editor.close_document(doc_id, force) {
if let Err(CloseError::BufferModified(name)) =
helix_lsp::block_on(editor.close_document(doc_id, force))
{
Some((doc_id, name))
} else {
None
@ -269,6 +271,7 @@ fn write_impl(
doc.set_path(Some(path.as_ref().as_ref()))
.context("invalid filepath")?;
}
if doc.path().is_none() {
bail!("cannot write a buffer without a filename");
}
@ -287,8 +290,8 @@ fn write_impl(
} else {
None
};
let future = doc.format_and_save(fmt, force);
cx.jobs.add(Job::new(future).wait_before_exiting());
doc.format_and_save(fmt, force)?;
if path.is_some() {
let id = doc.id();
@ -602,8 +605,8 @@ fn write_all_impl(
} else {
None
};
let future = doc.format_and_save(fmt, force);
jobs.add(Job::new(future).wait_before_exiting());
doc.format_and_save(fmt, force)?;
}
if quit {

@ -62,7 +62,6 @@ async fn test_write_quit() -> anyhow::Result<()> {
}
#[tokio::test]
#[ignore]
async fn test_write_concurrent() -> anyhow::Result<()> {
let mut file = tempfile::NamedTempFile::new()?;
let mut command = String::new();
@ -92,7 +91,6 @@ async fn test_write_concurrent() -> anyhow::Result<()> {
}
#[tokio::test]
#[ignore]
async fn test_write_fail_mod_flag() -> anyhow::Result<()> {
let file = helpers::new_readonly_tempfile()?;

@ -3,6 +3,7 @@ use futures_util::future::BoxFuture;
use futures_util::FutureExt;
use helix_core::auto_pairs::AutoPairs;
use helix_core::Range;
use log::debug;
use serde::de::{self, Deserialize, Deserializer};
use serde::Serialize;
use std::borrow::Cow;
@ -13,6 +14,8 @@ use std::future::Future;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::Mutex;
use helix_core::{
encoding,
@ -83,6 +86,16 @@ impl Serialize for Mode {
}
}
/// A snapshot of the text of a document that we want to write out to disk
#[derive(Debug, Clone)]
pub struct DocumentSaveEvent {
pub revision: usize,
pub doc_id: DocumentId,
}
pub type DocumentSaveEventResult = Result<DocumentSaveEvent, anyhow::Error>;
pub type DocumentSaveEventFuture = BoxFuture<'static, DocumentSaveEventResult>;
pub struct Document {
pub(crate) id: DocumentId,
text: Rope,
@ -118,6 +131,9 @@ pub struct Document {
last_saved_revision: usize,
version: i32, // should be usize?
pub(crate) modified_since_accessed: bool,
save_sender: Option<UnboundedSender<DocumentSaveEventFuture>>,
save_receiver: Option<UnboundedReceiver<DocumentSaveEventFuture>>,
current_save: Arc<Mutex<Option<DocumentSaveEventFuture>>>,
diagnostics: Vec<Diagnostic>,
language_server: Option<Arc<helix_lsp::Client>>,
@ -338,6 +354,7 @@ impl Document {
let encoding = encoding.unwrap_or(encoding::UTF_8);
let changes = ChangeSet::new(&text);
let old_state = None;
let (save_sender, save_receiver) = tokio::sync::mpsc::unbounded_channel();
Self {
id: DocumentId::default(),
@ -358,6 +375,9 @@ impl Document {
savepoint: None,
last_saved_revision: 0,
modified_since_accessed: false,
save_sender: Some(save_sender),
save_receiver: Some(save_receiver),
current_save: Arc::new(Mutex::new(None)),
language_server: None,
}
}
@ -492,29 +512,34 @@ impl Document {
Some(fut.boxed())
}
pub fn save(&mut self, force: bool) -> impl Future<Output = Result<(), anyhow::Error>> {
pub fn save(&mut self, force: bool) -> Result<(), anyhow::Error> {
self.save_impl::<futures_util::future::Ready<_>>(None, force)
}
pub fn format_and_save(
&mut self,
formatting: Option<impl Future<Output = Result<Transaction, FormatterError>>>,
formatting: Option<
impl Future<Output = Result<Transaction, FormatterError>> + 'static + Send,
>,
force: bool,
) -> impl Future<Output = anyhow::Result<()>> {
) -> anyhow::Result<()> {
self.save_impl(formatting, force)
}
// TODO: do we need some way of ensuring two save operations on the same doc can't run at once?
// or is that handled by the OS/async layer
// TODO: impl Drop to handle ensuring writes when closed
/// The `Document`'s text is encoded according to its encoding and written to the file located
/// at its `path()`.
///
/// If `formatting` is present, it supplies some changes that we apply to the text before saving.
fn save_impl<F: Future<Output = Result<Transaction, FormatterError>>>(
fn save_impl<F: Future<Output = Result<Transaction, FormatterError>> + 'static + Send>(
&mut self,
formatting: Option<F>,
force: bool,
) -> impl Future<Output = Result<(), anyhow::Error>> {
) -> Result<(), anyhow::Error> {
if self.save_sender.is_none() {
bail!("saves are closed for this document!");
}
// we clone and move text + path into the future so that we asynchronously save the current
// state without blocking any further edits.
@ -525,12 +550,13 @@ impl Document {
let language_server = self.language_server.clone();
// mark changes up to now as saved
self.reset_modified();
let current_rev = self.get_current_revision();
let doc_id = self.id();
let encoding = self.encoding;
// We encode the file according to the `Document`'s encoding.
async move {
let save_event = async move {
use tokio::fs::File;
if let Some(parent) = path.parent() {
// TODO: display a prompt asking the user if the directories should be created
@ -563,9 +589,14 @@ impl Document {
let mut file = File::create(path).await?;
to_writer(&mut file, encoding, &text).await?;
let event = DocumentSaveEvent {
revision: current_rev,
doc_id,
};
if let Some(language_server) = language_server {
if !language_server.is_initialized() {
return Ok(());
return Ok(event);
}
if let Some(notification) =
language_server.text_document_did_save(identifier, &text)
@ -574,8 +605,70 @@ impl Document {
}
}
Ok(())
Ok(event)
};
self.save_sender
.as_mut()
.unwrap()
.send(Box::pin(save_event))
.map_err(|err| anyhow!("failed to send save event: {}", err))
}
pub async fn await_save(&mut self) -> Option<DocumentSaveEventResult> {
let mut current_save = self.current_save.lock().await;
if let Some(ref mut save) = *current_save {
let result = save.await;
*current_save = None;
debug!("save of '{:?}' result: {:?}", self.path(), result);
return Some(result);
}
// return early if the receiver is closed
self.save_receiver.as_ref()?;
let save = match self.save_receiver.as_mut().unwrap().recv().await {
Some(save) => save,
None => {
self.save_receiver = None;
return None;
}
};
// save a handle to the future so that when a poll on this
// function gets cancelled, we don't lose it
*current_save = Some(save);
debug!("awaiting save of '{:?}'", self.path());
let result = (*current_save).as_mut().unwrap().await;
*current_save = None;
debug!("save of '{:?}' result: {:?}", self.path(), result);
Some(result)
}
/// Prepares the Document for being closed by stopping any new writes
/// and flushing through the queue of pending writes. If any fail,
/// it stops early before emptying the rest of the queue. Callers
/// should keep calling until it returns None.
pub async fn close(&mut self) -> Option<DocumentSaveEventResult> {
if self.save_sender.is_some() {
self.save_sender = None;
}
let mut final_result = None;
while let Some(save_event) = self.await_save().await {
let is_err = save_event.is_err();
final_result = Some(save_event);
if is_err {
break;
}
}
final_result
}
/// Detect the programming language based on the file type.
@ -941,6 +1034,19 @@ impl Document {
self.last_saved_revision = current_revision;
}
/// Set the document's latest saved revision to the given one.
pub fn set_last_saved_revision(&mut self, rev: usize) {
self.last_saved_revision = rev;
}
/// Get the current revision number
pub fn get_current_revision(&mut self) -> usize {
let history = self.history.take();
let current_revision = history.current_revision();
self.history.set(history);
current_revision
}
/// Corresponding language scope name. Usually `source.<lang>`.
pub fn language_scope(&self) -> Option<&str> {
self.language

@ -1,6 +1,6 @@
use crate::{
clipboard::{get_clipboard_provider, ClipboardProvider},
document::Mode,
document::{DocumentSaveEventResult, Mode},
graphics::{CursorKind, Rect},
info::Info,
input::KeyEvent,
@ -9,8 +9,9 @@ use crate::{
Document, DocumentId, View, ViewId,
};
use futures_util::future;
use futures_util::stream::select_all::SelectAll;
use futures_util::stream::{select_all::SelectAll, FuturesUnordered};
use futures_util::{future, StreamExt};
use helix_lsp::Call;
use tokio_stream::wrappers::UnboundedReceiverStream;
use std::{
@ -65,7 +66,7 @@ where
)
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case", default, deny_unknown_fields)]
pub struct FilePickerConfig {
/// IgnoreOptions
@ -172,7 +173,7 @@ pub struct Config {
pub color_modes: bool,
}
#[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)]
#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default, rename_all = "kebab-case", deny_unknown_fields)]
pub struct TerminalConfig {
pub command: String,
@ -225,7 +226,7 @@ pub fn get_terminal_provider() -> Option<TerminalConfig> {
None
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default, rename_all = "kebab-case", deny_unknown_fields)]
pub struct LspConfig {
/// Display LSP progress messages below statusline
@ -246,7 +247,7 @@ impl Default for LspConfig {
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case", default, deny_unknown_fields)]
pub struct SearchConfig {
/// Smart case: Case insensitive searching unless pattern contains upper case characters. Defaults to true.
@ -255,7 +256,7 @@ pub struct SearchConfig {
pub wrap_around: bool,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case", default, deny_unknown_fields)]
pub struct StatusLineConfig {
pub left: Vec<StatusLineElement>,
@ -279,7 +280,7 @@ impl Default for StatusLineConfig {
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case", default, deny_unknown_fields)]
pub struct ModeConfig {
pub normal: String,
@ -458,7 +459,7 @@ impl std::str::FromStr for GutterType {
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct WhitespaceConfig {
pub render: WhitespaceRender,
@ -688,6 +689,15 @@ pub struct Editor {
pub config_events: (UnboundedSender<ConfigEvent>, UnboundedReceiver<ConfigEvent>),
}
#[derive(Debug)]
pub enum EditorEvent {
DocumentSave(DocumentSaveEventResult),
ConfigEvent(ConfigEvent),
LanguageServerMessage((usize, Call)),
DebuggerEvent(dap::Payload),
IdleTimer,
}
#[derive(Debug, Clone)]
pub enum ConfigEvent {
Refresh,
@ -719,6 +729,8 @@ pub enum CloseError {
DoesNotExist,
/// Buffer is modified
BufferModified(String),
/// Document failed to save
SaveError(anyhow::Error),
}
impl Editor {
@ -1079,8 +1091,12 @@ impl Editor {
self._refresh();
}
pub fn close_document(&mut self, doc_id: DocumentId, force: bool) -> Result<(), CloseError> {
let doc = match self.documents.get(&doc_id) {
pub async fn close_document(
&mut self,
doc_id: DocumentId,
force: bool,
) -> Result<(), CloseError> {
let doc = match self.documents.get_mut(&doc_id) {
Some(doc) => doc,
None => return Err(CloseError::DoesNotExist),
};
@ -1089,8 +1105,19 @@ impl Editor {
return Err(CloseError::BufferModified(doc.display_name().into_owned()));
}
if let Some(Err(err)) = doc.close().await {
return Err(CloseError::SaveError(err));
}
// Don't fail the whole write because the language server could not
// acknowledge the close
if let Some(language_server) = doc.language_server() {
tokio::spawn(language_server.text_document_did_close(doc.identifier()));
if let Err(err) = language_server
.text_document_did_close(doc.identifier())
.await
{
log::error!("Error closing doc in language server: {}", err);
}
}
enum Action {
@ -1269,4 +1296,32 @@ impl Editor {
.await
.map(|_| ())
}
pub async fn wait_event(&mut self) -> EditorEvent {
let mut saves: FuturesUnordered<_> = self
.documents
.values_mut()
.map(Document::await_save)
.collect();
tokio::select! {
biased;
Some(Some(event)) = saves.next() => {
EditorEvent::DocumentSave(event)
}
Some(config_event) = self.config_events.1.recv() => {
EditorEvent::ConfigEvent(config_event)
}
Some(message) = self.language_servers.incoming.next() => {
EditorEvent::LanguageServerMessage(message)
}
Some(event) = self.debugger_events.next() => {
EditorEvent::DebuggerEvent(event)
}
_ = &mut self.idle_timer => {
EditorEvent::IdleTimer
}
}
}
}

Loading…
Cancel
Save