Fix blocked receiver thread

Signed-off-by: trivernis <trivernis@protonmail.com>
pull/1/head
trivernis 4 years ago
parent 7fb2b5d647
commit e59e26c72f
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

@ -1,7 +1,7 @@
[package]
name = "vented"
description = "Event driven encrypted tcp communication"
version = "0.10.0"
version = "0.10.1"
authors = ["trivernis <trivernis@protonmail.com>"]
edition = "2018"
readme = "README.md"

@ -11,24 +11,111 @@ use crate::stream::cryptostream::CryptoStream;
use crate::stream::manager::ConcurrentStreamManager;
use crate::utils::result::VentedError;
use crate::utils::sync::AsyncValue;
use std::time::{Duration, Instant};
#[derive(Clone, Debug)]
pub struct Node {
pub id: String,
pub public_key: PublicKey,
pub address: Option<String>,
pub addresses: Vec<String>,
pub trusted: bool,
}
/// A known [`Node`] paired with the liveness state this server tracks for it.
///
/// Convertible from a [`Node`] (starting in [`NodeState::Unknown`]) and back
/// into a [`Node`] (dropping the tracked state).
#[derive(Clone, Debug)]
pub struct NodeData {
// The node description wrapped by this entry.
inner: Node,
// The most recently recorded liveness state of the node.
state: NodeState,
}
/// Liveness of a node as last recorded by this server.
#[derive(Clone, Debug)]
pub enum NodeState {
/// The node was recorded as alive; holds the instant at which that was recorded.
Alive(Instant),
/// The node was recorded as dead; holds the instant at which that was recorded.
Dead(Instant),
/// Nothing is currently known about the node. This is the initial state and
/// the state a stale `Alive`/`Dead` record is reset to.
Unknown,
}
#[derive(Clone)]
pub(crate) struct ServerConnectionContext {
pub is_server: bool,
pub node_id: String,
pub global_secret: SecretKey,
pub known_nodes: Arc<Mutex<HashMap<String, Node>>>,
pub known_nodes: Arc<Mutex<HashMap<String, NodeData>>>,
pub event_handler: Arc<Mutex<EventHandler>>,
pub forwarded_connections: Arc<Mutex<HashMap<(String, String), AsyncValue<CryptoStream, ()>>>>,
pub pool: Arc<Mutex<ScheduledThreadPool>>,
pub sender_pool: Arc<Mutex<ScheduledThreadPool>>,
pub recv_pool: Arc<Mutex<ScheduledThreadPool>>,
pub redirect_handles: Arc<Mutex<HashMap<[u8; 16], AsyncValue<(), VentedError>>>>,
pub manager: ConcurrentStreamManager,
}
impl From<Node> for NodeData {
fn from(node: Node) -> Self {
Self {
inner: node,
state: NodeState::Unknown,
}
}
}
impl From<NodeData> for Node {
fn from(other: NodeData) -> Self {
other.inner
}
}
/// Number of seconds a recorded `Alive`/`Dead` node state is trusted.
/// Once a state is older than this, `NodeData::node_state` resets it to
/// `Unknown`, signalling that the node has to be rechecked.
const NODE_STATE_TTL_SECONDS: u64 = 600;
impl NodeData {
    /// Borrows the wrapped node description.
    pub fn node(&self) -> &Node {
        &self.inner
    }

    /// Mutably borrows the wrapped node description.
    pub fn node_mut(&mut self) -> &mut Node {
        &mut self.inner
    }

    /// Returns the current state of the node, expiring it first if necessary.
    ///
    /// An `Alive` or `Dead` state that was recorded more than
    /// `NODE_STATE_TTL_SECONDS` ago is reset to `Unknown` (and that change is
    /// trace-logged) before the state is returned.
    pub fn node_state(&mut self) -> &NodeState {
        let ttl = Duration::from_secs(NODE_STATE_TTL_SECONDS);
        let expired = match &self.state {
            NodeState::Alive(since) | NodeState::Dead(since) => since.elapsed() > ttl,
            NodeState::Unknown => false,
        };
        if expired {
            // Same assignment and trace message as the setter performs.
            self.set_node_state(NodeState::Unknown);
        }
        &self.state
    }

