Add streaming executor

Signed-off-by: Trivernis <trivernis@protonmail.com>
pull/1/head
Trivernis 4 years ago
parent 8704bd387c
commit d89c574589

34
Cargo.lock generated

@ -515,6 +515,18 @@ dependencies = [
"num", "num",
] ]
[[package]]
name = "ocl-stream"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6571c0dc580e1603bdf23e277402b8dea73c0631de6a123b796da9a27681c960"
dependencies = [
"crossbeam-channel 0.5.0",
"num_cpus",
"ocl",
"thiserror",
]
[[package]] [[package]]
name = "parking_lot" name = "parking_lot"
version = "0.11.1" version = "0.11.1"
@ -665,8 +677,10 @@ checksum = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce"
name = "rust-opencl-demo" name = "rust-opencl-demo"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"crossbeam-channel 0.5.0",
"lazy_static", "lazy_static",
"ocl", "ocl",
"ocl-stream",
"parking_lot", "parking_lot",
"rayon", "rayon",
"structopt", "structopt",
@ -773,6 +787,26 @@ dependencies = [
"unicode-width", "unicode-width",
] ]
[[package]]
name = "thiserror"
version = "1.0.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76cc616c6abf8c8928e2fdcc0dbfab37175edd8fb49a4641066ad1364fdab146"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9be73a2caec27583d0046ef3796c3794f868a5bc813db689eed00c7631275cd1"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "unicode-segmentation" name = "unicode-segmentation"
version = "1.7.0" version = "1.7.0"

@ -12,3 +12,5 @@ structopt = "0.3.20"
lazy_static = "1.4.0" lazy_static = "1.4.0"
parking_lot = "0.11.1" parking_lot = "0.11.1"
rayon = "1.5.0" rayon = "1.5.0"
ocl-stream = "0.3.0"
crossbeam-channel = "0.5.0"

@ -0,0 +1,22 @@
use ocl::core::{get_event_profiling_info, wait_for_event, ProfilingInfo};
use ocl::{EventList, Kernel, ProQue};
use std::time::Duration;
pub mod result;
/// Runs a benchmark on the kernel
/// The ProQue needs to have profiling enabled
pub fn enqueue_profiled(pro_que: &ProQue, kernel: &Kernel) -> ocl::Result<Duration> {
let event_start = pro_que.queue().enqueue_marker::<EventList>(None)?;
unsafe {
kernel.enq()?;
}
let event_stop = pro_que.queue().enqueue_marker::<EventList>(None)?;
wait_for_event(&event_start)?;
wait_for_event(&event_stop)?;
let start = get_event_profiling_info(&event_start, ProfilingInfo::End)?;
let stop = get_event_profiling_info(&event_stop, ProfilingInfo::Start)?;
let gpu_calc_duration = Duration::from_nanos(stop.time()? - start.time()?);
Ok(gpu_calc_duration)
}

@ -0,0 +1,34 @@
use std::time::Duration;
/// Result of a benched kernel execution
#[derive(Clone, Debug)]
pub struct ProfiledResult<T>
where
T: Send + Sync + Clone,
{
gpu_duration: Duration,
value: T,
}
impl<T> ProfiledResult<T>
where
T: Send + Sync + Clone,
{
/// Creates a new profiled result with the given duraiton and value
pub fn new(gpu_duration: Duration, value: T) -> Self {
Self {
gpu_duration,
value,
}
}
/// Returns the execution duration on the gpu
pub fn gpu_duration(&self) -> &Duration {
&self.gpu_duration
}
/// Returns the value of the result
pub fn value(&self) -> &T {
&self.value
}
}

