Fix event handling to not block other events when sending

Signed-off-by: trivernis <trivernis@protonmail.com>
main
trivernis 4 years ago
parent c3710fa60b
commit 3fb35e9383
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

144
Cargo.lock generated

@ -97,6 +97,15 @@ dependencies = [
"winapi",
]
[[package]]
name = "async-listen"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c0eff11d7d3dbf808fb25952cc54a0bcf50b501ae6d6ea98a817009b330d0a2a"
dependencies = [
"async-std",
]
[[package]]
name = "async-mutex"
version = "1.4.0"
@ -139,6 +148,17 @@ version = "4.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e91831deabf0d6d7ec49552e489aed63b7456a7a3c46cff62adad428110b0af0"
[[package]]
name = "async-trait"
version = "0.1.41"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b246867b8b3b6ae56035f1eb1ed557c1d8eae97f0d53696138a50fa0e3a3b8c0"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "atomic-waker"
version = "1.0.0"
@ -434,6 +454,21 @@ dependencies = [
"log",
]
[[package]]
name = "futures"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b3b0c040a1fe6529d30b3c5944b280c7f0dcb2930d2c3062bca967b602583d0"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.8"
@ -441,6 +476,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4b7109687aa4e177ef6fe84553af6280ef2778bdb7783ba44c9dc3399110fe64"
dependencies = [
"futures-core",
"futures-sink",
]
[[package]]
@ -449,6 +485,17 @@ version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "847ce131b72ffb13b6109a221da9ad97a64cbe48feb1028356b836b47b8f1748"
[[package]]
name = "futures-executor"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4caa2b2b68b880003057c1dd49f1ed937e38f22fcf6c212188a121f08cf40a65"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-io"
version = "0.3.8"
@ -470,6 +517,53 @@ dependencies = [
"waker-fn",
]
[[package]]
name = "futures-macro"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77408a692f1f97bcc61dc001d752e00643408fbc922e4d634c655df50d595556"
dependencies = [
"proc-macro-hack",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "futures-sink"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f878195a49cee50e006b02b93cf7e0a95a38ac7b776b4c4d9cc1207cd20fcb3d"
[[package]]
name = "futures-task"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c554eb5bf48b2426c4771ab68c6b14468b6e76cc90996f528c3338d761a4d0d"
dependencies = [
"once_cell",
]
[[package]]
name = "futures-util"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d304cff4a7b99cfb7986f7d43fbe93d175e72e704a8860787cc95e9ffd85cbd2"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
"pin-project",
"pin-utils",
"proc-macro-hack",
"proc-macro-nested",
"slab",
]
[[package]]
name = "generic-array"
version = "0.14.4"
@ -796,6 +890,26 @@ dependencies = [
"winapi",
]
[[package]]
name = "pin-project"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee41d838744f60d959d7074e3afb6b35c7456d0f61cad38a24e35e6553f73841"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81a4ffa594b66bff340084d4081df649a7dc049ac8d7fc458d8e628bfbbb2f86"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "pin-project-lite"
version = "0.1.11"
@ -866,6 +980,18 @@ dependencies = [
"version_check",
]
[[package]]
name = "proc-macro-hack"
version = "0.5.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5"
[[package]]
name = "proc-macro-nested"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eba180dafb9038b050a4c280019bbedf9f2467b61e5d892dcad585bb57aadc5a"
[[package]]
name = "proc-macro2"
version = "1.0.24"
@ -1007,15 +1133,6 @@ dependencies = [
"zeroize",
]
[[package]]
name = "scheduled-thread-pool"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc6f74fd1204073fa02d5d5d68bec8021be4c38690b61264b2fdb48083d0e7d7"
dependencies = [
"parking_lot",
]
[[package]]
name = "scopeguard"
version = "1.1.0"
@ -1111,11 +1228,13 @@ name = "snekcloud-server"
version = "0.1.0"
dependencies = [
"async-std",
"async-trait",
"base64",
"chrono",
"colored",
"config",
"fern",
"futures",
"glob",
"hostname",
"lazy_static",
@ -1126,7 +1245,6 @@ dependencies = [
"rand",
"regex",
"rusqlite",
"scheduled-thread-pool",
"serde 1.0.117",
"serde_json",
"structopt",
@ -1291,14 +1409,16 @@ checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191"
[[package]]
name = "vented"
version = "0.11.0"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33386edc15f14833ecb3b40f3ec75052684202ad892a916e8baf603d686ae543"
checksum = "cdd0a331d4b17a1ac906ffc38825113984f244e376bd58d90affbafee5b650ed"
dependencies = [
"async-listen",
"async-std",
"byteorder",
"crossbeam-utils",
"crypto_box",
"futures",
"generic-array",
"log",
"parking_lot",

@ -11,7 +11,7 @@ path = "src/main.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
vented = "0.11.0"
vented = "0.11.2"
rusqlite = "0.24.1"
rand = "0.7.3"
base64 = "0.13.0"
@ -25,11 +25,12 @@ toml = "0.5.7"
serde = { version = "1.0.117", features = ["serde_derive"] }
config = "0.10.1"
glob = "0.3.0"
scheduled-thread-pool = "0.2.5"
num_cpus = "1.13.0"
lazy_static = "1.4.0"
parking_lot = "0.11.0"
serde_json = "1.0.59"
fern = "0.6.0"
regex = "1.4.2"
async-std = "1.7.0"
async-std = {version = "1.7.0", features=["unstable"]}
async-trait = "0.1.41"
futures = "0.3.8"

@ -1,12 +1,14 @@
use crate::modules::heartbeat::payloads::HeartbeatPayload;
use crate::modules::heartbeat::settings::HeartbeatSettings;
use crate::modules::Module;
use crate::server::tick_context::TickContext;
use crate::server::tick_context::RunContext;
use crate::utils::result::SnekcloudResult;
use crate::utils::settings::get_settings;
use crate::utils::write_json_pretty;
use async_std::task;
use async_trait::async_trait;
use chrono::Local;
use parking_lot::Mutex;
use scheduled_thread_pool::ScheduledThreadPool;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
@ -28,6 +30,7 @@ enum NodeState {
struct NodeInfo {
ping: Option<u64>,
state: NodeState,
timestamp: String,
}
impl NodeInfo {
@ -35,18 +38,19 @@ impl NodeInfo {
Self {
ping: Some(ping),
state: NodeState::Alive,
timestamp: Local::now().format("%Y-%m-%dT%H:%M:%S").to_string(),
}
}
fn dead() -> Self {
Self {
ping: None,
state: NodeState::Dead,
timestamp: Local::now().format("%Y-%m-%dT%H:%M:%S").to_string(),
}
}
}
pub struct HeartbeatModule {
last_tick: Instant,
settings: HeartbeatSettings,
node_states: Arc<Mutex<HashMap<String, Vec<NodeInfo>>>>,
}
@ -54,23 +58,19 @@ pub struct HeartbeatModule {
impl HeartbeatModule {
pub fn new() -> Self {
Self {
last_tick: Instant::now(),
settings: get_settings().modules.heartbeat,
node_states: Arc::new(Mutex::new(HashMap::new())),
}
}
}
#[async_trait]
impl Module for HeartbeatModule {
fn name(&self) -> String {
"HeartbeatModule".to_string()
}
fn init(
&mut self,
server: &mut VentedServer,
pool: &mut ScheduledThreadPool,
) -> SnekcloudResult<()> {
fn init(&mut self, server: &mut VentedServer) -> SnekcloudResult<()> {
server.on(HEARTBEAT_BEAT_EVENT, {
let node_states = Arc::clone(&self.node_states);
@ -92,19 +92,6 @@ impl Module for HeartbeatModule {
})
}
});
if let Some(output) = &self.settings.output_file {
pool.execute_at_fixed_rate(self.settings.interval(), self.settings.interval(), {
let path = output.clone();
let states = Arc::clone(&self.node_states);
move || {
let states = states.lock();
if let Err(e) = write_json_pretty(&path, &*states) {
log::error!("Failed to write output states to file: {}", e)
}
}
});
}
Ok(())
}
@ -113,40 +100,39 @@ impl Module for HeartbeatModule {
Box::new(self)
}
fn tick(
&mut self,
mut context: TickContext,
pool: &mut ScheduledThreadPool,
) -> SnekcloudResult<()> {
if self.last_tick.elapsed() > self.settings.interval() {
log::trace!("Sending heartbeat...");
for node in context.living_nodes() {
let mut future = context.emit(
node.id.clone(),
Event::with_payload(
HEARTBEAT_BEAT_EVENT,
&HeartbeatPayload::now(context.node_id().clone()),
),
);
let states = Arc::clone(&self.node_states);
pool.execute(move || {
match future.get_value_with_timeout(Duration::from_secs(60)) {
Some(Err(e)) => {
log::debug!("Node {} is not reachable: {}", node.id, e);
Self::insert_state(&mut states.lock(), node.id, NodeInfo::dead());
async fn run(&mut self, context: RunContext) -> SnekcloudResult<()> {
for node in context.nodes() {
let mut context = context.clone();
let node_states = Arc::clone(&self.node_states);
let interval = self.settings.interval();
task::spawn(async move {
loop {
Self::send_heartbeat(&mut context, &node.id, Arc::clone(&node_states)).await;
if !context.check_alive(&node.id) {
let start = Instant::now();
while !context.check_alive(&node.id) {
task::sleep(Duration::from_secs(10)).await;
if start.elapsed() > interval * 100 {
break;
}
None => {
log::debug!("Node {} is not reachable: Timeout", node.id);
Self::insert_state(&mut states.lock(), node.id, NodeInfo::dead());
}
_ => {}
} else {
task::sleep(interval).await
}
}
});
}
self.last_tick = Instant::now();
loop {
if let Some(path) = &self.settings.output_file {
let states = self.node_states.lock();
if let Err(e) = write_json_pretty(path, &*states) {
log::error!("Failed to write output states to file: {}", e)
}
}
task::sleep(self.settings.interval()).await
}
Ok(())
}
}
@ -164,4 +150,36 @@ impl HeartbeatModule {
states.insert(id, vec![state]);
}
}
async fn send_heartbeat(
context: &mut RunContext,
target: &String,
states: Arc<Mutex<HashMap<String, Vec<NodeInfo>>>>,
) {
log::trace!("Sending heartbeat to {}...", target);
let mut value = context
.emit(
target.clone(),
Event::with_payload(
HEARTBEAT_BEAT_EVENT,
&HeartbeatPayload::now(context.node_id().clone()),
),
)
.await;
match value
.get_value_with_timeout_async(Duration::from_secs(60))
.await
{
Some(Err(e)) => {
log::debug!("Node {} is not reachable: {}", target, e);
Self::insert_state(&mut *states.lock(), target.clone(), NodeInfo::dead());
}
None => {
log::debug!("Node {} is not reachable: Timeout", target);
Self::insert_state(&mut *states.lock(), target.clone(), NodeInfo::dead());
}
_ => {}
}
}
}

@ -1,19 +1,15 @@
use crate::server::tick_context::TickContext;
use crate::server::tick_context::RunContext;
use crate::utils::result::SnekcloudResult;
use scheduled_thread_pool::ScheduledThreadPool;
use async_trait::async_trait;
use vented::server::VentedServer;
pub mod heartbeat;
pub mod nodes_refresh;
#[async_trait]
pub trait Module {
fn name(&self) -> String;
fn init(
&mut self,
server: &mut VentedServer,
pool: &mut ScheduledThreadPool,
) -> SnekcloudResult<()>;
fn init(&mut self, server: &mut VentedServer) -> SnekcloudResult<()>;
fn boxed(self) -> Box<dyn Module + Send + Sync>;
fn tick(&mut self, context: TickContext, pool: &mut ScheduledThreadPool)
-> SnekcloudResult<()>;
async fn run(&mut self, context: RunContext) -> SnekcloudResult<()>;
}

@ -1,16 +1,16 @@
use crate::data::node_data::NodeData;
use crate::modules::nodes_refresh::settings::NodesRefreshSettings;
use crate::modules::Module;
use crate::server::tick_context::TickContext;
use crate::server::tick_context::RunContext;
use crate::utils::result::SnekcloudResult;
use crate::utils::settings::get_settings;
use async_std::task;
use async_trait::async_trait;
use parking_lot::Mutex;
use scheduled_thread_pool::ScheduledThreadPool;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant, UNIX_EPOCH};
use vented::event::Event;
use vented::server::data::Node;
use vented::server::server_events::{NodeListPayload, NODE_LIST_REQUEST_EVENT};
@ -22,20 +22,16 @@ pub mod settings;
pub struct NodesRefreshModule {
nodes: Arc<Mutex<HashMap<String, Node>>>,
update_required: Arc<AtomicBool>,
last_request: Instant,
settings: NodesRefreshSettings,
}
#[async_trait]
impl Module for NodesRefreshModule {
fn name(&self) -> String {
"node_list_refresh".to_string()
}
fn init(
&mut self,
server: &mut VentedServer,
pool: &mut ScheduledThreadPool,
) -> SnekcloudResult<()> {
fn init(&mut self, server: &mut VentedServer) -> SnekcloudResult<()> {
{
let mut node_list = self.nodes.lock();
for node in server.nodes() {
@ -75,31 +71,6 @@ impl Module for NodesRefreshModule {
})
}
});
pool.execute_at_fixed_rate(Duration::from_secs(10), self.settings.update_interval(), {
let nodes = Arc::clone(&self.nodes);
let update_required = Arc::clone(&self.update_required);
move || {
if update_required.load(Ordering::Relaxed) {
let nodes_folder = get_settings().node_data_dir;
nodes
.lock()
.values()
.cloned()
.map(|node| {
NodeData::with_addresses(node.id, node.addresses, node.public_key)
})
.for_each(|data| {
let mut path = nodes_folder.clone();
path.push(PathBuf::from(format!("{}.toml", data.id)));
if let Err(e) = data.write_to_file(path) {
log::error!("Failed to write updated node data: {}", e);
}
});
}
}
});
Ok(())
}
@ -108,34 +79,45 @@ impl Module for NodesRefreshModule {
Box::new(self)
}
fn tick(
&mut self,
mut context: TickContext,
_: &mut ScheduledThreadPool,
) -> SnekcloudResult<()> {
if self.last_request.elapsed() > self.settings.update_interval() {
async fn run(&mut self, mut context: RunContext) -> SnekcloudResult<()> {
loop {
for node in context.living_nodes().iter().filter(|node| node.trusted) {
context
.living_nodes()
.iter()
.filter(|node| node.trusted)
.for_each(|node| {
context.emit(node.id.clone(), Event::new(NODE_LIST_REQUEST_EVENT));
});
self.last_request = Instant::now();
.emit(node.id.clone(), Event::new(NODE_LIST_REQUEST_EVENT))
.await;
}
if self.update_required.load(Ordering::Relaxed) {
self.write_node_data();
}
Ok(())
task::sleep(self.settings.update_interval()).await
}
}
}
impl NodesRefreshModule {
pub fn new() -> Self {
let null_time = Instant::now() - UNIX_EPOCH.elapsed().unwrap();
Self {
nodes: Arc::new(Mutex::new(HashMap::new())),
settings: get_settings().modules.nodes_refresh,
last_request: null_time,
update_required: Arc::new(AtomicBool::new(false)),
}
}
fn write_node_data(&self) {
let nodes_folder = get_settings().node_data_dir;
self.nodes
.lock()
.values()
.cloned()
.map(|node| NodeData::with_addresses(node.id, node.addresses, node.public_key))
.for_each(|data| {
let mut path = nodes_folder.clone();
path.push(PathBuf::from(format!("{}.toml", data.id)));
if let Err(e) = data.write_to_file(path) {
log::error!("Failed to write updated node data: {}", e);
}
});
}
}

@ -1,28 +1,21 @@
use crate::modules::Module;
use crate::server::tick_context::{EventInvocation, TickContext};
use crate::server::tick_context::{EventInvocation, RunContext};
use crate::utils::result::{SnekcloudError, SnekcloudResult};
use crate::utils::settings::get_settings;
use parking_lot::Mutex;
use scheduled_thread_pool::ScheduledThreadPool;
use async_std::sync::{channel, Receiver};
use async_std::task;
use std::collections::HashMap;
use std::mem;
use std::sync::mpsc::{channel, Receiver};
use std::sync::Arc;
use std::time::Duration;
use vented::server::data::Node;
use vented::server::VentedServer;
use vented::stream::SecretKey;
pub mod tick_context;
const SERVER_TICK_RATE_MS: u64 = 10;
pub struct SnekcloudServer {
inner: VentedServer,
listen_addresses: Vec<String>,
module_pool: HashMap<String, Arc<Mutex<ScheduledThreadPool>>>,
modules: HashMap<String, Box<dyn Module + Send + Sync>>,
}
@ -32,7 +25,6 @@ impl SnekcloudServer {
Self {
inner: VentedServer::new(id, private_key, keys, get_settings().timeouts()),
listen_addresses: Vec::new(),
module_pool: HashMap::new(),
modules: HashMap::new(),
}
}
@ -48,30 +40,17 @@ impl SnekcloudServer {
self.inner.listen(address.clone())
}
let modules = mem::take(&mut self.modules);
let (tx, rx) = channel();
let tick_context = TickContext::new(self.inner.node_id(), tx, self.inner.nodes_ref());
let mut modules = mem::take(&mut self.modules).into_iter();
let (tx, rx) = channel(10);
let tick_context = RunContext::new(self.inner.node_id(), tx, self.inner.nodes_ref());
for (name, mut module) in modules {
self.module_pool
.get(&name)
.unwrap()
.lock()
.execute_at_fixed_rate(
Duration::from_millis(SERVER_TICK_RATE_MS),
Duration::from_millis(SERVER_TICK_RATE_MS),
{
let mut module_pool = ScheduledThreadPool::new(1);
let tick_context = TickContext::clone(&tick_context);
move || {
if let Err(e) =
module.tick(TickContext::clone(&tick_context), &mut module_pool)
{
while let Some((name, mut module)) = modules.next() {
let tick_context = RunContext::clone(&tick_context);
task::spawn(async move {
if let Err(e) = module.run(RunContext::clone(&tick_context)).await {
log::error!("Error when ticking module {}: {}", name, e);
}
}
},
);
});
}
task::block_on(self.handle_invocations(rx));
@ -81,14 +60,14 @@ impl SnekcloudServer {
/// Handles invocations
async fn handle_invocations(&self, rx: Receiver<EventInvocation>) {
for mut invocation in rx {
let result = self
.inner
.emit(invocation.target_node.clone(), invocation.event)
.await;
while let Ok(mut invocation) = rx.recv().await {
let inner = self.inner.clone();
task::spawn(async move {
let result = task::block_on(inner.emit(invocation.target_node, invocation.event));
invocation
.result
.result(result.map_err(SnekcloudError::from));
});
}
}
@ -97,10 +76,7 @@ impl SnekcloudServer {
&mut self,
mut module: impl Module + Send + Sync,
) -> SnekcloudResult<()> {
let module_pool = Arc::new(Mutex::new(ScheduledThreadPool::new(2)));
module.init(&mut self.inner, &mut module_pool.lock())?;
self.module_pool.insert(module.name(), module_pool);
module.init(&mut self.inner)?;
self.modules.insert(module.name(), module.boxed());
Ok(())

@ -1,14 +1,14 @@
use crate::utils::result::SnekcloudError;
use async_std::sync::Sender;
use parking_lot::Mutex;
use std::collections::HashMap;
use std::sync::mpsc::Sender;
use std::sync::Arc;
use vented::event::Event;
use vented::server::data::{Node, NodeData};
use vented::utils::sync::AsyncValue;
#[derive(Clone)]
pub struct TickContext {
pub struct RunContext {
nodes: Arc<Mutex<HashMap<String, NodeData>>>,
event_sender: Sender<EventInvocation>,
node_id: String,
@ -20,7 +20,7 @@ pub struct EventInvocation {
pub target_node: String,
}
impl TickContext {
impl RunContext {
pub fn new(
node_id: String,
sender: Sender<EventInvocation>,
@ -33,7 +33,7 @@ impl TickContext {
}
}
pub fn emit<S: ToString>(
pub async fn emit<S: ToString>(
&mut self,
target_node: S,
event: Event,
@ -43,9 +43,9 @@ impl TickContext {
.send(EventInvocation {
event,
target_node: target_node.to_string(),
result: AsyncValue::clone(&value),
result: value.clone(),
})
.unwrap();
.await;
value
}

@ -47,6 +47,8 @@ pub fn init_logger() {
)
.unwrap_or(LevelFilter::Info),
)
.level_for("async_io", log::LevelFilter::Info)
.level_for("polling", log::LevelFilter::Info)
.chain(std::io::stdout())
.chain(
fern::log_file(log_dir.join(PathBuf::from(format!(

Loading…
Cancel
Save