Improve job state loading and storing

Signed-off-by: trivernis <trivernis@protonmail.com>
feature/jobs
trivernis 3 years ago
parent fb869dabb1
commit 0eda0d2c22
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

@@ -2,7 +2,7 @@ use sea_orm::prelude::*;
 use sea_orm::TryFromU64;
 
 #[derive(Clone, Debug, PartialEq, DeriveEntityModel)]
-#[sea_orm(table_name = "namespaces")]
+#[sea_orm(table_name = "job_states")]
 pub struct Model {
     #[sea_orm(primary_key)]
     pub job_type: JobType,

@@ -9,16 +9,18 @@ use sea_orm::{Condition, TransactionTrait};
 impl JobDao {
-    /// Returns all job states for a given job id
-    pub async fn states_for_job_type(&self, job_type: JobType) -> RepoResult<Vec<JobStateDto>> {
-        let states = job_state::Entity::find()
+    /// Returns the state for a given job type, if one is stored
+    pub async fn state_for_job_type(&self, job_type: JobType) -> RepoResult<Option<JobStateDto>> {
+        let state = job_state::Entity::find()
             .filter(job_state::Column::JobType.eq(job_type))
-            .all(&self.ctx.db)
+            .one(&self.ctx.db)
             .await?
-            .into_iter()
-            .map(JobStateDto::new)
-            .collect();
+            .map(JobStateDto::new);
 
-        Ok(states)
+        Ok(state)
     }
 
+    pub async fn upsert_state(&self, state: UpsertJobStateDto) -> RepoResult<()> {
+        self.upsert_multiple_states(vec![state]).await
+    }
+
     pub async fn upsert_multiple_states(&self, states: Vec<UpsertJobStateDto>) -> RepoResult<()> {
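
With the return type narrowed from Vec<JobStateDto> to Option<JobStateDto>, the query short-circuits via one() and callers match on the option instead of iterating. A minimal caller sketch (hypothetical surrounding code; the JobType variant and deserialize_state helper come from the hunks below):

    // Fetch the single persisted state for this job type, if any.
    if let Some(dto) = job_dao.state_for_job_type(JobType::GenerateThumbs).await? {
        // dto.value() holds the bincode blob written by upsert_state().
        let state = deserialize_state::<GenerateThumbsState>(dto)?;
    }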

@@ -3,13 +3,13 @@ use mediarepo_core::bromine::prelude::*;
 use mediarepo_core::error::RepoResult;
 use mediarepo_core::mediarepo_api::types::jobs::{JobType, RunJobRequest};
 use mediarepo_core::type_keys::{RepoPathKey, SettingsKey, SizeMetadataKey};
-use mediarepo_logic::dao::DaoProvider;
+use mediarepo_worker::handle::JobState;
 use mediarepo_worker::job_dispatcher::JobDispatcher;
 use mediarepo_worker::jobs::{
-    CalculateSizesJob, CheckIntegrityJob, GenerateMissingThumbsJob, Job, VacuumJob,
+    CalculateSizesJob, CheckIntegrityJob, GenerateMissingThumbsJob, Job, MigrateCDsJob, VacuumJob,
 };
 
-use crate::utils::{get_job_dispatcher_from_context, get_repo_from_context};
+use crate::utils::get_job_dispatcher_from_context;
 
 pub struct JobsNamespace;
@@ -29,7 +29,6 @@ impl JobsNamespace {
     #[tracing::instrument(skip_all)]
     pub async fn run_job(ctx: &Context, event: Event) -> IPCResult<Response> {
         let run_request = event.payload::<RunJobRequest>()?;
-        let job_dao = get_repo_from_context(ctx).await.job();
         let dispatcher = get_job_dispatcher_from_context(ctx).await;
 
         if !run_request.sync {
@@ -38,7 +37,9 @@ impl JobsNamespace {
         }
 
         match run_request.job_type {
-            JobType::MigrateContentDescriptors => job_dao.migrate_content_descriptors().await?,
+            JobType::MigrateContentDescriptors => {
+                dispatch_job(&dispatcher, MigrateCDsJob::default(), run_request.sync).await?
+            }
             JobType::CalculateSizes => calculate_all_sizes(ctx).await?,
             JobType::CheckIntegrity => {
                 dispatch_job(&dispatcher, CheckIntegrityJob::default(), run_request.sync).await?
@@ -65,9 +66,19 @@ async fn dispatch_job<J: 'static + Job>(
     job: J,
     sync: bool,
 ) -> RepoResult<()> {
-    let mut handle = dispatcher.dispatch(job).await;
+    let mut handle = if let Some(handle) = dispatcher.get_handle::<J>().await {
+        if handle.state().await == JobState::Running {
+            handle
+        } else {
+            dispatcher.dispatch(job).await
+        }
+    } else {
+        dispatcher.dispatch(job).await
+    };
 
     if sync {
-        handle.try_result().await?;
+        if let Some(result) = handle.take_result().await {
+            result?;
+        }
     }
 
     Ok(())
 }
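
This makes dispatching idempotent per job type: an existing handle that is still Running is reused, anything else triggers a fresh dispatch, and in sync mode the result is only consumed if it is still in the shared slot. A usage sketch (hypothetical caller, types from this diff):

    // Two sync requests for the same job type: the first spawns the job,
    // the second finds the Running handle and awaits the same broadcast
    // result instead of starting a duplicate run.
    dispatch_job(&dispatcher, VacuumJob::default(), true).await?;
    dispatch_job(&dispatcher, VacuumJob::default(), true).await?;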

@@ -1,5 +1,4 @@
 use mediarepo_core::error::{RepoError, RepoResult};
-use std::mem;
 use std::ops::{Deref, DerefMut};
 use std::sync::Arc;
 use tokio::sync::broadcast::{Receiver, Sender};
@@ -8,7 +7,7 @@ use tokio::sync::RwLock;
 pub struct JobHandle<T: Send + Sync, R: Send + Sync> {
     status: Arc<RwLock<T>>,
     state: Arc<RwLock<JobState>>,
-    result_receiver: CloneableReceiver<Arc<RwLock<RepoResult<R>>>>,
+    result_receiver: CloneableReceiver<Arc<RwLock<Option<RepoResult<R>>>>>,
 }
 
 impl<T: Send + Sync, R: Send + Sync> Clone for JobHandle<T, R> {
@@ -25,7 +24,7 @@ impl<T: Send + Sync, R: Send + Sync> JobHandle<T, R> {
     pub fn new(
         status: Arc<RwLock<T>>,
         state: Arc<RwLock<JobState>>,
-        result_receiver: CloneableReceiver<Arc<RwLock<RepoResult<R>>>>,
+        result_receiver: CloneableReceiver<Arc<RwLock<Option<RepoResult<R>>>>>,
     ) -> Self {
         Self {
             status,
@@ -42,17 +41,17 @@ impl<T: Send + Sync, R: Send + Sync> JobHandle<T, R> {
         &self.status
     }
 
-    pub async fn result(&mut self) -> Arc<RwLock<RepoResult<R>>> {
+    pub async fn result(&mut self) -> Arc<RwLock<Option<RepoResult<R>>>> {
         match self.result_receiver.recv().await {
             Ok(v) => v,
-            Err(e) => Arc::new(RwLock::new(Err(RepoError::from(&*e.to_string())))),
+            Err(e) => Arc::new(RwLock::new(Some(Err(RepoError::from(&*e.to_string()))))),
         }
     }
 
-    pub async fn try_result(&mut self) -> RepoResult<R> {
+    pub async fn take_result(&mut self) -> Option<RepoResult<R>> {
         let shared_result = self.result().await;
         let mut result = shared_result.write().await;
-        mem::replace(&mut *result, Err(RepoError::from("result taken")))
+        result.take()
     }
 }
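
take_result now has take-once semantics: every clone of the handle receives the same Arc'd slot, but only the first caller to lock it gets Some; later takers see None instead of the old placeholder Err("result taken"). A small sketch (handle names hypothetical):

    // handle_a and handle_b are clones of the same JobHandle and receive
    // the same shared slot from the broadcast channel.
    if let Some(result) = handle_a.take_result().await {
        let _value = result?; // a job error propagates to whoever takes it
    }
    // If handle_b later receives the same slot, its take_result() yields
    // None: the value has already been moved out.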

@@ -57,7 +57,7 @@ impl JobDispatcher {
         });
         let receiver = CloneableReceiver::new(sender.clone());
         let handle = JobHandle::new(status.clone(), state.clone(), receiver);
-        self.add_handle::<JobTypeKey<T>>(handle.clone()).await;
+        self.add_handle::<T>(handle.clone()).await;
 
         let repo = self.repo.clone();
@@ -70,6 +70,9 @@ impl JobDispatcher {
             {
                 let mut state = state.write().await;
                 *state = JobState::Running;
             }
+            if let Err(e) = job.load_state(repo.job()).await {
+                tracing::error!("failed to load the jobs state: {}", e);
+            }
             let result = tokio::select! {
                 _ = subsystem.on_shutdown_requested() => {
                     job_2.save_state(repo.job()).await
@@ -78,7 +81,7 @@ impl JobDispatcher {
                 match r {
                     Err(e) => Err(e),
                     Ok(v) => {
-                        let _ = sender.send(Arc::new(RwLock::new(Ok(v))));
+                        let _ = sender.send(Arc::new(RwLock::new(Some(Ok(v)))));
                         job.save_state(repo.job()).await
                     }
                 }
@@ -86,6 +89,7 @@ impl JobDispatcher {
             };
             if let Err(e) = result {
                 tracing::error!("job failed with error: {}", e);
+                let _ = sender.send(Arc::new(RwLock::new(Some(Err(e)))));
             }
             if let Some(interval) = interval {
                 {
@@ -111,12 +115,15 @@ impl JobDispatcher {
     }
 
     #[inline]
-    async fn add_handle<T: TypeMapKey>(&self, status: T::Value)
-    where
-        <T as TypeMapKey>::Value: Send + Sync,
-    {
+    async fn add_handle<T: 'static + Job>(&self, handle: JobHandle<T::JobStatus, T::Result>) {
         let mut status_map = self.job_handle_map.write().await;
-        status_map.insert::<T>(status);
+        status_map.insert::<JobTypeKey<T>>(handle);
     }
+
+    #[inline]
+    pub async fn get_handle<T: 'static + Job>(&self) -> Option<JobHandle<T::JobStatus, T::Result>> {
+        let map = self.job_handle_map.read().await;
+        map.get::<JobTypeKey<T>>().cloned()
+    }
 }
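
get_handle is the read-side counterpart of add_handle, resolving the JobTypeKey<T> entry back to a typed JobHandle. A lookup sketch (hypothetical caller):

    // None means no job of this type has been dispatched yet.
    if let Some(handle) = dispatcher.get_handle::<VacuumJob>().await {
        let _running = handle.state().await == JobState::Running;
    }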

@@ -1,16 +1,22 @@
-use crate::jobs::Job;
+use crate::jobs::{deserialize_state, serialize_state, Job};
 use crate::status_utils::SimpleProgress;
 use async_trait::async_trait;
 use mediarepo_core::error::RepoResult;
 use mediarepo_core::thumbnailer::ThumbnailSize;
+use mediarepo_database::entities::job_state::JobType;
+use mediarepo_logic::dao::job::JobDao;
 use mediarepo_logic::dao::repo::Repo;
 use mediarepo_logic::dao::DaoProvider;
+use serde::{Deserialize, Serialize};
+use std::mem;
 use std::sync::Arc;
+use std::time::{Duration, SystemTime};
 use tokio::sync::RwLock;
 
 #[derive(Clone, Default)]
 pub struct GenerateMissingThumbsJob {
     state: Arc<RwLock<SimpleProgress>>,
+    inner_state: Arc<RwLock<GenerateThumbsState>>,
 }
#[async_trait]
@@ -22,7 +28,20 @@ impl Job for GenerateMissingThumbsJob {
         self.state.clone()
     }
 
+    async fn load_state(&self, job_dao: JobDao) -> RepoResult<()> {
+        if let Some(state) = job_dao.state_for_job_type(JobType::GenerateThumbs).await? {
+            let mut inner_state = self.inner_state.write().await;
+            let state = deserialize_state::<GenerateThumbsState>(state)?;
+            let _ = mem::replace(&mut *inner_state, state);
+        }
+
+        Ok(())
+    }
+
     async fn run(&self, repo: Arc<Repo>) -> RepoResult<()> {
+        if !self.needs_generation(&repo).await? {
+            return Ok(());
+        }
         let file_dao = repo.file();
         let all_files = file_dao.all().await?;
         {
@@ -42,6 +61,49 @@ impl Job for GenerateMissingThumbsJob {
             }
         }
 
+        self.refresh_state(&repo).await?;
+
         Ok(())
     }
+
+    async fn save_state(&self, job_dao: JobDao) -> RepoResult<()> {
+        let state = self.inner_state.read().await;
+        let state = serialize_state(JobType::GenerateThumbs, &*state)?;
+
+        job_dao.upsert_state(state).await
+    }
 }
+
+impl GenerateMissingThumbsJob {
+    async fn needs_generation(&self, repo: &Repo) -> RepoResult<bool> {
+        let repo_counts = repo.get_counts().await?;
+        let file_count = repo_counts.file_count as u64;
+        let state = self.inner_state.read().await;
+
+        Ok(state.file_count != file_count
+            || state.last_run.elapsed().unwrap() > Duration::from_secs(60 * 60))
+    }
+
+    async fn refresh_state(&self, repo: &Repo) -> RepoResult<()> {
+        let repo_counts = repo.get_counts().await?;
+        let mut state = self.inner_state.write().await;
+        state.last_run = SystemTime::now();
+        state.file_count = repo_counts.file_count as u64;
+
+        Ok(())
+    }
+}
+
+#[derive(Serialize, Deserialize)]
+struct GenerateThumbsState {
+    file_count: u64,
+    last_run: SystemTime,
+}
+
+impl Default for GenerateThumbsState {
+    fn default() -> Self {
+        Self {
+            file_count: 0,
+            last_run: SystemTime::now(),
+        }
+    }
+}
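
One caveat in the code above (an observation, not a change): SystemTime::elapsed returns Err when the system clock moves backwards, so the unwrap() in needs_generation can panic. A defensive variant would treat clock skew as zero elapsed time:

    // unwrap_or_default() yields Duration::ZERO on clock skew, so the check
    // falls back to the file-count comparison instead of panicking.
    let elapsed = state.last_run.elapsed().unwrap_or_default();
    Ok(state.file_count != file_count || elapsed > Duration::from_secs(60 * 60))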

@@ -0,0 +1,65 @@
+use crate::jobs::{deserialize_state, serialize_state, Job};
+use crate::status_utils::SimpleProgress;
+use async_trait::async_trait;
+use mediarepo_core::error::RepoResult;
+use mediarepo_database::entities::job_state::JobType;
+use mediarepo_logic::dao::job::JobDao;
+use mediarepo_logic::dao::repo::Repo;
+use mediarepo_logic::dao::DaoProvider;
+use serde::{Deserialize, Serialize};
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
+use tokio::sync::RwLock;
+
+#[derive(Clone, Default)]
+pub struct MigrateCDsJob {
+    progress: Arc<RwLock<SimpleProgress>>,
+    migrated: Arc<AtomicBool>,
+}
+
+#[async_trait]
+impl Job for MigrateCDsJob {
+    type JobStatus = SimpleProgress;
+    type Result = ();
+
+    fn status(&self) -> Arc<tokio::sync::RwLock<Self::JobStatus>> {
+        self.progress.clone()
+    }
+
+    async fn load_state(&self, job_dao: JobDao) -> RepoResult<()> {
+        if let Some(state) = job_dao.state_for_job_type(JobType::MigrateCDs).await? {
+            let state = deserialize_state::<MigrationStatus>(state)?;
+            self.migrated.store(state.migrated, Ordering::SeqCst);
+        }
+
+        Ok(())
+    }
+
+    async fn run(&self, repo: Arc<Repo>) -> RepoResult<Self::Result> {
+        if self.migrated.load(Ordering::SeqCst) {
+            return Ok(());
+        }
+        let job_dao = repo.job();
+
+        job_dao.migrate_content_descriptors().await?;
+        self.migrated.store(true, Ordering::Relaxed);
+        {
+            let mut progress = self.progress.write().await;
+            progress.set_total(100);
+        }
+
+        Ok(())
+    }
+
+    async fn save_state(&self, job_dao: JobDao) -> RepoResult<()> {
+        if self.migrated.load(Ordering::Relaxed) {
+            let state = serialize_state(JobType::MigrateCDs, &MigrationStatus { migrated: true })?;
+            job_dao.upsert_state(state).await?;
+        }
+        Ok(())
+    }
+}
+
+#[derive(Serialize, Deserialize)]
+struct MigrationStatus {
+    pub migrated: bool,
+}
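
A side note on the atomics (an observation, not part of the diff): the migrated flag is read with Ordering::SeqCst in load_state and run but written with Ordering::Relaxed. For a lone flag that publishes no other memory, one consistent ordering is sufficient and easier to audit; Relaxed would do on both sides:

    // Relaxed is enough for an independent boolean flag: no other memory
    // is synchronized through it.
    if self.migrated.load(Ordering::Relaxed) {
        return Ok(());
    }
    // ... perform the migration ...
    self.migrated.store(true, Ordering::Relaxed);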

@@ -1,21 +1,28 @@
 mod calculate_sizes;
 mod check_integrity;
 mod generate_missing_thumbnails;
+mod migrate_content_descriptors;
 mod vacuum;
 
 pub use calculate_sizes::*;
 pub use check_integrity::*;
 pub use generate_missing_thumbnails::*;
+pub use migrate_content_descriptors::*;
 use std::marker::PhantomData;
 use std::sync::Arc;
 pub use vacuum::*;
 
 use crate::handle::JobHandle;
 use async_trait::async_trait;
-use mediarepo_core::error::RepoResult;
+use mediarepo_core::bincode;
+use mediarepo_core::error::{RepoError, RepoResult};
 use mediarepo_core::trait_bound_typemap::TypeMapKey;
+use mediarepo_database::entities::job_state::JobType;
 use mediarepo_logic::dao::job::JobDao;
 use mediarepo_logic::dao::repo::Repo;
+use mediarepo_logic::dto::{JobStateDto, UpsertJobStateDto};
+use serde::de::DeserializeOwned;
+use serde::Serialize;
 use tokio::sync::RwLock;
 
 type EmptyStatus = Arc<RwLock<()>>;
@@ -27,9 +34,12 @@ pub trait Job: Clone + Send + Sync {
     fn status(&self) -> Arc<RwLock<Self::JobStatus>>;
 
+    async fn load_state(&self, _job_dao: JobDao) -> RepoResult<()> {
+        Ok(())
+    }
+
     async fn run(&self, repo: Arc<Repo>) -> RepoResult<Self::Result>;
 
-    #[tracing::instrument(level = "debug", skip_all)]
     async fn save_state(&self, _job_dao: JobDao) -> RepoResult<()> {
         Ok(())
     }
@@ -43,3 +53,19 @@
 {
     type Value = JobHandle<T::JobStatus, T::Result>;
 }
+
+pub fn deserialize_state<T: DeserializeOwned>(dto: JobStateDto) -> RepoResult<T> {
+    bincode::deserialize(dto.value()).map_err(RepoError::from)
+}
+
+pub fn serialize_state<T: Serialize>(
+    job_type: JobType,
+    state: &T,
+) -> RepoResult<UpsertJobStateDto> {
+    let dto = UpsertJobStateDto {
+        value: bincode::serialize(state)?,
+        job_type,
+    };
+
+    Ok(dto)
+}
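
These two helpers close the loop: serialize_state packs any Serialize state into an UpsertJobStateDto keyed by job type, and deserialize_state unpacks the stored bytes again. A sketch of the write path (the state struct here is hypothetical):

    #[derive(Serialize, Deserialize)]
    struct MyJobState {
        counter: u64, // hypothetical payload
    }

    // bincode-encode the state and upsert it under its JobType.
    let dto = serialize_state(JobType::MigrateCDs, &MyJobState { counter: 42 })?;
    job_dao.upsert_state(dto).await?;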

@@ -1,5 +1,5 @@
 use crate::job_dispatcher::JobDispatcher;
-use crate::jobs::{CheckIntegrityJob, VacuumJob};
+use crate::jobs::{CheckIntegrityJob, MigrateCDsJob, VacuumJob};
 use mediarepo_core::error::RepoError;
 use mediarepo_core::tokio_graceful_shutdown::Toplevel;
 use mediarepo_logic::dao::repo::Repo;
@@ -27,6 +27,7 @@ pub async fn start(top_level: Toplevel, repo: Repo) -> (Toplevel, JobDispatcher)
                 Duration::from_secs(60 * 60 * 24),
             )
             .await;
+        dispatcher.dispatch(MigrateCDsJob::default()).await;
 
         Ok(())
     });
