diff --git a/helix-lsp/src/client.rs b/helix-lsp/src/client.rs index 1f07cf897..160dd93b4 100644 --- a/helix-lsp/src/client.rs +++ b/helix-lsp/src/client.rs @@ -23,13 +23,16 @@ use smol::{ Executor, }; +fn text_document_identifier(doc: &Document) -> lsp::TextDocumentIdentifier { + lsp::TextDocumentIdentifier::new(lsp::Url::from_file_path(doc.path().unwrap()).unwrap()) +} + pub struct Client { _process: Child, stderr: BufReader, outgoing: Sender, - pub incoming: Receiver, - + // pub incoming: Receiver, pub request_counter: AtomicU64, capabilities: Option, @@ -38,7 +41,7 @@ pub struct Client { } impl Client { - pub fn start(ex: &Executor, cmd: &str, args: &[String]) -> Self { + pub fn start(ex: &Executor, cmd: &str, args: &[String]) -> (Self, Receiver) { let mut process = Command::new(cmd) .args(args) .stdin(Stdio::piped()) @@ -55,18 +58,22 @@ impl Client { let (incoming, outgoing) = Transport::start(ex, reader, writer); - Client { + let client = Client { _process: process, stderr, outgoing, - incoming, - + // incoming, request_counter: AtomicU64::new(0), capabilities: None, // diagnostics: HashMap::new(), - } + }; + + // TODO: async client.initialize() + // maybe use an arc flag + + (client, incoming) } fn next_request_id(&self) -> jsonrpc::Id { @@ -219,7 +226,7 @@ impl Client { // Text document // ------------------------------------------------------------------------------------------- - pub async fn text_document_did_open(&mut self, doc: &Document) -> Result<()> { + pub async fn text_document_did_open(&self, doc: &Document) -> Result<()> { self.notify::(lsp::DidOpenTextDocumentParams { text_document: lsp::TextDocumentItem { uri: lsp::Url::from_file_path(doc.path().unwrap()).unwrap(), @@ -295,7 +302,7 @@ impl Client { // TODO: trigger any time history.commit_revision happens pub async fn text_document_did_change( - &mut self, + &self, doc: &Document, transaction: &Transaction, ) -> Result<()> { @@ -328,6 +335,7 @@ impl Client { self.notify::(lsp::DidChangeTextDocumentParams { text_document: lsp::VersionedTextDocumentIdentifier::new( + // TODO: doc.into() Url lsp::Url::from_file_path(doc.path().unwrap()).unwrap(), doc.version, ), @@ -338,18 +346,16 @@ impl Client { // TODO: impl into() TextDocumentIdentifier / VersionedTextDocumentIdentifier for Document. - pub async fn text_document_did_close(&mut self, doc: &Document) -> Result<()> { + pub async fn text_document_did_close(&self, doc: &Document) -> Result<()> { self.notify::(lsp::DidCloseTextDocumentParams { - text_document: lsp::TextDocumentIdentifier::new( - lsp::Url::from_file_path(doc.path().unwrap()).unwrap(), - ), + text_document: text_document_identifier(doc), }) .await } // will_save / will_save_wait_until - pub async fn text_document_did_save(&mut self) -> anyhow::Result<()> { + pub async fn text_document_did_save(&self) -> anyhow::Result<()> { unimplemented!() } } diff --git a/helix-lsp/src/lib.rs b/helix-lsp/src/lib.rs index eae6fa868..c56721a59 100644 --- a/helix-lsp/src/lib.rs +++ b/helix-lsp/src/lib.rs @@ -1,4 +1,5 @@ mod client; +mod select_all; mod transport; pub use jsonrpc_core as jsonrpc; @@ -69,16 +70,24 @@ pub use jsonrpc::Call; type LanguageId = String; -pub static REGISTRY: Lazy = Lazy::new(Registry::init); +use crate::select_all::SelectAll; +use smol::channel::Receiver; pub struct Registry { inner: HashMap>>, + + pub incoming: SelectAll>, } impl Registry { - pub fn init() -> Self { + pub fn new() -> Self { + let mut inner = HashMap::new(); + + inner.insert("rust".to_string(), OnceCell::new()); + Self { - inner: HashMap::new(), + inner, + incoming: SelectAll::new(), } } @@ -91,8 +100,12 @@ impl Registry { // TODO: lookup defaults for id (name, args) // initialize a new client - let client = Client::start(&ex, "rust-analyzer", &[]); - // TODO: also call initialize().await() + let (mut client, incoming) = Client::start(&ex, "rust-analyzer", &[]); + // TODO: run this async without blocking + smol::block_on(client.initialize()).unwrap(); + + self.incoming.push(incoming); + Arc::new(client) }) }) @@ -115,3 +128,13 @@ impl Registry { // -> PROBLEM: how do you trigger an update on the editor side when data updates? // // -> The data updates should pull all events until we run out so we don't frequently re-render +// +// +// v2: +// +// there should be a registry of lsp clients, one per language type (or workspace). +// the clients should lazy init on first access +// the client.initialize() should be called async and we buffer any requests until that completes +// there needs to be a way to process incoming lsp messages from all clients. +// -> notifications need to be dispatched to wherever +// -> requests need to generate a reply and travel back to the same lsp! diff --git a/helix-lsp/src/select_all.rs b/helix-lsp/src/select_all.rs new file mode 100644 index 000000000..987f2a102 --- /dev/null +++ b/helix-lsp/src/select_all.rs @@ -0,0 +1,143 @@ +//! An unbounded set of streams + +use core::fmt::{self, Debug}; +use core::iter::FromIterator; +use core::pin::Pin; + +use smol::ready; +use smol::stream::Stream; +use std::task::{Context, Poll}; + +use futures_util::stream::FusedStream; +use futures_util::stream::{FuturesUnordered, StreamExt, StreamFuture}; + +/// An unbounded set of streams +/// +/// This "combinator" provides the ability to maintain a set of streams +/// and drive them all to completion. +/// +/// Streams are pushed into this set and their realized values are +/// yielded as they become ready. Streams will only be polled when they +/// generate notifications. This allows to coordinate a large number of streams. +/// +/// Note that you can create a ready-made `SelectAll` via the +/// `select_all` function in the `stream` module, or you can start with an +/// empty set with the `SelectAll::new` constructor. +#[must_use = "streams do nothing unless polled"] +pub struct SelectAll { + inner: FuturesUnordered>, +} + +impl Debug for SelectAll { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "SelectAll {{ ... }}") + } +} + +impl SelectAll { + /// Constructs a new, empty `SelectAll` + /// + /// The returned `SelectAll` does not contain any streams and, in this + /// state, `SelectAll::poll` will return `Poll::Ready(None)`. + pub fn new() -> Self { + Self { + inner: FuturesUnordered::new(), + } + } + + /// Returns the number of streams contained in the set. + /// + /// This represents the total number of in-flight streams. + pub fn len(&self) -> usize { + self.inner.len() + } + + /// Returns `true` if the set contains no streams + pub fn is_empty(&self) -> bool { + self.inner.is_empty() + } + + /// Push a stream into the set. + /// + /// This function submits the given stream to the set for managing. This + /// function will not call `poll` on the submitted stream. The caller must + /// ensure that `SelectAll::poll` is called in order to receive task + /// notifications. + pub fn push(&self, stream: St) { + self.inner.push(stream.into_future()); + } +} + +impl Default for SelectAll { + fn default() -> Self { + Self::new() + } +} + +impl Stream for SelectAll { + type Item = St::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + loop { + match ready!(self.inner.poll_next_unpin(cx)) { + Some((Some(item), remaining)) => { + self.push(remaining); + return Poll::Ready(Some(item)); + } + Some((None, _)) => { + // `FuturesUnordered` thinks it isn't terminated + // because it yielded a Some. + // We do not return, but poll `FuturesUnordered` + // in the next loop iteration. + } + None => return Poll::Ready(None), + } + } + } +} + +impl FusedStream for SelectAll { + fn is_terminated(&self) -> bool { + self.inner.is_terminated() + } +} + +/// Convert a list of streams into a `Stream` of results from the streams. +/// +/// This essentially takes a list of streams (e.g. a vector, an iterator, etc.) +/// and bundles them together into a single stream. +/// The stream will yield items as they become available on the underlying +/// streams internally, in the order they become available. +/// +/// Note that the returned set can also be used to dynamically push more +/// futures into the set as they become available. +/// +/// This function is only available when the `std` or `alloc` feature of this +/// library is activated, and it is activated by default. +pub fn select_all(streams: I) -> SelectAll +where + I: IntoIterator, + I::Item: Stream + Unpin, +{ + let mut set = SelectAll::new(); + + for stream in streams { + set.push(stream); + } + + set +} + +impl FromIterator for SelectAll { + fn from_iter>(iter: T) -> Self { + select_all(iter) + } +} + +impl Extend for SelectAll { + fn extend>(&mut self, iter: T) { + for st in iter { + self.push(st) + } + } +} diff --git a/helix-term/src/application.rs b/helix-term/src/application.rs index b90ec2837..e15f21c6a 100644 --- a/helix-term/src/application.rs +++ b/helix-term/src/application.rs @@ -10,6 +10,7 @@ use log::{debug, info}; use std::{ io::{self, stdout, Stdout, Write}, path::PathBuf, + sync::Arc, time::Duration, }; @@ -32,7 +33,8 @@ pub struct Application { terminal: Terminal, executor: &'static smol::Executor<'static>, - language_server: helix_lsp::Client, + language_server: Arc, + language_servers: helix_lsp::Registry, } impl Application { @@ -49,7 +51,8 @@ impl Application { let mut compositor = Compositor::new(); compositor.push(Box::new(ui::EditorView::new())); - let language_server = helix_lsp::Client::start(&executor, "rust-analyzer", &[]); + let language_servers = helix_lsp::Registry::new(); + let language_server = language_servers.get("rust", &executor).unwrap(); let mut app = Self { editor, @@ -58,6 +61,7 @@ impl Application { executor, language_server, + language_servers, }; Ok(app) @@ -81,9 +85,6 @@ impl Application { pub async fn event_loop(&mut self) { let mut reader = EventStream::new(); - // initialize lsp - self.language_server.initialize().await.unwrap(); - self.language_server .text_document_did_open(&self.editor.view().unwrap().doc) .await @@ -101,7 +102,7 @@ impl Application { event = reader.next().fuse() => { self.handle_terminal_events(event) } - call = self.language_server.incoming.next().fuse() => { + call = self.language_servers.incoming.next().fuse() => { self.handle_language_server_message(call).await } }