|
|
|
@ -1,9 +1,14 @@
|
|
|
|
|
use crate::error::{Error, Result};
|
|
|
|
|
use crate::events::generate_event_id;
|
|
|
|
|
use crate::events::payload::EventReceivePayload;
|
|
|
|
|
use crate::prelude::{IPCError, IPCResult};
|
|
|
|
|
use byteorder::{BigEndian, ReadBytesExt};
|
|
|
|
|
use std::fmt::Debug;
|
|
|
|
|
use std::io::{Cursor, Read};
|
|
|
|
|
use tokio::io::{AsyncRead, AsyncReadExt};
|
|
|
|
|
|
|
|
|
|
pub const FORMAT_VERSION: [u8; 3] = [0, 9, 0];
|
|
|
|
|
|
|
|
|
|
/// A container representing an event and underlying binary data.
|
|
|
|
|
/// The data can be decoded into an object representation or read
|
|
|
|
|
/// as raw binary data.
|
|
|
|
@ -93,7 +98,10 @@ impl Event {
|
|
|
|
|
let data_length = total_length - header_length as u64;
|
|
|
|
|
tracing::trace!(total_length, header_length, data_length);
|
|
|
|
|
|
|
|
|
|
let header: EventHeader = EventHeader::from_async_read(reader).await?;
|
|
|
|
|
let mut header_bytes = vec![0u8; header_length as usize];
|
|
|
|
|
reader.read_exact(&mut header_bytes).await?;
|
|
|
|
|
// additional header fields can be added a the end because when reading they will just be ignored
|
|
|
|
|
let header: EventHeader = EventHeader::from_read(&mut Cursor::new(header_bytes))?;
|
|
|
|
|
|
|
|
|
|
let mut data = vec![0u8; data_length as usize];
|
|
|
|
|
reader.read_exact(&mut data).await?;
|
|
|
|
@ -124,7 +132,7 @@ impl Event {
|
|
|
|
|
impl EventHeader {
|
|
|
|
|
/// Serializes the event header into bytes
|
|
|
|
|
pub fn into_bytes(self) -> Vec<u8> {
|
|
|
|
|
let mut buf = Vec::new();
|
|
|
|
|
let mut buf = FORMAT_VERSION.to_vec();
|
|
|
|
|
buf.append(&mut self.id.to_be_bytes().to_vec());
|
|
|
|
|
|
|
|
|
|
if let Some(ref_id) = self.ref_id {
|
|
|
|
@ -148,33 +156,67 @@ impl EventHeader {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Parses an event header from an async reader
|
|
|
|
|
pub async fn from_async_read<R: AsyncRead + Unpin>(reader: &mut R) -> Result<Self> {
|
|
|
|
|
let id = reader.read_u64().await?;
|
|
|
|
|
let ref_id_exists = reader.read_u8().await?;
|
|
|
|
|
pub fn from_read<R: Read>(reader: &mut R) -> Result<Self> {
|
|
|
|
|
Self::read_version(reader)?;
|
|
|
|
|
let id = reader.read_u64::<BigEndian>()?;
|
|
|
|
|
let ref_id = Self::read_ref_id(reader)?;
|
|
|
|
|
let namespace_len = reader.read_u16::<BigEndian>()?;
|
|
|
|
|
let namespace = Self::read_namespace(reader, namespace_len)?;
|
|
|
|
|
let name = Self::read_name(reader)?;
|
|
|
|
|
|
|
|
|
|
Ok(Self {
|
|
|
|
|
id,
|
|
|
|
|
ref_id,
|
|
|
|
|
namespace,
|
|
|
|
|
name,
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Reads and validates the format version
|
|
|
|
|
fn read_version<R: Read>(reader: &mut R) -> IPCResult<Vec<u8>> {
|
|
|
|
|
let mut version = vec![0u8; 3];
|
|
|
|
|
reader.read_exact(&mut version)?;
|
|
|
|
|
|
|
|
|
|
if version[0] != FORMAT_VERSION[0] {
|
|
|
|
|
return Err(IPCError::unsupported_version_vec(version));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Ok(version)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Reads the reference event id
|
|
|
|
|
fn read_ref_id<R: Read>(reader: &mut R) -> IPCResult<Option<u64>> {
|
|
|
|
|
let ref_id_exists = reader.read_u8()?;
|
|
|
|
|
let ref_id = match ref_id_exists {
|
|
|
|
|
0x00 => None,
|
|
|
|
|
0xFF => Some(reader.read_u64().await?),
|
|
|
|
|
0xFF => Some(reader.read_u64::<BigEndian>()?),
|
|
|
|
|
_ => return Err(Error::CorruptedEvent),
|
|
|
|
|
};
|
|
|
|
|
let namespace_len = reader.read_u16().await?;
|
|
|
|
|
|
|
|
|
|
Ok(ref_id)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Reads the name of the event
|
|
|
|
|
fn read_name<R: Read>(reader: &mut R) -> IPCResult<String> {
|
|
|
|
|
let name_len = reader.read_u16::<BigEndian>()?;
|
|
|
|
|
|
|
|
|
|
Self::read_string(reader, name_len as usize)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Reads the namespace of the event
|
|
|
|
|
fn read_namespace<R: Read>(reader: &mut R, namespace_len: u16) -> IPCResult<Option<String>> {
|
|
|
|
|
let namespace = if namespace_len > 0 {
|
|
|
|
|
let mut namespace_buf = vec![0u8; namespace_len as usize];
|
|
|
|
|
reader.read_exact(&mut namespace_buf).await?;
|
|
|
|
|
Some(String::from_utf8(namespace_buf).map_err(|_| Error::CorruptedEvent)?)
|
|
|
|
|
Some(Self::read_string(reader, namespace_len as usize)?)
|
|
|
|
|
} else {
|
|
|
|
|
None
|
|
|
|
|
};
|
|
|
|
|
let name_len = reader.read_u16().await?;
|
|
|
|
|
let mut name_buf = vec![0u8; name_len as usize];
|
|
|
|
|
reader.read_exact(&mut name_buf).await?;
|
|
|
|
|
let name = String::from_utf8(name_buf).map_err(|_| Error::CorruptedEvent)?;
|
|
|
|
|
|
|
|
|
|
Ok(Self {
|
|
|
|
|
id,
|
|
|
|
|
ref_id,
|
|
|
|
|
namespace,
|
|
|
|
|
name,
|
|
|
|
|
})
|
|
|
|
|
Ok(namespace)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn read_string<R: Read>(reader: &mut R, length: usize) -> IPCResult<String> {
|
|
|
|
|
let mut string_buf = vec![0u8; length];
|
|
|
|
|
reader.read_exact(&mut string_buf)?;
|
|
|
|
|
String::from_utf8(string_buf).map_err(|_| Error::CorruptedEvent)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|