From 116122d3d8a207f87da408cef8f843bf62448cf6 Mon Sep 17 00:00:00 2001
From: trivernis
Date: Sun, 15 Mar 2020 12:42:00 +0100
Subject: [PATCH] Add multithreaded decompression of chunks

---
 Cargo.toml |  2 +-
 src/io.rs  | 76 ++++++++++++++++++++++++++++++++++++++++++++----------
 2 files changed, 64 insertions(+), 14 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 61df94c..30f0ee8 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "bdflib"
-version = "0.3.7"
+version = "0.4.0"
 authors = ["trivernis "]
 edition = "2018"
 license-file = "LICENSE"
diff --git a/src/io.rs b/src/io.rs
index bff9520..68fc620 100644
--- a/src/io.rs
+++ b/src/io.rs
@@ -27,6 +27,7 @@ pub struct BDFReader {
     pub metadata: Option<MetaChunk>,
     pub lookup_table: Option<HashLookupTable>,
     compressed: bool,
+    thread_manager: ThreadManager,
 }
 
 #[derive(Debug)]
@@ -83,7 +84,6 @@ impl BDFWriter {
     /// If the `compress` parameter is true, each data chunk will be compressed
     /// using lzma with a default level of 1.
     pub fn new(inner: File, entry_count: u64, compress: bool) -> Self {
-        let thread_manager = ThreadManager::new(num_cpus::get());
         Self {
             metadata: MetaChunk::new(entry_count, ENTRIES_PER_CHUNK, compress),
             lookup_table: HashLookupTable::new(HashMap::new()),
@@ -92,7 +92,7 @@ impl BDFWriter {
             head_written: false,
             compressed: compress,
             compression_level: 1,
-            thread_manager,
+            thread_manager: ThreadManager::new(num_cpus::get()),
         }
     }
 
@@ -225,6 +225,7 @@ impl BDFReader {
             lookup_table: None,
             reader: BufReader::new(inner),
             compressed: false,
+            thread_manager: ThreadManager::new(num_cpus::get()),
         }
     }
 
@@ -236,12 +237,46 @@ impl BDFReader {
         Ok(())
     }
 
+    /// Starts threads for decompressing chunks
+    fn start_threads(&mut self) {
+        for _ in 0..num_cpus::get() {
+            thread::spawn({
+                let r = self.thread_manager.receiver_work.clone();
+                let s = self.thread_manager.sender_result.clone();
+                let wg = self.thread_manager.wg.clone();
+                move || {
+                    for mut chunk in r {
+                        chunk.decompress().expect("failed to decompress chunk");
+                        s.send(chunk).expect("failed to send decompression result");
+                    }
+                    drop(wg);
+                }
+            });
+            if let Err(_) = self.add_compression_chunk() {
+                self.thread_manager.drop_sender();
+                break;
+            }
+        }
+    }
+
+    /// Adds a chunk to the decompression channel to be decompressed by a worker thread
+    fn add_compression_chunk(&mut self) -> Result<(), Error> {
+        let gen_chunk = self.next_chunk_raw()?;
+        if gen_chunk.name == DTBL_CHUNK_NAME.to_string() && self.compressed {
+            if let Err(_) = self.thread_manager.sender_work.send(gen_chunk) {
+                return Err(Error::new(ErrorKind::Other, "failed to send chunk data"))
+            }
+        }
+
+        Ok(())
+    }
+
     /// Verifies the header of the file and reads and stores the metadata
     pub fn read_metadata(&mut self) -> Result<&MetaChunk, Error> {
         if !self.validate_header() {
             return Err(Error::new(ErrorKind::InvalidData, "invalid BDF Header"));
         }
-        let meta_chunk: MetaChunk = self.next_chunk()?.try_into()?;
+        let meta_chunk: MetaChunk = self.next_chunk_raw()?.try_into()?;
         if let Some(method) = &meta_chunk.compression_method {
             if *method == LZMA.to_string() {
                 self.compressed = true;
@@ -271,9 +306,13 @@ impl BDFReader {
             None => self.read_metadata()?,
             Some(t) => t,
         };
-        let lookup_table: HashLookupTable = self.next_chunk()?.try_into()?;
+        let lookup_table: HashLookupTable = self.next_chunk_raw()?.try_into()?;
         self.lookup_table = Some(lookup_table);
 
+        if self.compressed {
+            self.start_threads();
+        }
+
         if let Some(chunk) = &self.lookup_table {
             Ok(&chunk)
         } else {
@@ -292,8 +331,24 @@ impl BDFReader {
         header == BDF_HDR.as_ref()
     }
 
-    /// Returns the next chunk if one is available.
+    /// Returns the next chunk
     pub fn next_chunk(&mut self) -> Result<GenericChunk, Error> {
+        if self.compressed {
+            if let Err(_) = self.add_compression_chunk() {
+                self.thread_manager.drop_sender();
+            }
+            if let Ok(chunk) = self.thread_manager.receiver_result.recv() {
+                Ok(chunk)
+            } else {
+                Err(Error::new(ErrorKind::Other, "failed to get chunk"))
+            }
+        } else {
+            self.next_chunk_raw()
+        }
+    }
+
+    /// Returns the next chunk if one is available.
+    fn next_chunk_raw(&mut self) -> Result<GenericChunk, Error> {
         let mut length_raw = [0u8; 4];
         let _ = self.reader.read_exact(&mut length_raw)?;
         let length = BigEndian::read_u32(&mut length_raw);
@@ -305,17 +360,12 @@ impl BDFReader {
         let mut crc_raw = [0u8; 4];
         let _ = self.reader.read_exact(&mut crc_raw)?;
         let crc = BigEndian::read_u32(&mut crc_raw);
-        let mut gen_chunk = GenericChunk {
+
+        Ok(GenericChunk {
             length,
             name,
            data,
             crc,
-        };
-
-        if gen_chunk.name == DTBL_CHUNK_NAME.to_string() && self.compressed {
-            gen_chunk.decompress()?;
-        }
-
-        Ok(gen_chunk)
+        })
     }
 }
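
Usage stays the same from the caller's point of view: the reader is constructed as before, read_metadata() and read_lookup_table() are called first, and next_chunk() then hands back chunks that the worker threads have already decompressed. Below is a minimal sketch of driving the reader after this change; the module path bdflib::io and the file name are assumptions for illustration, everything else mirrors the API shown in the diff.

    use std::fs::File;

    use bdflib::io::BDFReader;

    fn main() -> std::io::Result<()> {
        // Placeholder path; any BDF file works here.
        let f = File::open("dictionary.bdf")?;
        let mut reader = BDFReader::new(f);

        // Reads the metadata chunk and records whether the data chunks are compressed.
        reader.read_metadata()?;
        // Reads the lookup table; for compressed files this also spawns the
        // decompression worker threads via start_threads().
        reader.read_lookup_table()?;

        // next_chunk() now pulls already-decompressed chunks from the result
        // channel (or reads raw chunks directly for uncompressed files) and
        // returns Err once the underlying file is exhausted.
        while let Ok(_chunk) = reader.next_chunk() {
            // process the chunk entries ...
        }

        Ok(())
    }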