Change thread start behaviour

Signed-off-by: trivernis <trivernis@protonmail.com>
pull/1/head
trivernis 4 years ago
parent 9137eeb673
commit f5932aa45e
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

@ -1,7 +1,7 @@
[package] [package]
name = "vented" name = "vented"
description = "Event driven encrypted tcp communicaton" description = "Event driven encrypted tcp communicaton"
version = "0.9.1" version = "0.9.2"
authors = ["trivernis <trivernis@protonmail.com>"] authors = ["trivernis <trivernis@protonmail.com>"]
edition = "2018" edition = "2018"
readme = "README.md" readme = "README.md"

@ -7,6 +7,7 @@ use executors::crossbeam_workstealing_pool;
use executors::parker::DynParker; use executors::parker::DynParker;
use parking_lot::Mutex; use parking_lot::Mutex;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc; use std::sync::Arc;
use x25519_dalek::PublicKey; use x25519_dalek::PublicKey;
@ -29,4 +30,5 @@ pub(crate) struct ServerConnectionContext {
pub forwarded_connections: Arc<Mutex<HashMap<(String, String), AsyncValue<CryptoStream, ()>>>>, pub forwarded_connections: Arc<Mutex<HashMap<(String, String), AsyncValue<CryptoStream, ()>>>>,
pub pool: crossbeam_workstealing_pool::ThreadPool<DynParker>, pub pool: crossbeam_workstealing_pool::ThreadPool<DynParker>,
pub redirect_handles: Arc<Mutex<HashMap<[u8; 16], AsyncValue<(), VentedError>>>>, pub redirect_handles: Arc<Mutex<HashMap<[u8; 16], AsyncValue<(), VentedError>>>>,
pub listener_count: Arc<AtomicUsize>,
} }

