Merge pull request #5 from Trivernis/develop

Context Data
pull/7/head
Julius Riegel 3 years ago committed by GitHub
commit e119146160
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

9
Cargo.lock generated

@ -148,7 +148,7 @@ dependencies = [
[[package]] [[package]]
name = "rmp-ipc" name = "rmp-ipc"
version = "0.3.0" version = "0.4.0"
dependencies = [ dependencies = [
"lazy_static", "lazy_static",
"log", "log",
@ -156,6 +156,7 @@ dependencies = [
"serde", "serde",
"thiserror", "thiserror",
"tokio", "tokio",
"typemap_rev",
] ]
[[package]] [[package]]
@ -248,6 +249,12 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "typemap_rev"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed5b74f0a24b5454580a79abb6994393b09adf0ab8070f15827cb666255de155"
[[package]] [[package]]
name = "unicode-xid" name = "unicode-xid"
version = "0.2.2" version = "0.2.2"

@ -1,6 +1,6 @@
[package] [package]
name = "rmp-ipc" name = "rmp-ipc"
version = "0.3.0" version = "0.4.0"
authors = ["trivernis <trivernis@protonmail.com>"] authors = ["trivernis <trivernis@protonmail.com>"]
edition = "2018" edition = "2018"
readme = "README.md" readme = "README.md"
@ -15,6 +15,7 @@ thiserror = "1.0.30"
rmp-serde = "0.15.4" rmp-serde = "0.15.4"
log = "0.4.14" log = "0.4.14"
lazy_static = "1.4.0" lazy_static = "1.4.0"
typemap_rev = "0.1.5"
[dependencies.serde] [dependencies.serde]
version = "1.0.130" version = "1.0.130"

@ -10,11 +10,19 @@ use crate::namespaces::namespace::Namespace;
use std::collections::HashMap; use std::collections::HashMap;
use std::future::Future; use std::future::Future;
use std::pin::Pin; use std::pin::Pin;
use typemap_rev::{TypeMap, TypeMapKey};
#[derive(Clone)]
/// A builder for the IPC server or client. /// A builder for the IPC server or client.
/// ```no_run /// ```no_run
///use rmp_ipc::IPCBuilder; ///use typemap_rev::TypeMapKey;
/// use rmp_ipc::IPCBuilder;
///
/// struct CustomKey;
///
/// impl TypeMapKey for CustomKey {
/// type Value = String;
/// }
///
///# async fn a() { ///# async fn a() {
/// IPCBuilder::new() /// IPCBuilder::new()
/// .address("127.0.0.1:2020") /// .address("127.0.0.1:2020")
@ -23,6 +31,15 @@ use std::pin::Pin;
/// println!("Received ping event."); /// println!("Received ping event.");
/// Ok(()) /// Ok(())
/// })) /// }))
/// // register a namespace
/// .namespace("namespace")
/// .on("namespace-event", |_ctx, _event| Box::pin(async move {
/// println!("Namespace event.");
/// Ok(())
/// }))
/// .build()
/// // add context shared data
/// .insert::<CustomKey>("Hello World".to_string())
/// // can also be build_client which would return an emitter for events /// // can also be build_client which would return an emitter for events
/// .build_server().await.unwrap(); /// .build_server().await.unwrap();
///# } ///# }
@ -31,6 +48,7 @@ pub struct IPCBuilder {
handler: EventHandler, handler: EventHandler,
address: Option<String>, address: Option<String>,
namespaces: HashMap<String, Namespace>, namespaces: HashMap<String, Namespace>,
data: TypeMap,
} }
impl IPCBuilder { impl IPCBuilder {
@ -52,7 +70,15 @@ impl IPCBuilder {
handler, handler,
address: None, address: None,
namespaces: HashMap::new(), namespaces: HashMap::new(),
data: TypeMap::new(),
}
} }
/// Adds globally shared data
pub fn insert<K: TypeMapKey>(mut self, value: K::Value) -> Self {
self.data.insert::<K>(value);
self
} }
/// Adds an event callback /// Adds an event callback
@ -96,6 +122,7 @@ impl IPCBuilder {
let server = IPCServer { let server = IPCServer {
namespaces: self.namespaces, namespaces: self.namespaces,
handler: self.handler, handler: self.handler,
data: self.data,
}; };
server.start(&self.address.unwrap()).await?; server.start(&self.address.unwrap()).await?;
@ -108,6 +135,7 @@ impl IPCBuilder {
let client = IPCClient { let client = IPCClient {
namespaces: self.namespaces, namespaces: self.namespaces,
handler: self.handler, handler: self.handler,
data: self.data,
}; };
let ctx = client.connect(&self.address.unwrap()).await?; let ctx = client.connect(&self.address.unwrap()).await?;

@ -7,6 +7,8 @@ use crate::namespaces::namespace::Namespace;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tokio::sync::RwLock;
use typemap_rev::TypeMap;
/// The IPC Client to connect to an IPC Server. /// The IPC Client to connect to an IPC Server.
/// Use the [IPCBuilder](crate::builder::IPCBuilder) to create the client. /// Use the [IPCBuilder](crate::builder::IPCBuilder) to create the client.
@ -14,6 +16,7 @@ use tokio::net::TcpStream;
pub struct IPCClient { pub struct IPCClient {
pub(crate) handler: EventHandler, pub(crate) handler: EventHandler,
pub(crate) namespaces: HashMap<String, Namespace>, pub(crate) namespaces: HashMap<String, Namespace>,
pub(crate) data: TypeMap,
} }
impl IPCClient { impl IPCClient {
@ -23,7 +26,10 @@ impl IPCClient {
let stream = TcpStream::connect(address).await?; let stream = TcpStream::connect(address).await?;
let (read_half, write_half) = stream.into_split(); let (read_half, write_half) = stream.into_split();
let emitter = StreamEmitter::new(write_half); let emitter = StreamEmitter::new(write_half);
let ctx = Context::new(StreamEmitter::clone(&emitter)); let ctx = Context::new(
StreamEmitter::clone(&emitter),
Arc::new(RwLock::new(self.data)),
);
let handler = Arc::new(self.handler); let handler = Arc::new(self.handler);
let namespaces = Arc::new(self.namespaces); let namespaces = Arc::new(self.namespaces);

@ -3,7 +3,8 @@ use crate::ipc::stream_emitter::StreamEmitter;
use crate::Event; use crate::Event;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::{oneshot, Mutex}; use tokio::sync::{oneshot, Mutex, RwLock};
use typemap_rev::TypeMap;
/// An object provided to each callback function. /// An object provided to each callback function.
/// Currently it only holds the event emitter to emit response events in event callbacks. /// Currently it only holds the event emitter to emit response events in event callbacks.
@ -24,14 +25,18 @@ pub struct Context {
/// The event emitter /// The event emitter
pub emitter: StreamEmitter, pub emitter: StreamEmitter,
/// Field to store additional context data
pub data: Arc<RwLock<TypeMap>>,
reply_listeners: Arc<Mutex<HashMap<u64, oneshot::Sender<Event>>>>, reply_listeners: Arc<Mutex<HashMap<u64, oneshot::Sender<Event>>>>,
} }
impl Context { impl Context {
pub(crate) fn new(emitter: StreamEmitter) -> Self { pub(crate) fn new(emitter: StreamEmitter, data: Arc<RwLock<TypeMap>>) -> Self {
Self { Self {
emitter, emitter,
reply_listeners: Arc::new(Mutex::new(HashMap::new())), reply_listeners: Arc::new(Mutex::new(HashMap::new())),
data,
} }
} }

@ -7,6 +7,8 @@ use crate::namespaces::namespace::Namespace;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use tokio::net::TcpListener; use tokio::net::TcpListener;
use tokio::sync::RwLock;
use typemap_rev::TypeMap;
/// The IPC Server listening for connections. /// The IPC Server listening for connections.
/// Use the [IPCBuilder](crate::builder::IPCBuilder) to create a server. /// Use the [IPCBuilder](crate::builder::IPCBuilder) to create a server.
@ -14,6 +16,7 @@ use tokio::net::TcpListener;
pub struct IPCServer { pub struct IPCServer {
pub(crate) handler: EventHandler, pub(crate) handler: EventHandler,
pub(crate) namespaces: HashMap<String, Namespace>, pub(crate) namespaces: HashMap<String, Namespace>,
pub(crate) data: TypeMap,
} }
impl IPCServer { impl IPCServer {
@ -23,15 +26,17 @@ impl IPCServer {
let listener = TcpListener::bind(address).await?; let listener = TcpListener::bind(address).await?;
let handler = Arc::new(self.handler); let handler = Arc::new(self.handler);
let namespaces = Arc::new(self.namespaces); let namespaces = Arc::new(self.namespaces);
let data = Arc::new(RwLock::new(self.data));
while let Ok((stream, _)) = listener.accept().await { while let Ok((stream, _)) = listener.accept().await {
let handler = Arc::clone(&handler); let handler = Arc::clone(&handler);
let namespaces = Arc::clone(&namespaces); let namespaces = Arc::clone(&namespaces);
let data = Arc::clone(&data);
tokio::spawn(async { tokio::spawn(async {
let (read_half, write_half) = stream.into_split(); let (read_half, write_half) = stream.into_split();
let emitter = StreamEmitter::new(write_half); let emitter = StreamEmitter::new(write_half);
let ctx = Context::new(StreamEmitter::clone(&emitter)); let ctx = Context::new(StreamEmitter::clone(&emitter), data);
handle_connection(namespaces, handler, read_half, ctx).await; handle_connection(namespaces, handler, read_half, ctx).await;
}); });

@ -31,7 +31,15 @@
//! //!
//! Server Example: //! Server Example:
//! ```no_run //! ```no_run
//! use typemap_rev::TypeMapKey;
//! use rmp_ipc::IPCBuilder; //! use rmp_ipc::IPCBuilder;
//!
//! struct MyKey;
//!
//! impl TypeMapKey for MyKey {
//! type Value = u32;
//! }
//!
//! // create the server //! // create the server
//!# async fn a() { //!# async fn a() {
//! IPCBuilder::new() //! IPCBuilder::new()
@ -45,10 +53,18 @@
//! .namespace("mainspace-server") //! .namespace("mainspace-server")
//! .on("do-something", |ctx, event| Box::pin(async move { //! .on("do-something", |ctx, event| Box::pin(async move {
//! println!("Doing something"); //! println!("Doing something");
//! {
//! // access data
//! let mut data = ctx.data.write().await;
//! let mut my_key = data.get_mut::<MyKey>().unwrap();
//! *my_key += 1;
//! }
//! ctx.emitter.emit_response_to(event.id(), "mainspace-client", "something", ()).await?; //! ctx.emitter.emit_response_to(event.id(), "mainspace-client", "something", ()).await?;
//! Ok(()) //! Ok(())
//! })) //! }))
//! .build() //! .build()
//! // store additional data
//! .insert::<MyKey>(3)
//! .build_server().await.unwrap(); //! .build_server().await.unwrap();
//! # } //! # }
//! ``` //! ```

@ -8,6 +8,7 @@ use crate::{Event, IPCBuilder};
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, SystemTime}; use std::time::{Duration, SystemTime};
use typemap_rev::TypeMapKey;
async fn handle_ping_event(ctx: &Context, e: Event) -> Result<()> { async fn handle_ping_event(ctx: &Context, e: Event) -> Result<()> {
let mut ping_data = e.data::<PingEventData>()?; let mut ping_data = e.data::<PingEventData>()?;
@ -21,15 +22,19 @@ async fn handle_ping_event(ctx: &Context, e: Event) -> Result<()> {
Ok(()) Ok(())
} }
fn get_builder_with_ping(address: &str) -> IPCBuilder {
IPCBuilder::new()
.on("ping", |ctx, e| Box::pin(handle_ping_event(ctx, e)))
.address(address)
}
#[tokio::test] #[tokio::test]
async fn it_receives_events() { async fn it_receives_events() {
let builder = IPCBuilder::new() let builder = get_builder_with_ping("127.0.0.1:8281");
.on("ping", |ctx, e| Box::pin(handle_ping_event(ctx, e)))
.address("127.0.0.1:8281");
let server_running = Arc::new(AtomicBool::new(false)); let server_running = Arc::new(AtomicBool::new(false));
tokio::spawn({ tokio::spawn({
let server_running = Arc::clone(&server_running); let server_running = Arc::clone(&server_running);
let builder = builder.clone(); let builder = get_builder_with_ping("127.0.0.1:8281");
async move { async move {
server_running.store(true, Ordering::SeqCst); server_running.store(true, Ordering::SeqCst);
builder.build_server().await.unwrap(); builder.build_server().await.unwrap();
@ -56,17 +61,21 @@ async fn it_receives_events() {
assert_eq!(reply.name(), "pong"); assert_eq!(reply.name(), "pong");
} }
#[tokio::test] fn get_builder_with_ping_mainspace(address: &str) -> IPCBuilder {
async fn it_receives_namespaced_events() { IPCBuilder::new()
let builder = IPCBuilder::new()
.namespace("mainspace") .namespace("mainspace")
.on("ping", |ctx, e| Box::pin(handle_ping_event(ctx, e))) .on("ping", |ctx, e| Box::pin(handle_ping_event(ctx, e)))
.build() .build()
.address("127.0.0.1:8282"); .address(address)
}
#[tokio::test]
async fn it_receives_namespaced_events() {
let builder = get_builder_with_ping_mainspace("127.0.0.1:8282");
let server_running = Arc::new(AtomicBool::new(false)); let server_running = Arc::new(AtomicBool::new(false));
tokio::spawn({ tokio::spawn({
let server_running = Arc::clone(&server_running); let server_running = Arc::clone(&server_running);
let builder = builder.clone(); let builder = get_builder_with_ping_mainspace("127.0.0.1:8282");
async move { async move {
server_running.store(true, Ordering::SeqCst); server_running.store(true, Ordering::SeqCst);
builder.build_server().await.unwrap(); builder.build_server().await.unwrap();
@ -94,32 +103,46 @@ async fn it_receives_namespaced_events() {
assert_eq!(reply.name(), "pong"); assert_eq!(reply.name(), "pong");
} }
#[tokio::test] struct ErrorOccurredKey;
async fn it_handles_errors() {
let error_occurred = Arc::new(AtomicBool::new(false)); impl TypeMapKey for ErrorOccurredKey {
let builder = IPCBuilder::new() type Value = Arc<AtomicBool>;
}
fn get_builder_with_error_handling(error_occurred: Arc<AtomicBool>, address: &str) -> IPCBuilder {
IPCBuilder::new()
.insert::<ErrorOccurredKey>(error_occurred)
.on("ping", move |_, _| { .on("ping", move |_, _| {
Box::pin(async move { Err(Error::from("ERRROROROROR")) }) Box::pin(async move { Err(Error::from("ERRROROROROR")) })
}) })
.on("error", { .on("error", {
let error_occurred = Arc::clone(&error_occurred); move |ctx, e| {
move |_, e| {
let error_occurred = Arc::clone(&error_occurred);
Box::pin(async move { Box::pin(async move {
let error = e.data::<ErrorEventData>()?; let error = e.data::<ErrorEventData>()?;
assert!(error.message.len() > 0); assert!(error.message.len() > 0);
assert_eq!(error.code, 500); assert_eq!(error.code, 500);
{
let data = ctx.data.read().await;
let error_occurred = data.get::<ErrorOccurredKey>().unwrap();
error_occurred.store(true, Ordering::SeqCst); error_occurred.store(true, Ordering::SeqCst);
}
Ok(()) Ok(())
}) })
} }
}) })
.address("127.0.0.1:8283"); .address(address)
}
#[tokio::test]
async fn it_handles_errors() {
let error_occurred = Arc::new(AtomicBool::new(false));
let builder = get_builder_with_error_handling(Arc::clone(&error_occurred), "127.0.0.1:8283");
let server_running = Arc::new(AtomicBool::new(false)); let server_running = Arc::new(AtomicBool::new(false));
tokio::spawn({ tokio::spawn({
let server_running = Arc::clone(&server_running); let server_running = Arc::clone(&server_running);
let builder = builder.clone(); let error_occurred = Arc::clone(&error_occurred);
let builder = get_builder_with_error_handling(error_occurred, "127.0.0.1:8283");
async move { async move {
server_running.store(true, Ordering::SeqCst); server_running.store(true, Ordering::SeqCst);
builder.build_server().await.unwrap(); builder.build_server().await.unwrap();

Loading…
Cancel
Save