diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java
index a82c26bae9219..2e98a5e1bfd14 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java
@@ -18,53 +18,98 @@
 
 package org.apache.hudi.io.storage;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
 import org.apache.hudi.common.util.VisibleForTesting;
+
+import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.hadoop.ParquetFileWriter;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.api.WriteSupport;
 
+import java.io.Closeable;
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
 import java.util.concurrent.atomic.AtomicLong;
 
-import static org.apache.parquet.column.ParquetProperties.DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK;
-import static org.apache.parquet.column.ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK;
+import java.lang.reflect.Method;
 
 /**
  * Base class of Hudi's custom {@link ParquetWriter} implementations
  *
  * @param <R> target type of the object being written into Parquet files (for ex,
- *           {@code IndexedRecord}, {@code InternalRow})
+ *            {@code IndexedRecord}, {@code InternalRow})
  */
-public abstract class HoodieBaseParquetWriter<R> extends ParquetWriter<R> {
+public abstract class HoodieBaseParquetWriter<R> implements Closeable {
 
   private final AtomicLong writtenRecordCount = new AtomicLong(0);
   private final long maxFileSize;
   private long recordCountForNextSizeCheck;
+  private final ParquetWriter<R> parquetWriter;
+
+  public static final String BLOOM_FILTER_EXPECTED_NDV = "parquet.bloom.filter.expected.ndv";
+  public static final String BLOOM_FILTER_ENABLED = "parquet.bloom.filter.enabled";
 
   public HoodieBaseParquetWriter(Path file,
                                  HoodieParquetConfig<? extends WriteSupport<R>> parquetConfig) throws IOException {
-    super(HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf()),
-        ParquetFileWriter.Mode.CREATE,
-        parquetConfig.getWriteSupport(),
-        parquetConfig.getCompressionCodecName(),
-        parquetConfig.getBlockSize(),
-        parquetConfig.getPageSize(),
-        parquetConfig.getPageSize(),
-        parquetConfig.dictionaryEnabled(),
-        DEFAULT_IS_VALIDATING_ENABLED,
-        DEFAULT_WRITER_VERSION,
-        FSUtils.registerFileSystem(file, parquetConfig.getHadoopConf()));
+    ParquetWriter.Builder parquetWriterbuilder = new ParquetWriter.Builder(HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf())) {
+      @Override
+      protected ParquetWriter.Builder self() {
+        return this;
+      }
+
+      @Override
+      protected WriteSupport getWriteSupport(Configuration conf) {
+        return parquetConfig.getWriteSupport();
+      }
+    };
+
+    parquetWriterbuilder.withWriteMode(ParquetFileWriter.Mode.CREATE);
+    parquetWriterbuilder.withCompressionCodec(parquetConfig.getCompressionCodecName());
+    parquetWriterbuilder.withRowGroupSize(parquetConfig.getBlockSize());
+    parquetWriterbuilder.withPageSize(parquetConfig.getPageSize());
+    parquetWriterbuilder.withDictionaryPageSize(parquetConfig.getPageSize());
+    parquetWriterbuilder.withDictionaryEncoding(parquetConfig.dictionaryEnabled());
+    parquetWriterbuilder.withValidation(ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED);
+    parquetWriterbuilder.withWriterVersion(ParquetWriter.DEFAULT_WRITER_VERSION);
+    parquetWriterbuilder.withConf(FSUtils.registerFileSystem(file, parquetConfig.getHadoopConf()));
+    handleParquetBloomFilters(parquetWriterbuilder, parquetConfig.getHadoopConf());
+    parquetWriter = parquetWriterbuilder.build();
     // We cannot accurately measure the snappy compressed output file size. We are choosing a
     // conservative 10%
     // TODO - compute this compression ratio dynamically by looking at the bytes written to the
     // stream and the actual file size reported by HDFS
     this.maxFileSize = parquetConfig.getMaxFileSize()
         + Math.round(parquetConfig.getMaxFileSize() * parquetConfig.getCompressionRatio());
-    this.recordCountForNextSizeCheck = DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK;
+    this.recordCountForNextSizeCheck = ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK;
+  }
+
+  protected void handleParquetBloomFilters(ParquetWriter.Builder parquetWriterbuilder, Configuration hadoopConf) {
+    // inspired from https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java#L458-L464
+    hadoopConf.forEach(conf -> {
+      String key = conf.getKey();
+      if (key.startsWith(BLOOM_FILTER_ENABLED)) {
+        String column = key.substring(BLOOM_FILTER_ENABLED.length() + 1, key.length());
+        try {
+          Method method = parquetWriterbuilder.getClass().getMethod("withBloomFilterEnabled", String.class, boolean.class);
+          method.invoke(parquetWriterbuilder, column, Boolean.valueOf(conf.getValue()).booleanValue());
+        } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
+          // skip
+        }
+      }
+      if (key.startsWith(BLOOM_FILTER_EXPECTED_NDV)) {
+        String column = key.substring(BLOOM_FILTER_EXPECTED_NDV.length() + 1, key.length());
+        try {
+          Method method = parquetWriterbuilder.getClass().getMethod("withBloomFilterNDV", String.class, long.class);
+          method.invoke(parquetWriterbuilder, column, Long.valueOf(conf.getValue()).longValue());
+        } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
+          // skip
+        }
+      }
+    });
   }
 
   public boolean canWrite() {
@@ -82,15 +127,18 @@ public boolean canWrite() {
     }
     recordCountForNextSizeCheck = writtenCount + Math.min(
         // Do check it in the halfway
-        Math.max(DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK, (maxFileSize / avgRecordSize - writtenCount) / 2),
-        DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK);
+        Math.max(ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK, (maxFileSize / avgRecordSize - writtenCount) / 2),
+        ParquetProperties.DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK);
     }
     return true;
   }
 
-  @Override
+  public long getDataSize() {
+    return this.parquetWriter.getDataSize();
+  }
+
   public void write(R object) throws IOException {
-    super.write(object);
+    this.parquetWriter.write(object);
     writtenRecordCount.incrementAndGet();
   }
 
@@ -102,4 +150,9 @@ protected long getWrittenRecordCount() {
   }
 
   protected long getRecordCountForNextSizeCheck() {
     return recordCountForNextSizeCheck;
   }
+
+  @Override
+  public void close() throws IOException {
+    this.parquetWriter.close();
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieParquetBloom.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieParquetBloom.scala
new file mode 100644
index 0000000000000..5ef8b1276f9e8
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieParquetBloom.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
+import org.apache.spark.util.{AccumulatorV2}
+import org.apache.spark.SparkContext
+
+import org.apache.hudi.testutils.HoodieClientTestUtils.getSparkConfForTest
+import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.common.model.{HoodieTableType, WriteOperationType}
+
+
+import org.junit.jupiter.api.Assertions.{assertEquals}
+import org.junit.jupiter.api.{BeforeEach}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.{EnumSource}
+
+class TestHoodieParquetBloomFilter {
+
+  var spark: SparkSession = _
+  var sqlContext: SQLContext = _
+  var sc: SparkContext = _
+
+  def initSparkContext(): Unit = {
+    val sparkConf = getSparkConfForTest(getClass.getSimpleName)
+
+    spark = SparkSession.builder()
+      .withExtensions(new HoodieSparkSessionExtension)
+      .config(sparkConf)
+      .getOrCreate()
+
+    sc = spark.sparkContext
+    sc.setLogLevel("ERROR")
+    sqlContext = spark.sqlContext
+  }
+
+  @BeforeEach
+  def setUp() {
+    initSparkContext()
+  }
+
+  @ParameterizedTest
+  @EnumSource(value = classOf[WriteOperationType], names = Array("BULK_INSERT", "INSERT", "UPSERT", "INSERT_OVERWRITE"))
+  def testBloomFilter(operation: WriteOperationType): Unit = {
+    // set up the hadoop conf with a bloom filter enabled on bloom_col
+    spark.sparkContext.hadoopConfiguration.set("parquet.bloom.filter.enabled#bloom_col", "true")
+
+    val basePath = java.nio.file.Files.createTempDirectory("hoodie_bloom_source_path").toAbsolutePath.toString
+    val opts = Map(
+      HoodieWriteConfig.TBL_NAME.key -> "hoodie_bloom",
+      DataSourceWriteOptions.TABLE_TYPE.key -> HoodieTableType.COPY_ON_WRITE.toString,
+      DataSourceWriteOptions.OPERATION.key -> operation.toString,
+      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition"
+    )
+    val inputDF = spark.sql(
+      """select '0' as _row_key, '1' as bloom_col, '2' as partition, '3' as ts
+        |union
+        |select '1', '2', '3', '4'
+        |""".stripMargin)
+    inputDF.write.format("hudi")
+      .options(opts)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    val accu = new NumRowGroupsAcc
+    spark.sparkContext.register(accu)
+
+    // no matching rows: the row group shall be skipped thanks to the bloom filter when spark >= 3
+    spark.read.format("hudi").load(basePath).filter("bloom_col = '3'").foreachPartition((it: Iterator[Row]) => it.foreach(_ => accu.add(0)))
+    assertEquals(if (currentSparkSupportParquetBloom()) 0 else 1, accu.value)
+
+    // matching rows: this one will read one row group
+    spark.read.format("hudi").load(basePath).filter("bloom_col = '2'").foreachPartition((it: Iterator[Row]) => it.foreach(_ => accu.add(0)))
+    assertEquals(1, accu.value)
+  }
+
+  def currentSparkSupportParquetBloom(): Boolean = {
+    Integer.valueOf(spark.version.charAt(0).toString) >= 3
+  }
+}
+
+class NumRowGroupsAcc extends AccumulatorV2[Integer, Integer] {
+  private var _sum = 0
+
+  override def isZero: Boolean = _sum == 0
+
+  override def copy(): AccumulatorV2[Integer, Integer] = {
+    val acc = new NumRowGroupsAcc()
+    acc._sum = _sum
+    acc
+  }
+
+  override def reset(): Unit = _sum = 0
+
+  override def add(v: Integer): Unit = _sum += v
+
+  override def merge(other: AccumulatorV2[Integer, Integer]): Unit = other match {
+    case a: NumRowGroupsAcc => _sum += a._sum
+    case _ => throw new UnsupportedOperationException(
+      s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
+  }
+
+  override def value: Integer = _sum
+}
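
A minimal usage sketch (not part of the patch) of how a Spark job would opt in to the bloom filter pass-through, mirroring the Hadoop configuration keys exercised in the test above; the object name, table name, output path, and the `bloom_col` column are illustrative only.

import org.apache.spark.sql.{SaveMode, SparkSession}

object HoodieParquetBloomExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("hoodie-parquet-bloom-example")
      .master("local[2]")
      .getOrCreate()

    // The writer picks these keys up from the Hadoop configuration; "#<column>" selects the column.
    spark.sparkContext.hadoopConfiguration.set("parquet.bloom.filter.enabled#bloom_col", "true")
    spark.sparkContext.hadoopConfiguration.set("parquet.bloom.filter.expected.ndv#bloom_col", "1000")

    val df = spark.sql("select '0' as _row_key, '1' as bloom_col, '2' as partition, '3' as ts")

    df.write.format("hudi")
      .option("hoodie.table.name", "hoodie_bloom_example")            // illustrative table name
      .option("hoodie.datasource.write.recordkey.field", "_row_key")
      .option("hoodie.datasource.write.partitionpath.field", "partition")
      .option("hoodie.datasource.write.precombine.field", "ts")
      .mode(SaveMode.Overwrite)
      .save("/tmp/hoodie_bloom_example")                              // illustrative path

    spark.stop()
  }
}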