Add event response handling

Signed-off-by: trivernis <trivernis@protonmail.com>
pull/2/head
trivernis 3 years ago
parent 2ef6ba8ced
commit 24b5da1933
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

@ -1,4 +1,5 @@
use thiserror::Error; use thiserror::Error;
use tokio::sync::oneshot;
pub type Result<T> = std::result::Result<T, Error>; pub type Result<T> = std::result::Result<T, Error>;
@ -18,6 +19,9 @@ pub enum Error {
#[error("{0}")] #[error("{0}")]
Message(String), Message(String),
#[error("Channel Error: {0}")]
ReceiveError(#[from] oneshot::error::RecvError),
} }
impl From<String> for Error { impl From<String> for Error {

@ -65,4 +65,15 @@ impl Event {
Ok(event_bytes) Ok(event_bytes)
} }
/// The identifier of the message
pub fn id(&self) -> u64 {
self.id
}
/// The ID of the message referenced by this message.
/// It represents the message that is replied to and can be None.
pub fn reference_id(&self) -> Option<u64> {
self.ref_id.clone()
}
} }

@ -5,7 +5,6 @@ use crate::events::event_handler::EventHandler;
use crate::ipc::client::IPCClient; use crate::ipc::client::IPCClient;
use crate::ipc::context::Context; use crate::ipc::context::Context;
use crate::ipc::server::IPCServer; use crate::ipc::server::IPCServer;
use crate::ipc::stream_emitter::StreamEmitter;
use std::future::Future; use std::future::Future;
use std::pin::Pin; use std::pin::Pin;
@ -85,15 +84,15 @@ impl IPCBuilder {
} }
/// Builds an ipc client /// Builds an ipc client
pub async fn build_client(self) -> Result<StreamEmitter> { pub async fn build_client(self) -> Result<Context> {
self.validate()?; self.validate()?;
let client = IPCClient { let client = IPCClient {
handler: self.handler, handler: self.handler,
}; };
let emitter = client.connect(&self.address.unwrap()).await?; let ctx = client.connect(&self.address.unwrap()).await?;
Ok(emitter) Ok(ctx)
} }
/// Validates that all required fields have been provided /// Validates that all required fields have been provided

@ -16,17 +16,20 @@ pub struct IPCClient {
impl IPCClient { impl IPCClient {
/// Connects to a given address and returns an emitter for events to that address. /// Connects to a given address and returns an emitter for events to that address.
/// Invoked by [IPCBuilder::build_client](crate::builder::IPCBuilder::build_client) /// Invoked by [IPCBuilder::build_client](crate::builder::IPCBuilder::build_client)
pub async fn connect(self, address: &str) -> Result<StreamEmitter> { pub async fn connect(self, address: &str) -> Result<Context> {
let stream = TcpStream::connect(address).await?; let stream = TcpStream::connect(address).await?;
let (read_half, write_half) = stream.into_split(); let (read_half, write_half) = stream.into_split();
let emitter = StreamEmitter::new(write_half); let emitter = StreamEmitter::new(write_half);
let ctx = Context::new(StreamEmitter::clone(&emitter)); let ctx = Context::new(StreamEmitter::clone(&emitter));
let handler = Arc::new(self.handler); let handler = Arc::new(self.handler);
tokio::spawn(async move { tokio::spawn({
handle_connection(handler, read_half, ctx).await; let ctx = Context::clone(&ctx);
async move {
handle_connection(handler, read_half, ctx).await;
}
}); });
Ok(emitter) Ok(ctx)
} }
} }

@ -1,4 +1,9 @@
use crate::error::Result;
use crate::ipc::stream_emitter::StreamEmitter; use crate::ipc::stream_emitter::StreamEmitter;
use crate::Event;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{oneshot, Mutex};
/// An object provided to each callback function. /// An object provided to each callback function.
/// Currently it only holds the event emitter to emit response events in event callbacks. /// Currently it only holds the event emitter to emit response events in event callbacks.
@ -18,10 +23,33 @@ use crate::ipc::stream_emitter::StreamEmitter;
pub struct Context { pub struct Context {
/// The event emitter /// The event emitter
pub emitter: StreamEmitter, pub emitter: StreamEmitter,
reply_listeners: Arc<Mutex<HashMap<u64, oneshot::Sender<Event>>>>,
} }
impl Context { impl Context {
pub(crate) fn new(emitter: StreamEmitter) -> Self { pub(crate) fn new(emitter: StreamEmitter) -> Self {
Self { emitter } Self {
emitter,
reply_listeners: Arc::new(Mutex::new(HashMap::new())),
}
}
/// Waits for a reply to the given message ID
pub async fn await_reply(&self, message_id: u64) -> Result<Event> {
let (rx, tx) = oneshot::channel();
{
let mut listeners = self.reply_listeners.lock().await;
listeners.insert(message_id, rx);
}
let event = tx.await?;
Ok(event)
}
/// Returns the channel for a reply to the given message id
pub(crate) async fn get_reply_sender(&self, ref_id: u64) -> Option<oneshot::Sender<Event>> {
let mut listeners = self.reply_listeners.lock().await;
listeners.remove(&ref_id)
} }
} }

@ -14,28 +14,41 @@ pub mod stream_emitter;
/// Handles listening to a connection and triggering the corresponding event functions /// Handles listening to a connection and triggering the corresponding event functions
async fn handle_connection(handler: Arc<EventHandler>, mut read_half: OwnedReadHalf, ctx: Context) { async fn handle_connection(handler: Arc<EventHandler>, mut read_half: OwnedReadHalf, ctx: Context) {
while let Ok(event) = Event::from_async_read(&mut read_half).await { while let Ok(event) = Event::from_async_read(&mut read_half).await {
let ctx = Context::clone(&ctx); // check if the event is a reply
let handler = Arc::clone(&handler); if let Some(ref_id) = event.reference_id() {
// get the listener for replies
tokio::spawn(async move { if let Some(sender) = ctx.get_reply_sender(ref_id).await {
if let Err(e) = handler.handle_event(&ctx, event).await { // try sending the event to the listener for replies
// emit an error event if let Err(event) = sender.send(event) {
if let Err(e) = ctx handle_event(Context::clone(&ctx), Arc::clone(&handler), event);
.emitter
.emit(
ERROR_EVENT_NAME,
ErrorEventData {
message: format!("{:?}", e),
code: 500,
},
)
.await
{
log::error!("Error occurred when sending error response: {:?}", e);
} }
log::error!("Failed to handle event: {:?}", e); continue;
} }
}); }
handle_event(Context::clone(&ctx), Arc::clone(&handler), event);
} }
log::debug!("Connection closed."); log::debug!("Connection closed.");
} }
/// Handles a single event in a different tokio context
fn handle_event(ctx: Context, handler: Arc<EventHandler>, event: Event) {
tokio::spawn(async move {
if let Err(e) = handler.handle_event(&ctx, event).await {
// emit an error event
if let Err(e) = ctx
.emitter
.emit(
ERROR_EVENT_NAME,
ErrorEventData {
message: format!("{:?}", e),
code: 500,
},
)
.await
{
log::error!("Error occurred when sending error response: {:?}", e);
}
log::error!("Failed to handle event: {:?}", e);
}
});
}

