From 810f9986af7269f8d5ec50e42abc6e1cd2f131b8 Mon Sep 17 00:00:00 2001 From: trivernis Date: Sun, 9 Jan 2022 20:28:38 +0100 Subject: [PATCH] Fix shutdown to close the repository instead of aborting completely Signed-off-by: trivernis --- mediarepo-daemon/Cargo.lock | 41 +++++- mediarepo-daemon/mediarepo-core/Cargo.toml | 3 +- .../mediarepo-core/src/fs/drop_file.rs | 35 +++++ mediarepo-daemon/mediarepo-core/src/fs/mod.rs | 1 + mediarepo-daemon/mediarepo-core/src/lib.rs | 1 + .../mediarepo-core/src/settings/mod.rs | 2 +- .../mediarepo-core/src/type_keys.rs | 7 + .../mediarepo-database/src/lib.rs | 9 +- mediarepo-daemon/mediarepo-socket/src/lib.rs | 29 +++- .../src/namespaces/{files.rs => files/mod.rs} | 124 ++--------------- .../src/namespaces/files/searching.rs | 0 .../src/namespaces/files/sorting.rs | 114 ++++++++++++++++ mediarepo-daemon/src/main.rs | 125 ++++++++++++++++-- 13 files changed, 351 insertions(+), 140 deletions(-) create mode 100644 mediarepo-daemon/mediarepo-core/src/fs/drop_file.rs rename mediarepo-daemon/mediarepo-socket/src/namespaces/{files.rs => files/mod.rs} (76%) create mode 100644 mediarepo-daemon/mediarepo-socket/src/namespaces/files/searching.rs create mode 100644 mediarepo-daemon/mediarepo-socket/src/namespaces/files/sorting.rs diff --git a/mediarepo-daemon/Cargo.lock b/mediarepo-daemon/Cargo.lock index 69ffb70..b02d088 100644 --- a/mediarepo-daemon/Cargo.lock +++ b/mediarepo-daemon/Cargo.lock @@ -79,6 +79,17 @@ version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6" +[[package]] +name = "async-recursion" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7d78656ba01f1b93024b7c3a0467f1608e4be67d725749fdcd7d2c7678fd7a2" +dependencies = [ + "proc-macro2 1.0.35", + "quote 1.0.10", + "syn 1.0.84", +] + [[package]] name = "async-stream" version = "0.3.2" @@ -1171,8 +1182,8 @@ checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f" [[package]] name = "mediarepo-api" -version = "0.24.2" -source = "git+https://github.com/Trivernis/mediarepo-api.git?rev=7b210251f0986e3be060bcfd69cfddcec4e45466#7b210251f0986e3be060bcfd69cfddcec4e45466" +version = "0.26.0" +source = "git+https://github.com/Trivernis/mediarepo-api.git?rev=845874bafc253b6ed670594dbcf6d754709ac1e4#845874bafc253b6ed670594dbcf6d754709ac1e4" dependencies = [ "bromine", "chrono", @@ -1201,6 +1212,7 @@ dependencies = [ "thiserror", "thumbnailer", "tokio", + "tokio-graceful-shutdown", "toml", "tracing", "typemap_rev", @@ -2216,6 +2228,15 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7fdf1b9db47230893d76faad238fd6097fd6d6a9245cd7a4d90dbd639536bbd2" +[[package]] +name = "signal-hook-registry" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0" +dependencies = [ + "libc", +] + [[package]] name = "slab" version = "0.4.5" @@ -2549,7 +2570,9 @@ dependencies = [ "memchr", "mio", "num_cpus", + "once_cell", "pin-project-lite", + "signal-hook-registry", "tokio-macros", "tracing", "winapi", @@ -2565,6 +2588,20 @@ dependencies = [ "futures 0.1.31", ] +[[package]] +name = "tokio-graceful-shutdown" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d08ebea7dc6b22273290d8ece2ca448f979f836e38ba629b650595c64204b4f2" +dependencies = [ + "anyhow", + "async-recursion", + "futures 0.3.19", + "log", + "tokio", + "tokio-util", +] + [[package]] name = "tokio-io-timeout" version = "1.1.1" diff --git a/mediarepo-daemon/mediarepo-core/Cargo.toml b/mediarepo-daemon/mediarepo-core/Cargo.toml index a626b3b..2037b70 100644 --- a/mediarepo-daemon/mediarepo-core/Cargo.toml +++ b/mediarepo-daemon/mediarepo-core/Cargo.toml @@ -19,6 +19,7 @@ itertools = "^0.10.3" glob = "^0.3.0" tracing = "^0.1.29" data-encoding = "^2.3.2" +tokio-graceful-shutdown = "^0.4.3" [dependencies.thumbnailer] version = "^0.2.4" @@ -43,7 +44,7 @@ features = ["toml"] [dependencies.mediarepo-api] git = "https://github.com/Trivernis/mediarepo-api.git" -rev = "7b210251f0986e3be060bcfd69cfddcec4e45466" +rev = "845874bafc253b6ed670594dbcf6d754709ac1e4" features = ["bromine"] [features] diff --git a/mediarepo-daemon/mediarepo-core/src/fs/drop_file.rs b/mediarepo-daemon/mediarepo-core/src/fs/drop_file.rs new file mode 100644 index 0000000..53fb266 --- /dev/null +++ b/mediarepo-daemon/mediarepo-core/src/fs/drop_file.rs @@ -0,0 +1,35 @@ +use std::io::Result; +use std::path::{Path, PathBuf}; +use tokio::fs::{File, OpenOptions}; + +/// A file that only exists while being owned. +/// Will automatically be deleted on Drop +pub struct DropFile { + path: PathBuf, +} + +impl DropFile { + pub async fn new>(path: P) -> Result<(File, Self)> { + let file = OpenOptions::new() + .write(true) + .read(true) + .create(true) + .open(path.as_ref()) + .await?; + Ok((file, Self::from_path(path))) + } + + pub fn from_path>(path: P) -> Self { + Self { + path: path.as_ref().to_path_buf(), + } + } +} + +impl Drop for DropFile { + fn drop(&mut self) { + if let Err(e) = std::fs::remove_file(&self.path) { + tracing::error!("failed to remove drop file '{}'", e); + } + } +} diff --git a/mediarepo-daemon/mediarepo-core/src/fs/mod.rs b/mediarepo-daemon/mediarepo-core/src/fs/mod.rs index 00afe5a..b781e12 100644 --- a/mediarepo-daemon/mediarepo-core/src/fs/mod.rs +++ b/mediarepo-daemon/mediarepo-core/src/fs/mod.rs @@ -1,2 +1,3 @@ +pub mod drop_file; pub mod file_hash_store; pub mod thumbnail_store; diff --git a/mediarepo-daemon/mediarepo-core/src/lib.rs b/mediarepo-daemon/mediarepo-core/src/lib.rs index 9296ec7..a33546e 100644 --- a/mediarepo-daemon/mediarepo-core/src/lib.rs +++ b/mediarepo-daemon/mediarepo-core/src/lib.rs @@ -3,6 +3,7 @@ pub use itertools; pub use mediarepo_api; pub use mediarepo_api::bromine; pub use thumbnailer; +pub use tokio_graceful_shutdown; pub mod content_descriptor; pub mod context; diff --git a/mediarepo-daemon/mediarepo-core/src/settings/mod.rs b/mediarepo-daemon/mediarepo-core/src/settings/mod.rs index c3fcd4a..329e32a 100644 --- a/mediarepo-daemon/mediarepo-core/src/settings/mod.rs +++ b/mediarepo-daemon/mediarepo-core/src/settings/mod.rs @@ -30,7 +30,7 @@ impl Settings { FileFormat::Toml, ))? .merge(config::File::from(root.join("repo")))? - .merge(config::Environment::with_prefix("MEDIAREPO"))?; + .merge(config::Environment::with_prefix("MEDIAREPO").separator("."))?; tracing::debug!("Settings are: {:#?}", settings); Ok(settings.try_into::()?) diff --git a/mediarepo-daemon/mediarepo-core/src/type_keys.rs b/mediarepo-daemon/mediarepo-core/src/type_keys.rs index 50c9a8a..5aa447d 100644 --- a/mediarepo-daemon/mediarepo-core/src/type_keys.rs +++ b/mediarepo-daemon/mediarepo-core/src/type_keys.rs @@ -2,6 +2,7 @@ use crate::settings::Settings; use mediarepo_api::types::repo::SizeType; use std::collections::HashMap; use std::path::PathBuf; +use tokio_graceful_shutdown::SubsystemHandle; use typemap_rev::TypeMapKey; pub struct SettingsKey; @@ -21,3 +22,9 @@ pub struct SizeMetadataKey; impl TypeMapKey for SizeMetadataKey { type Value = HashMap; } + +pub struct SubsystemKey; + +impl TypeMapKey for SubsystemKey { + type Value = SubsystemHandle; +} diff --git a/mediarepo-daemon/mediarepo-database/src/lib.rs b/mediarepo-daemon/mediarepo-database/src/lib.rs index e716e45..ea5fb18 100644 --- a/mediarepo-daemon/mediarepo-database/src/lib.rs +++ b/mediarepo-daemon/mediarepo-database/src/lib.rs @@ -1,6 +1,7 @@ use mediarepo_core::error::RepoDatabaseResult; -use sea_orm::{Database, DatabaseConnection}; +use sea_orm::{ConnectOptions, Database, DatabaseConnection}; use sqlx::migrate::MigrateDatabase; +use std::time::Duration; pub mod entities; pub mod queries; @@ -8,7 +9,11 @@ pub mod queries; /// Connects to the database, runs migrations and returns the RepoDatabase wrapper type pub async fn get_database>(uri: S) -> RepoDatabaseResult { migrate(uri.as_ref()).await?; - let conn = Database::connect(uri.as_ref()).await?; + let mut opt = ConnectOptions::new(uri.as_ref().to_string()); + opt.connect_timeout(Duration::from_secs(10)) + .idle_timeout(Duration::from_secs(10)); + + let conn = Database::connect(opt).await?; Ok(conn) } diff --git a/mediarepo-daemon/mediarepo-socket/src/lib.rs b/mediarepo-daemon/mediarepo-socket/src/lib.rs index 52ce779..50f6918 100644 --- a/mediarepo-daemon/mediarepo-socket/src/lib.rs +++ b/mediarepo-daemon/mediarepo-socket/src/lib.rs @@ -2,7 +2,8 @@ use mediarepo_core::bromine::prelude::*; use mediarepo_core::error::{RepoError, RepoResult}; use mediarepo_core::mediarepo_api::types::misc::InfoResponse; use mediarepo_core::settings::{PortSetting, Settings}; -use mediarepo_core::type_keys::{RepoPathKey, SettingsKey, SizeMetadataKey}; +use mediarepo_core::tokio_graceful_shutdown::SubsystemHandle; +use mediarepo_core::type_keys::{RepoPathKey, SettingsKey, SizeMetadataKey, SubsystemKey}; use mediarepo_model::repo::Repo; use mediarepo_model::type_keys::RepoKey; use std::net::SocketAddr; @@ -15,8 +16,9 @@ mod from_model; mod namespaces; mod utils; -#[tracing::instrument(skip(settings, repo))] +#[tracing::instrument(skip(subsystem, settings, repo))] pub fn start_tcp_server( + subsystem: SubsystemHandle, repo_path: PathBuf, settings: Settings, repo: Repo, @@ -40,6 +42,7 @@ pub fn start_tcp_server( .name("mediarepo_tcp::listen") .spawn(async move { get_builder::(address) + .insert::(subsystem) .insert::(Arc::new(repo)) .insert::(settings) .insert::(repo_path) @@ -53,8 +56,9 @@ pub fn start_tcp_server( } #[cfg(unix)] -#[tracing::instrument(skip(settings, repo))] +#[tracing::instrument(skip(subsystem, settings, repo))] pub fn create_unix_socket( + subsystem: SubsystemHandle, path: std::path::PathBuf, repo_path: PathBuf, settings: Settings, @@ -70,6 +74,7 @@ pub fn create_unix_socket( .name("mediarepo_unix_socket::listen") .spawn(async move { get_builder::(path) + .insert::(subsystem) .insert::(Arc::new(repo)) .insert::(settings) .insert::(repo_path) @@ -83,7 +88,9 @@ pub fn create_unix_socket( } fn get_builder(address: L::AddressType) -> IPCBuilder { - namespaces::build_namespaces(IPCBuilder::new().address(address)).on("info", callback!(info)) + namespaces::build_namespaces(IPCBuilder::new().address(address)) + .on("info", callback!(info)) + .on("shutdown", callback!(shutdown)) } #[tracing::instrument(skip_all)] @@ -96,3 +103,17 @@ async fn info(ctx: &Context, _: Event) -> IPCResult<()> { Ok(()) } + +#[tracing::instrument(skip_all)] +async fn shutdown(ctx: &Context, _: Event) -> IPCResult<()> { + ctx.clone().stop().await?; + { + let data = ctx.data.read().await; + let subsystem = data.get::().unwrap(); + subsystem.request_shutdown(); + subsystem.on_shutdown_requested().await; + } + ctx.emit("shutdown", ()).await?; + + Ok(()) +} diff --git a/mediarepo-daemon/mediarepo-socket/src/namespaces/files.rs b/mediarepo-daemon/mediarepo-socket/src/namespaces/files/mod.rs similarity index 76% rename from mediarepo-daemon/mediarepo-socket/src/namespaces/files.rs rename to mediarepo-daemon/mediarepo-socket/src/namespaces/files/mod.rs index ec5a4fe..7667d9c 100644 --- a/mediarepo-daemon/mediarepo-socket/src/namespaces/files.rs +++ b/mediarepo-daemon/mediarepo-socket/src/namespaces/files/mod.rs @@ -1,15 +1,18 @@ +mod searching; +mod sorting; + use crate::from_model::FromModel; use crate::utils::{cd_by_identifier, file_by_identifier, get_repo_from_context}; use chrono::NaiveDateTime; -use compare::Compare; use mediarepo_core::bromine::prelude::*; use mediarepo_core::fs::thumbnail_store::Dimensions; use mediarepo_core::itertools::Itertools; use mediarepo_core::mediarepo_api::types::files::{ - AddFileRequestHeader, FileBasicDataResponse, FileMetadataResponse, FilterExpression, - FindFilesRequest, GetFileThumbnailOfSizeRequest, GetFileThumbnailsRequest, ReadFileRequest, - SortDirection, SortKey, ThumbnailMetadataResponse, UpdateFileNameRequest, + AddFileRequestHeader, FileBasicDataResponse, FileMetadataResponse, + GetFileThumbnailOfSizeRequest, GetFileThumbnailsRequest, ReadFileRequest, + ThumbnailMetadataResponse, UpdateFileNameRequest, }; +use mediarepo_core::mediarepo_api::types::filtering::{FilterExpression, FindFilesRequest}; use mediarepo_core::mediarepo_api::types::identifier::FileIdentifier; use mediarepo_core::thumbnailer::ThumbnailSize; use mediarepo_core::utils::parse_namespace_and_tag; @@ -18,7 +21,6 @@ use mediarepo_database::queries::tags::{ }; use mediarepo_model::file_metadata::FileMetadata; use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; -use std::cmp::Ordering; use std::collections::HashMap; use std::iter::FromIterator; use tokio::io::AsyncReadExt; @@ -177,7 +179,7 @@ impl FilesNamespace { tracing::debug!("sort_expression = {:?}", sort_expression); files.sort_by(|a, b| { - compare_files( + sorting::compare_files( contexts.get(&a.id()).unwrap(), contexts.get(&b.id()).unwrap(), &sort_expression, @@ -345,113 +347,3 @@ impl FilesNamespace { Ok(()) } } - -#[tracing::instrument(level = "trace", skip_all)] -fn compare_files( - ctx_a: &FileSortContext, - ctx_b: &FileSortContext, - expression: &Vec, -) -> Ordering { - let cmp_date = compare::natural(); - let cmp_u64 = compare::natural(); - let cmp_u32 = compare::natural(); - - for sort_key in expression { - let ordering = match sort_key { - SortKey::Namespace(namespace) => { - let list_a = ctx_a.namespaces.get(&namespace.name); - let list_b = ctx_b.namespaces.get(&namespace.name); - - let cmp_result = if let (Some(list_a), Some(list_b)) = (list_a, list_b) { - compare_tag_lists(list_a, list_b) - } else if list_a.is_some() { - Ordering::Greater - } else if list_b.is_some() { - Ordering::Less - } else { - Ordering::Equal - }; - adjust_for_dir(cmp_result, &namespace.direction) - } - SortKey::FileName(direction) => { - adjust_for_dir(compare_opts(&ctx_a.name, &ctx_b.name), direction) - } - SortKey::FileSize(direction) => { - adjust_for_dir(cmp_u64.compare(&ctx_a.size, &ctx_b.size), direction) - } - SortKey::FileImportedTime(direction) => adjust_for_dir( - cmp_date.compare(&ctx_a.import_time, &ctx_b.import_time), - direction, - ), - SortKey::FileCreatedTime(direction) => adjust_for_dir( - cmp_date.compare(&ctx_a.create_time, &ctx_b.create_time), - direction, - ), - SortKey::FileChangeTime(direction) => adjust_for_dir( - cmp_date.compare(&ctx_a.change_time, &ctx_b.change_time), - direction, - ), - SortKey::FileType(direction) => { - adjust_for_dir(ctx_a.mime_type.cmp(&ctx_b.mime_type), direction) - } - SortKey::NumTags(direction) => adjust_for_dir( - cmp_u32.compare(&ctx_a.tag_count, &ctx_b.tag_count), - direction, - ), - }; - if !ordering.is_eq() { - return ordering; - } - } - - Ordering::Equal -} - -fn compare_opts(opt_a: &Option, opt_b: &Option) -> Ordering { - let cmp = compare::natural(); - if let (Some(a), Some(b)) = (opt_a, opt_b) { - cmp.compare(a, b) - } else if opt_a.is_some() { - Ordering::Greater - } else if opt_b.is_some() { - Ordering::Less - } else { - Ordering::Equal - } -} - -fn compare_f32(a: f32, b: f32) -> Ordering { - if a > b { - Ordering::Greater - } else if b > a { - Ordering::Less - } else { - Ordering::Equal - } -} - -fn adjust_for_dir(ordering: Ordering, direction: &SortDirection) -> Ordering { - if *direction == SortDirection::Descending { - ordering.reverse() - } else { - ordering - } -} - -fn compare_tag_lists(list_a: &Vec, list_b: &Vec) -> Ordering { - let first_diff = list_a - .into_iter() - .zip(list_b.into_iter()) - .find(|(a, b)| *a != *b); - if let Some(diff) = first_diff { - if let (Some(num_a), Some(num_b)) = (diff.0.parse::().ok(), diff.1.parse::().ok()) - { - compare_f32(num_a, num_b) - } else { - let cmp = compare::natural(); - cmp.compare(diff.0, diff.1) - } - } else { - Ordering::Equal - } -} diff --git a/mediarepo-daemon/mediarepo-socket/src/namespaces/files/searching.rs b/mediarepo-daemon/mediarepo-socket/src/namespaces/files/searching.rs new file mode 100644 index 0000000..e69de29 diff --git a/mediarepo-daemon/mediarepo-socket/src/namespaces/files/sorting.rs b/mediarepo-daemon/mediarepo-socket/src/namespaces/files/sorting.rs new file mode 100644 index 0000000..06ffc5a --- /dev/null +++ b/mediarepo-daemon/mediarepo-socket/src/namespaces/files/sorting.rs @@ -0,0 +1,114 @@ +use crate::namespaces::files::FileSortContext; +use compare::Compare; +use mediarepo_core::mediarepo_api::types::filtering::{SortDirection, SortKey}; +use std::cmp::Ordering; + +#[tracing::instrument(level = "trace", skip_all)] +pub fn compare_files( + ctx_a: &FileSortContext, + ctx_b: &FileSortContext, + expression: &Vec, +) -> Ordering { + let cmp_date = compare::natural(); + let cmp_u64 = compare::natural(); + let cmp_u32 = compare::natural(); + + for sort_key in expression { + let ordering = match sort_key { + SortKey::Namespace(namespace) => { + let list_a = ctx_a.namespaces.get(&namespace.name); + let list_b = ctx_b.namespaces.get(&namespace.name); + + let cmp_result = if let (Some(list_a), Some(list_b)) = (list_a, list_b) { + compare_tag_lists(list_a, list_b) + } else if list_a.is_some() { + Ordering::Greater + } else if list_b.is_some() { + Ordering::Less + } else { + Ordering::Equal + }; + adjust_for_dir(cmp_result, &namespace.direction) + } + SortKey::FileName(direction) => { + adjust_for_dir(compare_opts(&ctx_a.name, &ctx_b.name), direction) + } + SortKey::FileSize(direction) => { + adjust_for_dir(cmp_u64.compare(&ctx_a.size, &ctx_b.size), direction) + } + SortKey::FileImportedTime(direction) => adjust_for_dir( + cmp_date.compare(&ctx_a.import_time, &ctx_b.import_time), + direction, + ), + SortKey::FileCreatedTime(direction) => adjust_for_dir( + cmp_date.compare(&ctx_a.create_time, &ctx_b.create_time), + direction, + ), + SortKey::FileChangeTime(direction) => adjust_for_dir( + cmp_date.compare(&ctx_a.change_time, &ctx_b.change_time), + direction, + ), + SortKey::FileType(direction) => { + adjust_for_dir(ctx_a.mime_type.cmp(&ctx_b.mime_type), direction) + } + SortKey::NumTags(direction) => adjust_for_dir( + cmp_u32.compare(&ctx_a.tag_count, &ctx_b.tag_count), + direction, + ), + }; + if !ordering.is_eq() { + return ordering; + } + } + + Ordering::Equal +} + +fn compare_opts(opt_a: &Option, opt_b: &Option) -> Ordering { + let cmp = compare::natural(); + if let (Some(a), Some(b)) = (opt_a, opt_b) { + cmp.compare(a, b) + } else if opt_a.is_some() { + Ordering::Greater + } else if opt_b.is_some() { + Ordering::Less + } else { + Ordering::Equal + } +} + +fn compare_f32(a: f32, b: f32) -> Ordering { + if a > b { + Ordering::Greater + } else if b > a { + Ordering::Less + } else { + Ordering::Equal + } +} + +fn adjust_for_dir(ordering: Ordering, direction: &SortDirection) -> Ordering { + if *direction == SortDirection::Descending { + ordering.reverse() + } else { + ordering + } +} + +fn compare_tag_lists(list_a: &Vec, list_b: &Vec) -> Ordering { + let first_diff = list_a + .into_iter() + .zip(list_b.into_iter()) + .find(|(a, b)| *a != *b); + if let Some(diff) = first_diff { + if let (Some(num_a), Some(num_b)) = (diff.0.parse::().ok(), diff.1.parse::().ok()) + { + compare_f32(num_a, num_b) + } else { + let cmp = compare::natural(); + cmp.compare(diff.0, diff.1) + } + } else { + Ordering::Equal + } +} diff --git a/mediarepo-daemon/src/main.rs b/mediarepo-daemon/src/main.rs index aa83514..3e60d09 100644 --- a/mediarepo-daemon/src/main.rs +++ b/mediarepo-daemon/src/main.rs @@ -6,11 +6,14 @@ use tokio::runtime; use tokio::runtime::Runtime; use mediarepo_core::error::RepoResult; -use mediarepo_core::futures; +use mediarepo_core::fs::drop_file::DropFile; use mediarepo_core::settings::{PathSettings, Settings}; +use mediarepo_core::tokio_graceful_shutdown::{SubsystemHandle, Toplevel}; use mediarepo_model::repo::Repo; use mediarepo_socket::start_tcp_server; use std::env; +use std::time::Duration; +use tokio::io::AsyncWriteExt; use crate::utils::{create_paths_for_repo, get_repo, load_settings}; @@ -56,6 +59,7 @@ fn main() -> RepoResult<()> { } else { Settings::default() }; + clean_old_connection_files(&opt.repo)?; let mut guards = Vec::new(); if opt.profile { @@ -105,24 +109,103 @@ async fn init_repo(opt: &Opt, paths: &PathSettings) -> RepoResult { /// Starts the server async fn start_server(opt: Opt, settings: Settings) -> RepoResult<()> { let repo = init_repo(&opt, &settings.paths).await?; - let mut handles = Vec::new(); + let mut top_level = Toplevel::new(); #[cfg(unix)] { - let socket_path = opt.repo.join("repo.sock"); - let handle = mediarepo_socket::create_unix_socket( - socket_path, - opt.repo.clone(), - settings.clone(), - repo.clone(), - )?; - handles.push(handle); + if settings.server.unix_socket.enabled { + let settings = settings.clone(); + let repo_path = opt.repo.clone(); + let repo = repo.clone(); + + top_level = top_level.start("mediarepo-unix-socket", |subsystem| { + Box::pin(async move { + start_and_await_unix_socket(subsystem, repo_path, settings, repo).await?; + Ok(()) + }) + }) + } + } + + if settings.server.tcp.enabled { + top_level = top_level.start("mediarepo-tcp", move |subsystem| { + Box::pin(async move { + start_and_await_tcp_server(subsystem, opt.repo, settings, repo).await?; + + Ok(()) + }) + }) + } + if let Err(e) = top_level + .catch_signals() + .handle_shutdown_requests(Duration::from_millis(1000)) + .await + { + tracing::error!("an error occurred when running the servers {}", e); + } + + tracing::warn!( + r"All servers quit. + Either they were requested to stop, a fatal error occurred or no servers are enabled in the config. + Stopping daemon..." + ); + + Ok(()) +} + +async fn start_and_await_tcp_server( + subsystem: SubsystemHandle, + repo_path: PathBuf, + settings: Settings, + repo: Repo, +) -> RepoResult<()> { + let (address, handle) = start_tcp_server(subsystem.clone(), repo_path.clone(), settings, repo)?; + let (mut file, _guard) = DropFile::new(repo_path.join("repo.tcp")).await?; + file.write_all(&address.into_bytes()).await?; + + tokio::select! { + _ = subsystem.on_shutdown_requested() => { + tracing::info!("shutdown requested") + }, + result = handle => { + if let Err(e) = result { + tracing::error!("the tcp server shut down with an error {}", e); + subsystem.request_shutdown(); + } + } } - let (address, tcp_handle) = start_tcp_server(opt.repo.clone(), settings, repo)?; - handles.push(tcp_handle); - fs::write(opt.repo.join("repo.tcp"), &address.into_bytes()).await?; - futures::future::join_all(handles.into_iter()).await; + Ok(()) +} + +#[cfg(unix)] +async fn start_and_await_unix_socket( + subsystem: SubsystemHandle, + repo_path: PathBuf, + settings: Settings, + repo: Repo, +) -> RepoResult<()> { + let socket_path = repo_path.join("repo.sock"); + let handle = mediarepo_socket::create_unix_socket( + subsystem.clone(), + socket_path, + repo_path.clone(), + settings, + repo, + )?; + let _guard = DropFile::from_path(repo_path.join("repo.sock")); + + tokio::select! { + _ = subsystem.on_shutdown_requested() => { + tracing::info!("shutdown requested") + }, + result = handle => { + if let Err(e) = result { + tracing::error!("the unix socket shut down with an error {}", e); + subsystem.request_shutdown(); + } + } + } Ok(()) } @@ -153,3 +236,17 @@ async fn init(opt: Opt, force: bool) -> RepoResult<()> { Ok(()) } + +fn clean_old_connection_files(root: &PathBuf) -> RepoResult<()> { + let paths = ["repo.tcp", "repo.sock"]; + + for path in paths { + let path = root.join(path); + + if path.exists() { + std::fs::remove_file(&path)?; + } + } + + Ok(()) +}