Add backpressure to listener and parallel event handling

Signed-off-by: trivernis <trivernis@protonmail.com>
pull/1/head
trivernis 4 years ago
parent d017e8e250
commit 5e603887d2
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

@ -1,7 +1,7 @@
[package]
name = "vented"
description = "Event driven encrypted tcp communicaton"
version = "0.11.0"
version = "0.11.1"
authors = ["trivernis <trivernis@protonmail.com>"]
edition = "2018"
readme = "README.md"
@ -26,6 +26,8 @@ typenum = "1.12.0"
x25519-dalek = "1.1.0"
crossbeam-utils = "0.8.0"
async-std = "1.7.0"
async-listen = "0.2.1"
futures = "0.3.8"
[dev-dependencies]
simple_logger = "1.11.0"

@ -56,20 +56,21 @@ impl EventHandler {
/// Handles a single event
pub async fn handle_event(&mut self, event: Event) -> Vec<Event> {
let mut response_events: Vec<Event> = Vec::new();
let mut option_futures = Vec::new();
if let Some(handlers) = self.event_handlers.lock().get(&event.name) {
for handler in handlers {
let result = handler(event.clone());
task::block_on(async {
if let Some(e) = result.await {
response_events.push(e.clone());
}
})
option_futures.push(result);
}
}
response_events
task::block_on(async move {
futures::future::join_all(option_futures)
.await
.into_iter()
.filter_map(|opt| opt)
.collect::<Vec<Event>>()
})
}
}

@ -2,7 +2,7 @@ use async_std::net::{TcpListener, TcpStream};
use std::collections::HashMap;
use std::iter::FromIterator;
use std::sync::Arc;
use std::time::Instant;
use std::time::{Duration, Instant};
use crypto_box::{PublicKey, SecretKey};
use parking_lot::Mutex;
@ -20,6 +20,7 @@ use crate::server::server_events::{
use crate::stream::cryptostream::CryptoStream;
use crate::utils::result::{VentedError, VentedResult};
use crate::utils::sync::AsyncValue;
use async_listen::ListenExt;
use async_std::prelude::*;
use async_std::task;
use std::pin::Pin;
@ -158,22 +159,22 @@ impl VentedServer {
}
};
log::info!("Listener running on {}", address);
while let Some(connection) = listener.incoming().next().await {
match connection {
Ok(stream) => {
while let Some((token, stream)) = listener
.incoming()
.log_warnings(|e| log::warn!("Failed to establish connection: {}", e))
.handle_errors(Duration::from_millis(500))
.backpressure(1000)
.next()
.await
{
let mut this = this.clone();
task::spawn(async move {
if let Err(e) = this.handle_connection(stream).await {
log::error!("Failed to handle connection: {}", e);
}
std::mem::drop(token)
});
}
Err(e) => {
log::trace!("Failed to establish connection: {}", e);
continue;
}
}
}
});
}

Loading…
Cancel
Save