Add FromPayload implementation for DeserializeOwned

Signed-off-by: trivernis <trivernis@protonmail.com>
pull/27/head
trivernis 3 years ago
parent 324a788031
commit f189fe11f0
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

@ -1,12 +1,7 @@
use crate::error::{Error, Result};
use crate::events::generate_event_id;
use crate::events::payload::FromPayload;
#[cfg(feature = "serialize")]
use crate::payload::SerdePayload;
use crate::prelude::{IPCError, IPCResult};
use byteorder::{BigEndian, ReadBytesExt};
#[cfg(feature = "serialize")]
use serde::de::DeserializeOwned;
use std::fmt::Debug;
use std::io::{Cursor, Read};
use tokio::io::{AsyncRead, AsyncReadExt};
@ -79,15 +74,6 @@ impl Event {
Ok(payload)
}
#[cfg(feature = "serialize")]
/// Decodes the payload to the given type implementing DeserializeOwned
#[tracing::instrument(level = "trace", skip(self))]
pub fn serde_payload<T: DeserializeOwned>(&self) -> Result<T> {
let payload = SerdePayload::<T>::from_payload(&self.data[..])?;
Ok(payload.data())
}
/// Returns a reference of the underlying data
pub fn data_raw(&self) -> &[u8] {
&self.data
@ -186,19 +172,19 @@ impl EventHeader {
}
/// Reads and validates the format version
fn read_version<R: Read>(reader: &mut R) -> IPCResult<Vec<u8>> {
fn read_version<R: Read>(reader: &mut R) -> Result<Vec<u8>> {
let mut version = vec![0u8; 3];
reader.read_exact(&mut version)?;
if version[0] != FORMAT_VERSION[0] {
return Err(IPCError::unsupported_version_vec(version));
return Err(Error::unsupported_version_vec(version));
}
Ok(version)
}
/// Reads the reference event id
fn read_ref_id<R: Read>(reader: &mut R) -> IPCResult<Option<u64>> {
fn read_ref_id<R: Read>(reader: &mut R) -> Result<Option<u64>> {
let ref_id_exists = reader.read_u8()?;
let ref_id = match ref_id_exists {
0x00 => None,
@ -210,14 +196,14 @@ impl EventHeader {
}
/// Reads the name of the event
fn read_name<R: Read>(reader: &mut R) -> IPCResult<String> {
fn read_name<R: Read>(reader: &mut R) -> Result<String> {
let name_len = reader.read_u16::<BigEndian>()?;
Self::read_string(reader, name_len as usize)
}
/// Reads the namespace of the event
fn read_namespace<R: Read>(reader: &mut R, namespace_len: u16) -> IPCResult<Option<String>> {
fn read_namespace<R: Read>(reader: &mut R, namespace_len: u16) -> Result<Option<String>> {
let namespace = if namespace_len > 0 {
Some(Self::read_string(reader, namespace_len as usize)?)
} else {
@ -227,7 +213,7 @@ impl EventHeader {
Ok(namespace)
}
fn read_string<R: Read>(reader: &mut R, length: usize) -> IPCResult<String> {
fn read_string<R: Read>(reader: &mut R, length: usize) -> Result<String> {
let mut string_buf = vec![0u8; length];
reader.read_exact(&mut string_buf)?;
String::from_utf8(string_buf).map_err(|_| Error::CorruptedEvent)

@ -192,6 +192,14 @@ mod serde_payload {
ctx.create_serde_payload(self).into_payload(&ctx)
}
}
impl<T: DeserializeOwned> FromPayload for T {
fn from_payload<R: Read>(reader: R) -> IPCResult<Self> {
let serde_payload = SerdePayload::<Self>::from_payload(reader)?;
Ok(serde_payload.data)
}
}
}
use crate::context::Context;

@ -12,11 +12,11 @@ use utils::protocol::*;
async fn it_sends_payloads() {
let port = get_free_port();
let ctx = get_client_with_server(port).await;
let payload = SimplePayload {
number: 0,
string: String::from("Hello World"),
};
#[cfg(feature = "serialize")]
ctx.emit("ping", payload).await.unwrap();
// wait for the event to be handled
@ -36,7 +36,6 @@ async fn it_receives_payloads() {
number: 0,
string: String::from("Hello World"),
};
#[cfg(feature = "serialize")]
let reply = ctx
.emit("ping", payload)
.await
@ -44,12 +43,8 @@ async fn it_receives_payloads() {
.await_reply(&ctx)
.await
.unwrap();
#[cfg(not(feature = "serialize"))]
let reply_payload = reply.payload::<SimplePayload>().unwrap();
#[cfg(feature = "serialize")]
let reply_payload = reply.serde_payload::<SimplePayload>().unwrap();
let counters = get_counter_from_context(&ctx).await;
assert_eq!(counters.get("ping").await, 1);
@ -71,39 +66,19 @@ fn get_builder(port: u8) -> IPCBuilder<TestProtocolListener> {
async fn handle_ping_event(ctx: &Context, event: Event) -> IPCResult<()> {
increment_counter_for_event(ctx, &event).await;
let payload = get_simple_payload(&event)?;
#[cfg(feature = "serialize")]
{
ctx.emit("pong", payload).await?;
}
#[cfg(not(feature = "serialize"))]
{
ctx.emitter
.emit_response(event.id(), "pong", payload)
.await?;
}
let payload = event.payload::<SimplePayload>()?;
ctx.emit("pong", payload).await?;
Ok(())
}
async fn handle_pong_event(ctx: &Context, event: Event) -> IPCResult<()> {
increment_counter_for_event(ctx, &event).await;
let _payload = get_simple_payload(&event)?;
let _payload = event.payload::<SimplePayload>()?;
Ok(())
}
fn get_simple_payload(event: &Event) -> IPCResult<SimplePayload> {
#[cfg(feature = "serialize")]
{
event.serde_payload::<SimplePayload>()
}
#[cfg(not(feature = "serialize"))]
{
event.payload::<SimplePayload>()
}
}
#[cfg(feature = "serialize")]
mod payload_impl {
use serde::{Deserialize, Serialize};
@ -117,6 +92,7 @@ mod payload_impl {
#[cfg(not(feature = "serialize"))]
mod payload_impl {
use bromine::context::Context;
use bromine::error::Result;
use bromine::payload::{FromPayload, IntoPayload};
use bromine::prelude::IPCResult;
@ -129,7 +105,7 @@ mod payload_impl {
}
impl IntoPayload for SimplePayload {
fn to_payload_bytes(self) -> IPCResult<Vec<u8>> {
fn into_payload(self, _: &Context) -> IPCResult<Vec<u8>> {
let mut buf = Vec::new();
let string_length = self.string.len() as u16;
let string_length_bytes = string_length.to_be_bytes();

@ -1,3 +1,4 @@
#[cfg(feature = "serialize")]
use bromine::prelude::*;
#[cfg(feature = "serialize_rmp")]

Loading…
Cancel
Save