Merge pull request #32 from Trivernis/develop

Stream Responses
main
Julius Riegel 3 years ago committed by GitHub
commit 891b688ff5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

44
Cargo.lock generated

@ -93,7 +93,7 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]] [[package]]
name = "bromine" name = "bromine"
version = "0.17.1" version = "0.18.0"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"bincode", "bincode",
@ -101,7 +101,9 @@ dependencies = [
"criterion", "criterion",
"crossbeam-utils", "crossbeam-utils",
"futures", "futures",
"futures-core",
"lazy_static", "lazy_static",
"num_enum",
"postcard", "postcard",
"rmp-serde", "rmp-serde",
"serde", "serde",
@ -578,6 +580,27 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "num_enum"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "720d3ea1055e4e4574c0c0b0f8c3fd4f24c4cdaf465948206dea090b57b526ad"
dependencies = [
"num_enum_derive",
]
[[package]]
name = "num_enum_derive"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d992b768490d7fe0d8586d9b5745f6c49f557da6d81dc982b1d167ad4edbb21"
dependencies = [
"proc-macro-crate",
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "oorandom" name = "oorandom"
version = "11.1.3" version = "11.1.3"
@ -641,6 +664,16 @@ version = "0.1.5-pre"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c68cb38ed13fd7bc9dd5db8f165b7c8d9c1a315104083a2b10f11354c2af97f" checksum = "7c68cb38ed13fd7bc9dd5db8f165b7c8d9c1a315104083a2b10f11354c2af97f"
[[package]]
name = "proc-macro-crate"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ebace6889caf889b4d3f76becee12e90353f2b8c7d875534a71e5742f8f6f83"
dependencies = [
"thiserror",
"toml",
]
[[package]] [[package]]
name = "proc-macro2" name = "proc-macro2"
version = "1.0.35" version = "1.0.35"
@ -948,6 +981,15 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "toml"
version = "0.5.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a31142970826733df8241ef35dc040ef98c679ab14d7c3e54d827099b3acecaa"
dependencies = [
"serde",
]
[[package]] [[package]]
name = "tracing" name = "tracing"
version = "0.1.29" version = "0.1.29"

@ -1,6 +1,6 @@
[package] [package]
name = "bromine" name = "bromine"
version = "0.17.1" version = "0.18.0"
authors = ["trivernis <trivernis@protonmail.com>"] authors = ["trivernis <trivernis@protonmail.com>"]
edition = "2018" edition = "2018"
readme = "README.md" readme = "README.md"
@ -27,6 +27,8 @@ typemap_rev = "0.1.5"
byteorder = "1.4.3" byteorder = "1.4.3"
async-trait = "0.1.52" async-trait = "0.1.52"
futures = "0.3.19" futures = "0.3.19"
num_enum = "0.5.6"
futures-core = "0.3.19"
rmp-serde = {version = "0.15.5", optional = true} rmp-serde = {version = "0.15.5", optional = true}
bincode = {version = "1.3.3", optional = true} bincode = {version = "1.3.3", optional = true}
serde_json = {version = "1.0.73", optional = true} serde_json = {version = "1.0.73", optional = true}

@ -19,7 +19,7 @@ use tokio::net::TcpListener;
async fn handle_ping(ctx: &Context, event: Event) -> Result<()> { async fn handle_ping(ctx: &Context, event: Event) -> Result<()> {
println!("Received ping event."); println!("Received ping event.");
ctx.emit("pong", ()).await?; ctx.emit("pong", ()).await?;
Ok(()) Ok(Response::empty())
} }
#[tokio::main] #[tokio::main]
@ -31,8 +31,15 @@ async fn main() {
.on("ping", callback!(handle_ping)) .on("ping", callback!(handle_ping))
.build_client().await.unwrap(); .build_client().await.unwrap();
// emit an initial event // emit an event and wait for responses
let response = ctx.emit("ping", ()).await_response().await?; let response = ctx.emit("ping", ()).await_reply().await?;
// emit an event and get all responses as stream
let stream = ctx.emit("ping", ()).stream_replies().await?;
while let Some(Ok(event)) = stream.next().await {
println!("{}", event.name());
}
} }
``` ```
@ -50,7 +57,10 @@ async fn main() {
// register callback // register callback
.on("ping", callback!(ctx, event, async move { .on("ping", callback!(ctx, event, async move {
println!("Received ping event."); println!("Received ping event.");
Ok(()) for _ in 0..10 {
ctx.emit("pong", ()).await?;
}
Ok(Response::empty())
})) }))
.build_server().await.unwrap(); .build_server().await.unwrap();
} }

