FlinkWriteConf.java
@@ -172,4 +172,12 @@ public int workerPoolSize() {
.defaultValue(FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE.defaultValue())
.parse();
}

public String branch() {
return confParser
.stringConf()
.option(FlinkWriteOptions.BRANCH.key())
.defaultValue(FlinkWriteOptions.BRANCH.defaultValue())
.parse();
}
}
FlinkWriteOptions.java
@@ -20,6 +20,7 @@

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.iceberg.SnapshotRef;

/** Flink sink write options */
public class FlinkWriteOptions {
@@ -56,4 +57,8 @@ private FlinkWriteOptions() {}
// Overrides the table's write.distribution-mode
public static final ConfigOption<String> DISTRIBUTION_MODE =
ConfigOptions.key("distribution-mode").stringType().noDefaultValue();

// Branch to write to
public static final ConfigOption<String> BRANCH =
ConfigOptions.key("branch").stringType().defaultValue(SnapshotRef.MAIN_BRANCH);
}
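
The new option simply exposes a branch name under the key "branch", defaulting to SnapshotRef.MAIN_BRANCH. As a rough illustration of how the value resolves, here is a standalone sketch, not the actual FlinkWriteConf/confParser code; the "audit" branch name is made up:

import java.util.HashMap;
import java.util.Map;
import org.apache.iceberg.SnapshotRef;

public class BranchOptionSketch {
  public static void main(String[] args) {
    // Write options as a user might supply them; "branch" is FlinkWriteOptions.BRANCH.key().
    Map<String, String> writeOptions = new HashMap<>();
    writeOptions.put("branch", "audit");

    // FlinkWriteConf.branch() resolves in the same spirit: the explicit option wins,
    // otherwise the default applies, which is SnapshotRef.MAIN_BRANCH ("main").
    String branch = writeOptions.getOrDefault("branch", SnapshotRef.MAIN_BRANCH);
    System.out.println("Committing to branch: " + branch); // prints "audit"
  }
}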
FlinkSink.java
@@ -132,10 +132,7 @@ public static class Builder {
private TableLoader tableLoader;
private Table table;
private TableSchema tableSchema;
private boolean overwrite = false;
private DistributionMode distributionMode = null;
private Integer writeParallelism = null;
private boolean upsert = false;
private List<String> equalityFieldColumns = null;
private String uidPrefix = null;
private final Map<String, String> snapshotProperties = Maps.newHashMap();
@@ -319,6 +316,11 @@ public Builder setSnapshotProperty(String property, String value) {
return this;
}

public Builder toBranch(String branch) {
writeOptions.put(FlinkWriteOptions.BRANCH.key(), branch);
return this;
}

private <T> DataStreamSink<T> chainIcebergOperators() {
Preconditions.checkArgument(
inputCreator != null,
@@ -425,7 +427,8 @@ private SingleOutputStreamOperator<Void> appendCommitter(
tableLoader,
flinkWriteConf.overwriteMode(),
snapshotProperties,
flinkWriteConf.workerPoolSize());
flinkWriteConf.workerPoolSize(),
flinkWriteConf.branch());
SingleOutputStreamOperator<Void> committerStream =
writerStream
.transform(operatorName(ICEBERG_FILES_COMMITTER_NAME), Types.VOID, filesCommitter)
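
The builder change above makes branch selection a one-liner on the DataStream path. A minimal usage sketch, assuming an existing Hadoop-table path with a matching schema; the table location, branch name, and sample row are made up, while forRowData/tableLoader/append are the existing FlinkSink API:

import java.util.Collections;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

public class BranchSinkSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // One (id, data) row; the target table is assumed to already exist with this schema.
    DataStream<RowData> rows =
        env.fromCollection(
            Collections.singletonList(
                (RowData) GenericRowData.of(1, StringData.fromString("hello"))),
            TypeInformation.of(RowData.class));

    TableLoader tableLoader = TableLoader.fromHadoopTable("file:///tmp/warehouse/db/tbl");

    FlinkSink.forRowData(rows)
        .tableLoader(tableLoader)
        .toBranch("audit") // new in this PR: snapshots are committed to the "audit" branch, not main
        .append();

    env.execute("write-to-iceberg-branch");
  }
}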
IcebergFilesCommitter.java
@@ -95,6 +95,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
// The completed files cache for current checkpoint. Once the snapshot barrier received, it will
// be flushed to the 'dataFilesPerCheckpoint'.
private final List<WriteResult> writeResultsOfCurrentCkpt = Lists.newArrayList();
private final String branch;

// It will have an unique identifier for one job.
private transient String flinkJobId;
@@ -125,11 +126,13 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
TableLoader tableLoader,
boolean replacePartitions,
Map<String, String> snapshotProperties,
Integer workerPoolSize) {
Integer workerPoolSize,
String branch) {
this.tableLoader = tableLoader;
this.replacePartitions = replacePartitions;
this.snapshotProperties = snapshotProperties;
this.workerPoolSize = workerPoolSize;
this.branch = branch;
}

@Override
@@ -179,7 +182,7 @@ public void initializeState(StateInitializationContext context) throws Exception
// it's safe to assign the max committed checkpoint id from restored flink job to the current
// flink job.
this.maxCommittedCheckpointId =
getMaxCommittedCheckpointId(table, restoredFlinkJobId, operatorUniqueId);
getMaxCommittedCheckpointId(table, restoredFlinkJobId, operatorUniqueId, branch);

NavigableMap<Long, byte[]> uncommittedDataFiles =
Maps.newTreeMap(checkpointsState.get().iterator().next())
@@ -230,7 +233,6 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
// the files,
// Besides, we need to maintain the max-committed-checkpoint-id to be increasing.
if (checkpointId > maxCommittedCheckpointId) {
LOG.info("Checkpoint {} completed. Attempting commit.", checkpointId);
commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, operatorUniqueId, checkpointId);
this.maxCommittedCheckpointId = checkpointId;
} else {
@@ -286,8 +288,6 @@ private void commitPendingResult(
commitDeltaTxn(pendingResults, summary, newFlinkJobId, operatorId, checkpointId);
}
continuousEmptyCheckpoints = 0;
} else {
LOG.info("Skip commit for checkpoint {} due to no data files or delete files.", checkpointId);
}
}

@@ -386,25 +386,28 @@ private void commitOperation(
String operatorId,
long checkpointId) {
LOG.info(
"Committing {} for checkpoint {} to table {} with summary: {}",
"Committing {} for checkpoint {} to table {} branch {} with summary: {}",
description,
checkpointId,
table.name(),
branch,
summary);
snapshotProperties.forEach(operation::set);
// custom snapshot metadata properties will be overridden if they conflict with internal ones
// used by the sink.
operation.set(MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId));
operation.set(FLINK_JOB_ID, newFlinkJobId);
operation.set(OPERATOR_ID, operatorId);
operation.toBranch(branch);

long startNano = System.nanoTime();
operation.commit(); // abort is automatically called if this fails.
long durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNano);
LOG.info(
"Committed {} to table: {}, checkpointId {} in {} ms",
"Committed {} to table: {}, branch: {}, checkpointId {} in {} ms",
[Review thread on the line above]
amogh-jahagirdar (Contributor, Author) commented on Feb 27, 2023:
@stevenzwu I added this log for branch when backporting, noticed we were missing the branch after the operation actually committed. I'll add this to Flink 1.16 in a separate PR. Let me know your thoughts.

Reply (Contributor):
I will be fine if you just add it in this PR.
Can you check the diff output and make sure no diff related to this backport?
git diff --no-index  flink/v1.14/flink/src/ flink/v1.16/flink/src
git diff --no-index  flink/v1.15/flink/src/ flink/v1.16/flink/src

Reply (Contributor, Author):
Checked, verified there's no diff related to this backport.
[End of review thread]

description,
table.name(),
branch,
checkpointId,
durationMs);
committerMetrics.commitDuration(durationMs);
@@ -474,8 +477,9 @@ private static ListStateDescriptor<SortedMap<Long, byte[]>> buildStateDescriptor
return new ListStateDescriptor<>("iceberg-files-committer-state", sortedMapTypeInfo);
}

static long getMaxCommittedCheckpointId(Table table, String flinkJobId, String operatorId) {
Snapshot snapshot = table.currentSnapshot();
static long getMaxCommittedCheckpointId(
Table table, String flinkJobId, String operatorId, String branch) {
Snapshot snapshot = table.snapshot(branch);
long lastCommittedCheckpointId = INITIAL_CHECKPOINT_ID;

while (snapshot != null) {
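
The restore path is the other half of the branch change: instead of walking back from table.currentSnapshot(), the committer starts from the head of the target branch when looking for the last checkpoint it committed. A slightly simplified paraphrase of that lookup follows; the summary keys ("flink.job-id", "flink.operator-id", "flink.max-committed-checkpoint-id") and the -1 sentinel standing in for INITIAL_CHECKPOINT_ID are assumptions about the committer's constants, and the real logic lives in getMaxCommittedCheckpointId above:

import java.util.Map;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

public class BranchCheckpointLookupSketch {
  static long maxCommittedCheckpointId(
      Table table, String flinkJobId, String operatorId, String branch) {
    // Start from the branch head rather than the table's current (main) snapshot.
    Snapshot snapshot = table.snapshot(branch);
    long lastCommitted = -1L;

    while (snapshot != null) {
      Map<String, String> summary = snapshot.summary();
      // Only snapshots produced by this job/operator carry the checkpoint marker we want.
      if (flinkJobId.equals(summary.get("flink.job-id"))
          && operatorId.equals(summary.get("flink.operator-id"))) {
        String value = summary.get("flink.max-committed-checkpoint-id");
        if (value != null) {
          lastCommitted = Long.parseLong(value);
          break;
        }
      }
      // Walk the branch lineage parent by parent.
      Long parentId = snapshot.parentId();
      snapshot = parentId != null ? table.snapshot(parentId) : null;
    }

    return lastCommitted;
  }
}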
SimpleDataUtil.java
@@ -43,6 +43,7 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.data.GenericRecord;
@@ -206,12 +207,18 @@ private static List<Record> convertToRecords(List<RowData> rows) {
return records;
}

public static void assertTableRows(String tablePath, List<RowData> expected) throws IOException {
assertTableRecords(tablePath, convertToRecords(expected));
public static void assertTableRows(String tablePath, List<RowData> expected, String branch)
throws IOException {
assertTableRecords(tablePath, convertToRecords(expected), branch);
}

public static void assertTableRows(Table table, List<RowData> expected) throws IOException {
assertTableRecords(table, convertToRecords(expected));
assertTableRecords(table, convertToRecords(expected), SnapshotRef.MAIN_BRANCH);
}

public static void assertTableRows(Table table, List<RowData> expected, String branch)
throws IOException {
assertTableRecords(table, convertToRecords(expected), branch);
}

/** Get all rows for a table */
@@ -267,13 +274,25 @@ public static void assertTableRecords(
}

public static void assertTableRecords(Table table, List<Record> expected) throws IOException {
assertTableRecords(table, expected, SnapshotRef.MAIN_BRANCH);
}

public static void assertTableRecords(Table table, List<Record> expected, String branch)
throws IOException {
table.refresh();
Snapshot snapshot = latestSnapshot(table, branch);

if (snapshot == null) {
Assert.assertEquals(expected, ImmutableList.of());
return;
}

Types.StructType type = table.schema().asStruct();
StructLikeSet expectedSet = StructLikeSet.create(type);
expectedSet.addAll(expected);

try (CloseableIterable<Record> iterable = IcebergGenerics.read(table).build()) {
try (CloseableIterable<Record> iterable =
IcebergGenerics.read(table).useSnapshot(snapshot.snapshotId()).build()) {
StructLikeSet actualSet = StructLikeSet.create(type);

for (Record record : iterable) {
@@ -284,10 +303,27 @@ public static void assertTableRecords(Table table, List<Record> expected) throws
}
}

// Returns the latest snapshot of the given branch in the table
public static Snapshot latestSnapshot(Table table, String branch) {
// For the main branch, currentSnapshot() is used to validate that the API behavior has
// not changed since that was the API used for validation prior to addition of branches.
if (branch.equals(SnapshotRef.MAIN_BRANCH)) {
return table.currentSnapshot();
}

return table.snapshot(branch);
}

public static void assertTableRecords(String tablePath, List<Record> expected)
throws IOException {
Preconditions.checkArgument(expected != null, "expected records shouldn't be null");
assertTableRecords(new HadoopTables().load(tablePath), expected);
assertTableRecords(new HadoopTables().load(tablePath), expected, SnapshotRef.MAIN_BRANCH);
}

public static void assertTableRecords(String tablePath, List<Record> expected, String branch)
throws IOException {
Preconditions.checkArgument(expected != null, "expected records shouldn't be null");
assertTableRecords(new HadoopTables().load(tablePath), expected, branch);
}

public static StructLikeSet expectedRowSet(Table table, Record... records) {
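
Together with latestSnapshot, the new overloads let tests assert against a branch head while the two-argument versions keep their old main-branch behavior. A hypothetical verification fragment: the "audit" branch and the (id, data) row are made up, and the table is assumed to use SimpleDataUtil's schema and to have been written only by a branch-targeting sink:

import java.io.IOException;
import java.util.List;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;

public class BranchAssertionSketch {
  static void verify(Table table) throws IOException {
    List<RowData> expected =
        ImmutableList.of((RowData) GenericRowData.of(1, StringData.fromString("hello")));

    // New overload: resolves the snapshot with latestSnapshot(table, "audit") and scans it.
    SimpleDataUtil.assertTableRows(table, expected, "audit");

    // Existing overload: still validates the main branch, which should not see the branch rows.
    SimpleDataUtil.assertTableRows(table, ImmutableList.of());
  }
}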
TestFlinkIcebergSink.java
@@ -22,14 +22,10 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.types.Row;
import org.apache.iceberg.AssertHelpers;
@@ -45,7 +41,6 @@
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.source.BoundedTestSource;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -60,7 +55,7 @@
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class TestFlinkIcebergSink {
public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {

@ClassRule
public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
@@ -72,13 +67,6 @@ public class TestFlinkIcebergSink {
public final HadoopCatalogResource catalogResource =
new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE);

private static final TypeInformation<Row> ROW_TYPE_INFO =
new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());
private static final DataFormatConverters.RowConverter CONVERTER =
new DataFormatConverters.RowConverter(SimpleDataUtil.FLINK_SCHEMA.getFieldDataTypes());

private Table table;
private StreamExecutionEnvironment env;
private TableLoader tableLoader;

private final FileFormat format;
@@ -132,14 +120,6 @@ public void before() throws IOException {
tableLoader = catalogResource.tableLoader();
}

private List<RowData> convertToRowData(List<Row> rows) {
return rows.stream().map(CONVERTER::toInternal).collect(Collectors.toList());
}

private BoundedTestSource<Row> createBoundedSource(List<Row> rows) {
return new BoundedTestSource<>(rows.toArray(new Row[0]));
}

@Test
public void testWriteRowData() throws Exception {
List<Row> rows = Lists.newArrayList(Row.of(1, "hello"), Row.of(2, "world"), Row.of(3, "foo"));
@@ -160,19 +140,6 @@ public void testWriteRowData() throws Exception {
SimpleDataUtil.assertTableRows(table, convertToRowData(rows));
}

private List<Row> createRows(String prefix) {
return Lists.newArrayList(
Row.of(1, prefix + "aaa"),
Row.of(1, prefix + "bbb"),
Row.of(1, prefix + "ccc"),
Row.of(2, prefix + "aaa"),
Row.of(2, prefix + "bbb"),
Row.of(2, prefix + "ccc"),
Row.of(3, prefix + "aaa"),
Row.of(3, prefix + "bbb"),
Row.of(3, prefix + "ccc"));
}

private void testWriteRow(TableSchema tableSchema, DistributionMode distributionMode)
throws Exception {
List<Row> rows = createRows("");
TestFlinkIcebergSinkBase.java (new file)
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.sink;

import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
import org.apache.flink.types.Row;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.source.BoundedTestSource;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

public class TestFlinkIcebergSinkBase {

protected Table table;
protected StreamExecutionEnvironment env;
protected static final TypeInformation<Row> ROW_TYPE_INFO =
new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());

protected static final DataFormatConverters.RowConverter CONVERTER =
new DataFormatConverters.RowConverter(SimpleDataUtil.FLINK_SCHEMA.getFieldDataTypes());

protected BoundedTestSource<Row> createBoundedSource(List<Row> rows) {
return new BoundedTestSource<>(rows.toArray(new Row[0]));
}

protected List<Row> createRows(String prefix) {
return Lists.newArrayList(
Row.of(1, prefix + "aaa"),
Row.of(1, prefix + "bbb"),
Row.of(1, prefix + "ccc"),
Row.of(2, prefix + "aaa"),
Row.of(2, prefix + "bbb"),
Row.of(2, prefix + "ccc"),
Row.of(3, prefix + "aaa"),
Row.of(3, prefix + "bbb"),
Row.of(3, prefix + "ccc"));
}

protected List<RowData> convertToRowData(List<Row> rows) {
return rows.stream().map(CONVERTER::toInternal).collect(Collectors.toList());
}
}