From 2f11c873951b369f20c74406aba2892f519a95c7 Mon Sep 17 00:00:00 2001
From: trivernis
Date: Sat, 12 Mar 2022 18:32:22 +0100
Subject: [PATCH] Add calculate_sizes implementation as dispatchable job

Signed-off-by: trivernis
---
 mediarepo-daemon/Cargo.lock                   |  1 +
 mediarepo-daemon/mediarepo-socket/Cargo.toml  |  3 +
 .../mediarepo-socket/src/namespaces/jobs.rs   | 35 +++++---
 .../mediarepo-socket/src/namespaces/repo.rs   |  4 +-
 .../mediarepo-socket/src/utils.rs             | 37 ++------
 .../mediarepo-worker/src/job_dispatcher.rs    | 12 +--
 .../src/jobs/calculate_sizes.rs               | 90 +++++++++++++++++++
 .../mediarepo-worker/src/jobs/mod.rs          | 13 ++-
 .../mediarepo-worker/src/jobs/vacuum.rs       | 10 +--
 mediarepo-daemon/mediarepo-worker/src/lib.rs  |  1 +
 .../mediarepo-worker/src/status_utils.rs      | 34 +++++++
 11 files changed, 178 insertions(+), 62 deletions(-)
 create mode 100644 mediarepo-daemon/mediarepo-worker/src/jobs/calculate_sizes.rs
 create mode 100644 mediarepo-daemon/mediarepo-worker/src/status_utils.rs

