diff --git a/Cargo.lock b/Cargo.lock index d741236..5e75c76 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1026,9 +1026,9 @@ checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191" [[package]] name = "vented" -version = "0.6.3" +version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03ea719ca6db23a76ee5d0f8ebbcd615d9d4954af88b7a88977b9d5a7d469a85" +checksum = "0bb03c973ba228728f10ab9480ca5b74d8dc3bd57d2da29476efa0d0332b4688" dependencies = [ "byteorder", "crossbeam-utils", diff --git a/Cargo.toml b/Cargo.toml index f2560f8..2cc5985 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,7 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -vented = "0.6.3" +vented = "0.8.1" rusqlite = "0.24.1" rand = "0.7.3" base64 = "0.13.0" diff --git a/src/main.rs b/src/main.rs index fc8e237..89ebf3f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,28 +1,30 @@ +use crate::data::node_data::NodeData; +use crate::modules::heartbeat::HeartbeatModule; +use crate::modules::nodes_refresh::NodesRefreshModule; use crate::server::SnekcloudServer; -use crate::utils::settings::{Settings, get_settings}; -use crate::utils::keys::{extract_private_key, read_node_keys, generate_private_key, armor_private_key}; -use std::path::PathBuf; -use crate::utils::result::SnekcloudResult; +use crate::utils::keys::{ + armor_private_key, extract_private_key, generate_private_key, read_node_keys, +}; use crate::utils::logging::init_logger; +use crate::utils::result::SnekcloudResult; +use crate::utils::settings::{get_settings, Settings}; use std::fs; +use std::path::PathBuf; use structopt::StructOpt; use vented::crypto::SecretKey; -use crate::data::node_data::NodeData; -use crate::modules::heartbeat::HeartbeatModule; -use crate::modules::nodes_refresh::NodesRefreshModule; #[macro_use] extern crate lazy_static; -pub(crate) mod utils; -pub(crate) mod server; pub(crate) mod data; pub(crate) mod modules; +pub(crate) mod server; +pub(crate) mod utils; #[derive(StructOpt, Debug)] struct Opt { #[structopt(subcommand)] - sub_command: Option + sub_command: Option, } #[derive(StructOpt, Debug)] @@ -31,25 +33,24 @@ enum SubCommand { /// Generates a new private key GenerateKey(GenerateKeyOptions), - WriteInfoFile(WriteInfoFileOptions) + WriteInfoFile(WriteInfoFileOptions), } #[derive(StructOpt, Debug)] struct GenerateKeyOptions { /// The file the key is stored to #[structopt(parse(from_os_str))] - output_file: PathBuf + output_file: PathBuf, } - #[derive(StructOpt, Debug)] struct WriteInfoFileOptions { /// The file the info is stored to #[structopt(parse(from_os_str))] - output_file: PathBuf + output_file: PathBuf, } -fn main() -> SnekcloudResult<()>{ +fn main() -> SnekcloudResult<()> { init_logger(); let opt: Opt = Opt::from_args(); let settings = get_settings(); @@ -60,10 +61,14 @@ fn main() -> SnekcloudResult<()>{ let key = generate_private_key(); let string_content = armor_private_key(key); fs::write(options.output_file, string_content)?; - }, + } SubCommand::WriteInfoFile(options) => { let key = get_private_key(&settings)?; - let data = NodeData::with_addresses(settings.node_id, settings.listen_addresses, key.public_key()); + let data = NodeData::with_addresses( + settings.node_id, + settings.listen_addresses, + key.public_key(), + ); data.write_to_file(options.output_file)?; } } @@ -76,7 +81,12 @@ fn main() -> SnekcloudResult<()>{ fn start_server(_options: Opt, settings: &Settings) -> SnekcloudResult<()> { let keys = read_node_keys(&settings.node_data_dir)?; - let mut server = SnekcloudServer::new(settings.node_id.clone(), get_private_key(settings)?, keys, 8); + let mut server = SnekcloudServer::new( + settings.node_id.clone(), + get_private_key(settings)?, + keys, + settings.num_threads, + ); for address in &settings.listen_addresses { server.add_listen_address(address.clone()); @@ -90,4 +100,4 @@ fn start_server(_options: Opt, settings: &Settings) -> SnekcloudResult<()> { fn get_private_key(settings: &Settings) -> SnekcloudResult { extract_private_key(&fs::read_to_string(&settings.private_key)?) -} \ No newline at end of file +} diff --git a/src/modules/heartbeat/mod.rs b/src/modules/heartbeat/mod.rs index d60611f..e6f53e0 100644 --- a/src/modules/heartbeat/mod.rs +++ b/src/modules/heartbeat/mod.rs @@ -1,27 +1,28 @@ +use crate::modules::heartbeat::payloads::HeartbeatPayload; +use crate::modules::heartbeat::settings::HeartbeatSettings; use crate::modules::Module; -use vented::server::VentedServer; +use crate::server::tick_context::TickContext; use crate::utils::result::SnekcloudResult; -use scheduled_thread_pool::ScheduledThreadPool; -use vented::result::{VentedResult}; -use crate::modules::heartbeat::payloads::HeartbeatPayload; -use std::time::{Instant}; -use vented::event::Event; use crate::utils::settings::get_settings; -use crate::modules::heartbeat::settings::HeartbeatSettings; -use std::sync::Arc; +use crate::utils::write_json_pretty; use parking_lot::Mutex; +use scheduled_thread_pool::ScheduledThreadPool; +use serde::{Deserialize, Serialize}; use std::collections::HashMap; -use serde::{Serialize, Deserialize}; -use crate::utils::{write_json_pretty}; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use vented::event::Event; +use vented::result::VentedResult; +use vented::server::VentedServer; -pub mod settings; mod payloads; +pub mod settings; const HEARTBEAT_BEAT_EVENT: &str = "heartbeat:beat"; #[derive(Serialize, Deserialize, Clone, Debug)] enum NodeState { Alive, - Dead + Dead, } #[derive(Serialize, Deserialize, Clone, Debug)] @@ -56,7 +57,7 @@ impl HeartbeatModule { Self { last_tick: Instant::now(), settings: get_settings().modules.heartbeat, - node_states: Arc::new(Mutex::new(HashMap::new())) + node_states: Arc::new(Mutex::new(HashMap::new())), } } } @@ -66,7 +67,11 @@ impl Module for HeartbeatModule { "HeartbeatModule".to_string() } - fn init(&mut self, server: &mut VentedServer, pool: &mut ScheduledThreadPool) -> SnekcloudResult<()> { + fn init( + &mut self, + server: &mut VentedServer, + pool: &mut ScheduledThreadPool, + ) -> SnekcloudResult<()> { server.on(HEARTBEAT_BEAT_EVENT, { let node_states = Arc::clone(&self.node_states); @@ -76,8 +81,11 @@ impl Module for HeartbeatModule { log::debug!("Latency to node {} is {} ms", payload.node_id, latency); let mut states = node_states.lock(); - Self::insert_state(&mut states, payload.node_id, NodeInfo::alive(latency as u64)); - + Self::insert_state( + &mut states, + payload.node_id, + NodeInfo::alive(latency as u64), + ); None } @@ -99,18 +107,34 @@ impl Module for HeartbeatModule { Ok(()) } - fn boxed(self) -> Box { + fn boxed(self) -> Box { Box::new(self) } - fn tick(&mut self, server: &mut VentedServer, _: &mut ScheduledThreadPool) -> VentedResult<()> { + fn tick( + &mut self, + mut context: TickContext, + pool: &mut ScheduledThreadPool, + ) -> VentedResult<()> { if self.last_tick.elapsed() > self.settings.interval() { - for node in server.nodes() { - if let Err(e) = server.emit(node.id.clone(), Event::with_payload(HEARTBEAT_BEAT_EVENT, &HeartbeatPayload::now(server.node_id()))) { - log::debug!("Node {} is not reachable: {}", node.id, e); - let mut states = self.node_states.lock(); - Self::insert_state(&mut states, node.id, NodeInfo::dead()); - } + for node in context.nodes() { + let mut future = context.emit( + node.id.clone(), + Event::with_payload( + HEARTBEAT_BEAT_EVENT, + &HeartbeatPayload::now(context.node_id().clone()), + ), + ); + let states = Arc::clone(&self.node_states); + pool.execute(move || { + if let Some(value) = future.get_value_with_timeout(Duration::from_secs(10)) { + if let Err(e) = &*value { + log::debug!("Node {} is not reachable: {}", node.id, e); + let mut states = states.lock(); + Self::insert_state(&mut states, node.id, NodeInfo::dead()); + } + } + }); } self.last_tick = Instant::now(); } @@ -121,7 +145,9 @@ impl Module for HeartbeatModule { impl HeartbeatModule { fn insert_state(states: &mut HashMap>, id: String, state: NodeInfo) { - lazy_static! {static ref MAX_RECORDS: usize = get_settings().modules.heartbeat.max_record_history;} + lazy_static! { + static ref MAX_RECORDS: usize = get_settings().modules.heartbeat.max_record_history; + } if let Some(states) = states.get_mut(&id) { if states.len() > *MAX_RECORDS { states.remove(0); @@ -131,4 +157,4 @@ impl HeartbeatModule { states.insert(id, vec![state]); } } -} \ No newline at end of file +} diff --git a/src/modules/mod.rs b/src/modules/mod.rs index e7d728b..a41d1a2 100644 --- a/src/modules/mod.rs +++ b/src/modules/mod.rs @@ -2,6 +2,7 @@ use vented::server::VentedServer; use crate::utils::result::SnekcloudResult; use scheduled_thread_pool::ScheduledThreadPool; use vented::result::VentedResult; +use crate::server::tick_context::TickContext; pub mod heartbeat; pub mod nodes_refresh; @@ -9,6 +10,6 @@ pub mod nodes_refresh; pub trait Module { fn name(&self) -> String; fn init(&mut self, server: &mut VentedServer, pool: &mut ScheduledThreadPool) -> SnekcloudResult<()>; - fn boxed(self) -> Box; - fn tick(&mut self, server: &mut VentedServer, pool: &mut ScheduledThreadPool) -> VentedResult<()>; + fn boxed(self) -> Box; + fn tick(&mut self, context: TickContext, pool: &mut ScheduledThreadPool) -> VentedResult<()>; } \ No newline at end of file diff --git a/src/modules/nodes_refresh/mod.rs b/src/modules/nodes_refresh/mod.rs index 59baeb5..f023937 100644 --- a/src/modules/nodes_refresh/mod.rs +++ b/src/modules/nodes_refresh/mod.rs @@ -3,7 +3,7 @@ use vented::result::VentedResult; use vented::server::VentedServer; use crate::utils::result::SnekcloudResult; use scheduled_thread_pool::ScheduledThreadPool; -use vented::server::server_events::{ NodeListPayload}; +use vented::server::server_events::{NodeListPayload, NODE_LIST_REQUEST_EVENT}; use vented::server::data::Node; use std::sync::Arc; use parking_lot::Mutex; @@ -15,6 +15,8 @@ use crate::utils::settings::get_settings; use crate::data::node_data::NodeData; use std::path::PathBuf; use crate::modules::nodes_refresh::settings::NodesRefreshSettings; +use crate::server::tick_context::TickContext; +use vented::event::Event; pub mod settings; @@ -92,15 +94,15 @@ impl Module for NodesRefreshModule { Ok(()) } - fn boxed(self) -> Box { + fn boxed(self) -> Box { Box::new(self) } - fn tick(&mut self, server: &mut VentedServer, _: &mut ScheduledThreadPool) -> VentedResult<()> { + fn tick(&mut self, mut context: TickContext, _: &mut ScheduledThreadPool) -> VentedResult<()> { if self.last_request.elapsed() > self.settings.update_interval() { - if let Err(e) = server.request_node_list() { - log::debug!("Failed to refresh node list: {}", e); - } + context.nodes().iter().filter(|node| node.trusted).for_each(|node| { + context.emit(node.id.clone(), Event::new(NODE_LIST_REQUEST_EVENT)); + }); self.last_request = Instant::now(); } diff --git a/src/server/mod.rs b/src/server/mod.rs index 30cb5a3..a2b55e6 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -1,13 +1,19 @@ -use vented::server::VentedServer; -use crate::utils::result::SnekcloudResult; -use vented::crypto::SecretKey; -use vented::server::data::Node; -use vented::WaitGroup; use crate::modules::Module; +use crate::server::tick_context::TickContext; +use crate::utils::result::{SnekcloudError, SnekcloudResult}; +use parking_lot::Mutex; use scheduled_thread_pool::ScheduledThreadPool; use std::collections::HashMap; -use std::thread; -use std::time::{Duration, Instant}; +use std::mem; +use std::sync::mpsc::channel; +use std::sync::Arc; +use std::time::Duration; +use vented::crypto::SecretKey; +use vented::server::data::Node; +use vented::server::VentedServer; +use vented::WaitGroup; + +pub mod tick_context; const SERVER_TICK_RATE_MS: u64 = 10; @@ -15,8 +21,8 @@ pub struct SnekcloudServer { inner: VentedServer, listen_addresses: Vec, listeners: Vec, - module_pool: ScheduledThreadPool, - modules: HashMap> + module_pool: Arc>, + modules: HashMap>, } impl SnekcloudServer { @@ -26,8 +32,11 @@ impl SnekcloudServer { inner: VentedServer::new(id, private_key, keys, num_threads), listen_addresses: Vec::new(), listeners: Vec::new(), - module_pool: ScheduledThreadPool::with_name("modules", num_threads), - modules: HashMap::new() + module_pool: Arc::new(Mutex::new(ScheduledThreadPool::with_name( + "modules", + num_threads, + ))), + modules: HashMap::new(), } } @@ -41,28 +50,53 @@ impl SnekcloudServer { for address in &self.listen_addresses { self.listeners.push(self.inner.listen(address.clone())) } - let sleep_duration = Duration::from_millis(SERVER_TICK_RATE_MS); - loop { - let start = Instant::now(); - for (name, module) in &mut self.modules { - if let Err(e) = module.tick(&mut self.inner, &mut self.module_pool) { - log::error!("Error when ticking module {}: {}", name, e); - } - } - let elapsed = start.elapsed(); - if elapsed < sleep_duration { - thread::sleep( sleep_duration - elapsed); - } else { - log::warn!("Can't keep up. Last tick took {} ms", elapsed.as_millis()) + + let modules = mem::take(&mut self.modules); + let module_pool = Arc::clone(&self.module_pool); + let (tx, rx) = channel(); + let tick_context = TickContext::new(self.inner.node_id(), tx, self.inner.nodes_ref()); + + for (name, mut module) in modules { + module_pool.lock().execute_at_fixed_rate( + Duration::from_millis(SERVER_TICK_RATE_MS), + Duration::from_millis(SERVER_TICK_RATE_MS), + { + let module_pool = Arc::clone(&module_pool); + let tick_context = TickContext::clone(&tick_context); + move || { + let mut module_pool = module_pool.lock(); + + if let Err(e) = + module.tick(TickContext::clone(&tick_context), &mut module_pool) + { + log::error!("Error when ticking module {}: {}", name, e); + } + } + }, + ); + } + for mut invocation in rx { + match self.inner.emit(invocation.target_node, invocation.event) { + Ok(_) => invocation.result.set_value(Arc::new(Ok(()))), + Err(e) => invocation + .result + .set_value(Arc::new(Err(SnekcloudError::from(e)))), } } + + Ok(()) } /// Registers a module on the server - pub fn register_module(&mut self, mut module: impl Module) -> SnekcloudResult<()> { - module.init(&mut self.inner, &mut self.module_pool)?; + pub fn register_module( + &mut self, + mut module: impl Module + Send + Sync, + ) -> SnekcloudResult<()> { + let mut module_pool = self.module_pool.lock(); + + module.init(&mut self.inner, &mut module_pool)?; self.modules.insert(module.name(), module.boxed()); Ok(()) } -} \ No newline at end of file +} diff --git a/src/server/tick_context.rs b/src/server/tick_context.rs new file mode 100644 index 0000000..af4db83 --- /dev/null +++ b/src/server/tick_context.rs @@ -0,0 +1,51 @@ +use std::sync::mpsc::Sender; +use vented::event::Event; +use std::sync::Arc; +use crate::utils::result::SnekcloudResult; +use vented::server::data::{AsyncValue, Node}; +use parking_lot::Mutex; +use std::collections::HashMap; + +#[derive(Clone)] +pub struct TickContext { + nodes: Arc>>, + event_sender: Sender, + node_id: String, +} + +pub struct EventInvocation { + pub result: AsyncValue>>, + pub event: Event, + pub target_node: String, +} + +impl TickContext { + pub fn new(node_id: String, sender: Sender, nodes: Arc>>) -> Self { + Self { + nodes, + node_id, + event_sender: sender, + } + } + + pub fn emit(&mut self, target_node: S, event: Event) -> AsyncValue>> { + let value = AsyncValue::new(); + self.event_sender.send(EventInvocation { + event, + target_node: target_node.to_string(), + result: AsyncValue::clone(&value), + }).unwrap(); + + value + } + + /// Returns a copy of the nodes of the server + pub fn nodes(&self) -> Vec { + self.nodes.lock().values().cloned().collect() + } + + /// Returns the node + pub fn node_id(&self) -> &String { + &self.node_id + } +} \ No newline at end of file diff --git a/src/utils/keys.rs b/src/utils/keys.rs index 5863891..e58b39b 100644 --- a/src/utils/keys.rs +++ b/src/utils/keys.rs @@ -1,10 +1,10 @@ -use std::path::{PathBuf, Path}; -use vented::crypto::{SecretKey, PublicKey}; -use crate::utils::result::{SnekcloudResult, SnekcloudError}; -use vented::server::data::Node; use crate::data::node_data::NodeData; -use std::fs::create_dir; +use crate::utils::result::{SnekcloudError, SnekcloudResult}; use crate::utils::settings::get_settings; +use std::fs::create_dir; +use std::path::{Path, PathBuf}; +use vented::crypto::{PublicKey, SecretKey}; +use vented::server::data::Node; const PRIVATE_KEY_HEADER_LINE: &str = "---BEGIN-SNEKCLOUD-PRIVATE-KEY---\n"; const PRIVATE_KEY_FOOTER_LINE: &str = "\n---END-SNEKCLOUD-PRIVATE-KEY---"; @@ -17,21 +17,20 @@ pub fn read_node_keys(path: &PathBuf) -> SnekcloudResult> { if !Path::new(path).exists() { create_dir(path)?; } - let dir_content = path.read_dir()?; let trusted_nodes = get_settings().trusted_nodes; - let content = dir_content - .filter_map(|entry| { - let entry = entry.ok()?; + let content = glob::glob(format!("{}/*.toml", path.to_string_lossy()).as_str())? + .filter_map(|path| { + let mut data = NodeData::from_file(path.ok()?).ok()?; - Some((entry.metadata().ok()?, entry)) + Some(Node { + public_key: data.public_key(), + address: data.addresses.pop(), + trusted: trusted_nodes.contains(&data.id), + id: data.id, + }) }) - .filter(|(meta, _)|meta.is_file()) - .filter_map(|(_, entry)|{ - let mut data = NodeData::from_file(entry.path()).ok()?; - - Some(Node {public_key: data.public_key(), address: data.addresses.pop(), trusted: trusted_nodes.contains(&data.id), id: data.id}) - }).collect(); + .collect(); Ok(content) } @@ -52,8 +51,12 @@ pub fn extract_public_key(content: &str) -> SnekcloudResult { /// Extracts a base64 encoded key between the prefix and suffix fn extract_key(content: &str, prefix: &str, suffix: &str) -> SnekcloudResult<[u8; 32]> { - let mut content = content.strip_prefix(prefix).ok_or(SnekcloudError::InvalidKey)?; - content = content.strip_suffix(suffix).ok_or(SnekcloudError::InvalidKey)?; + let mut content = content + .strip_prefix(prefix) + .ok_or(SnekcloudError::InvalidKey)?; + content = content + .strip_suffix(suffix) + .ok_or(SnekcloudError::InvalidKey)?; let key = base64::decode(content)?; if key.len() != 32 { @@ -67,12 +70,20 @@ fn extract_key(content: &str, prefix: &str, suffix: &str) -> SnekcloudResult<[u8 /// Encodes and encases the public key for text representation pub fn armor_public_key(key: PublicKey) -> String { - armor_key(key.to_bytes(), PUBLIC_KEY_HEADER_LINE, PUBLIC_KEY_FOOTER_LINE) + armor_key( + key.to_bytes(), + PUBLIC_KEY_HEADER_LINE, + PUBLIC_KEY_FOOTER_LINE, + ) } /// Encodes and encases the secret key for text representation pub fn armor_private_key(key: SecretKey) -> String { - armor_key(key.to_bytes(), PRIVATE_KEY_HEADER_LINE, PRIVATE_KEY_FOOTER_LINE) + armor_key( + key.to_bytes(), + PRIVATE_KEY_HEADER_LINE, + PRIVATE_KEY_FOOTER_LINE, + ) } /// Returns an armored key @@ -83,4 +94,4 @@ fn armor_key(key: [u8; 32], prefix: &str, suffix: &str) -> String { /// Generates a new private key pub fn generate_private_key() -> SecretKey { SecretKey::generate(&mut rand::thread_rng()) -} \ No newline at end of file +} diff --git a/src/utils/settings.rs b/src/utils/settings.rs index 47ea22e..a137ddf 100644 --- a/src/utils/settings.rs +++ b/src/utils/settings.rs @@ -1,17 +1,16 @@ -use crate::utils::result::{SnekcloudResult, SnekcloudError}; -use serde::{Serialize, Deserialize}; +use crate::modules::heartbeat::settings::HeartbeatSettings; +use crate::modules::nodes_refresh::settings::NodesRefreshSettings; +use crate::utils::result::{SnekcloudError, SnekcloudResult}; use crate::utils::{get_node_id, write_toml_pretty}; +use config::File; +use serde::{Deserialize, Serialize}; use std::fs; use std::path::{Path, PathBuf}; -use config::File; -use crate::modules::heartbeat::settings::HeartbeatSettings; -use crate::modules::nodes_refresh::settings::NodesRefreshSettings; - const CONFIG_DIR: &str = "config/"; const DEFAULT_CONFIG: &str = "config/00_default.toml"; const GLOB_CONFIG: &str = "config/*.toml"; -const ENV_PREFIX : &str = "SNEKCLOUD"; +const ENV_PREFIX: &str = "SNEKCLOUD"; #[derive(Serialize, Deserialize, Clone, Debug)] pub struct Settings { @@ -19,6 +18,7 @@ pub struct Settings { pub node_id: String, pub private_key: PathBuf, pub node_data_dir: PathBuf, + pub num_threads: usize, /// List of trusted nodes pub trusted_nodes: Vec, // modules need to be last because it's a table @@ -39,6 +39,7 @@ impl Default for Settings { private_key: PathBuf::from("node_key"), node_data_dir: PathBuf::from("nodes"), trusted_nodes: vec![], + num_threads: num_cpus::get(), modules: ModuleSettings::default(), } } @@ -48,15 +49,16 @@ impl Default for ModuleSettings { fn default() -> Self { Self { heartbeat: HeartbeatSettings::default(), - nodes_refresh: NodesRefreshSettings::default() + nodes_refresh: NodesRefreshSettings::default(), } } } /// Returns the settings that are lazily retrieved at runtime pub fn get_settings() -> Settings { - - lazy_static! { static ref SETTINGS: Settings = load_settings().expect("Failed to get settings"); } + lazy_static! { + static ref SETTINGS: Settings = load_settings().expect("Failed to get settings"); + } SETTINGS.clone() } @@ -70,10 +72,12 @@ fn load_settings() -> SnekcloudResult { let mut settings = config::Config::default(); settings .merge(config::File::with_name(DEFAULT_CONFIG))? - .merge(glob::glob(GLOB_CONFIG)?.map(|path| File::from(path.unwrap())) - .collect::>())? + .merge( + glob::glob(GLOB_CONFIG)? + .map(|path| File::from(path.unwrap())) + .collect::>(), + )? .merge(config::Environment::with_prefix(ENV_PREFIX))?; - settings.try_into().map_err(SnekcloudError::from) -} \ No newline at end of file +}