Rewrite server to use custom encryption

Signed-off-by: trivernis <trivernis@protonmail.com>
pull/1/head
trivernis 4 years ago
parent 15221b6f59
commit bf72aeeeb8
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

@ -14,4 +14,8 @@ byteorder = "1.3.4"
parking_lot = "0.11.0"
scheduled-thread-pool = "0.2.5"
log = "0.4.11"
native-tls = "0.2.4"
crypto_box = "0.5.0"
rand = "0.7.3"
sha2 = "0.9.2"
generic-array = "0.14.4"
typenum = "1.12.0"

@ -1,9 +0,0 @@
use crate::event::Event;
use crate::result::VentedResult;
pub mod tcp;
pub trait VentedClient: Sized {
fn connect(address: &str) -> VentedResult<Self>;
fn emit(&mut self, event: Event) -> VentedResult<Event>;
}

@ -1,23 +0,0 @@
use crate::client::VentedClient;
use crate::event::Event;
use crate::result::VentedResult;
use std::io::Write;
use std::net::TcpStream;
pub struct VentedTcpClient {
connection: TcpStream,
}
impl VentedClient for VentedTcpClient {
fn connect(address: &str) -> VentedResult<Self> {
Ok(Self {
connection: TcpStream::connect(address)?,
})
}
fn emit(&mut self, mut event: Event) -> VentedResult<Event> {
self.connection.write(&event.as_bytes())?;
Event::from_bytes(&mut self.connection)
}
}

@ -0,0 +1,97 @@
use std::io::{Read, Write};
use std::net::TcpStream;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use byteorder::{BigEndian, ByteOrder};
use crypto_box::aead::{Aead, Payload};
use parking_lot::Mutex;
use sha2::digest::generic_array::GenericArray;
use sha2::Digest;
use typenum::U24;
use crate::event::Event;
use crate::result::VentedResult;
/// A cryptographical stream object that handles encryption and decryption of streams
#[derive(Clone)]
pub struct CryptoStream {
send_stream: Arc<Mutex<TcpStream>>,
recv_stream: Arc<Mutex<TcpStream>>,
sent_count: Arc<AtomicUsize>,
recv_count: Arc<AtomicUsize>,
secret_box: Arc<Mutex<crypto_box::ChaChaBox>>,
}
impl CryptoStream {
/// Creates a new crypto stream from a given Tcp Stream and with a given secret
pub fn new(inner: TcpStream, secret_box: crypto_box::ChaChaBox) -> VentedResult<Self> {
let send_stream = Arc::new(Mutex::new(inner.try_clone()?));
let recv_stream = Arc::new(Mutex::new(inner));
Ok(Self {
send_stream,
recv_stream,
sent_count: Arc::new(AtomicUsize::new(0)),
recv_count: Arc::new(AtomicUsize::new(0)),
secret_box: Arc::new(Mutex::new(secret_box)),
})
}
/// Sends a new event encrypted
/// format:
/// length: u64
/// data: length
pub fn send(&self, mut event: Event) -> VentedResult<()> {
let number = self.sent_count.fetch_add(1, Ordering::SeqCst);
let nonce = generate_nonce(number);
let ciphertext = self.secret_box.lock().encrypt(
&nonce,
Payload {
msg: &event.as_bytes(),
aad: &[],
},
)?;
let mut stream = self.send_stream.lock();
let mut length_raw = [0u8; 8];
BigEndian::write_u64(&mut length_raw, ciphertext.len() as u64);
stream.write(&length_raw)?;
stream.write(&ciphertext)?;
stream.flush()?;
Ok(())
}
/// Reads an event from the stream. Blocks until data is received
pub fn read(&self) -> VentedResult<Event> {
let mut stream = self.recv_stream.lock();
let mut length_raw = [0u8; 64];
stream.read_exact(&mut length_raw)?;
let length = BigEndian::read_u64(&length_raw);
let mut ciphertext = vec![0u8; length as usize];
stream.read_exact(&mut ciphertext)?;
let number = self.recv_count.fetch_add(1, Ordering::SeqCst);
let nonce = generate_nonce(number);
let plaintext = self.secret_box.lock().decrypt(
&nonce,
Payload {
msg: &ciphertext,
aad: &[],
},
)?;
Event::from_bytes(&mut &plaintext[..])
}
}
/// Generates a nonce by hashing the input number which is the message counter
fn generate_nonce(number: usize) -> GenericArray<u8, U24> {
let result = sha2::Sha256::digest(&number.to_be_bytes()).to_vec();
let mut nonce = [0u8; 24];
nonce.copy_from_slice(&result);
nonce.into()
}

