diff --git a/Cargo.lock b/Cargo.lock index a7557c4d..392a3e7d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -500,8 +500,9 @@ dependencies = [ [[package]] name = "rmp-ipc" -version = "0.8.1" +version = "0.8.2" dependencies = [ + "byteorder", "criterion", "lazy_static", "rmp-serde", diff --git a/Cargo.toml b/Cargo.toml index a3a4a6fe..836934aa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rmp-ipc" -version = "0.8.1" +version = "0.8.2" authors = ["trivernis "] edition = "2018" readme = "README.md" @@ -25,6 +25,7 @@ rmp-serde = "0.15.4" tracing = "0.1.29" lazy_static = "1.4.0" typemap_rev = "0.1.5" +byteorder = "1.4.3" [dependencies.serde] version = "1.0.130" diff --git a/src/events/payload.rs b/src/events/payload.rs index 5a793b9c..b066eb52 100644 --- a/src/events/payload.rs +++ b/src/events/payload.rs @@ -1,4 +1,5 @@ use crate::prelude::IPCResult; +use byteorder::{BigEndian, ReadBytesExt}; use serde::de::DeserializeOwned; use serde::Serialize; use std::io::Read; @@ -47,6 +48,11 @@ impl BytePayload { pub fn new(bytes: Vec) -> Self { Self { bytes } } + + /// Returns the bytes of the payload + pub fn into_inner(self) -> Vec { + self.bytes + } } impl EventSendPayload for BytePayload { @@ -63,3 +69,66 @@ impl EventReceivePayload for BytePayload { Ok(Self::new(buf)) } } + +/// A payload wrapper that allows storing two different payloads +/// independent from each other. For example one payload can be +/// a payload serialized by serde while the other is a raw byte +/// payload +pub struct TandemPayload { + load1: P1, + load2: P2, +} + +impl TandemPayload { + pub fn new(load1: P1, load2: P2) -> Self { + Self { load1, load2 } + } + + /// Returns both payload stored in the tandem payload + pub fn into_inner(self) -> (P1, P2) { + (self.load1, self.load2) + } +} + +impl EventSendPayload for TandemPayload +where + P1: EventSendPayload, + P2: EventSendPayload, +{ + fn to_payload_bytes(self) -> IPCResult> { + let mut p1_bytes = self.load1.to_payload_bytes()?; + let mut p2_bytes = self.load2.to_payload_bytes()?; + + let mut p1_length_bytes = (p1_bytes.len() as u32).to_be_bytes().to_vec(); + let mut p2_length_bytes = (p2_bytes.len() as u32).to_be_bytes().to_vec(); + + let mut bytes = Vec::new(); + bytes.append(&mut p1_length_bytes); + bytes.append(&mut p1_bytes); + bytes.append(&mut p2_length_bytes); + bytes.append(&mut p2_bytes); + + Ok(bytes) + } +} + +impl EventReceivePayload for TandemPayload +where + P1: EventReceivePayload, + P2: EventReceivePayload, +{ + fn from_payload_bytes(mut reader: R) -> IPCResult { + let p1_length = reader.read_u32::()?; + let mut load1_bytes = vec![0u8; p1_length as usize]; + reader.read_exact(&mut load1_bytes)?; + + let p2_length = reader.read_u32::()?; + let mut load2_bytes = vec![0u8; p2_length as usize]; + reader.read_exact(&mut load2_bytes)?; + + Ok(Self { + load1: P1::from_payload_bytes(load1_bytes.as_slice())?, + load2: P2::from_payload_bytes(load2_bytes.as_slice())?, + }) + } +}