Add concurrent kernel executor

Signed-off-by: Trivernis <trivernis@protonmail.com>
pull/1/head
Trivernis 4 years ago
parent 7ee06b1d35
commit 16e3e4a1bc

@ -0,0 +1,84 @@
use crate::kernel_controller::primes::PrimeCalculationResult;
use crate::kernel_controller::KernelController;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::mpsc::Sender;
use std::sync::Arc;
use std::thread::Builder as ThreadBuilder;
pub struct ConcurrentKernelExecutor {
kernel_controller: KernelController,
}
impl ConcurrentKernelExecutor {
pub fn new(kernel_controller: KernelController) -> Self {
Self { kernel_controller }
}
pub fn calculate_primes(
&self,
mut offset: u64,
numbers_per_step: usize,
stop: u64,
no_cache: bool,
num_threads: usize,
sender: Sender<PrimeCalculationResult>,
) {
let mut handles = Vec::new();
if offset % 2 == 0 {
offset += 1;
}
let offset = Arc::new(AtomicU64::new(offset));
let panic = Arc::new(AtomicBool::new(false));
for i in 0..num_threads {
let sender = Sender::clone(&sender);
let controller = self.kernel_controller.clone();
let offset = Arc::clone(&offset);
let panic = Arc::clone(&panic);
handles.push(
ThreadBuilder::new()
.name(format!("executor-{}", i))
.spawn(move || loop {
if panic.load(Ordering::Relaxed) {
panic!("Planned panic");
}
if offset.load(Ordering::SeqCst) >= stop {
break;
}
let offset =
offset.fetch_add(numbers_per_step as u64 * 2, Ordering::SeqCst);
let numbers = (offset..(numbers_per_step as u64 * 2 + offset))
.step_by(2)
.collect::<Vec<u64>>();
let prime_result = if no_cache {
controller
.filter_primes_simple(numbers)
.map_err(|e| {
panic.store(true, Ordering::Relaxed);
e
})
.unwrap()
} else {
controller
.filter_primes(numbers)
.map_err(|e| {
panic.store(true, Ordering::Relaxed);
e
})
.unwrap()
};
if let Err(e) = sender.send(prime_result) {
panic.store(true, Ordering::Relaxed);
panic!(e);
}
})
.unwrap(),
);
}
for handle in handles {
handle.join().unwrap();
}
}
}

@ -0,0 +1 @@
pub mod executor;

@ -11,6 +11,7 @@ use ocl::ProQue;
pub mod bench; pub mod bench;
pub mod primes; pub mod primes;
#[derive(Clone)]
pub struct KernelController { pub struct KernelController {
pro_que: ProQue, pro_que: ProQue,
} }

@ -73,13 +73,18 @@ impl KernelController {
)); ));
} }
let prime_buffer = self let prime_buffer = {
.pro_que let prime_cache = PRIME_CACHE.lock();
.buffer_builder() let prime_buffer = self
.len(PRIME_CACHE.lock().len()) .pro_que
.build()?; .buffer_builder()
.len(prime_cache.len())
prime_buffer.write(&PRIME_CACHE.lock()[..]).enq()?; .build()?;
prime_buffer.write(&prime_cache[..]).enq()?;
prime_buffer
};
let input_buffer = self.pro_que.buffer_builder().len(input.len()).build()?; let input_buffer = self.pro_que.buffer_builder().len(input.len()).build()?;
input_buffer.write(&input[..]).enq()?; input_buffer.write(&input[..]).enq()?;

