diff --git a/Cargo.lock b/Cargo.lock index bb8cd8b..a2d3ca4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -73,6 +73,19 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "chrono" +version = "0.4.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "670ad68c9088c2a963aaa298cb369688cf3f9465ce5e2d4ca10e6e0098a1ce73" +dependencies = [ + "libc", + "num-integer", + "num-traits 0.2.14", + "time", + "winapi", +] + [[package]] name = "cl-sys" version = "0.4.2" @@ -106,6 +119,32 @@ dependencies = [ "bitflags", ] +[[package]] +name = "colored" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3616f750b84d8f0de8a58bda93e08e2a81ad3f523089b05f1dffecab48c6cbd" +dependencies = [ + "atty", + "lazy_static", + "winapi", +] + +[[package]] +name = "console" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7cc80946b3480f421c2f17ed1cb841753a371c7c5104f51d507e13f532c856aa" +dependencies = [ + "encode_unicode", + "lazy_static", + "libc", + "regex", + "terminal_size", + "unicode-width", + "winapi", +] + [[package]] name = "const_fn" version = "0.4.3" @@ -236,6 +275,12 @@ version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457" +[[package]] +name = "encode_unicode" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f" + [[package]] name = "enum_primitive" version = "0.1.1" @@ -267,6 +312,15 @@ dependencies = [ "synstructure", ] +[[package]] +name = "fern" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c9a4820f0ccc8a7afd67c39a0f1a0f4b07ca1725164271a64939d7aeb9af065" +dependencies = [ + "log", +] + [[package]] name = "fuchsia-cprng" version = "0.1.1" @@ -303,6 +357,18 @@ dependencies = [ "libc", ] +[[package]] +name = "indicatif" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7baab56125e25686df467fe470785512329883aab42696d661247aca2a2896e4" +dependencies = [ + "console", + "lazy_static", + "number_prefix", + "regex", +] + [[package]] name = "instant" version = "0.1.9" @@ -333,6 +399,15 @@ dependencies = [ "scopeguard", ] +[[package]] +name = "log" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcf3805d4480bb5b86070dcfeb9e2cb2ebc148adb753c5cca5f884d1d65a42b2" +dependencies = [ + "cfg-if 0.1.10", +] + [[package]] name = "maybe-uninit" version = "2.0.0" @@ -470,6 +545,12 @@ dependencies = [ "libc", ] +[[package]] +name = "number_prefix" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17b02fc0ff9a9e4b35b3342880f48e896ebf69f2967921fe8646bf5b7125956a" + [[package]] name = "object" version = "0.22.0" @@ -517,11 +598,12 @@ dependencies = [ [[package]] name = "ocl-stream" -version = "0.3.4" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2cc003c0e91a8daaa706bd4231a05080d18346c97dc051955cce45de60a54ac7" +checksum = "7804102f4531c792a44904e2f93b7378397fdf73e552205c9ab404ab6d1bba8b" dependencies = [ "crossbeam-channel 0.5.0", + "log", "num_cpus", "ocl", "parking_lot", @@ -674,12 +756,32 @@ version = "0.1.57" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce" +[[package]] +name = "regex" +version = "1.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9251239e129e16308e70d853559389de218ac275b515068abc96829d05b948a" +dependencies = [ + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.6.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5eb417147ba9860a96cfe72a0b93bf88fee1744b5636ec99ab20c1aa9376581" + [[package]] name = "rust-opencl-demo" version = "0.1.0" dependencies = [ + "chrono", + "colored", "crossbeam-channel 0.5.0", + "fern", + "indicatif", "lazy_static", + "log", "ocl", "ocl-stream", "parking_lot", @@ -779,6 +881,16 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "terminal_size" +version = "0.1.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bd2d183bd3fac5f5fe38ddbeb4dc9aec4a39a9d7d59e7491d900302da01cbe1" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "textwrap" version = "0.11.0" @@ -808,6 +920,17 @@ dependencies = [ "syn", ] +[[package]] +name = "time" +version = "0.1.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6db9e6914ab8b1ae1c260a4ae7a49b6c5611b40328a735b21862567685e73255" +dependencies = [ + "libc", + "wasi", + "winapi", +] + [[package]] name = "unicode-segmentation" version = "1.7.0" @@ -838,6 +961,12 @@ version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5a972e5669d67ba988ce3dc826706fb0a8b01471c088cb0b6110b805cc36aed" +[[package]] +name = "wasi" +version = "0.10.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f" + [[package]] name = "winapi" version = "0.3.9" diff --git a/Cargo.toml b/Cargo.toml index 7ee7240..18a3a29 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,5 +12,10 @@ structopt = "0.3.20" lazy_static = "1.4.0" parking_lot = "0.11.1" rayon = "1.5.0" -ocl-stream = "0.3.4" -crossbeam-channel = "0.5.0" \ No newline at end of file +ocl-stream = "0.3.5" +crossbeam-channel = "0.5.0" +log = "0.4.13" +fern = "0.6.0" +colored = "2.0.0" +chrono = "0.4.19" +indicatif = "0.15.0" \ No newline at end of file diff --git a/src/benching/mod.rs b/src/benching/mod.rs index 53a2e05..6c63f5d 100644 --- a/src/benching/mod.rs +++ b/src/benching/mod.rs @@ -1,3 +1,9 @@ +/* + * opencl demos with rust + * Copyright (C) 2021 trivernis + * See LICENSE for more information + */ + use ocl::core::{get_event_profiling_info, wait_for_event, ProfilingInfo}; use ocl::{EventList, Kernel, ProQue}; use std::time::Duration; @@ -7,16 +13,28 @@ pub mod result; /// Runs a benchmark on the kernel /// The ProQue needs to have profiling enabled pub fn enqueue_profiled(pro_que: &ProQue, kernel: &Kernel) -> ocl::Result { + log::trace!("Running kernel with profiling"); + log::trace!("Enqueueing start event"); let event_start = pro_que.queue().enqueue_marker::(None)?; + log::trace!("Enqueueing Kernel"); + unsafe { kernel.enq()?; } + log::trace!("Enqueueing stop event"); let event_stop = pro_que.queue().enqueue_marker::(None)?; + + log::trace!("Waiting for start event"); wait_for_event(&event_start)?; + log::trace!("Waiting for stop event"); wait_for_event(&event_stop)?; let start = get_event_profiling_info(&event_start, ProfilingInfo::End)?; let stop = get_event_profiling_info(&event_stop, ProfilingInfo::Start)?; let gpu_calc_duration = Duration::from_nanos(stop.time()? - start.time()?); + log::trace!( + "Elapsed time between start and stop: {:?}", + gpu_calc_duration + ); Ok(gpu_calc_duration) } diff --git a/src/benching/result.rs b/src/benching/result.rs index 9394e36..30007a4 100644 --- a/src/benching/result.rs +++ b/src/benching/result.rs @@ -1,3 +1,9 @@ +/* + * opencl demos with rust + * Copyright (C) 2021 trivernis + * See LICENSE for more information + */ + use std::time::Duration; /// Result of a benched kernel execution diff --git a/src/concurrency/executor.rs b/src/concurrency/executor.rs deleted file mode 100644 index 49df147..0000000 --- a/src/concurrency/executor.rs +++ /dev/null @@ -1,86 +0,0 @@ -use crate::kernel_controller::primes::PrimeCalculationResult; -use crate::kernel_controller::KernelController; -use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; -use std::sync::mpsc::Sender; -use std::sync::Arc; -use std::thread::Builder as ThreadBuilder; - -pub struct ConcurrentKernelExecutor { - kernel_controller: KernelController, -} - -impl ConcurrentKernelExecutor { - pub fn new(kernel_controller: KernelController) -> Self { - Self { kernel_controller } - } - - pub fn calculate_primes( - &self, - mut offset: u64, - numbers_per_step: usize, - local_size: Option, - stop: u64, - no_cache: bool, - num_threads: usize, - sender: Sender, - ) { - let mut handles = Vec::new(); - if offset % 2 == 0 { - offset += 1; - } - let offset = Arc::new(AtomicU64::new(offset)); - let panic = Arc::new(AtomicBool::new(false)); - - for i in 0..num_threads { - let sender = Sender::clone(&sender); - let controller = self.kernel_controller.clone(); - let offset = Arc::clone(&offset); - let panic = Arc::clone(&panic); - let local_size = local_size.clone(); - - handles.push( - ThreadBuilder::new() - .name(format!("executor-{}", i)) - .spawn(move || loop { - if panic.load(Ordering::Relaxed) { - panic!("Planned panic"); - } - if offset.load(Ordering::SeqCst) >= stop { - break; - } - let offset = - offset.fetch_add(numbers_per_step as u64 * 2, Ordering::SeqCst); - - let numbers = (offset..(numbers_per_step as u64 * 2 + offset)) - .step_by(2) - .collect::>(); - let prime_result = if no_cache { - controller - .filter_primes_simple(numbers, local_size.clone()) - .map_err(|e| { - panic.store(true, Ordering::Relaxed); - e - }) - .unwrap() - } else { - controller - .filter_primes(numbers, local_size.clone()) - .map_err(|e| { - panic.store(true, Ordering::Relaxed); - e - }) - .unwrap() - }; - if let Err(e) = sender.send(prime_result) { - panic.store(true, Ordering::Relaxed); - panic!(e); - } - }) - .unwrap(), - ); - } - for handle in handles { - handle.join().unwrap(); - } - } -} diff --git a/src/concurrency/mod.rs b/src/concurrency/mod.rs deleted file mode 100644 index 0c95fda..0000000 --- a/src/concurrency/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod executor; diff --git a/src/kernel_controller/bench.rs b/src/kernel_controller/bench.rs index a1e18fd..9e9b417 100644 --- a/src/kernel_controller/bench.rs +++ b/src/kernel_controller/bench.rs @@ -1,21 +1,25 @@ /* * opencl demos with rust - * Copyright (C) 2020 trivernis + * Copyright (C) 2021 trivernis * See LICENSE for more information */ -use crate::benching::enqueue_profiled; -use crate::kernel_controller::KernelController; +use std::fmt::{self, Display, Formatter}; +use std::ops::Deref; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::time::{Duration, Instant}; + use ocl_stream::executor::context::ExecutorContext; use ocl_stream::executor::stream::OCLStream; use ocl_stream::traits::*; use ocl_stream::utils::result::OCLStreamResult; use ocl_stream::utils::shared_buffer::SharedBuffer; -use std::fmt::{self, Display, Formatter}; -use std::ops::Deref; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::time::{Duration, Instant}; +use crate::benching::enqueue_profiled; +use crate::kernel_controller::KernelController; +use crate::utils::progress::get_progress_bar; + +#[derive(Clone, Debug)] pub struct BenchStatistics { pub calc_count: u32, pub global_size: usize, @@ -51,24 +55,39 @@ impl KernelController { calc_count: u32, repetitions: usize, ) -> OCLStreamResult> { + log::debug!("Benchmarking global size. Global Size: {}, Start: {}, Step: {} ,Stop: {}, Calculations: {}, Repetitions: {}", + local_size, global_size_start, global_size_step, global_size_stop, calc_count, repetitions); let global_size = AtomicUsize::new(global_size_start); + let pb = get_progress_bar( + ((global_size_stop - global_size_start) / global_size_step) as u64 * repetitions as u64, + ); let stream = self.executor.execute_bounded(global_size_stop, move |ctx| { loop { if global_size.load(Ordering::SeqCst) > global_size_stop { + log::trace!("Stop reached"); break; } let global_size = global_size.fetch_add(global_size_step, Ordering::SeqCst); + if global_size % local_size != 0 { + log::trace!("Global size not divisible by local size. Continuing"); + pb.inc(repetitions as u64); continue; } let input_buffer: SharedBuffer = vec![0u32; global_size].to_shared_buffer(ctx.pro_que())?; + log::trace!( + "Benching global size {} with {} repetitions", + global_size, + repetitions + ); for _ in 0..repetitions { let stats = Self::bench_int(&ctx, local_size, calc_count, input_buffer.clone())?; ctx.sender().send(stats)?; + pb.inc(1); } } Ok(()) @@ -87,24 +106,40 @@ impl KernelController { calc_count: u32, repetitions: usize, ) -> OCLStreamResult> { + log::debug!("Benchmarking local size. Global Size: {}, Start: {}, Step: {} ,Stop: {}, Calculations: {}, Repetitions: {}", + global_size, local_size_start, local_size_step, local_size_stop, calc_count, repetitions); + let input_buffer: SharedBuffer = vec![0u32; global_size].to_shared_buffer(self.executor.pro_que())?; let local_size = AtomicUsize::new(local_size_start); + let pb = get_progress_bar( + ((local_size_stop - local_size_start) / local_size_step) as u64 * repetitions as u64, + ); let stream = self.executor.execute_bounded(global_size, move |ctx| { loop { if local_size.load(Ordering::SeqCst) > local_size_stop { + log::trace!("Stop Reached"); break; } let local_size = local_size.fetch_add(local_size_step, Ordering::SeqCst); + if local_size > 1024 || global_size % local_size != 0 { + log::trace!("Global size not divisible by local size. Continuing"); + pb.inc(repetitions as u64); continue; } + log::trace!( + "Benching local size {} with {} repetitions", + local_size, + repetitions + ); for _ in 0..repetitions { let stats = Self::bench_int(&ctx, local_size, calc_count, input_buffer.clone())?; ctx.sender().send(stats)?; + pb.inc(1); } } Ok(()) @@ -121,9 +156,8 @@ impl KernelController { input_buffer: SharedBuffer, ) -> ocl::Result { let num_tasks = input_buffer.inner().lock().len(); - let write_start = Instant::now(); - let write_duration = write_start.elapsed(); + log::trace!("Building kernel"); let kernel = ctx .pro_que() .kernel_builder("bench_int") @@ -135,6 +169,7 @@ impl KernelController { let calc_duration = enqueue_profiled(ctx.pro_que(), &kernel)?; + log::trace!("Reading output"); let mut output = vec![0u32; num_tasks]; let read_start = Instant::now(); input_buffer.read(&mut output)?; @@ -146,7 +181,7 @@ impl KernelController { local_size, read_duration, calc_duration, - write_duration, + write_duration: Duration::from_nanos(0), }) } } diff --git a/src/kernel_controller/mod.rs b/src/kernel_controller/mod.rs index 43d5409..983d8e0 100644 --- a/src/kernel_controller/mod.rs +++ b/src/kernel_controller/mod.rs @@ -1,6 +1,6 @@ /* * opencl demos with rust - * Copyright (C) 2020 trivernis + * Copyright (C) 2021 trivernis * See LICENSE for more information */ @@ -11,7 +11,6 @@ use ocl_stream::OCLStreamExecutor; pub mod bench; pub mod primes; -pub mod primes_streamed; #[derive(Clone)] pub struct KernelController { @@ -67,6 +66,7 @@ impl KernelController { Ok(()) } + #[allow(dead_code)] fn available_memory(&self) -> ocl::Result { match self.pro_que.device().info(DeviceInfo::GlobalMemSize)? { DeviceInfoResult::GlobalMemSize(size) => Ok(size), diff --git a/src/kernel_controller/primes.rs b/src/kernel_controller/primes.rs index 44322f3..8dd6fd0 100644 --- a/src/kernel_controller/primes.rs +++ b/src/kernel_controller/primes.rs @@ -1,189 +1,172 @@ /* * opencl demos with rust - * Copyright (C) 2020 trivernis + * Copyright (C) 2021 trivernis * See LICENSE for more information */ +use crate::benching::enqueue_profiled; +use crate::benching::result::ProfiledResult; use crate::kernel_controller::KernelController; -use ocl::core::{get_event_profiling_info, wait_for_event, ProfilingInfo}; -use ocl::EventList; +use crate::utils::progress::get_progress_bar; +use ocl::ProQue; +use ocl_stream::stream::OCLStream; +use ocl_stream::traits::ToOclBuffer; use parking_lot::Mutex; use std::mem::size_of; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Instant; -pub struct PrimeCalculationResult { - pub primes: Vec, - pub gpu_duration: Duration, - pub filter_duration: Duration, -} +const MEMORY_LIMIT: u64 = 4 * 1024 * 1024 * 1024; impl KernelController { - /// Filters all primes from the input without using a precalculated list of primes - /// for divisibility checks - pub fn filter_primes_simple( + pub fn calculate_primes( &self, - input: Vec, - local_size: Option, - ) -> ocl::Result { - let input_buffer = self.pro_que.buffer_builder().len(input.len()).build()?; - input_buffer.write(&input[..]).enq()?; - - let output_buffer = self - .pro_que - .buffer_builder() - .len(input.len()) - .fill_val(0u8) - .build()?; - - let mut builder = self.pro_que.kernel_builder("check_prime"); - if let Some(local_size) = local_size { - builder.local_work_size(local_size); + mut start: u64, + stop: u64, + step: usize, + local_size: usize, + use_cache: bool, + ) -> OCLStream>> { + if start % 2 == 0 { + start += 1; } - let kernel = builder - .arg(&input_buffer) - .arg(&output_buffer) - .global_work_size(input.len()) - .build()?; - - let start_cpu = Instant::now(); - let event_start = self.pro_que.queue().enqueue_marker::(None)?; + log::debug!( + "Calculating primes between {} and {} with {} number per step and a local size of {}", + start, + stop, + step, + local_size + ); + let offset = Arc::new(AtomicU64::new(start)); + let prime_cache = Arc::new(Mutex::new(Vec::new())); - unsafe { - kernel.enq()?; + if use_cache { + prime_cache + .lock() + .append(&mut get_primes(start + step as u64)); } - let event_stop = self.pro_que.queue().enqueue_marker::(None)?; - wait_for_event(&event_start)?; - wait_for_event(&event_stop)?; + let pb = get_progress_bar((stop - start) / (step * 2) as u64); - let start = get_event_profiling_info(&event_start, ProfilingInfo::End)?; - let stop = get_event_profiling_info(&event_stop, ProfilingInfo::Start)?; - let gpu_calc_duration = Duration::from_nanos(stop.time()? - start.time()?); - - let mut output = vec![0u8; output_buffer.len()]; - output_buffer.read(&mut output).enq()?; - println!( - "GPU Calculation: {} ms\nGPU IO + Calculation: {} ms", - gpu_calc_duration.as_secs_f64() * 1000f64, - start_cpu.elapsed().as_secs_f64() * 1000f64 - ); - - let filter_start = Instant::now(); - let primes = map_gpu_prime_result(input, output); + self.executor.execute_bounded(step * 10, move |ctx| { + loop { + let pro_que = ctx.pro_que(); + let sender = ctx.sender(); + if offset.load(Ordering::SeqCst) >= stop { + log::trace!("Stop reached."); + break; + } + let offset = offset.fetch_add(step as u64 * 2, Ordering::SeqCst); + log::trace!("Calculating {} primes beginning from {}", step, offset); + + let numbers = (offset..(step as u64 * 2 + offset)) + .step_by(2) + .collect::>(); + let result = if use_cache { + let prime_cache = Arc::clone(&prime_cache); + log::trace!("Using optimized function with cached primes"); + Self::filter_primes_cached(pro_que, numbers, local_size, prime_cache)? + } else { + log::trace!("Using normal prime calculation function"); + Self::filter_primes(pro_que, numbers, local_size)? + }; + sender.send(result)?; + pb.inc(1); + } - Ok(PrimeCalculationResult { - primes, - filter_duration: filter_start.elapsed(), - gpu_duration: gpu_calc_duration, + Ok(()) }) } - /// Filters the primes from a list of numbers by using a precalculated list of primes to check - /// for divisibility - pub fn filter_primes( - &self, - input: Vec, - local_size: Option, - ) -> ocl::Result { - lazy_static::lazy_static! {static ref PRIME_CACHE: Arc>> = Arc::new(Mutex::new(Vec::new()));} - if PRIME_CACHE.lock().len() == 0 { - PRIME_CACHE.lock().append(&mut get_primes( - (*input.iter().max().unwrap_or(&1024) as f64).sqrt().ceil() as u64, - )); - } + /// Creates the prime filter kernel and executes it + fn filter_primes( + pro_que: &ProQue, + numbers: Vec, + local_size: usize, + ) -> ocl::Result>> { + log::trace!("Creating 0u8 output buffer"); + let output_buffer = pro_que + .buffer_builder() + .len(numbers.len()) + .fill_val(0u8) + .build()?; - let prime_buffer = { - let prime_cache = PRIME_CACHE.lock(); - let prime_buffer = self - .pro_que - .buffer_builder() - .len(prime_cache.len()) - .build()?; + let input_buffer = numbers.to_ocl_buffer(pro_que)?; - prime_buffer.write(&prime_cache[..]).enq()?; + log::trace!("Building 'check_prime' kernel"); + let kernel = pro_que + .kernel_builder("check_prime") + .local_work_size(local_size) + .arg(&input_buffer) + .arg(&output_buffer) + .global_work_size(numbers.len()) + .build()?; + let duration = enqueue_profiled(pro_que, &kernel)?; - prime_buffer - }; + log::trace!("Reading output"); + let mut output = vec![0u8; output_buffer.len()]; + output_buffer.read(&mut output).enq()?; + log::trace!("Filtering primes"); + let primes = map_gpu_prime_result(numbers, output); + log::trace!("Calculated {} primes", primes.len()); - let input_buffer = self.pro_que.buffer_builder().len(input.len()).build()?; - input_buffer.write(&input[..]).enq()?; + Ok(ProfiledResult::new(duration, primes)) + } - let output_buffer = self - .pro_que + pub fn filter_primes_cached( + pro_que: &ProQue, + numbers: Vec, + local_size: usize, + prime_cache: Arc>>, + ) -> ocl::Result>> { + let prime_buffer = prime_cache.lock().to_ocl_buffer(pro_que)?; + let input_buffer = numbers.to_ocl_buffer(pro_que)?; + + log::trace!("Creating output buffer"); + let output_buffer = pro_que .buffer_builder() - .len(input.len()) + .len(numbers.len()) .fill_val(0u8) .build()?; - let mut builder = self.pro_que.kernel_builder("check_prime_cached"); - if let Some(local_size) = local_size { - builder.local_work_size(local_size); - } - let kernel = builder + log::trace!("Building 'check_prime_cached' kernel"); + let kernel = pro_que + .kernel_builder("check_prime_cached") + .local_work_size(local_size) .arg(prime_buffer.len() as u32) .arg(&prime_buffer) .arg(&input_buffer) .arg(&output_buffer) - .local_work_size(2) - .global_work_size(input.len()) + .global_work_size(numbers.len()) .build()?; - let event_start = self.pro_que.queue().enqueue_marker::(None)?; - let start_cpu = Instant::now(); - - unsafe { - kernel.enq()?; - } - let event_stop = self.pro_que.queue().enqueue_marker::(None)?; - - wait_for_event(&event_start)?; - wait_for_event(&event_stop)?; - - let start = get_event_profiling_info(&event_start, ProfilingInfo::End)?; - let stop = get_event_profiling_info(&event_stop, ProfilingInfo::Start)?; - let gpu_calc_duration = Duration::from_nanos(stop.time()? - start.time()?); + let duration = enqueue_profiled(pro_que, &kernel)?; + log::trace!("Reading output"); let mut output = vec![0u8; output_buffer.len()]; output_buffer.read(&mut output).enq()?; - println!( - "GPU Calculation: {} ms\nGPU IO + Calculation: {} ms", - gpu_calc_duration.as_secs_f64() * 1000f64, - start_cpu.elapsed().as_secs_f64() * 1000f64 - ); - - let prime_filter_start = Instant::now(); - let primes = map_gpu_prime_result(input, output); - let filter_duration = prime_filter_start.elapsed(); + log::trace!("Mapping prime result"); + let primes = map_gpu_prime_result(numbers, output); + log::trace!("Calculated {} primes", primes.len()); - let prime_calc_start = Instant::now(); - let mut prime_cache = PRIME_CACHE.lock(); + let mut prime_cache = prime_cache.lock(); - if (prime_cache.len() + primes.len()) * size_of::() - < self.available_memory()? as usize / 4 - { + log::trace!("Updating prime cache"); + if (prime_cache.len() + primes.len()) * size_of::() < MEMORY_LIMIT as usize / 4 { prime_cache.append(&mut primes.clone()); prime_cache.sort(); prime_cache.dedup(); } - let cache_duration = prime_calc_start.elapsed(); - println!( - "Prime caching took: {} ms, size: {}", - cache_duration.as_secs_f64() * 1000f64, - prime_cache.len(), - ); - Ok(PrimeCalculationResult { - primes, - gpu_duration: gpu_calc_duration, - filter_duration, - }) + Ok(ProfiledResult::new(duration, primes)) } } /// Returns a list of prime numbers that can be used to speed up the divisibility check -pub fn get_primes(max_number: u64) -> Vec { +fn get_primes(max_number: u64) -> Vec { + log::trace!("Calculating primes until {} on the cpu", max_number); let start = Instant::now(); let mut primes = Vec::with_capacity((max_number as f64).sqrt() as usize); let mut num = 1; @@ -217,7 +200,7 @@ pub fn get_primes(max_number: u64) -> Vec { } num += 2; } - println!( + log::trace!( "Generated {} primes on the cpu in {} ms", primes.len(), start.elapsed().as_secs_f64() * 1000f64, diff --git a/src/kernel_controller/primes_streamed.rs b/src/kernel_controller/primes_streamed.rs deleted file mode 100644 index 249c0c8..0000000 --- a/src/kernel_controller/primes_streamed.rs +++ /dev/null @@ -1,130 +0,0 @@ -use crate::benching::enqueue_profiled; -use crate::benching::result::ProfiledResult; -use crate::kernel_controller::primes::{get_primes, map_gpu_prime_result}; -use crate::kernel_controller::KernelController; -use ocl::ProQue; -use ocl_stream::stream::OCLStream; -use ocl_stream::traits::ToOclBuffer; -use parking_lot::Mutex; -use std::mem::size_of; -use std::sync::atomic::{AtomicU64, Ordering}; -use std::sync::Arc; - -const MEMORY_LIMIT: u64 = 4 * 1024 * 1024 * 1024; - -impl KernelController { - pub fn get_primes( - &self, - mut start: u64, - stop: u64, - step: usize, - local_size: usize, - use_cache: bool, - ) -> OCLStream>> { - if start % 2 == 0 { - start += 1; - } - let offset = Arc::new(AtomicU64::new(start)); - let prime_cache = Arc::new(Mutex::new(Vec::new())); - if use_cache { - prime_cache - .lock() - .append(&mut get_primes(start + step as u64)); - } - - self.executor.execute_bounded(step * 10, move |ctx| { - loop { - let pro_que = ctx.pro_que(); - let sender = ctx.sender(); - if offset.load(Ordering::SeqCst) >= stop { - break; - } - let offset = offset.fetch_add(step as u64 * 2, Ordering::SeqCst); - let numbers = (offset..(step as u64 * 2 + offset)) - .step_by(2) - .collect::>(); - let result = if use_cache { - let prime_cache = Arc::clone(&prime_cache); - Self::filter_primes_streamed_cached(pro_que, numbers, local_size, prime_cache)? - } else { - Self::filter_primes_streamed(pro_que, numbers, local_size)? - }; - sender.send(result)?; - } - - Ok(()) - }) - } - - /// Creates the prime filter kernel and executes it - fn filter_primes_streamed( - pro_que: &ProQue, - numbers: Vec, - local_size: usize, - ) -> ocl::Result>> { - let output_buffer = pro_que - .buffer_builder() - .len(numbers.len()) - .fill_val(0u8) - .build()?; - let input_buffer = numbers.to_ocl_buffer(pro_que)?; - - let kernel = pro_que - .kernel_builder("check_prime") - .local_work_size(local_size) - .arg(&input_buffer) - .arg(&output_buffer) - .global_work_size(numbers.len()) - .build()?; - let duration = enqueue_profiled(pro_que, &kernel)?; - - let mut output = vec![0u8; output_buffer.len()]; - output_buffer.read(&mut output).enq()?; - let primes = map_gpu_prime_result(numbers, output); - - Ok(ProfiledResult::new(duration, primes)) - } - - pub fn filter_primes_streamed_cached( - pro_que: &ProQue, - numbers: Vec, - local_size: usize, - prime_cache: Arc>>, - ) -> ocl::Result>> { - let prime_buffer = prime_cache.lock().to_ocl_buffer(pro_que)?; - let input_buffer = numbers.to_ocl_buffer(pro_que)?; - - let output_buffer = pro_que - .buffer_builder() - .len(numbers.len()) - .fill_val(0u8) - .build()?; - - let kernel = pro_que - .kernel_builder("check_prime_cached") - .local_work_size(local_size) - .arg(prime_buffer.len() as u32) - .arg(&prime_buffer) - .arg(&input_buffer) - .arg(&output_buffer) - .global_work_size(numbers.len()) - .build()?; - - let duration = enqueue_profiled(pro_que, &kernel)?; - - let mut output = vec![0u8; output_buffer.len()]; - output_buffer.read(&mut output).enq()?; - - let primes = map_gpu_prime_result(numbers, output); - - let mut prime_cache = prime_cache.lock(); - - if (prime_cache.len() + primes.len()) * size_of::() < MEMORY_LIMIT as usize / 4 { - prime_cache.append(&mut primes.clone()); - prime_cache.sort(); - prime_cache.dedup(); - } - - Ok(ProfiledResult::new(duration, primes)) - } -} diff --git a/src/main.rs b/src/main.rs index 245766c..516af90 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,32 +1,29 @@ /* * opencl demos with rust - * Copyright (C) 2020 trivernis + * Copyright (C) 2021 trivernis * See LICENSE for more information */ -use crate::concurrency::executor::ConcurrentKernelExecutor; use crate::kernel_controller::primes::is_prime; use crate::kernel_controller::KernelController; -use crate::output::create_prime_write_thread; use crate::output::csv::ThreadedCSVWriter; use crate::output::threaded::ThreadedWriter; use crate::kernel_controller::bench::BenchStatistics; +use crate::utils::logging::init_logger; use ocl_stream::stream::OCLStream; use ocl_stream::utils::result::{OCLStreamError, OCLStreamResult}; use rayon::prelude::*; use std::fs::{File, OpenOptions}; use std::io::BufWriter; -use std::mem; use std::path::PathBuf; -use std::sync::mpsc::channel; use std::time::Duration; use structopt::StructOpt; mod benching; -mod concurrency; mod kernel_controller; mod output; +mod utils; #[derive(StructOpt, Clone, Debug)] #[structopt()] @@ -89,10 +86,6 @@ struct CalculatePrimes { /// number of used threads #[structopt(short = "p", long = "parallel", default_value = "2")] num_threads: usize, - - /// if the result should be streamed - #[structopt(long = "streamed")] - streamed: bool, } #[derive(StructOpt, Clone, Debug)] @@ -160,22 +153,18 @@ pub struct BenchOptions { fn main() -> OCLStreamResult<()> { let opts: Opts = Opts::from_args(); let controller = KernelController::new()?; + init_logger(); match opts { Opts::Info => controller.print_info().map_err(OCLStreamError::from), - Opts::CalculatePrimes(prime_opts) => { - if prime_opts.streamed { - calculate_primes_streamed(prime_opts, controller) - } else { - calculate_primes(prime_opts, controller).map_err(OCLStreamError::from) - } - } + Opts::CalculatePrimes(prime_opts) => calculate_primes(prime_opts, controller), Opts::BenchGlobalSize(bench_opts) => bench_global_size(bench_opts, controller), Opts::BenchLocalSize(bench_opts) => bench_local_size(bench_opts, controller), } } -fn calculate_primes_streamed( +/// Calculates primes on the GPU +fn calculate_primes( prime_opts: CalculatePrimes, mut controller: KernelController, ) -> OCLStreamResult<()> { @@ -192,7 +181,7 @@ fn calculate_primes_streamed( .into_bytes() }); - let mut stream = controller.get_primes( + let mut stream = controller.calculate_primes( prime_opts.start_offset, prime_opts.max_number, prime_opts.numbers_per_step, @@ -205,7 +194,7 @@ fn calculate_primes_streamed( validate_primes_on_cpu(primes); } let first = *primes.first().unwrap(); // if there's none, rip - println!( + log::debug!( "Calculated {} primes in {:?}, offset: {}", primes.len(), r.gpu_duration(), @@ -224,83 +213,6 @@ fn calculate_primes_streamed( Ok(()) } -/// Calculates Prime numbers with GPU acceleration -fn calculate_primes(prime_opts: CalculatePrimes, controller: KernelController) -> ocl::Result<()> { - let output = BufWriter::new( - OpenOptions::new() - .create(true) - .append(true) - .open(&prime_opts.output_file) - .unwrap(), - ); - let timings = BufWriter::new( - OpenOptions::new() - .create(true) - .truncate(true) - .write(true) - .open(&prime_opts.timings_file) - .unwrap(), - ); - let mut csv_writer = ThreadedCSVWriter::new( - timings, - &["offset", "count", "gpu_duration", "filter_duration"], - ); - - let (prime_sender, prime_handle) = create_prime_write_thread(output); - - let mut offset = prime_opts.start_offset; - if offset % 2 == 0 { - offset += 1; - } - if offset < 2 { - prime_sender.send(vec![2]).unwrap(); - } - let executor = ConcurrentKernelExecutor::new(controller); - let (tx, rx) = channel(); - - let executor_thread = std::thread::spawn({ - let prime_opts = prime_opts.clone(); - move || { - executor.calculate_primes( - prime_opts.start_offset, - prime_opts.numbers_per_step, - prime_opts.local_size, - prime_opts.max_number, - prime_opts.no_cache, - prime_opts.num_threads, - tx, - ) - } - }); - for prime_result in rx { - let offset = prime_result.primes.last().cloned().unwrap(); - let primes = prime_result.primes; - println!( - "Calculated {} primes: {:.4} checks/s, offset: {}", - primes.len(), - prime_opts.numbers_per_step as f64 / prime_result.gpu_duration.as_secs_f64(), - offset, - ); - csv_writer.add_row(vec![ - offset.to_string(), - primes.len().to_string(), - duration_to_ms_string(&prime_result.gpu_duration), - duration_to_ms_string(&prime_result.filter_duration), - ]); - if prime_opts.cpu_validate { - validate_primes_on_cpu(&primes) - } - prime_sender.send(primes).unwrap(); - } - - mem::drop(prime_sender); - prime_handle.join().unwrap(); - csv_writer.close(); - executor_thread.join().unwrap(); - - Ok(()) -} - /// Benchmarks the local size used for calculations fn bench_local_size(opts: BenchLocalSize, controller: KernelController) -> OCLStreamResult<()> { let bench_writer = open_write_buffered(&opts.bench_options.benchmark_file); @@ -365,7 +277,7 @@ fn read_bench_results( loop { match stream.next() { Ok(stats) => { - println!("{}\n", stats); + log::debug!("{:?}", stats); csv_writer.add_row(vec![ stats.local_size.to_string(), stats.global_size.to_string(), @@ -384,7 +296,7 @@ fn read_bench_results( } fn validate_primes_on_cpu(primes: &Vec) { - println!("Validating..."); + log::debug!("Validating primes on the cpu"); let failures = primes .par_iter() .filter(|n| !is_prime(**n)) @@ -396,7 +308,7 @@ fn validate_primes_on_cpu(primes: &Vec) { failures ); } else { - println!("No failures found."); + log::debug!("No failures found."); } } diff --git a/src/output/csv.rs b/src/output/csv.rs index a6023f7..4926fff 100644 --- a/src/output/csv.rs +++ b/src/output/csv.rs @@ -1,6 +1,6 @@ /* * opencl demos with rust - * Copyright (C) 2020 trivernis + * Copyright (C) 2021 trivernis * See LICENSE for more information */ @@ -23,6 +23,7 @@ impl ThreadedCSVWriter { .iter() .map(|column| column.to_string()) .collect::>(); + log::trace!("Creating new CSV Writer with columns: {:?}", column_vec); let writer = ThreadedWriter::new(writer, |v: String| v.as_bytes().to_vec()); let mut csv_writer = Self { @@ -36,6 +37,7 @@ impl ThreadedCSVWriter { /// Adds a new row of values to the file pub fn add_row(&mut self, items: Vec) { + log::trace!("Adding row to CSV: {:?}", items); self.inner.write( items .iter() diff --git a/src/output/mod.rs b/src/output/mod.rs index 86b2b18..e924be8 100644 --- a/src/output/mod.rs +++ b/src/output/mod.rs @@ -1,28 +1,7 @@ /* * opencl demos with rust - * Copyright (C) 2020 trivernis + * Copyright (C) 2021 trivernis * See LICENSE for more information */ -use std::fs::File; -use std::io::{BufWriter, Write}; -use std::sync::mpsc::{channel, Sender}; -use std::thread::{self, JoinHandle}; - pub mod csv; pub mod threaded; - -pub fn create_prime_write_thread( - mut writer: BufWriter, -) -> (Sender>, JoinHandle<()>) { - let (tx, rx) = channel(); - let handle = thread::spawn(move || { - for primes in rx { - for prime in primes { - writer.write_all(format!("{}\n", prime).as_bytes()).unwrap(); - } - writer.flush().unwrap(); - } - }); - - (tx, handle) -} diff --git a/src/output/threaded.rs b/src/output/threaded.rs index 2f6a643..50b977d 100644 --- a/src/output/threaded.rs +++ b/src/output/threaded.rs @@ -1,3 +1,9 @@ +/* + * opencl demos with rust + * Copyright (C) 2021 trivernis + * See LICENSE for more information + */ + use crossbeam_channel::Sender; use std::io::Write; use std::mem; @@ -21,24 +27,33 @@ where F: Fn(T) -> Vec + Send + Sync + 'static, W: Write + Send + Sync + 'static, { + log::trace!("Creating new threaded writer"); let (tx, rx) = crossbeam_channel::bounded(1024); - let handle = thread::spawn(move || { - for value in rx { - let mut bytes = serializer(value); - writer.write_all(&mut bytes[..]).unwrap(); - writer.flush().unwrap(); - } - }); + let handle = thread::Builder::new() + .name("io-thread".to_string()) + .spawn(move || { + log::trace!("Writing thread running"); + + for value in rx { + log::trace!("Writing received value"); + let mut bytes = serializer(value); + writer.write_all(&mut bytes[..]).unwrap(); + writer.flush().unwrap(); + } + }) + .unwrap(); Self { handle, tx } } /// Writes a value pub fn write(&self, value: T) { + log::trace!("Writing into threaded writer"); self.tx.send(value).unwrap(); } /// Closes the channel to the writer and waits for the writer thread to stop pub fn close(self) { + log::trace!("Closing file and shutting down thread"); mem::drop(self.tx); self.handle.join().unwrap(); } diff --git a/src/utils/logging.rs b/src/utils/logging.rs new file mode 100644 index 0000000..7010355 --- /dev/null +++ b/src/utils/logging.rs @@ -0,0 +1,62 @@ +/* + * opencl demos with rust + * Copyright (C) 2021 trivernis + * See LICENSE for more information + */ + +use chrono::Local; +use colored::*; +use log::{Level, LevelFilter}; +use std::str::FromStr; +use std::thread; + +/// Initializes the env_logger with a custom format +/// that also logs the thread names +pub fn init_logger() { + fern::Dispatch::new() + .format(|out, message, record| { + let color = get_level_style(record.level()); + let mut thread_name = format!( + "thread::{}", + thread::current().name().unwrap_or("main").to_string() + ); + thread_name.truncate(34); + let mut target = record.target().to_string(); + target.truncate(39); + + out.finish(format_args!( + "{:<20} {:<40}| {} {}: {}", + thread_name.dimmed(), + target.dimmed().italic(), + Local::now().format("%Y-%m-%dT%H:%M:%S.%f"), + record + .level() + .to_string() + .to_lowercase() + .as_str() + .color(color), + message + )) + }) + .level( + log::LevelFilter::from_str( + std::env::var("RUST_LOG") + .unwrap_or("info".to_string()) + .as_str(), + ) + .unwrap_or(LevelFilter::Info), + ) + .chain(std::io::stdout()) + .apply() + .expect("failed to init logger"); +} + +fn get_level_style(level: Level) -> colored::Color { + match level { + Level::Trace => colored::Color::Magenta, + Level::Debug => colored::Color::Blue, + Level::Info => colored::Color::Green, + Level::Warn => colored::Color::Yellow, + Level::Error => colored::Color::Red, + } +} diff --git a/src/utils/mod.rs b/src/utils/mod.rs new file mode 100644 index 0000000..3aaad85 --- /dev/null +++ b/src/utils/mod.rs @@ -0,0 +1,7 @@ +/* + * opencl demos with rust + * Copyright (C) 2021 trivernis + * See LICENSE for more information + */ +pub mod logging; +pub mod progress; diff --git a/src/utils/progress.rs b/src/utils/progress.rs new file mode 100644 index 0000000..58f65a6 --- /dev/null +++ b/src/utils/progress.rs @@ -0,0 +1,22 @@ +/* + * opencl demos with rust + * Copyright (C) 2021 trivernis + * See LICENSE for more information + */ + +use indicatif::{ProgressBar, ProgressStyle}; +use log::LevelFilter; + +pub fn get_progress_bar(size: u64) -> ProgressBar { + if log::max_level() == LevelFilter::Info { + let bar = ProgressBar::new(size); + bar.set_style( + ProgressStyle::default_bar() + .template("[ETA:{eta}] {bar:60.cyan/blue} {pos:>7}/{len:7} {msg}") + .progress_chars("#>-"), + ); + bar + } else { + ProgressBar::hidden() + } +}