Change error events to be identified by event type and not by name

Signed-off-by: trivernis <trivernis@protonmail.com>
pull/32/head
trivernis 3 years ago
parent 4803a655d7
commit 2fa63da081
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

@ -1,20 +1,20 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::mem; use std::mem;
use std::ops::{Deref, DerefMut}; use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tokio::sync::{Mutex, oneshot, RwLock}; use tokio::sync::oneshot::{Receiver, Sender};
use tokio::sync::oneshot::{Sender, Receiver}; use tokio::sync::{oneshot, Mutex, RwLock};
use tokio::time::Duration; use tokio::time::Duration;
use typemap_rev::TypeMap; use typemap_rev::TypeMap;
use crate::error::{Error, Result}; use crate::error::{Error, Result};
use crate::event::Event; use crate::event::{Event, EventType};
use crate::ipc::stream_emitter::{EmitMetadata, StreamEmitter}; use crate::ipc::stream_emitter::{EmitMetadata, StreamEmitter};
use crate::payload::IntoPayload;
#[cfg(feature = "serialize")] #[cfg(feature = "serialize")]
use crate::payload::{DynamicSerializer, SerdePayload}; use crate::payload::{DynamicSerializer, SerdePayload};
use crate::payload::IntoPayload;
pub(crate) type ReplyListeners = Arc<Mutex<HashMap<u64, oneshot::Sender<Event>>>>; pub(crate) type ReplyListeners = Arc<Mutex<HashMap<u64, oneshot::Sender<Event>>>>;
@ -71,12 +71,26 @@ impl Context {
} }
} }
/// Emits an event with a given payload that can be serialized into bytes /// Emits a raw event. Only for internal use
pub fn emit<S: AsRef<str>, P: IntoPayload>( pub(crate) fn emit_raw<P: IntoPayload>(
&self, &self,
name: S, name: &str,
namespace: Option<String>,
event_type: EventType,
payload: P, payload: P,
) -> EmitMetadata<P> { ) -> EmitMetadata<P> {
self.emitter.emit_raw(
self.clone(),
self.ref_id.clone(),
name,
namespace,
event_type,
payload,
)
}
/// Emits an event with a given payload that can be serialized into bytes
pub fn emit<S: AsRef<str>, P: IntoPayload>(&self, name: S, payload: P) -> EmitMetadata<P> {
if let Some(ref_id) = &self.ref_id { if let Some(ref_id) = &self.ref_id {
self.emitter self.emitter
.emit_response(self.clone(), *ref_id, name, payload) .emit_response(self.clone(), *ref_id, name, payload)
@ -149,16 +163,16 @@ pub struct PooledContext {
} }
pub struct PoolGuard<T> pub struct PoolGuard<T>
where where
T: Clone, T: Clone,
{ {
inner: T, inner: T,
count: Arc<AtomicUsize>, count: Arc<AtomicUsize>,
} }
impl<T> Deref for PoolGuard<T> impl<T> Deref for PoolGuard<T>
where where
T: Clone, T: Clone,
{ {
type Target = T; type Target = T;
@ -169,8 +183,8 @@ impl<T> Deref for PoolGuard<T>
} }
impl<T> DerefMut for PoolGuard<T> impl<T> DerefMut for PoolGuard<T>
where where
T: Clone, T: Clone,
{ {
#[inline] #[inline]
fn deref_mut(&mut self) -> &mut Self::Target { fn deref_mut(&mut self) -> &mut Self::Target {
@ -179,8 +193,8 @@ impl<T> DerefMut for PoolGuard<T>
} }
impl<T> Clone for PoolGuard<T> impl<T> Clone for PoolGuard<T>
where where
T: Clone, T: Clone,
{ {
#[inline] #[inline]
fn clone(&self) -> Self { fn clone(&self) -> Self {
@ -194,8 +208,8 @@ impl<T> Clone for PoolGuard<T>
} }
impl<T> Drop for PoolGuard<T> impl<T> Drop for PoolGuard<T>
where where
T: Clone, T: Clone,
{ {
#[inline] #[inline]
fn drop(&mut self) { fn drop(&mut self) {
@ -204,8 +218,8 @@ impl<T> Drop for PoolGuard<T>
} }
impl<T> PoolGuard<T> impl<T> PoolGuard<T>
where where
T: Clone, T: Clone,
{ {
pub(crate) fn new(inner: T) -> Self { pub(crate) fn new(inner: T) -> Self {
Self { Self {

@ -1,7 +1,8 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use crate::error_event::{ERROR_EVENT_NAME, ErrorEventData}; use crate::error_event::{ErrorEventData, ERROR_EVENT_NAME};
use crate::event::EventType;
use crate::events::event_handler::EventHandler; use crate::events::event_handler::EventHandler;
use crate::namespaces::namespace::Namespace; use crate::namespaces::namespace::Namespace;
use crate::prelude::*; use crate::prelude::*;
@ -60,13 +61,16 @@ fn handle_event(mut ctx: Context, handler: Arc<EventHandler>, event: Event) {
if let Err(e) = handler.handle_event(&ctx, event).await { if let Err(e) = handler.handle_event(&ctx, event).await {
// emit an error event // emit an error event
if let Err(e) = ctx if let Err(e) = ctx
.emit( .emit_raw(
ERROR_EVENT_NAME, ERROR_EVENT_NAME,
None,
EventType::Error,
ErrorEventData { ErrorEventData {
message: format!("{:?}", e), message: format!("{:?}", e),
code: 500, code: 500,
}, },
).await )
.await
{ {
tracing::error!("Error occurred when sending error response: {:?}", e); tracing::error!("Error occurred when sending error response: {:?}", e);
} }

@ -13,7 +13,7 @@ use tokio::sync::Mutex;
use tracing; use tracing;
use crate::error::{Error, Result}; use crate::error::{Error, Result};
use crate::error_event::{ErrorEventData, ERROR_EVENT_NAME}; use crate::error_event::ErrorEventData;
use crate::event::EventType; use crate::event::EventType;
use crate::events::event::Event; use crate::events::event::Event;
use crate::ipc::context::Context; use crate::ipc::context::Context;
@ -52,7 +52,7 @@ impl StreamEmitter {
fn _emit<P: IntoPayload>( fn _emit<P: IntoPayload>(
&self, &self,
ctx: Context, ctx: Context,
namespace: Option<&str>, namespace: Option<String>,
event: &str, event: &str,
payload: P, payload: P,
res_id: Option<u64>, res_id: Option<u64>,
@ -62,7 +62,7 @@ impl StreamEmitter {
ctx, ctx,
self.stream.clone(), self.stream.clone(),
event.to_string(), event.to_string(),
namespace.map(|n| n.to_string()), namespace,
payload, payload,
res_id, res_id,
event_type, event_type,
@ -98,7 +98,7 @@ impl StreamEmitter {
) -> EmitMetadata<P> { ) -> EmitMetadata<P> {
self._emit( self._emit(
ctx, ctx,
Some(namespace.as_ref()), Some(namespace.as_ref().to_string()),
event.as_ref(), event.as_ref(),
payload, payload,
None, None,
@ -106,6 +106,20 @@ impl StreamEmitter {
) )
} }
/// Emits a raw event
#[inline]
pub(crate) fn emit_raw<P: IntoPayload>(
&self,
ctx: Context,
res_id: Option<u64>,
event: &str,
namespace: Option<String>,
event_type: EventType,
payload: P,
) -> EmitMetadata<P> {
self._emit(ctx, namespace, event, payload, res_id, event_type)
}
/// Emits a response to an event /// Emits a response to an event
#[inline] #[inline]
pub(crate) fn emit_response<S: AsRef<str>, P: IntoPayload>( pub(crate) fn emit_response<S: AsRef<str>, P: IntoPayload>(
@ -137,7 +151,7 @@ impl StreamEmitter {
) -> EmitMetadata<P> { ) -> EmitMetadata<P> {
self._emit( self._emit(
ctx, ctx,
Some(namespace.as_ref()), Some(namespace.as_ref().to_string()),
event.as_ref(), event.as_ref(),
payload, payload,
Some(event_id), Some(event_id),
@ -330,7 +344,7 @@ impl<P: IntoPayload + Send + Sync + 'static> Future for EmitMetadataWithResponse
Err(Error::Timeout) Err(Error::Timeout)
} }
}?; }?;
if reply.name() == ERROR_EVENT_NAME { if reply.event_type() == EventType::Error {
Err(reply.payload::<ErrorEventData>()?.into()) Err(reply.payload::<ErrorEventData>()?.into())
} else { } else {
Ok(reply) Ok(reply)

Loading…
Cancel
Save