diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java
index a348eb0ed3a76..3379d16f4c035 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java
@@ -25,7 +25,6 @@
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.util.Option;
@@ -77,7 +76,7 @@ public void completeInflightCompaction(HoodieTable table, String compactionCommi
     HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
     try {
       activeTimeline.transitionCompactionInflightToComplete(
-          new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime),
+          HoodieTimeline.getCompactionInflightInstant(compactionCommitTime),
           Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
     } catch (IOException e) {
       throw new HoodieCompactionException(
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 524758a675cfc..3cc6a9ea8776b 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -358,7 +358,7 @@ public void completeCompaction(
       String compactionCommitTime) {
     this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction: " + config.getTableName());
     List<HoodieWriteStat> writeStats = metadata.getWriteStats();
-    final HoodieInstant compactionInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime);
+    final HoodieInstant compactionInstant = HoodieTimeline.getCompactionInflightInstant(compactionCommitTime);
     try {
       this.txnManager.beginTransaction(Option.of(compactionInstant), Option.empty());
       finalizeWrite(table, compactionCommitTime, writeStats);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index df82e75db92ca..7f9ec05e3c5eb 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -305,7 +305,7 @@ protected void completeCompaction(HoodieCommitMetadata metadata,
       String compactionCommitTime) {
     this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction: " + config.getTableName());
     List<HoodieWriteStat> writeStats = metadata.getWriteStats();
-    final HoodieInstant compactionInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime);
+    final HoodieInstant compactionInstant = HoodieTimeline.getCompactionInflightInstant(compactionCommitTime);
     try {
       this.txnManager.beginTransaction(Option.of(compactionInstant), Option.empty());
       finalizeWrite(table, compactionCommitTime, writeStats);
@@ -382,7 +382,7 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata,
           + writeStats.stream().filter(s -> s.getTotalWriteErrors() > 0L).map(s -> s.getFileId()).collect(Collectors.joining(",")));
     }
 
-    final HoodieInstant clusteringInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringCommitTime);
+    final HoodieInstant clusteringInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringCommitTime);
     try {
       this.txnManager.beginTransaction(Option.of(clusteringInstant), Option.empty());
@@ -393,7 +393,7 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata,
 
       LOG.info("Committing Clustering " + clusteringCommitTime + ". Finished with result " + metadata);
       table.getActiveTimeline().transitionReplaceInflightToComplete(
-          HoodieTimeline.getReplaceCommitInflightInstant(clusteringCommitTime),
+          clusteringInstant,
           Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
     } catch (Exception e) {
       throw new HoodieClusteringException("unable to transition clustering inflight to complete: " + clusteringCommitTime, e);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java
index 5b5a6432e633c..d4ad2cae1fe18 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java
@@ -40,6 +40,9 @@ public class HoodieLogFile implements Serializable {
   public static final String DELTA_EXTENSION = ".log";
   public static final Integer LOGFILE_BASE_VERSION = 1;
 
+  private static final Comparator<HoodieLogFile> LOG_FILE_COMPARATOR = new LogFileComparator();
+  private static final Comparator<HoodieLogFile> LOG_FILE_COMPARATOR_REVERSED = new LogFileComparator().reversed();
+
   private transient FileStatus fileStatus;
   private final String pathStr;
   private long fileLen;
@@ -129,11 +132,11 @@ public HoodieLogFile rollOver(FileSystem fs, String logWriteToken) throws IOExce
   }
 
   public static Comparator<HoodieLogFile> getLogFileComparator() {
-    return new LogFileComparator();
+    return LOG_FILE_COMPARATOR;
   }
 
   public static Comparator<HoodieLogFile> getReverseLogFileComparator() {
-    return new LogFileComparator().reversed();
+    return LOG_FILE_COMPARATOR_REVERSED;
   }
 
   /**
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/HadoopConfigurations.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/HadoopConfigurations.java
index 7784e7caaae2a..72f20311504d0 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/HadoopConfigurations.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/HadoopConfigurations.java
@@ -23,10 +23,16 @@
 
 import java.util.Map;
 
+/**
+ * Utilities for fetching hadoop configurations.
+ */
 public class HadoopConfigurations {
   private static final String HADOOP_PREFIX = "hadoop.";
   private static final String PARQUET_PREFIX = "parquet.";
 
+  /**
+   * Creates a merged hadoop configuration with given flink configuration and hadoop configuration.
+   */
   public static org.apache.hadoop.conf.Configuration getParquetConf(
       org.apache.flink.configuration.Configuration options,
       org.apache.hadoop.conf.Configuration hadoopConf) {
@@ -37,12 +43,12 @@ public static org.apache.hadoop.conf.Configuration getParquetConf(
   }
 
   /**
-   * Create a new hadoop configuration that is initialized with the given flink configuration.
+   * Creates a new hadoop configuration that is initialized with the given flink configuration.
    */
   public static org.apache.hadoop.conf.Configuration getHadoopConf(Configuration conf) {
     org.apache.hadoop.conf.Configuration hadoopConf = FlinkClientUtil.getHadoopConf();
     Map<String, String> options = FlinkOptions.getPropertiesWithPrefix(conf.toMap(), HADOOP_PREFIX);
-    options.forEach((k, v) -> hadoopConf.set(k, v));
+    options.forEach(hadoopConf::set);
     return hadoopConf;
   }
 }