From 4803a655d7c4ab813177b5486555144528b1e8cd Mon Sep 17 00:00:00 2001 From: trivernis Date: Sun, 6 Feb 2022 11:11:08 +0100 Subject: [PATCH] Introduce event types Signed-off-by: trivernis --- Cargo.lock | 41 ++++++++++++++ Cargo.toml | 1 + src/events/event.rs | 77 ++++++++++++++++++++----- src/events/mod.rs | 1 - src/ipc/stream_emitter.rs | 110 +++++++++++++++++++++++++++--------- tests/test_serialization.rs | 1 - 6 files changed, 188 insertions(+), 43 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1e339cbc..347cf69e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -102,6 +102,7 @@ dependencies = [ "crossbeam-utils", "futures", "lazy_static", + "num_enum", "postcard", "rmp-serde", "serde", @@ -578,6 +579,27 @@ dependencies = [ "libc", ] +[[package]] +name = "num_enum" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "720d3ea1055e4e4574c0c0b0f8c3fd4f24c4cdaf465948206dea090b57b526ad" +dependencies = [ + "num_enum_derive", +] + +[[package]] +name = "num_enum_derive" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d992b768490d7fe0d8586d9b5745f6c49f557da6d81dc982b1d167ad4edbb21" +dependencies = [ + "proc-macro-crate", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "oorandom" version = "11.1.3" @@ -641,6 +663,16 @@ version = "0.1.5-pre" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c68cb38ed13fd7bc9dd5db8f165b7c8d9c1a315104083a2b10f11354c2af97f" +[[package]] +name = "proc-macro-crate" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ebace6889caf889b4d3f76becee12e90353f2b8c7d875534a71e5742f8f6f83" +dependencies = [ + "thiserror", + "toml", +] + [[package]] name = "proc-macro2" version = "1.0.35" @@ -948,6 +980,15 @@ dependencies = [ "syn", ] +[[package]] +name = "toml" +version = "0.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a31142970826733df8241ef35dc040ef98c679ab14d7c3e54d827099b3acecaa" +dependencies = [ + "serde", +] + [[package]] name = "tracing" version = "0.1.29" diff --git a/Cargo.toml b/Cargo.toml index d6285cf8..5f58824f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,7 @@ typemap_rev = "0.1.5" byteorder = "1.4.3" async-trait = "0.1.52" futures = "0.3.19" +num_enum = "0.5.6" rmp-serde = {version = "0.15.5", optional = true} bincode = {version = "1.3.3", optional = true} serde_json = {version = "1.0.73", optional = true} diff --git a/src/events/event.rs b/src/events/event.rs index e04f0268..f94df91c 100644 --- a/src/events/event.rs +++ b/src/events/event.rs @@ -2,6 +2,8 @@ use crate::error::{Error, Result}; use crate::events::generate_event_id; use crate::events::payload::FromPayload; use byteorder::{BigEndian, ReadBytesExt}; +use num_enum::{IntoPrimitive, TryFromPrimitive}; +use std::convert::TryFrom; use std::fmt::Debug; use std::io::{Cursor, Read}; use tokio::io::{AsyncRead, AsyncReadExt}; @@ -20,36 +22,75 @@ pub struct Event { #[derive(Debug)] struct EventHeader { id: u64, + event_type: EventType, ref_id: Option, namespace: Option, name: String, } +#[derive(Clone, Debug, TryFromPrimitive, IntoPrimitive, Copy, Ord, PartialOrd, Eq, PartialEq)] +#[repr(u8)] +pub enum EventType { + Initiator, + Response, + End, + Error, +} + impl Event { - /// Creates a new event with a namespace + /// Creates a new event that acts as an initiator for further response events #[tracing::instrument(level = "trace", skip(data))] - pub fn with_namespace( - namespace: String, + #[inline] + pub(crate) fn initiator(namespace: Option, name: String, data: Vec) -> Self { + Self::new(namespace, name, data, None, EventType::Initiator) + } + + /// Creates a new event that is a response to a previous event + #[tracing::instrument(level = "trace", skip(data))] + #[inline] + pub(crate) fn response( + namespace: Option, name: String, data: Vec, - ref_id: Option, + ref_id: u64, ) -> Self { - let header = EventHeader { - id: generate_event_id(), - ref_id, - namespace: Some(namespace), - name, - }; - Self { header, data } + Self::new(namespace, name, data, Some(ref_id), EventType::Response) + } + + /// Creates a new error event as a response to a previous event + #[tracing::instrument(level = "trace", skip(data))] + #[inline] + pub(crate) fn error( + namespace: Option, + name: String, + data: Vec, + ref_id: u64, + ) -> Self { + Self::new(namespace, name, data, Some(ref_id), EventType::Error) + } + + /// Creates a new event that indicates the end of a series of responses (in an event handler) + /// and might contain a final response payload + #[tracing::instrument(level = "trace", skip(data))] + #[inline] + pub(crate) fn end(namespace: Option, name: String, data: Vec, ref_id: u64) -> Self { + Self::new(namespace, name, data, Some(ref_id), EventType::Response) } /// Creates a new event #[tracing::instrument(level = "trace", skip(data))] - pub fn new(name: String, data: Vec, ref_id: Option) -> Self { + pub(crate) fn new( + namespace: Option, + name: String, + data: Vec, + ref_id: Option, + event_type: EventType, + ) -> Self { let header = EventHeader { id: generate_event_id(), + event_type, ref_id, - namespace: None, + namespace, name, }; Self { header, data } @@ -61,6 +102,12 @@ impl Event { self.header.id } + /// The type of the event + #[inline] + pub fn event_type(&self) -> EventType { + self.header.event_type + } + /// The ID of the message referenced by this message. /// It represents the message that is replied to and can be None. #[inline] @@ -139,6 +186,7 @@ impl EventHeader { pub fn into_bytes(self) -> Vec { let mut buf = FORMAT_VERSION.to_vec(); buf.append(&mut self.id.to_be_bytes().to_vec()); + buf.push(self.event_type.into()); if let Some(ref_id) = self.ref_id { buf.push(0xFF); @@ -164,6 +212,8 @@ impl EventHeader { pub fn from_read(reader: &mut R) -> Result { Self::read_version(reader)?; let id = reader.read_u64::()?; + let event_type_num = reader.read_u8()?; + let event_type = EventType::try_from(event_type_num).map_err(|_| Error::CorruptedEvent)?; let ref_id = Self::read_ref_id(reader)?; let namespace_len = reader.read_u16::()?; let namespace = Self::read_namespace(reader, namespace_len)?; @@ -171,6 +221,7 @@ impl EventHeader { Ok(Self { id, + event_type, ref_id, namespace, name, diff --git a/src/events/mod.rs b/src/events/mod.rs index 207ecb7b..2e039d59 100644 --- a/src/events/mod.rs +++ b/src/events/mod.rs @@ -10,7 +10,6 @@ pub mod payload; #[cfg(feature = "serialize")] pub mod payload_serializer; - /// Generates a new event id pub(crate) fn generate_event_id() -> u64 { lazy_static::lazy_static! { diff --git a/src/ipc/stream_emitter.rs b/src/ipc/stream_emitter.rs index fb058198..ba6b9fc1 100644 --- a/src/ipc/stream_emitter.rs +++ b/src/ipc/stream_emitter.rs @@ -13,7 +13,8 @@ use tokio::sync::Mutex; use tracing; use crate::error::{Error, Result}; -use crate::error_event::{ERROR_EVENT_NAME, ErrorEventData}; +use crate::error_event::{ErrorEventData, ERROR_EVENT_NAME}; +use crate::event::EventType; use crate::events::event::Event; use crate::ipc::context::Context; use crate::payload::IntoPayload; @@ -25,9 +26,9 @@ macro_rules! poll_unwrap { v } else { tracing::error!("Polling a future with an invalid state."); - return Poll::Ready(Err(Error::InvalidState)) + return Poll::Ready(Err(Error::InvalidState)); } - } + }; } type SendStream = Arc>; @@ -55,8 +56,17 @@ impl StreamEmitter { event: &str, payload: P, res_id: Option, + event_type: EventType, ) -> EmitMetadata

{ - EmitMetadata::new(ctx, self.stream.clone(), event.to_string(), namespace.map(|n| n.to_string()), payload, res_id) + EmitMetadata::new( + ctx, + self.stream.clone(), + event.to_string(), + namespace.map(|n| n.to_string()), + payload, + res_id, + event_type, + ) } /// Emits an event @@ -67,7 +77,14 @@ impl StreamEmitter { event: S, payload: P, ) -> EmitMetadata

{ - self._emit(ctx, None, event.as_ref(), payload, None) + self._emit( + ctx, + None, + event.as_ref(), + payload, + None, + EventType::Initiator, + ) } /// Emits an event to a specific namespace @@ -79,7 +96,14 @@ impl StreamEmitter { event: S2, payload: P, ) -> EmitMetadata

{ - self._emit(ctx, Some(namespace.as_ref()), event.as_ref(), payload, None) + self._emit( + ctx, + Some(namespace.as_ref()), + event.as_ref(), + payload, + None, + EventType::Initiator, + ) } /// Emits a response to an event @@ -91,7 +115,14 @@ impl StreamEmitter { event: S, payload: P, ) -> EmitMetadata

{ - self._emit(ctx, None, event.as_ref(), payload, Some(event_id)) + self._emit( + ctx, + None, + event.as_ref(), + payload, + Some(event_id), + EventType::Response, + ) } /// Emits a response to an event to a namespace @@ -110,6 +141,7 @@ impl StreamEmitter { event.as_ref(), payload, Some(event_id), + EventType::Response, ) } } @@ -120,6 +152,7 @@ struct EventMetadata { event_namespace: Option>, event_name: Option, res_id: Option>, + event_type: Option, payload: Option

, } @@ -144,13 +177,17 @@ impl EventMetadata

