@@ -77,7 +77,7 @@ public FlinkInputSplit[] createInputSplits(int minNumSplits) throws IOException
tableLoader.open();
try (TableLoader loader = tableLoader) {
Table table = loader.loadTable();
return FlinkSplitGenerator.createInputSplits(table, context);
return FlinkSplitPlanner.planInputSplits(table, context);
}
}

@@ -22,33 +22,56 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import org.apache.flink.annotation.Internal;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

class FlinkSplitGenerator {
private FlinkSplitGenerator() {
@Internal
public class FlinkSplitPlanner {
private FlinkSplitPlanner() {
}

static FlinkInputSplit[] createInputSplits(Table table, ScanContext context) {
List<CombinedScanTask> tasks = tasks(table, context);
FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()];
for (int i = 0; i < tasks.size(); i++) {
splits[i] = new FlinkInputSplit(i, tasks.get(i));
static FlinkInputSplit[] planInputSplits(Table table, ScanContext context) {
try (CloseableIterable<CombinedScanTask> tasksIterable = planTasks(table, context)) {
List<CombinedScanTask> tasks = Lists.newArrayList(tasksIterable);
FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()];
for (int i = 0; i < tasks.size(); i++) {
splits[i] = new FlinkInputSplit(i, tasks.get(i));
}
return splits;
} catch (IOException e) {
throw new UncheckedIOException("Failed to process tasks iterable", e);
}
}

/**
* This returns splits for the FLIP-27 source
*/
public static List<IcebergSourceSplit> planIcebergSourceSplits(Table table, ScanContext context) {
try (CloseableIterable<CombinedScanTask> tasksIterable = planTasks(table, context)) {
return Lists.newArrayList(CloseableIterable.transform(tasksIterable,
task -> IcebergSourceSplit.fromCombinedScanTask(task)));
} catch (IOException e) {
throw new UncheckedIOException("Failed to process task iterable: ", e);
}
return splits;
}

private static List<CombinedScanTask> tasks(Table table, ScanContext context) {
static CloseableIterable<CombinedScanTask> planTasks(Table table, ScanContext context) {
TableScan scan = table
.newScan()
.caseSensitive(context.caseSensitive())
.project(context.project());

if (context.includeColumnStats()) {
scan = scan.includeColumnStats();
}
Review comment thread on lines +71 to +73:

Member: Why add this switch in this PR?

Member: I would recommend keeping this PR, which adds the FLIP-27 source split, as focused as possible, so I would suggest removing the unrelated changes.

Contributor (Author): Column stats are needed for the event-time/watermark-aligned assigner. You are correct that they are not directly used by this PR. Right now I am splitting the sub-PRs at minimally connected files to make the sub-PRs easier to create. If you think it is important to avoid unrelated changes inside a file, I can revert this piece of the change.

Member: @stevenzwu How do you feel about this comment?

Contributor: I think this is a reasonable addition. The motivation is to change this file only in this PR and not in the other PRs that are part of FLIP-27.
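For background on the thread above, a rough sketch of how a watermark-aligned assigner could consume the per-file lower bounds that this option preserves; the event_ts column and the helper class are purely illustrative and not part of this PR:

import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Types;

// Hypothetical helper: derives a minimum event time for a planned task from the
// per-file lower bounds that survive when includeColumnStats() is requested.
class SplitEventTimeEstimator {
  static long minEventTime(Schema schema, CombinedScanTask task) {
    int eventTimeFieldId = schema.findField("event_ts").fieldId();  // "event_ts" is an assumed column
    long min = Long.MAX_VALUE;
    for (FileScanTask fileTask : task.files()) {
      Map<Integer, ByteBuffer> lowerBounds = fileTask.file().lowerBounds();
      // lowerBounds is null (or misses the entry) when column stats were dropped during planning
      if (lowerBounds != null && lowerBounds.containsKey(eventTimeFieldId)) {
        Long lower = Conversions.fromByteBuffer(Types.LongType.get(), lowerBounds.get(eventTimeFieldId));
        min = Math.min(min, lower);
      }
    }
    return min;
  }
}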


if (context.snapshotId() != null) {
scan = scan.useSnapshot(context.snapshotId());
}
@@ -83,10 +106,6 @@ private static List<CombinedScanTask> tasks(Table table, ScanContext context) {
}
}

try (CloseableIterable<CombinedScanTask> tasksIterable = scan.planTasks()) {
return Lists.newArrayList(tasksIterable);
} catch (IOException e) {
throw new UncheckedIOException("Failed to close table scan: " + scan, e);
}
return scan.planTasks();
}
}
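For orientation, a minimal sketch of how a caller could drive the two planning entry points above, following the TableLoader pattern used by FlinkInputFormat; the wrapper class is hypothetical:

package org.apache.iceberg.flink.source;

import java.io.IOException;
import java.util.List;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;

// Hypothetical caller; ScanContext and FlinkInputSplit are package-private,
// so real callers also live in org.apache.iceberg.flink.source.
class SplitPlanningExample {
  static void plan(TableLoader tableLoader) throws IOException {
    ScanContext context = ScanContext.builder()
        .includeColumnStats(true)  // the new option added in this PR
        .build();

    tableLoader.open();
    try (TableLoader loader = tableLoader) {
      Table table = loader.loadTable();

      // Existing InputFormat path: an eager FlinkInputSplit array.
      FlinkInputSplit[] inputSplits = FlinkSplitPlanner.planInputSplits(table, context);

      // FLIP-27 path: splits that also carry file/record offsets for checkpointing.
      List<IcebergSourceSplit> sourceSplits = FlinkSplitPlanner.planIcebergSourceSplits(table, context);
    }
  }
}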
@@ -68,6 +68,9 @@ class ScanContext implements Serializable {
private static final ConfigOption<Duration> MONITOR_INTERVAL =
ConfigOptions.key("monitor-interval").durationType().defaultValue(Duration.ofSeconds(10));

private static final ConfigOption<Boolean> INCLUDE_COLUMN_STATS =
ConfigOptions.key("include-column-stats").booleanType().defaultValue(false);

private final boolean caseSensitive;
private final Long snapshotId;
private final Long startSnapshotId;
@@ -83,11 +86,12 @@ class ScanContext implements Serializable {
private final Schema schema;
private final List<Expression> filters;
private final long limit;
private final boolean includeColumnStats;

private ScanContext(boolean caseSensitive, Long snapshotId, Long startSnapshotId, Long endSnapshotId,
Long asOfTimestamp, Long splitSize, Integer splitLookback, Long splitOpenFileCost,
boolean isStreaming, Duration monitorInterval, String nameMapping,
Schema schema, List<Expression> filters, long limit) {
Schema schema, List<Expression> filters, long limit, boolean includeColumnStats) {
this.caseSensitive = caseSensitive;
this.snapshotId = snapshotId;
this.startSnapshotId = startSnapshotId;
@@ -103,6 +107,7 @@ private ScanContext(boolean caseSensitive, Long snapshotId, Long startSnapshotId
this.schema = schema;
this.filters = filters;
this.limit = limit;
this.includeColumnStats = includeColumnStats;
}

boolean caseSensitive() {
@@ -161,6 +166,10 @@ long limit() {
return limit;
}

boolean includeColumnStats() {
return includeColumnStats;
}

ScanContext copyWithAppendsBetween(long newStartSnapshotId, long newEndSnapshotId) {
return ScanContext.builder()
.caseSensitive(caseSensitive)
@@ -177,6 +186,7 @@ ScanContext copyWithAppendsBetween(long newStartSnapshotId, long newEndSnapshotId
.project(schema)
.filters(filters)
.limit(limit)
.includeColumnStats(includeColumnStats)
.build();
}

@@ -196,6 +206,7 @@ ScanContext copyWithSnapshotId(long newSnapshotId) {
.project(schema)
.filters(filters)
.limit(limit)
.includeColumnStats(includeColumnStats)
.build();
}

@@ -218,6 +229,7 @@ static class Builder {
private Schema projectedSchema;
private List<Expression> filters;
private long limit = -1L;
private boolean includeColumnStats = INCLUDE_COLUMN_STATS.defaultValue();

private Builder() {
}
@@ -292,6 +304,11 @@ Builder limit(long newLimit) {
return this;
}

Builder includeColumnStats(boolean newIncludeColumnStats) {
this.includeColumnStats = newIncludeColumnStats;
return this;
}

Builder fromProperties(Map<String, String> properties) {
Configuration config = new Configuration();
properties.forEach(config::setString);
@@ -306,14 +323,15 @@ Builder fromProperties(Map<String, String> properties) {
.splitOpenFileCost(config.get(SPLIT_FILE_OPEN_COST))
.streaming(config.get(STREAMING))
.monitorInterval(config.get(MONITOR_INTERVAL))
.nameMapping(properties.get(DEFAULT_NAME_MAPPING));
.nameMapping(properties.get(DEFAULT_NAME_MAPPING))
.includeColumnStats(config.get(INCLUDE_COLUMN_STATS));
}

public ScanContext build() {
return new ScanContext(caseSensitive, snapshotId, startSnapshotId,
endSnapshotId, asOfTimestamp, splitSize, splitLookback,
splitOpenFileCost, isStreaming, monitorInterval, nameMapping, projectedSchema,
filters, limit);
filters, limit, includeColumnStats);
}
}
}
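A small sketch of setting the new option through connector properties instead of the builder; the wrapper class is hypothetical, while the "include-column-stats" key and its false default come from the INCLUDE_COLUMN_STATS option above:

package org.apache.iceberg.flink.source;

import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

// Hypothetical wiring of the new option through properties, e.g. from table options.
class ScanContextFromPropertiesExample {
  static ScanContext fromProperties() {
    Map<String, String> properties = ImmutableMap.of("include-column-stats", "true");
    return ScanContext.builder()
        .fromProperties(properties)
        .build();
  }
}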
@@ -140,7 +140,7 @@ private void monitorAndForwardSplits() {
newScanContext = scanContext.copyWithAppendsBetween(lastSnapshotId, snapshotId);
}

FlinkInputSplit[] splits = FlinkSplitGenerator.createInputSplits(table, newScanContext);
FlinkInputSplit[] splits = FlinkSplitPlanner.planInputSplits(table, newScanContext);
for (FlinkInputSplit split : splits) {
sourceContext.collect(split);
}
@@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.flink.source.split;

import java.io.IOException;
import java.io.Serializable;
import java.util.Collection;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.util.InstantiationUtil;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;

@Internal
public class IcebergSourceSplit implements SourceSplit, Serializable {
private static final long serialVersionUID = 1L;

private final CombinedScanTask task;

private int fileOffset;
private long recordOffset;

// The splits are frequently serialized into checkpoints.
// Caching the byte representation makes repeated serialization cheap.
@Nullable
private transient byte[] serializedBytesCache;

private IcebergSourceSplit(CombinedScanTask task, int fileOffset, long recordOffset) {
this.task = task;
this.fileOffset = fileOffset;
this.recordOffset = recordOffset;
}

public static IcebergSourceSplit fromCombinedScanTask(CombinedScanTask combinedScanTask) {
return fromCombinedScanTask(combinedScanTask, 0, 0L);
}

public static IcebergSourceSplit fromCombinedScanTask(
CombinedScanTask combinedScanTask, int fileOffset, long recordOffset) {
return new IcebergSourceSplit(combinedScanTask, fileOffset, recordOffset);
}

public CombinedScanTask task() {
return task;
}

public int fileOffset() {
return fileOffset;
}

public long recordOffset() {
return recordOffset;
}

@Override
public String splitId() {
return MoreObjects.toStringHelper(this)
.add("files", toString(task.files()))
.toString();
}

public void updatePosition(int newFileOffset, long newRecordOffset) {
// invalidate the cache after position change
serializedBytesCache = null;
fileOffset = newFileOffset;
recordOffset = newRecordOffset;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("files", toString(task.files()))
.add("fileOffset", fileOffset)
.add("recordOffset", recordOffset)
.toString();
}

private String toString(Collection<FileScanTask> files) {
return Iterables.toString(files.stream().map(fileScanTask ->
MoreObjects.toStringHelper(fileScanTask)
.add("file", fileScanTask.file().path().toString())
.add("start", fileScanTask.start())
.add("length", fileScanTask.length())
.toString()).collect(Collectors.toList()));
}

byte[] serializeV1() throws IOException {
if (serializedBytesCache == null) {
serializedBytesCache = InstantiationUtil.serializeObject(this);
}
return serializedBytesCache;
}

static IcebergSourceSplit deserializeV1(byte[] serialized) throws IOException {
try {
return InstantiationUtil.deserializeObject(serialized, IcebergSourceSplit.class.getClassLoader());
} catch (ClassNotFoundException e) {
throw new RuntimeException("Failed to deserialize the split.", e);
}
}
}
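A brief hypothetical sketch of the split lifecycle on the reader side, assuming a CombinedScanTask already planned by FlinkSplitPlanner; the offsets and the wrapper class are illustrative:

import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;

// Hypothetical reader-side walkthrough of the split position fields.
class SplitLifecycleExample {
  static String illustrate(CombinedScanTask combinedScanTask) {
    IcebergSourceSplit split = IcebergSourceSplit.fromCombinedScanTask(combinedScanTask);

    // As the reader makes progress it records how far it got, so a split restored from a
    // checkpoint resumes mid-task instead of re-reading the whole CombinedScanTask.
    split.updatePosition(1, 128L);  // e.g. resume from the second file, after 128 records

    // splitId() depends only on the files in the task, so it stays stable across position updates.
    return split.splitId();
  }
}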
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.flink.source.split;

import java.io.IOException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.io.SimpleVersionedSerializer;

/**
* TODO: use Java serialization for now.
* Will switch to more stable serializer from
* <a href="https://github.com/apache/iceberg/issues/1698">issue-1698</a>.
*/
@Internal
public class IcebergSourceSplitSerializer implements SimpleVersionedSerializer<IcebergSourceSplit> {
Review comment thread on this class:

Contributor: Will this get used to serialize/deserialize the splits across task boundaries, or just for checkpoints? The comment on the serializedBytesCache field in IcebergSourceSplit mentions checkpoints, but I am curious about crossing task boundaries etc.

Contributor: A question for my own understanding after looking through the SimpleVersionedSerializer docs: it seems like SimpleVersionedSerializer can only handle one version at a time (hence the getVersion() function). Is that correct? Are there any best practices to keep in mind when working with SimpleVersionedSerializer? When we evolve the format, will we have two classes, or put all known versions into one instance?

Contributor (Author): This class is used as the checkpoint state serializer; cross-process (JM -> TM) communication goes through Java serialization. For a simpler start we are using Java serialization inside this class too. That is not ideal, since Java serialization does not handle schema evolution well, and schema evolution matters for long-running streaming jobs (less so for batch jobs). The class Javadoc links to an issue for the future improvement; note that this is already an issue for the current FlinkSource in streaming mode. #1698

Contributor (Author): SimpleVersionedSerializer always serializes with one (the latest) version, but during deserialization it should handle multiple versions to support evolution (e.g. when we switch from Java serialization to some Avro-based serialization for FileScanTask):

  @Override
  public IcebergSourceSplit deserialize(int version, byte[] serialized) throws IOException {
    switch (version) {
      case 1:
        return deserializeV1(serialized);
      default:
        throw new IOException("Unknown version: " + version);
    }
  }

public static final IcebergSourceSplitSerializer INSTANCE = new IcebergSourceSplitSerializer();
private static final int VERSION = 1;

@Override
public int getVersion() {
return VERSION;
}

@Override
public byte[] serialize(IcebergSourceSplit split) throws IOException {
return split.serializeV1();
}

@Override
public IcebergSourceSplit deserialize(int version, byte[] serialized) throws IOException {
switch (version) {
case 1:
return IcebergSourceSplit.deserializeV1(serialized);
default:
throw new IOException(String.format("Failed to deserialize IcebergSourceSplit. " +
"Encountered unsupported version: %d. Supported version are [1]", version));
}
}
}
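A hypothetical round trip through the serializer, mirroring how Flink's checkpoint machinery calls it; the wrapper class is illustrative:

import java.io.IOException;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.IcebergSourceSplitSerializer;

// Hypothetical round trip: serialize with the current version, restore with the recorded version.
class SplitSerializerRoundTrip {
  static IcebergSourceSplit roundTrip(IcebergSourceSplit split) throws IOException {
    IcebergSourceSplitSerializer serializer = IcebergSourceSplitSerializer.INSTANCE;

    // Flink stores the serializer version alongside the bytes in the checkpoint ...
    int version = serializer.getVersion();
    byte[] bytes = serializer.serialize(split);

    // ... and hands both back on restore, which is what allows older formats to keep working.
    return serializer.deserialize(version, bytes);
  }
}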