Replace hashmap for replies with dashmap

Signed-off-by: trivernis <trivernis@protonmail.com>
pull/39/head
trivernis 3 years ago
parent 5a0c829602
commit ae52798cd0
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

87
Cargo.lock generated

@ -129,6 +129,7 @@ dependencies = [
"chacha20poly1305", "chacha20poly1305",
"criterion", "criterion",
"crossbeam-utils", "crossbeam-utils",
"dashmap",
"futures", "futures",
"futures-core", "futures-core",
"lazy_static", "lazy_static",
@ -399,6 +400,17 @@ dependencies = [
"zeroize", "zeroize",
] ]
[[package]]
name = "dashmap"
version = "5.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c8858831f7781322e539ea39e72449c46b059638250c14344fec8d0aa6e539c"
dependencies = [
"cfg-if",
"num_cpus",
"parking_lot",
]
[[package]] [[package]]
name = "digest" name = "digest"
version = "0.9.0" version = "0.9.0"
@ -778,6 +790,29 @@ version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5"
[[package]]
name = "parking_lot"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87f5ec2493a61ac0506c0f4199f99070cbe83857b0337006a30f3e6719b8ef58"
dependencies = [
"lock_api",
"parking_lot_core",
]
[[package]]
name = "parking_lot_core"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28141e0cc4143da2443301914478dc976a61ffdb3f043058310c70df2fed8954"
dependencies = [
"cfg-if",
"libc",
"redox_syscall",
"smallvec",
"windows-sys",
]
[[package]] [[package]]
name = "pin-project-lite" name = "pin-project-lite"
version = "0.2.8" version = "0.2.8"
@ -950,6 +985,15 @@ dependencies = [
"num_cpus", "num_cpus",
] ]
[[package]]
name = "redox_syscall"
version = "0.2.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ae183fc1b06c149f0c1793e1eb447c8b04bfe46d48e9e48bfb8d2d7ed64ecf0"
dependencies = [
"bitflags",
]
[[package]] [[package]]
name = "regex" name = "regex"
version = "1.5.5" version = "1.5.5"
@ -1539,6 +1583,49 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "windows-sys"
version = "0.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3df6e476185f92a12c072be4a189a0210dcdcf512a1891d6dff9edb874deadc6"
dependencies = [
"windows_aarch64_msvc",
"windows_i686_gnu",
"windows_i686_msvc",
"windows_x86_64_gnu",
"windows_x86_64_msvc",
]
[[package]]
name = "windows_aarch64_msvc"
version = "0.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d8e92753b1c443191654ec532f14c199742964a061be25d77d7a96f09db20bf5"
[[package]]
name = "windows_i686_gnu"
version = "0.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a711c68811799e017b6038e0922cb27a5e2f43a2ddb609fe0b6f3eeda9de615"
[[package]]
name = "windows_i686_msvc"
version = "0.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "146c11bb1a02615db74680b32a68e2d61f553cc24c4eb5b4ca10311740e44172"
[[package]]
name = "windows_x86_64_gnu"
version = "0.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c912b12f7454c6620635bbff3450962753834be2a594819bd5e945af18ec64bc"
[[package]]
name = "windows_x86_64_msvc"
version = "0.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "504a2476202769977a040c6364301a3f65d0cc9e3fb08600b2bda150a0488316"
[[package]] [[package]]
name = "x25519-dalek" name = "x25519-dalek"
version = "1.2.0" version = "1.2.0"

@ -29,6 +29,7 @@ num_enum = "0.5.7"
futures-core = "0.3.21" futures-core = "0.3.21"
trait-bound-typemap = "0.3.3" trait-bound-typemap = "0.3.3"
bytes = "1.1.0" bytes = "1.1.0"
dashmap = "5.2.0"
rmp-serde = { version = "1.0.0", optional = true } rmp-serde = { version = "1.0.0", optional = true }
bincode = { version = "1.3.3", optional = true } bincode = { version = "1.3.3", optional = true }
serde_json = { version = "1.0.79", optional = true } serde_json = { version = "1.0.79", optional = true }

@ -249,7 +249,7 @@ where
namespaces: self.namespaces.clone(), namespaces: self.namespaces.clone(),
handler: self.handler.clone(), handler: self.handler.clone(),
data: Arc::clone(&data), data: Arc::clone(&data),
reply_listeners: Arc::clone(&reply_listeners), reply_listeners: reply_listeners.clone(),
timeout: self.timeout.clone(), timeout: self.timeout.clone(),
#[cfg(feature = "serialize")] #[cfg(feature = "serialize")]

