diff --git a/src/ipc/builder.rs b/src/ipc/builder.rs index 524bd109..856d531f 100644 --- a/src/ipc/builder.rs +++ b/src/ipc/builder.rs @@ -50,6 +50,7 @@ use typemap_rev::{TypeMap, TypeMapKey}; /// .build_server().await.unwrap(); ///# } /// ``` +/// pub struct IPCBuilder { handler: EventHandler, address: Option, diff --git a/tests/test_protocol.rs b/tests/test_protocol.rs new file mode 100644 index 00000000..1263f7ee --- /dev/null +++ b/tests/test_protocol.rs @@ -0,0 +1,151 @@ +use async_trait::async_trait; +use bromine::error::Result; +use bromine::prelude::{AsyncProtocolStreamSplit, IPCError}; +use bromine::protocol::{AsyncProtocolStream, AsyncStreamProtocolListener}; +use lazy_static::lazy_static; +use std::collections::HashMap; +use std::io::Error; +use std::pin::Pin; +use std::sync::mpsc; +use std::sync::mpsc::{Receiver, Sender}; +use std::sync::Arc; +use std::sync::Mutex; +use std::task::{Context, Poll}; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; +use tokio::sync::mpsc::{ + channel as async_channel, Receiver as AsyncReceiver, Sender as AsyncSender, +}; +use tokio::sync::Mutex as AsyncMutex; + +lazy_static! { + static ref LISTENERS_REF: Arc>>> = + Arc::new(AsyncMutex::new(HashMap::new())); +} + +async fn add_port(number: u8, sender: tokio::sync::mpsc::Sender) { + let mut listeners = LISTENERS_REF.lock().await; + listeners.insert(number, sender); +} + +async fn get_port(number: u8) -> Option { + let mut listeners = LISTENERS_REF.lock().await; + + if let Some(sender) = listeners.get_mut(&number) { + let (s1, r1) = mpsc::channel(); + let (s2, r2) = mpsc::channel(); + let stream_1 = TestProtocolStream { + sender: Arc::new(Mutex::new(s1)), + receiver: Arc::new(Mutex::new(r2)), + }; + let stream_2 = TestProtocolStream { + sender: Arc::new(Mutex::new(s2)), + receiver: Arc::new(Mutex::new(r1)), + }; + sender.send(stream_2).await.ok(); + + Some(stream_1) + } else { + None + } +} + +pub struct TestProtocolListener { + receiver: Arc>>, +} + +#[async_trait] +impl AsyncStreamProtocolListener for TestProtocolListener { + type AddressType = u8; + type RemoteAddressType = u8; + type Stream = TestProtocolStream; + + async fn protocol_bind(address: Self::AddressType) -> Result { + let (sender, receiver) = async_channel(1); + add_port(address, sender).await; + + Ok(Self { + receiver: Arc::new(AsyncMutex::new(receiver)), + }) + } + + async fn protocol_accept(&self) -> Result<(Self::Stream, Self::RemoteAddressType)> { + self.receiver + .lock() + .await + .recv() + .await + .map(|r| (r, 0u8)) + .ok_or_else(|| IPCError::from("Failed to accept")) + } +} + +#[derive(Clone)] +pub struct TestProtocolStream { + sender: Arc>>>, + receiver: Arc>>>, +} + +impl AsyncProtocolStreamSplit for TestProtocolStream { + type OwnedSplitReadHalf = Self; + type OwnedSplitWriteHalf = Self; + + fn protocol_into_split(self) -> (Self::OwnedSplitReadHalf, Self::OwnedSplitWriteHalf) { + (self.clone(), self) + } +} + +#[async_trait] +impl AsyncProtocolStream for TestProtocolStream { + type AddressType = u8; + + async fn protocol_connect(address: Self::AddressType) -> Result { + get_port(address) + .await + .ok_or_else(|| IPCError::from("Failed to connect")) + } +} + +impl AsyncRead for TestProtocolStream { + fn poll_read( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + let receiver = self.receiver.lock().unwrap(); + if let Ok(b) = receiver.recv() { + buf.put_slice(&b); + Poll::Ready(Ok(())) + } else { + Poll::Pending + } + } +} + +impl AsyncWrite for TestProtocolStream { + fn poll_write( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + let sender = self.sender.lock().unwrap(); + let vec_buf = buf.to_vec(); + let buf_len = vec_buf.len(); + sender.send(vec_buf).unwrap(); + + Poll::Ready(Ok(buf_len)) + } + + fn poll_flush( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } +} diff --git a/tests/test_raw_events.rs b/tests/test_raw_events.rs new file mode 100644 index 00000000..eeefd8b8 --- /dev/null +++ b/tests/test_raw_events.rs @@ -0,0 +1,45 @@ +mod test_protocol; + +use bromine::prelude::*; +use std::time::Duration; +use test_protocol::*; + +async fn handle_ping_event(ctx: &Context, event: Event) -> IPCResult<()> { + ctx.emitter.emit_response(event.id(), "pong", ()).await?; + + Ok(()) +} + +async fn handle_pong_event(_ctx: &Context, _event: Event) -> IPCResult<()> { + Ok(()) +} + +fn get_builder(port: u8) -> IPCBuilder { + IPCBuilder::new() + .address(port) + .on( + "ping", + callback!( + ctx, + event, + async move { handle_ping_event(ctx, event).await } + ), + ) + .timeout(Duration::from_millis(100)) + .on( + "pong", + callback!( + ctx, + event, + async move { handle_pong_event(ctx, event).await } + ), + ) +} + +#[tokio::test] +async fn it_passes_events() { + tokio::task::spawn(async { get_builder(0).build_server().await.unwrap() }); + tokio::time::sleep(Duration::from_millis(100)).await; + let ctx = get_builder(0).build_client().await.unwrap(); + ctx.emitter.emit("ping", ()).await.unwrap(); // todo fix reply deadlock +}