Change tar.xz deompression to take place in memory only

Signed-off-by: trivernis <trivernis@protonmail.com>
main
trivernis 2 years ago
parent 7d80d886ac
commit 69bd274c39
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

@ -1,7 +1,6 @@
use crate::format::gzip::GZipFormat; use crate::format::gzip::GZipFormat;
use crate::format::xz::XZFormat; use crate::format::xz::XZFormat;
use crate::format::{FileFormat, FileObject}; use crate::format::{FileFormat, FileObject};
use crate::utils::xz_decoder::XzDecoder;
use anyhow::{bail, Context}; use anyhow::{bail, Context};
use libflate::gzip; use libflate::gzip;
use std::fs::File; use std::fs::File;
@ -48,8 +47,11 @@ impl FileFormat for TarFormat {
let mut reader = BufReader::new(File::open(file).context("Opening input")?); let mut reader = BufReader::new(File::open(file).context("Opening input")?);
match self { match self {
TarFormat::Xz => { TarFormat::Xz => {
let mut decoder = XzDecoder::new(reader); tracing::debug!("Creating memory mapped file");
extract_tar(&mut decoder, output) tracing::debug!("Decompressing into memorys");
let mut buf = Vec::new();
lzma_rs::xz_decompress(&mut reader, &mut buf).context("Decompressing file")?;
extract_tar(&mut &buf[..], output)
} }
TarFormat::Gz => { TarFormat::Gz => {
let mut decoder = gzip::Decoder::new(&mut reader).context("Creating decoder")?; let mut decoder = gzip::Decoder::new(&mut reader).context("Creating decoder")?;

@ -1,59 +0,0 @@
use std::io::{ErrorKind, Write};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::{io, mem};
pub struct ChannelSink {
buffer: Vec<u8>,
block_size: usize,
tx: SyncSender<Vec<u8>>,
}
impl ChannelSink {
/// Creates a new sink with a channel to send the data to
pub fn new(block_size: usize) -> (Self, Receiver<Vec<u8>>) {
let (tx, rx) = sync_channel(1);
(
Self {
buffer: Vec::new(),
block_size,
tx,
},
rx,
)
}
}
impl Write for ChannelSink {
#[tracing::instrument(skip_all, level = "trace")]
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.buffer.append(&mut buf.to_vec());
if self.buffer.len() >= self.block_size {
tracing::trace!("Block size reached. Sending buffer...");
self.tx
.send(mem::take(&mut self.buffer))
.map_err(|e| io::Error::new(ErrorKind::BrokenPipe, e))?;
}
Ok(buf.len())
}
#[tracing::instrument(skip_all, level = "trace")]
fn flush(&mut self) -> std::io::Result<()> {
if !self.buffer.is_empty() {
self.tx
.send(mem::take(&mut self.buffer))
.map_err(|e| io::Error::new(ErrorKind::BrokenPipe, e))?;
}
Ok(())
}
}
impl Drop for ChannelSink {
#[tracing::instrument(skip_all, level = "trace")]
fn drop(&mut self) {
if let Err(e) = self.flush() {
tracing::debug!("Error while trying to flush buffer during drop {e}")
}
}
}

@ -1,2 +1 @@
pub mod xz_decoder;
pub mod channel_sink;

@ -1,52 +0,0 @@
use crate::utils::channel_sink::ChannelSink;
use lzma_rs::xz_decompress;
use std::cmp::min;
use std::io;
use std::io::{BufRead, Read, Write};
use std::sync::mpsc::Receiver;
pub struct XzDecoder {
buffer: Vec<u8>,
rx: Receiver<Vec<u8>>,
}
impl XzDecoder {
pub fn new<R: BufRead + Send + 'static>(mut reader: R) -> Self {
let (mut sink, rx) = ChannelSink::new(1024);
std::thread::spawn(move || {
tracing::debug!("Async decompression thread running");
if let Err(e) = xz_decompress(&mut reader, &mut sink) {
tracing::error!("Async decompressing finished with error {e}");
} else {
tracing::debug!("async decompressing succeeded");
}
});
Self {
rx,
buffer: Vec::new(),
}
}
}
impl Read for XzDecoder {
#[tracing::instrument(skip_all, level = "trace")]
fn read(&mut self, mut buf: &mut [u8]) -> io::Result<usize> {
self.buffer.reverse();
if self.buffer.is_empty() {
tracing::trace!("Receiving chunk from channel");
if let Ok(chunk) = self.rx.recv() {
self.buffer = chunk;
} else {
tracing::debug!("Receiving timed out");
}
}
let max_write = min(self.buffer.len(), buf.len());
tracing::trace!("Wrote {max_write} bytes");
buf.write_all(&self.buffer[0..max_write])?;
self.buffer.reverse();
self.buffer.truncate(self.buffer.len() - max_write);
Ok(max_write)
}
}
Loading…
Cancel
Save