From 8685bcbab832354f849e202312347fcf970bfd55 Mon Sep 17 00:00:00 2001 From: Trivernis Date: Wed, 29 Dec 2021 14:52:40 +0100 Subject: [PATCH 1/2] Shorten emitting and reply awaiting to implement Future for metadata Signed-off-by: Trivernis --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/error.rs | 3 + src/ipc/context.rs | 61 ++------ src/ipc/mod.rs | 11 +- src/ipc/stream_emitter.rs | 248 ++++++++++++++++++++++-------- src/lib.rs | 2 +- tests/test_events_with_payload.rs | 4 +- tests/test_raw_events.rs | 8 +- 9 files changed, 217 insertions(+), 124 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index eab0c89c..6dff7535 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -93,7 +93,7 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bromine" -version = "0.16.2" +version = "0.17.0" dependencies = [ "async-trait", "bincode", diff --git a/Cargo.toml b/Cargo.toml index af6a4deb..1a6e3cd4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bromine" -version = "0.16.2" +version = "0.17.0" authors = ["trivernis "] edition = "2018" readme = "README.md" diff --git a/src/error.rs b/src/error.rs index 0ae67c6b..2985edfd 100644 --- a/src/error.rs +++ b/src/error.rs @@ -36,6 +36,9 @@ pub enum Error { #[error("Unsupported API Version {0}")] UnsupportedVersion(String), + + #[error("Invalid state")] + InvalidState, } impl Error { diff --git a/src/ipc/context.rs b/src/ipc/context.rs index e9646cd7..9fb5172d 100644 --- a/src/ipc/context.rs +++ b/src/ipc/context.rs @@ -4,10 +4,8 @@ use std::ops::{Deref, DerefMut}; use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; -use futures::future; -use futures::future::Either; use tokio::sync::{Mutex, oneshot, RwLock}; -use tokio::sync::oneshot::Sender; +use tokio::sync::oneshot::{Sender, Receiver}; use tokio::time::Duration; use typemap_rev::TypeMap; @@ -42,9 +40,9 @@ pub struct Context { stop_sender: Arc>>>, - reply_listeners: ReplyListeners, + pub(crate) reply_listeners: ReplyListeners, - reply_timeout: Duration, + pub default_reply_timeout: Duration, ref_id: Option, @@ -66,7 +64,7 @@ impl Context { reply_listeners, data, stop_sender: Arc::new(Mutex::new(stop_sender)), - reply_timeout, + default_reply_timeout: reply_timeout, #[cfg(feature = "serialize")] default_serializer, ref_id: None, @@ -74,72 +72,45 @@ impl Context { } /// Emits an event with a given payload that can be serialized into bytes - pub async fn emit, P: IntoPayload>( + pub fn emit, P: IntoPayload>( &self, name: S, payload: P, - ) -> Result { - let payload_bytes = payload.into_payload(&self)?; - + ) -> EmitMetadata

{ if let Some(ref_id) = &self.ref_id { self.emitter - .emit_response(*ref_id, name, payload_bytes) - .await + .emit_response(self.clone(), *ref_id, name, payload) } else { - self.emitter.emit(name, payload_bytes).await + self.emitter.emit(self.clone(), name, payload) } } /// Emits an event to a specific namespace - pub async fn emit_to, S2: AsRef, P: IntoPayload>( + pub fn emit_to, S2: AsRef, P: IntoPayload>( &self, namespace: S1, name: S2, payload: P, - ) -> Result { - let payload_bytes = payload.into_payload(&self)?; - + ) -> EmitMetadata

{ if let Some(ref_id) = &self.ref_id { self.emitter - .emit_response_to(*ref_id, namespace, name, payload_bytes) - .await + .emit_response_to(self.clone(), *ref_id, namespace, name, payload) } else { - self.emitter.emit_to(namespace, name, payload_bytes).await + self.emitter.emit_to(self.clone(), namespace, name, payload) } } - /// Waits for a reply to the given message ID + /// Registers a reply listener for a given event #[inline] #[tracing::instrument(level = "debug", skip(self))] - pub async fn await_reply(&self, message_id: u64) -> Result { - self.await_reply_with_timeout(message_id, self.reply_timeout.to_owned()).await - } - - /// Waits for a reply to the given Message ID with a given timeout - #[tracing::instrument(level = "debug", skip(self))] - pub async fn await_reply_with_timeout(&self, message_id: u64, timeout: Duration) -> Result { + pub(crate) async fn register_reply_listener(&self, event_id: u64) -> Result> { let (rx, tx) = oneshot::channel(); { let mut listeners = self.reply_listeners.lock().await; - listeners.insert(message_id, rx); + listeners.insert(event_id, rx); } - let result = future::select( - Box::pin(tx), - Box::pin(tokio::time::sleep(timeout)), - ) - .await; - - let event = match result { - Either::Left((tx_result, _)) => Ok(tx_result?), - Either::Right(_) => { - let mut listeners = self.reply_listeners.lock().await; - listeners.remove(&message_id); - Err(Error::Timeout) - } - }?; - - Ok(event) + Ok(tx) } /// Stops the listener and closes the connection diff --git a/src/ipc/mod.rs b/src/ipc/mod.rs index be02f5a9..8d280315 100644 --- a/src/ipc/mod.rs +++ b/src/ipc/mod.rs @@ -1,10 +1,11 @@ -use crate::error_event::{ErrorEventData, ERROR_EVENT_NAME}; +use std::collections::HashMap; +use std::sync::Arc; + +use crate::error_event::{ERROR_EVENT_NAME, ErrorEventData}; use crate::events::event_handler::EventHandler; use crate::namespaces::namespace::Namespace; use crate::prelude::*; use crate::protocol::AsyncProtocolStream; -use std::collections::HashMap; -use std::sync::Arc; pub mod builder; pub mod client; @@ -65,11 +66,11 @@ fn handle_event(mut ctx: Context, handler: Arc, event: Event) { message: format!("{:?}", e), code: 500, }, - ) - .await + ).await { tracing::error!("Error occurred when sending error response: {:?}", e); } + tracing::error!("Failed to handle event: {:?}", e); } }); diff --git a/src/ipc/stream_emitter.rs b/src/ipc/stream_emitter.rs index 2dbac28a..f1a86d64 100644 --- a/src/ipc/stream_emitter.rs +++ b/src/ipc/stream_emitter.rs @@ -1,20 +1,29 @@ -use crate::error::Result; +use std::future::Future; +use crate::error::{Result, Error}; use crate::error_event::{ErrorEventData, ERROR_EVENT_NAME}; use crate::events::event::Event; use crate::ipc::context::Context; use crate::protocol::AsyncProtocolStream; use std::ops::DerefMut; +use std::pin::Pin; use std::sync::Arc; +use std::task::Poll; use std::time::Duration; use tokio::io::{AsyncWrite, AsyncWriteExt}; use tokio::sync::Mutex; +use std::mem; +use futures::future; +use futures::future::Either; +use crate::payload::IntoPayload; + +type SendStream = Arc>; /// An abstraction over any type that implements the AsyncProtocolStream trait /// to emit events and share a connection across multiple /// contexts. #[derive(Clone)] pub struct StreamEmitter { - stream: Arc>, + stream: SendStream, } impl StreamEmitter { @@ -24,104 +33,161 @@ impl StreamEmitter { } } - #[tracing::instrument(level = "trace", skip(self, data_bytes))] - async fn _emit( + #[tracing::instrument(level = "trace", skip(self, ctx, payload))] + fn _emit( &self, + ctx: Context, namespace: Option<&str>, event: &str, - data_bytes: Vec, + payload: P, res_id: Option, - ) -> Result { - let event = if let Some(namespace) = namespace { - Event::with_namespace(namespace.to_string(), event.to_string(), data_bytes, res_id) - } else { - Event::new(event.to_string(), data_bytes, res_id) - }; - - let event_id = event.id(); - - let event_bytes = event.into_bytes()?; - { - let mut stream = self.stream.lock().await; - stream.deref_mut().write_all(&event_bytes[..]).await?; - tracing::trace!(bytes_len = event_bytes.len()); - } - - Ok(EmitMetadata::new(event_id)) + ) -> EmitMetadata

{ + EmitMetadata::new(ctx, self.stream.clone(), event.to_string(), namespace.map(|n| n.to_string()), payload, res_id) } /// Emits an event #[inline] - pub(crate) async fn emit>( + pub(crate) fn emit, P: IntoPayload>( &self, + ctx: Context, event: S, - payload: Vec, - ) -> Result { - self._emit(None, event.as_ref(), payload, None).await + payload: P, + ) -> EmitMetadata

{ + self._emit(ctx, None, event.as_ref(), payload, None) } /// Emits an event to a specific namespace #[inline] - pub(crate) async fn emit_to, S2: AsRef>( + pub(crate) fn emit_to, S2: AsRef,P: IntoPayload>( &self, + ctx: Context, namespace: S1, event: S2, - payload: Vec, - ) -> Result { - self._emit(Some(namespace.as_ref()), event.as_ref(), payload, None) - .await + payload: P, + ) -> EmitMetadata

{ + self._emit(ctx, Some(namespace.as_ref()), event.as_ref(), payload, None) } /// Emits a response to an event #[inline] - pub(crate) async fn emit_response>( + pub(crate) fn emit_response, P: IntoPayload>( &self, + ctx: Context, event_id: u64, event: S, - payload: Vec, - ) -> Result { - self._emit(None, event.as_ref(), payload, Some(event_id)) - .await + payload: P, + ) -> EmitMetadata

{ + self._emit(ctx, None, event.as_ref(), payload, Some(event_id)) } /// Emits a response to an event to a namespace #[inline] - pub(crate) async fn emit_response_to, S2: AsRef>( + pub(crate) fn emit_response_to, S2: AsRef, P: IntoPayload>( &self, + ctx: Context, event_id: u64, namespace: S1, event: S2, - payload: Vec, - ) -> Result { + payload: P, + ) -> EmitMetadata

{ self._emit( + ctx, Some(namespace.as_ref()), event.as_ref(), payload, Some(event_id), ) - .await + } +} + +struct EventMetadata { + event: Option, + ctx: Option, + event_namespace: Option>, + event_name: Option, + res_id: Option>, + payload: Option

, +} + +impl EventMetadata

