diff --git a/mediarepo-daemon/Cargo.lock b/mediarepo-daemon/Cargo.lock index 0a991af..f032b1c 100644 --- a/mediarepo-daemon/Cargo.lock +++ b/mediarepo-daemon/Cargo.lock @@ -1390,6 +1390,7 @@ dependencies = [ "async-trait", "chrono", "mediarepo-core", + "mediarepo-database", "mediarepo-logic", "serde", "tokio", diff --git a/mediarepo-daemon/mediarepo-worker/Cargo.toml b/mediarepo-daemon/mediarepo-worker/Cargo.toml index b5903ec..d448ff5 100644 --- a/mediarepo-daemon/mediarepo-worker/Cargo.toml +++ b/mediarepo-daemon/mediarepo-worker/Cargo.toml @@ -15,6 +15,9 @@ path = "../mediarepo-core" [dependencies.mediarepo-logic] path = "../mediarepo-logic" +[dependencies.mediarepo-database] +path = "../mediarepo-database" + [dependencies.tokio] version = "1.17.0" features = ["macros"] diff --git a/mediarepo-daemon/mediarepo-worker/src/execution_state.rs b/mediarepo-daemon/mediarepo-worker/src/execution_state.rs new file mode 100644 index 0000000..82a2959 --- /dev/null +++ b/mediarepo-daemon/mediarepo-worker/src/execution_state.rs @@ -0,0 +1,58 @@ +use std::sync::atomic::{AtomicU8, Ordering}; +use std::sync::Arc; + +#[derive(Clone, Copy, Debug, Ord, PartialOrd, Eq, PartialEq)] +pub enum JobExecutionState { + Scheduled, + Running, + Finished, +} + +pub struct ExecutionStateSynchronizer { + state: Arc, +} + +impl Default for ExecutionStateSynchronizer { + fn default() -> Self { + Self { + state: Arc::new(AtomicU8::new(0)), + } + } +} + +impl ExecutionStateSynchronizer { + pub fn set_scheduled(&self) { + self.state.store(0, Ordering::Relaxed); + } + + #[must_use] + pub fn set_running(&self) -> RunningHandle { + self.state.store(1, Ordering::Relaxed); + RunningHandle { + state: Arc::clone(&self.state), + } + } + + pub fn set_finished(&self) { + self.state.store(2, Ordering::SeqCst) + } + + pub fn state(&self) -> JobExecutionState { + match self.state.load(Ordering::SeqCst) { + 0 => JobExecutionState::Scheduled, + 1 => JobExecutionState::Running, + 2 => JobExecutionState::Scheduled, + _ => JobExecutionState::Finished, + } + } +} + +pub struct RunningHandle { + state: Arc, +} + +impl Drop for RunningHandle { + fn drop(&mut self) { + self.state.store(2, Ordering::SeqCst); + } +} diff --git a/mediarepo-daemon/mediarepo-worker/src/jobs/mod.rs b/mediarepo-daemon/mediarepo-worker/src/jobs/mod.rs index 205a5f2..4f42740 100644 --- a/mediarepo-daemon/mediarepo-worker/src/jobs/mod.rs +++ b/mediarepo-daemon/mediarepo-worker/src/jobs/mod.rs @@ -1,4 +1,9 @@ -use crate::progress::JobProgressUpdate; +mod vacuum; + +pub use vacuum::*; + +use crate::execution_state::JobExecutionState; +use crate::progress::{JobProgressUpdate, ProgressSender}; use crate::state_data::StateData; use async_trait::async_trait; use mediarepo_core::error::RepoResult; @@ -9,14 +14,5 @@ use tokio::sync::mpsc::Sender; pub trait ScheduledJob { async fn set_state(&self, state: StateData) -> RepoResult<()>; - async fn run(&self, sender: &mut Sender, repo: Repo) -> RepoResult<()>; - - fn execution_state(&self) -> JobExecutionState; -} - -#[derive(Clone, Copy, Debug, Ord, PartialOrd, Eq, PartialEq)] -pub enum JobExecutionState { - Scheduled, - Running, - Finished, + async fn run(&self, sender: &ProgressSender, repo: Repo) -> RepoResult<()>; } diff --git a/mediarepo-daemon/mediarepo-worker/src/jobs/vacuum.rs b/mediarepo-daemon/mediarepo-worker/src/jobs/vacuum.rs new file mode 100644 index 0000000..7527d85 --- /dev/null +++ b/mediarepo-daemon/mediarepo-worker/src/jobs/vacuum.rs @@ -0,0 +1,29 @@ +use crate::execution_state::{ExecutionStateSynchronizer, JobExecutionState}; +use crate::jobs::ScheduledJob; +use crate::progress::{JobProgressUpdate, ProgressSender}; +use crate::state_data::StateData; +use async_trait::async_trait; +use mediarepo_core::error::RepoResult; +use mediarepo_logic::dao::repo::Repo; +use mediarepo_logic::dao::DaoProvider; +use std::sync::Arc; +use tokio::sync::mpsc::Sender; +use tokio::sync::RwLock; + +#[derive(Default, Clone)] +pub struct VacuumJob; + +#[async_trait] +impl ScheduledJob for VacuumJob { + async fn set_state(&self, _: StateData) -> RepoResult<()> { + Ok(()) + } + + async fn run(&self, sender: &ProgressSender, repo: Repo) -> RepoResult<()> { + sender.send_progress_percent(0.0); + repo.job().vacuum().await?; + sender.send_progress_percent(1.0); + + Ok(()) + } +} diff --git a/mediarepo-daemon/mediarepo-worker/src/lib.rs b/mediarepo-daemon/mediarepo-worker/src/lib.rs index ac3b2ec..8044986 100644 --- a/mediarepo-daemon/mediarepo-worker/src/lib.rs +++ b/mediarepo-daemon/mediarepo-worker/src/lib.rs @@ -1,3 +1,4 @@ +pub mod execution_state; pub mod jobs; pub mod jobs_table; pub mod progress; diff --git a/mediarepo-daemon/mediarepo-worker/src/progress.rs b/mediarepo-daemon/mediarepo-worker/src/progress.rs index 59659d2..f643089 100644 --- a/mediarepo-daemon/mediarepo-worker/src/progress.rs +++ b/mediarepo-daemon/mediarepo-worker/src/progress.rs @@ -1,4 +1,5 @@ -use crate::jobs::JobExecutionState; +use crate::execution_state::{ExecutionStateSynchronizer, JobExecutionState, RunningHandle}; +use tokio::sync::mpsc::Sender; #[derive(Clone, Debug)] pub struct JobProgressUpdate { @@ -46,3 +47,32 @@ impl JobProgressUpdate { self.total = Some(total) } } + +pub struct ProgressSender { + job_id: i64, + execution_state_sync: ExecutionStateSynchronizer, + pub inner: Sender, +} + +impl ProgressSender { + pub fn new(job_id: i64, sender: Sender) -> Self { + Self { + job_id, + inner: sender, + execution_state_sync: ExecutionStateSynchronizer::default(), + } + } + + pub fn send_progress(&self, progress: u64, total: u64) { + let _ = self.inner.send(JobProgressUpdate { + id: self.job_id, + state: JobExecutionState::Running, + progress: Some(progress), + total: Some(total), + }); + } + + pub fn send_progress_percent(&self, percent: f64) { + self.send_progress((percent * 100.0) as u64, 100); + } +} diff --git a/mediarepo-daemon/mediarepo-worker/src/scheduler.rs b/mediarepo-daemon/mediarepo-worker/src/scheduler.rs index c10d632..3303d97 100644 --- a/mediarepo-daemon/mediarepo-worker/src/scheduler.rs +++ b/mediarepo-daemon/mediarepo-worker/src/scheduler.rs @@ -1,11 +1,13 @@ -use crate::jobs::{JobExecutionState, ScheduledJob}; +use crate::execution_state::JobExecutionState; +use crate::jobs::{ScheduledJob, VacuumJob}; use crate::jobs_table::JobsTable; -use crate::progress::JobProgressUpdate; +use crate::progress::{JobProgressUpdate, ProgressSender}; use crate::state_data::StateData; use mediarepo_core::error::RepoResult; use mediarepo_core::futures::select; use mediarepo_core::settings::LogLevel::Debug; use mediarepo_core::tokio_graceful_shutdown::SubsystemHandle; +use mediarepo_database::entities::job::JobType; use mediarepo_logic::dao::repo::Repo; use mediarepo_logic::dao::DaoProvider; use mediarepo_logic::dto::JobDto; @@ -90,9 +92,11 @@ impl Scheduler { progress.set_state(JobExecutionState::Running); let _ = sender.send(progress.clone()).await; - if let Err(e) = scheduled_job.run(&mut sender, repo).await { + let progress_sender = ProgressSender::new(progress.id(), sender); + if let Err(e) = scheduled_job.run(&progress_sender, repo).await { tracing::error!("error occurred during job execution: {}", e); } + let sender = progress_sender.inner; progress.set_state(JobExecutionState::Finished); let _ = sender.send(progress).await; }); @@ -160,5 +164,19 @@ impl Scheduler { } fn create_job(dto: JobDto) -> Box { - todo!("implement") + match dto.job_type() { + JobType::MigrateCDs => { + todo!() + } + JobType::CalculateSizes => { + todo!() + } + JobType::GenerateThumbs => { + todo!() + } + JobType::CheckIntegrity => { + todo!() + } + JobType::Vacuum => Box::new(VacuumJob), + } }