diff --git a/Cargo.lock b/Cargo.lock index 5fda57b5..35da139d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -148,7 +148,7 @@ dependencies = [ [[package]] name = "rmp-ipc" -version = "0.6.0" +version = "0.6.1" dependencies = [ "lazy_static", "log", diff --git a/Cargo.toml b/Cargo.toml index 150cbf74..ec65a37b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rmp-ipc" -version = "0.6.0" +version = "0.6.1" authors = ["trivernis "] edition = "2018" readme = "README.md" diff --git a/src/events/mod.rs b/src/events/mod.rs index b0a5dd12..2d66b5eb 100644 --- a/src/events/mod.rs +++ b/src/events/mod.rs @@ -5,6 +5,7 @@ use std::time::{SystemTime, UNIX_EPOCH}; pub mod error_event; pub mod event; pub mod event_handler; +pub mod payload; /// Generates a new event id pub(crate) fn generate_event_id() -> u64 { diff --git a/src/events/payload.rs b/src/events/payload.rs new file mode 100644 index 00000000..b39f4916 --- /dev/null +++ b/src/events/payload.rs @@ -0,0 +1,17 @@ +use crate::prelude::IPCResult; +use serde::Serialize; + +pub trait EventSendPayload { + fn to_payload_bytes(self) -> IPCResult>; +} + +impl EventSendPayload for T +where + T: Serialize, +{ + fn to_payload_bytes(self) -> IPCResult> { + let bytes = rmp_serde::to_vec(&self)?; + + Ok(bytes) + } +} diff --git a/src/ipc/stream_emitter.rs b/src/ipc/stream_emitter.rs index 14dff799..e6945de8 100644 --- a/src/ipc/stream_emitter.rs +++ b/src/ipc/stream_emitter.rs @@ -1,11 +1,12 @@ use crate::error::Result; use crate::events::event::Event; +use crate::events::payload::EventSendPayload; use crate::ipc::context::Context; -use serde::Serialize; use std::sync::Arc; use tokio::io::AsyncWriteExt; use tokio::net::tcp::OwnedWriteHalf; use tokio::sync::Mutex; +use tokio::time::Instant; /// An abstraction over the raw tokio tcp stream /// to emit events and share a connection across multiple @@ -22,14 +23,14 @@ impl StreamEmitter { } } - pub async fn _emit( + pub async fn _emit( &self, namespace: Option<&str>, event: &str, data: T, res_id: Option, ) -> Result { - let data_bytes = rmp_serde::to_vec(&data)?; + let data_bytes = data.to_payload_bytes()?; log::debug!("Emitting event {:?}:{}", namespace, event); let event = if let Some(namespace) = namespace { @@ -40,16 +41,17 @@ impl StreamEmitter { let event_bytes = event.to_bytes()?; { - log::trace!("Writing {} bytes", event_bytes.len()); + let start = Instant::now(); let mut stream = self.stream.lock().await; (*stream).write_all(&event_bytes[..]).await?; + log::trace!("Wrote {} bytes in {:?}", event_bytes.len(), start.elapsed()); } Ok(EmitMetadata::new(event.id())) } /// Emits an event - pub async fn emit, T: Serialize>( + pub async fn emit, T: EventSendPayload>( &self, event: S, data: T, @@ -58,7 +60,7 @@ impl StreamEmitter { } /// Emits an event to a specific namespace - pub async fn emit_to, S2: AsRef, T: Serialize>( + pub async fn emit_to, S2: AsRef, T: EventSendPayload>( &self, namespace: S1, event: S2, @@ -69,7 +71,7 @@ impl StreamEmitter { } /// Emits a response to an event - pub async fn emit_response, T: Serialize>( + pub async fn emit_response, T: EventSendPayload>( &self, event_id: u64, event: S, @@ -79,7 +81,7 @@ impl StreamEmitter { } /// Emits a response to an event to a namespace - pub async fn emit_response_to, S2: AsRef, T: Serialize>( + pub async fn emit_response_to, S2: AsRef, T: EventSendPayload>( &self, event_id: u64, namespace: S1,