Merge pull request #26 from Trivernis/develop

Rewrite format handling
pull/32/head
Julius Riegel 3 years ago committed by GitHub
commit e3860618f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -45,4 +45,7 @@ jobs:
run: cargo test --verbose --all --features serialize_postcard
- name: Run json serialization tests
run: cargo test --verbose --all --features serialize_json
run: cargo test --verbose --all --features serialize_json
- name: Run all serialization tests
run: cargo test --verbose --all --all-features

2
Cargo.lock generated

@ -93,7 +93,7 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bromine"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"async-trait",
"bincode",

@ -1,6 +1,6 @@
[package]
name = "bromine"
version = "0.14.0"
version = "0.15.0"
authors = ["trivernis <trivernis@protonmail.com>"]
edition = "2018"
readme = "README.md"

@ -0,0 +1,94 @@
# Specification
This specification is split into two parts. The first one explaining how each type is represented in binary form and the
second one specifying the behaviour of event passing and handling.
## Binary Representation of Events
Events store contain two types of data. The header data with the id, name, namespace and referenced event id and the
payload with the associated data. Both the header and the payload have a dynamic length and should be extendable while
ensuring backwards compatibility. The binary layout looks like this:
| Name | Type | Length (bytes) | Description |
|---------------|-----------|----------------------------|------------------------------------|
| total_length | `u64` | 8 | total length of the event in bytes |
| header_length | `u16` | 2 | length of the event header |
| header | `Header` | header_length | the header of the event |
| data | `Vec<u8>` | total_length - data_length | the payload of the event |
### Header
| Name | Type | Length (bytes) | Description |
|------------------|----------|------------------|------------------------------------------------------|
| format_version | `[u8]` | 3 | version of the specification |
| id | `u64` | 8 | id of the event |
| ref_id_exists | `u8` | 1 | 0xFF indicates that a ref id exists and must be read |
| ref_id | `u64` | 8 | ref id. only when the indicator is 0xFF |
| namespace_length | `u16` | 2 | length of the namespace. 0 means there's none |
| namespace | `String` | namespace_length | namespace of the event |
| name_length | `u16` | 2 | length of the event name |
| name | `String` | name_length | name of the event |
The header format ensures that it can be read without knowing its length.
That means that a valid header can be deserialized even if the length of the header bytes
is longer. Additional header fields can therefore be appended without having to worry about
backwards compatibility of the format.
## Binary Representation of Special Payloads
### Raw Payload
The raw payload is a `Vec<u8>` and written as is without serialization or deserialization.
### Tandem Payload
The tandem payload contains two inner payloads which can be serialized and deserialized
independently.
Its layout is as follows:
| Name | Type | Length (bytes) | Description |
|-----------------|-------|-----------------|------------------------------|
| payload1_length | `u64` | 8 | length of the first payload |
| payload1 | `T1` | payload1_length | the first payload |
| payload2_length | `u64` | 8 | length of the second payload |
| payload2 | `T2` | payload2_length | the second payload |
### Serde Payload
The serde payload stores an encoded payload with additional information about the format
the data was serialized as.
| Name | Type | Length (bytes) | Description |
|-----------|------|----------------|------------------------------------------|
| format_id | `u8` | 1 | the format the payload was serialized as |
| payload | `T` | ~ | the serialized payload |
## Behaviour
### Receiving events
When receiving an event the handler registered for the name of the event is called.
The event will be ignored if no handler is registered.
### Receiving namespaced events
Namespaced events are handled similar to regular event handling. Instead of searching
for a handler that handles the event with the given name, first the namespace for the
event is retrieved. On the namespace the handler registered for that specific event is called.
If no namespace for the event namespace is registered or no handler is registered for
the event name, the event will be ignored.
### Receiving answers to emitted events
When emitting an event to a peer, the emitter can wait for an answer to that event.
This is achieved by emitting events as a response to a specific event id.
When an event with a reference event id (ref_id) is received, first the registry is
searched for handlers waiting for a response (by trying to receive from a channel).
If a handler can be found, the event is passed to the handler waiting for the response.
Otherwise, the event will be processed as a regular event.

