Change database schema to split mandatory file data and metadata

Signed-off-by: trivernis <trivernis@protonmail.com>
pull/4/head
trivernis 3 years ago
parent 90b9f17be2
commit 2a3ba0d10d

@ -1136,8 +1136,8 @@ checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f"
[[package]]
name = "mediarepo-api"
version = "0.20.0"
source = "git+https://github.com/Trivernis/mediarepo-api.git?rev=0c897acfd959c776fc10bd8fabdd2eb22b437be3#0c897acfd959c776fc10bd8fabdd2eb22b437be3"
version = "0.23.0"
source = "git+https://github.com/Trivernis/mediarepo-api.git?rev=773e2132c80a80ad90a0585c50baf499c7ceb53f#773e2132c80a80ad90a0585c50baf499c7ceb53f"
dependencies = [
"bromine",
"chrono",
@ -1152,6 +1152,7 @@ name = "mediarepo-core"
version = "0.1.0"
dependencies = [
"base64",
"data-encoding",
"futures 0.3.19",
"glob",
"itertools",
@ -1229,6 +1230,7 @@ dependencies = [
"mediarepo-database",
"mediarepo-model",
"port_check",
"rayon",
"serde",
"tokio",
"tracing",

@ -18,6 +18,7 @@ futures = "^0.3.19"
itertools = "^0.10.3"
glob = "^0.3.0"
tracing = "0.1.29"
data-encoding = "2.3.2"
[dependencies.thumbnailer]
version = "^0.2.4"
@ -38,7 +39,7 @@ features = ["fs", "io-util", "io-std"]
[dependencies.mediarepo-api]
git = "https://github.com/Trivernis/mediarepo-api.git"
rev = "0c897acfd959c776fc10bd8fabdd2eb22b437be3"
rev = "773e2132c80a80ad90a0585c50baf499c7ceb53f"
features = ["bromine"]
[features]

@ -0,0 +1,65 @@
use crate::error::RepoResult;
use multihash::{Code, MultihashDigest};
/// Creates a new content descriptor for the given file
pub fn create_content_descriptor(bytes: &[u8]) -> Vec<u8> {
Code::Sha2_256.digest(bytes).to_bytes()
}
/// Encodes a content descriptor while respecting the version
pub fn encode_content_descriptor(descriptor: &[u8]) -> String {
if is_v1_content_descriptor(descriptor) {
encode_content_descriptor_v1(descriptor)
} else {
encode_content_descriptor_v2(descriptor)
}
}
/// Encodes a v1 descriptor that is already stored encoded in the database (only interprets it as string)
pub fn encode_content_descriptor_v1(descriptor: &[u8]) -> String {
String::from_utf8_lossy(descriptor).to_string()
}
/// Encodes the content descriptor as base32 lowercase
pub fn encode_content_descriptor_v2(descriptor: &[u8]) -> String {
data_encoding::BASE32_DNSSEC.encode(descriptor)
}
/// Decodes a content descriptor
pub fn decode_content_descriptor<S: AsRef<str>>(descriptor: S) -> RepoResult<Vec<u8>> {
// check for v1 descriptor with a fixed length of 53 starting with the prefix of the base and hash
if is_v1_content_descriptor_string(descriptor.as_ref()) {
decode_content_descriptor_v1(descriptor)
} else {
decode_content_descriptor_v2(descriptor)
}
}
/// Decodes the first version of content descriptors (multibase)
pub fn decode_content_descriptor_v1<S: AsRef<str>>(descriptor: S) -> RepoResult<Vec<u8>> {
Ok(descriptor.as_ref().as_bytes().to_vec())
}
/// Decodes the second version of content descriptors (faster fixed base32)
pub fn decode_content_descriptor_v2<S: AsRef<str>>(descriptor: S) -> RepoResult<Vec<u8>> {
let data = data_encoding::BASE32_DNSSEC.decode(descriptor.as_ref().as_bytes())?;
Ok(data)
}
/// Decodes the data stored in the v1 content descriptor into the v2 format
pub fn convert_v1_descriptor_to_v2(descriptor_v1: &[u8]) -> RepoResult<Vec<u8>> {
let (_, data) = multibase::decode(encode_content_descriptor_v1(descriptor_v1))?;
Ok(data)
}
/// Checks if a binary descriptor is v1
pub fn is_v1_content_descriptor(descriptor: &[u8]) -> bool {
descriptor.len() == 56 && descriptor.starts_with(b"bciq")
}
/// Checks if a descriptor string is a v1 descriptor
pub fn is_v1_content_descriptor_string<S: AsRef<str>>(descriptor: S) -> bool {
descriptor.as_ref().len() == 56 && descriptor.as_ref().starts_with("bciq")
}

@ -31,8 +31,11 @@ pub enum RepoError {
#[error(transparent)]
Thumbnailer(#[from] thumbnailer::error::ThumbError),
#[error("No free tcp port available")]
#[error("no free tcp port available")]
PortUnavailable,
#[error("failed to decode data {0}")]
Decode(#[from] data_encoding::DecodeError),
}
#[derive(Error, Debug)]

@ -1,14 +1,11 @@
use crate::content_descriptor::{create_content_descriptor, encode_content_descriptor};
use crate::error::RepoResult;
use crate::utils::get_folder_size;
use multibase::Base;
use multihash::{Code, MultihashDigest};
use std::path::PathBuf;
use tokio::fs;
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncRead, AsyncReadExt, BufReader};
static STORE_BASE: Base = Base::Base32Lower;
#[derive(Clone, Debug)]
pub struct FileHashStore {
path: PathBuf,
@ -24,35 +21,31 @@ impl FileHashStore {
&self,
mut reader: R,
extension: Option<&str>,
) -> RepoResult<String> {
) -> RepoResult<Vec<u8>> {
let mut buf = Vec::new();
reader.read_to_end(&mut buf).await?;
let hash: Vec<u8> = Code::Sha2_256.digest(&buf).to_bytes();
let hash: String = multibase::encode(STORE_BASE, &hash);
let folder_path = self.hash_to_folder_path(&hash);
let descriptor = create_content_descriptor(&buf);
let file_path = self.descriptor_to_file_path(&descriptor);
let folder_path = file_path.parent().unwrap();
if !folder_path.exists() {
fs::create_dir(folder_path).await?;
}
let mut file_path = self.hash_to_file_path(&hash);
let mut file_path = self.descriptor_to_file_path(&descriptor);
if let Some(extension) = extension {
file_path.set_extension(extension);
}
fs::write(file_path, buf).await?;
Ok(hash)
Ok(descriptor)
}
/// Returns the file extension and a reader for the file by hash
pub async fn get_file(
&self,
mut hash: String,
descriptor: &[u8],
) -> RepoResult<(Option<String>, BufReader<File>)> {
let (base, data) = multibase::decode(&hash)?;
if base != STORE_BASE {
hash = multibase::encode(STORE_BASE, data);
}
let file_path = self.hash_to_file_path(&hash);
let file_path = self.descriptor_to_file_path(descriptor);
let extension = file_path
.extension()
.and_then(|s| s.to_str())
@ -69,17 +62,18 @@ impl FileHashStore {
get_folder_size(self.path.to_owned()).await
}
fn hash_to_file_path(&self, hash: &str) -> PathBuf {
let mut path = self.hash_to_folder_path(hash);
path.push(hash);
fn descriptor_to_file_path(&self, descriptor: &[u8]) -> PathBuf {
let descriptor_string = encode_content_descriptor(descriptor);
let mut path = self.descriptor_string_to_folder_path(&descriptor_string);
path.push(descriptor_string);
path
}
fn hash_to_folder_path(&self, hash: &str) -> PathBuf {
assert!(hash.len() >= 2);
fn descriptor_string_to_folder_path(&self, descriptor: &str) -> PathBuf {
assert!(descriptor.len() >= 2);
let mut path = self.path.clone();
path.push(&hash[hash.len() - 3..hash.len() - 1]);
path.push(&descriptor[descriptor.len() - 3..descriptor.len() - 1]);
path
}

@ -4,6 +4,7 @@ pub use mediarepo_api;
pub use mediarepo_api::bromine;
pub use thumbnailer;
pub mod content_descriptor;
pub mod context;
pub mod error;
pub mod fs;

@ -0,0 +1,107 @@
-- Add migration script here
PRAGMA foreign_keys= off;
-- create backup files table
ALTER TABLE files
RENAME TO _files_old;
-- create backup hashes table
ALTER TABLE hashes
RENAME TO _hashes_old;
-- create backup hash_tag_mappings table
ALTER TABLE hash_tag_mappings
RENAME TO _hash_tag_mappings_old;
-- create backup hash_source_mappings table
ALTER TABLE hash_source_mappings
RENAME TO _hash_source_mappings_old;
-- create content id table
CREATE TABLE content_descriptors
(
id INTEGER PRIMARY KEY AUTOINCREMENT,
descriptor BLOB NOT NULL
);
CREATE UNIQUE INDEX content_descriptor_values ON content_descriptors (descriptor);
-- create content-id tag mappings table
CREATE TABLE cd_tag_mappings
(
cd_id INTEGER NOT NULL REFERENCES content_descriptors (id),
tag_id INTEGER NOT NULL REFERENCES tags (id),
PRIMARY KEY (cd_id, tag_id)
);
CREATE UNIQUE INDEX content_descriptor_tag_mapping_unique ON cd_tag_mappings (cd_id, tag_id);
CREATE INDEX content_descriptor_tag_mapping_tag ON cd_tag_mappings (tag_id);
-- create content-id source mappings table
CREATE TABLE cd_source_mappings
(
cd_id INTEGER NOT NULL REFERENCES content_descriptors (id),
source_id INTEGER NOT NULL REFERENCES sources (id),
PRIMARY KEY (cd_id, source_id)
);
CREATE UNIQUE INDEX content_descriptor_source_mapping_unique ON cd_source_mappings (cd_id, source_id);
-- create new files table
CREATE TABLE files
(
id INTEGER PRIMARY KEY AUTOINCREMENT,
status INTEGER NOT NULL DEFAULT 10,
storage_id INTEGER NOT NULL REFERENCES storage_locations (id),
cd_id INTEGER NOT NULL REFERENCES content_descriptors (id),
mime_type VARCHAR(128) NOT NULL DEFAULT 'application/octet-stream'
);
CREATE INDEX files_contend_descriptor ON files (cd_id);
-- create metadata table
CREATE TABLE file_metadata
(
file_id INTEGER PRIMARY KEY REFERENCES files (id),
size INTEGER NOT NULL,
name VARCHAR(128),
comment VARCHAR(1024),
import_time DATETIME NOT NULL,
creation_time DATETIME NOT NULL,
change_time DATETIME NOT NULL
);
CREATE UNIQUE INDEX file_metadata_file_id_unique ON file_metadata (file_id);
-- add content identifiers from hashes table
INSERT INTO content_descriptors
SELECT id, value
FROM _hashes_old;
-- add files from files table
INSERT INTO files (id, storage_id, cd_id, mime_type)
SELECT id, storage_id, hash_id AS content_id, mime_type
FROM _files_old;
-- add metadata from files table
INSERT INTO file_metadata
SELECT id AS file_id, size, name, comment, import_time, creation_time, change_time
FROM _files_old;
-- add content tag mappings
INSERT INTO cd_tag_mappings
SELECT hash_id AS content_id, tag_id
FROM _hash_tag_mappings_old;
-- add content id source mappings
INSERT INTO cd_source_mappings
SELECT hash_id AS content_id, source_id
FROM _hash_source_mappings_old;
-- drop all old tables
DROP TABLE _hash_source_mappings_old;
DROP TABLE _hash_tag_mappings_old;
DROP TABLE _files_old;
DROP TABLE _hashes_old;
pragma foreign_keys= on;

@ -1,11 +1,11 @@
use sea_orm::prelude::*;
#[derive(Clone, Debug, PartialEq, DeriveEntityModel)]
#[sea_orm(table_name = "hashes")]
#[sea_orm(table_name = "content_descriptors")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: i64,
pub value: String,
pub descriptor: Vec<u8>,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
@ -13,27 +13,35 @@ pub enum Relation {}
impl Related<super::file::Entity> for Entity {
fn to() -> RelationDef {
super::file::Relation::Hash.def().rev()
super::file::Relation::ContentDescriptorId.def().rev()
}
}
impl Related<super::tag::Entity> for Entity {
fn to() -> RelationDef {
super::hash_tag::Relation::Tag.def()
super::content_descriptor_tag::Relation::Tag.def()
}
fn via() -> Option<RelationDef> {
Some(super::hash_tag::Relation::Hash.def().rev())
Some(
super::content_descriptor_tag::Relation::ContentDescriptorId
.def()
.rev(),
)
}
}
impl Related<super::source::Entity> for Entity {
fn to() -> RelationDef {
super::hash_source::Relation::Source.def()
super::content_descriptor_source::Relation::Source.def()
}
fn via() -> Option<RelationDef> {
Some(super::hash_source::Relation::Hash.def().rev())
Some(
super::content_descriptor_source::Relation::ContentDescriptorId
.def()
.rev(),
)
}
}

@ -1,10 +1,10 @@
use sea_orm::prelude::*;
#[derive(Clone, Debug, PartialEq, DeriveEntityModel)]
#[sea_orm(table_name = "hash_source_mappings")]
#[sea_orm(table_name = "cd_source_mappings")]
pub struct Model {
#[sea_orm(primary_key)]
pub hash_id: i64,
pub cd_id: i64,
#[sea_orm(primary_key)]
pub source_id: i64,
}
@ -12,11 +12,11 @@ pub struct Model {
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(
belongs_to = "super::hash::Entity",
from = "Column::HashId",
to = "super::hash::Column::Id"
belongs_to = "super::content_descriptor::Entity",
from = "Column::CdId",
to = "super::content_descriptor::Column::Id"
)]
Hash,
ContentDescriptorId,
#[sea_orm(
belongs_to = "super::source::Entity",
from = "Column::SourceId",
@ -25,9 +25,9 @@ pub enum Relation {
Source,
}
impl Related<super::hash::Entity> for Entity {
impl Related<super::content_descriptor::Entity> for Entity {
fn to() -> RelationDef {
Relation::Hash.def()
Relation::ContentDescriptorId.def()
}
}

@ -1,10 +1,10 @@
use sea_orm::prelude::*;
#[derive(Clone, Debug, PartialEq, DeriveEntityModel)]
#[sea_orm(table_name = "hash_tag_mappings")]
#[sea_orm(table_name = "cd_tag_mappings")]
pub struct Model {
#[sea_orm(primary_key)]
pub hash_id: i64,
pub cd_id: i64,
#[sea_orm(primary_key)]
pub tag_id: i64,
}
@ -12,11 +12,11 @@ pub struct Model {
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(
belongs_to = "super::hash::Entity",
from = "Column::HashId",
to = "super::hash::Column::Id"
belongs_to = "super::content_descriptor::Entity",
from = "Column::CdId",
to = "super::content_descriptor::Column::Id"
)]
Hash,
ContentDescriptorId,
#[sea_orm(
belongs_to = "super::tag::Entity",
from = "Column::TagId",
@ -25,9 +25,9 @@ pub enum Relation {
Tag,
}
impl Related<super::hash::Entity> for Entity {
impl Related<super::content_descriptor::Entity> for Entity {
fn to() -> RelationDef {
Relation::Hash.def()
Relation::ContentDescriptorId.def()
}
}

@ -1,4 +1,3 @@
use chrono::NaiveDateTime;
use sea_orm::prelude::*;
#[derive(Clone, Debug, PartialEq, DeriveEntityModel)]
@ -6,26 +5,20 @@ use sea_orm::prelude::*;
pub struct Model {
#[sea_orm(primary_key)]
pub id: i64,
pub file_type: u32,
pub name: Option<String>,
pub comment: Option<String>,
pub mime_type: Option<String>,
pub size: Option<i64>,
pub status: i32,
pub mime_type: String,
pub storage_id: i64,
pub hash_id: i64,
pub import_time: NaiveDateTime,
pub creation_time: NaiveDateTime,
pub change_time: NaiveDateTime,
pub cd_id: i64,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(
belongs_to = "super::hash::Entity",
from = "Column::HashId",
to = "super::hash::Column::Id"
belongs_to = "super::content_descriptor::Entity",
from = "Column::CdId",
to = "super::content_descriptor::Column::Id"
)]
Hash,
ContentDescriptorId,
#[sea_orm(
belongs_to = "super::storage::Entity",
@ -35,9 +28,9 @@ pub enum Relation {
Storage,
}
impl Related<super::hash::Entity> for Entity {
impl Related<super::content_descriptor::Entity> for Entity {
fn to() -> RelationDef {
Relation::Hash.def()
Relation::ContentDescriptorId.def()
}
}
@ -47,4 +40,10 @@ impl Related<super::storage::Entity> for Entity {
}
}
impl Related<super::file_metadata::Entity> for Entity {
fn to() -> RelationDef {
super::file_metadata::Relation::File.def().rev()
}
}
impl ActiveModelBehavior for ActiveModel {}

@ -0,0 +1,32 @@
use chrono::NaiveDateTime;
use sea_orm::prelude::*;
#[derive(Clone, Debug, PartialEq, DeriveEntityModel)]
#[sea_orm(table_name = "file_metadata")]
pub struct Model {
#[sea_orm(primary_key)]
pub file_id: i64,
pub name: Option<String>,
pub comment: Option<String>,
pub size: i64,
pub import_time: NaiveDateTime,
pub creation_time: NaiveDateTime,
pub change_time: NaiveDateTime,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(
belongs_to = "super::file::Entity",
from = "Column::FileId",
to = "super::file::Column::Id"
)]
File,
}
impl Related<super::file::Entity> for Entity {
fn to() -> RelationDef {
Relation::File.def()
}
}
impl ActiveModelBehavior for ActiveModel {}

@ -1,7 +1,8 @@
pub mod content_descriptor;
pub mod content_descriptor_source;
pub mod content_descriptor_tag;
pub mod file;
pub mod hash;
pub mod hash_source;
pub mod hash_tag;
pub mod file_metadata;
pub mod namespace;
pub mod source;
pub mod storage;

@ -11,13 +11,17 @@ pub struct Model {
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}
impl Related<super::hash::Entity> for Entity {
impl Related<super::content_descriptor::Entity> for Entity {
fn to() -> RelationDef {
super::hash_source::Relation::Hash.def()
super::content_descriptor_source::Relation::ContentDescriptorId.def()
}
fn via() -> Option<RelationDef> {
Some(super::hash_source::Relation::Source.def().rev())
Some(
super::content_descriptor_source::Relation::Source
.def()
.rev(),
)
}
}

@ -19,13 +19,13 @@ pub enum Relation {
Namespace,
}
impl Related<super::hash::Entity> for Entity {
impl Related<super::content_descriptor::Entity> for Entity {
fn to() -> RelationDef {
super::hash_tag::Relation::Hash.def()
super::content_descriptor_tag::Relation::ContentDescriptorId.def()
}
fn via() -> Option<RelationDef> {
Some(super::hash_tag::Relation::Tag.def().rev())
Some(super::content_descriptor_tag::Relation::Tag.def().rev())
}
}

@ -7,7 +7,7 @@ use mediarepo_core::error::{RepoError, RepoResult};
#[derive(Debug, FromQueryResult)]
pub struct Counts {
pub file_count: i64,
pub hash_count: i64,
pub cd_count: i64,
pub tag_count: i64,
pub namespace_count: i64,
pub source_count: i64,
@ -20,11 +20,11 @@ pub async fn get_all_counts(db: &DatabaseConnection) -> RepoResult<Counts> {
r#"
SELECT *
FROM (SELECT COUNT(*) AS file_count FROM files),
(SELECT COUNT(*) AS hash_count FROM hashes),
(SELECT COUNT(*) AS cd_count FROM content_descriptors),
(SELECT COUNT(*) AS tag_count FROM tags),
(SELECT COUNT(*) AS namespace_count FROM namespaces),
(SELECT COUNT(*) AS source_count FROM sources),
(SELECT COUNT(*) AS mapping_count FROM hash_tag_mappings)
(SELECT COUNT(*) AS mapping_count FROM cd_tag_mappings)
"#,
vec![],
))

@ -7,27 +7,27 @@ use std::fmt::Display;
use std::iter::FromIterator;
#[derive(Debug, FromQueryResult)]
struct HashNamespaceTags {
hash_id: i64,
struct CIDNamespaceTag {
cd_id: i64,
namespace: String,
tag: String,
}
#[tracing::instrument(level = "debug", skip_all)]
pub async fn get_hashes_with_namespaced_tags(
pub async fn get_cids_with_namespaced_tags(
db: &DatabaseConnection,
hash_ids: Vec<i64>,
) -> RepoResult<HashMap<i64, HashMap<String, Vec<String>>>> {
let hash_namespace_tags: Vec<HashNamespaceTags> =
HashNamespaceTags::find_by_statement(Statement::from_sql_and_values(
let hash_namespace_tags: Vec<CIDNamespaceTag> =
CIDNamespaceTag::find_by_statement(Statement::from_sql_and_values(
DbBackend::Sqlite,
format!(
r#"SELECT htm.hash_id, n.name as namespace, t.name as tag
FROM hash_tag_mappings htm
INNER JOIN tags t on htm.tag_id = t.id
r#"SELECT ctm.cd_id, n.name as namespace, t.name as tag
FROM cd_tag_mappings ctm
INNER JOIN tags t on ctm.tag_id = t.id
JOIN namespaces n on t.namespace_id = n.id
WHERE t.namespace_id IS NOT NULL
AND htm.hash_id IN ({}) ORDER BY t.namespace_id;"#,
AND ctm.cd_id IN ({}) ORDER BY t.namespace_id;"#,
vec_to_query_list(hash_ids)
)
.as_str(),
@ -35,49 +35,49 @@ pub async fn get_hashes_with_namespaced_tags(
))
.all(db)
.await?;
let mut hash_namespaces: HashMap<i64, HashMap<String, Vec<String>>> = HashMap::new();
let mut cd_id_namespaces: HashMap<i64, HashMap<String, Vec<String>>> = HashMap::new();
for hnt in hash_namespace_tags {
if let Some(entry) = hash_namespaces.get_mut(&hnt.hash_id) {
if let Some(entry) = cd_id_namespaces.get_mut(&hnt.cd_id) {
if let Some(nsp_entry) = entry.get_mut(&hnt.namespace) {
nsp_entry.push(hnt.tag);
} else {
entry.insert(hnt.namespace, vec![hnt.tag]);
}
} else {
hash_namespaces.insert(
hnt.hash_id,
cd_id_namespaces.insert(
hnt.cd_id,
HashMap::from_iter(vec![(hnt.namespace, vec![hnt.tag])].into_iter()),
);
}
}
Ok(hash_namespaces)
Ok(cd_id_namespaces)
}
#[derive(Debug, FromQueryResult)]
struct HashTagCount {
hash_id: i64,
struct CIDTagCount {
cd_id: i64,
tag_count: i32,
}
#[tracing::instrument(level = "debug", skip_all)]
pub async fn get_hashes_with_tag_count(
pub async fn get_content_descriptors_with_tag_count(
db: &DatabaseConnection,
hash_ids: Vec<i64>,
cd_ids: Vec<i64>,
) -> RepoResult<HashMap<i64, u32>> {
if hash_ids.is_empty() {
if cd_ids.is_empty() {
return Ok(HashMap::new());
}
let hash_tag_counts: Vec<HashTagCount> =
HashTagCount::find_by_statement(Statement::from_sql_and_values(
let hash_tag_counts: Vec<CIDTagCount> =
CIDTagCount::find_by_statement(Statement::from_sql_and_values(
DbBackend::Sqlite,
format!(
r#"
SELECT htm.hash_id, COUNT(htm.tag_id) AS "tag_count" from hash_tag_mappings htm
WHERE htm.hash_id IN ({})
GROUP BY hash_id
SELECT ctm.cd_id, COUNT(ctm.tag_id) AS "tag_count" from cd_tag_mappings ctm
WHERE ctm.cd_id IN ({})
GROUP BY cd_id
"#,
vec_to_query_list(hash_ids)
vec_to_query_list(cd_ids)
)
.as_str(),
vec![],
@ -87,7 +87,12 @@ pub async fn get_hashes_with_tag_count(
let mappings = hash_tag_counts
.into_iter()
.map(|HashTagCount { hash_id, tag_count }| (hash_id, tag_count as u32))
.map(
|CIDTagCount {
cd_id: hash_id,
tag_count,
}| (hash_id, tag_count as u32),
)
.collect::<HashMap<i64, u32>>();
Ok(mappings)

@ -15,7 +15,6 @@ mime = "^0.3.16"
tracing = "^0.1.29"
async-trait = "^0.1.51"
[dependencies.mediarepo-core]
path = "../mediarepo-core"

@ -1,26 +1,26 @@
use crate::file::File;
use mediarepo_core::error::RepoResult;
use mediarepo_database::entities::content_descriptor;
use mediarepo_database::entities::file;
use mediarepo_database::entities::hash;
use sea_orm::prelude::*;
use sea_orm::{DatabaseConnection, Set};
use std::fmt::Debug;
pub struct Hash {
db: DatabaseConnection,
model: hash::Model,
model: content_descriptor::Model,
}
impl Hash {
#[tracing::instrument(level = "trace")]
pub(crate) fn new(db: DatabaseConnection, model: hash::Model) -> Self {
pub(crate) fn new(db: DatabaseConnection, model: content_descriptor::Model) -> Self {
Self { db, model }
}
/// Searches for the hash by id
#[tracing::instrument(level = "debug", skip(db))]
pub async fn by_id(db: DatabaseConnection, id: i64) -> RepoResult<Option<Self>> {
let hash = hash::Entity::find_by_id(id)
let hash = content_descriptor::Entity::find_by_id(id)
.one(&db)
.await?
.map(|model| Self::new(db, model));
@ -30,24 +30,24 @@ impl Hash {
/// Returns the hash by value
#[tracing::instrument(level = "debug", skip(db))]
pub async fn by_value<S: AsRef<str> + Debug>(
pub async fn by_value<D: AsRef<[u8]> + Debug>(
db: DatabaseConnection,
value: S,
descriptor: D,
) -> RepoResult<Option<Self>> {
let hash = hash::Entity::find()
.filter(hash::Column::Value.eq(value.as_ref()))
let cid = content_descriptor::Entity::find()
.filter(content_descriptor::Column::Descriptor.eq(descriptor.as_ref()))
.one(&db)
.await?
.map(|model| Self::new(db, model));
Ok(hash)
Ok(cid)
}
/// Adds a new hash to the database
#[tracing::instrument(level = "debug", skip(db))]
pub async fn add(db: DatabaseConnection, value: String) -> RepoResult<Self> {
let active_model = hash::ActiveModel {
value: Set(value),
pub async fn add(db: DatabaseConnection, descriptor: Vec<u8>) -> RepoResult<Self> {
let active_model = content_descriptor::ActiveModel {
descriptor: Set(descriptor),
..Default::default()
};
let model = active_model.insert(&db).await?;
@ -59,8 +59,8 @@ impl Hash {
self.model.id
}
pub fn value(&self) -> &String {
&self.model.value
pub fn descriptor(&self) -> &[u8] {
&self.model.descriptor[..]
}
/// Returns the file associated with the hash

@ -2,43 +2,57 @@ use std::fmt::Debug;
use std::io::Cursor;
use std::str::FromStr;
use chrono::{Local, NaiveDateTime};
use mediarepo_core::content_descriptor::encode_content_descriptor;
use sea_orm::prelude::*;
use sea_orm::sea_query::{Expr, Query};
use sea_orm::{Condition, DatabaseConnection, Set};
use sea_orm::{JoinType, QuerySelect};
use tokio::io::{AsyncReadExt, BufReader};
use mediarepo_core::error::RepoResult;
use crate::file_metadata::FileMetadata;
use mediarepo_core::error::{RepoError, RepoResult};
use mediarepo_core::thumbnailer::{self, Thumbnail as ThumbnailerThumb, ThumbnailSize};
use mediarepo_database::entities::content_descriptor;
use mediarepo_database::entities::content_descriptor_tag;
use mediarepo_database::entities::file;
use mediarepo_database::entities::hash;
use mediarepo_database::entities::hash_tag;
use mediarepo_database::entities::namespace;
use mediarepo_database::entities::tag;
use crate::file_type::FileType;
use crate::storage::Storage;
use crate::tag::Tag;
pub enum FileStatus {
Imported = 10,
Archived = 20,
Deleted = 30,
}
#[derive(Clone)]
pub struct File {
db: DatabaseConnection,
model: file::Model,
hash: hash::Model,
content_descriptor: content_descriptor::Model,
}
impl File {
#[tracing::instrument(level = "trace")]
pub(crate) fn new(db: DatabaseConnection, model: file::Model, hash: hash::Model) -> Self {
Self { db, model, hash }
pub(crate) fn new(
db: DatabaseConnection,
model: file::Model,
hash: content_descriptor::Model,
) -> Self {
Self {
db,
model,
content_descriptor: hash,
}
}
/// Returns a list of all known stored files
#[tracing::instrument(level = "debug", skip(db))]
pub async fn all(db: DatabaseConnection) -> RepoResult<Vec<File>> {
let files: Vec<(file::Model, Option<hash::Model>)> = file::Entity::find()
.find_also_related(hash::Entity)
let files: Vec<(file::Model, Option<content_descriptor::Model>)> = file::Entity::find()
.find_also_related(content_descriptor::Entity)
.all(&db)
.await?;
let files = files
@ -56,7 +70,7 @@ impl File {
#[tracing::instrument(level = "debug", skip(db))]
pub async fn by_id(db: DatabaseConnection, id: i64) -> RepoResult<Option<Self>> {
if let Some((model, Some(hash))) = file::Entity::find_by_id(id)
.find_also_related(hash::Entity)
.find_also_related(content_descriptor::Entity)
.one(&db)
.await?
{
@ -69,12 +83,12 @@ impl File {
/// Finds the file by hash
#[tracing::instrument(level = "debug", skip(db))]
pub async fn by_hash<S: AsRef<str> + Debug>(
pub async fn by_cd<S: AsRef<str> + Debug>(
db: DatabaseConnection,
hash: S,
cid: S,
) -> RepoResult<Option<Self>> {
if let Some((hash, Some(model))) = hash::Entity::find()
.filter(hash::Column::Value.eq(hash.as_ref()))
if let Some((hash, Some(model))) = content_descriptor::Entity::find()
.filter(content_descriptor::Column::Descriptor.eq(cid.as_ref()))
.find_also_related(file::Entity)
.one(&db)
.await?
@ -94,12 +108,13 @@ impl File {
) -> RepoResult<Vec<Self>> {
let main_condition = build_find_filter_conditions(tag_ids);
let results: Vec<(hash::Model, Option<file::Model>)> = hash::Entity::find()
.find_also_related(file::Entity)
.filter(main_condition)
.group_by(file::Column::Id)
.all(&db)
.await?;
let results: Vec<(content_descriptor::Model, Option<file::Model>)> =
content_descriptor::Entity::find()
.find_also_related(file::Entity)
.filter(main_condition)
.group_by(file::Column::Id)
.all(&db)
.await?;
let files: Vec<Self> = results
.into_iter()
.filter_map(|(hash, tag)| Some(Self::new(db.clone(), tag?, hash)))
@ -113,20 +128,13 @@ impl File {
pub(crate) async fn add(
db: DatabaseConnection,
storage_id: i64,
hash_id: i64,
file_type: FileType,
mime_type: Option<String>,
creation_time: NaiveDateTime,
change_time: NaiveDateTime,
cd_id: i64,
mime_type: String,
) -> RepoResult<Self> {
let file = file::ActiveModel {
hash_id: Set(hash_id),
file_type: Set(file_type as u32),
cd_id: Set(cd_id),
mime_type: Set(mime_type),
storage_id: Set(storage_id),
import_time: Set(Local::now().naive_local()),
creation_time: Set(creation_time),
change_time: Set(change_time),
..Default::default()
};
let file: file::ActiveModel = file.insert(&db).await?.into();
@ -143,53 +151,41 @@ impl File {
}
/// Returns the hash of the file (content identifier)
pub fn hash(&self) -> &String {
&self.hash.value
pub fn cd(&self) -> &[u8] {
&self.content_descriptor.descriptor
}
/// Returns the hash id of the file
pub fn hash_id(&self) -> i64 {
self.hash.id
/// Returns the encoded content descriptor
pub fn encoded_cd(&self) -> String {
encode_content_descriptor(self.cd())
}
/// Returns the type of the file
pub fn file_type(&self) -> FileType {
match self.model.file_type {
1 => FileType::Image,
2 => FileType::Video,
3 => FileType::Audio,
_ => FileType::Unknown,
}
/// Returns the id of the civ (content identifier value) of the file
pub fn cd_id(&self) -> i64 {
self.content_descriptor.id
}
/// Returns the optional mime type of the file
pub fn mime_type(&self) -> &Option<String> {
/// Returns the mime type of the file
pub fn mime_type(&self) -> &String {
&self.model.mime_type
}
/// Returns the optional name of the file
pub fn name(&self) -> &Option<String> {
&self.model.name
}
/// Returns the comment of the file
pub fn comment(&self) -> &Option<String> {
&self.model.comment
}
/// Returns the import time of the file
pub fn import_time(&self) -> &NaiveDateTime {
&self.model.import_time
}
/// Returns the datetime when the file was created
pub fn creation_time(&self) -> &NaiveDateTime {
&self.model.creation_time
/// Returns the status of the file
pub fn status(&self) -> FileStatus {
match self.model.status {
10 => FileStatus::Imported,
20 => FileStatus::Archived,
30 => FileStatus::Deleted,
_ => FileStatus::Imported,
}
}
/// Returns the last time the file was changed
pub fn change_time(&self) -> &NaiveDateTime {
&self.model.change_time
/// Returns the metadata associated with this file
/// A file MUST always have metadata associated
pub async fn metadata(&self) -> RepoResult<FileMetadata> {
FileMetadata::by_id(self.db.clone(), self.model.id)
.await
.and_then(|f| f.ok_or_else(|| RepoError::from("missing file metadata")))
}
/// Returns the storage where the file is stored
@ -206,9 +202,15 @@ impl File {
pub async fn tags(&self) -> RepoResult<Vec<Tag>> {
let tags: Vec<(tag::Model, Option<namespace::Model>)> = tag::Entity::find()
.find_also_related(namespace::Entity)
.join(JoinType::LeftJoin, hash_tag::Relation::Tag.def().rev())
.join(JoinType::InnerJoin, hash_tag::Relation::Hash.def())
.filter(hash::Column::Id.eq(self.hash.id))
.join(
JoinType::LeftJoin,
content_descriptor_tag::Relation::Tag.def().rev(),
)
.join(
JoinType::InnerJoin,
content_descriptor_tag::Relation::ContentDescriptorId.def(),
)
.filter(content_descriptor::Column::Id.eq(self.content_descriptor.id))
.all(&self.db)
.await?;
let tags = tags
@ -219,45 +221,12 @@ impl File {
Ok(tags)
}
/// Changes the name of the file
#[tracing::instrument(level = "debug", skip(self))]
pub async fn set_name<S: ToString + Debug>(&mut self, name: S) -> RepoResult<()> {
let mut active_file = self.get_active_model();
active_file.name = Set(Some(name.to_string()));
let active_file = active_file.update(&self.db).await?;
self.model.name = active_file.name;
Ok(())
}
/// Changes the comment of the file
#[tracing::instrument(level = "debug", skip(self))]
pub async fn set_comment<S: ToString + Debug>(&mut self, comment: S) -> RepoResult<()> {
let mut active_file = self.get_active_model();
active_file.comment = Set(Some(comment.to_string()));
let active_file = active_file.update(&self.db).await?;
self.model.comment = active_file.comment;
Ok(())
}
/// Changes the type of the file
#[tracing::instrument(level = "debug", skip(self))]
pub async fn set_file_type(&mut self, file_type: FileType) -> RepoResult<()> {
let mut active_file = self.get_active_model();
active_file.file_type = Set(file_type as u32);
let active_file = active_file.update(&self.db).await?;
self.model.file_type = active_file.file_type;
Ok(())
}
/// Adds a single tag to the file
#[tracing::instrument(level = "debug", skip(self))]
pub async fn add_tag(&mut self, tag_id: i64) -> RepoResult<()> {
let hash_id = self.hash.id;
let active_model = hash_tag::ActiveModel {
hash_id: Set(hash_id),
let cd_id = self.content_descriptor.id;
let active_model = content_descriptor_tag::ActiveModel {
cd_id: Set(cd_id),
tag_id: Set(tag_id),
};
active_model.insert(&self.db).await?;
@ -270,15 +239,17 @@ impl File {
if tag_ids.is_empty() {
return Ok(());
}
let hash_id = self.hash.id;
let models: Vec<hash_tag::ActiveModel> = tag_ids
let cd_id = self.content_descriptor.id;
let models: Vec<content_descriptor_tag::ActiveModel> = tag_ids
.into_iter()
.map(|tag_id| hash_tag::ActiveModel {
hash_id: Set(hash_id),
.map(|tag_id| content_descriptor_tag::ActiveModel {
cd_id: Set(cd_id),
tag_id: Set(tag_id),
})
.collect();
hash_tag::Entity::insert_many(models).exec(&self.db).await?;
content_descriptor_tag::Entity::insert_many(models)
.exec(&self.db)
.await?;
Ok(())
}
@ -286,10 +257,10 @@ impl File {
/// Removes multiple tags from the file
#[tracing::instrument(level = "debug", skip(self))]
pub async fn remove_tags(&self, tag_ids: Vec<i64>) -> RepoResult<()> {
let hash_id = self.hash.id;
hash_tag::Entity::delete_many()
.filter(hash_tag::Column::HashId.eq(hash_id))
.filter(hash_tag::Column::TagId.is_in(tag_ids))
let hash_id = self.content_descriptor.id;
content_descriptor_tag::Entity::delete_many()
.filter(content_descriptor_tag::Column::CdId.eq(hash_id))
.filter(content_descriptor_tag::Column::TagId.is_in(tag_ids))
.exec(&self.db)
.await?;
@ -301,26 +272,9 @@ impl File {
pub async fn get_reader(&self) -> RepoResult<BufReader<tokio::fs::File>> {
let storage = self.storage().await?;
storage.get_file_reader(&self.hash.value).await
}
/// Retrieves the size of the file from its content
#[tracing::instrument(level = "trace", skip(self))]
pub async fn get_size(&self) -> RepoResult<u64> {
if let Some(size) = self.model.size {
Ok(size as u64)
} else {
let mut reader = self.get_reader().await?;
let size = {
let mut buf = Vec::new();
reader.read_to_end(&mut buf).await
}?;
let mut model = self.get_active_model();
model.size = Set(Some(size as i64));
model.update(&self.db).await?;
Ok(size as u64)
}
storage
.get_file_reader(&self.content_descriptor.descriptor)
.await
}
/// Creates a thumbnail for the file
@ -331,24 +285,13 @@ impl File {
) -> RepoResult<Vec<ThumbnailerThumb>> {
let mut buf = Vec::new();
self.get_reader().await?.read_to_end(&mut buf).await?;
let mime_type = self
.model
.mime_type
.clone()
.map(|mime_type| mime::Mime::from_str(&mime_type).unwrap())
.unwrap_or(mime::IMAGE_STAR);
let mime_type = self.model.mime_type.clone();
let mime_type =
mime::Mime::from_str(&mime_type).unwrap_or_else(|_| mime::APPLICATION_OCTET_STREAM);
let thumbs = thumbnailer::create_thumbnails(Cursor::new(buf), mime_type, sizes)?;
Ok(thumbs)
}
/// Returns the active model of the file with only the id set
fn get_active_model(&self) -> file::ActiveModel {
file::ActiveModel {
id: Set(self.id()),
..Default::default()
}
}
}
fn build_find_filter_conditions(tag_ids: Vec<Vec<(i64, bool)>>) -> Condition {
@ -373,21 +316,21 @@ fn build_find_filter_conditions(tag_ids: Vec<Vec<(i64, bool)>>) -> Condition {
fn add_single_filter_expression(condition: Condition, tag_id: i64, negated: bool) -> Condition {
if negated {
condition.add(
hash::Column::Id.not_in_subquery(
content_descriptor::Column::Id.not_in_subquery(
Query::select()
.expr(Expr::col(hash_tag::Column::HashId))
.from(hash_tag::Entity)
.cond_where(hash_tag::Column::TagId.eq(tag_id))
.expr(Expr::col(content_descriptor_tag::Column::CdId))
.from(content_descriptor_tag::Entity)
.cond_where(content_descriptor_tag::Column::TagId.eq(tag_id))
.to_owned(),
),
)
} else {
condition.add(
hash::Column::Id.in_subquery(
content_descriptor::Column::Id.in_subquery(
Query::select()
.expr(Expr::col(hash_tag::Column::HashId))
.from(hash_tag::Entity)
.cond_where(hash_tag::Column::TagId.eq(tag_id))
.expr(Expr::col(content_descriptor_tag::Column::CdId))
.from(content_descriptor_tag::Entity)
.cond_where(content_descriptor_tag::Column::TagId.eq(tag_id))
.to_owned(),
),
)

@ -0,0 +1,110 @@
use std::fmt::Debug;
use chrono::{Local, NaiveDateTime};
use sea_orm::prelude::*;
use sea_orm::{DatabaseConnection, Set};
use mediarepo_core::error::RepoResult;
use mediarepo_database::entities::file_metadata;
#[derive(Clone)]
pub struct FileMetadata {
db: DatabaseConnection,
model: file_metadata::Model,
}
impl FileMetadata {
#[tracing::instrument(level = "trace")]
pub(crate) fn new(db: DatabaseConnection, model: file_metadata::Model) -> Self {
Self { db, model }
}
/// Fetches the file by id
#[tracing::instrument(level = "debug", skip(db))]
pub async fn by_id(db: DatabaseConnection, id: i64) -> RepoResult<Option<Self>> {
let file_metadata = file_metadata::Entity::find_by_id(id)
.one(&db)
.await?
.map(|m| FileMetadata::new(db, m));
Ok(file_metadata)
}
/// Adds a file with its hash to the database
#[tracing::instrument(level = "debug", skip(db))]
pub(crate) async fn add(
db: DatabaseConnection,
file_id: i64,
size: i64,
creation_time: NaiveDateTime,
change_time: NaiveDateTime,
) -> RepoResult<Self> {
let file = file_metadata::ActiveModel {
file_id: Set(file_id),
size: Set(size),
import_time: Set(Local::now().naive_local()),
creation_time: Set(creation_time),
change_time: Set(change_time),
..Default::default()
};
let model = file.insert(&db).await?;
Ok(Self::new(db, model))
}
pub fn file_id(&self) -> i64 {
self.model.file_id
}
pub fn size(&self) -> i64 {
self.model.size
}
pub fn name(&self) -> &Option<String> {
&self.model.name
}
pub fn comment(&self) -> &Option<String> {
&self.model.comment
}
pub fn import_time(&self) -> &NaiveDateTime {
&self.model.import_time
}
pub fn creation_time(&self) -> &NaiveDateTime {
&self.model.creation_time
}
pub fn change_time(&self) -> &NaiveDateTime {
&self.model.change_time
}
/// Changes the name of the file
#[tracing::instrument(level = "debug", skip(self))]
pub async fn set_name<S: ToString + Debug>(&mut self, name: S) -> RepoResult<()> {
let mut active_model = self.get_active_model();
active_model.name = Set(Some(name.to_string()));
self.model = active_model.update(&self.db).await?;
Ok(())
}
/// Changes the comment of the file
#[tracing::instrument(level = "debug", skip(self))]
pub async fn set_comment<S: ToString + Debug>(&mut self, comment: S) -> RepoResult<()> {
let mut active_file = self.get_active_model();
active_file.comment = Set(Some(comment.to_string()));
self.model = active_file.update(&self.db).await?;
Ok(())
}
/// Returns the active model of the file with only the id set
fn get_active_model(&self) -> file_metadata::ActiveModel {
file_metadata::ActiveModel {
file_id: Set(self.file_id()),
..Default::default()
}
}
}

@ -1,22 +0,0 @@
use mime::Mime;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq)]
pub enum FileType {
Other = -1,
Unknown = 0,
Image = 1,
Video = 2,
Audio = 3,
}
impl From<Mime> for FileType {
fn from(mime_type: Mime) -> Self {
match mime_type.type_() {
mime::IMAGE => Self::Image,
mime::VIDEO => Self::Video,
mime::AUDIO => Self::Audio,
_ => Self::Other,
}
}
}

@ -1,7 +1,7 @@
pub mod content_descriptor;
pub mod file;
pub mod file_type;
pub mod file_metadata;
pub mod handles;
pub mod hash;
pub mod namespace;
pub mod repo;
pub mod storage;

@ -1,10 +1,11 @@
use crate::file::File;
use crate::file_type::FileType;
use crate::file_metadata::FileMetadata;
use crate::namespace::Namespace;
use crate::storage::Storage;
use crate::tag::Tag;
use crate::thumbnail::Thumbnail;
use chrono::{Local, NaiveDateTime};
use mediarepo_core::content_descriptor::encode_content_descriptor;
use mediarepo_core::error::{RepoError, RepoResult};
use mediarepo_core::fs::thumbnail_store::{Dimensions, ThumbnailStore};
use mediarepo_core::itertools::Itertools;
@ -20,7 +21,7 @@ use std::iter::FromIterator;
use std::path::PathBuf;
use std::str::FromStr;
use tokio::fs::OpenOptions;
use tokio::io::BufReader;
use tokio::io::AsyncReadExt;
#[derive(Clone)]
pub struct Repo {
@ -91,8 +92,8 @@ impl Repo {
/// Returns a file by its mapped hash
#[tracing::instrument(level = "debug", skip(self))]
pub async fn file_by_hash<S: AsRef<str> + Debug>(&self, hash: S) -> RepoResult<Option<File>> {
File::by_hash(self.db.clone(), hash).await
pub async fn file_by_cd<S: AsRef<str> + Debug>(&self, hash: S) -> RepoResult<Option<File>> {
File::by_cd(self.db.clone(), hash).await
}
/// Returns a file by id
@ -138,47 +139,40 @@ impl Repo {
change_time: NaiveDateTime,
) -> RepoResult<File> {
let storage = self.get_main_storage()?;
let file_size = content.len();
let reader = Cursor::new(content);
let hash = storage.store_entry(reader).await?;
let (mime_type, file_type) = mime_type
let mime_type = mime_type
.and_then(|m| mime::Mime::from_str(&m).ok())
.map(|m| (Some(m.to_string()), FileType::from(m)))
.unwrap_or((None, FileType::Unknown));
.unwrap_or_else(|| mime::APPLICATION_OCTET_STREAM)
.to_string();
File::add(
let file = File::add(self.db.clone(), storage.id(), hash.id(), mime_type).await?;
FileMetadata::add(
self.db.clone(),
storage.id(),
hash.id(),
file_type,
mime_type,
file.id(),
file_size as i64,
creation_time,
change_time,
)
.await
.await?;
Ok(file)
}
/// Adds a file to the database by its readable path in the file system
#[tracing::instrument(level = "debug", skip(self))]
pub async fn add_file_by_path(&self, path: PathBuf) -> RepoResult<File> {
let mime_match = mime_guess::from_path(&path).first();
let mime_type = mime_guess::from_path(&path).first().map(|m| m.to_string());
let (mime_type, file_type) = if let Some(mime) = mime_match {
(Some(mime.clone().to_string()), FileType::from(mime))
} else {
(None, FileType::Unknown)
};
let os_file = OpenOptions::new().read(true).open(&path).await?;
let reader = BufReader::new(os_file);
let mut os_file = OpenOptions::new().read(true).open(&path).await?;
let mut buf = Vec::new();
os_file.read_to_end(&mut buf).await?;
let storage = self.get_main_storage()?;
let hash = storage.store_entry(reader).await?;
File::add(
self.db.clone(),
storage.id(),
hash.id(),
file_type,
self.add_file(
mime_type,
buf,
Local::now().naive_local(),
Local::now().naive_local(),
)
@ -186,14 +180,15 @@ impl Repo {
}
/// Returns all thumbnails of a file
pub async fn get_file_thumbnails(&self, file_hash: String) -> RepoResult<Vec<Thumbnail>> {
pub async fn get_file_thumbnails(&self, file_cd: &[u8]) -> RepoResult<Vec<Thumbnail>> {
let thumb_store = self.get_thumbnail_storage()?;
let file_cd = encode_content_descriptor(file_cd);
let thumbnails = thumb_store
.get_thumbnails(&file_hash)
.get_thumbnails(&file_cd)
.await?
.into_iter()
.map(|(size, path)| Thumbnail {
file_hash: file_hash.to_owned(),
file_hash: file_cd.to_owned(),
path,
size,
mime_type: mime::IMAGE_PNG.to_string(),
@ -214,7 +209,7 @@ impl Repo {
for thumb in thumbs {
let entry = self
.store_single_thumbnail(file.hash().to_owned(), thumb_storage, height, width, thumb)
.store_single_thumbnail(file.encoded_cd(), thumb_storage, height, width, thumb)
.await?;
created_thumbs.push(entry);
}
@ -236,7 +231,7 @@ impl Repo {
.pop()
.ok_or_else(|| RepoError::from("Failed to create thumbnail"))?;
let thumbnail = self
.store_single_thumbnail(file.hash().to_owned(), thumb_storage, height, width, thumb)
.store_single_thumbnail(file.encoded_cd(), thumb_storage, height, width, thumb)
.await?;
Ok(thumbnail)
@ -288,7 +283,10 @@ impl Repo {
/// Finds all tags that are assigned to the given list of hashes
#[tracing::instrument(level = "debug", skip_all)]
pub async fn find_tags_for_hashes(&self, hashes: Vec<String>) -> RepoResult<Vec<Tag>> {
pub async fn find_tags_for_file_identifiers(
&self,
hashes: Vec<Vec<u8>>,
) -> RepoResult<Vec<Tag>> {
Tag::for_hash_list(self.db.clone(), hashes).await
}

@ -1,4 +1,4 @@
use crate::hash::Hash;
use crate::content_descriptor::Hash;
use mediarepo_core::error::RepoResult;
use mediarepo_core::fs::file_hash_store::FileHashStore;
use mediarepo_database::entities::storage;
@ -172,11 +172,11 @@ impl Storage {
/// Returns the buf reader to the given hash
#[tracing::instrument(level = "debug", skip(self))]
pub async fn get_file_reader<S: ToString + Debug>(
pub async fn get_file_reader(
&self,
hash: S,
descriptor: &[u8],
) -> RepoResult<BufReader<tokio::fs::File>> {
let (_ext, reader) = self.store.get_file(hash.to_string()).await?;
let (_ext, reader) = self.store.get_file(descriptor).await?;
Ok(reader)
}

@ -1,8 +1,8 @@
use std::fmt::Debug;
use mediarepo_core::error::RepoResult;
use mediarepo_database::entities::hash;
use mediarepo_database::entities::hash_tag;
use mediarepo_database::entities::content_descriptor;
use mediarepo_database::entities::content_descriptor_tag;
use mediarepo_database::entities::namespace;
use mediarepo_database::entities::tag;
use sea_orm::prelude::*;
@ -120,13 +120,19 @@ impl Tag {
#[tracing::instrument(level = "debug", skip_all)]
pub async fn for_hash_list(
db: DatabaseConnection,
hashes: Vec<String>,
hashes: Vec<Vec<u8>>,
) -> RepoResult<Vec<Self>> {
let tags: Vec<Self> = tag::Entity::find()
.find_also_related(namespace::Entity)
.join(JoinType::LeftJoin, hash_tag::Relation::Tag.def().rev())
.join(JoinType::InnerJoin, hash_tag::Relation::Hash.def())
.filter(hash::Column::Value.is_in(hashes))
.join(
JoinType::LeftJoin,
content_descriptor_tag::Relation::Tag.def().rev(),
)
.join(
JoinType::InnerJoin,
content_descriptor_tag::Relation::ContentDescriptorId.def(),
)
.filter(content_descriptor::Column::Descriptor.is_in(hashes))
.group_by(tag::Column::Id)
.all(&db)
.await?

@ -11,6 +11,7 @@ serde = "^1.0.130"
tracing = "^0.1.29"
compare = "^0.1.0"
port_check = "^0.1.5"
rayon = "1.5.1"
[dependencies.mediarepo-core]
path = "../mediarepo-core"

@ -1,8 +1,9 @@
use mediarepo_core::mediarepo_api::types::files::{
FileMetadataResponse, ThumbnailMetadataResponse,
FileBasicDataResponse, FileMetadataResponse, FileStatus, ThumbnailMetadataResponse,
};
use mediarepo_core::mediarepo_api::types::tags::{NamespaceResponse, TagResponse};
use mediarepo_model::file::File;
use mediarepo_model::file::{File, FileStatus as FileStatusModel};
use mediarepo_model::file_metadata::FileMetadata;
use mediarepo_model::namespace::Namespace;
use mediarepo_model::tag::Tag;
use mediarepo_model::thumbnail::Thumbnail;
@ -11,18 +12,37 @@ pub trait FromModel<M> {
fn from_model(model: M) -> Self;
}
impl FromModel<File> for FileMetadataResponse {
fn from_model(file: File) -> Self {
impl FromModel<FileMetadata> for FileMetadataResponse {
fn from_model(metadata: FileMetadata) -> Self {
Self {
file_id: metadata.file_id(),
name: metadata.name().to_owned(),
comment: metadata.comment().to_owned(),
file_type: 0,
creation_time: metadata.creation_time().to_owned(),
change_time: metadata.change_time().to_owned(),
import_time: metadata.import_time().to_owned(),
}
}
}
impl FromModel<File> for FileBasicDataResponse {
fn from_model(file: File) -> Self {
FileBasicDataResponse {
id: file.id(),
name: file.name().to_owned(),
comment: file.comment().to_owned(),
hash: file.hash().to_owned(),
file_type: file.file_type() as u32,
status: FileStatus::from_model(file.status()),
cid: file.encoded_cd(),
mime_type: file.mime_type().to_owned(),
creation_time: file.creation_time().to_owned(),
change_time: file.change_time().to_owned(),
import_time: file.import_time().to_owned(),
}
}
}
impl FromModel<FileStatusModel> for FileStatus {
fn from_model(status: FileStatusModel) -> Self {
match status {
FileStatusModel::Imported => FileStatus::Imported,
FileStatusModel::Archived => FileStatus::Archived,
FileStatusModel::Deleted => FileStatus::Deleted,
}
}
}

@ -1,20 +1,20 @@
use crate::from_model::FromModel;
use crate::utils::{file_by_identifier, get_repo_from_context, hash_by_identifier};
use crate::utils::{cd_by_identifier, file_by_identifier, get_repo_from_context};
use chrono::NaiveDateTime;
use compare::Compare;
use mediarepo_core::bromine::prelude::*;
use mediarepo_core::fs::thumbnail_store::Dimensions;
use mediarepo_core::itertools::Itertools;
use mediarepo_core::mediarepo_api::types::files::{
AddFileRequestHeader, FileMetadataResponse, FilterExpression, FindFilesRequest,
GetFileThumbnailOfSizeRequest, GetFileThumbnailsRequest, ReadFileRequest, SortDirection,
SortKey, ThumbnailMetadataResponse, UpdateFileNameRequest,
AddFileRequestHeader, FileBasicDataResponse, FileMetadataResponse, FilterExpression,
FindFilesRequest, GetFileThumbnailOfSizeRequest, GetFileThumbnailsRequest, ReadFileRequest,
SortDirection, SortKey, ThumbnailMetadataResponse, UpdateFileNameRequest,
};
use mediarepo_core::mediarepo_api::types::identifier::FileIdentifier;
use mediarepo_core::thumbnailer::ThumbnailSize;
use mediarepo_core::utils::parse_namespace_and_tag;
use mediarepo_database::queries::tags::{
get_hashes_with_namespaced_tags, get_hashes_with_tag_count,
get_cids_with_namespaced_tags, get_content_descriptors_with_tag_count,
};
use std::cmp::Ordering;
use std::collections::HashMap;
@ -24,7 +24,7 @@ pub struct FilesNamespace;
pub struct FileSortContext {
name: Option<String>,
size: u64,
mime_type: Option<String>,
mime_type: String,
namespaces: HashMap<String, Vec<String>>,
tag_count: u32,
import_time: NaiveDateTime,
@ -41,6 +41,7 @@ impl NamespaceProvider for FilesNamespace {
events!(handler,
"all_files" => Self::all_files,
"get_file" => Self::get_file,
"get_file_metadata" => Self::get_file_metadata,
"get_files" => Self::get_files,
"find_files" => Self::find_files,
"add_file" => Self::add_file,
@ -60,9 +61,9 @@ impl FilesNamespace {
let repo = get_repo_from_context(ctx).await;
let files = repo.files().await?;
let responses: Vec<FileMetadataResponse> = files
let responses: Vec<FileBasicDataResponse> = files
.into_iter()
.map(FileMetadataResponse::from_model)
.map(FileBasicDataResponse::from_model)
.collect();
ctx.emit_to(Self::name(), "all_files", responses).await?;
@ -76,12 +77,29 @@ impl FilesNamespace {
let id = event.payload::<FileIdentifier>()?;
let repo = get_repo_from_context(ctx).await;
let file = file_by_identifier(id, &repo).await?;
let response = FileMetadataResponse::from_model(file);
let response = FileBasicDataResponse::from_model(file);
ctx.emit_to(Self::name(), "get_file", response).await?;
Ok(())
}
/// Returns metadata for a given file
#[tracing::instrument(skip_all)]
async fn get_file_metadata(ctx: &Context, event: Event) -> IPCResult<()> {
let id = event.payload::<FileIdentifier>()?;
let repo = get_repo_from_context(ctx).await;
let file = file_by_identifier(id, &repo).await?;
let metadata = file.metadata().await?;
ctx.emit_to(
Self::name(),
"get_file_metadata",
FileMetadataResponse::from_model(metadata),
)
.await?;
Ok(())
}
/// Returns a list of files by identifier
#[tracing::instrument(skip_all)]
async fn get_files(ctx: &Context, event: Event) -> IPCResult<()> {
@ -93,7 +111,7 @@ impl FilesNamespace {
responses.push(
file_by_identifier(id, &repo)
.await
.map(FileMetadataResponse::from_model)?,
.map(FileBasicDataResponse::from_model)?,
);
}
ctx.emit_to(Self::name(), "get_files", responses).await?;
@ -121,26 +139,28 @@ impl FilesNamespace {
.collect();
let mut files = repo.find_files_by_tags(tags).await?;
let hash_ids: Vec<i64> = files.iter().map(|f| f.hash_id()).collect();
let hash_ids: Vec<i64> = files.iter().map(|f| f.cd_id()).collect();
let mut hash_nsp: HashMap<i64, HashMap<String, Vec<String>>> =
get_hashes_with_namespaced_tags(repo.db(), hash_ids.clone()).await?;
let mut hash_tag_counts = get_hashes_with_tag_count(repo.db(), hash_ids).await?;
let mut cid_nsp: HashMap<i64, HashMap<String, Vec<String>>> =
get_cids_with_namespaced_tags(repo.db(), hash_ids.clone()).await?;
let mut cid_tag_counts =
get_content_descriptors_with_tag_count(repo.db(), hash_ids).await?;
let mut contexts = HashMap::new();
for file in &files {
let metadata = file.metadata().await?;
let context = FileSortContext {
name: file.name().to_owned(),
size: file.get_size().await?,
name: metadata.name().to_owned(),
size: metadata.size() as u64,
mime_type: file.mime_type().to_owned(),
namespaces: hash_nsp
.remove(&file.hash_id())
namespaces: cid_nsp
.remove(&file.cd_id())
.unwrap_or(HashMap::with_capacity(0)),
tag_count: hash_tag_counts.remove(&file.hash_id()).unwrap_or(0),
import_time: file.import_time().to_owned(),
create_time: file.import_time().to_owned(),
change_time: file.change_time().to_owned(),
tag_count: cid_tag_counts.remove(&file.cd_id()).unwrap_or(0),
import_time: metadata.import_time().to_owned(),
create_time: metadata.import_time().to_owned(),
change_time: metadata.change_time().to_owned(),
};
contexts.insert(file.id(), context);
}
@ -155,9 +175,9 @@ impl FilesNamespace {
)
});
let responses: Vec<FileMetadataResponse> = files
let responses: Vec<FileBasicDataResponse> = files
.into_iter()
.map(FileMetadataResponse::from_model)
.map(FileBasicDataResponse::from_model)
.collect();
ctx.emit_to(Self::name(), "find_files", responses).await?;
Ok(())
@ -172,7 +192,7 @@ impl FilesNamespace {
let AddFileRequestHeader { metadata, tags } = request;
let repo = get_repo_from_context(ctx).await;
let mut file = repo
let file = repo
.add_file(
metadata.mime_type,
bytes.into_inner(),
@ -180,7 +200,7 @@ impl FilesNamespace {
metadata.change_time,
)
.await?;
file.set_name(metadata.name).await?;
file.metadata().await?.set_name(metadata.name).await?;
let tags = repo
.add_all_tags(tags.into_iter().map(parse_namespace_and_tag).collect())
@ -191,7 +211,7 @@ impl FilesNamespace {
ctx.emit_to(
Self::name(),
"add_file",
FileMetadataResponse::from_model(file),
FileBasicDataResponse::from_model(file),
)
.await?;
@ -220,8 +240,8 @@ impl FilesNamespace {
async fn thumbnails(ctx: &Context, event: Event) -> IPCResult<()> {
let request = event.payload::<GetFileThumbnailsRequest>()?;
let repo = get_repo_from_context(ctx).await;
let file_hash = hash_by_identifier(request.id.clone(), &repo).await?;
let mut thumbnails = repo.get_file_thumbnails(file_hash).await?;
let file_cd = cd_by_identifier(request.id.clone(), &repo).await?;
let mut thumbnails = repo.get_file_thumbnails(&file_cd).await?;
if thumbnails.is_empty() {
tracing::debug!("No thumbnails for file found. Creating thumbnails...");
@ -245,8 +265,8 @@ impl FilesNamespace {
async fn get_thumbnail_of_size(ctx: &Context, event: Event) -> IPCResult<()> {
let request = event.payload::<GetFileThumbnailOfSizeRequest>()?;
let repo = get_repo_from_context(ctx).await;
let file_hash = hash_by_identifier(request.id.clone(), &repo).await?;
let thumbnails = repo.get_file_thumbnails(file_hash).await?;
let file_cd = cd_by_identifier(request.id.clone(), &repo).await?;
let thumbnails = repo.get_file_thumbnails(&file_cd).await?;
let min_size = request.min_size;
let max_size = request.max_size;
@ -289,12 +309,14 @@ impl FilesNamespace {
async fn update_file_name(ctx: &Context, event: Event) -> IPCResult<()> {
let repo = get_repo_from_context(ctx).await;
let request = event.payload::<UpdateFileNameRequest>()?;
let mut file = file_by_identifier(request.file_id, &repo).await?;
file.set_name(request.name).await?;
let file = file_by_identifier(request.file_id, &repo).await?;
let mut metadata = file.metadata().await?;
metadata.set_name(request.name).await?;
ctx.emit_to(
Self::name(),
"update_file_name",
FileMetadataResponse::from_model(file),
FileMetadataResponse::from_model(metadata),
)
.await?;
@ -307,7 +329,7 @@ impl FilesNamespace {
let repo = get_repo_from_context(ctx).await;
let id = event.payload::<FileIdentifier>()?;
let file = file_by_identifier(id, &repo).await?;
let thumbnails = repo.get_file_thumbnails(file.hash().to_owned()).await?;
let thumbnails = repo.get_file_thumbnails(file.cd()).await?;
for thumb in thumbnails {
thumb.delete().await?;
@ -363,7 +385,7 @@ fn compare_files(
direction,
),
SortKey::FileType(direction) => {
adjust_for_dir(compare_opts(&ctx_a.mime_type, &ctx_b.mime_type), direction)
adjust_for_dir(ctx_a.mime_type.cmp(&ctx_b.mime_type), direction)
}
SortKey::NumTags(direction) => adjust_for_dir(
cmp_u32.compare(&ctx_a.tag_count, &ctx_b.tag_count),

@ -40,7 +40,7 @@ impl RepoNamespace {
tag_count: counts.tag_count as u64,
namespace_count: counts.namespace_count as u64,
mapping_count: counts.mapping_count as u64,
hash_count: counts.hash_count as u64,
hash_count: counts.cd_count as u64,
};
tracing::debug!("metadata = {:?}", metadata);

@ -1,10 +1,12 @@
use crate::from_model::FromModel;
use crate::utils::{file_by_identifier, get_repo_from_context};
use mediarepo_core::bromine::prelude::*;
use mediarepo_core::content_descriptor::decode_content_descriptor;
use mediarepo_core::mediarepo_api::types::files::{GetFileTagsRequest, GetFilesTagsRequest};
use mediarepo_core::mediarepo_api::types::tags::{
ChangeFileTagsRequest, NamespaceResponse, TagResponse,
};
use rayon::iter::{IntoParallelIterator, ParallelIterator};
pub struct TagsNamespace;
@ -78,7 +80,13 @@ impl TagsNamespace {
let repo = get_repo_from_context(ctx).await;
let request = event.payload::<GetFilesTagsRequest>()?;
let tag_responses: Vec<TagResponse> = repo
.find_tags_for_hashes(request.hashes)
.find_tags_for_file_identifiers(
request
.cds
.into_par_iter()
.filter_map(|c| decode_content_descriptor(c).ok())
.collect(),
)
.await?
.into_iter()
.map(TagResponse::from_model)

@ -1,4 +1,5 @@
use mediarepo_core::bromine::ipc::context::Context;
use mediarepo_core::content_descriptor::decode_content_descriptor;
use mediarepo_core::error::{RepoError, RepoResult};
use mediarepo_core::mediarepo_api::types::identifier::FileIdentifier;
use mediarepo_model::file::File;
@ -15,20 +16,20 @@ pub async fn get_repo_from_context(ctx: &Context) -> Arc<Repo> {
pub async fn file_by_identifier(identifier: FileIdentifier, repo: &Repo) -> RepoResult<File> {
let file = match identifier {
FileIdentifier::ID(id) => repo.file_by_id(id).await,
FileIdentifier::Hash(hash) => repo.file_by_hash(hash).await,
FileIdentifier::CD(cd) => repo.file_by_cd(cd).await,
}?;
file.ok_or_else(|| RepoError::from("Thumbnail not found"))
}
pub async fn hash_by_identifier(identifier: FileIdentifier, repo: &Repo) -> RepoResult<String> {
pub async fn cd_by_identifier(identifier: FileIdentifier, repo: &Repo) -> RepoResult<Vec<u8>> {
match identifier {
FileIdentifier::ID(id) => {
let file = repo
.file_by_id(id)
.await?
.ok_or_else(|| "Thumbnail not found")?;
Ok(file.hash().to_owned())
Ok(file.cd().to_owned())
}
FileIdentifier::Hash(hash) => Ok(hash),
FileIdentifier::CD(cd) => decode_content_descriptor(cd),
}
}

@ -291,7 +291,7 @@ async fn add_tags_from_tags_file(
#[tracing::instrument(skip(repo, file))]
async fn create_file_thumbnails(repo: &Repo, file: File) -> RepoResult<()> {
let file_thumbnails = repo.get_file_thumbnails(file.hash().to_owned()).await?;
let file_thumbnails = repo.get_file_thumbnails(file.cd()).await?;
if file_thumbnails.is_empty() {
repo.create_thumbnails_for_file(&file).await?;

Loading…
Cancel
Save