Add namespaces

Signed-off-by: trivernis <trivernis@protonmail.com>
pull/4/head
trivernis 3 years ago
parent 5afd48a8a6
commit 1921c2a704
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

@ -35,6 +35,46 @@ IPCBuilder::new()
.build_server().await.unwrap();
```
### Namespaces
**Client:**
```rust
use rmp_ipc::IPCBuilder;
// create the client
let ctx = IPCBuilder::new()
.address("127.0.0.1:2020")
// register namespace
.namespace("mainspace-client")
// register callback
.on("ping", |_ctx, _event| Box::pin(async move {
println!("Received ping event.");
Ok(())
}))
.build()
.build_client().await.unwrap();
// emit an initial event
let response = ctx.emitter.emit_to("mainspace-server", "ping", ()).await?
.await_response(&ctx).await?;
```
**Server:**
```rust
use rmp_ipc::IPCBuilder;
// create the server
IPCBuilder::new()
.address("127.0.0.1:2020")
// register namespace
.namespace("mainspace-server")
// register callback
.on("ping", |_ctx, _event| Box::pin(async move {
println!("Received ping event.");
Ok(())
}))
.build()
.build_server().await.unwrap();
```
## License
Apache-2.0

@ -11,16 +11,34 @@ use tokio::io::{AsyncRead, AsyncReadExt};
pub struct Event {
id: u64,
ref_id: Option<u64>,
namespace: Option<String>,
name: String,
data: Vec<u8>,
}
impl Event {
/// Creates a new event with a namespace
pub fn with_namespace(
namespace: String,
name: String,
data: Vec<u8>,
ref_id: Option<u64>,
) -> Self {
Self {
id: generate_event_id(),
ref_id,
namespace: Some(namespace),
name,
data,
}
}
/// Creates a new event
pub fn new(name: String, data: Vec<u8>, ref_id: Option<u64>) -> Self {
Self {
id: generate_event_id(),
ref_id,
namespace: None,
name,
data,
}
@ -38,6 +56,11 @@ impl Event {
&self.data
}
/// Returns a reference to the namespace
pub fn namespace(&self) -> &Option<String> {
&self.namespace
}
/// Returns the name of the event
pub fn name(&self) -> &str {
&self.name

@ -5,6 +5,9 @@ use crate::events::event_handler::EventHandler;
use crate::ipc::client::IPCClient;
use crate::ipc::context::Context;
use crate::ipc::server::IPCServer;
use crate::namespaces::builder::NamespaceBuilder;
use crate::namespaces::namespace::Namespace;
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
@ -27,6 +30,7 @@ use std::pin::Pin;
pub struct IPCBuilder {
handler: EventHandler,
address: Option<String>,
namespaces: HashMap<String, Namespace>,
}
impl IPCBuilder {
@ -47,6 +51,7 @@ impl IPCBuilder {
Self {
handler,
address: None,
namespaces: HashMap::new(),
}
}
@ -72,10 +77,24 @@ impl IPCBuilder {
self
}
/// Adds a namespace
pub fn namespace<S: ToString>(self, name: S) -> NamespaceBuilder {
NamespaceBuilder::new(self, name.to_string())
}
/// Adds a namespace to the ipc server
pub fn add_namespace(mut self, namespace: Namespace) -> Self {
self.namespaces
.insert(namespace.name().to_owned(), namespace);
self
}
/// Builds an ipc server
pub async fn build_server(self) -> Result<()> {
self.validate()?;
let server = IPCServer {
namespaces: self.namespaces,
handler: self.handler,
};
server.start(&self.address.unwrap()).await?;
@ -87,6 +106,7 @@ impl IPCBuilder {
pub async fn build_client(self) -> Result<Context> {
self.validate()?;
let client = IPCClient {
namespaces: self.namespaces,
handler: self.handler,
};

@ -3,6 +3,8 @@ use crate::error::Result;
use crate::events::event_handler::EventHandler;
use crate::ipc::context::Context;
use crate::ipc::stream_emitter::StreamEmitter;
use crate::namespaces::namespace::Namespace;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::net::TcpStream;
@ -11,6 +13,7 @@ use tokio::net::TcpStream;
/// Usually one does not need to use the IPCClient object directly.
pub struct IPCClient {
pub(crate) handler: EventHandler,
pub(crate) namespaces: HashMap<String, Namespace>,
}
impl IPCClient {
@ -22,11 +25,12 @@ impl IPCClient {
let emitter = StreamEmitter::new(write_half);
let ctx = Context::new(StreamEmitter::clone(&emitter));
let handler = Arc::new(self.handler);
let namespaces = Arc::new(self.namespaces);
tokio::spawn({
let ctx = Context::clone(&ctx);
async move {
handle_connection(handler, read_half, ctx).await;
handle_connection(namespaces, handler, read_half, ctx).await;
}
});

@ -1,7 +1,9 @@
use crate::context::Context;
use crate::error_event::{ErrorEventData, ERROR_EVENT_NAME};
use crate::events::event_handler::EventHandler;
use crate::namespaces::namespace::Namespace;
use crate::Event;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::net::tcp::OwnedReadHalf;
@ -12,7 +14,12 @@ pub mod server;
pub mod stream_emitter;
/// Handles listening to a connection and triggering the corresponding event functions
async fn handle_connection(handler: Arc<EventHandler>, mut read_half: OwnedReadHalf, ctx: Context) {
async fn handle_connection(
namespaces: Arc<HashMap<String, Namespace>>,
handler: Arc<EventHandler>,
mut read_half: OwnedReadHalf,
ctx: Context,
) {
while let Ok(event) = Event::from_async_read(&mut read_half).await {
// check if the event is a reply
if let Some(ref_id) = event.reference_id() {
@ -25,7 +32,12 @@ async fn handle_connection(handler: Arc<EventHandler>, mut read_half: OwnedReadH
continue;
}
}
handle_event(Context::clone(&ctx), Arc::clone(&handler), event);
if let Some(namespace) = event.namespace().clone().and_then(|n| namespaces.get(&n)) {
let handler = Arc::clone(&namespace.handler);
handle_event(Context::clone(&ctx), handler, event);
} else {
handle_event(Context::clone(&ctx), Arc::clone(&handler), event);
}
}
log::debug!("Connection closed.");
}

@ -3,6 +3,8 @@ use crate::error::Result;
use crate::events::event_handler::EventHandler;
use crate::ipc::context::Context;
use crate::ipc::stream_emitter::StreamEmitter;
use crate::namespaces::namespace::Namespace;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::net::TcpListener;
@ -11,6 +13,7 @@ use tokio::net::TcpListener;
/// Usually one does not need to use the IPCServer object directly.
pub struct IPCServer {
pub(crate) handler: EventHandler,
pub(crate) namespaces: HashMap<String, Namespace>,
}
impl IPCServer {
@ -19,16 +22,18 @@ impl IPCServer {
pub async fn start(self, address: &str) -> Result<()> {
let listener = TcpListener::bind(address).await?;
let handler = Arc::new(self.handler);
let namespaces = Arc::new(self.namespaces);
while let Ok((stream, _)) = listener.accept().await {
let handler = Arc::clone(&handler);
let namespaces = Arc::clone(&namespaces);
tokio::spawn(async {
let (read_half, write_half) = stream.into_split();
let emitter = StreamEmitter::new(write_half);
let ctx = Context::new(StreamEmitter::clone(&emitter));
handle_connection(handler, read_half, ctx).await;
handle_connection(namespaces, handler, read_half, ctx).await;
});
}

@ -24,12 +24,18 @@ impl StreamEmitter {
pub async fn _emit<T: Serialize>(
&self,
namespace: Option<&str>,
event: &str,
data: T,
res_id: Option<u64>,
) -> Result<EmitMetadata> {
let data_bytes = rmp_serde::to_vec(&data)?;
let event = Event::new(event.to_string(), data_bytes, res_id);
let event = if let Some(namespace) = namespace {
Event::with_namespace(namespace.to_string(), event.to_string(), data_bytes, res_id)
} else {
Event::new(event.to_string(), data_bytes, res_id)
};
let event_bytes = event.to_bytes()?;
{
let mut stream = self.stream.lock().await;
@ -40,22 +46,50 @@ impl StreamEmitter {
}
/// Emits an event
pub async fn emit<T: Serialize>(&self, event: &str, data: T) -> Result<EmitMetadata> {
let metadata = self._emit(event, data, None).await?;
pub async fn emit<S: AsRef<str>, T: Serialize>(
&self,
event: S,
data: T,
) -> Result<EmitMetadata> {
self._emit(None, event.as_ref(), data, None).await
}
Ok(metadata)
/// Emits an event to a specific namespace
pub async fn emit_to<S1: AsRef<str>, S2: AsRef<str>, T: Serialize>(
&self,
namespace: S1,
event: S2,
data: T,
) -> Result<EmitMetadata> {
self._emit(Some(namespace.as_ref()), event.as_ref(), data, None)
.await
}
/// Emits a response to an event
pub async fn emit_response<T: Serialize>(
pub async fn emit_response<S: AsRef<str>, T: Serialize>(
&self,
event_id: u64,
event: &str,
event: S,
data: T,
) -> Result<EmitMetadata> {
let metadata = self._emit(event, data, Some(event_id)).await?;
self._emit(None, event.as_ref(), data, Some(event_id)).await
}
Ok(metadata)
/// Emits a response to an event
pub async fn emit_response_to<S1: AsRef<str>, S2: AsRef<str>, T: Serialize>(
&self,
event_id: u64,
namespace: S1,
event: S2,
data: T,
) -> Result<EmitMetadata> {
self._emit(
Some(namespace.as_ref()),
event.as_ref(),
data,
Some(event_id),
)
.await
}
}

@ -14,6 +14,13 @@
//! ctx.emitter.emit_response(event.id(), "pong", ()).await?;
//! Ok(())
//! }))
//! .namespace("mainspace-client")
//! .on("something", |ctx, event| Box::pin(async move {
//! println!("I think the server did something");
//! ctx.emitter.emit_response_to(event.id(), "mainspace-server", "ok", ()).await?;
//! Ok(())
//! }))
//! .build()
//! .build_client().await.unwrap();
//!
//! // emit an initial event
@ -35,6 +42,13 @@
//! ctx.emitter.emit_response(event.id(), "pong", ()).await?;
//! Ok(())
//! }))
//! .namespace("mainspace-server")
//! .on("do-something", |ctx, event| Box::pin(async move {
//! println!("Doing something");
//! ctx.emitter.emit_response_to(event.id(), "mainspace-client", "something", ()).await?;
//! Ok(())
//! }))
//! .build()
//! .build_server().await.unwrap();
//! # }
//! ```
@ -45,6 +59,7 @@ mod tests;
pub mod error;
mod events;
mod ipc;
mod namespaces;
pub use events::error_event;
pub use events::event::Event;

