Add multithreaded decompression of chunks

Branch: master
Author: trivernis (5 years ago)
Parent: dae491b88a
Commit: 116122d3d8

@@ -1,6 +1,6 @@
 [package]
 name = "bdflib"
-version = "0.3.7"
+version = "0.4.0"
 authors = ["trivernis <trivernis@gmail.com>"]
 edition = "2018"
 license-file = "LICENSE"

@@ -27,6 +27,7 @@ pub struct BDFReader {
     pub metadata: Option<MetaChunk>,
     pub lookup_table: Option<HashLookupTable>,
     compressed: bool,
+    thread_manager: ThreadManager<GenericChunk, GenericChunk>,
 }

 #[derive(Debug)]
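
The ThreadManager<GenericChunk, GenericChunk> type added here is not defined in this
diff. Below is a minimal sketch of the shape the surrounding code relies on: the field
and method names (sender_work, receiver_work, sender_result, receiver_result, wg,
drop_sender) are taken from their uses in the hunks further down, while the
implementation itself is an assumption, built on crossbeam channels and a WaitGroup:

    use crossbeam_channel::{unbounded, Receiver, Sender};
    use crossbeam_utils::sync::WaitGroup;

    /// Sketch: fans work items (T) out to workers and collects results (R).
    pub struct ThreadManager<T, R> {
        pub sender_work: Sender<T>,
        pub receiver_work: Receiver<T>,
        pub sender_result: Sender<R>,
        pub receiver_result: Receiver<R>,
        pub wg: WaitGroup,
    }

    impl<T, R> ThreadManager<T, R> {
        /// The thread count is only a hint here; the reader spawns workers itself.
        pub fn new(_threads: usize) -> Self {
            let (sender_work, receiver_work) = unbounded();
            let (sender_result, receiver_result) = unbounded();
            Self {
                sender_work,
                receiver_work,
                sender_result,
                receiver_result,
                wg: WaitGroup::new(),
            }
        }

        /// Disconnects the work channel so that worker loops iterating the
        /// receiver run to completion and exit.
        pub fn drop_sender(&mut self) {
            // swap in a sender whose channel is already disconnected; dropping
            // the old sender closes the original work channel for the workers
            let (dead_sender, _) = unbounded();
            self.sender_work = dead_sender;
        }
    }

With this shape, cloning the receiver into each worker and iterating it (as
start_threads does below) distributes chunks across threads, and drop_sender gives
the reader a way to shut the pool down when the input is exhausted.
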
@@ -83,7 +84,6 @@ impl BDFWriter {
     /// If the `compress` parameter is true, each data chunk will be compressed
     /// using lzma with a default level of 1.
     pub fn new(inner: File, entry_count: u64, compress: bool) -> Self {
-        let thread_manager = ThreadManager::new(num_cpus::get());
         Self {
             metadata: MetaChunk::new(entry_count, ENTRIES_PER_CHUNK, compress),
             lookup_table: HashLookupTable::new(HashMap::new()),

@@ -92,7 +92,7 @@ impl BDFWriter {
             head_written: false,
             compressed: compress,
             compression_level: 1,
-            thread_manager,
+            thread_manager: ThreadManager::new(num_cpus::get()),
         }
     }

@@ -225,6 +225,7 @@ impl BDFReader {
             lookup_table: None,
             reader: BufReader::new(inner),
             compressed: false,
+            thread_manager: ThreadManager::new(num_cpus::get()),
         }
     }

@@ -236,12 +237,46 @@ impl BDFReader {
         Ok(())
     }

+    /// Starts threads for decompressing chunks
+    fn start_threads(&mut self) {
+        for _ in 0..num_cpus::get() {
+            thread::spawn({
+                let r = self.thread_manager.receiver_work.clone();
+                let s = self.thread_manager.sender_result.clone();
+                let wg = self.thread_manager.wg.clone();
+
+                move || {
+                    for mut chunk in r {
+                        chunk.decompress().expect("failed to decompress chunk");
+                        s.send(chunk).expect("failed to send decompression result");
+                    }
+                    drop(wg);
+                }
+            });
+            if let Err(_) = self.add_compression_chunk() {
+                self.thread_manager.drop_sender();
+                break;
+            }
+        }
+    }
+
+    /// Adds a chunk to the decompression channel to be decompressed by a worker thread
+    fn add_compression_chunk(&mut self) -> Result<(), Error> {
+        let gen_chunk = self.next_chunk_raw()?;
+        if gen_chunk.name == DTBL_CHUNK_NAME.to_string() && self.compressed {
+            if let Err(_) = self.thread_manager.sender_work.send(gen_chunk) {
+                return Err(Error::new(ErrorKind::Other, "failed to send chunk data"))
+            }
+        }
+
+        Ok(())
+    }
+
     /// Verifies the header of the file and reads and stores the metadata
     pub fn read_metadata(&mut self) -> Result<&MetaChunk, Error> {
         if !self.validate_header() {
             return Err(Error::new(ErrorKind::InvalidData, "invalid BDF Header"));
         }
-        let meta_chunk: MetaChunk = self.next_chunk()?.try_into()?;
+        let meta_chunk: MetaChunk = self.next_chunk_raw()?.try_into()?;
         if let Some(method) = &meta_chunk.compression_method {
             if *method == LZMA.to_string() {
                 self.compressed = true;
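
GenericChunk::decompress, which the workers call, is also not part of this commit.
The writer's doc comment above says chunks are compressed "using lzma with a default
level of 1", so here is a sketch of a matching implementation, assuming the xz2 crate
and a public data: Vec<u8> field (both assumptions, not confirmed by this diff):

    use std::io::{Error, Read};
    use xz2::read::XzDecoder;

    impl GenericChunk {
        /// Sketch: replaces the chunk's compressed payload with the
        /// decompressed bytes and keeps the stored length in sync.
        pub fn decompress(&mut self) -> Result<(), Error> {
            let mut decompressed = Vec::new();
            // the payload is assumed to be an xz/lzma stream
            XzDecoder::new(self.data.as_slice()).read_to_end(&mut decompressed)?;
            self.length = decompressed.len() as u32;
            self.data = decompressed;
            Ok(())
        }
    }

Because each worker owns only clones of the channel endpoints and the chunk it
received, decompression needs no extra synchronization; chunks are independent.
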
@@ -271,9 +306,13 @@ impl BDFReader {
             None => self.read_metadata()?,
             Some(t) => t,
         };
-        let lookup_table: HashLookupTable = self.next_chunk()?.try_into()?;
+        let lookup_table: HashLookupTable = self.next_chunk_raw()?.try_into()?;
         self.lookup_table = Some(lookup_table);

+        if self.compressed {
+            self.start_threads();
+        }
+
         if let Some(chunk) = &self.lookup_table {
             Ok(&chunk)
         } else {

@@ -292,8 +331,24 @@ impl BDFReader {
         header == BDF_HDR.as_ref()
     }

-    /// Returns the next chunk if one is available.
+    /// Returns the next chunk
     pub fn next_chunk(&mut self) -> Result<GenericChunk, Error> {
+        if self.compressed {
+            if let Err(_) = self.add_compression_chunk() {
+                self.thread_manager.drop_sender();
+            }
+            if let Ok(chunk) = self.thread_manager.receiver_result.recv() {
+                Ok(chunk)
+            } else {
+                Err(Error::new(ErrorKind::Other, "failed to get chunk"))
+            }
+        } else {
+            self.next_chunk_raw()
+        }
+    }
+
+    /// Returns the next chunk if one is available.
+    fn next_chunk_raw(&mut self) -> Result<GenericChunk, Error> {
         let mut length_raw = [0u8; 4];
         let _ = self.reader.read_exact(&mut length_raw)?;
         let length = BigEndian::read_u32(&mut length_raw);

@@ -305,17 +360,12 @@ impl BDFReader {
         let mut crc_raw = [0u8; 4];
         let _ = self.reader.read_exact(&mut crc_raw)?;
         let crc = BigEndian::read_u32(&mut crc_raw);

-        let mut gen_chunk = GenericChunk {
+        Ok(GenericChunk {
             length,
             name,
             data,
             crc,
-        };
-        if gen_chunk.name == DTBL_CHUNK_NAME.to_string() && self.compressed {
-            gen_chunk.decompress()?;
-        }
-
-        Ok(gen_chunk)
+        })
     }
 }
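
End to end, a compressed file now keeps roughly one chunk per core in flight:
start_threads pre-feeds the work channel, and every subsequent next_chunk() call
pushes one freshly read raw chunk in and pops one decompressed chunk out. A usage
sketch follows; the module path, the read_lookup_table method name, the input file
name, and the field visibility of GenericChunk are assumptions for illustration:

    use std::fs::File;
    use bdflib::io::BDFReader;

    fn main() -> std::io::Result<()> {
        let f = File::open("dictionary.bdf")?; // hypothetical input file
        let mut reader = BDFReader::new(f);

        reader.read_metadata()?;
        // for compressed files, reading the lookup table also spawns the workers
        reader.read_lookup_table()?;

        // chunks arrive already decompressed by the worker threads
        while let Ok(chunk) = reader.next_chunk() {
            println!("chunk {} ({} bytes)", chunk.name, chunk.data.len());
        }
        Ok(())
    }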
