Handle stderr

pull/574/head
Blaž Hrastnik 3 years ago
parent b997d2cdeb
commit 7b61c63ece

@ -62,10 +62,11 @@ impl Client {
pub fn streams( pub fn streams(
rx: Box<dyn AsyncBufRead + Unpin + Send>, rx: Box<dyn AsyncBufRead + Unpin + Send>,
tx: Box<dyn AsyncWrite + Unpin + Send>, tx: Box<dyn AsyncWrite + Unpin + Send>,
err: Option<Box<dyn AsyncBufRead + Unpin + Send>>,
id: usize, id: usize,
process: Option<Child>, process: Option<Child>,
) -> Result<(Self, UnboundedReceiver<Payload>)> { ) -> Result<(Self, UnboundedReceiver<Payload>)> {
let (server_rx, server_tx) = Transport::start(rx, tx, id); let (server_rx, server_tx) = Transport::start(rx, tx, err, id);
let (client_rx, client_tx) = unbounded_channel(); let (client_rx, client_tx) = unbounded_channel();
let client = Self { let client = Self {
@ -92,7 +93,7 @@ impl Client {
) -> Result<(Self, UnboundedReceiver<Payload>)> { ) -> Result<(Self, UnboundedReceiver<Payload>)> {
let stream = TcpStream::connect(addr).await?; let stream = TcpStream::connect(addr).await?;
let (rx, tx) = stream.into_split(); let (rx, tx) = stream.into_split();
Self::streams(Box::new(BufReader::new(rx)), Box::new(tx), id, None) Self::streams(Box::new(BufReader::new(rx)), Box::new(tx), None, id, None)
} }
pub fn stdio( pub fn stdio(
@ -113,10 +114,12 @@ impl Client {
// TODO: do we need bufreader/writer here? or do we use async wrappers on unblock? // TODO: do we need bufreader/writer here? or do we use async wrappers on unblock?
let writer = BufWriter::new(process.stdin.take().expect("Failed to open stdin")); let writer = BufWriter::new(process.stdin.take().expect("Failed to open stdin"));
let reader = BufReader::new(process.stdout.take().expect("Failed to open stdout")); let reader = BufReader::new(process.stdout.take().expect("Failed to open stdout"));
let errors = BufReader::new(process.stderr.take().expect("Failed to open stderr"));
Self::streams( Self::streams(
Box::new(BufReader::new(reader)), Box::new(BufReader::new(reader)),
Box::new(writer), Box::new(writer),
Some(Box::new(BufReader::new(errors))),
id, id,
Some(process), Some(process),
) )
@ -167,6 +170,7 @@ impl Client {
Self::streams( Self::streams(
Box::new(BufReader::new(rx)), Box::new(BufReader::new(rx)),
Box::new(tx), Box::new(tx),
None,
id, id,
Some(process), Some(process),
) )

@ -53,6 +53,7 @@ impl Transport {
pub fn start( pub fn start(
server_stdout: Box<dyn AsyncBufRead + Unpin + Send>, server_stdout: Box<dyn AsyncBufRead + Unpin + Send>,
server_stdin: Box<dyn AsyncWrite + Unpin + Send>, server_stdin: Box<dyn AsyncWrite + Unpin + Send>,
server_stderr: Option<Box<dyn AsyncBufRead + Unpin + Send>>,
id: usize, id: usize,
) -> (UnboundedReceiver<Payload>, UnboundedSender<Request>) { ) -> (UnboundedReceiver<Payload>, UnboundedSender<Request>) {
let (client_tx, rx) = unbounded_channel(); let (client_tx, rx) = unbounded_channel();
@ -67,6 +68,9 @@ impl Transport {
tokio::spawn(Self::recv(transport.clone(), server_stdout, client_tx)); tokio::spawn(Self::recv(transport.clone(), server_stdout, client_tx));
tokio::spawn(Self::send(transport, server_stdin, client_rx)); tokio::spawn(Self::send(transport, server_stdin, client_rx));
if let Some(stderr) = server_stderr {
tokio::spawn(Self::err(stderr));
}
(rx, tx) (rx, tx)
} }
@ -117,6 +121,17 @@ impl Transport {
Ok(output?) Ok(output?)
} }
async fn recv_server_error(
err: &mut (impl AsyncBufRead + Unpin + Send),
buffer: &mut String,
) -> Result<()> {
buffer.truncate(0);
err.read_line(buffer).await?;
error!("err <- {}", buffer);
Ok(())
}
async fn send_payload_to_server( async fn send_payload_to_server(
&self, &self,
server_stdin: &mut Box<dyn AsyncWrite + Unpin + Send>, server_stdin: &mut Box<dyn AsyncWrite + Unpin + Send>,
@ -243,4 +258,17 @@ impl Transport {
.unwrap() .unwrap()
} }
} }
async fn err(mut server_stderr: Box<dyn AsyncBufRead + Unpin + Send>) {
let mut recv_buffer = String::new();
loop {
match Self::recv_server_error(&mut server_stderr, &mut recv_buffer).await {
Ok(_) => {}
Err(err) => {
error!("err: <- {:?}", err);
break;
}
}
}
}
} }

Loading…
Cancel
Save