Change serialization to be able to use multiple formats

Signed-off-by: trivernis <trivernis@protonmail.com>
pull/26/head
trivernis 2 years ago
parent d1b426e10b
commit 6299f9be02
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

2
Cargo.lock generated

@ -93,7 +93,7 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bromine"
version = "0.14.0"
version = "0.15.0"
dependencies = [
"async-trait",
"bincode",

@ -1,6 +1,6 @@
[package]
name = "bromine"
version = "0.14.0"
version = "0.15.0"
authors = ["trivernis <trivernis@protonmail.com>"]
edition = "2018"
readme = "README.md"

@ -1,8 +1,12 @@
use crate::error::{Error, Result};
use crate::events::generate_event_id;
use crate::events::payload::EventReceivePayload;
#[cfg(feature = "serialize")]
use crate::payload::SerdePayload;
use crate::prelude::{IPCError, IPCResult};
use byteorder::{BigEndian, ReadBytesExt};
#[cfg(feature = "serialize")]
use serde::de::DeserializeOwned;
use std::fmt::Debug;
use std::io::{Cursor, Read};
use tokio::io::{AsyncRead, AsyncReadExt};
@ -67,12 +71,21 @@ impl Event {
self.header.ref_id.clone()
}
/// Decodes the data to the given type
/// Decodes the payload to the given type implementing the receive payload trait
#[tracing::instrument(level = "trace", skip(self))]
pub fn data<T: EventReceivePayload>(&self) -> Result<T> {
let data = T::from_payload_bytes(&self.data[..])?;
pub fn payload<T: EventReceivePayload>(&self) -> Result<T> {
let payload = T::from_payload_bytes(&self.data[..])?;
Ok(data)
Ok(payload)
}
#[cfg(feature = "serialize")]
/// Decodes the payload to the given type implementing DeserializeOwned
#[tracing::instrument(level = "trace", skip(self))]
pub fn serde_payload<T: DeserializeOwned>(&self) -> Result<T> {
let payload = SerdePayload::<T>::from_payload_bytes(&self.data[..])?;
Ok(payload.data())
}
/// Returns a reference of the underlying data

@ -112,3 +112,68 @@ where
})
}
}
impl EventSendPayload for () {
fn to_payload_bytes(self) -> IPCResult<Vec<u8>> {
Ok(vec![])
}
}
#[cfg(feature = "serialize")]
mod serde_payload {
use super::DynamicSerializer;
use crate::payload::EventReceivePayload;
use crate::prelude::{EventSendPayload, IPCResult};
use byteorder::ReadBytesExt;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::io::Read;
/// A payload representing a payload storing serde serialized data
pub struct SerdePayload<T> {
data: T,
serializer: DynamicSerializer,
}
impl<T> SerdePayload<T> {
/// Creates a new serde payload with a specified serializer
pub fn new(serializer: DynamicSerializer, data: T) -> Self {
Self { serializer, data }
}
pub fn data(self) -> T {
self.data
}
}
impl<T> EventSendPayload for SerdePayload<T>
where
T: Serialize,
{
fn to_payload_bytes(self) -> IPCResult<Vec<u8>> {
let mut buf = Vec::new();
let mut data_bytes = self.serializer.serialize(self.data)?;
let format_id = self.serializer as u8;
buf.push(format_id);
buf.append(&mut data_bytes);
Ok(buf)
}
}
impl<T> EventReceivePayload for SerdePayload<T>
where
T: DeserializeOwned,
{
fn from_payload_bytes<R: Read>(mut reader: R) -> IPCResult<Self> {
let format_id = reader.read_u8()?;
let serializer = DynamicSerializer::from_primitive(format_id as usize)?;
let data = serializer.deserialize(reader)?;
Ok(Self { serializer, data })
}
}
}
#[cfg(feature = "serialize")]
pub use serde_payload::*;

