commit
b1ca9a9c57
File diff suppressed because it is too large
Load Diff
@ -1,5 +1,5 @@
|
||||
[package]
|
||||
name = "mediarepo-model"
|
||||
name = "mediarepo-logic"
|
||||
version = "0.1.0"
|
||||
edition = "2018"
|
||||
workspace = ".."
|
@ -0,0 +1,73 @@
|
||||
use std::io::Cursor;
|
||||
|
||||
use chrono::{Local, NaiveDateTime};
|
||||
use sea_orm::ActiveValue::Set;
|
||||
use sea_orm::{ActiveModelTrait, ConnectionTrait, DatabaseTransaction};
|
||||
|
||||
use mediarepo_core::error::RepoResult;
|
||||
use mediarepo_database::entities::{content_descriptor, file, file_metadata};
|
||||
|
||||
use crate::dao::file::FileDao;
|
||||
use crate::dto::{AddFileDto, FileDto};
|
||||
|
||||
impl FileDao {
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn add(&self, add_dto: AddFileDto) -> RepoResult<FileDto> {
|
||||
let trx = self.ctx.db.begin().await?;
|
||||
let file_size = add_dto.content.len();
|
||||
let cd_bin = self
|
||||
.ctx
|
||||
.main_storage
|
||||
.add_file(Cursor::new(add_dto.content), None)
|
||||
.await?;
|
||||
let cd_model = content_descriptor::ActiveModel {
|
||||
descriptor: Set(cd_bin),
|
||||
..Default::default()
|
||||
};
|
||||
let cd = cd_model.insert(&trx).await?;
|
||||
|
||||
let model = file::ActiveModel {
|
||||
cd_id: Set(cd.id),
|
||||
mime_type: Set(add_dto.mime_type),
|
||||
..Default::default()
|
||||
};
|
||||
let file: file::Model = model.insert(&trx).await?;
|
||||
|
||||
let metadata = add_file_metadata(
|
||||
&trx,
|
||||
file.id,
|
||||
file_size as i64,
|
||||
add_dto.creation_time,
|
||||
add_dto.change_time,
|
||||
add_dto.name,
|
||||
)
|
||||
.await?;
|
||||
|
||||
trx.commit().await?;
|
||||
|
||||
Ok(FileDto::new(file, cd, Some(metadata)))
|
||||
}
|
||||
}
|
||||
|
||||
async fn add_file_metadata(
|
||||
trx: &DatabaseTransaction,
|
||||
file_id: i64,
|
||||
size: i64,
|
||||
creation_time: NaiveDateTime,
|
||||
change_time: NaiveDateTime,
|
||||
name: Option<String>,
|
||||
) -> RepoResult<file_metadata::Model> {
|
||||
let metadata_model = file_metadata::ActiveModel {
|
||||
file_id: Set(file_id),
|
||||
size: Set(size),
|
||||
import_time: Set(Local::now().naive_local()),
|
||||
creation_time: Set(creation_time),
|
||||
change_time: Set(change_time),
|
||||
name: Set(name),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let metadata = metadata_model.insert(trx).await?;
|
||||
|
||||
Ok(metadata)
|
||||
}
|
@ -0,0 +1,43 @@
|
||||
use sea_orm::ConnectionTrait;
|
||||
use sea_orm::prelude::*;
|
||||
|
||||
use mediarepo_core::error::{RepoResult};
|
||||
use mediarepo_database::entities::{
|
||||
content_descriptor, content_descriptor_tag, file, file_metadata,
|
||||
};
|
||||
|
||||
use crate::dao::file::{FileDao};
|
||||
use crate::dto::FileDto;
|
||||
|
||||
impl FileDao {
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn delete(&self, file: FileDto) -> RepoResult<()> {
|
||||
let trx = self.ctx.db.begin().await?;
|
||||
|
||||
file_metadata::Entity::delete_many()
|
||||
.filter(file_metadata::Column::FileId.eq(file.id()))
|
||||
.exec(&trx)
|
||||
.await?;
|
||||
file::Entity::delete_many()
|
||||
.filter(file::Column::Id.eq(file.id()))
|
||||
.exec(&trx)
|
||||
.await?;
|
||||
content_descriptor_tag::Entity::delete_many()
|
||||
.filter(content_descriptor_tag::Column::CdId.eq(file.cd_id()))
|
||||
.exec(&trx)
|
||||
.await?;
|
||||
content_descriptor::Entity::delete_many()
|
||||
.filter(content_descriptor::Column::Id.eq(file.cd_id()))
|
||||
.exec(&trx)
|
||||
.await?;
|
||||
|
||||
self.ctx
|
||||
.thumbnail_storage
|
||||
.delete_parent(&file.encoded_cd())
|
||||
.await?;
|
||||
self.ctx.main_storage.delete_file(file.cd()).await?;
|
||||
trx.commit().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
@ -0,0 +1,151 @@
|
||||
use sea_orm::prelude::*;
|
||||
use tokio::io::AsyncReadExt;
|
||||
|
||||
use mediarepo_core::error::RepoResult;
|
||||
use mediarepo_database::entities::{content_descriptor, file, file_metadata};
|
||||
|
||||
use crate::dao::{DaoContext, DaoProvider};
|
||||
use crate::dto::{FileDto, FileMetadataDto, ThumbnailDto};
|
||||
|
||||
pub mod add;
|
||||
pub mod delete;
|
||||
pub mod find;
|
||||
pub mod update;
|
||||
|
||||
pub struct FileDao {
|
||||
ctx: DaoContext,
|
||||
}
|
||||
|
||||
impl DaoProvider for FileDao {
|
||||
fn dao_ctx(&self) -> DaoContext {
|
||||
self.ctx.clone()
|
||||
}
|
||||
}
|
||||
|
||||
impl FileDao {
|
||||
pub fn new(ctx: DaoContext) -> Self {
|
||||
Self { ctx }
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn all(&self) -> RepoResult<Vec<FileDto>> {
|
||||
let files = file::Entity::find()
|
||||
.find_also_related(content_descriptor::Entity)
|
||||
.all(&self.ctx.db)
|
||||
.await?
|
||||
.into_iter()
|
||||
.filter_map(map_file_and_cd)
|
||||
.collect();
|
||||
|
||||
Ok(files)
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
#[inline]
|
||||
pub async fn by_id(&self, id: i64) -> RepoResult<Option<FileDto>> {
|
||||
self.all_by_id(vec![id]).await.map(|f| f.into_iter().next())
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
#[inline]
|
||||
pub async fn by_cd(&self, cd: Vec<u8>) -> RepoResult<Option<FileDto>> {
|
||||
self.all_by_cd(vec![cd]).await.map(|f| f.into_iter().next())
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn all_by_cd(&self, cds: Vec<Vec<u8>>) -> RepoResult<Vec<FileDto>> {
|
||||
if cds.is_empty() {
|
||||
return Ok(vec![]);
|
||||
}
|
||||
|
||||
let files = file::Entity::find()
|
||||
.find_also_related(content_descriptor::Entity)
|
||||
.filter(content_descriptor::Column::Descriptor.is_in(cds))
|
||||
.all(&self.ctx.db)
|
||||
.await?
|
||||
.into_iter()
|
||||
.filter_map(map_file_and_cd)
|
||||
.collect();
|
||||
|
||||
Ok(files)
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn all_by_id(&self, ids: Vec<i64>) -> RepoResult<Vec<FileDto>> {
|
||||
if ids.is_empty() {
|
||||
return Ok(vec![]);
|
||||
}
|
||||
|
||||
let files = file::Entity::find()
|
||||
.find_also_related(content_descriptor::Entity)
|
||||
.filter(file::Column::Id.is_in(ids))
|
||||
.all(&self.ctx.db)
|
||||
.await?
|
||||
.into_iter()
|
||||
.filter_map(map_file_and_cd)
|
||||
.collect();
|
||||
|
||||
Ok(files)
|
||||
}
|
||||
|
||||
pub async fn metadata(&self, file_id: i64) -> RepoResult<Option<FileMetadataDto>> {
|
||||
self.all_metadata(vec![file_id])
|
||||
.await
|
||||
.map(|m| m.into_iter().next())
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn all_metadata(&self, file_ids: Vec<i64>) -> RepoResult<Vec<FileMetadataDto>> {
|
||||
if file_ids.is_empty() {
|
||||
return Ok(vec![]);
|
||||
}
|
||||
|
||||
let metadata = file_metadata::Entity::find()
|
||||
.filter(file_metadata::Column::FileId.is_in(file_ids))
|
||||
.all(&self.ctx.db)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|m| FileMetadataDto::new(m))
|
||||
.collect();
|
||||
|
||||
Ok(metadata)
|
||||
}
|
||||
|
||||
/// Returns all thumbnails for a cd
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn thumbnails(&self, encoded_cd: String) -> RepoResult<Vec<ThumbnailDto>> {
|
||||
let thumbnails = self
|
||||
.ctx
|
||||
.thumbnail_storage
|
||||
.get_thumbnails(&encoded_cd)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|(size, path)| {
|
||||
ThumbnailDto::new(path, encoded_cd.clone(), size, String::from("image/png"))
|
||||
})
|
||||
.collect();
|
||||
|
||||
Ok(thumbnails)
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn get_bytes(&self, cd: &[u8]) -> RepoResult<Vec<u8>> {
|
||||
let mut buf = Vec::new();
|
||||
let mut reader = self.ctx.main_storage.get_file(cd).await?.1;
|
||||
reader.read_to_end(&mut buf).await?;
|
||||
|
||||
Ok(buf)
|
||||
}
|
||||
}
|
||||
|
||||
fn map_file_and_cd(
|
||||
(file, cd): (file::Model, Option<content_descriptor::Model>),
|
||||
) -> Option<FileDto> {
|
||||
cd.map(|c| FileDto::new(file, c, None))
|
||||
}
|
||||
|
||||
fn map_cd_and_file(
|
||||
(cd, file): (content_descriptor::Model, Option<file::Model>),
|
||||
) -> Option<FileDto> {
|
||||
file.map(|f| FileDto::new(f, cd, None))
|
||||
}
|
@ -0,0 +1,95 @@
|
||||
use std::fmt::Debug;
|
||||
use std::io::Cursor;
|
||||
use std::str::FromStr;
|
||||
|
||||
use sea_orm::prelude::*;
|
||||
use sea_orm::ActiveValue::{Set, Unchanged};
|
||||
use sea_orm::{ConnectionTrait, NotSet};
|
||||
|
||||
use mediarepo_core::error::{RepoError, RepoResult};
|
||||
use mediarepo_core::fs::thumbnail_store::Dimensions;
|
||||
use mediarepo_core::thumbnailer;
|
||||
use mediarepo_core::thumbnailer::{ThumbnailSize};
|
||||
use mediarepo_database::entities::{content_descriptor, file, file_metadata};
|
||||
|
||||
use crate::dao::file::FileDao;
|
||||
use crate::dao::opt_to_active_val;
|
||||
use crate::dto::{FileDto, FileMetadataDto, ThumbnailDto, UpdateFileDto, UpdateFileMetadataDto};
|
||||
|
||||
impl FileDao {
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn update(&self, update_dto: UpdateFileDto) -> RepoResult<FileDto> {
|
||||
let trx = self.ctx.db.begin().await?;
|
||||
let model = file::ActiveModel {
|
||||
id: Set(update_dto.id),
|
||||
cd_id: update_dto.cd_id.map(|v| Set(v)).unwrap_or(NotSet),
|
||||
mime_type: update_dto.mime_type.map(|v| Set(v)).unwrap_or(NotSet),
|
||||
status: update_dto.status.map(|v| Set(v as i32)).unwrap_or(NotSet),
|
||||
};
|
||||
let file_model = model.update(&trx).await?;
|
||||
let cd = file_model
|
||||
.find_related(content_descriptor::Entity)
|
||||
.one(&trx)
|
||||
.await?
|
||||
.ok_or_else(|| RepoError::from("Content descriptor not found"))?;
|
||||
trx.commit().await?;
|
||||
|
||||
Ok(FileDto::new(file_model, cd, None))
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn update_metadata(
|
||||
&self,
|
||||
update_dto: UpdateFileMetadataDto,
|
||||
) -> RepoResult<FileMetadataDto> {
|
||||
let model = file_metadata::ActiveModel {
|
||||
file_id: Unchanged(update_dto.file_id),
|
||||
name: opt_to_active_val(update_dto.name),
|
||||
comment: opt_to_active_val(update_dto.comment),
|
||||
size: opt_to_active_val(update_dto.size),
|
||||
change_time: opt_to_active_val(update_dto.change_time),
|
||||
..Default::default()
|
||||
};
|
||||
let metadata = model.update(&self.ctx.db).await?;
|
||||
|
||||
Ok(FileMetadataDto::new(metadata))
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn create_thumbnails<I: IntoIterator<Item = ThumbnailSize> + Debug>(
|
||||
&self,
|
||||
file: FileDto,
|
||||
sizes: I,
|
||||
) -> RepoResult<Vec<ThumbnailDto>> {
|
||||
let bytes = self.get_bytes(file.cd()).await?;
|
||||
let mime_type = mime::Mime::from_str(file.mime_type())
|
||||
.unwrap_or_else(|_| mime::APPLICATION_OCTET_STREAM);
|
||||
let thumbnails =
|
||||
thumbnailer::create_thumbnails(Cursor::new(bytes), mime_type.clone(), sizes)?;
|
||||
let mut dtos = Vec::new();
|
||||
|
||||
for thumbnail in thumbnails {
|
||||
let mut buf = Vec::new();
|
||||
let size = thumbnail.size();
|
||||
let size = Dimensions {
|
||||
height: size.1,
|
||||
width: size.0,
|
||||
};
|
||||
thumbnail.write_png(&mut buf)?;
|
||||
|
||||
let path = self
|
||||
.ctx
|
||||
.thumbnail_storage
|
||||
.add_thumbnail(file.encoded_cd(), size.clone(), &buf)
|
||||
.await?;
|
||||
dtos.push(ThumbnailDto::new(
|
||||
path,
|
||||
file.encoded_cd(),
|
||||
size,
|
||||
mime_type.to_string(),
|
||||
))
|
||||
}
|
||||
|
||||
Ok(dtos)
|
||||
}
|
||||
}
|
@ -0,0 +1,46 @@
|
||||
use crate::dao::job::JobDao;
|
||||
use mediarepo_core::content_descriptor::{
|
||||
convert_v1_descriptor_to_v2, encode_content_descriptor, is_v1_content_descriptor,
|
||||
};
|
||||
use mediarepo_core::error::RepoResult;
|
||||
use mediarepo_database::entities::content_descriptor;
|
||||
use sea_orm::prelude::*;
|
||||
use sea_orm::ActiveValue::Set;
|
||||
use sea_orm::ConnectionTrait;
|
||||
|
||||
impl JobDao {
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn migrate_content_descriptors(&self) -> RepoResult<()> {
|
||||
let cds: Vec<content_descriptor::Model> =
|
||||
content_descriptor::Entity::find().all(&self.ctx.db).await?;
|
||||
|
||||
tracing::info!("Converting content descriptors to v2 format...");
|
||||
let mut converted_count = 0;
|
||||
|
||||
for cd in cds {
|
||||
if is_v1_content_descriptor(&cd.descriptor) {
|
||||
let trx = self.ctx.db.begin().await?;
|
||||
let src_cd = cd.descriptor;
|
||||
let dst_cd = convert_v1_descriptor_to_v2(&src_cd)?;
|
||||
|
||||
let _active_model = content_descriptor::ActiveModel {
|
||||
id: Set(cd.id),
|
||||
descriptor: Set(dst_cd.clone()),
|
||||
};
|
||||
self.ctx.main_storage.rename_file(&src_cd, &dst_cd).await?;
|
||||
self.ctx
|
||||
.thumbnail_storage
|
||||
.rename_parent(
|
||||
encode_content_descriptor(&src_cd),
|
||||
encode_content_descriptor(&dst_cd),
|
||||
)
|
||||
.await?;
|
||||
trx.commit().await?;
|
||||
converted_count += 1;
|
||||
}
|
||||
}
|
||||
tracing::info!("Converted {} descriptors", converted_count);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
@ -0,0 +1,20 @@
|
||||
pub mod migrate_content_descriptors;
|
||||
pub mod sqlite_operations;
|
||||
|
||||
use crate::dao::{DaoContext, DaoProvider};
|
||||
|
||||
pub struct JobDao {
|
||||
ctx: DaoContext,
|
||||
}
|
||||
|
||||
impl DaoProvider for JobDao {
|
||||
fn dao_ctx(&self) -> DaoContext {
|
||||
self.ctx.clone()
|
||||
}
|
||||
}
|
||||
|
||||
impl JobDao {
|
||||
pub fn new(ctx: DaoContext) -> JobDao {
|
||||
Self { ctx }
|
||||
}
|
||||
}
|
@ -0,0 +1,44 @@
|
||||
use crate::dao::job::JobDao;
|
||||
use mediarepo_core::error::RepoError::Corrupted;
|
||||
use mediarepo_core::error::RepoResult;
|
||||
use sea_orm::DatabaseBackend::Sqlite;
|
||||
use sea_orm::{ConnectionTrait, FromQueryResult, Statement};
|
||||
|
||||
#[derive(Debug, FromQueryResult)]
|
||||
struct IntegrityCheckResult {
|
||||
integrity_check: String,
|
||||
}
|
||||
|
||||
impl JobDao {
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn check_integrity(&self) -> RepoResult<()> {
|
||||
let check_result: Option<IntegrityCheckResult> = IntegrityCheckResult::find_by_statement(
|
||||
Statement::from_string(Sqlite, String::from("PRAGMA integrity_check;")),
|
||||
)
|
||||
.one(&self.ctx.db)
|
||||
.await?;
|
||||
tracing::debug!("check result = {:?}", check_result);
|
||||
|
||||
check_result
|
||||
.ok_or_else(|| Corrupted(String::from("no check result")))
|
||||
.and_then(map_check_result)
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn vacuum(&self) -> RepoResult<()> {
|
||||
self.ctx
|
||||
.db
|
||||
.execute(Statement::from_string(Sqlite, String::from("VACUUM;")))
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn map_check_result(result: IntegrityCheckResult) -> RepoResult<()> {
|
||||
if result.integrity_check == "ok" {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(Corrupted(result.integrity_check))
|
||||
}
|
||||
}
|
@ -0,0 +1,41 @@
|
||||
use sea_orm::{ActiveValue, DatabaseConnection};
|
||||
|
||||
use mediarepo_core::fs::file_hash_store::FileHashStore;
|
||||
use mediarepo_core::fs::thumbnail_store::ThumbnailStore;
|
||||
|
||||
use crate::dao::file::FileDao;
|
||||
use crate::dao::job::JobDao;
|
||||
use crate::dao::tag::TagDao;
|
||||
|
||||
pub mod file;
|
||||
pub mod job;
|
||||
pub mod repo;
|
||||
pub mod tag;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct DaoContext {
|
||||
pub db: DatabaseConnection,
|
||||
pub main_storage: FileHashStore,
|
||||
pub thumbnail_storage: ThumbnailStore,
|
||||
}
|
||||
|
||||
pub trait DaoProvider {
|
||||
fn dao_ctx(&self) -> DaoContext;
|
||||
|
||||
fn file(&self) -> FileDao {
|
||||
FileDao::new(self.dao_ctx())
|
||||
}
|
||||
|
||||
fn tag(&self) -> TagDao {
|
||||
TagDao::new(self.dao_ctx())
|
||||
}
|
||||
|
||||
fn job(&self) -> JobDao {
|
||||
JobDao::new(self.dao_ctx())
|
||||
}
|
||||
}
|
||||
|
||||
fn opt_to_active_val<T: Into<sea_orm::Value>>(opt: Option<T>) -> ActiveValue<T> {
|
||||
opt.map(|v| ActiveValue::Set(v))
|
||||
.unwrap_or(ActiveValue::NotSet)
|
||||
}
|
@ -0,0 +1,81 @@
|
||||
use std::fmt::Debug;
|
||||
|
||||
use std::path::PathBuf;
|
||||
|
||||
use sea_orm::DatabaseConnection;
|
||||
|
||||
use mediarepo_core::error::RepoResult;
|
||||
use mediarepo_core::fs::file_hash_store::FileHashStore;
|
||||
use mediarepo_core::fs::thumbnail_store::ThumbnailStore;
|
||||
|
||||
use crate::dao::{DaoContext, DaoProvider};
|
||||
use mediarepo_database::get_database;
|
||||
use mediarepo_database::queries::analysis::{get_all_counts, Counts};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Repo {
|
||||
db: DatabaseConnection,
|
||||
main_storage: FileHashStore,
|
||||
thumbnail_storage: ThumbnailStore,
|
||||
}
|
||||
|
||||
impl DaoProvider for Repo {
|
||||
fn dao_ctx(&self) -> DaoContext {
|
||||
DaoContext {
|
||||
db: self.db.clone(),
|
||||
main_storage: self.main_storage.clone(),
|
||||
thumbnail_storage: self.thumbnail_storage.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Repo {
|
||||
pub(crate) fn new(
|
||||
db: DatabaseConnection,
|
||||
file_store_path: PathBuf,
|
||||
thumb_store_path: PathBuf,
|
||||
) -> Self {
|
||||
Self {
|
||||
db,
|
||||
main_storage: FileHashStore::new(file_store_path),
|
||||
thumbnail_storage: ThumbnailStore::new(thumb_store_path),
|
||||
}
|
||||
}
|
||||
|
||||
/// Connects to the database with the given uri
|
||||
#[tracing::instrument(level = "debug")]
|
||||
pub async fn connect<S: AsRef<str> + Debug>(
|
||||
uri: S,
|
||||
file_store_path: PathBuf,
|
||||
thumb_store_path: PathBuf,
|
||||
) -> RepoResult<Self> {
|
||||
let db = get_database(uri).await?;
|
||||
Ok(Self::new(db, file_store_path, thumb_store_path))
|
||||
}
|
||||
|
||||
/// Returns the database of the repo for raw sql queries
|
||||
pub fn db(&self) -> &DatabaseConnection {
|
||||
&self.db
|
||||
}
|
||||
|
||||
/// Returns the size of the main storage
|
||||
#[inline]
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn get_main_store_size(&self) -> RepoResult<u64> {
|
||||
self.main_storage.get_size().await
|
||||
}
|
||||
|
||||
/// Returns the size of the thumbnail storage
|
||||
#[inline]
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn get_thumb_store_size(&self) -> RepoResult<u64> {
|
||||
self.thumbnail_storage.get_size().await
|
||||
}
|
||||
|
||||
/// Returns all entity counts
|
||||
#[inline]
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn get_counts(&self) -> RepoResult<Counts> {
|
||||
get_all_counts(&self.db).await
|
||||
}
|
||||
}
|
@ -0,0 +1,137 @@
|
||||
use crate::dao::tag::{map_tag_dto, TagDao};
|
||||
use crate::dto::{AddTagDto, NamespaceDto, TagDto};
|
||||
use mediarepo_core::error::RepoResult;
|
||||
use mediarepo_database::entities::{namespace, tag};
|
||||
use sea_orm::prelude::*;
|
||||
use sea_orm::ActiveValue::Set;
|
||||
use sea_orm::{Condition, ConnectionTrait, DatabaseTransaction};
|
||||
use std::collections::HashMap;
|
||||
use std::iter::FromIterator;
|
||||
|
||||
impl TagDao {
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn add_all(&self, mut tags: Vec<AddTagDto>) -> RepoResult<Vec<TagDto>> {
|
||||
let namespaces = tags.iter().filter_map(|t| t.namespace.clone()).collect();
|
||||
let trx = self.ctx.db.begin().await?;
|
||||
let existing_tags = tags_by_name(&trx, tags.clone()).await?;
|
||||
|
||||
if existing_tags.len() == tags.len() {
|
||||
return Ok(existing_tags);
|
||||
}
|
||||
let existing_tag_map: HashMap<String, TagDto> =
|
||||
HashMap::from_iter(existing_tags.into_iter().map(|t| (t.normalized_name(), t)));
|
||||
|
||||
tags.retain(|dto| !existing_tag_map.contains_key(&dto.normalized_name()));
|
||||
let namespace_map = add_or_get_all_namespaces(&trx, namespaces).await?;
|
||||
|
||||
if tags.is_empty() {
|
||||
return Ok(existing_tag_map.into_values().collect());
|
||||
}
|
||||
|
||||
let tag_models: Vec<tag::ActiveModel> = tags
|
||||
.iter()
|
||||
.map(|t| tag::ActiveModel {
|
||||
name: Set(t.name.to_owned()),
|
||||
namespace_id: Set(t
|
||||
.namespace
|
||||
.as_ref()
|
||||
.and_then(|n| namespace_map.get(n))
|
||||
.map(|n| n.id())),
|
||||
..Default::default()
|
||||
})
|
||||
.collect();
|
||||
tag::Entity::insert_many(tag_models).exec(&trx).await?;
|
||||
let mut tag_dtos = tags_by_name(&trx, tags).await?;
|
||||
trx.commit().await?;
|
||||
tag_dtos.append(&mut existing_tag_map.into_values().collect());
|
||||
|
||||
Ok(tag_dtos)
|
||||
}
|
||||
}
|
||||
|
||||
async fn add_or_get_all_namespaces(
|
||||
trx: &DatabaseTransaction,
|
||||
mut namespaces: Vec<String>,
|
||||
) -> RepoResult<HashMap<String, NamespaceDto>> {
|
||||
if namespaces.is_empty() {
|
||||
return Ok(HashMap::with_capacity(0));
|
||||
}
|
||||
let existing_namespaces = namespaces_by_name(trx, namespaces.clone()).await?;
|
||||
let mut namespace_map = HashMap::from_iter(
|
||||
existing_namespaces
|
||||
.into_iter()
|
||||
.map(|nsp| (nsp.name().to_owned(), nsp)),
|
||||
);
|
||||
if namespaces.len() == namespace_map.len() {
|
||||
return Ok(namespace_map);
|
||||
}
|
||||
namespaces.retain(|nsp| !namespace_map.contains_key(nsp));
|
||||
if namespaces.is_empty() {
|
||||
return Ok(namespace_map);
|
||||
}
|
||||
let namespace_models: Vec<namespace::ActiveModel> = namespaces
|
||||
.iter()
|
||||
.map(|nsp| namespace::ActiveModel {
|
||||
name: Set(nsp.to_owned()),
|
||||
..Default::default()
|
||||
})
|
||||
.collect();
|
||||
namespace::Entity::insert_many(namespace_models)
|
||||
.exec(trx)
|
||||
.await?;
|
||||
let additional_namespaces = namespaces_by_name(trx, namespaces.clone()).await?;
|
||||
|
||||
for nsp in additional_namespaces {
|
||||
namespace_map.insert(nsp.name().to_owned(), nsp);
|
||||
}
|
||||
|
||||
Ok(namespace_map)
|
||||
}
|
||||
|
||||
async fn namespaces_by_name(
|
||||
trx: &DatabaseTransaction,
|
||||
names: Vec<String>,
|
||||
) -> RepoResult<Vec<NamespaceDto>> {
|
||||
if names.is_empty() {
|
||||
return Ok(vec![]);
|
||||
}
|
||||
let namespaces: Vec<NamespaceDto> = namespace::Entity::find()
|
||||
.filter(namespace::Column::Name.is_in(names))
|
||||
.all(trx)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(NamespaceDto::new)
|
||||
.collect();
|
||||
|
||||
Ok(namespaces)
|
||||
}
|
||||
|
||||
async fn tags_by_name(trx: &DatabaseTransaction, tags: Vec<AddTagDto>) -> RepoResult<Vec<TagDto>> {
|
||||
if tags.is_empty() {
|
||||
return Ok(vec![]);
|
||||
}
|
||||
let condition = tags
|
||||
.into_iter()
|
||||
.map(build_tag_condition)
|
||||
.fold(Condition::any(), Condition::add);
|
||||
let tags = tag::Entity::find()
|
||||
.find_also_related(namespace::Entity)
|
||||
.filter(condition)
|
||||
.all(trx)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(map_tag_dto)
|
||||
.collect();
|
||||
|
||||
Ok(tags)
|
||||
}
|
||||
|
||||
fn build_tag_condition(tag: AddTagDto) -> Condition {
|
||||
if let Some(namespace) = tag.namespace {
|
||||
Condition::all()
|
||||
.add(tag::Column::Name.eq(tag.name))
|
||||
.add(namespace::Column::Name.eq(namespace))
|
||||
} else {
|
||||
Condition::all().add(tag::Column::Name.eq(tag.name))
|
||||
}
|
||||
}
|
@ -0,0 +1,64 @@
|
||||
use crate::dao::tag::{map_tag_dto, TagDao};
|
||||
use crate::dto::TagDto;
|
||||
use mediarepo_core::error::RepoResult;
|
||||
use mediarepo_database::entities::{namespace, tag};
|
||||
use sea_orm::prelude::*;
|
||||
use sea_orm::sea_query::Expr;
|
||||
use sea_orm::{Condition, QuerySelect};
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct TagByNameQuery {
|
||||
pub namespace: Option<String>,
|
||||
pub name: String,
|
||||
}
|
||||
|
||||
impl TagDao {
|
||||
/// Filters all tags by names
|
||||
/// wildcards are supported
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn all_by_name(&self, names: Vec<TagByNameQuery>) -> RepoResult<Vec<TagDto>> {
|
||||
let mut condition_count = 0;
|
||||
let condition = names
|
||||
.into_iter()
|
||||
.filter_map(name_query_to_condition)
|
||||
.inspect(|_| condition_count += 1)
|
||||
.fold(Condition::any(), Condition::add);
|
||||
if condition_count == 0 {
|
||||
return Ok(vec![]);
|
||||
}
|
||||
|
||||
let tags = tag::Entity::find()
|
||||
.find_also_related(namespace::Entity)
|
||||
.filter(condition)
|
||||
.group_by(tag::Column::Id)
|
||||
.all(&self.ctx.db)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(map_tag_dto)
|
||||
.collect();
|
||||
|
||||
Ok(tags)
|
||||
}
|
||||
}
|
||||
|
||||
fn name_query_to_condition(query: TagByNameQuery) -> Option<Condition> {
|
||||
let TagByNameQuery { namespace, name } = query;
|
||||
let mut condition = Condition::all();
|
||||
|
||||
if !name.ends_with('*') {
|
||||
condition = condition.add(tag::Column::Name.eq(name))
|
||||
} else if name.len() > 1 {
|
||||
condition =
|
||||
condition.add(tag::Column::Name.like(&*format!("{}%", name.trim_end_matches("*"))))
|
||||
} else if namespace.is_none() {
|
||||
return None;
|
||||
}
|
||||
|
||||
condition = if let Some(namespace) = namespace {
|
||||
condition.add(namespace::Column::Name.eq(namespace))
|
||||
} else {
|
||||
condition.add(Expr::tbl(tag::Entity, tag::Column::NamespaceId).is_null())
|
||||
};
|
||||
|
||||
Some(condition)
|
||||
}
|
@ -0,0 +1,66 @@
|
||||
use sea_orm::{ConnectionTrait, DatabaseTransaction};
|
||||
use sea_orm::ActiveValue::Set;
|
||||
use sea_orm::prelude::*;
|
||||
|
||||
use mediarepo_core::error::RepoResult;
|
||||
use mediarepo_database::entities::content_descriptor_tag;
|
||||
|
||||
use crate::dao::tag::TagDao;
|
||||
|
||||
impl TagDao {
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn upsert_mappings(&self, cd_ids: Vec<i64>, tag_ids: Vec<i64>) -> RepoResult<()> {
|
||||
let trx = self.ctx.db.begin().await?;
|
||||
|
||||
let existing_mappings = get_existing_mappings(&trx, &cd_ids, &tag_ids).await?;
|
||||
|
||||
let active_models: Vec<content_descriptor_tag::ActiveModel> = cd_ids
|
||||
.into_iter()
|
||||
.flat_map(|cd_id: i64| {
|
||||
tag_ids
|
||||
.iter()
|
||||
.filter(|tag_id| !existing_mappings.contains(&(cd_id, **tag_id)))
|
||||
.map(move |tag_id| content_descriptor_tag::ActiveModel {
|
||||
cd_id: Set(cd_id),
|
||||
tag_id: Set(*tag_id),
|
||||
})
|
||||
.collect::<Vec<content_descriptor_tag::ActiveModel>>()
|
||||
})
|
||||
.collect();
|
||||
|
||||
content_descriptor_tag::Entity::insert_many(active_models)
|
||||
.exec(&trx)
|
||||
.await?;
|
||||
|
||||
trx.commit().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn remove_mappings(&self, cd_ids: Vec<i64>, tag_ids: Vec<i64>) -> RepoResult<()> {
|
||||
content_descriptor_tag::Entity::delete_many()
|
||||
.filter(content_descriptor_tag::Column::CdId.is_in(cd_ids))
|
||||
.filter(content_descriptor_tag::Column::TagId.is_in(tag_ids))
|
||||
.exec(&self.ctx.db)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_existing_mappings(
|
||||
trx: &DatabaseTransaction,
|
||||
cd_ids: &Vec<i64>,
|
||||
tag_ids: &Vec<i64>,
|
||||
) -> RepoResult<Vec<(i64, i64)>> {
|
||||
let existing_mappings: Vec<(i64, i64)> = content_descriptor_tag::Entity::find()
|
||||
.filter(content_descriptor_tag::Column::CdId.is_in(cd_ids.clone()))
|
||||
.filter(content_descriptor_tag::Column::TagId.is_in(tag_ids.clone()))
|
||||
.all(trx)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|model: content_descriptor_tag::Model| (model.tag_id, model.cd_id))
|
||||
.collect();
|
||||
Ok(existing_mappings)
|
||||
}
|
@ -0,0 +1,174 @@
|
||||
use sea_orm::prelude::*;
|
||||
use sea_orm::JoinType;
|
||||
use sea_orm::QuerySelect;
|
||||
use std::collections::HashMap;
|
||||
use std::iter::FromIterator;
|
||||
|
||||
use mediarepo_core::error::RepoResult;
|
||||
use mediarepo_core::itertools::Itertools;
|
||||
|
||||
use mediarepo_core::utils::parse_namespace_and_tag;
|
||||
use mediarepo_database::entities::{content_descriptor, content_descriptor_tag, namespace, tag};
|
||||
|
||||
use crate::dao::tag::by_name::TagByNameQuery;
|
||||
use crate::dao::{DaoContext, DaoProvider};
|
||||
use crate::dto::{NamespaceDto, TagDto};
|
||||
|
||||
pub mod add;
|
||||
pub mod by_name;
|
||||
pub mod mappings;
|
||||
|
||||
pub struct TagDao {
|
||||
ctx: DaoContext,
|
||||
}
|
||||
|
||||
impl DaoProvider for TagDao {
|
||||
fn dao_ctx(&self) -> DaoContext {
|
||||
self.ctx.clone()
|
||||
}
|
||||
}
|
||||
|
||||
impl TagDao {
|
||||
pub fn new(ctx: DaoContext) -> Self {
|
||||
Self { ctx }
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn all(&self) -> RepoResult<Vec<TagDto>> {
|
||||
let tags = tag::Entity::find()
|
||||
.find_also_related(namespace::Entity)
|
||||
.all(&self.ctx.db)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(map_tag_dto)
|
||||
.collect();
|
||||
|
||||
Ok(tags)
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn all_namespaces(&self) -> RepoResult<Vec<NamespaceDto>> {
|
||||
let namespaces = namespace::Entity::find()
|
||||
.all(&self.ctx.db)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(NamespaceDto::new)
|
||||
.collect();
|
||||
|
||||
Ok(namespaces)
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip(self, cds))]
|
||||
pub async fn all_for_cds(&self, cds: Vec<Vec<u8>>) -> RepoResult<Vec<TagDto>> {
|
||||
let tags = tag::Entity::find()
|
||||
.find_also_related(namespace::Entity)
|
||||
.join(
|
||||
JoinType::LeftJoin,
|
||||
content_descriptor_tag::Relation::Tag.def().rev(),
|
||||
)
|
||||
.join(
|
||||
JoinType::InnerJoin,
|
||||
content_descriptor_tag::Relation::ContentDescriptorId.def(),
|
||||
)
|
||||
.filter(content_descriptor::Column::Descriptor.is_in(cds))
|
||||
.group_by(tag::Column::Id)
|
||||
.all(&self.ctx.db)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(map_tag_dto)
|
||||
.collect();
|
||||
|
||||
Ok(tags)
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip(self, cds))]
|
||||
pub async fn all_for_cds_map(
|
||||
&self,
|
||||
cds: Vec<Vec<u8>>,
|
||||
) -> RepoResult<HashMap<Vec<u8>, Vec<TagDto>>> {
|
||||
let tag_cd_entries: Vec<(
|
||||
content_descriptor_tag::Model,
|
||||
Option<content_descriptor::Model>,
|
||||
)> = content_descriptor_tag::Entity::find()
|
||||
.find_also_related(content_descriptor::Entity)
|
||||
.filter(content_descriptor::Column::Descriptor.is_in(cds))
|
||||
.all(&self.ctx.db)
|
||||
.await?;
|
||||
|
||||
let tag_ids: Vec<i64> = tag_cd_entries
|
||||
.iter()
|
||||
.map(|(t, _)| t.tag_id)
|
||||
.unique()
|
||||
.collect();
|
||||
|
||||
let tags: Vec<TagDto> = tag::Entity::find()
|
||||
.find_also_related(namespace::Entity)
|
||||
.filter(tag::Column::Id.is_in(tag_ids))
|
||||
.all(&self.ctx.db)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(map_tag_dto)
|
||||
.collect();
|
||||
|
||||
let tag_id_map = tags
|
||||
.into_iter()
|
||||
.map(|t| (t.id(), t))
|
||||
.collect::<HashMap<i64, TagDto>>();
|
||||
let cd_tag_map = tag_cd_entries
|
||||
.into_iter()
|
||||
.filter_map(|(t, cd)| Some((cd?, tag_id_map.get(&t.tag_id)?.clone())))
|
||||
.sorted_by_key(|(cd, _)| cd.id)
|
||||
.group_by(|(cd, _)| cd.descriptor.to_owned())
|
||||
.into_iter()
|
||||
.map(|(key, group)| (key, group.map(|(_, t)| t).collect::<Vec<TagDto>>()))
|
||||
.collect();
|
||||
|
||||
Ok(cd_tag_map)
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn tags_for_cd(&self, cd_id: i64) -> RepoResult<Vec<TagDto>> {
|
||||
let tags = tag::Entity::find()
|
||||
.find_also_related(namespace::Entity)
|
||||
.join(
|
||||
JoinType::LeftJoin,
|
||||
content_descriptor_tag::Relation::Tag.def().rev(),
|
||||
)
|
||||
.join(
|
||||
JoinType::InnerJoin,
|
||||
content_descriptor_tag::Relation::ContentDescriptorId.def(),
|
||||
)
|
||||
.filter(content_descriptor::Column::Id.eq(cd_id))
|
||||
.all(&self.ctx.db)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(map_tag_dto)
|
||||
.collect();
|
||||
|
||||
Ok(tags)
|
||||
}
|
||||
|
||||
/// Returns a map mapping tag names to ids
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn normalized_tags_to_ids(
|
||||
&self,
|
||||
names: Vec<String>,
|
||||
) -> RepoResult<HashMap<String, i64>> {
|
||||
let queries = names
|
||||
.into_iter()
|
||||
.map(parse_namespace_and_tag)
|
||||
.map(|(namespace, name)| TagByNameQuery { namespace, name })
|
||||
.collect();
|
||||
let tags = self.all_by_name(queries).await?;
|
||||
let tag_map = HashMap::from_iter(
|
||||
tags.into_iter()
|
||||
.map(|tag| (tag.normalized_name(), tag.id())),
|
||||
);
|
||||
|
||||
Ok(tag_map)
|
||||
}
|
||||
}
|
||||
|
||||
fn map_tag_dto(result: (tag::Model, Option<namespace::Model>)) -> TagDto {
|
||||
TagDto::new(result.0, result.1)
|
||||
}
|
@ -0,0 +1,112 @@
|
||||
use chrono::NaiveDateTime;
|
||||
|
||||
use mediarepo_core::content_descriptor::encode_content_descriptor;
|
||||
use mediarepo_core::mediarepo_api::types::files::FileStatus as ApiFileStatus;
|
||||
use mediarepo_database::entities::content_descriptor;
|
||||
use mediarepo_database::entities::file;
|
||||
use mediarepo_database::entities::file_metadata;
|
||||
|
||||
use crate::dto::FileMetadataDto;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct FileDto {
|
||||
model: file::Model,
|
||||
content_descriptor: content_descriptor::Model,
|
||||
metadata: Option<FileMetadataDto>,
|
||||
}
|
||||
|
||||
impl FileDto {
|
||||
pub(crate) fn new(
|
||||
model: file::Model,
|
||||
content_descriptor: content_descriptor::Model,
|
||||
metadata: Option<file_metadata::Model>,
|
||||
) -> Self {
|
||||
Self {
|
||||
model,
|
||||
content_descriptor,
|
||||
metadata: metadata.map(FileMetadataDto::new),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn id(&self) -> i64 {
|
||||
self.model.id
|
||||
}
|
||||
|
||||
pub fn cd_id(&self) -> i64 {
|
||||
self.model.cd_id
|
||||
}
|
||||
|
||||
pub fn cd(&self) -> &[u8] {
|
||||
&self.content_descriptor.descriptor
|
||||
}
|
||||
|
||||
pub fn encoded_cd(&self) -> String {
|
||||
encode_content_descriptor(&self.content_descriptor.descriptor)
|
||||
}
|
||||
|
||||
pub fn status(&self) -> FileStatus {
|
||||
match self.model.status {
|
||||
10 => FileStatus::Imported,
|
||||
20 => FileStatus::Archived,
|
||||
30 => FileStatus::Deleted,
|
||||
_ => FileStatus::Imported,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn mime_type(&self) -> &String {
|
||||
&self.model.mime_type
|
||||
}
|
||||
|
||||
pub fn metadata(&self) -> Option<&FileMetadataDto> {
|
||||
self.metadata.as_ref()
|
||||
}
|
||||
|
||||
pub fn into_metadata(self) -> Option<FileMetadataDto> {
|
||||
self.metadata
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct AddFileDto {
|
||||
pub content: Vec<u8>,
|
||||
pub mime_type: String,
|
||||
pub creation_time: NaiveDateTime,
|
||||
pub change_time: NaiveDateTime,
|
||||
pub name: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct UpdateFileDto {
|
||||
pub id: i64,
|
||||
pub cd_id: Option<i64>,
|
||||
pub mime_type: Option<String>,
|
||||
pub status: Option<FileStatus>,
|
||||
}
|
||||
|
||||
impl Default for UpdateFileDto {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
id: 0,
|
||||
cd_id: None,
|
||||
mime_type: None,
|
||||
status: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug)]
|
||||
pub enum FileStatus {
|
||||
Imported = 10,
|
||||
Archived = 20,
|
||||
Deleted = 30,
|
||||
}
|
||||
|
||||
impl From<ApiFileStatus> for FileStatus {
|
||||
fn from(s: ApiFileStatus) -> Self {
|
||||
match s {
|
||||
ApiFileStatus::Imported => Self::Imported,
|
||||
ApiFileStatus::Archived => Self::Archived,
|
||||
ApiFileStatus::Deleted => Self::Deleted,
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,51 @@
|
||||
use chrono::NaiveDateTime;
|
||||
|
||||
use mediarepo_database::entities::file_metadata;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct FileMetadataDto {
|
||||
model: file_metadata::Model,
|
||||
}
|
||||
|
||||
impl FileMetadataDto {
|
||||
pub(crate) fn new(model: file_metadata::Model) -> Self {
|
||||
Self { model }
|
||||
}
|
||||
|
||||
pub fn file_id(&self) -> i64 {
|
||||
self.model.file_id
|
||||
}
|
||||
|
||||
pub fn name(&self) -> Option<&String> {
|
||||
self.model.name.as_ref()
|
||||
}
|
||||
|
||||
pub fn comment(&self) -> Option<&String> {
|
||||
self.model.comment.as_ref()
|
||||
}
|
||||
|
||||
pub fn size(&self) -> i64 {
|
||||
self.model.size
|
||||
}
|
||||
|
||||
pub fn import_time(&self) -> NaiveDateTime {
|
||||
self.model.import_time
|
||||
}
|
||||
|
||||
pub fn creation_time(&self) -> NaiveDateTime {
|
||||
self.model.creation_time
|
||||
}
|
||||
|
||||
pub fn change_time(&self) -> NaiveDateTime {
|
||||
self.model.change_time
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub struct UpdateFileMetadataDto {
|
||||
pub file_id: i64,
|
||||
pub name: Option<Option<String>>,
|
||||
pub comment: Option<Option<String>>,
|
||||
pub size: Option<i64>,
|
||||
pub change_time: Option<NaiveDateTime>,
|
||||
}
|
@ -0,0 +1,12 @@
|
||||
pub use file::*;
|
||||
pub use file_metadata::*;
|
||||
pub use namespace::*;
|
||||
pub use tag::*;
|
||||
pub use thumbnail::*;
|
||||
|
||||
mod file;
|
||||
mod file_metadata;
|
||||
mod tag;
|
||||
mod namespace;
|
||||
mod thumbnail;
|
||||
|
@ -0,0 +1,20 @@
|
||||
use mediarepo_database::entities::namespace;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct NamespaceDto {
|
||||
model: namespace::Model,
|
||||
}
|
||||
|
||||
impl NamespaceDto {
|
||||
pub(crate) fn new(model: namespace::Model) -> Self {
|
||||
Self {model}
|
||||
}
|
||||
|
||||
pub fn id(&self) -> i64 {
|
||||
self.model.id
|
||||
}
|
||||
|
||||
pub fn name(&self) -> &String {
|
||||
&self.model.name
|
||||
}
|
||||
}
|
@ -0,0 +1,62 @@
|
||||
pub use mediarepo_database::entities::namespace;
|
||||
pub use mediarepo_database::entities::tag;
|
||||
|
||||
use crate::dto::NamespaceDto;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct TagDto {
|
||||
model: tag::Model,
|
||||
namespace: Option<NamespaceDto>,
|
||||
}
|
||||
|
||||
impl TagDto {
|
||||
pub(crate) fn new(model: tag::Model, namespace_model: Option<namespace::Model>) -> Self {
|
||||
Self {
|
||||
model,
|
||||
namespace: namespace_model.map(NamespaceDto::new),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn id(&self) -> i64 {
|
||||
self.model.id
|
||||
}
|
||||
|
||||
pub fn name(&self) -> &String {
|
||||
&self.model.name
|
||||
}
|
||||
|
||||
pub fn namespace(&self) -> Option<&NamespaceDto> {
|
||||
self.namespace.as_ref()
|
||||
}
|
||||
|
||||
/// Returns the normalized name of the tag (namespace:tag)
|
||||
pub fn normalized_name(&self) -> String {
|
||||
if let Some(namespace) = &self.namespace {
|
||||
format!("{}:{}", namespace.name(), self.name())
|
||||
} else {
|
||||
self.name().to_owned()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct AddTagDto {
|
||||
pub namespace: Option<String>,
|
||||
pub name: String,
|
||||
}
|
||||
|
||||
impl AddTagDto {
|
||||
pub fn from_tuple(tuple: (Option<String>, String)) -> Self {
|
||||
let (namespace, name) = tuple;
|
||||
Self { namespace, name }
|
||||
}
|
||||
|
||||
/// Returns the normalized name of the tag (namespace:tag)
|
||||
pub fn normalized_name(&self) -> String {
|
||||
if let Some(namespace) = &self.namespace {
|
||||
format!("{}:{}", namespace, &self.name)
|
||||
} else {
|
||||
self.name.to_owned()
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,53 @@
|
||||
use std::path::PathBuf;
|
||||
|
||||
use tokio::fs;
|
||||
use tokio::fs::{File, OpenOptions};
|
||||
use tokio::io::BufReader;
|
||||
|
||||
use mediarepo_core::error::RepoResult;
|
||||
use mediarepo_core::fs::thumbnail_store::Dimensions;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct ThumbnailDto {
|
||||
path: PathBuf,
|
||||
parent_cd: String,
|
||||
size: Dimensions,
|
||||
mime_type: String,
|
||||
}
|
||||
|
||||
impl ThumbnailDto {
|
||||
pub fn new(path: PathBuf, parent_cd: String, size: Dimensions, mime_type: String) -> Self {
|
||||
Self {
|
||||
path,
|
||||
parent_cd,
|
||||
size,
|
||||
mime_type,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn parent_cd(&self) -> &String {
|
||||
&self.parent_cd
|
||||
}
|
||||
|
||||
pub fn size(&self) -> &Dimensions {
|
||||
&self.size
|
||||
}
|
||||
|
||||
pub fn mime_type(&self) -> &String {
|
||||
&self.mime_type
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug")]
|
||||
pub async fn get_reader(&self) -> RepoResult<BufReader<File>> {
|
||||
let file = OpenOptions::new().read(true).open(&self.path).await?;
|
||||
Ok(BufReader::new(file))
|
||||
}
|
||||
|
||||
/// Deletes the thumbnail
|
||||
#[tracing::instrument(level = "debug")]
|
||||
pub async fn delete(self) -> RepoResult<()> {
|
||||
fs::remove_file(&self.path).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
@ -0,0 +1,3 @@
|
||||
pub mod dao;
|
||||
pub mod dto;
|
||||
pub mod type_keys;
|
@ -1,7 +1,9 @@
|
||||
use crate::repo::Repo;
|
||||
use std::sync::Arc;
|
||||
|
||||
use typemap_rev::TypeMapKey;
|
||||
|
||||
use crate::dao::repo::Repo;
|
||||
|
||||
pub struct RepoKey;
|
||||
|
||||
impl TypeMapKey for RepoKey {
|
@ -1,101 +0,0 @@
|
||||
use crate::file::File;
|
||||
use mediarepo_core::content_descriptor::convert_v1_descriptor_to_v2;
|
||||
use mediarepo_core::error::RepoResult;
|
||||
use mediarepo_database::entities::content_descriptor;
|
||||
use mediarepo_database::entities::file;
|
||||
use sea_orm::prelude::*;
|
||||
use sea_orm::{DatabaseConnection, Set};
|
||||
use std::fmt::Debug;
|
||||
|
||||
pub struct ContentDescriptor {
|
||||
db: DatabaseConnection,
|
||||
model: content_descriptor::Model,
|
||||
}
|
||||
|
||||
impl ContentDescriptor {
|
||||
#[tracing::instrument(level = "trace")]
|
||||
pub(crate) fn new(db: DatabaseConnection, model: content_descriptor::Model) -> Self {
|
||||
Self { db, model }
|
||||
}
|
||||
|
||||
pub async fn all(db: DatabaseConnection) -> RepoResult<Vec<Self>> {
|
||||
let descriptors = content_descriptor::Entity::find()
|
||||
.all(&db)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|model| Self::new(db.clone(), model))
|
||||
.collect();
|
||||
|
||||
Ok(descriptors)
|
||||
}
|
||||
|
||||
/// Searches for the hash by id
|
||||
#[tracing::instrument(level = "debug", skip(db))]
|
||||
pub async fn by_id(db: DatabaseConnection, id: i64) -> RepoResult<Option<Self>> {
|
||||
let hash = content_descriptor::Entity::find_by_id(id)
|
||||
.one(&db)
|
||||
.await?
|
||||
.map(|model| Self::new(db, model));
|
||||
|
||||
Ok(hash)
|
||||
}
|
||||
|
||||
/// Returns the hash by value
|
||||
#[tracing::instrument(level = "debug", skip(db))]
|
||||
pub async fn by_value<D: AsRef<[u8]> + Debug>(
|
||||
db: DatabaseConnection,
|
||||
descriptor: D,
|
||||
) -> RepoResult<Option<Self>> {
|
||||
let cid = content_descriptor::Entity::find()
|
||||
.filter(content_descriptor::Column::Descriptor.eq(descriptor.as_ref()))
|
||||
.one(&db)
|
||||
.await?
|
||||
.map(|model| Self::new(db, model));
|
||||
|
||||
Ok(cid)
|
||||
}
|
||||
|
||||
/// Adds a new hash to the database
|
||||
#[tracing::instrument(level = "debug", skip(db))]
|
||||
pub async fn add(db: DatabaseConnection, descriptor: Vec<u8>) -> RepoResult<Self> {
|
||||
let active_model = content_descriptor::ActiveModel {
|
||||
descriptor: Set(descriptor),
|
||||
..Default::default()
|
||||
};
|
||||
let model = active_model.insert(&db).await?;
|
||||
|
||||
Ok(Self::new(db, model))
|
||||
}
|
||||
|
||||
pub fn id(&self) -> i64 {
|
||||
self.model.id
|
||||
}
|
||||
|
||||
pub fn descriptor(&self) -> &[u8] {
|
||||
&self.model.descriptor[..]
|
||||
}
|
||||
|
||||
/// Returns the file associated with the hash
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn file(&self) -> RepoResult<Option<File>> {
|
||||
let file = self
|
||||
.model
|
||||
.find_related(file::Entity)
|
||||
.one(&self.db)
|
||||
.await?
|
||||
.map(|file_model| File::new(self.db.clone(), file_model, self.model.clone()));
|
||||
|
||||
Ok(file)
|
||||
}
|
||||
|
||||
pub async fn convert_v1_to_v2(&mut self) -> RepoResult<()> {
|
||||
let descriptor = convert_v1_descriptor_to_v2(&self.model.descriptor)?;
|
||||
let active_model = content_descriptor::ActiveModel {
|
||||
id: Set(self.id()),
|
||||
descriptor: Set(descriptor),
|
||||
};
|
||||
self.model = active_model.update(&self.db).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
@ -1,334 +0,0 @@
|
||||
pub mod filter;
|
||||
|
||||
use std::fmt::Debug;
|
||||
use std::io::Cursor;
|
||||
use std::str::FromStr;
|
||||
|
||||
use mediarepo_core::content_descriptor::encode_content_descriptor;
|
||||
use sea_orm::prelude::*;
|
||||
use sea_orm::{ConnectionTrait, DatabaseConnection, Set};
|
||||
use sea_orm::{JoinType, QuerySelect};
|
||||
use tokio::io::{AsyncReadExt, BufReader};
|
||||
|
||||
use crate::file::filter::FilterProperty;
|
||||
use crate::file_metadata::FileMetadata;
|
||||
use mediarepo_core::error::{RepoError, RepoResult};
|
||||
use mediarepo_core::fs::file_hash_store::FileHashStore;
|
||||
use mediarepo_core::mediarepo_api::types::files::FileStatus as ApiFileStatus;
|
||||
use mediarepo_core::thumbnailer::{self, Thumbnail as ThumbnailerThumb, ThumbnailSize};
|
||||
use mediarepo_database::entities::content_descriptor;
|
||||
use mediarepo_database::entities::content_descriptor_tag;
|
||||
use mediarepo_database::entities::file;
|
||||
use mediarepo_database::entities::file_metadata;
|
||||
use mediarepo_database::entities::namespace;
|
||||
use mediarepo_database::entities::tag;
|
||||
|
||||
use crate::tag::Tag;
|
||||
|
||||
pub enum FileStatus {
|
||||
Imported = 10,
|
||||
Archived = 20,
|
||||
Deleted = 30,
|
||||
}
|
||||
|
||||
impl From<ApiFileStatus> for FileStatus {
|
||||
fn from(s: ApiFileStatus) -> Self {
|
||||
match s {
|
||||
ApiFileStatus::Imported => Self::Imported,
|
||||
ApiFileStatus::Archived => Self::Archived,
|
||||
ApiFileStatus::Deleted => Self::Deleted,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct File {
|
||||
db: DatabaseConnection,
|
||||
model: file::Model,
|
||||
content_descriptor: content_descriptor::Model,
|
||||
}
|
||||
|
||||
impl File {
|
||||
#[tracing::instrument(level = "trace")]
|
||||
pub(crate) fn new(
|
||||
db: DatabaseConnection,
|
||||
model: file::Model,
|
||||
hash: content_descriptor::Model,
|
||||
) -> Self {
|
||||
Self {
|
||||
db,
|
||||
model,
|
||||
content_descriptor: hash,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a list of all known stored files
|
||||
#[tracing::instrument(level = "debug", skip(db))]
|
||||
pub async fn all(db: DatabaseConnection) -> RepoResult<Vec<File>> {
|
||||
let files: Vec<(file::Model, Option<content_descriptor::Model>)> = file::Entity::find()
|
||||
.find_also_related(content_descriptor::Entity)
|
||||
.all(&db)
|
||||
.await?;
|
||||
let files = files
|
||||
.into_iter()
|
||||
.filter_map(|(f, h)| {
|
||||
let h = h?;
|
||||
Some(Self::new(db.clone(), f, h))
|
||||
})
|
||||
.collect();
|
||||
|
||||
Ok(files)
|
||||
}
|
||||
|
||||
/// Fetches the file by id
|
||||
#[tracing::instrument(level = "debug", skip(db))]
|
||||
pub async fn by_id(db: DatabaseConnection, id: i64) -> RepoResult<Option<Self>> {
|
||||
if let Some((model, Some(hash))) = file::Entity::find_by_id(id)
|
||||
.find_also_related(content_descriptor::Entity)
|
||||
.one(&db)
|
||||
.await?
|
||||
{
|
||||
let file = File::new(db, model, hash);
|
||||
Ok(Some(file))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
/// Finds the file by hash
|
||||
#[tracing::instrument(level = "debug", skip(db))]
|
||||
pub async fn by_cd(db: DatabaseConnection, cd: &[u8]) -> RepoResult<Option<Self>> {
|
||||
if let Some((hash, Some(model))) = content_descriptor::Entity::find()
|
||||
.filter(content_descriptor::Column::Descriptor.eq(cd))
|
||||
.find_also_related(file::Entity)
|
||||
.one(&db)
|
||||
.await?
|
||||
{
|
||||
let file = File::new(db, model, hash);
|
||||
Ok(Some(file))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
/// Finds the file by tags
|
||||
#[tracing::instrument(level = "debug", skip(db))]
|
||||
pub(crate) async fn find_by_filters(
|
||||
db: DatabaseConnection,
|
||||
filters: Vec<Vec<FilterProperty>>,
|
||||
) -> RepoResult<Vec<Self>> {
|
||||
let main_condition = filter::build_find_filter_conditions(filters);
|
||||
|
||||
let results: Vec<(content_descriptor::Model, Option<file::Model>)> =
|
||||
content_descriptor::Entity::find()
|
||||
.find_also_related(file::Entity)
|
||||
.filter(main_condition)
|
||||
.group_by(file::Column::Id)
|
||||
.all(&db)
|
||||
.await?;
|
||||
let files: Vec<Self> = results
|
||||
.into_iter()
|
||||
.filter_map(|(hash, tag)| Some(Self::new(db.clone(), tag?, hash)))
|
||||
.collect();
|
||||
|
||||
Ok(files)
|
||||
}
|
||||
|
||||
/// Adds a file with its hash to the database
|
||||
#[tracing::instrument(level = "debug", skip(db))]
|
||||
pub(crate) async fn add(
|
||||
db: DatabaseConnection,
|
||||
cd_id: i64,
|
||||
mime_type: String,
|
||||
) -> RepoResult<Self> {
|
||||
let file = file::ActiveModel {
|
||||
cd_id: Set(cd_id),
|
||||
mime_type: Set(mime_type),
|
||||
..Default::default()
|
||||
};
|
||||
let file: file::ActiveModel = file.insert(&db).await?.into();
|
||||
let file = Self::by_id(db, file.id.unwrap())
|
||||
.await?
|
||||
.expect("Inserted file does not exist");
|
||||
|
||||
Ok(file)
|
||||
}
|
||||
|
||||
/// Returns the unique identifier of the file
|
||||
pub fn id(&self) -> i64 {
|
||||
self.model.id
|
||||
}
|
||||
|
||||
/// Returns the hash of the file (content identifier)
|
||||
pub fn cd(&self) -> &[u8] {
|
||||
&self.content_descriptor.descriptor
|
||||
}
|
||||
|
||||
/// Returns the encoded content descriptor
|
||||
pub fn encoded_cd(&self) -> String {
|
||||
encode_content_descriptor(self.cd())
|
||||
}
|
||||
|
||||
/// Returns the id of the civ (content identifier value) of the file
|
||||
pub fn cd_id(&self) -> i64 {
|
||||
self.content_descriptor.id
|
||||
}
|
||||
|
||||
/// Returns the mime type of the file
|
||||
pub fn mime_type(&self) -> &String {
|
||||
&self.model.mime_type
|
||||
}
|
||||
|
||||
/// Returns the status of the file
|
||||
pub fn status(&self) -> FileStatus {
|
||||
match self.model.status {
|
||||
10 => FileStatus::Imported,
|
||||
20 => FileStatus::Archived,
|
||||
30 => FileStatus::Deleted,
|
||||
_ => FileStatus::Imported,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn set_status(&mut self, status: FileStatus) -> RepoResult<()> {
|
||||
let active_model = file::ActiveModel {
|
||||
id: Set(self.model.id),
|
||||
status: Set(status as i32),
|
||||
..Default::default()
|
||||
};
|
||||
self.model = active_model.update(&self.db).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns the metadata associated with this file
|
||||
/// A file MUST always have metadata associated
|
||||
pub async fn metadata(&self) -> RepoResult<FileMetadata> {
|
||||
FileMetadata::by_id(self.db.clone(), self.model.id)
|
||||
.await
|
||||
.and_then(|f| f.ok_or_else(|| RepoError::from("missing file metadata")))
|
||||
}
|
||||
|
||||
/// Returns the list of tags of the file
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn tags(&self) -> RepoResult<Vec<Tag>> {
|
||||
let tags: Vec<(tag::Model, Option<namespace::Model>)> = tag::Entity::find()
|
||||
.find_also_related(namespace::Entity)
|
||||
.join(
|
||||
JoinType::LeftJoin,
|
||||
content_descriptor_tag::Relation::Tag.def().rev(),
|
||||
)
|
||||
.join(
|
||||
JoinType::InnerJoin,
|
||||
content_descriptor_tag::Relation::ContentDescriptorId.def(),
|
||||
)
|
||||
.filter(content_descriptor::Column::Id.eq(self.content_descriptor.id))
|
||||
.all(&self.db)
|
||||
.await?;
|
||||
let tags = tags
|
||||
.into_iter()
|
||||
.map(|(tag, namespace)| Tag::new(self.db.clone(), tag, namespace))
|
||||
.collect();
|
||||
|
||||
Ok(tags)
|
||||
}
|
||||
|
||||
/// Adds a single tag to the file
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn add_tag(&mut self, tag_id: i64) -> RepoResult<()> {
|
||||
let cd_id = self.content_descriptor.id;
|
||||
let active_model = content_descriptor_tag::ActiveModel {
|
||||
cd_id: Set(cd_id),
|
||||
tag_id: Set(tag_id),
|
||||
};
|
||||
active_model.insert(&self.db).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Adds multiple tags to the file at once
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn add_tags(&self, tag_ids: Vec<i64>) -> RepoResult<()> {
|
||||
if tag_ids.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
let cd_id = self.content_descriptor.id;
|
||||
let models: Vec<content_descriptor_tag::ActiveModel> = tag_ids
|
||||
.into_iter()
|
||||
.map(|tag_id| content_descriptor_tag::ActiveModel {
|
||||
cd_id: Set(cd_id),
|
||||
tag_id: Set(tag_id),
|
||||
})
|
||||
.collect();
|
||||
content_descriptor_tag::Entity::insert_many(models)
|
||||
.exec(&self.db)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Removes multiple tags from the file
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn remove_tags(&self, tag_ids: Vec<i64>) -> RepoResult<()> {
|
||||
let hash_id = self.content_descriptor.id;
|
||||
content_descriptor_tag::Entity::delete_many()
|
||||
.filter(content_descriptor_tag::Column::CdId.eq(hash_id))
|
||||
.filter(content_descriptor_tag::Column::TagId.is_in(tag_ids))
|
||||
.exec(&self.db)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns the reader for the file
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn get_reader(
|
||||
&self,
|
||||
storage: &FileHashStore,
|
||||
) -> RepoResult<BufReader<tokio::fs::File>> {
|
||||
storage
|
||||
.get_file(&self.content_descriptor.descriptor)
|
||||
.await
|
||||
.map(|(_, f)| f)
|
||||
}
|
||||
|
||||
/// Creates a thumbnail for the file
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn create_thumbnail<I: IntoIterator<Item = ThumbnailSize> + Debug>(
|
||||
&self,
|
||||
storage: &FileHashStore,
|
||||
sizes: I,
|
||||
) -> RepoResult<Vec<ThumbnailerThumb>> {
|
||||
let mut buf = Vec::new();
|
||||
self.get_reader(storage)
|
||||
.await?
|
||||
.read_to_end(&mut buf)
|
||||
.await?;
|
||||
let mime_type = self.model.mime_type.clone();
|
||||
let mime_type =
|
||||
mime::Mime::from_str(&mime_type).unwrap_or_else(|_| mime::APPLICATION_OCTET_STREAM);
|
||||
let thumbs = thumbnailer::create_thumbnails(Cursor::new(buf), mime_type, sizes)?;
|
||||
|
||||
Ok(thumbs)
|
||||
}
|
||||
|
||||
/// Deletes the file as well as the content descriptor, tag mappings and metadata about the file
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn delete(self) -> RepoResult<()> {
|
||||
let trx = self.db.begin().await?;
|
||||
file_metadata::Entity::delete_many()
|
||||
.filter(file_metadata::Column::FileId.eq(self.model.id))
|
||||
.exec(&trx)
|
||||
.await?;
|
||||
self.model.delete(&trx).await?;
|
||||
content_descriptor_tag::Entity::delete_many()
|
||||
.filter(content_descriptor_tag::Column::CdId.eq(self.content_descriptor.id))
|
||||
.exec(&trx)
|
||||
.await?;
|
||||
content_descriptor::Entity::delete_many()
|
||||
.filter(content_descriptor::Column::Id.eq(self.content_descriptor.id))
|
||||
.exec(&trx)
|
||||
.await?;
|
||||
trx.commit().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
@ -1,124 +0,0 @@
|
||||
use std::fmt::Debug;
|
||||
|
||||
use chrono::{Local, NaiveDateTime};
|
||||
use sea_orm::prelude::*;
|
||||
use sea_orm::{DatabaseConnection, Set};
|
||||
|
||||
use mediarepo_core::error::RepoResult;
|
||||
use mediarepo_database::entities::file_metadata;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct FileMetadata {
|
||||
db: DatabaseConnection,
|
||||
model: file_metadata::Model,
|
||||
}
|
||||
|
||||
impl FileMetadata {
|
||||
#[tracing::instrument(level = "trace")]
|
||||
pub(crate) fn new(db: DatabaseConnection, model: file_metadata::Model) -> Self {
|
||||
Self { db, model }
|
||||
}
|
||||
|
||||
/// Fetches the file by id
|
||||
#[tracing::instrument(level = "debug", skip(db))]
|
||||
pub async fn by_id(db: DatabaseConnection, id: i64) -> RepoResult<Option<Self>> {
|
||||
let file_metadata = file_metadata::Entity::find_by_id(id)
|
||||
.one(&db)
|
||||
.await?
|
||||
.map(|m| FileMetadata::new(db, m));
|
||||
|
||||
Ok(file_metadata)
|
||||
}
|
||||
|
||||
/// Fetches metadata for all given file ids
|
||||
#[tracing::instrument(level = "debug", skip(db))]
|
||||
pub async fn all_by_ids(db: DatabaseConnection, ids: Vec<i64>) -> RepoResult<Vec<Self>> {
|
||||
let file_metadata = file_metadata::Entity::find()
|
||||
.filter(file_metadata::Column::FileId.is_in(ids))
|
||||
.all(&db)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|m| FileMetadata::new(db.clone(), m))
|
||||
.collect();
|
||||
|
||||
Ok(file_metadata)
|
||||
}
|
||||
|
||||
/// Adds a file with its hash to the database
|
||||
#[tracing::instrument(level = "debug", skip(db))]
|
||||
pub(crate) async fn add(
|
||||
db: DatabaseConnection,
|
||||
file_id: i64,
|
||||
size: i64,
|
||||
creation_time: NaiveDateTime,
|
||||
change_time: NaiveDateTime,
|
||||
) -> RepoResult<Self> {
|
||||
let file = file_metadata::ActiveModel {
|
||||
file_id: Set(file_id),
|
||||
size: Set(size),
|
||||
import_time: Set(Local::now().naive_local()),
|
||||
creation_time: Set(creation_time),
|
||||
change_time: Set(change_time),
|
||||
..Default::default()
|
||||
};
|
||||
let model = file.insert(&db).await?;
|
||||
|
||||
Ok(Self::new(db, model))
|
||||
}
|
||||
|
||||
pub fn file_id(&self) -> i64 {
|
||||
self.model.file_id
|
||||
}
|
||||
|
||||
pub fn size(&self) -> i64 {
|
||||
self.model.size
|
||||
}
|
||||
|
||||
pub fn name(&self) -> &Option<String> {
|
||||
&self.model.name
|
||||
}
|
||||
|
||||
pub fn comment(&self) -> &Option<String> {
|
||||
&self.model.comment
|
||||
}
|
||||
|
||||
pub fn import_time(&self) -> &NaiveDateTime {
|
||||
&self.model.import_time
|
||||
}
|
||||
|
||||
pub fn creation_time(&self) -> &NaiveDateTime {
|
||||
&self.model.creation_time
|
||||
}
|
||||
|
||||
pub fn change_time(&self) -> &NaiveDateTime {
|
||||
&self.model.change_time
|
||||
}
|
||||
|
||||
/// Changes the name of the file
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn set_name<S: ToString + Debug>(&mut self, name: S) -> RepoResult<()> {
|
||||
let mut active_model = self.get_active_model();
|
||||
active_model.name = Set(Some(name.to_string()));
|
||||
self.model = active_model.update(&self.db).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Changes the comment of the file
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn set_comment<S: ToString + Debug>(&mut self, comment: S) -> RepoResult<()> {
|
||||
let mut active_file = self.get_active_model();
|
||||
active_file.comment = Set(Some(comment.to_string()));
|
||||
self.model = active_file.update(&self.db).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns the active model of the file with only the id set
|
||||
fn get_active_model(&self) -> file_metadata::ActiveModel {
|
||||
file_metadata::ActiveModel {
|
||||
file_id: Set(self.file_id()),
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
}
|
@ -1,8 +0,0 @@
|
||||
pub mod content_descriptor;
|
||||
pub mod file;
|
||||
pub mod file_metadata;
|
||||
pub mod namespace;
|
||||
pub mod repo;
|
||||
pub mod tag;
|
||||
pub mod thumbnail;
|
||||
pub mod type_keys;
|
@ -1,141 +0,0 @@
|
||||
use mediarepo_core::error::RepoResult;
|
||||
use mediarepo_database::entities::namespace;
|
||||
use sea_orm::prelude::*;
|
||||
use sea_orm::{
|
||||
Condition, ConnectionTrait, DatabaseBackend, DatabaseConnection, InsertResult, Set, Statement,
|
||||
};
|
||||
use std::fmt::Debug;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Namespace {
|
||||
#[allow(dead_code)]
|
||||
db: DatabaseConnection,
|
||||
model: namespace::Model,
|
||||
}
|
||||
|
||||
impl Namespace {
|
||||
#[tracing::instrument(level = "trace")]
|
||||
pub(crate) fn new(db: DatabaseConnection, model: namespace::Model) -> Self {
|
||||
Self { db, model }
|
||||
}
|
||||
|
||||
/// Retrieves a list of all namespaces
|
||||
#[tracing::instrument(level = "debug", skip(db))]
|
||||
pub async fn all(db: DatabaseConnection) -> RepoResult<Vec<Self>> {
|
||||
let namespaces = namespace::Entity::find()
|
||||
.all(&db)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|model| Self::new(db.clone(), model))
|
||||
.collect();
|
||||
|
||||
Ok(namespaces)
|
||||
}
|
||||
|
||||
/// Retrieves the namespace by id
|
||||
#[tracing::instrument(level = "debug", skip(db))]
|
||||
pub async fn by_id(db: DatabaseConnection, id: i64) -> RepoResult<Option<Self>> {
|
||||
let namespace = namespace::Entity::find_by_id(id)
|
||||
.one(&db)
|
||||
.await?
|
||||
.map(|model| Self::new(db, model));
|
||||
|
||||
Ok(namespace)
|
||||
}
|
||||
|
||||
/// Retrieves a namespace by its name
|
||||
#[tracing::instrument(level = "debug", skip(db))]
|
||||
pub async fn by_name<S: AsRef<str> + Debug>(
|
||||
db: DatabaseConnection,
|
||||
name: S,
|
||||
) -> RepoResult<Option<Self>> {
|
||||
let namespace = namespace::Entity::find()
|
||||
.filter(namespace::Column::Name.eq(name.as_ref()))
|
||||
.one(&db)
|
||||
.await?
|
||||
.map(|model| Self::new(db, model));
|
||||
|
||||
Ok(namespace)
|
||||
}
|
||||
|
||||
/// Returns all namespaces by name
|
||||
#[tracing::instrument(level = "debug", skip(db))]
|
||||
pub async fn all_by_name(db: DatabaseConnection, names: Vec<String>) -> RepoResult<Vec<Self>> {
|
||||
if names.is_empty() {
|
||||
return Ok(Vec::with_capacity(0));
|
||||
}
|
||||
let mut condition = Condition::any();
|
||||
for name in names {
|
||||
condition = condition.add(namespace::Column::Name.eq(name));
|
||||
}
|
||||
|
||||
let namespaces = namespace::Entity::find()
|
||||
.filter(condition)
|
||||
.all(&db)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|model| Self::new(db.clone(), model))
|
||||
.collect();
|
||||
|
||||
Ok(namespaces)
|
||||
}
|
||||
|
||||
/// Adds all namespaces to the database
|
||||
#[tracing::instrument(level = "debug", skip(db))]
|
||||
pub async fn add_all(db: DatabaseConnection, names: Vec<String>) -> RepoResult<Vec<Self>> {
|
||||
if names.is_empty() {
|
||||
return Ok(vec![]);
|
||||
}
|
||||
let models: Vec<namespace::ActiveModel> = names
|
||||
.into_iter()
|
||||
.map(|name| namespace::ActiveModel {
|
||||
name: Set(name),
|
||||
..Default::default()
|
||||
})
|
||||
.collect();
|
||||
let txn = db.begin().await?;
|
||||
let last_id = txn
|
||||
.query_one(Statement::from_string(
|
||||
DatabaseBackend::Sqlite,
|
||||
r#"SELECT MAX(id) AS "max_id" FROM namespaces;"#.to_owned(),
|
||||
))
|
||||
.await?
|
||||
.and_then(|result| result.try_get("", "max_id").ok())
|
||||
.unwrap_or(-1);
|
||||
let result: InsertResult<namespace::ActiveModel> =
|
||||
namespace::Entity::insert_many(models).exec(&txn).await?;
|
||||
|
||||
let namespaces = namespace::Entity::find()
|
||||
.filter(namespace::Column::Id.between(last_id, result.last_insert_id + 1))
|
||||
.all(&txn)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|model| Self::new(db.clone(), model))
|
||||
.collect();
|
||||
txn.commit().await?;
|
||||
|
||||
Ok(namespaces)
|
||||
}
|
||||
|
||||
/// Adds a namespace to the database
|
||||
#[tracing::instrument(level = "debug", skip(db))]
|
||||
pub async fn add<S: ToString + Debug>(db: DatabaseConnection, name: S) -> RepoResult<Self> {
|
||||
let active_model = namespace::ActiveModel {
|
||||
name: Set(name.to_string()),
|
||||
..Default::default()
|
||||
};
|
||||
let model = active_model.insert(&db).await?;
|
||||
|
||||
Ok(Self::new(db, model))
|
||||
}
|
||||
|
||||
/// The ID of the namespace
|
||||
pub fn id(&self) -> i64 {
|
||||
self.model.id
|
||||
}
|
||||
|
||||
/// The name of the namespace
|
||||
pub fn name(&self) -> &String {
|
||||
&self.model.name
|
||||
}
|
||||
}
|
@ -1,432 +0,0 @@
|
||||
use crate::content_descriptor::ContentDescriptor;
|
||||
use crate::file::filter::FilterProperty;
|
||||
use crate::file::File;
|
||||
use crate::file_metadata::FileMetadata;
|
||||
use crate::namespace::Namespace;
|
||||
use crate::tag::Tag;
|
||||
use crate::thumbnail::Thumbnail;
|
||||
use chrono::{Local, NaiveDateTime};
|
||||
use mediarepo_core::content_descriptor::{encode_content_descriptor, is_v1_content_descriptor};
|
||||
use mediarepo_core::error::{RepoError, RepoResult};
|
||||
use mediarepo_core::fs::file_hash_store::FileHashStore;
|
||||
use mediarepo_core::fs::thumbnail_store::{Dimensions, ThumbnailStore};
|
||||
use mediarepo_core::itertools::Itertools;
|
||||
use mediarepo_core::thumbnailer::ThumbnailSize;
|
||||
use mediarepo_core::utils::parse_namespace_and_tag;
|
||||
use mediarepo_database::get_database;
|
||||
use mediarepo_database::queries::analysis::{get_all_counts, Counts};
|
||||
use sea_orm::DatabaseConnection;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::fmt::Debug;
|
||||
use std::io::Cursor;
|
||||
use std::iter::FromIterator;
|
||||
use std::path::PathBuf;
|
||||
use std::str::FromStr;
|
||||
use tokio::fs::OpenOptions;
|
||||
use tokio::io::AsyncReadExt;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Repo {
|
||||
db: DatabaseConnection,
|
||||
main_storage: FileHashStore,
|
||||
thumbnail_storage: ThumbnailStore,
|
||||
}
|
||||
|
||||
impl Repo {
|
||||
pub(crate) fn new(
|
||||
db: DatabaseConnection,
|
||||
file_store_path: PathBuf,
|
||||
thumb_store_path: PathBuf,
|
||||
) -> Self {
|
||||
Self {
|
||||
db,
|
||||
main_storage: FileHashStore::new(file_store_path),
|
||||
thumbnail_storage: ThumbnailStore::new(thumb_store_path),
|
||||
}
|
||||
}
|
||||
|
||||
/// Connects to the database with the given uri
|
||||
#[tracing::instrument(level = "debug")]
|
||||
pub async fn connect<S: AsRef<str> + Debug>(
|
||||
uri: S,
|
||||
file_store_path: PathBuf,
|
||||
thumb_store_path: PathBuf,
|
||||
) -> RepoResult<Self> {
|
||||
let db = get_database(uri).await?;
|
||||
Ok(Self::new(db, file_store_path, thumb_store_path))
|
||||
}
|
||||
|
||||
/// Returns the database of the repo for raw sql queries
|
||||
pub fn db(&self) -> &DatabaseConnection {
|
||||
&self.db
|
||||
}
|
||||
|
||||
/// Returns a file by its mapped hash
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn file_by_cd(&self, cd: &[u8]) -> RepoResult<Option<File>> {
|
||||
File::by_cd(self.db.clone(), cd).await
|
||||
}
|
||||
|
||||
/// Returns a file by id
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn file_by_id(&self, id: i64) -> RepoResult<Option<File>> {
|
||||
File::by_id(self.db.clone(), id).await
|
||||
}
|
||||
|
||||
/// Returns a list of all stored files
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn files(&self) -> RepoResult<Vec<File>> {
|
||||
File::all(self.db.clone()).await
|
||||
}
|
||||
|
||||
/// Finds all files by a list of tags
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn find_files_by_filters(
|
||||
&self,
|
||||
filters: Vec<Vec<FilterProperty>>,
|
||||
) -> RepoResult<Vec<File>> {
|
||||
File::find_by_filters(self.db.clone(), filters).await
|
||||
}
|
||||
|
||||
/// Returns all file metadata entries for the given file ids
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn get_file_metadata_for_ids(&self, ids: Vec<i64>) -> RepoResult<Vec<FileMetadata>> {
|
||||
FileMetadata::all_by_ids(self.db.clone(), ids).await
|
||||
}
|
||||
|
||||
/// Adds a file from bytes to the database
|
||||
#[tracing::instrument(level = "debug", skip(self, content))]
|
||||
pub async fn add_file(
|
||||
&self,
|
||||
mime_type: Option<String>,
|
||||
content: Vec<u8>,
|
||||
creation_time: NaiveDateTime,
|
||||
change_time: NaiveDateTime,
|
||||
) -> RepoResult<File> {
|
||||
let file_size = content.len();
|
||||
let reader = Cursor::new(content);
|
||||
let cd_binary = self.main_storage.add_file(reader, None).await?;
|
||||
let cd = ContentDescriptor::add(self.db.clone(), cd_binary).await?;
|
||||
|
||||
let mime_type = mime_type
|
||||
.and_then(|m| mime::Mime::from_str(&m).ok())
|
||||
.unwrap_or_else(|| mime::APPLICATION_OCTET_STREAM)
|
||||
.to_string();
|
||||
|
||||
let file = File::add(self.db.clone(), cd.id(), mime_type).await?;
|
||||
FileMetadata::add(
|
||||
self.db.clone(),
|
||||
file.id(),
|
||||
file_size as i64,
|
||||
creation_time,
|
||||
change_time,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(file)
|
||||
}
|
||||
|
||||
/// Adds a file to the database by its readable path in the file system
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn add_file_by_path(&self, path: PathBuf) -> RepoResult<File> {
|
||||
let mime_type = mime_guess::from_path(&path).first().map(|m| m.to_string());
|
||||
|
||||
let mut os_file = OpenOptions::new().read(true).open(&path).await?;
|
||||
let mut buf = Vec::new();
|
||||
os_file.read_to_end(&mut buf).await?;
|
||||
|
||||
self.add_file(
|
||||
mime_type,
|
||||
buf,
|
||||
Local::now().naive_local(),
|
||||
Local::now().naive_local(),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Deletes a file from the database and disk
|
||||
#[tracing::instrument(level = "debug", skip(self, file))]
|
||||
pub async fn delete_file(&self, file: File) -> RepoResult<()> {
|
||||
let cd = file.cd().to_owned();
|
||||
let cd_string = file.encoded_cd();
|
||||
file.delete().await?;
|
||||
self.main_storage.delete_file(&cd).await?;
|
||||
self.thumbnail_storage.delete_parent(&cd_string).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns all thumbnails of a file
|
||||
pub async fn get_file_thumbnails(&self, file_cd: &[u8]) -> RepoResult<Vec<Thumbnail>> {
|
||||
let file_cd = encode_content_descriptor(file_cd);
|
||||
let thumbnails = self
|
||||
.thumbnail_storage
|
||||
.get_thumbnails(&file_cd)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|(size, path)| Thumbnail {
|
||||
file_hash: file_cd.to_owned(),
|
||||
path,
|
||||
size,
|
||||
mime_type: mime::IMAGE_PNG.to_string(),
|
||||
})
|
||||
.collect_vec();
|
||||
|
||||
Ok(thumbnails)
|
||||
}
|
||||
|
||||
pub async fn get_file_bytes(&self, file: &File) -> RepoResult<Vec<u8>> {
|
||||
let mut buf = Vec::new();
|
||||
let mut reader = file.get_reader(&self.main_storage).await?;
|
||||
reader.read_to_end(&mut buf).await?;
|
||||
|
||||
Ok(buf)
|
||||
}
|
||||
|
||||
/// Creates thumbnails of all sizes for a file
|
||||
#[tracing::instrument(level = "debug", skip(self, file))]
|
||||
pub async fn create_thumbnails_for_file(&self, file: &File) -> RepoResult<Vec<Thumbnail>> {
|
||||
let size = ThumbnailSize::Medium;
|
||||
let (height, width) = size.dimensions();
|
||||
let thumbs = file.create_thumbnail(&self.main_storage, [size]).await?;
|
||||
let mut created_thumbs = Vec::with_capacity(1);
|
||||
|
||||
for thumb in thumbs {
|
||||
let entry = self
|
||||
.store_single_thumbnail(file.encoded_cd(), height, width, thumb)
|
||||
.await?;
|
||||
created_thumbs.push(entry);
|
||||
}
|
||||
|
||||
Ok(created_thumbs)
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip(self, file))]
|
||||
pub async fn create_file_thumbnail(
|
||||
&self,
|
||||
file: &File,
|
||||
size: ThumbnailSize,
|
||||
) -> RepoResult<Thumbnail> {
|
||||
let (height, width) = size.dimensions();
|
||||
let thumb = file
|
||||
.create_thumbnail(&self.main_storage, [size])
|
||||
.await?
|
||||
.pop()
|
||||
.ok_or_else(|| RepoError::from("Failed to create thumbnail"))?;
|
||||
let thumbnail = self
|
||||
.store_single_thumbnail(file.encoded_cd(), height, width, thumb)
|
||||
.await?;
|
||||
|
||||
Ok(thumbnail)
|
||||
}
|
||||
|
||||
/// Stores a single thumbnail
|
||||
async fn store_single_thumbnail(
|
||||
&self,
|
||||
file_hash: String,
|
||||
height: u32,
|
||||
width: u32,
|
||||
thumb: mediarepo_core::thumbnailer::Thumbnail,
|
||||
) -> RepoResult<Thumbnail> {
|
||||
let mut buf = Vec::new();
|
||||
thumb.write_png(&mut buf)?;
|
||||
let size = Dimensions { height, width };
|
||||
let path = self
|
||||
.thumbnail_storage
|
||||
.add_thumbnail(&file_hash, size.clone(), &buf)
|
||||
.await?;
|
||||
|
||||
let thumbnail = Thumbnail {
|
||||
file_hash,
|
||||
path,
|
||||
size,
|
||||
mime_type: mime::IMAGE_PNG.to_string(),
|
||||
};
|
||||
|
||||
Ok(thumbnail)
|
||||
}
|
||||
|
||||
/// Returns all tags stored in the database
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn tags(&self) -> RepoResult<Vec<Tag>> {
|
||||
Tag::all(self.db.clone()).await
|
||||
}
|
||||
|
||||
/// Returns all namespaces stored in the database
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn namespaces(&self) -> RepoResult<Vec<Namespace>> {
|
||||
Namespace::all(self.db.clone()).await
|
||||
}
|
||||
|
||||
/// Converts a list of tag names to tag ids
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn tag_names_to_ids(&self, tags: Vec<String>) -> RepoResult<HashMap<String, i64>> {
|
||||
let parsed_tags = tags
|
||||
.iter()
|
||||
.map(|tag| parse_namespace_and_tag(tag.clone()))
|
||||
.unique()
|
||||
.collect();
|
||||
|
||||
let db_tags = self.tags_by_names(parsed_tags).await?;
|
||||
let tag_map: HashMap<String, i64> =
|
||||
HashMap::from_iter(db_tags.into_iter().map(|t| (t.normalized_name(), t.id())));
|
||||
|
||||
Ok(tag_map)
|
||||
}
|
||||
|
||||
/// Finds all tags by name
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn tags_by_names(&self, tags: Vec<(Option<String>, String)>) -> RepoResult<Vec<Tag>> {
|
||||
Tag::all_by_name(self.db.clone(), tags).await
|
||||
}
|
||||
|
||||
/// Finds all tags that are assigned to the given list of hashes
|
||||
#[tracing::instrument(level = "debug", skip_all)]
|
||||
pub async fn find_tags_for_file_identifiers(&self, cds: Vec<Vec<u8>>) -> RepoResult<Vec<Tag>> {
|
||||
Tag::for_cd_list(self.db.clone(), cds).await
|
||||
}
|
||||
|
||||
/// Adds all tags that are not in the database to the database and returns the ones already existing as well
|
||||
#[tracing::instrument(level = "debug", skip_all)]
|
||||
pub async fn add_all_tags(&self, tags: Vec<(Option<String>, String)>) -> RepoResult<Vec<Tag>> {
|
||||
let mut tags_to_add = tags.into_iter().unique().collect_vec();
|
||||
let mut namespaces_to_add = tags_to_add
|
||||
.iter()
|
||||
.filter_map(|(namespace, _)| namespace.clone())
|
||||
.unique()
|
||||
.collect_vec();
|
||||
|
||||
let mut existing_namespaces =
|
||||
Namespace::all_by_name(self.db.clone(), namespaces_to_add.clone()).await?;
|
||||
{
|
||||
let existing_namespaces_set = existing_namespaces
|
||||
.iter()
|
||||
.map(|n| n.name().clone())
|
||||
.collect::<HashSet<String>>();
|
||||
namespaces_to_add.retain(|namespace| !existing_namespaces_set.contains(namespace));
|
||||
}
|
||||
existing_namespaces
|
||||
.append(&mut Namespace::add_all(self.db.clone(), namespaces_to_add).await?);
|
||||
|
||||
let mut existing_tags = self.tags_by_names(tags_to_add.clone()).await?;
|
||||
{
|
||||
let existing_tags_set = existing_tags
|
||||
.iter()
|
||||
.map(|t| (t.namespace().map(|n| n.name().clone()), t.name().clone()))
|
||||
.collect::<HashSet<(Option<String>, String)>>();
|
||||
|
||||
tags_to_add.retain(|t| !existing_tags_set.contains(t));
|
||||
}
|
||||
let namespace_map = existing_namespaces
|
||||
.into_iter()
|
||||
.map(|namespace| (namespace.name().clone(), namespace.id()))
|
||||
.collect::<HashMap<String, i64>>();
|
||||
let tags_to_add = tags_to_add
|
||||
.into_iter()
|
||||
.map(|(nsp, name)| (nsp.and_then(|n| namespace_map.get(&n)).map(|i| *i), name))
|
||||
.collect_vec();
|
||||
existing_tags.append(&mut Tag::add_all(self.db.clone(), tags_to_add).await?);
|
||||
|
||||
Ok(existing_tags)
|
||||
}
|
||||
|
||||
/// Adds or finds a tag
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn add_or_find_tag<S: ToString + Debug>(&self, tag: S) -> RepoResult<Tag> {
|
||||
let (namespace, name) = parse_namespace_and_tag(tag.to_string());
|
||||
if let Some(namespace) = namespace {
|
||||
self.add_or_find_namespaced_tag(name, namespace).await
|
||||
} else {
|
||||
self.add_or_find_unnamespaced_tag(name).await
|
||||
}
|
||||
}
|
||||
|
||||
/// Adds or finds an unnamespaced tag
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn add_or_find_unnamespaced_tag(&self, name: String) -> RepoResult<Tag> {
|
||||
if let Some(tag) = Tag::by_name(self.db.clone(), &name, None).await? {
|
||||
Ok(tag)
|
||||
} else {
|
||||
self.add_unnamespaced_tag(name).await
|
||||
}
|
||||
}
|
||||
|
||||
/// Adds an unnamespaced tag
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn add_unnamespaced_tag(&self, name: String) -> RepoResult<Tag> {
|
||||
Tag::add(self.db.clone(), name, None).await
|
||||
}
|
||||
|
||||
/// Adds or finds a namespaced tag
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn add_or_find_namespaced_tag(
|
||||
&self,
|
||||
name: String,
|
||||
namespace: String,
|
||||
) -> RepoResult<Tag> {
|
||||
if let Some(tag) = Tag::by_name(self.db.clone(), &name, Some(namespace.clone())).await? {
|
||||
Ok(tag)
|
||||
} else {
|
||||
self.add_namespaced_tag(name, namespace).await
|
||||
}
|
||||
}
|
||||
|
||||
/// Adds a namespaced tag
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn add_namespaced_tag(&self, name: String, namespace: String) -> RepoResult<Tag> {
|
||||
let namespace =
|
||||
if let Some(namespace) = Namespace::by_name(self.db.clone(), &namespace).await? {
|
||||
namespace
|
||||
} else {
|
||||
Namespace::add(self.db.clone(), namespace).await?
|
||||
};
|
||||
Tag::add(self.db.clone(), name, Some(namespace.id())).await
|
||||
}
|
||||
|
||||
/// Returns the size of the main storage
|
||||
#[inline]
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn get_main_store_size(&self) -> RepoResult<u64> {
|
||||
self.main_storage.get_size().await
|
||||
}
|
||||
|
||||
/// Returns the size of the thumbnail storage
|
||||
#[inline]
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn get_thumb_store_size(&self) -> RepoResult<u64> {
|
||||
self.thumbnail_storage.get_size().await
|
||||
}
|
||||
|
||||
/// Returns all entity counts
|
||||
#[inline]
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn get_counts(&self) -> RepoResult<Counts> {
|
||||
get_all_counts(&self.db).await
|
||||
}
|
||||
|
||||
pub async fn migrate(&self) -> RepoResult<()> {
|
||||
let cds = ContentDescriptor::all(self.db.clone()).await?;
|
||||
|
||||
tracing::info!("Converting content descriptors to v2 format...");
|
||||
let mut converted_count = 0;
|
||||
|
||||
for mut cd in cds {
|
||||
if is_v1_content_descriptor(cd.descriptor()) {
|
||||
let src_cd = cd.descriptor().to_owned();
|
||||
cd.convert_v1_to_v2().await?;
|
||||
let dst_cd = cd.descriptor().to_owned();
|
||||
self.main_storage.rename_file(&src_cd, &dst_cd).await?;
|
||||
self.thumbnail_storage
|
||||
.rename_parent(
|
||||
encode_content_descriptor(&src_cd),
|
||||
encode_content_descriptor(&dst_cd),
|
||||
)
|
||||
.await?;
|
||||
converted_count += 1;
|
||||
}
|
||||
}
|
||||
tracing::info!("Converted {} descriptors", converted_count);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
@ -1,226 +0,0 @@
|
||||
use std::fmt::Debug;
|
||||
|
||||
use mediarepo_core::error::RepoResult;
|
||||
use mediarepo_database::entities::content_descriptor;
|
||||
use mediarepo_database::entities::content_descriptor_tag;
|
||||
use mediarepo_database::entities::namespace;
|
||||
use mediarepo_database::entities::tag;
|
||||
use sea_orm::prelude::*;
|
||||
use sea_orm::query::ConnectionTrait;
|
||||
use sea_orm::sea_query::Expr;
|
||||
use sea_orm::{Condition, DatabaseBackend, DatabaseConnection, JoinType, Set, Statement};
|
||||
use sea_orm::{InsertResult, QuerySelect};
|
||||
|
||||
use crate::namespace::Namespace;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Tag {
|
||||
db: DatabaseConnection,
|
||||
model: tag::Model,
|
||||
namespace: Option<namespace::Model>,
|
||||
}
|
||||
|
||||
impl Tag {
|
||||
#[tracing::instrument(level = "trace")]
|
||||
pub(crate) fn new(
|
||||
db: DatabaseConnection,
|
||||
model: tag::Model,
|
||||
namespace: Option<namespace::Model>,
|
||||
) -> Self {
|
||||
Self {
|
||||
db,
|
||||
model,
|
||||
namespace,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns all tags stored in the database
|
||||
#[tracing::instrument(level = "debug", skip(db))]
|
||||
pub async fn all(db: DatabaseConnection) -> RepoResult<Vec<Self>> {
|
||||
let tags: Vec<Self> = tag::Entity::find()
|
||||
.left_join(namespace::Entity)
|
||||
.select_also(namespace::Entity)
|
||||
.all(&db)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|(tag, namespace)| Self::new(db.clone(), tag, namespace))
|
||||
.collect();
|
||||
|
||||
Ok(tags)
|
||||
}
|
||||
|
||||
/// Returns the tag by id
|
||||
#[tracing::instrument(level = "debug", skip(db))]
|
||||
pub async fn by_id(db: DatabaseConnection, id: i64) -> RepoResult<Option<Self>> {
|
||||
let tag = tag::Entity::find_by_id(id)
|
||||
.find_also_related(namespace::Entity)
|
||||
.one(&db)
|
||||
.await?
|
||||
.map(|(model, namespace)| Self::new(db, model, namespace));
|
||||
|
||||
Ok(tag)
|
||||
}
|
||||
|
||||
/// Returns one tag by name and namespace
|
||||
#[tracing::instrument(level = "debug", skip(db))]
|
||||
pub async fn by_name<S1: ToString + Debug>(
|
||||
db: DatabaseConnection,
|
||||
name: S1,
|
||||
namespace: Option<String>,
|
||||
) -> RepoResult<Option<Self>> {
|
||||
let mut entries = Self::all_by_name(db, vec![(namespace, name.to_string())]).await?;
|
||||
|
||||
Ok(entries.pop())
|
||||
}
|
||||
|
||||
/// Retrieves the namespaced tags by name and namespace
|
||||
#[tracing::instrument(level = "debug", skip(db))]
|
||||
pub async fn all_by_name(
|
||||
db: DatabaseConnection,
|
||||
namespaces_with_names: Vec<(Option<String>, String)>,
|
||||
) -> RepoResult<Vec<Self>> {
|
||||
if namespaces_with_names.is_empty() {
|
||||
return Ok(vec![]);
|
||||
}
|
||||
let mut or_condition = Condition::any();
|
||||
|
||||
for (namespace, name) in namespaces_with_names {
|
||||
let mut all_condition = Condition::all();
|
||||
if !name.ends_with('*') {
|
||||
all_condition = all_condition.add(tag::Column::Name.eq(name))
|
||||
} else if name.len() > 1 {
|
||||
all_condition = all_condition
|
||||
.add(tag::Column::Name.like(&*format!("{}%", name.trim_end_matches("*"))))
|
||||
} else if namespace.is_none() {
|
||||
continue; // would result in an empty condition otherwise
|
||||
}
|
||||
|
||||
all_condition = if let Some(namespace) = namespace {
|
||||
all_condition.add(namespace::Column::Name.eq(namespace))
|
||||
} else {
|
||||
all_condition.add(Expr::tbl(tag::Entity, tag::Column::NamespaceId).is_null())
|
||||
};
|
||||
or_condition = or_condition.add(all_condition);
|
||||
}
|
||||
|
||||
let tags: Vec<Self> = tag::Entity::find()
|
||||
.find_also_related(namespace::Entity)
|
||||
.filter(or_condition)
|
||||
.group_by(tag::Column::Id)
|
||||
.all(&db)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|(t, n)| Self::new(db.clone(), t, n))
|
||||
.collect();
|
||||
|
||||
Ok(tags)
|
||||
}
|
||||
|
||||
/// Returns all tags that are assigned to any of the passed hashes
|
||||
#[tracing::instrument(level = "debug", skip_all)]
|
||||
pub async fn for_cd_list(db: DatabaseConnection, cds: Vec<Vec<u8>>) -> RepoResult<Vec<Self>> {
|
||||
let tags: Vec<Self> = tag::Entity::find()
|
||||
.find_also_related(namespace::Entity)
|
||||
.join(
|
||||
JoinType::LeftJoin,
|
||||
content_descriptor_tag::Relation::Tag.def().rev(),
|
||||
)
|
||||
.join(
|
||||
JoinType::InnerJoin,
|
||||
content_descriptor_tag::Relation::ContentDescriptorId.def(),
|
||||
)
|
||||
.filter(content_descriptor::Column::Descriptor.is_in(cds))
|
||||
.group_by(tag::Column::Id)
|
||||
.all(&db)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|(t, n)| Self::new(db.clone(), t, n))
|
||||
.collect();
|
||||
|
||||
Ok(tags)
|
||||
}
|
||||
|
||||
pub async fn add_all(
|
||||
db: DatabaseConnection,
|
||||
namespaces_with_names: Vec<(Option<i64>, String)>,
|
||||
) -> RepoResult<Vec<Self>> {
|
||||
if namespaces_with_names.is_empty() {
|
||||
return Ok(vec![]);
|
||||
}
|
||||
let models: Vec<tag::ActiveModel> = namespaces_with_names
|
||||
.into_iter()
|
||||
.map(|(namespace_id, name)| tag::ActiveModel {
|
||||
name: Set(name),
|
||||
namespace_id: Set(namespace_id),
|
||||
..Default::default()
|
||||
})
|
||||
.collect();
|
||||
let txn = db.begin().await?;
|
||||
let last_id: i64 = txn
|
||||
.query_one(Statement::from_string(
|
||||
DatabaseBackend::Sqlite,
|
||||
r#"SELECT MAX(id) as "max_id" FROM tags"#.to_owned(),
|
||||
))
|
||||
.await?
|
||||
.and_then(|res| res.try_get("", "max_id").ok())
|
||||
.unwrap_or(-1);
|
||||
|
||||
let result: InsertResult<tag::ActiveModel> =
|
||||
tag::Entity::insert_many(models).exec(&txn).await?;
|
||||
let tags: Vec<Self> = tag::Entity::find()
|
||||
.find_also_related(namespace::Entity)
|
||||
.filter(tag::Column::Id.between(last_id, result.last_insert_id + 1))
|
||||
.all(&txn)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|(t, n)| Self::new(db.clone(), t, n))
|
||||
.collect();
|
||||
txn.commit().await?;
|
||||
|
||||
Ok(tags)
|
||||
}
|
||||
|
||||
/// Adds a new tag to the database
|
||||
#[tracing::instrument(level = "debug", skip(db))]
|
||||
pub async fn add<S: ToString + Debug>(
|
||||
db: DatabaseConnection,
|
||||
name: S,
|
||||
namespace_id: Option<i64>,
|
||||
) -> RepoResult<Self> {
|
||||
let active_model = tag::ActiveModel {
|
||||
name: Set(name.to_string()),
|
||||
namespace_id: Set(namespace_id),
|
||||
..Default::default()
|
||||
};
|
||||
let model: tag::Model = active_model.insert(&db).await?;
|
||||
let namespace = model.find_related(namespace::Entity).one(&db).await?;
|
||||
|
||||
Ok(Self::new(db, model, namespace))
|
||||
}
|
||||
|
||||
/// The ID of the tag
|
||||
pub fn id(&self) -> i64 {
|
||||
self.model.id
|
||||
}
|
||||
|
||||
/// The name of the tag
|
||||
pub fn name(&self) -> &String {
|
||||
&self.model.name
|
||||
}
|
||||
|
||||
/// The namespace of the tag
|
||||
pub fn namespace(&self) -> Option<Namespace> {
|
||||
self.namespace
|
||||
.clone()
|
||||
.map(|n| Namespace::new(self.db.clone(), n))
|
||||
}
|
||||
|
||||
/// Returns the normalized name of the tag (namespace:tag)
|
||||
pub fn normalized_name(&self) -> String {
|
||||
if let Some(namespace) = &self.namespace {
|
||||
format!("{}:{}", namespace.name, self.model.name)
|
||||
} else {
|
||||
self.model.name.to_owned()
|
||||
}
|
||||
}
|
||||
}
|
@ -1,30 +0,0 @@
|
||||
use mediarepo_core::error::RepoResult;
|
||||
use mediarepo_core::fs::thumbnail_store::Dimensions;
|
||||
use std::path::PathBuf;
|
||||
use tokio::fs::{self, File, OpenOptions};
|
||||
use tokio::io::BufReader;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Thumbnail {
|
||||
pub file_hash: String,
|
||||
pub path: PathBuf,
|
||||
pub size: Dimensions,
|
||||
pub mime_type: String,
|
||||
}
|
||||
|
||||
impl Thumbnail {
|
||||
/// Returns the reader of the thumbnail file
|
||||
#[tracing::instrument(level = "debug")]
|
||||
pub async fn get_reader(&self) -> RepoResult<BufReader<File>> {
|
||||
let file = OpenOptions::new().read(true).open(&self.path).await?;
|
||||
Ok(BufReader::new(file))
|
||||
}
|
||||
|
||||
/// Deletes the thumbnail
|
||||
#[tracing::instrument(level = "debug")]
|
||||
pub async fn delete(self) -> RepoResult<()> {
|
||||
fs::remove_file(&self.path).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
@ -1,3 +1,4 @@
|
||||
export type JobType = "MigrateContentDescriptors"
|
||||
| "CalculateSizes"
|
||||
| "CheckIntegrity";
|
||||
| "CheckIntegrity"
|
||||
| "Vacuum";
|
||||
|
@ -1,53 +0,0 @@
|
||||
import {Injectable} from "@angular/core";
|
||||
import {listen} from "@tauri-apps/api/event";
|
||||
|
||||
@Injectable({
|
||||
providedIn: "root"
|
||||
})
|
||||
export class ErrorBrokerService {
|
||||
|
||||
errorCb: Function | undefined;
|
||||
infoCb: Function | undefined;
|
||||
|
||||
constructor() {
|
||||
this.registerListener().catch(err => console.error(err));
|
||||
}
|
||||
|
||||
async registerListener() {
|
||||
const _unlisten = await listen("error", event => {
|
||||
const payload: any = event.payload;
|
||||
if (payload.message) {
|
||||
this.showError(payload);
|
||||
} else {
|
||||
this.showError(payload.toString());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async try<T>(fn: () => Promise<T>): Promise<T | undefined> {
|
||||
try {
|
||||
return await fn();
|
||||
} catch (err) {
|
||||
this.showError(err);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
showInfo(info: string) {
|
||||
console.log(info);
|
||||
if (this.infoCb) {
|
||||
this.infoCb(info);
|
||||
}
|
||||
}
|
||||
|
||||
showError(error: { message: string } | any) {
|
||||
console.error(error);
|
||||
if (this.errorCb) {
|
||||
if (!error.message) {
|
||||
this.errorCb({ message: error });
|
||||
} else {
|
||||
this.errorCb({ ...error });
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,24 @@
|
||||
export enum LogLevel {
|
||||
Trace,
|
||||
Debug,
|
||||
Info,
|
||||
Warn,
|
||||
Error,
|
||||
}
|
||||
|
||||
export class LogEntry {
|
||||
constructor(private message: string, private level: LogLevel, private error?: Error) {
|
||||
}
|
||||
|
||||
public getMessage(): string {
|
||||
return this.message;
|
||||
}
|
||||
|
||||
public getLevel(): LogLevel {
|
||||
return this.level;
|
||||
}
|
||||
|
||||
public getError(): Error | undefined {
|
||||
return this.error;
|
||||
}
|
||||
}
|
@ -1,13 +1,13 @@
|
||||
import {TestBed} from "@angular/core/testing";
|
||||
|
||||
import {ErrorBrokerService} from "./error-broker.service";
|
||||
import {LoggingService} from "./logging.service";
|
||||
|
||||
describe("ErrorBrokerService", () => {
|
||||
let service: ErrorBrokerService;
|
||||
let service: LoggingService;
|
||||
|
||||
beforeEach(() => {
|
||||
TestBed.configureTestingModule({});
|
||||
service = TestBed.inject(ErrorBrokerService);
|
||||
service = TestBed.inject(LoggingService);
|
||||
});
|
||||
|
||||
it("should be created", () => {
|
@ -0,0 +1,60 @@
|
||||
import {Injectable} from "@angular/core";
|
||||
import {listen} from "@tauri-apps/api/event";
|
||||
import {BehaviorSubject} from "rxjs";
|
||||
import {LogEntry, LogLevel} from "./LogEntry";
|
||||
|
||||
@Injectable({
|
||||
providedIn: "root"
|
||||
})
|
||||
export class LoggingService {
|
||||
|
||||
logs = new BehaviorSubject<LogEntry>(new LogEntry("Log initialized", LogLevel.Trace));
|
||||
|
||||
constructor() {
|
||||
this.registerListener().catch(err => console.error(err));
|
||||
}
|
||||
|
||||
async registerListener() {
|
||||
const _unlisten = await listen("error", event => {
|
||||
const payload: any = event.payload;
|
||||
if (payload.message) {
|
||||
this.error(payload);
|
||||
} else {
|
||||
this.error(payload.toString());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async try<T>(fn: () => Promise<T>): Promise<T | undefined> {
|
||||
try {
|
||||
return await fn();
|
||||
} catch (err: any) {
|
||||
this.error(err);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
trace(message: string) {
|
||||
this.log(LogLevel.Trace, message);
|
||||
}
|
||||
|
||||
debug(message: string) {
|
||||
this.log(LogLevel.Debug, message);
|
||||
}
|
||||
|
||||
info(message: string) {
|
||||
this.log(LogLevel.Info, message);
|
||||
}
|
||||
|
||||
warn(message: string) {
|
||||
this.log(LogLevel.Warn, message);
|
||||
}
|
||||
|
||||
error(error: Error, message?: string) {
|
||||
this.log(LogLevel.Error, message ?? error.message ?? error.toString(), error);
|
||||
}
|
||||
|
||||
public log(level: LogLevel, message: string, error?: Error) {
|
||||
this.logs.next(new LogEntry(message, level, error));
|
||||
}
|
||||
}
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in New Issue