@ -0,0 +1,44 @@
use crate::context::Context;
use crate::error::Result;
use crate::events::event_handler::EventHandler;
use crate::namespaces::namespace::Namespace;
use crate::{Event, IPCBuilder};
use std::future::Future;
use std::pin::Pin;
pub struct NamespaceBuilder {
name: String,
handler: EventHandler,
ipc_builder: IPCBuilder,
}
impl NamespaceBuilder {
pub(crate) fn new(ipc_builder: IPCBuilder, name: String) -> Self {
Self {
name,
handler: EventHandler::new(),
ipc_builder,
}
}
/// Adds an event callback on the namespace
pub fn on<F: 'static>(mut self, event: &str, callback: F) -> Self
where
F: for<'a> Fn(
&'a Context,
Event,
) -> Pin<Box<(dyn Future<Output = Result<()>> + Send + 'a)>>
+ Send
+ Sync,
{
self.handler.on(event, callback);
self
}
/// Builds the namespace
pub fn build(self) -> IPCBuilder {
let namespace = Namespace::new(self.name, self.handler);
self.ipc_builder.add_namespace(namespace)
}
}

@ -0,0 +1,2 @@
pub mod builder;
pub mod namespace;

@ -0,0 +1,21 @@
use crate::events::event_handler::EventHandler;
use std::sync::Arc;
#[derive(Clone)]
pub struct Namespace {
name: String,
pub(crate) handler: Arc<EventHandler>,
}
impl Namespace {
pub fn new<S: ToString>(name: S, handler: EventHandler) -> Self {
Self {
name: name.to_string(),
handler: Arc::new(handler),
}
}
pub fn name(&self) -> &String {
&self.name
}
}

