diff --git a/Cargo.toml b/Cargo.toml index dff95a7..5db4a4b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "vented" description = "Event driven encrypted tcp communicaton" -version = "0.3.0" +version = "0.3.1" authors = ["trivernis "] edition = "2018" readme = "README.md" diff --git a/src/crypto/mod.rs b/src/crypto/mod.rs index 3da8cd1..3c3cbd8 100644 --- a/src/crypto/mod.rs +++ b/src/crypto/mod.rs @@ -19,6 +19,7 @@ pub use crypto_box::SecretKey; /// A cryptographical stream object that handles encryption and decryption of streams #[derive(Clone)] pub struct CryptoStream { + recv_node_id: String, send_stream: Arc>, recv_stream: Arc>, send_secret: Arc>>, @@ -28,6 +29,7 @@ pub struct CryptoStream { impl CryptoStream { /// Creates a new crypto stream from a given Tcp Stream and with a given secret pub fn new( + node_id: String, inner: TcpStream, public_key: &PublicKey, secret_key: &SecretKey, @@ -39,6 +41,7 @@ impl CryptoStream { let recv_box = EncryptionBox::new(ChaChaBox::new(public_key, secret_key)); Ok(Self { + recv_node_id: node_id, send_stream, recv_stream, send_secret: Arc::new(Mutex::new(send_box)), @@ -94,6 +97,10 @@ impl CryptoStream { self.recv_secret.lock().swap_box(recv_box); log::trace!("Updated secret"); } + + pub fn receiver_node(&self) -> &String { + &self.recv_node_id + } } pub struct EncryptionBox diff --git a/src/server/mod.rs b/src/server/mod.rs index 40190de..85b53f9 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -107,15 +107,19 @@ impl VentedServer { /// Emits an event to the specified Node /// The actual writing is done in a separate thread from the thread pool. - /// With the returned waitgroup one can wait for the event to be written. + /// With the returned wait group one can wait for the event to be written. pub fn emit(&self, node_id: String, event: Event) -> VentedResult { let handler = self.connections.lock().get(&node_id).cloned(); let wg = WaitGroup::new(); let wg2 = WaitGroup::clone(&wg); if let Some(handler) = handler { + let connections = Arc::clone(&self.connections); self.sender_pool.lock().execute(move || { - handler.send(event).expect("Failed to send event"); + if let Err(e) = handler.send(event) { + log::error!("Failed to send event: {}", e); + connections.lock().remove(handler.receiver_node()); + } std::mem::drop(wg); }); Ok(wg2) @@ -207,6 +211,7 @@ impl VentedServer { let event_handler = Arc::clone(¶ms.event_handler); pool.lock().execute(move || { + let connections = Arc::clone(¶ms.connections); let stream = match VentedServer::get_crypto_stream(params, stream) { Ok(stream) => stream, Err(e) => { @@ -219,9 +224,13 @@ impl VentedServer { .handle_event(Event::new(READY_EVENT.to_string())); while let Ok(event) = stream.read() { if let Some(response) = event_handler.lock().handle_event(event) { - stream.send(response).expect("Failed to send response"); + if let Err(e) = stream.send(response) { + log::error!("Failed to send response event: {}", e); + break; + } } } + connections.lock().remove(stream.receiver_node()); }); Ok(()) @@ -254,6 +263,7 @@ impl VentedServer { let mut context = self.get_server_context(); context.is_server = false; + let connections = Arc::clone(&context.connections); let stream = Self::get_crypto_stream(context, stream)?; self.listener_pool.lock().execute({ let stream = CryptoStream::clone(&stream); @@ -261,9 +271,13 @@ impl VentedServer { move || { while let Ok(event) = stream.read() { if let Some(response) = event_handler.lock().handle_event(event) { - stream.send(response).expect("Failed to send response"); + if let Err(e) = stream.send(response) { + log::error!("Failed to send response event: {}", e); + break; + } } } + connections.lock().remove(stream.receiver_node()); } }); self.event_handler @@ -353,7 +367,7 @@ impl VentedServer { return Err(UnknownNode(node_id)); }; - let mut stream = CryptoStream::new(stream, &public_key, &secret_key)?; + let mut stream = CryptoStream::new(node_id.clone(), stream, &public_key, &secret_key)?; log::trace!("Authenticating recipient..."); let key_a = Self::authenticate_other(&mut stream, node_data.public_key)?; @@ -424,7 +438,7 @@ impl VentedServer { )?; stream.flush()?; - let mut stream = CryptoStream::new(stream, &public_key, &secret_key)?; + let mut stream = CryptoStream::new(node_id.clone(), stream, &public_key, &secret_key)?; log::trace!("Authenticating self..."); let key_a =