Add event emitting to server

Signed-off-by: trivernis <trivernis@protonmail.com>
pull/1/head
trivernis 4 years ago
parent bf72aeeeb8
commit 99efe4d587
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

@ -26,6 +26,7 @@ pub struct CryptoStream {
impl CryptoStream { impl CryptoStream {
/// Creates a new crypto stream from a given Tcp Stream and with a given secret /// Creates a new crypto stream from a given Tcp Stream and with a given secret
pub fn new(inner: TcpStream, secret_box: crypto_box::ChaChaBox) -> VentedResult<Self> { pub fn new(inner: TcpStream, secret_box: crypto_box::ChaChaBox) -> VentedResult<Self> {
inner.set_nonblocking(false)?;
let send_stream = Arc::new(Mutex::new(inner.try_clone()?)); let send_stream = Arc::new(Mutex::new(inner.try_clone()?));
let recv_stream = Arc::new(Mutex::new(inner)); let recv_stream = Arc::new(Mutex::new(inner));
@ -66,12 +67,12 @@ impl CryptoStream {
/// Reads an event from the stream. Blocks until data is received /// Reads an event from the stream. Blocks until data is received
pub fn read(&self) -> VentedResult<Event> { pub fn read(&self) -> VentedResult<Event> {
let mut stream = self.recv_stream.lock(); let mut stream = self.recv_stream.lock();
let mut length_raw = [0u8; 64]; let mut length_raw = [0u8; 8];
stream.read_exact(&mut length_raw)?; stream.read_exact(&mut length_raw)?;
let length = BigEndian::read_u64(&length_raw); let length = BigEndian::read_u64(&length_raw);
let mut ciphertext = vec![0u8; length as usize]; let mut ciphertext = vec![0u8; length as usize];
stream.read_exact(&mut ciphertext)?; stream.read(&mut ciphertext)?;
let number = self.recv_count.fetch_add(1, Ordering::SeqCst); let number = self.recv_count.fetch_add(1, Ordering::SeqCst);
let nonce = generate_nonce(number); let nonce = generate_nonce(number);
@ -91,7 +92,7 @@ impl CryptoStream {
fn generate_nonce(number: usize) -> GenericArray<u8, U24> { fn generate_nonce(number: usize) -> GenericArray<u8, U24> {
let result = sha2::Sha256::digest(&number.to_be_bytes()).to_vec(); let result = sha2::Sha256::digest(&number.to_be_bytes()).to_vec();
let mut nonce = [0u8; 24]; let mut nonce = [0u8; 24];
nonce.copy_from_slice(&result); nonce.copy_from_slice(&result[0..24]);
nonce.into() nonce.into()
} }

@ -6,6 +6,8 @@ pub type VentedResult<T> = Result<T, VentedError>;
#[derive(Debug)] #[derive(Debug)]
pub enum VentedError { pub enum VentedError {
NameDecodingError, NameDecodingError,
NotReady,
NotAServer(String),
IOError(io::Error), IOError(io::Error),
SerializeError(rmp_serde::encode::Error), SerializeError(rmp_serde::encode::Error),
DeserializeError(rmp_serde::decode::Error), DeserializeError(rmp_serde::decode::Error),
@ -24,6 +26,8 @@ impl fmt::Display for VentedError {
Self::CryptoError(e) => write!(f, "Cryptography Error: {}", e), Self::CryptoError(e) => write!(f, "Cryptography Error: {}", e),
Self::UnexpectedEvent(e) => write!(f, "Received unexpected event: {}", e), Self::UnexpectedEvent(e) => write!(f, "Received unexpected event: {}", e),
Self::UnknownNode(n) => write!(f, "Received connection from unknown node: {}", n), Self::UnknownNode(n) => write!(f, "Received connection from unknown node: {}", n),
Self::NotReady => write!(f, "The connection is still being established."),
Self::NotAServer(n) => write!(f, "The given node {} is not a server", n),
} }
} }
} }

@ -0,0 +1,24 @@
use crate::crypto::CryptoStream;
use crate::event_handler::EventHandler;
use crypto_box::SecretKey;
use parking_lot::Mutex;
use scheduled_thread_pool::ScheduledThreadPool;
use std::collections::HashMap;
use std::sync::Arc;
#[derive(Clone, Debug)]
pub struct Node {
pub id: String,
pub address: Option<String>,
}
#[derive(Clone)]
pub(crate) struct ServerConnectionContext {
pub is_server: bool,
pub secret_key: SecretKey,
pub own_node_id: String,
pub known_nodes: Arc<Mutex<Vec<Node>>>,
pub event_handler: Arc<Mutex<EventHandler>>,
pub connections: Arc<Mutex<HashMap<String, CryptoStream>>>,
pub listener_pool: Arc<Mutex<ScheduledThreadPool>>,
}

@ -8,111 +8,225 @@ use crate::crypto::CryptoStream;
use crate::event::Event; use crate::event::Event;
use crate::event_handler::EventHandler; use crate::event_handler::EventHandler;
use crate::result::{VentedError, VentedResult}; use crate::result::{VentedError, VentedResult};
use crate::server::data::{Node, ServerConnectionContext};
use crate::server::server_events::{ use crate::server::server_events::{
NodeInformationPayload, CONNECT_EVENT, CONN_ACCEPT_EVENT, CONN_REJECT_EVENT, NodeInformationPayload, CONNECT_EVENT, CONN_ACCEPT_EVENT, CONN_REJECT_EVENT,
}; };
use parking_lot::Mutex; use parking_lot::Mutex;
use std::io::Write; use std::io::Write;
use std::sync::Arc; use std::sync::Arc;
use std::thread;
pub mod data;
pub(crate) mod server_events; pub(crate) mod server_events;
/// The vented server that provides parallel handling of connections /// The vented server that provides parallel handling of connections
pub struct VentedServer { pub struct VentedServer {
connections: Arc<Mutex<HashMap<String, CryptoStream>>>, connections: Arc<Mutex<HashMap<String, CryptoStream>>>,
known_nodes: Arc<Mutex<Vec<String>>>, known_nodes: Arc<Mutex<Vec<Node>>>,
listener_pool: ScheduledThreadPool, listener_pool: Arc<Mutex<ScheduledThreadPool>>,
sender_pool: ScheduledThreadPool, sender_pool: Arc<Mutex<ScheduledThreadPool>>,
event_handler: Arc<Mutex<EventHandler>>, event_handler: Arc<Mutex<EventHandler>>,
secret_key: SecretKey, secret_key: SecretKey,
node_id: String, node_id: String,
} }
impl VentedServer { impl VentedServer {
pub fn new(node_id: String, nodes: Vec<String>, num_threads: usize) -> Self { pub fn new(node_id: String, nodes: Vec<Node>, num_threads: usize) -> Self {
let mut rng = rand::thread_rng(); let mut rng = rand::thread_rng();
Self { Self {
node_id, node_id,
event_handler: Arc::new(Mutex::new(EventHandler::new())), event_handler: Arc::new(Mutex::new(EventHandler::new())),
listener_pool: ScheduledThreadPool::new(num_threads), listener_pool: Arc::new(Mutex::new(ScheduledThreadPool::new(num_threads))),
sender_pool: ScheduledThreadPool::new(num_threads), sender_pool: Arc::new(Mutex::new(ScheduledThreadPool::new(num_threads))),
connections: Arc::new(Mutex::new(HashMap::new())), connections: Arc::new(Mutex::new(HashMap::new())),
secret_key: SecretKey::generate(&mut rng), secret_key: SecretKey::generate(&mut rng),
known_nodes: Arc::new(Mutex::new(nodes)), known_nodes: Arc::new(Mutex::new(nodes)),
} }
} }
/// Starts listening on the specified address (with port!) /// Emits an event to the specified Node
pub fn listen(&mut self, address: &str) -> VentedResult<()> { pub fn emit(&self, node_id: String, event: Event) -> VentedResult<()> {
let listener = TcpListener::bind(address)?; let handler = self.connections.lock().get(&node_id).cloned();
for connection in listener.incoming() {
match connection { if let Some(handler) = handler {
Ok(stream) => self.handle_connection(stream)?, self.sender_pool.lock().execute(move || {
Err(e) => log::trace!("Failed to establish connection: {}", e), handler.send(event).expect("Failed to send event");
});
Ok(())
} else {
if let Some(node) = self.known_nodes.lock().iter().find(|n| n.id == node_id) {
if let Some(address) = &node.address {
let handler = self.connect(address.clone())?;
self.sender_pool.lock().execute(move || {
handler.send(event).expect("Failed to send event");
});
Ok(())
} else {
Err(VentedError::NotAServer(node_id))
}
} else {
Err(VentedError::UnknownNode(node_id))
} }
} }
}
Ok(()) /// Adds a handler for the given event
pub fn on<F: 'static>(&mut self, event_name: &str, handler: F)
where
F: Fn(Event) -> Option<Event> + Send + Sync,
{
self.event_handler.lock().on(event_name, handler);
} }
/// Handles a single connection by first performing a key exchange and /// Starts listening on the specified address (with port!)
/// then establishing an encrypted connection pub fn listen(&mut self, address: String) {
fn handle_connection(&mut self, mut stream: TcpStream) -> VentedResult<()> { let context = self.get_server_context();
let secret_key = self.secret_key.clone();
let self_public_key = secret_key.public_key(); thread::spawn(move || match TcpListener::bind(address) {
let connections = Arc::clone(&self.connections); Ok(listener) => {
let own_node_id = self.node_id.clone(); for connection in listener.incoming() {
let known_nodes = Arc::clone(&self.known_nodes); match connection {
let event_handler = Arc::clone(&self.event_handler); Ok(stream) => {
if let Err(e) = Self::handle_connection(context.clone(), stream) {
self.listener_pool.execute(move || { log::error!("Failed to handle connection: {}", e);
match VentedServer::perform_key_exchange( }
&mut stream,
&secret_key,
&self_public_key,
own_node_id,
known_nodes,
) {
Ok((node_id, secret_box)) => {
let stream = CryptoStream::new(stream, secret_box)
.expect("Failed to create crypto stream");
connections
.lock()
.insert(node_id, CryptoStream::clone(&stream));
while let Ok(event) = stream.read() {
if let Some(response) = event_handler.lock().handle_event(event) {
stream.send(response).expect("Failed to send response");
} }
Err(e) => log::trace!("Failed to establish connection: {}", e),
} }
} }
Err(e) => log::error!("Failed to establish connection: {}", e), }
Err(e) => log::error!("Failed to bind listener: {}", e),
});
}
/// Returns a copy of the servers metadata
fn get_server_context(&self) -> ServerConnectionContext {
ServerConnectionContext {
is_server: true,
own_node_id: self.node_id.clone(),
secret_key: self.secret_key.clone(),
known_nodes: Arc::clone(&self.known_nodes),
connections: Arc::clone(&self.connections),
event_handler: Arc::clone(&self.event_handler),
listener_pool: Arc::clone(&self.listener_pool),
}
}
/// Handles a single connection by first performing a key exchange and
/// then establishing an encrypted connection
fn handle_connection(params: ServerConnectionContext, stream: TcpStream) -> VentedResult<()> {
let pool = Arc::clone(&params.listener_pool);
let event_handler = Arc::clone(&params.event_handler);
pool.lock().execute(move || {
let stream = VentedServer::get_crypto_stream(params, stream).expect("Listener failed");
while let Ok(event) = stream.read() {
if let Some(response) = event_handler.lock().handle_event(event) {
stream.send(response).expect("Failed to send response");
}
} }
}); });
Ok(()) Ok(())
} }
/// Emits an event to the specified Node fn get_crypto_stream(
pub fn emit(&self, node_id: &str, event: Event) -> bool { params: ServerConnectionContext,
let handler = self.connections.lock().get(node_id).cloned(); mut stream: TcpStream,
) -> VentedResult<CryptoStream> {
let (node_id, secret_box) = VentedServer::perform_key_exchange(
params.is_server,
&mut stream,
&params.secret_key,
params.own_node_id,
params.known_nodes,
)?;
if let Some(handler) = handler { let stream = CryptoStream::new(stream, secret_box)?;
self.sender_pool.execute(move || { params
handler.send(event).expect("Failed to send event"); .connections
}); .lock()
true .insert(node_id, CryptoStream::clone(&stream));
Ok(stream)
}
/// Connects to the given address as a tcp client
fn connect(&self, address: String) -> VentedResult<CryptoStream> {
let stream = TcpStream::connect(address)?;
let mut context = self.get_server_context();
context.is_server = false;
let stream = Self::get_crypto_stream(context, stream)?;
self.listener_pool.lock().execute({
let stream = CryptoStream::clone(&stream);
let event_handler = Arc::clone(&self.event_handler);
move || {
while let Ok(event) = stream.read() {
if let Some(response) = event_handler.lock().handle_event(event) {
stream.send(response).expect("Failed to send response");
}
}
}
});
Ok(stream)
}
/// Performs a key exchange
fn perform_key_exchange(
is_server: bool,
stream: &mut TcpStream,
secret_key: &SecretKey,
own_node_id: String,
known_nodes: Arc<Mutex<Vec<Node>>>,
) -> VentedResult<(String, ChaChaBox)> {
if is_server {
Self::perform_server_key_exchange(stream, secret_key, own_node_id, known_nodes)
} else { } else {
false Self::perform_client_key_exchange(stream, secret_key, own_node_id)
} }
} }
/// Performs the client side DH key exchange
fn perform_client_key_exchange(
mut stream: &mut TcpStream,
secret_key: &SecretKey,
own_node_id: String,
) -> VentedResult<(String, ChaChaBox)> {
stream.write(
&Event::with_payload(
CONNECT_EVENT.to_string(),
&NodeInformationPayload {
public_key: secret_key.public_key().to_bytes(),
node_id: own_node_id,
},
)
.as_bytes(),
)?;
stream.flush()?;
let event = Event::from_bytes(&mut stream)?;
if event.name != CONN_ACCEPT_EVENT {
return Err(VentedError::UnknownNode(event.name));
}
let NodeInformationPayload {
public_key,
node_id,
} = event.get_payload::<NodeInformationPayload>().unwrap();
let public_key = PublicKey::from(public_key);
let secret_box = ChaChaBox::new(&public_key, &secret_key);
Ok((node_id, secret_box))
}
/// Performs a DH key exchange by using the crypto_box module and events /// Performs a DH key exchange by using the crypto_box module and events
/// On success it returns a secret box with the established secret and the node id of the client /// On success it returns a secret box with the established secret and the node id of the client
fn perform_key_exchange( fn perform_server_key_exchange(
mut stream: &mut TcpStream, mut stream: &mut TcpStream,
secret_key: &SecretKey, secret_key: &SecretKey,
self_public_key: &PublicKey,
own_node_id: String, own_node_id: String,
known_nodes: Arc<Mutex<Vec<String>>>, known_nodes: Arc<Mutex<Vec<Node>>>,
) -> VentedResult<(String, ChaChaBox)> { ) -> VentedResult<(String, ChaChaBox)> {
let event = Event::from_bytes(&mut stream)?; let event = Event::from_bytes(&mut stream)?;
if event.name != CONNECT_EVENT { if event.name != CONNECT_EVENT {
@ -124,8 +238,14 @@ impl VentedServer {
} = event.get_payload::<NodeInformationPayload>().unwrap(); } = event.get_payload::<NodeInformationPayload>().unwrap();
let public_key = PublicKey::from(public_key); let public_key = PublicKey::from(public_key);
if !known_nodes.lock().contains(&node_id) { if known_nodes
.lock()
.iter()
.find(|n| n.id == node_id)
.is_none()
{
stream.write(&Event::new(CONN_REJECT_EVENT.to_string()).as_bytes())?; stream.write(&Event::new(CONN_REJECT_EVENT.to_string()).as_bytes())?;
stream.flush()?;
return Err(VentedError::UnknownNode(node_id)); return Err(VentedError::UnknownNode(node_id));
} }
@ -134,12 +254,13 @@ impl VentedServer {
&Event::with_payload( &Event::with_payload(
CONN_ACCEPT_EVENT.to_string(), CONN_ACCEPT_EVENT.to_string(),
&NodeInformationPayload { &NodeInformationPayload {
public_key: self_public_key.to_bytes(), public_key: secret_key.public_key().to_bytes(),
node_id: own_node_id, node_id: own_node_id,
}, },
) )
.as_bytes(), .as_bytes(),
)?; )?;
stream.flush()?;
Ok((node_id, secret_box)) Ok((node_id, secret_box))
} }

@ -1 +1,57 @@
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use vented::event::Event;
use vented::server::data::Node;
use vented::server::VentedServer;
#[test]
fn test_server_communication() {
let ping_count = Arc::new(AtomicUsize::new(0));
let pong_count = Arc::new(AtomicUsize::new(0));
let nodes = vec![
Node {
id: "A".to_string(),
address: Some("localhost:22222".to_string()),
},
Node {
id: "B".to_string(),
address: None,
},
];
let mut server_a = VentedServer::new("A".to_string(), nodes.clone(), 2);
let mut server_b = VentedServer::new("B".to_string(), nodes, 2);
server_a.listen("localhost:22222".to_string());
thread::sleep(Duration::from_millis(10));
server_a.on("ping", {
let ping_count = Arc::clone(&ping_count);
move |_| {
ping_count.fetch_add(1, Ordering::Relaxed);
Some(Event::new("pong".to_string()))
}
});
server_b.on("pong", {
let pong_count = Arc::clone(&pong_count);
move |_| {
pong_count.fetch_add(1, Ordering::Relaxed);
None
}
});
for _ in 0..10 {
server_b
.emit("A".to_string(), Event::new("ping".to_string()))
.unwrap();
}
server_a
.emit("B".to_string(), Event::new("pong".to_string()))
.unwrap();
// wait one second to make sure the servers were able to process the events
thread::sleep(Duration::from_secs(1));
assert_eq!(ping_count.load(Ordering::Relaxed), 10);
assert_eq!(pong_count.load(Ordering::Relaxed), 11);
}

Loading…
Cancel
Save