diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java index 824c7375fa07a..7a688b50c7097 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -43,6 +43,7 @@ import org.apache.hudi.common.util.ClusteringUtils; import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieClusteringConfig; @@ -403,6 +404,9 @@ public static class Config implements Serializable { + "https://spark.apache.org/docs/latest/job-scheduling.html") public Integer clusterSchedulingMinShare = 0; + @Parameter(names = {"--post-write-termination-strategy-class"}, description = "Post writer termination strategy class to gracefully shutdown deltastreamer in continuous mode") + public String postWriteTerminationStrategyClass = ""; + public boolean isAsyncCompactionEnabled() { return continuousMode && !forceDisableCompaction && HoodieTableType.MERGE_ON_READ.equals(HoodieTableType.valueOf(tableType)); @@ -603,6 +607,8 @@ public static class DeltaSyncService extends HoodieAsyncService { */ private transient DeltaSync deltaSync; + private final Option postWriteTerminationStrategy; + public DeltaSyncService(Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf, Option properties) throws IOException { this.cfg = cfg; @@ -610,6 +616,8 @@ public DeltaSyncService(Config cfg, JavaSparkContext jssc, FileSystem fs, Config this.sparkSession = SparkSession.builder().config(jssc.getConf()).getOrCreate(); this.asyncCompactService = Option.empty(); this.asyncClusteringService = Option.empty(); + this.postWriteTerminationStrategy = StringUtils.isNullOrEmpty(cfg.postWriteTerminationStrategyClass) ? Option.empty() : + TerminationStrategyUtils.createPostWriteTerminationStrategy(properties.get(), cfg.postWriteTerminationStrategyClass); if (fs.exists(new Path(cfg.targetBasePath))) { HoodieTableMetaClient meta = @@ -695,6 +703,14 @@ protected Pair startService() { } } } + // check if deltastreamer need to be shutdown + if (postWriteTerminationStrategy.isPresent()) { + if (postWriteTerminationStrategy.get().shouldShutdown(scheduledCompactionInstantAndRDD.isPresent() ? Option.of(scheduledCompactionInstantAndRDD.get().getRight()) : + Option.empty())) { + error = true; + shutdown(false); + } + } long toSleepMs = cfg.minSyncIntervalSeconds * 1000 - (System.currentTimeMillis() - start); if (toSleepMs > 0) { LOG.info("Last sync ran less than min sync interval: " + cfg.minSyncIntervalSeconds + " s, sleep: " diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/NoNewDataTerminationStrategy.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/NoNewDataTerminationStrategy.java new file mode 100644 index 0000000000000..2701ce4bc3085 --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/NoNewDataTerminationStrategy.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.utilities.deltastreamer; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.Option; + +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaRDD; + +/** + * Post writer termination strategy for deltastreamer in continuous mode. This strategy is based on no new data for consecutive number of times. + */ +public class NoNewDataTerminationStrategy implements PostWriteTerminationStrategy { + + private static final Logger LOG = LogManager.getLogger(NoNewDataTerminationStrategy.class); + + public static final String MAX_ROUNDS_WITHOUT_NEW_DATA_TO_SHUTDOWN = "max.rounds.without.new.data.to.shutdown"; + public static final int DEFAULT_MAX_ROUNDS_WITHOUT_NEW_DATA_TO_SHUTDOWN = 3; + + private final int numTimesNoNewDataToShutdown; + private int numTimesNoNewData = 0; + + public NoNewDataTerminationStrategy(TypedProperties properties) { + numTimesNoNewDataToShutdown = properties.getInteger(MAX_ROUNDS_WITHOUT_NEW_DATA_TO_SHUTDOWN, DEFAULT_MAX_ROUNDS_WITHOUT_NEW_DATA_TO_SHUTDOWN); + } + + @Override + public boolean shouldShutdown(Option> writeStatuses) { + numTimesNoNewData = writeStatuses.isPresent() ? 0 : numTimesNoNewData + 1; + if (numTimesNoNewData >= numTimesNoNewDataToShutdown) { + LOG.info("Shutting down on continuous mode as there is no new data for " + numTimesNoNewData); + return true; + } + return false; + } +} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/PostWriteTerminationStrategy.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/PostWriteTerminationStrategy.java new file mode 100644 index 0000000000000..61f55428f166a --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/PostWriteTerminationStrategy.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.utilities.deltastreamer; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.util.Option; + +import org.apache.spark.api.java.JavaRDD; + +/** + * Post write termination strategy for deltastreamer in continuous mode. + */ +public interface PostWriteTerminationStrategy { + + /** + * Returns whether deltastreamer needs to be shutdown. + * @param writeStatuses optional pair of scheduled compaction instant and write statuses. + * @return true if deltastreamer has to be shutdown. false otherwise. + */ + boolean shouldShutdown(Option> writeStatuses); + +} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/TerminationStrategyUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/TerminationStrategyUtils.java new file mode 100644 index 0000000000000..1b046a0db0da2 --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/TerminationStrategyUtils.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.utilities.deltastreamer; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.exception.HoodieException; + +public class TerminationStrategyUtils { + + /** + * Create a PostWriteTerminationStrategy class via reflection, + *
+ * if the class name of PostWriteTerminationStrategy is configured through the {@link HoodieDeltaStreamer.Config#postWriteTerminationStrategyClass}. + */ + public static Option createPostWriteTerminationStrategy(TypedProperties properties, String postWriteTerminationStrategyClass) + throws HoodieException { + try { + return StringUtils.isNullOrEmpty(postWriteTerminationStrategyClass) + ? Option.empty() : + Option.of((PostWriteTerminationStrategy) ReflectionUtils.loadClass(postWriteTerminationStrategyClass, properties)); + } catch (Throwable e) { + throw new HoodieException("Could not create PostWritTerminationStrategy class " + postWriteTerminationStrategyClass, e); + } + } +} diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java index 0576f6aaee88b..3eaec56cc2764 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java @@ -60,6 +60,7 @@ import org.apache.hudi.utilities.HoodieIndexer; import org.apache.hudi.utilities.deltastreamer.DeltaSync; import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer; +import org.apache.hudi.utilities.deltastreamer.NoNewDataTerminationStrategy; import org.apache.hudi.utilities.schema.FilebasedSchemaProvider; import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.schema.SparkAvroPostProcessor; @@ -738,18 +739,30 @@ public void testUpsertsCOWContinuousMode() throws Exception { testUpsertsContinuousMode(HoodieTableType.COPY_ON_WRITE, "continuous_cow"); } + @Test + public void testUpsertsCOWContinuousModeShutdownGracefully() throws Exception { + testUpsertsContinuousMode(HoodieTableType.COPY_ON_WRITE, "continuous_cow", true); + } + @Test public void testUpsertsMORContinuousMode() throws Exception { testUpsertsContinuousMode(HoodieTableType.MERGE_ON_READ, "continuous_mor"); } private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir) throws Exception { + testUpsertsContinuousMode(tableType, tempDir, false); + } + + private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir, boolean testShutdownGracefully) throws Exception { String tableBasePath = dfsBasePath + "/" + tempDir; // Keep it higher than batch-size to test continuous mode int totalRecords = 3000; // Initial bulk insert HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT); cfg.continuousMode = true; + if (testShutdownGracefully) { + cfg.postWriteTerminationStrategyClass = NoNewDataTerminationStrategy.class.getName(); + } cfg.tableType = tableType.name(); cfg.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords)); cfg.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key())); @@ -763,6 +776,9 @@ private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir } TestHelpers.assertRecordCount(totalRecords, tableBasePath, sqlContext); TestHelpers.assertDistanceCount(totalRecords, tableBasePath, sqlContext); + if (testShutdownGracefully) { + TestDataSource.returnEmptyBatch = true; + } return true; }); } @@ -781,8 +797,35 @@ static void deltaStreamerTestRunner(HoodieDeltaStreamer ds, HoodieDeltaStreamer. } }); TestHelpers.waitTillCondition(condition, dsFuture, 360); - ds.shutdownGracefully(); - dsFuture.get(); + if (cfg != null && !cfg.postWriteTerminationStrategyClass.isEmpty()) { + awaitDeltaStreamerShutdown(ds); + } else { + ds.shutdownGracefully(); + dsFuture.get(); + } + } + + static void awaitDeltaStreamerShutdown(HoodieDeltaStreamer ds) throws InterruptedException { + // await until deltastreamer shuts down on its own + boolean shutDownRequested = false; + int timeSoFar = 0; + while (!shutDownRequested) { + shutDownRequested = ds.getDeltaSyncService().isShutdownRequested(); + Thread.sleep(500); + timeSoFar += 500; + if (timeSoFar > (2 * 60 * 1000)) { + Assertions.fail("Deltastreamer should have shutdown by now"); + } + } + boolean shutdownComplete = false; + while (!shutdownComplete) { + shutdownComplete = ds.getDeltaSyncService().isShutdown(); + Thread.sleep(500); + timeSoFar += 500; + if (timeSoFar > (2 * 60 * 1000)) { + Assertions.fail("Deltastreamer should have shutdown by now"); + } + } } static void deltaStreamerTestRunner(HoodieDeltaStreamer ds, Function condition) throws Exception { diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDataSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDataSource.java index 1806d5c48b06d..a5a39dbe2d09e 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDataSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDataSource.java @@ -39,11 +39,14 @@ public class TestDataSource extends AbstractBaseTestSource { private static final Logger LOG = LogManager.getLogger(TestDataSource.class); + public static boolean returnEmptyBatch = false; + private static int counter = 0; public TestDataSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, SchemaProvider schemaProvider) { super(props, sparkContext, sparkSession, schemaProvider); initDataGen(); + returnEmptyBatch = false; } @Override @@ -54,9 +57,13 @@ protected InputBatch> fetchNewData(Option lastChe LOG.info("Source Limit is set to " + sourceLimit); // No new data. - if (sourceLimit <= 0) { + if (sourceLimit <= 0 || returnEmptyBatch) { + LOG.warn("Return no new data from Test Data source " + counter + ", source limit " + sourceLimit); return new InputBatch<>(Option.empty(), lastCheckpointStr.orElse(null)); + } else { + LOG.warn("Returning valid data from Test Data source " + counter + ", source limit " + sourceLimit); } + counter++; List records = fetchNextBatch(props, (int) sourceLimit, instantTime, DEFAULT_PARTITION_NUM).collect(Collectors.toList());