Add multithreaded serialization of chunks

As a result, the flush and flush_writer functions are now private, since the finish function waits for the worker threads to complete and performs the final flush itself.
Branch: master
trivernis committed 5 years ago
parent 886ac5ee49
commit 7c43812937
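For callers, the visible change is that the old two-step teardown of flush() followed by flush_writer() becomes a single finish() call, which now also joins the compression threads. A minimal sketch of the new call sequence; the import path is assumed for illustration and is not part of this diff:

```rust
use std::fs::File;
use std::io::Error;

use bdf::io::BDFWriter; // assumed path, not shown in this diff

fn main() -> Result<(), Error> {
    let file = File::create("dictionary.bdf")?;
    let mut bdf_writer = BDFWriter::new(file, 1, false); // 1 entry, uncompressed
    // ... add entries as in the README example below ...

    // finish() replaces flush() + flush_writer() and additionally closes
    // the work channel and waits for the worker threads before flushing.
    bdf_writer.finish()?;
    Ok(())
}
```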

@@ -1,6 +1,6 @@
 [package]
 name = "bdflib"
-version = "0.2.0"
+version = "0.3.0"
 authors = ["trivernis <trivernis@gmail.com>"]
 edition = "2018"
 license-file = "LICENSE"

@@ -18,4 +18,7 @@ crate-type = ["lib"]
 crc = "1.8.1"
 xz2 = "0.1.6"
 byteorder = "1.3.4"
+crossbeam-channel = "0.4.2"
+crossbeam-utils = "0.7.2"
+num_cpus = "1.12.0"
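The three new dependencies carry the threading model: crossbeam-channel supplies the bounded channels used as work and result queues, crossbeam-utils supplies the WaitGroup used to join the workers, and num_cpus sizes both the thread pool and the queue capacity. The bounded capacity is what throttles the producer: once the queue is full, further sends block until a worker catches up. A standalone sketch of that backpressure behavior:

```rust
use crossbeam_channel::bounded;

fn main() {
    // Capacity 2: sends succeed until the buffer is full, then block
    // (or fail, with try_send) until the receiver drains an item.
    let (tx, rx) = bounded(2);
    tx.send(1).unwrap();
    tx.send(2).unwrap();
    assert!(tx.try_send(3).is_err()); // full; a plain send() would block here
    assert_eq!(rx.recv().unwrap(), 1);
    tx.send(3).unwrap(); // room again
    assert_eq!(rx.recv().unwrap(), 2);
    assert_eq!(rx.recv().unwrap(), 3);
}
```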

@@ -42,8 +42,7 @@ fn main() {
     let mut entry = DataEntry::new("foo".into());
     entry.add_hash_value("fakehash".into(), vec![0, 2, 3]);
     bdf_writer.add_data_entry(entry).unwrap();
-    bdf_writer.flush().unwrap();
-    bdf_writer.flush_writer().unwrap();
+    bdf_writer.finish().unwrap();
     println!("Finished writing!");
 }
 ```

@@ -5,9 +5,21 @@ use std::convert::TryInto;
 use std::fs::File;
 use std::io::Error;
 use std::io::{BufReader, BufWriter, ErrorKind, Read, Write};
+use std::thread;
+
+use crossbeam_channel::{bounded, Sender, Receiver};
+use crossbeam_utils::sync::WaitGroup;

 const ENTRIES_PER_CHUNK: u32 = 100_000;

+struct ThreadManager<T1, T2> {
+    pub sender_work: Option<Sender<T1>>,
+    pub receiver_work: Receiver<T1>,
+    pub sender_result: Sender<T2>,
+    pub receiver_result: Receiver<T2>,
+    pub wg: WaitGroup,
+    pub threads_started: bool,
+}
+
 pub struct BDFReader {
     reader: BufReader<File>,
     pub metadata: Option<MetaChunk>,
@@ -23,6 +35,36 @@ pub struct BDFWriter {
     head_written: bool,
     compressed: bool,
     compression_level: u32,
+    thread_manager: ThreadManager<GenericChunk, Vec<u8>>,
+}
+
+impl<T1, T2> ThreadManager<T1, T2> {
+    /// Creates a new thread manager to store channels and information
+    /// about threads to control them
+    pub fn new(cap: usize) -> Self {
+        let (s1, r1) = bounded(cap);
+        let (s2, r2) = bounded(cap);
+
+        Self {
+            sender_work: Some(s1),
+            receiver_work: r1,
+            sender_result: s2,
+            receiver_result: r2,
+            wg: WaitGroup::new(),
+            threads_started: false,
+        }
+    }
+
+    /// Drops the sender for work.
+    pub fn drop_sender(&mut self) {
+        self.sender_work = None;
+    }
+
+    /// Waits for the waitgroup
+    pub fn wait(&mut self) {
+        let wg = self.wg.clone();
+        self.wg = WaitGroup::new();
+        wg.wait();
+    }
 }

 impl BDFWriter {
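Wrapping sender_work in an Option is what makes the shutdown work: ThreadManager::drop_sender sets it to None, which drops the last work Sender, and crossbeam treats a channel with no remaining senders as closed, so the workers' receive loops end on their own. A standalone sketch of that close-by-drop semantics:

```rust
use crossbeam_channel::{bounded, Sender};

fn main() {
    let (tx, rx) = bounded::<i32>(4);
    let mut sender_work: Option<Sender<i32>> = Some(tx);

    sender_work.as_ref().unwrap().send(1).unwrap();
    sender_work = None; // drops the last Sender, closing the channel

    assert_eq!(rx.recv().unwrap(), 1); // already-buffered items still drain
    assert!(rx.recv().is_err()); // then recv() reports disconnection
    // Workers iterating over receiver_work.iter() fall out of their
    // loops at exactly this point.
}
```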
@@ -35,6 +77,7 @@ impl BDFWriter {
     /// If the `compress` parameter is true, each data chunk will be compressed
     /// using lzma with a default level of 1.
     pub fn new(inner: File, entry_count: u64, compress: bool) -> Self {
+        let thread_manager = ThreadManager::new(num_cpus::get());
         Self {
             metadata: MetaChunk::new(entry_count, ENTRIES_PER_CHUNK, compress),
             lookup_table: HashLookupTable::new(HashMap::new()),
@@ -43,6 +86,27 @@ impl BDFWriter {
             head_written: false,
             compressed: compress,
             compression_level: 1,
+            thread_manager,
+        }
+    }
+
+    /// Starts threads for parallel chunk compression
+    pub fn start_threads(&self) {
+        for _ in 0..num_cpus::get() {
+            let r = self.thread_manager.receiver_work.clone();
+            let s = self.thread_manager.sender_result.clone();
+            let wg: WaitGroup = self.thread_manager.wg.clone();
+            let compress = self.compressed;
+            let compression_level = self.compression_level;
+
+            thread::spawn(move || {
+                for mut chunk in r.iter() {
+                    if compress {
+                        chunk.compress(compression_level).expect("failed to compress chunk");
+                    }
+                    s.send(chunk.serialize()).expect("failed to send result");
+                }
+                drop(wg);
+            });
+        }
     }
 }
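start_threads hands each worker its own clones of the channel endpoints plus a WaitGroup clone; dropping that clone is the worker's completion signal. Note that WaitGroup::wait consumes the group, which is why ThreadManager::wait above clones the field and installs a fresh group before waiting. A standalone sketch of that join mechanism:

```rust
use std::thread;

use crossbeam_utils::sync::WaitGroup;

fn main() {
    let wg = WaitGroup::new();

    for _ in 0..4 {
        let wg = wg.clone(); // one clone moves into each worker
        thread::spawn(move || {
            // ... compress and serialize a chunk ...
            drop(wg); // explicit drop, mirroring start_threads above
        });
    }

    // wait() consumes the group and returns once every clone is dropped.
    wg.wait();
    println!("all workers finished");
}
```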
@@ -75,7 +139,7 @@ impl BDFWriter {
     }

     /// Writes the data to the file
-    pub fn flush(&mut self) -> Result<(), Error> {
+    fn flush(&mut self) -> Result<(), Error> {
         if !self.head_written {
             self.writer.write(BDF_HDR)?;
             let mut generic_meta = GenericChunk::from(&self.metadata);
@@ -84,13 +148,23 @@ impl BDFWriter {
             self.writer.write(generic_lookup.serialize().as_slice())?;
             self.head_written = true;
         }
+        if !self.thread_manager.threads_started {
+            self.start_threads();
+            self.thread_manager.threads_started = true;
+        }
         let mut data_chunk =
             GenericChunk::from_data_entries(&self.data_entries, &self.lookup_table);
-        if self.compressed {
-            data_chunk.compress(self.compression_level)?;
+        if let Some(sender) = &self.thread_manager.sender_work {
+            sender.send(data_chunk).expect("failed to send work to threads");
+        } else {
+            if self.compressed {
+                data_chunk.compress(self.compression_level)?;
+            }
+            self.thread_manager.sender_result.send(data_chunk.serialize()).expect("failed to send serialization result");
+        }
+        while let Ok(data) = self.thread_manager.receiver_result.try_recv() {
+            self.writer.write(data.as_slice())?;
         }
-        let data = data_chunk.serialize();
-        self.writer.write(data.as_slice())?;
         self.data_entries = Vec::new();

         Ok(())
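flush() now drains opportunistically: try_recv pops whatever the workers have finished so far and never blocks, so compression overlaps with the caller adding further entries, and chunks can reach the file in completion order rather than submission order. A minimal sketch of the non-blocking drain, with a hypothetical drain_ready helper standing in for the loop above:

```rust
use std::io::{Error, Write};

use crossbeam_channel::Receiver;

/// Hypothetical helper: write every already-serialized chunk without
/// waiting on the workers.
fn drain_ready<W: Write>(results: &Receiver<Vec<u8>>, writer: &mut W) -> Result<(), Error> {
    // try_recv returns Err as soon as the queue is momentarily empty,
    // so this loop never blocks the calling thread.
    while let Ok(data) = results.try_recv() {
        writer.write_all(data.as_slice())?;
    }
    Ok(())
}
```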
@@ -98,13 +172,15 @@ impl BDFWriter {
     /// Flushes the writer
     /// This should be called when no more data is being written
-    pub fn flush_writer(&mut self) -> Result<(), Error> {
+    fn flush_writer(&mut self) -> Result<(), Error> {
         self.writer.flush()
     }

     /// Flushes the buffered chunk data and the writer
     /// to finish the file.
     pub fn finish(&mut self) -> Result<(), Error> {
+        self.thread_manager.drop_sender();
+        self.thread_manager.wait();
         self.flush()?;
         self.flush_writer()?;
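The ordering inside finish() is what allows flush and flush_writer to go private: the work channel is closed first so the workers drain their queue and exit, the WaitGroup join happens second, and only then is the final partial chunk flushed and the BufWriter drained. A self-contained sketch of that drop-then-wait-then-drain sequence (the worker body is a stand-in for compress-and-serialize, not bdflib's actual code):

```rust
use std::thread;

use crossbeam_channel::bounded;
use crossbeam_utils::sync::WaitGroup;

fn main() {
    let (work_tx, work_rx) = bounded::<Vec<u8>>(4);
    let (result_tx, result_rx) = bounded::<Vec<u8>>(4);
    let wg = WaitGroup::new();

    for _ in 0..2 {
        let (rx, tx, wg) = (work_rx.clone(), result_tx.clone(), wg.clone());
        thread::spawn(move || {
            for chunk in rx.iter() {
                tx.send(chunk).unwrap(); // stand-in for compress + serialize
            }
            drop(wg);
        });
    }

    work_tx.send(vec![1, 2, 3]).unwrap();

    drop(work_tx);   // 1. close the work queue: workers finish and exit
    wg.wait();       // 2. join the workers: every result is now queued
    drop(result_tx); // drop the original result sender so the drain ends
    for chunk in result_rx.iter() {
        println!("writing {} bytes", chunk.len()); // 3. drain, then flush
    }
}
```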

@@ -28,8 +28,7 @@ mod tests {
         entry_2.add_hash_value(FOO.to_string(), vec![4, 5, 2, 3]);
         writer.add_data_entry(entry_2)?;
-        writer.flush()?;
-        writer.flush_writer()?;
+        writer.finish()?;

         remove_file("tmp1.bdf")?;
         Ok(())
@@ -98,8 +97,7 @@ mod tests {
         entry_1.add_hash_value(FOO.to_string(), vec![2, 4, 0, 2]);
         writer.add_data_entry(entry_1)?;
-        writer.flush()?;
-        writer.flush_writer()?;
+        writer.finish()?;

         Ok(())
     }
