|
|
|
@ -5,11 +5,18 @@ use crate::{
|
|
|
|
|
use log::{error, info};
|
|
|
|
|
use serde::{Deserialize, Serialize};
|
|
|
|
|
use serde_json::{from_value, to_value, Value};
|
|
|
|
|
use std::sync::{
|
|
|
|
|
atomic::{AtomicU64, Ordering},
|
|
|
|
|
Arc,
|
|
|
|
|
use std::{
|
|
|
|
|
collections::HashMap,
|
|
|
|
|
net::{IpAddr, Ipv4Addr},
|
|
|
|
|
process::Stdio,
|
|
|
|
|
};
|
|
|
|
|
use std::{
|
|
|
|
|
net::SocketAddr,
|
|
|
|
|
sync::{
|
|
|
|
|
atomic::{AtomicU64, Ordering},
|
|
|
|
|
Arc,
|
|
|
|
|
},
|
|
|
|
|
};
|
|
|
|
|
use std::{collections::HashMap, process::Stdio};
|
|
|
|
|
use tokio::{
|
|
|
|
|
io::{AsyncBufRead, AsyncWrite, BufReader, BufWriter},
|
|
|
|
|
join,
|
|
|
|
@ -19,6 +26,7 @@ use tokio::{
|
|
|
|
|
mpsc::{channel, Receiver, Sender, UnboundedReceiver, UnboundedSender},
|
|
|
|
|
Mutex,
|
|
|
|
|
},
|
|
|
|
|
time,
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
#[derive(Debug, PartialEq, Clone, Deserialize, Serialize)]
|
|
|
|
@ -376,6 +384,82 @@ impl Client {
|
|
|
|
|
Ok(client)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub async fn tcp(addr: std::net::SocketAddr, id: usize) -> Result<Self> {
|
|
|
|
|
let stream = TcpStream::connect(addr).await?;
|
|
|
|
|
let (rx, tx) = stream.into_split();
|
|
|
|
|
Self::streams(Box::new(BufReader::new(rx)), Box::new(tx), id, None)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub fn stdio(cmd: &str, args: Vec<&str>, id: usize) -> Result<Self> {
|
|
|
|
|
let process = Command::new(cmd)
|
|
|
|
|
.args(args)
|
|
|
|
|
.stdin(Stdio::piped())
|
|
|
|
|
.stdout(Stdio::piped())
|
|
|
|
|
// make sure the process is reaped on drop
|
|
|
|
|
.kill_on_drop(true)
|
|
|
|
|
.spawn();
|
|
|
|
|
|
|
|
|
|
let mut process = process?;
|
|
|
|
|
|
|
|
|
|
// TODO: do we need bufreader/writer here? or do we use async wrappers on unblock?
|
|
|
|
|
let writer = BufWriter::new(process.stdin.take().expect("Failed to open stdin"));
|
|
|
|
|
let reader = BufReader::new(process.stdout.take().expect("Failed to open stdout"));
|
|
|
|
|
|
|
|
|
|
Self::streams(
|
|
|
|
|
Box::new(BufReader::new(reader)),
|
|
|
|
|
Box::new(writer),
|
|
|
|
|
id,
|
|
|
|
|
Some(process),
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async fn get_port() -> Option<u16> {
|
|
|
|
|
Some(
|
|
|
|
|
tokio::net::TcpListener::bind(SocketAddr::new(
|
|
|
|
|
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
|
|
|
|
|
0,
|
|
|
|
|
))
|
|
|
|
|
.await
|
|
|
|
|
.ok()?
|
|
|
|
|
.local_addr()
|
|
|
|
|
.ok()?
|
|
|
|
|
.port(),
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub async fn tcp_process(
|
|
|
|
|
cmd: &str,
|
|
|
|
|
args: Vec<&str>,
|
|
|
|
|
port_format: &str,
|
|
|
|
|
id: usize,
|
|
|
|
|
) -> Result<Self> {
|
|
|
|
|
let port = Self::get_port().await.unwrap();
|
|
|
|
|
|
|
|
|
|
let process = Command::new(cmd)
|
|
|
|
|
.args(args)
|
|
|
|
|
.args(port_format.replace("{}", &port.to_string()).split(' '))
|
|
|
|
|
// make sure the process is reaped on drop
|
|
|
|
|
.kill_on_drop(true)
|
|
|
|
|
.spawn()?;
|
|
|
|
|
|
|
|
|
|
// Wait for adapter to become ready for connection
|
|
|
|
|
time::sleep(time::Duration::from_millis(500)).await;
|
|
|
|
|
|
|
|
|
|
let stream = TcpStream::connect(SocketAddr::new(
|
|
|
|
|
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
|
|
|
|
|
port,
|
|
|
|
|
))
|
|
|
|
|
.await?;
|
|
|
|
|
|
|
|
|
|
let (rx, tx) = stream.into_split();
|
|
|
|
|
Self::streams(
|
|
|
|
|
Box::new(BufReader::new(rx)),
|
|
|
|
|
Box::new(tx),
|
|
|
|
|
id,
|
|
|
|
|
Some(process),
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async fn recv(
|
|
|
|
|
awaited_events: Arc<Mutex<HashMap<String, Sender<Event>>>>,
|
|
|
|
|
mut server_rx: UnboundedReceiver<Payload>,
|
|
|
|
@ -413,35 +497,6 @@ impl Client {
|
|
|
|
|
tx
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub async fn tcp(addr: std::net::SocketAddr, id: usize) -> Result<Self> {
|
|
|
|
|
let stream = TcpStream::connect(addr).await?;
|
|
|
|
|
let (rx, tx) = stream.into_split();
|
|
|
|
|
Self::streams(Box::new(BufReader::new(rx)), Box::new(tx), id, None)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub fn stdio(cmd: &str, args: Vec<&str>, id: usize) -> Result<Self> {
|
|
|
|
|
let process = Command::new(cmd)
|
|
|
|
|
.args(args)
|
|
|
|
|
.stdin(Stdio::piped())
|
|
|
|
|
.stdout(Stdio::piped())
|
|
|
|
|
// make sure the process is reaped on drop
|
|
|
|
|
.kill_on_drop(true)
|
|
|
|
|
.spawn();
|
|
|
|
|
|
|
|
|
|
let mut process = process?;
|
|
|
|
|
|
|
|
|
|
// TODO: do we need bufreader/writer here? or do we use async wrappers on unblock?
|
|
|
|
|
let writer = BufWriter::new(process.stdin.take().expect("Failed to open stdin"));
|
|
|
|
|
let reader = BufReader::new(process.stdout.take().expect("Failed to open stdout"));
|
|
|
|
|
|
|
|
|
|
Self::streams(
|
|
|
|
|
Box::new(BufReader::new(reader)),
|
|
|
|
|
Box::new(writer),
|
|
|
|
|
id,
|
|
|
|
|
Some(process),
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub fn id(&self) -> usize {
|
|
|
|
|
self.id
|
|
|
|
|
}
|
|
|
|
|