diff --git a/cpp/src/arrow/util/compression_lz4.cc b/cpp/src/arrow/util/compression_lz4.cc
index 365cd0f523e..bb0295e6858 100644
--- a/cpp/src/arrow/util/compression_lz4.cc
+++ b/cpp/src/arrow/util/compression_lz4.cc
@@ -417,38 +417,52 @@ class Lz4HadoopCodec : public Lz4Codec {
 
   int64_t TryDecompressHadoop(int64_t input_len, const uint8_t* input,
                               int64_t output_buffer_len, uint8_t* output_buffer) {
-    // Parquet files written with the Hadoop Lz4Codec contain at the beginning
-    // of the input buffer two uint32_t's representing (in this order) expected
-    // decompressed size in bytes and expected compressed size in bytes.
+    // Parquet files written with the Hadoop Lz4Codec use their own framing.
+    // The input buffer can contain an arbitrary number of "frames", each
+    // with the following structure:
+    // - bytes 0..3: big-endian uint32_t representing the frame decompressed size
+    // - bytes 4..7: big-endian uint32_t representing the frame compressed size
+    // - bytes 8...: frame compressed data
     //
     // The Hadoop Lz4Codec source code can be found here:
     // https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/Lz4Codec.cc
-    if (input_len < kPrefixLength) {
-      return kNotHadoop;
-    }
-
-    const uint32_t expected_decompressed_size =
-        BitUtil::FromBigEndian(SafeLoadAs<uint32_t>(input));
-    const uint32_t expected_compressed_size =
-        BitUtil::FromBigEndian(SafeLoadAs<uint32_t>(input + sizeof(uint32_t)));
-    const int64_t lz4_compressed_buffer_size = input_len - kPrefixLength;
-
-    // We use a heuristic to determine if the parquet file being read
-    // was compressed using the Hadoop Lz4Codec.
-    if (lz4_compressed_buffer_size == expected_compressed_size) {
-      // Parquet file was likely compressed with Hadoop Lz4Codec.
-      auto maybe_decompressed_size =
-          Lz4Codec::Decompress(lz4_compressed_buffer_size, input + kPrefixLength,
-                               output_buffer_len, output_buffer);
-
-      if (maybe_decompressed_size.ok() &&
-          *maybe_decompressed_size == expected_decompressed_size) {
-        return *maybe_decompressed_size;
+    int64_t total_decompressed_size = 0;
+
+    while (input_len >= kPrefixLength) {
+      const uint32_t expected_decompressed_size =
+          BitUtil::FromBigEndian(SafeLoadAs<uint32_t>(input));
+      const uint32_t expected_compressed_size =
+          BitUtil::FromBigEndian(SafeLoadAs<uint32_t>(input + sizeof(uint32_t)));
+      input += kPrefixLength;
+      input_len -= kPrefixLength;
+
+      if (input_len < expected_compressed_size) {
+        // Not enough bytes for Hadoop "frame"
+        return kNotHadoop;
+      }
+      if (output_buffer_len < expected_decompressed_size) {
+        // Not enough bytes to hold advertised output => probably not Hadoop
+        return kNotHadoop;
       }
+      // Try decompressing and compare with expected decompressed length
+      auto maybe_decompressed_size = Lz4Codec::Decompress(
+          expected_compressed_size, input, output_buffer_len, output_buffer);
+      if (!maybe_decompressed_size.ok() ||
+          *maybe_decompressed_size != expected_decompressed_size) {
+        return kNotHadoop;
+      }
+      input += expected_compressed_size;
+      input_len -= expected_compressed_size;
+      output_buffer += expected_decompressed_size;
+      output_buffer_len -= expected_decompressed_size;
+      total_decompressed_size += expected_decompressed_size;
     }
 
-    // Parquet file was compressed without Hadoop Lz4Codec (or data is corrupt)
-    return kNotHadoop;
+    if (input_len == 0) {
+      return total_decompressed_size;
+    } else {
+      return kNotHadoop;
+    }
   }
 };
 
diff --git a/cpp/src/parquet/reader_test.cc b/cpp/src/parquet/reader_test.cc
index 8dbcde8e5f2..78aa0693cb9 100644
--- a/cpp/src/parquet/reader_test.cc
+++ b/cpp/src/parquet/reader_test.cc
@@ -68,6 +68,11 @@ std::string non_hadoop_lz4_compressed() {
   return data_file("non_hadoop_lz4_compressed.parquet");
 }
 
+// Larger data compressed using custom Hadoop LZ4 format (several frames)
+std::string hadoop_lz4_compressed_larger() {
+  return data_file("hadoop_lz4_compressed_larger.parquet");
+}
+
 // TODO: Assert on definition and repetition levels
 template <typename DType>
 void AssertColumnValues(std::shared_ptr<TypedColumnReader<DType>> col, int64_t batch_size,
@@ -553,6 +558,33 @@ TEST_P(TestCodec, FileMetadataAndValues) {
 INSTANTIATE_TEST_SUITE_P(Lz4CodecTests, TestCodec,
                          ::testing::Values(hadoop_lz4_compressed(),
                                            non_hadoop_lz4_compressed()));
+
+TEST(TestLz4HadoopCodec, TestSeveralFrames) {
+  // ARROW-9177: Hadoop can compress a data block in several LZ4 "frames"
+  auto file = ParquetFileReader::OpenFile(hadoop_lz4_compressed_larger());
+  auto group = file->RowGroup(0);
+
+  const int64_t kNumRows = 10000;
+
+  ASSERT_EQ(kNumRows, file->metadata()->num_rows());
+  ASSERT_EQ(1, file->metadata()->num_columns());
+  ASSERT_EQ(1, file->metadata()->num_row_groups());
+  ASSERT_EQ(kNumRows, group->metadata()->num_rows());
+
+  // column 0 ("a")
+  auto col = checked_pointer_cast<ByteArrayReader>(group->Column(0));
+
+  std::vector<ByteArray> values(kNumRows);
+  int64_t values_read;
+  auto levels_read =
+      col->ReadBatch(kNumRows, nullptr, nullptr, values.data(), &values_read);
+  ASSERT_EQ(kNumRows, levels_read);
+  ASSERT_EQ(kNumRows, values_read);
+  ASSERT_EQ(values[0], ByteArray("c7ce6bef-d5b0-4863-b199-8ea8c7fb117b"));
+  ASSERT_EQ(values[1], ByteArray("e8fb9197-cb9f-4118-b67f-fbfa65f61843"));
+  ASSERT_EQ(values[kNumRows - 2], ByteArray("ab52a0cc-c6bb-4d61-8a8f-166dc4b8b13c"));
+  ASSERT_EQ(values[kNumRows - 1], ByteArray("85440778-460a-41ac-aa2e-ac3ee41696bf"));
+}
 #endif
 
 }  // namespace parquet
diff --git a/cpp/submodules/parquet-testing b/cpp/submodules/parquet-testing
index d914f9d2894..e31fe1a02c9 160000
--- a/cpp/submodules/parquet-testing
+++ b/cpp/submodules/parquet-testing
@@ -1 +1 @@
-Subproject commit d914f9d289488c7db1759d7a88a4a1b8f062c7dd
+Subproject commit e31fe1a02c9e9f271e4bfb8002d403c52f1ef8eb
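Note, appended after the patch for illustration only: the sketch below walks the Hadoop LZ4 block layout that the new comment in compression_lz4.cc describes (an 8-byte big-endian size prefix per frame, followed by a raw LZ4 block), but does so with the plain liblz4 block API rather than Arrow's Lz4Codec. The function name ParseHadoopLz4Frames, its std::optional error handling, and the local load_be32 helper are hypothetical choices for this sketch, not part of the Arrow code.

// Illustrative sketch only -- not part of the patch above.
// Hadoop LZ4 framing, per frame:
//   bytes 0..3: big-endian uint32_t, frame decompressed size
//   bytes 4..7: big-endian uint32_t, frame compressed size
//   bytes 8...: raw LZ4 block data
#include <lz4.h>  // LZ4_decompress_safe (raw block API)

#include <cstdint>
#include <optional>

// Hypothetical helper: returns the total decompressed size, or std::nullopt
// when the input does not follow the Hadoop framing (the sketch's analogue
// of returning kNotHadoop).
std::optional<int64_t> ParseHadoopLz4Frames(const uint8_t* input, int64_t input_len,
                                            uint8_t* output, int64_t output_len) {
  constexpr int64_t kPrefixLength = 2 * sizeof(uint32_t);
  // Big-endian 32-bit load, independent of host byte order.
  auto load_be32 = [](const uint8_t* p) {
    return (uint32_t{p[0]} << 24) | (uint32_t{p[1]} << 16) | (uint32_t{p[2]} << 8) |
           uint32_t{p[3]};
  };

  int64_t total_decompressed = 0;
  while (input_len >= kPrefixLength) {
    const uint32_t decompressed_size = load_be32(input);
    const uint32_t compressed_size = load_be32(input + sizeof(uint32_t));
    input += kPrefixLength;
    input_len -= kPrefixLength;
    if (input_len < compressed_size || output_len < decompressed_size) {
      return std::nullopt;  // truncated frame, or advertised size too large
    }
    const int n = LZ4_decompress_safe(
        reinterpret_cast<const char*>(input), reinterpret_cast<char*>(output),
        static_cast<int>(compressed_size), static_cast<int>(decompressed_size));
    if (n < 0 || static_cast<uint32_t>(n) != decompressed_size) {
      return std::nullopt;  // corrupt data, or not the Hadoop framing after all
    }
    input += compressed_size;
    input_len -= compressed_size;
    output += decompressed_size;
    output_len -= decompressed_size;
    total_decompressed += decompressed_size;
  }
  if (input_len != 0) {
    return std::nullopt;  // trailing bytes that do not form a complete frame
  }
  return total_decompressed;
}

A caller would size the output buffer from the Parquet page's advertised uncompressed length, which is exactly the role output_buffer_len plays in TryDecompressHadoop.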