diff --git a/mediarepo-daemon/Cargo.lock b/mediarepo-daemon/Cargo.lock index 7dfc9f1..0a991af 100644 --- a/mediarepo-daemon/Cargo.lock +++ b/mediarepo-daemon/Cargo.lock @@ -1393,6 +1393,7 @@ dependencies = [ "mediarepo-logic", "serde", "tokio", + "tracing", ] [[package]] diff --git a/mediarepo-daemon/mediarepo-logic/src/dao/job/mod.rs b/mediarepo-daemon/mediarepo-logic/src/dao/job/mod.rs index b2077bc..1e17ca9 100644 --- a/mediarepo-daemon/mediarepo-logic/src/dao/job/mod.rs +++ b/mediarepo-daemon/mediarepo-logic/src/dao/job/mod.rs @@ -1,4 +1,9 @@ use crate::dao_provider; +use crate::dto::JobDto; +use chrono::Local; +use mediarepo_core::error::RepoResult; +use mediarepo_database::entities::job; +use sea_orm::prelude::*; pub mod generate_missing_thumbnails; pub mod migrate_content_descriptors; @@ -6,3 +11,19 @@ pub mod sqlite_operations; pub mod state; dao_provider!(JobDao); + +impl JobDao { + /// Returns a list of all jobs that are scheduled (have a next_run date) + pub async fn scheduled_for_now(&self) -> RepoResult<Vec<JobDto>> { + let jobs = job::Entity::find() + .filter(job::Column::NextRun.is_not_null()) + .filter(job::Column::NextRun.lt(Local::now().naive_local())) + .all(&self.ctx.db) + .await? 
+ .into_iter() + .map(JobDto::new) + .collect(); + + Ok(jobs) + } +} diff --git a/mediarepo-daemon/mediarepo-logic/src/dao/job/state.rs b/mediarepo-daemon/mediarepo-logic/src/dao/job/state.rs index db6c13b..8370305 100644 --- a/mediarepo-daemon/mediarepo-logic/src/dao/job/state.rs +++ b/mediarepo-daemon/mediarepo-logic/src/dao/job/state.rs @@ -1,8 +1,10 @@ use crate::dao::job::JobDao; -use crate::dto::JobStateDto; +use crate::dto::{JobStateDto, UpsertJobStateDto}; use mediarepo_core::error::RepoResult; use mediarepo_database::entities::job_state; use sea_orm::prelude::*; +use sea_orm::ActiveValue::Set; +use sea_orm::{Condition, TransactionTrait}; impl JobDao { /// Returns all job states for a given job id @@ -17,4 +19,42 @@ impl JobDao { Ok(states) } + + pub async fn upsert_multiple_states(&self, states: Vec<UpsertJobStateDto>) -> RepoResult<()> { + let trx = self.ctx.db.begin().await?; + + job_state::Entity::delete_many() + .filter(build_state_filters(&states)) + .exec(&trx) + .await?; + job_state::Entity::insert_many(build_active_state_models(states)) + .exec(&trx) + .await?; + + trx.commit().await?; + + Ok(()) + } +} + +fn build_state_filters(states: &Vec<UpsertJobStateDto>) -> Condition { + states + .iter() + .map(|s| { + Condition::all() + .add(job_state::Column::JobId.eq(s.job_id)) + .add(job_state::Column::Key.eq(s.key.to_owned())) + }) + .fold(Condition::any(), |acc, cond| acc.add(cond)) +} + +fn build_active_state_models(states: Vec<UpsertJobStateDto>) -> Vec<job_state::ActiveModel> { + states + .into_iter() + .map(|s| job_state::ActiveModel { + job_id: Set(s.job_id), + key: Set(s.key), + value: Set(s.value), + }) + .collect() } diff --git a/mediarepo-daemon/mediarepo-logic/src/dto/job.rs b/mediarepo-daemon/mediarepo-logic/src/dto/job.rs index 32a27ee..f4c186e 100644 --- a/mediarepo-daemon/mediarepo-logic/src/dto/job.rs +++ b/mediarepo-daemon/mediarepo-logic/src/dto/job.rs @@ -2,6 +2,7 @@ use chrono::NaiveDateTime; use mediarepo_database::entities::job; use mediarepo_database::entities::job::JobType; +#[derive(Clone, Debug)] pub struct 
JobDto { model: job::Model, } diff --git a/mediarepo-daemon/mediarepo-worker/Cargo.toml b/mediarepo-daemon/mediarepo-worker/Cargo.toml index 96af684..b5903ec 100644 --- a/mediarepo-daemon/mediarepo-worker/Cargo.toml +++ b/mediarepo-daemon/mediarepo-worker/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" [dependencies] async-trait = "0.1.52" +tracing = "0.1.31" [dependencies.mediarepo-core] path = "../mediarepo-core" @@ -16,7 +17,7 @@ path = "../mediarepo-logic" [dependencies.tokio] version = "1.17.0" -features = [] +features = ["macros"] [dependencies.chrono] version = "0.4.19" diff --git a/mediarepo-daemon/mediarepo-worker/src/jobs/mod.rs b/mediarepo-daemon/mediarepo-worker/src/jobs/mod.rs index ad9c8d8..205a5f2 100644 --- a/mediarepo-daemon/mediarepo-worker/src/jobs/mod.rs +++ b/mediarepo-daemon/mediarepo-worker/src/jobs/mod.rs @@ -1,16 +1,15 @@ +use crate::progress::JobProgressUpdate; use crate::state_data::StateData; use async_trait::async_trait; use mediarepo_core::error::RepoResult; use mediarepo_logic::dao::repo::Repo; -use mediarepo_logic::dto::JobDto; +use tokio::sync::mpsc::Sender; #[async_trait] pub trait ScheduledJob { - fn new(dto: JobDto) -> Self; - async fn set_state(&self, state: StateData) -> RepoResult<()>; - async fn run(&self, repo: Repo) -> RepoResult<()>; + async fn run(&self, sender: &mut Sender<JobProgressUpdate>, repo: Repo) -> RepoResult<()>; fn execution_state(&self) -> JobExecutionState; } diff --git a/mediarepo-daemon/mediarepo-worker/src/jobs_table.rs b/mediarepo-daemon/mediarepo-worker/src/jobs_table.rs new file mode 100644 index 0000000..d9c30f7 --- /dev/null +++ b/mediarepo-daemon/mediarepo-worker/src/jobs_table.rs @@ -0,0 +1,13 @@ +use crate::progress::JobProgressUpdate; +use mediarepo_logic::dto::JobDto; +use std::collections::HashMap; +use std::sync::Arc; +use tokio::sync::RwLock; + +pub type JobsTable = Arc<RwLock<HashMap<i64, JobEntry>>>; + +#[derive(Clone, Debug)] +pub struct JobEntry { + pub dto: JobDto, + pub last_update: Option<JobProgressUpdate>, +} diff --git 
a/mediarepo-daemon/mediarepo-worker/src/lib.rs b/mediarepo-daemon/mediarepo-worker/src/lib.rs index 6046de1..ac3b2ec 100644 --- a/mediarepo-daemon/mediarepo-worker/src/lib.rs +++ b/mediarepo-daemon/mediarepo-worker/src/lib.rs @@ -1,3 +1,5 @@ pub mod jobs; +pub mod jobs_table; +pub mod progress; pub mod scheduler; pub mod state_data; diff --git a/mediarepo-daemon/mediarepo-worker/src/progress.rs b/mediarepo-daemon/mediarepo-worker/src/progress.rs new file mode 100644 index 0000000..59659d2 --- /dev/null +++ b/mediarepo-daemon/mediarepo-worker/src/progress.rs @@ -0,0 +1,48 @@ +use crate::jobs::JobExecutionState; + +#[derive(Clone, Debug)] +pub struct JobProgressUpdate { + id: i64, + state: JobExecutionState, + progress: Option<u64>, + total: Option<u64>, +} + +impl JobProgressUpdate { + pub fn new(id: i64) -> Self { + Self { + id, + state: JobExecutionState::Scheduled, + progress: None, + total: None, + } + } + + pub fn id(&self) -> i64 { + self.id + } + + pub fn state(&self) -> JobExecutionState { + self.state + } + + pub fn set_state(&mut self, state: JobExecutionState) { + self.state = state; + } + + pub fn progress(&self) -> Option<u64> { + self.progress + } + + pub fn set_progress(&mut self, progress: u64) { + self.progress = Some(progress); + } + + pub fn total(&self) -> Option<u64> { + self.total + } + + pub fn set_total(&mut self, total: u64) { + self.total = Some(total) + } +} diff --git a/mediarepo-daemon/mediarepo-worker/src/scheduler.rs b/mediarepo-daemon/mediarepo-worker/src/scheduler.rs index f7e244d..c10d632 100644 --- a/mediarepo-daemon/mediarepo-worker/src/scheduler.rs +++ b/mediarepo-daemon/mediarepo-worker/src/scheduler.rs @@ -1,7 +1,164 @@ +use crate::jobs::{JobExecutionState, ScheduledJob}; +use crate::jobs_table::JobsTable; +use crate::progress::JobProgressUpdate; +use crate::state_data::StateData; +use mediarepo_core::error::RepoResult; +use mediarepo_core::futures::select; +use mediarepo_core::settings::LogLevel::Debug; +use 
mediarepo_core::tokio_graceful_shutdown::SubsystemHandle; use mediarepo_logic::dao::repo::Repo; +use mediarepo_logic::dao::DaoProvider; +use mediarepo_logic::dto::JobDto; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::mpsc::{channel, Receiver, Sender}; +use tokio::sync::RwLock; #[derive(Clone)] pub struct Scheduler { - pub repo: Repo, + repo: Repo, + state_data: Arc<RwLock<Vec<StateData>>>, + jobs_table: JobsTable, } -impl Scheduler {} +impl Scheduler { + pub fn new(repo: Repo) -> Self { + Self { + repo, + state_data: Default::default(), + jobs_table: Default::default(), + } + } + + pub fn run(self, subsystem: SubsystemHandle) -> JobsTable { + tokio::task::spawn({ + let subsystem = subsystem.clone(); + let scheduler = self.clone(); + async move { + scheduler.loop_save_states(subsystem).await; + } + }); + + let (tx, rx) = channel(32); + + tokio::task::spawn({ + let scheduler = self.clone(); + async move { + scheduler.loop_schedule(subsystem, tx).await; + } + }); + + let jobs_table = self.jobs_table.clone(); + + tokio::task::spawn(async move { self.update_on_progress(rx).await }); + + jobs_table + } + + async fn loop_schedule( + self, + subsystem: SubsystemHandle, + job_progress_sender: Sender<JobProgressUpdate>, + ) { + loop { + if let Err(e) = self.schedule(&job_progress_sender).await { + tracing::error!("failed to schedule jobs: {}", e); + } + tokio::select! 
{ + _ = tokio::time::sleep(Duration::from_secs(1)) => {}, + _ = subsystem.on_shutdown_requested() => { + break; + } + } + } + } + + async fn schedule(&self, job_progress_sender: &Sender<JobProgressUpdate>) -> RepoResult<()> { + let mut scheduled_jobs = self.repo.job().scheduled_for_now().await?; + let running_jobs = self.running_jobs().await; + + scheduled_jobs.retain(|j| !running_jobs.contains(&j.id())); + + for job in scheduled_jobs { + let mut sender = job_progress_sender.clone(); + let mut progress = JobProgressUpdate::new(job.id()); + let scheduled_job = create_job(job); + let _ = sender.send(progress.clone()).await; + let repo = self.repo.clone(); + + tokio::task::spawn(async move { + progress.set_state(JobExecutionState::Running); + let _ = sender.send(progress.clone()).await; + + if let Err(e) = scheduled_job.run(&mut sender, repo).await { + tracing::error!("error occurred during job execution: {}", e); + } + progress.set_state(JobExecutionState::Finished); + let _ = sender.send(progress).await; + }); + } + + Ok(()) + } + + async fn loop_save_states(self, subsystem: SubsystemHandle) { + loop { + if let Err(e) = self.save_states().await { + tracing::error!("failed to save job state {}", e); + } + + tokio::select! 
{ + _ = tokio::time::sleep(Duration::from_secs(1)) => {}, + _ = subsystem.on_shutdown_requested() => { + let _ = self.save_states().await; + break; + } + } + } + } + + async fn save_states(&self) -> RepoResult<()> { + let mut changed_states = Vec::new(); + { + let states = self.state_data.read().await; + for state in &*states { + changed_states.append(&mut state.changed_states().await); + } + } + self.repo + .job() + .upsert_multiple_states(changed_states) + .await?; + + Ok(()) + } + + async fn update_on_progress(mut self, mut rx: Receiver<JobProgressUpdate>) { + while let Some(progress) = rx.recv().await { + let mut jobs_table = self.jobs_table.write().await; + + if let JobExecutionState::Finished = progress.state() { + let mut state_data = self.state_data.write().await; + state_data.retain(|s| s.job_id() != progress.id()); + } + + if let Some(entry) = jobs_table.get_mut(&progress.id()) { + entry.last_update = Some(progress); + } + } + } + + async fn running_jobs(&self) -> Vec<i64> { + let jobs_table = self.jobs_table.read().await; + jobs_table + .values() + .filter_map(|v| v.last_update.as_ref()) + .filter(|u| u.state() != JobExecutionState::Finished) + .map(|u| u.id()) + .collect() + } +} + +fn create_job(dto: JobDto) -> Box<dyn ScheduledJob> { + todo!("implement") +} diff --git a/mediarepo-daemon/mediarepo-worker/src/state_data.rs b/mediarepo-daemon/mediarepo-worker/src/state_data.rs index f197fb5..7a9f5e7 100644 --- a/mediarepo-daemon/mediarepo-worker/src/state_data.rs +++ b/mediarepo-daemon/mediarepo-worker/src/state_data.rs @@ -16,6 +16,10 @@ pub struct StateData { } impl StateData { + pub fn job_id(&self) -> i64 { + self.job_id + } + /// Loads the state from the database pub async fn load(job_dao: JobDao, job_id: i64) -> RepoResult<Self> { let states = job_dao.states_for_job_id(job_id).await?;