Add reply listen timeouts

Signed-off-by: trivernis <trivernis@protonmail.com>
pull/22/head
trivernis 3 years ago
parent 7b0413704c
commit 628573f051
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

53
Cargo.lock generated

@ -210,6 +210,7 @@ checksum = "a12aa0eb539080d55c3f2d45a67c3b58b6b0773c1a3ca2dfec66d58c97fd66ca"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
@ -232,12 +233,36 @@ version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "88d1c26957f23603395cd326b0ffe64124b818f4449552f960d815cfba83a53d"
[[package]]
name = "futures-executor"
version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "45025be030969d763025784f7f355043dc6bc74093e4ecc5000ca4dc50d8745c"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-io"
version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "522de2a0fe3e380f1bc577ba0474108faf3f6b18321dbf60b3b9c39a75073377"
[[package]]
name = "futures-macro"
version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "18e4a4b95cea4b4ccbcf1c5675ca7c4ee4e9e75eb79944d07defde18068f79bb"
dependencies = [
"autocfg",
"proc-macro-hack",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "futures-sink"
version = "0.3.17"
@ -257,11 +282,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "36568465210a3a6ee45e1f165136d68671471a501e632e9a98d96872222b5481"
dependencies = [
"autocfg",
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
"pin-project-lite",
"pin-utils",
"proc-macro-hack",
"proc-macro-nested",
"slab",
]
[[package]]
@ -435,6 +467,18 @@ dependencies = [
"plotters-backend",
]
[[package]]
name = "proc-macro-hack"
version = "0.5.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5"
[[package]]
name = "proc-macro-nested"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc881b2c22681370c6a780e47af9840ef841837bc98118431d4e1868bd0c1086"
[[package]]
name = "proc-macro2"
version = "1.0.29"
@ -511,11 +555,12 @@ dependencies = [
[[package]]
name = "rmp-ipc"
version = "0.9.2"
version = "0.10.0"
dependencies = [
"async-trait",
"byteorder",
"criterion",
"futures",
"lazy_static",
"rmp-serde",
"serde",
@ -613,6 +658,12 @@ dependencies = [
"serde",
]
[[package]]
name = "slab"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9def91fd1e018fe007022791f865d0ccc9b3a0d5001e01aabb8b40e46000afb5"
[[package]]
name = "syn"
version = "1.0.80"

@ -1,6 +1,6 @@
[package]
name = "rmp-ipc"
version = "0.9.2"
version = "0.10.0"
authors = ["trivernis <trivernis@protonmail.com>"]
edition = "2018"
readme = "README.md"
@ -27,6 +27,7 @@ lazy_static = "1.4.0"
typemap_rev = "0.1.5"
byteorder = "1.4.3"
async-trait = "0.1.51"
futures = "0.3.17"
[dependencies.serde]
version = "1.0.130"

@ -29,6 +29,9 @@ pub enum Error {
#[error("Error response: {0}")]
ErrorEvent(#[from] ErrorEventData),
#[error("Timed out")]
Timeout,
}
impl From<String> for Error {

@ -12,6 +12,7 @@ use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use typemap_rev::{TypeMap, TypeMapKey};
@ -54,6 +55,7 @@ pub struct IPCBuilder<L: AsyncStreamProtocolListener> {
address: Option<L::AddressType>,
namespaces: HashMap<String, Namespace<L::Stream>>,
data: TypeMap,
timeout: Duration,
}
impl<L> IPCBuilder<L>
@ -76,6 +78,7 @@ where
address: None,
namespaces: HashMap::new(),
data: TypeMap::new(),
timeout: Duration::from_secs(60),
}
}
@ -121,6 +124,13 @@ where
self
}
/// Sets the timeout when listening for a response
pub fn timeout(mut self, timeout: Duration) -> Self {
self.timeout = timeout;
self
}
/// Builds an ipc server
#[tracing::instrument(skip(self))]
pub async fn build_server(self) -> Result<()> {
@ -129,6 +139,7 @@ where
namespaces: self.namespaces,
handler: self.handler,
data: self.data,
timeout: self.timeout,
};
server.start(self.address.unwrap()).await?;
@ -146,6 +157,7 @@ where
handler: self.handler,
data,
reply_listeners,
timeout: self.timeout,
};
let ctx = client.connect(self.address.unwrap()).await?;
@ -174,6 +186,7 @@ where
handler: self.handler.clone(),
data: Arc::clone(&data),
reply_listeners: Arc::clone(&reply_listeners),
timeout: self.timeout.clone(),
};
let ctx = client.connect(address.clone()).await?;

