From 99efe4d587c886d79f2bf12dbb1288c9997571fe Mon Sep 17 00:00:00 2001 From: trivernis Date: Wed, 4 Nov 2020 21:15:00 +0100 Subject: [PATCH] Add event emitting to server Signed-off-by: trivernis --- src/crypto/mod.rs | 7 +- src/result.rs | 4 + src/server/data.rs | 24 ++++ src/server/mod.rs | 233 +++++++++++++++++++++++++++--------- tests/test_communication.rs | 56 +++++++++ 5 files changed, 265 insertions(+), 59 deletions(-) create mode 100644 src/server/data.rs diff --git a/src/crypto/mod.rs b/src/crypto/mod.rs index f0351a9..a8d4feb 100644 --- a/src/crypto/mod.rs +++ b/src/crypto/mod.rs @@ -26,6 +26,7 @@ pub struct CryptoStream { impl CryptoStream { /// Creates a new crypto stream from a given Tcp Stream and with a given secret pub fn new(inner: TcpStream, secret_box: crypto_box::ChaChaBox) -> VentedResult { + inner.set_nonblocking(false)?; let send_stream = Arc::new(Mutex::new(inner.try_clone()?)); let recv_stream = Arc::new(Mutex::new(inner)); @@ -66,12 +67,12 @@ impl CryptoStream { /// Reads an event from the stream. Blocks until data is received pub fn read(&self) -> VentedResult { let mut stream = self.recv_stream.lock(); - let mut length_raw = [0u8; 64]; + let mut length_raw = [0u8; 8]; stream.read_exact(&mut length_raw)?; let length = BigEndian::read_u64(&length_raw); let mut ciphertext = vec![0u8; length as usize]; - stream.read_exact(&mut ciphertext)?; + stream.read(&mut ciphertext)?; let number = self.recv_count.fetch_add(1, Ordering::SeqCst); let nonce = generate_nonce(number); @@ -91,7 +92,7 @@ impl CryptoStream { fn generate_nonce(number: usize) -> GenericArray { let result = sha2::Sha256::digest(&number.to_be_bytes()).to_vec(); let mut nonce = [0u8; 24]; - nonce.copy_from_slice(&result); + nonce.copy_from_slice(&result[0..24]); nonce.into() } diff --git a/src/result.rs b/src/result.rs index c12e9ca..326aac0 100644 --- a/src/result.rs +++ b/src/result.rs @@ -6,6 +6,8 @@ pub type VentedResult = Result; #[derive(Debug)] pub enum VentedError { NameDecodingError, + NotReady, + NotAServer(String), IOError(io::Error), SerializeError(rmp_serde::encode::Error), DeserializeError(rmp_serde::decode::Error), @@ -24,6 +26,8 @@ impl fmt::Display for VentedError { Self::CryptoError(e) => write!(f, "Cryptography Error: {}", e), Self::UnexpectedEvent(e) => write!(f, "Received unexpected event: {}", e), Self::UnknownNode(n) => write!(f, "Received connection from unknown node: {}", n), + Self::NotReady => write!(f, "The connection is still being established."), + Self::NotAServer(n) => write!(f, "The given node {} is not a server", n), } } } diff --git a/src/server/data.rs b/src/server/data.rs new file mode 100644 index 0000000..0b7a939 --- /dev/null +++ b/src/server/data.rs @@ -0,0 +1,24 @@ +use crate::crypto::CryptoStream; +use crate::event_handler::EventHandler; +use crypto_box::SecretKey; +use parking_lot::Mutex; +use scheduled_thread_pool::ScheduledThreadPool; +use std::collections::HashMap; +use std::sync::Arc; + +#[derive(Clone, Debug)] +pub struct Node { + pub id: String, + pub address: Option, +} + +#[derive(Clone)] +pub(crate) struct ServerConnectionContext { + pub is_server: bool, + pub secret_key: SecretKey, + pub own_node_id: String, + pub known_nodes: Arc>>, + pub event_handler: Arc>, + pub connections: Arc>>, + pub listener_pool: Arc>, +} diff --git a/src/server/mod.rs b/src/server/mod.rs index 0c7e3fd..9c2fe0e 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -8,111 +8,225 @@ use crate::crypto::CryptoStream; use crate::event::Event; use crate::event_handler::EventHandler; use crate::result::{VentedError, VentedResult}; +use crate::server::data::{Node, ServerConnectionContext}; use crate::server::server_events::{ NodeInformationPayload, CONNECT_EVENT, CONN_ACCEPT_EVENT, CONN_REJECT_EVENT, }; use parking_lot::Mutex; use std::io::Write; use std::sync::Arc; +use std::thread; +pub mod data; pub(crate) mod server_events; /// The vented server that provides parallel handling of connections pub struct VentedServer { connections: Arc>>, - known_nodes: Arc>>, - listener_pool: ScheduledThreadPool, - sender_pool: ScheduledThreadPool, + known_nodes: Arc>>, + listener_pool: Arc>, + sender_pool: Arc>, event_handler: Arc>, secret_key: SecretKey, node_id: String, } impl VentedServer { - pub fn new(node_id: String, nodes: Vec, num_threads: usize) -> Self { + pub fn new(node_id: String, nodes: Vec, num_threads: usize) -> Self { let mut rng = rand::thread_rng(); Self { node_id, event_handler: Arc::new(Mutex::new(EventHandler::new())), - listener_pool: ScheduledThreadPool::new(num_threads), - sender_pool: ScheduledThreadPool::new(num_threads), + listener_pool: Arc::new(Mutex::new(ScheduledThreadPool::new(num_threads))), + sender_pool: Arc::new(Mutex::new(ScheduledThreadPool::new(num_threads))), connections: Arc::new(Mutex::new(HashMap::new())), secret_key: SecretKey::generate(&mut rng), known_nodes: Arc::new(Mutex::new(nodes)), } } - /// Starts listening on the specified address (with port!) - pub fn listen(&mut self, address: &str) -> VentedResult<()> { - let listener = TcpListener::bind(address)?; - for connection in listener.incoming() { - match connection { - Ok(stream) => self.handle_connection(stream)?, - Err(e) => log::trace!("Failed to establish connection: {}", e), + /// Emits an event to the specified Node + pub fn emit(&self, node_id: String, event: Event) -> VentedResult<()> { + let handler = self.connections.lock().get(&node_id).cloned(); + + if let Some(handler) = handler { + self.sender_pool.lock().execute(move || { + handler.send(event).expect("Failed to send event"); + }); + Ok(()) + } else { + if let Some(node) = self.known_nodes.lock().iter().find(|n| n.id == node_id) { + if let Some(address) = &node.address { + let handler = self.connect(address.clone())?; + self.sender_pool.lock().execute(move || { + handler.send(event).expect("Failed to send event"); + }); + Ok(()) + } else { + Err(VentedError::NotAServer(node_id)) + } + } else { + Err(VentedError::UnknownNode(node_id)) } } + } - Ok(()) + /// Adds a handler for the given event + pub fn on(&mut self, event_name: &str, handler: F) + where + F: Fn(Event) -> Option + Send + Sync, + { + self.event_handler.lock().on(event_name, handler); } - /// Handles a single connection by first performing a key exchange and - /// then establishing an encrypted connection - fn handle_connection(&mut self, mut stream: TcpStream) -> VentedResult<()> { - let secret_key = self.secret_key.clone(); - let self_public_key = secret_key.public_key(); - let connections = Arc::clone(&self.connections); - let own_node_id = self.node_id.clone(); - let known_nodes = Arc::clone(&self.known_nodes); - let event_handler = Arc::clone(&self.event_handler); - - self.listener_pool.execute(move || { - match VentedServer::perform_key_exchange( - &mut stream, - &secret_key, - &self_public_key, - own_node_id, - known_nodes, - ) { - Ok((node_id, secret_box)) => { - let stream = CryptoStream::new(stream, secret_box) - .expect("Failed to create crypto stream"); - connections - .lock() - .insert(node_id, CryptoStream::clone(&stream)); - while let Ok(event) = stream.read() { - if let Some(response) = event_handler.lock().handle_event(event) { - stream.send(response).expect("Failed to send response"); + /// Starts listening on the specified address (with port!) + pub fn listen(&mut self, address: String) { + let context = self.get_server_context(); + + thread::spawn(move || match TcpListener::bind(address) { + Ok(listener) => { + for connection in listener.incoming() { + match connection { + Ok(stream) => { + if let Err(e) = Self::handle_connection(context.clone(), stream) { + log::error!("Failed to handle connection: {}", e); + } } + Err(e) => log::trace!("Failed to establish connection: {}", e), } } - Err(e) => log::error!("Failed to establish connection: {}", e), + } + Err(e) => log::error!("Failed to bind listener: {}", e), + }); + } + + /// Returns a copy of the servers metadata + fn get_server_context(&self) -> ServerConnectionContext { + ServerConnectionContext { + is_server: true, + own_node_id: self.node_id.clone(), + secret_key: self.secret_key.clone(), + known_nodes: Arc::clone(&self.known_nodes), + connections: Arc::clone(&self.connections), + event_handler: Arc::clone(&self.event_handler), + listener_pool: Arc::clone(&self.listener_pool), + } + } + + /// Handles a single connection by first performing a key exchange and + /// then establishing an encrypted connection + fn handle_connection(params: ServerConnectionContext, stream: TcpStream) -> VentedResult<()> { + let pool = Arc::clone(¶ms.listener_pool); + let event_handler = Arc::clone(¶ms.event_handler); + + pool.lock().execute(move || { + let stream = VentedServer::get_crypto_stream(params, stream).expect("Listener failed"); + while let Ok(event) = stream.read() { + if let Some(response) = event_handler.lock().handle_event(event) { + stream.send(response).expect("Failed to send response"); + } } }); + Ok(()) } - /// Emits an event to the specified Node - pub fn emit(&self, node_id: &str, event: Event) -> bool { - let handler = self.connections.lock().get(node_id).cloned(); + fn get_crypto_stream( + params: ServerConnectionContext, + mut stream: TcpStream, + ) -> VentedResult { + let (node_id, secret_box) = VentedServer::perform_key_exchange( + params.is_server, + &mut stream, + ¶ms.secret_key, + params.own_node_id, + params.known_nodes, + )?; - if let Some(handler) = handler { - self.sender_pool.execute(move || { - handler.send(event).expect("Failed to send event"); - }); - true + let stream = CryptoStream::new(stream, secret_box)?; + params + .connections + .lock() + .insert(node_id, CryptoStream::clone(&stream)); + + Ok(stream) + } + + /// Connects to the given address as a tcp client + fn connect(&self, address: String) -> VentedResult { + let stream = TcpStream::connect(address)?; + let mut context = self.get_server_context(); + context.is_server = false; + + let stream = Self::get_crypto_stream(context, stream)?; + self.listener_pool.lock().execute({ + let stream = CryptoStream::clone(&stream); + let event_handler = Arc::clone(&self.event_handler); + move || { + while let Ok(event) = stream.read() { + if let Some(response) = event_handler.lock().handle_event(event) { + stream.send(response).expect("Failed to send response"); + } + } + } + }); + + Ok(stream) + } + + /// Performs a key exchange + fn perform_key_exchange( + is_server: bool, + stream: &mut TcpStream, + secret_key: &SecretKey, + own_node_id: String, + known_nodes: Arc>>, + ) -> VentedResult<(String, ChaChaBox)> { + if is_server { + Self::perform_server_key_exchange(stream, secret_key, own_node_id, known_nodes) } else { - false + Self::perform_client_key_exchange(stream, secret_key, own_node_id) } } + /// Performs the client side DH key exchange + fn perform_client_key_exchange( + mut stream: &mut TcpStream, + secret_key: &SecretKey, + own_node_id: String, + ) -> VentedResult<(String, ChaChaBox)> { + stream.write( + &Event::with_payload( + CONNECT_EVENT.to_string(), + &NodeInformationPayload { + public_key: secret_key.public_key().to_bytes(), + node_id: own_node_id, + }, + ) + .as_bytes(), + )?; + stream.flush()?; + let event = Event::from_bytes(&mut stream)?; + if event.name != CONN_ACCEPT_EVENT { + return Err(VentedError::UnknownNode(event.name)); + } + let NodeInformationPayload { + public_key, + node_id, + } = event.get_payload::().unwrap(); + let public_key = PublicKey::from(public_key); + let secret_box = ChaChaBox::new(&public_key, &secret_key); + + Ok((node_id, secret_box)) + } + /// Performs a DH key exchange by using the crypto_box module and events /// On success it returns a secret box with the established secret and the node id of the client - fn perform_key_exchange( + fn perform_server_key_exchange( mut stream: &mut TcpStream, secret_key: &SecretKey, - self_public_key: &PublicKey, own_node_id: String, - known_nodes: Arc>>, + known_nodes: Arc>>, ) -> VentedResult<(String, ChaChaBox)> { let event = Event::from_bytes(&mut stream)?; if event.name != CONNECT_EVENT { @@ -124,8 +238,14 @@ impl VentedServer { } = event.get_payload::().unwrap(); let public_key = PublicKey::from(public_key); - if !known_nodes.lock().contains(&node_id) { + if known_nodes + .lock() + .iter() + .find(|n| n.id == node_id) + .is_none() + { stream.write(&Event::new(CONN_REJECT_EVENT.to_string()).as_bytes())?; + stream.flush()?; return Err(VentedError::UnknownNode(node_id)); } @@ -134,12 +254,13 @@ impl VentedServer { &Event::with_payload( CONN_ACCEPT_EVENT.to_string(), &NodeInformationPayload { - public_key: self_public_key.to_bytes(), + public_key: secret_key.public_key().to_bytes(), node_id: own_node_id, }, ) .as_bytes(), )?; + stream.flush()?; Ok((node_id, secret_box)) } diff --git a/tests/test_communication.rs b/tests/test_communication.rs index 8b13789..a9ebdf8 100644 --- a/tests/test_communication.rs +++ b/tests/test_communication.rs @@ -1 +1,57 @@ +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use std::thread; +use std::time::Duration; +use vented::event::Event; +use vented::server::data::Node; +use vented::server::VentedServer; +#[test] +fn test_server_communication() { + let ping_count = Arc::new(AtomicUsize::new(0)); + let pong_count = Arc::new(AtomicUsize::new(0)); + let nodes = vec![ + Node { + id: "A".to_string(), + address: Some("localhost:22222".to_string()), + }, + Node { + id: "B".to_string(), + address: None, + }, + ]; + let mut server_a = VentedServer::new("A".to_string(), nodes.clone(), 2); + let mut server_b = VentedServer::new("B".to_string(), nodes, 2); + server_a.listen("localhost:22222".to_string()); + thread::sleep(Duration::from_millis(10)); + + server_a.on("ping", { + let ping_count = Arc::clone(&ping_count); + move |_| { + ping_count.fetch_add(1, Ordering::Relaxed); + + Some(Event::new("pong".to_string())) + } + }); + server_b.on("pong", { + let pong_count = Arc::clone(&pong_count); + move |_| { + pong_count.fetch_add(1, Ordering::Relaxed); + None + } + }); + for _ in 0..10 { + server_b + .emit("A".to_string(), Event::new("ping".to_string())) + .unwrap(); + } + server_a + .emit("B".to_string(), Event::new("pong".to_string())) + .unwrap(); + + // wait one second to make sure the servers were able to process the events + thread::sleep(Duration::from_secs(1)); + + assert_eq!(ping_count.load(Ordering::Relaxed), 10); + assert_eq!(pong_count.load(Ordering::Relaxed), 11); +}