diff --git a/Cargo.toml b/Cargo.toml index 5522d73..c5254c0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,8 @@ edition = "2018" [dependencies] rmp = "0.8.9" rmp-serde = "0.14.4" -serde = {version = "1.0.117", features = ["serde_derive"]} +rmpv = "0.4.5" +serde = { version = "1.0.117", features = ["serde_derive"] } byteorder = "1.3.4" -parking_lot = "0.11.0" \ No newline at end of file +parking_lot = "0.11.0" +scheduled-thread-pool = "0.2.5" \ No newline at end of file diff --git a/src/event/mod.rs b/src/event/mod.rs index 6f46a82..580b4f9 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -1,9 +1,13 @@ -use serde::{Serialize, Deserialize}; -use crate::result::{VentedResult, VentedError}; -use byteorder::{BigEndian, ByteOrder, ReadBytesExt}; use std::io::Read; + +use byteorder::{BigEndian, ByteOrder, ReadBytesExt}; +use serde::{Deserialize, Serialize}; use serde::de::DeserializeOwned; +use crate::result::{VentedError, VentedResult}; + +pub trait GenericEvent {} + #[cfg(test)] mod tests; @@ -13,29 +17,26 @@ pub struct EmptyPayload {} /// A single event that has a name and payload. /// The payload is encoded with message pack #[derive(Clone, Debug)] -pub struct Event { +pub struct Event { pub name: String, - pub payload: T, + pub payload: Vec, } -impl Event { +impl Event { /// Creates a new Event with an empty payload pub fn new(name: String) -> Self { - Self { + Self { name, - payload: EmptyPayload {} + payload: Vec::with_capacity(0), } } } -impl Event where T: Serialize + DeserializeOwned { - +impl Event { /// Creates a new Event with a payload - pub fn with_payload(name: String, payload: T) -> Self { - Self { - name, - payload, - } + pub fn with_payload(name: String, payload: &T) -> Self { + let payload = rmp_serde::encode::to_vec(payload).unwrap(); + Self { name, payload } } /// Returns the byte representation for the message @@ -44,15 +45,14 @@ impl Event where T: Serialize + DeserializeOwned { /// `name`: `name-length`, /// `payload-length`: `u64`, /// `payload`: `payload-length`, - pub fn to_bytes(&self) -> VentedResult> { - let mut payload_raw = rmp_serde::to_vec(&self.payload)?; + pub fn as_bytes(&mut self) -> VentedResult> { let mut name_raw = self.name.as_bytes().to_vec(); let name_length = name_raw.len(); let mut name_length_raw = [0u8; 2]; BigEndian::write_u16(&mut name_length_raw, name_length as u16); - let payload_length = payload_raw.len(); + let payload_length = self.payload.len(); let mut payload_length_raw = [0u8; 8]; BigEndian::write_u64(&mut payload_length_raw, payload_length as u64); @@ -61,7 +61,7 @@ impl Event where T: Serialize + DeserializeOwned { data.append(&mut name_length_raw.to_vec()); data.append(&mut name_raw); data.append(&mut payload_length_raw.to_vec()); - data.append(&mut payload_raw); + data.append(&mut self.payload); Ok(data) } @@ -75,11 +75,19 @@ impl Event where T: Serialize + DeserializeOwned { let event_name = String::from_utf8(name_buf).map_err(|_| VentedError::NameDecodingError)?; let payload_length = bytes.read_u64::()?; - let payload = rmp_serde::from_read(bytes.take(payload_length))?; + let mut payload = vec![0u8; payload_length as usize]; + bytes.read_exact(&mut payload)?; Ok(Self { name: event_name, payload, }) } -} \ No newline at end of file + + /// Returns the payload of the event as a deserialized messagepack value + pub fn get_payload(&self) -> VentedResult { + let payload = rmp_serde::decode::from_read(&self.payload[..])?; + + Ok(payload) + } +} diff --git a/src/event/tests.rs b/src/event/tests.rs index 0fc5ca3..9f83578 100644 --- a/src/event/tests.rs +++ b/src/event/tests.rs @@ -1,11 +1,12 @@ use serde::{Deserialize, Serialize}; + use crate::event::Event; #[derive(PartialEq, Serialize, Deserialize, Clone, Debug)] struct SimplePayload { string: String, number: u32, - float: f32 + float: f32, } #[test] @@ -13,11 +14,11 @@ fn it_serializes_events() { let payload = SimplePayload { string: "test".to_string(), number: 7, - float: 2.1 + float: 2.1, }; let payload_raw = rmp_serde::to_vec(&payload).unwrap(); - let event = Event::with_payload("test".to_string(), payload); - let event_bytes = event.to_bytes().unwrap(); + let mut event = Event::with_payload("test".to_string(), &payload); + let event_bytes = event.as_bytes().unwrap(); assert_eq!(event_bytes[0..2], [0x00, 0x04]); assert_eq!(event_bytes[6..14], payload_raw.len().to_be_bytes()); @@ -28,12 +29,15 @@ fn it_deserializes_events() { let payload = SimplePayload { string: "test".to_string(), number: 7, - float: 2.1 + float: 2.1, }; - let event = Event::with_payload("test".to_string(), payload.clone()); - let event_bytes = event.to_bytes().unwrap(); + let mut event = Event::with_payload("test".to_string(), &payload); + let event_bytes = event.as_bytes().unwrap(); - let deserialized_event = Event::::from_bytes(&mut event_bytes.as_slice()).unwrap(); + let deserialized_event = Event::from_bytes(&mut event_bytes.as_slice()).unwrap(); assert_eq!(deserialized_event.name, "test".to_string()); - assert_eq!(deserialized_event.payload, payload); -} \ No newline at end of file + assert_eq!( + deserialized_event.get_payload::().unwrap(), + payload + ); +} diff --git a/src/event_handler/mod.rs b/src/event_handler/mod.rs index f792efd..00d9be2 100644 --- a/src/event_handler/mod.rs +++ b/src/event_handler/mod.rs @@ -1,20 +1,16 @@ -use crate::event::Event; -use serde::de::DeserializeOwned; -use serde::Serialize; use std::collections::HashMap; +use crate::event::Event; + #[cfg(test)] mod tests; /// A handler for events -pub struct EventHandler { - event_handlers: HashMap)>>>, +pub struct EventHandler { + event_handlers: HashMap Option + Send + Sync>>>, } -impl EventHandler -where - T: Serialize + DeserializeOwned, -{ +impl EventHandler { /// Creates a new vented event_handler pub fn new() -> Self { Self { @@ -24,8 +20,8 @@ where /// Adds a handler for the given event pub fn on(&mut self, event_name: &str, handler: F) - where - F: Fn(&Event), + where + F: Fn(Event) -> Option + Send + Sync, { match self.event_handlers.get_mut(event_name) { Some(handlers) => handlers.push(Box::new(handler)), @@ -37,9 +33,16 @@ where } /// Handles a single event - pub fn handle_event(&mut self, event: Event) -> bool { + pub fn handle_event(&mut self, event: Event) -> bool { if let Some(handlers) = self.event_handlers.get(&event.name) { - handlers.iter().for_each(|handler| handler(&event)); + let mut event = event; + for handler in handlers { + if let Some(e) = handler(event) { + event = e; + } else { + break; + } + } true } else { diff --git a/src/event_handler/tests.rs b/src/event_handler/tests.rs index e0b4822..5844e77 100644 --- a/src/event_handler/tests.rs +++ b/src/event_handler/tests.rs @@ -1,22 +1,43 @@ +use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; + use crate::event::Event; use crate::event_handler::EventHandler; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Arc; #[test] fn it_handles_events() { let mut handler = EventHandler::new(); let call_count = Arc::new(AtomicUsize::new(0)); + { + let call_count = Arc::clone(&call_count); + handler.on("test", move |event| { + call_count.fetch_add(1, Ordering::Relaxed); + + Some(event) + }); + } { let call_count = Arc::clone(&call_count); handler.on("test", move |_event| { call_count.fetch_add(1, Ordering::Relaxed); + + None + }); + } + { + let call_count = Arc::clone(&call_count); + handler.on("test2", move |_event| { + call_count.fetch_add(1, Ordering::Relaxed); + + None }); } { let call_count = Arc::clone(&call_count); handler.on("test2", move |_event| { call_count.fetch_add(1, Ordering::Relaxed); + + None }) } @@ -24,5 +45,5 @@ fn it_handles_events() { handler.handle_event(Event::new("test".to_string())); handler.handle_event(Event::new("test2".to_string())); - assert_eq!(call_count.load(Ordering::Relaxed), 3) + assert_eq!(call_count.load(Ordering::Relaxed), 5) } diff --git a/src/lib.rs b/src/lib.rs index ff3d44d..e879237 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,4 @@ pub mod event; pub mod event_handler; pub mod result; +pub mod server; diff --git a/src/result.rs b/src/result.rs index 72ad54e..d2682e4 100644 --- a/src/result.rs +++ b/src/result.rs @@ -40,4 +40,4 @@ impl From for VentedError { fn from(other: rmp_serde::decode::Error) -> Self { Self::DeserializeError(other) } -} \ No newline at end of file +} diff --git a/src/server/mod.rs b/src/server/mod.rs new file mode 100644 index 0000000..89267f1 --- /dev/null +++ b/src/server/mod.rs @@ -0,0 +1,11 @@ +use crate::event::Event; +use crate::result::VentedResult; + +pub mod tcp; + +pub trait VentedServer { + fn listen(&mut self, address: &str) -> VentedResult<()>; + fn register_handler(&mut self, event_name: &str, handler: F) + where + F: Fn(Event) -> Option + Send + Sync; +} diff --git a/src/server/tcp/mod.rs b/src/server/tcp/mod.rs new file mode 100644 index 0000000..a947bf8 --- /dev/null +++ b/src/server/tcp/mod.rs @@ -0,0 +1,52 @@ +use std::borrow::BorrowMut; +use std::net::{TcpListener, TcpStream}; +use std::sync::Arc; + +use parking_lot::Mutex; +use scheduled_thread_pool::ScheduledThreadPool; + +use crate::event::Event; +use crate::event_handler::EventHandler; +use crate::result::VentedResult; +use crate::server::VentedServer; + +pub struct VentedTcpServer { + event_handler: Arc>, + pool: ScheduledThreadPool, +} + +impl VentedServer for VentedTcpServer { + fn listen(&mut self, address: &str) -> VentedResult<()> { + let listener = TcpListener::bind(address)?; + for stream in listener.incoming() { + match stream { + Ok(stream) => self.handle_connection(stream), + Err(_) => {} + } + } + + Ok(()) + } + + /// Registers an event on the internal event handler + fn register_handler(&mut self, event_name: &str, handler: F) + where + F: Fn(Event) -> Option + Send + Sync, + { + self.event_handler + .lock() + .borrow_mut() + .on(event_name, handler); + } +} + +impl VentedTcpServer { + fn handle_connection(&mut self, mut stream: TcpStream) { + let handler = Arc::clone(&self.event_handler); + self.pool.execute(move || { + if let Ok(event) = Event::from_bytes(&mut stream) { + handler.lock().handle_event(event); + } + }); + } +}