@ -1,30 +1,67 @@
use self::super::utils::PingEventData;
use crate::context::Context;
use crate::error::Error;
use crate::error::Result;
use crate::events::error_event::ErrorEventData;
use crate::tests::utils::start_test_server;
use crate::IPCBuilder;
use crate::{Event, IPCBuilder};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, SystemTime};
async fn handle_ping_event(ctx: &Context, e: Event) -> Result<()> {
let mut ping_data = e.data::<PingEventData>()?;
ping_data.time = SystemTime::now();
ping_data.ttl -= 1;
if ping_data.ttl > 0 {
ctx.emitter.emit_response(e.id(), "pong", ping_data).await?;
}
Ok(())
}
#[tokio::test]
async fn it_receives_events() {
let builder = IPCBuilder::new()
.on("ping", {
move |ctx, e| {
Box::pin(async move {
let mut ping_data = e.data::<PingEventData>()?;
ping_data.time = SystemTime::now();
ping_data.ttl -= 1;
if ping_data.ttl > 0 {
ctx.emitter.emit_response(e.id(), "pong", ping_data).await?;
}
.on("ping", |ctx, e| Box::pin(handle_ping_event(ctx, e)))
.address("127.0.0.1:8281");
let server_running = Arc::new(AtomicBool::new(false));
tokio::spawn({
let server_running = Arc::clone(&server_running);
let builder = builder.clone();
async move {
server_running.store(true, Ordering::SeqCst);
builder.build_server().await.unwrap();
}
});
while !server_running.load(Ordering::Relaxed) {
tokio::time::sleep(Duration::from_millis(10)).await;
}
let ctx = builder.build_client().await.unwrap();
let reply = ctx
.emitter
.emit(
"ping",
PingEventData {
ttl: 16,
time: SystemTime::now(),
},
)
.await
.unwrap()
.await_reply(&ctx)
.await
.unwrap();
assert_eq!(reply.name(), "pong");
}
Ok(())
})
}
})
#[tokio::test]
async fn it_receives_namespaced_events() {
let builder = IPCBuilder::new()
.namespace("mainspace")
.on("ping", |ctx, e| Box::pin(handle_ping_event(ctx, e)))
.build()
.address("127.0.0.1:8282");
let server_running = Arc::new(AtomicBool::new(false));
tokio::spawn({
@ -41,7 +78,8 @@ async fn it_receives_events() {
let ctx = builder.build_client().await.unwrap();
let reply = ctx
.emitter
.emit(
.emit_to(
"mainspace",
"ping",
PingEventData {
ttl: 16,

Loading…
Cancel
Save