diff --git a/Cargo.lock b/Cargo.lock index 201a661..2286b27 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -97,6 +97,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "async-listen" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0eff11d7d3dbf808fb25952cc54a0bcf50b501ae6d6ea98a817009b330d0a2a" +dependencies = [ + "async-std", +] + [[package]] name = "async-mutex" version = "1.4.0" @@ -139,6 +148,17 @@ version = "4.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e91831deabf0d6d7ec49552e489aed63b7456a7a3c46cff62adad428110b0af0" +[[package]] +name = "async-trait" +version = "0.1.41" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b246867b8b3b6ae56035f1eb1ed557c1d8eae97f0d53696138a50fa0e3a3b8c0" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "atomic-waker" version = "1.0.0" @@ -434,6 +454,21 @@ dependencies = [ "log", ] +[[package]] +name = "futures" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b3b0c040a1fe6529d30b3c5944b280c7f0dcb2930d2c3062bca967b602583d0" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.8" @@ -441,6 +476,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4b7109687aa4e177ef6fe84553af6280ef2778bdb7783ba44c9dc3399110fe64" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -449,6 +485,17 @@ version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "847ce131b72ffb13b6109a221da9ad97a64cbe48feb1028356b836b47b8f1748" +[[package]] +name = "futures-executor" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4caa2b2b68b880003057c1dd49f1ed937e38f22fcf6c212188a121f08cf40a65" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + [[package]] name = "futures-io" version = "0.3.8" @@ -470,6 +517,53 @@ dependencies = [ "waker-fn", ] +[[package]] +name = "futures-macro" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77408a692f1f97bcc61dc001d752e00643408fbc922e4d634c655df50d595556" +dependencies = [ + "proc-macro-hack", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f878195a49cee50e006b02b93cf7e0a95a38ac7b776b4c4d9cc1207cd20fcb3d" + +[[package]] +name = "futures-task" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c554eb5bf48b2426c4771ab68c6b14468b6e76cc90996f528c3338d761a4d0d" +dependencies = [ + "once_cell", +] + +[[package]] +name = "futures-util" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d304cff4a7b99cfb7986f7d43fbe93d175e72e704a8860787cc95e9ffd85cbd2" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project", + "pin-utils", + "proc-macro-hack", + "proc-macro-nested", + "slab", +] + [[package]] name = "generic-array" version = "0.14.4" @@ -796,6 +890,26 @@ dependencies = [ "winapi", ] +[[package]] +name = "pin-project" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee41d838744f60d959d7074e3afb6b35c7456d0f61cad38a24e35e6553f73841" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81a4ffa594b66bff340084d4081df649a7dc049ac8d7fc458d8e628bfbbb2f86" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.1.11" @@ -866,6 +980,18 @@ dependencies = [ "version_check", ] +[[package]] +name = "proc-macro-hack" +version = "0.5.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5" + +[[package]] +name = "proc-macro-nested" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eba180dafb9038b050a4c280019bbedf9f2467b61e5d892dcad585bb57aadc5a" + [[package]] name = "proc-macro2" version = "1.0.24" @@ -1007,15 +1133,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "scheduled-thread-pool" -version = "0.2.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc6f74fd1204073fa02d5d5d68bec8021be4c38690b61264b2fdb48083d0e7d7" -dependencies = [ - "parking_lot", -] - [[package]] name = "scopeguard" version = "1.1.0" @@ -1111,11 +1228,13 @@ name = "snekcloud-server" version = "0.1.0" dependencies = [ "async-std", + "async-trait", "base64", "chrono", "colored", "config", "fern", + "futures", "glob", "hostname", "lazy_static", @@ -1126,7 +1245,6 @@ dependencies = [ "rand", "regex", "rusqlite", - "scheduled-thread-pool", "serde 1.0.117", "serde_json", "structopt", @@ -1291,14 +1409,16 @@ checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191" [[package]] name = "vented" -version = "0.11.0" +version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33386edc15f14833ecb3b40f3ec75052684202ad892a916e8baf603d686ae543" +checksum = "cdd0a331d4b17a1ac906ffc38825113984f244e376bd58d90affbafee5b650ed" dependencies = [ + "async-listen", "async-std", "byteorder", "crossbeam-utils", "crypto_box", + "futures", "generic-array", "log", "parking_lot", diff --git a/Cargo.toml b/Cargo.toml index d7fd4ab..83a89a3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ path = "src/main.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -vented = "0.11.0" +vented = "0.11.2" rusqlite = "0.24.1" rand = "0.7.3" base64 = "0.13.0" @@ -25,11 +25,12 @@ toml = "0.5.7" serde = { version = "1.0.117", features = ["serde_derive"] } config = "0.10.1" glob = "0.3.0" -scheduled-thread-pool = "0.2.5" num_cpus = "1.13.0" lazy_static = "1.4.0" parking_lot = "0.11.0" serde_json = "1.0.59" fern = "0.6.0" regex = "1.4.2" -async-std = "1.7.0" \ No newline at end of file +async-std = {version = "1.7.0", features=["unstable"]} +async-trait = "0.1.41" +futures = "0.3.8" \ No newline at end of file diff --git a/src/modules/heartbeat/mod.rs b/src/modules/heartbeat/mod.rs index e9b2a64..b104ff3 100644 --- a/src/modules/heartbeat/mod.rs +++ b/src/modules/heartbeat/mod.rs @@ -1,12 +1,14 @@ use crate::modules::heartbeat::payloads::HeartbeatPayload; use crate::modules::heartbeat::settings::HeartbeatSettings; use crate::modules::Module; -use crate::server::tick_context::TickContext; +use crate::server::tick_context::RunContext; use crate::utils::result::SnekcloudResult; use crate::utils::settings::get_settings; use crate::utils::write_json_pretty; +use async_std::task; +use async_trait::async_trait; +use chrono::Local; use parking_lot::Mutex; -use scheduled_thread_pool::ScheduledThreadPool; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::sync::Arc; @@ -28,6 +30,7 @@ enum NodeState { struct NodeInfo { ping: Option, state: NodeState, + timestamp: String, } impl NodeInfo { @@ -35,18 +38,19 @@ impl NodeInfo { Self { ping: Some(ping), state: NodeState::Alive, + timestamp: Local::now().format("%Y-%m-%dT%H:%M:%S").to_string(), } } fn dead() -> Self { Self { ping: None, state: NodeState::Dead, + timestamp: Local::now().format("%Y-%m-%dT%H:%M:%S").to_string(), } } } pub struct HeartbeatModule { - last_tick: Instant, settings: HeartbeatSettings, node_states: Arc>>>, } @@ -54,23 +58,19 @@ pub struct HeartbeatModule { impl HeartbeatModule { pub fn new() -> Self { Self { - last_tick: Instant::now(), settings: get_settings().modules.heartbeat, node_states: Arc::new(Mutex::new(HashMap::new())), } } } +#[async_trait] impl Module for HeartbeatModule { fn name(&self) -> String { "HeartbeatModule".to_string() } - fn init( - &mut self, - server: &mut VentedServer, - pool: &mut ScheduledThreadPool, - ) -> SnekcloudResult<()> { + fn init(&mut self, server: &mut VentedServer) -> SnekcloudResult<()> { server.on(HEARTBEAT_BEAT_EVENT, { let node_states = Arc::clone(&self.node_states); @@ -92,19 +92,6 @@ impl Module for HeartbeatModule { }) } }); - if let Some(output) = &self.settings.output_file { - pool.execute_at_fixed_rate(self.settings.interval(), self.settings.interval(), { - let path = output.clone(); - let states = Arc::clone(&self.node_states); - move || { - let states = states.lock(); - - if let Err(e) = write_json_pretty(&path, &*states) { - log::error!("Failed to write output states to file: {}", e) - } - } - }); - } Ok(()) } @@ -113,40 +100,39 @@ impl Module for HeartbeatModule { Box::new(self) } - fn tick( - &mut self, - mut context: TickContext, - pool: &mut ScheduledThreadPool, - ) -> SnekcloudResult<()> { - if self.last_tick.elapsed() > self.settings.interval() { - log::trace!("Sending heartbeat..."); - for node in context.living_nodes() { - let mut future = context.emit( - node.id.clone(), - Event::with_payload( - HEARTBEAT_BEAT_EVENT, - &HeartbeatPayload::now(context.node_id().clone()), - ), - ); - let states = Arc::clone(&self.node_states); - pool.execute(move || { - match future.get_value_with_timeout(Duration::from_secs(60)) { - Some(Err(e)) => { - log::debug!("Node {} is not reachable: {}", node.id, e); - Self::insert_state(&mut states.lock(), node.id, NodeInfo::dead()); - } - None => { - log::debug!("Node {} is not reachable: Timeout", node.id); - Self::insert_state(&mut states.lock(), node.id, NodeInfo::dead()); + async fn run(&mut self, context: RunContext) -> SnekcloudResult<()> { + for node in context.nodes() { + let mut context = context.clone(); + let node_states = Arc::clone(&self.node_states); + let interval = self.settings.interval(); + + task::spawn(async move { + loop { + Self::send_heartbeat(&mut context, &node.id, Arc::clone(&node_states)).await; + + if !context.check_alive(&node.id) { + let start = Instant::now(); + while !context.check_alive(&node.id) { + task::sleep(Duration::from_secs(10)).await; + if start.elapsed() > interval * 100 { + break; + } } - _ => {} + } else { + task::sleep(interval).await } - }); + } + }); + } + loop { + if let Some(path) = &self.settings.output_file { + let states = self.node_states.lock(); + if let Err(e) = write_json_pretty(path, &*states) { + log::error!("Failed to write output states to file: {}", e) + } } - self.last_tick = Instant::now(); + task::sleep(self.settings.interval()).await } - - Ok(()) } } @@ -164,4 +150,36 @@ impl HeartbeatModule { states.insert(id, vec![state]); } } + + async fn send_heartbeat( + context: &mut RunContext, + target: &String, + states: Arc>>>, + ) { + log::trace!("Sending heartbeat to {}...", target); + let mut value = context + .emit( + target.clone(), + Event::with_payload( + HEARTBEAT_BEAT_EVENT, + &HeartbeatPayload::now(context.node_id().clone()), + ), + ) + .await; + + match value + .get_value_with_timeout_async(Duration::from_secs(60)) + .await + { + Some(Err(e)) => { + log::debug!("Node {} is not reachable: {}", target, e); + Self::insert_state(&mut *states.lock(), target.clone(), NodeInfo::dead()); + } + None => { + log::debug!("Node {} is not reachable: Timeout", target); + Self::insert_state(&mut *states.lock(), target.clone(), NodeInfo::dead()); + } + _ => {} + } + } } diff --git a/src/modules/mod.rs b/src/modules/mod.rs index 3502b9f..2cebf4e 100644 --- a/src/modules/mod.rs +++ b/src/modules/mod.rs @@ -1,19 +1,15 @@ -use crate::server::tick_context::TickContext; +use crate::server::tick_context::RunContext; use crate::utils::result::SnekcloudResult; -use scheduled_thread_pool::ScheduledThreadPool; +use async_trait::async_trait; use vented::server::VentedServer; pub mod heartbeat; pub mod nodes_refresh; +#[async_trait] pub trait Module { fn name(&self) -> String; - fn init( - &mut self, - server: &mut VentedServer, - pool: &mut ScheduledThreadPool, - ) -> SnekcloudResult<()>; + fn init(&mut self, server: &mut VentedServer) -> SnekcloudResult<()>; fn boxed(self) -> Box; - fn tick(&mut self, context: TickContext, pool: &mut ScheduledThreadPool) - -> SnekcloudResult<()>; + async fn run(&mut self, context: RunContext) -> SnekcloudResult<()>; } diff --git a/src/modules/nodes_refresh/mod.rs b/src/modules/nodes_refresh/mod.rs index 5854e2d..7ff8a6c 100644 --- a/src/modules/nodes_refresh/mod.rs +++ b/src/modules/nodes_refresh/mod.rs @@ -1,16 +1,16 @@ use crate::data::node_data::NodeData; use crate::modules::nodes_refresh::settings::NodesRefreshSettings; use crate::modules::Module; -use crate::server::tick_context::TickContext; +use crate::server::tick_context::RunContext; use crate::utils::result::SnekcloudResult; use crate::utils::settings::get_settings; +use async_std::task; +use async_trait::async_trait; use parking_lot::Mutex; -use scheduled_thread_pool::ScheduledThreadPool; use std::collections::HashMap; use std::path::PathBuf; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; -use std::time::{Duration, Instant, UNIX_EPOCH}; use vented::event::Event; use vented::server::data::Node; use vented::server::server_events::{NodeListPayload, NODE_LIST_REQUEST_EVENT}; @@ -22,20 +22,16 @@ pub mod settings; pub struct NodesRefreshModule { nodes: Arc>>, update_required: Arc, - last_request: Instant, settings: NodesRefreshSettings, } +#[async_trait] impl Module for NodesRefreshModule { fn name(&self) -> String { "node_list_refresh".to_string() } - fn init( - &mut self, - server: &mut VentedServer, - pool: &mut ScheduledThreadPool, - ) -> SnekcloudResult<()> { + fn init(&mut self, server: &mut VentedServer) -> SnekcloudResult<()> { { let mut node_list = self.nodes.lock(); for node in server.nodes() { @@ -75,31 +71,6 @@ impl Module for NodesRefreshModule { }) } }); - pool.execute_at_fixed_rate(Duration::from_secs(10), self.settings.update_interval(), { - let nodes = Arc::clone(&self.nodes); - let update_required = Arc::clone(&self.update_required); - - move || { - if update_required.load(Ordering::Relaxed) { - let nodes_folder = get_settings().node_data_dir; - nodes - .lock() - .values() - .cloned() - .map(|node| { - NodeData::with_addresses(node.id, node.addresses, node.public_key) - }) - .for_each(|data| { - let mut path = nodes_folder.clone(); - path.push(PathBuf::from(format!("{}.toml", data.id))); - - if let Err(e) = data.write_to_file(path) { - log::error!("Failed to write updated node data: {}", e); - } - }); - } - } - }); Ok(()) } @@ -108,34 +79,45 @@ impl Module for NodesRefreshModule { Box::new(self) } - fn tick( - &mut self, - mut context: TickContext, - _: &mut ScheduledThreadPool, - ) -> SnekcloudResult<()> { - if self.last_request.elapsed() > self.settings.update_interval() { - context - .living_nodes() - .iter() - .filter(|node| node.trusted) - .for_each(|node| { - context.emit(node.id.clone(), Event::new(NODE_LIST_REQUEST_EVENT)); - }); - self.last_request = Instant::now(); - } + async fn run(&mut self, mut context: RunContext) -> SnekcloudResult<()> { + loop { + for node in context.living_nodes().iter().filter(|node| node.trusted) { + context + .emit(node.id.clone(), Event::new(NODE_LIST_REQUEST_EVENT)) + .await; + } + if self.update_required.load(Ordering::Relaxed) { + self.write_node_data(); + } - Ok(()) + task::sleep(self.settings.update_interval()).await + } } } impl NodesRefreshModule { pub fn new() -> Self { - let null_time = Instant::now() - UNIX_EPOCH.elapsed().unwrap(); Self { nodes: Arc::new(Mutex::new(HashMap::new())), settings: get_settings().modules.nodes_refresh, - last_request: null_time, update_required: Arc::new(AtomicBool::new(false)), } } + + fn write_node_data(&self) { + let nodes_folder = get_settings().node_data_dir; + self.nodes + .lock() + .values() + .cloned() + .map(|node| NodeData::with_addresses(node.id, node.addresses, node.public_key)) + .for_each(|data| { + let mut path = nodes_folder.clone(); + path.push(PathBuf::from(format!("{}.toml", data.id))); + + if let Err(e) = data.write_to_file(path) { + log::error!("Failed to write updated node data: {}", e); + } + }); + } } diff --git a/src/server/mod.rs b/src/server/mod.rs index 33452ef..70d0aac 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -1,28 +1,21 @@ use crate::modules::Module; -use crate::server::tick_context::{EventInvocation, TickContext}; +use crate::server::tick_context::{EventInvocation, RunContext}; use crate::utils::result::{SnekcloudError, SnekcloudResult}; use crate::utils::settings::get_settings; -use parking_lot::Mutex; -use scheduled_thread_pool::ScheduledThreadPool; +use async_std::sync::{channel, Receiver}; use async_std::task; use std::collections::HashMap; use std::mem; -use std::sync::mpsc::{channel, Receiver}; -use std::sync::Arc; -use std::time::Duration; use vented::server::data::Node; use vented::server::VentedServer; use vented::stream::SecretKey; pub mod tick_context; -const SERVER_TICK_RATE_MS: u64 = 10; - pub struct SnekcloudServer { inner: VentedServer, listen_addresses: Vec, - module_pool: HashMap>>, modules: HashMap>, } @@ -32,7 +25,6 @@ impl SnekcloudServer { Self { inner: VentedServer::new(id, private_key, keys, get_settings().timeouts()), listen_addresses: Vec::new(), - module_pool: HashMap::new(), modules: HashMap::new(), } } @@ -48,30 +40,17 @@ impl SnekcloudServer { self.inner.listen(address.clone()) } - let modules = mem::take(&mut self.modules); - let (tx, rx) = channel(); - let tick_context = TickContext::new(self.inner.node_id(), tx, self.inner.nodes_ref()); + let mut modules = mem::take(&mut self.modules).into_iter(); + let (tx, rx) = channel(10); + let tick_context = RunContext::new(self.inner.node_id(), tx, self.inner.nodes_ref()); - for (name, mut module) in modules { - self.module_pool - .get(&name) - .unwrap() - .lock() - .execute_at_fixed_rate( - Duration::from_millis(SERVER_TICK_RATE_MS), - Duration::from_millis(SERVER_TICK_RATE_MS), - { - let mut module_pool = ScheduledThreadPool::new(1); - let tick_context = TickContext::clone(&tick_context); - move || { - if let Err(e) = - module.tick(TickContext::clone(&tick_context), &mut module_pool) - { - log::error!("Error when ticking module {}: {}", name, e); - } - } - }, - ); + while let Some((name, mut module)) = modules.next() { + let tick_context = RunContext::clone(&tick_context); + task::spawn(async move { + if let Err(e) = module.run(RunContext::clone(&tick_context)).await { + log::error!("Error when ticking module {}: {}", name, e); + } + }); } task::block_on(self.handle_invocations(rx)); @@ -81,14 +60,14 @@ impl SnekcloudServer { /// Handles invocations async fn handle_invocations(&self, rx: Receiver) { - for mut invocation in rx { - let result = self - .inner - .emit(invocation.target_node.clone(), invocation.event) - .await; - invocation - .result - .result(result.map_err(SnekcloudError::from)); + while let Ok(mut invocation) = rx.recv().await { + let inner = self.inner.clone(); + task::spawn(async move { + let result = task::block_on(inner.emit(invocation.target_node, invocation.event)); + invocation + .result + .result(result.map_err(SnekcloudError::from)); + }); } } @@ -97,10 +76,7 @@ impl SnekcloudServer { &mut self, mut module: impl Module + Send + Sync, ) -> SnekcloudResult<()> { - let module_pool = Arc::new(Mutex::new(ScheduledThreadPool::new(2))); - - module.init(&mut self.inner, &mut module_pool.lock())?; - self.module_pool.insert(module.name(), module_pool); + module.init(&mut self.inner)?; self.modules.insert(module.name(), module.boxed()); Ok(()) diff --git a/src/server/tick_context.rs b/src/server/tick_context.rs index c4b7e8a..0b22a45 100644 --- a/src/server/tick_context.rs +++ b/src/server/tick_context.rs @@ -1,14 +1,14 @@ use crate::utils::result::SnekcloudError; +use async_std::sync::Sender; use parking_lot::Mutex; use std::collections::HashMap; -use std::sync::mpsc::Sender; use std::sync::Arc; use vented::event::Event; use vented::server::data::{Node, NodeData}; use vented::utils::sync::AsyncValue; #[derive(Clone)] -pub struct TickContext { +pub struct RunContext { nodes: Arc>>, event_sender: Sender, node_id: String, @@ -20,7 +20,7 @@ pub struct EventInvocation { pub target_node: String, } -impl TickContext { +impl RunContext { pub fn new( node_id: String, sender: Sender, @@ -33,7 +33,7 @@ impl TickContext { } } - pub fn emit( + pub async fn emit( &mut self, target_node: S, event: Event, @@ -43,9 +43,9 @@ impl TickContext { .send(EventInvocation { event, target_node: target_node.to_string(), - result: AsyncValue::clone(&value), + result: value.clone(), }) - .unwrap(); + .await; value } diff --git a/src/utils/logging.rs b/src/utils/logging.rs index a866484..3eedad4 100644 --- a/src/utils/logging.rs +++ b/src/utils/logging.rs @@ -47,6 +47,8 @@ pub fn init_logger() { ) .unwrap_or(LevelFilter::Info), ) + .level_for("async_io", log::LevelFilter::Info) + .level_for("polling", log::LevelFilter::Info) .chain(std::io::stdout()) .chain( fern::log_file(log_dir.join(PathBuf::from(format!(