diff --git a/mediarepo-api/Cargo.toml b/mediarepo-api/Cargo.toml index 4510d07..ba7b42b 100644 --- a/mediarepo-api/Cargo.toml +++ b/mediarepo-api/Cargo.toml @@ -7,20 +7,20 @@ license = "gpl-3" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -tracing = "0.1.30" +tracing = "0.1.32" thiserror = "1.0.30" async-trait = { version = "0.1.52", optional = true } parking_lot = { version = "0.12.0", optional = true } -serde_json = { version = "1.0.78", optional = true } +serde_json = { version = "1.0.79", optional = true } directories = { version = "4.0.1", optional = true } -mime_guess = { version = "2.0.3", optional = true } +mime_guess = { version = "2.0.4", optional = true } serde_piecewise_default = "0.2.0" futures = { version = "0.3.21", optional = true } url = { version = "2.2.2", optional = true } pathsearch = { version = "0.2.0", optional = true } [dependencies.bromine] -version = "0.18.1" +version = "0.19.0" optional = true features = ["serialize_bincode"] @@ -34,12 +34,12 @@ features = ["serde"] [dependencies.tauri] version = "1.0.0-rc.4" -optional=true +optional = true default-features = false features = [] [dependencies.tokio] -version = "1.16.1" +version = "1.17.0" optional = true features = ["sync", "fs", "net", "io-util", "io-std", "time", "rt", "process"] diff --git a/mediarepo-daemon/Cargo.lock b/mediarepo-daemon/Cargo.lock index 27cc942..5c4ebb6 100644 --- a/mediarepo-daemon/Cargo.lock +++ b/mediarepo-daemon/Cargo.lock @@ -63,9 +63,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.55" +version = "1.0.56" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "159bb86af3a200e19a068f4224eae4c8bb2d0fa054c7e5d1cacd5cef95e684cd" +checksum = "4361135be9122e0870de935d7c439aef945b9f9ddd4199a553b5270b49c82a27" [[package]] name = "arrayref" @@ -81,9 +81,9 @@ checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6" [[package]] name = "async-recursion" -version = "0.3.2" +version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7d78656ba01f1b93024b7c3a0467f1608e4be67d725749fdcd7d2c7678fd7a2" +checksum = "2cda8f4bcc10624c4e85bc66b3f452cca98cfa5ca002dc83a16aad2367641bea" dependencies = [ "proc-macro2 1.0.36", "quote 1.0.15", @@ -270,9 +270,9 @@ dependencies = [ [[package]] name = "bromine" -version = "0.18.1" +version = "0.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5dd7887995490657bf3ec578f39e747ef7b5355a8dc6c99b3d5be59ca70dc4d5" +checksum = "a05cd0cd5646e705df88816dcc36eaf4e21b940cea66f1e027970cd58e3dc897" dependencies = [ "async-trait", "bincode", @@ -284,7 +284,7 @@ dependencies = [ "thiserror", "tokio", "tracing", - "typemap_rev", + "trait-bound-typemap", ] [[package]] @@ -301,9 +301,9 @@ checksum = "e3b5ca7a04898ad4bcd41c90c5285445ff5b791899bb1b0abdd2a2aa791211d7" [[package]] name = "bytemuck" -version = "1.7.3" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "439989e6b8c38d1b6570a384ef1e49c8848128f5a97f3914baef02920842712f" +checksum = "0e851ca7c24871e7336801608a4797d7376545b6928a10d32d75685687141ead" [[package]] name = "byteorder" @@ -709,9 +709,9 @@ dependencies = [ [[package]] name = "flume" -version = "0.10.11" +version = "0.10.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b279436a715a9de95dcd26b151db590a71961cc06e54918b24fe0dd5b7d3fc4" +checksum = "843c03199d0c0ca54bc1ea90ac0d507274c28abcc4f691ae8b4eaa375087c76a" dependencies = [ "futures-core", "futures-sink", @@ -885,7 +885,7 @@ dependencies = [ "cfg-if 1.0.0", "js-sys", "libc", - "wasi", + "wasi 0.10.0+wasi-snapshot-preview1", "wasm-bindgen", ] @@ -907,9 +907,9 @@ checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574" [[package]] name = "h2" -version = "0.3.11" +version = "0.3.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9f1f717ddc7b2ba36df7e871fd88db79326551d3d6f1fc406fbfd28b582ff8e" +checksum = "62eeb471aa3e3c9197aa4bfeabfe02982f6dc96f750486c0bb0009ac58b26d2b" dependencies = [ "bytes", "fnv", @@ -1300,6 +1300,7 @@ name = "mediarepo-core" version = "0.1.0" dependencies = [ "base64", + "bincode", "config", "data-encoding", "futures 0.3.21", @@ -1318,7 +1319,7 @@ dependencies = [ "toml", "tracing", "tracing-subscriber", - "typemap_rev", + "trait-bound-typemap", ] [[package]] @@ -1331,6 +1332,7 @@ dependencies = [ "mediarepo-core", "mediarepo-logic", "mediarepo-socket", + "mediarepo-worker", "num-integer", "opentelemetry", "opentelemetry-jaeger", @@ -1371,7 +1373,6 @@ dependencies = [ "serde", "tokio", "tracing", - "typemap_rev", ] [[package]] @@ -1383,6 +1384,7 @@ dependencies = [ "mediarepo-core", "mediarepo-database", "mediarepo-logic", + "mediarepo-worker", "port_check", "rayon", "serde", @@ -1391,6 +1393,20 @@ dependencies = [ "tracing-futures", ] +[[package]] +name = "mediarepo-worker" +version = "0.1.0" +dependencies = [ + "async-trait", + "chrono", + "mediarepo-core", + "mediarepo-database", + "mediarepo-logic", + "serde", + "tokio", + "tracing", +] + [[package]] name = "memchr" version = "2.4.1" @@ -1449,14 +1465,15 @@ dependencies = [ [[package]] name = "mio" -version = "0.8.0" +version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba272f85fa0b41fc91872be579b3bbe0f56b792aa361a380eb669469f68dafb2" +checksum = "7ba42135c6a5917b9db9cd7b293e5409e1c6b041e6f9825e92e55a894c63b6f8" dependencies = [ "libc", "log", "miow", "ntapi", + "wasi 0.11.0+wasi-snapshot-preview1", "winapi", ] @@ -1469,6 +1486,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "multi-trait-object" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a54c9ed2b86c7927b63e7d51f8d7ed4e1f8513c8672828ca1a850ff9d32ab1c" + [[package]] name = "multibase" version = "0.9.1" @@ -1519,9 +1542,9 @@ checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" [[package]] name = "nanorand" -version = "0.6.1" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "729eb334247daa1803e0a094d0a5c55711b85571179f5ec6e53eccfdf7008958" +checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" dependencies = [ "getrandom", ] @@ -1628,18 +1651,18 @@ dependencies = [ [[package]] name = "num_enum" -version = "0.5.6" +version = "0.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "720d3ea1055e4e4574c0c0b0f8c3fd4f24c4cdaf465948206dea090b57b526ad" +checksum = "cf5395665662ef45796a4ff5486c5d41d29e0c09640af4c5f17fd94ee2c119c9" dependencies = [ "num_enum_derive", ] [[package]] name = "num_enum_derive" -version = "0.5.6" +version = "0.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d992b768490d7fe0d8586d9b5745f6c49f557da6d81dc982b1d167ad4edbb21" +checksum = "3b0498641e53dd6ac1a4f22547548caa6864cc4933784319cd1775271c5a46ce" dependencies = [ "proc-macro-crate", "proc-macro2 1.0.36", @@ -1649,18 +1672,18 @@ dependencies = [ [[package]] name = "num_threads" -version = "0.1.3" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97ba99ba6393e2c3734791401b66902d981cb03bf190af674ca69949b6d5fb15" +checksum = "c539a50b93a303167eded6e8dff5220cd39447409fb659f4cd24b1f72fe4f133" dependencies = [ "libc", ] [[package]] name = "once_cell" -version = "1.9.0" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da32515d9f6e6e489d7bc9d84c71b060db7247dc035bbe44eac88cf87486d8d5" +checksum = "87f3e037eac156d1775da914196f0f37741a274155e34a0b7e427c35d2a2ecb9" [[package]] name = "opaque-debug" @@ -1931,9 +1954,9 @@ checksum = "58893f751c9b0412871a09abd62ecd2a00298c6c83befa223ef98c52aef40cbe" [[package]] name = "png" -version = "0.17.3" +version = "0.17.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e8f1882177b17c98ec33a51f5910ecbf4db92ca0def706781a1f8d0c661f393" +checksum = "dc38c0ad57efb786dd57b9864e5b18bae478c00c824dc55a38bbc9da95dde3ba" dependencies = [ "bitflags", "crc32fast", @@ -2133,18 +2156,18 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.2.10" +version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8383f39639269cde97d255a32bdb68c047337295414940c68bdd30c2e13203ff" +checksum = "8380fe0152551244f0747b1bf41737e0f8a74f97a14ccefd1148187271634f3c" dependencies = [ "bitflags", ] [[package]] name = "regex" -version = "1.5.4" +version = "1.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d07a8629359eb56f1e2fb1652bb04212c072a87ba68546a04065d525673ac461" +checksum = "1a11647b6b25ff05a515cb92c365cec08801e83423a235b51e231e1808747286" dependencies = [ "aho-corasick", "memchr", @@ -2793,7 +2816,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6db9e6914ab8b1ae1c260a4ae7a49b6c5611b40328a735b21862567685e73255" dependencies = [ "libc", - "wasi", + "wasi 0.10.0+wasi-snapshot-preview1", "winapi", ] @@ -2855,16 +2878,16 @@ dependencies = [ [[package]] name = "tokio-graceful-shutdown" -version = "0.4.3" +version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d08ebea7dc6b22273290d8ece2ca448f979f836e38ba629b650595c64204b4f2" +checksum = "4f21c36e43c82d5f32302aff8ac9efb79e10db9538b0940ef69cce38a01614ae" dependencies = [ "anyhow", "async-recursion", "futures 0.3.21", "log", "tokio", - "tokio-util 0.6.9", + "tokio-util 0.7.0", ] [[package]] @@ -3023,9 +3046,9 @@ checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6" [[package]] name = "tracing" -version = "0.1.31" +version = "0.1.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6c650a8ef0cd2dd93736f033d21cbd1224c5a967aa0c258d00fcf7dafef9b9f" +checksum = "4a1bdf54a7c28a2bbf701e1d2233f6c77f473486b94bee4f9678da5a148dca7f" dependencies = [ "cfg-if 1.0.0", "log", @@ -3036,9 +3059,9 @@ dependencies = [ [[package]] name = "tracing-appender" -version = "0.2.0" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94571df2eae3ed4353815ea5a90974a594a1792d8782ff2cbcc9392d1101f366" +checksum = "9ab026b18a46ac429e5c98bec10ca06424a97b3ad7b3949d9b4a102fff6623c4" dependencies = [ "crossbeam-channel", "time 0.3.7", @@ -3047,9 +3070,9 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.19" +version = "0.1.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8276d9a4a3a558d7b7ad5303ad50b53d58264641b82914b7ada36bd762e7a716" +checksum = "2e65ce065b4b5c53e73bb28912318cb8c9e9ad3921f1d669eb0e68b4c8143a2b" dependencies = [ "proc-macro2 1.0.36", "quote 1.0.15", @@ -3058,9 +3081,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.22" +version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03cfcb51380632a72d3111cb8d3447a8d908e577d31beeac006f836383d29a23" +checksum = "aa31669fa42c09c34d94d8165dd2012e8ff3c66aca50f3bb226b68f216f2706c" dependencies = [ "lazy_static", "valuable", @@ -3144,16 +3167,19 @@ dependencies = [ ] [[package]] -name = "try-lock" -version = "0.2.3" +name = "trait-bound-typemap" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" +checksum = "3631df5ba73c0e41b1aa337df3bcca7f15219f042f8fec1100857bc1eb60c767" +dependencies = [ + "multi-trait-object", +] [[package]] -name = "typemap_rev" -version = "0.1.5" +name = "try-lock" +version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed5b74f0a24b5454580a79abb6994393b09adf0ab8070f15827cb666255de155" +checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" [[package]] name = "typenum" @@ -3289,6 +3315,12 @@ version = "0.10.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f" +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + [[package]] name = "wasm-bindgen" version = "0.2.79" diff --git a/mediarepo-daemon/Cargo.toml b/mediarepo-daemon/Cargo.toml index 5f37c2c..cf21780 100644 --- a/mediarepo-daemon/Cargo.toml +++ b/mediarepo-daemon/Cargo.toml @@ -1,6 +1,6 @@ [workspace] -members = ["mediarepo-core", "mediarepo-database", "mediarepo-logic", "mediarepo-socket", "."] -default-members = ["mediarepo-core", "mediarepo-database", "mediarepo-logic", "mediarepo-socket", "."] +members = ["mediarepo-core", "mediarepo-database", "mediarepo-logic", "mediarepo-socket", "mediarepo-worker", "."] +default-members = ["mediarepo-core", "mediarepo-database", "mediarepo-logic", "mediarepo-socket", "mediarepo-worker", "."] [package] name = "mediarepo-daemon" @@ -16,12 +16,12 @@ name = "mediarepo-daemon" path = "src/main.rs" [dependencies] -tracing = "0.1.31" +tracing = "0.1.32" toml = "0.5.8" structopt = "0.3.26" glob = "0.3.0" tracing-flame = "0.2.0" -tracing-appender = "0.2.0" +tracing-appender = "0.2.1" tracing-log = "0.1.2" rolling-file = "0.1.0" num-integer = "0.1.44" @@ -40,6 +40,9 @@ path = "mediarepo-logic" [dependencies.mediarepo-socket] path = "./mediarepo-socket" +[dependencies.mediarepo-worker] +path = "./mediarepo-worker" + [dependencies.tokio] version = "1.17.0" features = ["macros", "rt-multi-thread", "io-std", "io-util"] diff --git a/mediarepo-daemon/mediarepo-core/Cargo.toml b/mediarepo-daemon/mediarepo-core/Cargo.toml index bf4f1d5..48d2bbc 100644 --- a/mediarepo-daemon/mediarepo-core/Cargo.toml +++ b/mediarepo-daemon/mediarepo-core/Cargo.toml @@ -13,15 +13,16 @@ multibase = "0.9.1" base64 = "0.13.0" toml = "0.5.8" serde = "1.0.136" -typemap_rev = "0.1.5" futures = "0.3.21" itertools = "0.10.3" glob = "0.3.0" -tracing = "0.1.31" +tracing = "0.1.32" data-encoding = "2.3.2" -tokio-graceful-shutdown = "0.4.3" +tokio-graceful-shutdown = "0.4.4" thumbnailer = "0.4.0" +bincode = "1.3.3" tracing-subscriber = "0.3.9" +trait-bound-typemap = "0.3.3" [dependencies.sea-orm] version = "0.6.0" diff --git a/mediarepo-daemon/mediarepo-core/src/error.rs b/mediarepo-daemon/mediarepo-core/src/error.rs index 7f5dfbc..4173ed4 100644 --- a/mediarepo-daemon/mediarepo-core/src/error.rs +++ b/mediarepo-daemon/mediarepo-core/src/error.rs @@ -43,6 +43,9 @@ pub enum RepoError { #[error("the database file is corrupted {0}")] Corrupted(String), + + #[error("bincode de-/serialization failed {0}")] + Bincode(#[from] bincode::Error), } #[derive(Error, Debug)] diff --git a/mediarepo-daemon/mediarepo-core/src/lib.rs b/mediarepo-daemon/mediarepo-core/src/lib.rs index 688c48b..1edc42d 100644 --- a/mediarepo-daemon/mediarepo-core/src/lib.rs +++ b/mediarepo-daemon/mediarepo-core/src/lib.rs @@ -1,9 +1,11 @@ +pub use bincode; pub use futures; pub use itertools; pub use mediarepo_api; pub use mediarepo_api::bromine; pub use thumbnailer; pub use tokio_graceful_shutdown; +pub use trait_bound_typemap; pub mod content_descriptor; pub mod context; diff --git a/mediarepo-daemon/mediarepo-core/src/type_keys.rs b/mediarepo-daemon/mediarepo-core/src/type_keys.rs index 8cebd2c..778bcb8 100644 --- a/mediarepo-daemon/mediarepo-core/src/type_keys.rs +++ b/mediarepo-daemon/mediarepo-core/src/type_keys.rs @@ -3,7 +3,7 @@ use std::path::PathBuf; use mediarepo_api::types::repo::SizeType; use tokio_graceful_shutdown::SubsystemHandle; -use typemap_rev::TypeMapKey; +use trait_bound_typemap::TypeMapKey; use crate::settings::Settings; diff --git a/mediarepo-daemon/mediarepo-database/Cargo.toml b/mediarepo-daemon/mediarepo-database/Cargo.toml index 6ed1944..df9387a 100644 --- a/mediarepo-daemon/mediarepo-database/Cargo.toml +++ b/mediarepo-daemon/mediarepo-database/Cargo.toml @@ -8,7 +8,7 @@ workspace = ".." [dependencies] chrono = "0.4.19" -tracing = "0.1.31" +tracing = "0.1.32" [dependencies.mediarepo-core] path = "../mediarepo-core" diff --git a/mediarepo-daemon/mediarepo-database/migrations/20220225183244_add-jobs-tables.sql b/mediarepo-daemon/mediarepo-database/migrations/20220225183244_add-jobs-tables.sql new file mode 100644 index 0000000..2a65989 --- /dev/null +++ b/mediarepo-daemon/mediarepo-database/migrations/20220225183244_add-jobs-tables.sql @@ -0,0 +1,5 @@ +CREATE TABLE job_states ( + job_type INTEGER NOT NULL, + value BLOB, + PRIMARY KEY (job_type) +); \ No newline at end of file diff --git a/mediarepo-daemon/mediarepo-database/src/entities/job_state.rs b/mediarepo-daemon/mediarepo-database/src/entities/job_state.rs new file mode 100644 index 0000000..2c60e59 --- /dev/null +++ b/mediarepo-daemon/mediarepo-database/src/entities/job_state.rs @@ -0,0 +1,45 @@ +use sea_orm::prelude::*; +use sea_orm::TryFromU64; + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel)] +#[sea_orm(table_name = "job_states")] +pub struct Model { + #[sea_orm(primary_key)] + pub job_type: JobType, + pub value: Vec, +} + +#[derive(Clone, Copy, Debug, PartialEq, EnumIter, DeriveActiveEnum)] +#[sea_orm(rs_type = "u32", db_type = "Integer")] +pub enum JobType { + #[sea_orm(num_value = 10)] + MigrateCDs, + #[sea_orm(num_value = 20)] + CalculateSizes, + #[sea_orm(num_value = 30)] + GenerateThumbs, + #[sea_orm(num_value = 40)] + CheckIntegrity, + #[sea_orm(num_value = 50)] + Vacuum, +} + +impl TryFromU64 for JobType { + fn try_from_u64(n: u64) -> Result { + let value = match n { + 10 => Self::MigrateCDs, + 20 => Self::CalculateSizes, + 30 => Self::GenerateThumbs, + 40 => Self::CheckIntegrity, + 50 => Self::Vacuum, + _ => return Err(DbErr::Custom(String::from("Invalid job type"))), + }; + + Ok(value) + } +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation {} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/mediarepo-daemon/mediarepo-database/src/entities/mod.rs b/mediarepo-daemon/mediarepo-database/src/entities/mod.rs index 3eee5ae..ca4af27 100644 --- a/mediarepo-daemon/mediarepo-database/src/entities/mod.rs +++ b/mediarepo-daemon/mediarepo-database/src/entities/mod.rs @@ -3,6 +3,7 @@ pub mod content_descriptor_source; pub mod content_descriptor_tag; pub mod file; pub mod file_metadata; +pub mod job_state; pub mod namespace; pub mod sort_key; pub mod sorting_preset; diff --git a/mediarepo-daemon/mediarepo-logic/Cargo.toml b/mediarepo-daemon/mediarepo-logic/Cargo.toml index 018bcd4..7880acd 100644 --- a/mediarepo-daemon/mediarepo-logic/Cargo.toml +++ b/mediarepo-daemon/mediarepo-logic/Cargo.toml @@ -8,11 +8,10 @@ workspace = ".." [dependencies] chrono = "0.4.19" -typemap_rev = "0.1.5" serde = "1.0.136" mime_guess = "2.0.4" mime = "0.3.16" -tracing = "0.1.31" +tracing = "0.1.32" async-trait = "0.1.52" [dependencies.mediarepo-core] diff --git a/mediarepo-daemon/mediarepo-logic/src/dao/job/mod.rs b/mediarepo-daemon/mediarepo-logic/src/dao/job/mod.rs index 31ab290..b2077bc 100644 --- a/mediarepo-daemon/mediarepo-logic/src/dao/job/mod.rs +++ b/mediarepo-daemon/mediarepo-logic/src/dao/job/mod.rs @@ -3,5 +3,6 @@ use crate::dao_provider; pub mod generate_missing_thumbnails; pub mod migrate_content_descriptors; pub mod sqlite_operations; +pub mod state; dao_provider!(JobDao); diff --git a/mediarepo-daemon/mediarepo-logic/src/dao/job/state.rs b/mediarepo-daemon/mediarepo-logic/src/dao/job/state.rs new file mode 100644 index 0000000..a8aab72 --- /dev/null +++ b/mediarepo-daemon/mediarepo-logic/src/dao/job/state.rs @@ -0,0 +1,58 @@ +use crate::dao::job::JobDao; +use crate::dto::{JobStateDto, UpsertJobStateDto}; +use mediarepo_core::error::RepoResult; +use mediarepo_database::entities::job_state; +use mediarepo_database::entities::job_state::JobType; +use sea_orm::prelude::*; +use sea_orm::ActiveValue::Set; +use sea_orm::{Condition, TransactionTrait}; + +impl JobDao { + /// Returns all job states for a given job id + pub async fn state_for_job_type(&self, job_type: JobType) -> RepoResult> { + let state = job_state::Entity::find() + .filter(job_state::Column::JobType.eq(job_type)) + .one(&self.ctx.db) + .await? + .map(JobStateDto::new); + + Ok(state) + } + + pub async fn upsert_state(&self, state: UpsertJobStateDto) -> RepoResult<()> { + self.upsert_multiple_states(vec![state]).await + } + + pub async fn upsert_multiple_states(&self, states: Vec) -> RepoResult<()> { + let trx = self.ctx.db.begin().await?; + + job_state::Entity::delete_many() + .filter(build_state_filters(&states)) + .exec(&trx) + .await?; + job_state::Entity::insert_many(build_active_state_models(states)) + .exec(&trx) + .await?; + + trx.commit().await?; + + Ok(()) + } +} + +fn build_state_filters(states: &Vec) -> Condition { + states + .iter() + .map(|s| Condition::all().add(job_state::Column::JobType.eq(s.job_type))) + .fold(Condition::any(), |acc, cond| acc.add(cond)) +} + +fn build_active_state_models(states: Vec) -> Vec { + states + .into_iter() + .map(|s| job_state::ActiveModel { + job_type: Set(s.job_type), + value: Set(s.value), + }) + .collect() +} diff --git a/mediarepo-daemon/mediarepo-logic/src/dto/job_state.rs b/mediarepo-daemon/mediarepo-logic/src/dto/job_state.rs new file mode 100644 index 0000000..8c551c0 --- /dev/null +++ b/mediarepo-daemon/mediarepo-logic/src/dto/job_state.rs @@ -0,0 +1,31 @@ +use mediarepo_database::entities::job_state; +use mediarepo_database::entities::job_state::JobType; + +#[derive(Clone, Debug)] +pub struct JobStateDto { + model: job_state::Model, +} + +impl JobStateDto { + pub(crate) fn new(model: job_state::Model) -> Self { + Self { model } + } + + pub fn job_type(&self) -> JobType { + self.model.job_type + } + + pub fn value(&self) -> &[u8] { + &self.model.value + } + + pub fn into_value(self) -> Vec { + self.model.value + } +} + +#[derive(Clone, Debug)] +pub struct UpsertJobStateDto { + pub job_type: JobType, + pub value: Vec, +} diff --git a/mediarepo-daemon/mediarepo-logic/src/dto/mod.rs b/mediarepo-daemon/mediarepo-logic/src/dto/mod.rs index 2444214..638fe32 100644 --- a/mediarepo-daemon/mediarepo-logic/src/dto/mod.rs +++ b/mediarepo-daemon/mediarepo-logic/src/dto/mod.rs @@ -1,5 +1,6 @@ pub use file::*; pub use file_metadata::*; +pub use job_state::*; pub use namespace::*; pub use sorting_preset::*; pub use tag::*; @@ -7,6 +8,7 @@ pub use thumbnail::*; mod file; mod file_metadata; +mod job_state; mod namespace; mod sorting_preset; mod tag; diff --git a/mediarepo-daemon/mediarepo-logic/src/type_keys.rs b/mediarepo-daemon/mediarepo-logic/src/type_keys.rs index bd12d0c..b19a866 100644 --- a/mediarepo-daemon/mediarepo-logic/src/type_keys.rs +++ b/mediarepo-daemon/mediarepo-logic/src/type_keys.rs @@ -1,7 +1,6 @@ +use mediarepo_core::trait_bound_typemap::TypeMapKey; use std::sync::Arc; -use typemap_rev::TypeMapKey; - use crate::dao::repo::Repo; pub struct RepoKey; diff --git a/mediarepo-daemon/mediarepo-socket/Cargo.toml b/mediarepo-daemon/mediarepo-socket/Cargo.toml index 3911bc4..b498f81 100644 --- a/mediarepo-daemon/mediarepo-socket/Cargo.toml +++ b/mediarepo-daemon/mediarepo-socket/Cargo.toml @@ -8,7 +8,7 @@ workspace = ".." [dependencies] serde = "1.0.136" -tracing = "0.1.31" +tracing = "0.1.32" compare = "0.1.0" port_check = "0.1.5" rayon = "1.5.1" @@ -22,6 +22,9 @@ path = "../mediarepo-database" [dependencies.mediarepo-logic] path = "../mediarepo-logic" +[dependencies.mediarepo-worker] +path = "../mediarepo-worker" + [dependencies.tokio] version = "1.17.0" features = ["net"] diff --git a/mediarepo-daemon/mediarepo-socket/src/lib.rs b/mediarepo-daemon/mediarepo-socket/src/lib.rs index b8efdfe..5818419 100644 --- a/mediarepo-daemon/mediarepo-socket/src/lib.rs +++ b/mediarepo-daemon/mediarepo-socket/src/lib.rs @@ -1,6 +1,4 @@ use std::net::SocketAddr; -use std::path::PathBuf; -use std::sync::Arc; use tokio::net::TcpListener; use tokio::task::JoinHandle; @@ -10,20 +8,18 @@ use mediarepo_core::error::{RepoError, RepoResult}; use mediarepo_core::mediarepo_api::types::misc::InfoResponse; use mediarepo_core::settings::{PortSetting, Settings}; use mediarepo_core::tokio_graceful_shutdown::SubsystemHandle; -use mediarepo_core::type_keys::{RepoPathKey, SettingsKey, SizeMetadataKey, SubsystemKey}; -use mediarepo_logic::dao::repo::Repo; -use mediarepo_logic::type_keys::RepoKey; +use mediarepo_core::trait_bound_typemap::{SendSyncTypeMap, TypeMap}; +use mediarepo_core::type_keys::{SizeMetadataKey, SubsystemKey}; mod from_model; mod namespaces; mod utils; -#[tracing::instrument(skip(subsystem, settings, repo))] +#[tracing::instrument(skip_all)] pub fn start_tcp_server( subsystem: SubsystemHandle, - repo_path: PathBuf, settings: Settings, - repo: Repo, + shared_data: SendSyncTypeMap, ) -> RepoResult<(String, JoinHandle<()>)> { let port = match &settings.server.tcp.port { PortSetting::Fixed(p) => { @@ -45,9 +41,7 @@ pub fn start_tcp_server( .spawn(async move { get_builder::(address) .insert::(subsystem) - .insert::(Arc::new(repo)) - .insert::(settings) - .insert::(repo_path) + .insert_all(shared_data) .insert::(Default::default()) .build_server() .await @@ -58,13 +52,11 @@ pub fn start_tcp_server( } #[cfg(unix)] -#[tracing::instrument(skip(subsystem, settings, repo))] +#[tracing::instrument(skip_all)] pub fn create_unix_socket( subsystem: SubsystemHandle, path: std::path::PathBuf, - repo_path: PathBuf, - settings: Settings, - repo: Repo, + shared_data: SendSyncTypeMap, ) -> RepoResult> { use std::fs; use tokio::net::UnixListener; @@ -77,9 +69,7 @@ pub fn create_unix_socket( .spawn(async move { get_builder::(path) .insert::(subsystem) - .insert::(Arc::new(repo)) - .insert::(settings) - .insert::(repo_path) + .insert_all(shared_data) .insert::(Default::default()) .build_server() .await diff --git a/mediarepo-daemon/mediarepo-socket/src/namespaces/jobs.rs b/mediarepo-daemon/mediarepo-socket/src/namespaces/jobs.rs index c912b70..d86b3f1 100644 --- a/mediarepo-daemon/mediarepo-socket/src/namespaces/jobs.rs +++ b/mediarepo-daemon/mediarepo-socket/src/namespaces/jobs.rs @@ -1,11 +1,15 @@ +use crate::TypeMap; use mediarepo_core::bromine::prelude::*; use mediarepo_core::error::RepoResult; use mediarepo_core::mediarepo_api::types::jobs::{JobType, RunJobRequest}; -use mediarepo_core::mediarepo_api::types::repo::SizeType; -use mediarepo_core::type_keys::SizeMetadataKey; -use mediarepo_logic::dao::DaoProvider; +use mediarepo_core::type_keys::{RepoPathKey, SettingsKey, SizeMetadataKey}; +use mediarepo_worker::handle::JobState; +use mediarepo_worker::job_dispatcher::JobDispatcher; +use mediarepo_worker::jobs::{ + CalculateSizesJob, CheckIntegrityJob, GenerateMissingThumbsJob, Job, MigrateCDsJob, VacuumJob, +}; -use crate::utils::{calculate_size, get_repo_from_context}; +use crate::utils::get_job_dispatcher_from_context; pub struct JobsNamespace; @@ -25,7 +29,7 @@ impl JobsNamespace { #[tracing::instrument(skip_all)] pub async fn run_job(ctx: &Context, event: Event) -> IPCResult { let run_request = event.payload::()?; - let job_dao = get_repo_from_context(ctx).await.job(); + let dispatcher = get_job_dispatcher_from_context(ctx).await; if !run_request.sync { // early response to indicate that the job will be run @@ -33,26 +37,69 @@ impl JobsNamespace { } match run_request.job_type { - JobType::MigrateContentDescriptors => job_dao.migrate_content_descriptors().await?, + JobType::MigrateContentDescriptors => { + dispatch_job(&dispatcher, MigrateCDsJob::default(), run_request.sync).await? + } JobType::CalculateSizes => calculate_all_sizes(ctx).await?, - JobType::CheckIntegrity => job_dao.check_integrity().await?, - JobType::Vacuum => job_dao.vacuum().await?, - JobType::GenerateThumbnails => job_dao.generate_missing_thumbnails().await?, + JobType::CheckIntegrity => { + dispatch_job(&dispatcher, CheckIntegrityJob::default(), run_request.sync).await? + } + JobType::Vacuum => { + dispatch_job(&dispatcher, VacuumJob::default(), run_request.sync).await? + } + JobType::GenerateThumbnails => { + dispatch_job( + &dispatcher, + GenerateMissingThumbsJob::default(), + run_request.sync, + ) + .await? + } } Ok(Response::empty()) } } +async fn dispatch_job( + dispatcher: &JobDispatcher, + job: J, + sync: bool, +) -> RepoResult<()> { + let mut handle = if let Some(handle) = dispatcher.get_handle::().await { + if handle.state().await == JobState::Running { + handle + } else { + dispatcher.dispatch(job).await + } + } else { + dispatcher.dispatch(job).await + }; + if sync { + if let Some(result) = handle.take_result().await { + result?; + } + } + Ok(()) +} + async fn calculate_all_sizes(ctx: &Context) -> RepoResult<()> { - let size_types = vec![ - SizeType::Total, - SizeType::FileFolder, - SizeType::ThumbFolder, - SizeType::DatabaseFile, - ]; - for size_type in size_types { - let size = calculate_size(&size_type, ctx).await?; + let (repo_path, settings) = { + let data = ctx.data.read().await; + ( + data.get::().unwrap().clone(), + data.get::().unwrap().clone(), + ) + }; + let job = CalculateSizesJob::new(repo_path, settings); + let dispatcher = get_job_dispatcher_from_context(ctx).await; + let handle = dispatcher.dispatch(job).await; + let mut rx = { + let status = handle.status().read().await; + status.sizes_channel.subscribe() + }; + + while let Ok((size_type, size)) = rx.recv().await { let mut data = ctx.data.write().await; let size_map = data.get_mut::().unwrap(); size_map.insert(size_type, size); diff --git a/mediarepo-daemon/mediarepo-socket/src/namespaces/repo.rs b/mediarepo-daemon/mediarepo-socket/src/namespaces/repo.rs index 953f0a3..05d605f 100644 --- a/mediarepo-daemon/mediarepo-socket/src/namespaces/repo.rs +++ b/mediarepo-daemon/mediarepo-socket/src/namespaces/repo.rs @@ -2,13 +2,14 @@ use std::path::PathBuf; use tokio::fs; +use crate::TypeMap; use mediarepo_core::bromine::prelude::*; use mediarepo_core::mediarepo_api::types::repo::{ FrontendState, RepositoryMetadata, SizeMetadata, SizeType, }; use mediarepo_core::type_keys::{RepoPathKey, SettingsKey, SizeMetadataKey}; -use crate::utils::{calculate_size, get_repo_from_context}; +use crate::utils::get_repo_from_context; pub struct RepoNamespace; @@ -56,7 +57,7 @@ impl RepoNamespace { let size = if let Some(size) = size_cache.get(&size_type) { *size } else { - calculate_size(&size_type, ctx).await? + 0 }; ctx.response(SizeMetadata { size, size_type }) diff --git a/mediarepo-daemon/mediarepo-socket/src/utils.rs b/mediarepo-daemon/mediarepo-socket/src/utils.rs index 5e48cdc..2c3fc57 100644 --- a/mediarepo-daemon/mediarepo-socket/src/utils.rs +++ b/mediarepo-daemon/mediarepo-socket/src/utils.rs @@ -1,18 +1,15 @@ use std::sync::Arc; -use tokio::fs; - +use crate::TypeMap; use mediarepo_core::bromine::ipc::context::Context; use mediarepo_core::content_descriptor::decode_content_descriptor; use mediarepo_core::error::{RepoError, RepoResult}; use mediarepo_core::mediarepo_api::types::identifier::FileIdentifier; -use mediarepo_core::mediarepo_api::types::repo::SizeType; -use mediarepo_core::type_keys::{RepoPathKey, SettingsKey}; -use mediarepo_core::utils::get_folder_size; -use mediarepo_logic::dao::DaoProvider; use mediarepo_logic::dao::repo::Repo; +use mediarepo_logic::dao::DaoProvider; use mediarepo_logic::dto::FileDto; use mediarepo_logic::type_keys::RepoKey; +use mediarepo_worker::job_dispatcher::{DispatcherKey, JobDispatcher}; pub async fn get_repo_from_context(ctx: &Context) -> Arc { let data = ctx.data.read().await; @@ -20,6 +17,11 @@ pub async fn get_repo_from_context(ctx: &Context) -> Arc { Arc::clone(repo) } +pub async fn get_job_dispatcher_from_context(ctx: &Context) -> JobDispatcher { + let data = ctx.data.read().await; + data.get::().unwrap().clone() +} + pub async fn file_by_identifier(identifier: FileIdentifier, repo: &Repo) -> RepoResult { let file = match identifier { FileIdentifier::ID(id) => repo.file().by_id(id).await, @@ -41,27 +43,3 @@ pub async fn cd_by_identifier(identifier: FileIdentifier, repo: &Repo) -> RepoRe FileIdentifier::CD(cd) => decode_content_descriptor(cd), } } - -pub async fn calculate_size(size_type: &SizeType, ctx: &Context) -> RepoResult { - let repo = get_repo_from_context(ctx).await; - let (repo_path, settings) = { - let data = ctx.data.read().await; - ( - data.get::().unwrap().clone(), - data.get::().unwrap().clone(), - ) - }; - let size = match &size_type { - SizeType::Total => get_folder_size(repo_path).await?, - SizeType::FileFolder => repo.get_main_store_size().await?, - SizeType::ThumbFolder => repo.get_thumb_store_size().await?, - SizeType::DatabaseFile => { - let db_path = settings.paths.db_file_path(&repo_path); - - let database_metadata = fs::metadata(db_path).await?; - database_metadata.len() - } - }; - - Ok(size) -} diff --git a/mediarepo-daemon/mediarepo-worker/Cargo.toml b/mediarepo-daemon/mediarepo-worker/Cargo.toml new file mode 100644 index 0000000..abebc5e --- /dev/null +++ b/mediarepo-daemon/mediarepo-worker/Cargo.toml @@ -0,0 +1,31 @@ +[package] +name = "mediarepo-worker" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +async-trait = "0.1.52" +tracing = "0.1.32" + +[dependencies.mediarepo-core] +path = "../mediarepo-core" + +[dependencies.mediarepo-logic] +path = "../mediarepo-logic" + +[dependencies.mediarepo-database] +path = "../mediarepo-database" + +[dependencies.tokio] +version = "1.17.0" +features = ["macros"] + +[dependencies.chrono] +version = "0.4.19" +features = ["serde"] + +[dependencies.serde] +version = "1.0.136" +features = ["derive"] diff --git a/mediarepo-daemon/mediarepo-worker/src/handle.rs b/mediarepo-daemon/mediarepo-worker/src/handle.rs new file mode 100644 index 0000000..b9929ea --- /dev/null +++ b/mediarepo-daemon/mediarepo-worker/src/handle.rs @@ -0,0 +1,101 @@ +use mediarepo_core::error::{RepoError, RepoResult}; +use std::ops::{Deref, DerefMut}; +use std::sync::Arc; +use tokio::sync::broadcast::{Receiver, Sender}; +use tokio::sync::RwLock; + +pub struct JobHandle { + status: Arc>, + state: Arc>, + result_receiver: CloneableReceiver>>>>, +} + +impl Clone for JobHandle { + fn clone(&self) -> Self { + Self { + status: self.status.clone(), + state: self.state.clone(), + result_receiver: self.result_receiver.clone(), + } + } +} + +impl JobHandle { + pub fn new( + status: Arc>, + state: Arc>, + result_receiver: CloneableReceiver>>>>, + ) -> Self { + Self { + status, + state, + result_receiver, + } + } + + pub async fn state(&self) -> JobState { + *self.state.read().await + } + + pub fn status(&self) -> &Arc> { + &self.status + } + + pub async fn result(&mut self) -> Arc>>> { + match self.result_receiver.recv().await { + Ok(v) => v, + Err(e) => Arc::new(RwLock::new(Some(Err(RepoError::from(&*e.to_string()))))), + } + } + + pub async fn take_result(&mut self) -> Option> { + let shared_result = self.result().await; + let mut result = shared_result.write().await; + result.take() + } +} + +#[derive(Clone, Copy, Debug, Ord, PartialOrd, Eq, PartialEq)] +pub enum JobState { + Queued, + Scheduled, + Running, + Finished, +} + +pub struct CloneableReceiver { + receiver: Receiver, + sender: Sender, +} + +impl CloneableReceiver { + pub fn new(sender: Sender) -> Self { + Self { + receiver: sender.subscribe(), + sender, + } + } +} + +impl Clone for CloneableReceiver { + fn clone(&self) -> Self { + Self { + sender: self.sender.clone(), + receiver: self.sender.subscribe(), + } + } +} + +impl Deref for CloneableReceiver { + type Target = Receiver; + + fn deref(&self) -> &Self::Target { + &self.receiver + } +} + +impl DerefMut for CloneableReceiver { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.receiver + } +} diff --git a/mediarepo-daemon/mediarepo-worker/src/job_dispatcher.rs b/mediarepo-daemon/mediarepo-worker/src/job_dispatcher.rs new file mode 100644 index 0000000..2c74c02 --- /dev/null +++ b/mediarepo-daemon/mediarepo-worker/src/job_dispatcher.rs @@ -0,0 +1,137 @@ +use crate::handle::{CloneableReceiver, JobHandle, JobState}; +use crate::jobs::{Job, JobTypeKey}; +use mediarepo_core::tokio_graceful_shutdown::SubsystemHandle; +use mediarepo_core::trait_bound_typemap::{SendSyncTypeMap, TypeMap, TypeMapKey}; +use mediarepo_logic::dao::repo::Repo; +use mediarepo_logic::dao::DaoProvider; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::broadcast::channel; +use tokio::sync::RwLock; +use tokio::time::Instant; + +#[derive(Clone)] +pub struct JobDispatcher { + subsystem: SubsystemHandle, + job_handle_map: Arc>, + repo: Arc, +} + +impl JobDispatcher { + pub fn new(subsystem: SubsystemHandle, repo: Repo) -> Self { + Self { + job_handle_map: Arc::new(RwLock::new(SendSyncTypeMap::new())), + subsystem, + repo: Arc::new(repo), + } + } + + pub async fn dispatch(&self, job: T) -> JobHandle { + self._dispatch(job, None).await + } + + pub async fn dispatch_periodically( + &self, + job: T, + interval: Duration, + ) -> JobHandle { + self._dispatch(job, Some(interval)).await + } + + #[tracing::instrument(level = "debug", skip_all)] + async fn _dispatch( + &self, + job: T, + interval: Option, + ) -> JobHandle { + let status = job.status(); + let state = Arc::new(RwLock::new(JobState::Queued)); + let (sender, mut receiver) = channel(1); + self.subsystem + .start("channel-consumer", move |subsystem| async move { + tokio::select! { + _ = receiver.recv() => (), + _ = subsystem.on_shutdown_requested() => (), + } + Ok(()) + }); + let receiver = CloneableReceiver::new(sender.clone()); + let handle = JobHandle::new(status.clone(), state.clone(), receiver); + self.add_handle::(handle.clone()).await; + + let repo = self.repo.clone(); + + self.subsystem + .start("worker-job", move |subsystem| async move { + loop { + let start = Instant::now(); + let job_2 = job.clone(); + { + let mut state = state.write().await; + *state = JobState::Running; + } + if let Err(e) = job.load_state(repo.job()).await { + tracing::error!("failed to load the jobs state: {}", e); + } + let result = tokio::select! { + _ = subsystem.on_shutdown_requested() => { + job_2.save_state(repo.job()).await + } + r = job.run(repo.clone()) => { + match r { + Err(e) => Err(e), + Ok(v) => { + let _ = sender.send(Arc::new(RwLock::new(Some(Ok(v))))); + job.save_state(repo.job()).await + } + } + } + }; + if let Err(e) = result { + tracing::error!("job failed with error: {}", e); + let _ = sender.send(Arc::new(RwLock::new(Some(Err(e))))); + } + if let Some(interval) = interval { + { + let mut state = state.write().await; + *state = JobState::Scheduled; + } + let sleep_duration = interval - start.elapsed(); + tokio::select! { + _ = tokio::time::sleep(sleep_duration) => {}, + _ = subsystem.on_shutdown_requested() => {break} + } + } else { + let mut state = state.write().await; + *state = JobState::Finished; + break; + } + } + + Ok(()) + }); + + handle + } + + #[inline] + async fn add_handle(&self, handle: JobHandle) { + let mut status_map = self.job_handle_map.write().await; + status_map.insert::>(handle); + } + + #[inline] + pub async fn get_handle(&self) -> Option> { + let map = self.job_handle_map.read().await; + map.get::>().cloned() + } +} + +pub struct DispatcherKey; + +impl TypeMapKey for DispatcherKey { + type Value = JobDispatcher; +} + +unsafe impl Send for JobDispatcher {} +unsafe impl Sync for JobDispatcher {} diff --git a/mediarepo-daemon/mediarepo-worker/src/jobs/calculate_sizes.rs b/mediarepo-daemon/mediarepo-worker/src/jobs/calculate_sizes.rs new file mode 100644 index 0000000..d9ce393 --- /dev/null +++ b/mediarepo-daemon/mediarepo-worker/src/jobs/calculate_sizes.rs @@ -0,0 +1,91 @@ +use crate::jobs::Job; +use crate::status_utils::SimpleProgress; +use async_trait::async_trait; +use mediarepo_core::error::{RepoError, RepoResult}; +use mediarepo_core::mediarepo_api::types::repo::SizeType; +use mediarepo_core::settings::Settings; +use mediarepo_core::utils::get_folder_size; +use mediarepo_logic::dao::repo::Repo; +use std::path::PathBuf; +use std::sync::Arc; +use tokio::fs; +use tokio::sync::broadcast::{self, Sender}; +use tokio::sync::RwLock; + +pub struct CalculateSizesState { + pub progress: SimpleProgress, + pub sizes_channel: Sender<(SizeType, u64)>, +} + +#[derive(Clone)] +pub struct CalculateSizesJob { + repo_path: PathBuf, + settings: Settings, + state: Arc>, +} + +impl CalculateSizesJob { + pub fn new(repo_path: PathBuf, settings: Settings) -> Self { + let (tx, _) = broadcast::channel(4); + Self { + repo_path, + settings, + state: Arc::new(RwLock::new(CalculateSizesState { + sizes_channel: tx, + progress: SimpleProgress::new(4), + })), + } + } +} + +#[async_trait] +impl Job for CalculateSizesJob { + type JobStatus = CalculateSizesState; + type Result = (); + + fn status(&self) -> Arc> { + self.state.clone() + } + + #[tracing::instrument(level = "debug", skip_all)] + async fn run(&self, repo: Arc) -> RepoResult<()> { + let size_types = vec![ + SizeType::Total, + SizeType::FileFolder, + SizeType::ThumbFolder, + SizeType::DatabaseFile, + ]; + for size_type in size_types { + let size = calculate_size(&size_type, &repo, &self.repo_path, &self.settings).await?; + let mut state = self.state.write().await; + state + .sizes_channel + .send((size_type, size)) + .map_err(|_| RepoError::from("failed to broadcast new size"))?; + state.progress.tick(); + } + + Ok(()) + } +} + +async fn calculate_size( + size_type: &SizeType, + repo: &Repo, + repo_path: &PathBuf, + settings: &Settings, +) -> RepoResult { + let size = match &size_type { + SizeType::Total => get_folder_size(repo_path.clone()).await?, + SizeType::FileFolder => repo.get_main_store_size().await?, + SizeType::ThumbFolder => repo.get_thumb_store_size().await?, + SizeType::DatabaseFile => { + let db_path = settings.paths.db_file_path(repo_path); + + let database_metadata = fs::metadata(db_path).await?; + database_metadata.len() + } + }; + + Ok(size) +} diff --git a/mediarepo-daemon/mediarepo-worker/src/jobs/check_integrity.rs b/mediarepo-daemon/mediarepo-worker/src/jobs/check_integrity.rs new file mode 100644 index 0000000..8ef6d65 --- /dev/null +++ b/mediarepo-daemon/mediarepo-worker/src/jobs/check_integrity.rs @@ -0,0 +1,32 @@ +use crate::jobs::Job; +use crate::status_utils::SimpleProgress; +use async_trait::async_trait; +use mediarepo_core::error::RepoResult; +use mediarepo_logic::dao::repo::Repo; +use mediarepo_logic::dao::DaoProvider; +use std::sync::Arc; +use tokio::sync::RwLock; + +#[derive(Clone, Default)] +pub struct CheckIntegrityJob { + progress: Arc>, +} + +#[async_trait] +impl Job for CheckIntegrityJob { + type JobStatus = SimpleProgress; + type Result = (); + + fn status(&self) -> Arc> { + self.progress.clone() + } + + async fn run(&self, repo: Arc) -> RepoResult { + repo.job().check_integrity().await?; + { + let mut progress = self.progress.write().await; + progress.set_total(100); + } + Ok(()) + } +} diff --git a/mediarepo-daemon/mediarepo-worker/src/jobs/generate_missing_thumbnails.rs b/mediarepo-daemon/mediarepo-worker/src/jobs/generate_missing_thumbnails.rs new file mode 100644 index 0000000..9eee427 --- /dev/null +++ b/mediarepo-daemon/mediarepo-worker/src/jobs/generate_missing_thumbnails.rs @@ -0,0 +1,109 @@ +use crate::jobs::{deserialize_state, serialize_state, Job}; +use crate::status_utils::SimpleProgress; +use async_trait::async_trait; +use mediarepo_core::error::RepoResult; +use mediarepo_core::thumbnailer::ThumbnailSize; +use mediarepo_database::entities::job_state::JobType; +use mediarepo_logic::dao::job::JobDao; +use mediarepo_logic::dao::repo::Repo; +use mediarepo_logic::dao::DaoProvider; +use serde::{Deserialize, Serialize}; +use std::mem; +use std::sync::Arc; +use std::time::{Duration, SystemTime}; +use tokio::sync::RwLock; + +#[derive(Clone, Default)] +pub struct GenerateMissingThumbsJob { + state: Arc>, + inner_state: Arc>, +} + +#[async_trait] +impl Job for GenerateMissingThumbsJob { + type JobStatus = SimpleProgress; + type Result = (); + + fn status(&self) -> Arc> { + self.state.clone() + } + + async fn load_state(&self, job_dao: JobDao) -> RepoResult<()> { + if let Some(state) = job_dao.state_for_job_type(JobType::GenerateThumbs).await? { + let mut inner_state = self.inner_state.write().await; + let state = deserialize_state::(state)?; + let _ = mem::replace(&mut *inner_state, state); + } + + Ok(()) + } + + async fn run(&self, repo: Arc) -> RepoResult<()> { + if !self.needs_generation(&repo).await? { + return Ok(()); + } + let file_dao = repo.file(); + let all_files = file_dao.all().await?; + { + let mut progress = self.state.write().await; + progress.set_total(all_files.len() as u64); + } + + for file in all_files { + if file_dao.thumbnails(file.encoded_cd()).await?.is_empty() { + let _ = file_dao + .create_thumbnails(&file, vec![ThumbnailSize::Medium]) + .await; + } + { + let mut progress = self.state.write().await; + progress.tick(); + } + } + + self.refresh_state(&repo).await?; + + Ok(()) + } + + async fn save_state(&self, job_dao: JobDao) -> RepoResult<()> { + let state = self.inner_state.read().await; + let state = serialize_state(JobType::GenerateThumbs, &*state)?; + job_dao.upsert_state(state).await + } +} + +impl GenerateMissingThumbsJob { + async fn needs_generation(&self, repo: &Repo) -> RepoResult { + let repo_counts = repo.get_counts().await?; + let file_count = repo_counts.file_count as u64; + let state = self.inner_state.read().await; + + Ok(state.file_count != file_count + || state.last_run.elapsed().unwrap() > Duration::from_secs(60 * 60)) + } + + async fn refresh_state(&self, repo: &Repo) -> RepoResult<()> { + let repo_counts = repo.get_counts().await?; + let mut state = self.inner_state.write().await; + state.last_run = SystemTime::now(); + state.file_count = repo_counts.file_count as u64; + + Ok(()) + } +} + +#[derive(Serialize, Deserialize)] +struct GenerateThumbsState { + file_count: u64, + last_run: SystemTime, +} + +impl Default for GenerateThumbsState { + fn default() -> Self { + Self { + file_count: 0, + last_run: SystemTime::now(), + } + } +} diff --git a/mediarepo-daemon/mediarepo-worker/src/jobs/migrate_content_descriptors.rs b/mediarepo-daemon/mediarepo-worker/src/jobs/migrate_content_descriptors.rs new file mode 100644 index 0000000..e320371 --- /dev/null +++ b/mediarepo-daemon/mediarepo-worker/src/jobs/migrate_content_descriptors.rs @@ -0,0 +1,65 @@ +use crate::jobs::{deserialize_state, serialize_state, Job}; +use crate::status_utils::SimpleProgress; +use async_trait::async_trait; +use mediarepo_core::error::RepoResult; +use mediarepo_database::entities::job_state::JobType; +use mediarepo_logic::dao::job::JobDao; +use mediarepo_logic::dao::repo::Repo; +use mediarepo_logic::dao::DaoProvider; +use serde::{Deserialize, Serialize}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use tokio::sync::RwLock; + +#[derive(Clone, Default)] +pub struct MigrateCDsJob { + progress: Arc>, + migrated: Arc, +} + +#[async_trait] +impl Job for MigrateCDsJob { + type JobStatus = SimpleProgress; + type Result = (); + + fn status(&self) -> Arc> { + self.progress.clone() + } + + async fn load_state(&self, job_dao: JobDao) -> RepoResult<()> { + if let Some(state) = job_dao.state_for_job_type(JobType::MigrateCDs).await? { + let state = deserialize_state::(state)?; + self.migrated.store(state.migrated, Ordering::SeqCst); + } + + Ok(()) + } + + async fn run(&self, repo: Arc) -> RepoResult { + if self.migrated.load(Ordering::SeqCst) { + return Ok(()); + } + let job_dao = repo.job(); + + job_dao.migrate_content_descriptors().await?; + self.migrated.store(true, Ordering::Relaxed); + { + let mut progress = self.progress.write().await; + progress.set_total(100); + } + Ok(()) + } + + async fn save_state(&self, job_dao: JobDao) -> RepoResult<()> { + if self.migrated.load(Ordering::Relaxed) { + let state = serialize_state(JobType::MigrateCDs, &MigrationStatus { migrated: true })?; + job_dao.upsert_state(state).await?; + } + Ok(()) + } +} + +#[derive(Serialize, Deserialize)] +struct MigrationStatus { + pub migrated: bool, +} diff --git a/mediarepo-daemon/mediarepo-worker/src/jobs/mod.rs b/mediarepo-daemon/mediarepo-worker/src/jobs/mod.rs new file mode 100644 index 0000000..846a39e --- /dev/null +++ b/mediarepo-daemon/mediarepo-worker/src/jobs/mod.rs @@ -0,0 +1,71 @@ +mod calculate_sizes; +mod check_integrity; +mod generate_missing_thumbnails; +mod migrate_content_descriptors; +mod vacuum; + +pub use calculate_sizes::*; +pub use check_integrity::*; +pub use generate_missing_thumbnails::*; +pub use migrate_content_descriptors::*; +use std::marker::PhantomData; +use std::sync::Arc; +pub use vacuum::*; + +use crate::handle::JobHandle; +use async_trait::async_trait; +use mediarepo_core::bincode; +use mediarepo_core::error::{RepoError, RepoResult}; +use mediarepo_core::trait_bound_typemap::TypeMapKey; +use mediarepo_database::entities::job_state::JobType; +use mediarepo_logic::dao::job::JobDao; +use mediarepo_logic::dao::repo::Repo; +use mediarepo_logic::dto::{JobStateDto, UpsertJobStateDto}; +use serde::de::DeserializeOwned; +use serde::Serialize; +use tokio::sync::RwLock; + +type EmptyStatus = Arc>; + +#[async_trait] +pub trait Job: Clone + Send + Sync { + type JobStatus: Send + Sync; + type Result: Send + Sync; + + fn status(&self) -> Arc>; + + async fn load_state(&self, _job_dao: JobDao) -> RepoResult<()> { + Ok(()) + } + + async fn run(&self, repo: Arc) -> RepoResult; + + async fn save_state(&self, _job_dao: JobDao) -> RepoResult<()> { + Ok(()) + } +} + +pub struct JobTypeKey(PhantomData); + +impl TypeMapKey for JobTypeKey +where + T: Job, +{ + type Value = JobHandle; +} + +pub fn deserialize_state(dto: JobStateDto) -> RepoResult { + bincode::deserialize(dto.value()).map_err(RepoError::from) +} + +pub fn serialize_state( + job_type: JobType, + state: &T, +) -> RepoResult { + let dto = UpsertJobStateDto { + value: bincode::serialize(state)?, + job_type, + }; + + Ok(dto) +} diff --git a/mediarepo-daemon/mediarepo-worker/src/jobs/vacuum.rs b/mediarepo-daemon/mediarepo-worker/src/jobs/vacuum.rs new file mode 100644 index 0000000..60bc275 --- /dev/null +++ b/mediarepo-daemon/mediarepo-worker/src/jobs/vacuum.rs @@ -0,0 +1,27 @@ +use crate::jobs::{EmptyStatus, Job}; +use async_trait::async_trait; +use mediarepo_core::error::RepoResult; +use mediarepo_logic::dao::repo::Repo; +use mediarepo_logic::dao::DaoProvider; +use std::sync::Arc; +use tokio::sync::RwLock; + +#[derive(Default, Clone)] +pub struct VacuumJob; + +#[async_trait] +impl Job for VacuumJob { + type JobStatus = (); + type Result = (); + + fn status(&self) -> Arc> { + EmptyStatus::default() + } + + #[tracing::instrument(level = "debug", skip_all)] + async fn run(&self, repo: Arc) -> RepoResult<()> { + repo.job().vacuum().await?; + + Ok(()) + } +} diff --git a/mediarepo-daemon/mediarepo-worker/src/lib.rs b/mediarepo-daemon/mediarepo-worker/src/lib.rs new file mode 100644 index 0000000..1b696f5 --- /dev/null +++ b/mediarepo-daemon/mediarepo-worker/src/lib.rs @@ -0,0 +1,39 @@ +use crate::job_dispatcher::JobDispatcher; +use crate::jobs::{CheckIntegrityJob, MigrateCDsJob, VacuumJob}; +use mediarepo_core::error::RepoError; +use mediarepo_core::tokio_graceful_shutdown::Toplevel; +use mediarepo_logic::dao::repo::Repo; +use std::time::Duration; +use tokio::sync::oneshot::channel; + +pub mod handle; +pub mod job_dispatcher; +pub mod jobs; +pub mod status_utils; + +pub async fn start(top_level: Toplevel, repo: Repo) -> (Toplevel, JobDispatcher) { + let (tx, rx) = channel(); + + let top_level = top_level.start("mediarepo-worker", |subsystem| async move { + let dispatcher = JobDispatcher::new(subsystem, repo); + tx.send(dispatcher.clone()) + .map_err(|_| RepoError::from("failed to send dispatcher"))?; + dispatcher + .dispatch_periodically(VacuumJob::default(), Duration::from_secs(60 * 30)) + .await; + dispatcher + .dispatch_periodically( + CheckIntegrityJob::default(), + Duration::from_secs(60 * 60 * 24), + ) + .await; + dispatcher.dispatch(MigrateCDsJob::default()).await; + + Ok(()) + }); + let receiver = rx + .await + .expect("failed to create background job dispatcher"); + + (top_level, receiver) +} diff --git a/mediarepo-daemon/mediarepo-worker/src/status_utils.rs b/mediarepo-daemon/mediarepo-worker/src/status_utils.rs new file mode 100644 index 0000000..0045cc7 --- /dev/null +++ b/mediarepo-daemon/mediarepo-worker/src/status_utils.rs @@ -0,0 +1,39 @@ +pub struct SimpleProgress { + pub current: u64, + pub total: u64, +} + +impl Default for SimpleProgress { + fn default() -> Self { + Self { + total: 100, + current: 0, + } + } +} + +impl SimpleProgress { + pub fn new(total: u64) -> Self { + Self { total, current: 0 } + } + + /// Sets the total count + pub fn set_total(&mut self, total: u64) { + self.total = total; + } + + /// Increments the current progress by 1 + pub fn tick(&mut self) { + self.current += 1; + } + + /// Sets the current progress to a defined value + pub fn set_current(&mut self, current: u64) { + self.current = current; + } + + /// Returns the total progress in percent + pub fn percent(&self) -> f64 { + (self.current as f64) / (self.total as f64) + } +} diff --git a/mediarepo-daemon/src/main.rs b/mediarepo-daemon/src/main.rs index f633bb4..d28a49c 100644 --- a/mediarepo-daemon/src/main.rs +++ b/mediarepo-daemon/src/main.rs @@ -1,5 +1,7 @@ use std::env; +use std::iter::FromIterator; use std::path::PathBuf; +use std::sync::Arc; use std::time::Duration; use structopt::StructOpt; @@ -10,8 +12,12 @@ use mediarepo_core::error::RepoResult; use mediarepo_core::fs::drop_file::DropFile; use mediarepo_core::settings::{PathSettings, Settings}; use mediarepo_core::tokio_graceful_shutdown::{SubsystemHandle, Toplevel}; +use mediarepo_core::trait_bound_typemap::{CloneSendSyncTypeMap, SendSyncTypeMap, TypeMap}; +use mediarepo_core::type_keys::{RepoPathKey, SettingsKey}; use mediarepo_logic::dao::repo::Repo; +use mediarepo_logic::type_keys::RepoKey; use mediarepo_socket::start_tcp_server; +use mediarepo_worker::job_dispatcher::DispatcherKey; use crate::utils::{create_paths_for_repo, get_repo, load_settings}; @@ -99,18 +105,28 @@ async fn init_repo(opt: &Opt, paths: &PathSettings) -> RepoResult { /// Starts the server async fn start_server(opt: Opt, settings: Settings) -> RepoResult<()> { let repo = init_repo(&opt, &settings.paths).await?; - let mut top_level = Toplevel::new(); + let (mut top_level, dispatcher) = mediarepo_worker::start(Toplevel::new(), repo.clone()).await; + + let mut shared_data = CloneSendSyncTypeMap::new(); + shared_data.insert::(Arc::new(repo)); + shared_data.insert::(settings.clone()); + shared_data.insert::(opt.repo.clone()); + shared_data.insert::(dispatcher); #[cfg(unix)] { if settings.server.unix_socket.enabled { - let settings = settings.clone(); let repo_path = opt.repo.clone(); - let repo = repo.clone(); + let shared_data = shared_data.clone(); top_level = top_level.start("mediarepo-unix-socket", |subsystem| { Box::pin(async move { - start_and_await_unix_socket(subsystem, repo_path, settings, repo).await?; + start_and_await_unix_socket( + subsystem, + repo_path, + SendSyncTypeMap::from_iter(shared_data), + ) + .await?; Ok(()) }) }) @@ -120,7 +136,13 @@ async fn start_server(opt: Opt, settings: Settings) -> RepoResult<()> { if settings.server.tcp.enabled { top_level = top_level.start("mediarepo-tcp", move |subsystem| { Box::pin(async move { - start_and_await_tcp_server(subsystem, opt.repo, settings, repo).await?; + start_and_await_tcp_server( + subsystem, + opt.repo, + settings, + SendSyncTypeMap::from_iter(shared_data), + ) + .await?; Ok(()) }) @@ -147,9 +169,9 @@ async fn start_and_await_tcp_server( subsystem: SubsystemHandle, repo_path: PathBuf, settings: Settings, - repo: Repo, + shared_data: SendSyncTypeMap, ) -> RepoResult<()> { - let (address, handle) = start_tcp_server(subsystem.clone(), repo_path.clone(), settings, repo)?; + let (address, handle) = start_tcp_server(subsystem.clone(), settings, shared_data)?; let (mut file, _guard) = DropFile::new(repo_path.join("repo.tcp")).await?; file.write_all(&address.into_bytes()).await?; @@ -172,17 +194,10 @@ async fn start_and_await_tcp_server( async fn start_and_await_unix_socket( subsystem: SubsystemHandle, repo_path: PathBuf, - settings: Settings, - repo: Repo, + shared_data: SendSyncTypeMap, ) -> RepoResult<()> { let socket_path = repo_path.join("repo.sock"); - let handle = mediarepo_socket::create_unix_socket( - subsystem.clone(), - socket_path, - repo_path.clone(), - settings, - repo, - )?; + let handle = mediarepo_socket::create_unix_socket(subsystem.clone(), socket_path, shared_data)?; let _guard = DropFile::from_path(repo_path.join("repo.sock")); tokio::select! {