Improve background task handling

Signed-off-by: trivernis <trivernis@protonmail.com>
pull/14/head
trivernis 3 years ago
parent b86b6f21ac
commit 1578a917b7
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

@ -0,0 +1,158 @@
use crate::tauri_plugin::error::{PluginError, PluginResult};
use futures::future;
use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use std::{mem, thread};
use tokio::sync::{Mutex, RwLock};
#[derive(Clone, Debug)]
pub struct TaskContext {
tasks: Arc<RwLock<HashMap<String, AsyncTask>>>,
}
impl TaskContext {
pub fn new() -> Self {
Self {
tasks: Default::default(),
}
}
pub async fn add_task<S: ToString, F: 'static + Future<Output = PluginResult<()>>>(
&self,
name: S,
task: F,
) {
self.tasks
.write()
.await
.insert(name.to_string(), AsyncTask::new(task));
}
pub async fn task_state<S: AsRef<str>>(&self, name: S) -> Option<TaskState> {
let state = {
let tasks = self.tasks.read().await;
if let Some(task) = tasks.get(name.as_ref()) {
Some(task.state().await)
} else {
None
}
};
if let Some(TaskState::Finished) = state {
self.tasks.write().await.remove(name.as_ref());
}
state
}
/// Returns all tasks queued for execution
async fn queued_tasks(&self) -> Vec<AsyncTask> {
let task_map = self.tasks.read().await;
let mut tasks = Vec::new();
for task in task_map.values() {
if task.state().await == TaskState::Queued {
tasks.push(task.clone());
}
}
tasks
}
}
#[derive(Clone, Debug, Ord, PartialOrd, Eq, PartialEq)]
pub enum TaskState {
Queued,
Running,
Finished,
Error,
}
impl TaskState {
pub fn error(&self) -> bool {
*self == TaskState::Error
}
}
#[derive(Clone)]
pub struct AsyncTask {
state: Arc<RwLock<TaskState>>,
inner: Arc<Mutex<Option<Pin<Box<dyn Future<Output = PluginResult<()>>>>>>>,
error: Arc<RwLock<Option<PluginError>>>,
}
impl Debug for AsyncTask {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "AsyncTask (state: {:?})", self.state)
}
}
impl AsyncTask {
pub fn new<F: 'static + Future<Output = PluginResult<()>>>(inner: F) -> Self {
Self {
state: Arc::new(RwLock::new(TaskState::Queued)),
inner: Arc::new(Mutex::new(Some(Box::pin(inner)))),
error: Default::default(),
}
}
pub async fn exec(&self) {
self.set_state(TaskState::Running).await;
let inner = self.inner.lock().await.take();
if let Some(task) = inner {
if let Err(e) = task.await {
let _ = mem::replace(&mut *self.error.write().await, Some(e));
self.set_state(TaskState::Error).await;
} else {
self.set_state(TaskState::Finished).await;
}
} else {
self.set_state(TaskState::Finished).await;
}
}
pub async fn state(&self) -> TaskState {
self.state.read().await.clone()
}
async fn set_state(&self, state: TaskState) {
let _ = mem::replace(&mut *self.state.write().await, state);
}
}
unsafe impl Send for AsyncTask {}
unsafe impl Sync for AsyncTask {}
pub fn start_background_task_runtime(ctx: TaskContext) {
thread::spawn(move || {
tokio::runtime::Builder::new_current_thread()
.thread_name("background_tasks")
.enable_time()
.build()
.expect("failed to build background task runtime")
.block_on(async move {
tracing::debug!("background task listener ready");
loop {
let tasks = ctx.queued_tasks().await;
if tasks.len() > 0 {
tracing::debug!("executing {} async background tasks", tasks.len());
let start = SystemTime::now();
future::join_all(tasks.iter().map(|t| t.exec())).await;
tracing::debug!(
"background tasks executed in {} ms",
start.elapsed().unwrap().as_millis()
);
} else {
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
});
tracing::error!("background task executor exited!");
});
}

@ -1,12 +1,13 @@
use crate::client_api::ApiClient;
use crate::tauri_plugin::background_tasks::TaskContext;
use crate::tauri_plugin::error::{PluginError, PluginResult};
use crate::tauri_plugin::state::{ApiState, AppState, AsyncTask, BufferState};
use crate::tauri_plugin::state::{ApiState, BufferState};
use crate::types::identifier::FileIdentifier;
use std::borrow::Cow;
use std::collections::HashMap;
use std::sync::Arc;
use tauri::http::{Request, Response, ResponseBuilder};
use tauri::{AppHandle, Builder, Manager, Runtime};
use tauri::{AppHandle, Builder, Manager, Runtime, State};
use tokio::runtime::{Builder as TokioRuntimeBuilder, Runtime as TokioRuntime};
use url::Url;
@ -55,7 +56,6 @@ fn once_scheme<R: Runtime>(app: &AppHandle<R>, request: &Request) -> Result<Resp
#[tracing::instrument(level = "debug", skip_all)]
async fn content_scheme<R: Runtime>(app: &AppHandle<R>, request: &Request) -> Result<Response> {
let api_state = app.state::<ApiState>();
let buf_state = app.state::<BufferState>();
let hash = request.uri().trim_start_matches("content://");
@ -67,7 +67,10 @@ async fn content_scheme<R: Runtime>(app: &AppHandle<R>, request: &Request) -> Re
.body(buffer.buf)
} else {
tracing::debug!("Fetching content from daemon");
let api_state = app.state::<ApiState>();
let api = api_state.api().await?;
let file = api
.file
.get_file(FileIdentifier::CD(hash.to_string()))
@ -79,18 +82,17 @@ async fn content_scheme<R: Runtime>(app: &AppHandle<R>, request: &Request) -> Re
.await?;
tracing::debug!("Received {} content bytes", bytes.len());
buf_state.add_entry(hash.to_string(), mime.clone(), bytes.clone());
ResponseBuilder::new()
.mimetype(&mime)
.status(200)
.mimetype(&mime)
.body(bytes)
}
}
#[tracing::instrument(level = "debug", skip_all)]
async fn thumb_scheme<R: Runtime>(app: &AppHandle<R>, request: &Request) -> Result<Response> {
let api_state = app.state::<ApiState>();
let buf_state = app.state::<BufferState>();
let app_state = app.state::<AppState>();
let url = Url::parse(request.uri())?;
let hash = url
@ -118,20 +120,28 @@ async fn thumb_scheme<R: Runtime>(app: &AppHandle<R>, request: &Request) -> Resu
.mimetype(&buffer.mime)
.body(buffer.buf)
} else {
tracing::debug!("Content not loaded. Singnaling retry.");
let api = api_state.api().await?;
let buf_state = buf_state.inner().clone();
tracing::debug!("Content not loaded. Signaling retry.");
let task_ctx = app.state::<TaskContext>();
let state = task_ctx.task_state(request.uri()).await;
if state.is_none() || state.unwrap().error() {
let buf_state = buf_state.inner().clone();
let api_state = app.state::<ApiState>();
let api = api_state.api().await?;
app_state
.add_async_task(build_fetch_thumbnail_task(
add_fetch_thumbnail_task(
request.uri(),
task_ctx,
buf_state,
api,
hash.to_string(),
request.uri().to_string(),
width,
height,
))
)
.await;
}
ResponseBuilder::new()
.mimetype("text/plain")
@ -141,27 +151,31 @@ async fn thumb_scheme<R: Runtime>(app: &AppHandle<R>, request: &Request) -> Resu
}
}
fn build_fetch_thumbnail_task(
async fn add_fetch_thumbnail_task(
name: &str,
task_ctx: State<'_, TaskContext>,
buf_state: BufferState,
api: ApiClient,
hash: String,
request_uri: String,
width: u32,
height: u32,
) -> AsyncTask {
AsyncTask::new(async move {
tracing::debug!("Fetching content from daemon");
let (thumb, bytes) = api
.file
.get_thumbnail_of_size(
FileIdentifier::CD(hash),
((height as f32 * 0.5) as u32, (width as f32 * 0.5) as u32),
((height as f32 * 1.5) as u32, (width as f32 * 1.5) as u32),
)
.await?;
tracing::debug!("Received {} content bytes", bytes.len());
buf_state.add_entry(request_uri, thumb.mime_type.clone(), bytes.clone());
Ok(())
})
) {
task_ctx
.add_task(name, async move {
tracing::debug!("Fetching content from daemon");
let (thumb, bytes) = api
.file
.get_thumbnail_of_size(
FileIdentifier::CD(hash),
((height as f32 * 0.5) as u32, (width as f32 * 0.5) as u32),
((height as f32 * 1.5) as u32, (width as f32 * 1.5) as u32),
)
.await?;
tracing::debug!("Received {} content bytes", bytes.len());
buf_state.add_entry(request_uri, thumb.mime_type.clone(), bytes.clone());
Ok(())
})
.await;
}

@ -4,11 +4,10 @@ use tauri::{AppHandle, Builder, Invoke, Manager, Runtime};
use state::ApiState;
use crate::tauri_plugin::state::{AppState, BufferState};
use futures::future;
use std::mem;
use std::thread;
use std::time::Duration;
mod background_tasks;
pub(crate) mod commands;
pub mod custom_schemes;
pub mod error;
@ -16,9 +15,10 @@ mod settings;
mod state;
mod utils;
use crate::tauri_plugin::background_tasks::{start_background_task_runtime, TaskContext};
use commands::*;
const MAX_BUFFER_SIZE: usize = 2 * 1024 * 1024 * 1024;
const MAX_BUFFER_SIZE: usize = 2 * 1024 * 1024 * 1024; // 2GiB
pub fn register_plugin<R: Runtime>(builder: Builder<R>) -> Builder<R> {
let repo_plugin = MediarepoPlugin::new();
@ -99,36 +99,17 @@ impl<R: Runtime> Plugin<R> for MediarepoPlugin<R> {
app.manage(buffer_state.clone());
let repo_state = AppState::load()?;
let background_tasks = repo_state.background_tasks();
app.manage(repo_state);
let task_context = TaskContext::new();
start_background_task_runtime(task_context.clone());
app.manage(task_context);
thread::spawn(move || loop {
thread::sleep(Duration::from_secs(10));
buffer_state.clear_expired();
buffer_state.trim_to_size(MAX_BUFFER_SIZE);
});
thread::spawn(move || {
tokio::runtime::Builder::new_current_thread()
.thread_name("background_tasks")
.enable_time()
.build()
.expect("failed to build background task runtime")
.block_on(async move {
tracing::debug!("background task listener ready");
loop {
let tasks = mem::take(&mut *background_tasks.lock().await);
if tasks.len() > 0 {
tracing::debug!("executing {} async background tasks", tasks.len());
future::join_all(tasks.into_iter().map(|t| t.exec())).await;
tracing::debug!("background tasks executed");
} else {
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
});
tracing::error!("background task executor exited!");
});
Ok(())
}

@ -1,16 +1,12 @@
use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::future::Future;
use std::mem;
use std::ops::Deref;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use parking_lot::Mutex;
use parking_lot::RwLock as ParkingRwLock;
use tauri::async_runtime::RwLock;
use tokio::sync::Mutex as TokioMutex;
use tokio::time::Instant;
use crate::client_api::ApiClient;
@ -173,7 +169,6 @@ pub struct AppState {
pub active_repo: Arc<RwLock<Option<Repository>>>,
pub settings: Arc<RwLock<Settings>>,
pub running_daemons: Arc<RwLock<HashMap<String, DaemonCli>>>,
pub background_tasks: Arc<TokioMutex<Vec<AsyncTask>>>,
}
impl AppState {
@ -185,7 +180,6 @@ impl AppState {
active_repo: Default::default(),
settings: Arc::new(RwLock::new(settings)),
running_daemons: Default::default(),
background_tasks: Default::default(),
};
Ok(state)
@ -224,37 +218,4 @@ impl AppState {
Ok(())
}
pub async fn add_async_task(&self, task: AsyncTask) {
self.background_tasks.lock().await.push(task);
}
pub fn background_tasks(&self) -> Arc<TokioMutex<Vec<AsyncTask>>> {
self.background_tasks.clone()
}
}
pub struct AsyncTask {
inner: Pin<Box<dyn Future<Output = PluginResult<()>>>>,
}
impl Debug for AsyncTask {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
"AsyncTask".fmt(f)
}
}
impl AsyncTask {
pub fn new<F: 'static + Future<Output = PluginResult<()>>>(inner: F) -> Self {
Self {
inner: Box::pin(inner),
}
}
pub async fn exec(self) -> PluginResult<()> {
self.inner.await
}
}
unsafe impl Send for AsyncTask {}
unsafe impl Sync for AsyncTask {}

@ -1,5 +1,14 @@
<ng-content></ng-content>
<div *ngIf="this.busy" [class.blur]="this.blurBackground" [class.darken]="this.darkenBackground"
class="busy-indicator-overlay">
<mat-progress-spinner *ngIf="this.busy" [mode]="mode" [value]="value" color="primary"></mat-progress-spinner>
<mat-progress-spinner *ngIf="indicatorType === 'spinner' && this.busy"
[mode]="mode"
[value]="value"
color="primary"></mat-progress-spinner>
<app-middle-centered *ngIf="indicatorType === 'pulse' && this.busy">
<div class="loading-indicator-pulse-outer">
<div class="loading-indicator-pulse-inner"></div>
</div>
</app-middle-centered>
</div>

@ -1,3 +1,5 @@
@import "src/colors";
:host {
display: block;
position: relative;
@ -38,9 +40,80 @@ hidden {
display: none;
}
::ng-deep app-busy-indicator {
width: 100%;
height: 100%;
position: relative;
.loading-indicator-pulse-outer {
display: flex;
background-color: $primary;
animation-name: pulse-outer;
animation-duration: 2.5s;
border-radius: 1em;
width: 2em;
height: 2em;
animation-iteration-count: infinite;
animation-timing-function: ease-in-out;
}
.loading-indicator-pulse-inner {
display: block;
margin: auto;
background-color: $primary-lighter-10;
animation-name: pulse-inner;
animation-duration: 2.5s;
border-radius: 0.5em;
width: 1em;
height: 1em;
animation-iteration-count: infinite;
animation-timing-function: ease-in-out;
}
@keyframes pulse-outer {
2% {
border-radius: 1em;
width: 2em;
height: 2em;
}
48% {
border-radius: 2em;
width: 4em;
height: 4em;
}
52% {
border-radius: 2em;
width: 4em;
height: 4em;
}
98% {
border-radius: 1em;
width: 2em;
height: 2em;
}
}
@keyframes pulse-inner {
15% {
border-radius: 0.5em;
width: 1em;
height: 1em;
}
55% {
border-radius: 1.75em;
width: 2.5em;
height: 2.5em;
}
65% {
border-radius: 1.75em;
width: 2.5em;
height: 2.5em;
}
100% {
border-radius: 0.5em;
width: 1em;
height: 1em;
}
}

@ -12,6 +12,7 @@ export class BusyIndicatorComponent implements OnChanges {
@Input() blurBackground: boolean = false;
@Input() darkenBackground: boolean = false;
@Input() mode: ProgressSpinnerMode = "indeterminate";
@Input() indicatorType: "spinner" | "pulse" = "spinner";
@Input() value: number | undefined;
constructor(private changeDetector: ChangeDetectorRef) {

@ -10,7 +10,14 @@
<div (cdkDragMoved)="this.onDragMoved($event)"
[cdkDragFreeDragPosition]="this.imagePosition" cdkDrag class="image-drag-container">
<div [style]="{scale: this.imageZoom}" class="image-scale-container">
<app-content-aware-image [imageSrc]="this.imageUrl" decoding="sync"></app-content-aware-image>
<app-busy-indicator [busy]="this.loading" indicatorType="pulse">
<app-content-aware-image (appLoadEnd)="this.loading = false" (appLoadError)="this.loading = false"
[class.hidden]="this.loading"
[imageSrc]="this.imageUrl"
[maxRetry]="3"
[retryDelay]="500"
decoding="sync"></app-content-aware-image>
</app-busy-indicator>
</div>
</div>
</div>

@ -38,3 +38,7 @@
display: block;
position: relative;
}
.hidden {
display: none;
}

@ -13,12 +13,15 @@ export class ImageViewerComponent implements OnChanges {
public imagePosition = { x: 0, y: 0 };
public mouseInImageView = false;
public loading = true;
constructor() {
}
public ngOnChanges(changes: SimpleChanges): void {
if (changes["imageUrl"]) {
this.resetImage();
this.loading = true;
}
}

@ -1,13 +1,13 @@
<div (click)="this.onClick()" (dblclick)="dblClickEvent.emit(this)"
[class.loading]="this.loading"
[class.selected]="this.entry.selected | async" class="file-card-inner">
<app-middle-centered *ngIf="this.loading">
<div class="loading-indicator"></div>
</app-middle-centered>
<app-file-thumbnail
(loadEnd)="this.loading = false"
[class.hidden]="this.loading"
[fileChanged]="this.fileChanged"
[file]="this.entry.data"
class="entry-image"></app-file-thumbnail>
<app-busy-indicator [busy]="this.loading" indicatorType="pulse">
<app-file-thumbnail
(loadEnd)="this.loading = false"
[class.hidden]="this.loading"
[fileChanged]="this.fileChanged"
[file]="this.entry.data"
class="entry-image"></app-file-thumbnail>
</app-busy-indicator>
</div>

@ -29,43 +29,6 @@
display: flex;
}
.loading-indicator {
display: block;
background-color: $primary;
animation-name: pulse;
animation-duration: 3s;
border-radius: 1em;
width: 2em;
height: 2em;
animation-iteration-count: infinite;
animation-timing-function: ease-in-out;
}
@keyframes pulse {
2% {
border-radius: 1em;
width: 2em;
height: 2em;
}
48% {
border-radius: 2em;
width: 4em;
height: 4em;
}
52% {
border-radius: 2em;
width: 4em;
height: 4em;
}
98% {
border-radius: 1em;
width: 2em;
height: 2em;
}
}
.hidden {
display: none;

@ -1,7 +1,7 @@
<app-content-aware-image (appLoadEnd)="this.loadEnd.emit()" (appLoadError)="this.onImageLoadError()"
*ngIf="this.thumbnailSupported && this.thumbUrl"
[imageSrc]="this.thumbUrl"
[maxRetry]="5" [retryDelay]="250"
[maxRetry]="5" [retryDelay]="100"
borderRadius="0.25em"></app-content-aware-image>
<div *ngIf="this.thumbnailSupported && this.thumbUrl" class="file-icon-overlay">
<ng-icon *ngIf="fileType === 'video'" name="mat-movie"></ng-icon>

Loading…
Cancel
Save