@ -4,6 +4,7 @@
* See LICENSE for more information * See LICENSE for more information
*/ */
use crate::concurrency::executor::ConcurrentKernelExecutor;
use crate::kernel_controller::primes::is_prime; use crate::kernel_controller::primes::is_prime;
use crate::kernel_controller::KernelController; use crate::kernel_controller::KernelController;
use crate::output::csv::CSVWriter; use crate::output::csv::CSVWriter;
@ -13,9 +14,11 @@ use std::fs::OpenOptions;
use std::io::BufWriter; use std::io::BufWriter;
use std::mem; use std::mem;
use std::path::PathBuf; use std::path::PathBuf;
use std::time::{Duration, Instant}; use std::sync::mpsc::channel;
use std::time::Duration;
use structopt::StructOpt; use structopt::StructOpt;
mod concurrency;
mod kernel_controller; mod kernel_controller;
mod output; mod output;
@ -65,6 +68,9 @@ struct CalculatePrimes {
/// If the calculated prime numbers should be validated on the cpu by a simple prime algorithm /// If the calculated prime numbers should be validated on the cpu by a simple prime algorithm
#[structopt(long = "cpu-validate")] #[structopt(long = "cpu-validate")]
cpu_validate: bool, cpu_validate: bool,
#[structopt(short = "p", long = "parallel", default_value = "2")]
num_threads: usize,
} }
#[derive(StructOpt, Clone, Debug)] #[derive(StructOpt, Clone, Debug)]
@ -112,7 +118,7 @@ fn calculate_primes(prime_opts: CalculatePrimes, controller: KernelController) -
OpenOptions::new() OpenOptions::new()
.create(true) .create(true)
.append(true) .append(true)
.open(prime_opts.output_file) .open(&prime_opts.output_file)
.unwrap(), .unwrap(),
); );
let timings = BufWriter::new( let timings = BufWriter::new(
@ -120,18 +126,12 @@ fn calculate_primes(prime_opts: CalculatePrimes, controller: KernelController) -
.create(true) .create(true)
.truncate(true) .truncate(true)
.write(true) .write(true)
.open(prime_opts.timings_file) .open(&prime_opts.timings_file)
.unwrap(), .unwrap(),
); );
let timings = CSVWriter::new( let timings = CSVWriter::new(
timings, timings,
&[ &["offset", "count", "gpu_duration", "filter_duration"],
"offset",
"count",
"gpu_duration",
"filter_duration",
"total_duration",
],
) )
.unwrap(); .unwrap();
@ -145,29 +145,30 @@ fn calculate_primes(prime_opts: CalculatePrimes, controller: KernelController) -
if offset < 2 { if offset < 2 {
prime_sender.send(vec![2]).unwrap(); prime_sender.send(vec![2]).unwrap();
} }
loop { let executor = ConcurrentKernelExecutor::new(controller);
let start = Instant::now(); let (tx, rx) = channel();
let numbers = (offset..(prime_opts.numbers_per_step as u64 * 2 + offset))
.step_by(2) let executor_thread = std::thread::spawn({
.collect::<Vec<u64>>(); let prime_opts = prime_opts.clone();
println!( move || {
"Filtering primes from {} numbers, offset: {}", executor.calculate_primes(
numbers.len(), prime_opts.start_offset,
offset prime_opts.numbers_per_step,
); prime_opts.max_number,
let prime_result = if prime_opts.no_cache { prime_opts.no_cache,
controller.filter_primes_simple(numbers)? prime_opts.num_threads,
} else { tx,
controller.filter_primes(numbers)? )
}; }
});
for prime_result in rx {
let offset = prime_result.primes.last().cloned().unwrap();
let primes = prime_result.primes; let primes = prime_result.primes;
let elapsed_ms = start.elapsed().as_secs_f64() * 1000f64;
println!( println!(
"Calculated {} primes in {:.4} ms: {:.4} checks/s", "Calculated {} primes: {:.4} checks/s, offset: {}",
primes.len(), primes.len(),
elapsed_ms, prime_opts.numbers_per_step as f64 / prime_result.gpu_duration.as_secs_f64(),
prime_opts.numbers_per_step as f64 / start.elapsed().as_secs_f64() offset,
); );
csv_sender csv_sender
.send(vec![ .send(vec![
@ -175,28 +176,19 @@ fn calculate_primes(prime_opts: CalculatePrimes, controller: KernelController) -
primes.len().to_string(), primes.len().to_string(),
duration_to_ms_string(&prime_result.gpu_duration), duration_to_ms_string(&prime_result.gpu_duration),
duration_to_ms_string(&prime_result.filter_duration), duration_to_ms_string(&prime_result.filter_duration),
elapsed_ms.to_string(),
]) ])
.unwrap(); .unwrap();
if prime_opts.cpu_validate { if prime_opts.cpu_validate {
validate_primes_on_cpu(&primes) validate_primes_on_cpu(&primes)
} }
println!();
prime_sender.send(primes).unwrap(); prime_sender.send(primes).unwrap();
if (prime_opts.numbers_per_step as u128 * 2 + offset as u128)
> prime_opts.max_number as u128
{
break;
}
offset += prime_opts.numbers_per_step as u64 * 2;
} }
mem::drop(prime_sender); mem::drop(prime_sender);
mem::drop(csv_sender); mem::drop(csv_sender);
prime_handle.join().unwrap(); prime_handle.join().unwrap();
csv_handle.join().unwrap(); csv_handle.join().unwrap();
executor_thread.join().unwrap();
Ok(()) Ok(())
} }

Loading…
Cancel
Save