diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java
index 2e98a5e1bfd14..34736e5b4d260 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java
@@ -87,6 +87,12 @@ protected WriteSupport getWriteSupport(Configuration conf) {
     this.recordCountForNextSizeCheck = ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK;
   }
 
+  /**
+   * Once all engines are on parquet version >= 1.12 we can clean up the reflection hack.
+   *
+   * @param parquetWriterbuilder
+   * @param hadoopConf
+   */
   protected void handleParquetBloomFilters(ParquetWriter.Builder parquetWriterbuilder, Configuration hadoopConf) {
     // inspired from https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java#L458-L464
     hadoopConf.forEach(conf -> {
@@ -94,8 +100,8 @@ protected void handleParquetBloomFilters(ParquetWriter.Builder parquetWriterbuil
       if (key.startsWith(BLOOM_FILTER_ENABLED)) {
         String column = key.substring(BLOOM_FILTER_ENABLED.length() + 1, key.length());
         try {
-          Method method = parquetWriterbuilder.getClass().getDeclaredMethod("withBloomFilterEnabled");
-          method.invoke(column, Boolean.valueOf(conf.getValue()));
+          Method method = parquetWriterbuilder.getClass().getMethod("withBloomFilterEnabled", String.class, boolean.class);
+          method.invoke(parquetWriterbuilder, column, Boolean.valueOf(conf.getValue()).booleanValue());
         } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
           // skip
         }
@@ -103,8 +109,8 @@ protected void handleParquetBloomFilters(ParquetWriter.Builder parquetWriterbuil
       if (key.startsWith(BLOOM_FILTER_EXPECTED_NDV)) {
         String column = key.substring(BLOOM_FILTER_EXPECTED_NDV.length() + 1, key.length());
         try {
-          Method method = parquetWriterbuilder.getClass().getDeclaredMethod("withBloomFilterNDV");
-          method.invoke(column, Long.valueOf(conf.getValue(), -1));
+          Method method = parquetWriterbuilder.getClass().getMethod("withBloomFilterNDV", String.class, long.class);
+          method.invoke(parquetWriterbuilder, column, Long.valueOf(conf.getValue()).longValue());
         } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
           // skip
         }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieParquetBloom.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieParquetBloom.scala
index 5ef8b1276f9e8..2e5e30362bb92 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieParquetBloom.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieParquetBloom.scala
@@ -62,6 +62,10 @@ class TestHoodieParquetBloomFilter {
   def testBloomFilter(operation: WriteOperationType): Unit = {
     // setup hadoop conf with bloom col enabled
     spark.sparkContext.hadoopConfiguration.set("parquet.bloom.filter.enabled#bloom_col", "true")
+    spark.sparkContext.hadoopConfiguration.set("parquet.bloom.filter.expected.ndv#bloom_col", "2")
+    // ensure nothing but bloom can trigger read skip
+    spark.sql("set parquet.filter.columnindex.enabled=false")
+    spark.sql("set parquet.filter.stats.enabled=false")
     val basePath = java.nio.file.Files.createTempDirectory("hoodie_bloom_source_path").toAbsolutePath.toString
     val opts = Map(
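
For context (not part of the patch): a minimal sketch of what handleParquetBloomFilters could collapse to once every engine bundles parquet >= 1.12 and the reflection indirection can be dropped, as the Javadoc above notes. The class name BloomFilterConfigSketch, the helper name applyBloomFilterConfig, and the constant values are illustrative assumptions; the builder methods withBloomFilterEnabled(String, boolean) and withBloomFilterNDV(String, long) are the same ones the patch resolves via reflection.

import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.ParquetWriter;

public class BloomFilterConfigSketch {

  // Key prefixes mirroring the config keys exercised in TestHoodieParquetBloomFilter,
  // e.g. "parquet.bloom.filter.enabled#bloom_col".
  private static final String BLOOM_FILTER_ENABLED = "parquet.bloom.filter.enabled";
  private static final String BLOOM_FILTER_EXPECTED_NDV = "parquet.bloom.filter.expected.ndv";

  /**
   * Direct-call equivalent of the reflection hack: copy per-column bloom filter
   * settings from the Hadoop conf onto the Parquet writer builder.
   */
  static void applyBloomFilterConfig(ParquetWriter.Builder<?, ?> builder, Configuration hadoopConf) {
    for (Map.Entry<String, String> conf : hadoopConf) {
      String key = conf.getKey();
      if (key.startsWith(BLOOM_FILTER_ENABLED + "#")) {
        String column = key.substring(BLOOM_FILTER_ENABLED.length() + 1);
        builder.withBloomFilterEnabled(column, Boolean.parseBoolean(conf.getValue()));
      } else if (key.startsWith(BLOOM_FILTER_EXPECTED_NDV + "#")) {
        String column = key.substring(BLOOM_FILTER_EXPECTED_NDV.length() + 1);
        builder.withBloomFilterNDV(column, Long.parseLong(conf.getValue()));
      }
    }
  }
}

With direct calls the NoSuchMethodException guard becomes unnecessary; until then, the try/catch in the patch keeps the writer working against older parquet versions that lack these builder methods.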