From 0408a5aea00b4ddf0631074ffd498455a1fe5020 Mon Sep 17 00:00:00 2001 From: trivernis Date: Sun, 15 Mar 2020 11:21:52 +0100 Subject: [PATCH] Change to dropping work sender explicitly --- Cargo.toml | 2 +- src/io.rs | 22 +++++++++------------- src/lib.rs | 9 +++++++-- 3 files changed, 17 insertions(+), 16 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d92a888..ccb7d10 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bdflib" -version = "0.3.4" +version = "0.3.5" authors = ["trivernis "] edition = "2018" license-file = "LICENSE" diff --git a/src/io.rs b/src/io.rs index f2aa775..bff9520 100644 --- a/src/io.rs +++ b/src/io.rs @@ -13,7 +13,7 @@ const ENTRIES_PER_CHUNK: u32 = 100_000; #[derive(Debug)] struct ThreadManager { - pub sender_work: Option>, + pub sender_work: Sender, pub receiver_work: Receiver, pub sender_result: Sender, pub receiver_result: Receiver, @@ -48,7 +48,7 @@ impl ThreadManager { let (s1, r1) = bounded(cap); let (s2, r2) = bounded(cap); Self { - sender_work: Some(s1), + sender_work: s1, receiver_work: r1, sender_result: s2, receiver_result: r2, @@ -59,7 +59,10 @@ impl ThreadManager { /// Drops the sender for work. pub fn drop_sender(&mut self) { - self.sender_work = None; + let sender = self.sender_work.clone(); + let (s1, _) = bounded(0); + self.sender_work = s1; + drop(sender); } /// Waits for the wait group @@ -94,7 +97,7 @@ impl BDFWriter { } /// Starts threads for parallel chunk compression - pub fn start_threads(&self) { + fn start_threads(&self) { for _ in 0..num_cpus::get() { let compress = self.compressed; let compression_level = self.compression_level; @@ -158,16 +161,9 @@ impl BDFWriter { self.start_threads(); self.thread_manager.threads_started = true; } - let mut data_chunk = + let data_chunk = GenericChunk::from_data_entries(&self.data_entries, &self.lookup_table); - if let Some(sender) = &self.thread_manager.sender_work { - sender.send(data_chunk).expect("failed to send work to threads"); - } else { - if self.compressed { - data_chunk.compress(self.compression_level)?; - } - self.thread_manager.sender_result.send(data_chunk.serialize()).expect("failed to send serialization result"); - } + self.thread_manager.sender_work.send(data_chunk).expect("failed to send work to threads"); self.write_serialized()?; self.data_entries = Vec::new(); diff --git a/src/lib.rs b/src/lib.rs index 038a8b1..12eaf03 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -36,9 +36,9 @@ mod tests { #[test] fn it_writes_compressed() -> Result<(), Error> { - let mut writer = new_writer("tmp2.bdf", 3, true)?; + let mut writer = new_writer("tmp2.bdf", 4, true)?; writer.set_compression_level(3); - writer.set_entries_per_chunk(2)?; + writer.set_entries_per_chunk(3)?; writer.add_lookup_entry(HashEntry::new(FOO.to_string(), 4))?; writer.add_lookup_entry(HashEntry::new(BAR.to_string(), 5))?; @@ -58,6 +58,11 @@ mod tests { entry_3.add_hash_value(FOO.to_string(), vec![5, 5, 2, 3]); writer.add_data_entry(entry_3)?; + let mut entry_4 = DataEntry::new("lool".to_string()); + entry_4.add_hash_value(BAR.to_string(), vec![1, 3, 2, 1, 5]); + entry_4.add_hash_value(FOO.to_string(), vec![5, 5, 2, 3]); + writer.add_data_entry(entry_4)?; + writer.finish()?; remove_file("tmp2.bdf")?;