diff --git a/Cargo.lock b/Cargo.lock index ad64e523..2bd55d12 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -129,6 +129,7 @@ dependencies = [ "chacha20poly1305", "criterion", "crossbeam-utils", + "dashmap", "futures", "futures-core", "lazy_static", @@ -399,6 +400,17 @@ dependencies = [ "zeroize", ] +[[package]] +name = "dashmap" +version = "5.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c8858831f7781322e539ea39e72449c46b059638250c14344fec8d0aa6e539c" +dependencies = [ + "cfg-if", + "num_cpus", + "parking_lot", +] + [[package]] name = "digest" version = "0.9.0" @@ -778,6 +790,29 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" +[[package]] +name = "parking_lot" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87f5ec2493a61ac0506c0f4199f99070cbe83857b0337006a30f3e6719b8ef58" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28141e0cc4143da2443301914478dc976a61ffdb3f043058310c70df2fed8954" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-sys", +] + [[package]] name = "pin-project-lite" version = "0.2.8" @@ -950,6 +985,15 @@ dependencies = [ "num_cpus", ] +[[package]] +name = "redox_syscall" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ae183fc1b06c149f0c1793e1eb447c8b04bfe46d48e9e48bfb8d2d7ed64ecf0" +dependencies = [ + "bitflags", +] + [[package]] name = "regex" version = "1.5.5" @@ -1539,6 +1583,49 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows-sys" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3df6e476185f92a12c072be4a189a0210dcdcf512a1891d6dff9edb874deadc6" +dependencies = [ + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_msvc" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8e92753b1c443191654ec532f14c199742964a061be25d77d7a96f09db20bf5" + +[[package]] +name = "windows_i686_gnu" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a711c68811799e017b6038e0922cb27a5e2f43a2ddb609fe0b6f3eeda9de615" + +[[package]] +name = "windows_i686_msvc" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "146c11bb1a02615db74680b32a68e2d61f553cc24c4eb5b4ca10311740e44172" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c912b12f7454c6620635bbff3450962753834be2a594819bd5e945af18ec64bc" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "504a2476202769977a040c6364301a3f65d0cc9e3fb08600b2bda150a0488316" + [[package]] name = "x25519-dalek" version = "1.2.0" diff --git a/Cargo.toml b/Cargo.toml index 09f447ff..2182ea62 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,7 @@ num_enum = "0.5.7" futures-core = "0.3.21" trait-bound-typemap = "0.3.3" bytes = "1.1.0" +dashmap = "5.2.0" rmp-serde = { version = "1.0.0", optional = true } bincode = { version = "1.3.3", optional = true } serde_json = { version = "1.0.79", optional = true } diff --git a/src/ipc/builder.rs b/src/ipc/builder.rs index 83cba3ce..566d6ffa 100644 --- a/src/ipc/builder.rs +++ b/src/ipc/builder.rs @@ -249,7 +249,7 @@ where namespaces: self.namespaces.clone(), handler: self.handler.clone(), data: Arc::clone(&data), - reply_listeners: Arc::clone(&reply_listeners), + reply_listeners: reply_listeners.clone(), timeout: self.timeout.clone(), #[cfg(feature = "serialize")] diff --git a/src/ipc/context.rs b/src/ipc/context.rs index a44ead8a..540302b9 100644 --- a/src/ipc/context.rs +++ b/src/ipc/context.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use dashmap::DashMap; use std::mem; use std::ops::{Deref, DerefMut}; use std::sync::atomic::{AtomicUsize, Ordering}; @@ -18,7 +18,7 @@ use crate::payload::IntoPayload; use crate::payload::{DynamicSerializer, SerdePayload}; use crate::prelude::Response; -pub(crate) type ReplyListeners = Arc>>>; +pub(crate) type ReplyListeners = Arc>>; /// An object provided to each callback function. /// Currently it only holds the event emitter to emit response events in event callbacks. @@ -126,10 +126,7 @@ impl Context { #[tracing::instrument(level = "debug", skip(self))] pub(crate) async fn register_reply_listener(&self, event_id: u64) -> Result> { let (rx, tx) = mpsc::channel(8); - { - let mut listeners = self.reply_listeners.lock().await; - listeners.insert(event_id, rx); - } + self.reply_listeners.insert(event_id, rx); Ok(tx) } @@ -154,8 +151,7 @@ impl Context { /// Returns the channel for a reply to the given message id #[inline] pub(crate) async fn get_reply_sender(&self, ref_id: u64) -> Option> { - let listeners = self.reply_listeners.lock().await; - listeners.get(&ref_id).cloned() + self.reply_listeners.get(&ref_id).map(|e| e.value().clone()) } #[inline] diff --git a/src/ipc/mod.rs b/src/ipc/mod.rs index 8efcc30f..b663a64b 100644 --- a/src/ipc/mod.rs +++ b/src/ipc/mod.rs @@ -75,8 +75,7 @@ fn handle_event(mut ctx: Context, handler: Arc, event: Event) { { tracing::error!("Error occurred when sending error response: {:?}", e); } - let mut reply_listeners = ctx.reply_listeners.lock().await; - reply_listeners.remove(&event_id); + ctx.reply_listeners.remove(&event_id); } Err(e) => { // emit an error event diff --git a/src/ipc/stream_emitter/emit_metadata_with_response.rs b/src/ipc/stream_emitter/emit_metadata_with_response.rs index 29a8c2c0..896735d1 100644 --- a/src/ipc/stream_emitter/emit_metadata_with_response.rs +++ b/src/ipc/stream_emitter/emit_metadata_with_response.rs @@ -79,6 +79,5 @@ impl Future for EmitMetadataWithResponse } pub(crate) async fn remove_reply_listener(ctx: &Context, event_id: u64) { - let mut listeners = ctx.reply_listeners.lock().await; - listeners.remove(&event_id); + ctx.reply_listeners.remove(&event_id); } diff --git a/tests/test_event_streams.rs b/tests/test_event_streams.rs index dfdbbd51..1c351d98 100644 --- a/tests/test_event_streams.rs +++ b/tests/test_event_streams.rs @@ -29,12 +29,14 @@ async fn it_receives_responses() { .unwrap(); for i in 0u8..=100 { - if let Some(Ok(event)) = reply_stream.next().await { + if let Some(result) = reply_stream.next().await { + let event = result.unwrap(); assert_eq!(event.payload::().unwrap().0, i) } else { panic!("stream 1 has no value {}", i); } - if let Some(Ok(event)) = reply_stream_2.next().await { + if let Some(result) = reply_stream_2.next().await { + let event = result.unwrap(); assert_eq!(event.payload::().unwrap().0, i) } else { panic!("stream 2 has no value {}", i);