use crate::error::{Error, Result}; use crate::events::generate_event_id; use crate::events::payload::FromPayload; use byteorder::{BigEndian, ReadBytesExt}; use bytes::{BufMut, Bytes, BytesMut}; use num_enum::{IntoPrimitive, TryFromPrimitive}; use std::convert::TryFrom; use std::fmt::Debug; use std::io::{Cursor, Read}; use tokio::io::{AsyncRead, AsyncReadExt}; pub const FORMAT_VERSION: [u8; 3] = [0, 9, 0]; /// A container representing an event and underlying binary data. /// The data can be decoded into an object representation or read /// as raw binary data. #[derive(Debug)] pub struct Event { header: EventHeader, data: Bytes, } #[derive(Debug)] struct EventHeader { id: u64, event_type: EventType, ref_id: Option, namespace: Option, name: String, } #[derive(Clone, Debug, TryFromPrimitive, IntoPrimitive, Copy, Ord, PartialOrd, Eq, PartialEq)] #[repr(u8)] pub enum EventType { Initiator, Response, End, Error, } impl Event { /// Creates a new event that acts as an initiator for further response events #[tracing::instrument(level = "trace", skip(data))] #[inline] pub fn initiator(namespace: Option, name: String, data: Bytes) -> Self { Self::new(namespace, name, data, None, EventType::Initiator) } /// Creates a new event that is a response to a previous event #[tracing::instrument(level = "trace", skip(data))] #[inline] pub fn response(namespace: Option, name: String, data: Bytes, ref_id: u64) -> Self { Self::new(namespace, name, data, Some(ref_id), EventType::Response) } /// Creates a new error event as a response to a previous event #[tracing::instrument(level = "trace", skip(data))] #[inline] pub fn error(namespace: Option, name: String, data: Bytes, ref_id: u64) -> Self { Self::new(namespace, name, data, Some(ref_id), EventType::Error) } /// Creates a new event that indicates the end of a series of responses (in an event handler) /// and might contain a final response payload #[tracing::instrument(level = "trace", skip(data))] #[inline] pub fn end(namespace: Option, name: String, data: Bytes, ref_id: u64) -> Self { Self::new(namespace, name, data, Some(ref_id), EventType::Response) } /// Creates a new event #[tracing::instrument(level = "trace", skip(data))] pub(crate) fn new( namespace: Option, name: String, data: Bytes, ref_id: Option, event_type: EventType, ) -> Self { let header = EventHeader { id: generate_event_id(), event_type, ref_id, namespace, name, }; Self { header, data } } /// The identifier of the message #[inline] pub fn id(&self) -> u64 { self.header.id } /// The type of the event #[inline] pub fn event_type(&self) -> EventType { self.header.event_type } /// The ID of the message referenced by this message. /// It represents the message that is replied to and can be None. #[inline] pub fn reference_id(&self) -> Option { self.header.ref_id.clone() } /// Decodes the payload to the given type implementing the receive payload trait #[inline] #[tracing::instrument(level = "trace", skip(self))] pub fn payload(&self) -> Result { let payload = T::from_payload(&self.data[..])?; Ok(payload) } /// Returns a reference of the underlying data #[inline] pub fn data_raw(&self) -> &[u8] { &self.data } /// Returns a reference to the namespace #[inline] pub fn namespace(&self) -> &Option { &self.header.namespace } /// Returns the name of the event #[inline] pub fn name(&self) -> &str { &self.header.name } /// Reads an event message #[tracing::instrument(level = "trace", skip(reader))] pub async fn from_async_read(reader: &mut R) -> Result { let total_length = reader.read_u64().await?; let header_length = reader.read_u16().await?; let data_length = total_length - header_length as u64; tracing::trace!(total_length, header_length, data_length); let mut header_bytes = vec![0u8; header_length as usize]; reader.read_exact(&mut header_bytes).await?; // additional header fields can be added a the end because when reading they will just be ignored let header: EventHeader = EventHeader::from_read(&mut Cursor::new(header_bytes))?; let mut buf = vec![0u8; data_length as usize]; reader.read_exact(&mut buf).await?; let event = Event { header, data: Bytes::from(buf), }; Ok(event) } /// Encodes the event into bytes #[tracing::instrument(level = "trace", skip(self))] pub fn into_bytes(self) -> Result { let header_bytes = self.header.into_bytes(); let header_length = header_bytes.len() as u16; let data_length = self.data.len(); let total_length = header_length as u64 + data_length as u64; tracing::trace!(total_length, header_length, data_length); let mut buf = BytesMut::with_capacity(total_length as usize); buf.put_u64(total_length); buf.put_u16(header_length); buf.put(header_bytes); buf.put(self.data); Ok(buf.freeze()) } } impl EventHeader { /// Serializes the event header into bytes pub fn into_bytes(self) -> Bytes { let mut buf = BytesMut::with_capacity(256); buf.put_slice(&FORMAT_VERSION); buf.put_u64(self.id); buf.put_u8(u8::from(self.event_type)); if let Some(ref_id) = self.ref_id { buf.put_u8(0xFF); buf.put_u64(ref_id); } else { buf.put_u8(0x00); } if let Some(namespace) = self.namespace { buf.put_u16(namespace.len() as u16); buf.put(Bytes::from(namespace)); } else { buf.put_u16(0); } buf.put_u16(self.name.len() as u16); buf.put(Bytes::from(self.name)); buf.freeze() } /// Parses an event header from an async reader pub fn from_read(reader: &mut R) -> Result { Self::read_version(reader)?; let id = reader.read_u64::()?; let event_type_num = reader.read_u8()?; let event_type = EventType::try_from(event_type_num).map_err(|_| Error::CorruptedEvent)?; let ref_id = Self::read_ref_id(reader)?; let namespace_len = reader.read_u16::()?; let namespace = Self::read_namespace(reader, namespace_len)?; let name = Self::read_name(reader)?; Ok(Self { id, event_type, ref_id, namespace, name, }) } /// Reads and validates the format version #[inline] fn read_version(reader: &mut R) -> Result> { let mut version = vec![0u8; 3]; reader.read_exact(&mut version)?; if version[0] != FORMAT_VERSION[0] { return Err(Error::unsupported_version_vec(version)); } Ok(version) } /// Reads the reference event id #[inline] fn read_ref_id(reader: &mut R) -> Result> { let ref_id_exists = reader.read_u8()?; let ref_id = match ref_id_exists { 0x00 => None, 0xFF => Some(reader.read_u64::()?), _ => return Err(Error::CorruptedEvent), }; Ok(ref_id) } /// Reads the name of the event #[inline] fn read_name(reader: &mut R) -> Result { let name_len = reader.read_u16::()?; Self::read_string(reader, name_len as usize) } /// Reads the namespace of the event #[inline] fn read_namespace(reader: &mut R, namespace_len: u16) -> Result> { let namespace = if namespace_len > 0 { Some(Self::read_string(reader, namespace_len as usize)?) } else { None }; Ok(namespace) } #[inline] fn read_string(reader: &mut R, length: usize) -> Result { let mut string_buf = vec![0u8; length]; reader.read_exact(&mut string_buf)?; String::from_utf8(string_buf).map_err(|_| Error::CorruptedEvent) } }