|
|
@ -11,6 +11,12 @@ pub struct Parallel<S: ProcessingStep>(S);
|
|
|
|
/// An adapter to map the result of the pipeline
|
|
|
|
/// An adapter to map the result of the pipeline
|
|
|
|
pub struct Map<S: ProcessingStep, T: Send + Sync>(S, Box<dyn Fn(S::Output) -> T + Send + Sync>);
|
|
|
|
pub struct Map<S: ProcessingStep, T: Send + Sync>(S, Box<dyn Fn(S::Output) -> T + Send + Sync>);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/// An adapter to dynamically construct the next step mapper depending on the previous one
|
|
|
|
|
|
|
|
pub struct Construct<S1: ProcessingStep, S2: ProcessingStep<Input = T>, T>(
|
|
|
|
|
|
|
|
S1,
|
|
|
|
|
|
|
|
Box<dyn Fn(S1::Output) -> (T, S2) + Send + Sync>,
|
|
|
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
|
|
/// A generic wrapper for processing pipelines
|
|
|
|
/// A generic wrapper for processing pipelines
|
|
|
|
pub struct ProcessingPipeline<I: Send + Sync, O: Send + Sync>(
|
|
|
|
pub struct ProcessingPipeline<I: Send + Sync, O: Send + Sync>(
|
|
|
|
Box<dyn ProcessingStep<Input = I, Output = O>>,
|
|
|
|
Box<dyn ProcessingStep<Input = I, Output = O>>,
|
|
|
@ -87,10 +93,41 @@ impl<S: ProcessingStep, T: Send + Sync> ProcessingStep for Map<S, T> {
|
|
|
|
|
|
|
|
|
|
|
|
async fn process(&self, input: Self::Input) -> Result<Self::Output> {
|
|
|
|
async fn process(&self, input: Self::Input) -> Result<Self::Output> {
|
|
|
|
let inner_result = self.0.process(input).await?;
|
|
|
|
let inner_result = self.0.process(input).await?;
|
|
|
|
|
|
|
|
|
|
|
|
Ok(self.1(inner_result))
|
|
|
|
Ok(self.1(inner_result))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
pub trait ProcessingConstruct: ProcessingStep + Sized {
|
|
|
|
|
|
|
|
fn construct<
|
|
|
|
|
|
|
|
F: Fn(Self::Output) -> (T, S) + Send + Sync + 'static,
|
|
|
|
|
|
|
|
S: ProcessingStep<Input = T>,
|
|
|
|
|
|
|
|
T: Send + Sync,
|
|
|
|
|
|
|
|
>(
|
|
|
|
|
|
|
|
self,
|
|
|
|
|
|
|
|
construct_fn: F,
|
|
|
|
|
|
|
|
) -> Construct<Self, S, T> {
|
|
|
|
|
|
|
|
Construct(self, Box::new(construct_fn))
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
impl<S: ProcessingStep> ProcessingConstruct for S {}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#[async_trait]
|
|
|
|
|
|
|
|
impl<S1: ProcessingStep, S2: ProcessingStep<Input = T>, T: Send + Sync> ProcessingStep
|
|
|
|
|
|
|
|
for Construct<S1, S2, T>
|
|
|
|
|
|
|
|
{
|
|
|
|
|
|
|
|
type Input = S1::Input;
|
|
|
|
|
|
|
|
type Output = S2::Output;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async fn process(&self, input: Self::Input) -> Result<Self::Output> {
|
|
|
|
|
|
|
|
let inner_output = self.0.process(input).await?;
|
|
|
|
|
|
|
|
let (new_input, step) = self.1(inner_output);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
step.process(new_input).await
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
#[async_trait]
|
|
|
|
#[async_trait]
|
|
|
|
impl<I: Send + Sync, O: Send + Sync> ProcessingStep for ProcessingPipeline<I, O> {
|
|
|
|
impl<I: Send + Sync, O: Send + Sync> ProcessingStep for ProcessingPipeline<I, O> {
|
|
|
|
type Input = I;
|
|
|
|
type Input = I;
|
|
|
|