diff --git a/Cargo.lock b/Cargo.lock index 7e547f6..a3b40ef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -33,39 +33,12 @@ dependencies = [ "winapi", ] -[[package]] -name = "arr_macro" -version = "0.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a105bfda48707cf19220129e78fca01e9639433ffaef4163546ed8fb04120a5" -dependencies = [ - "arr_macro_impl", - "proc-macro-hack", -] - -[[package]] -name = "arr_macro_impl" -version = "0.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0609c78bd572f4edc74310dfb63a01f5609d53fa8b4dd7c4d98aef3b3e8d72d1" -dependencies = [ - "proc-macro-hack", - "quote", - "syn", -] - [[package]] name = "arrayvec" version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" -[[package]] -name = "async-task" -version = "4.0.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e91831deabf0d6d7ec49552e489aed63b7456a7a3c46cff62adad428110b0af0" - [[package]] name = "atty" version = "0.2.14" @@ -238,68 +211,12 @@ checksum = "8aebca1129a03dc6dc2b127edd729435bbc4a37e1d5f4d7513165089ceb02634" [[package]] name = "crossbeam-channel" -version = "0.4.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b153fe7cbef478c567df0f972e02e6d736db11affe43dfc9c56a9374d1adfb87" -dependencies = [ - "crossbeam-utils 0.7.2", - "maybe-uninit", -] - -[[package]] -name = "crossbeam-deque" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f02af974daeee82218205558e51ec8768b48cf524bd01d550abe5573a608285" -dependencies = [ - "crossbeam-epoch", - "crossbeam-utils 0.7.2", - "maybe-uninit", -] - -[[package]] -name = "crossbeam-epoch" -version = "0.8.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "058ed274caafc1f60c4997b5fc07bf7dc7cca454af7c6e81edffe5f33f70dace" -dependencies = [ - "autocfg", - "cfg-if 0.1.10", - "crossbeam-utils 0.7.2", - "lazy_static", - "maybe-uninit", - "memoffset", - "scopeguard", -] - -[[package]] -name = "crossbeam-queue" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7c979cd6cfe72335896575c6b5688da489e420d36a27a0b9eb0c73db574b4a4b" -dependencies = [ - "crossbeam-utils 0.6.6", -] - -[[package]] -name = "crossbeam-utils" -version = "0.6.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04973fa96e96579258a5091af6003abde64af786b860f18622b82e026cca60e6" -dependencies = [ - "cfg-if 0.1.10", - "lazy_static", -] - -[[package]] -name = "crossbeam-utils" -version = "0.7.2" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8" +checksum = "dca26ee1f8d361640700bde38b2c37d8c22b3ce2d360e1fc1c74ea4b0aa7d775" dependencies = [ - "autocfg", - "cfg-if 0.1.10", - "lazy_static", + "cfg-if 1.0.0", + "crossbeam-utils", ] [[package]] @@ -363,24 +280,6 @@ dependencies = [ "termcolor", ] -[[package]] -name = "executors" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f99e7b1533b6caa2e16120bfc652aeb087c9197b1bf419edfc8587e6022f2fc9" -dependencies = [ - "arr_macro", - "async-task", - "crossbeam-channel", - "crossbeam-deque", - "crossbeam-utils 0.7.2", - "log", - "num_cpus", - "rand", - "synchronoise", - "threadpool", -] - [[package]] name = "fallible-iterator" version = "0.2.0" @@ -573,27 +472,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4" -[[package]] -name = "maybe-uninit" -version = "2.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00" - [[package]] name = "memchr" version = "2.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ee1c47aaa256ecabcaea351eae4a9b01ef39ed810004e298d2511ed284b1525" -[[package]] -name = "memoffset" -version = "0.5.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "043175f069eda7b85febe4a74abbaeff828d9f8b448515d3151a14a3542811aa" -dependencies = [ - "autocfg", -] - [[package]] name = "nix" version = "0.19.0" @@ -732,12 +616,6 @@ dependencies = [ "version_check", ] -[[package]] -name = "proc-macro-hack" -version = "0.5.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5" - [[package]] name = "proc-macro2" version = "1.0.24" @@ -1051,15 +929,6 @@ dependencies = [ "unicode-xid", ] -[[package]] -name = "synchronoise" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d717ed0efc9d39ab3b642a096bc369a3e02a38a51c41845d7fe31bdad1d6eaeb" -dependencies = [ - "crossbeam-queue", -] - [[package]] name = "synstructure" version = "0.12.4" @@ -1099,15 +968,6 @@ dependencies = [ "lazy_static", ] -[[package]] -name = "threadpool" -version = "1.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa" -dependencies = [ - "num_cpus", -] - [[package]] name = "time" version = "0.1.44" @@ -1176,20 +1036,21 @@ checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191" [[package]] name = "vented" -version = "0.9.1" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c9e9ae15109d0c517d65788a39e290e7edfcc5200f4aa92cbc174ad262aaba2" +checksum = "e15d3be30d3ab31cd2b12f7929b20368fe9e8690dba0dd1d1c405456e6005e79" dependencies = [ "byteorder", - "crossbeam-utils 0.8.0", + "crossbeam-channel", + "crossbeam-utils", "crypto_box", - "executors", "generic-array", "log", "parking_lot", "rand", "rmp", "rmp-serde", + "scheduled-thread-pool", "serde 1.0.117", "sha2", "typenum", diff --git a/Cargo.toml b/Cargo.toml index afcd67f..0b393f5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,7 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -vented = "0.9.1" +vented = "0.10.1" rusqlite = "0.24.1" rand = "0.7.3" base64 = "0.13.0" diff --git a/src/data/node_data.rs b/src/data/node_data.rs index b9b6d06..02a9551 100644 --- a/src/data/node_data.rs +++ b/src/data/node_data.rs @@ -1,10 +1,10 @@ -use serde::{Serialize, Deserialize}; -use vented::crypto::PublicKey; use crate::utils::keys::{armor_public_key, extract_public_key}; -use std::path::PathBuf; use crate::utils::result::SnekcloudResult; -use std::fs; use crate::utils::write_toml_pretty; +use serde::{Deserialize, Serialize}; +use std::fs; +use std::path::PathBuf; +use vented::stream::PublicKey; #[derive(Serialize, Deserialize)] pub struct NodeData { @@ -14,21 +14,12 @@ pub struct NodeData { } impl NodeData { - pub fn new(id: String, public_key: PublicKey) -> Self { - let public_key = armor_public_key(public_key); - Self { - id, - addresses: Vec::with_capacity(0), - public_key - } - } - pub fn with_addresses(id: String, addresses: Vec, public_key: PublicKey) -> Self { let public_key = armor_public_key(public_key); Self { id, addresses, - public_key + public_key, } } @@ -48,4 +39,4 @@ impl NodeData { pub fn public_key(&self) -> PublicKey { extract_public_key(&self.public_key).unwrap() } -} \ No newline at end of file +} diff --git a/src/main.rs b/src/main.rs index 89ebf3f..e36d171 100644 --- a/src/main.rs +++ b/src/main.rs @@ -11,7 +11,7 @@ use crate::utils::settings::{get_settings, Settings}; use std::fs; use std::path::PathBuf; use structopt::StructOpt; -use vented::crypto::SecretKey; +use vented::stream::SecretKey; #[macro_use] extern crate lazy_static; diff --git a/src/modules/heartbeat/mod.rs b/src/modules/heartbeat/mod.rs index 46ac9c0..9085cbb 100644 --- a/src/modules/heartbeat/mod.rs +++ b/src/modules/heartbeat/mod.rs @@ -116,7 +116,8 @@ impl Module for HeartbeatModule { pool: &mut ScheduledThreadPool, ) -> SnekcloudResult<()> { if self.last_tick.elapsed() > self.settings.interval() { - for node in context.nodes() { + log::trace!("Sending heartbeat..."); + for node in context.living_nodes() { let mut future = context.emit( node.id.clone(), Event::with_payload( @@ -128,8 +129,7 @@ impl Module for HeartbeatModule { pool.execute(move || { if let Some(Err(e)) = future.get_value_with_timeout(Duration::from_secs(10)) { log::debug!("Node {} is not reachable: {}", node.id, e); - let mut states = states.lock(); - Self::insert_state(&mut states, node.id, NodeInfo::dead()); + Self::insert_state(&mut states.lock(), node.id, NodeInfo::dead()); } }); } diff --git a/src/modules/nodes_refresh/mod.rs b/src/modules/nodes_refresh/mod.rs index ab0c7d8..04677de 100644 --- a/src/modules/nodes_refresh/mod.rs +++ b/src/modules/nodes_refresh/mod.rs @@ -11,11 +11,11 @@ use std::path::PathBuf; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::{Duration, Instant, UNIX_EPOCH}; -use vented::crypto::PublicKey; use vented::event::Event; use vented::server::data::Node; use vented::server::server_events::{NodeListPayload, NODE_LIST_REQUEST_EVENT}; use vented::server::VentedServer; +use vented::stream::PublicKey; pub mod settings; @@ -58,7 +58,7 @@ impl Module for NodesRefreshModule { id: node.id, trusted: false, public_key: PublicKey::from(node.public_key), - address: node.address, + addresses: node.addresses, }, ); new_nodes = true; @@ -83,11 +83,7 @@ impl Module for NodesRefreshModule { .values() .cloned() .map(|node| { - if let Some(address) = node.address { - NodeData::with_addresses(node.id, vec![address], node.public_key) - } else { - NodeData::new(node.id, node.public_key) - } + NodeData::with_addresses(node.id, node.addresses, node.public_key) }) .for_each(|data| { let mut path = nodes_folder.clone(); @@ -115,7 +111,7 @@ impl Module for NodesRefreshModule { ) -> SnekcloudResult<()> { if self.last_request.elapsed() > self.settings.update_interval() { context - .nodes() + .living_nodes() .iter() .filter(|node| node.trusted) .for_each(|node| { diff --git a/src/server/mod.rs b/src/server/mod.rs index 8e27148..c097427 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -3,14 +3,16 @@ use crate::server::tick_context::TickContext; use crate::utils::result::{SnekcloudError, SnekcloudResult}; use parking_lot::Mutex; use scheduled_thread_pool::ScheduledThreadPool; +use std::cmp::max; use std::collections::HashMap; use std::mem; use std::sync::mpsc::channel; use std::sync::Arc; use std::time::Duration; -use vented::crypto::SecretKey; use vented::server::data::Node; use vented::server::VentedServer; +use vented::stream::SecretKey; +use vented::utils::result::VentedError; use vented::WaitGroup; pub mod tick_context; @@ -21,21 +23,19 @@ pub struct SnekcloudServer { inner: VentedServer, listen_addresses: Vec, listeners: Vec, - module_pool: Arc>, + module_pool: HashMap>>, modules: HashMap>, } impl SnekcloudServer { /// Creates a new snekcloud server with the provided keys and number of threads pub fn new(id: String, private_key: SecretKey, keys: Vec, num_threads: usize) -> Self { + let num_threads = max(num_threads, keys.len()); Self { - inner: VentedServer::new(id, private_key, keys, num_threads), + inner: VentedServer::new(id, private_key, keys, num_threads * 2, num_threads * 10), listen_addresses: Vec::new(), listeners: Vec::new(), - module_pool: Arc::new(Mutex::new(ScheduledThreadPool::with_name( - "modules", - num_threads, - ))), + module_pool: HashMap::new(), modules: HashMap::new(), } } @@ -52,36 +52,50 @@ impl SnekcloudServer { } let modules = mem::take(&mut self.modules); - let module_pool = Arc::clone(&self.module_pool); let (tx, rx) = channel(); let tick_context = TickContext::new(self.inner.node_id(), tx, self.inner.nodes_ref()); + let node_count = self.inner.nodes().len(); for (name, mut module) in modules { - module_pool.lock().execute_at_fixed_rate( - Duration::from_millis(SERVER_TICK_RATE_MS), - Duration::from_millis(SERVER_TICK_RATE_MS), - { - let module_pool = Arc::clone(&module_pool); - let tick_context = TickContext::clone(&tick_context); - move || { - let mut module_pool = module_pool.lock(); - - if let Err(e) = - module.tick(TickContext::clone(&tick_context), &mut module_pool) - { - log::error!("Error when ticking module {}: {}", name, e); + self.module_pool + .get(&name) + .unwrap() + .lock() + .execute_at_fixed_rate( + Duration::from_millis(SERVER_TICK_RATE_MS), + Duration::from_millis(SERVER_TICK_RATE_MS), + { + let mut module_pool = ScheduledThreadPool::new(1); + let tick_context = TickContext::clone(&tick_context); + move || { + if let Err(e) = + module.tick(TickContext::clone(&tick_context), &mut module_pool) + { + log::error!("Error when ticking module {}: {}", name, e); + } } - } - }, - ); + }, + ); } + let invocation_pool = ScheduledThreadPool::new(node_count); for invocation in rx { - let mut future = self.inner.emit(invocation.target_node, invocation.event); + let mut future = self + .inner + .emit(invocation.target_node.clone(), invocation.event); let mut invocation_result = invocation.result; + let node_id = invocation.target_node; + + invocation_pool.execute(move || { + let result = future.get_value_with_timeout(Duration::from_secs(60)); - module_pool.lock().execute(move || { - let result = future.get_value(); - invocation_result.result(result.map_err(SnekcloudError::from)); + if let Some(result) = result { + invocation_result.result(result.map_err(SnekcloudError::from)); + } else { + log::error!("Failed to send event: Timeout after 5s"); + invocation_result.reject(SnekcloudError::Vented(VentedError::UnreachableNode( + node_id, + ))); + } }); } @@ -93,9 +107,10 @@ impl SnekcloudServer { &mut self, mut module: impl Module + Send + Sync, ) -> SnekcloudResult<()> { - let mut module_pool = self.module_pool.lock(); + let module_pool = Arc::new(Mutex::new(ScheduledThreadPool::new(2))); - module.init(&mut self.inner, &mut module_pool)?; + module.init(&mut self.inner, &mut module_pool.lock())?; + self.module_pool.insert(module.name(), module_pool); self.modules.insert(module.name(), module.boxed()); Ok(()) diff --git a/src/server/tick_context.rs b/src/server/tick_context.rs index af3076f..c4b7e8a 100644 --- a/src/server/tick_context.rs +++ b/src/server/tick_context.rs @@ -4,12 +4,12 @@ use std::collections::HashMap; use std::sync::mpsc::Sender; use std::sync::Arc; use vented::event::Event; -use vented::server::data::Node; +use vented::server::data::{Node, NodeData}; use vented::utils::sync::AsyncValue; #[derive(Clone)] pub struct TickContext { - nodes: Arc>>, + nodes: Arc>>, event_sender: Sender, node_id: String, } @@ -24,7 +24,7 @@ impl TickContext { pub fn new( node_id: String, sender: Sender, - nodes: Arc>>, + nodes: Arc>>, ) -> Self { Self { nodes, @@ -51,8 +51,33 @@ impl TickContext { } /// Returns a copy of the nodes of the server + #[allow(dead_code)] pub fn nodes(&self) -> Vec { - self.nodes.lock().values().cloned().collect() + self.nodes + .lock() + .values() + .cloned() + .map(Node::from) + .collect() + } + + pub fn living_nodes(&self) -> Vec { + self.nodes + .lock() + .values() + .cloned() + .filter(|node| !node.is_dead()) + .map(Node::from) + .collect() + } + + #[allow(dead_code)] + pub fn check_alive(&self, node_id: &String) -> bool { + if let Some(node) = self.nodes.lock().get(node_id) { + !node.is_dead() + } else { + false + } } /// Returns the node diff --git a/src/utils/keys.rs b/src/utils/keys.rs index e58b39b..32c6b14 100644 --- a/src/utils/keys.rs +++ b/src/utils/keys.rs @@ -3,8 +3,8 @@ use crate::utils::result::{SnekcloudError, SnekcloudResult}; use crate::utils::settings::get_settings; use std::fs::create_dir; use std::path::{Path, PathBuf}; -use vented::crypto::{PublicKey, SecretKey}; use vented::server::data::Node; +use vented::stream::{PublicKey, SecretKey}; const PRIVATE_KEY_HEADER_LINE: &str = "---BEGIN-SNEKCLOUD-PRIVATE-KEY---\n"; const PRIVATE_KEY_FOOTER_LINE: &str = "\n---END-SNEKCLOUD-PRIVATE-KEY---"; @@ -21,11 +21,11 @@ pub fn read_node_keys(path: &PathBuf) -> SnekcloudResult> { let content = glob::glob(format!("{}/*.toml", path.to_string_lossy()).as_str())? .filter_map(|path| { - let mut data = NodeData::from_file(path.ok()?).ok()?; + let data = NodeData::from_file(path.ok()?).ok()?; Some(Node { public_key: data.public_key(), - address: data.addresses.pop(), + addresses: data.addresses, trusted: trusted_nodes.contains(&data.id), id: data.id, })