Add job scheduler implementation with progress report

Signed-off-by: trivernis <trivernis@protonmail.com>
feature/jobs
trivernis 2 years ago
parent 530fbd7606
commit 496d720219
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

@@ -1393,6 +1393,7 @@ dependencies = [
"mediarepo-logic",
"serde",
"tokio",
"tracing",
]
[[package]]

@@ -1,4 +1,9 @@
use crate::dao_provider;
use crate::dto::JobDto;
use chrono::Local;
use mediarepo_core::error::RepoResult;
use mediarepo_database::entities::job;
use sea_orm::prelude::*;
pub mod generate_missing_thumbnails;
pub mod migrate_content_descriptors;
@@ -6,3 +11,19 @@ pub mod sqlite_operations;
pub mod state;
dao_provider!(JobDao);
impl JobDao {
/// Returns all jobs that are due to run (next_run is set and has passed)
pub async fn scheduled_for_now(&self) -> RepoResult<Vec<JobDto>> {
let jobs = job::Entity::find()
.filter(job::Column::NextRun.is_not_null())
.filter(job::Column::NextRun.lt(Local::now().naive_local()))
.all(&self.ctx.db)
.await?
.into_iter()
.map(JobDto::new)
.collect();
Ok(jobs)
}
}

@@ -1,8 +1,10 @@
use crate::dao::job::JobDao;
use crate::dto::JobStateDto;
use crate::dto::{JobStateDto, UpsertJobStateDto};
use mediarepo_core::error::RepoResult;
use mediarepo_database::entities::job_state;
use sea_orm::prelude::*;
use sea_orm::ActiveValue::Set;
use sea_orm::{Condition, TransactionTrait};
impl JobDao {
/// Returns all job states for a given job id
@@ -17,4 +19,42 @@ impl JobDao {
Ok(states)
}
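/// Upserts the given job states in a single transaction, replacing existing entries with the same (job_id, key)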
pub async fn upsert_multiple_states(&self, states: Vec<UpsertJobStateDto>) -> RepoResult<()> {
let trx = self.ctx.db.begin().await?;
job_state::Entity::delete_many()
.filter(build_state_filters(&states))
.exec(&trx)
.await?;
job_state::Entity::insert_many(build_active_state_models(states))
.exec(&trx)
.await?;
trx.commit().await?;
Ok(())
}
}
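/// Builds a filter that matches any of the given (job_id, key) pairs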
fn build_state_filters(states: &[UpsertJobStateDto]) -> Condition {
states
.iter()
.map(|s| {
Condition::all()
.add(job_state::Column::JobId.eq(s.job_id))
.add(job_state::Column::Key.eq(s.key.to_owned()))
})
.fold(Condition::any(), |acc, cond| acc.add(cond))
}
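/// Converts the DTOs into active models ready for insertion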
fn build_active_state_models(states: Vec<UpsertJobStateDto>) -> Vec<job_state::ActiveModel> {
states
.into_iter()
.map(|s| job_state::ActiveModel {
job_id: Set(s.job_id),
key: Set(s.key),
value: Set(s.value),
})
.collect()
}

@@ -2,6 +2,7 @@ use chrono::NaiveDateTime;
use mediarepo_database::entities::job;
use mediarepo_database::entities::job::JobType;
#[derive(Clone, Debug)]
pub struct JobDto {
model: job::Model,
}

@@ -7,6 +7,7 @@ edition = "2021"
[dependencies]
async-trait = "0.1.52"
tracing = "0.1.31"
[dependencies.mediarepo-core]
path = "../mediarepo-core"
@@ -16,7 +17,7 @@ path = "../mediarepo-logic"
[dependencies.tokio]
version = "1.17.0"
features = []
features = ["macros"]
[dependencies.chrono]
version = "0.4.19"

@@ -1,16 +1,15 @@
use crate::progress::JobProgressUpdate;
use crate::state_data::StateData;
use async_trait::async_trait;
use mediarepo_core::error::RepoResult;
use mediarepo_logic::dao::repo::Repo;
use mediarepo_logic::dto::JobDto;
use tokio::sync::mpsc::Sender;
#[async_trait]
pub trait ScheduledJob {
fn new(dto: JobDto) -> Self;
async fn set_state(&self, state: StateData) -> RepoResult<()>;
async fn run(&self, repo: Repo) -> RepoResult<()>;
async fn run(&self, sender: &mut Sender<JobProgressUpdate>, repo: Repo) -> RepoResult<()>;
fn execution_state(&self) -> JobExecutionState;
}
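
To make the trait contract concrete, a minimal implementor could look roughly like the sketch below; VacuumJob is a hypothetical name and the concrete job types are not part of this commit.

    // sketch: a job that wraps its JobDto and keeps no persisted state
    pub struct VacuumJob {
        dto: JobDto,
    }

    #[async_trait]
    impl ScheduledJob for VacuumJob {
        fn new(dto: JobDto) -> Self {
            Self { dto }
        }

        async fn set_state(&self, _state: StateData) -> RepoResult<()> {
            // this job has no persistent state to restore
            Ok(())
        }

        async fn run(&self, sender: &mut Sender<JobProgressUpdate>, _repo: Repo) -> RepoResult<()> {
            let mut progress = JobProgressUpdate::new(self.dto.id());
            progress.set_total(1);
            // ... do the actual work here ...
            progress.set_progress(1);
            let _ = sender.send(progress).await;
            Ok(())
        }

        fn execution_state(&self) -> JobExecutionState {
            // a real implementation would track this internally
            JobExecutionState::Scheduled
        }
    }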

@@ -0,0 +1,13 @@
use crate::progress::JobProgressUpdate;
use mediarepo_logic::dto::JobDto;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
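/// Shared table mapping a job id to its entry (dto plus last progress update)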
pub type JobsTable = Arc<RwLock<HashMap<i64, JobEntry>>>;
#[derive(Clone, Debug)]
pub struct JobEntry {
pub dto: JobDto,
pub last_update: Option<JobProgressUpdate>,
}

@@ -1,3 +1,5 @@
pub mod jobs;
pub mod jobs_table;
pub mod progress;
pub mod scheduler;
pub mod state_data;

@@ -0,0 +1,48 @@
use crate::jobs::JobExecutionState;
#[derive(Clone, Debug)]
pub struct JobProgressUpdate {
id: i64,
state: JobExecutionState,
progress: Option<u64>,
total: Option<u64>,
}
impl JobProgressUpdate {
pub fn new(id: i64) -> Self {
Self {
id,
state: JobExecutionState::Scheduled,
progress: None,
total: None,
}
}
pub fn id(&self) -> i64 {
self.id
}
pub fn state(&self) -> JobExecutionState {
self.state
}
pub fn set_state(&mut self, state: JobExecutionState) {
self.state = state;
}
pub fn progress(&self) -> Option<u64> {
self.progress
}
pub fn set_progress(&mut self, progress: u64) {
self.progress = Some(progress);
}
pub fn total(&self) -> Option<u64> {
self.total
}
pub fn set_total(&mut self, total: u64) {
self.total = Some(total)
}
}
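
A job that processes a known number of items would report its progress through the channel roughly like this (a sketch; job_id, items and sender are assumed to exist in the surrounding job code):

    let mut progress = JobProgressUpdate::new(job_id);
    progress.set_state(JobExecutionState::Running);
    progress.set_total(items.len() as u64);

    for (index, item) in items.iter().enumerate() {
        // ... process the item ...
        progress.set_progress(index as u64 + 1);
        let _ = sender.send(progress.clone()).await;
    }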

@@ -1,7 +1,164 @@
use crate::jobs::{JobExecutionState, ScheduledJob};
use crate::jobs_table::JobsTable;
use crate::progress::JobProgressUpdate;
use crate::state_data::StateData;
use mediarepo_core::error::RepoResult;
use mediarepo_core::tokio_graceful_shutdown::SubsystemHandle;
use mediarepo_logic::dao::repo::Repo;
use mediarepo_logic::dao::DaoProvider;
use mediarepo_logic::dto::JobDto;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::RwLock;
#[derive(Clone)]
pub struct Scheduler {
pub repo: Repo,
repo: Repo,
state_data: Arc<RwLock<Vec<StateData>>>,
jobs_table: JobsTable,
}
impl Scheduler {}
impl Scheduler {
pub fn new(repo: Repo) -> Self {
Self {
repo,
state_data: Default::default(),
jobs_table: Default::default(),
}
}
pub fn run(self, subsystem: SubsystemHandle) -> JobsTable {
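// persist changed job states in the background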
tokio::task::spawn({
let subsystem = subsystem.clone();
let scheduler = self.clone();
async move {
scheduler.loop_save_states(subsystem).await;
}
});
let (tx, rx) = channel(32);
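// check for due jobs and run them, reporting progress through the channel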
tokio::task::spawn({
let scheduler = self.clone();
async move {
scheduler.loop_schedule(subsystem, tx).await;
}
});
let jobs_table = self.jobs_table.clone();
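// mirror incoming progress updates into the shared jobs table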
tokio::task::spawn(async move { self.update_on_progress(rx).await });
jobs_table
}
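/// Periodically schedules due jobs until shutdown is requested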
async fn loop_schedule(
self,
subsystem: SubsystemHandle,
job_progress_sender: Sender<JobProgressUpdate>,
) {
loop {
if let Err(e) = self.schedule(&job_progress_sender).await {
tracing::error!("failed to schedule jobs: {}", e);
}
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(1)) => {},
_ = subsystem.on_shutdown_requested() => {
break;
}
}
}
}
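/// Starts all jobs that are due and not already running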
async fn schedule(&self, job_progress_sender: &Sender<JobProgressUpdate>) -> RepoResult<()> {
let mut scheduled_jobs = self.repo.job().scheduled_for_now().await?;
let running_jobs = self.running_jobs().await;
scheduled_jobs.retain(|j| !running_jobs.contains(&j.id()));
for job in scheduled_jobs {
let mut sender = job_progress_sender.clone();
let mut progress = JobProgressUpdate::new(job.id());
let scheduled_job = create_job(job);
let _ = sender.send(progress.clone()).await;
let repo = self.repo.clone();
tokio::task::spawn(async move {
progress.set_state(JobExecutionState::Running);
let _ = sender.send(progress.clone()).await;
if let Err(e) = scheduled_job.run(&mut sender, repo).await {
tracing::error!("error occurred during job execution: {}", e);
}
progress.set_state(JobExecutionState::Finished);
let _ = sender.send(progress).await;
});
}
Ok(())
}
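/// Periodically saves changed job states until shutdown is requested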
async fn loop_save_states(self, subsystem: SubsystemHandle) {
loop {
if let Err(e) = self.save_states().await {
tracing::error!("failed to save job state {}", e);
}
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(1)) => {},
_ = subsystem.on_shutdown_requested() => {
let _ = self.save_states().await;
break;
}
}
}
}
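/// Collects all changed job states and writes them to the database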
async fn save_states(&self) -> RepoResult<()> {
let mut changed_states = Vec::new();
{
let states = self.state_data.read().await;
for state in &*states {
changed_states.append(&mut state.changed_states().await);
}
}
self.repo
.job()
.upsert_multiple_states(changed_states)
.await?;
Ok(())
}
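/// Applies received progress updates to the jobs table and drops state data of finished jobs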
async fn update_on_progress(mut self, mut rx: Receiver<JobProgressUpdate>) {
while let Some(progress) = rx.recv().await {
let mut jobs_table = self.jobs_table.write().await;
if let JobExecutionState::Finished = progress.state() {
let mut state_data = self.state_data.write().await;
state_data.retain(|s| s.job_id() != progress.id());
}
if let Some(entry) = jobs_table.get_mut(&progress.id()) {
entry.last_update = Some(progress);
}
}
}
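/// Returns the ids of all jobs whose last reported state is not Finished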
async fn running_jobs(&self) -> Vec<i64> {
let jobs_table = self.jobs_table.read().await;
jobs_table
.values()
.filter_map(|v| v.last_update.as_ref())
.filter(|u| u.state() != JobExecutionState::Finished)
.map(|u| u.id())
.collect()
}
}
fn create_job(dto: JobDto) -> Box<dyn ScheduledJob + Send + Sync> {
todo!("implement")
}
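
create_job is still a todo!; once the concrete job types exist it will presumably dispatch on the dto's job type, roughly along these lines (the job_type() accessor, the JobType variants and the job struct names are assumptions for illustration):

    fn create_job(dto: JobDto) -> Box<dyn ScheduledJob + Send + Sync> {
        match dto.job_type() {
            JobType::MigrateContentDescriptors => Box::new(MigrateContentDescriptorsJob::new(dto)),
            JobType::GenerateMissingThumbnails => Box::new(GenerateMissingThumbnailsJob::new(dto)),
            // ... remaining job types ...
        }
    }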

@@ -16,6 +16,10 @@ pub struct StateData {
}
impl StateData {
pub fn job_id(&self) -> i64 {
self.job_id
}
/// Loads the state from the database
pub async fn load(job_dao: JobDao, job_id: i64) -> RepoResult<Self> {
let states = job_dao.states_for_job_id(job_id).await?;
