-
Notifications
You must be signed in to change notification settings - Fork 2.5k
[HUDI-2656] Generalize HoodieIndex for flexible record data type #3893
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
b7989f4 to
47eaf38
Compare
|
|
||
| package org.apache.hudi.common.model; | ||
|
|
||
| public class HoodieAvroRecord<T extends HoodieRecordPayload> extends HoodieRecord<T> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should be HoodieRecordPayload<T>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right. This is more like a retrofit for Java 7 style and users extending old HoodieRecord. This should be fixed in one shot with new APIs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@yihua sorry, not sure i understood your point. Can you elaborate?
Why do we want to extend HoodieAvroRecord with format-specific impl?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Discussed offline: this will be an intermediate state until RFC-46 is fully implemented
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As discussed, this is more of an intermediate solution for row writer, before RFC-46 revamps it completely.
| import java.io.Serializable; | ||
| import java.util.Properties; | ||
|
|
||
| public interface HoodieRecordPayloadDelegate<T> extends Serializable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you please add a java-doc to explain purpose of this class?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The idea of adding HoodieRecordPayloadDelegate is the same as the recording combining/merging API, i.e., decoupling the merging logic from record payload and adding an independent interface for merging, taking two type-generic instances and returning the result instance.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since this class is not used in this PR and it'll probably be replaced with new API like Mergeable, I'll remove it from this patch.
8aff109 to
af9fb3c
Compare
af9fb3c to
fd6f671
Compare
a9c7cf6 to
ff7e9f7
Compare
ff7e9f7 to
70ef5f9
Compare
| public Map<String, Integer> mapFileWithInsertsToUniquePartition(JavaRDD<WriteStatus> writeStatusRDD) { | ||
| Map<String, Integer> mapFileWithInsertsToUniquePartition(JavaRDD<WriteStatus> writeStatusRDD) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this method only used in test; changing to package access due to using RDD in the signature
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we don't want to depend on Guava we can add our own annotation with the same purpose as @VisibleForTesting
| public Tuple2<Long, Integer> getHBasePutAccessParallelism(final JavaRDD<WriteStatus> writeStatusRDD) { | ||
| Tuple2<Long, Integer> getHBasePutAccessParallelism(final JavaRDD<WriteStatus> writeStatusRDD) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
| */ | ||
|
|
||
| package org.apache.hudi.client.functional; | ||
| package org.apache.hudi.index.hbase; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
moved to the same package as SparkHoodieHBaseIndex so that some methods with RDD in the signature can be changed to package access
70ef5f9 to
b466262
Compare
| HoodieWriteConfig config, HoodieEngineContext context, HoodieTable hoodieTable, | ||
| HoodiePairData<String, String> partitionRecordKeyPairs, | ||
| HoodieData<ImmutablePair<String, HoodieKey>> fileComparisonPairs, | ||
| HoodieData<Pair<String, HoodieKey>> fileComparisonPairs, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hide implementation from public API
| @Override | ||
| @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING) | ||
| public <R> HoodieData<HoodieRecord<R>> tagLocation( | ||
| HoodieData<HoodieRecord<R>> records, HoodieEngineContext context, | ||
| HoodieTable hoodieTable) throws HoodieIndexException { | ||
| return HoodieList.of(tagLocation( | ||
| HoodieList.getList(records.map(record -> (HoodieRecord<T>) record)), context, hoodieTable)); | ||
| List<HoodieRecord<T>> hoodieRecords = tagLocation(HoodieList.getList(records.map(record -> (HoodieRecord<T>) record)), context, hoodieTable); | ||
| return HoodieList.of(hoodieRecords.stream().map(r -> (HoodieRecord<R>) r).collect(Collectors.toList())); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@yihua don't seem to find a better way than this double casting.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is fine since this is more like an intermediate fix. Once RFC-46 is fully done, the logic here should be much cleaner without generics.
| */ | ||
| public static HoodieRecord getTaggedRecord(HoodieRecord inputRecord, Option<HoodieRecordLocation> location) { | ||
| HoodieRecord record = inputRecord; | ||
| HoodieRecord<?> record = inputRecord; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for fixing this!
|
|
||
| package org.apache.hudi.common.model; | ||
|
|
||
| public class HoodieAvroRecord<T extends HoodieRecordPayload> extends HoodieRecord<T> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@yihua sorry, not sure i understood your point. Can you elaborate?
Why do we want to extend HoodieAvroRecord with format-specific impl?
| * A Single Record managed by Hoodie. | ||
| */ | ||
| public class HoodieRecord<T extends HoodieRecordPayload> implements Serializable { | ||
| public abstract class HoodieRecord<T> implements Serializable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@yihua @xushiyan let's chat more on this to make sure we're aligned on the approach going f/w:
I was thinking of keeping this component file-format agnostic and instead make it engine-specific, while refactoring MOR table read-path for efficient querying.
Can you elaborate what's the goal you're striving for w/ HoodieAvroRecord?
P.S. Putting this context in here for somebody who might not be aware of previous conversations
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@alexeykudinkin so the main goal here is to make HoodieRecord independent of HoodieRecordPayload, this will allow HoodieRowRecord to be used by row-writer code path. HoodieAvroRecord is also meant for compatibility with existing codebase and ease the decoupling. Yea we should discuss more about HoodieAvroRecord vs SparkHoodieRecord.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Discussed offline: this will be an intermediate state until RFC-46 is fully implemented
455714a to
de5b0b9
Compare
| @PublicAPIMethod(maturity = ApiMaturityLevel.DEPRECATED) | ||
| public I tagLocation(I records, HoodieEngineContext context, | ||
| HoodieTable<T, I, K, O> hoodieTable) throws HoodieIndexException { | ||
| HoodieTable hoodieTable) throws HoodieIndexException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As general rule of thumb, i think we should aim to fix all raw-types occurrences in the code-base.
Can you please fix new instances so that we at least don't add to our debt?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@alexeykudinkin Yes, i plan to make a follow up PR to remove T from HoodieTable, then i can make the interface taking HoodieTable<I, K, O>.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@xushiyan @alexeykudinkin Since this is anyway backward incompatible, should we just remove the deprecated public API methods and get rid of I and O as well? The reason to keep these methods and the generics is to adapt for users extending these APIs. If you want to change the generics, I'd prefer that all such generics changes in relation to all public APIs should get in 0.11.0 release in one shot.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The deprecated APIs are going to be removed in a separate PR to keep the scope of this PR limited.
| @PublicAPIMethod(maturity = ApiMaturityLevel.DEPRECATED) | ||
| public O updateLocation(O writeStatuses, HoodieEngineContext context, | ||
| HoodieTable<T, I, K, O> hoodieTable) throws HoodieIndexException { | ||
| HoodieTable hoodieTable) throws HoodieIndexException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here as well
| public Map<String, Integer> mapFileWithInsertsToUniquePartition(JavaRDD<WriteStatus> writeStatusRDD) { | ||
| Map<String, Integer> mapFileWithInsertsToUniquePartition(JavaRDD<WriteStatus> writeStatusRDD) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we don't want to depend on Guava we can add our own annotation with the same purpose as @VisibleForTesting
|
|
||
| package org.apache.hudi.common.model; | ||
|
|
||
| public class HoodieAvroRecord<T extends HoodieRecordPayload> extends HoodieRecord<T> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Discussed offline: this will be an intermediate state until RFC-46 is fully implemented
| * A Single Record managed by Hoodie. | ||
| */ | ||
| public class HoodieRecord<T extends HoodieRecordPayload> implements Serializable { | ||
| public abstract class HoodieRecord<T> implements Serializable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Discussed offline: this will be an intermediate state until RFC-46 is fully implemented
yihua
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall LGTM. Good job on revamping this! I put up one major call we need to make and a few nits.
| @PublicAPIMethod(maturity = ApiMaturityLevel.DEPRECATED) | ||
| public I tagLocation(I records, HoodieEngineContext context, | ||
| HoodieTable<T, I, K, O> hoodieTable) throws HoodieIndexException { | ||
| HoodieTable hoodieTable) throws HoodieIndexException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@xushiyan @alexeykudinkin Since this is anyway backward incompatible, should we just remove the deprecated public API methods and get rid of I and O as well? The reason to keep these methods and the generics is to adapt for users extending these APIs. If you want to change the generics, I'd prefer that all such generics changes in relation to all public APIs should get in 0.11.0 release in one shot.
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
Outdated
Show resolved
Hide resolved
| && !recordLocationHoodieKeyPair.get().getRight().getPartitionPath().equals(hoodieRecord.getPartitionPath())) { | ||
| // Create an empty record to delete the record in the old partition | ||
| HoodieRecord<T> deleteRecord = new HoodieRecord(recordLocationHoodieKeyPair.get().getRight(), | ||
| HoodieRecord<R> deleteRecord = new HoodieAvroRecord(recordLocationHoodieKeyPair.get().getRight(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This needs to be revisited to make it Avro agnostic later on.
| if (config.getGlobalSimpleIndexUpdatePartitionPath() && !(inputRecord.getPartitionPath().equals(partitionPath))) { | ||
| // Create an empty record to delete the record in the old partition | ||
| HoodieRecord<T> deleteRecord = new HoodieRecord(new HoodieKey(inputRecord.getRecordKey(), partitionPath), new EmptyHoodieRecordPayload()); | ||
| HoodieRecord<R> deleteRecord = new HoodieAvroRecord(new HoodieKey(inputRecord.getRecordKey(), partitionPath), new EmptyHoodieRecordPayload()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar here for revisiting later on.
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
Outdated
Show resolved
Hide resolved
...ent/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
Outdated
Show resolved
Hide resolved
| @PublicAPIMethod(maturity = ApiMaturityLevel.DEPRECATED) | ||
| public abstract List<WriteStatus> updateLocation(List<WriteStatus> writeStatuses, | ||
| HoodieEngineContext context, | ||
| HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) throws HoodieIndexException; | ||
| HoodieTable hoodieTable) throws HoodieIndexException; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar for engine-specific HoodieIndex classes to remove deprecated API methods altogether (in a separate PR).
...hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkWriteableTestTable.java
Outdated
Show resolved
Hide resolved
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/TestHoodieIndexConfigs.java
Show resolved
Hide resolved
|
|
||
| package org.apache.hudi.common.model; | ||
|
|
||
| public class HoodieAvroRecord<T extends HoodieRecordPayload> extends HoodieRecord<T> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As discussed, this is more of an intermediate solution for row writer, before RFC-46 revamps it completely.
d7a82da to
e07767a
Compare
…che#3893) Co-authored-by: Raymond Xu <[email protected]>
…che#3893) Co-authored-by: Raymond Xu <[email protected]>
HoodieRecordabstract and useHoodieAvroRecordfor the implementation for extensibilityHoodieIndexindependent ofHoodieRecordPayload