diff --git a/Cargo.lock b/Cargo.lock index 7889ca8..4ece67e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -515,6 +515,18 @@ dependencies = [ "num", ] +[[package]] +name = "ocl-stream" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6571c0dc580e1603bdf23e277402b8dea73c0631de6a123b796da9a27681c960" +dependencies = [ + "crossbeam-channel 0.5.0", + "num_cpus", + "ocl", + "thiserror", +] + [[package]] name = "parking_lot" version = "0.11.1" @@ -665,8 +677,10 @@ checksum = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce" name = "rust-opencl-demo" version = "0.1.0" dependencies = [ + "crossbeam-channel 0.5.0", "lazy_static", "ocl", + "ocl-stream", "parking_lot", "rayon", "structopt", @@ -773,6 +787,26 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "thiserror" +version = "1.0.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76cc616c6abf8c8928e2fdcc0dbfab37175edd8fb49a4641066ad1364fdab146" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9be73a2caec27583d0046ef3796c3794f868a5bc813db689eed00c7631275cd1" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "unicode-segmentation" version = "1.7.0" diff --git a/Cargo.toml b/Cargo.toml index 7ecc5aa..0db18a9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,4 +11,6 @@ ocl = "0.19.3" structopt = "0.3.20" lazy_static = "1.4.0" parking_lot = "0.11.1" -rayon = "1.5.0" \ No newline at end of file +rayon = "1.5.0" +ocl-stream = "0.3.0" +crossbeam-channel = "0.5.0" \ No newline at end of file diff --git a/src/benching/mod.rs b/src/benching/mod.rs new file mode 100644 index 0000000..53a2e05 --- /dev/null +++ b/src/benching/mod.rs @@ -0,0 +1,22 @@ +use ocl::core::{get_event_profiling_info, wait_for_event, ProfilingInfo}; +use ocl::{EventList, Kernel, ProQue}; +use std::time::Duration; + +pub mod result; + +/// Runs a benchmark on the kernel +/// The ProQue needs to have profiling enabled +pub fn enqueue_profiled(pro_que: &ProQue, kernel: &Kernel) -> ocl::Result { + let event_start = pro_que.queue().enqueue_marker::(None)?; + unsafe { + kernel.enq()?; + } + let event_stop = pro_que.queue().enqueue_marker::(None)?; + wait_for_event(&event_start)?; + wait_for_event(&event_stop)?; + let start = get_event_profiling_info(&event_start, ProfilingInfo::End)?; + let stop = get_event_profiling_info(&event_stop, ProfilingInfo::Start)?; + let gpu_calc_duration = Duration::from_nanos(stop.time()? - start.time()?); + + Ok(gpu_calc_duration) +} diff --git a/src/benching/result.rs b/src/benching/result.rs new file mode 100644 index 0000000..9394e36 --- /dev/null +++ b/src/benching/result.rs @@ -0,0 +1,34 @@ +use std::time::Duration; + +/// Result of a benched kernel execution +#[derive(Clone, Debug)] +pub struct ProfiledResult +where + T: Send + Sync + Clone, +{ + gpu_duration: Duration, + value: T, +} + +impl ProfiledResult +where + T: Send + Sync + Clone, +{ + /// Creates a new profiled result with the given duraiton and value + pub fn new(gpu_duration: Duration, value: T) -> Self { + Self { + gpu_duration, + value, + } + } + + /// Returns the execution duration on the gpu + pub fn gpu_duration(&self) -> &Duration { + &self.gpu_duration + } + + /// Returns the value of the result + pub fn value(&self) -> &T { + &self.value + } +} diff --git a/src/kernel_controller/mod.rs b/src/kernel_controller/mod.rs index ddc8940..eedb677 100644 --- a/src/kernel_controller/mod.rs +++ b/src/kernel_controller/mod.rs @@ -7,25 +7,30 @@ use ocl::core::DeviceInfo; use ocl::enums::DeviceInfoResult; use ocl::{CommandQueueProperties, ProQue}; +use ocl_stream::OCLStreamExecutor; pub mod bench; pub mod primes; +pub mod primes_streamed; #[derive(Clone)] pub struct KernelController { pro_que: ProQue, + executor: OCLStreamExecutor, } impl KernelController { pub fn new() -> ocl::Result { let pro_que = ProQue::builder() .src(include_str!("kernel.cl")) - .dims(1 << 20) + .dims(1) // won't be used as buffer sizes are declared explicitly .queue_properties(CommandQueueProperties::PROFILING_ENABLE) .build()?; + let mut executor = OCLStreamExecutor::new(pro_que.clone()); + executor.set_concurrency(3); println!("Using device {}", pro_que.device().name()?); - Ok(Self { pro_que }) + Ok(Self { pro_que, executor }) } /// Prints information about the gpu capabilities diff --git a/src/kernel_controller/primes.rs b/src/kernel_controller/primes.rs index fd3fac9..e7b66c9 100644 --- a/src/kernel_controller/primes.rs +++ b/src/kernel_controller/primes.rs @@ -244,7 +244,7 @@ pub fn is_prime(number: u64) -> bool { } #[inline] -fn map_gpu_prime_result(input: Vec, output: Vec) -> Vec { +pub fn map_gpu_prime_result(input: Vec, output: Vec) -> Vec { input .into_iter() .enumerate() diff --git a/src/kernel_controller/primes_streamed.rs b/src/kernel_controller/primes_streamed.rs new file mode 100644 index 0000000..5c25b05 --- /dev/null +++ b/src/kernel_controller/primes_streamed.rs @@ -0,0 +1,69 @@ +use crate::benching::enqueue_profiled; +use crate::benching::result::ProfiledResult; +use crate::kernel_controller::primes::map_gpu_prime_result; +use crate::kernel_controller::KernelController; +use ocl::ProQue; +use ocl_stream::stream::OCLStream; +use ocl_stream::traits::ToOclBuffer; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; + +impl KernelController { + pub fn get_primes( + &self, + mut start: u64, + stop: u64, + step: usize, + local_size: usize, + ) -> OCLStream>> { + if start % 2 == 0 { + start += 1; + } + let offset = Arc::new(AtomicU64::new(start)); + self.executor.execute_bounded(step * 10, move |ctx| { + loop { + let pro_que = ctx.pro_que(); + let sender = ctx.sender(); + if offset.load(Ordering::SeqCst) >= stop { + break; + } + let offset = offset.fetch_add(step as u64 * 2, Ordering::SeqCst); + let numbers = (offset..(step as u64 * 2 + offset)) + .step_by(2) + .collect::>(); + let result = Self::filter_primes_streamed(pro_que, numbers, local_size)?; + sender.send(result)?; + } + + Ok(()) + }) + } + + /// Creates the prime filter kernel and executes it + fn filter_primes_streamed( + pro_que: &ProQue, + numbers: Vec, + local_size: usize, + ) -> ocl::Result>> { + let output_buffer = pro_que + .buffer_builder() + .len(numbers.len()) + .fill_val(0u8) + .build()?; + let input_buffer = numbers.to_ocl_buffer(pro_que)?; + let kernel = pro_que + .kernel_builder("check_prime") + .local_work_size(local_size) + .arg(&input_buffer) + .arg(&output_buffer) + .global_work_size(numbers.len()) + .build()?; + let duration = enqueue_profiled(pro_que, &kernel)?; + + let mut output = vec![0u8; output_buffer.len()]; + output_buffer.read(&mut output).enq()?; + let primes = map_gpu_prime_result(numbers, output); + + Ok(ProfiledResult::new(duration, primes)) + } +} diff --git a/src/main.rs b/src/main.rs index 0eb5f92..c885197 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,10 +7,13 @@ use crate::concurrency::executor::ConcurrentKernelExecutor; use crate::kernel_controller::primes::is_prime; use crate::kernel_controller::KernelController; -use crate::output::csv::CSVWriter; -use crate::output::{create_csv_write_thread, create_prime_write_thread}; +use crate::output::create_prime_write_thread; +use crate::output::csv::ThreadedCSVWriter; +use crate::output::threaded::ThreadedWriter; + +use ocl_stream::utils::result::OCLStreamResult; use rayon::prelude::*; -use std::fs::OpenOptions; +use std::fs::{File, OpenOptions}; use std::io::BufWriter; use std::mem; use std::path::PathBuf; @@ -18,6 +21,7 @@ use std::sync::mpsc::channel; use std::time::Duration; use structopt::StructOpt; +mod benching; mod concurrency; mod kernel_controller; mod output; @@ -76,8 +80,13 @@ struct CalculatePrimes { #[structopt(long = "cpu-validate")] cpu_validate: bool, + /// number of used threads #[structopt(short = "p", long = "parallel", default_value = "2")] num_threads: usize, + + /// if the result should be streamed + #[structopt(long = "streamed")] + streamed: bool, } #[derive(StructOpt, Clone, Debug)] @@ -128,11 +137,60 @@ fn main() -> ocl::Result<()> { match opts { Opts::Info => controller.print_info(), - Opts::CalculatePrimes(prime_opts) => calculate_primes(prime_opts, controller), + Opts::CalculatePrimes(prime_opts) => { + if prime_opts.streamed { + calculate_primes_streamed(prime_opts, controller).unwrap(); + Ok(()) + } else { + calculate_primes(prime_opts, controller) + } + } Opts::BenchmarkTaskCount(bench_opts) => bench_task_count(bench_opts, controller), } } +fn calculate_primes_streamed( + prime_opts: CalculatePrimes, + controller: KernelController, +) -> OCLStreamResult<()> { + let csv_file = open_write_buffered(&prime_opts.timings_file); + let mut csv_writer = ThreadedCSVWriter::new(csv_file, &["first", "count", "gpu_duration"]); + let output_file = open_write_buffered(&prime_opts.output_file); + let output_writer = ThreadedWriter::new(output_file, |v: Vec| { + v.iter() + .map(|v| v.to_string()) + .fold("".to_string(), |a, b| format!("{}\n{}", a, b)) + .into_bytes() + }); + + let mut stream = controller.get_primes( + prime_opts.start_offset, + prime_opts.max_number, + prime_opts.numbers_per_step, + prime_opts.local_size.unwrap_or(128), + ); + while let Ok(r) = stream.next() { + let primes = r.value(); + let first = *primes.first().unwrap(); // if there's none, rip + println!( + "Calculated {} primes in {:?}, offset: {}", + primes.len(), + r.gpu_duration(), + first + ); + csv_writer.add_row(vec![ + first.to_string(), + primes.len().to_string(), + duration_to_ms_string(r.gpu_duration()), + ]); + output_writer.write(primes.clone()); + } + csv_writer.close(); + output_writer.close(); + + Ok(()) +} + /// Calculates Prime numbers with GPU acceleration fn calculate_primes(prime_opts: CalculatePrimes, controller: KernelController) -> ocl::Result<()> { let output = BufWriter::new( @@ -150,14 +208,12 @@ fn calculate_primes(prime_opts: CalculatePrimes, controller: KernelController) - .open(&prime_opts.timings_file) .unwrap(), ); - let timings = CSVWriter::new( + let mut csv_writer = ThreadedCSVWriter::new( timings, &["offset", "count", "gpu_duration", "filter_duration"], - ) - .unwrap(); + ); let (prime_sender, prime_handle) = create_prime_write_thread(output); - let (csv_sender, csv_handle) = create_csv_write_thread(timings); let mut offset = prime_opts.start_offset; if offset % 2 == 0 { @@ -192,14 +248,12 @@ fn calculate_primes(prime_opts: CalculatePrimes, controller: KernelController) - prime_opts.numbers_per_step as f64 / prime_result.gpu_duration.as_secs_f64(), offset, ); - csv_sender - .send(vec![ - offset.to_string(), - primes.len().to_string(), - duration_to_ms_string(&prime_result.gpu_duration), - duration_to_ms_string(&prime_result.filter_duration), - ]) - .unwrap(); + csv_writer.add_row(vec![ + offset.to_string(), + primes.len().to_string(), + duration_to_ms_string(&prime_result.gpu_duration), + duration_to_ms_string(&prime_result.filter_duration), + ]); if prime_opts.cpu_validate { validate_primes_on_cpu(&primes) } @@ -207,9 +261,8 @@ fn calculate_primes(prime_opts: CalculatePrimes, controller: KernelController) - } mem::drop(prime_sender); - mem::drop(csv_sender); prime_handle.join().unwrap(); - csv_handle.join().unwrap(); + csv_writer.close(); executor_thread.join().unwrap(); Ok(()) @@ -224,7 +277,7 @@ fn bench_task_count(opts: BenchmarkTaskCount, controller: KernelController) -> o .open(opts.benchmark_file) .unwrap(), ); - let csv_writer = CSVWriter::new( + let mut csv_writer = ThreadedCSVWriter::new( bench_writer, &[ "local_size", @@ -234,9 +287,7 @@ fn bench_task_count(opts: BenchmarkTaskCount, controller: KernelController) -> o "gpu_duration", "read_duration", ], - ) - .unwrap(); - let (bench_sender, bench_handle) = create_csv_write_thread(csv_writer); + ); for n in (opts.num_tasks_start..=opts.num_tasks_stop).step_by(opts.num_tasks_step) { if let (Some(start), Some(stop)) = (opts.local_size_start, opts.local_size_stop) { for l in (start..=stop) @@ -248,16 +299,14 @@ fn bench_task_count(opts: BenchmarkTaskCount, controller: KernelController) -> o stats.avg(controller.bench_int(opts.calculation_steps, n, Some(l))?) } println!("{}\n", stats); - bench_sender - .send(vec![ - l.to_string(), - n.to_string(), - opts.calculation_steps.to_string(), - duration_to_ms_string(&stats.write_duration), - duration_to_ms_string(&stats.calc_duration), - duration_to_ms_string(&stats.read_duration), - ]) - .unwrap(); + csv_writer.add_row(vec![ + l.to_string(), + n.to_string(), + opts.calculation_steps.to_string(), + duration_to_ms_string(&stats.write_duration), + duration_to_ms_string(&stats.calc_duration), + duration_to_ms_string(&stats.read_duration), + ]) } } else { let mut stats = controller.bench_int(opts.calculation_steps, n, None)?; @@ -265,21 +314,17 @@ fn bench_task_count(opts: BenchmarkTaskCount, controller: KernelController) -> o stats.avg(controller.bench_int(opts.calculation_steps, n, None)?) } println!("{}\n", stats); - bench_sender - .send(vec![ - "n/a".to_string(), - n.to_string(), - opts.calculation_steps.to_string(), - duration_to_ms_string(&stats.write_duration), - duration_to_ms_string(&stats.calc_duration), - duration_to_ms_string(&stats.read_duration), - ]) - .unwrap(); + csv_writer.add_row(vec![ + "n/a".to_string(), + n.to_string(), + opts.calculation_steps.to_string(), + duration_to_ms_string(&stats.write_duration), + duration_to_ms_string(&stats.calc_duration), + duration_to_ms_string(&stats.read_duration), + ]); } } - - mem::drop(bench_sender); - bench_handle.join().unwrap(); + csv_writer.close(); Ok(()) } @@ -304,3 +349,16 @@ fn validate_primes_on_cpu(primes: &Vec) { fn duration_to_ms_string(duration: &Duration) -> String { format!("{}", duration.as_secs_f64() * 1000f64) } + +/// opens a file in a buffered writer +/// if it already exists it will be recreated +fn open_write_buffered(path: &PathBuf) -> BufWriter { + BufWriter::new( + OpenOptions::new() + .truncate(true) + .write(true) + .create(true) + .open(path) + .expect("Failed to open file!"), + ) +} diff --git a/src/output/csv.rs b/src/output/csv.rs index a8e27db..a6023f7 100644 --- a/src/output/csv.rs +++ b/src/output/csv.rs @@ -4,49 +4,51 @@ * See LICENSE for more information */ +use crate::output::threaded::ThreadedWriter; use std::collections::HashMap; -use std::io::{Result, Write}; +use std::io::Write; -pub struct CSVWriter { - inner: W, +pub struct ThreadedCSVWriter { + inner: ThreadedWriter, columns: Vec, } -impl CSVWriter -where - W: Write, -{ +impl ThreadedCSVWriter { /// Creates a new CSVWriter with a defined list of columns - pub fn new(writer: W, columns: &[&str]) -> Result { + pub fn new(writer: W, columns: &[&str]) -> Self + where + W: Write + Send + Sync + 'static, + { let column_vec = columns .iter() .map(|column| column.to_string()) .collect::>(); + let writer = ThreadedWriter::new(writer, |v: String| v.as_bytes().to_vec()); let mut csv_writer = Self { inner: writer, columns: column_vec.clone(), }; - csv_writer.add_row(column_vec)?; + csv_writer.add_row(column_vec); - Ok(csv_writer) + csv_writer } /// Adds a new row of values to the file - pub fn add_row(&mut self, items: Vec) -> Result<()> { - self.inner.write_all( + pub fn add_row(&mut self, items: Vec) { + self.inner.write( items .iter() .fold("".to_string(), |a, b| format!("{},{}", a, b)) .trim_start_matches(',') - .as_bytes(), - )?; - self.inner.write_all("\n".as_bytes()) + .to_string() + + "\n", + ); } /// Adds a new row of values stored in a map to the file #[allow(dead_code)] - pub fn add_row_map(&mut self, item_map: &HashMap) -> Result<()> { + pub fn add_row_map(&mut self, item_map: &HashMap) { let mut items = Vec::new(); for key in &self.columns { items.push(item_map.get(key).cloned().unwrap_or("".to_string())); @@ -55,7 +57,7 @@ where self.add_row(items) } - pub fn flush(&mut self) -> Result<()> { - self.inner.flush() + pub fn close(self) { + self.inner.close() } } diff --git a/src/output/mod.rs b/src/output/mod.rs index 4936d2d..86b2b18 100644 --- a/src/output/mod.rs +++ b/src/output/mod.rs @@ -3,13 +3,13 @@ * Copyright (C) 2020 trivernis * See LICENSE for more information */ -use crate::output::csv::CSVWriter; use std::fs::File; use std::io::{BufWriter, Write}; use std::sync::mpsc::{channel, Sender}; use std::thread::{self, JoinHandle}; pub mod csv; +pub mod threaded; pub fn create_prime_write_thread( mut writer: BufWriter, @@ -26,17 +26,3 @@ pub fn create_prime_write_thread( (tx, handle) } - -pub fn create_csv_write_thread( - mut writer: CSVWriter>, -) -> (Sender>, JoinHandle<()>) { - let (tx, rx) = channel(); - let handle = thread::spawn(move || { - for row in rx { - writer.add_row(row).unwrap(); - } - writer.flush().unwrap(); - }); - - (tx, handle) -} diff --git a/src/output/threaded.rs b/src/output/threaded.rs new file mode 100644 index 0000000..2f6a643 --- /dev/null +++ b/src/output/threaded.rs @@ -0,0 +1,45 @@ +use crossbeam_channel::Sender; +use std::io::Write; +use std::mem; +use std::thread::{self, JoinHandle}; + +pub struct ThreadedWriter +where + T: Send + Sync, +{ + handle: JoinHandle<()>, + tx: Sender, +} + +impl ThreadedWriter +where + T: Send + Sync + 'static, +{ + /// Creates a new threaded writer + pub fn new(mut writer: W, serializer: F) -> Self + where + F: Fn(T) -> Vec + Send + Sync + 'static, + W: Write + Send + Sync + 'static, + { + let (tx, rx) = crossbeam_channel::bounded(1024); + let handle = thread::spawn(move || { + for value in rx { + let mut bytes = serializer(value); + writer.write_all(&mut bytes[..]).unwrap(); + writer.flush().unwrap(); + } + }); + Self { handle, tx } + } + + /// Writes a value + pub fn write(&self, value: T) { + self.tx.send(value).unwrap(); + } + + /// Closes the channel to the writer and waits for the writer thread to stop + pub fn close(self) { + mem::drop(self.tx); + self.handle.join().unwrap(); + } +}