@@ -77,7 +77,7 @@ public FlinkInputSplit[] createInputSplits(int minNumSplits) throws IOException
     tableLoader.open();
     try (TableLoader loader = tableLoader) {
       Table table = loader.loadTable();
-      return FlinkSplitPlanner.planInputSplits(table, context);
+      return FlinkSplitGenerator.createInputSplits(table, context);
     }
   }
 
@@ -22,56 +22,33 @@
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.List;
-import org.apache.flink.annotation.Internal;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableScan;
 import org.apache.iceberg.expressions.Expression;
-import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 
-@Internal
-public class FlinkSplitPlanner {
-  private FlinkSplitPlanner() {
+class FlinkSplitGenerator {
+  private FlinkSplitGenerator() {
   }
 
-  static FlinkInputSplit[] planInputSplits(Table table, ScanContext context) {
-    try (CloseableIterable<CombinedScanTask> tasksIterable = planTasks(table, context)) {
-      List<CombinedScanTask> tasks = Lists.newArrayList(tasksIterable);
-      FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()];
-      for (int i = 0; i < tasks.size(); i++) {
-        splits[i] = new FlinkInputSplit(i, tasks.get(i));
-      }
-      return splits;
-    } catch (IOException e) {
-      throw new UncheckedIOException("Failed to process tasks iterable", e);
-    }
-  }
-
-  /**
-   * This returns splits for the FLIP-27 source
-   */
-  public static List<IcebergSourceSplit> planIcebergSourceSplits(Table table, ScanContext context) {
-    try (CloseableIterable<CombinedScanTask> tasksIterable = planTasks(table, context)) {
-      return Lists.newArrayList(CloseableIterable.transform(tasksIterable,
-          task -> IcebergSourceSplit.fromCombinedScanTask(task)));
-    } catch (IOException e) {
-      throw new UncheckedIOException("Failed to process task iterable: ", e);
+  static FlinkInputSplit[] createInputSplits(Table table, ScanContext context) {
+    List<CombinedScanTask> tasks = tasks(table, context);
+    FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()];
+    for (int i = 0; i < tasks.size(); i++) {
+      splits[i] = new FlinkInputSplit(i, tasks.get(i));
     }
+    return splits;
   }
 
-  static CloseableIterable<CombinedScanTask> planTasks(Table table, ScanContext context) {
+  private static List<CombinedScanTask> tasks(Table table, ScanContext context) {
     TableScan scan = table
         .newScan()
         .caseSensitive(context.caseSensitive())
         .project(context.project());
 
-    if (context.includeColumnStats()) {
-      scan = scan.includeColumnStats();
-    }
-
     if (context.snapshotId() != null) {
       scan = scan.useSnapshot(context.snapshotId());
     }
@@ -106,6 +83,10 @@ static CloseableIterable<CombinedScanTask> planTasks(Table table, ScanContext co
       }
     }
 
-    return scan.planTasks();
+    try (CloseableIterable<CombinedScanTask> tasksIterable = scan.planTasks()) {
+      return Lists.newArrayList(tasksIterable);
+    } catch (IOException e) {
+      throw new UncheckedIOException("Failed to close table scan: " + scan, e);
+    }
   }
 }
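
Not part of the diff above: a minimal, self-contained sketch of the eager planning pattern the new tasks() helper relies on, namely draining TableScan.planTasks() inside try-with-resources so the CloseableIterable is closed before splits are built. The class name EagerScanPlanning and method planEagerly are illustrative; only Iceberg calls that already appear in this diff (newScan(), planTasks(), Lists.newArrayList) are assumed.

// Illustrative sketch only; mirrors the eager collection done in FlinkSplitGenerator.tasks(...).
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

final class EagerScanPlanning {
  private EagerScanPlanning() {
  }

  // Collects every CombinedScanTask up front: the resulting List is easy to index when
  // numbering input splits, at the cost of holding the whole scan plan in memory.
  static List<CombinedScanTask> planEagerly(Table table) {
    try (CloseableIterable<CombinedScanTask> tasksIterable = table.newScan().planTasks()) {
      return Lists.newArrayList(tasksIterable);
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to close table scan", e);
    }
  }
}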
@@ -68,9 +68,6 @@ class ScanContext implements Serializable {
   private static final ConfigOption<Duration> MONITOR_INTERVAL =
       ConfigOptions.key("monitor-interval").durationType().defaultValue(Duration.ofSeconds(10));
 
-  private static final ConfigOption<Boolean> INCLUDE_COLUMN_STATS =
-      ConfigOptions.key("include-column-stats").booleanType().defaultValue(false);
-
   private final boolean caseSensitive;
   private final Long snapshotId;
   private final Long startSnapshotId;
@@ -86,12 +83,11 @@ class ScanContext implements Serializable {
   private final Schema schema;
   private final List<Expression> filters;
   private final long limit;
-  private final boolean includeColumnStats;
 
   private ScanContext(boolean caseSensitive, Long snapshotId, Long startSnapshotId, Long endSnapshotId,
                       Long asOfTimestamp, Long splitSize, Integer splitLookback, Long splitOpenFileCost,
                       boolean isStreaming, Duration monitorInterval, String nameMapping,
-                      Schema schema, List<Expression> filters, long limit, boolean includeColumnStats) {
+                      Schema schema, List<Expression> filters, long limit) {
     this.caseSensitive = caseSensitive;
     this.snapshotId = snapshotId;
     this.startSnapshotId = startSnapshotId;
@@ -107,7 +103,6 @@ private ScanContext(boolean caseSensitive, Long snapshotId, Long startSnapshotId
     this.schema = schema;
     this.filters = filters;
     this.limit = limit;
-    this.includeColumnStats = includeColumnStats;
   }
 
   boolean caseSensitive() {
@@ -166,10 +161,6 @@ long limit() {
     return limit;
   }
 
-  boolean includeColumnStats() {
-    return includeColumnStats;
-  }
-
   ScanContext copyWithAppendsBetween(long newStartSnapshotId, long newEndSnapshotId) {
     return ScanContext.builder()
         .caseSensitive(caseSensitive)
@@ -186,7 +177,6 @@ ScanContext copyWithAppendsBetween(long newStartSnapshotId, long newEndSnapshotI
         .project(schema)
         .filters(filters)
         .limit(limit)
-        .includeColumnStats(includeColumnStats)
         .build();
   }
 
@@ -206,7 +196,6 @@ ScanContext copyWithSnapshotId(long newSnapshotId) {
         .project(schema)
         .filters(filters)
         .limit(limit)
-        .includeColumnStats(includeColumnStats)
         .build();
   }
 
@@ -229,7 +218,6 @@ static class Builder {
     private Schema projectedSchema;
     private List<Expression> filters;
     private long limit = -1L;
-    private boolean includeColumnStats = INCLUDE_COLUMN_STATS.defaultValue();
 
     private Builder() {
     }
@@ -304,11 +292,6 @@ Builder limit(long newLimit) {
       return this;
     }
 
-    Builder includeColumnStats(boolean newIncludeColumnStats) {
-      this.includeColumnStats = newIncludeColumnStats;
-      return this;
-    }
-
     Builder fromProperties(Map<String, String> properties) {
       Configuration config = new Configuration();
       properties.forEach(config::setString);
@@ -323,15 +306,14 @@ Builder fromProperties(Map<String, String> properties) {
           .splitOpenFileCost(config.get(SPLIT_FILE_OPEN_COST))
           .streaming(config.get(STREAMING))
           .monitorInterval(config.get(MONITOR_INTERVAL))
-          .nameMapping(properties.get(DEFAULT_NAME_MAPPING))
-          .includeColumnStats(config.get(INCLUDE_COLUMN_STATS));
+          .nameMapping(properties.get(DEFAULT_NAME_MAPPING));
     }
 
     public ScanContext build() {
       return new ScanContext(caseSensitive, snapshotId, startSnapshotId,
           endSnapshotId, asOfTimestamp, splitSize, splitLookback,
           splitOpenFileCost, isStreaming, monitorInterval, nameMapping, projectedSchema,
-          filters, limit, includeColumnStats);
+          filters, limit);
     }
   }
 }
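
For reference, a minimal sketch of building a ScanContext once includeColumnStats is gone, assuming code that lives in the same package as the package-private ScanContext. The class name ScanContextExample, the projected schema, the filter, and the limit value are placeholders; only builder methods visible in the diff above (caseSensitive, project, filters, limit, build) are used.

// Illustrative sketch only; assumes it sits next to ScanContext in the Flink source package.
import java.util.Collections;
import java.util.List;
import org.apache.iceberg.Schema;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.types.Types;

class ScanContextExample {
  private ScanContextExample() {
  }

  // Builds a batch-read context; after this change, column stats can no longer be
  // requested through ScanContext, so no includeColumnStats(...) call appears here.
  static ScanContext exampleContext() {
    Schema projected = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()));
    List<Expression> filters = Collections.<Expression>singletonList(
        Expressions.greaterThanOrEqual("id", 0L));

    return ScanContext.builder()
        .caseSensitive(true)
        .project(projected)
        .filters(filters)
        .limit(1000L)
        .build();
  }
}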
@@ -140,7 +140,7 @@ private void monitorAndForwardSplits() {
       newScanContext = scanContext.copyWithAppendsBetween(lastSnapshotId, snapshotId);
     }
 
-    FlinkInputSplit[] splits = FlinkSplitPlanner.planInputSplits(table, newScanContext);
+    FlinkInputSplit[] splits = FlinkSplitGenerator.createInputSplits(table, newScanContext);
     for (FlinkInputSplit split : splits) {
       sourceContext.collect(split);
     }

This file was deleted.

This file was deleted.
