diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index 270027df18053..251ff97799ffa 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -1379,7 +1379,7 @@ protected Option inlineScheduleClustering(Option> ex return scheduleClustering(extraMetadata); } - protected void rollbackInflightClustering(HoodieInstant inflightInstant, HoodieTable table) { + public void rollbackInflightClustering(HoodieInstant inflightInstant, HoodieTable table) { Option pendingRollbackInstantInfo = getPendingRollbackInfo(table.getMetaClient(), inflightInstant.getTimestamp(), false); String commitTime = pendingRollbackInstantInfo.map(entry -> entry.getRollbackInstant().getTimestamp()).orElse(HoodieActiveTimeline.createNewInstantTime()); table.scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers()); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java index eee6f4f4927e0..1180845a6ed8a 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java @@ -51,6 +51,8 @@ public class HoodieClusteringConfig extends HoodieConfig { public static final String CLUSTERING_STRATEGY_PARAM_PREFIX = "hoodie.clustering.plan.strategy."; public static final String SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY = "org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy"; + public static final String FLINK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY = + "org.apache.hudi.client.clustering.plan.strategy.FlinkSizeBasedClusteringPlanStrategy"; public static final String JAVA_SIZED_BASED_CLUSTERING_PLAN_STRATEGY = "org.apache.hudi.client.clustering.plan.strategy.JavaSizeBasedClusteringPlanStrategy"; public static final String SPARK_SORT_AND_SIZE_EXECUTION_STRATEGY = diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java index 63b502531a896..89360c247403d 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java @@ -25,20 +25,20 @@ import java.io.Serializable; /** - * Repartition input records into at least expected number of output spark partitions. It should give below guarantees - - * Output spark partition will have records from only one hoodie partition. - Average records per output spark - * partitions should be almost equal to (#inputRecords / #outputSparkPartitions) to avoid possible skews. + * Repartition input records into at least expected number of output partitions. It should give below guarantees - + * Output partition will have records from only one hoodie partition. - Average records per output + * partitions should be almost equal to (#inputRecords / #outputPartitions) to avoid possible skews. 
*/ public interface BulkInsertPartitioner extends Serializable { /** - * Repartitions the input records into at least expected number of output spark partitions. + * Repartitions the input records into at least expected number of output partitions. * - * @param records Input Hoodie records - * @param outputSparkPartitions Expected number of output partitions + * @param records Input Hoodie records + * @param outputPartitions Expected number of output partitions * @return */ - I repartitionRecords(I records, int outputSparkPartitions); + I repartitionRecords(I records, int outputPartitions); /** * @return {@code true} if the records within a partition are sorted; {@code false} otherwise. @@ -48,6 +48,7 @@ public interface BulkInsertPartitioner extends Serializable { /** * Return file group id prefix for the given data partition. * By defauult, return a new file group id prefix, so that incoming records will route to a fresh new file group + * * @param partitionId data partition * @return */ @@ -57,6 +58,7 @@ default String getFileIdPfx(int partitionId) { /** * Return write handle factory for the given partition. + * * @param partitionId data partition * @return */ diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java index 479f63932c5b3..a96ff73947cdb 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java @@ -70,6 +70,9 @@ public static String checkAndGetClusteringPlanStrategy(HoodieWriteConfig config) String sparkSizeBasedClassName = HoodieClusteringConfig.SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY; String sparkSelectedPartitionsClassName = "org.apache.hudi.client.clustering.plan.strategy.SparkSelectedPartitionsClusteringPlanStrategy"; String sparkRecentDaysClassName = "org.apache.hudi.client.clustering.plan.strategy.SparkRecentDaysClusteringPlanStrategy"; + String flinkSizeBasedClassName = HoodieClusteringConfig.FLINK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY; + String flinkSelectedPartitionsClassName = "org.apache.hudi.client.clustering.plan.strategy.FlinkSelectedPartitionsClusteringPlanStrategy"; + String flinkRecentDaysClassName = "org.apache.hudi.client.clustering.plan.strategy.FlinkRecentDaysClusteringPlanStrategy"; String javaSelectedPartitionClassName = "org.apache.hudi.client.clustering.plan.strategy.JavaRecentDaysClusteringPlanStrategy"; String javaSizeBasedClassName = HoodieClusteringConfig.JAVA_SIZED_BASED_CLUSTERING_PLAN_STRATEGY; @@ -82,6 +85,14 @@ public static String checkAndGetClusteringPlanStrategy(HoodieWriteConfig config) config.setValue(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME, ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name()); LOG.warn(String.format(logStr, className, sparkSizeBasedClassName, HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key(), ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name())); return sparkSizeBasedClassName; + } else if (flinkRecentDaysClassName.equals(className)) { + config.setValue(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME, ClusteringPlanPartitionFilterMode.RECENT_DAYS.name()); + LOG.warn(String.format(logStr, className, sparkSizeBasedClassName, 
HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key(), ClusteringPlanPartitionFilterMode.RECENT_DAYS.name())); + return flinkSizeBasedClassName; + } else if (flinkSelectedPartitionsClassName.equals(className)) { + config.setValue(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME, ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name()); + LOG.warn(String.format(logStr, className, sparkSizeBasedClassName, HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key(), ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name())); + return flinkSizeBasedClassName; } else if (javaSelectedPartitionClassName.equals(className)) { config.setValue(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME, ClusteringPlanPartitionFilterMode.RECENT_DAYS.name()); LOG.warn(String.format(logStr, className, javaSizeBasedClassName, HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key(), ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name())); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index 49fa2ec246cf9..ddfbabaf36ae9 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -29,8 +29,10 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.model.TableServiceType; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableVersion; @@ -39,6 +41,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieClusteringException; import org.apache.hudi.exception.HoodieCommitException; import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.index.FlinkHoodieIndexFactory; @@ -68,6 +71,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.text.ParseException; import java.util.HashMap; import java.util.Iterator; @@ -399,6 +404,52 @@ public HoodieWriteMetadata> cluster(final String clusteringIns throw new HoodieNotSupportedException("Clustering is not supported yet"); } + private void completeClustering( + HoodieReplaceCommitMetadata metadata, + HoodieTable>, List, List> table, + String clusteringCommitTime) { + this.context.setJobStatus(this.getClass().getSimpleName(), "Collect clustering write status and commit clustering"); + HoodieInstant clusteringInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringCommitTime); + List writeStats = metadata.getPartitionToWriteStats().entrySet().stream().flatMap(e -> + e.getValue().stream()).collect(Collectors.toList()); + if (writeStats.stream().mapToLong(HoodieWriteStat::getTotalWriteErrors).sum() > 0) { + throw new HoodieClusteringException("Clustering failed to write to files:" + + writeStats.stream().filter(s -> 
s.getTotalWriteErrors() > 0L).map(HoodieWriteStat::getFileId).collect(Collectors.joining(","))); + } + + try { + this.txnManager.beginTransaction(Option.of(clusteringInstant), Option.empty()); + finalizeWrite(table, clusteringCommitTime, writeStats); + // commit to data table after committing to metadata table. + // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a + // single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition. + writeTableMetadata(table, clusteringCommitTime, clusteringInstant.getAction(), metadata); + LOG.info("Committing Clustering {} finished with result {}.", clusteringCommitTime, metadata); + table.getActiveTimeline().transitionReplaceInflightToComplete( + HoodieTimeline.getReplaceCommitInflightInstant(clusteringCommitTime), + Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8))); + } catch (IOException e) { + throw new HoodieClusteringException( + "Failed to commit " + table.getMetaClient().getBasePath() + " at time " + clusteringCommitTime, e); + } finally { + this.txnManager.endTransaction(Option.of(clusteringInstant)); + } + + WriteMarkersFactory.get(config.getMarkersType(), table, clusteringCommitTime) + .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism()); + if (clusteringTimer != null) { + long durationInMs = metrics.getDurationInMs(clusteringTimer.stop()); + try { + metrics.updateCommitMetrics(HoodieActiveTimeline.parseDateFromInstantTime(clusteringCommitTime).getTime(), + durationInMs, metadata, HoodieActiveTimeline.REPLACE_COMMIT_ACTION); + } catch (ParseException e) { + throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction " + + config.getBasePath() + " at time " + clusteringCommitTime, e); + } + } + LOG.info("Clustering successfully on commit " + clusteringCommitTime); + } + @Override protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option instantTime, boolean initialMetadataTableIfNecessary) { // Create a Hoodie table which encapsulated the commits and files visible @@ -412,6 +463,23 @@ protected void tryUpgrade(HoodieTableMetaClient metaClient, Option insta // no need to execute the upgrade/downgrade on each write in streaming. } + public void completeTableService( + TableServiceType tableServiceType, + HoodieCommitMetadata metadata, + HoodieTable>, List, List> table, + String commitInstant) { + switch (tableServiceType) { + case CLUSTER: + completeClustering((HoodieReplaceCommitMetadata) metadata, table, commitInstant); + break; + case COMPACT: + completeCompaction(metadata, table, commitInstant); + break; + default: + throw new IllegalArgumentException("This table service is not valid " + tableServiceType); + } + } + /** * Upgrade downgrade the Hoodie table. * diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkRecentDaysClusteringPlanStrategy.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkRecentDaysClusteringPlanStrategy.java new file mode 100644 index 0000000000000..0109aaa60ffb9 --- /dev/null +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkRecentDaysClusteringPlanStrategy.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client.clustering.plan.strategy; + +import org.apache.hudi.client.common.HoodieFlinkEngineContext; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable; +import org.apache.hudi.table.HoodieFlinkMergeOnReadTable; + +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.util.Comparator; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Clustering Strategy based on following. + * 1) Only looks at latest 'daybased.lookback.partitions' partitions. + * 2) Excludes files that are greater than 'small.file.limit' from clustering plan. + */ +public class FlinkRecentDaysClusteringPlanStrategy> + extends FlinkSizeBasedClusteringPlanStrategy { + private static final Logger LOG = LogManager.getLogger(FlinkRecentDaysClusteringPlanStrategy.class); + + public FlinkRecentDaysClusteringPlanStrategy(HoodieFlinkCopyOnWriteTable table, + HoodieFlinkEngineContext engineContext, + HoodieWriteConfig writeConfig) { + super(table, engineContext, writeConfig); + } + + public FlinkRecentDaysClusteringPlanStrategy(HoodieFlinkMergeOnReadTable table, + HoodieFlinkEngineContext engineContext, + HoodieWriteConfig writeConfig) { + super(table, engineContext, writeConfig); + } + + @Override + protected List filterPartitionPaths(List partitionPaths) { + int targetPartitionsForClustering = getWriteConfig().getTargetPartitionsForClustering(); + int skipPartitionsFromLatestForClustering = getWriteConfig().getSkipPartitionsFromLatestForClustering(); + return partitionPaths.stream() + .sorted(Comparator.reverseOrder()) + .skip(Math.max(skipPartitionsFromLatestForClustering, 0)) + .limit(targetPartitionsForClustering > 0 ? targetPartitionsForClustering : partitionPaths.size()) + .collect(Collectors.toList()); + } +} diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSelectedPartitionsClusteringPlanStrategy.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSelectedPartitionsClusteringPlanStrategy.java new file mode 100644 index 0000000000000..ae5726bb4a46e --- /dev/null +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSelectedPartitionsClusteringPlanStrategy.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client.clustering.plan.strategy; + +import org.apache.hudi.client.common.HoodieFlinkEngineContext; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable; +import org.apache.hudi.table.HoodieFlinkMergeOnReadTable; + +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.hudi.config.HoodieClusteringConfig.CLUSTERING_STRATEGY_PARAM_PREFIX; + +/** + * Clustering Strategy to filter just specified partitions from [begin, end]. Note both begin and end are inclusive. + */ +public class FlinkSelectedPartitionsClusteringPlanStrategy> + extends FlinkSizeBasedClusteringPlanStrategy { + private static final Logger LOG = LogManager.getLogger(FlinkSelectedPartitionsClusteringPlanStrategy.class); + + public static final String CONF_BEGIN_PARTITION = CLUSTERING_STRATEGY_PARAM_PREFIX + "cluster.begin.partition"; + public static final String CONF_END_PARTITION = CLUSTERING_STRATEGY_PARAM_PREFIX + "cluster.end.partition"; + + public FlinkSelectedPartitionsClusteringPlanStrategy(HoodieFlinkCopyOnWriteTable table, + HoodieFlinkEngineContext engineContext, + HoodieWriteConfig writeConfig) { + super(table, engineContext, writeConfig); + } + + public FlinkSelectedPartitionsClusteringPlanStrategy(HoodieFlinkMergeOnReadTable table, + HoodieFlinkEngineContext engineContext, + HoodieWriteConfig writeConfig) { + super(table, engineContext, writeConfig); + } + + @Override + protected List filterPartitionPaths(List partitionPaths) { + String beginPartition = getWriteConfig().getProps().getProperty(CONF_BEGIN_PARTITION); + String endPartition = getWriteConfig().getProps().getProperty(CONF_END_PARTITION); + List filteredPartitions = partitionPaths.stream() + .filter(path -> path.compareTo(beginPartition) >= 0 && path.compareTo(endPartition) <= 0) + .collect(Collectors.toList()); + LOG.info("Filtered to the following partitions: " + filteredPartitions); + return filteredPartitions; + } +} diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategy.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategy.java new file mode 100644 index 0000000000000..8347da6014af8 --- /dev/null +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategy.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client.clustering.plan.strategy; + +import org.apache.hudi.avro.model.HoodieClusteringGroup; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.common.HoodieFlinkEngineContext; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable; +import org.apache.hudi.table.HoodieFlinkMergeOnReadTable; +import org.apache.hudi.table.action.cluster.strategy.PartitionAwareClusteringPlanStrategy; + +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; + +import static org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS; + +/** + * Clustering Strategy based on following. + * 1) Creates clustering groups based on max size allowed per group. + * 2) Excludes files that are greater than 'small.file.limit' from clustering plan. + */ +public class FlinkSizeBasedClusteringPlanStrategy> + extends PartitionAwareClusteringPlanStrategy>, List, List> { + private static final Logger LOG = LogManager.getLogger(FlinkSizeBasedClusteringPlanStrategy.class); + + public FlinkSizeBasedClusteringPlanStrategy(HoodieFlinkCopyOnWriteTable table, + HoodieFlinkEngineContext engineContext, + HoodieWriteConfig writeConfig) { + super(table, engineContext, writeConfig); + } + + public FlinkSizeBasedClusteringPlanStrategy(HoodieFlinkMergeOnReadTable table, + HoodieFlinkEngineContext engineContext, + HoodieWriteConfig writeConfig) { + super(table, engineContext, writeConfig); + } + + @Override + protected Stream buildClusteringGroupsForPartition(String partitionPath, List fileSlices) { + HoodieWriteConfig writeConfig = getWriteConfig(); + + List, Integer>> fileSliceGroups = new ArrayList<>(); + List currentGroup = new ArrayList<>(); + long totalSizeSoFar = 0; + + for (FileSlice currentSlice : fileSlices) { + // check if max size is reached and create new group, if needed. + // in now, every clustering group out put is 1 file group. + if (totalSizeSoFar >= writeConfig.getClusteringTargetFileMaxBytes() && !currentGroup.isEmpty()) { + LOG.info("Adding one clustering group " + totalSizeSoFar + " max bytes: " + + writeConfig.getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size()); + fileSliceGroups.add(Pair.of(currentGroup, 1)); + currentGroup = new ArrayList<>(); + totalSizeSoFar = 0; + } + + // Add to the current file-group + currentGroup.add(currentSlice); + // assume each file group size is ~= parquet.max.file.size + totalSizeSoFar += currentSlice.getBaseFile().isPresent() ? 
currentSlice.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize(); + } + + if (!currentGroup.isEmpty()) { + fileSliceGroups.add(Pair.of(currentGroup, 1)); + } + + return fileSliceGroups.stream().map(fileSliceGroup -> + HoodieClusteringGroup.newBuilder() + .setSlices(getFileSliceInfo(fileSliceGroup.getLeft())) + .setNumOutputFileGroups(fileSliceGroup.getRight()) + .setMetrics(buildMetrics(fileSliceGroup.getLeft())) + .build()); + } + + @Override + protected Map getStrategyParams() { + Map params = new HashMap<>(); + if (!StringUtils.isNullOrEmpty(getWriteConfig().getClusteringSortColumns())) { + params.put(PLAN_STRATEGY_SORT_COLUMNS.key(), getWriteConfig().getClusteringSortColumns()); + } + return params; + } + + @Override + protected List filterPartitionPaths(List partitionPaths) { + return partitionPaths; + } + + @Override + protected Stream getFileSlicesEligibleForClustering(final String partition) { + return super.getFileSlicesEligibleForClustering(partition) + // Only files that have basefile size smaller than small file size are eligible. + .filter(slice -> slice.getBaseFile().map(HoodieBaseFile::getFileSize).orElse(0L) < getWriteConfig().getClusteringSmallFileLimit()); + } + + private int getNumberOfOutputFileGroups(long groupSize, long targetFileSize) { + return (int) Math.ceil(groupSize / (double) targetFileSize); + } +} diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java index 9ab633f9e3b37..0e5f1c26e32f4 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java @@ -55,6 +55,7 @@ import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata; import org.apache.hudi.table.action.clean.CleanActionExecutor; import org.apache.hudi.table.action.clean.CleanPlanActionExecutor; +import org.apache.hudi.table.action.cluster.ClusteringPlanActionExecutor; import org.apache.hudi.table.action.commit.FlinkDeleteCommitActionExecutor; import org.apache.hudi.table.action.commit.FlinkInsertCommitActionExecutor; import org.apache.hudi.table.action.commit.FlinkInsertOverwriteCommitActionExecutor; @@ -286,7 +287,7 @@ public HoodieWriteMetadata> compact( @Override public Option scheduleClustering(final HoodieEngineContext context, final String instantTime, final Option> extraMetadata) { - throw new HoodieNotSupportedException("Clustering is not supported on a Flink CopyOnWrite table"); + return new ClusteringPlanActionExecutor<>(context, config,this, instantTime, extraMetadata).execute(); } @Override diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java index eb3d4ef312e99..b9e466485f209 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java @@ -49,7 +49,7 @@ public JavaCustomColumnsSortPartitioner(String[] columnNames, Schema schema, boo @Override public List> repartitionRecords( - List> records, int outputSparkPartitions) { + List> records, int outputPartitions) { 
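+    // sorts the whole record list by the configured sort columns; this in-memory Java implementation returns a single sorted list and does not use outputPartitions to split the data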
return records.stream().sorted((o1, o2) -> { Object values1 = HoodieAvroUtils.getRecordColumnValues(o1, sortColumnNames, schema, consistentLogicalTimestampEnabled); Object values2 = HoodieAvroUtils.getRecordColumnValues(o2, sortColumnNames, schema, consistentLogicalTimestampEnabled); diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaGlobalSortPartitioner.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaGlobalSortPartitioner.java index fded0ffab51bd..d272849a19f28 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaGlobalSortPartitioner.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaGlobalSortPartitioner.java @@ -37,7 +37,7 @@ public class JavaGlobalSortPartitioner @Override public List> repartitionRecords(List> records, - int outputSparkPartitions) { + int outputPartitions) { // Now, sort the records and line them up nicely for loading. records.sort(new Comparator() { @Override diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index 729f0147b5940..3de4bd4f757b8 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -18,6 +18,7 @@ package org.apache.hudi.configuration; +import org.apache.hudi.client.clustering.plan.strategy.FlinkRecentDaysClusteringPlanStrategy; import org.apache.hudi.common.config.ConfigClassProperty; import org.apache.hudi.common.config.ConfigGroups; import org.apache.hudi.common.config.HoodieConfig; @@ -583,6 +584,72 @@ private FlinkOptions() { .defaultValue(40)// default min 40 commits .withDescription("Min number of commits to keep before archiving older commits into a sequential log, default 40"); + // ------------------------------------------------------------------------ + // Clustering Options + // ------------------------------------------------------------------------ + + public static final ConfigOption CLUSTERING_SCHEDULE_ENABLED = ConfigOptions + .key("clustering.schedule.enabled") + .booleanType() + .defaultValue(false) // default false for pipeline + .withDescription("Schedule the cluster plan, default false"); + + public static final ConfigOption CLUSTERING_DELTA_COMMITS = ConfigOptions + .key("clustering.delta_commits") + .intType() + .defaultValue(4) + .withDescription("Max delta commits needed to trigger clustering, default 4 commits"); + + public static final ConfigOption CLUSTERING_TASKS = ConfigOptions + .key("clustering.tasks") + .intType() + .defaultValue(4) + .withDescription("Parallelism of tasks that do actual clustering, default is 4"); + + public static final ConfigOption CLUSTERING_TARGET_PARTITIONS = ConfigOptions + .key("clustering.plan.strategy.daybased.lookback.partitions") + .intType() + .defaultValue(2) + .withDescription("Number of partitions to list to create ClusteringPlan, default is 2"); + + public static final ConfigOption CLUSTERING_PLAN_STRATEGY_CLASS = ConfigOptions + .key("clustering.plan.strategy.class") + .stringType() + .defaultValue(FlinkRecentDaysClusteringPlanStrategy.class.getName()) + .withDescription("Config to provide a strategy class (subclass of ClusteringPlanStrategy) to create clustering plan " + + "i.e select what file groups 
are being clustered. Default strategy, looks at the last N (determined by " + + CLUSTERING_TARGET_PARTITIONS.key() + ") day based partitions picks the small file slices within those partitions."); + + public static final ConfigOption CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES = ConfigOptions + .key("clustering.plan.strategy.target.file.max.bytes") + .intType() + .defaultValue(1024 * 1024 * 1024) // default 1 GB + .withDescription("Each group can produce 'N' (CLUSTERING_MAX_GROUP_SIZE/CLUSTERING_TARGET_FILE_SIZE) output file groups, default 1 GB"); + + public static final ConfigOption CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT = ConfigOptions + .key("clustering.plan.strategy.small.file.limit") + .intType() + .defaultValue(600) // default 600 MB + .withDescription("Files smaller than the size specified here are candidates for clustering, default 600 MB"); + + public static final ConfigOption CLUSTERING_PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST = ConfigOptions + .key("clustering.plan.strategy.daybased.skipfromlatest.partitions") + .intType() + .defaultValue(0) + .withDescription("Number of partitions to skip from latest when choosing partitions to create ClusteringPlan"); + + public static final ConfigOption CLUSTERING_SORT_COLUMNS = ConfigOptions + .key("clustering.plan.strategy.sort.columns") + .stringType() + .noDefaultValue() + .withDescription("Columns to sort the data by when clustering"); + + public static final ConfigOption CLUSTERING_MAX_NUM_GROUPS = ConfigOptions + .key("clustering.plan.strategy.max.num.groups") + .intType() + .defaultValue(30) + .withDescription("Maximum number of groups to create as part of ClusteringPlan. Increasing groups will increase parallelism, default is 30"); + // ------------------------------------------------------------------------ // Hive Sync Options // ------------------------------------------------------------------------ diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/sort/SortOperatorGen.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/sort/SortOperatorGen.java index 4d3fc08efe197..b5599886a9d0b 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/sort/SortOperatorGen.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/sort/SortOperatorGen.java @@ -48,7 +48,7 @@ public OneInputStreamOperator createSortOperator() { codeGen.generateRecordComparator("SortComparator")); } - private SortCodeGenerator createSortCodeGenerator() { + public SortCodeGenerator createSortCodeGenerator() { SortSpec.SortSpecBuilder builder = SortSpec.builder(); IntStream.range(0, sortIndices.length).forEach(i -> builder.addField(i, true, true)); return new SortCodeGenerator(tableConfig, rowType, builder.build()); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitEvent.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitEvent.java new file mode 100644 index 0000000000000..30a8fbed3fafd --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitEvent.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink.clustering; + +import org.apache.hudi.client.WriteStatus; + +import java.io.Serializable; +import java.util.List; + +/** + * Represents a commit event from the clustering task {@link ClusteringFunction}. + */ +public class ClusteringCommitEvent implements Serializable { + private static final long serialVersionUID = 1L; + + /** + * The clustering commit instant time. + */ + private String instant; + /** + * The write statuses. + */ + private List writeStatuses; + /** + * The clustering task identifier. + */ + private int taskID; + + public ClusteringCommitEvent() { + } + + public ClusteringCommitEvent(String instant, List writeStatuses, int taskID) { + this.instant = instant; + this.writeStatuses = writeStatuses; + this.taskID = taskID; + } + + public void setInstant(String instant) { + this.instant = instant; + } + + public void setWriteStatuses(List writeStatuses) { + this.writeStatuses = writeStatuses; + } + + public void setTaskID(int taskID) { + this.taskID = taskID; + } + + public String getInstant() { + return instant; + } + + public List getWriteStatuses() { + return writeStatuses; + } + + public int getTaskID() { + return taskID; + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java new file mode 100644 index 0000000000000..bc87270a49f1b --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.sink.clustering; + +import org.apache.hudi.avro.model.HoodieClusteringGroup; +import org.apache.hudi.avro.model.HoodieClusteringPlan; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieFileGroupId; +import org.apache.hudi.common.model.TableServiceType; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.ClusteringUtils; +import org.apache.hudi.common.util.CommitUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieClusteringException; +import org.apache.hudi.sink.CleanFunction; +import org.apache.hudi.table.HoodieFlinkTable; +import org.apache.hudi.table.action.HoodieWriteMetadata; +import org.apache.hudi.util.StreamerUtil; + +import org.apache.flink.configuration.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Function to check and commit the clustering action. + * + *
<p>
Each time after receiving a clustering commit event {@link ClusteringCommitEvent}, + * it loads and checks the clustering plan {@link org.apache.hudi.avro.model.HoodieClusteringPlan}; + * if all the clustering operations {@link org.apache.hudi.common.model.ClusteringOperation} + * of the plan are finished, it tries to commit the clustering action. + * + *
<p>
It also inherits the {@link CleanFunction} cleaning ability. This is needed because + * the SQL API does not allow multiple sinks in one table sink provider. + */ +public class ClusteringCommitSink extends CleanFunction { + private static final Logger LOG = LoggerFactory.getLogger(ClusteringCommitSink.class); + + /** + * Config options. + */ + private final Configuration conf; + + private transient HoodieFlinkTable table; + + /** + * Buffer to collect the event from each clustering task {@code ClusteringFunction}. + * The key is the instant time. + */ + private transient Map> commitBuffer; + + public ClusteringCommitSink(Configuration conf) { + super(conf); + this.conf = conf; + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + if (writeClient == null) { + this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext()); + } + this.commitBuffer = new HashMap<>(); + this.table = writeClient.getHoodieTable(); + } + + @Override + public void invoke(ClusteringCommitEvent event, Context context) throws Exception { + final String instant = event.getInstant(); + commitBuffer.computeIfAbsent(instant, k -> new ArrayList<>()) + .add(event); + commitIfNecessary(instant, commitBuffer.get(instant)); + } + + /** + * Condition to commit: the commit buffer has equal size with the clustering plan operations + * and all the clustering commit event {@link ClusteringCommitEvent} has the same clustering instant time. + * + * @param instant Clustering commit instant time + * @param events Commit events ever received for the instant + */ + private void commitIfNecessary(String instant, List events) { + HoodieInstant clusteringInstant = HoodieTimeline.getReplaceCommitInflightInstant(instant); + Option> clusteringPlanOption = ClusteringUtils.getClusteringPlan( + StreamerUtil.createMetaClient(this.conf), clusteringInstant); + HoodieClusteringPlan clusteringPlan = clusteringPlanOption.get().getRight(); + boolean isReady = clusteringPlan.getInputGroups().size() == events.size(); + if (!isReady) { + return; + } + List statuses = events.stream() + .map(ClusteringCommitEvent::getWriteStatuses) + .flatMap(Collection::stream) + .collect(Collectors.toList()); + + HoodieWriteMetadata> writeMetadata = new HoodieWriteMetadata<>(); + writeMetadata.setWriteStatuses(statuses); + writeMetadata.setWriteStats(statuses.stream().map(WriteStatus::getStat).collect(Collectors.toList())); + writeMetadata.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(clusteringPlan, writeMetadata)); + validateWriteResult(clusteringPlan, instant, writeMetadata); + if (!writeMetadata.getCommitMetadata().isPresent()) { + HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata( + writeMetadata.getWriteStats().get(), + writeMetadata.getPartitionToReplaceFileIds(), + Option.empty(), + WriteOperationType.CLUSTER, + this.writeClient.getConfig().getSchema(), + HoodieTimeline.REPLACE_COMMIT_ACTION); + writeMetadata.setCommitMetadata(Option.of(commitMetadata)); + } + // commit the clustering + this.table.getMetaClient().reloadActiveTimeline(); + this.writeClient.completeTableService( + TableServiceType.CLUSTER, writeMetadata.getCommitMetadata().get(), table, instant); + + // reset the status + reset(instant); + } + + private void reset(String instant) { + this.commitBuffer.remove(instant); + } + + /** + * Validate actions taken by clustering. In the first implementation, we validate at least one new file is written. + * But we can extend this to add more validation. E.g. 
number of records read = number of records written etc. + * We can also make these validations in BaseCommitActionExecutor to reuse pre-commit hooks for multiple actions. + */ + private static void validateWriteResult(HoodieClusteringPlan clusteringPlan, String instantTime, HoodieWriteMetadata> writeMetadata) { + if (writeMetadata.getWriteStatuses().isEmpty()) { + throw new HoodieClusteringException("Clustering plan produced 0 WriteStatus for " + instantTime + + " #groups: " + clusteringPlan.getInputGroups().size() + " expected at least " + + clusteringPlan.getInputGroups().stream().mapToInt(HoodieClusteringGroup::getNumOutputFileGroups).sum() + + " write statuses"); + } + } + + private static Map> getPartitionToReplacedFileIds( + HoodieClusteringPlan clusteringPlan, + HoodieWriteMetadata> writeMetadata) { + Set newFilesWritten = writeMetadata.getWriteStats().get().stream() + .map(s -> new HoodieFileGroupId(s.getPartitionPath(), s.getFileId())).collect(Collectors.toSet()); + return ClusteringUtils.getFileGroupsFromClusteringPlan(clusteringPlan) + .filter(fg -> !newFilesWritten.contains(fg)) + .collect(Collectors.groupingBy(HoodieFileGroupId::getPartitionPath, Collectors.mapping(HoodieFileGroupId::getFileId, Collectors.toList()))); + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java new file mode 100644 index 0000000000000..a415ac9d46165 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.sink.clustering; + +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.client.FlinkTaskContextSupplier; +import org.apache.hudi.client.HoodieFlinkWriteClient; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.utils.ConcatenatingIterator; +import org.apache.hudi.common.model.ClusteringGroupInfo; +import org.apache.hudi.common.model.ClusteringOperation; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.log.HoodieFileSliceReader; +import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.exception.HoodieClusteringException; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.io.IOUtils; +import org.apache.hudi.io.storage.HoodieFileReader; +import org.apache.hudi.io.storage.HoodieFileReaderFactory; +import org.apache.hudi.sink.bulk.BulkInsertWriterHelper; +import org.apache.hudi.sink.bulk.sort.SortOperatorGen; +import org.apache.hudi.table.HoodieFlinkTable; +import org.apache.hudi.util.AvroToRowDataConverters; +import org.apache.hudi.util.StreamerUtil; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.GenericRecordBuilder; +import org.apache.avro.generic.IndexedRecord; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.binary.BinaryRowData; +import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator; +import org.apache.flink.table.runtime.generated.NormalizedKeyComputer; +import org.apache.flink.table.runtime.generated.RecordComparator; +import org.apache.flink.table.runtime.operators.TableStreamOperator; +import org.apache.flink.table.runtime.operators.sort.BinaryExternalSorter; +import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer; +import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer; +import org.apache.flink.table.runtime.typeutils.RowDataSerializer; +import org.apache.flink.table.runtime.util.StreamRecordCollector; +import org.apache.flink.table.types.logical.RowType; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Spliterator; +import java.util.Spliterators; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import static org.apache.hudi.table.format.FormatUtils.buildAvroRecordBySchema; + +/** + * Operator to execute the actual clustering task assigned by the clustering plan task. + * In order to execute scalable, the input should shuffle by the clustering event {@link ClusteringPlanEvent}. 
+ */ +public class ClusteringOperator extends TableStreamOperator implements + OneInputStreamOperator, BoundedOneInput { + private static final Logger LOG = LoggerFactory.getLogger(ClusteringOperator.class); + + private final Configuration conf; + private final RowType rowType; + private int taskID; + private transient HoodieWriteConfig writeConfig; + private transient HoodieFlinkTable table; + private transient Schema schema; + private transient Schema readerSchema; + private transient int[] requiredPos; + private transient AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter; + private transient HoodieFlinkWriteClient writeClient; + private transient BulkInsertWriterHelper writerHelper; + private transient String instantTime; + + private transient BinaryExternalSorter sorter; + private transient StreamRecordCollector collector; + private transient BinaryRowDataSerializer binarySerializer; + + public ClusteringOperator(Configuration conf, RowType rowType) { + this.conf = conf; + this.rowType = rowType; + } + + @Override + public void open() throws Exception { + super.open(); + + this.taskID = getRuntimeContext().getIndexOfThisSubtask(); + this.writeConfig = StreamerUtil.getHoodieClientConfig(this.conf); + this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext()); + this.table = writeClient.getHoodieTable(); + + this.schema = StreamerUtil.getTableAvroSchema(table.getMetaClient(), false); + this.readerSchema = HoodieAvroUtils.addMetadataFields(this.schema); + this.requiredPos = getRequiredPositions(); + + this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(rowType); + + ClassLoader cl = getContainingTask().getUserCodeClassLoader(); + + AbstractRowDataSerializer inputSerializer = new BinaryRowDataSerializer(rowType.getFieldCount()); + this.binarySerializer = new BinaryRowDataSerializer(inputSerializer.getArity()); + + NormalizedKeyComputer computer = createSortCodeGenerator().generateNormalizedKeyComputer("SortComputer").newInstance(cl); + RecordComparator comparator = createSortCodeGenerator().generateRecordComparator("SortComparator").newInstance(cl); + + MemoryManager memManager = getContainingTask().getEnvironment().getMemoryManager(); + this.sorter = + new BinaryExternalSorter( + this.getContainingTask(), + memManager, + computeMemorySize(), + this.getContainingTask().getEnvironment().getIOManager(), + inputSerializer, + binarySerializer, + computer, + comparator, + getContainingTask().getJobConfiguration()); + this.sorter.startThreads(); + + collector = new StreamRecordCollector<>(output); + + // register the metrics. + getMetricGroup().gauge("memoryUsedSizeInBytes", (Gauge) sorter::getUsedMemoryInBytes); + getMetricGroup().gauge("numSpillFiles", (Gauge) sorter::getNumSpillFiles); + getMetricGroup().gauge("spillInBytes", (Gauge) sorter::getSpillInBytes); + } + + @Override + public void processElement(StreamRecord element) throws Exception { + ClusteringPlanEvent event = element.getValue(); + final String instantTime = event.getClusteringInstantTime(); + final ClusteringGroupInfo clusteringGroupInfo = event.getClusteringGroupInfo(); + + initWriterHelper(instantTime); + + List clusteringOps = clusteringGroupInfo.getOperations(); + boolean hasLogFiles = clusteringOps.stream().anyMatch(op -> op.getDeltaFilePaths().size() > 0); + + Iterator iterator; + if (hasLogFiles) { + // if there are log files, we read all records into memory for a file group and apply updates. 
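+        // the merged scanner (HoodieMergedLogRecordScanner) applies the delta log records on top of the base file records before they are handed to the sorter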
+ iterator = readRecordsForGroupWithLogs(clusteringOps, instantTime); + } else { + // We want to optimize reading records for case there are no log files. + iterator = readRecordsForGroupBaseFiles(clusteringOps); + } + + RowDataSerializer rowDataSerializer = new RowDataSerializer(rowType); + while (iterator.hasNext()) { + RowData rowData = iterator.next(); + BinaryRowData binaryRowData = rowDataSerializer.toBinaryRow(rowData).copy(); + this.sorter.write(binaryRowData); + } + + BinaryRowData row = binarySerializer.createInstance(); + while ((row = sorter.getIterator().next(row)) != null) { + this.writerHelper.write(row); + } + } + + @Override + public void close() { + if (this.writeClient != null) { + this.writeClient.cleanHandlesGracefully(); + this.writeClient.close(); + } + } + + /** + * End input action for batch source. + */ + public void endInput() { + List writeStatuses = this.writerHelper.getWriteStatuses(this.taskID); + collector.collect(new ClusteringCommitEvent(instantTime, writeStatuses, this.taskID)); + } + + // ------------------------------------------------------------------------- + // Utilities + // ------------------------------------------------------------------------- + + private void initWriterHelper(String clusteringInstantTime) { + if (this.writerHelper == null) { + this.writerHelper = new BulkInsertWriterHelper(this.conf, this.table, this.writeConfig, + clusteringInstantTime, this.taskID, getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getAttemptNumber(), + this.rowType); + this.instantTime = clusteringInstantTime; + } + } + + /** + * Read records from baseFiles, apply updates and convert to Iterator. + */ + @SuppressWarnings("unchecked") + private Iterator readRecordsForGroupWithLogs(List clusteringOps, String instantTime) { + List> recordIterators = new ArrayList<>(); + + long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new FlinkTaskContextSupplier(null), writeConfig); + LOG.info("MaxMemoryPerCompaction run as part of clustering => " + maxMemoryPerCompaction); + + for (ClusteringOperation clusteringOp : clusteringOps) { + try { + Option baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath()) + ? Option.empty() + : Option.of(HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath()))); + HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder() + .withFileSystem(table.getMetaClient().getFs()) + .withBasePath(table.getMetaClient().getBasePath()) + .withLogFilePaths(clusteringOp.getDeltaFilePaths()) + .withReaderSchema(readerSchema) + .withLatestInstantTime(instantTime) + .withMaxMemorySizeInBytes(maxMemoryPerCompaction) + .withReadBlocksLazily(writeConfig.getCompactionLazyBlockReadEnabled()) + .withReverseReader(writeConfig.getCompactionReverseLogReadEnabled()) + .withBufferSize(writeConfig.getMaxDFSStreamBufferSize()) + .withSpillableMapBasePath(writeConfig.getSpillableMapBasePath()) + .build(); + + HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig(); + HoodieFileSliceReader hoodieFileSliceReader = HoodieFileSliceReader.getFileSliceReader(baseFileReader, scanner, readerSchema, + tableConfig.getPayloadClass(), + tableConfig.getPreCombineField(), + tableConfig.populateMetaFields() ? 
Option.empty() : Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(), + tableConfig.getPartitionFieldProp()))); + + recordIterators.add(StreamSupport.stream(Spliterators.spliteratorUnknownSize(hoodieFileSliceReader, Spliterator.NONNULL), false).map(hoodieRecord -> { + try { + return this.transform((IndexedRecord) hoodieRecord.getData().getInsertValue(readerSchema).get()); + } catch (IOException e) { + throw new HoodieIOException("Failed to read next record", e); + } + }).iterator()); + } catch (IOException e) { + throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath() + + " and " + clusteringOp.getDeltaFilePaths(), e); + } + } + + return new ConcatenatingIterator<>(recordIterators); + } + + /** + * Read records from baseFiles and get iterator. + */ + private Iterator readRecordsForGroupBaseFiles(List clusteringOps) { + List> iteratorsForPartition = clusteringOps.stream().map(clusteringOp -> { + Iterable indexedRecords = () -> { + try { + return HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath())).getRecordIterator(readerSchema); + } catch (IOException e) { + throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath() + + " and " + clusteringOp.getDeltaFilePaths(), e); + } + }; + + return StreamSupport.stream(indexedRecords.spliterator(), false).map(this::transform).iterator(); + }).collect(Collectors.toList()); + + return new ConcatenatingIterator<>(iteratorsForPartition); + } + + /** + * Transform IndexedRecord into HoodieRecord. + */ + private RowData transform(IndexedRecord indexedRecord) { + GenericRecord record = buildAvroRecordBySchema(indexedRecord, schema, requiredPos, new GenericRecordBuilder(schema)); + return (RowData) avroToRowDataConverter.convert(record); + } + + private int[] getRequiredPositions() { + final List fieldNames = readerSchema.getFields().stream().map(Schema.Field::name).collect(Collectors.toList()); + return schema.getFields().stream() + .map(field -> fieldNames.indexOf(field.name())) + .mapToInt(i -> i) + .toArray(); + } + + private SortCodeGenerator createSortCodeGenerator() { + SortOperatorGen sortOperatorGen = new SortOperatorGen(rowType, + conf.getString(FlinkOptions.CLUSTERING_SORT_COLUMNS).split(",")); + return sortOperatorGen.createSortCodeGenerator(); + } + + @Override + public void setKeyContextElement(StreamRecord record) throws Exception { + OneInputStreamOperator.super.setKeyContextElement(record); + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanEvent.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanEvent.java new file mode 100644 index 0000000000000..c82075877bcf3 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanEvent.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink.clustering; + +import org.apache.hudi.common.model.ClusteringGroupInfo; + +import java.io.Serializable; +import java.util.Map; + +/** + * Represents a cluster command from the clustering plan task {@link ClusteringPlanSourceFunction}. + */ +public class ClusteringPlanEvent implements Serializable { + private static final long serialVersionUID = 1L; + + private String clusteringInstantTime; + + private ClusteringGroupInfo clusteringGroupInfo; + + private Map strategyParams; + + public ClusteringPlanEvent() { + } + + public ClusteringPlanEvent( + String instantTime, + ClusteringGroupInfo clusteringGroupInfo, + Map strategyParams) { + this.clusteringInstantTime = instantTime; + this.clusteringGroupInfo = clusteringGroupInfo; + this.strategyParams = strategyParams; + } + + public void setClusteringInstantTime(String clusteringInstantTime) { + this.clusteringInstantTime = clusteringInstantTime; + } + + public void setClusteringGroupInfo(ClusteringGroupInfo clusteringGroupInfo) { + this.clusteringGroupInfo = clusteringGroupInfo; + } + + public void setStrategyParams(Map strategyParams) { + this.strategyParams = strategyParams; + } + + public String getClusteringInstantTime() { + return clusteringInstantTime; + } + + public ClusteringGroupInfo getClusteringGroupInfo() { + return clusteringGroupInfo; + } + + public Map getStrategyParams() { + return strategyParams; + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanSourceFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanSourceFunction.java new file mode 100644 index 0000000000000..a3db2d41c8371 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanSourceFunction.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.sink.clustering; + +import org.apache.hudi.avro.model.HoodieClusteringGroup; +import org.apache.hudi.avro.model.HoodieClusteringPlan; +import org.apache.hudi.common.model.ClusteringGroupInfo; +import org.apache.hudi.common.model.ClusteringOperation; +import org.apache.hudi.common.table.timeline.HoodieInstant; + +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Flink hudi clustering source function. + * + *

<p>This function reads the clustering plan as {@link ClusteringOperation}s and then assigns the clustering task
+ * event {@link ClusteringPlanEvent} to the downstream operators.
+ *
+ * <p>The clustering instant time is specified explicitly with the following strategies:
+ *
+ * <ul>
+ *   <li>If the timeline has no inflight instants,
+ *   use {@link org.apache.hudi.common.table.timeline.HoodieActiveTimeline#createNewInstantTime()}
+ *   as the instant time;</li>
+ *   <li>If the timeline has inflight instants,
+ *   use the median instant time between [last complete instant time, earliest inflight instant time]
+ *   as the instant time.</li>
+ * </ul>
+ */ +public class ClusteringPlanSourceFunction extends AbstractRichFunction implements SourceFunction { + + protected static final Logger LOG = LoggerFactory.getLogger(ClusteringPlanSourceFunction.class); + + /** + * The clustering plan. + */ + private final HoodieClusteringPlan clusteringPlan; + + /** + * Hoodie instant. + */ + private final HoodieInstant instant; + + public ClusteringPlanSourceFunction(HoodieInstant instant, HoodieClusteringPlan clusteringPlan) { + this.instant = instant; + this.clusteringPlan = clusteringPlan; + } + + @Override + public void open(Configuration parameters) throws Exception { + // no operation + } + + @Override + public void run(SourceContext sourceContext) throws Exception { + for (HoodieClusteringGroup clusteringGroup : clusteringPlan.getInputGroups()) { + LOG.info("ClusteringPlanSourceFunction cluster " + clusteringGroup + " files"); + sourceContext.collect(new ClusteringPlanEvent(this.instant.getTimestamp(), ClusteringGroupInfo.create(clusteringGroup), clusteringPlan.getStrategy().getStrategyParams())); + } + } + + @Override + public void close() throws Exception { + // no operation + } + + @Override + public void cancel() { + // no operation + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/FlinkClusteringConfig.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/FlinkClusteringConfig.java new file mode 100644 index 0000000000000..e87a7d6752b6e --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/FlinkClusteringConfig.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink.clustering; + +import org.apache.hudi.configuration.FlinkOptions; + +import com.beust.jcommander.Parameter; +import org.apache.flink.configuration.Configuration; + +/** + * Configurations for Hoodie Flink clustering. 
+ */ +public class FlinkClusteringConfig extends Configuration { + + @Parameter(names = {"--help", "-h"}, help = true) + public Boolean help = false; + + // ------------------------------------------------------------------------ + // Hudi Write Options + // ------------------------------------------------------------------------ + + @Parameter(names = {"--path"}, description = "Base path for the target hoodie table.", required = true) + public String path; + + // ------------------------------------------------------------------------ + // Clustering Options + // ------------------------------------------------------------------------ + @Parameter(names = {"--clustering-delta-commits"}, description = "Max delta commits needed to trigger clustering, default 4 commits", required = false) + public Integer clusteringDeltaCommits = 1; + + @Parameter(names = {"--clustering-tasks"}, description = "Parallelism of tasks that do actual clustering, default is -1", required = false) + public Integer clusteringTasks = -1; + + @Parameter(names = {"--compaction-max-memory"}, description = "Max memory in MB for compaction spillable map, default 100MB.", required = false) + public Integer compactionMaxMemory = 100; + + @Parameter(names = {"--clean-retain-commits"}, + description = "Number of commits to retain. So data will be retained for num_of_commits * time_between_commits (scheduled).\n" + + "This also directly translates into how much you can incrementally pull on this table, default 10", + required = false) + public Integer cleanRetainCommits = 10; + + @Parameter(names = {"--archive-min-commits"}, + description = "Min number of commits to keep before archiving older commits into a sequential log, default 20.", + required = false) + public Integer archiveMinCommits = 20; + + @Parameter(names = {"--archive-max-commits"}, + description = "Max number of commits to keep before archiving older commits into a sequential log, default 30.", + required = false) + public Integer archiveMaxCommits = 30; + + @Parameter(names = {"--schedule", "-sc"}, description = "Not recommended. 
Schedule the clustering plan in this job.\n"
+      + "There is a risk of losing data when scheduling the clustering outside the writer job.\n"
+      + "Scheduling the clustering in the writer job and letting this job only execute the clustering plan is recommended.\n"
+      + "Default is true", required = false)
+  public Boolean schedule = true;
+
+  @Parameter(names = {"--clean-async-enabled"}, description = "Whether to clean up the old commits immediately on new commits, disabled by default", required = false)
+  public Boolean cleanAsyncEnable = false;
+
+  @Parameter(names = {"--plan-strategy-class"}, description = "Config to provide a strategy class to generate the clustering plan", required = false)
+  public String planStrategyClass = "org.apache.hudi.client.clustering.plan.strategy.FlinkRecentDaysClusteringPlanStrategy";
+
+  @Parameter(names = {"--target-file-max-bytes"}, description = "Each group can produce 'N' (CLUSTERING_MAX_GROUP_SIZE/CLUSTERING_TARGET_FILE_SIZE) output file groups, default 1 GB", required = false)
+  public Integer targetFileMaxBytes = 1024 * 1024 * 1024;
+
+  @Parameter(names = {"--small-file-limit"}, description = "Files smaller than the size specified here are candidates for clustering, default 600 MB", required = false)
+  public Integer smallFileLimit = 600;
+
+  @Parameter(names = {"--skip-from-latest-partitions"}, description = "Number of partitions to skip from latest when choosing partitions to create ClusteringPlan, default 0", required = false)
+  public Integer skipFromLatestPartitions = 0;
+
+  @Parameter(names = {"--sort-columns"}, description = "Columns to sort the data by when clustering.", required = false)
+  public String sortColumns = "";
+
+  @Parameter(names = {"--max-num-groups"}, description = "Maximum number of groups to create as part of ClusteringPlan. Increasing groups will increase parallelism, default 30", required = false)
+  public Integer maxNumGroups = 30;
+
+  @Parameter(names = {"--target-partitions"}, description = "Number of partitions to list to create ClusteringPlan, default 2", required = false)
+  public Integer targetPartitions = 2;
+
+  public static final String SEQ_FIFO = "FIFO";
+  public static final String SEQ_LIFO = "LIFO";
+  @Parameter(names = {"--seq"}, description = "Clustering plan execution sequence, two options are supported:\n"
+      + "1). FIFO: execute the oldest plan first;\n"
+      + "2). LIFO: execute the latest plan first, by default LIFO", required = false)
+  public String clusteringSeq = SEQ_LIFO;
+
+  @Parameter(names = {"--write-partition-url-encode"}, description = "Whether to encode the partition path URL, default false")
+  public Boolean writePartitionUrlEncode = false;
+
+  @Parameter(names = {"--hive-style-partitioning"}, description = "Whether to use Hive style partitioning.\n"
+      + "If set true, the names of partition folders follow <partition_column_name>=<partition_value> format.\n"
+      + "By default false (the names of partition folders are only partition values)")
+  public Boolean hiveStylePartitioning = false;
+
+  /**
+   * Transforms a {@code FlinkClusteringConfig.config} into {@code Configuration}.
+   * The latter is more suitable for the table APIs. It reads all the properties
+   * in the properties file (set by the `--props` option) and the command line options
+   * (set by the `--hoodie-conf` option).
+ */ + public static Configuration toFlinkConfig(FlinkClusteringConfig config) { + Configuration conf = new Configuration(); + + conf.setString(FlinkOptions.PATH, config.path); + conf.setInteger(FlinkOptions.ARCHIVE_MAX_COMMITS, config.archiveMaxCommits); + conf.setInteger(FlinkOptions.ARCHIVE_MIN_COMMITS, config.archiveMinCommits); + conf.setInteger(FlinkOptions.CLEAN_RETAIN_COMMITS, config.cleanRetainCommits); + conf.setInteger(FlinkOptions.COMPACTION_MAX_MEMORY, config.compactionMaxMemory); + conf.setInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS, config.clusteringDeltaCommits); + conf.setInteger(FlinkOptions.CLUSTERING_TASKS, config.clusteringTasks); + conf.setString(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS, config.planStrategyClass); + conf.setInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES, config.targetFileMaxBytes); + conf.setInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT, config.smallFileLimit); + conf.setInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST, config.skipFromLatestPartitions); + conf.setString(FlinkOptions.CLUSTERING_SORT_COLUMNS, config.sortColumns); + conf.setInteger(FlinkOptions.CLUSTERING_MAX_NUM_GROUPS, config.maxNumGroups); + conf.setInteger(FlinkOptions.CLUSTERING_TARGET_PARTITIONS, config.targetPartitions); + conf.setBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED, config.cleanAsyncEnable); + + // use synchronous clustering always + conf.setBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, config.schedule); + + // bulk insert conf + conf.setBoolean(FlinkOptions.URL_ENCODE_PARTITIONING, config.writePartitionUrlEncode); + conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, config.hiveStylePartitioning); + + return conf; + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java new file mode 100644 index 0000000000000..f7c361533a0d9 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.sink.clustering; + +import org.apache.hudi.avro.model.HoodieClusteringPlan; +import org.apache.hudi.client.HoodieFlinkWriteClient; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.ClusteringUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.table.HoodieFlinkTable; +import org.apache.hudi.util.AvroSchemaConverter; +import org.apache.hudi.util.CompactionUtil; +import org.apache.hudi.util.StreamerUtil; + +import com.beust.jcommander.JCommander; +import org.apache.avro.Schema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Flink hudi clustering program that can be executed manually. + */ +public class HoodieFlinkClusteringJob { + + protected static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkClusteringJob.class); + + public static void main(String[] args) throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + FlinkClusteringConfig cfg = new FlinkClusteringConfig(); + JCommander cmd = new JCommander(cfg, null, args); + if (cfg.help || args.length == 0) { + cmd.usage(); + System.exit(1); + } + + Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg); + + // create metaClient + HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf); + + // set table name + conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName()); + + // set table type + conf.setString(FlinkOptions.TABLE_TYPE, metaClient.getTableConfig().getTableType().name()); + + // set record key field + conf.setString(FlinkOptions.RECORD_KEY_FIELD, metaClient.getTableConfig().getRecordKeyFieldProp()); + + // set partition field + conf.setString(FlinkOptions.PARTITION_PATH_FIELD, metaClient.getTableConfig().getPartitionFieldProp()); + + // set table schema + CompactionUtil.setAvroSchema(conf, metaClient); + + HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf); + HoodieFlinkTable table = writeClient.getHoodieTable(); + + // judge whether have operation + // to compute the clustering instant time and do cluster. + if (cfg.schedule) { + String clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime(); + boolean scheduled = writeClient.scheduleClusteringAtInstant(clusteringInstantTime, Option.empty()); + if (!scheduled) { + // do nothing. + LOG.info("No clustering plan for this job "); + return; + } + } + + table.getMetaClient().reloadActiveTimeline(); + + // fetch the instant based on the configured execution sequence + HoodieTimeline timeline = table.getActiveTimeline().filterPendingReplaceTimeline() + .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED); + Option requested = CompactionUtil.isLIFO(cfg.clusteringSeq) ? 
timeline.lastInstant() : timeline.firstInstant();
+    if (!requested.isPresent()) {
+      // do nothing.
+      LOG.info("No clustering plan scheduled, turn on the clustering plan scheduling with the --schedule option");
+      return;
+    }
+
+    HoodieInstant clusteringInstant = requested.get();
+
+    HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant.getTimestamp());
+    if (timeline.containsInstant(inflightInstant)) {
+      LOG.info("Rollback inflight clustering instant: [" + clusteringInstant + "]");
+      writeClient.rollbackInflightClustering(inflightInstant, table);
+      table.getMetaClient().reloadActiveTimeline();
+    }
+
+    // generate clustering plan
+    // should support configurable commit metadata
+    Option<Pair<HoodieInstant, HoodieClusteringPlan>> clusteringPlanOption = ClusteringUtils.getClusteringPlan(
+        table.getMetaClient(), clusteringInstant);
+
+    if (!clusteringPlanOption.isPresent()) {
+      // do nothing.
+      LOG.info("No clustering plan scheduled, turn on the clustering plan scheduling with the --schedule option");
+      return;
+    }
+
+    HoodieClusteringPlan clusteringPlan = clusteringPlanOption.get().getRight();
+
+    if (clusteringPlan == null || (clusteringPlan.getInputGroups() == null)
+        || (clusteringPlan.getInputGroups().isEmpty())) {
+      // No clustering plan, do nothing and return.
+      LOG.info("No clustering plan for instant " + clusteringInstant.getTimestamp());
+      return;
+    }
+
+    HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstant.getTimestamp());
+    HoodieTimeline pendingClusteringTimeline = table.getActiveTimeline().filterPendingReplaceTimeline();
+    if (!pendingClusteringTimeline.containsInstant(instant)) {
+      // This means that the clustering plan was written to the auxiliary path (.tmp)
+      // but not to the meta path (.hoodie); this usually happens when the job crashes
+      // unexpectedly.
+
+      // Clean the clustering plan in the auxiliary path and cancel the clustering.
+
+      LOG.warn("The clustering plan was fetched through the auxiliary path(.tmp) but not the meta path(.hoodie).\n"
+          + "Clean the clustering plan in the auxiliary path and cancel the clustering");
+      CompactionUtil.cleanInstant(table.getMetaClient(), instant);
+      return;
+    }
+
+    // get clusteringParallelism.
+    int clusteringParallelism = conf.getInteger(FlinkOptions.CLUSTERING_TASKS) == -1
+        ?
clusteringPlan.getInputGroups().size() : conf.getInteger(FlinkOptions.CLUSTERING_TASKS); + + // Mark instant as clustering inflight + table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty()); + + final Schema tableAvroSchema = StreamerUtil.getTableAvroSchema(table.getMetaClient(), false); + final DataType rowDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema); + final RowType rowType = (RowType) rowDataType.getLogicalType(); + + // setup configuration + long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout(); + conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout); + + DataStream dataStream = env.addSource(new ClusteringPlanSourceFunction(timeline.lastInstant().get(), clusteringPlan)) + .name("clustering_source") + .uid("uid_clustering_source") + .rebalance() + .transform("clustering_task", + TypeInformation.of(ClusteringCommitEvent.class), + new ClusteringOperator(conf, rowType)) + .setParallelism(clusteringPlan.getInputGroups().size()); + + ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(), + conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L); + + dataStream + .addSink(new ClusteringCommitSink(conf)) + .name("clustering_commit") + .uid("uid_clustering_commit") + .setParallelism(1); + + env.execute("flink_hudi_clustering"); + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java index f82712bca2c2a..e9574dd52bedd 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java @@ -192,7 +192,7 @@ public class FlinkStreamerConfig extends Configuration { public Boolean indexGlobalEnabled = true; @Parameter(names = {"--index-partition-regex"}, - description = "Whether to load partitions in state if partition path matching, default *") + description = "Whether to load partitions in state if partition path matching, default *") public String indexPartitionRegex = ".*"; @Parameter(names = {"--source-avro-schema-path"}, description = "Source avro schema file path, the parsed schema is used for deserialization") diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index b977dfd7c5343..fcffbed54b48f 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -29,6 +29,7 @@ import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; @@ -37,6 +38,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.config.HoodieClusteringConfig; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieMemoryConfig; import org.apache.hudi.config.HoodiePayloadConfig; @@ 
-162,6 +164,17 @@ public static HoodieWriteConfig getHoodieClientConfig( .withPath(conf.getString(FlinkOptions.PATH)) .combineInput(conf.getBoolean(FlinkOptions.PRE_COMBINE), true) .withMergeAllowDuplicateOnInserts(OptionsResolver.insertClustering(conf)) + .withClusteringConfig( + HoodieClusteringConfig.newBuilder() + .withAsyncClustering(conf.getBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED)) + .withClusteringPlanStrategyClass(conf.getString(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS)) + .withClusteringTargetPartitions(conf.getInteger(FlinkOptions.CLUSTERING_TARGET_PARTITIONS)) + .withClusteringMaxNumGroups(conf.getInteger(FlinkOptions.CLUSTERING_MAX_NUM_GROUPS)) + .withClusteringTargetFileMaxBytes(conf.getInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES)) + .withClusteringPlanSmallFileLimit(conf.getInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT) * 1024 * 1024L) + .withClusteringSkipPartitionsFromLatest(conf.getInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST)) + .withAsyncClusteringMaxCommits(conf.getInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS)) + .build()) .withCompactionConfig( HoodieCompactionConfig.newBuilder() .withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS_NAME)) @@ -505,6 +518,11 @@ public static boolean haveSuccessfulCommits(HoodieTableMetaClient metaClient) { * Returns the max compaction memory in bytes with given conf. */ public static long getMaxCompactionMemoryInBytes(Configuration conf) { - return conf.getInteger(FlinkOptions.COMPACTION_MAX_MEMORY) * 1024 * 1024; + return (long) conf.getInteger(FlinkOptions.COMPACTION_MAX_MEMORY) * 1024 * 1024; + } + + public static Schema getTableAvroSchema(HoodieTableMetaClient metaClient, boolean includeMetadataFields) throws Exception { + TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient); + return schemaUtil.getTableAvroSchema(includeMetadataFields); } } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java new file mode 100644 index 0000000000000..ac2ee0be374ea --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.sink.cluster; + +import org.apache.hudi.avro.model.HoodieClusteringPlan; +import org.apache.hudi.client.HoodieFlinkWriteClient; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.ClusteringUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.sink.clustering.ClusteringCommitEvent; +import org.apache.hudi.sink.clustering.ClusteringCommitSink; +import org.apache.hudi.sink.clustering.ClusteringOperator; +import org.apache.hudi.sink.clustering.ClusteringPlanSourceFunction; +import org.apache.hudi.sink.clustering.FlinkClusteringConfig; +import org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob; +import org.apache.hudi.table.HoodieFlinkTable; +import org.apache.hudi.util.AvroSchemaConverter; +import org.apache.hudi.util.CompactionUtil; +import org.apache.hudi.util.StreamerUtil; +import org.apache.hudi.utils.TestConfigurations; +import org.apache.hudi.utils.TestData; +import org.apache.hudi.utils.TestSQL; + +import org.apache.avro.Schema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.table.api.internal.TableEnvironmentImpl; +import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * IT cases for {@link HoodieFlinkClusteringJob}. + */ +public class ITTestHoodieFlinkClustering { + + private static final Map EXPECTED = new HashMap<>(); + + static { + EXPECTED.put("par1", "[id1,par1,id1,Danny,23,1000,par1, id2,par1,id2,Stephen,33,2000,par1]"); + EXPECTED.put("par2", "[id3,par2,id3,Julian,53,3000,par2, id4,par2,id4,Fabian,31,4000,par2]"); + EXPECTED.put("par3", "[id5,par3,id5,Sophia,18,5000,par3, id6,par3,id6,Emma,20,6000,par3]"); + EXPECTED.put("par4", "[id7,par4,id7,Bob,44,7000,par4, id8,par4,id8,Han,56,8000,par4]"); + } + + @TempDir + File tempFile; + + @Test + public void testHoodieFlinkClustering() throws Exception { + // Create hoodie table and insert into data. 
+ EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build(); + TableEnvironment tableEnv = TableEnvironmentImpl.create(settings); + tableEnv.getConfig().getConfiguration() + .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1); + Map options = new HashMap<>(); + options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); + + // use append mode + options.put(FlinkOptions.OPERATION.key(), WriteOperationType.INSERT.value()); + options.put(FlinkOptions.INSERT_CLUSTER.key(), "false"); + + String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options); + tableEnv.executeSql(hoodieTableDDL); + tableEnv.executeSql(TestSQL.INSERT_T1).await(); + + // wait for the asynchronous commit to finish + TimeUnit.SECONDS.sleep(3); + + // Make configuration and setAvroSchema. + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + FlinkClusteringConfig cfg = new FlinkClusteringConfig(); + cfg.path = tempFile.getAbsolutePath(); + cfg.targetPartitions = 4; + Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg); + + // create metaClient + HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf); + + // set the table name + conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName()); + conf.setString(FlinkOptions.TABLE_TYPE, metaClient.getTableConfig().getTableType().name()); + + // set record key field + conf.setString(FlinkOptions.RECORD_KEY_FIELD, metaClient.getTableConfig().getRecordKeyFieldProp()); + // set partition field + conf.setString(FlinkOptions.PARTITION_PATH_FIELD, metaClient.getTableConfig().getPartitionFieldProp()); + + long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout(); + conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout); + conf.setString(FlinkOptions.PARTITION_PATH_FIELD, "partition"); + + // set table schema + CompactionUtil.setAvroSchema(conf, metaClient); + + // judge whether have operation + // To compute the clustering instant time and do clustering. 
+ String clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime(); + + HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, null); + HoodieFlinkTable table = writeClient.getHoodieTable(); + + boolean scheduled = writeClient.scheduleClusteringAtInstant(clusteringInstantTime, Option.empty()); + + assertTrue(scheduled, "The clustering plan should be scheduled"); + + // fetch the instant based on the configured execution sequence + table.getMetaClient().reloadActiveTimeline(); + HoodieTimeline timeline = table.getActiveTimeline().filterPendingReplaceTimeline() + .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED); + + // generate clustering plan + // should support configurable commit metadata + Option> clusteringPlanOption = ClusteringUtils.getClusteringPlan( + table.getMetaClient(), timeline.lastInstant().get()); + + HoodieClusteringPlan clusteringPlan = clusteringPlanOption.get().getRight(); + + // Mark instant as clustering inflight + HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstantTime); + table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty()); + + final Schema tableAvroSchema = StreamerUtil.getTableAvroSchema(table.getMetaClient(), false); + final DataType rowDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema); + final RowType rowType = (RowType) rowDataType.getLogicalType(); + + DataStream dataStream = env.addSource(new ClusteringPlanSourceFunction(timeline.lastInstant().get(), clusteringPlan)) + .name("clustering_source") + .uid("uid_clustering_source") + .rebalance() + .transform("clustering_task", + TypeInformation.of(ClusteringCommitEvent.class), + new ClusteringOperator(conf, rowType)) + .setParallelism(clusteringPlan.getInputGroups().size()); + + ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(), + conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L); + + dataStream + .addSink(new ClusteringCommitSink(conf)) + .name("clustering_commit") + .uid("uid_clustering_commit") + .setParallelism(1); + + env.execute("flink_hudi_clustering"); + TestData.checkWrittenData(tempFile, EXPECTED, 4); + } +}
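
As a side note for reviewers, here is a minimal, illustrative sketch (not part of the patch) of how the new FlinkClusteringConfig-to-Configuration plumbing added above can be driven programmatically, mirroring what the standalone job's main() does with JCommander; the table path and sort column values are hypothetical placeholders:

import org.apache.flink.configuration.Configuration;

import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.clustering.FlinkClusteringConfig;

public class ClusteringConfigSketch {
  public static void main(String[] args) {
    // Populate the JCommander-backed config directly instead of parsing CLI args.
    FlinkClusteringConfig cfg = new FlinkClusteringConfig();
    cfg.path = "/tmp/hoodie/t1";   // hypothetical table base path
    cfg.sortColumns = "uuid";      // hypothetical clustering sort column
    cfg.targetPartitions = 4;

    // Translate the CLI-style options into the Flink Configuration
    // consumed by the clustering operators.
    Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg);

    // The sort columns end up under FlinkOptions.CLUSTERING_SORT_COLUMNS.
    System.out.println(conf.getString(FlinkOptions.CLUSTERING_SORT_COLUMNS));
  }
}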