From ac471d296e007118c64ec7b3aea364adf0e5dde0 Mon Sep 17 00:00:00 2001 From: trivernis Date: Sat, 26 Mar 2022 12:12:48 +0100 Subject: [PATCH] Change internal bytes representation to Bytes object from bytes crate Signed-off-by: trivernis --- Cargo.lock | 3 +- Cargo.toml | 3 +- benches/deserialization_benchmark.rs | 15 ++-- benches/serialization_benchmark.rs | 9 ++- src/events/error_event.rs | 16 ++-- src/events/event.rs | 67 +++++++++-------- src/events/event_handler.rs | 7 +- src/events/payload.rs | 73 +++++++++++-------- src/events/payload_serializer/mod.rs | 5 +- .../payload_serializer/serialize_bincode.rs | 5 +- .../payload_serializer/serialize_json.rs | 5 +- .../payload_serializer/serialize_postcard.rs | 5 +- .../payload_serializer/serialize_rmp.rs | 5 +- src/ipc/stream_emitter/event_metadata.rs | 2 +- src/lib.rs | 7 ++ tests/test_event_streams.rs | 9 ++- tests/test_events_with_payload.rs | 19 ++--- tests/test_raw_events.rs | 5 +- 18 files changed, 148 insertions(+), 112 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 49e57c63..c21e5985 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -93,11 +93,12 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bromine" -version = "0.19.0" +version = "0.20.0" dependencies = [ "async-trait", "bincode", "byteorder", + "bytes", "criterion", "crossbeam-utils", "futures", diff --git a/Cargo.toml b/Cargo.toml index a197f693..e9e82cd2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bromine" -version = "0.19.0" +version = "0.20.0" authors = ["trivernis "] edition = "2018" readme = "README.md" @@ -31,6 +31,7 @@ trait-bound-typemap = "0.3.3" rmp-serde = { version = "1.0.0", optional = true } bincode = { version = "1.3.3", optional = true } serde_json = { version = "1.0.79", optional = true } +bytes = "1.1.0" [dependencies.serde] optional = true diff --git a/benches/deserialization_benchmark.rs b/benches/deserialization_benchmark.rs index c79889f1..76c2ee02 100644 --- a/benches/deserialization_benchmark.rs +++ b/benches/deserialization_benchmark.rs @@ -1,3 +1,4 @@ +use bytes::Bytes; use criterion::{black_box, BenchmarkId, Throughput}; use criterion::{criterion_group, criterion_main}; use criterion::{BatchSize, Criterion}; @@ -9,17 +10,21 @@ use tokio::runtime::Runtime; pub const EVENT_NAME: &str = "bench_event"; fn create_event_bytes_reader(data_size: usize) -> Cursor> { - let bytes = Event::initiator(None, EVENT_NAME.to_string(), vec![0u8; data_size]) - .into_bytes() - .unwrap(); - Cursor::new(bytes) + let bytes = Event::initiator( + None, + EVENT_NAME.to_string(), + Bytes::from(vec![0u8; data_size]), + ) + .into_bytes() + .unwrap(); + Cursor::new(bytes.to_vec()) } fn event_deserialization(c: &mut Criterion) { let runtime = Runtime::new().unwrap(); let mut group = c.benchmark_group("event_deserialization"); - for size in (0..10) + for size in (0..16) .step_by(2) .map(|i| 1024 * 2u32.pow(i as u32) as usize) { diff --git a/benches/serialization_benchmark.rs b/benches/serialization_benchmark.rs index 2d25643b..f99a31f0 100644 --- a/benches/serialization_benchmark.rs +++ b/benches/serialization_benchmark.rs @@ -1,4 +1,5 @@ use bromine::event::Event; +use bytes::Bytes; use criterion::{ black_box, criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion, Throughput, }; @@ -6,13 +7,17 @@ use criterion::{ pub const EVENT_NAME: &str = "bench_event"; fn create_event(data_size: usize) -> Event { - Event::initiator(None, EVENT_NAME.to_string(), vec![0u8; data_size]) + Event::initiator( + None, + EVENT_NAME.to_string(), + Bytes::from(vec![0u8; data_size]), + ) } fn event_serialization(c: &mut Criterion) { let mut group = c.benchmark_group("event_serialization"); - for size in (0..10) + for size in (0..16) .step_by(2) .map(|i| 1024 * 2u32.pow(i as u32) as usize) { diff --git a/src/events/error_event.rs b/src/events/error_event.rs index bff310a1..90c78e34 100644 --- a/src/events/error_event.rs +++ b/src/events/error_event.rs @@ -3,6 +3,7 @@ use crate::error::Result; use crate::payload::{FromPayload, IntoPayload}; use crate::prelude::{IPCError, IPCResult}; use byteorder::{BigEndian, ReadBytesExt}; +use bytes::{BufMut, Bytes, BytesMut}; use std::error::Error; use std::fmt::{Display, Formatter}; use std::io::Read; @@ -29,14 +30,13 @@ impl Display for ErrorEventData { } impl IntoPayload for ErrorEventData { - fn into_payload(self, _: &Context) -> IPCResult> { - let mut buf = Vec::new(); - buf.append(&mut self.code.to_be_bytes().to_vec()); - let message_len = self.message.len() as u32; - buf.append(&mut message_len.to_be_bytes().to_vec()); - buf.append(&mut self.message.into_bytes()); - - Ok(buf) + fn into_payload(self, _: &Context) -> IPCResult { + let mut buf = BytesMut::new(); + buf.put_u16(self.code); + buf.put_u32(self.message.len() as u32); + buf.put(Bytes::from(self.message)); + + Ok(buf.freeze()) } } diff --git a/src/events/event.rs b/src/events/event.rs index c6e33ddc..59475f31 100644 --- a/src/events/event.rs +++ b/src/events/event.rs @@ -2,6 +2,7 @@ use crate::error::{Error, Result}; use crate::events::generate_event_id; use crate::events::payload::FromPayload; use byteorder::{BigEndian, ReadBytesExt}; +use bytes::{BufMut, Bytes, BytesMut}; use num_enum::{IntoPrimitive, TryFromPrimitive}; use std::convert::TryFrom; use std::fmt::Debug; @@ -16,7 +17,7 @@ pub const FORMAT_VERSION: [u8; 3] = [0, 9, 0]; #[derive(Debug)] pub struct Event { header: EventHeader, - data: Vec, + data: Bytes, } #[derive(Debug)] @@ -41,21 +42,21 @@ impl Event { /// Creates a new event that acts as an initiator for further response events #[tracing::instrument(level = "trace", skip(data))] #[inline] - pub fn initiator(namespace: Option, name: String, data: Vec) -> Self { + pub fn initiator(namespace: Option, name: String, data: Bytes) -> Self { Self::new(namespace, name, data, None, EventType::Initiator) } /// Creates a new event that is a response to a previous event #[tracing::instrument(level = "trace", skip(data))] #[inline] - pub fn response(namespace: Option, name: String, data: Vec, ref_id: u64) -> Self { + pub fn response(namespace: Option, name: String, data: Bytes, ref_id: u64) -> Self { Self::new(namespace, name, data, Some(ref_id), EventType::Response) } /// Creates a new error event as a response to a previous event #[tracing::instrument(level = "trace", skip(data))] #[inline] - pub fn error(namespace: Option, name: String, data: Vec, ref_id: u64) -> Self { + pub fn error(namespace: Option, name: String, data: Bytes, ref_id: u64) -> Self { Self::new(namespace, name, data, Some(ref_id), EventType::Error) } @@ -63,7 +64,7 @@ impl Event { /// and might contain a final response payload #[tracing::instrument(level = "trace", skip(data))] #[inline] - pub fn end(namespace: Option, name: String, data: Vec, ref_id: u64) -> Self { + pub fn end(namespace: Option, name: String, data: Bytes, ref_id: u64) -> Self { Self::new(namespace, name, data, Some(ref_id), EventType::Response) } @@ -72,7 +73,7 @@ impl Event { pub(crate) fn new( namespace: Option, name: String, - data: Vec, + data: Bytes, ref_id: Option, event_type: EventType, ) -> Self { @@ -145,57 +146,59 @@ impl Event { // additional header fields can be added a the end because when reading they will just be ignored let header: EventHeader = EventHeader::from_read(&mut Cursor::new(header_bytes))?; - let mut data = vec![0u8; data_length as usize]; - reader.read_exact(&mut data).await?; - let event = Event { header, data }; + let mut buf = vec![0u8; data_length as usize]; + reader.read_exact(&mut buf).await?; + let event = Event { + header, + data: Bytes::from(buf), + }; Ok(event) } /// Encodes the event into bytes #[tracing::instrument(level = "trace", skip(self))] - pub fn into_bytes(mut self) -> Result> { - let mut header_bytes = self.header.into_bytes(); + pub fn into_bytes(self) -> Result { + let header_bytes = self.header.into_bytes(); let header_length = header_bytes.len() as u16; let data_length = self.data.len(); let total_length = header_length as u64 + data_length as u64; tracing::trace!(total_length, header_length, data_length); - let mut buf = Vec::with_capacity(total_length as usize); - buf.append(&mut total_length.to_be_bytes().to_vec()); - buf.append(&mut header_length.to_be_bytes().to_vec()); - buf.append(&mut header_bytes); - buf.append(&mut self.data); + let mut buf = BytesMut::with_capacity(total_length as usize); + buf.put_u64(total_length); + buf.put_u16(header_length); + buf.put(header_bytes); + buf.put(self.data); - Ok(buf) + Ok(buf.freeze()) } } impl EventHeader { /// Serializes the event header into bytes - pub fn into_bytes(self) -> Vec { - let mut buf = FORMAT_VERSION.to_vec(); - buf.append(&mut self.id.to_be_bytes().to_vec()); - buf.push(self.event_type.into()); + pub fn into_bytes(self) -> Bytes { + let mut buf = BytesMut::with_capacity(256); + buf.put_slice(&FORMAT_VERSION); + buf.put_u64(self.id); + buf.put_u8(u8::from(self.event_type)); if let Some(ref_id) = self.ref_id { - buf.push(0xFF); - buf.append(&mut ref_id.to_be_bytes().to_vec()); + buf.put_u8(0xFF); + buf.put_u64(ref_id); } else { - buf.push(0x00); + buf.put_u8(0x00); } if let Some(namespace) = self.namespace { - let namespace_len = namespace.len() as u16; - buf.append(&mut namespace_len.to_be_bytes().to_vec()); - buf.append(&mut namespace.into_bytes()); + buf.put_u16(namespace.len() as u16); + buf.put(Bytes::from(namespace)); } else { - buf.append(&mut 0u16.to_be_bytes().to_vec()); + buf.put_u16(0); } - let name_len = self.name.len() as u16; - buf.append(&mut name_len.to_be_bytes().to_vec()); - buf.append(&mut self.name.into_bytes()); + buf.put_u16(self.name.len() as u16); + buf.put(Bytes::from(self.name)); - buf + buf.freeze() } /// Parses an event header from an async reader diff --git a/src/events/event_handler.rs b/src/events/event_handler.rs index 8b87f603..16c09fe4 100644 --- a/src/events/event_handler.rs +++ b/src/events/event_handler.rs @@ -2,13 +2,14 @@ use crate::error::Result; use crate::events::event::Event; use crate::ipc::context::Context; use crate::payload::{BytePayload, IntoPayload}; +use bytes::Bytes; use std::collections::HashMap; use std::fmt::{Debug, Formatter}; use std::future::Future; use std::pin::Pin; use std::sync::Arc; -pub struct Response(Vec); +pub struct Response(Bytes); impl Response { /// Creates a new response with a given payload @@ -20,11 +21,11 @@ impl Response { /// Creates an empty response pub fn empty() -> Self { - Self(vec![]) + Self(Bytes::new()) } pub(crate) fn into_byte_payload(self) -> BytePayload { - BytePayload::new(self.0) + BytePayload::from(self.0) } } diff --git a/src/events/payload.rs b/src/events/payload.rs index 2d7dbd21..bad2d26f 100644 --- a/src/events/payload.rs +++ b/src/events/payload.rs @@ -1,5 +1,6 @@ use crate::prelude::IPCResult; use byteorder::{BigEndian, ReadBytesExt}; +use bytes::{BufMut, Bytes, BytesMut}; use std::io::Read; #[cfg(feature = "serialize")] @@ -7,18 +8,13 @@ pub use super::payload_serializer::*; /// Trait that serializes a type into bytes and can fail pub trait TryIntoBytes { - fn try_into_bytes(self) -> IPCResult>; -} - -/// Trait that serializes a type into bytes and never fails -pub trait IntoBytes { - fn into_bytes(self) -> Vec; + fn try_into_bytes(self) -> IPCResult; } /// Trait to convert event data into sending bytes /// It is implemented for all types that implement Serialize pub trait IntoPayload { - fn into_payload(self, ctx: &Context) -> IPCResult>; + fn into_payload(self, ctx: &Context) -> IPCResult; } /// Trait to get the event data from receiving bytes. @@ -31,25 +27,39 @@ pub trait FromPayload: Sized { /// serializing them #[derive(Clone)] pub struct BytePayload { - bytes: Vec, + bytes: Bytes, } impl BytePayload { #[inline] pub fn new(bytes: Vec) -> Self { - Self { bytes } + Self { + bytes: Bytes::from(bytes), + } } - /// Returns the bytes of the payload + /// Returns the bytes as a `Vec` of the payload #[inline] pub fn into_inner(self) -> Vec { + self.bytes.to_vec() + } + + /// Returns the bytes struct of the payload + #[inline] + pub fn into_bytes(self) -> Bytes { self.bytes } } +impl From for BytePayload { + fn from(bytes: Bytes) -> Self { + Self { bytes } + } +} + impl IntoPayload for BytePayload { #[inline] - fn into_payload(self, _: &Context) -> IPCResult> { + fn into_payload(self, _: &Context) -> IPCResult { Ok(self.bytes) } } @@ -87,20 +97,18 @@ impl TandemPayload { } impl IntoPayload for TandemPayload { - fn into_payload(self, ctx: &Context) -> IPCResult> { - let mut p1_bytes = self.load1.into_payload(&ctx)?; - let mut p2_bytes = self.load2.into_payload(&ctx)?; + fn into_payload(self, ctx: &Context) -> IPCResult { + let p1_bytes = self.load1.into_payload(&ctx)?; + let p2_bytes = self.load2.into_payload(&ctx)?; - let mut p1_length_bytes = (p1_bytes.len() as u64).to_be_bytes().to_vec(); - let mut p2_length_bytes = (p2_bytes.len() as u64).to_be_bytes().to_vec(); + let mut bytes = BytesMut::with_capacity(p1_bytes.len() + p2_bytes.len() + 16); - let mut bytes = Vec::new(); - bytes.append(&mut p1_length_bytes); - bytes.append(&mut p1_bytes); - bytes.append(&mut p2_length_bytes); - bytes.append(&mut p2_bytes); + bytes.put_u64(p1_bytes.len() as u64); + bytes.put(p1_bytes); + bytes.put_u64(p2_bytes.len() as u64); + bytes.put(p2_bytes); - Ok(bytes) + Ok(bytes.freeze()) } } @@ -123,8 +131,8 @@ impl FromPayload for TandemPayload { #[cfg(not(feature = "serialize"))] impl IntoPayload for () { - fn into_payload(self, _: &Context) -> IPCResult> { - Ok(vec![]) + fn into_payload(self, _: &Context) -> IPCResult { + Ok(Bytes::new()) } } @@ -135,6 +143,7 @@ mod serde_payload { use crate::payload::{FromPayload, TryIntoBytes}; use crate::prelude::{IPCResult, IntoPayload}; use byteorder::ReadBytesExt; + use bytes::{BufMut, Bytes, BytesMut}; use serde::de::DeserializeOwned; use serde::Serialize; use std::io::Read; @@ -168,20 +177,20 @@ mod serde_payload { } impl TryIntoBytes for SerdePayload { - fn try_into_bytes(self) -> IPCResult> { - let mut buf = Vec::new(); - let mut data_bytes = self.serializer.serialize(self.data)?; + fn try_into_bytes(self) -> IPCResult { + let mut buf = BytesMut::new(); + let data_bytes = self.serializer.serialize(self.data)?; let format_id = self.serializer as u8; - buf.push(format_id); - buf.append(&mut data_bytes); + buf.put_u8(format_id); + buf.put(data_bytes); - Ok(buf) + Ok(buf.freeze()) } } impl IntoPayload for SerdePayload { #[inline] - fn into_payload(self, _: &Context) -> IPCResult> { + fn into_payload(self, _: &Context) -> IPCResult { self.try_into_bytes() } } @@ -198,7 +207,7 @@ mod serde_payload { impl IntoPayload for T { #[inline] - fn into_payload(self, ctx: &Context) -> IPCResult> { + fn into_payload(self, ctx: &Context) -> IPCResult { ctx.create_serde_payload(self).into_payload(&ctx) } } diff --git a/src/events/payload_serializer/mod.rs b/src/events/payload_serializer/mod.rs index 7dab9643..c96923c6 100644 --- a/src/events/payload_serializer/mod.rs +++ b/src/events/payload_serializer/mod.rs @@ -1,3 +1,4 @@ +use bytes::Bytes; use serde::de::DeserializeOwned; use serde::Serialize; use std::io::Read; @@ -49,7 +50,7 @@ pub enum SerializationError { UnknownFormat(usize), } -#[derive(Clone, Debug, Ord, PartialOrd, Eq, PartialEq)] +#[derive(Clone, Copy, Debug, Ord, PartialOrd, Eq, PartialEq)] pub enum DynamicSerializer { Messagepack, Bincode, @@ -109,7 +110,7 @@ impl DynamicSerializer { } } - pub fn serialize(&self, data: T) -> SerializationResult> { + pub fn serialize(&self, data: T) -> SerializationResult { match self { #[cfg(feature = "serialize_rmp")] DynamicSerializer::Messagepack => serialize_rmp::serialize(data), diff --git a/src/events/payload_serializer/serialize_bincode.rs b/src/events/payload_serializer/serialize_bincode.rs index 9567327b..afbc75b9 100644 --- a/src/events/payload_serializer/serialize_bincode.rs +++ b/src/events/payload_serializer/serialize_bincode.rs @@ -1,13 +1,14 @@ use crate::payload::SerializationResult; +use bytes::Bytes; use serde::de::DeserializeOwned; use serde::Serialize; use std::io::Read; #[inline] -pub fn serialize(data: T) -> SerializationResult> { +pub fn serialize(data: T) -> SerializationResult { let bytes = bincode::serialize(&data)?; - Ok(bytes) + Ok(Bytes::from(bytes)) } #[inline] diff --git a/src/events/payload_serializer/serialize_json.rs b/src/events/payload_serializer/serialize_json.rs index c35af059..5f41495c 100644 --- a/src/events/payload_serializer/serialize_json.rs +++ b/src/events/payload_serializer/serialize_json.rs @@ -1,13 +1,14 @@ use crate::payload::SerializationResult; +use bytes::Bytes; use serde::de::DeserializeOwned; use serde::Serialize; use std::io::Read; #[inline] -pub fn serialize(data: T) -> SerializationResult> { +pub fn serialize(data: T) -> SerializationResult { let bytes = serde_json::to_vec(&data)?; - Ok(bytes) + Ok(Bytes::from(bytes)) } #[inline] diff --git a/src/events/payload_serializer/serialize_postcard.rs b/src/events/payload_serializer/serialize_postcard.rs index 75237289..ab50d081 100644 --- a/src/events/payload_serializer/serialize_postcard.rs +++ b/src/events/payload_serializer/serialize_postcard.rs @@ -1,13 +1,14 @@ use crate::payload::SerializationResult; +use bytes::Bytes; use serde::de::DeserializeOwned; use serde::Serialize; use std::io::Read; #[inline] -pub fn serialize(data: T) -> SerializationResult> { +pub fn serialize(data: T) -> SerializationResult { let bytes = postcard::to_allocvec(&data)?.to_vec(); - Ok(bytes) + Ok(Bytes::from(bytes)) } #[inline] diff --git a/src/events/payload_serializer/serialize_rmp.rs b/src/events/payload_serializer/serialize_rmp.rs index ec9dfcc6..13e081af 100644 --- a/src/events/payload_serializer/serialize_rmp.rs +++ b/src/events/payload_serializer/serialize_rmp.rs @@ -1,13 +1,14 @@ use crate::payload::SerializationResult; +use bytes::Bytes; use serde::de::DeserializeOwned; use serde::Serialize; use std::io::Read; #[inline] -pub fn serialize(data: T) -> SerializationResult> { +pub fn serialize(data: T) -> SerializationResult { let bytes = rmp_serde::to_vec(&data)?; - Ok(bytes) + Ok(Bytes::from(bytes)) } #[inline] diff --git a/src/ipc/stream_emitter/event_metadata.rs b/src/ipc/stream_emitter/event_metadata.rs index c9a834f3..4ed9a57c 100644 --- a/src/ipc/stream_emitter/event_metadata.rs +++ b/src/ipc/stream_emitter/event_metadata.rs @@ -37,7 +37,7 @@ impl EventMetadata

{ let payload = self.payload.take().ok_or(Error::InvalidState)?; let res_id = self.res_id.take().ok_or(Error::InvalidState)?; let event_type = self.event_type.take().ok_or(Error::InvalidState)?; - let payload_bytes = payload.into_payload(&ctx)?; + let payload_bytes = payload.into_payload(&ctx)?.into(); let event = Event::new( namespace, diff --git a/src/lib.rs b/src/lib.rs index d15bc06e..b3cc6fe8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -119,6 +119,12 @@ mod macros; mod namespaces; pub mod protocol; +/// Reexported for usage in payload implementations +pub use bytes; + +/// Reexported for sharing data in context +pub use trait_bound_typemap; + pub use events::error_event; pub use events::event; pub use events::event_handler; @@ -146,4 +152,5 @@ pub mod prelude { pub use crate::payload::*; pub use crate::protocol::*; pub use crate::*; + pub use trait_bound_typemap::TypeMap; } diff --git a/tests/test_event_streams.rs b/tests/test_event_streams.rs index 5a8f2edb..dfdbbd51 100644 --- a/tests/test_event_streams.rs +++ b/tests/test_event_streams.rs @@ -3,6 +3,7 @@ use crate::utils::protocol::TestProtocolListener; use crate::utils::{get_free_port, start_server_and_client}; use bromine::prelude::*; use byteorder::ReadBytesExt; +use bytes::Bytes; use futures::StreamExt; use std::io::Read; use std::time::Duration; @@ -66,16 +67,16 @@ async fn handle_stream_event(ctx: &Context, event: Event) -> IPCResult pub struct EmptyPayload; impl IntoPayload for EmptyPayload { - fn into_payload(self, _: &Context) -> IPCResult> { - Ok(vec![]) + fn into_payload(self, _: &Context) -> IPCResult { + Ok(Bytes::new()) } } pub struct NumberPayload(u8); impl IntoPayload for NumberPayload { - fn into_payload(self, _: &Context) -> IPCResult> { - Ok(vec![self.0]) + fn into_payload(self, _: &Context) -> IPCResult { + Ok(Bytes::from(vec![self.0])) } } diff --git a/tests/test_events_with_payload.rs b/tests/test_events_with_payload.rs index 7c14ad7f..1e2be84c 100644 --- a/tests/test_events_with_payload.rs +++ b/tests/test_events_with_payload.rs @@ -91,6 +91,7 @@ mod payload_impl { use bromine::payload::{FromPayload, IntoPayload}; use bromine::prelude::IPCResult; use byteorder::{BigEndian, ReadBytesExt}; + use bytes::{BufMut, Bytes, BytesMut}; use std::io::Read; pub struct SimplePayload { @@ -99,17 +100,13 @@ mod payload_impl { } impl IntoPayload for SimplePayload { - fn into_payload(self, _: &Context) -> IPCResult> { - let mut buf = Vec::new(); - let string_length = self.string.len() as u16; - let string_length_bytes = string_length.to_be_bytes(); - buf.append(&mut string_length_bytes.to_vec()); - let mut string_bytes = self.string.into_bytes(); - buf.append(&mut string_bytes); - let num_bytes = self.number.to_be_bytes(); - buf.append(&mut num_bytes.to_vec()); - - Ok(buf) + fn into_payload(self, _: &Context) -> IPCResult { + let mut buf = BytesMut::new(); + buf.put_u16(self.string.len() as u16); + buf.put(Bytes::from(self.string)); + buf.put_u32(self.number); + + Ok(buf.freeze()) } } diff --git a/tests/test_raw_events.rs b/tests/test_raw_events.rs index c690f869..dfa9e7d3 100644 --- a/tests/test_raw_events.rs +++ b/tests/test_raw_events.rs @@ -2,6 +2,7 @@ mod utils; use crate::utils::start_server_and_client; use bromine::prelude::*; +use bytes::Bytes; use std::time::Duration; use utils::call_counter::*; use utils::get_free_port; @@ -132,7 +133,7 @@ async fn handle_error_event(ctx: &Context, event: Event) -> IPCResult pub struct EmptyPayload; impl IntoPayload for EmptyPayload { - fn into_payload(self, _: &Context) -> IPCResult> { - Ok(vec![]) + fn into_payload(self, _: &Context) -> IPCResult { + Ok(Bytes::new()) } }