@@ -77,7 +77,7 @@ public FlinkInputSplit[] createInputSplits(int minNumSplits) throws IOException
tableLoader.open();
try (TableLoader loader = tableLoader) {
Table table = loader.loadTable();
return FlinkSplitGenerator.createInputSplits(table, context);
return FlinkSplitPlanner.planInputSplits(table, context);
}
}

@@ -22,33 +22,56 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import org.apache.flink.annotation.Internal;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

class FlinkSplitGenerator {
private FlinkSplitGenerator() {
@Internal
public class FlinkSplitPlanner {
private FlinkSplitPlanner() {
}

static FlinkInputSplit[] createInputSplits(Table table, ScanContext context) {
List<CombinedScanTask> tasks = tasks(table, context);
FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()];
for (int i = 0; i < tasks.size(); i++) {
splits[i] = new FlinkInputSplit(i, tasks.get(i));
static FlinkInputSplit[] planInputSplits(Table table, ScanContext context) {
try (CloseableIterable<CombinedScanTask> tasksIterable = planTasks(table, context)) {
List<CombinedScanTask> tasks = Lists.newArrayList(tasksIterable);
FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()];
for (int i = 0; i < tasks.size(); i++) {
splits[i] = new FlinkInputSplit(i, tasks.get(i));
}
return splits;
} catch (IOException e) {
throw new UncheckedIOException("Failed to process tasks iterable", e);
}
}

/**
* Returns splits for the FLIP-27 source.
*/
public static List<IcebergSourceSplit> planIcebergSourceSplits(Table table, ScanContext context) {
try (CloseableIterable<CombinedScanTask> tasksIterable = planTasks(table, context)) {
return Lists.newArrayList(CloseableIterable.transform(tasksIterable,
task -> IcebergSourceSplit.fromCombinedScanTask(task)));
} catch (IOException e) {
throw new UncheckedIOException("Failed to process task iterable: ", e);
}
return splits;
}

private static List<CombinedScanTask> tasks(Table table, ScanContext context) {
static CloseableIterable<CombinedScanTask> planTasks(Table table, ScanContext context) {
TableScan scan = table
.newScan()
.caseSensitive(context.caseSensitive())
.project(context.project());

if (context.includeColumnStats()) {
scan = scan.includeColumnStats();
}

if (context.snapshotId() != null) {
scan = scan.useSnapshot(context.snapshotId());
}
@@ -83,10 +106,6 @@ private static List<CombinedScanTask> tasks(Table table, ScanContext context) {
}
}

try (CloseableIterable<CombinedScanTask> tasksIterable = scan.planTasks()) {
return Lists.newArrayList(tasksIterable);
} catch (IOException e) {
throw new UncheckedIOException("Failed to close table scan: " + scan, e);
}
return scan.planTasks();
}
}
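For orientation, here is a minimal sketch (not part of the diff) of how the renamed planner's two entry points might be driven; the tableLoader variable, the ScanContext settings, and the helper name are illustrative assumptions, not code from this change.

// Hypothetical helper showing both planner entry points.
static List<IcebergSourceSplit> planSourceSplits(TableLoader tableLoader, ScanContext context) throws IOException {
  tableLoader.open();
  try (TableLoader loader = tableLoader) {
    Table table = loader.loadTable();
    // Legacy InputFormat path: an eager FlinkInputSplit[] for FlinkInputFormat#createInputSplits.
    FlinkInputSplit[] inputFormatSplits = FlinkSplitPlanner.planInputSplits(table, context);
    // FLIP-27 path: splits that carry file/record offsets so readers can resume mid-task.
    return FlinkSplitPlanner.planIcebergSourceSplits(table, context);
  }
}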
@@ -68,6 +68,9 @@ class ScanContext implements Serializable {
private static final ConfigOption<Duration> MONITOR_INTERVAL =
ConfigOptions.key("monitor-interval").durationType().defaultValue(Duration.ofSeconds(10));

private static final ConfigOption<Boolean> INCLUDE_COLUMN_STATS =
ConfigOptions.key("include-column-stats").booleanType().defaultValue(false);

private final boolean caseSensitive;
private final Long snapshotId;
private final Long startSnapshotId;
@@ -83,11 +86,12 @@ class ScanContext implements Serializable {
private final Schema schema;
private final List<Expression> filters;
private final long limit;
private final boolean includeColumnStats;

private ScanContext(boolean caseSensitive, Long snapshotId, Long startSnapshotId, Long endSnapshotId,
Long asOfTimestamp, Long splitSize, Integer splitLookback, Long splitOpenFileCost,
boolean isStreaming, Duration monitorInterval, String nameMapping,
Schema schema, List<Expression> filters, long limit) {
Schema schema, List<Expression> filters, long limit, boolean includeColumnStats) {
this.caseSensitive = caseSensitive;
this.snapshotId = snapshotId;
this.startSnapshotId = startSnapshotId;
@@ -103,6 +107,7 @@ private ScanContext(boolean caseSensitive, Long snapshotId, Long startSnapshotId
this.schema = schema;
this.filters = filters;
this.limit = limit;
this.includeColumnStats = includeColumnStats;
}

boolean caseSensitive() {
@@ -161,6 +166,10 @@ long limit() {
return limit;
}

boolean includeColumnStats() {
return includeColumnStats;
}

ScanContext copyWithAppendsBetween(long newStartSnapshotId, long newEndSnapshotId) {
return ScanContext.builder()
.caseSensitive(caseSensitive)
@@ -177,6 +186,7 @@ ScanContext copyWithAppendsBetween(long newStartSnapshotId, long newEndSnapshotI
.project(schema)
.filters(filters)
.limit(limit)
.includeColumnStats(includeColumnStats)
.build();
}

@@ -196,6 +206,7 @@ ScanContext copyWithSnapshotId(long newSnapshotId) {
.project(schema)
.filters(filters)
.limit(limit)
.includeColumnStats(includeColumnStats)
.build();
}

@@ -218,6 +229,7 @@ static class Builder {
private Schema projectedSchema;
private List<Expression> filters;
private long limit = -1L;
private boolean includeColumnStats = INCLUDE_COLUMN_STATS.defaultValue();

private Builder() {
}
@@ -292,6 +304,11 @@ Builder limit(long newLimit) {
return this;
}

Builder includeColumnStats(boolean newIncludeColumnStats) {
this.includeColumnStats = newIncludeColumnStats;
return this;
}

Builder fromProperties(Map<String, String> properties) {
Configuration config = new Configuration();
properties.forEach(config::setString);
@@ -306,14 +323,15 @@ Builder fromProperties(Map<String, String> properties) {
.splitOpenFileCost(config.get(SPLIT_FILE_OPEN_COST))
.streaming(config.get(STREAMING))
.monitorInterval(config.get(MONITOR_INTERVAL))
.nameMapping(properties.get(DEFAULT_NAME_MAPPING));
.nameMapping(properties.get(DEFAULT_NAME_MAPPING))
.includeColumnStats(config.get(INCLUDE_COLUMN_STATS));
}

public ScanContext build() {
return new ScanContext(caseSensitive, snapshotId, startSnapshotId,
endSnapshotId, asOfTimestamp, splitSize, splitLookback,
splitOpenFileCost, isStreaming, monitorInterval, nameMapping, projectedSchema,
filters, limit);
filters, limit, includeColumnStats);
}
}
}
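A hedged sketch of wiring the new include-column-stats option, either through the connector properties that fromProperties() consumes or directly on the builder; the property map below is illustrative.

Map<String, String> props = new HashMap<>();
props.put("include-column-stats", "true");

ScanContext fromProps = ScanContext.builder()
    .fromProperties(props)
    .build();

// Equivalent direct builder call:
ScanContext statsContext = ScanContext.builder()
    .includeColumnStats(true)
    .build();

// With includeColumnStats() true, FlinkSplitPlanner.planTasks() adds scan.includeColumnStats(),
// so the planned CombinedScanTasks keep per-file column statistics.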
@@ -140,7 +140,7 @@ private void monitorAndForwardSplits() {
newScanContext = scanContext.copyWithAppendsBetween(lastSnapshotId, snapshotId);
}

FlinkInputSplit[] splits = FlinkSplitGenerator.createInputSplits(table, newScanContext);
FlinkInputSplit[] splits = FlinkSplitPlanner.planInputSplits(table, newScanContext);
for (FlinkInputSplit split : splits) {
sourceContext.collect(split);
}
@@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.flink.source.split;

import java.io.IOException;
import java.io.Serializable;
import java.util.Collection;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.util.InstantiationUtil;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;

@Internal
public class IcebergSourceSplit implements SourceSplit, Serializable {
private static final long serialVersionUID = 1L;

private final CombinedScanTask task;

private int fileOffset;
private long recordOffset;

// The splits are frequently serialized into checkpoints.
// Caching the byte representation makes repeated serialization cheap.
@Nullable
private transient byte[] serializedBytesCache;

private IcebergSourceSplit(CombinedScanTask task, int fileOffset, long recordOffset) {
this.task = task;
this.fileOffset = fileOffset;
this.recordOffset = recordOffset;
}

public static IcebergSourceSplit fromCombinedScanTask(CombinedScanTask combinedScanTask) {
return fromCombinedScanTask(combinedScanTask, 0, 0L);
}

public static IcebergSourceSplit fromCombinedScanTask(
CombinedScanTask combinedScanTask, int fileOffset, long recordOffset) {
return new IcebergSourceSplit(combinedScanTask, fileOffset, recordOffset);
}

public CombinedScanTask task() {
return task;
}

public int fileOffset() {
return fileOffset;
}

public long recordOffset() {
return recordOffset;
}

@Override
public String splitId() {
return MoreObjects.toStringHelper(this)
.add("files", toString(task.files()))
.toString();
}

public void updatePosition(int newFileOffset, long newRecordOffset) {
// invalidate the cache after position change
serializedBytesCache = null;
fileOffset = newFileOffset;
recordOffset = newRecordOffset;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("files", toString(task.files()))
.add("fileOffset", fileOffset)
.add("recordOffset", recordOffset)
.toString();
}

private String toString(Collection<FileScanTask> files) {
return Iterables.toString(files.stream().map(fileScanTask ->
MoreObjects.toStringHelper(fileScanTask)
.add("file", fileScanTask.file().path().toString())
.add("start", fileScanTask.start())
.add("length", fileScanTask.length())
.toString()).collect(Collectors.toList()));
}

byte[] serializeV1() throws IOException {
if (serializedBytesCache == null) {
serializedBytesCache = InstantiationUtil.serializeObject(this);
}
return serializedBytesCache;
}

static IcebergSourceSplit deserializeV1(byte[] serialized) throws IOException {
try {
return InstantiationUtil.deserializeObject(serialized, IcebergSourceSplit.class.getClassLoader());
} catch (ClassNotFoundException e) {
throw new RuntimeException("Failed to deserialize the split.", e);
}
}
}
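A short sketch of the intended split lifecycle, assuming a combinedScanTask produced by FlinkSplitPlanner; the offsets are made-up values.

IcebergSourceSplit split = IcebergSourceSplit.fromCombinedScanTask(combinedScanTask);

// A reader records progress: second file of the task, 100 records already emitted.
split.updatePosition(1, 100L);

// serializeV1() caches its byte[]; updatePosition() above invalidated the cache, so this call
// serializes once and unchanged splits reuse the cached bytes on later checkpoints.
byte[] checkpointBytes = split.serializeV1();
IcebergSourceSplit restored = IcebergSourceSplit.deserializeV1(checkpointBytes);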
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.flink.source.split;

import java.io.IOException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.io.SimpleVersionedSerializer;

/**
* TODO: use Java serialization for now.
* Will switch to a more stable serializer from
* <a href="https://github.com/apache/iceberg/issues/1698">issue-1698</a>.
*/
@Internal
public class IcebergSourceSplitSerializer implements SimpleVersionedSerializer<IcebergSourceSplit> {
public static final IcebergSourceSplitSerializer INSTANCE = new IcebergSourceSplitSerializer();
private static final int VERSION = 1;

@Override
public int getVersion() {
return VERSION;
}

@Override
public byte[] serialize(IcebergSourceSplit split) throws IOException {
return split.serializeV1();
}

@Override
public IcebergSourceSplit deserialize(int version, byte[] serialized) throws IOException {
switch (version) {
case 1:
return IcebergSourceSplit.deserializeV1(serialized);
default:
throw new IOException(String.format("Failed to deserialize IcebergSourceSplit. " +
"Encountered unsupported version: %d. Supported version are [1]", version));
}
}
}
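And a round-trip sketch through the versioned serializer, as Flink would use it when persisting source state; the split variable is assumed from the previous sketch.

IcebergSourceSplitSerializer serializer = IcebergSourceSplitSerializer.INSTANCE;

byte[] bytes = serializer.serialize(split);
IcebergSourceSplit roundTripped = serializer.deserialize(serializer.getVersion(), bytes);

// Any version other than 1 fails with an IOException instead of guessing at the byte layout.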