@ -1,23 +1,158 @@
#[cfg(feature = "serialize_rmp")]
mod serialize_rmp;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::io::Read;
use thiserror::Error;
#[cfg(feature = "serialize_rmp")]
pub use serialize_rmp::*;
mod serialize_rmp;
#[cfg(feature = "serialize_bincode")]
mod serialize_bincode;
#[cfg(feature = "serialize_bincode")]
pub use serialize_bincode::*;
#[cfg(feature = "serialize_postcard")]
mod serialize_postcard;
#[cfg(feature = "serialize_postcard")]
pub use serialize_postcard::*;
#[cfg(feature = "serialize_json")]
mod serialize_json;
#[cfg(feature = "serialize_json")]
pub use serialize_json::*;
pub type SerializationResult<T> = std::result::Result<T, SerializationError>;
#[derive(Debug, Error)]
pub enum SerializationError {
#[cfg(feature = "serialize_rmp")]
#[error("failed to serialize messagepack payload: {0}")]
SerializeRmp(#[from] rmp_serde::encode::Error),
#[cfg(feature = "serialize_rmp")]
#[error("failed to deserialize messagepack payload: {0}")]
DeserializeRmp(#[from] rmp_serde::decode::Error),
#[cfg(feature = "serialize_bincode")]
#[error("failed to de/serialize bincode payload: {0}")]
Bincode(#[from] bincode::Error),
#[cfg(feature = "serialize_postcard")]
#[error("failed to de/serialize postcard payload: {0}")]
Postcard(#[from] postcard::Error),
#[cfg(feature = "serialize_json")]
#[error("failed to de/serialize json payload: {0}")]
Json(#[from] serde_json::Error),
#[error("io error occurred on de/serialization: {0}")]
Io(#[from] std::io::Error),
#[error("the format {0:?} is not available")]
UnavailableFormat(DynamicSerializer),
#[error("tried to create serializer for unknown format {0}")]
UnknownFormat(usize),
}
#[derive(Clone, Debug, Ord, PartialOrd, Eq, PartialEq)]
pub enum DynamicSerializer {
Messagepack,
Bincode,
Postcard,
Json,
}
impl DynamicSerializer {
pub fn first_available() -> Self {
#[cfg(feature = "serialize_rmp")]
{
Self::Messagepack
}
#[cfg(all(feature = "serialize_bincode", not(feature = "serialize_rmp")))]
{
Self::Bincode
}
#[cfg(all(
feature = "serialize_postcard",
not(any(feature = "serialize_rmp", feature = "serialize_bincode"))
))]
{
Self::Postcard
}
#[cfg(all(
feature = "serialize_json",
not(any(
feature = "serialize_rmp",
feature = "serialize_bincode",
feature = "serialize_postcard"
))
))]
{
Self::Json
}
}
pub fn from_primitive(num: usize) -> SerializationResult<Self> {
match num {
#[cfg(feature = "serialize_rmp")]
0 => Ok(Self::Messagepack),
#[cfg(feature = "serialize_bincode")]
1 => Ok(Self::Bincode),
#[cfg(feature = "serialize_postcard")]
2 => Ok(Self::Postcard),
#[cfg(feature = "serialize_json")]
3 => Ok(Self::Json),
n => Err(SerializationError::UnknownFormat(n)),
}
}
pub fn serialize<T: Serialize>(&self, data: T) -> SerializationResult<Vec<u8>> {
match self {
#[cfg(feature = "serialize_rmp")]
DynamicSerializer::Messagepack => serialize_rmp::serialize(data),
#[cfg(feature = "serialize_bincode")]
DynamicSerializer::Bincode => serialize_bincode::serialize(data),
#[cfg(feature = "serialize_postcard")]
DynamicSerializer::Postcard => serialize_postcard::serialize(data),
#[cfg(feature = "serialize_json")]
DynamicSerializer::Json => serialize_json::serialize(data),
#[cfg(not(all(
feature = "serialize_rmp",
feature = "serialize_bincode",
feature = "serialize_postcard",
feature = "serialize_json"
)))]
_ => Err(SerializationError::UnavailableFormat(self.clone())),
}
}
pub fn deserialize<T: DeserializeOwned, R: Read>(&self, reader: R) -> SerializationResult<T> {
match self {
#[cfg(feature = "serialize_rmp")]
DynamicSerializer::Messagepack => serialize_rmp::deserialize(reader),
#[cfg(feature = "serialize_bincode")]
DynamicSerializer::Bincode => serialize_bincode::deserialize(reader),
#[cfg(feature = "serialize_postcard")]
DynamicSerializer::Postcard => serialize_postcard::deserialize(reader),
#[cfg(feature = "serialize_json")]
DynamicSerializer::Json => serialize_json::deserialize(reader),
#[cfg(not(all(
feature = "serialize_rmp",
feature = "serialize_bincode",
feature = "serialize_postcard",
feature = "serialize_json"
)))]
_ => Err(SerializationError::UnavailableFormat(self.clone())),
}
}
}

