Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.Option;
Expand Down Expand Up @@ -77,7 +76,7 @@ public void completeInflightCompaction(HoodieTable table, String compactionCommi
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
try {
activeTimeline.transitionCompactionInflightToComplete(
new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime),
HoodieTimeline.getCompactionInflightInstant(compactionCommitTime),
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
} catch (IOException e) {
throw new HoodieCompactionException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ public void completeCompaction(
String compactionCommitTime) {
this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction: " + config.getTableName());
List<HoodieWriteStat> writeStats = metadata.getWriteStats();
final HoodieInstant compactionInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime);
final HoodieInstant compactionInstant = HoodieTimeline.getCompactionInflightInstant(compactionCommitTime);
try {
this.txnManager.beginTransaction(Option.of(compactionInstant), Option.empty());
finalizeWrite(table, compactionCommitTime, writeStats);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ protected void completeCompaction(HoodieCommitMetadata metadata,
String compactionCommitTime) {
this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction: " + config.getTableName());
List<HoodieWriteStat> writeStats = metadata.getWriteStats();
final HoodieInstant compactionInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime);
final HoodieInstant compactionInstant = HoodieTimeline.getCompactionInflightInstant(compactionCommitTime);
try {
this.txnManager.beginTransaction(Option.of(compactionInstant), Option.empty());
finalizeWrite(table, compactionCommitTime, writeStats);
Expand Down Expand Up @@ -382,7 +382,7 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata,
+ writeStats.stream().filter(s -> s.getTotalWriteErrors() > 0L).map(s -> s.getFileId()).collect(Collectors.joining(",")));
}

final HoodieInstant clusteringInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringCommitTime);
final HoodieInstant clusteringInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringCommitTime);
try {
this.txnManager.beginTransaction(Option.of(clusteringInstant), Option.empty());

Expand All @@ -393,7 +393,7 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata,
LOG.info("Committing Clustering " + clusteringCommitTime + ". Finished with result " + metadata);

table.getActiveTimeline().transitionReplaceInflightToComplete(
HoodieTimeline.getReplaceCommitInflightInstant(clusteringCommitTime),
clusteringInstant,
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
} catch (Exception e) {
throw new HoodieClusteringException("unable to transition clustering inflight to complete: " + clusteringCommitTime, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ public class HoodieLogFile implements Serializable {
public static final String DELTA_EXTENSION = ".log";
public static final Integer LOGFILE_BASE_VERSION = 1;

private static final Comparator<HoodieLogFile> LOG_FILE_COMPARATOR = new LogFileComparator();
private static final Comparator<HoodieLogFile> LOG_FILE_COMPARATOR_REVERSED = new LogFileComparator().reversed();

private transient FileStatus fileStatus;
private final String pathStr;
private long fileLen;
Expand Down Expand Up @@ -129,11 +132,11 @@ public HoodieLogFile rollOver(FileSystem fs, String logWriteToken) throws IOExce
}

public static Comparator<HoodieLogFile> getLogFileComparator() {
return new LogFileComparator();
return LOG_FILE_COMPARATOR;
}

public static Comparator<HoodieLogFile> getReverseLogFileComparator() {
return new LogFileComparator().reversed();
return LOG_FILE_COMPARATOR_REVERSED;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,16 @@

import java.util.Map;

/**
* Utilities for fetching hadoop configurations.
*/
public class HadoopConfigurations {
private static final String HADOOP_PREFIX = "hadoop.";
private static final String PARQUET_PREFIX = "parquet.";

/**
* Creates a merged hadoop configuration with given flink configuration and hadoop configuration.
*/
public static org.apache.hadoop.conf.Configuration getParquetConf(
org.apache.flink.configuration.Configuration options,
org.apache.hadoop.conf.Configuration hadoopConf) {
Expand All @@ -37,12 +43,12 @@ public static org.apache.hadoop.conf.Configuration getParquetConf(
}

/**
* Create a new hadoop configuration that is initialized with the given flink configuration.
* Creates a new hadoop configuration that is initialized with the given flink configuration.
*/
public static org.apache.hadoop.conf.Configuration getHadoopConf(Configuration conf) {
org.apache.hadoop.conf.Configuration hadoopConf = FlinkClientUtil.getHadoopConf();
Map<String, String> options = FlinkOptions.getPropertiesWithPrefix(conf.toMap(), HADOOP_PREFIX);
options.forEach((k, v) -> hadoopConf.set(k, v));
options.forEach(hadoopConf::set);
return hadoopConf;
}
}