diff --git a/Cargo.toml b/Cargo.toml index 5db4a4b..22cabf9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "vented" description = "Event driven encrypted tcp communicaton" -version = "0.3.1" +version = "0.4.0" authors = ["trivernis "] edition = "2018" readme = "README.md" diff --git a/src/event/mod.rs b/src/event/mod.rs index c58e2ca..9343a33 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -16,7 +16,7 @@ pub struct EmptyPayload {} /// A single event that has a name and payload. /// The payload is encoded with message pack -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct Event { pub name: String, pub payload: Vec, diff --git a/src/server/data.rs b/src/server/data.rs index a02f1f0..d6492e3 100644 --- a/src/server/data.rs +++ b/src/server/data.rs @@ -43,14 +43,6 @@ impl Future { } } - /// Creates the future with an already resolved value - pub fn with_value(value: T) -> Self { - Self { - value: Arc::new(Mutex::new(Some(value))), - wg: None, - } - } - /// Sets the value of the future consuming the wait group pub fn set_value(&mut self, value: T) { self.value.lock().replace(value); diff --git a/src/server/mod.rs b/src/server/mod.rs index 6c215bc..18021df 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -11,8 +11,10 @@ use crate::result::VentedError::UnknownNode; use crate::result::{VentedError, VentedResult}; use crate::server::data::{Future, Node, ServerConnectionContext}; use crate::server::server_events::{ - AuthPayload, ChallengePayload, NodeInformationPayload, VersionMismatchPayload, ACCEPT_EVENT, - AUTH_EVENT, CHALLENGE_EVENT, CONNECT_EVENT, MISMATCH_EVENT, READY_EVENT, REJECT_EVENT, + AuthPayload, ChallengePayload, NodeInformationPayload, RedirectPayload, + RedirectResponsePayload, VersionMismatchPayload, ACCEPT_EVENT, AUTH_EVENT, CHALLENGE_EVENT, + CONNECT_EVENT, MISMATCH_EVENT, READY_EVENT, REDIRECT_CONFIRM_EVENT, REDIRECT_EVENT, + REDIRECT_FAIL_EVENT, REDIRECT_REDIRECTED_EVENT, REJECT_EVENT, }; use crossbeam_utils::sync::WaitGroup; use parking_lot::Mutex; @@ -69,6 +71,7 @@ pub struct VentedServer { event_handler: Arc>, global_secret_key: SecretKey, node_id: String, + redirect_handles: Arc>>>, } impl VentedServer { @@ -82,7 +85,7 @@ impl VentedServer { nodes: Vec, num_threads: usize, ) -> Self { - Self { + let mut server = Self { node_id, event_handler: Arc::new(Mutex::new(EventHandler::new())), listener_pool: Arc::new(Mutex::new(ScheduledThreadPool::with_name( @@ -97,7 +100,11 @@ impl VentedServer { forwarded_connections: Arc::new(Mutex::new(HashMap::new())), global_secret_key: secret_key, known_nodes: Arc::new(Mutex::new(nodes)), - } + redirect_handles: Arc::new(Mutex::new(HashMap::new())), + }; + server.register_events(); + + server } /// Returns the nodeId of the server @@ -115,19 +122,24 @@ impl VentedServer { /// With the returned wait group one can wait for the event to be written. pub fn emit(&self, node_id: String, event: Event) -> VentedResult { let wg = WaitGroup::new(); - let stream = self.get_connection(node_id)?; - - self.sender_pool.lock().execute({ - let wg = WaitGroup::clone(&wg); - let connections = Arc::clone(&self.connections); - move || { - if let Err(e) = stream.send(event) { - log::error!("Failed to send event: {}", e); - connections.lock().remove(stream.receiver_node()); + if let Ok(stream) = self.get_connection(&node_id) { + self.sender_pool.lock().execute({ + let wg = WaitGroup::clone(&wg); + let connections = Arc::clone(&self.connections); + move || { + if let Err(e) = stream.send(event) { + log::error!("Failed to send event: {}", e); + connections.lock().remove(stream.receiver_node()); + } + std::mem::drop(wg); } - std::mem::drop(wg); - } - }); + }); + } else { + log::trace!( + "Trying to redirect the event to a different node to be sent to target node..." + ); + self.send_event_redirected(node_id, event)?; + } Ok(wg) } @@ -177,6 +189,88 @@ impl VentedServer { wg2 } + /// Registers default server events + fn register_events(&mut self) { + self.on(REDIRECT_CONFIRM_EVENT, { + let redirect_handles = Arc::clone(&self.redirect_handles); + move |event| { + let payload = event.get_payload::().ok()?; + let mut future = redirect_handles.lock().remove(&payload.id)?; + future.set_value(true); + + None + } + }); + self.on(REDIRECT_FAIL_EVENT, { + let redirect_handles = Arc::clone(&self.redirect_handles); + move |event| { + let payload = event.get_payload::().ok()?; + let mut future = redirect_handles.lock().remove(&payload.id)?; + future.set_value(false); + + None + } + }); + self.on(REDIRECT_EVENT, { + let connections = Arc::clone(&self.connections); + + move |event| { + let payload = event.get_payload::().ok()?; + let stream = connections.lock().get(&payload.target)?.clone(); + if stream + .send(Event::with_payload(REDIRECT_REDIRECTED_EVENT, &payload)) + .is_ok() + { + Some(Event::with_payload( + REDIRECT_CONFIRM_EVENT, + &RedirectResponsePayload { id: payload.id }, + )) + } else { + Some(Event::with_payload( + REDIRECT_FAIL_EVENT, + &RedirectResponsePayload { id: payload.id }, + )) + } + } + }); + self.on(REDIRECT_REDIRECTED_EVENT, { + let event_handler = Arc::clone(&self.event_handler); + let connections = Arc::clone(&self.connections); + let pool = Arc::clone(&self.sender_pool); + + move |event| { + let payload = event.get_payload::().ok()?; + let event = Event::from_bytes(&mut &payload.content[..]).ok()?; + let proxy_stream = connections.lock().get(&payload.proxy)?.clone(); + + pool.lock().execute({ + let event_handler = Arc::clone(&event_handler); + move || { + let response = event_handler.lock().handle_event(event); + let event = response.map(|mut value| { + Event::with_payload( + REDIRECT_EVENT, + &RedirectPayload::new( + payload.target, + payload.proxy, + payload.source, + value.as_bytes(), + ), + ) + }); + if let Some(event) = event { + proxy_stream + .send(event) + .expect("Failed to respond to redirected event."); + } + } + }); + + None + } + }) + } + /// Returns a copy of the servers metadata fn get_server_context(&self) -> ServerConnectionContext { ServerConnectionContext { @@ -191,6 +285,36 @@ impl VentedServer { } } + /// Tries to send an event redirected by emitting a redirect event to all public nodes + fn send_event_redirected(&self, target: String, event: Event) -> VentedResult<()> { + let public_nodes = self + .known_nodes + .lock() + .iter() + .filter(|node| node.address.is_some()) + .cloned() + .collect::>(); + for node in public_nodes { + let payload = RedirectPayload::new( + self.node_id.clone(), + node.id.clone(), + target.clone(), + event.clone().as_bytes(), + ); + let mut future = Future::new(); + self.redirect_handles + .lock() + .insert(payload.id, Future::clone(&future)); + self.emit(node.id, Event::with_payload(REDIRECT_EVENT, &payload))?; + + if future.get_value() { + return Ok(()); + } + } + + Err(VentedError::UnknownNode(target)) + } + /// Handles a single connection by first performing a key exchange and /// then establishing an encrypted connection fn handle_connection(params: ServerConnectionContext, stream: TcpStream) -> VentedResult<()> { @@ -240,9 +364,9 @@ impl VentedServer { /// Takes three attempts to retrieve a connection for the given node. /// First it tries to use the already established connection stored in the shared connections vector. /// If that fails it tries to establish a new connection to the node by using the known address - fn get_connection(&self, target: String) -> VentedResult { + fn get_connection(&self, target: &String) -> VentedResult { log::trace!("Trying to connect to {}", target); - if let Some(stream) = self.connections.lock().get(&target) { + if let Some(stream) = self.connections.lock().get(target) { log::trace!("Reusing existing connection."); return Ok(CryptoStream::clone(stream)); } @@ -251,7 +375,7 @@ impl VentedServer { self.known_nodes .lock() .iter() - .find(|node| node.id == target) + .find(|node| &node.id == target) .cloned() .ok_or(VentedError::UnknownNode(target.clone()))? }; @@ -267,7 +391,7 @@ impl VentedServer { log::debug!("All connection attempts to {} failed!", target); - Err(VentedError::NotAServer(target)) + Err(VentedError::NotAServer(target.clone())) } /// Establishes a crypto stream for the given stream diff --git a/src/server/router.rs b/src/server/router.rs deleted file mode 100644 index e69de29..0000000 diff --git a/src/server/server_events.rs b/src/server/server_events.rs index 9b7d390..466f3db 100644 --- a/src/server/server_events.rs +++ b/src/server/server_events.rs @@ -1,3 +1,4 @@ +use rand::{thread_rng, RngCore}; use serde::{Deserialize, Serialize}; pub(crate) const CONNECT_EVENT: &str = "conn:connect"; @@ -6,6 +7,10 @@ pub(crate) const CHALLENGE_EVENT: &str = "conn:challenge"; pub(crate) const ACCEPT_EVENT: &str = "conn:accept"; pub(crate) const REJECT_EVENT: &str = "conn:reject"; pub(crate) const MISMATCH_EVENT: &str = "conn:reject_version_mismatch"; +pub(crate) const REDIRECT_EVENT: &str = "conn:redirect"; +pub(crate) const REDIRECT_CONFIRM_EVENT: &str = "conn:redirect_confirm"; +pub(crate) const REDIRECT_FAIL_EVENT: &str = "conn:redirect_failed"; +pub(crate) const REDIRECT_REDIRECTED_EVENT: &str = "conn:redirect_redirected"; pub const READY_EVENT: &str = "server:ready"; #[derive(Serialize, Deserialize, Debug)] @@ -39,3 +44,32 @@ impl VersionMismatchPayload { } } } + +#[derive(Serialize, Deserialize, Clone)] +pub(crate) struct RedirectPayload { + pub(crate) source: String, + pub(crate) proxy: String, + pub(crate) target: String, + pub(crate) content: Vec, + pub(crate) id: [u8; 16], +} + +impl RedirectPayload { + pub fn new(source: String, proxy: String, target: String, content: Vec) -> Self { + let mut id = [0u8; 16]; + thread_rng().fill_bytes(&mut id); + + Self { + source, + target, + content, + proxy, + id, + } + } +} + +#[derive(Serialize, Deserialize)] +pub(crate) struct RedirectResponsePayload { + pub(crate) id: [u8; 16], +} diff --git a/tests/test_communication.rs b/tests/test_communication.rs index 27dcb1a..33568cb 100644 --- a/tests/test_communication.rs +++ b/tests/test_communication.rs @@ -98,16 +98,17 @@ fn test_server_communication() { .emit("B".to_string(), Event::new("pong".to_string())) .unwrap(); wg.wait(); - assert!(server_b + let wg = server_b .emit("C".to_string(), Event::new("ping".to_string())) - .is_err()); + .unwrap(); + wg.wait(); // wait one second to make sure the servers were able to process the events for _ in 0..100 { thread::sleep(Duration::from_millis(10)); } - assert_eq!(ping_c_count.load(Ordering::SeqCst), 0); + assert_eq!(ping_c_count.load(Ordering::SeqCst), 1); assert_eq!(ready_count.load(Ordering::SeqCst), 3); assert_eq!(ping_count.load(Ordering::SeqCst), 10); assert_eq!(pong_count.load(Ordering::SeqCst), 10);