Update vented to async emitter version

Signed-off-by: trivernis <trivernis@protonmail.com>
pull/1/head
trivernis 4 years ago
parent 4783aa829d
commit 32ae74fa34
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

158
Cargo.lock generated

@ -33,12 +33,39 @@ dependencies = [
"winapi", "winapi",
] ]
[[package]]
name = "arr_macro"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a105bfda48707cf19220129e78fca01e9639433ffaef4163546ed8fb04120a5"
dependencies = [
"arr_macro_impl",
"proc-macro-hack",
]
[[package]]
name = "arr_macro_impl"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0609c78bd572f4edc74310dfb63a01f5609d53fa8b4dd7c4d98aef3b3e8d72d1"
dependencies = [
"proc-macro-hack",
"quote",
"syn",
]
[[package]] [[package]]
name = "arrayvec" name = "arrayvec"
version = "0.5.2" version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b"
[[package]]
name = "async-task"
version = "4.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e91831deabf0d6d7ec49552e489aed63b7456a7a3c46cff62adad428110b0af0"
[[package]] [[package]]
name = "atty" name = "atty"
version = "0.2.14" version = "0.2.14"
@ -209,6 +236,72 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8aebca1129a03dc6dc2b127edd729435bbc4a37e1d5f4d7513165089ceb02634" checksum = "8aebca1129a03dc6dc2b127edd729435bbc4a37e1d5f4d7513165089ceb02634"
[[package]]
name = "crossbeam-channel"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b153fe7cbef478c567df0f972e02e6d736db11affe43dfc9c56a9374d1adfb87"
dependencies = [
"crossbeam-utils 0.7.2",
"maybe-uninit",
]
[[package]]
name = "crossbeam-deque"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f02af974daeee82218205558e51ec8768b48cf524bd01d550abe5573a608285"
dependencies = [
"crossbeam-epoch",
"crossbeam-utils 0.7.2",
"maybe-uninit",
]
[[package]]
name = "crossbeam-epoch"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "058ed274caafc1f60c4997b5fc07bf7dc7cca454af7c6e81edffe5f33f70dace"
dependencies = [
"autocfg",
"cfg-if 0.1.10",
"crossbeam-utils 0.7.2",
"lazy_static",
"maybe-uninit",
"memoffset",
"scopeguard",
]
[[package]]
name = "crossbeam-queue"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c979cd6cfe72335896575c6b5688da489e420d36a27a0b9eb0c73db574b4a4b"
dependencies = [
"crossbeam-utils 0.6.6",
]
[[package]]
name = "crossbeam-utils"
version = "0.6.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04973fa96e96579258a5091af6003abde64af786b860f18622b82e026cca60e6"
dependencies = [
"cfg-if 0.1.10",
"lazy_static",
]
[[package]]
name = "crossbeam-utils"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8"
dependencies = [
"autocfg",
"cfg-if 0.1.10",
"lazy_static",
]
[[package]] [[package]]
name = "crossbeam-utils" name = "crossbeam-utils"
version = "0.8.0" version = "0.8.0"
@ -270,6 +363,24 @@ dependencies = [
"termcolor", "termcolor",
] ]
[[package]]
name = "executors"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f99e7b1533b6caa2e16120bfc652aeb087c9197b1bf419edfc8587e6022f2fc9"
dependencies = [
"arr_macro",
"async-task",
"crossbeam-channel",
"crossbeam-deque",
"crossbeam-utils 0.7.2",
"log",
"num_cpus",
"rand",
"synchronoise",
"threadpool",
]
[[package]] [[package]]
name = "fallible-iterator" name = "fallible-iterator"
version = "0.2.0" version = "0.2.0"
@ -462,12 +573,27 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4" checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4"
[[package]]
name = "maybe-uninit"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00"
[[package]] [[package]]
name = "memchr" name = "memchr"
version = "2.3.4" version = "2.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ee1c47aaa256ecabcaea351eae4a9b01ef39ed810004e298d2511ed284b1525" checksum = "0ee1c47aaa256ecabcaea351eae4a9b01ef39ed810004e298d2511ed284b1525"
[[package]]
name = "memoffset"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "043175f069eda7b85febe4a74abbaeff828d9f8b448515d3151a14a3542811aa"
dependencies = [
"autocfg",
]
[[package]] [[package]]
name = "nix" name = "nix"
version = "0.19.0" version = "0.19.0"
@ -606,6 +732,12 @@ dependencies = [
"version_check", "version_check",
] ]
[[package]]
name = "proc-macro-hack"
version = "0.5.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5"
[[package]] [[package]]
name = "proc-macro2" name = "proc-macro2"
version = "1.0.24" version = "1.0.24"
@ -919,6 +1051,15 @@ dependencies = [
"unicode-xid", "unicode-xid",
] ]
[[package]]
name = "synchronoise"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d717ed0efc9d39ab3b642a096bc369a3e02a38a51c41845d7fe31bdad1d6eaeb"
dependencies = [
"crossbeam-queue",
]
[[package]] [[package]]
name = "synstructure" name = "synstructure"
version = "0.12.4" version = "0.12.4"
@ -958,6 +1099,15 @@ dependencies = [
"lazy_static", "lazy_static",
] ]
[[package]]
name = "threadpool"
version = "1.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa"
dependencies = [
"num_cpus",
]
[[package]] [[package]]
name = "time" name = "time"
version = "0.1.44" version = "0.1.44"
@ -1026,20 +1176,20 @@ checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191"
[[package]] [[package]]
name = "vented" name = "vented"
version = "0.8.1" version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0bb03c973ba228728f10ab9480ca5b74d8dc3bd57d2da29476efa0d0332b4688" checksum = "8c9e9ae15109d0c517d65788a39e290e7edfcc5200f4aa92cbc174ad262aaba2"
dependencies = [ dependencies = [
"byteorder", "byteorder",
"crossbeam-utils", "crossbeam-utils 0.8.0",
"crypto_box", "crypto_box",
"executors",
"generic-array", "generic-array",
"log", "log",
"parking_lot", "parking_lot",
"rand", "rand",
"rmp", "rmp",
"rmp-serde", "rmp-serde",
"scheduled-thread-pool",
"serde 1.0.117", "serde 1.0.117",
"sha2", "sha2",
"typenum", "typenum",

@ -7,7 +7,7 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
vented = "0.8.1" vented = "0.9.1"
rusqlite = "0.24.1" rusqlite = "0.24.1"
rand = "0.7.3" rand = "0.7.3"
base64 = "0.13.0" base64 = "0.13.0"

@ -12,7 +12,6 @@ use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use vented::event::Event; use vented::event::Event;
use vented::result::VentedResult;
use vented::server::VentedServer; use vented::server::VentedServer;
mod payloads; mod payloads;
@ -115,7 +114,7 @@ impl Module for HeartbeatModule {
&mut self, &mut self,
mut context: TickContext, mut context: TickContext,
pool: &mut ScheduledThreadPool, pool: &mut ScheduledThreadPool,
) -> VentedResult<()> { ) -> SnekcloudResult<()> {
if self.last_tick.elapsed() > self.settings.interval() { if self.last_tick.elapsed() > self.settings.interval() {
for node in context.nodes() { for node in context.nodes() {
let mut future = context.emit( let mut future = context.emit(
@ -127,12 +126,10 @@ impl Module for HeartbeatModule {
); );
let states = Arc::clone(&self.node_states); let states = Arc::clone(&self.node_states);
pool.execute(move || { pool.execute(move || {
if let Some(value) = future.get_value_with_timeout(Duration::from_secs(10)) { if let Some(Err(e)) = future.get_value_with_timeout(Duration::from_secs(10)) {
if let Err(e) = &*value { log::debug!("Node {} is not reachable: {}", node.id, e);
log::debug!("Node {} is not reachable: {}", node.id, e); let mut states = states.lock();
let mut states = states.lock(); Self::insert_state(&mut states, node.id, NodeInfo::dead());
Self::insert_state(&mut states, node.id, NodeInfo::dead());
}
} }
}); });
} }

@ -1,15 +1,19 @@
use vented::server::VentedServer; use crate::server::tick_context::TickContext;
use crate::utils::result::SnekcloudResult; use crate::utils::result::SnekcloudResult;
use scheduled_thread_pool::ScheduledThreadPool; use scheduled_thread_pool::ScheduledThreadPool;
use vented::result::VentedResult; use vented::server::VentedServer;
use crate::server::tick_context::TickContext;
pub mod heartbeat; pub mod heartbeat;
pub mod nodes_refresh; pub mod nodes_refresh;
pub trait Module { pub trait Module {
fn name(&self) -> String; fn name(&self) -> String;
fn init(&mut self, server: &mut VentedServer, pool: &mut ScheduledThreadPool) -> SnekcloudResult<()>; fn init(
&mut self,
server: &mut VentedServer,
pool: &mut ScheduledThreadPool,
) -> SnekcloudResult<()>;
fn boxed(self) -> Box<dyn Module + Send + Sync>; fn boxed(self) -> Box<dyn Module + Send + Sync>;
fn tick(&mut self, context: TickContext, pool: &mut ScheduledThreadPool) -> VentedResult<()>; fn tick(&mut self, context: TickContext, pool: &mut ScheduledThreadPool)
-> SnekcloudResult<()>;
} }

@ -1,24 +1,23 @@
use crate::data::node_data::NodeData;
use crate::modules::nodes_refresh::settings::NodesRefreshSettings;
use crate::modules::Module; use crate::modules::Module;
use vented::result::VentedResult; use crate::server::tick_context::TickContext;
use vented::server::VentedServer;
use crate::utils::result::SnekcloudResult; use crate::utils::result::SnekcloudResult;
use scheduled_thread_pool::ScheduledThreadPool; use crate::utils::settings::get_settings;
use vented::server::server_events::{NodeListPayload, NODE_LIST_REQUEST_EVENT};
use vented::server::data::Node;
use std::sync::Arc;
use parking_lot::Mutex; use parking_lot::Mutex;
use std::collections::{HashMap}; use scheduled_thread_pool::ScheduledThreadPool;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant, UNIX_EPOCH};
use vented::crypto::PublicKey; use vented::crypto::PublicKey;
use std::time::{Instant, Duration, UNIX_EPOCH};
use crate::utils::settings::get_settings;
use crate::data::node_data::NodeData;
use std::path::PathBuf;
use crate::modules::nodes_refresh::settings::NodesRefreshSettings;
use crate::server::tick_context::TickContext;
use vented::event::Event; use vented::event::Event;
use vented::server::data::Node;
use vented::server::server_events::{NodeListPayload, NODE_LIST_REQUEST_EVENT};
use vented::server::VentedServer;
pub mod settings; pub mod settings;
pub struct NodesRefreshModule { pub struct NodesRefreshModule {
nodes: Arc<Mutex<HashMap<String, Node>>>, nodes: Arc<Mutex<HashMap<String, Node>>>,
@ -28,12 +27,15 @@ pub struct NodesRefreshModule {
} }
impl Module for NodesRefreshModule { impl Module for NodesRefreshModule {
fn name(&self) -> String { fn name(&self) -> String {
"node_list_refresh".to_string() "node_list_refresh".to_string()
} }
fn init(&mut self, server: &mut VentedServer, pool: &mut ScheduledThreadPool) -> SnekcloudResult<()> { fn init(
&mut self,
server: &mut VentedServer,
pool: &mut ScheduledThreadPool,
) -> SnekcloudResult<()> {
{ {
let mut node_list = self.nodes.lock(); let mut node_list = self.nodes.lock();
for node in server.nodes() { for node in server.nodes() {
@ -50,12 +52,15 @@ impl Module for NodesRefreshModule {
for node in event.get_payload::<NodeListPayload>().ok()?.nodes { for node in event.get_payload::<NodeListPayload>().ok()?.nodes {
if !nodes.contains_key(&node.id) { if !nodes.contains_key(&node.id) {
nodes.insert(node.id.clone(), Node { nodes.insert(
id: node.id, node.id.clone(),
trusted: false, Node {
public_key: PublicKey::from(node.public_key), id: node.id,
address: node.address, trusted: false,
}); public_key: PublicKey::from(node.public_key),
address: node.address,
},
);
new_nodes = true; new_nodes = true;
} }
} }
@ -73,20 +78,25 @@ impl Module for NodesRefreshModule {
move || { move || {
if update_required.load(Ordering::Relaxed) { if update_required.load(Ordering::Relaxed) {
let nodes_folder = get_settings().node_data_dir; let nodes_folder = get_settings().node_data_dir;
nodes.lock().values().cloned().map(|node| { nodes
if let Some(address) = node.address { .lock()
NodeData::with_addresses(node.id, vec![address], node.public_key) .values()
} else { .cloned()
NodeData::new(node.id, node.public_key) .map(|node| {
} if let Some(address) = node.address {
}).for_each(|data| { NodeData::with_addresses(node.id, vec![address], node.public_key)
let mut path = nodes_folder.clone(); } else {
path.push(PathBuf::from(format!("{}.toml", data.id))); NodeData::new(node.id, node.public_key)
}
})
.for_each(|data| {
let mut path = nodes_folder.clone();
path.push(PathBuf::from(format!("{}.toml", data.id)));
if let Err(e) = data.write_to_file(path) { if let Err(e) = data.write_to_file(path) {
log::error!("Failed to write updated node data: {}", e); log::error!("Failed to write updated node data: {}", e);
} }
}); });
} }
} }
}); });
@ -98,11 +108,19 @@ impl Module for NodesRefreshModule {
Box::new(self) Box::new(self)
} }
fn tick(&mut self, mut context: TickContext, _: &mut ScheduledThreadPool) -> VentedResult<()> { fn tick(
&mut self,
mut context: TickContext,
_: &mut ScheduledThreadPool,
) -> SnekcloudResult<()> {
if self.last_request.elapsed() > self.settings.update_interval() { if self.last_request.elapsed() > self.settings.update_interval() {
context.nodes().iter().filter(|node| node.trusted).for_each(|node| { context
context.emit(node.id.clone(), Event::new(NODE_LIST_REQUEST_EVENT)); .nodes()
}); .iter()
.filter(|node| node.trusted)
.for_each(|node| {
context.emit(node.id.clone(), Event::new(NODE_LIST_REQUEST_EVENT));
});
self.last_request = Instant::now(); self.last_request = Instant::now();
} }
@ -117,7 +135,7 @@ impl NodesRefreshModule {
nodes: Arc::new(Mutex::new(HashMap::new())), nodes: Arc::new(Mutex::new(HashMap::new())),
settings: get_settings().modules.nodes_refresh, settings: get_settings().modules.nodes_refresh,
last_request: null_time, last_request: null_time,
update_required: Arc::new(AtomicBool::new(false)) update_required: Arc::new(AtomicBool::new(false)),
} }
} }
} }

@ -75,13 +75,14 @@ impl SnekcloudServer {
}, },
); );
} }
for mut invocation in rx { for invocation in rx {
match self.inner.emit(invocation.target_node, invocation.event) { let mut future = self.inner.emit(invocation.target_node, invocation.event);
Ok(_) => invocation.result.set_value(Arc::new(Ok(()))), let mut invocation_result = invocation.result;
Err(e) => invocation
.result module_pool.lock().execute(move || {
.set_value(Arc::new(Err(SnekcloudError::from(e)))), let result = future.get_value();
} invocation_result.result(result.map_err(SnekcloudError::from));
});
} }
Ok(()) Ok(())

@ -1,10 +1,11 @@
use std::sync::mpsc::Sender; use crate::utils::result::SnekcloudError;
use vented::event::Event;
use std::sync::Arc;
use crate::utils::result::SnekcloudResult;
use vented::server::data::{AsyncValue, Node};
use parking_lot::Mutex; use parking_lot::Mutex;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::mpsc::Sender;
use std::sync::Arc;
use vented::event::Event;
use vented::server::data::Node;
use vented::utils::sync::AsyncValue;
#[derive(Clone)] #[derive(Clone)]
pub struct TickContext { pub struct TickContext {
@ -14,13 +15,17 @@ pub struct TickContext {
} }
pub struct EventInvocation { pub struct EventInvocation {
pub result: AsyncValue<Arc<SnekcloudResult<()>>>, pub result: AsyncValue<(), SnekcloudError>,
pub event: Event, pub event: Event,
pub target_node: String, pub target_node: String,
} }
impl TickContext { impl TickContext {
pub fn new(node_id: String, sender: Sender<EventInvocation>, nodes: Arc<Mutex<HashMap<String, Node>>>) -> Self { pub fn new(
node_id: String,
sender: Sender<EventInvocation>,
nodes: Arc<Mutex<HashMap<String, Node>>>,
) -> Self {
Self { Self {
nodes, nodes,
node_id, node_id,
@ -28,13 +33,19 @@ impl TickContext {
} }
} }
pub fn emit<S: ToString>(&mut self, target_node: S, event: Event) -> AsyncValue<Arc<SnekcloudResult<()>>> { pub fn emit<S: ToString>(
&mut self,
target_node: S,
event: Event,
) -> AsyncValue<(), SnekcloudError> {
let value = AsyncValue::new(); let value = AsyncValue::new();
self.event_sender.send(EventInvocation { self.event_sender
event, .send(EventInvocation {
target_node: target_node.to_string(), event,
result: AsyncValue::clone(&value), target_node: target_node.to_string(),
}).unwrap(); result: AsyncValue::clone(&value),
})
.unwrap();
value value
} }

@ -1,8 +1,7 @@
use vented::result::VentedError; use std::error::Error;
use std::fmt; use std::fmt;
use std::io; use std::io;
use std::error::Error; use vented::utils::result::VentedError;
pub type SnekcloudResult<T> = Result<T, SnekcloudError>; pub type SnekcloudResult<T> = Result<T, SnekcloudError>;
@ -28,7 +27,7 @@ impl fmt::Display for SnekcloudError {
Self::InvalidKey => write!(f, "Invalid Key!"), Self::InvalidKey => write!(f, "Invalid Key!"),
Self::TomlDeserializeError(e) => write!(f, "Toml Deserialization Error: {}", e), Self::TomlDeserializeError(e) => write!(f, "Toml Deserialization Error: {}", e),
Self::TomlSerializeError(e) => write!(f, "Toml Serialization Error: {}", e), Self::TomlSerializeError(e) => write!(f, "Toml Serialization Error: {}", e),
Self::ConfigError(e) => write!(f, "Config Error: {}",e), Self::ConfigError(e) => write!(f, "Config Error: {}", e),
Self::GlobPatternError(e) => write!(f, "Glob Error {}", e), Self::GlobPatternError(e) => write!(f, "Glob Error {}", e),
Self::JsonError(e) => write!(f, "JSON Error: {}", e), Self::JsonError(e) => write!(f, "JSON Error: {}", e),
} }

Loading…
Cancel
Save