diff --git a/tests/utils/protocol.rs b/tests/utils/protocol.rs index 192aec51..329645de 100644 --- a/tests/utils/protocol.rs +++ b/tests/utils/protocol.rs @@ -104,7 +104,7 @@ pub struct TestProtocolStream { impl TestProtocolStream { /// Read from the receiver and remaining buffer async fn read_from_receiver( - buf: &mut ReadBuf<'static>, + buf: &mut ReadBuf<'_>, receiver: Arc>>>, remaining_buf: Arc>>, ) { @@ -133,7 +133,7 @@ impl TestProtocolStream { /// Read from the remaining buffer returning a boolean if the /// read buffer has been filled async fn read_from_remaining_buffer( - buf: &mut ReadBuf<'static>, + buf: &mut ReadBuf<'_>, remaining_buf: &mut Vec, ) -> bool { if remaining_buf.len() < buf.capacity() { @@ -180,70 +180,66 @@ impl AsyncProtocolStream for TestProtocolStream { impl AsyncRead for TestProtocolStream { fn poll_read( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll> { - unsafe { - // we need a mutable reference to access the inner future - let stream = self.get_unchecked_mut(); + if self.future.is_none() { + // we need to change the lifetime to be able to use the read buffer in the read future + let buf: &mut ReadBuf<'static> = unsafe { + // SAFETY: idk tbh + mem::transmute(buf) + }; + let receiver = Arc::clone(&self.receiver); + let remaining_buf = Arc::clone(&self.remaining_buf); - if stream.future.is_none() { - // we need to change the lifetime to be able to use the read buffer in the read future - let buf: &mut ReadBuf<'static> = mem::transmute(buf); - let receiver = Arc::clone(&stream.receiver); - let remaining_buf = Arc::clone(&stream.remaining_buf); - - let future = TestProtocolStream::read_from_receiver(buf, receiver, remaining_buf); - stream.future = Some(Box::pin(future)); - } - if let Some(future) = &mut stream.future { - match future.as_mut().poll(cx) { - Poll::Ready(_) => { - stream.future = None; - Poll::Ready(Ok(())) - } - Poll::Pending => Poll::Pending, + let future = TestProtocolStream::read_from_receiver(buf, receiver, remaining_buf); + self.future = Some(Box::pin(future)); + } + if let Some(future) = &mut self.future { + match future.as_mut().poll(cx) { + Poll::Ready(_) => { + self.future = None; + Poll::Ready(Ok(())) } - } else { - Poll::Pending + Poll::Pending => Poll::Pending, } + } else { + Poll::Pending } } } +impl Unpin for TestProtocolStream {} + impl AsyncWrite for TestProtocolStream { fn poll_write( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { let write_len = buf.len(); - unsafe { - // we need a mutable reference to access the inner future - let stream = self.get_unchecked_mut(); - if stream.future.is_none() { - // we take ownership here so that we don't need to change lifetimes here - let buf = buf.to_vec(); - let sender = stream.sender.clone(); + if self.future.is_none() { + // we take ownership here so that we don't need to change lifetimes here + let buf = buf.to_vec(); + let sender = self.sender.clone(); - let future = async move { - sender.send(buf).await.unwrap(); - }; - stream.future = Some(Box::pin(future)); - } - if let Some(future) = &mut stream.future { - match future.as_mut().poll(cx) { - Poll::Ready(_) => { - stream.future = None; - Poll::Ready(Ok(write_len)) - } - Poll::Pending => Poll::Pending, + let future = async move { + sender.send(buf).await.unwrap(); + }; + self.future = Some(Box::pin(future)); + } + if let Some(future) = &mut self.future { + match future.as_mut().poll(cx) { + Poll::Ready(_) => { + self.future = None; + Poll::Ready(Ok(write_len)) } - } else { - Poll::Pending + Poll::Pending => Poll::Pending, } + } else { + Poll::Pending } }