Add tests for payloads

Signed-off-by: trivernis <trivernis@protonmail.com>
pull/25/head
trivernis 3 years ago
parent 5b54140011
commit 38fb1ee16a
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

2
Cargo.lock generated

@ -38,7 +38,7 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bromine"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"async-trait",
"byteorder",

@ -1,6 +1,6 @@
[package]
name = "bromine"
version = "0.13.0"
version = "0.14.0"
authors = ["trivernis <trivernis@protonmail.com>"]
edition = "2018"
readme = "README.md"
@ -55,5 +55,5 @@ version = "1.12.0"
features = ["macros", "rt-multi-thread"]
[features]
default = ["messagepack"]
default = []
messagepack = ["serde", "rmp-serde"]

@ -101,9 +101,6 @@
//! # }
//! ```
#[cfg(test)]
mod tests;
pub mod error;
mod events;
pub mod ipc;

@ -1,12 +0,0 @@
use crate::events::generate_event_id;
use std::collections::HashSet;
#[test]
fn event_ids_work() {
let mut ids = HashSet::new();
// simple collision test
for _ in 0..100000 {
assert!(ids.insert(generate_event_id()))
}
}

@ -1,235 +0,0 @@
use super::utils::PingEventData;
use crate::prelude::*;
use crate::tests::utils::start_test_server;
use std::net::ToSocketAddrs;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tokio::net::TcpListener;
use typemap_rev::TypeMapKey;
async fn handle_ping_event(ctx: &Context, e: Event) -> IPCResult<()> {
tokio::time::sleep(Duration::from_secs(1)).await;
let mut ping_data = e.data::<PingEventData>()?;
ping_data.time = SystemTime::now();
ping_data.ttl -= 1;
if ping_data.ttl > 0 {
ctx.emitter.emit_response(e.id(), "pong", ping_data).await?;
}
Ok(())
}
fn get_builder_with_ping<L: AsyncStreamProtocolListener>(address: L::AddressType) -> IPCBuilder<L> {
IPCBuilder::new()
.on("ping", |ctx, e| Box::pin(handle_ping_event(ctx, e)))
.timeout(Duration::from_secs(10))
.address(address)
}
#[tokio::test]
async fn it_receives_tcp_events() {
let socket_address = "127.0.0.1:8281".to_socket_addrs().unwrap().next().unwrap();
it_receives_events::<TcpListener>(socket_address).await;
}
#[cfg(unix)]
#[tokio::test]
async fn it_receives_unix_socket_events() {
let socket_path = PathBuf::from("/tmp/test_socket");
if socket_path.exists() {
std::fs::remove_file(&socket_path).unwrap();
}
it_receives_events::<tokio::net::UnixListener>(socket_path).await;
}
async fn it_receives_events<L: 'static + AsyncStreamProtocolListener>(address: L::AddressType) {
let builder = get_builder_with_ping::<L>(address.clone());
let server_running = Arc::new(AtomicBool::new(false));
tokio::spawn({
let server_running = Arc::clone(&server_running);
let builder = get_builder_with_ping::<L>(address);
async move {
server_running.store(true, Ordering::SeqCst);
builder.build_server().await.unwrap();
}
});
while !server_running.load(Ordering::Relaxed) {
tokio::time::sleep(Duration::from_millis(10)).await;
}
let pool = builder.build_pooled_client(8).await.unwrap();
let reply = pool
.acquire()
.emitter
.emit(
"ping",
PingEventData {
ttl: 16,
time: SystemTime::now(),
},
)
.await
.unwrap()
.await_reply(&pool.acquire())
.await
.unwrap();
assert_eq!(reply.name(), "pong");
}
fn get_builder_with_ping_namespace(address: &str) -> IPCBuilder<TcpListener> {
IPCBuilder::new()
.namespace("mainspace")
.on("ping", callback!(handle_ping_event))
.build()
.address(address.to_socket_addrs().unwrap().next().unwrap())
}
pub struct TestNamespace;
impl TestNamespace {
async fn ping(_c: &Context, _e: Event) -> IPCResult<()> {
println!("Ping received");
Ok(())
}
}
impl NamespaceProvider for TestNamespace {
fn name() -> &'static str {
"Test"
}
fn register(handler: &mut EventHandler) {
events!(handler,
"ping" => Self::ping,
"ping2" => Self::ping
);
}
}
#[tokio::test]
async fn it_receives_namespaced_events() {
let builder = get_builder_with_ping_namespace("127.0.0.1:8282");
let server_running = Arc::new(AtomicBool::new(false));
tokio::spawn({
let server_running = Arc::clone(&server_running);
let builder = get_builder_with_ping_namespace("127.0.0.1:8282");
async move {
server_running.store(true, Ordering::SeqCst);
builder.build_server().await.unwrap();
}
});
while !server_running.load(Ordering::Relaxed) {
tokio::time::sleep(Duration::from_millis(10)).await;
}
let ctx = builder
.add_namespace(namespace!(TestNamespace))
.build_client()
.await
.unwrap();
let reply = ctx
.emitter
.emit_to(
"mainspace",
"ping",
PingEventData {
ttl: 16,
time: SystemTime::now(),
},
)
.await
.unwrap()
.await_reply(&ctx)
.await
.unwrap();
assert_eq!(reply.name(), "pong");
}
struct ErrorOccurredKey;
impl TypeMapKey for ErrorOccurredKey {
type Value = Arc<AtomicBool>;
}
fn get_builder_with_error_handling(
error_occurred: Arc<AtomicBool>,
address: &str,
) -> IPCBuilder<TcpListener> {
IPCBuilder::new()
.insert::<ErrorOccurredKey>(error_occurred)
.on("ping", move |_, _| {
Box::pin(async move { Err(IPCError::from("ERRROROROROR")) })
})
.on(
"error",
callback!(ctx, event, async move {
let error = event.data::<error_event::ErrorEventData>()?;
assert!(error.message.len() > 0);
assert_eq!(error.code, 500);
{
let data = ctx.data.read().await;
let error_occurred = data.get::<ErrorOccurredKey>().unwrap();
error_occurred.store(true, Ordering::SeqCst);
}
Ok(())
}),
)
.address(address.to_socket_addrs().unwrap().next().unwrap())
}
#[tokio::test]
async fn it_handles_errors() {
let error_occurred = Arc::new(AtomicBool::new(false));
let builder = get_builder_with_error_handling(Arc::clone(&error_occurred), "127.0.0.1:8283");
let server_running = Arc::new(AtomicBool::new(false));
tokio::spawn({
let server_running = Arc::clone(&server_running);
let error_occurred = Arc::clone(&error_occurred);
let builder = get_builder_with_error_handling(error_occurred, "127.0.0.1:8283");
async move {
server_running.store(true, Ordering::SeqCst);
builder.build_server().await.unwrap();
}
});
while !server_running.load(Ordering::Relaxed) {
tokio::time::sleep(Duration::from_millis(10)).await;
}
let ctx = builder.build_client().await.unwrap();
ctx.emitter.emit("ping", ()).await.unwrap();
tokio::time::sleep(Duration::from_secs(1)).await;
assert!(error_occurred.load(Ordering::SeqCst));
}
#[tokio::test]
async fn test_error_responses() {
static ADDRESS: &str = "127.0.0.1:8284";
start_test_server(ADDRESS).await.unwrap();
let ctx = IPCBuilder::<TcpListener>::new()
.address(ADDRESS.to_socket_addrs().unwrap().next().unwrap())
.build_client()
.await
.unwrap();
let reply = ctx
.emitter
.emit("ping", ())
.await
.unwrap()
.await_reply(&ctx)
.await
.unwrap();
assert_eq!(reply.name(), "pong");
let reply = ctx
.emitter
.emit("trigger_error", ())
.await
.unwrap()
.await_reply(&ctx)
.await;
assert!(reply.is_err());
}

