diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java index 39f397ab170d6..8e59478da05f9 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java @@ -18,7 +18,7 @@ package org.apache.hudi.client.utils; -import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import org.apache.hudi.client.transaction.ConflictResolutionStrategy; @@ -53,8 +53,12 @@ public class TransactionUtils { * @return * @throws HoodieWriteConflictException */ - public static Option resolveWriteConflictIfAny(final HoodieTable table, final Option currentTxnOwnerInstant, - final Option thisCommitMetadata, final HoodieWriteConfig config, Option lastCompletedTxnOwnerInstant) throws HoodieWriteConflictException { + public static Option resolveWriteConflictIfAny( + final HoodieTable table, + final Option currentTxnOwnerInstant, + final Option thisCommitMetadata, + final HoodieWriteConfig config, + Option lastCompletedTxnOwnerInstant) throws HoodieWriteConflictException { if (config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) { ConflictResolutionStrategy resolutionStrategy = config.getWriteConflictResolutionStrategy(); Stream instantStream = resolutionStrategy.getCandidateInstants(table.getActiveTimeline(), currentTxnOwnerInstant.get(), lastCompletedTxnOwnerInstant); @@ -72,8 +76,11 @@ public static Option resolveWriteConflictIfAny(final Hoodi } }); LOG.info("Successfully resolved conflicts, if any"); - // carry over necessary metadata from latest commit metadata - overrideWithLatestCommitMetadata(table.getMetaClient(), thisOperation.getCommitMetadataOption(), currentTxnOwnerInstant, Arrays.asList(config.getWriteMetaKeyPrefixes().split(","))); + + if (config.mergeDeltastreamerStateFromPreviousCommit()) { + mergeCheckpointStateFromPreviousCommit(table.getMetaClient(), thisOperation.getCommitMetadataOption()); + } + return thisOperation.getCommitMetadataOption(); } return thisCommitMetadata; @@ -111,16 +118,27 @@ public static Option>> getLastCompletedT } } - // override the current metadata with the metadata from the latest instant for the specified key prefixes - private static void overrideWithLatestCommitMetadata(HoodieTableMetaClient metaClient, Option thisMetadata, - Option thisInstant, List keyPrefixes) { + protected static void mergeCheckpointStateFromPreviousCommit(HoodieTableMetaClient metaClient, Option thisMetadata) { + overrideWithLatestCommitMetadata(metaClient, thisMetadata, Collections.singletonList(HoodieWriteConfig.DELTASTREAMER_CHECKPOINT_KEY)); + } + + /** + * Generic method allowing us to override the current metadata with the metadata from + * the latest instant for the specified key prefixes. + * @param metaClient + * @param thisMetadata + * @param keyPrefixes The key prefixes to merge from the previous commit + */ + private static void overrideWithLatestCommitMetadata(HoodieTableMetaClient metaClient, + Option thisMetadata, + List keyPrefixes) { if (keyPrefixes.size() == 1 && keyPrefixes.get(0).length() < 1) { return; } Option>> lastInstant = getLastCompletedTxnInstantAndMetadata(metaClient); if (lastInstant.isPresent() && thisMetadata.isPresent()) { - Stream keys = thisMetadata.get().getExtraMetadata().keySet().stream(); - keyPrefixes.stream().forEach(keyPrefix -> keys + Stream lastCommitMetadataKeys = lastInstant.get().getRight().keySet().stream(); + keyPrefixes.stream().forEach(keyPrefix -> lastCommitMetadataKeys .filter(key -> key.startsWith(keyPrefix)) .forEach(key -> thisMetadata.get().getExtraMetadata().put(key, lastInstant.get().getRight().get(key)))); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 736fe3b471bcb..4eaff7e5f5512 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -85,6 +85,10 @@ public class HoodieWriteConfig extends HoodieConfig { private static final long serialVersionUID = 0L; + // This is a constant as is should never be changed via config (will invalidate previous commits) + // It is here so that both the client and deltastreamer use the same reference + public static final String DELTASTREAMER_CHECKPOINT_KEY = "deltastreamer.checkpoint.key"; + public static final ConfigProperty TBL_NAME = ConfigProperty .key("hoodie.table.name") .noDefaultValue() @@ -368,11 +372,13 @@ public class HoodieWriteConfig extends HoodieConfig { + "OPTIMISTIC_CONCURRENCY_CONTROL: Multiple writers can operate on the table and exactly one of them succeed " + "if a conflict (writes affect the same file group) is detected."); - public static final ConfigProperty WRITE_META_KEY_PREFIXES = ConfigProperty - .key("hoodie.write.meta.key.prefixes") - .defaultValue("") - .withDocumentation("Comma separated metadata key prefixes to override from latest commit " - + "during overlapping commits via multi writing"); + public static final ConfigProperty WRITE_CONCURRENCY_MERGE_DELTASTREAMER_STATE = ConfigProperty + .key("hoodie.write.concurrency.merge.deltastreamer.state") + .defaultValue(false) + .withAlternatives("hoodie.write.meta.key.prefixes") + .withDocumentation("If enabled, this writer will merge Deltastreamer state from the previous checkpoint in order to allow both realtime " + + "and batch writers to ingest into a single table. This should not be enabled on Deltastreamer writers. Enabling this config means," + + "for a spark writer, deltastreamer checkpoint will be copied over from previous commit to the current one."); /** * Currently the use this to specify the write schema. @@ -783,16 +789,6 @@ public class HoodieWriteConfig extends HoodieConfig { */ @Deprecated public static final String DEFAULT_WRITE_CONCURRENCY_MODE = WRITE_CONCURRENCY_MODE.defaultValue(); - /** - * @deprecated Use {@link #WRITE_META_KEY_PREFIXES} and its methods instead - */ - @Deprecated - public static final String WRITE_META_KEY_PREFIXES_PROP = WRITE_META_KEY_PREFIXES.key(); - /** - * @deprecated Use {@link #WRITE_META_KEY_PREFIXES} and its methods instead - */ - @Deprecated - public static final String DEFAULT_WRITE_META_KEY_PREFIXES = WRITE_META_KEY_PREFIXES.defaultValue(); /** * @deprecated Use {@link #ALLOW_MULTI_WRITE_ON_SAME_INSTANT_ENABLE} and its methods instead */ @@ -1764,12 +1760,12 @@ public WriteConcurrencyMode getWriteConcurrencyMode() { return WriteConcurrencyMode.fromValue(getString(WRITE_CONCURRENCY_MODE)); } - public Boolean inlineTableServices() { - return inlineClusteringEnabled() || inlineCompactionEnabled() || isAutoClean(); + public Boolean mergeDeltastreamerStateFromPreviousCommit() { + return getBoolean(HoodieWriteConfig.WRITE_CONCURRENCY_MERGE_DELTASTREAMER_STATE); } - public String getWriteMetaKeyPrefixes() { - return getString(WRITE_META_KEY_PREFIXES); + public Boolean inlineTableServices() { + return inlineClusteringEnabled() || inlineCompactionEnabled() || isAutoClean(); } public String getPreCommitValidators() { @@ -2131,11 +2127,6 @@ public Builder withWriteConcurrencyMode(WriteConcurrencyMode concurrencyMode) { return this; } - public Builder withWriteMetaKeyPrefixes(String writeMetaKeyPrefixes) { - writeConfig.setValue(WRITE_META_KEY_PREFIXES, writeMetaKeyPrefixes); - return this; - } - public Builder withPopulateMetaFields(boolean populateMetaFields) { writeConfig.setValue(HoodieTableConfig.POPULATE_META_FIELDS, Boolean.toString(populateMetaFields)); return this; diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestTransactionUtils.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestTransactionUtils.java new file mode 100644 index 0000000000000..6a836974312fe --- /dev/null +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestTransactionUtils.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.utils; + +import org.apache.hadoop.fs.Path; +import org.apache.hudi.client.utils.TransactionUtils; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.testutils.HoodieCommonTestHarness; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +public class TestTransactionUtils extends HoodieCommonTestHarness { + + @BeforeEach + public void setUp() throws Exception { + init(); + } + + public void init() throws Exception { + initPath(); + initMetaClient(); + metaClient.getFs().mkdirs(new Path(basePath)); + } + + @Test + public void testCheckpointStateMerge() throws IOException { + HoodieActiveTimeline timeline = new HoodieActiveTimeline(metaClient); + + // Create completed commit with deltastreamer checkpoint state + HoodieInstant commitInstantWithCheckpointState = new HoodieInstant( + true, + HoodieTimeline.COMMIT_ACTION, + HoodieActiveTimeline.createNewInstantTime() + ); + timeline.createNewInstant(commitInstantWithCheckpointState); + + HoodieCommitMetadata metadataWithCheckpoint = new HoodieCommitMetadata(); + String checkpointVal = "00001"; + metadataWithCheckpoint.addMetadata(HoodieWriteConfig.DELTASTREAMER_CHECKPOINT_KEY, checkpointVal); + timeline.saveAsComplete( + commitInstantWithCheckpointState, + Option.of(metadataWithCheckpoint.toJsonString().getBytes(StandardCharsets.UTF_8)) + ); + + // Inflight commit without checkpoint metadata + HoodieInstant commitInstantWithoutCheckpointState = new HoodieInstant( + true, + HoodieTimeline.COMMIT_ACTION, + HoodieActiveTimeline.createNewInstantTime() + ); + timeline.createNewInstant(commitInstantWithoutCheckpointState); + HoodieCommitMetadata metadataWithoutCheckpoint = new HoodieCommitMetadata(); + + // Ensure that checkpoint state is merged in from previous completed commit + MockTransactionUtils.assertCheckpointStateWasMerged(metaClient, metadataWithoutCheckpoint, checkpointVal); + } + + private static class MockTransactionUtils extends TransactionUtils { + + public static void assertCheckpointStateWasMerged( + HoodieTableMetaClient metaClient, + HoodieCommitMetadata currentMetadata, + String expectedCheckpointState) { + TransactionUtils.mergeCheckpointStateFromPreviousCommit(metaClient, Option.of(currentMetadata)); + assertEquals( + expectedCheckpointState, + currentMetadata.getExtraMetadata().get(HoodieWriteConfig.DELTASTREAMER_CHECKPOINT_KEY) + ); + } + } +} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index 6632dce86d953..b5239e9293593 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -106,6 +106,7 @@ import static org.apache.hudi.config.HoodieWriteConfig.AUTO_COMMIT_ENABLE; import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_INSERT; import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_UPSERT; +import static org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MERGE_DELTASTREAMER_STATE; import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY; import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_RESET_KEY; import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE; @@ -715,6 +716,11 @@ private HoodieWriteConfig getHoodieClientConfig(Schema schema) { String.format("%s should be set to %s", COMBINE_BEFORE_INSERT.key(), cfg.filterDupes)); ValidationUtils.checkArgument(config.shouldCombineBeforeUpsert(), String.format("%s should be set to %s", COMBINE_BEFORE_UPSERT.key(), combineBeforeUpsert)); + ValidationUtils.checkArgument(!config.mergeDeltastreamerStateFromPreviousCommit(), + String.format( + "Deltastreamer processes should not merge state from previous deltastreamer commits. Please unset '%s'", + WRITE_CONCURRENCY_MERGE_DELTASTREAMER_STATE.key()) + ); return config; } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java index ec65d684fc5bb..1649759f72528 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -44,6 +44,7 @@ import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieClusteringConfig; +import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.hive.HiveSyncTool; @@ -90,7 +91,7 @@ public class HoodieDeltaStreamer implements Serializable { private static final long serialVersionUID = 1L; private static final Logger LOG = LogManager.getLogger(HoodieDeltaStreamer.class); - public static final String CHECKPOINT_KEY = "deltastreamer.checkpoint.key"; + public static final String CHECKPOINT_KEY = HoodieWriteConfig.DELTASTREAMER_CHECKPOINT_KEY; public static final String CHECKPOINT_RESET_KEY = "deltastreamer.checkpoint.reset_key"; protected final transient Config cfg; diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java index 3cdf5f9027a10..e67dfcca113df 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java @@ -19,6 +19,7 @@ package org.apache.hudi.utilities.functional; +import org.apache.hudi.DataSourceWriteOptions; import org.apache.hudi.common.config.LockConfiguration; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieCommitMetadata; @@ -27,6 +28,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.config.HoodieCompactionConfig; +import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode; import org.apache.hudi.testutils.SparkClientFunctionalTestHarness; import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer; @@ -35,6 +37,7 @@ import org.apache.hudi.utilities.testutils.sources.config.SourceConfigs; import org.apache.hadoop.fs.FileSystem; +import org.apache.spark.sql.SaveMode; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Tag; import org.junit.jupiter.params.ParameterizedTest; @@ -71,6 +74,8 @@ @Tag("functional") public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctionalTestHarness { + private static final String COW_TEST_TABLE_NAME = "testtable_COPY_ON_WRITE"; + String basePath; String propsFilePath; String tableBasePath; @@ -154,7 +159,6 @@ void testLatestCheckpointCarryOverWithMultipleWriters(HoodieTableType tableType) // NOTE : Overriding the LockProvider to FileSystemBasedLockProviderTestClass since Zookeeper locks work in unit test but fail on Jenkins with connection timeouts setUpTestTable(tableType); prepareInitialConfigs(fs(), basePath, "foo"); - // enable carrying forward latest checkpoint TypedProperties props = prepareMultiWriterProps(fs(), basePath, propsFilePath); props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass"); props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath); @@ -171,36 +175,61 @@ void testLatestCheckpointCarryOverWithMultipleWriters(HoodieTableType tableType) propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName())); cfgBackfillJob.continuousMode = false; HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(hadoopConf()).setBasePath(tableBasePath).build(); + HoodieTimeline timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); HoodieCommitMetadata commitMetadataForFirstInstant = HoodieCommitMetadata .fromBytes(timeline.getInstantDetails(timeline.firstInstant().get()).get(), HoodieCommitMetadata.class); - // get current checkpoint after preparing base dataset with some commits - HoodieCommitMetadata commitMetadataForLastInstant = HoodieCommitMetadata - .fromBytes(timeline.getInstantDetails(timeline.lastInstant().get()).get(), HoodieCommitMetadata.class); - String lastCheckpointBeforeParallelBackfill = commitMetadataForLastInstant.getMetadata(CHECKPOINT_KEY); - - // run the backfill job, enable overriding checkpoint from the latest commit + // run the backfill job props = prepareMultiWriterProps(fs(), basePath, propsFilePath); props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass"); props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath); - props.setProperty("hoodie.write.meta.key.prefixes", CHECKPOINT_KEY); UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), propsFilePath); - // reset checkpoint to first instant to simulate a random checkpoint for backfill job - // checkpoint will move from 00000 to 00001 for this backfill job - cfgBackfillJob.checkpoint = commitMetadataForFirstInstant.getMetadata(CHECKPOINT_KEY); + // get current checkpoint after preparing base dataset with some commits + HoodieCommitMetadata commitMetadataForLastInstant = getLatestMetadata(meta); + + // Set checkpoint to the last successful position + cfgBackfillJob.checkpoint = commitMetadataForLastInstant.getMetadata(CHECKPOINT_KEY); cfgBackfillJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords)); cfgBackfillJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key())); HoodieDeltaStreamer backfillJob = new HoodieDeltaStreamer(cfgBackfillJob, jsc()); backfillJob.sync(); - // check if the checkpoint is carried over - timeline = meta.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants(); - commitMetadataForLastInstant = HoodieCommitMetadata - .fromBytes(timeline.getInstantDetails(timeline.lastInstant().get()).get(), HoodieCommitMetadata.class); - String lastCheckpointAfterParallelBackfill = commitMetadataForLastInstant.getMetadata(CHECKPOINT_KEY); - Assertions.assertEquals(lastCheckpointBeforeParallelBackfill, lastCheckpointAfterParallelBackfill); + // Save the checkpoint information from the deltastreamer run and perform next write + String checkpointAfterDeltaSync = getLatestMetadata(meta).getMetadata(CHECKPOINT_KEY); + // this writer will enable HoodieWriteConfig.WRITE_CONCURRENCY_MERGE_DELTASTREAMER_STATE.key() so that deltastreamer checkpoint will be carried over. + performWriteWithDeltastreamerStateMerge(); + + // Verify that the checkpoint is carried over + HoodieCommitMetadata commitMetaAfterDatasourceWrite = getLatestMetadata(meta); + Assertions.assertEquals(checkpointAfterDeltaSync, commitMetaAfterDatasourceWrite.getMetadata(CHECKPOINT_KEY)); + } + + /** + * Performs a hudi datasource write with deltastreamer state merge enabled. + */ + private void performWriteWithDeltastreamerStateMerge() { + spark().read() + .format("hudi") + .load(tableBasePath + "/*/*.parquet") + .limit(1) + .write() + .format("hudi") + .option(HoodieWriteConfig.TBL_NAME.key(), COW_TEST_TABLE_NAME) + .option(DataSourceWriteOptions.OPERATION().key(), DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL()) + .option(DataSourceWriteOptions.INSERT_DROP_DUPS().key(), "true") + .option(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key") + .option(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "timestamp") + .option(HoodieWriteConfig.WRITE_CONCURRENCY_MERGE_DELTASTREAMER_STATE.key(), "true") + .mode(SaveMode.Append) + .save(tableBasePath + "/*/*.parquet"); + } + + private static HoodieCommitMetadata getLatestMetadata(HoodieTableMetaClient meta) throws IOException { + HoodieTimeline timeline = meta.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants(); + return HoodieCommitMetadata + .fromBytes(timeline.getInstantDetails(timeline.lastInstant().get()).get(), HoodieCommitMetadata.class); } private static TypedProperties prepareMultiWriterProps(FileSystem fs, String basePath, String propsFilePath) throws IOException {