Fix event redirect handling

Signed-off-by: trivernis <trivernis@protonmail.com>
pull/2/head
trivernis 4 years ago
parent f9988b62c1
commit 19c1cdb649
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

@ -1,7 +1,7 @@
[package] [package]
name = "vented" name = "vented"
description = "Event driven encrypted tcp communicaton" description = "Event driven encrypted tcp communicaton"
version = "0.11.3" version = "0.11.4"
authors = ["trivernis <trivernis@protonmail.com>"] authors = ["trivernis <trivernis@protonmail.com>"]
edition = "2018" edition = "2018"
readme = "README.md" readme = "README.md"

@ -325,23 +325,27 @@ impl VentedServer {
async fn read_stream( async fn read_stream(
mut stream: CryptoStream, mut stream: CryptoStream,
connections: Arc<Mutex<HashMap<String, CryptoStream>>>, connections: Arc<Mutex<HashMap<String, CryptoStream>>>,
mut handler: EventHandler, handler: EventHandler,
) { ) {
loop { loop {
match stream.read().await { match stream.read().await {
Ok(mut event) => { Ok(mut event) => {
event.origin = Some(stream.receiver_node().clone()); let mut handler = handler.clone();
let results = handler.handle_event(event).await; let mut stream = stream.clone();
for result in results { task::spawn(async move {
if let Err(e) = stream.send(result).await { event.origin = Some(stream.receiver_node().clone());
log::error!( let results = handler.handle_event(event).await;
"Failed to send event to {}: {}", for result in results {
stream.receiver_node(), if let Err(e) = stream.send(result).await {
e log::error!(
); "Failed to send event to {}: {}",
break; stream.receiver_node(),
e
);
stream.shutdown().expect("Failed to shutdown stream");
}
} }
} });
} }
Err(e) => { Err(e) => {
log::error!( log::error!(
@ -354,6 +358,7 @@ impl VentedServer {
} }
} }
connections.lock().remove(stream.receiver_node()); connections.lock().remove(stream.receiver_node());
stream.shutdown().expect("Failed to shutdown stream");
} }
/// Takes three attempts to retrieve a connection for the given node. /// Takes three attempts to retrieve a connection for the given node.

@ -6,6 +6,7 @@
use std::sync::Arc; use std::sync::Arc;
use async_std::task;
use rand::{thread_rng, RngCore}; use rand::{thread_rng, RngCore};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use x25519_dalek::PublicKey; use x25519_dalek::PublicKey;
@ -137,13 +138,21 @@ impl VentedServer {
let connections = Arc::clone(&connections); let connections = Arc::clone(&connections);
Box::pin(async move { Box::pin(async move {
let payload = event.get_payload::<RedirectPayload>().ok()?; let payload = event.get_payload::<RedirectPayload>().ok()?;
if payload.source == event.origin? { if payload.source == event.origin? {
log::trace!(
"Handling redirect from {} via {} to {}",
payload.source,
payload.proxy,
payload.target
);
let opt_stream = connections.lock().get(&payload.target).cloned(); let opt_stream = connections.lock().get(&payload.target).cloned();
if let Some(mut stream) = opt_stream { if let Some(mut stream) = opt_stream {
if let Ok(_) = stream if let Ok(_) = stream
.send(Event::with_payload(REDIRECT_REDIRECTED_EVENT, &payload)) .send(Event::with_payload(REDIRECT_REDIRECTED_EVENT, &payload))
.await .await
{ {
log::trace!("Redirect succeeded");
return Some(Event::with_payload( return Some(Event::with_payload(
REDIRECT_CONFIRM_EVENT, REDIRECT_CONFIRM_EVENT,
&RedirectResponsePayload { id: payload.id }, &RedirectResponsePayload { id: payload.id },
@ -152,6 +161,7 @@ impl VentedServer {
} }
} }
log::trace!("Redirect failed");
Some(Event::with_payload( Some(Event::with_payload(
REDIRECT_FAIL_EVENT, REDIRECT_FAIL_EVENT,
&RedirectResponsePayload { id: payload.id }, &RedirectResponsePayload { id: payload.id },
@ -168,32 +178,44 @@ impl VentedServer {
let mut event_handler = event_handler.clone(); let mut event_handler = event_handler.clone();
Box::pin(async move { Box::pin(async move {
let payload = event.get_payload::<RedirectPayload>().ok()?; let payload = event.get_payload::<RedirectPayload>().ok()?;
let event = Event::from(&mut &payload.content[..]).ok()?;
let origin = event.origin.clone()?; let origin = event.origin.clone()?;
let event = Event::from(&mut &payload.content[..]).ok()?;
let responses = event_handler.handle_event(event).await; log::trace!("Spawning task to handle redirect responses");
let responses = responses task::spawn(async move {
.iter() log::trace!("Emitting redirected event...");
.cloned() let responses = event_handler.handle_event(event).await;
.map(|mut value| { log::trace!("Mapping responses...");
let payload = payload.clone(); let responses = responses
Event::with_payload( .iter()
REDIRECT_EVENT, .cloned()
&RedirectPayload::new( .map(|mut value| {
payload.target, let payload = payload.clone();
payload.proxy, Event::with_payload(
payload.source, REDIRECT_EVENT,
value.as_bytes(), &RedirectPayload::new(
), payload.target,
) payload.proxy,
}) payload.source,
.collect::<Vec<Event>>(); value.as_bytes(),
let opt_stream = connections.lock().get(&origin).cloned(); ),
if let Some(mut stream) = opt_stream { )
for response in responses { })
stream.send(response).await.ok()?; .collect::<Vec<Event>>();
let opt_stream = connections.lock().get(&origin).cloned();
log::trace!("Sending responses...");
if let Some(mut stream) = opt_stream {
for response in responses {
if let Err(e) = stream.send(response).await {
log::error!("Failed to send response events: {}", e);
connections.lock().remove(stream.receiver_node());
stream.shutdown().expect("Failed to shutdown stream");
}
}
} }
} });
log::trace!("Done");
None None
}) })

@ -1,7 +1,7 @@
use async_std::task; use async_std::task;
use crypto_box::SecretKey; use crypto_box::SecretKey;
use log::LevelFilter; use log::LevelFilter;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use vented::event::Event; use vented::event::Event;
@ -23,6 +23,7 @@ fn test_server_communication() {
setup(); setup();
let ping_count = Arc::new(AtomicUsize::new(0)); let ping_count = Arc::new(AtomicUsize::new(0));
let pong_count = Arc::new(AtomicUsize::new(0)); let pong_count = Arc::new(AtomicUsize::new(0));
let c_pinged = Arc::new(AtomicBool::new(false));
let mut rng = rand::thread_rng(); let mut rng = rand::thread_rng();
let global_secret_a = SecretKey::generate(&mut rng); let global_secret_a = SecretKey::generate(&mut rng);
let global_secret_b = SecretKey::generate(&mut rng); let global_secret_b = SecretKey::generate(&mut rng);
@ -71,7 +72,7 @@ fn test_server_communication() {
nodes.clone(), nodes.clone(),
ServerTimeouts::default(), ServerTimeouts::default(),
); );
let server_c = VentedServer::new( let mut server_c = VentedServer::new(
"C".to_string(), "C".to_string(),
global_secret_c, global_secret_c,
nodes, nodes,
@ -100,6 +101,16 @@ fn test_server_communication() {
}) })
} }
}); });
server_c.on("ping", {
let c_pinged = Arc::clone(&c_pinged);
move |_| {
let c_pinged = Arc::clone(&c_pinged);
Box::pin(async move {
c_pinged.store(true, Ordering::Relaxed);
None
})
}
});
for i in 0..10 { for i in 0..10 {
assert!(server_a assert!(server_a
.emit(format!("Nodes-{}", i), Event::new("ping")) .emit(format!("Nodes-{}", i), Event::new("ping"))
@ -114,6 +125,10 @@ fn test_server_communication() {
.emit("A", Event::new("ping".to_string())) .emit("A", Event::new("ping".to_string()))
.await .await
.unwrap(); .unwrap();
server_b
.emit("C", Event::new("ping".to_string()))
.await
.unwrap();
for _ in 0..9 { for _ in 0..9 {
server_b server_b
.emit("A", Event::new("ping".to_string())) .emit("A", Event::new("ping".to_string()))
@ -124,14 +139,11 @@ fn test_server_communication() {
.emit("B", Event::new("pong".to_string())) .emit("B", Event::new("pong".to_string()))
.await .await
.unwrap(); .unwrap();
server_b task::sleep(Duration::from_secs(2)).await;
.emit("C", Event::new("ping".to_string()))
.await
.unwrap();
task::sleep(Duration::from_secs(1)).await;
}); });
// wait one second to make sure the servers were able to process the events // wait one second to make sure the servers were able to process the events
assert_eq!(ping_count.load(Ordering::SeqCst), 10); assert_eq!(ping_count.load(Ordering::SeqCst), 10);
assert_eq!(pong_count.load(Ordering::SeqCst), 10); assert_eq!(pong_count.load(Ordering::SeqCst), 10);
assert!(c_pinged.load(Ordering::SeqCst));
} }

Loading…
Cancel
Save