diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncClusteringService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncClusteringService.java new file mode 100644 index 0000000000000..b9707bb6d82a7 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncClusteringService.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.async; + +import org.apache.hudi.client.AbstractClusteringClient; +import org.apache.hudi.client.AbstractHoodieWriteClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieIOException; + +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.stream.IntStream; + +/** + * Async clustering service that runs in a separate thread. + * Currently, only one clustering thread is allowed to run at any time. + */ +public abstract class AsyncClusteringService extends HoodieAsyncService { + + private static final long serialVersionUID = 1L; + private static final Logger LOG = LogManager.getLogger(AsyncClusteringService.class); + + private final int maxConcurrentClustering; + private transient AbstractClusteringClient clusteringClient; + + public AsyncClusteringService(AbstractHoodieWriteClient writeClient) { + this(writeClient, false); + } + + public AsyncClusteringService(AbstractHoodieWriteClient writeClient, boolean runInDaemonMode) { + super(runInDaemonMode); + this.clusteringClient = createClusteringClient(writeClient); + this.maxConcurrentClustering = 1; + } + + protected abstract AbstractClusteringClient createClusteringClient(AbstractHoodieWriteClient client); + + /** + * Start clustering service. 
+ */ + @Override + protected Pair startService() { + ExecutorService executor = Executors.newFixedThreadPool(maxConcurrentClustering, + r -> { + Thread t = new Thread(r, "async_clustering_thread"); + t.setDaemon(isRunInDaemonMode()); + return t; + }); + + return Pair.of(CompletableFuture.allOf(IntStream.range(0, maxConcurrentClustering).mapToObj(i -> CompletableFuture.supplyAsync(() -> { + try { + while (!isShutdownRequested()) { + final HoodieInstant instant = fetchNextAsyncServiceInstant(); + if (null != instant) { + LOG.info("Starting clustering for instant " + instant); + clusteringClient.cluster(instant); + LOG.info("Finished clustering for instant " + instant); + } + } + LOG.info("Clustering executor shutting down properly"); + } catch (InterruptedException ie) { + LOG.warn("Clustering executor got interrupted exception! Stopping", ie); + } catch (IOException e) { + LOG.error("Clustering executor failed", e); + throw new HoodieIOException(e.getMessage(), e); + } + return true; + }, executor)).toArray(CompletableFuture[]::new)), executor); + } + + /** + * Update the write client to be used for clustering. + */ + public synchronized void updateWriteClient(AbstractHoodieWriteClient writeClient) { + this.clusteringClient.updateWriteClient(writeClient); + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java index 1bb5daa528c79..2f63297210e14 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java @@ -24,18 +24,14 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieIOException; + import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import java.io.IOException; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; import java.util.stream.IntStream; /** @@ -54,9 +50,6 @@ public abstract class AsyncCompactService extends HoodieAsyncService { private final int maxConcurrentCompaction; private transient AbstractCompactor compactor; protected transient HoodieEngineContext context; - private transient BlockingQueue pendingCompactions = new LinkedBlockingQueue<>(); - private transient ReentrantLock queueLock = new ReentrantLock(); - private transient Condition consumed = queueLock.newCondition(); public AsyncCompactService(HoodieEngineContext context, AbstractHoodieWriteClient client) { this(context, client, false); @@ -71,51 +64,6 @@ public AsyncCompactService(HoodieEngineContext context, AbstractHoodieWriteClien protected abstract AbstractCompactor createCompactor(AbstractHoodieWriteClient client); - /** - * Enqueues new Pending compaction. - */ - public void enqueuePendingCompaction(HoodieInstant instant) { - pendingCompactions.add(instant); - } - - /** - * Wait till outstanding pending compactions reduces to the passed in value. 
- * - * @param numPendingCompactions Maximum pending compactions allowed - * @throws InterruptedException - */ - public void waitTillPendingCompactionsReducesTo(int numPendingCompactions) throws InterruptedException { - try { - queueLock.lock(); - while (!isShutdown() && (pendingCompactions.size() > numPendingCompactions)) { - consumed.await(); - } - } finally { - queueLock.unlock(); - } - } - - /** - * Fetch Next pending compaction if available. - * - * @return - * @throws InterruptedException - */ - private HoodieInstant fetchNextCompactionInstant() throws InterruptedException { - LOG.info("Compactor waiting for next instant for compaction upto 60 seconds"); - HoodieInstant instant = pendingCompactions.poll(10, TimeUnit.SECONDS); - if (instant != null) { - try { - queueLock.lock(); - // Signal waiting thread - consumed.signal(); - } finally { - queueLock.unlock(); - } - } - return instant; - } - /** * Start Compaction Service. */ @@ -134,7 +82,7 @@ protected Pair startService() { context.setProperty(EngineProperty.COMPACTION_POOL_NAME, COMPACT_POOL_NAME); while (!isShutdownRequested()) { - final HoodieInstant instant = fetchNextCompactionInstant(); + final HoodieInstant instant = fetchNextAsyncServiceInstant(); if (null != instant) { LOG.info("Starting Compaction for instant " + instant); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java index 32dd0427255f0..85e008199adca 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java @@ -18,21 +18,26 @@ package org.apache.hudi.async; +import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.collection.Pair; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import java.io.Serializable; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; /** - * Base Class for running clean/delta-sync/compaction in separate thread and controlling their life-cycle. + * Base Class for running clean/delta-sync/compaction/clustering in separate thread and controlling their life-cycle. 
*/ public abstract class HoodieAsyncService implements Serializable { @@ -50,6 +55,12 @@ public abstract class HoodieAsyncService implements Serializable { private transient CompletableFuture future; // Run in daemon mode private final boolean runInDaemonMode; + // Queue to hold pending compaction/clustering instants + private transient BlockingQueue pendingInstants = new LinkedBlockingQueue<>(); + // Mutex lock for synchronized access to pendingInstants queue + private transient ReentrantLock queueLock = new ReentrantLock(); + // Condition instance to use with the queueLock + private transient Condition consumed = queueLock.newCondition(); protected HoodieAsyncService() { this(false); @@ -165,4 +176,51 @@ private void monitorThreads(Function onShutdownCallback) { public boolean isRunInDaemonMode() { return runInDaemonMode; } + + /** + * Wait till outstanding pending compaction/clustering reduces to the passed in value. + * + * @param numPending Maximum pending compactions/clustering allowed + * @throws InterruptedException + */ + public void waitTillPendingAsyncServiceInstantsReducesTo(int numPending) throws InterruptedException { + try { + queueLock.lock(); + while (!isShutdown() && (pendingInstants.size() > numPending)) { + consumed.await(); + } + } finally { + queueLock.unlock(); + } + } + + /** + * Enqueues new pending clustering instant. + * @param instant {@link HoodieInstant} to enqueue. + */ + public void enqueuePendingAsyncServiceInstant(HoodieInstant instant) { + LOG.info("Enqueuing new pending clustering instant: " + instant.getTimestamp()); + pendingInstants.add(instant); + } + + /** + * Fetch next pending compaction/clustering instant if available. + * + * @return {@link HoodieInstant} corresponding to the next pending compaction/clustering. + * @throws InterruptedException + */ + HoodieInstant fetchNextAsyncServiceInstant() throws InterruptedException { + LOG.info("Waiting for next instant upto 10 seconds"); + HoodieInstant instant = pendingInstants.poll(10, TimeUnit.SECONDS); + if (instant != null) { + try { + queueLock.lock(); + // Signal waiting thread + consumed.signal(); + } finally { + queueLock.unlock(); + } + } + return instant; + } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractClusteringClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractClusteringClient.java new file mode 100644 index 0000000000000..34234f546ed19 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractClusteringClient.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.client; + +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.table.timeline.HoodieInstant; + +import java.io.IOException; +import java.io.Serializable; + +/** + * Client will run one round of clustering. + */ +public abstract class AbstractClusteringClient implements Serializable { + + private static final long serialVersionUID = 1L; + + protected transient AbstractHoodieWriteClient clusteringClient; + + public AbstractClusteringClient(AbstractHoodieWriteClient clusteringClient) { + this.clusteringClient = clusteringClient; + } + + /** + * Run clustering for the instant. + * @param instant + * @throws IOException + */ + public abstract void cluster(HoodieInstant instant) throws IOException; + + /** + * Update the write client used by async clustering. + * @param writeClient + */ + public void updateWriteClient(AbstractHoodieWriteClient writeClient) { + this.clusteringClient = writeClient; + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java index 86d26d3b2e6b0..a750fa23c511f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java @@ -53,7 +53,13 @@ public class HoodieClusteringConfig extends HoodieConfig { .key("hoodie.clustering.inline.max.commits") .defaultValue("4") .sinceVersion("0.7.0") - .withDocumentation("Config to control frequency of clustering"); + .withDocumentation("Config to control frequency of inline clustering"); + + public static final ConfigProperty ASYNC_CLUSTERING_MAX_COMMIT_PROP = ConfigProperty + .key("hoodie.clustering.async.max.commits") + .defaultValue("4") + .sinceVersion("0.9.0") + .withDocumentation("Config to control frequency of async clustering"); // Any strategy specific params can be saved with this prefix public static final String CLUSTERING_STRATEGY_PARAM_PREFIX = "hoodie.clustering.plan.strategy."; @@ -177,6 +183,11 @@ public Builder withInlineClusteringNumCommits(int numCommits) { return this; } + public Builder withAsyncClusteringMaxCommits(int numCommits) { + clusteringConfig.setValue(ASYNC_CLUSTERING_MAX_COMMIT_PROP, String.valueOf(numCommits)); + return this; + } + public Builder fromProperties(Properties props) { this.clusteringConfig.getProps().putAll(props); return this; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 1573c7fbfcd03..20d2846a7a7c5 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -677,6 +677,10 @@ public int getInlineClusterMaxCommits() { return getInt(HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT_PROP); } + public int getAsyncClusterMaxCommits() { + return getInt(HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMIT_PROP); + } + public String getPayloadClass() { return getString(HoodieCompactionConfig.PAYLOAD_CLASS_PROP); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/async/SparkAsyncClusteringService.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/async/SparkAsyncClusteringService.java new file 
mode 100644 index 0000000000000..ce436ba034d98 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/async/SparkAsyncClusteringService.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.async; + +import org.apache.hudi.client.AbstractClusteringClient; +import org.apache.hudi.client.AbstractHoodieWriteClient; +import org.apache.hudi.client.HoodieSparkClusteringClient; + +/** + * Async clustering service for Spark datasource. + */ +public class SparkAsyncClusteringService extends AsyncClusteringService { + + public SparkAsyncClusteringService(AbstractHoodieWriteClient writeClient) { + super(writeClient); + } + + @Override + protected AbstractClusteringClient createClusteringClient(AbstractHoodieWriteClient client) { + return new HoodieSparkClusteringClient(client); + } +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieSparkClusteringClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieSparkClusteringClient.java new file mode 100644 index 0000000000000..884b555447897 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieSparkClusteringClient.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.client; + +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.table.timeline.HoodieInstant; + +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaRDD; + +import java.io.IOException; + +/** + * Async clustering client for Spark datasource. 
+ */ +public class HoodieSparkClusteringClient extends + AbstractClusteringClient>, JavaRDD, JavaRDD> { + + private static final Logger LOG = LogManager.getLogger(HoodieSparkClusteringClient.class); + + public HoodieSparkClusteringClient( + AbstractHoodieWriteClient>, JavaRDD, JavaRDD> clusteringClient) { + super(clusteringClient); + } + + @Override + public void cluster(HoodieInstant instant) throws IOException { + LOG.info("Executing clustering instance " + instant); + SparkRDDWriteClient writeClient = (SparkRDDWriteClient) clusteringClient; + JavaRDD res = writeClient.cluster(instant.getTimestamp(), true).getWriteStatuses(); + if (res != null && res.collect().stream().anyMatch(WriteStatus::hasErrors)) { + // TODO: Should we treat this fatal and throw exception? + LOG.error("Clustering for instant (" + instant + ") failed with write errors"); + } + } +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkClusteringPlanActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkClusteringPlanActionExecutor.java index 1f71aa4dfac7c..683d852131f6d 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkClusteringPlanActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkClusteringPlanActionExecutor.java @@ -58,13 +58,20 @@ protected Option createClusteringPlan() { int commitsSinceLastClustering = table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants() .findInstantsAfter(lastClusteringInstant.map(HoodieInstant::getTimestamp).orElse("0"), Integer.MAX_VALUE) .countInstants(); - if (config.getInlineClusterMaxCommits() > commitsSinceLastClustering) { - LOG.info("Not scheduling clustering as only " + commitsSinceLastClustering + if (config.inlineClusteringEnabled() && config.getInlineClusterMaxCommits() > commitsSinceLastClustering) { + LOG.info("Not scheduling inline clustering as only " + commitsSinceLastClustering + " commits was found since last clustering " + lastClusteringInstant + ". Waiting for " + config.getInlineClusterMaxCommits()); return Option.empty(); } + if (config.isAsyncClusteringEnabled() && config.getAsyncClusterMaxCommits() > commitsSinceLastClustering) { + LOG.info("Not scheduling async clustering as only " + commitsSinceLastClustering + + " commits was found since last clustering " + lastClusteringInstant + ". 
Waiting for " + + config.getAsyncClusterMaxCommits()); + return Option.empty(); + } + LOG.info("Generating clustering plan for table " + config.getBasePath()); ClusteringPlanStrategy strategy = (ClusteringPlanStrategy) ReflectionUtils.loadClass(config.getClusteringPlanStrategyClass(), table, context, config); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java index 1bf97c68eb69a..0d790be842619 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java @@ -206,4 +206,8 @@ private static Map buildMetrics(List fileSlices) { metrics.put(TOTAL_LOG_FILES, (double) numLogFiles); return metrics; } + + public static List getPendingClusteringInstantTimes(HoodieTableMetaClient metaClient) { + return metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstants().collect(Collectors.toList()); + } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java index 352a0ca7bc2cf..469f9c7b490bf 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java +++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java @@ -34,6 +34,7 @@ import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.TablePathUtils; +import org.apache.hudi.config.HoodieClusteringConfig; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodiePayloadConfig; @@ -171,6 +172,8 @@ public static HoodieWriteConfig createHoodieConfig(String schemaStr, String base boolean asyncCompact = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_OPT_KEY().key())); boolean inlineCompact = !asyncCompact && parameters.get(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY().key()) .equals(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL()); + boolean asyncClusteringEnabled = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE_OPT_KEY().key())); + boolean inlineClusteringEnabled = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.INLINE_CLUSTERING_ENABLE_OPT_KEY().key())); // insert/bulk-insert combining to be true, if filtering for duplicates boolean combineInserts = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.INSERT_DROP_DUPS_OPT_KEY().key())); HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder() @@ -184,6 +187,9 @@ public static HoodieWriteConfig createHoodieConfig(String schemaStr, String base .withCompactionConfig(HoodieCompactionConfig.newBuilder() .withPayloadClass(parameters.get(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY().key())) .withInlineCompaction(inlineCompact).build()) + .withClusteringConfig(HoodieClusteringConfig.newBuilder() + .withInlineClustering(inlineClusteringEnabled) + .withAsyncClustering(asyncClusteringEnabled).build()) .withPayloadConfig(HoodiePayloadConfig.newBuilder().withPayloadOrderingField(parameters.get(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY().key())) .build()) // override above with Hoodie configs specified as options. 
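
To make the moving parts introduced so far easier to follow, here is a minimal sketch of how a caller drives the new async clustering service. It is not part of this patch: `AsyncClusteringSketch`, `writeClient`, and `maxPendingClustering` are hypothetical placeholders, and the flow simply mirrors the HoodieDeltaStreamer wiring that appears later in this diff — schedule a clustering plan on the write client, hand the resulting replacecommit instant to the background service, and block on the shared queue for back-pressure.

import org.apache.hudi.async.SparkAsyncClusteringService;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;

public class AsyncClusteringSketch {

  // writeClient and maxPendingClustering are supplied by the caller; this mirrors the
  // DeltaSyncService wiring below rather than defining any new Hudi API.
  public static void scheduleAndCluster(SparkRDDWriteClient writeClient,
                                        int maxPendingClustering) throws InterruptedException {
    // Background thread that executes pending replacecommit (clustering) instants.
    SparkAsyncClusteringService clusteringService = new SparkAsyncClusteringService(writeClient);
    clusteringService.start(errored -> true);

    // Ask the write client to schedule a clustering plan; this returns empty unless enough
    // commits (hoodie.clustering.async.max.commits) have accumulated since the last clustering.
    Option<String> clusteringInstant = writeClient.scheduleClustering(Option.empty());
    if (clusteringInstant.isPresent()) {
      clusteringService.enqueuePendingAsyncServiceInstant(new HoodieInstant(
          State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringInstant.get()));
      // Back-pressure: block until the pending clustering queue drains below the limit.
      clusteringService.waitTillPendingAsyncServiceInstantsReducesTo(maxPendingClustering);
    }
  }
}

Internally, the consumer loop added to HoodieAsyncService polls the shared pendingInstants queue with a 10-second timeout and signals the waiting producer once an instant is picked up, which is what makes waitTillPendingAsyncServiceInstantsReducesTo return.
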
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index ebb8c6bc4d813..ce3683126c8a0 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -474,6 +474,18 @@ object DataSourceWriteOptions { .defaultValue("true") .withDocumentation("") + val INLINE_CLUSTERING_ENABLE_OPT_KEY: ConfigProperty[String] = ConfigProperty + .key("hoodie.datasource.clustering.inline.enable") + .defaultValue("false") + .sinceVersion("0.9.0") + .withDocumentation("Enable inline clustering. Disabled by default.") + + val ASYNC_CLUSTERING_ENABLE_OPT_KEY: ConfigProperty[String] = ConfigProperty + .key("hoodie.datasource.clustering.async.enable") + .defaultValue("false") + .sinceVersion("0.9.0") + .withDocumentation("Enable asynchronous clustering. Disabled by default.") + val KAFKA_AVRO_VALUE_DESERIALIZER_CLASS: ConfigProperty[String] = ConfigProperty .key("hoodie.deltastreamer.source.kafka.value.deserializer.class") .defaultValue("io.confluent.kafka.serializers.KafkaAvroDeserializer") diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/async/SparkStreamingAsyncClusteringService.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/async/SparkStreamingAsyncClusteringService.java new file mode 100644 index 0000000000000..81d880ee974df --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/async/SparkStreamingAsyncClusteringService.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.async; + +import org.apache.hudi.client.AbstractClusteringClient; +import org.apache.hudi.client.AbstractHoodieWriteClient; +import org.apache.hudi.client.HoodieSparkClusteringClient; + +/** + * Async clustering service for Spark structured streaming. + * Here, async clustering is run in daemon mode to prevent blocking shutting down the Spark application. 
+ */ +public class SparkStreamingAsyncClusteringService extends AsyncClusteringService { + + private static final long serialVersionUID = 1L; + + public SparkStreamingAsyncClusteringService(AbstractHoodieWriteClient writeClient) { + super(writeClient, true); + } + + @Override + protected AbstractClusteringClient createClusteringClient(AbstractHoodieWriteClient client) { + return new HoodieSparkClusteringClient(client); + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 76bc99b250bd6..b290533dd348b 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -62,6 +62,7 @@ object HoodieSparkSqlWriter { private val log = LogManager.getLogger(getClass) private var tableExists: Boolean = false private var asyncCompactionTriggerFnDefined: Boolean = false + private var asyncClusteringTriggerFnDefined: Boolean = false def write(sqlContext: SQLContext, mode: SaveMode, @@ -69,9 +70,10 @@ object HoodieSparkSqlWriter { df: DataFrame, hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty, hoodieWriteClient: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty, - asyncCompactionTriggerFn: Option[Function1[SparkRDDWriteClient[HoodieRecordPayload[Nothing]], Unit]] = Option.empty + asyncCompactionTriggerFn: Option[Function1[SparkRDDWriteClient[HoodieRecordPayload[Nothing]], Unit]] = Option.empty, + asyncClusteringTriggerFn: Option[Function1[SparkRDDWriteClient[HoodieRecordPayload[Nothing]], Unit]] = Option.empty ) - : (Boolean, common.util.Option[String], common.util.Option[String], + : (Boolean, common.util.Option[String], common.util.Option[String], common.util.Option[String], SparkRDDWriteClient[HoodieRecordPayload[Nothing]], HoodieTableConfig) = { val sparkContext = sqlContext.sparkContext @@ -79,6 +81,7 @@ object HoodieSparkSqlWriter { val hoodieConfig = HoodieWriterUtils.convertMapToHoodieConfig(parameters) val tblNameOp = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TABLE_NAME, s"'${HoodieWriteConfig.TABLE_NAME.key}' must be set.") asyncCompactionTriggerFnDefined = asyncCompactionTriggerFn.isDefined + asyncClusteringTriggerFnDefined = asyncClusteringTriggerFn.isDefined if (path.isEmpty) { throw new HoodieException(s"'path' must be set.") } @@ -112,7 +115,7 @@ object HoodieSparkSqlWriter { if (mode == SaveMode.Ignore && tableExists) { log.warn(s"hoodie table at $basePath already exists. 
Ignoring & not performing actual writes.") - (false, common.util.Option.empty(), common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig) + (false, common.util.Option.empty(), common.util.Option.empty(), common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig) } else { // Handle various save modes handleSaveModes(mode, basePath, tableConfig, tblName, operation, fs) @@ -140,7 +143,7 @@ object HoodieSparkSqlWriter { operation == WriteOperationType.BULK_INSERT) { val (success, commitTime: common.util.Option[String]) = bulkInsertAsRow(sqlContext, parameters, df, tblName, basePath, path, instantTime) - return (success, commitTime, common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig) + return (success, commitTime, common.util.Option.empty(), common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig) } // scalastyle:on @@ -180,6 +183,10 @@ object HoodieSparkSqlWriter { asyncCompactionTriggerFn.get.apply(client) } + if (isAsyncClusteringEnabled(client, parameters)) { + asyncClusteringTriggerFn.get.apply(client) + } + val hoodieRecords = if (hoodieConfig.getBoolean(INSERT_DROP_DUPS_OPT_KEY)) { DataSourceUtils.dropDuplicates(jsc, hoodieAllIncomingRecords, mapAsJavaMap(parameters)) @@ -219,6 +226,10 @@ object HoodieSparkSqlWriter { asyncCompactionTriggerFn.get.apply(client) } + if (isAsyncClusteringEnabled(client, parameters)) { + asyncClusteringTriggerFn.get.apply(client) + } + // Issue deletes client.startCommitWithTime(instantTime, commitActionType) val writeStatuses = DataSourceUtils.doDeleteOperation(client, hoodieKeysToDelete, instantTime) @@ -226,7 +237,7 @@ object HoodieSparkSqlWriter { } // Check for errors and commit the write. - val (writeSuccessful, compactionInstant) = + val (writeSuccessful, compactionInstant, clusteringInstant) = commitAndPerformPostOperations(sqlContext.sparkSession, df.schema, writeResult, parameters, writeClient, tableConfig, jsc, TableInstantInfo(basePath, instantTime, commitActionType, operation)) @@ -247,7 +258,7 @@ object HoodieSparkSqlWriter { // it's safe to unpersist cached rdds here unpersistRdd(writeResult.getWriteStatuses.rdd) - (writeSuccessful, common.util.Option.ofNullable(instantTime), compactionInstant, writeClient, tableConfig) + (writeSuccessful, common.util.Option.ofNullable(instantTime), compactionInstant, clusteringInstant, writeClient, tableConfig) } } @@ -565,7 +576,7 @@ object HoodieSparkSqlWriter { tableConfig: HoodieTableConfig, jsc: JavaSparkContext, tableInstantInfo: TableInstantInfo - ): (Boolean, common.util.Option[java.lang.String]) = { + ): (Boolean, common.util.Option[java.lang.String], common.util.Option[java.lang.String]) = { if(writeResult.getWriteStatuses.rdd.filter(ws => ws.hasErrors).isEmpty()) { log.info("Proceeding to commit the write.") val metaMap = parameters.filter(kv => @@ -593,14 +604,24 @@ object HoodieSparkSqlWriter { log.info(s"Compaction Scheduled is $compactionInstant") + val asyncClusteringEnabled = isAsyncClusteringEnabled(client, parameters) + val clusteringInstant: common.util.Option[java.lang.String] = + if (asyncClusteringEnabled) { + client.scheduleClustering(common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap)))) + } else { + common.util.Option.empty() + } + + log.info(s"Clustering Scheduled is $clusteringInstant") + val metaSyncSuccess = metaSync(spark, HoodieWriterUtils.convertMapToHoodieConfig(parameters), tableInstantInfo.basePath, schema) log.info(s"Is Async Compaction Enabled ? 
$asyncCompactionEnabled") - if (!asyncCompactionEnabled) { + if (!asyncCompactionEnabled && !asyncClusteringEnabled) { client.close() } - (commitSuccess && metaSyncSuccess, compactionInstant) + (commitSuccess && metaSyncSuccess, compactionInstant, clusteringInstant) } else { log.error(s"${tableInstantInfo.operation} failed with errors") if (log.isTraceEnabled) { @@ -615,7 +636,7 @@ object HoodieSparkSqlWriter { } }) } - (false, common.util.Option.empty()) + (false, common.util.Option.empty(), common.util.Option.empty()) } } @@ -631,6 +652,13 @@ object HoodieSparkSqlWriter { } } + private def isAsyncClusteringEnabled(client: SparkRDDWriteClient[HoodieRecordPayload[Nothing]], + parameters: Map[String, String]) : Boolean = { + log.info(s"Config.asyncClusteringEnabled ? ${client.getConfig.isAsyncClusteringEnabled}") + asyncClusteringTriggerFnDefined && client.getConfig.isAsyncClusteringEnabled && + parameters.get(ASYNC_CLUSTERING_ENABLE_OPT_KEY.key).exists(r => r.toBoolean) + } + private def getHoodieTableConfig(sparkContext: SparkContext, tablePath: String, hoodieTableConfigOpt: Option[HoodieTableConfig]): HoodieTableConfig = { diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala index 61cad3890cc7d..8dfcbc4f1fa0b 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala @@ -19,7 +19,7 @@ package org.apache.hudi import java.lang import java.util.function.Function -import org.apache.hudi.async.{AsyncCompactService, SparkStreamingAsyncCompactService} +import org.apache.hudi.async.{AsyncClusteringService, AsyncCompactService, SparkStreamingAsyncClusteringService, SparkStreamingAsyncCompactService} import org.apache.hudi.client.SparkRDDWriteClient import org.apache.hudi.client.common.HoodieSparkEngineContext import org.apache.hudi.common.model.HoodieRecordPayload @@ -27,6 +27,7 @@ import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} import org.apache.hudi.common.table.timeline.HoodieInstant.State import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline} import org.apache.hudi.common.util.CompactionUtils +import org.apache.hudi.common.util.ClusteringUtils import org.apache.hudi.exception.HoodieCorruptedDataException import org.apache.log4j.LogManager import org.apache.spark.api.java.JavaSparkContext @@ -52,6 +53,7 @@ class HoodieStreamingSink(sqlContext: SQLContext, private val ignoreFailedBatch = options(DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH_OPT_KEY.key).toBoolean private var isAsyncCompactorServiceShutdownAbnormally = false + private var isAsyncClusteringServiceShutdownAbnormally = false private val mode = if (outputMode == OutputMode.Append()) { @@ -61,6 +63,7 @@ class HoodieStreamingSink(sqlContext: SQLContext, } private var asyncCompactorService : AsyncCompactService = _ + private var asyncClusteringService: AsyncClusteringService = _ private var writeClient : Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty private var hoodieTableConfig : Option[HoodieTableConfig] = Option.empty @@ -68,13 +71,17 @@ class HoodieStreamingSink(sqlContext: SQLContext, if (isAsyncCompactorServiceShutdownAbnormally) { throw new IllegalStateException("Async Compactor shutdown unexpectedly") } + if (isAsyncClusteringServiceShutdownAbnormally) { + 
log.error("Async clustering service shutdown unexpectedly") + throw new IllegalStateException("Async clustering service shutdown unexpectedly") + } retry(retryCnt, retryIntervalMs)( Try( HoodieSparkSqlWriter.write( - sqlContext, mode, options, data, hoodieTableConfig, writeClient, Some(triggerAsyncCompactor)) + sqlContext, mode, options, data, hoodieTableConfig, writeClient, Some(triggerAsyncCompactor), Some(triggerAsyncClustering)) ) match { - case Success((true, commitOps, compactionInstantOps, client, tableConfig)) => + case Success((true, commitOps, compactionInstantOps, clusteringInstant, client, tableConfig)) => log.info(s"Micro batch id=$batchId succeeded" + (commitOps.isPresent match { case true => s" for commit=${commitOps.get()}" @@ -83,9 +90,14 @@ class HoodieStreamingSink(sqlContext: SQLContext, writeClient = Some(client) hoodieTableConfig = Some(tableConfig) if (compactionInstantOps.isPresent) { - asyncCompactorService.enqueuePendingCompaction( + asyncCompactorService.enqueuePendingAsyncServiceInstant( new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, compactionInstantOps.get())) } + if (clusteringInstant.isPresent) { + asyncClusteringService.enqueuePendingAsyncServiceInstant(new HoodieInstant( + State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringInstant.get() + )) + } Success((true, commitOps, compactionInstantOps)) case Failure(e) => // clean up persist rdds in the write process @@ -107,7 +119,7 @@ class HoodieStreamingSink(sqlContext: SQLContext, if (retryCnt > 1) log.info(s"Retrying the failed micro batch id=$batchId ...") Failure(e) } - case Success((false, commitOps, compactionInstantOps, client, tableConfig)) => + case Success((false, commitOps, compactionInstantOps, clusteringInstant, client, tableConfig)) => log.error(s"Micro batch id=$batchId ended up with errors" + (commitOps.isPresent match { case true => s" for commit=${commitOps.get()}" @@ -179,7 +191,33 @@ class HoodieStreamingSink(sqlContext: SQLContext, .setBasePath(client.getConfig.getBasePath).build() val pendingInstants :java.util.List[HoodieInstant] = CompactionUtils.getPendingCompactionInstantTimes(metaClient) - pendingInstants.foreach((h : HoodieInstant) => asyncCompactorService.enqueuePendingCompaction(h)) + pendingInstants.foreach((h : HoodieInstant) => asyncCompactorService.enqueuePendingAsyncServiceInstant(h)) + } + } + + protected def triggerAsyncClustering(client: SparkRDDWriteClient[HoodieRecordPayload[Nothing]]): Unit = { + if (null == asyncClusteringService) { + log.info("Triggering async clustering!") + asyncClusteringService = new SparkStreamingAsyncClusteringService(client) + asyncClusteringService.start(new Function[java.lang.Boolean, java.lang.Boolean] { + override def apply(errored: lang.Boolean): lang.Boolean = { + log.info(s"Async clustering service shutdown. Errored ? 
$errored") + isAsyncClusteringServiceShutdownAbnormally = errored + reset(false) + true + } + }) + + // Add Shutdown Hook + Runtime.getRuntime.addShutdownHook(new Thread(new Runnable { + override def run(): Unit = reset(true) + })) + + // First time, scan .hoodie folder and get all pending clustering instants + val metaClient = HoodieTableMetaClient.builder().setConf(sqlContext.sparkContext.hadoopConfiguration) + .setBasePath(client.getConfig.getBasePath).build() + val pendingInstants :java.util.List[HoodieInstant] = ClusteringUtils.getPendingClusteringInstantTimes(metaClient) + pendingInstants.foreach((h : HoodieInstant) => asyncClusteringService.enqueuePendingAsyncServiceInstant(h)) } } @@ -189,6 +227,11 @@ class HoodieStreamingSink(sqlContext: SQLContext, asyncCompactorService = null } + if (asyncClusteringService != null) { + asyncClusteringService.shutdown(force) + asyncClusteringService = null + } + if (writeClient.isDefined) { writeClient.get.close() writeClient = Option.empty diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala index 586e91685b01a..3056103a268ac 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala @@ -76,6 +76,8 @@ object HoodieWriterUtils { HIVE_CREATE_MANAGED_TABLE.key() -> HIVE_CREATE_MANAGED_TABLE.defaultValue.toString, HIVE_SYNC_AS_DATA_SOURCE_TABLE.key() -> HIVE_SYNC_AS_DATA_SOURCE_TABLE.defaultValue(), ASYNC_COMPACT_ENABLE_OPT_KEY.key -> ASYNC_COMPACT_ENABLE_OPT_KEY.defaultValue, + INLINE_CLUSTERING_ENABLE_OPT_KEY.key -> INLINE_CLUSTERING_ENABLE_OPT_KEY.defaultValue, + ASYNC_CLUSTERING_ENABLE_OPT_KEY.key -> ASYNC_CLUSTERING_ENABLE_OPT_KEY.defaultValue, ENABLE_ROW_WRITER_OPT_KEY.key -> ENABLE_ROW_WRITER_OPT_KEY.defaultValue ) ++ DataSourceOptionsHelper.translateConfigurations(parameters) } diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java index d086c2eb00c6b..966ffb0b890f3 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java @@ -156,6 +156,7 @@ public void run() throws Exception { nonPartitionedTable ? 
NonpartitionedKeyGenerator.class.getCanonicalName() : SimpleKeyGenerator.class.getCanonicalName()) .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_OPT_KEY().key(), "false") + .option(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE_OPT_KEY().key(), "true") // This will remove any existing data at path below, and create a .mode(SaveMode.Overwrite); @@ -183,6 +184,7 @@ public void run() throws Exception { : SimpleKeyGenerator.class.getCanonicalName()) // Add Key Extractor .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP.key(), "1") .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_OPT_KEY().key(), "false") + .option(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE_OPT_KEY().key(), "true") .option(HoodieWriteConfig.TABLE_NAME.key(), tableName).mode(SaveMode.Append); updateHiveSyncConfig(writer); @@ -210,6 +212,7 @@ public void run() throws Exception { : SimpleKeyGenerator.class.getCanonicalName()) // Add Key Extractor .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP.key(), "1") .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_OPT_KEY().key(), "false") + .option(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE_OPT_KEY().key(), "true") .option(HoodieWriteConfig.TABLE_NAME.key(), tableName).mode(SaveMode.Append); updateHiveSyncConfig(writer); diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java index b0f0631033882..75fa91ebabfdc 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java @@ -362,6 +362,7 @@ public void stream(Dataset streamingInput, String operationType, String che .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY().key(), "timestamp") .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP.key(), "1") .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_OPT_KEY().key(), "true") + .option(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE_OPT_KEY().key(), "true") .option(HoodieWriteConfig.TABLE_NAME.key(), tableName).option("checkpointLocation", checkpointLocation) .outputMode(OutputMode.Append()); diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala index 42038827db7ff..66cb1ca876365 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala @@ -190,9 +190,13 @@ class TestStructuredStreaming extends HoodieClientTestBase { numInstants } - def getInlineClusteringOpts( isInlineClustering: String, clusteringNumCommit: String, fileMaxRecordNum: Int):Map[String, String] = { + def getClusteringOpts(isInlineClustering: String, isAsyncClustering: String, isAsyncCompaction: String, + clusteringNumCommit: String, fileMaxRecordNum: Int):Map[String, String] = { commonOpts + (HoodieClusteringConfig.INLINE_CLUSTERING_PROP.key -> isInlineClustering, HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT_PROP.key -> clusteringNumCommit, + DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE_OPT_KEY.key -> isAsyncClustering, + DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_OPT_KEY.key -> isAsyncCompaction, + HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMIT_PROP.key -> clusteringNumCommit, 
HoodieStorageConfig.PARQUET_FILE_MAX_BYTES.key -> dataGen.getEstimatedFileSizeInBytes(fileMaxRecordNum).toString ) } @@ -207,12 +211,40 @@ class TestStructuredStreaming extends HoodieClientTestBase { metaClient.reloadActiveTimeline() assertEquals(1, getLatestFileGroupsFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).size) } - structuredStreamingForTestClusteringRunner(sourcePath, destPath, true, + structuredStreamingForTestClusteringRunner(sourcePath, destPath, true, false, false, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, checkClusteringResult) } @Test - def testStructuredStreamingWithoutInlineClustering(): Unit = { + def testStructuredStreamingWithAsyncClustering(): Unit = { + val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", "dest") + + def checkClusteringResult(destPath: String):Unit = { + // check have schedule clustering and clustering file group to one + waitTillHasCompletedReplaceInstant(destPath, 120, 5) + metaClient.reloadActiveTimeline() + assertEquals(1, getLatestFileGroupsFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).size) + } + structuredStreamingForTestClusteringRunner(sourcePath, destPath, false, true, false, + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, checkClusteringResult) + } + + @Test + def testStructuredStreamingWithAsyncClusteringAndCompaction(): Unit = { + val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", "dest") + + def checkClusteringResult(destPath: String):Unit = { + // check have schedule clustering and clustering file group to one + waitTillHasCompletedReplaceInstant(destPath, 120, 5) + metaClient.reloadActiveTimeline() + assertEquals(1, getLatestFileGroupsFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).size) + } + structuredStreamingForTestClusteringRunner(sourcePath, destPath, false, true, true, + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, checkClusteringResult) + } + + @Test + def testStructuredStreamingWithoutClustering(): Unit = { val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", "dest") def checkClusteringResult(destPath: String):Unit = { @@ -224,12 +256,13 @@ class TestStructuredStreaming extends HoodieClientTestBase { }, msg) println(msg) } - structuredStreamingForTestClusteringRunner(sourcePath, destPath, false, + structuredStreamingForTestClusteringRunner(sourcePath, destPath, false, false, false, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, checkClusteringResult) } - def structuredStreamingForTestClusteringRunner(sourcePath: String, destPath: String, - isInlineClustering: Boolean, partitionOfRecords: String, checkClusteringResult: String => Unit): Unit = { + def structuredStreamingForTestClusteringRunner(sourcePath: String, destPath: String, isInlineClustering: Boolean, + isAsyncClustering: Boolean, isAsyncCompaction: Boolean, + partitionOfRecords: String, checkClusteringResult: String => Unit): Unit = { // First insert of data val records1 = recordsToStrings(dataGen.generateInsertsForPartition("000", 100, partitionOfRecords)).toList val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2)) @@ -238,7 +271,8 @@ class TestStructuredStreaming extends HoodieClientTestBase { val records2 = recordsToStrings(dataGen.generateInsertsForPartition("001", 100, partitionOfRecords)).toList val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2)) - val hudiOptions = getInlineClusteringOpts(isInlineClustering.toString, "2", 100) + val hudiOptions = 
getClusteringOpts(isInlineClustering.toString, isAsyncClustering.toString, + isAsyncCompaction.toString, "2", 100) val f1 = initStreamingWriteFuture(inputDF1.schema, sourcePath, destPath, hudiOptions) val f2 = Future { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index 2aad344ded63c..7742e8e624bf1 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -40,6 +40,7 @@ import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieClusteringConfig; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodiePayloadConfig; import org.apache.hudi.config.HoodieWriteConfig; @@ -91,8 +92,10 @@ import scala.collection.JavaConversions; import static org.apache.hudi.common.table.HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP; +import static org.apache.hudi.config.HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE_OPT_KEY; import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY; import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_RESET_KEY; +import static org.apache.hudi.config.HoodieClusteringConfig.INLINE_CLUSTERING_PROP; import static org.apache.hudi.config.HoodieCompactionConfig.INLINE_COMPACT_PROP; import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP; import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_UPSERT_PROP; @@ -645,6 +648,9 @@ private HoodieWriteConfig getHoodieClientConfig(Schema schema) { .withCompactionConfig(HoodieCompactionConfig.newBuilder().withPayloadClass(cfg.payloadClassName) // Inline compaction is disabled for continuous mode. 
otherwise enabled for MOR .withInlineCompaction(cfg.isInlineCompactionEnabled()).build()) + .withClusteringConfig(HoodieClusteringConfig.newBuilder() + .withInlineClustering(cfg.isInlineClusteringEnabled()) + .withAsyncClustering(cfg.isAsyncClusteringEnabled()).build()) .withPayloadConfig(HoodiePayloadConfig.newBuilder().withPayloadOrderingField(cfg.sourceOrderingField) .build()) .forTable(cfg.targetTableName) @@ -663,6 +669,10 @@ private HoodieWriteConfig getHoodieClientConfig(Schema schema) { // Validate what deltastreamer assumes of write-config to be really safe ValidationUtils.checkArgument(config.inlineCompactionEnabled() == cfg.isInlineCompactionEnabled(), String.format("%s should be set to %s", INLINE_COMPACT_PROP.key(), cfg.isInlineCompactionEnabled())); + ValidationUtils.checkArgument(config.inlineClusteringEnabled() == cfg.isInlineClusteringEnabled(), + String.format("%s should be set to %s", INLINE_CLUSTERING_PROP.key(), cfg.isInlineClusteringEnabled())); + ValidationUtils.checkArgument(config.isAsyncClusteringEnabled() == cfg.isAsyncClusteringEnabled(), + String.format("%s should be set to %s", ASYNC_CLUSTERING_ENABLE_OPT_KEY.key(), cfg.isAsyncClusteringEnabled())); ValidationUtils.checkArgument(!config.shouldAutoCommit(), String.format("%s should be set to %s", HOODIE_AUTO_COMMIT_PROP.key(), autoCommit)); ValidationUtils.checkArgument(config.shouldCombineBeforeInsert() == cfg.filterDupes, @@ -736,4 +746,14 @@ public Config getCfg() { public Option getCommitTimelineOpt() { return commitTimelineOpt; } + + /** + * Schedule clustering. + * Called from {@link HoodieDeltaStreamer} when async clustering is enabled. + * + * @return Requested clustering instant. + */ + public Option getClusteringInstantOpt() { + return writeClient.scheduleClustering(Option.empty()); + } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java index a1066c7e92034..9734a1d5cfeba 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -18,8 +18,10 @@ package org.apache.hudi.utilities.deltastreamer; -import org.apache.hudi.async.HoodieAsyncService; +import org.apache.hudi.async.AsyncClusteringService; import org.apache.hudi.async.AsyncCompactService; +import org.apache.hudi.async.HoodieAsyncService; +import org.apache.hudi.async.SparkAsyncClusteringService; import org.apache.hudi.async.SparkAsyncCompactService; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; @@ -35,15 +37,17 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant.State; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.ClusteringUtils; import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; -import org.apache.hudi.utilities.IdentitySplitter; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieClusteringConfig; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.hive.HiveSyncTool; import org.apache.hudi.utilities.HiveIncrementalPuller; +import org.apache.hudi.utilities.IdentitySplitter; 
import org.apache.hudi.utilities.UtilHelpers; import org.apache.hudi.utilities.checkpointing.InitialCheckPointProvider; import org.apache.hudi.utilities.schema.SchemaProvider; @@ -282,6 +286,11 @@ public static class Config implements Serializable { + "outstanding compactions is less than this number") public Integer maxPendingCompactions = 5; + @Parameter(names = {"--max-pending-clustering"}, + description = "Maximum number of outstanding inflight/requested clustering. Delta Sync will not happen unless" + + "outstanding clustering is less than this number") + public Integer maxPendingClustering = 5; + @Parameter(names = {"--continuous"}, description = "Delta Streamer runs in continuous mode running" + " source-fetch -> Transform -> Hudi Write in loop") public Boolean continuousMode = false; @@ -351,6 +360,16 @@ public boolean isInlineCompactionEnabled() { && HoodieTableType.MERGE_ON_READ.equals(HoodieTableType.valueOf(tableType)); } + public boolean isAsyncClusteringEnabled() { + return Boolean.parseBoolean(String.valueOf(UtilHelpers.getConfig(this.configs).getConfig() + .getOrDefault(HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE_OPT_KEY.key(), false))); + } + + public boolean isInlineClusteringEnabled() { + return Boolean.parseBoolean(String.valueOf(UtilHelpers.getConfig(this.configs).getConfig() + .getOrDefault(HoodieClusteringConfig.INLINE_CLUSTERING_PROP.key(), false))); + } + @Override public boolean equals(Object o) { if (this == o) { @@ -376,6 +395,7 @@ public boolean equals(Object o) { && Objects.equals(filterDupes, config.filterDupes) && Objects.equals(enableHiveSync, config.enableHiveSync) && Objects.equals(maxPendingCompactions, config.maxPendingCompactions) + && Objects.equals(maxPendingClustering, config.maxPendingClustering) && Objects.equals(continuousMode, config.continuousMode) && Objects.equals(minSyncIntervalSeconds, config.minSyncIntervalSeconds) && Objects.equals(sparkMaster, config.sparkMaster) @@ -396,7 +416,7 @@ public int hashCode() { baseFileFormat, propsFilePath, configs, sourceClassName, sourceOrderingField, payloadClassName, schemaProviderClassName, transformerClassNames, sourceLimit, operation, filterDupes, - enableHiveSync, maxPendingCompactions, continuousMode, + enableHiveSync, maxPendingCompactions, maxPendingClustering, continuousMode, minSyncIntervalSeconds, sparkMaster, commitOnErrors, deltaSyncSchedulingWeight, compactSchedulingWeight, deltaSyncSchedulingMinShare, compactSchedulingMinShare, forceDisableCompaction, checkpoint, @@ -422,6 +442,7 @@ public String toString() { + ", filterDupes=" + filterDupes + ", enableHiveSync=" + enableHiveSync + ", maxPendingCompactions=" + maxPendingCompactions + + ", maxPendingClustering=" + maxPendingClustering + ", continuousMode=" + continuousMode + ", minSyncIntervalSeconds=" + minSyncIntervalSeconds + ", sparkMaster='" + sparkMaster + '\'' @@ -519,6 +540,11 @@ public static class DeltaSyncService extends HoodieAsyncService { */ private Option asyncCompactService; + /** + * Async clustering service. + */ + private Option asyncClusteringService; + /** * Table Type. 
*/ @@ -535,6 +561,7 @@ public DeltaSyncService(Config cfg, JavaSparkContext jssc, FileSystem fs, Config this.jssc = jssc; this.sparkSession = SparkSession.builder().config(jssc.getConf()).getOrCreate(); this.asyncCompactService = Option.empty(); + this.asyncClusteringService = Option.empty(); if (fs.exists(new Path(cfg.targetBasePath))) { HoodieTableMetaClient meta = @@ -598,9 +625,17 @@ protected Pair startService() { Option, JavaRDD>> scheduledCompactionInstantAndRDD = Option.ofNullable(deltaSync.syncOnce()); if (scheduledCompactionInstantAndRDD.isPresent() && scheduledCompactionInstantAndRDD.get().getLeft().isPresent()) { LOG.info("Enqueuing new pending compaction instant (" + scheduledCompactionInstantAndRDD.get().getLeft() + ")"); - asyncCompactService.get().enqueuePendingCompaction(new HoodieInstant(State.REQUESTED, + asyncCompactService.get().enqueuePendingAsyncServiceInstant(new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, scheduledCompactionInstantAndRDD.get().getLeft().get())); - asyncCompactService.get().waitTillPendingCompactionsReducesTo(cfg.maxPendingCompactions); + asyncCompactService.get().waitTillPendingAsyncServiceInstantsReducesTo(cfg.maxPendingCompactions); + } + if (cfg.isAsyncClusteringEnabled()) { + Option clusteringInstant = deltaSync.getClusteringInstantOpt(); + if (clusteringInstant.isPresent()) { + LOG.info("Scheduled async clustering for instant: " + clusteringInstant.get()); + asyncClusteringService.get().enqueuePendingAsyncServiceInstant(new HoodieInstant(State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringInstant.get())); + asyncClusteringService.get().waitTillPendingAsyncServiceInstantsReducesTo(cfg.maxPendingClustering); + } } long toSleepMs = cfg.minSyncIntervalSeconds * 1000 - (System.currentTimeMillis() - start); if (toSleepMs > 0) { @@ -615,21 +650,25 @@ protected Pair startService() { } } } finally { - shutdownCompactor(error); + shutdownAsyncServices(error); } return true; }, executor), executor); } /** - * Shutdown compactor as DeltaSync is shutdown. + * Shutdown async services like compaction/clustering as DeltaSync is shutdown. */ - private void shutdownCompactor(boolean error) { + private void shutdownAsyncServices(boolean error) { LOG.info("Delta Sync shutdown. Error ?" 
+ error); if (asyncCompactService.isPresent()) { LOG.warn("Gracefully shutting down compactor"); asyncCompactService.get().shutdown(false); } + if (asyncClusteringService.isPresent()) { + LOG.warn("Gracefully shutting down clustering service"); + asyncClusteringService.get().shutdown(false); + } } /** @@ -649,19 +688,43 @@ protected Boolean onInitializingWriteClient(SparkRDDWriteClient writeClient) { HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(new Configuration(jssc.hadoopConfiguration())).setBasePath(cfg.targetBasePath).setLoadActiveTimelineOnLoad(true).build(); List pending = CompactionUtils.getPendingCompactionInstantTimes(meta); - pending.forEach(hoodieInstant -> asyncCompactService.get().enqueuePendingCompaction(hoodieInstant)); + pending.forEach(hoodieInstant -> asyncCompactService.get().enqueuePendingAsyncServiceInstant(hoodieInstant)); asyncCompactService.get().start((error) -> { // Shutdown DeltaSync shutdown(false); return true; }); try { - asyncCompactService.get().waitTillPendingCompactionsReducesTo(cfg.maxPendingCompactions); + asyncCompactService.get().waitTillPendingAsyncServiceInstantsReducesTo(cfg.maxPendingCompactions); } catch (InterruptedException ie) { throw new HoodieException(ie); } } } + // start async clustering if required + if (cfg.isAsyncClusteringEnabled()) { + if (asyncClusteringService.isPresent()) { + asyncClusteringService.get().updateWriteClient(writeClient); + } else { + asyncClusteringService = Option.ofNullable(new SparkAsyncClusteringService(writeClient)); + HoodieTableMetaClient meta = HoodieTableMetaClient.builder() + .setConf(new Configuration(jssc.hadoopConfiguration())) + .setBasePath(cfg.targetBasePath) + .setLoadActiveTimelineOnLoad(true).build(); + List pending = ClusteringUtils.getPendingClusteringInstantTimes(meta); + LOG.info(String.format("Found %d pending clustering instants ", pending.size())); + pending.forEach(hoodieInstant -> asyncClusteringService.get().enqueuePendingAsyncServiceInstant(hoodieInstant)); + asyncClusteringService.get().start((error) -> { + shutdown(false); + return true; + }); + try { + asyncClusteringService.get().waitTillPendingAsyncServiceInstantsReducesTo(cfg.maxPendingClustering); + } catch (InterruptedException e) { + throw new HoodieException(e); + } + } + } return true; } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java index eb534c494410a..ddf03cb49afb1 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java @@ -193,6 +193,7 @@ static void deepCopyConfigs(Config globalConfig, HoodieDeltaStreamer.Config tabl tableConfig.payloadClassName = globalConfig.payloadClassName; tableConfig.forceDisableCompaction = globalConfig.forceDisableCompaction; tableConfig.maxPendingCompactions = globalConfig.maxPendingCompactions; + tableConfig.maxPendingClustering = globalConfig.maxPendingClustering; tableConfig.minSyncIntervalSeconds = globalConfig.minSyncIntervalSeconds; tableConfig.transformerClassNames = globalConfig.transformerClassNames; tableConfig.commitOnErrors = globalConfig.commitOnErrors; @@ -296,6 +297,11 @@ public static class Config implements Serializable { + "outstanding compactions is less than this number") public Integer maxPendingCompactions = 5; 
+ @Parameter(names = {"--max-pending-clustering"}, + description = "Maximum number of outstanding inflight/requested clustering. Delta Sync will not happen unless " + + "outstanding clustering is less than this number") + public Integer maxPendingClustering = 5; + @Parameter(names = {"--continuous"}, description = "Delta Streamer runs in continuous mode running" + " source-fetch -> Transform -> Hudi Write in loop") public Boolean continuousMode = false; diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java index 8a2648dc4073b..642c6664b55a8 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java @@ -38,6 +38,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.config.HoodieClusteringConfig; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.exception.HoodieException; @@ -500,6 +501,14 @@ static void assertAtLeastNCommits(int minExpected, String tablePath, FileSystem int numDeltaCommits = (int) timeline.getInstants().count(); assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", exp >=" + minExpected); } + + static void assertAtLeastNReplaceCommits(int minExpected, String tablePath, FileSystem fs) { + HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build(); + HoodieTimeline timeline = meta.getActiveTimeline().getCompletedReplaceTimeline(); + LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList())); + int numReplaceCommits = (int) timeline.getInstants().count(); + assertTrue(minExpected <= numReplaceCommits, "Got=" + numReplaceCommits + ", exp >=" + minExpected); + } } @Test @@ -987,20 +996,35 @@ public void testInlineClustering() throws Exception { HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT); cfg.continuousMode = true; cfg.tableType = HoodieTableType.MERGE_ON_READ.name(); - cfg.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords)); - cfg.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP.key())); - cfg.configs.add(String.format("%s=%s", HoodieClusteringConfig.INLINE_CLUSTERING_PROP.key(), "true")); - cfg.configs.add(String.format("%s=%s", HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT_PROP.key(), "2")); + cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "true", "2", "", "")); HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc); deltaStreamerTestRunner(ds, cfg, (r) -> { - HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(this.dfs.getConf()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build(); - int pendingReplaceSize = metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstants().toArray().length; - int completeReplaceSize = metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray().length; - LOG.info("PendingReplaceSize=" + pendingReplaceSize + ",completeReplaceSize = " + completeReplaceSize); - return
completeReplaceSize > 0; + TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs); + TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs); + return true; }); - HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(this.dfs.getConf()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build(); - assertEquals(1, metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray().length); + } + + private List getAsyncServicesConfigs(int totalRecords, String autoClean, String inlineCluster, + String inlineClusterMaxCommit, String asyncCluster, String asyncClusterMaxCommit) { + List configs = new ArrayList<>(); + configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords)); + if (!StringUtils.isNullOrEmpty(autoClean)) { + configs.add(String.format("%s=%s", HoodieCompactionConfig.AUTO_CLEAN_PROP.key(), autoClean)); + } + if (!StringUtils.isNullOrEmpty(inlineCluster)) { + configs.add(String.format("%s=%s", HoodieClusteringConfig.INLINE_CLUSTERING_PROP.key(), inlineCluster)); + } + if (!StringUtils.isNullOrEmpty(inlineClusterMaxCommit)) { + configs.add(String.format("%s=%s", HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT_PROP.key(), inlineClusterMaxCommit)); + } + if (!StringUtils.isNullOrEmpty(asyncCluster)) { + configs.add(String.format("%s=%s", HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE_OPT_KEY.key(), asyncCluster)); + } + if (!StringUtils.isNullOrEmpty(asyncClusterMaxCommit)) { + configs.add(String.format("%s=%s", HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMIT_PROP.key(), asyncClusterMaxCommit)); + } + return configs; } private HoodieClusteringJob.Config buildHoodieClusteringUtilConfig(String basePath, @@ -1023,9 +1047,7 @@ public void testHoodieAsyncClusteringJob() throws Exception { HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT); cfg.continuousMode = true; cfg.tableType = HoodieTableType.COPY_ON_WRITE.name(); - cfg.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords)); - cfg.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP.key())); - cfg.configs.add(String.format("%s=true", HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE_OPT_KEY.key())); + cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", "true", "")); HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc); deltaStreamerTestRunner(ds, cfg, (r) -> { TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs); @@ -1049,14 +1071,48 @@ public void testHoodieAsyncClusteringJob() throws Exception { } else { LOG.warn("Schedule clustering failed"); } - HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(this.dfs.getConf()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build(); - int pendingReplaceSize = metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstants().toArray().length; - int completeReplaceSize = metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray().length; - System.out.println("PendingReplaceSize=" + pendingReplaceSize + ",completeReplaceSize = " + completeReplaceSize); - return completeReplaceSize > 0; + TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs); + return true; + }); + } + + @Test + public void testAsyncClusteringService() throws Exception { + String tableBasePath = dfsBasePath + "/asyncClustering"; + // Keep it higher than batch-size to test continuous mode + int totalRecords = 3000; + + // 
Initial bulk insert + HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT); + cfg.continuousMode = true; + cfg.tableType = HoodieTableType.COPY_ON_WRITE.name(); + cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", "true", "2")); + HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc); + deltaStreamerTestRunner(ds, cfg, (r) -> { + TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs); + TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs); + return true; + }); + } + + @Test + public void testAsyncClusteringServiceWithCompaction() throws Exception { + String tableBasePath = dfsBasePath + "/asyncClusteringCompaction"; + // Keep it higher than batch-size to test continuous mode + int totalRecords = 3000; + + // Initial bulk insert + HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT); + cfg.continuousMode = true; + cfg.tableType = HoodieTableType.MERGE_ON_READ.name(); + cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", "true", "2")); + HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc); + deltaStreamerTestRunner(ds, cfg, (r) -> { + TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs); + TestHelpers.assertAtleastNCompactionCommits(2, tableBasePath, dfs); + TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs); + return true; }); - HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(this.dfs.getConf()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build(); - assertEquals(1, metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray().length); } /**