From 4803a655d7c4ab813177b5486555144528b1e8cd Mon Sep 17 00:00:00 2001 From: trivernis Date: Sun, 6 Feb 2022 11:11:08 +0100 Subject: [PATCH 01/12] Introduce event types Signed-off-by: trivernis --- Cargo.lock | 41 ++++++++++++++ Cargo.toml | 1 + src/events/event.rs | 77 ++++++++++++++++++++----- src/events/mod.rs | 1 - src/ipc/stream_emitter.rs | 110 +++++++++++++++++++++++++++--------- tests/test_serialization.rs | 1 - 6 files changed, 188 insertions(+), 43 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1e339cbc..347cf69e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -102,6 +102,7 @@ dependencies = [ "crossbeam-utils", "futures", "lazy_static", + "num_enum", "postcard", "rmp-serde", "serde", @@ -578,6 +579,27 @@ dependencies = [ "libc", ] +[[package]] +name = "num_enum" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "720d3ea1055e4e4574c0c0b0f8c3fd4f24c4cdaf465948206dea090b57b526ad" +dependencies = [ + "num_enum_derive", +] + +[[package]] +name = "num_enum_derive" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d992b768490d7fe0d8586d9b5745f6c49f557da6d81dc982b1d167ad4edbb21" +dependencies = [ + "proc-macro-crate", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "oorandom" version = "11.1.3" @@ -641,6 +663,16 @@ version = "0.1.5-pre" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c68cb38ed13fd7bc9dd5db8f165b7c8d9c1a315104083a2b10f11354c2af97f" +[[package]] +name = "proc-macro-crate" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ebace6889caf889b4d3f76becee12e90353f2b8c7d875534a71e5742f8f6f83" +dependencies = [ + "thiserror", + "toml", +] + [[package]] name = "proc-macro2" version = "1.0.35" @@ -948,6 +980,15 @@ dependencies = [ "syn", ] +[[package]] +name = "toml" +version = "0.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a31142970826733df8241ef35dc040ef98c679ab14d7c3e54d827099b3acecaa" +dependencies = [ + "serde", +] + [[package]] name = "tracing" version = "0.1.29" diff --git a/Cargo.toml b/Cargo.toml index d6285cf8..5f58824f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,7 @@ typemap_rev = "0.1.5" byteorder = "1.4.3" async-trait = "0.1.52" futures = "0.3.19" +num_enum = "0.5.6" rmp-serde = {version = "0.15.5", optional = true} bincode = {version = "1.3.3", optional = true} serde_json = {version = "1.0.73", optional = true} diff --git a/src/events/event.rs b/src/events/event.rs index e04f0268..f94df91c 100644 --- a/src/events/event.rs +++ b/src/events/event.rs @@ -2,6 +2,8 @@ use crate::error::{Error, Result}; use crate::events::generate_event_id; use crate::events::payload::FromPayload; use byteorder::{BigEndian, ReadBytesExt}; +use num_enum::{IntoPrimitive, TryFromPrimitive}; +use std::convert::TryFrom; use std::fmt::Debug; use std::io::{Cursor, Read}; use tokio::io::{AsyncRead, AsyncReadExt}; @@ -20,36 +22,75 @@ pub struct Event { #[derive(Debug)] struct EventHeader { id: u64, + event_type: EventType, ref_id: Option, namespace: Option, name: String, } +#[derive(Clone, Debug, TryFromPrimitive, IntoPrimitive, Copy, Ord, PartialOrd, Eq, PartialEq)] +#[repr(u8)] +pub enum EventType { + Initiator, + Response, + End, + Error, +} + impl Event { - /// Creates a new event with a namespace + /// Creates a new event that acts as an initiator for further response events #[tracing::instrument(level = "trace", skip(data))] - pub fn with_namespace( - namespace: String, + #[inline] + pub(crate) fn initiator(namespace: Option, name: String, data: Vec) -> Self { + Self::new(namespace, name, data, None, EventType::Initiator) + } + + /// Creates a new event that is a response to a previous event + #[tracing::instrument(level = "trace", skip(data))] + #[inline] + pub(crate) fn response( + namespace: Option, name: String, data: Vec, - ref_id: Option, + ref_id: u64, ) -> Self { - let header = EventHeader { - id: generate_event_id(), - ref_id, - namespace: Some(namespace), - name, - }; - Self { header, data } + Self::new(namespace, name, data, Some(ref_id), EventType::Response) + } + + /// Creates a new error event as a response to a previous event + #[tracing::instrument(level = "trace", skip(data))] + #[inline] + pub(crate) fn error( + namespace: Option, + name: String, + data: Vec, + ref_id: u64, + ) -> Self { + Self::new(namespace, name, data, Some(ref_id), EventType::Error) + } + + /// Creates a new event that indicates the end of a series of responses (in an event handler) + /// and might contain a final response payload + #[tracing::instrument(level = "trace", skip(data))] + #[inline] + pub(crate) fn end(namespace: Option, name: String, data: Vec, ref_id: u64) -> Self { + Self::new(namespace, name, data, Some(ref_id), EventType::Response) } /// Creates a new event #[tracing::instrument(level = "trace", skip(data))] - pub fn new(name: String, data: Vec, ref_id: Option) -> Self { + pub(crate) fn new( + namespace: Option, + name: String, + data: Vec, + ref_id: Option, + event_type: EventType, + ) -> Self { let header = EventHeader { id: generate_event_id(), + event_type, ref_id, - namespace: None, + namespace, name, }; Self { header, data } @@ -61,6 +102,12 @@ impl Event { self.header.id } + /// The type of the event + #[inline] + pub fn event_type(&self) -> EventType { + self.header.event_type + } + /// The ID of the message referenced by this message. /// It represents the message that is replied to and can be None. #[inline] @@ -139,6 +186,7 @@ impl EventHeader { pub fn into_bytes(self) -> Vec { let mut buf = FORMAT_VERSION.to_vec(); buf.append(&mut self.id.to_be_bytes().to_vec()); + buf.push(self.event_type.into()); if let Some(ref_id) = self.ref_id { buf.push(0xFF); @@ -164,6 +212,8 @@ impl EventHeader { pub fn from_read(reader: &mut R) -> Result { Self::read_version(reader)?; let id = reader.read_u64::()?; + let event_type_num = reader.read_u8()?; + let event_type = EventType::try_from(event_type_num).map_err(|_| Error::CorruptedEvent)?; let ref_id = Self::read_ref_id(reader)?; let namespace_len = reader.read_u16::()?; let namespace = Self::read_namespace(reader, namespace_len)?; @@ -171,6 +221,7 @@ impl EventHeader { Ok(Self { id, + event_type, ref_id, namespace, name, diff --git a/src/events/mod.rs b/src/events/mod.rs index 207ecb7b..2e039d59 100644 --- a/src/events/mod.rs +++ b/src/events/mod.rs @@ -10,7 +10,6 @@ pub mod payload; #[cfg(feature = "serialize")] pub mod payload_serializer; - /// Generates a new event id pub(crate) fn generate_event_id() -> u64 { lazy_static::lazy_static! { diff --git a/src/ipc/stream_emitter.rs b/src/ipc/stream_emitter.rs index fb058198..ba6b9fc1 100644 --- a/src/ipc/stream_emitter.rs +++ b/src/ipc/stream_emitter.rs @@ -13,7 +13,8 @@ use tokio::sync::Mutex; use tracing; use crate::error::{Error, Result}; -use crate::error_event::{ERROR_EVENT_NAME, ErrorEventData}; +use crate::error_event::{ErrorEventData, ERROR_EVENT_NAME}; +use crate::event::EventType; use crate::events::event::Event; use crate::ipc::context::Context; use crate::payload::IntoPayload; @@ -25,9 +26,9 @@ macro_rules! poll_unwrap { v } else { tracing::error!("Polling a future with an invalid state."); - return Poll::Ready(Err(Error::InvalidState)) + return Poll::Ready(Err(Error::InvalidState)); } - } + }; } type SendStream = Arc>; @@ -55,8 +56,17 @@ impl StreamEmitter { event: &str, payload: P, res_id: Option, + event_type: EventType, ) -> EmitMetadata

{ - EmitMetadata::new(ctx, self.stream.clone(), event.to_string(), namespace.map(|n| n.to_string()), payload, res_id) + EmitMetadata::new( + ctx, + self.stream.clone(), + event.to_string(), + namespace.map(|n| n.to_string()), + payload, + res_id, + event_type, + ) } /// Emits an event @@ -67,7 +77,14 @@ impl StreamEmitter { event: S, payload: P, ) -> EmitMetadata

{ - self._emit(ctx, None, event.as_ref(), payload, None) + self._emit( + ctx, + None, + event.as_ref(), + payload, + None, + EventType::Initiator, + ) } /// Emits an event to a specific namespace @@ -79,7 +96,14 @@ impl StreamEmitter { event: S2, payload: P, ) -> EmitMetadata

{ - self._emit(ctx, Some(namespace.as_ref()), event.as_ref(), payload, None) + self._emit( + ctx, + Some(namespace.as_ref()), + event.as_ref(), + payload, + None, + EventType::Initiator, + ) } /// Emits a response to an event @@ -91,7 +115,14 @@ impl StreamEmitter { event: S, payload: P, ) -> EmitMetadata

{ - self._emit(ctx, None, event.as_ref(), payload, Some(event_id)) + self._emit( + ctx, + None, + event.as_ref(), + payload, + Some(event_id), + EventType::Response, + ) } /// Emits a response to an event to a namespace @@ -110,6 +141,7 @@ impl StreamEmitter { event.as_ref(), payload, Some(event_id), + EventType::Response, ) } } @@ -120,6 +152,7 @@ struct EventMetadata { event_namespace: Option>, event_name: Option, res_id: Option>, + event_type: Option, payload: Option

, } @@ -144,13 +177,17 @@ impl EventMetadata

