Skip to content

Commit

Permalink
Add optional LZ4 flex (#124)
Browse files Browse the repository at this point in the history
Co-authored-by: Kyle Barron <[email protected]>
  • Loading branch information
jorgecarleitao and kylebarron authored Apr 15, 2022
1 parent e32fce0 commit efc2a91
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 2 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ jobs:
deactivate
- name: Run
run: cargo test
- name: Run lz4-flex
run: cargo test --no-default-features --features lz4_flex,bloom_filter,stream,snappy,brotli,zstd,gzip

clippy:
name: Clippy
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ brotli = { version = "^3.3", optional = true }
flate2 = { version = "^1.0", optional = true }
lz4 = { version = "1.23.3", optional = true }
zstd = { version = "^0.11", optional = true, default-features = false }
lz4_flex = { version = "^0.9.2", optional = true }

xxhash-rust = { version="0.8.3", optional = true, features = ["xxh64"] }

Expand Down
19 changes: 17 additions & 2 deletions src/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,17 @@ pub fn compress(
crate::error::Feature::Snappy,
"compress to snappy".to_string(),
)),
#[cfg(all(feature = "lz4_flex", not(feature = "lz4")))]
Compression::Lz4Raw => {
let output_buf_len = output_buf.len();
let required_len = lz4_flex::block::get_maximum_output_size(input_buf.len());
output_buf.resize(output_buf_len + required_len, 0);

let compressed_size =
lz4_flex::block::compress_into(input_buf, &mut output_buf[output_buf_len..])?;
output_buf.truncate(output_buf_len + compressed_size);
Ok(())
}
#[cfg(feature = "lz4")]
Compression::Lz4Raw => {
let output_buf_len = output_buf.len();
Expand All @@ -76,7 +87,7 @@ pub fn compress(
output_buf.truncate(output_buf_len + size);
Ok(())
}
#[cfg(not(feature = "lz4"))]
#[cfg(all(not(feature = "lz4"), not(feature = "lz4_flex")))]
Compression::Lz4Raw => Err(Error::FeatureNotActive(
crate::error::Feature::Lz4,
"compress to lz4".to_string(),
Expand Down Expand Up @@ -153,13 +164,17 @@ pub fn decompress(compression: Compression, input_buf: &[u8], output_buf: &mut [
crate::error::Feature::Snappy,
"decompress with snappy".to_string(),
)),
#[cfg(all(feature = "lz4_flex", not(feature = "lz4")))]
Compression::Lz4Raw => lz4_flex::block::decompress_into(input_buf, output_buf)
.map(|_| {})
.map_err(|e| e.into()),
#[cfg(feature = "lz4")]
Compression::Lz4Raw => {
lz4::block::decompress_to_buffer(input_buf, Some(output_buf.len() as i32), output_buf)
.map(|_| {})
.map_err(|e| e.into())
}
#[cfg(not(feature = "lz4"))]
#[cfg(all(not(feature = "lz4"), not(feature = "lz4_flex")))]
Compression::Lz4Raw => Err(Error::FeatureNotActive(
crate::error::Feature::Lz4,
"decompress with lz4".to_string(),
Expand Down
14 changes: 14 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,20 @@ impl From<snap::Error> for Error {
}
}

#[cfg(feature = "lz4_flex")]
impl From<lz4_flex::block::DecompressError> for Error {
fn from(e: lz4_flex::block::DecompressError) -> Error {
Error::General(format!("underlying lz4_flex error: {}", e))
}
}

#[cfg(feature = "lz4_flex")]
impl From<lz4_flex::block::CompressError> for Error {
fn from(e: lz4_flex::block::CompressError) -> Error {
Error::General(format!("underlying lz4_flex error: {}", e))
}
}

impl From<parquet_format_async_temp::thrift::Error> for Error {
fn from(e: parquet_format_async_temp::thrift::Error) -> Error {
Error::General(format!("underlying thrift error: {}", e))
Expand Down

0 comments on commit efc2a91

Please sign in to comment.