From ac81f69209993e54363719fe3b5815c8397b52ad Mon Sep 17 00:00:00 2001 From: trivernis Date: Sat, 7 Nov 2020 22:52:21 +0100 Subject: [PATCH] Fix memory leak Signed-off-by: trivernis --- Cargo.toml | 2 +- src/result.rs | 4 ++-- src/server/data.rs | 16 ++++++++++++++++ src/server/mod.rs | 10 ++++++---- 4 files changed, 25 insertions(+), 7 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 22cabf9..373aa86 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "vented" description = "Event driven encrypted tcp communicaton" -version = "0.4.0" +version = "0.4.1" authors = ["trivernis "] edition = "2018" readme = "README.md" diff --git a/src/result.rs b/src/result.rs index 367804f..d716b9e 100644 --- a/src/result.rs +++ b/src/result.rs @@ -8,7 +8,7 @@ pub type VentedResult = Result; pub enum VentedError { NameDecodingError, NotReady, - NotAServer(String), + UnreachableNode(String), IOError(io::Error), SerializeError(rmp_serde::encode::Error), DeserializeError(rmp_serde::decode::Error), @@ -31,7 +31,6 @@ impl fmt::Display for VentedError { Self::UnexpectedEvent(e) => write!(f, "Received unexpected event: {}", e), Self::UnknownNode(n) => write!(f, "Received connection from unknown node: {}", n), Self::NotReady => write!(f, "The connection is still being established."), - Self::NotAServer(n) => write!(f, "The given node {} is not a server", n), Self::Rejected => write!(f, "The connection was rejected"), Self::AuthFailed => write!(f, "Failed to authenticate the other party"), Self::VersionMismatch(version) => write!( @@ -39,6 +38,7 @@ impl fmt::Display for VentedError { "Version mismatch: Expected {} got {}", CRATE_VERSION, version ), + Self::UnreachableNode(node) => write!(f, "Node {} can't be reached", node), } } } diff --git a/src/server/data.rs b/src/server/data.rs index d6492e3..e772823 100644 --- a/src/server/data.rs +++ b/src/server/data.rs @@ -7,6 +7,8 @@ use scheduled_thread_pool::ScheduledThreadPool; use std::collections::HashMap; use std::mem; use std::sync::Arc; +use std::thread; +use std::time::{Duration, Instant}; use x25519_dalek::PublicKey; #[derive(Clone, Debug)] @@ -51,10 +53,24 @@ impl Future { /// Returns the value of the future after it has been set. /// This call blocks + #[allow(dead_code)] pub fn get_value(&mut self) -> T { if let Some(wg) = mem::take(&mut self.wg) { wg.wait(); } self.value.lock().take().unwrap() } + + /// Returns the value of the future only blocking for the given timeout + pub fn get_value_with_timeout(&mut self, millis: u128) -> Option { + let start = Instant::now(); + + while self.value.lock().is_none() { + thread::sleep(Duration::from_millis(1)); + if start.elapsed().as_millis() > millis { + break; + } + } + self.value.lock().take() + } } diff --git a/src/server/mod.rs b/src/server/mod.rs index 18021df..8c72148 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -307,12 +307,14 @@ impl VentedServer { .insert(payload.id, Future::clone(&future)); self.emit(node.id, Event::with_payload(REDIRECT_EVENT, &payload))?; - if future.get_value() { - return Ok(()); + if let Some(value) = future.get_value_with_timeout(1000) { + if value { + return Ok(()); + } } } - Err(VentedError::UnknownNode(target)) + Err(VentedError::UnreachableNode(target)) } /// Handles a single connection by first performing a key exchange and @@ -391,7 +393,7 @@ impl VentedServer { log::debug!("All connection attempts to {} failed!", target); - Err(VentedError::NotAServer(target.clone())) + Err(VentedError::UnreachableNode(target.clone())) } /// Establishes a crypto stream for the given stream