Fix shutdown to close the repository instead of aborting completely

Signed-off-by: trivernis <trivernis@protonmail.com>
pull/4/head
trivernis 3 years ago
parent f77a45e963
commit 810f9986af

@ -79,6 +79,17 @@ version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6"
[[package]]
name = "async-recursion"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7d78656ba01f1b93024b7c3a0467f1608e4be67d725749fdcd7d2c7678fd7a2"
dependencies = [
"proc-macro2 1.0.35",
"quote 1.0.10",
"syn 1.0.84",
]
[[package]]
name = "async-stream"
version = "0.3.2"
@ -1171,8 +1182,8 @@ checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f"
[[package]]
name = "mediarepo-api"
version = "0.24.2"
source = "git+https://github.com/Trivernis/mediarepo-api.git?rev=7b210251f0986e3be060bcfd69cfddcec4e45466#7b210251f0986e3be060bcfd69cfddcec4e45466"
version = "0.26.0"
source = "git+https://github.com/Trivernis/mediarepo-api.git?rev=845874bafc253b6ed670594dbcf6d754709ac1e4#845874bafc253b6ed670594dbcf6d754709ac1e4"
dependencies = [
"bromine",
"chrono",
@ -1201,6 +1212,7 @@ dependencies = [
"thiserror",
"thumbnailer",
"tokio",
"tokio-graceful-shutdown",
"toml",
"tracing",
"typemap_rev",
@ -2216,6 +2228,15 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fdf1b9db47230893d76faad238fd6097fd6d6a9245cd7a4d90dbd639536bbd2"
[[package]]
name = "signal-hook-registry"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0"
dependencies = [
"libc",
]
[[package]]
name = "slab"
version = "0.4.5"
@ -2549,7 +2570,9 @@ dependencies = [
"memchr",
"mio",
"num_cpus",
"once_cell",
"pin-project-lite",
"signal-hook-registry",
"tokio-macros",
"tracing",
"winapi",
@ -2565,6 +2588,20 @@ dependencies = [
"futures 0.1.31",
]
[[package]]
name = "tokio-graceful-shutdown"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d08ebea7dc6b22273290d8ece2ca448f979f836e38ba629b650595c64204b4f2"
dependencies = [
"anyhow",
"async-recursion",
"futures 0.3.19",
"log",
"tokio",
"tokio-util",
]
[[package]]
name = "tokio-io-timeout"
version = "1.1.1"

@ -19,6 +19,7 @@ itertools = "^0.10.3"
glob = "^0.3.0"
tracing = "^0.1.29"
data-encoding = "^2.3.2"
tokio-graceful-shutdown = "^0.4.3"
[dependencies.thumbnailer]
version = "^0.2.4"
@ -43,7 +44,7 @@ features = ["toml"]
[dependencies.mediarepo-api]
git = "https://github.com/Trivernis/mediarepo-api.git"
rev = "7b210251f0986e3be060bcfd69cfddcec4e45466"
rev = "845874bafc253b6ed670594dbcf6d754709ac1e4"
features = ["bromine"]
[features]

@ -0,0 +1,35 @@
use std::io::Result;
use std::path::{Path, PathBuf};
use tokio::fs::{File, OpenOptions};
/// A file that only exists while being owned.
/// Will automatically be deleted on Drop
pub struct DropFile {
path: PathBuf,
}
impl DropFile {
pub async fn new<P: AsRef<Path>>(path: P) -> Result<(File, Self)> {
let file = OpenOptions::new()
.write(true)
.read(true)
.create(true)
.open(path.as_ref())
.await?;
Ok((file, Self::from_path(path)))
}
pub fn from_path<P: AsRef<Path>>(path: P) -> Self {
Self {
path: path.as_ref().to_path_buf(),
}
}
}
impl Drop for DropFile {
fn drop(&mut self) {
if let Err(e) = std::fs::remove_file(&self.path) {
tracing::error!("failed to remove drop file '{}'", e);
}
}
}

@ -1,2 +1,3 @@
pub mod drop_file;
pub mod file_hash_store;
pub mod thumbnail_store;

@ -3,6 +3,7 @@ pub use itertools;
pub use mediarepo_api;
pub use mediarepo_api::bromine;
pub use thumbnailer;
pub use tokio_graceful_shutdown;
pub mod content_descriptor;
pub mod context;

@ -30,7 +30,7 @@ impl Settings {
FileFormat::Toml,
))?
.merge(config::File::from(root.join("repo")))?
.merge(config::Environment::with_prefix("MEDIAREPO"))?;
.merge(config::Environment::with_prefix("MEDIAREPO").separator("."))?;
tracing::debug!("Settings are: {:#?}", settings);
Ok(settings.try_into::<Settings>()?)

@ -2,6 +2,7 @@ use crate::settings::Settings;
use mediarepo_api::types::repo::SizeType;
use std::collections::HashMap;
use std::path::PathBuf;
use tokio_graceful_shutdown::SubsystemHandle;
use typemap_rev::TypeMapKey;
pub struct SettingsKey;
@ -21,3 +22,9 @@ pub struct SizeMetadataKey;
impl TypeMapKey for SizeMetadataKey {
type Value = HashMap<SizeType, u64>;
}
pub struct SubsystemKey;
impl TypeMapKey for SubsystemKey {
type Value = SubsystemHandle;
}

@ -1,6 +1,7 @@
use mediarepo_core::error::RepoDatabaseResult;
use sea_orm::{Database, DatabaseConnection};
use sea_orm::{ConnectOptions, Database, DatabaseConnection};
use sqlx::migrate::MigrateDatabase;
use std::time::Duration;
pub mod entities;
pub mod queries;
@ -8,7 +9,11 @@ pub mod queries;
/// Connects to the database, runs migrations and returns the RepoDatabase wrapper type
pub async fn get_database<S: AsRef<str>>(uri: S) -> RepoDatabaseResult<DatabaseConnection> {
migrate(uri.as_ref()).await?;
let conn = Database::connect(uri.as_ref()).await?;
let mut opt = ConnectOptions::new(uri.as_ref().to_string());
opt.connect_timeout(Duration::from_secs(10))
.idle_timeout(Duration::from_secs(10));
let conn = Database::connect(opt).await?;
Ok(conn)
}

@ -2,7 +2,8 @@ use mediarepo_core::bromine::prelude::*;
use mediarepo_core::error::{RepoError, RepoResult};
use mediarepo_core::mediarepo_api::types::misc::InfoResponse;
use mediarepo_core::settings::{PortSetting, Settings};
use mediarepo_core::type_keys::{RepoPathKey, SettingsKey, SizeMetadataKey};
use mediarepo_core::tokio_graceful_shutdown::SubsystemHandle;
use mediarepo_core::type_keys::{RepoPathKey, SettingsKey, SizeMetadataKey, SubsystemKey};
use mediarepo_model::repo::Repo;
use mediarepo_model::type_keys::RepoKey;
use std::net::SocketAddr;
@ -15,8 +16,9 @@ mod from_model;
mod namespaces;
mod utils;
#[tracing::instrument(skip(settings, repo))]
#[tracing::instrument(skip(subsystem, settings, repo))]
pub fn start_tcp_server(
subsystem: SubsystemHandle,
repo_path: PathBuf,
settings: Settings,
repo: Repo,
@ -40,6 +42,7 @@ pub fn start_tcp_server(
.name("mediarepo_tcp::listen")
.spawn(async move {
get_builder::<TcpListener>(address)
.insert::<SubsystemKey>(subsystem)
.insert::<RepoKey>(Arc::new(repo))
.insert::<SettingsKey>(settings)
.insert::<RepoPathKey>(repo_path)
@ -53,8 +56,9 @@ pub fn start_tcp_server(
}
#[cfg(unix)]
#[tracing::instrument(skip(settings, repo))]
#[tracing::instrument(skip(subsystem, settings, repo))]
pub fn create_unix_socket(
subsystem: SubsystemHandle,
path: std::path::PathBuf,
repo_path: PathBuf,
settings: Settings,
@ -70,6 +74,7 @@ pub fn create_unix_socket(
.name("mediarepo_unix_socket::listen")
.spawn(async move {
get_builder::<UnixListener>(path)
.insert::<SubsystemKey>(subsystem)
.insert::<RepoKey>(Arc::new(repo))
.insert::<SettingsKey>(settings)
.insert::<RepoPathKey>(repo_path)
@ -83,7 +88,9 @@ pub fn create_unix_socket(
}
fn get_builder<L: AsyncStreamProtocolListener>(address: L::AddressType) -> IPCBuilder<L> {
namespaces::build_namespaces(IPCBuilder::new().address(address)).on("info", callback!(info))
namespaces::build_namespaces(IPCBuilder::new().address(address))
.on("info", callback!(info))
.on("shutdown", callback!(shutdown))
}
#[tracing::instrument(skip_all)]
@ -96,3 +103,17 @@ async fn info(ctx: &Context, _: Event) -> IPCResult<()> {
Ok(())
}
#[tracing::instrument(skip_all)]
async fn shutdown(ctx: &Context, _: Event) -> IPCResult<()> {
ctx.clone().stop().await?;
{
let data = ctx.data.read().await;
let subsystem = data.get::<SubsystemKey>().unwrap();
subsystem.request_shutdown();
subsystem.on_shutdown_requested().await;
}
ctx.emit("shutdown", ()).await?;
Ok(())
}

@ -1,15 +1,18 @@
mod searching;
mod sorting;
use crate::from_model::FromModel;
use crate::utils::{cd_by_identifier, file_by_identifier, get_repo_from_context};
use chrono::NaiveDateTime;
use compare::Compare;
use mediarepo_core::bromine::prelude::*;
use mediarepo_core::fs::thumbnail_store::Dimensions;
use mediarepo_core::itertools::Itertools;
use mediarepo_core::mediarepo_api::types::files::{
AddFileRequestHeader, FileBasicDataResponse, FileMetadataResponse, FilterExpression,
FindFilesRequest, GetFileThumbnailOfSizeRequest, GetFileThumbnailsRequest, ReadFileRequest,
SortDirection, SortKey, ThumbnailMetadataResponse, UpdateFileNameRequest,
AddFileRequestHeader, FileBasicDataResponse, FileMetadataResponse,
GetFileThumbnailOfSizeRequest, GetFileThumbnailsRequest, ReadFileRequest,
ThumbnailMetadataResponse, UpdateFileNameRequest,
};
use mediarepo_core::mediarepo_api::types::filtering::{FilterExpression, FindFilesRequest};
use mediarepo_core::mediarepo_api::types::identifier::FileIdentifier;
use mediarepo_core::thumbnailer::ThumbnailSize;
use mediarepo_core::utils::parse_namespace_and_tag;
@ -18,7 +21,6 @@ use mediarepo_database::queries::tags::{
};
use mediarepo_model::file_metadata::FileMetadata;
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use std::cmp::Ordering;
use std::collections::HashMap;
use std::iter::FromIterator;
use tokio::io::AsyncReadExt;
@ -177,7 +179,7 @@ impl FilesNamespace {
tracing::debug!("sort_expression = {:?}", sort_expression);
files.sort_by(|a, b| {
compare_files(
sorting::compare_files(
contexts.get(&a.id()).unwrap(),
contexts.get(&b.id()).unwrap(),
&sort_expression,
@ -345,113 +347,3 @@ impl FilesNamespace {
Ok(())
}
}
#[tracing::instrument(level = "trace", skip_all)]
fn compare_files(
ctx_a: &FileSortContext,
ctx_b: &FileSortContext,
expression: &Vec<SortKey>,
) -> Ordering {
let cmp_date = compare::natural();
let cmp_u64 = compare::natural();
let cmp_u32 = compare::natural();
for sort_key in expression {
let ordering = match sort_key {
SortKey::Namespace(namespace) => {
let list_a = ctx_a.namespaces.get(&namespace.name);
let list_b = ctx_b.namespaces.get(&namespace.name);
let cmp_result = if let (Some(list_a), Some(list_b)) = (list_a, list_b) {
compare_tag_lists(list_a, list_b)
} else if list_a.is_some() {
Ordering::Greater
} else if list_b.is_some() {
Ordering::Less
} else {
Ordering::Equal
};
adjust_for_dir(cmp_result, &namespace.direction)
}
SortKey::FileName(direction) => {
adjust_for_dir(compare_opts(&ctx_a.name, &ctx_b.name), direction)
}
SortKey::FileSize(direction) => {
adjust_for_dir(cmp_u64.compare(&ctx_a.size, &ctx_b.size), direction)
}
SortKey::FileImportedTime(direction) => adjust_for_dir(
cmp_date.compare(&ctx_a.import_time, &ctx_b.import_time),
direction,
),
SortKey::FileCreatedTime(direction) => adjust_for_dir(
cmp_date.compare(&ctx_a.create_time, &ctx_b.create_time),
direction,
),
SortKey::FileChangeTime(direction) => adjust_for_dir(
cmp_date.compare(&ctx_a.change_time, &ctx_b.change_time),
direction,
),
SortKey::FileType(direction) => {
adjust_for_dir(ctx_a.mime_type.cmp(&ctx_b.mime_type), direction)
}
SortKey::NumTags(direction) => adjust_for_dir(
cmp_u32.compare(&ctx_a.tag_count, &ctx_b.tag_count),
direction,
),
};
if !ordering.is_eq() {
return ordering;
}
}
Ordering::Equal
}
fn compare_opts<T: Ord + Sized>(opt_a: &Option<T>, opt_b: &Option<T>) -> Ordering {
let cmp = compare::natural();
if let (Some(a), Some(b)) = (opt_a, opt_b) {
cmp.compare(a, b)
} else if opt_a.is_some() {
Ordering::Greater
} else if opt_b.is_some() {
Ordering::Less
} else {
Ordering::Equal
}
}
fn compare_f32(a: f32, b: f32) -> Ordering {
if a > b {
Ordering::Greater
} else if b > a {
Ordering::Less
} else {
Ordering::Equal
}
}
fn adjust_for_dir(ordering: Ordering, direction: &SortDirection) -> Ordering {
if *direction == SortDirection::Descending {
ordering.reverse()
} else {
ordering
}
}
fn compare_tag_lists(list_a: &Vec<String>, list_b: &Vec<String>) -> Ordering {
let first_diff = list_a
.into_iter()
.zip(list_b.into_iter())
.find(|(a, b)| *a != *b);
if let Some(diff) = first_diff {
if let (Some(num_a), Some(num_b)) = (diff.0.parse::<f32>().ok(), diff.1.parse::<f32>().ok())
{
compare_f32(num_a, num_b)
} else {
let cmp = compare::natural();
cmp.compare(diff.0, diff.1)
}
} else {
Ordering::Equal
}
}

@ -0,0 +1,114 @@
use crate::namespaces::files::FileSortContext;
use compare::Compare;
use mediarepo_core::mediarepo_api::types::filtering::{SortDirection, SortKey};
use std::cmp::Ordering;
#[tracing::instrument(level = "trace", skip_all)]
pub fn compare_files(
ctx_a: &FileSortContext,
ctx_b: &FileSortContext,
expression: &Vec<SortKey>,
) -> Ordering {
let cmp_date = compare::natural();
let cmp_u64 = compare::natural();
let cmp_u32 = compare::natural();
for sort_key in expression {
let ordering = match sort_key {
SortKey::Namespace(namespace) => {
let list_a = ctx_a.namespaces.get(&namespace.name);
let list_b = ctx_b.namespaces.get(&namespace.name);
let cmp_result = if let (Some(list_a), Some(list_b)) = (list_a, list_b) {
compare_tag_lists(list_a, list_b)
} else if list_a.is_some() {
Ordering::Greater
} else if list_b.is_some() {
Ordering::Less
} else {
Ordering::Equal
};
adjust_for_dir(cmp_result, &namespace.direction)
}
SortKey::FileName(direction) => {
adjust_for_dir(compare_opts(&ctx_a.name, &ctx_b.name), direction)
}
SortKey::FileSize(direction) => {
adjust_for_dir(cmp_u64.compare(&ctx_a.size, &ctx_b.size), direction)
}
SortKey::FileImportedTime(direction) => adjust_for_dir(
cmp_date.compare(&ctx_a.import_time, &ctx_b.import_time),
direction,
),
SortKey::FileCreatedTime(direction) => adjust_for_dir(
cmp_date.compare(&ctx_a.create_time, &ctx_b.create_time),
direction,
),
SortKey::FileChangeTime(direction) => adjust_for_dir(
cmp_date.compare(&ctx_a.change_time, &ctx_b.change_time),
direction,
),
SortKey::FileType(direction) => {
adjust_for_dir(ctx_a.mime_type.cmp(&ctx_b.mime_type), direction)
}
SortKey::NumTags(direction) => adjust_for_dir(
cmp_u32.compare(&ctx_a.tag_count, &ctx_b.tag_count),
direction,
),
};
if !ordering.is_eq() {
return ordering;
}
}
Ordering::Equal
}
fn compare_opts<T: Ord + Sized>(opt_a: &Option<T>, opt_b: &Option<T>) -> Ordering {
let cmp = compare::natural();
if let (Some(a), Some(b)) = (opt_a, opt_b) {
cmp.compare(a, b)
} else if opt_a.is_some() {
Ordering::Greater
} else if opt_b.is_some() {
Ordering::Less
} else {
Ordering::Equal
}
}
fn compare_f32(a: f32, b: f32) -> Ordering {
if a > b {
Ordering::Greater
} else if b > a {
Ordering::Less
} else {
Ordering::Equal
}
}
fn adjust_for_dir(ordering: Ordering, direction: &SortDirection) -> Ordering {
if *direction == SortDirection::Descending {
ordering.reverse()
} else {
ordering
}
}
fn compare_tag_lists(list_a: &Vec<String>, list_b: &Vec<String>) -> Ordering {
let first_diff = list_a
.into_iter()
.zip(list_b.into_iter())
.find(|(a, b)| *a != *b);
if let Some(diff) = first_diff {
if let (Some(num_a), Some(num_b)) = (diff.0.parse::<f32>().ok(), diff.1.parse::<f32>().ok())
{
compare_f32(num_a, num_b)
} else {
let cmp = compare::natural();
cmp.compare(diff.0, diff.1)
}
} else {
Ordering::Equal
}
}

@ -6,11 +6,14 @@ use tokio::runtime;
use tokio::runtime::Runtime;
use mediarepo_core::error::RepoResult;
use mediarepo_core::futures;
use mediarepo_core::fs::drop_file::DropFile;
use mediarepo_core::settings::{PathSettings, Settings};
use mediarepo_core::tokio_graceful_shutdown::{SubsystemHandle, Toplevel};
use mediarepo_model::repo::Repo;
use mediarepo_socket::start_tcp_server;
use std::env;
use std::time::Duration;
use tokio::io::AsyncWriteExt;
use crate::utils::{create_paths_for_repo, get_repo, load_settings};
@ -56,6 +59,7 @@ fn main() -> RepoResult<()> {
} else {
Settings::default()
};
clean_old_connection_files(&opt.repo)?;
let mut guards = Vec::new();
if opt.profile {
@ -105,24 +109,103 @@ async fn init_repo(opt: &Opt, paths: &PathSettings) -> RepoResult<Repo> {
/// Starts the server
async fn start_server(opt: Opt, settings: Settings) -> RepoResult<()> {
let repo = init_repo(&opt, &settings.paths).await?;
let mut handles = Vec::new();
let mut top_level = Toplevel::new();
#[cfg(unix)]
{
let socket_path = opt.repo.join("repo.sock");
let handle = mediarepo_socket::create_unix_socket(
socket_path,
opt.repo.clone(),
settings.clone(),
repo.clone(),
)?;
handles.push(handle);
if settings.server.unix_socket.enabled {
let settings = settings.clone();
let repo_path = opt.repo.clone();
let repo = repo.clone();
top_level = top_level.start("mediarepo-unix-socket", |subsystem| {
Box::pin(async move {
start_and_await_unix_socket(subsystem, repo_path, settings, repo).await?;
Ok(())
})
})
}
}
if settings.server.tcp.enabled {
top_level = top_level.start("mediarepo-tcp", move |subsystem| {
Box::pin(async move {
start_and_await_tcp_server(subsystem, opt.repo, settings, repo).await?;
Ok(())
})
})
}
if let Err(e) = top_level
.catch_signals()
.handle_shutdown_requests(Duration::from_millis(1000))
.await
{
tracing::error!("an error occurred when running the servers {}", e);
}
tracing::warn!(
r"All servers quit.
Either they were requested to stop, a fatal error occurred or no servers are enabled in the config.
Stopping daemon..."
);
Ok(())
}
async fn start_and_await_tcp_server(
subsystem: SubsystemHandle,
repo_path: PathBuf,
settings: Settings,
repo: Repo,
) -> RepoResult<()> {
let (address, handle) = start_tcp_server(subsystem.clone(), repo_path.clone(), settings, repo)?;
let (mut file, _guard) = DropFile::new(repo_path.join("repo.tcp")).await?;
file.write_all(&address.into_bytes()).await?;
tokio::select! {
_ = subsystem.on_shutdown_requested() => {
tracing::info!("shutdown requested")
},
result = handle => {
if let Err(e) = result {
tracing::error!("the tcp server shut down with an error {}", e);
subsystem.request_shutdown();
}
}
}
let (address, tcp_handle) = start_tcp_server(opt.repo.clone(), settings, repo)?;
handles.push(tcp_handle);
fs::write(opt.repo.join("repo.tcp"), &address.into_bytes()).await?;
futures::future::join_all(handles.into_iter()).await;
Ok(())
}
#[cfg(unix)]
async fn start_and_await_unix_socket(
subsystem: SubsystemHandle,
repo_path: PathBuf,
settings: Settings,
repo: Repo,
) -> RepoResult<()> {
let socket_path = repo_path.join("repo.sock");
let handle = mediarepo_socket::create_unix_socket(
subsystem.clone(),
socket_path,
repo_path.clone(),
settings,
repo,
)?;
let _guard = DropFile::from_path(repo_path.join("repo.sock"));
tokio::select! {
_ = subsystem.on_shutdown_requested() => {
tracing::info!("shutdown requested")
},
result = handle => {
if let Err(e) = result {
tracing::error!("the unix socket shut down with an error {}", e);
subsystem.request_shutdown();
}
}
}
Ok(())
}
@ -153,3 +236,17 @@ async fn init(opt: Opt, force: bool) -> RepoResult<()> {
Ok(())
}
fn clean_old_connection_files(root: &PathBuf) -> RepoResult<()> {
let paths = ["repo.tcp", "repo.sock"];
for path in paths {
let path = root.join(path);
if path.exists() {
std::fs::remove_file(&path)?;
}
}
Ok(())
}

Loading…
Cancel
Save