diff --git a/native/Cargo.lock b/native/Cargo.lock index 538c40ee23..ad572acb9e 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -428,17 +428,6 @@ dependencies = [ "generic-array", ] -[[package]] -name = "brotli" -version = "3.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d640d25bc63c50fb1f0b545ffd80207d2e10a4c965530809b40ba3386825c391" -dependencies = [ - "alloc-no-stdlib", - "alloc-stdlib", - "brotli-decompressor 2.5.1", -] - [[package]] name = "brotli" version = "7.0.0" @@ -447,17 +436,7 @@ checksum = "cc97b8f16f944bba54f0433f07e30be199b6dc2bd25937444bbad560bcea29bd" dependencies = [ "alloc-no-stdlib", "alloc-stdlib", - "brotli-decompressor 4.0.1", -] - -[[package]] -name = "brotli-decompressor" -version = "2.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e2e4afe60d7dd600fdd3de8d0f08c2b7ec039712e3b6137ff98b7004e82de4f" -dependencies = [ - "alloc-no-stdlib", - "alloc-stdlib", + "brotli-decompressor", ] [[package]] @@ -900,7 +879,6 @@ dependencies = [ "arrow-schema", "assertables", "async-trait", - "brotli 3.5.0", "bytes", "crc32fast", "criterion", @@ -912,7 +890,6 @@ dependencies = [ "datafusion-expr", "datafusion-functions-nested", "datafusion-physical-expr", - "flate2", "futures", "hex", "itertools 0.11.0", @@ -920,7 +897,6 @@ dependencies = [ "lazy_static", "log", "log4rs", - "lz4", "mimalloc", "num", "once_cell", @@ -932,7 +908,6 @@ dependencies = [ "regex", "serde", "simd-adler32", - "snap", "tempfile", "thiserror", "tokio", @@ -2111,25 +2086,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "lz4" -version = "1.28.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d1febb2b4a79ddd1980eede06a8f7902197960aa0383ffcfdd62fe723036725" -dependencies = [ - "lz4-sys", -] - -[[package]] -name = "lz4-sys" -version = "1.11.1+lz4-1.10.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6bd8c0d6c6ed0cd30b3652886bb8711dc4bb01d637a68105a3d5158039b418e6" -dependencies = [ - "cc", - "libc", -] - [[package]] name = "lz4_flex" version = "0.11.3" @@ -2382,7 +2338,7 @@ dependencies = [ "arrow-schema", "arrow-select", "base64", - "brotli 7.0.0", + "brotli", "bytes", "chrono", "flate2", diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml index 489da46d47..5089e67a03 100644 --- a/native/core/Cargo.toml +++ b/native/core/Cargo.toml @@ -52,10 +52,6 @@ serde = { version = "1", features = ["derive"] } lazy_static = "1.4.0" prost = "0.12.1" jni = "0.21" -snap = "1.1" -brotli = "3.3" -flate2 = "1.0" -lz4 = "1.24" zstd = "0.11" rand = { workspace = true} num = { workspace = true } diff --git a/native/core/src/parquet/compression.rs b/native/core/src/parquet/compression.rs deleted file mode 100644 index 37b857f4a2..0000000000 --- a/native/core/src/parquet/compression.rs +++ /dev/null @@ -1,319 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Contains codec interface and supported codec implementations. -//! -//! See [`Compression`](crate::basic::Compression) enum for all available compression -//! algorithms. -//! -//! # Example -//! -//! ```no_run -//! use comet::parquet::{basic::Compression, compression::create_codec}; -//! -//! let mut codec = match create_codec(Compression::SNAPPY) { -//! Ok(Some(codec)) => codec, -//! _ => panic!(), -//! }; -//! -//! let data = vec![b'p', b'a', b'r', b'q', b'u', b'e', b't']; -//! let mut compressed = vec![]; -//! codec.compress(&data[..], &mut compressed).unwrap(); -//! -//! let mut output = vec![]; -//! codec.decompress(&compressed[..], &mut output).unwrap(); -//! -//! assert_eq!(output, data); -//! ``` - -use super::basic::Compression as CodecType; -use crate::errors::{ParquetError, ParquetResult as Result}; - -use brotli::Decompressor; -use flate2::{read, write, Compression}; -use snap::raw::{decompress_len, max_compress_len, Decoder, Encoder}; -use std::io::{copy, Read, Write}; - -/// Parquet compression codec interface. -#[allow(clippy::ptr_arg)] -pub trait Codec { - /// Compresses data stored in slice `input_buf` and writes the compressed result - /// to `output_buf`. - /// Note that you'll need to call `clear()` before reusing the same `output_buf` - /// across different `compress` calls. - fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec) -> Result<()>; - - /// Decompresses data stored in slice `input_buf` and writes output to `output_buf`. - /// Returns the total number of bytes written. - fn decompress(&mut self, input_buf: &[u8], output_buf: &mut Vec) -> Result; -} - -/// Given the compression type `codec`, returns a codec used to compress and decompress -/// bytes for the compression type. -/// This returns `None` if the codec type is `UNCOMPRESSED`. -pub fn create_codec(codec: CodecType) -> Result>> { - match codec { - CodecType::BROTLI => Ok(Some(Box::new(BrotliCodec::new()))), - CodecType::GZIP => Ok(Some(Box::new(GZipCodec::new()))), - CodecType::SNAPPY => Ok(Some(Box::new(SnappyCodec::new()))), - CodecType::LZ4 => Ok(Some(Box::new(LZ4Codec::new()))), - CodecType::ZSTD => Ok(Some(Box::new(ZSTDCodec::new()))), - CodecType::UNCOMPRESSED => Ok(None), - _ => Err(nyi_err!("The codec type {} is not supported yet", codec)), - } -} - -/// Codec for Snappy compression format. -pub struct SnappyCodec { - decoder: Decoder, - encoder: Encoder, -} - -impl SnappyCodec { - /// Creates new Snappy compression codec. - pub(crate) fn new() -> Self { - Self { - decoder: Decoder::new(), - encoder: Encoder::new(), - } - } -} - -impl Codec for SnappyCodec { - fn decompress(&mut self, input_buf: &[u8], output_buf: &mut Vec) -> Result { - let len = decompress_len(input_buf)?; - output_buf.resize(len, 0); - self.decoder - .decompress(input_buf, output_buf) - .map_err(|e| e.into()) - } - - fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec) -> Result<()> { - let output_buf_len = output_buf.len(); - let required_len = max_compress_len(input_buf.len()); - output_buf.resize(output_buf_len + required_len, 0); - let n = self - .encoder - .compress(input_buf, &mut output_buf[output_buf_len..])?; - output_buf.truncate(output_buf_len + n); - Ok(()) - } -} - -/// Codec for GZIP compression algorithm. -pub struct GZipCodec {} - -impl GZipCodec { - /// Creates new GZIP compression codec. - pub(crate) fn new() -> Self { - Self {} - } -} - -impl Codec for GZipCodec { - fn decompress(&mut self, input_buf: &[u8], output_buf: &mut Vec) -> Result { - let mut decoder = read::GzDecoder::new(input_buf); - decoder.read_to_end(output_buf).map_err(|e| e.into()) - } - - fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec) -> Result<()> { - let mut encoder = write::GzEncoder::new(output_buf, Compression::default()); - encoder.write_all(input_buf)?; - encoder.try_finish().map_err(|e| e.into()) - } -} - -const BROTLI_DEFAULT_BUFFER_SIZE: usize = 4096; -const BROTLI_DEFAULT_COMPRESSION_QUALITY: u32 = 1; // supported levels 0-9 -const BROTLI_DEFAULT_LG_WINDOW_SIZE: u32 = 22; // recommended between 20-22 - -/// Codec for Brotli compression algorithm. -pub struct BrotliCodec {} - -impl BrotliCodec { - /// Creates new Brotli compression codec. - pub(crate) fn new() -> Self { - Self {} - } -} - -impl Codec for BrotliCodec { - fn decompress(&mut self, input_buf: &[u8], output_buf: &mut Vec) -> Result { - Decompressor::new(input_buf, BROTLI_DEFAULT_BUFFER_SIZE) - .read_to_end(output_buf) - .map_err(|e| e.into()) - } - - fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec) -> Result<()> { - let mut encoder = brotli::CompressorWriter::new( - output_buf, - BROTLI_DEFAULT_BUFFER_SIZE, - BROTLI_DEFAULT_COMPRESSION_QUALITY, - BROTLI_DEFAULT_LG_WINDOW_SIZE, - ); - encoder.write_all(input_buf)?; - encoder.flush().map_err(|e| e.into()) - } -} - -const LZ4_BUFFER_SIZE: usize = 4096; - -/// Codec for LZ4 compression algorithm. -pub struct LZ4Codec {} - -impl LZ4Codec { - /// Creates new LZ4 compression codec. - pub(crate) fn new() -> Self { - Self {} - } -} - -impl Codec for LZ4Codec { - fn decompress(&mut self, input_buf: &[u8], output_buf: &mut Vec) -> Result { - let mut decoder = lz4::Decoder::new(input_buf)?; - let mut buffer: [u8; LZ4_BUFFER_SIZE] = [0; LZ4_BUFFER_SIZE]; - let mut total_len = 0; - loop { - let len = decoder.read(&mut buffer)?; - if len == 0 { - break; - } - total_len += len; - output_buf.write_all(&buffer[0..len])?; - } - Ok(total_len) - } - - fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec) -> Result<()> { - let mut encoder = lz4::EncoderBuilder::new().build(output_buf)?; - let mut from = 0; - loop { - let to = std::cmp::min(from + LZ4_BUFFER_SIZE, input_buf.len()); - encoder.write_all(&input_buf[from..to])?; - from += LZ4_BUFFER_SIZE; - if from >= input_buf.len() { - break; - } - } - encoder.finish().1.map_err(|e| e.into()) - } -} - -/// Codec for Zstandard compression algorithm. -pub struct ZSTDCodec {} - -impl ZSTDCodec { - /// Creates new Zstandard compression codec. - pub(crate) fn new() -> Self { - Self {} - } -} - -/// Compression level (1-21) for ZSTD. Choose 1 here for better compression speed. -const ZSTD_COMPRESSION_LEVEL: i32 = 1; - -impl Codec for ZSTDCodec { - fn decompress(&mut self, input_buf: &[u8], output_buf: &mut Vec) -> Result { - let mut decoder = zstd::Decoder::new(input_buf)?; - match copy(&mut decoder, output_buf) { - Ok(n) => Ok(n as usize), - Err(e) => Err(e.into()), - } - } - - fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec) -> Result<()> { - let mut encoder = zstd::Encoder::new(output_buf, ZSTD_COMPRESSION_LEVEL)?; - encoder.write_all(input_buf)?; - match encoder.finish() { - Ok(_) => Ok(()), - Err(e) => Err(e.into()), - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - use crate::parquet::util::test_common::*; - - fn test_roundtrip(c: CodecType, data: &[u8]) { - let mut c1 = create_codec(c).unwrap().unwrap(); - let mut c2 = create_codec(c).unwrap().unwrap(); - - // Compress with c1 - let mut compressed = Vec::new(); - let mut decompressed = Vec::new(); - c1.compress(data, &mut compressed) - .expect("Error when compressing"); - - // Decompress with c2 - let mut decompressed_size = c2 - .decompress(compressed.as_slice(), &mut decompressed) - .expect("Error when decompressing"); - assert_eq!(data.len(), decompressed_size); - decompressed.truncate(decompressed_size); - assert_eq!(data, decompressed.as_slice()); - - compressed.clear(); - - // Compress with c2 - c2.compress(data, &mut compressed) - .expect("Error when compressing"); - - // Decompress with c1 - decompressed_size = c1 - .decompress(compressed.as_slice(), &mut decompressed) - .expect("Error when decompressing"); - assert_eq!(data.len(), decompressed_size); - decompressed.truncate(decompressed_size); - assert_eq!(data, decompressed.as_slice()); - } - - fn test_codec(c: CodecType) { - let sizes = vec![100, 10000, 100000]; - for size in sizes { - let data = random_bytes(size); - test_roundtrip(c, &data); - } - } - - #[test] - fn test_codec_snappy() { - test_codec(CodecType::SNAPPY); - } - - #[test] - fn test_codec_gzip() { - test_codec(CodecType::GZIP); - } - - #[test] - fn test_codec_brotli() { - test_codec(CodecType::BROTLI); - } - - #[test] - fn test_codec_lz4() { - test_codec(CodecType::LZ4); - } - - #[test] - fn test_codec_zstd() { - test_codec(CodecType::ZSTD); - } -} diff --git a/native/core/src/parquet/util/jni_buffer.rs b/native/core/src/parquet/util/jni_buffer.rs deleted file mode 100644 index 33f36ed9dd..0000000000 --- a/native/core/src/parquet/util/jni_buffer.rs +++ /dev/null @@ -1,98 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use core::slice; -use std::ptr::NonNull; - -use jni::{ - objects::{ReleaseMode, TypeArray}, - sys::{jbyte, jbyteArray, JNI_TRUE}, - JavaVM, -}; - -use crate::errors::{CometError, CometResult as Result}; - -use super::Buffer; - -/// An immutable byte buffer wrapping a JNI byte array allocated on heap. -/// -/// Unlike `AutoArray`, this doesn't have a lifetime and can be used across different JNI calls. -pub struct JniBuffer { - /// A pointer for the JVM instance, used to obtain byte array elements (via - /// `GetByteArrayElements`) and release byte array elements (via `ReleaseByteArrayElements`). - jvm: JavaVM, - /// The original JNI byte array that backs this buffer - inner: jbyteArray, - /// The raw pointer from the JNI byte array - ptr: NonNull, - /// Total number of bytes in the original array (i.e., `inner`). - len: usize, - /// Whether the JNI byte array is copied or not. - is_copy: bool, -} - -impl JniBuffer { - pub fn try_new(jvm: JavaVM, array: jbyteArray, len: usize) -> Result { - let env = jvm.get_env()?; - let mut is_copy = 0xff; - let ptr = jbyte::get(&env, array.into(), &mut is_copy)?; - let res = Self { - jvm, - inner: array, - ptr: NonNull::new(ptr) - .ok_or_else(|| CometError::NullPointer("null byte array pointer".to_string()))?, - len, - is_copy: is_copy == JNI_TRUE, - }; - Ok(res) - } - - /// Whether the JNI byte array is copied or not, i.e., whether the JVM pinned down the original - /// Java byte array, or made a new copy of it. - pub fn is_copy(&self) -> bool { - self.is_copy - } -} - -impl Buffer for JniBuffer { - fn len(&self) -> usize { - self.len - } - - fn data(&self) -> &[u8] { - self.as_ref() - } -} - -impl AsRef<[u8]> for JniBuffer { - fn as_ref(&self) -> &[u8] { - unsafe { slice::from_raw_parts(self.ptr.as_ptr() as *mut u8 as *const u8, self.len) } - } -} - -impl Drop for JniBuffer { - fn drop(&mut self) { - let env = self.jvm.get_env().unwrap(); // TODO: log error here - jbyte::release( - &env, - self.inner.into(), - self.ptr, - ReleaseMode::NoCopyBack as i32, // don't copy back since it's read-only here - ) - .unwrap(); - } -}