From 9137eeb67310810bf9afbca532bdc60661231733 Mon Sep 17 00:00:00 2001 From: trivernis Date: Mon, 9 Nov 2020 12:21:53 +0100 Subject: [PATCH] Add error and success callback to AsyncValue Signed-off-by: trivernis --- Cargo.toml | 2 +- src/crypto/mod.rs | 2 +- src/event/mod.rs | 3 +- src/lib.rs | 2 +- src/server/data.rs | 112 +--------------------------- src/server/mod.rs | 10 +-- src/server/server_events.rs | 2 +- src/utils/mod.rs | 2 + src/{ => utils}/result.rs | 0 src/utils/sync.rs | 144 ++++++++++++++++++++++++++++++++++++ tests/test_communication.rs | 1 + 11 files changed, 159 insertions(+), 121 deletions(-) create mode 100644 src/utils/mod.rs rename src/{ => utils}/result.rs (100%) create mode 100644 src/utils/sync.rs diff --git a/Cargo.toml b/Cargo.toml index 25d1a09..f2f39d0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "vented" description = "Event driven encrypted tcp communicaton" -version = "0.9.0" +version = "0.9.1" authors = ["trivernis "] edition = "2018" readme = "README.md" diff --git a/src/crypto/mod.rs b/src/crypto/mod.rs index 323f8b4..520868e 100644 --- a/src/crypto/mod.rs +++ b/src/crypto/mod.rs @@ -10,8 +10,8 @@ use sha2::Digest; use typenum::U24; use crate::event::Event; -use crate::result::VentedResult; +use crate::utils::result::VentedResult; use crypto_box::ChaChaBox; pub use crypto_box::PublicKey; pub use crypto_box::SecretKey; diff --git a/src/event/mod.rs b/src/event/mod.rs index 76251c7..a9ea2ac 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -1,11 +1,10 @@ use std::io::Read; +use crate::utils::result::{VentedError, VentedResult}; use byteorder::{BigEndian, ByteOrder, ReadBytesExt}; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; -use crate::result::{VentedError, VentedResult}; - pub trait GenericEvent {} #[cfg(test)] diff --git a/src/lib.rs b/src/lib.rs index b2f7568..f7a3996 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,7 @@ pub mod crypto; pub mod event; pub mod event_handler; -pub mod result; pub mod server; +pub mod utils; pub use crossbeam_utils::sync::WaitGroup; diff --git a/src/server/data.rs b/src/server/data.rs index cd07ad2..a4650c2 100644 --- a/src/server/data.rs +++ b/src/server/data.rs @@ -1,16 +1,13 @@ use crate::crypto::CryptoStream; use crate::event_handler::EventHandler; -use crate::result::VentedError; -use crate::WaitGroup; +use crate::utils::result::VentedError; +use crate::utils::sync::AsyncValue; use crypto_box::SecretKey; use executors::crossbeam_workstealing_pool; use executors::parker::DynParker; use parking_lot::Mutex; use std::collections::HashMap; -use std::mem; use std::sync::Arc; -use std::thread; -use std::time::{Duration, Instant}; use x25519_dalek::PublicKey; #[derive(Clone, Debug)] @@ -33,108 +30,3 @@ pub(crate) struct ServerConnectionContext { pub pool: crossbeam_workstealing_pool::ThreadPool, pub redirect_handles: Arc>>>, } - -pub struct AsyncValue { - value: Arc>>, - error: Arc>>, - wg: Option, -} - -impl AsyncValue -where - E: std::fmt::Display, -{ - /// Creates the future with no value - pub fn new() -> Self { - Self { - value: Arc::new(Mutex::new(None)), - error: Arc::new(Mutex::new(None)), - wg: Some(WaitGroup::new()), - } - } - - /// Creates a new AsyncValue with an already resolved value - pub fn with_value(value: V) -> Self { - Self { - value: Arc::new(Mutex::new(Some(value))), - error: Arc::new(Mutex::new(None)), - wg: None, - } - } - - pub fn with_error(error: E) -> Self { - Self { - value: Arc::new(Mutex::new(None)), - error: Arc::new(Mutex::new(Some(error))), - wg: None, - } - } - - /// Sets the value of the future consuming the wait group - pub fn resolve(&mut self, value: V) { - self.value.lock().replace(value); - mem::take(&mut self.wg); - } - - /// Sets an error for the value - pub fn reject(&mut self, error: E) { - self.error.lock().replace(error); - mem::take(&mut self.wg); - } - - pub fn result(&mut self, result: Result) { - match result { - Ok(v) => self.resolve(v), - Err(e) => self.reject(e), - } - } - - pub fn block_unwrap(&mut self) -> V { - match self.get_value() { - Ok(v) => v, - Err(e) => panic!("Unwrap on Err value: {}", e), - } - } - - /// Returns the value of the future after it has been set. - /// This call blocks - pub fn get_value(&mut self) -> Result { - if let Some(wg) = mem::take(&mut self.wg) { - wg.wait(); - } - if let Some(err) = self.error.lock().take() { - Err(err) - } else { - Ok(self.value.lock().take().unwrap()) - } - } - - /// Returns the value of the future only blocking for the given timeout - pub fn get_value_with_timeout(&mut self, timeout: Duration) -> Option> { - let start = Instant::now(); - - while self.value.lock().is_none() { - thread::sleep(Duration::from_millis(1)); - if start.elapsed() > timeout { - break; - } - } - if let Some(err) = self.error.lock().take() { - Some(Err(err)) - } else if let Some(value) = self.value.lock().take() { - Some(Ok(value)) - } else { - None - } - } -} - -impl Clone for AsyncValue { - fn clone(&self) -> Self { - Self { - value: Arc::clone(&self.value), - error: Arc::clone(&self.error), - wg: self.wg.clone(), - } - } -} diff --git a/src/server/mod.rs b/src/server/mod.rs index e417556..90f6fd7 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -7,14 +7,14 @@ use executors::{crossbeam_workstealing_pool, Executor}; use crate::crypto::CryptoStream; use crate::event::Event; use crate::event_handler::EventHandler; -use crate::result::VentedError::UnknownNode; -use crate::result::{VentedError, VentedResult}; -use crate::server::data::{AsyncValue, Node, ServerConnectionContext}; +use crate::server::data::{Node, ServerConnectionContext}; use crate::server::server_events::{ AuthPayload, ChallengePayload, NodeInformationPayload, RedirectPayload, VersionMismatchPayload, ACCEPT_EVENT, AUTH_EVENT, CHALLENGE_EVENT, CONNECT_EVENT, MISMATCH_EVENT, READY_EVENT, REDIRECT_EVENT, REJECT_EVENT, }; +use crate::utils::result::{VentedError, VentedResult}; +use crate::utils::sync::AsyncValue; use crossbeam_utils::sync::WaitGroup; use executors::parker::DynParker; use parking_lot::Mutex; @@ -459,7 +459,7 @@ impl VentedServer { } else { stream.write(&Event::new(REJECT_EVENT).as_bytes())?; stream.flush()?; - return Err(UnknownNode(node_id)); + return Err(VentedError::UnknownNode(node_id)); }; let mut stream = CryptoStream::new(node_id.clone(), stream, &public_key, &secret_key)?; @@ -517,7 +517,7 @@ impl VentedServer { } else { stream.write(&Event::new(REJECT_EVENT).as_bytes())?; stream.flush()?; - return Err(UnknownNode(node_id)); + return Err(VentedError::UnknownNode(node_id)); }; stream.write( diff --git a/src/server/server_events.rs b/src/server/server_events.rs index 437299e..4f5f569 100644 --- a/src/server/server_events.rs +++ b/src/server/server_events.rs @@ -1,7 +1,7 @@ use crate::event::Event; -use crate::result::VentedError; use crate::server::data::Node; use crate::server::VentedServer; +use crate::utils::result::VentedError; use executors::Executor; use rand::{thread_rng, RngCore}; use serde::{Deserialize, Serialize}; diff --git a/src/utils/mod.rs b/src/utils/mod.rs new file mode 100644 index 0000000..54c6d3d --- /dev/null +++ b/src/utils/mod.rs @@ -0,0 +1,2 @@ +pub mod result; +pub mod sync; diff --git a/src/result.rs b/src/utils/result.rs similarity index 100% rename from src/result.rs rename to src/utils/result.rs diff --git a/src/utils/sync.rs b/src/utils/sync.rs new file mode 100644 index 0000000..a77399b --- /dev/null +++ b/src/utils/sync.rs @@ -0,0 +1,144 @@ +use crate::WaitGroup; +use parking_lot::Mutex; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use std::{mem, thread}; + +pub struct AsyncValue { + value: Arc>>, + error: Arc>>, + wg: Option, + err_cb: Arc () + Send + Sync>>>>, + ok_cb: Arc () + Send + Sync>>>>, +} + +impl AsyncValue +where + E: std::fmt::Display, +{ + /// Creates the future with no value + pub fn new() -> Self { + Self { + value: Arc::new(Mutex::new(None)), + error: Arc::new(Mutex::new(None)), + wg: Some(WaitGroup::new()), + err_cb: Arc::new(Mutex::new(None)), + ok_cb: Arc::new(Mutex::new(None)), + } + } + + /// Creates a new AsyncValue with an already resolved value + pub fn with_value(value: V) -> Self { + Self { + value: Arc::new(Mutex::new(Some(value))), + error: Arc::new(Mutex::new(None)), + wg: None, + err_cb: Arc::new(Mutex::new(None)), + ok_cb: Arc::new(Mutex::new(None)), + } + } + + pub fn with_error(error: E) -> Self { + Self { + value: Arc::new(Mutex::new(None)), + error: Arc::new(Mutex::new(Some(error))), + wg: None, + err_cb: Arc::new(Mutex::new(None)), + ok_cb: Arc::new(Mutex::new(None)), + } + } + + pub fn on_error(&mut self, cb: F) -> &mut Self + where + F: FnOnce(&E) -> () + Send + Sync + 'static, + { + self.err_cb.lock().replace(Box::new(cb)); + + self + } + + pub fn on_success(&mut self, cb: F) -> &mut Self + where + F: FnOnce(&V) -> () + Send + Sync + 'static, + { + self.ok_cb.lock().replace(Box::new(cb)); + + self + } + + /// Sets the value of the future consuming the wait group + pub fn resolve(&mut self, value: V) { + if let Some(cb) = self.ok_cb.lock().take() { + cb(&value) + } + self.value.lock().replace(value); + mem::take(&mut self.wg); + } + + /// Sets an error for the value + pub fn reject(&mut self, error: E) { + if let Some(cb) = self.err_cb.lock().take() { + cb(&error) + } + self.error.lock().replace(error); + mem::take(&mut self.wg); + } + + pub fn result(&mut self, result: Result) { + match result { + Ok(v) => self.resolve(v), + Err(e) => self.reject(e), + } + } + + pub fn block_unwrap(&mut self) -> V { + match self.get_value() { + Ok(v) => v, + Err(e) => panic!("Unwrap on Err value: {}", e), + } + } + + /// Returns the value of the future after it has been set. + /// This call blocks + pub fn get_value(&mut self) -> Result { + if let Some(wg) = mem::take(&mut self.wg) { + wg.wait(); + } + if let Some(err) = self.error.lock().take() { + Err(err) + } else { + Ok(self.value.lock().take().unwrap()) + } + } + + /// Returns the value of the future only blocking for the given timeout + pub fn get_value_with_timeout(&mut self, timeout: Duration) -> Option> { + let start = Instant::now(); + + while self.value.lock().is_none() { + thread::sleep(Duration::from_millis(1)); + if start.elapsed() > timeout { + break; + } + } + if let Some(err) = self.error.lock().take() { + Some(Err(err)) + } else if let Some(value) = self.value.lock().take() { + Some(Ok(value)) + } else { + None + } + } +} + +impl Clone for AsyncValue { + fn clone(&self) -> Self { + Self { + value: Arc::clone(&self.value), + error: Arc::clone(&self.error), + wg: self.wg.clone(), + err_cb: Arc::clone(&self.err_cb), + ok_cb: Arc::clone(&self.ok_cb), + } + } +} diff --git a/tests/test_communication.rs b/tests/test_communication.rs index c839d68..e665810 100644 --- a/tests/test_communication.rs +++ b/tests/test_communication.rs @@ -91,6 +91,7 @@ fn test_server_communication() { }); server_b .emit("A".to_string(), Event::new(NODE_LIST_REQUEST_EVENT)) + .on_success(|_| println!("Success")) .block_unwrap(); server_c .emit("A".to_string(), Event::new("ping".to_string()))