Merge pull request #21 from Trivernis/feature/jobs

Feature/jobs
pull/22/head
Julius Riegel 3 years ago committed by GitHub
commit 1047320c7c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -7,20 +7,20 @@ license = "gpl-3"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tracing = "0.1.30"
tracing = "0.1.32"
thiserror = "1.0.30"
async-trait = { version = "0.1.52", optional = true }
parking_lot = { version = "0.12.0", optional = true }
serde_json = { version = "1.0.78", optional = true }
serde_json = { version = "1.0.79", optional = true }
directories = { version = "4.0.1", optional = true }
mime_guess = { version = "2.0.3", optional = true }
mime_guess = { version = "2.0.4", optional = true }
serde_piecewise_default = "0.2.0"
futures = { version = "0.3.21", optional = true }
url = { version = "2.2.2", optional = true }
pathsearch = { version = "0.2.0", optional = true }
[dependencies.bromine]
version = "0.18.1"
version = "0.19.0"
optional = true
features = ["serialize_bincode"]
@ -34,12 +34,12 @@ features = ["serde"]
[dependencies.tauri]
version = "1.0.0-rc.4"
optional=true
optional = true
default-features = false
features = []
[dependencies.tokio]
version = "1.16.1"
version = "1.17.0"
optional = true
features = ["sync", "fs", "net", "io-util", "io-std", "time", "rt", "process"]

