Add parallel processing implementation

feature/processing-pipeline
trivernis 1 year ago
parent 143868b4ae
commit b8021c6faf
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

@ -1,6 +1,12 @@
use std::path::PathBuf; use std::path::PathBuf;
#[derive(Clone, Debug)]
pub struct Context { pub struct Context {
pub dirs: Dirs,
}
#[derive(Clone, Debug)]
pub struct Dirs {
pub content_dir: PathBuf, pub content_dir: PathBuf,
pub template_dir: PathBuf, pub template_dir: PathBuf,
pub stylesheet_dir: PathBuf, pub stylesheet_dir: PathBuf,

@ -3,7 +3,7 @@ use std::{path::Path, sync::Arc};
use args::BuildArgs; use args::BuildArgs;
use clap::Parser; use clap::Parser;
use config::{read_config, Config}; use config::{read_config, Config};
use context::Context; use context::{Context, Dirs};
use data::DirLoader; use data::DirLoader;
use miette::Result; use miette::Result;
use rendering::ContentRenderer; use rendering::ContentRenderer;
@ -37,7 +37,7 @@ async fn build(args: &Args, _build_args: &BuildArgs, cfg: Config) -> Result<()>
let base_path = &args.directory; let base_path = &args.directory;
let ctx = Arc::new(build_context(&base_path, &cfg)); let ctx = Arc::new(build_context(&base_path, &cfg));
let dirs = DirLoader::new(ctx.content_dir.to_owned()) let dirs = DirLoader::new(ctx.dirs.content_dir.to_owned())
.read_content() .read_content()
.await?; .await?;
@ -54,10 +54,12 @@ fn build_context(base_path: &Path, config: &Config) -> Context {
let stylesheet_dir = base_path.join(folders.stylesheets.unwrap_or("styles".into())); let stylesheet_dir = base_path.join(folders.stylesheets.unwrap_or("styles".into()));
Context { Context {
content_dir, dirs: Dirs {
template_dir, content_dir,
stylesheet_dir, template_dir,
output_dir, stylesheet_dir,
output_dir,
},
} }
} }

@ -1,7 +1,14 @@
use async_trait::async_trait; use async_trait::async_trait;
use futures::future;
use miette::Result; use miette::Result;
/// The result of combining two processing steps
pub struct ProcessingChain<S1: ProcessingStep, S2: ProcessingStep<Input = S1::Output>>(S1, S2); pub struct ProcessingChain<S1: ProcessingStep, S2: ProcessingStep<Input = S1::Output>>(S1, S2);
/// An adapter to execute a step with multiple inputs in parallel
pub struct ParallelPipeline<S: ProcessingStep>(S);
/// A generic wrapper for processing pipelines
pub struct ProcessingPipeline<I: Send + Sync, O: Send + Sync>( pub struct ProcessingPipeline<I: Send + Sync, O: Send + Sync>(
Box<dyn ProcessingStep<Input = I, Output = O>>, Box<dyn ProcessingStep<Input = I, Output = O>>,
); );
@ -27,6 +34,22 @@ impl<S1: ProcessingStep, S2: ProcessingStep<Input = S1::Output>> ProcessingStep
} }
} }
impl<S: ProcessingStep> ParallelPipeline<S> {
pub fn new(step: S) -> Self {
Self(step)
}
}
#[async_trait]
impl<S: ProcessingStep> ProcessingStep for ParallelPipeline<S> {
type Input = Vec<S::Input>;
type Output = Vec<S::Output>;
async fn process(&self, input: Self::Input) -> Result<Self::Output> {
future::try_join_all(input.into_iter().map(|i| self.0.process(i))).await
}
}
pub trait ProcessingStepChain: Sized + ProcessingStep { pub trait ProcessingStepChain: Sized + ProcessingStep {
fn chain<S: ProcessingStep<Input = Self::Output>>(self, other: S) -> ProcessingChain<Self, S> { fn chain<S: ProcessingStep<Input = Self::Output>>(self, other: S) -> ProcessingChain<Self, S> {
ProcessingChain(self, other) ProcessingChain(self, other)

@ -23,8 +23,8 @@ pub struct ContentRenderer {
impl ContentRenderer { impl ContentRenderer {
pub async fn new(ctx: Arc<Context>) -> Result<Self> { pub async fn new(ctx: Arc<Context>) -> Result<Self> {
let template_glob = format!("{}/**/*", ctx.template_dir.to_string_lossy()); let template_glob = format!("{}/**/*", ctx.dirs.template_dir.to_string_lossy());
let styles = load_stylesheets(&ctx.stylesheet_dir).await?; let styles = load_stylesheets(&ctx.dirs.stylesheet_dir).await?;
Ok(Self { Ok(Self {
template_glob, template_glob,
@ -35,8 +35,8 @@ impl ContentRenderer {
#[tracing::instrument(level = "trace", skip_all)] #[tracing::instrument(level = "trace", skip_all)]
pub async fn render_all(&self, dirs: Vec<FolderData>) -> Result<()> { pub async fn render_all(&self, dirs: Vec<FolderData>) -> Result<()> {
if self.ctx.output_dir.exists() { if self.ctx.dirs.output_dir.exists() {
fs::remove_dir_all(&self.ctx.output_dir) fs::remove_dir_all(&self.ctx.dirs.output_dir)
.await .await
.into_diagnostic()?; .into_diagnostic()?;
} }
@ -99,7 +99,7 @@ impl ContentRenderer {
{ {
let mut styles = self.styles.lock().await; let mut styles = self.styles.lock().await;
let style_embed = styles let style_embed = styles
.get_style_embed(&style_name, &self.ctx.output_dir) .get_style_embed(&style_name, &self.ctx.dirs.output_dir)
.await?; .await?;
context.insert("style", &style_embed); context.insert("style", &style_embed);
}; };
@ -110,9 +110,9 @@ impl ContentRenderer {
.render(&format!("{template_name}.html"), &context) .render(&format!("{template_name}.html"), &context)
.into_diagnostic()?; .into_diagnostic()?;
let rel_path = page_path let rel_path = page_path
.strip_prefix(&self.ctx.content_dir) .strip_prefix(&self.ctx.dirs.content_dir)
.into_diagnostic()?; .into_diagnostic()?;
let mut out_path = self.ctx.output_dir.join(rel_path); let mut out_path = self.ctx.dirs.output_dir.join(rel_path);
out_path.set_extension("html"); out_path.set_extension("html");
let parent = out_path.parent().unwrap(); let parent = out_path.parent().unwrap();

Loading…
Cancel
Save