From 15221b6f59034ac089a594b9b9726ed2defbd415 Mon Sep 17 00:00:00 2001 From: trivernis Date: Tue, 3 Nov 2020 21:00:28 +0100 Subject: [PATCH] Add vented client implementation Signed-off-by: trivernis --- src/client/mod.rs | 9 ++++++++ src/client/tcp/mod.rs | 23 +++++++++++++++++++ src/lib.rs | 1 + src/result.rs | 10 +++++++- src/server/mod.rs | 2 +- src/server/tcp/mod.rs | 7 +++--- src/server/tls/mod.rs | 27 +++++++++++++--------- tests/test_communication.rs | 46 +++++++++++++++++++++++++++++++++++++ 8 files changed, 109 insertions(+), 16 deletions(-) create mode 100644 src/client/mod.rs create mode 100644 src/client/tcp/mod.rs create mode 100644 tests/test_communication.rs diff --git a/src/client/mod.rs b/src/client/mod.rs new file mode 100644 index 0000000..fedec60 --- /dev/null +++ b/src/client/mod.rs @@ -0,0 +1,9 @@ +use crate::event::Event; +use crate::result::VentedResult; + +pub mod tcp; + +pub trait VentedClient: Sized { + fn connect(address: &str) -> VentedResult; + fn emit(&mut self, event: Event) -> VentedResult; +} diff --git a/src/client/tcp/mod.rs b/src/client/tcp/mod.rs new file mode 100644 index 0000000..b208f6d --- /dev/null +++ b/src/client/tcp/mod.rs @@ -0,0 +1,23 @@ +use crate::client::VentedClient; +use crate::event::Event; +use crate::result::VentedResult; +use std::io::Write; +use std::net::TcpStream; + +pub struct VentedTcpClient { + connection: TcpStream, +} + +impl VentedClient for VentedTcpClient { + fn connect(address: &str) -> VentedResult { + Ok(Self { + connection: TcpStream::connect(address)?, + }) + } + + fn emit(&mut self, mut event: Event) -> VentedResult { + self.connection.write(&event.as_bytes())?; + + Event::from_bytes(&mut self.connection) + } +} diff --git a/src/lib.rs b/src/lib.rs index e879237..c10e72d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,4 @@ +pub mod client; pub mod event; pub mod event_handler; pub mod result; diff --git a/src/result.rs b/src/result.rs index d2682e4..7124342 100644 --- a/src/result.rs +++ b/src/result.rs @@ -1,5 +1,5 @@ -use std::{fmt, io}; use std::error::Error; +use std::{fmt, io}; pub type VentedResult = Result; @@ -7,6 +7,7 @@ pub type VentedResult = Result; pub enum VentedError { NameDecodingError, IOError(io::Error), + TLSError(native_tls::Error), SerializeError(rmp_serde::encode::Error), DeserializeError(rmp_serde::decode::Error), } @@ -18,6 +19,7 @@ impl fmt::Display for VentedError { Self::IOError(e) => write!(f, "IO Error: {}", e.to_string()), Self::SerializeError(e) => write!(f, "Serialization Error: {}", e.to_string()), Self::DeserializeError(e) => write!(f, "Deserialization Error: {}", e.to_string()), + Self::TLSError(e) => write!(f, "TLS Error: {}", e.to_string()), } } } @@ -41,3 +43,9 @@ impl From for VentedError { Self::DeserializeError(other) } } + +impl From for VentedError { + fn from(other: native_tls::Error) -> Self { + Self::TLSError(other) + } +} diff --git a/src/server/mod.rs b/src/server/mod.rs index d410af0..06950a3 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -7,7 +7,7 @@ pub mod tls; pub trait VentedServer { fn listen(&mut self, address: &str) -> VentedResult<()>; - fn register_handler(&mut self, event_name: &str, handler: F) + fn on(&mut self, event_name: &str, handler: F) where F: Fn(Event) -> Option + Send + Sync; } diff --git a/src/server/tcp/mod.rs b/src/server/tcp/mod.rs index 71fb8c4..6ccacba 100644 --- a/src/server/tcp/mod.rs +++ b/src/server/tcp/mod.rs @@ -33,7 +33,7 @@ impl VentedServer for VentedTcpServer { } /// Registers an event on the internal event handler - fn register_handler(&mut self, event_name: &str, handler: F) + fn on(&mut self, event_name: &str, handler: F) where F: Fn(Event) -> Option + Send + Sync, { @@ -59,7 +59,7 @@ impl VentedTcpServer { /// Handles what happens on connection fn handle_connection(&mut self, mut stream: TcpStream) { let handler = Arc::clone(&self.event_handler); - self.pool.execute(move || { + self.pool.execute(move || loop { if let Ok(event) = Event::from_bytes(&mut stream) { if let Some(mut event) = handler.lock().handle_event(event) { if let Err(e) = stream.write(&event.as_bytes()) { @@ -67,7 +67,8 @@ impl VentedTcpServer { } } } else { - log::warn!("Failed to create an Event from received bytes.") + log::warn!("Failed to create an Event from received bytes."); + break; } }); } diff --git a/src/server/tls/mod.rs b/src/server/tls/mod.rs index a371679..a48af4a 100644 --- a/src/server/tls/mod.rs +++ b/src/server/tls/mod.rs @@ -3,11 +3,11 @@ use crate::event_handler::EventHandler; use crate::result::VentedResult; use crate::server::server_events::get_server_event_handler; use crate::server::VentedServer; -use native_tls::{Identity, TlsAcceptor, TlsStream}; +use native_tls::{Identity, TlsAcceptor}; use parking_lot::Mutex; use scheduled_thread_pool::ScheduledThreadPool; use std::borrow::BorrowMut; -use std::io::{Read, Write}; +use std::io::Write; use std::net::{TcpListener, TcpStream}; use std::sync::Arc; @@ -34,7 +34,7 @@ impl VentedServer for VentedTlsServer { Ok(()) } - fn register_handler(&mut self, event_name: &str, handler: F) + fn on(&mut self, event_name: &str, handler: F) where F: Fn(Event) -> Option + Send + Sync, { @@ -59,19 +59,24 @@ impl VentedTlsServer { } } - fn handle_connection(&self, mut stream: TcpStream, acceptor: Arc) { + fn handle_connection(&self, stream: TcpStream, acceptor: Arc) { let handler = Arc::clone(&self.event_handler); self.pool.execute(move || { - acceptor.accept(stream); - - if let Ok(event) = Event::from_bytes(&mut stream) { - if let Some(mut event) = handler.lock().handle_event(event) { - if let Err(e) = stream.write(&event.as_bytes()) { - log::error!("Failed to respond to event: {}", e) + if let Ok(mut stream) = acceptor.accept(stream) { + loop { + if let Ok(event) = Event::from_bytes(&mut stream) { + if let Some(mut event) = handler.lock().handle_event(event) { + if let Err(e) = stream.write(&event.as_bytes()) { + log::error!("Failed to respond to event: {}", e) + } + } + } else { + log::warn!("Failed to create an Event from received bytes."); + break; } } } else { - log::warn!("Failed to create an Event from received bytes.") + log::error!("TLS Error when handling connection") } }); } diff --git a/tests/test_communication.rs b/tests/test_communication.rs new file mode 100644 index 0000000..8be6373 --- /dev/null +++ b/tests/test_communication.rs @@ -0,0 +1,46 @@ +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::Arc; +use std::thread; +use std::time::Duration; +use vented::client::tcp::VentedTcpClient; +use vented::client::VentedClient; +use vented::event::Event; +use vented::server::tcp::VentedTcpServer; +use vented::server::VentedServer; + +#[test] +fn test_pong_event() { + static ADDRESS: &str = "localhost:22222"; + static PING: &str = "ping"; + static PONG: &str = "pong"; + let ping_count = Arc::new(AtomicUsize::new(0)); + let server_ready = Arc::new(AtomicBool::new(false)); + + let mut server = VentedTcpServer::new(1); + { + let ping_received = Arc::clone(&ping_count); + server.on(PING, move |_event| { + ping_received.fetch_add(1, Ordering::Relaxed); + + Some(Event::new(PONG.to_string())) + }); + } + + thread::spawn({ + let server_ready = Arc::clone(&server_ready); + move || { + server_ready.store(true, Ordering::Relaxed); + server.listen(ADDRESS).unwrap(); + } + }); + + while !server_ready.load(Ordering::Relaxed) { + thread::sleep(Duration::from_millis(1)); + } + let mut client = VentedTcpClient::connect(ADDRESS).unwrap(); + client.emit(Event::new(PING.to_string())).unwrap(); + let response = client.emit(Event::new(PING.to_string())).unwrap(); + + assert_eq!(ping_count.load(Ordering::Relaxed), 2); + assert_eq!(response.name, PONG); +}