From c3710fa60b87f415cb30c7ffc5cac057992c2bdc Mon Sep 17 00:00:00 2001 From: trivernis Date: Thu, 12 Nov 2020 13:12:44 +0100 Subject: [PATCH] Update vented to async version 0.11 Signed-off-by: trivernis --- Cargo.lock | 391 +++++++++++++++++++++++++++++-- Cargo.toml | 5 +- src/main.rs | 7 +- src/modules/heartbeat/mod.rs | 27 ++- src/modules/nodes_refresh/mod.rs | 42 ++-- src/server/mod.rs | 60 ++--- src/utils/settings.rs | 5 - 7 files changed, 440 insertions(+), 97 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 98345e4..201a661 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -39,6 +39,112 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" +[[package]] +name = "async-channel" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59740d83946db6a5af71ae25ddf9562c2b176b2ca42cf99a455f09f4a220d6b9" +dependencies = [ + "concurrent-queue", + "event-listener", + "futures-core", +] + +[[package]] +name = "async-executor" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb877970c7b440ead138f6321a3b5395d6061183af779340b65e20c0fede9146" +dependencies = [ + "async-task", + "concurrent-queue", + "fastrand", + "futures-lite", + "once_cell", + "vec-arena", +] + +[[package]] +name = "async-global-executor" +version = "1.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73079b49cd26b8fd5a15f68fc7707fc78698dc2a3d61430f2a7a9430230dfa04" +dependencies = [ + "async-executor", + "async-io", + "futures-lite", + "num_cpus", + "once_cell", +] + +[[package]] +name = "async-io" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40a0b2bb8ae20fede194e779150fe283f65a4a08461b496de546ec366b174ad9" +dependencies = [ + "concurrent-queue", + "fastrand", + "futures-lite", + "libc", + "log", + "nb-connect", + "once_cell", + "parking", + "polling", + "vec-arena", + "waker-fn", + "winapi", +] + +[[package]] +name = "async-mutex" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "479db852db25d9dbf6204e6cb6253698f175c15726470f78af0d918e99d6156e" +dependencies = [ + "event-listener", +] + +[[package]] +name = "async-std" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7e82538bc65a25dbdff70e4c5439d52f068048ab97cdea0acd73f131594caa1" +dependencies = [ + "async-global-executor", + "async-io", + "async-mutex", + "blocking", + "crossbeam-utils", + "futures-channel", + "futures-core", + "futures-io", + "futures-lite", + "gloo-timers", + "kv-log-macro", + "log", + "memchr", + "num_cpus", + "once_cell", + "pin-project-lite", + "pin-utils", + "slab", + "wasm-bindgen-futures", +] + +[[package]] +name = "async-task" +version = "4.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e91831deabf0d6d7ec49552e489aed63b7456a7a3c46cff62adad428110b0af0" + +[[package]] +name = "atomic-waker" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "065374052e7df7ee4047b1160cca5e1467a12351a40b3da123c870ba0b8eda2a" + [[package]] name = "atty" version = "0.2.14" @@ -77,12 +183,38 @@ dependencies = [ "generic-array", ] +[[package]] +name = "blocking" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c5e170dbede1f740736619b776d7251cb1b9095c435c34d8ca9f57fcd2f335e9" +dependencies = [ + "async-channel", + "async-task", + "atomic-waker", + "fastrand", + "futures-lite", + "once_cell", +] + +[[package]] +name = "bumpalo" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e8c087f005730276d1096a652e92a8bacee2e2472bcc9715a74d2bec38b5820" + [[package]] name = "byteorder" version = "1.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08c48aae112d48ed9f069b33538ea9e3e90aa263cfa3d1c24309612b1f7472de" +[[package]] +name = "cache-padded" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "631ae5198c9be5e753e5cc215e1bd73c2b466a3565173db433f52bb9d3e66dba" + [[package]] name = "cc" version = "1.0.61" @@ -181,6 +313,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "concurrent-queue" +version = "1.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30ed07550be01594c6026cff2a1d7fe9c8f683caa798e12b68694ac9e88286a3" +dependencies = [ + "cache-padded", +] + [[package]] name = "config" version = "0.10.1" @@ -209,16 +350,6 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8aebca1129a03dc6dc2b127edd729435bbc4a37e1d5f4d7513165089ceb02634" -[[package]] -name = "crossbeam-channel" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dca26ee1f8d361640700bde38b2c37d8c22b3ce2d360e1fc1c74ea4b0aa7d775" -dependencies = [ - "cfg-if 1.0.0", - "crossbeam-utils", -] - [[package]] name = "crossbeam-utils" version = "0.8.0" @@ -267,6 +398,12 @@ dependencies = [ "generic-array", ] +[[package]] +name = "event-listener" +version = "2.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7531096570974c3a9dcf9e4b8e1cede1ec26cf5046219fb3b9d897503b9be59" + [[package]] name = "fallible-iterator" version = "0.2.0" @@ -279,6 +416,15 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" +[[package]] +name = "fastrand" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca5faf057445ce5c9d4329e382b2ce7ca38550ef3b73a5348362d5f24e0c7fe3" +dependencies = [ + "instant", +] + [[package]] name = "fern" version = "0.6.0" @@ -288,6 +434,42 @@ dependencies = [ "log", ] +[[package]] +name = "futures-channel" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b7109687aa4e177ef6fe84553af6280ef2778bdb7783ba44c9dc3399110fe64" +dependencies = [ + "futures-core", +] + +[[package]] +name = "futures-core" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "847ce131b72ffb13b6109a221da9ad97a64cbe48feb1028356b836b47b8f1748" + +[[package]] +name = "futures-io" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "611834ce18aaa1bd13c4b374f5d653e1027cf99b6b502584ff8c9a64413b30bb" + +[[package]] +name = "futures-lite" +version = "1.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e6c079abfac3ab269e2927ec048dabc89d009ebfdda6b8ee86624f30c689658" +dependencies = [ + "fastrand", + "futures-core", + "futures-io", + "memchr", + "parking", + "pin-project-lite", + "waker-fn", +] + [[package]] name = "generic-array" version = "0.14.4" @@ -315,6 +497,19 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574" +[[package]] +name = "gloo-timers" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47204a46aaff920a1ea58b11d03dec6f704287d27561724a4631e450654a891f" +dependencies = [ + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "hashbrown" version = "0.9.1" @@ -377,6 +572,24 @@ version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc6f3ad7b9d11a0c00842ff8de1b60ee58661048eb8049ed33c73594f359d7e6" +[[package]] +name = "js-sys" +version = "0.3.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca059e81d9486668f12d455a4ea6daa600bd408134cd17e3d3fb5a32d1f016f8" +dependencies = [ + "wasm-bindgen", +] + +[[package]] +name = "kv-log-macro" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f" +dependencies = [ + "log", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -468,6 +681,16 @@ version = "2.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ee1c47aaa256ecabcaea351eae4a9b01ef39ed810004e298d2511ed284b1525" +[[package]] +name = "nb-connect" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8123a81538e457d44b933a02faf885d3fe8408806b23fa700e8f01c6c3a98998" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "nix" version = "0.19.0" @@ -529,12 +752,24 @@ dependencies = [ "libc", ] +[[package]] +name = "once_cell" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f53cef67919d7d247eb9a2f128ca9e522789967ef1eb4ccd8c71a95a8aedf596" + [[package]] name = "opaque-debug" version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" +[[package]] +name = "parking" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72" + [[package]] name = "parking_lot" version = "0.11.0" @@ -561,12 +796,37 @@ dependencies = [ "winapi", ] +[[package]] +name = "pin-project-lite" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c917123afa01924fc84bb20c4c03f004d9c38e5127e3c039bbf7f4b9c76a2f6b" + +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + [[package]] name = "pkg-config" version = "0.3.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3831453b3449ceb48b6d9c7ad7c96d5ea673e9b470a1dc578c2ce6521230884c" +[[package]] +name = "polling" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2a7bc6b2a29e632e45451c941832803a18cce6781db04de8a04696cdca8bde4" +dependencies = [ + "cfg-if 0.1.10", + "libc", + "log", + "wepoll-sys", + "winapi", +] + [[package]] name = "poly1305" version = "0.6.1" @@ -834,6 +1094,12 @@ dependencies = [ "opaque-debug", ] +[[package]] +name = "slab" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8" + [[package]] name = "smallvec" version = "1.4.2" @@ -844,6 +1110,7 @@ checksum = "fbee7696b84bbf3d89a1c2eccff0850e3047ed46bfcd2e92c29a2d074d57e252" name = "snekcloud-server" version = "0.1.0" dependencies = [ + "async-std", "base64", "chrono", "colored", @@ -1010,6 +1277,12 @@ version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6454029bf181f092ad1b853286f23e2c507d8e8194d01d92da4a55c274a5508c" +[[package]] +name = "vec-arena" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eafc1b9b2dfc6f5529177b62cf806484db55b32dc7c9658a118e11bbeb33061d" + [[package]] name = "vec_map" version = "0.8.2" @@ -1018,12 +1291,12 @@ checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191" [[package]] name = "vented" -version = "0.10.5" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7465a4d13f2b81be52d2902753c7a47edd342e6b398a23df73109f32cdad59c9" +checksum = "33386edc15f14833ecb3b40f3ec75052684202ad892a916e8baf603d686ae543" dependencies = [ + "async-std", "byteorder", - "crossbeam-channel", "crossbeam-utils", "crypto_box", "generic-array", @@ -1032,7 +1305,6 @@ dependencies = [ "rand", "rmp", "rmp-serde", - "scheduled-thread-pool", "serde 1.0.117", "sha2", "typenum", @@ -1045,6 +1317,12 @@ version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5a972e5669d67ba988ce3dc826706fb0a8b01471c088cb0b6110b805cc36aed" +[[package]] +name = "waker-fn" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" + [[package]] name = "wasi" version = "0.9.0+wasi-snapshot-preview1" @@ -1057,6 +1335,91 @@ version = "0.10.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f" +[[package]] +name = "wasm-bindgen" +version = "0.2.68" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ac64ead5ea5f05873d7c12b545865ca2b8d28adfc50a49b84770a3a97265d42" +dependencies = [ + "cfg-if 0.1.10", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.68" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f22b422e2a757c35a73774860af8e112bff612ce6cb604224e8e47641a9e4f68" +dependencies = [ + "bumpalo", + "lazy_static", + "log", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7866cab0aa01de1edf8b5d7936938a7e397ee50ce24119aef3e1eaa3b6171da" +dependencies = [ + "cfg-if 0.1.10", + "js-sys", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.68" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b13312a745c08c469f0b292dd2fcd6411dba5f7160f593da6ef69b64e407038" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.68" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f249f06ef7ee334cc3b8ff031bfc11ec99d00f34d86da7498396dc1e3b1498fe" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.68" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d649a3145108d7d3fbcde896a468d1bd636791823c9921135218ad89be08307" + +[[package]] +name = "web-sys" +version = "0.3.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bf6ef87ad7ae8008e15a355ce696bed26012b7caa21605188cfd8214ab51e2d" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + +[[package]] +name = "wepoll-sys" +version = "3.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fcb14dea929042224824779fbc82d9fab8d2e6d3cbc0ac404de8edf489e77ff" +dependencies = [ + "cc", +] + [[package]] name = "winapi" version = "0.3.9" diff --git a/Cargo.toml b/Cargo.toml index c37167f..d7fd4ab 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ path = "src/main.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -vented = "0.10.5" +vented = "0.11.0" rusqlite = "0.24.1" rand = "0.7.3" base64 = "0.13.0" @@ -31,4 +31,5 @@ lazy_static = "1.4.0" parking_lot = "0.11.0" serde_json = "1.0.59" fern = "0.6.0" -regex = "1.4.2" \ No newline at end of file +regex = "1.4.2" +async-std = "1.7.0" \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index d102b27..4657918 100644 --- a/src/main.rs +++ b/src/main.rs @@ -107,12 +107,7 @@ fn start_server(_options: Opt, settings: &Settings) -> SnekcloudResult<()> { .join(PathBuf::from("local.toml")), )?; - let mut server = SnekcloudServer::new( - settings.node_id.clone(), - private_key, - keys, - settings.num_threads, - ); + let mut server = SnekcloudServer::new(settings.node_id.clone(), private_key, keys); for address in &settings.listen_addresses { server.add_listen_address(address.clone()); diff --git a/src/modules/heartbeat/mod.rs b/src/modules/heartbeat/mod.rs index 06c2296..e9b2a64 100644 --- a/src/modules/heartbeat/mod.rs +++ b/src/modules/heartbeat/mod.rs @@ -75,18 +75,21 @@ impl Module for HeartbeatModule { let node_states = Arc::clone(&self.node_states); move |event| { - let payload = event.get_payload::().unwrap(); - let latency = payload.get_beat_time().elapsed().ok()?.as_millis(); - log::debug!("Latency to node {} is {} ms", payload.node_id, latency); - - let mut states = node_states.lock(); - Self::insert_state( - &mut states, - payload.node_id, - NodeInfo::alive(latency as u64), - ); - - None + let node_states = Arc::clone(&node_states); + Box::pin(async move { + let payload = event.get_payload::().unwrap(); + let latency = payload.get_beat_time().elapsed().ok()?.as_millis(); + log::debug!("Latency to node {} is {} ms", payload.node_id, latency); + + let mut states = node_states.lock(); + Self::insert_state( + &mut states, + payload.node_id, + NodeInfo::alive(latency as u64), + ); + + None + }) } }); if let Some(output) = &self.settings.output_file { diff --git a/src/modules/nodes_refresh/mod.rs b/src/modules/nodes_refresh/mod.rs index 04677de..5854e2d 100644 --- a/src/modules/nodes_refresh/mod.rs +++ b/src/modules/nodes_refresh/mod.rs @@ -47,28 +47,32 @@ impl Module for NodesRefreshModule { let update_required = Arc::clone(&self.update_required); move |event| { - let mut nodes = nodes.lock(); - let mut new_nodes = false; + let nodes = Arc::clone(&nodes); + let update_required = Arc::clone(&update_required); + Box::pin(async move { + let mut nodes = nodes.lock(); + let mut new_nodes = false; - for node in event.get_payload::().ok()?.nodes { - if !nodes.contains_key(&node.id) { - nodes.insert( - node.id.clone(), - Node { - id: node.id, - trusted: false, - public_key: PublicKey::from(node.public_key), - addresses: node.addresses, - }, - ); - new_nodes = true; + for node in event.get_payload::().ok()?.nodes { + if !nodes.contains_key(&node.id) { + nodes.insert( + node.id.clone(), + Node { + id: node.id, + trusted: false, + public_key: PublicKey::from(node.public_key), + addresses: node.addresses, + }, + ); + new_nodes = true; + } } - } - if new_nodes { - update_required.store(true, Ordering::Relaxed) - } - None + if new_nodes { + update_required.store(true, Ordering::Relaxed) + } + None + }) } }); pool.execute_at_fixed_rate(Duration::from_secs(10), self.settings.update_interval(), { diff --git a/src/server/mod.rs b/src/server/mod.rs index a05e0da..33452ef 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -1,20 +1,19 @@ use crate::modules::Module; -use crate::server::tick_context::TickContext; +use crate::server::tick_context::{EventInvocation, TickContext}; use crate::utils::result::{SnekcloudError, SnekcloudResult}; use crate::utils::settings::get_settings; use parking_lot::Mutex; use scheduled_thread_pool::ScheduledThreadPool; -use std::cmp::max; + +use async_std::task; use std::collections::HashMap; use std::mem; -use std::sync::mpsc::channel; +use std::sync::mpsc::{channel, Receiver}; use std::sync::Arc; use std::time::Duration; use vented::server::data::Node; use vented::server::VentedServer; use vented::stream::SecretKey; -use vented::utils::result::VentedError; -use vented::WaitGroup; pub mod tick_context; @@ -23,26 +22,16 @@ const SERVER_TICK_RATE_MS: u64 = 10; pub struct SnekcloudServer { inner: VentedServer, listen_addresses: Vec, - listeners: Vec, module_pool: HashMap>>, modules: HashMap>, } impl SnekcloudServer { /// Creates a new snekcloud server with the provided keys and number of threads - pub fn new(id: String, private_key: SecretKey, keys: Vec, num_threads: usize) -> Self { - let num_threads = max(max(num_cpus::get(), num_threads), 4); + pub fn new(id: String, private_key: SecretKey, keys: Vec) -> Self { Self { - inner: VentedServer::new( - id, - private_key, - keys, - get_settings().timeouts(), - num_threads * 2, - num_threads * 10, - ), + inner: VentedServer::new(id, private_key, keys, get_settings().timeouts()), listen_addresses: Vec::new(), - listeners: Vec::new(), module_pool: HashMap::new(), modules: HashMap::new(), } @@ -56,13 +45,12 @@ impl SnekcloudServer { /// Starts listening on all addresses and runs the module tick loop pub fn run(&mut self) -> SnekcloudResult<()> { for address in &self.listen_addresses { - self.listeners.push(self.inner.listen(address.clone())) + self.inner.listen(address.clone()) } let modules = mem::take(&mut self.modules); let (tx, rx) = channel(); let tick_context = TickContext::new(self.inner.node_id(), tx, self.inner.nodes_ref()); - let node_count = self.inner.nodes().len(); for (name, mut module) in modules { self.module_pool @@ -85,31 +73,25 @@ impl SnekcloudServer { }, ); } - let invocation_pool = ScheduledThreadPool::new(node_count); - for invocation in rx { - let mut future = self - .inner - .emit(invocation.target_node.clone(), invocation.event); - let mut invocation_result = invocation.result; - let node_id = invocation.target_node; - invocation_pool.execute(move || { - let result = future.get_value_with_timeout(Duration::from_secs(60)); - - if let Some(result) = result { - invocation_result.result(result.map_err(SnekcloudError::from)); - } else { - log::error!("Failed to send event: Timeout after 5s"); - invocation_result.reject(SnekcloudError::Vented(VentedError::UnreachableNode( - node_id, - ))); - } - }); - } + task::block_on(self.handle_invocations(rx)); Ok(()) } + /// Handles invocations + async fn handle_invocations(&self, rx: Receiver) { + for mut invocation in rx { + let result = self + .inner + .emit(invocation.target_node.clone(), invocation.event) + .await; + invocation + .result + .result(result.map_err(SnekcloudError::from)); + } + } + /// Registers a module on the server pub fn register_module( &mut self, diff --git a/src/utils/settings.rs b/src/utils/settings.rs index 21426a7..883ce46 100644 --- a/src/utils/settings.rs +++ b/src/utils/settings.rs @@ -24,7 +24,6 @@ pub struct Settings { pub node_id: String, pub private_key: PathBuf, pub node_data_dir: PathBuf, - pub num_threads: usize, pub trusted_nodes: Vec, pub send_timeout_secs: u64, pub redirect_timeout_secs: u64, @@ -48,7 +47,6 @@ impl Default for Settings { node_data_dir: PathBuf::from("nodes"), log_folder: PathBuf::from("logs"), trusted_nodes: vec![], - num_threads: num_cpus::get(), send_timeout_secs: 5, redirect_timeout_secs: 20, modules: ModuleSettings::default(), @@ -88,9 +86,6 @@ impl ValidateSettings for Settings { if !validate_node_id(&self.node_id) { panic!(format!("Invalid NodeID {}", self.node_id)); } - if self.num_threads == 0 { - panic!("Thread number must be greater than 0") - } self.modules.validate(); } }