@ -1,4 +1,4 @@
use std::collections::HashMap; use dashmap::DashMap;
use std::mem; use std::mem;
use std::ops::{Deref, DerefMut}; use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
@ -18,7 +18,7 @@ use crate::payload::IntoPayload;
use crate::payload::{DynamicSerializer, SerdePayload}; use crate::payload::{DynamicSerializer, SerdePayload};
use crate::prelude::Response; use crate::prelude::Response;
pub(crate) type ReplyListeners = Arc<Mutex<HashMap<u64, mpsc::Sender<Event>>>>; pub(crate) type ReplyListeners = Arc<DashMap<u64, mpsc::Sender<Event>>>;
/// An object provided to each callback function. /// An object provided to each callback function.
/// Currently it only holds the event emitter to emit response events in event callbacks. /// Currently it only holds the event emitter to emit response events in event callbacks.
@ -126,10 +126,7 @@ impl Context {
#[tracing::instrument(level = "debug", skip(self))] #[tracing::instrument(level = "debug", skip(self))]
pub(crate) async fn register_reply_listener(&self, event_id: u64) -> Result<Receiver<Event>> { pub(crate) async fn register_reply_listener(&self, event_id: u64) -> Result<Receiver<Event>> {
let (rx, tx) = mpsc::channel(8); let (rx, tx) = mpsc::channel(8);
{ self.reply_listeners.insert(event_id, rx);
let mut listeners = self.reply_listeners.lock().await;
listeners.insert(event_id, rx);
}
Ok(tx) Ok(tx)
} }
@ -154,8 +151,7 @@ impl Context {
/// Returns the channel for a reply to the given message id /// Returns the channel for a reply to the given message id
#[inline] #[inline]
pub(crate) async fn get_reply_sender(&self, ref_id: u64) -> Option<mpsc::Sender<Event>> { pub(crate) async fn get_reply_sender(&self, ref_id: u64) -> Option<mpsc::Sender<Event>> {
let listeners = self.reply_listeners.lock().await; self.reply_listeners.get(&ref_id).map(|e| e.value().clone())
listeners.get(&ref_id).cloned()
} }
#[inline] #[inline]

@ -75,8 +75,7 @@ fn handle_event(mut ctx: Context, handler: Arc<EventHandler>, event: Event) {
{ {
tracing::error!("Error occurred when sending error response: {:?}", e); tracing::error!("Error occurred when sending error response: {:?}", e);
} }
let mut reply_listeners = ctx.reply_listeners.lock().await; ctx.reply_listeners.remove(&event_id);
reply_listeners.remove(&event_id);
} }
Err(e) => { Err(e) => {
// emit an error event // emit an error event

@ -79,6 +79,5 @@ impl<P: IntoPayload + Send + Sync + 'static> Future for EmitMetadataWithResponse
} }
pub(crate) async fn remove_reply_listener(ctx: &Context, event_id: u64) { pub(crate) async fn remove_reply_listener(ctx: &Context, event_id: u64) {
let mut listeners = ctx.reply_listeners.lock().await; ctx.reply_listeners.remove(&event_id);
listeners.remove(&event_id);
} }

@ -29,12 +29,14 @@ async fn it_receives_responses() {
.unwrap(); .unwrap();
for i in 0u8..=100 { for i in 0u8..=100 {
if let Some(Ok(event)) = reply_stream.next().await { if let Some(result) = reply_stream.next().await {
let event = result.unwrap();
assert_eq!(event.payload::<NumberPayload>().unwrap().0, i) assert_eq!(event.payload::<NumberPayload>().unwrap().0, i)
} else { } else {
panic!("stream 1 has no value {}", i); panic!("stream 1 has no value {}", i);
} }
if let Some(Ok(event)) = reply_stream_2.next().await { if let Some(result) = reply_stream_2.next().await {
let event = result.unwrap();
assert_eq!(event.payload::<NumberPayload>().unwrap().0, i) assert_eq!(event.payload::<NumberPayload>().unwrap().0, i)
} else { } else {
panic!("stream 2 has no value {}", i); panic!("stream 2 has no value {}", i);

Loading…
Cancel
Save