Add calculate_sizes implementation as dispatchable job

Signed-off-by: trivernis <trivernis@protonmail.com>
Branch: feature/jobs
Author: trivernis, 3 years ago
Parent: a57a6f32c4
Commit: 2f11c87395
Signed by: Trivernis (GPG Key ID: DFFFCC2C7A02DB45)

@@ -1383,6 +1383,7 @@ dependencies = [
  "mediarepo-core",
  "mediarepo-database",
  "mediarepo-logic",
+ "mediarepo-worker",
  "port_check",
  "rayon",
  "serde",

@@ -22,6 +22,9 @@ path = "../mediarepo-database"
 [dependencies.mediarepo-logic]
 path = "../mediarepo-logic"
 
+[dependencies.mediarepo-worker]
+path = "../mediarepo-worker"
+
 [dependencies.tokio]
 version = "1.17.0"
 features = ["net"]

@@ -1,11 +1,11 @@
 use mediarepo_core::bromine::prelude::*;
 use mediarepo_core::error::RepoResult;
 use mediarepo_core::mediarepo_api::types::jobs::{JobType, RunJobRequest};
-use mediarepo_core::mediarepo_api::types::repo::SizeType;
-use mediarepo_core::type_keys::SizeMetadataKey;
+use mediarepo_core::type_keys::{RepoPathKey, SettingsKey, SizeMetadataKey};
 use mediarepo_logic::dao::DaoProvider;
+use mediarepo_worker::jobs::{CalculateSizesJob, VacuumJob};
 
-use crate::utils::{calculate_size, get_repo_from_context};
+use crate::utils::{get_job_dispatcher_from_context, get_repo_from_context};
 
 pub struct JobsNamespace;
 
@@ -26,6 +26,7 @@ impl JobsNamespace {
     pub async fn run_job(ctx: &Context, event: Event) -> IPCResult<Response> {
         let run_request = event.payload::<RunJobRequest>()?;
         let job_dao = get_repo_from_context(ctx).await.job();
+        let dispatcher = get_job_dispatcher_from_context(ctx).await;
 
         if !run_request.sync {
             // early response to indicate that the job will be run
@@ -36,7 +37,9 @@ impl JobsNamespace {
             JobType::MigrateContentDescriptors => job_dao.migrate_content_descriptors().await?,
             JobType::CalculateSizes => calculate_all_sizes(ctx).await?,
             JobType::CheckIntegrity => job_dao.check_integrity().await?,
-            JobType::Vacuum => job_dao.vacuum().await?,
+            JobType::Vacuum => {
+                dispatcher.dispatch(VacuumJob::default()).await;
+            }
             JobType::GenerateThumbnails => job_dao.generate_missing_thumbnails().await?,
         }
 
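The vacuum job is now fire-and-forget: `dispatch` hands back the job's shared state handle immediately instead of blocking `run_job` until the vacuum finishes. A minimal sketch of that pattern using only tokio (the real `JobDispatcher` additionally registers the handle in the context's type map and ties the task to a shutdown subsystem):

    use std::sync::Arc;
    use tokio::sync::RwLock;

    // Sketch of the dispatch pattern: the caller immediately receives a
    // shared handle to the job's state while the job body runs in a
    // background task.
    async fn dispatch_sketch() -> Arc<RwLock<&'static str>> {
        let state = Arc::new(RwLock::new("pending"));
        let task_state = Arc::clone(&state);
        tokio::spawn(async move {
            // stand-in for the actual vacuum work
            *task_state.write().await = "done";
        });
        state // returned without waiting for the job to finish
    }

    #[tokio::main]
    async fn main() {
        let state = dispatch_sketch().await;
        println!("job state: {}", *state.read().await);
    }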
@@ -45,14 +48,22 @@ impl JobsNamespace {
 }
 
 async fn calculate_all_sizes(ctx: &Context) -> RepoResult<()> {
-    let size_types = vec![
-        SizeType::Total,
-        SizeType::FileFolder,
-        SizeType::ThumbFolder,
-        SizeType::DatabaseFile,
-    ];
-    for size_type in size_types {
-        let size = calculate_size(&size_type, ctx).await?;
+    let (repo_path, settings) = {
+        let data = ctx.data.read().await;
+        (
+            data.get::<RepoPathKey>().unwrap().clone(),
+            data.get::<SettingsKey>().unwrap().clone(),
+        )
+    };
+    let job = CalculateSizesJob::new(repo_path, settings);
+    let dispatcher = get_job_dispatcher_from_context(ctx).await;
+    let state = dispatcher.dispatch(job).await;
+    let mut rx = {
+        let state = state.read().await;
+        state.sizes_channel.subscribe()
+    };
+
+    while let Ok((size_type, size)) = rx.recv().await {
         let mut data = ctx.data.write().await;
         let size_map = data.get_mut::<SizeMetadataKey>().unwrap();
         size_map.insert(size_type, size);
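`calculate_all_sizes` now just forwards results: it dispatches the job, subscribes to its broadcast channel, and copies each `(SizeType, u64)` pair into the size cache as it arrives. The loop terminates because `rx.recv()` returns `Err(RecvError::Closed)` once the job finishes and the sending half is dropped. A self-contained sketch of this subscribe-and-drain flow:

    use tokio::sync::broadcast;

    // Standalone version of the subscribe-then-receive flow above. The
    // receive loop ends with Err(RecvError::Closed) once the sender drops,
    // which is what terminates the `while let Ok(..)` loop.
    #[tokio::main]
    async fn main() {
        let (tx, _) = broadcast::channel::<(&'static str, u64)>(4);
        let mut rx = tx.subscribe();

        tokio::spawn(async move {
            for (name, size) in [("Total", 42u64), ("FileFolder", 17)] {
                // send only fails if no receiver is subscribed
                let _ = tx.send((name, size));
            }
            // tx dropped here => receivers see RecvError::Closed
        });

        while let Ok((size_type, size)) = rx.recv().await {
            println!("{size_type}: {size}");
        }
    }

One caveat: the real code subscribes only after `dispatch` returns, so a value broadcast before the subscription exists would be lost (and the job would see the send fail). Subscribing before dispatching, as in the sketch, avoids that race.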

@@ -8,7 +8,7 @@ use mediarepo_core::mediarepo_api::types::repo::{
 };
 use mediarepo_core::type_keys::{RepoPathKey, SettingsKey, SizeMetadataKey};
 
-use crate::utils::{calculate_size, get_repo_from_context};
+use crate::utils::get_repo_from_context;
 
 pub struct RepoNamespace;
 
@@ -56,7 +56,7 @@ impl RepoNamespace {
         let size = if let Some(size) = size_cache.get(&size_type) {
             *size
         } else {
-            calculate_size(&size_type, ctx).await?
+            0
         };
 
         ctx.response(SizeMetadata { size, size_type })
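With `calculate_size` gone from the socket crate, a cache miss no longer computes anything inline: the size query handler reports 0 until the CalculateSizes job has populated `SizeMetadataKey`. Distilled into a standalone sketch, with a plain `HashMap` standing in for the typed cache:

    use std::collections::HashMap;

    // Hypothetical distilled form of the lookup above: a miss now reports 0
    // instead of computing the size inline; the CalculateSizes job is
    // responsible for filling the cache.
    fn cached_size(size_cache: &HashMap<String, u64>, size_type: &str) -> u64 {
        size_cache.get(size_type).copied().unwrap_or(0)
    }

    fn main() {
        let cache = HashMap::from([("Total".to_string(), 42u64)]);
        assert_eq!(cached_size(&cache, "Total"), 42);
        assert_eq!(cached_size(&cache, "ThumbFolder"), 0); // not yet calculated
    }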

@@ -1,18 +1,14 @@
 use std::sync::Arc;
 
-use tokio::fs;
-
 use mediarepo_core::bromine::ipc::context::Context;
 use mediarepo_core::content_descriptor::decode_content_descriptor;
 use mediarepo_core::error::{RepoError, RepoResult};
 use mediarepo_core::mediarepo_api::types::identifier::FileIdentifier;
-use mediarepo_core::mediarepo_api::types::repo::SizeType;
-use mediarepo_core::type_keys::{RepoPathKey, SettingsKey};
-use mediarepo_core::utils::get_folder_size;
-use mediarepo_logic::dao::DaoProvider;
 use mediarepo_logic::dao::repo::Repo;
+use mediarepo_logic::dao::DaoProvider;
 use mediarepo_logic::dto::FileDto;
 use mediarepo_logic::type_keys::RepoKey;
+use mediarepo_worker::job_dispatcher::{DispatcherKey, JobDispatcher};
 
 pub async fn get_repo_from_context(ctx: &Context) -> Arc<Repo> {
     let data = ctx.data.read().await;
@@ -20,6 +16,11 @@ pub async fn get_repo_from_context(ctx: &Context) -> Arc<Repo> {
     Arc::clone(repo)
 }
 
+pub async fn get_job_dispatcher_from_context(ctx: &Context) -> JobDispatcher {
+    let data = ctx.data.read().await;
+    data.get::<DispatcherKey>().unwrap().clone()
+}
+
 pub async fn file_by_identifier(identifier: FileIdentifier, repo: &Repo) -> RepoResult<FileDto> {
     let file = match identifier {
         FileIdentifier::ID(id) => repo.file().by_id(id).await,
@@ -41,27 +42,3 @@ pub async fn cd_by_identifier(identifier: FileIdentifier, repo: &Repo) -> RepoRe
         FileIdentifier::CD(cd) => decode_content_descriptor(cd),
     }
 }
-
-pub async fn calculate_size(size_type: &SizeType, ctx: &Context) -> RepoResult<u64> {
-    let repo = get_repo_from_context(ctx).await;
-    let (repo_path, settings) = {
-        let data = ctx.data.read().await;
-        (
-            data.get::<RepoPathKey>().unwrap().clone(),
-            data.get::<SettingsKey>().unwrap().clone(),
-        )
-    };
-
-    let size = match &size_type {
-        SizeType::Total => get_folder_size(repo_path).await?,
-        SizeType::FileFolder => repo.get_main_store_size().await?,
-        SizeType::ThumbFolder => repo.get_thumb_store_size().await?,
-        SizeType::DatabaseFile => {
-            let db_path = settings.paths.db_file_path(&repo_path);
-            let database_metadata = fs::metadata(db_path).await?;
-            database_metadata.len()
-        }
-    };
-
-    Ok(size)
-}
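Both context helpers follow the same pattern: read-lock the context's type map and clone out a value keyed by a marker type. For reference, a rough sketch of how such a type map works; bromine's actual implementation may differ, and `String` stands in for the real `JobDispatcher`:

    use std::any::{Any, TypeId};
    use std::collections::HashMap;

    // Sketch of the TypeMap pattern behind `ctx.data.get::<DispatcherKey>()`:
    // values are stored keyed by a marker type and retrieved by downcasting.
    struct TypeMap(HashMap<TypeId, Box<dyn Any + Send + Sync>>);

    trait TypeMapKey {
        type Value: Send + Sync + 'static;
    }

    impl TypeMap {
        fn insert<K: TypeMapKey + 'static>(&mut self, value: K::Value) {
            self.0.insert(TypeId::of::<K>(), Box::new(value));
        }
        fn get<K: TypeMapKey + 'static>(&self) -> Option<&K::Value> {
            self.0.get(&TypeId::of::<K>())?.downcast_ref()
        }
    }

    struct DispatcherKey;
    impl TypeMapKey for DispatcherKey {
        type Value = String; // stand-in for the real JobDispatcher
    }

    fn main() {
        let mut map = TypeMap(HashMap::new());
        map.insert::<DispatcherKey>("dispatcher".into());
        assert_eq!(map.get::<DispatcherKey>().unwrap(), "dispatcher");
    }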

@@ -25,7 +25,7 @@ impl JobDispatcher {
         }
     }
 
-    pub async fn dispatch<T: 'static + Job>(&self, job: T) -> Arc<RwLock<T::JobStatus>> {
+    pub async fn dispatch<T: 'static + Job>(&self, job: T) -> Arc<RwLock<T::JobState>> {
         self._dispatch(job, None).await
     }
 
@@ -33,7 +33,7 @@ impl JobDispatcher {
         &self,
         job: T,
         interval: Duration,
-    ) -> Arc<RwLock<T::JobStatus>> {
+    ) -> Arc<RwLock<T::JobState>> {
         self._dispatch(job, Some(interval)).await
     }
@@ -42,8 +42,8 @@ impl JobDispatcher {
         &self,
         job: T,
         interval: Option<Duration>,
-    ) -> Arc<RwLock<T::JobStatus>> {
-        let status = job.status();
+    ) -> Arc<RwLock<T::JobState>> {
+        let status = job.state();
         self.add_status::<JobTypeKey<T>>(status.clone()).await;
 
         let subsystem = unsafe {
@@ -61,14 +61,14 @@ impl JobDispatcher {
 
         let job_2 = job.clone();
         let result = tokio::select! {
             _ = subsystem.on_shutdown_requested() => {
-                job_2.save_status(repo.job()).await
+                job_2.save_state(repo.job()).await
             }
             r = job.run(repo.clone()) => {
                 if let Err(e) = r {
                     Err(e)
                 } else {
-                    job.save_status(repo.job()).await
+                    job.save_state(repo.job()).await
                 }
             }
         };
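The `select!` block races the job against a shutdown request, and both arms funnel into `save_state`, so state is persisted exactly once whether the job completes or is interrupted. An isolated, runnable version of that control flow, with a `oneshot` channel standing in for `subsystem.on_shutdown_requested()`:

    use std::time::Duration;
    use tokio::time::sleep;

    // Sketch of the select! pattern in `_dispatch`: the job races against a
    // shutdown signal, and whichever side wins, the state is saved once.
    #[tokio::main]
    async fn main() {
        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();

        tokio::spawn(async move {
            sleep(Duration::from_millis(10)).await;
            let _ = shutdown_tx.send(());
        });

        let result: Result<&str, &str> = tokio::select! {
            _ = shutdown_rx => {
                // interrupted: save whatever state the job accumulated
                Ok("state saved on shutdown")
            }
            _ = sleep(Duration::from_millis(50)) => {
                // job ran to completion: save its final state
                Ok("state saved on completion")
            }
        };
        println!("{result:?}");
    }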

@@ -0,0 +1,90 @@
+use crate::jobs::Job;
+use crate::status_utils::SimpleProgress;
+use async_trait::async_trait;
+use mediarepo_core::error::{RepoError, RepoResult};
+use mediarepo_core::mediarepo_api::types::repo::SizeType;
+use mediarepo_core::settings::Settings;
+use mediarepo_core::utils::get_folder_size;
+use mediarepo_logic::dao::repo::Repo;
+use std::path::PathBuf;
+use std::sync::Arc;
+use tokio::fs;
+use tokio::sync::broadcast::{self, Sender};
+use tokio::sync::RwLock;
+
+pub struct CalculateSizesState {
+    pub progress: SimpleProgress,
+    pub sizes_channel: Sender<(SizeType, u64)>,
+}
+
+#[derive(Clone)]
+pub struct CalculateSizesJob {
+    repo_path: PathBuf,
+    settings: Settings,
+    state: Arc<RwLock<CalculateSizesState>>,
+}
+
+impl CalculateSizesJob {
+    pub fn new(repo_path: PathBuf, settings: Settings) -> Self {
+        let (tx, _) = broadcast::channel(4);
+        Self {
+            repo_path,
+            settings,
+            state: Arc::new(RwLock::new(CalculateSizesState {
+                sizes_channel: tx,
+                progress: SimpleProgress::new(4),
+            })),
+        }
+    }
+}
+
+#[async_trait]
+impl Job for CalculateSizesJob {
+    type JobState = CalculateSizesState;
+
+    fn state(&self) -> Arc<RwLock<Self::JobState>> {
+        self.state.clone()
+    }
+
+    #[tracing::instrument(level = "debug", skip_all)]
+    async fn run(&self, repo: Arc<Repo>) -> RepoResult<()> {
+        let size_types = vec![
+            SizeType::Total,
+            SizeType::FileFolder,
+            SizeType::ThumbFolder,
+            SizeType::DatabaseFile,
+        ];
+        for size_type in size_types {
+            let size = calculate_size(&size_type, &repo, &self.repo_path, &self.settings).await?;
+            let mut state = self.state.write().await;
+            state
+                .sizes_channel
+                .send((size_type, size))
+                .map_err(|_| RepoError::from("failed to broadcast new size"))?;
+            state.progress.tick();
+        }
+
+        Ok(())
+    }
+}
+
+async fn calculate_size(
+    size_type: &SizeType,
+    repo: &Repo,
+    repo_path: &PathBuf,
+    settings: &Settings,
+) -> RepoResult<u64> {
+    let size = match &size_type {
+        SizeType::Total => get_folder_size(repo_path.clone()).await?,
+        SizeType::FileFolder => repo.get_main_store_size().await?,
+        SizeType::ThumbFolder => repo.get_thumb_store_size().await?,
+        SizeType::DatabaseFile => {
+            let db_path = settings.paths.db_file_path(repo_path);
+            let database_metadata = fs::metadata(db_path).await?;
+            database_metadata.len()
+        }
+    };
+
+    Ok(size)
+}
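The channel capacity of 4 matches the four `SizeType` variants, so a receiver that subscribes before the job finishes can never fall far enough behind to lose a value. For contrast, this sketch shows what a too-small buffer would do: tokio's broadcast drops the oldest messages and surfaces the gap as `RecvError::Lagged`:

    use tokio::sync::broadcast;

    // Demonstrates broadcast overflow behavior with a deliberately small
    // buffer; the job above avoids this by sizing the channel to the number
    // of size types it sends.
    #[tokio::main]
    async fn main() {
        let (tx, mut rx) = broadcast::channel::<u64>(2);
        for size in [1, 2, 3, 4] {
            tx.send(size).unwrap();
        }
        // capacity 2: values 1 and 2 were overwritten before being read
        assert!(matches!(
            rx.recv().await,
            Err(broadcast::error::RecvError::Lagged(2))
        ));
        assert_eq!(rx.recv().await.unwrap(), 3);
        assert_eq!(rx.recv().await.unwrap(), 4);
    }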

@@ -1,5 +1,7 @@
+mod calculate_sizes;
 mod vacuum;
 
+pub use calculate_sizes::*;
 use std::marker::PhantomData;
 use std::sync::Arc;
 pub use vacuum::*;
@@ -15,13 +17,16 @@ type EmptyStatus = Arc<RwLock<()>>;
 
 #[async_trait]
 pub trait Job: Clone + Send + Sync {
-    type JobStatus: Send + Sync;
+    type JobState: Send + Sync;
 
-    fn status(&self) -> Arc<RwLock<Self::JobStatus>>;
+    fn state(&self) -> Arc<RwLock<Self::JobState>>;
 
     async fn run(&self, repo: Arc<Repo>) -> RepoResult<()>;
 
-    async fn save_status(&self, job_dao: JobDao) -> RepoResult<()>;
+    #[tracing::instrument(level = "debug", skip_all)]
+    async fn save_state(&self, _job_dao: JobDao) -> RepoResult<()> {
+        Ok(())
+    }
 }
 
 pub struct JobTypeKey<T: Job>(PhantomData<T>);
@@ -30,5 +35,5 @@ impl<T: 'static> TypeMapKey for JobTypeKey<T>
 where
     T: Job,
 {
-    type Value = Arc<RwLock<T::JobStatus>>;
+    type Value = Arc<RwLock<T::JobState>>;
 }
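Because `save_state` now has a default no-op body, stateless jobs only implement `state` and `run`; the `VacuumJob` diff below shows exactly that deletion. A compile-ready miniature of the reworked trait, with stand-in types replacing the real `Repo` and with the `JobDao` parameter omitted:

    use async_trait::async_trait;
    use std::sync::Arc;
    use tokio::sync::RwLock;

    // Stand-ins so the sketch compiles on its own; the real types live in
    // mediarepo_logic and mediarepo_core.
    struct Repo;
    type RepoResult<T> = Result<T, String>;

    // Reduced copy of the reworked trait: save_state defaults to a no-op.
    #[async_trait]
    trait Job: Clone + Send + Sync {
        type JobState: Send + Sync;

        fn state(&self) -> Arc<RwLock<Self::JobState>>;

        async fn run(&self, repo: Arc<Repo>) -> RepoResult<()>;

        async fn save_state(&self) -> RepoResult<()> {
            Ok(())
        }
    }

    #[derive(Clone)]
    struct CountingJob {
        state: Arc<RwLock<u64>>,
    }

    #[async_trait]
    impl Job for CountingJob {
        type JobState = u64;

        fn state(&self) -> Arc<RwLock<u64>> {
            self.state.clone()
        }

        async fn run(&self, _repo: Arc<Repo>) -> RepoResult<()> {
            *self.state.write().await += 1;
            Ok(())
        }
        // save_state is inherited: this job has nothing to persist
    }

    #[tokio::main]
    async fn main() {
        let job = CountingJob { state: Arc::new(RwLock::new(0)) };
        job.run(Arc::new(Repo)).await.unwrap();
        job.save_state().await.unwrap();
        assert_eq!(*job.state().read().await, 1);
    }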

@@ -1,7 +1,6 @@
 use crate::jobs::{EmptyStatus, Job};
 use async_trait::async_trait;
 use mediarepo_core::error::RepoResult;
-use mediarepo_logic::dao::job::JobDao;
 use mediarepo_logic::dao::repo::Repo;
 use mediarepo_logic::dao::DaoProvider;
 use std::sync::Arc;
@@ -12,9 +11,9 @@ pub struct VacuumJob;
 
 #[async_trait]
 impl Job for VacuumJob {
-    type JobStatus = ();
+    type JobState = ();
 
-    fn status(&self) -> Arc<RwLock<Self::JobStatus>> {
+    fn state(&self) -> Arc<RwLock<Self::JobState>> {
         EmptyStatus::default()
     }
 
@@ -24,9 +23,4 @@ impl Job for VacuumJob {
 
         Ok(())
     }
-
-    #[tracing::instrument(level = "debug", skip_all)]
-    async fn save_status(&self, _: JobDao) -> RepoResult<()> {
-        Ok(())
-    }
 }

@@ -8,6 +8,7 @@ use tokio::sync::oneshot::channel;
 
 pub mod job_dispatcher;
 pub mod jobs;
+pub mod status_utils;
 
 pub async fn start(top_level: Toplevel, repo: Repo) -> (Toplevel, JobDispatcher) {
     let (tx, rx) = channel();

@@ -0,0 +1,34 @@
+pub struct SimpleProgress {
+    pub current: u64,
+    pub total: u64,
+}
+
+impl Default for SimpleProgress {
+    fn default() -> Self {
+        Self {
+            total: 100,
+            current: 0,
+        }
+    }
+}
+
+impl SimpleProgress {
+    pub fn new(total: u64) -> Self {
+        Self { total, current: 0 }
+    }
+
+    /// Increments the current progress by 1
+    pub fn tick(&mut self) {
+        self.current += 1;
+    }
+
+    /// Sets the current progress to a defined value
+    pub fn set_current(&mut self, current: u64) {
+        self.current = current;
+    }
+
+    /// Returns the progress as a fraction between 0 and 1
+    pub fn percent(&self) -> f64 {
+        (self.current as f64) / (self.total as f64)
+    }
+}
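Driven the way `CalculateSizesJob::run` drives it, one `tick` per completed size type, the progress reads back as a 0-to-1 fraction despite the method's name; multiply by 100 when displaying it as a percentage. Assuming `SimpleProgress` from the file above is in scope:

    fn main() {
        let mut progress = SimpleProgress::new(4);
        for completed in 1..=4 {
            progress.tick();
            // percent() yields a fraction; scale it for display
            println!("{}/4 done: {:.0}%", completed, progress.percent() * 100.0);
        }
        assert_eq!(progress.percent(), 1.0);
    }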