diff --git a/Cargo.toml b/Cargo.toml index 146ebc1..25d1a09 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "vented" description = "Event driven encrypted tcp communicaton" -version = "0.8.1" +version = "0.9.0" authors = ["trivernis "] edition = "2018" readme = "README.md" diff --git a/src/server/data.rs b/src/server/data.rs index eb6aadb..cd07ad2 100644 --- a/src/server/data.rs +++ b/src/server/data.rs @@ -1,5 +1,6 @@ use crate::crypto::CryptoStream; use crate::event_handler::EventHandler; +use crate::result::VentedError; use crate::WaitGroup; use crypto_box::SecretKey; use executors::crossbeam_workstealing_pool; @@ -28,43 +29,88 @@ pub(crate) struct ServerConnectionContext { pub known_nodes: Arc>>, pub event_handler: Arc>, pub connections: Arc>>, - pub forwarded_connections: Arc>>>, + pub forwarded_connections: Arc>>>, pub pool: crossbeam_workstealing_pool::ThreadPool, + pub redirect_handles: Arc>>>, } -#[derive(Clone)] -pub struct AsyncValue { +pub struct AsyncValue { value: Arc>>, + error: Arc>>, wg: Option, } -impl AsyncValue { +impl AsyncValue +where + E: std::fmt::Display, +{ /// Creates the future with no value pub fn new() -> Self { Self { value: Arc::new(Mutex::new(None)), + error: Arc::new(Mutex::new(None)), wg: Some(WaitGroup::new()), } } + /// Creates a new AsyncValue with an already resolved value + pub fn with_value(value: V) -> Self { + Self { + value: Arc::new(Mutex::new(Some(value))), + error: Arc::new(Mutex::new(None)), + wg: None, + } + } + + pub fn with_error(error: E) -> Self { + Self { + value: Arc::new(Mutex::new(None)), + error: Arc::new(Mutex::new(Some(error))), + wg: None, + } + } + /// Sets the value of the future consuming the wait group - pub fn set_value(&mut self, value: T) { + pub fn resolve(&mut self, value: V) { self.value.lock().replace(value); mem::take(&mut self.wg); } + /// Sets an error for the value + pub fn reject(&mut self, error: E) { + self.error.lock().replace(error); + mem::take(&mut self.wg); + } + + pub fn result(&mut self, result: Result) { + match result { + Ok(v) => self.resolve(v), + Err(e) => self.reject(e), + } + } + + pub fn block_unwrap(&mut self) -> V { + match self.get_value() { + Ok(v) => v, + Err(e) => panic!("Unwrap on Err value: {}", e), + } + } + /// Returns the value of the future after it has been set. /// This call blocks - #[allow(dead_code)] - pub fn get_value(&mut self) -> T { + pub fn get_value(&mut self) -> Result { if let Some(wg) = mem::take(&mut self.wg) { wg.wait(); } - self.value.lock().take().unwrap() + if let Some(err) = self.error.lock().take() { + Err(err) + } else { + Ok(self.value.lock().take().unwrap()) + } } /// Returns the value of the future only blocking for the given timeout - pub fn get_value_with_timeout(&mut self, timeout: Duration) -> Option { + pub fn get_value_with_timeout(&mut self, timeout: Duration) -> Option> { let start = Instant::now(); while self.value.lock().is_none() { @@ -73,6 +119,22 @@ impl AsyncValue { break; } } - self.value.lock().take() + if let Some(err) = self.error.lock().take() { + Some(Err(err)) + } else if let Some(value) = self.value.lock().take() { + Some(Ok(value)) + } else { + None + } + } +} + +impl Clone for AsyncValue { + fn clone(&self) -> Self { + Self { + value: Arc::clone(&self.value), + error: Arc::clone(&self.error), + wg: self.wg.clone(), + } } } diff --git a/src/server/mod.rs b/src/server/mod.rs index 37fd231..e417556 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -31,7 +31,7 @@ pub mod server_events; pub(crate) const CRATE_VERSION: &str = env!("CARGO_PKG_VERSION"); -type ForwardFutureVector = Arc>>>; +type ForwardFutureVector = Arc>>>; type CryptoStreamMap = Arc>>; /// The vented server that provides parallel handling of connections @@ -64,7 +64,7 @@ type CryptoStreamMap = Arc>>; /// /// None // the return value is the response event Option /// }); -/// assert!(server.emit("B".to_string(), Event::new("ping".to_string())).is_err()) // this won't work without a known node B +/// assert!(server.emit("B".to_string(), Event::new("ping".to_string())).get_value().is_err()) // this won't work without a known node B /// ``` pub struct VentedServer { connections: CryptoStreamMap, @@ -74,7 +74,7 @@ pub struct VentedServer { event_handler: Arc>, global_secret_key: SecretKey, node_id: String, - redirect_handles: Arc>>>, + redirect_handles: Arc>>>, } impl VentedServer { @@ -122,22 +122,34 @@ impl VentedServer { /// Emits an event to the specified Node /// The actual writing is done in a separate thread from the thread pool. /// With the returned wait group one can wait for the event to be written. - pub fn emit(&self, node_id: String, event: Event) -> VentedResult<()> { - if let Ok(stream) = self.get_connection(&node_id) { - if let Err(e) = stream.send(event) { - log::error!("Failed to send event: {}", e); - self.connections.lock().remove(stream.receiver_node()); + pub fn emit(&self, node_id: String, event: Event) -> AsyncValue<(), VentedError> { + let future = AsyncValue::new(); - return Err(e); + self.pool.execute({ + let mut future = AsyncValue::clone(&future); + let context = self.get_server_context(); + move || { + + if let Ok(stream) = Self::get_connection(context.clone(), &node_id) { + if let Err(e) = stream.send(event) { + log::error!("Failed to send event: {}", e); + context.connections.lock().remove(stream.receiver_node()); + + future.reject(e); + } else { + future.resolve(()); + } + } else { + log::trace!( + "Trying to redirect the event to a different node to be sent to target node..." + ); + let result = Self::send_event_redirected(context.clone(), node_id, event); + future.result(result); + } } - } else { - log::trace!( - "Trying to redirect the event to a different node to be sent to target node..." - ); - self.send_event_redirected(node_id, event)?; - } + }); - Ok(()) + future } /// Adds a handler for the given event. @@ -195,12 +207,17 @@ impl VentedServer { event_handler: Arc::clone(&self.event_handler), pool: self.pool.clone(), forwarded_connections: Arc::clone(&self.forwarded_connections), + redirect_handles: Arc::clone(&self.redirect_handles), } } /// Tries to send an event redirected by emitting a redirect event to all public nodes - fn send_event_redirected(&self, target: String, event: Event) -> VentedResult<()> { - let public_nodes = self + fn send_event_redirected( + context: ServerConnectionContext, + target: String, + event: Event, + ) -> VentedResult<()> { + let public_nodes = context .known_nodes .lock() .values() @@ -209,27 +226,26 @@ impl VentedServer { .collect::>(); for node in public_nodes { let payload = RedirectPayload::new( - self.node_id.clone(), + context.node_id.clone(), node.id.clone(), target.clone(), event.clone().as_bytes(), ); let mut future = AsyncValue::new(); - self.redirect_handles + context + .redirect_handles .lock() .insert(payload.id, AsyncValue::clone(&future)); - if let Ok(stream) = self.get_connection(&node.id) { + if let Ok(stream) = Self::get_connection(context.clone(), &node.id) { if let Err(e) = stream.send(Event::with_payload(REDIRECT_EVENT, &payload)) { log::error!("Failed to send event: {}", e); - self.connections.lock().remove(stream.receiver_node()); + context.connections.lock().remove(stream.receiver_node()); } } - if let Some(value) = future.get_value_with_timeout(Duration::from_secs(1)) { - if value { - return Ok(()); - } + if let Some(Ok(_)) = future.get_value_with_timeout(Duration::from_secs(1)) { + return Ok(()); } } @@ -286,33 +302,34 @@ impl VentedServer { /// Takes three attempts to retrieve a connection for the given node. /// First it tries to use the already established connection stored in the shared connections vector. /// If that fails it tries to establish a new connection to the node by using the known address - fn get_connection(&self, target: &String) -> VentedResult { + fn get_connection( + context: ServerConnectionContext, + target: &String, + ) -> VentedResult { log::trace!("Trying to connect to {}", target); - if let Some(stream) = self.connections.lock().get(target) { + + if let Some(stream) = context.connections.lock().get(target) { log::trace!("Reusing existing connection."); + return Ok(CryptoStream::clone(stream)); } - let target_node = { - self.known_nodes - .lock() - .get(target) - .cloned() - .ok_or(VentedError::UnknownNode(target.clone()))? - }; + let target_node = context + .known_nodes + .lock() + .get(target) + .cloned() + .ok_or(VentedError::UnknownNode(target.clone()))?; + if let Some(address) = target_node.address { log::trace!("Connecting to known address"); - match self.connect(address) { - Ok(stream) => { - return Ok(stream); - } - Err(e) => log::error!("Failed to connect to node '{}': {}", target, e), - } - } - log::trace!("All direct connection attempts to {} failed", target); + Self::connect(context, address) + } else { + log::trace!("All direct connection attempts to {} failed", target); - Err(VentedError::UnreachableNode(target.clone())) + Err(VentedError::UnreachableNode(target.clone())) + } } /// Establishes a crypto stream for the given stream @@ -337,17 +354,20 @@ impl VentedServer { } /// Connects to the given address as a tcp client - fn connect(&self, address: String) -> VentedResult { + fn connect( + mut context: ServerConnectionContext, + address: String, + ) -> VentedResult { let stream = TcpStream::connect(address)?; - let mut context = self.get_server_context(); context.is_server = false; let connections = Arc::clone(&context.connections); - let stream = Self::get_crypto_stream(context.clone(), stream)?; + let pool = context.pool.clone(); + let event_handler = Arc::clone(&context.event_handler); + let stream = Self::get_crypto_stream(context, stream)?; - self.pool.execute({ + pool.execute({ let stream = CryptoStream::clone(&stream); - let event_handler = Arc::clone(&self.event_handler); move || { event_handler.lock().handle_event(Event::new(READY_EVENT)); diff --git a/src/server/server_events.rs b/src/server/server_events.rs index 7d55462..437299e 100644 --- a/src/server/server_events.rs +++ b/src/server/server_events.rs @@ -1,4 +1,5 @@ use crate::event::Event; +use crate::result::VentedError; use crate::server::data::Node; use crate::server::VentedServer; use executors::Executor; @@ -103,7 +104,7 @@ impl VentedServer { move |event| { let payload = event.get_payload::().ok()?; let mut future = redirect_handles.lock().remove(&payload.id)?; - future.set_value(true); + future.resolve(()); None } @@ -113,7 +114,7 @@ impl VentedServer { move |event| { let payload = event.get_payload::().ok()?; let mut future = redirect_handles.lock().remove(&payload.id)?; - future.set_value(false); + future.reject(VentedError::Rejected); None } diff --git a/tests/test_communication.rs b/tests/test_communication.rs index 7541784..c839d68 100644 --- a/tests/test_communication.rs +++ b/tests/test_communication.rs @@ -91,21 +91,21 @@ fn test_server_communication() { }); server_b .emit("A".to_string(), Event::new(NODE_LIST_REQUEST_EVENT)) - .unwrap(); + .block_unwrap(); server_c .emit("A".to_string(), Event::new("ping".to_string())) - .unwrap(); + .block_unwrap(); for _ in 0..9 { server_b .emit("A".to_string(), Event::new("ping".to_string())) - .unwrap(); + .block_unwrap(); } server_a .emit("B".to_string(), Event::new("pong".to_string())) - .unwrap(); + .block_unwrap(); server_b .emit("C".to_string(), Event::new("ping".to_string())) - .unwrap(); + .block_unwrap(); // wait one second to make sure the servers were able to process the events for _ in 0..100 {