Add file dao and tag dao
Signed-off-by: trivernis <trivernis@protonmail.com>pull/5/head
parent
a6da2b9e1e
commit
cb493b4651
@ -0,0 +1,71 @@
|
||||
use crate::dto::{AddFileDto, FileDto};
|
||||
use chrono::{Local, NaiveDateTime};
|
||||
use mediarepo_core::error::RepoResult;
|
||||
use mediarepo_database::entities::{content_descriptor, file, file_metadata};
|
||||
use sea_orm::ActiveValue::Set;
|
||||
use sea_orm::{ActiveModelTrait, ConnectionTrait, DatabaseTransaction};
|
||||
use std::io::Cursor;
|
||||
|
||||
use crate::dao::file::FileDao;
|
||||
|
||||
impl FileDao {
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn add(&self, add_dto: AddFileDto) -> RepoResult<FileDto> {
|
||||
let trx = self.ctx.db.begin().await?;
|
||||
let file_size = add_dto.content.len();
|
||||
let cd_bin = self
|
||||
.ctx
|
||||
.main_storage
|
||||
.add_file(Cursor::new(add_dto.content), None)
|
||||
.await?;
|
||||
let cd_model = content_descriptor::ActiveModel {
|
||||
descriptor: Set(cd_bin),
|
||||
..Default::default()
|
||||
};
|
||||
let cd = cd_model.insert(&trx).await?;
|
||||
|
||||
let model = file::ActiveModel {
|
||||
cd_id: Set(cd.id),
|
||||
mime_type: Set(add_dto.mime_type),
|
||||
..Default::default()
|
||||
};
|
||||
let file: file::Model = model.insert(&trx).await?;
|
||||
|
||||
let metadata = add_file_metadata(
|
||||
&trx,
|
||||
file.id,
|
||||
file_size as i64,
|
||||
add_dto.creation_time,
|
||||
add_dto.change_time,
|
||||
add_dto.name,
|
||||
)
|
||||
.await?;
|
||||
|
||||
trx.commit().await?;
|
||||
|
||||
Ok(FileDto::new(file, cd, Some(metadata)))
|
||||
}
|
||||
}
|
||||
|
||||
async fn add_file_metadata(
|
||||
trx: &DatabaseTransaction,
|
||||
file_id: i64,
|
||||
size: i64,
|
||||
creation_time: NaiveDateTime,
|
||||
change_time: NaiveDateTime,
|
||||
name: Option<String>,
|
||||
) -> RepoResult<file_metadata::Model> {
|
||||
let metadata_model = file_metadata::ActiveModel {
|
||||
file_id: Set(file_id),
|
||||
size: Set(size),
|
||||
import_time: Set(Local::now().naive_local()),
|
||||
creation_time: Set(creation_time),
|
||||
change_time: Set(change_time),
|
||||
name: Set(name),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let metadata = metadata_model.insert(trx).await?;
|
||||
|
||||
Ok(metadata)
|
||||
}
|
@ -0,0 +1,41 @@
|
||||
use crate::dao::file::{map_file_and_cd, FileDao};
|
||||
use crate::dto::FileDto;
|
||||
use mediarepo_core::error::{RepoError, RepoResult};
|
||||
use mediarepo_database::entities::{
|
||||
content_descriptor, content_descriptor_tag, file, file_metadata,
|
||||
};
|
||||
use sea_orm::prelude::*;
|
||||
use sea_orm::ConnectionTrait;
|
||||
|
||||
impl FileDao {
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn delete(&self, file: FileDto) -> RepoResult<()> {
|
||||
let trx = self.ctx.db.begin().await?;
|
||||
|
||||
file_metadata::Entity::delete_many()
|
||||
.filter(file_metadata::Column::FileId.eq(file.id()))
|
||||
.exec(&trx)
|
||||
.await?;
|
||||
file::Entity::delete_many()
|
||||
.filter(file::Column::Id.eq(file.id()))
|
||||
.exec(&trx)
|
||||
.await?;
|
||||
content_descriptor_tag::Entity::delete_many()
|
||||
.filter(content_descriptor_tag::Column::CdId.eq(file.cd_id()))
|
||||
.exec(&trx)
|
||||
.await?;
|
||||
content_descriptor::Entity::delete_many()
|
||||
.filter(content_descriptor::Column::Id.eq(file.cd_id()))
|
||||
.exec(&trx)
|
||||
.await?;
|
||||
|
||||
self.ctx
|
||||
.thumbnail_storage
|
||||
.delete_parent(&file.encoded_cd())
|
||||
.await?;
|
||||
self.ctx.main_storage.delete_file(file.cd()).await?;
|
||||
trx.commit().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
@ -0,0 +1,149 @@
|
||||
use crate::dao::{DaoContext, DaoProvider};
|
||||
use crate::dto::{FileDto, FileMetadataDto, ThumbnailDto};
|
||||
use mediarepo_core::error::RepoResult;
|
||||
use mediarepo_database::entities::{content_descriptor, file, file_metadata};
|
||||
use sea_orm::prelude::*;
|
||||
use tokio::io::AsyncReadExt;
|
||||
|
||||
pub mod add;
|
||||
pub mod delete;
|
||||
pub mod find;
|
||||
pub mod update;
|
||||
|
||||
pub struct FileDao {
|
||||
ctx: DaoContext,
|
||||
}
|
||||
|
||||
impl DaoProvider for FileDao {
|
||||
fn dao_ctx(&self) -> DaoContext {
|
||||
self.ctx.clone()
|
||||
}
|
||||
}
|
||||
|
||||
impl FileDao {
|
||||
pub fn new(ctx: DaoContext) -> Self {
|
||||
Self { ctx }
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn all(&self) -> RepoResult<Vec<FileDto>> {
|
||||
let files = file::Entity::find()
|
||||
.find_also_related(content_descriptor::Entity)
|
||||
.all(&self.ctx.db)
|
||||
.await?
|
||||
.into_iter()
|
||||
.filter_map(map_file_and_cd)
|
||||
.collect();
|
||||
|
||||
Ok(files)
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
#[inline]
|
||||
pub async fn by_id(&self, id: i64) -> RepoResult<Option<FileDto>> {
|
||||
self.all_by_id(vec![id]).await.map(|f| f.into_iter().next())
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
#[inline]
|
||||
pub async fn by_cd(&self, cd: Vec<u8>) -> RepoResult<Option<FileDto>> {
|
||||
self.all_by_cd(vec![cd]).await.map(|f| f.into_iter().next())
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn all_by_cd(&self, cds: Vec<Vec<u8>>) -> RepoResult<Vec<FileDto>> {
|
||||
if cds.is_empty() {
|
||||
return Ok(vec![]);
|
||||
}
|
||||
|
||||
let files = file::Entity::find()
|
||||
.find_also_related(content_descriptor::Entity)
|
||||
.filter(content_descriptor::Column::Descriptor.is_in(cds))
|
||||
.all(&self.ctx.db)
|
||||
.await?
|
||||
.into_iter()
|
||||
.filter_map(map_file_and_cd)
|
||||
.collect();
|
||||
|
||||
Ok(files)
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn all_by_id(&self, ids: Vec<i64>) -> RepoResult<Vec<FileDto>> {
|
||||
if ids.is_empty() {
|
||||
return Ok(vec![]);
|
||||
}
|
||||
|
||||
let files = file::Entity::find()
|
||||
.find_also_related(content_descriptor::Entity)
|
||||
.filter(file::Column::Id.is_in(ids))
|
||||
.all(&self.ctx.db)
|
||||
.await?
|
||||
.into_iter()
|
||||
.filter_map(map_file_and_cd)
|
||||
.collect();
|
||||
|
||||
Ok(files)
|
||||
}
|
||||
|
||||
pub async fn metadata(&self, file_id: i64) -> RepoResult<Option<FileMetadataDto>> {
|
||||
self.all_metadata(vec![file_id])
|
||||
.await
|
||||
.map(|m| m.into_iter().next())
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn all_metadata(&self, file_ids: Vec<i64>) -> RepoResult<Vec<FileMetadataDto>> {
|
||||
if file_ids.is_empty() {
|
||||
return Ok(vec![]);
|
||||
}
|
||||
|
||||
let metadata = file_metadata::Entity::find()
|
||||
.filter(file_metadata::Column::FileId.is_in(file_ids))
|
||||
.all(&self.ctx.db)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|m| FileMetadataDto::new(m))
|
||||
.collect();
|
||||
|
||||
Ok(metadata)
|
||||
}
|
||||
|
||||
/// Returns all thumbnails for a cd
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn thumbnails(&self, encoded_cd: String) -> RepoResult<Vec<ThumbnailDto>> {
|
||||
let thumbnails = self
|
||||
.ctx
|
||||
.thumbnail_storage
|
||||
.get_thumbnails(&encoded_cd)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|(size, path)| {
|
||||
ThumbnailDto::new(path, encoded_cd.clone(), size, String::from("image/png"))
|
||||
})
|
||||
.collect();
|
||||
|
||||
Ok(thumbnails)
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn get_bytes(&self, cd: &[u8]) -> RepoResult<Vec<u8>> {
|
||||
let mut buf = Vec::new();
|
||||
let mut reader = self.ctx.main_storage.get_file(cd).await?.1;
|
||||
reader.read_to_end(&mut buf).await?;
|
||||
|
||||
Ok(buf)
|
||||
}
|
||||
}
|
||||
|
||||
fn map_file_and_cd(
|
||||
(file, cd): (file::Model, Option<content_descriptor::Model>),
|
||||
) -> Option<FileDto> {
|
||||
cd.map(|c| FileDto::new(file, c, None))
|
||||
}
|
||||
|
||||
fn map_cd_and_file(
|
||||
(cd, file): (content_descriptor::Model, Option<file::Model>),
|
||||
) -> Option<FileDto> {
|
||||
file.map(|f| FileDto::new(f, cd, None))
|
||||
}
|
@ -0,0 +1,93 @@
|
||||
use crate::dto::{FileDto, FileMetadataDto, ThumbnailDto, UpdateFileDto, UpdateFileMetadataDto};
|
||||
use mediarepo_core::error::{RepoError, RepoResult};
|
||||
use mediarepo_core::fs::thumbnail_store::Dimensions;
|
||||
use mediarepo_core::thumbnailer;
|
||||
use mediarepo_core::thumbnailer::{Thumbnail, ThumbnailSize};
|
||||
use mediarepo_database::entities::{content_descriptor, file, file_metadata};
|
||||
use sea_orm::prelude::*;
|
||||
use sea_orm::ActiveValue::{Set, Unchanged};
|
||||
use sea_orm::{ConnectionTrait, NotSet};
|
||||
use std::fmt::Debug;
|
||||
use std::io::Cursor;
|
||||
use std::str::FromStr;
|
||||
|
||||
use crate::dao::file::FileDao;
|
||||
use crate::dao::opt_to_active_val;
|
||||
|
||||
impl FileDao {
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn update(&self, update_dto: UpdateFileDto) -> RepoResult<FileDto> {
|
||||
let trx = self.ctx.db.begin().await?;
|
||||
let model = file::ActiveModel {
|
||||
id: Set(update_dto.id),
|
||||
cd_id: update_dto.cd_id.map(|v| Set(v)).unwrap_or(NotSet),
|
||||
mime_type: update_dto.mime_type.map(|v| Set(v)).unwrap_or(NotSet),
|
||||
status: update_dto.status.map(|v| Set(v as i32)).unwrap_or(NotSet),
|
||||
};
|
||||
let file_model = model.update(&trx).await?;
|
||||
let cd = file_model
|
||||
.find_related(content_descriptor::Entity)
|
||||
.one(&trx)
|
||||
.await?
|
||||
.ok_or_else(|| RepoError::from("Content descriptor not found"))?;
|
||||
trx.commit().await?;
|
||||
|
||||
Ok(FileDto::new(file_model, cd, None))
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn update_metadata(
|
||||
&self,
|
||||
update_dto: UpdateFileMetadataDto,
|
||||
) -> RepoResult<FileMetadataDto> {
|
||||
let model = file_metadata::ActiveModel {
|
||||
file_id: Unchanged(update_dto.file_id),
|
||||
name: opt_to_active_val(update_dto.name),
|
||||
comment: opt_to_active_val(update_dto.comment),
|
||||
size: opt_to_active_val(update_dto.size),
|
||||
change_time: opt_to_active_val(update_dto.change_time),
|
||||
..Default::default()
|
||||
};
|
||||
let metadata = model.update(&self.ctx.db).await?;
|
||||
|
||||
Ok(FileMetadataDto::new(metadata))
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn create_thumbnails<I: IntoIterator<Item = ThumbnailSize> + Debug>(
|
||||
&self,
|
||||
file: FileDto,
|
||||
sizes: I,
|
||||
) -> RepoResult<Vec<ThumbnailDto>> {
|
||||
let bytes = self.get_bytes(file.cd()).await?;
|
||||
let mime_type = mime::Mime::from_str(file.mime_type())
|
||||
.unwrap_or_else(|_| mime::APPLICATION_OCTET_STREAM);
|
||||
let thumbnails =
|
||||
thumbnailer::create_thumbnails(Cursor::new(bytes), mime_type.clone(), sizes)?;
|
||||
let mut dtos = Vec::new();
|
||||
|
||||
for thumbnail in thumbnails {
|
||||
let mut buf = Vec::new();
|
||||
let size = thumbnail.size();
|
||||
let size = Dimensions {
|
||||
height: size.1,
|
||||
width: size.0,
|
||||
};
|
||||
thumbnail.write_png(&mut buf)?;
|
||||
|
||||
let path = self
|
||||
.ctx
|
||||
.thumbnail_storage
|
||||
.add_thumbnail(file.encoded_cd(), size.clone(), &buf)
|
||||
.await?;
|
||||
dtos.push(ThumbnailDto::new(
|
||||
path,
|
||||
file.encoded_cd(),
|
||||
size,
|
||||
mime_type.to_string(),
|
||||
))
|
||||
}
|
||||
|
||||
Ok(dtos)
|
||||
}
|
||||
}
|
@ -0,0 +1,33 @@
|
||||
pub mod file;
|
||||
pub mod repo;
|
||||
pub mod tag;
|
||||
|
||||
use crate::dao::file::FileDao;
|
||||
use crate::dao::tag::TagDao;
|
||||
use mediarepo_core::fs::file_hash_store::FileHashStore;
|
||||
use mediarepo_core::fs::thumbnail_store::ThumbnailStore;
|
||||
use sea_orm::{ActiveValue, DatabaseConnection};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct DaoContext {
|
||||
pub db: DatabaseConnection,
|
||||
pub main_storage: FileHashStore,
|
||||
pub thumbnail_storage: ThumbnailStore,
|
||||
}
|
||||
|
||||
pub trait DaoProvider {
|
||||
fn dao_ctx(&self) -> DaoContext;
|
||||
|
||||
fn file(&self) -> FileDao {
|
||||
FileDao::new(self.dao_ctx())
|
||||
}
|
||||
|
||||
fn tag(&self) -> TagDao {
|
||||
TagDao::new(self.dao_ctx())
|
||||
}
|
||||
}
|
||||
|
||||
fn opt_to_active_val<T: Into<sea_orm::Value>>(opt: Option<T>) -> ActiveValue<T> {
|
||||
opt.map(|v| ActiveValue::Set(v))
|
||||
.unwrap_or(ActiveValue::NotSet)
|
||||
}
|
@ -0,0 +1,64 @@
|
||||
use crate::dao::tag::TagDao;
|
||||
use mediarepo_core::error::RepoResult;
|
||||
use mediarepo_database::entities::content_descriptor_tag;
|
||||
use sea_orm::prelude::*;
|
||||
use sea_orm::ActiveValue::Set;
|
||||
use sea_orm::{ConnectionTrait, DatabaseTransaction};
|
||||
|
||||
impl TagDao {
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn upsert_mappings(&self, cd_ids: Vec<i64>, tag_ids: Vec<i64>) -> RepoResult<()> {
|
||||
let trx = self.ctx.db.begin().await?;
|
||||
|
||||
let existing_mappings = get_existing_mappings(&trx, &cd_ids, &tag_ids).await?;
|
||||
|
||||
let active_models: Vec<content_descriptor_tag::ActiveModel> = cd_ids
|
||||
.into_iter()
|
||||
.flat_map(|cd_id: i64| {
|
||||
tag_ids
|
||||
.iter()
|
||||
.filter(|tag_id| !existing_mappings.contains(&(cd_id, **tag_id)))
|
||||
.map(move |tag_id| content_descriptor_tag::ActiveModel {
|
||||
cd_id: Set(cd_id),
|
||||
tag_id: Set(*tag_id),
|
||||
})
|
||||
.collect::<Vec<content_descriptor_tag::ActiveModel>>()
|
||||
})
|
||||
.collect();
|
||||
|
||||
content_descriptor_tag::Entity::insert_many(active_models)
|
||||
.exec(&trx)
|
||||
.await?;
|
||||
|
||||
trx.commit().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn remove_mappings(&self, cd_ids: Vec<i64>, tag_ids: Vec<i64>) -> RepoResult<()> {
|
||||
content_descriptor_tag::Entity::delete_many()
|
||||
.filter(content_descriptor_tag::Column::CdId.is_in(cd_ids))
|
||||
.filter(content_descriptor_tag::Column::TagId.is_in(tag_ids))
|
||||
.exec(&self.ctx.db)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_existing_mappings(
|
||||
trx: &DatabaseTransaction,
|
||||
cd_ids: &Vec<i64>,
|
||||
tag_ids: &Vec<i64>,
|
||||
) -> RepoResult<Vec<(i64, i64)>> {
|
||||
let existing_mappings: Vec<(i64, i64)> = content_descriptor_tag::Entity::find()
|
||||
.filter(content_descriptor_tag::Column::CdId.is_in(cd_ids.clone()))
|
||||
.filter(content_descriptor_tag::Column::TagId.is_in(tag_ids.clone()))
|
||||
.all(trx)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|model: content_descriptor_tag::Model| (model.tag_id, model.cd_id))
|
||||
.collect();
|
||||
Ok(existing_mappings)
|
||||
}
|
@ -0,0 +1,47 @@
|
||||
pub mod mappings;
|
||||
|
||||
use crate::dao::{DaoContext, DaoProvider};
|
||||
use crate::dto::TagDto;
|
||||
use mediarepo_core::error::RepoResult;
|
||||
use mediarepo_database::entities::{content_descriptor, content_descriptor_tag, namespace, tag};
|
||||
use sea_orm::prelude::*;
|
||||
use sea_orm::QuerySelect;
|
||||
use sea_orm::{DatabaseConnection, JoinType};
|
||||
|
||||
pub struct TagDao {
|
||||
ctx: DaoContext,
|
||||
}
|
||||
|
||||
impl DaoProvider for TagDao {
|
||||
fn dao_ctx(&self) -> DaoContext {
|
||||
self.ctx.clone()
|
||||
}
|
||||
}
|
||||
|
||||
impl TagDao {
|
||||
pub fn new(ctx: DaoContext) -> Self {
|
||||
Self { ctx }
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
pub async fn tags_for_cd(&self, cd_id: i64) -> RepoResult<Vec<TagDto>> {
|
||||
let tags = tag::Entity::find()
|
||||
.find_also_related(namespace::Entity)
|
||||
.join(
|
||||
JoinType::LeftJoin,
|
||||
content_descriptor_tag::Relation::Tag.def().rev(),
|
||||
)
|
||||
.join(
|
||||
JoinType::InnerJoin,
|
||||
content_descriptor_tag::Relation::ContentDescriptorId.def(),
|
||||
)
|
||||
.filter(content_descriptor::Column::Id.eq(cd_id))
|
||||
.all(&self.ctx.db)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|(t, n)| TagDto::new(t, n))
|
||||
.collect();
|
||||
|
||||
Ok(tags)
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue