Update rmp-ipc to bromine and reexport it

Signed-off-by: trivernis <trivernis@protonmail.com>
pull/4/head
trivernis 3 years ago
parent 8218ae3235
commit f789863287

@ -1,7 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<project version="4"> <project version="4">
<component name="DiscordProjectSettings"> <component name="DiscordProjectSettings">
<option name="show" value="ASK" /> <option name="show" value="PROJECT_FILES" />
<option name="description" value="" /> <option name="description" value="" />
</component> </component>
</project> </project>

@ -1,6 +1,6 @@
[package] [package]
name = "mediarepo-api" name = "mediarepo-api"
version = "0.12.0" version = "0.13.0"
edition = "2018" edition = "2018"
license = "gpl-3" license = "gpl-3"
@ -10,7 +10,6 @@ license = "gpl-3"
tracing = "0.1.29" tracing = "0.1.29"
thiserror = "1.0.30" thiserror = "1.0.30"
async-trait = {version = "0.1.51", optional=true} async-trait = {version = "0.1.51", optional=true}
rmp-ipc = {version = "0.11.0", optional=true}
parking_lot = {version="0.11.2", optional=true} parking_lot = {version="0.11.2", optional=true}
serde_json = {version="1.0.68", optional=true} serde_json = {version="1.0.68", optional=true}
directories = {version="4.0.1", optional=true} directories = {version="4.0.1", optional=true}
@ -19,6 +18,11 @@ serde_piecewise_default = "0.2.0"
futures = {version = "0.3.17", optional=true} futures = {version = "0.3.17", optional=true}
url = {version = "2.2.2", optional=true } url = {version = "2.2.2", optional=true }
[dependencies.bromine]
version = "0.16.1"
optional = true
features = ["serialize_bincode"]
[dependencies.serde] [dependencies.serde]
version = "1.0.130" version = "1.0.130"
features = ["serde_derive"] features = ["serde_derive"]
@ -43,5 +47,5 @@ version = "0.5.8"
optional = true optional = true
[features] [features]
tauri-plugin = ["client-api","tauri", "rmp-ipc", "parking_lot", "serde_json", "tokio", "toml", "directories", "mime_guess", "futures", "url"] tauri-plugin = ["client-api","tauri", "parking_lot", "serde_json", "tokio", "toml", "directories", "mime_guess", "futures", "url"]
client-api = ["rmp-ipc", "async-trait", "tokio"] client-api = ["bromine", "async-trait", "tokio"]