@ -7,6 +7,7 @@ use crate::namespaces::namespace::Namespace;
use crate::protocol::AsyncProtocolStream;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::oneshot;
use tokio::sync::RwLock;
use typemap_rev::TypeMap;
@ -20,6 +21,7 @@ pub struct IPCClient<S: AsyncProtocolStream> {
pub(crate) namespaces: HashMap<String, Namespace<S>>,
pub(crate) data: Arc<RwLock<TypeMap>>,
pub(crate) reply_listeners: ReplyListeners,
pub(crate) timeout: Duration,
}
impl<S> IPCClient<S>
@ -39,6 +41,7 @@ where
self.data,
Some(tx),
self.reply_listeners,
self.timeout,
);
let handler = Arc::new(self.handler);
let namespaces = Arc::new(self.namespaces);

@ -2,6 +2,8 @@ use crate::error::{Error, Result};
use crate::event::Event;
use crate::ipc::stream_emitter::StreamEmitter;
use crate::protocol::AsyncProtocolStream;
use futures::future;
use futures::future::Either;
use std::collections::HashMap;
use std::mem;
use std::ops::{Deref, DerefMut};
@ -9,6 +11,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tokio::sync::oneshot::Sender;
use tokio::sync::{oneshot, Mutex, RwLock};
use tokio::time::Duration;
use typemap_rev::TypeMap;
pub(crate) type ReplyListeners = Arc<Mutex<HashMap<u64, oneshot::Sender<Event>>>>;
@ -35,6 +38,8 @@ pub struct Context<S: AsyncProtocolStream> {
stop_sender: Arc<Mutex<Option<Sender<()>>>>,
reply_listeners: ReplyListeners,
reply_timeout: Duration,
}
impl<S> Clone for Context<S>
@ -47,6 +52,7 @@ where
data: Arc::clone(&self.data),
stop_sender: Arc::clone(&self.stop_sender),
reply_listeners: Arc::clone(&self.reply_listeners),
reply_timeout: self.reply_timeout.clone(),
}
}
}
@ -60,12 +66,14 @@ where
data: Arc<RwLock<TypeMap>>,
stop_sender: Option<Sender<()>>,
reply_listeners: ReplyListeners,
reply_timeout: Duration,
) -> Self {
Self {
emitter,
reply_listeners,
data,
stop_sender: Arc::new(Mutex::new(stop_sender)),
reply_timeout,
}
}
@ -77,7 +85,20 @@ where
let mut listeners = self.reply_listeners.lock().await;
listeners.insert(message_id, rx);
}
let event = tx.await?;
let result = future::select(
Box::pin(tx),
Box::pin(tokio::time::sleep(self.reply_timeout)),
)
.await;
let event = match result {
Either::Left((tx_result, _)) => Ok(tx_result?),
Either::Right(_) => {
let mut listeners = self.reply_listeners.lock().await;
listeners.remove(&message_id);
Err(Error::Timeout)
}
}?;
Ok(event)
}

@ -7,6 +7,7 @@ use crate::namespaces::namespace::Namespace;
use crate::protocol::{AsyncProtocolStreamSplit, AsyncStreamProtocolListener};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use typemap_rev::TypeMap;
@ -17,6 +18,7 @@ pub struct IPCServer<L: AsyncStreamProtocolListener> {
pub(crate) handler: EventHandler<L::Stream>,
pub(crate) namespaces: HashMap<String, Namespace<L::Stream>>,
pub(crate) data: TypeMap,
pub(crate) timeout: Duration,
}
impl<L> IPCServer<L>
@ -38,12 +40,19 @@ where
let handler = Arc::clone(&handler);
let namespaces = Arc::clone(&namespaces);
let data = Arc::clone(&data);
let timeout = self.timeout.clone();
tokio::spawn(async {
tokio::spawn(async move {
let (read_half, write_half) = stream.protocol_into_split();
let emitter = StreamEmitter::new(write_half);
let reply_listeners = ReplyListeners::default();
let ctx = Context::new(StreamEmitter::clone(&emitter), data, None, reply_listeners);
let ctx = Context::new(
StreamEmitter::clone(&emitter),
data,
None,
reply_listeners,
timeout.into(),
);
handle_connection(namespaces, handler, read_half, ctx).await;
});

@ -11,6 +11,7 @@ use tokio::net::TcpListener;
use typemap_rev::TypeMapKey;
async fn handle_ping_event<P: AsyncProtocolStream>(ctx: &Context<P>, e: Event) -> IPCResult<()> {
tokio::time::sleep(Duration::from_secs(1)).await;
let mut ping_data = e.data::<PingEventData>()?;
ping_data.time = SystemTime::now();
ping_data.ttl -= 1;
@ -25,6 +26,7 @@ async fn handle_ping_event<P: AsyncProtocolStream>(ctx: &Context<P>, e: Event) -
fn get_builder_with_ping<L: AsyncStreamProtocolListener>(address: L::AddressType) -> IPCBuilder<L> {
IPCBuilder::new()
.on("ping", |ctx, e| Box::pin(handle_ping_event(ctx, e)))
.timeout(Duration::from_secs(10))
.address(address)
}

Loading…
Cancel
Save