From 7d4f9812e05062092c33bb5a4718ed25aa29bac6 Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Fri, 18 Jun 2021 12:51:09 +0530 Subject: [PATCH] [HUDI-1483] Support async clustering for deltastreamer and Spark streaming Integrate async clustering service with HoodieDeltaStreamer and HoodieStreamingSink Added methods in HoodieAsyncService to reuse code Move some common methods to HoodieAsyncService Add tests for clustering with compaction Rename methods appropriately Disable inline clustering Fix CI failures --- .../hudi/async/AsyncClusteringService.java | 100 ++++++++++++++++++ .../hudi/async/AsyncCompactService.java | 56 +--------- .../apache/hudi/async/HoodieAsyncService.java | 60 ++++++++++- .../hudi/client/AbstractClusteringClient.java | 55 ++++++++++ .../hudi/config/HoodieClusteringConfig.java | 13 ++- .../apache/hudi/config/HoodieWriteConfig.java | 4 + .../async/SparkAsyncClusteringService.java | 39 +++++++ .../client/HoodieSparkClusteringClient.java | 56 ++++++++++ .../SparkClusteringPlanActionExecutor.java | 11 +- .../hudi/common/util/ClusteringUtils.java | 4 + .../java/org/apache/hudi/DataSourceUtils.java | 6 ++ .../org/apache/hudi/DataSourceOptions.scala | 12 +++ .../SparkStreamingAsyncClusteringService.java | 42 ++++++++ .../apache/hudi/HoodieSparkSqlWriter.scala | 48 +++++++-- .../org/apache/hudi/HoodieStreamingSink.scala | 55 ++++++++-- .../org/apache/hudi/HoodieWriterUtils.scala | 2 + .../src/test/java/HoodieJavaApp.java | 3 + .../src/test/java/HoodieJavaStreamingApp.java | 1 + .../functional/TestStructuredStreaming.scala | 48 +++++++-- .../utilities/deltastreamer/DeltaSync.java | 20 ++++ .../deltastreamer/HoodieDeltaStreamer.java | 83 +++++++++++++-- .../HoodieMultiTableDeltaStreamer.java | 6 ++ .../functional/TestHoodieDeltaStreamer.java | 98 +++++++++++++---- 23 files changed, 710 insertions(+), 112 deletions(-) create mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncClusteringService.java create mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractClusteringClient.java create mode 100644 hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/async/SparkAsyncClusteringService.java create mode 100644 hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieSparkClusteringClient.java create mode 100644 hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/async/SparkStreamingAsyncClusteringService.java diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncClusteringService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncClusteringService.java new file mode 100644 index 0000000000000..b9707bb6d82a7 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncClusteringService.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.async; + +import org.apache.hudi.client.AbstractClusteringClient; +import org.apache.hudi.client.AbstractHoodieWriteClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieIOException; + +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.stream.IntStream; + +/** + * Async clustering service that runs in a separate thread. + * Currently, only one clustering thread is allowed to run at any time. + */ +public abstract class AsyncClusteringService extends HoodieAsyncService { + + private static final long serialVersionUID = 1L; + private static final Logger LOG = LogManager.getLogger(AsyncClusteringService.class); + + private final int maxConcurrentClustering; + private transient AbstractClusteringClient clusteringClient; + + public AsyncClusteringService(AbstractHoodieWriteClient writeClient) { + this(writeClient, false); + } + + public AsyncClusteringService(AbstractHoodieWriteClient writeClient, boolean runInDaemonMode) { + super(runInDaemonMode); + this.clusteringClient = createClusteringClient(writeClient); + this.maxConcurrentClustering = 1; + } + + protected abstract AbstractClusteringClient createClusteringClient(AbstractHoodieWriteClient client); + + /** + * Start clustering service. + */ + @Override + protected Pair startService() { + ExecutorService executor = Executors.newFixedThreadPool(maxConcurrentClustering, + r -> { + Thread t = new Thread(r, "async_clustering_thread"); + t.setDaemon(isRunInDaemonMode()); + return t; + }); + + return Pair.of(CompletableFuture.allOf(IntStream.range(0, maxConcurrentClustering).mapToObj(i -> CompletableFuture.supplyAsync(() -> { + try { + while (!isShutdownRequested()) { + final HoodieInstant instant = fetchNextAsyncServiceInstant(); + if (null != instant) { + LOG.info("Starting clustering for instant " + instant); + clusteringClient.cluster(instant); + LOG.info("Finished clustering for instant " + instant); + } + } + LOG.info("Clustering executor shutting down properly"); + } catch (InterruptedException ie) { + LOG.warn("Clustering executor got interrupted exception! Stopping", ie); + } catch (IOException e) { + LOG.error("Clustering executor failed", e); + throw new HoodieIOException(e.getMessage(), e); + } + return true; + }, executor)).toArray(CompletableFuture[]::new)), executor); + } + + /** + * Update the write client to be used for clustering. 
+ */ + public synchronized void updateWriteClient(AbstractHoodieWriteClient writeClient) { + this.clusteringClient.updateWriteClient(writeClient); + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java index 1bb5daa528c79..2f63297210e14 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java @@ -24,18 +24,14 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieIOException; + import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import java.io.IOException; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; import java.util.stream.IntStream; /** @@ -54,9 +50,6 @@ public abstract class AsyncCompactService extends HoodieAsyncService { private final int maxConcurrentCompaction; private transient AbstractCompactor compactor; protected transient HoodieEngineContext context; - private transient BlockingQueue pendingCompactions = new LinkedBlockingQueue<>(); - private transient ReentrantLock queueLock = new ReentrantLock(); - private transient Condition consumed = queueLock.newCondition(); public AsyncCompactService(HoodieEngineContext context, AbstractHoodieWriteClient client) { this(context, client, false); @@ -71,51 +64,6 @@ public AsyncCompactService(HoodieEngineContext context, AbstractHoodieWriteClien protected abstract AbstractCompactor createCompactor(AbstractHoodieWriteClient client); - /** - * Enqueues new Pending compaction. - */ - public void enqueuePendingCompaction(HoodieInstant instant) { - pendingCompactions.add(instant); - } - - /** - * Wait till outstanding pending compactions reduces to the passed in value. - * - * @param numPendingCompactions Maximum pending compactions allowed - * @throws InterruptedException - */ - public void waitTillPendingCompactionsReducesTo(int numPendingCompactions) throws InterruptedException { - try { - queueLock.lock(); - while (!isShutdown() && (pendingCompactions.size() > numPendingCompactions)) { - consumed.await(); - } - } finally { - queueLock.unlock(); - } - } - - /** - * Fetch Next pending compaction if available. - * - * @return - * @throws InterruptedException - */ - private HoodieInstant fetchNextCompactionInstant() throws InterruptedException { - LOG.info("Compactor waiting for next instant for compaction upto 60 seconds"); - HoodieInstant instant = pendingCompactions.poll(10, TimeUnit.SECONDS); - if (instant != null) { - try { - queueLock.lock(); - // Signal waiting thread - consumed.signal(); - } finally { - queueLock.unlock(); - } - } - return instant; - } - /** * Start Compaction Service. 
*/ @@ -134,7 +82,7 @@ protected Pair startService() { context.setProperty(EngineProperty.COMPACTION_POOL_NAME, COMPACT_POOL_NAME); while (!isShutdownRequested()) { - final HoodieInstant instant = fetchNextCompactionInstant(); + final HoodieInstant instant = fetchNextAsyncServiceInstant(); if (null != instant) { LOG.info("Starting Compaction for instant " + instant); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java index 32dd0427255f0..85e008199adca 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java @@ -18,21 +18,26 @@ package org.apache.hudi.async; +import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.collection.Pair; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import java.io.Serializable; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; /** - * Base Class for running clean/delta-sync/compaction in separate thread and controlling their life-cycle. + * Base Class for running clean/delta-sync/compaction/clustering in separate thread and controlling their life-cycle. */ public abstract class HoodieAsyncService implements Serializable { @@ -50,6 +55,12 @@ public abstract class HoodieAsyncService implements Serializable { private transient CompletableFuture future; // Run in daemon mode private final boolean runInDaemonMode; + // Queue to hold pending compaction/clustering instants + private transient BlockingQueue pendingInstants = new LinkedBlockingQueue<>(); + // Mutex lock for synchronized access to pendingInstants queue + private transient ReentrantLock queueLock = new ReentrantLock(); + // Condition instance to use with the queueLock + private transient Condition consumed = queueLock.newCondition(); protected HoodieAsyncService() { this(false); @@ -165,4 +176,51 @@ private void monitorThreads(Function onShutdownCallback) { public boolean isRunInDaemonMode() { return runInDaemonMode; } + + /** + * Wait till outstanding pending compaction/clustering reduces to the passed in value. + * + * @param numPending Maximum pending compactions/clustering allowed + * @throws InterruptedException + */ + public void waitTillPendingAsyncServiceInstantsReducesTo(int numPending) throws InterruptedException { + try { + queueLock.lock(); + while (!isShutdown() && (pendingInstants.size() > numPending)) { + consumed.await(); + } + } finally { + queueLock.unlock(); + } + } + + /** + * Enqueues new pending clustering instant. + * @param instant {@link HoodieInstant} to enqueue. + */ + public void enqueuePendingAsyncServiceInstant(HoodieInstant instant) { + LOG.info("Enqueuing new pending clustering instant: " + instant.getTimestamp()); + pendingInstants.add(instant); + } + + /** + * Fetch next pending compaction/clustering instant if available. + * + * @return {@link HoodieInstant} corresponding to the next pending compaction/clustering. 
+ * @throws InterruptedException + */ + HoodieInstant fetchNextAsyncServiceInstant() throws InterruptedException { + LOG.info("Waiting for next instant upto 10 seconds"); + HoodieInstant instant = pendingInstants.poll(10, TimeUnit.SECONDS); + if (instant != null) { + try { + queueLock.lock(); + // Signal waiting thread + consumed.signal(); + } finally { + queueLock.unlock(); + } + } + return instant; + } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractClusteringClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractClusteringClient.java new file mode 100644 index 0000000000000..34234f546ed19 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractClusteringClient.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.client; + +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.table.timeline.HoodieInstant; + +import java.io.IOException; +import java.io.Serializable; + +/** + * Client will run one round of clustering. + */ +public abstract class AbstractClusteringClient implements Serializable { + + private static final long serialVersionUID = 1L; + + protected transient AbstractHoodieWriteClient clusteringClient; + + public AbstractClusteringClient(AbstractHoodieWriteClient clusteringClient) { + this.clusteringClient = clusteringClient; + } + + /** + * Run clustering for the instant. + * @param instant + * @throws IOException + */ + public abstract void cluster(HoodieInstant instant) throws IOException; + + /** + * Update the write client used by async clustering. 
+ * @param writeClient + */ + public void updateWriteClient(AbstractHoodieWriteClient writeClient) { + this.clusteringClient = writeClient; + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java index 86d26d3b2e6b0..a750fa23c511f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java @@ -53,7 +53,13 @@ public class HoodieClusteringConfig extends HoodieConfig { .key("hoodie.clustering.inline.max.commits") .defaultValue("4") .sinceVersion("0.7.0") - .withDocumentation("Config to control frequency of clustering"); + .withDocumentation("Config to control frequency of inline clustering"); + + public static final ConfigProperty ASYNC_CLUSTERING_MAX_COMMIT_PROP = ConfigProperty + .key("hoodie.clustering.async.max.commits") + .defaultValue("4") + .sinceVersion("0.9.0") + .withDocumentation("Config to control frequency of async clustering"); // Any strategy specific params can be saved with this prefix public static final String CLUSTERING_STRATEGY_PARAM_PREFIX = "hoodie.clustering.plan.strategy."; @@ -177,6 +183,11 @@ public Builder withInlineClusteringNumCommits(int numCommits) { return this; } + public Builder withAsyncClusteringMaxCommits(int numCommits) { + clusteringConfig.setValue(ASYNC_CLUSTERING_MAX_COMMIT_PROP, String.valueOf(numCommits)); + return this; + } + public Builder fromProperties(Properties props) { this.clusteringConfig.getProps().putAll(props); return this; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 1573c7fbfcd03..20d2846a7a7c5 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -677,6 +677,10 @@ public int getInlineClusterMaxCommits() { return getInt(HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT_PROP); } + public int getAsyncClusterMaxCommits() { + return getInt(HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMIT_PROP); + } + public String getPayloadClass() { return getString(HoodieCompactionConfig.PAYLOAD_CLASS_PROP); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/async/SparkAsyncClusteringService.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/async/SparkAsyncClusteringService.java new file mode 100644 index 0000000000000..ce436ba034d98 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/async/SparkAsyncClusteringService.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.async; + +import org.apache.hudi.client.AbstractClusteringClient; +import org.apache.hudi.client.AbstractHoodieWriteClient; +import org.apache.hudi.client.HoodieSparkClusteringClient; + +/** + * Async clustering service for Spark datasource. + */ +public class SparkAsyncClusteringService extends AsyncClusteringService { + + public SparkAsyncClusteringService(AbstractHoodieWriteClient writeClient) { + super(writeClient); + } + + @Override + protected AbstractClusteringClient createClusteringClient(AbstractHoodieWriteClient client) { + return new HoodieSparkClusteringClient(client); + } +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieSparkClusteringClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieSparkClusteringClient.java new file mode 100644 index 0000000000000..884b555447897 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieSparkClusteringClient.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.client; + +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.table.timeline.HoodieInstant; + +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaRDD; + +import java.io.IOException; + +/** + * Async clustering client for Spark datasource. + */ +public class HoodieSparkClusteringClient extends + AbstractClusteringClient>, JavaRDD, JavaRDD> { + + private static final Logger LOG = LogManager.getLogger(HoodieSparkClusteringClient.class); + + public HoodieSparkClusteringClient( + AbstractHoodieWriteClient>, JavaRDD, JavaRDD> clusteringClient) { + super(clusteringClient); + } + + @Override + public void cluster(HoodieInstant instant) throws IOException { + LOG.info("Executing clustering instance " + instant); + SparkRDDWriteClient writeClient = (SparkRDDWriteClient) clusteringClient; + JavaRDD res = writeClient.cluster(instant.getTimestamp(), true).getWriteStatuses(); + if (res != null && res.collect().stream().anyMatch(WriteStatus::hasErrors)) { + // TODO: Should we treat this fatal and throw exception? 
+ LOG.error("Clustering for instant (" + instant + ") failed with write errors"); + } + } +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkClusteringPlanActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkClusteringPlanActionExecutor.java index 1f71aa4dfac7c..683d852131f6d 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkClusteringPlanActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkClusteringPlanActionExecutor.java @@ -58,13 +58,20 @@ protected Option createClusteringPlan() { int commitsSinceLastClustering = table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants() .findInstantsAfter(lastClusteringInstant.map(HoodieInstant::getTimestamp).orElse("0"), Integer.MAX_VALUE) .countInstants(); - if (config.getInlineClusterMaxCommits() > commitsSinceLastClustering) { - LOG.info("Not scheduling clustering as only " + commitsSinceLastClustering + if (config.inlineClusteringEnabled() && config.getInlineClusterMaxCommits() > commitsSinceLastClustering) { + LOG.info("Not scheduling inline clustering as only " + commitsSinceLastClustering + " commits was found since last clustering " + lastClusteringInstant + ". Waiting for " + config.getInlineClusterMaxCommits()); return Option.empty(); } + if (config.isAsyncClusteringEnabled() && config.getAsyncClusterMaxCommits() > commitsSinceLastClustering) { + LOG.info("Not scheduling async clustering as only " + commitsSinceLastClustering + + " commits was found since last clustering " + lastClusteringInstant + ". Waiting for " + + config.getAsyncClusterMaxCommits()); + return Option.empty(); + } + LOG.info("Generating clustering plan for table " + config.getBasePath()); ClusteringPlanStrategy strategy = (ClusteringPlanStrategy) ReflectionUtils.loadClass(config.getClusteringPlanStrategyClass(), table, context, config); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java index 1bf97c68eb69a..0d790be842619 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java @@ -206,4 +206,8 @@ private static Map buildMetrics(List fileSlices) { metrics.put(TOTAL_LOG_FILES, (double) numLogFiles); return metrics; } + + public static List getPendingClusteringInstantTimes(HoodieTableMetaClient metaClient) { + return metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstants().collect(Collectors.toList()); + } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java index 352a0ca7bc2cf..469f9c7b490bf 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java +++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java @@ -34,6 +34,7 @@ import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.TablePathUtils; +import org.apache.hudi.config.HoodieClusteringConfig; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodiePayloadConfig; @@ 
-171,6 +172,8 @@ public static HoodieWriteConfig createHoodieConfig(String schemaStr, String base boolean asyncCompact = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_OPT_KEY().key())); boolean inlineCompact = !asyncCompact && parameters.get(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY().key()) .equals(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL()); + boolean asyncClusteringEnabled = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE_OPT_KEY().key())); + boolean inlineClusteringEnabled = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.INLINE_CLUSTERING_ENABLE_OPT_KEY().key())); // insert/bulk-insert combining to be true, if filtering for duplicates boolean combineInserts = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.INSERT_DROP_DUPS_OPT_KEY().key())); HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder() @@ -184,6 +187,9 @@ public static HoodieWriteConfig createHoodieConfig(String schemaStr, String base .withCompactionConfig(HoodieCompactionConfig.newBuilder() .withPayloadClass(parameters.get(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY().key())) .withInlineCompaction(inlineCompact).build()) + .withClusteringConfig(HoodieClusteringConfig.newBuilder() + .withInlineClustering(inlineClusteringEnabled) + .withAsyncClustering(asyncClusteringEnabled).build()) .withPayloadConfig(HoodiePayloadConfig.newBuilder().withPayloadOrderingField(parameters.get(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY().key())) .build()) // override above with Hoodie configs specified as options. diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index ebb8c6bc4d813..ce3683126c8a0 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -474,6 +474,18 @@ object DataSourceWriteOptions { .defaultValue("true") .withDocumentation("") + val INLINE_CLUSTERING_ENABLE_OPT_KEY: ConfigProperty[String] = ConfigProperty + .key("hoodie.datasource.clustering.inline.enable") + .defaultValue("false") + .sinceVersion("0.9.0") + .withDocumentation("Enable inline clustering. Disabled by default.") + + val ASYNC_CLUSTERING_ENABLE_OPT_KEY: ConfigProperty[String] = ConfigProperty + .key("hoodie.datasource.clustering.async.enable") + .defaultValue("false") + .sinceVersion("0.9.0") + .withDocumentation("Enable asynchronous clustering. Disabled by default.") + val KAFKA_AVRO_VALUE_DESERIALIZER_CLASS: ConfigProperty[String] = ConfigProperty .key("hoodie.deltastreamer.source.kafka.value.deserializer.class") .defaultValue("io.confluent.kafka.serializers.KafkaAvroDeserializer") diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/async/SparkStreamingAsyncClusteringService.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/async/SparkStreamingAsyncClusteringService.java new file mode 100644 index 0000000000000..81d880ee974df --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/async/SparkStreamingAsyncClusteringService.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.async; + +import org.apache.hudi.client.AbstractClusteringClient; +import org.apache.hudi.client.AbstractHoodieWriteClient; +import org.apache.hudi.client.HoodieSparkClusteringClient; + +/** + * Async clustering service for Spark structured streaming. + * Here, async clustering is run in daemon mode to prevent blocking shutting down the Spark application. + */ +public class SparkStreamingAsyncClusteringService extends AsyncClusteringService { + + private static final long serialVersionUID = 1L; + + public SparkStreamingAsyncClusteringService(AbstractHoodieWriteClient writeClient) { + super(writeClient, true); + } + + @Override + protected AbstractClusteringClient createClusteringClient(AbstractHoodieWriteClient client) { + return new HoodieSparkClusteringClient(client); + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 76bc99b250bd6..b290533dd348b 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -62,6 +62,7 @@ object HoodieSparkSqlWriter { private val log = LogManager.getLogger(getClass) private var tableExists: Boolean = false private var asyncCompactionTriggerFnDefined: Boolean = false + private var asyncClusteringTriggerFnDefined: Boolean = false def write(sqlContext: SQLContext, mode: SaveMode, @@ -69,9 +70,10 @@ object HoodieSparkSqlWriter { df: DataFrame, hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty, hoodieWriteClient: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty, - asyncCompactionTriggerFn: Option[Function1[SparkRDDWriteClient[HoodieRecordPayload[Nothing]], Unit]] = Option.empty + asyncCompactionTriggerFn: Option[Function1[SparkRDDWriteClient[HoodieRecordPayload[Nothing]], Unit]] = Option.empty, + asyncClusteringTriggerFn: Option[Function1[SparkRDDWriteClient[HoodieRecordPayload[Nothing]], Unit]] = Option.empty ) - : (Boolean, common.util.Option[String], common.util.Option[String], + : (Boolean, common.util.Option[String], common.util.Option[String], common.util.Option[String], SparkRDDWriteClient[HoodieRecordPayload[Nothing]], HoodieTableConfig) = { val sparkContext = sqlContext.sparkContext @@ -79,6 +81,7 @@ object HoodieSparkSqlWriter { val hoodieConfig = HoodieWriterUtils.convertMapToHoodieConfig(parameters) val tblNameOp = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TABLE_NAME, s"'${HoodieWriteConfig.TABLE_NAME.key}' must be set.") asyncCompactionTriggerFnDefined = asyncCompactionTriggerFn.isDefined + asyncClusteringTriggerFnDefined = asyncClusteringTriggerFn.isDefined if (path.isEmpty) { throw new HoodieException(s"'path' must be set.") } @@ -112,7 +115,7 @@ object HoodieSparkSqlWriter { if (mode == 
SaveMode.Ignore && tableExists) { log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.") - (false, common.util.Option.empty(), common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig) + (false, common.util.Option.empty(), common.util.Option.empty(), common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig) } else { // Handle various save modes handleSaveModes(mode, basePath, tableConfig, tblName, operation, fs) @@ -140,7 +143,7 @@ object HoodieSparkSqlWriter { operation == WriteOperationType.BULK_INSERT) { val (success, commitTime: common.util.Option[String]) = bulkInsertAsRow(sqlContext, parameters, df, tblName, basePath, path, instantTime) - return (success, commitTime, common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig) + return (success, commitTime, common.util.Option.empty(), common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig) } // scalastyle:on @@ -180,6 +183,10 @@ object HoodieSparkSqlWriter { asyncCompactionTriggerFn.get.apply(client) } + if (isAsyncClusteringEnabled(client, parameters)) { + asyncClusteringTriggerFn.get.apply(client) + } + val hoodieRecords = if (hoodieConfig.getBoolean(INSERT_DROP_DUPS_OPT_KEY)) { DataSourceUtils.dropDuplicates(jsc, hoodieAllIncomingRecords, mapAsJavaMap(parameters)) @@ -219,6 +226,10 @@ object HoodieSparkSqlWriter { asyncCompactionTriggerFn.get.apply(client) } + if (isAsyncClusteringEnabled(client, parameters)) { + asyncClusteringTriggerFn.get.apply(client) + } + // Issue deletes client.startCommitWithTime(instantTime, commitActionType) val writeStatuses = DataSourceUtils.doDeleteOperation(client, hoodieKeysToDelete, instantTime) @@ -226,7 +237,7 @@ object HoodieSparkSqlWriter { } // Check for errors and commit the write. 
- val (writeSuccessful, compactionInstant) = + val (writeSuccessful, compactionInstant, clusteringInstant) = commitAndPerformPostOperations(sqlContext.sparkSession, df.schema, writeResult, parameters, writeClient, tableConfig, jsc, TableInstantInfo(basePath, instantTime, commitActionType, operation)) @@ -247,7 +258,7 @@ object HoodieSparkSqlWriter { // it's safe to unpersist cached rdds here unpersistRdd(writeResult.getWriteStatuses.rdd) - (writeSuccessful, common.util.Option.ofNullable(instantTime), compactionInstant, writeClient, tableConfig) + (writeSuccessful, common.util.Option.ofNullable(instantTime), compactionInstant, clusteringInstant, writeClient, tableConfig) } } @@ -565,7 +576,7 @@ object HoodieSparkSqlWriter { tableConfig: HoodieTableConfig, jsc: JavaSparkContext, tableInstantInfo: TableInstantInfo - ): (Boolean, common.util.Option[java.lang.String]) = { + ): (Boolean, common.util.Option[java.lang.String], common.util.Option[java.lang.String]) = { if(writeResult.getWriteStatuses.rdd.filter(ws => ws.hasErrors).isEmpty()) { log.info("Proceeding to commit the write.") val metaMap = parameters.filter(kv => @@ -593,14 +604,24 @@ object HoodieSparkSqlWriter { log.info(s"Compaction Scheduled is $compactionInstant") + val asyncClusteringEnabled = isAsyncClusteringEnabled(client, parameters) + val clusteringInstant: common.util.Option[java.lang.String] = + if (asyncClusteringEnabled) { + client.scheduleClustering(common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap)))) + } else { + common.util.Option.empty() + } + + log.info(s"Clustering Scheduled is $clusteringInstant") + val metaSyncSuccess = metaSync(spark, HoodieWriterUtils.convertMapToHoodieConfig(parameters), tableInstantInfo.basePath, schema) log.info(s"Is Async Compaction Enabled ? $asyncCompactionEnabled") - if (!asyncCompactionEnabled) { + if (!asyncCompactionEnabled && !asyncClusteringEnabled) { client.close() } - (commitSuccess && metaSyncSuccess, compactionInstant) + (commitSuccess && metaSyncSuccess, compactionInstant, clusteringInstant) } else { log.error(s"${tableInstantInfo.operation} failed with errors") if (log.isTraceEnabled) { @@ -615,7 +636,7 @@ object HoodieSparkSqlWriter { } }) } - (false, common.util.Option.empty()) + (false, common.util.Option.empty(), common.util.Option.empty()) } } @@ -631,6 +652,13 @@ object HoodieSparkSqlWriter { } } + private def isAsyncClusteringEnabled(client: SparkRDDWriteClient[HoodieRecordPayload[Nothing]], + parameters: Map[String, String]) : Boolean = { + log.info(s"Config.asyncClusteringEnabled ? 
${client.getConfig.isAsyncClusteringEnabled}") + asyncClusteringTriggerFnDefined && client.getConfig.isAsyncClusteringEnabled && + parameters.get(ASYNC_CLUSTERING_ENABLE_OPT_KEY.key).exists(r => r.toBoolean) + } + private def getHoodieTableConfig(sparkContext: SparkContext, tablePath: String, hoodieTableConfigOpt: Option[HoodieTableConfig]): HoodieTableConfig = { diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala index 61cad3890cc7d..8dfcbc4f1fa0b 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala @@ -19,7 +19,7 @@ package org.apache.hudi import java.lang import java.util.function.Function -import org.apache.hudi.async.{AsyncCompactService, SparkStreamingAsyncCompactService} +import org.apache.hudi.async.{AsyncClusteringService, AsyncCompactService, SparkStreamingAsyncClusteringService, SparkStreamingAsyncCompactService} import org.apache.hudi.client.SparkRDDWriteClient import org.apache.hudi.client.common.HoodieSparkEngineContext import org.apache.hudi.common.model.HoodieRecordPayload @@ -27,6 +27,7 @@ import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} import org.apache.hudi.common.table.timeline.HoodieInstant.State import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline} import org.apache.hudi.common.util.CompactionUtils +import org.apache.hudi.common.util.ClusteringUtils import org.apache.hudi.exception.HoodieCorruptedDataException import org.apache.log4j.LogManager import org.apache.spark.api.java.JavaSparkContext @@ -52,6 +53,7 @@ class HoodieStreamingSink(sqlContext: SQLContext, private val ignoreFailedBatch = options(DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH_OPT_KEY.key).toBoolean private var isAsyncCompactorServiceShutdownAbnormally = false + private var isAsyncClusteringServiceShutdownAbnormally = false private val mode = if (outputMode == OutputMode.Append()) { @@ -61,6 +63,7 @@ class HoodieStreamingSink(sqlContext: SQLContext, } private var asyncCompactorService : AsyncCompactService = _ + private var asyncClusteringService: AsyncClusteringService = _ private var writeClient : Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty private var hoodieTableConfig : Option[HoodieTableConfig] = Option.empty @@ -68,13 +71,17 @@ class HoodieStreamingSink(sqlContext: SQLContext, if (isAsyncCompactorServiceShutdownAbnormally) { throw new IllegalStateException("Async Compactor shutdown unexpectedly") } + if (isAsyncClusteringServiceShutdownAbnormally) { + log.error("Async clustering service shutdown unexpectedly") + throw new IllegalStateException("Async clustering service shutdown unexpectedly") + } retry(retryCnt, retryIntervalMs)( Try( HoodieSparkSqlWriter.write( - sqlContext, mode, options, data, hoodieTableConfig, writeClient, Some(triggerAsyncCompactor)) + sqlContext, mode, options, data, hoodieTableConfig, writeClient, Some(triggerAsyncCompactor), Some(triggerAsyncClustering)) ) match { - case Success((true, commitOps, compactionInstantOps, client, tableConfig)) => + case Success((true, commitOps, compactionInstantOps, clusteringInstant, client, tableConfig)) => log.info(s"Micro batch id=$batchId succeeded" + (commitOps.isPresent match { case true => s" for commit=${commitOps.get()}" @@ -83,9 +90,14 @@ class 
HoodieStreamingSink(sqlContext: SQLContext, writeClient = Some(client) hoodieTableConfig = Some(tableConfig) if (compactionInstantOps.isPresent) { - asyncCompactorService.enqueuePendingCompaction( + asyncCompactorService.enqueuePendingAsyncServiceInstant( new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, compactionInstantOps.get())) } + if (clusteringInstant.isPresent) { + asyncClusteringService.enqueuePendingAsyncServiceInstant(new HoodieInstant( + State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringInstant.get() + )) + } Success((true, commitOps, compactionInstantOps)) case Failure(e) => // clean up persist rdds in the write process @@ -107,7 +119,7 @@ class HoodieStreamingSink(sqlContext: SQLContext, if (retryCnt > 1) log.info(s"Retrying the failed micro batch id=$batchId ...") Failure(e) } - case Success((false, commitOps, compactionInstantOps, client, tableConfig)) => + case Success((false, commitOps, compactionInstantOps, clusteringInstant, client, tableConfig)) => log.error(s"Micro batch id=$batchId ended up with errors" + (commitOps.isPresent match { case true => s" for commit=${commitOps.get()}" @@ -179,7 +191,33 @@ class HoodieStreamingSink(sqlContext: SQLContext, .setBasePath(client.getConfig.getBasePath).build() val pendingInstants :java.util.List[HoodieInstant] = CompactionUtils.getPendingCompactionInstantTimes(metaClient) - pendingInstants.foreach((h : HoodieInstant) => asyncCompactorService.enqueuePendingCompaction(h)) + pendingInstants.foreach((h : HoodieInstant) => asyncCompactorService.enqueuePendingAsyncServiceInstant(h)) + } + } + + protected def triggerAsyncClustering(client: SparkRDDWriteClient[HoodieRecordPayload[Nothing]]): Unit = { + if (null == asyncClusteringService) { + log.info("Triggering async clustering!") + asyncClusteringService = new SparkStreamingAsyncClusteringService(client) + asyncClusteringService.start(new Function[java.lang.Boolean, java.lang.Boolean] { + override def apply(errored: lang.Boolean): lang.Boolean = { + log.info(s"Async clustering service shutdown. Errored ? 
$errored") + isAsyncClusteringServiceShutdownAbnormally = errored + reset(false) + true + } + }) + + // Add Shutdown Hook + Runtime.getRuntime.addShutdownHook(new Thread(new Runnable { + override def run(): Unit = reset(true) + })) + + // First time, scan .hoodie folder and get all pending clustering instants + val metaClient = HoodieTableMetaClient.builder().setConf(sqlContext.sparkContext.hadoopConfiguration) + .setBasePath(client.getConfig.getBasePath).build() + val pendingInstants :java.util.List[HoodieInstant] = ClusteringUtils.getPendingClusteringInstantTimes(metaClient) + pendingInstants.foreach((h : HoodieInstant) => asyncClusteringService.enqueuePendingAsyncServiceInstant(h)) } } @@ -189,6 +227,11 @@ class HoodieStreamingSink(sqlContext: SQLContext, asyncCompactorService = null } + if (asyncClusteringService != null) { + asyncClusteringService.shutdown(force) + asyncClusteringService = null + } + if (writeClient.isDefined) { writeClient.get.close() writeClient = Option.empty diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala index 586e91685b01a..3056103a268ac 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala @@ -76,6 +76,8 @@ object HoodieWriterUtils { HIVE_CREATE_MANAGED_TABLE.key() -> HIVE_CREATE_MANAGED_TABLE.defaultValue.toString, HIVE_SYNC_AS_DATA_SOURCE_TABLE.key() -> HIVE_SYNC_AS_DATA_SOURCE_TABLE.defaultValue(), ASYNC_COMPACT_ENABLE_OPT_KEY.key -> ASYNC_COMPACT_ENABLE_OPT_KEY.defaultValue, + INLINE_CLUSTERING_ENABLE_OPT_KEY.key -> INLINE_CLUSTERING_ENABLE_OPT_KEY.defaultValue, + ASYNC_CLUSTERING_ENABLE_OPT_KEY.key -> ASYNC_CLUSTERING_ENABLE_OPT_KEY.defaultValue, ENABLE_ROW_WRITER_OPT_KEY.key -> ENABLE_ROW_WRITER_OPT_KEY.defaultValue ) ++ DataSourceOptionsHelper.translateConfigurations(parameters) } diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java index d086c2eb00c6b..966ffb0b890f3 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java @@ -156,6 +156,7 @@ public void run() throws Exception { nonPartitionedTable ? 
NonpartitionedKeyGenerator.class.getCanonicalName() : SimpleKeyGenerator.class.getCanonicalName()) .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_OPT_KEY().key(), "false") + .option(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE_OPT_KEY().key(), "true") // This will remove any existing data at path below, and create a .mode(SaveMode.Overwrite); @@ -183,6 +184,7 @@ public void run() throws Exception { : SimpleKeyGenerator.class.getCanonicalName()) // Add Key Extractor .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP.key(), "1") .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_OPT_KEY().key(), "false") + .option(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE_OPT_KEY().key(), "true") .option(HoodieWriteConfig.TABLE_NAME.key(), tableName).mode(SaveMode.Append); updateHiveSyncConfig(writer); @@ -210,6 +212,7 @@ public void run() throws Exception { : SimpleKeyGenerator.class.getCanonicalName()) // Add Key Extractor .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP.key(), "1") .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_OPT_KEY().key(), "false") + .option(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE_OPT_KEY().key(), "true") .option(HoodieWriteConfig.TABLE_NAME.key(), tableName).mode(SaveMode.Append); updateHiveSyncConfig(writer); diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java index b0f0631033882..75fa91ebabfdc 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java @@ -362,6 +362,7 @@ public void stream(Dataset streamingInput, String operationType, String che .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY().key(), "timestamp") .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP.key(), "1") .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_OPT_KEY().key(), "true") + .option(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE_OPT_KEY().key(), "true") .option(HoodieWriteConfig.TABLE_NAME.key(), tableName).option("checkpointLocation", checkpointLocation) .outputMode(OutputMode.Append()); diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala index 42038827db7ff..66cb1ca876365 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala @@ -190,9 +190,13 @@ class TestStructuredStreaming extends HoodieClientTestBase { numInstants } - def getInlineClusteringOpts( isInlineClustering: String, clusteringNumCommit: String, fileMaxRecordNum: Int):Map[String, String] = { + def getClusteringOpts(isInlineClustering: String, isAsyncClustering: String, isAsyncCompaction: String, + clusteringNumCommit: String, fileMaxRecordNum: Int):Map[String, String] = { commonOpts + (HoodieClusteringConfig.INLINE_CLUSTERING_PROP.key -> isInlineClustering, HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT_PROP.key -> clusteringNumCommit, + DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE_OPT_KEY.key -> isAsyncClustering, + DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_OPT_KEY.key -> isAsyncCompaction, + HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMIT_PROP.key -> clusteringNumCommit, 
HoodieStorageConfig.PARQUET_FILE_MAX_BYTES.key -> dataGen.getEstimatedFileSizeInBytes(fileMaxRecordNum).toString ) } @@ -207,12 +211,40 @@ class TestStructuredStreaming extends HoodieClientTestBase { metaClient.reloadActiveTimeline() assertEquals(1, getLatestFileGroupsFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).size) } - structuredStreamingForTestClusteringRunner(sourcePath, destPath, true, + structuredStreamingForTestClusteringRunner(sourcePath, destPath, true, false, false, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, checkClusteringResult) } @Test - def testStructuredStreamingWithoutInlineClustering(): Unit = { + def testStructuredStreamingWithAsyncClustering(): Unit = { + val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", "dest") + + def checkClusteringResult(destPath: String):Unit = { + // check have schedule clustering and clustering file group to one + waitTillHasCompletedReplaceInstant(destPath, 120, 5) + metaClient.reloadActiveTimeline() + assertEquals(1, getLatestFileGroupsFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).size) + } + structuredStreamingForTestClusteringRunner(sourcePath, destPath, false, true, false, + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, checkClusteringResult) + } + + @Test + def testStructuredStreamingWithAsyncClusteringAndCompaction(): Unit = { + val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", "dest") + + def checkClusteringResult(destPath: String):Unit = { + // check have schedule clustering and clustering file group to one + waitTillHasCompletedReplaceInstant(destPath, 120, 5) + metaClient.reloadActiveTimeline() + assertEquals(1, getLatestFileGroupsFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).size) + } + structuredStreamingForTestClusteringRunner(sourcePath, destPath, false, true, true, + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, checkClusteringResult) + } + + @Test + def testStructuredStreamingWithoutClustering(): Unit = { val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", "dest") def checkClusteringResult(destPath: String):Unit = { @@ -224,12 +256,13 @@ class TestStructuredStreaming extends HoodieClientTestBase { }, msg) println(msg) } - structuredStreamingForTestClusteringRunner(sourcePath, destPath, false, + structuredStreamingForTestClusteringRunner(sourcePath, destPath, false, false, false, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, checkClusteringResult) } - def structuredStreamingForTestClusteringRunner(sourcePath: String, destPath: String, - isInlineClustering: Boolean, partitionOfRecords: String, checkClusteringResult: String => Unit): Unit = { + def structuredStreamingForTestClusteringRunner(sourcePath: String, destPath: String, isInlineClustering: Boolean, + isAsyncClustering: Boolean, isAsyncCompaction: Boolean, + partitionOfRecords: String, checkClusteringResult: String => Unit): Unit = { // First insert of data val records1 = recordsToStrings(dataGen.generateInsertsForPartition("000", 100, partitionOfRecords)).toList val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2)) @@ -238,7 +271,8 @@ class TestStructuredStreaming extends HoodieClientTestBase { val records2 = recordsToStrings(dataGen.generateInsertsForPartition("001", 100, partitionOfRecords)).toList val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2)) - val hudiOptions = getInlineClusteringOpts(isInlineClustering.toString, "2", 100) + val hudiOptions = 
getClusteringOpts(isInlineClustering.toString, isAsyncClustering.toString, + isAsyncCompaction.toString, "2", 100) val f1 = initStreamingWriteFuture(inputDF1.schema, sourcePath, destPath, hudiOptions) val f2 = Future { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index 2aad344ded63c..7742e8e624bf1 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -40,6 +40,7 @@ import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieClusteringConfig; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodiePayloadConfig; import org.apache.hudi.config.HoodieWriteConfig; @@ -91,8 +92,10 @@ import scala.collection.JavaConversions; import static org.apache.hudi.common.table.HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP; +import static org.apache.hudi.config.HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE_OPT_KEY; import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY; import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_RESET_KEY; +import static org.apache.hudi.config.HoodieClusteringConfig.INLINE_CLUSTERING_PROP; import static org.apache.hudi.config.HoodieCompactionConfig.INLINE_COMPACT_PROP; import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP; import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_UPSERT_PROP; @@ -645,6 +648,9 @@ private HoodieWriteConfig getHoodieClientConfig(Schema schema) { .withCompactionConfig(HoodieCompactionConfig.newBuilder().withPayloadClass(cfg.payloadClassName) // Inline compaction is disabled for continuous mode. 
otherwise enabled for MOR .withInlineCompaction(cfg.isInlineCompactionEnabled()).build()) + .withClusteringConfig(HoodieClusteringConfig.newBuilder() + .withInlineClustering(cfg.isInlineClusteringEnabled()) + .withAsyncClustering(cfg.isAsyncClusteringEnabled()).build()) .withPayloadConfig(HoodiePayloadConfig.newBuilder().withPayloadOrderingField(cfg.sourceOrderingField) .build()) .forTable(cfg.targetTableName) @@ -663,6 +669,10 @@ private HoodieWriteConfig getHoodieClientConfig(Schema schema) { // Validate what deltastreamer assumes of write-config to be really safe ValidationUtils.checkArgument(config.inlineCompactionEnabled() == cfg.isInlineCompactionEnabled(), String.format("%s should be set to %s", INLINE_COMPACT_PROP.key(), cfg.isInlineCompactionEnabled())); + ValidationUtils.checkArgument(config.inlineClusteringEnabled() == cfg.isInlineClusteringEnabled(), + String.format("%s should be set to %s", INLINE_CLUSTERING_PROP.key(), cfg.isInlineClusteringEnabled())); + ValidationUtils.checkArgument(config.isAsyncClusteringEnabled() == cfg.isAsyncClusteringEnabled(), + String.format("%s should be set to %s", ASYNC_CLUSTERING_ENABLE_OPT_KEY.key(), cfg.isAsyncClusteringEnabled())); ValidationUtils.checkArgument(!config.shouldAutoCommit(), String.format("%s should be set to %s", HOODIE_AUTO_COMMIT_PROP.key(), autoCommit)); ValidationUtils.checkArgument(config.shouldCombineBeforeInsert() == cfg.filterDupes, @@ -736,4 +746,14 @@ public Config getCfg() { public Option getCommitTimelineOpt() { return commitTimelineOpt; } + + /** + * Schedule clustering. + * Called from {@link HoodieDeltaStreamer} when async clustering is enabled. + * + * @return Requested clustering instant. + */ + public Option getClusteringInstantOpt() { + return writeClient.scheduleClustering(Option.empty()); + } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java index a1066c7e92034..9734a1d5cfeba 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -18,8 +18,10 @@ package org.apache.hudi.utilities.deltastreamer; -import org.apache.hudi.async.HoodieAsyncService; +import org.apache.hudi.async.AsyncClusteringService; import org.apache.hudi.async.AsyncCompactService; +import org.apache.hudi.async.HoodieAsyncService; +import org.apache.hudi.async.SparkAsyncClusteringService; import org.apache.hudi.async.SparkAsyncCompactService; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; @@ -35,15 +37,17 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant.State; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.ClusteringUtils; import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; -import org.apache.hudi.utilities.IdentitySplitter; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieClusteringConfig; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.hive.HiveSyncTool; import org.apache.hudi.utilities.HiveIncrementalPuller; +import org.apache.hudi.utilities.IdentitySplitter; 
import org.apache.hudi.utilities.UtilHelpers; import org.apache.hudi.utilities.checkpointing.InitialCheckPointProvider; import org.apache.hudi.utilities.schema.SchemaProvider; @@ -282,6 +286,11 @@ public static class Config implements Serializable { + "outstanding compactions is less than this number") public Integer maxPendingCompactions = 5; + @Parameter(names = {"--max-pending-clustering"}, + description = "Maximum number of outstanding inflight/requested clustering. Delta Sync will not happen unless" + + "outstanding clustering is less than this number") + public Integer maxPendingClustering = 5; + @Parameter(names = {"--continuous"}, description = "Delta Streamer runs in continuous mode running" + " source-fetch -> Transform -> Hudi Write in loop") public Boolean continuousMode = false; @@ -351,6 +360,16 @@ public boolean isInlineCompactionEnabled() { && HoodieTableType.MERGE_ON_READ.equals(HoodieTableType.valueOf(tableType)); } + public boolean isAsyncClusteringEnabled() { + return Boolean.parseBoolean(String.valueOf(UtilHelpers.getConfig(this.configs).getConfig() + .getOrDefault(HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE_OPT_KEY.key(), false))); + } + + public boolean isInlineClusteringEnabled() { + return Boolean.parseBoolean(String.valueOf(UtilHelpers.getConfig(this.configs).getConfig() + .getOrDefault(HoodieClusteringConfig.INLINE_CLUSTERING_PROP.key(), false))); + } + @Override public boolean equals(Object o) { if (this == o) { @@ -376,6 +395,7 @@ public boolean equals(Object o) { && Objects.equals(filterDupes, config.filterDupes) && Objects.equals(enableHiveSync, config.enableHiveSync) && Objects.equals(maxPendingCompactions, config.maxPendingCompactions) + && Objects.equals(maxPendingClustering, config.maxPendingClustering) && Objects.equals(continuousMode, config.continuousMode) && Objects.equals(minSyncIntervalSeconds, config.minSyncIntervalSeconds) && Objects.equals(sparkMaster, config.sparkMaster) @@ -396,7 +416,7 @@ public int hashCode() { baseFileFormat, propsFilePath, configs, sourceClassName, sourceOrderingField, payloadClassName, schemaProviderClassName, transformerClassNames, sourceLimit, operation, filterDupes, - enableHiveSync, maxPendingCompactions, continuousMode, + enableHiveSync, maxPendingCompactions, maxPendingClustering, continuousMode, minSyncIntervalSeconds, sparkMaster, commitOnErrors, deltaSyncSchedulingWeight, compactSchedulingWeight, deltaSyncSchedulingMinShare, compactSchedulingMinShare, forceDisableCompaction, checkpoint, @@ -422,6 +442,7 @@ public String toString() { + ", filterDupes=" + filterDupes + ", enableHiveSync=" + enableHiveSync + ", maxPendingCompactions=" + maxPendingCompactions + + ", maxPendingClustering=" + maxPendingClustering + ", continuousMode=" + continuousMode + ", minSyncIntervalSeconds=" + minSyncIntervalSeconds + ", sparkMaster='" + sparkMaster + '\'' @@ -519,6 +540,11 @@ public static class DeltaSyncService extends HoodieAsyncService { */ private Option asyncCompactService; + /** + * Async clustering service. + */ + private Option asyncClusteringService; + /** * Table Type. 
*/ @@ -535,6 +561,7 @@ public DeltaSyncService(Config cfg, JavaSparkContext jssc, FileSystem fs, Config this.jssc = jssc; this.sparkSession = SparkSession.builder().config(jssc.getConf()).getOrCreate(); this.asyncCompactService = Option.empty(); + this.asyncClusteringService = Option.empty(); if (fs.exists(new Path(cfg.targetBasePath))) { HoodieTableMetaClient meta = @@ -598,9 +625,17 @@ protected Pair startService() { Option, JavaRDD>> scheduledCompactionInstantAndRDD = Option.ofNullable(deltaSync.syncOnce()); if (scheduledCompactionInstantAndRDD.isPresent() && scheduledCompactionInstantAndRDD.get().getLeft().isPresent()) { LOG.info("Enqueuing new pending compaction instant (" + scheduledCompactionInstantAndRDD.get().getLeft() + ")"); - asyncCompactService.get().enqueuePendingCompaction(new HoodieInstant(State.REQUESTED, + asyncCompactService.get().enqueuePendingAsyncServiceInstant(new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, scheduledCompactionInstantAndRDD.get().getLeft().get())); - asyncCompactService.get().waitTillPendingCompactionsReducesTo(cfg.maxPendingCompactions); + asyncCompactService.get().waitTillPendingAsyncServiceInstantsReducesTo(cfg.maxPendingCompactions); + } + if (cfg.isAsyncClusteringEnabled()) { + Option clusteringInstant = deltaSync.getClusteringInstantOpt(); + if (clusteringInstant.isPresent()) { + LOG.info("Scheduled async clustering for instant: " + clusteringInstant.get()); + asyncClusteringService.get().enqueuePendingAsyncServiceInstant(new HoodieInstant(State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringInstant.get())); + asyncClusteringService.get().waitTillPendingAsyncServiceInstantsReducesTo(cfg.maxPendingClustering); + } } long toSleepMs = cfg.minSyncIntervalSeconds * 1000 - (System.currentTimeMillis() - start); if (toSleepMs > 0) { @@ -615,21 +650,25 @@ protected Pair startService() { } } } finally { - shutdownCompactor(error); + shutdownAsyncServices(error); } return true; }, executor), executor); } /** - * Shutdown compactor as DeltaSync is shutdown. + * Shutdown async services like compaction/clustering as DeltaSync is shutdown. */ - private void shutdownCompactor(boolean error) { + private void shutdownAsyncServices(boolean error) { LOG.info("Delta Sync shutdown. Error ?" 
+ error); if (asyncCompactService.isPresent()) { LOG.warn("Gracefully shutting down compactor"); asyncCompactService.get().shutdown(false); } + if (asyncClusteringService.isPresent()) { + LOG.warn("Gracefully shutting down clustering service"); + asyncClusteringService.get().shutdown(false); + } } /** @@ -649,19 +688,43 @@ protected Boolean onInitializingWriteClient(SparkRDDWriteClient writeClient) { HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(new Configuration(jssc.hadoopConfiguration())).setBasePath(cfg.targetBasePath).setLoadActiveTimelineOnLoad(true).build(); List pending = CompactionUtils.getPendingCompactionInstantTimes(meta); - pending.forEach(hoodieInstant -> asyncCompactService.get().enqueuePendingCompaction(hoodieInstant)); + pending.forEach(hoodieInstant -> asyncCompactService.get().enqueuePendingAsyncServiceInstant(hoodieInstant)); asyncCompactService.get().start((error) -> { // Shutdown DeltaSync shutdown(false); return true; }); try { - asyncCompactService.get().waitTillPendingCompactionsReducesTo(cfg.maxPendingCompactions); + asyncCompactService.get().waitTillPendingAsyncServiceInstantsReducesTo(cfg.maxPendingCompactions); } catch (InterruptedException ie) { throw new HoodieException(ie); } } } + // start async clustering if required + if (cfg.isAsyncClusteringEnabled()) { + if (asyncClusteringService.isPresent()) { + asyncClusteringService.get().updateWriteClient(writeClient); + } else { + asyncClusteringService = Option.ofNullable(new SparkAsyncClusteringService(writeClient)); + HoodieTableMetaClient meta = HoodieTableMetaClient.builder() + .setConf(new Configuration(jssc.hadoopConfiguration())) + .setBasePath(cfg.targetBasePath) + .setLoadActiveTimelineOnLoad(true).build(); + List pending = ClusteringUtils.getPendingClusteringInstantTimes(meta); + LOG.info(String.format("Found %d pending clustering instants ", pending.size())); + pending.forEach(hoodieInstant -> asyncClusteringService.get().enqueuePendingAsyncServiceInstant(hoodieInstant)); + asyncClusteringService.get().start((error) -> { + shutdown(false); + return true; + }); + try { + asyncClusteringService.get().waitTillPendingAsyncServiceInstantsReducesTo(cfg.maxPendingClustering); + } catch (InterruptedException e) { + throw new HoodieException(e); + } + } + } return true; } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java index eb534c494410a..ddf03cb49afb1 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java @@ -193,6 +193,7 @@ static void deepCopyConfigs(Config globalConfig, HoodieDeltaStreamer.Config tabl tableConfig.payloadClassName = globalConfig.payloadClassName; tableConfig.forceDisableCompaction = globalConfig.forceDisableCompaction; tableConfig.maxPendingCompactions = globalConfig.maxPendingCompactions; + tableConfig.maxPendingClustering = globalConfig.maxPendingClustering; tableConfig.minSyncIntervalSeconds = globalConfig.minSyncIntervalSeconds; tableConfig.transformerClassNames = globalConfig.transformerClassNames; tableConfig.commitOnErrors = globalConfig.commitOnErrors; @@ -296,6 +297,11 @@ public static class Config implements Serializable { + "outstanding compactions is less than this number") public Integer maxPendingCompactions = 5; 
+ @Parameter(names = {"--max-pending-clustering"}, + description = "Maximum number of outstanding inflight/requested clustering. Delta Sync will not happen unless" + + "outstanding clustering is less than this number") + public Integer maxPendingClustering = 5; + @Parameter(names = {"--continuous"}, description = "Delta Streamer runs in continuous mode running" + " source-fetch -> Transform -> Hudi Write in loop") public Boolean continuousMode = false; diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java index 8a2648dc4073b..642c6664b55a8 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java @@ -38,6 +38,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.config.HoodieClusteringConfig; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.exception.HoodieException; @@ -500,6 +501,14 @@ static void assertAtLeastNCommits(int minExpected, String tablePath, FileSystem int numDeltaCommits = (int) timeline.getInstants().count(); assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", exp >=" + minExpected); } + + static void assertAtLeastNReplaceCommits(int minExpected, String tablePath, FileSystem fs) { + HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build(); + HoodieTimeline timeline = meta.getActiveTimeline().getCompletedReplaceTimeline(); + LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList())); + int numDeltaCommits = (int) timeline.getInstants().count(); + assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", exp >=" + minExpected); + } } @Test @@ -987,20 +996,35 @@ public void testInlineClustering() throws Exception { HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT); cfg.continuousMode = true; cfg.tableType = HoodieTableType.MERGE_ON_READ.name(); - cfg.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords)); - cfg.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP.key())); - cfg.configs.add(String.format("%s=%s", HoodieClusteringConfig.INLINE_CLUSTERING_PROP.key(), "true")); - cfg.configs.add(String.format("%s=%s", HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT_PROP.key(), "2")); + cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "true", "2", "", "")); HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc); deltaStreamerTestRunner(ds, cfg, (r) -> { - HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(this.dfs.getConf()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build(); - int pendingReplaceSize = metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstants().toArray().length; - int completeReplaceSize = metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray().length; - LOG.info("PendingReplaceSize=" + pendingReplaceSize + ",completeReplaceSize = " + completeReplaceSize); - return 
completeReplaceSize > 0; + TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs); + TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs); + return true; }); - HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(this.dfs.getConf()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build(); - assertEquals(1, metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray().length); + } + + private List getAsyncServicesConfigs(int totalRecords, String autoClean, String inlineCluster, + String inlineClusterMaxCommit, String asyncCluster, String asyncClusterMaxCommit) { + List configs = new ArrayList<>(); + configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords)); + if (!StringUtils.isNullOrEmpty(autoClean)) { + configs.add(String.format("%s=%s", HoodieCompactionConfig.AUTO_CLEAN_PROP.key(), autoClean)); + } + if (!StringUtils.isNullOrEmpty(inlineCluster)) { + configs.add(String.format("%s=%s", HoodieClusteringConfig.INLINE_CLUSTERING_PROP.key(), inlineCluster)); + } + if (!StringUtils.isNullOrEmpty(inlineClusterMaxCommit)) { + configs.add(String.format("%s=%s", HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT_PROP.key(), inlineClusterMaxCommit)); + } + if (!StringUtils.isNullOrEmpty(asyncCluster)) { + configs.add(String.format("%s=%s", HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE_OPT_KEY.key(), asyncCluster)); + } + if (!StringUtils.isNullOrEmpty(asyncClusterMaxCommit)) { + configs.add(String.format("%s=%s", HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMIT_PROP.key(), asyncClusterMaxCommit)); + } + return configs; } private HoodieClusteringJob.Config buildHoodieClusteringUtilConfig(String basePath, @@ -1023,9 +1047,7 @@ public void testHoodieAsyncClusteringJob() throws Exception { HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT); cfg.continuousMode = true; cfg.tableType = HoodieTableType.COPY_ON_WRITE.name(); - cfg.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords)); - cfg.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP.key())); - cfg.configs.add(String.format("%s=true", HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE_OPT_KEY.key())); + cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", "true", "")); HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc); deltaStreamerTestRunner(ds, cfg, (r) -> { TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs); @@ -1049,14 +1071,48 @@ public void testHoodieAsyncClusteringJob() throws Exception { } else { LOG.warn("Schedule clustering failed"); } - HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(this.dfs.getConf()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build(); - int pendingReplaceSize = metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstants().toArray().length; - int completeReplaceSize = metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray().length; - System.out.println("PendingReplaceSize=" + pendingReplaceSize + ",completeReplaceSize = " + completeReplaceSize); - return completeReplaceSize > 0; + TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs); + return true; + }); + } + + @Test + public void testAsyncClusteringService() throws Exception { + String tableBasePath = dfsBasePath + "/asyncClustering"; + // Keep it higher than batch-size to test continuous mode + int totalRecords = 3000; + + // 
Initial bulk insert + HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT); + cfg.continuousMode = true; + cfg.tableType = HoodieTableType.COPY_ON_WRITE.name(); + cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", "true", "2")); + HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc); + deltaStreamerTestRunner(ds, cfg, (r) -> { + TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs); + TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs); + return true; + }); + } + + @Test + public void testAsyncClusteringServiceWithCompaction() throws Exception { + String tableBasePath = dfsBasePath + "/asyncClusteringCompaction"; + // Keep it higher than batch-size to test continuous mode + int totalRecords = 3000; + + // Initial bulk insert + HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT); + cfg.continuousMode = true; + cfg.tableType = HoodieTableType.MERGE_ON_READ.name(); + cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", "true", "2")); + HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc); + deltaStreamerTestRunner(ds, cfg, (r) -> { + TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs); + TestHelpers.assertAtleastNCompactionCommits(2, tableBasePath, dfs); + TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs); + return true; }); - HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(this.dfs.getConf()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build(); - assertEquals(1, metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray().length); } /**
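
For readers of this patch, the following is a minimal sketch (not part of the change itself) of how a caller could exercise the new async clustering path in HoodieDeltaStreamer's continuous mode. It mirrors the knobs the tests above pass through getAsyncServicesConfigs(): inline clustering off, async clustering on every 2 commits, auto clean disabled, plus the new --max-pending-clustering back-pressure limit. The wrapper class name, the run(...) method, and the concrete values (table type, operation, 2 commits) are illustrative assumptions; the caller is assumed to have filled in the remaining required Config fields (source class, props file, target base path, etc.) the same way the existing tests do.

import java.util.Arrays;

import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.spark.api.java.JavaSparkContext;

public class AsyncClusteringDemo {

  // Sketch only: cfg is assumed to already carry the mandatory deltastreamer settings.
  public static void run(HoodieDeltaStreamer.Config cfg, JavaSparkContext jsc) throws Exception {
    cfg.continuousMode = true;                              // async services run only in continuous mode
    cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();   // illustrative; MOR also works (see the compaction test)
    cfg.operation = WriteOperationType.INSERT;              // illustrative choice of write operation
    cfg.maxPendingClustering = 5;                           // new flag added by this patch: cap on outstanding clustering

    cfg.configs.addAll(Arrays.asList(
        // disable inline clustering and enable the async path, scheduling clustering every 2 commits
        String.format("%s=false", HoodieClusteringConfig.INLINE_CLUSTERING_PROP.key()),
        String.format("%s=true", HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE_OPT_KEY.key()),
        String.format("%s=2", HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMIT_PROP.key()),
        // keep cleaning out of the way, as the tests above do
        String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP.key())));

    // DeltaSyncService detects isAsyncClusteringEnabled() and starts SparkAsyncClusteringService internally.
    new HoodieDeltaStreamer(cfg, jsc).sync();
  }
}

With this configuration, each sync round schedules a clustering instant via DeltaSync.getClusteringInstantOpt(), enqueues it on the async clustering service, and blocks further delta syncs once the number of pending clustering instants reaches maxPendingClustering.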