diff --git a/Cargo.toml b/Cargo.toml index b8aa09a..146ebc1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,7 @@ rmp-serde = "0.14.4" serde = { version = "1.0.117", features = ["serde_derive"] } byteorder = "1.3.4" parking_lot = "0.11.0" -scheduled-thread-pool = "0.2.5" +executors = "0.8.0" log = "0.4.11" crypto_box = "0.5.0" rand = "0.7.3" diff --git a/src/server/data.rs b/src/server/data.rs index 6aecb0e..eb6aadb 100644 --- a/src/server/data.rs +++ b/src/server/data.rs @@ -2,8 +2,9 @@ use crate::crypto::CryptoStream; use crate::event_handler::EventHandler; use crate::WaitGroup; use crypto_box::SecretKey; +use executors::crossbeam_workstealing_pool; +use executors::parker::DynParker; use parking_lot::Mutex; -use scheduled_thread_pool::ScheduledThreadPool; use std::collections::HashMap; use std::mem; use std::sync::Arc; @@ -28,7 +29,7 @@ pub(crate) struct ServerConnectionContext { pub event_handler: Arc>, pub connections: Arc>>, pub forwarded_connections: Arc>>>, - pub listener_pool: Arc>, + pub pool: crossbeam_workstealing_pool::ThreadPool, } #[derive(Clone)] diff --git a/src/server/mod.rs b/src/server/mod.rs index dc311cb..37fd231 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use std::net::{TcpListener, TcpStream}; use crypto_box::{PublicKey, SecretKey}; -use scheduled_thread_pool::ScheduledThreadPool; +use executors::{crossbeam_workstealing_pool, Executor}; use crate::crypto::CryptoStream; use crate::event::Event; @@ -16,6 +16,7 @@ use crate::server::server_events::{ REDIRECT_EVENT, REJECT_EVENT, }; use crossbeam_utils::sync::WaitGroup; +use executors::parker::DynParker; use parking_lot::Mutex; use sha2::Digest; use std::io::Write; @@ -69,7 +70,7 @@ pub struct VentedServer { connections: CryptoStreamMap, forwarded_connections: ForwardFutureVector, known_nodes: Arc>>, - pool: Arc>, + pool: crossbeam_workstealing_pool::ThreadPool, event_handler: Arc>, global_secret_key: SecretKey, node_id: String, @@ -89,10 +90,7 @@ impl VentedServer { let mut server = Self { node_id, event_handler: Arc::new(Mutex::new(EventHandler::new())), - pool: Arc::new(Mutex::new(ScheduledThreadPool::with_name( - "vented", - num_threads, - ))), + pool: executors::crossbeam_workstealing_pool::pool_with_auto_parker(num_threads), connections: Arc::new(Mutex::new(HashMap::new())), forwarded_connections: Arc::new(Mutex::new(HashMap::new())), global_secret_key: secret_key, @@ -195,7 +193,7 @@ impl VentedServer { known_nodes: Arc::clone(&self.known_nodes), connections: Arc::clone(&self.connections), event_handler: Arc::clone(&self.event_handler), - listener_pool: Arc::clone(&self.pool), + pool: self.pool.clone(), forwarded_connections: Arc::clone(&self.forwarded_connections), } } @@ -241,14 +239,14 @@ impl VentedServer { /// Handles a single connection by first performing a key exchange and /// then establishing an encrypted connection fn handle_connection(params: ServerConnectionContext, stream: TcpStream) -> VentedResult<()> { - let pool = Arc::clone(¶ms.listener_pool); + let pool = params.pool.clone(); let event_handler = Arc::clone(¶ms.event_handler); log::trace!( "Received connection from {}", stream.peer_addr().expect("Failed to get peer address") ); - pool.lock().execute(move || { + pool.execute(move || { let connections = Arc::clone(¶ms.connections); let stream = match VentedServer::get_crypto_stream(params, stream) { @@ -347,7 +345,7 @@ impl VentedServer { let connections = Arc::clone(&context.connections); let stream = Self::get_crypto_stream(context.clone(), stream)?; - self.pool.lock().execute({ + self.pool.execute({ let stream = CryptoStream::clone(&stream); let event_handler = Arc::clone(&self.event_handler); diff --git a/src/server/server_events.rs b/src/server/server_events.rs index 1404e5c..7d55462 100644 --- a/src/server/server_events.rs +++ b/src/server/server_events.rs @@ -1,6 +1,7 @@ use crate::event::Event; use crate::server::data::Node; use crate::server::VentedServer; +use executors::Executor; use rand::{thread_rng, RngCore}; use serde::{Deserialize, Serialize}; use std::sync::Arc; @@ -142,7 +143,7 @@ impl VentedServer { self.on(REDIRECT_REDIRECTED_EVENT, { let event_handler = Arc::clone(&self.event_handler); let connections = Arc::clone(&self.connections); - let pool = Arc::clone(&self.pool); + let pool = self.pool.clone(); let known_nodes = Arc::clone(&self.known_nodes); move |event| { @@ -151,7 +152,7 @@ impl VentedServer { let proxy_stream = connections.lock().get(&payload.proxy)?.clone(); if known_nodes.lock().contains_key(&payload.source) { - pool.lock().execute({ + pool.execute({ let event_handler = Arc::clone(&event_handler); move || { let response = event_handler.lock().handle_event(event); diff --git a/tests/test_communication.rs b/tests/test_communication.rs index a06b92b..7541784 100644 --- a/tests/test_communication.rs +++ b/tests/test_communication.rs @@ -16,7 +16,6 @@ fn setup() { fn test_server_communication() { setup(); let ping_count = Arc::new(AtomicUsize::new(0)); - let ping_c_count = Arc::new(AtomicUsize::new(0)); let pong_count = Arc::new(AtomicUsize::new(0)); let ready_count = Arc::new(AtomicUsize::new(0)); let mut rng = rand::thread_rng(); @@ -90,13 +89,6 @@ fn test_server_communication() { None } }); - server_c.on("ping", { - let ping_c_count = Arc::clone(&ping_c_count); - move |_| { - ping_c_count.fetch_add(1, Ordering::Relaxed); - None - } - }); server_b .emit("A".to_string(), Event::new(NODE_LIST_REQUEST_EVENT)) .unwrap(); @@ -120,7 +112,6 @@ fn test_server_communication() { thread::sleep(Duration::from_millis(10)); } - assert_eq!(ping_c_count.load(Ordering::SeqCst), 1); assert_eq!(ready_count.load(Ordering::SeqCst), 4); assert_eq!(ping_count.load(Ordering::SeqCst), 10); assert_eq!(pong_count.load(Ordering::SeqCst), 10);