Change to dropping work sender explicitly

master
trivernis 5 years ago
parent 6f2ea29725
commit 0408a5aea0

@ -1,6 +1,6 @@
[package] [package]
name = "bdflib" name = "bdflib"
version = "0.3.4" version = "0.3.5"
authors = ["trivernis <trivernis@gmail.com>"] authors = ["trivernis <trivernis@gmail.com>"]
edition = "2018" edition = "2018"
license-file = "LICENSE" license-file = "LICENSE"

@ -13,7 +13,7 @@ const ENTRIES_PER_CHUNK: u32 = 100_000;
#[derive(Debug)] #[derive(Debug)]
struct ThreadManager<T1, T2> { struct ThreadManager<T1, T2> {
pub sender_work: Option<Sender<T1>>, pub sender_work: Sender<T1>,
pub receiver_work: Receiver<T1>, pub receiver_work: Receiver<T1>,
pub sender_result: Sender<T2>, pub sender_result: Sender<T2>,
pub receiver_result: Receiver<T2>, pub receiver_result: Receiver<T2>,
@ -48,7 +48,7 @@ impl<T1, T2> ThreadManager<T1, T2> {
let (s1, r1) = bounded(cap); let (s1, r1) = bounded(cap);
let (s2, r2) = bounded(cap); let (s2, r2) = bounded(cap);
Self { Self {
sender_work: Some(s1), sender_work: s1,
receiver_work: r1, receiver_work: r1,
sender_result: s2, sender_result: s2,
receiver_result: r2, receiver_result: r2,
@ -59,7 +59,10 @@ impl<T1, T2> ThreadManager<T1, T2> {
/// Drops the sender for work. /// Drops the sender for work.
pub fn drop_sender(&mut self) { pub fn drop_sender(&mut self) {
self.sender_work = None; let sender = self.sender_work.clone();
let (s1, _) = bounded(0);
self.sender_work = s1;
drop(sender);
} }
/// Waits for the wait group /// Waits for the wait group
@ -94,7 +97,7 @@ impl BDFWriter {
} }
/// Starts threads for parallel chunk compression /// Starts threads for parallel chunk compression
pub fn start_threads(&self) { fn start_threads(&self) {
for _ in 0..num_cpus::get() { for _ in 0..num_cpus::get() {
let compress = self.compressed; let compress = self.compressed;
let compression_level = self.compression_level; let compression_level = self.compression_level;
@ -158,16 +161,9 @@ impl BDFWriter {
self.start_threads(); self.start_threads();
self.thread_manager.threads_started = true; self.thread_manager.threads_started = true;
} }
let mut data_chunk = let data_chunk =
GenericChunk::from_data_entries(&self.data_entries, &self.lookup_table); GenericChunk::from_data_entries(&self.data_entries, &self.lookup_table);
if let Some(sender) = &self.thread_manager.sender_work { self.thread_manager.sender_work.send(data_chunk).expect("failed to send work to threads");
sender.send(data_chunk).expect("failed to send work to threads");
} else {
if self.compressed {
data_chunk.compress(self.compression_level)?;
}
self.thread_manager.sender_result.send(data_chunk.serialize()).expect("failed to send serialization result");
}
self.write_serialized()?; self.write_serialized()?;
self.data_entries = Vec::new(); self.data_entries = Vec::new();

@ -36,9 +36,9 @@ mod tests {
#[test] #[test]
fn it_writes_compressed() -> Result<(), Error> { fn it_writes_compressed() -> Result<(), Error> {
let mut writer = new_writer("tmp2.bdf", 3, true)?; let mut writer = new_writer("tmp2.bdf", 4, true)?;
writer.set_compression_level(3); writer.set_compression_level(3);
writer.set_entries_per_chunk(2)?; writer.set_entries_per_chunk(3)?;
writer.add_lookup_entry(HashEntry::new(FOO.to_string(), 4))?; writer.add_lookup_entry(HashEntry::new(FOO.to_string(), 4))?;
writer.add_lookup_entry(HashEntry::new(BAR.to_string(), 5))?; writer.add_lookup_entry(HashEntry::new(BAR.to_string(), 5))?;
@ -58,6 +58,11 @@ mod tests {
entry_3.add_hash_value(FOO.to_string(), vec![5, 5, 2, 3]); entry_3.add_hash_value(FOO.to_string(), vec![5, 5, 2, 3]);
writer.add_data_entry(entry_3)?; writer.add_data_entry(entry_3)?;
let mut entry_4 = DataEntry::new("lool".to_string());
entry_4.add_hash_value(BAR.to_string(), vec![1, 3, 2, 1, 5]);
entry_4.add_hash_value(FOO.to_string(), vec![5, 5, 2, 3]);
writer.add_data_entry(entry_4)?;
writer.finish()?; writer.finish()?;
remove_file("tmp2.bdf")?; remove_file("tmp2.bdf")?;

Loading…
Cancel
Save