From d1b426e10b0b9fc760d31d22b16a0dedf0c98b98 Mon Sep 17 00:00:00 2001 From: trivernis Date: Sun, 5 Dec 2021 10:30:06 +0100 Subject: [PATCH 1/6] Add event format version validation Signed-off-by: trivernis --- src/error.rs | 14 ++++++++ src/events/event.rs | 82 ++++++++++++++++++++++++++++++++++----------- 2 files changed, 76 insertions(+), 20 deletions(-) diff --git a/src/error.rs b/src/error.rs index 6bf711cc..0ae67c6b 100644 --- a/src/error.rs +++ b/src/error.rs @@ -33,6 +33,20 @@ pub enum Error { #[error("timed out")] Timeout, + + #[error("Unsupported API Version {0}")] + UnsupportedVersion(String), +} + +impl Error { + pub fn unsupported_version_vec(version: Vec) -> Self { + let mut version_string = version + .into_iter() + .fold(String::new(), |acc, val| format!("{}{}.", acc, val)); + version_string.pop(); + + Self::UnsupportedVersion(version_string) + } } impl From for Error { diff --git a/src/events/event.rs b/src/events/event.rs index 30c9f5f0..facdaf7c 100644 --- a/src/events/event.rs +++ b/src/events/event.rs @@ -1,9 +1,14 @@ use crate::error::{Error, Result}; use crate::events::generate_event_id; use crate::events::payload::EventReceivePayload; +use crate::prelude::{IPCError, IPCResult}; +use byteorder::{BigEndian, ReadBytesExt}; use std::fmt::Debug; +use std::io::{Cursor, Read}; use tokio::io::{AsyncRead, AsyncReadExt}; +pub const FORMAT_VERSION: [u8; 3] = [0, 9, 0]; + /// A container representing an event and underlying binary data. /// The data can be decoded into an object representation or read /// as raw binary data. @@ -93,7 +98,10 @@ impl Event { let data_length = total_length - header_length as u64; tracing::trace!(total_length, header_length, data_length); - let header: EventHeader = EventHeader::from_async_read(reader).await?; + let mut header_bytes = vec![0u8; header_length as usize]; + reader.read_exact(&mut header_bytes).await?; + // additional header fields can be added a the end because when reading they will just be ignored + let header: EventHeader = EventHeader::from_read(&mut Cursor::new(header_bytes))?; let mut data = vec![0u8; data_length as usize]; reader.read_exact(&mut data).await?; @@ -124,7 +132,7 @@ impl Event { impl EventHeader { /// Serializes the event header into bytes pub fn into_bytes(self) -> Vec { - let mut buf = Vec::new(); + let mut buf = FORMAT_VERSION.to_vec(); buf.append(&mut self.id.to_be_bytes().to_vec()); if let Some(ref_id) = self.ref_id { @@ -148,33 +156,67 @@ impl EventHeader { } /// Parses an event header from an async reader - pub async fn from_async_read(reader: &mut R) -> Result { - let id = reader.read_u64().await?; - let ref_id_exists = reader.read_u8().await?; + pub fn from_read(reader: &mut R) -> Result { + Self::read_version(reader)?; + let id = reader.read_u64::()?; + let ref_id = Self::read_ref_id(reader)?; + let namespace_len = reader.read_u16::()?; + let namespace = Self::read_namespace(reader, namespace_len)?; + let name = Self::read_name(reader)?; + + Ok(Self { + id, + ref_id, + namespace, + name, + }) + } + + /// Reads and validates the format version + fn read_version(reader: &mut R) -> IPCResult> { + let mut version = vec![0u8; 3]; + reader.read_exact(&mut version)?; + + if version[0] != FORMAT_VERSION[0] { + return Err(IPCError::unsupported_version_vec(version)); + } + + Ok(version) + } + + /// Reads the reference event id + fn read_ref_id(reader: &mut R) -> IPCResult> { + let ref_id_exists = reader.read_u8()?; let ref_id = match ref_id_exists { 0x00 => None, - 0xFF => Some(reader.read_u64().await?), + 0xFF => Some(reader.read_u64::()?), _ => return Err(Error::CorruptedEvent), }; - let namespace_len = reader.read_u16().await?; + Ok(ref_id) + } + + /// Reads the name of the event + fn read_name(reader: &mut R) -> IPCResult { + let name_len = reader.read_u16::()?; + + Self::read_string(reader, name_len as usize) + } + + /// Reads the namespace of the event + fn read_namespace(reader: &mut R, namespace_len: u16) -> IPCResult> { let namespace = if namespace_len > 0 { - let mut namespace_buf = vec![0u8; namespace_len as usize]; - reader.read_exact(&mut namespace_buf).await?; - Some(String::from_utf8(namespace_buf).map_err(|_| Error::CorruptedEvent)?) + Some(Self::read_string(reader, namespace_len as usize)?) } else { None }; - let name_len = reader.read_u16().await?; - let mut name_buf = vec![0u8; name_len as usize]; - reader.read_exact(&mut name_buf).await?; - let name = String::from_utf8(name_buf).map_err(|_| Error::CorruptedEvent)?; - Ok(Self { - id, - ref_id, - namespace, - name, - }) + Ok(namespace) + } + + fn read_string(reader: &mut R, length: usize) -> IPCResult { + let mut string_buf = vec![0u8; length]; + reader.read_exact(&mut string_buf)?; + String::from_utf8(string_buf).map_err(|_| Error::CorruptedEvent) } } From 6299f9be0286980644b2cf016cf8d96be7fdeecc Mon Sep 17 00:00:00 2001 From: trivernis Date: Sun, 5 Dec 2021 17:01:47 +0100 Subject: [PATCH 2/6] Change serialization to be able to use multiple formats Signed-off-by: trivernis --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/events/event.rs | 21 ++- src/events/payload.rs | 65 ++++++++ src/events/payload_serializer/mod.rs | 157 ++++++++++++++++-- .../payload_serializer/serialize_bincode.rs | 27 +-- .../payload_serializer/serialize_json.rs | 27 +-- .../payload_serializer/serialize_postcard.rs | 33 ++-- .../payload_serializer/serialize_rmp.rs | 47 +----- src/ipc/builder.rs | 26 ++- src/ipc/client.rs | 18 ++ src/ipc/context.rs | 16 ++ src/ipc/server.rs | 18 +- src/ipc/stream_emitter.rs | 40 +++-- src/lib.rs | 38 +---- tests/test_events_with_payload.rs | 69 +++++--- 16 files changed, 409 insertions(+), 197 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6bdae6af..ce558d8d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -93,7 +93,7 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bromine" -version = "0.14.0" +version = "0.15.0" dependencies = [ "async-trait", "bincode", diff --git a/Cargo.toml b/Cargo.toml index 47c3047d..1a0be446 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bromine" -version = "0.14.0" +version = "0.15.0" authors = ["trivernis "] edition = "2018" readme = "README.md" diff --git a/src/events/event.rs b/src/events/event.rs index facdaf7c..637c7cec 100644 --- a/src/events/event.rs +++ b/src/events/event.rs @@ -1,8 +1,12 @@ use crate::error::{Error, Result}; use crate::events::generate_event_id; use crate::events::payload::EventReceivePayload; +#[cfg(feature = "serialize")] +use crate::payload::SerdePayload; use crate::prelude::{IPCError, IPCResult}; use byteorder::{BigEndian, ReadBytesExt}; +#[cfg(feature = "serialize")] +use serde::de::DeserializeOwned; use std::fmt::Debug; use std::io::{Cursor, Read}; use tokio::io::{AsyncRead, AsyncReadExt}; @@ -67,12 +71,21 @@ impl Event { self.header.ref_id.clone() } - /// Decodes the data to the given type + /// Decodes the payload to the given type implementing the receive payload trait #[tracing::instrument(level = "trace", skip(self))] - pub fn data(&self) -> Result { - let data = T::from_payload_bytes(&self.data[..])?; + pub fn payload(&self) -> Result { + let payload = T::from_payload_bytes(&self.data[..])?; - Ok(data) + Ok(payload) + } + + #[cfg(feature = "serialize")] + /// Decodes the payload to the given type implementing DeserializeOwned + #[tracing::instrument(level = "trace", skip(self))] + pub fn serde_payload(&self) -> Result { + let payload = SerdePayload::::from_payload_bytes(&self.data[..])?; + + Ok(payload.data()) } /// Returns a reference of the underlying data diff --git a/src/events/payload.rs b/src/events/payload.rs index 7e5d5731..a663f5d3 100644 --- a/src/events/payload.rs +++ b/src/events/payload.rs @@ -112,3 +112,68 @@ where }) } } + +impl EventSendPayload for () { + fn to_payload_bytes(self) -> IPCResult> { + Ok(vec![]) + } +} + +#[cfg(feature = "serialize")] +mod serde_payload { + use super::DynamicSerializer; + use crate::payload::EventReceivePayload; + use crate::prelude::{EventSendPayload, IPCResult}; + use byteorder::ReadBytesExt; + use serde::de::DeserializeOwned; + use serde::Serialize; + use std::io::Read; + + /// A payload representing a payload storing serde serialized data + pub struct SerdePayload { + data: T, + serializer: DynamicSerializer, + } + + impl SerdePayload { + /// Creates a new serde payload with a specified serializer + pub fn new(serializer: DynamicSerializer, data: T) -> Self { + Self { serializer, data } + } + + pub fn data(self) -> T { + self.data + } + } + + impl EventSendPayload for SerdePayload + where + T: Serialize, + { + fn to_payload_bytes(self) -> IPCResult> { + let mut buf = Vec::new(); + let mut data_bytes = self.serializer.serialize(self.data)?; + let format_id = self.serializer as u8; + buf.push(format_id); + buf.append(&mut data_bytes); + + Ok(buf) + } + } + + impl EventReceivePayload for SerdePayload + where + T: DeserializeOwned, + { + fn from_payload_bytes(mut reader: R) -> IPCResult { + let format_id = reader.read_u8()?; + let serializer = DynamicSerializer::from_primitive(format_id as usize)?; + let data = serializer.deserialize(reader)?; + + Ok(Self { serializer, data }) + } + } +} + +#[cfg(feature = "serialize")] +pub use serde_payload::*; diff --git a/src/events/payload_serializer/mod.rs b/src/events/payload_serializer/mod.rs index 2abdad40..1348de3f 100644 --- a/src/events/payload_serializer/mod.rs +++ b/src/events/payload_serializer/mod.rs @@ -1,23 +1,158 @@ -#[cfg(feature = "serialize_rmp")] -mod serialize_rmp; +use serde::de::DeserializeOwned; +use serde::Serialize; +use std::io::Read; +use thiserror::Error; #[cfg(feature = "serialize_rmp")] -pub use serialize_rmp::*; +mod serialize_rmp; #[cfg(feature = "serialize_bincode")] mod serialize_bincode; -#[cfg(feature = "serialize_bincode")] -pub use serialize_bincode::*; - #[cfg(feature = "serialize_postcard")] mod serialize_postcard; -#[cfg(feature = "serialize_postcard")] -pub use serialize_postcard::*; - #[cfg(feature = "serialize_json")] mod serialize_json; -#[cfg(feature = "serialize_json")] -pub use serialize_json::*; +pub type SerializationResult = std::result::Result; + +#[derive(Debug, Error)] +pub enum SerializationError { + #[cfg(feature = "serialize_rmp")] + #[error("failed to serialize messagepack payload: {0}")] + SerializeRmp(#[from] rmp_serde::encode::Error), + + #[cfg(feature = "serialize_rmp")] + #[error("failed to deserialize messagepack payload: {0}")] + DeserializeRmp(#[from] rmp_serde::decode::Error), + + #[cfg(feature = "serialize_bincode")] + #[error("failed to de/serialize bincode payload: {0}")] + Bincode(#[from] bincode::Error), + + #[cfg(feature = "serialize_postcard")] + #[error("failed to de/serialize postcard payload: {0}")] + Postcard(#[from] postcard::Error), + + #[cfg(feature = "serialize_json")] + #[error("failed to de/serialize json payload: {0}")] + Json(#[from] serde_json::Error), + + #[error("io error occurred on de/serialization: {0}")] + Io(#[from] std::io::Error), + + #[error("the format {0:?} is not available")] + UnavailableFormat(DynamicSerializer), + + #[error("tried to create serializer for unknown format {0}")] + UnknownFormat(usize), +} + +#[derive(Clone, Debug, Ord, PartialOrd, Eq, PartialEq)] +pub enum DynamicSerializer { + Messagepack, + Bincode, + Postcard, + Json, +} + +impl DynamicSerializer { + pub fn first_available() -> Self { + #[cfg(feature = "serialize_rmp")] + { + Self::Messagepack + } + + #[cfg(all(feature = "serialize_bincode", not(feature = "serialize_rmp")))] + { + Self::Bincode + } + + #[cfg(all( + feature = "serialize_postcard", + not(any(feature = "serialize_rmp", feature = "serialize_bincode")) + ))] + { + Self::Postcard + } + + #[cfg(all( + feature = "serialize_json", + not(any( + feature = "serialize_rmp", + feature = "serialize_bincode", + feature = "serialize_postcard" + )) + ))] + { + Self::Json + } + } + + pub fn from_primitive(num: usize) -> SerializationResult { + match num { + #[cfg(feature = "serialize_rmp")] + 0 => Ok(Self::Messagepack), + + #[cfg(feature = "serialize_bincode")] + 1 => Ok(Self::Bincode), + + #[cfg(feature = "serialize_postcard")] + 2 => Ok(Self::Postcard), + + #[cfg(feature = "serialize_json")] + 3 => Ok(Self::Json), + + n => Err(SerializationError::UnknownFormat(n)), + } + } + + pub fn serialize(&self, data: T) -> SerializationResult> { + match self { + #[cfg(feature = "serialize_rmp")] + DynamicSerializer::Messagepack => serialize_rmp::serialize(data), + + #[cfg(feature = "serialize_bincode")] + DynamicSerializer::Bincode => serialize_bincode::serialize(data), + + #[cfg(feature = "serialize_postcard")] + DynamicSerializer::Postcard => serialize_postcard::serialize(data), + + #[cfg(feature = "serialize_json")] + DynamicSerializer::Json => serialize_json::serialize(data), + + #[cfg(not(all( + feature = "serialize_rmp", + feature = "serialize_bincode", + feature = "serialize_postcard", + feature = "serialize_json" + )))] + _ => Err(SerializationError::UnavailableFormat(self.clone())), + } + } + + pub fn deserialize(&self, reader: R) -> SerializationResult { + match self { + #[cfg(feature = "serialize_rmp")] + DynamicSerializer::Messagepack => serialize_rmp::deserialize(reader), + + #[cfg(feature = "serialize_bincode")] + DynamicSerializer::Bincode => serialize_bincode::deserialize(reader), + + #[cfg(feature = "serialize_postcard")] + DynamicSerializer::Postcard => serialize_postcard::deserialize(reader), + + #[cfg(feature = "serialize_json")] + DynamicSerializer::Json => serialize_json::deserialize(reader), + + #[cfg(not(all( + feature = "serialize_rmp", + feature = "serialize_bincode", + feature = "serialize_postcard", + feature = "serialize_json" + )))] + _ => Err(SerializationError::UnavailableFormat(self.clone())), + } + } +} diff --git a/src/events/payload_serializer/serialize_bincode.rs b/src/events/payload_serializer/serialize_bincode.rs index c6cc9c16..ef9fd8ed 100644 --- a/src/events/payload_serializer/serialize_bincode.rs +++ b/src/events/payload_serializer/serialize_bincode.rs @@ -1,28 +1,15 @@ -use crate::payload::{EventReceivePayload, EventSendPayload}; -use crate::prelude::IPCResult; +use crate::payload::SerializationResult; use serde::de::DeserializeOwned; use serde::Serialize; use std::io::Read; -pub type SerializationError = bincode::Error; +pub fn serialize(data: T) -> SerializationResult> { + let bytes = bincode::serialize(&data)?; -impl EventSendPayload for T -where - T: Serialize, -{ - fn to_payload_bytes(self) -> IPCResult> { - let bytes = bincode::serialize(&self)?; - - Ok(bytes) - } + Ok(bytes) } -impl EventReceivePayload for T -where - T: DeserializeOwned, -{ - fn from_payload_bytes(reader: R) -> IPCResult { - let type_data = bincode::deserialize_from(reader)?; - Ok(type_data) - } +pub fn deserialize(reader: R) -> SerializationResult { + let type_data = bincode::deserialize_from(reader)?; + Ok(type_data) } diff --git a/src/events/payload_serializer/serialize_json.rs b/src/events/payload_serializer/serialize_json.rs index 59484b1e..5355ebc2 100644 --- a/src/events/payload_serializer/serialize_json.rs +++ b/src/events/payload_serializer/serialize_json.rs @@ -1,29 +1,16 @@ -use crate::payload::{EventReceivePayload, EventSendPayload}; -use crate::prelude::IPCResult; +use crate::payload::SerializationResult; use serde::de::DeserializeOwned; use serde::Serialize; use std::io::Read; -pub type SerializationError = serde_json::Error; +pub fn serialize(data: T) -> SerializationResult> { + let bytes = serde_json::to_vec(&data)?; -impl EventSendPayload for T -where - T: Serialize, -{ - fn to_payload_bytes(self) -> IPCResult> { - let bytes = serde_json::to_vec(&self)?; - - Ok(bytes) - } + Ok(bytes) } -impl EventReceivePayload for T -where - T: DeserializeOwned, -{ - fn from_payload_bytes(reader: R) -> IPCResult { - let type_data = serde_json::from_reader(reader)?; +pub fn deserialize(reader: R) -> SerializationResult { + let type_data = serde_json::from_reader(reader)?; - Ok(type_data) - } + Ok(type_data) } diff --git a/src/events/payload_serializer/serialize_postcard.rs b/src/events/payload_serializer/serialize_postcard.rs index 5bd02fc3..9e7aeb3e 100644 --- a/src/events/payload_serializer/serialize_postcard.rs +++ b/src/events/payload_serializer/serialize_postcard.rs @@ -1,32 +1,19 @@ -use crate::payload::{EventReceivePayload, EventSendPayload}; -use crate::prelude::IPCResult; +use crate::payload::SerializationResult; use serde::de::DeserializeOwned; use serde::Serialize; use std::io::Read; -pub type SerializationError = postcard::Error; +pub fn serialize(data: T) -> SerializationResult> { + let bytes = postcard::to_allocvec(&data)?.to_vec(); -impl EventSendPayload for T -where - T: Serialize, -{ - fn to_payload_bytes(self) -> IPCResult> { - let bytes = postcard::to_allocvec(&self)?.to_vec(); - - Ok(bytes) - } + Ok(bytes) } -impl EventReceivePayload for T -where - T: DeserializeOwned, -{ - fn from_payload_bytes(mut reader: R) -> IPCResult { - let mut buf = Vec::new(); - // reading to end means reading the full size of the provided data - reader.read_to_end(&mut buf)?; - let type_data = postcard::from_bytes(&buf)?; +pub fn deserialize(mut reader: R) -> SerializationResult { + let mut buf = Vec::new(); + // reading to end means reading the full size of the provided data + reader.read_to_end(&mut buf)?; + let type_data = postcard::from_bytes(&buf)?; - Ok(type_data) - } + Ok(type_data) } diff --git a/src/events/payload_serializer/serialize_rmp.rs b/src/events/payload_serializer/serialize_rmp.rs index 36d963d6..452d88df 100644 --- a/src/events/payload_serializer/serialize_rmp.rs +++ b/src/events/payload_serializer/serialize_rmp.rs @@ -1,48 +1,15 @@ -use crate::payload::{EventReceivePayload, EventSendPayload}; -use crate::prelude::{IPCError, IPCResult}; +use crate::payload::SerializationResult; use serde::de::DeserializeOwned; use serde::Serialize; use std::io::Read; -use thiserror::Error; -#[derive(Debug, Error)] -pub enum SerializationError { - #[error("failed to serialize with rmp: {0}")] - Serialize(#[from] rmp_serde::encode::Error), +pub fn serialize(data: T) -> SerializationResult> { + let bytes = rmp_serde::to_vec(&data)?; - #[error("failed to deserialize with rmp: {0}")] - Deserialize(#[from] rmp_serde::decode::Error), + Ok(bytes) } -impl From for IPCError { - fn from(e: rmp_serde::decode::Error) -> Self { - IPCError::Serialization(SerializationError::Deserialize(e)) - } -} - -impl From for IPCError { - fn from(e: rmp_serde::encode::Error) -> Self { - IPCError::Serialization(SerializationError::Serialize(e)) - } -} - -impl EventSendPayload for T -where - T: Serialize, -{ - fn to_payload_bytes(self) -> IPCResult> { - let bytes = rmp_serde::to_vec(&self)?; - - Ok(bytes) - } -} - -impl EventReceivePayload for T -where - T: DeserializeOwned, -{ - fn from_payload_bytes(reader: R) -> IPCResult { - let type_data = rmp_serde::from_read(reader)?; - Ok(type_data) - } +pub fn deserialize(reader: R) -> SerializationResult { + let type_data = rmp_serde::from_read(reader)?; + Ok(type_data) } diff --git a/src/ipc/builder.rs b/src/ipc/builder.rs index 856d531f..58810081 100644 --- a/src/ipc/builder.rs +++ b/src/ipc/builder.rs @@ -7,6 +7,8 @@ use crate::ipc::context::{Context, PooledContext, ReplyListeners}; use crate::ipc::server::IPCServer; use crate::namespaces::builder::NamespaceBuilder; use crate::namespaces::namespace::Namespace; +#[cfg(feature = "serialize")] +use crate::payload::DynamicSerializer; use crate::protocol::AsyncStreamProtocolListener; use std::collections::HashMap; use std::future::Future; @@ -57,6 +59,8 @@ pub struct IPCBuilder { namespaces: HashMap, data: TypeMap, timeout: Duration, + #[cfg(feature = "serialize")] + default_serializer: DynamicSerializer, } impl IPCBuilder @@ -67,7 +71,7 @@ where let mut handler = EventHandler::new(); handler.on(ERROR_EVENT_NAME, |_, event| { Box::pin(async move { - let error_data = event.data::()?; + let error_data = event.payload::()?; tracing::warn!(error_data.code); tracing::warn!("error_data.message = '{}'", error_data.message); @@ -80,6 +84,8 @@ where namespaces: HashMap::new(), data: TypeMap::new(), timeout: Duration::from_secs(60), + #[cfg(feature = "serialize")] + default_serializer: DynamicSerializer::first_available(), } } @@ -132,6 +138,15 @@ where self } + #[cfg(feature = "serialize")] + /// Sets the default serializer used for rust types that implement + /// serdes Serialize or Deserialize + pub fn default_serializer(mut self, serializer: DynamicSerializer) -> Self { + self.default_serializer = serializer; + + self + } + /// Builds an ipc server #[tracing::instrument(skip(self))] pub async fn build_server(self) -> Result<()> { @@ -141,6 +156,9 @@ where handler: self.handler, data: self.data, timeout: self.timeout, + + #[cfg(feature = "serialize")] + default_serializer: self.default_serializer, }; server.start::(self.address.unwrap()).await?; @@ -153,12 +171,15 @@ where self.validate()?; let data = Arc::new(RwLock::new(self.data)); let reply_listeners = ReplyListeners::default(); + let client = IPCClient { namespaces: self.namespaces, handler: self.handler, data, reply_listeners, timeout: self.timeout, + #[cfg(feature = "serialize")] + default_serializer: self.default_serializer, }; let ctx = client.connect::(self.address.unwrap()).await?; @@ -188,6 +209,9 @@ where data: Arc::clone(&data), reply_listeners: Arc::clone(&reply_listeners), timeout: self.timeout.clone(), + + #[cfg(feature = "serialize")] + default_serializer: self.default_serializer.clone(), }; let ctx = client.connect::(address.clone()).await?; diff --git a/src/ipc/client.rs b/src/ipc/client.rs index 51aeaf23..157d16f8 100644 --- a/src/ipc/client.rs +++ b/src/ipc/client.rs @@ -12,6 +12,9 @@ use tokio::sync::oneshot; use tokio::sync::RwLock; use typemap_rev::TypeMap; +#[cfg(feature = "serialize")] +use crate::payload::DynamicSerializer; + /// The IPC Client to connect to an IPC Server. /// Use the [IPCBuilder](crate::builder::IPCBuilder) to create the client. /// Usually one does not need to use the IPCClient object directly. @@ -22,6 +25,9 @@ pub struct IPCClient { pub(crate) data: Arc>, pub(crate) reply_listeners: ReplyListeners, pub(crate) timeout: Duration, + + #[cfg(feature = "serialize")] + pub(crate) default_serializer: DynamicSerializer, } impl IPCClient { @@ -34,8 +40,20 @@ impl IPCClient { ) -> Result { let stream = S::protocol_connect(address).await?; let (read_half, write_half) = stream.protocol_into_split(); + let emitter = StreamEmitter::new::(write_half); + let (tx, rx) = oneshot::channel(); + #[cfg(feature = "serialize")] + let ctx = Context::new( + StreamEmitter::clone(&emitter), + self.data, + Some(tx), + self.reply_listeners, + self.timeout, + self.default_serializer, + ); + #[cfg(not(feature = "serialize"))] let ctx = Context::new( StreamEmitter::clone(&emitter), self.data, diff --git a/src/ipc/context.rs b/src/ipc/context.rs index 4e34fae9..eb8af95d 100644 --- a/src/ipc/context.rs +++ b/src/ipc/context.rs @@ -3,6 +3,8 @@ use crate::event::Event; use crate::ipc::stream_emitter::StreamEmitter; use futures::future; use futures::future::Either; +#[cfg(feature = "serialize")] +use serde::Serialize; use std::collections::HashMap; use std::mem; use std::ops::{Deref, DerefMut}; @@ -13,6 +15,9 @@ use tokio::sync::{oneshot, Mutex, RwLock}; use tokio::time::Duration; use typemap_rev::TypeMap; +#[cfg(feature = "serialize")] +use crate::payload::{DynamicSerializer, SerdePayload}; + pub(crate) type ReplyListeners = Arc>>>; /// An object provided to each callback function. @@ -40,6 +45,9 @@ pub struct Context { reply_listeners: ReplyListeners, reply_timeout: Duration, + + #[cfg(feature = "serialize")] + default_serializer: DynamicSerializer, } impl Context { @@ -49,6 +57,7 @@ impl Context { stop_sender: Option>, reply_listeners: ReplyListeners, reply_timeout: Duration, + #[cfg(feature = "serialize")] default_serializer: DynamicSerializer, ) -> Self { Self { emitter, @@ -56,6 +65,8 @@ impl Context { data, stop_sender: Arc::new(Mutex::new(stop_sender)), reply_timeout, + #[cfg(feature = "serialize")] + default_serializer, } } @@ -96,6 +107,11 @@ impl Context { Ok(()) } + #[cfg(feature = "serialize")] + pub fn create_serde_payload(&self, data: T) -> SerdePayload { + SerdePayload::new(self.default_serializer.clone(), data) + } + /// Returns the channel for a reply to the given message id pub(crate) async fn get_reply_sender(&self, ref_id: u64) -> Option> { let mut listeners = self.reply_listeners.lock().await; diff --git a/src/ipc/server.rs b/src/ipc/server.rs index 53526e68..a415b238 100644 --- a/src/ipc/server.rs +++ b/src/ipc/server.rs @@ -4,6 +4,10 @@ use crate::events::event_handler::EventHandler; use crate::ipc::context::{Context, ReplyListeners}; use crate::ipc::stream_emitter::StreamEmitter; use crate::namespaces::namespace::Namespace; + +#[cfg(feature = "serialize")] +use crate::payload::DynamicSerializer; + use crate::protocol::{AsyncProtocolStreamSplit, AsyncStreamProtocolListener}; use std::collections::HashMap; use std::sync::Arc; @@ -19,6 +23,9 @@ pub struct IPCServer { pub(crate) namespaces: HashMap, pub(crate) data: TypeMap, pub(crate) timeout: Duration, + + #[cfg(feature = "serialize")] + pub(crate) default_serializer: DynamicSerializer, } impl IPCServer { @@ -41,18 +48,27 @@ impl IPCServer { let namespaces = Arc::clone(&namespaces); let data = Arc::clone(&data); let timeout = self.timeout.clone(); + #[cfg(feature = "serialize")] + let default_serializer = self.default_serializer.clone(); tokio::spawn(async move { let (read_half, write_half) = stream.protocol_into_split(); + let emitter = StreamEmitter::new::(write_half); + let reply_listeners = ReplyListeners::default(); + + #[cfg(feature = "serialize")] let ctx = Context::new( - StreamEmitter::clone(&emitter), + emitter, data, None, reply_listeners, timeout.into(), + default_serializer.clone(), ); + #[cfg(not(feature = "serialize"))] + let ctx = Context::new(emitter, data, None, reply_listeners, timeout.into()); handle_connection::(namespaces, handler, read_half, ctx).await; }); diff --git a/src/ipc/stream_emitter.rs b/src/ipc/stream_emitter.rs index 1f95358d..7cbd232e 100644 --- a/src/ipc/stream_emitter.rs +++ b/src/ipc/stream_emitter.rs @@ -31,16 +31,14 @@ impl StreamEmitter { } } - #[tracing::instrument(level = "trace", skip(self, data))] - pub async fn _emit( + #[tracing::instrument(level = "trace", skip(self, data_bytes))] + pub async fn _emit( &self, namespace: Option<&str>, event: &str, - data: T, + data_bytes: Vec, res_id: Option, ) -> Result { - let data_bytes = data.to_payload_bytes()?; - let event = if let Some(namespace) = namespace { Event::with_namespace(namespace.to_string(), event.to_string(), data_bytes, res_id) } else { @@ -63,9 +61,10 @@ impl StreamEmitter { pub async fn emit, T: EventSendPayload>( &self, event: S, - data: T, + payload: T, ) -> Result { - self._emit(None, event.as_ref(), data, None).await + self._emit(None, event.as_ref(), payload.to_payload_bytes()?, None) + .await } /// Emits an event to a specific namespace @@ -73,10 +72,15 @@ impl StreamEmitter { &self, namespace: S1, event: S2, - data: T, + payload: T, ) -> Result { - self._emit(Some(namespace.as_ref()), event.as_ref(), data, None) - .await + self._emit( + Some(namespace.as_ref()), + event.as_ref(), + payload.to_payload_bytes()?, + None, + ) + .await } /// Emits a response to an event @@ -84,9 +88,15 @@ impl StreamEmitter { &self, event_id: u64, event: S, - data: T, + payload: T, ) -> Result { - self._emit(None, event.as_ref(), data, Some(event_id)).await + self._emit( + None, + event.as_ref(), + payload.to_payload_bytes()?, + Some(event_id), + ) + .await } /// Emits a response to an event to a namespace @@ -95,12 +105,12 @@ impl StreamEmitter { event_id: u64, namespace: S1, event: S2, - data: T, + payload: T, ) -> Result { self._emit( Some(namespace.as_ref()), event.as_ref(), - data, + payload.to_payload_bytes()?, Some(event_id), ) .await @@ -128,7 +138,7 @@ impl EmitMetadata { pub async fn await_reply(&self, ctx: &Context) -> Result { let reply = ctx.await_reply(self.message_id).await?; if reply.name() == ERROR_EVENT_NAME { - Err(reply.data::()?.into()) + Err(reply.payload::()?.into()) } else { Ok(reply) } diff --git a/src/lib.rs b/src/lib.rs index 4639d7dc..165322c0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -110,43 +110,7 @@ feature = "serialize_json" )) ))] -compile_error!("Feature 'serialize' cannot be used by its own. Choose one of 'serialize_rmp', 'serialize_bincode', 'serialize_postcard' instead."); - -#[cfg(any( - all( - feature = "serialize_rmp", - any( - feature = "serialize_postcard", - feature = "serialize_bincode", - feature = "serialize_json" - ) - ), - all( - feature = "serialize_bincode", - any( - feature = "serialize_rmp", - feature = "serialize_postcard", - feature = "serialize_json" - ) - ), - all( - feature = "serialize_postcard", - any( - feature = "serialize_rmp", - feature = "serialize_bincode", - feature = "serialize_json" - ) - ), - all( - feature = "serialize_json", - any( - feature = "serialize_rmp", - feature = "serialize_bincode", - feature = "serialize_postcard" - ) - ) -))] -compile_error!("You cannot use two serialize_* features at the same time"); +compile_error!("Feature 'serialize' cannot be used by its own. Choose one of 'serialize_rmp', 'serialize_bincode', 'serialize_postcard', 'serialize_json instead."); pub mod error; mod events; diff --git a/tests/test_events_with_payload.rs b/tests/test_events_with_payload.rs index dc0e2f35..5eaddafc 100644 --- a/tests/test_events_with_payload.rs +++ b/tests/test_events_with_payload.rs @@ -12,17 +12,14 @@ use utils::protocol::*; async fn it_sends_payloads() { let port = get_free_port(); let ctx = get_client_with_server(port).await; + let payload = SimplePayload { + number: 0, + string: String::from("Hello World"), + }; + #[cfg(feature = "serialize")] + let payload = ctx.create_serde_payload(payload); - ctx.emitter - .emit( - "ping", - SimplePayload { - number: 0, - string: String::from("Hello World"), - }, - ) - .await - .unwrap(); + ctx.emitter.emit("ping", payload).await.unwrap(); // wait for the event to be handled tokio::time::sleep(Duration::from_millis(10)).await; @@ -37,21 +34,27 @@ async fn it_sends_payloads() { async fn it_receives_payloads() { let port = get_free_port(); let ctx = get_client_with_server(port).await; + let payload = SimplePayload { + number: 0, + string: String::from("Hello World"), + }; + #[cfg(feature = "serialize")] + let payload = ctx.create_serde_payload(payload); + let reply = ctx .emitter - .emit( - "ping", - SimplePayload { - number: 0, - string: String::from("Hello World"), - }, - ) + .emit("ping", payload) .await .unwrap() .await_reply(&ctx) .await .unwrap(); - let reply_payload = reply.data::().unwrap(); + #[cfg(not(feature = "serialize"))] + let reply_payload = reply.payload::().unwrap(); + + #[cfg(feature = "serialize")] + let reply_payload = reply.serde_payload::().unwrap(); + let counters = get_counter_from_context(&ctx).await; assert_eq!(counters.get("ping").await, 1); @@ -73,21 +76,41 @@ fn get_builder(port: u8) -> IPCBuilder { async fn handle_ping_event(ctx: &Context, event: Event) -> IPCResult<()> { increment_counter_for_event(ctx, &event).await; - let payload = event.data::()?; - ctx.emitter - .emit_response(event.id(), "pong", payload) - .await?; + let payload = get_simple_payload(&event)?; + #[cfg(feature = "serialize")] + { + ctx.emitter + .emit_response(event.id(), "pong", ctx.create_serde_payload(payload)) + .await?; + } + #[cfg(not(feature = "serialize"))] + { + ctx.emitter + .emit_response(event.id(), "pong", payload) + .await?; + } Ok(()) } async fn handle_pong_event(ctx: &Context, event: Event) -> IPCResult<()> { increment_counter_for_event(ctx, &event).await; - let _payload = event.data::()?; + let _payload = get_simple_payload(&event)?; Ok(()) } +fn get_simple_payload(event: &Event) -> IPCResult { + #[cfg(feature = "serialize")] + { + event.serde_payload::() + } + #[cfg(not(feature = "serialize"))] + { + event.payload::() + } +} + #[cfg(feature = "serialize")] mod payload_impl { use serde::{Deserialize, Serialize}; From fb62135f86bd8e8656f1c8800831a85432173b98 Mon Sep 17 00:00:00 2001 From: trivernis Date: Sun, 5 Dec 2021 17:21:54 +0100 Subject: [PATCH 3/6] Add tests for all payloads Signed-off-by: trivernis --- src/events/payload.rs | 12 ++++++ tests/test_serialization.rs | 76 +++++++++++++++++++++++++++++++++++++ 2 files changed, 88 insertions(+) create mode 100644 tests/test_serialization.rs diff --git a/src/events/payload.rs b/src/events/payload.rs index a663f5d3..77bc6d65 100644 --- a/src/events/payload.rs +++ b/src/events/payload.rs @@ -146,6 +146,18 @@ mod serde_payload { } } + impl Clone for SerdePayload + where + T: Clone, + { + fn clone(&self) -> Self { + Self { + serializer: self.serializer.clone(), + data: self.data.clone(), + } + } + } + impl EventSendPayload for SerdePayload where T: Serialize, diff --git a/tests/test_serialization.rs b/tests/test_serialization.rs new file mode 100644 index 00000000..bdf264ac --- /dev/null +++ b/tests/test_serialization.rs @@ -0,0 +1,76 @@ +use bromine::prelude::*; + +#[cfg(feature = "serialize_rmp")] +#[test] +fn it_serializes_messagepack() { + test_serialization(DynamicSerializer::Messagepack) +} + +#[cfg(feature = "serialize_bincode")] +#[test] +fn it_serializes_bincode() { + test_serialization(DynamicSerializer::Bincode) +} + +#[cfg(feature = "serialize_postcard")] +#[test] +fn it_serializes_postcard() { + test_serialization(DynamicSerializer::Postcard) +} + +#[cfg(feature = "serialize_json")] +#[test] +fn it_serializes_json() { + test_serialization(DynamicSerializer::Json) +} + +#[cfg(feature = "serialize")] +fn test_serialization(serializer: DynamicSerializer) { + let test_payload = get_test_payload(serializer); + let payload_bytes = test_payload.clone().to_payload_bytes().unwrap(); + let payload = TestSerdePayload::from_payload_bytes(&payload_bytes[..]).unwrap(); + assert_eq!(payload.data(), test_payload.data()) +} + +#[cfg(feature = "serialize")] +pub mod payload { + use bromine::payload::{DynamicSerializer, SerdePayload}; + use serde::{Deserialize, Serialize}; + use std::collections::HashMap; + + pub type TestSerdePayload = SerdePayload; + + #[derive(Clone, Serialize, Deserialize, Eq, PartialEq, Debug)] + pub struct TestPayload { + items: Vec, + variant: TestPayloadEnum, + string: String, + signed: i32, + maps: HashMap, + } + + #[derive(Clone, Serialize, Deserialize, Eq, PartialEq, Debug)] + pub enum TestPayloadEnum { + First, + Second, + Third(usize), + } + + pub fn get_test_payload(serializer: DynamicSerializer) -> SerdePayload { + let mut maps = HashMap::new(); + maps.insert("Hello".to_string(), 12); + + maps.insert("Wäüörld".to_string(), -12380); + let inner_payload = TestPayload { + items: vec![0u128, 12452u128, u128::MAX], + variant: TestPayloadEnum::Third(12), + string: String::from("Hello World ſð"), + signed: -12, + maps, + }; + + SerdePayload::new(serializer, inner_payload) + } +} +#[cfg(feature = "serialize")] +pub use payload::*; From e485a81c4cca7ae849ecce8b2e3dd2f59a42f817 Mon Sep 17 00:00:00 2001 From: trivernis Date: Sun, 5 Dec 2021 17:22:40 +0100 Subject: [PATCH 4/6] Update github test action Signed-off-by: trivernis --- .github/workflows/build.yml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 0a2ff894..1911d2de 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -45,4 +45,7 @@ jobs: run: cargo test --verbose --all --features serialize_postcard - name: Run json serialization tests - run: cargo test --verbose --all --features serialize_json \ No newline at end of file + run: cargo test --verbose --all --features serialize_json + + - name: Run all serialization tests + run: cargo test --verbose --all --all-features \ No newline at end of file From 639db8fc870c7d90ff8d58fe1f5da00a109c1aca Mon Sep 17 00:00:00 2001 From: trivernis Date: Sun, 5 Dec 2021 17:27:59 +0100 Subject: [PATCH 5/6] Update tests to be supported by messagepack Signed-off-by: trivernis --- tests/test_serialization.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_serialization.rs b/tests/test_serialization.rs index bdf264ac..b2a1c7c7 100644 --- a/tests/test_serialization.rs +++ b/tests/test_serialization.rs @@ -42,7 +42,7 @@ pub mod payload { #[derive(Clone, Serialize, Deserialize, Eq, PartialEq, Debug)] pub struct TestPayload { - items: Vec, + items: Vec, variant: TestPayloadEnum, string: String, signed: i32, @@ -62,7 +62,7 @@ pub mod payload { maps.insert("Wäüörld".to_string(), -12380); let inner_payload = TestPayload { - items: vec![0u128, 12452u128, u128::MAX], + items: vec![0u64, 12452u64, u64::MAX], variant: TestPayloadEnum::Third(12), string: String::from("Hello World ſð"), signed: -12, From 4f1884f2afee2416e4265a497cbc2aef63b09379 Mon Sep 17 00:00:00 2001 From: trivernis Date: Sun, 5 Dec 2021 17:59:23 +0100 Subject: [PATCH 6/6] Add specification Signed-off-by: trivernis --- SPECIFICATON.md | 94 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 94 insertions(+) create mode 100644 SPECIFICATON.md diff --git a/SPECIFICATON.md b/SPECIFICATON.md new file mode 100644 index 00000000..73401fcd --- /dev/null +++ b/SPECIFICATON.md @@ -0,0 +1,94 @@ +# Specification + +This specification is split into two parts. The first one explaining how each type is represented in binary form and the +second one specifying the behaviour of event passing and handling. + +## Binary Representation of Events + +Events store contain two types of data. The header data with the id, name, namespace and referenced event id and the +payload with the associated data. Both the header and the payload have a dynamic length and should be extendable while +ensuring backwards compatibility. The binary layout looks like this: + +| Name | Type | Length (bytes) | Description | +|---------------|-----------|----------------------------|------------------------------------| +| total_length | `u64` | 8 | total length of the event in bytes | +| header_length | `u16` | 2 | length of the event header | +| header | `Header` | header_length | the header of the event | +| data | `Vec` | total_length - data_length | the payload of the event | + +### Header + +| Name | Type | Length (bytes) | Description | +|------------------|----------|------------------|------------------------------------------------------| +| format_version | `[u8]` | 3 | version of the specification | +| id | `u64` | 8 | id of the event | +| ref_id_exists | `u8` | 1 | 0xFF indicates that a ref id exists and must be read | +| ref_id | `u64` | 8 | ref id. only when the indicator is 0xFF | +| namespace_length | `u16` | 2 | length of the namespace. 0 means there's none | +| namespace | `String` | namespace_length | namespace of the event | +| name_length | `u16` | 2 | length of the event name | +| name | `String` | name_length | name of the event | + +The header format ensures that it can be read without knowing its length. +That means that a valid header can be deserialized even if the length of the header bytes +is longer. Additional header fields can therefore be appended without having to worry about +backwards compatibility of the format. + + +## Binary Representation of Special Payloads + +### Raw Payload + +The raw payload is a `Vec` and written as is without serialization or deserialization. + + +### Tandem Payload + +The tandem payload contains two inner payloads which can be serialized and deserialized +independently. +Its layout is as follows: + +| Name | Type | Length (bytes) | Description | +|-----------------|-------|-----------------|------------------------------| +| payload1_length | `u64` | 8 | length of the first payload | +| payload1 | `T1` | payload1_length | the first payload | +| payload2_length | `u64` | 8 | length of the second payload | +| payload2 | `T2` | payload2_length | the second payload | + + +### Serde Payload + +The serde payload stores an encoded payload with additional information about the format +the data was serialized as. + +| Name | Type | Length (bytes) | Description | +|-----------|------|----------------|------------------------------------------| +| format_id | `u8` | 1 | the format the payload was serialized as | +| payload | `T` | ~ | the serialized payload | + + +## Behaviour + +### Receiving events + +When receiving an event the handler registered for the name of the event is called. +The event will be ignored if no handler is registered. + + +### Receiving namespaced events + +Namespaced events are handled similar to regular event handling. Instead of searching +for a handler that handles the event with the given name, first the namespace for the +event is retrieved. On the namespace the handler registered for that specific event is called. +If no namespace for the event namespace is registered or no handler is registered for +the event name, the event will be ignored. + + +### Receiving answers to emitted events + +When emitting an event to a peer, the emitter can wait for an answer to that event. +This is achieved by emitting events as a response to a specific event id. +When an event with a reference event id (ref_id) is received, first the registry is +searched for handlers waiting for a response (by trying to receive from a channel). +If a handler can be found, the event is passed to the handler waiting for the response. +Otherwise, the event will be processed as a regular event. \ No newline at end of file