Add vented client implementation

Signed-off-by: trivernis <trivernis@protonmail.com>
pull/1/head
trivernis 4 years ago
parent bcc3024fc3
commit 15221b6f59
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

@ -0,0 +1,9 @@
use crate::event::Event;
use crate::result::VentedResult;
pub mod tcp;
pub trait VentedClient: Sized {
fn connect(address: &str) -> VentedResult<Self>;
fn emit(&mut self, event: Event) -> VentedResult<Event>;
}

@ -0,0 +1,23 @@
use crate::client::VentedClient;
use crate::event::Event;
use crate::result::VentedResult;
use std::io::Write;
use std::net::TcpStream;
pub struct VentedTcpClient {
connection: TcpStream,
}
impl VentedClient for VentedTcpClient {
fn connect(address: &str) -> VentedResult<Self> {
Ok(Self {
connection: TcpStream::connect(address)?,
})
}
fn emit(&mut self, mut event: Event) -> VentedResult<Event> {
self.connection.write(&event.as_bytes())?;
Event::from_bytes(&mut self.connection)
}
}

@ -1,3 +1,4 @@
pub mod client;
pub mod event; pub mod event;
pub mod event_handler; pub mod event_handler;
pub mod result; pub mod result;

@ -1,5 +1,5 @@
use std::{fmt, io};
use std::error::Error; use std::error::Error;
use std::{fmt, io};
pub type VentedResult<T> = Result<T, VentedError>; pub type VentedResult<T> = Result<T, VentedError>;
@ -7,6 +7,7 @@ pub type VentedResult<T> = Result<T, VentedError>;
pub enum VentedError { pub enum VentedError {
NameDecodingError, NameDecodingError,
IOError(io::Error), IOError(io::Error),
TLSError(native_tls::Error),
SerializeError(rmp_serde::encode::Error), SerializeError(rmp_serde::encode::Error),
DeserializeError(rmp_serde::decode::Error), DeserializeError(rmp_serde::decode::Error),
} }
@ -18,6 +19,7 @@ impl fmt::Display for VentedError {
Self::IOError(e) => write!(f, "IO Error: {}", e.to_string()), Self::IOError(e) => write!(f, "IO Error: {}", e.to_string()),
Self::SerializeError(e) => write!(f, "Serialization Error: {}", e.to_string()), Self::SerializeError(e) => write!(f, "Serialization Error: {}", e.to_string()),
Self::DeserializeError(e) => write!(f, "Deserialization Error: {}", e.to_string()), Self::DeserializeError(e) => write!(f, "Deserialization Error: {}", e.to_string()),
Self::TLSError(e) => write!(f, "TLS Error: {}", e.to_string()),
} }
} }
} }
@ -41,3 +43,9 @@ impl From<rmp_serde::decode::Error> for VentedError {
Self::DeserializeError(other) Self::DeserializeError(other)
} }
} }
impl From<native_tls::Error> for VentedError {
fn from(other: native_tls::Error) -> Self {
Self::TLSError(other)
}
}

@ -7,7 +7,7 @@ pub mod tls;
pub trait VentedServer { pub trait VentedServer {
fn listen(&mut self, address: &str) -> VentedResult<()>; fn listen(&mut self, address: &str) -> VentedResult<()>;
fn register_handler<F: 'static>(&mut self, event_name: &str, handler: F) fn on<F: 'static>(&mut self, event_name: &str, handler: F)
where where
F: Fn(Event) -> Option<Event> + Send + Sync; F: Fn(Event) -> Option<Event> + Send + Sync;
} }

