From a145d604d90a4571520e61b8a3ced2b31abd7ac8 Mon Sep 17 00:00:00 2001 From: trivernis Date: Sat, 12 Mar 2022 17:18:49 +0100 Subject: [PATCH] Simplify job implementation Signed-off-by: trivernis --- .../mediarepo-core/src/type_list.rs | 0 .../mediarepo-database/src/entities/job.rs | 42 ---- .../mediarepo-logic/src/dto/job.rs | 34 ---- .../mediarepo-worker/src/execution_state.rs | 58 ------ .../mediarepo-worker/src/job_dispatcher.rs | 0 .../mediarepo-worker/src/jobs_table.rs | 13 -- .../mediarepo-worker/src/progress.rs | 78 -------- .../mediarepo-worker/src/scheduler.rs | 182 ------------------ .../mediarepo-worker/src/state_data.rs | 88 --------- 9 files changed, 495 deletions(-) create mode 100644 mediarepo-daemon/mediarepo-core/src/type_list.rs delete mode 100644 mediarepo-daemon/mediarepo-database/src/entities/job.rs delete mode 100644 mediarepo-daemon/mediarepo-logic/src/dto/job.rs delete mode 100644 mediarepo-daemon/mediarepo-worker/src/execution_state.rs create mode 100644 mediarepo-daemon/mediarepo-worker/src/job_dispatcher.rs delete mode 100644 mediarepo-daemon/mediarepo-worker/src/jobs_table.rs delete mode 100644 mediarepo-daemon/mediarepo-worker/src/progress.rs delete mode 100644 mediarepo-daemon/mediarepo-worker/src/scheduler.rs delete mode 100644 mediarepo-daemon/mediarepo-worker/src/state_data.rs diff --git a/mediarepo-daemon/mediarepo-core/src/type_list.rs b/mediarepo-daemon/mediarepo-core/src/type_list.rs new file mode 100644 index 0000000..e69de29 diff --git a/mediarepo-daemon/mediarepo-database/src/entities/job.rs b/mediarepo-daemon/mediarepo-database/src/entities/job.rs deleted file mode 100644 index dddf46c..0000000 --- a/mediarepo-daemon/mediarepo-database/src/entities/job.rs +++ /dev/null @@ -1,42 +0,0 @@ -use chrono::NaiveDateTime; -use sea_orm::prelude::*; - -#[derive(Clone, Debug, PartialEq, DeriveEntityModel)] -#[sea_orm(table_name = "namespaces")] -pub struct Model { - #[sea_orm(primary_key)] - pub id: i64, - pub job_type: JobType, - pub name: Option, - pub next_run: Option, - pub interval: Option, -} - -#[derive(Clone, Copy, Debug, PartialEq, EnumIter, DeriveActiveEnum)] -#[sea_orm(rs_type = "u32", db_type = "Integer")] -pub enum JobType { - #[sea_orm(num_value = 10)] - MigrateCDs, - #[sea_orm(num_value = 20)] - CalculateSizes, - #[sea_orm(num_value = 30)] - GenerateThumbs, - #[sea_orm(num_value = 40)] - CheckIntegrity, - #[sea_orm(num_value = 50)] - Vacuum, -} - -#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] -pub enum Relation { - #[sea_orm(has_many = "super::job_state::Entity")] - JobState, -} - -impl Related for Entity { - fn to() -> RelationDef { - Relation::JobState.def() - } -} - -impl ActiveModelBehavior for ActiveModel {} diff --git a/mediarepo-daemon/mediarepo-logic/src/dto/job.rs b/mediarepo-daemon/mediarepo-logic/src/dto/job.rs deleted file mode 100644 index f4c186e..0000000 --- a/mediarepo-daemon/mediarepo-logic/src/dto/job.rs +++ /dev/null @@ -1,34 +0,0 @@ -use chrono::NaiveDateTime; -use mediarepo_database::entities::job; -use mediarepo_database::entities::job::JobType; - -#[derive(Clone, Debug)] -pub struct JobDto { - model: job::Model, -} - -impl JobDto { - pub(crate) fn new(model: job::Model) -> Self { - Self { model } - } - - pub fn id(&self) -> i64 { - self.model.id - } - - pub fn job_type(&self) -> JobType { - self.model.job_type - } - - pub fn name(&self) -> Option<&String> { - self.model.name.as_ref() - } - - pub fn next_run(&self) -> Option { - self.model.next_run - } - - pub fn interval(&self) -> Option { - self.model.interval - } -} diff --git a/mediarepo-daemon/mediarepo-worker/src/execution_state.rs b/mediarepo-daemon/mediarepo-worker/src/execution_state.rs deleted file mode 100644 index 82a2959..0000000 --- a/mediarepo-daemon/mediarepo-worker/src/execution_state.rs +++ /dev/null @@ -1,58 +0,0 @@ -use std::sync::atomic::{AtomicU8, Ordering}; -use std::sync::Arc; - -#[derive(Clone, Copy, Debug, Ord, PartialOrd, Eq, PartialEq)] -pub enum JobExecutionState { - Scheduled, - Running, - Finished, -} - -pub struct ExecutionStateSynchronizer { - state: Arc, -} - -impl Default for ExecutionStateSynchronizer { - fn default() -> Self { - Self { - state: Arc::new(AtomicU8::new(0)), - } - } -} - -impl ExecutionStateSynchronizer { - pub fn set_scheduled(&self) { - self.state.store(0, Ordering::Relaxed); - } - - #[must_use] - pub fn set_running(&self) -> RunningHandle { - self.state.store(1, Ordering::Relaxed); - RunningHandle { - state: Arc::clone(&self.state), - } - } - - pub fn set_finished(&self) { - self.state.store(2, Ordering::SeqCst) - } - - pub fn state(&self) -> JobExecutionState { - match self.state.load(Ordering::SeqCst) { - 0 => JobExecutionState::Scheduled, - 1 => JobExecutionState::Running, - 2 => JobExecutionState::Scheduled, - _ => JobExecutionState::Finished, - } - } -} - -pub struct RunningHandle { - state: Arc, -} - -impl Drop for RunningHandle { - fn drop(&mut self) { - self.state.store(2, Ordering::SeqCst); - } -} diff --git a/mediarepo-daemon/mediarepo-worker/src/job_dispatcher.rs b/mediarepo-daemon/mediarepo-worker/src/job_dispatcher.rs new file mode 100644 index 0000000..e69de29 diff --git a/mediarepo-daemon/mediarepo-worker/src/jobs_table.rs b/mediarepo-daemon/mediarepo-worker/src/jobs_table.rs deleted file mode 100644 index d9c30f7..0000000 --- a/mediarepo-daemon/mediarepo-worker/src/jobs_table.rs +++ /dev/null @@ -1,13 +0,0 @@ -use crate::progress::JobProgressUpdate; -use mediarepo_logic::dto::JobDto; -use std::collections::HashMap; -use std::sync::Arc; -use tokio::sync::RwLock; - -pub type JobsTable = Arc>>; - -#[derive(Clone, Debug)] -pub struct JobEntry { - pub dto: JobDto, - pub last_update: Option, -} diff --git a/mediarepo-daemon/mediarepo-worker/src/progress.rs b/mediarepo-daemon/mediarepo-worker/src/progress.rs deleted file mode 100644 index f643089..0000000 --- a/mediarepo-daemon/mediarepo-worker/src/progress.rs +++ /dev/null @@ -1,78 +0,0 @@ -use crate::execution_state::{ExecutionStateSynchronizer, JobExecutionState, RunningHandle}; -use tokio::sync::mpsc::Sender; - -#[derive(Clone, Debug)] -pub struct JobProgressUpdate { - id: i64, - state: JobExecutionState, - progress: Option, - total: Option, -} - -impl JobProgressUpdate { - pub fn new(id: i64) -> Self { - Self { - id, - state: JobExecutionState::Scheduled, - progress: None, - total: None, - } - } - - pub fn id(&self) -> i64 { - self.id - } - - pub fn state(&self) -> JobExecutionState { - self.state - } - - pub fn set_state(&mut self, state: JobExecutionState) { - self.state = state; - } - - pub fn progress(&self) -> Option { - self.progress - } - - pub fn set_progress(&mut self, progress: u64) { - self.progress = Some(progress); - } - - pub fn total(&self) -> Option { - self.total - } - - pub fn set_total(&mut self, total: u64) { - self.total = Some(total) - } -} - -pub struct ProgressSender { - job_id: i64, - execution_state_sync: ExecutionStateSynchronizer, - pub inner: Sender, -} - -impl ProgressSender { - pub fn new(job_id: i64, sender: Sender) -> Self { - Self { - job_id, - inner: sender, - execution_state_sync: ExecutionStateSynchronizer::default(), - } - } - - pub fn send_progress(&self, progress: u64, total: u64) { - let _ = self.inner.send(JobProgressUpdate { - id: self.job_id, - state: JobExecutionState::Running, - progress: Some(progress), - total: Some(total), - }); - } - - pub fn send_progress_percent(&self, percent: f64) { - self.send_progress((percent * 100.0) as u64, 100); - } -} diff --git a/mediarepo-daemon/mediarepo-worker/src/scheduler.rs b/mediarepo-daemon/mediarepo-worker/src/scheduler.rs deleted file mode 100644 index 3303d97..0000000 --- a/mediarepo-daemon/mediarepo-worker/src/scheduler.rs +++ /dev/null @@ -1,182 +0,0 @@ -use crate::execution_state::JobExecutionState; -use crate::jobs::{ScheduledJob, VacuumJob}; -use crate::jobs_table::JobsTable; -use crate::progress::{JobProgressUpdate, ProgressSender}; -use crate::state_data::StateData; -use mediarepo_core::error::RepoResult; -use mediarepo_core::futures::select; -use mediarepo_core::settings::LogLevel::Debug; -use mediarepo_core::tokio_graceful_shutdown::SubsystemHandle; -use mediarepo_database::entities::job::JobType; -use mediarepo_logic::dao::repo::Repo; -use mediarepo_logic::dao::DaoProvider; -use mediarepo_logic::dto::JobDto; -use std::sync::Arc; -use std::time::Duration; -use tokio::sync::mpsc::{channel, Receiver, Sender}; -use tokio::sync::RwLock; - -#[derive(Clone)] -pub struct Scheduler { - repo: Repo, - state_data: Arc>>, - jobs_table: JobsTable, -} - -impl Scheduler { - pub fn new(repo: Repo) -> Self { - Self { - repo, - state_data: Default::default(), - jobs_table: Default::default(), - } - } - - pub fn run(self, subsystem: SubsystemHandle) -> JobsTable { - tokio::task::spawn({ - let subsystem = subsystem.clone(); - let scheduler = self.clone(); - async move { - scheduler.loop_save_states(subsystem).await; - } - }); - - let (tx, rx) = channel(32); - - tokio::task::spawn({ - let scheduler = self.clone(); - async move { - scheduler.loop_schedule(subsystem, tx).await; - } - }); - - let jobs_table = self.jobs_table.clone(); - - tokio::task::spawn(async move { self.update_on_progress(rx).await }); - - jobs_table - } - - async fn loop_schedule( - self, - subsystem: SubsystemHandle, - job_progress_sender: Sender, - ) { - loop { - if let Err(e) = self.schedule(&job_progress_sender).await { - tracing::error!("failed to schedule jobs: {}", e); - } - tokio::select! { - _ = tokio::time::sleep(Duration::from_secs(1)) => {}, - _ = subsystem.on_shutdown_requested() => { - break; - } - } - } - } - - async fn schedule(&self, job_progress_sender: &Sender) -> RepoResult<()> { - let mut scheduled_jobs = self.repo.job().scheduled_for_now().await?; - let running_jobs = self.running_jobs().await; - - scheduled_jobs.retain(|j| !running_jobs.contains(&j.id())); - - for job in scheduled_jobs { - let mut sender = job_progress_sender.clone(); - let mut progress = JobProgressUpdate::new(job.id()); - let scheduled_job = create_job(job); - let _ = sender.send(progress.clone()).await; - let repo = self.repo.clone(); - - tokio::task::spawn(async move { - progress.set_state(JobExecutionState::Running); - let _ = sender.send(progress.clone()).await; - - let progress_sender = ProgressSender::new(progress.id(), sender); - if let Err(e) = scheduled_job.run(&progress_sender, repo).await { - tracing::error!("error occurred during job execution: {}", e); - } - let sender = progress_sender.inner; - progress.set_state(JobExecutionState::Finished); - let _ = sender.send(progress).await; - }); - } - - Ok(()) - } - - async fn loop_save_states(self, subsystem: SubsystemHandle) { - loop { - if let Err(e) = self.save_states().await { - tracing::error!("failed to save job state {}", e); - } - - tokio::select! { - _ = tokio::time::sleep(Duration::from_secs(1)) => {}, - _ = subsystem.on_shutdown_requested() => { - let _ = self.save_states().await; - break; - } - } - } - } - - async fn save_states(&self) -> RepoResult<()> { - let mut changed_states = Vec::new(); - { - let states = self.state_data.read().await; - for state in &*states { - changed_states.append(&mut state.changed_states().await); - } - } - self.repo - .job() - .upsert_multiple_states(changed_states) - .await?; - - Ok(()) - } - - async fn update_on_progress(mut self, mut rx: Receiver) { - while let Some(progress) = rx.recv().await { - let mut jobs_table = self.jobs_table.write().await; - - if let JobExecutionState::Finished = progress.state() { - let mut state_data = self.state_data.write().await; - state_data.retain(|s| s.job_id() != progress.id()); - } - - if let Some(entry) = jobs_table.get_mut(&progress.id()) { - entry.last_update = Some(progress); - } - } - } - - async fn running_jobs(&self) -> Vec { - let jobs_table = self.jobs_table.read().await; - jobs_table - .values() - .filter_map(|v| v.last_update.as_ref()) - .filter(|u| u.state() != JobExecutionState::Finished) - .map(|u| u.id()) - .collect() - } -} - -fn create_job(dto: JobDto) -> Box { - match dto.job_type() { - JobType::MigrateCDs => { - todo!() - } - JobType::CalculateSizes => { - todo!() - } - JobType::GenerateThumbs => { - todo!() - } - JobType::CheckIntegrity => { - todo!() - } - JobType::Vacuum => Box::new(VacuumJob), - } -} diff --git a/mediarepo-daemon/mediarepo-worker/src/state_data.rs b/mediarepo-daemon/mediarepo-worker/src/state_data.rs deleted file mode 100644 index 7a9f5e7..0000000 --- a/mediarepo-daemon/mediarepo-worker/src/state_data.rs +++ /dev/null @@ -1,88 +0,0 @@ -use mediarepo_core::bincode; -use mediarepo_core::error::RepoResult; -use mediarepo_logic::dao::job::JobDao; -use mediarepo_logic::dto::UpsertJobStateDto; -use serde::de::DeserializeOwned; -use serde::Serialize; -use std::collections::{HashMap, HashSet}; -use std::sync::Arc; -use tokio::sync::RwLock; - -#[derive(Debug)] -pub struct StateData { - job_id: i64, - inner: Arc>>>, - changed_keys: Arc>>, -} - -impl StateData { - pub fn job_id(&self) -> i64 { - self.job_id - } - - /// Loads the state from the database - pub async fn load(job_dao: JobDao, job_id: i64) -> RepoResult { - let states = job_dao.states_for_job_id(job_id).await?; - let states_map = states - .into_iter() - .map(|s| (s.key().to_owned(), s.into_value())) - .collect::>>(); - - Ok(Self { - job_id, - inner: Arc::new(RwLock::new(states_map)), - changed_keys: Default::default(), - }) - } - - /// Returns the deserialized copy of a state object from the inner map - pub async fn entry, T: DeserializeOwned>(&self, key: S) -> RepoResult> { - let entries = self.inner.read().await; - let entry = entries.get(key.as_ref()); - - if let Some(bytes) = entry { - let value = bincode::deserialize(bytes)?; - - Ok(Some(value)) - } else { - Ok(None) - } - } - - /// Stores an entry in inner map - pub async fn store_entry(&self, key: String, value: &T) -> RepoResult<()> { - let entry_bytes = bincode::serialize(value)?; - let mut entries = self.inner.write().await; - entries.insert(key.clone(), entry_bytes); - let mut changed_entries = self.changed_keys.write().await; - changed_entries.insert(key); - - Ok(()) - } - - /// Returns a list of all changed state objects as an upsert list - pub async fn changed_states(&self) -> Vec { - let mut upsert_list = Vec::new(); - - { - let changed_keys = self.changed_keys.read().await; - let entries = self.inner.read().await; - - for key in &*changed_keys { - if let Some(value) = entries.get(key) { - upsert_list.push(UpsertJobStateDto { - job_id: self.job_id, - key: key.to_owned(), - value: value.clone(), - }); - } - } - } - { - let mut changed_keys = self.changed_keys.write().await; - changed_keys.clear(); - } - - upsert_list - } -}