diff --git a/Cargo.toml b/Cargo.toml index 09d0a0f..93a6f4e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,7 @@ description = "OpenCL Stream execution framework" repository = "https://github.com/parallel-programming-hwr/ocl-stream-rs" license = "Apache-2.0" readme = "README.md" -version = "0.2.1" +version = "0.3.0" authors = ["Trivernis "] edition = "2018" @@ -13,6 +13,5 @@ edition = "2018" [dependencies] ocl = "0.19.3" num_cpus = "1.13.0" -scheduled-thread-pool = "0.2.5" crossbeam-channel = "0.5.0" thiserror = "1.0.23" \ No newline at end of file diff --git a/src/executor/mod.rs b/src/executor/mod.rs index 27c3140..929938b 100644 --- a/src/executor/mod.rs +++ b/src/executor/mod.rs @@ -8,8 +8,8 @@ use crate::executor::context::ExecutorContext; use crate::executor::stream::{OCLStream, OCLStreamSender}; use crate::utils::result::OCLStreamResult; use ocl::ProQue; -use scheduled_thread_pool::ScheduledThreadPool; use std::sync::Arc; +use std::thread; pub mod context; pub mod stream; @@ -18,7 +18,6 @@ pub mod stream; #[derive(Clone)] pub struct OCLStreamExecutor { pro_que: ProQue, - pool: Arc, concurrency: usize, } @@ -33,7 +32,6 @@ impl OCLStreamExecutor { pub fn new(pro_que: ProQue) -> Self { Self { pro_que, - pool: Arc::new(ScheduledThreadPool::new(num_cpus::get())), concurrency: 1, } } @@ -49,11 +47,6 @@ impl OCLStreamExecutor { self.concurrency = num_tasks; } - /// Replaces the used pool with a new one - pub fn set_pool(&mut self, pool: ScheduledThreadPool) { - self.pool = Arc::new(pool); - } - /// Executes a closure in the ocl context with a bounded channel pub fn execute_bounded(&self, size: usize, func: F) -> OCLStream where @@ -91,13 +84,16 @@ impl OCLStreamExecutor { let func = Arc::clone(&func); let context = self.build_context(task_id, sender.clone()); - self.pool.execute(move || { - let sender2 = context.sender().clone(); + thread::Builder::new() + .name(format!("ocl-{}", task_id)) + .spawn(move || { + let sender = context.sender().clone(); - if let Err(e) = func(context) { - sender2.err(e).unwrap(); - } - }); + if let Err(e) = func(context) { + sender.err(e).unwrap(); + } + }) + .expect("Failed to spawn ocl thread"); } } diff --git a/src/lib.rs b/src/lib.rs index edd9dce..ba884bd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,6 +5,7 @@ */ pub mod executor; +pub mod traits; pub mod utils; pub use executor::stream; @@ -15,6 +16,7 @@ pub use ocl; #[cfg(test)] mod tests { use crate::executor::OCLStreamExecutor; + use crate::traits::*; use ocl::ProQue; #[test] @@ -39,7 +41,7 @@ mod tests { let mut stream = stream_executor.execute_bounded(10, |ctx| { let pro_que = ctx.pro_que(); let tx = ctx.sender(); - let input_buffer = pro_que.buffer_builder().len(100).fill_val(0u32).build()?; + let input_buffer = vec![0u32; 100].to_ocl_buffer(pro_que)?; let kernel = pro_que .kernel_builder("bench_int") diff --git a/src/traits/mod.rs b/src/traits/mod.rs new file mode 100644 index 0000000..a1720f9 --- /dev/null +++ b/src/traits/mod.rs @@ -0,0 +1,8 @@ +/* + * opencl stream executor + * Copyright (C) 2021 trivernis + * See LICENSE for more information + */ + +pub mod to_ocl_buffer; +pub use to_ocl_buffer::*; diff --git a/src/traits/to_ocl_buffer.rs b/src/traits/to_ocl_buffer.rs new file mode 100644 index 0000000..78f1b30 --- /dev/null +++ b/src/traits/to_ocl_buffer.rs @@ -0,0 +1,26 @@ +/* + * opencl stream executor + * Copyright (C) 2021 trivernis + * See LICENSE for more information + */ + +use ocl::{Buffer, OclPrm, ProQue}; + +pub trait ToOclBuffer +where + T: OclPrm, +{ + fn to_ocl_buffer(&self, pro_que: &ProQue) -> ocl::Result>; +} + +impl ToOclBuffer for Vec +where + T: OclPrm, +{ + fn to_ocl_buffer(&self, pro_que: &ProQue) -> ocl::Result> { + let buffer = pro_que.buffer_builder().len(self.len()).build()?; + buffer.write(&self[..]).enq()?; + + Ok(buffer) + } +}