@ -5,7 +5,7 @@ pub type ApiResult<T> = Result<T, ApiError>;
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum ApiError { pub enum ApiError {
#[error(transparent)] #[error(transparent)]
IPC(#[from] rmp_ipc::error::Error), IPC(#[from] bromine::error::Error),
#[error("The servers api version (version {server:?}) is incompatible with the api client {client:?}")] #[error("The servers api version (version {server:?}) is incompatible with the api client {client:?}")]
VersionMismatch { server: String, client: String }, VersionMismatch { server: String, client: String },

@ -7,18 +7,15 @@ use crate::types::files::{
}; };
use crate::types::identifier::FileIdentifier; use crate::types::identifier::FileIdentifier;
use async_trait::async_trait; use async_trait::async_trait;
use rmp_ipc::context::{PoolGuard, PooledContext}; use bromine::context::{PoolGuard, PooledContext};
use rmp_ipc::payload::{BytePayload, EventSendPayload}; use bromine::payload::BytePayload;
use rmp_ipc::prelude::*; use bromine::prelude::*;
pub struct FileApi<S: AsyncProtocolStream> { pub struct FileApi {
ctx: PooledContext<S>, ctx: PooledContext,
} }
impl<S> Clone for FileApi<S> impl Clone for FileApi {
where
S: AsyncProtocolStream,
{
fn clone(&self) -> Self { fn clone(&self) -> Self {
Self { Self {
ctx: self.ctx.clone(), ctx: self.ctx.clone(),
@ -27,25 +24,19 @@ where
} }
#[async_trait] #[async_trait]
impl<S> IPCApi<S> for FileApi<S> impl IPCApi for FileApi {
where
S: AsyncProtocolStream,
{
fn namespace() -> &'static str { fn namespace() -> &'static str {
"files" "files"
} }
fn ctx(&self) -> PoolGuard<Context<S>> { fn ctx(&self) -> PoolGuard<Context> {
self.ctx.acquire() self.ctx.acquire()
} }
} }
impl<S> FileApi<S> impl FileApi {
where
S: AsyncProtocolStream,
{
/// Creates a new file api client /// Creates a new file api client
pub fn new(ctx: PooledContext<S>) -> Self { pub fn new(ctx: PooledContext) -> Self {
Self { ctx } Self { ctx }
} }
@ -84,7 +75,7 @@ where
.emit_and_get("read_file", ReadFileRequest { id }) .emit_and_get("read_file", ReadFileRequest { id })
.await?; .await?;
Ok(payload.to_payload_bytes()?) Ok(payload.into_inner())
} }
/// Returns a list of all thumbnails of the file /// Returns a list of all thumbnails of the file
@ -105,7 +96,7 @@ where
min_size: (u32, u32), min_size: (u32, u32),
max_size: (u32, u32), max_size: (u32, u32),
) -> ApiResult<(ThumbnailMetadataResponse, Vec<u8>)> { ) -> ApiResult<(ThumbnailMetadataResponse, Vec<u8>)> {
let payload: TandemPayload<ThumbnailMetadataResponse, BytePayload> = self let payload: TandemPayload<SerdePayload<ThumbnailMetadataResponse>, BytePayload> = self
.emit_and_get( .emit_and_get(
"get_thumbnail_of_size", "get_thumbnail_of_size",
GetFileThumbnailOfSizeRequest { GetFileThumbnailOfSizeRequest {
@ -117,7 +108,7 @@ where
.await?; .await?;
let (metadata, bytes) = payload.into_inner(); let (metadata, bytes) = payload.into_inner();
Ok((metadata, bytes.into_inner())) Ok((metadata.data(), bytes.into_inner()))
} }
/// Updates a files name /// Updates a files name

@ -8,34 +8,27 @@ use crate::client_api::file::FileApi;
use crate::client_api::tag::TagApi; use crate::client_api::tag::TagApi;
use crate::types::misc::{check_apis_compatible, get_api_version, InfoResponse}; use crate::types::misc::{check_apis_compatible, get_api_version, InfoResponse};
use async_trait::async_trait; use async_trait::async_trait;
use rmp_ipc::context::{PoolGuard, PooledContext}; use bromine::ipc::stream_emitter::EmitMetadata;
use rmp_ipc::ipc::context::Context; use bromine::prelude::*;
use rmp_ipc::ipc::stream_emitter::EmitMetadata;
use rmp_ipc::payload::{EventReceivePayload, EventSendPayload};
use rmp_ipc::prelude::{AsyncProtocolStream, AsyncStreamProtocolListener};
use rmp_ipc::IPCBuilder;
use std::time::Duration; use std::time::Duration;
#[async_trait] #[async_trait]
pub trait IPCApi<S: AsyncProtocolStream> { pub trait IPCApi {
fn namespace() -> &'static str; fn namespace() -> &'static str;
fn ctx(&self) -> PoolGuard<Context<S>>; fn ctx(&self) -> PoolGuard<Context>;
async fn emit<T: EventSendPayload + Send>( async fn emit<T: IntoPayload + Send>(
&self, &self,
event_name: &str, event_name: &str,
data: T, data: T,
) -> ApiResult<EmitMetadata> { ) -> ApiResult<EmitMetadata> {
let ctx = self.ctx(); let ctx = self.ctx();
let meta = ctx let meta = ctx.emit_to(Self::namespace(), event_name, data).await?;
.emitter
.emit_to(Self::namespace(), event_name, data)
.await?;
Ok(meta) Ok(meta)
} }
async fn emit_and_get<T: EventSendPayload + Send, R: EventReceivePayload + Send>( async fn emit_and_get<T: IntoPayload + Send, R: FromPayload + Send>(
&self, &self,
event_name: &str, event_name: &str,
data: T, data: T,
@ -43,19 +36,16 @@ pub trait IPCApi<S: AsyncProtocolStream> {
let meta = self.emit(event_name, data).await?; let meta = self.emit(event_name, data).await?;
let response = meta.await_reply(&self.ctx()).await?; let response = meta.await_reply(&self.ctx()).await?;
Ok(response.data()?) Ok(response.payload()?)
} }
} }
pub struct ApiClient<L: AsyncStreamProtocolListener> { pub struct ApiClient {
ctx: PooledContext<L::Stream>, ctx: PooledContext,
pub file: FileApi<L::Stream>, pub file: FileApi,
pub tag: TagApi<L::Stream>, pub tag: TagApi,
} }
impl<L> Clone for ApiClient<L> impl Clone for ApiClient {
where
L: AsyncStreamProtocolListener,
{
fn clone(&self) -> Self { fn clone(&self) -> Self {
Self { Self {
ctx: self.ctx.clone(), ctx: self.ctx.clone(),
@ -65,12 +55,9 @@ where
} }
} }
impl<L> ApiClient<L> impl ApiClient {
where
L: AsyncStreamProtocolListener,
{
/// Creates a new client from an existing ipc context /// Creates a new client from an existing ipc context
pub fn new(ctx: PooledContext<L::Stream>) -> Self { pub fn new(ctx: PooledContext) -> Self {
Self { Self {
file: FileApi::new(ctx.clone()), file: FileApi::new(ctx.clone()),
tag: TagApi::new(ctx.clone()), tag: TagApi::new(ctx.clone()),
@ -80,7 +67,9 @@ where
/// Connects to the ipc Socket /// Connects to the ipc Socket
#[tracing::instrument(level = "debug")] #[tracing::instrument(level = "debug")]
pub async fn connect(address: L::AddressType) -> ApiResult<Self> { pub async fn connect<L: AsyncStreamProtocolListener>(
address: L::AddressType,
) -> ApiResult<Self> {
let ctx = IPCBuilder::<L>::new() let ctx = IPCBuilder::<L>::new()
.address(address) .address(address)
.timeout(Duration::from_secs(10)) .timeout(Duration::from_secs(10))
@ -109,13 +98,8 @@ where
#[tracing::instrument(level = "debug", skip(self))] #[tracing::instrument(level = "debug", skip(self))]
pub async fn info(&self) -> ApiResult<InfoResponse> { pub async fn info(&self) -> ApiResult<InfoResponse> {
let ctx = self.ctx.acquire(); let ctx = self.ctx.acquire();
let res = ctx let res = ctx.emit("info", ()).await?.await_reply(&ctx).await?;
.emitter Ok(res.payload::<InfoResponse>()?)
.emit("info", ())
.await?
.await_reply(&ctx)
.await?;
Ok(res.data::<InfoResponse>()?)
} }
#[tracing::instrument(level = "debug", skip(self))] #[tracing::instrument(level = "debug", skip(self))]

@ -1,7 +1,7 @@
use async_trait::async_trait; use async_trait::async_trait;
use rmp_ipc::error::Result; use bromine::error::Result;
use rmp_ipc::prelude::IPCResult; use bromine::prelude::IPCResult;
use rmp_ipc::protocol::*; use bromine::protocol::*;
use std::io::Error; use std::io::Error;
use std::net::ToSocketAddrs; use std::net::ToSocketAddrs;
use std::pin::Pin; use std::pin::Pin;

@ -4,18 +4,14 @@ use crate::types::files::{GetFileTagsRequest, GetFilesTagsRequest};
use crate::types::identifier::FileIdentifier; use crate::types::identifier::FileIdentifier;
use crate::types::tags::{ChangeFileTagsRequest, TagResponse}; use crate::types::tags::{ChangeFileTagsRequest, TagResponse};
use async_trait::async_trait; use async_trait::async_trait;
use rmp_ipc::context::{PoolGuard, PooledContext}; use bromine::context::{PoolGuard, PooledContext};
use rmp_ipc::ipc::context::Context; use bromine::ipc::context::Context;
use rmp_ipc::protocol::AsyncProtocolStream;
pub struct TagApi<S: AsyncProtocolStream> { pub struct TagApi {
ctx: PooledContext<S>, ctx: PooledContext,
} }
impl<S> Clone for TagApi<S> impl Clone for TagApi {
where
S: AsyncProtocolStream,
{
fn clone(&self) -> Self { fn clone(&self) -> Self {
Self { Self {
ctx: self.ctx.clone(), ctx: self.ctx.clone(),
@ -24,24 +20,18 @@ where
} }
#[async_trait] #[async_trait]
impl<S> IPCApi<S> for TagApi<S> impl IPCApi for TagApi {
where
S: AsyncProtocolStream,
{
fn namespace() -> &'static str { fn namespace() -> &'static str {
"tags" "tags"
} }
fn ctx(&self) -> PoolGuard<Context<S>> { fn ctx(&self) -> PoolGuard<Context> {
self.ctx.acquire() self.ctx.acquire()
} }
} }
impl<S> TagApi<S> impl TagApi {
where pub fn new(ctx: PooledContext) -> Self {
S: AsyncProtocolStream,
{
pub fn new(ctx: PooledContext<S>) -> Self {
Self { ctx } Self { ctx }
} }

@ -8,3 +8,5 @@ pub mod daemon_management;
#[cfg(feature = "tauri-plugin")] #[cfg(feature = "tauri-plugin")]
pub mod tauri_plugin; pub mod tauri_plugin;
pub use bromine;

@ -1,7 +1,7 @@
use crate::tauri_plugin::commands::AppAccess; use crate::tauri_plugin::commands::AppAccess;
use crate::tauri_plugin::error::PluginResult; use crate::tauri_plugin::error::PluginResult;
use rmp_ipc::prelude::{IPCError, IPCResult}; use bromine::prelude::{IPCError, IPCResult};
use rmp_ipc::IPCBuilder; use bromine::IPCBuilder;
use std::io::ErrorKind; use std::io::ErrorKind;
use std::net::{SocketAddr, ToSocketAddrs}; use std::net::{SocketAddr, ToSocketAddrs};
use tokio::net::TcpListener; use tokio::net::TcpListener;
@ -43,11 +43,7 @@ async fn try_connect_daemon(address: String) -> IPCResult<()> {
.address(address) .address(address)
.build_client() .build_client()
.await?; .await?;
ctx.emitter ctx.emit("info", ()).await?.await_reply(&ctx).await?;
.emit("info", ())
.await?
.await_reply(&ctx)
.await?;
ctx.stop().await?; ctx.stop().await?;
Ok(()) Ok(())
} }

@ -1,3 +1,4 @@
use crate::client_api::protocol::ApiProtocolListener;
use crate::client_api::ApiClient; use crate::client_api::ApiClient;
use crate::tauri_plugin::commands::{ApiAccess, AppAccess}; use crate::tauri_plugin::commands::{ApiAccess, AppAccess};
use crate::tauri_plugin::error::{PluginError, PluginResult}; use crate::tauri_plugin::error::{PluginError, PluginResult};
@ -165,7 +166,7 @@ pub async fn select_repository(
.ok_or_else(|| PluginError::from("Missing repo path or address in config."))?; .ok_or_else(|| PluginError::from("Missing repo path or address in config."))?;
get_repo_address(path).await? get_repo_address(path).await?
}; };
let client = ApiClient::connect(address).await?; let client = ApiClient::connect::<ApiProtocolListener>(address).await?;
api_state.set_api(client).await; api_state.set_api(client).await;
let mut active_repo = app_state.active_repo.write().await; let mut active_repo = app_state.active_repo.write().await;

@ -1,6 +1,6 @@
use crate::client_api::error::ApiError; use crate::client_api::error::ApiError;
use crate::daemon_management::error::DaemonError; use crate::daemon_management::error::DaemonError;
use rmp_ipc::error::Error; use bromine::error::Error;
use serde::Serialize; use serde::Serialize;
use std::fmt::{Display, Formatter}; use std::fmt::{Display, Formatter};

@ -9,14 +9,13 @@ use parking_lot::RwLock as ParkingRwLock;
use tauri::async_runtime::RwLock; use tauri::async_runtime::RwLock;
use tokio::time::Instant; use tokio::time::Instant;
use crate::client_api::protocol::ApiProtocolListener;
use crate::client_api::ApiClient; use crate::client_api::ApiClient;
use crate::daemon_management::cli::DaemonCli; use crate::daemon_management::cli::DaemonCli;
use crate::tauri_plugin::error::{PluginError, PluginResult}; use crate::tauri_plugin::error::{PluginError, PluginResult};
use crate::tauri_plugin::settings::{load_settings, Repository, Settings}; use crate::tauri_plugin::settings::{load_settings, Repository, Settings};
pub struct ApiState { pub struct ApiState {
inner: Arc<RwLock<Option<ApiClient<ApiProtocolListener>>>>, inner: Arc<RwLock<Option<ApiClient>>>,
} }
unsafe impl Send for ApiState {} unsafe impl Send for ApiState {}
@ -30,7 +29,7 @@ impl ApiState {
} }
/// Sets the active api client and disconnects the old one /// Sets the active api client and disconnects the old one
pub async fn set_api(&self, client: ApiClient<ApiProtocolListener>) { pub async fn set_api(&self, client: ApiClient) {
let mut inner = self.inner.write().await; let mut inner = self.inner.write().await;
let old_client = mem::replace(&mut *inner, Some(client)); let old_client = mem::replace(&mut *inner, Some(client));
@ -49,7 +48,7 @@ impl ApiState {
} }
} }
pub async fn api(&self) -> PluginResult<ApiClient<ApiProtocolListener>> { pub async fn api(&self) -> PluginResult<ApiClient> {
let inner = self.inner.read().await; let inner = self.inner.read().await;
inner inner
.clone() .clone()

Loading…
Cancel
Save