Extract handle_connection to an independent function

Signed-off-by: trivernis <trivernis@protonmail.com>
pull/2/head
trivernis 4 years ago
parent 8e483b97cb
commit 2ef6ba8ced
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

@ -1,8 +1,9 @@
use super::handle_connection;
use crate::error::Result; use crate::error::Result;
use crate::events::event::Event;
use crate::events::event_handler::EventHandler; use crate::events::event_handler::EventHandler;
use crate::ipc::context::Context; use crate::ipc::context::Context;
use crate::ipc::stream_emitter::StreamEmitter; use crate::ipc::stream_emitter::StreamEmitter;
use std::sync::Arc;
use tokio::net::TcpStream; use tokio::net::TcpStream;
/// The IPC Client to connect to an IPC Server. /// The IPC Client to connect to an IPC Server.
@ -17,17 +18,13 @@ impl IPCClient {
/// Invoked by [IPCBuilder::build_client](crate::builder::IPCBuilder::build_client) /// Invoked by [IPCBuilder::build_client](crate::builder::IPCBuilder::build_client)
pub async fn connect(self, address: &str) -> Result<StreamEmitter> { pub async fn connect(self, address: &str) -> Result<StreamEmitter> {
let stream = TcpStream::connect(address).await?; let stream = TcpStream::connect(address).await?;
let (mut read_half, write_half) = stream.into_split(); let (read_half, write_half) = stream.into_split();
let emitter = StreamEmitter::new(write_half); let emitter = StreamEmitter::new(write_half);
let ctx = Context::new(StreamEmitter::clone(&emitter)); let ctx = Context::new(StreamEmitter::clone(&emitter));
let handler = self.handler; let handler = Arc::new(self.handler);
tokio::spawn(async move { tokio::spawn(async move {
while let Ok(event) = Event::from_async_read(&mut read_half).await { handle_connection(handler, read_half, ctx).await;
if let Err(e) = handler.handle_event(&ctx, event).await {
log::error!("Failed to handle event: {:?}", e);
}
}
}); });
Ok(emitter) Ok(emitter)

@ -14,6 +14,7 @@ use crate::ipc::stream_emitter::StreamEmitter;
/// Ok(()) /// Ok(())
/// } /// }
/// ``` /// ```
#[derive(Clone)]
pub struct Context { pub struct Context {
/// The event emitter /// The event emitter
pub emitter: StreamEmitter, pub emitter: StreamEmitter,

@ -1,5 +1,41 @@
use crate::context::Context;
use crate::error_event::{ErrorEventData, ERROR_EVENT_NAME};
use crate::events::event_handler::EventHandler;
use crate::Event;
use std::sync::Arc;
use tokio::net::tcp::OwnedReadHalf;
pub mod builder; pub mod builder;
pub mod client; pub mod client;
pub mod context; pub mod context;
pub mod server; pub mod server;
pub mod stream_emitter; pub mod stream_emitter;
/// Handles listening to a connection and triggering the corresponding event functions
async fn handle_connection(handler: Arc<EventHandler>, mut read_half: OwnedReadHalf, ctx: Context) {
while let Ok(event) = Event::from_async_read(&mut read_half).await {
let ctx = Context::clone(&ctx);
let handler = Arc::clone(&handler);
tokio::spawn(async move {
if let Err(e) = handler.handle_event(&ctx, event).await {
// emit an error event
if let Err(e) = ctx
.emitter
.emit(
ERROR_EVENT_NAME,
ErrorEventData {
message: format!("{:?}", e),
code: 500,
},
)
.await
{
log::error!("Error occurred when sending error response: {:?}", e);
}
log::error!("Failed to handle event: {:?}", e);
}
});
}
log::debug!("Connection closed.");
}

@ -1,11 +1,10 @@
use super::handle_connection;
use crate::error::Result; use crate::error::Result;
use crate::events::error_event::{ErrorEventData, ERROR_EVENT_NAME};
use crate::events::event::Event;
use crate::events::event_handler::EventHandler; use crate::events::event_handler::EventHandler;
use crate::ipc::context::Context; use crate::ipc::context::Context;
use crate::ipc::stream_emitter::StreamEmitter; use crate::ipc::stream_emitter::StreamEmitter;
use std::sync::Arc; use std::sync::Arc;
use tokio::net::{TcpListener, TcpStream}; use tokio::net::TcpListener;
/// The IPC Server listening for connections. /// The IPC Server listening for connections.
/// Use the [IPCBuilder](crate::builder::IPCBuilder) to create a server. /// Use the [IPCBuilder](crate::builder::IPCBuilder) to create a server.
@ -22,41 +21,17 @@ impl IPCServer {
let handler = Arc::new(self.handler); let handler = Arc::new(self.handler);
while let Ok((stream, _)) = listener.accept().await { while let Ok((stream, _)) = listener.accept().await {
let handler = handler.clone(); let handler = Arc::clone(&handler);
tokio::spawn(async { tokio::spawn(async {
Self::handle_connection(handler, stream).await; let (read_half, write_half) = stream.into_split();
let emitter = StreamEmitter::new(write_half);
let ctx = Context::new(StreamEmitter::clone(&emitter));
handle_connection(handler, read_half, ctx).await;
}); });
} }
Ok(()) Ok(())
} }
/// Handles a single tcp connection
async fn handle_connection(handler: Arc<EventHandler>, stream: TcpStream) {
let (mut read_half, write_half) = stream.into_split();
let emitter = StreamEmitter::new(write_half);
let ctx = Context::new(StreamEmitter::clone(&emitter));
while let Ok(event) = Event::from_async_read(&mut read_half).await {
if let Err(e) = handler.handle_event(&ctx, event).await {
// emit an error event
if emitter
.emit(
ERROR_EVENT_NAME,
ErrorEventData {
message: format!("{:?}", e),
code: 500,
},
)
.await
.is_err()
{
break;
}
log::error!("Failed to handle event: {:?}", e);
}
}
log::debug!("Connection closed.");
}
} }

Loading…
Cancel
Save