Add shared buffers

Signed-off-by: Trivernis <trivernis@protonmail.com>
main
Trivernis 4 years ago
parent b28c17e4b1
commit 2848904507

@ -4,7 +4,7 @@ description = "OpenCL Stream execution framework"
repository = "https://github.com/parallel-programming-hwr/ocl-stream-rs"
license = "Apache-2.0"
readme = "README.md"
version = "0.3.0"
version = "0.3.1"
authors = ["Trivernis <trivernis@protonmail.com>"]
edition = "2018"
@ -14,4 +14,5 @@ edition = "2018"
ocl = "0.19.3"
num_cpus = "1.13.0"
crossbeam-channel = "0.5.0"
thiserror = "1.0.23"
thiserror = "1.0.23"
parking_lot = "0.11.1"

@ -7,7 +7,8 @@
use crate::executor::context::ExecutorContext;
use crate::executor::stream::{OCLStream, OCLStreamSender};
use crate::utils::result::OCLStreamResult;
use ocl::ProQue;
use crate::utils::shared_buffer::SharedBuffer;
use ocl::{OclPrm, ProQue};
use std::sync::Arc;
use std::thread;
@ -59,6 +60,11 @@ impl OCLStreamExecutor {
stream
}
/// Returns the inner pro_que object
pub fn pro_que(&self) -> &ProQue {
&self.pro_que
}
/// Executes a closure in the ocl context with an unbounded channel
/// for streaming
pub fn execute_unbounded<F, T>(&self, func: F) -> OCLStream<T>
@ -97,6 +103,15 @@ impl OCLStreamExecutor {
}
}
/// Creates a new shard buffer with a given length
pub fn create_shared_buffer<T>(&self, len: usize) -> ocl::Result<SharedBuffer<T>>
where
T: OclPrm,
{
let buffer = self.pro_que.buffer_builder().len(len).build()?;
Ok(SharedBuffer::new(buffer))
}
/// Builds the executor context for the executor
fn build_context<T>(&self, task_id: usize, sender: OCLStreamSender<T>) -> ExecutorContext<T>
where

@ -17,7 +17,9 @@ pub use ocl;
mod tests {
use crate::executor::OCLStreamExecutor;
use crate::traits::*;
use crate::utils::shared_buffer::SharedBuffer;
use ocl::ProQue;
use std::ops::Deref;
#[test]
fn it_streams_ocl_calculations() {
@ -37,16 +39,18 @@ mod tests {
.build()
.unwrap();
let stream_executor = OCLStreamExecutor::new(pro_que);
let input_buffer: SharedBuffer<u32> = vec![0u32; 100]
.to_shared_buffer(stream_executor.pro_que())
.unwrap();
let mut stream = stream_executor.execute_bounded(10, |ctx| {
let mut stream = stream_executor.execute_bounded(10, move |ctx| {
let pro_que = ctx.pro_que();
let tx = ctx.sender();
let input_buffer = vec![0u32; 100].to_ocl_buffer(pro_que)?;
let kernel = pro_que
.kernel_builder("bench_int")
.arg(100)
.arg(&input_buffer)
.arg(input_buffer.inner().lock().deref())
.global_work_size(100)
.build()?;
unsafe {
@ -54,7 +58,7 @@ mod tests {
}
let mut result = vec![0u32; 100];
input_buffer.read(&mut result).enq()?;
input_buffer.read(&mut result)?;
for num in result {
tx.send(num)?;

@ -5,4 +5,6 @@
*/
pub mod to_ocl_buffer;
pub mod to_shared_buffer;
pub use to_ocl_buffer::*;
pub use to_shared_buffer::*;

@ -0,0 +1,27 @@
/*
* opencl stream executor
* Copyright (C) 2021 trivernis
* See LICENSE for more information
*/
use crate::traits::ToOclBuffer;
use crate::utils::shared_buffer::SharedBuffer;
use ocl::{OclPrm, ProQue};
pub trait ToSharedBuffer<T>
where
T: OclPrm,
{
fn to_shared_buffer(&self, pro_que: &ProQue) -> ocl::Result<SharedBuffer<T>>;
}
impl<T> ToSharedBuffer<T> for Vec<T>
where
T: OclPrm,
{
fn to_shared_buffer(&self, pro_que: &ProQue) -> ocl::Result<SharedBuffer<T>> {
let buffer = self.to_ocl_buffer(pro_que)?;
Ok(SharedBuffer::new(buffer))
}
}

@ -5,3 +5,4 @@
*/
pub mod result;
pub mod shared_buffer;

@ -0,0 +1,45 @@
/*
* opencl stream executor
* Copyright (C) 2021 trivernis
* See LICENSE for more information
*/
use ocl::{Buffer, OclPrm};
use parking_lot::Mutex;
use std::sync::Arc;
#[derive(Clone)]
pub struct SharedBuffer<T>
where
T: OclPrm,
{
inner: Arc<Mutex<Buffer<T>>>,
}
impl<T> SharedBuffer<T>
where
T: OclPrm,
{
/// Creates a new shared buffer with an inner ocl buffer
pub fn new(buf: Buffer<T>) -> Self {
Self {
inner: Arc::new(Mutex::new(buf)),
}
}
/// Writes into the buffer
pub fn write(&self, src: &[T]) -> ocl::Result<()> {
let buffer = self.inner.lock();
buffer.write(src).enq()
}
/// Reads from the buffer
pub fn read(&self, dst: &mut [T]) -> ocl::Result<()> {
let buffer = self.inner.lock();
buffer.read(dst).enq()
}
/// Returns the inner buffer
pub fn inner(&self) -> Arc<Mutex<Buffer<T>>> {
Arc::clone(&self.inner)
}
}
Loading…
Cancel
Save