{ let namespace = self.event_namespace.take().ok_or(Error::InvalidState)?; let payload = self.payload.take().ok_or(Error::InvalidState)?; let res_id = self.res_id.take().ok_or(Error::InvalidState)?; + let event_type = self.event_type.take().ok_or(Error::InvalidState)?; let payload_bytes = payload.into_payload(&ctx)?; - let event = if let Some(namespace) = namespace { - Event::with_namespace(namespace.to_string(), event.to_string(), payload_bytes, res_id) - } else { - Event::new(event.to_string(), payload_bytes, res_id) - }; + let event = Event::new( + namespace, + event.to_string(), + payload_bytes, + res_id, + event_type, + ); + self.event = Some(event); Ok(()) @@ -164,20 +201,28 @@ impl EventMetadata

{ pub struct EmitMetadata { event_metadata: Option>, stream: Option, - fut: Option> + Send + Sync>>>, + fut: Option> + Send + Sync>>>, } /// A metadata object returned after waiting for a reply to an event /// This object needs to be awaited for to get the actual reply pub struct EmitMetadataWithResponse { timeout: Option, - fut: Option> + Send + Sync>>>, + fut: Option> + Send + Sync>>>, emit_metadata: Option>, } impl EmitMetadata

{ #[inline] - pub(crate) fn new(ctx: Context, stream: SendStream, event_name: String, event_namespace: Option, payload: P, res_id: Option) -> Self { + pub(crate) fn new( + ctx: Context, + stream: SendStream, + event_name: String, + event_namespace: Option, + payload: P, + res_id: Option, + event_type: EventType, + ) -> Self { Self { event_metadata: Some(EventMetadata { event: None, @@ -186,6 +231,7 @@ impl EmitMetadata

{ event_namespace: Some(event_namespace), payload: Some(payload), res_id: Some(res_id), + event_type: Some(event_type), }), stream: Some(stream), fut: None, @@ -226,9 +272,12 @@ impl Future for EmitMetadata

{ let stream = poll_unwrap!(self.stream.take()); let event = match event_metadata.take_event() { - Ok(m) => { m } - Err(e) => { return Poll::Ready(Err(e)); } - }.expect("poll after future was done"); + Ok(m) => m, + Err(e) => { + return Poll::Ready(Err(e)); + } + } + .expect("poll after future was done"); self.fut = Some(Box::pin(async move { let event_id = event.id(); @@ -250,23 +299,28 @@ impl Future for EmitMetadataWithResponse fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { if self.fut.is_none() { let mut emit_metadata = poll_unwrap!(self.emit_metadata.take()); - let ctx = poll_unwrap!(emit_metadata.event_metadata.as_ref().and_then(|m| m.ctx.clone())); - let timeout = self.timeout.clone().unwrap_or(ctx.default_reply_timeout.clone()); + let ctx = poll_unwrap!(emit_metadata + .event_metadata + .as_ref() + .and_then(|m| m.ctx.clone())); + let timeout = self + .timeout + .clone() + .unwrap_or(ctx.default_reply_timeout.clone()); let event_id = match poll_unwrap!(emit_metadata.event_metadata.as_mut()).get_event() { - Ok(e) => { e.id() } - Err(e) => { return Poll::Ready(Err(e)); } + Ok(e) => e.id(), + Err(e) => { + return Poll::Ready(Err(e)); + } }; self.fut = Some(Box::pin(async move { let tx = ctx.register_reply_listener(event_id).await?; emit_metadata.await?; - let result = future::select( - Box::pin(tx), - Box::pin(tokio::time::sleep(timeout)), - ) - .await; + let result = + future::select(Box::pin(tx), Box::pin(tokio::time::sleep(timeout))).await; let reply = match result { Either::Left((tx_result, _)) => Ok(tx_result?), @@ -285,4 +339,4 @@ impl Future for EmitMetadataWithResponse } self.fut.as_mut().unwrap().as_mut().poll(cx) } -} \ No newline at end of file +} diff --git a/tests/test_serialization.rs b/tests/test_serialization.rs index ac53c485..7826eec3 100644 --- a/tests/test_serialization.rs +++ b/tests/test_serialization.rs @@ -2,7 +2,6 @@ use bromine::prelude::*; #[cfg(feature = "serialize")] use serde::{de::DeserializeOwned, Serialize}; -use std::fmt::Debug; #[cfg(feature = "serialize_rmp")] #[test]