diff --git a/.gitignore b/.gitignore index 8d81ed9..f9f9632 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,6 @@ /target .idea *.env -test config nodes +testdata \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 8050f67..4594a3d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -519,6 +519,16 @@ dependencies = [ "autocfg", ] +[[package]] +name = "num_cpus" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05499f3756671c15885fee9034446956fff3f243d6077b91e5767df161f766b3" +dependencies = [ + "hermit-abi", + "libc", +] + [[package]] name = "opaque-debug" version = "0.3.0" @@ -843,8 +853,10 @@ dependencies = [ "hostname", "log", "mac_address", + "num_cpus", "rand", "rusqlite", + "scheduled-thread-pool", "serde 1.0.117", "structopt", "toml", @@ -1011,9 +1023,9 @@ checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191" [[package]] name = "vented" -version = "0.1.2" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b92984cc8ed261509126ebc9b043c814e98f3d8da9dbf2e1d21e072c6906c8a7" +checksum = "3b3526b3689e915e1545250137a9c06875736629ef2def374e78c9755ee32868" dependencies = [ "byteorder", "crossbeam-utils", diff --git a/Cargo.toml b/Cargo.toml index 7a4d578..960077f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,7 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -vented = "0.1.2" +vented = "0.3.1" rusqlite = "0.24.1" rand = "0.7.3" base64 = "0.13.0" @@ -21,4 +21,6 @@ chrono = "0.4.19" toml = "0.5.7" serde = { version = "1.0.117", features = ["serde_derive"] } config = "0.10.1" -glob = "0.3.0" \ No newline at end of file +glob = "0.3.0" +scheduled-thread-pool = "0.2.5" +num_cpus = "1.13.0" \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 03df805..b347fe0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,6 +8,7 @@ use std::fs; use structopt::StructOpt; use vented::crypto::SecretKey; use crate::data::node_data::NodeData; +use crate::modules::heartbeat::HeartbeatModule; pub(crate) mod utils; pub(crate) mod server; @@ -76,6 +77,7 @@ fn start_server(_options: Opt, settings: &Settings) -> SnekcloudResult<()> { for address in &settings.listen_addresses { server.add_listen_address(address.clone()); } + server.register_module(HeartbeatModule::new())?; server.run()?; Ok(()) diff --git a/src/modules/heartbeat/mod.rs b/src/modules/heartbeat/mod.rs new file mode 100644 index 0000000..cf27724 --- /dev/null +++ b/src/modules/heartbeat/mod.rs @@ -0,0 +1,58 @@ +use crate::modules::Module; +use vented::server::VentedServer; +use crate::utils::result::SnekcloudResult; +use scheduled_thread_pool::ScheduledThreadPool; +use vented::result::{VentedResult}; +use crate::modules::heartbeat::payloads::HeartbeatPayload; +use std::time::{Instant, Duration}; +use vented::event::Event; + +mod payloads; +const HEARTBEAT_BEAT_EVENT: &str = "heartbeat:beat"; +const HEARTBEAT_RATE_SECONDS: u64 = 10; + +pub struct HeartbeatModule { + last_tick: Instant, +} + +impl HeartbeatModule { + pub fn new() -> Self { + Self { + last_tick: Instant::now() + } + } +} + +impl Module for HeartbeatModule { + fn name(&self) -> String { + "HeartbeatModule".to_string() + } + + fn init(&mut self, server: &mut VentedServer, _: &mut ScheduledThreadPool) -> SnekcloudResult<()> { + server.on(HEARTBEAT_BEAT_EVENT, |event| { + let payload = event.get_payload::().unwrap(); + log::debug!("Latency to node {} is {} ms", payload.node_id, payload.get_beat_time().elapsed().unwrap().as_millis()); + + None + }); + + Ok(()) + } + + fn boxed(self) -> Box { + Box::new(self) + } + + fn tick(&mut self, server: &mut VentedServer, _: &mut ScheduledThreadPool) -> VentedResult<()> { + if self.last_tick.elapsed() > Duration::from_secs(HEARTBEAT_RATE_SECONDS) { + for node in server.nodes() { + if let Err(e) = server.emit(node.id.clone(), Event::with_payload(HEARTBEAT_BEAT_EVENT, &HeartbeatPayload::now(server.node_id()))) { + log::warn!("Node {} is not reachable: {}", node.id, e) + } + } + self.last_tick = Instant::now(); + } + + Ok(()) + } +} \ No newline at end of file diff --git a/src/modules/heartbeat/payloads.rs b/src/modules/heartbeat/payloads.rs new file mode 100644 index 0000000..c0eebac --- /dev/null +++ b/src/modules/heartbeat/payloads.rs @@ -0,0 +1,24 @@ +use serde::{Serialize, Deserialize}; +use std::time::{ UNIX_EPOCH, Duration, SystemTime}; +use std::ops::Add; + + +#[derive(Serialize, Deserialize)] +pub struct HeartbeatPayload { + pub node_id: String, + beat_at: u64, +} + +impl HeartbeatPayload { + pub fn now(node_id: String) -> Self { + let start = SystemTime::now(); + Self { + node_id, + beat_at: start.duration_since(UNIX_EPOCH).unwrap().as_millis() as u64 + } + } + + pub fn get_beat_time(&self) -> SystemTime { + UNIX_EPOCH.add(Duration::from_millis(self.beat_at)) + } +} \ No newline at end of file diff --git a/src/modules/mod.rs b/src/modules/mod.rs index 2829737..4d67c12 100644 --- a/src/modules/mod.rs +++ b/src/modules/mod.rs @@ -1,6 +1,13 @@ use vented::server::VentedServer; use crate::utils::result::SnekcloudResult; +use scheduled_thread_pool::ScheduledThreadPool; +use vented::result::VentedResult; + +pub mod heartbeat; pub trait Module { - fn init(&mut self, server: &VentedServer) -> SnekcloudResult<()>; + fn name(&self) -> String; + fn init(&mut self, server: &mut VentedServer, pool: &mut ScheduledThreadPool) -> SnekcloudResult<()>; + fn boxed(self) -> Box; + fn tick(&mut self, server: &mut VentedServer, pool: &mut ScheduledThreadPool) -> VentedResult<()>; } \ No newline at end of file diff --git a/src/server/mod.rs b/src/server/mod.rs index 9957d2a..c332507 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -4,11 +4,19 @@ use vented::crypto::SecretKey; use vented::server::data::Node; use vented::WaitGroup; use crate::modules::Module; +use scheduled_thread_pool::ScheduledThreadPool; +use std::collections::HashMap; +use std::thread; +use std::time::{Duration, Instant}; + +const SERVER_TICK_RATE_MS: u64 = 1; pub struct SnekcloudServer { inner: VentedServer, listen_addresses: Vec, listeners: Vec, + module_pool: ScheduledThreadPool, + modules: HashMap> } impl SnekcloudServer { @@ -18,6 +26,8 @@ impl SnekcloudServer { inner: VentedServer::new(id, private_key, keys, num_threads), listen_addresses: Vec::new(), listeners: Vec::new(), + module_pool: ScheduledThreadPool::with_name("modules", num_threads), + modules: HashMap::new() } } @@ -26,17 +36,33 @@ impl SnekcloudServer { self.listen_addresses.push(address); } - /// Starts listening on all addresses + /// Starts listening on all addresses and runs the module tick loop pub fn run(&mut self) -> SnekcloudResult<()> { for address in &self.listen_addresses { self.listeners.push(self.inner.listen(address.clone())) } - - Ok(()) + let sleep_duration = Duration::from_millis(SERVER_TICK_RATE_MS); + loop { + let start = Instant::now(); + for (name, module) in &mut self.modules { + if let Err(e) = module.tick(&mut self.inner, &mut self.module_pool) { + log::error!("Error when ticking module {}: {}", name, e); + } + } + let elapsed = start.elapsed(); + if elapsed < sleep_duration { + thread::sleep( sleep_duration - elapsed); + } else { + log::warn!("Can't keep up. Last tick took {} ms", elapsed.as_millis()) + } + } } /// Registers a module on the server - pub fn register_module(&mut self, module: &mut impl Module) -> SnekcloudResult<()> { - module.init(&mut self.inner) + pub fn register_module(&mut self, mut module: impl Module) -> SnekcloudResult<()> { + module.init(&mut self.inner, &mut self.module_pool)?; + self.modules.insert(module.name(), module.boxed()); + + Ok(()) } } \ No newline at end of file diff --git a/src/utils/logging.rs b/src/utils/logging.rs index 4f1ebb7..2d6ba20 100644 --- a/src/utils/logging.rs +++ b/src/utils/logging.rs @@ -13,7 +13,7 @@ pub fn init_logger() { let color = get_level_style(record.level()); writeln!( buf, - "{:<12} {:<45}| {} {}: {}", + "{:<25} {:<45}| {} {}: {}", format!("thread::{}", thread::current().name().unwrap_or("main")).dimmed(), record.target().dimmed().italic(), Local::now().format("%Y-%m-%dT%H:%M:%S"), diff --git a/src/utils/settings.rs b/src/utils/settings.rs index 67ac985..b89464e 100644 --- a/src/utils/settings.rs +++ b/src/utils/settings.rs @@ -22,7 +22,7 @@ pub struct Settings { impl Default for Settings { fn default() -> Self { Self { - listen_addresses: vec!["127.0.0.1:22222".to_string()], + listen_addresses: vec![], node_id: get_node_id(), private_key: PathBuf::from("node_key"), node_data_dir: PathBuf::from("nodes"),