From 46c2b6266dae37463a1c7c1c7481822a63ea80ad Mon Sep 17 00:00:00 2001 From: trivernis Date: Mon, 2 Nov 2020 21:18:06 +0100 Subject: [PATCH] Add response event generation to event handler Signed-off-by: trivernis --- Cargo.toml | 3 ++- src/event/mod.rs | 6 +++--- src/event/tests.rs | 4 ++-- src/event_handler/mod.rs | 17 +++++++++-------- src/event_handler/tests.rs | 2 +- src/server/mod.rs | 1 + src/server/server_events.rs | 7 +++++++ src/server/tcp/mod.rs | 26 ++++++++++++++++++++++++-- 8 files changed, 49 insertions(+), 17 deletions(-) create mode 100644 src/server/server_events.rs diff --git a/Cargo.toml b/Cargo.toml index c5254c0..64bc48e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,4 +13,5 @@ rmpv = "0.4.5" serde = { version = "1.0.117", features = ["serde_derive"] } byteorder = "1.3.4" parking_lot = "0.11.0" -scheduled-thread-pool = "0.2.5" \ No newline at end of file +scheduled-thread-pool = "0.2.5" +log = "0.4.11" \ No newline at end of file diff --git a/src/event/mod.rs b/src/event/mod.rs index 580b4f9..a38f76b 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -1,8 +1,8 @@ use std::io::Read; use byteorder::{BigEndian, ByteOrder, ReadBytesExt}; -use serde::{Deserialize, Serialize}; use serde::de::DeserializeOwned; +use serde::{Deserialize, Serialize}; use crate::result::{VentedError, VentedResult}; @@ -45,7 +45,7 @@ impl Event { /// `name`: `name-length`, /// `payload-length`: `u64`, /// `payload`: `payload-length`, - pub fn as_bytes(&mut self) -> VentedResult> { + pub fn as_bytes(&mut self) -> Vec { let mut name_raw = self.name.as_bytes().to_vec(); let name_length = name_raw.len(); @@ -63,7 +63,7 @@ impl Event { data.append(&mut payload_length_raw.to_vec()); data.append(&mut self.payload); - Ok(data) + data } /// Deserializes the message from bytes that can be read from the given reader diff --git a/src/event/tests.rs b/src/event/tests.rs index 9f83578..6f6c9e9 100644 --- a/src/event/tests.rs +++ b/src/event/tests.rs @@ -18,7 +18,7 @@ fn it_serializes_events() { }; let payload_raw = rmp_serde::to_vec(&payload).unwrap(); let mut event = Event::with_payload("test".to_string(), &payload); - let event_bytes = event.as_bytes().unwrap(); + let event_bytes = event.as_bytes(); assert_eq!(event_bytes[0..2], [0x00, 0x04]); assert_eq!(event_bytes[6..14], payload_raw.len().to_be_bytes()); @@ -32,7 +32,7 @@ fn it_deserializes_events() { float: 2.1, }; let mut event = Event::with_payload("test".to_string(), &payload); - let event_bytes = event.as_bytes().unwrap(); + let event_bytes = event.as_bytes(); let deserialized_event = Event::from_bytes(&mut event_bytes.as_slice()).unwrap(); assert_eq!(deserialized_event.name, "test".to_string()); diff --git a/src/event_handler/mod.rs b/src/event_handler/mod.rs index 00d9be2..cea2fda 100644 --- a/src/event_handler/mod.rs +++ b/src/event_handler/mod.rs @@ -20,8 +20,8 @@ impl EventHandler { /// Adds a handler for the given event pub fn on(&mut self, event_name: &str, handler: F) - where - F: Fn(Event) -> Option + Send + Sync, + where + F: Fn(Event) -> Option + Send + Sync, { match self.event_handlers.get_mut(event_name) { Some(handlers) => handlers.push(Box::new(handler)), @@ -33,20 +33,21 @@ impl EventHandler { } /// Handles a single event - pub fn handle_event(&mut self, event: Event) -> bool { + pub fn handle_event(&mut self, event: Event) -> Option { if let Some(handlers) = self.event_handlers.get(&event.name) { - let mut event = event; + let mut event = Some(event); for handler in handlers { - if let Some(e) = handler(event) { - event = e; + if let Some(e) = handler(event.unwrap()) { + event = Some(e); } else { + event = None; break; } } - true + event } else { - false + None } } } diff --git a/src/event_handler/tests.rs b/src/event_handler/tests.rs index 5844e77..fcfd7d8 100644 --- a/src/event_handler/tests.rs +++ b/src/event_handler/tests.rs @@ -1,5 +1,5 @@ -use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; use crate::event::Event; use crate::event_handler::EventHandler; diff --git a/src/server/mod.rs b/src/server/mod.rs index 89267f1..1b7b589 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -1,6 +1,7 @@ use crate::event::Event; use crate::result::VentedResult; +pub(crate) mod server_events; pub mod tcp; pub trait VentedServer { diff --git a/src/server/server_events.rs b/src/server/server_events.rs new file mode 100644 index 0000000..96a6651 --- /dev/null +++ b/src/server/server_events.rs @@ -0,0 +1,7 @@ +use crate::event_handler::EventHandler; + +pub(crate) fn get_server_event_handler() -> EventHandler { + let handler = EventHandler::new(); + + handler +} diff --git a/src/server/tcp/mod.rs b/src/server/tcp/mod.rs index a947bf8..71fb8c4 100644 --- a/src/server/tcp/mod.rs +++ b/src/server/tcp/mod.rs @@ -8,7 +8,9 @@ use scheduled_thread_pool::ScheduledThreadPool; use crate::event::Event; use crate::event_handler::EventHandler; use crate::result::VentedResult; +use crate::server::server_events::get_server_event_handler; use crate::server::VentedServer; +use std::io::Write; pub struct VentedTcpServer { event_handler: Arc>, @@ -16,12 +18,14 @@ pub struct VentedTcpServer { } impl VentedServer for VentedTcpServer { + /// Starts listening on the given address fn listen(&mut self, address: &str) -> VentedResult<()> { let listener = TcpListener::bind(address)?; for stream in listener.incoming() { + log::trace!("Connection received."); match stream { Ok(stream) => self.handle_connection(stream), - Err(_) => {} + Err(e) => log::error!("Failed to handle connection: {}", e), } } @@ -41,11 +45,29 @@ impl VentedServer for VentedTcpServer { } impl VentedTcpServer { + /// Creates a new server that runs on the specified number of threads + pub fn new(num_threads: usize) -> Self { + let event_handler = get_server_event_handler(); + let pool = ScheduledThreadPool::new(num_threads); + + Self { + event_handler: Arc::new(Mutex::new(event_handler)), + pool, + } + } + + /// Handles what happens on connection fn handle_connection(&mut self, mut stream: TcpStream) { let handler = Arc::clone(&self.event_handler); self.pool.execute(move || { if let Ok(event) = Event::from_bytes(&mut stream) { - handler.lock().handle_event(event); + if let Some(mut event) = handler.lock().handle_event(event) { + if let Err(e) = stream.write(&event.as_bytes()) { + log::error!("Failed to respond to event: {}", e) + } + } + } else { + log::warn!("Failed to create an Event from received bytes.") } }); }