Merge pull request #2 from Trivernis/develop

Stuff
pull/3/head
Trivernis 4 years ago committed by GitHub
commit e457670d2b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -0,0 +1,33 @@
name: Build and Test
on:
push:
branches: [ main, develop ]
pull_request:
branches: [ main, develop ]
env:
CARGO_TERM_COLOR: always
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Cache build data
uses: actions/cache@v2
with:
path: |
target
~/.cargo/
key: ${{ runner.os }}-cargo-${{ hashFiles('Cargo.lock') }}
restore-keys: |
${{ runner.os }}-cargo-
- name: Build
run: cargo build --verbose
- name: Run tests
run: cargo test --verbose

@ -1,12 +1,12 @@
[package]
name = "rmp-ipc"
version = "0.1.1"
version = "0.2.0"
authors = ["trivernis <trivernis@protonmail.com>"]
edition = "2018"
readme = "README.md"
license = "Apache-2.0"
repository = "https://github.com/Trivernis/rmp-ipc"
description = "IPC using Rust Message Pack (rmp)"
description = "IPC using Rust MessagePack (rmp)"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@ -14,6 +14,7 @@ description = "IPC using Rust Message Pack (rmp)"
thiserror = "1.0.24"
rmp-serde = "0.15.4"
log = "0.4.14"
lazy_static = "1.4.0"
[dependencies.serde]
version = "1.0.125"

@ -1,6 +1,6 @@
# rmp-ipc
Interprocess Communication via TCP using Rust Message Pack.
Interprocess Communication via TCP using Rust MessagePack.
## Usage

