From 5e603887d27060515266140b42201011333859ce Mon Sep 17 00:00:00 2001 From: trivernis Date: Thu, 12 Nov 2020 15:10:51 +0100 Subject: [PATCH] Add backpressure to listener and parallel event handling Signed-off-by: trivernis --- Cargo.toml | 4 +++- src/event_handler/mod.rs | 17 +++++++++-------- src/server/mod.rs | 31 ++++++++++++++++--------------- 3 files changed, 28 insertions(+), 24 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 32538db..569fc8b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "vented" description = "Event driven encrypted tcp communicaton" -version = "0.11.0" +version = "0.11.1" authors = ["trivernis "] edition = "2018" readme = "README.md" @@ -26,6 +26,8 @@ typenum = "1.12.0" x25519-dalek = "1.1.0" crossbeam-utils = "0.8.0" async-std = "1.7.0" +async-listen = "0.2.1" +futures = "0.3.8" [dev-dependencies] simple_logger = "1.11.0" \ No newline at end of file diff --git a/src/event_handler/mod.rs b/src/event_handler/mod.rs index a4f0166..a1478e2 100644 --- a/src/event_handler/mod.rs +++ b/src/event_handler/mod.rs @@ -56,20 +56,21 @@ impl EventHandler { /// Handles a single event pub async fn handle_event(&mut self, event: Event) -> Vec { - let mut response_events: Vec = Vec::new(); + let mut option_futures = Vec::new(); if let Some(handlers) = self.event_handlers.lock().get(&event.name) { for handler in handlers { let result = handler(event.clone()); - task::block_on(async { - if let Some(e) = result.await { - response_events.push(e.clone()); - } - }) + option_futures.push(result); } } - - response_events + task::block_on(async move { + futures::future::join_all(option_futures) + .await + .into_iter() + .filter_map(|opt| opt) + .collect::>() + }) } } diff --git a/src/server/mod.rs b/src/server/mod.rs index 31618fa..26d12cf 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -2,7 +2,7 @@ use async_std::net::{TcpListener, TcpStream}; use std::collections::HashMap; use std::iter::FromIterator; use std::sync::Arc; -use std::time::Instant; +use std::time::{Duration, Instant}; use crypto_box::{PublicKey, SecretKey}; use parking_lot::Mutex; @@ -20,6 +20,7 @@ use crate::server::server_events::{ use crate::stream::cryptostream::CryptoStream; use crate::utils::result::{VentedError, VentedResult}; use crate::utils::sync::AsyncValue; +use async_listen::ListenExt; use async_std::prelude::*; use async_std::task; use std::pin::Pin; @@ -158,21 +159,21 @@ impl VentedServer { } }; log::info!("Listener running on {}", address); - while let Some(connection) = listener.incoming().next().await { - match connection { - Ok(stream) => { - let mut this = this.clone(); - task::spawn(async move { - if let Err(e) = this.handle_connection(stream).await { - log::error!("Failed to handle connection: {}", e); - } - }); - } - Err(e) => { - log::trace!("Failed to establish connection: {}", e); - continue; + while let Some((token, stream)) = listener + .incoming() + .log_warnings(|e| log::warn!("Failed to establish connection: {}", e)) + .handle_errors(Duration::from_millis(500)) + .backpressure(1000) + .next() + .await + { + let mut this = this.clone(); + task::spawn(async move { + if let Err(e) = this.handle_connection(stream).await { + log::error!("Failed to handle connection: {}", e); } - } + std::mem::drop(token) + }); } }); }