Simplify job implementation

Signed-off-by: trivernis <trivernis@protonmail.com>
feature/jobs
trivernis 2 years ago
parent 1735d08c1d
commit a145d604d9
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

@@ -1,42 +0,0 @@
use chrono::NaiveDateTime;
use sea_orm::prelude::*;
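/// A background job stored in the database, with an optional name and schedule
/// (next_run for the next execution, interval for periodic runs).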
#[derive(Clone, Debug, PartialEq, DeriveEntityModel)]
#[sea_orm(table_name = "jobs")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: i64,
pub job_type: JobType,
pub name: Option<String>,
pub next_run: Option<NaiveDateTime>,
pub interval: Option<i64>,
}
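/// The kind of work a job performs, persisted as an integer value.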
#[derive(Clone, Copy, Debug, PartialEq, EnumIter, DeriveActiveEnum)]
#[sea_orm(rs_type = "u32", db_type = "Integer")]
pub enum JobType {
#[sea_orm(num_value = 10)]
MigrateCDs,
#[sea_orm(num_value = 20)]
CalculateSizes,
#[sea_orm(num_value = 30)]
GenerateThumbs,
#[sea_orm(num_value = 40)]
CheckIntegrity,
#[sea_orm(num_value = 50)]
Vacuum,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(has_many = "super::job_state::Entity")]
JobState,
}
impl Related<super::job_state::Entity> for Entity {
fn to() -> RelationDef {
Relation::JobState.def()
}
}
impl ActiveModelBehavior for ActiveModel {}

@@ -1,34 +0,0 @@
use chrono::NaiveDateTime;
use mediarepo_database::entities::job;
use mediarepo_database::entities::job::JobType;
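/// Read-only DTO wrapping the job database model and exposing its fields through accessors.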
#[derive(Clone, Debug)]
pub struct JobDto {
model: job::Model,
}
impl JobDto {
pub(crate) fn new(model: job::Model) -> Self {
Self { model }
}
pub fn id(&self) -> i64 {
self.model.id
}
pub fn job_type(&self) -> JobType {
self.model.job_type
}
pub fn name(&self) -> Option<&String> {
self.model.name.as_ref()
}
pub fn next_run(&self) -> Option<NaiveDateTime> {
self.model.next_run
}
pub fn interval(&self) -> Option<i64> {
self.model.interval
}
}

@@ -1,58 +0,0 @@
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::Arc;
#[derive(Clone, Copy, Debug, Ord, PartialOrd, Eq, PartialEq)]
pub enum JobExecutionState {
Scheduled,
Running,
Finished,
}
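/// Shares a job's execution state between handles as an atomic u8
/// (0 = Scheduled, 1 = Running, 2 = Finished).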
pub struct ExecutionStateSynchronizer {
state: Arc<AtomicU8>,
}
impl Default for ExecutionStateSynchronizer {
fn default() -> Self {
Self {
state: Arc::new(AtomicU8::new(0)),
}
}
}
impl ExecutionStateSynchronizer {
pub fn set_scheduled(&self) {
self.state.store(0, Ordering::Relaxed);
}
#[must_use]
pub fn set_running(&self) -> RunningHandle {
self.state.store(1, Ordering::Relaxed);
RunningHandle {
state: Arc::clone(&self.state),
}
}
pub fn set_finished(&self) {
self.state.store(2, Ordering::SeqCst)
}
pub fn state(&self) -> JobExecutionState {
match self.state.load(Ordering::SeqCst) {
0 => JobExecutionState::Scheduled,
1 => JobExecutionState::Running,
_ => JobExecutionState::Finished,
}
}
}
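/// Guard returned by `set_running`; dropping it marks the job as finished,
/// so the state is updated even if the job task exits early.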
pub struct RunningHandle {
state: Arc<AtomicU8>,
}
impl Drop for RunningHandle {
fn drop(&mut self) {
self.state.store(2, Ordering::SeqCst);
}
}

@@ -1,13 +0,0 @@
use crate::progress::JobProgressUpdate;
use mediarepo_logic::dto::JobDto;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
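/// Shared map from job id to its entry, used by the scheduler to publish progress updates.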
pub type JobsTable = Arc<RwLock<HashMap<i64, JobEntry>>>;
#[derive(Clone, Debug)]
pub struct JobEntry {
pub dto: JobDto,
pub last_update: Option<JobProgressUpdate>,
}

@@ -1,78 +0,0 @@
use crate::execution_state::{ExecutionStateSynchronizer, JobExecutionState};
use tokio::sync::mpsc::Sender;
#[derive(Clone, Debug)]
pub struct JobProgressUpdate {
id: i64,
state: JobExecutionState,
progress: Option<u64>,
total: Option<u64>,
}
impl JobProgressUpdate {
pub fn new(id: i64) -> Self {
Self {
id,
state: JobExecutionState::Scheduled,
progress: None,
total: None,
}
}
pub fn id(&self) -> i64 {
self.id
}
pub fn state(&self) -> JobExecutionState {
self.state
}
pub fn set_state(&mut self, state: JobExecutionState) {
self.state = state;
}
pub fn progress(&self) -> Option<u64> {
self.progress
}
pub fn set_progress(&mut self, progress: u64) {
self.progress = Some(progress);
}
pub fn total(&self) -> Option<u64> {
self.total
}
pub fn set_total(&mut self, total: u64) {
self.total = Some(total)
}
}
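/// Handed to a running job so it can report its progress back to the scheduler over the channel.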
pub struct ProgressSender {
job_id: i64,
execution_state_sync: ExecutionStateSynchronizer,
pub inner: Sender<JobProgressUpdate>,
}
impl ProgressSender {
pub fn new(job_id: i64, sender: Sender<JobProgressUpdate>) -> Self {
Self {
job_id,
inner: sender,
execution_state_sync: ExecutionStateSynchronizer::default(),
}
}
pub fn send_progress(&self, progress: u64, total: u64) {
// use try_send so reporting progress never blocks the job; an update is dropped if the channel is full
let _ = self.inner.try_send(JobProgressUpdate {
id: self.job_id,
state: JobExecutionState::Running,
progress: Some(progress),
total: Some(total),
});
}
pub fn send_progress_percent(&self, percent: f64) {
self.send_progress((percent * 100.0) as u64, 100);
}
}

@@ -1,182 +0,0 @@
use crate::execution_state::JobExecutionState;
use crate::jobs::{ScheduledJob, VacuumJob};
use crate::jobs_table::JobsTable;
use crate::progress::{JobProgressUpdate, ProgressSender};
use crate::state_data::StateData;
use mediarepo_core::error::RepoResult;
use mediarepo_core::tokio_graceful_shutdown::SubsystemHandle;
use mediarepo_database::entities::job::JobType;
use mediarepo_logic::dao::repo::Repo;
use mediarepo_logic::dao::DaoProvider;
use mediarepo_logic::dto::JobDto;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::RwLock;
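/// Schedules due jobs, runs them, and keeps the shared jobs table and persisted job state up to date.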
#[derive(Clone)]
pub struct Scheduler {
repo: Repo,
state_data: Arc<RwLock<Vec<StateData>>>,
jobs_table: JobsTable,
}
impl Scheduler {
pub fn new(repo: Repo) -> Self {
Self {
repo,
state_data: Default::default(),
jobs_table: Default::default(),
}
}
pub fn run(self, subsystem: SubsystemHandle) -> JobsTable {
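// periodically persist changed job state to the database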
tokio::task::spawn({
let subsystem = subsystem.clone();
let scheduler = self.clone();
async move {
scheduler.loop_save_states(subsystem).await;
}
});
let (tx, rx) = channel(32);
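// check once per second for jobs that are due and run them, forwarding progress through the channel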
tokio::task::spawn({
let scheduler = self.clone();
async move {
scheduler.loop_schedule(subsystem, tx).await;
}
});
let jobs_table = self.jobs_table.clone();
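// apply incoming progress updates to the shared jobs table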
tokio::task::spawn(async move { self.update_on_progress(rx).await });
jobs_table
}
async fn loop_schedule(
self,
subsystem: SubsystemHandle,
job_progress_sender: Sender<JobProgressUpdate>,
) {
loop {
if let Err(e) = self.schedule(&job_progress_sender).await {
tracing::error!("failed to schedule jobs: {}", e);
}
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(1)) => {},
_ = subsystem.on_shutdown_requested() => {
break;
}
}
}
}
async fn schedule(&self, job_progress_sender: &Sender<JobProgressUpdate>) -> RepoResult<()> {
let mut scheduled_jobs = self.repo.job().scheduled_for_now().await?;
let running_jobs = self.running_jobs().await;
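// skip jobs that already have a running instance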
scheduled_jobs.retain(|j| !running_jobs.contains(&j.id()));
for job in scheduled_jobs {
let sender = job_progress_sender.clone();
let mut progress = JobProgressUpdate::new(job.id());
let scheduled_job = create_job(job);
let _ = sender.send(progress.clone()).await;
let repo = self.repo.clone();
tokio::task::spawn(async move {
progress.set_state(JobExecutionState::Running);
let _ = sender.send(progress.clone()).await;
let progress_sender = ProgressSender::new(progress.id(), sender);
if let Err(e) = scheduled_job.run(&progress_sender, repo).await {
tracing::error!("error occurred during job execution: {}", e);
}
let sender = progress_sender.inner;
progress.set_state(JobExecutionState::Finished);
let _ = sender.send(progress).await;
});
}
Ok(())
}
async fn loop_save_states(self, subsystem: SubsystemHandle) {
loop {
if let Err(e) = self.save_states().await {
tracing::error!("failed to save job state {}", e);
}
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(1)) => {},
_ = subsystem.on_shutdown_requested() => {
let _ = self.save_states().await;
break;
}
}
}
}
async fn save_states(&self) -> RepoResult<()> {
let mut changed_states = Vec::new();
{
let states = self.state_data.read().await;
for state in &*states {
changed_states.append(&mut state.changed_states().await);
}
}
self.repo
.job()
.upsert_multiple_states(changed_states)
.await?;
Ok(())
}
async fn update_on_progress(self, mut rx: Receiver<JobProgressUpdate>) {
while let Some(progress) = rx.recv().await {
let mut jobs_table = self.jobs_table.write().await;
if let JobExecutionState::Finished = progress.state() {
let mut state_data = self.state_data.write().await;
state_data.retain(|s| s.job_id() != progress.id());
}
if let Some(entry) = jobs_table.get_mut(&progress.id()) {
entry.last_update = Some(progress);
}
}
}
async fn running_jobs(&self) -> Vec<i64> {
let jobs_table = self.jobs_table.read().await;
jobs_table
.values()
.filter_map(|v| v.last_update.as_ref())
.filter(|u| u.state() != JobExecutionState::Finished)
.map(|u| u.id())
.collect()
}
}
fn create_job(dto: JobDto) -> Box<dyn ScheduledJob + Send + Sync> {
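// only the vacuum job is implemented here; the remaining job types are still placeholders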
match dto.job_type() {
JobType::MigrateCDs => {
todo!()
}
JobType::CalculateSizes => {
todo!()
}
JobType::GenerateThumbs => {
todo!()
}
JobType::CheckIntegrity => {
todo!()
}
JobType::Vacuum => Box::new(VacuumJob),
}
}

@@ -1,88 +0,0 @@
use mediarepo_core::bincode;
use mediarepo_core::error::RepoResult;
use mediarepo_logic::dao::job::JobDao;
use mediarepo_logic::dto::UpsertJobStateDto;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use tokio::sync::RwLock;
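/// In-memory key/value state of a single job; changed keys are tracked
/// so that only modified entries are written back to the database.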
#[derive(Debug)]
pub struct StateData {
job_id: i64,
inner: Arc<RwLock<HashMap<String, Vec<u8>>>>,
changed_keys: Arc<RwLock<HashSet<String>>>,
}
impl StateData {
pub fn job_id(&self) -> i64 {
self.job_id
}
/// Loads the state from the database
pub async fn load(job_dao: JobDao, job_id: i64) -> RepoResult<Self> {
let states = job_dao.states_for_job_id(job_id).await?;
let states_map = states
.into_iter()
.map(|s| (s.key().to_owned(), s.into_value()))
.collect::<HashMap<String, Vec<u8>>>();
Ok(Self {
job_id,
inner: Arc::new(RwLock::new(states_map)),
changed_keys: Default::default(),
})
}
/// Returns the deserialized copy of a state object from the inner map
pub async fn entry<S: AsRef<str>, T: DeserializeOwned>(&self, key: S) -> RepoResult<Option<T>> {
let entries = self.inner.read().await;
let entry = entries.get(key.as_ref());
if let Some(bytes) = entry {
let value = bincode::deserialize(bytes)?;
Ok(Some(value))
} else {
Ok(None)
}
}
/// Stores an entry in inner map
pub async fn store_entry<T: Serialize>(&self, key: String, value: &T) -> RepoResult<()> {
let entry_bytes = bincode::serialize(value)?;
let mut entries = self.inner.write().await;
entries.insert(key.clone(), entry_bytes);
let mut changed_entries = self.changed_keys.write().await;
changed_entries.insert(key);
Ok(())
}
/// Returns a list of all changed state objects as an upsert list
pub async fn changed_states(&self) -> Vec<UpsertJobStateDto> {
let mut upsert_list = Vec::new();
{
let changed_keys = self.changed_keys.read().await;
let entries = self.inner.read().await;
for key in &*changed_keys {
if let Some(value) = entries.get(key) {
upsert_list.push(UpsertJobStateDto {
job_id: self.job_id,
key: key.to_owned(),
value: value.clone(),
});
}
}
}
{
let mut changed_keys = self.changed_keys.write().await;
changed_keys.clear();
}
upsert_list
}
}