    /// Overwrites the state of the node and trace-logs the new value.
    pub fn set_node_state(&mut self, state: NodeState) {
        let Self {
            inner,
            state: current,
        } = self;
        *current = state;
        log::trace!("Node state of {} updated to {:?}", inner.id, current);
    }

    /// Returns whether the stored state is `Dead`.
    ///
    /// This inspects the stored state as-is; unlike `node_state` it does not
    /// apply the TTL expiry.
    pub fn is_dead(&self) -> bool {
        matches!(self.state, NodeState::Dead(_))
    }

    /// Returns whether the stored state is `Alive`.
    ///
    /// This inspects the stored state as-is; unlike `node_state` it does not
    /// apply the TTL expiry.
    pub fn is_alive(&self) -> bool {
        matches!(self.state, NodeState::Alive(_))
    }
}

@ -5,7 +5,7 @@ use std::mem;
use std::net::{TcpListener, TcpStream};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use std::time::{Duration, Instant};
use crossbeam_utils::sync::WaitGroup;
use crypto_box::{PublicKey, SecretKey};
@ -16,21 +16,23 @@ use x25519_dalek::StaticSecret;
use crate::event::Event;
use crate::event_handler::EventHandler;
use crate::server::data::{Node, ServerConnectionContext};
use crate::server::data::{Node, NodeData, NodeState, ServerConnectionContext};
use crate::server::server_events::{
AuthPayload, ChallengePayload, NodeInformationPayload, RedirectPayload, VersionMismatchPayload,
ACCEPT_EVENT, AUTH_EVENT, CHALLENGE_EVENT, CONNECT_EVENT, MISMATCH_EVENT, READY_EVENT,
REDIRECT_EVENT, REJECT_EVENT,
};
use crate::stream::cryptostream::CryptoStream;
use crate::stream::manager::ConcurrentStreamManager;
use crate::stream::manager::{ConcurrentStreamManager, CONNECTION_TIMEOUT_SECONDS};
use crate::utils::result::{VentedError, VentedResult};
use crate::utils::sync::AsyncValue;
use std::cmp::max;
pub mod data;
pub mod server_events;
pub(crate) const CRATE_VERSION: &str = env!("CARGO_PKG_VERSION");
pub const PROTOCOL_VERSION: &str = "1.0";
type ForwardFutureVector = Arc<Mutex<HashMap<(String, String), AsyncValue<CryptoStream, ()>>>>;
@ -47,7 +49,7 @@ type ForwardFutureVector = Arc<Mutex<HashMap<(String, String), AsyncValue<Crypto
/// let nodes = vec![
/// Node {
/// id: "B".to_string(),
/// address: None,
/// addresses: vec![],
/// trusted: true,
/// public_key: global_secret_b.public_key() // load it from somewhere
/// },
@ -68,13 +70,14 @@ type ForwardFutureVector = Arc<Mutex<HashMap<(String, String), AsyncValue<Crypto
/// ```
pub struct VentedServer {
forwarded_connections: ForwardFutureVector,
known_nodes: Arc<Mutex<HashMap<String, Node>>>,
known_nodes: Arc<Mutex<HashMap<String, NodeData>>>,
event_handler: Arc<Mutex<EventHandler>>,
global_secret_key: SecretKey,
node_id: String,
redirect_handles: Arc<Mutex<HashMap<[u8; 16], AsyncValue<(), VentedError>>>>,
manager: ConcurrentStreamManager,
pool: Arc<Mutex<ScheduledThreadPool>>,
sender_pool: Arc<Mutex<ScheduledThreadPool>>,
receiver_pool: Arc<Mutex<ScheduledThreadPool>>,
}
impl VentedServer {
@ -95,10 +98,20 @@ impl VentedServer {
forwarded_connections: Arc::new(Mutex::new(HashMap::new())),
global_secret_key: secret_key,
known_nodes: Arc::new(Mutex::new(HashMap::from_iter(
nodes.iter().cloned().map(|node| (node.id.clone(), node)),
nodes
.iter()
.cloned()
.map(|node| (node.id.clone(), node.into())),
))),
redirect_handles: Arc::new(Mutex::new(HashMap::new())),
pool: Arc::new(Mutex::new(ScheduledThreadPool::new(num_threads))),
sender_pool: Arc::new(Mutex::new(ScheduledThreadPool::new(max(
num_threads / 2,
1,
)))),
receiver_pool: Arc::new(Mutex::new(ScheduledThreadPool::new(max(
num_threads / 2,
1,
)))),
};
server.register_events();
server.start_event_listener();
@ -113,11 +126,16 @@ impl VentedServer {
/// Returns the nodes known to the server
pub fn nodes(&self) -> Vec<Node> {
self.known_nodes.lock().values().cloned().collect()
self.known_nodes
.lock()
.values()
.cloned()
.map(Node::from)
.collect()
}
/// Returns the actual reference to the inner node list
pub fn nodes_ref(&self) -> Arc<Mutex<HashMap<String, Node>>> {
pub fn nodes_ref(&self) -> Arc<Mutex<HashMap<String, NodeData>>> {
Arc::clone(&self.known_nodes)
}
@ -180,10 +198,11 @@ impl VentedServer {
global_secret: self.global_secret_key.clone(),
known_nodes: Arc::clone(&self.known_nodes),
event_handler: Arc::clone(&self.event_handler),
pool: Arc::clone(&self.pool),
sender_pool: Arc::clone(&self.sender_pool),
forwarded_connections: Arc::clone(&self.forwarded_connections),
redirect_handles: Arc::clone(&self.redirect_handles),
manager: self.manager.clone(),
recv_pool: Arc::clone(&self.receiver_pool),
}
}
@ -199,6 +218,9 @@ impl VentedServer {
move || {
mem::drop(wg);
while let Ok((origin, event)) = receiver.recv() {
if let Some(node) = context.known_nodes.lock().get_mut(&origin) {
node.set_node_state(NodeState::Alive(Instant::now()));
}
let responses = event_handler.lock().handle_event(event);
for response in responses {
@ -220,34 +242,64 @@ impl VentedServer {
event: Event,
redirect: bool,
) -> AsyncValue<(), VentedError> {
log::trace!(
"Emitting: '{}' from {} to {}",
event.name,
context.node_id,
target
);
if context.manager.has_connection(target) {
log::trace!("Reusing existing connection.");
context.manager.send(target, event)
} else {
let future = AsyncValue::new();
context.pool.lock().execute({
context.sender_pool.lock().execute({
let mut future = AsyncValue::clone(&future);
let node_id = target.clone();
let context = context.clone();
move || {
log::trace!(
"Trying to redirect the event to a different node to be sent to target node..."
);
if let Ok(connection) = Self::get_connection(context.clone(), &node_id) {
log::trace!("Trying to establish connection...");
let node_state = if let Ok(connection) =
Self::get_connection(context.clone(), &node_id)
{
if let Err(e) = context.manager.add_connection(connection) {
future.reject(e);
return;
}
log::trace!("Established new connection.");
let result = context.manager.send(&node_id, event).get_value();
future.result(result);
match result {
Ok(_) => {
future.resolve(());
NodeState::Alive(Instant::now())
}
Err(e) => {
future.reject(e);
NodeState::Dead(Instant::now())
}
}
} else if redirect {
log::trace!("Trying to send event redirected");
let result = Self::send_event_redirected(context, &node_id, event);
future.result(result);
log::trace!("Trying to use a proxy node...");
let result = Self::send_event_redirected(context.clone(), &node_id, event);
match result {
Ok(_) => {
future.resolve(());
NodeState::Alive(Instant::now())
}
Err(e) => {
future.reject(e);
NodeState::Dead(Instant::now())
}
}
} else {
future.reject(VentedError::UnreachableNode(node_id))
log::trace!("Failed to emit event to node {}", node_id);
future.reject(VentedError::UnreachableNode(node_id.clone()));
NodeState::Dead(Instant::now())
};
if let Some(node) = context.known_nodes.lock().get_mut(&node_id) {
node.set_node_state(node_state);
}
}
});
@ -266,14 +318,14 @@ impl VentedServer {
.known_nodes
.lock()
.values()
.filter(|node| node.address.is_some())
.filter(|node| !node.node().addresses.is_empty() && node.is_alive())
.cloned()
.collect::<Vec<Node>>();
.collect::<Vec<NodeData>>();
for node in public_nodes {
let payload = RedirectPayload::new(
context.node_id.clone(),
node.id.clone(),
node.node().id.clone(),
target.clone(),
event.clone().as_bytes(),
);
@ -285,17 +337,21 @@ impl VentedServer {
if let Err(e) = Self::send_event(
context.clone(),
&node.id,
&node.node().id,
Event::with_payload(REDIRECT_EVENT, &payload),
false,
)
.get_value()
{
log::error!("Failed to redirect via {}: {}", node.id, e);
log::error!("Failed to redirect via {}: {}", node.node().id, e);
}
if let Some(Ok(_)) = future.get_value_with_timeout(Duration::from_secs(10)) {
if let Some(Ok(_)) =
future.get_value_with_timeout(Duration::from_secs(CONNECTION_TIMEOUT_SECONDS))
{
return Ok(());
} else {
log::error!("Failed to redirect via {}: Timeout", node.node().id);
}
}
@ -306,12 +362,14 @@ impl VentedServer {
/// then establishing an encrypted connection
fn handle_connection(context: ServerConnectionContext, stream: TcpStream) -> VentedResult<()> {
let event_handler = Arc::clone(&context.event_handler);
stream.set_read_timeout(Some(Duration::from_secs(CONNECTION_TIMEOUT_SECONDS)))?;
stream.set_write_timeout(Some(Duration::from_secs(CONNECTION_TIMEOUT_SECONDS)))?;
log::trace!(
"Received connection from {}",
stream.peer_addr().expect("Failed to get peer address")
);
context.pool.lock().execute({
context.recv_pool.lock().execute({
let context = context.clone();
move || {
let manager = context.manager.clone();
@ -349,25 +407,34 @@ impl VentedServer {
.cloned()
.ok_or(VentedError::UnknownNode(target.clone()))?;
if let Some(address) = target_node.address {
log::trace!("Connecting to known address");
log::trace!("Connecting to known addresses");
Self::connect(context, address)
} else {
log::trace!("All direct connection attempts to {} failed", target);
for address in &target_node.node().addresses {
match Self::connect(context.clone(), address.clone()) {
Ok(stream) => return Ok(stream),
Err(e) => {
log::error!("Failed to connect to node {}'s address: {}", target, e);
context
.known_nodes
.lock()
.get_mut(target)
.unwrap()
.node_mut()
.addresses
.retain(|a| a != address);
}
}
}
log::trace!("All direct connection attempts to {} failed", target);
Err(VentedError::UnreachableNode(target.clone()))
}
}
/// Establishes a crypto stream for the given stream
fn get_crypto_stream(
context: ServerConnectionContext,
stream: TcpStream,
) -> VentedResult<CryptoStream> {
stream.set_read_timeout(Some(Duration::from_secs(10)))?;
stream.set_write_timeout(Some(Duration::from_secs(10)))?;
let (_, stream) = VentedServer::perform_key_exchange(
context.is_server,
stream,
@ -385,6 +452,8 @@ impl VentedServer {
address: String,
) -> VentedResult<CryptoStream> {
let stream = TcpStream::connect(address)?;
stream.set_read_timeout(Some(Duration::from_secs(CONNECTION_TIMEOUT_SECONDS)))?;
stream.set_write_timeout(Some(Duration::from_secs(CONNECTION_TIMEOUT_SECONDS)))?;
context.is_server = false;
let stream = Self::get_crypto_stream(context, stream)?;
@ -397,7 +466,7 @@ impl VentedServer {
stream: TcpStream,
own_node_id: String,
global_secret: SecretKey,
known_nodes: Arc<Mutex<HashMap<String, Node>>>,
known_nodes: Arc<Mutex<HashMap<String, NodeData>>>,
) -> VentedResult<(String, CryptoStream)> {
let secret_key = SecretKey::generate(&mut rand::thread_rng());
if is_server {
@ -425,7 +494,7 @@ impl VentedServer {
secret_key: &SecretKey,
own_node_id: String,
global_secret: SecretKey,
known_nodes: Arc<Mutex<HashMap<String, Node>>>,
known_nodes: Arc<Mutex<HashMap<String, NodeData>>>,
) -> VentedResult<(String, CryptoStream)> {
stream.write(
&Event::with_payload(
@ -433,7 +502,7 @@ impl VentedServer {
&NodeInformationPayload {
public_key: secret_key.public_key().to_bytes(),
node_id: own_node_id,
vented_version: CRATE_VERSION.to_string(),
vented_version: PROTOCOL_VERSION.to_string(),
},
)
.as_bytes(),
@ -450,11 +519,11 @@ impl VentedServer {
vented_version,
} = event.get_payload::<NodeInformationPayload>().unwrap();
if !Self::compare_version(&vented_version, CRATE_VERSION) {
if !Self::compare_version(&vented_version, PROTOCOL_VERSION) {
stream.write(
&Event::with_payload(
MISMATCH_EVENT,
&VersionMismatchPayload::new(CRATE_VERSION, &vented_version),
&VersionMismatchPayload::new(PROTOCOL_VERSION, &vented_version),
)
.as_bytes(),
)?;
@ -475,7 +544,7 @@ impl VentedServer {
let mut stream = CryptoStream::new(node_id.clone(), stream, &public_key, &secret_key)?;
log::trace!("Authenticating recipient...");
let key_a = Self::authenticate_other(&mut stream, node_data.public_key)?;
let key_a = Self::authenticate_other(&mut stream, node_data.node().public_key)?;
log::trace!("Authenticating self...");
let key_b =
Self::authenticate_self(&mut stream, StaticSecret::from(global_secret.to_bytes()))?;
@ -497,7 +566,7 @@ impl VentedServer {
secret_key: &SecretKey,
own_node_id: String,
global_secret: SecretKey,
known_nodes: Arc<Mutex<HashMap<String, Node>>>,
known_nodes: Arc<Mutex<HashMap<String, NodeData>>>,
) -> VentedResult<(String, CryptoStream)> {
let event = Event::from_bytes(&mut stream)?;
if event.name != CONNECT_EVENT {
@ -509,11 +578,11 @@ impl VentedServer {
vented_version,
} = event.get_payload::<NodeInformationPayload>().unwrap();
if !Self::compare_version(&vented_version, CRATE_VERSION) {
if !Self::compare_version(&vented_version, PROTOCOL_VERSION) {
stream.write(
&Event::with_payload(
MISMATCH_EVENT,
&VersionMismatchPayload::new(CRATE_VERSION, &vented_version),
&VersionMismatchPayload::new(PROTOCOL_VERSION, &vented_version),
)
.as_bytes(),
)?;
@ -536,7 +605,7 @@ impl VentedServer {
&NodeInformationPayload {
public_key: secret_key.public_key().to_bytes(),
node_id: own_node_id,
vented_version: CRATE_VERSION.to_string(),
vented_version: PROTOCOL_VERSION.to_string(),
},
)
.as_bytes(),
@ -549,7 +618,7 @@ impl VentedServer {
let key_a =
Self::authenticate_self(&mut stream, StaticSecret::from(global_secret.to_bytes()))?;
log::trace!("Authenticating recipient...");
let key_b = Self::authenticate_other(&mut stream, node_data.public_key)?;
let key_b = Self::authenticate_other(&mut stream, node_data.node().public_key)?;
log::trace!("Connection fully authenticated.");
let pre_secret = StaticSecret::from(secret_key.to_bytes()).diffie_hellman(&public_key);

@ -94,7 +94,7 @@ pub struct NodeListPayload {
pub struct NodeListElement {
pub id: String,
pub public_key: [u8; 32],
pub address: Option<String>,
pub addresses: Vec<String>,
}
impl VentedServer {
@ -122,7 +122,7 @@ impl VentedServer {
});
self.on(REDIRECT_EVENT, {
let manager = self.manager.clone();
let pool = Arc::clone(&self.pool);
let pool = Arc::clone(&self.sender_pool);
move |event| {
let payload = event.get_payload::<RedirectPayload>().ok()?;
@ -157,7 +157,7 @@ impl VentedServer {
self.on(REDIRECT_REDIRECTED_EVENT, {
let event_handler = Arc::clone(&self.event_handler);
let manager = self.manager.clone();
let pool = self.pool.clone();
let pool = self.sender_pool.clone();
let known_nodes = Arc::clone(&self.known_nodes);
move |event| {
@ -204,7 +204,7 @@ impl VentedServer {
let mut own_nodes = node_list.lock();
let origin = event.origin?;
if !own_nodes.get(&origin)?.trusted {
if !own_nodes.get(&origin)?.node().trusted {
log::warn!("Untrusted node '{}' tried to send network update!", origin);
return None;
}
@ -218,8 +218,9 @@ impl VentedServer {
id: node.id,
trusted: false,
public_key: PublicKey::from(node.public_key),
address: node.address,
},
addresses: node.addresses,
}
.into(),
);
new_nodes += 1;
}
@ -237,11 +238,11 @@ impl VentedServer {
let nodes = node_list
.lock()
.values()
.filter(|node| node.id != sender_id)
.filter(|node| node.node().id != sender_id)
.map(|node| NodeListElement {
id: node.id.clone(),
address: node.address.clone(),
public_key: node.public_key.to_bytes(),
id: node.node().id.clone(),
addresses: node.node().addresses.clone(),
public_key: node.node().public_key.to_bytes(),
})
.collect();

@ -15,7 +15,7 @@ use crate::utils::sync::AsyncValue;
use crate::WaitGroup;
const MAX_ENQUEUED_EVENTS: usize = 50;
const SEND_TIMEOUT_SECONDS: u64 = 60;
pub const CONNECTION_TIMEOUT_SECONDS: u64 = 5;
#[derive(Clone, Debug)]
pub struct ConcurrentStreamManager {
@ -55,7 +55,7 @@ impl ConcurrentStreamManager {
if let Some(emitter) = self.emitters.lock().get(target) {
if let Err(_) = emitter.send_timeout(
(event, value.clone()),
Duration::from_secs(SEND_TIMEOUT_SECONDS),
Duration::from_secs(CONNECTION_TIMEOUT_SECONDS),
) {
value.reject(VentedError::UnreachableNode(target.clone()));
}

@ -25,24 +25,33 @@ fn test_server_communication() {
let nodes = vec![
Node {
id: "A".to_string(),
address: Some("localhost:22222".to_string()),
addresses: vec!["localhost:22222".to_string()],
public_key: global_secret_a.public_key(),
trusted: true,
},
Node {
id: "B".to_string(),
address: None,
addresses: vec![],
public_key: global_secret_b.public_key(),
trusted: false,
},
Node {
id: "C".to_string(),
address: None,
addresses: vec![],
public_key: global_secret_c.public_key(),
trusted: false,
},
];
let mut server_a = VentedServer::new("A".to_string(), global_secret_a, nodes.clone(), 2, 100);
let mut nodes_a = nodes.clone();
for i in 0..10 {
nodes_a.push(Node {
id: format!("Node-{}", i),
addresses: vec!["192.168.178.1".to_string()],
public_key: global_secret_c.public_key(),
trusted: false,
})
}
let mut server_a = VentedServer::new("A".to_string(), global_secret_a, nodes_a, 20, 100);
let mut server_b = VentedServer::new("B".to_string(), global_secret_b, nodes.clone(), 3, 100);
let server_c = VentedServer::new("C".to_string(), global_secret_c, nodes, 3, 100);
let wg = server_a.listen("localhost:22222".to_string());
@ -63,6 +72,9 @@ fn test_server_communication() {
None
}
});
for i in 0..10 {
server_a.emit(format!("Nodes-{}", i), Event::new("ping"));
}
server_b
.emit("A", Event::new(NODE_LIST_REQUEST_EVENT))
.on_success(|_| println!("Success"))

Loading…
Cancel
Save