From 2ccf74dd4a15eb7b5e9e1aa52476a41f1da01c3e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=96=BB=E5=85=86=E9=9D=96?= Date: Sat, 18 Jun 2022 12:48:23 +0800 Subject: [PATCH 1/2] [HUDI-4273] Support inline schedule clustering for Flink stream --- .../ScheduleCompactionActionExecutor.java | 2 +- .../hudi/configuration/FlinkOptions.java | 8 +- .../hudi/configuration/OptionsResolver.java | 51 ++- .../sink/StreamWriteOperatorCoordinator.java | 10 +- .../clustering/ClusteringCommitEvent.java | 10 +- .../sink/clustering/ClusteringCommitSink.java | 28 +- .../sink/clustering/ClusteringOperator.java | 159 +++++--- .../clustering/ClusteringPlanOperator.java | 139 +++++++ .../ClusteringPlanSourceFunction.java | 11 +- .../clustering/FlinkClusteringConfig.java | 23 +- .../clustering/HoodieFlinkClusteringJob.java | 347 +++++++++++++----- .../hudi/sink/compact/CompactFunction.java | 3 +- .../sink/compact/CompactionCommitSink.java | 2 +- .../org/apache/hudi/sink/utils/Pipelines.java | 63 +++- .../hudi/streamer/HoodieFlinkStreamer.java | 3 +- .../apache/hudi/table/HoodieTableSink.java | 10 +- .../org/apache/hudi/util/ClusteringUtil.java | 81 ++++ .../org/apache/hudi/util/StreamerUtil.java | 20 - .../hudi/sink/TestWriteCopyOnWrite.java | 29 +- .../cluster/ITTestHoodieFlinkClustering.java | 57 ++- .../sink/utils/ClusteringFunctionWrapper.java | 121 ++++++ .../sink/utils/InsertFunctionWrapper.java | 34 +- .../utils/StreamWriteFunctionWrapper.java | 5 +- .../apache/hudi/sink/utils/TestWriteBase.java | 7 +- .../java/org/apache/hudi/utils/TestUtils.java | 10 + 25 files changed, 1006 insertions(+), 227 deletions(-) create mode 100644 hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanOperator.java create mode 100644 hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java create mode 100644 hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/ClusteringFunctionWrapper.java diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java index 05fb7c0c92d1d..0f1c811ee9862 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java @@ -71,7 +71,7 @@ public Option execute() { if (!config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl() && !config.getFailedWritesCleanPolicy().isLazy()) { // TODO(yihua): this validation is removed for Java client used by kafka-connect. Need to revisit this. 
- if (config.getEngineType() != EngineType.JAVA) { + if (config.getEngineType() == EngineType.SPARK) { // if there are inflight writes, their instantTime must not be less than that of compaction instant time table.getActiveTimeline().getCommitsTimeline().filterPendingExcludingCompaction().firstInstant() .ifPresent(earliestInflight -> ValidationUtils.checkArgument( diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index d4fb4656f5f47..9463e497de99b 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -594,6 +594,12 @@ private FlinkOptions() { .defaultValue(false) // default false for pipeline .withDescription("Schedule the cluster plan, default false"); + public static final ConfigOption CLUSTERING_ASYNC_ENABLED = ConfigOptions + .key("clustering.async.enabled") + .booleanType() + .defaultValue(false) // default false for pipeline + .withDescription("Async Clustering, default false"); + public static final ConfigOption CLUSTERING_DELTA_COMMITS = ConfigOptions .key("clustering.delta_commits") .intType() @@ -641,7 +647,7 @@ private FlinkOptions() { public static final ConfigOption CLUSTERING_SORT_COLUMNS = ConfigOptions .key("clustering.plan.strategy.sort.columns") .stringType() - .noDefaultValue() + .defaultValue("") .withDescription("Columns to sort the data by when clustering"); public static final ConfigOption CLUSTERING_MAX_NUM_GROUPS = ConfigOptions diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java index 6ebf09069be60..4cfa0bc92aa40 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java @@ -20,6 +20,7 @@ import org.apache.hudi.common.model.DefaultHoodieRecordPayload; import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.table.format.FilePathUtils; @@ -42,7 +43,10 @@ public static boolean insertClustering(Configuration conf) { * Returns whether the insert is clustering disabled with given configuration {@code conf}. */ public static boolean isAppendMode(Configuration conf) { - return isCowTable(conf) && isInsertOperation(conf) && !conf.getBoolean(FlinkOptions.INSERT_CLUSTER); + // 1. inline clustering is supported for COW table; + // 2. async clustering is supported for both COW and MOR table + return isCowTable(conf) && isInsertOperation(conf) && !conf.getBoolean(FlinkOptions.INSERT_CLUSTER) + || needsScheduleClustering(conf); } /** @@ -115,4 +119,49 @@ public static boolean emitChangelog(Configuration conf) { return conf.getBoolean(FlinkOptions.READ_AS_STREAMING) && conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED); } + + /** + * Returns whether there is need to schedule the async compaction. + * + * @param conf The flink configuration. 
+ */ + public static boolean needsAsyncCompaction(Configuration conf) { + return OptionsResolver.isMorTable(conf) + && conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED); + } + + /** + * Returns whether there is need to schedule the compaction plan. + * + * @param conf The flink configuration. + */ + public static boolean needsScheduleCompaction(Configuration conf) { + return OptionsResolver.isMorTable(conf) + && conf.getBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED); + } + + /** + * Returns whether there is need to schedule the async clustering. + * + * @param conf The flink configuration. + */ + public static boolean needsAsyncClustering(Configuration conf) { + return isInsertOperation(conf) && conf.getBoolean(FlinkOptions.CLUSTERING_ASYNC_ENABLED); + } + + /** + * Returns whether there is need to schedule the clustering plan. + * + * @param conf The flink configuration. + */ + public static boolean needsScheduleClustering(Configuration conf) { + return isInsertOperation(conf) && conf.getBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED); + } + + /** + * Returns whether the clustering sort is enabled. + */ + public static boolean sortClusteringEnabled(Configuration conf) { + return !StringUtils.isNullOrEmpty(conf.getString(FlinkOptions.CLUSTERING_SORT_COLUMNS)); + } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index 25b34b11e7cc3..44553820be766 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -37,6 +37,7 @@ import org.apache.hudi.sink.meta.CkpMetadata; import org.apache.hudi.sink.utils.HiveSyncContext; import org.apache.hudi.sink.utils.NonThrownExecutor; +import org.apache.hudi.util.ClusteringUtil; import org.apache.hudi.util.CompactionUtil; import org.apache.hudi.util.StreamerUtil; @@ -253,6 +254,11 @@ public void notifyCheckpointComplete(long checkpointId) { CompactionUtil.scheduleCompaction(metaClient, writeClient, tableState.isDeltaTimeCompaction, committed); } + if (tableState.scheduleClustering) { + // if async clustering is on, schedule the clustering + ClusteringUtil.scheduleClustering(conf, writeClient, committed); + } + if (committed) { // start new instant. 
startInstant(); @@ -607,6 +613,7 @@ private static class TableState implements Serializable { final String commitAction; final boolean isOverwrite; final boolean scheduleCompaction; + final boolean scheduleClustering; final boolean syncHive; final boolean syncMetadata; final boolean isDeltaTimeCompaction; @@ -616,7 +623,8 @@ private TableState(Configuration conf) { this.commitAction = CommitUtils.getCommitActionType(this.operationType, HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE).toUpperCase(Locale.ROOT))); this.isOverwrite = WriteOperationType.isOverwrite(this.operationType); - this.scheduleCompaction = StreamerUtil.needsScheduleCompaction(conf); + this.scheduleCompaction = OptionsResolver.needsScheduleCompaction(conf); + this.scheduleClustering = OptionsResolver.needsScheduleClustering(conf); this.syncHive = conf.getBoolean(FlinkOptions.HIVE_SYNC_ENABLED); this.syncMetadata = conf.getBoolean(FlinkOptions.METADATA_ENABLED); this.isDeltaTimeCompaction = OptionsResolver.isDeltaTimeCompaction(conf); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitEvent.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitEvent.java index 30a8fbed3fafd..46a15a62648bf 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitEvent.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitEvent.java @@ -24,7 +24,7 @@ import java.util.List; /** - * Represents a commit event from the clustering task {@link ClusteringFunction}. + * Represents a commit event from the clustering task {@link ClusteringOperator}. */ public class ClusteringCommitEvent implements Serializable { private static final long serialVersionUID = 1L; @@ -51,6 +51,10 @@ public ClusteringCommitEvent(String instant, List writeStatuses, in this.taskID = taskID; } + public ClusteringCommitEvent(String instant, int taskID) { + this(instant, null, taskID); + } + public void setInstant(String instant) { this.instant = instant; } @@ -74,4 +78,8 @@ public List getWriteStatuses() { public int getTaskID() { return taskID; } + + public boolean isFailed() { + return this.writeStatuses == null; + } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java index bc87270a49f1b..85e09cbc198a5 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java @@ -35,6 +35,7 @@ import org.apache.hudi.sink.CleanFunction; import org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.table.action.HoodieWriteMetadata; +import org.apache.hudi.util.CompactionUtil; import org.apache.hudi.util.StreamerUtil; import org.apache.flink.configuration.Configuration; @@ -115,6 +116,30 @@ private void commitIfNecessary(String instant, List event if (!isReady) { return; } + + if (events.stream().anyMatch(ClusteringCommitEvent::isFailed)) { + try { + // handle failure case + CompactionUtil.rollbackCompaction(table, instant); + } finally { + // remove commitBuffer to avoid obsolete metadata commit + reset(instant); + } + return; + } + + try { + doCommit(instant, clusteringPlan, events); + } catch (Throwable throwable) { + // make 
it fail-safe + LOG.error("Error while committing clustering instant: " + instant, throwable); + } finally { + // reset the status + reset(instant); + } + } + + private void doCommit(String instant, HoodieClusteringPlan clusteringPlan, List events) { List statuses = events.stream() .map(ClusteringCommitEvent::getWriteStatuses) .flatMap(Collection::stream) @@ -139,9 +164,6 @@ private void commitIfNecessary(String instant, List event this.table.getMetaClient().reloadActiveTimeline(); this.writeClient.completeTableService( TableServiceType.CLUSTER, writeMetadata.getCommitMetadata().get(), table, instant); - - // reset the status - reset(instant); } private void reset(String instant) { diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java index a415ac9d46165..8abd870a35faa 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java @@ -33,6 +33,7 @@ import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.configuration.OptionsResolver; import org.apache.hudi.exception.HoodieClusteringException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.io.IOUtils; @@ -40,7 +41,9 @@ import org.apache.hudi.io.storage.HoodieFileReaderFactory; import org.apache.hudi.sink.bulk.BulkInsertWriterHelper; import org.apache.hudi.sink.bulk.sort.SortOperatorGen; +import org.apache.hudi.sink.utils.NonThrownExecutor; import org.apache.hudi.table.HoodieFlinkTable; +import org.apache.hudi.util.AvroSchemaConverter; import org.apache.hudi.util.AvroToRowDataConverters; import org.apache.hudi.util.StreamerUtil; @@ -48,11 +51,13 @@ import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecordBuilder; import org.apache.avro.generic.IndexedRecord; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.Gauge; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.streaming.api.operators.BoundedOneInput; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.binary.BinaryRowData; @@ -100,15 +105,25 @@ public class ClusteringOperator extends TableStreamOperator collector; private transient BinaryRowDataSerializer binarySerializer; + /** + * Whether to execute clustering asynchronously. + */ + private final boolean asyncClustering; + + /** + * Executor service to execute the clustering task. 
+ */ + private transient NonThrownExecutor executor; + public ClusteringOperator(Configuration conf, RowType rowType) { this.conf = conf; this.rowType = rowType; + this.asyncClustering = OptionsResolver.needsAsyncClustering(conf); } @Override @@ -120,46 +135,61 @@ public void open() throws Exception { this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext()); this.table = writeClient.getHoodieTable(); - this.schema = StreamerUtil.getTableAvroSchema(table.getMetaClient(), false); + this.schema = AvroSchemaConverter.convertToSchema(rowType); this.readerSchema = HoodieAvroUtils.addMetadataFields(this.schema); this.requiredPos = getRequiredPositions(); this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(rowType); + this.binarySerializer = new BinaryRowDataSerializer(rowType.getFieldCount()); - ClassLoader cl = getContainingTask().getUserCodeClassLoader(); - - AbstractRowDataSerializer inputSerializer = new BinaryRowDataSerializer(rowType.getFieldCount()); - this.binarySerializer = new BinaryRowDataSerializer(inputSerializer.getArity()); - - NormalizedKeyComputer computer = createSortCodeGenerator().generateNormalizedKeyComputer("SortComputer").newInstance(cl); - RecordComparator comparator = createSortCodeGenerator().generateRecordComparator("SortComparator").newInstance(cl); + if (OptionsResolver.sortClusteringEnabled(conf)) { + initSorter(); + } - MemoryManager memManager = getContainingTask().getEnvironment().getMemoryManager(); - this.sorter = - new BinaryExternalSorter( - this.getContainingTask(), - memManager, - computeMemorySize(), - this.getContainingTask().getEnvironment().getIOManager(), - inputSerializer, - binarySerializer, - computer, - comparator, - getContainingTask().getJobConfiguration()); - this.sorter.startThreads(); + if (this.asyncClustering) { + this.executor = NonThrownExecutor.builder(LOG).build(); + } collector = new StreamRecordCollector<>(output); - - // register the metrics. - getMetricGroup().gauge("memoryUsedSizeInBytes", (Gauge) sorter::getUsedMemoryInBytes); - getMetricGroup().gauge("numSpillFiles", (Gauge) sorter::getNumSpillFiles); - getMetricGroup().gauge("spillInBytes", (Gauge) sorter::getSpillInBytes); } @Override public void processElement(StreamRecord element) throws Exception { ClusteringPlanEvent event = element.getValue(); final String instantTime = event.getClusteringInstantTime(); + if (this.asyncClustering) { + // executes the compaction task asynchronously to not block the checkpoint barrier propagate. + executor.execute( + () -> doClustering(instantTime, event), + (errMsg, t) -> collector.collect(new ClusteringCommitEvent(instantTime, taskID)), + "Execute clustering for instant %s from task %d", instantTime, taskID); + } else { + // executes the clustering task synchronously for batch mode. + LOG.info("Execute clustering for instant {} from task {}", instantTime, taskID); + doClustering(instantTime, event); + } + } + + @Override + public void close() { + if (this.writeClient != null) { + this.writeClient.cleanHandlesGracefully(); + this.writeClient.close(); + } + } + + /** + * End input action for batch source. 
+ */ + public void endInput() { + // no operation + } + + // ------------------------------------------------------------------------- + // Utilities + // ------------------------------------------------------------------------- + + private void doClustering(String instantTime, ClusteringPlanEvent event) throws Exception { final ClusteringGroupInfo clusteringGroupInfo = event.getClusteringGroupInfo(); initWriterHelper(instantTime); @@ -177,44 +207,33 @@ public void processElement(StreamRecord element) throws Exc } RowDataSerializer rowDataSerializer = new RowDataSerializer(rowType); - while (iterator.hasNext()) { - RowData rowData = iterator.next(); - BinaryRowData binaryRowData = rowDataSerializer.toBinaryRow(rowData).copy(); - this.sorter.write(binaryRowData); - } - BinaryRowData row = binarySerializer.createInstance(); - while ((row = sorter.getIterator().next(row)) != null) { - this.writerHelper.write(row); - } - } + if (!StringUtils.isNullOrEmpty(conf.getString(FlinkOptions.CLUSTERING_SORT_COLUMNS))) { + while (iterator.hasNext()) { + RowData rowData = iterator.next(); + BinaryRowData binaryRowData = rowDataSerializer.toBinaryRow(rowData).copy(); + this.sorter.write(binaryRowData); + } - @Override - public void close() { - if (this.writeClient != null) { - this.writeClient.cleanHandlesGracefully(); - this.writeClient.close(); + BinaryRowData row = binarySerializer.createInstance(); + while ((row = sorter.getIterator().next(row)) != null) { + this.writerHelper.write(row); + } + } else { + while (iterator.hasNext()) { + this.writerHelper.write(iterator.next()); + } } - } - /** - * End input action for batch source. - */ - public void endInput() { List writeStatuses = this.writerHelper.getWriteStatuses(this.taskID); collector.collect(new ClusteringCommitEvent(instantTime, writeStatuses, this.taskID)); } - // ------------------------------------------------------------------------- - // Utilities - // ------------------------------------------------------------------------- - private void initWriterHelper(String clusteringInstantTime) { if (this.writerHelper == null) { this.writerHelper = new BulkInsertWriterHelper(this.conf, this.table, this.writeConfig, clusteringInstantTime, this.taskID, getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getAttemptNumber(), this.rowType); - this.instantTime = clusteringInstantTime; } } @@ -305,14 +324,44 @@ private int[] getRequiredPositions() { .toArray(); } + private void initSorter() { + ClassLoader cl = getContainingTask().getUserCodeClassLoader(); + NormalizedKeyComputer computer = createSortCodeGenerator().generateNormalizedKeyComputer("SortComputer").newInstance(cl); + RecordComparator comparator = createSortCodeGenerator().generateRecordComparator("SortComparator").newInstance(cl); + + MemoryManager memManager = getContainingTask().getEnvironment().getMemoryManager(); + this.sorter = + new BinaryExternalSorter( + this.getContainingTask(), + memManager, + computeMemorySize(), + this.getContainingTask().getEnvironment().getIOManager(), + (AbstractRowDataSerializer) binarySerializer, + binarySerializer, + computer, + comparator, + getContainingTask().getJobConfiguration()); + this.sorter.startThreads(); + + // register the metrics. 
+ getMetricGroup().gauge("memoryUsedSizeInBytes", (Gauge) sorter::getUsedMemoryInBytes); + getMetricGroup().gauge("numSpillFiles", (Gauge) sorter::getNumSpillFiles); + getMetricGroup().gauge("spillInBytes", (Gauge) sorter::getSpillInBytes); + } + private SortCodeGenerator createSortCodeGenerator() { SortOperatorGen sortOperatorGen = new SortOperatorGen(rowType, conf.getString(FlinkOptions.CLUSTERING_SORT_COLUMNS).split(",")); return sortOperatorGen.createSortCodeGenerator(); } - @Override - public void setKeyContextElement(StreamRecord record) throws Exception { - OneInputStreamOperator.super.setKeyContextElement(record); + @VisibleForTesting + public void setExecutor(NonThrownExecutor executor) { + this.executor = executor; + } + + @VisibleForTesting + public void setOutput(Output> output) { + this.output = output; } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanOperator.java new file mode 100644 index 0000000000000..d5f9b612e1ae7 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanOperator.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink.clustering; + +import org.apache.hudi.avro.model.HoodieClusteringGroup; +import org.apache.hudi.avro.model.HoodieClusteringPlan; +import org.apache.hudi.common.model.ClusteringGroupInfo; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.ClusteringUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.table.HoodieFlinkTable; +import org.apache.hudi.util.ClusteringUtil; +import org.apache.hudi.util.FlinkTables; +import org.apache.hudi.util.StreamerUtil; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +/** + * Operator that generates the clustering plan with pluggable strategies on finished checkpoints. + * + *

It should be singleton to avoid conflicts. + */ +public class ClusteringPlanOperator extends AbstractStreamOperator + implements OneInputStreamOperator { + + /** + * Config options. + */ + private final Configuration conf; + + /** + * Meta Client. + */ + @SuppressWarnings("rawtypes") + private transient HoodieFlinkTable table; + + public ClusteringPlanOperator(Configuration conf) { + this.conf = conf; + } + + @Override + public void open() throws Exception { + super.open(); + this.table = FlinkTables.createTable(conf, getRuntimeContext()); + // when starting up, rolls back all the inflight clustering instants if there exists, + // these instants are in priority for scheduling task because the clustering instants are + // scheduled from earliest(FIFO sequence). + ClusteringUtil.rollbackClustering(table, StreamerUtil.createWriteClient(conf, getRuntimeContext())); + } + + @Override + public void processElement(StreamRecord streamRecord) { + // no operation + } + + @Override + public void notifyCheckpointComplete(long checkpointId) { + try { + table.getMetaClient().reloadActiveTimeline(); + scheduleClustering(table, checkpointId); + } catch (Throwable throwable) { + // make it fail-safe + LOG.error("Error while scheduling clustering plan for checkpoint: " + checkpointId, throwable); + } + } + + private void scheduleClustering(HoodieFlinkTable table, long checkpointId) { + // the first instant takes the highest priority. + Option firstRequested = Option.fromJavaOptional( + ClusteringUtils.getPendingClusteringInstantTimes(table.getMetaClient()).stream() + .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED).findFirst()); + if (!firstRequested.isPresent()) { + // do nothing. + LOG.info("No clustering plan for checkpoint " + checkpointId); + return; + } + + String clusteringInstantTime = firstRequested.get().getTimestamp(); + + // generate clustering plan + // should support configurable commit metadata + Option> clusteringPlanOption = ClusteringUtils.getClusteringPlan( + table.getMetaClient(), HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstantTime)); + + if (!clusteringPlanOption.isPresent()) { + // do nothing. + LOG.info("No clustering plan scheduled"); + return; + } + + HoodieClusteringPlan clusteringPlan = clusteringPlanOption.get().getRight(); + + if (clusteringPlan == null || (clusteringPlan.getInputGroups() == null) + || (clusteringPlan.getInputGroups().isEmpty())) { + // do nothing. 
+ LOG.info("Empty clustering plan for instant " + clusteringInstantTime); + } else { + HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstantTime); + // Mark instant as clustering inflight + table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty()); + table.getMetaClient().reloadActiveTimeline(); + + for (HoodieClusteringGroup clusteringGroup : clusteringPlan.getInputGroups()) { + LOG.info("ClusteringPlanSourceFunction cluster " + clusteringGroup + " files"); + output.collect(new StreamRecord<>( + new ClusteringPlanEvent(clusteringInstantTime, ClusteringGroupInfo.create(clusteringGroup), clusteringPlan.getStrategy().getStrategyParams()) + )); + } + } + } + + @VisibleForTesting + public void setOutput(Output> output) { + this.output = output; + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanSourceFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanSourceFunction.java index a3db2d41c8371..9b05165ae5acb 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanSourceFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanSourceFunction.java @@ -22,7 +22,6 @@ import org.apache.hudi.avro.model.HoodieClusteringPlan; import org.apache.hudi.common.model.ClusteringGroupInfo; import org.apache.hudi.common.model.ClusteringOperation; -import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.configuration.Configuration; @@ -57,12 +56,12 @@ public class ClusteringPlanSourceFunction extends AbstractRichFunction implement private final HoodieClusteringPlan clusteringPlan; /** - * Hoodie instant. + * Clustering instant time. 
*/ - private final HoodieInstant instant; + private final String clusteringInstantTime; - public ClusteringPlanSourceFunction(HoodieInstant instant, HoodieClusteringPlan clusteringPlan) { - this.instant = instant; + public ClusteringPlanSourceFunction(String clusteringInstantTime, HoodieClusteringPlan clusteringPlan) { + this.clusteringInstantTime = clusteringInstantTime; this.clusteringPlan = clusteringPlan; } @@ -75,7 +74,7 @@ public void open(Configuration parameters) throws Exception { public void run(SourceContext sourceContext) throws Exception { for (HoodieClusteringGroup clusteringGroup : clusteringPlan.getInputGroups()) { LOG.info("ClusteringPlanSourceFunction cluster " + clusteringGroup + " files"); - sourceContext.collect(new ClusteringPlanEvent(this.instant.getTimestamp(), ClusteringGroupInfo.create(clusteringGroup), clusteringPlan.getStrategy().getStrategyParams())); + sourceContext.collect(new ClusteringPlanEvent(this.clusteringInstantTime, ClusteringGroupInfo.create(clusteringGroup), clusteringPlan.getStrategy().getStrategyParams())); } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/FlinkClusteringConfig.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/FlinkClusteringConfig.java index e87a7d6752b6e..655337a766908 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/FlinkClusteringConfig.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/FlinkClusteringConfig.java @@ -18,7 +18,9 @@ package org.apache.hudi.sink.clustering; +import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.util.StreamerUtil; import com.beust.jcommander.Parameter; import org.apache.flink.configuration.Configuration; @@ -100,16 +102,15 @@ public class FlinkClusteringConfig extends Configuration { public static final String SEQ_LIFO = "LIFO"; @Parameter(names = {"--seq"}, description = "Clustering plan execution sequence, two options are supported:\n" + "1). FIFO: execute the oldest plan first;\n" - + "2). LIFO: execute the latest plan first, by default LIFO", required = false) - public String clusteringSeq = SEQ_LIFO; + + "2). LIFO: execute the latest plan first, by default FIFO", required = false) + public String clusteringSeq = SEQ_FIFO; - @Parameter(names = {"--write-partition-url-encode"}, description = "Whether to encode the partition path url, default false") - public Boolean writePartitionUrlEncode = false; + @Parameter(names = {"--service"}, description = "Flink Clustering runs in service mode, disable by default") + public Boolean serviceMode = false; - @Parameter(names = {"--hive-style-partitioning"}, description = "Whether to use Hive style partitioning.\n" - + "If set true, the names of partition folders follow = format.\n" - + "By default false (the names of partition folders are only partition values)") - public Boolean hiveStylePartitioning = false; + @Parameter(names = {"--min-clustering-interval-seconds"}, + description = "Min clustering interval of async clustering service, default 10 minutes") + public Integer minClusteringIntervalSeconds = 600; /** * Transforms a {@code FlinkClusteringConfig.config} into {@code Configuration}. 
@@ -137,11 +138,13 @@ public static Configuration toFlinkConfig(FlinkClusteringConfig config) { conf.setBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED, config.cleanAsyncEnable); // use synchronous clustering always + conf.setBoolean(FlinkOptions.CLUSTERING_ASYNC_ENABLED, false); conf.setBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, config.schedule); // bulk insert conf - conf.setBoolean(FlinkOptions.URL_ENCODE_PARTITIONING, config.writePartitionUrlEncode); - conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, config.hiveStylePartitioning); + HoodieTableConfig tableConfig = StreamerUtil.createMetaClient(conf).getTableConfig(); + conf.setBoolean(FlinkOptions.URL_ENCODE_PARTITIONING, Boolean.parseBoolean(tableConfig.getUrlEncodePartitioning())); + conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, Boolean.parseBoolean(tableConfig.getHiveStylePartitioningEnable())); return conf; } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java index b8ba8e43891d5..c88aa4b989b01 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java @@ -18,6 +18,7 @@ package org.apache.hudi.sink.clustering; +import org.apache.hudi.async.HoodieAsyncTableService; import org.apache.hudi.avro.model.HoodieClusteringPlan; import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.common.table.HoodieTableMetaClient; @@ -28,13 +29,16 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.util.AvroSchemaConverter; +import org.apache.hudi.util.ClusteringUtil; import org.apache.hudi.util.CompactionUtil; import org.apache.hudi.util.StreamerUtil; import com.beust.jcommander.JCommander; import org.apache.avro.Schema; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; @@ -45,6 +49,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; + /** * Flink hudi clustering program that can be executed manually. */ @@ -52,141 +62,278 @@ public class HoodieFlinkClusteringJob { protected static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkClusteringJob.class); + /** + * Flink Execution Environment. 
+ */ + private final AsyncClusteringService clusteringScheduleService; + + public HoodieFlinkClusteringJob(AsyncClusteringService service) { + this.clusteringScheduleService = service; + } + public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + FlinkClusteringConfig cfg = getFlinkClusteringConfig(args); + Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg); + + AsyncClusteringService service = new AsyncClusteringService(cfg, conf, env); + + new HoodieFlinkClusteringJob(service).start(cfg.serviceMode); + } + + /** + * Main method to start clustering service. + */ + public void start(boolean serviceMode) throws Exception { + if (serviceMode) { + clusteringScheduleService.start(null); + try { + clusteringScheduleService.waitForShutdown(); + } catch (Exception e) { + throw new HoodieException(e.getMessage(), e); + } finally { + LOG.info("Shut down hoodie flink clustering"); + } + } else { + LOG.info("Hoodie Flink Clustering running only single round"); + try { + clusteringScheduleService.cluster(); + } catch (Exception e) { + LOG.error("Got error running delta sync once. Shutting down", e); + throw e; + } finally { + LOG.info("Shut down hoodie flink clustering"); + } + } + } + + public static FlinkClusteringConfig getFlinkClusteringConfig(String[] args) { FlinkClusteringConfig cfg = new FlinkClusteringConfig(); JCommander cmd = new JCommander(cfg, null, args); if (cfg.help || args.length == 0) { cmd.usage(); System.exit(1); } + return cfg; + } - Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg); - - // create metaClient - HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf); - - // set table name - conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName()); - - // set table type - conf.setString(FlinkOptions.TABLE_TYPE, metaClient.getTableConfig().getTableType().name()); - - // set record key field - conf.setString(FlinkOptions.RECORD_KEY_FIELD, metaClient.getTableConfig().getRecordKeyFieldProp()); + // ------------------------------------------------------------------------- + // Inner Class + // ------------------------------------------------------------------------- + + /** + * Schedules clustering in service. + */ + public static class AsyncClusteringService extends HoodieAsyncTableService { + private static final long serialVersionUID = 1L; + + /** + * Flink Clustering Config. + */ + private final FlinkClusteringConfig cfg; + + /** + * Flink Config. + */ + private final Configuration conf; + + /** + * Meta Client. + */ + private final HoodieTableMetaClient metaClient; + + /** + * Write Client. + */ + private final HoodieFlinkWriteClient writeClient; + + /** + * The hoodie table. + */ + private final HoodieFlinkTable table; + + /** + * Flink Execution Environment. + */ + private final StreamExecutionEnvironment env; + + /** + * Executor Service. 
+ */ + private final ExecutorService executor; + + public AsyncClusteringService(FlinkClusteringConfig cfg, Configuration conf, StreamExecutionEnvironment env) throws Exception { + this.cfg = cfg; + this.conf = conf; + this.env = env; + this.executor = Executors.newFixedThreadPool(1); + + // create metaClient + this.metaClient = StreamerUtil.createMetaClient(conf); + + // set table name + conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName()); + + // set table type + conf.setString(FlinkOptions.TABLE_TYPE, metaClient.getTableConfig().getTableType().name()); + + // set record key field + conf.setString(FlinkOptions.RECORD_KEY_FIELD, metaClient.getTableConfig().getRecordKeyFieldProp()); + + // set partition field + conf.setString(FlinkOptions.PARTITION_PATH_FIELD, metaClient.getTableConfig().getPartitionFieldProp()); + + // set table schema + CompactionUtil.setAvroSchema(conf, metaClient); + + this.writeClient = StreamerUtil.createWriteClient(conf); + this.writeConfig = writeClient.getConfig(); + this.table = writeClient.getHoodieTable(); + } - // set partition field - conf.setString(FlinkOptions.PARTITION_PATH_FIELD, metaClient.getTableConfig().getPartitionFieldProp()); + @Override + protected Pair startService() { + return Pair.of(CompletableFuture.supplyAsync(() -> { + boolean error = false; + + try { + while (!isShutdownRequested()) { + try { + cluster(); + Thread.sleep(cfg.minClusteringIntervalSeconds * 1000); + } catch (Exception e) { + LOG.error("Shutting down clustering service due to exception", e); + error = true; + throw new HoodieException(e.getMessage(), e); + } + } + } finally { + shutdownAsyncService(error); + } + return true; + }, executor), executor); + } - // set table schema - CompactionUtil.setAvroSchema(conf, metaClient); + private void cluster() throws Exception { + table.getMetaClient().reloadActiveTimeline(); - HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf); - HoodieFlinkTable table = writeClient.getHoodieTable(); + // judges whether there are operations + // to compute the clustering instant time and exec clustering. + if (cfg.schedule) { + ClusteringUtil.validateClusteringScheduling(conf); + String clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime(); + boolean scheduled = writeClient.scheduleClusteringAtInstant(clusteringInstantTime, Option.empty()); + if (!scheduled) { + // do nothing. + LOG.info("No clustering plan for this job "); + return; + } + table.getMetaClient().reloadActiveTimeline(); + } - // judge whether have operation - // to compute the clustering instant time and do cluster. - if (cfg.schedule) { - String clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime(); - boolean scheduled = writeClient.scheduleClusteringAtInstant(clusteringInstantTime, Option.empty()); - if (!scheduled) { + // fetch the instant based on the configured execution sequence + List instants = ClusteringUtils.getPendingClusteringInstantTimes(table.getMetaClient()).stream() + .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED).collect(Collectors.toList()); + if (instants.isEmpty()) { // do nothing. - LOG.info("No clustering plan for this job "); + LOG.info("No clustering plan scheduled, turns on the clustering plan schedule with --schedule option"); return; } - } - table.getMetaClient().reloadActiveTimeline(); + HoodieInstant clusteringInstant = CompactionUtil.isLIFO(cfg.clusteringSeq) ? 
instants.get(instants.size() - 1) : instants.get(0); - // fetch the instant based on the configured execution sequence - HoodieTimeline timeline = table.getActiveTimeline().filterPendingReplaceTimeline() - .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED); - Option requested = CompactionUtil.isLIFO(cfg.clusteringSeq) ? timeline.lastInstant() : timeline.firstInstant(); - if (!requested.isPresent()) { - // do nothing. - LOG.info("No clustering plan scheduled, turns on the clustering plan schedule with --schedule option"); - return; - } + HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant.getTimestamp()); + if (table.getMetaClient().getActiveTimeline().containsInstant(inflightInstant)) { + LOG.info("Rollback inflight clustering instant: [" + clusteringInstant + "]"); + table.rollbackInflightClustering(inflightInstant, + commitToRollback -> writeClient.getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false)); + table.getMetaClient().reloadActiveTimeline(); + } - HoodieInstant clusteringInstant = requested.get(); + // generate clustering plan + // should support configurable commit metadata + Option> clusteringPlanOption = ClusteringUtils.getClusteringPlan( + table.getMetaClient(), clusteringInstant); - HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant.getTimestamp()); - if (timeline.containsInstant(inflightInstant)) { - LOG.info("Rollback inflight clustering instant: [" + clusteringInstant + "]"); - table.rollbackInflightClustering(inflightInstant, - commitToRollback -> writeClient.getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false)); - table.getMetaClient().reloadActiveTimeline(); - } + if (!clusteringPlanOption.isPresent()) { + // do nothing. + LOG.info("No clustering plan scheduled, turns on the clustering plan schedule with --schedule option"); + return; + } - // generate clustering plan - // should support configurable commit metadata - Option> clusteringPlanOption = ClusteringUtils.getClusteringPlan( - table.getMetaClient(), clusteringInstant); + HoodieClusteringPlan clusteringPlan = clusteringPlanOption.get().getRight(); - if (!clusteringPlanOption.isPresent()) { - // do nothing. - LOG.info("No clustering plan scheduled, turns on the clustering plan schedule with --schedule option"); - return; - } + if (clusteringPlan == null || (clusteringPlan.getInputGroups() == null) + || (clusteringPlan.getInputGroups().isEmpty())) { + // No clustering plan, do nothing and return. + LOG.info("No clustering plan for instant " + clusteringInstant.getTimestamp()); + return; + } - HoodieClusteringPlan clusteringPlan = clusteringPlanOption.get().getRight(); + HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstant.getTimestamp()); + HoodieTimeline pendingClusteringTimeline = table.getActiveTimeline().filterPendingReplaceTimeline(); + if (!pendingClusteringTimeline.containsInstant(instant)) { + // this means that the clustering plan was written to auxiliary path(.tmp) + // but not the meta path(.hoodie), this usually happens when the job crush + // exceptionally. - if (clusteringPlan == null || (clusteringPlan.getInputGroups() == null) - || (clusteringPlan.getInputGroups().isEmpty())) { - // No clustering plan, do nothing and return. - LOG.info("No clustering plan for instant " + clusteringInstant.getTimestamp()); - return; - } + // clean the clustering plan in auxiliary path and cancels the clustering. 
- HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstant.getTimestamp()); - HoodieTimeline pendingClusteringTimeline = table.getActiveTimeline().filterPendingReplaceTimeline(); - if (!pendingClusteringTimeline.containsInstant(instant)) { - // this means that the clustering plan was written to auxiliary path(.tmp) - // but not the meta path(.hoodie), this usually happens when the job crush - // exceptionally. + LOG.warn("The clustering plan was fetched through the auxiliary path(.tmp) but not the meta path(.hoodie).\n" + + "Clean the clustering plan in auxiliary path and cancels the clustering"); + CompactionUtil.cleanInstant(table.getMetaClient(), instant); + return; + } - // clean the clustering plan in auxiliary path and cancels the clustering. + // get clusteringParallelism. + int clusteringParallelism = conf.getInteger(FlinkOptions.CLUSTERING_TASKS) == -1 + ? clusteringPlan.getInputGroups().size() : conf.getInteger(FlinkOptions.CLUSTERING_TASKS); - LOG.warn("The clustering plan was fetched through the auxiliary path(.tmp) but not the meta path(.hoodie).\n" - + "Clean the clustering plan in auxiliary path and cancels the clustering"); - CompactionUtil.cleanInstant(table.getMetaClient(), instant); - return; - } + // Mark instant as clustering inflight + table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty()); - // get clusteringParallelism. - int clusteringParallelism = conf.getInteger(FlinkOptions.CLUSTERING_TASKS) == -1 - ? clusteringPlan.getInputGroups().size() : conf.getInteger(FlinkOptions.CLUSTERING_TASKS); + final Schema tableAvroSchema = StreamerUtil.getTableAvroSchema(table.getMetaClient(), false); + final DataType rowDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema); + final RowType rowType = (RowType) rowDataType.getLogicalType(); - // Mark instant as clustering inflight - table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty()); + // setup configuration + long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout(); + conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout); - final Schema tableAvroSchema = StreamerUtil.getTableAvroSchema(table.getMetaClient(), false); - final DataType rowDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema); - final RowType rowType = (RowType) rowDataType.getLogicalType(); + DataStream dataStream = env.addSource(new ClusteringPlanSourceFunction(clusteringInstant.getTimestamp(), clusteringPlan)) + .name("clustering_source") + .uid("uid_clustering_source") + .rebalance() + .transform("clustering_task", + TypeInformation.of(ClusteringCommitEvent.class), + new ClusteringOperator(conf, rowType)) + .setParallelism(clusteringParallelism); - // setup configuration - long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout(); - conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout); + ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(), + conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L); - DataStream dataStream = env.addSource(new ClusteringPlanSourceFunction(timeline.lastInstant().get(), clusteringPlan)) - .name("clustering_source") - .uid("uid_clustering_source") - .rebalance() - .transform("clustering_task", - TypeInformation.of(ClusteringCommitEvent.class), - new ClusteringOperator(conf, rowType)) - .setParallelism(clusteringPlan.getInputGroups().size()); + dataStream + .addSink(new ClusteringCommitSink(conf)) + .name("clustering_commit") + 
.uid("uid_clustering_commit") + .setParallelism(1); - ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(), - conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L); + env.execute("flink_hudi_clustering_" + clusteringInstant.getTimestamp()); + } - dataStream - .addSink(new ClusteringCommitSink(conf)) - .name("clustering_commit") - .uid("uid_clustering_commit") - .setParallelism(1); + /** + * Shutdown async services like compaction/clustering as DeltaSync is shutdown. + */ + public void shutdownAsyncService(boolean error) { + LOG.info("Gracefully shutting down clustering job. Error ?" + error); + executor.shutdown(); + writeClient.close(); + } - env.execute("flink_hudi_clustering"); + @VisibleForTesting + public void shutDown() { + shutdownAsyncService(false); + } } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java index a43fcd5ad4bf9..3622ed491b9dd 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java @@ -22,6 +22,7 @@ import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.model.CompactionOperation; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.configuration.OptionsResolver; import org.apache.hudi.sink.utils.NonThrownExecutor; import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable; import org.apache.hudi.table.action.compact.HoodieFlinkMergeOnReadTableCompactor; @@ -72,7 +73,7 @@ public class CompactFunction extends ProcessFunction } finally { // remove commitBuffer to avoid obsolete metadata commit reset(instant); - return; } + return; } try { diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java index c969c10ed1913..43d476cf2aa83 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java @@ -32,6 +32,11 @@ import org.apache.hudi.sink.bulk.BulkInsertWriteOperator; import org.apache.hudi.sink.bulk.RowDataKeyGen; import org.apache.hudi.sink.bulk.sort.SortOperatorGen; +import org.apache.hudi.sink.clustering.ClusteringCommitEvent; +import org.apache.hudi.sink.clustering.ClusteringCommitSink; +import org.apache.hudi.sink.clustering.ClusteringOperator; +import org.apache.hudi.sink.clustering.ClusteringPlanEvent; +import org.apache.hudi.sink.clustering.ClusteringPlanOperator; import org.apache.hudi.sink.common.WriteOperatorFactory; import org.apache.hudi.sink.compact.CompactFunction; import org.apache.hudi.sink.compact.CompactionCommitEvent; @@ -114,7 +119,7 @@ public static DataStreamSink bulkInsert(Configuration conf, RowType rowT conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L); } return dataStream - .transform(writeOpIdentifier("bucket_bulk_insert", conf), TypeInformation.of(Object.class), operatorFactory) + .transform(opIdentifier("bucket_bulk_insert", conf), TypeInformation.of(Object.class), operatorFactory) .uid("uid_bucket_bulk_insert" + conf.getString(FlinkOptions.TABLE_NAME)) .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)) .addSink(DummySink.INSTANCE) @@ -146,7 +151,7 @@ public static DataStreamSink 
bulkInsert(Configuration conf, RowType rowT } } return dataStream - .transform(writeOpIdentifier("hoodie_bulk_insert_write", conf), + .transform(opIdentifier("hoodie_bulk_insert_write", conf), TypeInformation.of(Object.class), operatorFactory) // follow the parallelism of upstream operators to avoid shuffle @@ -177,7 +182,7 @@ public static DataStreamSink bulkInsert(Configuration conf, RowType rowT * @param bounded Whether the input stream is bounded * @return the appending data stream sink */ - public static DataStreamSink append( + public static DataStream append( Configuration conf, RowType rowType, DataStream dataStream, @@ -190,11 +195,9 @@ public static DataStreamSink append( WriteOperatorFactory operatorFactory = AppendWriteOperator.getFactory(conf, rowType); return dataStream - .transform(writeOpIdentifier("hoodie_append_write", conf), TypeInformation.of(Object.class), operatorFactory) + .transform(opIdentifier("hoodie_append_write", conf), TypeInformation.of(Object.class), operatorFactory) .uid("uid_hoodie_stream_write" + conf.getString(FlinkOptions.TABLE_NAME)) - .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)) - .addSink(DummySink.INSTANCE) - .name("dummy"); + .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); } /** @@ -322,7 +325,7 @@ public static DataStream hoodieStreamWrite(Configuration conf, int defau String indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD); BucketIndexPartitioner partitioner = new BucketIndexPartitioner<>(bucketNum, indexKeyFields); return dataStream.partitionCustom(partitioner, HoodieRecord::getKey) - .transform(writeOpIdentifier("bucket_write", conf), TypeInformation.of(Object.class), operatorFactory) + .transform(opIdentifier("bucket_write", conf), TypeInformation.of(Object.class), operatorFactory) .uid("uid_bucket_write" + conf.getString(FlinkOptions.TABLE_NAME)) .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); } else { @@ -338,7 +341,7 @@ public static DataStream hoodieStreamWrite(Configuration conf, int defau .setParallelism(conf.getOptional(FlinkOptions.BUCKET_ASSIGN_TASKS).orElse(defaultParallelism)) // shuffle by fileId(bucket id) .keyBy(record -> record.getCurrentLocation().getFileId()) - .transform(writeOpIdentifier("stream_write", conf), TypeInformation.of(Object.class), operatorFactory) + .transform(opIdentifier("stream_write", conf), TypeInformation.of(Object.class), operatorFactory) .uid("uid_stream_write" + conf.getString(FlinkOptions.TABLE_NAME)) .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); } @@ -379,13 +382,53 @@ public static DataStreamSink compact(Configuration conf, .setParallelism(1); // compaction commit should be singleton } + /** + * The clustering tasks pipeline. + * + *

+   * <p>The clustering plan operator monitors the new clustering plan on the timeline,
+   * then distributes the sub-plans to the clustering tasks. Each clustering task then
+   * hands over the metadata to the commit task for the clustering transaction commit.
+   * The whole pipeline looks like the following:
+   *
+   * <pre>
+   *                                           /=== | task1 | ===\
+   *      | plan generation | ===> re-balance                      | commit |
+   *                                           \=== | task2 | ===/
+   *
+   *      Note: both the clustering plan generation task and the commit task are singletons.
+   * </pre>
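+   *
+   * <p>For reference, a simplified sketch of how the append-mode sink is expected to wire
+   * this pipeline (mirroring the {@code HoodieTableSink} change later in this patch):
+   * <pre>
+   *   DataStream pipeline = Pipelines.append(conf, rowType, dataStream, context.isBounded());
+   *   if (OptionsResolver.needsAsyncClustering(conf)) {
+   *     Pipelines.cluster(conf, rowType, pipeline);
+   *   } else {
+   *     Pipelines.dummySink(pipeline);
+   *   }
+   * </pre>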
+ * + * @param conf The configuration + * @param rowType The input row type + * @param dataStream The input data stream + * @return the clustering pipeline + */ + public static DataStreamSink cluster(Configuration conf, RowType rowType, DataStream dataStream) { + return dataStream.transform("cluster_plan_generate", + TypeInformation.of(ClusteringPlanEvent.class), + new ClusteringPlanOperator(conf)) + .setParallelism(1) // plan generate must be singleton + .rebalance() + .transform("clustering_task", + TypeInformation.of(ClusteringCommitEvent.class), + new ClusteringOperator(conf, rowType)) + .setParallelism(conf.getInteger(FlinkOptions.CLUSTERING_TASKS)) + .addSink(new ClusteringCommitSink(conf)) + .name("clustering_commit") + .setParallelism(1); // compaction commit should be singleton + } + public static DataStreamSink clean(Configuration conf, DataStream dataStream) { return dataStream.addSink(new CleanFunction<>(conf)) .setParallelism(1) .name("clean_commits"); } - public static String writeOpIdentifier(String operatorN, Configuration conf) { + public static DataStreamSink dummySink(DataStream dataStream) { + return dataStream.addSink(Pipelines.DummySink.INSTANCE).name("dummy"); + } + + public static String opIdentifier(String operatorN, Configuration conf) { return operatorN + ": " + conf.getString(FlinkOptions.TABLE_NAME); } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java index a12ec23dcb036..013753b6d9276 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java @@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.Option; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.configuration.OptionsResolver; import org.apache.hudi.sink.transform.Transformer; import org.apache.hudi.sink.utils.Pipelines; import org.apache.hudi.util.AvroSchemaConverter; @@ -99,7 +100,7 @@ public static void main(String[] args) throws Exception { DataStream hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream); DataStream pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream); - if (StreamerUtil.needsAsyncCompaction(conf)) { + if (OptionsResolver.needsAsyncCompaction(conf)) { Pipelines.compact(conf, pipeline); } else { Pipelines.clean(conf, pipeline); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java index 4dd4f89d03c1d..930cbfe3d871a 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java @@ -24,7 +24,6 @@ import org.apache.hudi.configuration.OptionsResolver; import org.apache.hudi.sink.utils.Pipelines; import org.apache.hudi.util.ChangelogModes; -import org.apache.hudi.util.StreamerUtil; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; @@ -78,7 +77,12 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { // Append mode if (OptionsResolver.isAppendMode(conf)) { - return Pipelines.append(conf, rowType, 
dataStream, context.isBounded()); + DataStream pipeline = Pipelines.append(conf, rowType, dataStream, context.isBounded()); + if (OptionsResolver.needsAsyncClustering(conf)) { + return Pipelines.cluster(conf, rowType, pipeline); + } else { + return Pipelines.dummySink(pipeline); + } } // default parallelism @@ -90,7 +94,7 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { // write pipeline pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream); // compaction - if (StreamerUtil.needsAsyncCompaction(conf)) { + if (OptionsResolver.needsAsyncCompaction(conf)) { return Pipelines.compact(conf, pipeline); } else { return Pipelines.clean(conf, pipeline); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java new file mode 100644 index 0000000000000..9c753e3ae114d --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.util; + +import org.apache.hudi.client.HoodieFlinkWriteClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.ClusteringUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.configuration.OptionsResolver; +import org.apache.hudi.table.HoodieFlinkTable; + +import org.apache.flink.configuration.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.stream.Collectors; + +/** + * Utilities for flink hudi clustering. + */ +public class ClusteringUtil { + + private static final Logger LOG = LoggerFactory.getLogger(ClusteringUtil.class); + + public static void validateClusteringScheduling(Configuration conf) { + if (OptionsResolver.isBucketIndexType(conf)) { + throw new UnsupportedOperationException("Clustering is not supported for bucket index."); + } + } + + /** + * Schedules clustering plan by condition. + * + * @param conf The configuration + * @param writeClient The write client + * @param committed Whether the instant was committed + */ + public static void scheduleClustering(Configuration conf, HoodieFlinkWriteClient writeClient, boolean committed) { + validateClusteringScheduling(conf); + if (committed) { + writeClient.scheduleClustering(Option.empty()); + } + } + + /** + * Force rolls back all the inflight clustering instants, especially for job failover restart. 
+ * + * @param table The hoodie table + * @param writeClient The write client + */ + public static void rollbackClustering(HoodieFlinkTable table, HoodieFlinkWriteClient writeClient) { + List inflightInstants = ClusteringUtils.getPendingClusteringInstantTimes(table.getMetaClient()) + .stream() + .filter(instant -> instant.getState() == HoodieInstant.State.INFLIGHT) + .collect(Collectors.toList()); + inflightInstants.forEach(inflightInstant -> { + LOG.info("Rollback the inflight clustering instant: " + inflightInstant + " for failover"); + table.rollbackInflightClustering(inflightInstant, + commitToRollback -> writeClient.getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false)); + table.getMetaClient().reloadActiveTimeline(); + }); + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index fcffbed54b48f..d101037dfae27 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -311,26 +311,6 @@ public static String generateBucketKey(String partitionPath, String fileId) { return String.format("%s_%s", partitionPath, fileId); } - /** - * Returns whether there is need to schedule the async compaction. - * - * @param conf The flink configuration. - */ - public static boolean needsAsyncCompaction(Configuration conf) { - return OptionsResolver.isMorTable(conf) - && conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED); - } - - /** - * Returns whether there is need to schedule the compaction plan. - * - * @param conf The flink configuration. - */ - public static boolean needsScheduleCompaction(Configuration conf) { - return OptionsResolver.isMorTable(conf) - && conf.getBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED); - } - /** * Creates the meta client for reader. 
* diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java index 403d0272b4e18..e67d2ab35c768 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java @@ -286,6 +286,29 @@ public void testInsertClustering() throws Exception { .end(); } + @Test + public void testInsertAsyncClustering() throws Exception { + // reset the config option + conf.setString(FlinkOptions.OPERATION, "insert"); + conf.setBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true); + conf.setBoolean(FlinkOptions.CLUSTERING_ASYNC_ENABLED, true); + conf.setInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS, 1); + + prepareInsertPipeline(conf) + .consume(TestData.DATA_SET_INSERT_SAME_KEY) + .checkpoint(1) + .handleEvents(1) + .checkpointComplete(1) + .checkWrittenData(EXPECTED4, 1) + // insert duplicates again + .consume(TestData.DATA_SET_INSERT_SAME_KEY) + .checkpoint(2) + .handleEvents(1) + .checkpointComplete(2) + .checkWrittenFullData(EXPECTED5) + .end(); + } + @Test public void testInsertWithSmallBufferSize() throws Exception { // reset the config option @@ -419,7 +442,7 @@ public void testReuseEmbeddedServer() throws IOException { // ------------------------------------------------------------------------- private TestHarness preparePipeline() throws Exception { - return TestHarness.instance().preparePipeline(tempFile, conf); + return preparePipeline(conf); } protected TestHarness preparePipeline(Configuration conf) throws Exception { @@ -427,6 +450,10 @@ protected TestHarness preparePipeline(Configuration conf) throws Exception { } protected TestHarness prepareInsertPipeline() throws Exception { + return prepareInsertPipeline(conf); + } + + protected TestHarness prepareInsertPipeline(Configuration conf) throws Exception { return TestHarness.instance().preparePipeline(tempFile, conf, true); } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java index ac2ee0be374ea..0bf49cbbb3be1 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java @@ -20,6 +20,7 @@ import org.apache.hudi.avro.model.HoodieClusteringPlan; import org.apache.hudi.client.HoodieFlinkWriteClient; +import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; @@ -55,8 +56,9 @@ import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.RowType; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import java.io.File; import java.util.HashMap; @@ -82,8 +84,9 @@ public class ITTestHoodieFlinkClustering { @TempDir File tempFile; - @Test - public void testHoodieFlinkClustering() throws Exception { + @ParameterizedTest + 
@EnumSource(value = HoodieTableType.class) + public void testHoodieFlinkClustering(HoodieTableType tableType) throws Exception { // Create hoodie table and insert into data. EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build(); TableEnvironment tableEnv = TableEnvironmentImpl.create(settings); @@ -91,6 +94,7 @@ public void testHoodieFlinkClustering() throws Exception { .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1); Map options = new HashMap<>(); options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); + options.put(FlinkOptions.TABLE_TYPE.key(), tableType.name()); // use append mode options.put(FlinkOptions.OPERATION.key(), WriteOperationType.INSERT.value()); @@ -115,7 +119,7 @@ public void testHoodieFlinkClustering() throws Exception { // set the table name conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName()); - conf.setString(FlinkOptions.TABLE_TYPE, metaClient.getTableConfig().getTableType().name()); + conf.setString(FlinkOptions.TABLE_TYPE, tableType.name()); // set record key field conf.setString(FlinkOptions.RECORD_KEY_FIELD, metaClient.getTableConfig().getRecordKeyFieldProp()); @@ -160,7 +164,7 @@ public void testHoodieFlinkClustering() throws Exception { final DataType rowDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema); final RowType rowType = (RowType) rowDataType.getLogicalType(); - DataStream dataStream = env.addSource(new ClusteringPlanSourceFunction(timeline.lastInstant().get(), clusteringPlan)) + DataStream dataStream = env.addSource(new ClusteringPlanSourceFunction(clusteringInstantTime, clusteringPlan)) .name("clustering_source") .uid("uid_clustering_source") .rebalance() @@ -181,4 +185,47 @@ public void testHoodieFlinkClustering() throws Exception { env.execute("flink_hudi_clustering"); TestData.checkWrittenData(tempFile, EXPECTED, 4); } + + @ParameterizedTest + @EnumSource(value = HoodieTableType.class) + public void testHoodieFlinkClusteringService(HoodieTableType tableType) throws Exception { + // Create hoodie table and insert into data. + EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build(); + TableEnvironment tableEnv = TableEnvironmentImpl.create(settings); + tableEnv.getConfig().getConfiguration() + .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1); + Map options = new HashMap<>(); + options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); + options.put(FlinkOptions.TABLE_TYPE.key(), tableType.name()); + + // use append mode + options.put(FlinkOptions.OPERATION.key(), WriteOperationType.INSERT.value()); + options.put(FlinkOptions.INSERT_CLUSTER.key(), "false"); + + String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options); + tableEnv.executeSql(hoodieTableDDL); + tableEnv.executeSql(TestSQL.INSERT_T1).await(); + + // wait for the asynchronous commit to finish + TimeUnit.SECONDS.sleep(3); + + // Make configuration and setAvroSchema. 
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + FlinkClusteringConfig cfg = new FlinkClusteringConfig(); + cfg.path = tempFile.getAbsolutePath(); + cfg.minClusteringIntervalSeconds = 3; + cfg.schedule = true; + Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg); + conf.setString(FlinkOptions.TABLE_TYPE.key(), tableType.name()); + + HoodieFlinkClusteringJob.AsyncClusteringService asyncClusteringService = new HoodieFlinkClusteringJob.AsyncClusteringService(cfg, conf, env); + asyncClusteringService.start(null); + + // wait for the asynchronous commit to finish + TimeUnit.SECONDS.sleep(5); + + asyncClusteringService.shutDown(); + + TestData.checkWrittenData(tempFile, EXPECTED, 4); + } } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/ClusteringFunctionWrapper.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/ClusteringFunctionWrapper.java new file mode 100644 index 0000000000000..55a79915d4755 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/ClusteringFunctionWrapper.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink.utils; + +import org.apache.hudi.sink.clustering.ClusteringCommitEvent; +import org.apache.hudi.sink.clustering.ClusteringCommitSink; +import org.apache.hudi.sink.clustering.ClusteringOperator; +import org.apache.hudi.sink.clustering.ClusteringPlanEvent; +import org.apache.hudi.sink.clustering.ClusteringPlanOperator; +import org.apache.hudi.utils.TestConfigurations; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext; +import org.apache.flink.runtime.operators.testutils.MockEnvironment; +import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; + +/** + * A wrapper class to manipulate the {@link ClusteringOperator} instance for testing. 
+ */ +public class ClusteringFunctionWrapper { + private final Configuration conf; + + private final IOManager ioManager; + private final StreamingRuntimeContext runtimeContext; + + private final StreamTask streamTask; + private final StreamConfig streamConfig; + + /** + * Function that generates the {@code HoodieClusteringPlan}. + */ + private ClusteringPlanOperator clusteringPlanOperator; + /** + * Output to collect the clustering commit events. + */ + private CollectorOutput commitEventOutput; + /** + * Function that executes the clustering task. + */ + private ClusteringOperator clusteringOperator; + /** + * Stream sink to handle clustering commits. + */ + private ClusteringCommitSink commitSink; + + public ClusteringFunctionWrapper(Configuration conf, StreamTask streamTask, StreamConfig streamConfig) { + this.ioManager = new IOManagerAsync(); + MockEnvironment environment = new MockEnvironmentBuilder() + .setTaskName("mockTask") + .setManagedMemorySize(4 * MemoryManager.DEFAULT_PAGE_SIZE) + .setIOManager(ioManager) + .build(); + this.runtimeContext = new MockStreamingRuntimeContext(false, 1, 0, environment); + this.conf = conf; + this.streamTask = streamTask; + this.streamConfig = streamConfig; + } + + public void openFunction() throws Exception { + clusteringPlanOperator = new ClusteringPlanOperator(conf); + clusteringPlanOperator.open(); + + clusteringOperator = new ClusteringOperator(conf, TestConfigurations.ROW_TYPE); + // CAUTION: deprecated API used. + clusteringOperator.setProcessingTimeService(new TestProcessingTimeService()); + commitEventOutput = new CollectorOutput<>(); + clusteringOperator.setup(streamTask, streamConfig, commitEventOutput); + clusteringOperator.open(); + final NonThrownExecutor syncExecutor = new MockCoordinatorExecutor( + new MockOperatorCoordinatorContext(new OperatorID(), 1)); + clusteringOperator.setExecutor(syncExecutor); + + commitSink = new ClusteringCommitSink(conf); + commitSink.setRuntimeContext(runtimeContext); + commitSink.open(conf); + } + + public void cluster(long checkpointID) throws Exception { + // collect the ClusteringPlanEvents. 
+ CollectorOutput planOutput = new CollectorOutput<>(); + clusteringPlanOperator.setOutput(planOutput); + clusteringPlanOperator.notifyCheckpointComplete(checkpointID); + // collect the ClusteringCommitEvents + for (ClusteringPlanEvent event : planOutput.getRecords()) { + clusteringOperator.processElement(new StreamRecord<>(event)); + } + // handle and commit the clustering + for (ClusteringCommitEvent event : commitEventOutput.getRecords()) { + commitSink.invoke(event, null); + } + } + + public void close() throws Exception { + ioManager.close(); + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java index 642a407c1c7d0..707fe45c47358 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java @@ -18,6 +18,8 @@ package org.apache.hudi.sink.utils; +import org.apache.hudi.configuration.OptionsResolver; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.sink.StreamWriteOperatorCoordinator; import org.apache.hudi.sink.append.AppendWriteFunction; import org.apache.hudi.sink.bulk.BulkInsertWriterHelper; @@ -25,6 +27,7 @@ import org.apache.hudi.util.AvroSchemaConverter; import org.apache.hudi.util.StreamerUtil; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; @@ -34,9 +37,12 @@ import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.flink.runtime.operators.testutils.MockEnvironment; import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; +import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.api.operators.collect.utils.MockFunctionSnapshotContext; import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorEventGateway; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.streaming.util.MockStreamTaskBuilder; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; @@ -57,12 +63,15 @@ public class InsertFunctionWrapper implements TestFunctionWrapper { private final StreamWriteOperatorCoordinator coordinator; private final MockStateInitializationContext stateInitializationContext; + private final boolean asyncClustering; + private ClusteringFunctionWrapper clusteringFunctionWrapper; + /** * Append write function. 
*/ private AppendWriteFunction writeFunction; - public InsertFunctionWrapper(String tablePath, Configuration conf) { + public InsertFunctionWrapper(String tablePath, Configuration conf) throws Exception { IOManager ioManager = new IOManagerAsync(); MockEnvironment environment = new MockEnvironmentBuilder() .setTaskName("mockTask") @@ -77,6 +86,15 @@ public InsertFunctionWrapper(String tablePath, Configuration conf) { this.coordinatorContext = new MockOperatorCoordinatorContext(new OperatorID(), 1); this.coordinator = new StreamWriteOperatorCoordinator(conf, this.coordinatorContext); this.stateInitializationContext = new MockStateInitializationContext(); + + this.asyncClustering = OptionsResolver.needsAsyncClustering(conf); + StreamConfig streamConfig = new StreamConfig(conf); + streamConfig.setOperatorID(new OperatorID()); + StreamTask streamTask = new MockStreamTaskBuilder(environment) + .setConfig(new StreamConfig(conf)) + .setExecutionConfig(new ExecutionConfig().enableObjectReuse()) + .build(); + this.clusteringFunctionWrapper = new ClusteringFunctionWrapper(this.conf, streamTask, streamConfig); } public void openFunction() throws Exception { @@ -84,6 +102,10 @@ public void openFunction() throws Exception { this.coordinator.setExecutor(new MockCoordinatorExecutor(coordinatorContext)); setupWriteFunction(); + + if (asyncClustering) { + clusteringFunctionWrapper.openFunction(); + } } public void invoke(I record) throws Exception { @@ -109,6 +131,13 @@ public void checkpointFunction(long checkpointId) throws Exception { public void checkpointComplete(long checkpointId) { stateInitializationContext.getOperatorStateStore().checkpointSuccess(checkpointId); coordinator.notifyCheckpointComplete(checkpointId); + if (asyncClustering) { + try { + clusteringFunctionWrapper.cluster(checkpointId); + } catch (Exception e) { + throw new HoodieException(e); + } + } } public StreamWriteOperatorCoordinator getCoordinator() { @@ -118,6 +147,9 @@ public StreamWriteOperatorCoordinator getCoordinator() { @Override public void close() throws Exception { this.coordinator.close(); + if (clusteringFunctionWrapper != null) { + clusteringFunctionWrapper.close(); + } } public BulkInsertWriterHelper getWriterHelper() { diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java index 2bb0f69d18ca9..a1a14456e3c59 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java @@ -21,6 +21,7 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.configuration.OptionsResolver; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.sink.StreamWriteFunction; import org.apache.hudi.sink.StreamWriteOperatorCoordinator; @@ -28,7 +29,6 @@ import org.apache.hudi.sink.event.WriteMetadataEvent; import org.apache.hudi.sink.partitioner.BucketAssignFunction; import org.apache.hudi.sink.transform.RowDataToHoodieFunction; -import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.utils.TestConfigurations; import org.apache.flink.api.common.ExecutionConfig; @@ -120,8 +120,7 @@ public StreamWriteFunctionWrapper(String tablePath, Configuration conf) throws E 
this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf); this.bucketAssignFunctionContext = new MockBucketAssignFunctionContext(); this.stateInitializationContext = new MockStateInitializationContext(); - this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf); - this.asyncCompaction = StreamerUtil.needsAsyncCompaction(conf); + this.asyncCompaction = OptionsResolver.needsAsyncCompaction(conf); this.streamConfig = new StreamConfig(conf); streamConfig.setOperatorID(new OperatorID()); this.streamTask = new MockStreamTaskBuilder(environment) diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java index ba60ff9469d73..b752258219238 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java @@ -25,6 +25,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.HadoopConfigurations; import org.apache.hudi.configuration.OptionsResolver; @@ -337,7 +338,9 @@ public TestHarness checkWrittenData(Map expected) throws Excepti public TestHarness checkWrittenData( Map expected, int partitions) throws Exception { - if (OptionsResolver.isCowTable(conf) || conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED)) { + if (OptionsResolver.isCowTable(conf) + || conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED) + || OptionsResolver.isAppendMode(conf)) { TestData.checkWrittenData(this.baseFile, expected, partitions); } else { checkWrittenDataMor(baseFile, expected, partitions); @@ -419,7 +422,7 @@ private void checkInstantState(HoodieInstant.State state, String instantStr) { protected String lastCompleteInstant() { return OptionsResolver.isMorTable(conf) ? 
TestUtils.getLastDeltaCompleteInstant(basePath) - : TestUtils.getLastCompleteInstant(basePath); + : TestUtils.getLastCompleteInstant(basePath, HoodieTimeline.COMMIT_ACTION); } } } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java index c3aa9c25c61a2..2d55b4001c0c8 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java @@ -50,6 +50,16 @@ public static String getLastCompleteInstant(String basePath) { return StreamerUtil.getLastCompletedInstant(metaClient); } + public static String getLastCompleteInstant(String basePath, String commitAction) { + final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder() + .setConf(HadoopConfigurations.getHadoopConf(new Configuration())).setBasePath(basePath).build(); + return metaClient.getCommitsTimeline().filterCompletedInstants() + .filter(instant -> commitAction.equals(instant.getAction())) + .lastInstant() + .map(HoodieInstant::getTimestamp) + .orElse(null); + } + public static String getLastDeltaCompleteInstant(String basePath) { final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder() .setConf(HadoopConfigurations.getHadoopConf(new Configuration())).setBasePath(basePath).build(); From 99b98b80a774ab0d51294b34b618a9f5c44aebcc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=96=BB=E5=85=86=E9=9D=96?= Date: Wed, 22 Jun 2022 20:13:30 +0800 Subject: [PATCH 2/2] delete deprecated clustering plan strategy and add clustering ITTest --- .../strategy/ClusteringPlanStrategy.java | 13 +--- ...FlinkRecentDaysClusteringPlanStrategy.java | 65 ---------------- ...ectedPartitionsClusteringPlanStrategy.java | 67 ----------------- .../hudi/configuration/FlinkOptions.java | 20 ++++- .../sink/clustering/ClusteringOperator.java | 10 ++- .../clustering/ClusteringPlanOperator.java | 8 +- .../ClusteringPlanSourceFunction.java | 2 +- .../clustering/FlinkClusteringConfig.java | 7 +- .../clustering/HoodieFlinkClusteringJob.java | 2 +- .../org/apache/hudi/util/ClusteringUtil.java | 1 - .../org/apache/hudi/util/StreamerUtil.java | 3 + .../hudi/sink/ITTestDataStreamWrite.java | 74 +++++++++++++++++++ .../cluster/ITTestHoodieFlinkClustering.java | 20 ++--- 13 files changed, 122 insertions(+), 170 deletions(-) delete mode 100644 hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkRecentDaysClusteringPlanStrategy.java delete mode 100644 hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSelectedPartitionsClusteringPlanStrategy.java diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java index a96ff73947cdb..34b35d2ba946d 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java @@ -70,9 +70,6 @@ public static String checkAndGetClusteringPlanStrategy(HoodieWriteConfig config) String sparkSizeBasedClassName = HoodieClusteringConfig.SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY; String sparkSelectedPartitionsClassName = 
"org.apache.hudi.client.clustering.plan.strategy.SparkSelectedPartitionsClusteringPlanStrategy"; String sparkRecentDaysClassName = "org.apache.hudi.client.clustering.plan.strategy.SparkRecentDaysClusteringPlanStrategy"; - String flinkSizeBasedClassName = HoodieClusteringConfig.FLINK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY; - String flinkSelectedPartitionsClassName = "org.apache.hudi.client.clustering.plan.strategy.FlinkSelectedPartitionsClusteringPlanStrategy"; - String flinkRecentDaysClassName = "org.apache.hudi.client.clustering.plan.strategy.FlinkRecentDaysClusteringPlanStrategy"; String javaSelectedPartitionClassName = "org.apache.hudi.client.clustering.plan.strategy.JavaRecentDaysClusteringPlanStrategy"; String javaSizeBasedClassName = HoodieClusteringConfig.JAVA_SIZED_BASED_CLUSTERING_PLAN_STRATEGY; @@ -85,14 +82,6 @@ public static String checkAndGetClusteringPlanStrategy(HoodieWriteConfig config) config.setValue(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME, ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name()); LOG.warn(String.format(logStr, className, sparkSizeBasedClassName, HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key(), ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name())); return sparkSizeBasedClassName; - } else if (flinkRecentDaysClassName.equals(className)) { - config.setValue(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME, ClusteringPlanPartitionFilterMode.RECENT_DAYS.name()); - LOG.warn(String.format(logStr, className, sparkSizeBasedClassName, HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key(), ClusteringPlanPartitionFilterMode.RECENT_DAYS.name())); - return flinkSizeBasedClassName; - } else if (flinkSelectedPartitionsClassName.equals(className)) { - config.setValue(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME, ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name()); - LOG.warn(String.format(logStr, className, sparkSizeBasedClassName, HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key(), ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name())); - return flinkSizeBasedClassName; } else if (javaSelectedPartitionClassName.equals(className)) { config.setValue(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME, ClusteringPlanPartitionFilterMode.RECENT_DAYS.name()); LOG.warn(String.format(logStr, className, javaSizeBasedClassName, HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key(), ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name())); @@ -173,7 +162,7 @@ protected Map buildMetrics(List fileSlices) { return metrics; } - protected HoodieTable getHoodieTable() { + protected HoodieTable getHoodieTable() { return this.hoodieTable; } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkRecentDaysClusteringPlanStrategy.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkRecentDaysClusteringPlanStrategy.java deleted file mode 100644 index 0109aaa60ffb9..0000000000000 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkRecentDaysClusteringPlanStrategy.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.client.clustering.plan.strategy; - -import org.apache.hudi.client.common.HoodieFlinkEngineContext; -import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable; -import org.apache.hudi.table.HoodieFlinkMergeOnReadTable; - -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; - -import java.util.Comparator; -import java.util.List; -import java.util.stream.Collectors; - -/** - * Clustering Strategy based on following. - * 1) Only looks at latest 'daybased.lookback.partitions' partitions. - * 2) Excludes files that are greater than 'small.file.limit' from clustering plan. - */ -public class FlinkRecentDaysClusteringPlanStrategy> - extends FlinkSizeBasedClusteringPlanStrategy { - private static final Logger LOG = LogManager.getLogger(FlinkRecentDaysClusteringPlanStrategy.class); - - public FlinkRecentDaysClusteringPlanStrategy(HoodieFlinkCopyOnWriteTable table, - HoodieFlinkEngineContext engineContext, - HoodieWriteConfig writeConfig) { - super(table, engineContext, writeConfig); - } - - public FlinkRecentDaysClusteringPlanStrategy(HoodieFlinkMergeOnReadTable table, - HoodieFlinkEngineContext engineContext, - HoodieWriteConfig writeConfig) { - super(table, engineContext, writeConfig); - } - - @Override - protected List filterPartitionPaths(List partitionPaths) { - int targetPartitionsForClustering = getWriteConfig().getTargetPartitionsForClustering(); - int skipPartitionsFromLatestForClustering = getWriteConfig().getSkipPartitionsFromLatestForClustering(); - return partitionPaths.stream() - .sorted(Comparator.reverseOrder()) - .skip(Math.max(skipPartitionsFromLatestForClustering, 0)) - .limit(targetPartitionsForClustering > 0 ? targetPartitionsForClustering : partitionPaths.size()) - .collect(Collectors.toList()); - } -} diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSelectedPartitionsClusteringPlanStrategy.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSelectedPartitionsClusteringPlanStrategy.java deleted file mode 100644 index ae5726bb4a46e..0000000000000 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSelectedPartitionsClusteringPlanStrategy.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.client.clustering.plan.strategy; - -import org.apache.hudi.client.common.HoodieFlinkEngineContext; -import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable; -import org.apache.hudi.table.HoodieFlinkMergeOnReadTable; - -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; - -import java.util.List; -import java.util.stream.Collectors; - -import static org.apache.hudi.config.HoodieClusteringConfig.CLUSTERING_STRATEGY_PARAM_PREFIX; - -/** - * Clustering Strategy to filter just specified partitions from [begin, end]. Note both begin and end are inclusive. - */ -public class FlinkSelectedPartitionsClusteringPlanStrategy> - extends FlinkSizeBasedClusteringPlanStrategy { - private static final Logger LOG = LogManager.getLogger(FlinkSelectedPartitionsClusteringPlanStrategy.class); - - public static final String CONF_BEGIN_PARTITION = CLUSTERING_STRATEGY_PARAM_PREFIX + "cluster.begin.partition"; - public static final String CONF_END_PARTITION = CLUSTERING_STRATEGY_PARAM_PREFIX + "cluster.end.partition"; - - public FlinkSelectedPartitionsClusteringPlanStrategy(HoodieFlinkCopyOnWriteTable table, - HoodieFlinkEngineContext engineContext, - HoodieWriteConfig writeConfig) { - super(table, engineContext, writeConfig); - } - - public FlinkSelectedPartitionsClusteringPlanStrategy(HoodieFlinkMergeOnReadTable table, - HoodieFlinkEngineContext engineContext, - HoodieWriteConfig writeConfig) { - super(table, engineContext, writeConfig); - } - - @Override - protected List filterPartitionPaths(List partitionPaths) { - String beginPartition = getWriteConfig().getProps().getProperty(CONF_BEGIN_PARTITION); - String endPartition = getWriteConfig().getProps().getProperty(CONF_END_PARTITION); - List filteredPartitions = partitionPaths.stream() - .filter(path -> path.compareTo(beginPartition) >= 0 && path.compareTo(endPartition) <= 0) - .collect(Collectors.toList()); - LOG.info("Filtered to the following partitions: " + filteredPartitions); - return filteredPartitions; - } -} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index 9463e497de99b..f34c673518e01 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -18,7 +18,7 @@ package org.apache.hudi.configuration; -import org.apache.hudi.client.clustering.plan.strategy.FlinkRecentDaysClusteringPlanStrategy; +import org.apache.hudi.client.clustering.plan.strategy.FlinkSizeBasedClusteringPlanStrategy; import org.apache.hudi.common.config.ConfigClassProperty; import org.apache.hudi.common.config.ConfigGroups; import org.apache.hudi.common.config.HoodieConfig; @@ -45,6 +45,11 @@ import java.util.Map; import java.util.Set; +import static 
org.apache.hudi.config.HoodieClusteringConfig.DAYBASED_LOOKBACK_PARTITIONS; +import static org.apache.hudi.config.HoodieClusteringConfig.PARTITION_FILTER_BEGIN_PARTITION; +import static org.apache.hudi.config.HoodieClusteringConfig.PARTITION_FILTER_END_PARTITION; +import static org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST; + /** * Hoodie Flink config options. * @@ -621,11 +626,22 @@ private FlinkOptions() { public static final ConfigOption CLUSTERING_PLAN_STRATEGY_CLASS = ConfigOptions .key("clustering.plan.strategy.class") .stringType() - .defaultValue(FlinkRecentDaysClusteringPlanStrategy.class.getName()) + .defaultValue(FlinkSizeBasedClusteringPlanStrategy.class.getName()) .withDescription("Config to provide a strategy class (subclass of ClusteringPlanStrategy) to create clustering plan " + "i.e select what file groups are being clustered. Default strategy, looks at the last N (determined by " + CLUSTERING_TARGET_PARTITIONS.key() + ") day based partitions picks the small file slices within those partitions."); + public static final ConfigOption CLUSTERING_PLAN_PARTITION_FILTER_MODE_NAME = ConfigOptions + .key("clustering.plan.partition.filter.mode") + .stringType() + .defaultValue("NONE") + .withDescription("Partition filter mode used in the creation of clustering plan. Available values are - " + + "NONE: do not filter table partition and thus the clustering plan will include all partitions that have clustering candidate." + + "RECENT_DAYS: keep a continuous range of partitions, worked together with configs '" + DAYBASED_LOOKBACK_PARTITIONS.key() + "' and '" + + PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST.key() + "." + + "SELECTED_PARTITIONS: keep partitions that are in the specified range ['" + PARTITION_FILTER_BEGIN_PARTITION.key() + "', '" + + PARTITION_FILTER_END_PARTITION.key() + "']."); + public static final ConfigOption CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES = ConfigOptions .key("clustering.plan.strategy.target.file.max.bytes") .intType() diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java index 8abd870a35faa..676532402c7eb 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java @@ -115,6 +115,11 @@ public class ClusteringOperator extends TableStreamOperator table, long checkpointId) { // generate clustering plan // should support configurable commit metadata + HoodieInstant clusteringInstant = HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstantTime); Option> clusteringPlanOption = ClusteringUtils.getClusteringPlan( - table.getMetaClient(), HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstantTime)); + table.getMetaClient(), clusteringInstant); if (!clusteringPlanOption.isPresent()) { // do nothing. @@ -118,13 +119,12 @@ private void scheduleClustering(HoodieFlinkTable table, long checkpointId) { // do nothing. 
LOG.info("Empty clustering plan for instant " + clusteringInstantTime); } else { - HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstantTime); // Mark instant as clustering inflight - table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty()); + table.getActiveTimeline().transitionReplaceRequestedToInflight(clusteringInstant, Option.empty()); table.getMetaClient().reloadActiveTimeline(); for (HoodieClusteringGroup clusteringGroup : clusteringPlan.getInputGroups()) { - LOG.info("ClusteringPlanSourceFunction cluster " + clusteringGroup + " files"); + LOG.info("Execute clustering plan for instant {} as {} file slices", clusteringInstantTime, clusteringGroup.getSlices().size()); output.collect(new StreamRecord<>( new ClusteringPlanEvent(clusteringInstantTime, ClusteringGroupInfo.create(clusteringGroup), clusteringPlan.getStrategy().getStrategyParams()) )); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanSourceFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanSourceFunction.java index 9b05165ae5acb..fafaf9a1ce963 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanSourceFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanSourceFunction.java @@ -73,7 +73,7 @@ public void open(Configuration parameters) throws Exception { @Override public void run(SourceContext sourceContext) throws Exception { for (HoodieClusteringGroup clusteringGroup : clusteringPlan.getInputGroups()) { - LOG.info("ClusteringPlanSourceFunction cluster " + clusteringGroup + " files"); + LOG.info("Execute clustering plan for instant {} as {} file slices", clusteringInstantTime, clusteringGroup.getSlices().size()); sourceContext.collect(new ClusteringPlanEvent(this.clusteringInstantTime, ClusteringGroupInfo.create(clusteringGroup), clusteringPlan.getStrategy().getStrategyParams())); } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/FlinkClusteringConfig.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/FlinkClusteringConfig.java index 655337a766908..74fa73c3f935b 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/FlinkClusteringConfig.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/FlinkClusteringConfig.java @@ -18,6 +18,7 @@ package org.apache.hudi.sink.clustering; +import org.apache.hudi.client.clustering.plan.strategy.FlinkSizeBasedClusteringPlanStrategy; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.util.StreamerUtil; @@ -78,7 +79,10 @@ public class FlinkClusteringConfig extends Configuration { public Boolean cleanAsyncEnable = false; @Parameter(names = {"--plan-strategy-class"}, description = "Config to provide a strategy class to generator clustering plan", required = false) - public String planStrategyClass = "org.apache.hudi.client.clustering.plan.strategy.FlinkRecentDaysClusteringPlanStrategy"; + public String planStrategyClass = FlinkSizeBasedClusteringPlanStrategy.class.getName(); + + @Parameter(names = {"--plan-partition-filter-mode"}, description = "Partition filter mode used in the creation of clustering plan", required = false) + public String planPartitionFilterMode = "NONE"; 
@Parameter(names = {"--target-file-max-bytes"}, description = "Each group can produce 'N' (CLUSTERING_MAX_GROUP_SIZE/CLUSTERING_TARGET_FILE_SIZE) output file groups, default 1 GB", required = false) public Integer targetFileMaxBytes = 1024 * 1024 * 1024; @@ -129,6 +133,7 @@ public static Configuration toFlinkConfig(FlinkClusteringConfig config) { conf.setInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS, config.clusteringDeltaCommits); conf.setInteger(FlinkOptions.CLUSTERING_TASKS, config.clusteringTasks); conf.setString(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS, config.planStrategyClass); + conf.setString(FlinkOptions.CLUSTERING_PLAN_PARTITION_FILTER_MODE_NAME, config.planPartitionFilterMode); conf.setInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES, config.targetFileMaxBytes); conf.setInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT, config.smallFileLimit); conf.setInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST, config.skipFromLatestPartitions); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java index c88aa4b989b01..d29f1f9a49c6c 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java @@ -226,7 +226,7 @@ private void cluster() throws Exception { boolean scheduled = writeClient.scheduleClusteringAtInstant(clusteringInstantTime, Option.empty()); if (!scheduled) { // do nothing. - LOG.info("No clustering plan for this job "); + LOG.info("No clustering plan for this job"); return; } table.getMetaClient().reloadActiveTimeline(); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java index 9c753e3ae114d..79e53f1ed43d8 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java @@ -20,7 +20,6 @@ import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.ClusteringUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.configuration.OptionsResolver; diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index d101037dfae27..d292b3832ac3b 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -53,6 +53,7 @@ import org.apache.hudi.sink.transform.ChainedTransformer; import org.apache.hudi.sink.transform.Transformer; import org.apache.hudi.streamer.FlinkStreamerConfig; +import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode; import org.apache.hudi.table.action.compact.CompactionTriggerStrategy; import org.apache.avro.Schema; @@ -168,6 +169,8 @@ public static HoodieWriteConfig getHoodieClientConfig( HoodieClusteringConfig.newBuilder() 
.withAsyncClustering(conf.getBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED)) .withClusteringPlanStrategyClass(conf.getString(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS)) + .withClusteringPlanPartitionFilterMode( + ClusteringPlanPartitionFilterMode.valueOf(conf.getString(FlinkOptions.CLUSTERING_PLAN_PARTITION_FILTER_MODE_NAME))) .withClusteringTargetPartitions(conf.getInteger(FlinkOptions.CLUSTERING_TARGET_PARTITIONS)) .withClusteringMaxNumGroups(conf.getInteger(FlinkOptions.CLUSTERING_MAX_NUM_GROUPS)) .withClusteringTargetFileMaxBytes(conf.getInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES)) diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java index 9c5eeeeb5a822..a25f0149c149b 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java @@ -152,6 +152,17 @@ public void testWriteMergeOnReadWithCompaction(String indexType) throws Exceptio testWriteToHoodie(conf, "mor_write_with_compact", 1, EXPECTED); } + @Test + public void testWriteMergeOnReadWithClustering() throws Exception { + Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); + conf.setBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true); + conf.setInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS, 1); + conf.setString(FlinkOptions.OPERATION, "insert"); + conf.setString(FlinkOptions.TABLE_TYPE, HoodieTableType.COPY_ON_WRITE.name()); + + testWriteToHoodieWithCluster(conf, "cow_write_with_cluster", 1, EXPECTED); + } + private void testWriteToHoodie( Transformer transformer, String jobName, @@ -250,6 +261,69 @@ private void testWriteToHoodie( } TestData.checkWrittenFullData(tempFile, expected); + } + + private void testWriteToHoodieWithCluster( + Configuration conf, + String jobName, + int checkpoints, + Map> expected) throws Exception { + + StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + execEnv.getConfig().disableObjectReuse(); + execEnv.setParallelism(4); + // set up checkpoint interval + execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE); + execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1); + + // Read from file source + RowType rowType = + (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf)) + .getLogicalType(); + JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema( + rowType, + InternalTypeInfo.of(rowType), + false, + true, + TimestampFormat.ISO_8601 + ); + String sourcePath = Objects.requireNonNull(Thread.currentThread() + .getContextClassLoader().getResource("test_source.data")).toString(); + + boolean isMor = conf.getString(FlinkOptions.TABLE_TYPE).equals(HoodieTableType.MERGE_ON_READ.name()); + + DataStream dataStream; + if (isMor) { + TextInputFormat format = new TextInputFormat(new Path(sourcePath)); + format.setFilesFilter(FilePathFilter.createDefaultFilter()); + TypeInformation typeInfo = BasicTypeInfo.STRING_TYPE_INFO; + format.setCharsetName("UTF-8"); + + dataStream = execEnv + // use PROCESS_CONTINUOUSLY mode to trigger checkpoint + .readFile(format, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, typeInfo) + .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8))) + .setParallelism(1); 
+ } else { + dataStream = execEnv + // use continuous file source to trigger checkpoint + .addSource(new ContinuousFileSource.BoundedSourceFunction(new Path(sourcePath), checkpoints)) + .name("continuous_file_source") + .setParallelism(1) + .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8))) + .setParallelism(4); + } + + DataStream pipeline = Pipelines.append(conf, rowType, dataStream, true); + execEnv.addOperator(pipeline.getTransformation()); + + Pipelines.cluster(conf, rowType, pipeline); + JobClient client = execEnv.executeAsync(jobName); + + // wait for the streaming job to finish + client.getJobExecutionResult().get(); + + TestData.checkWrittenFullData(tempFile, expected); } } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java index 0bf49cbbb3be1..aba8e4c7b4b92 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java @@ -20,7 +20,6 @@ import org.apache.hudi.avro.model.HoodieClusteringPlan; import org.apache.hudi.client.HoodieFlinkWriteClient; -import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; @@ -56,9 +55,8 @@ import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.RowType; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.EnumSource; import java.io.File; import java.util.HashMap; @@ -84,9 +82,8 @@ public class ITTestHoodieFlinkClustering { @TempDir File tempFile; - @ParameterizedTest - @EnumSource(value = HoodieTableType.class) - public void testHoodieFlinkClustering(HoodieTableType tableType) throws Exception { + @Test + public void testHoodieFlinkClustering() throws Exception { // Create hoodie table and insert into data. EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build(); TableEnvironment tableEnv = TableEnvironmentImpl.create(settings); @@ -94,7 +91,6 @@ public void testHoodieFlinkClustering(HoodieTableType tableType) throws Exceptio .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1); Map options = new HashMap<>(); options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); - options.put(FlinkOptions.TABLE_TYPE.key(), tableType.name()); // use append mode options.put(FlinkOptions.OPERATION.key(), WriteOperationType.INSERT.value()); @@ -119,7 +115,6 @@ public void testHoodieFlinkClustering(HoodieTableType tableType) throws Exceptio // set the table name conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName()); - conf.setString(FlinkOptions.TABLE_TYPE, tableType.name()); // set record key field conf.setString(FlinkOptions.RECORD_KEY_FIELD, metaClient.getTableConfig().getRecordKeyFieldProp()); @@ -137,7 +132,7 @@ public void testHoodieFlinkClustering(HoodieTableType tableType) throws Exceptio // To compute the clustering instant time and do clustering. 
String clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime(); - HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, null); + HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf); HoodieFlinkTable table = writeClient.getHoodieTable(); boolean scheduled = writeClient.scheduleClusteringAtInstant(clusteringInstantTime, Option.empty()); @@ -186,9 +181,8 @@ public void testHoodieFlinkClustering(HoodieTableType tableType) throws Exceptio TestData.checkWrittenData(tempFile, EXPECTED, 4); } - @ParameterizedTest - @EnumSource(value = HoodieTableType.class) - public void testHoodieFlinkClusteringService(HoodieTableType tableType) throws Exception { + @Test + public void testHoodieFlinkClusteringService() throws Exception { // Create hoodie table and insert into data. EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build(); TableEnvironment tableEnv = TableEnvironmentImpl.create(settings); @@ -196,7 +190,6 @@ public void testHoodieFlinkClusteringService(HoodieTableType tableType) throws E .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1); Map options = new HashMap<>(); options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); - options.put(FlinkOptions.TABLE_TYPE.key(), tableType.name()); // use append mode options.put(FlinkOptions.OPERATION.key(), WriteOperationType.INSERT.value()); @@ -216,7 +209,6 @@ public void testHoodieFlinkClusteringService(HoodieTableType tableType) throws E cfg.minClusteringIntervalSeconds = 3; cfg.schedule = true; Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg); - conf.setString(FlinkOptions.TABLE_TYPE.key(), tableType.name()); HoodieFlinkClusteringJob.AsyncClusteringService asyncClusteringService = new HoodieFlinkClusteringJob.AsyncClusteringService(cfg, conf, env); asyncClusteringService.start(null);
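To close, a hypothetical launch sketch for the standalone async clustering service exercised by testHoodieFlinkClusteringService above. The class name, table path, and interval below are placeholders; the FlinkClusteringConfig fields and AsyncClusteringService calls are exactly those used in the test, so this is a sketch of intended usage rather than a definitive entry point.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.hudi.sink.clustering.FlinkClusteringConfig;
import org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob;

public class AsyncClusteringServiceExample {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    FlinkClusteringConfig cfg = new FlinkClusteringConfig();
    cfg.path = "/tmp/hudi_table";           // placeholder table path
    cfg.schedule = true;                    // let the service schedule clustering plans
    cfg.minClusteringIntervalSeconds = 600; // placeholder interval between clustering rounds

    Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg);

    HoodieFlinkClusteringJob.AsyncClusteringService service =
        new HoodieFlinkClusteringJob.AsyncClusteringService(cfg, conf, env);
    service.start(null);
    // ... on shutdown:
    service.shutDown();
  }
}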