Add progress bar and detailed logging

Signed-off-by: Trivernis <trivernis@protonmail.com>
main
Trivernis 3 years ago
parent 5659ee2923
commit 189e3788c2
No known key found for this signature in database
GPG Key ID: EB543D89E02BC83F

133
Cargo.lock generated

@ -73,6 +73,19 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chrono"
version = "0.4.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "670ad68c9088c2a963aaa298cb369688cf3f9465ce5e2d4ca10e6e0098a1ce73"
dependencies = [
"libc",
"num-integer",
"num-traits 0.2.14",
"time",
"winapi",
]
[[package]]
name = "cl-sys"
version = "0.4.2"
@ -106,6 +119,32 @@ dependencies = [
"bitflags",
]
[[package]]
name = "colored"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b3616f750b84d8f0de8a58bda93e08e2a81ad3f523089b05f1dffecab48c6cbd"
dependencies = [
"atty",
"lazy_static",
"winapi",
]
[[package]]
name = "console"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7cc80946b3480f421c2f17ed1cb841753a371c7c5104f51d507e13f532c856aa"
dependencies = [
"encode_unicode",
"lazy_static",
"libc",
"regex",
"terminal_size",
"unicode-width",
"winapi",
]
[[package]]
name = "const_fn"
version = "0.4.3"
@ -236,6 +275,12 @@ version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457"
[[package]]
name = "encode_unicode"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f"
[[package]]
name = "enum_primitive"
version = "0.1.1"
@ -267,6 +312,15 @@ dependencies = [
"synstructure",
]
[[package]]
name = "fern"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8c9a4820f0ccc8a7afd67c39a0f1a0f4b07ca1725164271a64939d7aeb9af065"
dependencies = [
"log",
]
[[package]]
name = "fuchsia-cprng"
version = "0.1.1"
@ -303,6 +357,18 @@ dependencies = [
"libc",
]
[[package]]
name = "indicatif"
version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7baab56125e25686df467fe470785512329883aab42696d661247aca2a2896e4"
dependencies = [
"console",
"lazy_static",
"number_prefix",
"regex",
]
[[package]]
name = "instant"
version = "0.1.9"
@ -333,6 +399,15 @@ dependencies = [
"scopeguard",
]
[[package]]
name = "log"
version = "0.4.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fcf3805d4480bb5b86070dcfeb9e2cb2ebc148adb753c5cca5f884d1d65a42b2"
dependencies = [
"cfg-if 0.1.10",
]
[[package]]
name = "maybe-uninit"
version = "2.0.0"
@ -470,6 +545,12 @@ dependencies = [
"libc",
]
[[package]]
name = "number_prefix"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "17b02fc0ff9a9e4b35b3342880f48e896ebf69f2967921fe8646bf5b7125956a"
[[package]]
name = "object"
version = "0.22.0"
@ -517,11 +598,12 @@ dependencies = [
[[package]]
name = "ocl-stream"
version = "0.3.4"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2cc003c0e91a8daaa706bd4231a05080d18346c97dc051955cce45de60a54ac7"
checksum = "7804102f4531c792a44904e2f93b7378397fdf73e552205c9ab404ab6d1bba8b"
dependencies = [
"crossbeam-channel 0.5.0",
"log",
"num_cpus",
"ocl",
"parking_lot",
@ -674,12 +756,32 @@ version = "0.1.57"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce"
[[package]]
name = "regex"
version = "1.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9251239e129e16308e70d853559389de218ac275b515068abc96829d05b948a"
dependencies = [
"regex-syntax",
]
[[package]]
name = "regex-syntax"
version = "0.6.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5eb417147ba9860a96cfe72a0b93bf88fee1744b5636ec99ab20c1aa9376581"
[[package]]
name = "rust-opencl-demo"
version = "0.1.0"
dependencies = [
"chrono",
"colored",
"crossbeam-channel 0.5.0",
"fern",
"indicatif",
"lazy_static",
"log",
"ocl",
"ocl-stream",
"parking_lot",
@ -779,6 +881,16 @@ dependencies = [
"unicode-xid",
]
[[package]]
name = "terminal_size"
version = "0.1.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4bd2d183bd3fac5f5fe38ddbeb4dc9aec4a39a9d7d59e7491d900302da01cbe1"
dependencies = [
"libc",
"winapi",
]
[[package]]
name = "textwrap"
version = "0.11.0"
@ -808,6 +920,17 @@ dependencies = [
"syn",
]
[[package]]
name = "time"
version = "0.1.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6db9e6914ab8b1ae1c260a4ae7a49b6c5611b40328a735b21862567685e73255"
dependencies = [
"libc",
"wasi",
"winapi",
]
[[package]]
name = "unicode-segmentation"
version = "1.7.0"
@ -838,6 +961,12 @@ version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5a972e5669d67ba988ce3dc826706fb0a8b01471c088cb0b6110b805cc36aed"
[[package]]
name = "wasi"
version = "0.10.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f"
[[package]]
name = "winapi"
version = "0.3.9"

@ -12,5 +12,10 @@ structopt = "0.3.20"
lazy_static = "1.4.0"
parking_lot = "0.11.1"
rayon = "1.5.0"
ocl-stream = "0.3.4"
crossbeam-channel = "0.5.0"
ocl-stream = "0.3.5"
crossbeam-channel = "0.5.0"
log = "0.4.13"
fern = "0.6.0"
colored = "2.0.0"
chrono = "0.4.19"
indicatif = "0.15.0"

@ -1,3 +1,9 @@
/*
* opencl demos with rust
* Copyright (C) 2021 trivernis
* See LICENSE for more information
*/
use ocl::core::{get_event_profiling_info, wait_for_event, ProfilingInfo};
use ocl::{EventList, Kernel, ProQue};
use std::time::Duration;
@ -7,16 +13,28 @@ pub mod result;
/// Runs a benchmark on the kernel
/// The ProQue needs to have profiling enabled
pub fn enqueue_profiled(pro_que: &ProQue, kernel: &Kernel) -> ocl::Result<Duration> {
log::trace!("Running kernel with profiling");
log::trace!("Enqueueing start event");
let event_start = pro_que.queue().enqueue_marker::<EventList>(None)?;
log::trace!("Enqueueing Kernel");
unsafe {
kernel.enq()?;
}
log::trace!("Enqueueing stop event");
let event_stop = pro_que.queue().enqueue_marker::<EventList>(None)?;
log::trace!("Waiting for start event");
wait_for_event(&event_start)?;
log::trace!("Waiting for stop event");
wait_for_event(&event_stop)?;
let start = get_event_profiling_info(&event_start, ProfilingInfo::End)?;
let stop = get_event_profiling_info(&event_stop, ProfilingInfo::Start)?;
let gpu_calc_duration = Duration::from_nanos(stop.time()? - start.time()?);
log::trace!(
"Elapsed time between start and stop: {:?}",
gpu_calc_duration
);
Ok(gpu_calc_duration)
}

@ -1,3 +1,9 @@
/*
* opencl demos with rust
* Copyright (C) 2021 trivernis
* See LICENSE for more information
*/
use std::time::Duration;
/// Result of a benched kernel execution

@ -1,86 +0,0 @@
use crate::kernel_controller::primes::PrimeCalculationResult;
use crate::kernel_controller::KernelController;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::mpsc::Sender;
use std::sync::Arc;
use std::thread::Builder as ThreadBuilder;
pub struct ConcurrentKernelExecutor {
kernel_controller: KernelController,
}
impl ConcurrentKernelExecutor {
pub fn new(kernel_controller: KernelController) -> Self {
Self { kernel_controller }
}
pub fn calculate_primes(
&self,
mut offset: u64,
numbers_per_step: usize,
local_size: Option<usize>,
stop: u64,
no_cache: bool,
num_threads: usize,
sender: Sender<PrimeCalculationResult>,
) {
let mut handles = Vec::new();
if offset % 2 == 0 {
offset += 1;
}
let offset = Arc::new(AtomicU64::new(offset));
let panic = Arc::new(AtomicBool::new(false));
for i in 0..num_threads {
let sender = Sender::clone(&sender);
let controller = self.kernel_controller.clone();
let offset = Arc::clone(&offset);
let panic = Arc::clone(&panic);
let local_size = local_size.clone();
handles.push(
ThreadBuilder::new()
.name(format!("executor-{}", i))
.spawn(move || loop {
if panic.load(Ordering::Relaxed) {
panic!("Planned panic");
}
if offset.load(Ordering::SeqCst) >= stop {
break;
}
let offset =
offset.fetch_add(numbers_per_step as u64 * 2, Ordering::SeqCst);
let numbers = (offset..(numbers_per_step as u64 * 2 + offset))
.step_by(2)
.collect::<Vec<u64>>();
let prime_result = if no_cache {
controller
.filter_primes_simple(numbers, local_size.clone())
.map_err(|e| {
panic.store(true, Ordering::Relaxed);
e
})
.unwrap()
} else {
controller
.filter_primes(numbers, local_size.clone())
.map_err(|e| {
panic.store(true, Ordering::Relaxed);
e
})
.unwrap()
};
if let Err(e) = sender.send(prime_result) {
panic.store(true, Ordering::Relaxed);
panic!(e);
}
})
.unwrap(),
);
}
for handle in handles {
handle.join().unwrap();
}
}
}

@ -1 +0,0 @@
pub mod executor;

@ -1,21 +1,25 @@
/*
* opencl demos with rust
* Copyright (C) 2020 trivernis
* Copyright (C) 2021 trivernis
* See LICENSE for more information
*/
use crate::benching::enqueue_profiled;
use crate::kernel_controller::KernelController;
use std::fmt::{self, Display, Formatter};
use std::ops::Deref;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, Instant};
use ocl_stream::executor::context::ExecutorContext;
use ocl_stream::executor::stream::OCLStream;
use ocl_stream::traits::*;
use ocl_stream::utils::result::OCLStreamResult;
use ocl_stream::utils::shared_buffer::SharedBuffer;
use std::fmt::{self, Display, Formatter};
use std::ops::Deref;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, Instant};
use crate::benching::enqueue_profiled;
use crate::kernel_controller::KernelController;
use crate::utils::progress::get_progress_bar;
#[derive(Clone, Debug)]
pub struct BenchStatistics {
pub calc_count: u32,
pub global_size: usize,
@ -51,24 +55,39 @@ impl KernelController {
calc_count: u32,
repetitions: usize,
) -> OCLStreamResult<OCLStream<BenchStatistics>> {
log::debug!("Benchmarking global size. Global Size: {}, Start: {}, Step: {} ,Stop: {}, Calculations: {}, Repetitions: {}",
local_size, global_size_start, global_size_step, global_size_stop, calc_count, repetitions);
let global_size = AtomicUsize::new(global_size_start);
let pb = get_progress_bar(
((global_size_stop - global_size_start) / global_size_step) as u64 * repetitions as u64,
);
let stream = self.executor.execute_bounded(global_size_stop, move |ctx| {
loop {
if global_size.load(Ordering::SeqCst) > global_size_stop {
log::trace!("Stop reached");
break;
}
let global_size = global_size.fetch_add(global_size_step, Ordering::SeqCst);
if global_size % local_size != 0 {
log::trace!("Global size not divisible by local size. Continuing");
pb.inc(repetitions as u64);
continue;
}
let input_buffer: SharedBuffer<u32> =
vec![0u32; global_size].to_shared_buffer(ctx.pro_que())?;
log::trace!(
"Benching global size {} with {} repetitions",
global_size,
repetitions
);
for _ in 0..repetitions {
let stats =
Self::bench_int(&ctx, local_size, calc_count, input_buffer.clone())?;
ctx.sender().send(stats)?;
pb.inc(1);
}
}
Ok(())
@ -87,24 +106,40 @@ impl KernelController {
calc_count: u32,
repetitions: usize,
) -> OCLStreamResult<OCLStream<BenchStatistics>> {
log::debug!("Benchmarking local size. Global Size: {}, Start: {}, Step: {} ,Stop: {}, Calculations: {}, Repetitions: {}",
global_size, local_size_start, local_size_step, local_size_stop, calc_count, repetitions);
let input_buffer: SharedBuffer<u32> =
vec![0u32; global_size].to_shared_buffer(self.executor.pro_que())?;
let local_size = AtomicUsize::new(local_size_start);
let pb = get_progress_bar(
((local_size_stop - local_size_start) / local_size_step) as u64 * repetitions as u64,
);
let stream = self.executor.execute_bounded(global_size, move |ctx| {
loop {
if local_size.load(Ordering::SeqCst) > local_size_stop {
log::trace!("Stop Reached");
break;
}
let local_size = local_size.fetch_add(local_size_step, Ordering::SeqCst);
if local_size > 1024 || global_size % local_size != 0 {
log::trace!("Global size not divisible by local size. Continuing");
pb.inc(repetitions as u64);
continue;
}
log::trace!(
"Benching local size {} with {} repetitions",
local_size,
repetitions
);
for _ in 0..repetitions {
let stats =
Self::bench_int(&ctx, local_size, calc_count, input_buffer.clone())?;
ctx.sender().send(stats)?;
pb.inc(1);
}
}
Ok(())
@ -121,9 +156,8 @@ impl KernelController {
input_buffer: SharedBuffer<u32>,
) -> ocl::Result<BenchStatistics> {
let num_tasks = input_buffer.inner().lock().len();
let write_start = Instant::now();
let write_duration = write_start.elapsed();
log::trace!("Building kernel");
let kernel = ctx
.pro_que()
.kernel_builder("bench_int")
@ -135,6 +169,7 @@ impl KernelController {
let calc_duration = enqueue_profiled(ctx.pro_que(), &kernel)?;
log::trace!("Reading output");
let mut output = vec![0u32; num_tasks];
let read_start = Instant::now();
input_buffer.read(&mut output)?;
@ -146,7 +181,7 @@ impl KernelController {
local_size,
read_duration,
calc_duration,
write_duration,
write_duration: Duration::from_nanos(0),
})
}
}

@ -1,6 +1,6 @@
/*
* opencl demos with rust
* Copyright (C) 2020 trivernis
* Copyright (C) 2021 trivernis
* See LICENSE for more information
*/
@ -11,7 +11,6 @@ use ocl_stream::OCLStreamExecutor;
pub mod bench;
pub mod primes;
pub mod primes_streamed;
#[derive(Clone)]
pub struct KernelController {
@ -67,6 +66,7 @@ impl KernelController {
Ok(())
}
#[allow(dead_code)]
fn available_memory(&self) -> ocl::Result<u64> {
match self.pro_que.device().info(DeviceInfo::GlobalMemSize)? {
DeviceInfoResult::GlobalMemSize(size) => Ok(size),

@ -1,189 +1,172 @@
/*
* opencl demos with rust
* Copyright (C) 2020 trivernis
* Copyright (C) 2021 trivernis
* See LICENSE for more information
*/
use crate::benching::enqueue_profiled;
use crate::benching::result::ProfiledResult;
use crate::kernel_controller::KernelController;
use ocl::core::{get_event_profiling_info, wait_for_event, ProfilingInfo};
use ocl::EventList;
use crate::utils::progress::get_progress_bar;
use ocl::ProQue;
use ocl_stream::stream::OCLStream;
use ocl_stream::traits::ToOclBuffer;
use parking_lot::Mutex;
use std::mem::size_of;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::time::Instant;
pub struct PrimeCalculationResult {
pub primes: Vec<u64>,
pub gpu_duration: Duration,
pub filter_duration: Duration,
}
const MEMORY_LIMIT: u64 = 4 * 1024 * 1024 * 1024;
impl KernelController {
/// Filters all primes from the input without using a precalculated list of primes
/// for divisibility checks
pub fn filter_primes_simple(
pub fn calculate_primes(
&self,
input: Vec<u64>,
local_size: Option<usize>,
) -> ocl::Result<PrimeCalculationResult> {
let input_buffer = self.pro_que.buffer_builder().len(input.len()).build()?;
input_buffer.write(&input[..]).enq()?;
let output_buffer = self
.pro_que
.buffer_builder()
.len(input.len())
.fill_val(0u8)
.build()?;
let mut builder = self.pro_que.kernel_builder("check_prime");
if let Some(local_size) = local_size {
builder.local_work_size(local_size);
mut start: u64,
stop: u64,
step: usize,
local_size: usize,
use_cache: bool,
) -> OCLStream<ProfiledResult<Vec<u64>>> {
if start % 2 == 0 {
start += 1;
}
let kernel = builder
.arg(&input_buffer)
.arg(&output_buffer)
.global_work_size(input.len())
.build()?;
let start_cpu = Instant::now();
let event_start = self.pro_que.queue().enqueue_marker::<EventList>(None)?;
log::debug!(
"Calculating primes between {} and {} with {} number per step and a local size of {}",
start,
stop,
step,
local_size
);
let offset = Arc::new(AtomicU64::new(start));
let prime_cache = Arc::new(Mutex::new(Vec::new()));
unsafe {
kernel.enq()?;
if use_cache {
prime_cache
.lock()
.append(&mut get_primes(start + step as u64));
}
let event_stop = self.pro_que.queue().enqueue_marker::<EventList>(None)?;
wait_for_event(&event_start)?;
wait_for_event(&event_stop)?;
let pb = get_progress_bar((stop - start) / (step * 2) as u64);
let start = get_event_profiling_info(&event_start, ProfilingInfo::End)?;
let stop = get_event_profiling_info(&event_stop, ProfilingInfo::Start)?;
let gpu_calc_duration = Duration::from_nanos(stop.time()? - start.time()?);
let mut output = vec![0u8; output_buffer.len()];
output_buffer.read(&mut output).enq()?;
println!(
"GPU Calculation: {} ms\nGPU IO + Calculation: {} ms",
gpu_calc_duration.as_secs_f64() * 1000f64,
start_cpu.elapsed().as_secs_f64() * 1000f64
);
let filter_start = Instant::now();
let primes = map_gpu_prime_result(input, output);
self.executor.execute_bounded(step * 10, move |ctx| {
loop {
let pro_que = ctx.pro_que();
let sender = ctx.sender();
if offset.load(Ordering::SeqCst) >= stop {
log::trace!("Stop reached.");
break;
}
let offset = offset.fetch_add(step as u64 * 2, Ordering::SeqCst);
log::trace!("Calculating {} primes beginning from {}", step, offset);
let numbers = (offset..(step as u64 * 2 + offset))
.step_by(2)
.collect::<Vec<u64>>();
let result = if use_cache {
let prime_cache = Arc::clone(&prime_cache);
log::trace!("Using optimized function with cached primes");
Self::filter_primes_cached(pro_que, numbers, local_size, prime_cache)?
} else {
log::trace!("Using normal prime calculation function");
Self::filter_primes(pro_que, numbers, local_size)?
};
sender.send(result)?;
pb.inc(1);
}
Ok(PrimeCalculationResult {
primes,
filter_duration: filter_start.elapsed(),
gpu_duration: gpu_calc_duration,
Ok(())
})
}
/// Filters the primes from a list of numbers by using a precalculated list of primes to check
/// for divisibility
pub fn filter_primes(
&self,
input: Vec<u64>,
local_size: Option<usize>,
) -> ocl::Result<PrimeCalculationResult> {
lazy_static::lazy_static! {static ref PRIME_CACHE: Arc<Mutex<Vec<u64>>> = Arc::new(Mutex::new(Vec::new()));}
if PRIME_CACHE.lock().len() == 0 {
PRIME_CACHE.lock().append(&mut get_primes(
(*input.iter().max().unwrap_or(&1024) as f64).sqrt().ceil() as u64,
));
}
/// Creates the prime filter kernel and executes it
fn filter_primes(
pro_que: &ProQue,
numbers: Vec<u64>,
local_size: usize,
) -> ocl::Result<ProfiledResult<Vec<u64>>> {
log::trace!("Creating 0u8 output buffer");
let output_buffer = pro_que
.buffer_builder()
.len(numbers.len())
.fill_val(0u8)
.build()?;
let prime_buffer = {
let prime_cache = PRIME_CACHE.lock();
let prime_buffer = self
.pro_que
.buffer_builder()
.len(prime_cache.len())
.build()?;
let input_buffer = numbers.to_ocl_buffer(pro_que)?;
prime_buffer.write(&prime_cache[..]).enq()?;
log::trace!("Building 'check_prime' kernel");
let kernel = pro_que
.kernel_builder("check_prime")
.local_work_size(local_size)
.arg(&input_buffer)
.arg(&output_buffer)
.global_work_size(numbers.len())
.build()?;
let duration = enqueue_profiled(pro_que, &kernel)?;
prime_buffer
};
log::trace!("Reading output");
let mut output = vec![0u8; output_buffer.len()];
output_buffer.read(&mut output).enq()?;
log::trace!("Filtering primes");
let primes = map_gpu_prime_result(numbers, output);
log::trace!("Calculated {} primes", primes.len());
let input_buffer = self.pro_que.buffer_builder().len(input.len()).build()?;
input_buffer.write(&input[..]).enq()?;
Ok(ProfiledResult::new(duration, primes))
}
let output_buffer = self
.pro_que
pub fn filter_primes_cached(
pro_que: &ProQue,
numbers: Vec<u64>,
local_size: usize,
prime_cache: Arc<Mutex<Vec<u64>>>,
) -> ocl::Result<ProfiledResult<Vec<u64>>> {
let prime_buffer = prime_cache.lock().to_ocl_buffer(pro_que)?;
let input_buffer = numbers.to_ocl_buffer(pro_que)?;
log::trace!("Creating output buffer");
let output_buffer = pro_que
.buffer_builder()
.len(input.len())
.len(numbers.len())
.fill_val(0u8)
.build()?;
let mut builder = self.pro_que.kernel_builder("check_prime_cached");
if let Some(local_size) = local_size {
builder.local_work_size(local_size);
}
let kernel = builder
log::trace!("Building 'check_prime_cached' kernel");
let kernel = pro_que
.kernel_builder("check_prime_cached")
.local_work_size(local_size)
.arg(prime_buffer.len() as u32)
.arg(&prime_buffer)
.arg(&input_buffer)
.arg(&output_buffer)
.local_work_size(2)
.global_work_size(input.len())
.global_work_size(numbers.len())
.build()?;
let event_start = self.pro_que.queue().enqueue_marker::<EventList>(None)?;
let start_cpu = Instant::now();
unsafe {
kernel.enq()?;
}
let event_stop = self.pro_que.queue().enqueue_marker::<EventList>(None)?;
wait_for_event(&event_start)?;
wait_for_event(&event_stop)?;
let start = get_event_profiling_info(&event_start, ProfilingInfo::End)?;
let stop = get_event_profiling_info(&event_stop, ProfilingInfo::Start)?;
let gpu_calc_duration = Duration::from_nanos(stop.time()? - start.time()?);
let duration = enqueue_profiled(pro_que, &kernel)?;
log::trace!("Reading output");
let mut output = vec![0u8; output_buffer.len()];
output_buffer.read(&mut output).enq()?;
println!(
"GPU Calculation: {} ms\nGPU IO + Calculation: {} ms",
gpu_calc_duration.as_secs_f64() * 1000f64,
start_cpu.elapsed().as_secs_f64() * 1000f64
);
let prime_filter_start = Instant::now();
let primes = map_gpu_prime_result(input, output);
let filter_duration = prime_filter_start.elapsed();
log::trace!("Mapping prime result");
let primes = map_gpu_prime_result(numbers, output);
log::trace!("Calculated {} primes", primes.len());
let prime_calc_start = Instant::now();
let mut prime_cache = PRIME_CACHE.lock();
let mut prime_cache = prime_cache.lock();
if (prime_cache.len() + primes.len()) * size_of::<i64>()
< self.available_memory()? as usize / 4
{
log::trace!("Updating prime cache");
if (prime_cache.len() + primes.len()) * size_of::<i64>() < MEMORY_LIMIT as usize / 4 {
prime_cache.append(&mut primes.clone());
prime_cache.sort();
prime_cache.dedup();
}
let cache_duration = prime_calc_start.elapsed();
println!(
"Prime caching took: {} ms, size: {}",
cache_duration.as_secs_f64() * 1000f64,
prime_cache.len(),
);
Ok(PrimeCalculationResult {
primes,
gpu_duration: gpu_calc_duration,
filter_duration,
})
Ok(ProfiledResult::new(duration, primes))
}
}
/// Returns a list of prime numbers that can be used to speed up the divisibility check
pub fn get_primes(max_number: u64) -> Vec<u64> {
fn get_primes(max_number: u64) -> Vec<u64> {
log::trace!("Calculating primes until {} on the cpu", max_number);
let start = Instant::now();
let mut primes = Vec::with_capacity((max_number as f64).sqrt() as usize);
let mut num = 1;
@ -217,7 +200,7 @@ pub fn get_primes(max_number: u64) -> Vec<u64> {
}
num += 2;
}
println!(
log::trace!(
"Generated {} primes on the cpu in {} ms",
primes.len(),
start.elapsed().as_secs_f64() * 1000f64,

@ -1,130 +0,0 @@
use crate::benching::enqueue_profiled;
use crate::benching::result::ProfiledResult;
use crate::kernel_controller::primes::{get_primes, map_gpu_prime_result};
use crate::kernel_controller::KernelController;
use ocl::ProQue;
use ocl_stream::stream::OCLStream;
use ocl_stream::traits::ToOclBuffer;
use parking_lot::Mutex;
use std::mem::size_of;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
const MEMORY_LIMIT: u64 = 4 * 1024 * 1024 * 1024;
impl KernelController {
pub fn get_primes(
&self,
mut start: u64,
stop: u64,
step: usize,
local_size: usize,
use_cache: bool,
) -> OCLStream<ProfiledResult<Vec<u64>>> {
if start % 2 == 0 {
start += 1;
}
let offset = Arc::new(AtomicU64::new(start));
let prime_cache = Arc::new(Mutex::new(Vec::new()));
if use_cache {
prime_cache
.lock()
.append(&mut get_primes(start + step as u64));
}
self.executor.execute_bounded(step * 10, move |ctx| {
loop {
let pro_que = ctx.pro_que();
let sender = ctx.sender();
if offset.load(Ordering::SeqCst) >= stop {
break;
}
let offset = offset.fetch_add(step as u64 * 2, Ordering::SeqCst);
let numbers = (offset..(step as u64 * 2 + offset))
.step_by(2)
.collect::<Vec<u64>>();
let result = if use_cache {
let prime_cache = Arc::clone(&prime_cache);
Self::filter_primes_streamed_cached(pro_que, numbers, local_size, prime_cache)?
} else {
Self::filter_primes_streamed(pro_que, numbers, local_size)?
};
sender.send(result)?;
}
Ok(())
})
}
/// Creates the prime filter kernel and executes it
fn filter_primes_streamed(
pro_que: &ProQue,
numbers: Vec<u64>,
local_size: usize,
) -> ocl::Result<ProfiledResult<Vec<u64>>> {
let output_buffer = pro_que
.buffer_builder()
.len(numbers.len())
.fill_val(0u8)
.build()?;
let input_buffer = numbers.to_ocl_buffer(pro_que)?;
let kernel = pro_que
.kernel_builder("check_prime")
.local_work_size(local_size)
.arg(&input_buffer)
.arg(&output_buffer)
.global_work_size(numbers.len())
.build()?;
let duration = enqueue_profiled(pro_que, &kernel)?;
let mut output = vec![0u8; output_buffer.len()];
output_buffer.read(&mut output).enq()?;
let primes = map_gpu_prime_result(numbers, output);
Ok(ProfiledResult::new(duration, primes))
}
pub fn filter_primes_streamed_cached(
pro_que: &ProQue,
numbers: Vec<u64>,
local_size: usize,
prime_cache: Arc<Mutex<Vec<u64>>>,
) -> ocl::Result<ProfiledResult<Vec<u64>>> {
let prime_buffer = prime_cache.lock().to_ocl_buffer(pro_que)?;
let input_buffer = numbers.to_ocl_buffer(pro_que)?;
let output_buffer = pro_que
.buffer_builder()
.len(numbers.len())
.fill_val(0u8)
.build()?;
let kernel = pro_que
.kernel_builder("check_prime_cached")
.local_work_size(local_size)
.arg(prime_buffer.len() as u32)
.arg(&prime_buffer)
.arg(&input_buffer)
.arg(&output_buffer)
.global_work_size(numbers.len())
.build()?;
let duration = enqueue_profiled(pro_que, &kernel)?;
let mut output = vec![0u8; output_buffer.len()];
output_buffer.read(&mut output).enq()?;
let primes = map_gpu_prime_result(numbers, output);
let mut prime_cache = prime_cache.lock();
if (prime_cache.len() + primes.len()) * size_of::<i64>() < MEMORY_LIMIT as usize / 4 {
prime_cache.append(&mut primes.clone());
prime_cache.sort();
prime_cache.dedup();
}
Ok(ProfiledResult::new(duration, primes))
}
}

@ -1,32 +1,29 @@
/*
* opencl demos with rust
* Copyright (C) 2020 trivernis
* Copyright (C) 2021 trivernis
* See LICENSE for more information
*/
use crate::concurrency::executor::ConcurrentKernelExecutor;
use crate::kernel_controller::primes::is_prime;
use crate::kernel_controller::KernelController;
use crate::output::create_prime_write_thread;
use crate::output::csv::ThreadedCSVWriter;
use crate::output::threaded::ThreadedWriter;
use crate::kernel_controller::bench::BenchStatistics;
use crate::utils::logging::init_logger;
use ocl_stream::stream::OCLStream;
use ocl_stream::utils::result::{OCLStreamError, OCLStreamResult};
use rayon::prelude::*;
use std::fs::{File, OpenOptions};
use std::io::BufWriter;
use std::mem;
use std::path::PathBuf;
use std::sync::mpsc::channel;
use std::time::Duration;
use structopt::StructOpt;
mod benching;
mod concurrency;
mod kernel_controller;
mod output;
mod utils;
#[derive(StructOpt, Clone, Debug)]
#[structopt()]
@ -89,10 +86,6 @@ struct CalculatePrimes {
/// number of used threads
#[structopt(short = "p", long = "parallel", default_value = "2")]
num_threads: usize,
/// if the result should be streamed
#[structopt(long = "streamed")]
streamed: bool,
}
#[derive(StructOpt, Clone, Debug)]
@ -160,22 +153,18 @@ pub struct BenchOptions {
fn main() -> OCLStreamResult<()> {
let opts: Opts = Opts::from_args();
let controller = KernelController::new()?;
init_logger();
match opts {
Opts::Info => controller.print_info().map_err(OCLStreamError::from),
Opts::CalculatePrimes(prime_opts) => {
if prime_opts.streamed {
calculate_primes_streamed(prime_opts, controller)
} else {
calculate_primes(prime_opts, controller).map_err(OCLStreamError::from)
}
}
Opts::CalculatePrimes(prime_opts) => calculate_primes(prime_opts, controller),
Opts::BenchGlobalSize(bench_opts) => bench_global_size(bench_opts, controller),
Opts::BenchLocalSize(bench_opts) => bench_local_size(bench_opts, controller),
}
}
fn calculate_primes_streamed(
/// Calculates primes on the GPU
fn calculate_primes(
prime_opts: CalculatePrimes,
mut controller: KernelController,
) -> OCLStreamResult<()> {
@ -192,7 +181,7 @@ fn calculate_primes_streamed(
.into_bytes()
});
let mut stream = controller.get_primes(
let mut stream = controller.calculate_primes(
prime_opts.start_offset,
prime_opts.max_number,
prime_opts.numbers_per_step,
@ -205,7 +194,7 @@ fn calculate_primes_streamed(
validate_primes_on_cpu(primes);
}
let first = *primes.first().unwrap(); // if there's none, rip
println!(
log::debug!(
"Calculated {} primes in {:?}, offset: {}",
primes.len(),
r.gpu_duration(),
@ -224,83 +213,6 @@ fn calculate_primes_streamed(
Ok(())
}
/// Calculates Prime numbers with GPU acceleration
fn calculate_primes(prime_opts: CalculatePrimes, controller: KernelController) -> ocl::Result<()> {
let output = BufWriter::new(
OpenOptions::new()
.create(true)
.append(true)
.open(&prime_opts.output_file)
.unwrap(),
);
let timings = BufWriter::new(
OpenOptions::new()
.create(true)
.truncate(true)
.write(true)
.open(&prime_opts.timings_file)
.unwrap(),
);
let mut csv_writer = ThreadedCSVWriter::new(
timings,
&["offset", "count", "gpu_duration", "filter_duration"],
);
let (prime_sender, prime_handle) = create_prime_write_thread(output);
let mut offset = prime_opts.start_offset;
if offset % 2 == 0 {
offset += 1;
}
if offset < 2 {
prime_sender.send(vec![2]).unwrap();
}
let executor = ConcurrentKernelExecutor::new(controller);
let (tx, rx) = channel();
let executor_thread = std::thread::spawn({
let prime_opts = prime_opts.clone();
move || {
executor.calculate_primes(
prime_opts.start_offset,
prime_opts.numbers_per_step,
prime_opts.local_size,
prime_opts.max_number,
prime_opts.no_cache,
prime_opts.num_threads,
tx,
)
}
});
for prime_result in rx {
let offset = prime_result.primes.last().cloned().unwrap();
let primes = prime_result.primes;
println!(
"Calculated {} primes: {:.4} checks/s, offset: {}",
primes.len(),
prime_opts.numbers_per_step as f64 / prime_result.gpu_duration.as_secs_f64(),
offset,
);
csv_writer.add_row(vec![
offset.to_string(),
primes.len().to_string(),
duration_to_ms_string(&prime_result.gpu_duration),
duration_to_ms_string(&prime_result.filter_duration),
]);
if prime_opts.cpu_validate {
validate_primes_on_cpu(&primes)
}
prime_sender.send(primes).unwrap();
}
mem::drop(prime_sender);
prime_handle.join().unwrap();
csv_writer.close();
executor_thread.join().unwrap();
Ok(())
}
/// Benchmarks the local size used for calculations
fn bench_local_size(opts: BenchLocalSize, controller: KernelController) -> OCLStreamResult<()> {
let bench_writer = open_write_buffered(&opts.bench_options.benchmark_file);
@ -365,7 +277,7 @@ fn read_bench_results(
loop {
match stream.next() {
Ok(stats) => {
println!("{}\n", stats);
log::debug!("{:?}", stats);
csv_writer.add_row(vec![
stats.local_size.to_string(),
stats.global_size.to_string(),
@ -384,7 +296,7 @@ fn read_bench_results(
}
fn validate_primes_on_cpu(primes: &Vec<u64>) {
println!("Validating...");
log::debug!("Validating primes on the cpu");
let failures = primes
.par_iter()
.filter(|n| !is_prime(**n))
@ -396,7 +308,7 @@ fn validate_primes_on_cpu(primes: &Vec<u64>) {
failures
);
} else {
println!("No failures found.");
log::debug!("No failures found.");
}
}

@ -1,6 +1,6 @@
/*
* opencl demos with rust
* Copyright (C) 2020 trivernis
* Copyright (C) 2021 trivernis
* See LICENSE for more information
*/
@ -23,6 +23,7 @@ impl ThreadedCSVWriter {
.iter()
.map(|column| column.to_string())
.collect::<Vec<String>>();
log::trace!("Creating new CSV Writer with columns: {:?}", column_vec);
let writer = ThreadedWriter::new(writer, |v: String| v.as_bytes().to_vec());
let mut csv_writer = Self {
@ -36,6 +37,7 @@ impl ThreadedCSVWriter {
/// Adds a new row of values to the file
pub fn add_row(&mut self, items: Vec<String>) {
log::trace!("Adding row to CSV: {:?}", items);
self.inner.write(
items
.iter()

@ -1,28 +1,7 @@
/*
* opencl demos with rust
* Copyright (C) 2020 trivernis
* Copyright (C) 2021 trivernis
* See LICENSE for more information
*/
use std::fs::File;
use std::io::{BufWriter, Write};
use std::sync::mpsc::{channel, Sender};
use std::thread::{self, JoinHandle};
pub mod csv;
pub mod threaded;
pub fn create_prime_write_thread(
mut writer: BufWriter<File>,
) -> (Sender<Vec<u64>>, JoinHandle<()>) {
let (tx, rx) = channel();
let handle = thread::spawn(move || {
for primes in rx {
for prime in primes {
writer.write_all(format!("{}\n", prime).as_bytes()).unwrap();
}
writer.flush().unwrap();
}
});
(tx, handle)
}

@ -1,3 +1,9 @@
/*
* opencl demos with rust
* Copyright (C) 2021 trivernis
* See LICENSE for more information
*/
use crossbeam_channel::Sender;
use std::io::Write;
use std::mem;
@ -21,24 +27,33 @@ where
F: Fn(T) -> Vec<u8> + Send + Sync + 'static,
W: Write + Send + Sync + 'static,
{
log::trace!("Creating new threaded writer");
let (tx, rx) = crossbeam_channel::bounded(1024);
let handle = thread::spawn(move || {
for value in rx {
let mut bytes = serializer(value);
writer.write_all(&mut bytes[..]).unwrap();
writer.flush().unwrap();
}
});
let handle = thread::Builder::new()
.name("io-thread".to_string())
.spawn(move || {
log::trace!("Writing thread running");
for value in rx {
log::trace!("Writing received value");
let mut bytes = serializer(value);
writer.write_all(&mut bytes[..]).unwrap();
writer.flush().unwrap();
}
})
.unwrap();
Self { handle, tx }
}
/// Writes a value
pub fn write(&self, value: T) {
log::trace!("Writing into threaded writer");
self.tx.send(value).unwrap();
}
/// Closes the channel to the writer and waits for the writer thread to stop
pub fn close(self) {
log::trace!("Closing file and shutting down thread");
mem::drop(self.tx);
self.handle.join().unwrap();
}

@ -0,0 +1,62 @@
/*
* opencl demos with rust
* Copyright (C) 2021 trivernis
* See LICENSE for more information
*/
use chrono::Local;
use colored::*;
use log::{Level, LevelFilter};
use std::str::FromStr;
use std::thread;
/// Initializes the env_logger with a custom format
/// that also logs the thread names
pub fn init_logger() {
fern::Dispatch::new()
.format(|out, message, record| {
let color = get_level_style(record.level());
let mut thread_name = format!(
"thread::{}",
thread::current().name().unwrap_or("main").to_string()
);
thread_name.truncate(34);
let mut target = record.target().to_string();
target.truncate(39);
out.finish(format_args!(
"{:<20} {:<40}| {} {}: {}",
thread_name.dimmed(),
target.dimmed().italic(),
Local::now().format("%Y-%m-%dT%H:%M:%S.%f"),
record
.level()
.to_string()
.to_lowercase()
.as_str()
.color(color),
message
))
})
.level(
log::LevelFilter::from_str(
std::env::var("RUST_LOG")
.unwrap_or("info".to_string())
.as_str(),
)
.unwrap_or(LevelFilter::Info),
)
.chain(std::io::stdout())
.apply()
.expect("failed to init logger");
}
fn get_level_style(level: Level) -> colored::Color {
match level {
Level::Trace => colored::Color::Magenta,
Level::Debug => colored::Color::Blue,
Level::Info => colored::Color::Green,
Level::Warn => colored::Color::Yellow,
Level::Error => colored::Color::Red,
}
}

@ -0,0 +1,7 @@
/*
* opencl demos with rust
* Copyright (C) 2021 trivernis
* See LICENSE for more information
*/
pub mod logging;
pub mod progress;

@ -0,0 +1,22 @@
/*
* opencl demos with rust
* Copyright (C) 2021 trivernis
* See LICENSE for more information
*/
use indicatif::{ProgressBar, ProgressStyle};
use log::LevelFilter;
pub fn get_progress_bar(size: u64) -> ProgressBar {
if log::max_level() == LevelFilter::Info {
let bar = ProgressBar::new(size);
bar.set_style(
ProgressStyle::default_bar()
.template("[ETA:{eta}] {bar:60.cyan/blue} {pos:>7}/{len:7} {msg}")
.progress_chars("#>-"),
);
bar
} else {
ProgressBar::hidden()
}
}
Loading…
Cancel
Save