@ -33,6 +33,20 @@ pub enum Error {
#[error("timed out")]
Timeout,
#[error("Unsupported API Version {0}")]
UnsupportedVersion(String),
}
impl Error {
pub fn unsupported_version_vec(version: Vec<u8>) -> Self {
let mut version_string = version
.into_iter()
.fold(String::new(), |acc, val| format!("{}{}.", acc, val));
version_string.pop();
Self::UnsupportedVersion(version_string)
}
}
impl From<String> for Error {

@ -1,9 +1,18 @@
use crate::error::{Error, Result};
use crate::events::generate_event_id;
use crate::events::payload::EventReceivePayload;
#[cfg(feature = "serialize")]
use crate::payload::SerdePayload;
use crate::prelude::{IPCError, IPCResult};
use byteorder::{BigEndian, ReadBytesExt};
#[cfg(feature = "serialize")]
use serde::de::DeserializeOwned;
use std::fmt::Debug;
use std::io::{Cursor, Read};
use tokio::io::{AsyncRead, AsyncReadExt};
pub const FORMAT_VERSION: [u8; 3] = [0, 9, 0];
/// A container representing an event and underlying binary data.
/// The data can be decoded into an object representation or read
/// as raw binary data.
@ -62,12 +71,21 @@ impl Event {
self.header.ref_id.clone()
}
/// Decodes the data to the given type
/// Decodes the payload to the given type implementing the receive payload trait
#[tracing::instrument(level = "trace", skip(self))]
pub fn payload<T: EventReceivePayload>(&self) -> Result<T> {
let payload = T::from_payload_bytes(&self.data[..])?;
Ok(payload)
}
#[cfg(feature = "serialize")]
/// Decodes the payload to the given type implementing DeserializeOwned
#[tracing::instrument(level = "trace", skip(self))]
pub fn data<T: EventReceivePayload>(&self) -> Result<T> {
let data = T::from_payload_bytes(&self.data[..])?;
pub fn serde_payload<T: DeserializeOwned>(&self) -> Result<T> {
let payload = SerdePayload::<T>::from_payload_bytes(&self.data[..])?;
Ok(data)
Ok(payload.data())
}
/// Returns a reference of the underlying data
@ -93,7 +111,10 @@ impl Event {
let data_length = total_length - header_length as u64;
tracing::trace!(total_length, header_length, data_length);
let header: EventHeader = EventHeader::from_async_read(reader).await?;
let mut header_bytes = vec![0u8; header_length as usize];
reader.read_exact(&mut header_bytes).await?;
// additional header fields can be added a the end because when reading they will just be ignored
let header: EventHeader = EventHeader::from_read(&mut Cursor::new(header_bytes))?;
let mut data = vec![0u8; data_length as usize];
reader.read_exact(&mut data).await?;
@ -124,7 +145,7 @@ impl Event {
impl EventHeader {
/// Serializes the event header into bytes
pub fn into_bytes(self) -> Vec<u8> {
let mut buf = Vec::new();
let mut buf = FORMAT_VERSION.to_vec();
buf.append(&mut self.id.to_be_bytes().to_vec());
if let Some(ref_id) = self.ref_id {
@ -148,33 +169,67 @@ impl EventHeader {
}
/// Parses an event header from an async reader
pub async fn from_async_read<R: AsyncRead + Unpin>(reader: &mut R) -> Result<Self> {
let id = reader.read_u64().await?;
let ref_id_exists = reader.read_u8().await?;
pub fn from_read<R: Read>(reader: &mut R) -> Result<Self> {
Self::read_version(reader)?;
let id = reader.read_u64::<BigEndian>()?;
let ref_id = Self::read_ref_id(reader)?;
let namespace_len = reader.read_u16::<BigEndian>()?;
let namespace = Self::read_namespace(reader, namespace_len)?;
let name = Self::read_name(reader)?;
Ok(Self {
id,
ref_id,
namespace,
name,
})
}
/// Reads and validates the format version
fn read_version<R: Read>(reader: &mut R) -> IPCResult<Vec<u8>> {
let mut version = vec![0u8; 3];
reader.read_exact(&mut version)?;
if version[0] != FORMAT_VERSION[0] {
return Err(IPCError::unsupported_version_vec(version));
}
Ok(version)
}
/// Reads the reference event id
fn read_ref_id<R: Read>(reader: &mut R) -> IPCResult<Option<u64>> {
let ref_id_exists = reader.read_u8()?;
let ref_id = match ref_id_exists {
0x00 => None,
0xFF => Some(reader.read_u64().await?),
0xFF => Some(reader.read_u64::<BigEndian>()?),
_ => return Err(Error::CorruptedEvent),
};
let namespace_len = reader.read_u16().await?;
Ok(ref_id)
}
/// Reads the name of the event
fn read_name<R: Read>(reader: &mut R) -> IPCResult<String> {
let name_len = reader.read_u16::<BigEndian>()?;
Self::read_string(reader, name_len as usize)
}
/// Reads the namespace of the event
fn read_namespace<R: Read>(reader: &mut R, namespace_len: u16) -> IPCResult<Option<String>> {
let namespace = if namespace_len > 0 {
let mut namespace_buf = vec![0u8; namespace_len as usize];
reader.read_exact(&mut namespace_buf).await?;
Some(String::from_utf8(namespace_buf).map_err(|_| Error::CorruptedEvent)?)
Some(Self::read_string(reader, namespace_len as usize)?)
} else {
None
};
let name_len = reader.read_u16().await?;
let mut name_buf = vec![0u8; name_len as usize];
reader.read_exact(&mut name_buf).await?;
let name = String::from_utf8(name_buf).map_err(|_| Error::CorruptedEvent)?;
Ok(Self {
id,
ref_id,
namespace,
name,
})
Ok(namespace)
}
fn read_string<R: Read>(reader: &mut R, length: usize) -> IPCResult<String> {
let mut string_buf = vec![0u8; length];
reader.read_exact(&mut string_buf)?;
String::from_utf8(string_buf).map_err(|_| Error::CorruptedEvent)
}
}

@ -112,3 +112,80 @@ where
})
}
}
impl EventSendPayload for () {
fn to_payload_bytes(self) -> IPCResult<Vec<u8>> {
Ok(vec![])
}
}
#[cfg(feature = "serialize")]
mod serde_payload {
use super::DynamicSerializer;
use crate::payload::EventReceivePayload;
use crate::prelude::{EventSendPayload, IPCResult};
use byteorder::ReadBytesExt;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::io::Read;
/// A payload representing a payload storing serde serialized data
pub struct SerdePayload<T> {
data: T,
serializer: DynamicSerializer,
}
impl<T> SerdePayload<T> {
/// Creates a new serde payload with a specified serializer
pub fn new(serializer: DynamicSerializer, data: T) -> Self {
Self { serializer, data }
}
pub fn data(self) -> T {
self.data
}
}
impl<T> Clone for SerdePayload<T>
where
T: Clone,
{
fn clone(&self) -> Self {
Self {
serializer: self.serializer.clone(),
data: self.data.clone(),
}
}
}
impl<T> EventSendPayload for SerdePayload<T>
where
T: Serialize,
{
fn to_payload_bytes(self) -> IPCResult<Vec<u8>> {
let mut buf = Vec::new();
let mut data_bytes = self.serializer.serialize(self.data)?;
let format_id = self.serializer as u8;
buf.push(format_id);
buf.append(&mut data_bytes);
Ok(buf)
}
}
impl<T> EventReceivePayload for SerdePayload<T>
where
T: DeserializeOwned,
{
fn from_payload_bytes<R: Read>(mut reader: R) -> IPCResult<Self> {
let format_id = reader.read_u8()?;
let serializer = DynamicSerializer::from_primitive(format_id as usize)?;
let data = serializer.deserialize(reader)?;
Ok(Self { serializer, data })
}
}
}
#[cfg(feature = "serialize")]
pub use serde_payload::*;

@ -1,23 +1,158 @@
#[cfg(feature = "serialize_rmp")]
mod serialize_rmp;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::io::Read;
use thiserror::Error;
#[cfg(feature = "serialize_rmp")]
pub use serialize_rmp::*;
mod serialize_rmp;
#[cfg(feature = "serialize_bincode")]
mod serialize_bincode;
#[cfg(feature = "serialize_bincode")]
pub use serialize_bincode::*;
#[cfg(feature = "serialize_postcard")]
mod serialize_postcard;
#[cfg(feature = "serialize_postcard")]
pub use serialize_postcard::*;
#[cfg(feature = "serialize_json")]
mod serialize_json;
#[cfg(feature = "serialize_json")]
pub use serialize_json::*;
pub type SerializationResult<T> = std::result::Result<T, SerializationError>;
#[derive(Debug, Error)]
pub enum SerializationError {
#[cfg(feature = "serialize_rmp")]
#[error("failed to serialize messagepack payload: {0}")]
SerializeRmp(#[from] rmp_serde::encode::Error),
#[cfg(feature = "serialize_rmp")]
#[error("failed to deserialize messagepack payload: {0}")]
DeserializeRmp(#[from] rmp_serde::decode::Error),
#[cfg(feature = "serialize_bincode")]
#[error("failed to de/serialize bincode payload: {0}")]
Bincode(#[from] bincode::Error),
#[cfg(feature = "serialize_postcard")]
#[error("failed to de/serialize postcard payload: {0}")]
Postcard(#[from] postcard::Error),
#[cfg(feature = "serialize_json")]
#[error("failed to de/serialize json payload: {0}")]
Json(#[from] serde_json::Error),
#[error("io error occurred on de/serialization: {0}")]
Io(#[from] std::io::Error),
#[error("the format {0:?} is not available")]
UnavailableFormat(DynamicSerializer),
#[error("tried to create serializer for unknown format {0}")]
UnknownFormat(usize),
}
#[derive(Clone, Debug, Ord, PartialOrd, Eq, PartialEq)]
pub enum DynamicSerializer {
Messagepack,
Bincode,
Postcard,
Json,
}
impl DynamicSerializer {
pub fn first_available() -> Self {
#[cfg(feature = "serialize_rmp")]
{
Self::Messagepack
}
#[cfg(all(feature = "serialize_bincode", not(feature = "serialize_rmp")))]
{
Self::Bincode
}
#[cfg(all(
feature = "serialize_postcard",
not(any(feature = "serialize_rmp", feature = "serialize_bincode"))
))]
{
Self::Postcard
}
#[cfg(all(
feature = "serialize_json",
not(any(
feature = "serialize_rmp",
feature = "serialize_bincode",
feature = "serialize_postcard"
))
))]
{
Self::Json
}
}
pub fn from_primitive(num: usize) -> SerializationResult<Self> {
match num {
#[cfg(feature = "serialize_rmp")]
0 => Ok(Self::Messagepack),
#[cfg(feature = "serialize_bincode")]
1 => Ok(Self::Bincode),
#[cfg(feature = "serialize_postcard")]
2 => Ok(Self::Postcard),
#[cfg(feature = "serialize_json")]
3 => Ok(Self::Json),
n => Err(SerializationError::UnknownFormat(n)),
}
}
pub fn serialize<T: Serialize>(&self, data: T) -> SerializationResult<Vec<u8>> {
match self {
#[cfg(feature = "serialize_rmp")]
DynamicSerializer::Messagepack => serialize_rmp::serialize(data),
#[cfg(feature = "serialize_bincode")]
DynamicSerializer::Bincode => serialize_bincode::serialize(data),
#[cfg(feature = "serialize_postcard")]
DynamicSerializer::Postcard => serialize_postcard::serialize(data),
#[cfg(feature = "serialize_json")]
DynamicSerializer::Json => serialize_json::serialize(data),
#[cfg(not(all(
feature = "serialize_rmp",
feature = "serialize_bincode",
feature = "serialize_postcard",
feature = "serialize_json"
)))]
_ => Err(SerializationError::UnavailableFormat(self.clone())),
}
}
pub fn deserialize<T: DeserializeOwned, R: Read>(&self, reader: R) -> SerializationResult<T> {
match self {
#[cfg(feature = "serialize_rmp")]
DynamicSerializer::Messagepack => serialize_rmp::deserialize(reader),
#[cfg(feature = "serialize_bincode")]
DynamicSerializer::Bincode => serialize_bincode::deserialize(reader),
#[cfg(feature = "serialize_postcard")]
DynamicSerializer::Postcard => serialize_postcard::deserialize(reader),
#[cfg(feature = "serialize_json")]
DynamicSerializer::Json => serialize_json::deserialize(reader),
#[cfg(not(all(
feature = "serialize_rmp",
feature = "serialize_bincode",
feature = "serialize_postcard",
feature = "serialize_json"
)))]
_ => Err(SerializationError::UnavailableFormat(self.clone())),
}
}
}

