Change module execution

Modules are now executed on multiple threads in a fixed interval.
Events and other data is shared via a thread safe tick context object.
Events are sent via a channel that is read in the main thread to send
the messages to the receiver nodes.

Signed-off-by: trivernis <trivernis@protonmail.com>
pull/1/head
trivernis 4 years ago
parent 3630941962
commit 4783aa829d
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

4
Cargo.lock generated

@ -1026,9 +1026,9 @@ checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191"
[[package]]
name = "vented"
version = "0.6.3"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03ea719ca6db23a76ee5d0f8ebbcd615d9d4954af88b7a88977b9d5a7d469a85"
checksum = "0bb03c973ba228728f10ab9480ca5b74d8dc3bd57d2da29476efa0d0332b4688"
dependencies = [
"byteorder",
"crossbeam-utils",

@ -7,7 +7,7 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
vented = "0.6.3"
vented = "0.8.1"
rusqlite = "0.24.1"
rand = "0.7.3"
base64 = "0.13.0"

@ -1,28 +1,30 @@
use crate::data::node_data::NodeData;
use crate::modules::heartbeat::HeartbeatModule;
use crate::modules::nodes_refresh::NodesRefreshModule;
use crate::server::SnekcloudServer;
use crate::utils::settings::{Settings, get_settings};
use crate::utils::keys::{extract_private_key, read_node_keys, generate_private_key, armor_private_key};
use std::path::PathBuf;
use crate::utils::result::SnekcloudResult;
use crate::utils::keys::{
armor_private_key, extract_private_key, generate_private_key, read_node_keys,
};
use crate::utils::logging::init_logger;
use crate::utils::result::SnekcloudResult;
use crate::utils::settings::{get_settings, Settings};
use std::fs;
use std::path::PathBuf;
use structopt::StructOpt;
use vented::crypto::SecretKey;
use crate::data::node_data::NodeData;
use crate::modules::heartbeat::HeartbeatModule;
use crate::modules::nodes_refresh::NodesRefreshModule;
#[macro_use]
extern crate lazy_static;
pub(crate) mod utils;
pub(crate) mod server;
pub(crate) mod data;
pub(crate) mod modules;
pub(crate) mod server;
pub(crate) mod utils;
#[derive(StructOpt, Debug)]
struct Opt {
#[structopt(subcommand)]
sub_command: Option<SubCommand>
sub_command: Option<SubCommand>,
}
#[derive(StructOpt, Debug)]
@ -31,25 +33,24 @@ enum SubCommand {
/// Generates a new private key
GenerateKey(GenerateKeyOptions),
WriteInfoFile(WriteInfoFileOptions)
WriteInfoFile(WriteInfoFileOptions),
}
#[derive(StructOpt, Debug)]
struct GenerateKeyOptions {
/// The file the key is stored to
#[structopt(parse(from_os_str))]
output_file: PathBuf
output_file: PathBuf,
}
#[derive(StructOpt, Debug)]
struct WriteInfoFileOptions {
/// The file the info is stored to
#[structopt(parse(from_os_str))]
output_file: PathBuf
output_file: PathBuf,
}
fn main() -> SnekcloudResult<()>{
fn main() -> SnekcloudResult<()> {
init_logger();
let opt: Opt = Opt::from_args();
let settings = get_settings();
@ -60,10 +61,14 @@ fn main() -> SnekcloudResult<()>{
let key = generate_private_key();
let string_content = armor_private_key(key);
fs::write(options.output_file, string_content)?;
},
}
SubCommand::WriteInfoFile(options) => {
let key = get_private_key(&settings)?;
let data = NodeData::with_addresses(settings.node_id, settings.listen_addresses, key.public_key());
let data = NodeData::with_addresses(
settings.node_id,
settings.listen_addresses,
key.public_key(),
);
data.write_to_file(options.output_file)?;
}
}
@ -76,7 +81,12 @@ fn main() -> SnekcloudResult<()>{
fn start_server(_options: Opt, settings: &Settings) -> SnekcloudResult<()> {
let keys = read_node_keys(&settings.node_data_dir)?;
let mut server = SnekcloudServer::new(settings.node_id.clone(), get_private_key(settings)?, keys, 8);
let mut server = SnekcloudServer::new(
settings.node_id.clone(),
get_private_key(settings)?,
keys,
settings.num_threads,
);
for address in &settings.listen_addresses {
server.add_listen_address(address.clone());
@ -90,4 +100,4 @@ fn start_server(_options: Opt, settings: &Settings) -> SnekcloudResult<()> {
fn get_private_key(settings: &Settings) -> SnekcloudResult<SecretKey> {
extract_private_key(&fs::read_to_string(&settings.private_key)?)
}
}

@ -1,27 +1,28 @@
use crate::modules::heartbeat::payloads::HeartbeatPayload;
use crate::modules::heartbeat::settings::HeartbeatSettings;
use crate::modules::Module;
use vented::server::VentedServer;
use crate::server::tick_context::TickContext;
use crate::utils::result::SnekcloudResult;
use scheduled_thread_pool::ScheduledThreadPool;
use vented::result::{VentedResult};
use crate::modules::heartbeat::payloads::HeartbeatPayload;
use std::time::{Instant};
use vented::event::Event;
use crate::utils::settings::get_settings;
use crate::modules::heartbeat::settings::HeartbeatSettings;
use std::sync::Arc;
use crate::utils::write_json_pretty;
use parking_lot::Mutex;
use scheduled_thread_pool::ScheduledThreadPool;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use serde::{Serialize, Deserialize};
use crate::utils::{write_json_pretty};
use std::sync::Arc;
use std::time::{Duration, Instant};
use vented::event::Event;
use vented::result::VentedResult;
use vented::server::VentedServer;
pub mod settings;
mod payloads;
pub mod settings;
const HEARTBEAT_BEAT_EVENT: &str = "heartbeat:beat";
#[derive(Serialize, Deserialize, Clone, Debug)]
enum NodeState {
Alive,
Dead
Dead,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
@ -56,7 +57,7 @@ impl HeartbeatModule {
Self {
last_tick: Instant::now(),
settings: get_settings().modules.heartbeat,
node_states: Arc::new(Mutex::new(HashMap::new()))
node_states: Arc::new(Mutex::new(HashMap::new())),
}
}
}
@ -66,7 +67,11 @@ impl Module for HeartbeatModule {
"HeartbeatModule".to_string()
}
fn init(&mut self, server: &mut VentedServer, pool: &mut ScheduledThreadPool) -> SnekcloudResult<()> {
fn init(
&mut self,
server: &mut VentedServer,
pool: &mut ScheduledThreadPool,
) -> SnekcloudResult<()> {
server.on(HEARTBEAT_BEAT_EVENT, {
let node_states = Arc::clone(&self.node_states);
@ -76,8 +81,11 @@ impl Module for HeartbeatModule {
log::debug!("Latency to node {} is {} ms", payload.node_id, latency);
let mut states = node_states.lock();
Self::insert_state(&mut states, payload.node_id, NodeInfo::alive(latency as u64));
Self::insert_state(
&mut states,
payload.node_id,
NodeInfo::alive(latency as u64),
);
None
}
@ -99,18 +107,34 @@ impl Module for HeartbeatModule {
Ok(())
}
fn boxed(self) -> Box<dyn Module> {
fn boxed(self) -> Box<dyn Module + Send + Sync> {
Box::new(self)
}
fn tick(&mut self, server: &mut VentedServer, _: &mut ScheduledThreadPool) -> VentedResult<()> {
fn tick(
&mut self,
mut context: TickContext,
pool: &mut ScheduledThreadPool,
) -> VentedResult<()> {
if self.last_tick.elapsed() > self.settings.interval() {
for node in server.nodes() {
if let Err(e) = server.emit(node.id.clone(), Event::with_payload(HEARTBEAT_BEAT_EVENT, &HeartbeatPayload::now(server.node_id()))) {
log::debug!("Node {} is not reachable: {}", node.id, e);
let mut states = self.node_states.lock();
Self::insert_state(&mut states, node.id, NodeInfo::dead());
}
for node in context.nodes() {
let mut future = context.emit(
node.id.clone(),
Event::with_payload(
HEARTBEAT_BEAT_EVENT,
&HeartbeatPayload::now(context.node_id().clone()),
),
);
let states = Arc::clone(&self.node_states);
pool.execute(move || {
if let Some(value) = future.get_value_with_timeout(Duration::from_secs(10)) {
if let Err(e) = &*value {
log::debug!("Node {} is not reachable: {}", node.id, e);
let mut states = states.lock();
Self::insert_state(&mut states, node.id, NodeInfo::dead());
}
}
});
}
self.last_tick = Instant::now();
}
@ -121,7 +145,9 @@ impl Module for HeartbeatModule {
impl HeartbeatModule {
fn insert_state(states: &mut HashMap<String, Vec<NodeInfo>>, id: String, state: NodeInfo) {
lazy_static! {static ref MAX_RECORDS: usize = get_settings().modules.heartbeat.max_record_history;}
lazy_static! {
static ref MAX_RECORDS: usize = get_settings().modules.heartbeat.max_record_history;
}
if let Some(states) = states.get_mut(&id) {
if states.len() > *MAX_RECORDS {
states.remove(0);
@ -131,4 +157,4 @@ impl HeartbeatModule {
states.insert(id, vec![state]);
}
}
}
}

@ -2,6 +2,7 @@ use vented::server::VentedServer;
use crate::utils::result::SnekcloudResult;
use scheduled_thread_pool::ScheduledThreadPool;
use vented::result::VentedResult;
use crate::server::tick_context::TickContext;
pub mod heartbeat;
pub mod nodes_refresh;
@ -9,6 +10,6 @@ pub mod nodes_refresh;
pub trait Module {
fn name(&self) -> String;
fn init(&mut self, server: &mut VentedServer, pool: &mut ScheduledThreadPool) -> SnekcloudResult<()>;
fn boxed(self) -> Box<dyn Module>;
fn tick(&mut self, server: &mut VentedServer, pool: &mut ScheduledThreadPool) -> VentedResult<()>;
fn boxed(self) -> Box<dyn Module + Send + Sync>;
fn tick(&mut self, context: TickContext, pool: &mut ScheduledThreadPool) -> VentedResult<()>;
}

@ -3,7 +3,7 @@ use vented::result::VentedResult;
use vented::server::VentedServer;
use crate::utils::result::SnekcloudResult;
use scheduled_thread_pool::ScheduledThreadPool;
use vented::server::server_events::{ NodeListPayload};
use vented::server::server_events::{NodeListPayload, NODE_LIST_REQUEST_EVENT};
use vented::server::data::Node;
use std::sync::Arc;
use parking_lot::Mutex;
@ -15,6 +15,8 @@ use crate::utils::settings::get_settings;
use crate::data::node_data::NodeData;
use std::path::PathBuf;
use crate::modules::nodes_refresh::settings::NodesRefreshSettings;
use crate::server::tick_context::TickContext;
use vented::event::Event;
pub mod settings;
@ -92,15 +94,15 @@ impl Module for NodesRefreshModule {
Ok(())
}
fn boxed(self) -> Box<dyn Module> {
fn boxed(self) -> Box<dyn Module + Send + Sync> {
Box::new(self)
}
fn tick(&mut self, server: &mut VentedServer, _: &mut ScheduledThreadPool) -> VentedResult<()> {
fn tick(&mut self, mut context: TickContext, _: &mut ScheduledThreadPool) -> VentedResult<()> {
if self.last_request.elapsed() > self.settings.update_interval() {
if let Err(e) = server.request_node_list() {
log::debug!("Failed to refresh node list: {}", e);
}
context.nodes().iter().filter(|node| node.trusted).for_each(|node| {
context.emit(node.id.clone(), Event::new(NODE_LIST_REQUEST_EVENT));
});
self.last_request = Instant::now();
}

@ -1,13 +1,19 @@
use vented::server::VentedServer;
use crate::utils::result::SnekcloudResult;
use vented::crypto::SecretKey;
use vented::server::data::Node;
use vented::WaitGroup;
use crate::modules::Module;
use crate::server::tick_context::TickContext;
use crate::utils::result::{SnekcloudError, SnekcloudResult};
use parking_lot::Mutex;
use scheduled_thread_pool::ScheduledThreadPool;
use std::collections::HashMap;
use std::thread;
use std::time::{Duration, Instant};
use std::mem;
use std::sync::mpsc::channel;
use std::sync::Arc;
use std::time::Duration;
use vented::crypto::SecretKey;
use vented::server::data::Node;
use vented::server::VentedServer;
use vented::WaitGroup;
pub mod tick_context;
const SERVER_TICK_RATE_MS: u64 = 10;
@ -15,8 +21,8 @@ pub struct SnekcloudServer {
inner: VentedServer,
listen_addresses: Vec<String>,
listeners: Vec<WaitGroup>,
module_pool: ScheduledThreadPool,
modules: HashMap<String, Box<dyn Module>>
module_pool: Arc<Mutex<ScheduledThreadPool>>,
modules: HashMap<String, Box<dyn Module + Send + Sync>>,
}
impl SnekcloudServer {
@ -26,8 +32,11 @@ impl SnekcloudServer {
inner: VentedServer::new(id, private_key, keys, num_threads),
listen_addresses: Vec::new(),
listeners: Vec::new(),
module_pool: ScheduledThreadPool::with_name("modules", num_threads),
modules: HashMap::new()
module_pool: Arc::new(Mutex::new(ScheduledThreadPool::with_name(
"modules",
num_threads,
))),
modules: HashMap::new(),
}
}
@ -41,28 +50,53 @@ impl SnekcloudServer {
for address in &self.listen_addresses {
self.listeners.push(self.inner.listen(address.clone()))
}
let sleep_duration = Duration::from_millis(SERVER_TICK_RATE_MS);
loop {
let start = Instant::now();
for (name, module) in &mut self.modules {
if let Err(e) = module.tick(&mut self.inner, &mut self.module_pool) {
log::error!("Error when ticking module {}: {}", name, e);
}
}
let elapsed = start.elapsed();
if elapsed < sleep_duration {
thread::sleep( sleep_duration - elapsed);
} else {
log::warn!("Can't keep up. Last tick took {} ms", elapsed.as_millis())
let modules = mem::take(&mut self.modules);
let module_pool = Arc::clone(&self.module_pool);
let (tx, rx) = channel();
let tick_context = TickContext::new(self.inner.node_id(), tx, self.inner.nodes_ref());
for (name, mut module) in modules {
module_pool.lock().execute_at_fixed_rate(
Duration::from_millis(SERVER_TICK_RATE_MS),
Duration::from_millis(SERVER_TICK_RATE_MS),
{
let module_pool = Arc::clone(&module_pool);
let tick_context = TickContext::clone(&tick_context);
move || {
let mut module_pool = module_pool.lock();
if let Err(e) =
module.tick(TickContext::clone(&tick_context), &mut module_pool)
{
log::error!("Error when ticking module {}: {}", name, e);
}
}
},
);
}
for mut invocation in rx {
match self.inner.emit(invocation.target_node, invocation.event) {
Ok(_) => invocation.result.set_value(Arc::new(Ok(()))),
Err(e) => invocation
.result
.set_value(Arc::new(Err(SnekcloudError::from(e)))),
}
}
Ok(())
}
/// Registers a module on the server
pub fn register_module(&mut self, mut module: impl Module) -> SnekcloudResult<()> {
module.init(&mut self.inner, &mut self.module_pool)?;
pub fn register_module(
&mut self,
mut module: impl Module + Send + Sync,
) -> SnekcloudResult<()> {
let mut module_pool = self.module_pool.lock();
module.init(&mut self.inner, &mut module_pool)?;
self.modules.insert(module.name(), module.boxed());
Ok(())
}
}
}

@ -0,0 +1,51 @@
use std::sync::mpsc::Sender;
use vented::event::Event;
use std::sync::Arc;
use crate::utils::result::SnekcloudResult;
use vented::server::data::{AsyncValue, Node};
use parking_lot::Mutex;
use std::collections::HashMap;
#[derive(Clone)]
pub struct TickContext {
nodes: Arc<Mutex<HashMap<String, Node>>>,
event_sender: Sender<EventInvocation>,
node_id: String,
}
pub struct EventInvocation {
pub result: AsyncValue<Arc<SnekcloudResult<()>>>,
pub event: Event,
pub target_node: String,
}
impl TickContext {
pub fn new(node_id: String, sender: Sender<EventInvocation>, nodes: Arc<Mutex<HashMap<String, Node>>>) -> Self {
Self {
nodes,
node_id,
event_sender: sender,
}
}
pub fn emit<S: ToString>(&mut self, target_node: S, event: Event) -> AsyncValue<Arc<SnekcloudResult<()>>> {
let value = AsyncValue::new();
self.event_sender.send(EventInvocation {
event,
target_node: target_node.to_string(),
result: AsyncValue::clone(&value),
}).unwrap();
value
}
/// Returns a copy of the nodes of the server
pub fn nodes(&self) -> Vec<Node> {
self.nodes.lock().values().cloned().collect()
}
/// Returns the node
pub fn node_id(&self) -> &String {
&self.node_id
}
}

@ -1,10 +1,10 @@
use std::path::{PathBuf, Path};
use vented::crypto::{SecretKey, PublicKey};
use crate::utils::result::{SnekcloudResult, SnekcloudError};
use vented::server::data::Node;
use crate::data::node_data::NodeData;
use std::fs::create_dir;
use crate::utils::result::{SnekcloudError, SnekcloudResult};
use crate::utils::settings::get_settings;
use std::fs::create_dir;
use std::path::{Path, PathBuf};
use vented::crypto::{PublicKey, SecretKey};
use vented::server::data::Node;
const PRIVATE_KEY_HEADER_LINE: &str = "---BEGIN-SNEKCLOUD-PRIVATE-KEY---\n";
const PRIVATE_KEY_FOOTER_LINE: &str = "\n---END-SNEKCLOUD-PRIVATE-KEY---";
@ -17,21 +17,20 @@ pub fn read_node_keys(path: &PathBuf) -> SnekcloudResult<Vec<Node>> {
if !Path::new(path).exists() {
create_dir(path)?;
}
let dir_content = path.read_dir()?;
let trusted_nodes = get_settings().trusted_nodes;
let content = dir_content
.filter_map(|entry| {
let entry = entry.ok()?;
let content = glob::glob(format!("{}/*.toml", path.to_string_lossy()).as_str())?
.filter_map(|path| {
let mut data = NodeData::from_file(path.ok()?).ok()?;
Some((entry.metadata().ok()?, entry))
Some(Node {
public_key: data.public_key(),
address: data.addresses.pop(),
trusted: trusted_nodes.contains(&data.id),
id: data.id,
})
})
.filter(|(meta, _)|meta.is_file())
.filter_map(|(_, entry)|{
let mut data = NodeData::from_file(entry.path()).ok()?;
Some(Node {public_key: data.public_key(), address: data.addresses.pop(), trusted: trusted_nodes.contains(&data.id), id: data.id})
}).collect();
.collect();
Ok(content)
}
@ -52,8 +51,12 @@ pub fn extract_public_key(content: &str) -> SnekcloudResult<PublicKey> {
/// Extracts a base64 encoded key between the prefix and suffix
fn extract_key(content: &str, prefix: &str, suffix: &str) -> SnekcloudResult<[u8; 32]> {
let mut content = content.strip_prefix(prefix).ok_or(SnekcloudError::InvalidKey)?;
content = content.strip_suffix(suffix).ok_or(SnekcloudError::InvalidKey)?;
let mut content = content
.strip_prefix(prefix)
.ok_or(SnekcloudError::InvalidKey)?;
content = content
.strip_suffix(suffix)
.ok_or(SnekcloudError::InvalidKey)?;
let key = base64::decode(content)?;
if key.len() != 32 {
@ -67,12 +70,20 @@ fn extract_key(content: &str, prefix: &str, suffix: &str) -> SnekcloudResult<[u8
/// Encodes and encases the public key for text representation
pub fn armor_public_key(key: PublicKey) -> String {
armor_key(key.to_bytes(), PUBLIC_KEY_HEADER_LINE, PUBLIC_KEY_FOOTER_LINE)
armor_key(
key.to_bytes(),
PUBLIC_KEY_HEADER_LINE,
PUBLIC_KEY_FOOTER_LINE,
)
}
/// Encodes and encases the secret key for text representation
pub fn armor_private_key(key: SecretKey) -> String {
armor_key(key.to_bytes(), PRIVATE_KEY_HEADER_LINE, PRIVATE_KEY_FOOTER_LINE)
armor_key(
key.to_bytes(),
PRIVATE_KEY_HEADER_LINE,
PRIVATE_KEY_FOOTER_LINE,
)
}
/// Returns an armored key
@ -83,4 +94,4 @@ fn armor_key(key: [u8; 32], prefix: &str, suffix: &str) -> String {
/// Generates a new private key
pub fn generate_private_key() -> SecretKey {
SecretKey::generate(&mut rand::thread_rng())
}
}

@ -1,17 +1,16 @@
use crate::utils::result::{SnekcloudResult, SnekcloudError};
use serde::{Serialize, Deserialize};
use crate::modules::heartbeat::settings::HeartbeatSettings;
use crate::modules::nodes_refresh::settings::NodesRefreshSettings;
use crate::utils::result::{SnekcloudError, SnekcloudResult};
use crate::utils::{get_node_id, write_toml_pretty};
use config::File;
use serde::{Deserialize, Serialize};
use std::fs;
use std::path::{Path, PathBuf};
use config::File;
use crate::modules::heartbeat::settings::HeartbeatSettings;
use crate::modules::nodes_refresh::settings::NodesRefreshSettings;
const CONFIG_DIR: &str = "config/";
const DEFAULT_CONFIG: &str = "config/00_default.toml";
const GLOB_CONFIG: &str = "config/*.toml";
const ENV_PREFIX : &str = "SNEKCLOUD";
const ENV_PREFIX: &str = "SNEKCLOUD";
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Settings {
@ -19,6 +18,7 @@ pub struct Settings {
pub node_id: String,
pub private_key: PathBuf,
pub node_data_dir: PathBuf,
pub num_threads: usize,
/// List of trusted nodes
pub trusted_nodes: Vec<String>,
// modules need to be last because it's a table
@ -39,6 +39,7 @@ impl Default for Settings {
private_key: PathBuf::from("node_key"),
node_data_dir: PathBuf::from("nodes"),
trusted_nodes: vec![],
num_threads: num_cpus::get(),
modules: ModuleSettings::default(),
}
}
@ -48,15 +49,16 @@ impl Default for ModuleSettings {
fn default() -> Self {
Self {
heartbeat: HeartbeatSettings::default(),
nodes_refresh: NodesRefreshSettings::default()
nodes_refresh: NodesRefreshSettings::default(),
}
}
}
/// Returns the settings that are lazily retrieved at runtime
pub fn get_settings() -> Settings {
lazy_static! { static ref SETTINGS: Settings = load_settings().expect("Failed to get settings"); }
lazy_static! {
static ref SETTINGS: Settings = load_settings().expect("Failed to get settings");
}
SETTINGS.clone()
}
@ -70,10 +72,12 @@ fn load_settings() -> SnekcloudResult<Settings> {
let mut settings = config::Config::default();
settings
.merge(config::File::with_name(DEFAULT_CONFIG))?
.merge(glob::glob(GLOB_CONFIG)?.map(|path| File::from(path.unwrap()))
.collect::<Vec<_>>())?
.merge(
glob::glob(GLOB_CONFIG)?
.map(|path| File::from(path.unwrap()))
.collect::<Vec<_>>(),
)?
.merge(config::Environment::with_prefix(ENV_PREFIX))?;
settings.try_into().map_err(SnekcloudError::from)
}
}

Loading…
Cancel
Save