@ -1,5 +1,5 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::net::{TcpListener, TcpStream}; use std::net::{Shutdown, TcpListener, TcpStream};
use crypto_box::{PublicKey, SecretKey}; use crypto_box::{PublicKey, SecretKey};
use executors::{crossbeam_workstealing_pool, Executor}; use executors::{crossbeam_workstealing_pool, Executor};
@ -21,6 +21,7 @@ use parking_lot::Mutex;
use sha2::Digest; use sha2::Digest;
use std::io::Write; use std::io::Write;
use std::iter::FromIterator; use std::iter::FromIterator;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::thread; use std::thread;
use std::time::Duration; use std::time::Duration;
@ -75,6 +76,8 @@ pub struct VentedServer {
global_secret_key: SecretKey, global_secret_key: SecretKey,
node_id: String, node_id: String,
redirect_handles: Arc<Mutex<HashMap<[u8; 16], AsyncValue<(), VentedError>>>>, redirect_handles: Arc<Mutex<HashMap<[u8; 16], AsyncValue<(), VentedError>>>>,
listener_count: Arc<AtomicUsize>,
num_threads: usize,
} }
impl VentedServer { impl VentedServer {
@ -89,6 +92,7 @@ impl VentedServer {
) -> Self { ) -> Self {
let mut server = Self { let mut server = Self {
node_id, node_id,
num_threads,
event_handler: Arc::new(Mutex::new(EventHandler::new())), event_handler: Arc::new(Mutex::new(EventHandler::new())),
pool: executors::crossbeam_workstealing_pool::pool_with_auto_parker(num_threads), pool: executors::crossbeam_workstealing_pool::pool_with_auto_parker(num_threads),
connections: Arc::new(Mutex::new(HashMap::new())), connections: Arc::new(Mutex::new(HashMap::new())),
@ -98,6 +102,7 @@ impl VentedServer {
nodes.iter().cloned().map(|node| (node.id.clone(), node)), nodes.iter().cloned().map(|node| (node.id.clone(), node)),
))), ))),
redirect_handles: Arc::new(Mutex::new(HashMap::new())), redirect_handles: Arc::new(Mutex::new(HashMap::new())),
listener_count: Arc::new(AtomicUsize::new(0)),
}; };
server.register_events(); server.register_events();
@ -170,6 +175,8 @@ impl VentedServer {
let context = self.get_server_context(); let context = self.get_server_context();
let wg = WaitGroup::new(); let wg = WaitGroup::new();
let wg2 = WaitGroup::clone(&wg); let wg2 = WaitGroup::clone(&wg);
let num_threads = self.num_threads;
let listener_count = Arc::clone(&self.listener_count);
thread::spawn(move || match TcpListener::bind(&address) { thread::spawn(move || match TcpListener::bind(&address) {
Ok(listener) => { Ok(listener) => {
@ -179,10 +186,19 @@ impl VentedServer {
for connection in listener.incoming() { for connection in listener.incoming() {
match connection { match connection {
Ok(stream) => { Ok(stream) => {
let listener_count = listener_count.load(Ordering::Relaxed);
if listener_count >= num_threads {
log::warn!("Connection limit reached. Shutting down incoming connection...");
if let Err(e) = stream.shutdown(Shutdown::Both) {
log::error!("Failed to shutdown connection: {}", e)
}
} else {
if let Err(e) = Self::handle_connection(context.clone(), stream) { if let Err(e) = Self::handle_connection(context.clone(), stream) {
log::error!("Failed to handle connection: {}", e); log::error!("Failed to handle connection: {}", e);
} }
} }
}
Err(e) => log::trace!("Failed to establish connection: {}", e), Err(e) => log::trace!("Failed to establish connection: {}", e),
} }
} }
@ -208,6 +224,7 @@ impl VentedServer {
pool: self.pool.clone(), pool: self.pool.clone(),
forwarded_connections: Arc::clone(&self.forwarded_connections), forwarded_connections: Arc::clone(&self.forwarded_connections),
redirect_handles: Arc::clone(&self.redirect_handles), redirect_handles: Arc::clone(&self.redirect_handles),
listener_count: Arc::clone(&self.listener_count),
} }
} }
@ -224,6 +241,7 @@ impl VentedServer {
.filter(|node| node.address.is_some()) .filter(|node| node.address.is_some())
.cloned() .cloned()
.collect::<Vec<Node>>(); .collect::<Vec<Node>>();
for node in public_nodes { for node in public_nodes {
let payload = RedirectPayload::new( let payload = RedirectPayload::new(
context.node_id.clone(), context.node_id.clone(),
@ -255,15 +273,16 @@ impl VentedServer {
/// Handles a single connection by first performing a key exchange and /// Handles a single connection by first performing a key exchange and
/// then establishing an encrypted connection /// then establishing an encrypted connection
fn handle_connection(params: ServerConnectionContext, stream: TcpStream) -> VentedResult<()> { fn handle_connection(params: ServerConnectionContext, stream: TcpStream) -> VentedResult<()> {
let pool = params.pool.clone();
let event_handler = Arc::clone(&params.event_handler); let event_handler = Arc::clone(&params.event_handler);
log::trace!( log::trace!(
"Received connection from {}", "Received connection from {}",
stream.peer_addr().expect("Failed to get peer address") stream.peer_addr().expect("Failed to get peer address")
); );
pool.execute(move || { thread::spawn(move || {
let connections = Arc::clone(&params.connections); let connections = Arc::clone(&params.connections);
let listener_count = Arc::clone(&params.listener_count);
listener_count.fetch_add(1, Ordering::Relaxed);
let stream = match VentedServer::get_crypto_stream(params, stream) { let stream = match VentedServer::get_crypto_stream(params, stream) {
Ok(stream) => stream, Ok(stream) => stream,
@ -279,6 +298,7 @@ impl VentedServer {
} }
connections.lock().remove(stream.receiver_node()); connections.lock().remove(stream.receiver_node());
listener_count.fetch_sub(1, Ordering::Relaxed);
}); });
Ok(()) Ok(())
@ -337,6 +357,9 @@ impl VentedServer {
params: ServerConnectionContext, params: ServerConnectionContext,
stream: TcpStream, stream: TcpStream,
) -> VentedResult<CryptoStream> { ) -> VentedResult<CryptoStream> {
stream.set_read_timeout(Some(Duration::from_secs(10)))?;
stream.set_write_timeout(Some(Duration::from_secs(10)))?;
let (node_id, stream) = VentedServer::perform_key_exchange( let (node_id, stream) = VentedServer::perform_key_exchange(
params.is_server, params.is_server,
stream, stream,
@ -362,19 +385,21 @@ impl VentedServer {
context.is_server = false; context.is_server = false;
let connections = Arc::clone(&context.connections); let connections = Arc::clone(&context.connections);
let pool = context.pool.clone();
let event_handler = Arc::clone(&context.event_handler); let event_handler = Arc::clone(&context.event_handler);
let listener_count = Arc::clone(&context.listener_count);
let stream = Self::get_crypto_stream(context, stream)?; let stream = Self::get_crypto_stream(context, stream)?;
pool.execute({ thread::spawn({
let stream = CryptoStream::clone(&stream); let stream = CryptoStream::clone(&stream);
move || { move || {
listener_count.fetch_add(1, Ordering::Relaxed);
event_handler.lock().handle_event(Event::new(READY_EVENT)); event_handler.lock().handle_event(Event::new(READY_EVENT));
if let Err(e) = Self::handle_read(event_handler, &stream) { if let Err(e) = Self::handle_read(event_handler, &stream) {
log::error!("Connection aborted: {}", e); log::error!("Connection aborted: {}", e);
} }
connections.lock().remove(stream.receiver_node()); connections.lock().remove(stream.receiver_node());
listener_count.fetch_sub(1, Ordering::Relaxed);
} }
}); });

@ -43,7 +43,7 @@ fn test_server_communication() {
trusted: false, trusted: false,
}, },
]; ];
let mut server_a = VentedServer::new("A".to_string(), global_secret_a, nodes.clone(), 6); let mut server_a = VentedServer::new("A".to_string(), global_secret_a, nodes.clone(), 2);
let mut server_b = VentedServer::new("B".to_string(), global_secret_b, nodes.clone(), 3); let mut server_b = VentedServer::new("B".to_string(), global_secret_b, nodes.clone(), 3);
let mut server_c = VentedServer::new("C".to_string(), global_secret_c, nodes, 3); let mut server_c = VentedServer::new("C".to_string(), global_secret_c, nodes, 3);
let wg = server_a.listen("localhost:22222".to_string()); let wg = server_a.listen("localhost:22222".to_string());

Loading…
Cancel
Save