@ -1,28 +1,15 @@
use crate::payload::{EventReceivePayload, EventSendPayload};
use crate::prelude::IPCResult;
use crate::payload::SerializationResult;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::io::Read;
pub type SerializationError = bincode::Error;
pub fn serialize<T: Serialize>(data: T) -> SerializationResult<Vec<u8>> {
let bytes = bincode::serialize(&data)?;
impl<T> EventSendPayload for T
where
T: Serialize,
{
fn to_payload_bytes(self) -> IPCResult<Vec<u8>> {
let bytes = bincode::serialize(&self)?;
Ok(bytes)
}
Ok(bytes)
}
impl<T> EventReceivePayload for T
where
T: DeserializeOwned,
{
fn from_payload_bytes<R: Read>(reader: R) -> IPCResult<Self> {
let type_data = bincode::deserialize_from(reader)?;
Ok(type_data)
}
pub fn deserialize<R: Read, T: DeserializeOwned>(reader: R) -> SerializationResult<T> {
let type_data = bincode::deserialize_from(reader)?;
Ok(type_data)
}

@ -1,29 +1,16 @@
use crate::payload::{EventReceivePayload, EventSendPayload};
use crate::prelude::IPCResult;
use crate::payload::SerializationResult;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::io::Read;
pub type SerializationError = serde_json::Error;
pub fn serialize<T: Serialize>(data: T) -> SerializationResult<Vec<u8>> {
let bytes = serde_json::to_vec(&data)?;
impl<T> EventSendPayload for T
where
T: Serialize,
{
fn to_payload_bytes(self) -> IPCResult<Vec<u8>> {
let bytes = serde_json::to_vec(&self)?;
Ok(bytes)
}
Ok(bytes)
}
impl<T> EventReceivePayload for T
where
T: DeserializeOwned,
{
fn from_payload_bytes<R: Read>(reader: R) -> IPCResult<Self> {
let type_data = serde_json::from_reader(reader)?;
pub fn deserialize<R: Read, T: DeserializeOwned>(reader: R) -> SerializationResult<T> {
let type_data = serde_json::from_reader(reader)?;
Ok(type_data)
}
Ok(type_data)
}

@ -1,32 +1,19 @@
use crate::payload::{EventReceivePayload, EventSendPayload};
use crate::prelude::IPCResult;
use crate::payload::SerializationResult;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::io::Read;
pub type SerializationError = postcard::Error;
pub fn serialize<T: Serialize>(data: T) -> SerializationResult<Vec<u8>> {
let bytes = postcard::to_allocvec(&data)?.to_vec();
impl<T> EventSendPayload for T
where
T: Serialize,
{
fn to_payload_bytes(self) -> IPCResult<Vec<u8>> {
let bytes = postcard::to_allocvec(&self)?.to_vec();
Ok(bytes)
}
Ok(bytes)
}
impl<T> EventReceivePayload for T
where
T: DeserializeOwned,
{
fn from_payload_bytes<R: Read>(mut reader: R) -> IPCResult<Self> {
let mut buf = Vec::new();
// reading to end means reading the full size of the provided data
reader.read_to_end(&mut buf)?;
let type_data = postcard::from_bytes(&buf)?;
pub fn deserialize<R: Read, T: DeserializeOwned>(mut reader: R) -> SerializationResult<T> {
let mut buf = Vec::new();
// reading to end means reading the full size of the provided data
reader.read_to_end(&mut buf)?;
let type_data = postcard::from_bytes(&buf)?;
Ok(type_data)
}
Ok(type_data)
}

@ -1,48 +1,15 @@
use crate::payload::{EventReceivePayload, EventSendPayload};
use crate::prelude::{IPCError, IPCResult};
use crate::payload::SerializationResult;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::io::Read;
use thiserror::Error;
#[derive(Debug, Error)]
pub enum SerializationError {
#[error("failed to serialize with rmp: {0}")]
Serialize(#[from] rmp_serde::encode::Error),
pub fn serialize<T: Serialize>(data: T) -> SerializationResult<Vec<u8>> {
let bytes = rmp_serde::to_vec(&data)?;
#[error("failed to deserialize with rmp: {0}")]
Deserialize(#[from] rmp_serde::decode::Error),
Ok(bytes)
}
impl From<rmp_serde::decode::Error> for IPCError {
fn from(e: rmp_serde::decode::Error) -> Self {
IPCError::Serialization(SerializationError::Deserialize(e))
}
}
impl From<rmp_serde::encode::Error> for IPCError {
fn from(e: rmp_serde::encode::Error) -> Self {
IPCError::Serialization(SerializationError::Serialize(e))
}
}
impl<T> EventSendPayload for T
where
T: Serialize,
{
fn to_payload_bytes(self) -> IPCResult<Vec<u8>> {
let bytes = rmp_serde::to_vec(&self)?;
Ok(bytes)
}
}
impl<T> EventReceivePayload for T
where
T: DeserializeOwned,
{
fn from_payload_bytes<R: Read>(reader: R) -> IPCResult<Self> {
let type_data = rmp_serde::from_read(reader)?;
Ok(type_data)
}
pub fn deserialize<R: Read, T: DeserializeOwned>(reader: R) -> SerializationResult<T> {
let type_data = rmp_serde::from_read(reader)?;
Ok(type_data)
}

@ -7,6 +7,8 @@ use crate::ipc::context::{Context, PooledContext, ReplyListeners};
use crate::ipc::server::IPCServer;
use crate::namespaces::builder::NamespaceBuilder;
use crate::namespaces::namespace::Namespace;
#[cfg(feature = "serialize")]
use crate::payload::DynamicSerializer;
use crate::protocol::AsyncStreamProtocolListener;
use std::collections::HashMap;
use std::future::Future;
@ -57,6 +59,8 @@ pub struct IPCBuilder<L: AsyncStreamProtocolListener> {
namespaces: HashMap<String, Namespace>,
data: TypeMap,
timeout: Duration,
#[cfg(feature = "serialize")]
default_serializer: DynamicSerializer,
}
impl<L> IPCBuilder<L>
@ -67,7 +71,7 @@ where
let mut handler = EventHandler::new();
handler.on(ERROR_EVENT_NAME, |_, event| {
Box::pin(async move {
let error_data = event.data::<ErrorEventData>()?;
let error_data = event.payload::<ErrorEventData>()?;
tracing::warn!(error_data.code);
tracing::warn!("error_data.message = '{}'", error_data.message);
@ -80,6 +84,8 @@ where
namespaces: HashMap::new(),
data: TypeMap::new(),
timeout: Duration::from_secs(60),
#[cfg(feature = "serialize")]
default_serializer: DynamicSerializer::first_available(),
}
}
@ -132,6 +138,15 @@ where
self
}
#[cfg(feature = "serialize")]
/// Sets the default serializer used for rust types that implement
/// serdes Serialize or Deserialize
pub fn default_serializer(mut self, serializer: DynamicSerializer) -> Self {
self.default_serializer = serializer;
self
}
/// Builds an ipc server
#[tracing::instrument(skip(self))]
pub async fn build_server(self) -> Result<()> {
@ -141,6 +156,9 @@ where
handler: self.handler,
data: self.data,
timeout: self.timeout,
#[cfg(feature = "serialize")]
default_serializer: self.default_serializer,
};
server.start::<L>(self.address.unwrap()).await?;
@ -153,12 +171,15 @@ where
self.validate()?;
let data = Arc::new(RwLock::new(self.data));
let reply_listeners = ReplyListeners::default();
let client = IPCClient {
namespaces: self.namespaces,
handler: self.handler,
data,
reply_listeners,
timeout: self.timeout,
#[cfg(feature = "serialize")]
default_serializer: self.default_serializer,
};
let ctx = client.connect::<L::Stream>(self.address.unwrap()).await?;
@ -188,6 +209,9 @@ where
data: Arc::clone(&data),
reply_listeners: Arc::clone(&reply_listeners),
timeout: self.timeout.clone(),
#[cfg(feature = "serialize")]
default_serializer: self.default_serializer.clone(),
};
let ctx = client.connect::<L::Stream>(address.clone()).await?;

@ -12,6 +12,9 @@ use tokio::sync::oneshot;
use tokio::sync::RwLock;
use typemap_rev::TypeMap;
#[cfg(feature = "serialize")]
use crate::payload::DynamicSerializer;
/// The IPC Client to connect to an IPC Server.
/// Use the [IPCBuilder](crate::builder::IPCBuilder) to create the client.
/// Usually one does not need to use the IPCClient object directly.
@ -22,6 +25,9 @@ pub struct IPCClient {
pub(crate) data: Arc<RwLock<TypeMap>>,
pub(crate) reply_listeners: ReplyListeners,
pub(crate) timeout: Duration,
#[cfg(feature = "serialize")]
pub(crate) default_serializer: DynamicSerializer,
}
impl IPCClient {
@ -34,8 +40,20 @@ impl IPCClient {
) -> Result<Context> {
let stream = S::protocol_connect(address).await?;
let (read_half, write_half) = stream.protocol_into_split();
let emitter = StreamEmitter::new::<S>(write_half);
let (tx, rx) = oneshot::channel();
#[cfg(feature = "serialize")]
let ctx = Context::new(
StreamEmitter::clone(&emitter),
self.data,
Some(tx),
self.reply_listeners,
self.timeout,
self.default_serializer,
);
#[cfg(not(feature = "serialize"))]
let ctx = Context::new(
StreamEmitter::clone(&emitter),
self.data,

@ -3,6 +3,8 @@ use crate::event::Event;
use crate::ipc::stream_emitter::StreamEmitter;
use futures::future;
use futures::future::Either;
#[cfg(feature = "serialize")]
use serde::Serialize;
use std::collections::HashMap;
use std::mem;
use std::ops::{Deref, DerefMut};
@ -13,6 +15,9 @@ use tokio::sync::{oneshot, Mutex, RwLock};
use tokio::time::Duration;
use typemap_rev::TypeMap;
#[cfg(feature = "serialize")]
use crate::payload::{DynamicSerializer, SerdePayload};
pub(crate) type ReplyListeners = Arc<Mutex<HashMap<u64, oneshot::Sender<Event>>>>;
/// An object provided to each callback function.
@ -40,6 +45,9 @@ pub struct Context {
reply_listeners: ReplyListeners,
reply_timeout: Duration,
#[cfg(feature = "serialize")]
default_serializer: DynamicSerializer,
}
impl Context {
@ -49,6 +57,7 @@ impl Context {
stop_sender: Option<Sender<()>>,
reply_listeners: ReplyListeners,
reply_timeout: Duration,
#[cfg(feature = "serialize")] default_serializer: DynamicSerializer,
) -> Self {
Self {
emitter,
@ -56,6 +65,8 @@ impl Context {
data,
stop_sender: Arc::new(Mutex::new(stop_sender)),
reply_timeout,
#[cfg(feature = "serialize")]
default_serializer,
}
}
@ -96,6 +107,11 @@ impl Context {
Ok(())
}
#[cfg(feature = "serialize")]
pub fn create_serde_payload<T: Serialize>(&self, data: T) -> SerdePayload<T> {
SerdePayload::new(self.default_serializer.clone(), data)
}
/// Returns the channel for a reply to the given message id
pub(crate) async fn get_reply_sender(&self, ref_id: u64) -> Option<oneshot::Sender<Event>> {
let mut listeners = self.reply_listeners.lock().await;

@ -4,6 +4,10 @@ use crate::events::event_handler::EventHandler;
use crate::ipc::context::{Context, ReplyListeners};
use crate::ipc::stream_emitter::StreamEmitter;
use crate::namespaces::namespace::Namespace;
#[cfg(feature = "serialize")]
use crate::payload::DynamicSerializer;
use crate::protocol::{AsyncProtocolStreamSplit, AsyncStreamProtocolListener};
use std::collections::HashMap;
use std::sync::Arc;
@ -19,6 +23,9 @@ pub struct IPCServer {
pub(crate) namespaces: HashMap<String, Namespace>,
pub(crate) data: TypeMap,
pub(crate) timeout: Duration,
#[cfg(feature = "serialize")]
pub(crate) default_serializer: DynamicSerializer,
}
impl IPCServer {
@ -41,18 +48,27 @@ impl IPCServer {
let namespaces = Arc::clone(&namespaces);
let data = Arc::clone(&data);
let timeout = self.timeout.clone();
#[cfg(feature = "serialize")]
let default_serializer = self.default_serializer.clone();
tokio::spawn(async move {
let (read_half, write_half) = stream.protocol_into_split();
let emitter = StreamEmitter::new::<L::Stream>(write_half);
let reply_listeners = ReplyListeners::default();
#[cfg(feature = "serialize")]
let ctx = Context::new(
StreamEmitter::clone(&emitter),
emitter,
data,
None,
reply_listeners,
timeout.into(),
default_serializer.clone(),
);
#[cfg(not(feature = "serialize"))]
let ctx = Context::new(emitter, data, None, reply_listeners, timeout.into());
handle_connection::<L::Stream>(namespaces, handler, read_half, ctx).await;
});

@ -31,16 +31,14 @@ impl StreamEmitter {
}
}
#[tracing::instrument(level = "trace", skip(self, data))]
pub async fn _emit<T: EventSendPayload>(
#[tracing::instrument(level = "trace", skip(self, data_bytes))]
pub async fn _emit(
&self,
namespace: Option<&str>,
event: &str,
data: T,
data_bytes: Vec<u8>,
res_id: Option<u64>,
) -> Result<EmitMetadata> {
let data_bytes = data.to_payload_bytes()?;
let event = if let Some(namespace) = namespace {
Event::with_namespace(namespace.to_string(), event.to_string(), data_bytes, res_id)
} else {
@ -63,9 +61,10 @@ impl StreamEmitter {
pub async fn emit<S: AsRef<str>, T: EventSendPayload>(
&self,
event: S,
data: T,
payload: T,
) -> Result<EmitMetadata> {
self._emit(None, event.as_ref(), data, None).await
self._emit(None, event.as_ref(), payload.to_payload_bytes()?, None)
.await
}
/// Emits an event to a specific namespace
@ -73,10 +72,15 @@ impl StreamEmitter {
&self,
namespace: S1,
event: S2,
data: T,
payload: T,
) -> Result<EmitMetadata> {
self._emit(Some(namespace.as_ref()), event.as_ref(), data, None)
.await
self._emit(
Some(namespace.as_ref()),
event.as_ref(),
payload.to_payload_bytes()?,
None,
)
.await
}
/// Emits a response to an event
@ -84,9 +88,15 @@ impl StreamEmitter {
&self,
event_id: u64,
event: S,
data: T,
payload: T,
) -> Result<EmitMetadata> {
self._emit(None, event.as_ref(), data, Some(event_id)).await
self._emit(
None,
event.as_ref(),
payload.to_payload_bytes()?,
Some(event_id),
)
.await
}
/// Emits a response to an event to a namespace
@ -95,12 +105,12 @@ impl StreamEmitter {
event_id: u64,
namespace: S1,
event: S2,
data: T,
payload: T,
) -> Result<EmitMetadata> {
self._emit(
Some(namespace.as_ref()),
event.as_ref(),
data,
payload.to_payload_bytes()?,
Some(event_id),
)
.await
@ -128,7 +138,7 @@ impl EmitMetadata {
pub async fn await_reply(&self, ctx: &Context) -> Result<Event> {
let reply = ctx.await_reply(self.message_id).await?;
if reply.name() == ERROR_EVENT_NAME {
Err(reply.data::<ErrorEventData>()?.into())
Err(reply.payload::<ErrorEventData>()?.into())
} else {
Ok(reply)
}

@ -110,43 +110,7 @@
feature = "serialize_json"
))
))]
compile_error!("Feature 'serialize' cannot be used by its own. Choose one of 'serialize_rmp', 'serialize_bincode', 'serialize_postcard' instead.");
#[cfg(any(
all(
feature = "serialize_rmp",
any(
feature = "serialize_postcard",
feature = "serialize_bincode",
feature = "serialize_json"
)
),
all(
feature = "serialize_bincode",
any(
feature = "serialize_rmp",
feature = "serialize_postcard",
feature = "serialize_json"
)
),
all(
feature = "serialize_postcard",
any(
feature = "serialize_rmp",
feature = "serialize_bincode",
feature = "serialize_json"
)
),
all(
feature = "serialize_json",
any(
feature = "serialize_rmp",
feature = "serialize_bincode",
feature = "serialize_postcard"
)
)
))]
compile_error!("You cannot use two serialize_* features at the same time");
compile_error!("Feature 'serialize' cannot be used by its own. Choose one of 'serialize_rmp', 'serialize_bincode', 'serialize_postcard', 'serialize_json instead.");
pub mod error;
mod events;

@ -12,17 +12,14 @@ use utils::protocol::*;
async fn it_sends_payloads() {
let port = get_free_port();
let ctx = get_client_with_server(port).await;
let payload = SimplePayload {
number: 0,
string: String::from("Hello World"),
};
#[cfg(feature = "serialize")]
let payload = ctx.create_serde_payload(payload);
ctx.emitter
.emit(
"ping",
SimplePayload {
number: 0,
string: String::from("Hello World"),
},
)
.await
.unwrap();
ctx.emitter.emit("ping", payload).await.unwrap();
// wait for the event to be handled
tokio::time::sleep(Duration::from_millis(10)).await;
@ -37,21 +34,27 @@ async fn it_sends_payloads() {
async fn it_receives_payloads() {
let port = get_free_port();
let ctx = get_client_with_server(port).await;
let payload = SimplePayload {
number: 0,
string: String::from("Hello World"),
};
#[cfg(feature = "serialize")]
let payload = ctx.create_serde_payload(payload);
let reply = ctx
.emitter
.emit(
"ping",
SimplePayload {
number: 0,
string: String::from("Hello World"),
},
)
.emit("ping", payload)
.await
.unwrap()
.await_reply(&ctx)
.await
.unwrap();
let reply_payload = reply.data::<SimplePayload>().unwrap();
#[cfg(not(feature = "serialize"))]
let reply_payload = reply.payload::<SimplePayload>().unwrap();
#[cfg(feature = "serialize")]
let reply_payload = reply.serde_payload::<SimplePayload>().unwrap();
let counters = get_counter_from_context(&ctx).await;
assert_eq!(counters.get("ping").await, 1);
@ -73,21 +76,41 @@ fn get_builder(port: u8) -> IPCBuilder<TestProtocolListener> {
async fn handle_ping_event(ctx: &Context, event: Event) -> IPCResult<()> {
increment_counter_for_event(ctx, &event).await;
let payload = event.data::<SimplePayload>()?;
ctx.emitter
.emit_response(event.id(), "pong", payload)
.await?;
let payload = get_simple_payload(&event)?;
#[cfg(feature = "serialize")]
{
ctx.emitter
.emit_response(event.id(), "pong", ctx.create_serde_payload(payload))
.await?;
}
#[cfg(not(feature = "serialize"))]
{
ctx.emitter
.emit_response(event.id(), "pong", payload)
.await?;
}
Ok(())
}
async fn handle_pong_event(ctx: &Context, event: Event) -> IPCResult<()> {
increment_counter_for_event(ctx, &event).await;
let _payload = event.data::<SimplePayload>()?;
let _payload = get_simple_payload(&event)?;
Ok(())
}
fn get_simple_payload(event: &Event) -> IPCResult<SimplePayload> {
#[cfg(feature = "serialize")]
{
event.serde_payload::<SimplePayload>()
}
#[cfg(not(feature = "serialize"))]
{
event.payload::<SimplePayload>()
}
}
#[cfg(feature = "serialize")]
mod payload_impl {
use serde::{Deserialize, Serialize};

Loading…
Cancel
Save