diff --git a/Cargo.lock b/Cargo.lock index a8f945c..d741236 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -851,13 +851,16 @@ dependencies = [ "env_logger", "glob", "hostname", + "lazy_static", "log", "mac_address", "num_cpus", + "parking_lot", "rand", "rusqlite", "scheduled-thread-pool", "serde 1.0.117", + "serde_json", "structopt", "toml", "vented", @@ -1023,9 +1026,9 @@ checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191" [[package]] name = "vented" -version = "0.4.3" +version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "576943d091e907e201d53816e80cd1bdca2320bb9a33754dda66e6cc730f0d27" +checksum = "03ea719ca6db23a76ee5d0f8ebbcd615d9d4954af88b7a88977b9d5a7d469a85" dependencies = [ "byteorder", "crossbeam-utils", diff --git a/Cargo.toml b/Cargo.toml index a508fac..f2560f8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,7 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -vented = "0.4.3" +vented = "0.6.3" rusqlite = "0.24.1" rand = "0.7.3" base64 = "0.13.0" @@ -23,4 +23,7 @@ serde = { version = "1.0.117", features = ["serde_derive"] } config = "0.10.1" glob = "0.3.0" scheduled-thread-pool = "0.2.5" -num_cpus = "1.13.0" \ No newline at end of file +num_cpus = "1.13.0" +lazy_static = "1.4.0" +parking_lot = "0.11.0" +serde_json = "1.0.59" \ No newline at end of file diff --git a/src/data/node_data.rs b/src/data/node_data.rs index 3cefabd..b9b6d06 100644 --- a/src/data/node_data.rs +++ b/src/data/node_data.rs @@ -14,6 +14,15 @@ pub struct NodeData { } impl NodeData { + pub fn new(id: String, public_key: PublicKey) -> Self { + let public_key = armor_public_key(public_key); + Self { + id, + addresses: Vec::with_capacity(0), + public_key + } + } + pub fn with_addresses(id: String, addresses: Vec, public_key: PublicKey) -> Self { let public_key = armor_public_key(public_key); Self { diff --git a/src/main.rs b/src/main.rs index b347fe0..fc8e237 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,6 +9,10 @@ use structopt::StructOpt; use vented::crypto::SecretKey; use crate::data::node_data::NodeData; use crate::modules::heartbeat::HeartbeatModule; +use crate::modules::nodes_refresh::NodesRefreshModule; + +#[macro_use] +extern crate lazy_static; pub(crate) mod utils; pub(crate) mod server; @@ -48,7 +52,7 @@ struct WriteInfoFileOptions { fn main() -> SnekcloudResult<()>{ init_logger(); let opt: Opt = Opt::from_args(); - let settings = get_settings()?; + let settings = get_settings(); if let Some(command) = opt.sub_command { match command { @@ -78,6 +82,7 @@ fn start_server(_options: Opt, settings: &Settings) -> SnekcloudResult<()> { server.add_listen_address(address.clone()); } server.register_module(HeartbeatModule::new())?; + server.register_module(NodesRefreshModule::new())?; server.run()?; Ok(()) diff --git a/src/modules/heartbeat/mod.rs b/src/modules/heartbeat/mod.rs index cf27724..d60611f 100644 --- a/src/modules/heartbeat/mod.rs +++ b/src/modules/heartbeat/mod.rs @@ -4,21 +4,59 @@ use crate::utils::result::SnekcloudResult; use scheduled_thread_pool::ScheduledThreadPool; use vented::result::{VentedResult}; use crate::modules::heartbeat::payloads::HeartbeatPayload; -use std::time::{Instant, Duration}; +use std::time::{Instant}; use vented::event::Event; +use crate::utils::settings::get_settings; +use crate::modules::heartbeat::settings::HeartbeatSettings; +use std::sync::Arc; +use parking_lot::Mutex; +use std::collections::HashMap; +use serde::{Serialize, Deserialize}; +use crate::utils::{write_json_pretty}; +pub mod settings; mod payloads; const HEARTBEAT_BEAT_EVENT: &str = "heartbeat:beat"; -const HEARTBEAT_RATE_SECONDS: u64 = 10; + +#[derive(Serialize, Deserialize, Clone, Debug)] +enum NodeState { + Alive, + Dead +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +struct NodeInfo { + ping: Option, + state: NodeState, +} + +impl NodeInfo { + fn alive(ping: u64) -> Self { + Self { + ping: Some(ping), + state: NodeState::Alive, + } + } + fn dead() -> Self { + Self { + ping: None, + state: NodeState::Dead, + } + } +} pub struct HeartbeatModule { last_tick: Instant, + settings: HeartbeatSettings, + node_states: Arc>>>, } impl HeartbeatModule { pub fn new() -> Self { Self { - last_tick: Instant::now() + last_tick: Instant::now(), + settings: get_settings().modules.heartbeat, + node_states: Arc::new(Mutex::new(HashMap::new())) } } } @@ -28,13 +66,35 @@ impl Module for HeartbeatModule { "HeartbeatModule".to_string() } - fn init(&mut self, server: &mut VentedServer, _: &mut ScheduledThreadPool) -> SnekcloudResult<()> { - server.on(HEARTBEAT_BEAT_EVENT, |event| { - let payload = event.get_payload::().unwrap(); - log::debug!("Latency to node {} is {} ms", payload.node_id, payload.get_beat_time().elapsed().unwrap().as_millis()); + fn init(&mut self, server: &mut VentedServer, pool: &mut ScheduledThreadPool) -> SnekcloudResult<()> { + server.on(HEARTBEAT_BEAT_EVENT, { + let node_states = Arc::clone(&self.node_states); + + move |event| { + let payload = event.get_payload::().unwrap(); + let latency = payload.get_beat_time().elapsed().ok()?.as_millis(); + log::debug!("Latency to node {} is {} ms", payload.node_id, latency); - None + let mut states = node_states.lock(); + Self::insert_state(&mut states, payload.node_id, NodeInfo::alive(latency as u64)); + + + None + } }); + if let Some(output) = &self.settings.output_file { + pool.execute_at_fixed_rate(self.settings.interval(), self.settings.interval(), { + let path = output.clone(); + let states = Arc::clone(&self.node_states); + move || { + let states = states.lock(); + + if let Err(e) = write_json_pretty(&path, &*states) { + log::error!("Failed to write output states to file: {}", e) + } + } + }); + } Ok(()) } @@ -44,10 +104,12 @@ impl Module for HeartbeatModule { } fn tick(&mut self, server: &mut VentedServer, _: &mut ScheduledThreadPool) -> VentedResult<()> { - if self.last_tick.elapsed() > Duration::from_secs(HEARTBEAT_RATE_SECONDS) { + if self.last_tick.elapsed() > self.settings.interval() { for node in server.nodes() { if let Err(e) = server.emit(node.id.clone(), Event::with_payload(HEARTBEAT_BEAT_EVENT, &HeartbeatPayload::now(server.node_id()))) { - log::warn!("Node {} is not reachable: {}", node.id, e) + log::debug!("Node {} is not reachable: {}", node.id, e); + let mut states = self.node_states.lock(); + Self::insert_state(&mut states, node.id, NodeInfo::dead()); } } self.last_tick = Instant::now(); @@ -55,4 +117,18 @@ impl Module for HeartbeatModule { Ok(()) } +} + +impl HeartbeatModule { + fn insert_state(states: &mut HashMap>, id: String, state: NodeInfo) { + lazy_static! {static ref MAX_RECORDS: usize = get_settings().modules.heartbeat.max_record_history;} + if let Some(states) = states.get_mut(&id) { + if states.len() > *MAX_RECORDS { + states.remove(0); + } + states.push(state); + } else { + states.insert(id, vec![state]); + } + } } \ No newline at end of file diff --git a/src/modules/heartbeat/settings.rs b/src/modules/heartbeat/settings.rs new file mode 100644 index 0000000..1459002 --- /dev/null +++ b/src/modules/heartbeat/settings.rs @@ -0,0 +1,26 @@ +use std::path::PathBuf; +use serde::{Serialize, Deserialize}; +use std::time::Duration; + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct HeartbeatSettings { + pub output_file: Option, + pub interval_ms: u64, + pub max_record_history: usize, +} + +impl Default for HeartbeatSettings { + fn default() -> Self { + Self { + output_file: None, + interval_ms: 10000, + max_record_history: 10, + } + } +} + +impl HeartbeatSettings { + pub fn interval(&self) -> Duration { + Duration::from_millis(self.interval_ms as u64) + } +} \ No newline at end of file diff --git a/src/modules/mod.rs b/src/modules/mod.rs index 4d67c12..e7d728b 100644 --- a/src/modules/mod.rs +++ b/src/modules/mod.rs @@ -4,6 +4,7 @@ use scheduled_thread_pool::ScheduledThreadPool; use vented::result::VentedResult; pub mod heartbeat; +pub mod nodes_refresh; pub trait Module { fn name(&self) -> String; diff --git a/src/modules/nodes_refresh/mod.rs b/src/modules/nodes_refresh/mod.rs new file mode 100644 index 0000000..59baeb5 --- /dev/null +++ b/src/modules/nodes_refresh/mod.rs @@ -0,0 +1,121 @@ +use crate::modules::Module; +use vented::result::VentedResult; +use vented::server::VentedServer; +use crate::utils::result::SnekcloudResult; +use scheduled_thread_pool::ScheduledThreadPool; +use vented::server::server_events::{ NodeListPayload}; +use vented::server::data::Node; +use std::sync::Arc; +use parking_lot::Mutex; +use std::collections::{HashMap}; +use std::sync::atomic::{AtomicBool, Ordering}; +use vented::crypto::PublicKey; +use std::time::{Instant, Duration, UNIX_EPOCH}; +use crate::utils::settings::get_settings; +use crate::data::node_data::NodeData; +use std::path::PathBuf; +use crate::modules::nodes_refresh::settings::NodesRefreshSettings; + +pub mod settings; + +pub struct NodesRefreshModule { + nodes: Arc>>, + update_required: Arc, + last_request: Instant, + settings: NodesRefreshSettings, +} + +impl Module for NodesRefreshModule { + + fn name(&self) -> String { + "node_list_refresh".to_string() + } + + fn init(&mut self, server: &mut VentedServer, pool: &mut ScheduledThreadPool) -> SnekcloudResult<()> { + { + let mut node_list = self.nodes.lock(); + for node in server.nodes() { + node_list.insert(node.id.clone(), node); + } + } + server.on("conn:node_list", { + let nodes = Arc::clone(&self.nodes); + let update_required = Arc::clone(&self.update_required); + + move |event| { + let mut nodes = nodes.lock(); + let mut new_nodes = false; + + for node in event.get_payload::().ok()?.nodes { + if !nodes.contains_key(&node.id) { + nodes.insert(node.id.clone(), Node { + id: node.id, + trusted: false, + public_key: PublicKey::from(node.public_key), + address: node.address, + }); + new_nodes = true; + } + } + + if new_nodes { + update_required.store(true, Ordering::Relaxed) + } + None + } + }); + pool.execute_at_fixed_rate(Duration::from_secs(10), self.settings.update_interval(), { + let nodes = Arc::clone(&self.nodes); + let update_required = Arc::clone(&self.update_required); + + move || { + if update_required.load(Ordering::Relaxed) { + let nodes_folder = get_settings().node_data_dir; + nodes.lock().values().cloned().map(|node| { + if let Some(address) = node.address { + NodeData::with_addresses(node.id, vec![address], node.public_key) + } else { + NodeData::new(node.id, node.public_key) + } + }).for_each(|data| { + let mut path = nodes_folder.clone(); + path.push(PathBuf::from(format!("{}.toml", data.id))); + + if let Err(e) = data.write_to_file(path) { + log::error!("Failed to write updated node data: {}", e); + } + }); + } + } + }); + + Ok(()) + } + + fn boxed(self) -> Box { + Box::new(self) + } + + fn tick(&mut self, server: &mut VentedServer, _: &mut ScheduledThreadPool) -> VentedResult<()> { + if self.last_request.elapsed() > self.settings.update_interval() { + if let Err(e) = server.request_node_list() { + log::debug!("Failed to refresh node list: {}", e); + } + self.last_request = Instant::now(); + } + + Ok(()) + } +} + +impl NodesRefreshModule { + pub fn new() -> Self { + let null_time = Instant::now() - UNIX_EPOCH.elapsed().unwrap(); + Self { + nodes: Arc::new(Mutex::new(HashMap::new())), + settings: get_settings().modules.nodes_refresh, + last_request: null_time, + update_required: Arc::new(AtomicBool::new(false)) + } + } +} \ No newline at end of file diff --git a/src/modules/nodes_refresh/settings.rs b/src/modules/nodes_refresh/settings.rs new file mode 100644 index 0000000..493f41c --- /dev/null +++ b/src/modules/nodes_refresh/settings.rs @@ -0,0 +1,21 @@ +use serde::{Serialize, Deserialize}; +use std::time::Duration; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct NodesRefreshSettings { + pub update_interval_ms: u64, +} + +impl Default for NodesRefreshSettings { + fn default() -> Self { + Self { + update_interval_ms: 3600000 + } + } +} + +impl NodesRefreshSettings { + pub fn update_interval(&self) -> Duration { + Duration::from_millis(self.update_interval_ms) + } +} \ No newline at end of file diff --git a/src/utils/keys.rs b/src/utils/keys.rs index 54addf8..5863891 100644 --- a/src/utils/keys.rs +++ b/src/utils/keys.rs @@ -4,6 +4,7 @@ use crate::utils::result::{SnekcloudResult, SnekcloudError}; use vented::server::data::Node; use crate::data::node_data::NodeData; use std::fs::create_dir; +use crate::utils::settings::get_settings; const PRIVATE_KEY_HEADER_LINE: &str = "---BEGIN-SNEKCLOUD-PRIVATE-KEY---\n"; const PRIVATE_KEY_FOOTER_LINE: &str = "\n---END-SNEKCLOUD-PRIVATE-KEY---"; @@ -17,6 +18,7 @@ pub fn read_node_keys(path: &PathBuf) -> SnekcloudResult> { create_dir(path)?; } let dir_content = path.read_dir()?; + let trusted_nodes = get_settings().trusted_nodes; let content = dir_content .filter_map(|entry| { @@ -28,7 +30,7 @@ pub fn read_node_keys(path: &PathBuf) -> SnekcloudResult> { .filter_map(|(_, entry)|{ let mut data = NodeData::from_file(entry.path()).ok()?; - Some(Node {public_key: data.public_key(), address: data.addresses.pop(), id: data.id}) + Some(Node {public_key: data.public_key(), address: data.addresses.pop(), trusted: trusted_nodes.contains(&data.id), id: data.id}) }).collect(); Ok(content) diff --git a/src/utils/mod.rs b/src/utils/mod.rs index f81740e..ab985d7 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -35,5 +35,12 @@ pub fn write_toml_pretty(path: &PathBuf, value: &T) -> SnekcloudRe value.serialize(&mut serializer)?; fs::write(path, buf_str.as_bytes())?; + Ok(()) +} + +pub fn write_json_pretty(path: &PathBuf, value: &T) -> SnekcloudResult<()> { + let string_value = serde_json::to_string_pretty(value)?; + fs::write(path, string_value.as_bytes())?; + Ok(()) } \ No newline at end of file diff --git a/src/utils/result.rs b/src/utils/result.rs index e9cd8ee..435d35b 100644 --- a/src/utils/result.rs +++ b/src/utils/result.rs @@ -13,6 +13,7 @@ pub enum SnekcloudError { Base64DecodeError(base64::DecodeError), TomlDeserializeError(toml::de::Error), TomlSerializeError(toml::ser::Error), + JsonError(serde_json::error::Error), InvalidKey, ConfigError(config::ConfigError), GlobPatternError(glob::PatternError), @@ -29,6 +30,7 @@ impl fmt::Display for SnekcloudError { Self::TomlSerializeError(e) => write!(f, "Toml Serialization Error: {}", e), Self::ConfigError(e) => write!(f, "Config Error: {}",e), Self::GlobPatternError(e) => write!(f, "Glob Error {}", e), + Self::JsonError(e) => write!(f, "JSON Error: {}", e), } } } @@ -75,4 +77,10 @@ impl From for SnekcloudError { fn from(error: glob::PatternError) -> Self { Self::GlobPatternError(error) } +} + +impl From for SnekcloudError { + fn from(error: serde_json::error::Error) -> Self { + Self::JsonError(error) + } } \ No newline at end of file diff --git a/src/utils/settings.rs b/src/utils/settings.rs index b89464e..47ea22e 100644 --- a/src/utils/settings.rs +++ b/src/utils/settings.rs @@ -4,6 +4,8 @@ use crate::utils::{get_node_id, write_toml_pretty}; use std::fs; use std::path::{Path, PathBuf}; use config::File; +use crate::modules::heartbeat::settings::HeartbeatSettings; +use crate::modules::nodes_refresh::settings::NodesRefreshSettings; const CONFIG_DIR: &str = "config/"; @@ -17,6 +19,16 @@ pub struct Settings { pub node_id: String, pub private_key: PathBuf, pub node_data_dir: PathBuf, + /// List of trusted nodes + pub trusted_nodes: Vec, + // modules need to be last because it's a table + pub modules: ModuleSettings, +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct ModuleSettings { + pub heartbeat: HeartbeatSettings, + pub nodes_refresh: NodesRefreshSettings, } impl Default for Settings { @@ -26,11 +38,30 @@ impl Default for Settings { node_id: get_node_id(), private_key: PathBuf::from("node_key"), node_data_dir: PathBuf::from("nodes"), + trusted_nodes: vec![], + modules: ModuleSettings::default(), + } + } +} + +impl Default for ModuleSettings { + fn default() -> Self { + Self { + heartbeat: HeartbeatSettings::default(), + nodes_refresh: NodesRefreshSettings::default() } } } -pub fn get_settings() -> SnekcloudResult { +/// Returns the settings that are lazily retrieved at runtime +pub fn get_settings() -> Settings { + + lazy_static! { static ref SETTINGS: Settings = load_settings().expect("Failed to get settings"); } + + SETTINGS.clone() +} + +fn load_settings() -> SnekcloudResult { if !Path::new(CONFIG_DIR).exists() { fs::create_dir(CONFIG_DIR)?; } @@ -45,4 +76,4 @@ pub fn get_settings() -> SnekcloudResult { settings.try_into().map_err(SnekcloudError::from) -} +} \ No newline at end of file