Merge pull request #5 from Trivernis/feature/query-optimizations

Feature/query optimizations
pull/7/head
Julius Riegel 3 years ago committed by GitHub
commit 26817cfec9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -1219,13 +1219,13 @@ dependencies = [
[[package]]
name = "mediarepo-daemon"
version = "0.13.0"
version = "0.13.1"
dependencies = [
"console-subscriber",
"glob",
"log",
"mediarepo-core",
"mediarepo-model",
"mediarepo-logic",
"mediarepo-socket",
"num-integer",
"rolling-file",
@ -1251,7 +1251,7 @@ dependencies = [
]
[[package]]
name = "mediarepo-model"
name = "mediarepo-logic"
version = "0.1.0"
dependencies = [
"async-trait",
@ -1275,7 +1275,7 @@ dependencies = [
"compare",
"mediarepo-core",
"mediarepo-database",
"mediarepo-model",
"mediarepo-logic",
"port_check",
"rayon",
"serde 1.0.132",
@ -2499,9 +2499,9 @@ dependencies = [
[[package]]
name = "thumbnailer"
version = "0.2.4"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8981b60fe29b8213c829340b7d5bea9d0bfb145fc460450ea3db01cf54d8643a"
checksum = "6017341c89a0c406e38801119f67dd0b67d045ff0e50aa2cf8fc1de4a1b48c3b"
dependencies = [
"ffmpeg-next",
"image",

@ -1,6 +1,6 @@
[workspace]
members = ["mediarepo-core", "mediarepo-database", "mediarepo-model", "mediarepo-socket", "."]
default-members = ["mediarepo-core", "mediarepo-database", "mediarepo-model", "mediarepo-socket", "."]
members = ["mediarepo-core", "mediarepo-database", "mediarepo-logic", "mediarepo-socket", "."]
default-members = ["mediarepo-core", "mediarepo-database", "mediarepo-logic", "mediarepo-socket", "."]
[package]
name = "mediarepo-daemon"
@ -31,8 +31,8 @@ log = "^0.4.14"
[dependencies.mediarepo-core]
path = "./mediarepo-core"
[dependencies.mediarepo-model]
path = "./mediarepo-model"
[dependencies.mediarepo-logic]
path = "mediarepo-logic"
[dependencies.mediarepo-socket]
path = "./mediarepo-socket"
@ -47,4 +47,4 @@ features = ["env-filter", "ansi", "json"]
[features]
default = ["ffmpeg"]
ffmpeg = ["mediarepo-core/ffmpeg", "mediarepo-model/ffmpeg"]
ffmpeg = ["mediarepo-core/ffmpeg", "mediarepo-logic/ffmpeg"]

@ -22,7 +22,7 @@ data-encoding = "^2.3.2"
tokio-graceful-shutdown = "^0.4.3"
[dependencies.thumbnailer]
version = "^0.2.4"
version = "^0.2.5"
default-features = false
[dependencies.sea-orm]

@ -1,6 +1,7 @@
use crate::error::RepoResult;
use multihash::{Code, MultihashDigest};
use crate::error::RepoResult;
/// Creates a new content descriptor for the given file
pub fn create_content_descriptor(bytes: &[u8]) -> Vec<u8> {
Code::Sha2_256.digest(bytes).to_bytes()

@ -1,8 +1,10 @@
use crate::settings::Settings;
use sea_orm::DatabaseConnection;
use std::sync::Arc;
use sea_orm::DatabaseConnection;
use tokio::sync::Mutex;
use crate::settings::Settings;
#[derive(Clone, Default)]
pub struct Context {
pub settings: Arc<Mutex<Settings>>,

@ -1,5 +1,6 @@
use sea_orm::DbErr;
use std::fmt::{Debug, Formatter};
use sea_orm::DbErr;
use thiserror::Error;
pub type RepoResult<T> = Result<T, RepoError>;
@ -37,8 +38,11 @@ pub enum RepoError {
#[error("failed to decode data {0}")]
Decode(#[from] data_encoding::DecodeError),
#[error("Failed to read repo.toml configuration file {0}")]
#[error("failed to read repo.toml configuration file {0}")]
Config(#[from] config::ConfigError),
#[error("the database file is corrupted {0}")]
Corrupted(String),
}
#[derive(Error, Debug)]

@ -1,5 +1,6 @@
use std::io::Result;
use std::path::{Path, PathBuf};
use tokio::fs::{File, OpenOptions};
/// A file that only exists while being owned.

@ -1,11 +1,13 @@
use crate::content_descriptor::{create_content_descriptor, encode_content_descriptor};
use crate::error::RepoResult;
use crate::utils::get_folder_size;
use std::path::PathBuf;
use tokio::fs;
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncRead, AsyncReadExt, BufReader};
use crate::content_descriptor::{create_content_descriptor, encode_content_descriptor};
use crate::error::RepoResult;
use crate::utils::get_folder_size;
#[derive(Clone, Debug)]
pub struct FileHashStore {
path: PathBuf,

@ -1,12 +1,14 @@
use crate::error::RepoResult;
use crate::utils::get_folder_size;
use std::fmt::Debug;
use std::io::Result;
use std::path::PathBuf;
use tokio::fs;
use tokio::fs::OpenOptions;
use tokio::io::{AsyncWriteExt, BufWriter};
use crate::error::RepoResult;
use crate::utils::get_folder_size;
#[derive(Clone, Debug)]
pub struct ThumbnailStore {
path: PathBuf,

@ -1,19 +1,21 @@
mod logging;
mod paths;
mod server;
pub mod v1;
use std::fs;
use std::path::PathBuf;
use crate::error::RepoResult;
use crate::settings::v1::SettingsV1;
use config::{Config, FileFormat};
use serde::{Deserialize, Serialize};
use std::fs;
use std::path::PathBuf;
pub use logging::*;
pub use paths::*;
pub use server::*;
use crate::error::RepoResult;
use crate::settings::v1::SettingsV1;
mod logging;
mod paths;
mod server;
pub mod v1;
#[derive(Clone, Debug, Deserialize, Serialize, Default)]
pub struct Settings {
pub server: ServerSettings,

@ -1,6 +1,7 @@
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct PathSettings {
pub(crate) database_directory: String,

@ -1,6 +1,7 @@
use serde::{Deserialize, Serialize};
use std::net::IpAddr;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Deserialize, Serialize, Default)]
pub struct ServerSettings {
pub tcp: TcpServerSettings,

@ -1,7 +1,9 @@
use crate::error::RepoResult;
use serde::{Deserialize, Serialize};
use std::net::IpAddr;
use serde::{Deserialize, Serialize};
use crate::error::RepoResult;
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct SettingsV1 {
pub listen_address: IpAddr,

@ -1,10 +1,12 @@
use crate::settings::Settings;
use mediarepo_api::types::repo::SizeType;
use std::collections::HashMap;
use std::path::PathBuf;
use mediarepo_api::types::repo::SizeType;
use tokio_graceful_shutdown::SubsystemHandle;
use typemap_rev::TypeMapKey;
use crate::settings::Settings;
pub struct SettingsKey;
impl TypeMapKey for SettingsKey {

@ -1,9 +1,11 @@
use crate::error::RepoResult;
use futures::future;
use std::path::PathBuf;
use futures::future;
use tokio::fs::{self, OpenOptions};
use tokio::io::{AsyncBufReadExt, BufReader};
use crate::error::RepoResult;
/// Parses a normalized tag into its two components of namespace and tag
pub fn parse_namespace_and_tag(norm_tag: String) -> (Option<String>, String) {
norm_tag

@ -1,7 +1,9 @@
use mediarepo_core::error::RepoDatabaseResult;
use std::time::Duration;
use sea_orm::{ConnectOptions, Database, DatabaseConnection};
use sqlx::migrate::MigrateDatabase;
use std::time::Duration;
use mediarepo_core::error::RepoDatabaseResult;
pub mod entities;
pub mod queries;

@ -1,11 +1,13 @@
use mediarepo_core::error::RepoResult;
use sea_orm::DbBackend;
use sea_orm::FromQueryResult;
use sea_orm::{DatabaseConnection, Statement};
use std::collections::HashMap;
use std::fmt::Display;
use std::iter::FromIterator;
use sea_orm::{DatabaseConnection, Statement};
use sea_orm::DbBackend;
use sea_orm::FromQueryResult;
use mediarepo_core::error::RepoResult;
#[derive(Debug, FromQueryResult)]
struct CIDNamespaceTag {
cd_id: i64,

@ -1,5 +1,5 @@
[package]
name = "mediarepo-model"
name = "mediarepo-logic"
version = "0.1.0"
edition = "2018"
workspace = ".."

@ -0,0 +1,73 @@
use std::io::Cursor;
use chrono::{Local, NaiveDateTime};
use sea_orm::ActiveValue::Set;
use sea_orm::{ActiveModelTrait, ConnectionTrait, DatabaseTransaction};
use mediarepo_core::error::RepoResult;
use mediarepo_database::entities::{content_descriptor, file, file_metadata};
use crate::dao::file::FileDao;
use crate::dto::{AddFileDto, FileDto};
impl FileDao {
#[tracing::instrument(level = "debug", skip(self))]
pub async fn add(&self, add_dto: AddFileDto) -> RepoResult<FileDto> {
let trx = self.ctx.db.begin().await?;
let file_size = add_dto.content.len();
let cd_bin = self
.ctx
.main_storage
.add_file(Cursor::new(add_dto.content), None)
.await?;
let cd_model = content_descriptor::ActiveModel {
descriptor: Set(cd_bin),
..Default::default()
};
let cd = cd_model.insert(&trx).await?;
let model = file::ActiveModel {
cd_id: Set(cd.id),
mime_type: Set(add_dto.mime_type),
..Default::default()
};
let file: file::Model = model.insert(&trx).await?;
let metadata = add_file_metadata(
&trx,
file.id,
file_size as i64,
add_dto.creation_time,
add_dto.change_time,
add_dto.name,
)
.await?;
trx.commit().await?;
Ok(FileDto::new(file, cd, Some(metadata)))
}
}
async fn add_file_metadata(
trx: &DatabaseTransaction,
file_id: i64,
size: i64,
creation_time: NaiveDateTime,
change_time: NaiveDateTime,
name: Option<String>,
) -> RepoResult<file_metadata::Model> {
let metadata_model = file_metadata::ActiveModel {
file_id: Set(file_id),
size: Set(size),
import_time: Set(Local::now().naive_local()),
creation_time: Set(creation_time),
change_time: Set(change_time),
name: Set(name),
..Default::default()
};
let metadata = metadata_model.insert(trx).await?;
Ok(metadata)
}

@ -0,0 +1,43 @@
use sea_orm::ConnectionTrait;
use sea_orm::prelude::*;
use mediarepo_core::error::{RepoResult};
use mediarepo_database::entities::{
content_descriptor, content_descriptor_tag, file, file_metadata,
};
use crate::dao::file::{FileDao};
use crate::dto::FileDto;
impl FileDao {
#[tracing::instrument(level = "debug", skip(self))]
pub async fn delete(&self, file: FileDto) -> RepoResult<()> {
let trx = self.ctx.db.begin().await?;
file_metadata::Entity::delete_many()
.filter(file_metadata::Column::FileId.eq(file.id()))
.exec(&trx)
.await?;
file::Entity::delete_many()
.filter(file::Column::Id.eq(file.id()))
.exec(&trx)
.await?;
content_descriptor_tag::Entity::delete_many()
.filter(content_descriptor_tag::Column::CdId.eq(file.cd_id()))
.exec(&trx)
.await?;
content_descriptor::Entity::delete_many()
.filter(content_descriptor::Column::Id.eq(file.cd_id()))
.exec(&trx)
.await?;
self.ctx
.thumbnail_storage
.delete_parent(&file.encoded_cd())
.await?;
self.ctx.main_storage.delete_file(file.cd()).await?;
trx.commit().await?;
Ok(())
}
}

@ -1,11 +1,16 @@
use chrono::NaiveDateTime;
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QuerySelect};
use sea_orm::Condition;
use sea_orm::sea_query::{Alias, Expr, Query, SimpleExpr};
use mediarepo_core::error::RepoResult;
use mediarepo_database::entities::content_descriptor;
use mediarepo_database::entities::content_descriptor_tag;
use mediarepo_database::entities::file;
use mediarepo_database::entities::file_metadata;
use sea_orm::sea_query::{Alias, Expr, Query, SimpleExpr};
use sea_orm::ColumnTrait;
use sea_orm::Condition;
use crate::dao::file::{FileDao, map_cd_and_file};
use crate::dto::FileDto;
macro_rules! apply_ordering_comparator {
($column:expr, $filter:expr) => {
@ -53,8 +58,28 @@ pub enum NegatableComparator<T> {
IsNot(T),
}
impl FileDao {
/// Finds files by filters
#[tracing::instrument(level = "debug", skip(self))]
pub async fn find(&self, filters: Vec<Vec<FilterProperty>>) -> RepoResult<Vec<FileDto>> {
let main_condition = build_find_filter_conditions(filters);
let files = content_descriptor::Entity::find()
.find_also_related(file::Entity)
.filter(main_condition)
.group_by(file::Column::Id)
.all(&self.ctx.db)
.await?
.into_iter()
.filter_map(map_cd_and_file)
.collect();
Ok(files)
}
}
#[tracing::instrument(level = "debug")]
pub fn build_find_filter_conditions(filters: Vec<Vec<FilterProperty>>) -> Condition {
fn build_find_filter_conditions(filters: Vec<Vec<FilterProperty>>) -> Condition {
filters
.into_iter()
.fold(Condition::all(), |all_cond, mut expression| {

@ -0,0 +1,151 @@
use sea_orm::prelude::*;
use tokio::io::AsyncReadExt;
use mediarepo_core::error::RepoResult;
use mediarepo_database::entities::{content_descriptor, file, file_metadata};
use crate::dao::{DaoContext, DaoProvider};
use crate::dto::{FileDto, FileMetadataDto, ThumbnailDto};
pub mod add;
pub mod delete;
pub mod find;
pub mod update;
pub struct FileDao {
ctx: DaoContext,
}
impl DaoProvider for FileDao {
fn dao_ctx(&self) -> DaoContext {
self.ctx.clone()
}
}
impl FileDao {
pub fn new(ctx: DaoContext) -> Self {
Self { ctx }
}
#[tracing::instrument(level = "debug", skip(self))]
pub async fn all(&self) -> RepoResult<Vec<FileDto>> {
let files = file::Entity::find()
.find_also_related(content_descriptor::Entity)
.all(&self.ctx.db)
.await?
.into_iter()
.filter_map(map_file_and_cd)
.collect();
Ok(files)
}
#[tracing::instrument(level = "debug", skip(self))]
#[inline]
pub async fn by_id(&self, id: i64) -> RepoResult<Option<FileDto>> {
self.all_by_id(vec![id]).await.map(|f| f.into_iter().next())
}
#[tracing::instrument(level = "debug", skip(self))]
#[inline]
pub async fn by_cd(&self, cd: Vec<u8>) -> RepoResult<Option<FileDto>> {
self.all_by_cd(vec![cd]).await.map(|f| f.into_iter().next())
}
#[tracing::instrument(level = "debug", skip(self))]
pub async fn all_by_cd(&self, cds: Vec<Vec<u8>>) -> RepoResult<Vec<FileDto>> {
if cds.is_empty() {
return Ok(vec![]);
}
let files = file::Entity::find()
.find_also_related(content_descriptor::Entity)
.filter(content_descriptor::Column::Descriptor.is_in(cds))
.all(&self.ctx.db)
.await?
.into_iter()
.filter_map(map_file_and_cd)
.collect();
Ok(files)
}
#[tracing::instrument(level = "debug", skip(self))]
pub async fn all_by_id(&self, ids: Vec<i64>) -> RepoResult<Vec<FileDto>> {
if ids.is_empty() {
return Ok(vec![]);
}
let files = file::Entity::find()
.find_also_related(content_descriptor::Entity)
.filter(file::Column::Id.is_in(ids))
.all(&self.ctx.db)
.await?
.into_iter()
.filter_map(map_file_and_cd)
.collect();
Ok(files)
}
pub async fn metadata(&self, file_id: i64) -> RepoResult<Option<FileMetadataDto>> {
self.all_metadata(vec![file_id])
.await
.map(|m| m.into_iter().next())
}
#[tracing::instrument(level = "debug", skip(self))]
pub async fn all_metadata(&self, file_ids: Vec<i64>) -> RepoResult<Vec<FileMetadataDto>> {
if file_ids.is_empty() {
return Ok(vec![]);
}
let metadata = file_metadata::Entity::find()
.filter(file_metadata::Column::FileId.is_in(file_ids))
.all(&self.ctx.db)
.await?
.into_iter()
.map(|m| FileMetadataDto::new(m))
.collect();
Ok(metadata)
}
/// Returns all thumbnails for a cd
#[tracing::instrument(level = "debug", skip(self))]
pub async fn thumbnails(&self, encoded_cd: String) -> RepoResult<Vec<ThumbnailDto>> {
let thumbnails = self
.ctx
.thumbnail_storage
.get_thumbnails(&encoded_cd)
.await?
.into_iter()
.map(|(size, path)| {
ThumbnailDto::new(path, encoded_cd.clone(), size, String::from("image/png"))
})
.collect();
Ok(thumbnails)
}
#[tracing::instrument(level = "debug", skip(self))]
pub async fn get_bytes(&self, cd: &[u8]) -> RepoResult<Vec<u8>> {
let mut buf = Vec::new();
let mut reader = self.ctx.main_storage.get_file(cd).await?.1;
reader.read_to_end(&mut buf).await?;
Ok(buf)
}
}
fn map_file_and_cd(
(file, cd): (file::Model, Option<content_descriptor::Model>),
) -> Option<FileDto> {
cd.map(|c| FileDto::new(file, c, None))
}
fn map_cd_and_file(
(cd, file): (content_descriptor::Model, Option<file::Model>),
) -> Option<FileDto> {
file.map(|f| FileDto::new(f, cd, None))
}

@ -0,0 +1,95 @@
use std::fmt::Debug;
use std::io::Cursor;
use std::str::FromStr;
use sea_orm::prelude::*;
use sea_orm::ActiveValue::{Set, Unchanged};
use sea_orm::{ConnectionTrait, NotSet};
use mediarepo_core::error::{RepoError, RepoResult};
use mediarepo_core::fs::thumbnail_store::Dimensions;
use mediarepo_core::thumbnailer;
use mediarepo_core::thumbnailer::{ThumbnailSize};
use mediarepo_database::entities::{content_descriptor, file, file_metadata};
use crate::dao::file::FileDao;
use crate::dao::opt_to_active_val;
use crate::dto::{FileDto, FileMetadataDto, ThumbnailDto, UpdateFileDto, UpdateFileMetadataDto};
impl FileDao {
#[tracing::instrument(level = "debug", skip(self))]
pub async fn update(&self, update_dto: UpdateFileDto) -> RepoResult<FileDto> {
let trx = self.ctx.db.begin().await?;
let model = file::ActiveModel {
id: Set(update_dto.id),
cd_id: update_dto.cd_id.map(|v| Set(v)).unwrap_or(NotSet),
mime_type: update_dto.mime_type.map(|v| Set(v)).unwrap_or(NotSet),
status: update_dto.status.map(|v| Set(v as i32)).unwrap_or(NotSet),
};
let file_model = model.update(&trx).await?;
let cd = file_model
.find_related(content_descriptor::Entity)
.one(&trx)
.await?
.ok_or_else(|| RepoError::from("Content descriptor not found"))?;
trx.commit().await?;
Ok(FileDto::new(file_model, cd, None))
}
#[tracing::instrument(level = "debug", skip(self))]
pub async fn update_metadata(
&self,
update_dto: UpdateFileMetadataDto,
) -> RepoResult<FileMetadataDto> {
let model = file_metadata::ActiveModel {
file_id: Unchanged(update_dto.file_id),
name: opt_to_active_val(update_dto.name),
comment: opt_to_active_val(update_dto.comment),
size: opt_to_active_val(update_dto.size),
change_time: opt_to_active_val(update_dto.change_time),
..Default::default()
};
let metadata = model.update(&self.ctx.db).await?;
Ok(FileMetadataDto::new(metadata))
}
#[tracing::instrument(level = "debug", skip(self))]
pub async fn create_thumbnails<I: IntoIterator<Item = ThumbnailSize> + Debug>(
&self,
file: FileDto,
sizes: I,
) -> RepoResult<Vec<ThumbnailDto>> {
let bytes = self.get_bytes(file.cd()).await?;
let mime_type = mime::Mime::from_str(file.mime_type())
.unwrap_or_else(|_| mime::APPLICATION_OCTET_STREAM);
let thumbnails =
thumbnailer::create_thumbnails(Cursor::new(bytes), mime_type.clone(), sizes)?;
let mut dtos = Vec::new();
for thumbnail in thumbnails {
let mut buf = Vec::new();
let size = thumbnail.size();
let size = Dimensions {
height: size.1,
width: size.0,
};
thumbnail.write_png(&mut buf)?;
let path = self
.ctx
.thumbnail_storage
.add_thumbnail(file.encoded_cd(), size.clone(), &buf)
.await?;
dtos.push(ThumbnailDto::new(
path,
file.encoded_cd(),
size,
mime_type.to_string(),
))
}
Ok(dtos)
}
}

@ -0,0 +1,46 @@
use crate::dao::job::JobDao;
use mediarepo_core::content_descriptor::{
convert_v1_descriptor_to_v2, encode_content_descriptor, is_v1_content_descriptor,
};
use mediarepo_core::error::RepoResult;
use mediarepo_database::entities::content_descriptor;
use sea_orm::prelude::*;
use sea_orm::ActiveValue::Set;
use sea_orm::ConnectionTrait;
impl JobDao {
#[tracing::instrument(level = "debug", skip(self))]
pub async fn migrate_content_descriptors(&self) -> RepoResult<()> {
let cds: Vec<content_descriptor::Model> =
content_descriptor::Entity::find().all(&self.ctx.db).await?;
tracing::info!("Converting content descriptors to v2 format...");
let mut converted_count = 0;
for cd in cds {
if is_v1_content_descriptor(&cd.descriptor) {
let trx = self.ctx.db.begin().await?;
let src_cd = cd.descriptor;
let dst_cd = convert_v1_descriptor_to_v2(&src_cd)?;
let _active_model = content_descriptor::ActiveModel {
id: Set(cd.id),
descriptor: Set(dst_cd.clone()),
};
self.ctx.main_storage.rename_file(&src_cd, &dst_cd).await?;
self.ctx
.thumbnail_storage
.rename_parent(
encode_content_descriptor(&src_cd),
encode_content_descriptor(&dst_cd),
)
.await?;
trx.commit().await?;
converted_count += 1;
}
}
tracing::info!("Converted {} descriptors", converted_count);
Ok(())
}
}

@ -0,0 +1,20 @@
pub mod migrate_content_descriptors;
pub mod sqlite_operations;
use crate::dao::{DaoContext, DaoProvider};
pub struct JobDao {
ctx: DaoContext,
}
impl DaoProvider for JobDao {
fn dao_ctx(&self) -> DaoContext {
self.ctx.clone()
}
}
impl JobDao {
pub fn new(ctx: DaoContext) -> JobDao {
Self { ctx }
}
}

@ -0,0 +1,44 @@
use crate::dao::job::JobDao;
use mediarepo_core::error::RepoError::Corrupted;
use mediarepo_core::error::RepoResult;
use sea_orm::DatabaseBackend::Sqlite;
use sea_orm::{ConnectionTrait, FromQueryResult, Statement};
#[derive(Debug, FromQueryResult)]
struct IntegrityCheckResult {
integrity_check: String,
}
impl JobDao {
#[tracing::instrument(level = "debug", skip(self))]
pub async fn check_integrity(&self) -> RepoResult<()> {
let check_result: Option<IntegrityCheckResult> = IntegrityCheckResult::find_by_statement(
Statement::from_string(Sqlite, String::from("PRAGMA integrity_check;")),
)
.one(&self.ctx.db)
.await?;
tracing::debug!("check result = {:?}", check_result);
check_result
.ok_or_else(|| Corrupted(String::from("no check result")))
.and_then(map_check_result)
}
#[tracing::instrument(level = "debug", skip(self))]
pub async fn vacuum(&self) -> RepoResult<()> {
self.ctx
.db
.execute(Statement::from_string(Sqlite, String::from("VACUUM;")))
.await?;
Ok(())
}
}
fn map_check_result(result: IntegrityCheckResult) -> RepoResult<()> {
if result.integrity_check == "ok" {
Ok(())
} else {
Err(Corrupted(result.integrity_check))
}
}

@ -0,0 +1,41 @@
use sea_orm::{ActiveValue, DatabaseConnection};
use mediarepo_core::fs::file_hash_store::FileHashStore;
use mediarepo_core::fs::thumbnail_store::ThumbnailStore;
use crate::dao::file::FileDao;
use crate::dao::job::JobDao;
use crate::dao::tag::TagDao;
pub mod file;
pub mod job;
pub mod repo;
pub mod tag;
#[derive(Clone)]
pub struct DaoContext {
pub db: DatabaseConnection,
pub main_storage: FileHashStore,
pub thumbnail_storage: ThumbnailStore,
}
pub trait DaoProvider {
fn dao_ctx(&self) -> DaoContext;
fn file(&self) -> FileDao {
FileDao::new(self.dao_ctx())
}
fn tag(&self) -> TagDao {
TagDao::new(self.dao_ctx())
}
fn job(&self) -> JobDao {
JobDao::new(self.dao_ctx())
}
}
fn opt_to_active_val<T: Into<sea_orm::Value>>(opt: Option<T>) -> ActiveValue<T> {
opt.map(|v| ActiveValue::Set(v))
.unwrap_or(ActiveValue::NotSet)
}

@ -0,0 +1,81 @@
use std::fmt::Debug;
use std::path::PathBuf;
use sea_orm::DatabaseConnection;
use mediarepo_core::error::RepoResult;
use mediarepo_core::fs::file_hash_store::FileHashStore;
use mediarepo_core::fs::thumbnail_store::ThumbnailStore;
use crate::dao::{DaoContext, DaoProvider};
use mediarepo_database::get_database;
use mediarepo_database::queries::analysis::{get_all_counts, Counts};
#[derive(Clone)]
pub struct Repo {
db: DatabaseConnection,
main_storage: FileHashStore,
thumbnail_storage: ThumbnailStore,
}
impl DaoProvider for Repo {
fn dao_ctx(&self) -> DaoContext {
DaoContext {
db: self.db.clone(),
main_storage: self.main_storage.clone(),
thumbnail_storage: self.thumbnail_storage.clone(),
}
}
}
impl Repo {
pub(crate) fn new(
db: DatabaseConnection,
file_store_path: PathBuf,
thumb_store_path: PathBuf,
) -> Self {
Self {
db,
main_storage: FileHashStore::new(file_store_path),
thumbnail_storage: ThumbnailStore::new(thumb_store_path),
}
}
/// Connects to the database with the given uri
#[tracing::instrument(level = "debug")]
pub async fn connect<S: AsRef<str> + Debug>(
uri: S,
file_store_path: PathBuf,
thumb_store_path: PathBuf,
) -> RepoResult<Self> {
let db = get_database(uri).await?;
Ok(Self::new(db, file_store_path, thumb_store_path))
}
/// Returns the database of the repo for raw sql queries
pub fn db(&self) -> &DatabaseConnection {
&self.db
}
/// Returns the size of the main storage
#[inline]
#[tracing::instrument(level = "debug", skip(self))]
pub async fn get_main_store_size(&self) -> RepoResult<u64> {
self.main_storage.get_size().await
}
/// Returns the size of the thumbnail storage
#[inline]
#[tracing::instrument(level = "debug", skip(self))]
pub async fn get_thumb_store_size(&self) -> RepoResult<u64> {
self.thumbnail_storage.get_size().await
}
/// Returns all entity counts
#[inline]
#[tracing::instrument(level = "debug", skip(self))]
pub async fn get_counts(&self) -> RepoResult<Counts> {
get_all_counts(&self.db).await
}
}

@ -0,0 +1,137 @@
use crate::dao::tag::{map_tag_dto, TagDao};
use crate::dto::{AddTagDto, NamespaceDto, TagDto};
use mediarepo_core::error::RepoResult;
use mediarepo_database::entities::{namespace, tag};
use sea_orm::prelude::*;
use sea_orm::ActiveValue::Set;
use sea_orm::{Condition, ConnectionTrait, DatabaseTransaction};
use std::collections::HashMap;
use std::iter::FromIterator;
impl TagDao {
#[tracing::instrument(level = "debug", skip(self))]
pub async fn add_all(&self, mut tags: Vec<AddTagDto>) -> RepoResult<Vec<TagDto>> {
let namespaces = tags.iter().filter_map(|t| t.namespace.clone()).collect();
let trx = self.ctx.db.begin().await?;
let existing_tags = tags_by_name(&trx, tags.clone()).await?;
if existing_tags.len() == tags.len() {
return Ok(existing_tags);
}
let existing_tag_map: HashMap<String, TagDto> =
HashMap::from_iter(existing_tags.into_iter().map(|t| (t.normalized_name(), t)));
tags.retain(|dto| !existing_tag_map.contains_key(&dto.normalized_name()));
let namespace_map = add_or_get_all_namespaces(&trx, namespaces).await?;
if tags.is_empty() {
return Ok(existing_tag_map.into_values().collect());
}
let tag_models: Vec<tag::ActiveModel> = tags
.iter()
.map(|t| tag::ActiveModel {
name: Set(t.name.to_owned()),
namespace_id: Set(t
.namespace
.as_ref()
.and_then(|n| namespace_map.get(n))
.map(|n| n.id())),
..Default::default()
})
.collect();
tag::Entity::insert_many(tag_models).exec(&trx).await?;
let mut tag_dtos = tags_by_name(&trx, tags).await?;
trx.commit().await?;
tag_dtos.append(&mut existing_tag_map.into_values().collect());
Ok(tag_dtos)
}
}
async fn add_or_get_all_namespaces(
trx: &DatabaseTransaction,
mut namespaces: Vec<String>,
) -> RepoResult<HashMap<String, NamespaceDto>> {
if namespaces.is_empty() {
return Ok(HashMap::with_capacity(0));
}
let existing_namespaces = namespaces_by_name(trx, namespaces.clone()).await?;
let mut namespace_map = HashMap::from_iter(
existing_namespaces
.into_iter()
.map(|nsp| (nsp.name().to_owned(), nsp)),
);
if namespaces.len() == namespace_map.len() {
return Ok(namespace_map);
}
namespaces.retain(|nsp| !namespace_map.contains_key(nsp));
if namespaces.is_empty() {
return Ok(namespace_map);
}
let namespace_models: Vec<namespace::ActiveModel> = namespaces
.iter()
.map(|nsp| namespace::ActiveModel {
name: Set(nsp.to_owned()),
..Default::default()
})
.collect();
namespace::Entity::insert_many(namespace_models)
.exec(trx)
.await?;
let additional_namespaces = namespaces_by_name(trx, namespaces.clone()).await?;
for nsp in additional_namespaces {
namespace_map.insert(nsp.name().to_owned(), nsp);
}
Ok(namespace_map)
}
async fn namespaces_by_name(
trx: &DatabaseTransaction,
names: Vec<String>,
) -> RepoResult<Vec<NamespaceDto>> {
if names.is_empty() {
return Ok(vec![]);
}
let namespaces: Vec<NamespaceDto> = namespace::Entity::find()
.filter(namespace::Column::Name.is_in(names))
.all(trx)
.await?
.into_iter()
.map(NamespaceDto::new)
.collect();
Ok(namespaces)
}
async fn tags_by_name(trx: &DatabaseTransaction, tags: Vec<AddTagDto>) -> RepoResult<Vec<TagDto>> {
if tags.is_empty() {
return Ok(vec![]);
}
let condition = tags
.into_iter()
.map(build_tag_condition)
.fold(Condition::any(), Condition::add);
let tags = tag::Entity::find()
.find_also_related(namespace::Entity)
.filter(condition)
.all(trx)
.await?
.into_iter()
.map(map_tag_dto)
.collect();
Ok(tags)
}
fn build_tag_condition(tag: AddTagDto) -> Condition {
if let Some(namespace) = tag.namespace {
Condition::all()
.add(tag::Column::Name.eq(tag.name))
.add(namespace::Column::Name.eq(namespace))
} else {
Condition::all().add(tag::Column::Name.eq(tag.name))
}
}

@ -0,0 +1,63 @@
use crate::dao::tag::{map_tag_dto, TagDao};
use crate::dto::TagDto;
use mediarepo_core::error::RepoResult;
use mediarepo_database::entities::{namespace, tag};
use sea_orm::prelude::*;
use sea_orm::sea_query::Expr;
use sea_orm::Condition;
#[derive(Clone, Debug)]
pub struct TagByNameQuery {
pub namespace: Option<String>,
pub name: String,
}
impl TagDao {
/// Filters all tags by names
/// wildcards are supported
#[tracing::instrument(level = "debug", skip(self))]
pub async fn all_by_name(&self, names: Vec<TagByNameQuery>) -> RepoResult<Vec<TagDto>> {
let mut condition_count = 0;
let condition = names
.into_iter()
.filter_map(name_query_to_condition)
.inspect(|_| condition_count += 1)
.fold(Condition::any(), Condition::add);
if condition_count == 0 {
return Ok(vec![]);
}
let tags = tag::Entity::find()
.find_also_related(namespace::Entity)
.filter(condition)
.all(&self.ctx.db)
.await?
.into_iter()
.map(map_tag_dto)
.collect();
Ok(tags)
}
}
fn name_query_to_condition(query: TagByNameQuery) -> Option<Condition> {
let TagByNameQuery { namespace, name } = query;
let mut condition = Condition::all();
if !name.ends_with('*') {
condition = condition.add(tag::Column::Name.eq(name))
} else if name.len() > 1 {
condition =
condition.add(tag::Column::Name.like(&*format!("{}%", name.trim_end_matches("*"))))
} else if namespace.is_none() {
return None;
}
condition = if let Some(namespace) = namespace {
condition.add(namespace::Column::Name.eq(namespace))
} else {
condition.add(Expr::tbl(tag::Entity, tag::Column::NamespaceId).is_null())
};
Some(condition)
}

@ -0,0 +1,66 @@
use sea_orm::{ConnectionTrait, DatabaseTransaction};
use sea_orm::ActiveValue::Set;
use sea_orm::prelude::*;
use mediarepo_core::error::RepoResult;
use mediarepo_database::entities::content_descriptor_tag;
use crate::dao::tag::TagDao;
impl TagDao {
#[tracing::instrument(level = "debug", skip(self))]
pub async fn upsert_mappings(&self, cd_ids: Vec<i64>, tag_ids: Vec<i64>) -> RepoResult<()> {
let trx = self.ctx.db.begin().await?;
let existing_mappings = get_existing_mappings(&trx, &cd_ids, &tag_ids).await?;
let active_models: Vec<content_descriptor_tag::ActiveModel> = cd_ids
.into_iter()
.flat_map(|cd_id: i64| {
tag_ids
.iter()
.filter(|tag_id| !existing_mappings.contains(&(cd_id, **tag_id)))
.map(move |tag_id| content_descriptor_tag::ActiveModel {
cd_id: Set(cd_id),
tag_id: Set(*tag_id),
})
.collect::<Vec<content_descriptor_tag::ActiveModel>>()
})
.collect();
content_descriptor_tag::Entity::insert_many(active_models)
.exec(&trx)
.await?;
trx.commit().await?;
Ok(())
}
#[tracing::instrument(level = "debug", skip(self))]
pub async fn remove_mappings(&self, cd_ids: Vec<i64>, tag_ids: Vec<i64>) -> RepoResult<()> {
content_descriptor_tag::Entity::delete_many()
.filter(content_descriptor_tag::Column::CdId.is_in(cd_ids))
.filter(content_descriptor_tag::Column::TagId.is_in(tag_ids))
.exec(&self.ctx.db)
.await?;
Ok(())
}
}
async fn get_existing_mappings(
trx: &DatabaseTransaction,
cd_ids: &Vec<i64>,
tag_ids: &Vec<i64>,
) -> RepoResult<Vec<(i64, i64)>> {
let existing_mappings: Vec<(i64, i64)> = content_descriptor_tag::Entity::find()
.filter(content_descriptor_tag::Column::CdId.is_in(cd_ids.clone()))
.filter(content_descriptor_tag::Column::TagId.is_in(tag_ids.clone()))
.all(trx)
.await?
.into_iter()
.map(|model: content_descriptor_tag::Model| (model.tag_id, model.cd_id))
.collect();
Ok(existing_mappings)
}

@ -0,0 +1,127 @@
use sea_orm::prelude::*;
use sea_orm::JoinType;
use sea_orm::QuerySelect;
use std::collections::HashMap;
use std::iter::FromIterator;
use mediarepo_core::error::RepoResult;
use mediarepo_core::utils::parse_namespace_and_tag;
use mediarepo_database::entities::{content_descriptor, content_descriptor_tag, namespace, tag};
use crate::dao::tag::by_name::TagByNameQuery;
use crate::dao::{DaoContext, DaoProvider};
use crate::dto::{NamespaceDto, TagDto};
pub mod add;
pub mod by_name;
pub mod mappings;
pub struct TagDao {
ctx: DaoContext,
}
impl DaoProvider for TagDao {
fn dao_ctx(&self) -> DaoContext {
self.ctx.clone()
}
}
impl TagDao {
pub fn new(ctx: DaoContext) -> Self {
Self { ctx }
}
#[tracing::instrument(level = "debug", skip(self))]
pub async fn all(&self) -> RepoResult<Vec<TagDto>> {
let tags = tag::Entity::find()
.find_also_related(namespace::Entity)
.all(&self.ctx.db)
.await?
.into_iter()
.map(map_tag_dto)
.collect();
Ok(tags)
}
#[tracing::instrument(level = "debug", skip(self))]
pub async fn all_namespaces(&self) -> RepoResult<Vec<NamespaceDto>> {
let namespaces = namespace::Entity::find()
.all(&self.ctx.db)
.await?
.into_iter()
.map(NamespaceDto::new)
.collect();
Ok(namespaces)
}
#[tracing::instrument(level = "debug", skip(self))]
pub async fn all_for_cds(&self, cds: Vec<Vec<u8>>) -> RepoResult<Vec<TagDto>> {
let tags = tag::Entity::find()
.find_also_related(namespace::Entity)
.join(
JoinType::LeftJoin,
content_descriptor_tag::Relation::Tag.def().rev(),
)
.join(
JoinType::InnerJoin,
content_descriptor_tag::Relation::ContentDescriptorId.def(),
)
.filter(content_descriptor::Column::Descriptor.is_in(cds))
.all(&self.ctx.db)
.await?
.into_iter()
.map(map_tag_dto)
.collect();
Ok(tags)
}
#[tracing::instrument(level = "debug", skip(self))]
pub async fn tags_for_cd(&self, cd_id: i64) -> RepoResult<Vec<TagDto>> {
let tags = tag::Entity::find()
.find_also_related(namespace::Entity)
.join(
JoinType::LeftJoin,
content_descriptor_tag::Relation::Tag.def().rev(),
)
.join(
JoinType::InnerJoin,
content_descriptor_tag::Relation::ContentDescriptorId.def(),
)
.filter(content_descriptor::Column::Id.eq(cd_id))
.all(&self.ctx.db)
.await?
.into_iter()
.map(map_tag_dto)
.collect();
Ok(tags)
}
/// Returns a map mapping tag names to ids
#[tracing::instrument(level = "debug", skip(self))]
pub async fn normalized_tags_to_ids(
&self,
names: Vec<String>,
) -> RepoResult<HashMap<String, i64>> {
let queries = names
.into_iter()
.map(parse_namespace_and_tag)
.map(|(namespace, name)| TagByNameQuery { namespace, name })
.collect();
let tags = self.all_by_name(queries).await?;
let tag_map = HashMap::from_iter(
tags.into_iter()
.map(|tag| (tag.normalized_name(), tag.id())),
);
Ok(tag_map)
}
}
fn map_tag_dto(result: (tag::Model, Option<namespace::Model>)) -> TagDto {
TagDto::new(result.0, result.1)
}

@ -0,0 +1,112 @@
use chrono::NaiveDateTime;
use mediarepo_core::content_descriptor::encode_content_descriptor;
use mediarepo_core::mediarepo_api::types::files::FileStatus as ApiFileStatus;
use mediarepo_database::entities::content_descriptor;
use mediarepo_database::entities::file;
use mediarepo_database::entities::file_metadata;
use crate::dto::FileMetadataDto;
#[derive(Clone, Debug)]
pub struct FileDto {
model: file::Model,
content_descriptor: content_descriptor::Model,
metadata: Option<FileMetadataDto>,
}
impl FileDto {
pub(crate) fn new(
model: file::Model,
content_descriptor: content_descriptor::Model,
metadata: Option<file_metadata::Model>,
) -> Self {
Self {
model,
content_descriptor,
metadata: metadata.map(FileMetadataDto::new),
}
}
pub fn id(&self) -> i64 {
self.model.id
}
pub fn cd_id(&self) -> i64 {
self.model.cd_id
}
pub fn cd(&self) -> &[u8] {
&self.content_descriptor.descriptor
}
pub fn encoded_cd(&self) -> String {
encode_content_descriptor(&self.content_descriptor.descriptor)
}
pub fn status(&self) -> FileStatus {
match self.model.status {
10 => FileStatus::Imported,
20 => FileStatus::Archived,
30 => FileStatus::Deleted,
_ => FileStatus::Imported,
}
}
pub fn mime_type(&self) -> &String {
&self.model.mime_type
}
pub fn metadata(&self) -> Option<&FileMetadataDto> {
self.metadata.as_ref()
}
pub fn into_metadata(self) -> Option<FileMetadataDto> {
self.metadata
}
}
#[derive(Clone, Debug)]
pub struct AddFileDto {
pub content: Vec<u8>,
pub mime_type: String,
pub creation_time: NaiveDateTime,
pub change_time: NaiveDateTime,
pub name: Option<String>,
}
#[derive(Clone, Debug)]
pub struct UpdateFileDto {
pub id: i64,
pub cd_id: Option<i64>,
pub mime_type: Option<String>,
pub status: Option<FileStatus>,
}
impl Default for UpdateFileDto {
fn default() -> Self {
Self {
id: 0,
cd_id: None,
mime_type: None,
status: None,
}
}
}
#[derive(Copy, Clone, Debug)]
pub enum FileStatus {
Imported = 10,
Archived = 20,
Deleted = 30,
}
impl From<ApiFileStatus> for FileStatus {
fn from(s: ApiFileStatus) -> Self {
match s {
ApiFileStatus::Imported => Self::Imported,
ApiFileStatus::Archived => Self::Archived,
ApiFileStatus::Deleted => Self::Deleted,
}
}
}

@ -0,0 +1,51 @@
use chrono::NaiveDateTime;
use mediarepo_database::entities::file_metadata;
#[derive(Clone, Debug)]
pub struct FileMetadataDto {
model: file_metadata::Model,
}
impl FileMetadataDto {
pub(crate) fn new(model: file_metadata::Model) -> Self {
Self { model }
}
pub fn file_id(&self) -> i64 {
self.model.file_id
}
pub fn name(&self) -> Option<&String> {
self.model.name.as_ref()
}
pub fn comment(&self) -> Option<&String> {
self.model.comment.as_ref()
}
pub fn size(&self) -> i64 {
self.model.size
}
pub fn import_time(&self) -> NaiveDateTime {
self.model.import_time
}
pub fn creation_time(&self) -> NaiveDateTime {
self.model.creation_time
}
pub fn change_time(&self) -> NaiveDateTime {
self.model.change_time
}
}
#[derive(Clone, Debug, Default)]
pub struct UpdateFileMetadataDto {
pub file_id: i64,
pub name: Option<Option<String>>,
pub comment: Option<Option<String>>,
pub size: Option<i64>,
pub change_time: Option<NaiveDateTime>,
}

@ -0,0 +1,12 @@
pub use file::*;
pub use file_metadata::*;
pub use namespace::*;
pub use tag::*;
pub use thumbnail::*;
mod file;
mod file_metadata;
mod tag;
mod namespace;
mod thumbnail;

@ -0,0 +1,20 @@
use mediarepo_database::entities::namespace;
#[derive(Clone, Debug)]
pub struct NamespaceDto {
model: namespace::Model,
}
impl NamespaceDto {
pub(crate) fn new(model: namespace::Model) -> Self {
Self {model}
}
pub fn id(&self) -> i64 {
self.model.id
}
pub fn name(&self) -> &String {
&self.model.name
}
}

@ -0,0 +1,62 @@
pub use mediarepo_database::entities::namespace;
pub use mediarepo_database::entities::tag;
use crate::dto::NamespaceDto;
#[derive(Clone, Debug)]
pub struct TagDto {
model: tag::Model,
namespace: Option<NamespaceDto>,
}
impl TagDto {
pub(crate) fn new(model: tag::Model, namespace_model: Option<namespace::Model>) -> Self {
Self {
model,
namespace: namespace_model.map(NamespaceDto::new),
}
}
pub fn id(&self) -> i64 {
self.model.id
}
pub fn name(&self) -> &String {
&self.model.name
}
pub fn namespace(&self) -> Option<&NamespaceDto> {
self.namespace.as_ref()
}
/// Returns the normalized name of the tag (namespace:tag)
pub fn normalized_name(&self) -> String {
if let Some(namespace) = &self.namespace {
format!("{}:{}", namespace.name(), self.name())
} else {
self.name().to_owned()
}
}
}
#[derive(Clone, Debug)]
pub struct AddTagDto {
pub namespace: Option<String>,
pub name: String,
}
impl AddTagDto {
pub fn from_tuple(tuple: (Option<String>, String)) -> Self {
let (namespace, name) = tuple;
Self { namespace, name }
}
/// Returns the normalized name of the tag (namespace:tag)
pub fn normalized_name(&self) -> String {
if let Some(namespace) = &self.namespace {
format!("{}:{}", namespace, &self.name)
} else {
self.name.to_owned()
}
}
}

@ -0,0 +1,53 @@
use std::path::PathBuf;
use tokio::fs;
use tokio::fs::{File, OpenOptions};
use tokio::io::BufReader;
use mediarepo_core::error::RepoResult;
use mediarepo_core::fs::thumbnail_store::Dimensions;
#[derive(Clone, Debug)]
pub struct ThumbnailDto {
path: PathBuf,
parent_cd: String,
size: Dimensions,
mime_type: String,
}
impl ThumbnailDto {
pub fn new(path: PathBuf, parent_cd: String, size: Dimensions, mime_type: String) -> Self {
Self {
path,
parent_cd,
size,
mime_type,
}
}
pub fn parent_cd(&self) -> &String {
&self.parent_cd
}
pub fn size(&self) -> &Dimensions {
&self.size
}
pub fn mime_type(&self) -> &String {
&self.mime_type
}
#[tracing::instrument(level = "debug")]
pub async fn get_reader(&self) -> RepoResult<BufReader<File>> {
let file = OpenOptions::new().read(true).open(&self.path).await?;
Ok(BufReader::new(file))
}
/// Deletes the thumbnail
#[tracing::instrument(level = "debug")]
pub async fn delete(self) -> RepoResult<()> {
fs::remove_file(&self.path).await?;
Ok(())
}
}

@ -0,0 +1,3 @@
pub mod dao;
pub mod dto;
pub mod type_keys;

@ -1,7 +1,9 @@
use crate::repo::Repo;
use std::sync::Arc;
use typemap_rev::TypeMapKey;
use crate::dao::repo::Repo;
pub struct RepoKey;
impl TypeMapKey for RepoKey {

@ -1,101 +0,0 @@
use crate::file::File;
use mediarepo_core::content_descriptor::convert_v1_descriptor_to_v2;
use mediarepo_core::error::RepoResult;
use mediarepo_database::entities::content_descriptor;
use mediarepo_database::entities::file;
use sea_orm::prelude::*;
use sea_orm::{DatabaseConnection, Set};
use std::fmt::Debug;
pub struct ContentDescriptor {
db: DatabaseConnection,
model: content_descriptor::Model,
}
impl ContentDescriptor {
#[tracing::instrument(level = "trace")]
pub(crate) fn new(db: DatabaseConnection, model: content_descriptor::Model) -> Self {
Self { db, model }
}
pub async fn all(db: DatabaseConnection) -> RepoResult<Vec<Self>> {
let descriptors = content_descriptor::Entity::find()
.all(&db)
.await?
.into_iter()
.map(|model| Self::new(db.clone(), model))
.collect();
Ok(descriptors)
}
/// Searches for the hash by id
#[tracing::instrument(level = "debug", skip(db))]
pub async fn by_id(db: DatabaseConnection, id: i64) -> RepoResult<Option<Self>> {
let hash = content_descriptor::Entity::find_by_id(id)
.one(&db)
.await?
.map(|model| Self::new(db, model));
Ok(hash)
}
/// Returns the hash by value
#[tracing::instrument(level = "debug", skip(db))]
pub async fn by_value<D: AsRef<[u8]> + Debug>(
db: DatabaseConnection,
descriptor: D,
) -> RepoResult<Option<Self>> {
let cid = content_descriptor::Entity::find()
.filter(content_descriptor::Column::Descriptor.eq(descriptor.as_ref()))
.one(&db)
.await?
.map(|model| Self::new(db, model));
Ok(cid)
}
/// Adds a new hash to the database
#[tracing::instrument(level = "debug", skip(db))]
pub async fn add(db: DatabaseConnection, descriptor: Vec<u8>) -> RepoResult<Self> {
let active_model = content_descriptor::ActiveModel {
descriptor: Set(descriptor),
..Default::default()
};
let model = active_model.insert(&db).await?;
Ok(Self::new(db, model))
}
pub fn id(&self) -> i64 {
self.model.id
}
pub fn descriptor(&self) -> &[u8] {
&self.model.descriptor[..]
}
/// Returns the file associated with the hash
#[tracing::instrument(level = "debug", skip(self))]
pub async fn file(&self) -> RepoResult<Option<File>> {
let file = self
.model
.find_related(file::Entity)
.one(&self.db)
.await?
.map(|file_model| File::new(self.db.clone(), file_model, self.model.clone()));
Ok(file)
}
pub async fn convert_v1_to_v2(&mut self) -> RepoResult<()> {
let descriptor = convert_v1_descriptor_to_v2(&self.model.descriptor)?;
let active_model = content_descriptor::ActiveModel {
id: Set(self.id()),
descriptor: Set(descriptor),
};
self.model = active_model.update(&self.db).await?;
Ok(())
}
}

@ -1,334 +0,0 @@
pub mod filter;
use std::fmt::Debug;
use std::io::Cursor;
use std::str::FromStr;
use mediarepo_core::content_descriptor::encode_content_descriptor;
use sea_orm::prelude::*;
use sea_orm::{ConnectionTrait, DatabaseConnection, Set};
use sea_orm::{JoinType, QuerySelect};
use tokio::io::{AsyncReadExt, BufReader};
use crate::file::filter::FilterProperty;
use crate::file_metadata::FileMetadata;
use mediarepo_core::error::{RepoError, RepoResult};
use mediarepo_core::fs::file_hash_store::FileHashStore;
use mediarepo_core::mediarepo_api::types::files::FileStatus as ApiFileStatus;
use mediarepo_core::thumbnailer::{self, Thumbnail as ThumbnailerThumb, ThumbnailSize};
use mediarepo_database::entities::content_descriptor;
use mediarepo_database::entities::content_descriptor_tag;
use mediarepo_database::entities::file;
use mediarepo_database::entities::file_metadata;
use mediarepo_database::entities::namespace;
use mediarepo_database::entities::tag;
use crate::tag::Tag;
pub enum FileStatus {
Imported = 10,
Archived = 20,
Deleted = 30,
}
impl From<ApiFileStatus> for FileStatus {
fn from(s: ApiFileStatus) -> Self {
match s {
ApiFileStatus::Imported => Self::Imported,
ApiFileStatus::Archived => Self::Archived,
ApiFileStatus::Deleted => Self::Deleted,
}
}
}
#[derive(Clone)]
pub struct File {
db: DatabaseConnection,
model: file::Model,
content_descriptor: content_descriptor::Model,
}
impl File {
#[tracing::instrument(level = "trace")]
pub(crate) fn new(
db: DatabaseConnection,
model: file::Model,
hash: content_descriptor::Model,
) -> Self {
Self {
db,
model,
content_descriptor: hash,
}
}
/// Returns a list of all known stored files
#[tracing::instrument(level = "debug", skip(db))]
pub async fn all(db: DatabaseConnection) -> RepoResult<Vec<File>> {
let files: Vec<(file::Model, Option<content_descriptor::Model>)> = file::Entity::find()
.find_also_related(content_descriptor::Entity)
.all(&db)
.await?;
let files = files
.into_iter()
.filter_map(|(f, h)| {
let h = h?;
Some(Self::new(db.clone(), f, h))
})
.collect();
Ok(files)
}
/// Fetches the file by id
#[tracing::instrument(level = "debug", skip(db))]
pub async fn by_id(db: DatabaseConnection, id: i64) -> RepoResult<Option<Self>> {
if let Some((model, Some(hash))) = file::Entity::find_by_id(id)
.find_also_related(content_descriptor::Entity)
.one(&db)
.await?
{
let file = File::new(db, model, hash);
Ok(Some(file))
} else {
Ok(None)
}
}
/// Finds the file by hash
#[tracing::instrument(level = "debug", skip(db))]
pub async fn by_cd(db: DatabaseConnection, cd: &[u8]) -> RepoResult<Option<Self>> {
if let Some((hash, Some(model))) = content_descriptor::Entity::find()
.filter(content_descriptor::Column::Descriptor.eq(cd))
.find_also_related(file::Entity)
.one(&db)
.await?
{
let file = File::new(db, model, hash);
Ok(Some(file))
} else {
Ok(None)
}
}
/// Finds the file by tags
#[tracing::instrument(level = "debug", skip(db))]
pub(crate) async fn find_by_filters(
db: DatabaseConnection,
filters: Vec<Vec<FilterProperty>>,
) -> RepoResult<Vec<Self>> {
let main_condition = filter::build_find_filter_conditions(filters);
let results: Vec<(content_descriptor::Model, Option<file::Model>)> =
content_descriptor::Entity::find()
.find_also_related(file::Entity)
.filter(main_condition)
.group_by(file::Column::Id)
.all(&db)
.await?;
let files: Vec<Self> = results
.into_iter()
.filter_map(|(hash, tag)| Some(Self::new(db.clone(), tag?, hash)))
.collect();
Ok(files)
}
/// Adds a file with its hash to the database
#[tracing::instrument(level = "debug", skip(db))]
pub(crate) async fn add(
db: DatabaseConnection,
cd_id: i64,
mime_type: String,
) -> RepoResult<Self> {
let file = file::ActiveModel {
cd_id: Set(cd_id),
mime_type: Set(mime_type),
..Default::default()
};
let file: file::ActiveModel = file.insert(&db).await?.into();
let file = Self::by_id(db, file.id.unwrap())
.await?
.expect("Inserted file does not exist");
Ok(file)
}
/// Returns the unique identifier of the file
pub fn id(&self) -> i64 {
self.model.id
}
/// Returns the hash of the file (content identifier)
pub fn cd(&self) -> &[u8] {
&self.content_descriptor.descriptor
}
/// Returns the encoded content descriptor
pub fn encoded_cd(&self) -> String {
encode_content_descriptor(self.cd())
}
/// Returns the id of the civ (content identifier value) of the file
pub fn cd_id(&self) -> i64 {
self.content_descriptor.id
}
/// Returns the mime type of the file
pub fn mime_type(&self) -> &String {
&self.model.mime_type
}
/// Returns the status of the file
pub fn status(&self) -> FileStatus {
match self.model.status {
10 => FileStatus::Imported,
20 => FileStatus::Archived,
30 => FileStatus::Deleted,
_ => FileStatus::Imported,
}
}
pub async fn set_status(&mut self, status: FileStatus) -> RepoResult<()> {
let active_model = file::ActiveModel {
id: Set(self.model.id),
status: Set(status as i32),
..Default::default()
};
self.model = active_model.update(&self.db).await?;
Ok(())
}
/// Returns the metadata associated with this file
/// A file MUST always have metadata associated
pub async fn metadata(&self) -> RepoResult<FileMetadata> {
FileMetadata::by_id(self.db.clone(), self.model.id)
.await
.and_then(|f| f.ok_or_else(|| RepoError::from("missing file metadata")))
}
/// Returns the list of tags of the file
#[tracing::instrument(level = "debug", skip(self))]
pub async fn tags(&self) -> RepoResult<Vec<Tag>> {
let tags: Vec<(tag::Model, Option<namespace::Model>)> = tag::Entity::find()
.find_also_related(namespace::Entity)
.join(
JoinType::LeftJoin,
content_descriptor_tag::Relation::Tag.def().rev(),
)
.join(
JoinType::InnerJoin,
content_descriptor_tag::Relation::ContentDescriptorId.def(),
)
.filter(content_descriptor::Column::Id.eq(self.content_descriptor.id))
.all(&self.db)
.await?;
let tags = tags
.into_iter()
.map(|(tag, namespace)| Tag::new(self.db.clone(), tag, namespace))
.collect();
Ok(tags)
}
/// Adds a single tag to the file
#[tracing::instrument(level = "debug", skip(self))]
pub async fn add_tag(&mut self, tag_id: i64) -> RepoResult<()> {
let cd_id = self.content_descriptor.id;
let active_model = content_descriptor_tag::ActiveModel {
cd_id: Set(cd_id),
tag_id: Set(tag_id),
};
active_model.insert(&self.db).await?;
Ok(())
}
/// Adds multiple tags to the file at once
#[tracing::instrument(level = "debug", skip(self))]
pub async fn add_tags(&self, tag_ids: Vec<i64>) -> RepoResult<()> {
if tag_ids.is_empty() {
return Ok(());
}
let cd_id = self.content_descriptor.id;
let models: Vec<content_descriptor_tag::ActiveModel> = tag_ids
.into_iter()
.map(|tag_id| content_descriptor_tag::ActiveModel {
cd_id: Set(cd_id),
tag_id: Set(tag_id),
})
.collect();
content_descriptor_tag::Entity::insert_many(models)
.exec(&self.db)
.await?;
Ok(())
}
/// Removes multiple tags from the file
#[tracing::instrument(level = "debug", skip(self))]
pub async fn remove_tags(&self, tag_ids: Vec<i64>) -> RepoResult<()> {
let hash_id = self.content_descriptor.id;
content_descriptor_tag::Entity::delete_many()
.filter(content_descriptor_tag::Column::CdId.eq(hash_id))
.filter(content_descriptor_tag::Column::TagId.is_in(tag_ids))
.exec(&self.db)
.await?;
Ok(())
}
/// Returns the reader for the file
#[tracing::instrument(level = "debug", skip(self))]
pub async fn get_reader(
&self,
storage: &FileHashStore,
) -> RepoResult<BufReader<tokio::fs::File>> {
storage
.get_file(&self.content_descriptor.descriptor)
.await
.map(|(_, f)| f)
}
/// Creates a thumbnail for the file
#[tracing::instrument(level = "debug", skip(self))]
pub async fn create_thumbnail<I: IntoIterator<Item = ThumbnailSize> + Debug>(
&self,
storage: &FileHashStore,
sizes: I,
) -> RepoResult<Vec<ThumbnailerThumb>> {
let mut buf = Vec::new();
self.get_reader(storage)
.await?
.read_to_end(&mut buf)
.await?;
let mime_type = self.model.mime_type.clone();
let mime_type =
mime::Mime::from_str(&mime_type).unwrap_or_else(|_| mime::APPLICATION_OCTET_STREAM);
let thumbs = thumbnailer::create_thumbnails(Cursor::new(buf), mime_type, sizes)?;
Ok(thumbs)
}
/// Deletes the file as well as the content descriptor, tag mappings and metadata about the file
#[tracing::instrument(level = "debug", skip(self))]
pub async fn delete(self) -> RepoResult<()> {
let trx = self.db.begin().await?;
file_metadata::Entity::delete_many()
.filter(file_metadata::Column::FileId.eq(self.model.id))
.exec(&trx)
.await?;
self.model.delete(&trx).await?;
content_descriptor_tag::Entity::delete_many()
.filter(content_descriptor_tag::Column::CdId.eq(self.content_descriptor.id))
.exec(&trx)
.await?;
content_descriptor::Entity::delete_many()
.filter(content_descriptor::Column::Id.eq(self.content_descriptor.id))
.exec(&trx)
.await?;
trx.commit().await?;
Ok(())
}
}

@ -1,124 +0,0 @@
use std::fmt::Debug;
use chrono::{Local, NaiveDateTime};
use sea_orm::prelude::*;
use sea_orm::{DatabaseConnection, Set};
use mediarepo_core::error::RepoResult;
use mediarepo_database::entities::file_metadata;
#[derive(Clone)]
pub struct FileMetadata {
db: DatabaseConnection,
model: file_metadata::Model,
}
impl FileMetadata {
#[tracing::instrument(level = "trace")]
pub(crate) fn new(db: DatabaseConnection, model: file_metadata::Model) -> Self {
Self { db, model }
}
/// Fetches the file by id
#[tracing::instrument(level = "debug", skip(db))]
pub async fn by_id(db: DatabaseConnection, id: i64) -> RepoResult<Option<Self>> {
let file_metadata = file_metadata::Entity::find_by_id(id)
.one(&db)
.await?
.map(|m| FileMetadata::new(db, m));
Ok(file_metadata)
}
/// Fetches metadata for all given file ids
#[tracing::instrument(level = "debug", skip(db))]
pub async fn all_by_ids(db: DatabaseConnection, ids: Vec<i64>) -> RepoResult<Vec<Self>> {
let file_metadata = file_metadata::Entity::find()
.filter(file_metadata::Column::FileId.is_in(ids))
.all(&db)
.await?
.into_iter()
.map(|m| FileMetadata::new(db.clone(), m))
.collect();
Ok(file_metadata)
}
/// Adds a file with its hash to the database
#[tracing::instrument(level = "debug", skip(db))]
pub(crate) async fn add(
db: DatabaseConnection,
file_id: i64,
size: i64,
creation_time: NaiveDateTime,
change_time: NaiveDateTime,
) -> RepoResult<Self> {
let file = file_metadata::ActiveModel {
file_id: Set(file_id),
size: Set(size),
import_time: Set(Local::now().naive_local()),
creation_time: Set(creation_time),
change_time: Set(change_time),
..Default::default()
};
let model = file.insert(&db).await?;
Ok(Self::new(db, model))
}
pub fn file_id(&self) -> i64 {
self.model.file_id
}
pub fn size(&self) -> i64 {
self.model.size
}
pub fn name(&self) -> &Option<String> {
&self.model.name
}
pub fn comment(&self) -> &Option<String> {
&self.model.comment
}
pub fn import_time(&self) -> &NaiveDateTime {
&self.model.import_time
}
pub fn creation_time(&self) -> &NaiveDateTime {
&self.model.creation_time
}
pub fn change_time(&self) -> &NaiveDateTime {
&self.model.change_time
}
/// Changes the name of the file
#[tracing::instrument(level = "debug", skip(self))]
pub async fn set_name<S: ToString + Debug>(&mut self, name: S) -> RepoResult<()> {
let mut active_model = self.get_active_model();
active_model.name = Set(Some(name.to_string()));
self.model = active_model.update(&self.db).await?;
Ok(())
}
/// Changes the comment of the file
#[tracing::instrument(level = "debug", skip(self))]
pub async fn set_comment<S: ToString + Debug>(&mut self, comment: S) -> RepoResult<()> {
let mut active_file = self.get_active_model();
active_file.comment = Set(Some(comment.to_string()));
self.model = active_file.update(&self.db).await?;
Ok(())
}
/// Returns the active model of the file with only the id set
fn get_active_model(&self) -> file_metadata::ActiveModel {
file_metadata::ActiveModel {
file_id: Set(self.file_id()),
..Default::default()
}
}
}

@ -1,8 +0,0 @@
pub mod content_descriptor;
pub mod file;
pub mod file_metadata;
pub mod namespace;
pub mod repo;
pub mod tag;
pub mod thumbnail;
pub mod type_keys;

@ -1,141 +0,0 @@
use mediarepo_core::error::RepoResult;
use mediarepo_database::entities::namespace;
use sea_orm::prelude::*;
use sea_orm::{
Condition, ConnectionTrait, DatabaseBackend, DatabaseConnection, InsertResult, Set, Statement,
};
use std::fmt::Debug;
#[derive(Clone)]
pub struct Namespace {
#[allow(dead_code)]
db: DatabaseConnection,
model: namespace::Model,
}
impl Namespace {
#[tracing::instrument(level = "trace")]
pub(crate) fn new(db: DatabaseConnection, model: namespace::Model) -> Self {
Self { db, model }
}
/// Retrieves a list of all namespaces
#[tracing::instrument(level = "debug", skip(db))]
pub async fn all(db: DatabaseConnection) -> RepoResult<Vec<Self>> {
let namespaces = namespace::Entity::find()
.all(&db)
.await?
.into_iter()
.map(|model| Self::new(db.clone(), model))
.collect();
Ok(namespaces)
}
/// Retrieves the namespace by id
#[tracing::instrument(level = "debug", skip(db))]
pub async fn by_id(db: DatabaseConnection, id: i64) -> RepoResult<Option<Self>> {
let namespace = namespace::Entity::find_by_id(id)
.one(&db)
.await?
.map(|model| Self::new(db, model));
Ok(namespace)
}
/// Retrieves a namespace by its name
#[tracing::instrument(level = "debug", skip(db))]
pub async fn by_name<S: AsRef<str> + Debug>(
db: DatabaseConnection,
name: S,
) -> RepoResult<Option<Self>> {
let namespace = namespace::Entity::find()
.filter(namespace::Column::Name.eq(name.as_ref()))
.one(&db)
.await?
.map(|model| Self::new(db, model));
Ok(namespace)
}
/// Returns all namespaces by name
#[tracing::instrument(level = "debug", skip(db))]
pub async fn all_by_name(db: DatabaseConnection, names: Vec<String>) -> RepoResult<Vec<Self>> {
if names.is_empty() {
return Ok(Vec::with_capacity(0));
}
let mut condition = Condition::any();
for name in names {
condition = condition.add(namespace::Column::Name.eq(name));
}
let namespaces = namespace::Entity::find()
.filter(condition)
.all(&db)
.await?
.into_iter()
.map(|model| Self::new(db.clone(), model))
.collect();
Ok(namespaces)
}
/// Adds all namespaces to the database
#[tracing::instrument(level = "debug", skip(db))]
pub async fn add_all(db: DatabaseConnection, names: Vec<String>) -> RepoResult<Vec<Self>> {
if names.is_empty() {
return Ok(vec![]);
}
let models: Vec<namespace::ActiveModel> = names
.into_iter()
.map(|name| namespace::ActiveModel {
name: Set(name),
..Default::default()
})
.collect();
let txn = db.begin().await?;
let last_id = txn
.query_one(Statement::from_string(
DatabaseBackend::Sqlite,
r#"SELECT MAX(id) AS "max_id" FROM namespaces;"#.to_owned(),
))
.await?
.and_then(|result| result.try_get("", "max_id").ok())
.unwrap_or(-1);
let result: InsertResult<namespace::ActiveModel> =
namespace::Entity::insert_many(models).exec(&txn).await?;
let namespaces = namespace::Entity::find()
.filter(namespace::Column::Id.between(last_id, result.last_insert_id + 1))
.all(&txn)
.await?
.into_iter()
.map(|model| Self::new(db.clone(), model))
.collect();
txn.commit().await?;
Ok(namespaces)
}
/// Adds a namespace to the database
#[tracing::instrument(level = "debug", skip(db))]
pub async fn add<S: ToString + Debug>(db: DatabaseConnection, name: S) -> RepoResult<Self> {
let active_model = namespace::ActiveModel {
name: Set(name.to_string()),
..Default::default()
};
let model = active_model.insert(&db).await?;
Ok(Self::new(db, model))
}
/// The ID of the namespace
pub fn id(&self) -> i64 {
self.model.id
}
/// The name of the namespace
pub fn name(&self) -> &String {
&self.model.name
}
}

@ -1,432 +0,0 @@
use crate::content_descriptor::ContentDescriptor;
use crate::file::filter::FilterProperty;
use crate::file::File;
use crate::file_metadata::FileMetadata;
use crate::namespace::Namespace;
use crate::tag::Tag;
use crate::thumbnail::Thumbnail;
use chrono::{Local, NaiveDateTime};
use mediarepo_core::content_descriptor::{encode_content_descriptor, is_v1_content_descriptor};
use mediarepo_core::error::{RepoError, RepoResult};
use mediarepo_core::fs::file_hash_store::FileHashStore;
use mediarepo_core::fs::thumbnail_store::{Dimensions, ThumbnailStore};
use mediarepo_core::itertools::Itertools;
use mediarepo_core::thumbnailer::ThumbnailSize;
use mediarepo_core::utils::parse_namespace_and_tag;
use mediarepo_database::get_database;
use mediarepo_database::queries::analysis::{get_all_counts, Counts};
use sea_orm::DatabaseConnection;
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::io::Cursor;
use std::iter::FromIterator;
use std::path::PathBuf;
use std::str::FromStr;
use tokio::fs::OpenOptions;
use tokio::io::AsyncReadExt;
#[derive(Clone)]
pub struct Repo {
db: DatabaseConnection,
main_storage: FileHashStore,
thumbnail_storage: ThumbnailStore,
}
impl Repo {
pub(crate) fn new(
db: DatabaseConnection,
file_store_path: PathBuf,
thumb_store_path: PathBuf,
) -> Self {
Self {
db,
main_storage: FileHashStore::new(file_store_path),
thumbnail_storage: ThumbnailStore::new(thumb_store_path),
}
}
/// Connects to the database with the given uri
#[tracing::instrument(level = "debug")]
pub async fn connect<S: AsRef<str> + Debug>(
uri: S,
file_store_path: PathBuf,
thumb_store_path: PathBuf,
) -> RepoResult<Self> {
let db = get_database(uri).await?;
Ok(Self::new(db, file_store_path, thumb_store_path))
}
/// Returns the database of the repo for raw sql queries
pub fn db(&self) -> &DatabaseConnection {
&self.db
}
/// Returns a file by its mapped hash
#[tracing::instrument(level = "debug", skip(self))]
pub async fn file_by_cd(&self, cd: &[u8]) -> RepoResult<Option<File>> {
File::by_cd(self.db.clone(), cd).await
}
/// Returns a file by id
#[tracing::instrument(level = "debug", skip(self))]
pub async fn file_by_id(&self, id: i64) -> RepoResult<Option<File>> {
File::by_id(self.db.clone(), id).await
}
/// Returns a list of all stored files
#[tracing::instrument(level = "debug", skip(self))]
pub async fn files(&self) -> RepoResult<Vec<File>> {
File::all(self.db.clone()).await
}
/// Finds all files by a list of tags
#[tracing::instrument(level = "debug", skip(self))]
pub async fn find_files_by_filters(
&self,
filters: Vec<Vec<FilterProperty>>,
) -> RepoResult<Vec<File>> {
File::find_by_filters(self.db.clone(), filters).await
}
/// Returns all file metadata entries for the given file ids
#[tracing::instrument(level = "debug", skip(self))]
pub async fn get_file_metadata_for_ids(&self, ids: Vec<i64>) -> RepoResult<Vec<FileMetadata>> {
FileMetadata::all_by_ids(self.db.clone(), ids).await
}
/// Adds a file from bytes to the database
#[tracing::instrument(level = "debug", skip(self, content))]
pub async fn add_file(
&self,
mime_type: Option<String>,
content: Vec<u8>,
creation_time: NaiveDateTime,
change_time: NaiveDateTime,
) -> RepoResult<File> {
let file_size = content.len();
let reader = Cursor::new(content);
let cd_binary = self.main_storage.add_file(reader, None).await?;
let cd = ContentDescriptor::add(self.db.clone(), cd_binary).await?;
let mime_type = mime_type
.and_then(|m| mime::Mime::from_str(&m).ok())
.unwrap_or_else(|| mime::APPLICATION_OCTET_STREAM)
.to_string();
let file = File::add(self.db.clone(), cd.id(), mime_type).await?;
FileMetadata::add(
self.db.clone(),
file.id(),
file_size as i64,
creation_time,
change_time,
)
.await?;
Ok(file)
}
/// Adds a file to the database by its readable path in the file system
#[tracing::instrument(level = "debug", skip(self))]
pub async fn add_file_by_path(&self, path: PathBuf) -> RepoResult<File> {
let mime_type = mime_guess::from_path(&path).first().map(|m| m.to_string());
let mut os_file = OpenOptions::new().read(true).open(&path).await?;
let mut buf = Vec::new();
os_file.read_to_end(&mut buf).await?;
self.add_file(
mime_type,
buf,
Local::now().naive_local(),
Local::now().naive_local(),
)
.await
}
/// Deletes a file from the database and disk
#[tracing::instrument(level = "debug", skip(self, file))]
pub async fn delete_file(&self, file: File) -> RepoResult<()> {
let cd = file.cd().to_owned();
let cd_string = file.encoded_cd();
file.delete().await?;
self.main_storage.delete_file(&cd).await?;
self.thumbnail_storage.delete_parent(&cd_string).await?;
Ok(())
}
/// Returns all thumbnails of a file
pub async fn get_file_thumbnails(&self, file_cd: &[u8]) -> RepoResult<Vec<Thumbnail>> {
let file_cd = encode_content_descriptor(file_cd);
let thumbnails = self
.thumbnail_storage
.get_thumbnails(&file_cd)
.await?
.into_iter()
.map(|(size, path)| Thumbnail {
file_hash: file_cd.to_owned(),
path,
size,
mime_type: mime::IMAGE_PNG.to_string(),
})
.collect_vec();
Ok(thumbnails)
}
pub async fn get_file_bytes(&self, file: &File) -> RepoResult<Vec<u8>> {
let mut buf = Vec::new();
let mut reader = file.get_reader(&self.main_storage).await?;
reader.read_to_end(&mut buf).await?;
Ok(buf)
}
/// Creates thumbnails of all sizes for a file
#[tracing::instrument(level = "debug", skip(self, file))]
pub async fn create_thumbnails_for_file(&self, file: &File) -> RepoResult<Vec<Thumbnail>> {
let size = ThumbnailSize::Medium;
let (height, width) = size.dimensions();
let thumbs = file.create_thumbnail(&self.main_storage, [size]).await?;
let mut created_thumbs = Vec::with_capacity(1);
for thumb in thumbs {
let entry = self
.store_single_thumbnail(file.encoded_cd(), height, width, thumb)
.await?;
created_thumbs.push(entry);
}
Ok(created_thumbs)
}
#[tracing::instrument(level = "debug", skip(self, file))]
pub async fn create_file_thumbnail(
&self,
file: &File,
size: ThumbnailSize,
) -> RepoResult<Thumbnail> {
let (height, width) = size.dimensions();
let thumb = file
.create_thumbnail(&self.main_storage, [size])
.await?
.pop()
.ok_or_else(|| RepoError::from("Failed to create thumbnail"))?;
let thumbnail = self
.store_single_thumbnail(file.encoded_cd(), height, width, thumb)
.await?;
Ok(thumbnail)
}
/// Stores a single thumbnail
async fn store_single_thumbnail(
&self,
file_hash: String,
height: u32,
width: u32,
thumb: mediarepo_core::thumbnailer::Thumbnail,
) -> RepoResult<Thumbnail> {
let mut buf = Vec::new();
thumb.write_png(&mut buf)?;
let size = Dimensions { height, width };
let path = self
.thumbnail_storage
.add_thumbnail(&file_hash, size.clone(), &buf)
.await?;
let thumbnail = Thumbnail {
file_hash,
path,
size,
mime_type: mime::IMAGE_PNG.to_string(),
};
Ok(thumbnail)
}
/// Returns all tags stored in the database
#[tracing::instrument(level = "debug", skip(self))]
pub async fn tags(&self) -> RepoResult<Vec<Tag>> {
Tag::all(self.db.clone()).await
}
/// Returns all namespaces stored in the database
#[tracing::instrument(level = "debug", skip(self))]
pub async fn namespaces(&self) -> RepoResult<Vec<Namespace>> {
Namespace::all(self.db.clone()).await
}
/// Converts a list of tag names to tag ids
#[tracing::instrument(level = "debug", skip(self))]
pub async fn tag_names_to_ids(&self, tags: Vec<String>) -> RepoResult<HashMap<String, i64>> {
let parsed_tags = tags
.iter()
.map(|tag| parse_namespace_and_tag(tag.clone()))
.unique()
.collect();
let db_tags = self.tags_by_names(parsed_tags).await?;
let tag_map: HashMap<String, i64> =
HashMap::from_iter(db_tags.into_iter().map(|t| (t.normalized_name(), t.id())));
Ok(tag_map)
}
/// Finds all tags by name
#[tracing::instrument(level = "debug", skip(self))]
pub async fn tags_by_names(&self, tags: Vec<(Option<String>, String)>) -> RepoResult<Vec<Tag>> {
Tag::all_by_name(self.db.clone(), tags).await
}
/// Finds all tags that are assigned to the given list of hashes
#[tracing::instrument(level = "debug", skip_all)]
pub async fn find_tags_for_file_identifiers(&self, cds: Vec<Vec<u8>>) -> RepoResult<Vec<Tag>> {
Tag::for_cd_list(self.db.clone(), cds).await
}
/// Adds all tags that are not in the database to the database and returns the ones already existing as well
#[tracing::instrument(level = "debug", skip_all)]
pub async fn add_all_tags(&self, tags: Vec<(Option<String>, String)>) -> RepoResult<Vec<Tag>> {
let mut tags_to_add = tags.into_iter().unique().collect_vec();
let mut namespaces_to_add = tags_to_add
.iter()
.filter_map(|(namespace, _)| namespace.clone())
.unique()
.collect_vec();
let mut existing_namespaces =
Namespace::all_by_name(self.db.clone(), namespaces_to_add.clone()).await?;
{
let existing_namespaces_set = existing_namespaces
.iter()
.map(|n| n.name().clone())
.collect::<HashSet<String>>();
namespaces_to_add.retain(|namespace| !existing_namespaces_set.contains(namespace));
}
existing_namespaces
.append(&mut Namespace::add_all(self.db.clone(), namespaces_to_add).await?);
let mut existing_tags = self.tags_by_names(tags_to_add.clone()).await?;
{
let existing_tags_set = existing_tags
.iter()
.map(|t| (t.namespace().map(|n| n.name().clone()), t.name().clone()))
.collect::<HashSet<(Option<String>, String)>>();
tags_to_add.retain(|t| !existing_tags_set.contains(t));
}
let namespace_map = existing_namespaces
.into_iter()
.map(|namespace| (namespace.name().clone(), namespace.id()))
.collect::<HashMap<String, i64>>();
let tags_to_add = tags_to_add
.into_iter()
.map(|(nsp, name)| (nsp.and_then(|n| namespace_map.get(&n)).map(|i| *i), name))
.collect_vec();
existing_tags.append(&mut Tag::add_all(self.db.clone(), tags_to_add).await?);
Ok(existing_tags)
}
/// Adds or finds a tag
#[tracing::instrument(level = "debug", skip(self))]
pub async fn add_or_find_tag<S: ToString + Debug>(&self, tag: S) -> RepoResult<Tag> {
let (namespace, name) = parse_namespace_and_tag(tag.to_string());
if let Some(namespace) = namespace {
self.add_or_find_namespaced_tag(name, namespace).await
} else {
self.add_or_find_unnamespaced_tag(name).await
}
}
/// Adds or finds an unnamespaced tag
#[tracing::instrument(level = "debug", skip(self))]
pub async fn add_or_find_unnamespaced_tag(&self, name: String) -> RepoResult<Tag> {
if let Some(tag) = Tag::by_name(self.db.clone(), &name, None).await? {
Ok(tag)
} else {
self.add_unnamespaced_tag(name).await
}
}
/// Adds an unnamespaced tag
#[tracing::instrument(level = "debug", skip(self))]
pub async fn add_unnamespaced_tag(&self, name: String) -> RepoResult<Tag> {
Tag::add(self.db.clone(), name, None).await
}
/// Adds or finds a namespaced tag
#[tracing::instrument(level = "debug", skip(self))]
pub async fn add_or_find_namespaced_tag(
&self,
name: String,
namespace: String,
) -> RepoResult<Tag> {
if let Some(tag) = Tag::by_name(self.db.clone(), &name, Some(namespace.clone())).await? {
Ok(tag)
} else {
self.add_namespaced_tag(name, namespace).await
}
}
/// Adds a namespaced tag
#[tracing::instrument(level = "debug", skip(self))]
pub async fn add_namespaced_tag(&self, name: String, namespace: String) -> RepoResult<Tag> {
let namespace =
if let Some(namespace) = Namespace::by_name(self.db.clone(), &namespace).await? {
namespace
} else {
Namespace::add(self.db.clone(), namespace).await?
};
Tag::add(self.db.clone(), name, Some(namespace.id())).await
}
/// Returns the size of the main storage
#[inline]
#[tracing::instrument(level = "debug", skip(self))]
pub async fn get_main_store_size(&self) -> RepoResult<u64> {
self.main_storage.get_size().await
}
/// Returns the size of the thumbnail storage
#[inline]
#[tracing::instrument(level = "debug", skip(self))]
pub async fn get_thumb_store_size(&self) -> RepoResult<u64> {
self.thumbnail_storage.get_size().await
}
/// Returns all entity counts
#[inline]
#[tracing::instrument(level = "debug", skip(self))]
pub async fn get_counts(&self) -> RepoResult<Counts> {
get_all_counts(&self.db).await
}
pub async fn migrate(&self) -> RepoResult<()> {
let cds = ContentDescriptor::all(self.db.clone()).await?;
tracing::info!("Converting content descriptors to v2 format...");
let mut converted_count = 0;
for mut cd in cds {
if is_v1_content_descriptor(cd.descriptor()) {
let src_cd = cd.descriptor().to_owned();
cd.convert_v1_to_v2().await?;
let dst_cd = cd.descriptor().to_owned();
self.main_storage.rename_file(&src_cd, &dst_cd).await?;
self.thumbnail_storage
.rename_parent(
encode_content_descriptor(&src_cd),
encode_content_descriptor(&dst_cd),
)
.await?;
converted_count += 1;
}
}
tracing::info!("Converted {} descriptors", converted_count);
Ok(())
}
}

@ -1,226 +0,0 @@
use std::fmt::Debug;
use mediarepo_core::error::RepoResult;
use mediarepo_database::entities::content_descriptor;
use mediarepo_database::entities::content_descriptor_tag;
use mediarepo_database::entities::namespace;
use mediarepo_database::entities::tag;
use sea_orm::prelude::*;
use sea_orm::query::ConnectionTrait;
use sea_orm::sea_query::Expr;
use sea_orm::{Condition, DatabaseBackend, DatabaseConnection, JoinType, Set, Statement};
use sea_orm::{InsertResult, QuerySelect};
use crate::namespace::Namespace;
#[derive(Clone)]
pub struct Tag {
db: DatabaseConnection,
model: tag::Model,
namespace: Option<namespace::Model>,
}
impl Tag {
#[tracing::instrument(level = "trace")]
pub(crate) fn new(
db: DatabaseConnection,
model: tag::Model,
namespace: Option<namespace::Model>,
) -> Self {
Self {
db,
model,
namespace,
}
}
/// Returns all tags stored in the database
#[tracing::instrument(level = "debug", skip(db))]
pub async fn all(db: DatabaseConnection) -> RepoResult<Vec<Self>> {
let tags: Vec<Self> = tag::Entity::find()
.left_join(namespace::Entity)
.select_also(namespace::Entity)
.all(&db)
.await?
.into_iter()
.map(|(tag, namespace)| Self::new(db.clone(), tag, namespace))
.collect();
Ok(tags)
}
/// Returns the tag by id
#[tracing::instrument(level = "debug", skip(db))]
pub async fn by_id(db: DatabaseConnection, id: i64) -> RepoResult<Option<Self>> {
let tag = tag::Entity::find_by_id(id)
.find_also_related(namespace::Entity)
.one(&db)
.await?
.map(|(model, namespace)| Self::new(db, model, namespace));
Ok(tag)
}
/// Returns one tag by name and namespace
#[tracing::instrument(level = "debug", skip(db))]
pub async fn by_name<S1: ToString + Debug>(
db: DatabaseConnection,
name: S1,
namespace: Option<String>,
) -> RepoResult<Option<Self>> {
let mut entries = Self::all_by_name(db, vec![(namespace, name.to_string())]).await?;
Ok(entries.pop())
}
/// Retrieves the namespaced tags by name and namespace
#[tracing::instrument(level = "debug", skip(db))]
pub async fn all_by_name(
db: DatabaseConnection,
namespaces_with_names: Vec<(Option<String>, String)>,
) -> RepoResult<Vec<Self>> {
if namespaces_with_names.is_empty() {
return Ok(vec![]);
}
let mut or_condition = Condition::any();
for (namespace, name) in namespaces_with_names {
let mut all_condition = Condition::all();
if !name.ends_with('*') {
all_condition = all_condition.add(tag::Column::Name.eq(name))
} else if name.len() > 1 {
all_condition = all_condition
.add(tag::Column::Name.like(&*format!("{}%", name.trim_end_matches("*"))))
} else if namespace.is_none() {
continue; // would result in an empty condition otherwise
}
all_condition = if let Some(namespace) = namespace {
all_condition.add(namespace::Column::Name.eq(namespace))
} else {
all_condition.add(Expr::tbl(tag::Entity, tag::Column::NamespaceId).is_null())
};
or_condition = or_condition.add(all_condition);
}
let tags: Vec<Self> = tag::Entity::find()
.find_also_related(namespace::Entity)
.filter(or_condition)
.group_by(tag::Column::Id)
.all(&db)
.await?
.into_iter()
.map(|(t, n)| Self::new(db.clone(), t, n))
.collect();
Ok(tags)
}
/// Returns all tags that are assigned to any of the passed hashes
#[tracing::instrument(level = "debug", skip_all)]
pub async fn for_cd_list(db: DatabaseConnection, cds: Vec<Vec<u8>>) -> RepoResult<Vec<Self>> {
let tags: Vec<Self> = tag::Entity::find()
.find_also_related(namespace::Entity)
.join(
JoinType::LeftJoin,
content_descriptor_tag::Relation::Tag.def().rev(),
)
.join(
JoinType::InnerJoin,
content_descriptor_tag::Relation::ContentDescriptorId.def(),
)
.filter(content_descriptor::Column::Descriptor.is_in(cds))
.group_by(tag::Column::Id)
.all(&db)
.await?
.into_iter()
.map(|(t, n)| Self::new(db.clone(), t, n))
.collect();
Ok(tags)
}
pub async fn add_all(
db: DatabaseConnection,
namespaces_with_names: Vec<(Option<i64>, String)>,
) -> RepoResult<Vec<Self>> {
if namespaces_with_names.is_empty() {
return Ok(vec![]);
}
let models: Vec<tag::ActiveModel> = namespaces_with_names
.into_iter()
.map(|(namespace_id, name)| tag::ActiveModel {
name: Set(name),
namespace_id: Set(namespace_id),
..Default::default()
})
.collect();
let txn = db.begin().await?;
let last_id: i64 = txn
.query_one(Statement::from_string(
DatabaseBackend::Sqlite,
r#"SELECT MAX(id) as "max_id" FROM tags"#.to_owned(),
))
.await?
.and_then(|res| res.try_get("", "max_id").ok())
.unwrap_or(-1);
let result: InsertResult<tag::ActiveModel> =
tag::Entity::insert_many(models).exec(&txn).await?;
let tags: Vec<Self> = tag::Entity::find()
.find_also_related(namespace::Entity)
.filter(tag::Column::Id.between(last_id, result.last_insert_id + 1))
.all(&txn)
.await?
.into_iter()
.map(|(t, n)| Self::new(db.clone(), t, n))
.collect();
txn.commit().await?;
Ok(tags)
}
/// Adds a new tag to the database
#[tracing::instrument(level = "debug", skip(db))]
pub async fn add<S: ToString + Debug>(
db: DatabaseConnection,
name: S,
namespace_id: Option<i64>,
) -> RepoResult<Self> {
let active_model = tag::ActiveModel {
name: Set(name.to_string()),
namespace_id: Set(namespace_id),
..Default::default()
};
let model: tag::Model = active_model.insert(&db).await?;
let namespace = model.find_related(namespace::Entity).one(&db).await?;
Ok(Self::new(db, model, namespace))
}
/// The ID of the tag
pub fn id(&self) -> i64 {
self.model.id
}
/// The name of the tag
pub fn name(&self) -> &String {
&self.model.name
}
/// The namespace of the tag
pub fn namespace(&self) -> Option<Namespace> {
self.namespace
.clone()
.map(|n| Namespace::new(self.db.clone(), n))
}
/// Returns the normalized name of the tag (namespace:tag)
pub fn normalized_name(&self) -> String {
if let Some(namespace) = &self.namespace {
format!("{}:{}", namespace.name, self.model.name)
} else {
self.model.name.to_owned()
}
}
}

@ -1,30 +0,0 @@
use mediarepo_core::error::RepoResult;
use mediarepo_core::fs::thumbnail_store::Dimensions;
use std::path::PathBuf;
use tokio::fs::{self, File, OpenOptions};
use tokio::io::BufReader;
#[derive(Debug)]
pub struct Thumbnail {
pub file_hash: String,
pub path: PathBuf,
pub size: Dimensions,
pub mime_type: String,
}
impl Thumbnail {
/// Returns the reader of the thumbnail file
#[tracing::instrument(level = "debug")]
pub async fn get_reader(&self) -> RepoResult<BufReader<File>> {
let file = OpenOptions::new().read(true).open(&self.path).await?;
Ok(BufReader::new(file))
}
/// Deletes the thumbnail
#[tracing::instrument(level = "debug")]
pub async fn delete(self) -> RepoResult<()> {
fs::remove_file(&self.path).await?;
Ok(())
}
}

@ -19,8 +19,8 @@ path = "../mediarepo-core"
[dependencies.mediarepo-database]
path = "../mediarepo-database"
[dependencies.mediarepo-model]
path = "../mediarepo-model"
[dependencies.mediarepo-logic]
path = "../mediarepo-logic"
[dependencies.tokio]
version = "^1.15.0"

@ -2,36 +2,34 @@ use mediarepo_core::mediarepo_api::types::files::{
FileBasicDataResponse, FileMetadataResponse, FileStatus, ThumbnailMetadataResponse,
};
use mediarepo_core::mediarepo_api::types::tags::{NamespaceResponse, TagResponse};
use mediarepo_model::file::{File, FileStatus as FileStatusModel};
use mediarepo_model::file_metadata::FileMetadata;
use mediarepo_model::namespace::Namespace;
use mediarepo_model::tag::Tag;
use mediarepo_model::thumbnail::Thumbnail;
use mediarepo_logic::dto::{
FileDto, FileMetadataDto, FileStatus as FileStatusModel, NamespaceDto, TagDto, ThumbnailDto,
};
pub trait FromModel<M> {
fn from_model(model: M) -> Self;
}
impl FromModel<FileMetadata> for FileMetadataResponse {
fn from_model(metadata: FileMetadata) -> Self {
impl FromModel<FileMetadataDto> for FileMetadataResponse {
fn from_model(model: FileMetadataDto) -> Self {
Self {
file_id: metadata.file_id(),
name: metadata.name().to_owned(),
comment: metadata.comment().to_owned(),
creation_time: metadata.creation_time().to_owned(),
change_time: metadata.change_time().to_owned(),
import_time: metadata.import_time().to_owned(),
file_id: model.file_id(),
name: model.name().cloned(),
comment: model.comment().cloned(),
creation_time: model.creation_time().to_owned(),
change_time: model.change_time().to_owned(),
import_time: model.import_time().to_owned(),
}
}
}
impl FromModel<File> for FileBasicDataResponse {
fn from_model(file: File) -> Self {
impl FromModel<FileDto> for FileBasicDataResponse {
fn from_model(model: FileDto) -> Self {
FileBasicDataResponse {
id: file.id(),
status: FileStatus::from_model(file.status()),
cd: file.encoded_cd(),
mime_type: file.mime_type().to_owned(),
id: model.id(),
status: FileStatus::from_model(model.status()),
cd: model.encoded_cd(),
mime_type: model.mime_type().to_owned(),
}
}
}
@ -46,8 +44,8 @@ impl FromModel<FileStatusModel> for FileStatus {
}
}
impl FromModel<Tag> for TagResponse {
fn from_model(model: Tag) -> Self {
impl FromModel<TagDto> for TagResponse {
fn from_model(model: TagDto) -> Self {
Self {
id: model.id(),
namespace: model.namespace().map(|n| n.name().to_owned()),
@ -56,19 +54,19 @@ impl FromModel<Tag> for TagResponse {
}
}
impl FromModel<Thumbnail> for ThumbnailMetadataResponse {
fn from_model(model: Thumbnail) -> Self {
impl FromModel<ThumbnailDto> for ThumbnailMetadataResponse {
fn from_model(model: ThumbnailDto) -> Self {
Self {
file_hash: model.file_hash,
height: model.size.height,
width: model.size.width,
mime_type: model.mime_type.to_owned(),
file_hash: model.parent_cd().to_owned(),
height: model.size().height,
width: model.size().width,
mime_type: model.mime_type().to_owned(),
}
}
}
impl FromModel<Namespace> for NamespaceResponse {
fn from_model(model: Namespace) -> Self {
impl FromModel<NamespaceDto> for NamespaceResponse {
fn from_model(model: NamespaceDto) -> Self {
Self {
id: model.id(),
name: model.name().to_owned(),

@ -1,16 +1,18 @@
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::net::TcpListener;
use tokio::task::JoinHandle;
use mediarepo_core::bromine::prelude::*;
use mediarepo_core::error::{RepoError, RepoResult};
use mediarepo_core::mediarepo_api::types::misc::InfoResponse;
use mediarepo_core::settings::{PortSetting, Settings};
use mediarepo_core::tokio_graceful_shutdown::SubsystemHandle;
use mediarepo_core::type_keys::{RepoPathKey, SettingsKey, SizeMetadataKey, SubsystemKey};
use mediarepo_model::repo::Repo;
use mediarepo_model::type_keys::RepoKey;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::net::TcpListener;
use tokio::task::JoinHandle;
use mediarepo_logic::dao::repo::Repo;
use mediarepo_logic::type_keys::RepoKey;
mod from_model;
mod namespaces;

@ -1,11 +1,8 @@
mod searching;
mod sorting;
use tokio::io::AsyncReadExt;
use crate::from_model::FromModel;
use crate::namespaces::files::searching::find_files_for_filters;
use crate::namespaces::files::sorting::sort_files_by_properties;
use crate::utils::{cd_by_identifier, file_by_identifier, get_repo_from_context};
use mediarepo_core::bromine::prelude::*;
use mediarepo_core::content_descriptor::{create_content_descriptor, encode_content_descriptor};
use mediarepo_core::error::RepoError;
use mediarepo_core::fs::thumbnail_store::Dimensions;
use mediarepo_core::itertools::Itertools;
use mediarepo_core::mediarepo_api::types::files::{
@ -17,7 +14,16 @@ use mediarepo_core::mediarepo_api::types::filtering::FindFilesRequest;
use mediarepo_core::mediarepo_api::types::identifier::FileIdentifier;
use mediarepo_core::thumbnailer::ThumbnailSize;
use mediarepo_core::utils::parse_namespace_and_tag;
use tokio::io::AsyncReadExt;
use mediarepo_logic::dao::DaoProvider;
use mediarepo_logic::dto::{AddFileDto, AddTagDto, UpdateFileDto, UpdateFileMetadataDto};
use crate::from_model::FromModel;
use crate::namespaces::files::searching::find_files_for_filters;
use crate::namespaces::files::sorting::sort_files_by_properties;
use crate::utils::{cd_by_identifier, file_by_identifier, get_repo_from_context};
mod searching;
mod sorting;
pub struct FilesNamespace;
@ -50,7 +56,7 @@ impl FilesNamespace {
#[tracing::instrument(skip_all)]
async fn all_files(ctx: &Context, _event: Event) -> IPCResult<()> {
let repo = get_repo_from_context(ctx).await;
let files = repo.files().await?;
let files = repo.file().all().await?;
let responses: Vec<FileBasicDataResponse> = files
.into_iter()
@ -80,7 +86,17 @@ impl FilesNamespace {
let id = event.payload::<FileIdentifier>()?;
let repo = get_repo_from_context(ctx).await;
let file = file_by_identifier(id, &repo).await?;
let metadata = file.metadata().await?;
let file_id = file.id();
let metadata = if let Some(metadata) = file.into_metadata() {
metadata
} else {
repo.file()
.metadata(file_id)
.await?
.ok_or_else(|| RepoError::from("file metadata not found"))?
};
ctx.emit_to(
Self::name(),
"get_file_metadata",
@ -135,22 +151,38 @@ impl FilesNamespace {
.into_inner();
let AddFileRequestHeader { metadata, tags } = request;
let repo = get_repo_from_context(ctx).await;
let bytes = bytes.into_inner();
let cd = create_content_descriptor(&bytes);
let file = repo
.add_file(
metadata.mime_type,
bytes.into_inner(),
metadata.creation_time,
metadata.change_time,
)
.await?;
file.metadata().await?.set_name(metadata.name).await?;
let file = if let Some(file) = repo.file().by_cd(cd).await? {
tracing::debug!("Inserted file already exists");
file
} else {
let add_dto = AddFileDto {
content: bytes,
mime_type: metadata
.mime_type
.unwrap_or(String::from("application/octet-stream")),
creation_time: metadata.creation_time,
change_time: metadata.change_time,
name: Some(metadata.name),
};
repo.file().add(add_dto).await?
};
let tags = repo
.add_all_tags(tags.into_iter().map(parse_namespace_and_tag).collect())
.tag()
.add_all(
tags.into_iter()
.map(parse_namespace_and_tag)
.map(AddTagDto::from_tuple)
.collect(),
)
.await?;
let tag_ids: Vec<i64> = tags.into_iter().map(|t| t.id()).unique().collect();
file.add_tags(tag_ids).await?;
repo.tag()
.upsert_mappings(vec![file.cd_id()], tag_ids)
.await?;
ctx.emit_to(
Self::name(),
@ -167,7 +199,14 @@ impl FilesNamespace {
let request = event.payload::<UpdateFileStatusRequest>()?;
let repo = get_repo_from_context(ctx).await;
let mut file = file_by_identifier(request.file_id, &repo).await?;
file.set_status(request.status.into()).await?;
file = repo
.file()
.update(UpdateFileDto {
id: file.id(),
status: Some(request.status.into()),
..Default::default()
})
.await?;
ctx.emit_to(
Self::name(),
"update_file_status",
@ -184,7 +223,7 @@ impl FilesNamespace {
let request = event.payload::<ReadFileRequest>()?;
let repo = get_repo_from_context(ctx).await;
let file = file_by_identifier(request.id, &repo).await?;
let bytes = repo.get_file_bytes(&file).await?;
let bytes = repo.file().get_bytes(file.cd()).await?;
ctx.emit_to(Self::name(), "read_file", BytePayload::new(bytes))
.await?;
@ -198,7 +237,7 @@ impl FilesNamespace {
let id = event.payload::<FileIdentifier>()?;
let repo = get_repo_from_context(ctx).await;
let file = file_by_identifier(id, &repo).await?;
repo.delete_file(file).await?;
repo.file().delete(file).await?;
ctx.emit_to(Self::name(), "delete_file", ()).await?;
@ -211,12 +250,18 @@ impl FilesNamespace {
let request = event.payload::<GetFileThumbnailsRequest>()?;
let repo = get_repo_from_context(ctx).await;
let file_cd = cd_by_identifier(request.id.clone(), &repo).await?;
let mut thumbnails = repo.get_file_thumbnails(&file_cd).await?;
let mut thumbnails = repo
.file()
.thumbnails(encode_content_descriptor(&file_cd))
.await?;
if thumbnails.is_empty() {
tracing::debug!("No thumbnails for file found. Creating thumbnails...");
let file = file_by_identifier(request.id, &repo).await?;
thumbnails = repo.create_thumbnails_for_file(&file).await?;
thumbnails = repo
.file()
.create_thumbnails(file, vec![ThumbnailSize::Medium])
.await?;
tracing::debug!("Thumbnails for file created.");
}
@ -236,17 +281,20 @@ impl FilesNamespace {
let request = event.payload::<GetFileThumbnailOfSizeRequest>()?;
let repo = get_repo_from_context(ctx).await;
let file_cd = cd_by_identifier(request.id.clone(), &repo).await?;
let thumbnails = repo.get_file_thumbnails(&file_cd).await?;
let min_size = request.min_size;
let max_size = request.max_size;
let thumbnails = repo
.file()
.thumbnails(encode_content_descriptor(&file_cd))
.await?;
let found_thumbnail = thumbnails.into_iter().find(|thumb| {
let Dimensions { height, width } = thumb.size;
let Dimensions { height, width } = thumb.size();
height >= min_size.0
&& height <= max_size.0
&& width >= min_size.1
&& width <= max_size.1
*height >= min_size.0
&& *height <= max_size.0
&& *width >= min_size.1
&& *width <= max_size.1
});
let thumbnail = if let Some(thumbnail) = found_thumbnail {
@ -255,10 +303,14 @@ impl FilesNamespace {
let file = file_by_identifier(request.id, &repo).await?;
let middle_size = ((max_size.0 + min_size.0) / 2, (max_size.1 + min_size.1) / 2);
let thumbnail = repo
.create_file_thumbnail(&file, ThumbnailSize::Custom(middle_size))
.file()
.create_thumbnails(file, vec![ThumbnailSize::Custom(middle_size)])
.await?;
thumbnail
.into_iter()
.next()
.ok_or_else(|| RepoError::from("thumbnail could not be created"))?
};
let mut buf = Vec::new();
thumbnail.get_reader().await?.read_to_end(&mut buf).await?;
@ -280,8 +332,15 @@ impl FilesNamespace {
let repo = get_repo_from_context(ctx).await;
let request = event.payload::<UpdateFileNameRequest>()?;
let file = file_by_identifier(request.file_id, &repo).await?;
let mut metadata = file.metadata().await?;
metadata.set_name(request.name).await?;
let metadata = repo
.file()
.update_metadata(UpdateFileMetadataDto {
file_id: file.id(),
name: Some(Some(request.name)),
..Default::default()
})
.await?;
ctx.emit_to(
Self::name(),
@ -299,7 +358,7 @@ impl FilesNamespace {
let repo = get_repo_from_context(ctx).await;
let id = event.payload::<FileIdentifier>()?;
let file = file_by_identifier(id, &repo).await?;
let thumbnails = repo.get_file_thumbnails(file.cd()).await?;
let thumbnails = repo.file().thumbnails(file.encoded_cd()).await?;
for thumb in thumbnails {
thumb.delete().await?;

@ -1,25 +1,27 @@
use std::collections::HashMap;
use mediarepo_core::content_descriptor::decode_content_descriptor;
use mediarepo_core::error::RepoResult;
use mediarepo_core::mediarepo_api::types::files::FileStatus as ApiFileStatus;
use mediarepo_core::mediarepo_api::types::filtering::{
FilterExpression, FilterQuery, PropertyQuery, TagQuery, ValueComparator,
};
use mediarepo_model::file::filter::NegatableComparator::{Is, IsNot};
use mediarepo_model::file::filter::{FilterFileProperty, FilterProperty, OrderingComparator};
use mediarepo_model::file::{File, FileStatus};
use mediarepo_model::repo::Repo;
use std::collections::HashMap;
use mediarepo_logic::dao::file::find::NegatableComparator::{Is, IsNot};
use mediarepo_logic::dao::file::find::{FilterFileProperty, FilterProperty, OrderingComparator};
use mediarepo_logic::dao::repo::Repo;
use mediarepo_logic::dao::DaoProvider;
use mediarepo_logic::dto::{FileDto, FileStatus};
#[tracing::instrument(level = "debug", skip(repo))]
pub async fn find_files_for_filters(
repo: &Repo,
expressions: Vec<FilterExpression>,
) -> RepoResult<Vec<File>> {
) -> RepoResult<Vec<FileDto>> {
let tag_names = get_tag_names_from_expressions(&expressions);
let tag_id_map = repo.tag_names_to_ids(tag_names).await?;
let tag_id_map = repo.tag().normalized_tags_to_ids(tag_names).await?;
let filters = build_filters_from_expressions(expressions, &tag_id_map);
repo.find_files_by_filters(filters).await
repo.file().find(filters).await
}
#[tracing::instrument(level = "debug")]

@ -1,17 +1,20 @@
use std::cmp::Ordering;
use std::collections::HashMap;
use std::iter::FromIterator;
use chrono::NaiveDateTime;
use compare::Compare;
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use mediarepo_core::error::RepoResult;
use mediarepo_core::mediarepo_api::types::filtering::{SortDirection, SortKey};
use mediarepo_database::queries::tags::{
get_cids_with_namespaced_tags, get_content_descriptors_with_tag_count,
};
use mediarepo_model::file::File;
use mediarepo_model::file_metadata::FileMetadata;
use mediarepo_model::repo::Repo;
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use std::cmp::Ordering;
use std::collections::HashMap;
use std::iter::FromIterator;
use mediarepo_logic::dao::DaoProvider;
use mediarepo_logic::dao::repo::Repo;
use mediarepo_logic::dto::{FileDto, FileMetadataDto};
pub struct FileSortContext {
name: Option<String>,
@ -28,7 +31,7 @@ pub struct FileSortContext {
pub async fn sort_files_by_properties(
repo: &Repo,
sort_expression: Vec<SortKey>,
files: &mut Vec<File>,
files: &mut Vec<FileDto>,
) -> RepoResult<()> {
let contexts = build_sort_context(repo, files).await?;
@ -45,7 +48,7 @@ pub async fn sort_files_by_properties(
async fn build_sort_context(
repo: &Repo,
files: &Vec<File>,
files: &Vec<FileDto>,
) -> RepoResult<HashMap<i64, FileSortContext>> {
let hash_ids: Vec<i64> = files.par_iter().map(|f| f.cd_id()).collect();
let file_ids: Vec<i64> = files.par_iter().map(|f| f.id()).collect();
@ -54,9 +57,9 @@ async fn build_sort_context(
get_cids_with_namespaced_tags(repo.db(), hash_ids.clone()).await?;
let mut cid_tag_counts = get_content_descriptors_with_tag_count(repo.db(), hash_ids).await?;
let files_metadata = repo.get_file_metadata_for_ids(file_ids).await?;
let files_metadata = repo.file().all_metadata(file_ids).await?;
let mut file_metadata_map: HashMap<i64, FileMetadata> =
let mut file_metadata_map: HashMap<i64, FileMetadataDto> =
HashMap::from_iter(files_metadata.into_iter().map(|m| (m.file_id(), m)));
let mut contexts = HashMap::new();
@ -64,7 +67,7 @@ async fn build_sort_context(
for file in files {
if let Some(metadata) = file_metadata_map.remove(&file.id()) {
let context = FileSortContext {
name: metadata.name().to_owned(),
name: metadata.name().cloned(),
size: metadata.size() as u64,
mime_type: file.mime_type().to_owned(),
namespaces: cid_nsp

@ -1,9 +1,11 @@
use crate::utils::{calculate_size, get_repo_from_context};
use mediarepo_core::bromine::prelude::*;
use mediarepo_core::error::RepoResult;
use mediarepo_core::mediarepo_api::types::jobs::{JobType, RunJobRequest};
use mediarepo_core::mediarepo_api::types::repo::SizeType;
use mediarepo_core::type_keys::SizeMetadataKey;
use mediarepo_logic::dao::DaoProvider;
use crate::utils::{calculate_size, get_repo_from_context};
pub struct JobsNamespace;
@ -23,12 +25,12 @@ impl JobsNamespace {
#[tracing::instrument(skip_all)]
pub async fn run_job(ctx: &Context, event: Event) -> IPCResult<()> {
let run_request = event.payload::<RunJobRequest>()?;
let repo = get_repo_from_context(ctx).await;
let job_dao = get_repo_from_context(ctx).await.job();
match run_request.job_type {
JobType::MigrateContentDescriptors => repo.migrate().await?,
JobType::MigrateContentDescriptors => job_dao.migrate_content_descriptors().await?,
JobType::CalculateSizes => calculate_all_sizes(ctx).await?,
JobType::CheckIntegrity => {}
JobType::CheckIntegrity => job_dao.check_integrity().await?,
}
ctx.emit_to(Self::name(), "run_job", ()).await?;

@ -1,12 +1,17 @@
use crate::from_model::FromModel;
use crate::utils::{file_by_identifier, get_repo_from_context};
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use mediarepo_core::bromine::prelude::*;
use mediarepo_core::content_descriptor::decode_content_descriptor;
use mediarepo_core::mediarepo_api::types::files::{GetFileTagsRequest, GetFilesTagsRequest};
use mediarepo_core::mediarepo_api::types::tags::{
ChangeFileTagsRequest, NamespaceResponse, TagResponse,
};
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use mediarepo_core::utils::parse_namespace_and_tag;
use mediarepo_logic::dao::DaoProvider;
use mediarepo_logic::dto::AddTagDto;
use crate::from_model::FromModel;
use crate::utils::{file_by_identifier, get_repo_from_context};
pub struct TagsNamespace;
@ -33,7 +38,8 @@ impl TagsNamespace {
async fn all_tags(ctx: &Context, _event: Event) -> IPCResult<()> {
let repo = get_repo_from_context(ctx).await;
let tags: Vec<TagResponse> = repo
.tags()
.tag()
.all()
.await?
.into_iter()
.map(TagResponse::from_model)
@ -48,7 +54,8 @@ impl TagsNamespace {
async fn all_namespaces(ctx: &Context, _event: Event) -> IPCResult<()> {
let repo = get_repo_from_context(ctx).await;
let namespaces: Vec<NamespaceResponse> = repo
.namespaces()
.tag()
.all_namespaces()
.await?
.into_iter()
.map(NamespaceResponse::from_model)
@ -65,7 +72,7 @@ impl TagsNamespace {
let repo = get_repo_from_context(ctx).await;
let request = event.payload::<GetFileTagsRequest>()?;
let file = file_by_identifier(request.id, &repo).await?;
let tags = file.tags().await?;
let tags = repo.tag().tags_for_cd(file.cd_id()).await?;
let responses: Vec<TagResponse> = tags.into_iter().map(TagResponse::from_model).collect();
ctx.emit_to(Self::name(), "tags_for_file", responses)
@ -80,7 +87,8 @@ impl TagsNamespace {
let repo = get_repo_from_context(ctx).await;
let request = event.payload::<GetFilesTagsRequest>()?;
let tag_responses: Vec<TagResponse> = repo
.find_tags_for_file_identifiers(
.tag()
.all_for_cds(
request
.cds
.into_par_iter()
@ -97,17 +105,21 @@ impl TagsNamespace {
Ok(())
}
/// Creates all tags given as input or returns the existing tag
/// Creates all tags given as input or returns the existing tags
#[tracing::instrument(skip_all)]
async fn create_tags(ctx: &Context, event: Event) -> IPCResult<()> {
let repo = get_repo_from_context(ctx).await;
let tags = event.payload::<Vec<String>>()?;
let mut created_tags = Vec::new();
let created_tags = repo
.tag()
.add_all(
tags.into_iter()
.map(parse_namespace_and_tag)
.map(AddTagDto::from_tuple)
.collect(),
)
.await?;
for tag in tags {
let created_tag = repo.add_or_find_tag(tag).await?;
created_tags.push(created_tag);
}
let responses: Vec<TagResponse> = created_tags
.into_iter()
.map(TagResponse::from_model)
@ -126,14 +138,19 @@ impl TagsNamespace {
let file = file_by_identifier(request.file_id, &repo).await?;
if !request.added_tags.is_empty() {
file.add_tags(request.added_tags).await?;
repo.tag()
.upsert_mappings(vec![file.cd_id()], request.added_tags)
.await?;
}
if !request.removed_tags.is_empty() {
file.remove_tags(request.removed_tags).await?;
repo.tag()
.remove_mappings(vec![file.cd_id()], request.removed_tags)
.await?;
}
let responses: Vec<TagResponse> = file
.tags()
let responses: Vec<TagResponse> = repo
.tag()
.tags_for_cd(file.cd_id())
.await?
.into_iter()
.map(TagResponse::from_model)

@ -1,3 +1,7 @@
use std::sync::Arc;
use tokio::fs;
use mediarepo_core::bromine::ipc::context::Context;
use mediarepo_core::content_descriptor::decode_content_descriptor;
use mediarepo_core::error::{RepoError, RepoResult};
@ -5,11 +9,10 @@ use mediarepo_core::mediarepo_api::types::identifier::FileIdentifier;
use mediarepo_core::mediarepo_api::types::repo::SizeType;
use mediarepo_core::type_keys::{RepoPathKey, SettingsKey};
use mediarepo_core::utils::get_folder_size;
use mediarepo_model::file::File;
use mediarepo_model::repo::Repo;
use mediarepo_model::type_keys::RepoKey;
use std::sync::Arc;
use tokio::fs;
use mediarepo_logic::dao::DaoProvider;
use mediarepo_logic::dao::repo::Repo;
use mediarepo_logic::dto::FileDto;
use mediarepo_logic::type_keys::RepoKey;
pub async fn get_repo_from_context(ctx: &Context) -> Arc<Repo> {
let data = ctx.data.read().await;
@ -17,10 +20,10 @@ pub async fn get_repo_from_context(ctx: &Context) -> Arc<Repo> {
Arc::clone(repo)
}
pub async fn file_by_identifier(identifier: FileIdentifier, repo: &Repo) -> RepoResult<File> {
pub async fn file_by_identifier(identifier: FileIdentifier, repo: &Repo) -> RepoResult<FileDto> {
let file = match identifier {
FileIdentifier::ID(id) => repo.file_by_id(id).await,
FileIdentifier::CD(cd) => repo.file_by_cd(&decode_content_descriptor(cd)?).await,
FileIdentifier::ID(id) => repo.file().by_id(id).await,
FileIdentifier::CD(cd) => repo.file().by_cd(decode_content_descriptor(cd)?).await,
}?;
file.ok_or_else(|| RepoError::from("File not found"))
}
@ -29,7 +32,8 @@ pub async fn cd_by_identifier(identifier: FileIdentifier, repo: &Repo) -> RepoRe
match identifier {
FileIdentifier::ID(id) => {
let file = repo
.file_by_id(id)
.file()
.by_id(id)
.await?
.ok_or_else(|| "Thumbnail not found")?;
Ok(file.cd().to_owned())

@ -1,21 +1,22 @@
use console_subscriber::ConsoleLayer;
use rolling_file::RollingConditionBasic;
use std::fs;
use std::path::PathBuf;
use mediarepo_core::settings::LoggingSettings;
use console_subscriber::ConsoleLayer;
use rolling_file::RollingConditionBasic;
use tracing::Level;
use tracing_appender::non_blocking::{NonBlocking, WorkerGuard};
use tracing_flame::FlameLayer;
use tracing_log::LogTracer;
use tracing_subscriber::filter::{self, Targets};
use tracing_subscriber::fmt::format::FmtSpan;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::{
fmt::{self},
Layer, Registry,
};
use tracing_subscriber::filter::{self, Targets};
use tracing_subscriber::fmt::format::FmtSpan;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use mediarepo_core::settings::LoggingSettings;
#[allow(dyn_drop)]
pub type DropGuard = Box<dyn Drop>;

@ -1,7 +1,10 @@
use std::env;
use std::path::PathBuf;
use std::time::Duration;
use structopt::StructOpt;
use tokio::fs;
use tokio::io::AsyncWriteExt;
use tokio::runtime;
use tokio::runtime::Runtime;
@ -9,11 +12,8 @@ use mediarepo_core::error::RepoResult;
use mediarepo_core::fs::drop_file::DropFile;
use mediarepo_core::settings::{PathSettings, Settings};
use mediarepo_core::tokio_graceful_shutdown::{SubsystemHandle, Toplevel};
use mediarepo_model::repo::Repo;
use mediarepo_logic::dao::repo::Repo;
use mediarepo_socket::start_tcp_server;
use std::env;
use std::time::Duration;
use tokio::io::AsyncWriteExt;
use crate::utils::{create_paths_for_repo, get_repo, load_settings};

@ -1,10 +1,12 @@
use mediarepo_core::error::RepoResult;
use mediarepo_core::settings::v1::SettingsV1;
use mediarepo_core::settings::{PathSettings, Settings};
use mediarepo_model::repo::Repo;
use std::path::PathBuf;
use tokio::fs;
use mediarepo_core::error::RepoResult;
use mediarepo_core::settings::{PathSettings, Settings};
use mediarepo_core::settings::v1::SettingsV1;
use mediarepo_logic::dao::repo::Repo;
/// Loads the settings from a toml path
pub fn load_settings(root_path: &PathBuf) -> RepoResult<Settings> {
let contents = std::fs::read_to_string(root_path.join("repo.toml"))?;

Loading…
Cancel
Save