Add event based timeout support

Signed-off-by: Trivernis <trivernis@protonmail.com>
pull/29/head
Trivernis 2 years ago
parent 2a9426badc
commit 248fb403d5
No known key found for this signature in database
GPG Key ID: EB543D89E02BC83F

103
Cargo.lock generated

@ -13,9 +13,9 @@ dependencies = [
[[package]]
name = "async-trait"
version = "0.1.51"
version = "0.1.52"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44318e776df68115a881de9a8fd1b9e53368d7a4a5ce4cc48517da3393233a5e"
checksum = "061a7acccaa286c011ddc30970520b98fa40e00c9d644633fb26b5fc63a265e3"
dependencies = [
"proc-macro2",
"quote",
@ -282,7 +282,7 @@ checksum = "22813a6dc45b335f9bade10bf7271dc477e81113e89eb251a0bc2a8a81c536e1"
dependencies = [
"bstr",
"csv-core",
"itoa",
"itoa 0.4.8",
"ryu",
"serde",
]
@ -314,9 +314,9 @@ dependencies = [
[[package]]
name = "futures"
version = "0.3.18"
version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8cd0210d8c325c245ff06fd95a3b13689a1a276ac8cfa8e8720cb840bfb84b9e"
checksum = "28560757fe2bb34e79f907794bb6b22ae8b0e5c669b638a1132f2592b19035b4"
dependencies = [
"futures-channel",
"futures-core",
@ -329,9 +329,9 @@ dependencies = [
[[package]]
name = "futures-channel"
version = "0.3.18"
version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fc8cd39e3dbf865f7340dce6a2d401d24fd37c6fe6c4f0ee0de8bfca2252d27"
checksum = "ba3dda0b6588335f360afc675d0564c17a77a2bda81ca178a4b6081bd86c7f0b"
dependencies = [
"futures-core",
"futures-sink",
@ -339,15 +339,15 @@ dependencies = [
[[package]]
name = "futures-core"
version = "0.3.18"
version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "629316e42fe7c2a0b9a65b47d159ceaa5453ab14e8f0a3c5eedbb8cd55b4a445"
checksum = "d0c8ff0461b82559810cdccfde3215c3f373807f5e5232b71479bff7bb2583d7"
[[package]]
name = "futures-executor"
version = "0.3.18"
version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b808bf53348a36cab739d7e04755909b9fcaaa69b7d7e588b37b6ec62704c97"
checksum = "29d6d2ff5bb10fb95c85b8ce46538a2e5f5e7fdc755623a7d4529ab8a4ed9d2a"
dependencies = [
"futures-core",
"futures-task",
@ -356,15 +356,15 @@ dependencies = [
[[package]]
name = "futures-io"
version = "0.3.18"
version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e481354db6b5c353246ccf6a728b0c5511d752c08da7260546fc0933869daa11"
checksum = "b1f9d34af5a1aac6fb380f735fe510746c38067c5bf16c7fd250280503c971b2"
[[package]]
name = "futures-macro"
version = "0.3.18"
version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a89f17b21645bc4ed773c69af9c9a0effd4a3f1a3876eadd453469f8854e7fdd"
checksum = "6dbd947adfffb0efc70599b3ddcf7b5597bb5fa9e245eb99f62b3a5f7bb8bd3c"
dependencies = [
"proc-macro2",
"quote",
@ -373,21 +373,21 @@ dependencies = [
[[package]]
name = "futures-sink"
version = "0.3.18"
version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "996c6442437b62d21a32cd9906f9c41e7dc1e19a9579843fad948696769305af"
checksum = "e3055baccb68d74ff6480350f8d6eb8fcfa3aa11bdc1a1ae3afdd0514617d508"
[[package]]
name = "futures-task"
version = "0.3.18"
version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dabf1872aaab32c886832f2276d2f5399887e2bd613698a02359e4ea83f8de12"
checksum = "6ee7c6485c30167ce4dfb83ac568a849fe53274c831081476ee13e0dce1aad72"
[[package]]
name = "futures-util"
version = "0.3.18"
version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41d22213122356472061ac0f1ab2cee28d2bac8491410fd68c2af53d1cedb83e"
checksum = "d9b5cf40b47a271f77a8b1bec03ca09044d99d2372c0de244e66430761127164"
dependencies = [
"futures-channel",
"futures-core",
@ -418,9 +418,9 @@ dependencies = [
[[package]]
name = "heapless"
version = "0.7.8"
version = "0.7.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c1ad878e07405df82b695089e63d278244344f80e764074d0bdfe99b89460f3"
checksum = "7e476c64197665c3725621f0ac3f9e5209aa5e889e02a08b1daf5f16dc5fd952"
dependencies = [
"atomic-polyfill",
"hash32",
@ -440,9 +440,9 @@ dependencies = [
[[package]]
name = "itertools"
version = "0.10.1"
version = "0.10.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69ddb889f9d0d08a67338271fa9b62996bc788c7796a5c18cf057420aaed5eaf"
checksum = "a9a9d19fa1e79b6215ff29b9d6880b706147f16e9b1dbb1e4e5947b5b02bc5e3"
dependencies = [
"either",
]
@ -453,6 +453,12 @@ version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4"
[[package]]
name = "itoa"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1aab8fc367588b89dcee83ab0fd66b72b50b72fa1904d7095045ace2b0c81c35"
[[package]]
name = "js-sys"
version = "0.3.55"
@ -470,9 +476,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "libc"
version = "0.2.108"
version = "0.2.112"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8521a1b57e76b1ec69af7599e75e38e7b7fad6610f037db8c79b127201b5d119"
checksum = "1b03d17f364a3a042d5e5d46b053bbbf82c92c9430c592dd4c064dc6ee997125"
[[package]]
name = "lock_api"
@ -500,9 +506,9 @@ checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a"
[[package]]
name = "memoffset"
version = "0.6.4"
version = "0.6.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59accc507f1338036a0477ef61afdae33cde60840f4dfe481319ce3ad116ddf9"
checksum = "5aa361d4faea93603064a027415f07bd8e1d5c88c9fbf68bf56a285428fd79ce"
dependencies = [
"autocfg",
]
@ -564,9 +570,9 @@ dependencies = [
[[package]]
name = "num_cpus"
version = "1.13.0"
version = "1.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05499f3756671c15885fee9034446956fff3f243d6077b91e5767df161f766b3"
checksum = "19e64526ebdee182341572e50e9ad03965aa510cd94427a4549448f285e957a1"
dependencies = [
"hermit-abi",
"libc",
@ -637,9 +643,9 @@ checksum = "7c68cb38ed13fd7bc9dd5db8f165b7c8d9c1a315104083a2b10f11354c2af97f"
[[package]]
name = "proc-macro2"
version = "1.0.32"
version = "1.0.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba508cc11742c0dc5c1659771673afbab7a0efab23aa17e854cbab0837ed0b43"
checksum = "392a54546fda6b7cc663379d0e6ce8b324cf88aecc5a499838e1be9781bdce2e"
dependencies = [
"unicode-xid",
]
@ -763,9 +769,9 @@ dependencies = [
[[package]]
name = "ryu"
version = "1.0.6"
version = "1.0.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c9613b5a66ab9ba26415184cfc41156594925a9cf3a2057e57f31ff145f6568"
checksum = "73b4b750c782965c211b42f022f59af1fbceabdd026623714f104152f1ec149f"
[[package]]
name = "same-file"
@ -805,9 +811,9 @@ checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3"
[[package]]
name = "serde"
version = "1.0.130"
version = "1.0.132"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f12d06de37cf59146fbdecab66aa99f9fe4f78722e3607577a5375d66bd0c913"
checksum = "8b9875c23cf305cd1fd7eb77234cbb705f21ea6a72c637a5c6db5fe4b8e7f008"
dependencies = [
"serde_derive",
]
@ -824,9 +830,9 @@ dependencies = [
[[package]]
name = "serde_derive"
version = "1.0.130"
version = "1.0.132"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7bc1a1ab1961464eae040d96713baa5a724a8152c1222492465b54322ec508b"
checksum = "ecc0db5cb2556c0e558887d9bbdcf6ac4471e83ff66cf696e5419024d1606276"
dependencies = [
"proc-macro2",
"quote",
@ -835,11 +841,11 @@ dependencies = [
[[package]]
name = "serde_json"
version = "1.0.72"
version = "1.0.73"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0ffa0837f2dfa6fb90868c2b5468cad482e175f7dad97e7421951e663f2b527"
checksum = "bcbd0344bc6533bc7ec56df11d42fb70f1b912351c0825ccb7211b59d8af7cf5"
dependencies = [
"itoa",
"itoa 1.0.1",
"ryu",
"serde",
]
@ -867,9 +873,9 @@ checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3"
[[package]]
name = "syn"
version = "1.0.82"
version = "1.0.84"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8daf5dd0bb60cbd4137b1b587d2fc0ae729bc07cf01cd70b36a1ed5ade3b9d59"
checksum = "ecb2e6da8ee5eb9a61068762a32fa9619cc591ceb055b3687f4cd4051ec2e06b"
dependencies = [
"proc-macro2",
"quote",
@ -917,11 +923,10 @@ dependencies = [
[[package]]
name = "tokio"
version = "1.14.0"
version = "1.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70e992e41e0d2fb9f755b37446f20900f64446ef54874f40a60c78f021ac6144"
checksum = "fbbf1c778ec206785635ce8ad57fe52b3009ae9e0c9f574a728f3049d3e55838"
dependencies = [
"autocfg",
"bytes",
"libc",
"memchr",
@ -934,9 +939,9 @@ dependencies = [
[[package]]
name = "tokio-macros"
version = "1.6.0"
version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c9efc1aba077437943f7515666aa2b882dfabfbfdf89c819ea75a8d6e9eaba5e"
checksum = "b557f72f448c511a979e2564e55d74e6c4432fc96ff4f6241bc6bded342643b7"
dependencies = [
"proc-macro2",
"quote",

@ -25,19 +25,19 @@ tracing = "0.1.29"
lazy_static = "1.4.0"
typemap_rev = "0.1.5"
byteorder = "1.4.3"
async-trait = "0.1.51"
futures = "0.3.17"
async-trait = "0.1.52"
futures = "0.3.19"
rmp-serde = {version = "0.15.5", optional = true}
bincode = {version = "1.3.3", optional = true}
serde_json = {version = "1.0.72", optional = true}
serde_json = {version = "1.0.73", optional = true}
[dependencies.serde]
optional = true
version = "1.0.130"
version = "1.0.132"
features = []
[dependencies.tokio]
version = "1.12.0"
version = "1.15.0"
features = ["net", "io-std", "io-util", "sync", "time"]
[dependencies.postcard]
@ -46,11 +46,11 @@ optional = true
features = ["alloc"]
[dev-dependencies]
rmp-serde = "0.15.4"
rmp-serde = "0.15.5"
crossbeam-utils = "0.8.5"
[dev-dependencies.serde]
version = "1.0.130"
version = "1.0.132"
features = ["serde_derive"]
[dev-dependencies.criterion]
@ -58,7 +58,7 @@ version = "0.3.5"
features = ["async_tokio", "html_reports"]
[dev-dependencies.tokio]
version = "1.12.0"
version = "1.15.0"
features = ["macros", "rt-multi-thread"]
[features]

@ -56,17 +56,20 @@ impl Event {
}
/// The identifier of the message
#[inline]
pub fn id(&self) -> u64 {
self.header.id
}
/// The ID of the message referenced by this message.
/// It represents the message that is replied to and can be None.
#[inline]
pub fn reference_id(&self) -> Option<u64> {
self.header.ref_id.clone()
}
/// Decodes the payload to the given type implementing the receive payload trait
#[inline]
#[tracing::instrument(level = "trace", skip(self))]
pub fn payload<T: FromPayload>(&self) -> Result<T> {
let payload = T::from_payload(&self.data[..])?;
@ -75,16 +78,19 @@ impl Event {
}
/// Returns a reference of the underlying data
#[inline]
pub fn data_raw(&self) -> &[u8] {
&self.data
}
/// Returns a reference to the namespace
#[inline]
pub fn namespace(&self) -> &Option<String> {
&self.header.namespace
}
/// Returns the name of the event
#[inline]
pub fn name(&self) -> &str {
&self.header.name
}
@ -172,6 +178,7 @@ impl EventHeader {
}
/// Reads and validates the format version
#[inline]
fn read_version<R: Read>(reader: &mut R) -> Result<Vec<u8>> {
let mut version = vec![0u8; 3];
reader.read_exact(&mut version)?;
@ -184,6 +191,7 @@ impl EventHeader {
}
/// Reads the reference event id
#[inline]
fn read_ref_id<R: Read>(reader: &mut R) -> Result<Option<u64>> {
let ref_id_exists = reader.read_u8()?;
let ref_id = match ref_id_exists {
@ -196,6 +204,7 @@ impl EventHeader {
}
/// Reads the name of the event
#[inline]
fn read_name<R: Read>(reader: &mut R) -> Result<String> {
let name_len = reader.read_u16::<BigEndian>()?;
@ -203,6 +212,7 @@ impl EventHeader {
}
/// Reads the namespace of the event
#[inline]
fn read_namespace<R: Read>(reader: &mut R, namespace_len: u16) -> Result<Option<String>> {
let namespace = if namespace_len > 0 {
Some(Self::read_string(reader, namespace_len as usize)?)
@ -213,6 +223,7 @@ impl EventHeader {
Ok(namespace)
}
#[inline]
fn read_string<R: Read>(reader: &mut R, length: usize) -> Result<String> {
let mut string_buf = vec![0u8; length];
reader.read_exact(&mut string_buf)?;

@ -54,6 +54,7 @@ impl EventHandler {
}
/// Handles a received event
#[inline]
#[tracing::instrument(level = "debug", skip(self, ctx, event))]
pub async fn handle_event(&self, ctx: &Context, event: Event) -> Result<()> {
if let Some(cb) = self.callbacks.get(event.name()) {

@ -35,23 +35,27 @@ pub struct BytePayload {
}
impl BytePayload {
#[inline]
pub fn new(bytes: Vec<u8>) -> Self {
Self { bytes }
}
/// Returns the bytes of the payload
#[inline]
pub fn into_inner(self) -> Vec<u8> {
self.bytes
}
}
impl IntoPayload for BytePayload {
#[inline]
fn into_payload(self, _: &Context) -> IPCResult<Vec<u8>> {
Ok(self.bytes)
}
}
impl FromPayload for BytePayload {
#[inline]
fn from_payload<R: Read>(mut reader: R) -> IPCResult<Self> {
let mut buf = Vec::new();
reader.read_to_end(&mut buf)?;
@ -70,11 +74,13 @@ pub struct TandemPayload<P1, P2> {
}
impl<P1, P2> TandemPayload<P1, P2> {
#[inline]
pub fn new(load1: P1, load2: P2) -> Self {
Self { load1, load2 }
}
/// Returns both payload stored in the tandem payload
#[inline]
pub fn into_inner(self) -> (P1, P2) {
(self.load1, self.load2)
}
@ -141,10 +147,12 @@ mod serde_payload {
impl<T> SerdePayload<T> {
/// Creates a new serde payload with a specified serializer
#[inline]
pub fn new(serializer: DynamicSerializer, data: T) -> Self {
Self { serializer, data }
}
#[inline]
pub fn data(self) -> T {
self.data
}
@ -172,6 +180,7 @@ mod serde_payload {
}
impl<T: Serialize> IntoPayload for SerdePayload<T> {
#[inline]
fn into_payload(self, _: &Context) -> IPCResult<Vec<u8>> {
self.try_into_bytes()
}
@ -188,12 +197,14 @@ mod serde_payload {
}
impl<T: Serialize> IntoPayload for T {
#[inline]
fn into_payload(self, ctx: &Context) -> IPCResult<Vec<u8>> {
ctx.create_serde_payload(self).into_payload(&ctx)
}
}
impl<T: DeserializeOwned> FromPayload for T {
#[inline]
fn from_payload<R: Read>(reader: R) -> IPCResult<Self> {
let serde_payload = SerdePayload::<Self>::from_payload(reader)?;

@ -90,6 +90,7 @@ impl DynamicSerializer {
}
}
#[inline]
pub fn from_primitive(num: usize) -> SerializationResult<Self> {
match num {
#[cfg(feature = "serialize_rmp")]

@ -3,12 +3,14 @@ use serde::de::DeserializeOwned;
use serde::Serialize;
use std::io::Read;
#[inline]
pub fn serialize<T: Serialize>(data: T) -> SerializationResult<Vec<u8>> {
let bytes = bincode::serialize(&data)?;
Ok(bytes)
}
#[inline]
pub fn deserialize<R: Read, T: DeserializeOwned>(reader: R) -> SerializationResult<T> {
let type_data = bincode::deserialize_from(reader)?;
Ok(type_data)

@ -3,12 +3,14 @@ use serde::de::DeserializeOwned;
use serde::Serialize;
use std::io::Read;
#[inline]
pub fn serialize<T: Serialize>(data: T) -> SerializationResult<Vec<u8>> {
let bytes = serde_json::to_vec(&data)?;
Ok(bytes)
}
#[inline]
pub fn deserialize<R: Read, T: DeserializeOwned>(reader: R) -> SerializationResult<T> {
let type_data = serde_json::from_reader(reader)?;

@ -3,12 +3,14 @@ use serde::de::DeserializeOwned;
use serde::Serialize;
use std::io::Read;
#[inline]
pub fn serialize<T: Serialize>(data: T) -> SerializationResult<Vec<u8>> {
let bytes = postcard::to_allocvec(&data)?.to_vec();
Ok(bytes)
}
#[inline]
pub fn deserialize<R: Read, T: DeserializeOwned>(mut reader: R) -> SerializationResult<T> {
let mut buf = Vec::new();
// reading to end means reading the full size of the provided data

@ -3,12 +3,14 @@ use serde::de::DeserializeOwned;
use serde::Serialize;
use std::io::Read;
#[inline]
pub fn serialize<T: Serialize>(data: T) -> SerializationResult<Vec<u8>> {
let bytes = rmp_serde::to_vec(&data)?;
Ok(bytes)
}
#[inline]
pub fn deserialize<R: Read, T: DeserializeOwned>(reader: R) -> SerializationResult<T> {
let type_data = rmp_serde::from_read(reader)?;
Ok(type_data)

@ -1,21 +1,22 @@
use crate::error::{Error, Result};
use crate::event::Event;
use crate::ipc::stream_emitter::{EmitMetadata, StreamEmitter};
use futures::future;
use futures::future::Either;
use std::collections::HashMap;
use std::mem;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use futures::future;
use futures::future::Either;
use tokio::sync::{Mutex, oneshot, RwLock};
use tokio::sync::oneshot::Sender;
use tokio::sync::{oneshot, Mutex, RwLock};
use tokio::time::Duration;
use typemap_rev::TypeMap;
use crate::payload::IntoPayload;
use crate::error::{Error, Result};
use crate::event::Event;
use crate::ipc::stream_emitter::{EmitMetadata, StreamEmitter};
#[cfg(feature = "serialize")]
use crate::payload::{DynamicSerializer, SerdePayload};
use crate::payload::IntoPayload;
pub(crate) type ReplyListeners = Arc<Mutex<HashMap<u64, oneshot::Sender<Event>>>>;
@ -108,8 +109,15 @@ impl Context {
}
/// Waits for a reply to the given message ID
#[inline]
#[tracing::instrument(level = "debug", skip(self))]
pub async fn await_reply(&self, message_id: u64) -> Result<Event> {
self.await_reply_with_timeout(message_id, self.reply_timeout.to_owned()).await
}
/// Waits for a reply to the given Message ID with a given timeout
#[tracing::instrument(level = "debug", skip(self))]
pub async fn await_reply_with_timeout(&self, message_id: u64, timeout: Duration) -> Result<Event> {
let (rx, tx) = oneshot::channel();
{
let mut listeners = self.reply_listeners.lock().await;
@ -117,9 +125,9 @@ impl Context {
}
let result = future::select(
Box::pin(tx),
Box::pin(tokio::time::sleep(self.reply_timeout)),
Box::pin(tokio::time::sleep(timeout)),
)
.await;
.await;
let event = match result {
Either::Left((tx_result, _)) => Ok(tx_result?),
@ -145,16 +153,19 @@ impl Context {
}
#[cfg(feature = "serialize")]
#[inline]
pub fn create_serde_payload<T>(&self, data: T) -> SerdePayload<T> {
SerdePayload::new(self.default_serializer.clone(), data)
}
/// Returns the channel for a reply to the given message id
#[inline]
pub(crate) async fn get_reply_sender(&self, ref_id: u64) -> Option<oneshot::Sender<Event>> {
let mut listeners = self.reply_listeners.lock().await;
listeners.remove(&ref_id)
}
#[inline]
pub(crate) fn set_ref_id(&mut self, id: Option<u64>) {
self.ref_id = id;
}
@ -166,37 +177,40 @@ pub struct PooledContext {
}
pub struct PoolGuard<T>
where
T: Clone,
where
T: Clone,
{
inner: T,
count: Arc<AtomicUsize>,
}
impl<T> Deref for PoolGuard<T>
where
T: Clone,
where
T: Clone,
{
type Target = T;
#[inline]
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl<T> DerefMut for PoolGuard<T>
where
T: Clone,
where
T: Clone,
{
#[inline]
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}
impl<T> Clone for PoolGuard<T>
where
T: Clone,
where
T: Clone,
{
#[inline]
fn clone(&self) -> Self {
self.acquire();
@ -208,17 +222,18 @@ where
}
impl<T> Drop for PoolGuard<T>
where
T: Clone,
where
T: Clone,
{
#[inline]
fn drop(&mut self) {
self.release();
}
}
impl<T> PoolGuard<T>
where
T: Clone,
where
T: Clone,
{
pub(crate) fn new(inner: T) -> Self {
Self {
@ -228,6 +243,7 @@ where
}
/// Acquires the context by adding 1 to the count
#[inline]
#[tracing::instrument(level = "trace", skip_all)]
pub(crate) fn acquire(&self) {
let count = self.count.fetch_add(1, Ordering::Relaxed);
@ -235,12 +251,14 @@ where
}
/// Releases the connection by subtracting from the stored count
#[inline]
#[tracing::instrument(level = "trace", skip_all)]
pub(crate) fn release(&self) {
let count = self.count.fetch_sub(1, Ordering::Relaxed);
tracing::trace!(count);
}
#[inline]
pub(crate) fn count(&self) -> usize {
self.count.load(Ordering::Relaxed)
}
@ -256,6 +274,7 @@ impl PooledContext {
/// Acquires a context from the pool
/// It always chooses the one that is used the least
#[inline]
#[tracing::instrument(level = "trace", skip_all)]
pub fn acquire(&self) -> PoolGuard<Context> {
self.contexts

@ -5,6 +5,7 @@ use crate::ipc::context::Context;
use crate::protocol::AsyncProtocolStream;
use std::ops::DerefMut;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::sync::Mutex;
@ -50,6 +51,7 @@ impl StreamEmitter {
}
/// Emits an event
#[inline]
pub(crate) async fn emit<S: AsRef<str>>(
&self,
event: S,
@ -59,6 +61,7 @@ impl StreamEmitter {
}
/// Emits an event to a specific namespace
#[inline]
pub(crate) async fn emit_to<S1: AsRef<str>, S2: AsRef<str>>(
&self,
namespace: S1,
@ -70,6 +73,7 @@ impl StreamEmitter {
}
/// Emits a response to an event
#[inline]
pub(crate) async fn emit_response<S: AsRef<str>>(
&self,
event_id: u64,
@ -81,6 +85,7 @@ impl StreamEmitter {
}
/// Emits a response to an event to a namespace
#[inline]
pub(crate) async fn emit_response_to<S1: AsRef<str>, S2: AsRef<str>>(
&self,
event_id: u64,
@ -102,22 +107,37 @@ impl StreamEmitter {
/// This object can be used to wait for a response to an event.
pub struct EmitMetadata {
message_id: u64,
timeout: Option<Duration>
}
impl EmitMetadata {
#[inline]
pub(crate) fn new(message_id: u64) -> Self {
Self { message_id }
Self { message_id, timeout: None }
}
/// The ID of the emitted message
#[inline]
pub fn message_id(&self) -> u64 {
self.message_id
}
/// Sets a timeout for awaiting replies to this emitted event
#[inline]
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.timeout = Some(timeout);
self
}
/// Waits for a reply to the given message.
#[tracing::instrument(skip(self, ctx), fields(self.message_id))]
pub async fn await_reply(&self, ctx: &Context) -> Result<Event> {
let reply = ctx.await_reply(self.message_id).await?;
let reply = if let Some(timeout) = self.timeout {
ctx.await_reply_with_timeout(self.message_id, timeout.clone()).await?
} else {
ctx.await_reply(self.message_id).await?
};
if reply.name() == ERROR_EVENT_NAME {
Err(reply.payload::<ErrorEventData>()?.into())
} else {

@ -83,6 +83,7 @@ async fn it_receives_error_responses() {
.emit("create_error", EmptyPayload)
.await
.unwrap()
.with_timeout(Duration::from_millis(100))
.await_reply(&ctx)
.await;

Loading…
Cancel
Save