
Conversation

@nevi-me (Contributor) commented Jan 8, 2021

Adds RecordBatch body compression, which compresses the buffers that make up arrays (e.g. offset and null buffers).
I've restricted the write side to only work with v5 of the metadata. We can expand on this later, as I think the non-legacy v4 also supports the BodyCompression method implemented here. Reading should be fine as long as the compression info is specified.

This PR is built on top of ARROW-10299 (#9122).

I have not yet implemented ZSTD compression, but I expect it shouldn't be too much work, so it can still be done as part of this PR.
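
For context, IPC body compression compresses each constituent buffer independently, and each compressed buffer is prefixed with its uncompressed length. A minimal sketch of that layout, assuming the lz4 crate for the frame-format body (the helper name is mine, not from this PR):

use std::io::Write;

// Hypothetical helper showing the on-wire layout of one compressed buffer:
// an int64 uncompressed-length prefix, then the codec-framed bytes.
fn write_compressed_buffer(out: &mut Vec<u8>, input: &[u8]) -> std::io::Result<()> {
    // 8-byte little-endian uncompressed length
    out.write_all(&(input.len() as i64).to_le_bytes())?;
    // compressed body follows, here as a single LZ4 frame
    let mut encoder = lz4::EncoderBuilder::new().build(out)?;
    encoder.write_all(input)?;
    encoder.finish().1
}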

@nevi-me (Contributor, Author) commented Jan 8, 2021

For anyone who understands how LZ4 works, I need some help.
In Rust I'm able to read LZ4-compressed data written by Python, but pyarrow gives an error when reading a file written by the Rust writer.

The error that I'm getting is:

OSError: Lz4 compressed input contains more than one frame

To reproduce the error, please:

  1. Run the arrow unit tests, so that a compressed file is created.
  2. Run the snippet below to read the file in pyarrow:
import pyarrow as pa
file = pa.ipc.open_file("$ARROW_ROOT/rust/arrow/target/debug/testdata/primitive_lz4.arrow_file")
batches = file.read_all()

Thanks

@mqy (Contributor) commented Jan 8, 2021

The error comes from https://github.com/apache/arrow/blob/master/cpp/src/arrow/util/compression_lz4.cc, lines 283-285:

if (input_len != 0) {
  return Status::IOError("Lz4 compressed input contains more than one frame");
}

It seems that input_len remains non-zero even though the decompression completed.


@nevi-me (Contributor, Author) commented Jan 8, 2021

It seems that input_len remains non-zero even though the decompression completed.

The logic of how the buffers are compressed isn't explained in a beginner-friendly way, so I'm likely implementing the compression incorrectly. I used the Java implementation in #8949 as a reference; I couldn't quite understand the C++ implementation.

output_buf.write_all(&(input_buf.len() as i64).to_le_bytes())?;
let mut encoder = lz4::EncoderBuilder::new().build(output_buf)?;
let mut from = 0;
loop {
    // loop body truncated in the diff view; it appears to write input_buf
    // into the encoder in chunks

Review comment from a Contributor on the snippet above:

This loop can be omitted.
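
A sketch of what that suggestion could look like: the lz4 crate's Encoder implements std::io::Write, so the whole input can be written in one call (variable names follow the snippet above):

output_buf.write_all(&(input_buf.len() as i64).to_le_bytes())?;
let mut encoder = lz4::EncoderBuilder::new().build(output_buf)?;
// no chunking loop: write the whole buffer in one call
encoder.write_all(input_buf)?;
// finish() flushes the frame and surfaces any pending error
let (_output_buf, result) = encoder.finish();
result?;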

@mqy (Contributor) commented Jan 9, 2021

@nevi-me would you please have a look at BodyCompressionBuilder from ipc/gen/Message.rs?

@nevi-me (Contributor, Author) commented Jan 9, 2021

@nevi-me would you please have a look at BodyCompressionBuilder from ipc/gen/Message.rs?

Did you notice something with it? I might need a bit more context :)

@mqy (Contributor) commented Jan 9, 2021

Did you notice something with it? I might need a bit more context :)

pub struct BodyCompressionArgs {
    pub codec: CompressionType,
    pub method: BodyCompressionMethod,
}

Perhaps you missed RecordBatchBuilder::add_compression()?
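
For reference, a minimal sketch of attaching the compression metadata with the flatbuffers-generated builders; the exact calls are my reading of ipc/gen/Message.rs, so treat them as assumptions:

let mut fbb = flatbuffers::FlatBufferBuilder::new();

// declare how the body is compressed
let compression = {
    let mut c = ipc::BodyCompressionBuilder::new(&mut fbb);
    c.add_method(ipc::BodyCompressionMethod::BUFFER);
    c.add_codec(ipc::CompressionType::LZ4_FRAME);
    c.finish()
};

// attach it to the RecordBatch message so readers know to decompress
let mut batch = ipc::RecordBatchBuilder::new(&mut fbb);
batch.add_compression(compression);
// ... also add_length, add_nodes, add_buffers before batch.finish()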

@nevi-me (Contributor, Author) commented Jan 9, 2021

Perhaps you missed RecordBatchBuilder::add_compression()?

I missed it at the dictionary write, but not when writing a plain RecordBatch, so that's not it. If I hadn't written the compression details at all, the C++ implementation wouldn't have known that the message is compressed, or that it uses LZ4.
I'll debug when I have sufficient time.

@mqy (Contributor) commented Jan 10, 2021

Perhaps you missed RecordBatchBuilder::add_compression()?

I missed it at the dictionary write, but not when writing a plain RecordBatch, so that's not it. If I hadn't written the compression details at all, the C++ implementation wouldn't have known that the message is compressed, or that it uses LZ4.
I'll debug when I have sufficient time.

@nevi-me I pulled your branch locally and found add_compression in writer.rs. I'm sorry I failed to find add_compression on https://github.com/apache/arrow/pull/9137/files because writer.rs is not loaded in the diff view.

I'm installing pyarrow; after that I'll run the test test_write_file_v5_compressed.

@mqy (Contributor) commented Jan 10, 2021

@nevi-me no luck installing pyarrow 2.0.0 due to various dependency errors, but I found something:
Message.rs requires the LZ4 frame format, not the block format.

  // LZ4 frame format, for portability, as provided by lz4frame.h or wrappers
  // thereof. Not to be confused with "raw" (also called "block") format
  // provided by lz4.h

It's a bit complicated to implement with C bindings the way frameCompress.c does. I also found another crate named lzzzz for the LZ4 frame format: https://docs.rs/lzzzz/0.8.0/lzzzz/lz4f/index.html. Both compress_to_vec and decompress_to_vec look quite simple, and I got the unit test passing with these APIs.
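
For example, a round-trip sketch against the lzzzz 0.8 lz4f API linked above:

use lzzzz::lz4f::{compress_to_vec, decompress_to_vec, Preferences};

fn main() -> lzzzz::Result<()> {
    let input = b"buffer bytes to round-trip through the LZ4 frame format";

    // compress_to_vec appends one complete LZ4 frame to `compressed`
    let mut compressed = Vec::new();
    compress_to_vec(input, &mut compressed, &Preferences::default())?;

    // decompress_to_vec appends the decoded bytes to `decompressed`
    let mut decompressed = Vec::new();
    decompress_to_vec(&compressed, &mut decompressed)?;

    assert_eq!(&decompressed[..], &input[..]);
    Ok(())
}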


@mqy (Contributor) commented Jan 11, 2021

C++ has compile-time flags for the codecs. Perhaps we should also add a Cargo feature for this (see the sketch after this list), because:

  1. Compression may not be a very commonly used feature; ARROW-11188: [Rust] Support crypto functions from PostgreSQL dialect #9139 has a similar discussion.
  2. The lz4 crate has unsafe code that depends on liblz4.
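
A sketch of what the feature gate could look like; the feature name ipc_compression is hypothetical, and the optional lz4 dependency would be declared in Cargo.toml:

// compiled only when the hypothetical `ipc_compression` feature is enabled,
// keeping the unsafe liblz4 bindings out of default builds
#[cfg(feature = "ipc_compression")]
fn compress_body(input: &[u8], output: &mut Vec<u8>) -> std::io::Result<()> {
    use std::io::Write;
    let mut encoder = lz4::EncoderBuilder::new().build(output)?;
    encoder.write_all(input)?;
    encoder.finish().1
}

// without the feature, writing a compressed batch is an explicit error
// rather than a silent fallback
#[cfg(not(feature = "ipc_compression"))]
fn compress_body(_input: &[u8], _output: &mut Vec<u8>) -> std::io::Result<()> {
    Err(std::io::Error::new(
        std::io::ErrorKind::InvalidInput,
        "the ipc_compression feature is not enabled",
    ))
}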

@mqy (Contributor) left a review comment

Perhaps the total_len argument of buffers.push(ipc::Buffer::new(...)) should not account for pad_len.

let pad_len = pad_to_8(len as u32);
let total_len: i64 = (len + pad_len) as i64;
// assert_eq!(len % 8, 0, "Buffer width not a multiple of 8 bytes");
buffers.push(ipc::Buffer::new(offset, total_len));

Follow-up review comment from @mqy:


@nevi-me perhaps the total_len argument of buffers.push(ipc::Buffer::new(...)) should not account for pad_len. After fixing this possible bug, my test in pyarrow passed and there were no failures with cargo test, but I'm not sure this is the root cause, because:

  • I changed quite a lot based on your PR.
  • This line was last updated in PR ARROW-518 a year ago.

I took quite some time to compare with the C++ code; it looks like C++ sets the buffer size to the actual memory size.
The IPC spec (recordbatch-message section) states:

The size field of Buffer is not required to account for padding bytes. Since this metadata can be used to communicate in-memory pointer addresses between libraries, it is recommended to set size to the actual memory size rather than the padded size.

FYI: 68a558b at master...mqy:nevi-me_ARROW-8676 based on your PR.
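
A sketch of the suggested change, reusing the names from the snippet under review:

let pad_len = pad_to_8(len as u32);
// record the actual (unpadded) size in the Buffer metadata, per the spec...
buffers.push(ipc::Buffer::new(offset, len as i64));
// ...while padding still counts toward the offset of the next buffer
offset += (len + pad_len) as i64;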

@nevi-me (Contributor, Author) commented Jan 25, 2021

I'll come back to this in the coming weeks
