Fix memory leak

Signed-off-by: trivernis <trivernis@protonmail.com>
pull/1/head
trivernis 4 years ago
parent 12d17f09c9
commit ac81f69209
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

@ -1,7 +1,7 @@
[package]
name = "vented"
description = "Event driven encrypted tcp communicaton"
version = "0.4.0"
version = "0.4.1"
authors = ["trivernis <trivernis@protonmail.com>"]
edition = "2018"
readme = "README.md"

@ -8,7 +8,7 @@ pub type VentedResult<T> = Result<T, VentedError>;
pub enum VentedError {
NameDecodingError,
NotReady,
NotAServer(String),
UnreachableNode(String),
IOError(io::Error),
SerializeError(rmp_serde::encode::Error),
DeserializeError(rmp_serde::decode::Error),
@ -31,7 +31,6 @@ impl fmt::Display for VentedError {
Self::UnexpectedEvent(e) => write!(f, "Received unexpected event: {}", e),
Self::UnknownNode(n) => write!(f, "Received connection from unknown node: {}", n),
Self::NotReady => write!(f, "The connection is still being established."),
Self::NotAServer(n) => write!(f, "The given node {} is not a server", n),
Self::Rejected => write!(f, "The connection was rejected"),
Self::AuthFailed => write!(f, "Failed to authenticate the other party"),
Self::VersionMismatch(version) => write!(
@ -39,6 +38,7 @@ impl fmt::Display for VentedError {
"Version mismatch: Expected {} got {}",
CRATE_VERSION, version
),
Self::UnreachableNode(node) => write!(f, "Node {} can't be reached", node),
}
}
}

@ -7,6 +7,8 @@ use scheduled_thread_pool::ScheduledThreadPool;
use std::collections::HashMap;
use std::mem;
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
use x25519_dalek::PublicKey;
#[derive(Clone, Debug)]
@ -51,10 +53,24 @@ impl<T> Future<T> {
/// Returns the value of the future after it has been set.
/// This call blocks
#[allow(dead_code)]
pub fn get_value(&mut self) -> T {
if let Some(wg) = mem::take(&mut self.wg) {
wg.wait();
}
self.value.lock().take().unwrap()
}
/// Returns the value of the future only blocking for the given timeout
pub fn get_value_with_timeout(&mut self, millis: u128) -> Option<T> {
let start = Instant::now();
while self.value.lock().is_none() {
thread::sleep(Duration::from_millis(1));
if start.elapsed().as_millis() > millis {
break;
}
}
self.value.lock().take()
}
}

@ -307,12 +307,14 @@ impl VentedServer {
.insert(payload.id, Future::clone(&future));
self.emit(node.id, Event::with_payload(REDIRECT_EVENT, &payload))?;
if future.get_value() {
return Ok(());
if let Some(value) = future.get_value_with_timeout(1000) {
if value {
return Ok(());
}
}
}
Err(VentedError::UnknownNode(target))
Err(VentedError::UnreachableNode(target))
}
/// Handles a single connection by first performing a key exchange and
@ -391,7 +393,7 @@ impl VentedServer {
log::debug!("All connection attempts to {} failed!", target);
Err(VentedError::NotAServer(target.clone()))
Err(VentedError::UnreachableNode(target.clone()))
}
/// Establishes a crypto stream for the given stream

Loading…
Cancel
Save