diff --git a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
index d0526b023602..61c3a81001b2 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
@@ -29,6 +29,7 @@
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -117,7 +118,17 @@ public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Option<Map<String, String>> extraMetadata,
     if (extraMetadata.isPresent()) {
       extraMetadata.get().forEach(metadata::addMetadata);
     }
-    metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, config.getSchema());
+    String schema = config.getSchema();
+    if (config.updatePartialFields()) {
+      try {
+        TableSchemaResolver resolver = new TableSchemaResolver(table.getMetaClient());
+        schema = resolver.getTableAvroSchemaWithoutMetadataFields().toString();
+      } catch (Exception e) {
+        // ignore exception.
+        schema = config.getSchema();
+      }
+    }
+    metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, schema);
     metadata.setOperationType(operationType);
     try {
diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 249b107f8893..ecda4a3a0ebb 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -61,6 +61,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
   public static final String TIMELINE_LAYOUT_VERSION = "hoodie.timeline.layout.version";
   public static final String BASE_PATH_PROP = "hoodie.base.path";
   public static final String AVRO_SCHEMA = "hoodie.avro.schema";
+  public static final String LAST_AVRO_SCHEMA = "hoodie.last.avro.schema";
   public static final String AVRO_SCHEMA_VALIDATE = "hoodie.avro.schema.validate";
   public static final String DEFAULT_AVRO_SCHEMA_VALIDATE = "false";
   public static final String DEFAULT_PARALLELISM = "1500";
@@ -94,6 +95,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
   public static final String BULKINSERT_SORT_MODE = "hoodie.bulkinsert.sort.mode";
   public static final String DEFAULT_BULKINSERT_SORT_MODE = BulkInsertSortMode.GLOBAL_SORT
       .toString();
+  public static final String DELETE_MARKER_FIELD_PROP = "hoodie.write.delete.marker.field";
+  public static final String DEFAULT_DELETE_MARKER_FIELD = "_hoodie_is_deleted";
+
+  public static final String UPDATE_PARTIAL_FIELDS = "hoodie.update.partial.fields";
+  public static final String DEFAULT_UPDATE_PARTIAL_FIELDS = "false";
 
   public static final String EMBEDDED_TIMELINE_SERVER_ENABLED = "hoodie.embed.timeline.server";
   public static final String DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED = "true";
@@ -161,10 +167,18 @@ public String getSchema() {
     return props.getProperty(AVRO_SCHEMA);
   }
 
+  public String getLastSchema() {
+    return props.getProperty(LAST_AVRO_SCHEMA);
+  }
+
   public void setSchema(String schemaStr) {
     props.setProperty(AVRO_SCHEMA, schemaStr);
   }
 
+  public void setLastSchema(String schemaStr) {
+    props.setProperty(LAST_AVRO_SCHEMA, schemaStr);
+  }
+
   public boolean getAvroSchemaValidate() {
     return Boolean.parseBoolean(props.getProperty(AVRO_SCHEMA_VALIDATE));
   }
@@ -274,6 +288,14 @@ public BulkInsertSortMode getBulkInsertSortMode() {
     return BulkInsertSortMode.valueOf(sortMode.toUpperCase());
   }
 
+  public String getDeleteMarkerField() {
+    return props.getProperty(DELETE_MARKER_FIELD_PROP);
+  }
+
+  public Boolean updatePartialFields() {
+    return Boolean.parseBoolean(props.getProperty(UPDATE_PARTIAL_FIELDS));
+  }
+
   /**
    * compaction properties.
    */
@@ -784,6 +806,11 @@ public Builder withSchema(String schemaStr) {
       return this;
     }
 
+    public Builder withLastSchema(String schemaStr) {
+      props.setProperty(LAST_AVRO_SCHEMA, schemaStr);
+      return this;
+    }
+
     public Builder withAvroSchemaValidate(boolean enable) {
       props.setProperty(AVRO_SCHEMA_VALIDATE, String.valueOf(enable));
       return this;
     }
@@ -940,6 +967,11 @@ public Builder withExternalSchemaTrasformation(boolean enabled) {
       return this;
     }
 
+    public Builder withUpdatePartialFields(boolean updatePartialFields) {
+      props.setProperty(UPDATE_PARTIAL_FIELDS, String.valueOf(updatePartialFields));
+      return this;
+    }
+
     public Builder withProperties(Properties properties) {
       this.props.putAll(properties);
       return this;
     }
@@ -950,6 +982,7 @@ protected void setDefaults() {
       setDefaultOnCondition(props, !props.containsKey(INSERT_PARALLELISM), INSERT_PARALLELISM, DEFAULT_PARALLELISM);
       setDefaultOnCondition(props, !props.containsKey(BULKINSERT_PARALLELISM), BULKINSERT_PARALLELISM, DEFAULT_PARALLELISM);
+      setDefaultOnCondition(props, !props.containsKey(UPDATE_PARTIAL_FIELDS), UPDATE_PARTIAL_FIELDS, DEFAULT_UPDATE_PARTIAL_FIELDS);
       setDefaultOnCondition(props, !props.containsKey(UPSERT_PARALLELISM), UPSERT_PARALLELISM, DEFAULT_PARALLELISM);
       setDefaultOnCondition(props, !props.containsKey(DELETE_PARALLELISM), DELETE_PARALLELISM, DEFAULT_PARALLELISM);
       setDefaultOnCondition(props, !props.containsKey(ROLLBACK_PARALLELISM), ROLLBACK_PARALLELISM,
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
index 5a76dc7469de..b51e48aac3af 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
@@ -34,7 +34,6 @@
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieInsertException;
 import org.apache.hudi.io.storage.HoodieFileWriter;
-import org.apache.hudi.io.storage.HoodieFileWriterFactory;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.avro.generic.GenericRecord;
@@ -60,7 +59,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWriteHandle<T> {
 
   public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
       String partitionPath, String fileId, SparkTaskContextSupplier sparkTaskContextSupplier) {
-    this(config, instantTime, hoodieTable, partitionPath, fileId, getWriterSchemaIncludingAndExcludingMetadataPair(config),
+    this(config, instantTime, hoodieTable, partitionPath, fileId, getWriterSchemaIncludingAndExcludingMetadataPair(config, hoodieTable),
         sparkTaskContextSupplier);
   }
 
@@ -79,7 +78,7 @@ public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
           new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath));
       partitionMetadata.trySave(getPartitionId());
       createMarkerFile(partitionPath, FSUtils.makeDataFileName(this.instantTime, this.writeToken, this.fileId, hoodieTable.getBaseFileExtension()));
-      this.fileWriter = HoodieFileWriterFactory.getFileWriter(instantTime, path, hoodieTable, config, writerSchemaWithMetafields, this.sparkTaskContextSupplier);
+      this.fileWriter = createNewFileWriter(instantTime, path, hoodieTable, config, writerSchemaWithMetafields, this.sparkTaskContextSupplier);
     } catch (IOException e) {
       throw new HoodieInsertException("Failed to initialize HoodieStorageWriter for path " + path, e);
     }
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index 5ea8c380295a..d9d2a84fa3ac 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -25,9 +25,11 @@
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
@@ -65,7 +67,7 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload> extends HoodieIOHandle {
 
   public HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String partitionPath, String fileId,
       HoodieTable<T> hoodieTable, SparkTaskContextSupplier sparkTaskContextSupplier) {
     this(config, instantTime, partitionPath, fileId, hoodieTable,
-        getWriterSchemaIncludingAndExcludingMetadataPair(config), sparkTaskContextSupplier);
+        getWriterSchemaIncludingAndExcludingMetadataPair(config, hoodieTable), sparkTaskContextSupplier);
   }
 
   protected HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String partitionPath, String fileId,
@@ -90,9 +92,19 @@ protected HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String partitionPath, String fileId,
    * @param config Write Config
    * @return
    */
-  protected static Pair<Schema, Schema> getWriterSchemaIncludingAndExcludingMetadataPair(HoodieWriteConfig config) {
+  protected static Pair<Schema, Schema> getWriterSchemaIncludingAndExcludingMetadataPair(HoodieWriteConfig config, HoodieTable hoodieTable) {
     Schema originalSchema = new Schema.Parser().parse(config.getSchema());
     Schema hoodieSchema = HoodieAvroUtils.addMetadataFields(originalSchema);
+    boolean updatePartialFields = config.updatePartialFields();
+    if (updatePartialFields) {
+      try {
+        TableSchemaResolver resolver = new TableSchemaResolver(hoodieTable.getMetaClient());
+        Schema lastSchema = resolver.getTableAvroSchema();
+        config.setLastSchema(lastSchema.toString());
+      } catch (Exception e) {
+        // Ignore exception.
+      }
+    }
     return Pair.of(originalSchema, hoodieSchema);
   }
@@ -164,7 +176,11 @@ public void write(HoodieRecord record, Option<IndexedRecord> avroRecord, Option<Exception> exception) {
    * Rewrite the GenericRecord with the Schema containing the Hoodie Metadata fields.
    */
   protected GenericRecord rewriteRecord(GenericRecord record) {
-    return HoodieAvroUtils.rewriteRecord(record, writerSchemaWithMetafields);
+    if (config.updatePartialFields() && !StringUtils.isNullOrEmpty(config.getLastSchema())) {
+      return HoodieAvroUtils.rewriteRecord(record, new Schema.Parser().parse(config.getLastSchema()));
+    } else {
+      return HoodieAvroUtils.rewriteRecord(record, writerSchemaWithMetafields);
+    }
   }
 
   public abstract WriteStatus close();
@@ -192,6 +208,10 @@ protected long getAttemptId() {
   protected HoodieFileWriter createNewFileWriter(String instantTime, Path path, HoodieTable hoodieTable,
       HoodieWriteConfig config, Schema schema, SparkTaskContextSupplier sparkTaskContextSupplier) throws IOException {
-    return HoodieFileWriterFactory.getFileWriter(instantTime, path, hoodieTable, config, schema, sparkTaskContextSupplier);
+    if (config.updatePartialFields() && !StringUtils.isNullOrEmpty(config.getLastSchema())) {
+      return HoodieFileWriterFactory.getFileWriter(instantTime, path, hoodieTable, config, new Schema.Parser().parse(config.getLastSchema()), sparkTaskContextSupplier);
+    } else {
+      return HoodieFileWriterFactory.getFileWriter(instantTime, path, hoodieTable, config, schema, sparkTaskContextSupplier);
+    }
   }
 }
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
index 5408d44cfa3e..3f1bba620c19 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
@@ -37,6 +37,7 @@
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieInstant.State;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieCommitException;
 import org.apache.hudi.exception.HoodieIOException;
@@ -237,6 +238,9 @@ protected void finalizeWrite(String instantTime, List<HoodieWriteStat> stats, HoodieWriteMetadata result) {
    * By default, return the writer schema in Write Config for storing in commit.
    */
   protected String getSchemaToStoreInCommit() {
+    if (config.updatePartialFields() && !StringUtils.isNullOrEmpty(config.getLastSchema())) {
+      return config.getLastSchema();
+    }
     return config.getSchema();
   }
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/MergeHelper.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/MergeHelper.java
index 4daa5c61fa96..dc76e0812301 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/MergeHelper.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/MergeHelper.java
@@ -30,6 +30,7 @@
 import org.apache.hudi.client.utils.MergingIterator;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
 import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
 import org.apache.hudi.exception.HoodieException;
@@ -73,7 +74,11 @@ public static <T extends HoodieRecordPayload<T>> void runMerge(HoodieTable<T> table, HoodieMergeHandle<T> upsertHandle) throws IOException {
     } else {
       gReader = null;
       gWriter = null;
-      readSchema = upsertHandle.getWriterSchemaWithMetafields();
+      if (table.getConfig().updatePartialFields() && !StringUtils.isNullOrEmpty(table.getConfig().getLastSchema())) {
+        readSchema = new Schema.Parser().parse(table.getConfig().getLastSchema());
+      } else {
+        readSchema = upsertHandle.getWriterSchemaWithMetafields();
+      }
     }
 
     BoundedInMemoryExecutor wrapper = null;
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodiePartialUpdate.java b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodiePartialUpdate.java
new file mode 100644
index 000000000000..25b17ff2f3c1
--- /dev/null
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodiePartialUpdate.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.PartialUpdatePayload;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_TRIP_SCHEMA;
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_SCHEMA;
+import static org.apache.hudi.common.testutils.Transformations.recordsToHoodieKeys;
+import static org.apache.hudi.common.util.ParquetUtils.readAvroRecords;
+import static org.apache.hudi.common.util.ParquetUtils.readAvroSchema;
+import static org.apache.hudi.common.util.ParquetUtils.readRowKeysFromParquet;
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestHoodiePartialUpdate extends HoodieClientTestBase {
+
+  @Test
+  public void testCopyOnWritePartialUpdate() {
+    final String testPartitionPath = "2016/09/26";
+    HoodieWriteClient client = getHoodieWriteClient(getConfig(true, false));
+    dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
+
+    String commitTime1 = "001";
+    client.startCommitWithTime(commitTime1);
+
+    List<HoodieRecord> inserts1 =
+        dataGen.generateInsertsStream(commitTime1, 100, false, TRIP_SCHEMA).collect(Collectors.toList()); // this writes ~500kb
+
+    List<HoodieKey> insertKeys = recordsToHoodieKeys(inserts1);
+    upsertAndCheck(client, insertKeys, commitTime1, false);
+
+    client = getHoodieWriteClient(getConfig(true, true));
+    String commitTime2 = "002";
+    client.startCommitWithTime(commitTime2);
+
+    WriteStatus writeStatus = upsertAndCheck(client, insertKeys, commitTime2, true);
+
+    Schema schema = readAvroSchema(hadoopConf, new Path(basePath, writeStatus.getStat().getPath()));
+    List<String> oldSchemaFieldNames = AVRO_TRIP_SCHEMA.getFields().stream().map(Schema.Field::name).collect(Collectors.toList());
+    List<String> parquetFieldNames = schema.getFields().stream().map(Schema.Field::name).collect(Collectors.toList());
+
+    for (String name : oldSchemaFieldNames) {
+      assertTrue(parquetFieldNames.contains(name));
+    }
+
+    List<GenericRecord> records1 = readAvroRecords(hadoopConf, new Path(basePath, writeStatus.getStat().getPath()));
+    for (GenericRecord record : records1) {
+      assertEquals("rider-" + commitTime1, record.get("rider").toString());
+      assertEquals("driver-" + commitTime1, record.get("driver").toString());
+      assertEquals(String.valueOf(1.0), record.get("timestamp").toString());
+    }
+  }
+
+  private WriteStatus upsertAndCheck(HoodieWriteClient client, List<HoodieKey> insertKeys, String commitTime, boolean partial) {
+    List<HoodieRecord> records = new ArrayList<>();
+    for (HoodieKey hoodieKey : insertKeys) {
+      PartialUpdatePayload payload;
+      if (partial) {
+        payload = dataGen.generatePartialUpdatePayloadForPartialTripSchema(hoodieKey, commitTime);
+      } else {
+        payload = dataGen.generatePartialUpdatePayloadForTripSchema(hoodieKey, commitTime);
+      }
+      HoodieRecord record = new HoodieRecord(hoodieKey, payload);
+      records.add(record);
+    }
+
+    JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(records, 1);
+    List<WriteStatus> statuses = client.upsert(insertRecordsRDD1, commitTime).collect();
+
+    assertNoWriteErrors(statuses);
+
+    assertEquals(1, statuses.size(), "Just 1 file needs to be added.");
+    assertEquals(100,
+        readRowKeysFromParquet(hadoopConf, new Path(basePath, statuses.get(0).getStat().getPath()))
+            .size(), "file should contain 100 records");
+
+    return statuses.get(0);
+  }
+}
diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
index 203cc54f5213..fe06f2f98ee8 100644
--- a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
+++ b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
@@ -61,6 +61,8 @@
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.PARTIAL_TRIP_SCHEMA;
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_SCHEMA;
 import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -96,6 +98,14 @@ public HoodieWriteConfig getConfig(IndexType indexType) {
     return getConfigBuilder(indexType).build();
   }
 
+  public HoodieWriteConfig getConfig(boolean updatePartialFields, boolean usePartialSchema) {
+    if (usePartialSchema) {
+      return getConfigBuilder(updatePartialFields, PARTIAL_TRIP_SCHEMA).build();
+    } else {
+      return getConfigBuilder(updatePartialFields, TRIP_SCHEMA).build();
+    }
+  }
+
   /**
    * Get Config builder with default configs set.
    *
@@ -118,16 +128,25 @@ public HoodieWriteConfig.Builder getConfigBuilder(String schemaStr) {
     return getConfigBuilder(schemaStr, IndexType.BLOOM);
   }
 
+  public HoodieWriteConfig.Builder getConfigBuilder(boolean updatePartialFields, String schemaStr) {
+    return getConfigBuilder(schemaStr, IndexType.BLOOM, updatePartialFields);
+  }
+
   /**
    * Get Config builder with default configs set.
    *
    * @return Config Builder
    */
   public HoodieWriteConfig.Builder getConfigBuilder(String schemaStr, IndexType indexType) {
+    return getConfigBuilder(schemaStr, indexType, false);
+  }
+
+  public HoodieWriteConfig.Builder getConfigBuilder(String schemaStr, IndexType indexType, boolean updatePartialFields) {
     return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr)
         .withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2)
         .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
         .withWriteStatusClass(MetadataMergeWriteStatus.class)
+        .withUpdatePartialFields(updatePartialFields)
         .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build())
         .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build())
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdatePayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdatePayload.java
new file mode 100644
index 000000000000..149678c03919
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdatePayload.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.common.util.Option;
+
+import java.io.IOException;
+import java.util.List;
+
+public class PartialUpdatePayload extends OverwriteWithLatestAvroPayload {
+  public PartialUpdatePayload(GenericRecord record, Comparable orderingVal) {
+    super(record, orderingVal);
+  }
+
+  public PartialUpdatePayload(Option<GenericRecord> record) {
+    this(record.get(), (record1) -> 0); // natural order
+  }
+
+  @Override
+  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException {
+    Option<IndexedRecord> recordOption = getInsertValue(schema);
+    if (recordOption.isPresent()) {
+      IndexedRecord record = recordOption.get();
+      GenericRecord current = (GenericRecord) record;
+
+      List<Schema.Field> fieldList = schema.getFields();
+      GenericRecord last = (GenericRecord) currentValue;
+      for (Schema.Field field : fieldList) {
+        last.put(field.name(), current.get(field.name()));
+      }
+      return Option.ofNullable(last);
+    }
+    return recordOption;
+  }
+}
\ No newline at end of file
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index 90b15d023cce..1f3ce1e24a78 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -30,6 +30,7 @@
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.PartialUpdatePayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -122,6 +123,9 @@ public class HoodieTestDataGenerator {
       + "{\"name\":\"timestamp\",\"type\":\"double\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},"
       + "{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}";
 
+  public static final String PARTIAL_TRIP_SCHEMA = "{\"type\":\"record\",\"name\":\"partialTripUberRec\",\"fields\":["
+      + "{\"name\":\"timestamp\",\"type\":\"double\"},{\"name\":\"_row_key\",\"type\":\"string\"}]}";
+
   public static final String NULL_SCHEMA = Schema.create(Schema.Type.NULL).toString();
   public static final String TRIP_HIVE_COLUMN_TYPES = "double,string,string,string,double,double,double,double,int,bigint,float,binary,int,bigint,decimal(10,6),"
       + "map<string,string>,struct<amount:double,currency:string>,array<struct<amount:double,currency:string>>,boolean";
@@ -132,6 +136,7 @@ public class HoodieTestDataGenerator {
       HoodieAvroUtils.addMetadataFields(AVRO_SCHEMA);
   public static final Schema AVRO_SHORT_TRIP_SCHEMA = new Schema.Parser().parse(SHORT_TRIP_SCHEMA);
   public static final Schema AVRO_TRIP_SCHEMA = new Schema.Parser().parse(TRIP_SCHEMA);
+  public static final Schema PARTIAL_AVRO_TRIP_SCHEMA = new Schema.Parser().parse(PARTIAL_TRIP_SCHEMA);
   public static final Schema FLATTENED_AVRO_SCHEMA = new Schema.Parser().parse(TRIP_FLATTENED_SCHEMA);
 
   private static final Random RAND = new Random(46474747);
@@ -175,7 +180,6 @@ public RawTripTestPayload generateRandomValueAsPerSchema(String schemaStr, HoodieKey key, String commitTime) throws IOException {
     } else if (SHORT_TRIP_SCHEMA.equals(schemaStr)) {
       return generatePayloadForShortTripSchema(key, commitTime);
     }
-
     return null;
   }
 
@@ -222,6 +226,16 @@ public RawTripTestPayload generatePayloadForShortTripSchema(HoodieKey key, String commitTime) throws IOException {
     return new RawTripTestPayload(rec.toString(), key.getRecordKey(), key.getPartitionPath(), SHORT_TRIP_SCHEMA);
   }
 
+  public PartialUpdatePayload generatePartialUpdatePayloadForTripSchema(HoodieKey key, String commitTime) {
+    GenericRecord rec = generateRecordForTripSchema(key.getRecordKey(), "rider-" + commitTime, "driver-" + commitTime, 0.0);
+    return new PartialUpdatePayload(rec, 0);
+  }
+
+  public PartialUpdatePayload generatePartialUpdatePayloadForPartialTripSchema(HoodieKey key, String commitTime) {
+    GenericRecord rec = generateRecordForPartialTripSchema(key.getRecordKey(), 1.0);
+    return new PartialUpdatePayload(rec, 0);
+  }
+
   /**
    * Generates a new avro record of the above schema format for a delete.
    */
@@ -314,6 +328,13 @@ public GenericRecord generateRecordForTripSchema(String rowKey, String riderName, String driverName, double timestamp) {
     return rec;
   }
 
+  public GenericRecord generateRecordForPartialTripSchema(String rowKey, double timestamp) {
+    GenericRecord rec = new GenericData.Record(PARTIAL_AVRO_TRIP_SCHEMA);
+    rec.put("_row_key", rowKey);
+    rec.put("timestamp", timestamp);
+    return rec;
+  }
+
   public GenericRecord generateRecordForShortTripSchema(String rowKey, String riderName, String driverName, double timestamp) {
     GenericRecord rec = new GenericData.Record(AVRO_SHORT_TRIP_SCHEMA);
     rec.put("_row_key", rowKey);
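
Taken together, these changes let a writer pass records that contain only a subset of the table's columns and have the remaining columns carried over from the stored row. The listing below is a minimal usage sketch, not part of the patch itself: the class name PartialUpdateExample, the partial schema string, the field values, the assumption of an existing table with at least one earlier commit under basePath, and the running JavaSparkContext are all illustrative placeholders. It only calls the APIs introduced or exercised above (withUpdatePartialFields, PartialUpdatePayload) plus standard write-client methods.

// Illustrative sketch only. Assumes an existing copy-on-write table at basePath whose
// record key field is _row_key, and a live JavaSparkContext.
import java.util.Collections;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.PartialUpdatePayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class PartialUpdateExample {

  // The writer schema carries only the record key and the column being updated; the other
  // columns are filled from the stored row by PartialUpdatePayload.combineAndGetUpdateValue.
  private static final String PARTIAL_SCHEMA = "{\"type\":\"record\",\"name\":\"partialRec\",\"fields\":["
      + "{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"timestamp\",\"type\":\"double\"}]}";

  public static void upsertPartial(JavaSparkContext jsc, String basePath,
      String recordKey, String partitionPath) {
    // hoodie.update.partial.fields=true: the write handles resolve the latest table schema
    // via TableSchemaResolver and rewrite the partial record against it before writing.
    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withSchema(PARTIAL_SCHEMA)
        .withUpdatePartialFields(true)
        .build();

    // Build one partial record: only _row_key and timestamp are supplied.
    GenericRecord rec = new GenericData.Record(new Schema.Parser().parse(PARTIAL_SCHEMA));
    rec.put("_row_key", recordKey);
    rec.put("timestamp", 1.0);
    HoodieRecord<PartialUpdatePayload> record =
        new HoodieRecord<>(new HoodieKey(recordKey, partitionPath), new PartialUpdatePayload(rec, 0));
    JavaRDD<HoodieRecord<PartialUpdatePayload>> rdd =
        jsc.parallelize(Collections.singletonList(record), 1);

    // Upsert as usual; columns absent from PARTIAL_SCHEMA keep their previously written values.
    HoodieWriteClient<PartialUpdatePayload> client = new HoodieWriteClient<>(jsc, config);
    String instantTime = client.startCommit();
    client.upsert(rdd, instantTime).collect();
    client.close();
  }
}

Note that resolving the full table schema depends on TableSchemaResolver finding a prior commit; as the catch blocks in the patch show, the writer silently falls back to the configured writer schema when that lookup fails.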