diff --git a/Cargo.lock b/Cargo.lock index 2bd55d12..5247a424 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -120,7 +120,7 @@ dependencies = [ [[package]] name = "bromine" -version = "0.20.1" +version = "0.20.2" dependencies = [ "async-trait", "bincode", diff --git a/Cargo.toml b/Cargo.toml index 2182ea62..cb60aeca 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bromine" -version = "0.20.1" +version = "0.20.2" authors = ["trivernis "] edition = "2018" readme = "README.md" @@ -46,7 +46,7 @@ features = [] [dependencies.tokio] version = "1.17.0" -features = ["net", "io-std", "io-util", "sync", "time", "macros"] +features = ["net", "io-std", "io-util", "sync", "time", "macros", "rt"] [dependencies.postcard] version = "0.7.3" diff --git a/src/events/event.rs b/src/events/event.rs index 59475f31..2ad6a441 100644 --- a/src/events/event.rs +++ b/src/events/event.rs @@ -103,7 +103,7 @@ impl Event { /// It represents the message that is replied to and can be None. #[inline] pub fn reference_id(&self) -> Option { - self.header.ref_id.clone() + self.header.ref_id } /// Decodes the payload to the given type implementing the receive payload trait diff --git a/src/events/event_handler.rs b/src/events/event_handler.rs index 16c09fe4..5f9168e7 100644 --- a/src/events/event_handler.rs +++ b/src/events/event_handler.rs @@ -39,7 +39,7 @@ type EventCallback = Arc< >; /// Handler for events -#[derive(Clone)] +#[derive(Clone, Default)] pub struct EventHandler { callbacks: HashMap, } @@ -59,9 +59,7 @@ impl Debug for EventHandler { impl EventHandler { /// Creates a new event handler pub fn new() -> Self { - Self { - callbacks: HashMap::new(), - } + Self::default() } /// Adds a new event callback diff --git a/src/events/payload.rs b/src/events/payload.rs index bad2d26f..0d70d7f4 100644 --- a/src/events/payload.rs +++ b/src/events/payload.rs @@ -98,8 +98,8 @@ impl TandemPayload { impl IntoPayload for TandemPayload { fn into_payload(self, ctx: &Context) -> IPCResult { - let p1_bytes = self.load1.into_payload(&ctx)?; - let p2_bytes = self.load2.into_payload(&ctx)?; + let p1_bytes = self.load1.into_payload(ctx)?; + let p2_bytes = self.load2.into_payload(ctx)?; let mut bytes = BytesMut::with_capacity(p1_bytes.len() + p2_bytes.len() + 16); diff --git a/src/ipc/builder.rs b/src/ipc/builder.rs index 566d6ffa..6bcfdd4c 100644 --- a/src/ipc/builder.rs +++ b/src/ipc/builder.rs @@ -69,6 +69,12 @@ pub struct IPCBuilder { stream_options: ::StreamOptions, } +impl Default for IPCBuilder { + fn default() -> Self { + Self::new() + } +} + impl IPCBuilder where L: AsyncStreamProtocolListener, @@ -236,7 +242,7 @@ where #[tracing::instrument(skip(self))] pub async fn build_pooled_client(self, pool_size: usize) -> Result { if pool_size == 0 { - Error::BuildError("Pool size must be greater than 0".to_string()); + return Err(Error::BuildError("Pool size must be greater than 0".to_string())); } self.validate()?; let data = Arc::new(RwLock::new(self.data)); @@ -250,7 +256,7 @@ where handler: self.handler.clone(), data: Arc::clone(&data), reply_listeners: reply_listeners.clone(), - timeout: self.timeout.clone(), + timeout: self.timeout, #[cfg(feature = "serialize")] default_serializer: self.default_serializer.clone(), diff --git a/src/ipc/context.rs b/src/ipc/context.rs index 540302b9..12f1c56c 100644 --- a/src/ipc/context.rs +++ b/src/ipc/context.rs @@ -83,7 +83,7 @@ impl Context { ) -> EmitMetadata

{ self.emitter.emit_raw( self.clone(), - self.ref_id.clone(), + self.ref_id, name, namespace, event_type, diff --git a/src/ipc/server.rs b/src/ipc/server.rs index c88d238a..ec88fa93 100644 --- a/src/ipc/server.rs +++ b/src/ipc/server.rs @@ -48,7 +48,7 @@ impl IPCServer { let handler = Arc::clone(&handler); let namespaces = Arc::clone(&namespaces); let data = Arc::clone(&data); - let timeout = self.timeout.clone(); + let timeout = self.timeout; #[cfg(feature = "serialize")] let default_serializer = self.default_serializer.clone(); @@ -69,7 +69,7 @@ impl IPCServer { default_serializer.clone(), ); #[cfg(not(feature = "serialize"))] - let ctx = Context::new(emitter, data, None, reply_listeners, timeout.into()); + let ctx = Context::new(emitter, data, None, reply_listeners, timeout); handle_connection::(namespaces, handler, read_half, ctx).await; }); diff --git a/src/ipc/stream_emitter/emit_metadata_with_response.rs b/src/ipc/stream_emitter/emit_metadata_with_response.rs index 896735d1..acd3b045 100644 --- a/src/ipc/stream_emitter/emit_metadata_with_response.rs +++ b/src/ipc/stream_emitter/emit_metadata_with_response.rs @@ -43,7 +43,7 @@ impl Future for EmitMetadataWithResponse let timeout = self .timeout .take() - .unwrap_or_else(|| ctx.default_reply_timeout.clone()); + .unwrap_or(ctx.default_reply_timeout); let event_id = match poll_unwrap!(emit_metadata.event_metadata.as_mut()).get_event() { Ok(e) => e.id(), @@ -58,7 +58,7 @@ impl Future for EmitMetadataWithResponse let reply = tokio::select! { tx_result = tx.recv() => { - Ok(tx_result.ok_or_else(|| Error::SendError)?) + tx_result.ok_or(Error::SendError) } _ = tokio::time::sleep(timeout) => { Err(Error::Timeout) diff --git a/src/ipc/stream_emitter/emit_metadata_with_response_stream.rs b/src/ipc/stream_emitter/emit_metadata_with_response_stream.rs index a0288de4..d90d40b5 100644 --- a/src/ipc/stream_emitter/emit_metadata_with_response_stream.rs +++ b/src/ipc/stream_emitter/emit_metadata_with_response_stream.rs @@ -10,23 +10,26 @@ use std::future::Future; use std::pin::Pin; use std::task::Poll; use std::time::Duration; +use futures_core::future::BoxFuture; use tokio::sync::mpsc::Receiver; /// A metadata object returned after waiting for a reply to an event /// This object needs to be awaited for to get the actual reply pub struct EmitMetadataWithResponseStream { pub(crate) timeout: Option, - pub(crate) fut: Option> + Send + Sync>>>, + pub(crate) fut: Option>>, pub(crate) emit_metadata: Option>, } +type StreamFutureResult = Result<(Option, Context, Receiver)>; + /// An asynchronous stream one can read all responses to a specific event from. pub struct ResponseStream { event_id: u64, ctx: Option, receiver: Option>, timeout: Duration, - fut: Option, Context, Receiver)>>>>>, + fut: Option>, } impl ResponseStream { @@ -71,7 +74,7 @@ impl Future for EmitMetadataWithResponse let timeout = self .timeout .take() - .unwrap_or_else(|| ctx.default_reply_timeout.clone()); + .unwrap_or(ctx.default_reply_timeout); let event_id = match poll_unwrap!(emit_metadata.event_metadata.as_mut()).get_event() { Ok(e) => e.id(), diff --git a/src/ipc/stream_emitter/event_metadata.rs b/src/ipc/stream_emitter/event_metadata.rs index 4ed9a57c..8834c3ea 100644 --- a/src/ipc/stream_emitter/event_metadata.rs +++ b/src/ipc/stream_emitter/event_metadata.rs @@ -37,11 +37,11 @@ impl EventMetadata

{ let payload = self.payload.take().ok_or(Error::InvalidState)?; let res_id = self.res_id.take().ok_or(Error::InvalidState)?; let event_type = self.event_type.take().ok_or(Error::InvalidState)?; - let payload_bytes = payload.into_payload(&ctx)?.into(); + let payload_bytes = payload.into_payload(&ctx)?; let event = Event::new( namespace, - event.to_string(), + event, payload_bytes, res_id, event_type,