@ -20,8 +20,8 @@ impl EventHandler {
/// Adds a handler for the given event
pub fn on<F: 'static>(&mut self, event_name: &str, handler: F)
where
F: Fn(Event) -> Option<Event> + Send + Sync,
where
F: Fn(Event) -> Option<Event> + Send + Sync,
{
match self.event_handlers.get_mut(event_name) {
Some(handlers) => handlers.push(Box::new(handler)),

@ -1,4 +1,4 @@
pub mod client;
pub mod crypto;
pub mod event;
pub mod event_handler;
pub mod result;

@ -7,19 +7,23 @@ pub type VentedResult<T> = Result<T, VentedError>;
pub enum VentedError {
NameDecodingError,
IOError(io::Error),
TLSError(native_tls::Error),
SerializeError(rmp_serde::encode::Error),
DeserializeError(rmp_serde::decode::Error),
CryptoError(crypto_box::aead::Error),
UnexpectedEvent(String),
UnknownNode(String),
}
impl fmt::Display for VentedError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::NameDecodingError => write!(f, "Failed to decode event name"),
Self::IOError(e) => write!(f, "IO Error: {}", e.to_string()),
Self::SerializeError(e) => write!(f, "Serialization Error: {}", e.to_string()),
Self::DeserializeError(e) => write!(f, "Deserialization Error: {}", e.to_string()),
Self::TLSError(e) => write!(f, "TLS Error: {}", e.to_string()),
Self::IOError(e) => write!(f, "IO Error: {}", e),
Self::SerializeError(e) => write!(f, "Serialization Error: {}", e),
Self::DeserializeError(e) => write!(f, "Deserialization Error: {}", e),
Self::CryptoError(e) => write!(f, "Cryptography Error: {}", e),
Self::UnexpectedEvent(e) => write!(f, "Received unexpected event: {}", e),
Self::UnknownNode(n) => write!(f, "Received connection from unknown node: {}", n),
}
}
}
@ -44,8 +48,8 @@ impl From<rmp_serde::decode::Error> for VentedError {
}
}
impl From<native_tls::Error> for VentedError {
fn from(other: native_tls::Error) -> Self {
Self::TLSError(other)
impl From<crypto_box::aead::Error> for VentedError {
fn from(other: crypto_box::aead::Error) -> Self {
Self::CryptoError(other)
}
}

@ -1,13 +1,146 @@
use std::collections::HashMap;
use std::net::{TcpListener, TcpStream};
use crypto_box::{ChaChaBox, PublicKey, SecretKey};
use scheduled_thread_pool::ScheduledThreadPool;
use crate::crypto::CryptoStream;
use crate::event::Event;
use crate::result::VentedResult;
use crate::event_handler::EventHandler;
use crate::result::{VentedError, VentedResult};
use crate::server::server_events::{
NodeInformationPayload, CONNECT_EVENT, CONN_ACCEPT_EVENT, CONN_REJECT_EVENT,
};
use parking_lot::Mutex;
use std::io::Write;
use std::sync::Arc;
pub(crate) mod server_events;
pub mod tcp;
pub mod tls;
pub trait VentedServer {
fn listen(&mut self, address: &str) -> VentedResult<()>;
fn on<F: 'static>(&mut self, event_name: &str, handler: F)
where
F: Fn(Event) -> Option<Event> + Send + Sync;
/// The vented server that provides parallel handling of connections
pub struct VentedServer {
connections: Arc<Mutex<HashMap<String, CryptoStream>>>,
known_nodes: Arc<Mutex<Vec<String>>>,
listener_pool: ScheduledThreadPool,
sender_pool: ScheduledThreadPool,
event_handler: Arc<Mutex<EventHandler>>,
secret_key: SecretKey,
node_id: String,
}
impl VentedServer {
pub fn new(node_id: String, nodes: Vec<String>, num_threads: usize) -> Self {
let mut rng = rand::thread_rng();
Self {
node_id,
event_handler: Arc::new(Mutex::new(EventHandler::new())),
listener_pool: ScheduledThreadPool::new(num_threads),
sender_pool: ScheduledThreadPool::new(num_threads),
connections: Arc::new(Mutex::new(HashMap::new())),
secret_key: SecretKey::generate(&mut rng),
known_nodes: Arc::new(Mutex::new(nodes)),
}
}
/// Starts listening on the specified address (with port!)
pub fn listen(&mut self, address: &str) -> VentedResult<()> {
let listener = TcpListener::bind(address)?;
for connection in listener.incoming() {
match connection {
Ok(stream) => self.handle_connection(stream)?,
Err(e) => log::trace!("Failed to establish connection: {}", e),
}
}
Ok(())
}
/// Handles a single connection by first performing a key exchange and
/// then establishing an encrypted connection
fn handle_connection(&mut self, mut stream: TcpStream) -> VentedResult<()> {
let secret_key = self.secret_key.clone();
let self_public_key = secret_key.public_key();
let connections = Arc::clone(&self.connections);
let own_node_id = self.node_id.clone();
let known_nodes = Arc::clone(&self.known_nodes);
let event_handler = Arc::clone(&self.event_handler);
self.listener_pool.execute(move || {
match VentedServer::perform_key_exchange(
&mut stream,
&secret_key,
&self_public_key,
own_node_id,
known_nodes,
) {
Ok((node_id, secret_box)) => {
let stream = CryptoStream::new(stream, secret_box)
.expect("Failed to create crypto stream");
connections
.lock()
.insert(node_id, CryptoStream::clone(&stream));
while let Ok(event) = stream.read() {
if let Some(response) = event_handler.lock().handle_event(event) {
stream.send(response).expect("Failed to send response");
}
}
}
Err(e) => log::error!("Failed to establish connection: {}", e),
}
});
Ok(())
}
/// Emits an event to the specified Node
pub fn emit(&self, node_id: &str, event: Event) -> bool {
let handler = self.connections.lock().get(node_id).cloned();
if let Some(handler) = handler {
self.sender_pool.execute(move || {
handler.send(event).expect("Failed to send event");
});
true
} else {
false
}
}
/// Performs a DH key exchange by using the crypto_box module and events
/// On success it returns a secret box with the established secret and the node id of the client
fn perform_key_exchange(
mut stream: &mut TcpStream,
secret_key: &SecretKey,
self_public_key: &PublicKey,
own_node_id: String,
known_nodes: Arc<Mutex<Vec<String>>>,
) -> VentedResult<(String, ChaChaBox)> {
let event = Event::from_bytes(&mut stream)?;
if event.name != CONNECT_EVENT {
return Err(VentedError::UnexpectedEvent(event.name));
}
let NodeInformationPayload {
public_key,
node_id,
} = event.get_payload::<NodeInformationPayload>().unwrap();
let public_key = PublicKey::from(public_key);
if !known_nodes.lock().contains(&node_id) {
stream.write(&Event::new(CONN_REJECT_EVENT.to_string()).as_bytes())?;
return Err(VentedError::UnknownNode(node_id));
}
let secret_box = ChaChaBox::new(&public_key, &secret_key);
stream.write(
&Event::with_payload(
CONN_ACCEPT_EVENT.to_string(),
&NodeInformationPayload {
public_key: self_public_key.to_bytes(),
node_id: own_node_id,
},
)
.as_bytes(),
)?;
Ok((node_id, secret_box))
}
}

@ -1,7 +1,11 @@
use crate::event_handler::EventHandler;
use serde::{Deserialize, Serialize};
pub(crate) fn get_server_event_handler() -> EventHandler {
let handler = EventHandler::new();
pub(crate) const CONNECT_EVENT: &str = "client:connect";
pub(crate) const CONN_ACCEPT_EVENT: &str = "server:conn_accept";
pub(crate) const CONN_REJECT_EVENT: &str = "server:conn_reject";
handler
#[derive(Serialize, Deserialize, Debug)]
pub(crate) struct NodeInformationPayload {
pub node_id: String,
pub public_key: [u8; 32],
}

@ -1,75 +0,0 @@
use std::borrow::BorrowMut;
use std::net::{TcpListener, TcpStream};
use std::sync::Arc;
use parking_lot::Mutex;
use scheduled_thread_pool::ScheduledThreadPool;
use crate::event::Event;
use crate::event_handler::EventHandler;
use crate::result::VentedResult;
use crate::server::server_events::get_server_event_handler;
use crate::server::VentedServer;
use std::io::Write;
pub struct VentedTcpServer {
event_handler: Arc<Mutex<EventHandler>>,
pool: ScheduledThreadPool,
}
impl VentedServer for VentedTcpServer {
/// Starts listening on the given address
fn listen(&mut self, address: &str) -> VentedResult<()> {
let listener = TcpListener::bind(address)?;
for stream in listener.incoming() {
log::trace!("Connection received.");
match stream {
Ok(stream) => self.handle_connection(stream),
Err(e) => log::error!("Failed to handle connection: {}", e),
}
}
Ok(())
}
/// Registers an event on the internal event handler
fn on<F: 'static>(&mut self, event_name: &str, handler: F)
where
F: Fn(Event) -> Option<Event> + Send + Sync,
{
self.event_handler
.lock()
.borrow_mut()
.on(event_name, handler);
}
}
impl VentedTcpServer {
/// Creates a new server that runs on the specified number of threads
pub fn new(num_threads: usize) -> Self {
let event_handler = get_server_event_handler();
let pool = ScheduledThreadPool::new(num_threads);
Self {
event_handler: Arc::new(Mutex::new(event_handler)),
pool,
}
}
/// Handles what happens on connection
fn handle_connection(&mut self, mut stream: TcpStream) {
let handler = Arc::clone(&self.event_handler);
self.pool.execute(move || loop {
if let Ok(event) = Event::from_bytes(&mut stream) {
if let Some(mut event) = handler.lock().handle_event(event) {
if let Err(e) = stream.write(&event.as_bytes()) {
log::error!("Failed to respond to event: {}", e)
}
}
} else {
log::warn!("Failed to create an Event from received bytes.");
break;
}
});
}
}

@ -1,83 +0,0 @@
use crate::event::Event;
use crate::event_handler::EventHandler;
use crate::result::VentedResult;
use crate::server::server_events::get_server_event_handler;
use crate::server::VentedServer;
use native_tls::{Identity, TlsAcceptor};
use parking_lot::Mutex;
use scheduled_thread_pool::ScheduledThreadPool;
use std::borrow::BorrowMut;
use std::io::Write;
use std::net::{TcpListener, TcpStream};
use std::sync::Arc;
pub struct VentedTlsServer {
event_handler: Arc<Mutex<EventHandler>>,
pool: ScheduledThreadPool,
identity: Identity,
}
impl VentedServer for VentedTlsServer {
fn listen(&mut self, address: &str) -> VentedResult<()> {
let listener = TcpListener::bind(address)?;
let acceptor = TlsAcceptor::new(self.identity.clone())?;
let acceptor = Arc::new(acceptor);
for stream in listener.incoming() {
log::trace!("Connection received.");
match stream {
Ok(stream) => self.handle_connection(stream, Arc::clone(&acceptor)),
Err(e) => log::error!("Failed to handle connection: {}", e),
}
}
Ok(())
}
fn on<F: 'static>(&mut self, event_name: &str, handler: F)
where
F: Fn(Event) -> Option<Event> + Send + Sync,
{
self.event_handler
.lock()
.borrow_mut()
.on(event_name, handler);
}
}
impl VentedTlsServer {
/// Creates a new server that runs on the specified number of threads
/// with the given tls identity
pub fn new(num_threads: usize, identity: Identity) -> Self {
let event_handler = get_server_event_handler();
let pool = ScheduledThreadPool::new(num_threads);
Self {
event_handler: Arc::new(Mutex::new(event_handler)),
pool,
identity,
}
}
fn handle_connection(&self, stream: TcpStream, acceptor: Arc<TlsAcceptor>) {
let handler = Arc::clone(&self.event_handler);
self.pool.execute(move || {
if let Ok(mut stream) = acceptor.accept(stream) {
loop {
if let Ok(event) = Event::from_bytes(&mut stream) {
if let Some(mut event) = handler.lock().handle_event(event) {
if let Err(e) = stream.write(&event.as_bytes()) {
log::error!("Failed to respond to event: {}", e)
}
}
} else {
log::warn!("Failed to create an Event from received bytes.");
break;
}
}
} else {
log::error!("TLS Error when handling connection")
}
});
}
}

@ -1,46 +1 @@
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use vented::client::tcp::VentedTcpClient;
use vented::client::VentedClient;
use vented::event::Event;
use vented::server::tcp::VentedTcpServer;
use vented::server::VentedServer;
#[test]
fn test_pong_event() {
static ADDRESS: &str = "localhost:22222";
static PING: &str = "ping";
static PONG: &str = "pong";
let ping_count = Arc::new(AtomicUsize::new(0));
let server_ready = Arc::new(AtomicBool::new(false));
let mut server = VentedTcpServer::new(1);
{
let ping_received = Arc::clone(&ping_count);
server.on(PING, move |_event| {
ping_received.fetch_add(1, Ordering::Relaxed);
Some(Event::new(PONG.to_string()))
});
}
thread::spawn({
let server_ready = Arc::clone(&server_ready);
move || {
server_ready.store(true, Ordering::Relaxed);
server.listen(ADDRESS).unwrap();
}
});
while !server_ready.load(Ordering::Relaxed) {
thread::sleep(Duration::from_millis(1));
}
let mut client = VentedTcpClient::connect(ADDRESS).unwrap();
client.emit(Event::new(PING.to_string())).unwrap();
let response = client.emit(Event::new(PING.to_string())).unwrap();
assert_eq!(ping_count.load(Ordering::Relaxed), 2);
assert_eq!(response.name, PONG);
}

Loading…
Cancel
Save