diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java index 222e1ab2ca5b2..d8f129e48d458 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java @@ -19,6 +19,7 @@ package org.apache.hudi.client; import com.codahale.metrics.Timer; +import org.apache.avro.Schema; import org.apache.hadoop.conf.Configuration; import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieCompactionPlan; @@ -259,7 +260,7 @@ protected void rollBackInflightBootstrap() { * @param instantTime Instant time of the commit * @return WriteStatus to inspect errors and counts */ - public abstract O upsert(I records, final String instantTime); + public abstract O upsert(I records, final String instantTime, Schema schema); /** * Upserts the given prepared records into the Hoodie table, at the supplied instantTime. diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java index afd1a36649409..a5a99b32d5851 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -123,7 +123,7 @@ private synchronized FileSystemViewManager getViewManager() { * @return HoodieWriteMetadata */ public abstract HoodieWriteMetadata upsert(HoodieEngineContext context, String instantTime, - I records); + I records, Schema schema); /** * Insert a batch of new records into Hoodie table at the supplied instantTime. 
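The client- and table-level upsert signatures above now take an optional Avro schema that is threaded down to the pre-combine step. A minimal usage sketch against the Spark implementation of this overload (added further down in this patch); writeConfig, engineContext and recordsRDD are placeholders assumed to be built the usual way, and passing a null schema preserves the existing behavior:

    // Sketch only: drive the schema-aware upsert from the Spark write client.
    // `writeConfig`, `engineContext` and `recordsRDD` (a JavaRDD<HoodieRecord<PartialAvroPayload>>)
    // are assumed to exist; PartialAvroPayload is the payload class added by this patch.
    SparkRDDWriteClient<PartialAvroPayload> client = new SparkRDDWriteClient<>(engineContext, writeConfig);
    Schema writeSchema = new Schema.Parser().parse(writeConfig.getSchema());
    String instantTime = client.startCommit();
    JavaRDD<WriteStatus> statuses = client.upsert(recordsRDD, instantTime, writeSchema);
    client.commit(instantTime, statuses);
    client.close();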
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/AbstractWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/AbstractWriteHelper.java index caa6ecdb953a7..ae813622d40b4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/AbstractWriteHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/AbstractWriteHelper.java @@ -19,6 +19,7 @@ package org.apache.hudi.table.action.commit; import org.apache.hudi.client.common.HoodieEngineContext; +import org.apache.hudi.common.config.SerializableSchema; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.exception.HoodieUpsertException; import org.apache.hudi.index.HoodieIndex; @@ -39,10 +40,23 @@ public HoodieWriteMetadata write(String instantTime, int shuffleParallelism, BaseCommitActionExecutor executor, boolean performTagging) { + return write(instantTime, inputRecords, context, table, shouldCombine, shuffleParallelism, + null, executor, performTagging); + } + + public HoodieWriteMetadata write(String instantTime, + I inputRecordsRDD, + HoodieEngineContext context, + HoodieTable table, + boolean shouldCombine, + int shuffleParallelism, + SerializableSchema schema, + BaseCommitActionExecutor executor, + boolean performTagging) { try { // De-dupe/merge if needed I dedupedRecords = - combineOnCondition(shouldCombine, inputRecords, shuffleParallelism, table); + combineOnCondition(shouldCombine, inputRecordsRDD, shuffleParallelism, table, schema); Instant lookupBegin = Instant.now(); I taggedRecords = dedupedRecords; @@ -70,8 +84,8 @@ private I tag( } public I combineOnCondition( - boolean condition, I records, int parallelism, HoodieTable table) { - return condition ? deduplicateRecords(records, table, parallelism) : records; + boolean condition, I records, int parallelism, HoodieTable table, SerializableSchema schema) { + return condition ? 
deduplicateRecords(records, table, parallelism, schema) : records; } /** @@ -82,10 +96,10 @@ public I combineOnCondition( * @return Collection of HoodieRecord already be deduplicated */ public I deduplicateRecords( - I records, HoodieTable table, int parallelism) { - return deduplicateRecords(records, table.getIndex(), parallelism); + I records, HoodieTable table, int parallelism, SerializableSchema schema) { + return deduplicateRecords(records, table.getIndex(), parallelism, schema); } public abstract I deduplicateRecords( - I records, HoodieIndex index, int parallelism); + I records, HoodieIndex index, int parallelism, SerializableSchema schema); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index 56f06898abba2..f4e4ebcb4e82f 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -18,6 +18,7 @@ package org.apache.hudi.client; +import org.apache.avro.Schema; import org.apache.hudi.client.common.HoodieEngineContext; import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.client.embedded.EmbeddedTimelineService; @@ -128,14 +129,18 @@ public void bootstrap(Option> extraMetadata) { getTableAndInitCtx(WriteOperationType.UPSERT, HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS).bootstrap(context, extraMetadata); } - @Override public JavaRDD upsert(JavaRDD> records, String instantTime) { + return upsert(records, instantTime, null); + } + + @Override + public JavaRDD upsert(JavaRDD> records, String instantTime, Schema schema) { HoodieTable>, JavaRDD, JavaRDD> table = getTableAndInitCtx(WriteOperationType.UPSERT, instantTime); table.validateUpsertSchema(); setOperationType(WriteOperationType.UPSERT); this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime); - HoodieWriteMetadata> result = table.upsert(context, instantTime, records); + HoodieWriteMetadata> result = table.upsert(context, instantTime, records, schema); if (result.getIndexLookupDuration().isPresent()) { metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis()); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java index f2b336432b247..9bf5b6668b689 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java @@ -18,6 +18,7 @@ package org.apache.hudi.table; +import org.apache.avro.Schema; import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.avro.model.HoodieRestoreMetadata; @@ -84,8 +85,9 @@ public HoodieSparkCopyOnWriteTable(HoodieWriteConfig config, HoodieEngineContext } @Override - public HoodieWriteMetadata> upsert(HoodieEngineContext context, String instantTime, JavaRDD> records) { - return new SparkUpsertCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, records).execute(); + public HoodieWriteMetadata> upsert(HoodieEngineContext context, String instantTime, + JavaRDD> records, Schema schema) { + return new 
SparkUpsertCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, records, schema).execute(); } @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java index 0a60dcc50f032..06a3cd1eaee48 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java @@ -18,6 +18,7 @@ package org.apache.hudi.table; +import org.apache.avro.Schema; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.avro.model.HoodieRestoreMetadata; import org.apache.hudi.avro.model.HoodieRollbackMetadata; @@ -78,8 +79,10 @@ public class HoodieSparkMergeOnReadTable extends } @Override - public HoodieWriteMetadata> upsert(HoodieEngineContext context, String instantTime, JavaRDD> records) { - return new SparkUpsertDeltaCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, records).execute(); + public HoodieWriteMetadata> upsert(HoodieEngineContext context, String instantTime, + JavaRDD> records, Schema schema) { + return new SparkUpsertDeltaCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, + records, schema).execute(); } @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java index 9ccd66b2cd265..e0d60cd2e8a2c 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java @@ -73,7 +73,7 @@ public HoodieWriteMetadata> bulkInsert(JavaRDD>) SparkWriteHelper.newInstance().combineOnCondition(config.shouldCombineBeforeInsert(), inputRecords, - config.getBulkInsertShuffleParallelism(), table); + config.getBulkInsertShuffleParallelism(), table, null); } final JavaRDD> repartitionedRecords; diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkUpsertCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkUpsertCommitActionExecutor.java index fe90212b0be15..57ecee0db4aeb 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkUpsertCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkUpsertCommitActionExecutor.java @@ -18,8 +18,10 @@ package org.apache.hudi.table.action.commit; +import org.apache.avro.Schema; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.common.HoodieSparkEngineContext; +import org.apache.hudi.common.config.SerializableSchema; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.WriteOperationType; @@ -33,17 +35,21 @@ public class SparkUpsertCommitActionExecutor> extends BaseSparkCommitActionExecutor { private JavaRDD> inputRecordsRDD; + private SerializableSchema schema; public SparkUpsertCommitActionExecutor(HoodieSparkEngineContext context, HoodieWriteConfig config, HoodieTable table, - String instantTime, JavaRDD> 
inputRecordsRDD) { + String instantTime, JavaRDD> inputRecordsRDD, + Schema schema) { super(context, config, table, instantTime, WriteOperationType.UPSERT); this.inputRecordsRDD = inputRecordsRDD; + this.schema = new SerializableSchema(schema); } @Override public HoodieWriteMetadata> execute() { return SparkWriteHelper.newInstance().write(instantTime, inputRecordsRDD, context, table, - config.shouldCombineBeforeUpsert(), config.getUpsertShuffleParallelism(), this, true); + config.shouldCombineBeforeUpsert(), config.getUpsertShuffleParallelism(), + schema, this, true); } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkWriteHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkWriteHelper.java index 5f1a1ef5576dc..928a2e624f5af 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkWriteHelper.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkWriteHelper.java @@ -19,9 +19,11 @@ package org.apache.hudi.table.action.commit; import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.config.SerializableSchema; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.UpdatePrecombineAvroPayload; import org.apache.hudi.index.HoodieIndex; import org.apache.spark.api.java.JavaRDD; @@ -33,8 +35,8 @@ * * @param */ -public class SparkWriteHelper extends AbstractWriteHelper>, - JavaRDD, JavaRDD, R> { +public class SparkWriteHelper extends AbstractWriteHelper>, + JavaRDD, JavaRDD, R> { private SparkWriteHelper() { } @@ -49,7 +51,7 @@ public static SparkWriteHelper newInstance() { @Override public JavaRDD> deduplicateRecords(JavaRDD> records, HoodieIndex>, JavaRDD, JavaRDD> index, - int parallelism) { + int parallelism, SerializableSchema schema) { boolean isIndexingGlobal = index.isGlobal(); return records.mapToPair(record -> { HoodieKey hoodieKey = record.getKey(); @@ -58,7 +60,14 @@ public JavaRDD> deduplicateRecords(JavaRDD> reco return new Tuple2<>(key, record); }).reduceByKey((rec1, rec2) -> { @SuppressWarnings("unchecked") - T reducedData = (T) rec1.getData().preCombine(rec2.getData()); + T reducedData; + // Reuse the already-parsed schema so it is not re-parsed for every record + if (rec2.getData() instanceof UpdatePrecombineAvroPayload) { + reducedData = schema != null && schema.getSchema() != null ? (T) rec1.getData().preCombine(rec2.getData(), schema.getSchema()) + : (T) rec1.getData().preCombine(rec2.getData()); + } else { + reducedData = (T) rec1.getData().preCombine(rec2.getData()); + } // we cannot allow the user to change the key or partitionPath, since that will affect // everything // so pick it from one of the records. 
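To make the dedup-time behavior above concrete, here is a small self-contained sketch of what the schema-aware preCombine does to two payloads sharing a key, mirroring the id/name/age/ts example in the UpdatePrecombineAvroPayload javadoc further down. The field names, defaults and values are illustrative only; default-valued fields stand in for "missing" ones:

    import java.util.Arrays;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.hudi.avro.HoodieAvroUtils;
    import org.apache.hudi.common.model.UpdatePrecombineAvroPayload;

    public class PartialPreCombineExample {
      public static void main(String[] args) throws Exception {
        Schema schema = Schema.createRecord(Arrays.asList(
            new Schema.Field("id", Schema.create(Schema.Type.STRING), "", null),
            new Schema.Field("name", Schema.create(Schema.Type.STRING), "", ""),
            new Schema.Field("age", Schema.create(Schema.Type.INT), "", -1),
            new Schema.Field("ts", Schema.create(Schema.Type.LONG), "", null)));

        GenericRecord withName = new GenericData.Record(schema);
        withName.put("id", "1");
        withName.put("name", "Karl");
        withName.put("age", -1);   // default value, i.e. "missing"
        withName.put("ts", 0L);

        GenericRecord withAge = new GenericData.Record(schema);
        withAge.put("id", "1");
        withAge.put("name", "");   // default value, i.e. "missing"
        withAge.put("age", 18);
        withAge.put("ts", 0L);

        UpdatePrecombineAvroPayload older = new UpdatePrecombineAvroPayload(withName, 1);
        UpdatePrecombineAvroPayload newer = new UpdatePrecombineAvroPayload(withAge, 2);

        // The payload with the higher ordering value wins, but its "missing"
        // (default-valued) fields are back-filled from the other record, so the
        // merged result carries both name=Karl and age=18.
        GenericRecord merged =
            HoodieAvroUtils.bytesToAvro(older.preCombine(newer, schema).recordBytes, schema);
        System.out.println(merged);
      }
    }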
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitActionExecutor.java index 82aa081524050..9fdb61f772edd 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitActionExecutor.java @@ -18,7 +18,9 @@ package org.apache.hudi.table.action.deltacommit; +import org.apache.avro.Schema; import org.apache.hudi.client.common.HoodieSparkEngineContext; +import org.apache.hudi.common.config.SerializableSchema; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.WriteOperationType; @@ -33,17 +35,21 @@ public class SparkUpsertDeltaCommitActionExecutor { private JavaRDD> inputRecordsRDD; + private SerializableSchema schema; public SparkUpsertDeltaCommitActionExecutor(HoodieSparkEngineContext context, HoodieWriteConfig config, HoodieTable table, - String instantTime, JavaRDD> inputRecordsRDD) { + String instantTime, JavaRDD> inputRecordsRDD, + Schema schema) { super(context, config, table, instantTime, WriteOperationType.UPSERT); this.inputRecordsRDD = inputRecordsRDD; + this.schema = new SerializableSchema(schema); } @Override public HoodieWriteMetadata execute() { return SparkWriteHelper.newInstance().write(instantTime, inputRecordsRDD, context, table, - config.shouldCombineBeforeUpsert(), config.getUpsertShuffleParallelism(),this, true); + config.shouldCombineBeforeUpsert(), config.getUpsertShuffleParallelism(), + schema, this, true); } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java index bbb40488bb04e..b599ee83da72e 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java @@ -239,14 +239,15 @@ private void testDeduplication( // Global dedup should be done based on recordKey only HoodieIndex index = mock(HoodieIndex.class); when(index.isGlobal()).thenReturn(true); - List> dedupedRecs = SparkWriteHelper.newInstance().deduplicateRecords(records, index, 1).collect(); + List> dedupedRecs = SparkWriteHelper.newInstance().deduplicateRecords(records, index, 1, + null).collect(); assertEquals(1, dedupedRecs.size()); assertNodupesWithinPartition(dedupedRecs); // non-Global dedup should be done based on both recordKey and partitionPath index = mock(HoodieIndex.class); when(index.isGlobal()).thenReturn(false); - dedupedRecs = SparkWriteHelper.newInstance().deduplicateRecords(records, index, 1).collect(); + dedupedRecs = SparkWriteHelper.newInstance().deduplicateRecords(records, index, 1, null).collect(); assertEquals(2, dedupedRecs.size()); assertNodupesWithinPartition(dedupedRecs); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java index c054bc4602f85..8167dd89597e8 100644 --- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java @@ -385,7 +385,7 @@ public void testFileSizeUpsertRecords() throws Exception { // Insert new records BaseSparkCommitActionExecutor actionExecutor = new SparkUpsertCommitActionExecutor(context, config, table, - instantTime, jsc.parallelize(records)); + instantTime, jsc.parallelize(records), null); jsc.parallelize(Arrays.asList(1)) .map(i -> actionExecutor.handleInsert(FSUtils.createNewFileIdPfx(), records.iterator())) .map(Transformations::flatten).collect(); @@ -426,7 +426,7 @@ public void testInsertUpsertWithHoodieAvroPayload() throws Exception { String partitionPath = writeStatus.getPartitionPath(); long numRecordsInPartition = updates.stream().filter(u -> u.getPartitionPath().equals(partitionPath)).count(); BaseSparkCommitActionExecutor newActionExecutor = new SparkUpsertCommitActionExecutor(context, config, table, - instantTime, jsc.parallelize(updates)); + instantTime, jsc.parallelize(updates), null); final List> updateStatus = jsc.parallelize(Arrays.asList(1)).map(x -> { return newActionExecutor.handleUpdate(partitionPath, fileId, updates.iterator()); }).map(Transformations::flatten).collect(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/SerializableSchema.java b/hudi-common/src/main/java/org/apache/hudi/common/config/SerializableSchema.java new file mode 100644 index 0000000000000..556d829ef6c68 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/SerializableSchema.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.config; + +import org.apache.avro.AvroRuntimeException; +import org.apache.avro.Schema; +import org.apache.hadoop.io.WritableUtils; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +/** + * A wrapped schema which can be serialized. 
+ */ +public class SerializableSchema implements Serializable { + private static final long serialVersionUID = -3281148111709753816L; + private transient Schema schema; + + public SerializableSchema(Schema schema) { + this.schema = schema; + } + + private void writeObject(ObjectOutputStream out) throws IOException { + out.defaultWriteObject(); + if (schema != null) { + List fields = schema.getFields(); + WritableUtils.writeVInt(out, fields.size()); + for (Schema.Field field : fields) { + org.apache.hadoop.io.Text.writeString(out, field.name()); + org.apache.hadoop.io.Text.writeString(out, castSchemaType(field.schema())); + org.apache.hadoop.io.Text.writeString(out, field.doc() == null ? "" : field.doc()); + out.writeObject(field.defaultVal()); + } + } else { + WritableUtils.writeVInt(out, 0); + } + } + + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + int size = WritableUtils.readVInt(in); + if (size != 0) { + ArrayList fields = new ArrayList<>(); + for (int i = 0; i < size; ++i) { + String name = org.apache.hadoop.io.Text.readString(in); + Schema value = castSchemaType(org.apache.hadoop.io.Text.readString(in)); + String doc = org.apache.hadoop.io.Text.readString(in); + Object defaultValue = in.readObject(); + fields.add(new Schema.Field(name, value, doc, defaultValue)); + } + this.schema = Schema.createRecord(fields); + } else { + schema = null; + } + } + + private String castSchemaType(Schema type) { + return type.getType().getName(); + } + + private Schema castSchemaType(String type) { + switch (type) { + case "string": + return Schema.create(Schema.Type.STRING); + case "bytes": + return Schema.create(Schema.Type.BYTES); + case "int": + return Schema.create(Schema.Type.INT); + case "long": + return Schema.create(Schema.Type.LONG); + case "float": + return Schema.create(Schema.Type.FLOAT); + case "double": + return Schema.create(Schema.Type.DOUBLE); + case "boolean": + return Schema.create(Schema.Type.BOOLEAN); + case "null": + return Schema.create(Schema.Type.NULL); + default: + throw new AvroRuntimeException("Can't create a: " + type); + } + } + + @Override + public String toString() { + return schema.toString(); + } + + public Schema getSchema() { + return schema; + } + + public void setSchema(Schema schema) { + this.schema = schema; + } + +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java index 3b35b0d4dca16..72f129f6c7e3a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java @@ -32,7 +32,7 @@ public abstract class BaseAvroPayload implements Serializable { /** * Avro data extracted from the source converted to bytes. */ - public final byte[] recordBytes; + public byte[] recordBytes; /** * For purposes of preCombining. 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java index 1afdd1b59af64..c8fe1d8a2872d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java @@ -41,9 +41,19 @@ public interface HoodieRecordPayload extends Seri * When more than one HoodieRecord have the same HoodieKey, this function combines them before attempting to * insert/upsert (if combining turned on in HoodieClientConfig). */ - @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE) + @Deprecated T preCombine(T another); + /** + * When more than one HoodieRecord have the same HoodieKey, this function combines all non-null fields + * before attempting to insert/upsert (if combining turned on in HoodieClientConfig). + * + */ + @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE) + default T preCombine(T another, Schema schema) throws IOException { + return preCombine(another); + } + /** + * This methods lets you write custom merging/combining logic to produce new values as a function of current value on + * storage and whats contained in this object. diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/PartialAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/PartialAvroPayload.java new file mode 100644 index 0000000000000..aefd4a9eb9c89 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/PartialAvroPayload.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.model; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hudi.common.util.Option; + +import java.io.IOException; +import java.util.List; + +/** + * Subclass of OverwriteWithLatestAvroPayload. + * + * Combines the preCombine behavior of UpdatePrecombineAvroPayload with the combineAndGetUpdateValue behavior of OverwriteNonDefaultsWithLatestAvroPayload: + * when more than one HoodieRecord have the same HoodieKey, all non-null fields are merged before attempting to insert/upsert, + * and the same field-level merge is applied against the value already in storage when the record is written. 
+ */ +public class PartialAvroPayload extends UpdatePrecombineAvroPayload { + public PartialAvroPayload(GenericRecord record, Comparable orderingVal) { + super(record, orderingVal); + } + + public PartialAvroPayload(Option record) { + super(record); + } + + @Override + public Option combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException { + Option recordOption = getInsertValue(schema); + if (!recordOption.isPresent()) { + return Option.empty(); + } + + GenericRecord insertRecord = (GenericRecord) recordOption.get(); + GenericRecord currentRecord = (GenericRecord) currentValue; + + if (isDeleteRecord(insertRecord)) { + return Option.empty(); + } else { + List fields = schema.getFields(); + fields.forEach(field -> { + Object value = insertRecord.get(field.name()); + Object defaultValue = field.defaultVal(); + if (!overwriteField(value, defaultValue)) { + currentRecord.put(field.name(), value); + } + }); + return Option.of(currentRecord); + } + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/UpdatePrecombineAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/UpdatePrecombineAvroPayload.java new file mode 100644 index 0000000000000..b93e046d12714 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/UpdatePrecombineAvroPayload.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.model; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.util.Option; + +import java.io.IOException; +import java.util.List; + +/** + * subclass of OverwriteWithLatestAvroPayload. + * + *
+ * preCombine - When more than one HoodieRecord have the same HoodieKey, this function combines all non-null fields + * before attempting to insert/upsert. + * For example: + * Before: + * id name age ts + * 1 Karl null 0.0 + * 1 null 18 0.0 + * After: + * id name age ts + * 1 Karl 18 0.0 + *
+ */ +public class UpdatePrecombineAvroPayload extends OverwriteWithLatestAvroPayload { + public UpdatePrecombineAvroPayload(GenericRecord record, Comparable orderingVal) { + super(record, orderingVal); + } + + public UpdatePrecombineAvroPayload(Option record) { + super(record); + } + + @Override + public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload another, Schema schema) throws IOException { + // pick the payload with greatest ordering value and aggregate all the fields,choosing the + // value that is not null + GenericRecord thisValue = HoodieAvroUtils.bytesToAvro(this.recordBytes, schema); + GenericRecord anotherValue = HoodieAvroUtils.bytesToAvro(another.recordBytes, schema); + List fields = schema.getFields(); + + if (another.orderingVal.compareTo(orderingVal) > 0) { + GenericRecord anotherRoc = combineAllFields(fields, anotherValue, thisValue); + another.recordBytes = HoodieAvroUtils.avroToBytes(anotherRoc); + return another; + } else { + GenericRecord thisRoc = combineAllFields(fields, thisValue, anotherValue); + this.recordBytes = HoodieAvroUtils.avroToBytes(thisRoc); + return this; + } + } + + public GenericRecord combineAllFields(List fields, GenericRecord priorRec, GenericRecord secPriorRoc) { + for (int i = 0; i < fields.size(); i++) { + Object priorValue = priorRec.get(fields.get(i).name()); + Object secPriorValue = secPriorRoc.get(fields.get(i).name()); + Object defaultVal = fields.get(i).defaultVal(); + if (overwriteField(priorValue, defaultVal) && !overwriteField(secPriorValue, defaultVal)) { + priorRec.put(fields.get(i).name(), secPriorValue); + } + } + return priorRec; + } +} diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwritePrecombineAvroPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwritePrecombineAvroPayload.java new file mode 100644 index 0000000000000..eaccb4b82fa02 --- /dev/null +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwritePrecombineAvroPayload.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.model; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.hudi.avro.HoodieAvroUtils; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Arrays; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * Unit tests {@link UpdatePrecombineAvroPayload}. 
+ */ +public class TestOverwritePrecombineAvroPayload { + + private Schema schema; + + @BeforeEach + public void setUp() throws Exception { + schema = Schema.createRecord(Arrays.asList( + new Schema.Field("id", Schema.create(Schema.Type.STRING), "", null), + new Schema.Field("partition", Schema.create(Schema.Type.STRING), "", ""), + new Schema.Field("ts", Schema.create(Schema.Type.LONG), "", null), + new Schema.Field("_hoodie_is_deleted", Schema.create(Schema.Type.BOOLEAN), "", false) + )); + } + + @Test + public void testActiveRecords() throws IOException { + GenericRecord record1 = new GenericData.Record(schema); + record1.put("id", "1"); + record1.put("partition", "partition0"); + record1.put("ts", 0L); + record1.put("_hoodie_is_deleted", false); + + GenericRecord record2 = new GenericData.Record(schema); + record2.put("id", "2"); + record2.put("partition", ""); + record2.put("ts", 1L); + record2.put("_hoodie_is_deleted", false); + + GenericRecord record3 = new GenericData.Record(schema); + record3.put("id", "2"); + record3.put("partition", "partition0"); + record3.put("ts", 1L); + record3.put("_hoodie_is_deleted", false); + + OverwriteWithLatestAvroPayload payload1 = new UpdatePrecombineAvroPayload(record1, 1); + OverwriteWithLatestAvroPayload payload2 = new UpdatePrecombineAvroPayload(record2, 2); + assertEquals(payload1.preCombine(payload2), payload2); + assertEquals(payload2.preCombine(payload1), payload2); + + assertEquals(record1, payload1.getInsertValue(schema).get()); + assertEquals(record2, payload2.getInsertValue(schema).get()); + + assertEquals(payload1.combineAndGetUpdateValue(record2, schema).get(), record1); + assertEquals(payload2.combineAndGetUpdateValue(record1, schema).get(), record2); + + assertEquals(HoodieAvroUtils.bytesToAvro(payload1.preCombine(payload2, schema).recordBytes, schema), + record3); + } +} diff --git a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java index 37572c3678485..3296e1f2b457e 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java +++ b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java @@ -18,6 +18,7 @@ package org.apache.hudi; +import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -199,7 +200,7 @@ public static String getCommitActionType(WriteOperationType operation, HoodieTab } public static HoodieWriteResult doWriteOperation(SparkRDDWriteClient client, JavaRDD hoodieRecords, - String instantTime, WriteOperationType operation) throws HoodieException { + String instantTime, WriteOperationType operation, Schema schema) throws HoodieException { switch (operation) { case BULK_INSERT: Option userDefinedBulkInsertPartitioner = @@ -208,7 +209,7 @@ public static HoodieWriteResult doWriteOperation(SparkRDDWriteClient client, Jav case INSERT: return new HoodieWriteResult(client.insert(hoodieRecords, instantTime)); case UPSERT: - return new HoodieWriteResult(client.upsert(hoodieRecords, instantTime)); + return new HoodieWriteResult(client.upsert(hoodieRecords, instantTime, schema)); case INSERT_OVERWRITE: return client.insertOverwrite(hoodieRecords, instantTime); default: diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 8cad2f3d508f9..86c60c0d3db56 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ 
b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -176,7 +176,7 @@ private[hudi] object HoodieSparkSqlWriter { (true, common.util.Option.empty()) } client.startCommitWithTime(instantTime, commitActionType) - val writeResult = DataSourceUtils.doWriteOperation(client, hoodieRecords, instantTime, operation) + val writeResult = DataSourceUtils.doWriteOperation(client, hoodieRecords, instantTime, operation, schema) (writeResult, client) } else { val structName = s"${tblName}_record" diff --git a/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java b/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java index 97948b9ee3176..3c2db7ea86caa 100644 --- a/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java +++ b/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java @@ -131,7 +131,7 @@ public void testDoWriteOperationWithoutUserDefinedBulkInsertPartitioner() throws when(hoodieWriteClient.getConfig()).thenReturn(config); DataSourceUtils.doWriteOperation(hoodieWriteClient, hoodieRecords, "test-time", - WriteOperationType.BULK_INSERT); + WriteOperationType.BULK_INSERT, null); verify(hoodieWriteClient, times(1)).bulkInsert(any(hoodieRecords.getClass()), anyString(), optionCaptor.capture()); @@ -144,7 +144,7 @@ public void testDoWriteOperationWithNonExistUserDefinedBulkInsertPartitioner() t Exception exception = assertThrows(HoodieException.class, () -> { DataSourceUtils.doWriteOperation(hoodieWriteClient, hoodieRecords, "test-time", - WriteOperationType.BULK_INSERT); + WriteOperationType.BULK_INSERT, null); }); assertThat(exception.getMessage(), containsString("Could not create UserDefinedBulkInsertPartitioner")); @@ -155,7 +155,7 @@ public void testDoWriteOperationWithUserDefinedBulkInsertPartitioner() throws Ho setAndVerifyHoodieWriteClientWith(NoOpBulkInsertPartitioner.class.getName()); DataSourceUtils.doWriteOperation(hoodieWriteClient, hoodieRecords, "test-time", - WriteOperationType.BULK_INSERT); + WriteOperationType.BULK_INSERT, null); verify(hoodieWriteClient, times(1)).bulkInsert(any(hoodieRecords.getClass()), anyString(), optionCaptor.capture());
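Since HoodieSparkSqlWriter now forwards the write schema into DataSourceUtils.doWriteOperation, the partial-update path can also be exercised end to end through the Spark datasource. A hedged configuration sketch; the option keys are the standard Hudi datasource write options, and df/basePath are placeholders:

    // Illustrative only: upsert through the datasource with the partial-update payload.
    // `df` is a Dataset<Row> matching the table schema; `basePath` is the table path.
    df.write().format("hudi")
        .option("hoodie.table.name", "partial_update_tbl")
        .option("hoodie.datasource.write.operation", "upsert")
        .option("hoodie.datasource.write.recordkey.field", "id")
        .option("hoodie.datasource.write.precombine.field", "ts")
        .option("hoodie.datasource.write.payload.class", "org.apache.hudi.common.model.PartialAvroPayload")
        .mode(SaveMode.Append)
        .save(basePath);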