diff --git a/mediarepo-daemon/mediarepo-socket/src/namespaces/jobs.rs b/mediarepo-daemon/mediarepo-socket/src/namespaces/jobs.rs index 568c283..273dd11 100644 --- a/mediarepo-daemon/mediarepo-socket/src/namespaces/jobs.rs +++ b/mediarepo-daemon/mediarepo-socket/src/namespaces/jobs.rs @@ -39,12 +39,18 @@ impl JobsNamespace { JobType::CalculateSizes => calculate_all_sizes(ctx).await?, JobType::CheckIntegrity => job_dao.check_integrity().await?, JobType::Vacuum => { - dispatcher.dispatch(VacuumJob::default()).await; + let mut handle = dispatcher.dispatch(VacuumJob::default()).await; + if run_request.sync { + handle.try_result().await?; + } } JobType::GenerateThumbnails => { - dispatcher + let mut handle = dispatcher .dispatch(GenerateMissingThumbsJob::default()) .await; + if run_request.sync { + handle.try_result().await?; + } } } @@ -62,10 +68,10 @@ async fn calculate_all_sizes(ctx: &Context) -> RepoResult<()> { }; let job = CalculateSizesJob::new(repo_path, settings); let dispatcher = get_job_dispatcher_from_context(ctx).await; - let state = dispatcher.dispatch(job).await; + let handle = dispatcher.dispatch(job).await; let mut rx = { - let state = state.read().await; - state.sizes_channel.subscribe() + let status = handle.status().read().await; + status.sizes_channel.subscribe() }; while let Ok((size_type, size)) = rx.recv().await { diff --git a/mediarepo-daemon/mediarepo-worker/src/handle.rs b/mediarepo-daemon/mediarepo-worker/src/handle.rs new file mode 100644 index 0000000..8c9a603 --- /dev/null +++ b/mediarepo-daemon/mediarepo-worker/src/handle.rs @@ -0,0 +1,102 @@ +use mediarepo_core::error::{RepoError, RepoResult}; +use std::mem; +use std::ops::{Deref, DerefMut}; +use std::sync::Arc; +use tokio::sync::broadcast::{Receiver, Sender}; +use tokio::sync::RwLock; + +pub struct JobHandle { + status: Arc>, + state: Arc>, + result_receiver: CloneableReceiver>>>, +} + +impl Clone for JobHandle { + fn clone(&self) -> Self { + Self { + status: self.status.clone(), + state: self.state.clone(), + result_receiver: self.result_receiver.clone(), + } + } +} + +impl JobHandle { + pub fn new( + status: Arc>, + state: Arc>, + result_receiver: CloneableReceiver>>>, + ) -> Self { + Self { + status, + state, + result_receiver, + } + } + + pub async fn state(&self) -> JobState { + *self.state.read().await + } + + pub fn status(&self) -> &Arc> { + &self.status + } + + pub async fn result(&mut self) -> Arc>> { + match self.result_receiver.recv().await { + Ok(v) => v, + Err(e) => Arc::new(RwLock::new(Err(RepoError::from(&*e.to_string())))), + } + } + + pub async fn try_result(&mut self) -> RepoResult { + let shared_result = self.result().await; + let mut result = shared_result.write().await; + mem::replace(&mut *result, Err(RepoError::from("result taken"))) + } +} + +#[derive(Clone, Copy, Debug, Ord, PartialOrd, Eq, PartialEq)] +pub enum JobState { + Queued, + Scheduled, + Running, + Finished, +} + +pub struct CloneableReceiver { + receiver: Receiver, + sender: Sender, +} + +impl CloneableReceiver { + pub fn new(sender: Sender) -> Self { + Self { + receiver: sender.subscribe(), + sender, + } + } +} + +impl Clone for CloneableReceiver { + fn clone(&self) -> Self { + Self { + sender: self.sender.clone(), + receiver: self.sender.subscribe(), + } + } +} + +impl Deref for CloneableReceiver { + type Target = Receiver; + + fn deref(&self) -> &Self::Target { + &self.receiver + } +} + +impl DerefMut for CloneableReceiver { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.receiver + } +} diff --git a/mediarepo-daemon/mediarepo-worker/src/job_dispatcher.rs b/mediarepo-daemon/mediarepo-worker/src/job_dispatcher.rs index 5d17e05..1cf58c2 100644 --- a/mediarepo-daemon/mediarepo-worker/src/job_dispatcher.rs +++ b/mediarepo-daemon/mediarepo-worker/src/job_dispatcher.rs @@ -1,3 +1,4 @@ +use crate::handle::{CloneableReceiver, JobHandle, JobState}; use crate::jobs::{Job, JobTypeKey}; use mediarepo_core::tokio_graceful_shutdown::SubsystemHandle; use mediarepo_core::trait_bound_typemap::{SendSyncTypeMap, TypeMap, TypeMapKey}; @@ -5,26 +6,27 @@ use mediarepo_logic::dao::repo::Repo; use mediarepo_logic::dao::DaoProvider; use std::sync::Arc; use std::time::Duration; +use tokio::sync::broadcast::channel; use tokio::sync::RwLock; use tokio::time::Instant; #[derive(Clone)] pub struct JobDispatcher { subsystem: SubsystemHandle, - job_status_map: Arc>, + job_handle_map: Arc>, repo: Arc, } impl JobDispatcher { pub fn new(subsystem: SubsystemHandle, repo: Repo) -> Self { Self { - job_status_map: Arc::new(RwLock::new(SendSyncTypeMap::new())), + job_handle_map: Arc::new(RwLock::new(SendSyncTypeMap::new())), subsystem, repo: Arc::new(repo), } } - pub async fn dispatch(&self, job: T) -> Arc> { + pub async fn dispatch(&self, job: T) -> JobHandle { self._dispatch(job, None).await } @@ -32,7 +34,7 @@ impl JobDispatcher { &self, job: T, interval: Duration, - ) -> Arc> { + ) -> JobHandle { self._dispatch(job, Some(interval)).await } @@ -41,9 +43,21 @@ impl JobDispatcher { &self, job: T, interval: Option, - ) -> Arc> { - let status = job.state(); - self.add_status::>(status.clone()).await; + ) -> JobHandle { + let status = job.status(); + let state = Arc::new(RwLock::new(JobState::Queued)); + let (sender, mut receiver) = channel(1); + self.subsystem + .start("channel-consumer", move |subsystem| async move { + tokio::select! { + _ = receiver.recv() => (), + _ = subsystem.on_shutdown_requested() => (), + } + Ok(()) + }); + let receiver = CloneableReceiver::new(sender.clone()); + let handle = JobHandle::new(status.clone(), state.clone(), receiver); + self.add_handle::>(handle.clone()).await; let repo = self.repo.clone(); @@ -52,16 +66,21 @@ impl JobDispatcher { loop { let start = Instant::now(); let job_2 = job.clone(); + { + let mut state = state.write().await; + *state = JobState::Running; + } let result = tokio::select! { _ = subsystem.on_shutdown_requested() => { job_2.save_state(repo.job()).await } r = job.run(repo.clone()) => { - - if let Err(e) = r { - Err(e) - } else { - job.save_state(repo.job()).await + match r { + Err(e) => Err(e), + Ok(v) => { + let _ = sender.send(Arc::new(RwLock::new(Ok(v)))); + job.save_state(repo.job()).await + } } } }; @@ -69,12 +88,18 @@ impl JobDispatcher { tracing::error!("job failed with error: {}", e); } if let Some(interval) = interval { + { + let mut state = state.write().await; + *state = JobState::Scheduled; + } let sleep_duration = interval - start.elapsed(); tokio::select! { _ = tokio::time::sleep(sleep_duration) => {}, _ = subsystem.on_shutdown_requested() => {break} } } else { + let mut state = state.write().await; + *state = JobState::Finished; break; } } @@ -82,15 +107,15 @@ impl JobDispatcher { Ok(()) }); - status + handle } #[inline] - async fn add_status(&self, status: T::Value) + async fn add_handle(&self, status: T::Value) where ::Value: Send + Sync, { - let mut status_map = self.job_status_map.write().await; + let mut status_map = self.job_handle_map.write().await; status_map.insert::(status); } } diff --git a/mediarepo-daemon/mediarepo-worker/src/jobs/calculate_sizes.rs b/mediarepo-daemon/mediarepo-worker/src/jobs/calculate_sizes.rs index 6f13a60..d9ce393 100644 --- a/mediarepo-daemon/mediarepo-worker/src/jobs/calculate_sizes.rs +++ b/mediarepo-daemon/mediarepo-worker/src/jobs/calculate_sizes.rs @@ -40,9 +40,10 @@ impl CalculateSizesJob { #[async_trait] impl Job for CalculateSizesJob { - type JobState = CalculateSizesState; + type JobStatus = CalculateSizesState; + type Result = (); - fn state(&self) -> Arc> { + fn status(&self) -> Arc> { self.state.clone() } diff --git a/mediarepo-daemon/mediarepo-worker/src/jobs/generate_missing_thumbnails.rs b/mediarepo-daemon/mediarepo-worker/src/jobs/generate_missing_thumbnails.rs index 9483fd4..05cf9e7 100644 --- a/mediarepo-daemon/mediarepo-worker/src/jobs/generate_missing_thumbnails.rs +++ b/mediarepo-daemon/mediarepo-worker/src/jobs/generate_missing_thumbnails.rs @@ -15,9 +15,10 @@ pub struct GenerateMissingThumbsJob { #[async_trait] impl Job for GenerateMissingThumbsJob { - type JobState = SimpleProgress; + type JobStatus = SimpleProgress; + type Result = (); - fn state(&self) -> Arc> { + fn status(&self) -> Arc> { self.state.clone() } diff --git a/mediarepo-daemon/mediarepo-worker/src/jobs/mod.rs b/mediarepo-daemon/mediarepo-worker/src/jobs/mod.rs index 73f1887..b56b4ed 100644 --- a/mediarepo-daemon/mediarepo-worker/src/jobs/mod.rs +++ b/mediarepo-daemon/mediarepo-worker/src/jobs/mod.rs @@ -8,6 +8,7 @@ use std::marker::PhantomData; use std::sync::Arc; pub use vacuum::*; +use crate::handle::JobHandle; use async_trait::async_trait; use mediarepo_core::error::RepoResult; use mediarepo_core::trait_bound_typemap::TypeMapKey; @@ -19,11 +20,12 @@ type EmptyStatus = Arc>; #[async_trait] pub trait Job: Clone + Send + Sync { - type JobState: Send + Sync; + type JobStatus: Send + Sync; + type Result: Send + Sync; - fn state(&self) -> Arc>; + fn status(&self) -> Arc>; - async fn run(&self, repo: Arc) -> RepoResult<()>; + async fn run(&self, repo: Arc) -> RepoResult; #[tracing::instrument(level = "debug", skip_all)] async fn save_state(&self, _job_dao: JobDao) -> RepoResult<()> { @@ -37,5 +39,5 @@ impl TypeMapKey for JobTypeKey where T: Job, { - type Value = Arc>; + type Value = JobHandle; } diff --git a/mediarepo-daemon/mediarepo-worker/src/jobs/vacuum.rs b/mediarepo-daemon/mediarepo-worker/src/jobs/vacuum.rs index 5335de6..60bc275 100644 --- a/mediarepo-daemon/mediarepo-worker/src/jobs/vacuum.rs +++ b/mediarepo-daemon/mediarepo-worker/src/jobs/vacuum.rs @@ -11,9 +11,10 @@ pub struct VacuumJob; #[async_trait] impl Job for VacuumJob { - type JobState = (); + type JobStatus = (); + type Result = (); - fn state(&self) -> Arc> { + fn status(&self) -> Arc> { EmptyStatus::default() } diff --git a/mediarepo-daemon/mediarepo-worker/src/lib.rs b/mediarepo-daemon/mediarepo-worker/src/lib.rs index 617eb4f..1d6dde8 100644 --- a/mediarepo-daemon/mediarepo-worker/src/lib.rs +++ b/mediarepo-daemon/mediarepo-worker/src/lib.rs @@ -6,6 +6,7 @@ use mediarepo_logic::dao::repo::Repo; use std::time::Duration; use tokio::sync::oneshot::channel; +pub mod handle; pub mod job_dispatcher; pub mod jobs; pub mod status_utils;