From 38cb934d8f579663ff13496bf79293103863b60b Mon Sep 17 00:00:00 2001 From: wojciechkepka Date: Fri, 18 Jun 2021 05:42:34 +0200 Subject: [PATCH] Add unique id to each lsp client/server pair --- helix-lsp/src/client.rs | 14 ++++++++++++-- helix-lsp/src/lib.rs | 25 +++++++++++++++++++------ helix-lsp/src/transport.rs | 12 +++++++++--- helix-term/src/application.rs | 12 ++++++++---- 4 files changed, 48 insertions(+), 15 deletions(-) diff --git a/helix-lsp/src/client.rs b/helix-lsp/src/client.rs index 6554e996f..e14d0197c 100644 --- a/helix-lsp/src/client.rs +++ b/helix-lsp/src/client.rs @@ -18,6 +18,7 @@ use tokio::{ #[derive(Debug)] pub struct Client { + id: usize, _process: Child, server_tx: UnboundedSender, request_counter: AtomicU64, @@ -26,7 +27,11 @@ pub struct Client { } impl Client { - pub fn start(cmd: &str, args: &[String]) -> Result<(Self, UnboundedReceiver)> { + pub fn start( + cmd: &str, + args: &[String], + id: usize, + ) -> Result<(Self, UnboundedReceiver<(usize, Call)>)> { let process = Command::new(cmd) .args(args) .stdin(Stdio::piped()) @@ -43,9 +48,10 @@ impl Client { let reader = BufReader::new(process.stdout.take().expect("Failed to open stdout")); let stderr = BufReader::new(process.stderr.take().expect("Failed to open stderr")); - let (server_rx, server_tx) = Transport::start(reader, writer, stderr); + let (server_rx, server_tx) = Transport::start(reader, writer, stderr, id); let client = Self { + id, _process: process, server_tx, request_counter: AtomicU64::new(0), @@ -59,6 +65,10 @@ impl Client { Ok((client, server_rx)) } + pub fn id(&self) -> usize { + self.id + } + fn next_request_id(&self) -> jsonrpc::Id { let id = self.request_counter.fetch_add(1, Ordering::Relaxed); jsonrpc::Id::Num(id) diff --git a/helix-lsp/src/lib.rs b/helix-lsp/src/lib.rs index be1e23a56..774de075a 100644 --- a/helix-lsp/src/lib.rs +++ b/helix-lsp/src/lib.rs @@ -13,7 +13,10 @@ use helix_core::syntax::LanguageConfiguration; use std::{ collections::{hash_map::Entry, HashMap}, - sync::Arc, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, }; use serde::{Deserialize, Serialize}; @@ -254,9 +257,10 @@ impl Notification { #[derive(Debug)] pub struct Registry { - inner: HashMap>, + inner: HashMap)>, - pub incoming: SelectAll>, + counter: AtomicUsize, + pub incoming: SelectAll>, } impl Default for Registry { @@ -269,10 +273,18 @@ impl Registry { pub fn new() -> Self { Self { inner: HashMap::new(), + counter: AtomicUsize::new(0), incoming: SelectAll::new(), } } + pub fn get_by_id(&mut self, id: usize) -> Option<&Client> { + self.inner + .values() + .find(|(client_id, _)| client_id == &id) + .map(|(_, client)| client.as_ref()) + } + pub fn get(&mut self, language_config: &LanguageConfiguration) -> Result> { if let Some(config) = &language_config.language_server { // avoid borrow issues @@ -280,16 +292,17 @@ impl Registry { let s_incoming = &mut self.incoming; match inner.entry(language_config.scope.clone()) { - Entry::Occupied(language_server) => Ok(language_server.get().clone()), + Entry::Occupied(entry) => Ok(entry.get().1.clone()), Entry::Vacant(entry) => { // initialize a new client - let (mut client, incoming) = Client::start(&config.command, &config.args)?; + let id = self.counter.fetch_add(1, Ordering::Relaxed); + let (mut client, incoming) = Client::start(&config.command, &config.args, id)?; // TODO: run this async without blocking futures_executor::block_on(client.initialize())?; s_incoming.push(UnboundedReceiverStream::new(incoming)); let client = Arc::new(client); - entry.insert(client.clone()); + entry.insert((id, client.clone())); Ok(client) } } diff --git a/helix-lsp/src/transport.rs b/helix-lsp/src/transport.rs index e80683236..29ce2e84c 100644 --- a/helix-lsp/src/transport.rs +++ b/helix-lsp/src/transport.rs @@ -33,7 +33,8 @@ enum ServerMessage { #[derive(Debug)] pub struct Transport { - client_tx: UnboundedSender, + id: usize, + client_tx: UnboundedSender<(usize, jsonrpc::Call)>, client_rx: UnboundedReceiver, pending_requests: HashMap>>, @@ -48,11 +49,16 @@ impl Transport { server_stdout: BufReader, server_stdin: BufWriter, server_stderr: BufReader, - ) -> (UnboundedReceiver, UnboundedSender) { + id: usize, + ) -> ( + UnboundedReceiver<(usize, jsonrpc::Call)>, + UnboundedSender, + ) { let (client_tx, rx) = unbounded_channel(); let (tx, client_rx) = unbounded_channel(); let transport = Self { + id, server_stdout, server_stdin, server_stderr, @@ -156,7 +162,7 @@ impl Transport { match msg { ServerMessage::Output(output) => self.process_request_response(output).await?, ServerMessage::Call(call) => { - self.client_tx.send(call).unwrap(); + self.client_tx.send((self.id, call)).unwrap(); // let notification = Notification::parse(&method, params); } }; diff --git a/helix-term/src/application.rs b/helix-term/src/application.rs index f5cba3651..3f1d6a107 100644 --- a/helix-term/src/application.rs +++ b/helix-term/src/application.rs @@ -109,8 +109,8 @@ impl Application { event = reader.next() => { self.handle_terminal_events(event) } - Some(call) = self.editor.language_servers.incoming.next() => { - self.handle_language_server_message(call).await + Some((id, call)) = self.editor.language_servers.incoming.next() => { + self.handle_language_server_message(call, id).await } Some(callback) = &mut self.callbacks.next() => { self.handle_language_server_callback(callback) @@ -153,8 +153,12 @@ impl Application { } } - pub async fn handle_language_server_message(&mut self, call: helix_lsp::Call) { - use helix_lsp::{Call, Notification}; + pub async fn handle_language_server_message( + &mut self, + call: helix_lsp::Call, + server_id: usize, + ) { + use helix_lsp::{Call, MethodCall, Notification}; match call { Call::Notification(helix_lsp::jsonrpc::Notification { method, params, .. }) => { let notification = match Notification::parse(&method, params) {