Merge pull request #20 from Trivernis/feature/telemetry

Feature/telemetry
pull/22/head
Julius Riegel 3 years ago committed by GitHub
commit 4c170ca333
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -1129,6 +1129,12 @@ dependencies = [
"cfg-if 1.0.0", "cfg-if 1.0.0",
] ]
[[package]]
name = "integer-encoding"
version = "3.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e85a1509a128c855368e135cffcde7eac17d8e1083f41e2b98c58bc1a5074be"
[[package]] [[package]]
name = "itertools" name = "itertools"
version = "0.10.3" version = "0.10.3"
@ -1311,6 +1317,7 @@ dependencies = [
"tokio-graceful-shutdown", "tokio-graceful-shutdown",
"toml", "toml",
"tracing", "tracing",
"tracing-subscriber",
"typemap_rev", "typemap_rev",
] ]
@ -1325,6 +1332,8 @@ dependencies = [
"mediarepo-logic", "mediarepo-logic",
"mediarepo-socket", "mediarepo-socket",
"num-integer", "num-integer",
"opentelemetry",
"opentelemetry-jaeger",
"rolling-file", "rolling-file",
"structopt", "structopt",
"tokio", "tokio",
@ -1333,6 +1342,7 @@ dependencies = [
"tracing-appender", "tracing-appender",
"tracing-flame", "tracing-flame",
"tracing-log", "tracing-log",
"tracing-opentelemetry",
"tracing-subscriber", "tracing-subscriber",
] ]
@ -1697,6 +1707,60 @@ dependencies = [
"vcpkg", "vcpkg",
] ]
[[package]]
name = "opentelemetry"
version = "0.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6105e89802af13fdf48c49d7646d3b533a70e536d818aae7e78ba0433d01acb8"
dependencies = [
"async-trait",
"crossbeam-channel",
"futures-channel",
"futures-executor",
"futures-util",
"js-sys",
"lazy_static",
"percent-encoding",
"pin-project",
"rand",
"thiserror",
"tokio",
"tokio-stream",
]
[[package]]
name = "opentelemetry-jaeger"
version = "0.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8c0b12cd9e3f9b35b52f6e0dac66866c519b26f424f4bbf96e3fe8bfbdc5229"
dependencies = [
"async-trait",
"lazy_static",
"opentelemetry",
"opentelemetry-semantic-conventions",
"thiserror",
"thrift",
"tokio",
]
[[package]]
name = "opentelemetry-semantic-conventions"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "985cc35d832d412224b2cffe2f9194b1b89b6aa5d0bef76d080dce09d90e62bd"
dependencies = [
"opentelemetry",
]
[[package]]
name = "ordered-float"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3305af35278dd29f46fcdd139e0b1fbfae2153f0e5928b39b035542dd31e37b7"
dependencies = [
"num-traits",
]
[[package]] [[package]]
name = "ordered-multimap" name = "ordered-multimap"
version = "0.3.1" version = "0.3.1"
@ -2684,6 +2748,19 @@ dependencies = [
"num_cpus", "num_cpus",
] ]
[[package]]
name = "thrift"
version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b82ca8f46f95b3ce96081fe3dd89160fdea970c254bb72925255d1b62aae692e"
dependencies = [
"byteorder",
"integer-encoding",
"log",
"ordered-float",
"threadpool",
]
[[package]] [[package]]
name = "thumbnailer" name = "thumbnailer"
version = "0.4.0" version = "0.4.0"
@ -3022,6 +3099,19 @@ dependencies = [
"tracing-core", "tracing-core",
] ]
[[package]]
name = "tracing-opentelemetry"
version = "0.17.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f9378e96a9361190ae297e7f3a8ff644aacd2897f244b1ff81f381669196fa6"
dependencies = [
"opentelemetry",
"tracing",
"tracing-core",
"tracing-log",
"tracing-subscriber",
]
[[package]] [[package]]
name = "tracing-serde" name = "tracing-serde"
version = "0.1.3" version = "0.1.3"

