5 changes: 5 additions & 0 deletions website/docs/configurations.md
@@ -21,6 +21,7 @@ It helps to have a central configuration file for your common cross job configur

By default, Hudi would load the configuration file under `/etc/hudi/conf` directory. You can specify a different configuration directory location by setting the `HUDI_CONF_DIR` environment variable.
- [**Spark Datasource Configs**](#SPARK_DATASOURCE): These configs control the Hudi Spark Datasource, providing ability to define keys/partitioning, pick out the write operation, specify how to merge records or choosing query type to read.
- [**Parquet Configs**](#PARQUET_CONFIG): These configs make it possible to use native parquet features, such as modular encryption and bloom filters.
> **Contributor comment:** The configurations.md is automatically generated by https://github.com/apache/hudi/tree/asf-site/hudi-utils so let's avoid changing this page directly.

- [**Flink Sql Configs**](#FLINK_SQL): These configs control the Hudi Flink SQL source/sink connectors, providing ability to define record keys, pick out the write operation, specify how to merge records, enable/disable asynchronous compaction or choosing query type to read.
- [**Write Client Configs**](#WRITE_CLIENT): Internally, the Hudi datasource uses a RDD based HoodieWriteClient API to actually perform writes to storage. These configs provide deep control over lower level aspects like file sizing, compression, parallelism, compaction, write schema, cleaning etc. Although Hudi provides sane defaults, from time-time these configs may need to be tweaked to optimize for specific workloads.
- [**Metastore and Catalog Sync Configs**](#META_SYNC): Configurations used by the Hudi to sync metadata to external metastores and catalogs.
@@ -199,6 +200,7 @@ Options useful for reading tables via `read.format.option(...)`

You can pass down any of the WriteClient level configs directly using `options()` or `option(k,v)` methods.


```java
inputDF.write()
.format("org.apache.hudi")
@@ -657,6 +659,9 @@ The following set of configurations help validate new data before commits.

---

## Parquet Config {#PARQUET_CONFIG}
Hudi supports [parquet modular encryption](/docs/encryption) and [parquet bloom filters](/docs/parquet_bloom) through Hadoop configurations, as sketched below.
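
For illustration only (a minimal sketch, not part of the generated reference tables), the pattern is to set the corresponding Parquet Hadoop properties on the Spark context before the Hudi write; the properties shown are the bloom filter ones from the linked page, and the `rider` column is a placeholder:

```java
import org.apache.spark.api.java.JavaSparkContext;

// "spark" is an existing SparkSession; the properties below are documented on the parquet bloom page
JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
jsc.hadoopConfiguration().set("parquet.bloom.filter.enabled#rider", "true");
jsc.hadoopConfiguration().set("parquet.bloom.filter.expected.ndv#rider", "20");
```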

## Flink Sql Configs {#FLINK_SQL}
These configs control the Hudi Flink SQL source/sink connectors, providing ability to define record keys, pick out the write operation, specify how to merge records, enable/disable asynchronous compaction or choosing query type to read.

74 changes: 74 additions & 0 deletions website/docs/parquet_bloom.md
@@ -0,0 +1,74 @@
---
title: Parquet Bloom Filters
> **Contributor comment:** Does this new page show up in the sidebar?

keywords: [ hudi, index ]
summary: This section offers an overview of parquet bloom filters in Hudi
toc: true
last_modified_at: 2023-06-26T15:59:57-04:00
---

Since Hudi 0.14.0, the parquet bloom filters feature is available for engines based on Parquet 1.12. This section is a guide on how to enable parquet blooms in Hudi tables.

## Various bloom filter support in Hudi

Almost from the beginning, Hudi has supported dynamic bloom filter indexing on the record key, stored in the parquet footers - aka "hudi blooms".

Since Hudi 0.11 and the multi-modal indexing feature, hudi blooms can be stored in the metadata table, and arbitrary columns can be indexed as well. Storing the blooms in the metadata table makes it possible to skip reading the footers, which improves performance on very large tables.

So far, hudi blooms are used at write time only: they are leveraged during the write operation to identify the files that will later be merged.

In parallel, parquet 1.12 came with its own bloom filters - aka "parquet blooms". These are also stored in the parquet footers when enabled before writing the parquet files. At read time, if a bloom filter covers a column used in the query predicates, the parquet engine transparently uses it to skip reading data.

Hudi now supports both kinds of blooms, which help in complementary contexts. All COW operations are supported: bulk_insert, insert, upsert, delete, clustering, and so on.

This page describes how to enable parquet blooms on a COW table with Spark 3.x, starting from Hudi 0.14.0.

## How can parquet blooms speed up read queries?

Parquet has various statistics to speed up read queries (min/max, dictionaries, null counts, ...). Dictionaries already cover the case where the column contains duplicates and has fewer than roughly 40,000 unique values: the dictionary stores the list of unique values and makes blooms useless.

So, at the parquet file level, a bloom filter is useful in either of these cases (a quick check is sketched right after the list):

- the column has no duplicates
- the column has more than ~40k unique values
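
As a quick check, here is a small sketch (my own illustration, not part of the Hudi docs; the `rider` column and `path` are placeholders) that compares a column's distinct count against the row count and the ~40k dictionary threshold:

```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.countDistinct;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> df = spark.read().format("org.apache.hudi").load("path");

long rows = df.count();
long distinct = df.select(countDistinct(col("rider"))).first().getLong(0);

// A parquet bloom on "rider" is likely useful when the column has (almost) no duplicates
// or when its number of unique values is above the ~40k dictionary threshold.
boolean bloomLikelyUseful = distinct == rows || distinct > 40_000;
System.out.println("distinct=" + distinct + ", rows=" + rows + ", useful=" + bloomLikelyUseful);
```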

## How should I choose the NDV?

NDV is the number of distinct values and is used by parquet to size the bloom filter. Bloom filter precision (keeping the false positive rate low) is a tradeoff against its size, so you should choose an NDV that is representative of your own data.
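
One rough way to pick an NDV, sketched under the assumption that a representative dataset is already queryable (`path`, the `rider` column, and `jsc` from the write example in the next section are assumptions), is to estimate the distinct count with Spark and pass it to the bloom property:

```java
import static org.apache.spark.sql.functions.approx_count_distinct;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Estimate the number of distinct rider values from representative data
Dataset<Row> sample = spark.read().format("org.apache.hudi").load("path");
long estimatedNdv = sample.select(approx_count_distinct("rider")).first().getLong(0);

// Size the bloom filter for future writes (jsc is the JavaSparkContext from the write example below)
jsc.hadoopConfiguration().set("parquet.bloom.filter.expected.ndv#rider", String.valueOf(estimatedNdv));
```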

## Set up parquet blooms before writes

First, make sure the Hudi Spark 3.x bundle jar and Hudi 0.14.0 are used.

Here is an example.

```java
import static org.apache.hudi.QuickstartUtils.convertToStringList;

import java.util.List;

import org.apache.hudi.QuickstartUtils;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
// Create a parquet bloom filter on the rider column, sized for ~20 distinct values
jsc.hadoopConfiguration().set("parquet.bloom.filter.enabled#rider", "true");
jsc.hadoopConfiguration().set("parquet.bloom.filter.expected.ndv#rider", "20");

// Generate a few sample records and write them as a COW table;
// the bloom filters end up in the parquet footers of the written files
QuickstartUtils.DataGenerator dataGen = new QuickstartUtils.DataGenerator();
List<String> inserts = convertToStringList(dataGen.generateInserts(3));
Dataset<Row> inputDF1 = spark.read().json(jsc.parallelize(inserts, 1));
inputDF1.write().format("org.apache.hudi")
    .option("hoodie.table.name", "bloom_table")
    .option("hoodie.upsert.shuffle.parallelism", "2")
    .option("hoodie.insert.shuffle.parallelism", "2")
    .option("hoodie.delete.shuffle.parallelism", "2")
    .option("hoodie.bulkinsert.shuffle.parallelism", "2")
    .mode(SaveMode.Overwrite)
    .save("path");

// At read time, the parquet reader uses the bloom filter on rider to skip data
// that cannot match the predicate
spark.read().format("org.apache.hudi").load("path").filter("rider = 'easy'").count();
```

The parquet blooms on the rider column then allow the reader to skip a large number of parquet files, depending on how the bloom filter is tuned.
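
The bloom filtering itself happens inside the parquet reader and does not show up in query plans; as a rough sanity check (an illustration, not a Hudi-documented procedure), you can at least verify that the predicate is pushed down to the parquet scan:

```java
// Look for the rider predicate under "PushedFilters" in the scan node of the physical plan;
// the exact output depends on the Spark and Hudi versions.
spark.read().format("org.apache.hudi")
    .load("path")
    .filter("rider = 'easy'")
    .explain();
```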

Read more from [Parquet docs](https://github.com/apache/parquet-mr/tree/parquet-1.12.x/parquet-hadoop).

### Note

This feature is currently only available for COW tables, because those tables contain only Parquet base files.