From e316d29807270c907a010d1cfa849c52629ca2c7 Mon Sep 17 00:00:00 2001 From: trivernis Date: Sun, 6 Feb 2022 13:08:17 +0100 Subject: [PATCH] Move emit metadata to separate modules Signed-off-by: trivernis --- src/ipc/context.rs | 5 +- src/ipc/stream_emitter.rs | 356 ------------------ src/ipc/stream_emitter/emit_metadata.rs | 92 +++++ .../emit_metadata_with_response.rs | 80 ++++ src/ipc/stream_emitter/event_metadata.rs | 54 +++ src/ipc/stream_emitter/mod.rs | 166 ++++++++ 6 files changed, 395 insertions(+), 358 deletions(-) delete mode 100644 src/ipc/stream_emitter.rs create mode 100644 src/ipc/stream_emitter/emit_metadata.rs create mode 100644 src/ipc/stream_emitter/emit_metadata_with_response.rs create mode 100644 src/ipc/stream_emitter/event_metadata.rs create mode 100644 src/ipc/stream_emitter/mod.rs diff --git a/src/ipc/context.rs b/src/ipc/context.rs index 658e0c15..b7f8b51a 100644 --- a/src/ipc/context.rs +++ b/src/ipc/context.rs @@ -5,13 +5,14 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use tokio::sync::oneshot::{Receiver, Sender}; -use tokio::sync::{oneshot, Mutex, RwLock}; +use tokio::sync::{Mutex, oneshot, RwLock}; use tokio::time::Duration; use typemap_rev::TypeMap; use crate::error::{Error, Result}; use crate::event::{Event, EventType}; -use crate::ipc::stream_emitter::{EmitMetadata, StreamEmitter}; +use crate::ipc::stream_emitter::StreamEmitter; +use crate::ipc::stream_emitter::emit_metadata::EmitMetadata; use crate::payload::IntoPayload; #[cfg(feature = "serialize")] use crate::payload::{DynamicSerializer, SerdePayload}; diff --git a/src/ipc/stream_emitter.rs b/src/ipc/stream_emitter.rs deleted file mode 100644 index 76164fc5..00000000 --- a/src/ipc/stream_emitter.rs +++ /dev/null @@ -1,356 +0,0 @@ -use std::future::Future; -use std::mem; -use std::ops::DerefMut; -use std::pin::Pin; -use std::sync::Arc; -use std::task::Poll; -use std::time::Duration; - -use futures::future; -use futures::future::Either; -use tokio::io::{AsyncWrite, AsyncWriteExt}; -use tokio::sync::Mutex; -use tracing; - -use crate::error::{Error, Result}; -use crate::error_event::ErrorEventData; -use crate::event::EventType; -use crate::events::event::Event; -use crate::ipc::context::Context; -use crate::payload::IntoPayload; -use crate::protocol::AsyncProtocolStream; - -macro_rules! poll_unwrap { - ($val:expr) => { - if let Some(v) = $val { - v - } else { - tracing::error!("Polling a future with an invalid state."); - return Poll::Ready(Err(Error::InvalidState)); - } - }; -} - -type SendStream = Arc>; - -/// An abstraction over any type that implements the AsyncProtocolStream trait -/// to emit events and share a connection across multiple -/// contexts. -#[derive(Clone)] -pub struct StreamEmitter { - stream: SendStream, -} - -impl StreamEmitter { - pub fn new(stream: P::OwnedSplitWriteHalf) -> Self { - Self { - stream: Arc::new(Mutex::new(stream)), - } - } - - #[tracing::instrument(level = "trace", skip(self, ctx, payload))] - fn _emit( - &self, - ctx: Context, - namespace: Option, - event: &str, - payload: P, - res_id: Option, - event_type: EventType, - ) -> EmitMetadata

{ - EmitMetadata::new( - ctx, - self.stream.clone(), - event.to_string(), - namespace, - payload, - res_id, - event_type, - ) - } - - /// Emits an event - #[inline] - pub(crate) fn emit, P: IntoPayload>( - &self, - ctx: Context, - event: S, - payload: P, - ) -> EmitMetadata

{ - self._emit( - ctx, - None, - event.as_ref(), - payload, - None, - EventType::Initiator, - ) - } - - /// Emits an event to a specific namespace - #[inline] - pub(crate) fn emit_to, S2: AsRef, P: IntoPayload>( - &self, - ctx: Context, - namespace: S1, - event: S2, - payload: P, - ) -> EmitMetadata

