Add error and success callback to AsyncValue

Signed-off-by: trivernis <trivernis@protonmail.com>
pull/1/head
trivernis 4 years ago
parent d3d6d0baaf
commit 9137eeb673
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

@ -1,7 +1,7 @@
[package] [package]
name = "vented" name = "vented"
description = "Event driven encrypted tcp communicaton" description = "Event driven encrypted tcp communicaton"
version = "0.9.0" version = "0.9.1"
authors = ["trivernis <trivernis@protonmail.com>"] authors = ["trivernis <trivernis@protonmail.com>"]
edition = "2018" edition = "2018"
readme = "README.md" readme = "README.md"

@ -10,8 +10,8 @@ use sha2::Digest;
use typenum::U24; use typenum::U24;
use crate::event::Event; use crate::event::Event;
use crate::result::VentedResult;
use crate::utils::result::VentedResult;
use crypto_box::ChaChaBox; use crypto_box::ChaChaBox;
pub use crypto_box::PublicKey; pub use crypto_box::PublicKey;
pub use crypto_box::SecretKey; pub use crypto_box::SecretKey;

@ -1,11 +1,10 @@
use std::io::Read; use std::io::Read;
use crate::utils::result::{VentedError, VentedResult};
use byteorder::{BigEndian, ByteOrder, ReadBytesExt}; use byteorder::{BigEndian, ByteOrder, ReadBytesExt};
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use crate::result::{VentedError, VentedResult};
pub trait GenericEvent {} pub trait GenericEvent {}
#[cfg(test)] #[cfg(test)]

@ -1,7 +1,7 @@
pub mod crypto; pub mod crypto;
pub mod event; pub mod event;
pub mod event_handler; pub mod event_handler;
pub mod result;
pub mod server; pub mod server;
pub mod utils;
pub use crossbeam_utils::sync::WaitGroup; pub use crossbeam_utils::sync::WaitGroup;

@ -1,16 +1,13 @@
use crate::crypto::CryptoStream; use crate::crypto::CryptoStream;
use crate::event_handler::EventHandler; use crate::event_handler::EventHandler;
use crate::result::VentedError; use crate::utils::result::VentedError;
use crate::WaitGroup; use crate::utils::sync::AsyncValue;
use crypto_box::SecretKey; use crypto_box::SecretKey;
use executors::crossbeam_workstealing_pool; use executors::crossbeam_workstealing_pool;
use executors::parker::DynParker; use executors::parker::DynParker;
use parking_lot::Mutex; use parking_lot::Mutex;
use std::collections::HashMap; use std::collections::HashMap;
use std::mem;
use std::sync::Arc; use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
use x25519_dalek::PublicKey; use x25519_dalek::PublicKey;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@ -33,108 +30,3 @@ pub(crate) struct ServerConnectionContext {
pub pool: crossbeam_workstealing_pool::ThreadPool<DynParker>, pub pool: crossbeam_workstealing_pool::ThreadPool<DynParker>,
pub redirect_handles: Arc<Mutex<HashMap<[u8; 16], AsyncValue<(), VentedError>>>>, pub redirect_handles: Arc<Mutex<HashMap<[u8; 16], AsyncValue<(), VentedError>>>>,
} }
pub struct AsyncValue<T, E> {
value: Arc<Mutex<Option<T>>>,
error: Arc<Mutex<Option<E>>>,
wg: Option<WaitGroup>,
}
impl<V, E> AsyncValue<V, E>
where
E: std::fmt::Display,
{
/// Creates the future with no value
pub fn new() -> Self {
Self {
value: Arc::new(Mutex::new(None)),
error: Arc::new(Mutex::new(None)),
wg: Some(WaitGroup::new()),
}
}
/// Creates a new AsyncValue with an already resolved value
pub fn with_value(value: V) -> Self {
Self {
value: Arc::new(Mutex::new(Some(value))),
error: Arc::new(Mutex::new(None)),
wg: None,
}
}
pub fn with_error(error: E) -> Self {
Self {
value: Arc::new(Mutex::new(None)),
error: Arc::new(Mutex::new(Some(error))),
wg: None,
}
}
/// Sets the value of the future consuming the wait group
pub fn resolve(&mut self, value: V) {
self.value.lock().replace(value);
mem::take(&mut self.wg);
}
/// Sets an error for the value
pub fn reject(&mut self, error: E) {
self.error.lock().replace(error);
mem::take(&mut self.wg);
}
pub fn result(&mut self, result: Result<V, E>) {
match result {
Ok(v) => self.resolve(v),
Err(e) => self.reject(e),
}
}
pub fn block_unwrap(&mut self) -> V {
match self.get_value() {
Ok(v) => v,
Err(e) => panic!("Unwrap on Err value: {}", e),
}
}
/// Returns the value of the future after it has been set.
/// This call blocks
pub fn get_value(&mut self) -> Result<V, E> {
if let Some(wg) = mem::take(&mut self.wg) {
wg.wait();
}
if let Some(err) = self.error.lock().take() {
Err(err)
} else {
Ok(self.value.lock().take().unwrap())
}
}
/// Returns the value of the future only blocking for the given timeout
pub fn get_value_with_timeout(&mut self, timeout: Duration) -> Option<Result<V, E>> {
let start = Instant::now();
while self.value.lock().is_none() {
thread::sleep(Duration::from_millis(1));
if start.elapsed() > timeout {
break;
}
}
if let Some(err) = self.error.lock().take() {
Some(Err(err))
} else if let Some(value) = self.value.lock().take() {
Some(Ok(value))
} else {
None
}
}
}
impl<T, E> Clone for AsyncValue<T, E> {
fn clone(&self) -> Self {
Self {
value: Arc::clone(&self.value),
error: Arc::clone(&self.error),
wg: self.wg.clone(),
}
}
}