@ -1,3 +0,0 @@
mod event_tests;
mod ipc_tests;
mod utils;

@ -1,37 +0,0 @@
use crate::error::Error;
use crate::IPCBuilder;
use serde::{Deserialize, Serialize};
use std::net::ToSocketAddrs;
use std::time::SystemTime;
use tokio::net::TcpListener;
use tokio::sync::oneshot;
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct PingEventData {
pub time: SystemTime,
pub ttl: u8,
}
/// Starts a test IPC server
pub fn start_test_server(address: &'static str) -> oneshot::Receiver<bool> {
let (tx, rx) = oneshot::channel();
tokio::task::spawn(async move {
tx.send(true).unwrap();
IPCBuilder::<TcpListener>::new()
.address(address.to_socket_addrs().unwrap().next().unwrap())
.on("ping", |ctx, event| {
Box::pin(async move {
ctx.emitter.emit_response(event.id(), "pong", ()).await?;
Ok(())
})
})
.on("trigger_error", |_, _| {
Box::pin(async move { Err(Error::from("An error occurred.")) })
})
.build_server()
.await
.unwrap();
});
rx
}

@ -0,0 +1,141 @@
mod utils;
use crate::utils::start_server_and_client;
use bromine::prelude::*;
use payload_impl::SimplePayload;
use std::time::Duration;
use utils::call_counter::*;
use utils::get_free_port;
use utils::protocol::*;
#[tokio::test]
async fn it_sends_payloads() {
let port = get_free_port();
let ctx = get_client_with_server(port).await;
ctx.emitter
.emit(
"ping",
SimplePayload {
number: 0,
string: String::from("Hello World"),
},
)
.await
.unwrap();
// wait for the event to be handled
tokio::time::sleep(Duration::from_millis(10)).await;
let counters = get_counter_from_context(&ctx).await;
assert_eq!(counters.get("ping").await, 1);
assert_eq!(counters.get("pong").await, 1);
}
#[tokio::test]
async fn it_receives_payloads() {
let port = get_free_port();
let ctx = get_client_with_server(port).await;
let reply = ctx
.emitter
.emit(
"ping",
SimplePayload {
number: 0,
string: String::from("Hello World"),
},
)
.await
.unwrap()
.await_reply(&ctx)
.await
.unwrap();
let reply_payload = reply.data::<SimplePayload>().unwrap();
let counters = get_counter_from_context(&ctx).await;
assert_eq!(counters.get("ping").await, 1);
assert_eq!(reply_payload.string, String::from("Hello World"));
assert_eq!(reply_payload.number, 0);
}
async fn get_client_with_server(port: u8) -> Context {
start_server_and_client(move || get_builder(port)).await
}
fn get_builder(port: u8) -> IPCBuilder<TestProtocolListener> {
IPCBuilder::new()
.address(port)
.on("ping", callback!(handle_ping_event))
.on("pong", callback!(handle_pong_event))
.timeout(Duration::from_millis(10))
}
async fn handle_ping_event(ctx: &Context, event: Event) -> IPCResult<()> {
increment_counter_for_event(ctx, &event).await;
let payload = event.data::<SimplePayload>()?;
ctx.emitter
.emit_response(event.id(), "pong", payload)
.await?;
Ok(())
}
async fn handle_pong_event(ctx: &Context, event: Event) -> IPCResult<()> {
increment_counter_for_event(ctx, &event).await;
let _payload = event.data::<SimplePayload>()?;
Ok(())
}
#[cfg(feature = "messagepack")]
mod payload_impl {
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)]
pub struct SimplePayload {
pub string: String,
pub number: u32,
}
}
#[cfg(not(feature = "messagepack"))]
mod payload_impl {
use bromine::error::Result;
use bromine::payload::{EventReceivePayload, EventSendPayload};
use bromine::prelude::IPCResult;
use byteorder::{BigEndian, ReadBytesExt};
use std::io::Read;
pub struct SimplePayload {
pub string: String,
pub number: u32,
}
impl EventSendPayload for SimplePayload {
fn to_payload_bytes(self) -> IPCResult<Vec<u8>> {
let mut buf = Vec::new();
let string_length = self.string.len() as u16;
let string_length_bytes = string_length.to_be_bytes();
buf.append(&mut string_length_bytes.to_vec());
let mut string_bytes = self.string.into_bytes();
buf.append(&mut string_bytes);
let num_bytes = self.number.to_be_bytes();
buf.append(&mut num_bytes.to_vec());
Ok(buf)
}
}
impl EventReceivePayload for SimplePayload {
fn from_payload_bytes<R: Read>(mut reader: R) -> Result<Self> {
let string_length = reader.read_u16::<BigEndian>()?;
let mut string_buf = vec![0u8; string_length as usize];
reader.read_exact(&mut string_buf)?;
let string = String::from_utf8(string_buf).unwrap();
let number = reader.read_u32::<BigEndian>()?;
Ok(Self { string, number })
}
}
}

