From a76e41850db5378b2e2479975b50cf47ac67e02c Mon Sep 17 00:00:00 2001 From: trivernis Date: Wed, 20 Oct 2021 20:24:52 +0200 Subject: [PATCH] Change event serialization to not serialize the data twice Signed-off-by: trivernis --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/events/event.rs | 86 ++++++++++++++++++++++++--------------- src/ipc/stream_emitter.rs | 6 ++- 4 files changed, 59 insertions(+), 37 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 35da139d..e45be309 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -148,7 +148,7 @@ dependencies = [ [[package]] name = "rmp-ipc" -version = "0.6.1" +version = "0.7.0" dependencies = [ "lazy_static", "log", diff --git a/Cargo.toml b/Cargo.toml index ec65a37b..9af750e7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rmp-ipc" -version = "0.6.1" +version = "0.7.0" authors = ["trivernis "] edition = "2018" readme = "README.md" diff --git a/src/events/event.rs b/src/events/event.rs index b8acfb78..40f3327b 100644 --- a/src/events/event.rs +++ b/src/events/event.rs @@ -7,13 +7,17 @@ use tokio::io::{AsyncRead, AsyncReadExt}; /// A container representing an event and underlying binary data. /// The data can be decoded into an object representation or read /// as raw binary data. -#[derive(Serialize, Deserialize)] pub struct Event { + header: EventHeader, + data: Vec, +} + +#[derive(Serialize, Deserialize)] +struct EventHeader { id: u64, ref_id: Option, namespace: Option, name: String, - data: Vec, } impl Event { @@ -24,24 +28,35 @@ impl Event { data: Vec, ref_id: Option, ) -> Self { - Self { + let header = EventHeader { id: generate_event_id(), ref_id, namespace: Some(namespace), name, - data, - } + }; + Self { header, data } } /// Creates a new event pub fn new(name: String, data: Vec, ref_id: Option) -> Self { - Self { + let header = EventHeader { id: generate_event_id(), ref_id, namespace: None, name, - data, - } + }; + Self { header, data } + } + + /// The identifier of the message + pub fn id(&self) -> u64 { + self.header.id + } + + /// The ID of the message referenced by this message. + /// It represents the message that is replied to and can be None. + pub fn reference_id(&self) -> Option { + self.header.ref_id.clone() } /// Decodes the data to the given type @@ -58,46 +73,51 @@ impl Event { /// Returns a reference to the namespace pub fn namespace(&self) -> &Option { - &self.namespace + &self.header.namespace } /// Returns the name of the event pub fn name(&self) -> &str { - &self.name + &self.header.name } /// Reads an event message pub async fn from_async_read(reader: &mut R) -> Result { - let length = reader.read_u32().await?; - log::trace!("Parsing event of length {}", length); - let mut data = vec![0u8; length as usize]; + let total_length = reader.read_u64().await?; + let header_length = reader.read_u16().await?; + let data_length = total_length - header_length as u64; + log::trace!( + "Parsing event of length {} ({} header, {} data)", + total_length, + header_length, + data_length + ); + + let header: EventHeader = { + let mut header_bytes = vec![0u8; header_length as usize]; + reader.read_exact(&mut header_bytes).await?; + rmp_serde::from_read(&header_bytes[..])? + }; + let mut data = vec![0u8; data_length as usize]; reader.read_exact(&mut data).await?; - let event = rmp_serde::from_read(&data[..])?; + let event = Event { header, data }; Ok(event) } /// Encodes the event into bytes - pub fn to_bytes(&self) -> Result> { - let mut event_bytes = rmp_serde::to_vec(&self)?; - let mut length_bytes = (event_bytes.len() as u32).to_be_bytes().to_vec(); - length_bytes.reverse(); - - for byte in length_bytes { - event_bytes.insert(0, byte); - } - - Ok(event_bytes) - } + pub fn into_bytes(mut self) -> Result> { + let mut header_bytes = rmp_serde::to_vec(&self.header)?; + let header_length = header_bytes.len() as u16; + let data_length = self.data.len(); + let total_length = header_length as u64 + data_length as u64; - /// The identifier of the message - pub fn id(&self) -> u64 { - self.id - } + let mut buf = Vec::with_capacity(total_length as usize); + buf.append(&mut total_length.to_be_bytes().to_vec()); + buf.append(&mut header_length.to_be_bytes().to_vec()); + buf.append(&mut header_bytes); + buf.append(&mut self.data); - /// The ID of the message referenced by this message. - /// It represents the message that is replied to and can be None. - pub fn reference_id(&self) -> Option { - self.ref_id.clone() + Ok(buf) } } diff --git a/src/ipc/stream_emitter.rs b/src/ipc/stream_emitter.rs index e6945de8..b0c7a787 100644 --- a/src/ipc/stream_emitter.rs +++ b/src/ipc/stream_emitter.rs @@ -39,7 +39,9 @@ impl StreamEmitter { Event::new(event.to_string(), data_bytes, res_id) }; - let event_bytes = event.to_bytes()?; + let event_id = event.id(); + + let event_bytes = event.into_bytes()?; { let start = Instant::now(); let mut stream = self.stream.lock().await; @@ -47,7 +49,7 @@ impl StreamEmitter { log::trace!("Wrote {} bytes in {:?}", event_bytes.len(), start.elapsed()); } - Ok(EmitMetadata::new(event.id())) + Ok(EmitMetadata::new(event_id)) } /// Emits an event