diff --git a/Cargo.lock b/Cargo.lock index 1e339cbc..54ead485 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -93,7 +93,7 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bromine" -version = "0.17.1" +version = "0.18.0" dependencies = [ "async-trait", "bincode", @@ -101,7 +101,9 @@ dependencies = [ "criterion", "crossbeam-utils", "futures", + "futures-core", "lazy_static", + "num_enum", "postcard", "rmp-serde", "serde", @@ -578,6 +580,27 @@ dependencies = [ "libc", ] +[[package]] +name = "num_enum" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "720d3ea1055e4e4574c0c0b0f8c3fd4f24c4cdaf465948206dea090b57b526ad" +dependencies = [ + "num_enum_derive", +] + +[[package]] +name = "num_enum_derive" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d992b768490d7fe0d8586d9b5745f6c49f557da6d81dc982b1d167ad4edbb21" +dependencies = [ + "proc-macro-crate", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "oorandom" version = "11.1.3" @@ -641,6 +664,16 @@ version = "0.1.5-pre" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c68cb38ed13fd7bc9dd5db8f165b7c8d9c1a315104083a2b10f11354c2af97f" +[[package]] +name = "proc-macro-crate" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ebace6889caf889b4d3f76becee12e90353f2b8c7d875534a71e5742f8f6f83" +dependencies = [ + "thiserror", + "toml", +] + [[package]] name = "proc-macro2" version = "1.0.35" @@ -948,6 +981,15 @@ dependencies = [ "syn", ] +[[package]] +name = "toml" +version = "0.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a31142970826733df8241ef35dc040ef98c679ab14d7c3e54d827099b3acecaa" +dependencies = [ + "serde", +] + [[package]] name = "tracing" version = "0.1.29" diff --git a/Cargo.toml b/Cargo.toml index d6285cf8..fb6e652e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bromine" -version = "0.17.1" +version = "0.18.0" authors = ["trivernis "] edition = "2018" readme = "README.md" @@ -27,6 +27,8 @@ typemap_rev = "0.1.5" byteorder = "1.4.3" async-trait = "0.1.52" futures = "0.3.19" +num_enum = "0.5.6" +futures-core = "0.3.19" rmp-serde = {version = "0.15.5", optional = true} bincode = {version = "1.3.3", optional = true} serde_json = {version = "1.0.73", optional = true} diff --git a/README.md b/README.md index 6a4c26ad..2f36a63a 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ use tokio::net::TcpListener; async fn handle_ping(ctx: &Context, event: Event) -> Result<()> { println!("Received ping event."); ctx.emit("pong", ()).await?; - Ok(()) + Ok(Response::empty()) } #[tokio::main] @@ -31,8 +31,15 @@ async fn main() { .on("ping", callback!(handle_ping)) .build_client().await.unwrap(); -// emit an initial event - let response = ctx.emit("ping", ()).await_response().await?; + // emit an event and wait for responses + let response = ctx.emit("ping", ()).await_reply().await?; + + // emit an event and get all responses as stream + let stream = ctx.emit("ping", ()).stream_replies().await?; + + while let Some(Ok(event)) = stream.next().await { + println!("{}", event.name()); + } } ``` @@ -50,7 +57,10 @@ async fn main() { // register callback .on("ping", callback!(ctx, event, async move { println!("Received ping event."); - Ok(()) + for _ in 0..10 { + ctx.emit("pong", ()).await?; + } + Ok(Response::empty()) })) .build_server().await.unwrap(); } diff --git a/benches/deserialization_benchmark.rs b/benches/deserialization_benchmark.rs index af65bea2..c79889f1 100644 --- a/benches/deserialization_benchmark.rs +++ b/benches/deserialization_benchmark.rs @@ -9,7 +9,7 @@ use tokio::runtime::Runtime; pub const EVENT_NAME: &str = "bench_event"; fn create_event_bytes_reader(data_size: usize) -> Cursor> { - let bytes = Event::new(EVENT_NAME.to_string(), vec![0u8; data_size], None) + let bytes = Event::initiator(None, EVENT_NAME.to_string(), vec![0u8; data_size]) .into_bytes() .unwrap(); Cursor::new(bytes) diff --git a/benches/serialization_benchmark.rs b/benches/serialization_benchmark.rs index 8f9f6e98..2d25643b 100644 --- a/benches/serialization_benchmark.rs +++ b/benches/serialization_benchmark.rs @@ -6,7 +6,7 @@ use criterion::{ pub const EVENT_NAME: &str = "bench_event"; fn create_event(data_size: usize) -> Event { - Event::new(EVENT_NAME.to_string(), vec![0u8; data_size], None) + Event::initiator(None, EVENT_NAME.to_string(), vec![0u8; data_size]) } fn event_serialization(c: &mut Criterion) { diff --git a/src/events/error_event.rs b/src/events/error_event.rs index 2b6e160e..bff310a1 100644 --- a/src/events/error_event.rs +++ b/src/events/error_event.rs @@ -8,6 +8,7 @@ use std::fmt::{Display, Formatter}; use std::io::Read; pub static ERROR_EVENT_NAME: &str = "error"; +pub static END_EVENT_NAME: &str = "end"; /// Data returned on error event. /// The error event has a default handler that just logs that diff --git a/src/events/event.rs b/src/events/event.rs index e04f0268..c6e33ddc 100644 --- a/src/events/event.rs +++ b/src/events/event.rs @@ -2,6 +2,8 @@ use crate::error::{Error, Result}; use crate::events::generate_event_id; use crate::events::payload::FromPayload; use byteorder::{BigEndian, ReadBytesExt}; +use num_enum::{IntoPrimitive, TryFromPrimitive}; +use std::convert::TryFrom; use std::fmt::Debug; use std::io::{Cursor, Read}; use tokio::io::{AsyncRead, AsyncReadExt}; @@ -20,36 +22,65 @@ pub struct Event { #[derive(Debug)] struct EventHeader { id: u64, + event_type: EventType, ref_id: Option, namespace: Option, name: String, } +#[derive(Clone, Debug, TryFromPrimitive, IntoPrimitive, Copy, Ord, PartialOrd, Eq, PartialEq)] +#[repr(u8)] +pub enum EventType { + Initiator, + Response, + End, + Error, +} + impl Event { - /// Creates a new event with a namespace + /// Creates a new event that acts as an initiator for further response events #[tracing::instrument(level = "trace", skip(data))] - pub fn with_namespace( - namespace: String, - name: String, - data: Vec, - ref_id: Option, - ) -> Self { - let header = EventHeader { - id: generate_event_id(), - ref_id, - namespace: Some(namespace), - name, - }; - Self { header, data } + #[inline] + pub fn initiator(namespace: Option, name: String, data: Vec) -> Self { + Self::new(namespace, name, data, None, EventType::Initiator) + } + + /// Creates a new event that is a response to a previous event + #[tracing::instrument(level = "trace", skip(data))] + #[inline] + pub fn response(namespace: Option, name: String, data: Vec, ref_id: u64) -> Self { + Self::new(namespace, name, data, Some(ref_id), EventType::Response) + } + + /// Creates a new error event as a response to a previous event + #[tracing::instrument(level = "trace", skip(data))] + #[inline] + pub fn error(namespace: Option, name: String, data: Vec, ref_id: u64) -> Self { + Self::new(namespace, name, data, Some(ref_id), EventType::Error) + } + + /// Creates a new event that indicates the end of a series of responses (in an event handler) + /// and might contain a final response payload + #[tracing::instrument(level = "trace", skip(data))] + #[inline] + pub fn end(namespace: Option, name: String, data: Vec, ref_id: u64) -> Self { + Self::new(namespace, name, data, Some(ref_id), EventType::Response) } /// Creates a new event #[tracing::instrument(level = "trace", skip(data))] - pub fn new(name: String, data: Vec, ref_id: Option) -> Self { + pub(crate) fn new( + namespace: Option, + name: String, + data: Vec, + ref_id: Option, + event_type: EventType, + ) -> Self { let header = EventHeader { id: generate_event_id(), + event_type, ref_id, - namespace: None, + namespace, name, }; Self { header, data } @@ -61,6 +92,12 @@ impl Event { self.header.id } + /// The type of the event + #[inline] + pub fn event_type(&self) -> EventType { + self.header.event_type + } + /// The ID of the message referenced by this message. /// It represents the message that is replied to and can be None. #[inline] @@ -139,6 +176,7 @@ impl EventHeader { pub fn into_bytes(self) -> Vec { let mut buf = FORMAT_VERSION.to_vec(); buf.append(&mut self.id.to_be_bytes().to_vec()); + buf.push(self.event_type.into()); if let Some(ref_id) = self.ref_id { buf.push(0xFF); @@ -164,6 +202,8 @@ impl EventHeader { pub fn from_read(reader: &mut R) -> Result { Self::read_version(reader)?; let id = reader.read_u64::()?; + let event_type_num = reader.read_u8()?; + let event_type = EventType::try_from(event_type_num).map_err(|_| Error::CorruptedEvent)?; let ref_id = Self::read_ref_id(reader)?; let namespace_len = reader.read_u16::()?; let namespace = Self::read_namespace(reader, namespace_len)?; @@ -171,6 +211,7 @@ impl EventHeader { Ok(Self { id, + event_type, ref_id, namespace, name, diff --git a/src/events/event_handler.rs b/src/events/event_handler.rs index 2ef18062..8b87f603 100644 --- a/src/events/event_handler.rs +++ b/src/events/event_handler.rs @@ -1,14 +1,38 @@ use crate::error::Result; use crate::events::event::Event; use crate::ipc::context::Context; +use crate::payload::{BytePayload, IntoPayload}; use std::collections::HashMap; use std::fmt::{Debug, Formatter}; use std::future::Future; use std::pin::Pin; use std::sync::Arc; +pub struct Response(Vec); + +impl Response { + /// Creates a new response with a given payload + pub fn payload(ctx: &Context, payload: P) -> Result { + let bytes = payload.into_payload(ctx)?; + + Ok(Self(bytes)) + } + + /// Creates an empty response + pub fn empty() -> Self { + Self(vec![]) + } + + pub(crate) fn into_byte_payload(self) -> BytePayload { + BytePayload::new(self.0) + } +} + type EventCallback = Arc< - dyn for<'a> Fn(&'a Context, Event) -> Pin> + Send + 'a)>> + dyn for<'a> Fn( + &'a Context, + Event, + ) -> Pin> + Send + 'a)>> + Send + Sync, >; @@ -46,7 +70,7 @@ impl EventHandler { F: for<'a> Fn( &'a Context, Event, - ) -> Pin> + Send + 'a)>> + ) -> Pin> + Send + 'a)>> + Send + Sync, { @@ -56,11 +80,11 @@ impl EventHandler { /// Handles a received event #[inline] #[tracing::instrument(level = "debug", skip(self, ctx, event))] - pub async fn handle_event(&self, ctx: &Context, event: Event) -> Result<()> { + pub async fn handle_event(&self, ctx: &Context, event: Event) -> Result { if let Some(cb) = self.callbacks.get(event.name()) { - cb.as_ref()(ctx, event).await?; + cb.as_ref()(ctx, event).await + } else { + Ok(Response::empty()) } - - Ok(()) } } diff --git a/src/events/mod.rs b/src/events/mod.rs index 207ecb7b..2e039d59 100644 --- a/src/events/mod.rs +++ b/src/events/mod.rs @@ -10,7 +10,6 @@ pub mod payload; #[cfg(feature = "serialize")] pub mod payload_serializer; - /// Generates a new event id pub(crate) fn generate_event_id() -> u64 { lazy_static::lazy_static! { diff --git a/src/ipc/builder.rs b/src/ipc/builder.rs index 58810081..0019dc21 100644 --- a/src/ipc/builder.rs +++ b/src/ipc/builder.rs @@ -1,5 +1,7 @@ use crate::error::{Error, Result}; -use crate::events::error_event::{ErrorEventData, ERROR_EVENT_NAME}; +use crate::error_event::ErrorEventData; +use crate::event_handler::Response; +use crate::events::error_event::ERROR_EVENT_NAME; use crate::events::event::Event; use crate::events::event_handler::EventHandler; use crate::ipc::client::IPCClient; @@ -24,6 +26,7 @@ use typemap_rev::{TypeMap, TypeMapKey}; /// use typemap_rev::TypeMapKey; /// use bromine::IPCBuilder; /// use tokio::net::TcpListener; +/// use bromine::prelude::Response; /// /// struct CustomKey; /// @@ -37,13 +40,13 @@ use typemap_rev::{TypeMap, TypeMapKey}; /// // register callback /// .on("ping", |_ctx, _event| Box::pin(async move { /// println!("Received ping event."); -/// Ok(()) +/// Ok(Response::empty()) /// })) /// // register a namespace /// .namespace("namespace") /// .on("namespace-event", |_ctx, _event| Box::pin(async move { /// println!("Namespace event."); -/// Ok(()) +/// Ok(Response::empty()) /// })) /// .build() /// // add context shared data @@ -75,7 +78,7 @@ where tracing::warn!(error_data.code); tracing::warn!("error_data.message = '{}'", error_data.message); - Ok(()) + Ok(Response::empty()) }) }); Self { @@ -102,7 +105,7 @@ where F: for<'a> Fn( &'a Context, Event, - ) -> Pin> + Send + 'a)>> + ) -> Pin> + Send + 'a)>> + Send + Sync, { diff --git a/src/ipc/context.rs b/src/ipc/context.rs index 9fb5172d..1f0e081e 100644 --- a/src/ipc/context.rs +++ b/src/ipc/context.rs @@ -1,22 +1,24 @@ use std::collections::HashMap; use std::mem; use std::ops::{Deref, DerefMut}; -use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; -use tokio::sync::{Mutex, oneshot, RwLock}; -use tokio::sync::oneshot::{Sender, Receiver}; +use tokio::sync::mpsc::Receiver; +use tokio::sync::{mpsc, oneshot, Mutex, RwLock}; use tokio::time::Duration; use typemap_rev::TypeMap; use crate::error::{Error, Result}; -use crate::event::Event; -use crate::ipc::stream_emitter::{EmitMetadata, StreamEmitter}; +use crate::event::{Event, EventType}; +use crate::ipc::stream_emitter::emit_metadata::EmitMetadata; +use crate::ipc::stream_emitter::StreamEmitter; +use crate::payload::IntoPayload; #[cfg(feature = "serialize")] use crate::payload::{DynamicSerializer, SerdePayload}; -use crate::payload::IntoPayload; +use crate::prelude::Response; -pub(crate) type ReplyListeners = Arc>>>; +pub(crate) type ReplyListeners = Arc>>>; /// An object provided to each callback function. /// Currently it only holds the event emitter to emit response events in event callbacks. @@ -38,7 +40,7 @@ pub struct Context { /// Field to store additional context data pub data: Arc>, - stop_sender: Arc>>>, + stop_sender: Arc>>>, pub(crate) reply_listeners: ReplyListeners, @@ -54,7 +56,7 @@ impl Context { pub(crate) fn new( emitter: StreamEmitter, data: Arc>, - stop_sender: Option>, + stop_sender: Option>, reply_listeners: ReplyListeners, reply_timeout: Duration, #[cfg(feature = "serialize")] default_serializer: DynamicSerializer, @@ -71,12 +73,26 @@ impl Context { } } - /// Emits an event with a given payload that can be serialized into bytes - pub fn emit, P: IntoPayload>( + /// Emits a raw event. Only for internal use + pub(crate) fn emit_raw( &self, - name: S, + name: &str, + namespace: Option, + event_type: EventType, payload: P, ) -> EmitMetadata

{ + self.emitter.emit_raw( + self.clone(), + self.ref_id.clone(), + name, + namespace, + event_type, + payload, + ) + } + + /// Emits an event with a given payload that can be serialized into bytes + pub fn emit, P: IntoPayload>(&self, name: S, payload: P) -> EmitMetadata

{ if let Some(ref_id) = &self.ref_id { self.emitter .emit_response(self.clone(), *ref_id, name, payload) @@ -100,11 +116,16 @@ impl Context { } } + /// Ends the event flow by creating a final response + pub fn response(&self, payload: P) -> Result { + Response::payload(self, payload) + } + /// Registers a reply listener for a given event #[inline] #[tracing::instrument(level = "debug", skip(self))] pub(crate) async fn register_reply_listener(&self, event_id: u64) -> Result> { - let (rx, tx) = oneshot::channel(); + let (rx, tx) = mpsc::channel(8); { let mut listeners = self.reply_listeners.lock().await; listeners.insert(event_id, rx); @@ -132,9 +153,9 @@ impl Context { /// Returns the channel for a reply to the given message id #[inline] - pub(crate) async fn get_reply_sender(&self, ref_id: u64) -> Option> { - let mut listeners = self.reply_listeners.lock().await; - listeners.remove(&ref_id) + pub(crate) async fn get_reply_sender(&self, ref_id: u64) -> Option> { + let listeners = self.reply_listeners.lock().await; + listeners.get(&ref_id).cloned() } #[inline] @@ -149,16 +170,16 @@ pub struct PooledContext { } pub struct PoolGuard - where - T: Clone, +where + T: Clone, { inner: T, count: Arc, } impl Deref for PoolGuard - where - T: Clone, +where + T: Clone, { type Target = T; @@ -169,8 +190,8 @@ impl Deref for PoolGuard } impl DerefMut for PoolGuard - where - T: Clone, +where + T: Clone, { #[inline] fn deref_mut(&mut self) -> &mut Self::Target { @@ -179,8 +200,8 @@ impl DerefMut for PoolGuard } impl Clone for PoolGuard - where - T: Clone, +where + T: Clone, { #[inline] fn clone(&self) -> Self { @@ -194,8 +215,8 @@ impl Clone for PoolGuard } impl Drop for PoolGuard - where - T: Clone, +where + T: Clone, { #[inline] fn drop(&mut self) { @@ -204,8 +225,8 @@ impl Drop for PoolGuard } impl PoolGuard - where - T: Clone, +where + T: Clone, { pub(crate) fn new(inner: T) -> Self { Self { diff --git a/src/ipc/mod.rs b/src/ipc/mod.rs index 8d280315..8efcc30f 100644 --- a/src/ipc/mod.rs +++ b/src/ipc/mod.rs @@ -1,7 +1,8 @@ use std::collections::HashMap; use std::sync::Arc; -use crate::error_event::{ERROR_EVENT_NAME, ErrorEventData}; +use crate::error_event::{ErrorEventData, END_EVENT_NAME, ERROR_EVENT_NAME}; +use crate::event::EventType; use crate::events::event_handler::EventHandler; use crate::namespaces::namespace::Namespace; use crate::prelude::*; @@ -33,13 +34,19 @@ async fn handle_connection( // get the listener for replies if let Some(sender) = ctx.get_reply_sender(ref_id).await { // try sending the event to the listener for replies - if let Err(event) = sender.send(event) { - handle_event(Context::clone(&ctx), Arc::clone(&handler), event); + if let Err(event) = sender.send(event).await { + handle_event(Context::clone(&ctx), Arc::clone(&handler), event.0); } continue; } tracing::trace!("No response listener found for event. Passing to regular listener."); } + + if event.event_type() == EventType::End { + tracing::debug!("Received dangling end event with no listener"); + continue; + } + if let Some(namespace) = event.namespace().clone().and_then(|n| namespaces.get(&n)) { tracing::trace!("Passing event to namespace listener"); let handler = Arc::clone(&namespace.handler); @@ -55,23 +62,41 @@ async fn handle_connection( /// Handles a single event in a different tokio context fn handle_event(mut ctx: Context, handler: Arc, event: Event) { ctx.set_ref_id(Some(event.id())); + let event_id = event.id(); tokio::spawn(async move { - if let Err(e) = handler.handle_event(&ctx, event).await { - // emit an error event - if let Err(e) = ctx - .emit( - ERROR_EVENT_NAME, - ErrorEventData { - message: format!("{:?}", e), - code: 500, - }, - ).await - { - tracing::error!("Error occurred when sending error response: {:?}", e); + match handler.handle_event(&ctx, event).await { + Ok(r) => { + // emit the response under a unique name to prevent it being interpreted as a new + // event initiator + if let Err(e) = ctx + .emit_raw(END_EVENT_NAME, None, EventType::End, r.into_byte_payload()) + .await + { + tracing::error!("Error occurred when sending error response: {:?}", e); + } + let mut reply_listeners = ctx.reply_listeners.lock().await; + reply_listeners.remove(&event_id); } + Err(e) => { + // emit an error event + if let Err(e) = ctx + .emit_raw( + ERROR_EVENT_NAME, + None, + EventType::Error, + ErrorEventData { + message: format!("{:?}", e), + code: 500, + }, + ) + .await + { + tracing::error!("Error occurred when sending error response: {:?}", e); + } - tracing::error!("Failed to handle event: {:?}", e); + tracing::error!("Failed to handle event: {:?}", e); + } } }); } diff --git a/src/ipc/stream_emitter.rs b/src/ipc/stream_emitter.rs deleted file mode 100644 index fb058198..00000000 --- a/src/ipc/stream_emitter.rs +++ /dev/null @@ -1,288 +0,0 @@ -use std::future::Future; -use std::mem; -use std::ops::DerefMut; -use std::pin::Pin; -use std::sync::Arc; -use std::task::Poll; -use std::time::Duration; - -use futures::future; -use futures::future::Either; -use tokio::io::{AsyncWrite, AsyncWriteExt}; -use tokio::sync::Mutex; -use tracing; - -use crate::error::{Error, Result}; -use crate::error_event::{ERROR_EVENT_NAME, ErrorEventData}; -use crate::events::event::Event; -use crate::ipc::context::Context; -use crate::payload::IntoPayload; -use crate::protocol::AsyncProtocolStream; - -macro_rules! poll_unwrap { - ($val:expr) => { - if let Some(v) = $val { - v - } else { - tracing::error!("Polling a future with an invalid state."); - return Poll::Ready(Err(Error::InvalidState)) - } - } -} - -type SendStream = Arc>; - -/// An abstraction over any type that implements the AsyncProtocolStream trait -/// to emit events and share a connection across multiple -/// contexts. -#[derive(Clone)] -pub struct StreamEmitter { - stream: SendStream, -} - -impl StreamEmitter { - pub fn new(stream: P::OwnedSplitWriteHalf) -> Self { - Self { - stream: Arc::new(Mutex::new(stream)), - } - } - - #[tracing::instrument(level = "trace", skip(self, ctx, payload))] - fn _emit( - &self, - ctx: Context, - namespace: Option<&str>, - event: &str, - payload: P, - res_id: Option, - ) -> EmitMetadata

{ - EmitMetadata::new(ctx, self.stream.clone(), event.to_string(), namespace.map(|n| n.to_string()), payload, res_id) - } - - /// Emits an event - #[inline] - pub(crate) fn emit, P: IntoPayload>( - &self, - ctx: Context, - event: S, - payload: P, - ) -> EmitMetadata

{ - self._emit(ctx, None, event.as_ref(), payload, None) - } - - /// Emits an event to a specific namespace - #[inline] - pub(crate) fn emit_to, S2: AsRef, P: IntoPayload>( - &self, - ctx: Context, - namespace: S1, - event: S2, - payload: P, - ) -> EmitMetadata

{ - self._emit(ctx, Some(namespace.as_ref()), event.as_ref(), payload, None) - } - - /// Emits a response to an event - #[inline] - pub(crate) fn emit_response, P: IntoPayload>( - &self, - ctx: Context, - event_id: u64, - event: S, - payload: P, - ) -> EmitMetadata

{ - self._emit(ctx, None, event.as_ref(), payload, Some(event_id)) - } - - /// Emits a response to an event to a namespace - #[inline] - pub(crate) fn emit_response_to, S2: AsRef, P: IntoPayload>( - &self, - ctx: Context, - event_id: u64, - namespace: S1, - event: S2, - payload: P, - ) -> EmitMetadata

{ - self._emit( - ctx, - Some(namespace.as_ref()), - event.as_ref(), - payload, - Some(event_id), - ) - } -} - -struct EventMetadata { - event: Option, - ctx: Option, - event_namespace: Option>, - event_name: Option, - res_id: Option>, - payload: Option

, -} - -impl EventMetadata

{ - pub fn get_event(&mut self) -> Result<&Event> { - if self.event.is_none() { - self.build_event()?; - } - Ok(self.event.as_ref().unwrap()) - } - - pub fn take_event(mut self) -> Result> { - if self.event.is_none() { - self.build_event()?; - } - Ok(mem::take(&mut self.event)) - } - - fn build_event(&mut self) -> Result<()> { - let ctx = self.ctx.take().ok_or(Error::InvalidState)?; - let event = self.event_name.take().ok_or(Error::InvalidState)?; - let namespace = self.event_namespace.take().ok_or(Error::InvalidState)?; - let payload = self.payload.take().ok_or(Error::InvalidState)?; - let res_id = self.res_id.take().ok_or(Error::InvalidState)?; - let payload_bytes = payload.into_payload(&ctx)?; - - let event = if let Some(namespace) = namespace { - Event::with_namespace(namespace.to_string(), event.to_string(), payload_bytes, res_id) - } else { - Event::new(event.to_string(), payload_bytes, res_id) - }; - self.event = Some(event); - - Ok(()) - } -} - -/// A metadata object returned after emitting an event. -/// To send the event this object needs to be awaited -/// This object can be used to wait for a response to an event. -/// The result contains the emitted event id. -pub struct EmitMetadata { - event_metadata: Option>, - stream: Option, - fut: Option> + Send + Sync>>>, -} - -/// A metadata object returned after waiting for a reply to an event -/// This object needs to be awaited for to get the actual reply -pub struct EmitMetadataWithResponse { - timeout: Option, - fut: Option> + Send + Sync>>>, - emit_metadata: Option>, -} - -impl EmitMetadata

{ - #[inline] - pub(crate) fn new(ctx: Context, stream: SendStream, event_name: String, event_namespace: Option, payload: P, res_id: Option) -> Self { - Self { - event_metadata: Some(EventMetadata { - event: None, - ctx: Some(ctx), - event_name: Some(event_name), - event_namespace: Some(event_namespace), - payload: Some(payload), - res_id: Some(res_id), - }), - stream: Some(stream), - fut: None, - } - } - - /// Waits for a reply to the given message. - #[tracing::instrument(skip(self), fields(self.message_id))] - pub fn await_reply(self) -> EmitMetadataWithResponse

{ - EmitMetadataWithResponse { - timeout: None, - fut: None, - emit_metadata: Some(self), - } - } -} - -impl EmitMetadataWithResponse

{ - /// Sets a timeout for awaiting replies to this emitted event - #[inline] - pub fn with_timeout(mut self, timeout: Duration) -> Self { - self.timeout = Some(timeout); - - self - } -} - -impl Unpin for EmitMetadata

{} - -impl Unpin for EmitMetadataWithResponse

{} - -impl Future for EmitMetadata

{ - type Output = Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { - if self.fut.is_none() { - let event_metadata = poll_unwrap!(self.event_metadata.take()); - let stream = poll_unwrap!(self.stream.take()); - - let event = match event_metadata.take_event() { - Ok(m) => { m } - Err(e) => { return Poll::Ready(Err(e)); } - }.expect("poll after future was done"); - - self.fut = Some(Box::pin(async move { - let event_id = event.id(); - let event_bytes = event.into_bytes()?; - let mut stream = stream.lock().await; - stream.deref_mut().write_all(&event_bytes[..]).await?; - tracing::trace!(bytes_len = event_bytes.len()); - - Ok(event_id) - })); - } - self.fut.as_mut().unwrap().as_mut().poll(cx) - } -} - -impl Future for EmitMetadataWithResponse

{ - type Output = Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { - if self.fut.is_none() { - let mut emit_metadata = poll_unwrap!(self.emit_metadata.take()); - let ctx = poll_unwrap!(emit_metadata.event_metadata.as_ref().and_then(|m| m.ctx.clone())); - let timeout = self.timeout.clone().unwrap_or(ctx.default_reply_timeout.clone()); - - let event_id = match poll_unwrap!(emit_metadata.event_metadata.as_mut()).get_event() { - Ok(e) => { e.id() } - Err(e) => { return Poll::Ready(Err(e)); } - }; - - self.fut = Some(Box::pin(async move { - let tx = ctx.register_reply_listener(event_id).await?; - emit_metadata.await?; - - let result = future::select( - Box::pin(tx), - Box::pin(tokio::time::sleep(timeout)), - ) - .await; - - let reply = match result { - Either::Left((tx_result, _)) => Ok(tx_result?), - Either::Right(_) => { - let mut listeners = ctx.reply_listeners.lock().await; - listeners.remove(&event_id); - Err(Error::Timeout) - } - }?; - if reply.name() == ERROR_EVENT_NAME { - Err(reply.payload::()?.into()) - } else { - Ok(reply) - } - })) - } - self.fut.as_mut().unwrap().as_mut().poll(cx) - } -} \ No newline at end of file diff --git a/src/ipc/stream_emitter/emit_metadata.rs b/src/ipc/stream_emitter/emit_metadata.rs new file mode 100644 index 00000000..57d77fab --- /dev/null +++ b/src/ipc/stream_emitter/emit_metadata.rs @@ -0,0 +1,101 @@ +use crate::context::Context; +use crate::error::Error; +use crate::event::EventType; +use crate::ipc::stream_emitter::emit_metadata_with_response::EmitMetadataWithResponse; +use crate::ipc::stream_emitter::emit_metadata_with_response_stream::EmitMetadataWithResponseStream; +use crate::ipc::stream_emitter::event_metadata::EventMetadata; +use crate::ipc::stream_emitter::SendStream; +use crate::payload::IntoPayload; +use crate::{error, poll_unwrap}; +use std::future::Future; +use std::ops::DerefMut; +use std::pin::Pin; +use std::task::Poll; +use tokio::io::AsyncWriteExt; + +/// A metadata object returned after emitting an event. +/// To send the event this object needs to be awaited +/// This object can be used to wait for a response to an event. +/// The result contains the emitted event id. +pub struct EmitMetadata { + pub(crate) event_metadata: Option>, + stream: Option, + fut: Option> + Send + Sync>>>, +} + +impl EmitMetadata

{ + #[inline] + pub(crate) fn new( + ctx: Context, + stream: SendStream, + event_name: String, + event_namespace: Option, + payload: P, + res_id: Option, + event_type: EventType, + ) -> Self { + Self { + event_metadata: Some(EventMetadata { + event: None, + ctx: Some(ctx), + event_name: Some(event_name), + event_namespace: Some(event_namespace), + payload: Some(payload), + res_id: Some(res_id), + event_type: Some(event_type), + }), + stream: Some(stream), + fut: None, + } + } + + /// Waits for a reply to the given message. + #[tracing::instrument(skip(self), fields(self.message_id))] + pub fn await_reply(self) -> EmitMetadataWithResponse

{ + EmitMetadataWithResponse { + timeout: None, + fut: None, + emit_metadata: Some(self), + } + } + + pub fn stream_replies(self) -> EmitMetadataWithResponseStream

{ + EmitMetadataWithResponseStream { + timeout: None, + fut: None, + emit_metadata: Some(self), + } + } +} + +impl Unpin for EmitMetadata

{} + +impl Future for EmitMetadata

{ + type Output = error::Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { + if self.fut.is_none() { + let event_metadata = poll_unwrap!(self.event_metadata.take()); + let stream = poll_unwrap!(self.stream.take()); + + let event = match event_metadata.take_event() { + Ok(m) => m, + Err(e) => { + return Poll::Ready(Err(e)); + } + } + .expect("poll after future was done"); + + self.fut = Some(Box::pin(async move { + let event_id = event.id(); + let event_bytes = event.into_bytes()?; + let mut stream = stream.lock().await; + stream.deref_mut().write_all(&event_bytes[..]).await?; + tracing::trace!(bytes_len = event_bytes.len()); + + Ok(event_id) + })); + } + self.fut.as_mut().unwrap().as_mut().poll(cx) + } +} diff --git a/src/ipc/stream_emitter/emit_metadata_with_response.rs b/src/ipc/stream_emitter/emit_metadata_with_response.rs new file mode 100644 index 00000000..29a8c2c0 --- /dev/null +++ b/src/ipc/stream_emitter/emit_metadata_with_response.rs @@ -0,0 +1,84 @@ +use crate::context::Context; +use crate::error::Error; +use crate::error_event::ErrorEventData; +use crate::event::{Event, EventType}; +use crate::ipc::stream_emitter::emit_metadata::EmitMetadata; +use crate::payload::IntoPayload; +use crate::{error, poll_unwrap}; +use std::future::Future; +use std::pin::Pin; +use std::task::Poll; +use std::time::Duration; + +/// A metadata object returned after waiting for a reply to an event +/// This object needs to be awaited for to get the actual reply +pub struct EmitMetadataWithResponse { + pub(crate) timeout: Option, + pub(crate) fut: Option> + Send + Sync>>>, + pub(crate) emit_metadata: Option>, +} + +impl EmitMetadataWithResponse

{ + /// Sets a timeout for awaiting replies to this emitted event + #[inline] + pub fn with_timeout(mut self, timeout: Duration) -> Self { + self.timeout = Some(timeout); + + self + } +} + +impl Unpin for EmitMetadataWithResponse

{} + +impl Future for EmitMetadataWithResponse

{ + type Output = error::Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { + if self.fut.is_none() { + let mut emit_metadata = poll_unwrap!(self.emit_metadata.take()); + let ctx = poll_unwrap!(emit_metadata + .event_metadata + .as_ref() + .and_then(|m| m.ctx.clone())); + let timeout = self + .timeout + .take() + .unwrap_or_else(|| ctx.default_reply_timeout.clone()); + + let event_id = match poll_unwrap!(emit_metadata.event_metadata.as_mut()).get_event() { + Ok(e) => e.id(), + Err(e) => { + return Poll::Ready(Err(e)); + } + }; + + self.fut = Some(Box::pin(async move { + let mut tx = ctx.register_reply_listener(event_id).await?; + emit_metadata.await?; + + let reply = tokio::select! { + tx_result = tx.recv() => { + Ok(tx_result.ok_or_else(|| Error::SendError)?) + } + _ = tokio::time::sleep(timeout) => { + Err(Error::Timeout) + } + }?; + + remove_reply_listener(&ctx, event_id).await; + + if reply.event_type() == EventType::Error { + Err(reply.payload::()?.into()) + } else { + Ok(reply) + } + })) + } + self.fut.as_mut().unwrap().as_mut().poll(cx) + } +} + +pub(crate) async fn remove_reply_listener(ctx: &Context, event_id: u64) { + let mut listeners = ctx.reply_listeners.lock().await; + listeners.remove(&event_id); +} diff --git a/src/ipc/stream_emitter/emit_metadata_with_response_stream.rs b/src/ipc/stream_emitter/emit_metadata_with_response_stream.rs new file mode 100644 index 00000000..a0288de4 --- /dev/null +++ b/src/ipc/stream_emitter/emit_metadata_with_response_stream.rs @@ -0,0 +1,151 @@ +use crate::context::Context; +use crate::error::{Error, Result}; +use crate::event::{Event, EventType}; +use crate::ipc::stream_emitter::emit_metadata::EmitMetadata; +use crate::ipc::stream_emitter::emit_metadata_with_response::remove_reply_listener; +use crate::payload::IntoPayload; +use crate::poll_unwrap; +use futures_core::Stream; +use std::future::Future; +use std::pin::Pin; +use std::task::Poll; +use std::time::Duration; +use tokio::sync::mpsc::Receiver; + +/// A metadata object returned after waiting for a reply to an event +/// This object needs to be awaited for to get the actual reply +pub struct EmitMetadataWithResponseStream { + pub(crate) timeout: Option, + pub(crate) fut: Option> + Send + Sync>>>, + pub(crate) emit_metadata: Option>, +} + +/// An asynchronous stream one can read all responses to a specific event from. +pub struct ResponseStream { + event_id: u64, + ctx: Option, + receiver: Option>, + timeout: Duration, + fut: Option, Context, Receiver)>>>>>, +} + +impl ResponseStream { + pub(crate) fn new( + event_id: u64, + timeout: Duration, + ctx: Context, + receiver: Receiver, + ) -> Self { + Self { + event_id, + ctx: Some(ctx), + receiver: Some(receiver), + timeout, + fut: None, + } + } +} + +impl Unpin for EmitMetadataWithResponseStream

{} + +impl EmitMetadataWithResponseStream

{ + /// Sets a timeout for awaiting replies to this emitted event + #[inline] + pub fn with_timeout(mut self, timeout: Duration) -> Self { + self.timeout = Some(timeout); + + self + } +} + +impl Future for EmitMetadataWithResponseStream

{ + type Output = Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { + if self.fut.is_none() { + let mut emit_metadata = poll_unwrap!(self.emit_metadata.take()); + let ctx = poll_unwrap!(emit_metadata + .event_metadata + .as_ref() + .and_then(|m| m.ctx.clone())); + let timeout = self + .timeout + .take() + .unwrap_or_else(|| ctx.default_reply_timeout.clone()); + + let event_id = match poll_unwrap!(emit_metadata.event_metadata.as_mut()).get_event() { + Ok(e) => e.id(), + Err(e) => { + return Poll::Ready(Err(e)); + } + }; + + self.fut = Some(Box::pin(async move { + let tx = ctx.register_reply_listener(event_id).await?; + emit_metadata.await?; + + Ok(ResponseStream::new(event_id, timeout, ctx, tx)) + })) + } + self.fut.as_mut().unwrap().as_mut().poll(cx) + } +} + +impl Unpin for ResponseStream {} + +impl Stream for ResponseStream { + type Item = Result; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + if self.fut.is_none() { + if self.ctx.is_none() || self.receiver.is_none() { + return Poll::Ready(None); + } + let ctx = self.ctx.take().unwrap(); + let mut receiver = self.receiver.take().unwrap(); + let timeout = self.timeout; + let event_id = self.event_id; + + self.fut = Some(Box::pin(async move { + let event: Option = tokio::select! { + tx_result = receiver.recv() => { + Ok(tx_result) + } + _ = tokio::time::sleep(timeout) => { + Err(Error::Timeout) + } + }?; + + if event.is_none() || event.as_ref().unwrap().event_type() == EventType::End { + remove_reply_listener(&ctx, event_id).await; + } + + Ok((event, ctx, receiver)) + })); + } + + match self.fut.as_mut().unwrap().as_mut().poll(cx) { + Poll::Ready(r) => match r { + Ok((event, ctx, tx)) => { + self.fut = None; + + if let Some(event) = event { + if event.event_type() != EventType::End { + self.ctx = Some(ctx); + self.receiver = Some(tx); + } + + Poll::Ready(Some(Ok(event))) + } else { + Poll::Ready(None) + } + } + Err(e) => Poll::Ready(Some(Err(e))), + }, + Poll::Pending => Poll::Pending, + } + } +} diff --git a/src/ipc/stream_emitter/event_metadata.rs b/src/ipc/stream_emitter/event_metadata.rs new file mode 100644 index 00000000..c9a834f3 --- /dev/null +++ b/src/ipc/stream_emitter/event_metadata.rs @@ -0,0 +1,54 @@ +use crate::context::Context; +use crate::error; +use crate::error::Error; +use crate::event::{Event, EventType}; +use crate::payload::IntoPayload; +use std::mem; + +pub(crate) struct EventMetadata { + pub(crate) event: Option, + pub(crate) ctx: Option, + pub(crate) event_namespace: Option>, + pub(crate) event_name: Option, + pub(crate) res_id: Option>, + pub(crate) event_type: Option, + pub(crate) payload: Option

, +} + +impl EventMetadata

{ + pub fn get_event(&mut self) -> error::Result<&Event> { + if self.event.is_none() { + self.build_event()?; + } + Ok(self.event.as_ref().unwrap()) + } + + pub fn take_event(mut self) -> error::Result> { + if self.event.is_none() { + self.build_event()?; + } + Ok(mem::take(&mut self.event)) + } + + fn build_event(&mut self) -> error::Result<()> { + let ctx = self.ctx.take().ok_or(Error::InvalidState)?; + let event = self.event_name.take().ok_or(Error::InvalidState)?; + let namespace = self.event_namespace.take().ok_or(Error::InvalidState)?; + let payload = self.payload.take().ok_or(Error::InvalidState)?; + let res_id = self.res_id.take().ok_or(Error::InvalidState)?; + let event_type = self.event_type.take().ok_or(Error::InvalidState)?; + let payload_bytes = payload.into_payload(&ctx)?; + + let event = Event::new( + namespace, + event.to_string(), + payload_bytes, + res_id, + event_type, + ); + + self.event = Some(event); + + Ok(()) + } +} diff --git a/src/ipc/stream_emitter/mod.rs b/src/ipc/stream_emitter/mod.rs new file mode 100644 index 00000000..bc0f1297 --- /dev/null +++ b/src/ipc/stream_emitter/mod.rs @@ -0,0 +1,159 @@ +pub mod emit_metadata; +pub mod emit_metadata_with_response; +pub mod emit_metadata_with_response_stream; +mod event_metadata; + +use std::sync::Arc; + +use tokio::io::AsyncWrite; +use tokio::sync::Mutex; +use tracing; + +use crate::event::EventType; +use crate::ipc::context::Context; +use crate::payload::IntoPayload; +use crate::protocol::AsyncProtocolStream; + +pub use emit_metadata_with_response_stream::ResponseStream; +use crate::prelude::emit_metadata::EmitMetadata; + +#[macro_export] +macro_rules! poll_unwrap { + ($val:expr) => { + if let Some(v) = $val { + v + } else { + tracing::error!("Polling a future with an invalid state."); + return Poll::Ready(Err(Error::InvalidState)); + } + }; +} + +type SendStream = Arc>; + +/// An abstraction over any type that implements the AsyncProtocolStream trait +/// to emit events and share a connection across multiple +/// contexts. +#[derive(Clone)] +pub struct StreamEmitter { + stream: SendStream, +} + +impl StreamEmitter { + pub fn new(stream: P::OwnedSplitWriteHalf) -> Self { + Self { + stream: Arc::new(Mutex::new(stream)), + } + } + + #[tracing::instrument(level = "trace", skip(self, ctx, payload))] + fn _emit( + &self, + ctx: Context, + namespace: Option, + event: &str, + payload: P, + res_id: Option, + event_type: EventType, + ) -> EmitMetadata

{ + EmitMetadata::new( + ctx, + self.stream.clone(), + event.to_string(), + namespace, + payload, + res_id, + event_type, + ) + } + + /// Emits an event + #[inline] + pub(crate) fn emit, P: IntoPayload>( + &self, + ctx: Context, + event: S, + payload: P, + ) -> EmitMetadata

{ + self._emit( + ctx, + None, + event.as_ref(), + payload, + None, + EventType::Initiator, + ) + } + + /// Emits an event to a specific namespace + #[inline] + pub(crate) fn emit_to, S2: AsRef, P: IntoPayload>( + &self, + ctx: Context, + namespace: S1, + event: S2, + payload: P, + ) -> EmitMetadata

{ + self._emit( + ctx, + Some(namespace.as_ref().to_string()), + event.as_ref(), + payload, + None, + EventType::Initiator, + ) + } + + /// Emits a raw event + #[inline] + pub(crate) fn emit_raw( + &self, + ctx: Context, + res_id: Option, + event: &str, + namespace: Option, + event_type: EventType, + payload: P, + ) -> EmitMetadata

{ + self._emit(ctx, namespace, event, payload, res_id, event_type) + } + + /// Emits a response to an event + #[inline] + pub(crate) fn emit_response, P: IntoPayload>( + &self, + ctx: Context, + event_id: u64, + event: S, + payload: P, + ) -> EmitMetadata

{ + self._emit( + ctx, + None, + event.as_ref(), + payload, + Some(event_id), + EventType::Response, + ) + } + + /// Emits a response to an event to a namespace + #[inline] + pub(crate) fn emit_response_to, S2: AsRef, P: IntoPayload>( + &self, + ctx: Context, + event_id: u64, + namespace: S1, + event: S2, + payload: P, + ) -> EmitMetadata

{ + self._emit( + ctx, + Some(namespace.as_ref().to_string()), + event.as_ref(), + payload, + Some(event_id), + EventType::Response, + ) + } +} diff --git a/src/lib.rs b/src/lib.rs index a251ef06..6ba5e22b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,19 +6,19 @@ //! use tokio::net::TcpListener; //! //! /// Callback ping function -//! async fn handle_ping(ctx: &Context, event: Event) -> IPCResult<()> { +//! async fn handle_ping(ctx: &Context, event: Event) -> IPCResult { //! println!("Received ping event."); //! ctx.emit("pong", ()).await?; //! -//! Ok(()) +//! Ok(Response::empty()) //! } //! //! pub struct MyNamespace; //! //! impl MyNamespace { -//! async fn ping(_ctx: &Context, _event: Event) -> IPCResult<()> { +//! async fn ping(_ctx: &Context, _event: Event) -> IPCResult { //! println!("My namespace received a ping"); -//! Ok(()) +//! Ok(Response::empty()) //! } //! } //! @@ -46,7 +46,7 @@ //! .on("something", callback!(ctx, event, async move { //! println!("I think the server did something"); //! ctx.emit_to("mainspace-server", "ok", ()).await?; -//! Ok(()) +//! Ok(Response::empty()) //! })) //! .build() //! .add_namespace(namespace!(MyNamespace)) @@ -63,7 +63,7 @@ //! use std::net::ToSocketAddrs; //! use typemap_rev::TypeMapKey; //! use bromine::IPCBuilder; -//! use bromine::callback; +//! use bromine::prelude::*; //! use tokio::net::TcpListener; //! //! struct MyKey; @@ -80,7 +80,7 @@ //! .on("ping", callback!(ctx, event, async move { //! println!("Received ping event."); //! ctx.emit("pong", ()).await?; -//! Ok(()) +//! Ok(Response::empty()) //! })) //! .namespace("mainspace-server") //! .on("do-something", callback!(ctx, event, async move { @@ -92,7 +92,7 @@ //! *my_key += 1; //! } //! ctx.emit_to("mainspace-client", "something", ()).await?; -//! Ok(()) +//! Ok(Response::empty()) //! })) //! .build() //! // store additional data @@ -134,9 +134,10 @@ pub mod prelude { pub use crate::error::Error as IPCError; pub use crate::error::Result as IPCResult; pub use crate::event::Event; - pub use crate::event_handler::EventHandler; + pub use crate::event_handler::{EventHandler, Response}; pub use crate::ipc::context::Context; pub use crate::ipc::context::{PoolGuard, PooledContext}; + pub use crate::ipc::stream_emitter::*; pub use crate::ipc::*; pub use crate::macros::*; pub use crate::namespace::Namespace; diff --git a/src/macros.rs b/src/macros.rs index 0107c03f..ff15f867 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -30,4 +30,4 @@ macro_rules! events{ $handler.on($name, callback!($cb)); )* } -} \ No newline at end of file +} diff --git a/src/namespaces/builder.rs b/src/namespaces/builder.rs index a048a053..aa5b0ebe 100644 --- a/src/namespaces/builder.rs +++ b/src/namespaces/builder.rs @@ -1,5 +1,6 @@ use crate::error::Result; use crate::event::Event; +use crate::event_handler::Response; use crate::events::event_handler::EventHandler; use crate::ipc::context::Context; use crate::namespaces::namespace::Namespace; @@ -32,7 +33,7 @@ where F: for<'a> Fn( &'a Context, Event, - ) -> Pin> + Send + 'a)>> + ) -> Pin> + Send + 'a)>> + Send + Sync, { diff --git a/tests/test_event_streams.rs b/tests/test_event_streams.rs new file mode 100644 index 00000000..5a8f2edb --- /dev/null +++ b/tests/test_event_streams.rs @@ -0,0 +1,88 @@ +use crate::utils::call_counter::{get_counter_from_context, increment_counter_for_event}; +use crate::utils::protocol::TestProtocolListener; +use crate::utils::{get_free_port, start_server_and_client}; +use bromine::prelude::*; +use byteorder::ReadBytesExt; +use futures::StreamExt; +use std::io::Read; +use std::time::Duration; + +mod utils; + +/// When awaiting the reply to an event the handler for the event doesn't get called. +/// Therefore we expect it to have a call count of 0. +#[tokio::test] +async fn it_receives_responses() { + let port = get_free_port(); + let ctx = get_client_with_server(port).await; + let mut reply_stream = ctx + .emit("stream", EmptyPayload) + .stream_replies() + .await + .unwrap(); + + let mut reply_stream_2 = ctx + .emit("stream", EmptyPayload) + .stream_replies() + .await + .unwrap(); + + for i in 0u8..=100 { + if let Some(Ok(event)) = reply_stream.next().await { + assert_eq!(event.payload::().unwrap().0, i) + } else { + panic!("stream 1 has no value {}", i); + } + if let Some(Ok(event)) = reply_stream_2.next().await { + assert_eq!(event.payload::().unwrap().0, i) + } else { + panic!("stream 2 has no value {}", i); + } + } + let counter = get_counter_from_context(&ctx).await; + assert_eq!(counter.get("stream").await, 2); +} + +async fn get_client_with_server(port: u8) -> Context { + start_server_and_client(move || get_builder(port)).await +} + +fn get_builder(port: u8) -> IPCBuilder { + IPCBuilder::new() + .address(port) + .timeout(Duration::from_millis(100)) + .on("stream", callback!(handle_stream_event)) +} + +async fn handle_stream_event(ctx: &Context, event: Event) -> IPCResult { + increment_counter_for_event(ctx, &event).await; + for i in 0u8..=99 { + ctx.emit("number", NumberPayload(i)).await?; + } + + ctx.response(NumberPayload(100)) +} + +pub struct EmptyPayload; + +impl IntoPayload for EmptyPayload { + fn into_payload(self, _: &Context) -> IPCResult> { + Ok(vec![]) + } +} + +pub struct NumberPayload(u8); + +impl IntoPayload for NumberPayload { + fn into_payload(self, _: &Context) -> IPCResult> { + Ok(vec![self.0]) + } +} + +impl FromPayload for NumberPayload { + fn from_payload(mut reader: R) -> IPCResult { + let num = reader.read_u8()?; + + Ok(NumberPayload(num)) + } +} diff --git a/tests/test_events_with_payload.rs b/tests/test_events_with_payload.rs index a00f8601..7c14ad7f 100644 --- a/tests/test_events_with_payload.rs +++ b/tests/test_events_with_payload.rs @@ -36,11 +36,7 @@ async fn it_receives_payloads() { number: 0, string: String::from("Hello World"), }; - let reply = ctx - .emit("ping", payload) - .await_reply() - .await - .unwrap(); + let reply = ctx.emit("ping", payload).await_reply().await.unwrap(); let reply_payload = reply.payload::().unwrap(); let counters = get_counter_from_context(&ctx).await; @@ -62,19 +58,19 @@ fn get_builder(port: u8) -> IPCBuilder { .timeout(Duration::from_millis(10)) } -async fn handle_ping_event(ctx: &Context, event: Event) -> IPCResult<()> { +async fn handle_ping_event(ctx: &Context, event: Event) -> IPCResult { increment_counter_for_event(ctx, &event).await; let payload = event.payload::()?; ctx.emit("pong", payload).await?; - Ok(()) + Ok(Response::empty()) } -async fn handle_pong_event(ctx: &Context, event: Event) -> IPCResult<()> { +async fn handle_pong_event(ctx: &Context, event: Event) -> IPCResult { increment_counter_for_event(ctx, &event).await; let _payload = event.payload::()?; - Ok(()) + Ok(Response::empty()) } #[cfg(feature = "serialize")] diff --git a/tests/test_raw_events.rs b/tests/test_raw_events.rs index e885b20d..c690f869 100644 --- a/tests/test_raw_events.rs +++ b/tests/test_raw_events.rs @@ -45,11 +45,7 @@ async fn it_sends_namespaced_events() { async fn it_receives_responses() { let port = get_free_port(); let ctx = get_client_with_server(port).await; - let reply = ctx - .emit("ping", EmptyPayload) - .await_reply() - .await - .unwrap(); + let reply = ctx.emit("ping", EmptyPayload).await_reply().await.unwrap(); let counter = get_counter_from_context(&ctx).await; assert_eq!(reply.name(), "pong"); @@ -108,29 +104,29 @@ fn get_builder(port: u8) -> IPCBuilder { .build() } -async fn handle_ping_event(ctx: &Context, event: Event) -> IPCResult<()> { +async fn handle_ping_event(ctx: &Context, event: Event) -> IPCResult { increment_counter_for_event(ctx, &event).await; ctx.emit("pong", EmptyPayload).await?; - Ok(()) + Ok(Response::empty()) } -async fn handle_pong_event(ctx: &Context, event: Event) -> IPCResult<()> { +async fn handle_pong_event(ctx: &Context, event: Event) -> IPCResult { increment_counter_for_event(ctx, &event).await; - Ok(()) + Ok(Response::empty()) } -async fn handle_create_error_event(ctx: &Context, event: Event) -> IPCResult<()> { +async fn handle_create_error_event(ctx: &Context, event: Event) -> IPCResult { increment_counter_for_event(ctx, &event).await; Err(IPCError::from("Test Error")) } -async fn handle_error_event(ctx: &Context, event: Event) -> IPCResult<()> { +async fn handle_error_event(ctx: &Context, event: Event) -> IPCResult { increment_counter_for_event(ctx, &event).await; - Ok(()) + Ok(Response::empty()) } pub struct EmptyPayload; diff --git a/tests/test_serialization.rs b/tests/test_serialization.rs index ac53c485..7aa71f07 100644 --- a/tests/test_serialization.rs +++ b/tests/test_serialization.rs @@ -2,6 +2,8 @@ use bromine::prelude::*; #[cfg(feature = "serialize")] use serde::{de::DeserializeOwned, Serialize}; + +#[cfg(feature = "serialize")] use std::fmt::Debug; #[cfg(feature = "serialize_rmp")]