|
|
|
@ -2,6 +2,8 @@ use crate::error::{Error, Result};
|
|
|
|
|
use crate::event::Event;
|
|
|
|
|
use crate::ipc::stream_emitter::StreamEmitter;
|
|
|
|
|
use crate::protocol::AsyncProtocolStream;
|
|
|
|
|
use futures::future;
|
|
|
|
|
use futures::future::Either;
|
|
|
|
|
use std::collections::HashMap;
|
|
|
|
|
use std::mem;
|
|
|
|
|
use std::ops::{Deref, DerefMut};
|
|
|
|
@ -9,6 +11,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
|
|
|
|
|
use std::sync::Arc;
|
|
|
|
|
use tokio::sync::oneshot::Sender;
|
|
|
|
|
use tokio::sync::{oneshot, Mutex, RwLock};
|
|
|
|
|
use tokio::time::Duration;
|
|
|
|
|
use typemap_rev::TypeMap;
|
|
|
|
|
|
|
|
|
|
pub(crate) type ReplyListeners = Arc<Mutex<HashMap<u64, oneshot::Sender<Event>>>>;
|
|
|
|
@ -35,6 +38,8 @@ pub struct Context<S: AsyncProtocolStream> {
|
|
|
|
|
stop_sender: Arc<Mutex<Option<Sender<()>>>>,
|
|
|
|
|
|
|
|
|
|
reply_listeners: ReplyListeners,
|
|
|
|
|
|
|
|
|
|
reply_timeout: Duration,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl<S> Clone for Context<S>
|
|
|
|
@ -47,6 +52,7 @@ where
|
|
|
|
|
data: Arc::clone(&self.data),
|
|
|
|
|
stop_sender: Arc::clone(&self.stop_sender),
|
|
|
|
|
reply_listeners: Arc::clone(&self.reply_listeners),
|
|
|
|
|
reply_timeout: self.reply_timeout.clone(),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -60,12 +66,14 @@ where
|
|
|
|
|
data: Arc<RwLock<TypeMap>>,
|
|
|
|
|
stop_sender: Option<Sender<()>>,
|
|
|
|
|
reply_listeners: ReplyListeners,
|
|
|
|
|
reply_timeout: Duration,
|
|
|
|
|
) -> Self {
|
|
|
|
|
Self {
|
|
|
|
|
emitter,
|
|
|
|
|
reply_listeners,
|
|
|
|
|
data,
|
|
|
|
|
stop_sender: Arc::new(Mutex::new(stop_sender)),
|
|
|
|
|
reply_timeout,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -77,7 +85,20 @@ where
|
|
|
|
|
let mut listeners = self.reply_listeners.lock().await;
|
|
|
|
|
listeners.insert(message_id, rx);
|
|
|
|
|
}
|
|
|
|
|
let event = tx.await?;
|
|
|
|
|
let result = future::select(
|
|
|
|
|
Box::pin(tx),
|
|
|
|
|
Box::pin(tokio::time::sleep(self.reply_timeout)),
|
|
|
|
|
)
|
|
|
|
|
.await;
|
|
|
|
|
|
|
|
|
|
let event = match result {
|
|
|
|
|
Either::Left((tx_result, _)) => Ok(tx_result?),
|
|
|
|
|
Either::Right(_) => {
|
|
|
|
|
let mut listeners = self.reply_listeners.lock().await;
|
|
|
|
|
listeners.remove(&message_id);
|
|
|
|
|
Err(Error::Timeout)
|
|
|
|
|
}
|
|
|
|
|
}?;
|
|
|
|
|
|
|
|
|
|
Ok(event)
|
|
|
|
|
}
|
|
|
|
|