Update vented and fix sending heartbeats to dead nodes

Signed-off-by: trivernis <trivernis@protonmail.com>
pull/1/head
trivernis 4 years ago
parent 32ae74fa34
commit 44ea715db7
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

157
Cargo.lock generated

@ -33,39 +33,12 @@ dependencies = [
"winapi",
]
[[package]]
name = "arr_macro"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a105bfda48707cf19220129e78fca01e9639433ffaef4163546ed8fb04120a5"
dependencies = [
"arr_macro_impl",
"proc-macro-hack",
]
[[package]]
name = "arr_macro_impl"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0609c78bd572f4edc74310dfb63a01f5609d53fa8b4dd7c4d98aef3b3e8d72d1"
dependencies = [
"proc-macro-hack",
"quote",
"syn",
]
[[package]]
name = "arrayvec"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b"
[[package]]
name = "async-task"
version = "4.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e91831deabf0d6d7ec49552e489aed63b7456a7a3c46cff62adad428110b0af0"
[[package]]
name = "atty"
version = "0.2.14"
@ -238,68 +211,12 @@ checksum = "8aebca1129a03dc6dc2b127edd729435bbc4a37e1d5f4d7513165089ceb02634"
[[package]]
name = "crossbeam-channel"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b153fe7cbef478c567df0f972e02e6d736db11affe43dfc9c56a9374d1adfb87"
dependencies = [
"crossbeam-utils 0.7.2",
"maybe-uninit",
]
[[package]]
name = "crossbeam-deque"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f02af974daeee82218205558e51ec8768b48cf524bd01d550abe5573a608285"
dependencies = [
"crossbeam-epoch",
"crossbeam-utils 0.7.2",
"maybe-uninit",
]
[[package]]
name = "crossbeam-epoch"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "058ed274caafc1f60c4997b5fc07bf7dc7cca454af7c6e81edffe5f33f70dace"
dependencies = [
"autocfg",
"cfg-if 0.1.10",
"crossbeam-utils 0.7.2",
"lazy_static",
"maybe-uninit",
"memoffset",
"scopeguard",
]
[[package]]
name = "crossbeam-queue"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c979cd6cfe72335896575c6b5688da489e420d36a27a0b9eb0c73db574b4a4b"
dependencies = [
"crossbeam-utils 0.6.6",
]
[[package]]
name = "crossbeam-utils"
version = "0.6.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04973fa96e96579258a5091af6003abde64af786b860f18622b82e026cca60e6"
dependencies = [
"cfg-if 0.1.10",
"lazy_static",
]
[[package]]
name = "crossbeam-utils"
version = "0.7.2"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8"
checksum = "dca26ee1f8d361640700bde38b2c37d8c22b3ce2d360e1fc1c74ea4b0aa7d775"
dependencies = [
"autocfg",
"cfg-if 0.1.10",
"lazy_static",
"cfg-if 1.0.0",
"crossbeam-utils",
]
[[package]]
@ -363,24 +280,6 @@ dependencies = [
"termcolor",
]
[[package]]
name = "executors"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f99e7b1533b6caa2e16120bfc652aeb087c9197b1bf419edfc8587e6022f2fc9"
dependencies = [
"arr_macro",
"async-task",
"crossbeam-channel",
"crossbeam-deque",
"crossbeam-utils 0.7.2",
"log",
"num_cpus",
"rand",
"synchronoise",
"threadpool",
]
[[package]]
name = "fallible-iterator"
version = "0.2.0"
@ -573,27 +472,12 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4"
[[package]]
name = "maybe-uninit"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00"
[[package]]
name = "memchr"
version = "2.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ee1c47aaa256ecabcaea351eae4a9b01ef39ed810004e298d2511ed284b1525"
[[package]]
name = "memoffset"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "043175f069eda7b85febe4a74abbaeff828d9f8b448515d3151a14a3542811aa"
dependencies = [
"autocfg",
]
[[package]]
name = "nix"
version = "0.19.0"
@ -732,12 +616,6 @@ dependencies = [
"version_check",
]
[[package]]
name = "proc-macro-hack"
version = "0.5.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5"
[[package]]
name = "proc-macro2"
version = "1.0.24"
@ -1051,15 +929,6 @@ dependencies = [
"unicode-xid",
]
[[package]]
name = "synchronoise"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d717ed0efc9d39ab3b642a096bc369a3e02a38a51c41845d7fe31bdad1d6eaeb"
dependencies = [
"crossbeam-queue",
]
[[package]]
name = "synstructure"
version = "0.12.4"
@ -1099,15 +968,6 @@ dependencies = [
"lazy_static",
]
[[package]]
name = "threadpool"
version = "1.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa"
dependencies = [
"num_cpus",
]
[[package]]
name = "time"
version = "0.1.44"
@ -1176,20 +1036,21 @@ checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191"
[[package]]
name = "vented"
version = "0.9.1"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8c9e9ae15109d0c517d65788a39e290e7edfcc5200f4aa92cbc174ad262aaba2"
checksum = "e15d3be30d3ab31cd2b12f7929b20368fe9e8690dba0dd1d1c405456e6005e79"
dependencies = [
"byteorder",
"crossbeam-utils 0.8.0",
"crossbeam-channel",
"crossbeam-utils",
"crypto_box",
"executors",
"generic-array",
"log",
"parking_lot",
"rand",
"rmp",
"rmp-serde",
"scheduled-thread-pool",
"serde 1.0.117",
"sha2",
"typenum",

@ -7,7 +7,7 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
vented = "0.9.1"
vented = "0.10.1"
rusqlite = "0.24.1"
rand = "0.7.3"
base64 = "0.13.0"

@ -1,10 +1,10 @@
use serde::{Serialize, Deserialize};
use vented::crypto::PublicKey;
use crate::utils::keys::{armor_public_key, extract_public_key};
use std::path::PathBuf;
use crate::utils::result::SnekcloudResult;
use std::fs;
use crate::utils::write_toml_pretty;
use serde::{Deserialize, Serialize};
use std::fs;
use std::path::PathBuf;
use vented::stream::PublicKey;
#[derive(Serialize, Deserialize)]
pub struct NodeData {
@ -14,21 +14,12 @@ pub struct NodeData {
}
impl NodeData {
pub fn new(id: String, public_key: PublicKey) -> Self {
let public_key = armor_public_key(public_key);
Self {
id,
addresses: Vec::with_capacity(0),
public_key
}
}
pub fn with_addresses(id: String, addresses: Vec<String>, public_key: PublicKey) -> Self {
let public_key = armor_public_key(public_key);
Self {
id,
addresses,
public_key
public_key,
}
}

@ -11,7 +11,7 @@ use crate::utils::settings::{get_settings, Settings};
use std::fs;
use std::path::PathBuf;
use structopt::StructOpt;
use vented::crypto::SecretKey;
use vented::stream::SecretKey;
#[macro_use]
extern crate lazy_static;

@ -116,7 +116,8 @@ impl Module for HeartbeatModule {
pool: &mut ScheduledThreadPool,
) -> SnekcloudResult<()> {
if self.last_tick.elapsed() > self.settings.interval() {
for node in context.nodes() {
log::trace!("Sending heartbeat...");
for node in context.living_nodes() {
let mut future = context.emit(
node.id.clone(),
Event::with_payload(
@ -128,8 +129,7 @@ impl Module for HeartbeatModule {
pool.execute(move || {
if let Some(Err(e)) = future.get_value_with_timeout(Duration::from_secs(10)) {
log::debug!("Node {} is not reachable: {}", node.id, e);
let mut states = states.lock();
Self::insert_state(&mut states, node.id, NodeInfo::dead());
Self::insert_state(&mut states.lock(), node.id, NodeInfo::dead());
}
});
}

@ -11,11 +11,11 @@ use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant, UNIX_EPOCH};
use vented::crypto::PublicKey;
use vented::event::Event;
use vented::server::data::Node;
use vented::server::server_events::{NodeListPayload, NODE_LIST_REQUEST_EVENT};
use vented::server::VentedServer;
use vented::stream::PublicKey;
pub mod settings;
@ -58,7 +58,7 @@ impl Module for NodesRefreshModule {
id: node.id,
trusted: false,
public_key: PublicKey::from(node.public_key),
address: node.address,
addresses: node.addresses,
},
);
new_nodes = true;
@ -83,11 +83,7 @@ impl Module for NodesRefreshModule {
.values()
.cloned()
.map(|node| {
if let Some(address) = node.address {
NodeData::with_addresses(node.id, vec![address], node.public_key)
} else {
NodeData::new(node.id, node.public_key)
}
NodeData::with_addresses(node.id, node.addresses, node.public_key)
})
.for_each(|data| {
let mut path = nodes_folder.clone();
@ -115,7 +111,7 @@ impl Module for NodesRefreshModule {
) -> SnekcloudResult<()> {
if self.last_request.elapsed() > self.settings.update_interval() {
context
.nodes()
.living_nodes()
.iter()
.filter(|node| node.trusted)
.for_each(|node| {

@ -3,14 +3,16 @@ use crate::server::tick_context::TickContext;
use crate::utils::result::{SnekcloudError, SnekcloudResult};
use parking_lot::Mutex;
use scheduled_thread_pool::ScheduledThreadPool;
use std::cmp::max;
use std::collections::HashMap;
use std::mem;
use std::sync::mpsc::channel;
use std::sync::Arc;
use std::time::Duration;
use vented::crypto::SecretKey;
use vented::server::data::Node;
use vented::server::VentedServer;
use vented::stream::SecretKey;
use vented::utils::result::VentedError;
use vented::WaitGroup;
pub mod tick_context;
@ -21,21 +23,19 @@ pub struct SnekcloudServer {
inner: VentedServer,
listen_addresses: Vec<String>,
listeners: Vec<WaitGroup>,
module_pool: Arc<Mutex<ScheduledThreadPool>>,
module_pool: HashMap<String, Arc<Mutex<ScheduledThreadPool>>>,
modules: HashMap<String, Box<dyn Module + Send + Sync>>,
}
impl SnekcloudServer {
/// Creates a new snekcloud server with the provided keys and number of threads
pub fn new(id: String, private_key: SecretKey, keys: Vec<Node>, num_threads: usize) -> Self {
let num_threads = max(num_threads, keys.len());
Self {
inner: VentedServer::new(id, private_key, keys, num_threads),
inner: VentedServer::new(id, private_key, keys, num_threads * 2, num_threads * 10),
listen_addresses: Vec::new(),
listeners: Vec::new(),
module_pool: Arc::new(Mutex::new(ScheduledThreadPool::with_name(
"modules",
num_threads,
))),
module_pool: HashMap::new(),
modules: HashMap::new(),
}
}
@ -52,36 +52,50 @@ impl SnekcloudServer {
}
let modules = mem::take(&mut self.modules);
let module_pool = Arc::clone(&self.module_pool);
let (tx, rx) = channel();
let tick_context = TickContext::new(self.inner.node_id(), tx, self.inner.nodes_ref());
let node_count = self.inner.nodes().len();
for (name, mut module) in modules {
module_pool.lock().execute_at_fixed_rate(
Duration::from_millis(SERVER_TICK_RATE_MS),
Duration::from_millis(SERVER_TICK_RATE_MS),
{
let module_pool = Arc::clone(&module_pool);
let tick_context = TickContext::clone(&tick_context);
move || {
let mut module_pool = module_pool.lock();
if let Err(e) =
module.tick(TickContext::clone(&tick_context), &mut module_pool)
{
log::error!("Error when ticking module {}: {}", name, e);
self.module_pool
.get(&name)
.unwrap()
.lock()
.execute_at_fixed_rate(
Duration::from_millis(SERVER_TICK_RATE_MS),
Duration::from_millis(SERVER_TICK_RATE_MS),
{
let mut module_pool = ScheduledThreadPool::new(1);
let tick_context = TickContext::clone(&tick_context);
move || {
if let Err(e) =
module.tick(TickContext::clone(&tick_context), &mut module_pool)
{
log::error!("Error when ticking module {}: {}", name, e);
}
}
}
},
);
},
);
}
let invocation_pool = ScheduledThreadPool::new(node_count);
for invocation in rx {
let mut future = self.inner.emit(invocation.target_node, invocation.event);
let mut future = self
.inner
.emit(invocation.target_node.clone(), invocation.event);
let mut invocation_result = invocation.result;
let node_id = invocation.target_node;
invocation_pool.execute(move || {
let result = future.get_value_with_timeout(Duration::from_secs(60));
module_pool.lock().execute(move || {
let result = future.get_value();
invocation_result.result(result.map_err(SnekcloudError::from));
if let Some(result) = result {
invocation_result.result(result.map_err(SnekcloudError::from));
} else {
log::error!("Failed to send event: Timeout after 5s");
invocation_result.reject(SnekcloudError::Vented(VentedError::UnreachableNode(
node_id,
)));
}
});
}
@ -93,9 +107,10 @@ impl SnekcloudServer {
&mut self,
mut module: impl Module + Send + Sync,
) -> SnekcloudResult<()> {
let mut module_pool = self.module_pool.lock();
let module_pool = Arc::new(Mutex::new(ScheduledThreadPool::new(2)));
module.init(&mut self.inner, &mut module_pool)?;
module.init(&mut self.inner, &mut module_pool.lock())?;
self.module_pool.insert(module.name(), module_pool);
self.modules.insert(module.name(), module.boxed());
Ok(())

@ -4,12 +4,12 @@ use std::collections::HashMap;
use std::sync::mpsc::Sender;
use std::sync::Arc;
use vented::event::Event;
use vented::server::data::Node;
use vented::server::data::{Node, NodeData};
use vented::utils::sync::AsyncValue;
#[derive(Clone)]
pub struct TickContext {
nodes: Arc<Mutex<HashMap<String, Node>>>,
nodes: Arc<Mutex<HashMap<String, NodeData>>>,
event_sender: Sender<EventInvocation>,
node_id: String,
}
@ -24,7 +24,7 @@ impl TickContext {
pub fn new(
node_id: String,
sender: Sender<EventInvocation>,
nodes: Arc<Mutex<HashMap<String, Node>>>,
nodes: Arc<Mutex<HashMap<String, NodeData>>>,
) -> Self {
Self {
nodes,
@ -51,8 +51,33 @@ impl TickContext {
}
/// Returns a copy of the nodes of the server
#[allow(dead_code)]
pub fn nodes(&self) -> Vec<Node> {
self.nodes.lock().values().cloned().collect()
self.nodes
.lock()
.values()
.cloned()
.map(Node::from)
.collect()
}
pub fn living_nodes(&self) -> Vec<Node> {
self.nodes
.lock()
.values()
.cloned()
.filter(|node| !node.is_dead())
.map(Node::from)
.collect()
}
#[allow(dead_code)]
pub fn check_alive(&self, node_id: &String) -> bool {
if let Some(node) = self.nodes.lock().get(node_id) {
!node.is_dead()
} else {
false
}
}
/// Returns the node

@ -3,8 +3,8 @@ use crate::utils::result::{SnekcloudError, SnekcloudResult};
use crate::utils::settings::get_settings;
use std::fs::create_dir;
use std::path::{Path, PathBuf};
use vented::crypto::{PublicKey, SecretKey};
use vented::server::data::Node;
use vented::stream::{PublicKey, SecretKey};
const PRIVATE_KEY_HEADER_LINE: &str = "---BEGIN-SNEKCLOUD-PRIVATE-KEY---\n";
const PRIVATE_KEY_FOOTER_LINE: &str = "\n---END-SNEKCLOUD-PRIVATE-KEY---";
@ -21,11 +21,11 @@ pub fn read_node_keys(path: &PathBuf) -> SnekcloudResult<Vec<Node>> {
let content = glob::glob(format!("{}/*.toml", path.to_string_lossy()).as_str())?
.filter_map(|path| {
let mut data = NodeData::from_file(path.ok()?).ok()?;
let data = NodeData::from_file(path.ok()?).ok()?;
Some(Node {
public_key: data.public_key(),
address: data.addresses.pop(),
addresses: data.addresses,
trusted: trusted_nodes.contains(&data.id),
id: data.id,
})

Loading…
Cancel
Save