From 9a1c1df91d537194f460a4dd69671ff1007d6024 Mon Sep 17 00:00:00 2001 From: trivernis Date: Mon, 9 Nov 2020 22:34:43 +0100 Subject: [PATCH] Improve stream read write error handling Signed-off-by: trivernis --- Cargo.toml | 2 +- src/stream/cryptostream.rs | 20 ++++++++++++++------ src/stream/manager.rs | 27 +++++++++++++++++++++------ 3 files changed, 36 insertions(+), 13 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a825d78..e491e52 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "vented" description = "Event driven encrypted tcp communicaton" -version = "0.10.1" +version = "0.10.2" authors = ["trivernis "] edition = "2018" readme = "README.md" diff --git a/src/stream/cryptostream.rs b/src/stream/cryptostream.rs index 2575171..2fab4b6 100644 --- a/src/stream/cryptostream.rs +++ b/src/stream/cryptostream.rs @@ -1,10 +1,10 @@ use std::io::{Read, Write}; -use std::net::TcpStream; +use std::net::{Shutdown, TcpStream}; use std::sync::Arc; use byteorder::{BigEndian, ByteOrder}; -use crypto_box::{ChaChaBox, SecretKey}; use crypto_box::aead::{Aead, Payload}; +use crypto_box::{ChaChaBox, SecretKey}; use generic_array::GenericArray; use parking_lot::Mutex; use sha2::Digest; @@ -99,19 +99,27 @@ impl CryptoStream { pub fn receiver_node(&self) -> &String { &self.recv_node_id } + + /// Closes both streams + pub fn shutdown(&self) -> VentedResult<()> { + self.send_stream.lock().shutdown(Shutdown::Both)?; + self.recv_stream.lock().shutdown(Shutdown::Both)?; + + Ok(()) + } } pub struct EncryptionBox - where - T: Aead, +where + T: Aead, { inner: T, counter: u128, } impl EncryptionBox - where - T: Aead, +where + T: Aead, { /// Creates a new encryption box with the given inner value pub fn new(inner: T) -> Self { diff --git a/src/stream/manager.rs b/src/stream/manager.rs index d4f75a4..e233c8f 100644 --- a/src/stream/manager.rs +++ b/src/stream/manager.rs @@ -97,6 +97,9 @@ impl ConcurrentStreamManager { } future.resolve(()); } + if let Err(e) = stream.shutdown() { + log::error!("Failed to shutdown stream: {}", e); + } emitters.lock().remove(&recv_id); threads.lock().remove(&thread::current().id()); } @@ -112,14 +115,26 @@ impl ConcurrentStreamManager { let wg = WaitGroup::clone(&wg); move || { mem::drop(wg); - while let Ok(mut event) = stream.read() { - event.origin = Some(stream.receiver_node().clone()); - - if let Err(e) = sender.send((stream.receiver_node().clone(), event)) { - log::trace!("Failed to get event from {}: {}", recv_id, e); - break; + loop { + match stream.read() { + Ok(mut event) => { + event.origin = Some(stream.receiver_node().clone()); + + if let Err(e) = sender.send((stream.receiver_node().clone(), event)) + { + log::trace!("Failed to get event from {}: {}", recv_id, e); + break; + } + } + Err(e) => { + log::error!("Failed to send event: {}", e); + break; + } } } + if let Err(e) = stream.shutdown() { + log::error!("Failed to shutdown stream: {}", e); + } threads.lock().remove(&thread::current().id()); } })?;