@ -1,28 +1,15 @@
use crate::payload::{EventReceivePayload, EventSendPayload};
use crate::prelude::IPCResult;
use crate::payload::SerializationResult;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::io::Read;
pub type SerializationError = bincode::Error;
pub fn serialize<T: Serialize>(data: T) -> SerializationResult<Vec<u8>> {
let bytes = bincode::serialize(&data)?;
impl<T> EventSendPayload for T
where
T: Serialize,
{
fn to_payload_bytes(self) -> IPCResult<Vec<u8>> {
let bytes = bincode::serialize(&self)?;
Ok(bytes)
}
Ok(bytes)
}
impl<T> EventReceivePayload for T
where
T: DeserializeOwned,
{
fn from_payload_bytes<R: Read>(reader: R) -> IPCResult<Self> {
let type_data = bincode::deserialize_from(reader)?;
Ok(type_data)
}
pub fn deserialize<R: Read, T: DeserializeOwned>(reader: R) -> SerializationResult<T> {
let type_data = bincode::deserialize_from(reader)?;
Ok(type_data)
}

@ -1,29 +1,16 @@
use crate::payload::{EventReceivePayload, EventSendPayload};
use crate::prelude::IPCResult;
use crate::payload::SerializationResult;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::io::Read;
pub type SerializationError = serde_json::Error;
pub fn serialize<T: Serialize>(data: T) -> SerializationResult<Vec<u8>> {
let bytes = serde_json::to_vec(&data)?;
impl<T> EventSendPayload for T
where
T: Serialize,
{
fn to_payload_bytes(self) -> IPCResult<Vec<u8>> {
let bytes = serde_json::to_vec(&self)?;
Ok(bytes)
}
Ok(bytes)
}
impl<T> EventReceivePayload for T
where
T: DeserializeOwned,
{
fn from_payload_bytes<R: Read>(reader: R) -> IPCResult<Self> {
let type_data = serde_json::from_reader(reader)?;
pub fn deserialize<R: Read, T: DeserializeOwned>(reader: R) -> SerializationResult<T> {
let type_data = serde_json::from_reader(reader)?;
Ok(type_data)
}
Ok(type_data)
}

@ -1,32 +1,19 @@
use crate::payload::{EventReceivePayload, EventSendPayload};
use crate::prelude::IPCResult;
use crate::payload::SerializationResult;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::io::Read;
pub type SerializationError = postcard::Error;
pub fn serialize<T: Serialize>(data: T) -> SerializationResult<Vec<u8>> {
let bytes = postcard::to_allocvec(&data)?.to_vec();
impl<T> EventSendPayload for T
where
T: Serialize,
{
fn to_payload_bytes(self) -> IPCResult<Vec<u8>> {
let bytes = postcard::to_allocvec(&self)?.to_vec();
Ok(bytes)
}
Ok(bytes)
}
impl<T> EventReceivePayload for T
where
T: DeserializeOwned,
{
fn from_payload_bytes<R: Read>(mut reader: R) -> IPCResult<Self> {
let mut buf = Vec::new();
// reading to end means reading the full size of the provided data
reader.read_to_end(&mut buf)?;
let type_data = postcard::from_bytes(&buf)?;
pub fn deserialize<R: Read, T: DeserializeOwned>(mut reader: R) -> SerializationResult<T> {
let mut buf = Vec::new();
// reading to end means reading the full size of the provided data
reader.read_to_end(&mut buf)?;
let type_data = postcard::from_bytes(&buf)?;
Ok(type_data)
}
Ok(type_data)
}

@ -1,48 +1,15 @@
use crate::payload::{EventReceivePayload, EventSendPayload};
use crate::prelude::{IPCError, IPCResult};
use crate::payload::SerializationResult;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::io::Read;
use thiserror::Error;
#[derive(Debug, Error)]
pub enum SerializationError {
#[error("failed to serialize with rmp: {0}")]
Serialize(#[from] rmp_serde::encode::Error),
pub fn serialize<T: Serialize>(data: T) -> SerializationResult<Vec<u8>> {
let bytes = rmp_serde::to_vec(&data)?;
#[error("failed to deserialize with rmp: {0}")]
Deserialize(#[from] rmp_serde::decode::Error),
Ok(bytes)
}
impl From<rmp_serde::decode::Error> for IPCError {
fn from(e: rmp_serde::decode::Error) -> Self {
IPCError::Serialization(SerializationError::Deserialize(e))
}
}
impl From<rmp_serde::encode::Error> for IPCError {
fn from(e: rmp_serde::encode::Error) -> Self {
IPCError::Serialization(SerializationError::Serialize(e))
}
}
impl<T> EventSendPayload for T
where
T: Serialize,
{
fn to_payload_bytes(self) -> IPCResult<Vec<u8>> {
let bytes = rmp_serde::to_vec(&self)?;
Ok(bytes)
}
}
impl<T> EventReceivePayload for T
where
T: DeserializeOwned,
{
fn from_payload_bytes<R: Read>(reader: R) -> IPCResult<Self> {
let type_data = rmp_serde::from_read(reader)?;
Ok(type_data)
}
pub fn deserialize<R: Read, T: DeserializeOwned>(reader: R) -> SerializationResult<T> {
let type_data = rmp_serde::from_read(reader)?;
Ok(type_data)
}

@ -7,6 +7,8 @@ use crate::ipc::context::{Context, PooledContext, ReplyListeners};
use crate::ipc::server::IPCServer;
use crate::namespaces::builder::NamespaceBuilder;
use crate::namespaces::namespace::Namespace;
#[cfg(feature = "serialize")]
use crate::payload::DynamicSerializer;
use crate::protocol::AsyncStreamProtocolListener;
use std::collections::HashMap;
use std::future::Future;
@ -57,6 +59,8 @@ pub struct IPCBuilder<L: AsyncStreamProtocolListener> {
namespaces: HashMap<String, Namespace>,
data: TypeMap,
timeout: Duration,
#[cfg(feature = "serialize")]
default_serializer: DynamicSerializer,
}
impl<L> IPCBuilder<L>
@ -67,7 +71,7 @@ where
let mut handler = EventHandler::new();
handler.on(ERROR_EVENT_NAME, |_, event| {
Box::pin(async move {
let error_data = event.data::<ErrorEventData>()?;
let error_data = event.payload::<ErrorEventData>()?;
tracing::warn!(error_data.code);
tracing::warn!("error_data.message = '{}'", error_data.message);
@ -80,6 +84,8 @@ where
namespaces: HashMap::new(),
data: TypeMap::new(),
timeout: Duration::from_secs(60),
#[cfg(feature = "serialize")]
default_serializer: DynamicSerializer::first_available(),
}
}
@ -132,6 +138,15 @@ where
self
}
#[cfg(feature = "serialize")]
/// Sets the default serializer used for rust types that implement
/// serdes Serialize or Deserialize
pub fn default_serializer(mut self, serializer: DynamicSerializer) -> Self {
self.default_serializer = serializer;
self
}
/// Builds an ipc server
#[tracing::instrument(skip(self))]
pub async fn build_server(self) -> Result<()> {
@ -141,6 +156,9 @@ where
handler: self.handler,
data: self.data,
timeout: self.timeout,
#[cfg(feature = "serialize")]
default_serializer: self.default_serializer,
};
server.start::<L>(self.address.unwrap()).await?;
@ -153,12 +171,15 @@ where
self.validate()?;
let data = Arc::new(RwLock::new(self.data));
let reply_listeners = ReplyListeners::default();
let client = IPCClient {
namespaces: self.namespaces,
handler: self.handler,
data,
reply_listeners,
timeout: self.timeout,
#[cfg(feature = "serialize")]
default_serializer: self.default_serializer,
};
let ctx = client.connect::<L::Stream>(self.address.unwrap()).await?;
@ -188,6 +209,9 @@ where
data: Arc::clone(&data),
reply_listeners: Arc::clone(&reply_listeners),
timeout: self.timeout.clone(),
#[cfg(feature = "serialize")]
default_serializer: self.default_serializer.clone(),
};
let ctx = client.connect::<L::Stream>(address.clone()).await?;

@ -12,6 +12,9 @@ use tokio::sync::oneshot;
use tokio::sync::RwLock;
use typemap_rev::TypeMap;
#[cfg(feature = "serialize")]
use crate::payload::DynamicSerializer;
/// The IPC Client to connect to an IPC Server.
/// Use the [IPCBuilder](crate::builder::IPCBuilder) to create the client.
/// Usually one does not need to use the IPCClient object directly.
@ -22,6 +25,9 @@ pub struct IPCClient {
pub(crate) data: Arc<RwLock<TypeMap>>,
pub(crate) reply_listeners: ReplyListeners,
pub(crate) timeout: Duration,
#[cfg(feature = "serialize")]
pub(crate) default_serializer: DynamicSerializer,
}
impl IPCClient {
@ -34,8 +40,20 @@ impl IPCClient {
) -> Result<Context> {
let stream = S::protocol_connect(address).await?;
let (read_half, write_half) = stream.protocol_into_split();
let emitter = StreamEmitter::new::<S>(write_half);
let (tx, rx) = oneshot::channel();
#[cfg(feature = "serialize")]
let ctx = Context::new(
StreamEmitter::clone(&emitter),
self.data,
Some(tx),
self.reply_listeners,
self.timeout,
self.default_serializer,
);
#[cfg(not(feature = "serialize"))]
let ctx = Context::new(
StreamEmitter::clone(&emitter),
self.data,

@ -3,6 +3,8 @@ use crate::event::Event;
use crate::ipc::stream_emitter::StreamEmitter;
use futures::future;
use futures::future::Either;
#[cfg(feature = "serialize")]
use serde::Serialize;
use std::collections::HashMap;
use std::mem;
use std::ops::{Deref, DerefMut};
@ -13,6 +15,9 @@ use tokio::sync::{oneshot, Mutex, RwLock};
use tokio::time::Duration;
use typemap_rev::TypeMap;
#[cfg(feature = "serialize")]
use crate::payload::{DynamicSerializer, SerdePayload};
pub(crate) type ReplyListeners = Arc<Mutex<HashMap<u64, oneshot::Sender<Event>>>>;
/// An object provided to each callback function.
@ -40,6 +45,9 @@ pub struct Context {
reply_listeners: ReplyListeners,
reply_timeout: Duration,
#[cfg(feature = "serialize")]
default_serializer: DynamicSerializer,
}
impl Context {
@ -49,6 +57,7 @@ impl Context {
stop_sender: Option<Sender<()>>,
reply_listeners: ReplyListeners,
reply_timeout: Duration,
#[cfg(feature = "serialize")] default_serializer: DynamicSerializer,
) -> Self {
Self {
emitter,
@ -56,6 +65,8 @@ impl Context {
data,
stop_sender: Arc::new(Mutex::new(stop_sender)),
reply_timeout,
#[cfg(feature = "serialize")]
default_serializer,
}
}
@ -96,6 +107,11 @@ impl Context {
Ok(())
}
#[cfg(feature = "serialize")]
pub fn create_serde_payload<T: Serialize>(&self, data: T) -> SerdePayload<T> {
SerdePayload::new(self.default_serializer.clone(), data)
}
/// Returns the channel for a reply to the given message id
pub(crate) async fn get_reply_sender(&self, ref_id: u64) -> Option<oneshot::Sender<Event>> {
let mut listeners = self.reply_listeners.lock().await;

@ -4,6 +4,10 @@ use crate::events::event_handler::EventHandler;
use crate::ipc::context::{Context, ReplyListeners};
use crate::ipc::stream_emitter::StreamEmitter;
use crate::namespaces::namespace::Namespace;
#[cfg(feature = "serialize")]
use crate::payload::DynamicSerializer;
use crate::protocol::{AsyncProtocolStreamSplit, AsyncStreamProtocolListener};
use std::collections::HashMap;
use std::sync::Arc;
@ -19,6 +23,9 @@ pub struct IPCServer {
pub(crate) namespaces: HashMap<String, Namespace>,
pub(crate) data: TypeMap,
pub(crate) timeout: Duration,
#[cfg(feature = "serialize")]
pub(crate) default_serializer: DynamicSerializer,
}
impl IPCServer {
@ -41,18 +48,27 @@ impl IPCServer {
let namespaces = Arc::clone(&namespaces);
let data = Arc::clone(&data);
let timeout = self.timeout.clone();
#[cfg(feature = "serialize")]
let default_serializer = self.default_serializer.clone();
tokio::spawn(async move {
let (read_half, write_half) = stream.protocol_into_split();
let emitter = StreamEmitter::new::<L::Stream>(write_half);
let reply_listeners = ReplyListeners::default();
#[cfg(feature = "serialize")]
let ctx = Context::new(
StreamEmitter::clone(&emitter),
emitter,
data,
None,
reply_listeners,
timeout.into(),
default_serializer.clone(),
);
#[cfg(not(feature = "serialize"))]
let ctx = Context::new(emitter, data, None, reply_listeners, timeout.into());
handle_connection::<L::Stream>(namespaces, handler, read_half, ctx).await;
});

@ -31,16 +31,14 @@ impl StreamEmitter {
}
}
#[tracing::instrument(level = "trace", skip(self, data))]
pub async fn _emit<T: EventSendPayload>(
#[tracing::instrument(level = "trace", skip(self, data_bytes))]
pub async fn _emit(
&self,
namespace: Option<&str>,
event: &str,
data: T,
data_bytes: Vec<u8>,
res_id: Option<u64>,
) -> Result<EmitMetadata> {
let data_bytes = data.to_payload_bytes()?;
let event = if let Some(namespace) = namespace {
Event::with_namespace(namespace.to_string(), event.to_string(), data_bytes, res_id)
} else {
@ -63,9 +61,10 @@ impl StreamEmitter {
pub async fn emit<S: AsRef<str>, T: EventSendPayload>(
&self,
event: S,
data: T,
payload: T,
) -> Result<EmitMetadata> {
self._emit(None, event.as_ref(), data, None).await
self._emit(None, event.as_ref(), payload.to_payload_bytes()?, None)
.await
}
/// Emits an event to a specific namespace
@ -73,10 +72,15 @@ impl StreamEmitter {
&self,
namespace: S1,
event: S2,
data: T,
payload: T,
) -> Result<EmitMetadata> {
self._emit(Some(namespace.as_ref()), event.as_ref(), data, None)
.await
self._emit(
Some(namespace.as_ref()),
event.as_ref(),
payload.to_payload_bytes()?,
None,
)
.await
}
/// Emits a response to an event
@ -84,9 +88,15 @@ impl StreamEmitter {
&self,
event_id: u64,
event: S,
data: T,
payload: T,
) -> Result<EmitMetadata> {
self._emit(None, event.as_ref(), data, Some(event_id)).await
self._emit(
None,
event.as_ref(),
payload.to_payload_bytes()?,
Some(event_id),
)
.await
}
/// Emits a response to an event to a namespace
@ -95,12 +105,12 @@ impl StreamEmitter {
event_id: u64,
namespace: S1,
event: S2,
data: T,
payload: T,
) -> Result<EmitMetadata> {
self._emit(
Some(namespace.as_ref()),
event.as_ref(),
data,
payload.to_payload_bytes()?,
Some(event_id),
)
.await
@ -128,7 +138,7 @@ impl EmitMetadata {
pub async fn await_reply(&self, ctx: &Context) -> Result<Event> {
let reply = ctx.await_reply(self.message_id).await?;
if reply.name() == ERROR_EVENT_NAME {
Err(reply.data::<ErrorEventData>()?.into())
Err(reply.payload::<ErrorEventData>()?.into())
} else {
Ok(reply)
}

@ -110,43 +110,7 @@
feature = "serialize_json"
))
))]
compile_error!("Feature 'serialize' cannot be used by its own. Choose one of 'serialize_rmp', 'serialize_bincode', 'serialize_postcard' instead.");
#[cfg(any(
all(
feature = "serialize_rmp",
any(
feature = "serialize_postcard",
feature = "serialize_bincode",
feature = "serialize_json"
)
),
all(
feature = "serialize_bincode",
any(
feature = "serialize_rmp",
feature = "serialize_postcard",
feature = "serialize_json"
)
),
all(
feature = "serialize_postcard",
any(
feature = "serialize_rmp",
feature = "serialize_bincode",
feature = "serialize_json"
)
),
all(
feature = "serialize_json",
any(
feature = "serialize_rmp",
feature = "serialize_bincode",
feature = "serialize_postcard"
)
)
))]
compile_error!("You cannot use two serialize_* features at the same time");
compile_error!("Feature 'serialize' cannot be used by its own. Choose one of 'serialize_rmp', 'serialize_bincode', 'serialize_postcard', 'serialize_json instead.");
pub mod error;
mod events;

@ -12,17 +12,14 @@ use utils::protocol::*;
async fn it_sends_payloads() {
let port = get_free_port();
let ctx = get_client_with_server(port).await;
let payload = SimplePayload {
number: 0,
string: String::from("Hello World"),
};
#[cfg(feature = "serialize")]
let payload = ctx.create_serde_payload(payload);
ctx.emitter
.emit(
"ping",
SimplePayload {
number: 0,
string: String::from("Hello World"),
},
)
.await
.unwrap();
ctx.emitter.emit("ping", payload).await.unwrap();
// wait for the event to be handled
tokio::time::sleep(Duration::from_millis(10)).await;
@ -37,21 +34,27 @@ async fn it_sends_payloads() {
async fn it_receives_payloads() {
let port = get_free_port();
let ctx = get_client_with_server(port).await;
let payload = SimplePayload {
number: 0,
string: String::from("Hello World"),
};
#[cfg(feature = "serialize")]
let payload = ctx.create_serde_payload(payload);
let reply = ctx
.emitter
.emit(
"ping",
SimplePayload {
number: 0,
string: String::from("Hello World"),
},
)
.emit("ping", payload)
.await
.unwrap()
.await_reply(&ctx)
.await
.unwrap();
let reply_payload = reply.data::<SimplePayload>().unwrap();
#[cfg(not(feature = "serialize"))]
let reply_payload = reply.payload::<SimplePayload>().unwrap();
#[cfg(feature = "serialize")]
let reply_payload = reply.serde_payload::<SimplePayload>().unwrap();
let counters = get_counter_from_context(&ctx).await;
assert_eq!(counters.get("ping").await, 1);
@ -73,21 +76,41 @@ fn get_builder(port: u8) -> IPCBuilder<TestProtocolListener> {
async fn handle_ping_event(ctx: &Context, event: Event) -> IPCResult<()> {
increment_counter_for_event(ctx, &event).await;
let payload = event.data::<SimplePayload>()?;
ctx.emitter
.emit_response(event.id(), "pong", payload)
.await?;
let payload = get_simple_payload(&event)?;
#[cfg(feature = "serialize")]
{
ctx.emitter
.emit_response(event.id(), "pong", ctx.create_serde_payload(payload))
.await?;
}
#[cfg(not(feature = "serialize"))]
{
ctx.emitter
.emit_response(event.id(), "pong", payload)
.await?;
}
Ok(())
}
async fn handle_pong_event(ctx: &Context, event: Event) -> IPCResult<()> {
increment_counter_for_event(ctx, &event).await;
let _payload = event.data::<SimplePayload>()?;
let _payload = get_simple_payload(&event)?;
Ok(())
}
fn get_simple_payload(event: &Event) -> IPCResult<SimplePayload> {
#[cfg(feature = "serialize")]
{
event.serde_payload::<SimplePayload>()
}
#[cfg(not(feature = "serialize"))]
{
event.payload::<SimplePayload>()
}
}
#[cfg(feature = "serialize")]
mod payload_impl {
use serde::{Deserialize, Serialize};

@ -0,0 +1,76 @@
use bromine::prelude::*;
#[cfg(feature = "serialize_rmp")]
#[test]
fn it_serializes_messagepack() {
test_serialization(DynamicSerializer::Messagepack)
}
#[cfg(feature = "serialize_bincode")]
#[test]
fn it_serializes_bincode() {
test_serialization(DynamicSerializer::Bincode)
}
#[cfg(feature = "serialize_postcard")]
#[test]
fn it_serializes_postcard() {
test_serialization(DynamicSerializer::Postcard)
}
#[cfg(feature = "serialize_json")]
#[test]
fn it_serializes_json() {
test_serialization(DynamicSerializer::Json)
}
#[cfg(feature = "serialize")]
fn test_serialization(serializer: DynamicSerializer) {
let test_payload = get_test_payload(serializer);
let payload_bytes = test_payload.clone().to_payload_bytes().unwrap();
let payload = TestSerdePayload::from_payload_bytes(&payload_bytes[..]).unwrap();
assert_eq!(payload.data(), test_payload.data())
}
#[cfg(feature = "serialize")]
pub mod payload {
use bromine::payload::{DynamicSerializer, SerdePayload};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
pub type TestSerdePayload = SerdePayload<TestPayload>;
#[derive(Clone, Serialize, Deserialize, Eq, PartialEq, Debug)]
pub struct TestPayload {
items: Vec<u64>,
variant: TestPayloadEnum,
string: String,
signed: i32,
maps: HashMap<String, i64>,
}
#[derive(Clone, Serialize, Deserialize, Eq, PartialEq, Debug)]
pub enum TestPayloadEnum {
First,
Second,
Third(usize),
}
pub fn get_test_payload(serializer: DynamicSerializer) -> SerdePayload<TestPayload> {
let mut maps = HashMap::new();
maps.insert("Hello".to_string(), 12);
maps.insert("Wäüörld".to_string(), -12380);
let inner_payload = TestPayload {
items: vec![0u64, 12452u64, u64::MAX],
variant: TestPayloadEnum::Third(12),
string: String::from("Hello World ſð"),
signed: -12,
maps,
};
SerdePayload::new(serializer, inner_payload)
}
}
#[cfg(feature = "serialize")]
pub use payload::*;
Loading…
Cancel
Save