From 284890450777d27cb5651eb0eeecd4397ad18d67 Mon Sep 17 00:00:00 2001 From: Trivernis Date: Wed, 13 Jan 2021 09:38:37 +0100 Subject: [PATCH] Add shared buffers Signed-off-by: Trivernis --- Cargo.toml | 5 ++-- src/executor/mod.rs | 17 ++++++++++++- src/lib.rs | 12 ++++++--- src/traits/mod.rs | 2 ++ src/traits/to_shared_buffer.rs | 27 ++++++++++++++++++++ src/utils/mod.rs | 1 + src/utils/shared_buffer.rs | 45 ++++++++++++++++++++++++++++++++++ 7 files changed, 102 insertions(+), 7 deletions(-) create mode 100644 src/traits/to_shared_buffer.rs create mode 100644 src/utils/shared_buffer.rs diff --git a/Cargo.toml b/Cargo.toml index 93a6f4e..22b3a66 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,7 @@ description = "OpenCL Stream execution framework" repository = "https://github.com/parallel-programming-hwr/ocl-stream-rs" license = "Apache-2.0" readme = "README.md" -version = "0.3.0" +version = "0.3.1" authors = ["Trivernis "] edition = "2018" @@ -14,4 +14,5 @@ edition = "2018" ocl = "0.19.3" num_cpus = "1.13.0" crossbeam-channel = "0.5.0" -thiserror = "1.0.23" \ No newline at end of file +thiserror = "1.0.23" +parking_lot = "0.11.1" \ No newline at end of file diff --git a/src/executor/mod.rs b/src/executor/mod.rs index 929938b..4a9bc31 100644 --- a/src/executor/mod.rs +++ b/src/executor/mod.rs @@ -7,7 +7,8 @@ use crate::executor::context::ExecutorContext; use crate::executor::stream::{OCLStream, OCLStreamSender}; use crate::utils::result::OCLStreamResult; -use ocl::ProQue; +use crate::utils::shared_buffer::SharedBuffer; +use ocl::{OclPrm, ProQue}; use std::sync::Arc; use std::thread; @@ -59,6 +60,11 @@ impl OCLStreamExecutor { stream } + /// Returns the inner pro_que object + pub fn pro_que(&self) -> &ProQue { + &self.pro_que + } + /// Executes a closure in the ocl context with an unbounded channel /// for streaming pub fn execute_unbounded(&self, func: F) -> OCLStream @@ -97,6 +103,15 @@ impl OCLStreamExecutor { } } + /// Creates a new shard buffer with a given length + pub fn create_shared_buffer(&self, len: usize) -> ocl::Result> + where + T: OclPrm, + { + let buffer = self.pro_que.buffer_builder().len(len).build()?; + Ok(SharedBuffer::new(buffer)) + } + /// Builds the executor context for the executor fn build_context(&self, task_id: usize, sender: OCLStreamSender) -> ExecutorContext where diff --git a/src/lib.rs b/src/lib.rs index ba884bd..b45853f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,7 +17,9 @@ pub use ocl; mod tests { use crate::executor::OCLStreamExecutor; use crate::traits::*; + use crate::utils::shared_buffer::SharedBuffer; use ocl::ProQue; + use std::ops::Deref; #[test] fn it_streams_ocl_calculations() { @@ -37,16 +39,18 @@ mod tests { .build() .unwrap(); let stream_executor = OCLStreamExecutor::new(pro_que); + let input_buffer: SharedBuffer = vec![0u32; 100] + .to_shared_buffer(stream_executor.pro_que()) + .unwrap(); - let mut stream = stream_executor.execute_bounded(10, |ctx| { + let mut stream = stream_executor.execute_bounded(10, move |ctx| { let pro_que = ctx.pro_que(); let tx = ctx.sender(); - let input_buffer = vec![0u32; 100].to_ocl_buffer(pro_que)?; let kernel = pro_que .kernel_builder("bench_int") .arg(100) - .arg(&input_buffer) + .arg(input_buffer.inner().lock().deref()) .global_work_size(100) .build()?; unsafe { @@ -54,7 +58,7 @@ mod tests { } let mut result = vec![0u32; 100]; - input_buffer.read(&mut result).enq()?; + input_buffer.read(&mut result)?; for num in result { tx.send(num)?; diff --git a/src/traits/mod.rs b/src/traits/mod.rs index a1720f9..8afe915 100644 --- a/src/traits/mod.rs +++ b/src/traits/mod.rs @@ -5,4 +5,6 @@ */ pub mod to_ocl_buffer; +pub mod to_shared_buffer; pub use to_ocl_buffer::*; +pub use to_shared_buffer::*; diff --git a/src/traits/to_shared_buffer.rs b/src/traits/to_shared_buffer.rs new file mode 100644 index 0000000..8f619ca --- /dev/null +++ b/src/traits/to_shared_buffer.rs @@ -0,0 +1,27 @@ +/* + * opencl stream executor + * Copyright (C) 2021 trivernis + * See LICENSE for more information + */ + +use crate::traits::ToOclBuffer; +use crate::utils::shared_buffer::SharedBuffer; +use ocl::{OclPrm, ProQue}; + +pub trait ToSharedBuffer +where + T: OclPrm, +{ + fn to_shared_buffer(&self, pro_que: &ProQue) -> ocl::Result>; +} + +impl ToSharedBuffer for Vec +where + T: OclPrm, +{ + fn to_shared_buffer(&self, pro_que: &ProQue) -> ocl::Result> { + let buffer = self.to_ocl_buffer(pro_que)?; + + Ok(SharedBuffer::new(buffer)) + } +} diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 99aeea8..90264b3 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -5,3 +5,4 @@ */ pub mod result; +pub mod shared_buffer; diff --git a/src/utils/shared_buffer.rs b/src/utils/shared_buffer.rs new file mode 100644 index 0000000..43730d0 --- /dev/null +++ b/src/utils/shared_buffer.rs @@ -0,0 +1,45 @@ +/* + * opencl stream executor + * Copyright (C) 2021 trivernis + * See LICENSE for more information + */ +use ocl::{Buffer, OclPrm}; +use parking_lot::Mutex; +use std::sync::Arc; + +#[derive(Clone)] +pub struct SharedBuffer +where + T: OclPrm, +{ + inner: Arc>>, +} + +impl SharedBuffer +where + T: OclPrm, +{ + /// Creates a new shared buffer with an inner ocl buffer + pub fn new(buf: Buffer) -> Self { + Self { + inner: Arc::new(Mutex::new(buf)), + } + } + + /// Writes into the buffer + pub fn write(&self, src: &[T]) -> ocl::Result<()> { + let buffer = self.inner.lock(); + buffer.write(src).enq() + } + + /// Reads from the buffer + pub fn read(&self, dst: &mut [T]) -> ocl::Result<()> { + let buffer = self.inner.lock(); + buffer.read(dst).enq() + } + + /// Returns the inner buffer + pub fn inner(&self) -> Arc>> { + Arc::clone(&self.inner) + } +}