From bf72aeeeb8c616b552ec42c7761242a7bf42f5fd Mon Sep 17 00:00:00 2001 From: trivernis Date: Wed, 4 Nov 2020 17:37:45 +0100 Subject: [PATCH] Rewrite server to use custom encryption Signed-off-by: trivernis --- Cargo.toml | 6 +- src/client/mod.rs | 9 --- src/client/tcp/mod.rs | 23 ------ src/crypto/mod.rs | 97 +++++++++++++++++++++++ src/event_handler/mod.rs | 4 +- src/lib.rs | 2 +- src/result.rs | 20 +++-- src/server/mod.rs | 151 +++++++++++++++++++++++++++++++++--- src/server/server_events.rs | 12 ++- src/server/tcp/mod.rs | 75 ------------------ src/server/tls/mod.rs | 83 -------------------- tests/test_communication.rs | 45 ----------- 12 files changed, 267 insertions(+), 260 deletions(-) delete mode 100644 src/client/mod.rs delete mode 100644 src/client/tcp/mod.rs create mode 100644 src/crypto/mod.rs delete mode 100644 src/server/tcp/mod.rs delete mode 100644 src/server/tls/mod.rs diff --git a/Cargo.toml b/Cargo.toml index c292d85..62a07a1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,4 +14,8 @@ byteorder = "1.3.4" parking_lot = "0.11.0" scheduled-thread-pool = "0.2.5" log = "0.4.11" -native-tls = "0.2.4" \ No newline at end of file +crypto_box = "0.5.0" +rand = "0.7.3" +sha2 = "0.9.2" +generic-array = "0.14.4" +typenum = "1.12.0" \ No newline at end of file diff --git a/src/client/mod.rs b/src/client/mod.rs deleted file mode 100644 index fedec60..0000000 --- a/src/client/mod.rs +++ /dev/null @@ -1,9 +0,0 @@ -use crate::event::Event; -use crate::result::VentedResult; - -pub mod tcp; - -pub trait VentedClient: Sized { - fn connect(address: &str) -> VentedResult; - fn emit(&mut self, event: Event) -> VentedResult; -} diff --git a/src/client/tcp/mod.rs b/src/client/tcp/mod.rs deleted file mode 100644 index b208f6d..0000000 --- a/src/client/tcp/mod.rs +++ /dev/null @@ -1,23 +0,0 @@ -use crate::client::VentedClient; -use crate::event::Event; -use crate::result::VentedResult; -use std::io::Write; -use std::net::TcpStream; - -pub struct VentedTcpClient { - connection: TcpStream, -} - -impl VentedClient for VentedTcpClient { - fn connect(address: &str) -> VentedResult { - Ok(Self { - connection: TcpStream::connect(address)?, - }) - } - - fn emit(&mut self, mut event: Event) -> VentedResult { - self.connection.write(&event.as_bytes())?; - - Event::from_bytes(&mut self.connection) - } -} diff --git a/src/crypto/mod.rs b/src/crypto/mod.rs new file mode 100644 index 0000000..f0351a9 --- /dev/null +++ b/src/crypto/mod.rs @@ -0,0 +1,97 @@ +use std::io::{Read, Write}; +use std::net::TcpStream; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; + +use byteorder::{BigEndian, ByteOrder}; +use crypto_box::aead::{Aead, Payload}; +use parking_lot::Mutex; +use sha2::digest::generic_array::GenericArray; +use sha2::Digest; +use typenum::U24; + +use crate::event::Event; +use crate::result::VentedResult; + +/// A cryptographical stream object that handles encryption and decryption of streams +#[derive(Clone)] +pub struct CryptoStream { + send_stream: Arc>, + recv_stream: Arc>, + sent_count: Arc, + recv_count: Arc, + secret_box: Arc>, +} + +impl CryptoStream { + /// Creates a new crypto stream from a given Tcp Stream and with a given secret + pub fn new(inner: TcpStream, secret_box: crypto_box::ChaChaBox) -> VentedResult { + let send_stream = Arc::new(Mutex::new(inner.try_clone()?)); + let recv_stream = Arc::new(Mutex::new(inner)); + + Ok(Self { + send_stream, + recv_stream, + sent_count: Arc::new(AtomicUsize::new(0)), + recv_count: Arc::new(AtomicUsize::new(0)), + secret_box: Arc::new(Mutex::new(secret_box)), + }) + } + + /// Sends a new event encrypted + /// format: + /// length: u64 + /// data: length + pub fn send(&self, mut event: Event) -> VentedResult<()> { + let number = self.sent_count.fetch_add(1, Ordering::SeqCst); + let nonce = generate_nonce(number); + let ciphertext = self.secret_box.lock().encrypt( + &nonce, + Payload { + msg: &event.as_bytes(), + aad: &[], + }, + )?; + let mut stream = self.send_stream.lock(); + let mut length_raw = [0u8; 8]; + BigEndian::write_u64(&mut length_raw, ciphertext.len() as u64); + + stream.write(&length_raw)?; + stream.write(&ciphertext)?; + stream.flush()?; + + Ok(()) + } + + /// Reads an event from the stream. Blocks until data is received + pub fn read(&self) -> VentedResult { + let mut stream = self.recv_stream.lock(); + let mut length_raw = [0u8; 64]; + stream.read_exact(&mut length_raw)?; + + let length = BigEndian::read_u64(&length_raw); + let mut ciphertext = vec![0u8; length as usize]; + stream.read_exact(&mut ciphertext)?; + + let number = self.recv_count.fetch_add(1, Ordering::SeqCst); + let nonce = generate_nonce(number); + let plaintext = self.secret_box.lock().decrypt( + &nonce, + Payload { + msg: &ciphertext, + aad: &[], + }, + )?; + + Event::from_bytes(&mut &plaintext[..]) + } +} + +/// Generates a nonce by hashing the input number which is the message counter +fn generate_nonce(number: usize) -> GenericArray { + let result = sha2::Sha256::digest(&number.to_be_bytes()).to_vec(); + let mut nonce = [0u8; 24]; + nonce.copy_from_slice(&result); + + nonce.into() +} diff --git a/src/event_handler/mod.rs b/src/event_handler/mod.rs index cea2fda..ebe6d3a 100644 --- a/src/event_handler/mod.rs +++ b/src/event_handler/mod.rs @@ -20,8 +20,8 @@ impl EventHandler { /// Adds a handler for the given event pub fn on(&mut self, event_name: &str, handler: F) - where - F: Fn(Event) -> Option + Send + Sync, + where + F: Fn(Event) -> Option + Send + Sync, { match self.event_handlers.get_mut(event_name) { Some(handlers) => handlers.push(Box::new(handler)), diff --git a/src/lib.rs b/src/lib.rs index c10e72d..e002755 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,4 @@ -pub mod client; +pub mod crypto; pub mod event; pub mod event_handler; pub mod result; diff --git a/src/result.rs b/src/result.rs index 7124342..c12e9ca 100644 --- a/src/result.rs +++ b/src/result.rs @@ -7,19 +7,23 @@ pub type VentedResult = Result; pub enum VentedError { NameDecodingError, IOError(io::Error), - TLSError(native_tls::Error), SerializeError(rmp_serde::encode::Error), DeserializeError(rmp_serde::decode::Error), + CryptoError(crypto_box::aead::Error), + UnexpectedEvent(String), + UnknownNode(String), } impl fmt::Display for VentedError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { Self::NameDecodingError => write!(f, "Failed to decode event name"), - Self::IOError(e) => write!(f, "IO Error: {}", e.to_string()), - Self::SerializeError(e) => write!(f, "Serialization Error: {}", e.to_string()), - Self::DeserializeError(e) => write!(f, "Deserialization Error: {}", e.to_string()), - Self::TLSError(e) => write!(f, "TLS Error: {}", e.to_string()), + Self::IOError(e) => write!(f, "IO Error: {}", e), + Self::SerializeError(e) => write!(f, "Serialization Error: {}", e), + Self::DeserializeError(e) => write!(f, "Deserialization Error: {}", e), + Self::CryptoError(e) => write!(f, "Cryptography Error: {}", e), + Self::UnexpectedEvent(e) => write!(f, "Received unexpected event: {}", e), + Self::UnknownNode(n) => write!(f, "Received connection from unknown node: {}", n), } } } @@ -44,8 +48,8 @@ impl From for VentedError { } } -impl From for VentedError { - fn from(other: native_tls::Error) -> Self { - Self::TLSError(other) +impl From for VentedError { + fn from(other: crypto_box::aead::Error) -> Self { + Self::CryptoError(other) } } diff --git a/src/server/mod.rs b/src/server/mod.rs index 06950a3..0c7e3fd 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -1,13 +1,146 @@ +use std::collections::HashMap; +use std::net::{TcpListener, TcpStream}; + +use crypto_box::{ChaChaBox, PublicKey, SecretKey}; +use scheduled_thread_pool::ScheduledThreadPool; + +use crate::crypto::CryptoStream; use crate::event::Event; -use crate::result::VentedResult; +use crate::event_handler::EventHandler; +use crate::result::{VentedError, VentedResult}; +use crate::server::server_events::{ + NodeInformationPayload, CONNECT_EVENT, CONN_ACCEPT_EVENT, CONN_REJECT_EVENT, +}; +use parking_lot::Mutex; +use std::io::Write; +use std::sync::Arc; pub(crate) mod server_events; -pub mod tcp; -pub mod tls; - -pub trait VentedServer { - fn listen(&mut self, address: &str) -> VentedResult<()>; - fn on(&mut self, event_name: &str, handler: F) - where - F: Fn(Event) -> Option + Send + Sync; + +/// The vented server that provides parallel handling of connections +pub struct VentedServer { + connections: Arc>>, + known_nodes: Arc>>, + listener_pool: ScheduledThreadPool, + sender_pool: ScheduledThreadPool, + event_handler: Arc>, + secret_key: SecretKey, + node_id: String, +} + +impl VentedServer { + pub fn new(node_id: String, nodes: Vec, num_threads: usize) -> Self { + let mut rng = rand::thread_rng(); + Self { + node_id, + event_handler: Arc::new(Mutex::new(EventHandler::new())), + listener_pool: ScheduledThreadPool::new(num_threads), + sender_pool: ScheduledThreadPool::new(num_threads), + connections: Arc::new(Mutex::new(HashMap::new())), + secret_key: SecretKey::generate(&mut rng), + known_nodes: Arc::new(Mutex::new(nodes)), + } + } + + /// Starts listening on the specified address (with port!) + pub fn listen(&mut self, address: &str) -> VentedResult<()> { + let listener = TcpListener::bind(address)?; + for connection in listener.incoming() { + match connection { + Ok(stream) => self.handle_connection(stream)?, + Err(e) => log::trace!("Failed to establish connection: {}", e), + } + } + + Ok(()) + } + + /// Handles a single connection by first performing a key exchange and + /// then establishing an encrypted connection + fn handle_connection(&mut self, mut stream: TcpStream) -> VentedResult<()> { + let secret_key = self.secret_key.clone(); + let self_public_key = secret_key.public_key(); + let connections = Arc::clone(&self.connections); + let own_node_id = self.node_id.clone(); + let known_nodes = Arc::clone(&self.known_nodes); + let event_handler = Arc::clone(&self.event_handler); + + self.listener_pool.execute(move || { + match VentedServer::perform_key_exchange( + &mut stream, + &secret_key, + &self_public_key, + own_node_id, + known_nodes, + ) { + Ok((node_id, secret_box)) => { + let stream = CryptoStream::new(stream, secret_box) + .expect("Failed to create crypto stream"); + connections + .lock() + .insert(node_id, CryptoStream::clone(&stream)); + while let Ok(event) = stream.read() { + if let Some(response) = event_handler.lock().handle_event(event) { + stream.send(response).expect("Failed to send response"); + } + } + } + Err(e) => log::error!("Failed to establish connection: {}", e), + } + }); + Ok(()) + } + + /// Emits an event to the specified Node + pub fn emit(&self, node_id: &str, event: Event) -> bool { + let handler = self.connections.lock().get(node_id).cloned(); + + if let Some(handler) = handler { + self.sender_pool.execute(move || { + handler.send(event).expect("Failed to send event"); + }); + true + } else { + false + } + } + + /// Performs a DH key exchange by using the crypto_box module and events + /// On success it returns a secret box with the established secret and the node id of the client + fn perform_key_exchange( + mut stream: &mut TcpStream, + secret_key: &SecretKey, + self_public_key: &PublicKey, + own_node_id: String, + known_nodes: Arc>>, + ) -> VentedResult<(String, ChaChaBox)> { + let event = Event::from_bytes(&mut stream)?; + if event.name != CONNECT_EVENT { + return Err(VentedError::UnexpectedEvent(event.name)); + } + let NodeInformationPayload { + public_key, + node_id, + } = event.get_payload::().unwrap(); + let public_key = PublicKey::from(public_key); + + if !known_nodes.lock().contains(&node_id) { + stream.write(&Event::new(CONN_REJECT_EVENT.to_string()).as_bytes())?; + return Err(VentedError::UnknownNode(node_id)); + } + + let secret_box = ChaChaBox::new(&public_key, &secret_key); + stream.write( + &Event::with_payload( + CONN_ACCEPT_EVENT.to_string(), + &NodeInformationPayload { + public_key: self_public_key.to_bytes(), + node_id: own_node_id, + }, + ) + .as_bytes(), + )?; + + Ok((node_id, secret_box)) + } } diff --git a/src/server/server_events.rs b/src/server/server_events.rs index 96a6651..ddcd7fc 100644 --- a/src/server/server_events.rs +++ b/src/server/server_events.rs @@ -1,7 +1,11 @@ -use crate::event_handler::EventHandler; +use serde::{Deserialize, Serialize}; -pub(crate) fn get_server_event_handler() -> EventHandler { - let handler = EventHandler::new(); +pub(crate) const CONNECT_EVENT: &str = "client:connect"; +pub(crate) const CONN_ACCEPT_EVENT: &str = "server:conn_accept"; +pub(crate) const CONN_REJECT_EVENT: &str = "server:conn_reject"; - handler +#[derive(Serialize, Deserialize, Debug)] +pub(crate) struct NodeInformationPayload { + pub node_id: String, + pub public_key: [u8; 32], } diff --git a/src/server/tcp/mod.rs b/src/server/tcp/mod.rs deleted file mode 100644 index 6ccacba..0000000 --- a/src/server/tcp/mod.rs +++ /dev/null @@ -1,75 +0,0 @@ -use std::borrow::BorrowMut; -use std::net::{TcpListener, TcpStream}; -use std::sync::Arc; - -use parking_lot::Mutex; -use scheduled_thread_pool::ScheduledThreadPool; - -use crate::event::Event; -use crate::event_handler::EventHandler; -use crate::result::VentedResult; -use crate::server::server_events::get_server_event_handler; -use crate::server::VentedServer; -use std::io::Write; - -pub struct VentedTcpServer { - event_handler: Arc>, - pool: ScheduledThreadPool, -} - -impl VentedServer for VentedTcpServer { - /// Starts listening on the given address - fn listen(&mut self, address: &str) -> VentedResult<()> { - let listener = TcpListener::bind(address)?; - for stream in listener.incoming() { - log::trace!("Connection received."); - match stream { - Ok(stream) => self.handle_connection(stream), - Err(e) => log::error!("Failed to handle connection: {}", e), - } - } - - Ok(()) - } - - /// Registers an event on the internal event handler - fn on(&mut self, event_name: &str, handler: F) - where - F: Fn(Event) -> Option + Send + Sync, - { - self.event_handler - .lock() - .borrow_mut() - .on(event_name, handler); - } -} - -impl VentedTcpServer { - /// Creates a new server that runs on the specified number of threads - pub fn new(num_threads: usize) -> Self { - let event_handler = get_server_event_handler(); - let pool = ScheduledThreadPool::new(num_threads); - - Self { - event_handler: Arc::new(Mutex::new(event_handler)), - pool, - } - } - - /// Handles what happens on connection - fn handle_connection(&mut self, mut stream: TcpStream) { - let handler = Arc::clone(&self.event_handler); - self.pool.execute(move || loop { - if let Ok(event) = Event::from_bytes(&mut stream) { - if let Some(mut event) = handler.lock().handle_event(event) { - if let Err(e) = stream.write(&event.as_bytes()) { - log::error!("Failed to respond to event: {}", e) - } - } - } else { - log::warn!("Failed to create an Event from received bytes."); - break; - } - }); - } -} diff --git a/src/server/tls/mod.rs b/src/server/tls/mod.rs deleted file mode 100644 index a48af4a..0000000 --- a/src/server/tls/mod.rs +++ /dev/null @@ -1,83 +0,0 @@ -use crate::event::Event; -use crate::event_handler::EventHandler; -use crate::result::VentedResult; -use crate::server::server_events::get_server_event_handler; -use crate::server::VentedServer; -use native_tls::{Identity, TlsAcceptor}; -use parking_lot::Mutex; -use scheduled_thread_pool::ScheduledThreadPool; -use std::borrow::BorrowMut; -use std::io::Write; -use std::net::{TcpListener, TcpStream}; -use std::sync::Arc; - -pub struct VentedTlsServer { - event_handler: Arc>, - pool: ScheduledThreadPool, - identity: Identity, -} - -impl VentedServer for VentedTlsServer { - fn listen(&mut self, address: &str) -> VentedResult<()> { - let listener = TcpListener::bind(address)?; - let acceptor = TlsAcceptor::new(self.identity.clone())?; - let acceptor = Arc::new(acceptor); - - for stream in listener.incoming() { - log::trace!("Connection received."); - match stream { - Ok(stream) => self.handle_connection(stream, Arc::clone(&acceptor)), - Err(e) => log::error!("Failed to handle connection: {}", e), - } - } - - Ok(()) - } - - fn on(&mut self, event_name: &str, handler: F) - where - F: Fn(Event) -> Option + Send + Sync, - { - self.event_handler - .lock() - .borrow_mut() - .on(event_name, handler); - } -} - -impl VentedTlsServer { - /// Creates a new server that runs on the specified number of threads - /// with the given tls identity - pub fn new(num_threads: usize, identity: Identity) -> Self { - let event_handler = get_server_event_handler(); - let pool = ScheduledThreadPool::new(num_threads); - - Self { - event_handler: Arc::new(Mutex::new(event_handler)), - pool, - identity, - } - } - - fn handle_connection(&self, stream: TcpStream, acceptor: Arc) { - let handler = Arc::clone(&self.event_handler); - self.pool.execute(move || { - if let Ok(mut stream) = acceptor.accept(stream) { - loop { - if let Ok(event) = Event::from_bytes(&mut stream) { - if let Some(mut event) = handler.lock().handle_event(event) { - if let Err(e) = stream.write(&event.as_bytes()) { - log::error!("Failed to respond to event: {}", e) - } - } - } else { - log::warn!("Failed to create an Event from received bytes."); - break; - } - } - } else { - log::error!("TLS Error when handling connection") - } - }); - } -} diff --git a/tests/test_communication.rs b/tests/test_communication.rs index 8be6373..8b13789 100644 --- a/tests/test_communication.rs +++ b/tests/test_communication.rs @@ -1,46 +1 @@ -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; -use std::sync::Arc; -use std::thread; -use std::time::Duration; -use vented::client::tcp::VentedTcpClient; -use vented::client::VentedClient; -use vented::event::Event; -use vented::server::tcp::VentedTcpServer; -use vented::server::VentedServer; -#[test] -fn test_pong_event() { - static ADDRESS: &str = "localhost:22222"; - static PING: &str = "ping"; - static PONG: &str = "pong"; - let ping_count = Arc::new(AtomicUsize::new(0)); - let server_ready = Arc::new(AtomicBool::new(false)); - - let mut server = VentedTcpServer::new(1); - { - let ping_received = Arc::clone(&ping_count); - server.on(PING, move |_event| { - ping_received.fetch_add(1, Ordering::Relaxed); - - Some(Event::new(PONG.to_string())) - }); - } - - thread::spawn({ - let server_ready = Arc::clone(&server_ready); - move || { - server_ready.store(true, Ordering::Relaxed); - server.listen(ADDRESS).unwrap(); - } - }); - - while !server_ready.load(Ordering::Relaxed) { - thread::sleep(Duration::from_millis(1)); - } - let mut client = VentedTcpClient::connect(ADDRESS).unwrap(); - client.emit(Event::new(PING.to_string())).unwrap(); - let response = client.emit(Event::new(PING.to_string())).unwrap(); - - assert_eq!(ping_count.load(Ordering::Relaxed), 2); - assert_eq!(response.name, PONG); -}