Add job handle to jobs to wait for results

Signed-off-by: trivernis <trivernis@protonmail.com>
feature/jobs
trivernis 2 years ago
parent a87c341867
commit e9dbbd1bd5
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

@ -39,12 +39,18 @@ impl JobsNamespace {
JobType::CalculateSizes => calculate_all_sizes(ctx).await?, JobType::CalculateSizes => calculate_all_sizes(ctx).await?,
JobType::CheckIntegrity => job_dao.check_integrity().await?, JobType::CheckIntegrity => job_dao.check_integrity().await?,
JobType::Vacuum => { JobType::Vacuum => {
dispatcher.dispatch(VacuumJob::default()).await; let mut handle = dispatcher.dispatch(VacuumJob::default()).await;
if run_request.sync {
handle.try_result().await?;
}
} }
JobType::GenerateThumbnails => { JobType::GenerateThumbnails => {
dispatcher let mut handle = dispatcher
.dispatch(GenerateMissingThumbsJob::default()) .dispatch(GenerateMissingThumbsJob::default())
.await; .await;
if run_request.sync {
handle.try_result().await?;
}
} }
} }
@ -62,10 +68,10 @@ async fn calculate_all_sizes(ctx: &Context) -> RepoResult<()> {
}; };
let job = CalculateSizesJob::new(repo_path, settings); let job = CalculateSizesJob::new(repo_path, settings);
let dispatcher = get_job_dispatcher_from_context(ctx).await; let dispatcher = get_job_dispatcher_from_context(ctx).await;
let state = dispatcher.dispatch(job).await; let handle = dispatcher.dispatch(job).await;
let mut rx = { let mut rx = {
let state = state.read().await; let status = handle.status().read().await;
state.sizes_channel.subscribe() status.sizes_channel.subscribe()
}; };
while let Ok((size_type, size)) = rx.recv().await { while let Ok((size_type, size)) = rx.recv().await {

@ -0,0 +1,102 @@
use mediarepo_core::error::{RepoError, RepoResult};
use std::mem;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use tokio::sync::broadcast::{Receiver, Sender};
use tokio::sync::RwLock;
pub struct JobHandle<T: Send + Sync, R: Send + Sync> {
status: Arc<RwLock<T>>,
state: Arc<RwLock<JobState>>,
result_receiver: CloneableReceiver<Arc<RwLock<RepoResult<R>>>>,
}
impl<T: Send + Sync, R: Send + Sync> Clone for JobHandle<T, R> {
fn clone(&self) -> Self {
Self {
status: self.status.clone(),
state: self.state.clone(),
result_receiver: self.result_receiver.clone(),
}
}
}
impl<T: Send + Sync, R: Send + Sync> JobHandle<T, R> {
pub fn new(
status: Arc<RwLock<T>>,
state: Arc<RwLock<JobState>>,
result_receiver: CloneableReceiver<Arc<RwLock<RepoResult<R>>>>,
) -> Self {
Self {
status,
state,
result_receiver,
}
}
pub async fn state(&self) -> JobState {
*self.state.read().await
}
pub fn status(&self) -> &Arc<RwLock<T>> {
&self.status
}
pub async fn result(&mut self) -> Arc<RwLock<RepoResult<R>>> {
match self.result_receiver.recv().await {
Ok(v) => v,
Err(e) => Arc::new(RwLock::new(Err(RepoError::from(&*e.to_string())))),
}
}
pub async fn try_result(&mut self) -> RepoResult<R> {
let shared_result = self.result().await;
let mut result = shared_result.write().await;
mem::replace(&mut *result, Err(RepoError::from("result taken")))
}
}
#[derive(Clone, Copy, Debug, Ord, PartialOrd, Eq, PartialEq)]
pub enum JobState {
Queued,
Scheduled,
Running,
Finished,
}
pub struct CloneableReceiver<T: Clone> {
receiver: Receiver<T>,
sender: Sender<T>,
}
impl<T: Clone> CloneableReceiver<T> {
pub fn new(sender: Sender<T>) -> Self {
Self {
receiver: sender.subscribe(),
sender,
}
}
}
impl<T: Clone> Clone for CloneableReceiver<T> {
fn clone(&self) -> Self {
Self {
sender: self.sender.clone(),
receiver: self.sender.subscribe(),
}
}
}
impl<T: Clone> Deref for CloneableReceiver<T> {
type Target = Receiver<T>;
fn deref(&self) -> &Self::Target {
&self.receiver
}
}
impl<T: Clone> DerefMut for CloneableReceiver<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.receiver
}
}

