Change thread pool to executor pools

Signed-off-by: trivernis <trivernis@protonmail.com>
pull/1/head
trivernis 4 years ago
parent b56bf4c6db
commit a2bed2074a
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

@ -17,7 +17,7 @@ rmp-serde = "0.14.4"
serde = { version = "1.0.117", features = ["serde_derive"] } serde = { version = "1.0.117", features = ["serde_derive"] }
byteorder = "1.3.4" byteorder = "1.3.4"
parking_lot = "0.11.0" parking_lot = "0.11.0"
scheduled-thread-pool = "0.2.5" executors = "0.8.0"
log = "0.4.11" log = "0.4.11"
crypto_box = "0.5.0" crypto_box = "0.5.0"
rand = "0.7.3" rand = "0.7.3"

@ -2,8 +2,9 @@ use crate::crypto::CryptoStream;
use crate::event_handler::EventHandler; use crate::event_handler::EventHandler;
use crate::WaitGroup; use crate::WaitGroup;
use crypto_box::SecretKey; use crypto_box::SecretKey;
use executors::crossbeam_workstealing_pool;
use executors::parker::DynParker;
use parking_lot::Mutex; use parking_lot::Mutex;
use scheduled_thread_pool::ScheduledThreadPool;
use std::collections::HashMap; use std::collections::HashMap;
use std::mem; use std::mem;
use std::sync::Arc; use std::sync::Arc;
@ -28,7 +29,7 @@ pub(crate) struct ServerConnectionContext {
pub event_handler: Arc<Mutex<EventHandler>>, pub event_handler: Arc<Mutex<EventHandler>>,
pub connections: Arc<Mutex<HashMap<String, CryptoStream>>>, pub connections: Arc<Mutex<HashMap<String, CryptoStream>>>,
pub forwarded_connections: Arc<Mutex<HashMap<(String, String), AsyncValue<CryptoStream>>>>, pub forwarded_connections: Arc<Mutex<HashMap<(String, String), AsyncValue<CryptoStream>>>>,
pub listener_pool: Arc<Mutex<ScheduledThreadPool>>, pub pool: crossbeam_workstealing_pool::ThreadPool<DynParker>,
} }
#[derive(Clone)] #[derive(Clone)]

