diff --git a/mediarepo-daemon/Cargo.lock b/mediarepo-daemon/Cargo.lock index 741d923..7dfc9f1 100644 --- a/mediarepo-daemon/Cargo.lock +++ b/mediarepo-daemon/Cargo.lock @@ -1294,6 +1294,7 @@ name = "mediarepo-core" version = "0.1.0" dependencies = [ "base64", + "bincode", "config", "data-encoding", "futures 0.3.21", @@ -1324,6 +1325,7 @@ dependencies = [ "mediarepo-core", "mediarepo-logic", "mediarepo-socket", + "mediarepo-worker", "num-integer", "rolling-file", "structopt", @@ -1381,6 +1383,18 @@ dependencies = [ "tracing-futures", ] +[[package]] +name = "mediarepo-worker" +version = "0.1.0" +dependencies = [ + "async-trait", + "chrono", + "mediarepo-core", + "mediarepo-logic", + "serde", + "tokio", +] + [[package]] name = "memchr" version = "2.4.1" diff --git a/mediarepo-daemon/Cargo.toml b/mediarepo-daemon/Cargo.toml index 821495e..81deeb0 100644 --- a/mediarepo-daemon/Cargo.toml +++ b/mediarepo-daemon/Cargo.toml @@ -1,6 +1,6 @@ [workspace] -members = ["mediarepo-core", "mediarepo-database", "mediarepo-logic", "mediarepo-socket", "."] -default-members = ["mediarepo-core", "mediarepo-database", "mediarepo-logic", "mediarepo-socket", "."] +members = ["mediarepo-core", "mediarepo-database", "mediarepo-logic", "mediarepo-socket", "mediarepo-worker", "."] +default-members = ["mediarepo-core", "mediarepo-database", "mediarepo-logic", "mediarepo-socket", "mediarepo-worker", "."] [package] name = "mediarepo-daemon" @@ -37,6 +37,9 @@ path = "mediarepo-logic" [dependencies.mediarepo-socket] path = "./mediarepo-socket" +[dependencies.mediarepo-worker] +path = "./mediarepo-worker" + [dependencies.tokio] version = "1.17.0" features = ["macros", "rt-multi-thread", "io-std", "io-util"] diff --git a/mediarepo-daemon/mediarepo-core/Cargo.toml b/mediarepo-daemon/mediarepo-core/Cargo.toml index 5a007dd..09cdf3f 100644 --- a/mediarepo-daemon/mediarepo-core/Cargo.toml +++ b/mediarepo-daemon/mediarepo-core/Cargo.toml @@ -21,6 +21,7 @@ tracing = "0.1.31" data-encoding = "2.3.2" tokio-graceful-shutdown = "0.4.3" thumbnailer = "0.4.0" +bincode = "1.3.3" [dependencies.sea-orm] version = "0.6.0" diff --git a/mediarepo-daemon/mediarepo-core/src/error.rs b/mediarepo-daemon/mediarepo-core/src/error.rs index 7f5dfbc..4173ed4 100644 --- a/mediarepo-daemon/mediarepo-core/src/error.rs +++ b/mediarepo-daemon/mediarepo-core/src/error.rs @@ -43,6 +43,9 @@ pub enum RepoError { #[error("the database file is corrupted {0}")] Corrupted(String), + + #[error("bincode de-/serialization failed {0}")] + Bincode(#[from] bincode::Error), } #[derive(Error, Debug)] diff --git a/mediarepo-daemon/mediarepo-core/src/lib.rs b/mediarepo-daemon/mediarepo-core/src/lib.rs index a33546e..ab1a2fc 100644 --- a/mediarepo-daemon/mediarepo-core/src/lib.rs +++ b/mediarepo-daemon/mediarepo-core/src/lib.rs @@ -1,3 +1,4 @@ +pub use bincode; pub use futures; pub use itertools; pub use mediarepo_api; diff --git a/mediarepo-daemon/mediarepo-database/src/entities/job.rs b/mediarepo-daemon/mediarepo-database/src/entities/job.rs index f87f5ba..dddf46c 100644 --- a/mediarepo-daemon/mediarepo-database/src/entities/job.rs +++ b/mediarepo-daemon/mediarepo-database/src/entities/job.rs @@ -12,7 +12,7 @@ pub struct Model { pub interval: Option, } -#[derive(Clone, Debug, PartialEq, EnumIter, DeriveActiveEnum)] +#[derive(Clone, Copy, Debug, PartialEq, EnumIter, DeriveActiveEnum)] #[sea_orm(rs_type = "u32", db_type = "Integer")] pub enum JobType { #[sea_orm(num_value = 10)] diff --git a/mediarepo-daemon/mediarepo-logic/src/dao/job/mod.rs b/mediarepo-daemon/mediarepo-logic/src/dao/job/mod.rs index 31ab290..b2077bc 100644 --- a/mediarepo-daemon/mediarepo-logic/src/dao/job/mod.rs +++ b/mediarepo-daemon/mediarepo-logic/src/dao/job/mod.rs @@ -3,5 +3,6 @@ use crate::dao_provider; pub mod generate_missing_thumbnails; pub mod migrate_content_descriptors; pub mod sqlite_operations; +pub mod state; dao_provider!(JobDao); diff --git a/mediarepo-daemon/mediarepo-logic/src/dao/job/state.rs b/mediarepo-daemon/mediarepo-logic/src/dao/job/state.rs new file mode 100644 index 0000000..db6c13b --- /dev/null +++ b/mediarepo-daemon/mediarepo-logic/src/dao/job/state.rs @@ -0,0 +1,20 @@ +use crate::dao::job::JobDao; +use crate::dto::JobStateDto; +use mediarepo_core::error::RepoResult; +use mediarepo_database::entities::job_state; +use sea_orm::prelude::*; + +impl JobDao { + /// Returns all job states for a given job id + pub async fn states_for_job_id(&self, job_id: i64) -> RepoResult> { + let states = job_state::Entity::find() + .filter(job_state::Column::JobId.eq(job_id)) + .all(&self.ctx.db) + .await? + .into_iter() + .map(JobStateDto::new) + .collect(); + + Ok(states) + } +} diff --git a/mediarepo-daemon/mediarepo-logic/src/dto/job.rs b/mediarepo-daemon/mediarepo-logic/src/dto/job.rs new file mode 100644 index 0000000..32a27ee --- /dev/null +++ b/mediarepo-daemon/mediarepo-logic/src/dto/job.rs @@ -0,0 +1,33 @@ +use chrono::NaiveDateTime; +use mediarepo_database::entities::job; +use mediarepo_database::entities::job::JobType; + +pub struct JobDto { + model: job::Model, +} + +impl JobDto { + pub(crate) fn new(model: job::Model) -> Self { + Self { model } + } + + pub fn id(&self) -> i64 { + self.model.id + } + + pub fn job_type(&self) -> JobType { + self.model.job_type + } + + pub fn name(&self) -> Option<&String> { + self.model.name.as_ref() + } + + pub fn next_run(&self) -> Option { + self.model.next_run + } + + pub fn interval(&self) -> Option { + self.model.interval + } +} diff --git a/mediarepo-daemon/mediarepo-logic/src/dto/job_state.rs b/mediarepo-daemon/mediarepo-logic/src/dto/job_state.rs new file mode 100644 index 0000000..05bbdce --- /dev/null +++ b/mediarepo-daemon/mediarepo-logic/src/dto/job_state.rs @@ -0,0 +1,35 @@ +use mediarepo_database::entities::job_state; + +#[derive(Clone, Debug)] +pub struct JobStateDto { + model: job_state::Model, +} + +impl JobStateDto { + pub(crate) fn new(model: job_state::Model) -> Self { + Self { model } + } + + pub fn job_id(&self) -> i64 { + self.model.job_id + } + + pub fn key(&self) -> &String { + &self.model.key + } + + pub fn value(&self) -> &[u8] { + &self.model.value + } + + pub fn into_value(self) -> Vec { + self.model.value + } +} + +#[derive(Clone, Debug)] +pub struct UpsertJobStateDto { + pub job_id: i64, + pub key: String, + pub value: Vec, +} diff --git a/mediarepo-daemon/mediarepo-logic/src/dto/mod.rs b/mediarepo-daemon/mediarepo-logic/src/dto/mod.rs index 2444214..c1285fa 100644 --- a/mediarepo-daemon/mediarepo-logic/src/dto/mod.rs +++ b/mediarepo-daemon/mediarepo-logic/src/dto/mod.rs @@ -1,5 +1,7 @@ pub use file::*; pub use file_metadata::*; +pub use job::*; +pub use job_state::*; pub use namespace::*; pub use sorting_preset::*; pub use tag::*; @@ -7,6 +9,8 @@ pub use thumbnail::*; mod file; mod file_metadata; +mod job; +mod job_state; mod namespace; mod sorting_preset; mod tag; diff --git a/mediarepo-daemon/mediarepo-worker/Cargo.toml b/mediarepo-daemon/mediarepo-worker/Cargo.toml new file mode 100644 index 0000000..96af684 --- /dev/null +++ b/mediarepo-daemon/mediarepo-worker/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "mediarepo-worker" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +async-trait = "0.1.52" + +[dependencies.mediarepo-core] +path = "../mediarepo-core" + +[dependencies.mediarepo-logic] +path = "../mediarepo-logic" + +[dependencies.tokio] +version = "1.17.0" +features = [] + +[dependencies.chrono] +version = "0.4.19" +features = ["serde"] + +[dependencies.serde] +version = "1.0.136" +features = ["derive"] \ No newline at end of file diff --git a/mediarepo-daemon/mediarepo-worker/src/jobs/mod.rs b/mediarepo-daemon/mediarepo-worker/src/jobs/mod.rs new file mode 100644 index 0000000..ad9c8d8 --- /dev/null +++ b/mediarepo-daemon/mediarepo-worker/src/jobs/mod.rs @@ -0,0 +1,23 @@ +use crate::state_data::StateData; +use async_trait::async_trait; +use mediarepo_core::error::RepoResult; +use mediarepo_logic::dao::repo::Repo; +use mediarepo_logic::dto::JobDto; + +#[async_trait] +pub trait ScheduledJob { + fn new(dto: JobDto) -> Self; + + async fn set_state(&self, state: StateData) -> RepoResult<()>; + + async fn run(&self, repo: Repo) -> RepoResult<()>; + + fn execution_state(&self) -> JobExecutionState; +} + +#[derive(Clone, Copy, Debug, Ord, PartialOrd, Eq, PartialEq)] +pub enum JobExecutionState { + Scheduled, + Running, + Finished, +} diff --git a/mediarepo-daemon/mediarepo-worker/src/lib.rs b/mediarepo-daemon/mediarepo-worker/src/lib.rs new file mode 100644 index 0000000..6046de1 --- /dev/null +++ b/mediarepo-daemon/mediarepo-worker/src/lib.rs @@ -0,0 +1,3 @@ +pub mod jobs; +pub mod scheduler; +pub mod state_data; diff --git a/mediarepo-daemon/mediarepo-worker/src/scheduler.rs b/mediarepo-daemon/mediarepo-worker/src/scheduler.rs new file mode 100644 index 0000000..f7e244d --- /dev/null +++ b/mediarepo-daemon/mediarepo-worker/src/scheduler.rs @@ -0,0 +1,7 @@ +use mediarepo_logic::dao::repo::Repo; + +pub struct Scheduler { + pub repo: Repo, +} + +impl Scheduler {} diff --git a/mediarepo-daemon/mediarepo-worker/src/state_data.rs b/mediarepo-daemon/mediarepo-worker/src/state_data.rs new file mode 100644 index 0000000..f197fb5 --- /dev/null +++ b/mediarepo-daemon/mediarepo-worker/src/state_data.rs @@ -0,0 +1,84 @@ +use mediarepo_core::bincode; +use mediarepo_core::error::RepoResult; +use mediarepo_logic::dao::job::JobDao; +use mediarepo_logic::dto::UpsertJobStateDto; +use serde::de::DeserializeOwned; +use serde::Serialize; +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; +use tokio::sync::RwLock; + +#[derive(Debug)] +pub struct StateData { + job_id: i64, + inner: Arc>>>, + changed_keys: Arc>>, +} + +impl StateData { + /// Loads the state from the database + pub async fn load(job_dao: JobDao, job_id: i64) -> RepoResult { + let states = job_dao.states_for_job_id(job_id).await?; + let states_map = states + .into_iter() + .map(|s| (s.key().to_owned(), s.into_value())) + .collect::>>(); + + Ok(Self { + job_id, + inner: Arc::new(RwLock::new(states_map)), + changed_keys: Default::default(), + }) + } + + /// Returns the deserialized copy of a state object from the inner map + pub async fn entry, T: DeserializeOwned>(&self, key: S) -> RepoResult> { + let entries = self.inner.read().await; + let entry = entries.get(key.as_ref()); + + if let Some(bytes) = entry { + let value = bincode::deserialize(bytes)?; + + Ok(Some(value)) + } else { + Ok(None) + } + } + + /// Stores an entry in inner map + pub async fn store_entry(&self, key: String, value: &T) -> RepoResult<()> { + let entry_bytes = bincode::serialize(value)?; + let mut entries = self.inner.write().await; + entries.insert(key.clone(), entry_bytes); + let mut changed_entries = self.changed_keys.write().await; + changed_entries.insert(key); + + Ok(()) + } + + /// Returns a list of all changed state objects as an upsert list + pub async fn changed_states(&self) -> Vec { + let mut upsert_list = Vec::new(); + + { + let changed_keys = self.changed_keys.read().await; + let entries = self.inner.read().await; + + for key in &*changed_keys { + if let Some(value) = entries.get(key) { + upsert_list.push(UpsertJobStateDto { + job_id: self.job_id, + key: key.to_owned(), + value: value.clone(), + }); + } + } + } + { + let mut changed_keys = self.changed_keys.write().await; + changed_keys.clear(); + } + + upsert_list + } +}