Move messagepack to features and implement raw byte protocol for serialisation

Signed-off-by: trivernis <trivernis@protonmail.com>
pull/24/head
trivernis 2 years ago
parent 005a4bfc68
commit be869a7faa
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

2
Cargo.lock generated

@ -38,7 +38,7 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bromine"
version = "0.12.0"
version = "0.13.0"
dependencies = [
"async-trait",
"byteorder",

@ -1,6 +1,6 @@
[package]
name = "bromine"
version = "0.12.0"
version = "0.13.0"
authors = ["trivernis <trivernis@protonmail.com>"]
edition = "2018"
readme = "README.md"
@ -21,26 +21,38 @@ harness = false
[dependencies]
thiserror = "1.0.30"
rmp-serde = "0.15.4"
tracing = "0.1.29"
lazy_static = "1.4.0"
typemap_rev = "0.1.5"
byteorder = "1.4.3"
async-trait = "0.1.51"
futures = "0.3.17"
rmp-serde = {version = "0.15.5", optional = true}
[dependencies.serde]
optional = true
version = "1.0.130"
features = ["serde_derive"]
features = []
[dependencies.tokio]
version = "1.12.0"
features = ["net", "io-std", "io-util", "sync", "time"]
[dev-dependencies]
rmp-serde = "0.15.4"
[dev-dependencies.serde]
version = "1.0.130"
features = ["serde_derive"]
[dev-dependencies.criterion]
version = "0.3.5"
features = ["async_tokio", "html_reports"]
[dev-dependencies.tokio]
version = "1.12.0"
features = ["macros", "rt-multi-thread"]
features = ["macros", "rt-multi-thread"]
[features]
default = ["messagepack"]
messagepack = ["serde", "rmp-serde"]

@ -9,9 +9,11 @@ pub enum Error {
#[error(transparent)]
IoError(#[from] tokio::io::Error),
#[cfg(feature = "messagepack")]
#[error(transparent)]
Decode(#[from] rmp_serde::decode::Error),
#[cfg(feature = "messagepack")]
#[error(transparent)]
Encode(#[from] rmp_serde::encode::Error),
@ -24,6 +26,9 @@ pub enum Error {
#[error("Channel Error: {0}")]
ReceiveError(#[from] oneshot::error::RecvError),
#[error("The received event was corrupted")]
CorruptedEvent,
#[error("Send Error")]
SendError,

@ -1,6 +1,10 @@
use serde::{Deserialize, Serialize};
use crate::error::Result;
use crate::payload::{EventReceivePayload, EventSendPayload};
use crate::prelude::{IPCError, IPCResult};
use byteorder::{BigEndian, ReadBytesExt};
use std::error::Error;
use std::fmt::{Display, Formatter};
use std::io::Read;
pub static ERROR_EVENT_NAME: &str = "error";
@ -8,7 +12,7 @@ pub static ERROR_EVENT_NAME: &str = "error";
/// The error event has a default handler that just logs that
/// an error occurred. For a custom handler, register a handler on
/// the [ERROR_EVENT_NAME] event.
#[derive(Clone, Deserialize, Serialize, Debug)]
#[derive(Clone, Debug)]
pub struct ErrorEventData {
pub code: u16,
pub message: String,
@ -21,3 +25,27 @@ impl Display for ErrorEventData {
write!(f, "IPC Code {}: '{}'", self.code, self.message)
}
}
impl EventSendPayload for ErrorEventData {
fn to_payload_bytes(self) -> IPCResult<Vec<u8>> {
let mut buf = Vec::new();
buf.append(&mut self.code.to_be_bytes().to_vec());
let message_len = self.message.len() as u32;
buf.append(&mut message_len.to_be_bytes().to_vec());
buf.append(&mut self.message.into_bytes());
Ok(buf)
}
}
impl EventReceivePayload for ErrorEventData {
fn from_payload_bytes<R: Read>(mut reader: R) -> Result<Self> {
let code = reader.read_u16::<BigEndian>()?;
let message_len = reader.read_u32::<BigEndian>()?;
let mut message_buf = vec![0u8; message_len as usize];
reader.read_exact(&mut message_buf)?;
let message = String::from_utf8(message_buf).map_err(|_| IPCError::CorruptedEvent)?;
Ok(ErrorEventData { code, message })
}
}

@ -1,7 +1,6 @@
use crate::error::Result;
use crate::error::{Error, Result};
use crate::events::generate_event_id;
use crate::events::payload::EventReceivePayload;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use tokio::io::{AsyncRead, AsyncReadExt};
@ -14,7 +13,7 @@ pub struct Event {
data: Vec<u8>,
}
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug)]
struct EventHeader {
id: u64,
ref_id: Option<u64>,
@ -94,11 +93,8 @@ impl Event {
let data_length = total_length - header_length as u64;
tracing::trace!(total_length, header_length, data_length);
let header: EventHeader = {
let mut header_bytes = vec![0u8; header_length as usize];
reader.read_exact(&mut header_bytes).await?;
rmp_serde::from_read(&header_bytes[..])?
};
let header: EventHeader = EventHeader::from_async_read(reader).await?;
let mut data = vec![0u8; data_length as usize];
reader.read_exact(&mut data).await?;
let event = Event { header, data };
@ -109,7 +105,7 @@ impl Event {
/// Encodes the event into bytes
#[tracing::instrument(level = "trace", skip(self))]
pub fn into_bytes(mut self) -> Result<Vec<u8>> {
let mut header_bytes = rmp_serde::to_vec(&self.header)?;
let mut header_bytes = self.header.into_bytes();
let header_length = header_bytes.len() as u16;
let data_length = self.data.len();
let total_length = header_length as u64 + data_length as u64;
@ -124,3 +120,61 @@ impl Event {
Ok(buf)
}
}
impl EventHeader {
/// Serializes the event header into bytes
pub fn into_bytes(self) -> Vec<u8> {
let mut buf = Vec::new();
buf.append(&mut self.id.to_be_bytes().to_vec());
if let Some(ref_id) = self.ref_id {
buf.push(0xFF);
buf.append(&mut ref_id.to_be_bytes().to_vec());
} else {
buf.push(0x00);
}
if let Some(namespace) = self.namespace {
let namespace_len = namespace.len() as u16;
buf.append(&mut namespace_len.to_be_bytes().to_vec());
buf.append(&mut namespace.into_bytes());
} else {
buf.append(&mut 0u16.to_be_bytes().to_vec());
}
let name_len = self.name.len() as u16;
buf.append(&mut name_len.to_be_bytes().to_vec());
buf.append(&mut self.name.into_bytes());
buf
}
/// Parses an event header from an async reader
pub async fn from_async_read<R: AsyncRead + Unpin>(reader: &mut R) -> Result<Self> {
let id = reader.read_u64().await?;
let ref_id_exists = reader.read_u8().await?;
let ref_id = match ref_id_exists {
0x00 => None,
0xFF => Some(reader.read_u64().await?),
_ => return Err(Error::CorruptedEvent),
};
let namespace_len = reader.read_u16().await?;
let namespace = if namespace_len > 0 {
let mut namespace_buf = vec![0u8; namespace_len as usize];
reader.read_exact(&mut namespace_buf).await?;
Some(String::from_utf8(namespace_buf).map_err(|_| Error::CorruptedEvent)?)
} else {
None
};
let name_len = reader.read_u16().await?;
let mut name_buf = vec![0u8; name_len as usize];
reader.read_exact(&mut name_buf).await?;
let name = String::from_utf8(name_buf).map_err(|_| Error::CorruptedEvent)?;
Ok(Self {
id,
ref_id,
namespace,
name,
})
}
}

@ -1,7 +1,5 @@
use crate::prelude::IPCResult;
use byteorder::{BigEndian, ReadBytesExt};
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::io::Read;
/// Trait to convert event data into sending bytes
@ -10,33 +8,12 @@ pub trait EventSendPayload {
fn to_payload_bytes(self) -> IPCResult<Vec<u8>>;
}
impl<T> EventSendPayload for T
where
T: Serialize,
{
fn to_payload_bytes(self) -> IPCResult<Vec<u8>> {
let bytes = rmp_serde::to_vec(&self)?;
Ok(bytes)
}
}
/// Trait to get the event data from receiving bytes.
/// It is implemented for all types that are DeserializeOwned
pub trait EventReceivePayload: Sized {
fn from_payload_bytes<R: Read>(reader: R) -> IPCResult<Self>;
}
impl<T> EventReceivePayload for T
where
T: DeserializeOwned,
{
fn from_payload_bytes<R: Read>(reader: R) -> IPCResult<Self> {
let type_data = rmp_serde::from_read(reader)?;
Ok(type_data)
}
}
/// A payload wrapper type for sending bytes directly without
/// serializing them
#[derive(Clone)]
@ -132,3 +109,36 @@ where
})
}
}
#[cfg(feature = "messagepack")]
mod rmp_impl {
use super::{EventReceivePayload, EventSendPayload};
use crate::prelude::IPCResult;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::io::Read;
impl<T> EventSendPayload for T
where
T: Serialize,
{
fn to_payload_bytes(self) -> IPCResult<Vec<u8>> {
let bytes = rmp_serde::to_vec(&self)?;
Ok(bytes)
}
}
impl<T> EventReceivePayload for T
where
T: DeserializeOwned,
{
fn from_payload_bytes<R: Read>(reader: R) -> IPCResult<Self> {
let type_data = rmp_serde::from_read(reader)?;
Ok(type_data)
}
}
}
#[cfg(feature = "messagepack")]
pub use rmp_impl::*;

Loading…
Cancel
Save