@ -9,7 +9,7 @@ use tokio::runtime::Runtime;
pub const EVENT_NAME: &str = "bench_event"; pub const EVENT_NAME: &str = "bench_event";
fn create_event_bytes_reader(data_size: usize) -> Cursor<Vec<u8>> { fn create_event_bytes_reader(data_size: usize) -> Cursor<Vec<u8>> {
let bytes = Event::new(EVENT_NAME.to_string(), vec![0u8; data_size], None) let bytes = Event::initiator(None, EVENT_NAME.to_string(), vec![0u8; data_size])
.into_bytes() .into_bytes()
.unwrap(); .unwrap();
Cursor::new(bytes) Cursor::new(bytes)

@ -6,7 +6,7 @@ use criterion::{
pub const EVENT_NAME: &str = "bench_event"; pub const EVENT_NAME: &str = "bench_event";
fn create_event(data_size: usize) -> Event { fn create_event(data_size: usize) -> Event {
Event::new(EVENT_NAME.to_string(), vec![0u8; data_size], None) Event::initiator(None, EVENT_NAME.to_string(), vec![0u8; data_size])
} }
fn event_serialization(c: &mut Criterion) { fn event_serialization(c: &mut Criterion) {

@ -8,6 +8,7 @@ use std::fmt::{Display, Formatter};
use std::io::Read; use std::io::Read;
pub static ERROR_EVENT_NAME: &str = "error"; pub static ERROR_EVENT_NAME: &str = "error";
pub static END_EVENT_NAME: &str = "end";
/// Data returned on error event. /// Data returned on error event.
/// The error event has a default handler that just logs that /// The error event has a default handler that just logs that

@ -2,6 +2,8 @@ use crate::error::{Error, Result};
use crate::events::generate_event_id; use crate::events::generate_event_id;
use crate::events::payload::FromPayload; use crate::events::payload::FromPayload;
use byteorder::{BigEndian, ReadBytesExt}; use byteorder::{BigEndian, ReadBytesExt};
use num_enum::{IntoPrimitive, TryFromPrimitive};
use std::convert::TryFrom;
use std::fmt::Debug; use std::fmt::Debug;
use std::io::{Cursor, Read}; use std::io::{Cursor, Read};
use tokio::io::{AsyncRead, AsyncReadExt}; use tokio::io::{AsyncRead, AsyncReadExt};
@ -20,36 +22,65 @@ pub struct Event {
#[derive(Debug)] #[derive(Debug)]
struct EventHeader { struct EventHeader {
id: u64, id: u64,
event_type: EventType,
ref_id: Option<u64>, ref_id: Option<u64>,
namespace: Option<String>, namespace: Option<String>,
name: String, name: String,
} }
#[derive(Clone, Debug, TryFromPrimitive, IntoPrimitive, Copy, Ord, PartialOrd, Eq, PartialEq)]
#[repr(u8)]
pub enum EventType {
Initiator,
Response,
End,
Error,
}
impl Event { impl Event {
/// Creates a new event with a namespace /// Creates a new event that acts as an initiator for further response events
#[tracing::instrument(level = "trace", skip(data))] #[tracing::instrument(level = "trace", skip(data))]
pub fn with_namespace( #[inline]
namespace: String, pub fn initiator(namespace: Option<String>, name: String, data: Vec<u8>) -> Self {
name: String, Self::new(namespace, name, data, None, EventType::Initiator)
data: Vec<u8>, }
ref_id: Option<u64>,
) -> Self { /// Creates a new event that is a response to a previous event
let header = EventHeader { #[tracing::instrument(level = "trace", skip(data))]
id: generate_event_id(), #[inline]
ref_id, pub fn response(namespace: Option<String>, name: String, data: Vec<u8>, ref_id: u64) -> Self {
namespace: Some(namespace), Self::new(namespace, name, data, Some(ref_id), EventType::Response)
name, }
};
Self { header, data } /// Creates a new error event as a response to a previous event
#[tracing::instrument(level = "trace", skip(data))]
#[inline]
pub fn error(namespace: Option<String>, name: String, data: Vec<u8>, ref_id: u64) -> Self {
Self::new(namespace, name, data, Some(ref_id), EventType::Error)
}
/// Creates a new event that indicates the end of a series of responses (in an event handler)
/// and might contain a final response payload
#[tracing::instrument(level = "trace", skip(data))]
#[inline]
pub fn end(namespace: Option<String>, name: String, data: Vec<u8>, ref_id: u64) -> Self {
Self::new(namespace, name, data, Some(ref_id), EventType::Response)
} }
/// Creates a new event /// Creates a new event
#[tracing::instrument(level = "trace", skip(data))] #[tracing::instrument(level = "trace", skip(data))]
pub fn new(name: String, data: Vec<u8>, ref_id: Option<u64>) -> Self { pub(crate) fn new(
namespace: Option<String>,
name: String,
data: Vec<u8>,
ref_id: Option<u64>,
event_type: EventType,
) -> Self {
let header = EventHeader { let header = EventHeader {
id: generate_event_id(), id: generate_event_id(),
event_type,
ref_id, ref_id,
namespace: None, namespace,
name, name,
}; };
Self { header, data } Self { header, data }
@ -61,6 +92,12 @@ impl Event {
self.header.id self.header.id
} }
/// The type of the event
#[inline]
pub fn event_type(&self) -> EventType {
self.header.event_type
}
/// The ID of the message referenced by this message. /// The ID of the message referenced by this message.
/// It represents the message that is replied to and can be None. /// It represents the message that is replied to and can be None.
#[inline] #[inline]
@ -139,6 +176,7 @@ impl EventHeader {
pub fn into_bytes(self) -> Vec<u8> { pub fn into_bytes(self) -> Vec<u8> {
let mut buf = FORMAT_VERSION.to_vec(); let mut buf = FORMAT_VERSION.to_vec();
buf.append(&mut self.id.to_be_bytes().to_vec()); buf.append(&mut self.id.to_be_bytes().to_vec());
buf.push(self.event_type.into());
if let Some(ref_id) = self.ref_id { if let Some(ref_id) = self.ref_id {
buf.push(0xFF); buf.push(0xFF);
@ -164,6 +202,8 @@ impl EventHeader {
pub fn from_read<R: Read>(reader: &mut R) -> Result<Self> { pub fn from_read<R: Read>(reader: &mut R) -> Result<Self> {
Self::read_version(reader)?; Self::read_version(reader)?;
let id = reader.read_u64::<BigEndian>()?; let id = reader.read_u64::<BigEndian>()?;
let event_type_num = reader.read_u8()?;
let event_type = EventType::try_from(event_type_num).map_err(|_| Error::CorruptedEvent)?;
let ref_id = Self::read_ref_id(reader)?; let ref_id = Self::read_ref_id(reader)?;
let namespace_len = reader.read_u16::<BigEndian>()?; let namespace_len = reader.read_u16::<BigEndian>()?;
let namespace = Self::read_namespace(reader, namespace_len)?; let namespace = Self::read_namespace(reader, namespace_len)?;
@ -171,6 +211,7 @@ impl EventHeader {
Ok(Self { Ok(Self {
id, id,
event_type,
ref_id, ref_id,
namespace, namespace,
name, name,

@ -1,14 +1,38 @@
use crate::error::Result; use crate::error::Result;
use crate::events::event::Event; use crate::events::event::Event;
use crate::ipc::context::Context; use crate::ipc::context::Context;
use crate::payload::{BytePayload, IntoPayload};
use std::collections::HashMap; use std::collections::HashMap;
use std::fmt::{Debug, Formatter}; use std::fmt::{Debug, Formatter};
use std::future::Future; use std::future::Future;
use std::pin::Pin; use std::pin::Pin;
use std::sync::Arc; use std::sync::Arc;
pub struct Response(Vec<u8>);
impl Response {
/// Creates a new response with a given payload
pub fn payload<P: IntoPayload>(ctx: &Context, payload: P) -> Result<Self> {
let bytes = payload.into_payload(ctx)?;
Ok(Self(bytes))
}
/// Creates an empty response
pub fn empty() -> Self {
Self(vec![])
}
pub(crate) fn into_byte_payload(self) -> BytePayload {
BytePayload::new(self.0)
}
}
type EventCallback = Arc< type EventCallback = Arc<
dyn for<'a> Fn(&'a Context, Event) -> Pin<Box<(dyn Future<Output = Result<()>> + Send + 'a)>> dyn for<'a> Fn(
&'a Context,
Event,
) -> Pin<Box<(dyn Future<Output = Result<Response>> + Send + 'a)>>
+ Send + Send
+ Sync, + Sync,
>; >;
@ -46,7 +70,7 @@ impl EventHandler {
F: for<'a> Fn( F: for<'a> Fn(
&'a Context, &'a Context,
Event, Event,
) -> Pin<Box<(dyn Future<Output = Result<()>> + Send + 'a)>> ) -> Pin<Box<(dyn Future<Output = Result<Response>> + Send + 'a)>>
+ Send + Send
+ Sync, + Sync,
{ {
@ -56,11 +80,11 @@ impl EventHandler {
/// Handles a received event /// Handles a received event
#[inline] #[inline]
#[tracing::instrument(level = "debug", skip(self, ctx, event))] #[tracing::instrument(level = "debug", skip(self, ctx, event))]
pub async fn handle_event(&self, ctx: &Context, event: Event) -> Result<()> { pub async fn handle_event(&self, ctx: &Context, event: Event) -> Result<Response> {
if let Some(cb) = self.callbacks.get(event.name()) { if let Some(cb) = self.callbacks.get(event.name()) {
cb.as_ref()(ctx, event).await?; cb.as_ref()(ctx, event).await
} else {
Ok(Response::empty())
} }
Ok(())
} }
} }

@ -10,7 +10,6 @@ pub mod payload;
#[cfg(feature = "serialize")] #[cfg(feature = "serialize")]
pub mod payload_serializer; pub mod payload_serializer;
/// Generates a new event id /// Generates a new event id
pub(crate) fn generate_event_id() -> u64 { pub(crate) fn generate_event_id() -> u64 {
lazy_static::lazy_static! { lazy_static::lazy_static! {

@ -1,5 +1,7 @@
use crate::error::{Error, Result}; use crate::error::{Error, Result};
use crate::events::error_event::{ErrorEventData, ERROR_EVENT_NAME}; use crate::error_event::ErrorEventData;
use crate::event_handler::Response;
use crate::events::error_event::ERROR_EVENT_NAME;
use crate::events::event::Event; use crate::events::event::Event;
use crate::events::event_handler::EventHandler; use crate::events::event_handler::EventHandler;
use crate::ipc::client::IPCClient; use crate::ipc::client::IPCClient;
@ -24,6 +26,7 @@ use typemap_rev::{TypeMap, TypeMapKey};
/// use typemap_rev::TypeMapKey; /// use typemap_rev::TypeMapKey;
/// use bromine::IPCBuilder; /// use bromine::IPCBuilder;
/// use tokio::net::TcpListener; /// use tokio::net::TcpListener;
/// use bromine::prelude::Response;
/// ///
/// struct CustomKey; /// struct CustomKey;
/// ///
@ -37,13 +40,13 @@ use typemap_rev::{TypeMap, TypeMapKey};
/// // register callback /// // register callback
/// .on("ping", |_ctx, _event| Box::pin(async move { /// .on("ping", |_ctx, _event| Box::pin(async move {
/// println!("Received ping event."); /// println!("Received ping event.");
/// Ok(()) /// Ok(Response::empty())
/// })) /// }))
/// // register a namespace /// // register a namespace
/// .namespace("namespace") /// .namespace("namespace")
/// .on("namespace-event", |_ctx, _event| Box::pin(async move { /// .on("namespace-event", |_ctx, _event| Box::pin(async move {
/// println!("Namespace event."); /// println!("Namespace event.");
/// Ok(()) /// Ok(Response::empty())
/// })) /// }))
/// .build() /// .build()
/// // add context shared data /// // add context shared data
@ -75,7 +78,7 @@ where
tracing::warn!(error_data.code); tracing::warn!(error_data.code);
tracing::warn!("error_data.message = '{}'", error_data.message); tracing::warn!("error_data.message = '{}'", error_data.message);
Ok(()) Ok(Response::empty())
}) })
}); });
Self { Self {
@ -102,7 +105,7 @@ where
F: for<'a> Fn( F: for<'a> Fn(
&'a Context, &'a Context,
Event, Event,
) -> Pin<Box<(dyn Future<Output = Result<()>> + Send + 'a)>> ) -> Pin<Box<(dyn Future<Output = Result<Response>> + Send + 'a)>>
+ Send + Send
+ Sync, + Sync,
{ {

@ -1,22 +1,24 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::mem; use std::mem;
use std::ops::{Deref, DerefMut}; use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tokio::sync::{Mutex, oneshot, RwLock}; use tokio::sync::mpsc::Receiver;
use tokio::sync::oneshot::{Sender, Receiver}; use tokio::sync::{mpsc, oneshot, Mutex, RwLock};
use tokio::time::Duration; use tokio::time::Duration;
use typemap_rev::TypeMap; use typemap_rev::TypeMap;
use crate::error::{Error, Result}; use crate::error::{Error, Result};
use crate::event::Event; use crate::event::{Event, EventType};
use crate::ipc::stream_emitter::{EmitMetadata, StreamEmitter}; use crate::ipc::stream_emitter::emit_metadata::EmitMetadata;
use crate::ipc::stream_emitter::StreamEmitter;
use crate::payload::IntoPayload;
#[cfg(feature = "serialize")] #[cfg(feature = "serialize")]
use crate::payload::{DynamicSerializer, SerdePayload}; use crate::payload::{DynamicSerializer, SerdePayload};
use crate::payload::IntoPayload; use crate::prelude::Response;
pub(crate) type ReplyListeners = Arc<Mutex<HashMap<u64, oneshot::Sender<Event>>>>; pub(crate) type ReplyListeners = Arc<Mutex<HashMap<u64, mpsc::Sender<Event>>>>;
/// An object provided to each callback function. /// An object provided to each callback function.
/// Currently it only holds the event emitter to emit response events in event callbacks. /// Currently it only holds the event emitter to emit response events in event callbacks.
@ -38,7 +40,7 @@ pub struct Context {
/// Field to store additional context data /// Field to store additional context data
pub data: Arc<RwLock<TypeMap>>, pub data: Arc<RwLock<TypeMap>>,
stop_sender: Arc<Mutex<Option<Sender<()>>>>, stop_sender: Arc<Mutex<Option<oneshot::Sender<()>>>>,
pub(crate) reply_listeners: ReplyListeners, pub(crate) reply_listeners: ReplyListeners,
@ -54,7 +56,7 @@ impl Context {
pub(crate) fn new( pub(crate) fn new(
emitter: StreamEmitter, emitter: StreamEmitter,
data: Arc<RwLock<TypeMap>>, data: Arc<RwLock<TypeMap>>,
stop_sender: Option<Sender<()>>, stop_sender: Option<oneshot::Sender<()>>,
reply_listeners: ReplyListeners, reply_listeners: ReplyListeners,
reply_timeout: Duration, reply_timeout: Duration,
#[cfg(feature = "serialize")] default_serializer: DynamicSerializer, #[cfg(feature = "serialize")] default_serializer: DynamicSerializer,
@ -71,12 +73,26 @@ impl Context {
} }
} }
/// Emits an event with a given payload that can be serialized into bytes /// Emits a raw event. Only for internal use
pub fn emit<S: AsRef<str>, P: IntoPayload>( pub(crate) fn emit_raw<P: IntoPayload>(
&self, &self,
name: S, name: &str,
namespace: Option<String>,
event_type: EventType,
payload: P, payload: P,
) -> EmitMetadata<P> { ) -> EmitMetadata<P> {
self.emitter.emit_raw(
self.clone(),
self.ref_id.clone(),
name,
namespace,
event_type,
payload,
)
}
/// Emits an event with a given payload that can be serialized into bytes
pub fn emit<S: AsRef<str>, P: IntoPayload>(&self, name: S, payload: P) -> EmitMetadata<P> {
if let Some(ref_id) = &self.ref_id { if let Some(ref_id) = &self.ref_id {
self.emitter self.emitter
.emit_response(self.clone(), *ref_id, name, payload) .emit_response(self.clone(), *ref_id, name, payload)
@ -100,11 +116,16 @@ impl Context {
} }
} }
/// Ends the event flow by creating a final response
pub fn response<P: IntoPayload>(&self, payload: P) -> Result<Response> {
Response::payload(self, payload)
}
/// Registers a reply listener for a given event /// Registers a reply listener for a given event
#[inline] #[inline]
#[tracing::instrument(level = "debug", skip(self))] #[tracing::instrument(level = "debug", skip(self))]
pub(crate) async fn register_reply_listener(&self, event_id: u64) -> Result<Receiver<Event>> { pub(crate) async fn register_reply_listener(&self, event_id: u64) -> Result<Receiver<Event>> {
let (rx, tx) = oneshot::channel(); let (rx, tx) = mpsc::channel(8);
{ {
let mut listeners = self.reply_listeners.lock().await; let mut listeners = self.reply_listeners.lock().await;
listeners.insert(event_id, rx); listeners.insert(event_id, rx);
@ -132,9 +153,9 @@ impl Context {
/// Returns the channel for a reply to the given message id /// Returns the channel for a reply to the given message id
#[inline] #[inline]
pub(crate) async fn get_reply_sender(&self, ref_id: u64) -> Option<oneshot::Sender<Event>> { pub(crate) async fn get_reply_sender(&self, ref_id: u64) -> Option<mpsc::Sender<Event>> {
let mut listeners = self.reply_listeners.lock().await; let listeners = self.reply_listeners.lock().await;
listeners.remove(&ref_id) listeners.get(&ref_id).cloned()
} }
#[inline] #[inline]
@ -149,16 +170,16 @@ pub struct PooledContext {
} }
pub struct PoolGuard<T> pub struct PoolGuard<T>
where where
T: Clone, T: Clone,
{ {
inner: T, inner: T,
count: Arc<AtomicUsize>, count: Arc<AtomicUsize>,
} }
impl<T> Deref for PoolGuard<T> impl<T> Deref for PoolGuard<T>
where where
T: Clone, T: Clone,
{ {
type Target = T; type Target = T;
@ -169,8 +190,8 @@ impl<T> Deref for PoolGuard<T>
} }
impl<T> DerefMut for PoolGuard<T> impl<T> DerefMut for PoolGuard<T>
where where
T: Clone, T: Clone,
{ {
#[inline] #[inline]
fn deref_mut(&mut self) -> &mut Self::Target { fn deref_mut(&mut self) -> &mut Self::Target {
@ -179,8 +200,8 @@ impl<T> DerefMut for PoolGuard<T>
} }
impl<T> Clone for PoolGuard<T> impl<T> Clone for PoolGuard<T>
where where
T: Clone, T: Clone,
{ {
#[inline] #[inline]
fn clone(&self) -> Self { fn clone(&self) -> Self {
@ -194,8 +215,8 @@ impl<T> Clone for PoolGuard<T>
} }
impl<T> Drop for PoolGuard<T> impl<T> Drop for PoolGuard<T>
where where
T: Clone, T: Clone,
{ {
#[inline] #[inline]
fn drop(&mut self) { fn drop(&mut self) {
@ -204,8 +225,8 @@ impl<T> Drop for PoolGuard<T>
} }
impl<T> PoolGuard<T> impl<T> PoolGuard<T>
where where
T: Clone, T: Clone,
{ {
pub(crate) fn new(inner: T) -> Self { pub(crate) fn new(inner: T) -> Self {
Self { Self {

@ -1,7 +1,8 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use crate::error_event::{ERROR_EVENT_NAME, ErrorEventData}; use crate::error_event::{ErrorEventData, END_EVENT_NAME, ERROR_EVENT_NAME};
use crate::event::EventType;
use crate::events::event_handler::EventHandler; use crate::events::event_handler::EventHandler;
use crate::namespaces::namespace::Namespace; use crate::namespaces::namespace::Namespace;
use crate::prelude::*; use crate::prelude::*;
@ -33,13 +34,19 @@ async fn handle_connection<S: 'static + AsyncProtocolStream>(
// get the listener for replies // get the listener for replies
if let Some(sender) = ctx.get_reply_sender(ref_id).await { if let Some(sender) = ctx.get_reply_sender(ref_id).await {
// try sending the event to the listener for replies // try sending the event to the listener for replies
if let Err(event) = sender.send(event) { if let Err(event) = sender.send(event).await {
handle_event(Context::clone(&ctx), Arc::clone(&handler), event); handle_event(Context::clone(&ctx), Arc::clone(&handler), event.0);
} }
continue; continue;
} }
tracing::trace!("No response listener found for event. Passing to regular listener."); tracing::trace!("No response listener found for event. Passing to regular listener.");
} }
if event.event_type() == EventType::End {
tracing::debug!("Received dangling end event with no listener");
continue;
}
if let Some(namespace) = event.namespace().clone().and_then(|n| namespaces.get(&n)) { if let Some(namespace) = event.namespace().clone().and_then(|n| namespaces.get(&n)) {
tracing::trace!("Passing event to namespace listener"); tracing::trace!("Passing event to namespace listener");
let handler = Arc::clone(&namespace.handler); let handler = Arc::clone(&namespace.handler);
@ -55,23 +62,41 @@ async fn handle_connection<S: 'static + AsyncProtocolStream>(
/// Handles a single event in a different tokio context /// Handles a single event in a different tokio context
fn handle_event(mut ctx: Context, handler: Arc<EventHandler>, event: Event) { fn handle_event(mut ctx: Context, handler: Arc<EventHandler>, event: Event) {
ctx.set_ref_id(Some(event.id())); ctx.set_ref_id(Some(event.id()));
let event_id = event.id();
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = handler.handle_event(&ctx, event).await { match handler.handle_event(&ctx, event).await {
// emit an error event Ok(r) => {
if let Err(e) = ctx // emit the response under a unique name to prevent it being interpreted as a new
.emit( // event initiator
ERROR_EVENT_NAME, if let Err(e) = ctx
ErrorEventData { .emit_raw(END_EVENT_NAME, None, EventType::End, r.into_byte_payload())
message: format!("{:?}", e), .await
code: 500, {
}, tracing::error!("Error occurred when sending error response: {:?}", e);
).await }
{ let mut reply_listeners = ctx.reply_listeners.lock().await;
tracing::error!("Error occurred when sending error response: {:?}", e); reply_listeners.remove(&event_id);
} }
Err(e) => {
// emit an error event
if let Err(e) = ctx
.emit_raw(
ERROR_EVENT_NAME,
None,
EventType::Error,
ErrorEventData {
message: format!("{:?}", e),
code: 500,
},
)
.await
{
tracing::error!("Error occurred when sending error response: {:?}", e);
}
tracing::error!("Failed to handle event: {:?}", e); tracing::error!("Failed to handle event: {:?}", e);
}
} }
}); });
} }

@ -1,288 +0,0 @@
use std::future::Future;
use std::mem;
use std::ops::DerefMut;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Poll;
use std::time::Duration;
use futures::future;
use futures::future::Either;
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::sync::Mutex;
use tracing;
use crate::error::{Error, Result};
use crate::error_event::{ERROR_EVENT_NAME, ErrorEventData};
use crate::events::event::Event;
use crate::ipc::context::Context;
use crate::payload::IntoPayload;
use crate::protocol::AsyncProtocolStream;
macro_rules! poll_unwrap {
($val:expr) => {
if let Some(v) = $val {
v
} else {
tracing::error!("Polling a future with an invalid state.");
return Poll::Ready(Err(Error::InvalidState))
}
}
}
type SendStream = Arc<Mutex<dyn AsyncWrite + Send + Sync + Unpin + 'static>>;
/// An abstraction over any type that implements the AsyncProtocolStream trait
/// to emit events and share a connection across multiple
/// contexts.
#[derive(Clone)]
pub struct StreamEmitter {
stream: SendStream,
}
impl StreamEmitter {
pub fn new<P: AsyncProtocolStream + 'static>(stream: P::OwnedSplitWriteHalf) -> Self {
Self {
stream: Arc::new(Mutex::new(stream)),
}
}
#[tracing::instrument(level = "trace", skip(self, ctx, payload))]
fn _emit<P: IntoPayload>(
&self,
ctx: Context,
namespace: Option<&str>,
event: &str,
payload: P,
res_id: Option<u64>,
) -> EmitMetadata<P> {
EmitMetadata::new(ctx, self.stream.clone(), event.to_string(), namespace.map(|n| n.to_string()), payload, res_id)
}
/// Emits an event
#[inline]
pub(crate) fn emit<S: AsRef<str>, P: IntoPayload>(
&self,
ctx: Context,
event: S,
payload: P,
) -> EmitMetadata<P> {
self._emit(ctx, None, event.as_ref(), payload, None)
}
/// Emits an event to a specific namespace
#[inline]
pub(crate) fn emit_to<S1: AsRef<str>, S2: AsRef<str>, P: IntoPayload>(
&self,
ctx: Context,
namespace: S1,
event: S2,
payload: P,
) -> EmitMetadata<P> {
self._emit(ctx, Some(namespace.as_ref()), event.as_ref(), payload, None)
}
/// Emits a response to an event
#[inline]
pub(crate) fn emit_response<S: AsRef<str>, P: IntoPayload>(
&self,
ctx: Context,
event_id: u64,
event: S,
payload: P,
) -> EmitMetadata<P> {
self._emit(ctx, None, event.as_ref(), payload, Some(event_id))
}
/// Emits a response to an event to a namespace
#[inline]
pub(crate) fn emit_response_to<S1: AsRef<str>, S2: AsRef<str>, P: IntoPayload>(
&self,
ctx: Context,
event_id: u64,
namespace: S1,
event: S2,
payload: P,
) -> EmitMetadata<P> {
self._emit(
ctx,
Some(namespace.as_ref()),
event.as_ref(),
payload,
Some(event_id),
)
}
}
struct EventMetadata<P: IntoPayload> {
event: Option<Event>,
ctx: Option<Context>,
event_namespace: Option<Option<String>>,
event_name: Option<String>,
res_id: Option<Option<u64>>,
payload: Option<P>,
}
impl<P: IntoPayload> EventMetadata<P> {
pub fn get_event(&mut self) -> Result<&Event> {
if self.event.is_none() {
self.build_event()?;
}
Ok(self.event.as_ref().unwrap())
}
pub fn take_event(mut self) -> Result<Option<Event>> {
if self.event.is_none() {
self.build_event()?;
}
Ok(mem::take(&mut self.event))
}
fn build_event(&mut self) -> Result<()> {
let ctx = self.ctx.take().ok_or(Error::InvalidState)?;
let event = self.event_name.take().ok_or(Error::InvalidState)?;
let namespace = self.event_namespace.take().ok_or(Error::InvalidState)?;
let payload = self.payload.take().ok_or(Error::InvalidState)?;
let res_id = self.res_id.take().ok_or(Error::InvalidState)?;
let payload_bytes = payload.into_payload(&ctx)?;
let event = if let Some(namespace) = namespace {
Event::with_namespace(namespace.to_string(), event.to_string(), payload_bytes, res_id)
} else {
Event::new(event.to_string(), payload_bytes, res_id)
};
self.event = Some(event);
Ok(())
}
}
/// A metadata object returned after emitting an event.
/// To send the event this object needs to be awaited
/// This object can be used to wait for a response to an event.
/// The result contains the emitted event id.
pub struct EmitMetadata<P: IntoPayload> {
event_metadata: Option<EventMetadata<P>>,
stream: Option<SendStream>,
fut: Option<Pin<Box<dyn Future<Output=Result<u64>> + Send + Sync>>>,
}
/// A metadata object returned after waiting for a reply to an event
/// This object needs to be awaited for to get the actual reply
pub struct EmitMetadataWithResponse<P: IntoPayload> {
timeout: Option<Duration>,
fut: Option<Pin<Box<dyn Future<Output=Result<Event>> + Send + Sync>>>,
emit_metadata: Option<EmitMetadata<P>>,
}
impl<P: IntoPayload> EmitMetadata<P> {
#[inline]
pub(crate) fn new(ctx: Context, stream: SendStream, event_name: String, event_namespace: Option<String>, payload: P, res_id: Option<u64>) -> Self {
Self {
event_metadata: Some(EventMetadata {
event: None,
ctx: Some(ctx),
event_name: Some(event_name),
event_namespace: Some(event_namespace),
payload: Some(payload),
res_id: Some(res_id),
}),
stream: Some(stream),
fut: None,
}
}
/// Waits for a reply to the given message.
#[tracing::instrument(skip(self), fields(self.message_id))]
pub fn await_reply(self) -> EmitMetadataWithResponse<P> {
EmitMetadataWithResponse {
timeout: None,
fut: None,
emit_metadata: Some(self),
}
}
}
impl<P: IntoPayload> EmitMetadataWithResponse<P> {
/// Sets a timeout for awaiting replies to this emitted event
#[inline]
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.timeout = Some(timeout);
self
}
}
impl<P: IntoPayload> Unpin for EmitMetadata<P> {}
impl<P: IntoPayload> Unpin for EmitMetadataWithResponse<P> {}
impl<P: IntoPayload + Send + Sync + 'static> Future for EmitMetadata<P> {
type Output = Result<u64>;
fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
if self.fut.is_none() {
let event_metadata = poll_unwrap!(self.event_metadata.take());
let stream = poll_unwrap!(self.stream.take());
let event = match event_metadata.take_event() {
Ok(m) => { m }
Err(e) => { return Poll::Ready(Err(e)); }
}.expect("poll after future was done");
self.fut = Some(Box::pin(async move {
let event_id = event.id();
let event_bytes = event.into_bytes()?;
let mut stream = stream.lock().await;
stream.deref_mut().write_all(&event_bytes[..]).await?;
tracing::trace!(bytes_len = event_bytes.len());
Ok(event_id)
}));
}
self.fut.as_mut().unwrap().as_mut().poll(cx)
}
}
impl<P: IntoPayload + Send + Sync + 'static> Future for EmitMetadataWithResponse<P> {
type Output = Result<Event>;
fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
if self.fut.is_none() {
let mut emit_metadata = poll_unwrap!(self.emit_metadata.take());
let ctx = poll_unwrap!(emit_metadata.event_metadata.as_ref().and_then(|m| m.ctx.clone()));
let timeout = self.timeout.clone().unwrap_or(ctx.default_reply_timeout.clone());
let event_id = match poll_unwrap!(emit_metadata.event_metadata.as_mut()).get_event() {
Ok(e) => { e.id() }
Err(e) => { return Poll::Ready(Err(e)); }
};
self.fut = Some(Box::pin(async move {
let tx = ctx.register_reply_listener(event_id).await?;
emit_metadata.await?;
let result = future::select(
Box::pin(tx),
Box::pin(tokio::time::sleep(timeout)),
)
.await;
let reply = match result {
Either::Left((tx_result, _)) => Ok(tx_result?),
Either::Right(_) => {
let mut listeners = ctx.reply_listeners.lock().await;
listeners.remove(&event_id);
Err(Error::Timeout)
}
}?;
if reply.name() == ERROR_EVENT_NAME {
Err(reply.payload::<ErrorEventData>()?.into())
} else {
Ok(reply)
}
}))
}
self.fut.as_mut().unwrap().as_mut().poll(cx)
}
}

@ -0,0 +1,101 @@
use crate::context::Context;
use crate::error::Error;
use crate::event::EventType;
use crate::ipc::stream_emitter::emit_metadata_with_response::EmitMetadataWithResponse;
use crate::ipc::stream_emitter::emit_metadata_with_response_stream::EmitMetadataWithResponseStream;
use crate::ipc::stream_emitter::event_metadata::EventMetadata;
use crate::ipc::stream_emitter::SendStream;
use crate::payload::IntoPayload;
use crate::{error, poll_unwrap};
use std::future::Future;
use std::ops::DerefMut;
use std::pin::Pin;
use std::task::Poll;
use tokio::io::AsyncWriteExt;
/// A metadata object returned after emitting an event.
/// To send the event this object needs to be awaited
/// This object can be used to wait for a response to an event.
/// The result contains the emitted event id.
pub struct EmitMetadata<P: IntoPayload> {
pub(crate) event_metadata: Option<EventMetadata<P>>,
stream: Option<SendStream>,
fut: Option<Pin<Box<dyn Future<Output = error::Result<u64>> + Send + Sync>>>,
}
impl<P: IntoPayload> EmitMetadata<P> {
#[inline]
pub(crate) fn new(
ctx: Context,
stream: SendStream,
event_name: String,
event_namespace: Option<String>,
payload: P,
res_id: Option<u64>,
event_type: EventType,
) -> Self {
Self {
event_metadata: Some(EventMetadata {
event: None,
ctx: Some(ctx),
event_name: Some(event_name),
event_namespace: Some(event_namespace),
payload: Some(payload),
res_id: Some(res_id),
event_type: Some(event_type),
}),
stream: Some(stream),
fut: None,
}
}
/// Waits for a reply to the given message.
#[tracing::instrument(skip(self), fields(self.message_id))]
pub fn await_reply(self) -> EmitMetadataWithResponse<P> {
EmitMetadataWithResponse {
timeout: None,
fut: None,
emit_metadata: Some(self),
}
}
pub fn stream_replies(self) -> EmitMetadataWithResponseStream<P> {
EmitMetadataWithResponseStream {
timeout: None,
fut: None,
emit_metadata: Some(self),
}
}
}
impl<P: IntoPayload> Unpin for EmitMetadata<P> {}
impl<P: IntoPayload + Send + Sync + 'static> Future for EmitMetadata<P> {
type Output = error::Result<u64>;
fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
if self.fut.is_none() {
let event_metadata = poll_unwrap!(self.event_metadata.take());
let stream = poll_unwrap!(self.stream.take());
let event = match event_metadata.take_event() {
Ok(m) => m,
Err(e) => {
return Poll::Ready(Err(e));
}
}
.expect("poll after future was done");
self.fut = Some(Box::pin(async move {
let event_id = event.id();
let event_bytes = event.into_bytes()?;
let mut stream = stream.lock().await;
stream.deref_mut().write_all(&event_bytes[..]).await?;
tracing::trace!(bytes_len = event_bytes.len());
Ok(event_id)
}));
}
self.fut.as_mut().unwrap().as_mut().poll(cx)
}
}

@ -0,0 +1,84 @@
use crate::context::Context;
use crate::error::Error;
use crate::error_event::ErrorEventData;
use crate::event::{Event, EventType};
use crate::ipc::stream_emitter::emit_metadata::EmitMetadata;
use crate::payload::IntoPayload;
use crate::{error, poll_unwrap};
use std::future::Future;
use std::pin::Pin;
use std::task::Poll;
use std::time::Duration;
/// A metadata object returned after waiting for a reply to an event
/// This object needs to be awaited for to get the actual reply
pub struct EmitMetadataWithResponse<P: IntoPayload> {
pub(crate) timeout: Option<Duration>,
pub(crate) fut: Option<Pin<Box<dyn Future<Output = error::Result<Event>> + Send + Sync>>>,
pub(crate) emit_metadata: Option<EmitMetadata<P>>,
}
impl<P: IntoPayload> EmitMetadataWithResponse<P> {
/// Sets a timeout for awaiting replies to this emitted event
#[inline]
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.timeout = Some(timeout);
self
}
}
impl<P: IntoPayload> Unpin for EmitMetadataWithResponse<P> {}
impl<P: IntoPayload + Send + Sync + 'static> Future for EmitMetadataWithResponse<P> {
type Output = error::Result<Event>;
fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
if self.fut.is_none() {
let mut emit_metadata = poll_unwrap!(self.emit_metadata.take());
let ctx = poll_unwrap!(emit_metadata
.event_metadata
.as_ref()
.and_then(|m| m.ctx.clone()));
let timeout = self
.timeout
.take()
.unwrap_or_else(|| ctx.default_reply_timeout.clone());
let event_id = match poll_unwrap!(emit_metadata.event_metadata.as_mut()).get_event() {
Ok(e) => e.id(),
Err(e) => {
return Poll::Ready(Err(e));
}
};
self.fut = Some(Box::pin(async move {
let mut tx = ctx.register_reply_listener(event_id).await?;
emit_metadata.await?;
let reply = tokio::select! {
tx_result = tx.recv() => {
Ok(tx_result.ok_or_else(|| Error::SendError)?)
}
_ = tokio::time::sleep(timeout) => {
Err(Error::Timeout)
}
}?;
remove_reply_listener(&ctx, event_id).await;
if reply.event_type() == EventType::Error {
Err(reply.payload::<ErrorEventData>()?.into())
} else {
Ok(reply)
}
}))
}
self.fut.as_mut().unwrap().as_mut().poll(cx)
}
}
pub(crate) async fn remove_reply_listener(ctx: &Context, event_id: u64) {
let mut listeners = ctx.reply_listeners.lock().await;
listeners.remove(&event_id);
}

@ -0,0 +1,151 @@
use crate::context::Context;
use crate::error::{Error, Result};
use crate::event::{Event, EventType};
use crate::ipc::stream_emitter::emit_metadata::EmitMetadata;
use crate::ipc::stream_emitter::emit_metadata_with_response::remove_reply_listener;
use crate::payload::IntoPayload;
use crate::poll_unwrap;
use futures_core::Stream;
use std::future::Future;
use std::pin::Pin;
use std::task::Poll;
use std::time::Duration;
use tokio::sync::mpsc::Receiver;
/// A metadata object returned after waiting for a reply to an event
/// This object needs to be awaited for to get the actual reply
pub struct EmitMetadataWithResponseStream<P: IntoPayload> {
pub(crate) timeout: Option<Duration>,
pub(crate) fut: Option<Pin<Box<dyn Future<Output = Result<ResponseStream>> + Send + Sync>>>,
pub(crate) emit_metadata: Option<EmitMetadata<P>>,
}
/// An asynchronous stream one can read all responses to a specific event from.
pub struct ResponseStream {
event_id: u64,
ctx: Option<Context>,
receiver: Option<Receiver<Event>>,
timeout: Duration,
fut: Option<Pin<Box<dyn Future<Output = Result<(Option<Event>, Context, Receiver<Event>)>>>>>,
}
impl ResponseStream {
pub(crate) fn new(
event_id: u64,
timeout: Duration,
ctx: Context,
receiver: Receiver<Event>,
) -> Self {
Self {
event_id,
ctx: Some(ctx),
receiver: Some(receiver),
timeout,
fut: None,
}
}
}
impl<P: IntoPayload> Unpin for EmitMetadataWithResponseStream<P> {}
impl<P: IntoPayload> EmitMetadataWithResponseStream<P> {
/// Sets a timeout for awaiting replies to this emitted event
#[inline]
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.timeout = Some(timeout);
self
}
}
impl<P: IntoPayload + Send + Sync + 'static> Future for EmitMetadataWithResponseStream<P> {
type Output = Result<ResponseStream>;
fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
if self.fut.is_none() {
let mut emit_metadata = poll_unwrap!(self.emit_metadata.take());
let ctx = poll_unwrap!(emit_metadata
.event_metadata
.as_ref()
.and_then(|m| m.ctx.clone()));
let timeout = self
.timeout
.take()
.unwrap_or_else(|| ctx.default_reply_timeout.clone());
let event_id = match poll_unwrap!(emit_metadata.event_metadata.as_mut()).get_event() {
Ok(e) => e.id(),
Err(e) => {
return Poll::Ready(Err(e));
}
};
self.fut = Some(Box::pin(async move {
let tx = ctx.register_reply_listener(event_id).await?;
emit_metadata.await?;
Ok(ResponseStream::new(event_id, timeout, ctx, tx))
}))
}
self.fut.as_mut().unwrap().as_mut().poll(cx)
}
}
impl Unpin for ResponseStream {}
impl Stream for ResponseStream {
type Item = Result<Event>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
if self.fut.is_none() {
if self.ctx.is_none() || self.receiver.is_none() {
return Poll::Ready(None);
}
let ctx = self.ctx.take().unwrap();
let mut receiver = self.receiver.take().unwrap();
let timeout = self.timeout;
let event_id = self.event_id;
self.fut = Some(Box::pin(async move {
let event: Option<Event> = tokio::select! {
tx_result = receiver.recv() => {
Ok(tx_result)
}
_ = tokio::time::sleep(timeout) => {
Err(Error::Timeout)
}
}?;
if event.is_none() || event.as_ref().unwrap().event_type() == EventType::End {
remove_reply_listener(&ctx, event_id).await;
}
Ok((event, ctx, receiver))
}));
}
match self.fut.as_mut().unwrap().as_mut().poll(cx) {
Poll::Ready(r) => match r {
Ok((event, ctx, tx)) => {
self.fut = None;
if let Some(event) = event {
if event.event_type() != EventType::End {
self.ctx = Some(ctx);
self.receiver = Some(tx);
}
Poll::Ready(Some(Ok(event)))
} else {
Poll::Ready(None)
}
}
Err(e) => Poll::Ready(Some(Err(e))),
},
Poll::Pending => Poll::Pending,
}
}
}

@ -0,0 +1,54 @@
use crate::context::Context;
use crate::error;
use crate::error::Error;
use crate::event::{Event, EventType};
use crate::payload::IntoPayload;
use std::mem;
pub(crate) struct EventMetadata<P: IntoPayload> {
pub(crate) event: Option<Event>,
pub(crate) ctx: Option<Context>,
pub(crate) event_namespace: Option<Option<String>>,
pub(crate) event_name: Option<String>,
pub(crate) res_id: Option<Option<u64>>,
pub(crate) event_type: Option<EventType>,
pub(crate) payload: Option<P>,
}
impl<P: IntoPayload> EventMetadata<P> {
pub fn get_event(&mut self) -> error::Result<&Event> {
if self.event.is_none() {
self.build_event()?;
}
Ok(self.event.as_ref().unwrap())
}
pub fn take_event(mut self) -> error::Result<Option<Event>> {
if self.event.is_none() {
self.build_event()?;
}
Ok(mem::take(&mut self.event))
}
fn build_event(&mut self) -> error::Result<()> {
let ctx = self.ctx.take().ok_or(Error::InvalidState)?;
let event = self.event_name.take().ok_or(Error::InvalidState)?;
let namespace = self.event_namespace.take().ok_or(Error::InvalidState)?;
let payload = self.payload.take().ok_or(Error::InvalidState)?;
let res_id = self.res_id.take().ok_or(Error::InvalidState)?;
let event_type = self.event_type.take().ok_or(Error::InvalidState)?;
let payload_bytes = payload.into_payload(&ctx)?;
let event = Event::new(
namespace,
event.to_string(),
payload_bytes,
res_id,
event_type,
);
self.event = Some(event);
Ok(())
}
}

@ -0,0 +1,159 @@
pub mod emit_metadata;
pub mod emit_metadata_with_response;
pub mod emit_metadata_with_response_stream;
mod event_metadata;
use std::sync::Arc;
use tokio::io::AsyncWrite;
use tokio::sync::Mutex;
use tracing;
use crate::event::EventType;
use crate::ipc::context::Context;
use crate::payload::IntoPayload;
use crate::protocol::AsyncProtocolStream;
pub use emit_metadata_with_response_stream::ResponseStream;
use crate::prelude::emit_metadata::EmitMetadata;
#[macro_export]
macro_rules! poll_unwrap {
($val:expr) => {
if let Some(v) = $val {
v
} else {
tracing::error!("Polling a future with an invalid state.");
return Poll::Ready(Err(Error::InvalidState));
}
};
}
type SendStream = Arc<Mutex<dyn AsyncWrite + Send + Sync + Unpin + 'static>>;
/// An abstraction over any type that implements the AsyncProtocolStream trait
/// to emit events and share a connection across multiple
/// contexts.
#[derive(Clone)]
pub struct StreamEmitter {
stream: SendStream,
}
impl StreamEmitter {
pub fn new<P: AsyncProtocolStream + 'static>(stream: P::OwnedSplitWriteHalf) -> Self {
Self {
stream: Arc::new(Mutex::new(stream)),
}
}
#[tracing::instrument(level = "trace", skip(self, ctx, payload))]
fn _emit<P: IntoPayload>(
&self,
ctx: Context,
namespace: Option<String>,
event: &str,
payload: P,
res_id: Option<u64>,
event_type: EventType,
) -> EmitMetadata<P> {
EmitMetadata::new(
ctx,
self.stream.clone(),
event.to_string(),
namespace,
payload,
res_id,
event_type,
)
}
/// Emits an event
#[inline]
pub(crate) fn emit<S: AsRef<str>, P: IntoPayload>(
&self,
ctx: Context,
event: S,
payload: P,
) -> EmitMetadata<P> {
self._emit(
ctx,
None,
event.as_ref(),
payload,
None,
EventType::Initiator,
)
}
/// Emits an event to a specific namespace
#[inline]
pub(crate) fn emit_to<S1: AsRef<str>, S2: AsRef<str>, P: IntoPayload>(
&self,
ctx: Context,
namespace: S1,
event: S2,
payload: P,
) -> EmitMetadata<P> {
self._emit(
ctx,
Some(namespace.as_ref().to_string()),
event.as_ref(),
payload,
None,
EventType::Initiator,
)
}
/// Emits a raw event
#[inline]
pub(crate) fn emit_raw<P: IntoPayload>(
&self,
ctx: Context,
res_id: Option<u64>,
event: &str,
namespace: Option<String>,
event_type: EventType,
payload: P,
) -> EmitMetadata<P> {
self._emit(ctx, namespace, event, payload, res_id, event_type)
}
/// Emits a response to an event
#[inline]
pub(crate) fn emit_response<S: AsRef<str>, P: IntoPayload>(
&self,
ctx: Context,
event_id: u64,
event: S,
payload: P,
) -> EmitMetadata<P> {
self._emit(
ctx,
None,
event.as_ref(),
payload,
Some(event_id),
EventType::Response,
)
}
/// Emits a response to an event to a namespace
#[inline]
pub(crate) fn emit_response_to<S1: AsRef<str>, S2: AsRef<str>, P: IntoPayload>(
&self,
ctx: Context,
event_id: u64,
namespace: S1,
event: S2,
payload: P,
) -> EmitMetadata<P> {
self._emit(
ctx,
Some(namespace.as_ref().to_string()),
event.as_ref(),
payload,
Some(event_id),
EventType::Response,
)
}
}

@ -6,19 +6,19 @@
//! use tokio::net::TcpListener; //! use tokio::net::TcpListener;
//! //!
//! /// Callback ping function //! /// Callback ping function
//! async fn handle_ping(ctx: &Context, event: Event) -> IPCResult<()> { //! async fn handle_ping(ctx: &Context, event: Event) -> IPCResult<Response> {
//! println!("Received ping event."); //! println!("Received ping event.");
//! ctx.emit("pong", ()).await?; //! ctx.emit("pong", ()).await?;
//! //!
//! Ok(()) //! Ok(Response::empty())
//! } //! }
//! //!
//! pub struct MyNamespace; //! pub struct MyNamespace;
//! //!
//! impl MyNamespace { //! impl MyNamespace {
//! async fn ping(_ctx: &Context, _event: Event) -> IPCResult<()> { //! async fn ping(_ctx: &Context, _event: Event) -> IPCResult<Response> {
//! println!("My namespace received a ping"); //! println!("My namespace received a ping");
//! Ok(()) //! Ok(Response::empty())
//! } //! }
//! } //! }
//! //!
@ -46,7 +46,7 @@
//! .on("something", callback!(ctx, event, async move { //! .on("something", callback!(ctx, event, async move {
//! println!("I think the server did something"); //! println!("I think the server did something");
//! ctx.emit_to("mainspace-server", "ok", ()).await?; //! ctx.emit_to("mainspace-server", "ok", ()).await?;
//! Ok(()) //! Ok(Response::empty())
//! })) //! }))
//! .build() //! .build()
//! .add_namespace(namespace!(MyNamespace)) //! .add_namespace(namespace!(MyNamespace))
@ -63,7 +63,7 @@
//! use std::net::ToSocketAddrs; //! use std::net::ToSocketAddrs;
//! use typemap_rev::TypeMapKey; //! use typemap_rev::TypeMapKey;
//! use bromine::IPCBuilder; //! use bromine::IPCBuilder;
//! use bromine::callback; //! use bromine::prelude::*;
//! use tokio::net::TcpListener; //! use tokio::net::TcpListener;
//! //!
//! struct MyKey; //! struct MyKey;
@ -80,7 +80,7 @@
//! .on("ping", callback!(ctx, event, async move { //! .on("ping", callback!(ctx, event, async move {
//! println!("Received ping event."); //! println!("Received ping event.");
//! ctx.emit("pong", ()).await?; //! ctx.emit("pong", ()).await?;
//! Ok(()) //! Ok(Response::empty())
//! })) //! }))
//! .namespace("mainspace-server") //! .namespace("mainspace-server")
//! .on("do-something", callback!(ctx, event, async move { //! .on("do-something", callback!(ctx, event, async move {
@ -92,7 +92,7 @@
//! *my_key += 1; //! *my_key += 1;
//! } //! }
//! ctx.emit_to("mainspace-client", "something", ()).await?; //! ctx.emit_to("mainspace-client", "something", ()).await?;
//! Ok(()) //! Ok(Response::empty())
//! })) //! }))
//! .build() //! .build()
//! // store additional data //! // store additional data
@ -134,9 +134,10 @@ pub mod prelude {
pub use crate::error::Error as IPCError; pub use crate::error::Error as IPCError;
pub use crate::error::Result as IPCResult; pub use crate::error::Result as IPCResult;
pub use crate::event::Event; pub use crate::event::Event;
pub use crate::event_handler::EventHandler; pub use crate::event_handler::{EventHandler, Response};
pub use crate::ipc::context::Context; pub use crate::ipc::context::Context;
pub use crate::ipc::context::{PoolGuard, PooledContext}; pub use crate::ipc::context::{PoolGuard, PooledContext};
pub use crate::ipc::stream_emitter::*;
pub use crate::ipc::*; pub use crate::ipc::*;
pub use crate::macros::*; pub use crate::macros::*;
pub use crate::namespace::Namespace; pub use crate::namespace::Namespace;

@ -30,4 +30,4 @@ macro_rules! events{
$handler.on($name, callback!($cb)); $handler.on($name, callback!($cb));
)* )*
} }
} }

@ -1,5 +1,6 @@
use crate::error::Result; use crate::error::Result;
use crate::event::Event; use crate::event::Event;
use crate::event_handler::Response;
use crate::events::event_handler::EventHandler; use crate::events::event_handler::EventHandler;
use crate::ipc::context::Context; use crate::ipc::context::Context;
use crate::namespaces::namespace::Namespace; use crate::namespaces::namespace::Namespace;
@ -32,7 +33,7 @@ where
F: for<'a> Fn( F: for<'a> Fn(
&'a Context, &'a Context,
Event, Event,
) -> Pin<Box<(dyn Future<Output = Result<()>> + Send + 'a)>> ) -> Pin<Box<(dyn Future<Output = Result<Response>> + Send + 'a)>>
+ Send + Send
+ Sync, + Sync,
{ {

@ -0,0 +1,88 @@
use crate::utils::call_counter::{get_counter_from_context, increment_counter_for_event};
use crate::utils::protocol::TestProtocolListener;
use crate::utils::{get_free_port, start_server_and_client};
use bromine::prelude::*;
use byteorder::ReadBytesExt;
use futures::StreamExt;
use std::io::Read;
use std::time::Duration;
mod utils;
/// When awaiting the reply to an event the handler for the event doesn't get called.
/// Therefore we expect it to have a call count of 0.
#[tokio::test]
async fn it_receives_responses() {
let port = get_free_port();
let ctx = get_client_with_server(port).await;
let mut reply_stream = ctx
.emit("stream", EmptyPayload)
.stream_replies()
.await
.unwrap();
let mut reply_stream_2 = ctx
.emit("stream", EmptyPayload)
.stream_replies()
.await
.unwrap();
for i in 0u8..=100 {
if let Some(Ok(event)) = reply_stream.next().await {
assert_eq!(event.payload::<NumberPayload>().unwrap().0, i)
} else {
panic!("stream 1 has no value {}", i);
}
if let Some(Ok(event)) = reply_stream_2.next().await {
assert_eq!(event.payload::<NumberPayload>().unwrap().0, i)
} else {
panic!("stream 2 has no value {}", i);
}
}
let counter = get_counter_from_context(&ctx).await;
assert_eq!(counter.get("stream").await, 2);
}
async fn get_client_with_server(port: u8) -> Context {
start_server_and_client(move || get_builder(port)).await
}
fn get_builder(port: u8) -> IPCBuilder<TestProtocolListener> {
IPCBuilder::new()
.address(port)
.timeout(Duration::from_millis(100))
.on("stream", callback!(handle_stream_event))
}
async fn handle_stream_event(ctx: &Context, event: Event) -> IPCResult<Response> {
increment_counter_for_event(ctx, &event).await;
for i in 0u8..=99 {
ctx.emit("number", NumberPayload(i)).await?;
}
ctx.response(NumberPayload(100))
}
pub struct EmptyPayload;
impl IntoPayload for EmptyPayload {
fn into_payload(self, _: &Context) -> IPCResult<Vec<u8>> {
Ok(vec![])
}
}
pub struct NumberPayload(u8);
impl IntoPayload for NumberPayload {
fn into_payload(self, _: &Context) -> IPCResult<Vec<u8>> {
Ok(vec![self.0])
}
}
impl FromPayload for NumberPayload {
fn from_payload<R: Read>(mut reader: R) -> IPCResult<Self> {
let num = reader.read_u8()?;
Ok(NumberPayload(num))
}
}

@ -36,11 +36,7 @@ async fn it_receives_payloads() {
number: 0, number: 0,
string: String::from("Hello World"), string: String::from("Hello World"),
}; };
let reply = ctx let reply = ctx.emit("ping", payload).await_reply().await.unwrap();
.emit("ping", payload)
.await_reply()
.await
.unwrap();
let reply_payload = reply.payload::<SimplePayload>().unwrap(); let reply_payload = reply.payload::<SimplePayload>().unwrap();
let counters = get_counter_from_context(&ctx).await; let counters = get_counter_from_context(&ctx).await;
@ -62,19 +58,19 @@ fn get_builder(port: u8) -> IPCBuilder<TestProtocolListener> {
.timeout(Duration::from_millis(10)) .timeout(Duration::from_millis(10))
} }
async fn handle_ping_event(ctx: &Context, event: Event) -> IPCResult<()> { async fn handle_ping_event(ctx: &Context, event: Event) -> IPCResult<Response> {
increment_counter_for_event(ctx, &event).await; increment_counter_for_event(ctx, &event).await;
let payload = event.payload::<SimplePayload>()?; let payload = event.payload::<SimplePayload>()?;
ctx.emit("pong", payload).await?; ctx.emit("pong", payload).await?;
Ok(()) Ok(Response::empty())
} }
async fn handle_pong_event(ctx: &Context, event: Event) -> IPCResult<()> { async fn handle_pong_event(ctx: &Context, event: Event) -> IPCResult<Response> {
increment_counter_for_event(ctx, &event).await; increment_counter_for_event(ctx, &event).await;
let _payload = event.payload::<SimplePayload>()?; let _payload = event.payload::<SimplePayload>()?;
Ok(()) Ok(Response::empty())
} }
#[cfg(feature = "serialize")] #[cfg(feature = "serialize")]

@ -45,11 +45,7 @@ async fn it_sends_namespaced_events() {
async fn it_receives_responses() { async fn it_receives_responses() {
let port = get_free_port(); let port = get_free_port();
let ctx = get_client_with_server(port).await; let ctx = get_client_with_server(port).await;
let reply = ctx let reply = ctx.emit("ping", EmptyPayload).await_reply().await.unwrap();
.emit("ping", EmptyPayload)
.await_reply()
.await
.unwrap();
let counter = get_counter_from_context(&ctx).await; let counter = get_counter_from_context(&ctx).await;
assert_eq!(reply.name(), "pong"); assert_eq!(reply.name(), "pong");
@ -108,29 +104,29 @@ fn get_builder(port: u8) -> IPCBuilder<TestProtocolListener> {
.build() .build()
} }
async fn handle_ping_event(ctx: &Context, event: Event) -> IPCResult<()> { async fn handle_ping_event(ctx: &Context, event: Event) -> IPCResult<Response> {
increment_counter_for_event(ctx, &event).await; increment_counter_for_event(ctx, &event).await;
ctx.emit("pong", EmptyPayload).await?; ctx.emit("pong", EmptyPayload).await?;
Ok(()) Ok(Response::empty())
} }
async fn handle_pong_event(ctx: &Context, event: Event) -> IPCResult<()> { async fn handle_pong_event(ctx: &Context, event: Event) -> IPCResult<Response> {
increment_counter_for_event(ctx, &event).await; increment_counter_for_event(ctx, &event).await;
Ok(()) Ok(Response::empty())
} }
async fn handle_create_error_event(ctx: &Context, event: Event) -> IPCResult<()> { async fn handle_create_error_event(ctx: &Context, event: Event) -> IPCResult<Response> {
increment_counter_for_event(ctx, &event).await; increment_counter_for_event(ctx, &event).await;
Err(IPCError::from("Test Error")) Err(IPCError::from("Test Error"))
} }
async fn handle_error_event(ctx: &Context, event: Event) -> IPCResult<()> { async fn handle_error_event(ctx: &Context, event: Event) -> IPCResult<Response> {
increment_counter_for_event(ctx, &event).await; increment_counter_for_event(ctx, &event).await;
Ok(()) Ok(Response::empty())
} }
pub struct EmptyPayload; pub struct EmptyPayload;

@ -2,6 +2,8 @@
use bromine::prelude::*; use bromine::prelude::*;
#[cfg(feature = "serialize")] #[cfg(feature = "serialize")]
use serde::{de::DeserializeOwned, Serialize}; use serde::{de::DeserializeOwned, Serialize};
#[cfg(feature = "serialize")]
use std::fmt::Debug; use std::fmt::Debug;
#[cfg(feature = "serialize_rmp")] #[cfg(feature = "serialize_rmp")]

Loading…
Cancel
Save