Replace log with tracing crate

Signed-off-by: trivernis <trivernis@protonmail.com>
pull/14/head
trivernis 3 years ago
parent adea767aa0
commit 3b7dd510d8
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

34
Cargo.lock generated

@ -504,11 +504,11 @@ version = "0.7.0"
dependencies = [
"criterion",
"lazy_static",
"log",
"rmp-serde",
"serde",
"thiserror",
"tokio",
"tracing",
"typemap_rev",
]
@ -678,6 +678,38 @@ dependencies = [
"syn",
]
[[package]]
name = "tracing"
version = "0.1.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "375a639232caf30edfc78e8d89b2d4c375515393e7af7e16f01cd96917fb2105"
dependencies = [
"cfg-if",
"pin-project-lite",
"tracing-attributes",
"tracing-core",
]
[[package]]
name = "tracing-attributes"
version = "0.1.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f4f480b8f81512e825f337ad51e94c1eb5d3bbdf2b363dcd01e2b19a9ffe3f8e"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "tracing-core"
version = "0.1.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f4ed65637b8390770814083d20756f87bfa2c21bf2f110babdc5438351746e4"
dependencies = [
"lazy_static",
]
[[package]]
name = "typemap_rev"
version = "0.1.5"

@ -19,7 +19,7 @@ harness = false
[dependencies]
thiserror = "1.0.30"
rmp-serde = "0.15.4"
log = "0.4.14"
tracing = "0.1.29"
lazy_static = "1.4.0"
typemap_rev = "0.1.5"

