From 0eda0d2c2226f4a5d70e7c287beaaf31a141a09f Mon Sep 17 00:00:00 2001 From: trivernis Date: Sun, 27 Mar 2022 18:01:43 +0200 Subject: [PATCH] Improve job state loading and storing Signed-off-by: trivernis --- .../src/entities/job_state.rs | 2 +- .../mediarepo-logic/src/dao/job/state.rs | 16 +++-- .../mediarepo-socket/src/namespaces/jobs.rs | 25 +++++-- .../mediarepo-worker/src/handle.rs | 13 ++-- .../mediarepo-worker/src/job_dispatcher.rs | 21 ++++-- .../src/jobs/generate_missing_thumbnails.rs | 64 +++++++++++++++++- .../src/jobs/migrate_content_descriptors.rs | 65 +++++++++++++++++++ .../mediarepo-worker/src/jobs/mod.rs | 30 ++++++++- mediarepo-daemon/mediarepo-worker/src/lib.rs | 3 +- 9 files changed, 206 insertions(+), 33 deletions(-) create mode 100644 mediarepo-daemon/mediarepo-worker/src/jobs/migrate_content_descriptors.rs diff --git a/mediarepo-daemon/mediarepo-database/src/entities/job_state.rs b/mediarepo-daemon/mediarepo-database/src/entities/job_state.rs index 01f3360..2c60e59 100644 --- a/mediarepo-daemon/mediarepo-database/src/entities/job_state.rs +++ b/mediarepo-daemon/mediarepo-database/src/entities/job_state.rs @@ -2,7 +2,7 @@ use sea_orm::prelude::*; use sea_orm::TryFromU64; #[derive(Clone, Debug, PartialEq, DeriveEntityModel)] -#[sea_orm(table_name = "namespaces")] +#[sea_orm(table_name = "job_states")] pub struct Model { #[sea_orm(primary_key)] pub job_type: JobType, diff --git a/mediarepo-daemon/mediarepo-logic/src/dao/job/state.rs b/mediarepo-daemon/mediarepo-logic/src/dao/job/state.rs index f125d5e..a8aab72 100644 --- a/mediarepo-daemon/mediarepo-logic/src/dao/job/state.rs +++ b/mediarepo-daemon/mediarepo-logic/src/dao/job/state.rs @@ -9,16 +9,18 @@ use sea_orm::{Condition, TransactionTrait}; impl JobDao { /// Returns all job states for a given job id - pub async fn states_for_job_type(&self, job_type: JobType) -> RepoResult> { - let states = job_state::Entity::find() + pub async fn state_for_job_type(&self, job_type: JobType) -> RepoResult> { + let state = job_state::Entity::find() .filter(job_state::Column::JobType.eq(job_type)) - .all(&self.ctx.db) + .one(&self.ctx.db) .await? - .into_iter() - .map(JobStateDto::new) - .collect(); + .map(JobStateDto::new); - Ok(states) + Ok(state) + } + + pub async fn upsert_state(&self, state: UpsertJobStateDto) -> RepoResult<()> { + self.upsert_multiple_states(vec![state]).await } pub async fn upsert_multiple_states(&self, states: Vec) -> RepoResult<()> { diff --git a/mediarepo-daemon/mediarepo-socket/src/namespaces/jobs.rs b/mediarepo-daemon/mediarepo-socket/src/namespaces/jobs.rs index 5025786..d86b3f1 100644 --- a/mediarepo-daemon/mediarepo-socket/src/namespaces/jobs.rs +++ b/mediarepo-daemon/mediarepo-socket/src/namespaces/jobs.rs @@ -3,13 +3,13 @@ use mediarepo_core::bromine::prelude::*; use mediarepo_core::error::RepoResult; use mediarepo_core::mediarepo_api::types::jobs::{JobType, RunJobRequest}; use mediarepo_core::type_keys::{RepoPathKey, SettingsKey, SizeMetadataKey}; -use mediarepo_logic::dao::DaoProvider; +use mediarepo_worker::handle::JobState; use mediarepo_worker::job_dispatcher::JobDispatcher; use mediarepo_worker::jobs::{ - CalculateSizesJob, CheckIntegrityJob, GenerateMissingThumbsJob, Job, VacuumJob, + CalculateSizesJob, CheckIntegrityJob, GenerateMissingThumbsJob, Job, MigrateCDsJob, VacuumJob, }; -use crate::utils::{get_job_dispatcher_from_context, get_repo_from_context}; +use crate::utils::get_job_dispatcher_from_context; pub struct JobsNamespace; @@ -29,7 +29,6 @@ impl JobsNamespace { #[tracing::instrument(skip_all)] pub async fn run_job(ctx: &Context, event: Event) -> IPCResult { let run_request = event.payload::()?; - let job_dao = get_repo_from_context(ctx).await.job(); let dispatcher = get_job_dispatcher_from_context(ctx).await; if !run_request.sync { @@ -38,7 +37,9 @@ impl JobsNamespace { } match run_request.job_type { - JobType::MigrateContentDescriptors => job_dao.migrate_content_descriptors().await?, + JobType::MigrateContentDescriptors => { + dispatch_job(&dispatcher, MigrateCDsJob::default(), run_request.sync).await? + } JobType::CalculateSizes => calculate_all_sizes(ctx).await?, JobType::CheckIntegrity => { dispatch_job(&dispatcher, CheckIntegrityJob::default(), run_request.sync).await? @@ -65,9 +66,19 @@ async fn dispatch_job( job: J, sync: bool, ) -> RepoResult<()> { - let mut handle = dispatcher.dispatch(job).await; + let mut handle = if let Some(handle) = dispatcher.get_handle::().await { + if handle.state().await == JobState::Running { + handle + } else { + dispatcher.dispatch(job).await + } + } else { + dispatcher.dispatch(job).await + }; if sync { - handle.try_result().await?; + if let Some(result) = handle.take_result().await { + result?; + } } Ok(()) } diff --git a/mediarepo-daemon/mediarepo-worker/src/handle.rs b/mediarepo-daemon/mediarepo-worker/src/handle.rs index 8c9a603..b9929ea 100644 --- a/mediarepo-daemon/mediarepo-worker/src/handle.rs +++ b/mediarepo-daemon/mediarepo-worker/src/handle.rs @@ -1,5 +1,4 @@ use mediarepo_core::error::{RepoError, RepoResult}; -use std::mem; use std::ops::{Deref, DerefMut}; use std::sync::Arc; use tokio::sync::broadcast::{Receiver, Sender}; @@ -8,7 +7,7 @@ use tokio::sync::RwLock; pub struct JobHandle { status: Arc>, state: Arc>, - result_receiver: CloneableReceiver>>>, + result_receiver: CloneableReceiver>>>>, } impl Clone for JobHandle { @@ -25,7 +24,7 @@ impl JobHandle { pub fn new( status: Arc>, state: Arc>, - result_receiver: CloneableReceiver>>>, + result_receiver: CloneableReceiver>>>>, ) -> Self { Self { status, @@ -42,17 +41,17 @@ impl JobHandle { &self.status } - pub async fn result(&mut self) -> Arc>> { + pub async fn result(&mut self) -> Arc>>> { match self.result_receiver.recv().await { Ok(v) => v, - Err(e) => Arc::new(RwLock::new(Err(RepoError::from(&*e.to_string())))), + Err(e) => Arc::new(RwLock::new(Some(Err(RepoError::from(&*e.to_string()))))), } } - pub async fn try_result(&mut self) -> RepoResult { + pub async fn take_result(&mut self) -> Option> { let shared_result = self.result().await; let mut result = shared_result.write().await; - mem::replace(&mut *result, Err(RepoError::from("result taken"))) + result.take() } } diff --git a/mediarepo-daemon/mediarepo-worker/src/job_dispatcher.rs b/mediarepo-daemon/mediarepo-worker/src/job_dispatcher.rs index 1cf58c2..2c74c02 100644 --- a/mediarepo-daemon/mediarepo-worker/src/job_dispatcher.rs +++ b/mediarepo-daemon/mediarepo-worker/src/job_dispatcher.rs @@ -57,7 +57,7 @@ impl JobDispatcher { }); let receiver = CloneableReceiver::new(sender.clone()); let handle = JobHandle::new(status.clone(), state.clone(), receiver); - self.add_handle::>(handle.clone()).await; + self.add_handle::(handle.clone()).await; let repo = self.repo.clone(); @@ -70,6 +70,9 @@ impl JobDispatcher { let mut state = state.write().await; *state = JobState::Running; } + if let Err(e) = job.load_state(repo.job()).await { + tracing::error!("failed to load the jobs state: {}", e); + } let result = tokio::select! { _ = subsystem.on_shutdown_requested() => { job_2.save_state(repo.job()).await @@ -78,7 +81,7 @@ impl JobDispatcher { match r { Err(e) => Err(e), Ok(v) => { - let _ = sender.send(Arc::new(RwLock::new(Ok(v)))); + let _ = sender.send(Arc::new(RwLock::new(Some(Ok(v))))); job.save_state(repo.job()).await } } @@ -86,6 +89,7 @@ impl JobDispatcher { }; if let Err(e) = result { tracing::error!("job failed with error: {}", e); + let _ = sender.send(Arc::new(RwLock::new(Some(Err(e))))); } if let Some(interval) = interval { { @@ -111,12 +115,15 @@ impl JobDispatcher { } #[inline] - async fn add_handle(&self, status: T::Value) - where - ::Value: Send + Sync, - { + async fn add_handle(&self, handle: JobHandle) { let mut status_map = self.job_handle_map.write().await; - status_map.insert::(status); + status_map.insert::>(handle); + } + + #[inline] + pub async fn get_handle(&self) -> Option> { + let map = self.job_handle_map.read().await; + map.get::>().cloned() } } diff --git a/mediarepo-daemon/mediarepo-worker/src/jobs/generate_missing_thumbnails.rs b/mediarepo-daemon/mediarepo-worker/src/jobs/generate_missing_thumbnails.rs index 05cf9e7..9eee427 100644 --- a/mediarepo-daemon/mediarepo-worker/src/jobs/generate_missing_thumbnails.rs +++ b/mediarepo-daemon/mediarepo-worker/src/jobs/generate_missing_thumbnails.rs @@ -1,16 +1,22 @@ -use crate::jobs::Job; +use crate::jobs::{deserialize_state, serialize_state, Job}; use crate::status_utils::SimpleProgress; use async_trait::async_trait; use mediarepo_core::error::RepoResult; use mediarepo_core::thumbnailer::ThumbnailSize; +use mediarepo_database::entities::job_state::JobType; +use mediarepo_logic::dao::job::JobDao; use mediarepo_logic::dao::repo::Repo; use mediarepo_logic::dao::DaoProvider; +use serde::{Deserialize, Serialize}; +use std::mem; use std::sync::Arc; +use std::time::{Duration, SystemTime}; use tokio::sync::RwLock; #[derive(Clone, Default)] pub struct GenerateMissingThumbsJob { state: Arc>, + inner_state: Arc>, } #[async_trait] @@ -22,7 +28,20 @@ impl Job for GenerateMissingThumbsJob { self.state.clone() } + async fn load_state(&self, job_dao: JobDao) -> RepoResult<()> { + if let Some(state) = job_dao.state_for_job_type(JobType::GenerateThumbs).await? { + let mut inner_state = self.inner_state.write().await; + let state = deserialize_state::(state)?; + let _ = mem::replace(&mut *inner_state, state); + } + + Ok(()) + } + async fn run(&self, repo: Arc) -> RepoResult<()> { + if !self.needs_generation(&repo).await? { + return Ok(()); + } let file_dao = repo.file(); let all_files = file_dao.all().await?; { @@ -42,6 +61,49 @@ impl Job for GenerateMissingThumbsJob { } } + self.refresh_state(&repo).await?; + Ok(()) } + + async fn save_state(&self, job_dao: JobDao) -> RepoResult<()> { + let state = self.inner_state.read().await; + let state = serialize_state(JobType::GenerateThumbs, &*state)?; + job_dao.upsert_state(state).await + } +} + +impl GenerateMissingThumbsJob { + async fn needs_generation(&self, repo: &Repo) -> RepoResult { + let repo_counts = repo.get_counts().await?; + let file_count = repo_counts.file_count as u64; + let state = self.inner_state.read().await; + + Ok(state.file_count != file_count + || state.last_run.elapsed().unwrap() > Duration::from_secs(60 * 60)) + } + + async fn refresh_state(&self, repo: &Repo) -> RepoResult<()> { + let repo_counts = repo.get_counts().await?; + let mut state = self.inner_state.write().await; + state.last_run = SystemTime::now(); + state.file_count = repo_counts.file_count as u64; + + Ok(()) + } +} + +#[derive(Serialize, Deserialize)] +struct GenerateThumbsState { + file_count: u64, + last_run: SystemTime, +} + +impl Default for GenerateThumbsState { + fn default() -> Self { + Self { + file_count: 0, + last_run: SystemTime::now(), + } + } } diff --git a/mediarepo-daemon/mediarepo-worker/src/jobs/migrate_content_descriptors.rs b/mediarepo-daemon/mediarepo-worker/src/jobs/migrate_content_descriptors.rs new file mode 100644 index 0000000..e320371 --- /dev/null +++ b/mediarepo-daemon/mediarepo-worker/src/jobs/migrate_content_descriptors.rs @@ -0,0 +1,65 @@ +use crate::jobs::{deserialize_state, serialize_state, Job}; +use crate::status_utils::SimpleProgress; +use async_trait::async_trait; +use mediarepo_core::error::RepoResult; +use mediarepo_database::entities::job_state::JobType; +use mediarepo_logic::dao::job::JobDao; +use mediarepo_logic::dao::repo::Repo; +use mediarepo_logic::dao::DaoProvider; +use serde::{Deserialize, Serialize}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use tokio::sync::RwLock; + +#[derive(Clone, Default)] +pub struct MigrateCDsJob { + progress: Arc>, + migrated: Arc, +} + +#[async_trait] +impl Job for MigrateCDsJob { + type JobStatus = SimpleProgress; + type Result = (); + + fn status(&self) -> Arc> { + self.progress.clone() + } + + async fn load_state(&self, job_dao: JobDao) -> RepoResult<()> { + if let Some(state) = job_dao.state_for_job_type(JobType::MigrateCDs).await? { + let state = deserialize_state::(state)?; + self.migrated.store(state.migrated, Ordering::SeqCst); + } + + Ok(()) + } + + async fn run(&self, repo: Arc) -> RepoResult { + if self.migrated.load(Ordering::SeqCst) { + return Ok(()); + } + let job_dao = repo.job(); + + job_dao.migrate_content_descriptors().await?; + self.migrated.store(true, Ordering::Relaxed); + { + let mut progress = self.progress.write().await; + progress.set_total(100); + } + Ok(()) + } + + async fn save_state(&self, job_dao: JobDao) -> RepoResult<()> { + if self.migrated.load(Ordering::Relaxed) { + let state = serialize_state(JobType::MigrateCDs, &MigrationStatus { migrated: true })?; + job_dao.upsert_state(state).await?; + } + Ok(()) + } +} + +#[derive(Serialize, Deserialize)] +struct MigrationStatus { + pub migrated: bool, +} diff --git a/mediarepo-daemon/mediarepo-worker/src/jobs/mod.rs b/mediarepo-daemon/mediarepo-worker/src/jobs/mod.rs index f2d652f..846a39e 100644 --- a/mediarepo-daemon/mediarepo-worker/src/jobs/mod.rs +++ b/mediarepo-daemon/mediarepo-worker/src/jobs/mod.rs @@ -1,21 +1,28 @@ mod calculate_sizes; mod check_integrity; mod generate_missing_thumbnails; +mod migrate_content_descriptors; mod vacuum; pub use calculate_sizes::*; pub use check_integrity::*; pub use generate_missing_thumbnails::*; +pub use migrate_content_descriptors::*; use std::marker::PhantomData; use std::sync::Arc; pub use vacuum::*; use crate::handle::JobHandle; use async_trait::async_trait; -use mediarepo_core::error::RepoResult; +use mediarepo_core::bincode; +use mediarepo_core::error::{RepoError, RepoResult}; use mediarepo_core::trait_bound_typemap::TypeMapKey; +use mediarepo_database::entities::job_state::JobType; use mediarepo_logic::dao::job::JobDao; use mediarepo_logic::dao::repo::Repo; +use mediarepo_logic::dto::{JobStateDto, UpsertJobStateDto}; +use serde::de::DeserializeOwned; +use serde::Serialize; use tokio::sync::RwLock; type EmptyStatus = Arc>; @@ -27,9 +34,12 @@ pub trait Job: Clone + Send + Sync { fn status(&self) -> Arc>; + async fn load_state(&self, _job_dao: JobDao) -> RepoResult<()> { + Ok(()) + } + async fn run(&self, repo: Arc) -> RepoResult; - #[tracing::instrument(level = "debug", skip_all)] async fn save_state(&self, _job_dao: JobDao) -> RepoResult<()> { Ok(()) } @@ -43,3 +53,19 @@ where { type Value = JobHandle; } + +pub fn deserialize_state(dto: JobStateDto) -> RepoResult { + bincode::deserialize(dto.value()).map_err(RepoError::from) +} + +pub fn serialize_state( + job_type: JobType, + state: &T, +) -> RepoResult { + let dto = UpsertJobStateDto { + value: bincode::serialize(state)?, + job_type, + }; + + Ok(dto) +} diff --git a/mediarepo-daemon/mediarepo-worker/src/lib.rs b/mediarepo-daemon/mediarepo-worker/src/lib.rs index 9dd35d8..1b696f5 100644 --- a/mediarepo-daemon/mediarepo-worker/src/lib.rs +++ b/mediarepo-daemon/mediarepo-worker/src/lib.rs @@ -1,5 +1,5 @@ use crate::job_dispatcher::JobDispatcher; -use crate::jobs::{CheckIntegrityJob, VacuumJob}; +use crate::jobs::{CheckIntegrityJob, MigrateCDsJob, VacuumJob}; use mediarepo_core::error::RepoError; use mediarepo_core::tokio_graceful_shutdown::Toplevel; use mediarepo_logic::dao::repo::Repo; @@ -27,6 +27,7 @@ pub async fn start(top_level: Toplevel, repo: Repo) -> (Toplevel, JobDispatcher) Duration::from_secs(60 * 60 * 24), ) .await; + dispatcher.dispatch(MigrateCDsJob::default()).await; Ok(()) });