diff --git a/.gitignore b/.gitignore index 408b8a57..2a0038a4 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,2 @@ /target -Cargo.lock .idea \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 00000000..1ff90b5c --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,277 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "autocfg" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" + +[[package]] +name = "byteorder" +version = "1.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" + +[[package]] +name = "bytes" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "hermit-abi" +version = "0.1.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33" +dependencies = [ + "libc", +] + +[[package]] +name = "lazy_static" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" + +[[package]] +name = "libc" +version = "0.2.103" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd8f7255a17a627354f321ef0055d63b898c6fb27eff628af4d1b66b7331edf6" + +[[package]] +name = "log" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "memchr" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a" + +[[package]] +name = "mio" +version = "0.7.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c2bdb6314ec10835cd3293dd268473a835c02b7b352e788be788b3c6ca6bb16" +dependencies = [ + "libc", + "log", + "miow", + "ntapi", + "winapi", +] + +[[package]] +name = "miow" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9f1c5b025cda876f66ef43a113f91ebc9f4ccef34843000e0adf6ebbab84e21" +dependencies = [ + "winapi", +] + +[[package]] +name = "ntapi" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f6bb902e437b6d86e03cce10a7e2af662292c5dfef23b65899ea3ac9354ad44" +dependencies = [ + "winapi", +] + +[[package]] +name = "num-traits" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a64b1ec5cda2586e284722486d802acf1f7dbdc623e2bfc57e65ca1cd099290" +dependencies = [ + "autocfg", +] + +[[package]] +name = "num_cpus" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05499f3756671c15885fee9034446956fff3f243d6077b91e5767df161f766b3" +dependencies = [ + "hermit-abi", + "libc", +] + +[[package]] +name = "pin-project-lite" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d31d11c69a6b52a174b42bdc0c30e5e11670f90788b2c471c31c1d17d449443" + +[[package]] +name = "proc-macro2" +version = "1.0.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9f5105d4fdaab20335ca9565e106a5d9b82b6219b5ba735731124ac6711d23d" +dependencies = [ + "unicode-xid", +] + +[[package]] +name = "quote" +version = "1.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38bc8cc6a5f2e3655e0899c1b848643b2562f853f114bfec7be120678e3ace05" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "rmp" +version = "0.8.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f55e5fa1446c4d5dd1f5daeed2a4fe193071771a2636274d0d7a3b082aa7ad6" +dependencies = [ + "byteorder", + "num-traits", +] + +[[package]] +name = "rmp-ipc" +version = "0.2.1" +dependencies = [ + "lazy_static", + "log", + "rmp-serde", + "serde", + "thiserror", + "tokio", +] + +[[package]] +name = "rmp-serde" +version = "0.15.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "723ecff9ad04f4ad92fe1c8ca6c20d2196d9286e9c60727c4cb5511629260e9d" +dependencies = [ + "byteorder", + "rmp", + "serde", +] + +[[package]] +name = "serde" +version = "1.0.130" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f12d06de37cf59146fbdecab66aa99f9fe4f78722e3607577a5375d66bd0c913" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.130" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7bc1a1ab1961464eae040d96713baa5a724a8152c1222492465b54322ec508b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "syn" +version = "1.0.80" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d010a1623fbd906d51d650a9916aaefc05ffa0e4053ff7fe601167f3e715d194" +dependencies = [ + "proc-macro2", + "quote", + "unicode-xid", +] + +[[package]] +name = "thiserror" +version = "1.0.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "854babe52e4df1653706b98fcfc05843010039b406875930a70e4d9644e5c417" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa32fd3f627f367fe16f893e2597ae3c05020f8bba2666a4e6ea73d377e5714b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tokio" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2c2416fdedca8443ae44b4527de1ea633af61d8f7169ffa6e72c5b53d24efcc" +dependencies = [ + "autocfg", + "bytes", + "libc", + "memchr", + "mio", + "num_cpus", + "pin-project-lite", + "tokio-macros", + "winapi", +] + +[[package]] +name = "tokio-macros" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "154794c8f499c2619acd19e839294703e9e32e7630ef5f46ea80d4ef0fbee5eb" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "unicode-xid" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3" + +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" diff --git a/Cargo.toml b/Cargo.toml index 18d89502..70b33317 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,20 +11,20 @@ description = "IPC using Rust MessagePack (rmp)" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -thiserror = "1.0.24" +thiserror = "1.0.30" rmp-serde = "0.15.4" log = "0.4.14" lazy_static = "1.4.0" [dependencies.serde] -version = "1.0.125" +version = "1.0.130" features = ["serde_derive"] [dependencies.tokio] -version = "1.5.0" +version = "1.12.0" features = ["net", "io-std", "io-util", "sync", "time"] [dev-dependencies.tokio] -version = "1.5.0" +version = "1.12.0" features = ["macros", "rt-multi-thread"] \ No newline at end of file diff --git a/README.md b/README.md index 5da2a8b9..8520a451 100644 --- a/README.md +++ b/README.md @@ -35,6 +35,46 @@ IPCBuilder::new() .build_server().await.unwrap(); ``` +### Namespaces + +**Client:** +```rust +use rmp_ipc::IPCBuilder; +// create the client +let ctx = IPCBuilder::new() + .address("127.0.0.1:2020") + // register namespace + .namespace("mainspace-client") + // register callback + .on("ping", |_ctx, _event| Box::pin(async move { + println!("Received ping event."); + Ok(()) + })) + .build() + .build_client().await.unwrap(); + +// emit an initial event +let response = ctx.emitter.emit_to("mainspace-server", "ping", ()).await? + .await_response(&ctx).await?; +``` + +**Server:** +```rust +use rmp_ipc::IPCBuilder; +// create the server +IPCBuilder::new() + .address("127.0.0.1:2020") + // register namespace + .namespace("mainspace-server") + // register callback + .on("ping", |_ctx, _event| Box::pin(async move { + println!("Received ping event."); + Ok(()) + })) + .build() + .build_server().await.unwrap(); +``` + ## License Apache-2.0 \ No newline at end of file diff --git a/src/events/event.rs b/src/events/event.rs index 1dd00791..aeaaac6e 100644 --- a/src/events/event.rs +++ b/src/events/event.rs @@ -11,16 +11,34 @@ use tokio::io::{AsyncRead, AsyncReadExt}; pub struct Event { id: u64, ref_id: Option, + namespace: Option, name: String, data: Vec, } impl Event { + /// Creates a new event with a namespace + pub fn with_namespace( + namespace: String, + name: String, + data: Vec, + ref_id: Option, + ) -> Self { + Self { + id: generate_event_id(), + ref_id, + namespace: Some(namespace), + name, + data, + } + } + /// Creates a new event pub fn new(name: String, data: Vec, ref_id: Option) -> Self { Self { id: generate_event_id(), ref_id, + namespace: None, name, data, } @@ -38,6 +56,11 @@ impl Event { &self.data } + /// Returns a reference to the namespace + pub fn namespace(&self) -> &Option { + &self.namespace + } + /// Returns the name of the event pub fn name(&self) -> &str { &self.name diff --git a/src/ipc/builder.rs b/src/ipc/builder.rs index 7171f40a..a02bc36e 100644 --- a/src/ipc/builder.rs +++ b/src/ipc/builder.rs @@ -5,6 +5,9 @@ use crate::events::event_handler::EventHandler; use crate::ipc::client::IPCClient; use crate::ipc::context::Context; use crate::ipc::server::IPCServer; +use crate::namespaces::builder::NamespaceBuilder; +use crate::namespaces::namespace::Namespace; +use std::collections::HashMap; use std::future::Future; use std::pin::Pin; @@ -27,6 +30,7 @@ use std::pin::Pin; pub struct IPCBuilder { handler: EventHandler, address: Option, + namespaces: HashMap, } impl IPCBuilder { @@ -47,6 +51,7 @@ impl IPCBuilder { Self { handler, address: None, + namespaces: HashMap::new(), } } @@ -72,10 +77,24 @@ impl IPCBuilder { self } + /// Adds a namespace + pub fn namespace(self, name: S) -> NamespaceBuilder { + NamespaceBuilder::new(self, name.to_string()) + } + + /// Adds a namespace to the ipc server + pub fn add_namespace(mut self, namespace: Namespace) -> Self { + self.namespaces + .insert(namespace.name().to_owned(), namespace); + + self + } + /// Builds an ipc server pub async fn build_server(self) -> Result<()> { self.validate()?; let server = IPCServer { + namespaces: self.namespaces, handler: self.handler, }; server.start(&self.address.unwrap()).await?; @@ -87,6 +106,7 @@ impl IPCBuilder { pub async fn build_client(self) -> Result { self.validate()?; let client = IPCClient { + namespaces: self.namespaces, handler: self.handler, }; diff --git a/src/ipc/client.rs b/src/ipc/client.rs index 5e55892c..a23d9792 100644 --- a/src/ipc/client.rs +++ b/src/ipc/client.rs @@ -3,6 +3,8 @@ use crate::error::Result; use crate::events::event_handler::EventHandler; use crate::ipc::context::Context; use crate::ipc::stream_emitter::StreamEmitter; +use crate::namespaces::namespace::Namespace; +use std::collections::HashMap; use std::sync::Arc; use tokio::net::TcpStream; @@ -11,6 +13,7 @@ use tokio::net::TcpStream; /// Usually one does not need to use the IPCClient object directly. pub struct IPCClient { pub(crate) handler: EventHandler, + pub(crate) namespaces: HashMap, } impl IPCClient { @@ -22,11 +25,12 @@ impl IPCClient { let emitter = StreamEmitter::new(write_half); let ctx = Context::new(StreamEmitter::clone(&emitter)); let handler = Arc::new(self.handler); + let namespaces = Arc::new(self.namespaces); tokio::spawn({ let ctx = Context::clone(&ctx); async move { - handle_connection(handler, read_half, ctx).await; + handle_connection(namespaces, handler, read_half, ctx).await; } }); diff --git a/src/ipc/mod.rs b/src/ipc/mod.rs index 7dfb75b8..a68b59bf 100644 --- a/src/ipc/mod.rs +++ b/src/ipc/mod.rs @@ -1,7 +1,9 @@ use crate::context::Context; use crate::error_event::{ErrorEventData, ERROR_EVENT_NAME}; use crate::events::event_handler::EventHandler; +use crate::namespaces::namespace::Namespace; use crate::Event; +use std::collections::HashMap; use std::sync::Arc; use tokio::net::tcp::OwnedReadHalf; @@ -12,7 +14,12 @@ pub mod server; pub mod stream_emitter; /// Handles listening to a connection and triggering the corresponding event functions -async fn handle_connection(handler: Arc, mut read_half: OwnedReadHalf, ctx: Context) { +async fn handle_connection( + namespaces: Arc>, + handler: Arc, + mut read_half: OwnedReadHalf, + ctx: Context, +) { while let Ok(event) = Event::from_async_read(&mut read_half).await { // check if the event is a reply if let Some(ref_id) = event.reference_id() { @@ -25,7 +32,12 @@ async fn handle_connection(handler: Arc, mut read_half: OwnedReadH continue; } } - handle_event(Context::clone(&ctx), Arc::clone(&handler), event); + if let Some(namespace) = event.namespace().clone().and_then(|n| namespaces.get(&n)) { + let handler = Arc::clone(&namespace.handler); + handle_event(Context::clone(&ctx), handler, event); + } else { + handle_event(Context::clone(&ctx), Arc::clone(&handler), event); + } } log::debug!("Connection closed."); } diff --git a/src/ipc/server.rs b/src/ipc/server.rs index e9d3697e..c7ad1564 100644 --- a/src/ipc/server.rs +++ b/src/ipc/server.rs @@ -3,6 +3,8 @@ use crate::error::Result; use crate::events::event_handler::EventHandler; use crate::ipc::context::Context; use crate::ipc::stream_emitter::StreamEmitter; +use crate::namespaces::namespace::Namespace; +use std::collections::HashMap; use std::sync::Arc; use tokio::net::TcpListener; @@ -11,6 +13,7 @@ use tokio::net::TcpListener; /// Usually one does not need to use the IPCServer object directly. pub struct IPCServer { pub(crate) handler: EventHandler, + pub(crate) namespaces: HashMap, } impl IPCServer { @@ -19,16 +22,18 @@ impl IPCServer { pub async fn start(self, address: &str) -> Result<()> { let listener = TcpListener::bind(address).await?; let handler = Arc::new(self.handler); + let namespaces = Arc::new(self.namespaces); while let Ok((stream, _)) = listener.accept().await { let handler = Arc::clone(&handler); + let namespaces = Arc::clone(&namespaces); tokio::spawn(async { let (read_half, write_half) = stream.into_split(); let emitter = StreamEmitter::new(write_half); let ctx = Context::new(StreamEmitter::clone(&emitter)); - handle_connection(handler, read_half, ctx).await; + handle_connection(namespaces, handler, read_half, ctx).await; }); } diff --git a/src/ipc/stream_emitter.rs b/src/ipc/stream_emitter.rs index 2e637d29..500074ef 100644 --- a/src/ipc/stream_emitter.rs +++ b/src/ipc/stream_emitter.rs @@ -24,12 +24,18 @@ impl StreamEmitter { pub async fn _emit( &self, + namespace: Option<&str>, event: &str, data: T, res_id: Option, ) -> Result { let data_bytes = rmp_serde::to_vec(&data)?; - let event = Event::new(event.to_string(), data_bytes, res_id); + let event = if let Some(namespace) = namespace { + Event::with_namespace(namespace.to_string(), event.to_string(), data_bytes, res_id) + } else { + Event::new(event.to_string(), data_bytes, res_id) + }; + let event_bytes = event.to_bytes()?; { let mut stream = self.stream.lock().await; @@ -40,22 +46,50 @@ impl StreamEmitter { } /// Emits an event - pub async fn emit(&self, event: &str, data: T) -> Result { - let metadata = self._emit(event, data, None).await?; + pub async fn emit, T: Serialize>( + &self, + event: S, + data: T, + ) -> Result { + self._emit(None, event.as_ref(), data, None).await + } - Ok(metadata) + /// Emits an event to a specific namespace + pub async fn emit_to, S2: AsRef, T: Serialize>( + &self, + namespace: S1, + event: S2, + data: T, + ) -> Result { + self._emit(Some(namespace.as_ref()), event.as_ref(), data, None) + .await } /// Emits a response to an event - pub async fn emit_response( + pub async fn emit_response, T: Serialize>( &self, event_id: u64, - event: &str, + event: S, data: T, ) -> Result { - let metadata = self._emit(event, data, Some(event_id)).await?; + self._emit(None, event.as_ref(), data, Some(event_id)).await + } - Ok(metadata) + /// Emits a response to an event + pub async fn emit_response_to, S2: AsRef, T: Serialize>( + &self, + event_id: u64, + namespace: S1, + event: S2, + data: T, + ) -> Result { + self._emit( + Some(namespace.as_ref()), + event.as_ref(), + data, + Some(event_id), + ) + .await } } diff --git a/src/lib.rs b/src/lib.rs index caae50a8..68d6f6e5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -14,6 +14,13 @@ //! ctx.emitter.emit_response(event.id(), "pong", ()).await?; //! Ok(()) //! })) +//! .namespace("mainspace-client") +//! .on("something", |ctx, event| Box::pin(async move { +//! println!("I think the server did something"); +//! ctx.emitter.emit_response_to(event.id(), "mainspace-server", "ok", ()).await?; +//! Ok(()) +//! })) +//! .build() //! .build_client().await.unwrap(); //! //! // emit an initial event @@ -35,6 +42,13 @@ //! ctx.emitter.emit_response(event.id(), "pong", ()).await?; //! Ok(()) //! })) +//! .namespace("mainspace-server") +//! .on("do-something", |ctx, event| Box::pin(async move { +//! println!("Doing something"); +//! ctx.emitter.emit_response_to(event.id(), "mainspace-client", "something", ()).await?; +//! Ok(()) +//! })) +//! .build() //! .build_server().await.unwrap(); //! # } //! ``` @@ -45,6 +59,7 @@ mod tests; pub mod error; mod events; mod ipc; +mod namespaces; pub use events::error_event; pub use events::event::Event; diff --git a/src/namespaces/builder.rs b/src/namespaces/builder.rs new file mode 100644 index 00000000..4b3d7fb7 --- /dev/null +++ b/src/namespaces/builder.rs @@ -0,0 +1,44 @@ +use crate::context::Context; +use crate::error::Result; +use crate::events::event_handler::EventHandler; +use crate::namespaces::namespace::Namespace; +use crate::{Event, IPCBuilder}; +use std::future::Future; +use std::pin::Pin; + +pub struct NamespaceBuilder { + name: String, + handler: EventHandler, + ipc_builder: IPCBuilder, +} + +impl NamespaceBuilder { + pub(crate) fn new(ipc_builder: IPCBuilder, name: String) -> Self { + Self { + name, + handler: EventHandler::new(), + ipc_builder, + } + } + + /// Adds an event callback on the namespace + pub fn on(mut self, event: &str, callback: F) -> Self + where + F: for<'a> Fn( + &'a Context, + Event, + ) -> Pin> + Send + 'a)>> + + Send + + Sync, + { + self.handler.on(event, callback); + + self + } + + /// Builds the namespace + pub fn build(self) -> IPCBuilder { + let namespace = Namespace::new(self.name, self.handler); + self.ipc_builder.add_namespace(namespace) + } +} diff --git a/src/namespaces/mod.rs b/src/namespaces/mod.rs new file mode 100644 index 00000000..54853061 --- /dev/null +++ b/src/namespaces/mod.rs @@ -0,0 +1,2 @@ +pub mod builder; +pub mod namespace; diff --git a/src/namespaces/namespace.rs b/src/namespaces/namespace.rs new file mode 100644 index 00000000..65cd9b1b --- /dev/null +++ b/src/namespaces/namespace.rs @@ -0,0 +1,21 @@ +use crate::events::event_handler::EventHandler; +use std::sync::Arc; + +#[derive(Clone)] +pub struct Namespace { + name: String, + pub(crate) handler: Arc, +} + +impl Namespace { + pub fn new(name: S, handler: EventHandler) -> Self { + Self { + name: name.to_string(), + handler: Arc::new(handler), + } + } + + pub fn name(&self) -> &String { + &self.name + } +} diff --git a/src/tests/ipc_tests.rs b/src/tests/ipc_tests.rs index 8f510bd4..c2e98b2a 100644 --- a/src/tests/ipc_tests.rs +++ b/src/tests/ipc_tests.rs @@ -1,30 +1,67 @@ use self::super::utils::PingEventData; +use crate::context::Context; use crate::error::Error; +use crate::error::Result; use crate::events::error_event::ErrorEventData; use crate::tests::utils::start_test_server; -use crate::IPCBuilder; +use crate::{Event, IPCBuilder}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::{Duration, SystemTime}; +async fn handle_ping_event(ctx: &Context, e: Event) -> Result<()> { + let mut ping_data = e.data::()?; + ping_data.time = SystemTime::now(); + ping_data.ttl -= 1; + + if ping_data.ttl > 0 { + ctx.emitter.emit_response(e.id(), "pong", ping_data).await?; + } + + Ok(()) +} + #[tokio::test] async fn it_receives_events() { let builder = IPCBuilder::new() - .on("ping", { - move |ctx, e| { - Box::pin(async move { - let mut ping_data = e.data::()?; - ping_data.time = SystemTime::now(); - ping_data.ttl -= 1; - - if ping_data.ttl > 0 { - ctx.emitter.emit_response(e.id(), "pong", ping_data).await?; - } + .on("ping", |ctx, e| Box::pin(handle_ping_event(ctx, e))) + .address("127.0.0.1:8281"); + let server_running = Arc::new(AtomicBool::new(false)); + tokio::spawn({ + let server_running = Arc::clone(&server_running); + let builder = builder.clone(); + async move { + server_running.store(true, Ordering::SeqCst); + builder.build_server().await.unwrap(); + } + }); + while !server_running.load(Ordering::Relaxed) { + tokio::time::sleep(Duration::from_millis(10)).await; + } + let ctx = builder.build_client().await.unwrap(); + let reply = ctx + .emitter + .emit( + "ping", + PingEventData { + ttl: 16, + time: SystemTime::now(), + }, + ) + .await + .unwrap() + .await_reply(&ctx) + .await + .unwrap(); + assert_eq!(reply.name(), "pong"); +} - Ok(()) - }) - } - }) +#[tokio::test] +async fn it_receives_namespaced_events() { + let builder = IPCBuilder::new() + .namespace("mainspace") + .on("ping", |ctx, e| Box::pin(handle_ping_event(ctx, e))) + .build() .address("127.0.0.1:8282"); let server_running = Arc::new(AtomicBool::new(false)); tokio::spawn({ @@ -41,7 +78,8 @@ async fn it_receives_events() { let ctx = builder.build_client().await.unwrap(); let reply = ctx .emitter - .emit( + .emit_to( + "mainspace", "ping", PingEventData { ttl: 16,