Add event redirection via proxy nodes

Signed-off-by: trivernis <trivernis@protonmail.com>
pull/1/head
trivernis 4 years ago
parent 32f15a2c89
commit 12d17f09c9
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

@ -1,7 +1,7 @@
[package] [package]
name = "vented" name = "vented"
description = "Event driven encrypted tcp communicaton" description = "Event driven encrypted tcp communicaton"
version = "0.3.1" version = "0.4.0"
authors = ["trivernis <trivernis@protonmail.com>"] authors = ["trivernis <trivernis@protonmail.com>"]
edition = "2018" edition = "2018"
readme = "README.md" readme = "README.md"

@ -16,7 +16,7 @@ pub struct EmptyPayload {}
/// A single event that has a name and payload. /// A single event that has a name and payload.
/// The payload is encoded with message pack /// The payload is encoded with message pack
#[derive(Clone, Debug)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Event { pub struct Event {
pub name: String, pub name: String,
pub payload: Vec<u8>, pub payload: Vec<u8>,

@ -43,14 +43,6 @@ impl<T> Future<T> {
} }
} }
/// Creates the future with an already resolved value
pub fn with_value(value: T) -> Self {
Self {
value: Arc::new(Mutex::new(Some(value))),
wg: None,
}
}
/// Sets the value of the future consuming the wait group /// Sets the value of the future consuming the wait group
pub fn set_value(&mut self, value: T) { pub fn set_value(&mut self, value: T) {
self.value.lock().replace(value); self.value.lock().replace(value);

@ -11,8 +11,10 @@ use crate::result::VentedError::UnknownNode;
use crate::result::{VentedError, VentedResult}; use crate::result::{VentedError, VentedResult};
use crate::server::data::{Future, Node, ServerConnectionContext}; use crate::server::data::{Future, Node, ServerConnectionContext};
use crate::server::server_events::{ use crate::server::server_events::{
AuthPayload, ChallengePayload, NodeInformationPayload, VersionMismatchPayload, ACCEPT_EVENT, AuthPayload, ChallengePayload, NodeInformationPayload, RedirectPayload,
AUTH_EVENT, CHALLENGE_EVENT, CONNECT_EVENT, MISMATCH_EVENT, READY_EVENT, REJECT_EVENT, RedirectResponsePayload, VersionMismatchPayload, ACCEPT_EVENT, AUTH_EVENT, CHALLENGE_EVENT,
CONNECT_EVENT, MISMATCH_EVENT, READY_EVENT, REDIRECT_CONFIRM_EVENT, REDIRECT_EVENT,
REDIRECT_FAIL_EVENT, REDIRECT_REDIRECTED_EVENT, REJECT_EVENT,
}; };
use crossbeam_utils::sync::WaitGroup; use crossbeam_utils::sync::WaitGroup;
use parking_lot::Mutex; use parking_lot::Mutex;
@ -69,6 +71,7 @@ pub struct VentedServer {
event_handler: Arc<Mutex<EventHandler>>, event_handler: Arc<Mutex<EventHandler>>,
global_secret_key: SecretKey, global_secret_key: SecretKey,
node_id: String, node_id: String,
redirect_handles: Arc<Mutex<HashMap<[u8; 16], Future<bool>>>>,
} }
impl VentedServer { impl VentedServer {
@ -82,7 +85,7 @@ impl VentedServer {
nodes: Vec<Node>, nodes: Vec<Node>,
num_threads: usize, num_threads: usize,
) -> Self { ) -> Self {
Self { let mut server = Self {
node_id, node_id,
event_handler: Arc::new(Mutex::new(EventHandler::new())), event_handler: Arc::new(Mutex::new(EventHandler::new())),
listener_pool: Arc::new(Mutex::new(ScheduledThreadPool::with_name( listener_pool: Arc::new(Mutex::new(ScheduledThreadPool::with_name(
@ -97,7 +100,11 @@ impl VentedServer {
forwarded_connections: Arc::new(Mutex::new(HashMap::new())), forwarded_connections: Arc::new(Mutex::new(HashMap::new())),
global_secret_key: secret_key, global_secret_key: secret_key,
known_nodes: Arc::new(Mutex::new(nodes)), known_nodes: Arc::new(Mutex::new(nodes)),
} redirect_handles: Arc::new(Mutex::new(HashMap::new())),
};
server.register_events();
server
} }
/// Returns the nodeId of the server /// Returns the nodeId of the server
@ -115,8 +122,7 @@ impl VentedServer {
/// With the returned wait group one can wait for the event to be written. /// With the returned wait group one can wait for the event to be written.
pub fn emit(&self, node_id: String, event: Event) -> VentedResult<WaitGroup> { pub fn emit(&self, node_id: String, event: Event) -> VentedResult<WaitGroup> {
let wg = WaitGroup::new(); let wg = WaitGroup::new();
let stream = self.get_connection(node_id)?; if let Ok(stream) = self.get_connection(&node_id) {
self.sender_pool.lock().execute({ self.sender_pool.lock().execute({
let wg = WaitGroup::clone(&wg); let wg = WaitGroup::clone(&wg);
let connections = Arc::clone(&self.connections); let connections = Arc::clone(&self.connections);
@ -128,6 +134,12 @@ impl VentedServer {
std::mem::drop(wg); std::mem::drop(wg);
} }
}); });
} else {
log::trace!(
"Trying to redirect the event to a different node to be sent to target node..."
);
self.send_event_redirected(node_id, event)?;
}
Ok(wg) Ok(wg)
} }
@ -177,6 +189,88 @@ impl VentedServer {
wg2 wg2
} }
/// Registers default server events
fn register_events(&mut self) {
self.on(REDIRECT_CONFIRM_EVENT, {
let redirect_handles = Arc::clone(&self.redirect_handles);
move |event| {
let payload = event.get_payload::<RedirectResponsePayload>().ok()?;
let mut future = redirect_handles.lock().remove(&payload.id)?;
future.set_value(true);
None
}
});
self.on(REDIRECT_FAIL_EVENT, {
let redirect_handles = Arc::clone(&self.redirect_handles);
move |event| {
let payload = event.get_payload::<RedirectResponsePayload>().ok()?;
let mut future = redirect_handles.lock().remove(&payload.id)?;
future.set_value(false);
None
}
});
self.on(REDIRECT_EVENT, {
let connections = Arc::clone(&self.connections);
move |event| {
let payload = event.get_payload::<RedirectPayload>().ok()?;
let stream = connections.lock().get(&payload.target)?.clone();
if stream
.send(Event::with_payload(REDIRECT_REDIRECTED_EVENT, &payload))
.is_ok()
{
Some(Event::with_payload(
REDIRECT_CONFIRM_EVENT,
&RedirectResponsePayload { id: payload.id },
))
} else {
Some(Event::with_payload(
REDIRECT_FAIL_EVENT,
&RedirectResponsePayload { id: payload.id },
))
}
}
});
self.on(REDIRECT_REDIRECTED_EVENT, {
let event_handler = Arc::clone(&self.event_handler);
let connections = Arc::clone(&self.connections);
let pool = Arc::clone(&self.sender_pool);
move |event| {
let payload = event.get_payload::<RedirectPayload>().ok()?;
let event = Event::from_bytes(&mut &payload.content[..]).ok()?;
let proxy_stream = connections.lock().get(&payload.proxy)?.clone();
pool.lock().execute({
let event_handler = Arc::clone(&event_handler);
move || {
let response = event_handler.lock().handle_event(event);
let event = response.map(|mut value| {
Event::with_payload(
REDIRECT_EVENT,
&RedirectPayload::new(
payload.target,
payload.proxy,
payload.source,
value.as_bytes(),
),
)
});
if let Some(event) = event {
proxy_stream
.send(event)
.expect("Failed to respond to redirected event.");
}
}
});
None
}
})
}
/// Returns a copy of the servers metadata /// Returns a copy of the servers metadata
fn get_server_context(&self) -> ServerConnectionContext { fn get_server_context(&self) -> ServerConnectionContext {
ServerConnectionContext { ServerConnectionContext {
@ -191,6 +285,36 @@ impl VentedServer {
} }
} }
/// Tries to send an event redirected by emitting a redirect event to all public nodes
fn send_event_redirected(&self, target: String, event: Event) -> VentedResult<()> {
let public_nodes = self
.known_nodes
.lock()
.iter()
.filter(|node| node.address.is_some())
.cloned()
.collect::<Vec<Node>>();
for node in public_nodes {
let payload = RedirectPayload::new(
self.node_id.clone(),
node.id.clone(),
target.clone(),
event.clone().as_bytes(),
);
let mut future = Future::new();
self.redirect_handles
.lock()
.insert(payload.id, Future::clone(&future));
self.emit(node.id, Event::with_payload(REDIRECT_EVENT, &payload))?;
if future.get_value() {
return Ok(());
}
}
Err(VentedError::UnknownNode(target))
}
/// Handles a single connection by first performing a key exchange and /// Handles a single connection by first performing a key exchange and
/// then establishing an encrypted connection /// then establishing an encrypted connection
fn handle_connection(params: ServerConnectionContext, stream: TcpStream) -> VentedResult<()> { fn handle_connection(params: ServerConnectionContext, stream: TcpStream) -> VentedResult<()> {
@ -240,9 +364,9 @@ impl VentedServer {
/// Takes three attempts to retrieve a connection for the given node. /// Takes three attempts to retrieve a connection for the given node.
/// First it tries to use the already established connection stored in the shared connections vector. /// First it tries to use the already established connection stored in the shared connections vector.
/// If that fails it tries to establish a new connection to the node by using the known address /// If that fails it tries to establish a new connection to the node by using the known address
fn get_connection(&self, target: String) -> VentedResult<CryptoStream> { fn get_connection(&self, target: &String) -> VentedResult<CryptoStream> {
log::trace!("Trying to connect to {}", target); log::trace!("Trying to connect to {}", target);
if let Some(stream) = self.connections.lock().get(&target) { if let Some(stream) = self.connections.lock().get(target) {
log::trace!("Reusing existing connection."); log::trace!("Reusing existing connection.");
return Ok(CryptoStream::clone(stream)); return Ok(CryptoStream::clone(stream));
} }
@ -251,7 +375,7 @@ impl VentedServer {
self.known_nodes self.known_nodes
.lock() .lock()
.iter() .iter()
.find(|node| node.id == target) .find(|node| &node.id == target)
.cloned() .cloned()
.ok_or(VentedError::UnknownNode(target.clone()))? .ok_or(VentedError::UnknownNode(target.clone()))?
}; };
@ -267,7 +391,7 @@ impl VentedServer {
log::debug!("All connection attempts to {} failed!", target); log::debug!("All connection attempts to {} failed!", target);
Err(VentedError::NotAServer(target)) Err(VentedError::NotAServer(target.clone()))
} }
/// Establishes a crypto stream for the given stream /// Establishes a crypto stream for the given stream

@ -1,3 +1,4 @@
use rand::{thread_rng, RngCore};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
pub(crate) const CONNECT_EVENT: &str = "conn:connect"; pub(crate) const CONNECT_EVENT: &str = "conn:connect";
@ -6,6 +7,10 @@ pub(crate) const CHALLENGE_EVENT: &str = "conn:challenge";
pub(crate) const ACCEPT_EVENT: &str = "conn:accept"; pub(crate) const ACCEPT_EVENT: &str = "conn:accept";
pub(crate) const REJECT_EVENT: &str = "conn:reject"; pub(crate) const REJECT_EVENT: &str = "conn:reject";
pub(crate) const MISMATCH_EVENT: &str = "conn:reject_version_mismatch"; pub(crate) const MISMATCH_EVENT: &str = "conn:reject_version_mismatch";
pub(crate) const REDIRECT_EVENT: &str = "conn:redirect";
pub(crate) const REDIRECT_CONFIRM_EVENT: &str = "conn:redirect_confirm";
pub(crate) const REDIRECT_FAIL_EVENT: &str = "conn:redirect_failed";
pub(crate) const REDIRECT_REDIRECTED_EVENT: &str = "conn:redirect_redirected";
pub const READY_EVENT: &str = "server:ready"; pub const READY_EVENT: &str = "server:ready";
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
@ -39,3 +44,32 @@ impl VersionMismatchPayload {
} }
} }
} }
#[derive(Serialize, Deserialize, Clone)]
pub(crate) struct RedirectPayload {
pub(crate) source: String,
pub(crate) proxy: String,
pub(crate) target: String,
pub(crate) content: Vec<u8>,
pub(crate) id: [u8; 16],
}
impl RedirectPayload {
pub fn new(source: String, proxy: String, target: String, content: Vec<u8>) -> Self {
let mut id = [0u8; 16];
thread_rng().fill_bytes(&mut id);
Self {
source,
target,
content,
proxy,
id,
}
}
}
#[derive(Serialize, Deserialize)]
pub(crate) struct RedirectResponsePayload {
pub(crate) id: [u8; 16],
}

@ -98,16 +98,17 @@ fn test_server_communication() {
.emit("B".to_string(), Event::new("pong".to_string())) .emit("B".to_string(), Event::new("pong".to_string()))
.unwrap(); .unwrap();
wg.wait(); wg.wait();
assert!(server_b let wg = server_b
.emit("C".to_string(), Event::new("ping".to_string())) .emit("C".to_string(), Event::new("ping".to_string()))
.is_err()); .unwrap();
wg.wait();
// wait one second to make sure the servers were able to process the events // wait one second to make sure the servers were able to process the events
for _ in 0..100 { for _ in 0..100 {
thread::sleep(Duration::from_millis(10)); thread::sleep(Duration::from_millis(10));
} }
assert_eq!(ping_c_count.load(Ordering::SeqCst), 0); assert_eq!(ping_c_count.load(Ordering::SeqCst), 1);
assert_eq!(ready_count.load(Ordering::SeqCst), 3); assert_eq!(ready_count.load(Ordering::SeqCst), 3);
assert_eq!(ping_count.load(Ordering::SeqCst), 10); assert_eq!(ping_count.load(Ordering::SeqCst), 10);
assert_eq!(pong_count.load(Ordering::SeqCst), 10); assert_eq!(pong_count.load(Ordering::SeqCst), 10);

Loading…
Cancel
Save