{ - self._emit( - ctx, - Some(namespace.as_ref().to_string()), - event.as_ref(), - payload, - None, - EventType::Initiator, - ) - } - - /// Emits a raw event - #[inline] - pub(crate) fn emit_raw( - &self, - ctx: Context, - res_id: Option, - event: &str, - namespace: Option, - event_type: EventType, - payload: P, - ) -> EmitMetadata

{ - self._emit(ctx, namespace, event, payload, res_id, event_type) - } - - /// Emits a response to an event - #[inline] - pub(crate) fn emit_response, P: IntoPayload>( - &self, - ctx: Context, - event_id: u64, - event: S, - payload: P, - ) -> EmitMetadata

{ - self._emit( - ctx, - None, - event.as_ref(), - payload, - Some(event_id), - EventType::Response, - ) - } - - /// Emits a response to an event to a namespace - #[inline] - pub(crate) fn emit_response_to, S2: AsRef, P: IntoPayload>( - &self, - ctx: Context, - event_id: u64, - namespace: S1, - event: S2, - payload: P, - ) -> EmitMetadata

{ - self._emit( - ctx, - Some(namespace.as_ref().to_string()), - event.as_ref(), - payload, - Some(event_id), - EventType::Response, - ) - } -} - -struct EventMetadata { - event: Option, - ctx: Option, - event_namespace: Option>, - event_name: Option, - res_id: Option>, - event_type: Option, - payload: Option

, -} - -impl EventMetadata

{ - pub fn get_event(&mut self) -> Result<&Event> { - if self.event.is_none() { - self.build_event()?; - } - Ok(self.event.as_ref().unwrap()) - } - - pub fn take_event(mut self) -> Result> { - if self.event.is_none() { - self.build_event()?; - } - Ok(mem::take(&mut self.event)) - } - - fn build_event(&mut self) -> Result<()> { - let ctx = self.ctx.take().ok_or(Error::InvalidState)?; - let event = self.event_name.take().ok_or(Error::InvalidState)?; - let namespace = self.event_namespace.take().ok_or(Error::InvalidState)?; - let payload = self.payload.take().ok_or(Error::InvalidState)?; - let res_id = self.res_id.take().ok_or(Error::InvalidState)?; - let event_type = self.event_type.take().ok_or(Error::InvalidState)?; - let payload_bytes = payload.into_payload(&ctx)?; - - let event = Event::new( - namespace, - event.to_string(), - payload_bytes, - res_id, - event_type, - ); - - self.event = Some(event); - - Ok(()) - } -} - -/// A metadata object returned after emitting an event. -/// To send the event this object needs to be awaited -/// This object can be used to wait for a response to an event. -/// The result contains the emitted event id. -pub struct EmitMetadata { - event_metadata: Option>, - stream: Option, - fut: Option> + Send + Sync>>>, -} - -/// A metadata object returned after waiting for a reply to an event -/// This object needs to be awaited for to get the actual reply -pub struct EmitMetadataWithResponse { - timeout: Option, - fut: Option> + Send + Sync>>>, - emit_metadata: Option>, -} - -impl EmitMetadata

{ - #[inline] - pub(crate) fn new( - ctx: Context, - stream: SendStream, - event_name: String, - event_namespace: Option, - payload: P, - res_id: Option, - event_type: EventType, - ) -> Self { - Self { - event_metadata: Some(EventMetadata { - event: None, - ctx: Some(ctx), - event_name: Some(event_name), - event_namespace: Some(event_namespace), - payload: Some(payload), - res_id: Some(res_id), - event_type: Some(event_type), - }), - stream: Some(stream), - fut: None, - } - } - - /// Waits for a reply to the given message. - #[tracing::instrument(skip(self), fields(self.message_id))] - pub fn await_reply(self) -> EmitMetadataWithResponse

{ - EmitMetadataWithResponse { - timeout: None, - fut: None, - emit_metadata: Some(self), - } - } -} - -impl EmitMetadataWithResponse

