From a2aef104ee6965b027ce61876eabfc0538946d0f Mon Sep 17 00:00:00 2001 From: trivernis Date: Wed, 9 Mar 2022 19:55:59 +0100 Subject: [PATCH 1/3] Move whole main function into an async context Signed-off-by: trivernis --- mediarepo-daemon/src/main.rs | 32 +++++++------------------------- 1 file changed, 7 insertions(+), 25 deletions(-) diff --git a/mediarepo-daemon/src/main.rs b/mediarepo-daemon/src/main.rs index 55f608d..cc766bc 100644 --- a/mediarepo-daemon/src/main.rs +++ b/mediarepo-daemon/src/main.rs @@ -5,8 +5,6 @@ use std::time::Duration; use structopt::StructOpt; use tokio::fs; use tokio::io::AsyncWriteExt; -use tokio::runtime; -use tokio::runtime::Runtime; use mediarepo_core::error::RepoResult; use mediarepo_core::fs::drop_file::DropFile; @@ -49,7 +47,8 @@ enum SubCommand { Start, } -fn main() -> RepoResult<()> { +#[tokio::main] +async fn main() -> RepoResult<()> { let mut opt: Opt = Opt::from_args(); opt.repo = env::current_dir().unwrap().join(opt.repo); @@ -66,7 +65,7 @@ fn main() -> RepoResult<()> { } else { Settings::default() }; - clean_old_connection_files(&opt.repo)?; + clean_old_connection_files(&opt.repo).await?; let mut guards = Vec::new(); if opt.profile { @@ -76,8 +75,8 @@ fn main() -> RepoResult<()> { } let result = match opt.cmd.clone() { - SubCommand::Init { force } => get_single_thread_runtime().block_on(init(opt, force)), - SubCommand::Start => get_multi_thread_runtime().block_on(start_server(opt, settings)), + SubCommand::Init { force } => init(opt, force).await, + SubCommand::Start => start_server(opt, settings).await, }; match result { @@ -90,23 +89,6 @@ fn main() -> RepoResult<()> { } } -fn get_single_thread_runtime() -> Runtime { - log::info!("Using current thread runtime"); - runtime::Builder::new_current_thread() - .enable_all() - .max_blocking_threads(1) - .build() - .unwrap() -} - -fn get_multi_thread_runtime() -> Runtime { - log::info!("Using multi thread runtime"); - runtime::Builder::new_multi_thread() - .enable_all() - .build() - .unwrap() -} - async fn init_repo(opt: &Opt, paths: &PathSettings) -> RepoResult { let repo = get_repo(&opt.repo, paths).await?; @@ -244,14 +226,14 @@ async fn init(opt: Opt, force: bool) -> RepoResult<()> { Ok(()) } -fn clean_old_connection_files(root: &PathBuf) -> RepoResult<()> { +async fn clean_old_connection_files(root: &PathBuf) -> RepoResult<()> { let paths = ["repo.tcp", "repo.sock"]; for path in paths { let path = root.join(path); if path.exists() { - std::fs::remove_file(&path)?; + tokio::fs::remove_file(&path).await?; } } From aa772ea17358314f31ce9c6ec2885cfb6037acfb Mon Sep 17 00:00:00 2001 From: trivernis Date: Wed, 9 Mar 2022 20:39:59 +0100 Subject: [PATCH 2/3] Add tracing layer list and refactor logging implementation Signed-off-by: trivernis --- mediarepo-daemon/Cargo.lock | 1 + mediarepo-daemon/mediarepo-core/Cargo.toml | 1 + mediarepo-daemon/mediarepo-core/src/lib.rs | 1 + .../mediarepo-core/src/tracing_layer_list.rs | 118 ++++++++++++++++ mediarepo-daemon/src/logging.rs | 130 +++++++++++------- 5 files changed, 203 insertions(+), 48 deletions(-) create mode 100644 mediarepo-daemon/mediarepo-core/src/tracing_layer_list.rs diff --git a/mediarepo-daemon/Cargo.lock b/mediarepo-daemon/Cargo.lock index 741d923..dbdf80a 100644 --- a/mediarepo-daemon/Cargo.lock +++ b/mediarepo-daemon/Cargo.lock @@ -1311,6 +1311,7 @@ dependencies = [ "tokio-graceful-shutdown", "toml", "tracing", + "tracing-subscriber", "typemap_rev", ] diff --git a/mediarepo-daemon/mediarepo-core/Cargo.toml b/mediarepo-daemon/mediarepo-core/Cargo.toml index 5a007dd..bf4f1d5 100644 --- a/mediarepo-daemon/mediarepo-core/Cargo.toml +++ b/mediarepo-daemon/mediarepo-core/Cargo.toml @@ -21,6 +21,7 @@ tracing = "0.1.31" data-encoding = "2.3.2" tokio-graceful-shutdown = "0.4.3" thumbnailer = "0.4.0" +tracing-subscriber = "0.3.9" [dependencies.sea-orm] version = "0.6.0" diff --git a/mediarepo-daemon/mediarepo-core/src/lib.rs b/mediarepo-daemon/mediarepo-core/src/lib.rs index a33546e..688c48b 100644 --- a/mediarepo-daemon/mediarepo-core/src/lib.rs +++ b/mediarepo-daemon/mediarepo-core/src/lib.rs @@ -10,5 +10,6 @@ pub mod context; pub mod error; pub mod fs; pub mod settings; +pub mod tracing_layer_list; pub mod type_keys; pub mod utils; diff --git a/mediarepo-daemon/mediarepo-core/src/tracing_layer_list.rs b/mediarepo-daemon/mediarepo-core/src/tracing_layer_list.rs new file mode 100644 index 0000000..bdf13f8 --- /dev/null +++ b/mediarepo-daemon/mediarepo-core/src/tracing_layer_list.rs @@ -0,0 +1,118 @@ +use std::slice::{Iter, IterMut}; +use tracing::level_filters::LevelFilter; +use tracing::span::{Attributes, Record}; +use tracing::subscriber::Interest; +use tracing::{Event, Id, Metadata, Subscriber}; +use tracing_subscriber::Layer; + +pub struct DynLayerList(Vec + Send + Sync + 'static>>); + +impl DynLayerList { + pub fn new() -> Self { + Self(Vec::new()) + } + + pub fn iter(&self) -> Iter<'_, Box + Send + Sync>> { + self.0.iter() + } + + pub fn iter_mut(&mut self) -> IterMut<'_, Box + Send + Sync>> { + self.0.iter_mut() + } +} + +impl DynLayerList +where + S: Subscriber, +{ + pub fn add + Send + Sync>(&mut self, layer: L) { + self.0.push(Box::new(layer)); + } +} + +impl Layer for DynLayerList +where + S: Subscriber, +{ + fn on_layer(&mut self, subscriber: &mut S) { + self.iter_mut().for_each(|l| l.on_layer(subscriber)); + } + + fn register_callsite(&self, metadata: &'static Metadata<'static>) -> Interest { + // Return highest level of interest. + let mut interest = Interest::never(); + for layer in &self.0 { + let new_interest = layer.register_callsite(metadata); + if (interest.is_sometimes() && new_interest.is_always()) + || (interest.is_never() && !new_interest.is_never()) + { + interest = new_interest; + } + } + interest + } + + fn enabled( + &self, + metadata: &Metadata<'_>, + ctx: tracing_subscriber::layer::Context<'_, S>, + ) -> bool { + self.iter().any(|l| l.enabled(metadata, ctx.clone())) + } + + fn on_new_span( + &self, + attrs: &Attributes<'_>, + id: &Id, + ctx: tracing_subscriber::layer::Context<'_, S>, + ) { + self.iter() + .for_each(|l| l.on_new_span(attrs, id, ctx.clone())); + } + + fn max_level_hint(&self) -> Option { + self.iter().filter_map(|l| l.max_level_hint()).max() + } + + fn on_record( + &self, + span: &Id, + values: &Record<'_>, + ctx: tracing_subscriber::layer::Context<'_, S>, + ) { + self.iter() + .for_each(|l| l.on_record(span, values, ctx.clone())); + } + + fn on_follows_from( + &self, + span: &Id, + follows: &Id, + ctx: tracing_subscriber::layer::Context<'_, S>, + ) { + self.iter() + .for_each(|l| l.on_follows_from(span, follows, ctx.clone())); + } + + fn on_event(&self, event: &Event<'_>, ctx: tracing_subscriber::layer::Context<'_, S>) { + self.iter().for_each(|l| l.on_event(event, ctx.clone())); + } + + fn on_enter(&self, id: &Id, ctx: tracing_subscriber::layer::Context<'_, S>) { + self.iter().for_each(|l| l.on_enter(id, ctx.clone())); + } + + fn on_exit(&self, id: &Id, ctx: tracing_subscriber::layer::Context<'_, S>) { + self.iter().for_each(|l| l.on_exit(id, ctx.clone())); + } + + fn on_close(&self, id: Id, ctx: tracing_subscriber::layer::Context<'_, S>) { + self.iter() + .for_each(|l| l.on_close(id.clone(), ctx.clone())); + } + + fn on_id_change(&self, old: &Id, new: &Id, ctx: tracing_subscriber::layer::Context<'_, S>) { + self.iter() + .for_each(|l| l.on_id_change(old, new, ctx.clone())); + } +} diff --git a/mediarepo-daemon/src/logging.rs b/mediarepo-daemon/src/logging.rs index dfb5846..d9fe937 100644 --- a/mediarepo-daemon/src/logging.rs +++ b/mediarepo-daemon/src/logging.rs @@ -7,16 +7,17 @@ use tracing::Level; use tracing_appender::non_blocking::{NonBlocking, WorkerGuard}; use tracing_flame::FlameLayer; use tracing_log::LogTracer; -use tracing_subscriber::{ - fmt::{self}, - Layer, Registry, -}; use tracing_subscriber::filter::{self, Targets}; use tracing_subscriber::fmt::format::FmtSpan; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; +use tracing_subscriber::{ + fmt::{self}, + Layer, Registry, +}; use mediarepo_core::settings::LoggingSettings; +use mediarepo_core::tracing_layer_list::DynLayerList; #[allow(dyn_drop)] pub type DropGuard = Box; @@ -25,39 +26,60 @@ pub fn init_tracing(repo_path: &PathBuf, log_cfg: &LoggingSettings) -> Vec() - .unwrap_or( - filter::Targets::new() - .with_default(Level::INFO) - .with_target("sqlx", Level::WARN), - ), - ); + add_stdout_layer(&mut guards, &mut layer_list); + add_sql_layer(log_cfg, &log_path, &mut guards, &mut layer_list); + add_bromine_layer(log_cfg, &log_path, &mut guards, &mut layer_list); + add_app_log_layer(log_cfg, &log_path, &mut guards, &mut layer_list); - let (sql_writer, guard) = get_sql_log_writer(&log_path); + let tokio_console_enabled = std::env::var("TOKIO_CONSOLE") + .map(|v| v.eq_ignore_ascii_case("true")) + .unwrap_or(false); + + if tokio_console_enabled { + add_tokio_console_layer(&mut layer_list); + } + + let registry = Registry::default().with(layer_list); + tracing::subscriber::set_global_default(registry).expect("Failed to initialize tracing"); + + guards +} + +fn add_tokio_console_layer(layer_list: &mut DynLayerList) { + let console_layer = ConsoleLayer::builder().with_default_env().spawn(); + layer_list.add(console_layer); +} + +fn add_app_log_layer( + log_cfg: &LoggingSettings, + log_path: &PathBuf, + guards: &mut Vec, + layer_list: &mut DynLayerList, +) { + let (app_log_writer, guard) = get_application_log_writer(&log_path); guards.push(Box::new(guard) as DropGuard); - let sql_layer = fmt::layer() - .with_writer(sql_writer) + let app_log_layer = fmt::layer() + .with_writer(app_log_writer) .pretty() .with_ansi(false) - .with_span_events(FmtSpan::NONE) - .with_filter(get_sql_targets(log_cfg.trace_sql)); + .with_span_events(FmtSpan::NEW | FmtSpan::CLOSE) + .with_filter(get_app_targets(log_cfg.level.clone().into())); + layer_list.add(app_log_layer); +} +fn add_bromine_layer( + log_cfg: &LoggingSettings, + log_path: &PathBuf, + guards: &mut Vec, + layer_list: &mut DynLayerList, +) { let (bromine_writer, guard) = get_bromine_log_writer(&log_path); guards.push(Box::new(guard) as DropGuard); @@ -67,36 +89,48 @@ pub fn init_tracing(repo_path: &PathBuf, log_cfg: &LoggingSettings) -> Vec, + layer_list: &mut DynLayerList, +) { + let (sql_writer, guard) = get_sql_log_writer(&log_path); guards.push(Box::new(guard) as DropGuard); - let app_log_layer = fmt::layer() - .with_writer(app_log_writer) + let sql_layer = fmt::layer() + .with_writer(sql_writer) .pretty() .with_ansi(false) - .with_span_events(FmtSpan::NEW | FmtSpan::CLOSE) - .with_filter(get_app_targets(log_cfg.level.clone().into())); - - let registry = Registry::default() - .with(stdout_layer) - .with(sql_layer) - .with(bromine_layer) - .with(app_log_layer); + .with_span_events(FmtSpan::NONE) + .with_filter(get_sql_targets(log_cfg.trace_sql)); - let tokio_console_enabled = std::env::var("TOKIO_CONSOLE") - .map(|v| v.eq_ignore_ascii_case("true")) - .unwrap_or(false); + layer_list.add(sql_layer); +} - if tokio_console_enabled { - let console_layer = ConsoleLayer::builder().with_default_env().spawn(); - let registry = registry.with(console_layer); - tracing::subscriber::set_global_default(registry).expect("Failed to initialize tracing"); - } else { - tracing::subscriber::set_global_default(registry).expect("Failed to initialize tracing"); - } +fn add_stdout_layer(guards: &mut Vec, layer_list: &mut DynLayerList) { + let (stdout_writer, guard) = tracing_appender::non_blocking(std::io::stdout()); + guards.push(Box::new(guard) as DropGuard); - guards + let stdout_layer = fmt::layer() + .with_thread_names(false) + .with_target(true) + .with_writer(stdout_writer) + .with_span_events(FmtSpan::NEW | FmtSpan::CLOSE) + .with_filter( + std::env::var("RUST_LOG") + .unwrap_or(String::from("info,sqlx=warn")) + .parse::() + .unwrap_or( + filter::Targets::new() + .with_default(Level::INFO) + .with_target("sqlx", Level::WARN), + ), + ); + layer_list.add(stdout_layer); } fn get_sql_log_writer(log_path: &PathBuf) -> (NonBlocking, WorkerGuard) { From a11a2f3dc5d663df78dc19f230f450e9c5672866 Mon Sep 17 00:00:00 2001 From: trivernis Date: Wed, 9 Mar 2022 22:11:22 +0100 Subject: [PATCH 3/3] Add opt-in performance tracing telemetry Signed-off-by: trivernis --- mediarepo-daemon/Cargo.lock | 89 +++++++++++++++++++ mediarepo-daemon/Cargo.toml | 3 + .../mediarepo-core/src/settings/logging.rs | 6 ++ .../mediarepo-database/Cargo.toml | 2 +- .../mediarepo-database/src/lib.rs | 3 +- mediarepo-daemon/src/logging.rs | 39 ++++++++ mediarepo-daemon/src/main.rs | 1 + 7 files changed, 141 insertions(+), 2 deletions(-) diff --git a/mediarepo-daemon/Cargo.lock b/mediarepo-daemon/Cargo.lock index dbdf80a..27cc942 100644 --- a/mediarepo-daemon/Cargo.lock +++ b/mediarepo-daemon/Cargo.lock @@ -1129,6 +1129,12 @@ dependencies = [ "cfg-if 1.0.0", ] +[[package]] +name = "integer-encoding" +version = "3.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e85a1509a128c855368e135cffcde7eac17d8e1083f41e2b98c58bc1a5074be" + [[package]] name = "itertools" version = "0.10.3" @@ -1326,6 +1332,8 @@ dependencies = [ "mediarepo-logic", "mediarepo-socket", "num-integer", + "opentelemetry", + "opentelemetry-jaeger", "rolling-file", "structopt", "tokio", @@ -1334,6 +1342,7 @@ dependencies = [ "tracing-appender", "tracing-flame", "tracing-log", + "tracing-opentelemetry", "tracing-subscriber", ] @@ -1698,6 +1707,60 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "opentelemetry" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6105e89802af13fdf48c49d7646d3b533a70e536d818aae7e78ba0433d01acb8" +dependencies = [ + "async-trait", + "crossbeam-channel", + "futures-channel", + "futures-executor", + "futures-util", + "js-sys", + "lazy_static", + "percent-encoding", + "pin-project", + "rand", + "thiserror", + "tokio", + "tokio-stream", +] + +[[package]] +name = "opentelemetry-jaeger" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8c0b12cd9e3f9b35b52f6e0dac66866c519b26f424f4bbf96e3fe8bfbdc5229" +dependencies = [ + "async-trait", + "lazy_static", + "opentelemetry", + "opentelemetry-semantic-conventions", + "thiserror", + "thrift", + "tokio", +] + +[[package]] +name = "opentelemetry-semantic-conventions" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "985cc35d832d412224b2cffe2f9194b1b89b6aa5d0bef76d080dce09d90e62bd" +dependencies = [ + "opentelemetry", +] + +[[package]] +name = "ordered-float" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3305af35278dd29f46fcdd139e0b1fbfae2153f0e5928b39b035542dd31e37b7" +dependencies = [ + "num-traits", +] + [[package]] name = "ordered-multimap" version = "0.3.1" @@ -2685,6 +2748,19 @@ dependencies = [ "num_cpus", ] +[[package]] +name = "thrift" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b82ca8f46f95b3ce96081fe3dd89160fdea970c254bb72925255d1b62aae692e" +dependencies = [ + "byteorder", + "integer-encoding", + "log", + "ordered-float", + "threadpool", +] + [[package]] name = "thumbnailer" version = "0.4.0" @@ -3023,6 +3099,19 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-opentelemetry" +version = "0.17.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f9378e96a9361190ae297e7f3a8ff644aacd2897f244b1ff81f381669196fa6" +dependencies = [ + "opentelemetry", + "tracing", + "tracing-core", + "tracing-log", + "tracing-subscriber", +] + [[package]] name = "tracing-serde" version = "0.1.3" diff --git a/mediarepo-daemon/Cargo.toml b/mediarepo-daemon/Cargo.toml index 821495e..5f37c2c 100644 --- a/mediarepo-daemon/Cargo.toml +++ b/mediarepo-daemon/Cargo.toml @@ -27,6 +27,9 @@ rolling-file = "0.1.0" num-integer = "0.1.44" console-subscriber = "0.1.3" log = "0.4.14" +opentelemetry = { version = "0.17.0", features = ["rt-tokio"] } +opentelemetry-jaeger = { version = "0.16.0", features = ["rt-tokio"] } +tracing-opentelemetry = "0.17.2" [dependencies.mediarepo-core] path = "./mediarepo-core" diff --git a/mediarepo-daemon/mediarepo-core/src/settings/logging.rs b/mediarepo-daemon/mediarepo-core/src/settings/logging.rs index a8b9c06..513b7a2 100644 --- a/mediarepo-daemon/mediarepo-core/src/settings/logging.rs +++ b/mediarepo-daemon/mediarepo-core/src/settings/logging.rs @@ -1,11 +1,15 @@ use serde::{Deserialize, Serialize}; use tracing::Level; +const DEFAULT_TELEMETRY_ENDPOINT: &str = "telemetry.trivernis.net:6831"; + #[derive(Clone, Debug, Deserialize, Serialize)] pub struct LoggingSettings { pub level: LogLevel, pub trace_sql: bool, pub trace_api_calls: bool, + pub telemetry: bool, + pub telemetry_endpoint: String, } impl Default for LoggingSettings { @@ -14,6 +18,8 @@ impl Default for LoggingSettings { level: LogLevel::Info, trace_sql: false, trace_api_calls: false, + telemetry: false, + telemetry_endpoint: String::from(DEFAULT_TELEMETRY_ENDPOINT), } } } diff --git a/mediarepo-daemon/mediarepo-database/Cargo.toml b/mediarepo-daemon/mediarepo-database/Cargo.toml index b37e501..6ed1944 100644 --- a/mediarepo-daemon/mediarepo-database/Cargo.toml +++ b/mediarepo-daemon/mediarepo-database/Cargo.toml @@ -19,5 +19,5 @@ features = ["migrate"] [dependencies.sea-orm] version = "0.6.0" -features = ["sqlx-sqlite", "runtime-tokio-native-tls", "macros", "debug-print"] +features = ["sqlx-sqlite", "runtime-tokio-native-tls", "macros"] default-features = false diff --git a/mediarepo-daemon/mediarepo-database/src/lib.rs b/mediarepo-daemon/mediarepo-database/src/lib.rs index 4e946a3..7628774 100644 --- a/mediarepo-daemon/mediarepo-database/src/lib.rs +++ b/mediarepo-daemon/mediarepo-database/src/lib.rs @@ -13,7 +13,8 @@ pub async fn get_database>(uri: S) -> RepoDatabaseResult Vec) { layer_list.add(console_layer); } +fn add_telemetry_layer(log_cfg: &LoggingSettings, layer_list: &mut DynLayerList) { + match opentelemetry_jaeger::new_pipeline() + .with_agent_endpoint(&log_cfg.telemetry_endpoint) + .with_trace_config( + sdk::trace::Config::default() + .with_resource(Resource::new(vec![KeyValue::new( + "service.name", + "mediarepo-daemon", + )])) + .with_max_attributes_per_span(1), + ) + .with_instrumentation_library_tags(false) + .with_service_name("mediarepo-daemon") + .install_batch(opentelemetry::runtime::Tokio) + { + Ok(tracer) => { + let telemetry_layer = tracing_opentelemetry::layer() + .with_tracer(tracer) + .with_filter( + filter::Targets::new() + .with_target("tokio", Level::INFO) + .with_target("h2", Level::INFO) + .with_target("sqlx", Level::ERROR) + .with_target("sea_orm", Level::INFO), + ); + layer_list.add(telemetry_layer); + } + Err(e) => { + eprintln!("Failed to initialize telemetry tracing: {}", e); + } + } +} + fn add_app_log_layer( log_cfg: &LoggingSettings, log_path: &PathBuf, diff --git a/mediarepo-daemon/src/main.rs b/mediarepo-daemon/src/main.rs index cc766bc..f633bb4 100644 --- a/mediarepo-daemon/src/main.rs +++ b/mediarepo-daemon/src/main.rs @@ -79,6 +79,7 @@ async fn main() -> RepoResult<()> { SubCommand::Start => start_server(opt, settings).await, }; + opentelemetry::global::shutdown_tracer_provider(); match result { Ok(_) => Ok(()), Err(e) => {