From 1921c2a70433dc0ce975d2afbbc2baf453201bec Mon Sep 17 00:00:00 2001 From: trivernis Date: Sat, 9 Oct 2021 10:19:26 +0200 Subject: [PATCH] Add namespaces Signed-off-by: trivernis --- README.md | 40 +++++++++++++++++++++ src/events/event.rs | 23 ++++++++++++ src/ipc/builder.rs | 20 +++++++++++ src/ipc/client.rs | 6 +++- src/ipc/mod.rs | 16 +++++++-- src/ipc/server.rs | 7 +++- src/ipc/stream_emitter.rs | 50 +++++++++++++++++++++----- src/lib.rs | 15 ++++++++ src/namespaces/builder.rs | 44 +++++++++++++++++++++++ src/namespaces/mod.rs | 2 ++ src/namespaces/namespace.rs | 21 +++++++++++ src/tests/ipc_tests.rs | 70 ++++++++++++++++++++++++++++--------- 12 files changed, 286 insertions(+), 28 deletions(-) create mode 100644 src/namespaces/builder.rs create mode 100644 src/namespaces/mod.rs create mode 100644 src/namespaces/namespace.rs diff --git a/README.md b/README.md index 5da2a8b9..8520a451 100644 --- a/README.md +++ b/README.md @@ -35,6 +35,46 @@ IPCBuilder::new() .build_server().await.unwrap(); ``` +### Namespaces + +**Client:** +```rust +use rmp_ipc::IPCBuilder; +// create the client +let ctx = IPCBuilder::new() + .address("127.0.0.1:2020") + // register namespace + .namespace("mainspace-client") + // register callback + .on("ping", |_ctx, _event| Box::pin(async move { + println!("Received ping event."); + Ok(()) + })) + .build() + .build_client().await.unwrap(); + +// emit an initial event +let response = ctx.emitter.emit_to("mainspace-server", "ping", ()).await? + .await_response(&ctx).await?; +``` + +**Server:** +```rust +use rmp_ipc::IPCBuilder; +// create the server +IPCBuilder::new() + .address("127.0.0.1:2020") + // register namespace + .namespace("mainspace-server") + // register callback + .on("ping", |_ctx, _event| Box::pin(async move { + println!("Received ping event."); + Ok(()) + })) + .build() + .build_server().await.unwrap(); +``` + ## License Apache-2.0 \ No newline at end of file diff --git a/src/events/event.rs b/src/events/event.rs index 1dd00791..aeaaac6e 100644 --- a/src/events/event.rs +++ b/src/events/event.rs @@ -11,16 +11,34 @@ use tokio::io::{AsyncRead, AsyncReadExt}; pub struct Event { id: u64, ref_id: Option, + namespace: Option, name: String, data: Vec, } impl Event { + /// Creates a new event with a namespace + pub fn with_namespace( + namespace: String, + name: String, + data: Vec, + ref_id: Option, + ) -> Self { + Self { + id: generate_event_id(), + ref_id, + namespace: Some(namespace), + name, + data, + } + } + /// Creates a new event pub fn new(name: String, data: Vec, ref_id: Option) -> Self { Self { id: generate_event_id(), ref_id, + namespace: None, name, data, } @@ -38,6 +56,11 @@ impl Event { &self.data } + /// Returns a reference to the namespace + pub fn namespace(&self) -> &Option { + &self.namespace + } + /// Returns the name of the event pub fn name(&self) -> &str { &self.name diff --git a/src/ipc/builder.rs b/src/ipc/builder.rs index 7171f40a..a02bc36e 100644 --- a/src/ipc/builder.rs +++ b/src/ipc/builder.rs @@ -5,6 +5,9 @@ use crate::events::event_handler::EventHandler; use crate::ipc::client::IPCClient; use crate::ipc::context::Context; use crate::ipc::server::IPCServer; +use crate::namespaces::builder::NamespaceBuilder; +use crate::namespaces::namespace::Namespace; +use std::collections::HashMap; use std::future::Future; use std::pin::Pin; @@ -27,6 +30,7 @@ use std::pin::Pin; pub struct IPCBuilder { handler: EventHandler, address: Option, + namespaces: HashMap, } impl IPCBuilder { @@ -47,6 +51,7 @@ impl IPCBuilder { Self { handler, address: None, + namespaces: HashMap::new(), } } @@ -72,10 +77,24 @@ impl IPCBuilder { self } + /// Adds a namespace + pub fn namespace(self, name: S) -> NamespaceBuilder { + NamespaceBuilder::new(self, name.to_string()) + } + + /// Adds a namespace to the ipc server + pub fn add_namespace(mut self, namespace: Namespace) -> Self { + self.namespaces + .insert(namespace.name().to_owned(), namespace); + + self + } + /// Builds an ipc server pub async fn build_server(self) -> Result<()> { self.validate()?; let server = IPCServer { + namespaces: self.namespaces, handler: self.handler, }; server.start(&self.address.unwrap()).await?; @@ -87,6 +106,7 @@ impl IPCBuilder { pub async fn build_client(self) -> Result { self.validate()?; let client = IPCClient { + namespaces: self.namespaces, handler: self.handler, }; diff --git a/src/ipc/client.rs b/src/ipc/client.rs index 5e55892c..a23d9792 100644 --- a/src/ipc/client.rs +++ b/src/ipc/client.rs @@ -3,6 +3,8 @@ use crate::error::Result; use crate::events::event_handler::EventHandler; use crate::ipc::context::Context; use crate::ipc::stream_emitter::StreamEmitter; +use crate::namespaces::namespace::Namespace; +use std::collections::HashMap; use std::sync::Arc; use tokio::net::TcpStream; @@ -11,6 +13,7 @@ use tokio::net::TcpStream; /// Usually one does not need to use the IPCClient object directly. pub struct IPCClient { pub(crate) handler: EventHandler, + pub(crate) namespaces: HashMap, } impl IPCClient { @@ -22,11 +25,12 @@ impl IPCClient { let emitter = StreamEmitter::new(write_half); let ctx = Context::new(StreamEmitter::clone(&emitter)); let handler = Arc::new(self.handler); + let namespaces = Arc::new(self.namespaces); tokio::spawn({ let ctx = Context::clone(&ctx); async move { - handle_connection(handler, read_half, ctx).await; + handle_connection(namespaces, handler, read_half, ctx).await; } }); diff --git a/src/ipc/mod.rs b/src/ipc/mod.rs index 7dfb75b8..a68b59bf 100644 --- a/src/ipc/mod.rs +++ b/src/ipc/mod.rs @@ -1,7 +1,9 @@ use crate::context::Context; use crate::error_event::{ErrorEventData, ERROR_EVENT_NAME}; use crate::events::event_handler::EventHandler; +use crate::namespaces::namespace::Namespace; use crate::Event; +use std::collections::HashMap; use std::sync::Arc; use tokio::net::tcp::OwnedReadHalf; @@ -12,7 +14,12 @@ pub mod server; pub mod stream_emitter; /// Handles listening to a connection and triggering the corresponding event functions -async fn handle_connection(handler: Arc, mut read_half: OwnedReadHalf, ctx: Context) { +async fn handle_connection( + namespaces: Arc>, + handler: Arc, + mut read_half: OwnedReadHalf, + ctx: Context, +) { while let Ok(event) = Event::from_async_read(&mut read_half).await { // check if the event is a reply if let Some(ref_id) = event.reference_id() { @@ -25,7 +32,12 @@ async fn handle_connection(handler: Arc, mut read_half: OwnedReadH continue; } } - handle_event(Context::clone(&ctx), Arc::clone(&handler), event); + if let Some(namespace) = event.namespace().clone().and_then(|n| namespaces.get(&n)) { + let handler = Arc::clone(&namespace.handler); + handle_event(Context::clone(&ctx), handler, event); + } else { + handle_event(Context::clone(&ctx), Arc::clone(&handler), event); + } } log::debug!("Connection closed."); } diff --git a/src/ipc/server.rs b/src/ipc/server.rs index e9d3697e..c7ad1564 100644 --- a/src/ipc/server.rs +++ b/src/ipc/server.rs @@ -3,6 +3,8 @@ use crate::error::Result; use crate::events::event_handler::EventHandler; use crate::ipc::context::Context; use crate::ipc::stream_emitter::StreamEmitter; +use crate::namespaces::namespace::Namespace; +use std::collections::HashMap; use std::sync::Arc; use tokio::net::TcpListener; @@ -11,6 +13,7 @@ use tokio::net::TcpListener; /// Usually one does not need to use the IPCServer object directly. pub struct IPCServer { pub(crate) handler: EventHandler, + pub(crate) namespaces: HashMap, } impl IPCServer { @@ -19,16 +22,18 @@ impl IPCServer { pub async fn start(self, address: &str) -> Result<()> { let listener = TcpListener::bind(address).await?; let handler = Arc::new(self.handler); + let namespaces = Arc::new(self.namespaces); while let Ok((stream, _)) = listener.accept().await { let handler = Arc::clone(&handler); + let namespaces = Arc::clone(&namespaces); tokio::spawn(async { let (read_half, write_half) = stream.into_split(); let emitter = StreamEmitter::new(write_half); let ctx = Context::new(StreamEmitter::clone(&emitter)); - handle_connection(handler, read_half, ctx).await; + handle_connection(namespaces, handler, read_half, ctx).await; }); } diff --git a/src/ipc/stream_emitter.rs b/src/ipc/stream_emitter.rs index 2e637d29..500074ef 100644 --- a/src/ipc/stream_emitter.rs +++ b/src/ipc/stream_emitter.rs @@ -24,12 +24,18 @@ impl StreamEmitter { pub async fn _emit( &self, + namespace: Option<&str>, event: &str, data: T, res_id: Option, ) -> Result { let data_bytes = rmp_serde::to_vec(&data)?; - let event = Event::new(event.to_string(), data_bytes, res_id); + let event = if let Some(namespace) = namespace { + Event::with_namespace(namespace.to_string(), event.to_string(), data_bytes, res_id) + } else { + Event::new(event.to_string(), data_bytes, res_id) + }; + let event_bytes = event.to_bytes()?; { let mut stream = self.stream.lock().await; @@ -40,22 +46,50 @@ impl StreamEmitter { } /// Emits an event - pub async fn emit(&self, event: &str, data: T) -> Result { - let metadata = self._emit(event, data, None).await?; + pub async fn emit, T: Serialize>( + &self, + event: S, + data: T, + ) -> Result { + self._emit(None, event.as_ref(), data, None).await + } - Ok(metadata) + /// Emits an event to a specific namespace + pub async fn emit_to, S2: AsRef, T: Serialize>( + &self, + namespace: S1, + event: S2, + data: T, + ) -> Result { + self._emit(Some(namespace.as_ref()), event.as_ref(), data, None) + .await } /// Emits a response to an event - pub async fn emit_response( + pub async fn emit_response, T: Serialize>( &self, event_id: u64, - event: &str, + event: S, data: T, ) -> Result { - let metadata = self._emit(event, data, Some(event_id)).await?; + self._emit(None, event.as_ref(), data, Some(event_id)).await + } - Ok(metadata) + /// Emits a response to an event + pub async fn emit_response_to, S2: AsRef, T: Serialize>( + &self, + event_id: u64, + namespace: S1, + event: S2, + data: T, + ) -> Result { + self._emit( + Some(namespace.as_ref()), + event.as_ref(), + data, + Some(event_id), + ) + .await } } diff --git a/src/lib.rs b/src/lib.rs index caae50a8..68d6f6e5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -14,6 +14,13 @@ //! ctx.emitter.emit_response(event.id(), "pong", ()).await?; //! Ok(()) //! })) +//! .namespace("mainspace-client") +//! .on("something", |ctx, event| Box::pin(async move { +//! println!("I think the server did something"); +//! ctx.emitter.emit_response_to(event.id(), "mainspace-server", "ok", ()).await?; +//! Ok(()) +//! })) +//! .build() //! .build_client().await.unwrap(); //! //! // emit an initial event @@ -35,6 +42,13 @@ //! ctx.emitter.emit_response(event.id(), "pong", ()).await?; //! Ok(()) //! })) +//! .namespace("mainspace-server") +//! .on("do-something", |ctx, event| Box::pin(async move { +//! println!("Doing something"); +//! ctx.emitter.emit_response_to(event.id(), "mainspace-client", "something", ()).await?; +//! Ok(()) +//! })) +//! .build() //! .build_server().await.unwrap(); //! # } //! ``` @@ -45,6 +59,7 @@ mod tests; pub mod error; mod events; mod ipc; +mod namespaces; pub use events::error_event; pub use events::event::Event; diff --git a/src/namespaces/builder.rs b/src/namespaces/builder.rs new file mode 100644 index 00000000..4b3d7fb7 --- /dev/null +++ b/src/namespaces/builder.rs @@ -0,0 +1,44 @@ +use crate::context::Context; +use crate::error::Result; +use crate::events::event_handler::EventHandler; +use crate::namespaces::namespace::Namespace; +use crate::{Event, IPCBuilder}; +use std::future::Future; +use std::pin::Pin; + +pub struct NamespaceBuilder { + name: String, + handler: EventHandler, + ipc_builder: IPCBuilder, +} + +impl NamespaceBuilder { + pub(crate) fn new(ipc_builder: IPCBuilder, name: String) -> Self { + Self { + name, + handler: EventHandler::new(), + ipc_builder, + } + } + + /// Adds an event callback on the namespace + pub fn on(mut self, event: &str, callback: F) -> Self + where + F: for<'a> Fn( + &'a Context, + Event, + ) -> Pin> + Send + 'a)>> + + Send + + Sync, + { + self.handler.on(event, callback); + + self + } + + /// Builds the namespace + pub fn build(self) -> IPCBuilder { + let namespace = Namespace::new(self.name, self.handler); + self.ipc_builder.add_namespace(namespace) + } +} diff --git a/src/namespaces/mod.rs b/src/namespaces/mod.rs new file mode 100644 index 00000000..54853061 --- /dev/null +++ b/src/namespaces/mod.rs @@ -0,0 +1,2 @@ +pub mod builder; +pub mod namespace; diff --git a/src/namespaces/namespace.rs b/src/namespaces/namespace.rs new file mode 100644 index 00000000..65cd9b1b --- /dev/null +++ b/src/namespaces/namespace.rs @@ -0,0 +1,21 @@ +use crate::events::event_handler::EventHandler; +use std::sync::Arc; + +#[derive(Clone)] +pub struct Namespace { + name: String, + pub(crate) handler: Arc, +} + +impl Namespace { + pub fn new(name: S, handler: EventHandler) -> Self { + Self { + name: name.to_string(), + handler: Arc::new(handler), + } + } + + pub fn name(&self) -> &String { + &self.name + } +} diff --git a/src/tests/ipc_tests.rs b/src/tests/ipc_tests.rs index 8f510bd4..c2e98b2a 100644 --- a/src/tests/ipc_tests.rs +++ b/src/tests/ipc_tests.rs @@ -1,30 +1,67 @@ use self::super::utils::PingEventData; +use crate::context::Context; use crate::error::Error; +use crate::error::Result; use crate::events::error_event::ErrorEventData; use crate::tests::utils::start_test_server; -use crate::IPCBuilder; +use crate::{Event, IPCBuilder}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::{Duration, SystemTime}; +async fn handle_ping_event(ctx: &Context, e: Event) -> Result<()> { + let mut ping_data = e.data::()?; + ping_data.time = SystemTime::now(); + ping_data.ttl -= 1; + + if ping_data.ttl > 0 { + ctx.emitter.emit_response(e.id(), "pong", ping_data).await?; + } + + Ok(()) +} + #[tokio::test] async fn it_receives_events() { let builder = IPCBuilder::new() - .on("ping", { - move |ctx, e| { - Box::pin(async move { - let mut ping_data = e.data::()?; - ping_data.time = SystemTime::now(); - ping_data.ttl -= 1; - - if ping_data.ttl > 0 { - ctx.emitter.emit_response(e.id(), "pong", ping_data).await?; - } + .on("ping", |ctx, e| Box::pin(handle_ping_event(ctx, e))) + .address("127.0.0.1:8281"); + let server_running = Arc::new(AtomicBool::new(false)); + tokio::spawn({ + let server_running = Arc::clone(&server_running); + let builder = builder.clone(); + async move { + server_running.store(true, Ordering::SeqCst); + builder.build_server().await.unwrap(); + } + }); + while !server_running.load(Ordering::Relaxed) { + tokio::time::sleep(Duration::from_millis(10)).await; + } + let ctx = builder.build_client().await.unwrap(); + let reply = ctx + .emitter + .emit( + "ping", + PingEventData { + ttl: 16, + time: SystemTime::now(), + }, + ) + .await + .unwrap() + .await_reply(&ctx) + .await + .unwrap(); + assert_eq!(reply.name(), "pong"); +} - Ok(()) - }) - } - }) +#[tokio::test] +async fn it_receives_namespaced_events() { + let builder = IPCBuilder::new() + .namespace("mainspace") + .on("ping", |ctx, e| Box::pin(handle_ping_event(ctx, e))) + .build() .address("127.0.0.1:8282"); let server_running = Arc::new(AtomicBool::new(false)); tokio::spawn({ @@ -41,7 +78,8 @@ async fn it_receives_events() { let ctx = builder.build_client().await.unwrap(); let reply = ctx .emitter - .emit( + .emit_to( + "mainspace", "ping", PingEventData { ttl: 16,