From 7c43812937798aafa9d201df88c83e6d41672b81 Mon Sep 17 00:00:00 2001 From: trivernis Date: Sun, 15 Mar 2020 10:27:51 +0100 Subject: [PATCH] Add multithreaded serialization of chunks This results in the flush and flush_writer functions being private since the finish function now waits for threads to finish. --- Cargo.toml | 7 +++-- README.md | 3 +- src/io.rs | 88 ++++++++++++++++++++++++++++++++++++++++++++++++++---- src/lib.rs | 6 ++-- 4 files changed, 90 insertions(+), 14 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9c3e298..4686486 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bdflib" -version = "0.2.0" +version = "0.3.0" authors = ["trivernis "] edition = "2018" license-file = "LICENSE" @@ -18,4 +18,7 @@ crate-type = ["lib"] crc = "1.8.1" xz2 = "0.1.6" -byteorder = "1.3.4" \ No newline at end of file +byteorder = "1.3.4" +crossbeam-channel = "0.4.2" +crossbeam-utils = "0.7.2" +num_cpus = "1.12.0" \ No newline at end of file diff --git a/README.md b/README.md index 3994ca7..5b94084 100644 --- a/README.md +++ b/README.md @@ -42,8 +42,7 @@ fn main() { let mut entry = DataEntry::new("foo".into()); entry.add_hash_value("fakehash".into(), vec![0, 2, 3]); bdf_writer.add_data_entry(entry).unwrap(); - bdf_writer.flush().unwrap(); - bdf_writer.flush_writer().unwrap(); + bdf_writer.finish().unwrap(); println!("Finished writing!"); } ``` diff --git a/src/io.rs b/src/io.rs index e851d7b..e284993 100644 --- a/src/io.rs +++ b/src/io.rs @@ -5,9 +5,21 @@ use std::convert::TryInto; use std::fs::File; use std::io::Error; use std::io::{BufReader, BufWriter, ErrorKind, Read, Write}; +use std::thread; +use crossbeam_channel::{bounded, Sender, Receiver}; +use crossbeam_utils::sync::WaitGroup; const ENTRIES_PER_CHUNK: u32 = 100_000; +struct ThreadManager { + pub sender_work: Option>, + pub receiver_work: Receiver, + pub sender_result: Sender, + pub receiver_result: Receiver, + pub wg: WaitGroup, + pub threads_started: bool, +} + pub struct 
BDFReader { reader: BufReader, pub metadata: Option, @@ -23,6 +35,36 @@ pub struct BDFWriter { head_written: bool, compressed: bool, compression_level: u32, + thread_manager: ThreadManager>, +} + +impl ThreadManager { + /// Creates a new thread manager to store channels and information + /// about threads to control them + pub fn new(cap: usize) -> Self { + let (s1, r1) = bounded(cap); + let (s2, r2) = bounded(cap); + Self { + sender_work: Some(s1), + receiver_work: r1, + sender_result: s2, + receiver_result: r2, + wg: WaitGroup::new(), + threads_started: false, + } + } + + /// Drops the sender for work. + pub fn drop_sender(&mut self) { + self.sender_work = None; + } + + /// Waits for the waitgroup + pub fn wait(&mut self) { + let wg = self.wg.clone(); + self.wg = WaitGroup::new(); + wg.wait(); + } } impl BDFWriter { @@ -35,6 +77,7 @@ impl BDFWriter { /// If the `compress` parameter is true, each data chunk will be compressed /// using lzma with a default level of 1. pub fn new(inner: File, entry_count: u64, compress: bool) -> Self { + let thread_manager = ThreadManager::new(num_cpus::get()); Self { metadata: MetaChunk::new(entry_count, ENTRIES_PER_CHUNK, compress), lookup_table: HashLookupTable::new(HashMap::new()), @@ -43,6 +86,27 @@ impl BDFWriter { head_written: false, compressed: compress, compression_level: 1, + thread_manager, + } + } + + /// Starts threads for parallel chunk compression + pub fn start_threads(&self) { + for _ in 0..num_cpus::get() { + let r = self.thread_manager.receiver_work.clone(); + let s = self.thread_manager.sender_result.clone(); + let wg: WaitGroup = self.thread_manager.wg.clone(); + let compress = self.compressed; + let compression_level = self.compression_level; + thread::spawn(move || { + for mut chunk in r.iter() { + if compress { + chunk.compress(compression_level).expect("failed to compress chunk"); + } + s.send(chunk.serialize()).expect("failed to send result"); + } + drop(wg); + }); } } @@ -75,7 +139,7 @@ impl BDFWriter { } 
/// Writes the data to the file - pub fn flush(&mut self) -> Result<(), Error> { + fn flush(&mut self) -> Result<(), Error> { if !self.head_written { self.writer.write(BDF_HDR)?; let mut generic_meta = GenericChunk::from(&self.metadata); @@ -84,13 +148,23 @@ impl BDFWriter { self.writer.write(generic_lookup.serialize().as_slice())?; self.head_written = true; } + if !self.thread_manager.threads_started { + self.start_threads(); + self.thread_manager.threads_started = true; + } let mut data_chunk = GenericChunk::from_data_entries(&self.data_entries, &self.lookup_table); - if self.compressed { - data_chunk.compress(self.compression_level)?; + if let Some(sender) = &self.thread_manager.sender_work { + sender.send(data_chunk).expect("failed to send work to threads"); + } else { + if self.compressed { + data_chunk.compress(self.compression_level)?; + } + self.thread_manager.sender_result.send(data_chunk.serialize()).expect("failed to send serialization result"); + } + while let Ok(data) = self.thread_manager.receiver_result.try_recv() { + self.writer.write(data.as_slice())?; } - let data = data_chunk.serialize(); - self.writer.write(data.as_slice())?; self.data_entries = Vec::new(); Ok(()) @@ -98,13 +172,15 @@ impl BDFWriter { /// Flushes the writer /// This should be called when no more data is being written - pub fn flush_writer(&mut self) -> Result<(), Error> { + fn flush_writer(&mut self) -> Result<(), Error> { self.writer.flush() } /// Flushes the buffered chunk data and the writer /// to finish the file. 
pub fn finish(&mut self) -> Result<(), Error> { + self.thread_manager.drop_sender(); + self.thread_manager.wait(); self.flush()?; self.flush_writer()?; diff --git a/src/lib.rs b/src/lib.rs index c8eb1b2..c8220bf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,8 +28,7 @@ mod tests { entry_2.add_hash_value(FOO.to_string(), vec![4, 5, 2, 3]); writer.add_data_entry(entry_2)?; - writer.flush()?; - writer.flush_writer()?; + writer.finish()?; remove_file("tmp1.bdf")?; Ok(()) @@ -98,8 +97,7 @@ mod tests { entry_1.add_hash_value(FOO.to_string(), vec![2, 4, 0, 2]); writer.add_data_entry(entry_1)?; - writer.flush()?; - writer.flush_writer()?; + writer.finish()?; Ok(()) }