Add tcp server implemenation and change event to store generic payloads

Signed-off-by: trivernis <trivernis@protonmail.com>
pull/1/head
trivernis 4 years ago
parent c550acc129
commit e6edb20b7e
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

@ -9,6 +9,8 @@ edition = "2018"
[dependencies] [dependencies]
rmp = "0.8.9" rmp = "0.8.9"
rmp-serde = "0.14.4" rmp-serde = "0.14.4"
rmpv = "0.4.5"
serde = { version = "1.0.117", features = ["serde_derive"] } serde = { version = "1.0.117", features = ["serde_derive"] }
byteorder = "1.3.4" byteorder = "1.3.4"
parking_lot = "0.11.0" parking_lot = "0.11.0"
scheduled-thread-pool = "0.2.5"

@ -1,9 +1,13 @@
use serde::{Serialize, Deserialize};
use crate::result::{VentedResult, VentedError};
use byteorder::{BigEndian, ByteOrder, ReadBytesExt};
use std::io::Read; use std::io::Read;
use byteorder::{BigEndian, ByteOrder, ReadBytesExt};
use serde::{Deserialize, Serialize};
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use crate::result::{VentedError, VentedResult};
pub trait GenericEvent {}
#[cfg(test)] #[cfg(test)]
mod tests; mod tests;
@ -13,29 +17,26 @@ pub struct EmptyPayload {}
/// A single event that has a name and payload. /// A single event that has a name and payload.
/// The payload is encoded with message pack /// The payload is encoded with message pack
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct Event<T> { pub struct Event {
pub name: String, pub name: String,
pub payload: T, pub payload: Vec<u8>,
} }
impl Event<EmptyPayload> { impl Event {
/// Creates a new Event with an empty payload /// Creates a new Event with an empty payload
pub fn new(name: String) -> Self { pub fn new(name: String) -> Self {
Self { Self {
name, name,
payload: EmptyPayload {} payload: Vec::with_capacity(0),
} }
} }
} }
impl<T> Event<T> where T: Serialize + DeserializeOwned { impl Event {
/// Creates a new Event with a payload /// Creates a new Event with a payload
pub fn with_payload(name: String, payload: T) -> Self { pub fn with_payload<T: Serialize>(name: String, payload: &T) -> Self {
Self { let payload = rmp_serde::encode::to_vec(payload).unwrap();
name, Self { name, payload }
payload,
}
} }
/// Returns the byte representation for the message /// Returns the byte representation for the message
@ -44,15 +45,14 @@ impl<T> Event<T> where T: Serialize + DeserializeOwned {
/// `name`: `name-length`, /// `name`: `name-length`,
/// `payload-length`: `u64`, /// `payload-length`: `u64`,
/// `payload`: `payload-length`, /// `payload`: `payload-length`,
pub fn to_bytes(&self) -> VentedResult<Vec<u8>> { pub fn as_bytes(&mut self) -> VentedResult<Vec<u8>> {
let mut payload_raw = rmp_serde::to_vec(&self.payload)?;
let mut name_raw = self.name.as_bytes().to_vec(); let mut name_raw = self.name.as_bytes().to_vec();
let name_length = name_raw.len(); let name_length = name_raw.len();
let mut name_length_raw = [0u8; 2]; let mut name_length_raw = [0u8; 2];
BigEndian::write_u16(&mut name_length_raw, name_length as u16); BigEndian::write_u16(&mut name_length_raw, name_length as u16);
let payload_length = payload_raw.len(); let payload_length = self.payload.len();
let mut payload_length_raw = [0u8; 8]; let mut payload_length_raw = [0u8; 8];
BigEndian::write_u64(&mut payload_length_raw, payload_length as u64); BigEndian::write_u64(&mut payload_length_raw, payload_length as u64);
@ -61,7 +61,7 @@ impl<T> Event<T> where T: Serialize + DeserializeOwned {
data.append(&mut name_length_raw.to_vec()); data.append(&mut name_length_raw.to_vec());
data.append(&mut name_raw); data.append(&mut name_raw);
data.append(&mut payload_length_raw.to_vec()); data.append(&mut payload_length_raw.to_vec());
data.append(&mut payload_raw); data.append(&mut self.payload);
Ok(data) Ok(data)
} }
@ -75,11 +75,19 @@ impl<T> Event<T> where T: Serialize + DeserializeOwned {
let event_name = String::from_utf8(name_buf).map_err(|_| VentedError::NameDecodingError)?; let event_name = String::from_utf8(name_buf).map_err(|_| VentedError::NameDecodingError)?;
let payload_length = bytes.read_u64::<BigEndian>()?; let payload_length = bytes.read_u64::<BigEndian>()?;
let payload = rmp_serde::from_read(bytes.take(payload_length))?; let mut payload = vec![0u8; payload_length as usize];
bytes.read_exact(&mut payload)?;
Ok(Self { Ok(Self {
name: event_name, name: event_name,
payload, payload,
}) })
} }
/// Returns the payload of the event as a deserialized messagepack value
pub fn get_payload<T: DeserializeOwned>(&self) -> VentedResult<T> {
let payload = rmp_serde::decode::from_read(&self.payload[..])?;
Ok(payload)
}
} }

@ -1,11 +1,12 @@
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use crate::event::Event; use crate::event::Event;
#[derive(PartialEq, Serialize, Deserialize, Clone, Debug)] #[derive(PartialEq, Serialize, Deserialize, Clone, Debug)]
struct SimplePayload { struct SimplePayload {
string: String, string: String,
number: u32, number: u32,
float: f32 float: f32,
} }
#[test] #[test]
@ -13,11 +14,11 @@ fn it_serializes_events() {
let payload = SimplePayload { let payload = SimplePayload {
string: "test".to_string(), string: "test".to_string(),
number: 7, number: 7,
float: 2.1 float: 2.1,
}; };
let payload_raw = rmp_serde::to_vec(&payload).unwrap(); let payload_raw = rmp_serde::to_vec(&payload).unwrap();
let event = Event::with_payload("test".to_string(), payload); let mut event = Event::with_payload("test".to_string(), &payload);
let event_bytes = event.to_bytes().unwrap(); let event_bytes = event.as_bytes().unwrap();
assert_eq!(event_bytes[0..2], [0x00, 0x04]); assert_eq!(event_bytes[0..2], [0x00, 0x04]);
assert_eq!(event_bytes[6..14], payload_raw.len().to_be_bytes()); assert_eq!(event_bytes[6..14], payload_raw.len().to_be_bytes());
@ -28,12 +29,15 @@ fn it_deserializes_events() {
let payload = SimplePayload { let payload = SimplePayload {
string: "test".to_string(), string: "test".to_string(),
number: 7, number: 7,
float: 2.1 float: 2.1,
}; };
let event = Event::with_payload("test".to_string(), payload.clone()); let mut event = Event::with_payload("test".to_string(), &payload);
let event_bytes = event.to_bytes().unwrap(); let event_bytes = event.as_bytes().unwrap();
let deserialized_event = Event::<SimplePayload>::from_bytes(&mut event_bytes.as_slice()).unwrap(); let deserialized_event = Event::from_bytes(&mut event_bytes.as_slice()).unwrap();
assert_eq!(deserialized_event.name, "test".to_string()); assert_eq!(deserialized_event.name, "test".to_string());
assert_eq!(deserialized_event.payload, payload); assert_eq!(
deserialized_event.get_payload::<SimplePayload>().unwrap(),
payload
);
} }

@ -1,20 +1,16 @@
use crate::event::Event;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::collections::HashMap; use std::collections::HashMap;
use crate::event::Event;
#[cfg(test)] #[cfg(test)]
mod tests; mod tests;
/// A handler for events /// A handler for events
pub struct EventHandler<T> { pub struct EventHandler {
event_handlers: HashMap<String, Vec<Box<dyn Fn(&Event<T>)>>>, event_handlers: HashMap<String, Vec<Box<dyn Fn(Event) -> Option<Event> + Send + Sync>>>,
} }
impl<T> EventHandler<T> impl EventHandler {
where
T: Serialize + DeserializeOwned,
{
/// Creates a new vented event_handler /// Creates a new vented event_handler
pub fn new() -> Self { pub fn new() -> Self {
Self { Self {
@ -25,7 +21,7 @@ where
/// Adds a handler for the given event /// Adds a handler for the given event
pub fn on<F: 'static>(&mut self, event_name: &str, handler: F) pub fn on<F: 'static>(&mut self, event_name: &str, handler: F)
where where
F: Fn(&Event<T>), F: Fn(Event) -> Option<Event> + Send + Sync,
{ {
match self.event_handlers.get_mut(event_name) { match self.event_handlers.get_mut(event_name) {
Some(handlers) => handlers.push(Box::new(handler)), Some(handlers) => handlers.push(Box::new(handler)),
@ -37,9 +33,16 @@ where
} }
/// Handles a single event /// Handles a single event
pub fn handle_event(&mut self, event: Event<T>) -> bool { pub fn handle_event(&mut self, event: Event) -> bool {
if let Some(handlers) = self.event_handlers.get(&event.name) { if let Some(handlers) = self.event_handlers.get(&event.name) {
handlers.iter().for_each(|handler| handler(&event)); let mut event = event;
for handler in handlers {
if let Some(e) = handler(event) {
event = e;
} else {
break;
}
}
true true
} else { } else {

@ -1,22 +1,43 @@
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use crate::event::Event; use crate::event::Event;
use crate::event_handler::EventHandler; use crate::event_handler::EventHandler;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
#[test] #[test]
fn it_handles_events() { fn it_handles_events() {
let mut handler = EventHandler::new(); let mut handler = EventHandler::new();
let call_count = Arc::new(AtomicUsize::new(0)); let call_count = Arc::new(AtomicUsize::new(0));
{
let call_count = Arc::clone(&call_count);
handler.on("test", move |event| {
call_count.fetch_add(1, Ordering::Relaxed);
Some(event)
});
}
{ {
let call_count = Arc::clone(&call_count); let call_count = Arc::clone(&call_count);
handler.on("test", move |_event| { handler.on("test", move |_event| {
call_count.fetch_add(1, Ordering::Relaxed); call_count.fetch_add(1, Ordering::Relaxed);
None
});
}
{
let call_count = Arc::clone(&call_count);
handler.on("test2", move |_event| {
call_count.fetch_add(1, Ordering::Relaxed);
None
}); });
} }
{ {
let call_count = Arc::clone(&call_count); let call_count = Arc::clone(&call_count);
handler.on("test2", move |_event| { handler.on("test2", move |_event| {
call_count.fetch_add(1, Ordering::Relaxed); call_count.fetch_add(1, Ordering::Relaxed);
None
}) })
} }
@ -24,5 +45,5 @@ fn it_handles_events() {
handler.handle_event(Event::new("test".to_string())); handler.handle_event(Event::new("test".to_string()));
handler.handle_event(Event::new("test2".to_string())); handler.handle_event(Event::new("test2".to_string()));
assert_eq!(call_count.load(Ordering::Relaxed), 3) assert_eq!(call_count.load(Ordering::Relaxed), 5)
} }

@ -1,3 +1,4 @@
pub mod event; pub mod event;
pub mod event_handler; pub mod event_handler;
pub mod result; pub mod result;
pub mod server;

@ -0,0 +1,11 @@
use crate::event::Event;
use crate::result::VentedResult;
pub mod tcp;
pub trait VentedServer {
fn listen(&mut self, address: &str) -> VentedResult<()>;
fn register_handler<F: 'static>(&mut self, event_name: &str, handler: F)
where
F: Fn(Event) -> Option<Event> + Send + Sync;
}

@ -0,0 +1,52 @@
use std::borrow::BorrowMut;
use std::net::{TcpListener, TcpStream};
use std::sync::Arc;
use parking_lot::Mutex;
use scheduled_thread_pool::ScheduledThreadPool;
use crate::event::Event;
use crate::event_handler::EventHandler;
use crate::result::VentedResult;
use crate::server::VentedServer;
pub struct VentedTcpServer {
event_handler: Arc<Mutex<EventHandler>>,
pool: ScheduledThreadPool,
}
impl VentedServer for VentedTcpServer {
fn listen(&mut self, address: &str) -> VentedResult<()> {
let listener = TcpListener::bind(address)?;
for stream in listener.incoming() {
match stream {
Ok(stream) => self.handle_connection(stream),
Err(_) => {}
}
}
Ok(())
}
/// Registers an event on the internal event handler
fn register_handler<F: 'static>(&mut self, event_name: &str, handler: F)
where
F: Fn(Event) -> Option<Event> + Send + Sync,
{
self.event_handler
.lock()
.borrow_mut()
.on(event_name, handler);
}
}
impl VentedTcpServer {
fn handle_connection(&mut self, mut stream: TcpStream) {
let handler = Arc::clone(&self.event_handler);
self.pool.execute(move || {
if let Ok(event) = Event::from_bytes(&mut stream) {
handler.lock().handle_event(event);
}
});
}
}
Loading…
Cancel
Save