diff --git a/rfc/rfc-27/col_stats.png b/rfc/rfc-27/col_stats.png
new file mode 100644
index 0000000000000..76aa6ab44e7c2
Binary files /dev/null and b/rfc/rfc-27/col_stats.png differ
diff --git a/rfc/rfc-27/rfc-27.md b/rfc/rfc-27/rfc-27.md
new file mode 100644
index 0000000000000..3b00af7c140a0
--- /dev/null
+++ b/rfc/rfc-27/rfc-27.md
@@ -0,0 +1,444 @@
+
+# RFC-27: Data Skipping Index to Improve Query Performance
+
+## Proposers
+
+- @manojpec
+- @shivnarayan
+- @satish.kotha
+
+## Approvers
+- @rmpifer
+- @uditme
+
+## Status
+
+JIRA: https://issues.apache.org/jira/browse/HUDI-1822
+
+> Please keep the status updated in `rfc/README.md`.
+
+## Abstract
+
+Query engines typically scan large amounts of irrelevant data during query planning and execution. Some workarounds
+are available to reduce the amount of irrelevant data scanned. These include:
+- Partition pruning
+- File pruning
+  - Some data file formats contain metadata, including range information for certain columns (for parquet, this
+    metadata is stored in the footer).
+  - As part of query planning, all range information from data files is read.
+  - Irrelevant data files are then pruned based on predicates and the available range information.
+
+Partition pruning typically puts the burden on users to select the partitions where the data may exist. The file
+pruning approach is expensive and does not scale when there are a large number of partitions and data files to be
+scanned. So we propose a new solution that stores additional information in the Hudi metadata table to implement a
+data skipping index. The goals of the data skipping index are to provide:
+
+- Global index: Users query for the information they need without having to specify partitions. The index can
+  efficiently find the data files in the table.
+- Improved query planning: Efficiently find the data files that have information matching the specified query
+  predicates.
+- Support for multiple types of index: The initial implementation may provide a range index, but the goal is a
+  flexible framework for implementing other types of index (e.g., bloom filters).
+
+## Background
+RFC-15 added metadata table support to Hudi for optimized file listing. RFC-37 is adding a metadata index and column
+stats as another partition in the metadata table. This RFC will piggyback on the column stats partition that RFC-37
+adds to the metadata table.
+
+Notes: The effectiveness of the index will be proportional to how the data is laid out. If every file contains data
+for a commonly specified query predicate, the index may not be very effective.
+
+## Implementation
+At a high level there are 3 components to implementing index support:
+- Storage format
+- Metadata generation
+- Query engine integration
+
+### Column_Stats Index/Partition
+We want to support multiple types of index (range, bloom, etc.), so it is important to generate different types of
+record for different columns. The focus of this RFC is the column range or column stats index, i.e., min/max values,
+null counts, etc. Users can configure the commonly queried columns, and the column stats partition in the metadata
+table will store all stats pertaining to the configured columns for every valid data file where the column is present.
+
+Similar to how we generate records for the files partition in the metadata table, we will generate a
+HoodieMetadataRecord for the column stats partition on any commit that gets applied to the metadata table. The basic
+building blocks of the metadata table used for file listing will be used for this column stats partition as well (how
+updates are applied to the metadata table, how invalid data is ignored, etc.).
+
+The column_stats partition stores statistics for all indexed columns in the Hudi data table. The index maintained in
+this partition enables predicate pushdown/data skipping, i.e., file filtering based on column predicates.
+
+For the purpose of column predicate filtering, this partition can store statistics for any column, as configured.
+
+So, the high-level requirement for this column_stats partition (pertaining to this RFC) is:
+ - Given a list of columns and predicates (and optionally partitions), return a list of matching file names
+
+### Storage format
+To cater to the above requirement, we plan to encode the column name, partition path and file name into the keys in
+HFile. Since HFile supports efficient range/prefix search, our lookup should be very fast, as illustrated below.
+
+![Column Stats Partition](col_stats.png)
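+
+As an illustration of why sorted keys make these lookups cheap, here is a minimal sketch that uses an in-memory
+sorted map as a stand-in for HFile's sorted key layout (the class name and key values are illustrative only):
+
+```java
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+public class PrefixScanDemo {
+  public static void main(String[] args) {
+    // Stand-in for an HFile: keys are sorted lexicographically, e.g. [ColumnID][PartitionID][FileID]
+    NavigableMap<String, String> index = new TreeMap<>();
+    index.put("col1part1fileA", "stats-A");
+    index.put("col1part1fileB", "stats-B");
+    index.put("col1part2fileC", "stats-C");
+    index.put("col2part1fileD", "stats-D");
+
+    // Prefix search for all entries of col1 in part1: one contiguous range read, no full scan
+    String prefix = "col1part1";
+    NavigableMap<String, String> hits =
+        index.subMap(prefix, true, prefix + Character.MAX_VALUE, false);
+    hits.forEach((k, v) -> System.out.println(k + " => " + v));
+  }
+}
+```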
+
+We plan to generate unique, random hash IDs for all 3 components (the sizes below correspond to 64-bit hashes for
+columns/partitions and a 128-bit hash for files, consistent with the key encoding later in this RFC):
+- ColumnID:
+    - base64(hash64(column name))
+    - on-disk size = 12 bytes per col_stat per file
+- PartitionID:
+    - base64(hash64(partition name))
+    - on-disk size = 12 bytes per partition
+- FileID:
+    - base64(hash128(file name))
+    - on-disk size = 24 bytes per file
+
+#### Design Choices for ID generation
+1. Incremental IDs: Sequentially increasing IDs can be generated in the context of the ongoing commit/write. The ID
+   can always start at 1, and to make the full ID unique enough, sequential IDs can be suffixed with the ongoing
+   commit time.
+   a. Pros:
+      - The ID is simple to generate and does not depend on key lookups to resume ID generation across writers.
+      - The overall ID can be shorter than hash-based IDs and can still be unique.
+      - Differential/delta encoding works well with sequential numbers and can achieve a high compression ratio
+        (though we did not see this in our tests).
+   b. Cons:
+      - The same column can be given several IDs across several commits spilled over several files. Complex merging
+        logic is needed to coalesce them all when looking up any column of interest.
+      - Does not play well with schema evolution. Even without schema evolution, changing IDs for the same column is
+        itself a small schema-evolution problem.
+
+2. Hash IDs: Hashing utilities can be used to generate unique, random IDs of any length for a given
+   column/partition/file name.
+   a. Pros:
+      - Deterministic name-to-ID generation.
+      - Reverse lookup of ID to name is possible with a relatively much smaller meta index read.
+      - The ID length can be controlled for scaling needs.
+      - Sharding and locality can be controlled by prefixing with more bits (doable with incremental IDs too).
+   b. Cons:
+      - Large-scale deployments demand a huge ID space for files, thereby needing 128-bit hashes. These are usually
+        32 hex characters, taking up at least 32 bytes/ID on disk. However, base64 encoding can help shave off a few
+        bytes and get them down to 24 bytes.
+      - Takes up more space in memory and on disk compared to sequential IDs. Theoretically, the compression ratio
+        should also be lower than with sequential IDs.
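+
+A minimal sketch of the hash-based ID generation described above, assuming Guava's Murmur3 as the hash function
+(truncated to 8 bytes for the 64-bit IDs) and padded URL-safe base64 output; the hash and encoding choices here are
+illustrative, not final:
+
+```java
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Base64;
+import com.google.common.hash.Hashing;
+
+public class HashIdGenerator {
+  // 64-bit hash -> 8 bytes -> 12 base64 chars, matching the 12-byte on-disk estimate
+  public static String columnId(String columnName) {
+    return base64(hash(columnName, 8));
+  }
+
+  public static String partitionId(String partitionName) {
+    return base64(hash(partitionName, 8));
+  }
+
+  // 128-bit hash -> 16 bytes -> 24 base64 chars, for the much larger file ID space
+  public static String fileId(String fileName) {
+    return base64(hash(fileName, 16));
+  }
+
+  private static byte[] hash(String name, int numBytes) {
+    byte[] full = Hashing.murmur3_128().hashString(name, StandardCharsets.UTF_8).asBytes();
+    return Arrays.copyOf(full, numBytes);
+  }
+
+  private static String base64(byte[] bytes) {
+    return Base64.getUrlEncoder().encodeToString(bytes);
+  }
+}
+```
+
+Because the hashing is deterministic, the same column name always maps to the same ID, which is the "deterministic
+name-to-ID generation" property listed above.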
+Key format in the column_stats partition:
+- [ColumnID][PartitionID][FileID]
+- [ColumnID]+"agg"+[PartitionID]
+
+The first type will be used to store one entry per column per file, and the second type will be used to store one
+aggregated entry per column per partition. These are fixed-size keys, so lookups do not have to search for ID
+delimiters as in the case of incremental IDs.
+
+These key encodings fit in well to serve our requirements. Since we are using HFile as the format, all keys are going
+to be sorted, and hence range reads will be very effective for our use case; the key format was chosen consciously
+with this in mind.
+
+Given a list of columns and optionally partitions, return a list of matching file names:
+
+1. We can do a prefix search of [ColumnID] or [ColumnID][PartitionID].
+   - If both the ColumnID and PartitionIDs are supplied, we will do a range read of [ColumnID][PartitionID].
+   - If the list of partitions is not available as part of the query, we will first look up [ColumnID]+"agg" to do a
+     prefix search for partition-level stats, filter for the partitions which match the predicates, and then proceed
+     as in the previous step.
+2. Fetch only the entries of interest for the [ColumnID][PartitionID] list.
+3. Look up the stats and filter for matching FileIDs.
+4. Reverse lookup in the files partition to get the FileID to FileName mapping.
+
+Note:
+As seen here, the reverse lookup of the FileID-to-FileName mapping has to go to the "files" partition to satisfy our
+requirement. So, the "files" partition will carry additional entries for FileID-to-FileName mappings, written on the
+write path.
+
+#### Sharding:
+Any partition in the metadata table needs to be instantiated with N file groups/shards upfront. The "files" partition
+is small, and hence we went with just one file group. But for a record-level index, we can't go with a single file
+group and have to shard the data. We will employ some kind of hashing mechanism for the key-to-file-group mapping. On
+the write path, entries will be sharded and written to different file groups. On the read path, the key to be looked
+up will be hashed to find the right file group to look in. For a wildcard search, all file groups will be looked up.
+A minimal sketch of this mapping follows.
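+
+Here is a minimal sketch of the key-to-file-group mapping, assuming a simple hash-modulo scheme over a fixed file
+group count (the actual hashing mechanism, as noted above, is still to be decided; the class name is illustrative):
+
+```java
+public class FileGroupSharding {
+  private final int numFileGroups;
+
+  public FileGroupSharding(int numFileGroups) {
+    this.numFileGroups = numFileGroups;
+  }
+
+  // Used on the write path to route a record to a shard, and on the read path to
+  // locate the file group holding a key. Math.floorMod keeps the result
+  // non-negative even when the hash code is negative.
+  public int fileGroupFor(String recordKey) {
+    return Math.floorMod(recordKey.hashCode(), numFileGroups);
+  }
+}
+```
+
+For a wildcard search, file groups 0..N-1 would all be scanned, since any of them may hold matching keys.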
+// To be revisited.
+We plan to instantiate the number of file groups in the column stats partition based on the number of columns being
+indexed. We can't estimate upfront the data scale to which the table might eventually grow, and hence have to go with
+some estimates. A rough idea is to instantiate one file group for every 10 columns being indexed, or to get a rough
+input from the user on whether the table will be small/medium/large scale and decide based on that.
+
+### Metadata generation
+The existing metadata payload schema will be extended and shared for this new "column_stats" partition as well. The
+type field will be used to detect the column stats payload record. Here is the schema for the column stats payload
+record.
+
+```
+ "namespace": "org.apache.hudi.avro.model",
+ "type": "record",
+ "name": "HoodieMetadataRecord",
+ "doc": "A record saved within the Metadata Table",
+ "fields": [
+     {
+         "name": "key",
+         "type": "string"
+     },
+     {
+         "name": "type",
+         "doc": "Type of the metadata record",
+         "type": "int"
+     },
+     {   "name": "filesystemMetadata",
+         .
+         .
+         .
+     },
+     {
+         "name": "ColumnStatsMetadata",
+         "doc": "Contains information about column statistics for all data files in the table",
+         "type": [
+             "null",
+             {
+                 "type": "record",
+                 "name": "HoodieColumnStats",
+                 "fields": [
+                     {
+                         "name": "rangeLow",
+                         "type": ["null", "bytes"],
+                         "doc": "Low end of the range. For now this is stored as bytes; based on the main data table schema, it can be converted to the appropriate type"
+                     },
+                     {
+                         "name": "rangeHigh",
+                         "type": ["null", "bytes"],
+                         "doc": "High end of the range. For now this is stored as bytes; based on the main data table schema, it can be converted to the appropriate type"
+                     },
+                     {
+                         "name": "total_values",
+                         "type": ["long", "null"],
+                         "doc": "Stores the total number of values for this column in the respective data file"
+                     },
+                     {
+                         "name": "total_nulls",
+                         "type": ["long", "null"],
+                         "doc": "Stores the total number of null values for this column in the respective data file"
+                     },
+                     {
+                         "name": "total_nans",
+                         "type": ["long", "null"],
+                         "doc": "Stores the total number of NaN values for this column in the respective data file"
+                     },
+                     {
+                         "name": "total_size_on_disk",
+                         "type": ["long", "null"],
+                         "doc": "Stores the total size occupied by this column on disk in the respective data file"
+                     },
+                     {
+                         "name": "isDeleted",
+                         "type": "boolean",
+                         "doc": "True if this file has been deleted"
+                     }
+                 ]
+             }
+         ]
+     }
+```
+
+Column stats records hold all stats for the file. The key for a column stats record would be an encoded string as
+discussed earlier.
+
+```
+key = base64_encode(hash64(column name) + hash64(partition name) + hash128(file path))
+key = base64_encode(hash64(column name) + "agg" + hash64(partition name))
+```
+
+While hash-based IDs have quite a few desirable properties in the context of Hudi index lookups, they do have an
+impact on column-level schema changes. Refer to the [Schema Evolution](#Schema-Evolution) section for more details.
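+
+Tying this back to the earlier HashIdGenerator sketch (a hypothetical helper introduced above, not part of the
+design), the two key forms could be composed as follows:
+
+```java
+public class ColumnStatsKeys {
+  // One entry per column per file: [ColumnID][PartitionID][FileID]
+  public static String fileLevelKey(String column, String partition, String filePath) {
+    return HashIdGenerator.columnId(column)
+        + HashIdGenerator.partitionId(partition)
+        + HashIdGenerator.fileId(filePath);
+  }
+
+  // One aggregated entry per column per partition: [ColumnID]"agg"[PartitionID]
+  public static String partitionAggKey(String column, String partition) {
+    return HashIdGenerator.columnId(column) + "agg" + HashIdGenerator.partitionId(partition);
+  }
+}
+```
+
+Because the IDs are fixed width, all file-level keys for a given column (or column + partition) share a common prefix
+and sort contiguously, which is what makes the HFile prefix scans described earlier effective.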
+
+#### Writer flow
+Let's walk through the writer flow to update the column_stats partition in the metadata table.
+
+1. Files partition - prepare records to add (here we call out only what is required in the context of the
+   column_stats partition; in general, the files partition will be updated as usual to store file listing information)
+   - FileID => FileName mapping entries
+   - PartitionID => PartitionName entry, if one does not already exist
+   - Since these are hash-based IDs, no lookup of prior usage is required here. Otherwise, we would need to know the
+     last assigned ID before assigning new incremental/sequential IDs, which would slow down writes significantly.
+2. Column_stats partition - prepare records to add
+   - [ColumnID][PartitionID][FileID] => ColumnStat
+   - [ColumnID]"agg"[PartitionID] => ColumnStat
+   - This involves reading the base file footers to fetch min/max and other stats to populate the record values.
+3. Commit all these records to the metadata table.
+
+We need to ensure we have all the required information in the WriteStatus/commit metadata that gets passed to the
+metadata writer for every commit. Reading parquet footers and metadata is unavoidable (a sketch follows), but other
+than that, we should try to embed all other information in the WriteStatus.
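+
+As a reference for the footer read in step 2, here is a minimal sketch of pulling per-column min/max and null counts
+out of a parquet footer using the parquet-mr metadata APIs; the helper class name is illustrative, and error handling
+and type conversion are omitted:
+
+```java
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+
+public class FooterStatsReader {
+  public static void printColumnStats(Configuration conf, Path file) throws IOException {
+    try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(file, conf))) {
+      ParquetMetadata footer = reader.getFooter();
+      // Footer statistics are kept per row group; a real writer would merge them
+      // into a single file-level ColumnStat record per column.
+      for (BlockMetaData rowGroup : footer.getBlocks()) {
+        for (ColumnChunkMetaData column : rowGroup.getColumns()) {
+          Statistics<?> stats = column.getStatistics();
+          System.out.println(column.getPath().toDotString()
+              + " min=" + stats.genericGetMin()
+              + " max=" + stats.genericGetMax()
+              + " nulls=" + stats.getNumNulls()
+              + " values=" + column.getValueCount());
+        }
+      }
+    }
+  }
+}
+```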
+
+### Index integrations with query engines
+
+#### Spark
+We already added support for z-ordering in 0.10.0, so we will re-use the data skipping code paths from there.
+
+Here is the high-level flow of z-ordering:
+##### Write path
+1. Sort the data (Z-order/Hilbert/Linear)
+   - Triggered by clustering (right now)
+   - RDDSpatialCurveOptimizationSortPartitioner
+2. Build the "col stats" index (.hoodie/.colstatsindex)
+   - Upon clustering completion we invoke ColumnStatsIndexHelper.updateColumnStatsIndexFor
+
+##### Read path
+1. (Spark SQL) Asks for the list of files to fetch data from
+   - HoodieFileIndex.listFiles
+2. HoodieFileIndex reads the col stats index and applies the data predicates to it to fetch the list of candidate
+   files
+3. Returns the list back to Spark
+
+Given this, let's see how we can integrate the new column_stats partition.
+
+##### Z-order Write path
+1. Sort the data (Z-order/Hilbert/Linear)
+   - Triggered by clustering (right now)
+   - RDDSpatialCurveOptimizationSortPartitioner
+2. Do nothing extra.
+   - Upon clustering completion, the replace commit will get applied to the metadata table by default if the metadata
+     table is enabled.
+
+##### Read path
+1. (Spark SQL) Asks for the list of files to fetch data from
+   - HoodieFileIndex.listFiles
+2. HoodieFileIndex reads the col stats partition in the metadata table and applies the data predicates to it to fetch
+   the list of candidate files
+3. Returns the list back to Spark
+
+One caveat: we can't get rid of the z-order index completely right away. If the metadata table is not built out yet,
+or has entered an inconsistent state and is not usable, we have to fall back to the existing way of building the
+index at the end of z-order clustering.
+
+### Predicate filtering
+
+#### How to apply query predicates in Hudi?
+Query predicates are normally constructed in a tree-like structure, so this will follow the same pattern. The
+proposal is to create a mapping utility from "engine" query predicates to a HudiExpression. This way the filtering
+logic is engine agnostic.
+
+For the AND and OR operators we can translate to a tree node with left and right expressions. An example of what the
+structure would look like is shown below:
+
+```java
+public class HudiExpressionParentNode implements HudiExpression {
+  HudiExpression left;
+  HudiExpression right;
+
+  @Override
+  public boolean evaluate() {
+    // Example for an AND node; an OR node would combine with || instead
+    return left.evaluate() && right.evaluate();
+  }
+}
+```
+
+For LEAF nodes we can create an expression which contains the operator and the value we are comparing, to determine
+whether the file group may have data relevant to this query. The common search expressions for the leaf nodes:
+
+1. Equal to - if the value in the search expression is greater than or equal to the lower bound and less than or
+   equal to the upper bound in the file's column statistics, then true, else false
+2. Less than - if the value in the search expression is greater than the lower bound in the file's column statistics,
+   then true, else false
+3. Less than or equal to - if the value in the search expression is greater than or equal to the lower bound in the
+   file's column statistics, then true, else false
+4. Greater than - if the value in the search expression is lower than the upper bound in the file's column
+   statistics, then true, else false
+5. Greater than or equal to - if the value in the search expression is lower than or equal to the upper bound in the
+   file's column statistics, then true, else false
+
+True tells us that there is a possibility that the file contains data which matches the search expression, and to
+include it in the result set. False tells us that there is no possibility that this file contains any data which
+matches the search expression, and to exclude it from the results.
+
+```java
+public class HudiExpressionLeafNode<T extends Comparable<T>> implements HudiExpression {
+
+  Operator op;  // (EQ, LT, LTEQ, GT, GTEQ)
+  T literal;    // (INT, DOUBLE, FLOAT value)
+  String column;
+  T min;        // lower bound from the file's column statistics
+  T max;        // upper bound from the file's column statistics
+
+  @Override
+  public boolean evaluate() {
+    return ColumnStatsEvaluator.mayMatch(op, literal, min, max); // sketched below
+  }
+}
+```
+
+This way we can call evaluate on the root HudiExpression tree and it will determine whether the entire expression is
+satisfied for the file group. A sketch of the leaf evaluation follows.
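+
+Here is a minimal sketch of the leaf evaluation rules above, comparing the search literal against the min/max bounds
+from the file's column statistics. It assumes the Operator enum from the leaf node above; ColumnStatsEvaluator is an
+illustrative helper name, not part of the design:
+
+```java
+public final class ColumnStatsEvaluator {
+
+  // Returns true if a file whose column statistics span [min, max] MAY contain rows
+  // matching (column <op> literal); false means the file can safely be skipped.
+  public static <T extends Comparable<T>> boolean mayMatch(Operator op, T literal, T min, T max) {
+    switch (op) {
+      case EQ:   return literal.compareTo(min) >= 0 && literal.compareTo(max) <= 0;
+      case LT:   return literal.compareTo(min) > 0;  // rows below literal exist only if min < literal
+      case LTEQ: return literal.compareTo(min) >= 0;
+      case GT:   return literal.compareTo(max) < 0;  // rows above literal exist only if max > literal
+      case GTEQ: return literal.compareTo(max) <= 0;
+      default:   return true; // unknown operator: conservatively include the file
+    }
+  }
+}
+```
+
+For example, the predicate `x = 42` against a file whose statistics for `x` are min=50, max=90 evaluates to false, so
+the file is pruned from the candidate set.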
+
+#### Hive
+In order to implement predicate pushdown in Hive, we need access to the query predicate, which is not passed to the
+InputFormat by default. The HiveStoragePredicateHandler interface needs to be implemented to provide the query
+predicate to the InputFormat, and for this we need to create a custom HiveStorageHandler. Therefore we will be
+creating a new storage handler, HudiStorageHandler.
+
+```java
+public interface HiveStorageHandler extends Configurable {
+  public Class<? extends InputFormat> getInputFormatClass();
+  public Class<? extends OutputFormat> getOutputFormatClass();
+  public Class<? extends SerDe> getSerDeClass();
+  public HiveMetaHook getMetaHook();
+  public void configureTableJobProperties(
+      TableDesc tableDesc,
+      Map<String, String> jobProperties);
+}
+```
+
+Everything will remain the same, with the input format, output format, and SerDe classes used by existing Hudi tables
+registered in Hive still in place (HoodieParquetInputFormat is still used). HudiStorageHandler would implement
+HiveStorageHandler and HiveStoragePredicateHandler.
+
+Hive adds the query predicate returned by the storage handler to the job configuration, and this job configuration is
+then supplied to the InputFormat. It can be fetched and deserialized as follows:
+
+```java
+  String hiveFilter = jobConf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
+  if (hiveFilter != null) {
+    ExprNodeGenericFuncDesc exprNodeDesc = SerializationUtilities
+        .deserializeObject(hiveFilter, ExprNodeGenericFuncDesc.class);
+    SearchArgument sarg = ConvertAstToSearchArg.create(job, exprNodeDesc);
+  }
+```
+
+The SearchArgument contains an ExpressionTree and a list of PredicateLeaf. The ExpressionTree is a tree structure
+used to define the query predicate. If the operator is defined as OR, AND, or NOT, this indicates there are children
+expressions, normally LEAFs.
+
+```java
+public class ExpressionTree {
+  public enum Operator {OR, AND, NOT, LEAF, CONSTANT}
+  private final Operator operator;
+  private final List<ExpressionTree> children;
+  private int leaf;
+```
+
+If the operator in an ExpressionTree is defined as LEAF, it corresponds to a PredicateLeaf defined in the
+SearchArgument. The PredicateLeaf will contain information about the query predicate, such as the operator, column
+name, and literal which is being compared:
+
+```java
+  private final org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf.Operator operator;
+  private final Type type;
+  private String columnName;
+  private final Object literal;
+  private final List<Object> literalList;
+```
+
+We can use this information and the SearchArgument to generate our HudiExpression. Then, in
+HoodieParquetInputFormat.listStatus(), after fetching files from the FileSystemView, we can apply the HudiExpression
+to the remaining file groups using the column metadata.
+
+#### Presto
+To be filled.
+
+## Rollout/Adoption Plan
+
+- What impact (if any) will there be on existing users?
+- If we are changing behavior, how will we phase out the older behavior?
+- If we need special migration tools, describe them here.
+- When will we remove the existing behavior?
+
+## Test Plan
+
+Describe in a few sentences how the RFC will be tested. How will we know that the implementation works as expected?
+How will we know nothing broke?
\ No newline at end of file