From 8e483b97cb9899ee0286563ec8681b5dfb50f897 Mon Sep 17 00:00:00 2001 From: trivernis Date: Mon, 10 May 2021 18:51:11 +0200 Subject: [PATCH] Add event_ids to events Signed-off-by: trivernis --- Cargo.toml | 1 + src/events/event.rs | 12 ++++++++++-- src/events/mod.rs | 16 ++++++++++++++++ src/ipc/stream_emitter.rs | 29 ++++++++++++++++++++++++++--- src/tests/event_tests.rs | 12 ++++++++++++ src/tests/mod.rs | 1 + 6 files changed, 66 insertions(+), 5 deletions(-) create mode 100644 src/tests/event_tests.rs diff --git a/Cargo.toml b/Cargo.toml index 6ab99b7a..7610affe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ description = "IPC using Rust MessagePack (rmp)" thiserror = "1.0.24" rmp-serde = "0.15.4" log = "0.4.14" +lazy_static = "1.4.0" [dependencies.serde] version = "1.0.125" diff --git a/src/events/event.rs b/src/events/event.rs index 74f35a75..26fab4bb 100644 --- a/src/events/event.rs +++ b/src/events/event.rs @@ -1,4 +1,5 @@ use crate::error::Result; +use crate::events::generate_event_id; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use tokio::io::{AsyncRead, AsyncReadExt}; @@ -8,14 +9,21 @@ use tokio::io::{AsyncRead, AsyncReadExt}; /// as raw binary data. #[derive(Serialize, Deserialize)] pub struct Event { + id: u64, + ref_id: Option, name: String, data: Vec, } impl Event { /// Creates a new event - pub fn new(name: String, data: Vec) -> Self { - Self { name, data } + pub fn new(name: String, data: Vec, ref_id: Option) -> Self { + Self { + id: generate_event_id(), + ref_id, + name, + data, + } } /// Decodes the data to the given type diff --git a/src/events/mod.rs b/src/events/mod.rs index fb1138df..b0a5dd12 100644 --- a/src/events/mod.rs +++ b/src/events/mod.rs @@ -1,3 +1,19 @@ +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; + pub mod error_event; pub mod event; pub mod event_handler; + +/// Generates a new event id +pub(crate) fn generate_event_id() -> u64 { + lazy_static::lazy_static! { + static ref COUNTER: Arc = Arc::new(AtomicU64::new(0)); + } + let epoch_elapsed = SystemTime::now().duration_since(UNIX_EPOCH).unwrap(); + + (epoch_elapsed.as_secs() % u16::MAX as u64) << 48 + | (epoch_elapsed.subsec_millis() as u64) << 32 + | COUNTER.fetch_add(1, Ordering::SeqCst) +} diff --git a/src/ipc/stream_emitter.rs b/src/ipc/stream_emitter.rs index 281283e4..8c2d4e5c 100644 --- a/src/ipc/stream_emitter.rs +++ b/src/ipc/stream_emitter.rs @@ -21,10 +21,14 @@ impl StreamEmitter { } } - /// Emits an event - pub async fn emit(&self, event: &str, data: T) -> Result<()> { + pub async fn _emit( + &self, + event: &str, + data: T, + res_id: Option, + ) -> Result<()> { let data_bytes = rmp_serde::to_vec(&data)?; - let event = Event::new(event.to_string(), data_bytes); + let event = Event::new(event.to_string(), data_bytes, res_id); let event_bytes = event.to_bytes()?; { let mut stream = self.stream.lock().await; @@ -33,4 +37,23 @@ impl StreamEmitter { Ok(()) } + + /// Emits an event + pub async fn emit(&self, event: &str, data: T) -> Result<()> { + self._emit(event, data, None).await?; + + Ok(()) + } + + /// Emits a response to an event + pub async fn emit_response( + &self, + event_id: u64, + event: &str, + data: T, + ) -> Result<()> { + self._emit(event, data, Some(event_id)).await?; + + Ok(()) + } } diff --git a/src/tests/event_tests.rs b/src/tests/event_tests.rs new file mode 100644 index 00000000..0eca8653 --- /dev/null +++ b/src/tests/event_tests.rs @@ -0,0 +1,12 @@ +use crate::events::generate_event_id; +use std::collections::HashSet; + +#[test] +fn event_ids_work() { + let mut ids = HashSet::new(); + + // simple collision test + for _ in 0..100000 { + assert!(ids.insert(generate_event_id())) + } +} diff --git a/src/tests/mod.rs b/src/tests/mod.rs index 1b176c03..8f29a871 100644 --- a/src/tests/mod.rs +++ b/src/tests/mod.rs @@ -1,2 +1,3 @@ mod ipc_tests; mod utils; +mod event_tests;