Remove thread pools and use raw threads

Signed-off-by: Trivernis <trivernis@protonmail.com>
main
Trivernis 4 years ago
parent a47edc97dc
commit b28c17e4b1

@ -4,7 +4,7 @@ description = "OpenCL Stream execution framework"
repository = "https://github.com/parallel-programming-hwr/ocl-stream-rs"
license = "Apache-2.0"
readme = "README.md"
version = "0.2.1"
version = "0.3.0"
authors = ["Trivernis <trivernis@protonmail.com>"]
edition = "2018"
@ -13,6 +13,5 @@ edition = "2018"
[dependencies]
ocl = "0.19.3"
num_cpus = "1.13.0"
scheduled-thread-pool = "0.2.5"
crossbeam-channel = "0.5.0"
thiserror = "1.0.23"

@ -8,8 +8,8 @@ use crate::executor::context::ExecutorContext;
use crate::executor::stream::{OCLStream, OCLStreamSender};
use crate::utils::result::OCLStreamResult;
use ocl::ProQue;
use scheduled_thread_pool::ScheduledThreadPool;
use std::sync::Arc;
use std::thread;
pub mod context;
pub mod stream;
@ -18,7 +18,6 @@ pub mod stream;
#[derive(Clone)]
pub struct OCLStreamExecutor {
pro_que: ProQue,
pool: Arc<ScheduledThreadPool>,
concurrency: usize,
}
@ -33,7 +32,6 @@ impl OCLStreamExecutor {
pub fn new(pro_que: ProQue) -> Self {
Self {
pro_que,
pool: Arc::new(ScheduledThreadPool::new(num_cpus::get())),
concurrency: 1,
}
}
@ -49,11 +47,6 @@ impl OCLStreamExecutor {
self.concurrency = num_tasks;
}
/// Replaces the used pool with a new one
pub fn set_pool(&mut self, pool: ScheduledThreadPool) {
self.pool = Arc::new(pool);
}
/// Executes a closure in the ocl context with a bounded channel
pub fn execute_bounded<F, T>(&self, size: usize, func: F) -> OCLStream<T>
where
@ -91,13 +84,16 @@ impl OCLStreamExecutor {
let func = Arc::clone(&func);
let context = self.build_context(task_id, sender.clone());
self.pool.execute(move || {
let sender2 = context.sender().clone();
thread::Builder::new()
.name(format!("ocl-{}", task_id))
.spawn(move || {
let sender = context.sender().clone();
if let Err(e) = func(context) {
sender2.err(e).unwrap();
}
});
if let Err(e) = func(context) {
sender.err(e).unwrap();
}
})
.expect("Failed to spawn ocl thread");
}
}

@ -5,6 +5,7 @@
*/
pub mod executor;
pub mod traits;
pub mod utils;
pub use executor::stream;
@ -15,6 +16,7 @@ pub use ocl;
#[cfg(test)]
mod tests {
use crate::executor::OCLStreamExecutor;
use crate::traits::*;
use ocl::ProQue;
#[test]
@ -39,7 +41,7 @@ mod tests {
let mut stream = stream_executor.execute_bounded(10, |ctx| {
let pro_que = ctx.pro_que();
let tx = ctx.sender();
let input_buffer = pro_que.buffer_builder().len(100).fill_val(0u32).build()?;
let input_buffer = vec![0u32; 100].to_ocl_buffer(pro_que)?;
let kernel = pro_que
.kernel_builder("bench_int")

@ -0,0 +1,8 @@
/*
* opencl stream executor
* Copyright (C) 2021 trivernis
* See LICENSE for more information
*/
pub mod to_ocl_buffer;
pub use to_ocl_buffer::*;

@ -0,0 +1,26 @@
/*
* opencl stream executor
* Copyright (C) 2021 trivernis
* See LICENSE for more information
*/
use ocl::{Buffer, OclPrm, ProQue};
pub trait ToOclBuffer<T>
where
T: OclPrm,
{
fn to_ocl_buffer(&self, pro_que: &ProQue) -> ocl::Result<Buffer<T>>;
}
impl<T> ToOclBuffer<T> for Vec<T>
where
T: OclPrm,
{
fn to_ocl_buffer(&self, pro_que: &ProQue) -> ocl::Result<Buffer<T>> {
let buffer = pro_que.buffer_builder().len(self.len()).build()?;
buffer.write(&self[..]).enq()?;
Ok(buffer)
}
}
Loading…
Cancel
Save