From 984c1308f900a5a6ce00f581339936d005fd3511 Mon Sep 17 00:00:00 2001 From: trivernis Date: Tue, 10 Nov 2020 09:30:09 +0100 Subject: [PATCH] Make timeouts configurable Signed-off-by: trivernis --- Cargo.toml | 2 +- src/server/data.rs | 18 +++++++++++++++++- src/server/mod.rs | 20 ++++++++++++-------- tests/test_communication.rs | 29 +++++++++++++++++++++++++---- 4 files changed, 55 insertions(+), 14 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 8077b46..00b04a2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "vented" description = "Event driven encrypted tcp communicaton" -version = "0.10.4" +version = "0.10.5" authors = ["trivernis "] edition = "2018" readme = "README.md" diff --git a/src/server/data.rs b/src/server/data.rs index 8bfa9aa..6f89df2 100644 --- a/src/server/data.rs +++ b/src/server/data.rs @@ -8,7 +8,7 @@ use x25519_dalek::PublicKey; use crate::event_handler::EventHandler; use crate::stream::cryptostream::CryptoStream; -use crate::stream::manager::ConcurrentStreamManager; +use crate::stream::manager::{ConcurrentStreamManager, CONNECTION_TIMEOUT_SECONDS}; use crate::utils::result::VentedError; use crate::utils::sync::AsyncValue; use std::time::{Duration, Instant}; @@ -46,6 +46,22 @@ pub(crate) struct ServerConnectionContext { pub recv_pool: Arc>, pub redirect_handles: Arc>>>, pub manager: ConcurrentStreamManager, + pub timeouts: ServerTimeouts, +} + +#[derive(Clone, Debug)] +pub struct ServerTimeouts { + pub send_timeout: Duration, + pub redirect_timeout: Duration, +} + +impl Default for ServerTimeouts { + fn default() -> Self { + Self { + send_timeout: Duration::from_secs(CONNECTION_TIMEOUT_SECONDS), + redirect_timeout: Duration::from_secs(CONNECTION_TIMEOUT_SECONDS * 2), + } + } } impl From for NodeData { diff --git a/src/server/mod.rs b/src/server/mod.rs index 06d76d6..a87b1ca 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -5,7 +5,7 @@ use std::mem; use std::net::{TcpListener, TcpStream}; use std::sync::Arc; use std::thread; -use std::time::{Duration, Instant}; +use std::time::Instant; use crossbeam_utils::sync::WaitGroup; use crypto_box::{PublicKey, SecretKey}; @@ -16,14 +16,14 @@ use x25519_dalek::StaticSecret; use crate::event::Event; use crate::event_handler::EventHandler; -use crate::server::data::{Node, NodeData, NodeState, ServerConnectionContext}; +use crate::server::data::{Node, NodeData, NodeState, ServerConnectionContext, ServerTimeouts}; use crate::server::server_events::{ AuthPayload, ChallengePayload, NodeInformationPayload, RedirectPayload, VersionMismatchPayload, ACCEPT_EVENT, AUTH_EVENT, CHALLENGE_EVENT, CONNECT_EVENT, MISMATCH_EVENT, READY_EVENT, REDIRECT_EVENT, REJECT_EVENT, }; use crate::stream::cryptostream::CryptoStream; -use crate::stream::manager::{ConcurrentStreamManager, CONNECTION_TIMEOUT_SECONDS}; +use crate::stream::manager::ConcurrentStreamManager; use crate::utils::result::{VentedError, VentedResult}; use crate::utils::sync::AsyncValue; use std::cmp::max; @@ -40,7 +40,7 @@ type ForwardFutureVector = Arc>, receiver_pool: Arc>, + timeouts: ServerTimeouts, } impl VentedServer { @@ -88,6 +89,7 @@ impl VentedServer { node_id: String, secret_key: SecretKey, nodes: Vec, + timeouts: ServerTimeouts, num_threads: usize, max_threads: usize, ) -> Self { @@ -112,6 +114,7 @@ impl VentedServer { num_threads / 2, 1, )))), + timeouts, }; server.register_events(); server.start_event_listener(); @@ -203,6 +206,7 @@ impl VentedServer { redirect_handles: Arc::clone(&self.redirect_handles), manager: self.manager.clone(), recv_pool: Arc::clone(&self.receiver_pool), + timeouts: self.timeouts.clone(), } } @@ -347,7 +351,7 @@ impl VentedServer { } if let Some(Ok(_)) = - future.get_value_with_timeout(Duration::from_secs(CONNECTION_TIMEOUT_SECONDS)) + future.get_value_with_timeout(context.timeouts.redirect_timeout.clone()) { return Ok(()); } else { @@ -362,7 +366,7 @@ impl VentedServer { /// then establishing an encrypted connection fn handle_connection(context: ServerConnectionContext, stream: TcpStream) -> VentedResult<()> { let event_handler = Arc::clone(&context.event_handler); - stream.set_write_timeout(Some(Duration::from_secs(CONNECTION_TIMEOUT_SECONDS)))?; + stream.set_write_timeout(Some(context.timeouts.send_timeout))?; log::trace!( "Received connection from {}", stream.peer_addr().expect("Failed to get peer address") @@ -451,7 +455,7 @@ impl VentedServer { address: String, ) -> VentedResult { let stream = TcpStream::connect(address)?; - stream.set_write_timeout(Some(Duration::from_secs(CONNECTION_TIMEOUT_SECONDS)))?; + stream.set_write_timeout(Some(context.timeouts.send_timeout))?; context.is_server = false; let stream = Self::get_crypto_stream(context, stream)?; diff --git a/tests/test_communication.rs b/tests/test_communication.rs index e2861f2..036dce6 100644 --- a/tests/test_communication.rs +++ b/tests/test_communication.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use std::thread; use std::time::Duration; use vented::event::Event; -use vented::server::data::Node; +use vented::server::data::{Node, ServerTimeouts}; use vented::server::server_events::NODE_LIST_REQUEST_EVENT; use vented::server::VentedServer; @@ -51,9 +51,30 @@ fn test_server_communication() { trusted: false, }) } - let mut server_a = VentedServer::new("A".to_string(), global_secret_a, nodes_a, 20, 100); - let mut server_b = VentedServer::new("B".to_string(), global_secret_b, nodes.clone(), 3, 100); - let server_c = VentedServer::new("C".to_string(), global_secret_c, nodes, 3, 100); + let mut server_a = VentedServer::new( + "A".to_string(), + global_secret_a, + nodes_a, + ServerTimeouts::default(), + 20, + 100, + ); + let mut server_b = VentedServer::new( + "B".to_string(), + global_secret_b, + nodes.clone(), + ServerTimeouts::default(), + 3, + 100, + ); + let server_c = VentedServer::new( + "C".to_string(), + global_secret_c, + nodes, + ServerTimeouts::default(), + 3, + 100, + ); let wg = server_a.listen("localhost:22222".to_string()); wg.wait();