diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java index 9f3b74b0c4708..4d8c9ed5f9cf2 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java @@ -175,7 +175,8 @@ public boolean commitStats(String instantTime, List stats, Opti // Create a Hoodie table which encapsulated the commits and files visible HoodieTable table = createTable(config, hadoopConf); HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); - HoodieCommitMetadata metadata = CommitUtils.buildMetadata(stats, partitionToReplaceFileIds, extraMetadata, operationType, config.getSchema(), commitActionType); + HoodieCommitMetadata metadata = CommitUtils.buildMetadata(stats, partitionToReplaceFileIds, extraMetadata, operationType, config.getSchema(), + commitActionType, config.updatePartialFields(), table.getMetaClient()); // Finalize write finalizeWrite(table, instantTime, stats); HeartbeatUtils.abortIfHeartbeatExpired(instantTime, table, heartbeatClient, config); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 944cd02306b36..608820357596d 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -35,6 +35,7 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.keygen.SimpleAvroKeyGenerator; @@ -79,6 +80,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { public static final String TIMELINE_LAYOUT_VERSION = "hoodie.timeline.layout.version"; public static final String BASE_PATH_PROP = "hoodie.base.path"; public static final String AVRO_SCHEMA = "hoodie.avro.schema"; + public static final String LAST_AVRO_SCHEMA = "hoodie.last.avro.schema"; public static final String AVRO_SCHEMA_VALIDATE = "hoodie.avro.schema.validate"; public static final String DEFAULT_AVRO_SCHEMA_VALIDATE = "false"; public static final String DEFAULT_PARALLELISM = "1500"; @@ -113,6 +115,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { public static final String DEFAULT_BULKINSERT_SORT_MODE = BulkInsertSortMode.GLOBAL_SORT .toString(); + // Enable partial field updates + public static final String UPDATE_PARTIAL_FIELDS = "hoodie.update.partial.fields"; + public static final String DEFAULT_UPDATE_PARTIAL_FIELDS = "false"; + public static final String EMBEDDED_TIMELINE_SERVER_ENABLED = "hoodie.embed.timeline.server"; public static final String DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED = "true"; public static final String EMBEDDED_TIMELINE_SERVER_PORT = "hoodie.embed.timeline.server.port"; @@ -221,6 +227,9 @@ public String getBasePath() { } public String getSchema() { + if (updatePartialFields() && !StringUtils.isNullOrEmpty(getLastSchema())) { + return getLastSchema(); + } return props.getProperty(AVRO_SCHEMA); } @@ 
-228,6 +237,14 @@ public void setSchema(String schemaStr) { props.setProperty(AVRO_SCHEMA, schemaStr); } + public String getLastSchema() { + return props.getProperty(LAST_AVRO_SCHEMA); + } + + public void setLastSchema(String schemaStr) { + props.setProperty(LAST_AVRO_SCHEMA, schemaStr); + } + public boolean getAvroSchemaValidate() { return Boolean.parseBoolean(props.getProperty(AVRO_SCHEMA_VALIDATE)); } @@ -369,6 +386,10 @@ public BulkInsertSortMode getBulkInsertSortMode() { return BulkInsertSortMode.valueOf(sortMode.toUpperCase()); } + public Boolean updatePartialFields() { + return Boolean.parseBoolean(props.getProperty(UPDATE_PARTIAL_FIELDS)); + } + public boolean isMergeDataValidationCheckEnabled() { return Boolean.parseBoolean(props.getProperty(MERGE_DATA_VALIDATION_CHECK_ENABLED)); } @@ -1086,6 +1107,11 @@ public Builder withSchema(String schemaStr) { return this; } + public Builder withLastSchema(String schemaStr) { + props.setProperty(LAST_AVRO_SCHEMA, schemaStr); + return this; + } + public Builder withAvroSchemaValidate(boolean enable) { props.setProperty(AVRO_SCHEMA_VALIDATE, String.valueOf(enable)); return this; @@ -1291,6 +1317,11 @@ public Builder withExternalSchemaTrasformation(boolean enabled) { return this; } + public Builder withUpdatePartialFields(boolean updatePartialFields) { + props.setProperty(UPDATE_PARTIAL_FIELDS, String.valueOf(updatePartialFields)); + return this; + } + public Builder withMergeDataValidationCheckEnabled(boolean enabled) { props.setProperty(MERGE_DATA_VALIDATION_CHECK_ENABLED, String.valueOf(enabled)); return this; @@ -1333,6 +1364,7 @@ protected void setDefaults() { DEFAULT_PARALLELISM); setDefaultOnCondition(props, !props.containsKey(UPSERT_PARALLELISM), UPSERT_PARALLELISM, DEFAULT_PARALLELISM); setDefaultOnCondition(props, !props.containsKey(DELETE_PARALLELISM), DELETE_PARALLELISM, DEFAULT_PARALLELISM); + setDefaultOnCondition(props, !props.containsKey(UPDATE_PARTIAL_FIELDS), UPDATE_PARTIAL_FIELDS, DEFAULT_UPDATE_PARTIAL_FIELDS); setDefaultOnCondition(props, !props.containsKey(ROLLBACK_PARALLELISM), ROLLBACK_PARALLELISM, DEFAULT_ROLLBACK_PARALLELISM); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java index 357cf1b3bedee..63094c62143f6 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java @@ -34,7 +34,6 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieInsertException; import org.apache.hudi.io.storage.HoodieFileWriter; -import org.apache.hudi.io.storage.HoodieFileWriterFactory; import org.apache.hudi.table.HoodieTable; import org.apache.avro.generic.GenericRecord; @@ -82,7 +81,7 @@ public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTa new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath)); partitionMetadata.trySave(getPartitionId()); createMarkerFile(partitionPath, FSUtils.makeDataFileName(this.instantTime, this.writeToken, this.fileId, hoodieTable.getBaseFileExtension())); - this.fileWriter = HoodieFileWriterFactory.getFileWriter(instantTime, path, hoodieTable, config, writerSchemaWithMetafields, this.taskContextSupplier); + this.fileWriter = createNewFileWriter(instantTime, path, hoodieTable, config, writerSchemaWithMetafields, this.taskContextSupplier); } 
catch (IOException e) { throw new HoodieInsertException("Failed to initialize HoodieStorageWriter for path " + path, e); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index 8579f54b02686..b286ff74d13a5 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -18,6 +18,7 @@ package org.apache.hudi.io; +import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.fs.FSUtils; @@ -29,10 +30,13 @@ import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats; import org.apache.hudi.common.model.IOType; +import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.util.DefaultSizeEstimator; import org.apache.hudi.common.util.HoodieRecordSizeEstimator; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.ExternalSpillableMap; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieCorruptedDataException; import org.apache.hudi.exception.HoodieIOException; @@ -106,7 +110,7 @@ public class HoodieMergeHandle extends H public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, Iterator> recordItr, String partitionPath, String fileId, TaskContextSupplier taskContextSupplier) { - super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier); + super(config, instantTime, partitionPath, fileId, hoodieTable, getWriterSchemaIncludingAndExcludingMetadataPair(config, hoodieTable), taskContextSupplier); init(fileId, recordItr); init(fileId, partitionPath, hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId).get()); } @@ -123,6 +127,22 @@ public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTab init(fileId, this.partitionPath, dataFileToBeMerged); } + protected static Pair getWriterSchemaIncludingAndExcludingMetadataPair(HoodieWriteConfig config, HoodieTable hoodieTable) { + Schema originalSchema = new Schema.Parser().parse(config.getSchema()); + Schema hoodieSchema = HoodieAvroUtils.addMetadataFields(originalSchema); + boolean updatePartialFields = config.updatePartialFields(); + if (updatePartialFields) { + try { + TableSchemaResolver resolver = new TableSchemaResolver(hoodieTable.getMetaClient()); + Schema lastSchema = resolver.getTableAvroSchema(); + config.setLastSchema(lastSchema.toString()); + } catch (Exception e) { + // Ignore exception. 
+ } + } + return Pair.of(originalSchema, hoodieSchema); + } + @Override public Schema getWriterSchemaWithMetafields() { return writerSchemaWithMetafields; @@ -144,6 +164,7 @@ protected String generatesDataFileName() { */ private void init(String fileId, String partitionPath, HoodieBaseFile baseFileToMerge) { LOG.info("partitionPath:" + partitionPath + ", fileId to be merged:" + fileId); + writerSchemaWithMetafields = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema())); this.baseFileToMerge = baseFileToMerge; this.writtenRecordKeys = new HashSet<>(); writeStatus.setStat(new HoodieWriteStat()); @@ -266,6 +287,13 @@ protected boolean writeRecord(HoodieRecord hoodieRecord, Option private static final Logger LOG = LogManager.getLogger(HoodieWriteHandle.class); protected final Schema writerSchema; - protected final Schema writerSchemaWithMetafields; + protected Schema writerSchemaWithMetafields; protected HoodieTimer timer; protected WriteStatus writeStatus; protected final String partitionPath; diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java index 5df46a65bc5ee..a6eed2ea801b8 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java @@ -148,7 +148,7 @@ protected void commit(Option> extraMetadata, HoodieWriteMeta LOG.info("Committing " + instantTime + ", action Type " + getCommitActionType()); HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); HoodieCommitMetadata metadata = CommitUtils.buildMetadata(writeStats, result.getPartitionToReplaceFileIds(), - extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType()); + extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType(), config.updatePartialFields(), table.getMetaClient()); activeTimeline.saveAsComplete(new HoodieInstant(true, getCommitActionType(), instantTime), Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8))); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java index 9f9f865d51ae1..43ec8095c08f9 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor; import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer; import org.apache.hudi.exception.HoodieException; @@ -79,7 +80,11 @@ public void runMerge(HoodieTable>, List, List } else { gReader = null; gWriter = null; - readSchema = mergeHandle.getWriterSchemaWithMetafields(); + if (table.getConfig().updatePartialFields() && !StringUtils.isNullOrEmpty(table.getConfig().getLastSchema())) { + readSchema = new Schema.Parser().parse(table.getConfig().getLastSchema()); + } else { + readSchema = 
upsertHandle.getWriterSchemaWithMetafields(); + } } BoundedInMemoryExecutor wrapper = null; diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java index a2ea1c9c0a837..cf5ba61832a92 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java @@ -204,7 +204,7 @@ protected void commit(Option> extraMetadata, HoodieWriteMeta LOG.info("Committing " + instantTime + ", action Type " + getCommitActionType()); HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); HoodieCommitMetadata metadata = CommitUtils.buildMetadata(writeStats, result.getPartitionToReplaceFileIds(), - extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType()); + extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType(), config.updatePartialFields(), table.getMetaClient()); activeTimeline.saveAsComplete(new HoodieInstant(true, getCommitActionType(), instantTime), Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8))); diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java index bd596bea541e1..b5cdae3b5c14b 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor; import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer; import org.apache.hudi.exception.HoodieException; @@ -76,7 +77,11 @@ public void runMerge(HoodieTable>, List, List } else { gReader = null; gWriter = null; - readSchema = mergeHandle.getWriterSchemaWithMetafields(); + if (table.getConfig().updatePartialFields() && !StringUtils.isNullOrEmpty(table.getConfig().getLastSchema())) { + readSchema = new Schema.Parser().parse(table.getConfig().getLastSchema()); + } else { + readSchema = upsertHandle.getWriterSchemaWithMetafields(); + } } BoundedInMemoryExecutor wrapper = null; diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java index 9a9109afb1d62..2cf3bdef50d50 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java @@ -94,7 +94,7 @@ public HoodieWriteMetadata> execute() { .map(inputGroup -> runClusteringForGroupAsync(inputGroup, clusteringPlan.getStrategy().getStrategyParams())) .map(CompletableFuture::join) .reduce((rdd1, rdd2) -> rdd1.union(rdd2)).orElse(engineContext.emptyRDD()); - + 
HoodieWriteMetadata> writeMetadata = buildWriteMetadata(writeStatusRDD); JavaRDD statuses = updateIndex(writeStatusRDD, writeMetadata); writeMetadata.setWriteStats(statuses.map(WriteStatus::getStat).collect()); @@ -103,7 +103,7 @@ public HoodieWriteMetadata> execute() { commitOnAutoCommit(writeMetadata); if (!writeMetadata.getCommitMetadata().isPresent()) { HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(writeStatusRDD.map(WriteStatus::getStat).collect(), writeMetadata.getPartitionToReplaceFileIds(), - extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType()); + extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType(), config.updatePartialFields(), table.getMetaClient()); writeMetadata.setCommitMetadata(Option.of(commitMetadata)); } return writeMetadata; @@ -112,7 +112,7 @@ public HoodieWriteMetadata> execute() { /** * Validate actions taken by clustering. In the first implementation, we validate at least one new file is written. * But we can extend this to add more validation. E.g. number of records read = number of records written etc. - * + * * We can also make these validations in BaseCommitActionExecutor to reuse pre-commit hooks for multiple actions. */ private void validateWriteResult(HoodieWriteMetadata> writeMetadata) { @@ -211,7 +211,7 @@ private JavaRDD> readRecordsForGroup iteratorsForPartition.add(baseFileReader.getRecordIterator(readerSchema)); } catch (IOException e) { throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath() - + " and " + clusteringOp.getDeltaFilePaths(), e); + + " and " + clusteringOp.getDeltaFilePaths(), e); } }); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java index aa199c2b5008e..5bc126028c3bf 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java @@ -252,7 +252,7 @@ protected void commit(Option> extraMetadata, HoodieWriteMeta LOG.info("Committing " + instantTime + ", action Type " + getCommitActionType()); HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); HoodieCommitMetadata metadata = CommitUtils.buildMetadata(writeStats, result.getPartitionToReplaceFileIds(), - extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType()); + extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType(), config.updatePartialFields(), table.getMetaClient()); activeTimeline.saveAsComplete(new HoodieInstant(true, getCommitActionType(), instantTime), Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8))); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMergeHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMergeHelper.java index 08d60b93da37a..e8dbf4eaa95da 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMergeHelper.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMergeHelper.java @@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import 
org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.execution.SparkBoundedInMemoryExecutor; @@ -75,7 +76,11 @@ public void runMerge(HoodieTable>, JavaRDD } else { gReader = null; gWriter = null; - readSchema = mergeHandle.getWriterSchemaWithMetafields(); + if (table.getConfig().updatePartialFields() && !StringUtils.isNullOrEmpty(table.getConfig().getLastSchema())) { + readSchema = new Schema.Parser().parse(table.getConfig().getLastSchema()); + } else { + readSchema = upsertHandle.getWriterSchemaWithMetafields(); + } } BoundedInMemoryExecutor wrapper = null; diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodiePartialUpdate.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodiePartialUpdate.java new file mode 100644 index 0000000000000..d28c6d8871f43 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodiePartialUpdate.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.client; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.PartialUpdatePayload; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.testutils.HoodieClientTestBase; +import org.apache.spark.api.java.JavaRDD; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_TRIP_SCHEMA; +import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_SCHEMA; +import static org.apache.hudi.common.testutils.Transformations.recordsToHoodieKeys; +import static org.apache.hudi.common.util.ParquetUtils.readAvroRecords; +import static org.apache.hudi.common.util.ParquetUtils.readAvroSchema; +import static org.apache.hudi.common.util.ParquetUtils.readRowKeysFromParquet; +import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TestHoodiePartialUpdate extends HoodieClientTestBase { + + @Test + public void testCopyOnWritePartialUpdate() { + final String testPartitionPath = "2016/09/26"; + SparkRDDWriteClient client = getHoodieWriteClient(getConfig(true, false)); + dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath}); + + String commitTime1 = "001"; + client.startCommitWithTime(commitTime1); + + List inserts1 = + dataGen.generateInsertsStream(commitTime1, 100, false, TRIP_SCHEMA).collect(Collectors.toList()); // this writes ~500kb + + List insertKeys = recordsToHoodieKeys(inserts1); + upsertAndCheck(client, insertKeys, commitTime1, false); + + client = getHoodieWriteClient(getConfig(true, true)); + String commitTime2 = "002"; + client.startCommitWithTime(commitTime2); + + WriteStatus writeStatus = upsertAndCheck(client, insertKeys, commitTime2, true); + + Schema schema = readAvroSchema(hadoopConf, new Path(basePath, writeStatus.getStat().getPath())); + List oldSchemaFieldNames = AVRO_TRIP_SCHEMA.getFields().stream().map(Schema.Field::name).collect(Collectors.toList()); + List parquetFieldNames = schema.getFields().stream().map(Schema.Field::name).collect(Collectors.toList()); + + for (String name : oldSchemaFieldNames) { + assertTrue(parquetFieldNames.contains(name)); + } + + List records1 = readAvroRecords(hadoopConf, new Path(basePath, writeStatus.getStat().getPath())); + for (GenericRecord record : records1) { + assertEquals("rider-" + commitTime1, record.get("rider").toString()); + assertEquals("driver-" + commitTime1, record.get("driver").toString()); + assertEquals(String.valueOf(1L), record.get("timestamp").toString()); + } + } + + private WriteStatus upsertAndCheck(SparkRDDWriteClient client, List insertKeys, String commitTime, boolean partial) { + List records = new ArrayList<>(); + for (HoodieKey hoodieKey : insertKeys) { + PartialUpdatePayload payload; + if (partial) { + payload = dataGen.generatePartialUpdatePayloadForPartialTripSchema(hoodieKey, commitTime); + } else { + payload = dataGen.generatePartialUpdatePayloadForTripSchema(hoodieKey, commitTime); + } + HoodieRecord record = new HoodieRecord(hoodieKey, payload); + records.add(record); + } + + JavaRDD insertRecordsRDD1 = jsc.parallelize(records, 1); 
+ List statuses = client.upsert(insertRecordsRDD1, commitTime).collect(); + + assertNoWriteErrors(statuses); + + assertEquals(1, statuses.size(), "Just 1 file needs to be added."); + assertEquals(100, + readRowKeysFromParquet(hadoopConf, new Path(basePath, statuses.get(0).getStat().getPath())) + .size(), "file should contain 100 records"); + + return statuses.get(0); + } +} \ No newline at end of file diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java index 1386edcfee1aa..5cae6c05155a5 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java @@ -62,6 +62,8 @@ import java.util.function.Function; import java.util.stream.Collectors; +import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.PARTIAL_TRIP_SCHEMA; +import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_SCHEMA; import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -97,6 +99,14 @@ public HoodieWriteConfig getConfig(IndexType indexType) { return getConfigBuilder(indexType).build(); } + public HoodieWriteConfig getConfig(boolean updatePartialFields, boolean usePartialSchema) { + if (usePartialSchema) { + return getConfigBuilder(updatePartialFields, PARTIAL_TRIP_SCHEMA).build(); + } else { + return getConfigBuilder(updatePartialFields, TRIP_SCHEMA).build(); + } + } + /** * Get Config builder with default configs set. 
* @@ -112,7 +122,11 @@ public HoodieWriteConfig.Builder getConfigBuilder() { * @return Config Builder */ public HoodieWriteConfig.Builder getConfigBuilder(HoodieFailedWritesCleaningPolicy cleaningPolicy) { - return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA, IndexType.BLOOM, cleaningPolicy); + return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA, IndexType.BLOOM, cleaningPolicy, false); + } + + public HoodieWriteConfig.Builder getConfigBuilder(boolean updatePartialFields, String schemaStr) { + return getConfigBuilder(schemaStr, IndexType.BLOOM, HoodieFailedWritesCleaningPolicy.EAGER, updatePartialFields); } /** @@ -121,15 +135,15 @@ public HoodieWriteConfig.Builder getConfigBuilder(HoodieFailedWritesCleaningPoli * @return Config Builder */ public HoodieWriteConfig.Builder getConfigBuilder(IndexType indexType) { - return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA, indexType, HoodieFailedWritesCleaningPolicy.EAGER); + return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA, indexType, HoodieFailedWritesCleaningPolicy.EAGER, false); } public HoodieWriteConfig.Builder getConfigBuilder(String schemaStr) { - return getConfigBuilder(schemaStr, IndexType.BLOOM, HoodieFailedWritesCleaningPolicy.EAGER); + return getConfigBuilder(schemaStr, IndexType.BLOOM, HoodieFailedWritesCleaningPolicy.EAGER, false); } public HoodieWriteConfig.Builder getConfigBuilder(String schemaStr, IndexType indexType) { - return getConfigBuilder(schemaStr, indexType, HoodieFailedWritesCleaningPolicy.EAGER); + return getConfigBuilder(schemaStr, indexType, HoodieFailedWritesCleaningPolicy.EAGER, false); } /** @@ -138,11 +152,12 @@ public HoodieWriteConfig.Builder getConfigBuilder(String schemaStr, IndexType in * @return Config Builder */ public HoodieWriteConfig.Builder getConfigBuilder(String schemaStr, IndexType indexType, - HoodieFailedWritesCleaningPolicy cleaningPolicy) { + HoodieFailedWritesCleaningPolicy cleaningPolicy, boolean updatePartialFields) { return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr) .withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2).withDeleteParallelism(2) .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION) .withWriteStatusClass(MetadataMergeWriteStatus.class) + .withUpdatePartialFields(updatePartialFields) .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build()) .withCompactionConfig(HoodieCompactionConfig.newBuilder().withFailedWritesCleaningPolicy(cleaningPolicy) .compactionSmallFileSize(1024 * 1024).build()) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdatePayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdatePayload.java new file mode 100644 index 0000000000000..6a47d73587cc9 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdatePayload.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.model; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hudi.common.util.Option; + +import java.io.IOException; +import java.util.List; + +public class PartialUpdatePayload extends OverwriteWithLatestAvroPayload { + public PartialUpdatePayload(GenericRecord record, Comparable orderingVal) { + super(record, orderingVal); + } + + public PartialUpdatePayload(Option record) { + this(record.get(), (record1) -> 0); // natural order + } + + @Override + public Option combineAndGetUpdateValue(IndexedRecord lastValue, Schema schema) throws IOException { + Option recordOption = getInsertValue(schema); + if (recordOption.isPresent()) { + IndexedRecord record = recordOption.get(); + GenericRecord current = (GenericRecord) record; + + List fieldList = schema.getFields(); + GenericRecord last = (GenericRecord) lastValue; + for (Schema.Field field : fieldList) { + last.put(field.name(), current.get(field.name())); + } + return Option.ofNullable(last); + } + return recordOption; + } +} \ No newline at end of file diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java index ea36f67396f8c..a2340e37d5011 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java @@ -23,6 +23,8 @@ import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.exception.HoodieException; @@ -59,7 +61,9 @@ public static HoodieCommitMetadata buildMetadata(List writeStat Option> extraMetadata, WriteOperationType operationType, String schemaToStoreInCommit, - String commitActionType) { + String commitActionType, + Boolean updatePartialFields, + HoodieTableMetaClient metaClient) { HoodieCommitMetadata commitMetadata = buildMetadataFromStats(writeStats, partitionToReplaceFileIds, commitActionType, operationType); @@ -67,6 +71,14 @@ public static HoodieCommitMetadata buildMetadata(List writeStat if (extraMetadata.isPresent()) { extraMetadata.get().forEach(commitMetadata::addMetadata); } + if (updatePartialFields) { + try { + TableSchemaResolver resolver = new TableSchemaResolver(metaClient); + schemaToStoreInCommit = resolver.getTableAvroSchemaWithoutMetadataFields().toString(); + } catch (Exception e) { + // ignore exception. + } + } commitMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, schemaToStoreInCommit == null ? 
"" : schemaToStoreInCommit); commitMetadata.setOperationType(operationType); return commitMetadata; diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java index e103427d40728..2b60a769d7d16 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java @@ -1322,7 +1322,7 @@ public void testReplaceWithTimeTravel() throws IOException { replacedFileIds.add(fileId1); partitionToReplaceFileIds.put(partitionPath1, replacedFileIds); HoodieCommitMetadata commitMetadata = - CommitUtils.buildMetadata(Collections.emptyList(), partitionToReplaceFileIds, Option.empty(), WriteOperationType.INSERT_OVERWRITE, "", HoodieTimeline.REPLACE_COMMIT_ACTION); + CommitUtils.buildMetadata(Collections.emptyList(), partitionToReplaceFileIds, Option.empty(), WriteOperationType.INSERT_OVERWRITE, "", HoodieTimeline.REPLACE_COMMIT_ACTION, false, metaClient); commitTimeline = metaClient.getActiveTimeline(); HoodieInstant instant2 = new HoodieInstant(true, HoodieTimeline.REPLACE_COMMIT_ACTION, commitTime2); saveAsComplete(commitTimeline, instant2, Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); @@ -1407,7 +1407,7 @@ public void testReplaceFileIdIsExcludedInView() throws IOException { replacedFileIdsP2.add(fileId4); partitionToReplaceFileIds.put(partitionPath2, replacedFileIdsP2); HoodieCommitMetadata commitMetadata = - CommitUtils.buildMetadata(Collections.emptyList(), partitionToReplaceFileIds, Option.empty(), WriteOperationType.INSERT_OVERWRITE, "", HoodieTimeline.REPLACE_COMMIT_ACTION); + CommitUtils.buildMetadata(Collections.emptyList(), partitionToReplaceFileIds, Option.empty(), WriteOperationType.INSERT_OVERWRITE, "", HoodieTimeline.REPLACE_COMMIT_ACTION, false, metaClient); HoodieActiveTimeline commitTimeline = metaClient.getActiveTimeline(); HoodieInstant instant1 = new HoodieInstant(true, HoodieTimeline.REPLACE_COMMIT_ACTION, commitTime1); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java index 8017bc3d74860..e039b09e83418 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java @@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodiePartitionMetadata; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.PartialUpdatePayload; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; @@ -126,12 +127,15 @@ public class HoodieTestDataGenerator { public static final String TRIP_HIVE_COLUMN_TYPES = "bigint,string,string,string,double,double,double,double,int,bigint,float,binary,int,bigint,decimal(10,6)," + "map,struct,array>,boolean"; + public static final String PARTIAL_TRIP_SCHEMA = "{\"type\":\"record\",\"name\":\"partialTripUberRec\",\"fields\":[" + + "{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"_row_key\",\"type\":\"string\"}]}"; public static final Schema AVRO_SCHEMA = new 
Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA); public static final Schema AVRO_SCHEMA_WITH_METADATA_FIELDS = HoodieAvroUtils.addMetadataFields(AVRO_SCHEMA); public static final Schema AVRO_SHORT_TRIP_SCHEMA = new Schema.Parser().parse(SHORT_TRIP_SCHEMA); public static final Schema AVRO_TRIP_SCHEMA = new Schema.Parser().parse(TRIP_SCHEMA); + public static final Schema PARTIAL_AVRO_TRIP_SCHEMA = new Schema.Parser().parse(PARTIAL_TRIP_SCHEMA); public static final Schema FLATTENED_AVRO_SCHEMA = new Schema.Parser().parse(TRIP_FLATTENED_SCHEMA); private static final Random RAND = new Random(46474747); @@ -179,7 +183,6 @@ public RawTripTestPayload generateRandomValueAsPerSchema(String schemaStr, Hoodi } else if (SHORT_TRIP_SCHEMA.equals(schemaStr)) { return generatePayloadForShortTripSchema(key, commitTime); } - return null; } @@ -235,6 +238,16 @@ public static RawTripTestPayload generateRandomDeleteValue(HoodieKey key, String return new RawTripTestPayload(Option.of(rec.toString()), key.getRecordKey(), key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA, true); } + public PartialUpdatePayload generatePartialUpdatePayloadForTripSchema(HoodieKey key, String commitTime) { + GenericRecord rec = generateRecordForTripSchema(key.getRecordKey(), "rider-" + commitTime, "driver-" + commitTime, 0L); + return new PartialUpdatePayload(rec, 0); + } + + public PartialUpdatePayload generatePartialUpdatePayloadForPartialTripSchema(HoodieKey key, String commitTime) { + GenericRecord rec = generateRecordForPartialTripSchema(key.getRecordKey(), 1L); + return new PartialUpdatePayload(rec, 0); + } + /** * Generates a new avro record of the above schema format, retaining the key if optionally provided. */ @@ -329,6 +342,13 @@ public GenericRecord generateRecordForShortTripSchema(String rowKey, String ride return rec; } + public GenericRecord generateRecordForPartialTripSchema(String rowKey, Long timestamp) { + GenericRecord rec = new GenericData.Record(PARTIAL_AVRO_TRIP_SCHEMA); + rec.put("_row_key", rowKey); + rec.put("timestamp", timestamp); + return rec; + } + public static void createCommitFile(String basePath, String instantTime, Configuration configuration) { HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); createCommitFile(basePath, instantTime, configuration, commitMetadata); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestCommitUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestCommitUtils.java index c55e34acfa877..87fc8bc169044 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestCommitUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestCommitUtils.java @@ -50,7 +50,7 @@ public void testCommitMetadataCreation() { Option.empty(), WriteOperationType.INSERT, TRIP_SCHEMA, - HoodieTimeline.DELTA_COMMIT_ACTION); + HoodieTimeline.DELTA_COMMIT_ACTION, false, null); assertFalse(commitMetadata instanceof HoodieReplaceCommitMetadata); assertEquals(2, commitMetadata.getPartitionToWriteStats().size()); @@ -74,7 +74,7 @@ public void testReplaceMetadataCreation() { Option.empty(), WriteOperationType.INSERT, TRIP_SCHEMA, - HoodieTimeline.REPLACE_COMMIT_ACTION); + HoodieTimeline.REPLACE_COMMIT_ACTION, false, null); assertTrue(commitMetadata instanceof HoodieReplaceCommitMetadata); HoodieReplaceCommitMetadata replaceCommitMetadata = (HoodieReplaceCommitMetadata) commitMetadata;
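
The config changes above add `hoodie.update.partial.fields` plus a companion `hoodie.last.avro.schema` slot: once `setLastSchema` has been called, `getSchema()` returns the stored full table schema instead of the writer's (possibly partial) schema. Below is a minimal sketch of enabling the flag from the caller's side, using a hypothetical two-field partial schema; all builder calls other than `withUpdatePartialFields` are pre-existing `HoodieWriteConfig` APIs.

```java
import org.apache.hudi.config.HoodieWriteConfig;

public class PartialUpdateConfigExample {
  // Hypothetical partial schema: only the record key and the column being updated.
  private static final String PARTIAL_SCHEMA = "{\"type\":\"record\",\"name\":\"partialRec\",\"fields\":["
      + "{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"}]}";

  public static HoodieWriteConfig buildConfig(String basePath) {
    return HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withSchema(PARTIAL_SCHEMA)        // the writer only declares the fields it carries
        .withUpdatePartialFields(true)     // hoodie.update.partial.fields = true
        .build();
  }
}
```

Once the merge handle resolves the table's latest schema and calls `setLastSchema`, subsequent `getSchema()` calls on this config return the full table schema, so downstream writers see every column.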
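
`PartialUpdatePayload.combineAndGetUpdateValue` copies only the fields present in the incoming record's schema onto the stored row and leaves every other column untouched. The sketch below exercises the payload in isolation with two hypothetical Avro schemas; inside the write path the schemas are supplied by the merge handle, so this is purely illustrative.

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.model.PartialUpdatePayload;
import org.apache.hudi.common.util.Option;

public class PartialUpdatePayloadDemo {
  // Hypothetical schemas: the full table schema and a partial schema carrying only the updated column.
  private static final Schema FULL = new Schema.Parser().parse(
      "{\"type\":\"record\",\"name\":\"rec\",\"fields\":["
          + "{\"name\":\"_row_key\",\"type\":\"string\"},"
          + "{\"name\":\"rider\",\"type\":\"string\"},"
          + "{\"name\":\"driver\",\"type\":\"string\"}]}");
  private static final Schema PARTIAL = new Schema.Parser().parse(
      "{\"type\":\"record\",\"name\":\"rec\",\"fields\":["
          + "{\"name\":\"_row_key\",\"type\":\"string\"},"
          + "{\"name\":\"rider\",\"type\":\"string\"}]}");

  public static void main(String[] args) throws Exception {
    // Row as it currently exists in the base file.
    GenericRecord stored = new GenericData.Record(FULL);
    stored.put("_row_key", "key1");
    stored.put("rider", "rider-001");
    stored.put("driver", "driver-001");

    // Incoming update that only carries the rider column.
    GenericRecord incoming = new GenericData.Record(PARTIAL);
    incoming.put("_row_key", "key1");
    incoming.put("rider", "rider-002");

    PartialUpdatePayload payload = new PartialUpdatePayload(incoming, 0);

    // Only fields present in the incoming record's schema are overwritten; driver keeps its old value.
    Option<IndexedRecord> merged = payload.combineAndGetUpdateValue(stored, PARTIAL);
    GenericRecord result = (GenericRecord) merged.get();
    System.out.println(result.get("rider"));   // rider-002
    System.out.println(result.get("driver"));  // driver-001
  }
}
```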
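
When partial updates are enabled, `HoodieMergeHandle.getWriterSchemaIncludingAndExcludingMetadataPair` resolves the latest committed table schema through `TableSchemaResolver` and stashes it on the config, and `CommitUtils.buildMetadata` performs the same lookup so the commit metadata records the full schema. A sketch of that resolution step, assuming a `HoodieTableMetaClient` for the table is already available:

```java
import org.apache.avro.Schema;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.config.HoodieWriteConfig;

public class LastSchemaResolution {
  // Mirrors the lookup done in HoodieMergeHandle when hoodie.update.partial.fields is enabled.
  public static void captureLastSchema(HoodieWriteConfig config, HoodieTableMetaClient metaClient) {
    if (config.updatePartialFields()) {
      try {
        Schema lastSchema = new TableSchemaResolver(metaClient).getTableAvroSchema();
        config.setLastSchema(lastSchema.toString());
      } catch (Exception e) {
        // No previous commit or no schema found: keep the writer schema already in the config.
      }
    }
  }
}
```

After this call, `config.getSchema()` resolves to the full table schema, which is also what `SparkMergeHelper`, `JavaMergeHelper`, and `FlinkMergeHelper` use as the read schema for the existing base file.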
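
Putting the pieces together, a partial upsert looks like any other upsert; only the payload type and the config flag differ. The sketch below assumes `client` was built with the partial schema and `withUpdatePartialFields(true)`, and that `partialRecord` carries just the record key plus the updated columns; the helper names are illustrative, not part of the patch.

```java
import java.util.Collections;
import java.util.List;

import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.PartialUpdatePayload;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class PartialUpsertExample {
  // Upserts a single partial record into an existing table.
  public static List<WriteStatus> upsertPartial(JavaSparkContext jsc,
                                                SparkRDDWriteClient<PartialUpdatePayload> client,
                                                GenericRecord partialRecord,
                                                String recordKey, String partitionPath) {
    String instant = client.startCommit();
    HoodieRecord<PartialUpdatePayload> record =
        new HoodieRecord<>(new HoodieKey(recordKey, partitionPath), new PartialUpdatePayload(partialRecord, 0));
    JavaRDD<HoodieRecord<PartialUpdatePayload>> rdd = jsc.parallelize(Collections.singletonList(record), 1);
    return client.upsert(rdd, instant).collect();
  }
}
```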
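
Because the new `buildMetadata` signature receives the partial-update flag and the meta client, the schema persisted under `HoodieCommitMetadata.SCHEMA_KEY` remains the full table schema even for partial-field commits. A sketch of reading that value back from the latest completed commit, assuming an initialized `HoodieTableMetaClient`:

```java
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;

public class CommittedSchemaCheck {
  // Reads back the schema recorded by the last completed commit on the table.
  public static String committedSchema(HoodieTableMetaClient metaClient) throws Exception {
    HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
    HoodieInstant latest = timeline.getCommitsTimeline().filterCompletedInstants().lastInstant().get();
    HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(
        timeline.getInstantDetails(latest).get(), HoodieCommitMetadata.class);
    return metadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY);
  }
}
```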