diff --git a/mediarepo-api/Cargo.toml b/mediarepo-api/Cargo.toml index 9030736..42c9029 100644 --- a/mediarepo-api/Cargo.toml +++ b/mediarepo-api/Cargo.toml @@ -10,7 +10,7 @@ license = "gpl-3" tracing = "0.1.29" thiserror = "1.0.30" async-trait = {version = "0.1.51", optional=true} -rmp-ipc = {version = "0.8.1", optional=true} +rmp-ipc = {version = "0.9.0", optional=true} parking_lot = {version="0.11.2", optional=true} serde_json = {version="1.0.68", optional=true} directories = {version="4.0.1", optional=true} diff --git a/mediarepo-api/src/client_api/file.rs b/mediarepo-api/src/client_api/file.rs index e3fb0ed..51904be 100644 --- a/mediarepo-api/src/client_api/file.rs +++ b/mediarepo-api/src/client_api/file.rs @@ -8,27 +8,43 @@ use crate::types::identifier::FileIdentifier; use async_trait::async_trait; use rmp_ipc::context::{PoolGuard, PooledContext}; use rmp_ipc::payload::{BytePayload, EventSendPayload}; -use rmp_ipc::prelude::Context; +use rmp_ipc::prelude::*; -#[derive(Clone)] -pub struct FileApi { - ctx: PooledContext, +pub struct FileApi { + ctx: PooledContext, +} + +impl Clone for FileApi +where + S: AsyncProtocolStream, +{ + fn clone(&self) -> Self { + Self { + ctx: self.ctx.clone(), + } + } } #[async_trait] -impl IPCApi for FileApi { +impl IPCApi for FileApi +where + S: AsyncProtocolStream, +{ fn namespace() -> &'static str { "files" } - fn ctx(&self) -> PoolGuard { + fn ctx(&self) -> PoolGuard> { self.ctx.acquire() } } -impl FileApi { +impl FileApi +where + S: AsyncProtocolStream, +{ /// Creates a new file api client - pub fn new(ctx: PooledContext) -> Self { + pub fn new(ctx: PooledContext) -> Self { Self { ctx } } diff --git a/mediarepo-api/src/client_api/mod.rs b/mediarepo-api/src/client_api/mod.rs index 650c651..219acca 100644 --- a/mediarepo-api/src/client_api/mod.rs +++ b/mediarepo-api/src/client_api/mod.rs @@ -1,5 +1,6 @@ pub mod error; pub mod file; +pub mod protocol; pub mod tag; use crate::client_api::error::{ApiError, ApiResult}; @@ -11,13 +12,14 @@ use rmp_ipc::context::{PoolGuard, PooledContext}; use rmp_ipc::ipc::context::Context; use rmp_ipc::ipc::stream_emitter::EmitMetadata; use rmp_ipc::payload::{EventReceivePayload, EventSendPayload}; +use rmp_ipc::prelude::{AsyncProtocolStream, AsyncStreamProtocolListener}; use rmp_ipc::IPCBuilder; use std::fmt::Debug; #[async_trait] -pub trait IPCApi { +pub trait IPCApi { fn namespace() -> &'static str; - fn ctx(&self) -> PoolGuard; + fn ctx(&self) -> PoolGuard>; async fn emit( &self, @@ -47,17 +49,31 @@ pub trait IPCApi { Ok(response.data()?) } } +pub struct ApiClient { + ctx: PooledContext, + pub file: FileApi, + pub tag: TagApi, +} -#[derive(Clone)] -pub struct ApiClient { - ctx: PooledContext, - pub file: FileApi, - pub tag: TagApi, +impl Clone for ApiClient +where + L: AsyncStreamProtocolListener, +{ + fn clone(&self) -> Self { + Self { + ctx: self.ctx.clone(), + file: self.file.clone(), + tag: self.tag.clone(), + } + } } -impl ApiClient { +impl ApiClient +where + L: AsyncStreamProtocolListener, +{ /// Creates a new client from an existing ipc context - pub fn new(ctx: PooledContext) -> Self { + pub fn new(ctx: PooledContext) -> Self { Self { file: FileApi::new(ctx.clone()), tag: TagApi::new(ctx.clone()), @@ -67,8 +83,8 @@ impl ApiClient { /// Connects to the ipc Socket #[tracing::instrument(level = "debug")] - pub async fn connect(address: &str) -> ApiResult { - let ctx = IPCBuilder::new() + pub async fn connect(address: L::AddressType) -> ApiResult { + let ctx = IPCBuilder::::new() .address(address) .build_pooled_client(8) .await?; diff --git a/mediarepo-api/src/client_api/protocol.rs b/mediarepo-api/src/client_api/protocol.rs new file mode 100644 index 0000000..90659e0 --- /dev/null +++ b/mediarepo-api/src/client_api/protocol.rs @@ -0,0 +1,177 @@ +use async_trait::async_trait; +use rmp_ipc::error::Result; +use rmp_ipc::prelude::IPCResult; +use rmp_ipc::protocol::*; +use std::io::Error; +use std::net::ToSocketAddrs; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; +use tokio::net::{TcpListener, TcpStream}; + +#[derive(Debug)] +pub enum ApiProtocolListener { + #[cfg(unix)] + UnixSocket(tokio::net::UnixListener), + + Tcp(TcpListener), +} + +unsafe impl Send for ApiProtocolListener {} +unsafe impl Sync for ApiProtocolListener {} + +#[async_trait] +impl AsyncStreamProtocolListener for ApiProtocolListener { + type AddressType = String; + type RemoteAddressType = String; + type Stream = ApiProtocolStream; + + async fn protocol_bind(address: Self::AddressType) -> Result { + if let Some(addr) = address.to_socket_addrs().ok().and_then(|mut a| a.next()) { + let listener = TcpListener::bind(addr).await?; + Ok(Self::Tcp(listener)) + } else { + #[cfg(unix)] + { + use std::path::PathBuf; + use tokio::net::UnixListener; + let path = PathBuf::from(address); + let listener = UnixListener::bind(path)?; + + Ok(Self::UnixSocket(listener)) + } + #[cfg(not(unix))] + { + Err(IPCError::BuildError( + "The address can not be made into a socket address".to_string(), + )) + } + } + } + + async fn protocol_accept(&self) -> Result<(Self::Stream, Self::RemoteAddressType)> { + match self { + ApiProtocolListener::UnixSocket(listener) => { + let (stream, addr) = listener.accept().await?; + Ok(( + ApiProtocolStream::UnixSocket(stream), + addr.as_pathname() + .map(|p| p.to_str().unwrap().to_string()) + .unwrap_or(String::from("unknown")), + )) + } + ApiProtocolListener::Tcp(listener) => { + let (stream, addr) = listener.accept().await?; + Ok((ApiProtocolStream::Tcp(stream), addr.to_string())) + } + } + } +} + +#[derive(Debug)] +pub enum ApiProtocolStream { + #[cfg(unix)] + UnixSocket(tokio::net::UnixStream), + + Tcp(TcpStream), +} + +unsafe impl Send for ApiProtocolStream {} +unsafe impl Sync for ApiProtocolStream {} + +impl AsyncProtocolStreamSplit for ApiProtocolStream { + type OwnedSplitReadHalf = Box; + type OwnedSplitWriteHalf = Box; + + fn protocol_into_split(self) -> (Self::OwnedSplitReadHalf, Self::OwnedSplitWriteHalf) { + match self { + #[cfg(unix)] + ApiProtocolStream::UnixSocket(stream) => { + let (read, write) = stream.into_split(); + (Box::new(read), Box::new(write)) + } + ApiProtocolStream::Tcp(stream) => { + let (read, write) = stream.into_split(); + (Box::new(read), Box::new(write)) + } + } + } +} + +#[async_trait] +impl AsyncProtocolStream for ApiProtocolStream { + type AddressType = String; + + async fn protocol_connect(address: Self::AddressType) -> IPCResult { + if let Some(addr) = address.to_socket_addrs().ok().and_then(|mut a| a.next()) { + let stream = TcpStream::connect(addr).await?; + Ok(Self::Tcp(stream)) + } else { + #[cfg(unix)] + { + use std::path::PathBuf; + use tokio::net::UnixStream; + let path = PathBuf::from(address); + let stream = UnixStream::connect(path).await?; + + Ok(Self::UnixSocket(stream)) + } + #[cfg(not(unix))] + { + Err(IPCError::BuildError( + "The address can not be made into a socket address".to_string(), + )) + } + } + } +} + +impl AsyncRead for ApiProtocolStream { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + match self.get_mut() { + #[cfg(unix)] + ApiProtocolStream::UnixSocket(stream) => Pin::new(stream).poll_read(cx, buf), + ApiProtocolStream::Tcp(stream) => Pin::new(stream).poll_read(cx, buf), + } + } +} + +impl AsyncWrite for ApiProtocolStream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + match self.get_mut() { + #[cfg(unix)] + ApiProtocolStream::UnixSocket(stream) => Pin::new(stream).poll_write(cx, buf), + ApiProtocolStream::Tcp(stream) => Pin::new(stream).poll_write(cx, buf), + } + } + + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + match self.get_mut() { + #[cfg(unix)] + ApiProtocolStream::UnixSocket(stream) => Pin::new(stream).poll_flush(cx), + ApiProtocolStream::Tcp(stream) => Pin::new(stream).poll_flush(cx), + } + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + match self.get_mut() { + #[cfg(unix)] + ApiProtocolStream::UnixSocket(stream) => Pin::new(stream).poll_shutdown(cx), + ApiProtocolStream::Tcp(stream) => Pin::new(stream).poll_flush(cx), + } + } +} diff --git a/mediarepo-api/src/client_api/tag.rs b/mediarepo-api/src/client_api/tag.rs index 4ae9d8e..0a77859 100644 --- a/mediarepo-api/src/client_api/tag.rs +++ b/mediarepo-api/src/client_api/tag.rs @@ -6,25 +6,42 @@ use crate::types::tags::{ChangeFileTagsRequest, TagResponse}; use async_trait::async_trait; use rmp_ipc::context::{PoolGuard, PooledContext}; use rmp_ipc::ipc::context::Context; +use rmp_ipc::protocol::AsyncProtocolStream; -#[derive(Clone)] -pub struct TagApi { - ctx: PooledContext, +pub struct TagApi { + ctx: PooledContext, +} + +impl Clone for TagApi +where + S: AsyncProtocolStream, +{ + fn clone(&self) -> Self { + Self { + ctx: self.ctx.clone(), + } + } } #[async_trait] -impl IPCApi for TagApi { +impl IPCApi for TagApi +where + S: AsyncProtocolStream, +{ fn namespace() -> &'static str { "tags" } - fn ctx(&self) -> PoolGuard { + fn ctx(&self) -> PoolGuard> { self.ctx.acquire() } } -impl TagApi { - pub fn new(ctx: PooledContext) -> Self { +impl TagApi +where + S: AsyncProtocolStream, +{ + pub fn new(ctx: PooledContext) -> Self { Self { ctx } } diff --git a/mediarepo-api/src/tauri_plugin/commands/daemon.rs b/mediarepo-api/src/tauri_plugin/commands/daemon.rs index bf96dd7..5dc67f7 100644 --- a/mediarepo-api/src/tauri_plugin/commands/daemon.rs +++ b/mediarepo-api/src/tauri_plugin/commands/daemon.rs @@ -1,7 +1,10 @@ use crate::tauri_plugin::commands::AppAccess; use crate::tauri_plugin::error::PluginResult; -use rmp_ipc::prelude::IPCResult; +use rmp_ipc::prelude::{IPCError, IPCResult}; use rmp_ipc::IPCBuilder; +use std::io::ErrorKind; +use std::net::{SocketAddr, ToSocketAddrs}; +use tokio::net::TcpListener; #[tauri::command] pub async fn init_repository(app_state: AppAccess<'_>, repo_path: String) -> PluginResult<()> { @@ -35,7 +38,11 @@ pub async fn check_daemon_running(address: String) -> PluginResult { } async fn try_connect_daemon(address: String) -> IPCResult<()> { - let ctx = IPCBuilder::new().address(address).build_client().await?; + let address = get_socket_address(address)?; + let ctx = IPCBuilder::::new() + .address(address) + .build_client() + .await?; ctx.emitter .emit("info", ()) .await? @@ -44,3 +51,16 @@ async fn try_connect_daemon(address: String) -> IPCResult<()> { ctx.stop().await?; Ok(()) } + +fn get_socket_address(address: String) -> IPCResult { + address + .to_socket_addrs() + .ok() + .and_then(|mut addr| addr.next()) + .ok_or_else(|| { + IPCError::IoError(std::io::Error::new( + ErrorKind::InvalidInput, + "Invalid Socket address", + )) + }) +} diff --git a/mediarepo-api/src/tauri_plugin/commands/repo.rs b/mediarepo-api/src/tauri_plugin/commands/repo.rs index 015a351..fd39b4d 100644 --- a/mediarepo-api/src/tauri_plugin/commands/repo.rs +++ b/mediarepo-api/src/tauri_plugin/commands/repo.rs @@ -143,7 +143,7 @@ pub async fn select_repository( config.listen_address }; - let client = ApiClient::connect(&address).await?; + let client = ApiClient::connect(address).await?; api_state.set_api(client).await; let mut active_repo = app_state.active_repo.write().await; diff --git a/mediarepo-api/src/tauri_plugin/state.rs b/mediarepo-api/src/tauri_plugin/state.rs index e59ea80..dba0be4 100644 --- a/mediarepo-api/src/tauri_plugin/state.rs +++ b/mediarepo-api/src/tauri_plugin/state.rs @@ -8,15 +8,19 @@ use parking_lot::RwLock as ParkingRwLock; use tauri::async_runtime::RwLock; use tokio::time::Instant; +use crate::client_api::protocol::ApiProtocolListener; use crate::client_api::ApiClient; use crate::daemon_management::cli::DaemonCli; use crate::tauri_plugin::error::{PluginError, PluginResult}; use crate::tauri_plugin::settings::{load_settings, Repository, Settings}; pub struct ApiState { - inner: Arc>>, + inner: Arc>>>, } +unsafe impl Send for ApiState {} +unsafe impl Sync for ApiState {} + impl ApiState { pub fn new() -> Self { Self { @@ -25,7 +29,7 @@ impl ApiState { } /// Sets the active api client and disconnects the old one - pub async fn set_api(&self, client: ApiClient) { + pub async fn set_api(&self, client: ApiClient) { let mut inner = self.inner.write().await; let old_client = mem::replace(&mut *inner, Some(client)); @@ -44,7 +48,7 @@ impl ApiState { } } - pub async fn api(&self) -> PluginResult { + pub async fn api(&self) -> PluginResult> { let inner = self.inner.read().await; inner .clone() diff --git a/mediarepo-api/src/types/files.rs b/mediarepo-api/src/types/files.rs index 0760c80..c812672 100644 --- a/mediarepo-api/src/types/files.rs +++ b/mediarepo-api/src/types/files.rs @@ -17,6 +17,13 @@ pub struct GetFileThumbnailsRequest { pub id: FileIdentifier, } +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct GetFileThumbnailOfSizeRequest { + pub id: FileIdentifier, + pub min_size: (u32, u32), + pub max_size: (u32, u32), +} + #[derive(Clone, Debug, Serialize, Deserialize)] pub struct GetFileTagsRequest { pub id: FileIdentifier, @@ -81,6 +88,7 @@ pub struct FileMetadataResponse { #[derive(Clone, Debug, Serialize, Deserialize)] pub struct ThumbnailMetadataResponse { pub id: i64, + pub file_id: i64, pub hash: String, pub height: i32, pub width: i32, @@ -92,3 +100,9 @@ pub struct UpdateFileNameRequest { pub file_id: FileIdentifier, pub name: String, } + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct ThumbnailFullResponse { + pub metadata: ThumbnailMetadataResponse, + pub data: Vec, +}