81 changes: 81 additions & 0 deletions flink-runtime/build.gradle
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

project(':iceberg-flink-runtime') {
apply plugin: 'com.github.johnrengelman.shadow'

tasks.jar.dependsOn tasks.shadowJar

configurations {
implementation {
exclude group: 'org.apache.flink'
// included in Flink
exclude group: 'org.slf4j'
exclude group: 'org.apache.commons'
exclude group: 'commons-pool'
exclude group: 'commons-codec'
exclude group: 'org.xerial.snappy'
exclude group: 'javax.xml.bind'
exclude group: 'javax.annotation'
}
}

dependencies {
implementation project(':iceberg-flink')
implementation project(':iceberg-aws')
implementation(project(':iceberg-nessie')) {
exclude group: 'com.google.code.findbugs', module: 'jsr305'
}

// flink-connector-base is not part of Flink runtime. Hence,
// iceberg-flink-runtime should include it as a transitive dependency.
implementation "org.apache.flink:flink-connector-base"
@rdblue (Contributor), Oct 29, 2021:
For other dependencies that we expect to be present at runtime, we use compileOnly so the dependency doesn't leak into the runtime Jar. Is that something we should do here as well? This looks like it would add the Flink Jar into runtimeClasspath, which would get included in the Jar.

@stevenzwu (Contributor, Author), Oct 29, 2021:
This is the flink-runtime module, so we use an implementation dependency to pull flink-connector-base into the iceberg-flink-runtime jar. In the flink module below, it is compileOnly.

A reviewer commented:
Please add a comment.

@rdblue flink-connector-base needs to be a transitive dependency of the iceberg connector (or shaded/relocated). It is not part of the Flink runtime.

@stevenzwu (Contributor, Author) replied:
Added a comment.

A Member commented:
@stevenzwu After PR #3364 was merged, we no longer need a common iceberg-flink-runtime for all Flink versions. Instead we have a separate iceberg-flink:iceberg-flink-<MAJOR.MINOR>-runtime module for each <MAJOR.MINOR> Flink release, so that we can build features on top of the latest Flink API.

You may want to add the transitive dependency org.apache.flink:flink-connector-base in this line for Flink 1.12, and this line for Flink 1.13.

}

shadowJar {
configurations = [project.configurations.runtimeClasspath]

zip64 true

// include the LICENSE and NOTICE files for the shaded Jar
from(projectDir) {
include 'LICENSE'
include 'NOTICE'
}

// Relocate dependencies to avoid conflicts
relocate 'org.apache.avro', 'org.apache.iceberg.shaded.org.apache.avro'
relocate 'org.apache.parquet', 'org.apache.iceberg.shaded.org.apache.parquet'
relocate 'com.google', 'org.apache.iceberg.shaded.com.google'
relocate 'com.fasterxml', 'org.apache.iceberg.shaded.com.fasterxml'
relocate 'com.github.benmanes', 'org.apache.iceberg.shaded.com.github.benmanes'
relocate 'org.checkerframework', 'org.apache.iceberg.shaded.org.checkerframework'
relocate 'shaded.parquet', 'org.apache.iceberg.shaded.org.apache.parquet.shaded'
relocate 'org.apache.orc', 'org.apache.iceberg.shaded.org.apache.orc'
relocate 'io.airlift', 'org.apache.iceberg.shaded.io.airlift'
relocate 'org.threeten.extra', 'org.apache.iceberg.shaded.org.threeten.extra'

classifier null
}

jar {
enabled = false
}
}

2 changes: 2 additions & 0 deletions flink/build.gradle
@@ -28,6 +28,7 @@ project(':iceberg-flink') {
implementation project(':iceberg-parquet')
implementation project(':iceberg-hive-metastore')

compileOnly "org.apache.flink:flink-connector-base"
A Member commented:
It's strange that the builds for Flink 1.12 & 1.13 passed, because I don't see the same dependency added to the Flink 1.12 build.gradle and the 1.13 build.gradle. Maybe I need to check 1.12's build.gradle again.

@stevenzwu (Contributor, Author), Nov 1, 2021:
@openinx Maybe we can follow up on the other comment discussion here.

With the SplitEnumerator API change, it looks like I need to put the FLIP-27 source in the v1.13 folder. What should we do for future versions (like 1.14)? Do we copy the FLIP-27 source code from the v1.13 folder to the v1.14 folder?

A Contributor commented:
@openinx, the tests run against the iceberg-flink module. They aren't present in the 1.12 or 1.13 modules. If you want them to be run for those modules, you'd need to add the source folder like you do for src/main/java. If you choose to do that, let's also remove CI for the common module since we don't need to run the tests outside of 1.12 and 1.13 if they are run in those modules.

A Contributor commented:
@stevenzwu, I think that copying the parts that change is reasonable. And once we remove support for 1.12, you can move the files back into the common module.

@stevenzwu (Contributor, Author) replied:
Yeah, that is my plan too. Once 1.12 support is removed, we should be able to move the files back to the common module. We just need to be diligent with these efforts.

compileOnly "org.apache.flink:flink-streaming-java_2.12"
compileOnly "org.apache.flink:flink-streaming-java_2.12::tests"
compileOnly "org.apache.flink:flink-table-api-java-bridge_2.12"
@@ -56,6 +57,7 @@ project(':iceberg-flink') {
exclude group: 'org.apache.hive', module: 'hive-storage-api'
}

testImplementation "org.apache.flink:flink-connector-test-utils"
testImplementation "org.apache.flink:flink-core"
testImplementation "org.apache.flink:flink-runtime_2.12"
testImplementation "org.apache.flink:flink-table-planner-blink_2.12"
@@ -40,4 +40,10 @@ private FlinkConfigOptions() {
.intType()
.defaultValue(100)
.withDescription("Sets max infer parallelism for source operator.");

public static final ConfigOption<Integer> SOURCE_READER_FETCH_RECORD_BATCH_SIZE = ConfigOptions
.key("source.iceberg.reader.fetch-record-batch-size")
A Contributor commented:
Is there precedent for this config key? What other keys are similar? The others in this file start with table.exec.iceberg. Is there a reason for not continuing with that convention?

@stevenzwu (Contributor, Author) replied:
I didn't use table.exec because this config isn't about table/SQL execution behavior; it configures the Iceberg source itself (DataStream API or SQL).

I checked the two FLIP-27 source implementations (Kafka and file) in the Flink repo:

  • The Kafka source options don't use any prefix, e.g. "partition.discovery.interval.ms".
  • The file source does use a prefix, e.g. "source.file.records.fetch-size".

This key follows the file source convention.

@openinx, any suggestions?

A Member commented:
Yes, this is unrelated to table/SQL execution. Both DataStream jobs and Table/SQL jobs use the same configuration keys, so I'm okay with keeping the current name.

(In fact, if we didn't have to consider Flink's configuration naming, I'd prefer to call it iceberg.source.reader.fetch-record-batch-size. But Iceberg is also a Flink connector, and all the other FLIP-27 source connectors name their options source.<connector>.xxx, so I think we can follow that convention.)

.intType()
.defaultValue(2048)
.withDescription("The target number of records for Iceberg reader fetch batch.");
}
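To make the new option concrete, here is a minimal, hypothetical usage sketch that is not part of this PR: it uses only the option defined in the diff above plus Flink's standard Configuration API, and it assumes FlinkConfigOptions lives in the org.apache.iceberg.flink package.

import org.apache.flink.configuration.Configuration;
import org.apache.iceberg.flink.FlinkConfigOptions; // assumed package for the class in this diff

public class FetchBatchSizeExample {
  public static void main(String[] args) {
    Configuration config = new Configuration();

    // Override the default of 2048 records per fetch batch.
    config.set(FlinkConfigOptions.SOURCE_READER_FETCH_RECORD_BATCH_SIZE, 4096);

    // A reader would consult this value when sizing the record batches it hands to Flink.
    int batchSize = config.get(FlinkConfigOptions.SOURCE_READER_FETCH_RECORD_BATCH_SIZE);
    System.out.println("Fetch record batch size: " + batchSize);
  }
}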
@@ -29,6 +29,7 @@
import org.apache.iceberg.encryption.InputFilesDecryptor;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

/**
* Flink data iterator that reads {@link CombinedScanTask} into a {@link CloseableIterator}
@@ -41,18 +42,47 @@ public class DataIterator<T> implements CloseableIterator<T> {
private final FileScanTaskReader<T> fileScanTaskReader;

private final InputFilesDecryptor inputFilesDecryptor;
private Iterator<FileScanTask> tasks;
private final CombinedScanTask combinedTask;
private final Position position;

private Iterator<FileScanTask> fileTasksIterator;
private CloseableIterator<T> currentIterator;

public DataIterator(FileScanTaskReader<T> fileScanTaskReader, CombinedScanTask task,
FileIO io, EncryptionManager encryption) {
this.fileScanTaskReader = fileScanTaskReader;

this.inputFilesDecryptor = new InputFilesDecryptor(task, io, encryption);
this.tasks = task.files().iterator();
this.combinedTask = task;
// fileOffset starts at -1 because the iterator begins with an empty
// CloseableIterator that does not correspond to any file in the split.
this.position = new Position(-1, 0L);
A Member commented:
The general DataIterator doesn't use the position or seek method to skip tasks or records. Putting all the FLIP-27 related logic in the common Flink read path doesn't make sense to me, because every time I read this class I have to work out which parts are related to FLIP-27 and which are not.

I suggest introducing a separate SeekableDataIterator to isolate the two code paths. I made a simple commit for this: https://github.com/openinx/incubator-iceberg/commit/b08dde86aae0c718d9d72acb347dffb3a836b336; you may want to take a look.

@stevenzwu (Contributor, Author), Nov 1, 2021:
I wouldn't say that seek capability is FLIP-27 specific. If we think of DataIterator as reading a list of files/splits from a CombinedScanTask, it is like a file API where seek is pretty common. It is needed to achieve exactly-once processing semantics; e.g., if we were to implement exactly-once semantics for the current streaming source, I would imagine we would need this as well.

Thanks a lot for the SeekableDataIterator. I feel that leaving these two empty abstract methods in the base DataIterator is a little weird:

protected void advanceRecord()
protected void advanceTask()

Overall, I still think adding seek capability to DataIterator is natural (for file-like read APIs).


this.fileTasksIterator = task.files().iterator();
this.currentIterator = CloseableIterator.empty();
}

public void seek(Position startingPosition) {
// skip files
Preconditions.checkArgument(startingPosition.fileOffset() < combinedTask.files().size(),
"Checkpointed file offset is %s, while CombinedScanTask has %s files",
startingPosition.fileOffset(), combinedTask.files().size());
for (long i = 0L; i < startingPosition.fileOffset(); ++i) {
A Contributor commented:
Is fileOffset() a long? That seems odd to me. When would you need to address more than 2 billion files in a single combined scan task?

@stevenzwu (Contributor, Author), Nov 1, 2021:
An integer would certainly be sufficient. I was using long to match the type in RecordAndPosition from the flink-connector-files module. Looking at it again, the long offset in Flink's RecordAndPosition actually means the byte offset within a file. I will define our own RecordAndPosition and change fileOffset to an int.

@stevenzwu (Contributor, Author) replied:
Same as the other comment; will update.

fileTasksIterator.next();
}
updateCurrentIterator();
// skip records within the file
for (long i = 0; i < startingPosition.recordOffset(); ++i) {
if (hasNext()) {
next();
} else {
throw new IllegalStateException("Not enough records to skip: " +
startingPosition.recordOffset());
}
}
this.position.update(startingPosition.fileOffset(), startingPosition.recordOffset());
A Contributor commented:
Can position be final since this is using update?

@stevenzwu (Contributor, Author) replied:
Yes, I will make the position field final.

}

@Override
public boolean hasNext() {
updateCurrentIterator();
@@ -62,18 +92,24 @@ public boolean hasNext() {
@Override
public T next() {
updateCurrentIterator();
position.advanceRecord();
return currentIterator.next();
}

public boolean isCurrentIteratorDone() {
return !currentIterator.hasNext();
}

/**
* Updates the current iterator field to ensure that the current Iterator
* is not exhausted.
*/
private void updateCurrentIterator() {
try {
while (!currentIterator.hasNext() && tasks.hasNext()) {
while (!currentIterator.hasNext() && fileTasksIterator.hasNext()) {
currentIterator.close();
currentIterator = openTaskIterator(tasks.next());
currentIterator = openTaskIterator(fileTasksIterator.next());
position.advanceFile();
}
} catch (IOException e) {
throw new UncheckedIOException(e);
@@ -88,6 +124,10 @@ private CloseableIterator<T> openTaskIterator(FileScanTask scanTask) {
public void close() throws IOException {
// close the current iterator
currentIterator.close();
tasks = null;
fileTasksIterator = null;
}

public Position position() {

A reviewer commented:

It appears that you are using the CheckpointedPosition data structure to tell the iterator, from the outside, which position to seek to. However, to communicate the current position back to the outside world, you are using the internal Position data structure. Can we keep this consistent and use either CheckpointedPosition or the mutable Position for both?

@stevenzwu (Contributor, Author) replied:
That is a good point, and it is also related to your question above. Let me see how to unify them and maybe move away from Flink's CheckpointedPosition.

return position;
}
}
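Tying the seek/position discussion above together, here is a rough, hypothetical sketch of how a FLIP-27 split reader might restore and checkpoint a DataIterator. DataIterator, Position, and FileScanTaskReader come from this PR; the wrapper class itself, its constructor shape, and the exact package locations of the imports are assumptions for illustration only.

import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.flink.source.DataIterator;
import org.apache.iceberg.flink.source.FileScanTaskReader;
import org.apache.iceberg.flink.source.Position;
import org.apache.iceberg.io.FileIO;

// Hypothetical wrapper showing the restore and checkpoint paths around DataIterator.
public class SplitReaderSketch<T> {
  private final DataIterator<T> iterator;

  public SplitReaderSketch(FileScanTaskReader<T> taskReader, CombinedScanTask task,
                           FileIO io, EncryptionManager encryption, Position restoredPosition) {
    this.iterator = new DataIterator<>(taskReader, task, io, encryption);
    if (restoredPosition != null) {
      // Skip the files and records that were already emitted before the last successful checkpoint.
      iterator.seek(restoredPosition);
    }
  }

  public Position snapshotPosition() {
    // The iterator's mutable position advances on every next(); copy it into checkpoint state.
    Position current = iterator.position();
    return new Position(current.fileOffset(), current.recordOffset());
  }
}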
@@ -77,7 +77,7 @@ public FlinkInputSplit[] createInputSplits(int minNumSplits) throws IOException
tableLoader.open();
try (TableLoader loader = tableLoader) {
Table table = loader.loadTable();
return FlinkSplitGenerator.createInputSplits(table, context);
return FlinkSplitPlanner.planInputSplits(table, context);
}
}

@@ -22,33 +22,58 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import org.apache.flink.annotation.Internal;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

class FlinkSplitGenerator {
private FlinkSplitGenerator() {
@Internal
public class FlinkSplitPlanner {
private FlinkSplitPlanner() {
}

static FlinkInputSplit[] createInputSplits(Table table, ScanContext context) {
List<CombinedScanTask> tasks = tasks(table, context);
FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()];
for (int i = 0; i < tasks.size(); i++) {
splits[i] = new FlinkInputSplit(i, tasks.get(i));
static FlinkInputSplit[] planInputSplits(Table table, ScanContext context) {
A Contributor commented:
Why change the name of this method?

@stevenzwu (Contributor, Author) replied:
create/generate implies creating something new. This method actually plans/discovers splits from the table, hence the rename. I also renamed the class from FlinkSplitGenerator to FlinkSplitPlanner. It is an internal class, so the rename shouldn't break user code.

try (CloseableIterable<CombinedScanTask> tasksIterable = planTasks(table, context)) {
List<CombinedScanTask> tasks = Lists.newArrayList(tasksIterable);
FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()];
for (int i = 0; i < tasks.size(); i++) {
splits[i] = new FlinkInputSplit(i, tasks.get(i));
}
return splits;
} catch (IOException e) {
throw new UncheckedIOException("Failed to process tasks iterable", e);
}
}

/**
* This returns splits for the FLIP-27 source
*/
public static List<IcebergSourceSplit> planIcebergSourceSplits(
@openinx (Member), Sep 9, 2021:
Should we add a javadoc (or replace it with a clearer name) to indicate why we need an extra planIcebergSourceSplits (compared to createInputSplits)? It doesn't seem easy to tell the difference between the names 'InputSplits' and 'IcebergSourceSplits'. I think it's used for implementing FLIP-27's SourceSplit, right?

A Member commented:
It would be good to align with createInputSplits by naming this createIcebergSourceSplits.

@stevenzwu (Contributor, Author), Sep 10, 2021:
Actually, I think we should rename createInputSplits to planFlinkInputSplit. We are not creating splits out of nowhere; both methods just discover/plan splits from the table by calling the same planTasks. I can add some javadoc on the new planIcebergSourceSplits method.

Since createInputSplits is non-public, we should be safe to rename it.

Table table, ScanContext context) {
try (CloseableIterable<CombinedScanTask> tasksIterable = planTasks(table, context)) {
List<IcebergSourceSplit> splits = Lists.newArrayList();
tasksIterable.forEach(task -> splits.add(IcebergSourceSplit.fromCombinedScanTask(task)));
return splits;
} catch (IOException e) {
throw new UncheckedIOException("Failed to process task iterable: ", e);
}
return splits;
}

private static List<CombinedScanTask> tasks(Table table, ScanContext context) {
static CloseableIterable<CombinedScanTask> planTasks(Table table, ScanContext context) {
TableScan scan = table
.newScan()
.caseSensitive(context.caseSensitive())
.project(context.project());

if (context.includeColumnStats()) {
scan = scan.includeColumnStats();
}

if (context.snapshotId() != null) {
scan = scan.useSnapshot(context.snapshotId());
}
@@ -83,10 +108,6 @@ private static List<CombinedScanTask> tasks(Table table, ScanContext context) {
}
}

try (CloseableIterable<CombinedScanTask> tasksIterable = scan.planTasks()) {
return Lists.newArrayList(tasksIterable);
} catch (IOException e) {
throw new UncheckedIOException("Failed to close table scan: " + scan, e);
}
return scan.planTasks();
}
}
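As a rough usage sketch for the planner discussed above (not part of this PR), the FLIP-27 enumerator side could discover splits much like the batch createInputSplits path shown earlier. FlinkSplitPlanner, planIcebergSourceSplits, IcebergSourceSplit, TableLoader, and ScanContext come from the codebase; the sketch class itself and its placement in the org.apache.iceberg.flink.source package are assumptions.

// Hypothetical helper, placed in the planner's package so it can reuse ScanContext directly.
package org.apache.iceberg.flink.source;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;

public class SplitPlanningSketch {
  private SplitPlanningSketch() {
  }

  // Mirrors the batch createInputSplits path shown earlier in this PR, but produces
  // FLIP-27 IcebergSourceSplits for a split enumerator instead of FlinkInputSplits.
  public static List<IcebergSourceSplit> discoverSplits(TableLoader tableLoader, ScanContext context) {
    tableLoader.open();
    try (TableLoader loader = tableLoader) {
      Table table = loader.loadTable();
      return FlinkSplitPlanner.planIcebergSourceSplits(table, context);
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to plan Iceberg source splits", e);
    }
  }
}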
95 changes: 95 additions & 0 deletions flink/src/main/java/org/apache/iceberg/flink/source/Position.java
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.flink.source;

import java.io.Serializable;
import java.util.Objects;
import org.apache.flink.annotation.Internal;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;

/**
* A mutable class that defines the read position
* <ul>
* <li>file offset in the list of files in a {@link CombinedScanTask}</li>
* <li>record offset within a file</li>
* </ul>
*/
@Internal
public class Position implements Serializable {

private static final long serialVersionUID = 1L;

private int fileOffset;
private long recordOffset;

public Position(int fileOffset, long recordOffset) {
this.fileOffset = fileOffset;
this.recordOffset = recordOffset;
}

void advanceFile() {
this.fileOffset += 1;
this.recordOffset = 0L;
}

void advanceRecord() {
this.recordOffset += 1L;
}

public void update(int newFileOffset, long newRecordOffset) {
this.fileOffset = newFileOffset;
this.recordOffset = newRecordOffset;
}

public int fileOffset() {
return fileOffset;
}

public long recordOffset() {
return recordOffset;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final Position that = (Position) o;
return Objects.equals(fileOffset, that.fileOffset) &&
Objects.equals(recordOffset, that.recordOffset);
}

@Override
public int hashCode() {
return Objects.hash(fileOffset, recordOffset);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("fileOffset", fileOffset)
.add("recordOffset", recordOffset)
.toString();
}
}