diff --git a/mediarepo-daemon/mediarepo-worker/src/job_dispatcher.rs b/mediarepo-daemon/mediarepo-worker/src/job_dispatcher.rs index 41c1c22..03f4c61 100644 --- a/mediarepo-daemon/mediarepo-worker/src/job_dispatcher.rs +++ b/mediarepo-daemon/mediarepo-worker/src/job_dispatcher.rs @@ -5,7 +5,9 @@ use mediarepo_logic::dao::repo::Repo; use mediarepo_logic::dao::DaoProvider; use std::cell::UnsafeCell; use std::sync::Arc; +use std::time::Duration; use tokio::sync::RwLock; +use tokio::time::Instant; #[derive(Clone)] pub struct JobDispatcher { @@ -23,8 +25,24 @@ impl JobDispatcher { } } - #[tracing::instrument(level = "debug", skip_all)] pub async fn dispatch(&self, job: T) -> Arc> { + self._dispatch(job, None).await + } + + pub async fn dispatch_periodically( + &self, + job: T, + interval: Duration, + ) -> Arc> { + self._dispatch(job, Some(interval)).await + } + + #[tracing::instrument(level = "debug", skip_all)] + async fn _dispatch( + &self, + job: T, + interval: Option, + ) -> Arc> { let status = job.status(); self.add_status::>(status.clone()).await; @@ -37,23 +55,35 @@ impl JobDispatcher { let repo = self.repo.clone(); - subsystem.start("worker-job", |subsystem| async move { - let job_2 = job.clone(); - let result = tokio::select! { - _ = subsystem.on_shutdown_requested() => { - job_2.save_status(repo.job()).await - } - r = job.run(repo.clone()) => { + subsystem.start("worker-job", move |subsystem| async move { + loop { + let start = Instant::now(); + let job_2 = job.clone(); + let result = tokio::select! { + _ = subsystem.on_shutdown_requested() => { + job_2.save_status(repo.job()).await + } + r = job.run(repo.clone()) => { - if let Err(e) = r { - Err(e) - } else { - job.save_status(repo.job()).await + if let Err(e) = r { + Err(e) + } else { + job.save_status(repo.job()).await + } + } + }; + if let Err(e) = result { + tracing::error!("job failed with error: {}", e); + } + if let Some(interval) = interval { + let sleep_duration = interval - start.elapsed(); + tokio::select! { + _ = tokio::time::sleep(sleep_duration) => {}, + _ = subsystem.on_shutdown_requested() => {break} } + } else { + break; } - }; - if let Err(e) = result { - tracing::error!("job failed with error: {}", e); } Ok(()) diff --git a/mediarepo-daemon/mediarepo-worker/src/lib.rs b/mediarepo-daemon/mediarepo-worker/src/lib.rs index b72a922..a362087 100644 --- a/mediarepo-daemon/mediarepo-worker/src/lib.rs +++ b/mediarepo-daemon/mediarepo-worker/src/lib.rs @@ -3,6 +3,7 @@ use crate::jobs::VacuumJob; use mediarepo_core::error::RepoError; use mediarepo_core::tokio_graceful_shutdown::Toplevel; use mediarepo_logic::dao::repo::Repo; +use std::time::Duration; use tokio::sync::oneshot::channel; pub mod job_dispatcher; @@ -15,7 +16,9 @@ pub async fn start(top_level: Toplevel, repo: Repo) -> (Toplevel, JobDispatcher) let dispatcher = JobDispatcher::new(subsystem, repo); tx.send(dispatcher.clone()) .map_err(|_| RepoError::from("failed to send dispatcher"))?; - dispatcher.dispatch(VacuumJob::default()).await; + dispatcher + .dispatch_periodically(VacuumJob::default(), Duration::from_secs(60 * 30)) + .await; Ok(()) });