@@ -113,6 +113,9 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
public static final String MAX_CONSISTENCY_CHECKS_PROP = "hoodie.consistency.check.max_checks";
public static int DEFAULT_MAX_CONSISTENCY_CHECKS = 7;

private static final String PAYLOAD_ORDERING_FIELD_PROP = "hoodie.payload.ordering.field";
@nsivabalan (Contributor), Dec 5, 2020:

@vinothchandar @bhasudha: can you throw some light on why a new config was added rather than reusing the preCombine field value? I am working on putting up a new PR, so I would like to know whether this was intentional or unintentional.

Member:

That's because you don't know what the ordering field value is when reading the data from storage, right?

Contributor:

OK, so for preCombine (combining two records within the same ingestion batch) we use one config, and for combining an incoming record with one in storage we use a different config? I see both of these as one and the same; the only difference is whether we resolve records within the same batch or across batches.

Member:

You are right, they are one and the same. If we can avoid the additional config, we should.

@nsivabalan (Contributor), Dec 6, 2020:

Gotcha. Here is my take: I will introduce a config called "honorOrderingToCombineRecordsAcrossBatches" (we can work out a good name), with a default value of false, and send in an orderingFieldKey (same as the preCombineFieldKey) as an argument to OverwriteWithLatestAvroPayload:

public OverwriteWithLatestAvroPayload(GenericRecord record, Comparable orderingVal,
    String orderingFieldKey, boolean honorOrderingToCombineRecordsAcrossBatches) {
  ...
}

Based on the "honorOrderingToCombineRecordsAcrossBatches" value, the combineAndGetUpdateValue() implementation will decide whether to go with the existing behavior or the new one (as we see in this patch).

I know we have a gap here: if the ordering field has changed over time, we can't do much. But we can't afford to store the ordering field as a separate column in the dataset either, since different commits could theoretically have different ordering fields. I guess we can call out that we may not support such evolution of the ordering field.
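For concreteness, here is a minimal editorial sketch of what this proposal could look like; the config name and the orderingFieldKey/honorOrderingToCombineRecordsAcrossBatches fields are the commenter's suggestion (which the next reply argues against), not code from this PR, and the sketch assumes they were stored as fields alongside BaseAvroPayload's existing orderingVal:

```java
// Sketch only: flag-based dispatch between the old and proposed merge behavior.
@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema)
    throws IOException {
  if (!honorOrderingToCombineRecordsAcrossBatches) {
    // Existing behavior: the incoming record always replaces the stored one.
    return getInsertValue(schema);
  }
  // Proposed behavior: keep the stored record when its ordering value is newer.
  Object persisted =
      HoodieAvroUtils.getNestedFieldVal((GenericRecord) currentValue, orderingFieldKey, true);
  if (persisted != null && ((Comparable) persisted).compareTo(orderingVal) > 0) {
    return Option.of(currentValue);
  }
  return getInsertValue(schema);
}
```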

Member:

I prefer we don't do this, and instead separate the existing implementation and the new one into separate payload classes.

Also, I don't actually understand how we plan to implement obtaining the ordering field from the record on DFS. Could you please clarify? @nsivabalan

@nsivabalan (Contributor), Dec 9, 2020:

Similar to what this patch already does: basically, we look up the preCombine field in the existing generic record.

Object persistedOrderingVal = getNestedFieldVal((GenericRecord) currentValue, props.get(ORDERING_FIELD_OPT_KEY), true);

The reason I proposed tweaking the existing class is this: even if we go with two classes, ideally I would want to rename the existing class to OverwriteWithIncomingPayload and introduce a new class called OverwriteWithLatestAvroPayload, because the existing one overwrites with the incoming record, and we are introducing a new class that will actually overwrite with the latest payload. But since users configure payloads via class names, we can't go with that approach, hence my proposal to introduce a config and change the existing class.

private static String DEFAULT_PAYLOAD_ORDERING_FIELD_VAL = "";

/**
* HUDI-858 : There are users who had been directly using RDD APIs and have relied on a behavior in 0.4.x to allow
* multiple write operations (upsert/bulk-insert/...) to be executed within a single commit.
@@ -274,6 +277,10 @@ public BulkInsertSortMode getBulkInsertSortMode() {
return BulkInsertSortMode.valueOf(sortMode.toUpperCase());
}

public String getPayloadOrderingField() {
return props.getProperty(PAYLOAD_ORDERING_FIELD_PROP, DEFAULT_PAYLOAD_ORDERING_FIELD_VAL);
}

/**
* compaction properties.
*/
@@ -913,6 +920,11 @@ public Builder withProperties(Properties properties) {
return this;
}

public Builder withPayloadOrderingField(String fieldName) {
props.setProperty(PAYLOAD_ORDERING_FIELD_PROP, fieldName);
return this;
}

protected void setDefaults() {
// Check for mandatory properties
setDefaultOnCondition(props, !props.containsKey(INSERT_PARALLELISM), INSERT_PARALLELISM, DEFAULT_PARALLELISM);
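With both pieces in place, the new property can be wired in through the write-config builder. A minimal usage sketch (editorial, not from the PR; the base path and the "ts" field are placeholders, and "ts" would typically match the preCombine field per the discussion above):

```java
import org.apache.hudi.config.HoodieWriteConfig;

HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
    .withPath("/tmp/hudi/sample_table")   // placeholder base path
    .withPayloadOrderingField("ts")       // field used to order records across batches
    .build();
```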
@@ -22,6 +22,7 @@
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.SparkConfigUtils;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.BaseAvroPayload;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
@@ -47,6 +48,7 @@
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
@@ -221,7 +223,8 @@ public void write(GenericRecord oldRecord) {
HoodieRecord<T> hoodieRecord = new HoodieRecord<>(keyToNewRecords.get(key));
try {
Option<IndexedRecord> combinedAvroRecord =
-hoodieRecord.getData().combineAndGetUpdateValue(oldRecord, useWriterSchema ? writerSchemaWithMetafields : writerSchema);
+hoodieRecord.getData().combineAndGetUpdateValue(oldRecord, useWriterSchema ? writerSchemaWithMetafields : writerSchema,
+    Collections.singletonMap(BaseAvroPayload.ORDERING_FIELD_OPT_KEY, config.getPayloadOrderingField()));
if (writeUpdateRecord(hoodieRecord, combinedAvroRecord)) {
/*
* ONLY WHEN 1) we have an update for this key AND 2) We are able to successfully write the combined new
@@ -421,7 +421,7 @@ public static Object getNestedFieldVal(GenericRecord record, String fieldName, b
* @param fieldValue avro field value
* @return field value either converted (for certain data types) or as it is.
*/
-private static Object convertValueForSpecificDataTypes(Schema fieldSchema, Object fieldValue) {
+public static Object convertValueForSpecificDataTypes(Schema fieldSchema, Object fieldValue) {
if (fieldSchema == null) {
return fieldValue;
}
@@ -438,7 +438,7 @@ private static Object convertValueForSpecificDataTypes(Schema fieldSchema, Object fieldValue) {
* @param fieldSchema avro field schema
* @return boolean indicating whether fieldSchema is of Avro's Date Logical Type
*/
-private static boolean isLogicalTypeDate(Schema fieldSchema) {
+public static boolean isLogicalTypeDate(Schema fieldSchema) {
if (fieldSchema.getType() == Schema.Type.UNION) {
return fieldSchema.getTypes().stream().anyMatch(schema -> schema.getLogicalType() == LogicalTypes.date());
}
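The visibility change lets payload implementations reuse these conversion helpers. As a quick illustration of the union handling in isLogicalTypeDate (an editorial sketch, not code from this PR):

```java
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.hudi.avro.HoodieAvroUtils;

// A nullable date column is represented in Avro as a UNION of null and an int
// carrying the "date" logical type, so the check must scan the union branches.
Schema nullableDate = SchemaBuilder.unionOf()
    .nullType().and()
    .type(LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT)))
    .endUnion();

boolean isDate = HoodieAvroUtils.isLogicalTypeDate(nullableDate); // true
```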
@@ -31,6 +31,8 @@
* Base class for all AVRO record based payloads, that can be ordered based on a field.
*/
public abstract class BaseAvroPayload implements Serializable {

public static final String ORDERING_FIELD_OPT_KEY = "ordering.field";
/**
* Avro data extracted from the source converted to bytes.
*/
@@ -50,8 +50,25 @@ public interface HoodieRecordPayload<T extends HoodieRecordPayload> extends Seri
* @param schema Schema used for record
* @return new combined/merged value to be written back to storage. EMPTY to skip writing this record.
*/
@Deprecated
Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException;

/**
* This method lets you write custom merging/combining logic to produce a new value as a function of the current value on
* storage and what is contained in this object.
* <p>
* e.g. 1) You are updating counters: you may want to add counts to currentValue and write back updated counts. 2) You
* may be reading DB redo logs and merging them with the current image of a database row on storage.
*
* @param currentValue Current value in storage, to merge/combine this payload with
* @param schema Schema used for record
* @param props Payload-related properties; for example, pass the ordering field name(s) to extract from the value in storage.
* @return new combined/merged value to be written back to storage. EMPTY to skip writing this record.
*/
default Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Map<String, String> props) throws IOException {
@n3nash (Contributor), Jun 11, 2020:

Can you describe any more use cases, apart from the ordering field, for the Map<> introduced here? Why can't the user simply perform this during the old combineAndGetUpdateValue by comparing the incoming ordering field value against the old one (in the GenericRecord)? What am I missing?

@bhasudha (Contributor Author):

The Map was a more generic structure for sending properties; for now, only the ordering field is required here. For your second question, I am not sure I am understanding it right. Are you suggesting we send in the ordering field value itself instead of the field name?

@n3nash (Contributor):

Let me explain. The current interface combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) doesn't allow the user to pass the ordering field names; you're trying to introduce this and have added a Map<> to allow the user to do it. (The description link in the JIRA ticket doesn't work, so I needed more context.)
Now, since users can have their own implementation of combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema), they can keep the ordering field name in their own class and maintain it themselves, thus not requiring these names to be passed explicitly through a method.
I see that the intention of this PR is to support this functionality specifically for OverwriteWithLatestAvroPayload. For preCombine, will we just continue to support natural ordering, which may not be the same as for combineAndGetUpdateValue, correct?

@bhasudha (Contributor Author):

Thanks for the clarification @n3nash. I am not able to open the link either; it could be because the initial reporter had a different JIRA id, or something else. But the comments section has good context on what this ticket is about. This has also come up multiple times in Slack channels from different users. Based on that, I can summarize as follows.

Users want records on disk to be considered when using the OverwriteWithLatestAvroPayload class. In the preCombine step we pick the latest value for every key from a batch of input data, based on some ordering field; at this step we don't know what is in storage yet. When we are ready to write the records, we iterate over the records in storage and, for each record, determine whether there is a matching entry in the input batch. If so, we invoke combineAndGetUpdateValue and pass the record on disk as the currentValue param. In this step specifically, OverwriteWithLatestAvroPayload could overwrite an already-stored record with an older update, since we weren't comparing the ordering value of the new data against the data on disk. This whole PR is about providing that capability instead of asking users to write their own implementation. With respect to preCombine and combineAndGetUpdateValue, both will use the same ordering field(s) to determine the latest record.

In order not to disrupt other payload classes, I deprecated the existing combineAndGetUpdateValue, extended it, and provided a default implementation that ignores the Map and internally behaves the old way. OverwriteWithLatestAvroPayload alone overrides this default to achieve the above purpose.

Hope that helps!
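To make the two phases described above concrete, a small editorial sketch (placeholder records and ordering values; not code from this PR):

```java
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;

// Phase 1, preCombine: within one input batch, the payload whose ordering
// value is larger survives. The records themselves are placeholders here.
GenericRecord rec1 = null; // stand-in for an update of key k1 with ts=3
GenericRecord rec2 = null; // stand-in for an update of key k1 with ts=7
OverwriteWithLatestAvroPayload older = new OverwriteWithLatestAvroPayload(rec1, 3);
OverwriteWithLatestAvroPayload newer = new OverwriteWithLatestAvroPayload(rec2, 7);
OverwriteWithLatestAvroPayload survivor = older.preCombine(newer); // == newer

// Phase 2, combineAndGetUpdateValue: the batch survivor is then merged against
// the record already in storage. Before this PR the survivor always won; with
// this PR it wins only if its ordering value is not older than the stored one.
```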

@n3nash (Contributor):

@bhasudha thanks for explaining, although my meta-point is still not resolved. If you look at the log scanner (https://github.com/apache/hudi/blob/master/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java#L114), the preCombine method is used to combine records written to the log (over multiple batches). Ideally, for your use case, since during logging we don't know the order of updates, we simply write to disk and leave the resolution for a later time. In this scenario, since combineAndGetUpdateValue is not called, how will preCombine honor the orderingVal?

@bhasudha (Contributor Author):

@n3nash preCombine uses the same orderingVal as combineAndGetUpdateValue would. And in the HoodieMergedLogRecordScanner class, we are resolving records that are already logged to the system, using the orderingVal to identify the most recent one. I don't understand why we would need to use combineAndGetUpdateValue here.

@advancedxy:

Just my two cents: the interface change looks like a bit of an overkill for this type of new feature.

If we are still relying on the Avro schema, we could leverage schema.getProp(orderingFieldKey) to get the ordering field for this record.

@bhasudha (Contributor Author):

@advancedxy I guess we wouldn't know which key is the orderingFieldKey when calling this method. That is why we need to send in that info via another param, so we can fetch it from the schema later.

return combineAndGetUpdateValue(currentValue, schema);
}

/**
* Generates an avro record out of the given HoodieRecordPayload, to be written out to storage. Called when writing a
* new value for the given HoodieKey, wherein there is no existing record in storage to be combined against. (i.e
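The default method above preserves source compatibility for payloads written before this change. A hedged editorial sketch (the class is hypothetical, and it assumes the interface's abstract methods are the ones visible in this diff):

```java
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;

// Hypothetical pre-existing payload that only knows the deprecated two-argument
// method. Calls to the new three-argument overload fall through to it via the
// default method, so this class keeps working without any change.
public class MyLegacyPayload implements HoodieRecordPayload<MyLegacyPayload> {

  @Override
  public MyLegacyPayload preCombine(MyLegacyPayload another) {
    return this; // natural ordering within a batch, as discussed above
  }

  @Override
  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema)
      throws IOException {
    return getInsertValue(schema); // custom merge logic would go here
  }

  @Override
  public Option<IndexedRecord> getInsertValue(Schema schema) throws IOException {
    return Option.empty(); // placeholder body
  }
}
```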
@@ -18,7 +18,10 @@

package org.apache.hudi.common.model;

-import org.apache.hudi.avro.HoodieAvroUtils;
+import static org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro;
+import static org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldVal;
+
+import java.util.Map;
import org.apache.hudi.common.util.Option;

import org.apache.avro.Schema;
@@ -67,7 +70,7 @@ public Option<IndexedRecord> getInsertValue(Schema schema) throws IOException {
if (recordBytes.length == 0) {
return Option.empty();
}
-IndexedRecord indexedRecord = HoodieAvroUtils.bytesToAvro(recordBytes, schema);
+IndexedRecord indexedRecord = bytesToAvro(recordBytes, schema);
if (isDeleteRecord((GenericRecord) indexedRecord)) {
return Option.empty();
} else {
@@ -83,4 +86,39 @@ private boolean isDeleteRecord(GenericRecord genericRecord) {
Object deleteMarker = genericRecord.get("_hoodie_is_deleted");
return (deleteMarker instanceof Boolean && (boolean) deleteMarker);
}

@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Map<String, String> props) throws IOException {
if (recordBytes.length == 0) {
return Option.empty();
}
GenericRecord incomingRecord = bytesToAvro(recordBytes, schema);
/*
* Combining strategy here returns the currentValue on disk if the incoming record is older.
* The incoming record can be either a delete (sent as an upsert with _hoodie_is_deleted set to true)
* or an insert/update record. In any case, if it is older than the record on disk, the currentValue
* on disk is returned (to be rewritten with the new commit time).
*
* NOTE: Deletes sent via EmptyHoodieRecordPayload and/or the Delete operation type do not hit this code path
@bhasudha (Contributor Author):

Created a JIRA issue to track this separately: https://issues.apache.org/jira/browse/HUDI-1127

* and need to be dealt with separately.
*/
Object persistedOrderingVal = getNestedFieldVal((GenericRecord) currentValue, props.get(ORDERING_FIELD_OPT_KEY), true);
Comparable incomingOrderingVal = (Comparable) getNestedFieldVal(incomingRecord, props.get(ORDERING_FIELD_OPT_KEY), false);
@nsivabalan (Contributor):

If the schema has evolved and the ordering field of the incoming record does not exist in existing storage, this might throw an NPE.

@bhasudha (Contributor Author):

> @bhasudha: I haven't reviewed the PR, but a high-level question: do we have any guards if the user changes the ordering field prop along the way? Basically, I expect we would fail any operation if the ordering field prop is changed.

This will be handled the same way our ingestion works when the ordering field changes: in the merge phase, the new ordering field will be used to compare the incoming and persisted records.

@bhasudha (Contributor Author):

> If the schema has evolved and the ordering field of the incoming record does not exist in existing storage, this might throw an NPE.

Thanks @nsivabalan, nice catch. If the incoming records' schema evolved to add new columns and those are also used for ordering, the persisted record will not have that ordering field, and that will throw an NPE. I'll handle that here.


// Null check is needed here to support schema evolution. The record in storage may be from an old schema where
// the new ordering column might not be present, and hence returns null.
if (persistedOrderingVal != null && ((Comparable) persistedOrderingVal).compareTo(incomingOrderingVal) > 0) {
return Option.of(currentValue);
}

/*
* We reached a point where the value on disk is older than the incoming record.
* Now check if the incoming record is a delete record.
*/
if (isDeleteRecord(incomingRecord)) {
return Option.empty();
} else {
return Option.of(incomingRecord);
}
}
}
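Finally, a short editorial walkthrough of the merge semantics implemented above, with a placeholder schema and values (not a test from this PR):

```java
import java.util.Collections;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.model.BaseAvroPayload;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.util.Option;

// Placeholder schema: a key, an ordering field "ts", and the delete marker.
Schema schema = SchemaBuilder.record("rec").fields()
    .requiredString("key")
    .requiredLong("ts")
    .optionalBoolean("_hoodie_is_deleted")
    .endRecord();

GenericRecord stored = new GenericData.Record(schema);
stored.put("key", "k1");
stored.put("ts", 5L);

GenericRecord incoming = new GenericData.Record(schema);
incoming.put("key", "k1");
incoming.put("ts", 3L); // older than the record on disk

OverwriteWithLatestAvroPayload payload = new OverwriteWithLatestAvroPayload(incoming, 3L);
Map<String, String> props =
    Collections.singletonMap(BaseAvroPayload.ORDERING_FIELD_OPT_KEY, "ts");

// The stored record (ts=5) wins over the older incoming update (ts=3).
Option<IndexedRecord> merged = payload.combineAndGetUpdateValue(stored, schema, props);
// merged.get() == stored. With ts=7 on the incoming side, the incoming record
// would be returned instead (or Option.empty() if it were a delete).
```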