Add heartbeat module

Signed-off-by: trivernis <trivernis@protonmail.com>
pull/1/head
trivernis 4 years ago
parent 12f82e72f0
commit ab30a684bd
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

2
.gitignore vendored

@ -1,6 +1,6 @@
/target
.idea
*.env
test
config
nodes
testdata

16
Cargo.lock generated

@ -519,6 +519,16 @@ dependencies = [
"autocfg",
]
[[package]]
name = "num_cpus"
version = "1.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05499f3756671c15885fee9034446956fff3f243d6077b91e5767df161f766b3"
dependencies = [
"hermit-abi",
"libc",
]
[[package]]
name = "opaque-debug"
version = "0.3.0"
@ -843,8 +853,10 @@ dependencies = [
"hostname",
"log",
"mac_address",
"num_cpus",
"rand",
"rusqlite",
"scheduled-thread-pool",
"serde 1.0.117",
"structopt",
"toml",
@ -1011,9 +1023,9 @@ checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191"
[[package]]
name = "vented"
version = "0.1.2"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b92984cc8ed261509126ebc9b043c814e98f3d8da9dbf2e1d21e072c6906c8a7"
checksum = "3b3526b3689e915e1545250137a9c06875736629ef2def374e78c9755ee32868"
dependencies = [
"byteorder",
"crossbeam-utils",

@ -7,7 +7,7 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
vented = "0.1.2"
vented = "0.3.1"
rusqlite = "0.24.1"
rand = "0.7.3"
base64 = "0.13.0"
@ -21,4 +21,6 @@ chrono = "0.4.19"
toml = "0.5.7"
serde = { version = "1.0.117", features = ["serde_derive"] }
config = "0.10.1"
glob = "0.3.0"
glob = "0.3.0"
scheduled-thread-pool = "0.2.5"
num_cpus = "1.13.0"

@ -8,6 +8,7 @@ use std::fs;
use structopt::StructOpt;
use vented::crypto::SecretKey;
use crate::data::node_data::NodeData;
use crate::modules::heartbeat::HeartbeatModule;
pub(crate) mod utils;
pub(crate) mod server;
@ -76,6 +77,7 @@ fn start_server(_options: Opt, settings: &Settings) -> SnekcloudResult<()> {
for address in &settings.listen_addresses {
server.add_listen_address(address.clone());
}
server.register_module(HeartbeatModule::new())?;
server.run()?;
Ok(())

@ -0,0 +1,58 @@
use crate::modules::Module;
use vented::server::VentedServer;
use crate::utils::result::SnekcloudResult;
use scheduled_thread_pool::ScheduledThreadPool;
use vented::result::{VentedResult};
use crate::modules::heartbeat::payloads::HeartbeatPayload;
use std::time::{Instant, Duration};
use vented::event::Event;
mod payloads;
const HEARTBEAT_BEAT_EVENT: &str = "heartbeat:beat";
const HEARTBEAT_RATE_SECONDS: u64 = 10;
pub struct HeartbeatModule {
last_tick: Instant,
}
impl HeartbeatModule {
pub fn new() -> Self {
Self {
last_tick: Instant::now()
}
}
}
impl Module for HeartbeatModule {
fn name(&self) -> String {
"HeartbeatModule".to_string()
}
fn init(&mut self, server: &mut VentedServer, _: &mut ScheduledThreadPool) -> SnekcloudResult<()> {
server.on(HEARTBEAT_BEAT_EVENT, |event| {
let payload = event.get_payload::<HeartbeatPayload>().unwrap();
log::debug!("Latency to node {} is {} ms", payload.node_id, payload.get_beat_time().elapsed().unwrap().as_millis());
None
});
Ok(())
}
fn boxed(self) -> Box<dyn Module> {
Box::new(self)
}
fn tick(&mut self, server: &mut VentedServer, _: &mut ScheduledThreadPool) -> VentedResult<()> {
if self.last_tick.elapsed() > Duration::from_secs(HEARTBEAT_RATE_SECONDS) {
for node in server.nodes() {
if let Err(e) = server.emit(node.id.clone(), Event::with_payload(HEARTBEAT_BEAT_EVENT, &HeartbeatPayload::now(server.node_id()))) {
log::warn!("Node {} is not reachable: {}", node.id, e)
}
}
self.last_tick = Instant::now();
}
Ok(())
}
}

@ -0,0 +1,24 @@
use serde::{Serialize, Deserialize};
use std::time::{ UNIX_EPOCH, Duration, SystemTime};
use std::ops::Add;
#[derive(Serialize, Deserialize)]
pub struct HeartbeatPayload {
pub node_id: String,
beat_at: u64,
}
impl HeartbeatPayload {
pub fn now(node_id: String) -> Self {
let start = SystemTime::now();
Self {
node_id,
beat_at: start.duration_since(UNIX_EPOCH).unwrap().as_millis() as u64
}
}
pub fn get_beat_time(&self) -> SystemTime {
UNIX_EPOCH.add(Duration::from_millis(self.beat_at))
}
}

@ -1,6 +1,13 @@
use vented::server::VentedServer;
use crate::utils::result::SnekcloudResult;
use scheduled_thread_pool::ScheduledThreadPool;
use vented::result::VentedResult;
pub mod heartbeat;
pub trait Module {
fn init(&mut self, server: &VentedServer) -> SnekcloudResult<()>;
fn name(&self) -> String;
fn init(&mut self, server: &mut VentedServer, pool: &mut ScheduledThreadPool) -> SnekcloudResult<()>;
fn boxed(self) -> Box<dyn Module>;
fn tick(&mut self, server: &mut VentedServer, pool: &mut ScheduledThreadPool) -> VentedResult<()>;
}

@ -4,11 +4,19 @@ use vented::crypto::SecretKey;
use vented::server::data::Node;
use vented::WaitGroup;
use crate::modules::Module;
use scheduled_thread_pool::ScheduledThreadPool;
use std::collections::HashMap;
use std::thread;
use std::time::{Duration, Instant};
const SERVER_TICK_RATE_MS: u64 = 1;
pub struct SnekcloudServer {
inner: VentedServer,
listen_addresses: Vec<String>,
listeners: Vec<WaitGroup>,
module_pool: ScheduledThreadPool,
modules: HashMap<String, Box<dyn Module>>
}
impl SnekcloudServer {
@ -18,6 +26,8 @@ impl SnekcloudServer {
inner: VentedServer::new(id, private_key, keys, num_threads),
listen_addresses: Vec::new(),
listeners: Vec::new(),
module_pool: ScheduledThreadPool::with_name("modules", num_threads),
modules: HashMap::new()
}
}
@ -26,17 +36,33 @@ impl SnekcloudServer {
self.listen_addresses.push(address);
}
/// Starts listening on all addresses
/// Starts listening on all addresses and runs the module tick loop
pub fn run(&mut self) -> SnekcloudResult<()> {
for address in &self.listen_addresses {
self.listeners.push(self.inner.listen(address.clone()))
}
Ok(())
let sleep_duration = Duration::from_millis(SERVER_TICK_RATE_MS);
loop {
let start = Instant::now();
for (name, module) in &mut self.modules {
if let Err(e) = module.tick(&mut self.inner, &mut self.module_pool) {
log::error!("Error when ticking module {}: {}", name, e);
}
}
let elapsed = start.elapsed();
if elapsed < sleep_duration {
thread::sleep( sleep_duration - elapsed);
} else {
log::warn!("Can't keep up. Last tick took {} ms", elapsed.as_millis())
}
}
}
/// Registers a module on the server
pub fn register_module(&mut self, module: &mut impl Module) -> SnekcloudResult<()> {
module.init(&mut self.inner)
pub fn register_module(&mut self, mut module: impl Module) -> SnekcloudResult<()> {
module.init(&mut self.inner, &mut self.module_pool)?;
self.modules.insert(module.name(), module.boxed());
Ok(())
}
}

@ -13,7 +13,7 @@ pub fn init_logger() {
let color = get_level_style(record.level());
writeln!(
buf,
"{:<12} {:<45}| {} {}: {}",
"{:<25} {:<45}| {} {}: {}",
format!("thread::{}", thread::current().name().unwrap_or("main")).dimmed(),
record.target().dimmed().italic(),
Local::now().format("%Y-%m-%dT%H:%M:%S"),

@ -22,7 +22,7 @@ pub struct Settings {
impl Default for Settings {
fn default() -> Self {
Self {
listen_addresses: vec!["127.0.0.1:22222".to_string()],
listen_addresses: vec![],
node_id: get_node_id(),
private_key: PathBuf::from("node_key"),
node_data_dir: PathBuf::from("nodes"),

Loading…
Cancel
Save