Fix CD handling and update api types

Signed-off-by: trivernis <trivernis@protonmail.com>
pull/4/head
trivernis 2 years ago
parent 2a3ba0d10d
commit ff844f3156

@ -1136,8 +1136,8 @@ checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f"
[[package]]
name = "mediarepo-api"
version = "0.23.0"
source = "git+https://github.com/Trivernis/mediarepo-api.git?rev=773e2132c80a80ad90a0585c50baf499c7ceb53f#773e2132c80a80ad90a0585c50baf499c7ceb53f"
version = "0.24.1"
source = "git+https://github.com/Trivernis/mediarepo-api.git?rev=91d8182548bfdb19f2de9afd8c29d5c8ebd48993#91d8182548bfdb19f2de9afd8c29d5c8ebd48993"
dependencies = [
"bromine",
"chrono",

@ -39,7 +39,7 @@ features = ["fs", "io-util", "io-std"]
[dependencies.mediarepo-api]
git = "https://github.com/Trivernis/mediarepo-api.git"
rev = "773e2132c80a80ad90a0585c50baf499c7ceb53f"
rev = "91d8182548bfdb19f2de9afd8c29d5c8ebd48993"
features = ["bromine"]
[features]

@ -56,6 +56,27 @@ impl FileHashStore {
Ok((extension, reader))
}
/// Renames a file
pub async fn rename_file(
&self,
src_descriptor: &[u8],
dst_descriptor: &[u8],
) -> RepoResult<()> {
let src_path = self.descriptor_to_file_path(src_descriptor);
if !src_path.exists() {
tracing::warn!("file {:?} doesn't exist", src_path);
return Ok(());
}
let dst_path = self.descriptor_to_file_path(dst_descriptor);
let dst_parent = dst_path.parent().unwrap();
if !dst_parent.exists() {
fs::create_dir(dst_parent).await?;
}
fs::rename(src_path, dst_path).await?;
Ok(())
}
/// Scans the size of the folder
#[inline]
pub async fn get_size(&self) -> RepoResult<u64> {

@ -1,5 +1,6 @@
use crate::error::RepoResult;
use crate::utils::get_folder_size;
use std::fmt::Debug;
use std::io::Result;
use std::path::PathBuf;
use tokio::fs;
@ -77,6 +78,23 @@ impl ThumbnailStore {
Ok(entries)
}
/// Renames a thumbnail parent
#[tracing::instrument(level = "debug")]
pub async fn rename_parent<S1: AsRef<str> + Debug, S2: AsRef<str> + Debug>(
&self,
src: S1,
dst: S2,
) -> Result<()> {
let src_dir = self.path.join(src.as_ref());
if !src_dir.exists() {
tracing::warn!("directory {:?} doesn't exist", src_dir);
return Ok(());
}
let dst_dir = self.path.join(dst.as_ref());
fs::rename(src_dir, dst_dir).await
}
/// Returns the size of the folder
#[tracing::instrument(level = "debug")]
pub async fn get_size(&self) -> RepoResult<u64> {

@ -1,4 +1,5 @@
use crate::file::File;
use mediarepo_core::content_descriptor::convert_v1_descriptor_to_v2;
use mediarepo_core::error::RepoResult;
use mediarepo_database::entities::content_descriptor;
use mediarepo_database::entities::file;
@ -6,17 +7,28 @@ use sea_orm::prelude::*;
use sea_orm::{DatabaseConnection, Set};
use std::fmt::Debug;
pub struct Hash {
pub struct ContentDescriptor {
db: DatabaseConnection,
model: content_descriptor::Model,
}
impl Hash {
impl ContentDescriptor {
#[tracing::instrument(level = "trace")]
pub(crate) fn new(db: DatabaseConnection, model: content_descriptor::Model) -> Self {
Self { db, model }
}
pub async fn all(db: DatabaseConnection) -> RepoResult<Vec<Self>> {
let descriptors = content_descriptor::Entity::find()
.all(&db)
.await?
.into_iter()
.map(|model| Self::new(db.clone(), model))
.collect();
Ok(descriptors)
}
/// Searches for the hash by id
#[tracing::instrument(level = "debug", skip(db))]
pub async fn by_id(db: DatabaseConnection, id: i64) -> RepoResult<Option<Self>> {
@ -75,4 +87,15 @@ impl Hash {
Ok(file)
}
pub async fn convert_v1_to_v2(&mut self) -> RepoResult<()> {
let descriptor = convert_v1_descriptor_to_v2(&self.model.descriptor)?;
let active_model = content_descriptor::ActiveModel {
id: Set(self.id()),
descriptor: Set(descriptor),
};
self.model = active_model.update(&self.db).await?;
Ok(())
}
}

@ -83,12 +83,9 @@ impl File {
/// Finds the file by hash
#[tracing::instrument(level = "debug", skip(db))]
pub async fn by_cd<S: AsRef<str> + Debug>(
db: DatabaseConnection,
cid: S,
) -> RepoResult<Option<Self>> {
pub async fn by_cd(db: DatabaseConnection, cd: &[u8]) -> RepoResult<Option<Self>> {
if let Some((hash, Some(model))) = content_descriptor::Entity::find()
.filter(content_descriptor::Column::Descriptor.eq(cid.as_ref()))
.filter(content_descriptor::Column::Descriptor.eq(cd))
.find_also_related(file::Entity)
.one(&db)
.await?

@ -30,6 +30,20 @@ impl FileMetadata {
Ok(file_metadata)
}
/// Fetches metadata for all given file ids
#[tracing::instrument(level = "debug", skip(db))]
pub async fn all_by_ids(db: DatabaseConnection, ids: Vec<i64>) -> RepoResult<Vec<Self>> {
let file_metadata = file_metadata::Entity::find()
.filter(file_metadata::Column::FileId.is_in(ids))
.all(&db)
.await?
.into_iter()
.map(|m| FileMetadata::new(db.clone(), m))
.collect();
Ok(file_metadata)
}
/// Adds a file with its hash to the database
#[tracing::instrument(level = "debug", skip(db))]
pub(crate) async fn add(

@ -1,3 +1,4 @@
use crate::content_descriptor::ContentDescriptor;
use crate::file::File;
use crate::file_metadata::FileMetadata;
use crate::namespace::Namespace;
@ -5,7 +6,7 @@ use crate::storage::Storage;
use crate::tag::Tag;
use crate::thumbnail::Thumbnail;
use chrono::{Local, NaiveDateTime};
use mediarepo_core::content_descriptor::encode_content_descriptor;
use mediarepo_core::content_descriptor::{encode_content_descriptor, is_v1_content_descriptor};
use mediarepo_core::error::{RepoError, RepoResult};
use mediarepo_core::fs::thumbnail_store::{Dimensions, ThumbnailStore};
use mediarepo_core::itertools::Itertools;
@ -92,8 +93,8 @@ impl Repo {
/// Returns a file by its mapped hash
#[tracing::instrument(level = "debug", skip(self))]
pub async fn file_by_cd<S: AsRef<str> + Debug>(&self, hash: S) -> RepoResult<Option<File>> {
File::by_cd(self.db.clone(), hash).await
pub async fn file_by_cd(&self, cd: &[u8]) -> RepoResult<Option<File>> {
File::by_cd(self.db.clone(), cd).await
}
/// Returns a file by id
@ -129,6 +130,12 @@ impl Repo {
File::find_by_tags(self.db.clone(), tag_ids).await
}
/// Returns all file metadata entries for the given file ids
#[tracing::instrument(level = "debug", skip(self))]
pub async fn get_file_metadata_for_ids(&self, ids: Vec<i64>) -> RepoResult<Vec<FileMetadata>> {
FileMetadata::all_by_ids(self.db.clone(), ids).await
}
/// Adds a file from bytes to the database
#[tracing::instrument(level = "debug", skip(self, content))]
pub async fn add_file(
@ -283,11 +290,8 @@ impl Repo {
/// Finds all tags that are assigned to the given list of hashes
#[tracing::instrument(level = "debug", skip_all)]
pub async fn find_tags_for_file_identifiers(
&self,
hashes: Vec<Vec<u8>>,
) -> RepoResult<Vec<Tag>> {
Tag::for_hash_list(self.db.clone(), hashes).await
pub async fn find_tags_for_file_identifiers(&self, cds: Vec<Vec<u8>>) -> RepoResult<Vec<Tag>> {
Tag::for_cd_list(self.db.clone(), cds).await
}
/// Adds all tags that are not in the database to the database and returns the ones already existing as well
@ -410,6 +414,34 @@ impl Repo {
get_all_counts(&self.db).await
}
pub async fn migrate(&self) -> RepoResult<()> {
let cds = ContentDescriptor::all(self.db.clone()).await?;
tracing::info!("Converting content descriptors to v2 format...");
let mut converted_count = 0;
let thumb_store = self.get_thumbnail_storage()?;
let file_store = self.get_main_storage()?;
for mut cd in cds {
if is_v1_content_descriptor(cd.descriptor()) {
let src_cd = cd.descriptor().to_owned();
cd.convert_v1_to_v2().await?;
let dst_cd = cd.descriptor().to_owned();
file_store.rename_entry(&src_cd, &dst_cd).await?;
thumb_store
.rename_parent(
encode_content_descriptor(&src_cd),
encode_content_descriptor(&dst_cd),
)
.await?;
converted_count += 1;
}
}
tracing::info!("Converted {} descriptors", converted_count);
Ok(())
}
#[tracing::instrument(level = "trace", skip(self))]
fn get_main_storage(&self) -> RepoResult<&Storage> {
if let Some(storage) = &self.main_storage {

@ -1,4 +1,4 @@
use crate::content_descriptor::Hash;
use crate::content_descriptor::ContentDescriptor;
use mediarepo_core::error::RepoResult;
use mediarepo_core::fs::file_hash_store::FileHashStore;
use mediarepo_database::entities::storage;
@ -161,15 +161,28 @@ impl Storage {
/// Adds a thumbnail
#[tracing::instrument(level = "debug", skip(self, reader))]
pub async fn store_entry<R: AsyncRead + Unpin>(&self, reader: R) -> RepoResult<Hash> {
let hash = self.store.add_file(reader, None).await?;
if let Some(hash) = Hash::by_value(self.db.clone(), &hash).await? {
pub async fn store_entry<R: AsyncRead + Unpin>(
&self,
reader: R,
) -> RepoResult<ContentDescriptor> {
let descriptor = self.store.add_file(reader, None).await?;
if let Some(hash) = ContentDescriptor::by_value(self.db.clone(), &descriptor).await? {
Ok(hash)
} else {
Hash::add(self.db.clone(), hash).await
ContentDescriptor::add(self.db.clone(), descriptor).await
}
}
/// Renames an entry
#[tracing::instrument(level = "debug", skip(self))]
pub async fn rename_entry(
&self,
src_descriptor: &[u8],
dst_descriptor: &[u8],
) -> RepoResult<()> {
self.store.rename_file(src_descriptor, dst_descriptor).await
}
/// Returns the buf reader to the given hash
#[tracing::instrument(level = "debug", skip(self))]
pub async fn get_file_reader(

@ -118,10 +118,7 @@ impl Tag {
/// Returns all tags that are assigned to any of the passed hashes
#[tracing::instrument(level = "debug", skip_all)]
pub async fn for_hash_list(
db: DatabaseConnection,
hashes: Vec<Vec<u8>>,
) -> RepoResult<Vec<Self>> {
pub async fn for_cd_list(db: DatabaseConnection, cds: Vec<Vec<u8>>) -> RepoResult<Vec<Self>> {
let tags: Vec<Self> = tag::Entity::find()
.find_also_related(namespace::Entity)
.join(
@ -132,7 +129,7 @@ impl Tag {
JoinType::InnerJoin,
content_descriptor_tag::Relation::ContentDescriptorId.def(),
)
.filter(content_descriptor::Column::Descriptor.is_in(hashes))
.filter(content_descriptor::Column::Descriptor.is_in(cds))
.group_by(tag::Column::Id)
.all(&db)
.await?

@ -18,7 +18,6 @@ impl FromModel<FileMetadata> for FileMetadataResponse {
file_id: metadata.file_id(),
name: metadata.name().to_owned(),
comment: metadata.comment().to_owned(),
file_type: 0,
creation_time: metadata.creation_time().to_owned(),
change_time: metadata.change_time().to_owned(),
import_time: metadata.import_time().to_owned(),
@ -31,7 +30,7 @@ impl FromModel<File> for FileBasicDataResponse {
FileBasicDataResponse {
id: file.id(),
status: FileStatus::from_model(file.status()),
cid: file.encoded_cd(),
cd: file.encoded_cd(),
mime_type: file.mime_type().to_owned(),
}
}

@ -16,8 +16,11 @@ use mediarepo_core::utils::parse_namespace_and_tag;
use mediarepo_database::queries::tags::{
get_cids_with_namespaced_tags, get_content_descriptors_with_tag_count,
};
use mediarepo_model::file_metadata::FileMetadata;
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use std::cmp::Ordering;
use std::collections::HashMap;
use std::iter::FromIterator;
use tokio::io::AsyncReadExt;
pub struct FilesNamespace;
@ -139,30 +142,36 @@ impl FilesNamespace {
.collect();
let mut files = repo.find_files_by_tags(tags).await?;
let hash_ids: Vec<i64> = files.iter().map(|f| f.cd_id()).collect();
let hash_ids: Vec<i64> = files.par_iter().map(|f| f.cd_id()).collect();
let file_ids: Vec<i64> = files.par_iter().map(|f| f.id()).collect();
let mut cid_nsp: HashMap<i64, HashMap<String, Vec<String>>> =
get_cids_with_namespaced_tags(repo.db(), hash_ids.clone()).await?;
let mut cid_tag_counts =
get_content_descriptors_with_tag_count(repo.db(), hash_ids).await?;
let files_metadata = repo.get_file_metadata_for_ids(file_ids).await?;
let mut file_metadata_map: HashMap<i64, FileMetadata> =
HashMap::from_iter(files_metadata.into_iter().map(|m| (m.file_id(), m)));
let mut contexts = HashMap::new();
for file in &files {
let metadata = file.metadata().await?;
let context = FileSortContext {
name: metadata.name().to_owned(),
size: metadata.size() as u64,
mime_type: file.mime_type().to_owned(),
namespaces: cid_nsp
.remove(&file.cd_id())
.unwrap_or(HashMap::with_capacity(0)),
tag_count: cid_tag_counts.remove(&file.cd_id()).unwrap_or(0),
import_time: metadata.import_time().to_owned(),
create_time: metadata.import_time().to_owned(),
change_time: metadata.change_time().to_owned(),
};
contexts.insert(file.id(), context);
if let Some(metadata) = file_metadata_map.remove(&file.id()) {
let context = FileSortContext {
name: metadata.name().to_owned(),
size: metadata.size() as u64,
mime_type: file.mime_type().to_owned(),
namespaces: cid_nsp
.remove(&file.cd_id())
.unwrap_or(HashMap::with_capacity(0)),
tag_count: cid_tag_counts.remove(&file.cd_id()).unwrap_or(0),
import_time: metadata.import_time().to_owned(),
create_time: metadata.import_time().to_owned(),
change_time: metadata.change_time().to_owned(),
};
contexts.insert(file.id(), context);
}
}
let sort_expression = req.sort_expression;
tracing::debug!("sort_expression = {:?}", sort_expression);

@ -16,9 +16,9 @@ pub async fn get_repo_from_context(ctx: &Context) -> Arc<Repo> {
pub async fn file_by_identifier(identifier: FileIdentifier, repo: &Repo) -> RepoResult<File> {
let file = match identifier {
FileIdentifier::ID(id) => repo.file_by_id(id).await,
FileIdentifier::CD(cd) => repo.file_by_cd(cd).await,
FileIdentifier::CD(cd) => repo.file_by_cd(&decode_content_descriptor(cd)?).await,
}?;
file.ok_or_else(|| RepoError::from("Thumbnail not found"))
file.ok_or_else(|| RepoError::from("File not found"))
}
pub async fn cd_by_identifier(identifier: FileIdentifier, repo: &Repo) -> RepoResult<Vec<u8>> {

@ -116,6 +116,7 @@ async fn init_repo(opt: &Opt) -> RepoResult<(Settings, Repo)> {
repo.set_main_storage(&settings.default_file_store).await?;
repo.set_thumbnail_storage(opt.repo.join(&settings.thumbnail_store))
.await?;
repo.migrate().await?;
Ok((settings, repo))
}

Loading…
Cancel
Save