diff --git a/src/format/tar.rs b/src/format/tar.rs index c7c6834..526aa98 100644 --- a/src/format/tar.rs +++ b/src/format/tar.rs @@ -1,7 +1,6 @@ use crate::format::gzip::GZipFormat; use crate::format::xz::XZFormat; use crate::format::{FileFormat, FileObject}; -use crate::utils::xz_decoder::XzDecoder; use anyhow::{bail, Context}; use libflate::gzip; use std::fs::File; @@ -48,8 +47,11 @@ impl FileFormat for TarFormat { let mut reader = BufReader::new(File::open(file).context("Opening input")?); match self { TarFormat::Xz => { - let mut decoder = XzDecoder::new(reader); - extract_tar(&mut decoder, output) + tracing::debug!("Creating memory mapped file"); + tracing::debug!("Decompressing into memorys"); + let mut buf = Vec::new(); + lzma_rs::xz_decompress(&mut reader, &mut buf).context("Decompressing file")?; + extract_tar(&mut &buf[..], output) } TarFormat::Gz => { let mut decoder = gzip::Decoder::new(&mut reader).context("Creating decoder")?; diff --git a/src/utils/channel_sink.rs b/src/utils/channel_sink.rs deleted file mode 100644 index aba1b27..0000000 --- a/src/utils/channel_sink.rs +++ /dev/null @@ -1,59 +0,0 @@ -use std::io::{ErrorKind, Write}; -use std::sync::mpsc::{sync_channel, Receiver, SyncSender}; -use std::{io, mem}; - -pub struct ChannelSink { - buffer: Vec, - block_size: usize, - tx: SyncSender>, -} - -impl ChannelSink { - /// Creates a new sink with a channel to send the data to - pub fn new(block_size: usize) -> (Self, Receiver>) { - let (tx, rx) = sync_channel(1); - ( - Self { - buffer: Vec::new(), - block_size, - tx, - }, - rx, - ) - } -} - -impl Write for ChannelSink { - #[tracing::instrument(skip_all, level = "trace")] - fn write(&mut self, buf: &[u8]) -> std::io::Result { - self.buffer.append(&mut buf.to_vec()); - if self.buffer.len() >= self.block_size { - tracing::trace!("Block size reached. Sending buffer..."); - self.tx - .send(mem::take(&mut self.buffer)) - .map_err(|e| io::Error::new(ErrorKind::BrokenPipe, e))?; - } - - Ok(buf.len()) - } - - #[tracing::instrument(skip_all, level = "trace")] - fn flush(&mut self) -> std::io::Result<()> { - if !self.buffer.is_empty() { - self.tx - .send(mem::take(&mut self.buffer)) - .map_err(|e| io::Error::new(ErrorKind::BrokenPipe, e))?; - } - - Ok(()) - } -} - -impl Drop for ChannelSink { - #[tracing::instrument(skip_all, level = "trace")] - fn drop(&mut self) { - if let Err(e) = self.flush() { - tracing::debug!("Error while trying to flush buffer during drop {e}") - } - } -} diff --git a/src/utils/mod.rs b/src/utils/mod.rs index a375c82..8b13789 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -1,2 +1 @@ -pub mod xz_decoder; -pub mod channel_sink; + diff --git a/src/utils/xz_decoder.rs b/src/utils/xz_decoder.rs deleted file mode 100644 index a9f6151..0000000 --- a/src/utils/xz_decoder.rs +++ /dev/null @@ -1,52 +0,0 @@ -use crate::utils::channel_sink::ChannelSink; -use lzma_rs::xz_decompress; -use std::cmp::min; -use std::io; -use std::io::{BufRead, Read, Write}; -use std::sync::mpsc::Receiver; - -pub struct XzDecoder { - buffer: Vec, - rx: Receiver>, -} - -impl XzDecoder { - pub fn new(mut reader: R) -> Self { - let (mut sink, rx) = ChannelSink::new(1024); - std::thread::spawn(move || { - tracing::debug!("Async decompression thread running"); - if let Err(e) = xz_decompress(&mut reader, &mut sink) { - tracing::error!("Async decompressing finished with error {e}"); - } else { - tracing::debug!("async decompressing succeeded"); - } - }); - Self { - rx, - buffer: Vec::new(), - } - } -} - -impl Read for XzDecoder { - #[tracing::instrument(skip_all, level = "trace")] - fn read(&mut self, mut buf: &mut [u8]) -> io::Result { - self.buffer.reverse(); - if self.buffer.is_empty() { - tracing::trace!("Receiving chunk from channel"); - if let Ok(chunk) = self.rx.recv() { - self.buffer = chunk; - } else { - tracing::debug!("Receiving timed out"); - } - } - - let max_write = min(self.buffer.len(), buf.len()); - tracing::trace!("Wrote {max_write} bytes"); - buf.write_all(&self.buffer[0..max_write])?; - self.buffer.reverse(); - self.buffer.truncate(self.buffer.len() - max_write); - - Ok(max_write) - } -}