{ + pub fn get_event(&mut self) -> Result<&Event> { + if self.event.is_none() { + self.build_event()?; + } + Ok(self.event.as_ref().unwrap()) + } + + pub fn take_event(mut self) -> Result> { + if self.event.is_none() { + self.build_event()?; + } + Ok(mem::take(&mut self.event)) + } + + fn build_event(&mut self) -> Result<()> { + let ctx =self.ctx.take().ok_or(Error::InvalidState)?; + let event = self.event_name.take().ok_or(Error::InvalidState)?; + let namespace = self.event_namespace.take().ok_or(Error::InvalidState)?; + let payload = self.payload.take().ok_or(Error::InvalidState)?; + let res_id = self.res_id.take().ok_or(Error::InvalidState)?; + let payload_bytes = payload.into_payload(&ctx)?; + + let event = if let Some(namespace) = namespace { + Event::with_namespace(namespace.to_string(), event.to_string(), payload_bytes, res_id) + } else { + Event::new(event.to_string(), payload_bytes, res_id) + }; + self.event = Some(event); + + Ok(()) } } /// A metadata object returned after emitting an event. +/// To send the event this object needs to be awaited /// This object can be used to wait for a response to an event. -pub struct EmitMetadata { - message_id: u64, - timeout: Option +/// The result contains the emitted event id. +pub struct EmitMetadata { + event_metadata: Option>, + stream: Option, + fut: Option> + Send>>>, +} + +/// A metadata object returned after waiting for a reply to an event +/// This object needs to be awaited for to get the actual reply +pub struct EmitMetadataWithResponse { + timeout: Option, + fut: Option>>>>, + emit_metadata: Option>, } -impl EmitMetadata { +impl EmitMetadata

