Add job trait and state data objects

Signed-off-by: trivernis <trivernis@protonmail.com>
feature/jobs
trivernis 2 years ago
parent 8b342f6aac
commit 530fbd7606
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

@ -1294,6 +1294,7 @@ name = "mediarepo-core"
version = "0.1.0"
dependencies = [
"base64",
"bincode",
"config",
"data-encoding",
"futures 0.3.21",
@ -1324,6 +1325,7 @@ dependencies = [
"mediarepo-core",
"mediarepo-logic",
"mediarepo-socket",
"mediarepo-worker",
"num-integer",
"rolling-file",
"structopt",
@ -1381,6 +1383,18 @@ dependencies = [
"tracing-futures",
]
[[package]]
name = "mediarepo-worker"
version = "0.1.0"
dependencies = [
"async-trait",
"chrono",
"mediarepo-core",
"mediarepo-logic",
"serde",
"tokio",
]
[[package]]
name = "memchr"
version = "2.4.1"

@ -1,6 +1,6 @@
[workspace]
members = ["mediarepo-core", "mediarepo-database", "mediarepo-logic", "mediarepo-socket", "."]
default-members = ["mediarepo-core", "mediarepo-database", "mediarepo-logic", "mediarepo-socket", "."]
members = ["mediarepo-core", "mediarepo-database", "mediarepo-logic", "mediarepo-socket", "mediarepo-worker", "."]
default-members = ["mediarepo-core", "mediarepo-database", "mediarepo-logic", "mediarepo-socket", "mediarepo-worker", "."]
[package]
name = "mediarepo-daemon"
@ -37,6 +37,9 @@ path = "mediarepo-logic"
[dependencies.mediarepo-socket]
path = "./mediarepo-socket"
[dependencies.mediarepo-worker]
path = "./mediarepo-worker"
[dependencies.tokio]
version = "1.17.0"
features = ["macros", "rt-multi-thread", "io-std", "io-util"]

@ -21,6 +21,7 @@ tracing = "0.1.31"
data-encoding = "2.3.2"
tokio-graceful-shutdown = "0.4.3"
thumbnailer = "0.4.0"
bincode = "1.3.3"
[dependencies.sea-orm]
version = "0.6.0"

@ -43,6 +43,9 @@ pub enum RepoError {
#[error("the database file is corrupted {0}")]
Corrupted(String),
#[error("bincode de-/serialization failed {0}")]
Bincode(#[from] bincode::Error),
}
#[derive(Error, Debug)]

@ -1,3 +1,4 @@
pub use bincode;
pub use futures;
pub use itertools;
pub use mediarepo_api;

@ -12,7 +12,7 @@ pub struct Model {
pub interval: Option<i64>,
}
#[derive(Clone, Debug, PartialEq, EnumIter, DeriveActiveEnum)]
#[derive(Clone, Copy, Debug, PartialEq, EnumIter, DeriveActiveEnum)]
#[sea_orm(rs_type = "u32", db_type = "Integer")]
pub enum JobType {
#[sea_orm(num_value = 10)]

@ -3,5 +3,6 @@ use crate::dao_provider;
pub mod generate_missing_thumbnails;
pub mod migrate_content_descriptors;
pub mod sqlite_operations;
pub mod state;
dao_provider!(JobDao);

@ -0,0 +1,20 @@
use crate::dao::job::JobDao;
use crate::dto::JobStateDto;
use mediarepo_core::error::RepoResult;
use mediarepo_database::entities::job_state;
use sea_orm::prelude::*;
impl JobDao {
    /// Fetches every persisted state entry belonging to the job with `job_id`.
    pub async fn states_for_job_id(&self, job_id: i64) -> RepoResult<Vec<JobStateDto>> {
        let query = job_state::Entity::find().filter(job_state::Column::JobId.eq(job_id));
        let models = query.all(&self.ctx.db).await?;

        Ok(models.into_iter().map(JobStateDto::new).collect())
    }
}

@ -0,0 +1,33 @@
use chrono::NaiveDateTime;
use mediarepo_database::entities::job;
use mediarepo_database::entities::job::JobType;
/// Read-only data transfer object wrapping a persisted `job::Model` row.
pub struct JobDto {
    model: job::Model,
}

impl JobDto {
    /// Wraps a database model; crate-internal because only DAOs create DTOs.
    pub(crate) fn new(model: job::Model) -> Self {
        Self { model }
    }

    /// Primary key of the job row.
    pub fn id(&self) -> i64 {
        self.model.id
    }

    /// Kind of work this job performs (see `job::JobType`).
    pub fn job_type(&self) -> JobType {
        self.model.job_type
    }

    /// Optional human-readable name of the job.
    pub fn name(&self) -> Option<&String> {
        self.model.name.as_ref()
    }

    /// Next scheduled execution time, if one is set.
    pub fn next_run(&self) -> Option<NaiveDateTime> {
        self.model.next_run
    }

    /// Re-run interval; unit not visible here — presumably seconds, TODO confirm
    /// against the scheduler once it consumes this value.
    pub fn interval(&self) -> Option<i64> {
        self.model.interval
    }
}

@ -0,0 +1,35 @@
use mediarepo_database::entities::job_state;
/// Read-only wrapper around a persisted `job_state::Model` row.
#[derive(Clone, Debug)]
pub struct JobStateDto {
    model: job_state::Model,
}

impl JobStateDto {
    /// Wraps a database model; crate-internal because only DAOs create DTOs.
    pub(crate) fn new(model: job_state::Model) -> Self {
        Self { model }
    }

    /// Id of the job this state entry belongs to.
    pub fn job_id(&self) -> i64 {
        self.model.job_id
    }

    /// Name under which this state value is stored.
    pub fn key(&self) -> &String {
        &self.model.key
    }

    /// Borrowed serialized state value (bincode bytes, per the worker crate).
    pub fn value(&self) -> &[u8] {
        &self.model.value
    }

    /// Consumes the DTO and returns the owned serialized value.
    pub fn into_value(self) -> Vec<u8> {
        self.model.value
    }
}
/// Plain data object used to insert or update a single job state row.
#[derive(Clone, Debug)]
pub struct UpsertJobStateDto {
    // id of the owning job
    pub job_id: i64,
    // state name (unique per job)
    pub key: String,
    // serialized state value
    pub value: Vec<u8>,
}

@ -1,5 +1,7 @@
pub use file::*;
pub use file_metadata::*;
pub use job::*;
pub use job_state::*;
pub use namespace::*;
pub use sorting_preset::*;
pub use tag::*;
@ -7,6 +9,8 @@ pub use thumbnail::*;
mod file;
mod file_metadata;
mod job;
mod job_state;
mod namespace;
mod sorting_preset;
mod tag;

@ -0,0 +1,27 @@
[package]
name = "mediarepo-worker"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
async-trait = "0.1.52"
[dependencies.mediarepo-core]
path = "../mediarepo-core"
[dependencies.mediarepo-logic]
path = "../mediarepo-logic"
[dependencies.tokio]
version = "1.17.0"
features = []
[dependencies.chrono]
version = "0.4.19"
features = ["serde"]
[dependencies.serde]
version = "1.0.136"
features = ["derive"]

@ -0,0 +1,23 @@
use crate::state_data::StateData;
use async_trait::async_trait;
use mediarepo_core::error::RepoResult;
use mediarepo_logic::dao::repo::Repo;
use mediarepo_logic::dto::JobDto;
/// Contract implemented by every job the worker can schedule and run.
#[async_trait]
pub trait ScheduledJob {
    /// Creates the job from its database row.
    /// NOTE(review): returning `Self` makes this trait non-object-safe — confirm
    /// the scheduler never needs `dyn ScheduledJob`.
    fn new(dto: JobDto) -> Self;

    /// Hands the job its persisted key/value state before execution.
    async fn set_state(&self, state: StateData) -> RepoResult<()>;

    /// Executes the job against the repository.
    async fn run(&self, repo: Repo) -> RepoResult<()>;

    /// Reports where the job currently is in its lifecycle.
    fn execution_state(&self) -> JobExecutionState;
}
/// Lifecycle phase of a scheduled job; the derived `Ord` makes
/// `Scheduled < Running < Finished`.
#[derive(Clone, Copy, Debug, Ord, PartialOrd, Eq, PartialEq)]
pub enum JobExecutionState {
    // waiting for its next run
    Scheduled,
    // currently executing
    Running,
    // completed its run
    Finished,
}

@ -0,0 +1,3 @@
//! mediarepo-worker: job trait definitions, the job scheduler, and
//! per-job persistent state handling.
pub mod jobs;
pub mod scheduler;
pub mod state_data;

@ -0,0 +1,7 @@
use mediarepo_logic::dao::repo::Repo;
/// Runs scheduled jobs against the repository.
pub struct Scheduler {
    // repository handle the jobs operate on
    pub repo: Repo,
}

// NOTE(review): placeholder — no scheduling logic implemented yet.
impl Scheduler {}

@ -0,0 +1,84 @@
use mediarepo_core::bincode;
use mediarepo_core::error::RepoResult;
use mediarepo_logic::dao::job::JobDao;
use mediarepo_logic::dto::UpsertJobStateDto;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use tokio::sync::RwLock;
/// In-memory, shareable key/value state of a single job, loaded from the
/// `job_state` table. Values are bincode-serialized byte blobs; keys written
/// through `store_entry` are tracked so only changed entries get persisted.
#[derive(Debug)]
pub struct StateData {
    job_id: i64,
    // serialized state values keyed by state name
    inner: Arc<RwLock<HashMap<String, Vec<u8>>>>,
    // keys modified since the last `changed_states` call
    changed_keys: Arc<RwLock<HashSet<String>>>,
}
impl StateData {
    /// Loads all persisted state entries for `job_id` from the database and
    /// builds the in-memory map from them. The change tracker starts empty.
    pub async fn load(job_dao: JobDao, job_id: i64) -> RepoResult<Self> {
        let states = job_dao.states_for_job_id(job_id).await?;
        let states_map = states
            .into_iter()
            .map(|s| (s.key().to_owned(), s.into_value()))
            .collect::<HashMap<String, Vec<u8>>>();

        Ok(Self {
            job_id,
            inner: Arc::new(RwLock::new(states_map)),
            changed_keys: Default::default(),
        })
    }

    /// Returns a deserialized copy of the state object stored under `key`,
    /// or `None` when no entry exists for that key.
    pub async fn entry<S: AsRef<str>, T: DeserializeOwned>(&self, key: S) -> RepoResult<Option<T>> {
        let entries = self.inner.read().await;

        match entries.get(key.as_ref()) {
            Some(bytes) => Ok(Some(bincode::deserialize(bytes)?)),
            None => Ok(None),
        }
    }

    /// Serializes `value` and stores it under `key`, marking the key changed.
    ///
    /// Lock order: `inner` before `changed_keys`. Every method taking both
    /// locks must use this order — tokio's fair RwLock can otherwise deadlock
    /// against a method acquiring them in the opposite order.
    pub async fn store_entry<T: Serialize>(&self, key: String, value: &T) -> RepoResult<()> {
        let entry_bytes = bincode::serialize(value)?;
        let mut entries = self.inner.write().await;
        let mut changed_entries = self.changed_keys.write().await;
        entries.insert(key.clone(), entry_bytes);
        changed_entries.insert(key);

        Ok(())
    }

    /// Drains the set of changed keys and returns the corresponding entries
    /// as an upsert list.
    ///
    /// Fixes two races in the previous version: the `changed_keys` write lock
    /// is held across both snapshot and clear, so a key stored concurrently
    /// can no longer be cleared without ever being returned; and the locks are
    /// acquired in the same order (`inner` first) as `store_entry`, removing a
    /// lock-order inversion that could deadlock.
    pub async fn changed_states(&self) -> Vec<UpsertJobStateDto> {
        let entries = self.inner.read().await;
        let mut changed_keys = self.changed_keys.write().await;

        changed_keys
            .drain()
            .filter_map(|key| {
                entries.get(&key).map(|value| UpsertJobStateDto {
                    job_id: self.job_id,
                    value: value.clone(),
                    key,
                })
            })
            .collect()
    }
}
Loading…
Cancel
Save