Update rmp-ipc and implement wrapper protocol

Signed-off-by: trivernis <trivernis@protonmail.com>
pull/4/head
trivernis 3 years ago
parent 5fee8a2e4a
commit f48a9aad64

@ -10,7 +10,7 @@ license = "gpl-3"
tracing = "0.1.29"
thiserror = "1.0.30"
async-trait = {version = "0.1.51", optional=true}
rmp-ipc = {version = "0.8.1", optional=true}
rmp-ipc = {version = "0.9.0", optional=true}
parking_lot = {version="0.11.2", optional=true}
serde_json = {version="1.0.68", optional=true}
directories = {version="4.0.1", optional=true}

@ -8,27 +8,43 @@ use crate::types::identifier::FileIdentifier;
use async_trait::async_trait;
use rmp_ipc::context::{PoolGuard, PooledContext};
use rmp_ipc::payload::{BytePayload, EventSendPayload};
use rmp_ipc::prelude::Context;
use rmp_ipc::prelude::*;
#[derive(Clone)]
pub struct FileApi {
ctx: PooledContext,
pub struct FileApi<S: AsyncProtocolStream> {
ctx: PooledContext<S>,
}
impl<S> Clone for FileApi<S>
where
S: AsyncProtocolStream,
{
fn clone(&self) -> Self {
Self {
ctx: self.ctx.clone(),
}
}
}
#[async_trait]
impl IPCApi for FileApi {
impl<S> IPCApi<S> for FileApi<S>
where
S: AsyncProtocolStream,
{
fn namespace() -> &'static str {
"files"
}
fn ctx(&self) -> PoolGuard<Context> {
fn ctx(&self) -> PoolGuard<Context<S>> {
self.ctx.acquire()
}
}
impl FileApi {
impl<S> FileApi<S>
where
S: AsyncProtocolStream,
{
/// Creates a new file api client
pub fn new(ctx: PooledContext) -> Self {
pub fn new(ctx: PooledContext<S>) -> Self {
Self { ctx }
}

@ -1,5 +1,6 @@
pub mod error;
pub mod file;
pub mod protocol;
pub mod tag;
use crate::client_api::error::{ApiError, ApiResult};
@ -11,13 +12,14 @@ use rmp_ipc::context::{PoolGuard, PooledContext};
use rmp_ipc::ipc::context::Context;
use rmp_ipc::ipc::stream_emitter::EmitMetadata;
use rmp_ipc::payload::{EventReceivePayload, EventSendPayload};
use rmp_ipc::prelude::{AsyncProtocolStream, AsyncStreamProtocolListener};
use rmp_ipc::IPCBuilder;
use std::fmt::Debug;
#[async_trait]
pub trait IPCApi {
pub trait IPCApi<S: AsyncProtocolStream> {
fn namespace() -> &'static str;
fn ctx(&self) -> PoolGuard<Context>;
fn ctx(&self) -> PoolGuard<Context<S>>;
async fn emit<T: EventSendPayload + Debug + Send>(
&self,
@ -47,17 +49,31 @@ pub trait IPCApi {
Ok(response.data()?)
}
}
pub struct ApiClient<L: AsyncStreamProtocolListener> {
ctx: PooledContext<L::Stream>,
pub file: FileApi<L::Stream>,
pub tag: TagApi<L::Stream>,
}
#[derive(Clone)]
pub struct ApiClient {
ctx: PooledContext,
pub file: FileApi,
pub tag: TagApi,
impl<L> Clone for ApiClient<L>
where
L: AsyncStreamProtocolListener,
{
fn clone(&self) -> Self {
Self {
ctx: self.ctx.clone(),
file: self.file.clone(),
tag: self.tag.clone(),
}
}
}
impl ApiClient {
impl<L> ApiClient<L>
where
L: AsyncStreamProtocolListener,
{
/// Creates a new client from an existing ipc context
pub fn new(ctx: PooledContext) -> Self {
pub fn new(ctx: PooledContext<L::Stream>) -> Self {
Self {
file: FileApi::new(ctx.clone()),
tag: TagApi::new(ctx.clone()),
@ -67,8 +83,8 @@ impl ApiClient {
/// Connects to the ipc Socket
#[tracing::instrument(level = "debug")]
pub async fn connect(address: &str) -> ApiResult<Self> {
let ctx = IPCBuilder::new()
pub async fn connect(address: L::AddressType) -> ApiResult<Self> {
let ctx = IPCBuilder::<L>::new()
.address(address)
.build_pooled_client(8)
.await?;

@ -0,0 +1,177 @@
use async_trait::async_trait;
use rmp_ipc::error::Result;
use rmp_ipc::prelude::IPCResult;
use rmp_ipc::protocol::*;
use std::io::Error;
use std::net::ToSocketAddrs;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::net::{TcpListener, TcpStream};
#[derive(Debug)]
pub enum ApiProtocolListener {
#[cfg(unix)]
UnixSocket(tokio::net::UnixListener),
Tcp(TcpListener),
}
unsafe impl Send for ApiProtocolListener {}
unsafe impl Sync for ApiProtocolListener {}
#[async_trait]
impl AsyncStreamProtocolListener for ApiProtocolListener {
type AddressType = String;
type RemoteAddressType = String;
type Stream = ApiProtocolStream;
async fn protocol_bind(address: Self::AddressType) -> Result<Self> {
if let Some(addr) = address.to_socket_addrs().ok().and_then(|mut a| a.next()) {
let listener = TcpListener::bind(addr).await?;
Ok(Self::Tcp(listener))
} else {
#[cfg(unix)]
{
use std::path::PathBuf;
use tokio::net::UnixListener;
let path = PathBuf::from(address);
let listener = UnixListener::bind(path)?;
Ok(Self::UnixSocket(listener))
}
#[cfg(not(unix))]
{
Err(IPCError::BuildError(
"The address can not be made into a socket address".to_string(),
))
}
}
}
async fn protocol_accept(&self) -> Result<(Self::Stream, Self::RemoteAddressType)> {
match self {
ApiProtocolListener::UnixSocket(listener) => {
let (stream, addr) = listener.accept().await?;
Ok((
ApiProtocolStream::UnixSocket(stream),
addr.as_pathname()
.map(|p| p.to_str().unwrap().to_string())
.unwrap_or(String::from("unknown")),
))
}
ApiProtocolListener::Tcp(listener) => {
let (stream, addr) = listener.accept().await?;
Ok((ApiProtocolStream::Tcp(stream), addr.to_string()))
}
}
}
}
#[derive(Debug)]
pub enum ApiProtocolStream {
#[cfg(unix)]
UnixSocket(tokio::net::UnixStream),
Tcp(TcpStream),
}
unsafe impl Send for ApiProtocolStream {}
unsafe impl Sync for ApiProtocolStream {}
impl AsyncProtocolStreamSplit for ApiProtocolStream {
type OwnedSplitReadHalf = Box<dyn AsyncRead + Unpin + Send + Sync>;
type OwnedSplitWriteHalf = Box<dyn AsyncWrite + Unpin + Send + Sync>;
fn protocol_into_split(self) -> (Self::OwnedSplitReadHalf, Self::OwnedSplitWriteHalf) {
match self {
#[cfg(unix)]
ApiProtocolStream::UnixSocket(stream) => {
let (read, write) = stream.into_split();
(Box::new(read), Box::new(write))
}
ApiProtocolStream::Tcp(stream) => {
let (read, write) = stream.into_split();
(Box::new(read), Box::new(write))
}
}
}
}
#[async_trait]
impl AsyncProtocolStream for ApiProtocolStream {
type AddressType = String;
async fn protocol_connect(address: Self::AddressType) -> IPCResult<Self> {
if let Some(addr) = address.to_socket_addrs().ok().and_then(|mut a| a.next()) {
let stream = TcpStream::connect(addr).await?;
Ok(Self::Tcp(stream))
} else {
#[cfg(unix)]
{
use std::path::PathBuf;
use tokio::net::UnixStream;
let path = PathBuf::from(address);
let stream = UnixStream::connect(path).await?;
Ok(Self::UnixSocket(stream))
}
#[cfg(not(unix))]
{
Err(IPCError::BuildError(
"The address can not be made into a socket address".to_string(),
))
}
}
}
}
impl AsyncRead for ApiProtocolStream {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
match self.get_mut() {
#[cfg(unix)]
ApiProtocolStream::UnixSocket(stream) => Pin::new(stream).poll_read(cx, buf),
ApiProtocolStream::Tcp(stream) => Pin::new(stream).poll_read(cx, buf),
}
}
}
impl AsyncWrite for ApiProtocolStream {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<std::prelude::rust_2015::Result<usize, Error>> {
match self.get_mut() {
#[cfg(unix)]
ApiProtocolStream::UnixSocket(stream) => Pin::new(stream).poll_write(cx, buf),
ApiProtocolStream::Tcp(stream) => Pin::new(stream).poll_write(cx, buf),
}
}
fn poll_flush(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<std::prelude::rust_2015::Result<(), Error>> {
match self.get_mut() {
#[cfg(unix)]
ApiProtocolStream::UnixSocket(stream) => Pin::new(stream).poll_flush(cx),
ApiProtocolStream::Tcp(stream) => Pin::new(stream).poll_flush(cx),
}
}
fn poll_shutdown(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<std::prelude::rust_2015::Result<(), Error>> {
match self.get_mut() {
#[cfg(unix)]
ApiProtocolStream::UnixSocket(stream) => Pin::new(stream).poll_shutdown(cx),
ApiProtocolStream::Tcp(stream) => Pin::new(stream).poll_flush(cx),
}
}
}

@ -6,25 +6,42 @@ use crate::types::tags::{ChangeFileTagsRequest, TagResponse};
use async_trait::async_trait;
use rmp_ipc::context::{PoolGuard, PooledContext};
use rmp_ipc::ipc::context::Context;
use rmp_ipc::protocol::AsyncProtocolStream;
#[derive(Clone)]
pub struct TagApi {
ctx: PooledContext,
pub struct TagApi<S: AsyncProtocolStream> {
ctx: PooledContext<S>,
}
impl<S> Clone for TagApi<S>
where
S: AsyncProtocolStream,
{
fn clone(&self) -> Self {
Self {
ctx: self.ctx.clone(),
}
}
}
#[async_trait]
impl IPCApi for TagApi {
impl<S> IPCApi<S> for TagApi<S>
where
S: AsyncProtocolStream,
{
fn namespace() -> &'static str {
"tags"
}
fn ctx(&self) -> PoolGuard<Context> {
fn ctx(&self) -> PoolGuard<Context<S>> {
self.ctx.acquire()
}
}
impl TagApi {
pub fn new(ctx: PooledContext) -> Self {
impl<S> TagApi<S>
where
S: AsyncProtocolStream,
{
pub fn new(ctx: PooledContext<S>) -> Self {
Self { ctx }
}

@ -1,7 +1,10 @@
use crate::tauri_plugin::commands::AppAccess;
use crate::tauri_plugin::error::PluginResult;
use rmp_ipc::prelude::IPCResult;
use rmp_ipc::prelude::{IPCError, IPCResult};
use rmp_ipc::IPCBuilder;
use std::io::ErrorKind;
use std::net::{SocketAddr, ToSocketAddrs};
use tokio::net::TcpListener;
#[tauri::command]
pub async fn init_repository(app_state: AppAccess<'_>, repo_path: String) -> PluginResult<()> {
@ -35,7 +38,11 @@ pub async fn check_daemon_running(address: String) -> PluginResult<bool> {
}
async fn try_connect_daemon(address: String) -> IPCResult<()> {
let ctx = IPCBuilder::new().address(address).build_client().await?;
let address = get_socket_address(address)?;
let ctx = IPCBuilder::<TcpListener>::new()
.address(address)
.build_client()
.await?;
ctx.emitter
.emit("info", ())
.await?
@ -44,3 +51,16 @@ async fn try_connect_daemon(address: String) -> IPCResult<()> {
ctx.stop().await?;
Ok(())
}
fn get_socket_address(address: String) -> IPCResult<SocketAddr> {
address
.to_socket_addrs()
.ok()
.and_then(|mut addr| addr.next())
.ok_or_else(|| {
IPCError::IoError(std::io::Error::new(
ErrorKind::InvalidInput,
"Invalid Socket address",
))
})
}

@ -143,7 +143,7 @@ pub async fn select_repository(
config.listen_address
};
let client = ApiClient::connect(&address).await?;
let client = ApiClient::connect(address).await?;
api_state.set_api(client).await;
let mut active_repo = app_state.active_repo.write().await;

@ -8,15 +8,19 @@ use parking_lot::RwLock as ParkingRwLock;
use tauri::async_runtime::RwLock;
use tokio::time::Instant;
use crate::client_api::protocol::ApiProtocolListener;
use crate::client_api::ApiClient;
use crate::daemon_management::cli::DaemonCli;
use crate::tauri_plugin::error::{PluginError, PluginResult};
use crate::tauri_plugin::settings::{load_settings, Repository, Settings};
pub struct ApiState {
inner: Arc<RwLock<Option<ApiClient>>>,
inner: Arc<RwLock<Option<ApiClient<ApiProtocolListener>>>>,
}
unsafe impl Send for ApiState {}
unsafe impl Sync for ApiState {}
impl ApiState {
pub fn new() -> Self {
Self {
@ -25,7 +29,7 @@ impl ApiState {
}
/// Sets the active api client and disconnects the old one
pub async fn set_api(&self, client: ApiClient) {
pub async fn set_api(&self, client: ApiClient<ApiProtocolListener>) {
let mut inner = self.inner.write().await;
let old_client = mem::replace(&mut *inner, Some(client));
@ -44,7 +48,7 @@ impl ApiState {
}
}
pub async fn api(&self) -> PluginResult<ApiClient> {
pub async fn api(&self) -> PluginResult<ApiClient<ApiProtocolListener>> {
let inner = self.inner.read().await;
inner
.clone()

@ -17,6 +17,13 @@ pub struct GetFileThumbnailsRequest {
pub id: FileIdentifier,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct GetFileThumbnailOfSizeRequest {
pub id: FileIdentifier,
pub min_size: (u32, u32),
pub max_size: (u32, u32),
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct GetFileTagsRequest {
pub id: FileIdentifier,
@ -81,6 +88,7 @@ pub struct FileMetadataResponse {
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ThumbnailMetadataResponse {
pub id: i64,
pub file_id: i64,
pub hash: String,
pub height: i32,
pub width: i32,
@ -92,3 +100,9 @@ pub struct UpdateFileNameRequest {
pub file_id: FileIdentifier,
pub name: String,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ThumbnailFullResponse {
pub metadata: ThumbnailMetadataResponse,
pub data: Vec<u8>,
}

Loading…
Cancel
Save