diff --git a/Cargo.lock b/Cargo.lock index a5fc41a8..5db20e0f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -210,6 +210,7 @@ checksum = "a12aa0eb539080d55c3f2d45a67c3b58b6b0773c1a3ca2dfec66d58c97fd66ca" dependencies = [ "futures-channel", "futures-core", + "futures-executor", "futures-io", "futures-sink", "futures-task", @@ -232,12 +233,36 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "88d1c26957f23603395cd326b0ffe64124b818f4449552f960d815cfba83a53d" +[[package]] +name = "futures-executor" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45025be030969d763025784f7f355043dc6bc74093e4ecc5000ca4dc50d8745c" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + [[package]] name = "futures-io" version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "522de2a0fe3e380f1bc577ba0474108faf3f6b18321dbf60b3b9c39a75073377" +[[package]] +name = "futures-macro" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18e4a4b95cea4b4ccbcf1c5675ca7c4ee4e9e75eb79944d07defde18068f79bb" +dependencies = [ + "autocfg", + "proc-macro-hack", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "futures-sink" version = "0.3.17" @@ -257,11 +282,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "36568465210a3a6ee45e1f165136d68671471a501e632e9a98d96872222b5481" dependencies = [ "autocfg", + "futures-channel", "futures-core", + "futures-io", + "futures-macro", "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", + "proc-macro-hack", + "proc-macro-nested", + "slab", ] [[package]] @@ -435,6 +467,18 @@ dependencies = [ "plotters-backend", ] +[[package]] +name = "proc-macro-hack" +version = "0.5.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5" + +[[package]] +name = "proc-macro-nested" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc881b2c22681370c6a780e47af9840ef841837bc98118431d4e1868bd0c1086" + [[package]] name = "proc-macro2" version = "1.0.29" @@ -511,11 +555,12 @@ dependencies = [ [[package]] name = "rmp-ipc" -version = "0.9.2" +version = "0.10.0" dependencies = [ "async-trait", "byteorder", "criterion", + "futures", "lazy_static", "rmp-serde", "serde", @@ -613,6 +658,12 @@ dependencies = [ "serde", ] +[[package]] +name = "slab" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9def91fd1e018fe007022791f865d0ccc9b3a0d5001e01aabb8b40e46000afb5" + [[package]] name = "syn" version = "1.0.80" diff --git a/Cargo.toml b/Cargo.toml index 300b0499..84ce761a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rmp-ipc" -version = "0.9.2" +version = "0.10.0" authors = ["trivernis "] edition = "2018" readme = "README.md" @@ -27,6 +27,7 @@ lazy_static = "1.4.0" typemap_rev = "0.1.5" byteorder = "1.4.3" async-trait = "0.1.51" +futures = "0.3.17" [dependencies.serde] version = "1.0.130" diff --git a/src/error.rs b/src/error.rs index 22d01973..113ba491 100644 --- a/src/error.rs +++ b/src/error.rs @@ -29,6 +29,9 @@ pub enum Error { #[error("Error response: {0}")] ErrorEvent(#[from] ErrorEventData), + + #[error("Timed out")] + Timeout, } impl From for Error { diff --git a/src/ipc/builder.rs b/src/ipc/builder.rs index dcfe6948..f25f8409 100644 --- a/src/ipc/builder.rs +++ b/src/ipc/builder.rs @@ -12,6 +12,7 @@ use std::collections::HashMap; use std::future::Future; use std::pin::Pin; use std::sync::Arc; +use std::time::Duration; use tokio::sync::RwLock; use typemap_rev::{TypeMap, TypeMapKey}; @@ -54,6 +55,7 @@ pub struct IPCBuilder { address: Option, namespaces: HashMap>, data: TypeMap, + timeout: Duration, } impl IPCBuilder @@ -76,6 +78,7 @@ where address: None, namespaces: HashMap::new(), data: TypeMap::new(), + timeout: Duration::from_secs(60), } } @@ -121,6 +124,13 @@ where self } + /// Sets the timeout when listening for a response + pub fn timeout(mut self, timeout: Duration) -> Self { + self.timeout = timeout; + + self + } + /// Builds an ipc server #[tracing::instrument(skip(self))] pub async fn build_server(self) -> Result<()> { @@ -129,6 +139,7 @@ where namespaces: self.namespaces, handler: self.handler, data: self.data, + timeout: self.timeout, }; server.start(self.address.unwrap()).await?; @@ -146,6 +157,7 @@ where handler: self.handler, data, reply_listeners, + timeout: self.timeout, }; let ctx = client.connect(self.address.unwrap()).await?; @@ -174,6 +186,7 @@ where handler: self.handler.clone(), data: Arc::clone(&data), reply_listeners: Arc::clone(&reply_listeners), + timeout: self.timeout.clone(), }; let ctx = client.connect(address.clone()).await?; diff --git a/src/ipc/client.rs b/src/ipc/client.rs index d8cc9b6b..f0a2f329 100644 --- a/src/ipc/client.rs +++ b/src/ipc/client.rs @@ -7,6 +7,7 @@ use crate::namespaces::namespace::Namespace; use crate::protocol::AsyncProtocolStream; use std::collections::HashMap; use std::sync::Arc; +use std::time::Duration; use tokio::sync::oneshot; use tokio::sync::RwLock; use typemap_rev::TypeMap; @@ -20,6 +21,7 @@ pub struct IPCClient { pub(crate) namespaces: HashMap>, pub(crate) data: Arc>, pub(crate) reply_listeners: ReplyListeners, + pub(crate) timeout: Duration, } impl IPCClient @@ -39,6 +41,7 @@ where self.data, Some(tx), self.reply_listeners, + self.timeout, ); let handler = Arc::new(self.handler); let namespaces = Arc::new(self.namespaces); diff --git a/src/ipc/context.rs b/src/ipc/context.rs index aa341a90..65eee3e1 100644 --- a/src/ipc/context.rs +++ b/src/ipc/context.rs @@ -2,6 +2,8 @@ use crate::error::{Error, Result}; use crate::event::Event; use crate::ipc::stream_emitter::StreamEmitter; use crate::protocol::AsyncProtocolStream; +use futures::future; +use futures::future::Either; use std::collections::HashMap; use std::mem; use std::ops::{Deref, DerefMut}; @@ -9,6 +11,7 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use tokio::sync::oneshot::Sender; use tokio::sync::{oneshot, Mutex, RwLock}; +use tokio::time::Duration; use typemap_rev::TypeMap; pub(crate) type ReplyListeners = Arc>>>; @@ -35,6 +38,8 @@ pub struct Context { stop_sender: Arc>>>, reply_listeners: ReplyListeners, + + reply_timeout: Duration, } impl Clone for Context @@ -47,6 +52,7 @@ where data: Arc::clone(&self.data), stop_sender: Arc::clone(&self.stop_sender), reply_listeners: Arc::clone(&self.reply_listeners), + reply_timeout: self.reply_timeout.clone(), } } } @@ -60,12 +66,14 @@ where data: Arc>, stop_sender: Option>, reply_listeners: ReplyListeners, + reply_timeout: Duration, ) -> Self { Self { emitter, reply_listeners, data, stop_sender: Arc::new(Mutex::new(stop_sender)), + reply_timeout, } } @@ -77,7 +85,20 @@ where let mut listeners = self.reply_listeners.lock().await; listeners.insert(message_id, rx); } - let event = tx.await?; + let result = future::select( + Box::pin(tx), + Box::pin(tokio::time::sleep(self.reply_timeout)), + ) + .await; + + let event = match result { + Either::Left((tx_result, _)) => Ok(tx_result?), + Either::Right(_) => { + let mut listeners = self.reply_listeners.lock().await; + listeners.remove(&message_id); + Err(Error::Timeout) + } + }?; Ok(event) } diff --git a/src/ipc/server.rs b/src/ipc/server.rs index 095310df..904cd488 100644 --- a/src/ipc/server.rs +++ b/src/ipc/server.rs @@ -7,6 +7,7 @@ use crate::namespaces::namespace::Namespace; use crate::protocol::{AsyncProtocolStreamSplit, AsyncStreamProtocolListener}; use std::collections::HashMap; use std::sync::Arc; +use std::time::Duration; use tokio::sync::RwLock; use typemap_rev::TypeMap; @@ -17,6 +18,7 @@ pub struct IPCServer { pub(crate) handler: EventHandler, pub(crate) namespaces: HashMap>, pub(crate) data: TypeMap, + pub(crate) timeout: Duration, } impl IPCServer @@ -38,12 +40,19 @@ where let handler = Arc::clone(&handler); let namespaces = Arc::clone(&namespaces); let data = Arc::clone(&data); + let timeout = self.timeout.clone(); - tokio::spawn(async { + tokio::spawn(async move { let (read_half, write_half) = stream.protocol_into_split(); let emitter = StreamEmitter::new(write_half); let reply_listeners = ReplyListeners::default(); - let ctx = Context::new(StreamEmitter::clone(&emitter), data, None, reply_listeners); + let ctx = Context::new( + StreamEmitter::clone(&emitter), + data, + None, + reply_listeners, + timeout.into(), + ); handle_connection(namespaces, handler, read_half, ctx).await; }); diff --git a/src/tests/ipc_tests.rs b/src/tests/ipc_tests.rs index 4541cf97..633923a0 100644 --- a/src/tests/ipc_tests.rs +++ b/src/tests/ipc_tests.rs @@ -11,6 +11,7 @@ use tokio::net::TcpListener; use typemap_rev::TypeMapKey; async fn handle_ping_event(ctx: &Context

, e: Event) -> IPCResult<()> { + tokio::time::sleep(Duration::from_secs(1)).await; let mut ping_data = e.data::()?; ping_data.time = SystemTime::now(); ping_data.ttl -= 1; @@ -25,6 +26,7 @@ async fn handle_ping_event(ctx: &Context

, e: Event) - fn get_builder_with_ping(address: L::AddressType) -> IPCBuilder { IPCBuilder::new() .on("ping", |ctx, e| Box::pin(handle_ping_event(ctx, e))) + .timeout(Duration::from_secs(10)) .address(address) }