Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -197,12 +197,18 @@ private FlinkOptions() {
.withDescription("Check interval for streaming read of SECOND, default 1 minute");

public static final String START_COMMIT_EARLIEST = "earliest";
public static final ConfigOption<String> READ_STREAMING_START_COMMIT = ConfigOptions
.key("read.streaming.start-commit")
public static final ConfigOption<String> READ_START_COMMIT = ConfigOptions
.key("read.start-commit")
.stringType()
.noDefaultValue()
.withDescription("Start commit instant for streaming read, the commit time format should be 'yyyyMMddHHmmss', "
+ "by default reading from the latest instant");
.withDescription("Start commit instant for reading, the commit time format should be 'yyyyMMddHHmmss', "
+ "by default reading from the latest instant for streaming read");

public static final ConfigOption<String> READ_END_COMMIT = ConfigOptions
.key("read.end-commit")
.stringType()
.noDefaultValue()
.withDescription("End commit instant for reading, the commit time format should be 'yyyyMMddHHmmss'");

// ------------------------------------------------------------------------
// Write Options
Expand Down
23 changes: 22 additions & 1 deletion hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;

import javax.annotation.Nullable;

import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -36,6 +38,7 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

/**
* A file index which supports listing files efficiently through metadata table.
Expand Down Expand Up @@ -137,11 +140,29 @@ public void reset() {
this.partitionPaths = null;
}

// -------------------------------------------------------------------------
// Getter/Setter
// -------------------------------------------------------------------------

/**
* Sets up explicit partition paths for pruning.
*/
public void setPartitionPaths(@Nullable Set<String> partitionPaths) {
if (partitionPaths != null) {
this.partitionPaths = new ArrayList<>(partitionPaths);
}
}

// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------

private List<String> getOrBuildPartitionPaths() {
/**
* Returns all the relative partition paths.
*
* <p>The partition paths are cached once invoked.
*/
public List<String> getOrBuildPartitionPaths() {
if (this.partitionPaths != null) {
return this.partitionPaths;
}
Expand Down
Loading