diff --git a/src/ipc/client.rs b/src/ipc/client.rs index 2d1a3b0b..3a8af336 100644 --- a/src/ipc/client.rs +++ b/src/ipc/client.rs @@ -1,8 +1,9 @@ +use super::handle_connection; use crate::error::Result; -use crate::events::event::Event; use crate::events::event_handler::EventHandler; use crate::ipc::context::Context; use crate::ipc::stream_emitter::StreamEmitter; +use std::sync::Arc; use tokio::net::TcpStream; /// The IPC Client to connect to an IPC Server. @@ -17,17 +18,13 @@ impl IPCClient { /// Invoked by [IPCBuilder::build_client](crate::builder::IPCBuilder::build_client) pub async fn connect(self, address: &str) -> Result { let stream = TcpStream::connect(address).await?; - let (mut read_half, write_half) = stream.into_split(); + let (read_half, write_half) = stream.into_split(); let emitter = StreamEmitter::new(write_half); let ctx = Context::new(StreamEmitter::clone(&emitter)); - let handler = self.handler; + let handler = Arc::new(self.handler); tokio::spawn(async move { - while let Ok(event) = Event::from_async_read(&mut read_half).await { - if let Err(e) = handler.handle_event(&ctx, event).await { - log::error!("Failed to handle event: {:?}", e); - } - } + handle_connection(handler, read_half, ctx).await; }); Ok(emitter) diff --git a/src/ipc/context.rs b/src/ipc/context.rs index 6395bb01..53ec5ab3 100644 --- a/src/ipc/context.rs +++ b/src/ipc/context.rs @@ -14,6 +14,7 @@ use crate::ipc::stream_emitter::StreamEmitter; /// Ok(()) /// } /// ``` +#[derive(Clone)] pub struct Context { /// The event emitter pub emitter: StreamEmitter, diff --git a/src/ipc/mod.rs b/src/ipc/mod.rs index 81b79180..c3761594 100644 --- a/src/ipc/mod.rs +++ b/src/ipc/mod.rs @@ -1,5 +1,41 @@ +use crate::context::Context; +use crate::error_event::{ErrorEventData, ERROR_EVENT_NAME}; +use crate::events::event_handler::EventHandler; +use crate::Event; +use std::sync::Arc; +use tokio::net::tcp::OwnedReadHalf; + pub mod builder; pub mod client; pub mod context; pub mod server; pub mod stream_emitter; + +/// Handles listening to a connection and triggering the corresponding event functions +async fn handle_connection(handler: Arc, mut read_half: OwnedReadHalf, ctx: Context) { + while let Ok(event) = Event::from_async_read(&mut read_half).await { + let ctx = Context::clone(&ctx); + let handler = Arc::clone(&handler); + + tokio::spawn(async move { + if let Err(e) = handler.handle_event(&ctx, event).await { + // emit an error event + if let Err(e) = ctx + .emitter + .emit( + ERROR_EVENT_NAME, + ErrorEventData { + message: format!("{:?}", e), + code: 500, + }, + ) + .await + { + log::error!("Error occurred when sending error response: {:?}", e); + } + log::error!("Failed to handle event: {:?}", e); + } + }); + } + log::debug!("Connection closed."); +} diff --git a/src/ipc/server.rs b/src/ipc/server.rs index 296bdc41..e9d3697e 100644 --- a/src/ipc/server.rs +++ b/src/ipc/server.rs @@ -1,11 +1,10 @@ +use super::handle_connection; use crate::error::Result; -use crate::events::error_event::{ErrorEventData, ERROR_EVENT_NAME}; -use crate::events::event::Event; use crate::events::event_handler::EventHandler; use crate::ipc::context::Context; use crate::ipc::stream_emitter::StreamEmitter; use std::sync::Arc; -use tokio::net::{TcpListener, TcpStream}; +use tokio::net::TcpListener; /// The IPC Server listening for connections. /// Use the [IPCBuilder](crate::builder::IPCBuilder) to create a server. @@ -22,41 +21,17 @@ impl IPCServer { let handler = Arc::new(self.handler); while let Ok((stream, _)) = listener.accept().await { - let handler = handler.clone(); + let handler = Arc::clone(&handler); tokio::spawn(async { - Self::handle_connection(handler, stream).await; + let (read_half, write_half) = stream.into_split(); + let emitter = StreamEmitter::new(write_half); + let ctx = Context::new(StreamEmitter::clone(&emitter)); + + handle_connection(handler, read_half, ctx).await; }); } Ok(()) } - - /// Handles a single tcp connection - async fn handle_connection(handler: Arc, stream: TcpStream) { - let (mut read_half, write_half) = stream.into_split(); - let emitter = StreamEmitter::new(write_half); - let ctx = Context::new(StreamEmitter::clone(&emitter)); - - while let Ok(event) = Event::from_async_read(&mut read_half).await { - if let Err(e) = handler.handle_event(&ctx, event).await { - // emit an error event - if emitter - .emit( - ERROR_EVENT_NAME, - ErrorEventData { - message: format!("{:?}", e), - code: 500, - }, - ) - .await - .is_err() - { - break; - } - log::error!("Failed to handle event: {:?}", e); - } - } - log::debug!("Connection closed."); - } }