@ -1,4 +1,5 @@
use thiserror::Error;
use tokio::sync::oneshot;
pub type Result<T> = std::result::Result<T, Error>;
@ -18,6 +19,9 @@ pub enum Error {
#[error("{0}")]
Message(String),
#[error("Channel Error: {0}")]
ReceiveError(#[from] oneshot::error::RecvError),
}
impl From<String> for Error {

@ -1,4 +1,5 @@
use crate::error::Result;
use crate::events::generate_event_id;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncRead, AsyncReadExt};
@ -8,14 +9,21 @@ use tokio::io::{AsyncRead, AsyncReadExt};
/// as raw binary data.
#[derive(Serialize, Deserialize)]
pub struct Event {
id: u64,
ref_id: Option<u64>,
name: String,
data: Vec<u8>,
}
impl Event {
/// Creates a new event
pub fn new(name: String, data: Vec<u8>) -> Self {
Self { name, data }
pub fn new(name: String, data: Vec<u8>, ref_id: Option<u64>) -> Self {
Self {
id: generate_event_id(),
ref_id,
name,
data,
}
}
/// Decodes the data to the given type
@ -57,4 +65,15 @@ impl Event {
Ok(event_bytes)
}
/// The identifier of the message
pub fn id(&self) -> u64 {
self.id
}
/// The ID of the message referenced by this message.
/// It represents the message that is replied to and can be None.
pub fn reference_id(&self) -> Option<u64> {
self.ref_id.clone()
}
}

@ -1,3 +1,19 @@
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
pub mod error_event;
pub mod event;
pub mod event_handler;
/// Generates a new event id
pub(crate) fn generate_event_id() -> u64 {
lazy_static::lazy_static! {
static ref COUNTER: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
}
let epoch_elapsed = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
(epoch_elapsed.as_secs() % u16::MAX as u64) << 48
| (epoch_elapsed.subsec_millis() as u64) << 32
| COUNTER.fetch_add(1, Ordering::SeqCst)
}

@ -5,15 +5,15 @@ use crate::events::event_handler::EventHandler;
use crate::ipc::client::IPCClient;
use crate::ipc::context::Context;
use crate::ipc::server::IPCServer;
use crate::ipc::stream_emitter::StreamEmitter;
use std::future::Future;
use std::pin::Pin;
#[derive(Clone)]
/// A builder for the IPC server or client.
/// ```rust
/// ```no_run
///use rmp_ipc::IPCBuilder;
///IPCBuilder::new()
///# async fn a() {
/// IPCBuilder::new()
/// .address("127.0.0.1:2020")
/// // register callback
/// .on("ping", |_ctx, _event| Box::pin(async move {
@ -22,6 +22,7 @@ use std::pin::Pin;
/// }))
/// // can also be build_client which would return an emitter for events
/// .build_server().await.unwrap();
///# }
/// ```
pub struct IPCBuilder {
handler: EventHandler,
@ -83,15 +84,15 @@ impl IPCBuilder {
}
/// Builds an ipc client
pub async fn build_client(self) -> Result<StreamEmitter> {
pub async fn build_client(self) -> Result<Context> {
self.validate()?;
let client = IPCClient {
handler: self.handler,
};
let emitter = client.connect(&self.address.unwrap()).await?;
let ctx = client.connect(&self.address.unwrap()).await?;
Ok(emitter)
Ok(ctx)
}
/// Validates that all required fields have been provided

@ -1,8 +1,9 @@
use super::handle_connection;
use crate::error::Result;
use crate::events::event::Event;
use crate::events::event_handler::EventHandler;
use crate::ipc::context::Context;
use crate::ipc::stream_emitter::StreamEmitter;
use std::sync::Arc;
use tokio::net::TcpStream;
/// The IPC Client to connect to an IPC Server.
@ -15,21 +16,20 @@ pub struct IPCClient {
impl IPCClient {
/// Connects to a given address and returns an emitter for events to that address.
/// Invoked by [IPCBuilder::build_client](crate::builder::IPCBuilder::build_client)
pub async fn connect(self, address: &str) -> Result<StreamEmitter> {
pub async fn connect(self, address: &str) -> Result<Context> {
let stream = TcpStream::connect(address).await?;
let (mut read_half, write_half) = stream.into_split();
let (read_half, write_half) = stream.into_split();
let emitter = StreamEmitter::new(write_half);
let ctx = Context::new(StreamEmitter::clone(&emitter));
let handler = self.handler;
let handler = Arc::new(self.handler);
tokio::spawn(async move {
while let Ok(event) = Event::from_async_read(&mut read_half).await {
if let Err(e) = handler.handle_event(&ctx, event).await {
log::error!("Failed to handle event: {:?}", e);
}
tokio::spawn({
let ctx = Context::clone(&ctx);
async move {
handle_connection(handler, read_half, ctx).await;
}
});
Ok(emitter)
Ok(ctx)
}
}

@ -1,4 +1,9 @@
use crate::error::Result;
use crate::ipc::stream_emitter::StreamEmitter;
use crate::Event;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{oneshot, Mutex};
/// An object provided to each callback function.
/// Currently it only holds the event emitter to emit response events in event callbacks.
@ -14,13 +19,37 @@ use crate::ipc::stream_emitter::StreamEmitter;
/// Ok(())
/// }
/// ```
#[derive(Clone)]
pub struct Context {
/// The event emitter
pub emitter: StreamEmitter,
reply_listeners: Arc<Mutex<HashMap<u64, oneshot::Sender<Event>>>>,
}
impl Context {
pub(crate) fn new(emitter: StreamEmitter) -> Self {
Self { emitter }
Self {
emitter,
reply_listeners: Arc::new(Mutex::new(HashMap::new())),
}
}
/// Waits for a reply to the given message ID
pub async fn await_reply(&self, message_id: u64) -> Result<Event> {
let (rx, tx) = oneshot::channel();
{
let mut listeners = self.reply_listeners.lock().await;
listeners.insert(message_id, rx);
}
let event = tx.await?;
Ok(event)
}
/// Returns the channel for a reply to the given message id
pub(crate) async fn get_reply_sender(&self, ref_id: u64) -> Option<oneshot::Sender<Event>> {
let mut listeners = self.reply_listeners.lock().await;
listeners.remove(&ref_id)
}
}

@ -1,5 +1,54 @@
use crate::context::Context;
use crate::error_event::{ErrorEventData, ERROR_EVENT_NAME};
use crate::events::event_handler::EventHandler;
use crate::Event;
use std::sync::Arc;
use tokio::net::tcp::OwnedReadHalf;
pub mod builder;
pub mod client;
pub mod context;
pub mod server;
pub mod stream_emitter;
/// Handles listening to a connection and triggering the corresponding event functions
async fn handle_connection(handler: Arc<EventHandler>, mut read_half: OwnedReadHalf, ctx: Context) {
while let Ok(event) = Event::from_async_read(&mut read_half).await {
// check if the event is a reply
if let Some(ref_id) = event.reference_id() {
// get the listener for replies
if let Some(sender) = ctx.get_reply_sender(ref_id).await {
// try sending the event to the listener for replies
if let Err(event) = sender.send(event) {
handle_event(Context::clone(&ctx), Arc::clone(&handler), event);
}
continue;
}
}
handle_event(Context::clone(&ctx), Arc::clone(&handler), event);
}
log::debug!("Connection closed.");
}
/// Handles a single event in a different tokio context
fn handle_event(ctx: Context, handler: Arc<EventHandler>, event: Event) {
tokio::spawn(async move {
if let Err(e) = handler.handle_event(&ctx, event).await {
// emit an error event
if let Err(e) = ctx
.emitter
.emit(
ERROR_EVENT_NAME,
ErrorEventData {
message: format!("{:?}", e),
code: 500,
},
)
.await
{
log::error!("Error occurred when sending error response: {:?}", e);
}
log::error!("Failed to handle event: {:?}", e);
}
});
}

@ -1,11 +1,10 @@
use super::handle_connection;
use crate::error::Result;
use crate::events::error_event::{ErrorEventData, ERROR_EVENT_NAME};
use crate::events::event::Event;
use crate::events::event_handler::EventHandler;
use crate::ipc::context::Context;
use crate::ipc::stream_emitter::StreamEmitter;
use std::sync::Arc;
use tokio::net::{TcpListener, TcpStream};
use tokio::net::TcpListener;
/// The IPC Server listening for connections.
/// Use the [IPCBuilder](crate::builder::IPCBuilder) to create a server.
@ -22,41 +21,17 @@ impl IPCServer {
let handler = Arc::new(self.handler);
while let Ok((stream, _)) = listener.accept().await {
let handler = handler.clone();
let handler = Arc::clone(&handler);
tokio::spawn(async {
Self::handle_connection(handler, stream).await;
let (read_half, write_half) = stream.into_split();
let emitter = StreamEmitter::new(write_half);
let ctx = Context::new(StreamEmitter::clone(&emitter));
handle_connection(handler, read_half, ctx).await;
});
}
Ok(())
}
/// Handles a single tcp connection
async fn handle_connection(handler: Arc<EventHandler>, stream: TcpStream) {
let (mut read_half, write_half) = stream.into_split();
let emitter = StreamEmitter::new(write_half);
let ctx = Context::new(StreamEmitter::clone(&emitter));
while let Ok(event) = Event::from_async_read(&mut read_half).await {
if let Err(e) = handler.handle_event(&ctx, event).await {
// emit an error event
if emitter
.emit(
ERROR_EVENT_NAME,
ErrorEventData {
message: format!("{:?}", e),
code: 500,
},
)
.await
.is_err()
{
break;
}
log::error!("Failed to handle event: {:?}", e);
}
}
log::debug!("Connection closed.");
}
}

@ -1,3 +1,4 @@
use crate::context::Context;
use crate::error::Result;
use crate::events::event::Event;
use serde::Serialize;
@ -21,16 +22,62 @@ impl StreamEmitter {
}
}
/// Emits an event
pub async fn emit<T: Serialize>(&self, event: &str, data: T) -> Result<()> {
pub async fn _emit<T: Serialize>(
&self,
event: &str,
data: T,
res_id: Option<u64>,
) -> Result<EmitMetadata> {
let data_bytes = rmp_serde::to_vec(&data)?;
let event = Event::new(event.to_string(), data_bytes);
let event = Event::new(event.to_string(), data_bytes, res_id);
let event_bytes = event.to_bytes()?;
{
let mut stream = self.stream.lock().await;
(*stream).write_all(&event_bytes[..]).await?;
}
Ok(())
Ok(EmitMetadata::new(event.id()))
}
/// Emits an event
pub async fn emit<T: Serialize>(&self, event: &str, data: T) -> Result<EmitMetadata> {
let metadata = self._emit(event, data, None).await?;
Ok(metadata)
}
/// Emits a response to an event
pub async fn emit_response<T: Serialize>(
&self,
event_id: u64,
event: &str,
data: T,
) -> Result<EmitMetadata> {
let metadata = self._emit(event, data, Some(event_id)).await?;
Ok(metadata)
}
}
/// A metadata object returned after emitting an event.
/// This object can be used to wait for a response to an event.
pub struct EmitMetadata {
message_id: u64,
}
impl EmitMetadata {
pub(crate) fn new(message_id: u64) -> Self {
Self { message_id }
}
/// The ID of the emitted message
pub fn message_id(&self) -> u64 {
self.message_id
}
/// Waits for a reply to the given message.
pub async fn await_reply(&self, ctx: &Context) -> Result<Event> {
let reply = ctx.await_reply(self.message_id).await?;
Ok(reply)
}
}

@ -1,34 +1,42 @@
//! This project provides an ipc server and client implementation using
//! messagepack. All calls are asynchronous and event based.
//! Client Example:
//! ```rust
//! ```no_run
//! use rmp_ipc::IPCBuilder;
//! // create the client
//! let emitter = IPCBuilder::new()
//! # async fn a() {
//!
//! let ctx = IPCBuilder::new()
//! .address("127.0.0.1:2020")
//! // register callback
//! .on("ping", |_ctx, _event| Box::pin(async move {
//! .on("ping", |ctx, event| Box::pin(async move {
//! println!("Received ping event.");
//! ctx.emitter.emit_response(event.id(), "pong", ()).await?;
//! Ok(())
//! }))
//! .build_client().await.unwrap();
//!
//! // emit an initial event
//! emitter.emit("ping", ()).await?;
//! let response = ctx.emitter.emit("ping", ()).await.unwrap().await_reply(&ctx).await.unwrap();
//! assert_eq!(response.name(), "pong");
//! # }
//! ```
//!
//! Server Example:
//! ```rust
//! ```no_run
//! use rmp_ipc::IPCBuilder;
//! // create the server
//!# async fn a() {
//! IPCBuilder::new()
//! .address("127.0.0.1:2020")
//! // register callback
//! .on("ping", |_ctx, _event| Box::pin(async move {
//! .on("ping", |ctx, event| Box::pin(async move {
//! println!("Received ping event.");
//! ctx.emitter.emit_response(event.id(), "pong", ()).await?;
//! Ok(())
//! }))
//! .build_server().await.unwrap();
//! # }
//! ```
#[cfg(test)]

@ -0,0 +1,12 @@
use crate::events::generate_event_id;
use std::collections::HashSet;
#[test]
fn event_ids_work() {
let mut ids = HashSet::new();
// simple collision test
for _ in 0..100000 {
assert!(ids.insert(generate_event_id()))
}
}

@ -2,26 +2,22 @@ use self::super::utils::PingEventData;
use crate::error::Error;
use crate::events::error_event::ErrorEventData;
use crate::IPCBuilder;
use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, SystemTime};
#[tokio::test]
async fn it_receives_events() {
let ctr = Arc::new(AtomicU8::new(0));
let builder = IPCBuilder::new()
.on("ping", {
let ctr = Arc::clone(&ctr);
move |ctx, e| {
let ctr = Arc::clone(&ctr);
Box::pin(async move {
ctr.fetch_add(1, Ordering::Relaxed);
let mut ping_data = e.data::<PingEventData>()?;
ping_data.time = SystemTime::now();
ping_data.ttl -= 1;
if ping_data.ttl > 0 {
ctx.emitter.emit("ping", ping_data).await?;
ctx.emitter.emit_response(e.id(), "pong", ping_data).await?;
}
Ok(())
@ -41,8 +37,9 @@ async fn it_receives_events() {
while !server_running.load(Ordering::Relaxed) {
tokio::time::sleep(Duration::from_millis(10)).await;
}
let client = builder.build_client().await.unwrap();
client
let ctx = builder.build_client().await.unwrap();
let reply = ctx
.emitter
.emit(
"ping",
PingEventData {
@ -51,9 +48,11 @@ async fn it_receives_events() {
},
)
.await
.unwrap()
.await_reply(&ctx)
.await
.unwrap();
tokio::time::sleep(Duration::from_secs(1)).await;
assert_eq!(ctr.load(Ordering::SeqCst), 16);
assert_eq!(reply.name(), "pong");
}
#[tokio::test]
@ -91,8 +90,8 @@ async fn it_handles_errors() {
while !server_running.load(Ordering::Relaxed) {
tokio::time::sleep(Duration::from_millis(10)).await;
}
let client = builder.build_client().await.unwrap();
client.emit("ping", ()).await.unwrap();
let ctx = builder.build_client().await.unwrap();
ctx.emitter.emit("ping", ()).await.unwrap();
tokio::time::sleep(Duration::from_secs(1)).await;
assert!(error_occurred.load(Ordering::SeqCst));

@ -1,2 +1,3 @@
mod ipc_tests;
mod utils;
mod event_tests;

Loading…
Cancel
Save