Merge pull request #27 from Trivernis/develop

Add IntoSerdePayload trait
pull/32/head
Julius Riegel 3 years ago committed by GitHub
commit 62d0ad1821
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

2
Cargo.lock generated

@ -93,7 +93,7 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bromine"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"async-trait",
"bincode",

@ -1,6 +1,6 @@
[package]
name = "bromine"
version = "0.15.0"
version = "0.16.0"
authors = ["trivernis <trivernis@protonmail.com>"]
edition = "2018"
readme = "README.md"

@ -84,11 +84,13 @@ If no namespace for the event namespace is registered or no handler is registere
the event name, the event will be ignored.
### Receiving answers to emitted events
### Receiving replies to emitted events
When emitting an event to a peer, the emitter can wait for an answer to that event.
This is achieved by emitting events as a response to a specific event id.
When an event with a reference event id (ref_id) is received, first the registry is
searched for handlers waiting for a response (by trying to receive from a channel).
If a handler can be found, the event is passed to the handler waiting for the response.
Otherwise, the event will be processed as a regular event.
Otherwise, the event will be processed as a regular event.
Events passed from an event handler are always passed as replies to the event that
called that handler.

@ -1,5 +1,6 @@
use crate::context::Context;
use crate::error::Result;
use crate::payload::{EventReceivePayload, EventSendPayload};
use crate::payload::{FromPayload, IntoPayload};
use crate::prelude::{IPCError, IPCResult};
use byteorder::{BigEndian, ReadBytesExt};
use std::error::Error;
@ -26,8 +27,8 @@ impl Display for ErrorEventData {
}
}
impl EventSendPayload for ErrorEventData {
fn to_payload_bytes(self) -> IPCResult<Vec<u8>> {
impl IntoPayload for ErrorEventData {
fn into_payload(self, _: &Context) -> IPCResult<Vec<u8>> {
let mut buf = Vec::new();
buf.append(&mut self.code.to_be_bytes().to_vec());
let message_len = self.message.len() as u32;
@ -38,8 +39,8 @@ impl EventSendPayload for ErrorEventData {
}
}
impl EventReceivePayload for ErrorEventData {
fn from_payload_bytes<R: Read>(mut reader: R) -> Result<Self> {
impl FromPayload for ErrorEventData {
fn from_payload<R: Read>(mut reader: R) -> Result<Self> {
let code = reader.read_u16::<BigEndian>()?;
let message_len = reader.read_u32::<BigEndian>()?;
let mut message_buf = vec![0u8; message_len as usize];

@ -1,12 +1,7 @@
use crate::error::{Error, Result};
use crate::events::generate_event_id;
use crate::events::payload::EventReceivePayload;
#[cfg(feature = "serialize")]
use crate::payload::SerdePayload;
use crate::prelude::{IPCError, IPCResult};
use crate::events::payload::FromPayload;
use byteorder::{BigEndian, ReadBytesExt};
#[cfg(feature = "serialize")]
use serde::de::DeserializeOwned;
use std::fmt::Debug;
use std::io::{Cursor, Read};
use tokio::io::{AsyncRead, AsyncReadExt};
@ -73,21 +68,12 @@ impl Event {
/// Decodes the payload to the given type implementing the receive payload trait
#[tracing::instrument(level = "trace", skip(self))]
pub fn payload<T: EventReceivePayload>(&self) -> Result<T> {
let payload = T::from_payload_bytes(&self.data[..])?;
pub fn payload<T: FromPayload>(&self) -> Result<T> {
let payload = T::from_payload(&self.data[..])?;
Ok(payload)
}
#[cfg(feature = "serialize")]
/// Decodes the payload to the given type implementing DeserializeOwned
#[tracing::instrument(level = "trace", skip(self))]
pub fn serde_payload<T: DeserializeOwned>(&self) -> Result<T> {
let payload = SerdePayload::<T>::from_payload_bytes(&self.data[..])?;
Ok(payload.data())
}
/// Returns a reference of the underlying data
pub fn data_raw(&self) -> &[u8] {
&self.data
@ -186,19 +172,19 @@ impl EventHeader {
}
/// Reads and validates the format version
fn read_version<R: Read>(reader: &mut R) -> IPCResult<Vec<u8>> {
fn read_version<R: Read>(reader: &mut R) -> Result<Vec<u8>> {
let mut version = vec![0u8; 3];
reader.read_exact(&mut version)?;
if version[0] != FORMAT_VERSION[0] {
return Err(IPCError::unsupported_version_vec(version));
return Err(Error::unsupported_version_vec(version));
}
Ok(version)
}
/// Reads the reference event id
fn read_ref_id<R: Read>(reader: &mut R) -> IPCResult<Option<u64>> {
fn read_ref_id<R: Read>(reader: &mut R) -> Result<Option<u64>> {
let ref_id_exists = reader.read_u8()?;
let ref_id = match ref_id_exists {
0x00 => None,
@ -210,14 +196,14 @@ impl EventHeader {
}
/// Reads the name of the event
fn read_name<R: Read>(reader: &mut R) -> IPCResult<String> {
fn read_name<R: Read>(reader: &mut R) -> Result<String> {
let name_len = reader.read_u16::<BigEndian>()?;
Self::read_string(reader, name_len as usize)
}
/// Reads the namespace of the event
fn read_namespace<R: Read>(reader: &mut R, namespace_len: u16) -> IPCResult<Option<String>> {
fn read_namespace<R: Read>(reader: &mut R, namespace_len: u16) -> Result<Option<String>> {
let namespace = if namespace_len > 0 {
Some(Self::read_string(reader, namespace_len as usize)?)
} else {
@ -227,7 +213,7 @@ impl EventHeader {
Ok(namespace)
}
fn read_string<R: Read>(reader: &mut R, length: usize) -> IPCResult<String> {
fn read_string<R: Read>(reader: &mut R, length: usize) -> Result<String> {
let mut string_buf = vec![0u8; length];
reader.read_exact(&mut string_buf)?;
String::from_utf8(string_buf).map_err(|_| Error::CorruptedEvent)

@ -5,16 +5,26 @@ use std::io::Read;
#[cfg(feature = "serialize")]
pub use super::payload_serializer::*;
/// Trait that serializes a type into bytes and can fail
pub trait TryIntoBytes {
fn try_into_bytes(self) -> IPCResult<Vec<u8>>;
}
/// Trait that serializes a type into bytes and never fails
pub trait IntoBytes {
fn into_bytes(self) -> Vec<u8>;
}
/// Trait to convert event data into sending bytes
/// It is implemented for all types that implement Serialize
pub trait EventSendPayload {
fn to_payload_bytes(self) -> IPCResult<Vec<u8>>;
pub trait IntoPayload {
fn into_payload(self, ctx: &Context) -> IPCResult<Vec<u8>>;
}
/// Trait to get the event data from receiving bytes.
/// It is implemented for all types that are DeserializeOwned
pub trait EventReceivePayload: Sized {
fn from_payload_bytes<R: Read>(reader: R) -> IPCResult<Self>;
pub trait FromPayload: Sized {
fn from_payload<R: Read>(reader: R) -> IPCResult<Self>;
}
/// A payload wrapper type for sending bytes directly without
@ -35,14 +45,14 @@ impl BytePayload {
}
}
impl EventSendPayload for BytePayload {
fn to_payload_bytes(self) -> IPCResult<Vec<u8>> {
impl IntoPayload for BytePayload {
fn into_payload(self, _: &Context) -> IPCResult<Vec<u8>> {
Ok(self.bytes)
}
}
impl EventReceivePayload for BytePayload {
fn from_payload_bytes<R: Read>(mut reader: R) -> IPCResult<Self> {
impl FromPayload for BytePayload {
fn from_payload<R: Read>(mut reader: R) -> IPCResult<Self> {
let mut buf = Vec::new();
reader.read_to_end(&mut buf)?;
@ -70,14 +80,10 @@ impl<P1, P2> TandemPayload<P1, P2> {
}
}
impl<P1, P2> EventSendPayload for TandemPayload<P1, P2>
where
P1: EventSendPayload,
P2: EventSendPayload,
{
fn to_payload_bytes(self) -> IPCResult<Vec<u8>> {
let mut p1_bytes = self.load1.to_payload_bytes()?;
let mut p2_bytes = self.load2.to_payload_bytes()?;
impl<P1: IntoPayload, P2: IntoPayload> IntoPayload for TandemPayload<P1, P2> {
fn into_payload(self, ctx: &Context) -> IPCResult<Vec<u8>> {
let mut p1_bytes = self.load1.into_payload(&ctx)?;
let mut p2_bytes = self.load2.into_payload(&ctx)?;
let mut p1_length_bytes = (p1_bytes.len() as u64).to_be_bytes().to_vec();
let mut p2_length_bytes = (p2_bytes.len() as u64).to_be_bytes().to_vec();
@ -92,12 +98,8 @@ where
}
}
impl<P1, P2> EventReceivePayload for TandemPayload<P1, P2>
where
P1: EventReceivePayload,
P2: EventReceivePayload,
{
fn from_payload_bytes<R: Read>(mut reader: R) -> IPCResult<Self> {
impl<P1: FromPayload, P2: FromPayload> FromPayload for TandemPayload<P1, P2> {
fn from_payload<R: Read>(mut reader: R) -> IPCResult<Self> {
let p1_length = reader.read_u64::<BigEndian>()?;
let mut load1_bytes = vec![0u8; p1_length as usize];
reader.read_exact(&mut load1_bytes)?;
@ -107,14 +109,15 @@ where
reader.read_exact(&mut load2_bytes)?;
Ok(Self {
load1: P1::from_payload_bytes(load1_bytes.as_slice())?,
load2: P2::from_payload_bytes(load2_bytes.as_slice())?,
load1: P1::from_payload(load1_bytes.as_slice())?,
load2: P2::from_payload(load2_bytes.as_slice())?,
})
}
}
impl EventSendPayload for () {
fn to_payload_bytes(self) -> IPCResult<Vec<u8>> {
#[cfg(not(feature = "serialize"))]
impl IntoPayload for () {
fn into_payload(self, _: &Context) -> IPCResult<Vec<u8>> {
Ok(vec![])
}
}
@ -122,8 +125,9 @@ impl EventSendPayload for () {
#[cfg(feature = "serialize")]
mod serde_payload {
use super::DynamicSerializer;
use crate::payload::EventReceivePayload;
use crate::prelude::{EventSendPayload, IPCResult};
use crate::context::Context;
use crate::payload::{FromPayload, TryIntoBytes};
use crate::prelude::{IPCResult, IntoPayload};
use byteorder::ReadBytesExt;
use serde::de::DeserializeOwned;
use serde::Serialize;
@ -146,10 +150,7 @@ mod serde_payload {
}
}
impl<T> Clone for SerdePayload<T>
where
T: Clone,
{
impl<T: Clone> Clone for SerdePayload<T> {
fn clone(&self) -> Self {
Self {
serializer: self.serializer.clone(),
@ -158,11 +159,8 @@ mod serde_payload {
}
}
impl<T> EventSendPayload for SerdePayload<T>
where
T: Serialize,
{
fn to_payload_bytes(self) -> IPCResult<Vec<u8>> {
impl<T: Serialize> TryIntoBytes for SerdePayload<T> {
fn try_into_bytes(self) -> IPCResult<Vec<u8>> {
let mut buf = Vec::new();
let mut data_bytes = self.serializer.serialize(self.data)?;
let format_id = self.serializer as u8;
@ -173,11 +171,14 @@ mod serde_payload {
}
}
impl<T> EventReceivePayload for SerdePayload<T>
where
T: DeserializeOwned,
{
fn from_payload_bytes<R: Read>(mut reader: R) -> IPCResult<Self> {
impl<T: Serialize> IntoPayload for SerdePayload<T> {
fn into_payload(self, _: &Context) -> IPCResult<Vec<u8>> {
self.try_into_bytes()
}
}
impl<T: DeserializeOwned> FromPayload for SerdePayload<T> {
fn from_payload<R: Read>(mut reader: R) -> IPCResult<Self> {
let format_id = reader.read_u8()?;
let serializer = DynamicSerializer::from_primitive(format_id as usize)?;
let data = serializer.deserialize(reader)?;
@ -185,7 +186,22 @@ mod serde_payload {
Ok(Self { serializer, data })
}
}
impl<T: Serialize> IntoPayload for T {
fn into_payload(self, ctx: &Context) -> IPCResult<Vec<u8>> {
ctx.create_serde_payload(self).into_payload(&ctx)
}
}
impl<T: DeserializeOwned> FromPayload for T {
fn from_payload<R: Read>(reader: R) -> IPCResult<Self> {
let serde_payload = SerdePayload::<Self>::from_payload(reader)?;
Ok(serde_payload.data)
}
}
}
use crate::context::Context;
#[cfg(feature = "serialize")]
pub use serde_payload::*;

@ -1,10 +1,8 @@
use crate::error::{Error, Result};
use crate::event::Event;
use crate::ipc::stream_emitter::StreamEmitter;
use crate::ipc::stream_emitter::{EmitMetadata, StreamEmitter};
use futures::future;
use futures::future::Either;
#[cfg(feature = "serialize")]
use serde::Serialize;
use std::collections::HashMap;
use std::mem;
use std::ops::{Deref, DerefMut};
@ -15,6 +13,7 @@ use tokio::sync::{oneshot, Mutex, RwLock};
use tokio::time::Duration;
use typemap_rev::TypeMap;
use crate::payload::IntoPayload;
#[cfg(feature = "serialize")]
use crate::payload::{DynamicSerializer, SerdePayload};
@ -28,14 +27,14 @@ pub(crate) type ReplyListeners = Arc<Mutex<HashMap<u64, oneshot::Sender<Event>>>
/// async fn my_callback(ctx: &Context, _event: Event) -> IPCResult<()> {
/// // use the emitter on the context object to emit events
/// // inside callbacks
/// ctx.emitter.emit("ping", ()).await?;
/// ctx.emit("ping", ()).await?;
/// Ok(())
/// }
/// ```
#[derive(Clone)]
pub struct Context {
/// The event emitter
pub emitter: StreamEmitter,
emitter: StreamEmitter,
/// Field to store additional context data
pub data: Arc<RwLock<TypeMap>>,
@ -46,8 +45,10 @@ pub struct Context {
reply_timeout: Duration,
ref_id: Option<u64>,
#[cfg(feature = "serialize")]
default_serializer: DynamicSerializer,
pub default_serializer: DynamicSerializer,
}
impl Context {
@ -67,6 +68,42 @@ impl Context {
reply_timeout,
#[cfg(feature = "serialize")]
default_serializer,
ref_id: None,
}
}
/// Emits an event with a given payload that can be serialized into bytes
pub async fn emit<S: AsRef<str>, P: IntoPayload>(
&self,
name: S,
payload: P,
) -> Result<EmitMetadata> {
let payload_bytes = payload.into_payload(&self)?;
if let Some(ref_id) = &self.ref_id {
self.emitter
.emit_response(*ref_id, name, payload_bytes)
.await
} else {
self.emitter.emit(name, payload_bytes).await
}
}
/// Emits an event to a specific namespace
pub async fn emit_to<S1: AsRef<str>, S2: AsRef<str>, P: IntoPayload>(
&self,
namespace: S1,
name: S2,
payload: P,
) -> Result<EmitMetadata> {
let payload_bytes = payload.into_payload(&self)?;
if let Some(ref_id) = &self.ref_id {
self.emitter
.emit_response_to(*ref_id, namespace, name, payload_bytes)
.await
} else {
self.emitter.emit_to(namespace, name, payload_bytes).await
}
}
@ -108,7 +145,7 @@ impl Context {
}
#[cfg(feature = "serialize")]
pub fn create_serde_payload<T: Serialize>(&self, data: T) -> SerdePayload<T> {
pub fn create_serde_payload<T>(&self, data: T) -> SerdePayload<T> {
SerdePayload::new(self.default_serializer.clone(), data)
}
@ -117,6 +154,10 @@ impl Context {
let mut listeners = self.reply_listeners.lock().await;
listeners.remove(&ref_id)
}
pub(crate) fn set_ref_id(&mut self, id: Option<u64>) {
self.ref_id = id;
}
}
pub struct PooledContext {

@ -52,15 +52,14 @@ async fn handle_connection<S: 'static + AsyncProtocolStream>(
}
/// Handles a single event in a different tokio context
fn handle_event(ctx: Context, handler: Arc<EventHandler>, event: Event) {
fn handle_event(mut ctx: Context, handler: Arc<EventHandler>, event: Event) {
ctx.set_ref_id(Some(event.id()));
tokio::spawn(async move {
let id = event.id();
if let Err(e) = handler.handle_event(&ctx, event).await {
// emit an error event
if let Err(e) = ctx
.emitter
.emit_response(
id,
.emit(
ERROR_EVENT_NAME,
ErrorEventData {
message: format!("{:?}", e),

@ -1,7 +1,6 @@
use crate::error::Result;
use crate::error_event::{ErrorEventData, ERROR_EVENT_NAME};
use crate::events::event::Event;
use crate::events::payload::EventSendPayload;
use crate::ipc::context::Context;
use crate::protocol::AsyncProtocolStream;
use std::ops::DerefMut;
@ -12,18 +11,11 @@ use tokio::sync::Mutex;
/// An abstraction over any type that implements the AsyncProtocolStream trait
/// to emit events and share a connection across multiple
/// contexts.
#[derive(Clone)]
pub struct StreamEmitter {
stream: Arc<Mutex<dyn AsyncWrite + Send + Sync + Unpin + 'static>>,
}
impl Clone for StreamEmitter {
fn clone(&self) -> Self {
Self {
stream: Arc::clone(&self.stream),
}
}
}
impl StreamEmitter {
pub fn new<P: AsyncProtocolStream + 'static>(stream: P::OwnedSplitWriteHalf) -> Self {
Self {
@ -32,7 +24,7 @@ impl StreamEmitter {
}
#[tracing::instrument(level = "trace", skip(self, data_bytes))]
pub async fn _emit(
async fn _emit(
&self,
namespace: Option<&str>,
event: &str,
@ -58,59 +50,48 @@ impl StreamEmitter {
}
/// Emits an event
pub async fn emit<S: AsRef<str>, T: EventSendPayload>(
pub(crate) async fn emit<S: AsRef<str>>(
&self,
event: S,
payload: T,
payload: Vec<u8>,
) -> Result<EmitMetadata> {
self._emit(None, event.as_ref(), payload.to_payload_bytes()?, None)
.await
self._emit(None, event.as_ref(), payload, None).await
}
/// Emits an event to a specific namespace
pub async fn emit_to<S1: AsRef<str>, S2: AsRef<str>, T: EventSendPayload>(
pub(crate) async fn emit_to<S1: AsRef<str>, S2: AsRef<str>>(
&self,
namespace: S1,
event: S2,
payload: T,
payload: Vec<u8>,
) -> Result<EmitMetadata> {
self._emit(
Some(namespace.as_ref()),
event.as_ref(),
payload.to_payload_bytes()?,
None,
)
.await
self._emit(Some(namespace.as_ref()), event.as_ref(), payload, None)
.await
}
/// Emits a response to an event
pub async fn emit_response<S: AsRef<str>, T: EventSendPayload>(
pub(crate) async fn emit_response<S: AsRef<str>>(
&self,
event_id: u64,
event: S,
payload: T,
payload: Vec<u8>,
) -> Result<EmitMetadata> {
self._emit(
None,
event.as_ref(),
payload.to_payload_bytes()?,
Some(event_id),
)
.await
self._emit(None, event.as_ref(), payload, Some(event_id))
.await
}
/// Emits a response to an event to a namespace
pub async fn emit_response_to<S1: AsRef<str>, S2: AsRef<str>, T: EventSendPayload>(
pub(crate) async fn emit_response_to<S1: AsRef<str>, S2: AsRef<str>>(
&self,
event_id: u64,
namespace: S1,
event: S2,
payload: T,
payload: Vec<u8>,
) -> Result<EmitMetadata> {
self._emit(
Some(namespace.as_ref()),
event.as_ref(),
payload.to_payload_bytes()?,
payload,
Some(event_id),
)
.await

@ -8,7 +8,7 @@
//! /// Callback ping function
//! async fn handle_ping(ctx: &Context, event: Event) -> IPCResult<()> {
//! println!("Received ping event.");
//! ctx.emitter.emit_response(event.id(), "pong", ()).await?;
//! ctx.emit("pong", ()).await?;
//!
//! Ok(())
//! }
@ -45,7 +45,7 @@
//! // register callback inline
//! .on("something", callback!(ctx, event, async move {
//! println!("I think the server did something");
//! ctx.emitter.emit_response_to(event.id(), "mainspace-server", "ok", ()).await?;
//! ctx.emit_to("mainspace-server", "ok", ()).await?;
//! Ok(())
//! }))
//! .build()
@ -53,7 +53,7 @@
//! .build_client().await.unwrap();
//!
//! // emit an initial event
//! let response = ctx.emitter.emit("ping", ()).await.unwrap().await_reply(&ctx).await.unwrap();
//! let response = ctx.emit("ping", ()).await.unwrap().await_reply(&ctx).await.unwrap();
//! assert_eq!(response.name(), "pong");
//! }
//! ```
@ -79,7 +79,7 @@
//! // register callback
//! .on("ping", callback!(ctx, event, async move {
//! println!("Received ping event.");
//! ctx.emitter.emit_response(event.id(), "pong", ()).await?;
//! ctx.emit("pong", ()).await?;
//! Ok(())
//! }))
//! .namespace("mainspace-server")
@ -91,7 +91,7 @@
//! let mut my_key = data.get_mut::<MyKey>().unwrap();
//! *my_key += 1;
//! }
//! ctx.emitter.emit_response_to(event.id(), "mainspace-client", "something", ()).await?;
//! ctx.emit_to("mainspace-client", "something", ()).await?;
//! Ok(())
//! }))
//! .build()

@ -12,14 +12,12 @@ use utils::protocol::*;
async fn it_sends_payloads() {
let port = get_free_port();
let ctx = get_client_with_server(port).await;
let payload = SimplePayload {
number: 0,
string: String::from("Hello World"),
};
#[cfg(feature = "serialize")]
let payload = ctx.create_serde_payload(payload);
ctx.emitter.emit("ping", payload).await.unwrap();
ctx.emit("ping", payload).await.unwrap();
// wait for the event to be handled
tokio::time::sleep(Duration::from_millis(10)).await;
@ -38,23 +36,15 @@ async fn it_receives_payloads() {
number: 0,
string: String::from("Hello World"),
};
#[cfg(feature = "serialize")]
let payload = ctx.create_serde_payload(payload);
let reply = ctx
.emitter
.emit("ping", payload)
.await
.unwrap()
.await_reply(&ctx)
.await
.unwrap();
#[cfg(not(feature = "serialize"))]
let reply_payload = reply.payload::<SimplePayload>().unwrap();
#[cfg(feature = "serialize")]
let reply_payload = reply.serde_payload::<SimplePayload>().unwrap();
let counters = get_counter_from_context(&ctx).await;
assert_eq!(counters.get("ping").await, 1);
@ -76,41 +66,19 @@ fn get_builder(port: u8) -> IPCBuilder<TestProtocolListener> {
async fn handle_ping_event(ctx: &Context, event: Event) -> IPCResult<()> {
increment_counter_for_event(ctx, &event).await;
let payload = get_simple_payload(&event)?;
#[cfg(feature = "serialize")]
{
ctx.emitter
.emit_response(event.id(), "pong", ctx.create_serde_payload(payload))
.await?;
}
#[cfg(not(feature = "serialize"))]
{
ctx.emitter
.emit_response(event.id(), "pong", payload)
.await?;
}
let payload = event.payload::<SimplePayload>()?;
ctx.emit("pong", payload).await?;
Ok(())
}
async fn handle_pong_event(ctx: &Context, event: Event) -> IPCResult<()> {
increment_counter_for_event(ctx, &event).await;
let _payload = get_simple_payload(&event)?;
let _payload = event.payload::<SimplePayload>()?;
Ok(())
}
fn get_simple_payload(event: &Event) -> IPCResult<SimplePayload> {
#[cfg(feature = "serialize")]
{
event.serde_payload::<SimplePayload>()
}
#[cfg(not(feature = "serialize"))]
{
event.payload::<SimplePayload>()
}
}
#[cfg(feature = "serialize")]
mod payload_impl {
use serde::{Deserialize, Serialize};
@ -124,8 +92,9 @@ mod payload_impl {
#[cfg(not(feature = "serialize"))]
mod payload_impl {
use bromine::context::Context;
use bromine::error::Result;
use bromine::payload::{EventReceivePayload, EventSendPayload};
use bromine::payload::{FromPayload, IntoPayload};
use bromine::prelude::IPCResult;
use byteorder::{BigEndian, ReadBytesExt};
use std::io::Read;
@ -135,8 +104,8 @@ mod payload_impl {
pub number: u32,
}
impl EventSendPayload for SimplePayload {
fn to_payload_bytes(self) -> IPCResult<Vec<u8>> {
impl IntoPayload for SimplePayload {
fn into_payload(self, _: &Context) -> IPCResult<Vec<u8>> {
let mut buf = Vec::new();
let string_length = self.string.len() as u16;
let string_length_bytes = string_length.to_be_bytes();
@ -150,8 +119,8 @@ mod payload_impl {
}
}
impl EventReceivePayload for SimplePayload {
fn from_payload_bytes<R: Read>(mut reader: R) -> Result<Self> {
impl FromPayload for SimplePayload {
fn from_payload<R: Read>(mut reader: R) -> Result<Self> {
let string_length = reader.read_u16::<BigEndian>()?;
let mut string_buf = vec![0u8; string_length as usize];
reader.read_exact(&mut string_buf)?;

@ -13,7 +13,7 @@ use utils::protocol::*;
async fn it_sends_events() {
let port = get_free_port();
let ctx = get_client_with_server(port).await;
ctx.emitter.emit("ping", EmptyPayload).await.unwrap();
ctx.emit("ping", EmptyPayload).await.unwrap();
// allow the event to be processed
tokio::time::sleep(Duration::from_millis(10)).await;
@ -28,14 +28,8 @@ async fn it_sends_events() {
async fn it_sends_namespaced_events() {
let port = get_free_port();
let ctx = get_client_with_server(port).await;
ctx.emitter
.emit_to("test", "ping", EmptyPayload)
.await
.unwrap();
ctx.emitter
.emit_to("test", "pong", EmptyPayload)
.await
.unwrap();
ctx.emit_to("test", "ping", EmptyPayload).await.unwrap();
ctx.emit_to("test", "pong", EmptyPayload).await.unwrap();
// allow the event to be processed
tokio::time::sleep(Duration::from_millis(10)).await;
@ -52,7 +46,6 @@ async fn it_receives_responses() {
let port = get_free_port();
let ctx = get_client_with_server(port).await;
let reply = ctx
.emitter
.emit("ping", EmptyPayload)
.await
.unwrap()
@ -72,10 +65,7 @@ async fn it_receives_responses() {
async fn it_handles_errors() {
let port = get_free_port();
let ctx = get_client_with_server(port).await;
ctx.emitter
.emit("create_error", EmptyPayload)
.await
.unwrap();
ctx.emit("create_error", EmptyPayload).await.unwrap();
// allow the event to be processed
tokio::time::sleep(Duration::from_millis(10)).await;
let counter = get_counter_from_context(&ctx).await;
@ -90,7 +80,6 @@ async fn it_receives_error_responses() {
let port = get_free_port();
let ctx = get_client_with_server(port).await;
let result = ctx
.emitter
.emit("create_error", EmptyPayload)
.await
.unwrap()
@ -124,9 +113,7 @@ fn get_builder(port: u8) -> IPCBuilder<TestProtocolListener> {
async fn handle_ping_event(ctx: &Context, event: Event) -> IPCResult<()> {
increment_counter_for_event(ctx, &event).await;
ctx.emitter
.emit_response(event.id(), "pong", EmptyPayload)
.await?;
ctx.emit("pong", EmptyPayload).await?;
Ok(())
}
@ -151,8 +138,8 @@ async fn handle_error_event(ctx: &Context, event: Event) -> IPCResult<()> {
pub struct EmptyPayload;
impl EventSendPayload for EmptyPayload {
fn to_payload_bytes(self) -> IPCResult<Vec<u8>> {
impl IntoPayload for EmptyPayload {
fn into_payload(self, _: &Context) -> IPCResult<Vec<u8>> {
Ok(vec![])
}
}

@ -1,3 +1,4 @@
#[cfg(feature = "serialize")]
use bromine::prelude::*;
#[cfg(feature = "serialize_rmp")]
@ -27,8 +28,8 @@ fn it_serializes_json() {
#[cfg(feature = "serialize")]
fn test_serialization(serializer: DynamicSerializer) {
let test_payload = get_test_payload(serializer);
let payload_bytes = test_payload.clone().to_payload_bytes().unwrap();
let payload = TestSerdePayload::from_payload_bytes(&payload_bytes[..]).unwrap();
let payload_bytes = test_payload.clone().try_into_bytes().unwrap();
let payload = TestSerdePayload::from_payload(&payload_bytes[..]).unwrap();
assert_eq!(payload.data(), test_payload.data())
}

Loading…
Cancel
Save