diff --git a/Cargo.toml b/Cargo.toml index af063507..18d89502 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rmp-ipc" -version = "0.2.0" +version = "0.2.1" authors = ["trivernis "] edition = "2018" readme = "README.md" diff --git a/src/ipc/mod.rs b/src/ipc/mod.rs index 5507c23f..7dfb75b8 100644 --- a/src/ipc/mod.rs +++ b/src/ipc/mod.rs @@ -33,11 +33,13 @@ async fn handle_connection(handler: Arc, mut read_half: OwnedReadH /// Handles a single event in a different tokio context fn handle_event(ctx: Context, handler: Arc, event: Event) { tokio::spawn(async move { + let id = event.id(); if let Err(e) = handler.handle_event(&ctx, event).await { // emit an error event if let Err(e) = ctx .emitter - .emit( + .emit_response( + id, ERROR_EVENT_NAME, ErrorEventData { message: format!("{:?}", e), diff --git a/src/tests/ipc_tests.rs b/src/tests/ipc_tests.rs index 6afe5872..8f510bd4 100644 --- a/src/tests/ipc_tests.rs +++ b/src/tests/ipc_tests.rs @@ -1,6 +1,7 @@ use self::super::utils::PingEventData; use crate::error::Error; use crate::events::error_event::ErrorEventData; +use crate::tests::utils::start_test_server; use crate::IPCBuilder; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; @@ -96,3 +97,33 @@ async fn it_handles_errors() { tokio::time::sleep(Duration::from_secs(1)).await; assert!(error_occurred.load(Ordering::SeqCst)); } + +#[tokio::test] +async fn test_error_responses() { + static ADDRESS: &str = "127.0.0.1:8284"; + start_test_server(ADDRESS).await.unwrap(); + let ctx = IPCBuilder::new() + .address(ADDRESS) + .build_client() + .await + .unwrap(); + let reply = ctx + .emitter + .emit("ping", ()) + .await + .unwrap() + .await_reply(&ctx) + .await + .unwrap(); + assert_eq!(reply.name(), "pong"); + + let reply = ctx + .emitter + .emit("trigger_error", ()) + .await + .unwrap() + .await_reply(&ctx) + .await + .unwrap(); + assert_eq!(reply.name(), "error"); +} diff --git a/src/tests/utils.rs b/src/tests/utils.rs index 416a027d..0804847d 100644 --- a/src/tests/utils.rs +++ b/src/tests/utils.rs @@ -1,8 +1,35 @@ +use crate::error::Error; +use crate::IPCBuilder; use serde::{Deserialize, Serialize}; use std::time::SystemTime; +use tokio::sync::oneshot; #[derive(Clone, Serialize, Deserialize)] pub struct PingEventData { pub time: SystemTime, pub ttl: u8, } + +/// Starts a test IPC server +pub fn start_test_server(address: &'static str) -> oneshot::Receiver { + let (tx, rx) = oneshot::channel(); + tokio::task::spawn(async move { + tx.send(true).unwrap(); + IPCBuilder::new() + .address(address) + .on("ping", |ctx, event| { + Box::pin(async move { + ctx.emitter.emit_response(event.id(), "pong", ()).await?; + Ok(()) + }) + }) + .on("trigger_error", |_, _| { + Box::pin(async move { Err(Error::from("An error occurred.")) }) + }) + .build_server() + .await + .unwrap(); + }); + + rx +}