From 52f9cc1cd17bc3cfdf63f9a355cb8c6160222895 Mon Sep 17 00:00:00 2001 From: trivernis Date: Sun, 8 Nov 2020 20:47:34 +0100 Subject: [PATCH] Change emit to be synchronous to avoid too many threads Signed-off-by: trivernis --- Cargo.toml | 2 +- src/event_handler/tests.rs | 2 +- src/server/mod.rs | 43 ++++++++++++++----------------------- src/server/server_events.rs | 2 +- tests/test_communication.rs | 12 ++++------- 5 files changed, 23 insertions(+), 38 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 963fff6..89bde36 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "vented" description = "Event driven encrypted tcp communicaton" -version = "0.6.4" +version = "0.7.0" authors = ["trivernis "] edition = "2018" readme = "README.md" diff --git a/src/event_handler/tests.rs b/src/event_handler/tests.rs index fcfd7d8..7845141 100644 --- a/src/event_handler/tests.rs +++ b/src/event_handler/tests.rs @@ -45,5 +45,5 @@ fn it_handles_events() { handler.handle_event(Event::new("test".to_string())); handler.handle_event(Event::new("test2".to_string())); - assert_eq!(call_count.load(Ordering::Relaxed), 5) + assert_eq!(call_count.load(Ordering::Relaxed), 6) } diff --git a/src/server/mod.rs b/src/server/mod.rs index 9f32260..b2c591b 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -41,6 +41,7 @@ type CryptoStreamMap = Arc>>; /// use rand::thread_rng; /// use vented::event::Event; /// +/// let global_secret_b = SecretKey::generate(&mut thread_rng()); /// let nodes = vec![ /// Node { /// id: "B".to_string(), @@ -51,7 +52,7 @@ type CryptoStreamMap = Arc>>; ///]; /// // in a real world example the secret key needs to be loaded from somewhere because connections /// // with unknown keys are not accepted. -/// let global_secret = SecretKey::new(&mut thread_rng()); +/// let global_secret = SecretKey::generate(&mut thread_rng()); /// let mut server = VentedServer::new("A".to_string(), global_secret, nodes.clone(), 4); /// /// @@ -61,14 +62,13 @@ type CryptoStreamMap = Arc>>; /// /// None // the return value is the response event Option /// }); -/// server.emit("B".to_string(), Event::new("ping".to_string())).unwrap(); +/// assert!(server.emit("B".to_string(), Event::new("ping".to_string())).is_err()) // this won't work without a known node B /// ``` pub struct VentedServer { connections: CryptoStreamMap, forwarded_connections: ForwardFutureVector, known_nodes: Arc>>, - listener_pool: Arc>, - sender_pool: Arc>, + pool: Arc>, event_handler: Arc>, global_secret_key: SecretKey, node_id: String, @@ -79,7 +79,6 @@ impl VentedServer { /// Creates a new vented server with a given node_id and secret key that are /// used to authenticate against other servers. /// The given nodes are used for authentication. - /// The server runs with 2x the given amount of threads. pub fn new( node_id: String, secret_key: SecretKey, @@ -89,12 +88,8 @@ impl VentedServer { let mut server = Self { node_id, event_handler: Arc::new(Mutex::new(EventHandler::new())), - listener_pool: Arc::new(Mutex::new(ScheduledThreadPool::with_name( - "vented_listeners", - num_threads, - ))), - sender_pool: Arc::new(Mutex::new(ScheduledThreadPool::with_name( - "vented_senders", + pool: Arc::new(Mutex::new(ScheduledThreadPool::with_name( + "vented", num_threads, ))), connections: Arc::new(Mutex::new(HashMap::new())), @@ -123,20 +118,14 @@ impl VentedServer { /// Emits an event to the specified Node /// The actual writing is done in a separate thread from the thread pool. /// With the returned wait group one can wait for the event to be written. - pub fn emit(&self, node_id: String, event: Event) -> VentedResult { - let wg = WaitGroup::new(); + pub fn emit(&self, node_id: String, event: Event) -> VentedResult<()> { if let Ok(stream) = self.get_connection(&node_id) { - self.sender_pool.lock().execute({ - let wg = WaitGroup::clone(&wg); - let connections = Arc::clone(&self.connections); - move || { - if let Err(e) = stream.send(event) { - log::error!("Failed to send event: {}", e); - connections.lock().remove(stream.receiver_node()); - } - std::mem::drop(wg); - } - }); + if let Err(e) = stream.send(event) { + log::error!("Failed to send event: {}", e); + self.connections.lock().remove(stream.receiver_node()); + + return Err(e); + } } else { log::trace!( "Trying to redirect the event to a different node to be sent to target node..." @@ -144,7 +133,7 @@ impl VentedServer { self.send_event_redirected(node_id, event)?; } - Ok(wg) + Ok(()) } /// Adds a handler for the given event. @@ -216,7 +205,7 @@ impl VentedServer { known_nodes: Arc::clone(&self.known_nodes), connections: Arc::clone(&self.connections), event_handler: Arc::clone(&self.event_handler), - listener_pool: Arc::clone(&self.listener_pool), + listener_pool: Arc::clone(&self.pool), forwarded_connections: Arc::clone(&self.forwarded_connections), } } @@ -368,7 +357,7 @@ impl VentedServer { let connections = Arc::clone(&context.connections); let stream = Self::get_crypto_stream(context.clone(), stream)?; - self.listener_pool.lock().execute({ + self.pool.lock().execute({ let stream = CryptoStream::clone(&stream); let event_handler = Arc::clone(&self.event_handler); diff --git a/src/server/server_events.rs b/src/server/server_events.rs index 6e58515..5540628 100644 --- a/src/server/server_events.rs +++ b/src/server/server_events.rs @@ -142,7 +142,7 @@ impl VentedServer { self.on(REDIRECT_REDIRECTED_EVENT, { let event_handler = Arc::clone(&self.event_handler); let connections = Arc::clone(&self.connections); - let pool = Arc::clone(&self.sender_pool); + let pool = Arc::clone(&self.pool); let known_nodes = Arc::clone(&self.known_nodes); move |event| { diff --git a/tests/test_communication.rs b/tests/test_communication.rs index 455a3b7..7e31e46 100644 --- a/tests/test_communication.rs +++ b/tests/test_communication.rs @@ -98,24 +98,20 @@ fn test_server_communication() { } }); server_b.request_node_list().unwrap(); - let wg = server_c + server_c .emit("A".to_string(), Event::new("ping".to_string())) .unwrap(); - wg.wait(); for _ in 0..9 { - let wg = server_b + server_b .emit("A".to_string(), Event::new("ping".to_string())) .unwrap(); - wg.wait(); } - let wg = server_a + server_a .emit("B".to_string(), Event::new("pong".to_string())) .unwrap(); - wg.wait(); - let wg = server_b + server_b .emit("C".to_string(), Event::new("ping".to_string())) .unwrap(); - wg.wait(); // wait one second to make sure the servers were able to process the events for _ in 0..100 {