Rename Future to AsyncValue and expose it

Signed-off-by: trivernis <trivernis@protonmail.com>
pull/1/head
trivernis 4 years ago
parent 52f9cc1cd1
commit 64203e5cac
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

@ -1,7 +1,7 @@
[package] [package]
name = "vented" name = "vented"
description = "Event driven encrypted tcp communicaton" description = "Event driven encrypted tcp communicaton"
version = "0.7.0" version = "0.7.1"
authors = ["trivernis <trivernis@protonmail.com>"] authors = ["trivernis <trivernis@protonmail.com>"]
edition = "2018" edition = "2018"
readme = "README.md" readme = "README.md"

@ -27,17 +27,17 @@ pub(crate) struct ServerConnectionContext {
pub known_nodes: Arc<Mutex<HashMap<String, Node>>>, pub known_nodes: Arc<Mutex<HashMap<String, Node>>>,
pub event_handler: Arc<Mutex<EventHandler>>, pub event_handler: Arc<Mutex<EventHandler>>,
pub connections: Arc<Mutex<HashMap<String, CryptoStream>>>, pub connections: Arc<Mutex<HashMap<String, CryptoStream>>>,
pub forwarded_connections: Arc<Mutex<HashMap<(String, String), Future<CryptoStream>>>>, pub forwarded_connections: Arc<Mutex<HashMap<(String, String), AsyncValue<CryptoStream>>>>,
pub listener_pool: Arc<Mutex<ScheduledThreadPool>>, pub listener_pool: Arc<Mutex<ScheduledThreadPool>>,
} }
#[derive(Clone)] #[derive(Clone)]
pub(crate) struct Future<T> { pub struct AsyncValue<T> {
value: Arc<Mutex<Option<T>>>, value: Arc<Mutex<Option<T>>>,
wg: Option<WaitGroup>, wg: Option<WaitGroup>,
} }
impl<T> Future<T> { impl<T> AsyncValue<T> {
/// Creates the future with no value /// Creates the future with no value
pub fn new() -> Self { pub fn new() -> Self {
Self { Self {
@ -63,12 +63,12 @@ impl<T> Future<T> {
} }
/// Returns the value of the future only blocking for the given timeout /// Returns the value of the future only blocking for the given timeout
pub fn get_value_with_timeout(&mut self, millis: u128) -> Option<T> { pub fn get_value_with_timeout(&mut self, timeout: Duration) -> Option<T> {
let start = Instant::now(); let start = Instant::now();
while self.value.lock().is_none() { while self.value.lock().is_none() {
thread::sleep(Duration::from_millis(1)); thread::sleep(Duration::from_millis(1));
if start.elapsed().as_millis() > millis { if start.elapsed() > timeout {
break; break;
} }
} }

@ -9,7 +9,7 @@ use crate::event::Event;
use crate::event_handler::EventHandler; use crate::event_handler::EventHandler;
use crate::result::VentedError::UnknownNode; use crate::result::VentedError::UnknownNode;
use crate::result::{VentedError, VentedResult}; use crate::result::{VentedError, VentedResult};
use crate::server::data::{Future, Node, ServerConnectionContext}; use crate::server::data::{AsyncValue, Node, ServerConnectionContext};
use crate::server::server_events::{ use crate::server::server_events::{
AuthPayload, ChallengePayload, NodeInformationPayload, RedirectPayload, VersionMismatchPayload, AuthPayload, ChallengePayload, NodeInformationPayload, RedirectPayload, VersionMismatchPayload,
ACCEPT_EVENT, AUTH_EVENT, CHALLENGE_EVENT, CONNECT_EVENT, MISMATCH_EVENT, ACCEPT_EVENT, AUTH_EVENT, CHALLENGE_EVENT, CONNECT_EVENT, MISMATCH_EVENT,
@ -22,6 +22,7 @@ use std::io::Write;
use std::iter::FromIterator; use std::iter::FromIterator;
use std::sync::Arc; use std::sync::Arc;
use std::thread; use std::thread;
use std::time::Duration;
use x25519_dalek::StaticSecret; use x25519_dalek::StaticSecret;
pub mod data; pub mod data;
@ -29,7 +30,7 @@ pub mod server_events;
pub(crate) const CRATE_VERSION: &str = env!("CARGO_PKG_VERSION"); pub(crate) const CRATE_VERSION: &str = env!("CARGO_PKG_VERSION");
type ForwardFutureVector = Arc<Mutex<HashMap<(String, String), Future<CryptoStream>>>>; type ForwardFutureVector = Arc<Mutex<HashMap<(String, String), AsyncValue<CryptoStream>>>>;
type CryptoStreamMap = Arc<Mutex<HashMap<String, CryptoStream>>>; type CryptoStreamMap = Arc<Mutex<HashMap<String, CryptoStream>>>;
/// The vented server that provides parallel handling of connections /// The vented server that provides parallel handling of connections
@ -72,7 +73,7 @@ pub struct VentedServer {
event_handler: Arc<Mutex<EventHandler>>, event_handler: Arc<Mutex<EventHandler>>,
global_secret_key: SecretKey, global_secret_key: SecretKey,
node_id: String, node_id: String,
redirect_handles: Arc<Mutex<HashMap<[u8; 16], Future<bool>>>>, redirect_handles: Arc<Mutex<HashMap<[u8; 16], AsyncValue<bool>>>>,
} }
impl VentedServer { impl VentedServer {
@ -226,10 +227,10 @@ impl VentedServer {
target.clone(), target.clone(),
event.clone().as_bytes(), event.clone().as_bytes(),
); );
let mut future = Future::new(); let mut future = AsyncValue::new();
self.redirect_handles self.redirect_handles
.lock() .lock()
.insert(payload.id, Future::clone(&future)); .insert(payload.id, AsyncValue::clone(&future));
if let Ok(stream) = self.get_connection(&node.id) { if let Ok(stream) = self.get_connection(&node.id) {
if let Err(e) = stream.send(Event::with_payload(REDIRECT_EVENT, &payload)) { if let Err(e) = stream.send(Event::with_payload(REDIRECT_EVENT, &payload)) {
@ -238,7 +239,7 @@ impl VentedServer {
} }
} }
if let Some(value) = future.get_value_with_timeout(1000) { if let Some(value) = future.get_value_with_timeout(Duration::from_secs(1)) {
if value { if value {
return Ok(()); return Ok(());
} }

Loading…
Cancel
Save