@ -27,6 +27,9 @@ rolling-file = "0.1.0"
num-integer = "0.1.44" num-integer = "0.1.44"
console-subscriber = "0.1.3" console-subscriber = "0.1.3"
log = "0.4.14" log = "0.4.14"
opentelemetry = { version = "0.17.0", features = ["rt-tokio"] }
opentelemetry-jaeger = { version = "0.16.0", features = ["rt-tokio"] }
tracing-opentelemetry = "0.17.2"
[dependencies.mediarepo-core] [dependencies.mediarepo-core]
path = "./mediarepo-core" path = "./mediarepo-core"

@ -21,6 +21,7 @@ tracing = "0.1.31"
data-encoding = "2.3.2" data-encoding = "2.3.2"
tokio-graceful-shutdown = "0.4.3" tokio-graceful-shutdown = "0.4.3"
thumbnailer = "0.4.0" thumbnailer = "0.4.0"
tracing-subscriber = "0.3.9"
[dependencies.sea-orm] [dependencies.sea-orm]
version = "0.6.0" version = "0.6.0"

@ -10,5 +10,6 @@ pub mod context;
pub mod error; pub mod error;
pub mod fs; pub mod fs;
pub mod settings; pub mod settings;
pub mod tracing_layer_list;
pub mod type_keys; pub mod type_keys;
pub mod utils; pub mod utils;

@ -1,11 +1,15 @@
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tracing::Level; use tracing::Level;
const DEFAULT_TELEMETRY_ENDPOINT: &str = "telemetry.trivernis.net:6831";
#[derive(Clone, Debug, Deserialize, Serialize)] #[derive(Clone, Debug, Deserialize, Serialize)]
pub struct LoggingSettings { pub struct LoggingSettings {
pub level: LogLevel, pub level: LogLevel,
pub trace_sql: bool, pub trace_sql: bool,
pub trace_api_calls: bool, pub trace_api_calls: bool,
pub telemetry: bool,
pub telemetry_endpoint: String,
} }
impl Default for LoggingSettings { impl Default for LoggingSettings {
@ -14,6 +18,8 @@ impl Default for LoggingSettings {
level: LogLevel::Info, level: LogLevel::Info,
trace_sql: false, trace_sql: false,
trace_api_calls: false, trace_api_calls: false,
telemetry: false,
telemetry_endpoint: String::from(DEFAULT_TELEMETRY_ENDPOINT),
} }
} }
} }

@ -0,0 +1,118 @@
use std::slice::{Iter, IterMut};
use tracing::level_filters::LevelFilter;
use tracing::span::{Attributes, Record};
use tracing::subscriber::Interest;
use tracing::{Event, Id, Metadata, Subscriber};
use tracing_subscriber::Layer;
/// An ordered collection of boxed [`Layer`]s that itself implements [`Layer`],
/// fanning each tracing callback out to every contained layer in insertion order.
pub struct DynLayerList<S>(Vec<Box<dyn Layer<S> + Send + Sync + 'static>>);
impl<S> DynLayerList<S> {
    /// Creates an empty layer list.
    pub fn new() -> Self {
        Self(Vec::new())
    }

    /// Returns an iterator over the boxed layers, in insertion order.
    pub fn iter(&self) -> Iter<'_, Box<dyn Layer<S> + Send + Sync>> {
        self.0.iter()
    }

    /// Returns a mutable iterator over the boxed layers, in insertion order.
    pub fn iter_mut(&mut self) -> IterMut<'_, Box<dyn Layer<S> + Send + Sync>> {
        self.0.iter_mut()
    }
}