diff --git a/mediarepo-daemon/Cargo.lock b/mediarepo-daemon/Cargo.lock
index 3f7b0bc..4596f26 100644
--- a/mediarepo-daemon/Cargo.lock
+++ b/mediarepo-daemon/Cargo.lock
@@ -1383,6 +1383,7 @@ dependencies = [
  "mediarepo-core",
  "mediarepo-database",
  "mediarepo-logic",
+ "mediarepo-worker",
  "port_check",
  "rayon",
  "serde",
diff --git a/mediarepo-daemon/mediarepo-socket/Cargo.toml b/mediarepo-daemon/mediarepo-socket/Cargo.toml
index 5f5b75a..b498f81 100644
--- a/mediarepo-daemon/mediarepo-socket/Cargo.toml
+++ b/mediarepo-daemon/mediarepo-socket/Cargo.toml
@@ -22,6 +22,9 @@ path = "../mediarepo-database"
 [dependencies.mediarepo-logic]
 path = "../mediarepo-logic"
 
+[dependencies.mediarepo-worker]
+path = "../mediarepo-worker"
+
 [dependencies.tokio]
 version = "1.17.0"
 features = ["net"]
diff --git a/mediarepo-daemon/mediarepo-socket/src/namespaces/jobs.rs b/mediarepo-daemon/mediarepo-socket/src/namespaces/jobs.rs
index c912b70..0fe150c 100644
--- a/mediarepo-daemon/mediarepo-socket/src/namespaces/jobs.rs
+++ b/mediarepo-daemon/mediarepo-socket/src/namespaces/jobs.rs
@@ -1,11 +1,11 @@
 use mediarepo_core::bromine::prelude::*;
 use mediarepo_core::error::RepoResult;
 use mediarepo_core::mediarepo_api::types::jobs::{JobType, RunJobRequest};
-use mediarepo_core::mediarepo_api::types::repo::SizeType;
-use mediarepo_core::type_keys::SizeMetadataKey;
+use mediarepo_core::type_keys::{RepoPathKey, SettingsKey, SizeMetadataKey};
 use mediarepo_logic::dao::DaoProvider;
+use mediarepo_worker::jobs::{CalculateSizesJob, VacuumJob};
 
-use crate::utils::{calculate_size, get_repo_from_context};
+use crate::utils::{get_job_dispatcher_from_context, get_repo_from_context};
 
 pub struct JobsNamespace;
 
@@ -26,6 +26,7 @@ impl JobsNamespace {
     pub async fn run_job(ctx: &Context, event: Event) -> IPCResult<Response> {
         let run_request = event.payload::<RunJobRequest>()?;
         let job_dao = get_repo_from_context(ctx).await.job();
+        let dispatcher = get_job_dispatcher_from_context(ctx).await;
 
         if !run_request.sync {
             // early response to indicate that the job will be run
@@ -36,7 +37,9 @@ impl JobsNamespace {
             JobType::MigrateContentDescriptors => job_dao.migrate_content_descriptors().await?,
             JobType::CalculateSizes => calculate_all_sizes(ctx).await?,
             JobType::CheckIntegrity => job_dao.check_integrity().await?,
-            JobType::Vacuum => job_dao.vacuum().await?,
+            JobType::Vacuum => {
+                dispatcher.dispatch(VacuumJob::default()).await;
+            }
             JobType::GenerateThumbnails => job_dao.generate_missing_thumbnails().await?,
         }
 
@@ -45,14 +48,22 @@ impl JobsNamespace {
 }
 
 async fn calculate_all_sizes(ctx: &Context) -> RepoResult<()> {
-    let size_types = vec![
-        SizeType::Total,
-        SizeType::FileFolder,
-        SizeType::ThumbFolder,
-        SizeType::DatabaseFile,
-    ];
-    for size_type in size_types {
-        let size = calculate_size(&size_type, ctx).await?;
+    let (repo_path, settings) = {
+        let data = ctx.data.read().await;
+        (
+            data.get::<RepoPathKey>().unwrap().clone(),
+            data.get::<SettingsKey>().unwrap().clone(),
+        )
+    };
+    let job = CalculateSizesJob::new(repo_path, settings);
+    let dispatcher = get_job_dispatcher_from_context(ctx).await;
+    let state = dispatcher.dispatch(job).await;
+    let mut rx = {
+        let state = state.read().await;
+        state.sizes_channel.subscribe()
+    };
+
+    while let Ok((size_type, size)) = rx.recv().await {
         let mut data = ctx.data.write().await;
         let size_map = data.get_mut::<SizeMetadataKey>().unwrap();
         size_map.insert(size_type, size);
diff --git a/mediarepo-daemon/mediarepo-socket/src/namespaces/repo.rs b/mediarepo-daemon/mediarepo-socket/src/namespaces/repo.rs
index 953f0a3..d1516c2 100644
--- a/mediarepo-daemon/mediarepo-socket/src/namespaces/repo.rs
+++ b/mediarepo-daemon/mediarepo-socket/src/namespaces/repo.rs
@@ -8,7 +8,7 @@ use mediarepo_core::mediarepo_api::types::repo::{
 };
 use mediarepo_core::type_keys::{RepoPathKey, SettingsKey, SizeMetadataKey};
 
-use crate::utils::{calculate_size, get_repo_from_context};
+use crate::utils::get_repo_from_context;
 
 pub struct RepoNamespace;
 
@@ -56,7 +56,7 @@ impl RepoNamespace {
         let size = if let Some(size) = size_cache.get(&size_type) {
             *size
         } else {
-            calculate_size(&size_type, ctx).await?
+            0
         };
 
         ctx.response(SizeMetadata { size, size_type })
diff --git a/mediarepo-daemon/mediarepo-socket/src/utils.rs b/mediarepo-daemon/mediarepo-socket/src/utils.rs
index 5e48cdc..e61681a 100644
--- a/mediarepo-daemon/mediarepo-socket/src/utils.rs
+++ b/mediarepo-daemon/mediarepo-socket/src/utils.rs
@@ -1,18 +1,14 @@
 use std::sync::Arc;
 
-use tokio::fs;
-
 use mediarepo_core::bromine::ipc::context::Context;
 use mediarepo_core::content_descriptor::decode_content_descriptor;
 use mediarepo_core::error::{RepoError, RepoResult};
 use mediarepo_core::mediarepo_api::types::identifier::FileIdentifier;
-use mediarepo_core::mediarepo_api::types::repo::SizeType;
-use mediarepo_core::type_keys::{RepoPathKey, SettingsKey};
-use mediarepo_core::utils::get_folder_size;
-use mediarepo_logic::dao::DaoProvider;
 use mediarepo_logic::dao::repo::Repo;
+use mediarepo_logic::dao::DaoProvider;
 use mediarepo_logic::dto::FileDto;
 use mediarepo_logic::type_keys::RepoKey;
+use mediarepo_worker::job_dispatcher::{DispatcherKey, JobDispatcher};
 
 pub async fn get_repo_from_context(ctx: &Context) -> Arc<Repo> {
     let data = ctx.data.read().await;
@@ -20,6 +16,11 @@ pub async fn get_repo_from_context(ctx: &Context) -> Arc<Repo> {
     Arc::clone(repo)
 }
 
+pub async fn get_job_dispatcher_from_context(ctx: &Context) -> JobDispatcher {
+    let data = ctx.data.read().await;
+    data.get::<DispatcherKey>().unwrap().clone()
+}
+
 pub async fn file_by_identifier(identifier: FileIdentifier, repo: &Repo) -> RepoResult<FileDto> {
     let file = match identifier {
         FileIdentifier::ID(id) => repo.file().by_id(id).await,
@@ -41,27 +42,3 @@ pub async fn cd_by_identifier(identifier: FileIdentifier, repo: &Repo) -> RepoRe
         FileIdentifier::CD(cd) => decode_content_descriptor(cd),
     }
 }
-
-pub async fn calculate_size(size_type: &SizeType, ctx: &Context) -> RepoResult<u64> {
-    let repo = get_repo_from_context(ctx).await;
-    let (repo_path, settings) = {
-        let data = ctx.data.read().await;
-        (
-            data.get::<RepoPathKey>().unwrap().clone(),
-            data.get::<SettingsKey>().unwrap().clone(),
-        )
-    };
-    let size = match &size_type {
-        SizeType::Total => get_folder_size(repo_path).await?,
-        SizeType::FileFolder => repo.get_main_store_size().await?,
-        SizeType::ThumbFolder => repo.get_thumb_store_size().await?,
-        SizeType::DatabaseFile => {
-            let db_path = settings.paths.db_file_path(&repo_path);
-
-            let database_metadata = fs::metadata(db_path).await?;
-            database_metadata.len()
-        }
-    };
-
-    Ok(size)
-}
diff --git a/mediarepo-daemon/mediarepo-worker/src/job_dispatcher.rs b/mediarepo-daemon/mediarepo-worker/src/job_dispatcher.rs
index 03f4c61..869ea77 100644
--- a/mediarepo-daemon/mediarepo-worker/src/job_dispatcher.rs
+++ b/mediarepo-daemon/mediarepo-worker/src/job_dispatcher.rs
@@ -25,7 +25,7 @@ impl JobDispatcher {
         }
     }
 
-    pub async fn dispatch<T: 'static + Job>(&self, job: T) -> Arc<RwLock<T::JobStatus>> {
+    pub async fn dispatch<T: 'static + Job>(&self, job: T) -> Arc<RwLock<T::JobState>> {
         self._dispatch(job, None).await
     }
 
@@ -33,7 +33,7 @@ impl JobDispatcher {
         &self,
         job: T,
         interval: Duration,
-    ) -> Arc<RwLock<T::JobStatus>> {
+    ) -> Arc<RwLock<T::JobState>> {
         self._dispatch(job, Some(interval)).await
     }
 
@@ -42,8 +42,8 @@ impl JobDispatcher {
         &self,
         job: T,
         interval: Option<Duration>,
-    ) -> Arc<RwLock<T::JobStatus>> {
-        let status = job.status();
+    ) -> Arc<RwLock<T::JobState>> {
+        let status = job.state();
         self.add_status::<JobTypeKey<T>>(status.clone()).await;
 
         let subsystem = unsafe {
@@ -61,14 +61,14 @@ impl JobDispatcher {
             let job_2 = job.clone();
             let result = tokio::select! {
                 _ = subsystem.on_shutdown_requested() => {
-                    job_2.save_status(repo.job()).await
+                    job_2.save_state(repo.job()).await
                 }
                 r = job.run(repo.clone()) => {
                     if let Err(e) = r {
                         Err(e)
                     } else {
-                        job.save_status(repo.job()).await
+                        job.save_state(repo.job()).await
                     }
                 }
             };
diff --git a/mediarepo-daemon/mediarepo-worker/src/jobs/calculate_sizes.rs b/mediarepo-daemon/mediarepo-worker/src/jobs/calculate_sizes.rs
new file mode 100644
index 0000000..6f13a60
--- /dev/null
+++ b/mediarepo-daemon/mediarepo-worker/src/jobs/calculate_sizes.rs
@@ -0,0 +1,90 @@
+use crate::jobs::Job;
+use crate::status_utils::SimpleProgress;
+use async_trait::async_trait;
+use mediarepo_core::error::{RepoError, RepoResult};
+use mediarepo_core::mediarepo_api::types::repo::SizeType;
+use mediarepo_core::settings::Settings;
+use mediarepo_core::utils::get_folder_size;
+use mediarepo_logic::dao::repo::Repo;
+use std::path::PathBuf;
+use std::sync::Arc;
+use tokio::fs;
+use tokio::sync::broadcast::{self, Sender};
+use tokio::sync::RwLock;
+
+pub struct CalculateSizesState {
+    pub progress: SimpleProgress,
+    pub sizes_channel: Sender<(SizeType, u64)>,
+}
+
+#[derive(Clone)]
+pub struct CalculateSizesJob {
+    repo_path: PathBuf,
+    settings: Settings,
+    state: Arc<RwLock<CalculateSizesState>>,
+}
+
+impl CalculateSizesJob {
+    pub fn new(repo_path: PathBuf, settings: Settings) -> Self {
+        let (tx, _) = broadcast::channel(4);
+        Self {
+            repo_path,
+            settings,
+            state: Arc::new(RwLock::new(CalculateSizesState {
+                sizes_channel: tx,
+                progress: SimpleProgress::new(4),
+            })),
+        }
+    }
+}
+
+#[async_trait]
+impl Job for CalculateSizesJob {
+    type JobState = CalculateSizesState;
+
+    fn state(&self) -> Arc<RwLock<Self::JobState>> {
+        self.state.clone()
+    }
+
+    #[tracing::instrument(level = "debug", skip_all)]
+    async fn run(&self, repo: Arc<Repo>) -> RepoResult<()> {
+        let size_types = vec![
+            SizeType::Total,
+            SizeType::FileFolder,
+            SizeType::ThumbFolder,
+            SizeType::DatabaseFile,
+        ];
+        for size_type in size_types {
+            let size = calculate_size(&size_type, &repo, &self.repo_path, &self.settings).await?;
+            let mut state = self.state.write().await;
+            state
+                .sizes_channel
+                .send((size_type, size))
+                .map_err(|_| RepoError::from("failed to broadcast new size"))?;
+            state.progress.tick();
+        }
+
+        Ok(())
+    }
+}
+
+async fn calculate_size(
+    size_type: &SizeType,
+    repo: &Repo,
+    repo_path: &PathBuf,
+    settings: &Settings,
+) -> RepoResult<u64> {
+    let size = match &size_type {
+        SizeType::Total => get_folder_size(repo_path.clone()).await?,
+        SizeType::FileFolder => repo.get_main_store_size().await?,
+        SizeType::ThumbFolder => repo.get_thumb_store_size().await?,
+        SizeType::DatabaseFile => {
+            let db_path = settings.paths.db_file_path(repo_path);
+
+            let database_metadata = fs::metadata(db_path).await?;
+            database_metadata.len()
+        }
+    };
+
+    Ok(size)
+}
diff --git a/mediarepo-daemon/mediarepo-worker/src/jobs/mod.rs b/mediarepo-daemon/mediarepo-worker/src/jobs/mod.rs
index 8b34449..607ea62 100644
--- a/mediarepo-daemon/mediarepo-worker/src/jobs/mod.rs
+++ b/mediarepo-daemon/mediarepo-worker/src/jobs/mod.rs
@@ -1,5 +1,7 @@
+mod calculate_sizes;
 mod vacuum;
 
+pub use calculate_sizes::*;
 use std::marker::PhantomData;
 use std::sync::Arc;
 pub use vacuum::*;
@@ -15,13 +17,16 @@ type EmptyStatus = Arc<RwLock<()>>;
 
 #[async_trait]
 pub trait Job: Clone + Send + Sync {
-    type JobStatus: Send + Sync;
+    type JobState: Send + Sync;
 
-    fn status(&self) -> Arc<RwLock<Self::JobStatus>>;
+    fn state(&self) -> Arc<RwLock<Self::JobState>>;
 
     async fn run(&self, repo: Arc<Repo>) -> RepoResult<()>;
 
-    async fn save_status(&self, job_dao: JobDao) -> RepoResult<()>;
+    #[tracing::instrument(level = "debug", skip_all)]
+    async fn save_state(&self, _job_dao: JobDao) -> RepoResult<()> {
+        Ok(())
+    }
 }
 
 pub struct JobTypeKey<T: Job>(PhantomData<T>);
@@ -30,5 +35,5 @@ impl<T> TypeMapKey for JobTypeKey<T>
 where
     T: Job,
 {
-    type Value = Arc<RwLock<T::JobStatus>>;
+    type Value = Arc<RwLock<T::JobState>>;
 }
diff --git a/mediarepo-daemon/mediarepo-worker/src/jobs/vacuum.rs b/mediarepo-daemon/mediarepo-worker/src/jobs/vacuum.rs
index 0be7d76..5335de6 100644
--- a/mediarepo-daemon/mediarepo-worker/src/jobs/vacuum.rs
+++ b/mediarepo-daemon/mediarepo-worker/src/jobs/vacuum.rs
@@ -1,7 +1,6 @@
 use crate::jobs::{EmptyStatus, Job};
 use async_trait::async_trait;
 use mediarepo_core::error::RepoResult;
-use mediarepo_logic::dao::job::JobDao;
 use mediarepo_logic::dao::repo::Repo;
 use mediarepo_logic::dao::DaoProvider;
 use std::sync::Arc;
@@ -12,9 +11,9 @@ pub struct VacuumJob;
 
 #[async_trait]
 impl Job for VacuumJob {
-    type JobStatus = ();
+    type JobState = ();
 
-    fn status(&self) -> Arc<RwLock<Self::JobStatus>> {
+    fn state(&self) -> Arc<RwLock<Self::JobState>> {
         EmptyStatus::default()
     }
 
@@ -24,9 +23,4 @@ impl Job for VacuumJob {
 
         Ok(())
     }
-
-    #[tracing::instrument(level = "debug", skip_all)]
-    async fn save_status(&self, _: JobDao) -> RepoResult<()> {
-        Ok(())
-    }
 }
diff --git a/mediarepo-daemon/mediarepo-worker/src/lib.rs b/mediarepo-daemon/mediarepo-worker/src/lib.rs
index a362087..617eb4f 100644
--- a/mediarepo-daemon/mediarepo-worker/src/lib.rs
+++ b/mediarepo-daemon/mediarepo-worker/src/lib.rs
@@ -8,6 +8,7 @@ use tokio::sync::oneshot::channel;
 
 pub mod job_dispatcher;
 pub mod jobs;
+pub mod status_utils;
 
 pub async fn start(top_level: Toplevel, repo: Repo) -> (Toplevel, JobDispatcher) {
     let (tx, rx) = channel();
diff --git a/mediarepo-daemon/mediarepo-worker/src/status_utils.rs b/mediarepo-daemon/mediarepo-worker/src/status_utils.rs
new file mode 100644
index 0000000..2cdafc4
--- /dev/null
+++ b/mediarepo-daemon/mediarepo-worker/src/status_utils.rs
@@ -0,0 +1,34 @@
+pub struct SimpleProgress {
+    pub current: u64,
+    pub total: u64,
+}
+
+impl Default for SimpleProgress {
+    fn default() -> Self {
+        Self {
+            total: 100,
+            current: 0,
+        }
+    }
+}
+
+impl SimpleProgress {
+    pub fn new(total: u64) -> Self {
+        Self { total, current: 0 }
+    }
+
+    /// Increments the current progress by 1
+    pub fn tick(&mut self) {
+        self.current += 1;
+    }
+
+    /// Sets the current progress to a defined value
+    pub fn set_current(&mut self, current: u64) {
+        self.current = current;
+    }
+
+    /// Returns the total progress in percent
+    pub fn percent(&self) -> f64 {
+        (self.current as f64) / (self.total as f64)
+    }
+}