
While having a single format of the record representation certainly makes the implementation of some components simpler,
it bears the unavoidable performance penalty of a de-/serialization loop: every record handled by Hudi has to be converted
from its (low-level) engine-specific representation (`InternalRow` for Spark, `RowData` for Flink, `ArrayWritable` for Hive) into an intermediate
one (Avro), with some operations (like clustering and compaction) potentially incurring this penalty multiple times (on the read-
and write-paths).


#### Record Merge API

`combineAndGetUpdateValue` and `preCombine` will converge into a single API. The stateless component interface providing the record-combining API will look like the following:

```java
interface HoodieRecordMerger {

  /**
   * The kind of merging strategy this RecordMerger belongs to. A UUID identifies the merging strategy.
   */
  String getMergingStrategy();

  // This method converges combineAndGetUpdateValue and preCombine from HoodieRecordPayload.
  // It has to be an associative operation: f(a, f(b, c)) = f(f(a, b), c) (i.e., given 3 versions
  // A, B, C of a single record, both orders of applying the operation have to yield the same result).
  Option<HoodieRecord> merge(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException;

  // The record type handled by the current merger: SPARK, AVRO or FLINK.
  HoodieRecordType getRecordType();
}

/**
 * Spark-specific implementation.
 */
class HoodieSparkRecordMerger implements HoodieRecordMerger {

  @Override
  public String getMergingStrategy() {
    return UUID_MERGER_STRATEGY;
  }

  @Override
  public Option<HoodieRecord> merge(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException {
    // Unifies HoodieSparkRecord's preCombine and combineAndGetUpdateValue; has to be associative.
  }

  @Override
  public HoodieRecordType getRecordType() {
    return HoodieRecordType.SPARK;
  }
}

/**
 * Flink-specific implementation.
 */
class HoodieFlinkRecordMerger implements HoodieRecordMerger {

  @Override
  public String getMergingStrategy() {
    return UUID_MERGER_STRATEGY;
  }

  @Override
  public Option<HoodieRecord> merge(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException {
    // Unifies HoodieFlinkRecord's preCombine and combineAndGetUpdateValue; has to be associative.
  }

  @Override
  public HoodieRecordType getRecordType() {
    return HoodieRecordType.FLINK;
  }
}
```

> Review note: see #5629 (comment) — we should allow users to implement a single `RecordMerger` object supporting every engine type.
Users can provide their own implementation of this interface for the engines of interest.
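
As an illustration, consider a hypothetical user-defined merger implementing a "latest ordering value wins" policy, which satisfies the associativity requirement because taking a maximum is associative. This is a sketch only: the `getOrderingValue()` accessor and the concrete strategy UUID are assumptions made for the example, not part of the RFC's API.

```java
/**
 * Hypothetical "latest ordering value wins" merger (sketch, not part of the RFC).
 */
class LatestWinsRecordMerger implements HoodieRecordMerger {

  // Assumed: a fixed UUID identifying this merging strategy.
  private static final String LATEST_WINS_STRATEGY = "8c2f0d3e-0000-0000-0000-000000000001";

  @Override
  public String getMergingStrategy() {
    return LATEST_WINS_STRATEGY;
  }

  @Override
  @SuppressWarnings("unchecked")
  public Option<HoodieRecord> merge(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) {
    // Assumed accessor: every record exposes a Comparable ordering value (e.g. an event-time field).
    Comparable<Object> olderOrdering = (Comparable<Object>) older.getOrderingValue();
    Comparable<Object> newerOrdering = (Comparable<Object>) newer.getOrderingValue();
    // Picking the maximum ordering value is associative: max(a, max(b, c)) == max(max(a, b), c),
    // so merging versions A, B, C in either grouping yields the same winner (ties go to `newer`).
    return olderOrdering.compareTo(newerOrdering) > 0 ? Option.of(older) : Option.of(newer);
  }

  @Override
  public HoodieRecordType getRecordType() {
    return HoodieRecordType.AVRO;
  }
}
```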

#### Migration from `HoodieRecordPayload` to `HoodieRecordMerger`

To warrant backward compatibility (BWC) on the code level with the subclasses of `HoodieRecordPayload` already used in
production by Hudi users, we will provide a BWC bridge in the form of an instance of `HoodieRecordMerger` called `HoodieAvroRecordMerger`,
which will be using the user-defined subclass of `HoodieRecordPayload` to combine the records.

Leveraging such a bridge will provide for a seamless BWC migration to the 0.11 release; however, it will forfeit the performance
benefit of this refactoring, since it would unavoidably have to perform the conversion to the intermediate representation (Avro). To realize
the full suite of benefits of this refactoring, users will have to migrate their merging logic out of their `HoodieRecordPayload` subclass and into
a new `HoodieRecordMerger` implementation.

`preCombine` is used to merge log records with one another or with incoming records; `combineAndGetUpdateValue` is used to merge a record from a log file with a record from a base file.
These two merging flows are unified in `HoodieAvroRecordMerger` as the `merge` function. `HoodieAvroRecordMerger`'s API will look like the following:

```java
/**
 * Backward-compatibility bridge merging records via the user-defined HoodieRecordPayload implementation.
 */
class HoodieAvroRecordMerger implements HoodieRecordMerger {

  @Override
  public String getMergingStrategy() {
    return UUID_MERGER_STRATEGY;
  }

  @Override
  public Option<HoodieRecord> merge(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException {
    // Unifies HoodieRecordPayload's preCombine and combineAndGetUpdateValue; has to be associative.
  }

  @Override
  public HoodieRecordType getRecordType() {
    return HoodieRecordType.AVRO;
  }
}
```
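
For intuition, below is a minimal sketch (not the actual implementation) of how the bridge's `merge` could delegate to the user-defined payload. It assumes both inputs are Avro-backed records exposing their `HoodieRecordPayload` via `getData()`, and it uses a hypothetical helper `wrapPayload` to re-wrap the merged Avro value into a `HoodieRecord`.

```java
// Sketch only: how HoodieAvroRecordMerger#merge could bridge to HoodieRecordPayload.
@Override
public Option<HoodieRecord> merge(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException {
  HoodieRecordPayload olderPayload = ((HoodieAvroRecord<?>) older).getData();
  HoodieRecordPayload newerPayload = ((HoodieAvroRecord<?>) newer).getData();

  // Legacy flow 1: preCombine picks the winner between two competing versions.
  HoodieRecordPayload winner = newerPayload.preCombine(olderPayload);
  if (winner == olderPayload) {
    return Option.of(older);
  }

  // Legacy flow 2: combineAndGetUpdateValue merges the winner against the stored (older) value.
  Option<IndexedRecord> olderAvro = olderPayload.getInsertValue(schema);
  if (!olderAvro.isPresent()) {
    return Option.of(newer);
  }
  return winner.combineAndGetUpdateValue(olderAvro.get(), schema, props)
      .map(mergedAvro -> wrapPayload(older.getKey(), mergedAvro)); // wrapPayload: hypothetical helper
}
```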

### Refactoring Flows Directly Interacting w/ Records:

Following major components will be refactored:
3. `HoodieRealtimeRecordReader`s
   1. API will be returning opaque `HoodieRecord` instead of raw Avro payload

### Config for RecordMerger
The `RecordMerger` is engine-aware. We provide a config called `MERGER_IMPLS`, which takes a list of `RecordMerger` class names, and a config called `MERGER_STRATEGY`, which holds the ID of the merging strategy. At runtime, Hudi will pick from `MERGER_IMPLS` the `RecordMerger` that has the same merging-strategy ID as `MERGER_STRATEGY` and matches the engine type, as sketched below.
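
A sketch of how that runtime selection could look; the config key names in the comment and the fallback to the BWC bridge are assumptions for the example, as the RFC does not pin down the exact keys or fallback behavior.

```java
import java.util.List;

// Sketch of runtime merger selection (assumed config keys, for illustration):
//   hoodie.merger.impls    = com.example.HoodieSparkRecordMerger,com.example.HoodieAvroRecordMerger
//   hoodie.merger.strategy = <merging-strategy UUID>
class RecordMergerFactory {

  static HoodieRecordMerger pickMerger(List<HoodieRecordMerger> mergerImpls,  // from MERGER_IMPLS
                                       String mergerStrategyId,               // from MERGER_STRATEGY
                                       HoodieRecordType engineType) {
    return mergerImpls.stream()
        // Keep only mergers declaring the requested merging strategy...
        .filter(m -> m.getMergingStrategy().equals(mergerStrategyId))
        // ...and among those, pick the one matching the engine's record type.
        .filter(m -> m.getRecordType() == engineType)
        .findFirst()
        // Assumed fallback: the Avro-based BWC bridge.
        .orElseGet(HoodieAvroRecordMerger::new);
  }
}
```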

### Public API in `HoodieRecord`
Because we implement different types of records, we need to implement functionality similar to `HoodieAvroUtils` in `HoodieRecord` for each kind of data (Avro, `InternalRow`, `RowData`).
Its public API will look like the following:

```java
import java.util.Map;
import java.util.Properties;

class HoodieRecord {

  /**
   * Get column value(s) of the record (to support RDDCustomColumnsSortPartitioner).
   */
  Object getRecordColumnValues(Schema recordSchema, String[] columns,
      boolean consistentLogicalTimestampEnabled);

  /**
   * Support bootstrap.
   */
  HoodieRecord mergeWith(HoodieRecord other, Schema targetSchema) throws IOException;

  /**
   * Rewrite record into new schema (add meta columns).
   */
  HoodieRecord rewriteRecord(Schema recordSchema, Properties props, Schema targetSchema)
      throws IOException;

  /**
   * Support schema evolution.
   */
  HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema,
      Map<String, String> renameCols) throws IOException;

  HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema)
      throws IOException;

  HoodieRecord updateValues(Schema recordSchema, Properties props,
      Map<String, String> metadataValues) throws IOException;

  boolean isDelete(Schema recordSchema, Properties props) throws IOException;

  /**
   * Whether this is an empty (SENTINEL) record, e.g. as generated by ExpressionPayload.
   */
  boolean shouldIgnore(Schema recordSchema, Properties props) throws IOException;

  Option<HoodieAvroIndexedRecord> toIndexedRecord(Schema schema, Properties props)
      throws IOException;

  // Other functions with getters or setters ...
}
```

Review discussion on this API:

> On `getRecordColumnValues` — Reviewer: "This should return an array of objects, right?" Author: this is consistent with `HoodieAvroUtils#getRecordColumnValues`; it returns the value of a single column, or String values concatenated by comma for multiple columns. Reviewer: we should have already deprecated `getRecordColumnValues` — it is heavily coupled to where it is currently used and unfortunately isn't generic enough to serve its purpose; converting the values and concatenating them as strings doesn't make sense for a generic utility, since whoever requests a list of column values expects to get a list of values (as they are) rather than a string of concatenated values.

> On `mergeWith` — Reviewer: as discussed prior, we should avoid adding any merging semantics to the Record API itself; what is this method going to be used for?

> On `updateValues` — Reviewer: (1) name it `updateMetadataValues` to avoid confusion (we shouldn't be allowing modification of the record's payload); (2) instead of `Map<String, String>`, create a strongly typed Java class with all meta-fields and pass it here. Author: this is also used in `HoodieHFileDataBlock#serializeRecord` to update the record-key field. Reviewer: we should split these up — the only legitimate use case for updating fields is Hudi's metadata, and `HoodieHFileDataBlock` shouldn't be modifying the existing payload but should instead rewrite it without the field it wants to omit. That will be tackled separately; for the sake of RFC-46 we can create a temporary method `truncateRecordKey` that overwrites the record-key value for now (to be deprecated and removed once this is addressed). We should not leave a loophole where a record is allowed to be modified, so that nobody can start building against this API.

> On `shouldIgnore` — Reviewer: are you referring to `EmptyHoodieRecordPayload`? It is also used as a tombstone record (i.e., a delete); we should be fine with just one deleting mechanism. Author: no — this method checks whether the record is the EmptyRecord SENTINEL. Reviewer: we should then update the javadoc to avoid referencing any particular implementation.

> On `toIndexedRecord` — Reviewer: why do we need `HoodieAvroIndexedRecord`? `HoodieAvroRecord` should be enough.
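
To make these hooks concrete, here is a hypothetical usage sketch; the schemas, property values, rename direction, and surrounding variables are illustrative assumptions, not prescribed by the RFC.

```java
// Sketch: rewriting a record after a schema change that renamed column "fare" to "amount".
Map<String, String> renameCols = Collections.singletonMap("fare", "amount"); // assumed old -> new mapping
HoodieRecord evolved = record.rewriteRecordWithNewSchema(writerSchema, props, latestTableSchema, renameCols);

// Sketch: stamping a Hudi meta-field before writing (see the review note above about
// replacing Map<String, String> with a strongly typed container).
Map<String, String> metadataValues = Collections.singletonMap("_hoodie_commit_time", instantTime);
HoodieRecord stamped = evolved.updateValues(latestTableSchema, props, metadataValues);
```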

## Rollout/Adoption Plan

- What impact (if any) will there be on existing users?
  - Users of Hudi will observe considerably better performance for most of the routine operations (writing, reading, compaction, clustering, etc.) due to avoiding the superfluous intermediate de-/serialization penalty.
  - By default, the modified hierarchy would still leverage the BWC bridge (`HoodieAvroRecordMerger`) wrapping the configured `HoodieRecordPayload`.
  - Users will need to rebase their record-combining logic: instead of creating a subclass of `HoodieRecordPayload`, they should implement the newly created interface `HoodieRecordMerger` to get the full suite of performance benefits.
- If we are changing behavior, how will we phase out the older behavior?
  - Older behavior leveraging `HoodieRecordPayload` for merging will be marked as deprecated in 0.11, and subsequently removed in 0.1x.
- If we need special migration tools, describe them here.