@ -1,3 +1,4 @@
use crate::handle::{CloneableReceiver, JobHandle, JobState};
use crate::jobs::{Job, JobTypeKey}; use crate::jobs::{Job, JobTypeKey};
use mediarepo_core::tokio_graceful_shutdown::SubsystemHandle; use mediarepo_core::tokio_graceful_shutdown::SubsystemHandle;
use mediarepo_core::trait_bound_typemap::{SendSyncTypeMap, TypeMap, TypeMapKey}; use mediarepo_core::trait_bound_typemap::{SendSyncTypeMap, TypeMap, TypeMapKey};
@ -5,26 +6,27 @@ use mediarepo_logic::dao::repo::Repo;
use mediarepo_logic::dao::DaoProvider; use mediarepo_logic::dao::DaoProvider;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use tokio::sync::broadcast::channel;
use tokio::sync::RwLock; use tokio::sync::RwLock;
use tokio::time::Instant; use tokio::time::Instant;
#[derive(Clone)] #[derive(Clone)]
pub struct JobDispatcher { pub struct JobDispatcher {
subsystem: SubsystemHandle, subsystem: SubsystemHandle,
job_status_map: Arc<RwLock<SendSyncTypeMap>>, job_handle_map: Arc<RwLock<SendSyncTypeMap>>,
repo: Arc<Repo>, repo: Arc<Repo>,
} }
impl JobDispatcher { impl JobDispatcher {
pub fn new(subsystem: SubsystemHandle, repo: Repo) -> Self { pub fn new(subsystem: SubsystemHandle, repo: Repo) -> Self {
Self { Self {
job_status_map: Arc::new(RwLock::new(SendSyncTypeMap::new())), job_handle_map: Arc::new(RwLock::new(SendSyncTypeMap::new())),
subsystem, subsystem,
repo: Arc::new(repo), repo: Arc::new(repo),
} }
} }
pub async fn dispatch<T: 'static + Job>(&self, job: T) -> Arc<RwLock<T::JobState>> { pub async fn dispatch<T: 'static + Job>(&self, job: T) -> JobHandle<T::JobStatus, T::Result> {
self._dispatch(job, None).await self._dispatch(job, None).await
} }
@ -32,7 +34,7 @@ impl JobDispatcher {
&self, &self,
job: T, job: T,
interval: Duration, interval: Duration,
) -> Arc<RwLock<T::JobState>> { ) -> JobHandle<T::JobStatus, T::Result> {
self._dispatch(job, Some(interval)).await self._dispatch(job, Some(interval)).await
} }
@ -41,9 +43,21 @@ impl JobDispatcher {
&self, &self,
job: T, job: T,
interval: Option<Duration>, interval: Option<Duration>,
) -> Arc<RwLock<T::JobState>> { ) -> JobHandle<T::JobStatus, T::Result> {
let status = job.state(); let status = job.status();
self.add_status::<JobTypeKey<T>>(status.clone()).await; let state = Arc::new(RwLock::new(JobState::Queued));
let (sender, mut receiver) = channel(1);
self.subsystem
.start("channel-consumer", move |subsystem| async move {
tokio::select! {
_ = receiver.recv() => (),
_ = subsystem.on_shutdown_requested() => (),
}
Ok(())
});
let receiver = CloneableReceiver::new(sender.clone());
let handle = JobHandle::new(status.clone(), state.clone(), receiver);
self.add_handle::<JobTypeKey<T>>(handle.clone()).await;
let repo = self.repo.clone(); let repo = self.repo.clone();
@ -52,16 +66,21 @@ impl JobDispatcher {
loop { loop {
let start = Instant::now(); let start = Instant::now();
let job_2 = job.clone(); let job_2 = job.clone();
{
let mut state = state.write().await;
*state = JobState::Running;
}
let result = tokio::select! { let result = tokio::select! {
_ = subsystem.on_shutdown_requested() => { _ = subsystem.on_shutdown_requested() => {
job_2.save_state(repo.job()).await job_2.save_state(repo.job()).await
} }
r = job.run(repo.clone()) => { r = job.run(repo.clone()) => {
match r {
if let Err(e) = r { Err(e) => Err(e),
Err(e) Ok(v) => {
} else { let _ = sender.send(Arc::new(RwLock::new(Ok(v))));
job.save_state(repo.job()).await job.save_state(repo.job()).await
}
} }
} }
}; };
@ -69,12 +88,18 @@ impl JobDispatcher {
tracing::error!("job failed with error: {}", e); tracing::error!("job failed with error: {}", e);
} }
if let Some(interval) = interval { if let Some(interval) = interval {
{
let mut state = state.write().await;
*state = JobState::Scheduled;
}
let sleep_duration = interval - start.elapsed(); let sleep_duration = interval - start.elapsed();
tokio::select! { tokio::select! {
_ = tokio::time::sleep(sleep_duration) => {}, _ = tokio::time::sleep(sleep_duration) => {},
_ = subsystem.on_shutdown_requested() => {break} _ = subsystem.on_shutdown_requested() => {break}
} }
} else { } else {
let mut state = state.write().await;
*state = JobState::Finished;
break; break;
} }
} }
@ -82,15 +107,15 @@ impl JobDispatcher {
Ok(()) Ok(())
}); });
status handle
} }
#[inline] #[inline]
async fn add_status<T: TypeMapKey>(&self, status: T::Value) async fn add_handle<T: TypeMapKey>(&self, status: T::Value)
where where
<T as TypeMapKey>::Value: Send + Sync, <T as TypeMapKey>::Value: Send + Sync,
{ {
let mut status_map = self.job_status_map.write().await; let mut status_map = self.job_handle_map.write().await;
status_map.insert::<T>(status); status_map.insert::<T>(status);
} }
} }

