From 5e2e2ae5282a35594a1fb2509b4fbd2501f0e5fc Mon Sep 17 00:00:00 2001 From: sivabalan Date: Sat, 19 Mar 2022 08:01:04 -0700 Subject: [PATCH 1/5] Adding support to shutdown deltastreamer gracefully when certain conditions are met with continuous mode --- .../deltastreamer/HoodieDeltaStreamer.java | 18 ++++++++++++++++++ .../functional/TestHoodieDeltaStreamer.java | 14 +++++++++++++- .../hudi/utilities/sources/TestDataSource.java | 8 +++++++- 3 files changed, 38 insertions(+), 2 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java index 824c7375fa07a..bc203bf208dde 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -403,6 +403,14 @@ public static class Config implements Serializable { + "https://spark.apache.org/docs/latest/job-scheduling.html") public Integer clusterSchedulingMinShare = 0; + @Parameter(names = {"--enable-shutdown-with-continuous-mode"}, description = "Enable graceful shutdown with continuous mode on certain conditions") + public Boolean enableShutdownOnContinousMode = false; + + @Parameter(names = {"--num-times-no-new-data-before-shutdown-with-continuous-mode"}, description = "When --enable-shutdown-with-continuous-mode is enabled, " + + "this config will determine when to shutdown deltastreamer if no new data is found in the source. If (numTimesNoNewDataBeforeShutdownWithContinuousMode) times," + + " no new data is encountered, deltastreamer will shutdown gracefully") + public Integer numTimesNoNewDataBeforeShutdownWithContinuousMode = 3; + public boolean isAsyncCompactionEnabled() { return continuousMode && !forceDisableCompaction && HoodieTableType.MERGE_ON_READ.equals(HoodieTableType.valueOf(tableType)); @@ -603,6 +611,8 @@ public static class DeltaSyncService extends HoodieAsyncService { */ private transient DeltaSync deltaSync; + private long numTimesNoNewData = 0; + public DeltaSyncService(Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf, Option properties) throws IOException { this.cfg = cfg; @@ -695,6 +705,14 @@ protected Pair startService() { } } } + if (cfg.enableShutdownOnContinousMode) { + // check if deltastreamer need to be shutdown + numTimesNoNewData = scheduledCompactionInstantAndRDD.isPresent() ? 0 : numTimesNoNewData + 1; + if (numTimesNoNewData >= cfg.numTimesNoNewDataBeforeShutdownWithContinuousMode) { + error = true; + throw new HoodieException("Shutting down on continuous mode condition met. Shutting down delta sync."); + } + } long toSleepMs = cfg.minSyncIntervalSeconds * 1000 - (System.currentTimeMillis() - start); if (toSleepMs > 0) { LOG.info("Last sync ran less than min sync interval: " + cfg.minSyncIntervalSeconds + " s, sleep: " diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java index 0576f6aaee88b..5920f6f8840d7 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java @@ -738,18 +738,30 @@ public void testUpsertsCOWContinuousMode() throws Exception { testUpsertsContinuousMode(HoodieTableType.COPY_ON_WRITE, "continuous_cow"); } + @Test + public void testUpsertsCOWContinuousModeShutdownGracefully() throws Exception { + testUpsertsContinuousMode(HoodieTableType.COPY_ON_WRITE, "continuous_cow", true); + } + @Test public void testUpsertsMORContinuousMode() throws Exception { testUpsertsContinuousMode(HoodieTableType.MERGE_ON_READ, "continuous_mor"); } private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir) throws Exception { + testUpsertsContinuousMode(tableType, tempDir, false); + } + + private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir, boolean testShutdownGracefully) throws Exception { String tableBasePath = dfsBasePath + "/" + tempDir; // Keep it higher than batch-size to test continuous mode int totalRecords = 3000; // Initial bulk insert HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT); cfg.continuousMode = true; + if (testShutdownGracefully) { + cfg.enableShutdownOnContinousMode = true; + } cfg.tableType = tableType.name(); cfg.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords)); cfg.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key())); @@ -763,7 +775,7 @@ private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir } TestHelpers.assertRecordCount(totalRecords, tableBasePath, sqlContext); TestHelpers.assertDistanceCount(totalRecords, tableBasePath, sqlContext); - return true; + return !testShutdownGracefully; }); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDataSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDataSource.java index 1806d5c48b06d..9cd36a95b4c86 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDataSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDataSource.java @@ -39,6 +39,8 @@ public class TestDataSource extends AbstractBaseTestSource { private static final Logger LOG = LogManager.getLogger(TestDataSource.class); + public static transient boolean returnEmptyBatch = false; + private static int counter = 0; public TestDataSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, SchemaProvider schemaProvider) { @@ -54,9 +56,13 @@ protected InputBatch> fetchNewData(Option lastChe LOG.info("Source Limit is set to " + sourceLimit); // No new data. - if (sourceLimit <= 0) { + if (sourceLimit <= 0 || returnEmptyBatch) { + LOG.warn("Return no new data from Test Data source " + counter + ", source limit " + sourceLimit); return new InputBatch<>(Option.empty(), lastCheckpointStr.orElse(null)); + } else { + LOG.warn("REturning valid data from Test Data source " + counter + ", source limit " + sourceLimit); } + counter++; List records = fetchNextBatch(props, (int) sourceLimit, instantTime, DEFAULT_PARTITION_NUM).collect(Collectors.toList()); From 724f1f824dfefff0071e4a3fcded35daefb1e01d Mon Sep 17 00:00:00 2001 From: sivabalan Date: Sun, 24 Apr 2022 14:18:00 -0400 Subject: [PATCH 2/5] Adding termination strategy for deltastreamer --- .../deltastreamer/HoodieDeltaStreamer.java | 23 ++++---- .../NoNewDataTerminationStrategy.java | 57 +++++++++++++++++++ .../PostWriteTerminationStrategy.java | 40 +++++++++++++ .../TerminationStrategyUtils.java | 45 +++++++++++++++ .../functional/TestHoodieDeltaStreamer.java | 40 +++++++++++-- .../utilities/sources/TestDataSource.java | 2 +- 6 files changed, 190 insertions(+), 17 deletions(-) create mode 100644 hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/NoNewDataTerminationStrategy.java create mode 100644 hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/PostWriteTerminationStrategy.java create mode 100644 hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/TerminationStrategyUtils.java diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java index bc203bf208dde..1d4dd87b4c47b 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -403,13 +403,11 @@ public static class Config implements Serializable { + "https://spark.apache.org/docs/latest/job-scheduling.html") public Integer clusterSchedulingMinShare = 0; - @Parameter(names = {"--enable-shutdown-with-continuous-mode"}, description = "Enable graceful shutdown with continuous mode on certain conditions") - public Boolean enableShutdownOnContinousMode = false; + @Parameter(names = {"--enable-post-write-termination-strategy"}, description = "Enable graceful shutdown with continuous mode on certain conditions") + public Boolean enablePostWriteTerminationStrategy = false; - @Parameter(names = {"--num-times-no-new-data-before-shutdown-with-continuous-mode"}, description = "When --enable-shutdown-with-continuous-mode is enabled, " - + "this config will determine when to shutdown deltastreamer if no new data is found in the source. If (numTimesNoNewDataBeforeShutdownWithContinuousMode) times," - + " no new data is encountered, deltastreamer will shutdown gracefully") - public Integer numTimesNoNewDataBeforeShutdownWithContinuousMode = 3; + @Parameter(names = {"--post-write-termination-strategy-class"}, description = "Post writer termination strategy class to gracefully shutdown deltastreamer in continuous mode") + public String postWriteTerminationStrategyClass = ""; public boolean isAsyncCompactionEnabled() { return continuousMode && !forceDisableCompaction @@ -611,7 +609,7 @@ public static class DeltaSyncService extends HoodieAsyncService { */ private transient DeltaSync deltaSync; - private long numTimesNoNewData = 0; + private final Option postWriteTerminationStrategy; public DeltaSyncService(Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf, Option properties) throws IOException { @@ -620,6 +618,8 @@ public DeltaSyncService(Config cfg, JavaSparkContext jssc, FileSystem fs, Config this.sparkSession = SparkSession.builder().config(jssc.getConf()).getOrCreate(); this.asyncCompactService = Option.empty(); this.asyncClusteringService = Option.empty(); + this.postWriteTerminationStrategy = cfg.enablePostWriteTerminationStrategy ? TerminationStrategyUtils.createPostWriteTerminationStrategy(properties.get(), cfg.postWriteTerminationStrategyClass) : + Option.empty(); if (fs.exists(new Path(cfg.targetBasePath))) { HoodieTableMetaClient meta = @@ -705,12 +705,11 @@ protected Pair startService() { } } } - if (cfg.enableShutdownOnContinousMode) { - // check if deltastreamer need to be shutdown - numTimesNoNewData = scheduledCompactionInstantAndRDD.isPresent() ? 0 : numTimesNoNewData + 1; - if (numTimesNoNewData >= cfg.numTimesNoNewDataBeforeShutdownWithContinuousMode) { + // check if deltastreamer need to be shutdown + if (postWriteTerminationStrategy.isPresent()) { + if (postWriteTerminationStrategy.get().shouldShutdown(scheduledCompactionInstantAndRDD)) { error = true; - throw new HoodieException("Shutting down on continuous mode condition met. Shutting down delta sync."); + shutdown(false); } } long toSleepMs = cfg.minSyncIntervalSeconds * 1000 - (System.currentTimeMillis() - start); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/NoNewDataTerminationStrategy.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/NoNewDataTerminationStrategy.java new file mode 100644 index 0000000000000..9d7682e97839d --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/NoNewDataTerminationStrategy.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.utilities.deltastreamer; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; + +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaRDD; + +/** + * Post writer termination strategy for deltastreamer in continuous mode. This strategy is based on no new data for consecutive number of times. + */ +public class NoNewDataTerminationStrategy implements PostWriteTerminationStrategy { + + private static final Logger LOG = LogManager.getLogger(NoNewDataTerminationStrategy.class); + + public static final String NUM_TIMES_NO_NEW_DATA_TO_SHUTDOWN = "no.new.data.termination.strategy.num.times.no.new.data.to.shutdown"; + public static final int DEFAULT_NUM_TIMES_NO_NEW_DATA_TO_SHUTDOWN = 3; + + private final int numTimesNoNewDataToShutdown; + private int numTimesNoNewData = 0; + + public NoNewDataTerminationStrategy(TypedProperties properties) { + numTimesNoNewDataToShutdown = properties.getInteger(NUM_TIMES_NO_NEW_DATA_TO_SHUTDOWN, DEFAULT_NUM_TIMES_NO_NEW_DATA_TO_SHUTDOWN); + } + + @Override + public boolean shouldShutdown(Option, JavaRDD>> scheduledCompactionInstantAndWriteStatuses) { + numTimesNoNewData = scheduledCompactionInstantAndWriteStatuses.isPresent() ? 0 : numTimesNoNewData + 1; + if (numTimesNoNewData >= numTimesNoNewDataToShutdown) { + LOG.info("Shutting down on continuous mode as there is no new data for " + numTimesNoNewData); + return true; + } + return false; + } +} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/PostWriteTerminationStrategy.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/PostWriteTerminationStrategy.java new file mode 100644 index 0000000000000..897483ce6fa4b --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/PostWriteTerminationStrategy.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.utilities.deltastreamer; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; + +import org.apache.spark.api.java.JavaRDD; + +/** + * Post write termination strategy for deltastreamer in continuous mode. + */ +public interface PostWriteTerminationStrategy { + + /** + * Returns whether deltastreamer needs to be shutdown. + * @param scheduledCompactionInstantAndWriteStatuses optional pair of scheduled compaction instant and write statuses. + * @return true if deltastreamer has to be shutdown. false otherwise. + */ + boolean shouldShutdown(Option, JavaRDD>> scheduledCompactionInstantAndWriteStatuses); + +} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/TerminationStrategyUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/TerminationStrategyUtils.java new file mode 100644 index 0000000000000..1b046a0db0da2 --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/TerminationStrategyUtils.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.utilities.deltastreamer; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.exception.HoodieException; + +public class TerminationStrategyUtils { + + /** + * Create a PostWriteTerminationStrategy class via reflection, + *
+ * if the class name of PostWriteTerminationStrategy is configured through the {@link HoodieDeltaStreamer.Config#postWriteTerminationStrategyClass}. + */ + public static Option createPostWriteTerminationStrategy(TypedProperties properties, String postWriteTerminationStrategyClass) + throws HoodieException { + try { + return StringUtils.isNullOrEmpty(postWriteTerminationStrategyClass) + ? Option.empty() : + Option.of((PostWriteTerminationStrategy) ReflectionUtils.loadClass(postWriteTerminationStrategyClass, properties)); + } catch (Throwable e) { + throw new HoodieException("Could not create PostWritTerminationStrategy class " + postWriteTerminationStrategyClass, e); + } + } +} diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java index 5920f6f8840d7..4e1d78a000680 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java @@ -60,6 +60,7 @@ import org.apache.hudi.utilities.HoodieIndexer; import org.apache.hudi.utilities.deltastreamer.DeltaSync; import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer; +import org.apache.hudi.utilities.deltastreamer.NoNewDataTerminationStrategy; import org.apache.hudi.utilities.schema.FilebasedSchemaProvider; import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.schema.SparkAvroPostProcessor; @@ -760,7 +761,8 @@ private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT); cfg.continuousMode = true; if (testShutdownGracefully) { - cfg.enableShutdownOnContinousMode = true; + cfg.enablePostWriteTerminationStrategy = true; + cfg.postWriteTerminationStrategyClass = NoNewDataTerminationStrategy.class.getName(); } cfg.tableType = tableType.name(); cfg.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords)); @@ -775,7 +777,10 @@ private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir } TestHelpers.assertRecordCount(totalRecords, tableBasePath, sqlContext); TestHelpers.assertDistanceCount(totalRecords, tableBasePath, sqlContext); - return !testShutdownGracefully; + if (testShutdownGracefully) { + TestDataSource.returnEmptyBatch = true; + } + return true; }); } @@ -793,8 +798,35 @@ static void deltaStreamerTestRunner(HoodieDeltaStreamer ds, HoodieDeltaStreamer. } }); TestHelpers.waitTillCondition(condition, dsFuture, 360); - ds.shutdownGracefully(); - dsFuture.get(); + if (cfg.enablePostWriteTerminationStrategy) { + awaitDeltaStreamerShutdown(ds); + } else { + ds.shutdownGracefully(); + dsFuture.get(); + } + } + + static void awaitDeltaStreamerShutdown(HoodieDeltaStreamer ds) throws InterruptedException { + // await until deltastreamer shuts down on its own + boolean shutDownRequested = false; + int timeSoFar = 0; + while(!shutDownRequested) { + shutDownRequested = ds.getDeltaSyncService().isShutdownRequested(); + Thread.sleep(500); + timeSoFar += 500; + if (timeSoFar > (2 * 60 * 1000)) { + Assertions.fail("Deltastreamer should have shutdown by now"); + } + } + boolean shutdownComplete = false; + while(!shutdownComplete) { + shutdownComplete = ds.getDeltaSyncService().isShutdown(); + Thread.sleep(500); + timeSoFar += 500; + if (timeSoFar > (2 * 60 * 1000)) { + Assertions.fail("Deltastreamer should have shutdown by now"); + } + } } static void deltaStreamerTestRunner(HoodieDeltaStreamer ds, Function condition) throws Exception { diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDataSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDataSource.java index 9cd36a95b4c86..fa3e1beab44a6 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDataSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDataSource.java @@ -60,7 +60,7 @@ protected InputBatch> fetchNewData(Option lastChe LOG.warn("Return no new data from Test Data source " + counter + ", source limit " + sourceLimit); return new InputBatch<>(Option.empty(), lastCheckpointStr.orElse(null)); } else { - LOG.warn("REturning valid data from Test Data source " + counter + ", source limit " + sourceLimit); + LOG.warn("Returning valid data from Test Data source " + counter + ", source limit " + sourceLimit); } counter++; From d745aeb46e224ddd4b95cdabdd3742198a90725f Mon Sep 17 00:00:00 2001 From: sivabalan Date: Wed, 4 May 2022 10:10:25 -0400 Subject: [PATCH 3/5] addressing feedback --- .../utilities/deltastreamer/HoodieDeltaStreamer.java | 11 +++++------ .../deltastreamer/NoNewDataTerminationStrategy.java | 10 +++++----- .../deltastreamer/PostWriteTerminationStrategy.java | 4 ++-- 3 files changed, 12 insertions(+), 13 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java index 1d4dd87b4c47b..7a688b50c7097 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -43,6 +43,7 @@ import org.apache.hudi.common.util.ClusteringUtils; import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieClusteringConfig; @@ -403,9 +404,6 @@ public static class Config implements Serializable { + "https://spark.apache.org/docs/latest/job-scheduling.html") public Integer clusterSchedulingMinShare = 0; - @Parameter(names = {"--enable-post-write-termination-strategy"}, description = "Enable graceful shutdown with continuous mode on certain conditions") - public Boolean enablePostWriteTerminationStrategy = false; - @Parameter(names = {"--post-write-termination-strategy-class"}, description = "Post writer termination strategy class to gracefully shutdown deltastreamer in continuous mode") public String postWriteTerminationStrategyClass = ""; @@ -618,8 +616,8 @@ public DeltaSyncService(Config cfg, JavaSparkContext jssc, FileSystem fs, Config this.sparkSession = SparkSession.builder().config(jssc.getConf()).getOrCreate(); this.asyncCompactService = Option.empty(); this.asyncClusteringService = Option.empty(); - this.postWriteTerminationStrategy = cfg.enablePostWriteTerminationStrategy ? TerminationStrategyUtils.createPostWriteTerminationStrategy(properties.get(), cfg.postWriteTerminationStrategyClass) : - Option.empty(); + this.postWriteTerminationStrategy = StringUtils.isNullOrEmpty(cfg.postWriteTerminationStrategyClass) ? Option.empty() : + TerminationStrategyUtils.createPostWriteTerminationStrategy(properties.get(), cfg.postWriteTerminationStrategyClass); if (fs.exists(new Path(cfg.targetBasePath))) { HoodieTableMetaClient meta = @@ -707,7 +705,8 @@ protected Pair startService() { } // check if deltastreamer need to be shutdown if (postWriteTerminationStrategy.isPresent()) { - if (postWriteTerminationStrategy.get().shouldShutdown(scheduledCompactionInstantAndRDD)) { + if (postWriteTerminationStrategy.get().shouldShutdown(scheduledCompactionInstantAndRDD.isPresent() ? Option.of(scheduledCompactionInstantAndRDD.get().getRight()) : + Option.empty())) { error = true; shutdown(false); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/NoNewDataTerminationStrategy.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/NoNewDataTerminationStrategy.java index 9d7682e97839d..3a00e096487a6 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/NoNewDataTerminationStrategy.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/NoNewDataTerminationStrategy.java @@ -35,19 +35,19 @@ public class NoNewDataTerminationStrategy implements PostWriteTerminationStrateg private static final Logger LOG = LogManager.getLogger(NoNewDataTerminationStrategy.class); - public static final String NUM_TIMES_NO_NEW_DATA_TO_SHUTDOWN = "no.new.data.termination.strategy.num.times.no.new.data.to.shutdown"; - public static final int DEFAULT_NUM_TIMES_NO_NEW_DATA_TO_SHUTDOWN = 3; + public static final String MAX_ROUNDS_WITHOUT_NEW_DATA_TO_SHUTDOWN = "max.rounds.without.new.data.to.shutdown"; + public static final int DEFAULT_MAX_ROUNDS_WITHOUT_NEW_DATA_TO_SHUTDOWN = 3; private final int numTimesNoNewDataToShutdown; private int numTimesNoNewData = 0; public NoNewDataTerminationStrategy(TypedProperties properties) { - numTimesNoNewDataToShutdown = properties.getInteger(NUM_TIMES_NO_NEW_DATA_TO_SHUTDOWN, DEFAULT_NUM_TIMES_NO_NEW_DATA_TO_SHUTDOWN); + numTimesNoNewDataToShutdown = properties.getInteger(MAX_ROUNDS_WITHOUT_NEW_DATA_TO_SHUTDOWN, DEFAULT_MAX_ROUNDS_WITHOUT_NEW_DATA_TO_SHUTDOWN); } @Override - public boolean shouldShutdown(Option, JavaRDD>> scheduledCompactionInstantAndWriteStatuses) { - numTimesNoNewData = scheduledCompactionInstantAndWriteStatuses.isPresent() ? 0 : numTimesNoNewData + 1; + public boolean shouldShutdown(Option> writeStatuses) { + numTimesNoNewData = writeStatuses.isPresent() ? 0 : numTimesNoNewData + 1; if (numTimesNoNewData >= numTimesNoNewDataToShutdown) { LOG.info("Shutting down on continuous mode as there is no new data for " + numTimesNoNewData); return true; diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/PostWriteTerminationStrategy.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/PostWriteTerminationStrategy.java index 897483ce6fa4b..60983b63f1a23 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/PostWriteTerminationStrategy.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/PostWriteTerminationStrategy.java @@ -32,9 +32,9 @@ public interface PostWriteTerminationStrategy { /** * Returns whether deltastreamer needs to be shutdown. - * @param scheduledCompactionInstantAndWriteStatuses optional pair of scheduled compaction instant and write statuses. + * @param writeStatuses optional pair of scheduled compaction instant and write statuses. * @return true if deltastreamer has to be shutdown. false otherwise. */ - boolean shouldShutdown(Option, JavaRDD>> scheduledCompactionInstantAndWriteStatuses); + boolean shouldShutdown(Option> writeStatuses); } From 8186aba0e312a7a713c1fe3d9cd2d9201d30d90a Mon Sep 17 00:00:00 2001 From: sivabalan Date: Wed, 4 May 2022 10:39:59 -0400 Subject: [PATCH 4/5] minor fixes --- .../deltastreamer/NoNewDataTerminationStrategy.java | 1 - .../deltastreamer/PostWriteTerminationStrategy.java | 1 - .../hudi/utilities/functional/TestHoodieDeltaStreamer.java | 7 +++---- 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/NoNewDataTerminationStrategy.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/NoNewDataTerminationStrategy.java index 3a00e096487a6..2701ce4bc3085 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/NoNewDataTerminationStrategy.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/NoNewDataTerminationStrategy.java @@ -22,7 +22,6 @@ import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.collection.Pair; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/PostWriteTerminationStrategy.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/PostWriteTerminationStrategy.java index 60983b63f1a23..61f55428f166a 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/PostWriteTerminationStrategy.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/PostWriteTerminationStrategy.java @@ -21,7 +21,6 @@ import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.collection.Pair; import org.apache.spark.api.java.JavaRDD; diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java index 4e1d78a000680..eba447077b218 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java @@ -761,7 +761,6 @@ private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT); cfg.continuousMode = true; if (testShutdownGracefully) { - cfg.enablePostWriteTerminationStrategy = true; cfg.postWriteTerminationStrategyClass = NoNewDataTerminationStrategy.class.getName(); } cfg.tableType = tableType.name(); @@ -798,7 +797,7 @@ static void deltaStreamerTestRunner(HoodieDeltaStreamer ds, HoodieDeltaStreamer. } }); TestHelpers.waitTillCondition(condition, dsFuture, 360); - if (cfg.enablePostWriteTerminationStrategy) { + if (!cfg.postWriteTerminationStrategyClass.isEmpty()) { awaitDeltaStreamerShutdown(ds); } else { ds.shutdownGracefully(); @@ -810,7 +809,7 @@ static void awaitDeltaStreamerShutdown(HoodieDeltaStreamer ds) throws Interrupte // await until deltastreamer shuts down on its own boolean shutDownRequested = false; int timeSoFar = 0; - while(!shutDownRequested) { + while (!shutDownRequested) { shutDownRequested = ds.getDeltaSyncService().isShutdownRequested(); Thread.sleep(500); timeSoFar += 500; @@ -819,7 +818,7 @@ static void awaitDeltaStreamerShutdown(HoodieDeltaStreamer ds) throws Interrupte } } boolean shutdownComplete = false; - while(!shutdownComplete) { + while (!shutdownComplete) { shutdownComplete = ds.getDeltaSyncService().isShutdown(); Thread.sleep(500); timeSoFar += 500; From a1322fbeb11fe5bb71cd5d70f13147bf8a036996 Mon Sep 17 00:00:00 2001 From: sivabalan Date: Thu, 5 May 2022 19:52:40 -0400 Subject: [PATCH 5/5] fixing tests --- .../hudi/utilities/functional/TestHoodieDeltaStreamer.java | 2 +- .../java/org/apache/hudi/utilities/sources/TestDataSource.java | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java index eba447077b218..3eaec56cc2764 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java @@ -797,7 +797,7 @@ static void deltaStreamerTestRunner(HoodieDeltaStreamer ds, HoodieDeltaStreamer. } }); TestHelpers.waitTillCondition(condition, dsFuture, 360); - if (!cfg.postWriteTerminationStrategyClass.isEmpty()) { + if (cfg != null && !cfg.postWriteTerminationStrategyClass.isEmpty()) { awaitDeltaStreamerShutdown(ds); } else { ds.shutdownGracefully(); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDataSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDataSource.java index fa3e1beab44a6..a5a39dbe2d09e 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDataSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDataSource.java @@ -39,13 +39,14 @@ public class TestDataSource extends AbstractBaseTestSource { private static final Logger LOG = LogManager.getLogger(TestDataSource.class); - public static transient boolean returnEmptyBatch = false; + public static boolean returnEmptyBatch = false; private static int counter = 0; public TestDataSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, SchemaProvider schemaProvider) { super(props, sparkContext, sparkSession, schemaProvider); initDataGen(); + returnEmptyBatch = false; } @Override