@@ -77,7 +77,7 @@ public FlinkInputSplit[] createInputSplits(int minNumSplits) throws IOException
tableLoader.open();
try (TableLoader loader = tableLoader) {
Table table = loader.loadTable();
- return FlinkSplitGenerator.createInputSplits(table, context);
+ return FlinkSplitPlanner.planInputSplits(table, context);
}
}

@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.flink.source;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import org.apache.flink.annotation.Internal;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

@Internal
public class FlinkSplitPlanner {
private FlinkSplitPlanner() {
}

static FlinkInputSplit[] planInputSplits(Table table, ScanContext context) {
try (CloseableIterable<CombinedScanTask> tasksIterable = planTasks(table, context)) {
List<CombinedScanTask> tasks = Lists.newArrayList(tasksIterable);
FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()];
for (int i = 0; i < tasks.size(); i++) {
splits[i] = new FlinkInputSplit(i, tasks.get(i));
}
return splits;
} catch (IOException e) {
throw new UncheckedIOException("Failed to process tasks iterable", e);
}
}

/**
* Returns splits for the FLIP-27 source.
*/
public static List<IcebergSourceSplit> planIcebergSourceSplits(Table table, ScanContext context) {
try (CloseableIterable<CombinedScanTask> tasksIterable = planTasks(table, context)) {
return Lists.newArrayList(CloseableIterable.transform(tasksIterable,
IcebergSourceSplit::fromCombinedScanTask));
} catch (IOException e) {
throw new UncheckedIOException("Failed to process task iterable: ", e);
}
}

static CloseableIterable<CombinedScanTask> planTasks(Table table, ScanContext context) {
TableScan scan = table
.newScan()
.caseSensitive(context.caseSensitive())
.project(context.project());

if (context.includeColumnStats()) {
scan = scan.includeColumnStats();
}

if (context.snapshotId() != null) {
scan = scan.useSnapshot(context.snapshotId());
}

if (context.asOfTimestamp() != null) {
scan = scan.asOfTime(context.asOfTimestamp());
}

if (context.startSnapshotId() != null) {
if (context.endSnapshotId() != null) {
scan = scan.appendsBetween(context.startSnapshotId(), context.endSnapshotId());
} else {
scan = scan.appendsAfter(context.startSnapshotId());
}
}

if (context.splitSize() != null) {
scan = scan.option(TableProperties.SPLIT_SIZE, context.splitSize().toString());
}

if (context.splitLookback() != null) {
scan = scan.option(TableProperties.SPLIT_LOOKBACK, context.splitLookback().toString());
}

if (context.splitOpenFileCost() != null) {
scan = scan.option(TableProperties.SPLIT_OPEN_FILE_COST, context.splitOpenFileCost().toString());
}

if (context.filters() != null) {
for (Expression filter : context.filters()) {
scan = scan.filter(filter);
}
}

return scan.planTasks();
}
}
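
Reviewer note: a minimal usage sketch for the new planner, not part of this change. It assumes a caller in the `org.apache.iceberg.flink.source` package (both `ScanContext` and `planInputSplits` are package-private) and a hypothetical Hadoop table location; the class name and path are illustrative only.

```java
// Illustrative only: class name and table location are hypothetical.
package org.apache.iceberg.flink.source;

import java.io.IOException;
import java.util.List;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;

public class FlinkSplitPlannerExample {
  public static void main(String[] args) throws IOException {
    // Hypothetical Hadoop table location.
    TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/db/tbl");
    tableLoader.open();
    try (TableLoader loader = tableLoader) {
      Table table = loader.loadTable();

      // Scan options, including the include-column-stats flag added in this PR.
      ScanContext context = ScanContext.builder()
          .includeColumnStats(true)
          .build();

      // Batch path, used by FlinkInputFormat#createInputSplits.
      FlinkInputSplit[] inputSplits = FlinkSplitPlanner.planInputSplits(table, context);

      // FLIP-27 path: the same planning, wrapped into IcebergSourceSplit.
      List<IcebergSourceSplit> sourceSplits = FlinkSplitPlanner.planIcebergSourceSplits(table, context);

      System.out.printf("planned %d input splits and %d source splits%n",
          inputSplits.length, sourceSplits.size());
    }
  }
}
```

Both paths go through the same `planTasks` call, so the batch format and the FLIP-27 source see identical scan options.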
@@ -68,6 +68,9 @@ class ScanContext implements Serializable {
private static final ConfigOption<Duration> MONITOR_INTERVAL =
ConfigOptions.key("monitor-interval").durationType().defaultValue(Duration.ofSeconds(10));

private static final ConfigOption<Boolean> INCLUDE_COLUMN_STATS =
ConfigOptions.key("include-column-stats").booleanType().defaultValue(false);

private final boolean caseSensitive;
private final Long snapshotId;
private final Long startSnapshotId;
@@ -83,11 +86,12 @@ class ScanContext implements Serializable {
private final Schema schema;
private final List<Expression> filters;
private final long limit;
private final boolean includeColumnStats;

private ScanContext(boolean caseSensitive, Long snapshotId, Long startSnapshotId, Long endSnapshotId,
Long asOfTimestamp, Long splitSize, Integer splitLookback, Long splitOpenFileCost,
boolean isStreaming, Duration monitorInterval, String nameMapping,
- Schema schema, List<Expression> filters, long limit) {
+ Schema schema, List<Expression> filters, long limit, boolean includeColumnStats) {
this.caseSensitive = caseSensitive;
this.snapshotId = snapshotId;
this.startSnapshotId = startSnapshotId;
@@ -103,6 +107,7 @@ private ScanContext(boolean caseSensitive, Long snapshotId, Long startSnapshotId
this.schema = schema;
this.filters = filters;
this.limit = limit;
this.includeColumnStats = includeColumnStats;
}

boolean caseSensitive() {
@@ -161,6 +166,10 @@ long limit() {
return limit;
}

boolean includeColumnStats() {
return includeColumnStats;
}

ScanContext copyWithAppendsBetween(long newStartSnapshotId, long newEndSnapshotId) {
return ScanContext.builder()
.caseSensitive(caseSensitive)
@@ -177,6 +186,7 @@ ScanContext copyWithAppendsBetween(long newStartSnapshotId, long newEndSnapshotI
.project(schema)
.filters(filters)
.limit(limit)
.includeColumnStats(includeColumnStats)
.build();
}

@@ -196,6 +206,7 @@ ScanContext copyWithSnapshotId(long newSnapshotId) {
.project(schema)
.filters(filters)
.limit(limit)
.includeColumnStats(includeColumnStats)
.build();
}

@@ -218,6 +229,7 @@ static class Builder {
private Schema projectedSchema;
private List<Expression> filters;
private long limit = -1L;
private boolean includeColumnStats = INCLUDE_COLUMN_STATS.defaultValue();

private Builder() {
}
@@ -292,6 +304,11 @@ Builder limit(long newLimit) {
return this;
}

Builder includeColumnStats(boolean newIncludeColumnStats) {
this.includeColumnStats = newIncludeColumnStats;
return this;
}

Builder fromProperties(Map<String, String> properties) {
Configuration config = new Configuration();
properties.forEach(config::setString);
@@ -306,14 +323,15 @@ Builder fromProperties(Map<String, String> properties) {
.splitOpenFileCost(config.get(SPLIT_FILE_OPEN_COST))
.streaming(config.get(STREAMING))
.monitorInterval(config.get(MONITOR_INTERVAL))
- .nameMapping(properties.get(DEFAULT_NAME_MAPPING));
+ .nameMapping(properties.get(DEFAULT_NAME_MAPPING))
+ .includeColumnStats(config.get(INCLUDE_COLUMN_STATS));
}

public ScanContext build() {
return new ScanContext(caseSensitive, snapshotId, startSnapshotId,
endSnapshotId, asOfTimestamp, splitSize, splitLookback,
splitOpenFileCost, isStreaming, monitorInterval, nameMapping, projectedSchema,
- filters, limit);
+ filters, limit, includeColumnStats);
}
}
}
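
Reviewer note: a small sketch, not part of this PR, of how the new `include-column-stats` option flows through `Builder.fromProperties`, e.g. from connector options. The example class name and property map are hypothetical, and it relies on the package-private `ScanContext` API, so it is placed in the same package.

```java
// Illustrative only: class name and property values are hypothetical.
package org.apache.iceberg.flink.source;

import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

public class IncludeColumnStatsExample {
  public static void main(String[] args) {
    // Connector-style properties; "include-column-stats" defaults to false.
    Map<String, String> properties = ImmutableMap.of("include-column-stats", "true");

    ScanContext context = ScanContext.builder()
        .fromProperties(properties)
        .build();

    // Planning with this context will call TableScan#includeColumnStats().
    System.out.println("include-column-stats = " + context.includeColumnStats());
  }
}
```

Because the option defaults to false, existing jobs keep their current planning behavior unless they opt in.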
@@ -140,7 +140,7 @@ private void monitorAndForwardSplits() {
newScanContext = scanContext.copyWithAppendsBetween(lastSnapshotId, snapshotId);
}

- FlinkInputSplit[] splits = FlinkSplitGenerator.createInputSplits(table, newScanContext);
+ FlinkInputSplit[] splits = FlinkSplitPlanner.planInputSplits(table, newScanContext);
for (FlinkInputSplit split : splits) {
sourceContext.collect(split);
}
@@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.flink.source.split;

import java.io.IOException;
import java.io.Serializable;
import java.util.Collection;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.util.InstantiationUtil;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;

@Internal
public class IcebergSourceSplit implements SourceSplit, Serializable {
private static final long serialVersionUID = 1L;

private final CombinedScanTask task;

private int fileOffset;
private long recordOffset;

// The splits are frequently serialized into checkpoints.
// Caching the byte representation makes repeated serialization cheap.
@Nullable
private transient byte[] serializedBytesCache;

private IcebergSourceSplit(CombinedScanTask task, int fileOffset, long recordOffset) {
this.task = task;
this.fileOffset = fileOffset;
this.recordOffset = recordOffset;
}

public static IcebergSourceSplit fromCombinedScanTask(CombinedScanTask combinedScanTask) {
return fromCombinedScanTask(combinedScanTask, 0, 0L);
}

public static IcebergSourceSplit fromCombinedScanTask(
CombinedScanTask combinedScanTask, int fileOffset, long recordOffset) {
return new IcebergSourceSplit(combinedScanTask, fileOffset, recordOffset);
}

public CombinedScanTask task() {
return task;
}

public int fileOffset() {
return fileOffset;
}

public long recordOffset() {
return recordOffset;
}

@Override
public String splitId() {
return MoreObjects.toStringHelper(this)
.add("files", toString(task.files()))
.toString();
}

public void updatePosition(int newFileOffset, long newRecordOffset) {
// invalidate the cache after position change
serializedBytesCache = null;
fileOffset = newFileOffset;
recordOffset = newRecordOffset;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("files", toString(task.files()))
.add("fileOffset", fileOffset)
.add("recordOffset", recordOffset)
.toString();
}

private String toString(Collection<FileScanTask> files) {
return Iterables.toString(files.stream().map(fileScanTask ->
MoreObjects.toStringHelper(fileScanTask)
.add("file", fileScanTask.file().path().toString())
.add("start", fileScanTask.start())
.add("length", fileScanTask.length())
.toString()).collect(Collectors.toList()));
}

byte[] serializeV1() throws IOException {
if (serializedBytesCache == null) {
serializedBytesCache = InstantiationUtil.serializeObject(this);
}
return serializedBytesCache;
}

static IcebergSourceSplit deserializeV1(byte[] serialized) throws IOException {
try {
return InstantiationUtil.deserializeObject(serialized, IcebergSourceSplit.class.getClassLoader());
} catch (ClassNotFoundException e) {
throw new RuntimeException("Failed to deserialize the split.", e);
}
}
}
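
Reviewer note: a hypothetical round-trip sketch for the V1 serde and the byte cache, not part of this change. It assumes a `CombinedScanTask` coming from split planning and lives in `org.apache.iceberg.flink.source.split` because `serializeV1`/`deserializeV1` are package-private; the helper class and method names are illustrative.

```java
// Illustrative only: the helper class and method are hypothetical.
package org.apache.iceberg.flink.source.split;

import java.io.IOException;
import org.apache.iceberg.CombinedScanTask;

public class IcebergSourceSplitSerdeExample {

  // Round-trips a split through the V1 byte format, as a checkpoint would.
  static IcebergSourceSplit roundTrip(CombinedScanTask task) throws IOException {
    IcebergSourceSplit split = IcebergSourceSplit.fromCombinedScanTask(task);

    // First call serializes and caches the bytes; repeated calls reuse the cache.
    byte[] initialBytes = split.serializeV1();

    // Moving the position invalidates the cache, so the next serializeV1()
    // reflects the new fileOffset/recordOffset.
    split.updatePosition(1, 100L);
    byte[] updatedBytes = split.serializeV1();

    // A restored split resumes from the checkpointed position.
    IcebergSourceSplit restored = IcebergSourceSplit.deserializeV1(updatedBytes);
    return restored;
  }
}
```

The cached bytes are held in a transient field, so they are not written as part of the serialized object itself and are rebuilt lazily after deserialization.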