diff --git a/Cargo.lock b/Cargo.lock index 096de1a8..d28d8f52 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -504,11 +504,11 @@ version = "0.7.0" dependencies = [ "criterion", "lazy_static", - "log", "rmp-serde", "serde", "thiserror", "tokio", + "tracing", "typemap_rev", ] @@ -678,6 +678,38 @@ dependencies = [ "syn", ] +[[package]] +name = "tracing" +version = "0.1.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "375a639232caf30edfc78e8d89b2d4c375515393e7af7e16f01cd96917fb2105" +dependencies = [ + "cfg-if", + "pin-project-lite", + "tracing-attributes", + "tracing-core", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4f480b8f81512e825f337ad51e94c1eb5d3bbdf2b363dcd01e2b19a9ffe3f8e" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tracing-core" +version = "0.1.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f4ed65637b8390770814083d20756f87bfa2c21bf2f110babdc5438351746e4" +dependencies = [ + "lazy_static", +] + [[package]] name = "typemap_rev" version = "0.1.5" diff --git a/Cargo.toml b/Cargo.toml index b78f0d42..5fb39b67 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,7 @@ harness = false [dependencies] thiserror = "1.0.30" rmp-serde = "0.15.4" -log = "0.4.14" +tracing = "0.1.29" lazy_static = "1.4.0" typemap_rev = "0.1.5" diff --git a/src/events/event.rs b/src/events/event.rs index 40f3327b..768a55c8 100644 --- a/src/events/event.rs +++ b/src/events/event.rs @@ -2,17 +2,19 @@ use crate::error::Result; use crate::events::generate_event_id; use crate::events::payload::EventReceivePayload; use serde::{Deserialize, Serialize}; +use std::fmt::Debug; use tokio::io::{AsyncRead, AsyncReadExt}; /// A container representing an event and underlying binary data. /// The data can be decoded into an object representation or read /// as raw binary data. +#[derive(Debug)] pub struct Event { header: EventHeader, data: Vec, } -#[derive(Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize)] struct EventHeader { id: u64, ref_id: Option, @@ -22,6 +24,7 @@ struct EventHeader { impl Event { /// Creates a new event with a namespace + #[tracing::instrument(level = "trace")] pub fn with_namespace( namespace: String, name: String, @@ -38,6 +41,7 @@ impl Event { } /// Creates a new event + #[tracing::instrument(level = "trace")] pub fn new(name: String, data: Vec, ref_id: Option) -> Self { let header = EventHeader { id: generate_event_id(), @@ -60,6 +64,7 @@ impl Event { } /// Decodes the data to the given type + #[tracing::instrument(level = "trace", skip(self))] pub fn data(&self) -> Result { let data = T::from_payload_bytes(&self.data[..])?; @@ -82,16 +87,12 @@ impl Event { } /// Reads an event message + #[tracing::instrument(level = "trace", skip(reader))] pub async fn from_async_read(reader: &mut R) -> Result { let total_length = reader.read_u64().await?; let header_length = reader.read_u16().await?; let data_length = total_length - header_length as u64; - log::trace!( - "Parsing event of length {} ({} header, {} data)", - total_length, - header_length, - data_length - ); + tracing::trace!(total_length, header_length, data_length); let header: EventHeader = { let mut header_bytes = vec![0u8; header_length as usize]; @@ -106,11 +107,13 @@ impl Event { } /// Encodes the event into bytes + #[tracing::instrument(level = "trace")] pub fn into_bytes(mut self) -> Result> { let mut header_bytes = rmp_serde::to_vec(&self.header)?; let header_length = header_bytes.len() as u16; let data_length = self.data.len(); let total_length = header_length as u64 + data_length as u64; + tracing::trace!(total_length, header_length, data_length); let mut buf = Vec::with_capacity(total_length as usize); buf.append(&mut total_length.to_be_bytes().to_vec()); diff --git a/src/events/event_handler.rs b/src/events/event_handler.rs index 78ce5692..7df84bad 100644 --- a/src/events/event_handler.rs +++ b/src/events/event_handler.rs @@ -2,6 +2,7 @@ use crate::error::Result; use crate::events::event::Event; use crate::ipc::context::Context; use std::collections::HashMap; +use std::fmt::{Debug, Formatter}; use std::future::Future; use std::pin::Pin; use std::sync::Arc; @@ -18,6 +19,18 @@ pub struct EventHandler { callbacks: HashMap, } +impl Debug for EventHandler { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let callback_names: String = self + .callbacks + .keys() + .cloned() + .collect::>() + .join(", "); + format!("EventHandler {{callbacks: [{}]}}", callback_names).fmt(f) + } +} + impl EventHandler { /// Creates a new event handler pub fn new() -> Self { @@ -27,6 +40,7 @@ impl EventHandler { } /// Adds a new event callback + #[tracing::instrument(skip(self, callback))] pub fn on(&mut self, name: &str, callback: F) where F: for<'a> Fn( @@ -40,6 +54,7 @@ impl EventHandler { } /// Handles a received event + #[tracing::instrument(level = "debug", skip(self, ctx))] pub async fn handle_event(&self, ctx: &Context, event: Event) -> Result<()> { if let Some(cb) = self.callbacks.get(event.name()) { cb.as_ref()(ctx, event).await?; diff --git a/src/ipc/builder.rs b/src/ipc/builder.rs index 516349b9..cff2d4b3 100644 --- a/src/ipc/builder.rs +++ b/src/ipc/builder.rs @@ -57,11 +57,8 @@ impl IPCBuilder { handler.on(ERROR_EVENT_NAME, |_, event| { Box::pin(async move { let error_data = event.data::()?; - log::warn!( - "Received Error Response from Server: {} - {}", - error_data.code, - error_data.message - ); + tracing::warn!(error_data.code); + tracing::warn!("error_data.message = '{}'", error_data.message); Ok(()) }) @@ -117,6 +114,7 @@ impl IPCBuilder { } /// Builds an ipc server + #[tracing::instrument(skip(self))] pub async fn build_server(self) -> Result<()> { self.validate()?; let server = IPCServer { @@ -130,6 +128,7 @@ impl IPCBuilder { } /// Builds an ipc client + #[tracing::instrument(skip(self))] pub async fn build_client(self) -> Result { self.validate()?; let client = IPCClient { @@ -144,6 +143,7 @@ impl IPCBuilder { } /// Validates that all required fields have been provided + #[tracing::instrument(skip(self))] fn validate(&self) -> Result<()> { if self.address.is_none() { Err(Error::BuildError("Missing Address".to_string())) diff --git a/src/ipc/client.rs b/src/ipc/client.rs index adaae1f2..ba929738 100644 --- a/src/ipc/client.rs +++ b/src/ipc/client.rs @@ -23,6 +23,7 @@ pub struct IPCClient { impl IPCClient { /// Connects to a given address and returns an emitter for events to that address. /// Invoked by [IPCBuilder::build_client](crate::builder::IPCBuilder::build_client) + #[tracing::instrument(skip(self))] pub async fn connect(self, address: &str) -> Result { let stream = TcpStream::connect(address).await?; let (read_half, write_half) = stream.into_split(); @@ -35,7 +36,6 @@ impl IPCClient { ); let handler = Arc::new(self.handler); let namespaces = Arc::new(self.namespaces); - log::debug!("IPC client connected to {}", address); let handle = tokio::spawn({ let ctx = Context::clone(&ctx); diff --git a/src/ipc/context.rs b/src/ipc/context.rs index e625c25d..81914c83 100644 --- a/src/ipc/context.rs +++ b/src/ipc/context.rs @@ -48,6 +48,7 @@ impl Context { } /// Waits for a reply to the given message ID + #[tracing::instrument(level = "debug", skip(self))] pub async fn await_reply(&self, message_id: u64) -> Result { let (rx, tx) = oneshot::channel(); { @@ -60,6 +61,7 @@ impl Context { } /// Stops the listener and closes the connection + #[tracing::instrument(level = "debug", skip(self))] pub async fn stop(self) -> Result<()> { let mut sender = self.stop_sender.lock().await; if let Some(sender) = mem::take(&mut *sender) { diff --git a/src/ipc/mod.rs b/src/ipc/mod.rs index 3a84923c..e6e196af 100644 --- a/src/ipc/mod.rs +++ b/src/ipc/mod.rs @@ -20,11 +20,11 @@ async fn handle_connection( ctx: Context, ) { while let Ok(event) = Event::from_async_read(&mut read_half).await { - log::debug!("Received {:?}:{} event", event.namespace(), event.name()); + tracing::trace!("event = {:?}", event); // check if the event is a reply if let Some(ref_id) = event.reference_id() { + tracing::trace!("Event has reference id. Passing to reply listener"); // get the listener for replies - log::trace!("Event is response to {}", ref_id); if let Some(sender) = ctx.get_reply_sender(ref_id).await { // try sending the event to the listener for replies if let Err(event) = sender.send(event) { @@ -32,18 +32,18 @@ async fn handle_connection( } continue; } - log::trace!("No response listener found for event. Passing to regular listener."); + tracing::trace!("No response listener found for event. Passing to regular listener."); } if let Some(namespace) = event.namespace().clone().and_then(|n| namespaces.get(&n)) { - log::trace!("Passing event to namespace listener"); + tracing::trace!("Passing event to namespace listener"); let handler = Arc::clone(&namespace.handler); handle_event(Context::clone(&ctx), handler, event); } else { - log::trace!("Passing event to global listener"); + tracing::trace!("Passing event to global listener"); handle_event(Context::clone(&ctx), Arc::clone(&handler), event); } } - log::debug!("Connection closed."); + tracing::debug!("Connection closed."); } /// Handles a single event in a different tokio context @@ -64,9 +64,9 @@ fn handle_event(ctx: Context, handler: Arc, event: Event) { ) .await { - log::error!("Error occurred when sending error response: {:?}", e); + tracing::error!("Error occurred when sending error response: {:?}", e); } - log::error!("Failed to handle event: {:?}", e); + tracing::error!("Failed to handle event: {:?}", e); } }); } diff --git a/src/ipc/server.rs b/src/ipc/server.rs index 6926eb60..d1ed1113 100644 --- a/src/ipc/server.rs +++ b/src/ipc/server.rs @@ -22,14 +22,17 @@ pub struct IPCServer { impl IPCServer { /// Starts the IPC Server. /// Invoked by [IPCBuilder::build_server](crate::builder::IPCBuilder::build_server) + #[tracing::instrument(skip(self))] pub async fn start(self, address: &str) -> Result<()> { let listener = TcpListener::bind(address).await?; let handler = Arc::new(self.handler); let namespaces = Arc::new(self.namespaces); let data = Arc::new(RwLock::new(self.data)); - log::debug!("IPC server listening on {}", address); + tracing::info!(address); - while let Ok((stream, _)) = listener.accept().await { + while let Ok((stream, remote_address)) = listener.accept().await { + let remote_address = remote_address.to_string(); + tracing::debug!("remote_address = {}", remote_address); let handler = Arc::clone(&handler); let namespaces = Arc::clone(&namespaces); let data = Arc::clone(&data); diff --git a/src/ipc/stream_emitter.rs b/src/ipc/stream_emitter.rs index b0c7a787..37cbef91 100644 --- a/src/ipc/stream_emitter.rs +++ b/src/ipc/stream_emitter.rs @@ -2,11 +2,11 @@ use crate::error::Result; use crate::events::event::Event; use crate::events::payload::EventSendPayload; use crate::ipc::context::Context; +use std::fmt::Debug; use std::sync::Arc; use tokio::io::AsyncWriteExt; use tokio::net::tcp::OwnedWriteHalf; use tokio::sync::Mutex; -use tokio::time::Instant; /// An abstraction over the raw tokio tcp stream /// to emit events and share a connection across multiple @@ -23,7 +23,8 @@ impl StreamEmitter { } } - pub async fn _emit( + #[tracing::instrument(level = "trace", skip(self))] + pub async fn _emit( &self, namespace: Option<&str>, event: &str, @@ -31,7 +32,6 @@ impl StreamEmitter { res_id: Option, ) -> Result { let data_bytes = data.to_payload_bytes()?; - log::debug!("Emitting event {:?}:{}", namespace, event); let event = if let Some(namespace) = namespace { Event::with_namespace(namespace.to_string(), event.to_string(), data_bytes, res_id) @@ -43,17 +43,16 @@ impl StreamEmitter { let event_bytes = event.into_bytes()?; { - let start = Instant::now(); let mut stream = self.stream.lock().await; (*stream).write_all(&event_bytes[..]).await?; - log::trace!("Wrote {} bytes in {:?}", event_bytes.len(), start.elapsed()); + tracing::trace!(bytes_len = event_bytes.len()); } Ok(EmitMetadata::new(event_id)) } /// Emits an event - pub async fn emit, T: EventSendPayload>( + pub async fn emit, T: EventSendPayload + Debug>( &self, event: S, data: T, @@ -62,7 +61,7 @@ impl StreamEmitter { } /// Emits an event to a specific namespace - pub async fn emit_to, S2: AsRef, T: EventSendPayload>( + pub async fn emit_to, S2: AsRef, T: EventSendPayload + Debug>( &self, namespace: S1, event: S2, @@ -73,7 +72,7 @@ impl StreamEmitter { } /// Emits a response to an event - pub async fn emit_response, T: EventSendPayload>( + pub async fn emit_response, T: EventSendPayload + Debug>( &self, event_id: u64, event: S, @@ -83,7 +82,7 @@ impl StreamEmitter { } /// Emits a response to an event to a namespace - pub async fn emit_response_to, S2: AsRef, T: EventSendPayload>( + pub async fn emit_response_to, S2: AsRef, T: EventSendPayload + Debug>( &self, event_id: u64, namespace: S1, diff --git a/src/namespaces/builder.rs b/src/namespaces/builder.rs index 9f9dd214..45862c7b 100644 --- a/src/namespaces/builder.rs +++ b/src/namespaces/builder.rs @@ -38,6 +38,7 @@ impl NamespaceBuilder { } /// Builds the namespace + #[tracing::instrument(skip(self))] pub fn build(self) -> IPCBuilder { let namespace = Namespace::new(self.name, self.handler); self.ipc_builder.add_namespace(namespace) diff --git a/src/namespaces/namespace.rs b/src/namespaces/namespace.rs index 4659eeb8..72dc77c8 100644 --- a/src/namespaces/namespace.rs +++ b/src/namespaces/namespace.rs @@ -1,7 +1,7 @@ use crate::events::event_handler::EventHandler; use std::sync::Arc; -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct Namespace { name: String, pub(crate) handler: Arc, diff --git a/src/tests/mod.rs b/src/tests/mod.rs index 8f29a871..9c52e641 100644 --- a/src/tests/mod.rs +++ b/src/tests/mod.rs @@ -1,3 +1,3 @@ +mod event_tests; mod ipc_tests; mod utils; -mod event_tests; diff --git a/src/tests/utils.rs b/src/tests/utils.rs index 0804847d..f3c4507e 100644 --- a/src/tests/utils.rs +++ b/src/tests/utils.rs @@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize}; use std::time::SystemTime; use tokio::sync::oneshot; -#[derive(Clone, Serialize, Deserialize)] +#[derive(Clone, Serialize, Deserialize, Debug)] pub struct PingEventData { pub time: SystemTime, pub ttl: u8,