diff --git a/src/concurrency/executor.rs b/src/concurrency/executor.rs new file mode 100644 index 0000000..7096866 --- /dev/null +++ b/src/concurrency/executor.rs @@ -0,0 +1,84 @@ +use crate::kernel_controller::primes::PrimeCalculationResult; +use crate::kernel_controller::KernelController; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::sync::mpsc::Sender; +use std::sync::Arc; +use std::thread::Builder as ThreadBuilder; + +pub struct ConcurrentKernelExecutor { + kernel_controller: KernelController, +} + +impl ConcurrentKernelExecutor { + pub fn new(kernel_controller: KernelController) -> Self { + Self { kernel_controller } + } + + pub fn calculate_primes( + &self, + mut offset: u64, + numbers_per_step: usize, + stop: u64, + no_cache: bool, + num_threads: usize, + sender: Sender, + ) { + let mut handles = Vec::new(); + if offset % 2 == 0 { + offset += 1; + } + let offset = Arc::new(AtomicU64::new(offset)); + let panic = Arc::new(AtomicBool::new(false)); + + for i in 0..num_threads { + let sender = Sender::clone(&sender); + let controller = self.kernel_controller.clone(); + let offset = Arc::clone(&offset); + let panic = Arc::clone(&panic); + + handles.push( + ThreadBuilder::new() + .name(format!("executor-{}", i)) + .spawn(move || loop { + if panic.load(Ordering::Relaxed) { + panic!("Planned panic"); + } + if offset.load(Ordering::SeqCst) >= stop { + break; + } + let offset = + offset.fetch_add(numbers_per_step as u64 * 2, Ordering::SeqCst); + + let numbers = (offset..(numbers_per_step as u64 * 2 + offset)) + .step_by(2) + .collect::>(); + let prime_result = if no_cache { + controller + .filter_primes_simple(numbers) + .map_err(|e| { + panic.store(true, Ordering::Relaxed); + e + }) + .unwrap() + } else { + controller + .filter_primes(numbers) + .map_err(|e| { + panic.store(true, Ordering::Relaxed); + e + }) + .unwrap() + }; + if let Err(e) = sender.send(prime_result) { + panic.store(true, Ordering::Relaxed); + panic!(e); + } + }) + .unwrap(), + ); + } + for handle in handles { + handle.join().unwrap(); + } + } +} diff --git a/src/concurrency/mod.rs b/src/concurrency/mod.rs new file mode 100644 index 0000000..0c95fda --- /dev/null +++ b/src/concurrency/mod.rs @@ -0,0 +1 @@ +pub mod executor; diff --git a/src/kernel_controller/mod.rs b/src/kernel_controller/mod.rs index 4ac4186..5d454f3 100644 --- a/src/kernel_controller/mod.rs +++ b/src/kernel_controller/mod.rs @@ -11,6 +11,7 @@ use ocl::ProQue; pub mod bench; pub mod primes; +#[derive(Clone)] pub struct KernelController { pro_que: ProQue, } diff --git a/src/kernel_controller/primes.rs b/src/kernel_controller/primes.rs index 059ce41..a939a8b 100644 --- a/src/kernel_controller/primes.rs +++ b/src/kernel_controller/primes.rs @@ -73,13 +73,18 @@ impl KernelController { )); } - let prime_buffer = self - .pro_que - .buffer_builder() - .len(PRIME_CACHE.lock().len()) - .build()?; - - prime_buffer.write(&PRIME_CACHE.lock()[..]).enq()?; + let prime_buffer = { + let prime_cache = PRIME_CACHE.lock(); + let prime_buffer = self + .pro_que + .buffer_builder() + .len(prime_cache.len()) + .build()?; + + prime_buffer.write(&prime_cache[..]).enq()?; + + prime_buffer + }; let input_buffer = self.pro_que.buffer_builder().len(input.len()).build()?; input_buffer.write(&input[..]).enq()?; diff --git a/src/main.rs b/src/main.rs index 490f2e8..2ad185c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,6 +4,7 @@ * See LICENSE for more information */ +use crate::concurrency::executor::ConcurrentKernelExecutor; use crate::kernel_controller::primes::is_prime; use crate::kernel_controller::KernelController; use crate::output::csv::CSVWriter; @@ -13,9 +14,11 @@ use std::fs::OpenOptions; use std::io::BufWriter; use std::mem; use std::path::PathBuf; -use std::time::{Duration, Instant}; +use std::sync::mpsc::channel; +use std::time::Duration; use structopt::StructOpt; +mod concurrency; mod kernel_controller; mod output; @@ -65,6 +68,9 @@ struct CalculatePrimes { /// If the calculated prime numbers should be validated on the cpu by a simple prime algorithm #[structopt(long = "cpu-validate")] cpu_validate: bool, + + #[structopt(short = "p", long = "parallel", default_value = "2")] + num_threads: usize, } #[derive(StructOpt, Clone, Debug)] @@ -112,7 +118,7 @@ fn calculate_primes(prime_opts: CalculatePrimes, controller: KernelController) - OpenOptions::new() .create(true) .append(true) - .open(prime_opts.output_file) + .open(&prime_opts.output_file) .unwrap(), ); let timings = BufWriter::new( @@ -120,18 +126,12 @@ fn calculate_primes(prime_opts: CalculatePrimes, controller: KernelController) - .create(true) .truncate(true) .write(true) - .open(prime_opts.timings_file) + .open(&prime_opts.timings_file) .unwrap(), ); let timings = CSVWriter::new( timings, - &[ - "offset", - "count", - "gpu_duration", - "filter_duration", - "total_duration", - ], + &["offset", "count", "gpu_duration", "filter_duration"], ) .unwrap(); @@ -145,29 +145,30 @@ fn calculate_primes(prime_opts: CalculatePrimes, controller: KernelController) - if offset < 2 { prime_sender.send(vec![2]).unwrap(); } - loop { - let start = Instant::now(); - let numbers = (offset..(prime_opts.numbers_per_step as u64 * 2 + offset)) - .step_by(2) - .collect::>(); - println!( - "Filtering primes from {} numbers, offset: {}", - numbers.len(), - offset - ); - let prime_result = if prime_opts.no_cache { - controller.filter_primes_simple(numbers)? - } else { - controller.filter_primes(numbers)? - }; + let executor = ConcurrentKernelExecutor::new(controller); + let (tx, rx) = channel(); + + let executor_thread = std::thread::spawn({ + let prime_opts = prime_opts.clone(); + move || { + executor.calculate_primes( + prime_opts.start_offset, + prime_opts.numbers_per_step, + prime_opts.max_number, + prime_opts.no_cache, + prime_opts.num_threads, + tx, + ) + } + }); + for prime_result in rx { + let offset = prime_result.primes.last().cloned().unwrap(); let primes = prime_result.primes; - let elapsed_ms = start.elapsed().as_secs_f64() * 1000f64; - println!( - "Calculated {} primes in {:.4} ms: {:.4} checks/s", + "Calculated {} primes: {:.4} checks/s, offset: {}", primes.len(), - elapsed_ms, - prime_opts.numbers_per_step as f64 / start.elapsed().as_secs_f64() + prime_opts.numbers_per_step as f64 / prime_result.gpu_duration.as_secs_f64(), + offset, ); csv_sender .send(vec![ @@ -175,28 +176,19 @@ fn calculate_primes(prime_opts: CalculatePrimes, controller: KernelController) - primes.len().to_string(), duration_to_ms_string(&prime_result.gpu_duration), duration_to_ms_string(&prime_result.filter_duration), - elapsed_ms.to_string(), ]) .unwrap(); - if prime_opts.cpu_validate { validate_primes_on_cpu(&primes) } - println!(); prime_sender.send(primes).unwrap(); - - if (prime_opts.numbers_per_step as u128 * 2 + offset as u128) - > prime_opts.max_number as u128 - { - break; - } - offset += prime_opts.numbers_per_step as u64 * 2; } mem::drop(prime_sender); mem::drop(csv_sender); prime_handle.join().unwrap(); csv_handle.join().unwrap(); + executor_thread.join().unwrap(); Ok(()) }