Merge pull request #18 from Trivernis/feature/protocol-trait

Feature/protocol trait
pull/19/head
Julius Riegel 3 years ago committed by GitHub
commit aec08da839
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -10,10 +10,13 @@ env:
CARGO_TERM_COLOR: always
jobs:
build:
runs-on: ubuntu-latest
build:
strategy:
fail-fast: false
matrix:
os: [ubuntu-latest, macos-latest, windows-latest]
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v2

14
Cargo.lock generated

@ -2,6 +2,17 @@
# It is not intended for manual editing.
version = 3
[[package]]
name = "async-trait"
version = "0.1.51"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44318e776df68115a881de9a8fd1b9e53368d7a4a5ce4cc48517da3393233a5e"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "atty"
version = "0.2.14"
@ -500,8 +511,9 @@ dependencies = [
[[package]]
name = "rmp-ipc"
version = "0.8.2"
version = "0.9.0"
dependencies = [
"async-trait",
"byteorder",
"criterion",
"lazy_static",

@ -1,6 +1,6 @@
[package]
name = "rmp-ipc"
version = "0.8.2"
version = "0.9.0"
authors = ["trivernis <trivernis@protonmail.com>"]
edition = "2018"
readme = "README.md"
@ -26,6 +26,7 @@ tracing = "0.1.29"
lazy_static = "1.4.0"
typemap_rev = "0.1.5"
byteorder = "1.4.3"
async-trait = "0.1.51"
[dependencies.serde]
version = "1.0.130"

@ -6,10 +6,11 @@ Interprocess Communication via TCP using Rust MessagePack.
**Client:**
```rust
use rmp_ipc::{callback, Event, context::Context, IPCBuilder, error::Result};
use rmp_ipc::prelude::*;
use tokio::net::TcpListener;
/// Callback ping function
async fn handle_ping(ctx: &Context, event: Event) -> Result<()> {
async fn handle_ping<S: AsyncProtocolStream>(ctx: &Context<S>, event: Event) -> Result<()> {
println!("Received ping event.");
ctx.emitter.emit_response(event.id(), "pong", ()).await?;
Ok(())
@ -18,7 +19,7 @@ async fn handle_ping(ctx: &Context, event: Event) -> Result<()> {
#[tokio::main]
async fn main() {
// create the client
let ctx = IPCBuilder::new()
let ctx = IPCBuilder::<TcpListener>::new()
.address("127.0.0.1:2020")
// register callback
.on("ping", callback!(handle_ping))
@ -31,12 +32,13 @@ async fn main() {
**Server:**
```rust
use rmp_ipc::{IPCBuilder, callback};
use rmp_ipc::prelude::*;
use tokio::net::TcpListener;
// create the server
#[tokio::main]
async fn main() {
IPCBuilder::new()
IPCBuilder::<TcpListener>::new()
.address("127.0.0.1:2020")
// register callback
.on("ping", callback!(ctx, event, async move {
@ -51,12 +53,13 @@ async fn main() {
**Client:**
```rust
use rmp_ipc::IPCBuilder;
use rmp_ipc::prelude::*;
use tokio::net::TcpListener;
// create the client
#[tokio::main]
async fn main() {
let ctx = IPCBuilder::new()
let ctx = IPCBuilder::<TcpListener>::new()
.address("127.0.0.1:2020")
// register namespace
.namespace("mainspace-client")
@ -76,13 +79,14 @@ async fn main() {
**Server:**
```rust
use rmp_ipc::{IPCBuilder, EventHandler, namespace, command, Event, context::Context};
use rmp_ipc::prelude::*;
use tokio::net::TcpListener;
// create the server
pub struct MyNamespace;
impl MyNamespace {
async fn ping(_ctx: &Context, _event: Event) -> Result<()> {
async fn ping<S: AsyncProtocolStream>(_ctx: &Context<S>, _event: Event) -> Result<()> {
println!("My namespace received a ping");
Ok(())
}
@ -91,7 +95,7 @@ impl MyNamespace {
impl NamespaceProvider for MyNamespace {
fn name() -> &'static str {"my_namespace"}
fn register(handler: &mut EventHandler) {
fn register<S: AsyncProtocolStream>(handler: &mut EventHandler<S>) {
events!(handler,
"ping" => Self::ping
);
@ -100,7 +104,7 @@ impl NamespaceProvider for MyNamespace {
#[tokio::main]
async fn main() {
IPCBuilder::new()
IPCBuilder::<TcpListener>::new()
.address("127.0.0.1:2020")
// register namespace
.namespace("mainspace-server")

@ -1,25 +1,39 @@
use crate::error::Result;
use crate::events::event::Event;
use crate::ipc::context::Context;
use crate::protocol::AsyncProtocolStream;
use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
type EventCallback = Arc<
dyn for<'a> Fn(&'a Context, Event) -> Pin<Box<(dyn Future<Output = Result<()>> + Send + 'a)>>
type EventCallback<P> = Arc<
dyn for<'a> Fn(&'a Context<P>, Event) -> Pin<Box<(dyn Future<Output = Result<()>> + Send + 'a)>>
+ Send
+ Sync,
>;
/// Handler for events
#[derive(Clone)]
pub struct EventHandler {
callbacks: HashMap<String, EventCallback>,
pub struct EventHandler<P: AsyncProtocolStream> {
callbacks: HashMap<String, EventCallback<P>>,
}
impl Debug for EventHandler {
impl<S> Clone for EventHandler<S>
where
S: AsyncProtocolStream,
{
fn clone(&self) -> Self {
Self {
callbacks: self.callbacks.clone(),
}
}
}
impl<P> Debug for EventHandler<P>
where
P: AsyncProtocolStream,
{
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let callback_names: String = self
.callbacks
@ -31,7 +45,10 @@ impl Debug for EventHandler {
}
}
impl EventHandler {
impl<P> EventHandler<P>
where
P: AsyncProtocolStream,
{
/// Creates a new event handler
pub fn new() -> Self {
Self {
@ -44,7 +61,7 @@ impl EventHandler {
pub fn on<F: 'static>(&mut self, name: &str, callback: F)
where
F: for<'a> Fn(
&'a Context,
&'a Context<P>,
Event,
) -> Pin<Box<(dyn Future<Output = Result<()>> + Send + 'a)>>
+ Send
@ -55,7 +72,7 @@ impl EventHandler {
/// Handles a received event
#[tracing::instrument(level = "debug", skip(self, ctx, event))]
pub async fn handle_event(&self, ctx: &Context, event: Event) -> Result<()> {
pub async fn handle_event(&self, ctx: &Context<P>, event: Event) -> Result<()> {
if let Some(cb) = self.callbacks.get(event.name()) {
cb.as_ref()(ctx, event).await?;
}

@ -7,6 +7,7 @@ use crate::ipc::context::{Context, PooledContext, ReplyListeners};
use crate::ipc::server::IPCServer;
use crate::namespaces::builder::NamespaceBuilder;
use crate::namespaces::namespace::Namespace;
use crate::protocol::AsyncStreamProtocolListener;
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
@ -16,8 +17,10 @@ use typemap_rev::{TypeMap, TypeMapKey};
/// A builder for the IPC server or client.
/// ```no_run
///use typemap_rev::TypeMapKey;
/// use std::net::ToSocketAddrs;
/// use typemap_rev::TypeMapKey;
/// use rmp_ipc::IPCBuilder;
/// use tokio::net::TcpListener;
///
/// struct CustomKey;
///
@ -26,8 +29,8 @@ use typemap_rev::{TypeMap, TypeMapKey};
/// }
///
///# async fn a() {
/// IPCBuilder::new()
/// .address("127.0.0.1:2020")
/// IPCBuilder::<TcpListener>::new()
/// .address("127.0.0.1:2020".to_socket_addrs().unwrap().next().unwrap())
/// // register callback
/// .on("ping", |_ctx, _event| Box::pin(async move {
/// println!("Received ping event.");
@ -46,14 +49,17 @@ use typemap_rev::{TypeMap, TypeMapKey};
/// .build_server().await.unwrap();
///# }
/// ```
pub struct IPCBuilder {
handler: EventHandler,
address: Option<String>,
namespaces: HashMap<String, Namespace>,
pub struct IPCBuilder<L: AsyncStreamProtocolListener> {
handler: EventHandler<L::Stream>,
address: Option<L::AddressType>,
namespaces: HashMap<String, Namespace<L::Stream>>,
data: TypeMap,
}
impl IPCBuilder {
impl<L> IPCBuilder<L>
where
L: AsyncStreamProtocolListener,
{
pub fn new() -> Self {
let mut handler = EventHandler::new();
handler.on(ERROR_EVENT_NAME, |_, event| {
@ -84,7 +90,7 @@ impl IPCBuilder {
pub fn on<F: 'static>(mut self, event: &str, callback: F) -> Self
where
F: for<'a> Fn(
&'a Context,
&'a Context<L::Stream>,
Event,
) -> Pin<Box<(dyn Future<Output = Result<()>> + Send + 'a)>>
+ Send
@ -96,19 +102,19 @@ impl IPCBuilder {
}
/// Adds the address to connect to
pub fn address<S: ToString>(mut self, address: S) -> Self {
self.address = Some(address.to_string());
pub fn address(mut self, address: L::AddressType) -> Self {
self.address = Some(address);
self
}
/// Adds a namespace
pub fn namespace<S: ToString>(self, name: S) -> NamespaceBuilder {
pub fn namespace<S: ToString>(self, name: S) -> NamespaceBuilder<L> {
NamespaceBuilder::new(self, name.to_string())
}
/// Adds a namespace to the ipc server
pub fn add_namespace(mut self, namespace: Namespace) -> Self {
pub fn add_namespace(mut self, namespace: Namespace<L::Stream>) -> Self {
self.namespaces
.insert(namespace.name().to_owned(), namespace);
@ -119,19 +125,19 @@ impl IPCBuilder {
#[tracing::instrument(skip(self))]
pub async fn build_server(self) -> Result<()> {
self.validate()?;
let server = IPCServer {
let server = IPCServer::<L> {
namespaces: self.namespaces,
handler: self.handler,
data: self.data,
};
server.start(&self.address.unwrap()).await?;
server.start(self.address.unwrap()).await?;
Ok(())
}
/// Builds an ipc client
#[tracing::instrument(skip(self))]
pub async fn build_client(self) -> Result<Context> {
pub async fn build_client(self) -> Result<Context<L::Stream>> {
self.validate()?;
let data = Arc::new(RwLock::new(self.data));
let reply_listeners = ReplyListeners::default();
@ -142,7 +148,7 @@ impl IPCBuilder {
reply_listeners,
};
let ctx = client.connect(&self.address.unwrap()).await?;
let ctx = client.connect(self.address.unwrap()).await?;
Ok(ctx)
}
@ -152,7 +158,7 @@ impl IPCBuilder {
/// return a [crate::context::PooledContext] that allows one to [crate::context::PooledContext::acquire] a single context
/// to emit events.
#[tracing::instrument(skip(self))]
pub async fn build_pooled_client(self, pool_size: usize) -> Result<PooledContext> {
pub async fn build_pooled_client(self, pool_size: usize) -> Result<PooledContext<L::Stream>> {
if pool_size == 0 {
Error::BuildError("Pool size must be greater than 0".to_string());
}
@ -170,7 +176,7 @@ impl IPCBuilder {
reply_listeners: Arc::clone(&reply_listeners),
};
let ctx = client.connect(&address).await?;
let ctx = client.connect(address.clone()).await?;
contexts.push(ctx);
}

@ -4,9 +4,9 @@ use crate::events::event_handler::EventHandler;
use crate::ipc::context::{Context, ReplyListeners};
use crate::ipc::stream_emitter::StreamEmitter;
use crate::namespaces::namespace::Namespace;
use crate::protocol::AsyncProtocolStream;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::net::TcpStream;
use tokio::sync::oneshot;
use tokio::sync::RwLock;
use typemap_rev::TypeMap;
@ -15,20 +15,23 @@ use typemap_rev::TypeMap;
/// Use the [IPCBuilder](crate::builder::IPCBuilder) to create the client.
/// Usually one does not need to use the IPCClient object directly.
#[derive(Clone)]
pub struct IPCClient {
pub(crate) handler: EventHandler,
pub(crate) namespaces: HashMap<String, Namespace>,
pub struct IPCClient<S: AsyncProtocolStream> {
pub(crate) handler: EventHandler<S>,
pub(crate) namespaces: HashMap<String, Namespace<S>>,
pub(crate) data: Arc<RwLock<TypeMap>>,
pub(crate) reply_listeners: ReplyListeners,
}
impl IPCClient {
impl<S> IPCClient<S>
where
S: 'static + AsyncProtocolStream,
{
/// Connects to a given address and returns an emitter for events to that address.
/// Invoked by [IPCBuilder::build_client](crate::builder::IPCBuilder::build_client)
#[tracing::instrument(skip(self))]
pub async fn connect(self, address: &str) -> Result<Context> {
let stream = TcpStream::connect(address).await?;
let (read_half, write_half) = stream.into_split();
pub async fn connect(self, address: S::AddressType) -> Result<Context<S>> {
let stream = S::protocol_connect(address).await?;
let (read_half, write_half) = stream.protocol_into_split();
let emitter = StreamEmitter::new(write_half);
let (tx, rx) = oneshot::channel();
let ctx = Context::new(

@ -1,6 +1,7 @@
use crate::error::{Error, Result};
use crate::event::Event;
use crate::ipc::stream_emitter::StreamEmitter;
use crate::protocol::AsyncProtocolStream;
use std::collections::HashMap;
use std::mem;
use std::ops::{Deref, DerefMut};
@ -17,17 +18,16 @@ pub(crate) type ReplyListeners = Arc<Mutex<HashMap<u64, oneshot::Sender<Event>>>
/// ```rust
/// use rmp_ipc::prelude::*;
///
/// async fn my_callback(ctx: &Context, _event: Event) -> IPCResult<()> {
/// async fn my_callback<S: AsyncProtocolStream>(ctx: &Context<S>, _event: Event) -> IPCResult<()> {
/// // use the emitter on the context object to emit events
/// // inside callbacks
/// ctx.emitter.emit("ping", ()).await?;
/// Ok(())
/// }
/// ```
#[derive(Clone)]
pub struct Context {
pub struct Context<S: AsyncProtocolStream> {
/// The event emitter
pub emitter: StreamEmitter,
pub emitter: StreamEmitter<S>,
/// Field to store additional context data
pub data: Arc<RwLock<TypeMap>>,
@ -37,9 +37,26 @@ pub struct Context {
reply_listeners: ReplyListeners,
}
impl Context {
impl<S> Clone for Context<S>
where
S: AsyncProtocolStream,
{
fn clone(&self) -> Self {
Self {
emitter: self.emitter.clone(),
data: Arc::clone(&self.data),
stop_sender: Arc::clone(&self.stop_sender),
reply_listeners: Arc::clone(&self.reply_listeners),
}
}
}
impl<P> Context<P>
where
P: AsyncProtocolStream,
{
pub(crate) fn new(
emitter: StreamEmitter,
emitter: StreamEmitter<P>,
data: Arc<RwLock<TypeMap>>,
stop_sender: Option<Sender<()>>,
reply_listeners: ReplyListeners,
@ -83,9 +100,19 @@ impl Context {
}
}
#[derive(Clone)]
pub struct PooledContext {
contexts: Vec<PoolGuard<Context>>,
pub struct PooledContext<S: AsyncProtocolStream> {
contexts: Vec<PoolGuard<Context<S>>>,
}
impl<S> Clone for PooledContext<S>
where
S: AsyncProtocolStream,
{
fn clone(&self) -> Self {
Self {
contexts: self.contexts.clone(),
}
}
}
pub struct PoolGuard<T>
@ -169,9 +196,12 @@ where
}
}
impl PooledContext {
impl<P> PooledContext<P>
where
P: AsyncProtocolStream,
{
/// Creates a new pooled context from a list of contexts
pub(crate) fn new(contexts: Vec<Context>) -> Self {
pub(crate) fn new(contexts: Vec<Context<P>>) -> Self {
Self {
contexts: contexts.into_iter().map(PoolGuard::new).collect(),
}
@ -180,7 +210,7 @@ impl PooledContext {
/// Acquires a context from the pool
/// It always chooses the one that is used the least
#[tracing::instrument(level = "trace", skip_all)]
pub fn acquire(&self) -> PoolGuard<Context> {
pub fn acquire(&self) -> PoolGuard<Context<P>> {
self.contexts
.iter()
.min_by_key(|c| c.count())

@ -2,9 +2,9 @@ use crate::error_event::{ErrorEventData, ERROR_EVENT_NAME};
use crate::events::event_handler::EventHandler;
use crate::namespaces::namespace::Namespace;
use crate::prelude::*;
use crate::protocol::AsyncProtocolStream;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::net::tcp::OwnedReadHalf;
pub mod builder;
pub mod client;
@ -13,11 +13,11 @@ pub mod server;
pub mod stream_emitter;
/// Handles listening to a connection and triggering the corresponding event functions
async fn handle_connection(
namespaces: Arc<HashMap<String, Namespace>>,
handler: Arc<EventHandler>,
mut read_half: OwnedReadHalf,
ctx: Context,
async fn handle_connection<S: 'static + AsyncProtocolStream>(
namespaces: Arc<HashMap<String, Namespace<S>>>,
handler: Arc<EventHandler<S>>,
mut read_half: S::OwnedSplitReadHalf,
ctx: Context<S>,
) {
while let Ok(event) = Event::from_async_read(&mut read_half).await {
tracing::trace!(
@ -52,7 +52,11 @@ async fn handle_connection(
}
/// Handles a single event in a different tokio context
fn handle_event(ctx: Context, handler: Arc<EventHandler>, event: Event) {
fn handle_event<S: 'static + AsyncProtocolStream>(
ctx: Context<S>,
handler: Arc<EventHandler<S>>,
event: Event,
) {
tokio::spawn(async move {
let id = event.id();
if let Err(e) = handler.handle_event(&ctx, event).await {

@ -4,41 +4,43 @@ use crate::events::event_handler::EventHandler;
use crate::ipc::context::{Context, ReplyListeners};
use crate::ipc::stream_emitter::StreamEmitter;
use crate::namespaces::namespace::Namespace;
use crate::protocol::{AsyncProtocolStreamSplit, AsyncStreamProtocolListener};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::net::TcpListener;
use tokio::sync::RwLock;
use typemap_rev::TypeMap;
/// The IPC Server listening for connections.
/// Use the [IPCBuilder](crate::builder::IPCBuilder) to create a server.
/// Usually one does not need to use the IPCServer object directly.
pub struct IPCServer {
pub(crate) handler: EventHandler,
pub(crate) namespaces: HashMap<String, Namespace>,
pub struct IPCServer<L: AsyncStreamProtocolListener> {
pub(crate) handler: EventHandler<L::Stream>,
pub(crate) namespaces: HashMap<String, Namespace<L::Stream>>,
pub(crate) data: TypeMap,
}
impl IPCServer {
impl<L> IPCServer<L>
where
L: AsyncStreamProtocolListener,
{
/// Starts the IPC Server.
/// Invoked by [IPCBuilder::build_server](crate::builder::IPCBuilder::build_server)
#[tracing::instrument(skip(self))]
pub async fn start(self, address: &str) -> Result<()> {
let listener = TcpListener::bind(address).await?;
pub async fn start(self, address: L::AddressType) -> Result<()> {
let listener = L::protocol_bind(address.clone()).await?;
let handler = Arc::new(self.handler);
let namespaces = Arc::new(self.namespaces);
let data = Arc::new(RwLock::new(self.data));
tracing::info!(address);
tracing::info!("address = {:?}", address);
while let Ok((stream, remote_address)) = listener.accept().await {
let remote_address = remote_address.to_string();
tracing::debug!("remote_address = {}", remote_address);
while let Ok((stream, remote_address)) = listener.protocol_accept().await {
tracing::debug!("remote_address = {:?}", remote_address);
let handler = Arc::clone(&handler);
let namespaces = Arc::clone(&namespaces);
let data = Arc::clone(&data);
tokio::spawn(async {
let (read_half, write_half) = stream.into_split();
let (read_half, write_half) = stream.protocol_into_split();
let emitter = StreamEmitter::new(write_half);
let reply_listeners = ReplyListeners::default();
let ctx = Context::new(StreamEmitter::clone(&emitter), data, None, reply_listeners);

@ -3,22 +3,35 @@ use crate::error_event::{ErrorEventData, ERROR_EVENT_NAME};
use crate::events::event::Event;
use crate::events::payload::EventSendPayload;
use crate::ipc::context::Context;
use crate::protocol::AsyncProtocolStream;
use std::fmt::Debug;
use std::sync::Arc;
use tokio::io::AsyncWriteExt;
use tokio::net::tcp::OwnedWriteHalf;
use tokio::sync::Mutex;
/// An abstraction over the raw tokio tcp stream
/// to emit events and share a connection across multiple
/// contexts.
#[derive(Clone)]
pub struct StreamEmitter {
stream: Arc<Mutex<OwnedWriteHalf>>,
pub struct StreamEmitter<S: AsyncProtocolStream> {
stream: Arc<Mutex<S::OwnedSplitWriteHalf>>,
}
impl StreamEmitter {
pub fn new(stream: OwnedWriteHalf) -> Self {
impl<S> Clone for StreamEmitter<S>
where
S: AsyncProtocolStream,
{
fn clone(&self) -> Self {
Self {
stream: Arc::clone(&self.stream),
}
}
}
impl<P> StreamEmitter<P>
where
P: AsyncProtocolStream,
{
pub fn new(stream: P::OwnedSplitWriteHalf) -> Self {
Self {
stream: Arc::new(Mutex::new(stream)),
}
@ -118,7 +131,7 @@ impl EmitMetadata {
/// Waits for a reply to the given message.
#[tracing::instrument(skip(self, ctx), fields(self.message_id))]
pub async fn await_reply(&self, ctx: &Context) -> Result<Event> {
pub async fn await_reply<P: AsyncProtocolStream>(&self, ctx: &Context<P>) -> Result<Event> {
let reply = ctx.await_reply(self.message_id).await?;
if reply.name() == ERROR_EVENT_NAME {
Err(reply.data::<ErrorEventData>()?.into())

@ -3,9 +3,10 @@
//! Client Example:
//! ```no_run
//! use rmp_ipc::prelude::*;
//! use tokio::net::TcpListener;
//!
//! /// Callback ping function
//! async fn handle_ping(ctx: &Context, event: Event) -> IPCResult<()> {
//! async fn handle_ping<S: AsyncProtocolStream>(ctx: &Context<S>, event: Event) -> IPCResult<()> {
//! println!("Received ping event.");
//! ctx.emitter.emit_response(event.id(), "pong", ()).await?;
//!
@ -15,7 +16,7 @@
//! pub struct MyNamespace;
//!
//! impl MyNamespace {
//! async fn ping(_ctx: &Context, _event: Event) -> IPCResult<()> {
//! async fn ping<S: AsyncProtocolStream>(_ctx: &Context<S>, _event: Event) -> IPCResult<()> {
//! println!("My namespace received a ping");
//! Ok(())
//! }
@ -24,7 +25,7 @@
//! impl NamespaceProvider for MyNamespace {
//! fn name() -> &'static str {"my_namespace"}
//!
//! fn register(handler: &mut EventHandler) {
//! fn register<S: AsyncProtocolStream>(handler: &mut EventHandler<S>) {
//! events!(handler,
//! "ping" => Self::ping,
//! "ping2" => Self::ping
@ -35,8 +36,9 @@
//! #[tokio::main]
//! async fn main() {
//! // create the client
//! let ctx = IPCBuilder::new()
//! .address("127.0.0.1:2020")
//! use std::net::ToSocketAddrs;
//! let ctx = IPCBuilder::<TcpListener>::new()
//! .address("127.0.0.1:2020".to_socket_addrs().unwrap().next().unwrap())
//! // register callback
//! .on("ping", callback!(handle_ping))
//! .namespace("mainspace-client")
@ -58,9 +60,11 @@
//!
//! Server Example:
//! ```no_run
//! use std::net::ToSocketAddrs;
//! use typemap_rev::TypeMapKey;
//! use rmp_ipc::IPCBuilder;
//! use rmp_ipc::callback;
//! use tokio::net::TcpListener;
//!
//! struct MyKey;
//!
@ -70,8 +74,8 @@
//!
//! // create the server
//!# async fn a() {
//! IPCBuilder::new()
//! .address("127.0.0.1:2020")
//! IPCBuilder::<TcpListener>::new()
//! .address("127.0.0.1:2020".to_socket_addrs().unwrap().next().unwrap())
//! // register callback
//! .on("ping", callback!(ctx, event, async move {
//! println!("Received ping event.");
@ -105,6 +109,7 @@ mod events;
pub mod ipc;
mod macros;
mod namespaces;
pub mod protocol;
pub use events::error_event;
pub use events::event;
@ -130,5 +135,6 @@ pub mod prelude {
pub use crate::namespaces::builder::NamespaceBuilder;
pub use crate::namespaces::provider_trait::*;
pub use crate::payload::*;
pub use crate::protocol::*;
pub use crate::*;
}

@ -3,18 +3,22 @@ use crate::event::Event;
use crate::events::event_handler::EventHandler;
use crate::ipc::context::Context;
use crate::namespaces::namespace::Namespace;
use crate::protocol::AsyncStreamProtocolListener;
use crate::IPCBuilder;
use std::future::Future;
use std::pin::Pin;
pub struct NamespaceBuilder {
pub struct NamespaceBuilder<L: AsyncStreamProtocolListener> {
name: String,
handler: EventHandler,
ipc_builder: IPCBuilder,
handler: EventHandler<L::Stream>,
ipc_builder: IPCBuilder<L>,
}
impl NamespaceBuilder {
pub(crate) fn new(ipc_builder: IPCBuilder, name: String) -> Self {
impl<L> NamespaceBuilder<L>
where
L: AsyncStreamProtocolListener,
{
pub(crate) fn new(ipc_builder: IPCBuilder<L>, name: String) -> Self {
Self {
name,
handler: EventHandler::new(),
@ -26,7 +30,7 @@ impl NamespaceBuilder {
pub fn on<F: 'static>(mut self, event: &str, callback: F) -> Self
where
F: for<'a> Fn(
&'a Context,
&'a Context<L::Stream>,
Event,
) -> Pin<Box<(dyn Future<Output = Result<()>> + Send + 'a)>>
+ Send
@ -39,7 +43,7 @@ impl NamespaceBuilder {
/// Builds the namespace
#[tracing::instrument(skip(self))]
pub fn build(self) -> IPCBuilder {
pub fn build(self) -> IPCBuilder<L> {
let namespace = Namespace::new(self.name, self.handler);
self.ipc_builder.add_namespace(namespace)
}

@ -1,15 +1,31 @@
use crate::events::event_handler::EventHandler;
use crate::protocol::AsyncProtocolStream;
use std::sync::Arc;
#[derive(Clone, Debug)]
pub struct Namespace {
#[derive(Debug)]
pub struct Namespace<S: AsyncProtocolStream> {
name: String,
pub(crate) handler: Arc<EventHandler>,
pub(crate) handler: Arc<EventHandler<S>>,
}
impl Namespace {
impl<S> Clone for Namespace<S>
where
S: AsyncProtocolStream,
{
fn clone(&self) -> Self {
Self {
name: self.name.clone(),
handler: Arc::clone(&self.handler),
}
}
}
impl<S> Namespace<S>
where
S: AsyncProtocolStream,
{
/// Creates a new namespace with an event handler to register event callbacks on
pub fn new<S: ToString>(name: S, handler: EventHandler) -> Self {
pub fn new<S2: ToString>(name: S2, handler: EventHandler<S>) -> Self {
Self {
name: name.to_string(),
handler: Arc::new(handler),

@ -1,12 +1,16 @@
use crate::events::event_handler::EventHandler;
use crate::namespace::Namespace;
use crate::protocol::AsyncProtocolStream;
pub trait NamespaceProvider {
fn name() -> &'static str;
fn register(handler: &mut EventHandler);
fn register<S: AsyncProtocolStream>(handler: &mut EventHandler<S>);
}
impl Namespace {
impl<S> Namespace<S>
where
S: AsyncProtocolStream,
{
pub fn from_provider<N: NamespaceProvider>() -> Self {
let name = N::name();
let mut handler = EventHandler::new();

@ -0,0 +1,36 @@
pub mod tcp;
#[cfg(unix)]
pub mod unix_socket;
use crate::prelude::IPCResult;
use async_trait::async_trait;
use std::fmt::Debug;
use tokio::io::{AsyncRead, AsyncWrite};
#[async_trait]
pub trait AsyncStreamProtocolListener: Sized + Send + Sync {
type AddressType: Clone + Debug + Send + Sync;
type RemoteAddressType: Debug;
type Stream: 'static + AsyncProtocolStream<AddressType = Self::AddressType>;
async fn protocol_bind(address: Self::AddressType) -> IPCResult<Self>;
async fn protocol_accept(&self) -> IPCResult<(Self::Stream, Self::RemoteAddressType)>;
}
pub trait AsyncProtocolStreamSplit {
type OwnedSplitReadHalf: AsyncRead + Send + Sync + Unpin;
type OwnedSplitWriteHalf: AsyncWrite + Send + Sync + Unpin;
fn protocol_into_split(self) -> (Self::OwnedSplitReadHalf, Self::OwnedSplitWriteHalf);
}
#[async_trait]
pub trait AsyncProtocolStream:
AsyncRead + AsyncWrite + Send + Sync + AsyncProtocolStreamSplit + Sized
{
type AddressType: Clone + Debug + Send + Sync;
async fn protocol_connect(address: Self::AddressType) -> IPCResult<Self>;
}

@ -0,0 +1,45 @@
use crate::prelude::IPCResult;
use crate::protocol::{AsyncProtocolStream, AsyncProtocolStreamSplit, AsyncStreamProtocolListener};
use async_trait::async_trait;
use std::net::SocketAddr;
use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
use tokio::net::{TcpListener, TcpStream};
#[async_trait]
impl AsyncStreamProtocolListener for TcpListener {
type AddressType = SocketAddr;
type RemoteAddressType = SocketAddr;
type Stream = TcpStream;
async fn protocol_bind(address: Self::AddressType) -> IPCResult<Self> {
let listener = TcpListener::bind(address).await?;
Ok(listener)
}
async fn protocol_accept(&self) -> IPCResult<(Self::Stream, Self::RemoteAddressType)> {
let connection = self.accept().await?;
Ok(connection)
}
}
impl AsyncProtocolStreamSplit for TcpStream {
type OwnedSplitReadHalf = OwnedReadHalf;
type OwnedSplitWriteHalf = OwnedWriteHalf;
fn protocol_into_split(self) -> (Self::OwnedSplitReadHalf, Self::OwnedSplitWriteHalf) {
self.into_split()
}
}
#[async_trait]
impl AsyncProtocolStream for TcpStream {
type AddressType = SocketAddr;
async fn protocol_connect(address: Self::AddressType) -> IPCResult<Self> {
let stream = TcpStream::connect(address).await?;
Ok(stream)
}
}

@ -0,0 +1,51 @@
use crate::error::Result;
use crate::prelude::IPCResult;
use crate::protocol::{AsyncProtocolStream, AsyncProtocolStreamSplit, AsyncStreamProtocolListener};
use async_trait::async_trait;
use std::path::PathBuf;
use tokio::io::Interest;
use tokio::net::unix::OwnedWriteHalf;
use tokio::net::unix::{OwnedReadHalf, SocketAddr};
use tokio::net::{UnixListener, UnixStream};
#[async_trait]
impl AsyncStreamProtocolListener for UnixListener {
type AddressType = PathBuf;
type RemoteAddressType = SocketAddr;
type Stream = UnixStream;
async fn protocol_bind(address: Self::AddressType) -> Result<Self> {
let listener = UnixListener::bind(address)?;
Ok(listener)
}
async fn protocol_accept(&self) -> Result<(Self::Stream, Self::RemoteAddressType)> {
let connection = self.accept().await?;
Ok(connection)
}
}
impl AsyncProtocolStreamSplit for UnixStream {
type OwnedSplitReadHalf = OwnedReadHalf;
type OwnedSplitWriteHalf = OwnedWriteHalf;
fn protocol_into_split(self) -> (Self::OwnedSplitReadHalf, Self::OwnedSplitWriteHalf) {
self.into_split()
}
}
#[async_trait]
impl AsyncProtocolStream for UnixStream {
type AddressType = PathBuf;
async fn protocol_connect(address: Self::AddressType) -> IPCResult<Self> {
let stream = UnixStream::connect(address).await?;
stream
.ready(Interest::READABLE | Interest::WRITABLE)
.await?;
Ok(stream)
}
}

@ -1,12 +1,16 @@
use super::utils::PingEventData;
use crate::prelude::*;
use crate::protocol::AsyncProtocolStream;
use crate::tests::utils::start_test_server;
use std::net::ToSocketAddrs;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tokio::net::TcpListener;
use typemap_rev::TypeMapKey;
async fn handle_ping_event(ctx: &Context, e: Event) -> IPCResult<()> {
async fn handle_ping_event<P: AsyncProtocolStream>(ctx: &Context<P>, e: Event) -> IPCResult<()> {
let mut ping_data = e.data::<PingEventData>()?;
ping_data.time = SystemTime::now();
ping_data.ttl -= 1;
@ -18,19 +22,35 @@ async fn handle_ping_event(ctx: &Context, e: Event) -> IPCResult<()> {
Ok(())
}
fn get_builder_with_ping(address: &str) -> IPCBuilder {
fn get_builder_with_ping<L: AsyncStreamProtocolListener>(address: L::AddressType) -> IPCBuilder<L> {
IPCBuilder::new()
.on("ping", |ctx, e| Box::pin(handle_ping_event(ctx, e)))
.address(address)
}
#[tokio::test]
async fn it_receives_events() {
let builder = get_builder_with_ping("127.0.0.1:8281");
async fn it_receives_tcp_events() {
let socket_address = "127.0.0.1:8281".to_socket_addrs().unwrap().next().unwrap();
it_receives_events::<TcpListener>(socket_address).await;
}
#[cfg(unix)]
#[tokio::test]
async fn it_receives_unix_socket_events() {
let socket_path = PathBuf::from("/tmp/test_socket");
if socket_path.exists() {
std::fs::remove_file(&socket_path).unwrap();
}
it_receives_events::<tokio::net::UnixListener>(socket_path).await;
}
async fn it_receives_events<L: 'static + AsyncStreamProtocolListener>(address: L::AddressType) {
let builder = get_builder_with_ping::<L>(address.clone());
let server_running = Arc::new(AtomicBool::new(false));
tokio::spawn({
let server_running = Arc::clone(&server_running);
let builder = get_builder_with_ping("127.0.0.1:8281");
let builder = get_builder_with_ping::<L>(address);
async move {
server_running.store(true, Ordering::SeqCst);
builder.build_server().await.unwrap();
@ -58,18 +78,18 @@ async fn it_receives_events() {
assert_eq!(reply.name(), "pong");
}
fn get_builder_with_ping_mainspace(address: &str) -> IPCBuilder {
fn get_builder_with_ping_namespace(address: &str) -> IPCBuilder<TcpListener> {
IPCBuilder::new()
.namespace("mainspace")
.on("ping", callback!(handle_ping_event))
.build()
.address(address)
.address(address.to_socket_addrs().unwrap().next().unwrap())
}
pub struct TestNamespace;
impl TestNamespace {
async fn ping(_c: &Context, _e: Event) -> IPCResult<()> {
async fn ping<P: AsyncProtocolStream>(_c: &Context<P>, _e: Event) -> IPCResult<()> {
println!("Ping received");
Ok(())
}
@ -80,7 +100,7 @@ impl NamespaceProvider for TestNamespace {
"Test"
}
fn register(handler: &mut EventHandler) {
fn register<S: AsyncProtocolStream>(handler: &mut EventHandler<S>) {
events!(handler,
"ping" => Self::ping,
"ping2" => Self::ping
@ -90,11 +110,11 @@ impl NamespaceProvider for TestNamespace {
#[tokio::test]
async fn it_receives_namespaced_events() {
let builder = get_builder_with_ping_mainspace("127.0.0.1:8282");
let builder = get_builder_with_ping_namespace("127.0.0.1:8282");
let server_running = Arc::new(AtomicBool::new(false));
tokio::spawn({
let server_running = Arc::clone(&server_running);
let builder = get_builder_with_ping_mainspace("127.0.0.1:8282");
let builder = get_builder_with_ping_namespace("127.0.0.1:8282");
async move {
server_running.store(true, Ordering::SeqCst);
builder.build_server().await.unwrap();
@ -132,7 +152,10 @@ impl TypeMapKey for ErrorOccurredKey {
type Value = Arc<AtomicBool>;
}
fn get_builder_with_error_handling(error_occurred: Arc<AtomicBool>, address: &str) -> IPCBuilder {
fn get_builder_with_error_handling(
error_occurred: Arc<AtomicBool>,
address: &str,
) -> IPCBuilder<TcpListener> {
IPCBuilder::new()
.insert::<ErrorOccurredKey>(error_occurred)
.on("ping", move |_, _| {
@ -152,7 +175,7 @@ fn get_builder_with_error_handling(error_occurred: Arc<AtomicBool>, address: &st
Ok(())
}),
)
.address(address)
.address(address.to_socket_addrs().unwrap().next().unwrap())
}
#[tokio::test]
@ -185,8 +208,8 @@ async fn it_handles_errors() {
async fn test_error_responses() {
static ADDRESS: &str = "127.0.0.1:8284";
start_test_server(ADDRESS).await.unwrap();
let ctx = IPCBuilder::new()
.address(ADDRESS)
let ctx = IPCBuilder::<TcpListener>::new()
.address(ADDRESS.to_socket_addrs().unwrap().next().unwrap())
.build_client()
.await
.unwrap();

@ -1,7 +1,9 @@
use crate::error::Error;
use crate::IPCBuilder;
use serde::{Deserialize, Serialize};
use std::net::ToSocketAddrs;
use std::time::SystemTime;
use tokio::net::TcpListener;
use tokio::sync::oneshot;
#[derive(Clone, Serialize, Deserialize, Debug)]
@ -15,8 +17,8 @@ pub fn start_test_server(address: &'static str) -> oneshot::Receiver<bool> {
let (tx, rx) = oneshot::channel();
tokio::task::spawn(async move {
tx.send(true).unwrap();
IPCBuilder::new()
.address(address)
IPCBuilder::<TcpListener>::new()
.address(address.to_socket_addrs().unwrap().next().unwrap())
.on("ping", |ctx, event| {
Box::pin(async move {
ctx.emitter.emit_response(event.id(), "pong", ()).await?;

Loading…
Cancel
Save