|
|
|
@ -3,11 +3,15 @@ use crate::event::Event;
|
|
|
|
|
use crate::ipc::stream_emitter::StreamEmitter;
|
|
|
|
|
use std::collections::HashMap;
|
|
|
|
|
use std::mem;
|
|
|
|
|
use std::ops::{Deref, DerefMut};
|
|
|
|
|
use std::sync::atomic::{AtomicUsize, Ordering};
|
|
|
|
|
use std::sync::Arc;
|
|
|
|
|
use tokio::sync::oneshot::Sender;
|
|
|
|
|
use tokio::sync::{oneshot, Mutex, RwLock};
|
|
|
|
|
use typemap_rev::TypeMap;
|
|
|
|
|
|
|
|
|
|
pub(crate) type ReplyListeners = Arc<Mutex<HashMap<u64, oneshot::Sender<Event>>>>;
|
|
|
|
|
|
|
|
|
|
/// An object provided to each callback function.
|
|
|
|
|
/// Currently it only holds the event emitter to emit response events in event callbacks.
|
|
|
|
|
/// ```rust
|
|
|
|
@ -30,7 +34,7 @@ pub struct Context {
|
|
|
|
|
|
|
|
|
|
stop_sender: Arc<Mutex<Option<Sender<()>>>>,
|
|
|
|
|
|
|
|
|
|
reply_listeners: Arc<Mutex<HashMap<u64, oneshot::Sender<Event>>>>,
|
|
|
|
|
reply_listeners: ReplyListeners,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl Context {
|
|
|
|
@ -38,10 +42,11 @@ impl Context {
|
|
|
|
|
emitter: StreamEmitter,
|
|
|
|
|
data: Arc<RwLock<TypeMap>>,
|
|
|
|
|
stop_sender: Option<Sender<()>>,
|
|
|
|
|
reply_listeners: ReplyListeners,
|
|
|
|
|
) -> Self {
|
|
|
|
|
Self {
|
|
|
|
|
emitter,
|
|
|
|
|
reply_listeners: Arc::new(Mutex::new(HashMap::new())),
|
|
|
|
|
reply_listeners,
|
|
|
|
|
data,
|
|
|
|
|
stop_sender: Arc::new(Mutex::new(stop_sender)),
|
|
|
|
|
}
|
|
|
|
@ -77,3 +82,90 @@ impl Context {
|
|
|
|
|
listeners.remove(&ref_id)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[derive(Clone)]
|
|
|
|
|
pub struct PooledContext {
|
|
|
|
|
contexts: Vec<PoolGuard<Context>>,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[derive(Clone)]
|
|
|
|
|
pub struct PoolGuard<T>
|
|
|
|
|
where
|
|
|
|
|
T: Clone,
|
|
|
|
|
{
|
|
|
|
|
inner: T,
|
|
|
|
|
count: Arc<AtomicUsize>,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl<T> Deref for PoolGuard<T>
|
|
|
|
|
where
|
|
|
|
|
T: Clone,
|
|
|
|
|
{
|
|
|
|
|
type Target = T;
|
|
|
|
|
|
|
|
|
|
fn deref(&self) -> &Self::Target {
|
|
|
|
|
&self.inner
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl<T> DerefMut for PoolGuard<T>
|
|
|
|
|
where
|
|
|
|
|
T: Clone,
|
|
|
|
|
{
|
|
|
|
|
fn deref_mut(&mut self) -> &mut Self::Target {
|
|
|
|
|
&mut self.inner
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl<T> Drop for PoolGuard<T>
|
|
|
|
|
where
|
|
|
|
|
T: Clone,
|
|
|
|
|
{
|
|
|
|
|
fn drop(&mut self) {
|
|
|
|
|
self.count.fetch_sub(1, Ordering::Relaxed);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl<T> PoolGuard<T>
|
|
|
|
|
where
|
|
|
|
|
T: Clone,
|
|
|
|
|
{
|
|
|
|
|
pub(crate) fn new(inner: T) -> Self {
|
|
|
|
|
Self {
|
|
|
|
|
inner,
|
|
|
|
|
count: Arc::new(AtomicUsize::new(0)),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Acquires the context by adding 1 to the count and
|
|
|
|
|
/// returning the cloned variant
|
|
|
|
|
pub(crate) fn aqcuire(&self) -> Self {
|
|
|
|
|
self.count.fetch_add(1, Ordering::Relaxed);
|
|
|
|
|
|
|
|
|
|
self.clone()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub(crate) fn count(&self) -> usize {
|
|
|
|
|
self.count.load(Ordering::Relaxed)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl PooledContext {
|
|
|
|
|
/// Creates a new pooled context from a list of contexts
|
|
|
|
|
pub(crate) fn new(contexts: Vec<Context>) -> Self {
|
|
|
|
|
Self {
|
|
|
|
|
contexts: contexts.into_iter().map(PoolGuard::new).collect(),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Acquires a context from the pool
|
|
|
|
|
/// It always chooses the one that is used the least
|
|
|
|
|
#[tracing::instrument(level = "trace", skip_all)]
|
|
|
|
|
pub fn acquire(&self) -> PoolGuard<Context> {
|
|
|
|
|
self.contexts
|
|
|
|
|
.iter()
|
|
|
|
|
.min_by_key(|c| c.count())
|
|
|
|
|
.unwrap()
|
|
|
|
|
.aqcuire()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|