@ -2,7 +2,7 @@ use std::collections::HashMap;
use std::net::{TcpListener, TcpStream}; use std::net::{TcpListener, TcpStream};
use crypto_box::{PublicKey, SecretKey}; use crypto_box::{PublicKey, SecretKey};
use scheduled_thread_pool::ScheduledThreadPool; use executors::{crossbeam_workstealing_pool, Executor};
use crate::crypto::CryptoStream; use crate::crypto::CryptoStream;
use crate::event::Event; use crate::event::Event;
@ -16,6 +16,7 @@ use crate::server::server_events::{
REDIRECT_EVENT, REJECT_EVENT, REDIRECT_EVENT, REJECT_EVENT,
}; };
use crossbeam_utils::sync::WaitGroup; use crossbeam_utils::sync::WaitGroup;
use executors::parker::DynParker;
use parking_lot::Mutex; use parking_lot::Mutex;
use sha2::Digest; use sha2::Digest;
use std::io::Write; use std::io::Write;
@ -69,7 +70,7 @@ pub struct VentedServer {
connections: CryptoStreamMap, connections: CryptoStreamMap,
forwarded_connections: ForwardFutureVector, forwarded_connections: ForwardFutureVector,
known_nodes: Arc<Mutex<HashMap<String, Node>>>, known_nodes: Arc<Mutex<HashMap<String, Node>>>,
pool: Arc<Mutex<ScheduledThreadPool>>, pool: crossbeam_workstealing_pool::ThreadPool<DynParker>,
event_handler: Arc<Mutex<EventHandler>>, event_handler: Arc<Mutex<EventHandler>>,
global_secret_key: SecretKey, global_secret_key: SecretKey,
node_id: String, node_id: String,
@ -89,10 +90,7 @@ impl VentedServer {
let mut server = Self { let mut server = Self {
node_id, node_id,
event_handler: Arc::new(Mutex::new(EventHandler::new())), event_handler: Arc::new(Mutex::new(EventHandler::new())),
pool: Arc::new(Mutex::new(ScheduledThreadPool::with_name( pool: executors::crossbeam_workstealing_pool::pool_with_auto_parker(num_threads),
"vented",
num_threads,
))),
connections: Arc::new(Mutex::new(HashMap::new())), connections: Arc::new(Mutex::new(HashMap::new())),
forwarded_connections: Arc::new(Mutex::new(HashMap::new())), forwarded_connections: Arc::new(Mutex::new(HashMap::new())),
global_secret_key: secret_key, global_secret_key: secret_key,
@ -195,7 +193,7 @@ impl VentedServer {
known_nodes: Arc::clone(&self.known_nodes), known_nodes: Arc::clone(&self.known_nodes),
connections: Arc::clone(&self.connections), connections: Arc::clone(&self.connections),
event_handler: Arc::clone(&self.event_handler), event_handler: Arc::clone(&self.event_handler),
listener_pool: Arc::clone(&self.pool), pool: self.pool.clone(),
forwarded_connections: Arc::clone(&self.forwarded_connections), forwarded_connections: Arc::clone(&self.forwarded_connections),
} }
} }
@ -241,14 +239,14 @@ impl VentedServer {
/// Handles a single connection by first performing a key exchange and /// Handles a single connection by first performing a key exchange and
/// then establishing an encrypted connection /// then establishing an encrypted connection
fn handle_connection(params: ServerConnectionContext, stream: TcpStream) -> VentedResult<()> { fn handle_connection(params: ServerConnectionContext, stream: TcpStream) -> VentedResult<()> {
let pool = Arc::clone(&params.listener_pool); let pool = params.pool.clone();
let event_handler = Arc::clone(&params.event_handler); let event_handler = Arc::clone(&params.event_handler);
log::trace!( log::trace!(
"Received connection from {}", "Received connection from {}",
stream.peer_addr().expect("Failed to get peer address") stream.peer_addr().expect("Failed to get peer address")
); );
pool.lock().execute(move || { pool.execute(move || {
let connections = Arc::clone(&params.connections); let connections = Arc::clone(&params.connections);
let stream = match VentedServer::get_crypto_stream(params, stream) { let stream = match VentedServer::get_crypto_stream(params, stream) {
@ -347,7 +345,7 @@ impl VentedServer {
let connections = Arc::clone(&context.connections); let connections = Arc::clone(&context.connections);
let stream = Self::get_crypto_stream(context.clone(), stream)?; let stream = Self::get_crypto_stream(context.clone(), stream)?;
self.pool.lock().execute({ self.pool.execute({
let stream = CryptoStream::clone(&stream); let stream = CryptoStream::clone(&stream);
let event_handler = Arc::clone(&self.event_handler); let event_handler = Arc::clone(&self.event_handler);

@ -1,6 +1,7 @@
use crate::event::Event; use crate::event::Event;
use crate::server::data::Node; use crate::server::data::Node;
use crate::server::VentedServer; use crate::server::VentedServer;
use executors::Executor;
use rand::{thread_rng, RngCore}; use rand::{thread_rng, RngCore};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::sync::Arc; use std::sync::Arc;
@ -142,7 +143,7 @@ impl VentedServer {
self.on(REDIRECT_REDIRECTED_EVENT, { self.on(REDIRECT_REDIRECTED_EVENT, {
let event_handler = Arc::clone(&self.event_handler); let event_handler = Arc::clone(&self.event_handler);
let connections = Arc::clone(&self.connections); let connections = Arc::clone(&self.connections);
let pool = Arc::clone(&self.pool); let pool = self.pool.clone();
let known_nodes = Arc::clone(&self.known_nodes); let known_nodes = Arc::clone(&self.known_nodes);
move |event| { move |event| {
@ -151,7 +152,7 @@ impl VentedServer {
let proxy_stream = connections.lock().get(&payload.proxy)?.clone(); let proxy_stream = connections.lock().get(&payload.proxy)?.clone();
if known_nodes.lock().contains_key(&payload.source) { if known_nodes.lock().contains_key(&payload.source) {
pool.lock().execute({ pool.execute({
let event_handler = Arc::clone(&event_handler); let event_handler = Arc::clone(&event_handler);
move || { move || {
let response = event_handler.lock().handle_event(event); let response = event_handler.lock().handle_event(event);

@ -16,7 +16,6 @@ fn setup() {
fn test_server_communication() { fn test_server_communication() {
setup(); setup();
let ping_count = Arc::new(AtomicUsize::new(0)); let ping_count = Arc::new(AtomicUsize::new(0));
let ping_c_count = Arc::new(AtomicUsize::new(0));
let pong_count = Arc::new(AtomicUsize::new(0)); let pong_count = Arc::new(AtomicUsize::new(0));
let ready_count = Arc::new(AtomicUsize::new(0)); let ready_count = Arc::new(AtomicUsize::new(0));
let mut rng = rand::thread_rng(); let mut rng = rand::thread_rng();
@ -90,13 +89,6 @@ fn test_server_communication() {
None None
} }
}); });
server_c.on("ping", {
let ping_c_count = Arc::clone(&ping_c_count);
move |_| {
ping_c_count.fetch_add(1, Ordering::Relaxed);
None
}
});
server_b server_b
.emit("A".to_string(), Event::new(NODE_LIST_REQUEST_EVENT)) .emit("A".to_string(), Event::new(NODE_LIST_REQUEST_EVENT))
.unwrap(); .unwrap();
@ -120,7 +112,6 @@ fn test_server_communication() {
thread::sleep(Duration::from_millis(10)); thread::sleep(Duration::from_millis(10));
} }
assert_eq!(ping_c_count.load(Ordering::SeqCst), 1);
assert_eq!(ready_count.load(Ordering::SeqCst), 4); assert_eq!(ready_count.load(Ordering::SeqCst), 4);
assert_eq!(ping_count.load(Ordering::SeqCst), 10); assert_eq!(ping_count.load(Ordering::SeqCst), 10);
assert_eq!(pong_count.load(Ordering::SeqCst), 10); assert_eq!(pong_count.load(Ordering::SeqCst), 10);

Loading…
Cancel
Save