Integrate worker into main application

Signed-off-by: trivernis <trivernis@protonmail.com>
feature/jobs
trivernis 2 years ago
parent a145d604d9
commit 056166ee60
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

@ -63,9 +63,9 @@ dependencies = [
[[package]]
name = "anyhow"
version = "1.0.55"
version = "1.0.56"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "159bb86af3a200e19a068f4224eae4c8bb2d0fa054c7e5d1cacd5cef95e684cd"
checksum = "4361135be9122e0870de935d7c439aef945b9f9ddd4199a553b5270b49c82a27"
[[package]]
name = "arrayref"
@ -270,9 +270,8 @@ dependencies = [
[[package]]
name = "bromine"
version = "0.18.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5dd7887995490657bf3ec578f39e747ef7b5355a8dc6c99b3d5be59ca70dc4d5"
version = "0.18.3"
source = "git+https://github.com/Trivernis/bromine#c2728a44ead210e1535bce5fc2d3979530700b96"
dependencies = [
"async-trait",
"bincode",
@ -301,9 +300,9 @@ checksum = "e3b5ca7a04898ad4bcd41c90c5285445ff5b791899bb1b0abdd2a2aa791211d7"
[[package]]
name = "bytemuck"
version = "1.7.3"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "439989e6b8c38d1b6570a384ef1e49c8848128f5a97f3914baef02920842712f"
checksum = "0e851ca7c24871e7336801608a4797d7376545b6928a10d32d75685687141ead"
[[package]]
name = "byteorder"
@ -709,9 +708,9 @@ dependencies = [
[[package]]
name = "flume"
version = "0.10.11"
version = "0.10.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b279436a715a9de95dcd26b151db590a71961cc06e54918b24fe0dd5b7d3fc4"
checksum = "843c03199d0c0ca54bc1ea90ac0d507274c28abcc4f691ae8b4eaa375087c76a"
dependencies = [
"futures-core",
"futures-sink",
@ -885,7 +884,7 @@ dependencies = [
"cfg-if 1.0.0",
"js-sys",
"libc",
"wasi",
"wasi 0.10.0+wasi-snapshot-preview1",
"wasm-bindgen",
]
@ -907,9 +906,9 @@ checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574"
[[package]]
name = "h2"
version = "0.3.11"
version = "0.3.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9f1f717ddc7b2ba36df7e871fd88db79326551d3d6f1fc406fbfd28b582ff8e"
checksum = "62eeb471aa3e3c9197aa4bfeabfe02982f6dc96f750486c0bb0009ac58b26d2b"
dependencies = [
"bytes",
"fnv",
@ -1373,7 +1372,6 @@ dependencies = [
"serde",
"tokio",
"tracing",
"typemap_rev",
]
[[package]]
@ -1465,14 +1463,15 @@ dependencies = [
[[package]]
name = "mio"
version = "0.8.0"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba272f85fa0b41fc91872be579b3bbe0f56b792aa361a380eb669469f68dafb2"
checksum = "7ba42135c6a5917b9db9cd7b293e5409e1c6b041e6f9825e92e55a894c63b6f8"
dependencies = [
"libc",
"log",
"miow",
"ntapi",
"wasi 0.11.0+wasi-snapshot-preview1",
"winapi",
]
@ -1535,9 +1534,9 @@ checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
[[package]]
name = "nanorand"
version = "0.6.1"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "729eb334247daa1803e0a094d0a5c55711b85571179f5ec6e53eccfdf7008958"
checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3"
dependencies = [
"getrandom",
]
@ -1644,18 +1643,18 @@ dependencies = [
[[package]]
name = "num_enum"
version = "0.5.6"
version = "0.5.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "720d3ea1055e4e4574c0c0b0f8c3fd4f24c4cdaf465948206dea090b57b526ad"
checksum = "cf5395665662ef45796a4ff5486c5d41d29e0c09640af4c5f17fd94ee2c119c9"
dependencies = [
"num_enum_derive",
]
[[package]]
name = "num_enum_derive"
version = "0.5.6"
version = "0.5.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d992b768490d7fe0d8586d9b5745f6c49f557da6d81dc982b1d167ad4edbb21"
checksum = "3b0498641e53dd6ac1a4f22547548caa6864cc4933784319cd1775271c5a46ce"
dependencies = [
"proc-macro-crate",
"proc-macro2 1.0.36",
@ -1665,18 +1664,18 @@ dependencies = [
[[package]]
name = "num_threads"
version = "0.1.3"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97ba99ba6393e2c3734791401b66902d981cb03bf190af674ca69949b6d5fb15"
checksum = "c539a50b93a303167eded6e8dff5220cd39447409fb659f4cd24b1f72fe4f133"
dependencies = [
"libc",
]
[[package]]
name = "once_cell"
version = "1.9.0"
version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da32515d9f6e6e489d7bc9d84c71b060db7247dc035bbe44eac88cf87486d8d5"
checksum = "87f3e037eac156d1775da914196f0f37741a274155e34a0b7e427c35d2a2ecb9"
[[package]]
name = "opaque-debug"
@ -1947,9 +1946,9 @@ checksum = "58893f751c9b0412871a09abd62ecd2a00298c6c83befa223ef98c52aef40cbe"
[[package]]
name = "png"
version = "0.17.3"
version = "0.17.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e8f1882177b17c98ec33a51f5910ecbf4db92ca0def706781a1f8d0c661f393"
checksum = "dc38c0ad57efb786dd57b9864e5b18bae478c00c824dc55a38bbc9da95dde3ba"
dependencies = [
"bitflags",
"crc32fast",
@ -2149,18 +2148,18 @@ dependencies = [
[[package]]
name = "redox_syscall"
version = "0.2.10"
version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8383f39639269cde97d255a32bdb68c047337295414940c68bdd30c2e13203ff"
checksum = "8380fe0152551244f0747b1bf41737e0f8a74f97a14ccefd1148187271634f3c"
dependencies = [
"bitflags",
]
[[package]]
name = "regex"
version = "1.5.4"
version = "1.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d07a8629359eb56f1e2fb1652bb04212c072a87ba68546a04065d525673ac461"
checksum = "1a11647b6b25ff05a515cb92c365cec08801e83423a235b51e231e1808747286"
dependencies = [
"aho-corasick",
"memchr",
@ -2809,7 +2808,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6db9e6914ab8b1ae1c260a4ae7a49b6c5611b40328a735b21862567685e73255"
dependencies = [
"libc",
"wasi",
"wasi 0.10.0+wasi-snapshot-preview1",
"winapi",
]
@ -3039,9 +3038,9 @@ checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6"
[[package]]
name = "tracing"
version = "0.1.31"
version = "0.1.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f6c650a8ef0cd2dd93736f033d21cbd1224c5a967aa0c258d00fcf7dafef9b9f"
checksum = "4a1bdf54a7c28a2bbf701e1d2233f6c77f473486b94bee4f9678da5a148dca7f"
dependencies = [
"cfg-if 1.0.0",
"log",
@ -3052,9 +3051,9 @@ dependencies = [
[[package]]
name = "tracing-appender"
version = "0.2.0"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94571df2eae3ed4353815ea5a90974a594a1792d8782ff2cbcc9392d1101f366"
checksum = "9ab026b18a46ac429e5c98bec10ca06424a97b3ad7b3949d9b4a102fff6623c4"
dependencies = [
"crossbeam-channel",
"time 0.3.7",
@ -3063,9 +3062,9 @@ dependencies = [
[[package]]
name = "tracing-attributes"
version = "0.1.19"
version = "0.1.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8276d9a4a3a558d7b7ad5303ad50b53d58264641b82914b7ada36bd762e7a716"
checksum = "2e65ce065b4b5c53e73bb28912318cb8c9e9ad3921f1d669eb0e68b4c8143a2b"
dependencies = [
"proc-macro2 1.0.36",
"quote 1.0.15",
@ -3074,9 +3073,9 @@ dependencies = [
[[package]]
name = "tracing-core"
version = "0.1.22"
version = "0.1.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03cfcb51380632a72d3111cb8d3447a8d908e577d31beeac006f836383d29a23"
checksum = "aa31669fa42c09c34d94d8165dd2012e8ff3c66aca50f3bb226b68f216f2706c"
dependencies = [
"lazy_static",
"valuable",
@ -3168,8 +3167,7 @@ checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642"
[[package]]
name = "typemap_rev"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed5b74f0a24b5454580a79abb6994393b09adf0ab8070f15827cb666255de155"
source = "git+https://github.com/Trivernis/typemap_rev?rev=750c67bffe8024d2a47725daa473f068ad653fc4#750c67bffe8024d2a47725daa473f068ad653fc4"
[[package]]
name = "typenum"
@ -3305,6 +3303,12 @@ version = "0.10.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f"
[[package]]
name = "wasi"
version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "wasm-bindgen"
version = "0.2.79"

@ -16,12 +16,12 @@ name = "mediarepo-daemon"
path = "src/main.rs"
[dependencies]
tracing = "0.1.31"
tracing = "0.1.32"
toml = "0.5.8"
structopt = "0.3.26"
glob = "0.3.0"
tracing-flame = "0.2.0"
tracing-appender = "0.2.0"
tracing-appender = "0.2.1"
tracing-log = "0.1.2"
rolling-file = "0.1.0"
num-integer = "0.1.44"

@ -13,17 +13,20 @@ multibase = "0.9.1"
base64 = "0.13.0"
toml = "0.5.8"
serde = "1.0.136"
typemap_rev = "0.1.5"
futures = "0.3.21"
itertools = "0.10.3"
glob = "0.3.0"
tracing = "0.1.31"
tracing = "0.1.32"
data-encoding = "2.3.2"
tokio-graceful-shutdown = "0.4.3"
thumbnailer = "0.4.0"
bincode = "1.3.3"
tracing-subscriber = "0.3.9"
[dependencies.typemap_rev]
git = "https://github.com/Trivernis/typemap_rev"
rev = "750c67bffe8024d2a47725daa473f068ad653fc4"
[dependencies.sea-orm]
version = "0.6.0"
default-features = false

@ -5,6 +5,7 @@ pub use mediarepo_api;
pub use mediarepo_api::bromine;
pub use thumbnailer;
pub use tokio_graceful_shutdown;
pub use typemap_rev;
pub mod content_descriptor;
pub mod context;
@ -13,4 +14,5 @@ pub mod fs;
pub mod settings;
pub mod tracing_layer_list;
pub mod type_keys;
pub mod type_list;
pub mod utils;

@ -0,0 +1,46 @@
use std::any::{Any, TypeId};
use std::vec::IntoIter;
use typemap_rev::TypeMapKey;
pub trait CloneAny: Any + Send + Sync {
fn clone_any(&self) -> Box<dyn CloneAny>;
}
impl<T: Any + Clone + Send + Sync> CloneAny for T {
fn clone_any(&self) -> Box<dyn CloneAny> {
Box::new(self.clone())
}
}
impl Clone for Box<dyn CloneAny> {
fn clone(&self) -> Self {
(**self).clone_any()
}
}
#[derive(Default, Clone)]
pub struct TypeList(Vec<(TypeId, Box<dyn CloneAny>)>);
impl TypeList {
pub fn add<T: TypeMapKey<Value = C>, C: CloneAny>(&mut self, value: T::Value) {
self.0.push((TypeId::of::<T>(), Box::new(value)))
}
}
impl IntoIterator for TypeList {
type Item = (TypeId, Box<dyn Any + Send + Sync>);
type IntoIter = IntoIter<Self::Item>;
fn into_iter(self) -> Self::IntoIter {
self.0
.into_iter()
.map(|(t, v)| {
(t, unsafe {
// SAFETY: CloneAny requires types to be Any + Send + Sync (+ Clone)
std::mem::transmute::<Box<dyn CloneAny>, Box<dyn Any + Send + Sync>>(v)
})
})
.collect::<Vec<(TypeId, Box<dyn Any + Send + Sync>)>>()
.into_iter()
}
}

@ -8,7 +8,7 @@ workspace = ".."
[dependencies]
chrono = "0.4.19"
tracing = "0.1.31"
tracing = "0.1.32"
[dependencies.mediarepo-core]
path = "../mediarepo-core"

@ -1,15 +1,5 @@
CREATE TABLE jobs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
job_type INTEGER NOT NULL,
name VARCHAR(255),
next_run DATETIME,
interval INTEGER
);
CREATE TABLE job_states (
job_id INTEGER,
key VARCHAR(128) NOT NULL DEFAULT 'default',
job_type INTEGER NOT NULL,
value BLOB,
PRIMARY KEY (job_id, key),
FOREIGN KEY (job_id) REFERENCES jobs (id)
PRIMARY KEY (job_type)
);

@ -1,29 +1,45 @@
use sea_orm::prelude::*;
use sea_orm::TryFromU64;
#[derive(Clone, Debug, PartialEq, DeriveEntityModel)]
#[sea_orm(table_name = "namespaces")]
pub struct Model {
#[sea_orm(primary_key)]
pub job_id: i64,
#[sea_orm(primary_key)]
pub key: String,
pub job_type: JobType,
pub value: Vec<u8>,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(
belongs_to = "super::job::Entity",
from = "Column::JobId",
to = "super::job::Column::Id"
)]
Job,
#[derive(Clone, Copy, Debug, PartialEq, EnumIter, DeriveActiveEnum)]
#[sea_orm(rs_type = "u32", db_type = "Integer")]
pub enum JobType {
#[sea_orm(num_value = 10)]
MigrateCDs,
#[sea_orm(num_value = 20)]
CalculateSizes,
#[sea_orm(num_value = 30)]
GenerateThumbs,
#[sea_orm(num_value = 40)]
CheckIntegrity,
#[sea_orm(num_value = 50)]
Vacuum,
}
impl Related<super::job::Entity> for Entity {
fn to() -> RelationDef {
Relation::Job.def()
impl TryFromU64 for JobType {
fn try_from_u64(n: u64) -> Result<Self, DbErr> {
let value = match n {
10 => Self::MigrateCDs,
20 => Self::CalculateSizes,
30 => Self::GenerateThumbs,
40 => Self::CheckIntegrity,
50 => Self::Vacuum,
_ => return Err(DbErr::Custom(String::from("Invalid job type"))),
};
Ok(value)
}
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}
impl ActiveModelBehavior for ActiveModel {}

@ -3,7 +3,6 @@ pub mod content_descriptor_source;
pub mod content_descriptor_tag;
pub mod file;
pub mod file_metadata;
pub mod job;
pub mod job_state;
pub mod namespace;
pub mod sort_key;

@ -8,11 +8,10 @@ workspace = ".."
[dependencies]
chrono = "0.4.19"
typemap_rev = "0.1.5"
serde = "1.0.136"
mime_guess = "2.0.4"
mime = "0.3.16"
tracing = "0.1.31"
tracing = "0.1.32"
async-trait = "0.1.52"
[dependencies.mediarepo-core]

@ -1,9 +1,4 @@
use crate::dao_provider;
use crate::dto::JobDto;
use chrono::Local;
use mediarepo_core::error::RepoResult;
use mediarepo_database::entities::job;
use sea_orm::prelude::*;
pub mod generate_missing_thumbnails;
pub mod migrate_content_descriptors;
@ -11,19 +6,3 @@ pub mod sqlite_operations;
pub mod state;
dao_provider!(JobDao);
impl JobDao {
/// Returns a list of all jobs that are scheduled (have a next_run date)
pub async fn scheduled_for_now(&self) -> RepoResult<Vec<JobDto>> {
let jobs = job::Entity::find()
.filter(job::Column::NextRun.is_not_null())
.filter(job::Column::NextRun.lt(Local::now().naive_local()))
.all(&self.ctx.db)
.await?
.into_iter()
.map(JobDto::new)
.collect();
Ok(jobs)
}
}

@ -2,15 +2,16 @@ use crate::dao::job::JobDao;
use crate::dto::{JobStateDto, UpsertJobStateDto};
use mediarepo_core::error::RepoResult;
use mediarepo_database::entities::job_state;
use mediarepo_database::entities::job_state::JobType;
use sea_orm::prelude::*;
use sea_orm::ActiveValue::Set;
use sea_orm::{Condition, TransactionTrait};
impl JobDao {
/// Returns all job states for a given job id
pub async fn states_for_job_id(&self, job_id: i64) -> RepoResult<Vec<JobStateDto>> {
pub async fn states_for_job_type(&self, job_type: JobType) -> RepoResult<Vec<JobStateDto>> {
let states = job_state::Entity::find()
.filter(job_state::Column::JobId.eq(job_id))
.filter(job_state::Column::JobType.eq(job_type))
.all(&self.ctx.db)
.await?
.into_iter()
@ -40,11 +41,7 @@ impl JobDao {
fn build_state_filters(states: &Vec<UpsertJobStateDto>) -> Condition {
states
.iter()
.map(|s| {
Condition::all()
.add(job_state::Column::JobId.eq(s.job_id))
.add(job_state::Column::Key.eq(s.key.to_owned()))
})
.map(|s| Condition::all().add(job_state::Column::JobType.eq(s.job_type)))
.fold(Condition::any(), |acc, cond| acc.add(cond))
}
@ -52,8 +49,7 @@ fn build_active_state_models(states: Vec<UpsertJobStateDto>) -> Vec<job_state::A
states
.into_iter()
.map(|s| job_state::ActiveModel {
job_id: Set(s.job_id),
key: Set(s.key),
job_type: Set(s.job_type),
value: Set(s.value),
})
.collect()

@ -1,4 +1,5 @@
use mediarepo_database::entities::job_state;
use mediarepo_database::entities::job_state::JobType;
#[derive(Clone, Debug)]
pub struct JobStateDto {
@ -10,12 +11,8 @@ impl JobStateDto {
Self { model }
}
pub fn job_id(&self) -> i64 {
self.model.job_id
}
pub fn key(&self) -> &String {
&self.model.key
pub fn job_type(&self) -> JobType {
self.model.job_type
}
pub fn value(&self) -> &[u8] {
@ -29,7 +26,6 @@ impl JobStateDto {
#[derive(Clone, Debug)]
pub struct UpsertJobStateDto {
pub job_id: i64,
pub key: String,
pub job_type: JobType,
pub value: Vec<u8>,
}

@ -1,6 +1,5 @@
pub use file::*;
pub use file_metadata::*;
pub use job::*;
pub use job_state::*;
pub use namespace::*;
pub use sorting_preset::*;
@ -9,7 +8,6 @@ pub use thumbnail::*;
mod file;
mod file_metadata;
mod job;
mod job_state;
mod namespace;
mod sorting_preset;

@ -1,8 +1,7 @@
use std::sync::Arc;
use typemap_rev::TypeMapKey;
use crate::dao::repo::Repo;
use mediarepo_core::typemap_rev::TypeMapKey;
pub struct RepoKey;

@ -8,7 +8,7 @@ workspace = ".."
[dependencies]
serde = "1.0.136"
tracing = "0.1.31"
tracing = "0.1.32"
compare = "0.1.0"
port_check = "0.1.5"
rayon = "1.5.1"

@ -1,6 +1,4 @@
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::net::TcpListener;
use tokio::task::JoinHandle;
@ -10,20 +8,18 @@ use mediarepo_core::error::{RepoError, RepoResult};
use mediarepo_core::mediarepo_api::types::misc::InfoResponse;
use mediarepo_core::settings::{PortSetting, Settings};
use mediarepo_core::tokio_graceful_shutdown::SubsystemHandle;
use mediarepo_core::type_keys::{RepoPathKey, SettingsKey, SizeMetadataKey, SubsystemKey};
use mediarepo_logic::dao::repo::Repo;
use mediarepo_logic::type_keys::RepoKey;
use mediarepo_core::type_keys::{SizeMetadataKey, SubsystemKey};
use mediarepo_core::type_list::TypeList;
mod from_model;
mod namespaces;
mod utils;
#[tracing::instrument(skip(subsystem, settings, repo))]
#[tracing::instrument(skip_all)]
pub fn start_tcp_server(
subsystem: SubsystemHandle,
repo_path: PathBuf,
settings: Settings,
repo: Repo,
shared_data: TypeList,
) -> RepoResult<(String, JoinHandle<()>)> {
let port = match &settings.server.tcp.port {
PortSetting::Fixed(p) => {
@ -45,9 +41,7 @@ pub fn start_tcp_server(
.spawn(async move {
get_builder::<TcpListener>(address)
.insert::<SubsystemKey>(subsystem)
.insert::<RepoKey>(Arc::new(repo))
.insert::<SettingsKey>(settings)
.insert::<RepoPathKey>(repo_path)
.insert_all(shared_data)
.insert::<SizeMetadataKey>(Default::default())
.build_server()
.await
@ -58,13 +52,11 @@ pub fn start_tcp_server(
}
#[cfg(unix)]
#[tracing::instrument(skip(subsystem, settings, repo))]
#[tracing::instrument(skip_all)]
pub fn create_unix_socket(
subsystem: SubsystemHandle,
path: std::path::PathBuf,
repo_path: PathBuf,
settings: Settings,
repo: Repo,
shared_data: TypeList,
) -> RepoResult<JoinHandle<()>> {
use std::fs;
use tokio::net::UnixListener;
@ -77,9 +69,7 @@ pub fn create_unix_socket(
.spawn(async move {
get_builder::<UnixListener>(path)
.insert::<SubsystemKey>(subsystem)
.insert::<RepoKey>(Arc::new(repo))
.insert::<SettingsKey>(settings)
.insert::<RepoPathKey>(repo_path)
.insert_all(shared_data)
.insert::<SizeMetadataKey>(Default::default())
.build_server()
.await

@ -7,7 +7,7 @@ edition = "2021"
[dependencies]
async-trait = "0.1.52"
tracing = "0.1.31"
tracing = "0.1.32"
[dependencies.mediarepo-core]
path = "../mediarepo-core"
@ -28,4 +28,4 @@ features = ["serde"]
[dependencies.serde]
version = "1.0.136"
features = ["derive"]
features = ["derive"]

@ -0,0 +1,79 @@
use crate::jobs::{Job, JobTypeKey};
use mediarepo_core::tokio_graceful_shutdown::SubsystemHandle;
use mediarepo_core::typemap_rev::{TypeMap, TypeMapKey};
use mediarepo_logic::dao::repo::Repo;
use mediarepo_logic::dao::DaoProvider;
use std::cell::UnsafeCell;
use std::sync::Arc;
use tokio::sync::RwLock;
#[derive(Clone)]
pub struct JobDispatcher {
subsystem: Arc<UnsafeCell<SubsystemHandle>>,
job_status_map: Arc<RwLock<TypeMap>>,
repo: Arc<Repo>,
}
impl JobDispatcher {
pub fn new(subsystem: SubsystemHandle, repo: Repo) -> Self {
Self {
job_status_map: Default::default(),
subsystem: Arc::new(UnsafeCell::new(subsystem)),
repo: Arc::new(repo),
}
}
#[tracing::instrument(level = "debug", skip_all)]
pub async fn dispatch<T: 'static + Job>(&self, job: T) -> Arc<RwLock<T::JobStatus>> {
let status = job.status();
self.add_status::<JobTypeKey<T>>(status.clone()).await;
let subsystem = unsafe {
// SAFETY: the subsystem requires a mutable borrow for the start method
// the implementation of start doesn't need that mutability. So until that's
// changed we have to do some trickery.
&mut *self.subsystem.get()
};
let repo = self.repo.clone();
subsystem.start("worker-job", |subsystem| async move {
let job_2 = job.clone();
let result = tokio::select! {
_ = subsystem.on_shutdown_requested() => {
job_2.save_status(repo.job()).await
}
r = job.run(repo.clone()) => {
if let Err(e) = r {
Err(e)
} else {
job.save_status(repo.job()).await
}
}
};
if let Err(e) = result {
tracing::error!("job failed with error: {}", e);
}
Ok(())
});
status
}
#[inline]
async fn add_status<T: TypeMapKey>(&self, status: T::Value) {
let mut status_map = self.job_status_map.write().await;
status_map.insert::<T>(status);
}
}
pub struct DispatcherKey;
impl TypeMapKey for DispatcherKey {
type Value = JobDispatcher;
}
unsafe impl Send for JobDispatcher {}
unsafe impl Sync for JobDispatcher {}

@ -1,18 +1,34 @@
mod vacuum;
use std::marker::PhantomData;
use std::sync::Arc;
pub use vacuum::*;
use crate::execution_state::JobExecutionState;
use crate::progress::{JobProgressUpdate, ProgressSender};
use crate::state_data::StateData;
use async_trait::async_trait;
use mediarepo_core::error::RepoResult;
use mediarepo_core::typemap_rev::TypeMapKey;
use mediarepo_logic::dao::job::JobDao;
use mediarepo_logic::dao::repo::Repo;
use tokio::sync::mpsc::Sender;
use tokio::sync::RwLock;
type EmptyStatus = Arc<RwLock<()>>;
#[async_trait]
pub trait ScheduledJob {
async fn set_state(&self, state: StateData) -> RepoResult<()>;
pub trait Job: Clone + Send + Sync {
type JobStatus: Send + Sync;
fn status(&self) -> Arc<RwLock<Self::JobStatus>>;
async fn run(&self, repo: Arc<Repo>) -> RepoResult<()>;
async fn save_status(&self, job_dao: JobDao) -> RepoResult<()>;
}
pub struct JobTypeKey<T: Job>(PhantomData<T>);
async fn run(&self, sender: &ProgressSender, repo: Repo) -> RepoResult<()>;
impl<T: 'static> TypeMapKey for JobTypeKey<T>
where
T: Job,
{
type Value = Arc<RwLock<T::JobStatus>>;
}

@ -1,29 +1,32 @@
use crate::execution_state::{ExecutionStateSynchronizer, JobExecutionState};
use crate::jobs::ScheduledJob;
use crate::progress::{JobProgressUpdate, ProgressSender};
use crate::state_data::StateData;
use crate::jobs::{EmptyStatus, Job};
use async_trait::async_trait;
use mediarepo_core::error::RepoResult;
use mediarepo_logic::dao::job::JobDao;
use mediarepo_logic::dao::repo::Repo;
use mediarepo_logic::dao::DaoProvider;
use std::sync::Arc;
use tokio::sync::mpsc::Sender;
use tokio::sync::RwLock;
#[derive(Default, Clone)]
pub struct VacuumJob;
#[async_trait]
impl ScheduledJob for VacuumJob {
async fn set_state(&self, _: StateData) -> RepoResult<()> {
Ok(())
impl Job for VacuumJob {
type JobStatus = ();
fn status(&self) -> Arc<RwLock<Self::JobStatus>> {
EmptyStatus::default()
}
async fn run(&self, sender: &ProgressSender, repo: Repo) -> RepoResult<()> {
sender.send_progress_percent(0.0);
#[tracing::instrument(level = "debug", skip_all)]
async fn run(&self, repo: Arc<Repo>) -> RepoResult<()> {
repo.job().vacuum().await?;
sender.send_progress_percent(1.0);
Ok(())
}
#[tracing::instrument(level = "debug", skip_all)]
async fn save_status(&self, _: JobDao) -> RepoResult<()> {
Ok(())
}
}

@ -1,6 +1,27 @@
pub mod execution_state;
use crate::job_dispatcher::JobDispatcher;
use crate::jobs::VacuumJob;
use mediarepo_core::error::RepoError;
use mediarepo_core::tokio_graceful_shutdown::Toplevel;
use mediarepo_logic::dao::repo::Repo;
use tokio::sync::oneshot::channel;
pub mod job_dispatcher;
pub mod jobs;
pub mod jobs_table;
pub mod progress;
pub mod scheduler;
pub mod state_data;
pub async fn start(top_level: Toplevel, repo: Repo) -> (Toplevel, JobDispatcher) {
let (tx, rx) = channel();
let top_level = top_level.start("mediarepo-worker", |subsystem| async move {
let dispatcher = JobDispatcher::new(subsystem, repo);
tx.send(dispatcher.clone())
.map_err(|_| RepoError::from("failed to send dispatcher"))?;
dispatcher.dispatch(VacuumJob::default()).await;
Ok(())
});
let receiver = rx
.await
.expect("failed to create background job dispatcher");
(top_level, receiver)
}

@ -1,5 +1,6 @@
use std::env;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use structopt::StructOpt;
@ -10,8 +11,12 @@ use mediarepo_core::error::RepoResult;
use mediarepo_core::fs::drop_file::DropFile;
use mediarepo_core::settings::{PathSettings, Settings};
use mediarepo_core::tokio_graceful_shutdown::{SubsystemHandle, Toplevel};
use mediarepo_core::type_keys::{RepoPathKey, SettingsKey};
use mediarepo_core::type_list::TypeList;
use mediarepo_logic::dao::repo::Repo;
use mediarepo_logic::type_keys::RepoKey;
use mediarepo_socket::start_tcp_server;
use mediarepo_worker::job_dispatcher::DispatcherKey;
use crate::utils::{create_paths_for_repo, get_repo, load_settings};
@ -99,18 +104,23 @@ async fn init_repo(opt: &Opt, paths: &PathSettings) -> RepoResult<Repo> {
/// Starts the server
async fn start_server(opt: Opt, settings: Settings) -> RepoResult<()> {
let repo = init_repo(&opt, &settings.paths).await?;
let mut top_level = Toplevel::new();
let (mut top_level, dispatcher) = mediarepo_worker::start(Toplevel::new(), repo.clone()).await;
let mut shared_data = TypeList::default();
shared_data.add::<RepoKey, _>(Arc::new(repo));
shared_data.add::<SettingsKey, _>(settings.clone());
shared_data.add::<RepoPathKey, _>(opt.repo.clone());
shared_data.add::<DispatcherKey, _>(dispatcher);
#[cfg(unix)]
{
if settings.server.unix_socket.enabled {
let settings = settings.clone();
let repo_path = opt.repo.clone();
let repo = repo.clone();
let shared_data = shared_data.clone();
top_level = top_level.start("mediarepo-unix-socket", |subsystem| {
Box::pin(async move {
start_and_await_unix_socket(subsystem, repo_path, settings, repo).await?;
start_and_await_unix_socket(subsystem, repo_path, shared_data).await?;
Ok(())
})
})
@ -120,7 +130,7 @@ async fn start_server(opt: Opt, settings: Settings) -> RepoResult<()> {
if settings.server.tcp.enabled {
top_level = top_level.start("mediarepo-tcp", move |subsystem| {
Box::pin(async move {
start_and_await_tcp_server(subsystem, opt.repo, settings, repo).await?;
start_and_await_tcp_server(subsystem, opt.repo, settings, shared_data).await?;
Ok(())
})
@ -147,9 +157,9 @@ async fn start_and_await_tcp_server(
subsystem: SubsystemHandle,
repo_path: PathBuf,
settings: Settings,
repo: Repo,
shared_data: TypeList,
) -> RepoResult<()> {
let (address, handle) = start_tcp_server(subsystem.clone(), repo_path.clone(), settings, repo)?;
let (address, handle) = start_tcp_server(subsystem.clone(), settings, shared_data)?;
let (mut file, _guard) = DropFile::new(repo_path.join("repo.tcp")).await?;
file.write_all(&address.into_bytes()).await?;
@ -172,17 +182,10 @@ async fn start_and_await_tcp_server(
async fn start_and_await_unix_socket(
subsystem: SubsystemHandle,
repo_path: PathBuf,
settings: Settings,
repo: Repo,
shared_data: TypeList,
) -> RepoResult<()> {
let socket_path = repo_path.join("repo.sock");
let handle = mediarepo_socket::create_unix_socket(
subsystem.clone(),
socket_path,
repo_path.clone(),
settings,
repo,
)?;
let handle = mediarepo_socket::create_unix_socket(subsystem.clone(), socket_path, shared_data)?;
let _guard = DropFile::from_path(repo_path.join("repo.sock"));
tokio::select! {

Loading…
Cancel
Save