diff --git a/Cargo.lock b/Cargo.lock index 5754d327..0caff5d4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -93,7 +93,7 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bromine" -version = "0.15.1" +version = "0.16.0" dependencies = [ "async-trait", "bincode", diff --git a/Cargo.toml b/Cargo.toml index 7eb91762..545b93fc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bromine" -version = "0.15.1" +version = "0.16.0" authors = ["trivernis "] edition = "2018" readme = "README.md" diff --git a/SPECIFICATON.md b/SPECIFICATON.md index 73401fcd..e112845e 100644 --- a/SPECIFICATON.md +++ b/SPECIFICATON.md @@ -84,11 +84,13 @@ If no namespace for the event namespace is registered or no handler is registere the event name, the event will be ignored. -### Receiving answers to emitted events +### Receiving replies to emitted events When emitting an event to a peer, the emitter can wait for an answer to that event. This is achieved by emitting events as a response to a specific event id. When an event with a reference event id (ref_id) is received, first the registry is searched for handlers waiting for a response (by trying to receive from a channel). If a handler can be found, the event is passed to the handler waiting for the response. -Otherwise, the event will be processed as a regular event. \ No newline at end of file +Otherwise, the event will be processed as a regular event. +Events passed from an event handler are always passed as replies to the event that +called that handler. \ No newline at end of file diff --git a/src/events/error_event.rs b/src/events/error_event.rs index 5359dbca..2b6e160e 100644 --- a/src/events/error_event.rs +++ b/src/events/error_event.rs @@ -1,5 +1,6 @@ +use crate::context::Context; use crate::error::Result; -use crate::payload::{EventReceivePayload, EventSendPayload}; +use crate::payload::{FromPayload, IntoPayload}; use crate::prelude::{IPCError, IPCResult}; use byteorder::{BigEndian, ReadBytesExt}; use std::error::Error; @@ -26,8 +27,8 @@ impl Display for ErrorEventData { } } -impl EventSendPayload for ErrorEventData { - fn to_payload_bytes(self) -> IPCResult> { +impl IntoPayload for ErrorEventData { + fn into_payload(self, _: &Context) -> IPCResult> { let mut buf = Vec::new(); buf.append(&mut self.code.to_be_bytes().to_vec()); let message_len = self.message.len() as u32; @@ -38,8 +39,8 @@ impl EventSendPayload for ErrorEventData { } } -impl EventReceivePayload for ErrorEventData { - fn from_payload_bytes(mut reader: R) -> Result { +impl FromPayload for ErrorEventData { + fn from_payload(mut reader: R) -> Result { let code = reader.read_u16::()?; let message_len = reader.read_u32::()?; let mut message_buf = vec![0u8; message_len as usize]; diff --git a/src/events/event.rs b/src/events/event.rs index 637c7cec..a57243c1 100644 --- a/src/events/event.rs +++ b/src/events/event.rs @@ -1,6 +1,6 @@ use crate::error::{Error, Result}; use crate::events::generate_event_id; -use crate::events::payload::EventReceivePayload; +use crate::events::payload::FromPayload; #[cfg(feature = "serialize")] use crate::payload::SerdePayload; use crate::prelude::{IPCError, IPCResult}; @@ -73,8 +73,8 @@ impl Event { /// Decodes the payload to the given type implementing the receive payload trait #[tracing::instrument(level = "trace", skip(self))] - pub fn payload(&self) -> Result { - let payload = T::from_payload_bytes(&self.data[..])?; + pub fn payload(&self) -> Result { + let payload = T::from_payload(&self.data[..])?; Ok(payload) } @@ -83,7 +83,7 @@ impl Event { /// Decodes the payload to the given type implementing DeserializeOwned #[tracing::instrument(level = "trace", skip(self))] pub fn serde_payload(&self) -> Result { - let payload = SerdePayload::::from_payload_bytes(&self.data[..])?; + let payload = SerdePayload::::from_payload(&self.data[..])?; Ok(payload.data()) } diff --git a/src/events/payload.rs b/src/events/payload.rs index 9c62c05f..3bc152cc 100644 --- a/src/events/payload.rs +++ b/src/events/payload.rs @@ -5,16 +5,26 @@ use std::io::Read; #[cfg(feature = "serialize")] pub use super::payload_serializer::*; +/// Trait that serializes a type into bytes and can fail +pub trait TryIntoBytes { + fn try_into_bytes(self) -> IPCResult>; +} + +/// Trait that serializes a type into bytes and never fails +pub trait IntoBytes { + fn into_bytes(self) -> Vec; +} + /// Trait to convert event data into sending bytes /// It is implemented for all types that implement Serialize -pub trait EventSendPayload { - fn to_payload_bytes(self) -> IPCResult>; +pub trait IntoPayload { + fn into_payload(self, ctx: &Context) -> IPCResult>; } /// Trait to get the event data from receiving bytes. /// It is implemented for all types that are DeserializeOwned -pub trait EventReceivePayload: Sized { - fn from_payload_bytes(reader: R) -> IPCResult; +pub trait FromPayload: Sized { + fn from_payload(reader: R) -> IPCResult; } /// A payload wrapper type for sending bytes directly without @@ -35,14 +45,14 @@ impl BytePayload { } } -impl EventSendPayload for BytePayload { - fn to_payload_bytes(self) -> IPCResult> { +impl IntoPayload for BytePayload { + fn into_payload(self, _: &Context) -> IPCResult> { Ok(self.bytes) } } -impl EventReceivePayload for BytePayload { - fn from_payload_bytes(mut reader: R) -> IPCResult { +impl FromPayload for BytePayload { + fn from_payload(mut reader: R) -> IPCResult { let mut buf = Vec::new(); reader.read_to_end(&mut buf)?; @@ -70,14 +80,10 @@ impl TandemPayload { } } -impl EventSendPayload for TandemPayload -where - P1: EventSendPayload, - P2: EventSendPayload, -{ - fn to_payload_bytes(self) -> IPCResult> { - let mut p1_bytes = self.load1.to_payload_bytes()?; - let mut p2_bytes = self.load2.to_payload_bytes()?; +impl IntoPayload for TandemPayload { + fn into_payload(self, ctx: &Context) -> IPCResult> { + let mut p1_bytes = self.load1.into_payload(&ctx)?; + let mut p2_bytes = self.load2.into_payload(&ctx)?; let mut p1_length_bytes = (p1_bytes.len() as u64).to_be_bytes().to_vec(); let mut p2_length_bytes = (p2_bytes.len() as u64).to_be_bytes().to_vec(); @@ -92,12 +98,8 @@ where } } -impl EventReceivePayload for TandemPayload -where - P1: EventReceivePayload, - P2: EventReceivePayload, -{ - fn from_payload_bytes(mut reader: R) -> IPCResult { +impl FromPayload for TandemPayload { + fn from_payload(mut reader: R) -> IPCResult { let p1_length = reader.read_u64::()?; let mut load1_bytes = vec![0u8; p1_length as usize]; reader.read_exact(&mut load1_bytes)?; @@ -107,14 +109,15 @@ where reader.read_exact(&mut load2_bytes)?; Ok(Self { - load1: P1::from_payload_bytes(load1_bytes.as_slice())?, - load2: P2::from_payload_bytes(load2_bytes.as_slice())?, + load1: P1::from_payload(load1_bytes.as_slice())?, + load2: P2::from_payload(load2_bytes.as_slice())?, }) } } -impl EventSendPayload for () { - fn to_payload_bytes(self) -> IPCResult> { +#[cfg(not(feature = "serialize"))] +impl IntoPayload for () { + fn into_payload(self, _: &Context) -> IPCResult> { Ok(vec![]) } } @@ -123,8 +126,8 @@ impl EventSendPayload for () { mod serde_payload { use super::DynamicSerializer; use crate::context::Context; - use crate::payload::EventReceivePayload; - use crate::prelude::{EventSendPayload, IPCResult}; + use crate::payload::{FromPayload, TryIntoBytes}; + use crate::prelude::{IPCResult, IntoPayload}; use byteorder::ReadBytesExt; use serde::de::DeserializeOwned; use serde::Serialize; @@ -147,10 +150,7 @@ mod serde_payload { } } - impl Clone for SerdePayload - where - T: Clone, - { + impl Clone for SerdePayload { fn clone(&self) -> Self { Self { serializer: self.serializer.clone(), @@ -159,11 +159,8 @@ mod serde_payload { } } - impl EventSendPayload for SerdePayload - where - T: Serialize, - { - fn to_payload_bytes(self) -> IPCResult> { + impl TryIntoBytes for SerdePayload { + fn try_into_bytes(self) -> IPCResult> { let mut buf = Vec::new(); let mut data_bytes = self.serializer.serialize(self.data)?; let format_id = self.serializer as u8; @@ -174,11 +171,14 @@ mod serde_payload { } } - impl EventReceivePayload for SerdePayload - where - T: DeserializeOwned, - { - fn from_payload_bytes(mut reader: R) -> IPCResult { + impl IntoPayload for SerdePayload { + fn into_payload(self, _: &Context) -> IPCResult> { + self.try_into_bytes() + } + } + + impl FromPayload for SerdePayload { + fn from_payload(mut reader: R) -> IPCResult { let format_id = reader.read_u8()?; let serializer = DynamicSerializer::from_primitive(format_id as usize)?; let data = serializer.deserialize(reader)?; @@ -187,14 +187,13 @@ mod serde_payload { } } - pub trait IntoSerdePayload: Sized { - fn into_serde_payload(self, ctx: &Context) -> SerdePayload { - ctx.create_serde_payload(self) + impl IntoPayload for T { + fn into_payload(self, ctx: &Context) -> IPCResult> { + ctx.create_serde_payload(self).into_payload(&ctx) } } - - impl IntoSerdePayload for T where T: Serialize {} } +use crate::context::Context; #[cfg(feature = "serialize")] pub use serde_payload::*; diff --git a/src/ipc/context.rs b/src/ipc/context.rs index 5f42f22f..f63e8d0e 100644 --- a/src/ipc/context.rs +++ b/src/ipc/context.rs @@ -1,6 +1,6 @@ use crate::error::{Error, Result}; use crate::event::Event; -use crate::ipc::stream_emitter::StreamEmitter; +use crate::ipc::stream_emitter::{EmitMetadata, StreamEmitter}; use futures::future; use futures::future::Either; use std::collections::HashMap; @@ -13,6 +13,7 @@ use tokio::sync::{oneshot, Mutex, RwLock}; use tokio::time::Duration; use typemap_rev::TypeMap; +use crate::payload::IntoPayload; #[cfg(feature = "serialize")] use crate::payload::{DynamicSerializer, SerdePayload}; @@ -26,14 +27,14 @@ pub(crate) type ReplyListeners = Arc>> /// async fn my_callback(ctx: &Context, _event: Event) -> IPCResult<()> { /// // use the emitter on the context object to emit events /// // inside callbacks -/// ctx.emitter.emit("ping", ()).await?; +/// ctx.emit("ping", ()).await?; /// Ok(()) /// } /// ``` #[derive(Clone)] pub struct Context { /// The event emitter - pub emitter: StreamEmitter, + emitter: StreamEmitter, /// Field to store additional context data pub data: Arc>, @@ -44,8 +45,10 @@ pub struct Context { reply_timeout: Duration, + ref_id: Option, + #[cfg(feature = "serialize")] - default_serializer: DynamicSerializer, + pub default_serializer: DynamicSerializer, } impl Context { @@ -65,6 +68,42 @@ impl Context { reply_timeout, #[cfg(feature = "serialize")] default_serializer, + ref_id: None, + } + } + + /// Emits an event with a given payload that can be serialized into bytes + pub async fn emit, P: IntoPayload>( + &self, + name: S, + payload: P, + ) -> Result { + let payload_bytes = payload.into_payload(&self)?; + + if let Some(ref_id) = &self.ref_id { + self.emitter + .emit_response(*ref_id, name, payload_bytes) + .await + } else { + self.emitter.emit(name, payload_bytes).await + } + } + + /// Emits an event to a specific namespace + pub async fn emit_to, S2: AsRef, P: IntoPayload>( + &self, + namespace: S1, + name: S2, + payload: P, + ) -> Result { + let payload_bytes = payload.into_payload(&self)?; + + if let Some(ref_id) = &self.ref_id { + self.emitter + .emit_response_to(*ref_id, namespace, name, payload_bytes) + .await + } else { + self.emitter.emit_to(namespace, name, payload_bytes).await } } @@ -115,6 +154,10 @@ impl Context { let mut listeners = self.reply_listeners.lock().await; listeners.remove(&ref_id) } + + pub(crate) fn set_ref_id(&mut self, id: Option) { + self.ref_id = id; + } } pub struct PooledContext { diff --git a/src/ipc/mod.rs b/src/ipc/mod.rs index 3438d3cc..be02f5a9 100644 --- a/src/ipc/mod.rs +++ b/src/ipc/mod.rs @@ -52,15 +52,14 @@ async fn handle_connection( } /// Handles a single event in a different tokio context -fn handle_event(ctx: Context, handler: Arc, event: Event) { +fn handle_event(mut ctx: Context, handler: Arc, event: Event) { + ctx.set_ref_id(Some(event.id())); + tokio::spawn(async move { - let id = event.id(); if let Err(e) = handler.handle_event(&ctx, event).await { // emit an error event if let Err(e) = ctx - .emitter - .emit_response( - id, + .emit( ERROR_EVENT_NAME, ErrorEventData { message: format!("{:?}", e), diff --git a/src/ipc/stream_emitter.rs b/src/ipc/stream_emitter.rs index 7cbd232e..c937f12e 100644 --- a/src/ipc/stream_emitter.rs +++ b/src/ipc/stream_emitter.rs @@ -1,7 +1,6 @@ use crate::error::Result; use crate::error_event::{ErrorEventData, ERROR_EVENT_NAME}; use crate::events::event::Event; -use crate::events::payload::EventSendPayload; use crate::ipc::context::Context; use crate::protocol::AsyncProtocolStream; use std::ops::DerefMut; @@ -12,18 +11,11 @@ use tokio::sync::Mutex; /// An abstraction over any type that implements the AsyncProtocolStream trait /// to emit events and share a connection across multiple /// contexts. +#[derive(Clone)] pub struct StreamEmitter { stream: Arc>, } -impl Clone for StreamEmitter { - fn clone(&self) -> Self { - Self { - stream: Arc::clone(&self.stream), - } - } -} - impl StreamEmitter { pub fn new(stream: P::OwnedSplitWriteHalf) -> Self { Self { @@ -32,7 +24,7 @@ impl StreamEmitter { } #[tracing::instrument(level = "trace", skip(self, data_bytes))] - pub async fn _emit( + async fn _emit( &self, namespace: Option<&str>, event: &str, @@ -58,59 +50,48 @@ impl StreamEmitter { } /// Emits an event - pub async fn emit, T: EventSendPayload>( + pub(crate) async fn emit>( &self, event: S, - payload: T, + payload: Vec, ) -> Result { - self._emit(None, event.as_ref(), payload.to_payload_bytes()?, None) - .await + self._emit(None, event.as_ref(), payload, None).await } /// Emits an event to a specific namespace - pub async fn emit_to, S2: AsRef, T: EventSendPayload>( + pub(crate) async fn emit_to, S2: AsRef>( &self, namespace: S1, event: S2, - payload: T, + payload: Vec, ) -> Result { - self._emit( - Some(namespace.as_ref()), - event.as_ref(), - payload.to_payload_bytes()?, - None, - ) - .await + self._emit(Some(namespace.as_ref()), event.as_ref(), payload, None) + .await } /// Emits a response to an event - pub async fn emit_response, T: EventSendPayload>( + pub(crate) async fn emit_response>( &self, event_id: u64, event: S, - payload: T, + payload: Vec, ) -> Result { - self._emit( - None, - event.as_ref(), - payload.to_payload_bytes()?, - Some(event_id), - ) - .await + self._emit(None, event.as_ref(), payload, Some(event_id)) + .await } /// Emits a response to an event to a namespace - pub async fn emit_response_to, S2: AsRef, T: EventSendPayload>( + pub(crate) async fn emit_response_to, S2: AsRef>( &self, event_id: u64, namespace: S1, event: S2, - payload: T, + payload: Vec, ) -> Result { self._emit( Some(namespace.as_ref()), event.as_ref(), - payload.to_payload_bytes()?, + payload, Some(event_id), ) .await diff --git a/src/lib.rs b/src/lib.rs index 165322c0..b6e0acbe 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,7 +8,7 @@ //! /// Callback ping function //! async fn handle_ping(ctx: &Context, event: Event) -> IPCResult<()> { //! println!("Received ping event."); -//! ctx.emitter.emit_response(event.id(), "pong", ()).await?; +//! ctx.emit("pong", ()).await?; //! //! Ok(()) //! } @@ -45,7 +45,7 @@ //! // register callback inline //! .on("something", callback!(ctx, event, async move { //! println!("I think the server did something"); -//! ctx.emitter.emit_response_to(event.id(), "mainspace-server", "ok", ()).await?; +//! ctx.emit_to("mainspace-server", "ok", ()).await?; //! Ok(()) //! })) //! .build() @@ -53,7 +53,7 @@ //! .build_client().await.unwrap(); //! //! // emit an initial event -//! let response = ctx.emitter.emit("ping", ()).await.unwrap().await_reply(&ctx).await.unwrap(); +//! let response = ctx.emit("ping", ()).await.unwrap().await_reply(&ctx).await.unwrap(); //! assert_eq!(response.name(), "pong"); //! } //! ``` @@ -79,7 +79,7 @@ //! // register callback //! .on("ping", callback!(ctx, event, async move { //! println!("Received ping event."); -//! ctx.emitter.emit_response(event.id(), "pong", ()).await?; +//! ctx.emit("pong", ()).await?; //! Ok(()) //! })) //! .namespace("mainspace-server") @@ -91,7 +91,7 @@ //! let mut my_key = data.get_mut::().unwrap(); //! *my_key += 1; //! } -//! ctx.emitter.emit_response_to(event.id(), "mainspace-client", "something", ()).await?; +//! ctx.emit_to("mainspace-client", "something", ()).await?; //! Ok(()) //! })) //! .build() diff --git a/tests/test_events_with_payload.rs b/tests/test_events_with_payload.rs index 64bde1fc..153faf2f 100644 --- a/tests/test_events_with_payload.rs +++ b/tests/test_events_with_payload.rs @@ -17,9 +17,7 @@ async fn it_sends_payloads() { string: String::from("Hello World"), }; #[cfg(feature = "serialize")] - let payload = payload.into_serde_payload(&ctx); - - ctx.emitter.emit("ping", payload).await.unwrap(); + ctx.emit("ping", payload).await.unwrap(); // wait for the event to be handled tokio::time::sleep(Duration::from_millis(10)).await; @@ -39,10 +37,7 @@ async fn it_receives_payloads() { string: String::from("Hello World"), }; #[cfg(feature = "serialize")] - let payload = payload.into_serde_payload(&ctx); - let reply = ctx - .emitter .emit("ping", payload) .await .unwrap() @@ -79,9 +74,7 @@ async fn handle_ping_event(ctx: &Context, event: Event) -> IPCResult<()> { let payload = get_simple_payload(&event)?; #[cfg(feature = "serialize")] { - ctx.emitter - .emit_response(event.id(), "pong", payload.into_serde_payload(&ctx)) - .await?; + ctx.emit("pong", payload).await?; } #[cfg(not(feature = "serialize"))] { @@ -125,7 +118,7 @@ mod payload_impl { #[cfg(not(feature = "serialize"))] mod payload_impl { use bromine::error::Result; - use bromine::payload::{EventReceivePayload, EventSendPayload}; + use bromine::payload::{FromPayload, IntoPayload}; use bromine::prelude::IPCResult; use byteorder::{BigEndian, ReadBytesExt}; use std::io::Read; @@ -135,7 +128,7 @@ mod payload_impl { pub number: u32, } - impl EventSendPayload for SimplePayload { + impl IntoPayload for SimplePayload { fn to_payload_bytes(self) -> IPCResult> { let mut buf = Vec::new(); let string_length = self.string.len() as u16; @@ -150,8 +143,8 @@ mod payload_impl { } } - impl EventReceivePayload for SimplePayload { - fn from_payload_bytes(mut reader: R) -> Result { + impl FromPayload for SimplePayload { + fn from_payload(mut reader: R) -> Result { let string_length = reader.read_u16::()?; let mut string_buf = vec![0u8; string_length as usize]; reader.read_exact(&mut string_buf)?; diff --git a/tests/test_raw_events.rs b/tests/test_raw_events.rs index 66b7d129..0cbe2f2c 100644 --- a/tests/test_raw_events.rs +++ b/tests/test_raw_events.rs @@ -13,7 +13,7 @@ use utils::protocol::*; async fn it_sends_events() { let port = get_free_port(); let ctx = get_client_with_server(port).await; - ctx.emitter.emit("ping", EmptyPayload).await.unwrap(); + ctx.emit("ping", EmptyPayload).await.unwrap(); // allow the event to be processed tokio::time::sleep(Duration::from_millis(10)).await; @@ -28,14 +28,8 @@ async fn it_sends_events() { async fn it_sends_namespaced_events() { let port = get_free_port(); let ctx = get_client_with_server(port).await; - ctx.emitter - .emit_to("test", "ping", EmptyPayload) - .await - .unwrap(); - ctx.emitter - .emit_to("test", "pong", EmptyPayload) - .await - .unwrap(); + ctx.emit_to("test", "ping", EmptyPayload).await.unwrap(); + ctx.emit_to("test", "pong", EmptyPayload).await.unwrap(); // allow the event to be processed tokio::time::sleep(Duration::from_millis(10)).await; @@ -52,7 +46,6 @@ async fn it_receives_responses() { let port = get_free_port(); let ctx = get_client_with_server(port).await; let reply = ctx - .emitter .emit("ping", EmptyPayload) .await .unwrap() @@ -72,10 +65,7 @@ async fn it_receives_responses() { async fn it_handles_errors() { let port = get_free_port(); let ctx = get_client_with_server(port).await; - ctx.emitter - .emit("create_error", EmptyPayload) - .await - .unwrap(); + ctx.emit("create_error", EmptyPayload).await.unwrap(); // allow the event to be processed tokio::time::sleep(Duration::from_millis(10)).await; let counter = get_counter_from_context(&ctx).await; @@ -90,7 +80,6 @@ async fn it_receives_error_responses() { let port = get_free_port(); let ctx = get_client_with_server(port).await; let result = ctx - .emitter .emit("create_error", EmptyPayload) .await .unwrap() @@ -124,9 +113,7 @@ fn get_builder(port: u8) -> IPCBuilder { async fn handle_ping_event(ctx: &Context, event: Event) -> IPCResult<()> { increment_counter_for_event(ctx, &event).await; - ctx.emitter - .emit_response(event.id(), "pong", EmptyPayload) - .await?; + ctx.emit("pong", EmptyPayload).await?; Ok(()) } @@ -151,8 +138,8 @@ async fn handle_error_event(ctx: &Context, event: Event) -> IPCResult<()> { pub struct EmptyPayload; -impl EventSendPayload for EmptyPayload { - fn to_payload_bytes(self) -> IPCResult> { +impl IntoPayload for EmptyPayload { + fn into_payload(self, _: &Context) -> IPCResult> { Ok(vec![]) } } diff --git a/tests/test_serialization.rs b/tests/test_serialization.rs index b2a1c7c7..1785b9d3 100644 --- a/tests/test_serialization.rs +++ b/tests/test_serialization.rs @@ -27,8 +27,8 @@ fn it_serializes_json() { #[cfg(feature = "serialize")] fn test_serialization(serializer: DynamicSerializer) { let test_payload = get_test_payload(serializer); - let payload_bytes = test_payload.clone().to_payload_bytes().unwrap(); - let payload = TestSerdePayload::from_payload_bytes(&payload_bytes[..]).unwrap(); + let payload_bytes = test_payload.clone().try_into_bytes().unwrap(); + let payload = TestSerdePayload::from_payload(&payload_bytes[..]).unwrap(); assert_eq!(payload.data(), test_payload.data()) }