diff --git a/Cargo.toml b/Cargo.toml index af80508..9c35149 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,4 +19,5 @@ rand = "0.7.3" sha2 = "0.9.2" generic-array = "0.14.4" typenum = "1.12.0" -x25519-dalek = "1.1.0" \ No newline at end of file +x25519-dalek = "1.1.0" +crossbeam-utils = "0.8.0" \ No newline at end of file diff --git a/README.md b/README.md index 7ec7dcf..5d67e11 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,45 @@ # Vented -Vented is an event based TCP server that uses message pack for payload data. \ No newline at end of file +Vented is an event based TCP server with encryption that uses message pack for payload data. + +## Encryption + +Vented uses key cryptography to encrypt the connection between the client and the serve. +The authenticity of both parties is validated by global public keys that need to be known +to both parties beforehand. The encryption itself uses randomly generated keys and a nonce +that corresponds to the message number. The crate used for encryption is [crypto_box](https://crates.io/crates/crypto_box) +which the [XChaCha20Poly1305](https://github.com/RustCrypto/AEADs/tree/master/chacha20poly1305) encryption. +The crate used for the key exchanges is [x25519-dalek](https://crates.io/crates/x25519-dalek). + +## Usage + + ```rust +use vented::server::VentedServer; +use vented::server::data::Node; +use vented::crypto::SecretKey; +use rand::thread_rng; +use vented::event::Event; + +fn main() { + let nodes = vec![ + Node { + id: "B".to_string(), + address: None, + public_key: global_secret_b.public_key() // load it from somewhere + }, + ]; + // in a real world example the secret key needs to be loaded from somewhere because connections + // with unknown keys are not accepted. + let global_secret = SecretKey::new(&mut thread_rng()); + let mut server = VentedServer::new("A".to_string(), global_secret, nodes.clone(), 4); + + + server.listen("localhost:20000".to_string()); + server.on("pong", |_event| { + println!("Pong!"); + + None // the return value is the response event Option + }); + server.emit("B".to_string(), Event::new("ping".to_string())).unwrap(); +} + ``` \ No newline at end of file diff --git a/src/crypto/mod.rs b/src/crypto/mod.rs index a8d4feb..e8e4fbc 100644 --- a/src/crypto/mod.rs +++ b/src/crypto/mod.rs @@ -13,6 +13,8 @@ use typenum::U24; use crate::event::Event; use crate::result::VentedResult; +pub use crypto_box::SecretKey; + /// A cryptographical stream object that handles encryption and decryption of streams #[derive(Clone)] pub struct CryptoStream { diff --git a/src/event_handler/mod.rs b/src/event_handler/mod.rs index ebe6d3a..cea2fda 100644 --- a/src/event_handler/mod.rs +++ b/src/event_handler/mod.rs @@ -20,8 +20,8 @@ impl EventHandler { /// Adds a handler for the given event pub fn on(&mut self, event_name: &str, handler: F) - where - F: Fn(Event) -> Option + Send + Sync, + where + F: Fn(Event) -> Option + Send + Sync, { match self.event_handlers.get_mut(event_name) { Some(handlers) => handlers.push(Box::new(handler)), diff --git a/src/server/mod.rs b/src/server/mod.rs index c677e3b..fbaa753 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -14,6 +14,7 @@ use crate::server::server_events::{ AuthPayload, NodeInformationPayload, AUTH_EVENT, CONNECT_EVENT, CONN_ACCEPT_EVENT, CONN_CHALLENGE_EVENT, CONN_REJECT_EVENT, READY_EVENT, }; +use crossbeam_utils::sync::WaitGroup; use parking_lot::Mutex; use std::io::Write; use std::sync::Arc; @@ -24,6 +25,35 @@ pub mod data; pub mod server_events; /// The vented server that provides parallel handling of connections +/// Usage: +/// ```rust +/// use vented::server::VentedServer; +/// use vented::server::data::Node; +/// use vented::crypto::SecretKey; +/// use rand::thread_rng; +/// use vented::event::Event; +/// +/// let nodes = vec![ +/// Node { +/// id: "B".to_string(), +/// address: None, +/// public_key: global_secret_b.public_key() // load it from somewhere +/// }, +///]; +/// // in a real world example the secret key needs to be loaded from somewhere because connections +/// // with unknown keys are not accepted. +/// let global_secret = SecretKey::new(&mut thread_rng()); +/// let mut server = VentedServer::new("A".to_string(), global_secret, nodes.clone(), 4); +/// +/// +/// server.listen("localhost:20000".to_string()); +/// server.on("pong", |_event| { +/// println!("Pong!"); +/// +/// None // the return value is the response event Option +/// }); +/// server.emit("B".to_string(), Event::new("ping".to_string())).unwrap(); +/// ``` pub struct VentedServer { connections: Arc>>, known_nodes: Arc>>, @@ -35,6 +65,10 @@ pub struct VentedServer { } impl VentedServer { + /// Creates a new vented server with a given node_id and secret key that are + /// used to authenticate against other servers. + /// The given nodes are used for authentication. + /// The server runs with 2x the given amount of threads. pub fn new( node_id: String, secret_key: SecretKey, @@ -53,14 +87,19 @@ impl VentedServer { } /// Emits an event to the specified Node - pub fn emit(&self, node_id: String, event: Event) -> VentedResult<()> { + /// The actual writing is done in a separate thread from the thread pool. + /// With the returned waitgroup one can wait for the event to be written. + pub fn emit(&self, node_id: String, event: Event) -> VentedResult { let handler = self.connections.lock().get(&node_id).cloned(); + let wg = WaitGroup::new(); + let wg2 = WaitGroup::clone(&wg); if let Some(handler) = handler { self.sender_pool.lock().execute(move || { handler.send(event).expect("Failed to send event"); + std::mem::drop(wg); }); - Ok(()) + Ok(wg2) } else { let found_node = self .known_nodes @@ -73,8 +112,9 @@ impl VentedServer { let handler = self.connect(address.clone())?; self.sender_pool.lock().execute(move || { handler.send(event).expect("Failed to send event"); + std::mem::drop(wg); }); - Ok(()) + Ok(wg2) } else { Err(VentedError::NotAServer(node_id)) } @@ -84,7 +124,10 @@ impl VentedServer { } } - /// Adds a handler for the given event + /// Adds a handler for the given event. + /// The event returned by the handler is returned to the server. + /// If there is more than one handler, the response will be piped to the next handler. + /// The oder is by order of insertion. The first registered handler will be executed first. pub fn on(&mut self, event_name: &str, handler: F) where F: Fn(Event) -> Option + Send + Sync, @@ -93,11 +136,18 @@ impl VentedServer { } /// Starts listening on the specified address (with port!) - pub fn listen(&mut self, address: String) { + /// This will cause a new thread to start up so that the method returns immediately + /// With the returned wait group one can wait for the server to be ready. + /// The method can be called multiple times to start listeners on multiple ports. + pub fn listen(&mut self, address: String) -> WaitGroup { let context = self.get_server_context(); + let wg = WaitGroup::new(); + let wg2 = WaitGroup::clone(&wg); - thread::spawn(move || match TcpListener::bind(address) { + thread::spawn(move || match TcpListener::bind(&address) { Ok(listener) => { + log::info!("Listener running on {}", address); + std::mem::drop(wg); for connection in listener.incoming() { match connection { Ok(stream) => { @@ -109,8 +159,13 @@ impl VentedServer { } } } - Err(e) => log::error!("Failed to bind listener: {}", e), + Err(e) => { + log::error!("Failed to bind listener: {}", e); + std::mem::drop(wg); + } }); + + wg2 } /// Returns a copy of the servers metadata diff --git a/tests/test_communication.rs b/tests/test_communication.rs index 9547167..d438002 100644 --- a/tests/test_communication.rs +++ b/tests/test_communication.rs @@ -30,8 +30,8 @@ fn test_server_communication() { ]; let mut server_a = VentedServer::new("A".to_string(), global_secret_a, nodes.clone(), 4); let mut server_b = VentedServer::new("B".to_string(), global_secret_b, nodes, 4); - server_a.listen("localhost:22222".to_string()); - thread::sleep(Duration::from_millis(10)); + let wg = server_a.listen("localhost:22222".to_string()); + wg.wait(); server_a.on("ping", { let ping_count = Arc::clone(&ping_count); @@ -63,14 +63,15 @@ fn test_server_communication() { } }); for _ in 0..10 { - server_b + let wg = server_b .emit("A".to_string(), Event::new("ping".to_string())) .unwrap(); - thread::sleep(Duration::from_millis(10)); + wg.wait(); } - server_a + let wg = server_a .emit("B".to_string(), Event::new("pong".to_string())) .unwrap(); + wg.wait(); // wait one second to make sure the servers were able to process the events for _ in 0..100 {