diff --git a/rfc/rfc-46/rfc-46.md b/rfc/rfc-46/rfc-46.md
index a851a4443ae6e..192bdbf8c6ca1 100644
--- a/rfc/rfc-46/rfc-46.md
+++ b/rfc/rfc-46/rfc-46.md
@@ -38,7 +38,7 @@ when dealing with records (during merge, column value extractions, writing into
 
 While having a single format of the record representation is certainly making implementation of some components simpler,
 it bears unavoidable performance penalty of de-/serialization loop: every record handled by Hudi has to be converted
-from (low-level) engine-specific representation (`Row` for Spark, `RowData` for Flink, `ArrayWritable` for Hive) into intermediate
+from (low-level) engine-specific representation (`InternalRow` for Spark, `RowData` for Flink, `ArrayWritable` for Hive) into intermediate
 one (Avro), with some operations (like clustering, compaction) potentially incurring this penalty multiple times (on read- and
 write-paths).
 
@@ -84,59 +84,105 @@ is known to have poor performance (compared to non-reflection based instantiatio
 
 #### Record Merge API
 
-Stateless component interface providing for API Combining Records will look like following:
+`combineAndGetUpdateValue` and `preCombine` will converge into a single API. The stateless component interface providing the record-combining API will look like the following:
 
 ```java
-interface HoodieMerge {
-  HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer);
-
-  Option combineAndGetUpdateValue(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException;
-}
+interface HoodieRecordMerger {
   /**
-   * Spark-specific implementation
+   * The kind of merging strategy this RecordMerger belongs to. The strategy is identified by a UUID.
    */
-  class HoodieSparkRecordMerge implements HoodieMerge {
+  String getMergingStrategy();
+
+  // This method converges combineAndGetUpdateValue and preCombine from HoodieRecordPayload.
+  // It has to be an associative operation: f(a, f(b, c)) = f(f(a, b), c), i.e. given 3 versions
+  // A, B, C of the same record, both orders of application have to yield the same result.
+  Option merge(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException;
+
+  // The record type handled by this merger: SPARK, AVRO or FLINK.
+  HoodieRecordType getRecordType();
+}
 
-  @Override
-  public HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer) {
-    // HoodieSparkRecords preCombine
-  }
+/**
+ * Spark-specific implementation
+ */
+class HoodieSparkRecordMerger implements HoodieRecordMerger {
+
+  @Override
+  public String getMergingStrategy() {
+    return UUID_MERGER_STRATEGY;
+  }
+
+  @Override
+  public Option merge(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException {
+    // Merges two HoodieSparkRecords, covering both the legacy preCombine and
+    // combineAndGetUpdateValue flows; has to be associative.
+  }
 
-  @Override
-  public Option combineAndGetUpdateValue(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) {
-    // HoodieSparkRecord combineAndGetUpdateValue
-  }
+  @Override
+  public HoodieRecordType getRecordType() {
+    return HoodieRecordType.SPARK;
   }
 }
 
-  /**
-   * Flink-specific implementation
-   */
-  class HoodieFlinkRecordMerge implements HoodieMerge {
-
-    @Override
-    public HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer) {
-      // HoodieFlinkRecord preCombine
-    }
+/**
+ * Flink-specific implementation
+ */
+class HoodieFlinkRecordMerger implements HoodieRecordMerger {
+
+  @Override
+  public String getMergingStrategy() {
+    return UUID_MERGER_STRATEGY;
+  }
+
+  @Override
+  public Option merge(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException {
+    // Merges two HoodieFlinkRecords, covering both the legacy preCombine and
+    // combineAndGetUpdateValue flows; has to be associative.
+  }
 
-    @Override
-    public Option combineAndGetUpdateValue(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) {
-      // HoodieFlinkRecord combineAndGetUpdateValue
-    }
+  @Override
+  public HoodieRecordType getRecordType() {
+    return HoodieRecordType.FLINK;
   }
 }
 ```
 
 Where user can provide their own subclass implementing such interface for the engines of interest.
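+
+For illustration, a user-defined merger that simply keeps the latest non-deleted version of a record could be sketched as
+follows (the class name and the strategy UUID below are hypothetical, chosen only for this example):
+
+```java
+/**
+ * Illustrative sketch of a user-defined merger: the newest version of a record
+ * always wins, and a delete yields an empty result. Trivially associative.
+ */
+class KeepLatestSparkRecordMerger implements HoodieRecordMerger {
+
+  // Hypothetical UUID chosen by the user to identify this merging strategy.
+  private static final String KEEP_LATEST_MERGER_STRATEGY = "00000000-0000-0000-0000-000000000000";
+
+  @Override
+  public String getMergingStrategy() {
+    return KEEP_LATEST_MERGER_STRATEGY;
+  }
+
+  @Override
+  public Option merge(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException {
+    // Keep the newer version unconditionally; an empty Option signals a delete.
+    return newer.isDelete(schema, props) ? Option.empty() : Option.of(newer);
+  }
+
+  @Override
+  public HoodieRecordType getRecordType() {
+    return HoodieRecordType.SPARK;
+  }
+}
+```
+
+Returning an empty `Option` from `merge` is interpreted as a delete of the record, mirroring the legacy `combineAndGetUpdateValue` contract.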
 
-#### Migration from `HoodieRecordPayload` to `HoodieMerge`
+#### Migration from `HoodieRecordPayload` to `HoodieRecordMerger`
 
 To warrant backward-compatibility (BWC) on the code-level with already created subclasses of `HoodieRecordPayload` currently
-already used in production by Hudi users, we will provide a BWC-bridge in the form of instance of `HoodieMerge`, that will
+already used in production by Hudi users, we will provide a BWC bridge in the form of a `HoodieRecordMerger` implementation called `HoodieAvroRecordMerger`, which will
 be using user-defined subclass of `HoodieRecordPayload` to combine the records.
 
-Leveraging such bridge will make provide for seamless BWC migration to the 0.11 release, however will be removing the performance
+Leveraging such a bridge will provide for seamless BWC migration to the 0.11 release; however, it will remove the performance
 benefit of this refactoring, since it would unavoidably have to perform conversion to intermediate representation (Avro).
 
 To realize full-suite of benefits of this refactoring, users will have to migrate their merging logic out of `HoodieRecordPayload` subclass and into
-new `HoodieMerge` implementation.
+a new `HoodieRecordMerger` implementation.
+
+`preCombine` is used to merge records from log files or from the incoming batch, while `combineAndGetUpdateValue` is used to merge
+a record from a log file with a record from the base file. These two merge flows are unified in `HoodieAvroRecordMerger`'s `merge`
+function. `HoodieAvroRecordMerger`'s API will look like the following:
+
+```java
+/**
+ * Backward-compatibility bridge that delegates to the user's HoodieRecordPayload implementation.
+ */
+class HoodieAvroRecordMerger implements HoodieRecordMerger {
+
+  @Override
+  public String getMergingStrategy() {
+    return UUID_MERGER_STRATEGY;
+  }
+
+  @Override
+  public Option merge(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException {
+    // Delegates to the user's HoodieRecordPayload preCombine and combineAndGetUpdateValue;
+    // has to be associative.
+  }
+
+  @Override
+  public HoodieRecordType getRecordType() {
+    return HoodieRecordType.AVRO;
+  }
+}
+```
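+
+For illustration, the bridge's `merge` could be expressed in terms of the legacy payload API roughly as follows. This is a
+sketch only: it assumes the Avro-based `HoodieRecord` exposes its `HoodieRecordPayload` via `getData()`, and the re-wrapping
+helper at the end is hypothetical.
+
+```java
+class HoodieAvroRecordMerger implements HoodieRecordMerger {
+
+  // getMergingStrategy() and getRecordType() as shown in the previous section.
+
+  @Override
+  public Option merge(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException {
+    HoodieRecordPayload olderPayload = (HoodieRecordPayload) older.getData();
+    HoodieRecordPayload newerPayload = (HoodieRecordPayload) newer.getData();
+
+    // Legacy preCombine flow: pick the "winning" version among the two deltas.
+    HoodieRecordPayload winner = newerPayload.preCombine(olderPayload);
+    if (winner == olderPayload) {
+      return Option.of(older);
+    }
+
+    // Legacy combineAndGetUpdateValue flow: apply the winning delta on top of the older value.
+    IndexedRecord olderAvro = (IndexedRecord) older.toIndexedRecord(schema, props).get();
+    return winner.combineAndGetUpdateValue(olderAvro, schema, props)
+        .map(combined -> wrapIntoHoodieRecord((IndexedRecord) combined, older));
+  }
+
+  // Hypothetical helper: re-wraps the combined Avro value into a HoodieRecord,
+  // carrying over the key of the original record (implementation detail omitted).
+  private HoodieRecord wrapIntoHoodieRecord(IndexedRecord combined, HoodieRecord original) {
+    throw new UnsupportedOperationException("sketch only");
+  }
+}
+```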
 
 ### Refactoring Flows Directly Interacting w/ Records:
 
@@ -156,13 +202,66 @@ Following major components will be refactored:
 3. `HoodieRealtimeRecordReader`s
    1. API will be returning opaque `HoodieRecord` instead of raw Avro payload
+
+### Config for `RecordMerger`
+
+The `RecordMerger` is engine-aware. We provide a config called MERGER_IMPLS, which accepts a list of `RecordMerger` class names,
+and a config called MERGER_STRATEGY, which holds the UUID of the desired merging strategy. At runtime, Hudi will pick, among the
+classes listed in MERGER_IMPLS, the `RecordMerger` that implements the configured MERGER_STRATEGY and matches the record type of
+the engine in use.
+
+### Public API in `HoodieRecord`
+
+Because we now implement different types of records, functionality similar to AvroUtils has to be provided by `HoodieRecord`
+itself for each payload representation (Avro, `InternalRow`, `RowData`). Its public API will look like the following:
+
+```java
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.avro.Schema;
+
+class HoodieRecord {
+
+  /**
+   * Extracts values of the given columns from the record (used e.g. by RDDCustomColumnsSortPartitioner).
+   */
+  Object getRecordColumnValues(Schema recordSchema, String[] columns,
+      boolean consistentLogicalTimestampEnabled);
+
+  /**
+   * Merges this record with another one (supports bootstrap).
+   */
+  HoodieRecord mergeWith(HoodieRecord other, Schema targetSchema) throws IOException;
+
+  /**
+   * Rewrites the record into the target schema (e.g. adding meta columns).
+   */
+  HoodieRecord rewriteRecord(Schema recordSchema, Properties props, Schema targetSchema)
+      throws IOException;
+
+  /**
+   * Rewrites the record into the new schema (supports schema evolution).
+   */
+  HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema,
+      Map<String, String> renameCols) throws IOException;
+
+  HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema) throws IOException;
+
+  HoodieRecord updateMetadataValues(Schema recordSchema, Properties props,
+      MetadataValues metadataValues) throws IOException;
+
+  boolean isDelete(Schema recordSchema, Properties props) throws IOException;
+
+  /**
+   * Whether this is an "empty" record that should be ignored (e.g. generated by ExpressionPayload).
+   */
+  boolean shouldIgnore(Schema recordSchema, Properties props) throws IOException;
+
+  Option toIndexedRecord(Schema schema, Properties props)
+      throws IOException;
+
+  // Other functions with getters and setters ...
+}
+```
 
 ## Rollout/Adoption Plan
 
 - What impact (if any) will there be on existing users?
   - Users of the Hudi will observe considerably better performance for most of the routine operations: writing, reading, compaction,
     clustering, etc due to avoiding the superfluous intermediate de-/serialization penalty
   - By default, modified hierarchy would still leverage 
-  - Users will need to rebase their logic of combining records by creating a subclass of `HoodieRecordPayload`, and instead subclass newly created interface `HoodieMerge` to get full-suite of performance benefits
+  - Users who currently combine records by subclassing `HoodieRecordPayload` will need to rebase that logic onto the newly created `HoodieRecordMerger` interface to get the full suite of performance benefits
 - If we are changing behavior how will we phase out the older behavior?
   - Older behavior leveraging `HoodieRecordPayload` for merging will be marked as deprecated in 0.11, and subsequently removed in 0.1x
 - If we need special migration tools, describe them here.
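+
+As a concrete illustration of the "Config for `RecordMerger`" section above, a writer could be wired up roughly as follows.
+The config keys and class names are hypothetical placeholders: this RFC refers to the configs only abstractly as MERGER_IMPLS
+and MERGER_STRATEGY, and both listed mergers are assumed to implement the same strategy UUID for different record types.
+
+```java
+import java.util.Properties;
+
+public class MergerConfigExample {
+  public static void main(String[] args) {
+    Properties props = new Properties();
+
+    // MERGER_IMPLS: candidate mergers. At runtime Hudi picks the one whose record
+    // type matches the engine (e.g. SPARK) and whose strategy UUID matches
+    // MERGER_STRATEGY below. Key names are hypothetical placeholders.
+    props.setProperty("hoodie.merger.impls",
+        "com.example.KeepLatestSparkRecordMerger,com.example.KeepLatestAvroRecordMerger");
+
+    // MERGER_STRATEGY: UUID of the desired merging strategy.
+    props.setProperty("hoodie.merger.strategy", "00000000-0000-0000-0000-000000000000");
+  }
+}
+```
+
+With such a setup, a Spark pipeline would resolve `KeepLatestSparkRecordMerger`, while an engine without Spark-typed records
+available could fall back to the Avro-typed merger implementing the same strategy.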