Add response event generation to event handler

Signed-off-by: trivernis <trivernis@protonmail.com>
pull/1/head
trivernis 4 years ago
parent e6edb20b7e
commit 46c2b6266d
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

@ -14,3 +14,4 @@ serde = { version = "1.0.117", features = ["serde_derive"] }
byteorder = "1.3.4" byteorder = "1.3.4"
parking_lot = "0.11.0" parking_lot = "0.11.0"
scheduled-thread-pool = "0.2.5" scheduled-thread-pool = "0.2.5"
log = "0.4.11"

@ -1,8 +1,8 @@
use std::io::Read; use std::io::Read;
use byteorder::{BigEndian, ByteOrder, ReadBytesExt}; use byteorder::{BigEndian, ByteOrder, ReadBytesExt};
use serde::{Deserialize, Serialize};
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use crate::result::{VentedError, VentedResult}; use crate::result::{VentedError, VentedResult};
@ -45,7 +45,7 @@ impl Event {
/// `name`: `name-length`, /// `name`: `name-length`,
/// `payload-length`: `u64`, /// `payload-length`: `u64`,
/// `payload`: `payload-length`, /// `payload`: `payload-length`,
pub fn as_bytes(&mut self) -> VentedResult<Vec<u8>> { pub fn as_bytes(&mut self) -> Vec<u8> {
let mut name_raw = self.name.as_bytes().to_vec(); let mut name_raw = self.name.as_bytes().to_vec();
let name_length = name_raw.len(); let name_length = name_raw.len();
@ -63,7 +63,7 @@ impl Event {
data.append(&mut payload_length_raw.to_vec()); data.append(&mut payload_length_raw.to_vec());
data.append(&mut self.payload); data.append(&mut self.payload);
Ok(data) data
} }
/// Deserializes the message from bytes that can be read from the given reader /// Deserializes the message from bytes that can be read from the given reader

@ -18,7 +18,7 @@ fn it_serializes_events() {
}; };
let payload_raw = rmp_serde::to_vec(&payload).unwrap(); let payload_raw = rmp_serde::to_vec(&payload).unwrap();
let mut event = Event::with_payload("test".to_string(), &payload); let mut event = Event::with_payload("test".to_string(), &payload);
let event_bytes = event.as_bytes().unwrap(); let event_bytes = event.as_bytes();
assert_eq!(event_bytes[0..2], [0x00, 0x04]); assert_eq!(event_bytes[0..2], [0x00, 0x04]);
assert_eq!(event_bytes[6..14], payload_raw.len().to_be_bytes()); assert_eq!(event_bytes[6..14], payload_raw.len().to_be_bytes());
@ -32,7 +32,7 @@ fn it_deserializes_events() {
float: 2.1, float: 2.1,
}; };
let mut event = Event::with_payload("test".to_string(), &payload); let mut event = Event::with_payload("test".to_string(), &payload);
let event_bytes = event.as_bytes().unwrap(); let event_bytes = event.as_bytes();
let deserialized_event = Event::from_bytes(&mut event_bytes.as_slice()).unwrap(); let deserialized_event = Event::from_bytes(&mut event_bytes.as_slice()).unwrap();
assert_eq!(deserialized_event.name, "test".to_string()); assert_eq!(deserialized_event.name, "test".to_string());