@ -7,25 +7,30 @@
use ocl::core::DeviceInfo; use ocl::core::DeviceInfo;
use ocl::enums::DeviceInfoResult; use ocl::enums::DeviceInfoResult;
use ocl::{CommandQueueProperties, ProQue}; use ocl::{CommandQueueProperties, ProQue};
use ocl_stream::OCLStreamExecutor;
pub mod bench; pub mod bench;
pub mod primes; pub mod primes;
pub mod primes_streamed;
#[derive(Clone)] #[derive(Clone)]
pub struct KernelController { pub struct KernelController {
pro_que: ProQue, pro_que: ProQue,
executor: OCLStreamExecutor,
} }
impl KernelController { impl KernelController {
pub fn new() -> ocl::Result<Self> { pub fn new() -> ocl::Result<Self> {
let pro_que = ProQue::builder() let pro_que = ProQue::builder()
.src(include_str!("kernel.cl")) .src(include_str!("kernel.cl"))
.dims(1 << 20) .dims(1) // won't be used as buffer sizes are declared explicitly
.queue_properties(CommandQueueProperties::PROFILING_ENABLE) .queue_properties(CommandQueueProperties::PROFILING_ENABLE)
.build()?; .build()?;
let mut executor = OCLStreamExecutor::new(pro_que.clone());
executor.set_concurrency(3);
println!("Using device {}", pro_que.device().name()?); println!("Using device {}", pro_que.device().name()?);
Ok(Self { pro_que }) Ok(Self { pro_que, executor })
} }
/// Prints information about the gpu capabilities /// Prints information about the gpu capabilities

@ -244,7 +244,7 @@ pub fn is_prime(number: u64) -> bool {
} }
#[inline] #[inline]
fn map_gpu_prime_result(input: Vec<u64>, output: Vec<u8>) -> Vec<u64> { pub fn map_gpu_prime_result(input: Vec<u64>, output: Vec<u8>) -> Vec<u64> {
input input
.into_iter() .into_iter()
.enumerate() .enumerate()

@ -0,0 +1,69 @@
use crate::benching::enqueue_profiled;
use crate::benching::result::ProfiledResult;
use crate::kernel_controller::primes::map_gpu_prime_result;
use crate::kernel_controller::KernelController;
use ocl::ProQue;
use ocl_stream::stream::OCLStream;
use ocl_stream::traits::ToOclBuffer;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
impl KernelController {
pub fn get_primes(
&self,
mut start: u64,
stop: u64,
step: usize,
local_size: usize,
) -> OCLStream<ProfiledResult<Vec<u64>>> {
if start % 2 == 0 {
start += 1;
}
let offset = Arc::new(AtomicU64::new(start));
self.executor.execute_bounded(step * 10, move |ctx| {
loop {
let pro_que = ctx.pro_que();
let sender = ctx.sender();
if offset.load(Ordering::SeqCst) >= stop {
break;
}
let offset = offset.fetch_add(step as u64 * 2, Ordering::SeqCst);
let numbers = (offset..(step as u64 * 2 + offset))
.step_by(2)
.collect::<Vec<u64>>();
let result = Self::filter_primes_streamed(pro_que, numbers, local_size)?;
sender.send(result)?;
}
Ok(())
})
}
/// Creates the prime filter kernel and executes it
fn filter_primes_streamed(
pro_que: &ProQue,
numbers: Vec<u64>,
local_size: usize,
) -> ocl::Result<ProfiledResult<Vec<u64>>> {
let output_buffer = pro_que
.buffer_builder()
.len(numbers.len())
.fill_val(0u8)
.build()?;
let input_buffer = numbers.to_ocl_buffer(pro_que)?;
let kernel = pro_que
.kernel_builder("check_prime")
.local_work_size(local_size)
.arg(&input_buffer)
.arg(&output_buffer)
.global_work_size(numbers.len())
.build()?;
let duration = enqueue_profiled(pro_que, &kernel)?;
let mut output = vec![0u8; output_buffer.len()];
output_buffer.read(&mut output).enq()?;
let primes = map_gpu_prime_result(numbers, output);
Ok(ProfiledResult::new(duration, primes))
}
}

@ -7,10 +7,13 @@
use crate::concurrency::executor::ConcurrentKernelExecutor; use crate::concurrency::executor::ConcurrentKernelExecutor;
use crate::kernel_controller::primes::is_prime; use crate::kernel_controller::primes::is_prime;
use crate::kernel_controller::KernelController; use crate::kernel_controller::KernelController;
use crate::output::csv::CSVWriter; use crate::output::create_prime_write_thread;
use crate::output::{create_csv_write_thread, create_prime_write_thread}; use crate::output::csv::ThreadedCSVWriter;
use crate::output::threaded::ThreadedWriter;
use ocl_stream::utils::result::OCLStreamResult;
use rayon::prelude::*; use rayon::prelude::*;
use std::fs::OpenOptions; use std::fs::{File, OpenOptions};
use std::io::BufWriter; use std::io::BufWriter;
use std::mem; use std::mem;
use std::path::PathBuf; use std::path::PathBuf;
@ -18,6 +21,7 @@ use std::sync::mpsc::channel;
use std::time::Duration; use std::time::Duration;
use structopt::StructOpt; use structopt::StructOpt;
mod benching;
mod concurrency; mod concurrency;
mod kernel_controller; mod kernel_controller;
mod output; mod output;
@ -76,8 +80,13 @@ struct CalculatePrimes {
#[structopt(long = "cpu-validate")] #[structopt(long = "cpu-validate")]
cpu_validate: bool, cpu_validate: bool,
/// number of used threads
#[structopt(short = "p", long = "parallel", default_value = "2")] #[structopt(short = "p", long = "parallel", default_value = "2")]
num_threads: usize, num_threads: usize,
/// if the result should be streamed
#[structopt(long = "streamed")]
streamed: bool,
} }
#[derive(StructOpt, Clone, Debug)] #[derive(StructOpt, Clone, Debug)]
@ -128,11 +137,60 @@ fn main() -> ocl::Result<()> {
match opts { match opts {
Opts::Info => controller.print_info(), Opts::Info => controller.print_info(),
Opts::CalculatePrimes(prime_opts) => calculate_primes(prime_opts, controller), Opts::CalculatePrimes(prime_opts) => {
if prime_opts.streamed {
calculate_primes_streamed(prime_opts, controller).unwrap();
Ok(())
} else {
calculate_primes(prime_opts, controller)
}
}
Opts::BenchmarkTaskCount(bench_opts) => bench_task_count(bench_opts, controller), Opts::BenchmarkTaskCount(bench_opts) => bench_task_count(bench_opts, controller),
} }
} }
fn calculate_primes_streamed(
prime_opts: CalculatePrimes,
controller: KernelController,
) -> OCLStreamResult<()> {
let csv_file = open_write_buffered(&prime_opts.timings_file);
let mut csv_writer = ThreadedCSVWriter::new(csv_file, &["first", "count", "gpu_duration"]);
let output_file = open_write_buffered(&prime_opts.output_file);
let output_writer = ThreadedWriter::new(output_file, |v: Vec<u64>| {
v.iter()
.map(|v| v.to_string())
.fold("".to_string(), |a, b| format!("{}\n{}", a, b))
.into_bytes()
});
let mut stream = controller.get_primes(
prime_opts.start_offset,
prime_opts.max_number,
prime_opts.numbers_per_step,
prime_opts.local_size.unwrap_or(128),
);
while let Ok(r) = stream.next() {
let primes = r.value();
let first = *primes.first().unwrap(); // if there's none, rip
println!(
"Calculated {} primes in {:?}, offset: {}",
primes.len(),
r.gpu_duration(),
first
);
csv_writer.add_row(vec![
first.to_string(),
primes.len().to_string(),
duration_to_ms_string(r.gpu_duration()),
]);
output_writer.write(primes.clone());
}
csv_writer.close();
output_writer.close();
Ok(())
}
/// Calculates Prime numbers with GPU acceleration /// Calculates Prime numbers with GPU acceleration
fn calculate_primes(prime_opts: CalculatePrimes, controller: KernelController) -> ocl::Result<()> { fn calculate_primes(prime_opts: CalculatePrimes, controller: KernelController) -> ocl::Result<()> {
let output = BufWriter::new( let output = BufWriter::new(
@ -150,14 +208,12 @@ fn calculate_primes(prime_opts: CalculatePrimes, controller: KernelController) -
.open(&prime_opts.timings_file) .open(&prime_opts.timings_file)
.unwrap(), .unwrap(),
); );
let timings = CSVWriter::new( let mut csv_writer = ThreadedCSVWriter::new(
timings, timings,
&["offset", "count", "gpu_duration", "filter_duration"], &["offset", "count", "gpu_duration", "filter_duration"],
) );
.unwrap();
let (prime_sender, prime_handle) = create_prime_write_thread(output); let (prime_sender, prime_handle) = create_prime_write_thread(output);
let (csv_sender, csv_handle) = create_csv_write_thread(timings);
let mut offset = prime_opts.start_offset; let mut offset = prime_opts.start_offset;
if offset % 2 == 0 { if offset % 2 == 0 {
@ -192,14 +248,12 @@ fn calculate_primes(prime_opts: CalculatePrimes, controller: KernelController) -
prime_opts.numbers_per_step as f64 / prime_result.gpu_duration.as_secs_f64(), prime_opts.numbers_per_step as f64 / prime_result.gpu_duration.as_secs_f64(),
offset, offset,
); );
csv_sender csv_writer.add_row(vec![
.send(vec![ offset.to_string(),
offset.to_string(), primes.len().to_string(),
primes.len().to_string(), duration_to_ms_string(&prime_result.gpu_duration),
duration_to_ms_string(&prime_result.gpu_duration), duration_to_ms_string(&prime_result.filter_duration),
duration_to_ms_string(&prime_result.filter_duration), ]);
])
.unwrap();
if prime_opts.cpu_validate { if prime_opts.cpu_validate {
validate_primes_on_cpu(&primes) validate_primes_on_cpu(&primes)
} }
@ -207,9 +261,8 @@ fn calculate_primes(prime_opts: CalculatePrimes, controller: KernelController) -
} }
mem::drop(prime_sender); mem::drop(prime_sender);
mem::drop(csv_sender);
prime_handle.join().unwrap(); prime_handle.join().unwrap();
csv_handle.join().unwrap(); csv_writer.close();
executor_thread.join().unwrap(); executor_thread.join().unwrap();
Ok(()) Ok(())
@ -224,7 +277,7 @@ fn bench_task_count(opts: BenchmarkTaskCount, controller: KernelController) -> o
.open(opts.benchmark_file) .open(opts.benchmark_file)
.unwrap(), .unwrap(),
); );
let csv_writer = CSVWriter::new( let mut csv_writer = ThreadedCSVWriter::new(
bench_writer, bench_writer,
&[ &[
"local_size", "local_size",
@ -234,9 +287,7 @@ fn bench_task_count(opts: BenchmarkTaskCount, controller: KernelController) -> o
"gpu_duration", "gpu_duration",
"read_duration", "read_duration",
], ],
) );
.unwrap();
let (bench_sender, bench_handle) = create_csv_write_thread(csv_writer);
for n in (opts.num_tasks_start..=opts.num_tasks_stop).step_by(opts.num_tasks_step) { for n in (opts.num_tasks_start..=opts.num_tasks_stop).step_by(opts.num_tasks_step) {
if let (Some(start), Some(stop)) = (opts.local_size_start, opts.local_size_stop) { if let (Some(start), Some(stop)) = (opts.local_size_start, opts.local_size_stop) {
for l in (start..=stop) for l in (start..=stop)
@ -248,16 +299,14 @@ fn bench_task_count(opts: BenchmarkTaskCount, controller: KernelController) -> o
stats.avg(controller.bench_int(opts.calculation_steps, n, Some(l))?) stats.avg(controller.bench_int(opts.calculation_steps, n, Some(l))?)
} }
println!("{}\n", stats); println!("{}\n", stats);
bench_sender csv_writer.add_row(vec![
.send(vec![ l.to_string(),
l.to_string(), n.to_string(),
n.to_string(), opts.calculation_steps.to_string(),
opts.calculation_steps.to_string(), duration_to_ms_string(&stats.write_duration),
duration_to_ms_string(&stats.write_duration), duration_to_ms_string(&stats.calc_duration),
duration_to_ms_string(&stats.calc_duration), duration_to_ms_string(&stats.read_duration),
duration_to_ms_string(&stats.read_duration), ])
])
.unwrap();
} }
} else { } else {
let mut stats = controller.bench_int(opts.calculation_steps, n, None)?; let mut stats = controller.bench_int(opts.calculation_steps, n, None)?;
@ -265,21 +314,17 @@ fn bench_task_count(opts: BenchmarkTaskCount, controller: KernelController) -> o
stats.avg(controller.bench_int(opts.calculation_steps, n, None)?) stats.avg(controller.bench_int(opts.calculation_steps, n, None)?)
} }
println!("{}\n", stats); println!("{}\n", stats);
bench_sender csv_writer.add_row(vec![
.send(vec![ "n/a".to_string(),
"n/a".to_string(), n.to_string(),
n.to_string(), opts.calculation_steps.to_string(),
opts.calculation_steps.to_string(), duration_to_ms_string(&stats.write_duration),
duration_to_ms_string(&stats.write_duration), duration_to_ms_string(&stats.calc_duration),
duration_to_ms_string(&stats.calc_duration), duration_to_ms_string(&stats.read_duration),
duration_to_ms_string(&stats.read_duration), ]);
])
.unwrap();
} }
} }
csv_writer.close();
mem::drop(bench_sender);
bench_handle.join().unwrap();
Ok(()) Ok(())
} }
@ -304,3 +349,16 @@ fn validate_primes_on_cpu(primes: &Vec<u64>) {
fn duration_to_ms_string(duration: &Duration) -> String { fn duration_to_ms_string(duration: &Duration) -> String {
format!("{}", duration.as_secs_f64() * 1000f64) format!("{}", duration.as_secs_f64() * 1000f64)
} }
/// opens a file in a buffered writer
/// if it already exists it will be recreated
fn open_write_buffered(path: &PathBuf) -> BufWriter<File> {
BufWriter::new(
OpenOptions::new()
.truncate(true)
.write(true)
.create(true)
.open(path)
.expect("Failed to open file!"),
)
}

@ -4,49 +4,51 @@
* See LICENSE for more information * See LICENSE for more information
*/ */
use crate::output::threaded::ThreadedWriter;
use std::collections::HashMap; use std::collections::HashMap;
use std::io::{Result, Write}; use std::io::Write;
pub struct CSVWriter<W: Write> { pub struct ThreadedCSVWriter {
inner: W, inner: ThreadedWriter<String>,
columns: Vec<String>, columns: Vec<String>,
} }
impl<W> CSVWriter<W> impl ThreadedCSVWriter {
where
W: Write,
{
/// Creates a new CSVWriter with a defined list of columns /// Creates a new CSVWriter with a defined list of columns
pub fn new(writer: W, columns: &[&str]) -> Result<Self> { pub fn new<W>(writer: W, columns: &[&str]) -> Self
where
W: Write + Send + Sync + 'static,
{
let column_vec = columns let column_vec = columns
.iter() .iter()
.map(|column| column.to_string()) .map(|column| column.to_string())
.collect::<Vec<String>>(); .collect::<Vec<String>>();
let writer = ThreadedWriter::new(writer, |v: String| v.as_bytes().to_vec());
let mut csv_writer = Self { let mut csv_writer = Self {
inner: writer, inner: writer,
columns: column_vec.clone(), columns: column_vec.clone(),
}; };
csv_writer.add_row(column_vec)?; csv_writer.add_row(column_vec);
Ok(csv_writer) csv_writer
} }
/// Adds a new row of values to the file /// Adds a new row of values to the file
pub fn add_row(&mut self, items: Vec<String>) -> Result<()> { pub fn add_row(&mut self, items: Vec<String>) {
self.inner.write_all( self.inner.write(
items items
.iter() .iter()
.fold("".to_string(), |a, b| format!("{},{}", a, b)) .fold("".to_string(), |a, b| format!("{},{}", a, b))
.trim_start_matches(',') .trim_start_matches(',')
.as_bytes(), .to_string()
)?; + "\n",
self.inner.write_all("\n".as_bytes()) );
} }
/// Adds a new row of values stored in a map to the file /// Adds a new row of values stored in a map to the file
#[allow(dead_code)] #[allow(dead_code)]
pub fn add_row_map(&mut self, item_map: &HashMap<String, String>) -> Result<()> { pub fn add_row_map(&mut self, item_map: &HashMap<String, String>) {
let mut items = Vec::new(); let mut items = Vec::new();
for key in &self.columns { for key in &self.columns {
items.push(item_map.get(key).cloned().unwrap_or("".to_string())); items.push(item_map.get(key).cloned().unwrap_or("".to_string()));
@ -55,7 +57,7 @@ where
self.add_row(items) self.add_row(items)
} }
pub fn flush(&mut self) -> Result<()> { pub fn close(self) {
self.inner.flush() self.inner.close()
} }
} }

@ -3,13 +3,13 @@
* Copyright (C) 2020 trivernis * Copyright (C) 2020 trivernis
* See LICENSE for more information * See LICENSE for more information
*/ */
use crate::output::csv::CSVWriter;
use std::fs::File; use std::fs::File;
use std::io::{BufWriter, Write}; use std::io::{BufWriter, Write};
use std::sync::mpsc::{channel, Sender}; use std::sync::mpsc::{channel, Sender};
use std::thread::{self, JoinHandle}; use std::thread::{self, JoinHandle};
pub mod csv; pub mod csv;
pub mod threaded;
pub fn create_prime_write_thread( pub fn create_prime_write_thread(
mut writer: BufWriter<File>, mut writer: BufWriter<File>,
@ -26,17 +26,3 @@ pub fn create_prime_write_thread(
(tx, handle) (tx, handle)
} }
pub fn create_csv_write_thread(
mut writer: CSVWriter<BufWriter<File>>,
) -> (Sender<Vec<String>>, JoinHandle<()>) {
let (tx, rx) = channel();
let handle = thread::spawn(move || {
for row in rx {
writer.add_row(row).unwrap();
}
writer.flush().unwrap();
});
(tx, handle)
}

@ -0,0 +1,45 @@
use crossbeam_channel::Sender;
use std::io::Write;
use std::mem;
use std::thread::{self, JoinHandle};
pub struct ThreadedWriter<T>
where
T: Send + Sync,
{
handle: JoinHandle<()>,
tx: Sender<T>,
}
impl<T> ThreadedWriter<T>
where
T: Send + Sync + 'static,
{
/// Creates a new threaded writer
pub fn new<W, F>(mut writer: W, serializer: F) -> Self
where
F: Fn(T) -> Vec<u8> + Send + Sync + 'static,
W: Write + Send + Sync + 'static,
{
let (tx, rx) = crossbeam_channel::bounded(1024);
let handle = thread::spawn(move || {
for value in rx {
let mut bytes = serializer(value);
writer.write_all(&mut bytes[..]).unwrap();
writer.flush().unwrap();
}
});
Self { handle, tx }
}
/// Writes a value
pub fn write(&self, value: T) {
self.tx.send(value).unwrap();
}
/// Closes the channel to the writer and waits for the writer thread to stop
pub fn close(self) {
mem::drop(self.tx);
self.handle.join().unwrap();
}
}
Loading…
Cancel
Save