From b55c6e526b12278371c0b7dd2bfb13817734802e Mon Sep 17 00:00:00 2001 From: trivernis Date: Sun, 7 Nov 2021 10:58:51 +0100 Subject: [PATCH 1/7] Add AsyncStreamProtocol trait with subtraits Signed-off-by: trivernis --- Cargo.lock | 12 ++++++++++++ Cargo.toml | 1 + src/lib.rs | 1 + src/protocol/mod.rs | 31 +++++++++++++++++++++++++++++++ 4 files changed, 45 insertions(+) create mode 100644 src/protocol/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 392a3e7d..519b5849 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,17 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "async-trait" +version = "0.1.51" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44318e776df68115a881de9a8fd1b9e53368d7a4a5ce4cc48517da3393233a5e" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "atty" version = "0.2.14" @@ -502,6 +513,7 @@ dependencies = [ name = "rmp-ipc" version = "0.8.2" dependencies = [ + "async-trait", "byteorder", "criterion", "lazy_static", diff --git a/Cargo.toml b/Cargo.toml index 836934aa..3f2fea2a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ tracing = "0.1.29" lazy_static = "1.4.0" typemap_rev = "0.1.5" byteorder = "1.4.3" +async-trait = "0.1.51" [dependencies.serde] version = "1.0.130" diff --git a/src/lib.rs b/src/lib.rs index 5a647a1f..b5f5b603 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -105,6 +105,7 @@ mod events; pub mod ipc; mod macros; mod namespaces; +pub(crate) mod protocol; pub use events::error_event; pub use events::event; diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs new file mode 100644 index 00000000..1108d034 --- /dev/null +++ b/src/protocol/mod.rs @@ -0,0 +1,31 @@ +use crate::prelude::IPCResult; +use async_trait::async_trait; +use tokio::io::{AsyncRead, AsyncWrite}; + +#[async_trait] +pub trait AsyncStreamProtocol { + type Listener: AsyncStreamProtocolListener; + type Stream: AsyncProtocolStream; +} + +#[async_trait] +pub trait AsyncStreamProtocolListener: Sized { + type AddressType; + type RemoteAddressType; + type Stream: AsyncProtocolStream; + + async fn bind(address: Self::AddressType) -> IPCResult; + + async fn accept(&self) -> IPCResult<(Self::Stream, Self::RemoteAddressType)>; +} + +#[async_trait] +pub trait AsyncProtocolStream: AsyncRead + AsyncWrite + Sized + Send + Sync { + type AddressType; + type OwnedSplitReadHalf: AsyncRead + Send + Sync; + type OwnedSplitWriteHalf: AsyncWrite + Send + Sync; + + async fn connect(address: Self::AddressType) -> IPCResult; + + async fn into_split(self) -> IPCResult<(Self::OwnedSplitReadHalf, Self::OwnedSplitWriteHalf)>; +} From cff9b07e76dad6a9f115d879bb6f83fe31ba9cef Mon Sep 17 00:00:00 2001 From: trivernis Date: Sun, 7 Nov 2021 11:13:20 +0100 Subject: [PATCH 2/7] Add protocol impelemtation for tcp Signed-off-by: trivernis --- src/protocol/mod.rs | 10 +++++---- src/protocol/tcp.rs | 50 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 56 insertions(+), 4 deletions(-) create mode 100644 src/protocol/tcp.rs diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index 1108d034..949aa553 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -1,3 +1,5 @@ +mod tcp; + use crate::prelude::IPCResult; use async_trait::async_trait; use tokio::io::{AsyncRead, AsyncWrite}; @@ -14,9 +16,9 @@ pub trait AsyncStreamProtocolListener: Sized { type RemoteAddressType; type Stream: AsyncProtocolStream; - async fn bind(address: Self::AddressType) -> IPCResult; + async fn protocol_bind(address: Self::AddressType) -> IPCResult; - async fn accept(&self) -> IPCResult<(Self::Stream, Self::RemoteAddressType)>; + async fn protocol_accept(&self) -> IPCResult<(Self::Stream, Self::RemoteAddressType)>; } #[async_trait] @@ -25,7 +27,7 @@ pub trait AsyncProtocolStream: AsyncRead + AsyncWrite + Sized + Send + Sync { type OwnedSplitReadHalf: AsyncRead + Send + Sync; type OwnedSplitWriteHalf: AsyncWrite + Send + Sync; - async fn connect(address: Self::AddressType) -> IPCResult; + async fn protocol_connect(address: Self::AddressType) -> IPCResult; - async fn into_split(self) -> IPCResult<(Self::OwnedSplitReadHalf, Self::OwnedSplitWriteHalf)>; + async fn protocol_into_split(self) -> (Self::OwnedSplitReadHalf, Self::OwnedSplitWriteHalf); } diff --git a/src/protocol/tcp.rs b/src/protocol/tcp.rs new file mode 100644 index 00000000..70b35434 --- /dev/null +++ b/src/protocol/tcp.rs @@ -0,0 +1,50 @@ +use crate::prelude::IPCResult; +use crate::protocol::{AsyncProtocolStream, AsyncStreamProtocol, AsyncStreamProtocolListener}; +use async_trait::async_trait; +use std::net::SocketAddr; +use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf}; +use tokio::net::{TcpListener, TcpStream}; + +pub struct TcpProtocol; + +#[async_trait] +impl AsyncStreamProtocol for TcpProtocol { + type Listener = TcpListener; + type Stream = TcpStream; +} + +#[async_trait] +impl AsyncStreamProtocolListener for TcpListener { + type AddressType = SocketAddr; + type RemoteAddressType = SocketAddr; + type Stream = TcpStream; + + async fn protocol_bind(address: Self::AddressType) -> IPCResult { + let listener = TcpListener::bind(address).await?; + + Ok(listener) + } + + async fn protocol_accept(&self) -> IPCResult<(Self::Stream, Self::RemoteAddressType)> { + let connection = self.accept().await?; + + Ok(connection) + } +} + +#[async_trait] +impl AsyncProtocolStream for TcpStream { + type AddressType = SocketAddr; + type OwnedSplitReadHalf = OwnedReadHalf; + type OwnedSplitWriteHalf = OwnedWriteHalf; + + async fn protocol_connect(address: Self::AddressType) -> IPCResult { + let stream = TcpStream::connect(address).await?; + + Ok(stream) + } + + async fn protocol_into_split(self) -> (Self::OwnedSplitReadHalf, Self::OwnedSplitWriteHalf) { + self.into_split() + } +} From 4fe9ed16db67d3c6840e88fa4aa4637802eeb8ec Mon Sep 17 00:00:00 2001 From: trivernis Date: Sun, 7 Nov 2021 12:33:22 +0100 Subject: [PATCH 3/7] Change all explicit protocols to generic trait bindings Signed-off-by: trivernis --- README.md | 26 ++++++++------- src/events/event_handler.rs | 35 +++++++++++++++------ src/ipc/builder.rs | 44 +++++++++++++++----------- src/ipc/client.rs | 19 ++++++----- src/ipc/context.rs | 54 +++++++++++++++++++++++++------- src/ipc/mod.rs | 18 ++++++----- src/ipc/server.rs | 23 ++++++++------ src/ipc/stream_emitter.rs | 27 +++++++++++----- src/lib.rs | 21 ++++++++----- src/namespaces/builder.rs | 18 ++++++----- src/namespaces/namespace.rs | 26 ++++++++++++--- src/namespaces/provider_trait.rs | 8 +++-- src/protocol/mod.rs | 32 +++++++++---------- src/protocol/tcp.rs | 25 ++++++--------- src/tests/ipc_tests.rs | 32 +++++++++++-------- src/tests/utils.rs | 6 ++-- 16 files changed, 263 insertions(+), 151 deletions(-) diff --git a/README.md b/README.md index 8403b24b..d9061a2b 100644 --- a/README.md +++ b/README.md @@ -6,10 +6,11 @@ Interprocess Communication via TCP using Rust MessagePack. **Client:** ```rust -use rmp_ipc::{callback, Event, context::Context, IPCBuilder, error::Result}; +use rmp_ipc::prelude::*; +use tokio::net::TcpListener; /// Callback ping function -async fn handle_ping(ctx: &Context, event: Event) -> Result<()> { +async fn handle_ping(ctx: &Context, event: Event) -> Result<()> { println!("Received ping event."); ctx.emitter.emit_response(event.id(), "pong", ()).await?; Ok(()) @@ -18,7 +19,7 @@ async fn handle_ping(ctx: &Context, event: Event) -> Result<()> { #[tokio::main] async fn main() { // create the client - let ctx = IPCBuilder::new() + let ctx = IPCBuilder::::new() .address("127.0.0.1:2020") // register callback .on("ping", callback!(handle_ping)) @@ -31,12 +32,13 @@ async fn main() { **Server:** ```rust -use rmp_ipc::{IPCBuilder, callback}; +use rmp_ipc::prelude::*; +use tokio::net::TcpListener; // create the server #[tokio::main] async fn main() { - IPCBuilder::new() + IPCBuilder::::new() .address("127.0.0.1:2020") // register callback .on("ping", callback!(ctx, event, async move { @@ -51,12 +53,13 @@ async fn main() { **Client:** ```rust -use rmp_ipc::IPCBuilder; +use rmp_ipc::prelude::*; +use tokio::net::TcpListener; // create the client #[tokio::main] async fn main() { - let ctx = IPCBuilder::new() + let ctx = IPCBuilder::::new() .address("127.0.0.1:2020") // register namespace .namespace("mainspace-client") @@ -76,13 +79,14 @@ async fn main() { **Server:** ```rust -use rmp_ipc::{IPCBuilder, EventHandler, namespace, command, Event, context::Context}; +use rmp_ipc::prelude::*; +use tokio::net::TcpListener; // create the server pub struct MyNamespace; impl MyNamespace { - async fn ping(_ctx: &Context, _event: Event) -> Result<()> { + async fn ping(_ctx: &Context, _event: Event) -> Result<()> { println!("My namespace received a ping"); Ok(()) } @@ -91,7 +95,7 @@ impl MyNamespace { impl NamespaceProvider for MyNamespace { fn name() -> &'static str {"my_namespace"} - fn register(handler: &mut EventHandler) { + fn register(handler: &mut EventHandler) { events!(handler, "ping" => Self::ping ); @@ -100,7 +104,7 @@ impl NamespaceProvider for MyNamespace { #[tokio::main] async fn main() { - IPCBuilder::new() + IPCBuilder::::new() .address("127.0.0.1:2020") // register namespace .namespace("mainspace-server") diff --git a/src/events/event_handler.rs b/src/events/event_handler.rs index 8cd9a897..c54057d2 100644 --- a/src/events/event_handler.rs +++ b/src/events/event_handler.rs @@ -1,25 +1,39 @@ use crate::error::Result; use crate::events::event::Event; use crate::ipc::context::Context; +use crate::protocol::AsyncProtocolStream; use std::collections::HashMap; use std::fmt::{Debug, Formatter}; use std::future::Future; use std::pin::Pin; use std::sync::Arc; -type EventCallback = Arc< - dyn for<'a> Fn(&'a Context, Event) -> Pin> + Send + 'a)>> +type EventCallback

