Update to latest bromine

Signed-off-by: trivernis <trivernis@protonmail.com>
feature/jobs
trivernis 3 years ago
parent 2f11c87395
commit 22f28a79d1
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

@ -20,9 +20,8 @@ url = { version = "2.2.2", optional = true }
pathsearch = { version = "0.2.0", optional = true } pathsearch = { version = "0.2.0", optional = true }
[dependencies.bromine] [dependencies.bromine]
version = "0.18.1" version = "0.19.0"
optional = true optional = true
git = "https://github.com/Trivernis/bromine"
features = ["serialize_bincode"] features = ["serialize_bincode"]
[dependencies.serde] [dependencies.serde]

@ -81,9 +81,9 @@ checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6"
[[package]] [[package]]
name = "async-recursion" name = "async-recursion"
version = "0.3.2" version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7d78656ba01f1b93024b7c3a0467f1608e4be67d725749fdcd7d2c7678fd7a2" checksum = "2cda8f4bcc10624c4e85bc66b3f452cca98cfa5ca002dc83a16aad2367641bea"
dependencies = [ dependencies = [
"proc-macro2 1.0.36", "proc-macro2 1.0.36",
"quote 1.0.15", "quote 1.0.15",
@ -270,8 +270,9 @@ dependencies = [
[[package]] [[package]]
name = "bromine" name = "bromine"
version = "0.18.3" version = "0.19.0"
source = "git+https://github.com/Trivernis/bromine#c2728a44ead210e1535bce5fc2d3979530700b96" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a05cd0cd5646e705df88816dcc36eaf4e21b940cea66f1e027970cd58e3dc897"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"bincode", "bincode",
@ -283,7 +284,7 @@ dependencies = [
"thiserror", "thiserror",
"tokio", "tokio",
"tracing", "tracing",
"typemap_rev", "trait-bound-typemap",
] ]
[[package]] [[package]]
@ -1318,7 +1319,7 @@ dependencies = [
"toml", "toml",
"tracing", "tracing",
"tracing-subscriber", "tracing-subscriber",
"typemap_rev", "trait-bound-typemap",
] ]
[[package]] [[package]]
@ -1485,6 +1486,12 @@ dependencies = [
"winapi", "winapi",
] ]
[[package]]
name = "multi-trait-object"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a54c9ed2b86c7927b63e7d51f8d7ed4e1f8513c8672828ca1a850ff9d32ab1c"
[[package]] [[package]]
name = "multibase" name = "multibase"
version = "0.9.1" version = "0.9.1"
@ -2871,16 +2878,16 @@ dependencies = [
[[package]] [[package]]
name = "tokio-graceful-shutdown" name = "tokio-graceful-shutdown"
version = "0.4.3" version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d08ebea7dc6b22273290d8ece2ca448f979f836e38ba629b650595c64204b4f2" checksum = "4f21c36e43c82d5f32302aff8ac9efb79e10db9538b0940ef69cce38a01614ae"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-recursion", "async-recursion",
"futures 0.3.21", "futures 0.3.21",
"log", "log",
"tokio", "tokio",
"tokio-util 0.6.9", "tokio-util 0.7.0",
] ]
[[package]] [[package]]
@ -3159,17 +3166,21 @@ dependencies = [
"tracing-serde", "tracing-serde",
] ]
[[package]]
name = "trait-bound-typemap"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3631df5ba73c0e41b1aa337df3bcca7f15219f042f8fec1100857bc1eb60c767"
dependencies = [
"multi-trait-object",
]
[[package]] [[package]]
name = "try-lock" name = "try-lock"
version = "0.2.3" version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642"
[[package]]
name = "typemap_rev"
version = "0.1.5"
source = "git+https://github.com/Trivernis/typemap_rev?rev=750c67bffe8024d2a47725daa473f068ad653fc4#750c67bffe8024d2a47725daa473f068ad653fc4"
[[package]] [[package]]
name = "typenum" name = "typenum"
version = "1.15.0" version = "1.15.0"

