Add support for node list synchronization

Signed-off-by: trivernis <trivernis@protonmail.com>
pull/1/head
trivernis 4 years ago
parent 4635e24a43
commit 3630941962
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

7
Cargo.lock generated

@ -851,13 +851,16 @@ dependencies = [
"env_logger",
"glob",
"hostname",
"lazy_static",
"log",
"mac_address",
"num_cpus",
"parking_lot",
"rand",
"rusqlite",
"scheduled-thread-pool",
"serde 1.0.117",
"serde_json",
"structopt",
"toml",
"vented",
@ -1023,9 +1026,9 @@ checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191"
[[package]]
name = "vented"
version = "0.4.3"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "576943d091e907e201d53816e80cd1bdca2320bb9a33754dda66e6cc730f0d27"
checksum = "03ea719ca6db23a76ee5d0f8ebbcd615d9d4954af88b7a88977b9d5a7d469a85"
dependencies = [
"byteorder",
"crossbeam-utils",

@ -7,7 +7,7 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
vented = "0.4.3"
vented = "0.6.3"
rusqlite = "0.24.1"
rand = "0.7.3"
base64 = "0.13.0"
@ -24,3 +24,6 @@ config = "0.10.1"
glob = "0.3.0"
scheduled-thread-pool = "0.2.5"
num_cpus = "1.13.0"
lazy_static = "1.4.0"
parking_lot = "0.11.0"
serde_json = "1.0.59"

@ -14,6 +14,15 @@ pub struct NodeData {
}
impl NodeData {
pub fn new(id: String, public_key: PublicKey) -> Self {
let public_key = armor_public_key(public_key);
Self {
id,
addresses: Vec::with_capacity(0),
public_key
}
}
pub fn with_addresses(id: String, addresses: Vec<String>, public_key: PublicKey) -> Self {
let public_key = armor_public_key(public_key);
Self {

@ -9,6 +9,10 @@ use structopt::StructOpt;
use vented::crypto::SecretKey;
use crate::data::node_data::NodeData;
use crate::modules::heartbeat::HeartbeatModule;
use crate::modules::nodes_refresh::NodesRefreshModule;
#[macro_use]
extern crate lazy_static;
pub(crate) mod utils;
pub(crate) mod server;
@ -48,7 +52,7 @@ struct WriteInfoFileOptions {
fn main() -> SnekcloudResult<()>{
init_logger();
let opt: Opt = Opt::from_args();
let settings = get_settings()?;
let settings = get_settings();
if let Some(command) = opt.sub_command {
match command {
@ -78,6 +82,7 @@ fn start_server(_options: Opt, settings: &Settings) -> SnekcloudResult<()> {
server.add_listen_address(address.clone());
}
server.register_module(HeartbeatModule::new())?;
server.register_module(NodesRefreshModule::new())?;
server.run()?;
Ok(())

@ -4,21 +4,59 @@ use crate::utils::result::SnekcloudResult;
use scheduled_thread_pool::ScheduledThreadPool;
use vented::result::{VentedResult};
use crate::modules::heartbeat::payloads::HeartbeatPayload;
use std::time::{Instant, Duration};
use std::time::{Instant};
use vented::event::Event;
use crate::utils::settings::get_settings;
use crate::modules::heartbeat::settings::HeartbeatSettings;
use std::sync::Arc;
use parking_lot::Mutex;
use std::collections::HashMap;
use serde::{Serialize, Deserialize};
use crate::utils::{write_json_pretty};
pub mod settings;
mod payloads;
const HEARTBEAT_BEAT_EVENT: &str = "heartbeat:beat";
const HEARTBEAT_RATE_SECONDS: u64 = 10;
#[derive(Serialize, Deserialize, Clone, Debug)]
enum NodeState {
Alive,
Dead
}
#[derive(Serialize, Deserialize, Clone, Debug)]
struct NodeInfo {
ping: Option<u64>,
state: NodeState,
}
impl NodeInfo {
fn alive(ping: u64) -> Self {
Self {
ping: Some(ping),
state: NodeState::Alive,
}
}
fn dead() -> Self {
Self {
ping: None,
state: NodeState::Dead,
}
}
}
pub struct HeartbeatModule {
last_tick: Instant,
settings: HeartbeatSettings,
node_states: Arc<Mutex<HashMap<String, Vec<NodeInfo>>>>,
}
impl HeartbeatModule {
pub fn new() -> Self {
Self {
last_tick: Instant::now()
last_tick: Instant::now(),
settings: get_settings().modules.heartbeat,
node_states: Arc::new(Mutex::new(HashMap::new()))
}
}
}
@ -28,13 +66,35 @@ impl Module for HeartbeatModule {
"HeartbeatModule".to_string()
}
fn init(&mut self, server: &mut VentedServer, _: &mut ScheduledThreadPool) -> SnekcloudResult<()> {
server.on(HEARTBEAT_BEAT_EVENT, |event| {
let payload = event.get_payload::<HeartbeatPayload>().unwrap();
log::debug!("Latency to node {} is {} ms", payload.node_id, payload.get_beat_time().elapsed().unwrap().as_millis());
fn init(&mut self, server: &mut VentedServer, pool: &mut ScheduledThreadPool) -> SnekcloudResult<()> {
server.on(HEARTBEAT_BEAT_EVENT, {
let node_states = Arc::clone(&self.node_states);
move |event| {
let payload = event.get_payload::<HeartbeatPayload>().unwrap();
let latency = payload.get_beat_time().elapsed().ok()?.as_millis();
log::debug!("Latency to node {} is {} ms", payload.node_id, latency);
None
let mut states = node_states.lock();
Self::insert_state(&mut states, payload.node_id, NodeInfo::alive(latency as u64));
None
}
});
if let Some(output) = &self.settings.output_file {
pool.execute_at_fixed_rate(self.settings.interval(), self.settings.interval(), {
let path = output.clone();
let states = Arc::clone(&self.node_states);
move || {
let states = states.lock();
if let Err(e) = write_json_pretty(&path, &*states) {
log::error!("Failed to write output states to file: {}", e)
}
}
});
}
Ok(())
}
@ -44,10 +104,12 @@ impl Module for HeartbeatModule {
}
fn tick(&mut self, server: &mut VentedServer, _: &mut ScheduledThreadPool) -> VentedResult<()> {
if self.last_tick.elapsed() > Duration::from_secs(HEARTBEAT_RATE_SECONDS) {
if self.last_tick.elapsed() > self.settings.interval() {
for node in server.nodes() {
if let Err(e) = server.emit(node.id.clone(), Event::with_payload(HEARTBEAT_BEAT_EVENT, &HeartbeatPayload::now(server.node_id()))) {
log::warn!("Node {} is not reachable: {}", node.id, e)
log::debug!("Node {} is not reachable: {}", node.id, e);
let mut states = self.node_states.lock();
Self::insert_state(&mut states, node.id, NodeInfo::dead());
}
}
self.last_tick = Instant::now();
@ -56,3 +118,17 @@ impl Module for HeartbeatModule {
Ok(())
}
}
impl HeartbeatModule {
fn insert_state(states: &mut HashMap<String, Vec<NodeInfo>>, id: String, state: NodeInfo) {
lazy_static! {static ref MAX_RECORDS: usize = get_settings().modules.heartbeat.max_record_history;}
if let Some(states) = states.get_mut(&id) {
if states.len() > *MAX_RECORDS {
states.remove(0);
}
states.push(state);
} else {
states.insert(id, vec![state]);
}
}
}

@ -0,0 +1,26 @@
use std::path::PathBuf;
use serde::{Serialize, Deserialize};
use std::time::Duration;
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct HeartbeatSettings {
pub output_file: Option<PathBuf>,
pub interval_ms: u64,
pub max_record_history: usize,
}
impl Default for HeartbeatSettings {
fn default() -> Self {
Self {
output_file: None,
interval_ms: 10000,
max_record_history: 10,
}
}
}
impl HeartbeatSettings {
pub fn interval(&self) -> Duration {
Duration::from_millis(self.interval_ms as u64)
}
}

@ -4,6 +4,7 @@ use scheduled_thread_pool::ScheduledThreadPool;
use vented::result::VentedResult;
pub mod heartbeat;
pub mod nodes_refresh;
pub trait Module {
fn name(&self) -> String;

@ -0,0 +1,121 @@
use crate::modules::Module;
use vented::result::VentedResult;
use vented::server::VentedServer;
use crate::utils::result::SnekcloudResult;
use scheduled_thread_pool::ScheduledThreadPool;
use vented::server::server_events::{ NodeListPayload};
use vented::server::data::Node;
use std::sync::Arc;
use parking_lot::Mutex;
use std::collections::{HashMap};
use std::sync::atomic::{AtomicBool, Ordering};
use vented::crypto::PublicKey;
use std::time::{Instant, Duration, UNIX_EPOCH};
use crate::utils::settings::get_settings;
use crate::data::node_data::NodeData;
use std::path::PathBuf;
use crate::modules::nodes_refresh::settings::NodesRefreshSettings;
pub mod settings;
pub struct NodesRefreshModule {
nodes: Arc<Mutex<HashMap<String, Node>>>,
update_required: Arc<AtomicBool>,
last_request: Instant,
settings: NodesRefreshSettings,
}
impl Module for NodesRefreshModule {
fn name(&self) -> String {
"node_list_refresh".to_string()
}
fn init(&mut self, server: &mut VentedServer, pool: &mut ScheduledThreadPool) -> SnekcloudResult<()> {
{
let mut node_list = self.nodes.lock();
for node in server.nodes() {
node_list.insert(node.id.clone(), node);
}
}
server.on("conn:node_list", {
let nodes = Arc::clone(&self.nodes);
let update_required = Arc::clone(&self.update_required);
move |event| {
let mut nodes = nodes.lock();
let mut new_nodes = false;
for node in event.get_payload::<NodeListPayload>().ok()?.nodes {
if !nodes.contains_key(&node.id) {
nodes.insert(node.id.clone(), Node {
id: node.id,
trusted: false,
public_key: PublicKey::from(node.public_key),
address: node.address,
});
new_nodes = true;
}
}
if new_nodes {
update_required.store(true, Ordering::Relaxed)
}
None
}
});
pool.execute_at_fixed_rate(Duration::from_secs(10), self.settings.update_interval(), {
let nodes = Arc::clone(&self.nodes);
let update_required = Arc::clone(&self.update_required);
move || {
if update_required.load(Ordering::Relaxed) {
let nodes_folder = get_settings().node_data_dir;
nodes.lock().values().cloned().map(|node| {
if let Some(address) = node.address {
NodeData::with_addresses(node.id, vec![address], node.public_key)
} else {
NodeData::new(node.id, node.public_key)
}
}).for_each(|data| {
let mut path = nodes_folder.clone();
path.push(PathBuf::from(format!("{}.toml", data.id)));
if let Err(e) = data.write_to_file(path) {
log::error!("Failed to write updated node data: {}", e);
}
});
}
}
});
Ok(())
}
fn boxed(self) -> Box<dyn Module> {
Box::new(self)
}
fn tick(&mut self, server: &mut VentedServer, _: &mut ScheduledThreadPool) -> VentedResult<()> {
if self.last_request.elapsed() > self.settings.update_interval() {
if let Err(e) = server.request_node_list() {
log::debug!("Failed to refresh node list: {}", e);
}
self.last_request = Instant::now();
}
Ok(())
}
}
impl NodesRefreshModule {
pub fn new() -> Self {
let null_time = Instant::now() - UNIX_EPOCH.elapsed().unwrap();
Self {
nodes: Arc::new(Mutex::new(HashMap::new())),
settings: get_settings().modules.nodes_refresh,
last_request: null_time,
update_required: Arc::new(AtomicBool::new(false))
}
}
}

@ -0,0 +1,21 @@
use serde::{Serialize, Deserialize};
use std::time::Duration;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct NodesRefreshSettings {
pub update_interval_ms: u64,
}
impl Default for NodesRefreshSettings {
fn default() -> Self {
Self {
update_interval_ms: 3600000
}
}
}
impl NodesRefreshSettings {
pub fn update_interval(&self) -> Duration {
Duration::from_millis(self.update_interval_ms)
}
}

@ -4,6 +4,7 @@ use crate::utils::result::{SnekcloudResult, SnekcloudError};
use vented::server::data::Node;
use crate::data::node_data::NodeData;
use std::fs::create_dir;
use crate::utils::settings::get_settings;
const PRIVATE_KEY_HEADER_LINE: &str = "---BEGIN-SNEKCLOUD-PRIVATE-KEY---\n";
const PRIVATE_KEY_FOOTER_LINE: &str = "\n---END-SNEKCLOUD-PRIVATE-KEY---";
@ -17,6 +18,7 @@ pub fn read_node_keys(path: &PathBuf) -> SnekcloudResult<Vec<Node>> {
create_dir(path)?;
}
let dir_content = path.read_dir()?;
let trusted_nodes = get_settings().trusted_nodes;
let content = dir_content
.filter_map(|entry| {
@ -28,7 +30,7 @@ pub fn read_node_keys(path: &PathBuf) -> SnekcloudResult<Vec<Node>> {
.filter_map(|(_, entry)|{
let mut data = NodeData::from_file(entry.path()).ok()?;
Some(Node {public_key: data.public_key(), address: data.addresses.pop(), id: data.id})
Some(Node {public_key: data.public_key(), address: data.addresses.pop(), trusted: trusted_nodes.contains(&data.id), id: data.id})
}).collect();
Ok(content)

@ -37,3 +37,10 @@ pub fn write_toml_pretty<T: Serialize>(path: &PathBuf, value: &T) -> SnekcloudRe
Ok(())
}
pub fn write_json_pretty<T: Serialize>(path: &PathBuf, value: &T) -> SnekcloudResult<()> {
let string_value = serde_json::to_string_pretty(value)?;
fs::write(path, string_value.as_bytes())?;
Ok(())
}

@ -13,6 +13,7 @@ pub enum SnekcloudError {
Base64DecodeError(base64::DecodeError),
TomlDeserializeError(toml::de::Error),
TomlSerializeError(toml::ser::Error),
JsonError(serde_json::error::Error),
InvalidKey,
ConfigError(config::ConfigError),
GlobPatternError(glob::PatternError),
@ -29,6 +30,7 @@ impl fmt::Display for SnekcloudError {
Self::TomlSerializeError(e) => write!(f, "Toml Serialization Error: {}", e),
Self::ConfigError(e) => write!(f, "Config Error: {}",e),
Self::GlobPatternError(e) => write!(f, "Glob Error {}", e),
Self::JsonError(e) => write!(f, "JSON Error: {}", e),
}
}
}
@ -76,3 +78,9 @@ impl From<glob::PatternError> for SnekcloudError {
Self::GlobPatternError(error)
}
}
impl From<serde_json::error::Error> for SnekcloudError {
fn from(error: serde_json::error::Error) -> Self {
Self::JsonError(error)
}
}

@ -4,6 +4,8 @@ use crate::utils::{get_node_id, write_toml_pretty};
use std::fs;
use std::path::{Path, PathBuf};
use config::File;
use crate::modules::heartbeat::settings::HeartbeatSettings;
use crate::modules::nodes_refresh::settings::NodesRefreshSettings;
const CONFIG_DIR: &str = "config/";
@ -17,6 +19,16 @@ pub struct Settings {
pub node_id: String,
pub private_key: PathBuf,
pub node_data_dir: PathBuf,
/// List of trusted nodes
pub trusted_nodes: Vec<String>,
// modules need to be last because it's a table
pub modules: ModuleSettings,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct ModuleSettings {
pub heartbeat: HeartbeatSettings,
pub nodes_refresh: NodesRefreshSettings,
}
impl Default for Settings {
@ -26,11 +38,30 @@ impl Default for Settings {
node_id: get_node_id(),
private_key: PathBuf::from("node_key"),
node_data_dir: PathBuf::from("nodes"),
trusted_nodes: vec![],
modules: ModuleSettings::default(),
}
}
}
pub fn get_settings() -> SnekcloudResult<Settings> {
impl Default for ModuleSettings {
fn default() -> Self {
Self {
heartbeat: HeartbeatSettings::default(),
nodes_refresh: NodesRefreshSettings::default()
}
}
}
/// Returns the settings that are lazily retrieved at runtime
pub fn get_settings() -> Settings {
lazy_static! { static ref SETTINGS: Settings = load_settings().expect("Failed to get settings"); }
SETTINGS.clone()
}
fn load_settings() -> SnekcloudResult<Settings> {
if !Path::new(CONFIG_DIR).exists() {
fs::create_dir(CONFIG_DIR)?;
}

Loading…
Cancel
Save