66 changes: 40 additions & 26 deletions cpp/src/arrow/util/compression_lz4.cc
@@ -417,38 +417,52 @@ class Lz4HadoopCodec : public Lz4Codec {

int64_t TryDecompressHadoop(int64_t input_len, const uint8_t* input,
int64_t output_buffer_len, uint8_t* output_buffer) {
// Parquet files written with the Hadoop Lz4Codec contain at the beginning
// of the input buffer two uint32_t's representing (in this order) expected
// decompressed size in bytes and expected compressed size in bytes.
// Parquet files written with the Hadoop Lz4Codec use their own framing.
// The input buffer can contain an arbitrary number of "frames", each
// with the following structure:
// - bytes 0..3: big-endian uint32_t representing the frame decompressed size
// - bytes 4..7: big-endian uint32_t representing the frame compressed size
// - bytes 8...: frame compressed data
//
// The Hadoop Lz4Codec source code can be found here:
// https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/Lz4Codec.cc
if (input_len < kPrefixLength) {
return kNotHadoop;
}

const uint32_t expected_decompressed_size =
BitUtil::FromBigEndian(SafeLoadAs<uint32_t>(input));
const uint32_t expected_compressed_size =
BitUtil::FromBigEndian(SafeLoadAs<uint32_t>(input + sizeof(uint32_t)));
const int64_t lz4_compressed_buffer_size = input_len - kPrefixLength;

// We use a heuristic to determine if the parquet file being read
// was compressed using the Hadoop Lz4Codec.
if (lz4_compressed_buffer_size == expected_compressed_size) {
// Parquet file was likely compressed with Hadoop Lz4Codec.
auto maybe_decompressed_size =
Lz4Codec::Decompress(lz4_compressed_buffer_size, input + kPrefixLength,
output_buffer_len, output_buffer);

if (maybe_decompressed_size.ok() &&
*maybe_decompressed_size == expected_decompressed_size) {
return *maybe_decompressed_size;
int64_t total_decompressed_size = 0;

while (input_len >= kPrefixLength) {
const uint32_t expected_decompressed_size =
BitUtil::FromBigEndian(SafeLoadAs<uint32_t>(input));
const uint32_t expected_compressed_size =
BitUtil::FromBigEndian(SafeLoadAs<uint32_t>(input + sizeof(uint32_t)));
input += kPrefixLength;
input_len -= kPrefixLength;

if (input_len < expected_compressed_size) {
// Not enough bytes for Hadoop "frame"
return kNotHadoop;
}
if (output_buffer_len < expected_decompressed_size) {
// Not enough bytes to hold advertised output => probably not Hadoop
return kNotHadoop;
}
// Try decompressing and compare with expected decompressed length
auto maybe_decompressed_size = Lz4Codec::Decompress(
expected_compressed_size, input, output_buffer_len, output_buffer);
if (!maybe_decompressed_size.ok() ||
*maybe_decompressed_size != expected_decompressed_size) {
return kNotHadoop;
}
input += expected_compressed_size;
input_len -= expected_compressed_size;
output_buffer += expected_decompressed_size;
output_buffer_len -= expected_decompressed_size;
total_decompressed_size += expected_decompressed_size;
}

// Parquet file was compressed without Hadoop Lz4Codec (or data is corrupt)
return kNotHadoop;
if (input_len == 0) {
return total_decompressed_size;
} else {
return kNotHadoop;
}
}
};
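For reference, here is a minimal standalone sketch (not part of this PR) of how a buffer in the framing described above can be assembled with plain liblz4. The helper names, the two-frame split, and the 64 KiB chunk size are illustrative assumptions, not anything Hadoop or Arrow prescribes.

```cpp
// Sketch only: assembles Hadoop-style LZ4 frames (8-byte big-endian prefix
// followed by a raw LZ4 block) using plain liblz4. Helper names are made up.
#include <lz4.h>

#include <cstdint>
#include <stdexcept>
#include <string>
#include <vector>

// Appends one frame containing `chunk` to `out`.
void AppendHadoopLz4Frame(const std::string& chunk, std::vector<uint8_t>* out) {
  std::vector<char> compressed(LZ4_compressBound(static_cast<int>(chunk.size())));
  const int compressed_size = LZ4_compress_default(
      chunk.data(), compressed.data(), static_cast<int>(chunk.size()),
      static_cast<int>(compressed.size()));
  if (compressed_size <= 0) {
    throw std::runtime_error("LZ4 compression failed");
  }
  auto append_be32 = [&](uint32_t v) {
    out->push_back(static_cast<uint8_t>(v >> 24));
    out->push_back(static_cast<uint8_t>(v >> 16));
    out->push_back(static_cast<uint8_t>(v >> 8));
    out->push_back(static_cast<uint8_t>(v));
  };
  append_be32(static_cast<uint32_t>(chunk.size()));     // frame decompressed size
  append_be32(static_cast<uint32_t>(compressed_size));  // frame compressed size
  out->insert(out->end(), compressed.begin(), compressed.begin() + compressed_size);
}

// Two concatenated frames, as a Hadoop writer may emit for a large data block.
std::vector<uint8_t> MakeTwoFrameBuffer() {
  std::vector<uint8_t> buffer;
  AppendHadoopLz4Frame(std::string(64 * 1024, 'a'), &buffer);
  AppendHadoopLz4Frame(std::string(64 * 1024, 'b'), &buffer);
  return buffer;
}
```

A buffer built this way exercises the multi-frame path of the `while` loop above: each iteration consumes one prefix plus one LZ4 block, and the loop only reports success when the input is consumed exactly.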

32 changes: 32 additions & 0 deletions cpp/src/parquet/reader_test.cc
@@ -68,6 +68,11 @@ std::string non_hadoop_lz4_compressed() {
return data_file("non_hadoop_lz4_compressed.parquet");
}

// Larger data compressed using custom Hadoop LZ4 format (several frames)
std::string hadoop_lz4_compressed_larger() {
return data_file("hadoop_lz4_compressed_larger.parquet");
}

// TODO: Assert on definition and repetition levels
template <typename DType, typename ValueType>
void AssertColumnValues(std::shared_ptr<TypedColumnReader<DType>> col, int64_t batch_size,
@@ -553,6 +558,33 @@ TEST_P(TestCodec, FileMetadataAndValues) {
INSTANTIATE_TEST_SUITE_P(Lz4CodecTests, TestCodec,
::testing::Values(hadoop_lz4_compressed(),
non_hadoop_lz4_compressed()));

TEST(TestLz4HadoopCodec, TestSeveralFrames) {
// ARROW-9177: Hadoop can compress a data block in several LZ4 "frames"
auto file = ParquetFileReader::OpenFile(hadoop_lz4_compressed_larger());
auto group = file->RowGroup(0);

const int64_t kNumRows = 10000;

ASSERT_EQ(kNumRows, file->metadata()->num_rows());
ASSERT_EQ(1, file->metadata()->num_columns());
ASSERT_EQ(1, file->metadata()->num_row_groups());
ASSERT_EQ(kNumRows, group->metadata()->num_rows());

// column 0 ("a")
auto col = checked_pointer_cast<ByteArrayReader>(group->Column(0));

std::vector<ByteArray> values(kNumRows);
int64_t values_read;
auto levels_read =
col->ReadBatch(kNumRows, nullptr, nullptr, values.data(), &values_read);
ASSERT_EQ(kNumRows, levels_read);
ASSERT_EQ(kNumRows, values_read);
ASSERT_EQ(values[0], ByteArray("c7ce6bef-d5b0-4863-b199-8ea8c7fb117b"));
ASSERT_EQ(values[1], ByteArray("e8fb9197-cb9f-4118-b67f-fbfa65f61843"));
ASSERT_EQ(values[kNumRows - 2], ByteArray("ab52a0cc-c6bb-4d61-8a8f-166dc4b8b13c"));
ASSERT_EQ(values[kNumRows - 1], ByteArray("85440778-460a-41ac-aa2e-ac3ee41696bf"));
}
#endif

} // namespace parquet
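As a usage note, the same multi-frame file can also be read at the table level rather than through the low-level column reader used in the test. The sketch below is an assumption-laden illustration (local file path, Arrow-backed Parquet reader); the Hadoop-vs-raw LZ4 detection happens transparently inside the codec, so the caller does not need to know which framing was used.

```cpp
// Minimal sketch: load a whole Parquet file into an arrow::Table. Whether the
// LZ4 pages use Hadoop framing or raw LZ4 is resolved inside the codec.
#include <memory>
#include <string>

#include <arrow/api.h>
#include <arrow/io/file.h>
#include <parquet/arrow/reader.h>

arrow::Result<std::shared_ptr<arrow::Table>> ReadParquetFile(const std::string& path) {
  ARROW_ASSIGN_OR_RAISE(auto input, arrow::io::ReadableFile::Open(path));
  std::unique_ptr<parquet::arrow::FileReader> reader;
  ARROW_RETURN_NOT_OK(
      parquet::arrow::OpenFile(input, arrow::default_memory_pool(), &reader));
  std::shared_ptr<arrow::Table> table;
  ARROW_RETURN_NOT_OK(reader->ReadTable(&table));
  return table;
}
```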