Add vacuum job

Signed-off-by: trivernis <trivernis@protonmail.com>
feature/jobs
trivernis 3 years ago
parent 496d720219
commit 0cb37e1268
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

@ -1390,6 +1390,7 @@ dependencies = [
"async-trait", "async-trait",
"chrono", "chrono",
"mediarepo-core", "mediarepo-core",
"mediarepo-database",
"mediarepo-logic", "mediarepo-logic",
"serde", "serde",
"tokio", "tokio",

@ -15,6 +15,9 @@ path = "../mediarepo-core"
[dependencies.mediarepo-logic] [dependencies.mediarepo-logic]
path = "../mediarepo-logic" path = "../mediarepo-logic"
[dependencies.mediarepo-database]
path = "../mediarepo-database"
[dependencies.tokio] [dependencies.tokio]
version = "1.17.0" version = "1.17.0"
features = ["macros"] features = ["macros"]

@ -0,0 +1,58 @@
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::Arc;
#[derive(Clone, Copy, Debug, Ord, PartialOrd, Eq, PartialEq)]
pub enum JobExecutionState {
Scheduled,
Running,
Finished,
}
pub struct ExecutionStateSynchronizer {
state: Arc<AtomicU8>,
}
impl Default for ExecutionStateSynchronizer {
fn default() -> Self {
Self {
state: Arc::new(AtomicU8::new(0)),
}
}
}
impl ExecutionStateSynchronizer {
pub fn set_scheduled(&self) {
self.state.store(0, Ordering::Relaxed);
}
#[must_use]
pub fn set_running(&self) -> RunningHandle {
self.state.store(1, Ordering::Relaxed);
RunningHandle {
state: Arc::clone(&self.state),
}
}
pub fn set_finished(&self) {
self.state.store(2, Ordering::SeqCst)
}
pub fn state(&self) -> JobExecutionState {
match self.state.load(Ordering::SeqCst) {
0 => JobExecutionState::Scheduled,
1 => JobExecutionState::Running,
2 => JobExecutionState::Scheduled,
_ => JobExecutionState::Finished,
}
}
}
pub struct RunningHandle {
state: Arc<AtomicU8>,
}
impl Drop for RunningHandle {
fn drop(&mut self) {
self.state.store(2, Ordering::SeqCst);
}
}

@ -1,4 +1,9 @@
use crate::progress::JobProgressUpdate; mod vacuum;
pub use vacuum::*;
use crate::execution_state::JobExecutionState;
use crate::progress::{JobProgressUpdate, ProgressSender};
use crate::state_data::StateData; use crate::state_data::StateData;
use async_trait::async_trait; use async_trait::async_trait;
use mediarepo_core::error::RepoResult; use mediarepo_core::error::RepoResult;
@ -9,14 +14,5 @@ use tokio::sync::mpsc::Sender;
pub trait ScheduledJob { pub trait ScheduledJob {
async fn set_state(&self, state: StateData) -> RepoResult<()>; async fn set_state(&self, state: StateData) -> RepoResult<()>;
async fn run(&self, sender: &mut Sender<JobProgressUpdate>, repo: Repo) -> RepoResult<()>; async fn run(&self, sender: &ProgressSender, repo: Repo) -> RepoResult<()>;
fn execution_state(&self) -> JobExecutionState;
}
#[derive(Clone, Copy, Debug, Ord, PartialOrd, Eq, PartialEq)]
pub enum JobExecutionState {
Scheduled,
Running,
Finished,
} }

@ -0,0 +1,29 @@
use crate::execution_state::{ExecutionStateSynchronizer, JobExecutionState};
use crate::jobs::ScheduledJob;
use crate::progress::{JobProgressUpdate, ProgressSender};
use crate::state_data::StateData;
use async_trait::async_trait;
use mediarepo_core::error::RepoResult;
use mediarepo_logic::dao::repo::Repo;
use mediarepo_logic::dao::DaoProvider;
use std::sync::Arc;
use tokio::sync::mpsc::Sender;
use tokio::sync::RwLock;
#[derive(Default, Clone)]
pub struct VacuumJob;
#[async_trait]
impl ScheduledJob for VacuumJob {
async fn set_state(&self, _: StateData) -> RepoResult<()> {
Ok(())
}
async fn run(&self, sender: &ProgressSender, repo: Repo) -> RepoResult<()> {
sender.send_progress_percent(0.0);
repo.job().vacuum().await?;
sender.send_progress_percent(1.0);
Ok(())
}
}

@ -1,3 +1,4 @@
pub mod execution_state;
pub mod jobs; pub mod jobs;
pub mod jobs_table; pub mod jobs_table;
pub mod progress; pub mod progress;

@ -1,4 +1,5 @@
use crate::jobs::JobExecutionState; use crate::execution_state::{ExecutionStateSynchronizer, JobExecutionState, RunningHandle};
use tokio::sync::mpsc::Sender;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct JobProgressUpdate { pub struct JobProgressUpdate {
@ -46,3 +47,32 @@ impl JobProgressUpdate {
self.total = Some(total) self.total = Some(total)
} }
} }
pub struct ProgressSender {
job_id: i64,
execution_state_sync: ExecutionStateSynchronizer,
pub inner: Sender<JobProgressUpdate>,
}
impl ProgressSender {
pub fn new(job_id: i64, sender: Sender<JobProgressUpdate>) -> Self {
Self {
job_id,
inner: sender,
execution_state_sync: ExecutionStateSynchronizer::default(),
}
}
pub fn send_progress(&self, progress: u64, total: u64) {
let _ = self.inner.send(JobProgressUpdate {
id: self.job_id,
state: JobExecutionState::Running,
progress: Some(progress),
total: Some(total),
});
}
pub fn send_progress_percent(&self, percent: f64) {
self.send_progress((percent * 100.0) as u64, 100);
}
}

