Fix broken streams not being removed from pool

Signed-off-by: trivernis <trivernis@protonmail.com>
pull/1/head
trivernis 4 years ago
parent 5e659509dd
commit 2c05e2736f
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

@ -1,7 +1,7 @@
[package] [package]
name = "vented" name = "vented"
description = "Event driven encrypted tcp communicaton" description = "Event driven encrypted tcp communicaton"
version = "0.3.0" version = "0.3.1"
authors = ["trivernis <trivernis@protonmail.com>"] authors = ["trivernis <trivernis@protonmail.com>"]
edition = "2018" edition = "2018"
readme = "README.md" readme = "README.md"

@ -19,6 +19,7 @@ pub use crypto_box::SecretKey;
/// A cryptographical stream object that handles encryption and decryption of streams /// A cryptographical stream object that handles encryption and decryption of streams
#[derive(Clone)] #[derive(Clone)]
pub struct CryptoStream { pub struct CryptoStream {
recv_node_id: String,
send_stream: Arc<Mutex<TcpStream>>, send_stream: Arc<Mutex<TcpStream>>,
recv_stream: Arc<Mutex<TcpStream>>, recv_stream: Arc<Mutex<TcpStream>>,
send_secret: Arc<Mutex<EncryptionBox<ChaChaBox>>>, send_secret: Arc<Mutex<EncryptionBox<ChaChaBox>>>,
@ -28,6 +29,7 @@ pub struct CryptoStream {
impl CryptoStream { impl CryptoStream {
/// Creates a new crypto stream from a given Tcp Stream and with a given secret /// Creates a new crypto stream from a given Tcp Stream and with a given secret
pub fn new( pub fn new(
node_id: String,
inner: TcpStream, inner: TcpStream,
public_key: &PublicKey, public_key: &PublicKey,
secret_key: &SecretKey, secret_key: &SecretKey,
@ -39,6 +41,7 @@ impl CryptoStream {
let recv_box = EncryptionBox::new(ChaChaBox::new(public_key, secret_key)); let recv_box = EncryptionBox::new(ChaChaBox::new(public_key, secret_key));
Ok(Self { Ok(Self {
recv_node_id: node_id,
send_stream, send_stream,
recv_stream, recv_stream,
send_secret: Arc::new(Mutex::new(send_box)), send_secret: Arc::new(Mutex::new(send_box)),
@ -94,6 +97,10 @@ impl CryptoStream {
self.recv_secret.lock().swap_box(recv_box); self.recv_secret.lock().swap_box(recv_box);
log::trace!("Updated secret"); log::trace!("Updated secret");
} }
pub fn receiver_node(&self) -> &String {
&self.recv_node_id
}
} }
pub struct EncryptionBox<T> pub struct EncryptionBox<T>

@ -107,15 +107,19 @@ impl VentedServer {
/// Emits an event to the specified Node /// Emits an event to the specified Node
/// The actual writing is done in a separate thread from the thread pool. /// The actual writing is done in a separate thread from the thread pool.
/// With the returned waitgroup one can wait for the event to be written. /// With the returned wait group one can wait for the event to be written.
pub fn emit(&self, node_id: String, event: Event) -> VentedResult<WaitGroup> { pub fn emit(&self, node_id: String, event: Event) -> VentedResult<WaitGroup> {
let handler = self.connections.lock().get(&node_id).cloned(); let handler = self.connections.lock().get(&node_id).cloned();
let wg = WaitGroup::new(); let wg = WaitGroup::new();
let wg2 = WaitGroup::clone(&wg); let wg2 = WaitGroup::clone(&wg);
if let Some(handler) = handler { if let Some(handler) = handler {
let connections = Arc::clone(&self.connections);
self.sender_pool.lock().execute(move || { self.sender_pool.lock().execute(move || {
handler.send(event).expect("Failed to send event"); if let Err(e) = handler.send(event) {
log::error!("Failed to send event: {}", e);
connections.lock().remove(handler.receiver_node());
}
std::mem::drop(wg); std::mem::drop(wg);
}); });
Ok(wg2) Ok(wg2)
@ -207,6 +211,7 @@ impl VentedServer {
let event_handler = Arc::clone(&params.event_handler); let event_handler = Arc::clone(&params.event_handler);
pool.lock().execute(move || { pool.lock().execute(move || {
let connections = Arc::clone(&params.connections);
let stream = match VentedServer::get_crypto_stream(params, stream) { let stream = match VentedServer::get_crypto_stream(params, stream) {
Ok(stream) => stream, Ok(stream) => stream,
Err(e) => { Err(e) => {
@ -219,9 +224,13 @@ impl VentedServer {
.handle_event(Event::new(READY_EVENT.to_string())); .handle_event(Event::new(READY_EVENT.to_string()));
while let Ok(event) = stream.read() { while let Ok(event) = stream.read() {
if let Some(response) = event_handler.lock().handle_event(event) { if let Some(response) = event_handler.lock().handle_event(event) {
stream.send(response).expect("Failed to send response"); if let Err(e) = stream.send(response) {
log::error!("Failed to send response event: {}", e);
break;
}
} }
} }
connections.lock().remove(stream.receiver_node());
}); });
Ok(()) Ok(())
@ -254,6 +263,7 @@ impl VentedServer {
let mut context = self.get_server_context(); let mut context = self.get_server_context();
context.is_server = false; context.is_server = false;
let connections = Arc::clone(&context.connections);
let stream = Self::get_crypto_stream(context, stream)?; let stream = Self::get_crypto_stream(context, stream)?;
self.listener_pool.lock().execute({ self.listener_pool.lock().execute({
let stream = CryptoStream::clone(&stream); let stream = CryptoStream::clone(&stream);
@ -261,9 +271,13 @@ impl VentedServer {
move || { move || {
while let Ok(event) = stream.read() { while let Ok(event) = stream.read() {
if let Some(response) = event_handler.lock().handle_event(event) { if let Some(response) = event_handler.lock().handle_event(event) {
stream.send(response).expect("Failed to send response"); if let Err(e) = stream.send(response) {
log::error!("Failed to send response event: {}", e);
break;
}
} }
} }
connections.lock().remove(stream.receiver_node());
} }
}); });
self.event_handler self.event_handler
@ -353,7 +367,7 @@ impl VentedServer {
return Err(UnknownNode(node_id)); return Err(UnknownNode(node_id));
}; };
let mut stream = CryptoStream::new(stream, &public_key, &secret_key)?; let mut stream = CryptoStream::new(node_id.clone(), stream, &public_key, &secret_key)?;
log::trace!("Authenticating recipient..."); log::trace!("Authenticating recipient...");
let key_a = Self::authenticate_other(&mut stream, node_data.public_key)?; let key_a = Self::authenticate_other(&mut stream, node_data.public_key)?;
@ -424,7 +438,7 @@ impl VentedServer {
)?; )?;
stream.flush()?; stream.flush()?;
let mut stream = CryptoStream::new(stream, &public_key, &secret_key)?; let mut stream = CryptoStream::new(node_id.clone(), stream, &public_key, &secret_key)?;
log::trace!("Authenticating self..."); log::trace!("Authenticating self...");
let key_a = let key_a =

Loading…
Cancel
Save