// A public `new()` without a `Default` impl trips clippy's `new_without_default`
// and blocks `Default`-based construction; delegate to `new` for consistency.
impl<S> Default for DynLayerList<S> {
    fn default() -> Self {
        Self::new()
    }
}
impl<S> DynLayerList<S>
where
    S: Subscriber,
{
    /// Boxes `layer` and appends it to the end of the list.
    pub fn add<L: Layer<S> + Send + Sync>(&mut self, layer: L) {
        let boxed: Box<dyn Layer<S> + Send + Sync + 'static> = Box::new(layer);
        self.0.push(boxed);
    }
}
impl<S> Layer<S> for DynLayerList<S>
where
    S: Subscriber,
{
    fn on_layer(&mut self, subscriber: &mut S) {
        for layer in self.iter_mut() {
            layer.on_layer(subscriber);
        }
    }

    fn register_callsite(&self, metadata: &'static Metadata<'static>) -> Interest {
        // Report the strongest interest any child layer expresses:
        // never < sometimes < always.
        let mut strongest = Interest::never();
        for layer in self.iter() {
            let candidate = layer.register_callsite(metadata);
            let upgrades = (strongest.is_sometimes() && candidate.is_always())
                || (strongest.is_never() && !candidate.is_never());
            if upgrades {
                strongest = candidate;
            }
        }
        strongest
    }

    fn enabled(
        &self,
        metadata: &Metadata<'_>,
        ctx: tracing_subscriber::layer::Context<'_, S>,
    ) -> bool {
        // Enabled as soon as any child layer is interested (short-circuits).
        for layer in self.iter() {
            if layer.enabled(metadata, ctx.clone()) {
                return true;
            }
        }
        false
    }

    fn on_new_span(
        &self,
        attrs: &Attributes<'_>,
        id: &Id,
        ctx: tracing_subscriber::layer::Context<'_, S>,
    ) {
        for layer in self.iter() {
            layer.on_new_span(attrs, id, ctx.clone());
        }
    }

    fn max_level_hint(&self) -> Option<LevelFilter> {
        // The list's hint is the most verbose hint of any child layer.
        let mut hint: Option<LevelFilter> = None;
        for layer in self.iter() {
            if let Some(level) = layer.max_level_hint() {
                hint = Some(hint.map_or(level, |current| current.max(level)));
            }
        }
        hint
    }

    fn on_record(
        &self,
        span: &Id,
        values: &Record<'_>,
        ctx: tracing_subscriber::layer::Context<'_, S>,
    ) {
        for layer in self.iter() {
            layer.on_record(span, values, ctx.clone());
        }
    }

    fn on_follows_from(
        &self,
        span: &Id,
        follows: &Id,
        ctx: tracing_subscriber::layer::Context<'_, S>,
    ) {
        for layer in self.iter() {
            layer.on_follows_from(span, follows, ctx.clone());
        }
    }

    fn on_event(&self, event: &Event<'_>, ctx: tracing_subscriber::layer::Context<'_, S>) {
        for layer in self.iter() {
            layer.on_event(event, ctx.clone());
        }
    }

    fn on_enter(&self, id: &Id, ctx: tracing_subscriber::layer::Context<'_, S>) {
        for layer in self.iter() {
            layer.on_enter(id, ctx.clone());
        }
    }

    fn on_exit(&self, id: &Id, ctx: tracing_subscriber::layer::Context<'_, S>) {
        for layer in self.iter() {
            layer.on_exit(id, ctx.clone());
        }
    }

    fn on_close(&self, id: Id, ctx: tracing_subscriber::layer::Context<'_, S>) {
        // `on_close` takes the span id by value, so each child gets its own clone.
        for layer in self.iter() {
            layer.on_close(id.clone(), ctx.clone());
        }
    }

    fn on_id_change(&self, old: &Id, new: &Id, ctx: tracing_subscriber::layer::Context<'_, S>) {
        for layer in self.iter() {
            layer.on_id_change(old, new, ctx.clone());
        }
    }
}

@ -19,5 +19,5 @@ features = ["migrate"]
[dependencies.sea-orm] [dependencies.sea-orm]
version = "0.6.0" version = "0.6.0"
features = ["sqlx-sqlite", "runtime-tokio-native-tls", "macros", "debug-print"] features = ["sqlx-sqlite", "runtime-tokio-native-tls", "macros"]
default-features = false default-features = false

@ -13,7 +13,8 @@ pub async fn get_database<S: AsRef<str>>(uri: S) -> RepoDatabaseResult<DatabaseC
migrate(uri.as_ref()).await?; migrate(uri.as_ref()).await?;
let mut opt = ConnectOptions::new(uri.as_ref().to_string()); let mut opt = ConnectOptions::new(uri.as_ref().to_string());
opt.connect_timeout(Duration::from_secs(10)) opt.connect_timeout(Duration::from_secs(10))
.idle_timeout(Duration::from_secs(10)); .idle_timeout(Duration::from_secs(10))
.sqlx_logging(false);
let conn = Database::connect(opt).await?; let conn = Database::connect(opt).await?;

