Update rmp-ipc and add dynamic tcp ports

Signed-off-by: trivernis <trivernis@protonmail.com>
pull/4/head
trivernis 3 years ago
parent 2735033859
commit 9b2092270a

@ -849,8 +849,8 @@ checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f"
[[package]]
name = "mediarepo-api"
version = "0.4.2"
source = "git+https://github.com/Trivernis/mediarepo-api.git?rev=28b25e94eb2cdb8cec86e3e452081a649b8cd64e#28b25e94eb2cdb8cec86e3e452081a649b8cd64e"
version = "0.5.1"
source = "git+https://github.com/Trivernis/mediarepo-api.git?rev=17a7ade9a8112d3c8450ab6ea67c4f184d05744e#17a7ade9a8112d3c8450ab6ea67c4f184d05744e"
dependencies = [
"chrono",
"serde",
@ -934,6 +934,7 @@ dependencies = [
"mediarepo-core",
"mediarepo-database",
"mediarepo-model",
"port_check",
"serde",
"tokio",
"tracing",
@ -1286,6 +1287,12 @@ dependencies = [
"miniz_oxide 0.3.7",
]
[[package]]
name = "port_check"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f6519412c9e0d4be579b9f0618364d19cb434b324fc6ddb1b27b1e682c7105ed"
[[package]]
name = "ppv-lite86"
version = "0.2.14"
@ -1519,10 +1526,12 @@ dependencies = [
[[package]]
name = "rmp-ipc"
version = "0.7.2"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f68b5ec0f51d53896979bb5364c10c6b0edf753b56570f1f2425b24ea6e85955"
checksum = "05b0a3f127316ca5ca832cb7e9e616641ffbab659c2cb2ab7210d60e7441f70f"
dependencies = [
"async-trait",
"byteorder",
"lazy_static",
"rmp-serde",
"serde",

@ -1061,10 +1061,12 @@ dependencies = [
[[package]]
name = "rmp-ipc"
version = "0.7.2"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f68b5ec0f51d53896979bb5364c10c6b0edf753b56570f1f2425b24ea6e85955"
checksum = "05b0a3f127316ca5ca832cb7e9e616641ffbab659c2cb2ab7210d60e7441f70f"
dependencies = [
"async-trait",
"byteorder",
"lazy_static",
"rmp-serde",
"serde",

@ -12,7 +12,7 @@ multibase = "0.9.1"
base64 = "0.13.0"
toml = "0.5.8"
serde = "1.0.130"
rmp-ipc = "0.7.2"
rmp-ipc = "0.9.1"
typemap_rev = "0.1.5"
futures = "0.3.17"
thumbnailer = "0.1.0"

@ -31,6 +31,9 @@ pub enum RepoError {
#[error(transparent)]
Thumbnailer(#[from] thumbnailer::error::ThumbError),
#[error("No free tcp port available")]
PortUnavailable,
}
#[derive(Error, Debug)]

@ -1,9 +1,11 @@
use crate::error::RepoResult;
use serde::{Deserialize, Serialize};
use std::net::IpAddr;
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Settings {
pub listen_address: String,
pub listen_address: IpAddr,
pub port_range: (u16, u16),
pub database_path: String,
pub default_file_store: String,
pub thumbnail_store: String,
@ -12,7 +14,8 @@ pub struct Settings {
impl Default for Settings {
fn default() -> Self {
Self {
listen_address: "127.0.0.1:3425".to_string(),
listen_address: IpAddr::from([127, 0, 0, 1]),
port_range: (3400, 3500),
database_path: "./db/repo.db".to_string(),
default_file_store: "Main".to_string(),
thumbnail_store: "Thumbnails".to_string(),

@ -1295,10 +1295,12 @@ dependencies = [
[[package]]
name = "rmp-ipc"
version = "0.7.2"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f68b5ec0f51d53896979bb5364c10c6b0edf753b56570f1f2425b24ea6e85955"
checksum = "05b0a3f127316ca5ca832cb7e9e616641ffbab659c2cb2ab7210d60e7441f70f"
dependencies = [
"async-trait",
"byteorder",
"lazy_static",
"rmp-serde",
"serde",

@ -1321,10 +1321,12 @@ dependencies = [
[[package]]
name = "rmp-ipc"
version = "0.7.2"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f68b5ec0f51d53896979bb5364c10c6b0edf753b56570f1f2425b24ea6e85955"
checksum = "05b0a3f127316ca5ca832cb7e9e616641ffbab659c2cb2ab7210d60e7441f70f"
dependencies = [
"async-trait",
"byteorder",
"lazy_static",
"rmp-serde",
"serde",

@ -101,6 +101,10 @@ impl Thumbnail {
self.model.id
}
pub fn file_id(&self) -> i64 {
self.model.file_id
}
pub fn hash(&self) -> &String {
&self.hash.value
}

@ -790,8 +790,8 @@ checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f"
[[package]]
name = "mediarepo-api"
version = "0.4.2"
source = "git+https://github.com/Trivernis/mediarepo-api.git?rev=28b25e94eb2cdb8cec86e3e452081a649b8cd64e#28b25e94eb2cdb8cec86e3e452081a649b8cd64e"
version = "0.5.1"
source = "git+https://github.com/Trivernis/mediarepo-api.git?rev=17a7ade9a8112d3c8450ab6ea67c4f184d05744e#17a7ade9a8112d3c8450ab6ea67c4f184d05744e"
dependencies = [
"chrono",
"serde",
@ -856,6 +856,7 @@ dependencies = [
"mediarepo-core",
"mediarepo-database",
"mediarepo-model",
"port_check",
"serde",
"tokio",
"tracing",
@ -1208,6 +1209,12 @@ dependencies = [
"miniz_oxide 0.3.7",
]
[[package]]
name = "port_check"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f6519412c9e0d4be579b9f0618364d19cb434b324fc6ddb1b27b1e682c7105ed"
[[package]]
name = "ppv-lite86"
version = "0.2.10"
@ -1417,10 +1424,12 @@ dependencies = [
[[package]]
name = "rmp-ipc"
version = "0.7.2"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f68b5ec0f51d53896979bb5364c10c6b0edf753b56570f1f2425b24ea6e85955"
checksum = "05b0a3f127316ca5ca832cb7e9e616641ffbab659c2cb2ab7210d60e7441f70f"
dependencies = [
"async-trait",
"byteorder",
"lazy_static",
"rmp-serde",
"serde",

@ -9,6 +9,7 @@ edition = "2018"
serde = "1.0.130"
tracing = "0.1.29"
compare = "0.1.0"
port_check = "0.1.5"
[dependencies.mediarepo-core]
path = "../mediarepo-core"
@ -33,4 +34,4 @@ features = ["tokio-executor"]
[dependencies.mediarepo-api]
git = "https://github.com/Trivernis/mediarepo-api.git"
rev = "28b25e94eb2cdb8cec86e3e452081a649b8cd64e"
rev = "17a7ade9a8112d3c8450ab6ea67c4f184d05744e"

@ -38,6 +38,7 @@ impl FromModel<Thumbnail> for ThumbnailMetadataResponse {
fn from_model(model: Thumbnail) -> Self {
Self {
id: model.id(),
file_id: model.file_id(),
hash: model.hash().to_owned(),
height: model.height(),
width: model.width(),

@ -1,16 +1,48 @@
use mediarepo_api::types::misc::InfoResponse;
use mediarepo_core::error::{RepoError, RepoResult};
use mediarepo_core::rmp_ipc::prelude::*;
use mediarepo_core::settings::Settings;
use mediarepo_core::type_keys::SettingsKey;
use mediarepo_model::repo::Repo;
use mediarepo_model::type_keys::RepoKey;
use std::net::{IpAddr, SocketAddr};
use std::sync::Arc;
use tokio::net::TcpListener;
use tokio::task::JoinHandle;
mod from_model;
mod namespaces;
mod utils;
pub fn get_builder(address: &str) -> IPCBuilder {
pub fn start_tcp_server(
ip: IpAddr,
port_range: (u16, u16),
settings: Settings,
repo: Repo,
) -> RepoResult<(String, JoinHandle<()>)> {
let port = port_check::free_local_port_in_range(port_range.0, port_range.1)
.ok_or_else(|| RepoError::PortUnavailable)?;
let address = SocketAddr::new(ip, port);
let address_string = address.to_string();
let join_handle = tokio::task::spawn(async move {
get_builder::<TcpListener>(address)
.insert::<RepoKey>(Arc::new(repo))
.insert::<SettingsKey>(settings)
.build_server()
.await
.expect("Failed to start tcp server")
});
Ok((address_string, join_handle))
}
fn get_builder<L: AsyncStreamProtocolListener>(address: L::AddressType) -> IPCBuilder<L> {
namespaces::build_namespaces(IPCBuilder::new().address(address)).on("info", callback!(info))
}
#[tracing::instrument(skip_all)]
async fn info(ctx: &Context, event: Event) -> IPCResult<()> {
async fn info<S: AsyncProtocolStream>(ctx: &Context<S>, event: Event) -> IPCResult<()> {
let response = InfoResponse::new(
env!("CARGO_PKG_NAME").to_string(),
env!("CARGO_PKG_VERSION").to_string(),

@ -21,7 +21,7 @@ impl NamespaceProvider for FilesNamespace {
"files"
}
fn register(handler: &mut EventHandler) {
fn register<S: AsyncProtocolStream>(handler: &mut EventHandler<S>) {
events!(handler,
"all_files" => Self::all_files,
"find_files" => Self::find_files,
@ -37,7 +37,7 @@ impl NamespaceProvider for FilesNamespace {
impl FilesNamespace {
/// Returns a list of all files
#[tracing::instrument(skip_all)]
async fn all_files(ctx: &Context, event: Event) -> IPCResult<()> {
async fn all_files<S: AsyncProtocolStream>(ctx: &Context<S>, event: Event) -> IPCResult<()> {
let repo = get_repo_from_context(ctx).await;
let files = repo.files().await?;
@ -55,7 +55,7 @@ impl FilesNamespace {
/// Searches for files by tags
#[tracing::instrument(skip_all)]
async fn find_files(ctx: &Context, event: Event) -> IPCResult<()> {
async fn find_files<S: AsyncProtocolStream>(ctx: &Context<S>, event: Event) -> IPCResult<()> {
let req = event.data::<FindFilesByTagsRequest>()?;
let repo = get_repo_from_context(ctx).await;
let tags = req.tags.into_iter().map(|t| (t.name, t.negate)).collect();
@ -91,7 +91,7 @@ impl FilesNamespace {
/// Adds a file to the repository
#[tracing::instrument(skip_all)]
async fn add_file(ctx: &Context, event: Event) -> IPCResult<()> {
async fn add_file<S: AsyncProtocolStream>(ctx: &Context<S>, event: Event) -> IPCResult<()> {
let request = event.data::<AddFileRequest>()?;
let path = PathBuf::from(request.path);
let repo = get_repo_from_context(ctx).await;
@ -111,7 +111,7 @@ impl FilesNamespace {
/// Reads the binary contents of a file
#[tracing::instrument(skip_all)]
async fn read_file(ctx: &Context, event: Event) -> IPCResult<()> {
async fn read_file<S: AsyncProtocolStream>(ctx: &Context<S>, event: Event) -> IPCResult<()> {
let request = event.data::<ReadFileRequest>()?;
let repo = get_repo_from_context(ctx).await;
@ -129,7 +129,7 @@ impl FilesNamespace {
/// Returns a list of available thumbnails of a file
#[tracing::instrument(skip_all)]
async fn thumbnails(ctx: &Context, event: Event) -> IPCResult<()> {
async fn thumbnails<S: AsyncProtocolStream>(ctx: &Context<S>, event: Event) -> IPCResult<()> {
let request = event.data::<GetFileThumbnailsRequest>()?;
let repo = get_repo_from_context(ctx).await;
let file = file_by_identifier(request.id, &repo).await?;
@ -155,7 +155,10 @@ impl FilesNamespace {
/// Reads a thumbnail for the given thumbnail hash
#[tracing::instrument(skip_all)]
async fn read_thumbnail(ctx: &Context, event: Event) -> IPCResult<()> {
async fn read_thumbnail<S: AsyncProtocolStream>(
ctx: &Context<S>,
event: Event,
) -> IPCResult<()> {
let hash = event.data::<String>()?;
let repo = get_repo_from_context(ctx).await;
let thumbnail = repo
@ -179,7 +182,10 @@ impl FilesNamespace {
/// Updates the name of a file
#[tracing::instrument(skip_all)]
async fn update_file_name(ctx: &Context, event: Event) -> IPCResult<()> {
async fn update_file_name<S: AsyncProtocolStream>(
ctx: &Context<S>,
event: Event,
) -> IPCResult<()> {
let repo = get_repo_from_context(ctx).await;
let request = event.data::<UpdateFileNameRequest>()?;
let mut file = file_by_identifier(request.file_id, &repo).await?;

@ -1,9 +1,10 @@
use mediarepo_core::rmp_ipc::prelude::AsyncStreamProtocolListener;
use mediarepo_core::rmp_ipc::{namespace, namespace::Namespace, IPCBuilder};
pub mod files;
pub mod tags;
pub fn build_namespaces(builder: IPCBuilder) -> IPCBuilder {
pub fn build_namespaces<L: AsyncStreamProtocolListener>(builder: IPCBuilder<L>) -> IPCBuilder<L> {
builder
.add_namespace(namespace!(files::FilesNamespace))
.add_namespace(namespace!(tags::TagsNamespace))

@ -11,7 +11,7 @@ impl NamespaceProvider for TagsNamespace {
"tags"
}
fn register(handler: &mut EventHandler) {
fn register<S: AsyncProtocolStream>(handler: &mut EventHandler<S>) {
events!(handler,
"all_tags" => Self::all_tags,
"tags_for_file" => Self::tags_for_file,
@ -25,7 +25,7 @@ impl NamespaceProvider for TagsNamespace {
impl TagsNamespace {
/// Returns a list of all tags in the database
#[tracing::instrument(skip_all)]
async fn all_tags(ctx: &Context, event: Event) -> IPCResult<()> {
async fn all_tags<S: AsyncProtocolStream>(ctx: &Context<S>, event: Event) -> IPCResult<()> {
let repo = get_repo_from_context(ctx).await;
let tags: Vec<TagResponse> = repo
.tags()
@ -42,7 +42,10 @@ impl TagsNamespace {
/// Returns all tags for a single file
#[tracing::instrument(skip_all)]
async fn tags_for_file(ctx: &Context, event: Event) -> IPCResult<()> {
async fn tags_for_file<S: AsyncProtocolStream>(
ctx: &Context<S>,
event: Event,
) -> IPCResult<()> {
let repo = get_repo_from_context(ctx).await;
let request = event.data::<GetFileTagsRequest>()?;
let file = file_by_identifier(request.id, &repo).await?;
@ -58,7 +61,10 @@ impl TagsNamespace {
/// Returns all tags for a given list of file hashes
#[tracing::instrument(skip_all)]
async fn tags_for_files(ctx: &Context, event: Event) -> IPCResult<()> {
async fn tags_for_files<S: AsyncProtocolStream>(
ctx: &Context<S>,
event: Event,
) -> IPCResult<()> {
let repo = get_repo_from_context(ctx).await;
let request = event.data::<GetFilesTagsRequest>()?;
let tag_responses: Vec<TagResponse> = repo
@ -76,7 +82,7 @@ impl TagsNamespace {
/// Creates all tags given as input or returns the existing tag
#[tracing::instrument(skip_all)]
async fn create_tags(ctx: &Context, event: Event) -> IPCResult<()> {
async fn create_tags<S: AsyncProtocolStream>(ctx: &Context<S>, event: Event) -> IPCResult<()> {
let repo = get_repo_from_context(ctx).await;
let tags = event.data::<Vec<String>>()?;
let mut created_tags = Vec::new();
@ -99,7 +105,10 @@ impl TagsNamespace {
/// Changes tags of a file
/// it removes the tags from the removed list and adds the one from the add list
#[tracing::instrument(skip_all)]
async fn change_file_tags(ctx: &Context, event: Event) -> IPCResult<()> {
async fn change_file_tags<S: AsyncProtocolStream>(
ctx: &Context<S>,
event: Event,
) -> IPCResult<()> {
let repo = get_repo_from_context(ctx).await;
let request = event.data::<ChangeFileTagsRequest>()?;
let file = file_by_identifier(request.file_id, &repo).await?;

@ -1,12 +1,13 @@
use mediarepo_api::types::identifier::FileIdentifier;
use mediarepo_core::error::{RepoError, RepoResult};
use mediarepo_core::rmp_ipc::ipc::context::Context;
use mediarepo_core::rmp_ipc::protocol::AsyncProtocolStream;
use mediarepo_model::file::File;
use mediarepo_model::repo::Repo;
use mediarepo_model::type_keys::RepoKey;
use std::sync::Arc;
pub async fn get_repo_from_context(ctx: &Context) -> Arc<Repo> {
pub async fn get_repo_from_context<S: AsyncProtocolStream>(ctx: &Context<S>) -> Arc<Repo> {
let data = ctx.data.read().await;
let repo = data.get::<RepoKey>().unwrap();
Arc::clone(repo)

@ -1,5 +1,4 @@
use std::path::PathBuf;
use std::sync::Arc;
use structopt::StructOpt;
use tokio::fs;
@ -9,12 +8,10 @@ use tokio::runtime::Runtime;
use mediarepo_core::error::RepoResult;
use mediarepo_core::futures;
use mediarepo_core::settings::Settings;
use mediarepo_core::type_keys::SettingsKey;
use mediarepo_core::utils::parse_tags_file;
use mediarepo_model::file::{File as RepoFile, File};
use mediarepo_model::repo::Repo;
use mediarepo_model::type_keys::RepoKey;
use mediarepo_socket::get_builder;
use mediarepo_socket::start_tcp_server;
use num_integer::Integer;
use std::env;
@ -124,12 +121,14 @@ async fn init_repo(opt: &Opt) -> RepoResult<(Settings, Repo)> {
/// Starts the server
async fn start_server(opt: Opt) -> RepoResult<()> {
let (settings, repo) = init_repo(&opt).await?;
get_builder(&settings.listen_address)
.insert::<SettingsKey>(settings)
.insert::<RepoKey>(Arc::new(repo))
.build_server()
.await?;
let (address, handle) = start_tcp_server(
settings.listen_address.clone(),
settings.port_range,
settings,
repo,
)?;
fs::write(opt.repo.join(".tcp"), &address.into_bytes()).await?;
handle.await.unwrap();
Ok(())
}

Loading…
Cancel
Save