From e2bd706449961bff3cbb52250b8a735972aa71eb Mon Sep 17 00:00:00 2001 From: Nicolas Paris Date: Mon, 15 May 2023 22:43:35 +0200 Subject: [PATCH 01/10] Make bloom working with the OP example --- .../io/storage/HoodieBaseParquetWriter.java | 52 +++++++++++++------ .../storage/TestHoodieBaseParquetWriter.java | 1 - 2 files changed, 37 insertions(+), 16 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java index a82c26bae9219..873878020a7e1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java @@ -26,11 +26,14 @@ import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.api.WriteSupport; +import java.io.Closeable; import java.io.IOException; import java.util.concurrent.atomic.AtomicLong; import static org.apache.parquet.column.ParquetProperties.DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK; import static org.apache.parquet.column.ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK; +import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED; +import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_WRITER_VERSION; /** * Base class of Hudi's custom {@link ParquetWriter} implementations @@ -38,26 +41,41 @@ * @param target type of the object being written into Parquet files (for ex, * {@code IndexedRecord}, {@code InternalRow}) */ -public abstract class HoodieBaseParquetWriter extends ParquetWriter { +public abstract class HoodieBaseParquetWriter implements Closeable { private final AtomicLong writtenRecordCount = new AtomicLong(0); private final long maxFileSize; private long recordCountForNextSizeCheck; + private final ParquetWriter parquetWriter; public HoodieBaseParquetWriter(Path file, HoodieParquetConfig> parquetConfig) throws IOException { - super(HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf()), - ParquetFileWriter.Mode.CREATE, - parquetConfig.getWriteSupport(), - parquetConfig.getCompressionCodecName(), - parquetConfig.getBlockSize(), - parquetConfig.getPageSize(), - parquetConfig.getPageSize(), - parquetConfig.dictionaryEnabled(), - DEFAULT_IS_VALIDATING_ENABLED, - DEFAULT_WRITER_VERSION, - FSUtils.registerFileSystem(file, parquetConfig.getHadoopConf())); + ParquetWriter.Builder parquetWriterbuilder = new ParquetWriter.Builder(HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf())) { + @Override + protected ParquetWriter.Builder self() { + return this; + } + + @Override + protected WriteSupport getWriteSupport(org.apache.hadoop.conf.Configuration conf) { + return parquetConfig.getWriteSupport(); + } + }; + + parquetWriterbuilder.withWriteMode(ParquetFileWriter.Mode.CREATE); + parquetWriterbuilder.withCompressionCodec(parquetConfig.getCompressionCodecName()); + parquetWriterbuilder.withRowGroupSize(parquetConfig.getBlockSize()); + parquetWriterbuilder.withPageSize(parquetConfig.getPageSize()); + parquetWriterbuilder.withDictionaryPageSize(parquetConfig.getPageSize()); + parquetWriterbuilder.withDictionaryEncoding(parquetConfig.dictionaryEnabled()); + parquetWriterbuilder.withValidation(DEFAULT_IS_VALIDATING_ENABLED); + parquetWriterbuilder.withWriterVersion(DEFAULT_WRITER_VERSION); + parquetWriterbuilder.withConf(FSUtils.registerFileSystem(file, parquetConfig.getHadoopConf())); + 
parquetWriterbuilder.withBloomFilterEnabled("b", true); + + + parquetWriter = parquetWriterbuilder.build(); // We cannot accurately measure the snappy compressed output file size. We are choosing a // conservative 10% // TODO - compute this compression ratio dynamically by looking at the bytes written to the @@ -70,7 +88,7 @@ public HoodieBaseParquetWriter(Path file, public boolean canWrite() { long writtenCount = getWrittenRecordCount(); if (writtenCount >= recordCountForNextSizeCheck) { - long dataSize = getDataSize(); + long dataSize = this.parquetWriter.getDataSize(); // In some very extreme cases, like all records are same value, then it's possible // the dataSize is much lower than the writtenRecordCount(high compression ratio), // causing avgRecordSize to 0, we'll force the avgRecordSize to 1 for such cases. @@ -88,9 +106,8 @@ public boolean canWrite() { return true; } - @Override public void write(R object) throws IOException { - super.write(object); + this.parquetWriter.write(object); writtenRecordCount.incrementAndGet(); } @@ -102,4 +119,9 @@ protected long getWrittenRecordCount() { protected long getRecordCountForNextSizeCheck() { return recordCountForNextSizeCheck; } + + @Override + public void close() throws IOException { + this.parquetWriter.close(); + } } diff --git a/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieBaseParquetWriter.java b/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieBaseParquetWriter.java index 36bd0b5d4af8f..68de22ba2436c 100644 --- a/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieBaseParquetWriter.java +++ b/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieBaseParquetWriter.java @@ -53,7 +53,6 @@ public MockHoodieParquetWriter(Path file, HoodieParquetConfig Date: Mon, 15 May 2023 23:31:18 +0200 Subject: [PATCH 02/10] Parse hadoop conf to infer bloom config --- .../io/storage/HoodieBaseParquetWriter.java | 30 +++++++++++++++---- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java index 873878020a7e1..ccb81b3b6e72f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java @@ -18,11 +18,15 @@ package org.apache.hudi.io.storage; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; + import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.fs.HoodieWrapperFileSystem; import org.apache.hudi.common.util.VisibleForTesting; + import org.apache.parquet.hadoop.ParquetFileWriter; +import org.apache.parquet.hadoop.ParquetOutputFormat; import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.api.WriteSupport; @@ -32,6 +36,8 @@ import static org.apache.parquet.column.ParquetProperties.DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK; import static org.apache.parquet.column.ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK; +import static org.apache.parquet.hadoop.ParquetOutputFormat.BLOOM_FILTER_ENABLED; +import static org.apache.parquet.hadoop.ParquetOutputFormat.BLOOM_FILTER_EXPECTED_NDV; import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED; import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_WRITER_VERSION; @@ -39,7 +45,7 @@ * Base class of Hudi's custom {@link ParquetWriter} implementations * * @param 
target type of the object being written into Parquet files (for ex, - * {@code IndexedRecord}, {@code InternalRow}) + * {@code IndexedRecord}, {@code InternalRow}) */ public abstract class HoodieBaseParquetWriter implements Closeable { @@ -57,7 +63,7 @@ protected ParquetWriter.Builder self() { } @Override - protected WriteSupport getWriteSupport(org.apache.hadoop.conf.Configuration conf) { + protected WriteSupport getWriteSupport(Configuration conf) { return parquetConfig.getWriteSupport(); } }; @@ -71,9 +77,7 @@ protected WriteSupport getWriteSupport(org.apache.hadoop.conf.Configuration conf parquetWriterbuilder.withValidation(DEFAULT_IS_VALIDATING_ENABLED); parquetWriterbuilder.withWriterVersion(DEFAULT_WRITER_VERSION); parquetWriterbuilder.withConf(FSUtils.registerFileSystem(file, parquetConfig.getHadoopConf())); - parquetWriterbuilder.withBloomFilterEnabled("b", true); - - + handleParquetBloomFilters(parquetWriterbuilder, parquetConfig.getHadoopConf()); parquetWriter = parquetWriterbuilder.build(); // We cannot accurately measure the snappy compressed output file size. We are choosing a @@ -85,6 +89,22 @@ protected WriteSupport getWriteSupport(org.apache.hadoop.conf.Configuration conf this.recordCountForNextSizeCheck = DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK; } + protected void handleParquetBloomFilters(ParquetWriter.Builder parquetWriterbuilder, Configuration hadoopConf) { + parquetWriterbuilder.withBloomFilterEnabled(ParquetOutputFormat.getBloomFilterEnabled(hadoopConf)); + // inspired from https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java#L458-L464 + hadoopConf.forEach(conf -> { + String key = conf.getKey(); + if (key.startsWith(BLOOM_FILTER_ENABLED)) { + String column = key.substring(BLOOM_FILTER_ENABLED.length() + 1, key.length()); + parquetWriterbuilder.withBloomFilterEnabled(column, Boolean.valueOf(conf.getValue())); + } + if (key.startsWith(BLOOM_FILTER_EXPECTED_NDV)) { + String column = key.substring(BLOOM_FILTER_EXPECTED_NDV.length() + 1, key.length()); + parquetWriterbuilder.withBloomFilterNDV(column, Long.valueOf(conf.getValue(), -1)); + } + }); + } + public boolean canWrite() { long writtenCount = getWrittenRecordCount(); if (writtenCount >= recordCountForNextSizeCheck) { From c9d1268d249ee8906fe9041476f7eb8bacad9186 Mon Sep 17 00:00:00 2001 From: Nicolas Paris Date: Tue, 16 May 2023 20:48:35 +0200 Subject: [PATCH 03/10] Fix test --- .../org/apache/hudi/io/storage/HoodieBaseParquetWriter.java | 6 +++++- .../apache/hudi/io/storage/TestHoodieBaseParquetWriter.java | 1 + 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java index ccb81b3b6e72f..94ba193651831 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java @@ -108,7 +108,7 @@ protected void handleParquetBloomFilters(ParquetWriter.Builder parquetWriterbuil public boolean canWrite() { long writtenCount = getWrittenRecordCount(); if (writtenCount >= recordCountForNextSizeCheck) { - long dataSize = this.parquetWriter.getDataSize(); + long dataSize = getDataSize(); // In some very extreme cases, like all records are same value, then it's possible // the dataSize is much lower than the writtenRecordCount(high compression ratio), // causing avgRecordSize 
to 0, we'll force the avgRecordSize to 1 for such cases. @@ -126,6 +126,10 @@ public boolean canWrite() { return true; } + public long getDataSize() { + return this.parquetWriter.getDataSize(); + } + public void write(R object) throws IOException { this.parquetWriter.write(object); writtenRecordCount.incrementAndGet(); diff --git a/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieBaseParquetWriter.java b/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieBaseParquetWriter.java index 68de22ba2436c..36bd0b5d4af8f 100644 --- a/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieBaseParquetWriter.java +++ b/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieBaseParquetWriter.java @@ -53,6 +53,7 @@ public MockHoodieParquetWriter(Path file, HoodieParquetConfig Date: Tue, 16 May 2023 22:10:35 +0200 Subject: [PATCH 04/10] Add parquet bloom filter test --- .../apache/hudi/TestHoodieParquetBloom.scala | 119 ++++++++++++++++++ 1 file changed, 119 insertions(+) create mode 100644 hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieParquetBloom.scala diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieParquetBloom.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieParquetBloom.scala new file mode 100644 index 0000000000000..372102ef779a9 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieParquetBloom.scala @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi + +import org.apache.spark.sql._ +import org.apache.spark.sql.hudi.HoodieSparkSessionExtension +import org.apache.spark.util.{AccumulatorV2} +import org.apache.spark.SparkContext + +import org.apache.hudi.testutils.HoodieClientTestUtils.getSparkConfForTest +import org.apache.hudi.DataSourceWriteOptions +import org.apache.hudi.config.HoodieWriteConfig +import org.apache.hudi.common.model.{HoodieTableType, WriteOperationType} + + +import org.junit.jupiter.api.Assertions.{assertEquals} +import org.junit.jupiter.api.{BeforeEach} +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.{EnumSource} + +class TestHoodieParquetBloomFilter { + + var spark: SparkSession = _ + var sqlContext: SQLContext = _ + var sc: SparkContext = _ + + def initSparkContext(): Unit = { + val sparkConf = getSparkConfForTest(getClass.getSimpleName) + + spark = SparkSession.builder() + .withExtensions(new HoodieSparkSessionExtension) + .config(sparkConf) + .getOrCreate() + + sc = spark.sparkContext + sc.setLogLevel("ERROR") + sqlContext = spark.sqlContext + } + + @BeforeEach + def setUp() { + initSparkContext() + } + + @ParameterizedTest + @EnumSource(value = classOf[WriteOperationType], names = Array("BULK_INSERT", "INSERT")) + def testBloomFilter(operation: WriteOperationType): Unit = { + // setup hadoop conf with bloom col enabled + spark.sparkContext.hadoopConfiguration.set("parquet.bloom.filter.enabled#bloom_col", "true") + + val basePath = java.nio.file.Files.createTempDirectory("hoodie_bloom_source_path").toAbsolutePath.toString + val opts = Map( + HoodieWriteConfig.TBL_NAME.key -> "hoodie_bloom", + DataSourceWriteOptions.TABLE_TYPE.key -> HoodieTableType.COPY_ON_WRITE.toString, + DataSourceWriteOptions.OPERATION.key -> operation.toString, + DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key", + DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition" + ) + val inputDF = spark.sql( + """select '0' as _row_key, '1' as bloom_col, '2' as partition, '3' as ts + |union + |select '1', '2', '3', '4' + |""".stripMargin) + inputDF.write.format("hudi") + .options(opts) + .mode(SaveMode.Overwrite) + .save(basePath) + + val accu = new NumRowGroupsAcc + spark.sparkContext.register(accu) + + // this one shall skip partition scanning thanks to bloom + spark.read.format("hudi").load(basePath).filter("bloom_col = '3'").foreachPartition((it: Iterator[Row]) => it.foreach(_ => accu.add(0))) + assertEquals(0, accu.value) + + // this one will trigger one partition scan + spark.read.format("hudi").load(basePath).filter("bloom_col = '2'").foreachPartition((it: Iterator[Row]) => it.foreach(_ => accu.add(0))) + assertEquals(1, accu.value) + } +} + +class NumRowGroupsAcc extends AccumulatorV2[Integer, Integer] { + private var _sum = 0 + + override def isZero: Boolean = _sum == 0 + + override def copy(): AccumulatorV2[Integer, Integer] = { + val acc = new NumRowGroupsAcc() + acc._sum = _sum + acc + } + + override def reset(): Unit = _sum = 0 + + override def add(v: Integer): Unit = _sum += v + + override def merge(other: AccumulatorV2[Integer, Integer]): Unit = other match { + case a: NumRowGroupsAcc => _sum += a._sum + case _ => throw new UnsupportedOperationException( + s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}") + } + + override def value: Integer = _sum +} From 2b6691f73fc7adaa373b47a5d49fce9045638141 Mon Sep 17 00:00:00 2001 From: Nicolas Paris Date: Tue, 16 May 2023 22:24:45 +0200 Subject: [PATCH 05/10] Also add other operations 
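
In addition to BULK_INSERT and INSERT, exercise the parquet bloom filter
test for the UPSERT and INSERT_OVERWRITE write operations as well. The
feature is driven purely by hadoop configuration keys; as an illustrative
sketch (the expected-NDV key and the value 1000 below are only an example,
not something this patch sets), the test enables it like this:

    // `spark` is the test SparkSession; enable the parquet built-in bloom
    // filter for a given column, optionally hinting its expected number of
    // distinct values
    spark.sparkContext.hadoopConfiguration
      .set("parquet.bloom.filter.enabled#bloom_col", "true")
    spark.sparkContext.hadoopConfiguration
      .set("parquet.bloom.filter.expected.ndv#bloom_col", "1000")
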
--- .../src/test/scala/org/apache/hudi/TestHoodieParquetBloom.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieParquetBloom.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieParquetBloom.scala index 372102ef779a9..2d816b219372c 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieParquetBloom.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieParquetBloom.scala @@ -58,7 +58,7 @@ class TestHoodieParquetBloomFilter { } @ParameterizedTest - @EnumSource(value = classOf[WriteOperationType], names = Array("BULK_INSERT", "INSERT")) + @EnumSource(value = classOf[WriteOperationType], names = Array("BULK_INSERT", "INSERT", "UPSERT", "INSERT_OVERWRITE")) def testBloomFilter(operation: WriteOperationType): Unit = { // setup hadoop conf with bloom col enabled spark.sparkContext.hadoopConfiguration.set("parquet.bloom.filter.enabled#bloom_col", "true") From ae0d26f07bb07647077b23efccfe1b46635b4774 Mon Sep 17 00:00:00 2001 From: Nicolas Paris Date: Mon, 5 Jun 2023 15:32:53 +0200 Subject: [PATCH 06/10] Add UT --- .../avro/TestHoodieAvroParquetWriter.java | 146 ++++++++++++------ 1 file changed, 95 insertions(+), 51 deletions(-) diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroParquetWriter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroParquetWriter.java index b625d57f7fc21..9d11c0984c3db 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroParquetWriter.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroParquetWriter.java @@ -32,9 +32,15 @@ import org.apache.hudi.io.storage.HoodieAvroParquetWriter; import org.apache.hudi.io.storage.HoodieParquetConfig; import org.apache.parquet.avro.AvroSchemaConverter; +import org.apache.parquet.hadoop.BloomFilterReader; +import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.hadoop.metadata.FileMetaData; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.io.InputFile; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -42,77 +48,115 @@ import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.stream.Collectors; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.*; public class TestHoodieAvroParquetWriter { - @TempDir java.nio.file.Path tmpDir; + @TempDir + java.nio.file.Path tmpDir; - @Test - public void testProperWriting() throws IOException { - Configuration hadoopConf = new Configuration(); + @Test + public void testProperWriting() throws IOException { + Configuration hadoopConf = new Configuration(); - HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0xDEED); - List records = dataGen.generateGenericRecords(10); + HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0xDEED); + List records = dataGen.generateGenericRecords(10); - Schema schema = records.get(0).getSchema(); + Schema schema = 
records.get(0).getSchema(); - BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.0001, 10000, - BloomFilterTypeCode.DYNAMIC_V0.name()); - HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), - schema, Option.of(filter)); + BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.0001, 10000, + BloomFilterTypeCode.DYNAMIC_V0.name()); + HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), + schema, Option.of(filter)); - HoodieParquetConfig parquetConfig = - new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP, ParquetWriter.DEFAULT_BLOCK_SIZE, - ParquetWriter.DEFAULT_PAGE_SIZE, 1024 * 1024 * 1024, hadoopConf, 0.1, true); + HoodieParquetConfig parquetConfig = + new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP, ParquetWriter.DEFAULT_BLOCK_SIZE, + ParquetWriter.DEFAULT_PAGE_SIZE, 1024 * 1024 * 1024, hadoopConf, 0.1, true); - Path filePath = new Path(tmpDir.resolve("test.parquet").toAbsolutePath().toString()); + Path filePath = new Path(tmpDir.resolve("test.parquet").toAbsolutePath().toString()); - try (HoodieAvroParquetWriter writer = - new HoodieAvroParquetWriter(filePath, parquetConfig, "001", new DummyTaskContextSupplier(), true)) { - for (GenericRecord record : records) { - writer.writeAvro((String) record.get("_row_key"), record); - } - } + try (HoodieAvroParquetWriter writer = + new HoodieAvroParquetWriter(filePath, parquetConfig, "001", new DummyTaskContextSupplier(), true)) { + for (GenericRecord record : records) { + writer.writeAvro((String) record.get("_row_key"), record); + } + } - ParquetUtils utils = new ParquetUtils(); + ParquetUtils utils = new ParquetUtils(); - // Step 1: Make sure records are written appropriately - List readRecords = utils.readAvroRecords(hadoopConf, filePath); + // Step 1: Make sure records are written appropriately + List readRecords = utils.readAvroRecords(hadoopConf, filePath); - assertEquals(toJson(records), toJson(readRecords)); + assertEquals(toJson(records), toJson(readRecords)); - // Step 2: Assert Parquet metadata was written appropriately - List recordKeys = records.stream().map(r -> (String) r.get("_row_key")).collect(Collectors.toList()); + // Step 2: Assert Parquet metadata was written appropriately + List recordKeys = records.stream().map(r -> (String) r.get("_row_key")).collect(Collectors.toList()); - String minKey = recordKeys.stream().min(Comparator.naturalOrder()).get(); - String maxKey = recordKeys.stream().max(Comparator.naturalOrder()).get(); + String minKey = recordKeys.stream().min(Comparator.naturalOrder()).get(); + String maxKey = recordKeys.stream().max(Comparator.naturalOrder()).get(); - FileMetaData parquetMetadata = ParquetUtils.readMetadata(hadoopConf, filePath).getFileMetaData(); + FileMetaData parquetMetadata = ParquetUtils.readMetadata(hadoopConf, filePath).getFileMetaData(); - Map extraMetadata = parquetMetadata.getKeyValueMetaData(); + Map extraMetadata = parquetMetadata.getKeyValueMetaData(); - assertEquals(extraMetadata.get(HoodieBloomFilterWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER), minKey); - assertEquals(extraMetadata.get(HoodieBloomFilterWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER), maxKey); - assertEquals(extraMetadata.get(HoodieBloomFilterWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE), BloomFilterTypeCode.DYNAMIC_V0.name()); + assertEquals(extraMetadata.get(HoodieBloomFilterWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER), minKey); + 
assertEquals(extraMetadata.get(HoodieBloomFilterWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER), maxKey); + assertEquals(extraMetadata.get(HoodieBloomFilterWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE), BloomFilterTypeCode.DYNAMIC_V0.name()); - // Step 3: Make sure Bloom Filter contains all the record keys - BloomFilter bloomFilter = utils.readBloomFilterFromMetadata(hadoopConf, filePath); - recordKeys.forEach(recordKey -> { - assertTrue(bloomFilter.mightContain(recordKey)); - }); - } + // Step 3: Make sure Bloom Filter contains all the record keys + BloomFilter bloomFilter = utils.readBloomFilterFromMetadata(hadoopConf, filePath); + recordKeys.forEach(recordKey -> { + assertTrue(bloomFilter.mightContain(recordKey)); + }); + } - private static List toJson(List records) { - return records.stream().map(r -> { - try { - return new String(HoodieAvroUtils.avroToJson(r, true)); - } catch (IOException e) { - throw new RuntimeException(e); - } - }).collect(Collectors.toList()); - } + private static List toJson(List records) { + return records.stream().map(r -> { + try { + return new String(HoodieAvroUtils.avroToJson(r, true)); + } catch (IOException e) { + throw new RuntimeException(e); + } + }).collect(Collectors.toList()); + } + + @Test + public void testProperWritingBuiltinParquetBloomFilter() throws IOException { + + Configuration hadoopConf = new Configuration(); + String columnToAddBloom = "driver"; + hadoopConf.set("parquet.bloom.filter.enabled#" + columnToAddBloom, "true"); + + HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0xDEED); + List records = dataGen.generateGenericRecords(10); + + Schema schema = records.get(0).getSchema(); + HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), + schema, Option.empty()); + HoodieParquetConfig parquetConfig = + new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP, ParquetWriter.DEFAULT_BLOCK_SIZE, + ParquetWriter.DEFAULT_PAGE_SIZE, 1024 * 1024 * 1024, hadoopConf, 0.1, true); + Path filePath = new Path(tmpDir.resolve("test-builtin-bloom.parquet").toAbsolutePath().toString()); + try (HoodieAvroParquetWriter writer = + new HoodieAvroParquetWriter(filePath, parquetConfig, "001", new DummyTaskContextSupplier(), true)) { + for (GenericRecord record : records) { + writer.writeAvro((String) record.get("_row_key"), record); + } + } + + // inspired from https://github.com/apache/parquet-mr/pull/958/files + InputFile in = HadoopInputFile.fromPath(filePath, hadoopConf); + try (ParquetFileReader reader = ParquetFileReader.open(in)) { + for (BlockMetaData block : reader.getFooter().getBlocks()) { + Optional maybeColumnMeta = block.getColumns().stream() + .filter(c -> columnToAddBloom.equals(c.getPath().toDotString())).findFirst(); + BloomFilterReader bloomFilterReader = reader.getBloomFilterDataReader(block); + org.apache.parquet.column.values.bloomfilter.BloomFilter bloomFilter = bloomFilterReader.readBloomFilter(maybeColumnMeta.get()); + assertNotNull(bloomFilter); + } + } + } } From bfa81a44b1c0b9da52ba0addad476234f8480412 Mon Sep 17 00:00:00 2001 From: Nicolas Paris Date: Mon, 5 Jun 2023 15:52:08 +0200 Subject: [PATCH 07/10] Fix style --- .../avro/TestHoodieAvroParquetWriter.java | 186 +++++++++--------- 1 file changed, 94 insertions(+), 92 deletions(-) diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroParquetWriter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroParquetWriter.java index 
9d11c0984c3db..def7d0752a569 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroParquetWriter.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroParquetWriter.java @@ -22,6 +22,7 @@ import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; + import org.apache.hudi.DummyTaskContextSupplier; import org.apache.hudi.common.bloom.BloomFilter; import org.apache.hudi.common.bloom.BloomFilterFactory; @@ -31,6 +32,7 @@ import org.apache.hudi.common.util.ParquetUtils; import org.apache.hudi.io.storage.HoodieAvroParquetWriter; import org.apache.hudi.io.storage.HoodieParquetConfig; + import org.apache.parquet.avro.AvroSchemaConverter; import org.apache.parquet.hadoop.BloomFilterReader; import org.apache.parquet.hadoop.ParquetFileReader; @@ -55,108 +57,108 @@ public class TestHoodieAvroParquetWriter { - @TempDir - java.nio.file.Path tmpDir; - - @Test - public void testProperWriting() throws IOException { - Configuration hadoopConf = new Configuration(); - - HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0xDEED); - List records = dataGen.generateGenericRecords(10); - - Schema schema = records.get(0).getSchema(); - - BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.0001, 10000, - BloomFilterTypeCode.DYNAMIC_V0.name()); - HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), - schema, Option.of(filter)); - - HoodieParquetConfig parquetConfig = - new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP, ParquetWriter.DEFAULT_BLOCK_SIZE, - ParquetWriter.DEFAULT_PAGE_SIZE, 1024 * 1024 * 1024, hadoopConf, 0.1, true); - - Path filePath = new Path(tmpDir.resolve("test.parquet").toAbsolutePath().toString()); - - try (HoodieAvroParquetWriter writer = - new HoodieAvroParquetWriter(filePath, parquetConfig, "001", new DummyTaskContextSupplier(), true)) { - for (GenericRecord record : records) { - writer.writeAvro((String) record.get("_row_key"), record); - } - } - - ParquetUtils utils = new ParquetUtils(); - - // Step 1: Make sure records are written appropriately - List readRecords = utils.readAvroRecords(hadoopConf, filePath); + @TempDir + java.nio.file.Path tmpDir; - assertEquals(toJson(records), toJson(readRecords)); + @Test + public void testProperWriting() throws IOException { + Configuration hadoopConf = new Configuration(); - // Step 2: Assert Parquet metadata was written appropriately - List recordKeys = records.stream().map(r -> (String) r.get("_row_key")).collect(Collectors.toList()); + HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0xDEED); + List records = dataGen.generateGenericRecords(10); - String minKey = recordKeys.stream().min(Comparator.naturalOrder()).get(); - String maxKey = recordKeys.stream().max(Comparator.naturalOrder()).get(); + Schema schema = records.get(0).getSchema(); - FileMetaData parquetMetadata = ParquetUtils.readMetadata(hadoopConf, filePath).getFileMetaData(); + BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.0001, 10000, + BloomFilterTypeCode.DYNAMIC_V0.name()); + HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), + schema, Option.of(filter)); - Map extraMetadata = parquetMetadata.getKeyValueMetaData(); + HoodieParquetConfig parquetConfig = + new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP, ParquetWriter.DEFAULT_BLOCK_SIZE, + 
ParquetWriter.DEFAULT_PAGE_SIZE, 1024 * 1024 * 1024, hadoopConf, 0.1, true); - assertEquals(extraMetadata.get(HoodieBloomFilterWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER), minKey); - assertEquals(extraMetadata.get(HoodieBloomFilterWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER), maxKey); - assertEquals(extraMetadata.get(HoodieBloomFilterWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE), BloomFilterTypeCode.DYNAMIC_V0.name()); + Path filePath = new Path(tmpDir.resolve("test.parquet").toAbsolutePath().toString()); - // Step 3: Make sure Bloom Filter contains all the record keys - BloomFilter bloomFilter = utils.readBloomFilterFromMetadata(hadoopConf, filePath); - recordKeys.forEach(recordKey -> { - assertTrue(bloomFilter.mightContain(recordKey)); - }); + try (HoodieAvroParquetWriter writer = + new HoodieAvroParquetWriter(filePath, parquetConfig, "001", new DummyTaskContextSupplier(), true)) { + for (GenericRecord record : records) { + writer.writeAvro((String) record.get("_row_key"), record); + } } - private static List toJson(List records) { - return records.stream().map(r -> { - try { - return new String(HoodieAvroUtils.avroToJson(r, true)); - } catch (IOException e) { - throw new RuntimeException(e); - } - }).collect(Collectors.toList()); + ParquetUtils utils = new ParquetUtils(); + + // Step 1: Make sure records are written appropriately + List readRecords = utils.readAvroRecords(hadoopConf, filePath); + + assertEquals(toJson(records), toJson(readRecords)); + + // Step 2: Assert Parquet metadata was written appropriately + List recordKeys = records.stream().map(r -> (String) r.get("_row_key")).collect(Collectors.toList()); + + String minKey = recordKeys.stream().min(Comparator.naturalOrder()).get(); + String maxKey = recordKeys.stream().max(Comparator.naturalOrder()).get(); + + FileMetaData parquetMetadata = ParquetUtils.readMetadata(hadoopConf, filePath).getFileMetaData(); + + Map extraMetadata = parquetMetadata.getKeyValueMetaData(); + + assertEquals(extraMetadata.get(HoodieBloomFilterWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER), minKey); + assertEquals(extraMetadata.get(HoodieBloomFilterWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER), maxKey); + assertEquals(extraMetadata.get(HoodieBloomFilterWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE), BloomFilterTypeCode.DYNAMIC_V0.name()); + + // Step 3: Make sure Bloom Filter contains all the record keys + BloomFilter bloomFilter = utils.readBloomFilterFromMetadata(hadoopConf, filePath); + recordKeys.forEach(recordKey -> { + assertTrue(bloomFilter.mightContain(recordKey)); + }); + } + + private static List toJson(List records) { + return records.stream().map(r -> { + try { + return new String(HoodieAvroUtils.avroToJson(r, true)); + } catch (IOException e) { + throw new RuntimeException(e); + } + }).collect(Collectors.toList()); + } + + @Test + public void testProperWritingBuiltinParquetBloomFilter() throws IOException { + + Configuration hadoopConf = new Configuration(); + String columnToAddBloom = "driver"; + hadoopConf.set("parquet.bloom.filter.enabled#" + columnToAddBloom, "true"); + + HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0xDEED); + List records = dataGen.generateGenericRecords(10); + + Schema schema = records.get(0).getSchema(); + HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), + schema, Option.empty()); + HoodieParquetConfig parquetConfig = + new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP, ParquetWriter.DEFAULT_BLOCK_SIZE, + ParquetWriter.DEFAULT_PAGE_SIZE, 
1024 * 1024 * 1024, hadoopConf, 0.1, true); + Path filePath = new Path(tmpDir.resolve("test-builtin-bloom.parquet").toAbsolutePath().toString()); + try (HoodieAvroParquetWriter writer = + new HoodieAvroParquetWriter(filePath, parquetConfig, "001", new DummyTaskContextSupplier(), true)) { + for (GenericRecord record : records) { + writer.writeAvro((String) record.get("_row_key"), record); + } } - @Test - public void testProperWritingBuiltinParquetBloomFilter() throws IOException { - - Configuration hadoopConf = new Configuration(); - String columnToAddBloom = "driver"; - hadoopConf.set("parquet.bloom.filter.enabled#" + columnToAddBloom, "true"); - - HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0xDEED); - List records = dataGen.generateGenericRecords(10); - - Schema schema = records.get(0).getSchema(); - HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), - schema, Option.empty()); - HoodieParquetConfig parquetConfig = - new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP, ParquetWriter.DEFAULT_BLOCK_SIZE, - ParquetWriter.DEFAULT_PAGE_SIZE, 1024 * 1024 * 1024, hadoopConf, 0.1, true); - Path filePath = new Path(tmpDir.resolve("test-builtin-bloom.parquet").toAbsolutePath().toString()); - try (HoodieAvroParquetWriter writer = - new HoodieAvroParquetWriter(filePath, parquetConfig, "001", new DummyTaskContextSupplier(), true)) { - for (GenericRecord record : records) { - writer.writeAvro((String) record.get("_row_key"), record); - } - } - - // inspired from https://github.com/apache/parquet-mr/pull/958/files - InputFile in = HadoopInputFile.fromPath(filePath, hadoopConf); - try (ParquetFileReader reader = ParquetFileReader.open(in)) { - for (BlockMetaData block : reader.getFooter().getBlocks()) { - Optional maybeColumnMeta = block.getColumns().stream() - .filter(c -> columnToAddBloom.equals(c.getPath().toDotString())).findFirst(); - BloomFilterReader bloomFilterReader = reader.getBloomFilterDataReader(block); - org.apache.parquet.column.values.bloomfilter.BloomFilter bloomFilter = bloomFilterReader.readBloomFilter(maybeColumnMeta.get()); - assertNotNull(bloomFilter); - } - } + // inspired from https://github.com/apache/parquet-mr/pull/958/files + InputFile in = HadoopInputFile.fromPath(filePath, hadoopConf); + try (ParquetFileReader reader = ParquetFileReader.open(in)) { + for (BlockMetaData block : reader.getFooter().getBlocks()) { + Optional maybeColumnMeta = block.getColumns().stream() + .filter(c -> columnToAddBloom.equals(c.getPath().toDotString())).findFirst(); + BloomFilterReader bloomFilterReader = reader.getBloomFilterDataReader(block); + org.apache.parquet.column.values.bloomfilter.BloomFilter bloomFilter = bloomFilterReader.readBloomFilter(maybeColumnMeta.get()); + assertNotNull(bloomFilter); + } } + } } From d51e9b667a67809afc58e3500ba8823249fd9417 Mon Sep 17 00:00:00 2001 From: Nicolas Paris Date: Tue, 6 Jun 2023 21:16:50 +0200 Subject: [PATCH 08/10] Fix import static --- .../io/storage/HoodieBaseParquetWriter.java | 26 +++++++------------ 1 file changed, 10 insertions(+), 16 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java index 94ba193651831..a89edff6637aa 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java @@ -25,6 +25,7 @@ 
import org.apache.hudi.common.fs.HoodieWrapperFileSystem; import org.apache.hudi.common.util.VisibleForTesting; +import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.hadoop.ParquetFileWriter; import org.apache.parquet.hadoop.ParquetOutputFormat; import org.apache.parquet.hadoop.ParquetWriter; @@ -34,13 +35,6 @@ import java.io.IOException; import java.util.concurrent.atomic.AtomicLong; -import static org.apache.parquet.column.ParquetProperties.DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK; -import static org.apache.parquet.column.ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK; -import static org.apache.parquet.hadoop.ParquetOutputFormat.BLOOM_FILTER_ENABLED; -import static org.apache.parquet.hadoop.ParquetOutputFormat.BLOOM_FILTER_EXPECTED_NDV; -import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED; -import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_WRITER_VERSION; - /** * Base class of Hudi's custom {@link ParquetWriter} implementations * @@ -74,8 +68,8 @@ protected WriteSupport getWriteSupport(Configuration conf) { parquetWriterbuilder.withPageSize(parquetConfig.getPageSize()); parquetWriterbuilder.withDictionaryPageSize(parquetConfig.getPageSize()); parquetWriterbuilder.withDictionaryEncoding(parquetConfig.dictionaryEnabled()); - parquetWriterbuilder.withValidation(DEFAULT_IS_VALIDATING_ENABLED); - parquetWriterbuilder.withWriterVersion(DEFAULT_WRITER_VERSION); + parquetWriterbuilder.withValidation(ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED); + parquetWriterbuilder.withWriterVersion(ParquetWriter.DEFAULT_WRITER_VERSION); parquetWriterbuilder.withConf(FSUtils.registerFileSystem(file, parquetConfig.getHadoopConf())); handleParquetBloomFilters(parquetWriterbuilder, parquetConfig.getHadoopConf()); @@ -86,7 +80,7 @@ protected WriteSupport getWriteSupport(Configuration conf) { // stream and the actual file size reported by HDFS this.maxFileSize = parquetConfig.getMaxFileSize() + Math.round(parquetConfig.getMaxFileSize() * parquetConfig.getCompressionRatio()); - this.recordCountForNextSizeCheck = DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK; + this.recordCountForNextSizeCheck = ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK; } protected void handleParquetBloomFilters(ParquetWriter.Builder parquetWriterbuilder, Configuration hadoopConf) { @@ -94,12 +88,12 @@ protected void handleParquetBloomFilters(ParquetWriter.Builder parquetWriterbuil // inspired from https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java#L458-L464 hadoopConf.forEach(conf -> { String key = conf.getKey(); - if (key.startsWith(BLOOM_FILTER_ENABLED)) { - String column = key.substring(BLOOM_FILTER_ENABLED.length() + 1, key.length()); + if (key.startsWith(ParquetOutputFormat.BLOOM_FILTER_ENABLED)) { + String column = key.substring(ParquetOutputFormat.BLOOM_FILTER_ENABLED.length() + 1, key.length()); parquetWriterbuilder.withBloomFilterEnabled(column, Boolean.valueOf(conf.getValue())); } - if (key.startsWith(BLOOM_FILTER_EXPECTED_NDV)) { - String column = key.substring(BLOOM_FILTER_EXPECTED_NDV.length() + 1, key.length()); + if (key.startsWith(ParquetOutputFormat.BLOOM_FILTER_EXPECTED_NDV)) { + String column = key.substring(ParquetOutputFormat.BLOOM_FILTER_EXPECTED_NDV.length() + 1, key.length()); parquetWriterbuilder.withBloomFilterNDV(column, Long.valueOf(conf.getValue(), -1)); } }); @@ -120,8 +114,8 @@ public boolean canWrite() { } recordCountForNextSizeCheck = writtenCount + 
Math.min( // Do check it in the halfway - Math.max(DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK, (maxFileSize / avgRecordSize - writtenCount) / 2), - DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK); + Math.max(ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK, (maxFileSize / avgRecordSize - writtenCount) / 2), + ParquetProperties.DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK); } return true; } From 0c2ed7d288afa987312612c165d2de5fa6ad4a5f Mon Sep 17 00:00:00 2001 From: Nicolas Paris Date: Tue, 6 Jun 2023 23:01:00 +0200 Subject: [PATCH 09/10] Use reflexion to handle multiple parquet versions --- .../io/storage/HoodieBaseParquetWriter.java | 29 ++++++++++++++----- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java index a89edff6637aa..2e98a5e1bfd14 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java @@ -27,14 +27,16 @@ import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.hadoop.ParquetFileWriter; -import org.apache.parquet.hadoop.ParquetOutputFormat; import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.api.WriteSupport; import java.io.Closeable; import java.io.IOException; +import java.lang.reflect.InvocationTargetException; import java.util.concurrent.atomic.AtomicLong; +import java.lang.reflect.Method; + /** * Base class of Hudi's custom {@link ParquetWriter} implementations * @@ -47,6 +49,8 @@ public abstract class HoodieBaseParquetWriter implements Closeable { private final long maxFileSize; private long recordCountForNextSizeCheck; private final ParquetWriter parquetWriter; + public static final String BLOOM_FILTER_EXPECTED_NDV = "parquet.bloom.filter.expected.ndv"; + public static final String BLOOM_FILTER_ENABLED = "parquet.bloom.filter.enabled"; public HoodieBaseParquetWriter(Path file, HoodieParquetConfig> parquetConfig) throws IOException { @@ -84,17 +88,26 @@ protected WriteSupport getWriteSupport(Configuration conf) { } protected void handleParquetBloomFilters(ParquetWriter.Builder parquetWriterbuilder, Configuration hadoopConf) { - parquetWriterbuilder.withBloomFilterEnabled(ParquetOutputFormat.getBloomFilterEnabled(hadoopConf)); // inspired from https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java#L458-L464 hadoopConf.forEach(conf -> { String key = conf.getKey(); - if (key.startsWith(ParquetOutputFormat.BLOOM_FILTER_ENABLED)) { - String column = key.substring(ParquetOutputFormat.BLOOM_FILTER_ENABLED.length() + 1, key.length()); - parquetWriterbuilder.withBloomFilterEnabled(column, Boolean.valueOf(conf.getValue())); + if (key.startsWith(BLOOM_FILTER_ENABLED)) { + String column = key.substring(BLOOM_FILTER_ENABLED.length() + 1, key.length()); + try { + Method method = parquetWriterbuilder.getClass().getDeclaredMethod("withBloomFilterEnabled"); + method.invoke(column, Boolean.valueOf(conf.getValue())); + } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { + // skip + } } - if (key.startsWith(ParquetOutputFormat.BLOOM_FILTER_EXPECTED_NDV)) { - String column = key.substring(ParquetOutputFormat.BLOOM_FILTER_EXPECTED_NDV.length() + 1, key.length()); - parquetWriterbuilder.withBloomFilterNDV(column, Long.valueOf(conf.getValue(), -1)); 
+ if (key.startsWith(BLOOM_FILTER_EXPECTED_NDV)) { + String column = key.substring(BLOOM_FILTER_EXPECTED_NDV.length() + 1, key.length()); + try { + Method method = parquetWriterbuilder.getClass().getDeclaredMethod("withBloomFilterNDV"); + method.invoke(column, Long.valueOf(conf.getValue(), -1)); + } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { + // skip + } } }); } From 1351072759972efee72a28363c55f4332be18db8 Mon Sep 17 00:00:00 2001 From: Nicolas Paris Date: Thu, 22 Jun 2023 21:36:45 +0200 Subject: [PATCH 10/10] Adapt test for both spark2 and 3 --- .../scala/org/apache/hudi/TestHoodieParquetBloom.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieParquetBloom.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieParquetBloom.scala index 2d816b219372c..5ef8b1276f9e8 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieParquetBloom.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieParquetBloom.scala @@ -84,14 +84,18 @@ class TestHoodieParquetBloomFilter { val accu = new NumRowGroupsAcc spark.sparkContext.register(accu) - // this one shall skip partition scanning thanks to bloom + // this one shall skip partition scanning thanks to bloom when spark >=3 spark.read.format("hudi").load(basePath).filter("bloom_col = '3'").foreachPartition((it: Iterator[Row]) => it.foreach(_ => accu.add(0))) - assertEquals(0, accu.value) + assertEquals(if (currentSparkSupportParquetBloom()) 0 else 1, accu.value) // this one will trigger one partition scan spark.read.format("hudi").load(basePath).filter("bloom_col = '2'").foreachPartition((it: Iterator[Row]) => it.foreach(_ => accu.add(0))) assertEquals(1, accu.value) } + + def currentSparkSupportParquetBloom(): Boolean = { + Integer.valueOf(spark.version.charAt(0)) >= 3 + } } class NumRowGroupsAcc extends AccumulatorV2[Integer, Integer] {