From b9797a7dd22a5aa7d401a7d37166e21fd810c235 Mon Sep 17 00:00:00 2001 From: Dmitry Sharshakov Date: Sun, 15 Aug 2021 12:46:16 +0300 Subject: [PATCH] client: support tcp_process transport --- helix-dap/examples/dap-dlv.rs | 2 +- helix-dap/examples/dap-lldb.rs | 2 +- helix-dap/src/client.rs | 121 ++++++++++++++++++++++++--------- 3 files changed, 90 insertions(+), 35 deletions(-) diff --git a/helix-dap/examples/dap-dlv.rs b/helix-dap/examples/dap-dlv.rs index c42559b99..a18ec5e2b 100644 --- a/helix-dap/examples/dap-dlv.rs +++ b/helix-dap/examples/dap-dlv.rs @@ -35,7 +35,7 @@ pub async fn main() -> Result<()> { .apply() .expect("Failed to set up logging"); - let client = Client::tcp("127.0.0.1:7777".parse::().unwrap(), 0).await; + let client = Client::tcp_process("dlv", vec!["dap"], "-l 127.0.0.1:{}", 0).await; println!("create: {:?}", client); let mut client = client?; diff --git a/helix-dap/examples/dap-lldb.rs b/helix-dap/examples/dap-lldb.rs index 6fe666b21..5e10ebed5 100644 --- a/helix-dap/examples/dap-lldb.rs +++ b/helix-dap/examples/dap-lldb.rs @@ -35,7 +35,7 @@ pub async fn main() -> Result<()> { .apply() .expect("Failed to set up logging"); - let client = Client::tcp("127.0.0.1:7777".parse::().unwrap(), 0).await; + let client = Client::tcp_process("lldb-vscode", vec![], "-p {}", 0).await; println!("create: {:?}", client); let mut client = client?; diff --git a/helix-dap/src/client.rs b/helix-dap/src/client.rs index 8339b953f..080061cff 100644 --- a/helix-dap/src/client.rs +++ b/helix-dap/src/client.rs @@ -5,11 +5,18 @@ use crate::{ use log::{error, info}; use serde::{Deserialize, Serialize}; use serde_json::{from_value, to_value, Value}; -use std::sync::{ - atomic::{AtomicU64, Ordering}, - Arc, +use std::{ + collections::HashMap, + net::{IpAddr, Ipv4Addr}, + process::Stdio, +}; +use std::{ + net::SocketAddr, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, }; -use std::{collections::HashMap, process::Stdio}; use tokio::{ io::{AsyncBufRead, AsyncWrite, BufReader, BufWriter}, join, @@ -19,6 +26,7 @@ use tokio::{ mpsc::{channel, Receiver, Sender, UnboundedReceiver, UnboundedSender}, Mutex, }, + time, }; #[derive(Debug, PartialEq, Clone, Deserialize, Serialize)] @@ -376,6 +384,82 @@ impl Client { Ok(client) } + pub async fn tcp(addr: std::net::SocketAddr, id: usize) -> Result { + let stream = TcpStream::connect(addr).await?; + let (rx, tx) = stream.into_split(); + Self::streams(Box::new(BufReader::new(rx)), Box::new(tx), id, None) + } + + pub fn stdio(cmd: &str, args: Vec<&str>, id: usize) -> Result { + let process = Command::new(cmd) + .args(args) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + // make sure the process is reaped on drop + .kill_on_drop(true) + .spawn(); + + let mut process = process?; + + // TODO: do we need bufreader/writer here? or do we use async wrappers on unblock? + let writer = BufWriter::new(process.stdin.take().expect("Failed to open stdin")); + let reader = BufReader::new(process.stdout.take().expect("Failed to open stdout")); + + Self::streams( + Box::new(BufReader::new(reader)), + Box::new(writer), + id, + Some(process), + ) + } + + async fn get_port() -> Option { + Some( + tokio::net::TcpListener::bind(SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), + 0, + )) + .await + .ok()? + .local_addr() + .ok()? + .port(), + ) + } + + pub async fn tcp_process( + cmd: &str, + args: Vec<&str>, + port_format: &str, + id: usize, + ) -> Result { + let port = Self::get_port().await.unwrap(); + + let process = Command::new(cmd) + .args(args) + .args(port_format.replace("{}", &port.to_string()).split(' ')) + // make sure the process is reaped on drop + .kill_on_drop(true) + .spawn()?; + + // Wait for adapter to become ready for connection + time::sleep(time::Duration::from_millis(500)).await; + + let stream = TcpStream::connect(SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), + port, + )) + .await?; + + let (rx, tx) = stream.into_split(); + Self::streams( + Box::new(BufReader::new(rx)), + Box::new(tx), + id, + Some(process), + ) + } + async fn recv( awaited_events: Arc>>>, mut server_rx: UnboundedReceiver, @@ -413,35 +497,6 @@ impl Client { tx } - pub async fn tcp(addr: std::net::SocketAddr, id: usize) -> Result { - let stream = TcpStream::connect(addr).await?; - let (rx, tx) = stream.into_split(); - Self::streams(Box::new(BufReader::new(rx)), Box::new(tx), id, None) - } - - pub fn stdio(cmd: &str, args: Vec<&str>, id: usize) -> Result { - let process = Command::new(cmd) - .args(args) - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - // make sure the process is reaped on drop - .kill_on_drop(true) - .spawn(); - - let mut process = process?; - - // TODO: do we need bufreader/writer here? or do we use async wrappers on unblock? - let writer = BufWriter::new(process.stdin.take().expect("Failed to open stdin")); - let reader = BufReader::new(process.stdout.take().expect("Failed to open stdout")); - - Self::streams( - Box::new(BufReader::new(reader)), - Box::new(writer), - id, - Some(process), - ) - } - pub fn id(&self) -> usize { self.id }