@ -18,14 +18,11 @@ itertools = "0.10.3"
glob = "0.3.0" glob = "0.3.0"
tracing = "0.1.32" tracing = "0.1.32"
data-encoding = "2.3.2" data-encoding = "2.3.2"
tokio-graceful-shutdown = "0.4.3" tokio-graceful-shutdown = "0.4.4"
thumbnailer = "0.4.0" thumbnailer = "0.4.0"
bincode = "1.3.3" bincode = "1.3.3"
tracing-subscriber = "0.3.9" tracing-subscriber = "0.3.9"
trait-bound-typemap = "0.3.3"
[dependencies.typemap_rev]
git = "https://github.com/Trivernis/typemap_rev"
rev = "750c67bffe8024d2a47725daa473f068ad653fc4"
[dependencies.sea-orm] [dependencies.sea-orm]
version = "0.6.0" version = "0.6.0"

@ -5,7 +5,7 @@ pub use mediarepo_api;
pub use mediarepo_api::bromine; pub use mediarepo_api::bromine;
pub use thumbnailer; pub use thumbnailer;
pub use tokio_graceful_shutdown; pub use tokio_graceful_shutdown;
pub use typemap_rev; pub use trait_bound_typemap;
pub mod content_descriptor; pub mod content_descriptor;
pub mod context; pub mod context;
@ -14,5 +14,4 @@ pub mod fs;
pub mod settings; pub mod settings;
pub mod tracing_layer_list; pub mod tracing_layer_list;
pub mod type_keys; pub mod type_keys;
pub mod type_list;
pub mod utils; pub mod utils;

@ -3,7 +3,7 @@ use std::path::PathBuf;
use mediarepo_api::types::repo::SizeType; use mediarepo_api::types::repo::SizeType;
use tokio_graceful_shutdown::SubsystemHandle; use tokio_graceful_shutdown::SubsystemHandle;
use typemap_rev::TypeMapKey; use trait_bound_typemap::TypeMapKey;
use crate::settings::Settings; use crate::settings::Settings;

@ -1,46 +0,0 @@
use std::any::{Any, TypeId};
use std::vec::IntoIter;
use typemap_rev::TypeMapKey;
pub trait CloneAny: Any + Send + Sync {
fn clone_any(&self) -> Box<dyn CloneAny>;
}
impl<T: Any + Clone + Send + Sync> CloneAny for T {
fn clone_any(&self) -> Box<dyn CloneAny> {
Box::new(self.clone())
}
}
impl Clone for Box<dyn CloneAny> {
fn clone(&self) -> Self {
(**self).clone_any()
}
}
#[derive(Default, Clone)]
pub struct TypeList(Vec<(TypeId, Box<dyn CloneAny>)>);
impl TypeList {
pub fn add<T: TypeMapKey<Value = C>, C: CloneAny>(&mut self, value: T::Value) {
self.0.push((TypeId::of::<T>(), Box::new(value)))
}
}
impl IntoIterator for TypeList {
type Item = (TypeId, Box<dyn Any + Send + Sync>);
type IntoIter = IntoIter<Self::Item>;
fn into_iter(self) -> Self::IntoIter {
self.0
.into_iter()
.map(|(t, v)| {
(t, unsafe {
// SAFETY: CloneAny requires types to be Any + Send + Sync (+ Clone)
std::mem::transmute::<Box<dyn CloneAny>, Box<dyn Any + Send + Sync>>(v)
})
})
.collect::<Vec<(TypeId, Box<dyn Any + Send + Sync>)>>()
.into_iter()
}
}

@ -1,7 +1,7 @@
use mediarepo_core::trait_bound_typemap::TypeMapKey;
use std::sync::Arc; use std::sync::Arc;
use crate::dao::repo::Repo; use crate::dao::repo::Repo;
use mediarepo_core::typemap_rev::TypeMapKey;
pub struct RepoKey; pub struct RepoKey;

