diff --git a/hudi-client/src/main/java/org/apache/hudi/async/AbstractAsyncService.java b/hudi-client/src/main/java/org/apache/hudi/async/AbstractAsyncService.java index 7ac236d10fc0e..714fa6019990d 100644 --- a/hudi-client/src/main/java/org/apache/hudi/async/AbstractAsyncService.java +++ b/hudi-client/src/main/java/org/apache/hudi/async/AbstractAsyncService.java @@ -48,9 +48,16 @@ public abstract class AbstractAsyncService implements Serializable { private transient ExecutorService executor; // Future tracking delta-sync/compaction private transient CompletableFuture future; + // Run in daemon mode + private final boolean runInDaemonMode; protected AbstractAsyncService() { + this(false); + } + + protected AbstractAsyncService(boolean runInDaemonMode) { shutdownRequested = false; + this.runInDaemonMode = runInDaemonMode; } protected boolean isShutdownRequested() { @@ -129,7 +136,11 @@ public void start(Function onShutdownCallback) { */ private void monitorThreads(Function onShutdownCallback) { LOG.info("Submitting monitor thread !!"); - Executors.newSingleThreadExecutor().submit(() -> { + Executors.newSingleThreadExecutor(r -> { + Thread t = new Thread(r, "Monitor Thread"); + t.setDaemon(isRunInDaemonMode()); + return t; + }).submit(() -> { boolean error = false; try { LOG.info("Monitoring thread(s) !!"); @@ -137,18 +148,21 @@ private void monitorThreads(Function onShutdownCallback) { } catch (ExecutionException ex) { LOG.error("Monitor noticed one or more threads failed. Requesting graceful shutdown of other threads", ex); error = true; - shutdown(false); } catch (InterruptedException ie) { LOG.error("Got interrupted Monitoring threads", ie); error = true; - shutdown(false); } finally { // Mark as shutdown shutdown = true; if (null != onShutdownCallback) { onShutdownCallback.apply(error); } + shutdown(false); } }); } + + public boolean isRunInDaemonMode() { + return runInDaemonMode; + } } diff --git a/hudi-client/src/main/java/org/apache/hudi/async/AsyncCompactService.java b/hudi-client/src/main/java/org/apache/hudi/async/AsyncCompactService.java new file mode 100644 index 0000000000000..1c39bb6cf2ead --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/async/AsyncCompactService.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.async; + +import org.apache.hudi.client.Compactor; +import org.apache.hudi.client.HoodieWriteClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaSparkContext; + +import java.io.IOException; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.IntStream; + +/** + * Async Compactor Service that runs in a separate thread. Currently, only one compactor is allowed to run at any time. + */ +public class AsyncCompactService extends AbstractAsyncService { + + private static final long serialVersionUID = 1L; + private static final Logger LOG = LogManager.getLogger(AsyncCompactService.class); + + /** + * This is the job pool used by async compaction. + */ + public static final String COMPACT_POOL_NAME = "hoodiecompact"; + + private final int maxConcurrentCompaction; + private transient Compactor compactor; + private transient JavaSparkContext jssc; + private transient BlockingQueue pendingCompactions = new LinkedBlockingQueue<>(); + private transient ReentrantLock queueLock = new ReentrantLock(); + private transient Condition consumed = queueLock.newCondition(); + + public AsyncCompactService(JavaSparkContext jssc, HoodieWriteClient client) { + this(jssc, client, false); + } + + public AsyncCompactService(JavaSparkContext jssc, HoodieWriteClient client, boolean runInDaemonMode) { + super(runInDaemonMode); + this.jssc = jssc; + this.compactor = new Compactor(client, jssc); + this.maxConcurrentCompaction = 1; + } + + /** + * Enqueues a new pending compaction. + */ + public void enqueuePendingCompaction(HoodieInstant instant) { + pendingCompactions.add(instant); + } + + /** + * Wait until the number of outstanding pending compactions reduces to the passed-in value. + * + * @param numPendingCompactions Maximum pending compactions allowed + * @throws InterruptedException if interrupted while waiting + */ + public void waitTillPendingCompactionsReducesTo(int numPendingCompactions) throws InterruptedException { + try { + queueLock.lock(); + while (!isShutdown() && (pendingCompactions.size() > numPendingCompactions)) { + consumed.await(); + } + } finally { + queueLock.unlock(); + } + } + + /** + * Fetch the next pending compaction, if available. + * + * @return the next pending compaction instant, or null if none is available + * @throws InterruptedException if interrupted while polling + */ + private HoodieInstant fetchNextCompactionInstant() throws InterruptedException { + LOG.info("Compactor waiting up to 10 seconds for next instant for compaction"); + HoodieInstant instant = pendingCompactions.poll(10, TimeUnit.SECONDS); + if (instant != null) { + try { + queueLock.lock(); + // Signal waiting thread + consumed.signal(); + } finally { + queueLock.unlock(); + } + } + return instant; + } + + /** + * Start Compaction Service. 
+ */ + @Override + protected Pair startService() { + ExecutorService executor = Executors.newFixedThreadPool(maxConcurrentCompaction, + r -> { + Thread t = new Thread(r, "async_compact_thread"); + t.setDaemon(isRunInDaemonMode()); + return t; + }); + return Pair.of(CompletableFuture.allOf(IntStream.range(0, maxConcurrentCompaction).mapToObj(i -> CompletableFuture.supplyAsync(() -> { + try { + // Set Compactor Pool Name for allowing users to prioritize compaction + LOG.info("Setting Spark Pool name for compaction to " + COMPACT_POOL_NAME); + jssc.setLocalProperty("spark.scheduler.pool", COMPACT_POOL_NAME); + + while (!isShutdownRequested()) { + final HoodieInstant instant = fetchNextCompactionInstant(); + + if (null != instant) { + LOG.info("Starting Compaction for instant " + instant); + compactor.compact(instant); + LOG.info("Finished Compaction for instant " + instant); + } + } + LOG.info("Compactor shutting down properly!!"); + } catch (InterruptedException ie) { + LOG.warn("Compactor executor thread got interrupted exception. Stopping", ie); + } catch (IOException e) { + LOG.error("Compactor executor failed", e); + throw new HoodieIOException(e.getMessage(), e); + } + return true; + }, executor)).toArray(CompletableFuture[]::new)), executor); + } + + + /** + * Check whether compactor thread needs to be stopped. + * @return + */ + protected boolean shouldStopCompactor() { + return false; + } +} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/Compactor.java b/hudi-client/src/main/java/org/apache/hudi/client/Compactor.java similarity index 94% rename from hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/Compactor.java rename to hudi-client/src/main/java/org/apache/hudi/client/Compactor.java index 4c23537e0393c..1f11620c7363a 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/Compactor.java +++ b/hudi-client/src/main/java/org/apache/hudi/client/Compactor.java @@ -16,10 +16,8 @@ * limitations under the License. */ -package org.apache.hudi.utilities.deltastreamer; +package org.apache.hudi.client; -import org.apache.hudi.client.HoodieWriteClient; -import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieException; diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java index 1ead5fff087ff..6b6074b135952 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java @@ -599,6 +599,26 @@ public List generateUpdates(String instantTime, Integer n) throws return updates; } + /** + * Generate update for each record in the dataset. 
+ * @param instantTime + * @return + * @throws IOException + */ + public List generateUpdatesForAllRecords(String instantTime) { + List updates = new ArrayList<>(); + Map existingKeys = existingKeysBySchema.get(TRIP_EXAMPLE_SCHEMA); + existingKeys.values().forEach(kp -> { + try { + HoodieRecord record = generateUpdateRecord(kp.key, instantTime); + updates.add(record); + } catch (IOException ioe) { + throw new HoodieIOException(ioe.getMessage(), ioe); + } + }); + return updates; + } + public List generateUpdatesAsPerSchema(String commitTime, Integer n, String schemaStr) { return generateUniqueUpdatesStream(commitTime, n, schemaStr).collect(Collectors.toList()); } diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/HoodieTestHiveBase.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/HoodieTestHiveBase.java index 170ca608f21d9..95e4c0117cea9 100644 --- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/HoodieTestHiveBase.java +++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/HoodieTestHiveBase.java @@ -72,8 +72,8 @@ public void generateDataByHoodieJavaApp(String hiveTableName, String tableType, } // Run Hoodie Java App - String cmd = String.format("%s %s --hive-sync --table-path %s --hive-url %s --table-type %s --hive-table %s" + - " --commit-type %s --table-name %s", HOODIE_JAVA_APP, "HoodieJavaGenerateApp", hdfsUrl, HIVE_SERVER_JDBC_URL, + String cmd = String.format("%s --hive-sync --table-path %s --hive-url %s --table-type %s --hive-table %s" + + " --commit-type %s --table-name %s", HOODIE_GENERATE_APP, hdfsUrl, HIVE_SERVER_JDBC_URL, tableType, hiveTableName, commitType, hoodieTableName); if (partitionType == PartitionType.MULTI_KEYS_PARTITIONED) { cmd = cmd + " --use-multi-partition-keys"; diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java index 7b70857a9adcc..2b1f792b86c5d 100644 --- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java +++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java @@ -58,6 +58,8 @@ public abstract class ITTestBase { protected static final String PRESTO_COORDINATOR = "/presto-coordinator-1"; protected static final String HOODIE_WS_ROOT = "/var/hoodie/ws"; protected static final String HOODIE_JAVA_APP = HOODIE_WS_ROOT + "/hudi-spark/run_hoodie_app.sh"; + protected static final String HOODIE_GENERATE_APP = HOODIE_WS_ROOT + "/hudi-spark/run_hoodie_generate_app.sh"; + protected static final String HOODIE_JAVA_STREAMING_APP = HOODIE_WS_ROOT + "/hudi-spark/run_hoodie_streaming_app.sh"; protected static final String HUDI_HADOOP_BUNDLE = HOODIE_WS_ROOT + "/docker/hoodie/hadoop/hive_base/target/hoodie-hadoop-mr-bundle.jar"; protected static final String HUDI_HIVE_SYNC_BUNDLE = diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java index 4b586a34212ac..c7787a7a6e8cc 100644 --- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java +++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java @@ -23,11 +23,12 @@ import org.apache.hudi.common.util.collection.Pair; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; - /** * Smoke tests to run as part 
of verification. */ @@ -37,27 +38,31 @@ enum PartitionType { SINGLE_KEY_PARTITIONED, MULTI_KEYS_PARTITIONED, NON_PARTITIONED, } - @Test + @ParameterizedTest + @ValueSource(strings = { HOODIE_JAVA_APP, HOODIE_JAVA_STREAMING_APP }) /** * A basic integration test that runs HoodieJavaApp to create a sample COW Hoodie with single partition key data-set * and performs upserts on it. Hive integration and upsert functionality is checked by running a count query in hive * console. */ - public void testRunHoodieJavaAppOnSinglePartitionKeyCOWTable() throws Exception { + public void testRunHoodieJavaAppOnSinglePartitionKeyCOWTable(String command) throws Exception { String hiveTableName = "docker_hoodie_single_partition_key_cow_test"; - testRunHoodieJavaApp(hiveTableName, HoodieTableType.COPY_ON_WRITE.name(), PartitionType.SINGLE_KEY_PARTITIONED); + testRunHoodieJavaApp(command, hiveTableName, HoodieTableType.COPY_ON_WRITE.name(), + PartitionType.SINGLE_KEY_PARTITIONED); dropHiveTables(hiveTableName, HoodieTableType.COPY_ON_WRITE.name()); } - @Test + @ParameterizedTest + @ValueSource(strings = { HOODIE_JAVA_APP, HOODIE_JAVA_STREAMING_APP }) /** * A basic integration test that runs HoodieJavaApp to create a sample COW Hoodie with multiple partition-keys * data-set and performs upserts on it. Hive integration and upsert functionality is checked by running a count query * in hive console. */ - public void testRunHoodieJavaAppOnMultiPartitionKeysCOWTable() throws Exception { + public void testRunHoodieJavaAppOnMultiPartitionKeysCOWTable(String command) throws Exception { String hiveTableName = "docker_hoodie_multi_partition_key_cow_test"; - testRunHoodieJavaApp(hiveTableName, HoodieTableType.COPY_ON_WRITE.name(), PartitionType.MULTI_KEYS_PARTITIONED); + testRunHoodieJavaApp(command, hiveTableName, HoodieTableType.COPY_ON_WRITE.name(), + PartitionType.MULTI_KEYS_PARTITIONED); dropHiveTables(hiveTableName, HoodieTableType.COPY_ON_WRITE.name()); } @@ -73,27 +78,31 @@ public void testRunHoodieJavaAppOnNonPartitionedCOWTable() throws Exception { dropHiveTables(hiveTableName, HoodieTableType.COPY_ON_WRITE.name()); } - @Test + @ParameterizedTest + @ValueSource(strings = { HOODIE_JAVA_APP, HOODIE_JAVA_STREAMING_APP }) /** * A basic integration test that runs HoodieJavaApp to create a sample MOR Hoodie with single partition key data-set * and performs upserts on it. Hive integration and upsert functionality is checked by running a count query in hive * console. */ - public void testRunHoodieJavaAppOnSinglePartitionKeyMORTable() throws Exception { + public void testRunHoodieJavaAppOnSinglePartitionKeyMORTable(String command) throws Exception { String hiveTableName = "docker_hoodie_single_partition_key_mor_test"; - testRunHoodieJavaApp(hiveTableName, HoodieTableType.MERGE_ON_READ.name(), PartitionType.SINGLE_KEY_PARTITIONED); + testRunHoodieJavaApp(command, hiveTableName, HoodieTableType.MERGE_ON_READ.name(), + PartitionType.SINGLE_KEY_PARTITIONED); dropHiveTables(hiveTableName, HoodieTableType.MERGE_ON_READ.name()); } - @Test + @ParameterizedTest + @ValueSource(strings = { HOODIE_JAVA_APP, HOODIE_JAVA_STREAMING_APP }) /** * A basic integration test that runs HoodieJavaApp to create a sample MOR Hoodie with multiple partition-keys * data-set and performs upserts on it. Hive integration and upsert functionality is checked by running a count query * in hive console. 
*/ - public void testRunHoodieJavaAppOnMultiPartitionKeysMORTable() throws Exception { + public void testRunHoodieJavaAppOnMultiPartitionKeysMORTable(String command) throws Exception { String hiveTableName = "docker_hoodie_multi_partition_key_mor_test"; - testRunHoodieJavaApp(hiveTableName, HoodieTableType.MERGE_ON_READ.name(), PartitionType.MULTI_KEYS_PARTITIONED); + testRunHoodieJavaApp(command, hiveTableName, HoodieTableType.MERGE_ON_READ.name(), + PartitionType.MULTI_KEYS_PARTITIONED); dropHiveTables(hiveTableName, HoodieTableType.MERGE_ON_READ.name()); } @@ -114,7 +123,7 @@ public void testRunHoodieJavaAppOnNonPartitionedMORTable() throws Exception { * Hive integration and upsert functionality is checked by running a count query in hive console. TODO: Add * spark-shell test-case */ - public void testRunHoodieJavaApp(String hiveTableName, String tableType, PartitionType partitionType) + public void testRunHoodieJavaApp(String command, String hiveTableName, String tableType, PartitionType partitionType) throws Exception { String hdfsPath = "/" + hiveTableName; @@ -137,13 +146,13 @@ public void testRunHoodieJavaApp(String hiveTableName, String tableType, Partiti // Run Hoodie Java App String cmd; if (partitionType == PartitionType.SINGLE_KEY_PARTITIONED) { - cmd = HOODIE_JAVA_APP + " HoodieJavaApp --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL + cmd = command + " --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL + " --table-type " + tableType + " --hive-table " + hiveTableName; } else if (partitionType == PartitionType.MULTI_KEYS_PARTITIONED) { - cmd = HOODIE_JAVA_APP + " HoodieJavaApp --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL + cmd = command + " --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL + " --table-type " + tableType + " --hive-table " + hiveTableName + " --use-multi-partition-keys"; } else { - cmd = HOODIE_JAVA_APP + " HoodieJavaApp --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL + cmd = command + " --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL + " --table-type " + tableType + " --hive-table " + hiveTableName + " --non-partitioned"; } executeCommandStringInDocker(ADHOC_1_CONTAINER, cmd, true); @@ -182,6 +191,11 @@ public void testRunHoodieJavaApp(String hiveTableName, String tableType, Partiti "Expecting 280 rows to be present in the new table"); } + public void testRunHoodieJavaApp(String hiveTableName, String tableType, PartitionType partitionType) + throws Exception { + testRunHoodieJavaApp(HOODIE_JAVA_APP, hiveTableName, tableType, partitionType); + } + private void dropHiveTables(String hiveTableName, String tableType) throws Exception { if (tableType.equals(HoodieTableType.MERGE_ON_READ.name())) { executeHiveCommand("drop table if exists " + hiveTableName + "_rt"); diff --git a/hudi-spark/run_hoodie_app.sh b/hudi-spark/run_hoodie_app.sh index e2acc6cf5c307..7c63e7411eb48 100755 --- a/hudi-spark/run_hoodie_app.sh +++ b/hudi-spark/run_hoodie_app.sh @@ -37,4 +37,4 @@ fi OTHER_JARS=`ls -1 $DIR/target/lib/*jar | grep -v '*avro*-1.' 
| tr '\n' ':'` #TODO - Need to move TestDataGenerator and HoodieJavaApp out of tests echo "Running command : java -cp $DIR/target/test-classes/:$DIR/../hudi-client/target/test-classes/:${HADOOP_CONF_DIR}:$HUDI_JAR:${CLIENT_JAR}:$OTHER_JARS $@" -java -Xmx1G -cp $DIR/target/test-classes/:$DIR/../hudi-client/target/test-classes/:${HADOOP_CONF_DIR}:$HUDI_JAR:${CLIENT_JAR}:$OTHER_JARS "$@" +java -Xmx1G -cp $DIR/target/test-classes/:$DIR/../hudi-client/target/test-classes/:${HADOOP_CONF_DIR}:$HUDI_JAR:${CLIENT_JAR}:$OTHER_JARS HoodieJavaApp "$@" diff --git a/hudi-spark/run_hoodie_generate_app.sh b/hudi-spark/run_hoodie_generate_app.sh new file mode 100755 index 0000000000000..a4b4090b049a8 --- /dev/null +++ b/hudi-spark/run_hoodie_generate_app.sh @@ -0,0 +1,40 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +function error_exit { + echo "$1" >&2 ## Send message to stderr. Exclude >&2 if you don't want it that way. + exit "${2:-1}" ## Return a code specified by $2 or 1 by default. +} + +DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" +#Ensure we pick the right jar even for hive11 builds +HUDI_JAR=`ls -c $DIR/../packaging/hudi-spark-bundle/target/hudi-spark-bundle*.jar | grep -v source | head -1` + +if [ -z "$HADOOP_CONF_DIR" ]; then + echo "setting hadoop conf dir" + HADOOP_CONF_DIR="/etc/hadoop/conf" +fi + +if [ -z "$CLIENT_JAR" ]; then + echo "client jar location not set" +fi + +OTHER_JARS=`ls -1 $DIR/target/lib/*jar | grep -v '*avro*-1.' | tr '\n' ':'` +#TODO - Need to move TestDataGenerator and HoodieJavaApp out of tests +echo "Running command : java -cp $DIR/target/test-classes/:$DIR/../hudi-client/target/test-classes/:${HADOOP_CONF_DIR}:$HUDI_JAR:${CLIENT_JAR}:$OTHER_JARS $@" +java -Xmx1G -cp $DIR/target/test-classes/:$DIR/../hudi-client/target/test-classes/:${HADOOP_CONF_DIR}:$HUDI_JAR:${CLIENT_JAR}:$OTHER_JARS HoodieJavaGenerateApp "$@" diff --git a/hudi-spark/run_hoodie_streaming_app.sh b/hudi-spark/run_hoodie_streaming_app.sh new file mode 100755 index 0000000000000..01f1a4e4a13f0 --- /dev/null +++ b/hudi-spark/run_hoodie_streaming_app.sh @@ -0,0 +1,40 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +function error_exit { + echo "$1" >&2 ## Send message to stderr. Exclude >&2 if you don't want it that way. + exit "${2:-1}" ## Return a code specified by $2 or 1 by default. +} + +DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" +#Ensure we pick the right jar even for hive11 builds +HUDI_JAR=`ls -c $DIR/../packaging/hudi-spark-bundle/target/hudi-spark-bundle*.jar | grep -v source | head -1` + +if [ -z "$HADOOP_CONF_DIR" ]; then + echo "setting hadoop conf dir" + HADOOP_CONF_DIR="/etc/hadoop/conf" +fi + +if [ -z "$CLIENT_JAR" ]; then + echo "client jar location not set" +fi + +OTHER_JARS=`ls -1 $DIR/target/lib/*jar | grep -v '*avro*-1.' | tr '\n' ':'` +#TODO - Need to move TestDataGenerator and HoodieJavaApp out of tests +echo "Running command : java -cp $DIR/target/test-classes/:$DIR/../hudi-client/target/test-classes/:${HADOOP_CONF_DIR}:$HUDI_JAR:${CLIENT_JAR}:$OTHER_JARS HoodieJavaStreamingApp $@" +java -Xmx1G -cp $DIR/target/test-classes/:$DIR/../hudi-client/target/test-classes/:${HADOOP_CONF_DIR}:$HUDI_JAR:${CLIENT_JAR}:$OTHER_JARS HoodieJavaStreamingApp "$@" diff --git a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java index 3345204faf80b..5647e65d3f129 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java +++ b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java @@ -226,11 +226,16 @@ public static void checkRequiredProperties(TypedProperties props, List c } public static HoodieWriteClient createHoodieClient(JavaSparkContext jssc, String schemaStr, String basePath, - String tblName, Map parameters) { - + String tblName, Map parameters) { + boolean asyncCompact = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_KEY())); // inline compaction is on by default for MOR - boolean inlineCompact = parameters.get(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY()) + boolean inlineCompact = !asyncCompact && parameters.get(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY()) .equals(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL()); + return createHoodieClient(jssc, schemaStr, basePath, tblName, parameters, inlineCompact); + } + + public static HoodieWriteClient createHoodieClient(JavaSparkContext jssc, String schemaStr, String basePath, + String tblName, Map parameters, boolean inlineCompact) { // insert/bulk-insert combining to be true, if filtering for duplicates boolean combineInserts = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.INSERT_DROP_DUPS_OPT_KEY())); diff --git a/hudi-spark/src/main/java/org/apache/hudi/async/SparkStreamingAsyncCompactService.java b/hudi-spark/src/main/java/org/apache/hudi/async/SparkStreamingAsyncCompactService.java new file mode 100644 index 0000000000000..ae0ad7357b907 --- /dev/null +++ b/hudi-spark/src/main/java/org/apache/hudi/async/SparkStreamingAsyncCompactService.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.async; + +import org.apache.hudi.client.HoodieWriteClient; +import org.apache.spark.api.java.JavaSparkContext; + +/** + * Async Compaction Service used by Structured Streaming. Here, async compaction is run in daemon mode to prevent + * blocking shutting down the Spark application. + */ +public class SparkStreamingAsyncCompactService extends AsyncCompactService { + + private static final long serialVersionUID = 1L; + + public SparkStreamingAsyncCompactService(JavaSparkContext jssc, HoodieWriteClient client) { + super(jssc, client, true); + } +} diff --git a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala index 8a8f87fed3233..c505ec4979f2a 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -281,4 +281,8 @@ object DataSourceWriteOptions { val DEFAULT_HIVE_ASSUME_DATE_PARTITION_OPT_VAL = "false" val DEFAULT_USE_PRE_APACHE_INPUT_FORMAT_OPT_VAL = "false" val DEFAULT_HIVE_USE_JDBC_OPT_VAL = "true" + + // Async Compaction - Enabled by default for MOR + val ASYNC_COMPACT_ENABLE_KEY = "hoodie.datasource.compaction.async.enable" + val DEFAULT_ASYNC_COMPACT_ENABLE_VAL = "true" } diff --git a/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala index fbdd4ea9cfb1b..e26c1c86f427c 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala @@ -18,6 +18,8 @@ package org.apache.hudi import org.apache.hudi.DataSourceReadOptions._ +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.exception.HoodieException import org.apache.hudi.hadoop.HoodieROTablePathFilter import org.apache.log4j.LogManager @@ -103,10 +105,8 @@ class DefaultSource extends RelationProvider mode: SaveMode, optParams: Map[String, String], df: DataFrame): BaseRelation = { - val parameters = HoodieSparkSqlWriter.parametersWithWriteDefaults(optParams) HoodieSparkSqlWriter.write(sqlContext, mode, parameters, df) - new HudiEmptyRelation(sqlContext, df.schema) } diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 05ef8635a7ec6..479005b30574b 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -21,6 +21,7 @@ import java.util import org.apache.avro.Schema import org.apache.avro.generic.GenericRecord +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.hive.conf.HiveConf import org.apache.hudi.DataSourceWriteOptions._ @@ -29,7 +30,7 @@ import 
org.apache.hudi.client.{HoodieWriteClient, WriteStatus} import org.apache.hudi.common.config.TypedProperties import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.model.{HoodieRecordPayload, HoodieTableType} -import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} import org.apache.hudi.common.table.timeline.HoodieActiveTimeline import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.exception.HoodieException @@ -49,7 +50,13 @@ private[hudi] object HoodieSparkSqlWriter { def write(sqlContext: SQLContext, mode: SaveMode, parameters: Map[String, String], - df: DataFrame): (Boolean, common.util.Option[String]) = { + df: DataFrame, + hoodieTableConfig: Option[HoodieTableConfig] = Option.empty, + hoodieWriteClient: Option[HoodieWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty, + asyncCompactionTriggerFn: Option[Function1[HoodieWriteClient[HoodieRecordPayload[Nothing]], Unit]] = Option.empty + ) + : (Boolean, common.util.Option[String], common.util.Option[String], + HoodieWriteClient[HoodieRecordPayload[Nothing]], HoodieTableConfig) = { val sparkContext = sqlContext.sparkContext val path = parameters.get("path") @@ -84,113 +91,134 @@ private[hudi] object HoodieSparkSqlWriter { val instantTime = HoodieActiveTimeline.createNewInstantTime() val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration) var exists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME)) - - if (exists && mode == SaveMode.Append) { - val existingTableName = new HoodieTableMetaClient(sparkContext.hadoopConfiguration, path.get).getTableConfig.getTableName - if (!existingTableName.equals(tblName)) { - throw new HoodieException(s"hoodie table with name $existingTableName already exist at $basePath") - } + var tableConfig : HoodieTableConfig = if (exists) { + hoodieTableConfig.getOrElse( + new HoodieTableMetaClient(sparkContext.hadoopConfiguration, path.get).getTableConfig) + } else { + null } - val (writeStatuses, writeClient: HoodieWriteClient[HoodieRecordPayload[Nothing]]) = - if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) { - // register classes & schemas - val structName = s"${tblName}_record" - val nameSpace = s"hoodie.${tblName}" - sparkContext.getConf.registerKryoClasses( - Array(classOf[org.apache.avro.generic.GenericData], - classOf[org.apache.avro.Schema])) - val schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace) - sparkContext.getConf.registerAvroSchemas(schema) - log.info(s"Registered avro schema : ${schema.toString(true)}") - - // Convert to RDD[HoodieRecord] - val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters)) - val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace) - val hoodieAllIncomingRecords = genericRecords.map(gr => { - val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, parameters(PRECOMBINE_FIELD_OPT_KEY), false) - .asInstanceOf[Comparable[_]] - DataSourceUtils.createHoodieRecord(gr, - orderingVal, keyGenerator.getKey(gr), parameters(PAYLOAD_CLASS_OPT_KEY)) - }).toJavaRDD() - - // Handle various save modes - if (mode == SaveMode.ErrorIfExists && exists) { - throw new HoodieException(s"hoodie table at $basePath already exists.") - } - if (mode == SaveMode.Ignore && exists) { - log.warn(s"hoodie table at $basePath already exists. 
Ignoring & not performing actual writes.") - (true, common.util.Option.empty()) - } - if (mode == SaveMode.Overwrite && exists) { - log.warn(s"hoodie table at $basePath already exists. Deleting existing data & overwriting with new data.") - fs.delete(basePath, true) - exists = false - } - - // Create the table if not present - if (!exists) { - //FIXME(bootstrap): bootstrapIndexClass needs to be set when bootstrap index class is integrated. - HoodieTableMetaClient.initTableTypeWithBootstrap(sparkContext.hadoopConfiguration, path.get, HoodieTableType.valueOf(tableType), - tblName, "archived", parameters(PAYLOAD_CLASS_OPT_KEY), null, null, null) - } - - // Create a HoodieWriteClient & issue the write. - val client = DataSourceUtils.createHoodieClient(jsc, schema.toString, path.get, tblName, - mapAsJavaMap(parameters) - ) - - val hoodieRecords = - if (parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean) { - DataSourceUtils.dropDuplicates(jsc, hoodieAllIncomingRecords, mapAsJavaMap(parameters)) - } else { - hoodieAllIncomingRecords - } - - if (hoodieRecords.isEmpty()) { - log.info("new batch has no new records, skipping...") - (true, common.util.Option.empty()) - } - client.startCommitWithTime(instantTime) - val writeStatuses = DataSourceUtils.doWriteOperation(client, hoodieRecords, instantTime, operation) - (writeStatuses, client) + if (mode == SaveMode.Ignore && exists) { + log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.") + (false, common.util.Option.empty(), common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig) } else { - - // Handle save modes - if (mode != SaveMode.Append) { - throw new HoodieException(s"Append is the only save mode applicable for $operation operation") + if (exists && mode == SaveMode.Append) { + val existingTableName = tableConfig.getTableName + if (!existingTableName.equals(tblName)) { + throw new HoodieException(s"hoodie table with name $existingTableName already exist at $basePath") + } } + val (writeStatuses, writeClient: HoodieWriteClient[HoodieRecordPayload[Nothing]]) = + if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) { + // register classes & schemas + val structName = s"${tblName}_record" + val nameSpace = s"hoodie.${tblName}" + sparkContext.getConf.registerKryoClasses( + Array(classOf[org.apache.avro.generic.GenericData], + classOf[org.apache.avro.Schema])) + val schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace) + sparkContext.getConf.registerAvroSchemas(schema) + log.info(s"Registered avro schema : ${schema.toString(true)}") + + // Convert to RDD[HoodieRecord] + val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters)) + val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace) + val hoodieAllIncomingRecords = genericRecords.map(gr => { + val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, parameters(PRECOMBINE_FIELD_OPT_KEY), false) + .asInstanceOf[Comparable[_]] + DataSourceUtils.createHoodieRecord(gr, + orderingVal, keyGenerator.getKey(gr), + parameters(PAYLOAD_CLASS_OPT_KEY)) + }).toJavaRDD() + + // Handle various save modes + if (mode == SaveMode.ErrorIfExists && exists) { + throw new HoodieException(s"hoodie table at $basePath already exists.") + } + + if (mode == SaveMode.Overwrite && exists) { + log.warn(s"hoodie table at $basePath already exists. 
Deleting existing data & overwriting with new data.") + fs.delete(basePath, true) + exists = false + } + + // Create the table if not present + if (!exists) { + //FIXME(bootstrap): bootstrapIndexClass needs to be set when bootstrap index class is integrated. + val tableMetaClient = HoodieTableMetaClient.initTableTypeWithBootstrap(sparkContext.hadoopConfiguration, + path.get, HoodieTableType.valueOf(tableType), + tblName, "archived", parameters(PAYLOAD_CLASS_OPT_KEY), null, null, null) + tableConfig = tableMetaClient.getTableConfig + } + + // Create a HoodieWriteClient & issue the write. + val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, schema.toString, path.get, + tblName, mapAsJavaMap(parameters) + )).asInstanceOf[HoodieWriteClient[HoodieRecordPayload[Nothing]]] + + if (asyncCompactionTriggerFn.isDefined && + isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) { + asyncCompactionTriggerFn.get.apply(client) + } + + val hoodieRecords = + if (parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean) { + DataSourceUtils.dropDuplicates(jsc, hoodieAllIncomingRecords, mapAsJavaMap(parameters)) + } else { + hoodieAllIncomingRecords + } - val structName = s"${tblName}_record" - val nameSpace = s"hoodie.${tblName}" - sparkContext.getConf.registerKryoClasses( - Array(classOf[org.apache.avro.generic.GenericData], - classOf[org.apache.avro.Schema])) - - // Convert to RDD[HoodieKey] - val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters)) - val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace) - val hoodieKeysToDelete = genericRecords.map(gr => keyGenerator.getKey(gr)).toJavaRDD() - - if (!exists) { - throw new HoodieException(s"hoodie table at $basePath does not exist") - } + if (hoodieRecords.isEmpty()) { + log.info("new batch has no new records, skipping...") + (true, common.util.Option.empty()) + } + client.startCommitWithTime(instantTime) + val writeStatuses = DataSourceUtils.doWriteOperation(client, hoodieRecords, instantTime, operation) + (writeStatuses, client) + } else { - // Create a HoodieWriteClient & issue the delete. - val client = DataSourceUtils.createHoodieClient(jsc, - Schema.create(Schema.Type.NULL).toString, path.get, tblName, - mapAsJavaMap(parameters) - ) + // Handle save modes + if (mode != SaveMode.Append) { + throw new HoodieException(s"Append is the only save mode applicable for $operation operation") + } + + val structName = s"${tblName}_record" + val nameSpace = s"hoodie.${tblName}" + sparkContext.getConf.registerKryoClasses( + Array(classOf[org.apache.avro.generic.GenericData], + classOf[org.apache.avro.Schema])) + + // Convert to RDD[HoodieKey] + val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters)) + val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace) + val hoodieKeysToDelete = genericRecords.map(gr => keyGenerator.getKey(gr)).toJavaRDD() + + if (!exists) { + throw new HoodieException(s"hoodie table at $basePath does not exist") + } + + // Create a HoodieWriteClient & issue the delete. 
+ val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, + Schema.create(Schema.Type.NULL).toString, path.get, tblName, + mapAsJavaMap(parameters))).asInstanceOf[HoodieWriteClient[HoodieRecordPayload[Nothing]]] + + if (asyncCompactionTriggerFn.isDefined && + isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) { + asyncCompactionTriggerFn.get.apply(client) + } + + // Issue deletes + client.startCommitWithTime(instantTime) + val writeStatuses = DataSourceUtils.doDeleteOperation(client, hoodieKeysToDelete, instantTime) + (writeStatuses, client) + } - // Issue deletes - client.startCommitWithTime(instantTime) - val writeStatuses = DataSourceUtils.doDeleteOperation(client, hoodieKeysToDelete, instantTime) - (writeStatuses, client) + // Check for errors and commit the write. + val (writeSuccessful, compactionInstant) = + commitAndPerformPostOperations(writeStatuses, parameters, writeClient, tableConfig, instantTime, basePath, + operation, jsc) + (writeSuccessful, common.util.Option.ofNullable(instantTime), compactionInstant, writeClient, tableConfig) } - - // Check for errors and commit the write. - val writeSuccessful = checkWriteStatus(writeStatuses, parameters, writeClient, instantTime, basePath, operation, jsc) - (writeSuccessful, common.util.Option.ofNullable(instantTime)) } /** @@ -222,7 +250,8 @@ private[hudi] object HoodieSparkSqlWriter { HIVE_PARTITION_FIELDS_OPT_KEY -> DEFAULT_HIVE_PARTITION_FIELDS_OPT_VAL, HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> DEFAULT_HIVE_PARTITION_EXTRACTOR_CLASS_OPT_VAL, HIVE_STYLE_PARTITIONING_OPT_KEY -> DEFAULT_HIVE_STYLE_PARTITIONING_OPT_VAL, - HIVE_USE_JDBC_OPT_KEY -> DEFAULT_HIVE_USE_JDBC_OPT_VAL + HIVE_USE_JDBC_OPT_KEY -> DEFAULT_HIVE_USE_JDBC_OPT_VAL, + ASYNC_COMPACT_ENABLE_KEY -> DEFAULT_ASYNC_COMPACT_ENABLE_VAL ) ++ translateStorageTypeToTableType(parameters) } @@ -258,13 +287,14 @@ private[hudi] object HoodieSparkSqlWriter { hiveSyncConfig } - private def checkWriteStatus(writeStatuses: JavaRDD[WriteStatus], - parameters: Map[String, String], - client: HoodieWriteClient[_], - instantTime: String, - basePath: Path, - operation: String, - jsc: JavaSparkContext): Boolean = { + private def commitAndPerformPostOperations(writeStatuses: JavaRDD[WriteStatus], + parameters: Map[String, String], + client: HoodieWriteClient[HoodieRecordPayload[Nothing]], + tableConfig: HoodieTableConfig, + instantTime: String, + basePath: Path, + operation: String, + jsc: JavaSparkContext): (Boolean, common.util.Option[java.lang.String]) = { val errorCount = writeStatuses.rdd.filter(ws => ws.hasErrors).count() if (errorCount == 0) { log.info("No errors. 
Proceeding to commit the write.") @@ -284,6 +314,15 @@ private[hudi] object HoodieSparkSqlWriter { log.info("Commit " + instantTime + " failed!") } + val asyncCompactionEnabled = isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration()) + val compactionInstant : common.util.Option[java.lang.String] = + if (asyncCompactionEnabled) { + client.scheduleCompaction(common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap)))) + } else { + common.util.Option.empty() + } + + log.info(s"Compaction Scheduled is $compactionInstant") val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean) val syncHiveSucess = if (hiveSyncEnabled) { log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")") @@ -292,8 +331,12 @@ private[hudi] object HoodieSparkSqlWriter { } else { true } - client.close() - commitSuccess && syncHiveSucess + + log.info(s"Is Async Compaction Enabled ? $asyncCompactionEnabled") + if (!asyncCompactionEnabled) { + client.close() + } + (commitSuccess && syncHiveSucess, compactionInstant) } else { log.error(s"$operation failed with $errorCount errors :") if (log.isTraceEnabled) { @@ -308,6 +351,18 @@ private[hudi] object HoodieSparkSqlWriter { } }) } + (false, common.util.Option.empty()) + } + } + + private def isAsyncCompactionEnabled(client: HoodieWriteClient[HoodieRecordPayload[Nothing]], + tableConfig: HoodieTableConfig, + parameters: Map[String, String], configuration: Configuration) : Boolean = { + log.info(s"Config.isInlineCompaction ? ${client.getConfig.isInlineCompaction}") + if (!client.getConfig.isInlineCompaction + && parameters.get(ASYNC_COMPACT_ENABLE_KEY).exists(r => r.toBoolean)) { + tableConfig.getTableType == HoodieTableType.MERGE_ON_READ + } else { false } } diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala index 9f18a2eb3262b..1600ab02b2e2b 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala @@ -16,13 +16,25 @@ */ package org.apache.hudi +import java.lang +import java.util.function.{Function, Supplier} + +import org.apache.hudi.async.{AsyncCompactService, SparkStreamingAsyncCompactService} +import org.apache.hudi.client.HoodieWriteClient +import org.apache.hudi.common.model.HoodieRecordPayload +import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} +import org.apache.hudi.common.table.timeline.HoodieInstant.State +import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline} +import org.apache.hudi.common.util.CompactionUtils import org.apache.hudi.exception.HoodieCorruptedDataException import org.apache.log4j.LogManager +import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.sql.execution.streaming.Sink -import org.apache.spark.sql.streaming.OutputMode +import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryListener} import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode} import scala.util.{Failure, Success, Try} +import scala.collection.JavaConversions._ class HoodieStreamingSink(sqlContext: SQLContext, options: Map[String, String], @@ -38,6 +50,8 @@ class HoodieStreamingSink(sqlContext: SQLContext, private val retryIntervalMs = options(DataSourceWriteOptions.STREAMING_RETRY_INTERVAL_MS_OPT_KEY).toLong private val ignoreFailedBatch = 
options(DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH_OPT_KEY).toBoolean + private var isAsyncCompactorServiceShutdownAbnormally = false + private val mode = if (outputMode == OutputMode.Append()) { SaveMode.Append @@ -45,39 +59,54 @@ class HoodieStreamingSink(sqlContext: SQLContext, SaveMode.Overwrite } - override def addBatch(batchId: Long, data: DataFrame): Unit = { + private var asyncCompactorService : AsyncCompactService = _ + private var writeClient : Option[HoodieWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty + private var hoodieTableConfig : Option[HoodieTableConfig] = Option.empty + + override def addBatch(batchId: Long, data: DataFrame): Unit = this.synchronized { + if (isAsyncCompactorServiceShutdownAbnormally) { + throw new IllegalStateException("Async Compactor shutdown unexpectedly") + } + retry(retryCnt, retryIntervalMs)( Try( HoodieSparkSqlWriter.write( - sqlContext, - mode, - options, - data) + sqlContext, mode, options, data, hoodieTableConfig, writeClient, Some(triggerAsyncCompactor)) ) match { - case Success((true, commitOps)) => + case Success((true, commitOps, compactionInstantOps, client, tableConfig)) => log.info(s"Micro batch id=$batchId succeeded" + (commitOps.isPresent match { case true => s" for commit=${commitOps.get()}" case _ => s" with no new commits" })) - Success((true, commitOps)) + writeClient = Some(client) + hoodieTableConfig = Some(tableConfig) + if (compactionInstantOps.isPresent) { + asyncCompactorService.enqueuePendingCompaction( + new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, compactionInstantOps.get())) + } + Success((true, commitOps, compactionInstantOps)) case Failure(e) => // clean up persist rdds in the write process data.sparkSession.sparkContext.getPersistentRDDs .foreach { case (id, rdd) => - rdd.unpersist() + try { + rdd.unpersist() + } catch { + case t: Exception => log.warn("Got exception trying to unpersist rdd", t) + } } - log.error(s"Micro batch id=$batchId threw following expection: ", e) + log.error(s"Micro batch id=$batchId threw following exception: ", e) if (ignoreFailedBatch) { log.info(s"Ignore the exception and move on streaming as per " + s"${DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH_OPT_KEY} configuration") - Success((true, None)) + Success((true, None, None)) } else { if (retryCnt > 1) log.info(s"Retrying the failed micro batch id=$batchId ...") Failure(e) } - case Success((false, commitOps)) => + case Success((false, commitOps, compactionInstantOps, client, tableConfig)) => log.error(s"Micro batch id=$batchId ended up with errors" + (commitOps.isPresent match { case true => s" for commit=${commitOps.get()}" @@ -86,7 +115,7 @@ class HoodieStreamingSink(sqlContext: SQLContext, if (ignoreFailedBatch) { log.info(s"Ignore the errors and move on streaming as per " + s"${DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH_OPT_KEY} configuration") - Success((true, None)) + Success((true, None, None)) } else { if (retryCnt > 1) log.info(s"Retrying the failed micro batch id=$batchId ...") Failure(new HoodieCorruptedDataException(s"Micro batch id=$batchId ended up with errors")) @@ -100,6 +129,7 @@ class HoodieStreamingSink(sqlContext: SQLContext, // spark sometimes hangs upon exceptions and keep on hold of the executors // this is to force exit upon errors / exceptions and release all executors // will require redeployment / supervise mode to restart the streaming + reset(true) System.exit(1) } case Success(_) => @@ -112,11 +142,55 @@ class HoodieStreamingSink(sqlContext: 
SQLContext, @annotation.tailrec private def retry[T](n: Int, waitInMillis: Long)(fn: => Try[T]): Try[T] = { fn match { - case x: util.Success[T] => x + case x: Success[T] => + x case _ if n > 1 => Thread.sleep(waitInMillis) retry(n - 1, waitInMillis * 2)(fn) - case f => f + case f => + reset(false) + f + } + } + + protected def triggerAsyncCompactor(client: HoodieWriteClient[HoodieRecordPayload[Nothing]]): Unit = { + if (null == asyncCompactorService) { + log.info("Triggering Async compaction !!") + asyncCompactorService = new SparkStreamingAsyncCompactService(new JavaSparkContext(sqlContext.sparkContext), + client) + asyncCompactorService.start(new Function[java.lang.Boolean, java.lang.Boolean] { + override def apply(errored: lang.Boolean): lang.Boolean = { + log.info(s"Async Compactor shutdown. Errored ? $errored") + isAsyncCompactorServiceShutdownAbnormally = errored + reset(false) + log.info("Done resetting write client.") + true + } + }) + + // Add Shutdown Hook + Runtime.getRuntime.addShutdownHook(new Thread(new Runnable { + override def run(): Unit = reset(true) + })) + + // First time, scan .hoodie folder and get all pending compactions + val metaClient = new HoodieTableMetaClient(sqlContext.sparkContext.hadoopConfiguration, + client.getConfig.getBasePath) + val pendingInstants :java.util.List[HoodieInstant] = + CompactionUtils.getPendingCompactionInstantTimes(metaClient) + pendingInstants.foreach((h : HoodieInstant) => asyncCompactorService.enqueuePendingCompaction(h)) + } + } + + private def reset(force: Boolean) : Unit = this.synchronized { + if (asyncCompactorService != null) { + asyncCompactorService.shutdown(force) + asyncCompactorService = null + } + + if (writeClient.isDefined) { + writeClient.get.close() + writeClient = Option.empty } } } diff --git a/hudi-spark/src/test/java/HoodieJavaApp.java b/hudi-spark/src/test/java/HoodieJavaApp.java index 2cf36f9182d55..6eda0517d461c 100644 --- a/hudi-spark/src/test/java/HoodieJavaApp.java +++ b/hudi-spark/src/test/java/HoodieJavaApp.java @@ -151,6 +151,7 @@ public void run() throws Exception { .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY(), nonPartitionedTable ? NonpartitionedKeyGenerator.class.getCanonicalName() : SimpleKeyGenerator.class.getCanonicalName()) + .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_KEY(), "false") // This will remove any existing data at path below, and create a .mode(SaveMode.Overwrite); @@ -177,6 +178,7 @@ public void run() throws Exception { nonPartitionedTable ? NonpartitionedKeyGenerator.class.getCanonicalName() : SimpleKeyGenerator.class.getCanonicalName()) // Add Key Extractor .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP, "1") + .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_KEY(), "false") .option(HoodieWriteConfig.TABLE_NAME, tableName).mode(SaveMode.Append); updateHiveSyncConfig(writer); @@ -202,6 +204,7 @@ public void run() throws Exception { nonPartitionedTable ? 
NonpartitionedKeyGenerator.class.getCanonicalName() : SimpleKeyGenerator.class.getCanonicalName()) // Add Key Extractor .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP, "1") + .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_KEY(), "false") .option(HoodieWriteConfig.TABLE_NAME, tableName).mode(SaveMode.Append); updateHiveSyncConfig(writer); diff --git a/hudi-spark/src/test/java/HoodieJavaStreamingApp.java b/hudi-spark/src/test/java/HoodieJavaStreamingApp.java index 977ae09d38da2..500189d1cf211 100644 --- a/hudi-spark/src/test/java/HoodieJavaStreamingApp.java +++ b/hudi-spark/src/test/java/HoodieJavaStreamingApp.java @@ -16,12 +16,18 @@ * limitations under the License. */ +import java.util.stream.Collectors; import org.apache.hudi.DataSourceReadOptions; import org.apache.hudi.DataSourceWriteOptions; import org.apache.hudi.HoodieDataSourceHelpers; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.TableNotFoundException; import org.apache.hudi.hive.MultiPartKeysValueExtractor; import com.beust.jcommander.JCommander; @@ -43,6 +49,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import org.apache.spark.sql.streaming.StreamingQuery; import static org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings; @@ -52,14 +59,14 @@ public class HoodieJavaStreamingApp { @Parameter(names = {"--table-path", "-p"}, description = "path for Hoodie sample table") - private String tablePath = "file:///tmp/hoodie/streaming/sample-table"; + private String tablePath = "/tmp/hoodie/streaming/sample-table"; @Parameter(names = {"--streaming-source-path", "-ssp"}, description = "path for streaming source file folder") - private String streamingSourcePath = "file:///tmp/hoodie/streaming/source"; + private String streamingSourcePath = "/tmp/hoodie/streaming/source"; @Parameter(names = {"--streaming-checkpointing-path", "-scp"}, description = "path for streaming checking pointing folder") - private String streamingCheckpointingPath = "file:///tmp/hoodie/streaming/checkpoint"; + private String streamingCheckpointingPath = "/tmp/hoodie/streaming/checkpoint"; @Parameter(names = {"--streaming-duration-in-ms", "-sdm"}, description = "time in millisecond for the streaming duration") @@ -106,7 +113,15 @@ public static void main(String[] args) throws Exception { cmd.usage(); System.exit(1); } - cli.run(); + int errStatus = 0; + try { + cli.run(); + } catch (Exception ex) { + LOG.error("Got error running app ", ex); + errStatus = -1; + } finally { + System.exit(errStatus); + } } /** @@ -132,38 +147,118 @@ public void run() throws Exception { List records1 = recordsToStrings(dataGen.generateInserts("001", 100)); Dataset inputDF1 = spark.read().json(jssc.parallelize(records1, 2)); - List records2 = recordsToStrings(dataGen.generateUpdates("002", 100)); - + List records2 = recordsToStrings(dataGen.generateUpdatesForAllRecords("002")); Dataset inputDF2 = spark.read().json(jssc.parallelize(records2, 2)); - // setup the input for streaming - Dataset streamingInput = spark.readStream().schema(inputDF1.schema()).json(streamingSourcePath); + String 
ckptPath = streamingCheckpointingPath + "/stream1"; + String srcPath = streamingSourcePath + "/stream1"; + fs.mkdirs(new Path(ckptPath)); + fs.mkdirs(new Path(srcPath)); + + // setup the input for streaming + Dataset streamingInput = spark.readStream().schema(inputDF1.schema()).json(srcPath + "/*"); // start streaming and showing ExecutorService executor = Executors.newFixedThreadPool(2); + int numInitialCommits = 0; // thread for spark strucutured streaming - Future streamFuture = executor.submit(() -> { - LOG.info("===== Streaming Starting ====="); - stream(streamingInput); - LOG.info("===== Streaming Ends ====="); - return null; - }); - - // thread for adding data to the streaming source and showing results over time - Future showFuture = executor.submit(() -> { - LOG.info("===== Showing Starting ====="); - show(spark, fs, inputDF1, inputDF2); - LOG.info("===== Showing Ends ====="); - return null; - }); - - // let the threads run - streamFuture.get(); - showFuture.get(); - - executor.shutdown(); + try { + Future streamFuture = executor.submit(() -> { + LOG.info("===== Streaming Starting ====="); + stream(streamingInput, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL(), ckptPath); + LOG.info("===== Streaming Ends ====="); + return null; + }); + + // thread for adding data to the streaming source and showing results over time + Future showFuture = executor.submit(() -> { + LOG.info("===== Showing Starting ====="); + int numCommits = addInputAndValidateIngestion(spark, fs, srcPath,0, 100, inputDF1, inputDF2, true); + LOG.info("===== Showing Ends ====="); + return numCommits; + }); + + // let the threads run + streamFuture.get(); + numInitialCommits = showFuture.get(); + } finally { + executor.shutdownNow(); + } + + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jssc.hadoopConfiguration(), tablePath); + if (tableType.equals(HoodieTableType.MERGE_ON_READ.name())) { + // Ensure we have successfully completed one compaction commit + ValidationUtils.checkArgument(metaClient.getActiveTimeline().getCommitTimeline().getInstants().count() == 1); + } else { + ValidationUtils.checkArgument(metaClient.getActiveTimeline().getCommitTimeline().getInstants().count() >= 1); + } + + // Deletes Stream + // Need to restart application to ensure spark does not assume there are multiple streams active. 
+ spark.close(); + SparkSession newSpark = SparkSession.builder().appName("Hoodie Spark Streaming APP") + .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[1]").getOrCreate(); + jssc = new JavaSparkContext(newSpark.sparkContext()); + String ckptPath2 = streamingCheckpointingPath + "/stream2"; + String srcPath2 = srcPath + "/stream2"; + fs.mkdirs(new Path(ckptPath2)); + fs.mkdirs(new Path(srcPath2)); + Dataset delStreamingInput = newSpark.readStream().schema(inputDF1.schema()).json(srcPath2 + "/*"); + List deletes = recordsToStrings(dataGen.generateUniqueUpdates("002", 20)); + Dataset inputDF3 = newSpark.read().json(jssc.parallelize(deletes, 2)); + executor = Executors.newFixedThreadPool(2); + + // thread for spark structured streaming + try { + Future streamFuture = executor.submit(() -> { + LOG.info("===== Streaming Starting ====="); + stream(delStreamingInput, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL(), ckptPath2); + LOG.info("===== Streaming Ends ====="); + return null; + }); + + final int numCommits = numInitialCommits; + // thread for adding data to the streaming source and showing results over time + Future showFuture = executor.submit(() -> { + LOG.info("===== Showing Starting ====="); + addInputAndValidateIngestion(newSpark, fs, srcPath2, numCommits, 80, inputDF3, null, false); + LOG.info("===== Showing Ends ====="); + return null; + }); + + // let the threads run + streamFuture.get(); + showFuture.get(); + } finally { + executor.shutdown(); + } + } + + private void waitTillNCommits(FileSystem fs, int numCommits, int timeoutSecs, int sleepSecsAfterEachRun) + throws InterruptedException { + long beginTime = System.currentTimeMillis(); + long currTime = beginTime; + long timeoutMsecs = timeoutSecs * 1000; + + while ((currTime - beginTime) < timeoutMsecs) { + try { + HoodieTimeline timeline = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, tablePath); + LOG.info("Timeline :" + timeline.getInstants().collect(Collectors.toList())); + if (timeline.countInstants() >= numCommits) { + return; + } + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), tablePath, true); + System.out.println("Instants :" + metaClient.getActiveTimeline().getInstants().collect(Collectors.toList())); + } catch (TableNotFoundException te) { + LOG.info("Got table not found exception. 
Retrying"); + } finally { + Thread.sleep(sleepSecsAfterEachRun * 1000); + currTime = System.currentTimeMillis(); + } + } + throw new IllegalStateException("Timedout waiting for " + numCommits + " commits to appear in " + tablePath); } /** @@ -175,23 +270,40 @@ public void run() throws Exception { * @param inputDF2 * @throws Exception */ - public void show(SparkSession spark, FileSystem fs, Dataset inputDF1, Dataset inputDF2) throws Exception { - inputDF1.write().mode(SaveMode.Append).json(streamingSourcePath); + public int addInputAndValidateIngestion(SparkSession spark, FileSystem fs, String srcPath, + int initialCommits, int expRecords, + Dataset inputDF1, Dataset inputDF2, boolean instantTimeValidation) throws Exception { + inputDF1.write().mode(SaveMode.Append).json(srcPath); + + int numExpCommits = initialCommits + 1; // wait for spark streaming to process one microbatch - Thread.sleep(3000); + waitTillNCommits(fs, numExpCommits, 180, 3); String commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, tablePath); LOG.info("First commit at instant time :" + commitInstantTime1); - inputDF2.write().mode(SaveMode.Append).json(streamingSourcePath); - // wait for spark streaming to process one microbatch - Thread.sleep(3000); - String commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, tablePath); - LOG.info("Second commit at instant time :" + commitInstantTime2); + String commitInstantTime2 = commitInstantTime1; + if (null != inputDF2) { + numExpCommits += 1; + inputDF2.write().mode(SaveMode.Append).json(srcPath); + // wait for spark streaming to process one microbatch + Thread.sleep(3000); + waitTillNCommits(fs, numExpCommits, 180, 3); + commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, tablePath); + LOG.info("Second commit at instant time :" + commitInstantTime2); + } + + if (tableType.equals(HoodieTableType.MERGE_ON_READ.name())) { + numExpCommits += 1; + // Wait for compaction to also finish and track latest timestamp as commit timestamp + waitTillNCommits(fs, numExpCommits, 180, 3); + commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, tablePath); + LOG.info("Compaction commit at instant time :" + commitInstantTime2); + } /** * Read & do some queries */ - Dataset hoodieROViewDF = spark.read().format("org.apache.hudi") + Dataset hoodieROViewDF = spark.read().format("hudi") // pass any path glob, can include hoodie & non-hoodie // datasets .load(tablePath + "/*/*/*/*"); @@ -200,11 +312,24 @@ public void show(SparkSession spark, FileSystem fs, Dataset inputDF1, Datas // all trips whose fare amount was greater than 2. spark.sql("select fare.amount, begin_lon, begin_lat, timestamp from hoodie_ro where fare.amount > 2.0").show(); + if (instantTimeValidation) { + System.out.println("Showing all records. Latest Instant Time =" + commitInstantTime2); + spark.sql("select * from hoodie_ro").show(200, false); + long numRecordsAtInstant2 = + spark.sql("select * from hoodie_ro where _hoodie_commit_time = " + commitInstantTime2).count(); + ValidationUtils.checkArgument(numRecordsAtInstant2 == expRecords, + "Expecting " + expRecords + " records, Got " + numRecordsAtInstant2); + } + + long numRecords = spark.sql("select * from hoodie_ro").count(); + ValidationUtils.checkArgument(numRecords == expRecords, + "Expecting " + expRecords + " records, Got " + numRecords); + if (tableType.equals(HoodieTableType.COPY_ON_WRITE.name())) { /** * Consume incrementally, only changes in commit 2 above. 
Currently only supported for COPY_ON_WRITE TABLE */ - Dataset hoodieIncViewDF = spark.read().format("org.apache.hudi") + Dataset hoodieIncViewDF = spark.read().format("hudi") .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL()) // Only changes in write 2 above .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY(), commitInstantTime1) @@ -214,6 +339,7 @@ public void show(SparkSession spark, FileSystem fs, Dataset inputDF1, Datas LOG.info("You will only see records from : " + commitInstantTime2); hoodieIncViewDF.groupBy(hoodieIncViewDF.col("_hoodie_commit_time")).count().show(); } + return numExpCommits; } /** @@ -222,19 +348,23 @@ public void show(SparkSession spark, FileSystem fs, Dataset inputDF1, Datas * @param streamingInput * @throws Exception */ - public void stream(Dataset streamingInput) throws Exception { + public void stream(Dataset streamingInput, String operationType, String checkpointLocation) throws Exception { DataStreamWriter writer = streamingInput.writeStream().format("org.apache.hudi") .option("hoodie.insert.shuffle.parallelism", "2").option("hoodie.upsert.shuffle.parallelism", "2") + .option(DataSourceWriteOptions.OPERATION_OPT_KEY(), operationType) .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY(), tableType) .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key") .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "partition") .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "timestamp") - .option(HoodieWriteConfig.TABLE_NAME, tableName).option("checkpointLocation", streamingCheckpointingPath) + .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP, "1") + .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_KEY(), "true") + .option(HoodieWriteConfig.TABLE_NAME, tableName).option("checkpointLocation", checkpointLocation) .outputMode(OutputMode.Append()); updateHiveSyncConfig(writer); - writer.trigger(new ProcessingTime(500)).start(tablePath).awaitTermination(streamingDurationInMs); + StreamingQuery query = writer.trigger(new ProcessingTime(500)).start(tablePath); + query.awaitTermination(streamingDurationInMs); } /** diff --git a/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala b/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala index 7f264814ccda6..feeec96e0d9d4 100644 --- a/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala +++ b/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala @@ -50,7 +50,8 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers { try { val sqlContext = session.sqlContext val options = Map("path" -> "hoodie/test/path", HoodieWriteConfig.TABLE_NAME -> "hoodie_test_tbl") - val e = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.ErrorIfExists, options, session.emptyDataFrame)) + val e = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.ErrorIfExists, options, + session.emptyDataFrame)) assert(e.getMessage.contains("spark.serializer")) } finally { session.stop() diff --git a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSource.scala b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSource.scala index 07e8704da0f62..48582c18e545f 100644 --- a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSource.scala +++ b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSource.scala @@ -17,12 +17,16 @@ 
package org.apache.hudi.functional + import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hudi.common.fs.FSUtils +import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.testutils.HoodieTestDataGenerator import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings import org.apache.hudi.config.HoodieWriteConfig +import org.apache.hudi.exception.TableNotFoundException import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers} +import org.apache.log4j.LogManager import org.apache.spark.sql._ import org.apache.spark.sql.functions.col import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime} @@ -39,6 +43,7 @@ import scala.concurrent.{Await, Future} * Basic tests on the spark datasource */ class TestDataSource { + private val log = LogManager.getLogger(getClass) var spark: SparkSession = null var dataGen: HoodieTestDataGenerator = null @@ -214,7 +219,7 @@ class TestDataSource { assertEquals(hoodieIncViewDF2.count(), insert2NewKeyCnt) } - //@Test (TODO: re-enable after fixing noisyness) + @Test def testStructuredStreaming(): Unit = { fs.delete(new Path(basePath), true) val sourcePath = basePath + "/source" @@ -254,7 +259,7 @@ class TestDataSource { val f2 = Future { inputDF1.write.mode(SaveMode.Append).json(sourcePath) // wait for spark streaming to process one microbatch - Thread.sleep(3000) + val currNumCommits = waitTillAtleastNCommits(fs, destPath, 1, 120, 5); assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, destPath, "000")) val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, destPath) // Read RO View @@ -264,9 +269,8 @@ class TestDataSource { inputDF2.write.mode(SaveMode.Append).json(sourcePath) // wait for spark streaming to process one microbatch - Thread.sleep(10000) + waitTillAtleastNCommits(fs, destPath, currNumCommits + 1, 120, 5); val commitInstantTime2: String = HoodieDataSourceHelpers.latestCommit(fs, destPath) - assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size()) // Read RO View val hoodieROViewDF2 = spark.read.format("org.apache.hudi") @@ -299,8 +303,35 @@ class TestDataSource { assertEquals(1, countsPerCommit.length) assertEquals(commitInstantTime2, countsPerCommit(0).get(0)) } - Await.result(Future.sequence(Seq(f1, f2)), Duration.Inf) + } + @throws[InterruptedException] + private def waitTillAtleastNCommits(fs: FileSystem, tablePath: String, + numCommits: Int, timeoutSecs: Int, sleepSecsAfterEachRun: Int): Int = { + val beginTime = System.currentTimeMillis + var currTime = beginTime + val timeoutMsecs = timeoutSecs * 1000 + var numInstants = 0 + var success: Boolean = false + while ({!success && (currTime - beginTime) < timeoutMsecs}) try { + val timeline = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, tablePath) + log.info("Timeline :" + timeline.getInstants.toArray) + if (timeline.countInstants >= numCommits) { + numInstants = timeline.countInstants + success = true + } + val metaClient = new HoodieTableMetaClient(fs.getConf, tablePath, true) + } catch { + case te: TableNotFoundException => + log.info("Got table not found exception. 
Retrying") + } finally { + Thread.sleep(sleepSecsAfterEachRun * 1000) + currTime = System.currentTimeMillis + } + if (!success) { + throw new IllegalStateException("Timed-out waiting for " + numCommits + " commits to appear in " + tablePath) + } + numInstants } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java index 05a1bbca6139e..3bc51c9ef5605 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -18,8 +18,9 @@ package org.apache.hudi.utilities.deltastreamer; -import org.apache.hudi.client.HoodieWriteClient; import org.apache.hudi.async.AbstractAsyncService; +import org.apache.hudi.async.AsyncCompactService; +import org.apache.hudi.client.HoodieWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex; import org.apache.hudi.common.config.TypedProperties; @@ -62,15 +63,9 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; -import java.util.stream.IntStream; /** * An Utility which can incrementally take the output from {@link HiveIncrementalPuller} and apply it to the target @@ -97,6 +92,8 @@ public class HoodieDeltaStreamer implements Serializable { private final Option bootstrapExecutor; + public static final String DELTASYNC_POOL_NAME = "hoodiedeltasync"; + public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc) throws IOException { this(cfg, jssc, FSUtils.getFs(cfg.targetBasePath, jssc.hadoopConfiguration()), jssc.hadoopConfiguration(), null); @@ -559,8 +556,8 @@ protected Pair startService() { boolean error = false; if (cfg.isAsyncCompactionEnabled()) { // set Scheduler Pool. - LOG.info("Setting Spark Pool name for delta-sync to " + SchedulerConfGenerator.DELTASYNC_POOL_NAME); - jssc.setLocalProperty("spark.scheduler.pool", SchedulerConfGenerator.DELTASYNC_POOL_NAME); + LOG.info("Setting Spark Pool name for delta-sync to " + DELTASYNC_POOL_NAME); + jssc.setLocalProperty("spark.scheduler.pool", DELTASYNC_POOL_NAME); } try { while (!isShutdownRequested()) { @@ -661,100 +658,6 @@ public TypedProperties getProps() { } } - /** - * Async Compactor Service that runs in separate thread. Currently, only one compactor is allowed to run at any time. 
- */ - public static class AsyncCompactService extends AbstractAsyncService { - - private static final long serialVersionUID = 1L; - private final int maxConcurrentCompaction; - private transient Compactor compactor; - private transient JavaSparkContext jssc; - private transient BlockingQueue pendingCompactions = new LinkedBlockingQueue<>(); - private transient ReentrantLock queueLock = new ReentrantLock(); - private transient Condition consumed = queueLock.newCondition(); - - public AsyncCompactService(JavaSparkContext jssc, HoodieWriteClient client) { - this.jssc = jssc; - this.compactor = new Compactor(client, jssc); - this.maxConcurrentCompaction = 1; - } - - /** - * Enqueues new Pending compaction. - */ - public void enqueuePendingCompaction(HoodieInstant instant) { - pendingCompactions.add(instant); - } - - /** - * Wait till outstanding pending compactions reduces to the passed in value. - * - * @param numPendingCompactions Maximum pending compactions allowed - * @throws InterruptedException - */ - public void waitTillPendingCompactionsReducesTo(int numPendingCompactions) throws InterruptedException { - try { - queueLock.lock(); - while (!isShutdown() && (pendingCompactions.size() > numPendingCompactions)) { - consumed.await(); - } - } finally { - queueLock.unlock(); - } - } - - /** - * Fetch Next pending compaction if available. - * - * @return - * @throws InterruptedException - */ - private HoodieInstant fetchNextCompactionInstant() throws InterruptedException { - LOG.info("Compactor waiting for next instant for compaction upto 60 seconds"); - HoodieInstant instant = pendingCompactions.poll(60, TimeUnit.SECONDS); - if (instant != null) { - try { - queueLock.lock(); - // Signal waiting thread - consumed.signal(); - } finally { - queueLock.unlock(); - } - } - return instant; - } - - /** - * Start Compaction Service. - */ - @Override - protected Pair startService() { - ExecutorService executor = Executors.newFixedThreadPool(maxConcurrentCompaction); - return Pair.of(CompletableFuture.allOf(IntStream.range(0, maxConcurrentCompaction).mapToObj(i -> CompletableFuture.supplyAsync(() -> { - try { - // Set Compactor Pool Name for allowing users to prioritize compaction - LOG.info("Setting Spark Pool name for compaction to " + SchedulerConfGenerator.COMPACT_POOL_NAME); - jssc.setLocalProperty("spark.scheduler.pool", SchedulerConfGenerator.COMPACT_POOL_NAME); - - while (!isShutdownRequested()) { - final HoodieInstant instant = fetchNextCompactionInstant(); - if (null != instant) { - compactor.compact(instant); - } - } - LOG.info("Compactor shutting down properly!!"); - } catch (InterruptedException ie) { - LOG.warn("Compactor executor thread got interrupted exception. 
Stopping", ie); - } catch (IOException e) { - LOG.error("Compactor executor failed", e); - throw new HoodieIOException(e.getMessage(), e); - } - return true; - }, executor)).toArray(CompletableFuture[]::new)), executor); - } - } - public DeltaSyncService getDeltaSyncService() { return deltaSyncService.get(); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SchedulerConfGenerator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SchedulerConfGenerator.java index 54fcf689a99ab..f5f1f384765ab 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SchedulerConfGenerator.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SchedulerConfGenerator.java @@ -18,6 +18,7 @@ package org.apache.hudi.utilities.deltastreamer; +import org.apache.hudi.async.AsyncCompactService; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.util.Option; @@ -41,8 +42,8 @@ public class SchedulerConfGenerator { private static final Logger LOG = LogManager.getLogger(SchedulerConfGenerator.class); - public static final String DELTASYNC_POOL_NAME = "hoodiedeltasync"; - public static final String COMPACT_POOL_NAME = "hoodiecompact"; + public static final String DELTASYNC_POOL_NAME = HoodieDeltaStreamer.DELTASYNC_POOL_NAME; + public static final String COMPACT_POOL_NAME = AsyncCompactService.COMPACT_POOL_NAME; public static final String SPARK_SCHEDULER_MODE_KEY = "spark.scheduler.mode"; public static final String SPARK_SCHEDULER_FAIR_MODE = "FAIR"; public static final String SPARK_SCHEDULER_ALLOCATION_FILE_KEY = "spark.scheduler.allocation.file";