@ -63,9 +63,9 @@ dependencies = [
[[package]]
name = "anyhow"
version = "1.0.55"
version = "1.0.56"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "159bb86af3a200e19a068f4224eae4c8bb2d0fa054c7e5d1cacd5cef95e684cd"
checksum = "4361135be9122e0870de935d7c439aef945b9f9ddd4199a553b5270b49c82a27"
[[package]]
name = "arrayref"
@ -81,9 +81,9 @@ checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6"
[[package]]
name = "async-recursion"
version = "0.3.2"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7d78656ba01f1b93024b7c3a0467f1608e4be67d725749fdcd7d2c7678fd7a2"
checksum = "2cda8f4bcc10624c4e85bc66b3f452cca98cfa5ca002dc83a16aad2367641bea"
dependencies = [
"proc-macro2 1.0.36",
"quote 1.0.15",
@ -270,9 +270,9 @@ dependencies = [
[[package]]
name = "bromine"
version = "0.18.1"
version = "0.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5dd7887995490657bf3ec578f39e747ef7b5355a8dc6c99b3d5be59ca70dc4d5"
checksum = "a05cd0cd5646e705df88816dcc36eaf4e21b940cea66f1e027970cd58e3dc897"
dependencies = [
"async-trait",
"bincode",
@ -284,7 +284,7 @@ dependencies = [
"thiserror",
"tokio",
"tracing",
"typemap_rev",
"trait-bound-typemap",
]
[[package]]
@ -301,9 +301,9 @@ checksum = "e3b5ca7a04898ad4bcd41c90c5285445ff5b791899bb1b0abdd2a2aa791211d7"
[[package]]
name = "bytemuck"
version = "1.7.3"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "439989e6b8c38d1b6570a384ef1e49c8848128f5a97f3914baef02920842712f"
checksum = "0e851ca7c24871e7336801608a4797d7376545b6928a10d32d75685687141ead"
[[package]]
name = "byteorder"
@ -709,9 +709,9 @@ dependencies = [
[[package]]
name = "flume"
version = "0.10.11"
version = "0.10.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b279436a715a9de95dcd26b151db590a71961cc06e54918b24fe0dd5b7d3fc4"
checksum = "843c03199d0c0ca54bc1ea90ac0d507274c28abcc4f691ae8b4eaa375087c76a"
dependencies = [
"futures-core",
"futures-sink",
@ -885,7 +885,7 @@ dependencies = [
"cfg-if 1.0.0",
"js-sys",
"libc",
"wasi",
"wasi 0.10.0+wasi-snapshot-preview1",
"wasm-bindgen",
]
@ -907,9 +907,9 @@ checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574"
[[package]]
name = "h2"
version = "0.3.11"
version = "0.3.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9f1f717ddc7b2ba36df7e871fd88db79326551d3d6f1fc406fbfd28b582ff8e"
checksum = "62eeb471aa3e3c9197aa4bfeabfe02982f6dc96f750486c0bb0009ac58b26d2b"
dependencies = [
"bytes",
"fnv",
@ -1300,6 +1300,7 @@ name = "mediarepo-core"
version = "0.1.0"
dependencies = [
"base64",
"bincode",
"config",
"data-encoding",
"futures 0.3.21",
@ -1318,7 +1319,7 @@ dependencies = [
"toml",
"tracing",
"tracing-subscriber",
"typemap_rev",
"trait-bound-typemap",
]
[[package]]
@ -1331,6 +1332,7 @@ dependencies = [
"mediarepo-core",
"mediarepo-logic",
"mediarepo-socket",
"mediarepo-worker",
"num-integer",
"opentelemetry",
"opentelemetry-jaeger",
@ -1371,7 +1373,6 @@ dependencies = [
"serde",
"tokio",
"tracing",
"typemap_rev",
]
[[package]]
@ -1383,6 +1384,7 @@ dependencies = [
"mediarepo-core",
"mediarepo-database",
"mediarepo-logic",
"mediarepo-worker",
"port_check",
"rayon",
"serde",
@ -1391,6 +1393,20 @@ dependencies = [
"tracing-futures",
]
[[package]]
name = "mediarepo-worker"
version = "0.1.0"
dependencies = [
"async-trait",
"chrono",
"mediarepo-core",
"mediarepo-database",
"mediarepo-logic",
"serde",
"tokio",
"tracing",
]
[[package]]
name = "memchr"
version = "2.4.1"
@ -1449,14 +1465,15 @@ dependencies = [
[[package]]
name = "mio"
version = "0.8.0"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba272f85fa0b41fc91872be579b3bbe0f56b792aa361a380eb669469f68dafb2"
checksum = "7ba42135c6a5917b9db9cd7b293e5409e1c6b041e6f9825e92e55a894c63b6f8"
dependencies = [
"libc",
"log",
"miow",
"ntapi",
"wasi 0.11.0+wasi-snapshot-preview1",
"winapi",
]
@ -1469,6 +1486,12 @@ dependencies = [
"winapi",
]
[[package]]
name = "multi-trait-object"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a54c9ed2b86c7927b63e7d51f8d7ed4e1f8513c8672828ca1a850ff9d32ab1c"
[[package]]
name = "multibase"
version = "0.9.1"
@ -1519,9 +1542,9 @@ checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
[[package]]
name = "nanorand"
version = "0.6.1"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "729eb334247daa1803e0a094d0a5c55711b85571179f5ec6e53eccfdf7008958"
checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3"
dependencies = [
"getrandom",
]
@ -1628,18 +1651,18 @@ dependencies = [
[[package]]
name = "num_enum"
version = "0.5.6"
version = "0.5.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "720d3ea1055e4e4574c0c0b0f8c3fd4f24c4cdaf465948206dea090b57b526ad"
checksum = "cf5395665662ef45796a4ff5486c5d41d29e0c09640af4c5f17fd94ee2c119c9"
dependencies = [
"num_enum_derive",
]
[[package]]
name = "num_enum_derive"
version = "0.5.6"
version = "0.5.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d992b768490d7fe0d8586d9b5745f6c49f557da6d81dc982b1d167ad4edbb21"
checksum = "3b0498641e53dd6ac1a4f22547548caa6864cc4933784319cd1775271c5a46ce"
dependencies = [
"proc-macro-crate",
"proc-macro2 1.0.36",
@ -1649,18 +1672,18 @@ dependencies = [
[[package]]
name = "num_threads"
version = "0.1.3"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97ba99ba6393e2c3734791401b66902d981cb03bf190af674ca69949b6d5fb15"
checksum = "c539a50b93a303167eded6e8dff5220cd39447409fb659f4cd24b1f72fe4f133"
dependencies = [
"libc",
]
[[package]]
name = "once_cell"
version = "1.9.0"
version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da32515d9f6e6e489d7bc9d84c71b060db7247dc035bbe44eac88cf87486d8d5"
checksum = "87f3e037eac156d1775da914196f0f37741a274155e34a0b7e427c35d2a2ecb9"
[[package]]
name = "opaque-debug"
@ -1931,9 +1954,9 @@ checksum = "58893f751c9b0412871a09abd62ecd2a00298c6c83befa223ef98c52aef40cbe"
[[package]]
name = "png"
version = "0.17.3"
version = "0.17.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e8f1882177b17c98ec33a51f5910ecbf4db92ca0def706781a1f8d0c661f393"
checksum = "dc38c0ad57efb786dd57b9864e5b18bae478c00c824dc55a38bbc9da95dde3ba"
dependencies = [
"bitflags",
"crc32fast",
@ -2133,18 +2156,18 @@ dependencies = [
[[package]]
name = "redox_syscall"
version = "0.2.10"
version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8383f39639269cde97d255a32bdb68c047337295414940c68bdd30c2e13203ff"
checksum = "8380fe0152551244f0747b1bf41737e0f8a74f97a14ccefd1148187271634f3c"
dependencies = [
"bitflags",
]
[[package]]
name = "regex"
version = "1.5.4"
version = "1.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d07a8629359eb56f1e2fb1652bb04212c072a87ba68546a04065d525673ac461"
checksum = "1a11647b6b25ff05a515cb92c365cec08801e83423a235b51e231e1808747286"
dependencies = [
"aho-corasick",
"memchr",
@ -2793,7 +2816,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6db9e6914ab8b1ae1c260a4ae7a49b6c5611b40328a735b21862567685e73255"
dependencies = [
"libc",
"wasi",
"wasi 0.10.0+wasi-snapshot-preview1",
"winapi",
]
@ -2855,16 +2878,16 @@ dependencies = [
[[package]]
name = "tokio-graceful-shutdown"
version = "0.4.3"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d08ebea7dc6b22273290d8ece2ca448f979f836e38ba629b650595c64204b4f2"
checksum = "4f21c36e43c82d5f32302aff8ac9efb79e10db9538b0940ef69cce38a01614ae"
dependencies = [
"anyhow",
"async-recursion",
"futures 0.3.21",
"log",
"tokio",
"tokio-util 0.6.9",
"tokio-util 0.7.0",
]
[[package]]
@ -3023,9 +3046,9 @@ checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6"
[[package]]
name = "tracing"
version = "0.1.31"
version = "0.1.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f6c650a8ef0cd2dd93736f033d21cbd1224c5a967aa0c258d00fcf7dafef9b9f"
checksum = "4a1bdf54a7c28a2bbf701e1d2233f6c77f473486b94bee4f9678da5a148dca7f"
dependencies = [
"cfg-if 1.0.0",
"log",
@ -3036,9 +3059,9 @@ dependencies = [
[[package]]
name = "tracing-appender"
version = "0.2.0"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94571df2eae3ed4353815ea5a90974a594a1792d8782ff2cbcc9392d1101f366"
checksum = "9ab026b18a46ac429e5c98bec10ca06424a97b3ad7b3949d9b4a102fff6623c4"
dependencies = [
"crossbeam-channel",
"time 0.3.7",
@ -3047,9 +3070,9 @@ dependencies = [
[[package]]
name = "tracing-attributes"
version = "0.1.19"
version = "0.1.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8276d9a4a3a558d7b7ad5303ad50b53d58264641b82914b7ada36bd762e7a716"
checksum = "2e65ce065b4b5c53e73bb28912318cb8c9e9ad3921f1d669eb0e68b4c8143a2b"
dependencies = [
"proc-macro2 1.0.36",
"quote 1.0.15",
@ -3058,9 +3081,9 @@ dependencies = [
[[package]]
name = "tracing-core"
version = "0.1.22"
version = "0.1.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03cfcb51380632a72d3111cb8d3447a8d908e577d31beeac006f836383d29a23"
checksum = "aa31669fa42c09c34d94d8165dd2012e8ff3c66aca50f3bb226b68f216f2706c"
dependencies = [
"lazy_static",
"valuable",
@ -3144,16 +3167,19 @@ dependencies = [
]
[[package]]
name = "try-lock"
version = "0.2.3"
name = "trait-bound-typemap"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642"
checksum = "3631df5ba73c0e41b1aa337df3bcca7f15219f042f8fec1100857bc1eb60c767"
dependencies = [
"multi-trait-object",
]
[[package]]
name = "typemap_rev"
version = "0.1.5"
name = "try-lock"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed5b74f0a24b5454580a79abb6994393b09adf0ab8070f15827cb666255de155"
checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642"
[[package]]
name = "typenum"
@ -3289,6 +3315,12 @@ version = "0.10.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f"
[[package]]
name = "wasi"
version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "wasm-bindgen"
version = "0.2.79"

@ -1,6 +1,6 @@
[workspace]
members = ["mediarepo-core", "mediarepo-database", "mediarepo-logic", "mediarepo-socket", "."]
default-members = ["mediarepo-core", "mediarepo-database", "mediarepo-logic", "mediarepo-socket", "."]
members = ["mediarepo-core", "mediarepo-database", "mediarepo-logic", "mediarepo-socket", "mediarepo-worker", "."]
default-members = ["mediarepo-core", "mediarepo-database", "mediarepo-logic", "mediarepo-socket", "mediarepo-worker", "."]
[package]
name = "mediarepo-daemon"
@ -16,12 +16,12 @@ name = "mediarepo-daemon"
path = "src/main.rs"
[dependencies]
tracing = "0.1.31"
tracing = "0.1.32"
toml = "0.5.8"
structopt = "0.3.26"
glob = "0.3.0"
tracing-flame = "0.2.0"
tracing-appender = "0.2.0"
tracing-appender = "0.2.1"
tracing-log = "0.1.2"
rolling-file = "0.1.0"
num-integer = "0.1.44"
@ -40,6 +40,9 @@ path = "mediarepo-logic"
[dependencies.mediarepo-socket]
path = "./mediarepo-socket"
[dependencies.mediarepo-worker]
path = "./mediarepo-worker"
[dependencies.tokio]
version = "1.17.0"
features = ["macros", "rt-multi-thread", "io-std", "io-util"]

@ -13,15 +13,16 @@ multibase = "0.9.1"
base64 = "0.13.0"
toml = "0.5.8"
serde = "1.0.136"
typemap_rev = "0.1.5"
futures = "0.3.21"
itertools = "0.10.3"
glob = "0.3.0"
tracing = "0.1.31"
tracing = "0.1.32"
data-encoding = "2.3.2"
tokio-graceful-shutdown = "0.4.3"
tokio-graceful-shutdown = "0.4.4"
thumbnailer = "0.4.0"
bincode = "1.3.3"
tracing-subscriber = "0.3.9"
trait-bound-typemap = "0.3.3"
[dependencies.sea-orm]
version = "0.6.0"

@ -43,6 +43,9 @@ pub enum RepoError {
#[error("the database file is corrupted {0}")]
Corrupted(String),
#[error("bincode de-/serialization failed {0}")]
Bincode(#[from] bincode::Error),
}
#[derive(Error, Debug)]

@ -1,9 +1,11 @@
pub use bincode;
pub use futures;
pub use itertools;
pub use mediarepo_api;
pub use mediarepo_api::bromine;
pub use thumbnailer;
pub use tokio_graceful_shutdown;
pub use trait_bound_typemap;
pub mod content_descriptor;
pub mod context;

@ -3,7 +3,7 @@ use std::path::PathBuf;
use mediarepo_api::types::repo::SizeType;
use tokio_graceful_shutdown::SubsystemHandle;
use typemap_rev::TypeMapKey;
use trait_bound_typemap::TypeMapKey;
use crate::settings::Settings;

@ -8,7 +8,7 @@ workspace = ".."
[dependencies]
chrono = "0.4.19"
tracing = "0.1.31"
tracing = "0.1.32"
[dependencies.mediarepo-core]
path = "../mediarepo-core"

@ -0,0 +1,5 @@
CREATE TABLE job_states (
job_type INTEGER NOT NULL,
value BLOB,
PRIMARY KEY (job_type)
);

@ -0,0 +1,45 @@
use sea_orm::prelude::*;
use sea_orm::TryFromU64;
#[derive(Clone, Debug, PartialEq, DeriveEntityModel)]
#[sea_orm(table_name = "job_states")]
pub struct Model {
#[sea_orm(primary_key)]
pub job_type: JobType,
pub value: Vec<u8>,
}
#[derive(Clone, Copy, Debug, PartialEq, EnumIter, DeriveActiveEnum)]
#[sea_orm(rs_type = "u32", db_type = "Integer")]
pub enum JobType {
#[sea_orm(num_value = 10)]
MigrateCDs,
#[sea_orm(num_value = 20)]
CalculateSizes,
#[sea_orm(num_value = 30)]
GenerateThumbs,
#[sea_orm(num_value = 40)]
CheckIntegrity,
#[sea_orm(num_value = 50)]
Vacuum,
}
impl TryFromU64 for JobType {
fn try_from_u64(n: u64) -> Result<Self, DbErr> {
let value = match n {
10 => Self::MigrateCDs,
20 => Self::CalculateSizes,
30 => Self::GenerateThumbs,
40 => Self::CheckIntegrity,
50 => Self::Vacuum,
_ => return Err(DbErr::Custom(String::from("Invalid job type"))),
};
Ok(value)
}
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}
impl ActiveModelBehavior for ActiveModel {}

@ -3,6 +3,7 @@ pub mod content_descriptor_source;
pub mod content_descriptor_tag;
pub mod file;
pub mod file_metadata;
pub mod job_state;
pub mod namespace;
pub mod sort_key;
pub mod sorting_preset;

@ -8,11 +8,10 @@ workspace = ".."
[dependencies]
chrono = "0.4.19"
typemap_rev = "0.1.5"
serde = "1.0.136"
mime_guess = "2.0.4"
mime = "0.3.16"
tracing = "0.1.31"
tracing = "0.1.32"
async-trait = "0.1.52"
[dependencies.mediarepo-core]

@ -3,5 +3,6 @@ use crate::dao_provider;
pub mod generate_missing_thumbnails;
pub mod migrate_content_descriptors;
pub mod sqlite_operations;
pub mod state;
dao_provider!(JobDao);

@ -0,0 +1,58 @@
use crate::dao::job::JobDao;
use crate::dto::{JobStateDto, UpsertJobStateDto};
use mediarepo_core::error::RepoResult;
use mediarepo_database::entities::job_state;
use mediarepo_database::entities::job_state::JobType;
use sea_orm::prelude::*;
use sea_orm::ActiveValue::Set;
use sea_orm::{Condition, TransactionTrait};
impl JobDao {
/// Returns all job states for a given job id
pub async fn state_for_job_type(&self, job_type: JobType) -> RepoResult<Option<JobStateDto>> {
let state = job_state::Entity::find()
.filter(job_state::Column::JobType.eq(job_type))
.one(&self.ctx.db)
.await?
.map(JobStateDto::new);
Ok(state)
}
pub async fn upsert_state(&self, state: UpsertJobStateDto) -> RepoResult<()> {
self.upsert_multiple_states(vec![state]).await
}
pub async fn upsert_multiple_states(&self, states: Vec<UpsertJobStateDto>) -> RepoResult<()> {
let trx = self.ctx.db.begin().await?;
job_state::Entity::delete_many()
.filter(build_state_filters(&states))
.exec(&trx)
.await?;
job_state::Entity::insert_many(build_active_state_models(states))
.exec(&trx)
.await?;
trx.commit().await?;
Ok(())
}
}
fn build_state_filters(states: &Vec<UpsertJobStateDto>) -> Condition {
states
.iter()
.map(|s| Condition::all().add(job_state::Column::JobType.eq(s.job_type)))
.fold(Condition::any(), |acc, cond| acc.add(cond))
}
fn build_active_state_models(states: Vec<UpsertJobStateDto>) -> Vec<job_state::ActiveModel> {
states
.into_iter()
.map(|s| job_state::ActiveModel {
job_type: Set(s.job_type),
value: Set(s.value),
})
.collect()
}

@ -0,0 +1,31 @@
use mediarepo_database::entities::job_state;
use mediarepo_database::entities::job_state::JobType;
#[derive(Clone, Debug)]
pub struct JobStateDto {
model: job_state::Model,
}
impl JobStateDto {
pub(crate) fn new(model: job_state::Model) -> Self {
Self { model }
}
pub fn job_type(&self) -> JobType {
self.model.job_type
}
pub fn value(&self) -> &[u8] {
&self.model.value
}
pub fn into_value(self) -> Vec<u8> {
self.model.value
}
}
#[derive(Clone, Debug)]
pub struct UpsertJobStateDto {
pub job_type: JobType,
pub value: Vec<u8>,
}

@ -1,5 +1,6 @@
pub use file::*;
pub use file_metadata::*;
pub use job_state::*;
pub use namespace::*;
pub use sorting_preset::*;
pub use tag::*;
@ -7,6 +8,7 @@ pub use thumbnail::*;
mod file;
mod file_metadata;
mod job_state;
mod namespace;
mod sorting_preset;
mod tag;

@ -1,7 +1,6 @@
use mediarepo_core::trait_bound_typemap::TypeMapKey;
use std::sync::Arc;
use typemap_rev::TypeMapKey;
use crate::dao::repo::Repo;
pub struct RepoKey;

@ -8,7 +8,7 @@ workspace = ".."
[dependencies]
serde = "1.0.136"
tracing = "0.1.31"
tracing = "0.1.32"
compare = "0.1.0"
port_check = "0.1.5"
rayon = "1.5.1"
@ -22,6 +22,9 @@ path = "../mediarepo-database"
[dependencies.mediarepo-logic]
path = "../mediarepo-logic"
[dependencies.mediarepo-worker]
path = "../mediarepo-worker"
[dependencies.tokio]
version = "1.17.0"
features = ["net"]

@ -1,6 +1,4 @@
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::net::TcpListener;
use tokio::task::JoinHandle;
@ -10,20 +8,18 @@ use mediarepo_core::error::{RepoError, RepoResult};
use mediarepo_core::mediarepo_api::types::misc::InfoResponse;
use mediarepo_core::settings::{PortSetting, Settings};
use mediarepo_core::tokio_graceful_shutdown::SubsystemHandle;
use mediarepo_core::type_keys::{RepoPathKey, SettingsKey, SizeMetadataKey, SubsystemKey};
use mediarepo_logic::dao::repo::Repo;
use mediarepo_logic::type_keys::RepoKey;
use mediarepo_core::trait_bound_typemap::{SendSyncTypeMap, TypeMap};
use mediarepo_core::type_keys::{SizeMetadataKey, SubsystemKey};
mod from_model;
mod namespaces;
mod utils;
#[tracing::instrument(skip(subsystem, settings, repo))]
#[tracing::instrument(skip_all)]
pub fn start_tcp_server(
subsystem: SubsystemHandle,
repo_path: PathBuf,
settings: Settings,
repo: Repo,
shared_data: SendSyncTypeMap,
) -> RepoResult<(String, JoinHandle<()>)> {
let port = match &settings.server.tcp.port {
PortSetting::Fixed(p) => {
@ -45,9 +41,7 @@ pub fn start_tcp_server(
.spawn(async move {
get_builder::<TcpListener>(address)
.insert::<SubsystemKey>(subsystem)
.insert::<RepoKey>(Arc::new(repo))
.insert::<SettingsKey>(settings)
.insert::<RepoPathKey>(repo_path)
.insert_all(shared_data)
.insert::<SizeMetadataKey>(Default::default())
.build_server()
.await
@ -58,13 +52,11 @@ pub fn start_tcp_server(
}
#[cfg(unix)]
#[tracing::instrument(skip(subsystem, settings, repo))]
#[tracing::instrument(skip_all)]
pub fn create_unix_socket(
subsystem: SubsystemHandle,
path: std::path::PathBuf,
repo_path: PathBuf,
settings: Settings,
repo: Repo,
shared_data: SendSyncTypeMap,
) -> RepoResult<JoinHandle<()>> {
use std::fs;
use tokio::net::UnixListener;
@ -77,9 +69,7 @@ pub fn create_unix_socket(
.spawn(async move {
get_builder::<UnixListener>(path)
.insert::<SubsystemKey>(subsystem)
.insert::<RepoKey>(Arc::new(repo))
.insert::<SettingsKey>(settings)
.insert::<RepoPathKey>(repo_path)
.insert_all(shared_data)
.insert::<SizeMetadataKey>(Default::default())
.build_server()
.await

@ -1,11 +1,15 @@
use crate::TypeMap;
use mediarepo_core::bromine::prelude::*;
use mediarepo_core::error::RepoResult;
use mediarepo_core::mediarepo_api::types::jobs::{JobType, RunJobRequest};
use mediarepo_core::mediarepo_api::types::repo::SizeType;
use mediarepo_core::type_keys::SizeMetadataKey;
use mediarepo_logic::dao::DaoProvider;
use mediarepo_core::type_keys::{RepoPathKey, SettingsKey, SizeMetadataKey};
use mediarepo_worker::handle::JobState;
use mediarepo_worker::job_dispatcher::JobDispatcher;
use mediarepo_worker::jobs::{
CalculateSizesJob, CheckIntegrityJob, GenerateMissingThumbsJob, Job, MigrateCDsJob, VacuumJob,
};
use crate::utils::{calculate_size, get_repo_from_context};
use crate::utils::get_job_dispatcher_from_context;
pub struct JobsNamespace;
@ -25,7 +29,7 @@ impl JobsNamespace {
#[tracing::instrument(skip_all)]
pub async fn run_job(ctx: &Context, event: Event) -> IPCResult<Response> {
let run_request = event.payload::<RunJobRequest>()?;
let job_dao = get_repo_from_context(ctx).await.job();
let dispatcher = get_job_dispatcher_from_context(ctx).await;
if !run_request.sync {
// early response to indicate that the job will be run
@ -33,26 +37,69 @@ impl JobsNamespace {
}
match run_request.job_type {
JobType::MigrateContentDescriptors => job_dao.migrate_content_descriptors().await?,
JobType::MigrateContentDescriptors => {
dispatch_job(&dispatcher, MigrateCDsJob::default(), run_request.sync).await?
}
JobType::CalculateSizes => calculate_all_sizes(ctx).await?,
JobType::CheckIntegrity => job_dao.check_integrity().await?,
JobType::Vacuum => job_dao.vacuum().await?,
JobType::GenerateThumbnails => job_dao.generate_missing_thumbnails().await?,
JobType::CheckIntegrity => {
dispatch_job(&dispatcher, CheckIntegrityJob::default(), run_request.sync).await?
}
JobType::Vacuum => {
dispatch_job(&dispatcher, VacuumJob::default(), run_request.sync).await?
}
JobType::GenerateThumbnails => {
dispatch_job(
&dispatcher,
GenerateMissingThumbsJob::default(),
run_request.sync,
)
.await?
}
}
Ok(Response::empty())
}
}
async fn dispatch_job<J: 'static + Job>(
dispatcher: &JobDispatcher,
job: J,
sync: bool,
) -> RepoResult<()> {
let mut handle = if let Some(handle) = dispatcher.get_handle::<J>().await {
if handle.state().await == JobState::Running {
handle
} else {
dispatcher.dispatch(job).await
}
} else {
dispatcher.dispatch(job).await
};
if sync {
if let Some(result) = handle.take_result().await {
result?;
}
}
Ok(())
}
async fn calculate_all_sizes(ctx: &Context) -> RepoResult<()> {
let size_types = vec![
SizeType::Total,
SizeType::FileFolder,
SizeType::ThumbFolder,
SizeType::DatabaseFile,
];
for size_type in size_types {
let size = calculate_size(&size_type, ctx).await?;
let (repo_path, settings) = {
let data = ctx.data.read().await;
(
data.get::<RepoPathKey>().unwrap().clone(),
data.get::<SettingsKey>().unwrap().clone(),
)
};
let job = CalculateSizesJob::new(repo_path, settings);
let dispatcher = get_job_dispatcher_from_context(ctx).await;
let handle = dispatcher.dispatch(job).await;
let mut rx = {
let status = handle.status().read().await;
status.sizes_channel.subscribe()
};
while let Ok((size_type, size)) = rx.recv().await {
let mut data = ctx.data.write().await;
let size_map = data.get_mut::<SizeMetadataKey>().unwrap();
size_map.insert(size_type, size);

@ -2,13 +2,14 @@ use std::path::PathBuf;
use tokio::fs;
use crate::TypeMap;
use mediarepo_core::bromine::prelude::*;
use mediarepo_core::mediarepo_api::types::repo::{
FrontendState, RepositoryMetadata, SizeMetadata, SizeType,
};
use mediarepo_core::type_keys::{RepoPathKey, SettingsKey, SizeMetadataKey};
use crate::utils::{calculate_size, get_repo_from_context};
use crate::utils::get_repo_from_context;
pub struct RepoNamespace;
@ -56,7 +57,7 @@ impl RepoNamespace {
let size = if let Some(size) = size_cache.get(&size_type) {
*size
} else {
calculate_size(&size_type, ctx).await?
0
};
ctx.response(SizeMetadata { size, size_type })

@ -1,18 +1,15 @@
use std::sync::Arc;
use tokio::fs;
use crate::TypeMap;
use mediarepo_core::bromine::ipc::context::Context;
use mediarepo_core::content_descriptor::decode_content_descriptor;
use mediarepo_core::error::{RepoError, RepoResult};
use mediarepo_core::mediarepo_api::types::identifier::FileIdentifier;
use mediarepo_core::mediarepo_api::types::repo::SizeType;
use mediarepo_core::type_keys::{RepoPathKey, SettingsKey};
use mediarepo_core::utils::get_folder_size;
use mediarepo_logic::dao::DaoProvider;
use mediarepo_logic::dao::repo::Repo;
use mediarepo_logic::dao::DaoProvider;
use mediarepo_logic::dto::FileDto;
use mediarepo_logic::type_keys::RepoKey;
use mediarepo_worker::job_dispatcher::{DispatcherKey, JobDispatcher};
pub async fn get_repo_from_context(ctx: &Context) -> Arc<Repo> {
let data = ctx.data.read().await;
@ -20,6 +17,11 @@ pub async fn get_repo_from_context(ctx: &Context) -> Arc<Repo> {
Arc::clone(repo)
}
pub async fn get_job_dispatcher_from_context(ctx: &Context) -> JobDispatcher {
let data = ctx.data.read().await;
data.get::<DispatcherKey>().unwrap().clone()
}
pub async fn file_by_identifier(identifier: FileIdentifier, repo: &Repo) -> RepoResult<FileDto> {
let file = match identifier {
FileIdentifier::ID(id) => repo.file().by_id(id).await,
@ -41,27 +43,3 @@ pub async fn cd_by_identifier(identifier: FileIdentifier, repo: &Repo) -> RepoRe
FileIdentifier::CD(cd) => decode_content_descriptor(cd),
}
}
pub async fn calculate_size(size_type: &SizeType, ctx: &Context) -> RepoResult<u64> {
let repo = get_repo_from_context(ctx).await;
let (repo_path, settings) = {
let data = ctx.data.read().await;
(
data.get::<RepoPathKey>().unwrap().clone(),
data.get::<SettingsKey>().unwrap().clone(),
)
};
let size = match &size_type {
SizeType::Total => get_folder_size(repo_path).await?,
SizeType::FileFolder => repo.get_main_store_size().await?,
SizeType::ThumbFolder => repo.get_thumb_store_size().await?,
SizeType::DatabaseFile => {
let db_path = settings.paths.db_file_path(&repo_path);
let database_metadata = fs::metadata(db_path).await?;
database_metadata.len()
}
};
Ok(size)
}

@ -0,0 +1,31 @@
[package]
name = "mediarepo-worker"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
async-trait = "0.1.52"
tracing = "0.1.32"
[dependencies.mediarepo-core]
path = "../mediarepo-core"
[dependencies.mediarepo-logic]
path = "../mediarepo-logic"
[dependencies.mediarepo-database]
path = "../mediarepo-database"
[dependencies.tokio]
version = "1.17.0"
features = ["macros"]
[dependencies.chrono]
version = "0.4.19"
features = ["serde"]
[dependencies.serde]
version = "1.0.136"
features = ["derive"]

@ -0,0 +1,101 @@
use mediarepo_core::error::{RepoError, RepoResult};
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use tokio::sync::broadcast::{Receiver, Sender};
use tokio::sync::RwLock;
pub struct JobHandle<T: Send + Sync, R: Send + Sync> {
status: Arc<RwLock<T>>,
state: Arc<RwLock<JobState>>,
result_receiver: CloneableReceiver<Arc<RwLock<Option<RepoResult<R>>>>>,
}
impl<T: Send + Sync, R: Send + Sync> Clone for JobHandle<T, R> {
fn clone(&self) -> Self {
Self {
status: self.status.clone(),
state: self.state.clone(),
result_receiver: self.result_receiver.clone(),
}
}
}
impl<T: Send + Sync, R: Send + Sync> JobHandle<T, R> {
pub fn new(
status: Arc<RwLock<T>>,
state: Arc<RwLock<JobState>>,
result_receiver: CloneableReceiver<Arc<RwLock<Option<RepoResult<R>>>>>,
) -> Self {
Self {
status,
state,
result_receiver,
}
}
pub async fn state(&self) -> JobState {
*self.state.read().await
}
pub fn status(&self) -> &Arc<RwLock<T>> {
&self.status
}
pub async fn result(&mut self) -> Arc<RwLock<Option<RepoResult<R>>>> {
match self.result_receiver.recv().await {
Ok(v) => v,
Err(e) => Arc::new(RwLock::new(Some(Err(RepoError::from(&*e.to_string()))))),
}
}
pub async fn take_result(&mut self) -> Option<RepoResult<R>> {
let shared_result = self.result().await;
let mut result = shared_result.write().await;
result.take()
}
}
#[derive(Clone, Copy, Debug, Ord, PartialOrd, Eq, PartialEq)]
pub enum JobState {
Queued,
Scheduled,
Running,
Finished,
}
pub struct CloneableReceiver<T: Clone> {
receiver: Receiver<T>,
sender: Sender<T>,
}
impl<T: Clone> CloneableReceiver<T> {
pub fn new(sender: Sender<T>) -> Self {
Self {
receiver: sender.subscribe(),
sender,
}
}
}
impl<T: Clone> Clone for CloneableReceiver<T> {
fn clone(&self) -> Self {
Self {
sender: self.sender.clone(),
receiver: self.sender.subscribe(),
}
}
}
impl<T: Clone> Deref for CloneableReceiver<T> {
type Target = Receiver<T>;
fn deref(&self) -> &Self::Target {
&self.receiver
}
}
impl<T: Clone> DerefMut for CloneableReceiver<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.receiver
}
}

@ -0,0 +1,137 @@
use crate::handle::{CloneableReceiver, JobHandle, JobState};
use crate::jobs::{Job, JobTypeKey};
use mediarepo_core::tokio_graceful_shutdown::SubsystemHandle;
use mediarepo_core::trait_bound_typemap::{SendSyncTypeMap, TypeMap, TypeMapKey};
use mediarepo_logic::dao::repo::Repo;
use mediarepo_logic::dao::DaoProvider;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast::channel;
use tokio::sync::RwLock;
use tokio::time::Instant;
#[derive(Clone)]
pub struct JobDispatcher {
subsystem: SubsystemHandle,
job_handle_map: Arc<RwLock<SendSyncTypeMap>>,
repo: Arc<Repo>,
}
impl JobDispatcher {
pub fn new(subsystem: SubsystemHandle, repo: Repo) -> Self {
Self {
job_handle_map: Arc::new(RwLock::new(SendSyncTypeMap::new())),
subsystem,
repo: Arc::new(repo),
}
}
pub async fn dispatch<T: 'static + Job>(&self, job: T) -> JobHandle<T::JobStatus, T::Result> {
self._dispatch(job, None).await
}
pub async fn dispatch_periodically<T: 'static + Job>(
&self,
job: T,
interval: Duration,
) -> JobHandle<T::JobStatus, T::Result> {
self._dispatch(job, Some(interval)).await
}
#[tracing::instrument(level = "debug", skip_all)]
async fn _dispatch<T: 'static + Job>(
&self,
job: T,
interval: Option<Duration>,
) -> JobHandle<T::JobStatus, T::Result> {
let status = job.status();
let state = Arc::new(RwLock::new(JobState::Queued));
let (sender, mut receiver) = channel(1);
self.subsystem
.start("channel-consumer", move |subsystem| async move {
tokio::select! {
_ = receiver.recv() => (),
_ = subsystem.on_shutdown_requested() => (),
}
Ok(())
});
let receiver = CloneableReceiver::new(sender.clone());
let handle = JobHandle::new(status.clone(), state.clone(), receiver);
self.add_handle::<T>(handle.clone()).await;
let repo = self.repo.clone();
self.subsystem
.start("worker-job", move |subsystem| async move {
loop {
let start = Instant::now();
let job_2 = job.clone();
{
let mut state = state.write().await;
*state = JobState::Running;
}
if let Err(e) = job.load_state(repo.job()).await {
tracing::error!("failed to load the jobs state: {}", e);
}
let result = tokio::select! {
_ = subsystem.on_shutdown_requested() => {
job_2.save_state(repo.job()).await
}
r = job.run(repo.clone()) => {
match r {
Err(e) => Err(e),
Ok(v) => {
let _ = sender.send(Arc::new(RwLock::new(Some(Ok(v)))));
job.save_state(repo.job()).await
}
}
}
};
if let Err(e) = result {
tracing::error!("job failed with error: {}", e);
let _ = sender.send(Arc::new(RwLock::new(Some(Err(e)))));
}
if let Some(interval) = interval {
{
let mut state = state.write().await;
*state = JobState::Scheduled;
}
let sleep_duration = interval - start.elapsed();
tokio::select! {
_ = tokio::time::sleep(sleep_duration) => {},
_ = subsystem.on_shutdown_requested() => {break}
}
} else {
let mut state = state.write().await;
*state = JobState::Finished;
break;
}
}
Ok(())
});
handle
}
#[inline]
async fn add_handle<T: 'static + Job>(&self, handle: JobHandle<T::JobStatus, T::Result>) {
let mut status_map = self.job_handle_map.write().await;
status_map.insert::<JobTypeKey<T>>(handle);
}
#[inline]
pub async fn get_handle<T: 'static + Job>(&self) -> Option<JobHandle<T::JobStatus, T::Result>> {
let map = self.job_handle_map.read().await;
map.get::<JobTypeKey<T>>().cloned()
}
}
pub struct DispatcherKey;
impl TypeMapKey for DispatcherKey {
type Value = JobDispatcher;
}
unsafe impl Send for JobDispatcher {}
unsafe impl Sync for JobDispatcher {}

@ -0,0 +1,91 @@
use crate::jobs::Job;
use crate::status_utils::SimpleProgress;
use async_trait::async_trait;
use mediarepo_core::error::{RepoError, RepoResult};
use mediarepo_core::mediarepo_api::types::repo::SizeType;
use mediarepo_core::settings::Settings;
use mediarepo_core::utils::get_folder_size;
use mediarepo_logic::dao::repo::Repo;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::fs;
use tokio::sync::broadcast::{self, Sender};
use tokio::sync::RwLock;
pub struct CalculateSizesState {
pub progress: SimpleProgress,
pub sizes_channel: Sender<(SizeType, u64)>,
}
#[derive(Clone)]
pub struct CalculateSizesJob {
repo_path: PathBuf,
settings: Settings,
state: Arc<RwLock<CalculateSizesState>>,
}
impl CalculateSizesJob {
pub fn new(repo_path: PathBuf, settings: Settings) -> Self {
let (tx, _) = broadcast::channel(4);
Self {
repo_path,
settings,
state: Arc::new(RwLock::new(CalculateSizesState {
sizes_channel: tx,
progress: SimpleProgress::new(4),
})),
}
}
}
#[async_trait]
impl Job for CalculateSizesJob {
type JobStatus = CalculateSizesState;
type Result = ();
fn status(&self) -> Arc<RwLock<Self::JobStatus>> {
self.state.clone()
}
#[tracing::instrument(level = "debug", skip_all)]
async fn run(&self, repo: Arc<Repo>) -> RepoResult<()> {
let size_types = vec![
SizeType::Total,
SizeType::FileFolder,
SizeType::ThumbFolder,
SizeType::DatabaseFile,
];
for size_type in size_types {
let size = calculate_size(&size_type, &repo, &self.repo_path, &self.settings).await?;
let mut state = self.state.write().await;
state
.sizes_channel
.send((size_type, size))
.map_err(|_| RepoError::from("failed to broadcast new size"))?;
state.progress.tick();
}
Ok(())
}
}
async fn calculate_size(
size_type: &SizeType,
repo: &Repo,
repo_path: &PathBuf,
settings: &Settings,
) -> RepoResult<u64> {
let size = match &size_type {
SizeType::Total => get_folder_size(repo_path.clone()).await?,
SizeType::FileFolder => repo.get_main_store_size().await?,
SizeType::ThumbFolder => repo.get_thumb_store_size().await?,
SizeType::DatabaseFile => {
let db_path = settings.paths.db_file_path(repo_path);
let database_metadata = fs::metadata(db_path).await?;
database_metadata.len()
}
};
Ok(size)
}

@ -0,0 +1,32 @@
use crate::jobs::Job;
use crate::status_utils::SimpleProgress;
use async_trait::async_trait;
use mediarepo_core::error::RepoResult;
use mediarepo_logic::dao::repo::Repo;
use mediarepo_logic::dao::DaoProvider;
use std::sync::Arc;
use tokio::sync::RwLock;
#[derive(Clone, Default)]
pub struct CheckIntegrityJob {
progress: Arc<RwLock<SimpleProgress>>,
}
#[async_trait]
impl Job for CheckIntegrityJob {
type JobStatus = SimpleProgress;
type Result = ();
fn status(&self) -> Arc<RwLock<Self::JobStatus>> {
self.progress.clone()
}
async fn run(&self, repo: Arc<Repo>) -> RepoResult<Self::Result> {
repo.job().check_integrity().await?;
{
let mut progress = self.progress.write().await;
progress.set_total(100);
}
Ok(())
}
}

@ -0,0 +1,109 @@
use crate::jobs::{deserialize_state, serialize_state, Job};
use crate::status_utils::SimpleProgress;
use async_trait::async_trait;
use mediarepo_core::error::RepoResult;
use mediarepo_core::thumbnailer::ThumbnailSize;
use mediarepo_database::entities::job_state::JobType;
use mediarepo_logic::dao::job::JobDao;
use mediarepo_logic::dao::repo::Repo;
use mediarepo_logic::dao::DaoProvider;
use serde::{Deserialize, Serialize};
use std::mem;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tokio::sync::RwLock;
#[derive(Clone, Default)]
pub struct GenerateMissingThumbsJob {
state: Arc<RwLock<SimpleProgress>>,
inner_state: Arc<RwLock<GenerateThumbsState>>,
}
#[async_trait]
impl Job for GenerateMissingThumbsJob {
type JobStatus = SimpleProgress;
type Result = ();
fn status(&self) -> Arc<RwLock<Self::JobStatus>> {
self.state.clone()
}
async fn load_state(&self, job_dao: JobDao) -> RepoResult<()> {
if let Some(state) = job_dao.state_for_job_type(JobType::GenerateThumbs).await? {
let mut inner_state = self.inner_state.write().await;
let state = deserialize_state::<GenerateThumbsState>(state)?;
let _ = mem::replace(&mut *inner_state, state);
}
Ok(())
}
async fn run(&self, repo: Arc<Repo>) -> RepoResult<()> {
if !self.needs_generation(&repo).await? {
return Ok(());
}
let file_dao = repo.file();
let all_files = file_dao.all().await?;
{
let mut progress = self.state.write().await;
progress.set_total(all_files.len() as u64);
}
for file in all_files {
if file_dao.thumbnails(file.encoded_cd()).await?.is_empty() {
let _ = file_dao
.create_thumbnails(&file, vec![ThumbnailSize::Medium])
.await;
}
{
let mut progress = self.state.write().await;
progress.tick();
}
}
self.refresh_state(&repo).await?;
Ok(())
}
async fn save_state(&self, job_dao: JobDao) -> RepoResult<()> {
let state = self.inner_state.read().await;
let state = serialize_state(JobType::GenerateThumbs, &*state)?;
job_dao.upsert_state(state).await
}
}
impl GenerateMissingThumbsJob {
async fn needs_generation(&self, repo: &Repo) -> RepoResult<bool> {
let repo_counts = repo.get_counts().await?;
let file_count = repo_counts.file_count as u64;
let state = self.inner_state.read().await;
Ok(state.file_count != file_count
|| state.last_run.elapsed().unwrap() > Duration::from_secs(60 * 60))
}
async fn refresh_state(&self, repo: &Repo) -> RepoResult<()> {
let repo_counts = repo.get_counts().await?;
let mut state = self.inner_state.write().await;
state.last_run = SystemTime::now();
state.file_count = repo_counts.file_count as u64;
Ok(())
}
}
#[derive(Serialize, Deserialize)]
struct GenerateThumbsState {
file_count: u64,
last_run: SystemTime,
}
impl Default for GenerateThumbsState {
fn default() -> Self {
Self {
file_count: 0,
last_run: SystemTime::now(),
}
}
}

@ -0,0 +1,65 @@
use crate::jobs::{deserialize_state, serialize_state, Job};
use crate::status_utils::SimpleProgress;
use async_trait::async_trait;
use mediarepo_core::error::RepoResult;
use mediarepo_database::entities::job_state::JobType;
use mediarepo_logic::dao::job::JobDao;
use mediarepo_logic::dao::repo::Repo;
use mediarepo_logic::dao::DaoProvider;
use serde::{Deserialize, Serialize};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::sync::RwLock;
#[derive(Clone, Default)]
pub struct MigrateCDsJob {
progress: Arc<RwLock<SimpleProgress>>,
migrated: Arc<AtomicBool>,
}
#[async_trait]
impl Job for MigrateCDsJob {
type JobStatus = SimpleProgress;
type Result = ();
fn status(&self) -> Arc<tokio::sync::RwLock<Self::JobStatus>> {
self.progress.clone()
}
async fn load_state(&self, job_dao: JobDao) -> RepoResult<()> {
if let Some(state) = job_dao.state_for_job_type(JobType::MigrateCDs).await? {
let state = deserialize_state::<MigrationStatus>(state)?;
self.migrated.store(state.migrated, Ordering::SeqCst);
}
Ok(())
}
async fn run(&self, repo: Arc<Repo>) -> RepoResult<Self::Result> {
if self.migrated.load(Ordering::SeqCst) {
return Ok(());
}
let job_dao = repo.job();
job_dao.migrate_content_descriptors().await?;
self.migrated.store(true, Ordering::Relaxed);
{
let mut progress = self.progress.write().await;
progress.set_total(100);
}
Ok(())
}
async fn save_state(&self, job_dao: JobDao) -> RepoResult<()> {
if self.migrated.load(Ordering::Relaxed) {
let state = serialize_state(JobType::MigrateCDs, &MigrationStatus { migrated: true })?;
job_dao.upsert_state(state).await?;
}
Ok(())
}
}
#[derive(Serialize, Deserialize)]
struct MigrationStatus {
pub migrated: bool,
}

@ -0,0 +1,71 @@
mod calculate_sizes;
mod check_integrity;
mod generate_missing_thumbnails;
mod migrate_content_descriptors;
mod vacuum;
pub use calculate_sizes::*;
pub use check_integrity::*;
pub use generate_missing_thumbnails::*;
pub use migrate_content_descriptors::*;
use std::marker::PhantomData;
use std::sync::Arc;
pub use vacuum::*;
use crate::handle::JobHandle;
use async_trait::async_trait;
use mediarepo_core::bincode;
use mediarepo_core::error::{RepoError, RepoResult};
use mediarepo_core::trait_bound_typemap::TypeMapKey;
use mediarepo_database::entities::job_state::JobType;
use mediarepo_logic::dao::job::JobDao;
use mediarepo_logic::dao::repo::Repo;
use mediarepo_logic::dto::{JobStateDto, UpsertJobStateDto};
use serde::de::DeserializeOwned;
use serde::Serialize;
use tokio::sync::RwLock;
type EmptyStatus = Arc<RwLock<()>>;
#[async_trait]
pub trait Job: Clone + Send + Sync {
type JobStatus: Send + Sync;
type Result: Send + Sync;
fn status(&self) -> Arc<RwLock<Self::JobStatus>>;
async fn load_state(&self, _job_dao: JobDao) -> RepoResult<()> {
Ok(())
}
async fn run(&self, repo: Arc<Repo>) -> RepoResult<Self::Result>;
async fn save_state(&self, _job_dao: JobDao) -> RepoResult<()> {
Ok(())
}
}
pub struct JobTypeKey<T: Job>(PhantomData<T>);
impl<T: 'static> TypeMapKey for JobTypeKey<T>
where
T: Job,
{
type Value = JobHandle<T::JobStatus, T::Result>;
}
pub fn deserialize_state<T: DeserializeOwned>(dto: JobStateDto) -> RepoResult<T> {
bincode::deserialize(dto.value()).map_err(RepoError::from)
}
pub fn serialize_state<T: Serialize>(
job_type: JobType,
state: &T,
) -> RepoResult<UpsertJobStateDto> {
let dto = UpsertJobStateDto {
value: bincode::serialize(state)?,
job_type,
};
Ok(dto)
}

@ -0,0 +1,27 @@
use crate::jobs::{EmptyStatus, Job};
use async_trait::async_trait;
use mediarepo_core::error::RepoResult;
use mediarepo_logic::dao::repo::Repo;
use mediarepo_logic::dao::DaoProvider;
use std::sync::Arc;
use tokio::sync::RwLock;
#[derive(Default, Clone)]
pub struct VacuumJob;
#[async_trait]
impl Job for VacuumJob {
type JobStatus = ();
type Result = ();
fn status(&self) -> Arc<RwLock<Self::JobStatus>> {
EmptyStatus::default()
}
#[tracing::instrument(level = "debug", skip_all)]
async fn run(&self, repo: Arc<Repo>) -> RepoResult<()> {
repo.job().vacuum().await?;
Ok(())
}
}

@ -0,0 +1,39 @@
use crate::job_dispatcher::JobDispatcher;
use crate::jobs::{CheckIntegrityJob, MigrateCDsJob, VacuumJob};
use mediarepo_core::error::RepoError;
use mediarepo_core::tokio_graceful_shutdown::Toplevel;
use mediarepo_logic::dao::repo::Repo;
use std::time::Duration;
use tokio::sync::oneshot::channel;
pub mod handle;
pub mod job_dispatcher;
pub mod jobs;
pub mod status_utils;
pub async fn start(top_level: Toplevel, repo: Repo) -> (Toplevel, JobDispatcher) {
let (tx, rx) = channel();
let top_level = top_level.start("mediarepo-worker", |subsystem| async move {
let dispatcher = JobDispatcher::new(subsystem, repo);
tx.send(dispatcher.clone())
.map_err(|_| RepoError::from("failed to send dispatcher"))?;
dispatcher
.dispatch_periodically(VacuumJob::default(), Duration::from_secs(60 * 30))
.await;
dispatcher
.dispatch_periodically(
CheckIntegrityJob::default(),
Duration::from_secs(60 * 60 * 24),
)
.await;
dispatcher.dispatch(MigrateCDsJob::default()).await;
Ok(())
});
let receiver = rx
.await
.expect("failed to create background job dispatcher");
(top_level, receiver)
}

@ -0,0 +1,39 @@
pub struct SimpleProgress {
pub current: u64,
pub total: u64,
}
impl Default for SimpleProgress {
fn default() -> Self {
Self {
total: 100,
current: 0,
}
}
}
impl SimpleProgress {
pub fn new(total: u64) -> Self {
Self { total, current: 0 }
}
/// Sets the total count
pub fn set_total(&mut self, total: u64) {
self.total = total;
}
/// Increments the current progress by 1
pub fn tick(&mut self) {
self.current += 1;
}
/// Sets the current progress to a defined value
pub fn set_current(&mut self, current: u64) {
self.current = current;
}
/// Returns the total progress in percent
pub fn percent(&self) -> f64 {
(self.current as f64) / (self.total as f64)
}
}

@ -1,5 +1,7 @@
use std::env;
use std::iter::FromIterator;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use structopt::StructOpt;
@ -10,8 +12,12 @@ use mediarepo_core::error::RepoResult;
use mediarepo_core::fs::drop_file::DropFile;
use mediarepo_core::settings::{PathSettings, Settings};
use mediarepo_core::tokio_graceful_shutdown::{SubsystemHandle, Toplevel};
use mediarepo_core::trait_bound_typemap::{CloneSendSyncTypeMap, SendSyncTypeMap, TypeMap};
use mediarepo_core::type_keys::{RepoPathKey, SettingsKey};
use mediarepo_logic::dao::repo::Repo;
use mediarepo_logic::type_keys::RepoKey;
use mediarepo_socket::start_tcp_server;
use mediarepo_worker::job_dispatcher::DispatcherKey;
use crate::utils::{create_paths_for_repo, get_repo, load_settings};
@ -99,18 +105,28 @@ async fn init_repo(opt: &Opt, paths: &PathSettings) -> RepoResult<Repo> {
/// Starts the server
async fn start_server(opt: Opt, settings: Settings) -> RepoResult<()> {
let repo = init_repo(&opt, &settings.paths).await?;
let mut top_level = Toplevel::new();
let (mut top_level, dispatcher) = mediarepo_worker::start(Toplevel::new(), repo.clone()).await;
let mut shared_data = CloneSendSyncTypeMap::new();
shared_data.insert::<RepoKey>(Arc::new(repo));
shared_data.insert::<SettingsKey>(settings.clone());
shared_data.insert::<RepoPathKey>(opt.repo.clone());
shared_data.insert::<DispatcherKey>(dispatcher);
#[cfg(unix)]
{
if settings.server.unix_socket.enabled {
let settings = settings.clone();
let repo_path = opt.repo.clone();
let repo = repo.clone();
let shared_data = shared_data.clone();
top_level = top_level.start("mediarepo-unix-socket", |subsystem| {
Box::pin(async move {
start_and_await_unix_socket(subsystem, repo_path, settings, repo).await?;
start_and_await_unix_socket(
subsystem,
repo_path,
SendSyncTypeMap::from_iter(shared_data),
)
.await?;
Ok(())
})
})
@ -120,7 +136,13 @@ async fn start_server(opt: Opt, settings: Settings) -> RepoResult<()> {
if settings.server.tcp.enabled {
top_level = top_level.start("mediarepo-tcp", move |subsystem| {
Box::pin(async move {
start_and_await_tcp_server(subsystem, opt.repo, settings, repo).await?;
start_and_await_tcp_server(
subsystem,
opt.repo,
settings,
SendSyncTypeMap::from_iter(shared_data),
)
.await?;
Ok(())
})
@ -147,9 +169,9 @@ async fn start_and_await_tcp_server(
subsystem: SubsystemHandle,
repo_path: PathBuf,
settings: Settings,
repo: Repo,
shared_data: SendSyncTypeMap,
) -> RepoResult<()> {
let (address, handle) = start_tcp_server(subsystem.clone(), repo_path.clone(), settings, repo)?;
let (address, handle) = start_tcp_server(subsystem.clone(), settings, shared_data)?;
let (mut file, _guard) = DropFile::new(repo_path.join("repo.tcp")).await?;
file.write_all(&address.into_bytes()).await?;
@ -172,17 +194,10 @@ async fn start_and_await_tcp_server(
async fn start_and_await_unix_socket(
subsystem: SubsystemHandle,
repo_path: PathBuf,
settings: Settings,
repo: Repo,
shared_data: SendSyncTypeMap,
) -> RepoResult<()> {
let socket_path = repo_path.join("repo.sock");
let handle = mediarepo_socket::create_unix_socket(
subsystem.clone(),
socket_path,
repo_path.clone(),
settings,
repo,
)?;
let handle = mediarepo_socket::create_unix_socket(subsystem.clone(), socket_path, shared_data)?;
let _guard = DropFile::from_path(repo_path.join("repo.sock"));
tokio::select! {

Loading…
Cancel
Save