JIRA: https://issues.apache.org/jira/browse/HUDI-1822

## Abstract

Query engines typically scan large amounts of data for query planning and execution. A few data skipping strategies
are available to reduce the amount of data scanned, such as:

- Partition pruning
  - The user has to select the partitions to narrow down the data to be scanned for the query.
- File pruning
  - Some data file formats contain metadata, including range information for certain columns; for parquet, this
    metadata is stored in the file footer. As part of query planning, all range information from data files is
    loaded, and data files are then pruned based on comparisons of the query predicates against the column range
    information (a sketch of this mechanism follows the list).
  - This approach does not scale if there are a large number of partitions and data files to be scanned.
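To make the file-pruning mechanics above concrete, here is a minimal illustrative sketch using parquet-mr's footer
metadata APIs; the single-column `col > bound` predicate and the skip-decision shape are assumptions for
illustration, not part of this RFC:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class FooterBasedPruning {
  // Returns true if the file provably contains no row matching "column > bound",
  // judging only by the range information stored in the parquet footer.
  public static boolean canSkip(Path file, String column, long bound) throws Exception {
    try (ParquetFileReader reader =
             ParquetFileReader.open(HadoopInputFile.fromPath(file, new Configuration()))) {
      for (BlockMetaData rowGroup : reader.getFooter().getBlocks()) {
        for (ColumnChunkMetaData chunk : rowGroup.getColumns()) {
          if (chunk.getPath().toDotString().equals(column)
              && chunk.getStatistics() != null
              && chunk.getStatistics().genericGetMax() instanceof Long
              && (Long) chunk.getStatistics().genericGetMax() > bound) {
            return false; // some row group's max exceeds the bound, so the file may match
          }
        }
      }
    }
    return true; // no row group can satisfy the predicate
  }
}
```

Note that every footer has to be opened and read during planning, which is exactly the scaling problem described above.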

We propose a new data skipping approach here to improve query performance: store additional information as part of
the Hudi metadata table to implement a data skipping index. The goals of the data skipping index are to provide:

- Global index: Users query for the information they need without needing to specify partitions. The index can
effectively find data files in the table.

So, high level requirement for this column_stats partition is (pertaining to this

### Storage format
To cater to the above requirement, we plan to encode the column name, partition path and file name into the keys in HFile.
Since HFile supports efficient range/prefix search, our lookup should be very fast.

![Column Stats Partition](col_stats.png)

We plan to generate unique, random hash IDs for all 3 components (a sketch follows this list):
- ColumnIndexID:
  - base64(hash64(column name))
  - on-disk size = 12 bytes per col_stat per file
- PartitionIndexID:
  - base64(hash64(partition name))
  - on-disk size = 12 bytes per partition
- FileIndexID:
  - base64(hash128(file name))
  - on-disk size = 24 bytes per file
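This RFC does not fix a concrete hash function; as a minimal sketch, assuming Guava's murmur3 (truncated to 8 bytes
for the 64-bit IDs), the on-disk sizes above fall out of base64 encoding directly:

```java
import com.google.common.hash.Hashing;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;

public class IndexIds {
  // base64 of an 8-byte (64-bit) hash is exactly 12 characters, matching the
  // 12-byte on-disk size quoted above for column and partition IDs.
  public static String columnIndexId(String columnName) {
    return encode(hashBytes(columnName), 8);
  }

  public static String partitionIndexId(String partitionPath) {
    return encode(hashBytes(partitionPath), 8);
  }

  // base64 of a 16-byte (128-bit) hash is exactly 24 characters.
  public static String fileIndexId(String fileName) {
    return encode(hashBytes(fileName), 16);
  }

  private static byte[] hashBytes(String s) {
    return Hashing.murmur3_128().hashString(s, StandardCharsets.UTF_8).asBytes();
  }

  private static String encode(byte[] hash, int numBytes) {
    return Base64.getEncoder().encodeToString(Arrays.copyOf(hash, numBytes));
  }
}
```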

Takes up larger space in-memory and on-disk compared to Sequential IDs. Theoretically, the compression ratio should be lower than with Sequential IDs.

Key format in column_stats partition<br/>
- ColumnStatsIndexID = ColumnIndexID.append(PartitionIndexID).append(FileIndexID)
- ColumnStatsAggregateIndexID = ColumnIndexID.append(PartitionIndexID)

The first type will be used to store one entry per column per file, and the second type will be used to store one
aggregated entry per column per partition. These are fixed-size keys (see the sketch following this paragraph), so
lookups don't have to search for ID delimiters, which suits our use-case, as we have chosen the key format
consciously with this in mind.
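A minimal sketch of the composite keys, reusing the hypothetical IndexIds helpers from the earlier sketch (note that
other parts of this document also show an "agg" literal in the aggregate key; this sketch follows the append-only
format above):

```java
public class ColumnStatsKeys {
  // One entry per column per file: fixed 12 + 12 + 24 = 48 characters.
  public static String columnStatsIndexId(String column, String partition, String file) {
    return IndexIds.columnIndexId(column)
        + IndexIds.partitionIndexId(partition)
        + IndexIds.fileIndexId(file);
  }

  // One aggregated entry per column per partition: fixed 24 characters,
  // so full and aggregate keys are distinguishable by length alone.
  public static String columnStatsAggregateIndexId(String column, String partition) {
    return IndexIds.columnIndexId(column) + IndexIds.partitionIndexId(partition);
  }
}
```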

Given a list of columns and optionally partitions, return a list of matching file names.

1. We can do a prefix search of [ColumnIndexID] or [ColumnIndexID][PartitionIndexID]
   - If both the ColumnIndexID and PartitionIndexIDs are supplied, we will do a range read of [ColumnIndexID][PartitionIndexID].
   - If the list of partitions is not available as part of the query, we will first look up [ColumnIndexID]+"agg" to do a prefix
     search for partition-level stats, filter for those partitions which match the predicates, and then follow (1) as in the previous line.

2. Fetch only the entries of interest for the [ColumnIndexID][PartitionIndexID] list.
3. Look up the stats and filter for matching FileIndexIDs.
4. Reverse lookup in the Files partition to get the FileIndexID to FileName mapping. (A simplified sketch of the prefix scan follows.)
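A simplified illustration of the prefix lookup, with an in-memory sorted map standing in for HFile's sorted key
space (the actual HFile reader API is deliberately omitted here):

```java
import java.util.NavigableMap;
import java.util.SortedMap;
import java.util.TreeMap;

public class PrefixScan {
  // All keys sharing `prefix` form one contiguous range in a sorted key space,
  // which is what makes the fixed-size composite keys cheap to query.
  public static <V> SortedMap<String, V> scan(NavigableMap<String, V> index, String prefix) {
    return index.subMap(prefix, true, prefix + Character.MAX_VALUE, false);
  }

  public static void main(String[] args) {
    NavigableMap<String, String> index = new TreeMap<>();
    index.put("colA" + "part1" + "file1", "stats1");
    index.put("colA" + "part2" + "file2", "stats2");
    index.put("colB" + "part1" + "file3", "stats3");
    // Prefix search on the ColumnIndexID alone ("colA") returns the first two entries.
    System.out.println(scan(index, "colA"));
  }
}
```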

Note:
As you can see here, the reverse lookup of the FileIndexID to FileName mapping has to go to the "Files" partition to satisfy our requirement.
So, "Files" partition will be added with additional entries of fileId to fileName mappings on the write path.

#### Sharding:

field will be used to detect the column stats payload record. Here is the schema:
    .
    },
    {
      "doc": "Metadata Index of column statistics for all data files in the user table",
      "name": "ColumnStatsMetadata",
      "type": [
        "null",
        {
          "doc": "Data file column statistics",
          "name": "HoodieColumnStats",
          "type": "record",
          "fields": [
            {
              "doc": "File name for which this column statistics applies",
              "name": "fileName",
              "type": [
                "null",
                "string"
              ]
            },
            {
              "doc": "Minimum value in the range. Based on user data table schema, we can convert this to appropriate type",
              "name": "minValue",
              "type": [
                "null",
                "string"
              ]
            },
            {
              "doc": "Maximum value in the range. Based on user data table schema, we can convert it to appropriate type",
              "name": "maxValue",
              "type": [
                "null",
                "string"
              ]
            },
            {
              "doc": "Total count of values",
              "name": "valueCount",
              "type": [
                "null",
                "long"
              ]
            },
            {
              "doc": "Total count of null values",
              "name": "nullCount",
              "type": [
                "null",
                "long"
              ]
            },
            {
              "doc": "Total storage size on disk",
              "name": "totalSize",
              "type": [
                "null",
                "long"
              ]
            },
            {
              "doc": "Total uncompressed storage size on disk",
              "name": "totalUncompressedSize",
              "type": [
                "null",
                "long"
              ]
            },
            {
              "doc": "Column range entry valid/deleted flag",
              "name": "isDeleted",
              "type": "boolean"
            }
          ]
        }

encoded string as discussed earlier.

```
key = base64_encode(hash64(column name) + hash64(partition name) + hash128(file path))
key = base64_encode(hash64(column name) + "agg" + hash64(partition name))
key = base64_encode(hash64(column name) + hash64(partition name))
```
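For illustration only, a value conforming to the HoodieColumnStats schema above could be populated via Avro's
generic API; all field values below are made up, and `columnStatsSchema` is assumed to be the parsed Avro schema:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class ColumnStatsRecordExample {
  public static GenericRecord build(Schema columnStatsSchema) {
    GenericRecord stat = new GenericData.Record(columnStatsSchema);
    stat.put("fileName", "f1-0_0-10-20_20210101.parquet"); // hypothetical file name
    stat.put("minValue", "2021-01-01");
    stat.put("maxValue", "2021-03-31");
    stat.put("valueCount", 100000L);
    stat.put("nullCount", 25L);
    stat.put("totalSize", 4500000L);
    stat.put("totalUncompressedSize", 9000000L);
    stat.put("isDeleted", false);
    return stat;
  }
}
```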

While Hash based IDs have quite a few desirable properties in the context of Hudi index lookups, there is an impact

Let's walk through the writer flow to update the column_stats partition in the metadata table:

1. Files partition - prepare records for adding // just calling out what's required in the context of the column_stats
partition. The general files partition will be updated as usual to store file listing information.
   - FileIndexID => FileName mapping entries
   - PartitionIndexID => PartitionName entry, if it does not already exist
   - Since these are hash-based IDs, no lookup of prior usages is required here. Otherwise, we would need to know the
     last assigned ID and then go about assigning new incremental/sequential IDs, which slows down performance significantly.
2. Column_stats partition - prepare records for adding
   - [ColumnIndexID][PartitionIndexID][FileIndexID] => ColumnStat
   - [ColumnIndexID]"agg"[PartitionIndexID] => ColumnStat
   - This involves reading the base file footers to fetch min/max and other stats to populate values for the record.
3. Commit all these records to the metadata table (a sketch of this flow follows).
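Putting the writer flow together, here is a hedged end-to-end sketch of steps 1-3; every type and helper below
(WrittenFile, ColumnRange, MetadataRecord, readFooterStats, commitToMetadataTable) is hypothetical scaffolding
rather than a Hudi API, and the key helpers come from the earlier sketches:

```java
import java.util.ArrayList;
import java.util.List;

public class ColumnStatsWriteFlow {
  public static void onCommit(List<WrittenFile> writtenFiles) {
    List<MetadataRecord> records = new ArrayList<>();
    for (WrittenFile f : writtenFiles) {
      String fileId = IndexIds.fileIndexId(f.fileName());
      String partitionId = IndexIds.partitionIndexId(f.partitionPath());
      // Step 1: Files partition - reverse mappings for FileIndexID => FileName lookups.
      records.add(MetadataRecord.fileMapping(fileId, f.fileName()));
      // Step 2: column_stats partition - one entry per column per file,
      // with stats pulled from the base file footer.
      for (ColumnRange c : readFooterStats(f)) {
        String key = IndexIds.columnIndexId(c.columnName()) + partitionId + fileId;
        records.add(MetadataRecord.columnStat(key, c.min(), c.max(), c.nullCount()));
      }
    }
    // Step 3: commit all records to the metadata table in one batch.
    commitToMetadataTable(records);
  }

  // --- hypothetical scaffolding, not Hudi APIs ---
  interface WrittenFile { String fileName(); String partitionPath(); }
  interface ColumnRange { String columnName(); String min(); String max(); long nullCount(); }
  interface MetadataRecord {
    static MetadataRecord fileMapping(String fileId, String fileName) { return null; }
    static MetadataRecord columnStat(String key, String min, String max, long nullCount) { return null; }
  }
  private static List<ColumnRange> readFooterStats(WrittenFile f) { return new ArrayList<>(); }
  private static void commitToMetadataTable(List<MetadataRecord> records) { }
}
```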
