From 2fa63da081f31f64173452acb28a4a280c4952ae Mon Sep 17 00:00:00 2001 From: trivernis Date: Sun, 6 Feb 2022 11:23:10 +0100 Subject: [PATCH] Change error events to be identified by event type and not by name Signed-off-by: trivernis --- src/ipc/context.rs | 54 ++++++++++++++++++++++++--------------- src/ipc/mod.rs | 10 +++++--- src/ipc/stream_emitter.rs | 26 ++++++++++++++----- 3 files changed, 61 insertions(+), 29 deletions(-) diff --git a/src/ipc/context.rs b/src/ipc/context.rs index 9fb5172d..c939a376 100644 --- a/src/ipc/context.rs +++ b/src/ipc/context.rs @@ -1,20 +1,20 @@ use std::collections::HashMap; use std::mem; use std::ops::{Deref, DerefMut}; -use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; -use tokio::sync::{Mutex, oneshot, RwLock}; -use tokio::sync::oneshot::{Sender, Receiver}; +use tokio::sync::oneshot::{Receiver, Sender}; +use tokio::sync::{oneshot, Mutex, RwLock}; use tokio::time::Duration; use typemap_rev::TypeMap; use crate::error::{Error, Result}; -use crate::event::Event; +use crate::event::{Event, EventType}; use crate::ipc::stream_emitter::{EmitMetadata, StreamEmitter}; +use crate::payload::IntoPayload; #[cfg(feature = "serialize")] use crate::payload::{DynamicSerializer, SerdePayload}; -use crate::payload::IntoPayload; pub(crate) type ReplyListeners = Arc>>>; @@ -71,12 +71,26 @@ impl Context { } } - /// Emits an event with a given payload that can be serialized into bytes - pub fn emit, P: IntoPayload>( + /// Emits a raw event. Only for internal use + pub(crate) fn emit_raw( &self, - name: S, + name: &str, + namespace: Option, + event_type: EventType, payload: P, ) -> EmitMetadata

{ + self.emitter.emit_raw( + self.clone(), + self.ref_id.clone(), + name, + namespace, + event_type, + payload, + ) + } + + /// Emits an event with a given payload that can be serialized into bytes + pub fn emit, P: IntoPayload>(&self, name: S, payload: P) -> EmitMetadata

{ if let Some(ref_id) = &self.ref_id { self.emitter .emit_response(self.clone(), *ref_id, name, payload) @@ -149,16 +163,16 @@ pub struct PooledContext { } pub struct PoolGuard - where - T: Clone, +where + T: Clone, { inner: T, count: Arc, } impl Deref for PoolGuard - where - T: Clone, +where + T: Clone, { type Target = T; @@ -169,8 +183,8 @@ impl Deref for PoolGuard } impl DerefMut for PoolGuard - where - T: Clone, +where + T: Clone, { #[inline] fn deref_mut(&mut self) -> &mut Self::Target { @@ -179,8 +193,8 @@ impl DerefMut for PoolGuard } impl Clone for PoolGuard - where - T: Clone, +where + T: Clone, { #[inline] fn clone(&self) -> Self { @@ -194,8 +208,8 @@ impl Clone for PoolGuard } impl Drop for PoolGuard - where - T: Clone, +where + T: Clone, { #[inline] fn drop(&mut self) { @@ -204,8 +218,8 @@ impl Drop for PoolGuard } impl PoolGuard - where - T: Clone, +where + T: Clone, { pub(crate) fn new(inner: T) -> Self { Self { diff --git a/src/ipc/mod.rs b/src/ipc/mod.rs index 8d280315..0319f6aa 100644 --- a/src/ipc/mod.rs +++ b/src/ipc/mod.rs @@ -1,7 +1,8 @@ use std::collections::HashMap; use std::sync::Arc; -use crate::error_event::{ERROR_EVENT_NAME, ErrorEventData}; +use crate::error_event::{ErrorEventData, ERROR_EVENT_NAME}; +use crate::event::EventType; use crate::events::event_handler::EventHandler; use crate::namespaces::namespace::Namespace; use crate::prelude::*; @@ -60,13 +61,16 @@ fn handle_event(mut ctx: Context, handler: Arc, event: Event) { if let Err(e) = handler.handle_event(&ctx, event).await { // emit an error event if let Err(e) = ctx - .emit( + .emit_raw( ERROR_EVENT_NAME, + None, + EventType::Error, ErrorEventData { message: format!("{:?}", e), code: 500, }, - ).await + ) + .await { tracing::error!("Error occurred when sending error response: {:?}", e); } diff --git a/src/ipc/stream_emitter.rs b/src/ipc/stream_emitter.rs index ba6b9fc1..76164fc5 100644 --- a/src/ipc/stream_emitter.rs +++ b/src/ipc/stream_emitter.rs @@ -13,7 +13,7 @@ use tokio::sync::Mutex; use tracing; use crate::error::{Error, Result}; -use crate::error_event::{ErrorEventData, ERROR_EVENT_NAME}; +use crate::error_event::ErrorEventData; use crate::event::EventType; use crate::events::event::Event; use crate::ipc::context::Context; @@ -52,7 +52,7 @@ impl StreamEmitter { fn _emit( &self, ctx: Context, - namespace: Option<&str>, + namespace: Option, event: &str, payload: P, res_id: Option, @@ -62,7 +62,7 @@ impl StreamEmitter { ctx, self.stream.clone(), event.to_string(), - namespace.map(|n| n.to_string()), + namespace, payload, res_id, event_type, @@ -98,7 +98,7 @@ impl StreamEmitter { ) -> EmitMetadata

{ self._emit( ctx, - Some(namespace.as_ref()), + Some(namespace.as_ref().to_string()), event.as_ref(), payload, None, @@ -106,6 +106,20 @@ impl StreamEmitter { ) } + /// Emits a raw event + #[inline] + pub(crate) fn emit_raw( + &self, + ctx: Context, + res_id: Option, + event: &str, + namespace: Option, + event_type: EventType, + payload: P, + ) -> EmitMetadata

{ + self._emit(ctx, namespace, event, payload, res_id, event_type) + } + /// Emits a response to an event #[inline] pub(crate) fn emit_response, P: IntoPayload>( @@ -137,7 +151,7 @@ impl StreamEmitter { ) -> EmitMetadata

{ self._emit( ctx, - Some(namespace.as_ref()), + Some(namespace.as_ref().to_string()), event.as_ref(), payload, Some(event_id), @@ -330,7 +344,7 @@ impl Future for EmitMetadataWithResponse Err(Error::Timeout) } }?; - if reply.name() == ERROR_EVENT_NAME { + if reply.event_type() == EventType::Error { Err(reply.payload::()?.into()) } else { Ok(reply)