Update README

Signed-off-by: trivernis <trivernis@protonmail.com>
pull/1/head
trivernis 4 years ago
parent ce423d5c3d
commit da6cd1faaf
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

@ -19,4 +19,5 @@ rand = "0.7.3"
sha2 = "0.9.2" sha2 = "0.9.2"
generic-array = "0.14.4" generic-array = "0.14.4"
typenum = "1.12.0" typenum = "1.12.0"
x25519-dalek = "1.1.0" x25519-dalek = "1.1.0"
crossbeam-utils = "0.8.0"

@ -1,3 +1,45 @@
# Vented # Vented
Vented is an event based TCP server that uses message pack for payload data. Vented is an event based TCP server with encryption that uses message pack for payload data.
## Encryption
Vented uses key cryptography to encrypt the connection between the client and the serve.
The authenticity of both parties is validated by global public keys that need to be known
to both parties beforehand. The encryption itself uses randomly generated keys and a nonce
that corresponds to the message number. The crate used for encryption is [crypto_box](https://crates.io/crates/crypto_box)
which the [XChaCha20Poly1305](https://github.com/RustCrypto/AEADs/tree/master/chacha20poly1305) encryption.
The crate used for the key exchanges is [x25519-dalek](https://crates.io/crates/x25519-dalek).
## Usage
```rust
use vented::server::VentedServer;
use vented::server::data::Node;
use vented::crypto::SecretKey;
use rand::thread_rng;
use vented::event::Event;
fn main() {
let nodes = vec![
Node {
id: "B".to_string(),
address: None,
public_key: global_secret_b.public_key() // load it from somewhere
},
];
// in a real world example the secret key needs to be loaded from somewhere because connections
// with unknown keys are not accepted.
let global_secret = SecretKey::new(&mut thread_rng());
let mut server = VentedServer::new("A".to_string(), global_secret, nodes.clone(), 4);
server.listen("localhost:20000".to_string());
server.on("pong", |_event| {
println!("Pong!");
None // the return value is the response event Option<Event>
});
server.emit("B".to_string(), Event::new("ping".to_string())).unwrap();
}
```

@ -13,6 +13,8 @@ use typenum::U24;
use crate::event::Event; use crate::event::Event;
use crate::result::VentedResult; use crate::result::VentedResult;
pub use crypto_box::SecretKey;
/// A cryptographical stream object that handles encryption and decryption of streams /// A cryptographical stream object that handles encryption and decryption of streams
#[derive(Clone)] #[derive(Clone)]
pub struct CryptoStream { pub struct CryptoStream {

@ -20,8 +20,8 @@ impl EventHandler {
/// Adds a handler for the given event /// Adds a handler for the given event
pub fn on<F: 'static>(&mut self, event_name: &str, handler: F) pub fn on<F: 'static>(&mut self, event_name: &str, handler: F)
where where
F: Fn(Event) -> Option<Event> + Send + Sync, F: Fn(Event) -> Option<Event> + Send + Sync,
{ {
match self.event_handlers.get_mut(event_name) { match self.event_handlers.get_mut(event_name) {
Some(handlers) => handlers.push(Box::new(handler)), Some(handlers) => handlers.push(Box::new(handler)),

@ -14,6 +14,7 @@ use crate::server::server_events::{
AuthPayload, NodeInformationPayload, AUTH_EVENT, CONNECT_EVENT, CONN_ACCEPT_EVENT, AuthPayload, NodeInformationPayload, AUTH_EVENT, CONNECT_EVENT, CONN_ACCEPT_EVENT,
CONN_CHALLENGE_EVENT, CONN_REJECT_EVENT, READY_EVENT, CONN_CHALLENGE_EVENT, CONN_REJECT_EVENT, READY_EVENT,
}; };
use crossbeam_utils::sync::WaitGroup;
use parking_lot::Mutex; use parking_lot::Mutex;
use std::io::Write; use std::io::Write;
use std::sync::Arc; use std::sync::Arc;
@ -24,6 +25,35 @@ pub mod data;
pub mod server_events; pub mod server_events;
/// The vented server that provides parallel handling of connections /// The vented server that provides parallel handling of connections
/// Usage:
/// ```rust
/// use vented::server::VentedServer;
/// use vented::server::data::Node;
/// use vented::crypto::SecretKey;
/// use rand::thread_rng;
/// use vented::event::Event;
///
/// let nodes = vec![
/// Node {
/// id: "B".to_string(),
/// address: None,
/// public_key: global_secret_b.public_key() // load it from somewhere
/// },
///];
/// // in a real world example the secret key needs to be loaded from somewhere because connections
/// // with unknown keys are not accepted.
/// let global_secret = SecretKey::new(&mut thread_rng());
/// let mut server = VentedServer::new("A".to_string(), global_secret, nodes.clone(), 4);
///
///
/// server.listen("localhost:20000".to_string());
/// server.on("pong", |_event| {
/// println!("Pong!");
///
/// None // the return value is the response event Option<Event>
/// });
/// server.emit("B".to_string(), Event::new("ping".to_string())).unwrap();
/// ```
pub struct VentedServer { pub struct VentedServer {
connections: Arc<Mutex<HashMap<String, CryptoStream>>>, connections: Arc<Mutex<HashMap<String, CryptoStream>>>,
known_nodes: Arc<Mutex<Vec<Node>>>, known_nodes: Arc<Mutex<Vec<Node>>>,
@ -35,6 +65,10 @@ pub struct VentedServer {
} }
impl VentedServer { impl VentedServer {
/// Creates a new vented server with a given node_id and secret key that are
/// used to authenticate against other servers.
/// The given nodes are used for authentication.
/// The server runs with 2x the given amount of threads.
pub fn new( pub fn new(
node_id: String, node_id: String,
secret_key: SecretKey, secret_key: SecretKey,
@ -53,14 +87,19 @@ impl VentedServer {
} }
/// Emits an event to the specified Node /// Emits an event to the specified Node
pub fn emit(&self, node_id: String, event: Event) -> VentedResult<()> { /// The actual writing is done in a separate thread from the thread pool.
/// With the returned waitgroup one can wait for the event to be written.
pub fn emit(&self, node_id: String, event: Event) -> VentedResult<WaitGroup> {
let handler = self.connections.lock().get(&node_id).cloned(); let handler = self.connections.lock().get(&node_id).cloned();
let wg = WaitGroup::new();
let wg2 = WaitGroup::clone(&wg);
if let Some(handler) = handler { if let Some(handler) = handler {
self.sender_pool.lock().execute(move || { self.sender_pool.lock().execute(move || {
handler.send(event).expect("Failed to send event"); handler.send(event).expect("Failed to send event");
std::mem::drop(wg);
}); });
Ok(()) Ok(wg2)
} else { } else {
let found_node = self let found_node = self
.known_nodes .known_nodes
@ -73,8 +112,9 @@ impl VentedServer {
let handler = self.connect(address.clone())?; let handler = self.connect(address.clone())?;
self.sender_pool.lock().execute(move || { self.sender_pool.lock().execute(move || {
handler.send(event).expect("Failed to send event"); handler.send(event).expect("Failed to send event");
std::mem::drop(wg);
}); });
Ok(()) Ok(wg2)
} else { } else {
Err(VentedError::NotAServer(node_id)) Err(VentedError::NotAServer(node_id))
} }
@ -84,7 +124,10 @@ impl VentedServer {
} }
} }
/// Adds a handler for the given event /// Adds a handler for the given event.
/// The event returned by the handler is returned to the server.
/// If there is more than one handler, the response will be piped to the next handler.
/// The oder is by order of insertion. The first registered handler will be executed first.
pub fn on<F: 'static>(&mut self, event_name: &str, handler: F) pub fn on<F: 'static>(&mut self, event_name: &str, handler: F)
where where
F: Fn(Event) -> Option<Event> + Send + Sync, F: Fn(Event) -> Option<Event> + Send + Sync,
@ -93,11 +136,18 @@ impl VentedServer {
} }
/// Starts listening on the specified address (with port!) /// Starts listening on the specified address (with port!)
pub fn listen(&mut self, address: String) { /// This will cause a new thread to start up so that the method returns immediately
/// With the returned wait group one can wait for the server to be ready.
/// The method can be called multiple times to start listeners on multiple ports.
pub fn listen(&mut self, address: String) -> WaitGroup {
let context = self.get_server_context(); let context = self.get_server_context();
let wg = WaitGroup::new();
let wg2 = WaitGroup::clone(&wg);
thread::spawn(move || match TcpListener::bind(address) { thread::spawn(move || match TcpListener::bind(&address) {
Ok(listener) => { Ok(listener) => {
log::info!("Listener running on {}", address);
std::mem::drop(wg);
for connection in listener.incoming() { for connection in listener.incoming() {
match connection { match connection {
Ok(stream) => { Ok(stream) => {
@ -109,8 +159,13 @@ impl VentedServer {
} }
} }
} }
Err(e) => log::error!("Failed to bind listener: {}", e), Err(e) => {
log::error!("Failed to bind listener: {}", e);
std::mem::drop(wg);
}
}); });
wg2
} }
/// Returns a copy of the servers metadata /// Returns a copy of the servers metadata

@ -30,8 +30,8 @@ fn test_server_communication() {
]; ];
let mut server_a = VentedServer::new("A".to_string(), global_secret_a, nodes.clone(), 4); let mut server_a = VentedServer::new("A".to_string(), global_secret_a, nodes.clone(), 4);
let mut server_b = VentedServer::new("B".to_string(), global_secret_b, nodes, 4); let mut server_b = VentedServer::new("B".to_string(), global_secret_b, nodes, 4);
server_a.listen("localhost:22222".to_string()); let wg = server_a.listen("localhost:22222".to_string());
thread::sleep(Duration::from_millis(10)); wg.wait();
server_a.on("ping", { server_a.on("ping", {
let ping_count = Arc::clone(&ping_count); let ping_count = Arc::clone(&ping_count);
@ -63,14 +63,15 @@ fn test_server_communication() {
} }
}); });
for _ in 0..10 { for _ in 0..10 {
server_b let wg = server_b
.emit("A".to_string(), Event::new("ping".to_string())) .emit("A".to_string(), Event::new("ping".to_string()))
.unwrap(); .unwrap();
thread::sleep(Duration::from_millis(10)); wg.wait();
} }
server_a let wg = server_a
.emit("B".to_string(), Event::new("pong".to_string())) .emit("B".to_string(), Event::new("pong".to_string()))
.unwrap(); .unwrap();
wg.wait();
// wait one second to make sure the servers were able to process the events // wait one second to make sure the servers were able to process the events
for _ in 0..100 { for _ in 0..100 {

Loading…
Cancel
Save