From d1b426e10b0b9fc760d31d22b16a0dedf0c98b98 Mon Sep 17 00:00:00 2001 From: trivernis Date: Sun, 5 Dec 2021 10:30:06 +0100 Subject: [PATCH] Add event format version validation Signed-off-by: trivernis --- src/error.rs | 14 ++++++++ src/events/event.rs | 82 ++++++++++++++++++++++++++++++++++----------- 2 files changed, 76 insertions(+), 20 deletions(-) diff --git a/src/error.rs b/src/error.rs index 6bf711cc..0ae67c6b 100644 --- a/src/error.rs +++ b/src/error.rs @@ -33,6 +33,20 @@ pub enum Error { #[error("timed out")] Timeout, + + #[error("Unsupported API Version {0}")] + UnsupportedVersion(String), +} + +impl Error { + pub fn unsupported_version_vec(version: Vec) -> Self { + let mut version_string = version + .into_iter() + .fold(String::new(), |acc, val| format!("{}{}.", acc, val)); + version_string.pop(); + + Self::UnsupportedVersion(version_string) + } } impl From for Error { diff --git a/src/events/event.rs b/src/events/event.rs index 30c9f5f0..facdaf7c 100644 --- a/src/events/event.rs +++ b/src/events/event.rs @@ -1,9 +1,14 @@ use crate::error::{Error, Result}; use crate::events::generate_event_id; use crate::events::payload::EventReceivePayload; +use crate::prelude::{IPCError, IPCResult}; +use byteorder::{BigEndian, ReadBytesExt}; use std::fmt::Debug; +use std::io::{Cursor, Read}; use tokio::io::{AsyncRead, AsyncReadExt}; +pub const FORMAT_VERSION: [u8; 3] = [0, 9, 0]; + /// A container representing an event and underlying binary data. /// The data can be decoded into an object representation or read /// as raw binary data. @@ -93,7 +98,10 @@ impl Event { let data_length = total_length - header_length as u64; tracing::trace!(total_length, header_length, data_length); - let header: EventHeader = EventHeader::from_async_read(reader).await?; + let mut header_bytes = vec![0u8; header_length as usize]; + reader.read_exact(&mut header_bytes).await?; + // additional header fields can be added a the end because when reading they will just be ignored + let header: EventHeader = EventHeader::from_read(&mut Cursor::new(header_bytes))?; let mut data = vec![0u8; data_length as usize]; reader.read_exact(&mut data).await?; @@ -124,7 +132,7 @@ impl Event { impl EventHeader { /// Serializes the event header into bytes pub fn into_bytes(self) -> Vec { - let mut buf = Vec::new(); + let mut buf = FORMAT_VERSION.to_vec(); buf.append(&mut self.id.to_be_bytes().to_vec()); if let Some(ref_id) = self.ref_id { @@ -148,33 +156,67 @@ impl EventHeader { } /// Parses an event header from an async reader - pub async fn from_async_read(reader: &mut R) -> Result { - let id = reader.read_u64().await?; - let ref_id_exists = reader.read_u8().await?; + pub fn from_read(reader: &mut R) -> Result { + Self::read_version(reader)?; + let id = reader.read_u64::()?; + let ref_id = Self::read_ref_id(reader)?; + let namespace_len = reader.read_u16::()?; + let namespace = Self::read_namespace(reader, namespace_len)?; + let name = Self::read_name(reader)?; + + Ok(Self { + id, + ref_id, + namespace, + name, + }) + } + + /// Reads and validates the format version + fn read_version(reader: &mut R) -> IPCResult> { + let mut version = vec![0u8; 3]; + reader.read_exact(&mut version)?; + + if version[0] != FORMAT_VERSION[0] { + return Err(IPCError::unsupported_version_vec(version)); + } + + Ok(version) + } + + /// Reads the reference event id + fn read_ref_id(reader: &mut R) -> IPCResult> { + let ref_id_exists = reader.read_u8()?; let ref_id = match ref_id_exists { 0x00 => None, - 0xFF => Some(reader.read_u64().await?), + 0xFF => Some(reader.read_u64::()?), _ => return Err(Error::CorruptedEvent), }; - let namespace_len = reader.read_u16().await?; + Ok(ref_id) + } + + /// Reads the name of the event + fn read_name(reader: &mut R) -> IPCResult { + let name_len = reader.read_u16::()?; + + Self::read_string(reader, name_len as usize) + } + + /// Reads the namespace of the event + fn read_namespace(reader: &mut R, namespace_len: u16) -> IPCResult> { let namespace = if namespace_len > 0 { - let mut namespace_buf = vec![0u8; namespace_len as usize]; - reader.read_exact(&mut namespace_buf).await?; - Some(String::from_utf8(namespace_buf).map_err(|_| Error::CorruptedEvent)?) + Some(Self::read_string(reader, namespace_len as usize)?) } else { None }; - let name_len = reader.read_u16().await?; - let mut name_buf = vec![0u8; name_len as usize]; - reader.read_exact(&mut name_buf).await?; - let name = String::from_utf8(name_buf).map_err(|_| Error::CorruptedEvent)?; - Ok(Self { - id, - ref_id, - namespace, - name, - }) + Ok(namespace) + } + + fn read_string(reader: &mut R, length: usize) -> IPCResult { + let mut string_buf = vec![0u8; length]; + reader.read_exact(&mut string_buf)?; + String::from_utf8(string_buf).map_err(|_| Error::CorruptedEvent) } }