From 9b2092270a7858e92dc56a1dc8ef2aa992a85c88 Mon Sep 17 00:00:00 2001 From: trivernis Date: Sun, 7 Nov 2021 16:35:02 +0100 Subject: [PATCH] Update rmp-ipc and add dynamic tcp ports Signed-off-by: trivernis --- mediarepo-daemon/Cargo.lock | 17 ++++++--- mediarepo-daemon/mediarepo-core/Cargo.lock | 6 ++-- mediarepo-daemon/mediarepo-core/Cargo.toml | 2 +- mediarepo-daemon/mediarepo-core/src/error.rs | 3 ++ .../mediarepo-core/src/settings.rs | 7 ++-- .../mediarepo-database/Cargo.lock | 6 ++-- mediarepo-daemon/mediarepo-model/Cargo.lock | 6 ++-- .../mediarepo-model/src/thumbnail.rs | 4 +++ mediarepo-daemon/mediarepo-socket/Cargo.lock | 17 ++++++--- mediarepo-daemon/mediarepo-socket/Cargo.toml | 3 +- .../mediarepo-socket/src/from_model.rs | 1 + mediarepo-daemon/mediarepo-socket/src/lib.rs | 36 +++++++++++++++++-- .../mediarepo-socket/src/namespaces/files.rs | 22 +++++++----- .../mediarepo-socket/src/namespaces/mod.rs | 3 +- .../mediarepo-socket/src/namespaces/tags.rs | 21 +++++++---- .../mediarepo-socket/src/utils.rs | 3 +- mediarepo-daemon/src/main.rs | 19 +++++----- 17 files changed, 130 insertions(+), 46 deletions(-) diff --git a/mediarepo-daemon/Cargo.lock b/mediarepo-daemon/Cargo.lock index 8ebebb0..d849ec2 100644 --- a/mediarepo-daemon/Cargo.lock +++ b/mediarepo-daemon/Cargo.lock @@ -849,8 +849,8 @@ checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f" [[package]] name = "mediarepo-api" -version = "0.4.2" -source = "git+https://github.com/Trivernis/mediarepo-api.git?rev=28b25e94eb2cdb8cec86e3e452081a649b8cd64e#28b25e94eb2cdb8cec86e3e452081a649b8cd64e" +version = "0.5.1" +source = "git+https://github.com/Trivernis/mediarepo-api.git?rev=17a7ade9a8112d3c8450ab6ea67c4f184d05744e#17a7ade9a8112d3c8450ab6ea67c4f184d05744e" dependencies = [ "chrono", "serde", @@ -934,6 +934,7 @@ dependencies = [ "mediarepo-core", "mediarepo-database", "mediarepo-model", + "port_check", "serde", "tokio", "tracing", @@ -1286,6 +1287,12 @@ dependencies = [ "miniz_oxide 0.3.7", ] +[[package]] +name = "port_check" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6519412c9e0d4be579b9f0618364d19cb434b324fc6ddb1b27b1e682c7105ed" + [[package]] name = "ppv-lite86" version = "0.2.14" @@ -1519,10 +1526,12 @@ dependencies = [ [[package]] name = "rmp-ipc" -version = "0.7.2" +version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f68b5ec0f51d53896979bb5364c10c6b0edf753b56570f1f2425b24ea6e85955" +checksum = "05b0a3f127316ca5ca832cb7e9e616641ffbab659c2cb2ab7210d60e7441f70f" dependencies = [ + "async-trait", + "byteorder", "lazy_static", "rmp-serde", "serde", diff --git a/mediarepo-daemon/mediarepo-core/Cargo.lock b/mediarepo-daemon/mediarepo-core/Cargo.lock index 4aadeb9..5db5c70 100644 --- a/mediarepo-daemon/mediarepo-core/Cargo.lock +++ b/mediarepo-daemon/mediarepo-core/Cargo.lock @@ -1061,10 +1061,12 @@ dependencies = [ [[package]] name = "rmp-ipc" -version = "0.7.2" +version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f68b5ec0f51d53896979bb5364c10c6b0edf753b56570f1f2425b24ea6e85955" +checksum = "05b0a3f127316ca5ca832cb7e9e616641ffbab659c2cb2ab7210d60e7441f70f" dependencies = [ + "async-trait", + "byteorder", "lazy_static", "rmp-serde", "serde", diff --git a/mediarepo-daemon/mediarepo-core/Cargo.toml b/mediarepo-daemon/mediarepo-core/Cargo.toml index fc92aa1..fe29c80 100644 --- a/mediarepo-daemon/mediarepo-core/Cargo.toml +++ b/mediarepo-daemon/mediarepo-core/Cargo.toml @@ -12,7 +12,7 @@ multibase = "0.9.1" base64 = "0.13.0" toml = "0.5.8" serde = "1.0.130" -rmp-ipc = "0.7.2" +rmp-ipc = "0.9.1" typemap_rev = "0.1.5" futures = "0.3.17" thumbnailer = "0.1.0" diff --git a/mediarepo-daemon/mediarepo-core/src/error.rs b/mediarepo-daemon/mediarepo-core/src/error.rs index baa544d..8217e32 100644 --- a/mediarepo-daemon/mediarepo-core/src/error.rs +++ b/mediarepo-daemon/mediarepo-core/src/error.rs @@ -31,6 +31,9 @@ pub enum RepoError { #[error(transparent)] Thumbnailer(#[from] thumbnailer::error::ThumbError), + + #[error("No free tcp port available")] + PortUnavailable, } #[derive(Error, Debug)] diff --git a/mediarepo-daemon/mediarepo-core/src/settings.rs b/mediarepo-daemon/mediarepo-core/src/settings.rs index 2d72071..423efcc 100644 --- a/mediarepo-daemon/mediarepo-core/src/settings.rs +++ b/mediarepo-daemon/mediarepo-core/src/settings.rs @@ -1,9 +1,11 @@ use crate::error::RepoResult; use serde::{Deserialize, Serialize}; +use std::net::IpAddr; #[derive(Clone, Debug, Deserialize, Serialize)] pub struct Settings { - pub listen_address: String, + pub listen_address: IpAddr, + pub port_range: (u16, u16), pub database_path: String, pub default_file_store: String, pub thumbnail_store: String, @@ -12,7 +14,8 @@ pub struct Settings { impl Default for Settings { fn default() -> Self { Self { - listen_address: "127.0.0.1:3425".to_string(), + listen_address: IpAddr::from([127, 0, 0, 1]), + port_range: (3400, 3500), database_path: "./db/repo.db".to_string(), default_file_store: "Main".to_string(), thumbnail_store: "Thumbnails".to_string(), diff --git a/mediarepo-daemon/mediarepo-database/Cargo.lock b/mediarepo-daemon/mediarepo-database/Cargo.lock index e8aa5d7..3cf3895 100644 --- a/mediarepo-daemon/mediarepo-database/Cargo.lock +++ b/mediarepo-daemon/mediarepo-database/Cargo.lock @@ -1295,10 +1295,12 @@ dependencies = [ [[package]] name = "rmp-ipc" -version = "0.7.2" +version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f68b5ec0f51d53896979bb5364c10c6b0edf753b56570f1f2425b24ea6e85955" +checksum = "05b0a3f127316ca5ca832cb7e9e616641ffbab659c2cb2ab7210d60e7441f70f" dependencies = [ + "async-trait", + "byteorder", "lazy_static", "rmp-serde", "serde", diff --git a/mediarepo-daemon/mediarepo-model/Cargo.lock b/mediarepo-daemon/mediarepo-model/Cargo.lock index 85a9e8d..896dba4 100644 --- a/mediarepo-daemon/mediarepo-model/Cargo.lock +++ b/mediarepo-daemon/mediarepo-model/Cargo.lock @@ -1321,10 +1321,12 @@ dependencies = [ [[package]] name = "rmp-ipc" -version = "0.7.2" +version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f68b5ec0f51d53896979bb5364c10c6b0edf753b56570f1f2425b24ea6e85955" +checksum = "05b0a3f127316ca5ca832cb7e9e616641ffbab659c2cb2ab7210d60e7441f70f" dependencies = [ + "async-trait", + "byteorder", "lazy_static", "rmp-serde", "serde", diff --git a/mediarepo-daemon/mediarepo-model/src/thumbnail.rs b/mediarepo-daemon/mediarepo-model/src/thumbnail.rs index 9c34871..746be5c 100644 --- a/mediarepo-daemon/mediarepo-model/src/thumbnail.rs +++ b/mediarepo-daemon/mediarepo-model/src/thumbnail.rs @@ -101,6 +101,10 @@ impl Thumbnail { self.model.id } + pub fn file_id(&self) -> i64 { + self.model.file_id + } + pub fn hash(&self) -> &String { &self.hash.value } diff --git a/mediarepo-daemon/mediarepo-socket/Cargo.lock b/mediarepo-daemon/mediarepo-socket/Cargo.lock index 25d539c..935dfe5 100644 --- a/mediarepo-daemon/mediarepo-socket/Cargo.lock +++ b/mediarepo-daemon/mediarepo-socket/Cargo.lock @@ -790,8 +790,8 @@ checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f" [[package]] name = "mediarepo-api" -version = "0.4.2" -source = "git+https://github.com/Trivernis/mediarepo-api.git?rev=28b25e94eb2cdb8cec86e3e452081a649b8cd64e#28b25e94eb2cdb8cec86e3e452081a649b8cd64e" +version = "0.5.1" +source = "git+https://github.com/Trivernis/mediarepo-api.git?rev=17a7ade9a8112d3c8450ab6ea67c4f184d05744e#17a7ade9a8112d3c8450ab6ea67c4f184d05744e" dependencies = [ "chrono", "serde", @@ -856,6 +856,7 @@ dependencies = [ "mediarepo-core", "mediarepo-database", "mediarepo-model", + "port_check", "serde", "tokio", "tracing", @@ -1208,6 +1209,12 @@ dependencies = [ "miniz_oxide 0.3.7", ] +[[package]] +name = "port_check" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6519412c9e0d4be579b9f0618364d19cb434b324fc6ddb1b27b1e682c7105ed" + [[package]] name = "ppv-lite86" version = "0.2.10" @@ -1417,10 +1424,12 @@ dependencies = [ [[package]] name = "rmp-ipc" -version = "0.7.2" +version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f68b5ec0f51d53896979bb5364c10c6b0edf753b56570f1f2425b24ea6e85955" +checksum = "05b0a3f127316ca5ca832cb7e9e616641ffbab659c2cb2ab7210d60e7441f70f" dependencies = [ + "async-trait", + "byteorder", "lazy_static", "rmp-serde", "serde", diff --git a/mediarepo-daemon/mediarepo-socket/Cargo.toml b/mediarepo-daemon/mediarepo-socket/Cargo.toml index 53f3d1a..3328ee4 100644 --- a/mediarepo-daemon/mediarepo-socket/Cargo.toml +++ b/mediarepo-daemon/mediarepo-socket/Cargo.toml @@ -9,6 +9,7 @@ edition = "2018" serde = "1.0.130" tracing = "0.1.29" compare = "0.1.0" +port_check = "0.1.5" [dependencies.mediarepo-core] path = "../mediarepo-core" @@ -33,4 +34,4 @@ features = ["tokio-executor"] [dependencies.mediarepo-api] git = "https://github.com/Trivernis/mediarepo-api.git" -rev = "28b25e94eb2cdb8cec86e3e452081a649b8cd64e" \ No newline at end of file +rev = "17a7ade9a8112d3c8450ab6ea67c4f184d05744e" \ No newline at end of file diff --git a/mediarepo-daemon/mediarepo-socket/src/from_model.rs b/mediarepo-daemon/mediarepo-socket/src/from_model.rs index f89a2ff..b1e28db 100644 --- a/mediarepo-daemon/mediarepo-socket/src/from_model.rs +++ b/mediarepo-daemon/mediarepo-socket/src/from_model.rs @@ -38,6 +38,7 @@ impl FromModel for ThumbnailMetadataResponse { fn from_model(model: Thumbnail) -> Self { Self { id: model.id(), + file_id: model.file_id(), hash: model.hash().to_owned(), height: model.height(), width: model.width(), diff --git a/mediarepo-daemon/mediarepo-socket/src/lib.rs b/mediarepo-daemon/mediarepo-socket/src/lib.rs index 0c37d74..aa49f57 100644 --- a/mediarepo-daemon/mediarepo-socket/src/lib.rs +++ b/mediarepo-daemon/mediarepo-socket/src/lib.rs @@ -1,16 +1,48 @@ use mediarepo_api::types::misc::InfoResponse; +use mediarepo_core::error::{RepoError, RepoResult}; use mediarepo_core::rmp_ipc::prelude::*; +use mediarepo_core::settings::Settings; +use mediarepo_core::type_keys::SettingsKey; +use mediarepo_model::repo::Repo; +use mediarepo_model::type_keys::RepoKey; +use std::net::{IpAddr, SocketAddr}; +use std::sync::Arc; +use tokio::net::TcpListener; +use tokio::task::JoinHandle; mod from_model; mod namespaces; mod utils; -pub fn get_builder(address: &str) -> IPCBuilder { +pub fn start_tcp_server( + ip: IpAddr, + port_range: (u16, u16), + settings: Settings, + repo: Repo, +) -> RepoResult<(String, JoinHandle<()>)> { + let port = port_check::free_local_port_in_range(port_range.0, port_range.1) + .ok_or_else(|| RepoError::PortUnavailable)?; + let address = SocketAddr::new(ip, port); + let address_string = address.to_string(); + + let join_handle = tokio::task::spawn(async move { + get_builder::(address) + .insert::(Arc::new(repo)) + .insert::(settings) + .build_server() + .await + .expect("Failed to start tcp server") + }); + + Ok((address_string, join_handle)) +} + +fn get_builder(address: L::AddressType) -> IPCBuilder { namespaces::build_namespaces(IPCBuilder::new().address(address)).on("info", callback!(info)) } #[tracing::instrument(skip_all)] -async fn info(ctx: &Context, event: Event) -> IPCResult<()> { +async fn info(ctx: &Context, event: Event) -> IPCResult<()> { let response = InfoResponse::new( env!("CARGO_PKG_NAME").to_string(), env!("CARGO_PKG_VERSION").to_string(), diff --git a/mediarepo-daemon/mediarepo-socket/src/namespaces/files.rs b/mediarepo-daemon/mediarepo-socket/src/namespaces/files.rs index fb6ffb4..6142a9d 100644 --- a/mediarepo-daemon/mediarepo-socket/src/namespaces/files.rs +++ b/mediarepo-daemon/mediarepo-socket/src/namespaces/files.rs @@ -21,7 +21,7 @@ impl NamespaceProvider for FilesNamespace { "files" } - fn register(handler: &mut EventHandler) { + fn register(handler: &mut EventHandler) { events!(handler, "all_files" => Self::all_files, "find_files" => Self::find_files, @@ -37,7 +37,7 @@ impl NamespaceProvider for FilesNamespace { impl FilesNamespace { /// Returns a list of all files #[tracing::instrument(skip_all)] - async fn all_files(ctx: &Context, event: Event) -> IPCResult<()> { + async fn all_files(ctx: &Context, event: Event) -> IPCResult<()> { let repo = get_repo_from_context(ctx).await; let files = repo.files().await?; @@ -55,7 +55,7 @@ impl FilesNamespace { /// Searches for files by tags #[tracing::instrument(skip_all)] - async fn find_files(ctx: &Context, event: Event) -> IPCResult<()> { + async fn find_files(ctx: &Context, event: Event) -> IPCResult<()> { let req = event.data::()?; let repo = get_repo_from_context(ctx).await; let tags = req.tags.into_iter().map(|t| (t.name, t.negate)).collect(); @@ -91,7 +91,7 @@ impl FilesNamespace { /// Adds a file to the repository #[tracing::instrument(skip_all)] - async fn add_file(ctx: &Context, event: Event) -> IPCResult<()> { + async fn add_file(ctx: &Context, event: Event) -> IPCResult<()> { let request = event.data::()?; let path = PathBuf::from(request.path); let repo = get_repo_from_context(ctx).await; @@ -111,7 +111,7 @@ impl FilesNamespace { /// Reads the binary contents of a file #[tracing::instrument(skip_all)] - async fn read_file(ctx: &Context, event: Event) -> IPCResult<()> { + async fn read_file(ctx: &Context, event: Event) -> IPCResult<()> { let request = event.data::()?; let repo = get_repo_from_context(ctx).await; @@ -129,7 +129,7 @@ impl FilesNamespace { /// Returns a list of available thumbnails of a file #[tracing::instrument(skip_all)] - async fn thumbnails(ctx: &Context, event: Event) -> IPCResult<()> { + async fn thumbnails(ctx: &Context, event: Event) -> IPCResult<()> { let request = event.data::()?; let repo = get_repo_from_context(ctx).await; let file = file_by_identifier(request.id, &repo).await?; @@ -155,7 +155,10 @@ impl FilesNamespace { /// Reads a thumbnail for the given thumbnail hash #[tracing::instrument(skip_all)] - async fn read_thumbnail(ctx: &Context, event: Event) -> IPCResult<()> { + async fn read_thumbnail( + ctx: &Context, + event: Event, + ) -> IPCResult<()> { let hash = event.data::()?; let repo = get_repo_from_context(ctx).await; let thumbnail = repo @@ -179,7 +182,10 @@ impl FilesNamespace { /// Updates the name of a file #[tracing::instrument(skip_all)] - async fn update_file_name(ctx: &Context, event: Event) -> IPCResult<()> { + async fn update_file_name( + ctx: &Context, + event: Event, + ) -> IPCResult<()> { let repo = get_repo_from_context(ctx).await; let request = event.data::()?; let mut file = file_by_identifier(request.file_id, &repo).await?; diff --git a/mediarepo-daemon/mediarepo-socket/src/namespaces/mod.rs b/mediarepo-daemon/mediarepo-socket/src/namespaces/mod.rs index bdd4c2d..5767e91 100644 --- a/mediarepo-daemon/mediarepo-socket/src/namespaces/mod.rs +++ b/mediarepo-daemon/mediarepo-socket/src/namespaces/mod.rs @@ -1,9 +1,10 @@ +use mediarepo_core::rmp_ipc::prelude::AsyncStreamProtocolListener; use mediarepo_core::rmp_ipc::{namespace, namespace::Namespace, IPCBuilder}; pub mod files; pub mod tags; -pub fn build_namespaces(builder: IPCBuilder) -> IPCBuilder { +pub fn build_namespaces(builder: IPCBuilder) -> IPCBuilder { builder .add_namespace(namespace!(files::FilesNamespace)) .add_namespace(namespace!(tags::TagsNamespace)) diff --git a/mediarepo-daemon/mediarepo-socket/src/namespaces/tags.rs b/mediarepo-daemon/mediarepo-socket/src/namespaces/tags.rs index b8e9922..b47ea7e 100644 --- a/mediarepo-daemon/mediarepo-socket/src/namespaces/tags.rs +++ b/mediarepo-daemon/mediarepo-socket/src/namespaces/tags.rs @@ -11,7 +11,7 @@ impl NamespaceProvider for TagsNamespace { "tags" } - fn register(handler: &mut EventHandler) { + fn register(handler: &mut EventHandler) { events!(handler, "all_tags" => Self::all_tags, "tags_for_file" => Self::tags_for_file, @@ -25,7 +25,7 @@ impl NamespaceProvider for TagsNamespace { impl TagsNamespace { /// Returns a list of all tags in the database #[tracing::instrument(skip_all)] - async fn all_tags(ctx: &Context, event: Event) -> IPCResult<()> { + async fn all_tags(ctx: &Context, event: Event) -> IPCResult<()> { let repo = get_repo_from_context(ctx).await; let tags: Vec = repo .tags() @@ -42,7 +42,10 @@ impl TagsNamespace { /// Returns all tags for a single file #[tracing::instrument(skip_all)] - async fn tags_for_file(ctx: &Context, event: Event) -> IPCResult<()> { + async fn tags_for_file( + ctx: &Context, + event: Event, + ) -> IPCResult<()> { let repo = get_repo_from_context(ctx).await; let request = event.data::()?; let file = file_by_identifier(request.id, &repo).await?; @@ -58,7 +61,10 @@ impl TagsNamespace { /// Returns all tags for a given list of file hashes #[tracing::instrument(skip_all)] - async fn tags_for_files(ctx: &Context, event: Event) -> IPCResult<()> { + async fn tags_for_files( + ctx: &Context, + event: Event, + ) -> IPCResult<()> { let repo = get_repo_from_context(ctx).await; let request = event.data::()?; let tag_responses: Vec = repo @@ -76,7 +82,7 @@ impl TagsNamespace { /// Creates all tags given as input or returns the existing tag #[tracing::instrument(skip_all)] - async fn create_tags(ctx: &Context, event: Event) -> IPCResult<()> { + async fn create_tags(ctx: &Context, event: Event) -> IPCResult<()> { let repo = get_repo_from_context(ctx).await; let tags = event.data::>()?; let mut created_tags = Vec::new(); @@ -99,7 +105,10 @@ impl TagsNamespace { /// Changes tags of a file /// it removes the tags from the removed list and adds the one from the add list #[tracing::instrument(skip_all)] - async fn change_file_tags(ctx: &Context, event: Event) -> IPCResult<()> { + async fn change_file_tags( + ctx: &Context, + event: Event, + ) -> IPCResult<()> { let repo = get_repo_from_context(ctx).await; let request = event.data::()?; let file = file_by_identifier(request.file_id, &repo).await?; diff --git a/mediarepo-daemon/mediarepo-socket/src/utils.rs b/mediarepo-daemon/mediarepo-socket/src/utils.rs index 6038554..6b2d759 100644 --- a/mediarepo-daemon/mediarepo-socket/src/utils.rs +++ b/mediarepo-daemon/mediarepo-socket/src/utils.rs @@ -1,12 +1,13 @@ use mediarepo_api::types::identifier::FileIdentifier; use mediarepo_core::error::{RepoError, RepoResult}; use mediarepo_core::rmp_ipc::ipc::context::Context; +use mediarepo_core::rmp_ipc::protocol::AsyncProtocolStream; use mediarepo_model::file::File; use mediarepo_model::repo::Repo; use mediarepo_model::type_keys::RepoKey; use std::sync::Arc; -pub async fn get_repo_from_context(ctx: &Context) -> Arc { +pub async fn get_repo_from_context(ctx: &Context) -> Arc { let data = ctx.data.read().await; let repo = data.get::().unwrap(); Arc::clone(repo) diff --git a/mediarepo-daemon/src/main.rs b/mediarepo-daemon/src/main.rs index 8b20fd4..f388332 100644 --- a/mediarepo-daemon/src/main.rs +++ b/mediarepo-daemon/src/main.rs @@ -1,5 +1,4 @@ use std::path::PathBuf; -use std::sync::Arc; use structopt::StructOpt; use tokio::fs; @@ -9,12 +8,10 @@ use tokio::runtime::Runtime; use mediarepo_core::error::RepoResult; use mediarepo_core::futures; use mediarepo_core::settings::Settings; -use mediarepo_core::type_keys::SettingsKey; use mediarepo_core::utils::parse_tags_file; use mediarepo_model::file::{File as RepoFile, File}; use mediarepo_model::repo::Repo; -use mediarepo_model::type_keys::RepoKey; -use mediarepo_socket::get_builder; +use mediarepo_socket::start_tcp_server; use num_integer::Integer; use std::env; @@ -124,12 +121,14 @@ async fn init_repo(opt: &Opt) -> RepoResult<(Settings, Repo)> { /// Starts the server async fn start_server(opt: Opt) -> RepoResult<()> { let (settings, repo) = init_repo(&opt).await?; - - get_builder(&settings.listen_address) - .insert::(settings) - .insert::(Arc::new(repo)) - .build_server() - .await?; + let (address, handle) = start_tcp_server( + settings.listen_address.clone(), + settings.port_range, + settings, + repo, + )?; + fs::write(opt.repo.join(".tcp"), &address.into_bytes()).await?; + handle.await.unwrap(); Ok(()) }