From 32ae74fa34ae1b466a05a92e1c6b2917c172322f Mon Sep 17 00:00:00 2001 From: trivernis Date: Mon, 9 Nov 2020 12:42:48 +0100 Subject: [PATCH] Update vented to async emitter version Signed-off-by: trivernis --- Cargo.lock | 158 ++++++++++++++++++++++++++++++- Cargo.toml | 2 +- src/modules/heartbeat/mod.rs | 13 +-- src/modules/mod.rs | 16 ++-- src/modules/nodes_refresh/mod.rs | 100 +++++++++++-------- src/server/mod.rs | 15 +-- src/server/tick_context.rs | 39 +++++--- src/utils/result.rs | 9 +- 8 files changed, 266 insertions(+), 86 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5e75c76..7e547f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -33,12 +33,39 @@ dependencies = [ "winapi", ] +[[package]] +name = "arr_macro" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a105bfda48707cf19220129e78fca01e9639433ffaef4163546ed8fb04120a5" +dependencies = [ + "arr_macro_impl", + "proc-macro-hack", +] + +[[package]] +name = "arr_macro_impl" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0609c78bd572f4edc74310dfb63a01f5609d53fa8b4dd7c4d98aef3b3e8d72d1" +dependencies = [ + "proc-macro-hack", + "quote", + "syn", +] + [[package]] name = "arrayvec" version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" +[[package]] +name = "async-task" +version = "4.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e91831deabf0d6d7ec49552e489aed63b7456a7a3c46cff62adad428110b0af0" + [[package]] name = "atty" version = "0.2.14" @@ -209,6 +236,72 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8aebca1129a03dc6dc2b127edd729435bbc4a37e1d5f4d7513165089ceb02634" +[[package]] +name = "crossbeam-channel" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b153fe7cbef478c567df0f972e02e6d736db11affe43dfc9c56a9374d1adfb87" +dependencies = [ + "crossbeam-utils 0.7.2", + "maybe-uninit", +] + +[[package]] +name = "crossbeam-deque" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f02af974daeee82218205558e51ec8768b48cf524bd01d550abe5573a608285" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils 0.7.2", + "maybe-uninit", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "058ed274caafc1f60c4997b5fc07bf7dc7cca454af7c6e81edffe5f33f70dace" +dependencies = [ + "autocfg", + "cfg-if 0.1.10", + "crossbeam-utils 0.7.2", + "lazy_static", + "maybe-uninit", + "memoffset", + "scopeguard", +] + +[[package]] +name = "crossbeam-queue" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c979cd6cfe72335896575c6b5688da489e420d36a27a0b9eb0c73db574b4a4b" +dependencies = [ + "crossbeam-utils 0.6.6", +] + +[[package]] +name = "crossbeam-utils" +version = "0.6.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04973fa96e96579258a5091af6003abde64af786b860f18622b82e026cca60e6" +dependencies = [ + "cfg-if 0.1.10", + "lazy_static", +] + +[[package]] +name = "crossbeam-utils" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8" +dependencies = [ + "autocfg", + "cfg-if 0.1.10", + "lazy_static", +] + [[package]] name = "crossbeam-utils" version = "0.8.0" @@ -270,6 +363,24 @@ dependencies = [ "termcolor", ] +[[package]] +name = "executors" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f99e7b1533b6caa2e16120bfc652aeb087c9197b1bf419edfc8587e6022f2fc9" +dependencies = [ + "arr_macro", + "async-task", + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-utils 0.7.2", + "log", + "num_cpus", + "rand", + "synchronoise", + "threadpool", +] + [[package]] name = "fallible-iterator" version = "0.2.0" @@ -462,12 +573,27 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4" +[[package]] +name = "maybe-uninit" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00" + [[package]] name = "memchr" version = "2.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ee1c47aaa256ecabcaea351eae4a9b01ef39ed810004e298d2511ed284b1525" +[[package]] +name = "memoffset" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "043175f069eda7b85febe4a74abbaeff828d9f8b448515d3151a14a3542811aa" +dependencies = [ + "autocfg", +] + [[package]] name = "nix" version = "0.19.0" @@ -606,6 +732,12 @@ dependencies = [ "version_check", ] +[[package]] +name = "proc-macro-hack" +version = "0.5.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5" + [[package]] name = "proc-macro2" version = "1.0.24" @@ -919,6 +1051,15 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "synchronoise" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d717ed0efc9d39ab3b642a096bc369a3e02a38a51c41845d7fe31bdad1d6eaeb" +dependencies = [ + "crossbeam-queue", +] + [[package]] name = "synstructure" version = "0.12.4" @@ -958,6 +1099,15 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "threadpool" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa" +dependencies = [ + "num_cpus", +] + [[package]] name = "time" version = "0.1.44" @@ -1026,20 +1176,20 @@ checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191" [[package]] name = "vented" -version = "0.8.1" +version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0bb03c973ba228728f10ab9480ca5b74d8dc3bd57d2da29476efa0d0332b4688" +checksum = "8c9e9ae15109d0c517d65788a39e290e7edfcc5200f4aa92cbc174ad262aaba2" dependencies = [ "byteorder", - "crossbeam-utils", + "crossbeam-utils 0.8.0", "crypto_box", + "executors", "generic-array", "log", "parking_lot", "rand", "rmp", "rmp-serde", - "scheduled-thread-pool", "serde 1.0.117", "sha2", "typenum", diff --git a/Cargo.toml b/Cargo.toml index 2cc5985..afcd67f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,7 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -vented = "0.8.1" +vented = "0.9.1" rusqlite = "0.24.1" rand = "0.7.3" base64 = "0.13.0" diff --git a/src/modules/heartbeat/mod.rs b/src/modules/heartbeat/mod.rs index e6f53e0..46ac9c0 100644 --- a/src/modules/heartbeat/mod.rs +++ b/src/modules/heartbeat/mod.rs @@ -12,7 +12,6 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::{Duration, Instant}; use vented::event::Event; -use vented::result::VentedResult; use vented::server::VentedServer; mod payloads; @@ -115,7 +114,7 @@ impl Module for HeartbeatModule { &mut self, mut context: TickContext, pool: &mut ScheduledThreadPool, - ) -> VentedResult<()> { + ) -> SnekcloudResult<()> { if self.last_tick.elapsed() > self.settings.interval() { for node in context.nodes() { let mut future = context.emit( @@ -127,12 +126,10 @@ impl Module for HeartbeatModule { ); let states = Arc::clone(&self.node_states); pool.execute(move || { - if let Some(value) = future.get_value_with_timeout(Duration::from_secs(10)) { - if let Err(e) = &*value { - log::debug!("Node {} is not reachable: {}", node.id, e); - let mut states = states.lock(); - Self::insert_state(&mut states, node.id, NodeInfo::dead()); - } + if let Some(Err(e)) = future.get_value_with_timeout(Duration::from_secs(10)) { + log::debug!("Node {} is not reachable: {}", node.id, e); + let mut states = states.lock(); + Self::insert_state(&mut states, node.id, NodeInfo::dead()); } }); } diff --git a/src/modules/mod.rs b/src/modules/mod.rs index a41d1a2..3502b9f 100644 --- a/src/modules/mod.rs +++ b/src/modules/mod.rs @@ -1,15 +1,19 @@ -use vented::server::VentedServer; +use crate::server::tick_context::TickContext; use crate::utils::result::SnekcloudResult; use scheduled_thread_pool::ScheduledThreadPool; -use vented::result::VentedResult; -use crate::server::tick_context::TickContext; +use vented::server::VentedServer; pub mod heartbeat; pub mod nodes_refresh; pub trait Module { fn name(&self) -> String; - fn init(&mut self, server: &mut VentedServer, pool: &mut ScheduledThreadPool) -> SnekcloudResult<()>; + fn init( + &mut self, + server: &mut VentedServer, + pool: &mut ScheduledThreadPool, + ) -> SnekcloudResult<()>; fn boxed(self) -> Box; - fn tick(&mut self, context: TickContext, pool: &mut ScheduledThreadPool) -> VentedResult<()>; -} \ No newline at end of file + fn tick(&mut self, context: TickContext, pool: &mut ScheduledThreadPool) + -> SnekcloudResult<()>; +} diff --git a/src/modules/nodes_refresh/mod.rs b/src/modules/nodes_refresh/mod.rs index f023937..ab0c7d8 100644 --- a/src/modules/nodes_refresh/mod.rs +++ b/src/modules/nodes_refresh/mod.rs @@ -1,24 +1,23 @@ +use crate::data::node_data::NodeData; +use crate::modules::nodes_refresh::settings::NodesRefreshSettings; use crate::modules::Module; -use vented::result::VentedResult; -use vented::server::VentedServer; +use crate::server::tick_context::TickContext; use crate::utils::result::SnekcloudResult; -use scheduled_thread_pool::ScheduledThreadPool; -use vented::server::server_events::{NodeListPayload, NODE_LIST_REQUEST_EVENT}; -use vented::server::data::Node; -use std::sync::Arc; +use crate::utils::settings::get_settings; use parking_lot::Mutex; -use std::collections::{HashMap}; +use scheduled_thread_pool::ScheduledThreadPool; +use std::collections::HashMap; +use std::path::PathBuf; use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::time::{Duration, Instant, UNIX_EPOCH}; use vented::crypto::PublicKey; -use std::time::{Instant, Duration, UNIX_EPOCH}; -use crate::utils::settings::get_settings; -use crate::data::node_data::NodeData; -use std::path::PathBuf; -use crate::modules::nodes_refresh::settings::NodesRefreshSettings; -use crate::server::tick_context::TickContext; use vented::event::Event; +use vented::server::data::Node; +use vented::server::server_events::{NodeListPayload, NODE_LIST_REQUEST_EVENT}; +use vented::server::VentedServer; -pub mod settings; +pub mod settings; pub struct NodesRefreshModule { nodes: Arc>>, @@ -28,12 +27,15 @@ pub struct NodesRefreshModule { } impl Module for NodesRefreshModule { - fn name(&self) -> String { "node_list_refresh".to_string() } - fn init(&mut self, server: &mut VentedServer, pool: &mut ScheduledThreadPool) -> SnekcloudResult<()> { + fn init( + &mut self, + server: &mut VentedServer, + pool: &mut ScheduledThreadPool, + ) -> SnekcloudResult<()> { { let mut node_list = self.nodes.lock(); for node in server.nodes() { @@ -50,12 +52,15 @@ impl Module for NodesRefreshModule { for node in event.get_payload::().ok()?.nodes { if !nodes.contains_key(&node.id) { - nodes.insert(node.id.clone(), Node { - id: node.id, - trusted: false, - public_key: PublicKey::from(node.public_key), - address: node.address, - }); + nodes.insert( + node.id.clone(), + Node { + id: node.id, + trusted: false, + public_key: PublicKey::from(node.public_key), + address: node.address, + }, + ); new_nodes = true; } } @@ -73,20 +78,25 @@ impl Module for NodesRefreshModule { move || { if update_required.load(Ordering::Relaxed) { let nodes_folder = get_settings().node_data_dir; - nodes.lock().values().cloned().map(|node| { - if let Some(address) = node.address { - NodeData::with_addresses(node.id, vec![address], node.public_key) - } else { - NodeData::new(node.id, node.public_key) - } - }).for_each(|data| { - let mut path = nodes_folder.clone(); - path.push(PathBuf::from(format!("{}.toml", data.id))); + nodes + .lock() + .values() + .cloned() + .map(|node| { + if let Some(address) = node.address { + NodeData::with_addresses(node.id, vec![address], node.public_key) + } else { + NodeData::new(node.id, node.public_key) + } + }) + .for_each(|data| { + let mut path = nodes_folder.clone(); + path.push(PathBuf::from(format!("{}.toml", data.id))); - if let Err(e) = data.write_to_file(path) { - log::error!("Failed to write updated node data: {}", e); - } - }); + if let Err(e) = data.write_to_file(path) { + log::error!("Failed to write updated node data: {}", e); + } + }); } } }); @@ -98,11 +108,19 @@ impl Module for NodesRefreshModule { Box::new(self) } - fn tick(&mut self, mut context: TickContext, _: &mut ScheduledThreadPool) -> VentedResult<()> { + fn tick( + &mut self, + mut context: TickContext, + _: &mut ScheduledThreadPool, + ) -> SnekcloudResult<()> { if self.last_request.elapsed() > self.settings.update_interval() { - context.nodes().iter().filter(|node| node.trusted).for_each(|node| { - context.emit(node.id.clone(), Event::new(NODE_LIST_REQUEST_EVENT)); - }); + context + .nodes() + .iter() + .filter(|node| node.trusted) + .for_each(|node| { + context.emit(node.id.clone(), Event::new(NODE_LIST_REQUEST_EVENT)); + }); self.last_request = Instant::now(); } @@ -117,7 +135,7 @@ impl NodesRefreshModule { nodes: Arc::new(Mutex::new(HashMap::new())), settings: get_settings().modules.nodes_refresh, last_request: null_time, - update_required: Arc::new(AtomicBool::new(false)) + update_required: Arc::new(AtomicBool::new(false)), } } -} \ No newline at end of file +} diff --git a/src/server/mod.rs b/src/server/mod.rs index a2b55e6..8e27148 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -75,13 +75,14 @@ impl SnekcloudServer { }, ); } - for mut invocation in rx { - match self.inner.emit(invocation.target_node, invocation.event) { - Ok(_) => invocation.result.set_value(Arc::new(Ok(()))), - Err(e) => invocation - .result - .set_value(Arc::new(Err(SnekcloudError::from(e)))), - } + for invocation in rx { + let mut future = self.inner.emit(invocation.target_node, invocation.event); + let mut invocation_result = invocation.result; + + module_pool.lock().execute(move || { + let result = future.get_value(); + invocation_result.result(result.map_err(SnekcloudError::from)); + }); } Ok(()) diff --git a/src/server/tick_context.rs b/src/server/tick_context.rs index af4db83..af3076f 100644 --- a/src/server/tick_context.rs +++ b/src/server/tick_context.rs @@ -1,10 +1,11 @@ -use std::sync::mpsc::Sender; -use vented::event::Event; -use std::sync::Arc; -use crate::utils::result::SnekcloudResult; -use vented::server::data::{AsyncValue, Node}; +use crate::utils::result::SnekcloudError; use parking_lot::Mutex; use std::collections::HashMap; +use std::sync::mpsc::Sender; +use std::sync::Arc; +use vented::event::Event; +use vented::server::data::Node; +use vented::utils::sync::AsyncValue; #[derive(Clone)] pub struct TickContext { @@ -14,13 +15,17 @@ pub struct TickContext { } pub struct EventInvocation { - pub result: AsyncValue>>, + pub result: AsyncValue<(), SnekcloudError>, pub event: Event, pub target_node: String, } impl TickContext { - pub fn new(node_id: String, sender: Sender, nodes: Arc>>) -> Self { + pub fn new( + node_id: String, + sender: Sender, + nodes: Arc>>, + ) -> Self { Self { nodes, node_id, @@ -28,13 +33,19 @@ impl TickContext { } } - pub fn emit(&mut self, target_node: S, event: Event) -> AsyncValue>> { + pub fn emit( + &mut self, + target_node: S, + event: Event, + ) -> AsyncValue<(), SnekcloudError> { let value = AsyncValue::new(); - self.event_sender.send(EventInvocation { - event, - target_node: target_node.to_string(), - result: AsyncValue::clone(&value), - }).unwrap(); + self.event_sender + .send(EventInvocation { + event, + target_node: target_node.to_string(), + result: AsyncValue::clone(&value), + }) + .unwrap(); value } @@ -48,4 +59,4 @@ impl TickContext { pub fn node_id(&self) -> &String { &self.node_id } -} \ No newline at end of file +} diff --git a/src/utils/result.rs b/src/utils/result.rs index 435d35b..7dc9a57 100644 --- a/src/utils/result.rs +++ b/src/utils/result.rs @@ -1,8 +1,7 @@ -use vented::result::VentedError; +use std::error::Error; use std::fmt; use std::io; -use std::error::Error; - +use vented::utils::result::VentedError; pub type SnekcloudResult = Result; @@ -28,7 +27,7 @@ impl fmt::Display for SnekcloudError { Self::InvalidKey => write!(f, "Invalid Key!"), Self::TomlDeserializeError(e) => write!(f, "Toml Deserialization Error: {}", e), Self::TomlSerializeError(e) => write!(f, "Toml Serialization Error: {}", e), - Self::ConfigError(e) => write!(f, "Config Error: {}",e), + Self::ConfigError(e) => write!(f, "Config Error: {}", e), Self::GlobPatternError(e) => write!(f, "Glob Error {}", e), Self::JsonError(e) => write!(f, "JSON Error: {}", e), } @@ -83,4 +82,4 @@ impl From for SnekcloudError { fn from(error: serde_json::error::Error) -> Self { Self::JsonError(error) } -} \ No newline at end of file +}