diff --git a/Cargo.lock b/Cargo.lock index 4ece67e..bb8cd8b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -517,13 +517,14 @@ dependencies = [ [[package]] name = "ocl-stream" -version = "0.3.0" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6571c0dc580e1603bdf23e277402b8dea73c0631de6a123b796da9a27681c960" +checksum = "2cc003c0e91a8daaa706bd4231a05080d18346c97dc051955cce45de60a54ac7" dependencies = [ "crossbeam-channel 0.5.0", "num_cpus", "ocl", + "parking_lot", "thiserror", ] diff --git a/Cargo.toml b/Cargo.toml index 0db18a9..7ee7240 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,5 +12,5 @@ structopt = "0.3.20" lazy_static = "1.4.0" parking_lot = "0.11.1" rayon = "1.5.0" -ocl-stream = "0.3.0" +ocl-stream = "0.3.4" crossbeam-channel = "0.5.0" \ No newline at end of file diff --git a/src/kernel_controller/bench.rs b/src/kernel_controller/bench.rs index 4a63ad0..a1e18fd 100644 --- a/src/kernel_controller/bench.rs +++ b/src/kernel_controller/bench.rs @@ -4,14 +4,22 @@ * See LICENSE for more information */ +use crate::benching::enqueue_profiled; use crate::kernel_controller::KernelController; +use ocl_stream::executor::context::ExecutorContext; +use ocl_stream::executor::stream::OCLStream; +use ocl_stream::traits::*; +use ocl_stream::utils::result::OCLStreamResult; +use ocl_stream::utils::shared_buffer::SharedBuffer; use std::fmt::{self, Display, Formatter}; +use std::ops::Deref; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::time::{Duration, Instant}; pub struct BenchStatistics { pub calc_count: u32, - pub num_tasks: usize, - pub local_size: Option, + pub global_size: usize, + pub local_size: usize, pub write_duration: Duration, pub calc_duration: Duration, pub read_duration: Duration, @@ -23,8 +31,8 @@ impl Display for BenchStatistics { f, "Calculation Count: {}\nTask Count: {}\nLocal Size: {}\nWrite Duration: {} ms\nGPU Duration: {} ms\nRead Duration: {} ms", self.calc_count, - self.num_tasks, - self.local_size.map(|v|v.to_string()).unwrap_or("n/a".to_string()), + self.global_size, + self.local_size, self.write_duration.as_secs_f64() * 1000f64, self.calc_duration.as_secs_f64() * 1000f64, self.read_duration.as_secs_f64() * 1000f64 @@ -32,55 +40,108 @@ impl Display for BenchStatistics { } } -impl BenchStatistics { - pub fn avg(&mut self, other: Self) { - self.read_duration = (self.read_duration + other.read_duration) / 2; - self.write_duration = (self.write_duration + other.write_duration) / 2; - self.calc_duration = (self.calc_duration + other.calc_duration) / 2; +impl KernelController { + /// Benchmarks the value for the global size + pub fn bench_global_size( + &self, + local_size: usize, + global_size_start: usize, + global_size_step: usize, + global_size_stop: usize, + calc_count: u32, + repetitions: usize, + ) -> OCLStreamResult> { + let global_size = AtomicUsize::new(global_size_start); + + let stream = self.executor.execute_bounded(global_size_stop, move |ctx| { + loop { + if global_size.load(Ordering::SeqCst) > global_size_stop { + break; + } + let global_size = global_size.fetch_add(global_size_step, Ordering::SeqCst); + if global_size % local_size != 0 { + continue; + } + let input_buffer: SharedBuffer = + vec![0u32; global_size].to_shared_buffer(ctx.pro_que())?; + + for _ in 0..repetitions { + let stats = + Self::bench_int(&ctx, local_size, calc_count, input_buffer.clone())?; + ctx.sender().send(stats)?; + } + } + Ok(()) + }); + + Ok(stream) } -} -impl KernelController { - /// Benches an integer - pub fn bench_int( + /// Benchmarks the value for the local size + pub fn bench_local_size( &self, + global_size: usize, + local_size_start: usize, + local_size_step: usize, + local_size_stop: usize, calc_count: u32, - num_tasks: usize, - local_size: Option, + repetitions: usize, + ) -> OCLStreamResult> { + let input_buffer: SharedBuffer = + vec![0u32; global_size].to_shared_buffer(self.executor.pro_que())?; + let local_size = AtomicUsize::new(local_size_start); + + let stream = self.executor.execute_bounded(global_size, move |ctx| { + loop { + if local_size.load(Ordering::SeqCst) > local_size_stop { + break; + } + let local_size = local_size.fetch_add(local_size_step, Ordering::SeqCst); + if local_size > 1024 || global_size % local_size != 0 { + continue; + } + + for _ in 0..repetitions { + let stats = + Self::bench_int(&ctx, local_size, calc_count, input_buffer.clone())?; + ctx.sender().send(stats)?; + } + } + Ok(()) + }); + + Ok(stream) + } + + /// Benches an integer + fn bench_int( + ctx: &ExecutorContext, + local_size: usize, + calc_count: u32, + input_buffer: SharedBuffer, ) -> ocl::Result { + let num_tasks = input_buffer.inner().lock().len(); let write_start = Instant::now(); - let input_buffer = self - .pro_que - .buffer_builder() - .len(num_tasks) - .fill_val(0u32) - .build()?; let write_duration = write_start.elapsed(); - let mut builder = self.pro_que.kernel_builder("bench_int"); - - if let Some(local_size) = local_size { - builder.local_work_size(local_size); - } - - let kernel = builder - .arg(calc_count) - .arg(&input_buffer) + let kernel = ctx + .pro_que() + .kernel_builder("bench_int") + .local_work_size(local_size) .global_work_size(num_tasks) + .arg(calc_count) + .arg(input_buffer.inner().lock().deref()) .build()?; - let calc_start = Instant::now(); - unsafe { - kernel.enq()?; - } - self.pro_que.finish()?; - let calc_duration = calc_start.elapsed(); + + let calc_duration = enqueue_profiled(ctx.pro_que(), &kernel)?; + let mut output = vec![0u32; num_tasks]; let read_start = Instant::now(); - input_buffer.read(&mut output).enq()?; + input_buffer.read(&mut output)?; let read_duration = read_start.elapsed(); Ok(BenchStatistics { - num_tasks, + global_size: num_tasks, calc_count, local_size, read_duration, diff --git a/src/kernel_controller/primes_streamed.rs b/src/kernel_controller/primes_streamed.rs index 0a393b8..249c0c8 100644 --- a/src/kernel_controller/primes_streamed.rs +++ b/src/kernel_controller/primes_streamed.rs @@ -68,6 +68,7 @@ impl KernelController { .fill_val(0u8) .build()?; let input_buffer = numbers.to_ocl_buffer(pro_que)?; + let kernel = pro_que .kernel_builder("check_prime") .local_work_size(local_size) diff --git a/src/main.rs b/src/main.rs index 1f157cb..245766c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -11,7 +11,9 @@ use crate::output::create_prime_write_thread; use crate::output::csv::ThreadedCSVWriter; use crate::output::threaded::ThreadedWriter; -use ocl_stream::utils::result::OCLStreamResult; +use crate::kernel_controller::bench::BenchStatistics; +use ocl_stream::stream::OCLStream; +use ocl_stream::utils::result::{OCLStreamError, OCLStreamResult}; use rayon::prelude::*; use std::fs::{File, OpenOptions}; use std::io::BufWriter; @@ -33,9 +35,13 @@ enum Opts { #[structopt(name = "calculate-primes")] CalculatePrimes(CalculatePrimes), - /// Benchmarks the number of tasks used for the calculations - #[structopt(name = "bench-task-count")] - BenchmarkTaskCount(BenchmarkTaskCount), + /// Benchmarks the local size value + #[structopt(name = "bench-local-size")] + BenchLocalSize(BenchLocalSize), + + /// Benchmarks the global size (number of tasks) value + #[structopt(name = "bench-global-size")] + BenchGlobalSize(BenchGlobalSize), /// Prints GPU information Info, @@ -90,62 +96,82 @@ struct CalculatePrimes { } #[derive(StructOpt, Clone, Debug)] -struct BenchmarkTaskCount { - /// How many calculations steps should be done per GPU thread - #[structopt(long = "calculation-steps", default_value = "1000000")] - calculation_steps: u32, - - /// The initial number of tasks for the benchmark - #[structopt(long = "num-tasks-start", default_value = "1")] - num_tasks_start: usize, +struct BenchLocalSize { + #[structopt(flatten)] + bench_options: BenchOptions, /// The initial number for the local size - #[structopt(long = "local-size-start")] - local_size_start: Option, + #[structopt(long = "local-size-start", default_value = "4")] + local_size_start: usize, /// The amount the local size increases by every step - #[structopt(long = "local-size-step", default_value = "10")] + #[structopt(long = "local-size-step", default_value = "4")] local_size_step: usize, /// The maximum amount of the local size /// Can't be greater than the maximum local size of the gpu /// that can be retrieved with the info command - #[structopt(long = "local-size-stop")] - local_size_stop: Option, + #[structopt(long = "local-size-stop", default_value = "1024")] + local_size_stop: usize, /// The maximum number of tasks for the benchmark - #[structopt(long = "num-tasks-stop", default_value = "10000000")] - num_tasks_stop: usize, + #[structopt(long = "global-size", default_value = "6144")] + global_size: usize, +} + +#[derive(StructOpt, Clone, Debug)] +pub struct BenchGlobalSize { + #[structopt(flatten)] + options: BenchOptions, - /// The amount the task number increases per step - #[structopt(long = "num-tasks-step", default_value = "10")] - num_tasks_step: usize, + /// The start value for the used global size + #[structopt(long = "global-size-start", default_value = "1024")] + global_size_start: usize, - /// The average of n runs that is used instead of using one value only. - /// By default the benchmark for each step is only run once - #[structopt(long = "average-of", default_value = "1")] - average_of: usize, + /// The step value for the used global size + #[structopt(long = "global-size-step", default_value = "128")] + global_size_step: usize, + + /// The stop value for the used global size + #[structopt(long = "global-size-stop", default_value = "1048576")] + global_size_stop: usize, + + /// The maximum number of tasks for the benchmark + #[structopt(long = "local-size", default_value = "128")] + local_size: usize, +} + +#[derive(StructOpt, Clone, Debug)] +pub struct BenchOptions { + /// How many calculations steps should be done per GPU thread + #[structopt(short = "n", long = "calculation-steps", default_value = "1000000")] + calculation_steps: u32, /// The output file for timings - #[structopt(long = "bench-output", default_value = "bench.csv")] + #[structopt(short = "o", long = "bench-output", default_value = "bench.csv")] benchmark_file: PathBuf, + + /// The average of n runs that is used instead of using one value only. + /// By default the benchmark for each step is only run once + #[structopt(short = "r", long = "repetitions", default_value = "1")] + repetitions: usize, } -fn main() -> ocl::Result<()> { +fn main() -> OCLStreamResult<()> { let opts: Opts = Opts::from_args(); let controller = KernelController::new()?; match opts { - Opts::Info => controller.print_info(), + Opts::Info => controller.print_info().map_err(OCLStreamError::from), Opts::CalculatePrimes(prime_opts) => { if prime_opts.streamed { - calculate_primes_streamed(prime_opts, controller).unwrap(); - Ok(()) + calculate_primes_streamed(prime_opts, controller) } else { - calculate_primes(prime_opts, controller) + calculate_primes(prime_opts, controller).map_err(OCLStreamError::from) } } - Opts::BenchmarkTaskCount(bench_opts) => bench_task_count(bench_opts, controller), + Opts::BenchGlobalSize(bench_opts) => bench_global_size(bench_opts, controller), + Opts::BenchLocalSize(bench_opts) => bench_local_size(bench_opts, controller), } } @@ -275,65 +301,86 @@ fn calculate_primes(prime_opts: CalculatePrimes, controller: KernelController) - Ok(()) } -fn bench_task_count(opts: BenchmarkTaskCount, controller: KernelController) -> ocl::Result<()> { - let bench_writer = BufWriter::new( - OpenOptions::new() - .truncate(true) - .write(true) - .create(true) - .open(opts.benchmark_file) - .unwrap(), +/// Benchmarks the local size used for calculations +fn bench_local_size(opts: BenchLocalSize, controller: KernelController) -> OCLStreamResult<()> { + let bench_writer = open_write_buffered(&opts.bench_options.benchmark_file); + let csv_writer = ThreadedCSVWriter::new( + bench_writer, + &[ + "local_size", + "global_size", + "calc_count", + "write_duration", + "gpu_duration", + "read_duration", + ], ); - let mut csv_writer = ThreadedCSVWriter::new( + let stream = controller.bench_local_size( + opts.global_size, + opts.local_size_start, + opts.local_size_step, + opts.local_size_stop, + opts.bench_options.calculation_steps, + opts.bench_options.repetitions, + )?; + read_bench_results(opts.bench_options.calculation_steps, csv_writer, stream); + + Ok(()) +} + +/// Benchmarks the global size used for calculations +fn bench_global_size(opts: BenchGlobalSize, controller: KernelController) -> OCLStreamResult<()> { + let bench_writer = open_write_buffered(&opts.options.benchmark_file); + let csv_writer = ThreadedCSVWriter::new( bench_writer, &[ "local_size", - "num_tasks", + "global_size", "calc_count", "write_duration", "gpu_duration", "read_duration", ], ); - for n in (opts.num_tasks_start..=opts.num_tasks_stop).step_by(opts.num_tasks_step) { - if let (Some(start), Some(stop)) = (opts.local_size_start, opts.local_size_stop) { - for l in (start..=stop) - .step_by(opts.local_size_step) - .filter(|v| n % v == 0) - { - let mut stats = controller.bench_int(opts.calculation_steps, n, Some(l))?; - for _ in 1..opts.average_of { - stats.avg(controller.bench_int(opts.calculation_steps, n, Some(l))?) - } + let stream = controller.bench_global_size( + opts.local_size, + opts.global_size_start, + opts.global_size_step, + opts.global_size_stop, + opts.options.calculation_steps, + opts.options.repetitions, + )?; + read_bench_results(opts.options.calculation_steps, csv_writer, stream); + + Ok(()) +} + +/// Reads benchmark results from the stream and prints +/// them to the console +fn read_bench_results( + calculation_steps: u32, + mut csv_writer: ThreadedCSVWriter, + mut stream: OCLStream, +) { + loop { + match stream.next() { + Ok(stats) => { println!("{}\n", stats); csv_writer.add_row(vec![ - l.to_string(), - n.to_string(), - opts.calculation_steps.to_string(), + stats.local_size.to_string(), + stats.global_size.to_string(), + calculation_steps.to_string(), duration_to_ms_string(&stats.write_duration), duration_to_ms_string(&stats.calc_duration), duration_to_ms_string(&stats.read_duration), ]) } - } else { - let mut stats = controller.bench_int(opts.calculation_steps, n, None)?; - for _ in 1..opts.average_of { - stats.avg(controller.bench_int(opts.calculation_steps, n, None)?) + _ => { + break; } - println!("{}\n", stats); - csv_writer.add_row(vec![ - "n/a".to_string(), - n.to_string(), - opts.calculation_steps.to_string(), - duration_to_ms_string(&stats.write_duration), - duration_to_ms_string(&stats.calc_duration), - duration_to_ms_string(&stats.read_duration), - ]); } } csv_writer.close(); - - Ok(()) } fn validate_primes_on_cpu(primes: &Vec) {