diff --git a/Cargo.lock b/Cargo.lock index 9deb4336..2bd55d12 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -120,7 +120,7 @@ dependencies = [ [[package]] name = "bromine" -version = "0.20.0" +version = "0.20.1" dependencies = [ "async-trait", "bincode", @@ -129,6 +129,7 @@ dependencies = [ "chacha20poly1305", "criterion", "crossbeam-utils", + "dashmap", "futures", "futures-core", "lazy_static", @@ -399,6 +400,17 @@ dependencies = [ "zeroize", ] +[[package]] +name = "dashmap" +version = "5.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c8858831f7781322e539ea39e72449c46b059638250c14344fec8d0aa6e539c" +dependencies = [ + "cfg-if", + "num_cpus", + "parking_lot", +] + [[package]] name = "digest" version = "0.9.0" @@ -778,6 +790,29 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" +[[package]] +name = "parking_lot" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87f5ec2493a61ac0506c0f4199f99070cbe83857b0337006a30f3e6719b8ef58" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28141e0cc4143da2443301914478dc976a61ffdb3f043058310c70df2fed8954" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-sys", +] + [[package]] name = "pin-project-lite" version = "0.2.8" @@ -950,6 +985,15 @@ dependencies = [ "num_cpus", ] +[[package]] +name = "redox_syscall" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ae183fc1b06c149f0c1793e1eb447c8b04bfe46d48e9e48bfb8d2d7ed64ecf0" +dependencies = [ + "bitflags", +] + [[package]] name = "regex" version = "1.5.5" @@ -1539,6 +1583,49 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows-sys" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3df6e476185f92a12c072be4a189a0210dcdcf512a1891d6dff9edb874deadc6" +dependencies = [ + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_msvc" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8e92753b1c443191654ec532f14c199742964a061be25d77d7a96f09db20bf5" + +[[package]] +name = "windows_i686_gnu" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a711c68811799e017b6038e0922cb27a5e2f43a2ddb609fe0b6f3eeda9de615" + +[[package]] +name = "windows_i686_msvc" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "146c11bb1a02615db74680b32a68e2d61f553cc24c4eb5b4ca10311740e44172" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c912b12f7454c6620635bbff3450962753834be2a594819bd5e945af18ec64bc" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "504a2476202769977a040c6364301a3f65d0cc9e3fb08600b2bda150a0488316" + [[package]] name = "x25519-dalek" version = "1.2.0" diff --git a/Cargo.toml b/Cargo.toml index 1c72b631..2182ea62 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bromine" -version = "0.20.0" +version = "0.20.1" authors = ["trivernis "] edition = "2018" readme = "README.md" @@ -28,15 +28,16 @@ async-trait = "0.1.52" num_enum = "0.5.7" futures-core = "0.3.21" trait-bound-typemap = "0.3.3" +bytes = "1.1.0" +dashmap = "5.2.0" rmp-serde = { version = "1.0.0", optional = true } bincode = { version = "1.3.3", optional = true } serde_json = { version = "1.0.79", optional = true } -bytes = "1.1.0" -chacha20poly1305 = "0.9.0" -x25519-dalek = "1.2.0" -rand = "0.8.5" -rand_core = "0.6.3" -sha2 = "0.10.2" +chacha20poly1305 = {version = "0.9.0", optional = true} +x25519-dalek = {version = "1.2.0", optional = true} +rand = {version = "0.8.5", optional = true} +rand_core = {version = "0.6.3", optional = true} +sha2 = {version = "0.10.2", optional = true} [dependencies.serde] optional = true @@ -73,6 +74,7 @@ features = ["macros", "rt-multi-thread"] [features] default = [] +encryption_layer = ["chacha20poly1305", "sha2", "rand", "x25519-dalek", "rand_core"] serialize = ["serde"] serialize_rmp = ["serialize", "rmp-serde"] serialize_bincode = ["serialize", "bincode"] diff --git a/src/ipc/builder.rs b/src/ipc/builder.rs index 83cba3ce..566d6ffa 100644 --- a/src/ipc/builder.rs +++ b/src/ipc/builder.rs @@ -249,7 +249,7 @@ where namespaces: self.namespaces.clone(), handler: self.handler.clone(), data: Arc::clone(&data), - reply_listeners: Arc::clone(&reply_listeners), + reply_listeners: reply_listeners.clone(), timeout: self.timeout.clone(), #[cfg(feature = "serialize")] diff --git a/src/ipc/context.rs b/src/ipc/context.rs index a44ead8a..540302b9 100644 --- a/src/ipc/context.rs +++ b/src/ipc/context.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use dashmap::DashMap; use std::mem; use std::ops::{Deref, DerefMut}; use std::sync::atomic::{AtomicUsize, Ordering}; @@ -18,7 +18,7 @@ use crate::payload::IntoPayload; use crate::payload::{DynamicSerializer, SerdePayload}; use crate::prelude::Response; -pub(crate) type ReplyListeners = Arc>>>; +pub(crate) type ReplyListeners = Arc>>; /// An object provided to each callback function. /// Currently it only holds the event emitter to emit response events in event callbacks. @@ -126,10 +126,7 @@ impl Context { #[tracing::instrument(level = "debug", skip(self))] pub(crate) async fn register_reply_listener(&self, event_id: u64) -> Result> { let (rx, tx) = mpsc::channel(8); - { - let mut listeners = self.reply_listeners.lock().await; - listeners.insert(event_id, rx); - } + self.reply_listeners.insert(event_id, rx); Ok(tx) } @@ -154,8 +151,7 @@ impl Context { /// Returns the channel for a reply to the given message id #[inline] pub(crate) async fn get_reply_sender(&self, ref_id: u64) -> Option> { - let listeners = self.reply_listeners.lock().await; - listeners.get(&ref_id).cloned() + self.reply_listeners.get(&ref_id).map(|e| e.value().clone()) } #[inline] diff --git a/src/ipc/mod.rs b/src/ipc/mod.rs index 8efcc30f..b663a64b 100644 --- a/src/ipc/mod.rs +++ b/src/ipc/mod.rs @@ -75,8 +75,7 @@ fn handle_event(mut ctx: Context, handler: Arc, event: Event) { { tracing::error!("Error occurred when sending error response: {:?}", e); } - let mut reply_listeners = ctx.reply_listeners.lock().await; - reply_listeners.remove(&event_id); + ctx.reply_listeners.remove(&event_id); } Err(e) => { // emit an error event diff --git a/src/ipc/stream_emitter/emit_metadata_with_response.rs b/src/ipc/stream_emitter/emit_metadata_with_response.rs index 29a8c2c0..896735d1 100644 --- a/src/ipc/stream_emitter/emit_metadata_with_response.rs +++ b/src/ipc/stream_emitter/emit_metadata_with_response.rs @@ -79,6 +79,5 @@ impl Future for EmitMetadataWithResponse } pub(crate) async fn remove_reply_listener(ctx: &Context, event_id: u64) { - let mut listeners = ctx.reply_listeners.lock().await; - listeners.remove(&event_id); + ctx.reply_listeners.remove(&event_id); } diff --git a/src/protocol/encrypted/mod.rs b/src/protocol/encrypted/mod.rs index dc130ec8..b7eb4675 100644 --- a/src/protocol/encrypted/mod.rs +++ b/src/protocol/encrypted/mod.rs @@ -5,7 +5,7 @@ mod protocol_impl; use bytes::{BufMut, Bytes, BytesMut}; pub use io_impl::*; pub use protocol_impl::*; -use rand_core::RngCore; +use rand::RngCore; use std::future::Future; use std::io; use std::pin::Pin; diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index 2792fb6f..c65d4d87 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -1,5 +1,6 @@ pub mod tcp; +#[cfg(feature = "encryption_layer")] pub mod encrypted; #[cfg(unix)] pub mod unix_socket; diff --git a/src/protocol/tcp.rs b/src/protocol/tcp.rs index f9369b4f..a7432883 100644 --- a/src/protocol/tcp.rs +++ b/src/protocol/tcp.rs @@ -5,18 +5,27 @@ use std::net::SocketAddr; use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf}; use tokio::net::{TcpListener, TcpStream}; +#[derive(Clone, Debug, Default)] +pub struct TcpOptions { + /// The time to live for the socket connection + pub ttl: Option, +} + #[async_trait] impl AsyncStreamProtocolListener for TcpListener { type AddressType = SocketAddr; type RemoteAddressType = SocketAddr; type Stream = TcpStream; - type ListenerOptions = (); + type ListenerOptions = TcpOptions; async fn protocol_bind( address: Self::AddressType, - _: Self::ListenerOptions, + options: Self::ListenerOptions, ) -> IPCResult { let listener = TcpListener::bind(address).await?; + if let Some(ttl) = options.ttl { + listener.set_ttl(ttl)?; + } Ok(listener) } @@ -40,13 +49,16 @@ impl AsyncProtocolStreamSplit for TcpStream { #[async_trait] impl AsyncProtocolStream for TcpStream { type AddressType = SocketAddr; - type StreamOptions = (); + type StreamOptions = TcpOptions; async fn protocol_connect( address: Self::AddressType, - _: Self::StreamOptions, + options: Self::StreamOptions, ) -> IPCResult { let stream = TcpStream::connect(address).await?; + if let Some(ttl) = options.ttl { + stream.set_ttl(ttl)?; + } Ok(stream) } diff --git a/tests/test_encryption.rs b/tests/test_encryption.rs index 24eeddea..1bfa5ea7 100644 --- a/tests/test_encryption.rs +++ b/tests/test_encryption.rs @@ -1,3 +1,4 @@ +#![cfg(feature = "encryption_layer")] use crate::utils::call_counter::increment_counter_for_event; use crate::utils::protocol::TestProtocolListener; use crate::utils::{get_free_port, start_server_and_client}; diff --git a/tests/test_event_streams.rs b/tests/test_event_streams.rs index dfdbbd51..1c351d98 100644 --- a/tests/test_event_streams.rs +++ b/tests/test_event_streams.rs @@ -29,12 +29,14 @@ async fn it_receives_responses() { .unwrap(); for i in 0u8..=100 { - if let Some(Ok(event)) = reply_stream.next().await { + if let Some(result) = reply_stream.next().await { + let event = result.unwrap(); assert_eq!(event.payload::().unwrap().0, i) } else { panic!("stream 1 has no value {}", i); } - if let Some(Ok(event)) = reply_stream_2.next().await { + if let Some(result) = reply_stream_2.next().await { + let event = result.unwrap(); assert_eq!(event.payload::().unwrap().0, i) } else { panic!("stream 2 has no value {}", i);