From e59e26c72f65b53202f98e58f552c1520ede467d Mon Sep 17 00:00:00 2001 From: trivernis Date: Mon, 9 Nov 2020 22:04:51 +0100 Subject: [PATCH] Fix blocked receiver thread Signed-off-by: trivernis --- Cargo.toml | 2 +- src/server/data.rs | 93 +++++++++++++++++++- src/server/mod.rs | 169 +++++++++++++++++++++++++----------- src/server/server_events.rs | 21 ++--- src/stream/manager.rs | 4 +- tests/test_communication.rs | 20 ++++- 6 files changed, 239 insertions(+), 70 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index db3e83b..a825d78 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "vented" description = "Event driven encrypted tcp communicaton" -version = "0.10.0" +version = "0.10.1" authors = ["trivernis "] edition = "2018" readme = "README.md" diff --git a/src/server/data.rs b/src/server/data.rs index 660288f..8bfa9aa 100644 --- a/src/server/data.rs +++ b/src/server/data.rs @@ -11,24 +11,111 @@ use crate::stream::cryptostream::CryptoStream; use crate::stream::manager::ConcurrentStreamManager; use crate::utils::result::VentedError; use crate::utils::sync::AsyncValue; +use std::time::{Duration, Instant}; #[derive(Clone, Debug)] pub struct Node { pub id: String, pub public_key: PublicKey, - pub address: Option, + pub addresses: Vec, pub trusted: bool, } +#[derive(Clone, Debug)] +pub struct NodeData { + inner: Node, + state: NodeState, +} + +#[derive(Clone, Debug)] +pub enum NodeState { + Alive(Instant), + Dead(Instant), + Unknown, +} + #[derive(Clone)] pub(crate) struct ServerConnectionContext { pub is_server: bool, pub node_id: String, pub global_secret: SecretKey, - pub known_nodes: Arc>>, + pub known_nodes: Arc>>, pub event_handler: Arc>, pub forwarded_connections: Arc>>>, - pub pool: Arc>, + pub sender_pool: Arc>, + pub recv_pool: Arc>, pub redirect_handles: Arc>>>, pub manager: ConcurrentStreamManager, } + +impl From for NodeData { + fn from(node: Node) -> Self { + Self { + inner: node, + state: NodeState::Unknown, + } + } +} + +impl From for Node { + fn from(other: NodeData) -> Self { + other.inner + } +} + +// how long is a node assumed to be in a state before rechecking is necessary +const NODE_STATE_TTL_SECONDS: u64 = 600; + +impl NodeData { + /// Returns the inner node data + pub fn node(&self) -> &Node { + &self.inner + } + + /// Returns a mutable reference of the inner node data + pub fn node_mut(&mut self) -> &mut Node { + &mut self.inner + } + + /// Returns the state of the node + pub fn node_state(&mut self) -> &NodeState { + let ttl = Duration::from_secs(NODE_STATE_TTL_SECONDS); + match &self.state { + NodeState::Alive(since) | NodeState::Dead(since) if since.elapsed() > ttl => { + self.state = NodeState::Unknown; + log::trace!( + "Node state of {} updated to {:?}", + self.inner.id, + self.state + ) + } + _ => {} + } + &self.state + } + + /// Sets the state of the node + pub fn set_node_state(&mut self, state: NodeState) { + self.state = state; + log::trace!( + "Node state of {} updated to {:?}", + self.inner.id, + self.state + ) + } + + /// Returns if the node is dead + pub fn is_dead(&self) -> bool { + match &self.state { + NodeState::Dead(_) => true, + _ => false, + } + } + + pub fn is_alive(&self) -> bool { + match &self.state { + NodeState::Alive(_) => true, + _ => false, + } + } +} diff --git a/src/server/mod.rs b/src/server/mod.rs index cdfcd6e..eb3bd34 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -5,7 +5,7 @@ use std::mem; use std::net::{TcpListener, TcpStream}; use std::sync::Arc; use std::thread; -use std::time::Duration; +use std::time::{Duration, Instant}; use crossbeam_utils::sync::WaitGroup; use crypto_box::{PublicKey, SecretKey}; @@ -16,21 +16,23 @@ use x25519_dalek::StaticSecret; use crate::event::Event; use crate::event_handler::EventHandler; -use crate::server::data::{Node, ServerConnectionContext}; +use crate::server::data::{Node, NodeData, NodeState, ServerConnectionContext}; use crate::server::server_events::{ AuthPayload, ChallengePayload, NodeInformationPayload, RedirectPayload, VersionMismatchPayload, ACCEPT_EVENT, AUTH_EVENT, CHALLENGE_EVENT, CONNECT_EVENT, MISMATCH_EVENT, READY_EVENT, REDIRECT_EVENT, REJECT_EVENT, }; use crate::stream::cryptostream::CryptoStream; -use crate::stream::manager::ConcurrentStreamManager; +use crate::stream::manager::{ConcurrentStreamManager, CONNECTION_TIMEOUT_SECONDS}; use crate::utils::result::{VentedError, VentedResult}; use crate::utils::sync::AsyncValue; +use std::cmp::max; pub mod data; pub mod server_events; pub(crate) const CRATE_VERSION: &str = env!("CARGO_PKG_VERSION"); +pub const PROTOCOL_VERSION: &str = "1.0"; type ForwardFutureVector = Arc>>>; @@ -47,7 +49,7 @@ type ForwardFutureVector = Arc>>, + known_nodes: Arc>>, event_handler: Arc>, global_secret_key: SecretKey, node_id: String, redirect_handles: Arc>>>, manager: ConcurrentStreamManager, - pool: Arc>, + sender_pool: Arc>, + receiver_pool: Arc>, } impl VentedServer { @@ -95,10 +98,20 @@ impl VentedServer { forwarded_connections: Arc::new(Mutex::new(HashMap::new())), global_secret_key: secret_key, known_nodes: Arc::new(Mutex::new(HashMap::from_iter( - nodes.iter().cloned().map(|node| (node.id.clone(), node)), + nodes + .iter() + .cloned() + .map(|node| (node.id.clone(), node.into())), ))), redirect_handles: Arc::new(Mutex::new(HashMap::new())), - pool: Arc::new(Mutex::new(ScheduledThreadPool::new(num_threads))), + sender_pool: Arc::new(Mutex::new(ScheduledThreadPool::new(max( + num_threads / 2, + 1, + )))), + receiver_pool: Arc::new(Mutex::new(ScheduledThreadPool::new(max( + num_threads / 2, + 1, + )))), }; server.register_events(); server.start_event_listener(); @@ -113,11 +126,16 @@ impl VentedServer { /// Returns the nodes known to the server pub fn nodes(&self) -> Vec { - self.known_nodes.lock().values().cloned().collect() + self.known_nodes + .lock() + .values() + .cloned() + .map(Node::from) + .collect() } /// Returns the actual reference to the inner node list - pub fn nodes_ref(&self) -> Arc>> { + pub fn nodes_ref(&self) -> Arc>> { Arc::clone(&self.known_nodes) } @@ -180,10 +198,11 @@ impl VentedServer { global_secret: self.global_secret_key.clone(), known_nodes: Arc::clone(&self.known_nodes), event_handler: Arc::clone(&self.event_handler), - pool: Arc::clone(&self.pool), + sender_pool: Arc::clone(&self.sender_pool), forwarded_connections: Arc::clone(&self.forwarded_connections), redirect_handles: Arc::clone(&self.redirect_handles), manager: self.manager.clone(), + recv_pool: Arc::clone(&self.receiver_pool), } } @@ -199,6 +218,9 @@ impl VentedServer { move || { mem::drop(wg); while let Ok((origin, event)) = receiver.recv() { + if let Some(node) = context.known_nodes.lock().get_mut(&origin) { + node.set_node_state(NodeState::Alive(Instant::now())); + } let responses = event_handler.lock().handle_event(event); for response in responses { @@ -220,34 +242,64 @@ impl VentedServer { event: Event, redirect: bool, ) -> AsyncValue<(), VentedError> { + log::trace!( + "Emitting: '{}' from {} to {}", + event.name, + context.node_id, + target + ); if context.manager.has_connection(target) { + log::trace!("Reusing existing connection."); context.manager.send(target, event) } else { let future = AsyncValue::new(); - context.pool.lock().execute({ + context.sender_pool.lock().execute({ let mut future = AsyncValue::clone(&future); let node_id = target.clone(); let context = context.clone(); move || { - log::trace!( - "Trying to redirect the event to a different node to be sent to target node..." - ); - if let Ok(connection) = Self::get_connection(context.clone(), &node_id) { + log::trace!("Trying to establish connection..."); + let node_state = if let Ok(connection) = + Self::get_connection(context.clone(), &node_id) + { if let Err(e) = context.manager.add_connection(connection) { future.reject(e); return; } log::trace!("Established new connection."); let result = context.manager.send(&node_id, event).get_value(); - future.result(result); + match result { + Ok(_) => { + future.resolve(()); + NodeState::Alive(Instant::now()) + } + Err(e) => { + future.reject(e); + NodeState::Dead(Instant::now()) + } + } } else if redirect { - log::trace!("Trying to send event redirected"); - let result = Self::send_event_redirected(context, &node_id, event); - future.result(result); + log::trace!("Trying to use a proxy node..."); + let result = Self::send_event_redirected(context.clone(), &node_id, event); + match result { + Ok(_) => { + future.resolve(()); + NodeState::Alive(Instant::now()) + } + Err(e) => { + future.reject(e); + NodeState::Dead(Instant::now()) + } + } } else { - future.reject(VentedError::UnreachableNode(node_id)) + log::trace!("Failed to emit event to node {}", node_id); + future.reject(VentedError::UnreachableNode(node_id.clone())); + NodeState::Dead(Instant::now()) + }; + if let Some(node) = context.known_nodes.lock().get_mut(&node_id) { + node.set_node_state(node_state); } } }); @@ -266,14 +318,14 @@ impl VentedServer { .known_nodes .lock() .values() - .filter(|node| node.address.is_some()) + .filter(|node| !node.node().addresses.is_empty() && node.is_alive()) .cloned() - .collect::>(); + .collect::>(); for node in public_nodes { let payload = RedirectPayload::new( context.node_id.clone(), - node.id.clone(), + node.node().id.clone(), target.clone(), event.clone().as_bytes(), ); @@ -285,17 +337,21 @@ impl VentedServer { if let Err(e) = Self::send_event( context.clone(), - &node.id, + &node.node().id, Event::with_payload(REDIRECT_EVENT, &payload), false, ) .get_value() { - log::error!("Failed to redirect via {}: {}", node.id, e); + log::error!("Failed to redirect via {}: {}", node.node().id, e); } - if let Some(Ok(_)) = future.get_value_with_timeout(Duration::from_secs(10)) { + if let Some(Ok(_)) = + future.get_value_with_timeout(Duration::from_secs(CONNECTION_TIMEOUT_SECONDS)) + { return Ok(()); + } else { + log::error!("Failed to redirect via {}: Timeout", node.node().id); } } @@ -306,12 +362,14 @@ impl VentedServer { /// then establishing an encrypted connection fn handle_connection(context: ServerConnectionContext, stream: TcpStream) -> VentedResult<()> { let event_handler = Arc::clone(&context.event_handler); + stream.set_read_timeout(Some(Duration::from_secs(CONNECTION_TIMEOUT_SECONDS)))?; + stream.set_write_timeout(Some(Duration::from_secs(CONNECTION_TIMEOUT_SECONDS)))?; log::trace!( "Received connection from {}", stream.peer_addr().expect("Failed to get peer address") ); - context.pool.lock().execute({ + context.recv_pool.lock().execute({ let context = context.clone(); move || { let manager = context.manager.clone(); @@ -349,15 +407,27 @@ impl VentedServer { .cloned() .ok_or(VentedError::UnknownNode(target.clone()))?; - if let Some(address) = target_node.address { - log::trace!("Connecting to known address"); - - Self::connect(context, address) - } else { - log::trace!("All direct connection attempts to {} failed", target); - - Err(VentedError::UnreachableNode(target.clone())) + log::trace!("Connecting to known addresses"); + + for address in &target_node.node().addresses { + match Self::connect(context.clone(), address.clone()) { + Ok(stream) => return Ok(stream), + Err(e) => { + log::error!("Failed to connect to node {}'s address: {}", target, e); + context + .known_nodes + .lock() + .get_mut(target) + .unwrap() + .node_mut() + .addresses + .retain(|a| a != address); + } + } } + + log::trace!("All direct connection attempts to {} failed", target); + Err(VentedError::UnreachableNode(target.clone())) } /// Establishes a crypto stream for the given stream @@ -365,9 +435,6 @@ impl VentedServer { context: ServerConnectionContext, stream: TcpStream, ) -> VentedResult { - stream.set_read_timeout(Some(Duration::from_secs(10)))?; - stream.set_write_timeout(Some(Duration::from_secs(10)))?; - let (_, stream) = VentedServer::perform_key_exchange( context.is_server, stream, @@ -385,6 +452,8 @@ impl VentedServer { address: String, ) -> VentedResult { let stream = TcpStream::connect(address)?; + stream.set_read_timeout(Some(Duration::from_secs(CONNECTION_TIMEOUT_SECONDS)))?; + stream.set_write_timeout(Some(Duration::from_secs(CONNECTION_TIMEOUT_SECONDS)))?; context.is_server = false; let stream = Self::get_crypto_stream(context, stream)?; @@ -397,7 +466,7 @@ impl VentedServer { stream: TcpStream, own_node_id: String, global_secret: SecretKey, - known_nodes: Arc>>, + known_nodes: Arc>>, ) -> VentedResult<(String, CryptoStream)> { let secret_key = SecretKey::generate(&mut rand::thread_rng()); if is_server { @@ -425,7 +494,7 @@ impl VentedServer { secret_key: &SecretKey, own_node_id: String, global_secret: SecretKey, - known_nodes: Arc>>, + known_nodes: Arc>>, ) -> VentedResult<(String, CryptoStream)> { stream.write( &Event::with_payload( @@ -433,7 +502,7 @@ impl VentedServer { &NodeInformationPayload { public_key: secret_key.public_key().to_bytes(), node_id: own_node_id, - vented_version: CRATE_VERSION.to_string(), + vented_version: PROTOCOL_VERSION.to_string(), }, ) .as_bytes(), @@ -450,11 +519,11 @@ impl VentedServer { vented_version, } = event.get_payload::().unwrap(); - if !Self::compare_version(&vented_version, CRATE_VERSION) { + if !Self::compare_version(&vented_version, PROTOCOL_VERSION) { stream.write( &Event::with_payload( MISMATCH_EVENT, - &VersionMismatchPayload::new(CRATE_VERSION, &vented_version), + &VersionMismatchPayload::new(PROTOCOL_VERSION, &vented_version), ) .as_bytes(), )?; @@ -475,7 +544,7 @@ impl VentedServer { let mut stream = CryptoStream::new(node_id.clone(), stream, &public_key, &secret_key)?; log::trace!("Authenticating recipient..."); - let key_a = Self::authenticate_other(&mut stream, node_data.public_key)?; + let key_a = Self::authenticate_other(&mut stream, node_data.node().public_key)?; log::trace!("Authenticating self..."); let key_b = Self::authenticate_self(&mut stream, StaticSecret::from(global_secret.to_bytes()))?; @@ -497,7 +566,7 @@ impl VentedServer { secret_key: &SecretKey, own_node_id: String, global_secret: SecretKey, - known_nodes: Arc>>, + known_nodes: Arc>>, ) -> VentedResult<(String, CryptoStream)> { let event = Event::from_bytes(&mut stream)?; if event.name != CONNECT_EVENT { @@ -509,11 +578,11 @@ impl VentedServer { vented_version, } = event.get_payload::().unwrap(); - if !Self::compare_version(&vented_version, CRATE_VERSION) { + if !Self::compare_version(&vented_version, PROTOCOL_VERSION) { stream.write( &Event::with_payload( MISMATCH_EVENT, - &VersionMismatchPayload::new(CRATE_VERSION, &vented_version), + &VersionMismatchPayload::new(PROTOCOL_VERSION, &vented_version), ) .as_bytes(), )?; @@ -536,7 +605,7 @@ impl VentedServer { &NodeInformationPayload { public_key: secret_key.public_key().to_bytes(), node_id: own_node_id, - vented_version: CRATE_VERSION.to_string(), + vented_version: PROTOCOL_VERSION.to_string(), }, ) .as_bytes(), @@ -549,7 +618,7 @@ impl VentedServer { let key_a = Self::authenticate_self(&mut stream, StaticSecret::from(global_secret.to_bytes()))?; log::trace!("Authenticating recipient..."); - let key_b = Self::authenticate_other(&mut stream, node_data.public_key)?; + let key_b = Self::authenticate_other(&mut stream, node_data.node().public_key)?; log::trace!("Connection fully authenticated."); let pre_secret = StaticSecret::from(secret_key.to_bytes()).diffie_hellman(&public_key); diff --git a/src/server/server_events.rs b/src/server/server_events.rs index af9b813..ea71fe3 100644 --- a/src/server/server_events.rs +++ b/src/server/server_events.rs @@ -94,7 +94,7 @@ pub struct NodeListPayload { pub struct NodeListElement { pub id: String, pub public_key: [u8; 32], - pub address: Option, + pub addresses: Vec, } impl VentedServer { @@ -122,7 +122,7 @@ impl VentedServer { }); self.on(REDIRECT_EVENT, { let manager = self.manager.clone(); - let pool = Arc::clone(&self.pool); + let pool = Arc::clone(&self.sender_pool); move |event| { let payload = event.get_payload::().ok()?; @@ -157,7 +157,7 @@ impl VentedServer { self.on(REDIRECT_REDIRECTED_EVENT, { let event_handler = Arc::clone(&self.event_handler); let manager = self.manager.clone(); - let pool = self.pool.clone(); + let pool = self.sender_pool.clone(); let known_nodes = Arc::clone(&self.known_nodes); move |event| { @@ -204,7 +204,7 @@ impl VentedServer { let mut own_nodes = node_list.lock(); let origin = event.origin?; - if !own_nodes.get(&origin)?.trusted { + if !own_nodes.get(&origin)?.node().trusted { log::warn!("Untrusted node '{}' tried to send network update!", origin); return None; } @@ -218,8 +218,9 @@ impl VentedServer { id: node.id, trusted: false, public_key: PublicKey::from(node.public_key), - address: node.address, - }, + addresses: node.addresses, + } + .into(), ); new_nodes += 1; } @@ -237,11 +238,11 @@ impl VentedServer { let nodes = node_list .lock() .values() - .filter(|node| node.id != sender_id) + .filter(|node| node.node().id != sender_id) .map(|node| NodeListElement { - id: node.id.clone(), - address: node.address.clone(), - public_key: node.public_key.to_bytes(), + id: node.node().id.clone(), + addresses: node.node().addresses.clone(), + public_key: node.node().public_key.to_bytes(), }) .collect(); diff --git a/src/stream/manager.rs b/src/stream/manager.rs index 8b0171a..d4f75a4 100644 --- a/src/stream/manager.rs +++ b/src/stream/manager.rs @@ -15,7 +15,7 @@ use crate::utils::sync::AsyncValue; use crate::WaitGroup; const MAX_ENQUEUED_EVENTS: usize = 50; -const SEND_TIMEOUT_SECONDS: u64 = 60; +pub const CONNECTION_TIMEOUT_SECONDS: u64 = 5; #[derive(Clone, Debug)] pub struct ConcurrentStreamManager { @@ -55,7 +55,7 @@ impl ConcurrentStreamManager { if let Some(emitter) = self.emitters.lock().get(target) { if let Err(_) = emitter.send_timeout( (event, value.clone()), - Duration::from_secs(SEND_TIMEOUT_SECONDS), + Duration::from_secs(CONNECTION_TIMEOUT_SECONDS), ) { value.reject(VentedError::UnreachableNode(target.clone())); } diff --git a/tests/test_communication.rs b/tests/test_communication.rs index 721427f..e2861f2 100644 --- a/tests/test_communication.rs +++ b/tests/test_communication.rs @@ -25,24 +25,33 @@ fn test_server_communication() { let nodes = vec![ Node { id: "A".to_string(), - address: Some("localhost:22222".to_string()), + addresses: vec!["localhost:22222".to_string()], public_key: global_secret_a.public_key(), trusted: true, }, Node { id: "B".to_string(), - address: None, + addresses: vec![], public_key: global_secret_b.public_key(), trusted: false, }, Node { id: "C".to_string(), - address: None, + addresses: vec![], public_key: global_secret_c.public_key(), trusted: false, }, ]; - let mut server_a = VentedServer::new("A".to_string(), global_secret_a, nodes.clone(), 2, 100); + let mut nodes_a = nodes.clone(); + for i in 0..10 { + nodes_a.push(Node { + id: format!("Node-{}", i), + addresses: vec!["192.168.178.1".to_string()], + public_key: global_secret_c.public_key(), + trusted: false, + }) + } + let mut server_a = VentedServer::new("A".to_string(), global_secret_a, nodes_a, 20, 100); let mut server_b = VentedServer::new("B".to_string(), global_secret_b, nodes.clone(), 3, 100); let server_c = VentedServer::new("C".to_string(), global_secret_c, nodes, 3, 100); let wg = server_a.listen("localhost:22222".to_string()); @@ -63,6 +72,9 @@ fn test_server_communication() { None } }); + for i in 0..10 { + server_a.emit(format!("Nodes-{}", i), Event::new("ping")); + } server_b .emit("A", Event::new(NODE_LIST_REQUEST_EVENT)) .on_success(|_| println!("Success"))