Add executor and required structs for execution

Signed-off-by: Trivernis <trivernis@protonmail.com>
main
Trivernis 3 years ago
commit 9ac070224b

2
.gitignore vendored

@@ -0,0 +1,2 @@
/target
Cargo.lock

8
.idea/.gitignore vendored

@@ -0,0 +1,8 @@
# Default ignored files
/shelf/
/workspace.xml
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml
# Editor-based HTTP Client requests
/httpRequests/

8
.idea/modules.xml

@@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
  <component name="ProjectModuleManager">
    <modules>
      <module fileurl="file://$PROJECT_DIR$/.idea/ocl-stream.iml" filepath="$PROJECT_DIR$/.idea/ocl-stream.iml" />
    </modules>
  </component>
</project>

11
.idea/ocl-stream.iml

@@ -0,0 +1,11 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="CPP_MODULE" version="4">
  <component name="NewModuleRootManager">
    <content url="file://$MODULE_DIR$">
      <sourceFolder url="file://$MODULE_DIR$/src" isTestSource="false" />
      <excludeFolder url="file://$MODULE_DIR$/target" />
    </content>
    <orderEntry type="inheritedJdk" />
    <orderEntry type="sourceFolder" forTests="false" />
  </component>
</module>

6
.idea/vcs.xml

@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
  <component name="VcsDirectoryMappings">
    <mapping directory="$PROJECT_DIR$" vcs="Git" />
  </component>
</project>

13
Cargo.toml

@@ -0,0 +1,13 @@
[package]
name = "ocl-stream"
version = "0.1.0"
authors = ["Trivernis <trivernis@protonmail.com>"]
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
ocl = "0.19.3"
num_cpus = "1.13.0"
scheduled-thread-pool = "0.2.5"
crossbeam-channel = "0.5.0"
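
For orientation: ocl provides the OpenCL bindings and the ProQue, num_cpus sizes the default concurrency, scheduled-thread-pool supplies the worker pool the executor schedules closures on, and crossbeam-channel backs the result stream between the pool threads and the consumer.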

45
src/executor/context.rs

@@ -0,0 +1,45 @@
use crate::executor::ocl_stream::OCLStreamSender;
use ocl::ProQue;

/// Context passed to the executing closure
/// to provide additional information and
/// access to the ProQue.
#[derive(Clone)]
pub struct ExecutorContext<T>
where
    T: Send + Sync,
{
    pro_que: ProQue,
    sender: OCLStreamSender<T>,
    task_id: usize,
}

impl<T> ExecutorContext<T>
where
    T: Send + Sync,
{
    /// Creates a new executor context.
    pub fn new(pro_que: ProQue, task_id: usize, sender: OCLStreamSender<T>) -> Self {
        Self {
            pro_que,
            task_id,
            sender,
        }
    }

    /// Returns the ProQue
    pub fn pro_que(&self) -> &ProQue {
        &self.pro_que
    }

    /// Returns the Sender
    pub fn sender(&self) -> &OCLStreamSender<T> {
        &self.sender
    }

    /// Returns the unique task id of the scheduled
    /// task
    pub fn task_id(&self) -> usize {
        self.task_id
    }
}
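
A sketch (not part of this commit) of how an executing closure might use the context: the task id can partition work across the concurrently scheduled closures. The helper name partitioned_task and the chunking scheme are illustrative assumptions.

use ocl_stream::executor::context::ExecutorContext;
use ocl_stream::utils::result::OCLStreamResult;

/// Hypothetical helper: each concurrent task works on its own chunk,
/// identified by the task id handed out by the executor.
fn partitioned_task(ctx: ExecutorContext<usize>, total: usize, concurrency: usize) -> OCLStreamResult<()> {
    let chunk = total / concurrency;
    let offset = ctx.task_id() * chunk;
    // ... build and enqueue a kernel over [offset, offset + chunk) via ctx.pro_que() ...
    ctx.sender().send(offset)?;
    Ok(())
}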

72
src/executor/mod.rs

@@ -0,0 +1,72 @@
use crate::executor::context::ExecutorContext;
use crate::executor::ocl_stream::{OCLStream, OCLStreamSender};
use crate::utils::result::OCLStreamResult;
use ocl::ProQue;
use scheduled_thread_pool::ScheduledThreadPool;
use std::sync::Arc;

pub mod context;
pub mod ocl_stream;

/// Stream executor for OpenCL Programs
#[derive(Clone)]
pub struct OCLStreamExecutor {
    pro_que: ProQue,
    pool: Arc<ScheduledThreadPool>,
    concurrency: usize,
}

impl OCLStreamExecutor {
    /// Creates a new OpenCL Stream executor
    pub fn new(pro_que: ProQue, pool: ScheduledThreadPool) -> Self {
        Self {
            pro_que,
            pool: Arc::new(pool),
            concurrency: 1,
        }
    }

    /// Sets how many threads are used to schedule kernels on the GPU.
    /// Using multiple threads reduces the GPU's idle time: while one
    /// kernel is running, the next one can be prepared on a different
    /// thread. A value of 0 means the number of CPU cores is used.
    pub fn set_concurrency(&mut self, mut num_tasks: usize) {
        if num_tasks == 0 {
            num_tasks = num_cpus::get();
        }
        self.concurrency = num_tasks;
    }

    /// Executes a closure in the ocl context
    pub fn execute<F, T>(&self, func: F) -> OCLStream<T>
    where
        F: Fn(ExecutorContext<T>) -> OCLStreamResult<()> + Send + Sync + 'static,
        T: Send + Sync + 'static,
    {
        let (stream, sender) = ocl_stream::create();
        let func = Arc::new(func);

        for task_id in 0..(self.concurrency) {
            let func = Arc::clone(&func);
            let context = self.build_context(task_id, sender.clone());

            self.pool.execute(move || {
                let sender2 = context.sender().clone();

                if let Err(e) = func(context) {
                    sender2.err(e).unwrap();
                }
            });
        }

        stream
    }

    /// Builds the executor context for the executor
    fn build_context<T>(&self, task_id: usize, sender: OCLStreamSender<T>) -> ExecutorContext<T>
    where
        T: Send + Sync,
    {
        ExecutorContext::new(self.pro_que.clone(), task_id, sender)
    }
}
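
A minimal end-to-end usage sketch (not part of this commit), wiring the executor, pool, and stream together the same way the test in lib.rs does; the trivial kernel source is a placeholder assumption.

use ocl::ProQue;
use ocl_stream::OCLStreamExecutor;
use scheduled_thread_pool::ScheduledThreadPool;

fn main() {
    let pro_que = ProQue::builder()
        .src("__kernel void noop() {}") // placeholder kernel source
        .dims(1)
        .build()
        .unwrap();
    let pool = ScheduledThreadPool::new(num_cpus::get());
    let mut executor = OCLStreamExecutor::new(pro_que, pool);
    executor.set_concurrency(0); // 0 = one task per CPU core

    // The closure runs once per concurrency level; each run gets its own task id.
    let mut stream = executor.execute(|ctx| {
        ctx.sender().send(ctx.task_id())?;
        Ok(())
    });

    // next() returns Err once all senders are dropped and the channel is drained.
    while let Ok(task_id) = stream.next() {
        println!("task {} reported back", task_id);
    }
}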

78
src/executor/ocl_stream.rs

@@ -0,0 +1,78 @@
use crossbeam_channel::{Receiver, Sender};

use crate::utils::result::{OCLStreamError, OCLStreamResult};

/// Creates a new OCLStream with the corresponding sender
/// to communicate between the scheduler thread and the receiver thread
pub fn create<T>() -> (OCLStream<T>, OCLStreamSender<T>)
where
    T: Send + Sync,
{
    let (tx, rx) = crossbeam_channel::unbounded();
    let stream = OCLStream { rx };
    let sender = OCLStreamSender { tx };

    (stream, sender)
}

/// Receiver for OCL Data
#[derive(Clone, Debug)]
pub struct OCLStream<T>
where
    T: Send + Sync,
{
    rx: Receiver<OCLStreamResult<T>>,
}

impl<T> OCLStream<T>
where
    T: Send + Sync,
{
    /// Reads the next value from the channel
    pub fn next(&mut self) -> Result<T, OCLStreamError> {
        self.rx.recv()?
    }

    /// Returns whether there is a value in the channel
    pub fn has_next(&self) -> bool {
        !self.rx.is_empty()
    }
}

/// Sender for OCL Data
pub struct OCLStreamSender<T>
where
    T: Send + Sync,
{
    tx: Sender<OCLStreamResult<T>>,
}

impl<T> Clone for OCLStreamSender<T>
where
    T: Send + Sync,
{
    fn clone(&self) -> Self {
        Self {
            tx: self.tx.clone(),
        }
    }
}

impl<T> OCLStreamSender<T>
where
    T: Send + Sync,
{
    /// Sends a value into the channel
    pub fn send(&self, value: T) -> OCLStreamResult<()> {
        self.tx
            .send(Ok(value))
            .map_err(|_| OCLStreamError::SendError)
    }

    /// Sends an error into the channel
    pub fn err(&self, err: OCLStreamError) -> OCLStreamResult<()> {
        self.tx
            .send(Err(err))
            .map_err(|_| OCLStreamError::SendError)
    }
}
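
The channel pair can also be exercised on its own; a small sketch (not part of this commit):

use ocl_stream::ocl_stream::create;

fn main() {
    let (mut stream, sender) = create::<u32>();

    // Producer side: normally driven by the executor's pool threads.
    sender.send(42).unwrap();
    drop(sender); // dropping every sender closes the stream

    // Consumer side: has_next() peeks, next() blocks until a value or error arrives.
    assert!(stream.has_next());
    assert_eq!(stream.next().unwrap(), 42);
    assert!(stream.next().is_err()); // channel closed -> RecvError
}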

67
src/lib.rs

@@ -0,0 +1,67 @@
pub mod executor;
pub mod utils;

pub use executor::ocl_stream;
pub use executor::OCLStreamExecutor;

#[cfg(test)]
mod tests {
    use crate::executor::OCLStreamExecutor;
    use ocl::ProQue;
    use scheduled_thread_pool::ScheduledThreadPool;

    #[test]
    fn it_streams_ocl_calculations() {
        let pro_que = ProQue::builder()
            .src(
                "\
                __kernel void bench_int(const uint limit, __global int *NUMBERS) {
                    uint id = get_global_id(0);
                    int num = NUMBERS[id];
                    for (int i = 0; i < limit; i++) {
                        num += i;
                    }
                    NUMBERS[id] = num;
                }",
            )
            .dims(1)
            .build()
            .unwrap();
        let pool = ScheduledThreadPool::new(num_cpus::get());
        let stream_executor = OCLStreamExecutor::new(pro_que, pool);

        let mut stream = stream_executor.execute(|ctx| {
            let pro_que = ctx.pro_que();
            let tx = ctx.sender();
            let input_buffer = pro_que.buffer_builder().len(100).fill_val(0u32).build()?;
            let kernel = pro_que
                .kernel_builder("bench_int")
                .arg(100)
                .arg(&input_buffer)
                .global_work_size(100)
                .build()?;
            unsafe {
                kernel.enq()?;
            }
            let mut result = vec![0u32; 100];
            input_buffer.read(&mut result).enq()?;

            for num in result {
                tx.send(num)?;
            }

            Ok(())
        });
        let mut count = 0;
        let num = (99f32.powf(2.0) + 99f32) / 2f32;

        while let Ok(n) = stream.next() {
            assert_eq!(n, num as u32);
            count += 1;
        }
        assert_eq!(count, 100)
    }
}
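
For reference, the value the test checks against is the sum each work item accumulates in the kernel loop: 0 + 1 + … + 99 = (99² + 99) / 2 = 4950, written in the test as (99f32.powf(2.0) + 99f32) / 2f32; all 100 work items are expected to stream that number back.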

1
src/utils/mod.rs

@@ -0,0 +1 @@
pub mod result;

36
src/utils/result.rs

@@ -0,0 +1,36 @@
use crossbeam_channel::RecvError;
use std::error::Error;
use std::fmt::{self, Display, Formatter};

pub type OCLStreamResult<T> = Result<T, OCLStreamError>;

#[derive(Debug)]
pub enum OCLStreamError {
    OCLError(ocl::Error),
    RecvError(RecvError),
    SendError,
}

impl Display for OCLStreamError {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        match self {
            OCLStreamError::OCLError(e) => write!(f, "OCL Error: {}", e),
            OCLStreamError::RecvError(e) => write!(f, "Stream Receive Error: {}", e),
            OCLStreamError::SendError => write!(f, "Stream Send Error"),
        }
    }
}

impl Error for OCLStreamError {}

impl From<ocl::Error> for OCLStreamError {
    fn from(e: ocl::Error) -> Self {
        Self::OCLError(e)
    }
}

impl From<RecvError> for OCLStreamError {
    fn from(e: RecvError) -> Self {
        Self::RecvError(e)
    }
}
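
A short sketch (not part of this commit) of how the From impls let errors propagate with `?` inside a function returning OCLStreamResult; the function name build_queue is illustrative.

use ocl::ProQue;
use ocl_stream::utils::result::OCLStreamResult;

fn build_queue(src: &str) -> OCLStreamResult<ProQue> {
    // An ocl::Error from build() is converted into OCLStreamError::OCLError by `?`.
    let pro_que = ProQue::builder().src(src).dims(1).build()?;
    Ok(pro_que)
}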