Add unique id to each lsp client/server pair

pull/297/head
wojciechkepka 3 years ago committed by Blaž Hrastnik
parent 80b4a69053
commit 38cb934d8f

@ -18,6 +18,7 @@ use tokio::{
#[derive(Debug)] #[derive(Debug)]
pub struct Client { pub struct Client {
id: usize,
_process: Child, _process: Child,
server_tx: UnboundedSender<Payload>, server_tx: UnboundedSender<Payload>,
request_counter: AtomicU64, request_counter: AtomicU64,
@ -26,7 +27,11 @@ pub struct Client {
} }
impl Client { impl Client {
pub fn start(cmd: &str, args: &[String]) -> Result<(Self, UnboundedReceiver<Call>)> { pub fn start(
cmd: &str,
args: &[String],
id: usize,
) -> Result<(Self, UnboundedReceiver<(usize, Call)>)> {
let process = Command::new(cmd) let process = Command::new(cmd)
.args(args) .args(args)
.stdin(Stdio::piped()) .stdin(Stdio::piped())
@ -43,9 +48,10 @@ impl Client {
let reader = BufReader::new(process.stdout.take().expect("Failed to open stdout")); let reader = BufReader::new(process.stdout.take().expect("Failed to open stdout"));
let stderr = BufReader::new(process.stderr.take().expect("Failed to open stderr")); let stderr = BufReader::new(process.stderr.take().expect("Failed to open stderr"));
let (server_rx, server_tx) = Transport::start(reader, writer, stderr); let (server_rx, server_tx) = Transport::start(reader, writer, stderr, id);
let client = Self { let client = Self {
id,
_process: process, _process: process,
server_tx, server_tx,
request_counter: AtomicU64::new(0), request_counter: AtomicU64::new(0),
@ -59,6 +65,10 @@ impl Client {
Ok((client, server_rx)) Ok((client, server_rx))
} }
pub fn id(&self) -> usize {
self.id
}
fn next_request_id(&self) -> jsonrpc::Id { fn next_request_id(&self) -> jsonrpc::Id {
let id = self.request_counter.fetch_add(1, Ordering::Relaxed); let id = self.request_counter.fetch_add(1, Ordering::Relaxed);
jsonrpc::Id::Num(id) jsonrpc::Id::Num(id)

@ -13,7 +13,10 @@ use helix_core::syntax::LanguageConfiguration;
use std::{ use std::{
collections::{hash_map::Entry, HashMap}, collections::{hash_map::Entry, HashMap},
sync::Arc, sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -254,9 +257,10 @@ impl Notification {
#[derive(Debug)] #[derive(Debug)]
pub struct Registry { pub struct Registry {
inner: HashMap<LanguageId, Arc<Client>>, inner: HashMap<LanguageId, (usize, Arc<Client>)>,
pub incoming: SelectAll<UnboundedReceiverStream<Call>>, counter: AtomicUsize,
pub incoming: SelectAll<UnboundedReceiverStream<(usize, Call)>>,
} }
impl Default for Registry { impl Default for Registry {
@ -269,10 +273,18 @@ impl Registry {
pub fn new() -> Self { pub fn new() -> Self {
Self { Self {
inner: HashMap::new(), inner: HashMap::new(),
counter: AtomicUsize::new(0),
incoming: SelectAll::new(), incoming: SelectAll::new(),
} }
} }
pub fn get_by_id(&mut self, id: usize) -> Option<&Client> {
self.inner
.values()
.find(|(client_id, _)| client_id == &id)
.map(|(_, client)| client.as_ref())
}
pub fn get(&mut self, language_config: &LanguageConfiguration) -> Result<Arc<Client>> { pub fn get(&mut self, language_config: &LanguageConfiguration) -> Result<Arc<Client>> {
if let Some(config) = &language_config.language_server { if let Some(config) = &language_config.language_server {
// avoid borrow issues // avoid borrow issues
@ -280,16 +292,17 @@ impl Registry {
let s_incoming = &mut self.incoming; let s_incoming = &mut self.incoming;
match inner.entry(language_config.scope.clone()) { match inner.entry(language_config.scope.clone()) {
Entry::Occupied(language_server) => Ok(language_server.get().clone()), Entry::Occupied(entry) => Ok(entry.get().1.clone()),
Entry::Vacant(entry) => { Entry::Vacant(entry) => {
// initialize a new client // initialize a new client
let (mut client, incoming) = Client::start(&config.command, &config.args)?; let id = self.counter.fetch_add(1, Ordering::Relaxed);
let (mut client, incoming) = Client::start(&config.command, &config.args, id)?;
// TODO: run this async without blocking // TODO: run this async without blocking
futures_executor::block_on(client.initialize())?; futures_executor::block_on(client.initialize())?;
s_incoming.push(UnboundedReceiverStream::new(incoming)); s_incoming.push(UnboundedReceiverStream::new(incoming));
let client = Arc::new(client); let client = Arc::new(client);
entry.insert(client.clone()); entry.insert((id, client.clone()));
Ok(client) Ok(client)
} }
} }

@ -33,7 +33,8 @@ enum ServerMessage {
#[derive(Debug)] #[derive(Debug)]
pub struct Transport { pub struct Transport {
client_tx: UnboundedSender<jsonrpc::Call>, id: usize,
client_tx: UnboundedSender<(usize, jsonrpc::Call)>,
client_rx: UnboundedReceiver<Payload>, client_rx: UnboundedReceiver<Payload>,
pending_requests: HashMap<jsonrpc::Id, Sender<Result<Value>>>, pending_requests: HashMap<jsonrpc::Id, Sender<Result<Value>>>,
@ -48,11 +49,16 @@ impl Transport {
server_stdout: BufReader<ChildStdout>, server_stdout: BufReader<ChildStdout>,
server_stdin: BufWriter<ChildStdin>, server_stdin: BufWriter<ChildStdin>,
server_stderr: BufReader<ChildStderr>, server_stderr: BufReader<ChildStderr>,
) -> (UnboundedReceiver<jsonrpc::Call>, UnboundedSender<Payload>) { id: usize,
) -> (
UnboundedReceiver<(usize, jsonrpc::Call)>,
UnboundedSender<Payload>,
) {
let (client_tx, rx) = unbounded_channel(); let (client_tx, rx) = unbounded_channel();
let (tx, client_rx) = unbounded_channel(); let (tx, client_rx) = unbounded_channel();
let transport = Self { let transport = Self {
id,
server_stdout, server_stdout,
server_stdin, server_stdin,
server_stderr, server_stderr,
@ -156,7 +162,7 @@ impl Transport {
match msg { match msg {
ServerMessage::Output(output) => self.process_request_response(output).await?, ServerMessage::Output(output) => self.process_request_response(output).await?,
ServerMessage::Call(call) => { ServerMessage::Call(call) => {
self.client_tx.send(call).unwrap(); self.client_tx.send((self.id, call)).unwrap();
// let notification = Notification::parse(&method, params); // let notification = Notification::parse(&method, params);
} }
}; };

@ -109,8 +109,8 @@ impl Application {
event = reader.next() => { event = reader.next() => {
self.handle_terminal_events(event) self.handle_terminal_events(event)
} }
Some(call) = self.editor.language_servers.incoming.next() => { Some((id, call)) = self.editor.language_servers.incoming.next() => {
self.handle_language_server_message(call).await self.handle_language_server_message(call, id).await
} }
Some(callback) = &mut self.callbacks.next() => { Some(callback) = &mut self.callbacks.next() => {
self.handle_language_server_callback(callback) self.handle_language_server_callback(callback)
@ -153,8 +153,12 @@ impl Application {
} }
} }
pub async fn handle_language_server_message(&mut self, call: helix_lsp::Call) { pub async fn handle_language_server_message(
use helix_lsp::{Call, Notification}; &mut self,
call: helix_lsp::Call,
server_id: usize,
) {
use helix_lsp::{Call, MethodCall, Notification};
match call { match call {
Call::Notification(helix_lsp::jsonrpc::Notification { method, params, .. }) => { Call::Notification(helix_lsp::jsonrpc::Notification { method, params, .. }) => {
let notification = match Notification::parse(&method, params) { let notification = match Notification::parse(&method, params) {

Loading…
Cancel
Save