@ -33,7 +33,7 @@ impl VentedServer for VentedTcpServer {
} }
/// Registers an event on the internal event handler /// Registers an event on the internal event handler
fn register_handler<F: 'static>(&mut self, event_name: &str, handler: F) fn on<F: 'static>(&mut self, event_name: &str, handler: F)
where where
F: Fn(Event) -> Option<Event> + Send + Sync, F: Fn(Event) -> Option<Event> + Send + Sync,
{ {
@ -59,7 +59,7 @@ impl VentedTcpServer {
/// Handles what happens on connection /// Handles what happens on connection
fn handle_connection(&mut self, mut stream: TcpStream) { fn handle_connection(&mut self, mut stream: TcpStream) {
let handler = Arc::clone(&self.event_handler); let handler = Arc::clone(&self.event_handler);
self.pool.execute(move || { self.pool.execute(move || loop {
if let Ok(event) = Event::from_bytes(&mut stream) { if let Ok(event) = Event::from_bytes(&mut stream) {
if let Some(mut event) = handler.lock().handle_event(event) { if let Some(mut event) = handler.lock().handle_event(event) {
if let Err(e) = stream.write(&event.as_bytes()) { if let Err(e) = stream.write(&event.as_bytes()) {
@ -67,7 +67,8 @@ impl VentedTcpServer {
} }
} }
} else { } else {
log::warn!("Failed to create an Event from received bytes.") log::warn!("Failed to create an Event from received bytes.");
break;
} }
}); });
} }

@ -3,11 +3,11 @@ use crate::event_handler::EventHandler;
use crate::result::VentedResult; use crate::result::VentedResult;
use crate::server::server_events::get_server_event_handler; use crate::server::server_events::get_server_event_handler;
use crate::server::VentedServer; use crate::server::VentedServer;
use native_tls::{Identity, TlsAcceptor, TlsStream}; use native_tls::{Identity, TlsAcceptor};
use parking_lot::Mutex; use parking_lot::Mutex;
use scheduled_thread_pool::ScheduledThreadPool; use scheduled_thread_pool::ScheduledThreadPool;
use std::borrow::BorrowMut; use std::borrow::BorrowMut;
use std::io::{Read, Write}; use std::io::Write;
use std::net::{TcpListener, TcpStream}; use std::net::{TcpListener, TcpStream};
use std::sync::Arc; use std::sync::Arc;
@ -34,7 +34,7 @@ impl VentedServer for VentedTlsServer {
Ok(()) Ok(())
} }
fn register_handler<F: 'static>(&mut self, event_name: &str, handler: F) fn on<F: 'static>(&mut self, event_name: &str, handler: F)
where where
F: Fn(Event) -> Option<Event> + Send + Sync, F: Fn(Event) -> Option<Event> + Send + Sync,
{ {
@ -59,19 +59,24 @@ impl VentedTlsServer {
} }
} }
fn handle_connection(&self, mut stream: TcpStream, acceptor: Arc<TlsAcceptor>) { fn handle_connection(&self, stream: TcpStream, acceptor: Arc<TlsAcceptor>) {
let handler = Arc::clone(&self.event_handler); let handler = Arc::clone(&self.event_handler);
self.pool.execute(move || { self.pool.execute(move || {
acceptor.accept(stream); if let Ok(mut stream) = acceptor.accept(stream) {
loop {
if let Ok(event) = Event::from_bytes(&mut stream) { if let Ok(event) = Event::from_bytes(&mut stream) {
if let Some(mut event) = handler.lock().handle_event(event) { if let Some(mut event) = handler.lock().handle_event(event) {
if let Err(e) = stream.write(&event.as_bytes()) { if let Err(e) = stream.write(&event.as_bytes()) {
log::error!("Failed to respond to event: {}", e) log::error!("Failed to respond to event: {}", e)
}
}
} else {
log::warn!("Failed to create an Event from received bytes.");
break;
} }
} }
} else { } else {
log::warn!("Failed to create an Event from received bytes.") log::error!("TLS Error when handling connection")
} }
}); });
} }

@ -0,0 +1,46 @@
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use vented::client::tcp::VentedTcpClient;
use vented::client::VentedClient;
use vented::event::Event;
use vented::server::tcp::VentedTcpServer;
use vented::server::VentedServer;
#[test]
fn test_pong_event() {
static ADDRESS: &str = "localhost:22222";
static PING: &str = "ping";
static PONG: &str = "pong";
let ping_count = Arc::new(AtomicUsize::new(0));
let server_ready = Arc::new(AtomicBool::new(false));
let mut server = VentedTcpServer::new(1);
{
let ping_received = Arc::clone(&ping_count);
server.on(PING, move |_event| {
ping_received.fetch_add(1, Ordering::Relaxed);
Some(Event::new(PONG.to_string()))
});
}
thread::spawn({
let server_ready = Arc::clone(&server_ready);
move || {
server_ready.store(true, Ordering::Relaxed);
server.listen(ADDRESS).unwrap();
}
});
while !server_ready.load(Ordering::Relaxed) {
thread::sleep(Duration::from_millis(1));
}
let mut client = VentedTcpClient::connect(ADDRESS).unwrap();
client.emit(Event::new(PING.to_string())).unwrap();
let response = client.emit(Event::new(PING.to_string())).unwrap();
assert_eq!(ping_count.load(Ordering::Relaxed), 2);
assert_eq!(response.name, PONG);
}
Loading…
Cancel
Save