Merge pull request #29 from Trivernis/develop

Add event reply based timeouts
pull/32/head
Julius Riegel 2 years ago committed by GitHub
commit e3536c1c0a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

105
Cargo.lock generated

@ -13,9 +13,9 @@ dependencies = [
[[package]] [[package]]
name = "async-trait" name = "async-trait"
version = "0.1.51" version = "0.1.52"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44318e776df68115a881de9a8fd1b9e53368d7a4a5ce4cc48517da3393233a5e" checksum = "061a7acccaa286c011ddc30970520b98fa40e00c9d644633fb26b5fc63a265e3"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -93,7 +93,7 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]] [[package]]
name = "bromine" name = "bromine"
version = "0.16.1" version = "0.16.2"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"bincode", "bincode",
@ -282,7 +282,7 @@ checksum = "22813a6dc45b335f9bade10bf7271dc477e81113e89eb251a0bc2a8a81c536e1"
dependencies = [ dependencies = [
"bstr", "bstr",
"csv-core", "csv-core",
"itoa", "itoa 0.4.8",
"ryu", "ryu",
"serde", "serde",
] ]
@ -314,9 +314,9 @@ dependencies = [
[[package]] [[package]]
name = "futures" name = "futures"
version = "0.3.18" version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8cd0210d8c325c245ff06fd95a3b13689a1a276ac8cfa8e8720cb840bfb84b9e" checksum = "28560757fe2bb34e79f907794bb6b22ae8b0e5c669b638a1132f2592b19035b4"
dependencies = [ dependencies = [
"futures-channel", "futures-channel",
"futures-core", "futures-core",
@ -329,9 +329,9 @@ dependencies = [
[[package]] [[package]]
name = "futures-channel" name = "futures-channel"
version = "0.3.18" version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fc8cd39e3dbf865f7340dce6a2d401d24fd37c6fe6c4f0ee0de8bfca2252d27" checksum = "ba3dda0b6588335f360afc675d0564c17a77a2bda81ca178a4b6081bd86c7f0b"
dependencies = [ dependencies = [
"futures-core", "futures-core",
"futures-sink", "futures-sink",
@ -339,15 +339,15 @@ dependencies = [
[[package]] [[package]]
name = "futures-core" name = "futures-core"
version = "0.3.18" version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "629316e42fe7c2a0b9a65b47d159ceaa5453ab14e8f0a3c5eedbb8cd55b4a445" checksum = "d0c8ff0461b82559810cdccfde3215c3f373807f5e5232b71479bff7bb2583d7"
[[package]] [[package]]
name = "futures-executor" name = "futures-executor"
version = "0.3.18" version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b808bf53348a36cab739d7e04755909b9fcaaa69b7d7e588b37b6ec62704c97" checksum = "29d6d2ff5bb10fb95c85b8ce46538a2e5f5e7fdc755623a7d4529ab8a4ed9d2a"
dependencies = [ dependencies = [
"futures-core", "futures-core",
"futures-task", "futures-task",
@ -356,15 +356,15 @@ dependencies = [
[[package]] [[package]]
name = "futures-io" name = "futures-io"
version = "0.3.18" version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e481354db6b5c353246ccf6a728b0c5511d752c08da7260546fc0933869daa11" checksum = "b1f9d34af5a1aac6fb380f735fe510746c38067c5bf16c7fd250280503c971b2"
[[package]] [[package]]
name = "futures-macro" name = "futures-macro"
version = "0.3.18" version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a89f17b21645bc4ed773c69af9c9a0effd4a3f1a3876eadd453469f8854e7fdd" checksum = "6dbd947adfffb0efc70599b3ddcf7b5597bb5fa9e245eb99f62b3a5f7bb8bd3c"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -373,21 +373,21 @@ dependencies = [
[[package]] [[package]]
name = "futures-sink" name = "futures-sink"
version = "0.3.18" version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "996c6442437b62d21a32cd9906f9c41e7dc1e19a9579843fad948696769305af" checksum = "e3055baccb68d74ff6480350f8d6eb8fcfa3aa11bdc1a1ae3afdd0514617d508"
[[package]] [[package]]
name = "futures-task" name = "futures-task"
version = "0.3.18" version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dabf1872aaab32c886832f2276d2f5399887e2bd613698a02359e4ea83f8de12" checksum = "6ee7c6485c30167ce4dfb83ac568a849fe53274c831081476ee13e0dce1aad72"
[[package]] [[package]]
name = "futures-util" name = "futures-util"
version = "0.3.18" version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41d22213122356472061ac0f1ab2cee28d2bac8491410fd68c2af53d1cedb83e" checksum = "d9b5cf40b47a271f77a8b1bec03ca09044d99d2372c0de244e66430761127164"
dependencies = [ dependencies = [
"futures-channel", "futures-channel",
"futures-core", "futures-core",
@ -418,9 +418,9 @@ dependencies = [
[[package]] [[package]]
name = "heapless" name = "heapless"
version = "0.7.8" version = "0.7.9"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c1ad878e07405df82b695089e63d278244344f80e764074d0bdfe99b89460f3" checksum = "7e476c64197665c3725621f0ac3f9e5209aa5e889e02a08b1daf5f16dc5fd952"
dependencies = [ dependencies = [
"atomic-polyfill", "atomic-polyfill",
"hash32", "hash32",
@ -440,9 +440,9 @@ dependencies = [
[[package]] [[package]]
name = "itertools" name = "itertools"
version = "0.10.1" version = "0.10.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69ddb889f9d0d08a67338271fa9b62996bc788c7796a5c18cf057420aaed5eaf" checksum = "a9a9d19fa1e79b6215ff29b9d6880b706147f16e9b1dbb1e4e5947b5b02bc5e3"
dependencies = [ dependencies = [
"either", "either",
] ]
@ -453,6 +453,12 @@ version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4" checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4"
[[package]]
name = "itoa"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1aab8fc367588b89dcee83ab0fd66b72b50b72fa1904d7095045ace2b0c81c35"
[[package]] [[package]]
name = "js-sys" name = "js-sys"
version = "0.3.55" version = "0.3.55"
@ -470,9 +476,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]] [[package]]
name = "libc" name = "libc"
version = "0.2.108" version = "0.2.112"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8521a1b57e76b1ec69af7599e75e38e7b7fad6610f037db8c79b127201b5d119" checksum = "1b03d17f364a3a042d5e5d46b053bbbf82c92c9430c592dd4c064dc6ee997125"
[[package]] [[package]]
name = "lock_api" name = "lock_api"
@ -500,9 +506,9 @@ checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a"
[[package]] [[package]]
name = "memoffset" name = "memoffset"
version = "0.6.4" version = "0.6.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59accc507f1338036a0477ef61afdae33cde60840f4dfe481319ce3ad116ddf9" checksum = "5aa361d4faea93603064a027415f07bd8e1d5c88c9fbf68bf56a285428fd79ce"
dependencies = [ dependencies = [
"autocfg", "autocfg",
] ]
@ -564,9 +570,9 @@ dependencies = [
[[package]] [[package]]
name = "num_cpus" name = "num_cpus"
version = "1.13.0" version = "1.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05499f3756671c15885fee9034446956fff3f243d6077b91e5767df161f766b3" checksum = "19e64526ebdee182341572e50e9ad03965aa510cd94427a4549448f285e957a1"
dependencies = [ dependencies = [
"hermit-abi", "hermit-abi",
"libc", "libc",
@ -637,9 +643,9 @@ checksum = "7c68cb38ed13fd7bc9dd5db8f165b7c8d9c1a315104083a2b10f11354c2af97f"
[[package]] [[package]]
name = "proc-macro2" name = "proc-macro2"
version = "1.0.32" version = "1.0.35"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba508cc11742c0dc5c1659771673afbab7a0efab23aa17e854cbab0837ed0b43" checksum = "392a54546fda6b7cc663379d0e6ce8b324cf88aecc5a499838e1be9781bdce2e"
dependencies = [ dependencies = [
"unicode-xid", "unicode-xid",
] ]
@ -763,9 +769,9 @@ dependencies = [
[[package]] [[package]]
name = "ryu" name = "ryu"
version = "1.0.6" version = "1.0.9"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c9613b5a66ab9ba26415184cfc41156594925a9cf3a2057e57f31ff145f6568" checksum = "73b4b750c782965c211b42f022f59af1fbceabdd026623714f104152f1ec149f"
[[package]] [[package]]
name = "same-file" name = "same-file"
@ -805,9 +811,9 @@ checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3"
[[package]] [[package]]
name = "serde" name = "serde"
version = "1.0.130" version = "1.0.132"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f12d06de37cf59146fbdecab66aa99f9fe4f78722e3607577a5375d66bd0c913" checksum = "8b9875c23cf305cd1fd7eb77234cbb705f21ea6a72c637a5c6db5fe4b8e7f008"
dependencies = [ dependencies = [
"serde_derive", "serde_derive",
] ]
@ -824,9 +830,9 @@ dependencies = [
[[package]] [[package]]
name = "serde_derive" name = "serde_derive"
version = "1.0.130" version = "1.0.132"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7bc1a1ab1961464eae040d96713baa5a724a8152c1222492465b54322ec508b" checksum = "ecc0db5cb2556c0e558887d9bbdcf6ac4471e83ff66cf696e5419024d1606276"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -835,11 +841,11 @@ dependencies = [
[[package]] [[package]]
name = "serde_json" name = "serde_json"
version = "1.0.72" version = "1.0.73"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0ffa0837f2dfa6fb90868c2b5468cad482e175f7dad97e7421951e663f2b527" checksum = "bcbd0344bc6533bc7ec56df11d42fb70f1b912351c0825ccb7211b59d8af7cf5"
dependencies = [ dependencies = [
"itoa", "itoa 1.0.1",
"ryu", "ryu",
"serde", "serde",
] ]
@ -867,9 +873,9 @@ checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3"
[[package]] [[package]]
name = "syn" name = "syn"
version = "1.0.82" version = "1.0.84"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8daf5dd0bb60cbd4137b1b587d2fc0ae729bc07cf01cd70b36a1ed5ade3b9d59" checksum = "ecb2e6da8ee5eb9a61068762a32fa9619cc591ceb055b3687f4cd4051ec2e06b"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -917,11 +923,10 @@ dependencies = [
[[package]] [[package]]
name = "tokio" name = "tokio"
version = "1.14.0" version = "1.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70e992e41e0d2fb9f755b37446f20900f64446ef54874f40a60c78f021ac6144" checksum = "fbbf1c778ec206785635ce8ad57fe52b3009ae9e0c9f574a728f3049d3e55838"
dependencies = [ dependencies = [
"autocfg",
"bytes", "bytes",
"libc", "libc",
"memchr", "memchr",
@ -934,9 +939,9 @@ dependencies = [
[[package]] [[package]]
name = "tokio-macros" name = "tokio-macros"
version = "1.6.0" version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c9efc1aba077437943f7515666aa2b882dfabfbfdf89c819ea75a8d6e9eaba5e" checksum = "b557f72f448c511a979e2564e55d74e6c4432fc96ff4f6241bc6bded342643b7"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",

@ -1,6 +1,6 @@
[package] [package]
name = "bromine" name = "bromine"
version = "0.16.1" version = "0.16.2"
authors = ["trivernis <trivernis@protonmail.com>"] authors = ["trivernis <trivernis@protonmail.com>"]
edition = "2018" edition = "2018"
readme = "README.md" readme = "README.md"
@ -25,19 +25,19 @@ tracing = "0.1.29"
lazy_static = "1.4.0" lazy_static = "1.4.0"
typemap_rev = "0.1.5" typemap_rev = "0.1.5"
byteorder = "1.4.3" byteorder = "1.4.3"
async-trait = "0.1.51" async-trait = "0.1.52"
futures = "0.3.17" futures = "0.3.19"
rmp-serde = {version = "0.15.5", optional = true} rmp-serde = {version = "0.15.5", optional = true}
bincode = {version = "1.3.3", optional = true} bincode = {version = "1.3.3", optional = true}
serde_json = {version = "1.0.72", optional = true} serde_json = {version = "1.0.73", optional = true}
[dependencies.serde] [dependencies.serde]
optional = true optional = true
version = "1.0.130" version = "1.0.132"
features = [] features = []
[dependencies.tokio] [dependencies.tokio]
version = "1.12.0" version = "1.15.0"
features = ["net", "io-std", "io-util", "sync", "time"] features = ["net", "io-std", "io-util", "sync", "time"]
[dependencies.postcard] [dependencies.postcard]
@ -46,11 +46,11 @@ optional = true
features = ["alloc"] features = ["alloc"]
[dev-dependencies] [dev-dependencies]
rmp-serde = "0.15.4" rmp-serde = "0.15.5"
crossbeam-utils = "0.8.5" crossbeam-utils = "0.8.5"
[dev-dependencies.serde] [dev-dependencies.serde]
version = "1.0.130" version = "1.0.132"
features = ["serde_derive"] features = ["serde_derive"]
[dev-dependencies.criterion] [dev-dependencies.criterion]
@ -58,7 +58,7 @@ version = "0.3.5"
features = ["async_tokio", "html_reports"] features = ["async_tokio", "html_reports"]
[dev-dependencies.tokio] [dev-dependencies.tokio]
version = "1.12.0" version = "1.15.0"
features = ["macros", "rt-multi-thread"] features = ["macros", "rt-multi-thread"]
[features] [features]

@ -56,17 +56,20 @@ impl Event {
} }
/// The identifier of the message /// The identifier of the message
#[inline]
pub fn id(&self) -> u64 { pub fn id(&self) -> u64 {
self.header.id self.header.id
} }
/// The ID of the message referenced by this message. /// The ID of the message referenced by this message.
/// It represents the message that is replied to and can be None. /// It represents the message that is replied to and can be None.
#[inline]
pub fn reference_id(&self) -> Option<u64> { pub fn reference_id(&self) -> Option<u64> {
self.header.ref_id.clone() self.header.ref_id.clone()
} }
/// Decodes the payload to the given type implementing the receive payload trait /// Decodes the payload to the given type implementing the receive payload trait
#[inline]
#[tracing::instrument(level = "trace", skip(self))] #[tracing::instrument(level = "trace", skip(self))]
pub fn payload<T: FromPayload>(&self) -> Result<T> { pub fn payload<T: FromPayload>(&self) -> Result<T> {
let payload = T::from_payload(&self.data[..])?; let payload = T::from_payload(&self.data[..])?;
@ -75,16 +78,19 @@ impl Event {
} }
/// Returns a reference of the underlying data /// Returns a reference of the underlying data
#[inline]
pub fn data_raw(&self) -> &[u8] { pub fn data_raw(&self) -> &[u8] {
&self.data &self.data
} }
/// Returns a reference to the namespace /// Returns a reference to the namespace
#[inline]
pub fn namespace(&self) -> &Option<String> { pub fn namespace(&self) -> &Option<String> {
&self.header.namespace &self.header.namespace
} }
/// Returns the name of the event /// Returns the name of the event
#[inline]
pub fn name(&self) -> &str { pub fn name(&self) -> &str {
&self.header.name &self.header.name
} }
@ -172,6 +178,7 @@ impl EventHeader {
} }
/// Reads and validates the format version /// Reads and validates the format version
#[inline]
fn read_version<R: Read>(reader: &mut R) -> Result<Vec<u8>> { fn read_version<R: Read>(reader: &mut R) -> Result<Vec<u8>> {
let mut version = vec![0u8; 3]; let mut version = vec![0u8; 3];
reader.read_exact(&mut version)?; reader.read_exact(&mut version)?;
@ -184,6 +191,7 @@ impl EventHeader {
} }
/// Reads the reference event id /// Reads the reference event id
#[inline]
fn read_ref_id<R: Read>(reader: &mut R) -> Result<Option<u64>> { fn read_ref_id<R: Read>(reader: &mut R) -> Result<Option<u64>> {
let ref_id_exists = reader.read_u8()?; let ref_id_exists = reader.read_u8()?;
let ref_id = match ref_id_exists { let ref_id = match ref_id_exists {
@ -196,6 +204,7 @@ impl EventHeader {
} }
/// Reads the name of the event /// Reads the name of the event
#[inline]
fn read_name<R: Read>(reader: &mut R) -> Result<String> { fn read_name<R: Read>(reader: &mut R) -> Result<String> {
let name_len = reader.read_u16::<BigEndian>()?; let name_len = reader.read_u16::<BigEndian>()?;
@ -203,6 +212,7 @@ impl EventHeader {
} }
/// Reads the namespace of the event /// Reads the namespace of the event
#[inline]
fn read_namespace<R: Read>(reader: &mut R, namespace_len: u16) -> Result<Option<String>> { fn read_namespace<R: Read>(reader: &mut R, namespace_len: u16) -> Result<Option<String>> {
let namespace = if namespace_len > 0 { let namespace = if namespace_len > 0 {
Some(Self::read_string(reader, namespace_len as usize)?) Some(Self::read_string(reader, namespace_len as usize)?)
@ -213,6 +223,7 @@ impl EventHeader {
Ok(namespace) Ok(namespace)
} }
#[inline]
fn read_string<R: Read>(reader: &mut R, length: usize) -> Result<String> { fn read_string<R: Read>(reader: &mut R, length: usize) -> Result<String> {
let mut string_buf = vec![0u8; length]; let mut string_buf = vec![0u8; length];
reader.read_exact(&mut string_buf)?; reader.read_exact(&mut string_buf)?;

@ -54,6 +54,7 @@ impl EventHandler {
} }
/// Handles a received event /// Handles a received event
#[inline]
#[tracing::instrument(level = "debug", skip(self, ctx, event))] #[tracing::instrument(level = "debug", skip(self, ctx, event))]
pub async fn handle_event(&self, ctx: &Context, event: Event) -> Result<()> { pub async fn handle_event(&self, ctx: &Context, event: Event) -> Result<()> {
if let Some(cb) = self.callbacks.get(event.name()) { if let Some(cb) = self.callbacks.get(event.name()) {

@ -35,23 +35,27 @@ pub struct BytePayload {
} }
impl BytePayload { impl BytePayload {
#[inline]
pub fn new(bytes: Vec<u8>) -> Self { pub fn new(bytes: Vec<u8>) -> Self {
Self { bytes } Self { bytes }
} }
/// Returns the bytes of the payload /// Returns the bytes of the payload
#[inline]
pub fn into_inner(self) -> Vec<u8> { pub fn into_inner(self) -> Vec<u8> {
self.bytes self.bytes
} }
} }
impl IntoPayload for BytePayload { impl IntoPayload for BytePayload {
#[inline]
fn into_payload(self, _: &Context) -> IPCResult<Vec<u8>> { fn into_payload(self, _: &Context) -> IPCResult<Vec<u8>> {
Ok(self.bytes) Ok(self.bytes)
} }
} }
impl FromPayload for BytePayload { impl FromPayload for BytePayload {
#[inline]
fn from_payload<R: Read>(mut reader: R) -> IPCResult<Self> { fn from_payload<R: Read>(mut reader: R) -> IPCResult<Self> {
let mut buf = Vec::new(); let mut buf = Vec::new();
reader.read_to_end(&mut buf)?; reader.read_to_end(&mut buf)?;
@ -70,11 +74,13 @@ pub struct TandemPayload<P1, P2> {
} }
impl<P1, P2> TandemPayload<P1, P2> { impl<P1, P2> TandemPayload<P1, P2> {
#[inline]
pub fn new(load1: P1, load2: P2) -> Self { pub fn new(load1: P1, load2: P2) -> Self {
Self { load1, load2 } Self { load1, load2 }
} }
/// Returns both payload stored in the tandem payload /// Returns both payload stored in the tandem payload
#[inline]
pub fn into_inner(self) -> (P1, P2) { pub fn into_inner(self) -> (P1, P2) {
(self.load1, self.load2) (self.load1, self.load2)
} }
@ -141,10 +147,12 @@ mod serde_payload {
impl<T> SerdePayload<T> { impl<T> SerdePayload<T> {
/// Creates a new serde payload with a specified serializer /// Creates a new serde payload with a specified serializer
#[inline]
pub fn new(serializer: DynamicSerializer, data: T) -> Self { pub fn new(serializer: DynamicSerializer, data: T) -> Self {
Self { serializer, data } Self { serializer, data }
} }
#[inline]
pub fn data(self) -> T { pub fn data(self) -> T {
self.data self.data
} }
@ -172,6 +180,7 @@ mod serde_payload {
} }
impl<T: Serialize> IntoPayload for SerdePayload<T> { impl<T: Serialize> IntoPayload for SerdePayload<T> {
#[inline]
fn into_payload(self, _: &Context) -> IPCResult<Vec<u8>> { fn into_payload(self, _: &Context) -> IPCResult<Vec<u8>> {
self.try_into_bytes() self.try_into_bytes()
} }
@ -188,12 +197,14 @@ mod serde_payload {
} }
impl<T: Serialize> IntoPayload for T { impl<T: Serialize> IntoPayload for T {
#[inline]
fn into_payload(self, ctx: &Context) -> IPCResult<Vec<u8>> { fn into_payload(self, ctx: &Context) -> IPCResult<Vec<u8>> {
ctx.create_serde_payload(self).into_payload(&ctx) ctx.create_serde_payload(self).into_payload(&ctx)
} }
} }
impl<T: DeserializeOwned> FromPayload for T { impl<T: DeserializeOwned> FromPayload for T {
#[inline]
fn from_payload<R: Read>(reader: R) -> IPCResult<Self> { fn from_payload<R: Read>(reader: R) -> IPCResult<Self> {
let serde_payload = SerdePayload::<Self>::from_payload(reader)?; let serde_payload = SerdePayload::<Self>::from_payload(reader)?;

@ -90,6 +90,7 @@ impl DynamicSerializer {
} }
} }
#[inline]
pub fn from_primitive(num: usize) -> SerializationResult<Self> { pub fn from_primitive(num: usize) -> SerializationResult<Self> {
match num { match num {
#[cfg(feature = "serialize_rmp")] #[cfg(feature = "serialize_rmp")]

@ -3,12 +3,14 @@ use serde::de::DeserializeOwned;
use serde::Serialize; use serde::Serialize;
use std::io::Read; use std::io::Read;
#[inline]
pub fn serialize<T: Serialize>(data: T) -> SerializationResult<Vec<u8>> { pub fn serialize<T: Serialize>(data: T) -> SerializationResult<Vec<u8>> {
let bytes = bincode::serialize(&data)?; let bytes = bincode::serialize(&data)?;
Ok(bytes) Ok(bytes)
} }
#[inline]
pub fn deserialize<R: Read, T: DeserializeOwned>(reader: R) -> SerializationResult<T> { pub fn deserialize<R: Read, T: DeserializeOwned>(reader: R) -> SerializationResult<T> {
let type_data = bincode::deserialize_from(reader)?; let type_data = bincode::deserialize_from(reader)?;
Ok(type_data) Ok(type_data)

@ -3,12 +3,14 @@ use serde::de::DeserializeOwned;
use serde::Serialize; use serde::Serialize;
use std::io::Read; use std::io::Read;
#[inline]
pub fn serialize<T: Serialize>(data: T) -> SerializationResult<Vec<u8>> { pub fn serialize<T: Serialize>(data: T) -> SerializationResult<Vec<u8>> {
let bytes = serde_json::to_vec(&data)?; let bytes = serde_json::to_vec(&data)?;
Ok(bytes) Ok(bytes)
} }
#[inline]
pub fn deserialize<R: Read, T: DeserializeOwned>(reader: R) -> SerializationResult<T> { pub fn deserialize<R: Read, T: DeserializeOwned>(reader: R) -> SerializationResult<T> {
let type_data = serde_json::from_reader(reader)?; let type_data = serde_json::from_reader(reader)?;

@ -3,12 +3,14 @@ use serde::de::DeserializeOwned;
use serde::Serialize; use serde::Serialize;
use std::io::Read; use std::io::Read;
#[inline]
pub fn serialize<T: Serialize>(data: T) -> SerializationResult<Vec<u8>> { pub fn serialize<T: Serialize>(data: T) -> SerializationResult<Vec<u8>> {
let bytes = postcard::to_allocvec(&data)?.to_vec(); let bytes = postcard::to_allocvec(&data)?.to_vec();
Ok(bytes) Ok(bytes)
} }
#[inline]
pub fn deserialize<R: Read, T: DeserializeOwned>(mut reader: R) -> SerializationResult<T> { pub fn deserialize<R: Read, T: DeserializeOwned>(mut reader: R) -> SerializationResult<T> {
let mut buf = Vec::new(); let mut buf = Vec::new();
// reading to end means reading the full size of the provided data // reading to end means reading the full size of the provided data

@ -3,12 +3,14 @@ use serde::de::DeserializeOwned;
use serde::Serialize; use serde::Serialize;
use std::io::Read; use std::io::Read;
#[inline]
pub fn serialize<T: Serialize>(data: T) -> SerializationResult<Vec<u8>> { pub fn serialize<T: Serialize>(data: T) -> SerializationResult<Vec<u8>> {
let bytes = rmp_serde::to_vec(&data)?; let bytes = rmp_serde::to_vec(&data)?;
Ok(bytes) Ok(bytes)
} }
#[inline]
pub fn deserialize<R: Read, T: DeserializeOwned>(reader: R) -> SerializationResult<T> { pub fn deserialize<R: Read, T: DeserializeOwned>(reader: R) -> SerializationResult<T> {
let type_data = rmp_serde::from_read(reader)?; let type_data = rmp_serde::from_read(reader)?;
Ok(type_data) Ok(type_data)

@ -1,21 +1,22 @@
use crate::error::{Error, Result};
use crate::event::Event;
use crate::ipc::stream_emitter::{EmitMetadata, StreamEmitter};
use futures::future;
use futures::future::Either;
use std::collections::HashMap; use std::collections::HashMap;
use std::mem; use std::mem;
use std::ops::{Deref, DerefMut}; use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use futures::future;
use futures::future::Either;
use tokio::sync::{Mutex, oneshot, RwLock};
use tokio::sync::oneshot::Sender; use tokio::sync::oneshot::Sender;
use tokio::sync::{oneshot, Mutex, RwLock};
use tokio::time::Duration; use tokio::time::Duration;
use typemap_rev::TypeMap; use typemap_rev::TypeMap;
use crate::payload::IntoPayload; use crate::error::{Error, Result};
use crate::event::Event;
use crate::ipc::stream_emitter::{EmitMetadata, StreamEmitter};
#[cfg(feature = "serialize")] #[cfg(feature = "serialize")]
use crate::payload::{DynamicSerializer, SerdePayload}; use crate::payload::{DynamicSerializer, SerdePayload};
use crate::payload::IntoPayload;
pub(crate) type ReplyListeners = Arc<Mutex<HashMap<u64, oneshot::Sender<Event>>>>; pub(crate) type ReplyListeners = Arc<Mutex<HashMap<u64, oneshot::Sender<Event>>>>;
@ -108,18 +109,26 @@ impl Context {
} }
/// Waits for a reply to the given message ID /// Waits for a reply to the given message ID
#[inline]
#[tracing::instrument(level = "debug", skip(self))] #[tracing::instrument(level = "debug", skip(self))]
pub async fn await_reply(&self, message_id: u64) -> Result<Event> { pub async fn await_reply(&self, message_id: u64) -> Result<Event> {
self.await_reply_with_timeout(message_id, self.reply_timeout.to_owned()).await
}
/// Waits for a reply to the given Message ID with a given timeout
#[tracing::instrument(level = "debug", skip(self))]
pub async fn await_reply_with_timeout(&self, message_id: u64, timeout: Duration) -> Result<Event> {
let (rx, tx) = oneshot::channel(); let (rx, tx) = oneshot::channel();
{ {
let mut listeners = self.reply_listeners.lock().await; let mut listeners = self.reply_listeners.lock().await;
listeners.insert(message_id, rx); listeners.insert(message_id, rx);
} }
let result = future::select( let result = future::select(
Box::pin(tx), Box::pin(tx),
Box::pin(tokio::time::sleep(self.reply_timeout)), Box::pin(tokio::time::sleep(timeout)),
) )
.await; .await;
let event = match result { let event = match result {
Either::Left((tx_result, _)) => Ok(tx_result?), Either::Left((tx_result, _)) => Ok(tx_result?),
@ -145,16 +154,19 @@ impl Context {
} }
#[cfg(feature = "serialize")] #[cfg(feature = "serialize")]
#[inline]
pub fn create_serde_payload<T>(&self, data: T) -> SerdePayload<T> { pub fn create_serde_payload<T>(&self, data: T) -> SerdePayload<T> {
SerdePayload::new(self.default_serializer.clone(), data) SerdePayload::new(self.default_serializer.clone(), data)
} }
/// Returns the channel for a reply to the given message id /// Returns the channel for a reply to the given message id
#[inline]
pub(crate) async fn get_reply_sender(&self, ref_id: u64) -> Option<oneshot::Sender<Event>> { pub(crate) async fn get_reply_sender(&self, ref_id: u64) -> Option<oneshot::Sender<Event>> {
let mut listeners = self.reply_listeners.lock().await; let mut listeners = self.reply_listeners.lock().await;
listeners.remove(&ref_id) listeners.remove(&ref_id)
} }
#[inline]
pub(crate) fn set_ref_id(&mut self, id: Option<u64>) { pub(crate) fn set_ref_id(&mut self, id: Option<u64>) {
self.ref_id = id; self.ref_id = id;
} }
@ -166,37 +178,40 @@ pub struct PooledContext {
} }
pub struct PoolGuard<T> pub struct PoolGuard<T>
where where
T: Clone, T: Clone,
{ {
inner: T, inner: T,
count: Arc<AtomicUsize>, count: Arc<AtomicUsize>,
} }
impl<T> Deref for PoolGuard<T> impl<T> Deref for PoolGuard<T>
where where
T: Clone, T: Clone,
{ {
type Target = T; type Target = T;
#[inline]
fn deref(&self) -> &Self::Target { fn deref(&self) -> &Self::Target {
&self.inner &self.inner
} }
} }
impl<T> DerefMut for PoolGuard<T> impl<T> DerefMut for PoolGuard<T>
where where
T: Clone, T: Clone,
{ {
#[inline]
fn deref_mut(&mut self) -> &mut Self::Target { fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner &mut self.inner
} }
} }
impl<T> Clone for PoolGuard<T> impl<T> Clone for PoolGuard<T>
where where
T: Clone, T: Clone,
{ {
#[inline]
fn clone(&self) -> Self { fn clone(&self) -> Self {
self.acquire(); self.acquire();
@ -208,17 +223,18 @@ where
} }
impl<T> Drop for PoolGuard<T> impl<T> Drop for PoolGuard<T>
where where
T: Clone, T: Clone,
{ {
#[inline]
fn drop(&mut self) { fn drop(&mut self) {
self.release(); self.release();
} }
} }
impl<T> PoolGuard<T> impl<T> PoolGuard<T>
where where
T: Clone, T: Clone,
{ {
pub(crate) fn new(inner: T) -> Self { pub(crate) fn new(inner: T) -> Self {
Self { Self {
@ -228,6 +244,7 @@ where
} }
/// Acquires the context by adding 1 to the count /// Acquires the context by adding 1 to the count
#[inline]
#[tracing::instrument(level = "trace", skip_all)] #[tracing::instrument(level = "trace", skip_all)]
pub(crate) fn acquire(&self) { pub(crate) fn acquire(&self) {
let count = self.count.fetch_add(1, Ordering::Relaxed); let count = self.count.fetch_add(1, Ordering::Relaxed);
@ -235,12 +252,14 @@ where
} }
/// Releases the connection by subtracting from the stored count /// Releases the connection by subtracting from the stored count
#[inline]
#[tracing::instrument(level = "trace", skip_all)] #[tracing::instrument(level = "trace", skip_all)]
pub(crate) fn release(&self) { pub(crate) fn release(&self) {
let count = self.count.fetch_sub(1, Ordering::Relaxed); let count = self.count.fetch_sub(1, Ordering::Relaxed);
tracing::trace!(count); tracing::trace!(count);
} }
#[inline]
pub(crate) fn count(&self) -> usize { pub(crate) fn count(&self) -> usize {
self.count.load(Ordering::Relaxed) self.count.load(Ordering::Relaxed)
} }
@ -256,6 +275,7 @@ impl PooledContext {
/// Acquires a context from the pool /// Acquires a context from the pool
/// It always chooses the one that is used the least /// It always chooses the one that is used the least
#[inline]
#[tracing::instrument(level = "trace", skip_all)] #[tracing::instrument(level = "trace", skip_all)]
pub fn acquire(&self) -> PoolGuard<Context> { pub fn acquire(&self) -> PoolGuard<Context> {
self.contexts self.contexts

@ -5,6 +5,7 @@ use crate::ipc::context::Context;
use crate::protocol::AsyncProtocolStream; use crate::protocol::AsyncProtocolStream;
use std::ops::DerefMut; use std::ops::DerefMut;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncWrite, AsyncWriteExt}; use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::sync::Mutex; use tokio::sync::Mutex;
@ -50,6 +51,7 @@ impl StreamEmitter {
} }
/// Emits an event /// Emits an event
#[inline]
pub(crate) async fn emit<S: AsRef<str>>( pub(crate) async fn emit<S: AsRef<str>>(
&self, &self,
event: S, event: S,
@ -59,6 +61,7 @@ impl StreamEmitter {
} }
/// Emits an event to a specific namespace /// Emits an event to a specific namespace
#[inline]
pub(crate) async fn emit_to<S1: AsRef<str>, S2: AsRef<str>>( pub(crate) async fn emit_to<S1: AsRef<str>, S2: AsRef<str>>(
&self, &self,
namespace: S1, namespace: S1,
@ -70,6 +73,7 @@ impl StreamEmitter {
} }
/// Emits a response to an event /// Emits a response to an event
#[inline]
pub(crate) async fn emit_response<S: AsRef<str>>( pub(crate) async fn emit_response<S: AsRef<str>>(
&self, &self,
event_id: u64, event_id: u64,
@ -81,6 +85,7 @@ impl StreamEmitter {
} }
/// Emits a response to an event to a namespace /// Emits a response to an event to a namespace
#[inline]
pub(crate) async fn emit_response_to<S1: AsRef<str>, S2: AsRef<str>>( pub(crate) async fn emit_response_to<S1: AsRef<str>, S2: AsRef<str>>(
&self, &self,
event_id: u64, event_id: u64,
@ -102,22 +107,37 @@ impl StreamEmitter {
/// This object can be used to wait for a response to an event. /// This object can be used to wait for a response to an event.
pub struct EmitMetadata { pub struct EmitMetadata {
message_id: u64, message_id: u64,
timeout: Option<Duration>
} }
impl EmitMetadata { impl EmitMetadata {
#[inline]
pub(crate) fn new(message_id: u64) -> Self { pub(crate) fn new(message_id: u64) -> Self {
Self { message_id } Self { message_id, timeout: None }
} }
/// The ID of the emitted message /// The ID of the emitted message
#[inline]
pub fn message_id(&self) -> u64 { pub fn message_id(&self) -> u64 {
self.message_id self.message_id
} }
/// Sets a timeout for awaiting replies to this emitted event
#[inline]
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.timeout = Some(timeout);
self
}
/// Waits for a reply to the given message. /// Waits for a reply to the given message.
#[tracing::instrument(skip(self, ctx), fields(self.message_id))] #[tracing::instrument(skip(self, ctx), fields(self.message_id))]
pub async fn await_reply(&self, ctx: &Context) -> Result<Event> { pub async fn await_reply(&self, ctx: &Context) -> Result<Event> {
let reply = ctx.await_reply(self.message_id).await?; let reply = if let Some(timeout) = self.timeout {
ctx.await_reply_with_timeout(self.message_id, timeout.clone()).await?
} else {
ctx.await_reply(self.message_id).await?
};
if reply.name() == ERROR_EVENT_NAME { if reply.name() == ERROR_EVENT_NAME {
Err(reply.payload::<ErrorEventData>()?.into()) Err(reply.payload::<ErrorEventData>()?.into())
} else { } else {

@ -83,6 +83,7 @@ async fn it_receives_error_responses() {
.emit("create_error", EmptyPayload) .emit("create_error", EmptyPayload)
.await .await
.unwrap() .unwrap()
.with_timeout(Duration::from_millis(100))
.await_reply(&ctx) .await_reply(&ctx)
.await; .await;

@ -1,48 +1,61 @@
#[cfg(feature = "serialize")] #[cfg(feature = "serialize")]
use bromine::prelude::*; use bromine::prelude::*;
#[cfg(feature = "serialize")]
use serde::{de::DeserializeOwned, Serialize};
use std::fmt::Debug;
#[cfg(feature = "serialize_rmp")] #[cfg(feature = "serialize_rmp")]
#[test] #[test]
fn it_serializes_messagepack() { fn it_serializes_messagepack() {
test_serialization(DynamicSerializer::Messagepack) test_serialization::<BigTestPayload>(DynamicSerializer::Messagepack).unwrap();
test_serialization::<AdjacentlyTaggedEnum>(DynamicSerializer::Messagepack).unwrap();
test_serialization::<u128>(DynamicSerializer::Messagepack).unwrap_err();
} }
#[cfg(feature = "serialize_bincode")] #[cfg(feature = "serialize_bincode")]
#[test] #[test]
fn it_serializes_bincode() { fn it_serializes_bincode() {
test_serialization(DynamicSerializer::Bincode) test_serialization::<BigTestPayload>(DynamicSerializer::Bincode).unwrap();
test_serialization::<AdjacentlyTaggedEnum>(DynamicSerializer::Bincode).unwrap_err();
test_serialization::<u128>(DynamicSerializer::Bincode).unwrap();
} }
#[cfg(feature = "serialize_postcard")] #[cfg(feature = "serialize_postcard")]
#[test] #[test]
fn it_serializes_postcard() { fn it_serializes_postcard() {
test_serialization(DynamicSerializer::Postcard) test_serialization::<BigTestPayload>(DynamicSerializer::Postcard).unwrap();
test_serialization::<AdjacentlyTaggedEnum>(DynamicSerializer::Postcard).unwrap_err();
test_serialization::<u128>(DynamicSerializer::Postcard).unwrap();
} }
#[cfg(feature = "serialize_json")] #[cfg(feature = "serialize_json")]
#[test] #[test]
fn it_serializes_json() { fn it_serializes_json() {
test_serialization(DynamicSerializer::Json) test_serialization::<BigTestPayload>(DynamicSerializer::Json).unwrap();
test_serialization::<AdjacentlyTaggedEnum>(DynamicSerializer::Json).unwrap();
test_serialization::<u128>(DynamicSerializer::Json).unwrap();
} }
#[cfg(feature = "serialize")] #[cfg(feature = "serialize")]
fn test_serialization(serializer: DynamicSerializer) { fn test_serialization<D: Default + Serialize + DeserializeOwned + Clone + Eq + Debug>(
let test_payload = get_test_payload(serializer); serializer: DynamicSerializer,
let payload_bytes = test_payload.clone().try_into_bytes().unwrap(); ) -> IPCResult<()> {
let payload = TestSerdePayload::from_payload(&payload_bytes[..]).unwrap(); let test_payload = SerdePayload::new(serializer, D::default());
assert_eq!(payload.data(), test_payload.data()) let payload_bytes = test_payload.clone().try_into_bytes()?;
let payload = SerdePayload::<D>::from_payload(&payload_bytes[..])?;
assert_eq!(payload.data(), test_payload.data());
Ok(())
} }
#[cfg(feature = "serialize")] #[cfg(feature = "serialize")]
pub mod payload { pub mod payload {
use bromine::payload::{DynamicSerializer, SerdePayload};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::collections::HashMap; use std::collections::HashMap;
pub type TestSerdePayload = SerdePayload<TestPayload>;
#[derive(Clone, Serialize, Deserialize, Eq, PartialEq, Debug)] #[derive(Clone, Serialize, Deserialize, Eq, PartialEq, Debug)]
pub struct TestPayload { pub struct BigTestPayload {
items: Vec<u64>, items: Vec<u64>,
variant: TestPayloadEnum, variant: TestPayloadEnum,
string: String, string: String,
@ -57,20 +70,34 @@ pub mod payload {
Third(usize), Third(usize),
} }
pub fn get_test_payload(serializer: DynamicSerializer) -> SerdePayload<TestPayload> { #[derive(Clone, Serialize, Deserialize, Eq, PartialEq, Debug)]
let mut maps = HashMap::new(); #[serde(tag = "variant", content = "data")]
maps.insert("Hello".to_string(), 12); pub enum AdjacentlyTaggedEnum {
Variant1(u64),
Variant2(String),
Variant3(Vec<u8>),
}
impl Default for AdjacentlyTaggedEnum {
fn default() -> Self {
Self::Variant3(vec![0, 1, 2])
}
}
maps.insert("Wäüörld".to_string(), -12380); impl Default for BigTestPayload {
let inner_payload = TestPayload { fn default() -> Self {
items: vec![0u64, 12452u64, u64::MAX], let mut maps = HashMap::new();
variant: TestPayloadEnum::Third(12), maps.insert("Hello".to_string(), 12);
string: String::from("Hello World ſð"), maps.insert("Wäüörld".to_string(), -12380);
signed: -12,
maps,
};
SerdePayload::new(serializer, inner_payload) BigTestPayload {
items: vec![0u64, 12452u64, u64::MAX],
variant: TestPayloadEnum::Third(12),
string: String::from("Hello World ſð"),
signed: -12,
maps,
}
}
} }
} }
#[cfg(feature = "serialize")] #[cfg(feature = "serialize")]

Loading…
Cancel
Save