Introduce event types

Signed-off-by: trivernis <trivernis@protonmail.com>
pull/32/head
trivernis 2 years ago
parent 782cc1f460
commit 4803a655d7
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

41
Cargo.lock generated

@ -102,6 +102,7 @@ dependencies = [
"crossbeam-utils",
"futures",
"lazy_static",
"num_enum",
"postcard",
"rmp-serde",
"serde",
@ -578,6 +579,27 @@ dependencies = [
"libc",
]
[[package]]
name = "num_enum"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "720d3ea1055e4e4574c0c0b0f8c3fd4f24c4cdaf465948206dea090b57b526ad"
dependencies = [
"num_enum_derive",
]
[[package]]
name = "num_enum_derive"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d992b768490d7fe0d8586d9b5745f6c49f557da6d81dc982b1d167ad4edbb21"
dependencies = [
"proc-macro-crate",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "oorandom"
version = "11.1.3"
@ -641,6 +663,16 @@ version = "0.1.5-pre"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c68cb38ed13fd7bc9dd5db8f165b7c8d9c1a315104083a2b10f11354c2af97f"
[[package]]
name = "proc-macro-crate"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ebace6889caf889b4d3f76becee12e90353f2b8c7d875534a71e5742f8f6f83"
dependencies = [
"thiserror",
"toml",
]
[[package]]
name = "proc-macro2"
version = "1.0.35"
@ -948,6 +980,15 @@ dependencies = [
"syn",
]
[[package]]
name = "toml"
version = "0.5.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a31142970826733df8241ef35dc040ef98c679ab14d7c3e54d827099b3acecaa"
dependencies = [
"serde",
]
[[package]]
name = "tracing"
version = "0.1.29"

@ -27,6 +27,7 @@ typemap_rev = "0.1.5"
byteorder = "1.4.3"
async-trait = "0.1.52"
futures = "0.3.19"
num_enum = "0.5.6"
rmp-serde = {version = "0.15.5", optional = true}
bincode = {version = "1.3.3", optional = true}
serde_json = {version = "1.0.73", optional = true}

@ -2,6 +2,8 @@ use crate::error::{Error, Result};
use crate::events::generate_event_id;
use crate::events::payload::FromPayload;
use byteorder::{BigEndian, ReadBytesExt};
use num_enum::{IntoPrimitive, TryFromPrimitive};
use std::convert::TryFrom;
use std::fmt::Debug;
use std::io::{Cursor, Read};
use tokio::io::{AsyncRead, AsyncReadExt};
@ -20,36 +22,75 @@ pub struct Event {
#[derive(Debug)]
struct EventHeader {
id: u64,
event_type: EventType,
ref_id: Option<u64>,
namespace: Option<String>,
name: String,
}
#[derive(Clone, Debug, TryFromPrimitive, IntoPrimitive, Copy, Ord, PartialOrd, Eq, PartialEq)]
#[repr(u8)]
pub enum EventType {
Initiator,
Response,
End,
Error,
}
impl Event {
/// Creates a new event with a namespace
/// Creates a new event that acts as an initiator for further response events
#[tracing::instrument(level = "trace", skip(data))]
pub fn with_namespace(
namespace: String,
#[inline]
pub(crate) fn initiator(namespace: Option<String>, name: String, data: Vec<u8>) -> Self {
Self::new(namespace, name, data, None, EventType::Initiator)
}
/// Creates a new event that is a response to a previous event
#[tracing::instrument(level = "trace", skip(data))]
#[inline]
pub(crate) fn response(
namespace: Option<String>,
name: String,
data: Vec<u8>,
ref_id: Option<u64>,
ref_id: u64,
) -> Self {
let header = EventHeader {
id: generate_event_id(),
ref_id,
namespace: Some(namespace),
name,
};
Self { header, data }
Self::new(namespace, name, data, Some(ref_id), EventType::Response)
}
/// Creates a new error event as a response to a previous event
#[tracing::instrument(level = "trace", skip(data))]
#[inline]
pub(crate) fn error(
namespace: Option<String>,
name: String,
data: Vec<u8>,
ref_id: u64,
) -> Self {
Self::new(namespace, name, data, Some(ref_id), EventType::Error)
}
/// Creates a new event that indicates the end of a series of responses (in an event handler)
/// and might contain a final response payload
#[tracing::instrument(level = "trace", skip(data))]
#[inline]
pub(crate) fn end(namespace: Option<String>, name: String, data: Vec<u8>, ref_id: u64) -> Self {
Self::new(namespace, name, data, Some(ref_id), EventType::Response)
}
/// Creates a new event
#[tracing::instrument(level = "trace", skip(data))]
pub fn new(name: String, data: Vec<u8>, ref_id: Option<u64>) -> Self {
pub(crate) fn new(
namespace: Option<String>,
name: String,
data: Vec<u8>,
ref_id: Option<u64>,
event_type: EventType,
) -> Self {
let header = EventHeader {
id: generate_event_id(),
event_type,
ref_id,
namespace: None,
namespace,
name,
};
Self { header, data }
@ -61,6 +102,12 @@ impl Event {
self.header.id
}
/// The type of the event
#[inline]
pub fn event_type(&self) -> EventType {
self.header.event_type
}
/// The ID of the message referenced by this message.
/// It represents the message that is replied to and can be None.
#[inline]
@ -139,6 +186,7 @@ impl EventHeader {
pub fn into_bytes(self) -> Vec<u8> {
let mut buf = FORMAT_VERSION.to_vec();
buf.append(&mut self.id.to_be_bytes().to_vec());
buf.push(self.event_type.into());
if let Some(ref_id) = self.ref_id {
buf.push(0xFF);
@ -164,6 +212,8 @@ impl EventHeader {
pub fn from_read<R: Read>(reader: &mut R) -> Result<Self> {
Self::read_version(reader)?;
let id = reader.read_u64::<BigEndian>()?;
let event_type_num = reader.read_u8()?;
let event_type = EventType::try_from(event_type_num).map_err(|_| Error::CorruptedEvent)?;
let ref_id = Self::read_ref_id(reader)?;
let namespace_len = reader.read_u16::<BigEndian>()?;
let namespace = Self::read_namespace(reader, namespace_len)?;
@ -171,6 +221,7 @@ impl EventHeader {
Ok(Self {
id,
event_type,
ref_id,
namespace,
name,

@ -10,7 +10,6 @@ pub mod payload;
#[cfg(feature = "serialize")]
pub mod payload_serializer;
/// Generates a new event id
pub(crate) fn generate_event_id() -> u64 {
lazy_static::lazy_static! {

@ -13,7 +13,8 @@ use tokio::sync::Mutex;
use tracing;
use crate::error::{Error, Result};
use crate::error_event::{ERROR_EVENT_NAME, ErrorEventData};
use crate::error_event::{ErrorEventData, ERROR_EVENT_NAME};
use crate::event::EventType;
use crate::events::event::Event;
use crate::ipc::context::Context;
use crate::payload::IntoPayload;
@ -25,9 +26,9 @@ macro_rules! poll_unwrap {
v
} else {
tracing::error!("Polling a future with an invalid state.");
return Poll::Ready(Err(Error::InvalidState))
return Poll::Ready(Err(Error::InvalidState));
}
}
};
}
type SendStream = Arc<Mutex<dyn AsyncWrite + Send + Sync + Unpin + 'static>>;
@ -55,8 +56,17 @@ impl StreamEmitter {
event: &str,
payload: P,
res_id: Option<u64>,
event_type: EventType,
) -> EmitMetadata<P> {
EmitMetadata::new(ctx, self.stream.clone(), event.to_string(), namespace.map(|n| n.to_string()), payload, res_id)
EmitMetadata::new(
ctx,
self.stream.clone(),
event.to_string(),
namespace.map(|n| n.to_string()),
payload,
res_id,
event_type,
)
}
/// Emits an event
@ -67,7 +77,14 @@ impl StreamEmitter {
event: S,
payload: P,
) -> EmitMetadata<P> {
self._emit(ctx, None, event.as_ref(), payload, None)
self._emit(
ctx,
None,
event.as_ref(),
payload,
None,
EventType::Initiator,
)
}
/// Emits an event to a specific namespace
@ -79,7 +96,14 @@ impl StreamEmitter {
event: S2,
payload: P,
) -> EmitMetadata<P> {
self._emit(ctx, Some(namespace.as_ref()), event.as_ref(), payload, None)
self._emit(
ctx,
Some(namespace.as_ref()),
event.as_ref(),
payload,
None,
EventType::Initiator,
)
}
/// Emits a response to an event
@ -91,7 +115,14 @@ impl StreamEmitter {
event: S,
payload: P,
) -> EmitMetadata<P> {
self._emit(ctx, None, event.as_ref(), payload, Some(event_id))
self._emit(
ctx,
None,
event.as_ref(),
payload,
Some(event_id),
EventType::Response,
)
}
/// Emits a response to an event to a namespace
@ -110,6 +141,7 @@ impl StreamEmitter {
event.as_ref(),
payload,
Some(event_id),
EventType::Response,
)
}
}
@ -120,6 +152,7 @@ struct EventMetadata<P: IntoPayload> {
event_namespace: Option<Option<String>>,
event_name: Option<String>,
res_id: Option<Option<u64>>,
event_type: Option<EventType>,
payload: Option<P>,
}
@ -144,13 +177,17 @@ impl<P: IntoPayload> EventMetadata<P> {
let namespace = self.event_namespace.take().ok_or(Error::InvalidState)?;
let payload = self.payload.take().ok_or(Error::InvalidState)?;
let res_id = self.res_id.take().ok_or(Error::InvalidState)?;
let event_type = self.event_type.take().ok_or(Error::InvalidState)?;
let payload_bytes = payload.into_payload(&ctx)?;
let event = if let Some(namespace) = namespace {
Event::with_namespace(namespace.to_string(), event.to_string(), payload_bytes, res_id)
} else {
Event::new(event.to_string(), payload_bytes, res_id)
};
let event = Event::new(
namespace,
event.to_string(),
payload_bytes,
res_id,
event_type,
);
self.event = Some(event);
Ok(())
@ -164,20 +201,28 @@ impl<P: IntoPayload> EventMetadata<P> {
pub struct EmitMetadata<P: IntoPayload> {
event_metadata: Option<EventMetadata<P>>,
stream: Option<SendStream>,
fut: Option<Pin<Box<dyn Future<Output=Result<u64>> + Send + Sync>>>,
fut: Option<Pin<Box<dyn Future<Output = Result<u64>> + Send + Sync>>>,
}
/// A metadata object returned after waiting for a reply to an event
/// This object needs to be awaited for to get the actual reply
pub struct EmitMetadataWithResponse<P: IntoPayload> {
timeout: Option<Duration>,
fut: Option<Pin<Box<dyn Future<Output=Result<Event>> + Send + Sync>>>,
fut: Option<Pin<Box<dyn Future<Output = Result<Event>> + Send + Sync>>>,
emit_metadata: Option<EmitMetadata<P>>,
}
impl<P: IntoPayload> EmitMetadata<P> {
#[inline]
pub(crate) fn new(ctx: Context, stream: SendStream, event_name: String, event_namespace: Option<String>, payload: P, res_id: Option<u64>) -> Self {
pub(crate) fn new(
ctx: Context,
stream: SendStream,
event_name: String,
event_namespace: Option<String>,
payload: P,
res_id: Option<u64>,
event_type: EventType,
) -> Self {
Self {
event_metadata: Some(EventMetadata {
event: None,
@ -186,6 +231,7 @@ impl<P: IntoPayload> EmitMetadata<P> {
event_namespace: Some(event_namespace),
payload: Some(payload),
res_id: Some(res_id),
event_type: Some(event_type),
}),
stream: Some(stream),
fut: None,
@ -226,9 +272,12 @@ impl<P: IntoPayload + Send + Sync + 'static> Future for EmitMetadata<P> {
let stream = poll_unwrap!(self.stream.take());
let event = match event_metadata.take_event() {
Ok(m) => { m }
Err(e) => { return Poll::Ready(Err(e)); }
}.expect("poll after future was done");
Ok(m) => m,
Err(e) => {
return Poll::Ready(Err(e));
}
}
.expect("poll after future was done");
self.fut = Some(Box::pin(async move {
let event_id = event.id();
@ -250,23 +299,28 @@ impl<P: IntoPayload + Send + Sync + 'static> Future for EmitMetadataWithResponse
fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
if self.fut.is_none() {
let mut emit_metadata = poll_unwrap!(self.emit_metadata.take());
let ctx = poll_unwrap!(emit_metadata.event_metadata.as_ref().and_then(|m| m.ctx.clone()));
let timeout = self.timeout.clone().unwrap_or(ctx.default_reply_timeout.clone());
let ctx = poll_unwrap!(emit_metadata
.event_metadata
.as_ref()
.and_then(|m| m.ctx.clone()));
let timeout = self
.timeout
.clone()
.unwrap_or(ctx.default_reply_timeout.clone());
let event_id = match poll_unwrap!(emit_metadata.event_metadata.as_mut()).get_event() {
Ok(e) => { e.id() }
Err(e) => { return Poll::Ready(Err(e)); }
Ok(e) => e.id(),
Err(e) => {
return Poll::Ready(Err(e));
}
};
self.fut = Some(Box::pin(async move {
let tx = ctx.register_reply_listener(event_id).await?;
emit_metadata.await?;
let result = future::select(
Box::pin(tx),
Box::pin(tokio::time::sleep(timeout)),
)
.await;
let result =
future::select(Box::pin(tx), Box::pin(tokio::time::sleep(timeout))).await;
let reply = match result {
Either::Left((tx_result, _)) => Ok(tx_result?),
@ -285,4 +339,4 @@ impl<P: IntoPayload + Send + Sync + 'static> Future for EmitMetadataWithResponse
}
self.fut.as_mut().unwrap().as_mut().poll(cx)
}
}
}

@ -2,7 +2,6 @@
use bromine::prelude::*;
#[cfg(feature = "serialize")]
use serde::{de::DeserializeOwned, Serialize};
use std::fmt::Debug;
#[cfg(feature = "serialize_rmp")]
#[test]

Loading…
Cancel
Save