@ -1,11 +1,13 @@
use crate::jobs::{JobExecutionState, ScheduledJob}; use crate::execution_state::JobExecutionState;
use crate::jobs::{ScheduledJob, VacuumJob};
use crate::jobs_table::JobsTable; use crate::jobs_table::JobsTable;
use crate::progress::JobProgressUpdate; use crate::progress::{JobProgressUpdate, ProgressSender};
use crate::state_data::StateData; use crate::state_data::StateData;
use mediarepo_core::error::RepoResult; use mediarepo_core::error::RepoResult;
use mediarepo_core::futures::select; use mediarepo_core::futures::select;
use mediarepo_core::settings::LogLevel::Debug; use mediarepo_core::settings::LogLevel::Debug;
use mediarepo_core::tokio_graceful_shutdown::SubsystemHandle; use mediarepo_core::tokio_graceful_shutdown::SubsystemHandle;
use mediarepo_database::entities::job::JobType;
use mediarepo_logic::dao::repo::Repo; use mediarepo_logic::dao::repo::Repo;
use mediarepo_logic::dao::DaoProvider; use mediarepo_logic::dao::DaoProvider;
use mediarepo_logic::dto::JobDto; use mediarepo_logic::dto::JobDto;
@ -90,9 +92,11 @@ impl Scheduler {
progress.set_state(JobExecutionState::Running); progress.set_state(JobExecutionState::Running);
let _ = sender.send(progress.clone()).await; let _ = sender.send(progress.clone()).await;
if let Err(e) = scheduled_job.run(&mut sender, repo).await { let progress_sender = ProgressSender::new(progress.id(), sender);
if let Err(e) = scheduled_job.run(&progress_sender, repo).await {
tracing::error!("error occurred during job execution: {}", e); tracing::error!("error occurred during job execution: {}", e);
} }
let sender = progress_sender.inner;
progress.set_state(JobExecutionState::Finished); progress.set_state(JobExecutionState::Finished);
let _ = sender.send(progress).await; let _ = sender.send(progress).await;
}); });
@ -160,5 +164,19 @@ impl Scheduler {
} }
fn create_job(dto: JobDto) -> Box<dyn ScheduledJob + Send + Sync> { fn create_job(dto: JobDto) -> Box<dyn ScheduledJob + Send + Sync> {
todo!("implement") match dto.job_type() {
JobType::MigrateCDs => {
todo!()
}
JobType::CalculateSizes => {
todo!()
}
JobType::GenerateThumbs => {
todo!()
}
JobType::CheckIntegrity => {
todo!()
}
JobType::Vacuum => Box::new(VacuumJob),
}
} }

Loading…
Cancel
Save