diff --git a/rfc/README.md b/rfc/README.md
index 0b4d10f27bcce..ca9e8782accc3 100644
--- a/rfc/README.md
+++ b/rfc/README.md
@@ -60,7 +60,7 @@ The list of all RFCs can be found here.
 | 34 | [Hudi BigQuery Integration (WIP)](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=188745980) | `UNDER REVIEW` |
 | 35 | [Make Flink MOR table writing streaming friendly](https://cwiki.apache.org/confluence/display/HUDI/RFC-35%3A+Make+Flink+MOR+table+writing+streaming+friendly) | `UNDER REVIEW` |
 | 36 | [HUDI Metastore Server](https://cwiki.apache.org/confluence/display/HUDI/%5BWIP%5D+RFC-36%3A+HUDI+Metastore+Server) | `UNDER REVIEW` |
-| 37 | [Hudi metadata based bloom index] | `UNDER REVIEW` |
+| 37 | [Hudi Metadata based Bloom Index](rfc-37/rfc-37.md) | `IN PROGRESS` |
 | 38 | [Spark Datasource V2 Integration] | `UNDER REVIEW` |
 | 39 | [Incremental source for Debezium](./rfc-39/rfc-39.md) | `IN PROGRESS` |
 | 40 | [Hudi Connector for Trino](./rfc-40/rfc-40.md) | `IN PROGRESS` |
diff --git a/rfc/rfc-37/metadata_index_1.png b/rfc/rfc-37/metadata_index_1.png
new file mode 100644
index 0000000000000..40b834f40f9b8
Binary files /dev/null and b/rfc/rfc-37/metadata_index_1.png differ
diff --git a/rfc/rfc-37/metadata_index_bloom_partition.png b/rfc/rfc-37/metadata_index_bloom_partition.png
new file mode 100644
index 0000000000000..8ada4b7f2c18f
Binary files /dev/null and b/rfc/rfc-37/metadata_index_bloom_partition.png differ
diff --git a/rfc/rfc-37/metadata_index_col_stats.png b/rfc/rfc-37/metadata_index_col_stats.png
new file mode 100644
index 0000000000000..02a77fe0dd6d2
Binary files /dev/null and b/rfc/rfc-37/metadata_index_col_stats.png differ
diff --git a/rfc/rfc-37/rfc-37.md b/rfc/rfc-37/rfc-37.md
new file mode 100644
index 0000000000000..28d27b399482e
--- /dev/null
+++ b/rfc/rfc-37/rfc-37.md
@@ -0,0 +1,329 @@
+
+# RFC-37: Metadata based Bloom Index
+
+## Proposers
+- @nsivabalan
+- @manojpec
+
+## Approvers
+- @vinothchandar
+- @satishkotha
+
+## Status
+JIRA: https://issues.apache.org/jira/browse/HUDI-2703
+
+## Abstract
+Hudi maintains several indices to locate/map incoming records to file groups during writes. The most commonly used
+record index is the HoodieBloomIndex. Large tables and the global index variant suffer performance issues because the
+bloom filters from a large number of data files need to be read and looked up. Reading from many files over cloud
+object storage like S3 also runs into request throttling. We propose to build a new metadata index (a metadata table
+based bloom index) to boost the performance of the existing bloom index.
+
+## Background
+
+HoodieBloomIndex is used to find the location of incoming records during every write. The bloom index assists Hudi in
+deterministically routing records to a given file group and in distinguishing inserts from updates. This aggregate
+bloom index is built from the bloom filters stored in the base file footers. Prior to the bloom filter lookup, file
+pruning for the incoming records is also done based on the record key min/max stats stored in the base file footers.
+In this RFC, we plan to build a new index for the bloom filters under the metadata table to assist in bloom index
+based record location tagging. This overlaps
+with [RFC-27 Data skipping index](https://cwiki.apache.org/confluence/display/HUDI/RFC-27+Data+skipping+index+to+improve+query+performance)
+in the read path for improving query performance.
+
+## Design
+HoodieBloomIndex involves the following steps to find the right location of incoming records (a simplified sketch of
+this flow follows the list):
+
+1. Find all the interested partitions and list all their data files.
+2. File pruning: Load the record key min/max details from all the interested data file footers. Filter the files and
+   generate the file-to-keys mapping for the incoming records based on the key ranges, using a range interval tree
+   built from the previously loaded min/max details.
+3. Bloom filter lookup: Further prune the file-to-keys mapping based on a bloom filter lookup of the incoming keys.
+4. Final lookup in the actual data files to find the right location of every incoming record.
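+A purely illustrative sketch of steps 1-4 is shown below. The helpers (`listDataFiles`, `readKeyRange`,
+`bloomMightContain`, `fileContainsKey`) are hypothetical stand-ins for the footer reads and file scans that
+HoodieBloomIndex performs today (they are not actual Hudi APIs), and the range check is a direct min/max comparison
+rather than the interval tree mentioned in step 2.
+
+```java
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiPredicate;
+import java.util.function.Function;
+
+public class BloomIndexLookupSketch {
+
+  // Returns recordKey -> file for keys found in existing files; keys left untagged are new inserts.
+  public static Map<String, String> tagLocations(
+      Map<String, List<String>> keysByPartition,        // incoming record keys, grouped by partition
+      Function<String, List<String>> listDataFiles,     // step 1: partition -> data files
+      Function<String, String[]> readKeyRange,          // step 2: file -> {minKey, maxKey} from its footer
+      BiPredicate<String, String> bloomMightContain,    // step 3: (file, key) -> possibly present
+      BiPredicate<String, String> fileContainsKey) {    // step 4: (file, key) -> actually present
+    Map<String, String> keyToFile = new HashMap<>();
+    for (Map.Entry<String, List<String>> entry : keysByPartition.entrySet()) {
+      for (String file : listDataFiles.apply(entry.getKey())) {
+        String[] range = readKeyRange.apply(file);
+        for (String key : entry.getValue()) {
+          if (key.compareTo(range[0]) < 0 || key.compareTo(range[1]) > 0) {
+            continue;                                   // step 2: key outside this file's min/max range
+          }
+          if (bloomMightContain.test(file, key) && fileContainsKey.test(file, key)) {
+            keyToFile.put(key, file);                   // steps 3 and 4: bloom check, then confirm in the file
+          }
+        }
+      }
+    }
+    return keyToFile;
+  }
+}
+```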
+
+As we can see from steps 1 and 2, we need the min and max values of "_hoodie_record_key" and the bloom filters from
+all interested data files to perform the location tagging. In this design, we will add these key stats and bloom
+filters to the metadata table and thereby be able to quickly load the interested details and do faster lookups.
+
+The metadata table already has one partition, `files`, to help with partition file listing. For the metadata table
+based indices, we are proposing to add the following two new partitions:
+1. `bloom_filter` - for the file level bloom filters
+2. `column_stats` - for the key range stats
+
+Why the metadata table:
+The metadata table uses the HBase HFile format - a tree map file format - to store and retrieve data. HFile is an
+indexed file format and supports map-like fast lookups by key. Since we will be storing stats/bloom filters for every
+file and the index will do lookups by file, we should be able to benefit from the faster lookups in HFile.
+
+![High Level Metadata Index Design](metadata_index_1.png)
+
+The following sections talk about the different partitions and key formats, and then dive into the data and control
+flows.
+
+### MetaIndex/BloomFilter:
+
+A new partition `bloom_filter` will be added under the metadata table. Bloom filters from all the base files in the
+data table will be added here. The metadata table is already in the HFile format. The existing metadata payload schema
+will be extended and shared for this partition as well. The type field will be used to detect the bloom filter payload
+record. Here is the schema for the bloom filter payload record.
+```
+  {
+    "doc": "Metadata Index of bloom filters for all data files in the user table",
+    "name": "BloomFilterMetadata",
+    "type": [
+      "null",
+      {
+        "doc": "Data file bloom filter details",
+        "name": "HoodieMetadataBloomFilter",
+        "type": "record",
+        "fields": [
+          {
+            "doc": "Bloom filter type code",
+            "name": "type",
+            "type": "string"
+          },
+          {
+            "doc": "Instant timestamp when this metadata was created/updated",
+            "name": "timestamp",
+            "type": "string"
+          },
+          {
+            "doc": "Bloom filter binary byte array",
+            "name": "bloomFilter",
+            "type": "bytes"
+          },
+          {
+            "doc": "Bloom filter entry valid/deleted flag",
+            "name": "isDeleted",
+            "type": "boolean"
+          }
+        ]
+      }
+    ]
+  }
+```
+
+The key for the bloom filter record is an encoded string representing the partition and base file combination. The
+partition and file names are converted to deterministic hash based IDs, and then base64 encoded. Hash based IDs are
+easy to generate for incoming new insert records and for looking up updated records, and they don't need any
+dictionary for reverse lookups. The number of hash bits is chosen based on the cardinality and the collision
+probability desired for the maximum supported deployment scale. Base64 encoding the hash IDs further reduces the
+on-disk storage space for these keys.
+
+```
+key = base64_encode(concat(hash64(partition name), hash128(file name)))
+```
+
+![Bloom filter partition](metadata_index_bloom_partition.png)
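+
+As a rough illustration of the key encoding above (and not the exact hashing scheme Hudi will ship), the sketch below
+derives a 64-bit partition hash and a 128-bit file name hash by truncating an MD5 digest, then base64 encodes the
+concatenation; the hash function and bit sizes here are assumptions for the example only.
+
+```java
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.util.Arrays;
+import java.util.Base64;
+
+public class BloomIndexKeySketch {
+
+  // Illustration only: truncate an MD5 digest to a fixed-size hash ID.
+  private static byte[] hashBytes(String input, int numBytes) throws Exception {
+    byte[] digest = MessageDigest.getInstance("MD5").digest(input.getBytes(StandardCharsets.UTF_8));
+    return Arrays.copyOf(digest, numBytes);
+  }
+
+  // key = base64_encode(concat(hash64(partition name), hash128(file name)))
+  public static String bloomFilterKey(String partitionName, String fileName) throws Exception {
+    byte[] partitionId = hashBytes(partitionName, 8);    // 64-bit partition ID
+    byte[] fileId = hashBytes(fileName, 16);             // 128-bit file ID
+    byte[] key = new byte[partitionId.length + fileId.length];
+    System.arraycopy(partitionId, 0, key, 0, partitionId.length);
+    System.arraycopy(fileId, 0, key, partitionId.length, fileId.length);
+    return Base64.getEncoder().encodeToString(key);      // compact, deterministic lookup key
+  }
+
+  public static void main(String[] args) throws Exception {
+    // The same inputs always produce the same key, so update lookups need no reverse dictionary.
+    System.out.println(bloomFilterKey("2021/11/01", "some-base-file.parquet"));
+  }
+}
+```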
+
+### MetaIndex/ColumnStats:
+
+Another new partition `column_stats` will also be added under the metadata table to make the record key lookup code
+path much more performant. This metadata index also helps with data skipping (please see RFC-27 for more details). In
+the context of faster record key lookups for update use cases, we propose using the `column_stats` index for file
+pruning when generating the file to candidate keys mapping for the update records. The existing metadata payload
+schema will be extended and shared for this partition as well. The type field will be used to detect the column stats
+payload record. Here is the schema for the column stats payload record.
+
+```
+  {
+    "doc": "Metadata Index of column statistics for all data files in the user table",
+    "name": "ColumnStatsMetadata",
+    "type": [
+      "null",
+      {
+        "doc": "Data file column statistics",
+        "name": "HoodieColumnStats",
+        "type": "record",
+        "fields": [
+          {
+            "doc": "File name for which this column statistics applies",
+            "name": "fileName",
+            "type": [
+              "null",
+              "string"
+            ]
+          },
+          {
+            "doc": "Minimum value in the range. Based on user data table schema, we can convert this to appropriate type",
+            "name": "minValue",
+            "type": [
+              "null",
+              "string"
+            ]
+          },
+          {
+            "doc": "Maximum value in the range. Based on user data table schema, we can convert it to appropriate type",
+            "name": "maxValue",
+            "type": [
+              "null",
+              "string"
+            ]
+          },
+          {
+            "doc": "Total count of values",
+            "name": "valueCount",
+            "type": [
+              "null",
+              "long"
+            ]
+          },
+          {
+            "doc": "Total count of null values",
+            "name": "nullCount",
+            "type": [
+              "null",
+              "long"
+            ]
+          },
+          {
+            "doc": "Total storage size on disk",
+            "name": "totalSize",
+            "type": [
+              "null",
+              "long"
+            ]
+          },
+          {
+            "doc": "Total uncompressed storage size on disk",
+            "name": "totalUncompressedSize",
+            "type": [
+              "null",
+              "long"
+            ]
+          },
+          {
+            "doc": "Column range entry valid/deleted flag",
+            "name": "isDeleted",
+            "type": "boolean"
+          }
+        ]
+      }
+    ]
+  }
+```
+
+Column stats records hold the key ranges (min and max) for a file. The key for a column stats record is an encoded
+string representing the tuple of column name, partition name and base file name. The string names of these fields are
+converted to deterministic hash based IDs and then base64 encoded, just like the bloom filter key.
+
+```
+key = base64_encode(concat(hash64(column name), hash64(partition name), hash128(file name)))
+```
+
+While hash based IDs have quite a few desirable properties in the context of Hudi index lookups, they do have an
+impact on column level schema changes. Refer to the [Schema Evolution](#Schema-Evolution) section for more details.
+
+The picture below gives a pictorial representation of the column stats partition in the metadata table.
+![Column Stats Partition](metadata_index_col_stats.png)
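+
+To make the intended use of these stats concrete ahead of the lookup flow described in the next section, here is a
+minimal sketch of range based file pruning for a record key. `ColumnStats` is a simplified stand-in for the
+`HoodieColumnStats` payload above, and the ranges are compared as plain strings, whereas the real implementation would
+convert the values to the column's actual type as noted in the schema docs.
+
+```java
+import java.util.ArrayList;
+import java.util.List;
+
+public class ColumnStatsPruningSketch {
+
+  // Simplified stand-in for the column stats payload fields used here.
+  public static class ColumnStats {
+    final String fileName;
+    final String minValue;
+    final String maxValue;
+
+    public ColumnStats(String fileName, String minValue, String maxValue) {
+      this.fileName = fileName;
+      this.minValue = minValue;
+      this.maxValue = maxValue;
+    }
+  }
+
+  // Keep only the files whose [minValue, maxValue] range for _hoodie_record_key can contain the key.
+  public static List<String> candidateFiles(String recordKey, List<ColumnStats> statsForPartition) {
+    List<String> candidates = new ArrayList<>();
+    for (ColumnStats stats : statsForPartition) {
+      if (recordKey.compareTo(stats.minValue) >= 0 && recordKey.compareTo(stats.maxValue) <= 0) {
+        candidates.add(stats.fileName);   // key may be in this file; confirm via bloom filter next
+      }
+    }
+    return candidates;
+  }
+}
+```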
+
+### Metadata Index lookup:
+
+For the incoming upsert records, given their keys, we need to tag their current location. The new algorithm for the
+index lookup would be:
+
+1. Generate the list of partitions and the list of keys under each partition to be looked up
+2. For all the involved partitions, load their file lists
+3. Level 1: Range pruning using the `column_stats` index:
+   1. For each record key, generate the column stats index lookup key based on the tuple
+      (_hoodie_record_key, partition name, file path)
+   2. Do a metadata index lookup with the above key and, if present, get the value payload with the column stats details
+   3. Prune the partition's candidate files based on the range comparisons
+4. Level 2: Record pruning using the `bloom_filter` index:
+   1. From the shortlisted file candidates per partition, generate the bloom filter index lookup key based on the tuple
+      (partition name, file path)
+   2. Do a metadata index lookup with the above key to load the base file's bloom filter
+   3. Look up the record key in the bloom filter to generate the candidate keys that are probably present in the base file
+5. Level 3: Record validation:
+   1. Given the list of files and their candidate keys from the above pruning, do the actual file lookup to confirm the keys
+   2. Return the location (file id) of the final matching keys
+
+### Schema Evolution:
+
+Hash ID based keys are deterministically generated from the tuple input. That is, for a tuple consisting of column
+name, partition name and file name, the generated key would always be the same. So, a table whose schema changes over
+time has an impact on the keys already generated. The most common schema evolution use cases, like changing a column
+type or adding a new column, are not affected though. Other relatively uncommon use cases, like renaming a column,
+dropping a column, or adding a column with a previously dropped name, would leave more index entries referring to
+those columns than needed. This would lead to the index lookup matching stale/new records across evolved schemas.
+
+To avoid looking up stale/new index records, here are the design options we have:
+1. (Preferred) Query rewrite / result recordset pruning
+   1. The schema evolution layer should introduce a query rewrite stage to detect evolved schemas for the input query
+      and optionally add additional predicates to the query
+   2. The resultant recordset can also be pruned based on the commit time and the schema change time
+2. Making the input tuple set schema aware
+   1. Along with the column name, partition name and file path, a version/tag can also be added to make the generated
+      key schema specific. But this choice has a performance impact, as the lookup now has to be prefix based instead
+      of a pointed lookup. That is, the index lookup has to return records for all versions/tags, and pruning has to
+      be done on top of that.
+
+## Implementation
+
+1. No change to the HoodieIndex public interface.
+```
+  /**
+   * Looks up the index and tags each incoming record with a location of a file that contains
+   * the row (if it is actually present).
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public abstract HoodieData<HoodieRecord<R>> tagLocation(
+      HoodieData<HoodieRecord<R>> records, HoodieEngineContext context,
+      HoodieTable hoodieTable) throws HoodieIndexException;
+```
+2. HoodieBloomIndex::explodeRecordsWithFileComparisons() will be extended to check for a new config
+   `hoodie.metadata.file_pruning.enable`, and if enabled, the metadata table based column stats will be used for file
+   pruning based on key ranges.
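+
+For reference, a writer could opt into this pruning path with options like the ones below. This is a sketch only:
+`hoodie.metadata.file_pruning.enable` is the new config proposed in item 2 and does not exist yet, while
+`hoodie.metadata.enable`, `hoodie.index.type` and the other options shown are existing configs, and the values are
+illustrative.
+
+```java
+import java.util.HashMap;
+import java.util.Map;
+
+public class MetadataIndexWriterOptions {
+
+  // Options to pass to the Hudi Spark datasource writer (illustrative values).
+  public static Map<String, String> hudiWriteOptions() {
+    Map<String, String> options = new HashMap<>();
+    options.put("hoodie.table.name", "my_table");                // example table name
+    options.put("hoodie.datasource.write.operation", "upsert");
+    options.put("hoodie.index.type", "BLOOM");                   // existing bloom index
+    options.put("hoodie.metadata.enable", "true");               // metadata table must be enabled
+    options.put("hoodie.metadata.file_pruning.enable", "true");  // proposed in this RFC
+    return options;
+  }
+}
+```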
+
+### Writer flow:
+Let's walk through the writer flow to update these partitions.
+
+Whenever a new commit is applied to the metadata table, we do the following:
+1. `files` partition - prepare records for adding.
+2. `column_stats` partition - prepare records for adding:
+   `[ColumnIndexID][PartitionIndexID][FileIndexID] => ColumnStats`.
+   This involves reading the base file footers to fetch the min/max values for each column.
+3. `bloom_filter` partition - prepare records for adding:
+   `[PartitionIndexID][FileIndexID] => BloomFilter`.
+   This also involves reading the base file footers. We can amortize the cost across (2) and (3) by reading each
+   footer once and preparing/populating records for both partitions.
+4. Commit all these records to the metadata table.
+
+We need to ensure that sufficient info in the WriteStatus gets sent to the metadata writer for every commit.
+
+### Reader flow:
+When a new batch of writes is ingested into Hudi, we need to tag the records with their original file group location.
+This index will leverage both partitions to deduce the record key => file name mappings. Refer to the Metadata Index
+lookup section for more details.
+
+## Rollout/Adoption Plan
+* Release 0.10.0 is a flag-day release, meaning the old metadata table will be wiped out and a new one will be built.
+* The Metadata Index feature is planned for a 0.10.x version. Any preparatory changes/features (like
+  `Metadata new indexing for existing tables`, RFC proposal and doc pending) that are needed to have this feature in a
+  later minor release need to be rolled out as part of 0.10.0.
+* TODO: More details on the rollout plan
+
+## Test Plan
+* Functionality
+  * Tag location for existing keys
+  * Tag location for non-existing keys
+* Performance
+  * Prove that metadata based indices help upsert use cases
+* Upgrade
+* TODO: More details on the test plan