Change emit to return an async value

Signed-off-by: trivernis <trivernis@protonmail.com>
pull/1/head
trivernis 4 years ago
parent a2bed2074a
commit d3d6d0baaf
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

@ -1,7 +1,7 @@
[package] [package]
name = "vented" name = "vented"
description = "Event driven encrypted tcp communicaton" description = "Event driven encrypted tcp communicaton"
version = "0.8.1" version = "0.9.0"
authors = ["trivernis <trivernis@protonmail.com>"] authors = ["trivernis <trivernis@protonmail.com>"]
edition = "2018" edition = "2018"
readme = "README.md" readme = "README.md"

@ -1,5 +1,6 @@
use crate::crypto::CryptoStream; use crate::crypto::CryptoStream;
use crate::event_handler::EventHandler; use crate::event_handler::EventHandler;
use crate::result::VentedError;
use crate::WaitGroup; use crate::WaitGroup;
use crypto_box::SecretKey; use crypto_box::SecretKey;
use executors::crossbeam_workstealing_pool; use executors::crossbeam_workstealing_pool;
@ -28,43 +29,88 @@ pub(crate) struct ServerConnectionContext {
pub known_nodes: Arc<Mutex<HashMap<String, Node>>>, pub known_nodes: Arc<Mutex<HashMap<String, Node>>>,
pub event_handler: Arc<Mutex<EventHandler>>, pub event_handler: Arc<Mutex<EventHandler>>,
pub connections: Arc<Mutex<HashMap<String, CryptoStream>>>, pub connections: Arc<Mutex<HashMap<String, CryptoStream>>>,
pub forwarded_connections: Arc<Mutex<HashMap<(String, String), AsyncValue<CryptoStream>>>>, pub forwarded_connections: Arc<Mutex<HashMap<(String, String), AsyncValue<CryptoStream, ()>>>>,
pub pool: crossbeam_workstealing_pool::ThreadPool<DynParker>, pub pool: crossbeam_workstealing_pool::ThreadPool<DynParker>,
pub redirect_handles: Arc<Mutex<HashMap<[u8; 16], AsyncValue<(), VentedError>>>>,
} }
#[derive(Clone)] pub struct AsyncValue<T, E> {
pub struct AsyncValue<T> {
value: Arc<Mutex<Option<T>>>, value: Arc<Mutex<Option<T>>>,
error: Arc<Mutex<Option<E>>>,
wg: Option<WaitGroup>, wg: Option<WaitGroup>,
} }
impl<T> AsyncValue<T> { impl<V, E> AsyncValue<V, E>
where
E: std::fmt::Display,
{
/// Creates the future with no value /// Creates the future with no value
pub fn new() -> Self { pub fn new() -> Self {
Self { Self {
value: Arc::new(Mutex::new(None)), value: Arc::new(Mutex::new(None)),
error: Arc::new(Mutex::new(None)),
wg: Some(WaitGroup::new()), wg: Some(WaitGroup::new()),
} }
} }
/// Creates a new AsyncValue with an already resolved value
pub fn with_value(value: V) -> Self {
Self {
value: Arc::new(Mutex::new(Some(value))),
error: Arc::new(Mutex::new(None)),
wg: None,
}
}
pub fn with_error(error: E) -> Self {
Self {
value: Arc::new(Mutex::new(None)),
error: Arc::new(Mutex::new(Some(error))),
wg: None,
}
}
/// Sets the value of the future consuming the wait group /// Sets the value of the future consuming the wait group
pub fn set_value(&mut self, value: T) { pub fn resolve(&mut self, value: V) {
self.value.lock().replace(value); self.value.lock().replace(value);
mem::take(&mut self.wg); mem::take(&mut self.wg);
} }
/// Sets an error for the value
pub fn reject(&mut self, error: E) {
self.error.lock().replace(error);
mem::take(&mut self.wg);
}
pub fn result(&mut self, result: Result<V, E>) {
match result {
Ok(v) => self.resolve(v),
Err(e) => self.reject(e),
}
}
pub fn block_unwrap(&mut self) -> V {
match self.get_value() {
Ok(v) => v,
Err(e) => panic!("Unwrap on Err value: {}", e),
}
}
/// Returns the value of the future after it has been set. /// Returns the value of the future after it has been set.
/// This call blocks /// This call blocks
#[allow(dead_code)] pub fn get_value(&mut self) -> Result<V, E> {
pub fn get_value(&mut self) -> T {
if let Some(wg) = mem::take(&mut self.wg) { if let Some(wg) = mem::take(&mut self.wg) {
wg.wait(); wg.wait();
} }
self.value.lock().take().unwrap() if let Some(err) = self.error.lock().take() {
Err(err)
} else {
Ok(self.value.lock().take().unwrap())
}
} }
/// Returns the value of the future only blocking for the given timeout /// Returns the value of the future only blocking for the given timeout
pub fn get_value_with_timeout(&mut self, timeout: Duration) -> Option<T> { pub fn get_value_with_timeout(&mut self, timeout: Duration) -> Option<Result<V, E>> {
let start = Instant::now(); let start = Instant::now();
while self.value.lock().is_none() { while self.value.lock().is_none() {
@ -73,6 +119,22 @@ impl<T> AsyncValue<T> {
break; break;
} }
} }
self.value.lock().take() if let Some(err) = self.error.lock().take() {
Some(Err(err))
} else if let Some(value) = self.value.lock().take() {
Some(Ok(value))
} else {
None
}
}
}
impl<T, E> Clone for AsyncValue<T, E> {
fn clone(&self) -> Self {
Self {
value: Arc::clone(&self.value),
error: Arc::clone(&self.error),
wg: self.wg.clone(),
}
} }
} }