@ -8,8 +8,8 @@ use mediarepo_core::error::{RepoError, RepoResult};
use mediarepo_core::mediarepo_api::types::misc::InfoResponse; use mediarepo_core::mediarepo_api::types::misc::InfoResponse;
use mediarepo_core::settings::{PortSetting, Settings}; use mediarepo_core::settings::{PortSetting, Settings};
use mediarepo_core::tokio_graceful_shutdown::SubsystemHandle; use mediarepo_core::tokio_graceful_shutdown::SubsystemHandle;
use mediarepo_core::trait_bound_typemap::{SendSyncTypeMap, TypeMap};
use mediarepo_core::type_keys::{SizeMetadataKey, SubsystemKey}; use mediarepo_core::type_keys::{SizeMetadataKey, SubsystemKey};
use mediarepo_core::type_list::TypeList;
mod from_model; mod from_model;
mod namespaces; mod namespaces;
@ -19,7 +19,7 @@ mod utils;
pub fn start_tcp_server( pub fn start_tcp_server(
subsystem: SubsystemHandle, subsystem: SubsystemHandle,
settings: Settings, settings: Settings,
shared_data: TypeList, shared_data: SendSyncTypeMap,
) -> RepoResult<(String, JoinHandle<()>)> { ) -> RepoResult<(String, JoinHandle<()>)> {
let port = match &settings.server.tcp.port { let port = match &settings.server.tcp.port {
PortSetting::Fixed(p) => { PortSetting::Fixed(p) => {
@ -56,7 +56,7 @@ pub fn start_tcp_server(
pub fn create_unix_socket( pub fn create_unix_socket(
subsystem: SubsystemHandle, subsystem: SubsystemHandle,
path: std::path::PathBuf, path: std::path::PathBuf,
shared_data: TypeList, shared_data: SendSyncTypeMap,
) -> RepoResult<JoinHandle<()>> { ) -> RepoResult<JoinHandle<()>> {
use std::fs; use std::fs;
use tokio::net::UnixListener; use tokio::net::UnixListener;

@ -1,3 +1,4 @@
use crate::TypeMap;
use mediarepo_core::bromine::prelude::*; use mediarepo_core::bromine::prelude::*;
use mediarepo_core::error::RepoResult; use mediarepo_core::error::RepoResult;
use mediarepo_core::mediarepo_api::types::jobs::{JobType, RunJobRequest}; use mediarepo_core::mediarepo_api::types::jobs::{JobType, RunJobRequest};

@ -2,6 +2,7 @@ use std::path::PathBuf;
use tokio::fs; use tokio::fs;
use crate::TypeMap;
use mediarepo_core::bromine::prelude::*; use mediarepo_core::bromine::prelude::*;
use mediarepo_core::mediarepo_api::types::repo::{ use mediarepo_core::mediarepo_api::types::repo::{
FrontendState, RepositoryMetadata, SizeMetadata, SizeType, FrontendState, RepositoryMetadata, SizeMetadata, SizeType,

@ -1,5 +1,6 @@
use std::sync::Arc; use std::sync::Arc;
use crate::TypeMap;
use mediarepo_core::bromine::ipc::context::Context; use mediarepo_core::bromine::ipc::context::Context;
use mediarepo_core::content_descriptor::decode_content_descriptor; use mediarepo_core::content_descriptor::decode_content_descriptor;
use mediarepo_core::error::{RepoError, RepoResult}; use mediarepo_core::error::{RepoError, RepoResult};

@ -1,9 +1,8 @@
use crate::jobs::{Job, JobTypeKey}; use crate::jobs::{Job, JobTypeKey};
use mediarepo_core::tokio_graceful_shutdown::SubsystemHandle; use mediarepo_core::tokio_graceful_shutdown::SubsystemHandle;
use mediarepo_core::typemap_rev::{TypeMap, TypeMapKey}; use mediarepo_core::trait_bound_typemap::{SendSyncTypeMap, TypeMap, TypeMapKey};
use mediarepo_logic::dao::repo::Repo; use mediarepo_logic::dao::repo::Repo;
use mediarepo_logic::dao::DaoProvider; use mediarepo_logic::dao::DaoProvider;
use std::cell::UnsafeCell;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use tokio::sync::RwLock; use tokio::sync::RwLock;
@ -11,16 +10,16 @@ use tokio::time::Instant;
#[derive(Clone)] #[derive(Clone)]
pub struct JobDispatcher { pub struct JobDispatcher {
subsystem: Arc<UnsafeCell<SubsystemHandle>>, subsystem: SubsystemHandle,
job_status_map: Arc<RwLock<TypeMap>>, job_status_map: Arc<RwLock<SendSyncTypeMap>>,
repo: Arc<Repo>, repo: Arc<Repo>,
} }
impl JobDispatcher { impl JobDispatcher {
pub fn new(subsystem: SubsystemHandle, repo: Repo) -> Self { pub fn new(subsystem: SubsystemHandle, repo: Repo) -> Self {
Self { Self {
job_status_map: Default::default(), job_status_map: Arc::new(RwLock::new(SendSyncTypeMap::new())),
subsystem: Arc::new(UnsafeCell::new(subsystem)), subsystem,
repo: Arc::new(repo), repo: Arc::new(repo),
} }
} }
@ -46,54 +45,51 @@ impl JobDispatcher {
let status = job.state(); let status = job.state();
self.add_status::<JobTypeKey<T>>(status.clone()).await; self.add_status::<JobTypeKey<T>>(status.clone()).await;
let subsystem = unsafe {
// SAFETY: the subsystem requires a mutable borrow for the start method
// the implementation of start doesn't need that mutability. So until that's
// changed we have to do some trickery.
&mut *self.subsystem.get()
};
let repo = self.repo.clone(); let repo = self.repo.clone();
subsystem.start("worker-job", move |subsystem| async move { self.subsystem
loop { .start("worker-job", move |subsystem| async move {
let start = Instant::now(); loop {
let job_2 = job.clone(); let start = Instant::now();
let result = tokio::select! { let job_2 = job.clone();
_ = subsystem.on_shutdown_requested() => { let result = tokio::select! {
job_2.save_state(repo.job()).await _ = subsystem.on_shutdown_requested() => {
} job_2.save_state(repo.job()).await
r = job.run(repo.clone()) => { }
r = job.run(repo.clone()) => {
if let Err(e) = r { if let Err(e) = r {
Err(e) Err(e)
} else { } else {
job.save_state(repo.job()).await job.save_state(repo.job()).await
}
} }
};
if let Err(e) = result {
tracing::error!("job failed with error: {}", e);
} }
}; if let Some(interval) = interval {
if let Err(e) = result { let sleep_duration = interval - start.elapsed();
tracing::error!("job failed with error: {}", e); tokio::select! {
} _ = tokio::time::sleep(sleep_duration) => {},
if let Some(interval) = interval { _ = subsystem.on_shutdown_requested() => {break}
let sleep_duration = interval - start.elapsed(); }
tokio::select! { } else {
_ = tokio::time::sleep(sleep_duration) => {}, break;
_ = subsystem.on_shutdown_requested() => {break}
} }
} else {
break;
} }
}
Ok(()) Ok(())
}); });
status status
} }
#[inline] #[inline]
async fn add_status<T: TypeMapKey>(&self, status: T::Value) { async fn add_status<T: TypeMapKey>(&self, status: T::Value)
where
<T as TypeMapKey>::Value: Send + Sync,
{
let mut status_map = self.job_status_map.write().await; let mut status_map = self.job_status_map.write().await;
status_map.insert::<T>(status); status_map.insert::<T>(status);
} }

@ -8,7 +8,7 @@ pub use vacuum::*;
use async_trait::async_trait; use async_trait::async_trait;
use mediarepo_core::error::RepoResult; use mediarepo_core::error::RepoResult;
use mediarepo_core::typemap_rev::TypeMapKey; use mediarepo_core::trait_bound_typemap::TypeMapKey;
use mediarepo_logic::dao::job::JobDao; use mediarepo_logic::dao::job::JobDao;
use mediarepo_logic::dao::repo::Repo; use mediarepo_logic::dao::repo::Repo;
use tokio::sync::RwLock; use tokio::sync::RwLock;

@ -1,4 +1,5 @@
use std::env; use std::env;
use std::iter::FromIterator;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
@ -11,8 +12,8 @@ use mediarepo_core::error::RepoResult;
use mediarepo_core::fs::drop_file::DropFile; use mediarepo_core::fs::drop_file::DropFile;
use mediarepo_core::settings::{PathSettings, Settings}; use mediarepo_core::settings::{PathSettings, Settings};
use mediarepo_core::tokio_graceful_shutdown::{SubsystemHandle, Toplevel}; use mediarepo_core::tokio_graceful_shutdown::{SubsystemHandle, Toplevel};
use mediarepo_core::trait_bound_typemap::{CloneSendSyncTypeMap, SendSyncTypeMap, TypeMap};
use mediarepo_core::type_keys::{RepoPathKey, SettingsKey}; use mediarepo_core::type_keys::{RepoPathKey, SettingsKey};
use mediarepo_core::type_list::TypeList;
use mediarepo_logic::dao::repo::Repo; use mediarepo_logic::dao::repo::Repo;
use mediarepo_logic::type_keys::RepoKey; use mediarepo_logic::type_keys::RepoKey;
use mediarepo_socket::start_tcp_server; use mediarepo_socket::start_tcp_server;
@ -106,11 +107,11 @@ async fn start_server(opt: Opt, settings: Settings) -> RepoResult<()> {
let repo = init_repo(&opt, &settings.paths).await?; let repo = init_repo(&opt, &settings.paths).await?;
let (mut top_level, dispatcher) = mediarepo_worker::start(Toplevel::new(), repo.clone()).await; let (mut top_level, dispatcher) = mediarepo_worker::start(Toplevel::new(), repo.clone()).await;
let mut shared_data = TypeList::default(); let mut shared_data = CloneSendSyncTypeMap::new();
shared_data.add::<RepoKey, _>(Arc::new(repo)); shared_data.insert::<RepoKey>(Arc::new(repo));
shared_data.add::<SettingsKey, _>(settings.clone()); shared_data.insert::<SettingsKey>(settings.clone());
shared_data.add::<RepoPathKey, _>(opt.repo.clone()); shared_data.insert::<RepoPathKey>(opt.repo.clone());
shared_data.add::<DispatcherKey, _>(dispatcher); shared_data.insert::<DispatcherKey>(dispatcher);
#[cfg(unix)] #[cfg(unix)]
{ {
@ -120,7 +121,12 @@ async fn start_server(opt: Opt, settings: Settings) -> RepoResult<()> {
top_level = top_level.start("mediarepo-unix-socket", |subsystem| { top_level = top_level.start("mediarepo-unix-socket", |subsystem| {
Box::pin(async move { Box::pin(async move {
start_and_await_unix_socket(subsystem, repo_path, shared_data).await?; start_and_await_unix_socket(
subsystem,
repo_path,
SendSyncTypeMap::from_iter(shared_data),
)
.await?;
Ok(()) Ok(())
}) })
}) })
@ -130,7 +136,13 @@ async fn start_server(opt: Opt, settings: Settings) -> RepoResult<()> {
if settings.server.tcp.enabled { if settings.server.tcp.enabled {
top_level = top_level.start("mediarepo-tcp", move |subsystem| { top_level = top_level.start("mediarepo-tcp", move |subsystem| {
Box::pin(async move { Box::pin(async move {
start_and_await_tcp_server(subsystem, opt.repo, settings, shared_data).await?; start_and_await_tcp_server(
subsystem,
opt.repo,
settings,
SendSyncTypeMap::from_iter(shared_data),
)
.await?;
Ok(()) Ok(())
}) })
@ -157,7 +169,7 @@ async fn start_and_await_tcp_server(
subsystem: SubsystemHandle, subsystem: SubsystemHandle,
repo_path: PathBuf, repo_path: PathBuf,
settings: Settings, settings: Settings,
shared_data: TypeList, shared_data: SendSyncTypeMap,
) -> RepoResult<()> { ) -> RepoResult<()> {
let (address, handle) = start_tcp_server(subsystem.clone(), settings, shared_data)?; let (address, handle) = start_tcp_server(subsystem.clone(), settings, shared_data)?;
let (mut file, _guard) = DropFile::new(repo_path.join("repo.tcp")).await?; let (mut file, _guard) = DropFile::new(repo_path.join("repo.tcp")).await?;
@ -182,7 +194,7 @@ async fn start_and_await_tcp_server(
async fn start_and_await_unix_socket( async fn start_and_await_unix_socket(
subsystem: SubsystemHandle, subsystem: SubsystemHandle,
repo_path: PathBuf, repo_path: PathBuf,
shared_data: TypeList, shared_data: SendSyncTypeMap,
) -> RepoResult<()> { ) -> RepoResult<()> {
let socket_path = repo_path.join("repo.sock"); let socket_path = repo_path.join("repo.sock");
let handle = mediarepo_socket::create_unix_socket(subsystem.clone(), socket_path, shared_data)?; let handle = mediarepo_socket::create_unix_socket(subsystem.clone(), socket_path, shared_data)?;

Loading…
Cancel
Save