From bca5b90d57ec7e7ec95b846a7f9be1bbde8c4a4f Mon Sep 17 00:00:00 2001 From: Trivernis Date: Wed, 29 Dec 2021 15:04:08 +0100 Subject: [PATCH] Remove some dirty unwraps Signed-off-by: Trivernis --- src/ipc/stream_emitter.rs | 79 ++++++++++++++++++++++++--------------- src/macros.rs | 2 +- 2 files changed, 49 insertions(+), 32 deletions(-) diff --git a/src/ipc/stream_emitter.rs b/src/ipc/stream_emitter.rs index f1a86d64..91249083 100644 --- a/src/ipc/stream_emitter.rs +++ b/src/ipc/stream_emitter.rs @@ -1,20 +1,34 @@ use std::future::Future; -use crate::error::{Result, Error}; -use crate::error_event::{ErrorEventData, ERROR_EVENT_NAME}; -use crate::events::event::Event; -use crate::ipc::context::Context; -use crate::protocol::AsyncProtocolStream; +use std::mem; use std::ops::DerefMut; use std::pin::Pin; use std::sync::Arc; use std::task::Poll; use std::time::Duration; -use tokio::io::{AsyncWrite, AsyncWriteExt}; -use tokio::sync::Mutex; -use std::mem; + use futures::future; use futures::future::Either; +use tokio::io::{AsyncWrite, AsyncWriteExt}; +use tokio::sync::Mutex; +use tracing; + +use crate::error::{Error, Result}; +use crate::error_event::{ERROR_EVENT_NAME, ErrorEventData}; +use crate::events::event::Event; +use crate::ipc::context::Context; use crate::payload::IntoPayload; +use crate::protocol::AsyncProtocolStream; + +macro_rules! poll_unwrap { + ($val:expr) => { + if let Some(v) = $val { + v + } else { + tracing::error!("Polling a future with an invalid state."); + return Poll::Ready(Err(Error::InvalidState)) + } + } +} type SendStream = Arc>; @@ -58,7 +72,7 @@ impl StreamEmitter { /// Emits an event to a specific namespace #[inline] - pub(crate) fn emit_to, S2: AsRef,P: IntoPayload>( + pub(crate) fn emit_to, S2: AsRef, P: IntoPayload>( &self, ctx: Context, namespace: S1, @@ -125,7 +139,7 @@ impl EventMetadata

{ } fn build_event(&mut self) -> Result<()> { - let ctx =self.ctx.take().ok_or(Error::InvalidState)?; + let ctx = self.ctx.take().ok_or(Error::InvalidState)?; let event = self.event_name.take().ok_or(Error::InvalidState)?; let namespace = self.event_namespace.take().ok_or(Error::InvalidState)?; let payload = self.payload.take().ok_or(Error::InvalidState)?; @@ -161,28 +175,30 @@ pub struct EmitMetadataWithResponse { emit_metadata: Option>, } -impl EmitMetadata

{ - +impl EmitMetadata

{ #[inline] pub(crate) fn new(ctx: Context, stream: SendStream, event_name: String, event_namespace: Option, payload: P, res_id: Option) -> Self { - Self { event_metadata: Some(EventMetadata { - event: None, - ctx: Some(ctx), - event_name: Some(event_name), - event_namespace: Some(event_namespace), - payload: Some(payload), - res_id: Some(res_id), - }), stream: Some(stream), fut: None } + Self { + event_metadata: Some(EventMetadata { + event: None, + ctx: Some(ctx), + event_name: Some(event_name), + event_namespace: Some(event_namespace), + payload: Some(payload), + res_id: Some(res_id), + }), + stream: Some(stream), + fut: None, + } } /// Waits for a reply to the given message. #[tracing::instrument(skip(self), fields(self.message_id))] pub fn await_reply(self) -> EmitMetadataWithResponse

{ - EmitMetadataWithResponse { timeout: None, fut: None, - emit_metadata: Some(self) + emit_metadata: Some(self), } } } @@ -198,6 +214,7 @@ impl EmitMetadataWithResponse

{ } impl Unpin for EmitMetadata

{} + impl Unpin for EmitMetadataWithResponse

{} impl Future for EmitMetadata

{ @@ -205,12 +222,12 @@ impl Future for EmitMetadata

{ fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { if self.fut.is_none() { - let event_metadata= self.event_metadata.take().expect("poll after future was done"); - let stream = self.stream.take().expect("poll after future was done"); + let event_metadata = poll_unwrap!(self.event_metadata.take()); + let stream = poll_unwrap!(self.stream.take()); let event = match event_metadata.take_event() { - Ok(m) => {m} - Err(e) => {return Poll::Ready(Err(e))} + Ok(m) => { m } + Err(e) => { return Poll::Ready(Err(e)); } }.expect("poll after future was done"); self.fut = Some(Box::pin(async move { @@ -232,13 +249,13 @@ impl Future for EmitMetadataWithResponse

{ fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { if self.fut.is_none() { - let mut emit_metadata = self.emit_metadata.take().expect("poll after future was done"); - let ctx = emit_metadata.event_metadata.as_ref().expect("poll before waiting for reply").ctx.clone().expect("poll before waiting for reply"); + let mut emit_metadata = poll_unwrap!(self.emit_metadata.take()); + let ctx = poll_unwrap!(emit_metadata.event_metadata.as_ref().and_then(|m| m.ctx.clone())); let timeout = self.timeout.clone().unwrap_or(ctx.default_reply_timeout.clone()); - let event_id = match emit_metadata.event_metadata.as_mut().expect("poll before waiting for reply").get_event() { - Ok(e) => {e.id()} - Err(e) => {return Poll::Ready(Err(e))} + let event_id = match poll_unwrap!(emit_metadata.event_metadata.as_mut()).get_event() { + Ok(e) => { e.id() } + Err(e) => { return Poll::Ready(Err(e)); } }; self.fut = Some(Box::pin(async move { diff --git a/src/macros.rs b/src/macros.rs index ff15f867..0107c03f 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -30,4 +30,4 @@ macro_rules! events{ $handler.on($name, callback!($cb)); )* } -} +} \ No newline at end of file