@ -7,14 +7,14 @@ use executors::{crossbeam_workstealing_pool, Executor};
use crate::crypto::CryptoStream; use crate::crypto::CryptoStream;
use crate::event::Event; use crate::event::Event;
use crate::event_handler::EventHandler; use crate::event_handler::EventHandler;
use crate::result::VentedError::UnknownNode; use crate::server::data::{Node, ServerConnectionContext};
use crate::result::{VentedError, VentedResult};
use crate::server::data::{AsyncValue, Node, ServerConnectionContext};
use crate::server::server_events::{ use crate::server::server_events::{
AuthPayload, ChallengePayload, NodeInformationPayload, RedirectPayload, VersionMismatchPayload, AuthPayload, ChallengePayload, NodeInformationPayload, RedirectPayload, VersionMismatchPayload,
ACCEPT_EVENT, AUTH_EVENT, CHALLENGE_EVENT, CONNECT_EVENT, MISMATCH_EVENT, READY_EVENT, ACCEPT_EVENT, AUTH_EVENT, CHALLENGE_EVENT, CONNECT_EVENT, MISMATCH_EVENT, READY_EVENT,
REDIRECT_EVENT, REJECT_EVENT, REDIRECT_EVENT, REJECT_EVENT,
}; };
use crate::utils::result::{VentedError, VentedResult};
use crate::utils::sync::AsyncValue;
use crossbeam_utils::sync::WaitGroup; use crossbeam_utils::sync::WaitGroup;
use executors::parker::DynParker; use executors::parker::DynParker;
use parking_lot::Mutex; use parking_lot::Mutex;
@ -459,7 +459,7 @@ impl VentedServer {
} else { } else {
stream.write(&Event::new(REJECT_EVENT).as_bytes())?; stream.write(&Event::new(REJECT_EVENT).as_bytes())?;
stream.flush()?; stream.flush()?;
return Err(UnknownNode(node_id)); return Err(VentedError::UnknownNode(node_id));
}; };
let mut stream = CryptoStream::new(node_id.clone(), stream, &public_key, &secret_key)?; let mut stream = CryptoStream::new(node_id.clone(), stream, &public_key, &secret_key)?;
@ -517,7 +517,7 @@ impl VentedServer {
} else { } else {
stream.write(&Event::new(REJECT_EVENT).as_bytes())?; stream.write(&Event::new(REJECT_EVENT).as_bytes())?;
stream.flush()?; stream.flush()?;
return Err(UnknownNode(node_id)); return Err(VentedError::UnknownNode(node_id));
}; };
stream.write( stream.write(

@ -1,7 +1,7 @@
use crate::event::Event; use crate::event::Event;
use crate::result::VentedError;
use crate::server::data::Node; use crate::server::data::Node;
use crate::server::VentedServer; use crate::server::VentedServer;
use crate::utils::result::VentedError;
use executors::Executor; use executors::Executor;
use rand::{thread_rng, RngCore}; use rand::{thread_rng, RngCore};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};

@ -0,0 +1,2 @@
pub mod result;
pub mod sync;

@ -0,0 +1,144 @@
use crate::WaitGroup;
use parking_lot::Mutex;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::{mem, thread};
pub struct AsyncValue<V, E> {
value: Arc<Mutex<Option<V>>>,
error: Arc<Mutex<Option<E>>>,
wg: Option<WaitGroup>,
err_cb: Arc<Mutex<Option<Box<dyn FnOnce(&E) -> () + Send + Sync>>>>,
ok_cb: Arc<Mutex<Option<Box<dyn FnOnce(&V) -> () + Send + Sync>>>>,
}
impl<V, E> AsyncValue<V, E>
where
E: std::fmt::Display,
{
/// Creates the future with no value
pub fn new() -> Self {
Self {
value: Arc::new(Mutex::new(None)),
error: Arc::new(Mutex::new(None)),
wg: Some(WaitGroup::new()),
err_cb: Arc::new(Mutex::new(None)),
ok_cb: Arc::new(Mutex::new(None)),
}
}
/// Creates a new AsyncValue with an already resolved value
pub fn with_value(value: V) -> Self {
Self {
value: Arc::new(Mutex::new(Some(value))),
error: Arc::new(Mutex::new(None)),
wg: None,
err_cb: Arc::new(Mutex::new(None)),
ok_cb: Arc::new(Mutex::new(None)),
}
}
pub fn with_error(error: E) -> Self {
Self {
value: Arc::new(Mutex::new(None)),
error: Arc::new(Mutex::new(Some(error))),
wg: None,
err_cb: Arc::new(Mutex::new(None)),
ok_cb: Arc::new(Mutex::new(None)),
}
}
pub fn on_error<F>(&mut self, cb: F) -> &mut Self
where
F: FnOnce(&E) -> () + Send + Sync + 'static,
{
self.err_cb.lock().replace(Box::new(cb));
self
}
pub fn on_success<F>(&mut self, cb: F) -> &mut Self
where
F: FnOnce(&V) -> () + Send + Sync + 'static,
{
self.ok_cb.lock().replace(Box::new(cb));
self
}
/// Sets the value of the future consuming the wait group
pub fn resolve(&mut self, value: V) {
if let Some(cb) = self.ok_cb.lock().take() {
cb(&value)
}
self.value.lock().replace(value);
mem::take(&mut self.wg);
}
/// Sets an error for the value
pub fn reject(&mut self, error: E) {
if let Some(cb) = self.err_cb.lock().take() {
cb(&error)
}
self.error.lock().replace(error);
mem::take(&mut self.wg);
}
pub fn result(&mut self, result: Result<V, E>) {
match result {
Ok(v) => self.resolve(v),
Err(e) => self.reject(e),
}
}
pub fn block_unwrap(&mut self) -> V {
match self.get_value() {
Ok(v) => v,
Err(e) => panic!("Unwrap on Err value: {}", e),
}
}
/// Returns the value of the future after it has been set.
/// This call blocks
pub fn get_value(&mut self) -> Result<V, E> {
if let Some(wg) = mem::take(&mut self.wg) {
wg.wait();
}
if let Some(err) = self.error.lock().take() {
Err(err)
} else {
Ok(self.value.lock().take().unwrap())
}
}
/// Returns the value of the future only blocking for the given timeout
pub fn get_value_with_timeout(&mut self, timeout: Duration) -> Option<Result<V, E>> {
let start = Instant::now();
while self.value.lock().is_none() {
thread::sleep(Duration::from_millis(1));
if start.elapsed() > timeout {
break;
}
}
if let Some(err) = self.error.lock().take() {
Some(Err(err))
} else if let Some(value) = self.value.lock().take() {
Some(Ok(value))
} else {
None
}
}
}
impl<T, E> Clone for AsyncValue<T, E> {
fn clone(&self) -> Self {
Self {
value: Arc::clone(&self.value),
error: Arc::clone(&self.error),
wg: self.wg.clone(),
err_cb: Arc::clone(&self.err_cb),
ok_cb: Arc::clone(&self.ok_cb),
}
}
}

@ -91,6 +91,7 @@ fn test_server_communication() {
}); });
server_b server_b
.emit("A".to_string(), Event::new(NODE_LIST_REQUEST_EVENT)) .emit("A".to_string(), Event::new(NODE_LIST_REQUEST_EVENT))
.on_success(|_| println!("Success"))
.block_unwrap(); .block_unwrap();
server_c server_c
.emit("A".to_string(), Event::new("ping".to_string())) .emit("A".to_string(), Event::new("ping".to_string()))

Loading…
Cancel
Save