Add most ipc stuff

Signed-off-by: trivernis <trivernis@protonmail.com>
pull/1/head
trivernis 4 years ago
commit 5792c6757d
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

3
.gitignore vendored

@ -0,0 +1,3 @@
/target
Cargo.lock
.idea

@ -0,0 +1,25 @@
[package]
name = "rmp-ipc"
version = "0.1.0"
authors = ["trivernis <trivernis@protonmail.com>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
thiserror = "1.0.24"
rmp-serde = "0.15.4"
log = "0.4.14"
[dependencies.serde]
version = "1.0.125"
features = ["serde_derive"]
[dependencies.tokio]
version = "1.5.0"
features = ["net", "io-std", "io-util", "sync", "time"]
[dev-dependencies.tokio]
version = "1.5.0"
features = ["macros", "rt-multi-thread"]

@ -0,0 +1,33 @@
use thiserror::Error;
pub type Result<T> = std::result::Result<T, Error>;
#[derive(Error, Debug)]
pub enum Error {
#[error(transparent)]
IoError(#[from] tokio::io::Error),
#[error(transparent)]
Decode(#[from] rmp_serde::decode::Error),
#[error(transparent)]
Encode(#[from] rmp_serde::encode::Error),
#[error("Build Error: {0}")]
BuildError(String),
#[error("{0}")]
Message(String),
}
impl From<String> for Error {
fn from(s: String) -> Self {
Error::Message(s)
}
}
impl From<&str> for Error {
fn from(s: &str) -> Self {
Error::Message(s.to_string())
}
}

@ -0,0 +1,60 @@
use crate::error::Result;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncRead, AsyncReadExt};
/// A container representing an event and underlying binary data.
/// The data can be decoded into an object representation or read
/// as raw binary data.
#[derive(Serialize, Deserialize)]
pub struct Event {
name: String,
data: Vec<u8>,
}
impl Event {
/// Creates a new event
pub fn new(name: String, data: Vec<u8>) -> Self {
Self { name, data }
}
/// Decodes the data to the given type
pub fn data<T: DeserializeOwned>(&self) -> Result<T> {
let data = rmp_serde::from_read(&self.data[..])?;
Ok(data)
}
/// Returns a reference of the underlying data
pub fn data_raw(&self) -> &[u8] {
&self.data
}
/// Returns the name of the event
pub fn name(&self) -> &str {
&self.name
}
/// Reads an event message
pub async fn from_async_read<R: AsyncRead + Unpin>(reader: &mut R) -> Result<Self> {
let length = reader.read_u32().await?;
let mut data = vec![0u8; length as usize];
reader.read_exact(&mut data).await?;
let event = rmp_serde::from_read(&data[..])?;
Ok(event)
}
/// Encodes the event into bytes
pub fn to_bytes(&self) -> Result<Vec<u8>> {
let mut event_bytes = rmp_serde::to_vec(&self)?;
let mut length_bytes = (event_bytes.len() as u32).to_be_bytes().to_vec();
length_bytes.reverse();
for byte in length_bytes {
event_bytes.insert(0, byte);
}
Ok(event_bytes)
}
}

@ -0,0 +1,50 @@
use crate::error::Result;
use crate::events::event::Event;
use crate::ipc::context::Context;
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
type EventCallback = Arc<
dyn for<'a> Fn(&'a Context, Event) -> Pin<Box<(dyn Future<Output = Result<()>> + Send + 'a)>>
+ Send
+ Sync,
>;
/// Handler for events
#[derive(Clone)]
pub struct EventHandler {
callbacks: HashMap<String, EventCallback>,
}
impl EventHandler {
/// Creates a new event handler
pub fn new() -> Self {
Self {
callbacks: HashMap::new(),
}
}
/// Adds a new event callback
pub fn on<F: 'static>(&mut self, name: &str, callback: F)
where
F: for<'a> Fn(
&'a Context,
Event,
) -> Pin<Box<(dyn Future<Output = Result<()>> + Send + 'a)>>
+ Send
+ Sync,
{
self.callbacks.insert(name.to_string(), Arc::new(callback));
}
/// Handles a received event
pub async fn handle_event(&self, ctx: &Context, event: Event) -> Result<()> {
if let Some(cb) = self.callbacks.get(event.name()) {
cb.as_ref()(ctx, event).await?;
}
Ok(())
}
}

@ -0,0 +1,2 @@
pub mod event;
pub mod event_handler;

@ -0,0 +1,78 @@
use crate::error::{Error, Result};
use crate::events::event::Event;
use crate::events::event_handler::EventHandler;
use crate::ipc::client::IPCClient;
use crate::ipc::context::Context;
use crate::ipc::server::IPCServer;
use crate::ipc::stream_emitter::StreamEmitter;
use std::future::Future;
use std::pin::Pin;
#[derive(Clone)]
pub struct IPCBuilder {
handler: EventHandler,
address: Option<String>,
}
impl IPCBuilder {
pub fn new() -> Self {
Self {
handler: EventHandler::new(),
address: None,
}
}
/// Adds an event callback
pub fn on<F: 'static>(mut self, event: &str, callback: F) -> Self
where
F: for<'a> Fn(
&'a Context,
Event,
) -> Pin<Box<(dyn Future<Output = Result<()>> + Send + 'a)>>
+ Send
+ Sync,
{
self.handler.on(event, callback);
self
}
/// Adds the address to connect to
pub fn address<S: ToString>(mut self, address: S) -> Self {
self.address = Some(address.to_string());
self
}
/// Builds an ipc server
pub async fn build_server(self) -> Result<()> {
self.validate()?;
let server = IPCServer {
handler: self.handler,
};
server.start(&self.address.unwrap()).await?;
Ok(())
}
/// Builds an ipc client
pub async fn build_client(self) -> Result<StreamEmitter> {
self.validate()?;
let client = IPCClient {
handler: self.handler,
};
let emitter = client.connect(&self.address.unwrap()).await?;
Ok(emitter)
}
/// Validates that all required fields have been provided
fn validate(&self) -> Result<()> {
if self.address.is_none() {
Err(Error::BuildError("Missing Address".to_string()))
} else {
Ok(())
}
}
}

@ -0,0 +1,31 @@
use crate::error::Result;
use crate::events::event::Event;
use crate::events::event_handler::EventHandler;
use crate::ipc::context::Context;
use crate::ipc::stream_emitter::StreamEmitter;
use tokio::net::TcpStream;
pub struct IPCClient {
pub(crate) handler: EventHandler,
}
impl IPCClient {
/// Connects to a given address and returns an emitter for events to that address
pub async fn connect(self, address: &str) -> Result<StreamEmitter> {
let stream = TcpStream::connect(address).await?;
let (mut read_half, write_half) = stream.into_split();
let emitter = StreamEmitter::new(write_half);
let ctx = Context::new(StreamEmitter::clone(&emitter));
let handler = self.handler;
tokio::spawn(async move {
while let Ok(event) = Event::from_async_read(&mut read_half).await {
if let Err(e) = handler.handle_event(&ctx, event).await {
log::error!("Failed to handle event: {:?}", e);
}
}
});
Ok(emitter)
}
}

@ -0,0 +1,11 @@
use crate::ipc::stream_emitter::StreamEmitter;
pub struct Context {
pub emitter: StreamEmitter,
}
impl Context {
pub fn new(emitter: StreamEmitter) -> Self {
Self { emitter }
}
}

@ -0,0 +1,5 @@
pub mod builder;
pub mod client;
pub mod context;
pub mod server;
pub mod stream_emitter;

@ -0,0 +1,42 @@
use crate::error::Result;
use crate::events::event::Event;
use crate::events::event_handler::EventHandler;
use crate::ipc::context::Context;
use crate::ipc::stream_emitter::StreamEmitter;
use std::sync::Arc;
use tokio::net::{TcpListener, TcpStream};
pub struct IPCServer {
pub(crate) handler: EventHandler,
}
impl IPCServer {
pub async fn start(self, address: &str) -> Result<()> {
let listener = TcpListener::bind(address).await?;
let handler = Arc::new(self.handler);
while let Ok((stream, _)) = listener.accept().await {
let handler = handler.clone();
tokio::spawn(async {
Self::handle_connection(handler, stream).await;
});
}
Ok(())
}
/// Handles a single tcp connection
pub async fn handle_connection(handler: Arc<EventHandler>, stream: TcpStream) {
let (mut read_half, write_half) = stream.into_split();
let emitter = StreamEmitter::new(write_half);
let ctx = Context::new(StreamEmitter::clone(&emitter));
while let Ok(event) = Event::from_async_read(&mut read_half).await {
if let Err(e) = handler.handle_event(&ctx, event).await {
log::error!("Failed to handle event: {:?}", e);
}
}
log::debug!("Connection closed.");
}
}

@ -0,0 +1,36 @@
use crate::error::Result;
use crate::events::event::Event;
use serde::Serialize;
use std::sync::Arc;
use tokio::io::AsyncWriteExt;
use tokio::net::tcp::OwnedWriteHalf;
use tokio::sync::Mutex;
/// An abstraction over the raw tokio tcp stream
/// to emit events and share a connection across multiple
/// contexts.
#[derive(Clone)]
pub struct StreamEmitter {
stream: Arc<Mutex<OwnedWriteHalf>>,
}
impl StreamEmitter {
pub fn new(stream: OwnedWriteHalf) -> Self {
Self {
stream: Arc::new(Mutex::new(stream)),
}
}
/// Emits an event
pub async fn emit<T: Serialize>(&self, event: &str, data: T) -> Result<()> {
let data_bytes = rmp_serde::to_vec(&data)?;
let event = Event::new(event.to_string(), data_bytes);
let event_bytes = event.to_bytes()?;
{
let mut stream = self.stream.lock().await;
(*stream).write_all(&event_bytes[..]).await?;
}
Ok(())
}
}

@ -0,0 +1,13 @@
//! This project provides an ipc server and client implementation using
//! messagepack. All calls are asynchronous and event based.
#[cfg(test)]
mod tests;
pub mod error;
mod events;
mod ipc;
pub use events::event::Event;
pub use ipc::builder::IPCBuilder;
pub use ipc::*;

@ -0,0 +1,55 @@
use self::super::utils::PingEventData;
use crate::IPCBuilder;
use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
use std::sync::Arc;
use std::time::{Duration, SystemTime};
#[tokio::test]
async fn it_receives_events() {
let ctr = Arc::new(AtomicU8::new(0));
let builder = IPCBuilder::new()
.on("ping", {
let ctr = Arc::clone(&ctr);
move |ctx, e| {
let ctr = Arc::clone(&ctr);
Box::pin(async move {
ctr.fetch_add(1, Ordering::Relaxed);
let mut ping_data = e.data::<PingEventData>()?;
ping_data.time = SystemTime::now();
ping_data.ttl -= 1;
if ping_data.ttl > 0 {
ctx.emitter.emit("ping", ping_data).await?;
}
Ok(())
})
}
})
.address("127.0.0.1:8282");
let server_running = Arc::new(AtomicBool::new(false));
tokio::spawn({
let server_running = Arc::clone(&server_running);
let builder = builder.clone();
async move {
server_running.store(true, Ordering::SeqCst);
builder.build_server().await.unwrap();
}
});
while !server_running.load(Ordering::Relaxed) {
tokio::time::sleep(Duration::from_millis(10)).await;
}
let client = builder.build_client().await.unwrap();
client
.emit(
"ping",
PingEventData {
ttl: 16,
time: SystemTime::now(),
},
)
.await
.unwrap();
tokio::time::sleep(Duration::from_secs(1)).await;
assert_eq!(ctr.load(Ordering::SeqCst), 16);
}

@ -0,0 +1,2 @@
mod ipc_tests;
mod utils;

@ -0,0 +1,8 @@
use serde::{Deserialize, Serialize};
use std::time::SystemTime;
#[derive(Clone, Serialize, Deserialize)]
pub struct PingEventData {
pub time: SystemTime,
pub ttl: u8,
}
Loading…
Cancel
Save