dap: make transport IO-agnostic

pull/574/head
Dmitry Sharshakov 3 years ago committed by Blaž Hrastnik
parent e11b67b0db
commit 26a55dcefd

@ -280,7 +280,7 @@ impl Client {
let writer = BufWriter::new(process.stdin.take().expect("Failed to open stdin")); let writer = BufWriter::new(process.stdin.take().expect("Failed to open stdin"));
let reader = BufReader::new(process.stdout.take().expect("Failed to open stdout")); let reader = BufReader::new(process.stdout.take().expect("Failed to open stdout"));
let (server_rx, server_tx) = Transport::start(reader, writer, id); let (server_rx, server_tx) = Transport::start(Box::new(reader), Box::new(writer), id);
let client = Self { let client = Self {
id, id,

@ -1,272 +1,271 @@
use crate::{Error, Result}; use crate::{Error, Result};
use anyhow::Context; use anyhow::Context;
use log::{error, info}; use log::{error, info};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::Value; use serde_json::Value;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use tokio::{ use tokio::{
io::{AsyncBufRead, AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufWriter}, io::{AsyncBufRead, AsyncBufReadExt, AsyncReadExt, AsyncWrite, AsyncWriteExt},
process::{ChildStdin, ChildStdout}, sync::{
sync::{ mpsc::{unbounded_channel, Sender, UnboundedReceiver, UnboundedSender},
mpsc::{unbounded_channel, Sender, UnboundedReceiver, UnboundedSender}, Mutex,
Mutex, },
}, };
};
#[derive(Debug, Clone, Deserialize, Serialize)]
#[derive(Debug, Clone, Deserialize, Serialize)] pub struct Request {
pub struct Request { #[serde(skip)]
#[serde(skip)] pub back_ch: Option<Sender<Result<Response>>>,
pub back_ch: Option<Sender<Result<Response>>>, pub seq: u64,
pub seq: u64, #[serde(rename = "type")]
#[serde(rename = "type")] pub msg_type: String,
pub msg_type: String, pub command: String,
pub command: String, pub arguments: Option<Value>,
pub arguments: Option<Value>, }
}
#[derive(Debug, PartialEq, Clone, Deserialize, Serialize)]
#[derive(Debug, PartialEq, Clone, Deserialize, Serialize)] pub struct Response {
pub struct Response { pub seq: u64,
pub seq: u64, #[serde(rename = "type")]
#[serde(rename = "type")] pub msg_type: String,
pub msg_type: String, pub request_seq: u64,
pub request_seq: u64, pub success: bool,
pub success: bool, pub command: String,
pub command: String, pub message: Option<String>,
pub message: Option<String>, pub body: Option<Value>,
pub body: Option<Value>, }
}
#[derive(Debug, PartialEq, Clone, Deserialize, Serialize)]
#[derive(Debug, PartialEq, Clone, Deserialize, Serialize)] pub struct Event {
pub struct Event { pub seq: u64,
pub seq: u64, #[serde(rename = "type")]
#[serde(rename = "type")] pub msg_type: String,
pub msg_type: String, pub event: String,
pub event: String, pub body: Option<Value>,
pub body: Option<Value>, }
}
#[derive(Debug, Clone, Deserialize, Serialize)]
#[derive(Debug, Clone, Deserialize, Serialize)] #[serde(untagged)]
#[serde(untagged)] pub enum Payload {
pub enum Payload { // type = "event"
// type = "event" Event(Event),
Event(Event), // type = "response"
// type = "response" Response(Response),
Response(Response), // type = "request"
// type = "request" Request(Request),
Request(Request), }
}
#[derive(Debug)]
#[derive(Debug)] pub struct Transport {
pub struct Transport { id: usize,
id: usize, pending_requests: Mutex<HashMap<u64, Sender<Result<Response>>>>,
pending_requests: Mutex<HashMap<u64, Sender<Result<Response>>>>, }
}
impl Transport {
impl Transport { pub fn start(
pub fn start( server_stdout: Box<dyn AsyncBufRead + Unpin + Send>,
server_stdout: BufReader<ChildStdout>, server_stdin: Box<dyn AsyncWrite + Unpin + Send>,
server_stdin: BufWriter<ChildStdin>, id: usize,
id: usize, ) -> (UnboundedReceiver<Payload>, UnboundedSender<Request>) {
) -> (UnboundedReceiver<Payload>, UnboundedSender<Request>) { let (client_tx, rx) = unbounded_channel();
let (client_tx, rx) = unbounded_channel(); let (tx, client_rx) = unbounded_channel();
let (tx, client_rx) = unbounded_channel();
let transport = Self {
let transport = Self { id,
id, pending_requests: Mutex::new(HashMap::default()),
pending_requests: Mutex::new(HashMap::default()), };
};
let transport = Arc::new(transport);
let transport = Arc::new(transport);
tokio::spawn(Self::recv(transport.clone(), server_stdout, client_tx));
tokio::spawn(Self::recv(transport.clone(), server_stdout, client_tx)); tokio::spawn(Self::send(transport, server_stdin, client_rx));
tokio::spawn(Self::send(transport, server_stdin, client_rx));
(rx, tx)
(rx, tx) }
}
async fn recv_server_message(
async fn recv_server_message( reader: &mut Box<dyn AsyncBufRead + Unpin + Send>,
reader: &mut (impl AsyncBufRead + Unpin + Send), buffer: &mut String,
buffer: &mut String, ) -> Result<Payload> {
) -> Result<Payload> { let mut content_length = None;
let mut content_length = None; loop {
loop { buffer.truncate(0);
buffer.truncate(0); reader.read_line(buffer).await?;
reader.read_line(buffer).await?; let header = buffer.trim();
let header = buffer.trim();
if header.is_empty() {
if header.is_empty() { break;
break; }
}
let mut parts = header.split(": ");
let mut parts = header.split(": ");
match (parts.next(), parts.next(), parts.next()) {
match (parts.next(), parts.next(), parts.next()) { (Some("Content-Length"), Some(value), None) => {
(Some("Content-Length"), Some(value), None) => { content_length = Some(value.parse().context("invalid content length")?);
content_length = Some(value.parse().context("invalid content length")?); }
} (Some(_), Some(_), None) => {}
(Some(_), Some(_), None) => {} _ => {
_ => { return Err(std::io::Error::new(
return Err(std::io::Error::new( std::io::ErrorKind::Other,
std::io::ErrorKind::Other, "Failed to parse header",
"Failed to parse header", )
) .into());
.into()); }
} }
} }
}
let content_length = content_length.context("missing content length")?;
let content_length = content_length.context("missing content length")?;
//TODO: reuse vector
//TODO: reuse vector let mut content = vec![0; content_length];
let mut content = vec![0; content_length]; reader.read_exact(&mut content).await?;
reader.read_exact(&mut content).await?; let msg = std::str::from_utf8(&content).context("invalid utf8 from server")?;
let msg = std::str::from_utf8(&content).context("invalid utf8 from server")?;
info!("<- DAP {}", msg);
info!("<- DAP {}", msg);
// try parsing as output (server response) or call (server request)
// try parsing as output (server response) or call (server request) let output: serde_json::Result<Payload> = serde_json::from_str(msg);
let output: serde_json::Result<Payload> = serde_json::from_str(msg);
Ok(output?)
Ok(output?) }
}
async fn send_payload_to_server(
async fn send_payload_to_server( &self,
&self, server_stdin: &mut Box<dyn AsyncWrite + Unpin + Send>,
server_stdin: &mut BufWriter<ChildStdin>, req: Request,
req: Request, ) -> Result<()> {
) -> Result<()> { let json = serde_json::to_string(&req)?;
let json = serde_json::to_string(&req)?; if let Some(back) = req.back_ch {
if let Some(back) = req.back_ch { self.pending_requests.lock().await.insert(req.seq, back);
self.pending_requests.lock().await.insert(req.seq, back); }
} self.send_string_to_server(server_stdin, json).await
self.send_string_to_server(server_stdin, json).await }
}
async fn send_string_to_server(
async fn send_string_to_server( &self,
&self, server_stdin: &mut Box<dyn AsyncWrite + Unpin + Send>,
server_stdin: &mut BufWriter<ChildStdin>, request: String,
request: String, ) -> Result<()> {
) -> Result<()> { info!("-> DAP {}", request);
info!("-> DAP {}", request);
// send the headers
// send the headers server_stdin
server_stdin .write_all(format!("Content-Length: {}\r\n\r\n", request.len()).as_bytes())
.write_all(format!("Content-Length: {}\r\n\r\n", request.len()).as_bytes()) .await?;
.await?;
// send the body
// send the body server_stdin.write_all(request.as_bytes()).await?;
server_stdin.write_all(request.as_bytes()).await?;
server_stdin.flush().await?;
server_stdin.flush().await?;
Ok(())
Ok(()) }
}
async fn process_server_message(
async fn process_server_message( &self,
&self, client_tx: &UnboundedSender<Payload>,
client_tx: &UnboundedSender<Payload>, msg: Payload,
msg: Payload, ) -> Result<()> {
) -> Result<()> { let (id, result) = match msg {
let (id, result) = match msg { Payload::Response(Response {
Payload::Response(Response { success: true,
success: true, seq,
seq, request_seq,
request_seq, ..
.. }) => {
}) => { info!("<- DAP success ({}, in response to {})", seq, request_seq);
info!("<- DAP success ({}, in response to {})", seq, request_seq); if let Payload::Response(val) = msg {
if let Payload::Response(val) = msg { (request_seq, Ok(val))
(request_seq, Ok(val)) } else {
} else { unreachable!();
unreachable!(); }
} }
} Payload::Response(Response {
Payload::Response(Response { success: false,
success: false, message,
message, body,
body, request_seq,
request_seq, command,
command, ..
.. }) => {
}) => { error!(
error!( "<- DAP error {:?} ({:?}) for command #{} {}",
"<- DAP error {:?} ({:?}) for command #{} {}", message, body, request_seq, command
message, body, request_seq, command );
); (
( request_seq,
request_seq, Err(Error::Other(anyhow::format_err!("{:?}", body))),
Err(Error::Other(anyhow::format_err!("{:?}", body))), )
) }
} Payload::Request(Request {
Payload::Request(Request { ref command,
ref command, ref seq,
ref seq, ..
.. }) => {
}) => { info!("<- DAP request {} #{}", command, seq);
info!("<- DAP request {} #{}", command, seq); client_tx.send(msg).expect("Failed to send");
client_tx.send(msg).expect("Failed to send"); return Ok(());
return Ok(()); }
} Payload::Event(Event {
Payload::Event(Event { ref event, ref seq, ..
ref event, ref seq, .. }) => {
}) => { info!("<- DAP event {} #{}", event, seq);
info!("<- DAP event {} #{}", event, seq); client_tx.send(msg).expect("Failed to send");
client_tx.send(msg).expect("Failed to send"); return Ok(());
return Ok(()); }
} };
};
let tx = self
let tx = self .pending_requests
.pending_requests .lock()
.lock() .await
.await .remove(&id)
.remove(&id) .expect("pending_request with id not found!");
.expect("pending_request with id not found!");
match tx.send(result).await {
match tx.send(result).await { Ok(_) => (),
Ok(_) => (), Err(_) => error!(
Err(_) => error!( "Tried sending response into a closed channel (id={:?}), original request likely timed out",
"Tried sending response into a closed channel (id={:?}), original request likely timed out", id
id ),
), };
};
Ok(())
Ok(()) }
}
async fn recv(
async fn recv( transport: Arc<Self>,
transport: Arc<Self>, mut server_stdout: Box<dyn AsyncBufRead + Unpin + Send>,
mut server_stdout: BufReader<ChildStdout>, client_tx: UnboundedSender<Payload>,
client_tx: UnboundedSender<Payload>, ) {
) { let mut recv_buffer = String::new();
let mut recv_buffer = String::new(); loop {
loop { match Self::recv_server_message(&mut server_stdout, &mut recv_buffer).await {
match Self::recv_server_message(&mut server_stdout, &mut recv_buffer).await { Ok(msg) => {
Ok(msg) => { transport
transport .process_server_message(&client_tx, msg)
.process_server_message(&client_tx, msg) .await
.await .unwrap();
.unwrap(); }
} Err(err) => {
Err(err) => { error!("err: <- {:?}", err);
error!("err: <- {:?}", err); break;
break; }
} }
} }
} }
}
async fn send(
async fn send( transport: Arc<Self>,
transport: Arc<Self>, mut server_stdin: Box<dyn AsyncWrite + Unpin + Send>,
mut server_stdin: BufWriter<ChildStdin>, mut client_rx: UnboundedReceiver<Request>,
mut client_rx: UnboundedReceiver<Request>, ) {
) { while let Some(req) = client_rx.recv().await {
while let Some(req) = client_rx.recv().await { transport
transport .send_payload_to_server(&mut server_stdin, req)
.send_payload_to_server(&mut server_stdin, req) .await
.await .unwrap()
.unwrap() }
} }
} }
}

Loading…
Cancel
Save