@ -1,3 +1,4 @@
use crate::context::Context;
use crate::error::Result; use crate::error::Result;
use crate::events::event::Event; use crate::events::event::Event;
use serde::Serialize; use serde::Serialize;
@ -26,7 +27,7 @@ impl StreamEmitter {
event: &str, event: &str,
data: T, data: T,
res_id: Option<u64>, res_id: Option<u64>,
) -> Result<()> { ) -> Result<EmitMetadata> {
let data_bytes = rmp_serde::to_vec(&data)?; let data_bytes = rmp_serde::to_vec(&data)?;
let event = Event::new(event.to_string(), data_bytes, res_id); let event = Event::new(event.to_string(), data_bytes, res_id);
let event_bytes = event.to_bytes()?; let event_bytes = event.to_bytes()?;
@ -35,14 +36,14 @@ impl StreamEmitter {
(*stream).write_all(&event_bytes[..]).await?; (*stream).write_all(&event_bytes[..]).await?;
} }
Ok(()) Ok(EmitMetadata::new(event.id()))
} }
/// Emits an event /// Emits an event
pub async fn emit<T: Serialize>(&self, event: &str, data: T) -> Result<()> { pub async fn emit<T: Serialize>(&self, event: &str, data: T) -> Result<EmitMetadata> {
self._emit(event, data, None).await?; let metadata = self._emit(event, data, None).await?;
Ok(()) Ok(metadata)
} }
/// Emits a response to an event /// Emits a response to an event
@ -51,9 +52,32 @@ impl StreamEmitter {
event_id: u64, event_id: u64,
event: &str, event: &str,
data: T, data: T,
) -> Result<()> { ) -> Result<EmitMetadata> {
self._emit(event, data, Some(event_id)).await?; let metadata = self._emit(event, data, Some(event_id)).await?;
Ok(()) Ok(metadata)
}
}
/// A metadata object returned after emitting an event.
/// This object can be used to wait for a response to an event.
pub struct EmitMetadata {
message_id: u64,
}
impl EmitMetadata {
pub(crate) fn new(message_id: u64) -> Self {
Self { message_id }
}
/// The ID of the emitted message
pub fn message_id(&self) -> u64 {
self.message_id
}
/// Waits for a reply to the given message.
pub async fn await_reply(&self, ctx: &Context) -> Result<Event> {
let reply = ctx.await_reply(self.message_id).await?;
Ok(reply)
} }
} }