@ -13,7 +13,7 @@ use utils::protocol::*;
async fn it_sends_events() {
let port = get_free_port();
let ctx = get_client_with_server(port).await;
ctx.emitter.emit("ping", ()).await.unwrap();
ctx.emitter.emit("ping", EmptyPayload).await.unwrap();
// allow the event to be processed
tokio::time::sleep(Duration::from_millis(10)).await;
@ -28,8 +28,14 @@ async fn it_sends_events() {
async fn it_sends_namespaced_events() {
let port = get_free_port();
let ctx = get_client_with_server(port).await;
ctx.emitter.emit_to("test", "ping", ()).await.unwrap();
ctx.emitter.emit_to("test", "pong", ()).await.unwrap();
ctx.emitter
.emit_to("test", "ping", EmptyPayload)
.await
.unwrap();
ctx.emitter
.emit_to("test", "pong", EmptyPayload)
.await
.unwrap();
// allow the event to be processed
tokio::time::sleep(Duration::from_millis(10)).await;
@ -47,7 +53,7 @@ async fn it_receives_responses() {
let ctx = get_client_with_server(port).await;
let reply = ctx
.emitter
.emit("ping", ())
.emit("ping", EmptyPayload)
.await
.unwrap()
.await_reply(&ctx)
@ -66,7 +72,10 @@ async fn it_receives_responses() {
async fn it_handles_errors() {
let port = get_free_port();
let ctx = get_client_with_server(port).await;
ctx.emitter.emit("create_error", ()).await.unwrap();
ctx.emitter
.emit("create_error", EmptyPayload)
.await
.unwrap();
// allow the event to be processed
tokio::time::sleep(Duration::from_millis(10)).await;
let counter = get_counter_from_context(&ctx).await;
@ -82,7 +91,7 @@ async fn it_receives_error_responses() {
let ctx = get_client_with_server(port).await;
let result = ctx
.emitter
.emit("create_error", ())
.emit("create_error", EmptyPayload)
.await
.unwrap()
.await_reply(&ctx)
@ -115,7 +124,9 @@ fn get_builder(port: u8) -> IPCBuilder<TestProtocolListener> {
async fn handle_ping_event(ctx: &Context, event: Event) -> IPCResult<()> {
increment_counter_for_event(ctx, &event).await;
ctx.emitter.emit_response(event.id(), "pong", ()).await?;
ctx.emitter
.emit_response(event.id(), "pong", EmptyPayload)
.await?;
Ok(())
}
@ -137,3 +148,11 @@ async fn handle_error_event(ctx: &Context, event: Event) -> IPCResult<()> {
Ok(())
}
pub struct EmptyPayload;
impl EventSendPayload for EmptyPayload {
fn to_payload_bytes(self) -> IPCResult<Vec<u8>> {
Ok(vec![])
}
}

Loading…
Cancel
Save