Update vented to async version 0.11

Signed-off-by: trivernis <trivernis@protonmail.com>
main
trivernis 4 years ago
parent efe6a9fce8
commit c3710fa60b
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

391
Cargo.lock generated

@ -39,6 +39,112 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b"
[[package]]
name = "async-channel"
version = "1.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59740d83946db6a5af71ae25ddf9562c2b176b2ca42cf99a455f09f4a220d6b9"
dependencies = [
"concurrent-queue",
"event-listener",
"futures-core",
]
[[package]]
name = "async-executor"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb877970c7b440ead138f6321a3b5395d6061183af779340b65e20c0fede9146"
dependencies = [
"async-task",
"concurrent-queue",
"fastrand",
"futures-lite",
"once_cell",
"vec-arena",
]
[[package]]
name = "async-global-executor"
version = "1.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73079b49cd26b8fd5a15f68fc7707fc78698dc2a3d61430f2a7a9430230dfa04"
dependencies = [
"async-executor",
"async-io",
"futures-lite",
"num_cpus",
"once_cell",
]
[[package]]
name = "async-io"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "40a0b2bb8ae20fede194e779150fe283f65a4a08461b496de546ec366b174ad9"
dependencies = [
"concurrent-queue",
"fastrand",
"futures-lite",
"libc",
"log",
"nb-connect",
"once_cell",
"parking",
"polling",
"vec-arena",
"waker-fn",
"winapi",
]
[[package]]
name = "async-mutex"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "479db852db25d9dbf6204e6cb6253698f175c15726470f78af0d918e99d6156e"
dependencies = [
"event-listener",
]
[[package]]
name = "async-std"
version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a7e82538bc65a25dbdff70e4c5439d52f068048ab97cdea0acd73f131594caa1"
dependencies = [
"async-global-executor",
"async-io",
"async-mutex",
"blocking",
"crossbeam-utils",
"futures-channel",
"futures-core",
"futures-io",
"futures-lite",
"gloo-timers",
"kv-log-macro",
"log",
"memchr",
"num_cpus",
"once_cell",
"pin-project-lite",
"pin-utils",
"slab",
"wasm-bindgen-futures",
]
[[package]]
name = "async-task"
version = "4.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e91831deabf0d6d7ec49552e489aed63b7456a7a3c46cff62adad428110b0af0"
[[package]]
name = "atomic-waker"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "065374052e7df7ee4047b1160cca5e1467a12351a40b3da123c870ba0b8eda2a"
[[package]] [[package]]
name = "atty" name = "atty"
version = "0.2.14" version = "0.2.14"
@ -77,12 +183,38 @@ dependencies = [
"generic-array", "generic-array",
] ]
[[package]]
name = "blocking"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c5e170dbede1f740736619b776d7251cb1b9095c435c34d8ca9f57fcd2f335e9"
dependencies = [
"async-channel",
"async-task",
"atomic-waker",
"fastrand",
"futures-lite",
"once_cell",
]
[[package]]
name = "bumpalo"
version = "3.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2e8c087f005730276d1096a652e92a8bacee2e2472bcc9715a74d2bec38b5820"
[[package]] [[package]]
name = "byteorder" name = "byteorder"
version = "1.3.4" version = "1.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08c48aae112d48ed9f069b33538ea9e3e90aa263cfa3d1c24309612b1f7472de" checksum = "08c48aae112d48ed9f069b33538ea9e3e90aa263cfa3d1c24309612b1f7472de"
[[package]]
name = "cache-padded"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "631ae5198c9be5e753e5cc215e1bd73c2b466a3565173db433f52bb9d3e66dba"
[[package]] [[package]]
name = "cc" name = "cc"
version = "1.0.61" version = "1.0.61"
@ -181,6 +313,15 @@ dependencies = [
"winapi", "winapi",
] ]
[[package]]
name = "concurrent-queue"
version = "1.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30ed07550be01594c6026cff2a1d7fe9c8f683caa798e12b68694ac9e88286a3"
dependencies = [
"cache-padded",
]
[[package]] [[package]]
name = "config" name = "config"
version = "0.10.1" version = "0.10.1"
@ -209,16 +350,6 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8aebca1129a03dc6dc2b127edd729435bbc4a37e1d5f4d7513165089ceb02634" checksum = "8aebca1129a03dc6dc2b127edd729435bbc4a37e1d5f4d7513165089ceb02634"
[[package]]
name = "crossbeam-channel"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dca26ee1f8d361640700bde38b2c37d8c22b3ce2d360e1fc1c74ea4b0aa7d775"
dependencies = [
"cfg-if 1.0.0",
"crossbeam-utils",
]
[[package]] [[package]]
name = "crossbeam-utils" name = "crossbeam-utils"
version = "0.8.0" version = "0.8.0"
@ -267,6 +398,12 @@ dependencies = [
"generic-array", "generic-array",
] ]
[[package]]
name = "event-listener"
version = "2.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7531096570974c3a9dcf9e4b8e1cede1ec26cf5046219fb3b9d897503b9be59"
[[package]] [[package]]
name = "fallible-iterator" name = "fallible-iterator"
version = "0.2.0" version = "0.2.0"
@ -279,6 +416,15 @@ version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a"
[[package]]
name = "fastrand"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca5faf057445ce5c9d4329e382b2ce7ca38550ef3b73a5348362d5f24e0c7fe3"
dependencies = [
"instant",
]
[[package]] [[package]]
name = "fern" name = "fern"
version = "0.6.0" version = "0.6.0"
@ -288,6 +434,42 @@ dependencies = [
"log", "log",
] ]
[[package]]
name = "futures-channel"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4b7109687aa4e177ef6fe84553af6280ef2778bdb7783ba44c9dc3399110fe64"
dependencies = [
"futures-core",
]
[[package]]
name = "futures-core"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "847ce131b72ffb13b6109a221da9ad97a64cbe48feb1028356b836b47b8f1748"
[[package]]
name = "futures-io"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "611834ce18aaa1bd13c4b374f5d653e1027cf99b6b502584ff8c9a64413b30bb"
[[package]]
name = "futures-lite"
version = "1.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e6c079abfac3ab269e2927ec048dabc89d009ebfdda6b8ee86624f30c689658"
dependencies = [
"fastrand",
"futures-core",
"futures-io",
"memchr",
"parking",
"pin-project-lite",
"waker-fn",
]
[[package]] [[package]]
name = "generic-array" name = "generic-array"
version = "0.14.4" version = "0.14.4"
@ -315,6 +497,19 @@ version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574" checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574"
[[package]]
name = "gloo-timers"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "47204a46aaff920a1ea58b11d03dec6f704287d27561724a4631e450654a891f"
dependencies = [
"futures-channel",
"futures-core",
"js-sys",
"wasm-bindgen",
"web-sys",
]
[[package]] [[package]]
name = "hashbrown" name = "hashbrown"
version = "0.9.1" version = "0.9.1"
@ -377,6 +572,24 @@ version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc6f3ad7b9d11a0c00842ff8de1b60ee58661048eb8049ed33c73594f359d7e6" checksum = "dc6f3ad7b9d11a0c00842ff8de1b60ee58661048eb8049ed33c73594f359d7e6"
[[package]]
name = "js-sys"
version = "0.3.45"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca059e81d9486668f12d455a4ea6daa600bd408134cd17e3d3fb5a32d1f016f8"
dependencies = [
"wasm-bindgen",
]
[[package]]
name = "kv-log-macro"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f"
dependencies = [
"log",
]
[[package]] [[package]]
name = "lazy_static" name = "lazy_static"
version = "1.4.0" version = "1.4.0"
@ -468,6 +681,16 @@ version = "2.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ee1c47aaa256ecabcaea351eae4a9b01ef39ed810004e298d2511ed284b1525" checksum = "0ee1c47aaa256ecabcaea351eae4a9b01ef39ed810004e298d2511ed284b1525"
[[package]]
name = "nb-connect"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8123a81538e457d44b933a02faf885d3fe8408806b23fa700e8f01c6c3a98998"
dependencies = [
"libc",
"winapi",
]
[[package]] [[package]]
name = "nix" name = "nix"
version = "0.19.0" version = "0.19.0"
@ -529,12 +752,24 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "once_cell"
version = "1.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f53cef67919d7d247eb9a2f128ca9e522789967ef1eb4ccd8c71a95a8aedf596"
[[package]] [[package]]
name = "opaque-debug" name = "opaque-debug"
version = "0.3.0" version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5"
[[package]]
name = "parking"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72"
[[package]] [[package]]
name = "parking_lot" name = "parking_lot"
version = "0.11.0" version = "0.11.0"
@ -561,12 +796,37 @@ dependencies = [
"winapi", "winapi",
] ]
[[package]]
name = "pin-project-lite"
version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c917123afa01924fc84bb20c4c03f004d9c38e5127e3c039bbf7f4b9c76a2f6b"
[[package]]
name = "pin-utils"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]] [[package]]
name = "pkg-config" name = "pkg-config"
version = "0.3.19" version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3831453b3449ceb48b6d9c7ad7c96d5ea673e9b470a1dc578c2ce6521230884c" checksum = "3831453b3449ceb48b6d9c7ad7c96d5ea673e9b470a1dc578c2ce6521230884c"
[[package]]
name = "polling"
version = "2.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2a7bc6b2a29e632e45451c941832803a18cce6781db04de8a04696cdca8bde4"
dependencies = [
"cfg-if 0.1.10",
"libc",
"log",
"wepoll-sys",
"winapi",
]
[[package]] [[package]]
name = "poly1305" name = "poly1305"
version = "0.6.1" version = "0.6.1"
@ -834,6 +1094,12 @@ dependencies = [
"opaque-debug", "opaque-debug",
] ]
[[package]]
name = "slab"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8"
[[package]] [[package]]
name = "smallvec" name = "smallvec"
version = "1.4.2" version = "1.4.2"
@ -844,6 +1110,7 @@ checksum = "fbee7696b84bbf3d89a1c2eccff0850e3047ed46bfcd2e92c29a2d074d57e252"
name = "snekcloud-server" name = "snekcloud-server"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"async-std",
"base64", "base64",
"chrono", "chrono",
"colored", "colored",
@ -1010,6 +1277,12 @@ version = "0.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6454029bf181f092ad1b853286f23e2c507d8e8194d01d92da4a55c274a5508c" checksum = "6454029bf181f092ad1b853286f23e2c507d8e8194d01d92da4a55c274a5508c"
[[package]]
name = "vec-arena"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eafc1b9b2dfc6f5529177b62cf806484db55b32dc7c9658a118e11bbeb33061d"
[[package]] [[package]]
name = "vec_map" name = "vec_map"
version = "0.8.2" version = "0.8.2"
@ -1018,12 +1291,12 @@ checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191"
[[package]] [[package]]
name = "vented" name = "vented"
version = "0.10.5" version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7465a4d13f2b81be52d2902753c7a47edd342e6b398a23df73109f32cdad59c9" checksum = "33386edc15f14833ecb3b40f3ec75052684202ad892a916e8baf603d686ae543"
dependencies = [ dependencies = [
"async-std",
"byteorder", "byteorder",
"crossbeam-channel",
"crossbeam-utils", "crossbeam-utils",
"crypto_box", "crypto_box",
"generic-array", "generic-array",
@ -1032,7 +1305,6 @@ dependencies = [
"rand", "rand",
"rmp", "rmp",
"rmp-serde", "rmp-serde",
"scheduled-thread-pool",
"serde 1.0.117", "serde 1.0.117",
"sha2", "sha2",
"typenum", "typenum",
@ -1045,6 +1317,12 @@ version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5a972e5669d67ba988ce3dc826706fb0a8b01471c088cb0b6110b805cc36aed" checksum = "b5a972e5669d67ba988ce3dc826706fb0a8b01471c088cb0b6110b805cc36aed"
[[package]]
name = "waker-fn"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca"
[[package]] [[package]]
name = "wasi" name = "wasi"
version = "0.9.0+wasi-snapshot-preview1" version = "0.9.0+wasi-snapshot-preview1"
@ -1057,6 +1335,91 @@ version = "0.10.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f" checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f"
[[package]]
name = "wasm-bindgen"
version = "0.2.68"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ac64ead5ea5f05873d7c12b545865ca2b8d28adfc50a49b84770a3a97265d42"
dependencies = [
"cfg-if 0.1.10",
"wasm-bindgen-macro",
]
[[package]]
name = "wasm-bindgen-backend"
version = "0.2.68"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f22b422e2a757c35a73774860af8e112bff612ce6cb604224e8e47641a9e4f68"
dependencies = [
"bumpalo",
"lazy_static",
"log",
"proc-macro2",
"quote",
"syn",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-futures"
version = "0.4.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b7866cab0aa01de1edf8b5d7936938a7e397ee50ce24119aef3e1eaa3b6171da"
dependencies = [
"cfg-if 0.1.10",
"js-sys",
"wasm-bindgen",
"web-sys",
]
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.68"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6b13312a745c08c469f0b292dd2fcd6411dba5f7160f593da6ef69b64e407038"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
]
[[package]]
name = "wasm-bindgen-macro-support"
version = "0.2.68"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f249f06ef7ee334cc3b8ff031bfc11ec99d00f34d86da7498396dc1e3b1498fe"
dependencies = [
"proc-macro2",
"quote",
"syn",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-shared"
version = "0.2.68"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d649a3145108d7d3fbcde896a468d1bd636791823c9921135218ad89be08307"
[[package]]
name = "web-sys"
version = "0.3.45"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4bf6ef87ad7ae8008e15a355ce696bed26012b7caa21605188cfd8214ab51e2d"
dependencies = [
"js-sys",
"wasm-bindgen",
]
[[package]]
name = "wepoll-sys"
version = "3.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fcb14dea929042224824779fbc82d9fab8d2e6d3cbc0ac404de8edf489e77ff"
dependencies = [
"cc",
]
[[package]] [[package]]
name = "winapi" name = "winapi"
version = "0.3.9" version = "0.3.9"

@ -11,7 +11,7 @@ path = "src/main.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
vented = "0.10.5" vented = "0.11.0"
rusqlite = "0.24.1" rusqlite = "0.24.1"
rand = "0.7.3" rand = "0.7.3"
base64 = "0.13.0" base64 = "0.13.0"
@ -32,3 +32,4 @@ parking_lot = "0.11.0"
serde_json = "1.0.59" serde_json = "1.0.59"
fern = "0.6.0" fern = "0.6.0"
regex = "1.4.2" regex = "1.4.2"
async-std = "1.7.0"

@ -107,12 +107,7 @@ fn start_server(_options: Opt, settings: &Settings) -> SnekcloudResult<()> {
.join(PathBuf::from("local.toml")), .join(PathBuf::from("local.toml")),
)?; )?;
let mut server = SnekcloudServer::new( let mut server = SnekcloudServer::new(settings.node_id.clone(), private_key, keys);
settings.node_id.clone(),
private_key,
keys,
settings.num_threads,
);
for address in &settings.listen_addresses { for address in &settings.listen_addresses {
server.add_listen_address(address.clone()); server.add_listen_address(address.clone());

@ -75,18 +75,21 @@ impl Module for HeartbeatModule {
let node_states = Arc::clone(&self.node_states); let node_states = Arc::clone(&self.node_states);
move |event| { move |event| {
let payload = event.get_payload::<HeartbeatPayload>().unwrap(); let node_states = Arc::clone(&node_states);
let latency = payload.get_beat_time().elapsed().ok()?.as_millis(); Box::pin(async move {
log::debug!("Latency to node {} is {} ms", payload.node_id, latency); let payload = event.get_payload::<HeartbeatPayload>().unwrap();
let latency = payload.get_beat_time().elapsed().ok()?.as_millis();
let mut states = node_states.lock(); log::debug!("Latency to node {} is {} ms", payload.node_id, latency);
Self::insert_state(
&mut states, let mut states = node_states.lock();
payload.node_id, Self::insert_state(
NodeInfo::alive(latency as u64), &mut states,
); payload.node_id,
NodeInfo::alive(latency as u64),
None );
None
})
} }
}); });
if let Some(output) = &self.settings.output_file { if let Some(output) = &self.settings.output_file {

@ -47,28 +47,32 @@ impl Module for NodesRefreshModule {
let update_required = Arc::clone(&self.update_required); let update_required = Arc::clone(&self.update_required);
move |event| { move |event| {
let mut nodes = nodes.lock(); let nodes = Arc::clone(&nodes);
let mut new_nodes = false; let update_required = Arc::clone(&update_required);
Box::pin(async move {
let mut nodes = nodes.lock();
let mut new_nodes = false;
for node in event.get_payload::<NodeListPayload>().ok()?.nodes { for node in event.get_payload::<NodeListPayload>().ok()?.nodes {
if !nodes.contains_key(&node.id) { if !nodes.contains_key(&node.id) {
nodes.insert( nodes.insert(
node.id.clone(), node.id.clone(),
Node { Node {
id: node.id, id: node.id,
trusted: false, trusted: false,
public_key: PublicKey::from(node.public_key), public_key: PublicKey::from(node.public_key),
addresses: node.addresses, addresses: node.addresses,
}, },
); );
new_nodes = true; new_nodes = true;
}
} }
}
if new_nodes { if new_nodes {
update_required.store(true, Ordering::Relaxed) update_required.store(true, Ordering::Relaxed)
} }
None None
})
} }
}); });
pool.execute_at_fixed_rate(Duration::from_secs(10), self.settings.update_interval(), { pool.execute_at_fixed_rate(Duration::from_secs(10), self.settings.update_interval(), {

@ -1,20 +1,19 @@
use crate::modules::Module; use crate::modules::Module;
use crate::server::tick_context::TickContext; use crate::server::tick_context::{EventInvocation, TickContext};
use crate::utils::result::{SnekcloudError, SnekcloudResult}; use crate::utils::result::{SnekcloudError, SnekcloudResult};
use crate::utils::settings::get_settings; use crate::utils::settings::get_settings;
use parking_lot::Mutex; use parking_lot::Mutex;
use scheduled_thread_pool::ScheduledThreadPool; use scheduled_thread_pool::ScheduledThreadPool;
use std::cmp::max;
use async_std::task;
use std::collections::HashMap; use std::collections::HashMap;
use std::mem; use std::mem;
use std::sync::mpsc::channel; use std::sync::mpsc::{channel, Receiver};
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use vented::server::data::Node; use vented::server::data::Node;
use vented::server::VentedServer; use vented::server::VentedServer;
use vented::stream::SecretKey; use vented::stream::SecretKey;
use vented::utils::result::VentedError;
use vented::WaitGroup;
pub mod tick_context; pub mod tick_context;
@ -23,26 +22,16 @@ const SERVER_TICK_RATE_MS: u64 = 10;
pub struct SnekcloudServer { pub struct SnekcloudServer {
inner: VentedServer, inner: VentedServer,
listen_addresses: Vec<String>, listen_addresses: Vec<String>,
listeners: Vec<WaitGroup>,
module_pool: HashMap<String, Arc<Mutex<ScheduledThreadPool>>>, module_pool: HashMap<String, Arc<Mutex<ScheduledThreadPool>>>,
modules: HashMap<String, Box<dyn Module + Send + Sync>>, modules: HashMap<String, Box<dyn Module + Send + Sync>>,
} }
impl SnekcloudServer { impl SnekcloudServer {
/// Creates a new snekcloud server with the provided keys and number of threads /// Creates a new snekcloud server with the provided keys and number of threads
pub fn new(id: String, private_key: SecretKey, keys: Vec<Node>, num_threads: usize) -> Self { pub fn new(id: String, private_key: SecretKey, keys: Vec<Node>) -> Self {
let num_threads = max(max(num_cpus::get(), num_threads), 4);
Self { Self {
inner: VentedServer::new( inner: VentedServer::new(id, private_key, keys, get_settings().timeouts()),
id,
private_key,
keys,
get_settings().timeouts(),
num_threads * 2,
num_threads * 10,
),
listen_addresses: Vec::new(), listen_addresses: Vec::new(),
listeners: Vec::new(),
module_pool: HashMap::new(), module_pool: HashMap::new(),
modules: HashMap::new(), modules: HashMap::new(),
} }
@ -56,13 +45,12 @@ impl SnekcloudServer {
/// Starts listening on all addresses and runs the module tick loop /// Starts listening on all addresses and runs the module tick loop
pub fn run(&mut self) -> SnekcloudResult<()> { pub fn run(&mut self) -> SnekcloudResult<()> {
for address in &self.listen_addresses { for address in &self.listen_addresses {
self.listeners.push(self.inner.listen(address.clone())) self.inner.listen(address.clone())
} }
let modules = mem::take(&mut self.modules); let modules = mem::take(&mut self.modules);
let (tx, rx) = channel(); let (tx, rx) = channel();
let tick_context = TickContext::new(self.inner.node_id(), tx, self.inner.nodes_ref()); let tick_context = TickContext::new(self.inner.node_id(), tx, self.inner.nodes_ref());
let node_count = self.inner.nodes().len();
for (name, mut module) in modules { for (name, mut module) in modules {
self.module_pool self.module_pool
@ -85,31 +73,25 @@ impl SnekcloudServer {
}, },
); );
} }
let invocation_pool = ScheduledThreadPool::new(node_count);
for invocation in rx {
let mut future = self
.inner
.emit(invocation.target_node.clone(), invocation.event);
let mut invocation_result = invocation.result;
let node_id = invocation.target_node;
invocation_pool.execute(move || { task::block_on(self.handle_invocations(rx));
let result = future.get_value_with_timeout(Duration::from_secs(60));
if let Some(result) = result {
invocation_result.result(result.map_err(SnekcloudError::from));
} else {
log::error!("Failed to send event: Timeout after 5s");
invocation_result.reject(SnekcloudError::Vented(VentedError::UnreachableNode(
node_id,
)));
}
});
}
Ok(()) Ok(())
} }
/// Handles invocations
async fn handle_invocations(&self, rx: Receiver<EventInvocation>) {
for mut invocation in rx {
let result = self
.inner
.emit(invocation.target_node.clone(), invocation.event)
.await;
invocation
.result
.result(result.map_err(SnekcloudError::from));
}
}
/// Registers a module on the server /// Registers a module on the server
pub fn register_module( pub fn register_module(
&mut self, &mut self,

@ -24,7 +24,6 @@ pub struct Settings {
pub node_id: String, pub node_id: String,
pub private_key: PathBuf, pub private_key: PathBuf,
pub node_data_dir: PathBuf, pub node_data_dir: PathBuf,
pub num_threads: usize,
pub trusted_nodes: Vec<String>, pub trusted_nodes: Vec<String>,
pub send_timeout_secs: u64, pub send_timeout_secs: u64,
pub redirect_timeout_secs: u64, pub redirect_timeout_secs: u64,
@ -48,7 +47,6 @@ impl Default for Settings {
node_data_dir: PathBuf::from("nodes"), node_data_dir: PathBuf::from("nodes"),
log_folder: PathBuf::from("logs"), log_folder: PathBuf::from("logs"),
trusted_nodes: vec![], trusted_nodes: vec![],
num_threads: num_cpus::get(),
send_timeout_secs: 5, send_timeout_secs: 5,
redirect_timeout_secs: 20, redirect_timeout_secs: 20,
modules: ModuleSettings::default(), modules: ModuleSettings::default(),
@ -88,9 +86,6 @@ impl ValidateSettings for Settings {
if !validate_node_id(&self.node_id) { if !validate_node_id(&self.node_id) {
panic!(format!("Invalid NodeID {}", self.node_id)); panic!(format!("Invalid NodeID {}", self.node_id));
} }
if self.num_threads == 0 {
panic!("Thread number must be greater than 0")
}
self.modules.validate(); self.modules.validate();
} }
} }

Loading…
Cancel
Save