{ let namespace = self.event_namespace.take().ok_or(Error::InvalidState)?; let payload = self.payload.take().ok_or(Error::InvalidState)?; let res_id = self.res_id.take().ok_or(Error::InvalidState)?; + let event_type = self.event_type.take().ok_or(Error::InvalidState)?; let payload_bytes = payload.into_payload(&ctx)?; - let event = if let Some(namespace) = namespace { - Event::with_namespace(namespace.to_string(), event.to_string(), payload_bytes, res_id) - } else { - Event::new(event.to_string(), payload_bytes, res_id) - }; + let event = Event::new( + namespace, + event.to_string(), + payload_bytes, + res_id, + event_type, + ); + self.event = Some(event); Ok(()) @@ -164,20 +201,28 @@ impl EventMetadata

{ pub struct EmitMetadata { event_metadata: Option>, stream: Option, - fut: Option> + Send + Sync>>>, + fut: Option> + Send + Sync>>>, } /// A metadata object returned after waiting for a reply to an event /// This object needs to be awaited for to get the actual reply pub struct EmitMetadataWithResponse { timeout: Option, - fut: Option> + Send + Sync>>>, + fut: Option> + Send + Sync>>>, emit_metadata: Option>, } impl EmitMetadata

{ #[inline] - pub(crate) fn new(ctx: Context, stream: SendStream, event_name: String, event_namespace: Option, payload: P, res_id: Option) -> Self { + pub(crate) fn new( + ctx: Context, + stream: SendStream, + event_name: String, + event_namespace: Option, + payload: P, + res_id: Option, + event_type: EventType, + ) -> Self { Self { event_metadata: Some(EventMetadata { event: None, @@ -186,6 +231,7 @@ impl EmitMetadata

{ event_namespace: Some(event_namespace), payload: Some(payload), res_id: Some(res_id), + event_type: Some(event_type), }), stream: Some(stream), fut: None, @@ -226,9 +272,12 @@ impl Future for EmitMetadata

{ let stream = poll_unwrap!(self.stream.take()); let event = match event_metadata.take_event() { - Ok(m) => { m } - Err(e) => { return Poll::Ready(Err(e)); } - }.expect("poll after future was done"); + Ok(m) => m, + Err(e) => { + return Poll::Ready(Err(e)); + } + } + .expect("poll after future was done"); self.fut = Some(Box::pin(async move { let event_id = event.id(); @@ -250,23 +299,28 @@ impl Future for EmitMetadataWithResponse fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { if self.fut.is_none() { let mut emit_metadata = poll_unwrap!(self.emit_metadata.take()); - let ctx = poll_unwrap!(emit_metadata.event_metadata.as_ref().and_then(|m| m.ctx.clone())); - let timeout = self.timeout.clone().unwrap_or(ctx.default_reply_timeout.clone()); + let ctx = poll_unwrap!(emit_metadata + .event_metadata + .as_ref() + .and_then(|m| m.ctx.clone())); + let timeout = self + .timeout + .clone() + .unwrap_or(ctx.default_reply_timeout.clone()); let event_id = match poll_unwrap!(emit_metadata.event_metadata.as_mut()).get_event() { - Ok(e) => { e.id() } - Err(e) => { return Poll::Ready(Err(e)); } + Ok(e) => e.id(), + Err(e) => { + return Poll::Ready(Err(e)); + } }; self.fut = Some(Box::pin(async move { let tx = ctx.register_reply_listener(event_id).await?; emit_metadata.await?; - let result = future::select( - Box::pin(tx), - Box::pin(tokio::time::sleep(timeout)), - ) - .await; + let result = + future::select(Box::pin(tx), Box::pin(tokio::time::sleep(timeout))).await; let reply = match result { Either::Left((tx_result, _)) => Ok(tx_result?), @@ -285,4 +339,4 @@ impl Future for EmitMetadataWithResponse } self.fut.as_mut().unwrap().as_mut().poll(cx) } -} \ No newline at end of file +} diff --git a/tests/test_serialization.rs b/tests/test_serialization.rs index ac53c485..7826eec3 100644 --- a/tests/test_serialization.rs +++ b/tests/test_serialization.rs @@ -2,7 +2,6 @@ use bromine::prelude::*; #[cfg(feature = "serialize")] use serde::{de::DeserializeOwned, Serialize}; -use std::fmt::Debug; #[cfg(feature = "serialize_rmp")] #[test] From 2fa63da081f31f64173452acb28a4a280c4952ae Mon Sep 17 00:00:00 2001 From: trivernis Date: Sun, 6 Feb 2022 11:23:10 +0100 Subject: [PATCH 02/12] Change error events to be identified by event type and not by name Signed-off-by: trivernis --- src/ipc/context.rs | 54 ++++++++++++++++++++++++--------------- src/ipc/mod.rs | 10 +++++--- src/ipc/stream_emitter.rs | 26 ++++++++++++++----- 3 files changed, 61 insertions(+), 29 deletions(-) diff --git a/src/ipc/context.rs b/src/ipc/context.rs index 9fb5172d..c939a376 100644 --- a/src/ipc/context.rs +++ b/src/ipc/context.rs @@ -1,20 +1,20 @@ use std::collections::HashMap; use std::mem; use std::ops::{Deref, DerefMut}; -use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; -use tokio::sync::{Mutex, oneshot, RwLock}; -use tokio::sync::oneshot::{Sender, Receiver}; +use tokio::sync::oneshot::{Receiver, Sender}; +use tokio::sync::{oneshot, Mutex, RwLock}; use tokio::time::Duration; use typemap_rev::TypeMap; use crate::error::{Error, Result}; -use crate::event::Event; +use crate::event::{Event, EventType}; use crate::ipc::stream_emitter::{EmitMetadata, StreamEmitter}; +use crate::payload::IntoPayload; #[cfg(feature = "serialize")] use crate::payload::{DynamicSerializer, SerdePayload}; -use crate::payload::IntoPayload; pub(crate) type ReplyListeners = Arc>>>; @@ -71,12 +71,26 @@ impl Context { } } - /// Emits an event with a given payload that can be serialized into bytes - pub fn emit, P: IntoPayload>( + /// Emits a raw event. Only for internal use + pub(crate) fn emit_raw( &self, - name: S, + name: &str, + namespace: Option, + event_type: EventType, payload: P, ) -> EmitMetadata

{ + self.emitter.emit_raw( + self.clone(), + self.ref_id.clone(), + name, + namespace, + event_type, + payload, + ) + } + + /// Emits an event with a given payload that can be serialized into bytes + pub fn emit, P: IntoPayload>(&self, name: S, payload: P) -> EmitMetadata

{ if let Some(ref_id) = &self.ref_id { self.emitter .emit_response(self.clone(), *ref_id, name, payload) @@ -149,16 +163,16 @@ pub struct PooledContext { } pub struct PoolGuard - where - T: Clone, +where + T: Clone, { inner: T, count: Arc, } impl Deref for PoolGuard - where - T: Clone, +where + T: Clone, { type Target = T; @@ -169,8 +183,8 @@ impl Deref for PoolGuard } impl DerefMut for PoolGuard - where - T: Clone, +where + T: Clone, { #[inline] fn deref_mut(&mut self) -> &mut Self::Target { @@ -179,8 +193,8 @@ impl DerefMut for PoolGuard } impl Clone for PoolGuard - where - T: Clone, +where + T: Clone, { #[inline] fn clone(&self) -> Self { @@ -194,8 +208,8 @@ impl Clone for PoolGuard } impl Drop for PoolGuard - where - T: Clone, +where + T: Clone, { #[inline] fn drop(&mut self) { @@ -204,8 +218,8 @@ impl Drop for PoolGuard } impl PoolGuard - where - T: Clone, +where + T: Clone, { pub(crate) fn new(inner: T) -> Self { Self { diff --git a/src/ipc/mod.rs b/src/ipc/mod.rs index 8d280315..0319f6aa 100644 --- a/src/ipc/mod.rs +++ b/src/ipc/mod.rs @@ -1,7 +1,8 @@ use std::collections::HashMap; use std::sync::Arc; -use crate::error_event::{ERROR_EVENT_NAME, ErrorEventData}; +use crate::error_event::{ErrorEventData, ERROR_EVENT_NAME}; +use crate::event::EventType; use crate::events::event_handler::EventHandler; use crate::namespaces::namespace::Namespace; use crate::prelude::*; @@ -60,13 +61,16 @@ fn handle_event(mut ctx: Context, handler: Arc, event: Event) { if let Err(e) = handler.handle_event(&ctx, event).await { // emit an error event if let Err(e) = ctx - .emit( + .emit_raw( ERROR_EVENT_NAME, + None, + EventType::Error, ErrorEventData { message: format!("{:?}", e), code: 500, }, - ).await + ) + .await { tracing::error!("Error occurred when sending error response: {:?}", e); } diff --git a/src/ipc/stream_emitter.rs b/src/ipc/stream_emitter.rs index ba6b9fc1..76164fc5 100644 --- a/src/ipc/stream_emitter.rs +++ b/src/ipc/stream_emitter.rs @@ -13,7 +13,7 @@ use tokio::sync::Mutex; use tracing; use crate::error::{Error, Result}; -use crate::error_event::{ErrorEventData, ERROR_EVENT_NAME}; +use crate::error_event::ErrorEventData; use crate::event::EventType; use crate::events::event::Event; use crate::ipc::context::Context; @@ -52,7 +52,7 @@ impl StreamEmitter { fn _emit( &self, ctx: Context, - namespace: Option<&str>, + namespace: Option, event: &str, payload: P, res_id: Option, @@ -62,7 +62,7 @@ impl StreamEmitter { ctx, self.stream.clone(), event.to_string(), - namespace.map(|n| n.to_string()), + namespace, payload, res_id, event_type, @@ -98,7 +98,7 @@ impl StreamEmitter { ) -> EmitMetadata

{ self._emit( ctx, - Some(namespace.as_ref()), + Some(namespace.as_ref().to_string()), event.as_ref(), payload, None, @@ -106,6 +106,20 @@ impl StreamEmitter { ) } + /// Emits a raw event + #[inline] + pub(crate) fn emit_raw( + &self, + ctx: Context, + res_id: Option, + event: &str, + namespace: Option, + event_type: EventType, + payload: P, + ) -> EmitMetadata

{ + self._emit(ctx, namespace, event, payload, res_id, event_type) + } + /// Emits a response to an event #[inline] pub(crate) fn emit_response, P: IntoPayload>( @@ -137,7 +151,7 @@ impl StreamEmitter { ) -> EmitMetadata

{ self._emit( ctx, - Some(namespace.as_ref()), + Some(namespace.as_ref().to_string()), event.as_ref(), payload, Some(event_id), @@ -330,7 +344,7 @@ impl Future for EmitMetadataWithResponse Err(Error::Timeout) } }?; - if reply.name() == ERROR_EVENT_NAME { + if reply.event_type() == EventType::Error { Err(reply.payload::()?.into()) } else { Ok(reply) From fbee60e64ba32f9b647cbe7c5e955e6fbe3d8b74 Mon Sep 17 00:00:00 2001 From: trivernis Date: Sun, 6 Feb 2022 12:34:05 +0100 Subject: [PATCH 03/12] Change handler result to contain a response Signed-off-by: trivernis --- src/events/error_event.rs | 1 + src/events/event_handler.rs | 36 +++++++++++++++++---- src/ipc/builder.rs | 13 +++++--- src/ipc/context.rs | 6 ++++ src/ipc/mod.rs | 52 +++++++++++++++++++++---------- src/lib.rs | 2 +- src/macros.rs | 2 +- src/namespaces/builder.rs | 3 +- tests/test_events_with_payload.rs | 14 +++------ tests/test_raw_events.rs | 20 +++++------- 10 files changed, 97 insertions(+), 52 deletions(-) diff --git a/src/events/error_event.rs b/src/events/error_event.rs index 2b6e160e..bff310a1 100644 --- a/src/events/error_event.rs +++ b/src/events/error_event.rs @@ -8,6 +8,7 @@ use std::fmt::{Display, Formatter}; use std::io::Read; pub static ERROR_EVENT_NAME: &str = "error"; +pub static END_EVENT_NAME: &str = "end"; /// Data returned on error event. /// The error event has a default handler that just logs that diff --git a/src/events/event_handler.rs b/src/events/event_handler.rs index 2ef18062..8b87f603 100644 --- a/src/events/event_handler.rs +++ b/src/events/event_handler.rs @@ -1,14 +1,38 @@ use crate::error::Result; use crate::events::event::Event; use crate::ipc::context::Context; +use crate::payload::{BytePayload, IntoPayload}; use std::collections::HashMap; use std::fmt::{Debug, Formatter}; use std::future::Future; use std::pin::Pin; use std::sync::Arc; +pub struct Response(Vec); + +impl Response { + /// Creates a new response with a given payload + pub fn payload(ctx: &Context, payload: P) -> Result { + let bytes = payload.into_payload(ctx)?; + + Ok(Self(bytes)) + } + + /// Creates an empty response + pub fn empty() -> Self { + Self(vec![]) + } + + pub(crate) fn into_byte_payload(self) -> BytePayload { + BytePayload::new(self.0) + } +} + type EventCallback = Arc< - dyn for<'a> Fn(&'a Context, Event) -> Pin> + Send + 'a)>> + dyn for<'a> Fn( + &'a Context, + Event, + ) -> Pin> + Send + 'a)>> + Send + Sync, >; @@ -46,7 +70,7 @@ impl EventHandler { F: for<'a> Fn( &'a Context, Event, - ) -> Pin> + Send + 'a)>> + ) -> Pin> + Send + 'a)>> + Send + Sync, { @@ -56,11 +80,11 @@ impl EventHandler { /// Handles a received event #[inline] #[tracing::instrument(level = "debug", skip(self, ctx, event))] - pub async fn handle_event(&self, ctx: &Context, event: Event) -> Result<()> { + pub async fn handle_event(&self, ctx: &Context, event: Event) -> Result { if let Some(cb) = self.callbacks.get(event.name()) { - cb.as_ref()(ctx, event).await?; + cb.as_ref()(ctx, event).await + } else { + Ok(Response::empty()) } - - Ok(()) } } diff --git a/src/ipc/builder.rs b/src/ipc/builder.rs index 58810081..0019dc21 100644 --- a/src/ipc/builder.rs +++ b/src/ipc/builder.rs @@ -1,5 +1,7 @@ use crate::error::{Error, Result}; -use crate::events::error_event::{ErrorEventData, ERROR_EVENT_NAME}; +use crate::error_event::ErrorEventData; +use crate::event_handler::Response; +use crate::events::error_event::ERROR_EVENT_NAME; use crate::events::event::Event; use crate::events::event_handler::EventHandler; use crate::ipc::client::IPCClient; @@ -24,6 +26,7 @@ use typemap_rev::{TypeMap, TypeMapKey}; /// use typemap_rev::TypeMapKey; /// use bromine::IPCBuilder; /// use tokio::net::TcpListener; +/// use bromine::prelude::Response; /// /// struct CustomKey; /// @@ -37,13 +40,13 @@ use typemap_rev::{TypeMap, TypeMapKey}; /// // register callback /// .on("ping", |_ctx, _event| Box::pin(async move { /// println!("Received ping event."); -/// Ok(()) +/// Ok(Response::empty()) /// })) /// // register a namespace /// .namespace("namespace") /// .on("namespace-event", |_ctx, _event| Box::pin(async move { /// println!("Namespace event."); -/// Ok(()) +/// Ok(Response::empty()) /// })) /// .build() /// // add context shared data @@ -75,7 +78,7 @@ where tracing::warn!(error_data.code); tracing::warn!("error_data.message = '{}'", error_data.message); - Ok(()) + Ok(Response::empty()) }) }); Self { @@ -102,7 +105,7 @@ where F: for<'a> Fn( &'a Context, Event, - ) -> Pin> + Send + 'a)>> + ) -> Pin> + Send + 'a)>> + Send + Sync, { diff --git a/src/ipc/context.rs b/src/ipc/context.rs index c939a376..658e0c15 100644 --- a/src/ipc/context.rs +++ b/src/ipc/context.rs @@ -15,6 +15,7 @@ use crate::ipc::stream_emitter::{EmitMetadata, StreamEmitter}; use crate::payload::IntoPayload; #[cfg(feature = "serialize")] use crate::payload::{DynamicSerializer, SerdePayload}; +use crate::prelude::Response; pub(crate) type ReplyListeners = Arc>>>; @@ -114,6 +115,11 @@ impl Context { } } + /// Ends the event flow by creating a final response + pub fn response(&self, payload: P) -> Result { + Response::payload(self, payload) + } + /// Registers a reply listener for a given event #[inline] #[tracing::instrument(level = "debug", skip(self))] diff --git a/src/ipc/mod.rs b/src/ipc/mod.rs index 0319f6aa..db9f456b 100644 --- a/src/ipc/mod.rs +++ b/src/ipc/mod.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; use std::sync::Arc; -use crate::error_event::{ErrorEventData, ERROR_EVENT_NAME}; +use crate::error_event::{ErrorEventData, END_EVENT_NAME, ERROR_EVENT_NAME}; use crate::event::EventType; use crate::events::event_handler::EventHandler; use crate::namespaces::namespace::Namespace; @@ -41,6 +41,12 @@ async fn handle_connection( } tracing::trace!("No response listener found for event. Passing to regular listener."); } + + if event.event_type() == EventType::End { + tracing::debug!("Received dangling end event with no listener"); + continue; + } + if let Some(namespace) = event.namespace().clone().and_then(|n| namespaces.get(&n)) { tracing::trace!("Passing event to namespace listener"); let handler = Arc::clone(&namespace.handler); @@ -58,24 +64,36 @@ fn handle_event(mut ctx: Context, handler: Arc, event: Event) { ctx.set_ref_id(Some(event.id())); tokio::spawn(async move { - if let Err(e) = handler.handle_event(&ctx, event).await { - // emit an error event - if let Err(e) = ctx - .emit_raw( - ERROR_EVENT_NAME, - None, - EventType::Error, - ErrorEventData { - message: format!("{:?}", e), - code: 500, - }, - ) - .await - { - tracing::error!("Error occurred when sending error response: {:?}", e); + match handler.handle_event(&ctx, event).await { + Ok(r) => { + // emit the response under a unique name to prevent it being interpreted as a new + // event initiator + if let Err(e) = ctx + .emit_raw(END_EVENT_NAME, None, EventType::End, r.into_byte_payload()) + .await + { + tracing::error!("Error occurred when sending error response: {:?}", e); + } } + Err(e) => { + // emit an error event + if let Err(e) = ctx + .emit_raw( + ERROR_EVENT_NAME, + None, + EventType::Error, + ErrorEventData { + message: format!("{:?}", e), + code: 500, + }, + ) + .await + { + tracing::error!("Error occurred when sending error response: {:?}", e); + } - tracing::error!("Failed to handle event: {:?}", e); + tracing::error!("Failed to handle event: {:?}", e); + } } }); } diff --git a/src/lib.rs b/src/lib.rs index a251ef06..6c824af0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -134,7 +134,7 @@ pub mod prelude { pub use crate::error::Error as IPCError; pub use crate::error::Result as IPCResult; pub use crate::event::Event; - pub use crate::event_handler::EventHandler; + pub use crate::event_handler::{EventHandler, Response}; pub use crate::ipc::context::Context; pub use crate::ipc::context::{PoolGuard, PooledContext}; pub use crate::ipc::*; diff --git a/src/macros.rs b/src/macros.rs index 0107c03f..ff15f867 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -30,4 +30,4 @@ macro_rules! events{ $handler.on($name, callback!($cb)); )* } -} \ No newline at end of file +} diff --git a/src/namespaces/builder.rs b/src/namespaces/builder.rs index a048a053..aa5b0ebe 100644 --- a/src/namespaces/builder.rs +++ b/src/namespaces/builder.rs @@ -1,5 +1,6 @@ use crate::error::Result; use crate::event::Event; +use crate::event_handler::Response; use crate::events::event_handler::EventHandler; use crate::ipc::context::Context; use crate::namespaces::namespace::Namespace; @@ -32,7 +33,7 @@ where F: for<'a> Fn( &'a Context, Event, - ) -> Pin> + Send + 'a)>> + ) -> Pin> + Send + 'a)>> + Send + Sync, { diff --git a/tests/test_events_with_payload.rs b/tests/test_events_with_payload.rs index a00f8601..7c14ad7f 100644 --- a/tests/test_events_with_payload.rs +++ b/tests/test_events_with_payload.rs @@ -36,11 +36,7 @@ async fn it_receives_payloads() { number: 0, string: String::from("Hello World"), }; - let reply = ctx - .emit("ping", payload) - .await_reply() - .await - .unwrap(); + let reply = ctx.emit("ping", payload).await_reply().await.unwrap(); let reply_payload = reply.payload::().unwrap(); let counters = get_counter_from_context(&ctx).await; @@ -62,19 +58,19 @@ fn get_builder(port: u8) -> IPCBuilder { .timeout(Duration::from_millis(10)) } -async fn handle_ping_event(ctx: &Context, event: Event) -> IPCResult<()> { +async fn handle_ping_event(ctx: &Context, event: Event) -> IPCResult { increment_counter_for_event(ctx, &event).await; let payload = event.payload::()?; ctx.emit("pong", payload).await?; - Ok(()) + Ok(Response::empty()) } -async fn handle_pong_event(ctx: &Context, event: Event) -> IPCResult<()> { +async fn handle_pong_event(ctx: &Context, event: Event) -> IPCResult { increment_counter_for_event(ctx, &event).await; let _payload = event.payload::()?; - Ok(()) + Ok(Response::empty()) } #[cfg(feature = "serialize")] diff --git a/tests/test_raw_events.rs b/tests/test_raw_events.rs index e885b20d..c690f869 100644 --- a/tests/test_raw_events.rs +++ b/tests/test_raw_events.rs @@ -45,11 +45,7 @@ async fn it_sends_namespaced_events() { async fn it_receives_responses() { let port = get_free_port(); let ctx = get_client_with_server(port).await; - let reply = ctx - .emit("ping", EmptyPayload) - .await_reply() - .await - .unwrap(); + let reply = ctx.emit("ping", EmptyPayload).await_reply().await.unwrap(); let counter = get_counter_from_context(&ctx).await; assert_eq!(reply.name(), "pong"); @@ -108,29 +104,29 @@ fn get_builder(port: u8) -> IPCBuilder { .build() } -async fn handle_ping_event(ctx: &Context, event: Event) -> IPCResult<()> { +async fn handle_ping_event(ctx: &Context, event: Event) -> IPCResult { increment_counter_for_event(ctx, &event).await; ctx.emit("pong", EmptyPayload).await?; - Ok(()) + Ok(Response::empty()) } -async fn handle_pong_event(ctx: &Context, event: Event) -> IPCResult<()> { +async fn handle_pong_event(ctx: &Context, event: Event) -> IPCResult { increment_counter_for_event(ctx, &event).await; - Ok(()) + Ok(Response::empty()) } -async fn handle_create_error_event(ctx: &Context, event: Event) -> IPCResult<()> { +async fn handle_create_error_event(ctx: &Context, event: Event) -> IPCResult { increment_counter_for_event(ctx, &event).await; Err(IPCError::from("Test Error")) } -async fn handle_error_event(ctx: &Context, event: Event) -> IPCResult<()> { +async fn handle_error_event(ctx: &Context, event: Event) -> IPCResult { increment_counter_for_event(ctx, &event).await; - Ok(()) + Ok(Response::empty()) } pub struct EmptyPayload; From 25277634944e6605f781ea2ff9349a77fd95d2ed Mon Sep 17 00:00:00 2001 From: trivernis Date: Sun, 6 Feb 2022 12:38:07 +0100 Subject: [PATCH 04/12] Fix compile errors Signed-off-by: trivernis --- src/lib.rs | 16 ++++++++-------- tests/test_serialization.rs | 1 + 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 6c824af0..0b898607 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,19 +6,19 @@ //! use tokio::net::TcpListener; //! //! /// Callback ping function -//! async fn handle_ping(ctx: &Context, event: Event) -> IPCResult<()> { +//! async fn handle_ping(ctx: &Context, event: Event) -> IPCResult { //! println!("Received ping event."); //! ctx.emit("pong", ()).await?; //! -//! Ok(()) +//! Ok(Response::empty()) //! } //! //! pub struct MyNamespace; //! //! impl MyNamespace { -//! async fn ping(_ctx: &Context, _event: Event) -> IPCResult<()> { +//! async fn ping(_ctx: &Context, _event: Event) -> IPCResult { //! println!("My namespace received a ping"); -//! Ok(()) +//! Ok(Response::empty()) //! } //! } //! @@ -46,7 +46,7 @@ //! .on("something", callback!(ctx, event, async move { //! println!("I think the server did something"); //! ctx.emit_to("mainspace-server", "ok", ()).await?; -//! Ok(()) +//! Ok(Response::empty()) //! })) //! .build() //! .add_namespace(namespace!(MyNamespace)) @@ -63,7 +63,7 @@ //! use std::net::ToSocketAddrs; //! use typemap_rev::TypeMapKey; //! use bromine::IPCBuilder; -//! use bromine::callback; +//! use bromine::prelude::*; //! use tokio::net::TcpListener; //! //! struct MyKey; @@ -80,7 +80,7 @@ //! .on("ping", callback!(ctx, event, async move { //! println!("Received ping event."); //! ctx.emit("pong", ()).await?; -//! Ok(()) +//! Ok(Response::empty()) //! })) //! .namespace("mainspace-server") //! .on("do-something", callback!(ctx, event, async move { @@ -92,7 +92,7 @@ //! *my_key += 1; //! } //! ctx.emit_to("mainspace-client", "something", ()).await?; -//! Ok(()) +//! Ok(Response::empty()) //! })) //! .build() //! // store additional data diff --git a/tests/test_serialization.rs b/tests/test_serialization.rs index 7826eec3..ac53c485 100644 --- a/tests/test_serialization.rs +++ b/tests/test_serialization.rs @@ -2,6 +2,7 @@ use bromine::prelude::*; #[cfg(feature = "serialize")] use serde::{de::DeserializeOwned, Serialize}; +use std::fmt::Debug; #[cfg(feature = "serialize_rmp")] #[test] From 51f53e7fbe88b21fcfe328bc2a611b7f020ef494 Mon Sep 17 00:00:00 2001 From: trivernis Date: Sun, 6 Feb 2022 12:45:17 +0100 Subject: [PATCH 05/12] Fix errors in benchmarks Signed-off-by: trivernis --- benches/deserialization_benchmark.rs | 4 ++-- benches/serialization_benchmark.rs | 2 +- src/events/event.rs | 18 ++++-------------- 3 files changed, 7 insertions(+), 17 deletions(-) diff --git a/benches/deserialization_benchmark.rs b/benches/deserialization_benchmark.rs index af65bea2..3428c2d6 100644 --- a/benches/deserialization_benchmark.rs +++ b/benches/deserialization_benchmark.rs @@ -3,13 +3,13 @@ use criterion::{criterion_group, criterion_main}; use criterion::{BatchSize, Criterion}; use std::io::Cursor; -use bromine::event::Event; +use bromine::event::{Event, EventType}; use tokio::runtime::Runtime; pub const EVENT_NAME: &str = "bench_event"; fn create_event_bytes_reader(data_size: usize) -> Cursor> { - let bytes = Event::new(EVENT_NAME.to_string(), vec![0u8; data_size], None) + let bytes = Event::initiator(None, EVENT_NAME.to_string(), vec![0u8; data_size]) .into_bytes() .unwrap(); Cursor::new(bytes) diff --git a/benches/serialization_benchmark.rs b/benches/serialization_benchmark.rs index 8f9f6e98..2d25643b 100644 --- a/benches/serialization_benchmark.rs +++ b/benches/serialization_benchmark.rs @@ -6,7 +6,7 @@ use criterion::{ pub const EVENT_NAME: &str = "bench_event"; fn create_event(data_size: usize) -> Event { - Event::new(EVENT_NAME.to_string(), vec![0u8; data_size], None) + Event::initiator(None, EVENT_NAME.to_string(), vec![0u8; data_size]) } fn event_serialization(c: &mut Criterion) { diff --git a/src/events/event.rs b/src/events/event.rs index f94df91c..c6e33ddc 100644 --- a/src/events/event.rs +++ b/src/events/event.rs @@ -41,31 +41,21 @@ impl Event { /// Creates a new event that acts as an initiator for further response events #[tracing::instrument(level = "trace", skip(data))] #[inline] - pub(crate) fn initiator(namespace: Option, name: String, data: Vec) -> Self { + pub fn initiator(namespace: Option, name: String, data: Vec) -> Self { Self::new(namespace, name, data, None, EventType::Initiator) } /// Creates a new event that is a response to a previous event #[tracing::instrument(level = "trace", skip(data))] #[inline] - pub(crate) fn response( - namespace: Option, - name: String, - data: Vec, - ref_id: u64, - ) -> Self { + pub fn response(namespace: Option, name: String, data: Vec, ref_id: u64) -> Self { Self::new(namespace, name, data, Some(ref_id), EventType::Response) } /// Creates a new error event as a response to a previous event #[tracing::instrument(level = "trace", skip(data))] #[inline] - pub(crate) fn error( - namespace: Option, - name: String, - data: Vec, - ref_id: u64, - ) -> Self { + pub fn error(namespace: Option, name: String, data: Vec, ref_id: u64) -> Self { Self::new(namespace, name, data, Some(ref_id), EventType::Error) } @@ -73,7 +63,7 @@ impl Event { /// and might contain a final response payload #[tracing::instrument(level = "trace", skip(data))] #[inline] - pub(crate) fn end(namespace: Option, name: String, data: Vec, ref_id: u64) -> Self { + pub fn end(namespace: Option, name: String, data: Vec, ref_id: u64) -> Self { Self::new(namespace, name, data, Some(ref_id), EventType::Response) } From e316d29807270c907a010d1cfa849c52629ca2c7 Mon Sep 17 00:00:00 2001 From: trivernis Date: Sun, 6 Feb 2022 13:08:17 +0100 Subject: [PATCH 06/12] Move emit metadata to separate modules Signed-off-by: trivernis --- src/ipc/context.rs | 5 +- src/ipc/stream_emitter.rs | 356 ------------------ src/ipc/stream_emitter/emit_metadata.rs | 92 +++++ .../emit_metadata_with_response.rs | 80 ++++ src/ipc/stream_emitter/event_metadata.rs | 54 +++ src/ipc/stream_emitter/mod.rs | 166 ++++++++ 6 files changed, 395 insertions(+), 358 deletions(-) delete mode 100644 src/ipc/stream_emitter.rs create mode 100644 src/ipc/stream_emitter/emit_metadata.rs create mode 100644 src/ipc/stream_emitter/emit_metadata_with_response.rs create mode 100644 src/ipc/stream_emitter/event_metadata.rs create mode 100644 src/ipc/stream_emitter/mod.rs diff --git a/src/ipc/context.rs b/src/ipc/context.rs index 658e0c15..b7f8b51a 100644 --- a/src/ipc/context.rs +++ b/src/ipc/context.rs @@ -5,13 +5,14 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use tokio::sync::oneshot::{Receiver, Sender}; -use tokio::sync::{oneshot, Mutex, RwLock}; +use tokio::sync::{Mutex, oneshot, RwLock}; use tokio::time::Duration; use typemap_rev::TypeMap; use crate::error::{Error, Result}; use crate::event::{Event, EventType}; -use crate::ipc::stream_emitter::{EmitMetadata, StreamEmitter}; +use crate::ipc::stream_emitter::StreamEmitter; +use crate::ipc::stream_emitter::emit_metadata::EmitMetadata; use crate::payload::IntoPayload; #[cfg(feature = "serialize")] use crate::payload::{DynamicSerializer, SerdePayload}; diff --git a/src/ipc/stream_emitter.rs b/src/ipc/stream_emitter.rs deleted file mode 100644 index 76164fc5..00000000 --- a/src/ipc/stream_emitter.rs +++ /dev/null @@ -1,356 +0,0 @@ -use std::future::Future; -use std::mem; -use std::ops::DerefMut; -use std::pin::Pin; -use std::sync::Arc; -use std::task::Poll; -use std::time::Duration; - -use futures::future; -use futures::future::Either; -use tokio::io::{AsyncWrite, AsyncWriteExt}; -use tokio::sync::Mutex; -use tracing; - -use crate::error::{Error, Result}; -use crate::error_event::ErrorEventData; -use crate::event::EventType; -use crate::events::event::Event; -use crate::ipc::context::Context; -use crate::payload::IntoPayload; -use crate::protocol::AsyncProtocolStream; - -macro_rules! poll_unwrap { - ($val:expr) => { - if let Some(v) = $val { - v - } else { - tracing::error!("Polling a future with an invalid state."); - return Poll::Ready(Err(Error::InvalidState)); - } - }; -} - -type SendStream = Arc>; - -/// An abstraction over any type that implements the AsyncProtocolStream trait -/// to emit events and share a connection across multiple -/// contexts. -#[derive(Clone)] -pub struct StreamEmitter { - stream: SendStream, -} - -impl StreamEmitter { - pub fn new(stream: P::OwnedSplitWriteHalf) -> Self { - Self { - stream: Arc::new(Mutex::new(stream)), - } - } - - #[tracing::instrument(level = "trace", skip(self, ctx, payload))] - fn _emit( - &self, - ctx: Context, - namespace: Option, - event: &str, - payload: P, - res_id: Option, - event_type: EventType, - ) -> EmitMetadata

{ - EmitMetadata::new( - ctx, - self.stream.clone(), - event.to_string(), - namespace, - payload, - res_id, - event_type, - ) - } - - /// Emits an event - #[inline] - pub(crate) fn emit, P: IntoPayload>( - &self, - ctx: Context, - event: S, - payload: P, - ) -> EmitMetadata

{ - self._emit( - ctx, - None, - event.as_ref(), - payload, - None, - EventType::Initiator, - ) - } - - /// Emits an event to a specific namespace - #[inline] - pub(crate) fn emit_to, S2: AsRef, P: IntoPayload>( - &self, - ctx: Context, - namespace: S1, - event: S2, - payload: P, - ) -> EmitMetadata

{ - self._emit( - ctx, - Some(namespace.as_ref().to_string()), - event.as_ref(), - payload, - None, - EventType::Initiator, - ) - } - - /// Emits a raw event - #[inline] - pub(crate) fn emit_raw( - &self, - ctx: Context, - res_id: Option, - event: &str, - namespace: Option, - event_type: EventType, - payload: P, - ) -> EmitMetadata

{ - self._emit(ctx, namespace, event, payload, res_id, event_type) - } - - /// Emits a response to an event - #[inline] - pub(crate) fn emit_response, P: IntoPayload>( - &self, - ctx: Context, - event_id: u64, - event: S, - payload: P, - ) -> EmitMetadata

{ - self._emit( - ctx, - None, - event.as_ref(), - payload, - Some(event_id), - EventType::Response, - ) - } - - /// Emits a response to an event to a namespace - #[inline] - pub(crate) fn emit_response_to, S2: AsRef, P: IntoPayload>( - &self, - ctx: Context, - event_id: u64, - namespace: S1, - event: S2, - payload: P, - ) -> EmitMetadata

{ - self._emit( - ctx, - Some(namespace.as_ref().to_string()), - event.as_ref(), - payload, - Some(event_id), - EventType::Response, - ) - } -} - -struct EventMetadata { - event: Option, - ctx: Option, - event_namespace: Option>, - event_name: Option, - res_id: Option>, - event_type: Option, - payload: Option

, -} - -impl EventMetadata

{ - pub fn get_event(&mut self) -> Result<&Event> { - if self.event.is_none() { - self.build_event()?; - } - Ok(self.event.as_ref().unwrap()) - } - - pub fn take_event(mut self) -> Result> { - if self.event.is_none() { - self.build_event()?; - } - Ok(mem::take(&mut self.event)) - } - - fn build_event(&mut self) -> Result<()> { - let ctx = self.ctx.take().ok_or(Error::InvalidState)?; - let event = self.event_name.take().ok_or(Error::InvalidState)?; - let namespace = self.event_namespace.take().ok_or(Error::InvalidState)?; - let payload = self.payload.take().ok_or(Error::InvalidState)?; - let res_id = self.res_id.take().ok_or(Error::InvalidState)?; - let event_type = self.event_type.take().ok_or(Error::InvalidState)?; - let payload_bytes = payload.into_payload(&ctx)?; - - let event = Event::new( - namespace, - event.to_string(), - payload_bytes, - res_id, - event_type, - ); - - self.event = Some(event); - - Ok(()) - } -} - -/// A metadata object returned after emitting an event. -/// To send the event this object needs to be awaited -/// This object can be used to wait for a response to an event. -/// The result contains the emitted event id. -pub struct EmitMetadata { - event_metadata: Option>, - stream: Option, - fut: Option> + Send + Sync>>>, -} - -/// A metadata object returned after waiting for a reply to an event -/// This object needs to be awaited for to get the actual reply -pub struct EmitMetadataWithResponse { - timeout: Option, - fut: Option> + Send + Sync>>>, - emit_metadata: Option>, -} - -impl EmitMetadata

{ - #[inline] - pub(crate) fn new( - ctx: Context, - stream: SendStream, - event_name: String, - event_namespace: Option, - payload: P, - res_id: Option, - event_type: EventType, - ) -> Self { - Self { - event_metadata: Some(EventMetadata { - event: None, - ctx: Some(ctx), - event_name: Some(event_name), - event_namespace: Some(event_namespace), - payload: Some(payload), - res_id: Some(res_id), - event_type: Some(event_type), - }), - stream: Some(stream), - fut: None, - } - } - - /// Waits for a reply to the given message. - #[tracing::instrument(skip(self), fields(self.message_id))] - pub fn await_reply(self) -> EmitMetadataWithResponse

{ - EmitMetadataWithResponse { - timeout: None, - fut: None, - emit_metadata: Some(self), - } - } -} - -impl EmitMetadataWithResponse

{ - /// Sets a timeout for awaiting replies to this emitted event - #[inline] - pub fn with_timeout(mut self, timeout: Duration) -> Self { - self.timeout = Some(timeout); - - self - } -} - -impl Unpin for EmitMetadata

{} - -impl Unpin for EmitMetadataWithResponse

{} - -impl Future for EmitMetadata

{ - type Output = Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { - if self.fut.is_none() { - let event_metadata = poll_unwrap!(self.event_metadata.take()); - let stream = poll_unwrap!(self.stream.take()); - - let event = match event_metadata.take_event() { - Ok(m) => m, - Err(e) => { - return Poll::Ready(Err(e)); - } - } - .expect("poll after future was done"); - - self.fut = Some(Box::pin(async move { - let event_id = event.id(); - let event_bytes = event.into_bytes()?; - let mut stream = stream.lock().await; - stream.deref_mut().write_all(&event_bytes[..]).await?; - tracing::trace!(bytes_len = event_bytes.len()); - - Ok(event_id) - })); - } - self.fut.as_mut().unwrap().as_mut().poll(cx) - } -} - -impl Future for EmitMetadataWithResponse

{ - type Output = Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { - if self.fut.is_none() { - let mut emit_metadata = poll_unwrap!(self.emit_metadata.take()); - let ctx = poll_unwrap!(emit_metadata - .event_metadata - .as_ref() - .and_then(|m| m.ctx.clone())); - let timeout = self - .timeout - .clone() - .unwrap_or(ctx.default_reply_timeout.clone()); - - let event_id = match poll_unwrap!(emit_metadata.event_metadata.as_mut()).get_event() { - Ok(e) => e.id(), - Err(e) => { - return Poll::Ready(Err(e)); - } - }; - - self.fut = Some(Box::pin(async move { - let tx = ctx.register_reply_listener(event_id).await?; - emit_metadata.await?; - - let result = - future::select(Box::pin(tx), Box::pin(tokio::time::sleep(timeout))).await; - - let reply = match result { - Either::Left((tx_result, _)) => Ok(tx_result?), - Either::Right(_) => { - let mut listeners = ctx.reply_listeners.lock().await; - listeners.remove(&event_id); - Err(Error::Timeout) - } - }?; - if reply.event_type() == EventType::Error { - Err(reply.payload::()?.into()) - } else { - Ok(reply) - } - })) - } - self.fut.as_mut().unwrap().as_mut().poll(cx) - } -} diff --git a/src/ipc/stream_emitter/emit_metadata.rs b/src/ipc/stream_emitter/emit_metadata.rs new file mode 100644 index 00000000..ebc4f559 --- /dev/null +++ b/src/ipc/stream_emitter/emit_metadata.rs @@ -0,0 +1,92 @@ +use crate::context::Context; +use crate::error::Error; +use crate::event::EventType; +use crate::ipc::stream_emitter::emit_metadata_with_response::EmitMetadataWithResponse; +use crate::ipc::stream_emitter::event_metadata::EventMetadata; +use crate::ipc::stream_emitter::SendStream; +use crate::payload::IntoPayload; +use crate::{error, poll_unwrap}; +use std::future::Future; +use std::ops::DerefMut; +use std::pin::Pin; +use std::task::Poll; +use tokio::io::AsyncWriteExt; + +/// A metadata object returned after emitting an event. +/// To send the event this object needs to be awaited +/// This object can be used to wait for a response to an event. +/// The result contains the emitted event id. +pub struct EmitMetadata { + pub(crate) event_metadata: Option>, + stream: Option, + fut: Option> + Send + Sync>>>, +} + +impl EmitMetadata

{ + #[inline] + pub(crate) fn new( + ctx: Context, + stream: SendStream, + event_name: String, + event_namespace: Option, + payload: P, + res_id: Option, + event_type: EventType, + ) -> Self { + Self { + event_metadata: Some(EventMetadata { + event: None, + ctx: Some(ctx), + event_name: Some(event_name), + event_namespace: Some(event_namespace), + payload: Some(payload), + res_id: Some(res_id), + event_type: Some(event_type), + }), + stream: Some(stream), + fut: None, + } + } + + /// Waits for a reply to the given message. + #[tracing::instrument(skip(self), fields(self.message_id))] + pub fn await_reply(self) -> EmitMetadataWithResponse

{ + EmitMetadataWithResponse { + timeout: None, + fut: None, + emit_metadata: Some(self), + } + } +} + +impl Unpin for EmitMetadata

{} + +impl Future for EmitMetadata

{ + type Output = error::Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { + if self.fut.is_none() { + let event_metadata = poll_unwrap!(self.event_metadata.take()); + let stream = poll_unwrap!(self.stream.take()); + + let event = match event_metadata.take_event() { + Ok(m) => m, + Err(e) => { + return Poll::Ready(Err(e)); + } + } + .expect("poll after future was done"); + + self.fut = Some(Box::pin(async move { + let event_id = event.id(); + let event_bytes = event.into_bytes()?; + let mut stream = stream.lock().await; + stream.deref_mut().write_all(&event_bytes[..]).await?; + tracing::trace!(bytes_len = event_bytes.len()); + + Ok(event_id) + })); + } + self.fut.as_mut().unwrap().as_mut().poll(cx) + } +} diff --git a/src/ipc/stream_emitter/emit_metadata_with_response.rs b/src/ipc/stream_emitter/emit_metadata_with_response.rs new file mode 100644 index 00000000..cdbf4d54 --- /dev/null +++ b/src/ipc/stream_emitter/emit_metadata_with_response.rs @@ -0,0 +1,80 @@ +use crate::error::Error; +use crate::error_event::ErrorEventData; +use crate::event::{Event, EventType}; +use crate::ipc::stream_emitter::emit_metadata::EmitMetadata; +use crate::payload::IntoPayload; +use crate::{error, poll_unwrap}; +use futures::future; +use futures::future::Either; +use std::future::Future; +use std::pin::Pin; +use std::task::Poll; +use std::time::Duration; + +/// A metadata object returned after waiting for a reply to an event +/// This object needs to be awaited for to get the actual reply +pub struct EmitMetadataWithResponse { + pub(crate) timeout: Option, + pub(crate) fut: Option> + Send + Sync>>>, + pub(crate) emit_metadata: Option>, +} + +impl EmitMetadataWithResponse

{ + /// Sets a timeout for awaiting replies to this emitted event + #[inline] + pub fn with_timeout(mut self, timeout: Duration) -> Self { + self.timeout = Some(timeout); + + self + } +} + +impl Unpin for EmitMetadataWithResponse

{} + +impl Future for EmitMetadataWithResponse

{ + type Output = error::Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { + if self.fut.is_none() { + let mut emit_metadata = poll_unwrap!(self.emit_metadata.take()); + let ctx = poll_unwrap!(emit_metadata + .event_metadata + .as_ref() + .and_then(|m| m.ctx.clone())); + let timeout = self + .timeout + .clone() + .unwrap_or(ctx.default_reply_timeout.clone()); + + let event_id = match poll_unwrap!(emit_metadata.event_metadata.as_mut()).get_event() { + Ok(e) => e.id(), + Err(e) => { + return Poll::Ready(Err(e)); + } + }; + + self.fut = Some(Box::pin(async move { + let tx = ctx.register_reply_listener(event_id).await?; + emit_metadata.await?; + + let result = + future::select(Box::pin(tx), Box::pin(tokio::time::sleep(timeout))).await; + + let reply = match result { + Either::Left((tx_result, _)) => Ok(tx_result?), + Either::Right(_) => { + let mut listeners = ctx.reply_listeners.lock().await; + listeners.remove(&event_id); + Err(Error::Timeout) + } + }?; + if reply.event_type() == EventType::Error { + Err(reply.payload::()?.into()) + } else { + Ok(reply) + } + })) + } + self.fut.as_mut().unwrap().as_mut().poll(cx) + } +} diff --git a/src/ipc/stream_emitter/event_metadata.rs b/src/ipc/stream_emitter/event_metadata.rs new file mode 100644 index 00000000..c9a834f3 --- /dev/null +++ b/src/ipc/stream_emitter/event_metadata.rs @@ -0,0 +1,54 @@ +use crate::context::Context; +use crate::error; +use crate::error::Error; +use crate::event::{Event, EventType}; +use crate::payload::IntoPayload; +use std::mem; + +pub(crate) struct EventMetadata { + pub(crate) event: Option, + pub(crate) ctx: Option, + pub(crate) event_namespace: Option>, + pub(crate) event_name: Option, + pub(crate) res_id: Option>, + pub(crate) event_type: Option, + pub(crate) payload: Option

, +} + +impl EventMetadata

{ + pub fn get_event(&mut self) -> error::Result<&Event> { + if self.event.is_none() { + self.build_event()?; + } + Ok(self.event.as_ref().unwrap()) + } + + pub fn take_event(mut self) -> error::Result> { + if self.event.is_none() { + self.build_event()?; + } + Ok(mem::take(&mut self.event)) + } + + fn build_event(&mut self) -> error::Result<()> { + let ctx = self.ctx.take().ok_or(Error::InvalidState)?; + let event = self.event_name.take().ok_or(Error::InvalidState)?; + let namespace = self.event_namespace.take().ok_or(Error::InvalidState)?; + let payload = self.payload.take().ok_or(Error::InvalidState)?; + let res_id = self.res_id.take().ok_or(Error::InvalidState)?; + let event_type = self.event_type.take().ok_or(Error::InvalidState)?; + let payload_bytes = payload.into_payload(&ctx)?; + + let event = Event::new( + namespace, + event.to_string(), + payload_bytes, + res_id, + event_type, + ); + + self.event = Some(event); + + Ok(()) + } +} diff --git a/src/ipc/stream_emitter/mod.rs b/src/ipc/stream_emitter/mod.rs new file mode 100644 index 00000000..b018f74f --- /dev/null +++ b/src/ipc/stream_emitter/mod.rs @@ -0,0 +1,166 @@ +pub mod emit_metadata; +pub mod emit_metadata_with_response; +mod event_metadata; + +use std::future::Future; +use std::mem; +use std::ops::DerefMut; +use std::pin::Pin; +use std::sync::Arc; +use std::task::Poll; +use std::time::Duration; + +use emit_metadata::EmitMetadata; +use event_metadata::EventMetadata; +use futures::future; +use futures::future::Either; +use tokio::io::{AsyncWrite, AsyncWriteExt}; +use tokio::sync::Mutex; +use tracing; + +use crate::error::Result; +use crate::event::EventType; +use crate::ipc::context::Context; +use crate::payload::IntoPayload; +use crate::protocol::AsyncProtocolStream; + +#[macro_export] +macro_rules! poll_unwrap { + ($val:expr) => { + if let Some(v) = $val { + v + } else { + tracing::error!("Polling a future with an invalid state."); + return Poll::Ready(Err(Error::InvalidState)); + } + }; +} + +type SendStream = Arc>; + +/// An abstraction over any type that implements the AsyncProtocolStream trait +/// to emit events and share a connection across multiple +/// contexts. +#[derive(Clone)] +pub struct StreamEmitter { + stream: SendStream, +} + +impl StreamEmitter { + pub fn new(stream: P::OwnedSplitWriteHalf) -> Self { + Self { + stream: Arc::new(Mutex::new(stream)), + } + } + + #[tracing::instrument(level = "trace", skip(self, ctx, payload))] + fn _emit( + &self, + ctx: Context, + namespace: Option, + event: &str, + payload: P, + res_id: Option, + event_type: EventType, + ) -> EmitMetadata

{ + EmitMetadata::new( + ctx, + self.stream.clone(), + event.to_string(), + namespace, + payload, + res_id, + event_type, + ) + } + + /// Emits an event + #[inline] + pub(crate) fn emit, P: IntoPayload>( + &self, + ctx: Context, + event: S, + payload: P, + ) -> EmitMetadata

{ + self._emit( + ctx, + None, + event.as_ref(), + payload, + None, + EventType::Initiator, + ) + } + + /// Emits an event to a specific namespace + #[inline] + pub(crate) fn emit_to, S2: AsRef, P: IntoPayload>( + &self, + ctx: Context, + namespace: S1, + event: S2, + payload: P, + ) -> EmitMetadata

{ + self._emit( + ctx, + Some(namespace.as_ref().to_string()), + event.as_ref(), + payload, + None, + EventType::Initiator, + ) + } + + /// Emits a raw event + #[inline] + pub(crate) fn emit_raw( + &self, + ctx: Context, + res_id: Option, + event: &str, + namespace: Option, + event_type: EventType, + payload: P, + ) -> EmitMetadata

{ + self._emit(ctx, namespace, event, payload, res_id, event_type) + } + + /// Emits a response to an event + #[inline] + pub(crate) fn emit_response, P: IntoPayload>( + &self, + ctx: Context, + event_id: u64, + event: S, + payload: P, + ) -> EmitMetadata

{ + self._emit( + ctx, + None, + event.as_ref(), + payload, + Some(event_id), + EventType::Response, + ) + } + + /// Emits a response to an event to a namespace + #[inline] + pub(crate) fn emit_response_to, S2: AsRef, P: IntoPayload>( + &self, + ctx: Context, + event_id: u64, + namespace: S1, + event: S2, + payload: P, + ) -> EmitMetadata

{ + self._emit( + ctx, + Some(namespace.as_ref().to_string()), + event.as_ref(), + payload, + Some(event_id), + EventType::Response, + ) + } +} From 619a0173f09769b3bd9a340cee5f988deb57db27 Mon Sep 17 00:00:00 2001 From: trivernis Date: Sun, 6 Feb 2022 13:12:58 +0100 Subject: [PATCH 07/12] Fix style issues Signed-off-by: trivernis --- benches/deserialization_benchmark.rs | 2 +- src/ipc/stream_emitter/mod.rs | 22 +++++++++++----------- tests/test_serialization.rs | 2 ++ 3 files changed, 14 insertions(+), 12 deletions(-) diff --git a/benches/deserialization_benchmark.rs b/benches/deserialization_benchmark.rs index 3428c2d6..74984d83 100644 --- a/benches/deserialization_benchmark.rs +++ b/benches/deserialization_benchmark.rs @@ -3,7 +3,7 @@ use criterion::{criterion_group, criterion_main}; use criterion::{BatchSize, Criterion}; use std::io::Cursor; -use bromine::event::{Event, EventType}; +use bromine::event::{Event}; use tokio::runtime::Runtime; pub const EVENT_NAME: &str = "bench_event"; diff --git a/src/ipc/stream_emitter/mod.rs b/src/ipc/stream_emitter/mod.rs index b018f74f..ded33cd5 100644 --- a/src/ipc/stream_emitter/mod.rs +++ b/src/ipc/stream_emitter/mod.rs @@ -2,23 +2,23 @@ pub mod emit_metadata; pub mod emit_metadata_with_response; mod event_metadata; -use std::future::Future; -use std::mem; -use std::ops::DerefMut; -use std::pin::Pin; + + + + use std::sync::Arc; -use std::task::Poll; -use std::time::Duration; + + use emit_metadata::EmitMetadata; -use event_metadata::EventMetadata; -use futures::future; -use futures::future::Either; -use tokio::io::{AsyncWrite, AsyncWriteExt}; + + + +use tokio::io::{AsyncWrite}; use tokio::sync::Mutex; use tracing; -use crate::error::Result; + use crate::event::EventType; use crate::ipc::context::Context; use crate::payload::IntoPayload; diff --git a/tests/test_serialization.rs b/tests/test_serialization.rs index ac53c485..7aa71f07 100644 --- a/tests/test_serialization.rs +++ b/tests/test_serialization.rs @@ -2,6 +2,8 @@ use bromine::prelude::*; #[cfg(feature = "serialize")] use serde::{de::DeserializeOwned, Serialize}; + +#[cfg(feature = "serialize")] use std::fmt::Debug; #[cfg(feature = "serialize_rmp")] From 9cc7d1ffe8ad00e1bba41d0e86f3aa06ea61ca15 Mon Sep 17 00:00:00 2001 From: trivernis Date: Sun, 6 Feb 2022 15:03:29 +0100 Subject: [PATCH 08/12] Add asynchronous response streams Signed-off-by: trivernis --- Cargo.lock | 1 + Cargo.toml | 1 + src/ipc/context.rs | 20 +-- src/ipc/mod.rs | 4 +- src/ipc/stream_emitter/emit_metadata.rs | 9 ++ .../emit_metadata_with_response.rs | 26 +-- .../emit_metadata_with_response_stream.rs | 150 ++++++++++++++++++ src/ipc/stream_emitter/mod.rs | 17 +- src/lib.rs | 1 + tests/test_event_streams.rs | 88 ++++++++++ 10 files changed, 282 insertions(+), 35 deletions(-) create mode 100644 src/ipc/stream_emitter/emit_metadata_with_response_stream.rs create mode 100644 tests/test_event_streams.rs diff --git a/Cargo.lock b/Cargo.lock index 347cf69e..9951cb54 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -101,6 +101,7 @@ dependencies = [ "criterion", "crossbeam-utils", "futures", + "futures-core", "lazy_static", "num_enum", "postcard", diff --git a/Cargo.toml b/Cargo.toml index 5f58824f..68dcbaa9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ byteorder = "1.4.3" async-trait = "0.1.52" futures = "0.3.19" num_enum = "0.5.6" +futures-core = "0.3.19" rmp-serde = {version = "0.15.5", optional = true} bincode = {version = "1.3.3", optional = true} serde_json = {version = "1.0.73", optional = true} diff --git a/src/ipc/context.rs b/src/ipc/context.rs index b7f8b51a..1f0e081e 100644 --- a/src/ipc/context.rs +++ b/src/ipc/context.rs @@ -4,21 +4,21 @@ use std::ops::{Deref, DerefMut}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; -use tokio::sync::oneshot::{Receiver, Sender}; -use tokio::sync::{Mutex, oneshot, RwLock}; +use tokio::sync::mpsc::Receiver; +use tokio::sync::{mpsc, oneshot, Mutex, RwLock}; use tokio::time::Duration; use typemap_rev::TypeMap; use crate::error::{Error, Result}; use crate::event::{Event, EventType}; -use crate::ipc::stream_emitter::StreamEmitter; use crate::ipc::stream_emitter::emit_metadata::EmitMetadata; +use crate::ipc::stream_emitter::StreamEmitter; use crate::payload::IntoPayload; #[cfg(feature = "serialize")] use crate::payload::{DynamicSerializer, SerdePayload}; use crate::prelude::Response; -pub(crate) type ReplyListeners = Arc>>>; +pub(crate) type ReplyListeners = Arc>>>; /// An object provided to each callback function. /// Currently it only holds the event emitter to emit response events in event callbacks. @@ -40,7 +40,7 @@ pub struct Context { /// Field to store additional context data pub data: Arc>, - stop_sender: Arc>>>, + stop_sender: Arc>>>, pub(crate) reply_listeners: ReplyListeners, @@ -56,7 +56,7 @@ impl Context { pub(crate) fn new( emitter: StreamEmitter, data: Arc>, - stop_sender: Option>, + stop_sender: Option>, reply_listeners: ReplyListeners, reply_timeout: Duration, #[cfg(feature = "serialize")] default_serializer: DynamicSerializer, @@ -125,7 +125,7 @@ impl Context { #[inline] #[tracing::instrument(level = "debug", skip(self))] pub(crate) async fn register_reply_listener(&self, event_id: u64) -> Result> { - let (rx, tx) = oneshot::channel(); + let (rx, tx) = mpsc::channel(8); { let mut listeners = self.reply_listeners.lock().await; listeners.insert(event_id, rx); @@ -153,9 +153,9 @@ impl Context { /// Returns the channel for a reply to the given message id #[inline] - pub(crate) async fn get_reply_sender(&self, ref_id: u64) -> Option> { - let mut listeners = self.reply_listeners.lock().await; - listeners.remove(&ref_id) + pub(crate) async fn get_reply_sender(&self, ref_id: u64) -> Option> { + let listeners = self.reply_listeners.lock().await; + listeners.get(&ref_id).cloned() } #[inline] diff --git a/src/ipc/mod.rs b/src/ipc/mod.rs index db9f456b..39b4ca5d 100644 --- a/src/ipc/mod.rs +++ b/src/ipc/mod.rs @@ -34,8 +34,8 @@ async fn handle_connection( // get the listener for replies if let Some(sender) = ctx.get_reply_sender(ref_id).await { // try sending the event to the listener for replies - if let Err(event) = sender.send(event) { - handle_event(Context::clone(&ctx), Arc::clone(&handler), event); + if let Err(event) = sender.send(event).await { + handle_event(Context::clone(&ctx), Arc::clone(&handler), event.0); } continue; } diff --git a/src/ipc/stream_emitter/emit_metadata.rs b/src/ipc/stream_emitter/emit_metadata.rs index ebc4f559..57d77fab 100644 --- a/src/ipc/stream_emitter/emit_metadata.rs +++ b/src/ipc/stream_emitter/emit_metadata.rs @@ -2,6 +2,7 @@ use crate::context::Context; use crate::error::Error; use crate::event::EventType; use crate::ipc::stream_emitter::emit_metadata_with_response::EmitMetadataWithResponse; +use crate::ipc::stream_emitter::emit_metadata_with_response_stream::EmitMetadataWithResponseStream; use crate::ipc::stream_emitter::event_metadata::EventMetadata; use crate::ipc::stream_emitter::SendStream; use crate::payload::IntoPayload; @@ -57,6 +58,14 @@ impl EmitMetadata

{ emit_metadata: Some(self), } } + + pub fn stream_replies(self) -> EmitMetadataWithResponseStream

{ + EmitMetadataWithResponseStream { + timeout: None, + fut: None, + emit_metadata: Some(self), + } + } } impl Unpin for EmitMetadata

{} diff --git a/src/ipc/stream_emitter/emit_metadata_with_response.rs b/src/ipc/stream_emitter/emit_metadata_with_response.rs index cdbf4d54..5f6c6c1a 100644 --- a/src/ipc/stream_emitter/emit_metadata_with_response.rs +++ b/src/ipc/stream_emitter/emit_metadata_with_response.rs @@ -1,11 +1,10 @@ +use crate::context::Context; use crate::error::Error; use crate::error_event::ErrorEventData; use crate::event::{Event, EventType}; use crate::ipc::stream_emitter::emit_metadata::EmitMetadata; use crate::payload::IntoPayload; use crate::{error, poll_unwrap}; -use futures::future; -use futures::future::Either; use std::future::Future; use std::pin::Pin; use std::task::Poll; @@ -54,20 +53,20 @@ impl Future for EmitMetadataWithResponse }; self.fut = Some(Box::pin(async move { - let tx = ctx.register_reply_listener(event_id).await?; + let mut tx = ctx.register_reply_listener(event_id).await?; emit_metadata.await?; - let result = - future::select(Box::pin(tx), Box::pin(tokio::time::sleep(timeout))).await; - - let reply = match result { - Either::Left((tx_result, _)) => Ok(tx_result?), - Either::Right(_) => { - let mut listeners = ctx.reply_listeners.lock().await; - listeners.remove(&event_id); + let reply = tokio::select! { + tx_result = tx.recv() => { + Ok(tx_result.ok_or_else(|| Error::SendError)?) + } + _ = tokio::time::sleep(timeout) => { Err(Error::Timeout) } }?; + + remove_reply_listener(&ctx, event_id).await; + if reply.event_type() == EventType::Error { Err(reply.payload::()?.into()) } else { @@ -78,3 +77,8 @@ impl Future for EmitMetadataWithResponse self.fut.as_mut().unwrap().as_mut().poll(cx) } } + +pub(crate) async fn remove_reply_listener(ctx: &Context, event_id: u64) { + let mut listeners = ctx.reply_listeners.lock().await; + listeners.remove(&event_id); +} diff --git a/src/ipc/stream_emitter/emit_metadata_with_response_stream.rs b/src/ipc/stream_emitter/emit_metadata_with_response_stream.rs new file mode 100644 index 00000000..8ef0de3d --- /dev/null +++ b/src/ipc/stream_emitter/emit_metadata_with_response_stream.rs @@ -0,0 +1,150 @@ +use crate::context::Context; +use crate::error::{Error, Result}; +use crate::event::{Event, EventType}; +use crate::ipc::stream_emitter::emit_metadata::EmitMetadata; +use crate::ipc::stream_emitter::emit_metadata_with_response::remove_reply_listener; +use crate::payload::IntoPayload; +use crate::poll_unwrap; +use futures_core::Stream; +use std::future::Future; +use std::pin::Pin; +use std::task::Poll; +use std::time::Duration; +use tokio::sync::mpsc::Receiver; + +/// A metadata object returned after waiting for a reply to an event +/// This object needs to be awaited for to get the actual reply +pub struct EmitMetadataWithResponseStream { + pub(crate) timeout: Option, + pub(crate) fut: Option> + Send + Sync>>>, + pub(crate) emit_metadata: Option>, +} + +pub struct ResponseStream { + event_id: u64, + ctx: Option, + receiver: Option>, + timeout: Duration, + fut: Option, Context, Receiver)>>>>>, +} + +impl ResponseStream { + pub(crate) fn new( + event_id: u64, + timeout: Duration, + ctx: Context, + receiver: Receiver, + ) -> Self { + Self { + event_id, + ctx: Some(ctx), + receiver: Some(receiver), + timeout, + fut: None, + } + } +} + +impl Unpin for EmitMetadataWithResponseStream

{} + +impl EmitMetadataWithResponseStream

{ + /// Sets a timeout for awaiting replies to this emitted event + #[inline] + pub fn with_timeout(mut self, timeout: Duration) -> Self { + self.timeout = Some(timeout); + + self + } +} + +impl Future for EmitMetadataWithResponseStream

{ + type Output = Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { + if self.fut.is_none() { + let mut emit_metadata = poll_unwrap!(self.emit_metadata.take()); + let ctx = poll_unwrap!(emit_metadata + .event_metadata + .as_ref() + .and_then(|m| m.ctx.clone())); + let timeout = self + .timeout + .clone() + .unwrap_or(ctx.default_reply_timeout.clone()); + + let event_id = match poll_unwrap!(emit_metadata.event_metadata.as_mut()).get_event() { + Ok(e) => e.id(), + Err(e) => { + return Poll::Ready(Err(e)); + } + }; + + self.fut = Some(Box::pin(async move { + let tx = ctx.register_reply_listener(event_id).await?; + emit_metadata.await?; + + Ok(ResponseStream::new(event_id, timeout, ctx, tx)) + })) + } + self.fut.as_mut().unwrap().as_mut().poll(cx) + } +} + +impl Unpin for ResponseStream {} + +impl Stream for ResponseStream { + type Item = Result; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + if self.fut.is_none() { + if self.ctx.is_none() || self.receiver.is_none() { + return Poll::Ready(None); + } + let ctx = self.ctx.take().unwrap(); + let mut receiver = self.receiver.take().unwrap(); + let timeout = self.timeout; + let event_id = self.event_id; + + self.fut = Some(Box::pin(async move { + let event: Option = tokio::select! { + tx_result = receiver.recv() => { + Ok(tx_result) + } + _ = tokio::time::sleep(timeout) => { + Err(Error::Timeout) + } + }?; + + if event.is_none() || event.as_ref().unwrap().event_type() == EventType::End { + remove_reply_listener(&ctx, event_id).await; + } + + Ok((event, ctx, receiver)) + })); + } + + match self.fut.as_mut().unwrap().as_mut().poll(cx) { + Poll::Ready(r) => match r { + Ok((event, ctx, tx)) => { + self.fut = None; + + if let Some(event) = event { + if event.event_type() != EventType::End { + self.ctx = Some(ctx); + self.receiver = Some(tx); + } + + Poll::Ready(Some(Ok(event))) + } else { + Poll::Ready(None) + } + } + Err(e) => Poll::Ready(Some(Err(e))), + }, + Poll::Pending => Poll::Pending, + } + } +} diff --git a/src/ipc/stream_emitter/mod.rs b/src/ipc/stream_emitter/mod.rs index ded33cd5..bc0f1297 100644 --- a/src/ipc/stream_emitter/mod.rs +++ b/src/ipc/stream_emitter/mod.rs @@ -1,29 +1,22 @@ pub mod emit_metadata; pub mod emit_metadata_with_response; +pub mod emit_metadata_with_response_stream; mod event_metadata; - - - - use std::sync::Arc; - - -use emit_metadata::EmitMetadata; - - - -use tokio::io::{AsyncWrite}; +use tokio::io::AsyncWrite; use tokio::sync::Mutex; use tracing; - use crate::event::EventType; use crate::ipc::context::Context; use crate::payload::IntoPayload; use crate::protocol::AsyncProtocolStream; +pub use emit_metadata_with_response_stream::ResponseStream; +use crate::prelude::emit_metadata::EmitMetadata; + #[macro_export] macro_rules! poll_unwrap { ($val:expr) => { diff --git a/src/lib.rs b/src/lib.rs index 0b898607..6ba5e22b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -137,6 +137,7 @@ pub mod prelude { pub use crate::event_handler::{EventHandler, Response}; pub use crate::ipc::context::Context; pub use crate::ipc::context::{PoolGuard, PooledContext}; + pub use crate::ipc::stream_emitter::*; pub use crate::ipc::*; pub use crate::macros::*; pub use crate::namespace::Namespace; diff --git a/tests/test_event_streams.rs b/tests/test_event_streams.rs new file mode 100644 index 00000000..5a8f2edb --- /dev/null +++ b/tests/test_event_streams.rs @@ -0,0 +1,88 @@ +use crate::utils::call_counter::{get_counter_from_context, increment_counter_for_event}; +use crate::utils::protocol::TestProtocolListener; +use crate::utils::{get_free_port, start_server_and_client}; +use bromine::prelude::*; +use byteorder::ReadBytesExt; +use futures::StreamExt; +use std::io::Read; +use std::time::Duration; + +mod utils; + +/// When awaiting the reply to an event the handler for the event doesn't get called. +/// Therefore we expect it to have a call count of 0. +#[tokio::test] +async fn it_receives_responses() { + let port = get_free_port(); + let ctx = get_client_with_server(port).await; + let mut reply_stream = ctx + .emit("stream", EmptyPayload) + .stream_replies() + .await + .unwrap(); + + let mut reply_stream_2 = ctx + .emit("stream", EmptyPayload) + .stream_replies() + .await + .unwrap(); + + for i in 0u8..=100 { + if let Some(Ok(event)) = reply_stream.next().await { + assert_eq!(event.payload::().unwrap().0, i) + } else { + panic!("stream 1 has no value {}", i); + } + if let Some(Ok(event)) = reply_stream_2.next().await { + assert_eq!(event.payload::().unwrap().0, i) + } else { + panic!("stream 2 has no value {}", i); + } + } + let counter = get_counter_from_context(&ctx).await; + assert_eq!(counter.get("stream").await, 2); +} + +async fn get_client_with_server(port: u8) -> Context { + start_server_and_client(move || get_builder(port)).await +} + +fn get_builder(port: u8) -> IPCBuilder { + IPCBuilder::new() + .address(port) + .timeout(Duration::from_millis(100)) + .on("stream", callback!(handle_stream_event)) +} + +async fn handle_stream_event(ctx: &Context, event: Event) -> IPCResult { + increment_counter_for_event(ctx, &event).await; + for i in 0u8..=99 { + ctx.emit("number", NumberPayload(i)).await?; + } + + ctx.response(NumberPayload(100)) +} + +pub struct EmptyPayload; + +impl IntoPayload for EmptyPayload { + fn into_payload(self, _: &Context) -> IPCResult> { + Ok(vec![]) + } +} + +pub struct NumberPayload(u8); + +impl IntoPayload for NumberPayload { + fn into_payload(self, _: &Context) -> IPCResult> { + Ok(vec![self.0]) + } +} + +impl FromPayload for NumberPayload { + fn from_payload(mut reader: R) -> IPCResult { + let num = reader.read_u8()?; + + Ok(NumberPayload(num)) + } +} From 38342eac44c97612f9c0206db4108bd09ab460c9 Mon Sep 17 00:00:00 2001 From: trivernis Date: Sun, 6 Feb 2022 15:07:32 +0100 Subject: [PATCH 09/12] Fix dangling response listeners Signed-off-by: trivernis --- src/ipc/mod.rs | 3 +++ src/ipc/stream_emitter/emit_metadata_with_response_stream.rs | 1 + 2 files changed, 4 insertions(+) diff --git a/src/ipc/mod.rs b/src/ipc/mod.rs index 39b4ca5d..8efcc30f 100644 --- a/src/ipc/mod.rs +++ b/src/ipc/mod.rs @@ -62,6 +62,7 @@ async fn handle_connection( /// Handles a single event in a different tokio context fn handle_event(mut ctx: Context, handler: Arc, event: Event) { ctx.set_ref_id(Some(event.id())); + let event_id = event.id(); tokio::spawn(async move { match handler.handle_event(&ctx, event).await { @@ -74,6 +75,8 @@ fn handle_event(mut ctx: Context, handler: Arc, event: Event) { { tracing::error!("Error occurred when sending error response: {:?}", e); } + let mut reply_listeners = ctx.reply_listeners.lock().await; + reply_listeners.remove(&event_id); } Err(e) => { // emit an error event diff --git a/src/ipc/stream_emitter/emit_metadata_with_response_stream.rs b/src/ipc/stream_emitter/emit_metadata_with_response_stream.rs index 8ef0de3d..339e9773 100644 --- a/src/ipc/stream_emitter/emit_metadata_with_response_stream.rs +++ b/src/ipc/stream_emitter/emit_metadata_with_response_stream.rs @@ -20,6 +20,7 @@ pub struct EmitMetadataWithResponseStream { pub(crate) emit_metadata: Option>, } +/// An asynchronous stream one can read all responses to a specific event from. pub struct ResponseStream { event_id: u64, ctx: Option, From 18412c4a2fe1921fac8743489714bf5ab170b03a Mon Sep 17 00:00:00 2001 From: trivernis Date: Sun, 6 Feb 2022 15:17:28 +0100 Subject: [PATCH 10/12] Increment version Signed-off-by: trivernis --- Cargo.lock | 2 +- Cargo.toml | 2 +- README.md | 18 ++++++++++++++---- 3 files changed, 16 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9951cb54..54ead485 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -93,7 +93,7 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bromine" -version = "0.17.1" +version = "0.18.0" dependencies = [ "async-trait", "bincode", diff --git a/Cargo.toml b/Cargo.toml index 68dcbaa9..fb6e652e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bromine" -version = "0.17.1" +version = "0.18.0" authors = ["trivernis "] edition = "2018" readme = "README.md" diff --git a/README.md b/README.md index 6a4c26ad..2f36a63a 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ use tokio::net::TcpListener; async fn handle_ping(ctx: &Context, event: Event) -> Result<()> { println!("Received ping event."); ctx.emit("pong", ()).await?; - Ok(()) + Ok(Response::empty()) } #[tokio::main] @@ -31,8 +31,15 @@ async fn main() { .on("ping", callback!(handle_ping)) .build_client().await.unwrap(); -// emit an initial event - let response = ctx.emit("ping", ()).await_response().await?; + // emit an event and wait for responses + let response = ctx.emit("ping", ()).await_reply().await?; + + // emit an event and get all responses as stream + let stream = ctx.emit("ping", ()).stream_replies().await?; + + while let Some(Ok(event)) = stream.next().await { + println!("{}", event.name()); + } } ``` @@ -50,7 +57,10 @@ async fn main() { // register callback .on("ping", callback!(ctx, event, async move { println!("Received ping event."); - Ok(()) + for _ in 0..10 { + ctx.emit("pong", ()).await?; + } + Ok(Response::empty()) })) .build_server().await.unwrap(); } From 1effb4f6c97a40e4e1054c1ea1b088f28daa01a9 Mon Sep 17 00:00:00 2001 From: trivernis Date: Sun, 6 Feb 2022 15:20:22 +0100 Subject: [PATCH 11/12] Replace one unwrap_or with unwrap_or_else Signed-off-by: trivernis --- src/ipc/stream_emitter/emit_metadata_with_response.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ipc/stream_emitter/emit_metadata_with_response.rs b/src/ipc/stream_emitter/emit_metadata_with_response.rs index 5f6c6c1a..29a8c2c0 100644 --- a/src/ipc/stream_emitter/emit_metadata_with_response.rs +++ b/src/ipc/stream_emitter/emit_metadata_with_response.rs @@ -42,8 +42,8 @@ impl Future for EmitMetadataWithResponse .and_then(|m| m.ctx.clone())); let timeout = self .timeout - .clone() - .unwrap_or(ctx.default_reply_timeout.clone()); + .take() + .unwrap_or_else(|| ctx.default_reply_timeout.clone()); let event_id = match poll_unwrap!(emit_metadata.event_metadata.as_mut()).get_event() { Ok(e) => e.id(), From 9586ed6d36e3dffc33dbfc774b523ff992345b5d Mon Sep 17 00:00:00 2001 From: trivernis Date: Sun, 6 Feb 2022 15:25:11 +0100 Subject: [PATCH 12/12] Fix further performance issues Signed-off-by: trivernis --- benches/deserialization_benchmark.rs | 2 +- src/ipc/stream_emitter/emit_metadata_with_response_stream.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/benches/deserialization_benchmark.rs b/benches/deserialization_benchmark.rs index 74984d83..c79889f1 100644 --- a/benches/deserialization_benchmark.rs +++ b/benches/deserialization_benchmark.rs @@ -3,7 +3,7 @@ use criterion::{criterion_group, criterion_main}; use criterion::{BatchSize, Criterion}; use std::io::Cursor; -use bromine::event::{Event}; +use bromine::event::Event; use tokio::runtime::Runtime; pub const EVENT_NAME: &str = "bench_event"; diff --git a/src/ipc/stream_emitter/emit_metadata_with_response_stream.rs b/src/ipc/stream_emitter/emit_metadata_with_response_stream.rs index 339e9773..a0288de4 100644 --- a/src/ipc/stream_emitter/emit_metadata_with_response_stream.rs +++ b/src/ipc/stream_emitter/emit_metadata_with_response_stream.rs @@ -70,8 +70,8 @@ impl Future for EmitMetadataWithResponse .and_then(|m| m.ctx.clone())); let timeout = self .timeout - .clone() - .unwrap_or(ctx.default_reply_timeout.clone()); + .take() + .unwrap_or_else(|| ctx.default_reply_timeout.clone()); let event_id = match poll_unwrap!(emit_metadata.event_metadata.as_mut()).get_event() { Ok(e) => e.id(),