@ -40,9 +40,10 @@ impl CalculateSizesJob {
#[async_trait] #[async_trait]
impl Job for CalculateSizesJob { impl Job for CalculateSizesJob {
type JobState = CalculateSizesState; type JobStatus = CalculateSizesState;
type Result = ();
fn state(&self) -> Arc<RwLock<Self::JobState>> { fn status(&self) -> Arc<RwLock<Self::JobStatus>> {
self.state.clone() self.state.clone()
} }

@ -15,9 +15,10 @@ pub struct GenerateMissingThumbsJob {
#[async_trait] #[async_trait]
impl Job for GenerateMissingThumbsJob { impl Job for GenerateMissingThumbsJob {
type JobState = SimpleProgress; type JobStatus = SimpleProgress;
type Result = ();
fn state(&self) -> Arc<RwLock<Self::JobState>> { fn status(&self) -> Arc<RwLock<Self::JobStatus>> {
self.state.clone() self.state.clone()
} }

@ -8,6 +8,7 @@ use std::marker::PhantomData;
use std::sync::Arc; use std::sync::Arc;
pub use vacuum::*; pub use vacuum::*;
use crate::handle::JobHandle;
use async_trait::async_trait; use async_trait::async_trait;
use mediarepo_core::error::RepoResult; use mediarepo_core::error::RepoResult;
use mediarepo_core::trait_bound_typemap::TypeMapKey; use mediarepo_core::trait_bound_typemap::TypeMapKey;
@ -19,11 +20,12 @@ type EmptyStatus = Arc<RwLock<()>>;
#[async_trait] #[async_trait]
pub trait Job: Clone + Send + Sync { pub trait Job: Clone + Send + Sync {
type JobState: Send + Sync; type JobStatus: Send + Sync;
type Result: Send + Sync;
fn state(&self) -> Arc<RwLock<Self::JobState>>; fn status(&self) -> Arc<RwLock<Self::JobStatus>>;
async fn run(&self, repo: Arc<Repo>) -> RepoResult<()>; async fn run(&self, repo: Arc<Repo>) -> RepoResult<Self::Result>;
#[tracing::instrument(level = "debug", skip_all)] #[tracing::instrument(level = "debug", skip_all)]
async fn save_state(&self, _job_dao: JobDao) -> RepoResult<()> { async fn save_state(&self, _job_dao: JobDao) -> RepoResult<()> {
@ -37,5 +39,5 @@ impl<T: 'static> TypeMapKey for JobTypeKey<T>
where where
T: Job, T: Job,
{ {
type Value = Arc<RwLock<T::JobState>>; type Value = JobHandle<T::JobStatus, T::Result>;
} }

@ -11,9 +11,10 @@ pub struct VacuumJob;
#[async_trait] #[async_trait]
impl Job for VacuumJob { impl Job for VacuumJob {
type JobState = (); type JobStatus = ();
type Result = ();
fn state(&self) -> Arc<RwLock<Self::JobState>> { fn status(&self) -> Arc<RwLock<Self::JobStatus>> {
EmptyStatus::default() EmptyStatus::default()
} }

@ -6,6 +6,7 @@ use mediarepo_logic::dao::repo::Repo;
use std::time::Duration; use std::time::Duration;
use tokio::sync::oneshot::channel; use tokio::sync::oneshot::channel;
pub mod handle;
pub mod job_dispatcher; pub mod job_dispatcher;
pub mod jobs; pub mod jobs;
pub mod status_utils; pub mod status_utils;

Loading…
Cancel
Save