= Arc< + dyn for<'a> Fn(&'a Context

, Event) -> Pin> + Send + 'a)>> + Send + Sync, >; /// Handler for events -#[derive(Clone)] -pub struct EventHandler { - callbacks: HashMap, +pub struct EventHandler { + callbacks: HashMap>, } -impl Debug for EventHandler { +impl Clone for EventHandler +where + S: AsyncProtocolStream, +{ + fn clone(&self) -> Self { + Self { + callbacks: self.callbacks.clone(), + } + } +} + +impl

Debug for EventHandler

+where + P: AsyncProtocolStream, +{ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { let callback_names: String = self .callbacks @@ -31,7 +45,10 @@ impl Debug for EventHandler { } } -impl EventHandler { +impl

EventHandler

+where + P: AsyncProtocolStream, +{ /// Creates a new event handler pub fn new() -> Self { Self { @@ -44,7 +61,7 @@ impl EventHandler { pub fn on(&mut self, name: &str, callback: F) where F: for<'a> Fn( - &'a Context, + &'a Context

, Event, ) -> Pin> + Send + 'a)>> + Send @@ -55,7 +72,7 @@ impl EventHandler { /// Handles a received event #[tracing::instrument(level = "debug", skip(self, ctx, event))] - pub async fn handle_event(&self, ctx: &Context, event: Event) -> Result<()> { + pub async fn handle_event(&self, ctx: &Context

, event: Event) -> Result<()> { if let Some(cb) = self.callbacks.get(event.name()) { cb.as_ref()(ctx, event).await?; } diff --git a/src/ipc/builder.rs b/src/ipc/builder.rs index fffb5e27..dcfe6948 100644 --- a/src/ipc/builder.rs +++ b/src/ipc/builder.rs @@ -7,6 +7,7 @@ use crate::ipc::context::{Context, PooledContext, ReplyListeners}; use crate::ipc::server::IPCServer; use crate::namespaces::builder::NamespaceBuilder; use crate::namespaces::namespace::Namespace; +use crate::protocol::AsyncStreamProtocolListener; use std::collections::HashMap; use std::future::Future; use std::pin::Pin; @@ -16,8 +17,10 @@ use typemap_rev::{TypeMap, TypeMapKey}; /// A builder for the IPC server or client. /// ```no_run -///use typemap_rev::TypeMapKey; +/// use std::net::ToSocketAddrs; +/// use typemap_rev::TypeMapKey; /// use rmp_ipc::IPCBuilder; +/// use tokio::net::TcpListener; /// /// struct CustomKey; /// @@ -26,8 +29,8 @@ use typemap_rev::{TypeMap, TypeMapKey}; /// } /// ///# async fn a() { -/// IPCBuilder::new() -/// .address("127.0.0.1:2020") +/// IPCBuilder::::new() +/// .address("127.0.0.1:2020".to_socket_addrs().unwrap().next().unwrap()) /// // register callback /// .on("ping", |_ctx, _event| Box::pin(async move { /// println!("Received ping event."); @@ -46,14 +49,17 @@ use typemap_rev::{TypeMap, TypeMapKey}; /// .build_server().await.unwrap(); ///# } /// ``` -pub struct IPCBuilder { - handler: EventHandler, - address: Option, - namespaces: HashMap, +pub struct IPCBuilder { + handler: EventHandler, + address: Option, + namespaces: HashMap>, data: TypeMap, } -impl IPCBuilder { +impl IPCBuilder +where + L: AsyncStreamProtocolListener, +{ pub fn new() -> Self { let mut handler = EventHandler::new(); handler.on(ERROR_EVENT_NAME, |_, event| { @@ -84,7 +90,7 @@ impl IPCBuilder { pub fn on(mut self, event: &str, callback: F) -> Self where F: for<'a> Fn( - &'a Context, + &'a Context, Event, ) -> Pin> + Send + 'a)>> + Send @@ -96,19 +102,19 @@ impl IPCBuilder { } /// Adds the address to connect to - pub fn address(mut self, address: S) -> Self { - self.address = Some(address.to_string()); + pub fn address(mut self, address: L::AddressType) -> Self { + self.address = Some(address); self } /// Adds a namespace - pub fn namespace(self, name: S) -> NamespaceBuilder { + pub fn namespace(self, name: S) -> NamespaceBuilder { NamespaceBuilder::new(self, name.to_string()) } /// Adds a namespace to the ipc server - pub fn add_namespace(mut self, namespace: Namespace) -> Self { + pub fn add_namespace(mut self, namespace: Namespace) -> Self { self.namespaces .insert(namespace.name().to_owned(), namespace); @@ -119,19 +125,19 @@ impl IPCBuilder { #[tracing::instrument(skip(self))] pub async fn build_server(self) -> Result<()> { self.validate()?; - let server = IPCServer { + let server = IPCServer:: { namespaces: self.namespaces, handler: self.handler, data: self.data, }; - server.start(&self.address.unwrap()).await?; + server.start(self.address.unwrap()).await?; Ok(()) } /// Builds an ipc client #[tracing::instrument(skip(self))] - pub async fn build_client(self) -> Result { + pub async fn build_client(self) -> Result> { self.validate()?; let data = Arc::new(RwLock::new(self.data)); let reply_listeners = ReplyListeners::default(); @@ -142,7 +148,7 @@ impl IPCBuilder { reply_listeners, }; - let ctx = client.connect(&self.address.unwrap()).await?; + let ctx = client.connect(self.address.unwrap()).await?; Ok(ctx) } @@ -152,7 +158,7 @@ impl IPCBuilder { /// return a [crate::context::PooledContext] that allows one to [crate::context::PooledContext::acquire] a single context /// to emit events. #[tracing::instrument(skip(self))] - pub async fn build_pooled_client(self, pool_size: usize) -> Result { + pub async fn build_pooled_client(self, pool_size: usize) -> Result> { if pool_size == 0 { Error::BuildError("Pool size must be greater than 0".to_string()); } @@ -170,7 +176,7 @@ impl IPCBuilder { reply_listeners: Arc::clone(&reply_listeners), }; - let ctx = client.connect(&address).await?; + let ctx = client.connect(address.clone()).await?; contexts.push(ctx); } diff --git a/src/ipc/client.rs b/src/ipc/client.rs index 1546f1e2..d8cc9b6b 100644 --- a/src/ipc/client.rs +++ b/src/ipc/client.rs @@ -4,9 +4,9 @@ use crate::events::event_handler::EventHandler; use crate::ipc::context::{Context, ReplyListeners}; use crate::ipc::stream_emitter::StreamEmitter; use crate::namespaces::namespace::Namespace; +use crate::protocol::AsyncProtocolStream; use std::collections::HashMap; use std::sync::Arc; -use tokio::net::TcpStream; use tokio::sync::oneshot; use tokio::sync::RwLock; use typemap_rev::TypeMap; @@ -15,20 +15,23 @@ use typemap_rev::TypeMap; /// Use the [IPCBuilder](crate::builder::IPCBuilder) to create the client. /// Usually one does not need to use the IPCClient object directly. #[derive(Clone)] -pub struct IPCClient { - pub(crate) handler: EventHandler, - pub(crate) namespaces: HashMap, +pub struct IPCClient { + pub(crate) handler: EventHandler, + pub(crate) namespaces: HashMap>, pub(crate) data: Arc>, pub(crate) reply_listeners: ReplyListeners, } -impl IPCClient { +impl IPCClient +where + S: 'static + AsyncProtocolStream, +{ /// Connects to a given address and returns an emitter for events to that address. /// Invoked by [IPCBuilder::build_client](crate::builder::IPCBuilder::build_client) #[tracing::instrument(skip(self))] - pub async fn connect(self, address: &str) -> Result { - let stream = TcpStream::connect(address).await?; - let (read_half, write_half) = stream.into_split(); + pub async fn connect(self, address: S::AddressType) -> Result> { + let stream = S::protocol_connect(address).await?; + let (read_half, write_half) = stream.protocol_into_split(); let emitter = StreamEmitter::new(write_half); let (tx, rx) = oneshot::channel(); let ctx = Context::new( diff --git a/src/ipc/context.rs b/src/ipc/context.rs index beff8e25..aa341a90 100644 --- a/src/ipc/context.rs +++ b/src/ipc/context.rs @@ -1,6 +1,7 @@ use crate::error::{Error, Result}; use crate::event::Event; use crate::ipc::stream_emitter::StreamEmitter; +use crate::protocol::AsyncProtocolStream; use std::collections::HashMap; use std::mem; use std::ops::{Deref, DerefMut}; @@ -17,17 +18,16 @@ pub(crate) type ReplyListeners = Arc>> /// ```rust /// use rmp_ipc::prelude::*; /// -/// async fn my_callback(ctx: &Context, _event: Event) -> IPCResult<()> { +/// async fn my_callback(ctx: &Context, _event: Event) -> IPCResult<()> { /// // use the emitter on the context object to emit events /// // inside callbacks /// ctx.emitter.emit("ping", ()).await?; /// Ok(()) /// } /// ``` -#[derive(Clone)] -pub struct Context { +pub struct Context { /// The event emitter - pub emitter: StreamEmitter, + pub emitter: StreamEmitter, /// Field to store additional context data pub data: Arc>, @@ -37,9 +37,26 @@ pub struct Context { reply_listeners: ReplyListeners, } -impl Context { +impl Clone for Context +where + S: AsyncProtocolStream, +{ + fn clone(&self) -> Self { + Self { + emitter: self.emitter.clone(), + data: Arc::clone(&self.data), + stop_sender: Arc::clone(&self.stop_sender), + reply_listeners: Arc::clone(&self.reply_listeners), + } + } +} + +impl

Context

+where + P: AsyncProtocolStream, +{ pub(crate) fn new( - emitter: StreamEmitter, + emitter: StreamEmitter

, data: Arc>, stop_sender: Option>, reply_listeners: ReplyListeners, @@ -83,9 +100,19 @@ impl Context { } } -#[derive(Clone)] -pub struct PooledContext { - contexts: Vec>, +pub struct PooledContext { + contexts: Vec>>, +} + +impl Clone for PooledContext +where + S: AsyncProtocolStream, +{ + fn clone(&self) -> Self { + Self { + contexts: self.contexts.clone(), + } + } } pub struct PoolGuard @@ -169,9 +196,12 @@ where } } -impl PooledContext { +impl

PooledContext

+where + P: AsyncProtocolStream, +{ /// Creates a new pooled context from a list of contexts - pub(crate) fn new(contexts: Vec) -> Self { + pub(crate) fn new(contexts: Vec>) -> Self { Self { contexts: contexts.into_iter().map(PoolGuard::new).collect(), } @@ -180,7 +210,7 @@ impl PooledContext { /// Acquires a context from the pool /// It always chooses the one that is used the least #[tracing::instrument(level = "trace", skip_all)] - pub fn acquire(&self) -> PoolGuard { + pub fn acquire(&self) -> PoolGuard> { self.contexts .iter() .min_by_key(|c| c.count()) diff --git a/src/ipc/mod.rs b/src/ipc/mod.rs index b0d370b9..81ff1f69 100644 --- a/src/ipc/mod.rs +++ b/src/ipc/mod.rs @@ -2,9 +2,9 @@ use crate::error_event::{ErrorEventData, ERROR_EVENT_NAME}; use crate::events::event_handler::EventHandler; use crate::namespaces::namespace::Namespace; use crate::prelude::*; +use crate::protocol::AsyncProtocolStream; use std::collections::HashMap; use std::sync::Arc; -use tokio::net::tcp::OwnedReadHalf; pub mod builder; pub mod client; @@ -13,11 +13,11 @@ pub mod server; pub mod stream_emitter; /// Handles listening to a connection and triggering the corresponding event functions -async fn handle_connection( - namespaces: Arc>, - handler: Arc, - mut read_half: OwnedReadHalf, - ctx: Context, +async fn handle_connection( + namespaces: Arc>>, + handler: Arc>, + mut read_half: S::OwnedSplitReadHalf, + ctx: Context, ) { while let Ok(event) = Event::from_async_read(&mut read_half).await { tracing::trace!( @@ -52,7 +52,11 @@ async fn handle_connection( } /// Handles a single event in a different tokio context -fn handle_event(ctx: Context, handler: Arc, event: Event) { +fn handle_event( + ctx: Context, + handler: Arc>, + event: Event, +) { tokio::spawn(async move { let id = event.id(); if let Err(e) = handler.handle_event(&ctx, event).await { diff --git a/src/ipc/server.rs b/src/ipc/server.rs index ff0d53e4..ce3f78d5 100644 --- a/src/ipc/server.rs +++ b/src/ipc/server.rs @@ -4,33 +4,36 @@ use crate::events::event_handler::EventHandler; use crate::ipc::context::{Context, ReplyListeners}; use crate::ipc::stream_emitter::StreamEmitter; use crate::namespaces::namespace::Namespace; +use crate::protocol::{AsyncProtocolStreamSplit, AsyncStreamProtocolListener}; use std::collections::HashMap; use std::sync::Arc; -use tokio::net::TcpListener; use tokio::sync::RwLock; use typemap_rev::TypeMap; /// The IPC Server listening for connections. /// Use the [IPCBuilder](crate::builder::IPCBuilder) to create a server. /// Usually one does not need to use the IPCServer object directly. -pub struct IPCServer { - pub(crate) handler: EventHandler, - pub(crate) namespaces: HashMap, +pub struct IPCServer { + pub(crate) handler: EventHandler, + pub(crate) namespaces: HashMap>, pub(crate) data: TypeMap, } -impl IPCServer { +impl IPCServer +where + L: AsyncStreamProtocolListener, +{ /// Starts the IPC Server. /// Invoked by [IPCBuilder::build_server](crate::builder::IPCBuilder::build_server) #[tracing::instrument(skip(self))] - pub async fn start(self, address: &str) -> Result<()> { - let listener = TcpListener::bind(address).await?; + pub async fn start(self, address: L::AddressType) -> Result<()> { + let listener = L::protocol_bind(address.clone()).await?; let handler = Arc::new(self.handler); let namespaces = Arc::new(self.namespaces); let data = Arc::new(RwLock::new(self.data)); - tracing::info!(address); + tracing::info!("address = {}", address.to_string()); - while let Ok((stream, remote_address)) = listener.accept().await { + while let Ok((stream, remote_address)) = listener.protocol_accept().await { let remote_address = remote_address.to_string(); tracing::debug!("remote_address = {}", remote_address); let handler = Arc::clone(&handler); @@ -38,7 +41,7 @@ impl IPCServer { let data = Arc::clone(&data); tokio::spawn(async { - let (read_half, write_half) = stream.into_split(); + let (read_half, write_half) = stream.protocol_into_split(); let emitter = StreamEmitter::new(write_half); let reply_listeners = ReplyListeners::default(); let ctx = Context::new(StreamEmitter::clone(&emitter), data, None, reply_listeners); diff --git a/src/ipc/stream_emitter.rs b/src/ipc/stream_emitter.rs index 996fdb88..51ebc801 100644 --- a/src/ipc/stream_emitter.rs +++ b/src/ipc/stream_emitter.rs @@ -3,22 +3,35 @@ use crate::error_event::{ErrorEventData, ERROR_EVENT_NAME}; use crate::events::event::Event; use crate::events::payload::EventSendPayload; use crate::ipc::context::Context; +use crate::protocol::AsyncProtocolStream; use std::fmt::Debug; use std::sync::Arc; use tokio::io::AsyncWriteExt; -use tokio::net::tcp::OwnedWriteHalf; use tokio::sync::Mutex; /// An abstraction over the raw tokio tcp stream /// to emit events and share a connection across multiple /// contexts. -#[derive(Clone)] -pub struct StreamEmitter { - stream: Arc>, +pub struct StreamEmitter { + stream: Arc>, } -impl StreamEmitter { - pub fn new(stream: OwnedWriteHalf) -> Self { +impl Clone for StreamEmitter +where + S: AsyncProtocolStream, +{ + fn clone(&self) -> Self { + Self { + stream: Arc::clone(&self.stream), + } + } +} + +impl

StreamEmitter

+where + P: AsyncProtocolStream, +{ + pub fn new(stream: P::OwnedSplitWriteHalf) -> Self { Self { stream: Arc::new(Mutex::new(stream)), } @@ -118,7 +131,7 @@ impl EmitMetadata { /// Waits for a reply to the given message. #[tracing::instrument(skip(self, ctx), fields(self.message_id))] - pub async fn await_reply(&self, ctx: &Context) -> Result { + pub async fn await_reply(&self, ctx: &Context

) -> Result { let reply = ctx.await_reply(self.message_id).await?; if reply.name() == ERROR_EVENT_NAME { Err(reply.data::()?.into()) diff --git a/src/lib.rs b/src/lib.rs index b5f5b603..f8a36a21 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,9 +3,10 @@ //! Client Example: //! ```no_run //! use rmp_ipc::prelude::*; +//! use tokio::net::TcpListener; //! //! /// Callback ping function -//! async fn handle_ping(ctx: &Context, event: Event) -> IPCResult<()> { +//! async fn handle_ping(ctx: &Context, event: Event) -> IPCResult<()> { //! println!("Received ping event."); //! ctx.emitter.emit_response(event.id(), "pong", ()).await?; //! @@ -15,7 +16,7 @@ //! pub struct MyNamespace; //! //! impl MyNamespace { -//! async fn ping(_ctx: &Context, _event: Event) -> IPCResult<()> { +//! async fn ping(_ctx: &Context, _event: Event) -> IPCResult<()> { //! println!("My namespace received a ping"); //! Ok(()) //! } @@ -24,7 +25,7 @@ //! impl NamespaceProvider for MyNamespace { //! fn name() -> &'static str {"my_namespace"} //! -//! fn register(handler: &mut EventHandler) { +//! fn register(handler: &mut EventHandler) { //! events!(handler, //! "ping" => Self::ping, //! "ping2" => Self::ping @@ -35,8 +36,9 @@ //! #[tokio::main] //! async fn main() { //! // create the client -//! let ctx = IPCBuilder::new() -//! .address("127.0.0.1:2020") +//! use std::net::ToSocketAddrs; +//! let ctx = IPCBuilder::::new() +//! .address("127.0.0.1:2020".to_socket_addrs().unwrap().next().unwrap()) //! // register callback //! .on("ping", callback!(handle_ping)) //! .namespace("mainspace-client") @@ -58,9 +60,11 @@ //! //! Server Example: //! ```no_run +//! use std::net::ToSocketAddrs; //! use typemap_rev::TypeMapKey; //! use rmp_ipc::IPCBuilder; //! use rmp_ipc::callback; +//! use tokio::net::TcpListener; //! //! struct MyKey; //! @@ -70,8 +74,8 @@ //! //! // create the server //!# async fn a() { -//! IPCBuilder::new() -//! .address("127.0.0.1:2020") +//! IPCBuilder::::new() +//! .address("127.0.0.1:2020".to_socket_addrs().unwrap().next().unwrap()) //! // register callback //! .on("ping", callback!(ctx, event, async move { //! println!("Received ping event."); @@ -105,7 +109,7 @@ mod events; pub mod ipc; mod macros; mod namespaces; -pub(crate) mod protocol; +pub mod protocol; pub use events::error_event; pub use events::event; @@ -131,5 +135,6 @@ pub mod prelude { pub use crate::namespaces::builder::NamespaceBuilder; pub use crate::namespaces::provider_trait::*; pub use crate::payload::*; + pub use crate::protocol::*; pub use crate::*; } diff --git a/src/namespaces/builder.rs b/src/namespaces/builder.rs index 45862c7b..05688522 100644 --- a/src/namespaces/builder.rs +++ b/src/namespaces/builder.rs @@ -3,18 +3,22 @@ use crate::event::Event; use crate::events::event_handler::EventHandler; use crate::ipc::context::Context; use crate::namespaces::namespace::Namespace; +use crate::protocol::AsyncStreamProtocolListener; use crate::IPCBuilder; use std::future::Future; use std::pin::Pin; -pub struct NamespaceBuilder { +pub struct NamespaceBuilder { name: String, - handler: EventHandler, - ipc_builder: IPCBuilder, + handler: EventHandler, + ipc_builder: IPCBuilder, } -impl NamespaceBuilder { - pub(crate) fn new(ipc_builder: IPCBuilder, name: String) -> Self { +impl NamespaceBuilder +where + L: AsyncStreamProtocolListener, +{ + pub(crate) fn new(ipc_builder: IPCBuilder, name: String) -> Self { Self { name, handler: EventHandler::new(), @@ -26,7 +30,7 @@ impl NamespaceBuilder { pub fn on(mut self, event: &str, callback: F) -> Self where F: for<'a> Fn( - &'a Context, + &'a Context, Event, ) -> Pin> + Send + 'a)>> + Send @@ -39,7 +43,7 @@ impl NamespaceBuilder { /// Builds the namespace #[tracing::instrument(skip(self))] - pub fn build(self) -> IPCBuilder { + pub fn build(self) -> IPCBuilder { let namespace = Namespace::new(self.name, self.handler); self.ipc_builder.add_namespace(namespace) } diff --git a/src/namespaces/namespace.rs b/src/namespaces/namespace.rs index 72dc77c8..cae6afe9 100644 --- a/src/namespaces/namespace.rs +++ b/src/namespaces/namespace.rs @@ -1,15 +1,31 @@ use crate::events::event_handler::EventHandler; +use crate::protocol::AsyncProtocolStream; use std::sync::Arc; -#[derive(Clone, Debug)] -pub struct Namespace { +#[derive(Debug)] +pub struct Namespace { name: String, - pub(crate) handler: Arc, + pub(crate) handler: Arc>, } -impl Namespace { +impl Clone for Namespace +where + S: AsyncProtocolStream, +{ + fn clone(&self) -> Self { + Self { + name: self.name.clone(), + handler: Arc::clone(&self.handler), + } + } +} + +impl Namespace +where + S: AsyncProtocolStream, +{ /// Creates a new namespace with an event handler to register event callbacks on - pub fn new(name: S, handler: EventHandler) -> Self { + pub fn new(name: S2, handler: EventHandler) -> Self { Self { name: name.to_string(), handler: Arc::new(handler), diff --git a/src/namespaces/provider_trait.rs b/src/namespaces/provider_trait.rs index ce7f1829..21a08809 100644 --- a/src/namespaces/provider_trait.rs +++ b/src/namespaces/provider_trait.rs @@ -1,12 +1,16 @@ use crate::events::event_handler::EventHandler; use crate::namespace::Namespace; +use crate::protocol::AsyncProtocolStream; pub trait NamespaceProvider { fn name() -> &'static str; - fn register(handler: &mut EventHandler); + fn register(handler: &mut EventHandler); } -impl Namespace { +impl Namespace +where + S: AsyncProtocolStream, +{ pub fn from_provider() -> Self { let name = N::name(); let mut handler = EventHandler::new(); diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index 949aa553..55a95e39 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -1,33 +1,33 @@ -mod tcp; +pub mod tcp; use crate::prelude::IPCResult; use async_trait::async_trait; +use std::fmt::Debug; use tokio::io::{AsyncRead, AsyncWrite}; -#[async_trait] -pub trait AsyncStreamProtocol { - type Listener: AsyncStreamProtocolListener; - type Stream: AsyncProtocolStream; -} - #[async_trait] pub trait AsyncStreamProtocolListener: Sized { - type AddressType; - type RemoteAddressType; - type Stream: AsyncProtocolStream; + type AddressType: ToString + Clone + Debug; + type RemoteAddressType: ToString; + type Stream: 'static + AsyncProtocolStream; async fn protocol_bind(address: Self::AddressType) -> IPCResult; async fn protocol_accept(&self) -> IPCResult<(Self::Stream, Self::RemoteAddressType)>; } +pub trait AsyncProtocolStreamSplit { + type OwnedSplitReadHalf: AsyncRead + Send + Sync + Unpin; + type OwnedSplitWriteHalf: AsyncWrite + Send + Sync + Unpin; + + fn protocol_into_split(self) -> (Self::OwnedSplitReadHalf, Self::OwnedSplitWriteHalf); +} + #[async_trait] -pub trait AsyncProtocolStream: AsyncRead + AsyncWrite + Sized + Send + Sync { - type AddressType; - type OwnedSplitReadHalf: AsyncRead + Send + Sync; - type OwnedSplitWriteHalf: AsyncWrite + Send + Sync; +pub trait AsyncProtocolStream: + AsyncRead + AsyncWrite + Sized + Send + Sync + AsyncProtocolStreamSplit +{ + type AddressType: ToString + Clone + Debug; async fn protocol_connect(address: Self::AddressType) -> IPCResult; - - async fn protocol_into_split(self) -> (Self::OwnedSplitReadHalf, Self::OwnedSplitWriteHalf); } diff --git a/src/protocol/tcp.rs b/src/protocol/tcp.rs index 70b35434..abe43d4f 100644 --- a/src/protocol/tcp.rs +++ b/src/protocol/tcp.rs @@ -1,18 +1,10 @@ use crate::prelude::IPCResult; -use crate::protocol::{AsyncProtocolStream, AsyncStreamProtocol, AsyncStreamProtocolListener}; +use crate::protocol::{AsyncProtocolStream, AsyncProtocolStreamSplit, AsyncStreamProtocolListener}; use async_trait::async_trait; use std::net::SocketAddr; use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf}; use tokio::net::{TcpListener, TcpStream}; -pub struct TcpProtocol; - -#[async_trait] -impl AsyncStreamProtocol for TcpProtocol { - type Listener = TcpListener; - type Stream = TcpStream; -} - #[async_trait] impl AsyncStreamProtocolListener for TcpListener { type AddressType = SocketAddr; @@ -32,19 +24,22 @@ impl AsyncStreamProtocolListener for TcpListener { } } +impl AsyncProtocolStreamSplit for TcpStream { + type OwnedSplitReadHalf = OwnedReadHalf; + type OwnedSplitWriteHalf = OwnedWriteHalf; + + fn protocol_into_split(self) -> (Self::OwnedSplitReadHalf, Self::OwnedSplitWriteHalf) { + self.into_split() + } +} + #[async_trait] impl AsyncProtocolStream for TcpStream { type AddressType = SocketAddr; - type OwnedSplitReadHalf = OwnedReadHalf; - type OwnedSplitWriteHalf = OwnedWriteHalf; async fn protocol_connect(address: Self::AddressType) -> IPCResult { let stream = TcpStream::connect(address).await?; Ok(stream) } - - async fn protocol_into_split(self) -> (Self::OwnedSplitReadHalf, Self::OwnedSplitWriteHalf) { - self.into_split() - } } diff --git a/src/tests/ipc_tests.rs b/src/tests/ipc_tests.rs index 142566f4..1a484fbe 100644 --- a/src/tests/ipc_tests.rs +++ b/src/tests/ipc_tests.rs @@ -1,12 +1,15 @@ use super::utils::PingEventData; use crate::prelude::*; +use crate::protocol::AsyncProtocolStream; use crate::tests::utils::start_test_server; +use std::net::ToSocketAddrs; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::{Duration, SystemTime}; +use tokio::net::TcpListener; use typemap_rev::TypeMapKey; -async fn handle_ping_event(ctx: &Context, e: Event) -> IPCResult<()> { +async fn handle_ping_event(ctx: &Context

, e: Event) -> IPCResult<()> { let mut ping_data = e.data::()?; ping_data.time = SystemTime::now(); ping_data.ttl -= 1; @@ -18,10 +21,10 @@ async fn handle_ping_event(ctx: &Context, e: Event) -> IPCResult<()> { Ok(()) } -fn get_builder_with_ping(address: &str) -> IPCBuilder { +fn get_builder_with_ping(address: &str) -> IPCBuilder { IPCBuilder::new() .on("ping", |ctx, e| Box::pin(handle_ping_event(ctx, e))) - .address(address) + .address(address.to_socket_addrs().unwrap().next().unwrap()) } #[tokio::test] @@ -58,18 +61,18 @@ async fn it_receives_events() { assert_eq!(reply.name(), "pong"); } -fn get_builder_with_ping_mainspace(address: &str) -> IPCBuilder { +fn get_builder_with_ping_namespace(address: &str) -> IPCBuilder { IPCBuilder::new() .namespace("mainspace") .on("ping", callback!(handle_ping_event)) .build() - .address(address) + .address(address.to_socket_addrs().unwrap().next().unwrap()) } pub struct TestNamespace; impl TestNamespace { - async fn ping(_c: &Context, _e: Event) -> IPCResult<()> { + async fn ping(_c: &Context

, _e: Event) -> IPCResult<()> { println!("Ping received"); Ok(()) } @@ -80,7 +83,7 @@ impl NamespaceProvider for TestNamespace { "Test" } - fn register(handler: &mut EventHandler) { + fn register(handler: &mut EventHandler) { events!(handler, "ping" => Self::ping, "ping2" => Self::ping @@ -90,11 +93,11 @@ impl NamespaceProvider for TestNamespace { #[tokio::test] async fn it_receives_namespaced_events() { - let builder = get_builder_with_ping_mainspace("127.0.0.1:8282"); + let builder = get_builder_with_ping_namespace("127.0.0.1:8282"); let server_running = Arc::new(AtomicBool::new(false)); tokio::spawn({ let server_running = Arc::clone(&server_running); - let builder = get_builder_with_ping_mainspace("127.0.0.1:8282"); + let builder = get_builder_with_ping_namespace("127.0.0.1:8282"); async move { server_running.store(true, Ordering::SeqCst); builder.build_server().await.unwrap(); @@ -132,7 +135,10 @@ impl TypeMapKey for ErrorOccurredKey { type Value = Arc; } -fn get_builder_with_error_handling(error_occurred: Arc, address: &str) -> IPCBuilder { +fn get_builder_with_error_handling( + error_occurred: Arc, + address: &str, +) -> IPCBuilder { IPCBuilder::new() .insert::(error_occurred) .on("ping", move |_, _| { @@ -152,7 +158,7 @@ fn get_builder_with_error_handling(error_occurred: Arc, address: &st Ok(()) }), ) - .address(address) + .address(address.to_socket_addrs().unwrap().next().unwrap()) } #[tokio::test] @@ -185,8 +191,8 @@ async fn it_handles_errors() { async fn test_error_responses() { static ADDRESS: &str = "127.0.0.1:8284"; start_test_server(ADDRESS).await.unwrap(); - let ctx = IPCBuilder::new() - .address(ADDRESS) + let ctx = IPCBuilder::::new() + .address(ADDRESS.to_socket_addrs().unwrap().next().unwrap()) .build_client() .await .unwrap(); diff --git a/src/tests/utils.rs b/src/tests/utils.rs index f3c4507e..0ecdbf96 100644 --- a/src/tests/utils.rs +++ b/src/tests/utils.rs @@ -1,7 +1,9 @@ use crate::error::Error; use crate::IPCBuilder; use serde::{Deserialize, Serialize}; +use std::net::ToSocketAddrs; use std::time::SystemTime; +use tokio::net::TcpListener; use tokio::sync::oneshot; #[derive(Clone, Serialize, Deserialize, Debug)] @@ -15,8 +17,8 @@ pub fn start_test_server(address: &'static str) -> oneshot::Receiver { let (tx, rx) = oneshot::channel(); tokio::task::spawn(async move { tx.send(true).unwrap(); - IPCBuilder::new() - .address(address) + IPCBuilder::::new() + .address(address.to_socket_addrs().unwrap().next().unwrap()) .on("ping", |ctx, event| { Box::pin(async move { ctx.emitter.emit_response(event.id(), "pong", ()).await?; From 32883a98dbf43f5a16e9f161b736830381685583 Mon Sep 17 00:00:00 2001 From: trivernis Date: Sun, 7 Nov 2021 12:58:34 +0100 Subject: [PATCH 4/7] Add implementation for unix sockets Signed-off-by: trivernis --- src/ipc/server.rs | 5 ++-- src/protocol/mod.rs | 13 ++++++---- src/protocol/unix_socket.rs | 51 +++++++++++++++++++++++++++++++++++++ src/tests/ipc_tests.rs | 29 ++++++++++++++++----- 4 files changed, 84 insertions(+), 14 deletions(-) create mode 100644 src/protocol/unix_socket.rs diff --git a/src/ipc/server.rs b/src/ipc/server.rs index ce3f78d5..095310df 100644 --- a/src/ipc/server.rs +++ b/src/ipc/server.rs @@ -31,11 +31,10 @@ where let handler = Arc::new(self.handler); let namespaces = Arc::new(self.namespaces); let data = Arc::new(RwLock::new(self.data)); - tracing::info!("address = {}", address.to_string()); + tracing::info!("address = {:?}", address); while let Ok((stream, remote_address)) = listener.protocol_accept().await { - let remote_address = remote_address.to_string(); - tracing::debug!("remote_address = {}", remote_address); + tracing::debug!("remote_address = {:?}", remote_address); let handler = Arc::clone(&handler); let namespaces = Arc::clone(&namespaces); let data = Arc::clone(&data); diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index 55a95e39..4837ac0f 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -1,14 +1,17 @@ pub mod tcp; +#[cfg(unix)] +pub mod unix_socket; + use crate::prelude::IPCResult; use async_trait::async_trait; use std::fmt::Debug; use tokio::io::{AsyncRead, AsyncWrite}; #[async_trait] -pub trait AsyncStreamProtocolListener: Sized { - type AddressType: ToString + Clone + Debug; - type RemoteAddressType: ToString; +pub trait AsyncStreamProtocolListener: Sized + Send + Sync { + type AddressType: Clone + Debug + Send + Sync; + type RemoteAddressType: Debug; type Stream: 'static + AsyncProtocolStream; async fn protocol_bind(address: Self::AddressType) -> IPCResult; @@ -25,9 +28,9 @@ pub trait AsyncProtocolStreamSplit { #[async_trait] pub trait AsyncProtocolStream: - AsyncRead + AsyncWrite + Sized + Send + Sync + AsyncProtocolStreamSplit + AsyncRead + AsyncWrite + Send + Sync + AsyncProtocolStreamSplit + Sized { - type AddressType: ToString + Clone + Debug; + type AddressType: Clone + Debug + Send + Sync; async fn protocol_connect(address: Self::AddressType) -> IPCResult; } diff --git a/src/protocol/unix_socket.rs b/src/protocol/unix_socket.rs new file mode 100644 index 00000000..91809083 --- /dev/null +++ b/src/protocol/unix_socket.rs @@ -0,0 +1,51 @@ +use crate::error::Result; +use crate::prelude::IPCResult; +use crate::protocol::{AsyncProtocolStream, AsyncProtocolStreamSplit, AsyncStreamProtocolListener}; +use async_trait::async_trait; +use std::path::PathBuf; +use tokio::io::Interest; +use tokio::net::unix::OwnedWriteHalf; +use tokio::net::unix::{OwnedReadHalf, SocketAddr}; +use tokio::net::{UnixListener, UnixStream}; + +#[async_trait] +impl AsyncStreamProtocolListener for UnixListener { + type AddressType = PathBuf; + type RemoteAddressType = SocketAddr; + type Stream = UnixStream; + + async fn protocol_bind(address: Self::AddressType) -> Result { + let listener = UnixListener::bind(address)?; + + Ok(listener) + } + + async fn protocol_accept(&self) -> Result<(Self::Stream, Self::RemoteAddressType)> { + let connection = self.accept().await?; + + Ok(connection) + } +} + +impl AsyncProtocolStreamSplit for UnixStream { + type OwnedSplitReadHalf = OwnedReadHalf; + type OwnedSplitWriteHalf = OwnedWriteHalf; + + fn protocol_into_split(self) -> (Self::OwnedSplitReadHalf, Self::OwnedSplitWriteHalf) { + self.into_split() + } +} + +#[async_trait] +impl AsyncProtocolStream for UnixStream { + type AddressType = PathBuf; + + async fn protocol_connect(address: Self::AddressType) -> IPCResult { + let stream = UnixStream::connect(address).await?; + stream + .ready(Interest::READABLE | Interest::WRITABLE) + .await?; + + Ok(stream) + } +} diff --git a/src/tests/ipc_tests.rs b/src/tests/ipc_tests.rs index 1a484fbe..3760f644 100644 --- a/src/tests/ipc_tests.rs +++ b/src/tests/ipc_tests.rs @@ -3,10 +3,11 @@ use crate::prelude::*; use crate::protocol::AsyncProtocolStream; use crate::tests::utils::start_test_server; use std::net::ToSocketAddrs; +use std::path::PathBuf; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::{Duration, SystemTime}; -use tokio::net::TcpListener; +use tokio::net::{TcpListener, UnixListener}; use typemap_rev::TypeMapKey; async fn handle_ping_event(ctx: &Context

, e: Event) -> IPCResult<()> { @@ -21,19 +22,35 @@ async fn handle_ping_event(ctx: &Context

, e: Event) - Ok(()) } -fn get_builder_with_ping(address: &str) -> IPCBuilder { +fn get_builder_with_ping(address: L::AddressType) -> IPCBuilder { IPCBuilder::new() .on("ping", |ctx, e| Box::pin(handle_ping_event(ctx, e))) - .address(address.to_socket_addrs().unwrap().next().unwrap()) + .address(address) +} + +#[tokio::test] +async fn it_receives_tcp_events() { + let socket_address = "127.0.0.1:8281".to_socket_addrs().unwrap().next().unwrap(); + it_receives_events::(socket_address).await; } +#[cfg(unix)] #[tokio::test] -async fn it_receives_events() { - let builder = get_builder_with_ping("127.0.0.1:8281"); +async fn it_receives_unix_socket_events() { + let socket_path = PathBuf::from("/tmp/test_socket"); + if socket_path.exists() { + std::fs::remove_file(&socket_path).unwrap(); + } + it_receives_events::(socket_path).await; +} + +async fn it_receives_events(address: L::AddressType) { + let builder = get_builder_with_ping::(address.clone()); let server_running = Arc::new(AtomicBool::new(false)); + tokio::spawn({ let server_running = Arc::clone(&server_running); - let builder = get_builder_with_ping("127.0.0.1:8281"); + let builder = get_builder_with_ping::(address); async move { server_running.store(true, Ordering::SeqCst); builder.build_server().await.unwrap(); From b0736c59efef8f23bab9d144e8543be22632416f Mon Sep 17 00:00:00 2001 From: trivernis Date: Sun, 7 Nov 2021 13:01:35 +0100 Subject: [PATCH 5/7] Increment version Signed-off-by: trivernis --- Cargo.lock | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 519b5849..3e6172fe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -511,7 +511,7 @@ dependencies = [ [[package]] name = "rmp-ipc" -version = "0.8.2" +version = "0.9.0" dependencies = [ "async-trait", "byteorder", diff --git a/Cargo.toml b/Cargo.toml index 3f2fea2a..ca478c01 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rmp-ipc" -version = "0.8.2" +version = "0.9.0" authors = ["trivernis "] edition = "2018" readme = "README.md" From 37e66ad0ab83d1b6577b89008316c0e7bcf82b00 Mon Sep 17 00:00:00 2001 From: trivernis Date: Sun, 7 Nov 2021 13:04:03 +0100 Subject: [PATCH 6/7] Add matrix strategy to ci tests Signed-off-by: trivernis --- .github/workflows/build.yml | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index bfbe5a6b..8862f890 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -10,10 +10,13 @@ env: CARGO_TERM_COLOR: always jobs: - build: - - runs-on: ubuntu-latest + build: + strategy: + fail-fast: false + matrix: + os: [ubuntu-latest, macos-latest, windows-latest] + runs-on: ${{ matrix.os }} steps: - uses: actions/checkout@v2 From bccbb7e0871966c1ae168f8758f129f472d94320 Mon Sep 17 00:00:00 2001 From: trivernis Date: Sun, 7 Nov 2021 13:08:17 +0100 Subject: [PATCH 7/7] Fix test imports being os-specific without conditional compilation Signed-off-by: trivernis --- src/tests/ipc_tests.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/tests/ipc_tests.rs b/src/tests/ipc_tests.rs index 3760f644..4541cf97 100644 --- a/src/tests/ipc_tests.rs +++ b/src/tests/ipc_tests.rs @@ -7,7 +7,7 @@ use std::path::PathBuf; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::{Duration, SystemTime}; -use tokio::net::{TcpListener, UnixListener}; +use tokio::net::TcpListener; use typemap_rev::TypeMapKey; async fn handle_ping_event(ctx: &Context

, e: Event) -> IPCResult<()> { @@ -41,7 +41,7 @@ async fn it_receives_unix_socket_events() { if socket_path.exists() { std::fs::remove_file(&socket_path).unwrap(); } - it_receives_events::(socket_path).await; + it_receives_events::(socket_path).await; } async fn it_receives_events(address: L::AddressType) {