@ -20,8 +20,8 @@ impl EventHandler {
/// Adds a handler for the given event /// Adds a handler for the given event
pub fn on<F: 'static>(&mut self, event_name: &str, handler: F) pub fn on<F: 'static>(&mut self, event_name: &str, handler: F)
where where
F: Fn(Event) -> Option<Event> + Send + Sync, F: Fn(Event) -> Option<Event> + Send + Sync,
{ {
match self.event_handlers.get_mut(event_name) { match self.event_handlers.get_mut(event_name) {
Some(handlers) => handlers.push(Box::new(handler)), Some(handlers) => handlers.push(Box::new(handler)),
@ -33,20 +33,21 @@ impl EventHandler {
} }
/// Handles a single event /// Handles a single event
pub fn handle_event(&mut self, event: Event) -> bool { pub fn handle_event(&mut self, event: Event) -> Option<Event> {
if let Some(handlers) = self.event_handlers.get(&event.name) { if let Some(handlers) = self.event_handlers.get(&event.name) {
let mut event = event; let mut event = Some(event);
for handler in handlers { for handler in handlers {
if let Some(e) = handler(event) { if let Some(e) = handler(event.unwrap()) {
event = e; event = Some(e);
} else { } else {
event = None;
break; break;
} }
} }
true event
} else { } else {
false None
} }
} }
} }

@ -1,5 +1,5 @@
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use crate::event::Event; use crate::event::Event;
use crate::event_handler::EventHandler; use crate::event_handler::EventHandler;

@ -1,6 +1,7 @@
use crate::event::Event; use crate::event::Event;
use crate::result::VentedResult; use crate::result::VentedResult;
pub(crate) mod server_events;
pub mod tcp; pub mod tcp;
pub trait VentedServer { pub trait VentedServer {

@ -0,0 +1,7 @@
use crate::event_handler::EventHandler;
pub(crate) fn get_server_event_handler() -> EventHandler {
let handler = EventHandler::new();
handler
}

@ -8,7 +8,9 @@ use scheduled_thread_pool::ScheduledThreadPool;
use crate::event::Event; use crate::event::Event;
use crate::event_handler::EventHandler; use crate::event_handler::EventHandler;
use crate::result::VentedResult; use crate::result::VentedResult;
use crate::server::server_events::get_server_event_handler;
use crate::server::VentedServer; use crate::server::VentedServer;
use std::io::Write;
pub struct VentedTcpServer { pub struct VentedTcpServer {
event_handler: Arc<Mutex<EventHandler>>, event_handler: Arc<Mutex<EventHandler>>,
@ -16,12 +18,14 @@ pub struct VentedTcpServer {
} }
impl VentedServer for VentedTcpServer { impl VentedServer for VentedTcpServer {
/// Starts listening on the given address
fn listen(&mut self, address: &str) -> VentedResult<()> { fn listen(&mut self, address: &str) -> VentedResult<()> {
let listener = TcpListener::bind(address)?; let listener = TcpListener::bind(address)?;
for stream in listener.incoming() { for stream in listener.incoming() {
log::trace!("Connection received.");
match stream { match stream {
Ok(stream) => self.handle_connection(stream), Ok(stream) => self.handle_connection(stream),
Err(_) => {} Err(e) => log::error!("Failed to handle connection: {}", e),
} }
} }
@ -41,11 +45,29 @@ impl VentedServer for VentedTcpServer {
} }
impl VentedTcpServer { impl VentedTcpServer {
/// Creates a new server that runs on the specified number of threads
pub fn new(num_threads: usize) -> Self {
let event_handler = get_server_event_handler();
let pool = ScheduledThreadPool::new(num_threads);
Self {
event_handler: Arc::new(Mutex::new(event_handler)),
pool,
}
}
/// Handles what happens on connection
fn handle_connection(&mut self, mut stream: TcpStream) { fn handle_connection(&mut self, mut stream: TcpStream) {
let handler = Arc::clone(&self.event_handler); let handler = Arc::clone(&self.event_handler);
self.pool.execute(move || { self.pool.execute(move || {
if let Ok(event) = Event::from_bytes(&mut stream) { if let Ok(event) = Event::from_bytes(&mut stream) {
handler.lock().handle_event(event); if let Some(mut event) = handler.lock().handle_event(event) {
if let Err(e) = stream.write(&event.as_bytes()) {
log::error!("Failed to respond to event: {}", e)
}
}
} else {
log::warn!("Failed to create an Event from received bytes.")
} }
}); });
} }

Loading…
Cancel
Save