{ - /// Sets a timeout for awaiting replies to this emitted event - #[inline] - pub fn with_timeout(mut self, timeout: Duration) -> Self { - self.timeout = Some(timeout); - - self - } -} - -impl Unpin for EmitMetadata

{} - -impl Unpin for EmitMetadataWithResponse

{} - -impl Future for EmitMetadata

{ - type Output = Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { - if self.fut.is_none() { - let event_metadata = poll_unwrap!(self.event_metadata.take()); - let stream = poll_unwrap!(self.stream.take()); - - let event = match event_metadata.take_event() { - Ok(m) => m, - Err(e) => { - return Poll::Ready(Err(e)); - } - } - .expect("poll after future was done"); - - self.fut = Some(Box::pin(async move { - let event_id = event.id(); - let event_bytes = event.into_bytes()?; - let mut stream = stream.lock().await; - stream.deref_mut().write_all(&event_bytes[..]).await?; - tracing::trace!(bytes_len = event_bytes.len()); - - Ok(event_id) - })); - } - self.fut.as_mut().unwrap().as_mut().poll(cx) - } -} - -impl Future for EmitMetadataWithResponse

{ - type Output = Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { - if self.fut.is_none() { - let mut emit_metadata = poll_unwrap!(self.emit_metadata.take()); - let ctx = poll_unwrap!(emit_metadata - .event_metadata - .as_ref() - .and_then(|m| m.ctx.clone())); - let timeout = self - .timeout - .clone() - .unwrap_or(ctx.default_reply_timeout.clone()); - - let event_id = match poll_unwrap!(emit_metadata.event_metadata.as_mut()).get_event() { - Ok(e) => e.id(), - Err(e) => { - return Poll::Ready(Err(e)); - } - }; - - self.fut = Some(Box::pin(async move { - let tx = ctx.register_reply_listener(event_id).await?; - emit_metadata.await?; - - let result = - future::select(Box::pin(tx), Box::pin(tokio::time::sleep(timeout))).await; - - let reply = match result { - Either::Left((tx_result, _)) => Ok(tx_result?), - Either::Right(_) => { - let mut listeners = ctx.reply_listeners.lock().await; - listeners.remove(&event_id); - Err(Error::Timeout) - } - }?; - if reply.event_type() == EventType::Error { - Err(reply.payload::()?.into()) - } else { - Ok(reply) - } - })) - } - self.fut.as_mut().unwrap().as_mut().poll(cx) - } -} diff --git a/src/ipc/stream_emitter/emit_metadata.rs b/src/ipc/stream_emitter/emit_metadata.rs new file mode 100644 index 00000000..ebc4f559 --- /dev/null +++ b/src/ipc/stream_emitter/emit_metadata.rs @@ -0,0 +1,92 @@ +use crate::context::Context; +use crate::error::Error; +use crate::event::EventType; +use crate::ipc::stream_emitter::emit_metadata_with_response::EmitMetadataWithResponse; +use crate::ipc::stream_emitter::event_metadata::EventMetadata; +use crate::ipc::stream_emitter::SendStream; +use crate::payload::IntoPayload; +use crate::{error, poll_unwrap}; +use std::future::Future; +use std::ops::DerefMut; +use std::pin::Pin; +use std::task::Poll; +use tokio::io::AsyncWriteExt; + +/// A metadata object returned after emitting an event. +/// To send the event this object needs to be awaited +/// This object can be used to wait for a response to an event. +/// The result contains the emitted event id. +pub struct EmitMetadata { + pub(crate) event_metadata: Option>, + stream: Option, + fut: Option> + Send + Sync>>>, +} + +impl EmitMetadata

{ + #[inline] + pub(crate) fn new( + ctx: Context, + stream: SendStream, + event_name: String, + event_namespace: Option, + payload: P, + res_id: Option, + event_type: EventType, + ) -> Self { + Self { + event_metadata: Some(EventMetadata { + event: None, + ctx: Some(ctx), + event_name: Some(event_name), + event_namespace: Some(event_namespace), + payload: Some(payload), + res_id: Some(res_id), + event_type: Some(event_type), + }), + stream: Some(stream), + fut: None, + } + } + + /// Waits for a reply to the given message. + #[tracing::instrument(skip(self), fields(self.message_id))] + pub fn await_reply(self) -> EmitMetadataWithResponse

{ + EmitMetadataWithResponse { + timeout: None, + fut: None, + emit_metadata: Some(self), + } + } +} + +impl Unpin for EmitMetadata

{} + +impl Future for EmitMetadata

{ + type Output = error::Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { + if self.fut.is_none() { + let event_metadata = poll_unwrap!(self.event_metadata.take()); + let stream = poll_unwrap!(self.stream.take()); + + let event = match event_metadata.take_event() { + Ok(m) => m, + Err(e) => { + return Poll::Ready(Err(e)); + } + } + .expect("poll after future was done"); + + self.fut = Some(Box::pin(async move { + let event_id = event.id(); + let event_bytes = event.into_bytes()?; + let mut stream = stream.lock().await; + stream.deref_mut().write_all(&event_bytes[..]).await?; + tracing::trace!(bytes_len = event_bytes.len()); + + Ok(event_id) + })); + } + self.fut.as_mut().unwrap().as_mut().poll(cx) + } +} diff --git a/src/ipc/stream_emitter/emit_metadata_with_response.rs b/src/ipc/stream_emitter/emit_metadata_with_response.rs new file mode 100644 index 00000000..cdbf4d54 --- /dev/null +++ b/src/ipc/stream_emitter/emit_metadata_with_response.rs @@ -0,0 +1,80 @@ +use crate::error::Error; +use crate::error_event::ErrorEventData; +use crate::event::{Event, EventType}; +use crate::ipc::stream_emitter::emit_metadata::EmitMetadata; +use crate::payload::IntoPayload; +use crate::{error, poll_unwrap}; +use futures::future; +use futures::future::Either; +use std::future::Future; +use std::pin::Pin; +use std::task::Poll; +use std::time::Duration; + +/// A metadata object returned after waiting for a reply to an event +/// This object needs to be awaited for to get the actual reply +pub struct EmitMetadataWithResponse { + pub(crate) timeout: Option, + pub(crate) fut: Option> + Send + Sync>>>, + pub(crate) emit_metadata: Option>, +} + +impl EmitMetadataWithResponse

{ + /// Sets a timeout for awaiting replies to this emitted event + #[inline] + pub fn with_timeout(mut self, timeout: Duration) -> Self { + self.timeout = Some(timeout); + + self + } +} + +impl Unpin for EmitMetadataWithResponse

{} + +impl Future for EmitMetadataWithResponse

{ + type Output = error::Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { + if self.fut.is_none() { + let mut emit_metadata = poll_unwrap!(self.emit_metadata.take()); + let ctx = poll_unwrap!(emit_metadata + .event_metadata + .as_ref() + .and_then(|m| m.ctx.clone())); + let timeout = self + .timeout + .clone() + .unwrap_or(ctx.default_reply_timeout.clone()); + + let event_id = match poll_unwrap!(emit_metadata.event_metadata.as_mut()).get_event() { + Ok(e) => e.id(), + Err(e) => { + return Poll::Ready(Err(e)); + } + }; + + self.fut = Some(Box::pin(async move { + let tx = ctx.register_reply_listener(event_id).await?; + emit_metadata.await?; + + let result = + future::select(Box::pin(tx), Box::pin(tokio::time::sleep(timeout))).await; + + let reply = match result { + Either::Left((tx_result, _)) => Ok(tx_result?), + Either::Right(_) => { + let mut listeners = ctx.reply_listeners.lock().await; + listeners.remove(&event_id); + Err(Error::Timeout) + } + }?; + if reply.event_type() == EventType::Error { + Err(reply.payload::()?.into()) + } else { + Ok(reply) + } + })) + } + self.fut.as_mut().unwrap().as_mut().poll(cx) + } +} diff --git a/src/ipc/stream_emitter/event_metadata.rs b/src/ipc/stream_emitter/event_metadata.rs new file mode 100644 index 00000000..c9a834f3 --- /dev/null +++ b/src/ipc/stream_emitter/event_metadata.rs @@ -0,0 +1,54 @@ +use crate::context::Context; +use crate::error; +use crate::error::Error; +use crate::event::{Event, EventType}; +use crate::payload::IntoPayload; +use std::mem; + +pub(crate) struct EventMetadata { + pub(crate) event: Option, + pub(crate) ctx: Option, + pub(crate) event_namespace: Option>, + pub(crate) event_name: Option, + pub(crate) res_id: Option>, + pub(crate) event_type: Option, + pub(crate) payload: Option

, +} + +impl EventMetadata

{ + pub fn get_event(&mut self) -> error::Result<&Event> { + if self.event.is_none() { + self.build_event()?; + } + Ok(self.event.as_ref().unwrap()) + } + + pub fn take_event(mut self) -> error::Result> { + if self.event.is_none() { + self.build_event()?; + } + Ok(mem::take(&mut self.event)) + } + + fn build_event(&mut self) -> error::Result<()> { + let ctx = self.ctx.take().ok_or(Error::InvalidState)?; + let event = self.event_name.take().ok_or(Error::InvalidState)?; + let namespace = self.event_namespace.take().ok_or(Error::InvalidState)?; + let payload = self.payload.take().ok_or(Error::InvalidState)?; + let res_id = self.res_id.take().ok_or(Error::InvalidState)?; + let event_type = self.event_type.take().ok_or(Error::InvalidState)?; + let payload_bytes = payload.into_payload(&ctx)?; + + let event = Event::new( + namespace, + event.to_string(), + payload_bytes, + res_id, + event_type, + ); + + self.event = Some(event); + + Ok(()) + } +} diff --git a/src/ipc/stream_emitter/mod.rs b/src/ipc/stream_emitter/mod.rs new file mode 100644 index 00000000..b018f74f --- /dev/null +++ b/src/ipc/stream_emitter/mod.rs @@ -0,0 +1,166 @@ +pub mod emit_metadata; +pub mod emit_metadata_with_response; +mod event_metadata; + +use std::future::Future; +use std::mem; +use std::ops::DerefMut; +use std::pin::Pin; +use std::sync::Arc; +use std::task::Poll; +use std::time::Duration; + +use emit_metadata::EmitMetadata; +use event_metadata::EventMetadata; +use futures::future; +use futures::future::Either; +use tokio::io::{AsyncWrite, AsyncWriteExt}; +use tokio::sync::Mutex; +use tracing; + +use crate::error::Result; +use crate::event::EventType; +use crate::ipc::context::Context; +use crate::payload::IntoPayload; +use crate::protocol::AsyncProtocolStream; + +#[macro_export] +macro_rules! poll_unwrap { + ($val:expr) => { + if let Some(v) = $val { + v + } else { + tracing::error!("Polling a future with an invalid state."); + return Poll::Ready(Err(Error::InvalidState)); + } + }; +} + +type SendStream = Arc>; + +/// An abstraction over any type that implements the AsyncProtocolStream trait +/// to emit events and share a connection across multiple +/// contexts. +#[derive(Clone)] +pub struct StreamEmitter { + stream: SendStream, +} + +impl StreamEmitter { + pub fn new(stream: P::OwnedSplitWriteHalf) -> Self { + Self { + stream: Arc::new(Mutex::new(stream)), + } + } + + #[tracing::instrument(level = "trace", skip(self, ctx, payload))] + fn _emit( + &self, + ctx: Context, + namespace: Option, + event: &str, + payload: P, + res_id: Option, + event_type: EventType, + ) -> EmitMetadata

{ + EmitMetadata::new( + ctx, + self.stream.clone(), + event.to_string(), + namespace, + payload, + res_id, + event_type, + ) + } + + /// Emits an event + #[inline] + pub(crate) fn emit, P: IntoPayload>( + &self, + ctx: Context, + event: S, + payload: P, + ) -> EmitMetadata

{ + self._emit( + ctx, + None, + event.as_ref(), + payload, + None, + EventType::Initiator, + ) + } + + /// Emits an event to a specific namespace + #[inline] + pub(crate) fn emit_to, S2: AsRef, P: IntoPayload>( + &self, + ctx: Context, + namespace: S1, + event: S2, + payload: P, + ) -> EmitMetadata

{ + self._emit( + ctx, + Some(namespace.as_ref().to_string()), + event.as_ref(), + payload, + None, + EventType::Initiator, + ) + } + + /// Emits a raw event + #[inline] + pub(crate) fn emit_raw( + &self, + ctx: Context, + res_id: Option, + event: &str, + namespace: Option, + event_type: EventType, + payload: P, + ) -> EmitMetadata

{ + self._emit(ctx, namespace, event, payload, res_id, event_type) + } + + /// Emits a response to an event + #[inline] + pub(crate) fn emit_response, P: IntoPayload>( + &self, + ctx: Context, + event_id: u64, + event: S, + payload: P, + ) -> EmitMetadata

{ + self._emit( + ctx, + None, + event.as_ref(), + payload, + Some(event_id), + EventType::Response, + ) + } + + /// Emits a response to an event to a namespace + #[inline] + pub(crate) fn emit_response_to, S2: AsRef, P: IntoPayload>( + &self, + ctx: Context, + event_id: u64, + namespace: S1, + event: S2, + payload: P, + ) -> EmitMetadata

{ + self._emit( + ctx, + Some(namespace.as_ref().to_string()), + event.as_ref(), + payload, + Some(event_id), + EventType::Response, + ) + } +}