From 9fd548b86cc5d76ba9f6b907317b407d828802b2 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Thu, 3 Mar 2022 12:00:42 -0700 Subject: [PATCH 1/7] wip --- parquet/Cargo.toml | 5 ++-- parquet/src/compression.rs | 47 +++++++++++++++++++------------------- 2 files changed, 27 insertions(+), 25 deletions(-) diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index 3cad64cf8caa..d7aa5a78ccb7 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -36,7 +36,8 @@ thrift = "0.13" snap = { version = "1.0", optional = true } brotli = { version = "3.3", optional = true } flate2 = { version = "1.0", optional = true } -lz4 = { version = "1.23", optional = true } +# lz4 = { version = "1.23", optional = true } +lz4_flex = { version = "0.9.2", optional = true, default-features = false, features = ["checked-decode"] } zstd = { version = "0.10", optional = true } chrono = { version = "0.4", default-features = false } num = "0.4" @@ -61,7 +62,7 @@ serde_json = { version = "1.0", features = ["preserve_order"] } arrow = { path = "../arrow", version = "9.1.0", default-features = false, features = ["test_utils", "prettyprint"] } [features] -default = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd", "base64"] +default = ["arrow", "snap", "brotli", "flate2", "lz4_flex", "zstd", "base64"] cli = ["serde_json", "base64", "clap"] test_common = [] # Experimental, unstable functionality primarily used for testing diff --git a/parquet/src/compression.rs b/parquet/src/compression.rs index f4aecbf4e86f..6b2ec39af00b 100644 --- a/parquet/src/compression.rs +++ b/parquet/src/compression.rs @@ -245,32 +245,33 @@ mod lz4_codec { input_buf: &[u8], output_buf: &mut Vec, ) -> Result { - let mut decoder = lz4::Decoder::new(input_buf)?; - let mut buffer: [u8; LZ4_BUFFER_SIZE] = [0; LZ4_BUFFER_SIZE]; - let mut total_len = 0; - loop { - let len = decoder.read(&mut buffer)?; - if len == 0 { - break; - } - total_len += len; - output_buf.write_all(&buffer[0..len])?; - } - Ok(total_len) + let mut decoder = lz4_flex::frame::FrameDecoder::new(input_buf); + std::io::copy(&mut decoder, &mut output_buf)?; + Ok(output_buf.len()) } + // use lz4_flex::block::compress_prepend_size; + // use lz4_flex::block::decompress_size_prepended; + fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec) -> Result<()> { - let mut encoder = lz4::EncoderBuilder::new().build(output_buf)?; - let mut from = 0; - loop { - let to = std::cmp::min(from + LZ4_BUFFER_SIZE, input_buf.len()); - encoder.write_all(&input_buf[from..to])?; - from += LZ4_BUFFER_SIZE; - if from >= input_buf.len() { - break; - } - } - encoder.finish().1.map_err(|e| e.into()) + let mut encoder = lz4_flex::frame::FrameEncoder::new(output_buf); + std::io::copy(&mut input_buf, &mut encoder)?; + Ok(encoder.finish()) + + // let buf = lz4_flex::block::compress_prepend_size(input_buf); + // output_buf + + // let mut encoder = lz4::EncoderBuilder::new().build(output_buf)?; + // let mut from = 0; + // loop { + // let to = std::cmp::min(from + LZ4_BUFFER_SIZE, input_buf.len()); + // encoder.write_all(&input_buf[from..to])?; + // from += LZ4_BUFFER_SIZE; + // if from >= input_buf.len() { + // break; + // } + // } + // encoder.finish().1.map_err(|e| e.into()) } } } From 51536a3a53de94789994392468638f403066d229 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Thu, 3 Mar 2022 12:35:19 -0700 Subject: [PATCH 2/7] WIP lz4 --- parquet/Cargo.toml | 3 ++- parquet/src/compression.rs | 6 +++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index d7aa5a78ccb7..6328ccb7635d 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -57,7 +57,8 @@ snap = "1.0" tempfile = "3.0" brotli = "3.3" flate2 = "1.0" -lz4 = "1.23" +lz4_flex = { version = "0.9.2", optional = true, default-features = false, features = ["checked-decode"] } +# lz4 = "1.23" serde_json = { version = "1.0", features = ["preserve_order"] } arrow = { path = "../arrow", version = "9.1.0", default-features = false, features = ["test_utils", "prettyprint"] } diff --git a/parquet/src/compression.rs b/parquet/src/compression.rs index 6b2ec39af00b..a5b061fad7f9 100644 --- a/parquet/src/compression.rs +++ b/parquet/src/compression.rs @@ -72,7 +72,7 @@ pub fn create_codec(codec: CodecType) -> Result>> { CodecType::GZIP => Ok(Some(Box::new(GZipCodec::new()))), #[cfg(any(feature = "snap", test))] CodecType::SNAPPY => Ok(Some(Box::new(SnappyCodec::new()))), - #[cfg(any(feature = "lz4", test))] + #[cfg(any(feature = "lz4_flex", test))] CodecType::LZ4 => Ok(Some(Box::new(LZ4Codec::new()))), #[cfg(any(feature = "zstd", test))] CodecType::ZSTD => Ok(Some(Box::new(ZSTDCodec::new()))), @@ -220,7 +220,7 @@ mod brotli_codec { #[cfg(any(feature = "brotli", test))] pub use brotli_codec::*; -#[cfg(any(feature = "lz4", test))] +#[cfg(any(feature = "lz4_flex", test))] mod lz4_codec { use std::io::{Read, Write}; @@ -275,7 +275,7 @@ mod lz4_codec { } } } -#[cfg(any(feature = "lz4", test))] +#[cfg(any(feature = "lz4_flex", test))] pub use lz4_codec::*; #[cfg(any(feature = "zstd", test))] From 7bba52051757590b01796b235bac8a7aaa134ef4 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Thu, 3 Mar 2022 19:00:19 -0700 Subject: [PATCH 3/7] wip --- parquet/Cargo.toml | 2 +- parquet/src/compression.rs | 13 +++++-------- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index 6328ccb7635d..662ffa1df29f 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -57,7 +57,7 @@ snap = "1.0" tempfile = "3.0" brotli = "3.3" flate2 = "1.0" -lz4_flex = { version = "0.9.2", optional = true, default-features = false, features = ["checked-decode"] } +lz4_flex = { version = "0.9.2" } # lz4 = "1.23" serde_json = { version = "1.0", features = ["preserve_order"] } arrow = { path = "../arrow", version = "9.1.0", default-features = false, features = ["test_utils", "prettyprint"] } diff --git a/parquet/src/compression.rs b/parquet/src/compression.rs index a5b061fad7f9..ae65e6fcac23 100644 --- a/parquet/src/compression.rs +++ b/parquet/src/compression.rs @@ -222,8 +222,6 @@ pub use brotli_codec::*; #[cfg(any(feature = "lz4_flex", test))] mod lz4_codec { - use std::io::{Read, Write}; - use crate::compression::Codec; use crate::errors::Result; @@ -245,8 +243,8 @@ mod lz4_codec { input_buf: &[u8], output_buf: &mut Vec, ) -> Result { - let mut decoder = lz4_flex::frame::FrameDecoder::new(input_buf); - std::io::copy(&mut decoder, &mut output_buf)?; + let output = lz4_flex::block::decompress_size_prepended(input_buf).unwrap(); + output_buf.extend_from_slice(output.as_slice()); Ok(output_buf.len()) } @@ -254,10 +252,9 @@ mod lz4_codec { // use lz4_flex::block::decompress_size_prepended; fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec) -> Result<()> { - let mut encoder = lz4_flex::frame::FrameEncoder::new(output_buf); - std::io::copy(&mut input_buf, &mut encoder)?; - Ok(encoder.finish()) - + let output = lz4_flex::block::compress_prepend_size(input_buf); + output_buf.extend_from_slice(output.as_slice()); + Ok(()) // let buf = lz4_flex::block::compress_prepend_size(input_buf); // output_buf From 8ef281f3d27a003b4eb63e22d80de3789bf6a58d Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Thu, 3 Mar 2022 20:29:54 -0700 Subject: [PATCH 4/7] Working lz4? --- parquet/src/compression.rs | 48 +++++++++++++++++++++----------------- 1 file changed, 26 insertions(+), 22 deletions(-) diff --git a/parquet/src/compression.rs b/parquet/src/compression.rs index ae65e6fcac23..c0820a9cbf89 100644 --- a/parquet/src/compression.rs +++ b/parquet/src/compression.rs @@ -222,6 +222,8 @@ pub use brotli_codec::*; #[cfg(any(feature = "lz4_flex", test))] mod lz4_codec { + use std::io::{Read, Write}; + use crate::compression::Codec; use crate::errors::Result; @@ -243,32 +245,34 @@ mod lz4_codec { input_buf: &[u8], output_buf: &mut Vec, ) -> Result { - let output = lz4_flex::block::decompress_size_prepended(input_buf).unwrap(); - output_buf.extend_from_slice(output.as_slice()); - Ok(output_buf.len()) + let mut decoder = lz4_flex::frame::FrameDecoder::new(input_buf); + let mut buffer: [u8; LZ4_BUFFER_SIZE] = [0; LZ4_BUFFER_SIZE]; + let mut total_len = 0; + loop { + let len = decoder.read(&mut buffer).unwrap(); + // let len = decoder.read_exact(&mut buffer)?; + if len == 0 { + break; + } + total_len += len; + output_buf.write_all(&buffer[0..len])?; + } + Ok(total_len) } - // use lz4_flex::block::compress_prepend_size; - // use lz4_flex::block::decompress_size_prepended; - fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec) -> Result<()> { - let output = lz4_flex::block::compress_prepend_size(input_buf); - output_buf.extend_from_slice(output.as_slice()); + let mut encoder = lz4_flex::frame::FrameEncoder::new(output_buf); + let mut from = 0; + loop { + let to = std::cmp::min(from + LZ4_BUFFER_SIZE, input_buf.len()); + encoder.write_all(&input_buf[from..to])?; + from += LZ4_BUFFER_SIZE; + if from >= input_buf.len() { + break; + } + } + encoder.finish().unwrap(); Ok(()) - // let buf = lz4_flex::block::compress_prepend_size(input_buf); - // output_buf - - // let mut encoder = lz4::EncoderBuilder::new().build(output_buf)?; - // let mut from = 0; - // loop { - // let to = std::cmp::min(from + LZ4_BUFFER_SIZE, input_buf.len()); - // encoder.write_all(&input_buf[from..to])?; - // from += LZ4_BUFFER_SIZE; - // if from >= input_buf.len() { - // break; - // } - // } - // encoder.finish().1.map_err(|e| e.into()) } } } From 7188c06f40b8cc6607be8679df40c2a0c0f9f54b Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Thu, 3 Mar 2022 20:39:51 -0700 Subject: [PATCH 5/7] trigger ci --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index 54dcbe74ff07..01ef517037e9 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,8 @@ under the License. --> +trigger ci + # Native Rust implementation of Apache Arrow and Parquet [![Coverage Status](https://codecov.io/gh/apache/arrow-rs/rust/branch/master/graph/badge.svg)](https://codecov.io/gh/apache/arrow-rs?branch=master) From 6284b9208d6ee40e050da6d09fcacfb1cdc8ee72 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Thu, 3 Mar 2022 20:43:24 -0700 Subject: [PATCH 6/7] Rename lz4 optional back to lz4 --- README.md | 2 -- parquet/Cargo.toml | 7 +++---- parquet/src/compression.rs | 6 +++--- 3 files changed, 6 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 01ef517037e9..54dcbe74ff07 100644 --- a/README.md +++ b/README.md @@ -17,8 +17,6 @@ under the License. --> -trigger ci - # Native Rust implementation of Apache Arrow and Parquet [![Coverage Status](https://codecov.io/gh/apache/arrow-rs/rust/branch/master/graph/badge.svg)](https://codecov.io/gh/apache/arrow-rs?branch=master) diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index 662ffa1df29f..c06cf2af885e 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -36,8 +36,7 @@ thrift = "0.13" snap = { version = "1.0", optional = true } brotli = { version = "3.3", optional = true } flate2 = { version = "1.0", optional = true } -# lz4 = { version = "1.23", optional = true } -lz4_flex = { version = "0.9.2", optional = true, default-features = false, features = ["checked-decode"] } +lz4_flex = { version = "0.9.2", optional = true, default-features = false } zstd = { version = "0.10", optional = true } chrono = { version = "0.4", default-features = false } num = "0.4" @@ -58,12 +57,12 @@ tempfile = "3.0" brotli = "3.3" flate2 = "1.0" lz4_flex = { version = "0.9.2" } -# lz4 = "1.23" serde_json = { version = "1.0", features = ["preserve_order"] } arrow = { path = "../arrow", version = "9.1.0", default-features = false, features = ["test_utils", "prettyprint"] } [features] -default = ["arrow", "snap", "brotli", "flate2", "lz4_flex", "zstd", "base64"] +default = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd", "base64"] +lz4 = ["lz4_flex"] cli = ["serde_json", "base64", "clap"] test_common = [] # Experimental, unstable functionality primarily used for testing diff --git a/parquet/src/compression.rs b/parquet/src/compression.rs index c0820a9cbf89..c4fbbc58600c 100644 --- a/parquet/src/compression.rs +++ b/parquet/src/compression.rs @@ -72,7 +72,7 @@ pub fn create_codec(codec: CodecType) -> Result>> { CodecType::GZIP => Ok(Some(Box::new(GZipCodec::new()))), #[cfg(any(feature = "snap", test))] CodecType::SNAPPY => Ok(Some(Box::new(SnappyCodec::new()))), - #[cfg(any(feature = "lz4_flex", test))] + #[cfg(any(feature = "lz4", test))] CodecType::LZ4 => Ok(Some(Box::new(LZ4Codec::new()))), #[cfg(any(feature = "zstd", test))] CodecType::ZSTD => Ok(Some(Box::new(ZSTDCodec::new()))), @@ -220,7 +220,7 @@ mod brotli_codec { #[cfg(any(feature = "brotli", test))] pub use brotli_codec::*; -#[cfg(any(feature = "lz4_flex", test))] +#[cfg(any(feature = "lz4", test))] mod lz4_codec { use std::io::{Read, Write}; @@ -276,7 +276,7 @@ mod lz4_codec { } } } -#[cfg(any(feature = "lz4_flex", test))] +#[cfg(any(feature = "lz4", test))] pub use lz4_codec::*; #[cfg(any(feature = "zstd", test))] From 83cafbd53703337961cd676f705ffd574c6c2417 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Thu, 3 Mar 2022 20:52:20 -0700 Subject: [PATCH 7/7] Use lz4 flex default features --- parquet/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index c06cf2af885e..e810bb4bcbaa 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -36,7 +36,7 @@ thrift = "0.13" snap = { version = "1.0", optional = true } brotli = { version = "3.3", optional = true } flate2 = { version = "1.0", optional = true } -lz4_flex = { version = "0.9.2", optional = true, default-features = false } +lz4_flex = { version = "0.9.2", optional = true } zstd = { version = "0.10", optional = true } chrono = { version = "0.4", default-features = false } num = "0.4"