
package org.apache.hudi.io.storage;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.hudi.common.util.VisibleForTesting;

import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.api.WriteSupport;

import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.atomic.AtomicLong;

/**
* Base class of Hudi's custom {@link ParquetWriter} implementations
*
* @param <R> target type of the object being written into Parquet files (for ex,
 *            {@code IndexedRecord}, {@code InternalRow})
 */
public abstract class HoodieBaseParquetWriter<R> implements Closeable {

private final AtomicLong writtenRecordCount = new AtomicLong(0);
private final long maxFileSize;
private long recordCountForNextSizeCheck;
private final ParquetWriter parquetWriter;
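  // Per-column bloom filter settings follow the ParquetOutputFormat convention of suffixing the
  // column name after a '#', e.g. (column name purely illustrative):
  //   parquet.bloom.filter.enabled#user_id = true
  //   parquet.bloom.filter.expected.ndv#user_id = 100000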
public static final String BLOOM_FILTER_EXPECTED_NDV = "parquet.bloom.filter.expected.ndv";
public static final String BLOOM_FILTER_ENABLED = "parquet.bloom.filter.enabled";

public HoodieBaseParquetWriter(Path file,
HoodieParquetConfig<? extends WriteSupport<R>> parquetConfig) throws IOException {
ParquetWriter.Builder parquetWriterbuilder = new ParquetWriter.Builder(HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf())) {
@Override
protected ParquetWriter.Builder self() {
Contributor: Can you show me the code for how Spark adapts to the Parquet SSBF support?

Contributor Author (@parisni): Sorry, I have not fully understood what you want. What is Parquet SSBF? Do you mean how Spark itself handles Parquet bloom filters?

Contributor: Yes, can you show me that code snippet?

Contributor Author (@parisni, May 17, 2023): Please see my notes for the lineage on Spark: it basically uses another Parquet class to write (namely ParquetOutputFormat), which reads the Hadoop conf directly, while Hudi uses ParquetWriter directly.
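For illustration, a rough sketch of what that Spark-side path looks like from the user's perspective, assuming Spark 3 with parquet-mr 1.12+: the per-column properties are set on the Hadoop configuration and ParquetOutputFormat applies them during a plain parquet write. The column name and output path below are made up for the example.

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("bloom-example").master("local[1]").getOrCreate()
    // per-column bloom filter properties, read by ParquetOutputFormat on the write path
    spark.sparkContext.hadoopConfiguration.set("parquet.bloom.filter.enabled#user_id", "true")
    spark.sparkContext.hadoopConfiguration.set("parquet.bloom.filter.expected.ndv#user_id", "100000")

    spark.range(1000)
      .selectExpr("cast(id as string) as user_id")
      .write.mode("overwrite").parquet("/tmp/hoodie_bloom_example")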

Contributor: Thanks. Another question: can Delta also gain benefits from these bloom filters, and how much of a regression does the write path see when enabling the bloom filter for Parquet?

Contributor Author (@parisni, May 18, 2023): By "Delta" do you mean MOR log files configured in Parquet format? If you mean Delta Lake, then yes; like Iceberg, it most likely relies on the Spark writer and so benefits from blooms.

By regression, do you mean performance regression? Each column configured with a bloom filter introduces overhead at write time, but subsequent reads with predicates on that column are faster. I haven't benchmarked it, but I can say blooms speed up reads significantly by skipping a lot of Parquet scans that the Hudi stats index won't cover, e.g. UUIDs, strings, and high-cardinality dictionaries.

BTW, in this PR it is up to the user to enable blooms, so no regression is expected.

return this;
}

@Override
protected WriteSupport getWriteSupport(Configuration conf) {
return parquetConfig.getWriteSupport();
}
};

parquetWriterbuilder.withWriteMode(ParquetFileWriter.Mode.CREATE);
parquetWriterbuilder.withCompressionCodec(parquetConfig.getCompressionCodecName());
parquetWriterbuilder.withRowGroupSize(parquetConfig.getBlockSize());
parquetWriterbuilder.withPageSize(parquetConfig.getPageSize());
parquetWriterbuilder.withDictionaryPageSize(parquetConfig.getPageSize());
parquetWriterbuilder.withDictionaryEncoding(parquetConfig.dictionaryEnabled());
parquetWriterbuilder.withValidation(ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED);
parquetWriterbuilder.withWriterVersion(ParquetWriter.DEFAULT_WRITER_VERSION);
parquetWriterbuilder.withConf(FSUtils.registerFileSystem(file, parquetConfig.getHadoopConf()));
handleParquetBloomFilters(parquetWriterbuilder, parquetConfig.getHadoopConf());

parquetWriter = parquetWriterbuilder.build();
// We cannot accurately measure the snappy compressed output file size. We are choosing a
// conservative 10%
// TODO - compute this compression ratio dynamically by looking at the bytes written to the
// stream and the actual file size reported by HDFS
this.maxFileSize = parquetConfig.getMaxFileSize()
+ Math.round(parquetConfig.getMaxFileSize() * parquetConfig.getCompressionRatio());
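    // e.g. with an (illustrative) 120 MB max file size and a 0.1 compression ratio, the
    // writer-reported data size may grow to roughly 132 MB before canWrite() turns false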
    this.recordCountForNextSizeCheck = ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK;
}

  protected void handleParquetBloomFilters(ParquetWriter.Builder parquetWriterbuilder, Configuration hadoopConf) {
    // inspired by https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java#L458-L464
    hadoopConf.forEach(conf -> {
      String key = conf.getKey();
      if (key.startsWith(BLOOM_FILTER_ENABLED)) {
        String column = key.substring(BLOOM_FILTER_ENABLED.length() + 1, key.length());
        try {
          Method method = parquetWriterbuilder.getClass().getMethod("withBloomFilterEnabled", String.class, boolean.class);
          method.invoke(parquetWriterbuilder, column, Boolean.valueOf(conf.getValue()).booleanValue());
        } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
          // skip; the builder may not expose bloom filter options (older parquet-mr versions)
        }
      }
      if (key.startsWith(BLOOM_FILTER_EXPECTED_NDV)) {
        String column = key.substring(BLOOM_FILTER_EXPECTED_NDV.length() + 1, key.length());
        try {
          Method method = parquetWriterbuilder.getClass().getMethod("withBloomFilterNDV", String.class, long.class);
          method.invoke(parquetWriterbuilder, column, Long.valueOf(conf.getValue()).longValue());
        } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
          // skip; the builder may not expose bloom filter options (older parquet-mr versions)
        }
      }
    });
  }

  public boolean canWrite() {
    long writtenCount = getWrittenRecordCount();
    if (writtenCount >= recordCountForNextSizeCheck) {
      long dataSize = getDataSize();
      // In some extreme cases (e.g. all records carrying the same value) the data size can be far
      // smaller than the written record count, which would drive the average record size to 0;
      // force it to at least 1 in that case.
      long avgRecordSize = Math.max(dataSize / writtenCount, 1);
      // Follow the parquet block size check logic here: stop writing once we are
      // within ~2 records of the size limit
      if (dataSize > (maxFileSize - avgRecordSize * 2)) {
        return false;
      }
      recordCountForNextSizeCheck = writtenCount + Math.min(
          // Do check it in the halfway
          Math.max(ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK, (maxFileSize / avgRecordSize - writtenCount) / 2),
          ParquetProperties.DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK);
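      // Example with illustrative numbers: maxFileSize ~= 132 MB, avgRecordSize = 1 KB and
      // writtenCount = 10,000 project a capacity of ~135,000 records, so "halfway" is ~62,500
      // records ahead, which the parquet default bounds clamp down to 10,000 records ahead.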
}
return true;
}

  public long getDataSize() {
    return this.parquetWriter.getDataSize();
  }

public void write(R object) throws IOException {
    this.parquetWriter.write(object);
writtenRecordCount.incrementAndGet();
}

  protected long getWrittenRecordCount() {
    return writtenRecordCount.get();
  }

protected long getRecordCountForNextSizeCheck() {
return recordCountForNextSizeCheck;
}

@Override
public void close() throws IOException {
this.parquetWriter.close();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.common.model.{HoodieTableType, WriteOperationType}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.testutils.HoodieClientTestUtils.getSparkConfForTest

import org.apache.spark.SparkContext
import org.apache.spark.sql._
import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
import org.apache.spark.util.AccumulatorV2

import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.EnumSource

class TestHoodieParquetBloomFilter {

var spark: SparkSession = _
var sqlContext: SQLContext = _
var sc: SparkContext = _

def initSparkContext(): Unit = {
val sparkConf = getSparkConfForTest(getClass.getSimpleName)

spark = SparkSession.builder()
.withExtensions(new HoodieSparkSessionExtension)
.config(sparkConf)
.getOrCreate()

sc = spark.sparkContext
sc.setLogLevel("ERROR")
sqlContext = spark.sqlContext
}

@BeforeEach
  def setUp(): Unit = {
initSparkContext()
}

@ParameterizedTest
@EnumSource(value = classOf[WriteOperationType], names = Array("BULK_INSERT", "INSERT", "UPSERT", "INSERT_OVERWRITE"))
def testBloomFilter(operation: WriteOperationType): Unit = {
// setup hadoop conf with bloom col enabled
spark.sparkContext.hadoopConfiguration.set("parquet.bloom.filter.enabled#bloom_col", "true")
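    // optionally also size the filter by setting the expected NDV, e.g. (illustrative value):
    // spark.sparkContext.hadoopConfiguration.set("parquet.bloom.filter.expected.ndv#bloom_col", "100")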

val basePath = java.nio.file.Files.createTempDirectory("hoodie_bloom_source_path").toAbsolutePath.toString
val opts = Map(
HoodieWriteConfig.TBL_NAME.key -> "hoodie_bloom",
DataSourceWriteOptions.TABLE_TYPE.key -> HoodieTableType.COPY_ON_WRITE.toString,
DataSourceWriteOptions.OPERATION.key -> operation.toString,
DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition"
)
val inputDF = spark.sql(
"""select '0' as _row_key, '1' as bloom_col, '2' as partition, '3' as ts
|union
|select '1', '2', '3', '4'
|""".stripMargin)
inputDF.write.format("hudi")
.options(opts)
.mode(SaveMode.Overwrite)
.save(basePath)

val accu = new NumRowGroupsAcc
spark.sparkContext.register(accu)

    // no row has bloom_col = '3', so this one shall skip scanning entirely thanks to the bloom filter when spark >= 3
spark.read.format("hudi").load(basePath).filter("bloom_col = '3'").foreachPartition((it: Iterator[Row]) => it.foreach(_ => accu.add(0)))
assertEquals(if (currentSparkSupportParquetBloom()) 0 else 1, accu.value)

// this one will trigger one partition scan
spark.read.format("hudi").load(basePath).filter("bloom_col = '2'").foreachPartition((it: Iterator[Row]) => it.foreach(_ => accu.add(0)))
assertEquals(1, accu.value)
}

def currentSparkSupportParquetBloom(): Boolean = {
    spark.version.charAt(0).asDigit >= 3
}
}

class NumRowGroupsAcc extends AccumulatorV2[Integer, Integer] {
private var _sum = 0

override def isZero: Boolean = _sum == 0

override def copy(): AccumulatorV2[Integer, Integer] = {
val acc = new NumRowGroupsAcc()
acc._sum = _sum
acc
}

override def reset(): Unit = _sum = 0

override def add(v: Integer): Unit = _sum += v

override def merge(other: AccumulatorV2[Integer, Integer]): Unit = other match {
case a: NumRowGroupsAcc => _sum += a._sum
case _ => throw new UnsupportedOperationException(
s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
}

override def value: Integer = _sum
}