{ + #[inline] - pub(crate) fn new(message_id: u64) -> Self { - Self { message_id, timeout: None } + pub(crate) fn new(ctx: Context, stream: SendStream, event_name: String, event_namespace: Option, payload: P, res_id: Option) -> Self { + Self { event_metadata: Some(EventMetadata { + event: None, + ctx: Some(ctx), + event_name: Some(event_name), + event_namespace: Some(event_namespace), + payload: Some(payload), + res_id: Some(res_id), + }), stream: Some(stream), fut: None } } - /// The ID of the emitted message - #[inline] - pub fn message_id(&self) -> u64 { - self.message_id + /// Waits for a reply to the given message. + #[tracing::instrument(skip(self), fields(self.message_id))] + pub fn await_reply(self) -> EmitMetadataWithResponse

{ + + EmitMetadataWithResponse { + timeout: None, + fut: None, + emit_metadata: Some(self) + } } +} +impl EmitMetadataWithResponse

{ /// Sets a timeout for awaiting replies to this emitted event #[inline] pub fn with_timeout(mut self, timeout: Duration) -> Self { @@ -129,19 +195,77 @@ impl EmitMetadata { self } +} - /// Waits for a reply to the given message. - #[tracing::instrument(skip(self, ctx), fields(self.message_id))] - pub async fn await_reply(&self, ctx: &Context) -> Result { - let reply = if let Some(timeout) = self.timeout { - ctx.await_reply_with_timeout(self.message_id, timeout.clone()).await? - } else { - ctx.await_reply(self.message_id).await? - }; - if reply.name() == ERROR_EVENT_NAME { - Err(reply.payload::()?.into()) - } else { - Ok(reply) +impl Unpin for EmitMetadata

{} +impl Unpin for EmitMetadataWithResponse

{} + +impl Future for EmitMetadata

{ + type Output = Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { + if self.fut.is_none() { + let event_metadata= self.event_metadata.take().expect("poll after future was done"); + let stream = self.stream.take().expect("poll after future was done"); + + let event = match event_metadata.take_event() { + Ok(m) => {m} + Err(e) => {return Poll::Ready(Err(e))} + }.expect("poll after future was done"); + + self.fut = Some(Box::pin(async move { + let event_id = event.id(); + let event_bytes = event.into_bytes()?; + let mut stream = stream.lock().await; + stream.deref_mut().write_all(&event_bytes[..]).await?; + tracing::trace!(bytes_len = event_bytes.len()); + + Ok(event_id) + })); } + self.fut.as_mut().unwrap().as_mut().poll(cx) } } + +impl Future for EmitMetadataWithResponse

{ + type Output = Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { + if self.fut.is_none() { + let mut emit_metadata = self.emit_metadata.take().expect("poll after future was done"); + let ctx = emit_metadata.event_metadata.as_ref().expect("poll before waiting for reply").ctx.clone().expect("poll before waiting for reply"); + let timeout = self.timeout.clone().unwrap_or(ctx.default_reply_timeout.clone()); + + let event_id = match emit_metadata.event_metadata.as_mut().expect("poll before waiting for reply").get_event() { + Ok(e) => {e.id()} + Err(e) => {return Poll::Ready(Err(e))} + }; + + self.fut = Some(Box::pin(async move { + let tx = ctx.register_reply_listener(event_id).await?; + emit_metadata.await?; + + let result = future::select( + Box::pin(tx), + Box::pin(tokio::time::sleep(timeout)), + ) + .await; + + let reply = match result { + Either::Left((tx_result, _)) => Ok(tx_result?), + Either::Right(_) => { + let mut listeners = ctx.reply_listeners.lock().await; + listeners.remove(&event_id); + Err(Error::Timeout) + } + }?; + if reply.name() == ERROR_EVENT_NAME { + Err(reply.payload::()?.into()) + } else { + Ok(reply) + } + })) + } + self.fut.as_mut().unwrap().as_mut().poll(cx) + } +} \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index b6e0acbe..a251ef06 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -53,7 +53,7 @@ //! .build_client().await.unwrap(); //! //! // emit an initial event -//! let response = ctx.emit("ping", ()).await.unwrap().await_reply(&ctx).await.unwrap(); +//! let response = ctx.emit("ping", ()).await_reply().await.unwrap(); //! assert_eq!(response.name(), "pong"); //! } //! ``` diff --git a/tests/test_events_with_payload.rs b/tests/test_events_with_payload.rs index ee1ef19a..a00f8601 100644 --- a/tests/test_events_with_payload.rs +++ b/tests/test_events_with_payload.rs @@ -38,9 +38,7 @@ async fn it_receives_payloads() { }; let reply = ctx .emit("ping", payload) - .await - .unwrap() - .await_reply(&ctx) + .await_reply() .await .unwrap(); let reply_payload = reply.payload::().unwrap(); diff --git a/tests/test_raw_events.rs b/tests/test_raw_events.rs index da0be6a7..e885b20d 100644 --- a/tests/test_raw_events.rs +++ b/tests/test_raw_events.rs @@ -47,9 +47,7 @@ async fn it_receives_responses() { let ctx = get_client_with_server(port).await; let reply = ctx .emit("ping", EmptyPayload) - .await - .unwrap() - .await_reply(&ctx) + .await_reply() .await .unwrap(); let counter = get_counter_from_context(&ctx).await; @@ -81,10 +79,8 @@ async fn it_receives_error_responses() { let ctx = get_client_with_server(port).await; let result = ctx .emit("create_error", EmptyPayload) - .await - .unwrap() + .await_reply() .with_timeout(Duration::from_millis(100)) - .await_reply(&ctx) .await; let counter = get_counter_from_context(&ctx).await; From bca5b90d57ec7e7ec95b846a7f9be1bbde8c4a4f Mon Sep 17 00:00:00 2001 From: Trivernis Date: Wed, 29 Dec 2021 15:04:08 +0100 Subject: [PATCH 2/2] Remove some dirty unwraps Signed-off-by: Trivernis --- src/ipc/stream_emitter.rs | 79 ++++++++++++++++++++++++--------------- src/macros.rs | 2 +- 2 files changed, 49 insertions(+), 32 deletions(-) diff --git a/src/ipc/stream_emitter.rs b/src/ipc/stream_emitter.rs index f1a86d64..91249083 100644 --- a/src/ipc/stream_emitter.rs +++ b/src/ipc/stream_emitter.rs @@ -1,20 +1,34 @@ use std::future::Future; -use crate::error::{Result, Error}; -use crate::error_event::{ErrorEventData, ERROR_EVENT_NAME}; -use crate::events::event::Event; -use crate::ipc::context::Context; -use crate::protocol::AsyncProtocolStream; +use std::mem; use std::ops::DerefMut; use std::pin::Pin; use std::sync::Arc; use std::task::Poll; use std::time::Duration; -use tokio::io::{AsyncWrite, AsyncWriteExt}; -use tokio::sync::Mutex; -use std::mem; + use futures::future; use futures::future::Either; +use tokio::io::{AsyncWrite, AsyncWriteExt}; +use tokio::sync::Mutex; +use tracing; + +use crate::error::{Error, Result}; +use crate::error_event::{ERROR_EVENT_NAME, ErrorEventData}; +use crate::events::event::Event; +use crate::ipc::context::Context; use crate::payload::IntoPayload; +use crate::protocol::AsyncProtocolStream; + +macro_rules! poll_unwrap { + ($val:expr) => { + if let Some(v) = $val { + v + } else { + tracing::error!("Polling a future with an invalid state."); + return Poll::Ready(Err(Error::InvalidState)) + } + } +} type SendStream = Arc>; @@ -58,7 +72,7 @@ impl StreamEmitter { /// Emits an event to a specific namespace #[inline] - pub(crate) fn emit_to, S2: AsRef,P: IntoPayload>( + pub(crate) fn emit_to, S2: AsRef, P: IntoPayload>( &self, ctx: Context, namespace: S1, @@ -125,7 +139,7 @@ impl EventMetadata

{ } fn build_event(&mut self) -> Result<()> { - let ctx =self.ctx.take().ok_or(Error::InvalidState)?; + let ctx = self.ctx.take().ok_or(Error::InvalidState)?; let event = self.event_name.take().ok_or(Error::InvalidState)?; let namespace = self.event_namespace.take().ok_or(Error::InvalidState)?; let payload = self.payload.take().ok_or(Error::InvalidState)?; @@ -161,28 +175,30 @@ pub struct EmitMetadataWithResponse { emit_metadata: Option>, } -impl EmitMetadata

{ - +impl EmitMetadata

{ #[inline] pub(crate) fn new(ctx: Context, stream: SendStream, event_name: String, event_namespace: Option, payload: P, res_id: Option) -> Self { - Self { event_metadata: Some(EventMetadata { - event: None, - ctx: Some(ctx), - event_name: Some(event_name), - event_namespace: Some(event_namespace), - payload: Some(payload), - res_id: Some(res_id), - }), stream: Some(stream), fut: None } + Self { + event_metadata: Some(EventMetadata { + event: None, + ctx: Some(ctx), + event_name: Some(event_name), + event_namespace: Some(event_namespace), + payload: Some(payload), + res_id: Some(res_id), + }), + stream: Some(stream), + fut: None, + } } /// Waits for a reply to the given message. #[tracing::instrument(skip(self), fields(self.message_id))] pub fn await_reply(self) -> EmitMetadataWithResponse

{ - EmitMetadataWithResponse { timeout: None, fut: None, - emit_metadata: Some(self) + emit_metadata: Some(self), } } } @@ -198,6 +214,7 @@ impl EmitMetadataWithResponse

{ } impl Unpin for EmitMetadata

{} + impl Unpin for EmitMetadataWithResponse

{} impl Future for EmitMetadata

{ @@ -205,12 +222,12 @@ impl Future for EmitMetadata

{ fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { if self.fut.is_none() { - let event_metadata= self.event_metadata.take().expect("poll after future was done"); - let stream = self.stream.take().expect("poll after future was done"); + let event_metadata = poll_unwrap!(self.event_metadata.take()); + let stream = poll_unwrap!(self.stream.take()); let event = match event_metadata.take_event() { - Ok(m) => {m} - Err(e) => {return Poll::Ready(Err(e))} + Ok(m) => { m } + Err(e) => { return Poll::Ready(Err(e)); } }.expect("poll after future was done"); self.fut = Some(Box::pin(async move { @@ -232,13 +249,13 @@ impl Future for EmitMetadataWithResponse

{ fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { if self.fut.is_none() { - let mut emit_metadata = self.emit_metadata.take().expect("poll after future was done"); - let ctx = emit_metadata.event_metadata.as_ref().expect("poll before waiting for reply").ctx.clone().expect("poll before waiting for reply"); + let mut emit_metadata = poll_unwrap!(self.emit_metadata.take()); + let ctx = poll_unwrap!(emit_metadata.event_metadata.as_ref().and_then(|m| m.ctx.clone())); let timeout = self.timeout.clone().unwrap_or(ctx.default_reply_timeout.clone()); - let event_id = match emit_metadata.event_metadata.as_mut().expect("poll before waiting for reply").get_event() { - Ok(e) => {e.id()} - Err(e) => {return Poll::Ready(Err(e))} + let event_id = match poll_unwrap!(emit_metadata.event_metadata.as_mut()).get_event() { + Ok(e) => { e.id() } + Err(e) => { return Poll::Ready(Err(e)); } }; self.fut = Some(Box::pin(async move { diff --git a/src/macros.rs b/src/macros.rs index ff15f867..0107c03f 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -30,4 +30,4 @@ macro_rules! events{ $handler.on($name, callback!($cb)); )* } -} +} \ No newline at end of file