Change emit to be synchronous to avoid too many threads

Signed-off-by: trivernis <trivernis@protonmail.com>
pull/1/head
trivernis 4 years ago
parent 9a556309e4
commit 52f9cc1cd1
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

@ -1,7 +1,7 @@
[package] [package]
name = "vented" name = "vented"
description = "Event driven encrypted tcp communicaton" description = "Event driven encrypted tcp communicaton"
version = "0.6.4" version = "0.7.0"
authors = ["trivernis <trivernis@protonmail.com>"] authors = ["trivernis <trivernis@protonmail.com>"]
edition = "2018" edition = "2018"
readme = "README.md" readme = "README.md"

@ -45,5 +45,5 @@ fn it_handles_events() {
handler.handle_event(Event::new("test".to_string())); handler.handle_event(Event::new("test".to_string()));
handler.handle_event(Event::new("test2".to_string())); handler.handle_event(Event::new("test2".to_string()));
assert_eq!(call_count.load(Ordering::Relaxed), 5) assert_eq!(call_count.load(Ordering::Relaxed), 6)
} }

@ -41,6 +41,7 @@ type CryptoStreamMap = Arc<Mutex<HashMap<String, CryptoStream>>>;
/// use rand::thread_rng; /// use rand::thread_rng;
/// use vented::event::Event; /// use vented::event::Event;
/// ///
/// let global_secret_b = SecretKey::generate(&mut thread_rng());
/// let nodes = vec![ /// let nodes = vec![
/// Node { /// Node {
/// id: "B".to_string(), /// id: "B".to_string(),
@ -51,7 +52,7 @@ type CryptoStreamMap = Arc<Mutex<HashMap<String, CryptoStream>>>;
///]; ///];
/// // in a real world example the secret key needs to be loaded from somewhere because connections /// // in a real world example the secret key needs to be loaded from somewhere because connections
/// // with unknown keys are not accepted. /// // with unknown keys are not accepted.
/// let global_secret = SecretKey::new(&mut thread_rng()); /// let global_secret = SecretKey::generate(&mut thread_rng());
/// let mut server = VentedServer::new("A".to_string(), global_secret, nodes.clone(), 4); /// let mut server = VentedServer::new("A".to_string(), global_secret, nodes.clone(), 4);
/// ///
/// ///
@ -61,14 +62,13 @@ type CryptoStreamMap = Arc<Mutex<HashMap<String, CryptoStream>>>;
/// ///
/// None // the return value is the response event Option<Event> /// None // the return value is the response event Option<Event>
/// }); /// });
/// server.emit("B".to_string(), Event::new("ping".to_string())).unwrap(); /// assert!(server.emit("B".to_string(), Event::new("ping".to_string())).is_err()) // this won't work without a known node B
/// ``` /// ```
pub struct VentedServer { pub struct VentedServer {
connections: CryptoStreamMap, connections: CryptoStreamMap,
forwarded_connections: ForwardFutureVector, forwarded_connections: ForwardFutureVector,
known_nodes: Arc<Mutex<HashMap<String, Node>>>, known_nodes: Arc<Mutex<HashMap<String, Node>>>,
listener_pool: Arc<Mutex<ScheduledThreadPool>>, pool: Arc<Mutex<ScheduledThreadPool>>,
sender_pool: Arc<Mutex<ScheduledThreadPool>>,
event_handler: Arc<Mutex<EventHandler>>, event_handler: Arc<Mutex<EventHandler>>,
global_secret_key: SecretKey, global_secret_key: SecretKey,
node_id: String, node_id: String,
@ -79,7 +79,6 @@ impl VentedServer {
/// Creates a new vented server with a given node_id and secret key that are /// Creates a new vented server with a given node_id and secret key that are
/// used to authenticate against other servers. /// used to authenticate against other servers.
/// The given nodes are used for authentication. /// The given nodes are used for authentication.
/// The server runs with 2x the given amount of threads.
pub fn new( pub fn new(
node_id: String, node_id: String,
secret_key: SecretKey, secret_key: SecretKey,
@ -89,12 +88,8 @@ impl VentedServer {
let mut server = Self { let mut server = Self {
node_id, node_id,
event_handler: Arc::new(Mutex::new(EventHandler::new())), event_handler: Arc::new(Mutex::new(EventHandler::new())),
listener_pool: Arc::new(Mutex::new(ScheduledThreadPool::with_name( pool: Arc::new(Mutex::new(ScheduledThreadPool::with_name(
"vented_listeners", "vented",
num_threads,
))),
sender_pool: Arc::new(Mutex::new(ScheduledThreadPool::with_name(
"vented_senders",
num_threads, num_threads,
))), ))),
connections: Arc::new(Mutex::new(HashMap::new())), connections: Arc::new(Mutex::new(HashMap::new())),
@ -123,20 +118,14 @@ impl VentedServer {
/// Emits an event to the specified Node /// Emits an event to the specified Node
/// The actual writing is done in a separate thread from the thread pool. /// The actual writing is done in a separate thread from the thread pool.
/// With the returned wait group one can wait for the event to be written. /// With the returned wait group one can wait for the event to be written.
pub fn emit(&self, node_id: String, event: Event) -> VentedResult<WaitGroup> { pub fn emit(&self, node_id: String, event: Event) -> VentedResult<()> {
let wg = WaitGroup::new();
if let Ok(stream) = self.get_connection(&node_id) { if let Ok(stream) = self.get_connection(&node_id) {
self.sender_pool.lock().execute({
let wg = WaitGroup::clone(&wg);
let connections = Arc::clone(&self.connections);
move || {
if let Err(e) = stream.send(event) { if let Err(e) = stream.send(event) {
log::error!("Failed to send event: {}", e); log::error!("Failed to send event: {}", e);
connections.lock().remove(stream.receiver_node()); self.connections.lock().remove(stream.receiver_node());
}
std::mem::drop(wg); return Err(e);
} }
});
} else { } else {
log::trace!( log::trace!(
"Trying to redirect the event to a different node to be sent to target node..." "Trying to redirect the event to a different node to be sent to target node..."
@ -144,7 +133,7 @@ impl VentedServer {
self.send_event_redirected(node_id, event)?; self.send_event_redirected(node_id, event)?;
} }
Ok(wg) Ok(())
} }
/// Adds a handler for the given event. /// Adds a handler for the given event.
@ -216,7 +205,7 @@ impl VentedServer {
known_nodes: Arc::clone(&self.known_nodes), known_nodes: Arc::clone(&self.known_nodes),
connections: Arc::clone(&self.connections), connections: Arc::clone(&self.connections),
event_handler: Arc::clone(&self.event_handler), event_handler: Arc::clone(&self.event_handler),
listener_pool: Arc::clone(&self.listener_pool), listener_pool: Arc::clone(&self.pool),
forwarded_connections: Arc::clone(&self.forwarded_connections), forwarded_connections: Arc::clone(&self.forwarded_connections),
} }
} }
@ -368,7 +357,7 @@ impl VentedServer {
let connections = Arc::clone(&context.connections); let connections = Arc::clone(&context.connections);
let stream = Self::get_crypto_stream(context.clone(), stream)?; let stream = Self::get_crypto_stream(context.clone(), stream)?;
self.listener_pool.lock().execute({ self.pool.lock().execute({
let stream = CryptoStream::clone(&stream); let stream = CryptoStream::clone(&stream);
let event_handler = Arc::clone(&self.event_handler); let event_handler = Arc::clone(&self.event_handler);

@ -142,7 +142,7 @@ impl VentedServer {
self.on(REDIRECT_REDIRECTED_EVENT, { self.on(REDIRECT_REDIRECTED_EVENT, {
let event_handler = Arc::clone(&self.event_handler); let event_handler = Arc::clone(&self.event_handler);
let connections = Arc::clone(&self.connections); let connections = Arc::clone(&self.connections);
let pool = Arc::clone(&self.sender_pool); let pool = Arc::clone(&self.pool);
let known_nodes = Arc::clone(&self.known_nodes); let known_nodes = Arc::clone(&self.known_nodes);
move |event| { move |event| {

@ -98,24 +98,20 @@ fn test_server_communication() {
} }
}); });
server_b.request_node_list().unwrap(); server_b.request_node_list().unwrap();
let wg = server_c server_c
.emit("A".to_string(), Event::new("ping".to_string())) .emit("A".to_string(), Event::new("ping".to_string()))
.unwrap(); .unwrap();
wg.wait();
for _ in 0..9 { for _ in 0..9 {
let wg = server_b server_b
.emit("A".to_string(), Event::new("ping".to_string())) .emit("A".to_string(), Event::new("ping".to_string()))
.unwrap(); .unwrap();
wg.wait();
} }
let wg = server_a server_a
.emit("B".to_string(), Event::new("pong".to_string())) .emit("B".to_string(), Event::new("pong".to_string()))
.unwrap(); .unwrap();
wg.wait(); server_b
let wg = server_b
.emit("C".to_string(), Event::new("ping".to_string())) .emit("C".to_string(), Event::new("ping".to_string()))
.unwrap(); .unwrap();
wg.wait();
// wait one second to make sure the servers were able to process the events // wait one second to make sure the servers were able to process the events
for _ in 0..100 { for _ in 0..100 {

Loading…
Cancel
Save