@ -2,21 +2,24 @@ use std::fs;
use std::path::PathBuf; use std::path::PathBuf;
use console_subscriber::ConsoleLayer; use console_subscriber::ConsoleLayer;
use opentelemetry::sdk::Resource;
use opentelemetry::{sdk, KeyValue};
use rolling_file::RollingConditionBasic; use rolling_file::RollingConditionBasic;
use tracing::Level; use tracing::Level;
use tracing_appender::non_blocking::{NonBlocking, WorkerGuard}; use tracing_appender::non_blocking::{NonBlocking, WorkerGuard};
use tracing_flame::FlameLayer; use tracing_flame::FlameLayer;
use tracing_log::LogTracer; use tracing_log::LogTracer;
use tracing_subscriber::{
fmt::{self},
Layer, Registry,
};
use tracing_subscriber::filter::{self, Targets}; use tracing_subscriber::filter::{self, Targets};
use tracing_subscriber::fmt::format::FmtSpan; use tracing_subscriber::fmt::format::FmtSpan;
use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt; use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::{
fmt::{self},
Layer, Registry,
};
use mediarepo_core::settings::LoggingSettings; use mediarepo_core::settings::LoggingSettings;
use mediarepo_core::tracing_layer_list::DynLayerList;
#[allow(dyn_drop)] #[allow(dyn_drop)]
pub type DropGuard = Box<dyn Drop>; pub type DropGuard = Box<dyn Drop>;
@ -25,39 +28,97 @@ pub fn init_tracing(repo_path: &PathBuf, log_cfg: &LoggingSettings) -> Vec<DropG
LogTracer::init().expect("failed to subscribe to log entries"); LogTracer::init().expect("failed to subscribe to log entries");
let log_path = repo_path.join("logs"); let log_path = repo_path.join("logs");
let mut guards = Vec::new(); let mut guards = Vec::new();
let mut layer_list = DynLayerList::new();
if !log_path.exists() { if !log_path.exists() {
fs::create_dir(&log_path).expect("failed to create directory for log files"); fs::create_dir(&log_path).expect("failed to create directory for log files");
} }
let (stdout_writer, guard) = tracing_appender::non_blocking(std::io::stdout());
guards.push(Box::new(guard) as DropGuard);
let stdout_layer = fmt::layer() add_stdout_layer(&mut guards, &mut layer_list);
.with_thread_names(false) add_sql_layer(log_cfg, &log_path, &mut guards, &mut layer_list);
.with_target(true) add_bromine_layer(log_cfg, &log_path, &mut guards, &mut layer_list);
.with_writer(stdout_writer) add_app_log_layer(log_cfg, &log_path, &mut guards, &mut layer_list);
.with_span_events(FmtSpan::NEW | FmtSpan::CLOSE)
.with_filter( if log_cfg.telemetry {
std::env::var("RUST_LOG") add_telemetry_layer(log_cfg, &mut layer_list);
.unwrap_or(String::from("info,sqlx=warn")) }
.parse::<filter::Targets>()
.unwrap_or( let tokio_console_enabled = std::env::var("TOKIO_CONSOLE")
.map(|v| v.eq_ignore_ascii_case("true"))
.unwrap_or(false);
if tokio_console_enabled {
add_tokio_console_layer(&mut layer_list);
}
let registry = Registry::default().with(layer_list);
tracing::subscriber::set_global_default(registry).expect("Failed to initialize tracing");
guards
}
fn add_tokio_console_layer(layer_list: &mut DynLayerList<Registry>) {
let console_layer = ConsoleLayer::builder().with_default_env().spawn();
layer_list.add(console_layer);
}
fn add_telemetry_layer(log_cfg: &LoggingSettings, layer_list: &mut DynLayerList<Registry>) {
match opentelemetry_jaeger::new_pipeline()
.with_agent_endpoint(&log_cfg.telemetry_endpoint)
.with_trace_config(
sdk::trace::Config::default()
.with_resource(Resource::new(vec![KeyValue::new(
"service.name",
"mediarepo-daemon",
)]))
.with_max_attributes_per_span(1),
)
.with_instrumentation_library_tags(false)
.with_service_name("mediarepo-daemon")
.install_batch(opentelemetry::runtime::Tokio)
{
Ok(tracer) => {
let telemetry_layer = tracing_opentelemetry::layer()
.with_tracer(tracer)
.with_filter(
filter::Targets::new() filter::Targets::new()
.with_default(Level::INFO) .with_target("tokio", Level::INFO)
.with_target("sqlx", Level::WARN), .with_target("h2", Level::INFO)
), .with_target("sqlx", Level::ERROR)
); .with_target("sea_orm", Level::INFO),
);
layer_list.add(telemetry_layer);
}
Err(e) => {
eprintln!("Failed to initialize telemetry tracing: {}", e);
}
}
}
let (sql_writer, guard) = get_sql_log_writer(&log_path); fn add_app_log_layer(
log_cfg: &LoggingSettings,
log_path: &PathBuf,
guards: &mut Vec<DropGuard>,
layer_list: &mut DynLayerList<Registry>,
) {
let (app_log_writer, guard) = get_application_log_writer(&log_path);
guards.push(Box::new(guard) as DropGuard); guards.push(Box::new(guard) as DropGuard);
let sql_layer = fmt::layer() let app_log_layer = fmt::layer()
.with_writer(sql_writer) .with_writer(app_log_writer)
.pretty() .pretty()
.with_ansi(false) .with_ansi(false)
.with_span_events(FmtSpan::NONE) .with_span_events(FmtSpan::NEW | FmtSpan::CLOSE)
.with_filter(get_sql_targets(log_cfg.trace_sql)); .with_filter(get_app_targets(log_cfg.level.clone().into()));
layer_list.add(app_log_layer);
}
fn add_bromine_layer(
log_cfg: &LoggingSettings,
log_path: &PathBuf,
guards: &mut Vec<DropGuard>,
layer_list: &mut DynLayerList<Registry>,
) {
let (bromine_writer, guard) = get_bromine_log_writer(&log_path); let (bromine_writer, guard) = get_bromine_log_writer(&log_path);
guards.push(Box::new(guard) as DropGuard); guards.push(Box::new(guard) as DropGuard);
@ -67,36 +128,48 @@ pub fn init_tracing(repo_path: &PathBuf, log_cfg: &LoggingSettings) -> Vec<DropG
.with_ansi(false) .with_ansi(false)
.with_span_events(FmtSpan::NEW | FmtSpan::CLOSE) .with_span_events(FmtSpan::NEW | FmtSpan::CLOSE)
.with_filter(get_bromine_targets(log_cfg.trace_api_calls)); .with_filter(get_bromine_targets(log_cfg.trace_api_calls));
layer_list.add(bromine_layer);
}
let (app_log_writer, guard) = get_application_log_writer(&log_path); fn add_sql_layer(
log_cfg: &LoggingSettings,
log_path: &PathBuf,
guards: &mut Vec<DropGuard>,
layer_list: &mut DynLayerList<Registry>,
) {
let (sql_writer, guard) = get_sql_log_writer(&log_path);
guards.push(Box::new(guard) as DropGuard); guards.push(Box::new(guard) as DropGuard);
let app_log_layer = fmt::layer() let sql_layer = fmt::layer()
.with_writer(app_log_writer) .with_writer(sql_writer)
.pretty() .pretty()
.with_ansi(false) .with_ansi(false)
.with_span_events(FmtSpan::NEW | FmtSpan::CLOSE) .with_span_events(FmtSpan::NONE)
.with_filter(get_app_targets(log_cfg.level.clone().into())); .with_filter(get_sql_targets(log_cfg.trace_sql));
let registry = Registry::default()
.with(stdout_layer)
.with(sql_layer)
.with(bromine_layer)
.with(app_log_layer);
let tokio_console_enabled = std::env::var("TOKIO_CONSOLE") layer_list.add(sql_layer);
.map(|v| v.eq_ignore_ascii_case("true")) }
.unwrap_or(false);
if tokio_console_enabled { fn add_stdout_layer(guards: &mut Vec<DropGuard>, layer_list: &mut DynLayerList<Registry>) {
let console_layer = ConsoleLayer::builder().with_default_env().spawn(); let (stdout_writer, guard) = tracing_appender::non_blocking(std::io::stdout());
let registry = registry.with(console_layer); guards.push(Box::new(guard) as DropGuard);
tracing::subscriber::set_global_default(registry).expect("Failed to initialize tracing");
} else {
tracing::subscriber::set_global_default(registry).expect("Failed to initialize tracing");
}
guards let stdout_layer = fmt::layer()
.with_thread_names(false)
.with_target(true)
.with_writer(stdout_writer)
.with_span_events(FmtSpan::NEW | FmtSpan::CLOSE)
.with_filter(
std::env::var("RUST_LOG")
.unwrap_or(String::from("info,sqlx=warn"))
.parse::<filter::Targets>()
.unwrap_or(
filter::Targets::new()
.with_default(Level::INFO)
.with_target("sqlx", Level::WARN),
),
);
layer_list.add(stdout_layer);
} }
fn get_sql_log_writer(log_path: &PathBuf) -> (NonBlocking, WorkerGuard) { fn get_sql_log_writer(log_path: &PathBuf) -> (NonBlocking, WorkerGuard) {

@ -5,8 +5,6 @@ use std::time::Duration;
use structopt::StructOpt; use structopt::StructOpt;
use tokio::fs; use tokio::fs;
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
use tokio::runtime;
use tokio::runtime::Runtime;
use mediarepo_core::error::RepoResult; use mediarepo_core::error::RepoResult;
use mediarepo_core::fs::drop_file::DropFile; use mediarepo_core::fs::drop_file::DropFile;
@ -49,7 +47,8 @@ enum SubCommand {
Start, Start,
} }
fn main() -> RepoResult<()> { #[tokio::main]
async fn main() -> RepoResult<()> {
let mut opt: Opt = Opt::from_args(); let mut opt: Opt = Opt::from_args();
opt.repo = env::current_dir().unwrap().join(opt.repo); opt.repo = env::current_dir().unwrap().join(opt.repo);
@ -66,7 +65,7 @@ fn main() -> RepoResult<()> {
} else { } else {
Settings::default() Settings::default()
}; };
clean_old_connection_files(&opt.repo)?; clean_old_connection_files(&opt.repo).await?;
let mut guards = Vec::new(); let mut guards = Vec::new();
if opt.profile { if opt.profile {
@ -76,10 +75,11 @@ fn main() -> RepoResult<()> {
} }
let result = match opt.cmd.clone() { let result = match opt.cmd.clone() {
SubCommand::Init { force } => get_single_thread_runtime().block_on(init(opt, force)), SubCommand::Init { force } => init(opt, force).await,
SubCommand::Start => get_multi_thread_runtime().block_on(start_server(opt, settings)), SubCommand::Start => start_server(opt, settings).await,
}; };
opentelemetry::global::shutdown_tracer_provider();
match result { match result {
Ok(_) => Ok(()), Ok(_) => Ok(()),
Err(e) => { Err(e) => {
@ -90,23 +90,6 @@ fn main() -> RepoResult<()> {
} }
} }
fn get_single_thread_runtime() -> Runtime {
log::info!("Using current thread runtime");
runtime::Builder::new_current_thread()
.enable_all()
.max_blocking_threads(1)
.build()
.unwrap()
}
fn get_multi_thread_runtime() -> Runtime {
log::info!("Using multi thread runtime");
runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
}
async fn init_repo(opt: &Opt, paths: &PathSettings) -> RepoResult<Repo> { async fn init_repo(opt: &Opt, paths: &PathSettings) -> RepoResult<Repo> {
let repo = get_repo(&opt.repo, paths).await?; let repo = get_repo(&opt.repo, paths).await?;
@ -244,14 +227,14 @@ async fn init(opt: Opt, force: bool) -> RepoResult<()> {
Ok(()) Ok(())
} }
fn clean_old_connection_files(root: &PathBuf) -> RepoResult<()> { async fn clean_old_connection_files(root: &PathBuf) -> RepoResult<()> {
let paths = ["repo.tcp", "repo.sock"]; let paths = ["repo.tcp", "repo.sock"];
for path in paths { for path in paths {
let path = root.join(path); let path = root.join(path);
if path.exists() { if path.exists() {
std::fs::remove_file(&path)?; tokio::fs::remove_file(&path).await?;
} }
} }

Loading…
Cancel
Save