@ -6,17 +6,19 @@
//! // create the client //! // create the client
//! # async fn a() { //! # async fn a() {
//! //!
//! let emitter = IPCBuilder::new() //! let ctx = IPCBuilder::new()
//! .address("127.0.0.1:2020") //! .address("127.0.0.1:2020")
//! // register callback //! // register callback
//! .on("ping", |_ctx, _event| Box::pin(async move { //! .on("ping", |ctx, event| Box::pin(async move {
//! println!("Received ping event."); //! println!("Received ping event.");
//! ctx.emitter.emit_response(event.id(), "pong", ()).await?;
//! Ok(()) //! Ok(())
//! })) //! }))
//! .build_client().await.unwrap(); //! .build_client().await.unwrap();
//! //!
//! // emit an initial event //! // emit an initial event
//! emitter.emit("ping", ()).await.unwrap(); //! let response = ctx.emitter.emit("ping", ()).await.unwrap().await_reply(&ctx).await.unwrap();
//! assert_eq!(response.name(), "pong");
//! # } //! # }
//! ``` //! ```
//! //!
@ -28,8 +30,9 @@
//! IPCBuilder::new() //! IPCBuilder::new()
//! .address("127.0.0.1:2020") //! .address("127.0.0.1:2020")
//! // register callback //! // register callback
//! .on("ping", |_ctx, _event| Box::pin(async move { //! .on("ping", |ctx, event| Box::pin(async move {
//! println!("Received ping event."); //! println!("Received ping event.");
//! ctx.emitter.emit_response(event.id(), "pong", ()).await?;
//! Ok(()) //! Ok(())
//! })) //! }))
//! .build_server().await.unwrap(); //! .build_server().await.unwrap();

@ -2,26 +2,22 @@ use self::super::utils::PingEventData;
use crate::error::Error; use crate::error::Error;
use crate::events::error_event::ErrorEventData; use crate::events::error_event::ErrorEventData;
use crate::IPCBuilder; use crate::IPCBuilder;
use std::sync::atomic::{AtomicBool, AtomicU8, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, SystemTime}; use std::time::{Duration, SystemTime};
#[tokio::test] #[tokio::test]
async fn it_receives_events() { async fn it_receives_events() {
let ctr = Arc::new(AtomicU8::new(0));
let builder = IPCBuilder::new() let builder = IPCBuilder::new()
.on("ping", { .on("ping", {
let ctr = Arc::clone(&ctr);
move |ctx, e| { move |ctx, e| {
let ctr = Arc::clone(&ctr);
Box::pin(async move { Box::pin(async move {
ctr.fetch_add(1, Ordering::Relaxed);
let mut ping_data = e.data::<PingEventData>()?; let mut ping_data = e.data::<PingEventData>()?;
ping_data.time = SystemTime::now(); ping_data.time = SystemTime::now();
ping_data.ttl -= 1; ping_data.ttl -= 1;
if ping_data.ttl > 0 { if ping_data.ttl > 0 {
ctx.emitter.emit("ping", ping_data).await?; ctx.emitter.emit_response(e.id(), "pong", ping_data).await?;
} }
Ok(()) Ok(())
@ -41,8 +37,9 @@ async fn it_receives_events() {
while !server_running.load(Ordering::Relaxed) { while !server_running.load(Ordering::Relaxed) {
tokio::time::sleep(Duration::from_millis(10)).await; tokio::time::sleep(Duration::from_millis(10)).await;
} }
let client = builder.build_client().await.unwrap(); let ctx = builder.build_client().await.unwrap();
client let reply = ctx
.emitter
.emit( .emit(
"ping", "ping",
PingEventData { PingEventData {
@ -51,9 +48,11 @@ async fn it_receives_events() {
}, },
) )
.await .await
.unwrap()
.await_reply(&ctx)
.await
.unwrap(); .unwrap();
tokio::time::sleep(Duration::from_secs(1)).await; assert_eq!(reply.name(), "pong");
assert_eq!(ctr.load(Ordering::SeqCst), 16);
} }
#[tokio::test] #[tokio::test]
@ -91,8 +90,8 @@ async fn it_handles_errors() {
while !server_running.load(Ordering::Relaxed) { while !server_running.load(Ordering::Relaxed) {
tokio::time::sleep(Duration::from_millis(10)).await; tokio::time::sleep(Duration::from_millis(10)).await;
} }
let client = builder.build_client().await.unwrap(); let ctx = builder.build_client().await.unwrap();
client.emit("ping", ()).await.unwrap(); ctx.emitter.emit("ping", ()).await.unwrap();
tokio::time::sleep(Duration::from_secs(1)).await; tokio::time::sleep(Duration::from_secs(1)).await;
assert!(error_occurred.load(Ordering::SeqCst)); assert!(error_occurred.load(Ordering::SeqCst));

Loading…
Cancel
Save