Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hudi.common.table.log;

import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ValidationUtils;

import java.io.Serializable;
import java.util.Objects;
Expand All @@ -33,19 +34,15 @@ public abstract class InstantRange implements Serializable {
protected final String endInstant;

public InstantRange(String startInstant, String endInstant) {
this.startInstant = Objects.requireNonNull(startInstant);
this.endInstant = Objects.requireNonNull(endInstant);
this.startInstant = startInstant;
this.endInstant = endInstant;
}

public static InstantRange getInstance(String startInstant, String endInstant, RangeType rangeType) {
switch (rangeType) {
case OPEN_CLOSE:
return new OpenCloseRange(startInstant, endInstant);
case CLOSE_CLOSE:
return new CloseCloseRange(startInstant, endInstant);
default:
throw new AssertionError();
}
/**
* Returns the builder.
*/
public static Builder builder() {
return new Builder();
}

public String getStartInstant() {
Expand All @@ -65,14 +62,14 @@ public String getEndInstant() {
/**
* Represents a range type.
*/
public enum RangeType {
public static enum RangeType {
OPEN_CLOSE, CLOSE_CLOSE
}

private static class OpenCloseRange extends InstantRange {

public OpenCloseRange(String startInstant, String endInstant) {
super(startInstant, endInstant);
super(Objects.requireNonNull(startInstant), endInstant);
}

@Override
Expand All @@ -84,10 +81,31 @@ public boolean isInRange(String instant) {
}
}

private static class OpenCloseRangeNullableBoundary extends InstantRange {

public OpenCloseRangeNullableBoundary(String startInstant, String endInstant) {
super(startInstant, endInstant);
ValidationUtils.checkArgument(startInstant != null || endInstant != null,
"Start and end instants can not both be null");
}

@Override
public boolean isInRange(String instant) {
if (startInstant == null) {
return HoodieTimeline.compareTimestamps(instant, HoodieTimeline.LESSER_THAN_OR_EQUALS, endInstant);
} else if (endInstant == null) {
return HoodieTimeline.compareTimestamps(instant, HoodieTimeline.GREATER_THAN, startInstant);
} else {
return HoodieTimeline.compareTimestamps(instant, HoodieTimeline.GREATER_THAN, startInstant)
&& HoodieTimeline.compareTimestamps(instant, HoodieTimeline.LESSER_THAN_OR_EQUALS, endInstant);
}
}
}

private static class CloseCloseRange extends InstantRange {

public CloseCloseRange(String startInstant, String endInstant) {
super(startInstant, endInstant);
super(Objects.requireNonNull(startInstant), endInstant);
}

@Override
Expand All @@ -98,4 +116,78 @@ public boolean isInRange(String instant) {
return HoodieTimeline.compareTimestamps(instant, HoodieTimeline.GREATER_THAN_OR_EQUALS, startInstant);
}
}

private static class CloseCloseRangeNullableBoundary extends InstantRange {

public CloseCloseRangeNullableBoundary(String startInstant, String endInstant) {
super(startInstant, endInstant);
ValidationUtils.checkArgument(startInstant != null || endInstant != null,
"Start and end instants can not both be null");
}

@Override
public boolean isInRange(String instant) {
if (startInstant == null) {
return HoodieTimeline.compareTimestamps(instant, HoodieTimeline.LESSER_THAN_OR_EQUALS, endInstant);
} else if (endInstant == null) {
return HoodieTimeline.compareTimestamps(instant, HoodieTimeline.GREATER_THAN_OR_EQUALS, startInstant);
} else {
return HoodieTimeline.compareTimestamps(instant, HoodieTimeline.GREATER_THAN_OR_EQUALS, startInstant)
&& HoodieTimeline.compareTimestamps(instant, HoodieTimeline.LESSER_THAN_OR_EQUALS, endInstant);
}
}
}

// -------------------------------------------------------------------------
// Inner Class
// -------------------------------------------------------------------------

/**
* Builder for {@link InstantRange}.
*/
public static class Builder {
private String startInstant;
private String endInstant;
private RangeType rangeType;
private boolean nullableBoundary = false;

private Builder() {
}

public Builder startInstant(String startInstant) {
this.startInstant = startInstant;
return this;
}

public Builder endInstant(String endInstant) {
this.endInstant = endInstant;
return this;
}

public Builder rangeType(RangeType rangeType) {
this.rangeType = rangeType;
return this;
}

public Builder nullableBoundary(boolean nullable) {
this.nullableBoundary = nullable;
return this;
}

public InstantRange build() {
ValidationUtils.checkState(this.rangeType != null, "Range type is required");
switch (rangeType) {
case OPEN_CLOSE:
return nullableBoundary
? new OpenCloseRangeNullableBoundary(startInstant, endInstant)
: new OpenCloseRange(startInstant, endInstant);
case CLOSE_CLOSE:
return nullableBoundary
? new CloseCloseRangeNullableBoundary(startInstant, endInstant)
: new CloseCloseRange(startInstant, endInstant);
default:
throw new AssertionError();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -172,4 +172,12 @@ public static boolean isInsertOverwrite(Configuration conf) {
return conf.getString(FlinkOptions.OPERATION).equals(WriteOperationType.INSERT_OVERWRITE_TABLE.value())
|| conf.getString(FlinkOptions.OPERATION).equals(WriteOperationType.INSERT_OVERWRITE.value());
}

/**
* Returns whether the read start commit is specific commit timestamp.
*/
public static boolean isSpecificStartCommit(Configuration conf) {
return conf.getOptional(FlinkOptions.READ_START_COMMIT).isPresent()
&& !conf.get(FlinkOptions.READ_START_COMMIT).equalsIgnoreCase(FlinkOptions.START_COMMIT_EARLIEST);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,29 @@ public static void clean(String path) {
PROFILES.remove(path);
}

/**
* Returns all the incremental write file statuses with the given commits metadata.
*
* <p> Different with {@link #getWritePathsOfInstants}, the files are not filtered by
* existence.
*
* @param basePath Table base path
* @param hadoopConf The hadoop conf
* @param metadataList The commits metadata
* @param tableType The table type
* @return the file status array
*/
public static FileStatus[] getRawWritePathsOfInstants(
Path basePath,
Configuration hadoopConf,
List<HoodieCommitMetadata> metadataList,
HoodieTableType tableType) {
Map<String, FileStatus> uniqueIdToFileStatus = new HashMap<>();
metadataList.forEach(metadata ->
uniqueIdToFileStatus.putAll(getFilesToReadOfInstant(basePath, metadata, hadoopConf, tableType)));
return uniqueIdToFileStatus.values().toArray(new FileStatus[0]);
}

/**
* Returns all the incremental write file statuses with the given commits metadata.
*
Expand All @@ -103,6 +126,25 @@ public static FileStatus[] getWritePathsOfInstants(
return uniqueIdToFileStatus.values().toArray(new FileStatus[0]);
}

/**
* Returns the commit file status info with given metadata.
*
* @param basePath Table base path
* @param metadata The metadata
* @param hadoopConf The filesystem
* @param tableType The table type
* @return the commit file status info grouping by specific ID
*/
private static Map<String, FileStatus> getFilesToReadOfInstant(
Path basePath,
HoodieCommitMetadata metadata,
Configuration hadoopConf,
HoodieTableType tableType) {
return getFilesToRead(hadoopConf, metadata, basePath.toString(), tableType).entrySet().stream()
.filter(entry -> StreamerUtil.isValidFile(entry.getValue()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

/**
* Returns the commit file status info with given metadata.
*
Expand Down
Loading