diff --git a/mediarepo-daemon/Cargo.lock b/mediarepo-daemon/Cargo.lock index cf12abc..3f7b0bc 100644 --- a/mediarepo-daemon/Cargo.lock +++ b/mediarepo-daemon/Cargo.lock @@ -63,9 +63,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.55" +version = "1.0.56" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "159bb86af3a200e19a068f4224eae4c8bb2d0fa054c7e5d1cacd5cef95e684cd" +checksum = "4361135be9122e0870de935d7c439aef945b9f9ddd4199a553b5270b49c82a27" [[package]] name = "arrayref" @@ -270,9 +270,8 @@ dependencies = [ [[package]] name = "bromine" -version = "0.18.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5dd7887995490657bf3ec578f39e747ef7b5355a8dc6c99b3d5be59ca70dc4d5" +version = "0.18.3" +source = "git+https://github.com/Trivernis/bromine#c2728a44ead210e1535bce5fc2d3979530700b96" dependencies = [ "async-trait", "bincode", @@ -301,9 +300,9 @@ checksum = "e3b5ca7a04898ad4bcd41c90c5285445ff5b791899bb1b0abdd2a2aa791211d7" [[package]] name = "bytemuck" -version = "1.7.3" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "439989e6b8c38d1b6570a384ef1e49c8848128f5a97f3914baef02920842712f" +checksum = "0e851ca7c24871e7336801608a4797d7376545b6928a10d32d75685687141ead" [[package]] name = "byteorder" @@ -709,9 +708,9 @@ dependencies = [ [[package]] name = "flume" -version = "0.10.11" +version = "0.10.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b279436a715a9de95dcd26b151db590a71961cc06e54918b24fe0dd5b7d3fc4" +checksum = "843c03199d0c0ca54bc1ea90ac0d507274c28abcc4f691ae8b4eaa375087c76a" dependencies = [ "futures-core", "futures-sink", @@ -885,7 +884,7 @@ dependencies = [ "cfg-if 1.0.0", "js-sys", "libc", - "wasi", + "wasi 0.10.0+wasi-snapshot-preview1", "wasm-bindgen", ] @@ -907,9 +906,9 @@ checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574" [[package]] name = "h2" -version = "0.3.11" +version = "0.3.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9f1f717ddc7b2ba36df7e871fd88db79326551d3d6f1fc406fbfd28b582ff8e" +checksum = "62eeb471aa3e3c9197aa4bfeabfe02982f6dc96f750486c0bb0009ac58b26d2b" dependencies = [ "bytes", "fnv", @@ -1373,7 +1372,6 @@ dependencies = [ "serde", "tokio", "tracing", - "typemap_rev", ] [[package]] @@ -1465,14 +1463,15 @@ dependencies = [ [[package]] name = "mio" -version = "0.8.0" +version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba272f85fa0b41fc91872be579b3bbe0f56b792aa361a380eb669469f68dafb2" +checksum = "7ba42135c6a5917b9db9cd7b293e5409e1c6b041e6f9825e92e55a894c63b6f8" dependencies = [ "libc", "log", "miow", "ntapi", + "wasi 0.11.0+wasi-snapshot-preview1", "winapi", ] @@ -1535,9 +1534,9 @@ checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" [[package]] name = "nanorand" -version = "0.6.1" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "729eb334247daa1803e0a094d0a5c55711b85571179f5ec6e53eccfdf7008958" +checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" dependencies = [ "getrandom", ] @@ -1644,18 +1643,18 @@ dependencies = [ [[package]] name = "num_enum" -version = "0.5.6" +version = "0.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "720d3ea1055e4e4574c0c0b0f8c3fd4f24c4cdaf465948206dea090b57b526ad" +checksum = "cf5395665662ef45796a4ff5486c5d41d29e0c09640af4c5f17fd94ee2c119c9" dependencies = [ "num_enum_derive", ] [[package]] name = "num_enum_derive" -version = "0.5.6" +version = "0.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d992b768490d7fe0d8586d9b5745f6c49f557da6d81dc982b1d167ad4edbb21" +checksum = "3b0498641e53dd6ac1a4f22547548caa6864cc4933784319cd1775271c5a46ce" dependencies = [ "proc-macro-crate", "proc-macro2 1.0.36", @@ -1665,18 +1664,18 @@ dependencies = [ [[package]] name = "num_threads" -version = "0.1.3" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97ba99ba6393e2c3734791401b66902d981cb03bf190af674ca69949b6d5fb15" +checksum = "c539a50b93a303167eded6e8dff5220cd39447409fb659f4cd24b1f72fe4f133" dependencies = [ "libc", ] [[package]] name = "once_cell" -version = "1.9.0" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da32515d9f6e6e489d7bc9d84c71b060db7247dc035bbe44eac88cf87486d8d5" +checksum = "87f3e037eac156d1775da914196f0f37741a274155e34a0b7e427c35d2a2ecb9" [[package]] name = "opaque-debug" @@ -1947,9 +1946,9 @@ checksum = "58893f751c9b0412871a09abd62ecd2a00298c6c83befa223ef98c52aef40cbe" [[package]] name = "png" -version = "0.17.3" +version = "0.17.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e8f1882177b17c98ec33a51f5910ecbf4db92ca0def706781a1f8d0c661f393" +checksum = "dc38c0ad57efb786dd57b9864e5b18bae478c00c824dc55a38bbc9da95dde3ba" dependencies = [ "bitflags", "crc32fast", @@ -2149,18 +2148,18 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.2.10" +version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8383f39639269cde97d255a32bdb68c047337295414940c68bdd30c2e13203ff" +checksum = "8380fe0152551244f0747b1bf41737e0f8a74f97a14ccefd1148187271634f3c" dependencies = [ "bitflags", ] [[package]] name = "regex" -version = "1.5.4" +version = "1.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d07a8629359eb56f1e2fb1652bb04212c072a87ba68546a04065d525673ac461" +checksum = "1a11647b6b25ff05a515cb92c365cec08801e83423a235b51e231e1808747286" dependencies = [ "aho-corasick", "memchr", @@ -2809,7 +2808,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6db9e6914ab8b1ae1c260a4ae7a49b6c5611b40328a735b21862567685e73255" dependencies = [ "libc", - "wasi", + "wasi 0.10.0+wasi-snapshot-preview1", "winapi", ] @@ -3039,9 +3038,9 @@ checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6" [[package]] name = "tracing" -version = "0.1.31" +version = "0.1.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6c650a8ef0cd2dd93736f033d21cbd1224c5a967aa0c258d00fcf7dafef9b9f" +checksum = "4a1bdf54a7c28a2bbf701e1d2233f6c77f473486b94bee4f9678da5a148dca7f" dependencies = [ "cfg-if 1.0.0", "log", @@ -3052,9 +3051,9 @@ dependencies = [ [[package]] name = "tracing-appender" -version = "0.2.0" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94571df2eae3ed4353815ea5a90974a594a1792d8782ff2cbcc9392d1101f366" +checksum = "9ab026b18a46ac429e5c98bec10ca06424a97b3ad7b3949d9b4a102fff6623c4" dependencies = [ "crossbeam-channel", "time 0.3.7", @@ -3063,9 +3062,9 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.19" +version = "0.1.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8276d9a4a3a558d7b7ad5303ad50b53d58264641b82914b7ada36bd762e7a716" +checksum = "2e65ce065b4b5c53e73bb28912318cb8c9e9ad3921f1d669eb0e68b4c8143a2b" dependencies = [ "proc-macro2 1.0.36", "quote 1.0.15", @@ -3074,9 +3073,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.22" +version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03cfcb51380632a72d3111cb8d3447a8d908e577d31beeac006f836383d29a23" +checksum = "aa31669fa42c09c34d94d8165dd2012e8ff3c66aca50f3bb226b68f216f2706c" dependencies = [ "lazy_static", "valuable", @@ -3168,8 +3167,7 @@ checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" [[package]] name = "typemap_rev" version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed5b74f0a24b5454580a79abb6994393b09adf0ab8070f15827cb666255de155" +source = "git+https://github.com/Trivernis/typemap_rev?rev=750c67bffe8024d2a47725daa473f068ad653fc4#750c67bffe8024d2a47725daa473f068ad653fc4" [[package]] name = "typenum" @@ -3305,6 +3303,12 @@ version = "0.10.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f" +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + [[package]] name = "wasm-bindgen" version = "0.2.79" diff --git a/mediarepo-daemon/Cargo.toml b/mediarepo-daemon/Cargo.toml index fb92df1..cf21780 100644 --- a/mediarepo-daemon/Cargo.toml +++ b/mediarepo-daemon/Cargo.toml @@ -16,12 +16,12 @@ name = "mediarepo-daemon" path = "src/main.rs" [dependencies] -tracing = "0.1.31" +tracing = "0.1.32" toml = "0.5.8" structopt = "0.3.26" glob = "0.3.0" tracing-flame = "0.2.0" -tracing-appender = "0.2.0" +tracing-appender = "0.2.1" tracing-log = "0.1.2" rolling-file = "0.1.0" num-integer = "0.1.44" diff --git a/mediarepo-daemon/mediarepo-core/Cargo.toml b/mediarepo-daemon/mediarepo-core/Cargo.toml index 3016aee..d7af792 100644 --- a/mediarepo-daemon/mediarepo-core/Cargo.toml +++ b/mediarepo-daemon/mediarepo-core/Cargo.toml @@ -13,17 +13,20 @@ multibase = "0.9.1" base64 = "0.13.0" toml = "0.5.8" serde = "1.0.136" -typemap_rev = "0.1.5" futures = "0.3.21" itertools = "0.10.3" glob = "0.3.0" -tracing = "0.1.31" +tracing = "0.1.32" data-encoding = "2.3.2" tokio-graceful-shutdown = "0.4.3" thumbnailer = "0.4.0" bincode = "1.3.3" tracing-subscriber = "0.3.9" +[dependencies.typemap_rev] +git = "https://github.com/Trivernis/typemap_rev" +rev = "750c67bffe8024d2a47725daa473f068ad653fc4" + [dependencies.sea-orm] version = "0.6.0" default-features = false diff --git a/mediarepo-daemon/mediarepo-core/src/lib.rs b/mediarepo-daemon/mediarepo-core/src/lib.rs index e9a928c..d98957f 100644 --- a/mediarepo-daemon/mediarepo-core/src/lib.rs +++ b/mediarepo-daemon/mediarepo-core/src/lib.rs @@ -5,6 +5,7 @@ pub use mediarepo_api; pub use mediarepo_api::bromine; pub use thumbnailer; pub use tokio_graceful_shutdown; +pub use typemap_rev; pub mod content_descriptor; pub mod context; @@ -13,4 +14,5 @@ pub mod fs; pub mod settings; pub mod tracing_layer_list; pub mod type_keys; +pub mod type_list; pub mod utils; diff --git a/mediarepo-daemon/mediarepo-core/src/type_list.rs b/mediarepo-daemon/mediarepo-core/src/type_list.rs index e69de29..fbe913f 100644 --- a/mediarepo-daemon/mediarepo-core/src/type_list.rs +++ b/mediarepo-daemon/mediarepo-core/src/type_list.rs @@ -0,0 +1,46 @@ +use std::any::{Any, TypeId}; +use std::vec::IntoIter; +use typemap_rev::TypeMapKey; + +pub trait CloneAny: Any + Send + Sync { + fn clone_any(&self) -> Box; +} + +impl CloneAny for T { + fn clone_any(&self) -> Box { + Box::new(self.clone()) + } +} + +impl Clone for Box { + fn clone(&self) -> Self { + (**self).clone_any() + } +} + +#[derive(Default, Clone)] +pub struct TypeList(Vec<(TypeId, Box)>); + +impl TypeList { + pub fn add, C: CloneAny>(&mut self, value: T::Value) { + self.0.push((TypeId::of::(), Box::new(value))) + } +} + +impl IntoIterator for TypeList { + type Item = (TypeId, Box); + type IntoIter = IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.0 + .into_iter() + .map(|(t, v)| { + (t, unsafe { + // SAFETY: CloneAny requires types to be Any + Send + Sync (+ Clone) + std::mem::transmute::, Box>(v) + }) + }) + .collect::)>>() + .into_iter() + } +} diff --git a/mediarepo-daemon/mediarepo-database/Cargo.toml b/mediarepo-daemon/mediarepo-database/Cargo.toml index 6ed1944..df9387a 100644 --- a/mediarepo-daemon/mediarepo-database/Cargo.toml +++ b/mediarepo-daemon/mediarepo-database/Cargo.toml @@ -8,7 +8,7 @@ workspace = ".." [dependencies] chrono = "0.4.19" -tracing = "0.1.31" +tracing = "0.1.32" [dependencies.mediarepo-core] path = "../mediarepo-core" diff --git a/mediarepo-daemon/mediarepo-database/migrations/20220225183244_add-jobs-tables.sql b/mediarepo-daemon/mediarepo-database/migrations/20220225183244_add-jobs-tables.sql index b7438e9..2a65989 100644 --- a/mediarepo-daemon/mediarepo-database/migrations/20220225183244_add-jobs-tables.sql +++ b/mediarepo-daemon/mediarepo-database/migrations/20220225183244_add-jobs-tables.sql @@ -1,15 +1,5 @@ -CREATE TABLE jobs ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - job_type INTEGER NOT NULL, - name VARCHAR(255), - next_run DATETIME, - interval INTEGER -); - CREATE TABLE job_states ( - job_id INTEGER, - key VARCHAR(128) NOT NULL DEFAULT 'default', + job_type INTEGER NOT NULL, value BLOB, - PRIMARY KEY (job_id, key), - FOREIGN KEY (job_id) REFERENCES jobs (id) + PRIMARY KEY (job_type) ); \ No newline at end of file diff --git a/mediarepo-daemon/mediarepo-database/src/entities/job_state.rs b/mediarepo-daemon/mediarepo-database/src/entities/job_state.rs index 8180a6c..01f3360 100644 --- a/mediarepo-daemon/mediarepo-database/src/entities/job_state.rs +++ b/mediarepo-daemon/mediarepo-database/src/entities/job_state.rs @@ -1,29 +1,45 @@ use sea_orm::prelude::*; +use sea_orm::TryFromU64; #[derive(Clone, Debug, PartialEq, DeriveEntityModel)] #[sea_orm(table_name = "namespaces")] pub struct Model { #[sea_orm(primary_key)] - pub job_id: i64, - #[sea_orm(primary_key)] - pub key: String, + pub job_type: JobType, pub value: Vec, } -#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] -pub enum Relation { - #[sea_orm( - belongs_to = "super::job::Entity", - from = "Column::JobId", - to = "super::job::Column::Id" - )] - Job, +#[derive(Clone, Copy, Debug, PartialEq, EnumIter, DeriveActiveEnum)] +#[sea_orm(rs_type = "u32", db_type = "Integer")] +pub enum JobType { + #[sea_orm(num_value = 10)] + MigrateCDs, + #[sea_orm(num_value = 20)] + CalculateSizes, + #[sea_orm(num_value = 30)] + GenerateThumbs, + #[sea_orm(num_value = 40)] + CheckIntegrity, + #[sea_orm(num_value = 50)] + Vacuum, } -impl Related for Entity { - fn to() -> RelationDef { - Relation::Job.def() +impl TryFromU64 for JobType { + fn try_from_u64(n: u64) -> Result { + let value = match n { + 10 => Self::MigrateCDs, + 20 => Self::CalculateSizes, + 30 => Self::GenerateThumbs, + 40 => Self::CheckIntegrity, + 50 => Self::Vacuum, + _ => return Err(DbErr::Custom(String::from("Invalid job type"))), + }; + + Ok(value) } } +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation {} + impl ActiveModelBehavior for ActiveModel {} diff --git a/mediarepo-daemon/mediarepo-database/src/entities/mod.rs b/mediarepo-daemon/mediarepo-database/src/entities/mod.rs index a29c7b5..ca4af27 100644 --- a/mediarepo-daemon/mediarepo-database/src/entities/mod.rs +++ b/mediarepo-daemon/mediarepo-database/src/entities/mod.rs @@ -3,7 +3,6 @@ pub mod content_descriptor_source; pub mod content_descriptor_tag; pub mod file; pub mod file_metadata; -pub mod job; pub mod job_state; pub mod namespace; pub mod sort_key; diff --git a/mediarepo-daemon/mediarepo-logic/Cargo.toml b/mediarepo-daemon/mediarepo-logic/Cargo.toml index 018bcd4..7880acd 100644 --- a/mediarepo-daemon/mediarepo-logic/Cargo.toml +++ b/mediarepo-daemon/mediarepo-logic/Cargo.toml @@ -8,11 +8,10 @@ workspace = ".." [dependencies] chrono = "0.4.19" -typemap_rev = "0.1.5" serde = "1.0.136" mime_guess = "2.0.4" mime = "0.3.16" -tracing = "0.1.31" +tracing = "0.1.32" async-trait = "0.1.52" [dependencies.mediarepo-core] diff --git a/mediarepo-daemon/mediarepo-logic/src/dao/job/mod.rs b/mediarepo-daemon/mediarepo-logic/src/dao/job/mod.rs index 1e17ca9..b2077bc 100644 --- a/mediarepo-daemon/mediarepo-logic/src/dao/job/mod.rs +++ b/mediarepo-daemon/mediarepo-logic/src/dao/job/mod.rs @@ -1,9 +1,4 @@ use crate::dao_provider; -use crate::dto::JobDto; -use chrono::Local; -use mediarepo_core::error::RepoResult; -use mediarepo_database::entities::job; -use sea_orm::prelude::*; pub mod generate_missing_thumbnails; pub mod migrate_content_descriptors; @@ -11,19 +6,3 @@ pub mod sqlite_operations; pub mod state; dao_provider!(JobDao); - -impl JobDao { - /// Returns a list of all jobs that are scheduled (have a next_run date) - pub async fn scheduled_for_now(&self) -> RepoResult> { - let jobs = job::Entity::find() - .filter(job::Column::NextRun.is_not_null()) - .filter(job::Column::NextRun.lt(Local::now().naive_local())) - .all(&self.ctx.db) - .await? - .into_iter() - .map(JobDto::new) - .collect(); - - Ok(jobs) - } -} diff --git a/mediarepo-daemon/mediarepo-logic/src/dao/job/state.rs b/mediarepo-daemon/mediarepo-logic/src/dao/job/state.rs index 8370305..f125d5e 100644 --- a/mediarepo-daemon/mediarepo-logic/src/dao/job/state.rs +++ b/mediarepo-daemon/mediarepo-logic/src/dao/job/state.rs @@ -2,15 +2,16 @@ use crate::dao::job::JobDao; use crate::dto::{JobStateDto, UpsertJobStateDto}; use mediarepo_core::error::RepoResult; use mediarepo_database::entities::job_state; +use mediarepo_database::entities::job_state::JobType; use sea_orm::prelude::*; use sea_orm::ActiveValue::Set; use sea_orm::{Condition, TransactionTrait}; impl JobDao { /// Returns all job states for a given job id - pub async fn states_for_job_id(&self, job_id: i64) -> RepoResult> { + pub async fn states_for_job_type(&self, job_type: JobType) -> RepoResult> { let states = job_state::Entity::find() - .filter(job_state::Column::JobId.eq(job_id)) + .filter(job_state::Column::JobType.eq(job_type)) .all(&self.ctx.db) .await? .into_iter() @@ -40,11 +41,7 @@ impl JobDao { fn build_state_filters(states: &Vec) -> Condition { states .iter() - .map(|s| { - Condition::all() - .add(job_state::Column::JobId.eq(s.job_id)) - .add(job_state::Column::Key.eq(s.key.to_owned())) - }) + .map(|s| Condition::all().add(job_state::Column::JobType.eq(s.job_type))) .fold(Condition::any(), |acc, cond| acc.add(cond)) } @@ -52,8 +49,7 @@ fn build_active_state_models(states: Vec) -> Vec i64 { - self.model.job_id - } - - pub fn key(&self) -> &String { - &self.model.key + pub fn job_type(&self) -> JobType { + self.model.job_type } pub fn value(&self) -> &[u8] { @@ -29,7 +26,6 @@ impl JobStateDto { #[derive(Clone, Debug)] pub struct UpsertJobStateDto { - pub job_id: i64, - pub key: String, + pub job_type: JobType, pub value: Vec, } diff --git a/mediarepo-daemon/mediarepo-logic/src/dto/mod.rs b/mediarepo-daemon/mediarepo-logic/src/dto/mod.rs index c1285fa..638fe32 100644 --- a/mediarepo-daemon/mediarepo-logic/src/dto/mod.rs +++ b/mediarepo-daemon/mediarepo-logic/src/dto/mod.rs @@ -1,6 +1,5 @@ pub use file::*; pub use file_metadata::*; -pub use job::*; pub use job_state::*; pub use namespace::*; pub use sorting_preset::*; @@ -9,7 +8,6 @@ pub use thumbnail::*; mod file; mod file_metadata; -mod job; mod job_state; mod namespace; mod sorting_preset; diff --git a/mediarepo-daemon/mediarepo-logic/src/type_keys.rs b/mediarepo-daemon/mediarepo-logic/src/type_keys.rs index bd12d0c..649405e 100644 --- a/mediarepo-daemon/mediarepo-logic/src/type_keys.rs +++ b/mediarepo-daemon/mediarepo-logic/src/type_keys.rs @@ -1,8 +1,7 @@ use std::sync::Arc; -use typemap_rev::TypeMapKey; - use crate::dao::repo::Repo; +use mediarepo_core::typemap_rev::TypeMapKey; pub struct RepoKey; diff --git a/mediarepo-daemon/mediarepo-socket/Cargo.toml b/mediarepo-daemon/mediarepo-socket/Cargo.toml index 3911bc4..5f5b75a 100644 --- a/mediarepo-daemon/mediarepo-socket/Cargo.toml +++ b/mediarepo-daemon/mediarepo-socket/Cargo.toml @@ -8,7 +8,7 @@ workspace = ".." [dependencies] serde = "1.0.136" -tracing = "0.1.31" +tracing = "0.1.32" compare = "0.1.0" port_check = "0.1.5" rayon = "1.5.1" diff --git a/mediarepo-daemon/mediarepo-socket/src/lib.rs b/mediarepo-daemon/mediarepo-socket/src/lib.rs index b8efdfe..de4847a 100644 --- a/mediarepo-daemon/mediarepo-socket/src/lib.rs +++ b/mediarepo-daemon/mediarepo-socket/src/lib.rs @@ -1,6 +1,4 @@ use std::net::SocketAddr; -use std::path::PathBuf; -use std::sync::Arc; use tokio::net::TcpListener; use tokio::task::JoinHandle; @@ -10,20 +8,18 @@ use mediarepo_core::error::{RepoError, RepoResult}; use mediarepo_core::mediarepo_api::types::misc::InfoResponse; use mediarepo_core::settings::{PortSetting, Settings}; use mediarepo_core::tokio_graceful_shutdown::SubsystemHandle; -use mediarepo_core::type_keys::{RepoPathKey, SettingsKey, SizeMetadataKey, SubsystemKey}; -use mediarepo_logic::dao::repo::Repo; -use mediarepo_logic::type_keys::RepoKey; +use mediarepo_core::type_keys::{SizeMetadataKey, SubsystemKey}; +use mediarepo_core::type_list::TypeList; mod from_model; mod namespaces; mod utils; -#[tracing::instrument(skip(subsystem, settings, repo))] +#[tracing::instrument(skip_all)] pub fn start_tcp_server( subsystem: SubsystemHandle, - repo_path: PathBuf, settings: Settings, - repo: Repo, + shared_data: TypeList, ) -> RepoResult<(String, JoinHandle<()>)> { let port = match &settings.server.tcp.port { PortSetting::Fixed(p) => { @@ -45,9 +41,7 @@ pub fn start_tcp_server( .spawn(async move { get_builder::(address) .insert::(subsystem) - .insert::(Arc::new(repo)) - .insert::(settings) - .insert::(repo_path) + .insert_all(shared_data) .insert::(Default::default()) .build_server() .await @@ -58,13 +52,11 @@ pub fn start_tcp_server( } #[cfg(unix)] -#[tracing::instrument(skip(subsystem, settings, repo))] +#[tracing::instrument(skip_all)] pub fn create_unix_socket( subsystem: SubsystemHandle, path: std::path::PathBuf, - repo_path: PathBuf, - settings: Settings, - repo: Repo, + shared_data: TypeList, ) -> RepoResult> { use std::fs; use tokio::net::UnixListener; @@ -77,9 +69,7 @@ pub fn create_unix_socket( .spawn(async move { get_builder::(path) .insert::(subsystem) - .insert::(Arc::new(repo)) - .insert::(settings) - .insert::(repo_path) + .insert_all(shared_data) .insert::(Default::default()) .build_server() .await diff --git a/mediarepo-daemon/mediarepo-worker/Cargo.toml b/mediarepo-daemon/mediarepo-worker/Cargo.toml index d448ff5..abebc5e 100644 --- a/mediarepo-daemon/mediarepo-worker/Cargo.toml +++ b/mediarepo-daemon/mediarepo-worker/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" [dependencies] async-trait = "0.1.52" -tracing = "0.1.31" +tracing = "0.1.32" [dependencies.mediarepo-core] path = "../mediarepo-core" @@ -28,4 +28,4 @@ features = ["serde"] [dependencies.serde] version = "1.0.136" -features = ["derive"] \ No newline at end of file +features = ["derive"] diff --git a/mediarepo-daemon/mediarepo-worker/src/job_dispatcher.rs b/mediarepo-daemon/mediarepo-worker/src/job_dispatcher.rs index e69de29..41c1c22 100644 --- a/mediarepo-daemon/mediarepo-worker/src/job_dispatcher.rs +++ b/mediarepo-daemon/mediarepo-worker/src/job_dispatcher.rs @@ -0,0 +1,79 @@ +use crate::jobs::{Job, JobTypeKey}; +use mediarepo_core::tokio_graceful_shutdown::SubsystemHandle; +use mediarepo_core::typemap_rev::{TypeMap, TypeMapKey}; +use mediarepo_logic::dao::repo::Repo; +use mediarepo_logic::dao::DaoProvider; +use std::cell::UnsafeCell; +use std::sync::Arc; +use tokio::sync::RwLock; + +#[derive(Clone)] +pub struct JobDispatcher { + subsystem: Arc>, + job_status_map: Arc>, + repo: Arc, +} + +impl JobDispatcher { + pub fn new(subsystem: SubsystemHandle, repo: Repo) -> Self { + Self { + job_status_map: Default::default(), + subsystem: Arc::new(UnsafeCell::new(subsystem)), + repo: Arc::new(repo), + } + } + + #[tracing::instrument(level = "debug", skip_all)] + pub async fn dispatch(&self, job: T) -> Arc> { + let status = job.status(); + self.add_status::>(status.clone()).await; + + let subsystem = unsafe { + // SAFETY: the subsystem requires a mutable borrow for the start method + // the implementation of start doesn't need that mutability. So until that's + // changed we have to do some trickery. + &mut *self.subsystem.get() + }; + + let repo = self.repo.clone(); + + subsystem.start("worker-job", |subsystem| async move { + let job_2 = job.clone(); + let result = tokio::select! { + _ = subsystem.on_shutdown_requested() => { + job_2.save_status(repo.job()).await + } + r = job.run(repo.clone()) => { + + if let Err(e) = r { + Err(e) + } else { + job.save_status(repo.job()).await + } + } + }; + if let Err(e) = result { + tracing::error!("job failed with error: {}", e); + } + + Ok(()) + }); + + status + } + + #[inline] + async fn add_status(&self, status: T::Value) { + let mut status_map = self.job_status_map.write().await; + status_map.insert::(status); + } +} + +pub struct DispatcherKey; + +impl TypeMapKey for DispatcherKey { + type Value = JobDispatcher; +} + +unsafe impl Send for JobDispatcher {} +unsafe impl Sync for JobDispatcher {} diff --git a/mediarepo-daemon/mediarepo-worker/src/jobs/mod.rs b/mediarepo-daemon/mediarepo-worker/src/jobs/mod.rs index 4f42740..8b34449 100644 --- a/mediarepo-daemon/mediarepo-worker/src/jobs/mod.rs +++ b/mediarepo-daemon/mediarepo-worker/src/jobs/mod.rs @@ -1,18 +1,34 @@ mod vacuum; +use std::marker::PhantomData; +use std::sync::Arc; pub use vacuum::*; -use crate::execution_state::JobExecutionState; -use crate::progress::{JobProgressUpdate, ProgressSender}; -use crate::state_data::StateData; use async_trait::async_trait; use mediarepo_core::error::RepoResult; +use mediarepo_core::typemap_rev::TypeMapKey; +use mediarepo_logic::dao::job::JobDao; use mediarepo_logic::dao::repo::Repo; -use tokio::sync::mpsc::Sender; +use tokio::sync::RwLock; + +type EmptyStatus = Arc>; #[async_trait] -pub trait ScheduledJob { - async fn set_state(&self, state: StateData) -> RepoResult<()>; +pub trait Job: Clone + Send + Sync { + type JobStatus: Send + Sync; + + fn status(&self) -> Arc>; + + async fn run(&self, repo: Arc) -> RepoResult<()>; + + async fn save_status(&self, job_dao: JobDao) -> RepoResult<()>; +} + +pub struct JobTypeKey(PhantomData); - async fn run(&self, sender: &ProgressSender, repo: Repo) -> RepoResult<()>; +impl TypeMapKey for JobTypeKey +where + T: Job, +{ + type Value = Arc>; } diff --git a/mediarepo-daemon/mediarepo-worker/src/jobs/vacuum.rs b/mediarepo-daemon/mediarepo-worker/src/jobs/vacuum.rs index 7527d85..0be7d76 100644 --- a/mediarepo-daemon/mediarepo-worker/src/jobs/vacuum.rs +++ b/mediarepo-daemon/mediarepo-worker/src/jobs/vacuum.rs @@ -1,29 +1,32 @@ -use crate::execution_state::{ExecutionStateSynchronizer, JobExecutionState}; -use crate::jobs::ScheduledJob; -use crate::progress::{JobProgressUpdate, ProgressSender}; -use crate::state_data::StateData; +use crate::jobs::{EmptyStatus, Job}; use async_trait::async_trait; use mediarepo_core::error::RepoResult; +use mediarepo_logic::dao::job::JobDao; use mediarepo_logic::dao::repo::Repo; use mediarepo_logic::dao::DaoProvider; use std::sync::Arc; -use tokio::sync::mpsc::Sender; use tokio::sync::RwLock; #[derive(Default, Clone)] pub struct VacuumJob; #[async_trait] -impl ScheduledJob for VacuumJob { - async fn set_state(&self, _: StateData) -> RepoResult<()> { - Ok(()) +impl Job for VacuumJob { + type JobStatus = (); + + fn status(&self) -> Arc> { + EmptyStatus::default() } - async fn run(&self, sender: &ProgressSender, repo: Repo) -> RepoResult<()> { - sender.send_progress_percent(0.0); + #[tracing::instrument(level = "debug", skip_all)] + async fn run(&self, repo: Arc) -> RepoResult<()> { repo.job().vacuum().await?; - sender.send_progress_percent(1.0); Ok(()) } + + #[tracing::instrument(level = "debug", skip_all)] + async fn save_status(&self, _: JobDao) -> RepoResult<()> { + Ok(()) + } } diff --git a/mediarepo-daemon/mediarepo-worker/src/lib.rs b/mediarepo-daemon/mediarepo-worker/src/lib.rs index 8044986..b72a922 100644 --- a/mediarepo-daemon/mediarepo-worker/src/lib.rs +++ b/mediarepo-daemon/mediarepo-worker/src/lib.rs @@ -1,6 +1,27 @@ -pub mod execution_state; +use crate::job_dispatcher::JobDispatcher; +use crate::jobs::VacuumJob; +use mediarepo_core::error::RepoError; +use mediarepo_core::tokio_graceful_shutdown::Toplevel; +use mediarepo_logic::dao::repo::Repo; +use tokio::sync::oneshot::channel; + +pub mod job_dispatcher; pub mod jobs; -pub mod jobs_table; -pub mod progress; -pub mod scheduler; -pub mod state_data; + +pub async fn start(top_level: Toplevel, repo: Repo) -> (Toplevel, JobDispatcher) { + let (tx, rx) = channel(); + + let top_level = top_level.start("mediarepo-worker", |subsystem| async move { + let dispatcher = JobDispatcher::new(subsystem, repo); + tx.send(dispatcher.clone()) + .map_err(|_| RepoError::from("failed to send dispatcher"))?; + dispatcher.dispatch(VacuumJob::default()).await; + + Ok(()) + }); + let receiver = rx + .await + .expect("failed to create background job dispatcher"); + + (top_level, receiver) +} diff --git a/mediarepo-daemon/src/main.rs b/mediarepo-daemon/src/main.rs index f633bb4..dc4b950 100644 --- a/mediarepo-daemon/src/main.rs +++ b/mediarepo-daemon/src/main.rs @@ -1,5 +1,6 @@ use std::env; use std::path::PathBuf; +use std::sync::Arc; use std::time::Duration; use structopt::StructOpt; @@ -10,8 +11,12 @@ use mediarepo_core::error::RepoResult; use mediarepo_core::fs::drop_file::DropFile; use mediarepo_core::settings::{PathSettings, Settings}; use mediarepo_core::tokio_graceful_shutdown::{SubsystemHandle, Toplevel}; +use mediarepo_core::type_keys::{RepoPathKey, SettingsKey}; +use mediarepo_core::type_list::TypeList; use mediarepo_logic::dao::repo::Repo; +use mediarepo_logic::type_keys::RepoKey; use mediarepo_socket::start_tcp_server; +use mediarepo_worker::job_dispatcher::DispatcherKey; use crate::utils::{create_paths_for_repo, get_repo, load_settings}; @@ -99,18 +104,23 @@ async fn init_repo(opt: &Opt, paths: &PathSettings) -> RepoResult { /// Starts the server async fn start_server(opt: Opt, settings: Settings) -> RepoResult<()> { let repo = init_repo(&opt, &settings.paths).await?; - let mut top_level = Toplevel::new(); + let (mut top_level, dispatcher) = mediarepo_worker::start(Toplevel::new(), repo.clone()).await; + + let mut shared_data = TypeList::default(); + shared_data.add::(Arc::new(repo)); + shared_data.add::(settings.clone()); + shared_data.add::(opt.repo.clone()); + shared_data.add::(dispatcher); #[cfg(unix)] { if settings.server.unix_socket.enabled { - let settings = settings.clone(); let repo_path = opt.repo.clone(); - let repo = repo.clone(); + let shared_data = shared_data.clone(); top_level = top_level.start("mediarepo-unix-socket", |subsystem| { Box::pin(async move { - start_and_await_unix_socket(subsystem, repo_path, settings, repo).await?; + start_and_await_unix_socket(subsystem, repo_path, shared_data).await?; Ok(()) }) }) @@ -120,7 +130,7 @@ async fn start_server(opt: Opt, settings: Settings) -> RepoResult<()> { if settings.server.tcp.enabled { top_level = top_level.start("mediarepo-tcp", move |subsystem| { Box::pin(async move { - start_and_await_tcp_server(subsystem, opt.repo, settings, repo).await?; + start_and_await_tcp_server(subsystem, opt.repo, settings, shared_data).await?; Ok(()) }) @@ -147,9 +157,9 @@ async fn start_and_await_tcp_server( subsystem: SubsystemHandle, repo_path: PathBuf, settings: Settings, - repo: Repo, + shared_data: TypeList, ) -> RepoResult<()> { - let (address, handle) = start_tcp_server(subsystem.clone(), repo_path.clone(), settings, repo)?; + let (address, handle) = start_tcp_server(subsystem.clone(), settings, shared_data)?; let (mut file, _guard) = DropFile::new(repo_path.join("repo.tcp")).await?; file.write_all(&address.into_bytes()).await?; @@ -172,17 +182,10 @@ async fn start_and_await_tcp_server( async fn start_and_await_unix_socket( subsystem: SubsystemHandle, repo_path: PathBuf, - settings: Settings, - repo: Repo, + shared_data: TypeList, ) -> RepoResult<()> { let socket_path = repo_path.join("repo.sock"); - let handle = mediarepo_socket::create_unix_socket( - subsystem.clone(), - socket_path, - repo_path.clone(), - settings, - repo, - )?; + let handle = mediarepo_socket::create_unix_socket(subsystem.clone(), socket_path, shared_data)?; let _guard = DropFile::from_path(repo_path.join("repo.sock")); tokio::select! {