Add asynchronous response streams

Signed-off-by: trivernis <trivernis@protonmail.com>
pull/32/head
trivernis 3 years ago
parent 619a0173f0
commit 9cc7d1ffe8
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

1
Cargo.lock generated

@ -101,6 +101,7 @@ dependencies = [
"criterion", "criterion",
"crossbeam-utils", "crossbeam-utils",
"futures", "futures",
"futures-core",
"lazy_static", "lazy_static",
"num_enum", "num_enum",
"postcard", "postcard",

@ -28,6 +28,7 @@ byteorder = "1.4.3"
async-trait = "0.1.52" async-trait = "0.1.52"
futures = "0.3.19" futures = "0.3.19"
num_enum = "0.5.6" num_enum = "0.5.6"
futures-core = "0.3.19"
rmp-serde = {version = "0.15.5", optional = true} rmp-serde = {version = "0.15.5", optional = true}
bincode = {version = "1.3.3", optional = true} bincode = {version = "1.3.3", optional = true}
serde_json = {version = "1.0.73", optional = true} serde_json = {version = "1.0.73", optional = true}

@ -4,21 +4,21 @@ use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::oneshot::{Receiver, Sender}; use tokio::sync::mpsc::Receiver;
use tokio::sync::{Mutex, oneshot, RwLock}; use tokio::sync::{mpsc, oneshot, Mutex, RwLock};
use tokio::time::Duration; use tokio::time::Duration;
use typemap_rev::TypeMap; use typemap_rev::TypeMap;
use crate::error::{Error, Result}; use crate::error::{Error, Result};
use crate::event::{Event, EventType}; use crate::event::{Event, EventType};
use crate::ipc::stream_emitter::StreamEmitter;
use crate::ipc::stream_emitter::emit_metadata::EmitMetadata; use crate::ipc::stream_emitter::emit_metadata::EmitMetadata;
use crate::ipc::stream_emitter::StreamEmitter;
use crate::payload::IntoPayload; use crate::payload::IntoPayload;
#[cfg(feature = "serialize")] #[cfg(feature = "serialize")]
use crate::payload::{DynamicSerializer, SerdePayload}; use crate::payload::{DynamicSerializer, SerdePayload};
use crate::prelude::Response; use crate::prelude::Response;
pub(crate) type ReplyListeners = Arc<Mutex<HashMap<u64, oneshot::Sender<Event>>>>; pub(crate) type ReplyListeners = Arc<Mutex<HashMap<u64, mpsc::Sender<Event>>>>;
/// An object provided to each callback function. /// An object provided to each callback function.
/// Currently it only holds the event emitter to emit response events in event callbacks. /// Currently it only holds the event emitter to emit response events in event callbacks.
@ -40,7 +40,7 @@ pub struct Context {
/// Field to store additional context data /// Field to store additional context data
pub data: Arc<RwLock<TypeMap>>, pub data: Arc<RwLock<TypeMap>>,
stop_sender: Arc<Mutex<Option<Sender<()>>>>, stop_sender: Arc<Mutex<Option<oneshot::Sender<()>>>>,
pub(crate) reply_listeners: ReplyListeners, pub(crate) reply_listeners: ReplyListeners,
@ -56,7 +56,7 @@ impl Context {
pub(crate) fn new( pub(crate) fn new(
emitter: StreamEmitter, emitter: StreamEmitter,
data: Arc<RwLock<TypeMap>>, data: Arc<RwLock<TypeMap>>,
stop_sender: Option<Sender<()>>, stop_sender: Option<oneshot::Sender<()>>,
reply_listeners: ReplyListeners, reply_listeners: ReplyListeners,
reply_timeout: Duration, reply_timeout: Duration,
#[cfg(feature = "serialize")] default_serializer: DynamicSerializer, #[cfg(feature = "serialize")] default_serializer: DynamicSerializer,
@ -125,7 +125,7 @@ impl Context {
#[inline] #[inline]
#[tracing::instrument(level = "debug", skip(self))] #[tracing::instrument(level = "debug", skip(self))]
pub(crate) async fn register_reply_listener(&self, event_id: u64) -> Result<Receiver<Event>> { pub(crate) async fn register_reply_listener(&self, event_id: u64) -> Result<Receiver<Event>> {
let (rx, tx) = oneshot::channel(); let (rx, tx) = mpsc::channel(8);
{ {
let mut listeners = self.reply_listeners.lock().await; let mut listeners = self.reply_listeners.lock().await;
listeners.insert(event_id, rx); listeners.insert(event_id, rx);
@ -153,9 +153,9 @@ impl Context {
/// Returns the channel for a reply to the given message id /// Returns the channel for a reply to the given message id
#[inline] #[inline]
pub(crate) async fn get_reply_sender(&self, ref_id: u64) -> Option<oneshot::Sender<Event>> { pub(crate) async fn get_reply_sender(&self, ref_id: u64) -> Option<mpsc::Sender<Event>> {
let mut listeners = self.reply_listeners.lock().await; let listeners = self.reply_listeners.lock().await;
listeners.remove(&ref_id) listeners.get(&ref_id).cloned()
} }
#[inline] #[inline]

@ -34,8 +34,8 @@ async fn handle_connection<S: 'static + AsyncProtocolStream>(
// get the listener for replies // get the listener for replies
if let Some(sender) = ctx.get_reply_sender(ref_id).await { if let Some(sender) = ctx.get_reply_sender(ref_id).await {
// try sending the event to the listener for replies // try sending the event to the listener for replies
if let Err(event) = sender.send(event) { if let Err(event) = sender.send(event).await {
handle_event(Context::clone(&ctx), Arc::clone(&handler), event); handle_event(Context::clone(&ctx), Arc::clone(&handler), event.0);
} }
continue; continue;
} }

@ -2,6 +2,7 @@ use crate::context::Context;
use crate::error::Error; use crate::error::Error;
use crate::event::EventType; use crate::event::EventType;
use crate::ipc::stream_emitter::emit_metadata_with_response::EmitMetadataWithResponse; use crate::ipc::stream_emitter::emit_metadata_with_response::EmitMetadataWithResponse;
use crate::ipc::stream_emitter::emit_metadata_with_response_stream::EmitMetadataWithResponseStream;
use crate::ipc::stream_emitter::event_metadata::EventMetadata; use crate::ipc::stream_emitter::event_metadata::EventMetadata;
use crate::ipc::stream_emitter::SendStream; use crate::ipc::stream_emitter::SendStream;
use crate::payload::IntoPayload; use crate::payload::IntoPayload;
@ -57,6 +58,14 @@ impl<P: IntoPayload> EmitMetadata<P> {
emit_metadata: Some(self), emit_metadata: Some(self),
} }
} }
pub fn stream_replies(self) -> EmitMetadataWithResponseStream<P> {
EmitMetadataWithResponseStream {
timeout: None,
fut: None,
emit_metadata: Some(self),
}
}
} }
impl<P: IntoPayload> Unpin for EmitMetadata<P> {} impl<P: IntoPayload> Unpin for EmitMetadata<P> {}

@ -1,11 +1,10 @@
use crate::context::Context;
use crate::error::Error; use crate::error::Error;
use crate::error_event::ErrorEventData; use crate::error_event::ErrorEventData;
use crate::event::{Event, EventType}; use crate::event::{Event, EventType};
use crate::ipc::stream_emitter::emit_metadata::EmitMetadata; use crate::ipc::stream_emitter::emit_metadata::EmitMetadata;
use crate::payload::IntoPayload; use crate::payload::IntoPayload;
use crate::{error, poll_unwrap}; use crate::{error, poll_unwrap};
use futures::future;
use futures::future::Either;
use std::future::Future; use std::future::Future;
use std::pin::Pin; use std::pin::Pin;
use std::task::Poll; use std::task::Poll;
@ -54,20 +53,20 @@ impl<P: IntoPayload + Send + Sync + 'static> Future for EmitMetadataWithResponse
}; };
self.fut = Some(Box::pin(async move { self.fut = Some(Box::pin(async move {
let tx = ctx.register_reply_listener(event_id).await?; let mut tx = ctx.register_reply_listener(event_id).await?;
emit_metadata.await?; emit_metadata.await?;
let result = let reply = tokio::select! {
future::select(Box::pin(tx), Box::pin(tokio::time::sleep(timeout))).await; tx_result = tx.recv() => {
Ok(tx_result.ok_or_else(|| Error::SendError)?)
let reply = match result { }
Either::Left((tx_result, _)) => Ok(tx_result?), _ = tokio::time::sleep(timeout) => {
Either::Right(_) => {
let mut listeners = ctx.reply_listeners.lock().await;
listeners.remove(&event_id);
Err(Error::Timeout) Err(Error::Timeout)
} }
}?; }?;
remove_reply_listener(&ctx, event_id).await;
if reply.event_type() == EventType::Error { if reply.event_type() == EventType::Error {
Err(reply.payload::<ErrorEventData>()?.into()) Err(reply.payload::<ErrorEventData>()?.into())
} else { } else {
@ -78,3 +77,8 @@ impl<P: IntoPayload + Send + Sync + 'static> Future for EmitMetadataWithResponse
self.fut.as_mut().unwrap().as_mut().poll(cx) self.fut.as_mut().unwrap().as_mut().poll(cx)
} }
} }
pub(crate) async fn remove_reply_listener(ctx: &Context, event_id: u64) {
let mut listeners = ctx.reply_listeners.lock().await;
listeners.remove(&event_id);
}

@ -0,0 +1,150 @@
use crate::context::Context;
use crate::error::{Error, Result};
use crate::event::{Event, EventType};
use crate::ipc::stream_emitter::emit_metadata::EmitMetadata;
use crate::ipc::stream_emitter::emit_metadata_with_response::remove_reply_listener;
use crate::payload::IntoPayload;
use crate::poll_unwrap;
use futures_core::Stream;
use std::future::Future;
use std::pin::Pin;
use std::task::Poll;
use std::time::Duration;
use tokio::sync::mpsc::Receiver;
/// A metadata object returned after waiting for a reply to an event
/// This object needs to be awaited for to get the actual reply
pub struct EmitMetadataWithResponseStream<P: IntoPayload> {
pub(crate) timeout: Option<Duration>,
pub(crate) fut: Option<Pin<Box<dyn Future<Output = Result<ResponseStream>> + Send + Sync>>>,
pub(crate) emit_metadata: Option<EmitMetadata<P>>,
}
pub struct ResponseStream {
event_id: u64,
ctx: Option<Context>,
receiver: Option<Receiver<Event>>,
timeout: Duration,
fut: Option<Pin<Box<dyn Future<Output = Result<(Option<Event>, Context, Receiver<Event>)>>>>>,
}
impl ResponseStream {
pub(crate) fn new(
event_id: u64,
timeout: Duration,
ctx: Context,
receiver: Receiver<Event>,
) -> Self {
Self {
event_id,
ctx: Some(ctx),
receiver: Some(receiver),
timeout,
fut: None,
}
}
}
impl<P: IntoPayload> Unpin for EmitMetadataWithResponseStream<P> {}
impl<P: IntoPayload> EmitMetadataWithResponseStream<P> {
/// Sets a timeout for awaiting replies to this emitted event
#[inline]
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.timeout = Some(timeout);
self
}
}
impl<P: IntoPayload + Send + Sync + 'static> Future for EmitMetadataWithResponseStream<P> {
type Output = Result<ResponseStream>;
fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
if self.fut.is_none() {
let mut emit_metadata = poll_unwrap!(self.emit_metadata.take());
let ctx = poll_unwrap!(emit_metadata
.event_metadata
.as_ref()
.and_then(|m| m.ctx.clone()));
let timeout = self
.timeout
.clone()
.unwrap_or(ctx.default_reply_timeout.clone());
let event_id = match poll_unwrap!(emit_metadata.event_metadata.as_mut()).get_event() {
Ok(e) => e.id(),
Err(e) => {
return Poll::Ready(Err(e));
}
};
self.fut = Some(Box::pin(async move {
let tx = ctx.register_reply_listener(event_id).await?;
emit_metadata.await?;
Ok(ResponseStream::new(event_id, timeout, ctx, tx))
}))
}
self.fut.as_mut().unwrap().as_mut().poll(cx)
}
}
impl Unpin for ResponseStream {}
impl Stream for ResponseStream {
type Item = Result<Event>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
if self.fut.is_none() {
if self.ctx.is_none() || self.receiver.is_none() {
return Poll::Ready(None);
}
let ctx = self.ctx.take().unwrap();
let mut receiver = self.receiver.take().unwrap();
let timeout = self.timeout;
let event_id = self.event_id;
self.fut = Some(Box::pin(async move {
let event: Option<Event> = tokio::select! {
tx_result = receiver.recv() => {
Ok(tx_result)
}
_ = tokio::time::sleep(timeout) => {
Err(Error::Timeout)
}
}?;
if event.is_none() || event.as_ref().unwrap().event_type() == EventType::End {
remove_reply_listener(&ctx, event_id).await;
}
Ok((event, ctx, receiver))
}));
}
match self.fut.as_mut().unwrap().as_mut().poll(cx) {
Poll::Ready(r) => match r {
Ok((event, ctx, tx)) => {
self.fut = None;
if let Some(event) = event {
if event.event_type() != EventType::End {
self.ctx = Some(ctx);
self.receiver = Some(tx);
}
Poll::Ready(Some(Ok(event)))
} else {
Poll::Ready(None)
}
}
Err(e) => Poll::Ready(Some(Err(e))),
},
Poll::Pending => Poll::Pending,
}
}
}

@ -1,29 +1,22 @@
pub mod emit_metadata; pub mod emit_metadata;
pub mod emit_metadata_with_response; pub mod emit_metadata_with_response;
pub mod emit_metadata_with_response_stream;
mod event_metadata; mod event_metadata;
use std::sync::Arc; use std::sync::Arc;
use tokio::io::AsyncWrite;
use emit_metadata::EmitMetadata;
use tokio::io::{AsyncWrite};
use tokio::sync::Mutex; use tokio::sync::Mutex;
use tracing; use tracing;
use crate::event::EventType; use crate::event::EventType;
use crate::ipc::context::Context; use crate::ipc::context::Context;
use crate::payload::IntoPayload; use crate::payload::IntoPayload;
use crate::protocol::AsyncProtocolStream; use crate::protocol::AsyncProtocolStream;
pub use emit_metadata_with_response_stream::ResponseStream;
use crate::prelude::emit_metadata::EmitMetadata;
#[macro_export] #[macro_export]
macro_rules! poll_unwrap { macro_rules! poll_unwrap {
($val:expr) => { ($val:expr) => {

@ -137,6 +137,7 @@ pub mod prelude {
pub use crate::event_handler::{EventHandler, Response}; pub use crate::event_handler::{EventHandler, Response};
pub use crate::ipc::context::Context; pub use crate::ipc::context::Context;
pub use crate::ipc::context::{PoolGuard, PooledContext}; pub use crate::ipc::context::{PoolGuard, PooledContext};
pub use crate::ipc::stream_emitter::*;
pub use crate::ipc::*; pub use crate::ipc::*;
pub use crate::macros::*; pub use crate::macros::*;
pub use crate::namespace::Namespace; pub use crate::namespace::Namespace;

@ -0,0 +1,88 @@
use crate::utils::call_counter::{get_counter_from_context, increment_counter_for_event};
use crate::utils::protocol::TestProtocolListener;
use crate::utils::{get_free_port, start_server_and_client};
use bromine::prelude::*;
use byteorder::ReadBytesExt;
use futures::StreamExt;
use std::io::Read;
use std::time::Duration;
mod utils;
/// When awaiting the reply to an event the handler for the event doesn't get called.
/// Therefore we expect it to have a call count of 0.
#[tokio::test]
async fn it_receives_responses() {
let port = get_free_port();
let ctx = get_client_with_server(port).await;
let mut reply_stream = ctx
.emit("stream", EmptyPayload)
.stream_replies()
.await
.unwrap();
let mut reply_stream_2 = ctx
.emit("stream", EmptyPayload)
.stream_replies()
.await
.unwrap();
for i in 0u8..=100 {
if let Some(Ok(event)) = reply_stream.next().await {
assert_eq!(event.payload::<NumberPayload>().unwrap().0, i)
} else {
panic!("stream 1 has no value {}", i);
}
if let Some(Ok(event)) = reply_stream_2.next().await {
assert_eq!(event.payload::<NumberPayload>().unwrap().0, i)
} else {
panic!("stream 2 has no value {}", i);
}
}
let counter = get_counter_from_context(&ctx).await;
assert_eq!(counter.get("stream").await, 2);
}
async fn get_client_with_server(port: u8) -> Context {
start_server_and_client(move || get_builder(port)).await
}
fn get_builder(port: u8) -> IPCBuilder<TestProtocolListener> {
IPCBuilder::new()
.address(port)
.timeout(Duration::from_millis(100))
.on("stream", callback!(handle_stream_event))
}
async fn handle_stream_event(ctx: &Context, event: Event) -> IPCResult<Response> {
increment_counter_for_event(ctx, &event).await;
for i in 0u8..=99 {
ctx.emit("number", NumberPayload(i)).await?;
}
ctx.response(NumberPayload(100))
}
pub struct EmptyPayload;
impl IntoPayload for EmptyPayload {
fn into_payload(self, _: &Context) -> IPCResult<Vec<u8>> {
Ok(vec![])
}
}
pub struct NumberPayload(u8);
impl IntoPayload for NumberPayload {
fn into_payload(self, _: &Context) -> IPCResult<Vec<u8>> {
Ok(vec![self.0])
}
}
impl FromPayload for NumberPayload {
fn from_payload<R: Read>(mut reader: R) -> IPCResult<Self> {
let num = reader.read_u8()?;
Ok(NumberPayload(num))
}
}
Loading…
Cancel
Save