@ -31,7 +31,7 @@ pub mod server_events;
pub(crate) const CRATE_VERSION: &str = env!("CARGO_PKG_VERSION"); pub(crate) const CRATE_VERSION: &str = env!("CARGO_PKG_VERSION");
type ForwardFutureVector = Arc<Mutex<HashMap<(String, String), AsyncValue<CryptoStream>>>>; type ForwardFutureVector = Arc<Mutex<HashMap<(String, String), AsyncValue<CryptoStream, ()>>>>;
type CryptoStreamMap = Arc<Mutex<HashMap<String, CryptoStream>>>; type CryptoStreamMap = Arc<Mutex<HashMap<String, CryptoStream>>>;
/// The vented server that provides parallel handling of connections /// The vented server that provides parallel handling of connections
@ -64,7 +64,7 @@ type CryptoStreamMap = Arc<Mutex<HashMap<String, CryptoStream>>>;
/// ///
/// None // the return value is the response event Option<Event> /// None // the return value is the response event Option<Event>
/// }); /// });
/// assert!(server.emit("B".to_string(), Event::new("ping".to_string())).is_err()) // this won't work without a known node B /// assert!(server.emit("B".to_string(), Event::new("ping".to_string())).get_value().is_err()) // this won't work without a known node B
/// ``` /// ```
pub struct VentedServer { pub struct VentedServer {
connections: CryptoStreamMap, connections: CryptoStreamMap,
@ -74,7 +74,7 @@ pub struct VentedServer {
event_handler: Arc<Mutex<EventHandler>>, event_handler: Arc<Mutex<EventHandler>>,
global_secret_key: SecretKey, global_secret_key: SecretKey,
node_id: String, node_id: String,
redirect_handles: Arc<Mutex<HashMap<[u8; 16], AsyncValue<bool>>>>, redirect_handles: Arc<Mutex<HashMap<[u8; 16], AsyncValue<(), VentedError>>>>,
} }
impl VentedServer { impl VentedServer {
@ -122,22 +122,34 @@ impl VentedServer {
/// Emits an event to the specified Node /// Emits an event to the specified Node
/// The actual writing is done in a separate thread from the thread pool. /// The actual writing is done in a separate thread from the thread pool.
/// With the returned wait group one can wait for the event to be written. /// With the returned wait group one can wait for the event to be written.
pub fn emit(&self, node_id: String, event: Event) -> VentedResult<()> { pub fn emit(&self, node_id: String, event: Event) -> AsyncValue<(), VentedError> {
if let Ok(stream) = self.get_connection(&node_id) { let future = AsyncValue::new();
if let Err(e) = stream.send(event) {
log::error!("Failed to send event: {}", e);
self.connections.lock().remove(stream.receiver_node());
return Err(e); self.pool.execute({
let mut future = AsyncValue::clone(&future);
let context = self.get_server_context();
move || {
if let Ok(stream) = Self::get_connection(context.clone(), &node_id) {
if let Err(e) = stream.send(event) {
log::error!("Failed to send event: {}", e);
context.connections.lock().remove(stream.receiver_node());
future.reject(e);
} else {
future.resolve(());
}
} else {
log::trace!(
"Trying to redirect the event to a different node to be sent to target node..."
);
let result = Self::send_event_redirected(context.clone(), node_id, event);
future.result(result);
}
} }
} else { });
log::trace!(
"Trying to redirect the event to a different node to be sent to target node..."
);
self.send_event_redirected(node_id, event)?;
}
Ok(()) future
} }
/// Adds a handler for the given event. /// Adds a handler for the given event.
@ -195,12 +207,17 @@ impl VentedServer {
event_handler: Arc::clone(&self.event_handler), event_handler: Arc::clone(&self.event_handler),
pool: self.pool.clone(), pool: self.pool.clone(),
forwarded_connections: Arc::clone(&self.forwarded_connections), forwarded_connections: Arc::clone(&self.forwarded_connections),
redirect_handles: Arc::clone(&self.redirect_handles),
} }
} }
/// Tries to send an event redirected by emitting a redirect event to all public nodes /// Tries to send an event redirected by emitting a redirect event to all public nodes
fn send_event_redirected(&self, target: String, event: Event) -> VentedResult<()> { fn send_event_redirected(
let public_nodes = self context: ServerConnectionContext,
target: String,
event: Event,
) -> VentedResult<()> {
let public_nodes = context
.known_nodes .known_nodes
.lock() .lock()
.values() .values()
@ -209,27 +226,26 @@ impl VentedServer {
.collect::<Vec<Node>>(); .collect::<Vec<Node>>();
for node in public_nodes { for node in public_nodes {
let payload = RedirectPayload::new( let payload = RedirectPayload::new(
self.node_id.clone(), context.node_id.clone(),
node.id.clone(), node.id.clone(),
target.clone(), target.clone(),
event.clone().as_bytes(), event.clone().as_bytes(),
); );
let mut future = AsyncValue::new(); let mut future = AsyncValue::new();
self.redirect_handles context
.redirect_handles
.lock() .lock()
.insert(payload.id, AsyncValue::clone(&future)); .insert(payload.id, AsyncValue::clone(&future));
if let Ok(stream) = self.get_connection(&node.id) { if let Ok(stream) = Self::get_connection(context.clone(), &node.id) {
if let Err(e) = stream.send(Event::with_payload(REDIRECT_EVENT, &payload)) { if let Err(e) = stream.send(Event::with_payload(REDIRECT_EVENT, &payload)) {
log::error!("Failed to send event: {}", e); log::error!("Failed to send event: {}", e);
self.connections.lock().remove(stream.receiver_node()); context.connections.lock().remove(stream.receiver_node());
} }
} }
if let Some(value) = future.get_value_with_timeout(Duration::from_secs(1)) { if let Some(Ok(_)) = future.get_value_with_timeout(Duration::from_secs(1)) {
if value { return Ok(());
return Ok(());
}
} }
} }
@ -286,33 +302,34 @@ impl VentedServer {
/// Takes three attempts to retrieve a connection for the given node. /// Takes three attempts to retrieve a connection for the given node.
/// First it tries to use the already established connection stored in the shared connections vector. /// First it tries to use the already established connection stored in the shared connections vector.
/// If that fails it tries to establish a new connection to the node by using the known address /// If that fails it tries to establish a new connection to the node by using the known address
fn get_connection(&self, target: &String) -> VentedResult<CryptoStream> { fn get_connection(
context: ServerConnectionContext,
target: &String,
) -> VentedResult<CryptoStream> {
log::trace!("Trying to connect to {}", target); log::trace!("Trying to connect to {}", target);
if let Some(stream) = self.connections.lock().get(target) {
if let Some(stream) = context.connections.lock().get(target) {
log::trace!("Reusing existing connection."); log::trace!("Reusing existing connection.");
return Ok(CryptoStream::clone(stream)); return Ok(CryptoStream::clone(stream));
} }
let target_node = { let target_node = context
self.known_nodes .known_nodes
.lock() .lock()
.get(target) .get(target)
.cloned() .cloned()
.ok_or(VentedError::UnknownNode(target.clone()))? .ok_or(VentedError::UnknownNode(target.clone()))?;
};
if let Some(address) = target_node.address { if let Some(address) = target_node.address {
log::trace!("Connecting to known address"); log::trace!("Connecting to known address");
match self.connect(address) {
Ok(stream) => {
return Ok(stream);
}
Err(e) => log::error!("Failed to connect to node '{}': {}", target, e),
}
}
log::trace!("All direct connection attempts to {} failed", target); Self::connect(context, address)
} else {
log::trace!("All direct connection attempts to {} failed", target);
Err(VentedError::UnreachableNode(target.clone())) Err(VentedError::UnreachableNode(target.clone()))
}
} }
/// Establishes a crypto stream for the given stream /// Establishes a crypto stream for the given stream
@ -337,17 +354,20 @@ impl VentedServer {
} }
/// Connects to the given address as a tcp client /// Connects to the given address as a tcp client
fn connect(&self, address: String) -> VentedResult<CryptoStream> { fn connect(
mut context: ServerConnectionContext,
address: String,
) -> VentedResult<CryptoStream> {
let stream = TcpStream::connect(address)?; let stream = TcpStream::connect(address)?;
let mut context = self.get_server_context();
context.is_server = false; context.is_server = false;
let connections = Arc::clone(&context.connections); let connections = Arc::clone(&context.connections);
let stream = Self::get_crypto_stream(context.clone(), stream)?; let pool = context.pool.clone();
let event_handler = Arc::clone(&context.event_handler);
let stream = Self::get_crypto_stream(context, stream)?;
self.pool.execute({ pool.execute({
let stream = CryptoStream::clone(&stream); let stream = CryptoStream::clone(&stream);
let event_handler = Arc::clone(&self.event_handler);
move || { move || {
event_handler.lock().handle_event(Event::new(READY_EVENT)); event_handler.lock().handle_event(Event::new(READY_EVENT));

@ -1,4 +1,5 @@
use crate::event::Event; use crate::event::Event;
use crate::result::VentedError;
use crate::server::data::Node; use crate::server::data::Node;
use crate::server::VentedServer; use crate::server::VentedServer;
use executors::Executor; use executors::Executor;
@ -103,7 +104,7 @@ impl VentedServer {
move |event| { move |event| {
let payload = event.get_payload::<RedirectResponsePayload>().ok()?; let payload = event.get_payload::<RedirectResponsePayload>().ok()?;
let mut future = redirect_handles.lock().remove(&payload.id)?; let mut future = redirect_handles.lock().remove(&payload.id)?;
future.set_value(true); future.resolve(());
None None
} }
@ -113,7 +114,7 @@ impl VentedServer {
move |event| { move |event| {
let payload = event.get_payload::<RedirectResponsePayload>().ok()?; let payload = event.get_payload::<RedirectResponsePayload>().ok()?;
let mut future = redirect_handles.lock().remove(&payload.id)?; let mut future = redirect_handles.lock().remove(&payload.id)?;
future.set_value(false); future.reject(VentedError::Rejected);
None None
} }

@ -91,21 +91,21 @@ fn test_server_communication() {
}); });
server_b server_b
.emit("A".to_string(), Event::new(NODE_LIST_REQUEST_EVENT)) .emit("A".to_string(), Event::new(NODE_LIST_REQUEST_EVENT))
.unwrap(); .block_unwrap();
server_c server_c
.emit("A".to_string(), Event::new("ping".to_string())) .emit("A".to_string(), Event::new("ping".to_string()))
.unwrap(); .block_unwrap();
for _ in 0..9 { for _ in 0..9 {
server_b server_b
.emit("A".to_string(), Event::new("ping".to_string())) .emit("A".to_string(), Event::new("ping".to_string()))
.unwrap(); .block_unwrap();
} }
server_a server_a
.emit("B".to_string(), Event::new("pong".to_string())) .emit("B".to_string(), Event::new("pong".to_string()))
.unwrap(); .block_unwrap();
server_b server_b
.emit("C".to_string(), Event::new("ping".to_string())) .emit("C".to_string(), Event::new("ping".to_string()))
.unwrap(); .block_unwrap();
// wait one second to make sure the servers were able to process the events // wait one second to make sure the servers were able to process the events
for _ in 0..100 { for _ in 0..100 {

Loading…
Cancel
Save