From e088680f685afc8eb6d32ca4c60ecfba070671be Mon Sep 17 00:00:00 2001 From: trivernis Date: Mon, 10 May 2021 12:20:08 +0200 Subject: [PATCH] Add error events Signed-off-by: trivernis --- Cargo.toml | 2 +- src/events/error_event.rs | 13 ++++++++++++ src/events/mod.rs | 1 + src/ipc/builder.rs | 16 +++++++++++++- src/ipc/server.rs | 15 +++++++++++++ src/lib.rs | 1 + src/tests/ipc_tests.rs | 44 +++++++++++++++++++++++++++++++++++++++ 7 files changed, 90 insertions(+), 2 deletions(-) create mode 100644 src/events/error_event.rs diff --git a/Cargo.toml b/Cargo.toml index 46826cfa..f50dbf26 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rmp-ipc" -version = "0.1.0" +version = "0.1.1" authors = ["trivernis "] edition = "2018" readme = "README.md" diff --git a/src/events/error_event.rs b/src/events/error_event.rs new file mode 100644 index 00000000..e11ae703 --- /dev/null +++ b/src/events/error_event.rs @@ -0,0 +1,13 @@ +use serde::{Deserialize, Serialize}; + +pub static ERROR_EVENT_NAME: &str = "error"; + +/// Data returned on error event. +/// The error event has a default handler that just logs that +/// an error occurred. For a custom handler, register a handler on +/// the [ERROR_EVENT_NAME] event. +#[derive(Clone, Deserialize, Serialize, Debug)] +pub struct ErrorEventData { + pub code: u16, + pub message: String, +} diff --git a/src/events/mod.rs b/src/events/mod.rs index 3c79c5c6..fb1138df 100644 --- a/src/events/mod.rs +++ b/src/events/mod.rs @@ -1,2 +1,3 @@ +pub mod error_event; pub mod event; pub mod event_handler; diff --git a/src/ipc/builder.rs b/src/ipc/builder.rs index 8f3e9c17..88040fe0 100644 --- a/src/ipc/builder.rs +++ b/src/ipc/builder.rs @@ -1,4 +1,5 @@ use crate::error::{Error, Result}; +use crate::events::error_event::{ErrorEventData, ERROR_EVENT_NAME}; use crate::events::event::Event; use crate::events::event_handler::EventHandler; use crate::ipc::client::IPCClient; @@ -29,8 +30,21 @@ pub struct IPCBuilder { impl IPCBuilder { pub fn new() -> Self { + let mut handler = EventHandler::new(); + handler.on(ERROR_EVENT_NAME, |_, event| { + Box::pin(async move { + let error_data = event.data::()?; + log::warn!( + "Received Error Response from Server: {} - {}", + error_data.code, + error_data.message + ); + + Ok(()) + }) + }); Self { - handler: EventHandler::new(), + handler, address: None, } } diff --git a/src/ipc/server.rs b/src/ipc/server.rs index 6da64816..296bdc41 100644 --- a/src/ipc/server.rs +++ b/src/ipc/server.rs @@ -1,4 +1,5 @@ use crate::error::Result; +use crate::events::error_event::{ErrorEventData, ERROR_EVENT_NAME}; use crate::events::event::Event; use crate::events::event_handler::EventHandler; use crate::ipc::context::Context; @@ -39,6 +40,20 @@ impl IPCServer { while let Ok(event) = Event::from_async_read(&mut read_half).await { if let Err(e) = handler.handle_event(&ctx, event).await { + // emit an error event + if emitter + .emit( + ERROR_EVENT_NAME, + ErrorEventData { + message: format!("{:?}", e), + code: 500, + }, + ) + .await + .is_err() + { + break; + } log::error!("Failed to handle event: {:?}", e); } } diff --git a/src/lib.rs b/src/lib.rs index e5575d1f..25dd5a4f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -38,6 +38,7 @@ pub mod error; mod events; mod ipc; +pub use events::error_event; pub use events::event::Event; pub use ipc::builder::IPCBuilder; pub use ipc::*; diff --git a/src/tests/ipc_tests.rs b/src/tests/ipc_tests.rs index 7473f978..823dfda5 100644 --- a/src/tests/ipc_tests.rs +++ b/src/tests/ipc_tests.rs @@ -1,4 +1,6 @@ use self::super::utils::PingEventData; +use crate::error::Error; +use crate::events::error_event::ErrorEventData; use crate::IPCBuilder; use std::sync::atomic::{AtomicBool, AtomicU8, Ordering}; use std::sync::Arc; @@ -53,3 +55,45 @@ async fn it_receives_events() { tokio::time::sleep(Duration::from_secs(1)).await; assert_eq!(ctr.load(Ordering::SeqCst), 16); } + +#[tokio::test] +async fn it_handles_errors() { + let error_occurred = Arc::new(AtomicBool::new(false)); + let builder = IPCBuilder::new() + .on("ping", move |_, _| { + Box::pin(async move { Err(Error::from("ERRROROROROR")) }) + }) + .on("error", { + let error_occurred = Arc::clone(&error_occurred); + move |_, e| { + let error_occurred = Arc::clone(&error_occurred); + Box::pin(async move { + let error = e.data::()?; + assert!(error.message.len() > 0); + assert_eq!(error.code, 500); + error_occurred.store(true, Ordering::SeqCst); + Ok(()) + }) + } + }) + .address("127.0.0.1:8283"); + let server_running = Arc::new(AtomicBool::new(false)); + + tokio::spawn({ + let server_running = Arc::clone(&server_running); + let builder = builder.clone(); + async move { + server_running.store(true, Ordering::SeqCst); + builder.build_server().await.unwrap(); + } + }); + + while !server_running.load(Ordering::Relaxed) { + tokio::time::sleep(Duration::from_millis(10)).await; + } + let client = builder.build_client().await.unwrap(); + client.emit("ping", ()).await.unwrap(); + + tokio::time::sleep(Duration::from_secs(1)).await; + assert!(error_occurred.load(Ordering::SeqCst)); +}