From 22f28a79d105e26040a015abc4538fa75579c511 Mon Sep 17 00:00:00 2001 From: trivernis Date: Wed, 16 Mar 2022 21:40:13 +0100 Subject: [PATCH] Update to latest bromine Signed-off-by: trivernis --- mediarepo-api/Cargo.toml | 3 +- mediarepo-daemon/Cargo.lock | 39 ++++++---- mediarepo-daemon/mediarepo-core/Cargo.toml | 7 +- mediarepo-daemon/mediarepo-core/src/lib.rs | 3 +- .../mediarepo-core/src/type_keys.rs | 2 +- .../mediarepo-core/src/type_list.rs | 46 ----------- .../mediarepo-logic/src/type_keys.rs | 2 +- mediarepo-daemon/mediarepo-socket/src/lib.rs | 6 +- .../mediarepo-socket/src/namespaces/jobs.rs | 1 + .../mediarepo-socket/src/namespaces/repo.rs | 1 + .../mediarepo-socket/src/utils.rs | 1 + .../mediarepo-worker/src/job_dispatcher.rs | 78 +++++++++---------- .../mediarepo-worker/src/jobs/mod.rs | 2 +- mediarepo-daemon/src/main.rs | 32 +++++--- 14 files changed, 97 insertions(+), 126 deletions(-) delete mode 100644 mediarepo-daemon/mediarepo-core/src/type_list.rs diff --git a/mediarepo-api/Cargo.toml b/mediarepo-api/Cargo.toml index 62a9c17..ba7b42b 100644 --- a/mediarepo-api/Cargo.toml +++ b/mediarepo-api/Cargo.toml @@ -20,9 +20,8 @@ url = { version = "2.2.2", optional = true } pathsearch = { version = "0.2.0", optional = true } [dependencies.bromine] -version = "0.18.1" +version = "0.19.0" optional = true -git = "https://github.com/Trivernis/bromine" features = ["serialize_bincode"] [dependencies.serde] diff --git a/mediarepo-daemon/Cargo.lock b/mediarepo-daemon/Cargo.lock index 4596f26..5c4ebb6 100644 --- a/mediarepo-daemon/Cargo.lock +++ b/mediarepo-daemon/Cargo.lock @@ -81,9 +81,9 @@ checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6" [[package]] name = "async-recursion" -version = "0.3.2" +version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7d78656ba01f1b93024b7c3a0467f1608e4be67d725749fdcd7d2c7678fd7a2" +checksum = "2cda8f4bcc10624c4e85bc66b3f452cca98cfa5ca002dc83a16aad2367641bea" dependencies = [ "proc-macro2 1.0.36", "quote 1.0.15", @@ -270,8 +270,9 @@ dependencies = [ [[package]] name = "bromine" -version = "0.18.3" -source = "git+https://github.com/Trivernis/bromine#c2728a44ead210e1535bce5fc2d3979530700b96" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a05cd0cd5646e705df88816dcc36eaf4e21b940cea66f1e027970cd58e3dc897" dependencies = [ "async-trait", "bincode", @@ -283,7 +284,7 @@ dependencies = [ "thiserror", "tokio", "tracing", - "typemap_rev", + "trait-bound-typemap", ] [[package]] @@ -1318,7 +1319,7 @@ dependencies = [ "toml", "tracing", "tracing-subscriber", - "typemap_rev", + "trait-bound-typemap", ] [[package]] @@ -1485,6 +1486,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "multi-trait-object" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a54c9ed2b86c7927b63e7d51f8d7ed4e1f8513c8672828ca1a850ff9d32ab1c" + [[package]] name = "multibase" version = "0.9.1" @@ -2871,16 +2878,16 @@ dependencies = [ [[package]] name = "tokio-graceful-shutdown" -version = "0.4.3" +version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d08ebea7dc6b22273290d8ece2ca448f979f836e38ba629b650595c64204b4f2" +checksum = "4f21c36e43c82d5f32302aff8ac9efb79e10db9538b0940ef69cce38a01614ae" dependencies = [ "anyhow", "async-recursion", "futures 0.3.21", "log", "tokio", - "tokio-util 0.6.9", + "tokio-util 0.7.0", ] [[package]] @@ -3159,17 +3166,21 @@ dependencies = [ "tracing-serde", ] +[[package]] +name = "trait-bound-typemap" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3631df5ba73c0e41b1aa337df3bcca7f15219f042f8fec1100857bc1eb60c767" +dependencies = [ + "multi-trait-object", +] + [[package]] name = "try-lock" version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" -[[package]] -name = "typemap_rev" -version = "0.1.5" -source = "git+https://github.com/Trivernis/typemap_rev?rev=750c67bffe8024d2a47725daa473f068ad653fc4#750c67bffe8024d2a47725daa473f068ad653fc4" - [[package]] name = "typenum" version = "1.15.0" diff --git a/mediarepo-daemon/mediarepo-core/Cargo.toml b/mediarepo-daemon/mediarepo-core/Cargo.toml index d7af792..48d2bbc 100644 --- a/mediarepo-daemon/mediarepo-core/Cargo.toml +++ b/mediarepo-daemon/mediarepo-core/Cargo.toml @@ -18,14 +18,11 @@ itertools = "0.10.3" glob = "0.3.0" tracing = "0.1.32" data-encoding = "2.3.2" -tokio-graceful-shutdown = "0.4.3" +tokio-graceful-shutdown = "0.4.4" thumbnailer = "0.4.0" bincode = "1.3.3" tracing-subscriber = "0.3.9" - -[dependencies.typemap_rev] -git = "https://github.com/Trivernis/typemap_rev" -rev = "750c67bffe8024d2a47725daa473f068ad653fc4" +trait-bound-typemap = "0.3.3" [dependencies.sea-orm] version = "0.6.0" diff --git a/mediarepo-daemon/mediarepo-core/src/lib.rs b/mediarepo-daemon/mediarepo-core/src/lib.rs index d98957f..1edc42d 100644 --- a/mediarepo-daemon/mediarepo-core/src/lib.rs +++ b/mediarepo-daemon/mediarepo-core/src/lib.rs @@ -5,7 +5,7 @@ pub use mediarepo_api; pub use mediarepo_api::bromine; pub use thumbnailer; pub use tokio_graceful_shutdown; -pub use typemap_rev; +pub use trait_bound_typemap; pub mod content_descriptor; pub mod context; @@ -14,5 +14,4 @@ pub mod fs; pub mod settings; pub mod tracing_layer_list; pub mod type_keys; -pub mod type_list; pub mod utils; diff --git a/mediarepo-daemon/mediarepo-core/src/type_keys.rs b/mediarepo-daemon/mediarepo-core/src/type_keys.rs index 8cebd2c..778bcb8 100644 --- a/mediarepo-daemon/mediarepo-core/src/type_keys.rs +++ b/mediarepo-daemon/mediarepo-core/src/type_keys.rs @@ -3,7 +3,7 @@ use std::path::PathBuf; use mediarepo_api::types::repo::SizeType; use tokio_graceful_shutdown::SubsystemHandle; -use typemap_rev::TypeMapKey; +use trait_bound_typemap::TypeMapKey; use crate::settings::Settings; diff --git a/mediarepo-daemon/mediarepo-core/src/type_list.rs b/mediarepo-daemon/mediarepo-core/src/type_list.rs deleted file mode 100644 index fbe913f..0000000 --- a/mediarepo-daemon/mediarepo-core/src/type_list.rs +++ /dev/null @@ -1,46 +0,0 @@ -use std::any::{Any, TypeId}; -use std::vec::IntoIter; -use typemap_rev::TypeMapKey; - -pub trait CloneAny: Any + Send + Sync { - fn clone_any(&self) -> Box; -} - -impl CloneAny for T { - fn clone_any(&self) -> Box { - Box::new(self.clone()) - } -} - -impl Clone for Box { - fn clone(&self) -> Self { - (**self).clone_any() - } -} - -#[derive(Default, Clone)] -pub struct TypeList(Vec<(TypeId, Box)>); - -impl TypeList { - pub fn add, C: CloneAny>(&mut self, value: T::Value) { - self.0.push((TypeId::of::(), Box::new(value))) - } -} - -impl IntoIterator for TypeList { - type Item = (TypeId, Box); - type IntoIter = IntoIter; - - fn into_iter(self) -> Self::IntoIter { - self.0 - .into_iter() - .map(|(t, v)| { - (t, unsafe { - // SAFETY: CloneAny requires types to be Any + Send + Sync (+ Clone) - std::mem::transmute::, Box>(v) - }) - }) - .collect::)>>() - .into_iter() - } -} diff --git a/mediarepo-daemon/mediarepo-logic/src/type_keys.rs b/mediarepo-daemon/mediarepo-logic/src/type_keys.rs index 649405e..b19a866 100644 --- a/mediarepo-daemon/mediarepo-logic/src/type_keys.rs +++ b/mediarepo-daemon/mediarepo-logic/src/type_keys.rs @@ -1,7 +1,7 @@ +use mediarepo_core::trait_bound_typemap::TypeMapKey; use std::sync::Arc; use crate::dao::repo::Repo; -use mediarepo_core::typemap_rev::TypeMapKey; pub struct RepoKey; diff --git a/mediarepo-daemon/mediarepo-socket/src/lib.rs b/mediarepo-daemon/mediarepo-socket/src/lib.rs index de4847a..5818419 100644 --- a/mediarepo-daemon/mediarepo-socket/src/lib.rs +++ b/mediarepo-daemon/mediarepo-socket/src/lib.rs @@ -8,8 +8,8 @@ use mediarepo_core::error::{RepoError, RepoResult}; use mediarepo_core::mediarepo_api::types::misc::InfoResponse; use mediarepo_core::settings::{PortSetting, Settings}; use mediarepo_core::tokio_graceful_shutdown::SubsystemHandle; +use mediarepo_core::trait_bound_typemap::{SendSyncTypeMap, TypeMap}; use mediarepo_core::type_keys::{SizeMetadataKey, SubsystemKey}; -use mediarepo_core::type_list::TypeList; mod from_model; mod namespaces; @@ -19,7 +19,7 @@ mod utils; pub fn start_tcp_server( subsystem: SubsystemHandle, settings: Settings, - shared_data: TypeList, + shared_data: SendSyncTypeMap, ) -> RepoResult<(String, JoinHandle<()>)> { let port = match &settings.server.tcp.port { PortSetting::Fixed(p) => { @@ -56,7 +56,7 @@ pub fn start_tcp_server( pub fn create_unix_socket( subsystem: SubsystemHandle, path: std::path::PathBuf, - shared_data: TypeList, + shared_data: SendSyncTypeMap, ) -> RepoResult> { use std::fs; use tokio::net::UnixListener; diff --git a/mediarepo-daemon/mediarepo-socket/src/namespaces/jobs.rs b/mediarepo-daemon/mediarepo-socket/src/namespaces/jobs.rs index 0fe150c..998d279 100644 --- a/mediarepo-daemon/mediarepo-socket/src/namespaces/jobs.rs +++ b/mediarepo-daemon/mediarepo-socket/src/namespaces/jobs.rs @@ -1,3 +1,4 @@ +use crate::TypeMap; use mediarepo_core::bromine::prelude::*; use mediarepo_core::error::RepoResult; use mediarepo_core::mediarepo_api::types::jobs::{JobType, RunJobRequest}; diff --git a/mediarepo-daemon/mediarepo-socket/src/namespaces/repo.rs b/mediarepo-daemon/mediarepo-socket/src/namespaces/repo.rs index d1516c2..05d605f 100644 --- a/mediarepo-daemon/mediarepo-socket/src/namespaces/repo.rs +++ b/mediarepo-daemon/mediarepo-socket/src/namespaces/repo.rs @@ -2,6 +2,7 @@ use std::path::PathBuf; use tokio::fs; +use crate::TypeMap; use mediarepo_core::bromine::prelude::*; use mediarepo_core::mediarepo_api::types::repo::{ FrontendState, RepositoryMetadata, SizeMetadata, SizeType, diff --git a/mediarepo-daemon/mediarepo-socket/src/utils.rs b/mediarepo-daemon/mediarepo-socket/src/utils.rs index e61681a..2c3fc57 100644 --- a/mediarepo-daemon/mediarepo-socket/src/utils.rs +++ b/mediarepo-daemon/mediarepo-socket/src/utils.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use crate::TypeMap; use mediarepo_core::bromine::ipc::context::Context; use mediarepo_core::content_descriptor::decode_content_descriptor; use mediarepo_core::error::{RepoError, RepoResult}; diff --git a/mediarepo-daemon/mediarepo-worker/src/job_dispatcher.rs b/mediarepo-daemon/mediarepo-worker/src/job_dispatcher.rs index 869ea77..5d17e05 100644 --- a/mediarepo-daemon/mediarepo-worker/src/job_dispatcher.rs +++ b/mediarepo-daemon/mediarepo-worker/src/job_dispatcher.rs @@ -1,9 +1,8 @@ use crate::jobs::{Job, JobTypeKey}; use mediarepo_core::tokio_graceful_shutdown::SubsystemHandle; -use mediarepo_core::typemap_rev::{TypeMap, TypeMapKey}; +use mediarepo_core::trait_bound_typemap::{SendSyncTypeMap, TypeMap, TypeMapKey}; use mediarepo_logic::dao::repo::Repo; use mediarepo_logic::dao::DaoProvider; -use std::cell::UnsafeCell; use std::sync::Arc; use std::time::Duration; use tokio::sync::RwLock; @@ -11,16 +10,16 @@ use tokio::time::Instant; #[derive(Clone)] pub struct JobDispatcher { - subsystem: Arc>, - job_status_map: Arc>, + subsystem: SubsystemHandle, + job_status_map: Arc>, repo: Arc, } impl JobDispatcher { pub fn new(subsystem: SubsystemHandle, repo: Repo) -> Self { Self { - job_status_map: Default::default(), - subsystem: Arc::new(UnsafeCell::new(subsystem)), + job_status_map: Arc::new(RwLock::new(SendSyncTypeMap::new())), + subsystem, repo: Arc::new(repo), } } @@ -46,54 +45,51 @@ impl JobDispatcher { let status = job.state(); self.add_status::>(status.clone()).await; - let subsystem = unsafe { - // SAFETY: the subsystem requires a mutable borrow for the start method - // the implementation of start doesn't need that mutability. So until that's - // changed we have to do some trickery. - &mut *self.subsystem.get() - }; - let repo = self.repo.clone(); - subsystem.start("worker-job", move |subsystem| async move { - loop { - let start = Instant::now(); - let job_2 = job.clone(); - let result = tokio::select! { - _ = subsystem.on_shutdown_requested() => { - job_2.save_state(repo.job()).await - } - r = job.run(repo.clone()) => { + self.subsystem + .start("worker-job", move |subsystem| async move { + loop { + let start = Instant::now(); + let job_2 = job.clone(); + let result = tokio::select! { + _ = subsystem.on_shutdown_requested() => { + job_2.save_state(repo.job()).await + } + r = job.run(repo.clone()) => { - if let Err(e) = r { - Err(e) - } else { - job.save_state(repo.job()).await + if let Err(e) = r { + Err(e) + } else { + job.save_state(repo.job()).await + } } + }; + if let Err(e) = result { + tracing::error!("job failed with error: {}", e); } - }; - if let Err(e) = result { - tracing::error!("job failed with error: {}", e); - } - if let Some(interval) = interval { - let sleep_duration = interval - start.elapsed(); - tokio::select! { - _ = tokio::time::sleep(sleep_duration) => {}, - _ = subsystem.on_shutdown_requested() => {break} + if let Some(interval) = interval { + let sleep_duration = interval - start.elapsed(); + tokio::select! { + _ = tokio::time::sleep(sleep_duration) => {}, + _ = subsystem.on_shutdown_requested() => {break} + } + } else { + break; } - } else { - break; } - } - Ok(()) - }); + Ok(()) + }); status } #[inline] - async fn add_status(&self, status: T::Value) { + async fn add_status(&self, status: T::Value) + where + ::Value: Send + Sync, + { let mut status_map = self.job_status_map.write().await; status_map.insert::(status); } diff --git a/mediarepo-daemon/mediarepo-worker/src/jobs/mod.rs b/mediarepo-daemon/mediarepo-worker/src/jobs/mod.rs index 607ea62..afb8075 100644 --- a/mediarepo-daemon/mediarepo-worker/src/jobs/mod.rs +++ b/mediarepo-daemon/mediarepo-worker/src/jobs/mod.rs @@ -8,7 +8,7 @@ pub use vacuum::*; use async_trait::async_trait; use mediarepo_core::error::RepoResult; -use mediarepo_core::typemap_rev::TypeMapKey; +use mediarepo_core::trait_bound_typemap::TypeMapKey; use mediarepo_logic::dao::job::JobDao; use mediarepo_logic::dao::repo::Repo; use tokio::sync::RwLock; diff --git a/mediarepo-daemon/src/main.rs b/mediarepo-daemon/src/main.rs index dc4b950..d28a49c 100644 --- a/mediarepo-daemon/src/main.rs +++ b/mediarepo-daemon/src/main.rs @@ -1,4 +1,5 @@ use std::env; +use std::iter::FromIterator; use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; @@ -11,8 +12,8 @@ use mediarepo_core::error::RepoResult; use mediarepo_core::fs::drop_file::DropFile; use mediarepo_core::settings::{PathSettings, Settings}; use mediarepo_core::tokio_graceful_shutdown::{SubsystemHandle, Toplevel}; +use mediarepo_core::trait_bound_typemap::{CloneSendSyncTypeMap, SendSyncTypeMap, TypeMap}; use mediarepo_core::type_keys::{RepoPathKey, SettingsKey}; -use mediarepo_core::type_list::TypeList; use mediarepo_logic::dao::repo::Repo; use mediarepo_logic::type_keys::RepoKey; use mediarepo_socket::start_tcp_server; @@ -106,11 +107,11 @@ async fn start_server(opt: Opt, settings: Settings) -> RepoResult<()> { let repo = init_repo(&opt, &settings.paths).await?; let (mut top_level, dispatcher) = mediarepo_worker::start(Toplevel::new(), repo.clone()).await; - let mut shared_data = TypeList::default(); - shared_data.add::(Arc::new(repo)); - shared_data.add::(settings.clone()); - shared_data.add::(opt.repo.clone()); - shared_data.add::(dispatcher); + let mut shared_data = CloneSendSyncTypeMap::new(); + shared_data.insert::(Arc::new(repo)); + shared_data.insert::(settings.clone()); + shared_data.insert::(opt.repo.clone()); + shared_data.insert::(dispatcher); #[cfg(unix)] { @@ -120,7 +121,12 @@ async fn start_server(opt: Opt, settings: Settings) -> RepoResult<()> { top_level = top_level.start("mediarepo-unix-socket", |subsystem| { Box::pin(async move { - start_and_await_unix_socket(subsystem, repo_path, shared_data).await?; + start_and_await_unix_socket( + subsystem, + repo_path, + SendSyncTypeMap::from_iter(shared_data), + ) + .await?; Ok(()) }) }) @@ -130,7 +136,13 @@ async fn start_server(opt: Opt, settings: Settings) -> RepoResult<()> { if settings.server.tcp.enabled { top_level = top_level.start("mediarepo-tcp", move |subsystem| { Box::pin(async move { - start_and_await_tcp_server(subsystem, opt.repo, settings, shared_data).await?; + start_and_await_tcp_server( + subsystem, + opt.repo, + settings, + SendSyncTypeMap::from_iter(shared_data), + ) + .await?; Ok(()) }) @@ -157,7 +169,7 @@ async fn start_and_await_tcp_server( subsystem: SubsystemHandle, repo_path: PathBuf, settings: Settings, - shared_data: TypeList, + shared_data: SendSyncTypeMap, ) -> RepoResult<()> { let (address, handle) = start_tcp_server(subsystem.clone(), settings, shared_data)?; let (mut file, _guard) = DropFile::new(repo_path.join("repo.tcp")).await?; @@ -182,7 +194,7 @@ async fn start_and_await_tcp_server( async fn start_and_await_unix_socket( subsystem: SubsystemHandle, repo_path: PathBuf, - shared_data: TypeList, + shared_data: SendSyncTypeMap, ) -> RepoResult<()> { let socket_path = repo_path.join("repo.sock"); let handle = mediarepo_socket::create_unix_socket(subsystem.clone(), socket_path, shared_data)?;