Switch to pooled ipc client

Signed-off-by: trivernis <trivernis@protonmail.com>
pull/4/head
trivernis 3 years ago
parent b64cb29137
commit 9c1ba03bac

@ -6,12 +6,13 @@ use crate::types::files::{
}; };
use crate::types::identifier::FileIdentifier; use crate::types::identifier::FileIdentifier;
use async_trait::async_trait; use async_trait::async_trait;
use rmp_ipc::context::{PoolGuard, PooledContext};
use rmp_ipc::payload::{BytePayload, EventSendPayload}; use rmp_ipc::payload::{BytePayload, EventSendPayload};
use rmp_ipc::prelude::Context; use rmp_ipc::prelude::Context;
#[derive(Clone)] #[derive(Clone)]
pub struct FileApi { pub struct FileApi {
ctx: Context, ctx: PooledContext,
} }
#[async_trait] #[async_trait]
@ -20,14 +21,14 @@ impl IPCApi for FileApi {
"files" "files"
} }
fn ctx(&self) -> &Context { fn ctx(&self) -> PoolGuard<Context> {
&self.ctx self.ctx.acquire()
} }
} }
impl FileApi { impl FileApi {
/// Creates a new file api client /// Creates a new file api client
pub fn new(ctx: Context) -> Self { pub fn new(ctx: PooledContext) -> Self {
Self { ctx } Self { ctx }
} }

@ -7,6 +7,7 @@ use crate::client_api::file::FileApi;
use crate::client_api::tag::TagApi; use crate::client_api::tag::TagApi;
use crate::types::misc::InfoResponse; use crate::types::misc::InfoResponse;
use async_trait::async_trait; use async_trait::async_trait;
use rmp_ipc::context::{PoolGuard, PooledContext};
use rmp_ipc::ipc::context::Context; use rmp_ipc::ipc::context::Context;
use rmp_ipc::ipc::stream_emitter::EmitMetadata; use rmp_ipc::ipc::stream_emitter::EmitMetadata;
use rmp_ipc::payload::{EventReceivePayload, EventSendPayload}; use rmp_ipc::payload::{EventReceivePayload, EventSendPayload};
@ -16,7 +17,7 @@ use std::fmt::Debug;
#[async_trait] #[async_trait]
pub trait IPCApi { pub trait IPCApi {
fn namespace() -> &'static str; fn namespace() -> &'static str;
fn ctx(&self) -> &Context; fn ctx(&self) -> PoolGuard<Context>;
async fn emit<T: EventSendPayload + Debug + Send>( async fn emit<T: EventSendPayload + Debug + Send>(
&self, &self,
@ -41,7 +42,7 @@ pub trait IPCApi {
data: T, data: T,
) -> ApiResult<R> { ) -> ApiResult<R> {
let meta = self.emit(event_name, data).await?; let meta = self.emit(event_name, data).await?;
let response = meta.await_reply(self.ctx()).await?; let response = meta.await_reply(&self.ctx()).await?;
Ok(response.data()?) Ok(response.data()?)
} }
@ -49,14 +50,14 @@ pub trait IPCApi {
#[derive(Clone)] #[derive(Clone)]
pub struct ApiClient { pub struct ApiClient {
ctx: Context, ctx: PooledContext,
pub file: FileApi, pub file: FileApi,
pub tag: TagApi, pub tag: TagApi,
} }
impl ApiClient { impl ApiClient {
/// Creates a new client from an existing ipc context /// Creates a new client from an existing ipc context
pub fn new(ctx: Context) -> Self { pub fn new(ctx: PooledContext) -> Self {
Self { Self {
file: FileApi::new(ctx.clone()), file: FileApi::new(ctx.clone()),
tag: TagApi::new(ctx.clone()), tag: TagApi::new(ctx.clone()),
@ -67,7 +68,10 @@ impl ApiClient {
/// Connects to the ipc Socket /// Connects to the ipc Socket
#[tracing::instrument(level = "debug")] #[tracing::instrument(level = "debug")]
pub async fn connect(address: &str) -> ApiResult<Self> { pub async fn connect(address: &str) -> ApiResult<Self> {
let ctx = IPCBuilder::new().address(address).build_client().await?; let ctx = IPCBuilder::new()
.address(address)
.build_pooled_client(8)
.await?;
Ok(Self::new(ctx)) Ok(Self::new(ctx))
} }
@ -75,19 +79,20 @@ impl ApiClient {
/// Returns information about the connected ipc server /// Returns information about the connected ipc server
#[tracing::instrument(level = "debug", skip(self))] #[tracing::instrument(level = "debug", skip(self))]
pub async fn info(&self) -> ApiResult<InfoResponse> { pub async fn info(&self) -> ApiResult<InfoResponse> {
let res = self let ctx = self.ctx.acquire();
.ctx let res = ctx
.emitter .emitter
.emit("info", ()) .emit("info", ())
.await? .await?
.await_reply(&self.ctx) .await_reply(&ctx)
.await?; .await?;
Ok(res.data::<InfoResponse>()?) Ok(res.data::<InfoResponse>()?)
} }
#[tracing::instrument(level = "debug", skip(self))] #[tracing::instrument(level = "debug", skip(self))]
pub async fn exit(self) -> ApiResult<()> { pub async fn exit(self) -> ApiResult<()> {
self.ctx.stop().await?; let ctx = (*self.ctx.acquire()).clone();
ctx.stop().await?;
Ok(()) Ok(())
} }
} }

@ -4,11 +4,12 @@ use crate::types::files::GetFileTagsRequest;
use crate::types::identifier::FileIdentifier; use crate::types::identifier::FileIdentifier;
use crate::types::tags::TagResponse; use crate::types::tags::TagResponse;
use async_trait::async_trait; use async_trait::async_trait;
use rmp_ipc::context::{PoolGuard, PooledContext};
use rmp_ipc::ipc::context::Context; use rmp_ipc::ipc::context::Context;
#[derive(Clone)] #[derive(Clone)]
pub struct TagApi { pub struct TagApi {
ctx: Context, ctx: PooledContext,
} }
#[async_trait] #[async_trait]
@ -17,13 +18,13 @@ impl IPCApi for TagApi {
"tags" "tags"
} }
fn ctx(&self) -> &Context { fn ctx(&self) -> PoolGuard<Context> {
&self.ctx self.ctx.acquire()
} }
} }
impl TagApi { impl TagApi {
pub fn new(ctx: Context) -> Self { pub fn new(ctx: PooledContext) -> Self {
Self { ctx } Self { ctx }
} }

@ -1,4 +1,5 @@
use crate::client_api::error::ApiError; use crate::client_api::error::ApiError;
use rmp_ipc::error::Error;
use serde::Serialize; use serde::Serialize;
use std::fmt::{Display, Formatter}; use std::fmt::{Display, Formatter};
@ -27,9 +28,19 @@ impl From<&str> for PluginError {
impl From<ApiError> for PluginError { impl From<ApiError> for PluginError {
fn from(e: ApiError) -> Self { fn from(e: ApiError) -> Self {
Self { let message = match e {
message: format!("ApiError: {:?}", e), ApiError::IPC(ipc_error) => match ipc_error {
} Error::Message(message) => message,
Error::SendError => String::from("Failed to send event to daemon"),
Error::ErrorEvent(e) => {
format!("Received error: {}", e.to_string())
}
e => {
format!("{:?}", e)
}
},
};
Self { message }
} }
} }

Loading…
Cancel
Save