@ -2,17 +2,19 @@ use crate::error::Result;
use crate::events::generate_event_id;
use crate::events::payload::EventReceivePayload;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use tokio::io::{AsyncRead, AsyncReadExt};
/// A container representing an event and underlying binary data.
/// The data can be decoded into an object representation or read
/// as raw binary data.
#[derive(Debug)]
pub struct Event {
header: EventHeader,
data: Vec<u8>,
}
#[derive(Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize)]
struct EventHeader {
id: u64,
ref_id: Option<u64>,
@ -22,6 +24,7 @@ struct EventHeader {
impl Event {
/// Creates a new event with a namespace
#[tracing::instrument(level = "trace")]
pub fn with_namespace(
namespace: String,
name: String,
@ -38,6 +41,7 @@ impl Event {
}
/// Creates a new event
#[tracing::instrument(level = "trace")]
pub fn new(name: String, data: Vec<u8>, ref_id: Option<u64>) -> Self {
let header = EventHeader {
id: generate_event_id(),
@ -60,6 +64,7 @@ impl Event {
}
/// Decodes the data to the given type
#[tracing::instrument(level = "trace", skip(self))]
pub fn data<T: EventReceivePayload>(&self) -> Result<T> {
let data = T::from_payload_bytes(&self.data[..])?;
@ -82,16 +87,12 @@ impl Event {
}
/// Reads an event message
#[tracing::instrument(level = "trace", skip(reader))]
pub async fn from_async_read<R: AsyncRead + Unpin>(reader: &mut R) -> Result<Self> {
let total_length = reader.read_u64().await?;
let header_length = reader.read_u16().await?;
let data_length = total_length - header_length as u64;
log::trace!(
"Parsing event of length {} ({} header, {} data)",
total_length,
header_length,
data_length
);
tracing::trace!(total_length, header_length, data_length);
let header: EventHeader = {
let mut header_bytes = vec![0u8; header_length as usize];
@ -106,11 +107,13 @@ impl Event {
}
/// Encodes the event into bytes
#[tracing::instrument(level = "trace")]
pub fn into_bytes(mut self) -> Result<Vec<u8>> {
let mut header_bytes = rmp_serde::to_vec(&self.header)?;
let header_length = header_bytes.len() as u16;
let data_length = self.data.len();
let total_length = header_length as u64 + data_length as u64;
tracing::trace!(total_length, header_length, data_length);
let mut buf = Vec::with_capacity(total_length as usize);
buf.append(&mut total_length.to_be_bytes().to_vec());

@ -2,6 +2,7 @@ use crate::error::Result;
use crate::events::event::Event;
use crate::ipc::context::Context;
use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
@ -18,6 +19,18 @@ pub struct EventHandler {
callbacks: HashMap<String, EventCallback>,
}
impl Debug for EventHandler {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let callback_names: String = self
.callbacks
.keys()
.cloned()
.collect::<Vec<String>>()
.join(", ");
format!("EventHandler {{callbacks: [{}]}}", callback_names).fmt(f)
}
}
impl EventHandler {
/// Creates a new event handler
pub fn new() -> Self {
@ -27,6 +40,7 @@ impl EventHandler {
}
/// Adds a new event callback
#[tracing::instrument(skip(self, callback))]
pub fn on<F: 'static>(&mut self, name: &str, callback: F)
where
F: for<'a> Fn(
@ -40,6 +54,7 @@ impl EventHandler {
}
/// Handles a received event
#[tracing::instrument(level = "debug", skip(self, ctx))]
pub async fn handle_event(&self, ctx: &Context, event: Event) -> Result<()> {
if let Some(cb) = self.callbacks.get(event.name()) {
cb.as_ref()(ctx, event).await?;

@ -57,11 +57,8 @@ impl IPCBuilder {
handler.on(ERROR_EVENT_NAME, |_, event| {
Box::pin(async move {
let error_data = event.data::<ErrorEventData>()?;
log::warn!(
"Received Error Response from Server: {} - {}",
error_data.code,
error_data.message
);
tracing::warn!(error_data.code);
tracing::warn!("error_data.message = '{}'", error_data.message);
Ok(())
})
@ -117,6 +114,7 @@ impl IPCBuilder {
}
/// Builds an ipc server
#[tracing::instrument(skip(self))]
pub async fn build_server(self) -> Result<()> {
self.validate()?;
let server = IPCServer {
@ -130,6 +128,7 @@ impl IPCBuilder {
}
/// Builds an ipc client
#[tracing::instrument(skip(self))]
pub async fn build_client(self) -> Result<Context> {
self.validate()?;
let client = IPCClient {
@ -144,6 +143,7 @@ impl IPCBuilder {
}
/// Validates that all required fields have been provided
#[tracing::instrument(skip(self))]
fn validate(&self) -> Result<()> {
if self.address.is_none() {
Err(Error::BuildError("Missing Address".to_string()))

@ -23,6 +23,7 @@ pub struct IPCClient {
impl IPCClient {
/// Connects to a given address and returns an emitter for events to that address.
/// Invoked by [IPCBuilder::build_client](crate::builder::IPCBuilder::build_client)
#[tracing::instrument(skip(self))]
pub async fn connect(self, address: &str) -> Result<Context> {
let stream = TcpStream::connect(address).await?;
let (read_half, write_half) = stream.into_split();
@ -35,7 +36,6 @@ impl IPCClient {
);
let handler = Arc::new(self.handler);
let namespaces = Arc::new(self.namespaces);
log::debug!("IPC client connected to {}", address);
let handle = tokio::spawn({
let ctx = Context::clone(&ctx);

@ -48,6 +48,7 @@ impl Context {
}
/// Waits for a reply to the given message ID
#[tracing::instrument(level = "debug", skip(self))]
pub async fn await_reply(&self, message_id: u64) -> Result<Event> {
let (rx, tx) = oneshot::channel();
{
@ -60,6 +61,7 @@ impl Context {
}
/// Stops the listener and closes the connection
#[tracing::instrument(level = "debug", skip(self))]
pub async fn stop(self) -> Result<()> {
let mut sender = self.stop_sender.lock().await;
if let Some(sender) = mem::take(&mut *sender) {

@ -20,11 +20,11 @@ async fn handle_connection(
ctx: Context,
) {
while let Ok(event) = Event::from_async_read(&mut read_half).await {
log::debug!("Received {:?}:{} event", event.namespace(), event.name());
tracing::trace!("event = {:?}", event);
// check if the event is a reply
if let Some(ref_id) = event.reference_id() {
tracing::trace!("Event has reference id. Passing to reply listener");
// get the listener for replies
log::trace!("Event is response to {}", ref_id);
if let Some(sender) = ctx.get_reply_sender(ref_id).await {
// try sending the event to the listener for replies
if let Err(event) = sender.send(event) {
@ -32,18 +32,18 @@ async fn handle_connection(
}
continue;
}
log::trace!("No response listener found for event. Passing to regular listener.");
tracing::trace!("No response listener found for event. Passing to regular listener.");
}
if let Some(namespace) = event.namespace().clone().and_then(|n| namespaces.get(&n)) {
log::trace!("Passing event to namespace listener");
tracing::trace!("Passing event to namespace listener");
let handler = Arc::clone(&namespace.handler);
handle_event(Context::clone(&ctx), handler, event);
} else {
log::trace!("Passing event to global listener");
tracing::trace!("Passing event to global listener");
handle_event(Context::clone(&ctx), Arc::clone(&handler), event);
}
}
log::debug!("Connection closed.");
tracing::debug!("Connection closed.");
}
/// Handles a single event in a different tokio context
@ -64,9 +64,9 @@ fn handle_event(ctx: Context, handler: Arc<EventHandler>, event: Event) {
)
.await
{
log::error!("Error occurred when sending error response: {:?}", e);
tracing::error!("Error occurred when sending error response: {:?}", e);
}
log::error!("Failed to handle event: {:?}", e);
tracing::error!("Failed to handle event: {:?}", e);
}
});
}

@ -22,14 +22,17 @@ pub struct IPCServer {
impl IPCServer {
/// Starts the IPC Server.
/// Invoked by [IPCBuilder::build_server](crate::builder::IPCBuilder::build_server)
#[tracing::instrument(skip(self))]
pub async fn start(self, address: &str) -> Result<()> {
let listener = TcpListener::bind(address).await?;
let handler = Arc::new(self.handler);
let namespaces = Arc::new(self.namespaces);
let data = Arc::new(RwLock::new(self.data));
log::debug!("IPC server listening on {}", address);
tracing::info!(address);
while let Ok((stream, _)) = listener.accept().await {
while let Ok((stream, remote_address)) = listener.accept().await {
let remote_address = remote_address.to_string();
tracing::debug!("remote_address = {}", remote_address);
let handler = Arc::clone(&handler);
let namespaces = Arc::clone(&namespaces);
let data = Arc::clone(&data);

@ -2,11 +2,11 @@ use crate::error::Result;
use crate::events::event::Event;
use crate::events::payload::EventSendPayload;
use crate::ipc::context::Context;
use std::fmt::Debug;
use std::sync::Arc;
use tokio::io::AsyncWriteExt;
use tokio::net::tcp::OwnedWriteHalf;
use tokio::sync::Mutex;
use tokio::time::Instant;
/// An abstraction over the raw tokio tcp stream
/// to emit events and share a connection across multiple
@ -23,7 +23,8 @@ impl StreamEmitter {
}
}
pub async fn _emit<T: EventSendPayload>(
#[tracing::instrument(level = "trace", skip(self))]
pub async fn _emit<T: EventSendPayload + Debug>(
&self,
namespace: Option<&str>,
event: &str,
@ -31,7 +32,6 @@ impl StreamEmitter {
res_id: Option<u64>,
) -> Result<EmitMetadata> {
let data_bytes = data.to_payload_bytes()?;
log::debug!("Emitting event {:?}:{}", namespace, event);
let event = if let Some(namespace) = namespace {
Event::with_namespace(namespace.to_string(), event.to_string(), data_bytes, res_id)
@ -43,17 +43,16 @@ impl StreamEmitter {
let event_bytes = event.into_bytes()?;
{
let start = Instant::now();
let mut stream = self.stream.lock().await;
(*stream).write_all(&event_bytes[..]).await?;
log::trace!("Wrote {} bytes in {:?}", event_bytes.len(), start.elapsed());
tracing::trace!(bytes_len = event_bytes.len());
}
Ok(EmitMetadata::new(event_id))
}
/// Emits an event
pub async fn emit<S: AsRef<str>, T: EventSendPayload>(
pub async fn emit<S: AsRef<str>, T: EventSendPayload + Debug>(
&self,
event: S,
data: T,
@ -62,7 +61,7 @@ impl StreamEmitter {
}
/// Emits an event to a specific namespace
pub async fn emit_to<S1: AsRef<str>, S2: AsRef<str>, T: EventSendPayload>(
pub async fn emit_to<S1: AsRef<str>, S2: AsRef<str>, T: EventSendPayload + Debug>(
&self,
namespace: S1,
event: S2,
@ -73,7 +72,7 @@ impl StreamEmitter {
}
/// Emits a response to an event
pub async fn emit_response<S: AsRef<str>, T: EventSendPayload>(
pub async fn emit_response<S: AsRef<str>, T: EventSendPayload + Debug>(
&self,
event_id: u64,
event: S,
@ -83,7 +82,7 @@ impl StreamEmitter {
}
/// Emits a response to an event to a namespace
pub async fn emit_response_to<S1: AsRef<str>, S2: AsRef<str>, T: EventSendPayload>(
pub async fn emit_response_to<S1: AsRef<str>, S2: AsRef<str>, T: EventSendPayload + Debug>(
&self,
event_id: u64,
namespace: S1,

@ -38,6 +38,7 @@ impl NamespaceBuilder {
}
/// Builds the namespace
#[tracing::instrument(skip(self))]
pub fn build(self) -> IPCBuilder {
let namespace = Namespace::new(self.name, self.handler);
self.ipc_builder.add_namespace(namespace)

@ -1,7 +1,7 @@
use crate::events::event_handler::EventHandler;
use std::sync::Arc;
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct Namespace {
name: String,
pub(crate) handler: Arc<EventHandler>,

@ -1,3 +1,3 @@
mod event_tests;
mod ipc_tests;
mod utils;
mod event_tests;

@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize};
use std::time::SystemTime;
use tokio::sync::oneshot;
#[derive(Clone, Serialize, Deserialize)]
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct PingEventData {
pub time: SystemTime,
pub ttl: u8,

Loading…
Cancel
Save