From 7e54755c567a86b9e57b3842e0657235abf49202 Mon Sep 17 00:00:00 2001 From: Trivernis Date: Tue, 19 Apr 2022 12:02:40 +0200 Subject: [PATCH 1/4] Fix clippy errors and missing rt feature Signed-off-by: Trivernis --- Cargo.lock | 2 +- Cargo.toml | 4 ++-- src/events/event.rs | 2 +- src/events/event_handler.rs | 6 ++---- src/events/payload.rs | 4 ++-- src/ipc/builder.rs | 10 ++++++++-- src/ipc/context.rs | 2 +- src/ipc/server.rs | 4 ++-- src/ipc/stream_emitter/emit_metadata_with_response.rs | 4 ++-- .../emit_metadata_with_response_stream.rs | 9 ++++++--- src/ipc/stream_emitter/event_metadata.rs | 4 ++-- 11 files changed, 29 insertions(+), 22 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2bd55d12..5247a424 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -120,7 +120,7 @@ dependencies = [ [[package]] name = "bromine" -version = "0.20.1" +version = "0.20.2" dependencies = [ "async-trait", "bincode", diff --git a/Cargo.toml b/Cargo.toml index 2182ea62..cb60aeca 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bromine" -version = "0.20.1" +version = "0.20.2" authors = ["trivernis "] edition = "2018" readme = "README.md" @@ -46,7 +46,7 @@ features = [] [dependencies.tokio] version = "1.17.0" -features = ["net", "io-std", "io-util", "sync", "time", "macros"] +features = ["net", "io-std", "io-util", "sync", "time", "macros", "rt"] [dependencies.postcard] version = "0.7.3" diff --git a/src/events/event.rs b/src/events/event.rs index 59475f31..2ad6a441 100644 --- a/src/events/event.rs +++ b/src/events/event.rs @@ -103,7 +103,7 @@ impl Event { /// It represents the message that is replied to and can be None. #[inline] pub fn reference_id(&self) -> Option { - self.header.ref_id.clone() + self.header.ref_id } /// Decodes the payload to the given type implementing the receive payload trait diff --git a/src/events/event_handler.rs b/src/events/event_handler.rs index 16c09fe4..5f9168e7 100644 --- a/src/events/event_handler.rs +++ b/src/events/event_handler.rs @@ -39,7 +39,7 @@ type EventCallback = Arc< >; /// Handler for events -#[derive(Clone)] +#[derive(Clone, Default)] pub struct EventHandler { callbacks: HashMap, } @@ -59,9 +59,7 @@ impl Debug for EventHandler { impl EventHandler { /// Creates a new event handler pub fn new() -> Self { - Self { - callbacks: HashMap::new(), - } + Self::default() } /// Adds a new event callback diff --git a/src/events/payload.rs b/src/events/payload.rs index bad2d26f..0d70d7f4 100644 --- a/src/events/payload.rs +++ b/src/events/payload.rs @@ -98,8 +98,8 @@ impl TandemPayload { impl IntoPayload for TandemPayload { fn into_payload(self, ctx: &Context) -> IPCResult { - let p1_bytes = self.load1.into_payload(&ctx)?; - let p2_bytes = self.load2.into_payload(&ctx)?; + let p1_bytes = self.load1.into_payload(ctx)?; + let p2_bytes = self.load2.into_payload(ctx)?; let mut bytes = BytesMut::with_capacity(p1_bytes.len() + p2_bytes.len() + 16); diff --git a/src/ipc/builder.rs b/src/ipc/builder.rs index 566d6ffa..6bcfdd4c 100644 --- a/src/ipc/builder.rs +++ b/src/ipc/builder.rs @@ -69,6 +69,12 @@ pub struct IPCBuilder { stream_options: ::StreamOptions, } +impl Default for IPCBuilder { + fn default() -> Self { + Self::new() + } +} + impl IPCBuilder where L: AsyncStreamProtocolListener, @@ -236,7 +242,7 @@ where #[tracing::instrument(skip(self))] pub async fn build_pooled_client(self, pool_size: usize) -> Result { if pool_size == 0 { - Error::BuildError("Pool size must be greater than 0".to_string()); + return Err(Error::BuildError("Pool size must be greater than 0".to_string())); } self.validate()?; let data = Arc::new(RwLock::new(self.data)); @@ -250,7 +256,7 @@ where handler: self.handler.clone(), data: Arc::clone(&data), reply_listeners: reply_listeners.clone(), - timeout: self.timeout.clone(), + timeout: self.timeout, #[cfg(feature = "serialize")] default_serializer: self.default_serializer.clone(), diff --git a/src/ipc/context.rs b/src/ipc/context.rs index 540302b9..12f1c56c 100644 --- a/src/ipc/context.rs +++ b/src/ipc/context.rs @@ -83,7 +83,7 @@ impl Context { ) -> EmitMetadata

{ self.emitter.emit_raw( self.clone(), - self.ref_id.clone(), + self.ref_id, name, namespace, event_type, diff --git a/src/ipc/server.rs b/src/ipc/server.rs index c88d238a..ec88fa93 100644 --- a/src/ipc/server.rs +++ b/src/ipc/server.rs @@ -48,7 +48,7 @@ impl IPCServer { let handler = Arc::clone(&handler); let namespaces = Arc::clone(&namespaces); let data = Arc::clone(&data); - let timeout = self.timeout.clone(); + let timeout = self.timeout; #[cfg(feature = "serialize")] let default_serializer = self.default_serializer.clone(); @@ -69,7 +69,7 @@ impl IPCServer { default_serializer.clone(), ); #[cfg(not(feature = "serialize"))] - let ctx = Context::new(emitter, data, None, reply_listeners, timeout.into()); + let ctx = Context::new(emitter, data, None, reply_listeners, timeout); handle_connection::(namespaces, handler, read_half, ctx).await; }); diff --git a/src/ipc/stream_emitter/emit_metadata_with_response.rs b/src/ipc/stream_emitter/emit_metadata_with_response.rs index 896735d1..acd3b045 100644 --- a/src/ipc/stream_emitter/emit_metadata_with_response.rs +++ b/src/ipc/stream_emitter/emit_metadata_with_response.rs @@ -43,7 +43,7 @@ impl Future for EmitMetadataWithResponse let timeout = self .timeout .take() - .unwrap_or_else(|| ctx.default_reply_timeout.clone()); + .unwrap_or(ctx.default_reply_timeout); let event_id = match poll_unwrap!(emit_metadata.event_metadata.as_mut()).get_event() { Ok(e) => e.id(), @@ -58,7 +58,7 @@ impl Future for EmitMetadataWithResponse let reply = tokio::select! { tx_result = tx.recv() => { - Ok(tx_result.ok_or_else(|| Error::SendError)?) + tx_result.ok_or(Error::SendError) } _ = tokio::time::sleep(timeout) => { Err(Error::Timeout) diff --git a/src/ipc/stream_emitter/emit_metadata_with_response_stream.rs b/src/ipc/stream_emitter/emit_metadata_with_response_stream.rs index a0288de4..d90d40b5 100644 --- a/src/ipc/stream_emitter/emit_metadata_with_response_stream.rs +++ b/src/ipc/stream_emitter/emit_metadata_with_response_stream.rs @@ -10,23 +10,26 @@ use std::future::Future; use std::pin::Pin; use std::task::Poll; use std::time::Duration; +use futures_core::future::BoxFuture; use tokio::sync::mpsc::Receiver; /// A metadata object returned after waiting for a reply to an event /// This object needs to be awaited for to get the actual reply pub struct EmitMetadataWithResponseStream { pub(crate) timeout: Option, - pub(crate) fut: Option> + Send + Sync>>>, + pub(crate) fut: Option>>, pub(crate) emit_metadata: Option>, } +type StreamFutureResult = Result<(Option, Context, Receiver)>; + /// An asynchronous stream one can read all responses to a specific event from. pub struct ResponseStream { event_id: u64, ctx: Option, receiver: Option>, timeout: Duration, - fut: Option, Context, Receiver)>>>>>, + fut: Option>, } impl ResponseStream { @@ -71,7 +74,7 @@ impl Future for EmitMetadataWithResponse let timeout = self .timeout .take() - .unwrap_or_else(|| ctx.default_reply_timeout.clone()); + .unwrap_or(ctx.default_reply_timeout); let event_id = match poll_unwrap!(emit_metadata.event_metadata.as_mut()).get_event() { Ok(e) => e.id(), diff --git a/src/ipc/stream_emitter/event_metadata.rs b/src/ipc/stream_emitter/event_metadata.rs index 4ed9a57c..8834c3ea 100644 --- a/src/ipc/stream_emitter/event_metadata.rs +++ b/src/ipc/stream_emitter/event_metadata.rs @@ -37,11 +37,11 @@ impl EventMetadata

{ let payload = self.payload.take().ok_or(Error::InvalidState)?; let res_id = self.res_id.take().ok_or(Error::InvalidState)?; let event_type = self.event_type.take().ok_or(Error::InvalidState)?; - let payload_bytes = payload.into_payload(&ctx)?.into(); + let payload_bytes = payload.into_payload(&ctx)?; let event = Event::new( namespace, - event.to_string(), + event, payload_bytes, res_id, event_type, From 9912622dc1bcca4bc4da18a3d2099d75d17a765a Mon Sep 17 00:00:00 2001 From: Trivernis Date: Tue, 19 Apr 2022 12:06:50 +0200 Subject: [PATCH 2/4] Add clippy task and fix antipattern Signed-off-by: Trivernis --- .github/workflows/lint.yml | 33 +++++++++++++++++++++++++++++++++ src/ipc/builder.rs | 4 ++-- 2 files changed, 35 insertions(+), 2 deletions(-) create mode 100644 .github/workflows/lint.yml diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml new file mode 100644 index 00000000..2dcd14b5 --- /dev/null +++ b/.github/workflows/lint.yml @@ -0,0 +1,33 @@ +name: Lint project files + +on: + push: + branches: [ main, develop ] + pull_request: + branches: [ main, develop ] + +env: + CARGO_TERM_COLOR: always + +jobs: + + lint: + strategy: + fail-fast: false + matrix: + os: [ubuntu-latest, macos-latest, windows-latest] + runs-on: ${{ matrix.os }} + steps: + - uses: actions/checkout@v2 + + - name: Cache build data + uses: actions/cache@v2 + with: + path: | + target + ~/.cargo/ + key: ${{ runner.os }}-cargo-${{ hashFiles('Cargo.lock') }} + restore-keys: | + ${{ runner.os }}-cargo- + - name: Clippy + run: cargo clippy \ No newline at end of file diff --git a/src/ipc/builder.rs b/src/ipc/builder.rs index 6bcfdd4c..85fb15d4 100644 --- a/src/ipc/builder.rs +++ b/src/ipc/builder.rs @@ -98,8 +98,8 @@ where timeout: Duration::from_secs(60), #[cfg(feature = "serialize")] default_serializer: DynamicSerializer::first_available(), - listener_options: Default::default(), - stream_options: Default::default(), + listener_options: L::ListenerOptions::default(), + stream_options: ::StreamOptions::default(), } } From 58ae10bdc9e6edb7d977019de2aaaaebf563656e Mon Sep 17 00:00:00 2001 From: Trivernis Date: Tue, 19 Apr 2022 12:07:19 +0200 Subject: [PATCH 3/4] Change runner for linting to ubuntu only Signed-off-by: Trivernis --- .github/workflows/lint.yml | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 2dcd14b5..02e36f5e 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -12,11 +12,7 @@ env: jobs: lint: - strategy: - fail-fast: false - matrix: - os: [ubuntu-latest, macos-latest, windows-latest] - runs-on: ${{ matrix.os }} + runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 From c390ec41c1358aa82757eb28f0eca7a68733bae3 Mon Sep 17 00:00:00 2001 From: Trivernis Date: Tue, 19 Apr 2022 12:18:02 +0200 Subject: [PATCH 4/4] Improve the README Signed-off-by: Trivernis --- README.md | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 2f36a63a..17a4cc9b 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,14 @@

bromine

+

+ + + + + + +

Asynchronous event driven interprocess communication supporting tcp and unix domain sockets.

@@ -138,10 +146,6 @@ async fn main() { } ``` -## Benchmarks - -Benchmarks are generated on each commit. They can be reviewed [here](https://trivernis.github.io/rmp-ipc/report/). - ## License Apache-2.0 \ No newline at end of file