Merged
32 changes: 29 additions & 3 deletions flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -21,6 +21,7 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import org.apache.flink.api.common.functions.MapFunction;
@@ -44,6 +45,7 @@
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.PropertyUtil;

@@ -113,6 +115,7 @@ public static class Builder {
private TableSchema tableSchema;
private boolean overwrite = false;
private Integer writeParallelism = null;
private List<String> equalityFieldColumns = null;

private Builder() {
}
@@ -169,6 +172,17 @@ public Builder writeParallelism(int newWriteParallelism) {
return this;
}

/**
* Configure the equality field columns for an Iceberg table that accepts CDC or UPSERT events.
*
* @param columns column names that define the Iceberg table's key.
* @return {@link Builder} to connect the Iceberg table.
*/
public Builder equalityFieldColumns(List<String> columns) {
Contributor:

Do you think that we should consider adding primary key columns to the spec?

Member Author:

In the next PR, https://github.com/openinx/incubator-iceberg/commit/a863c66eb3d72dd975ea64c75ed2ac35984c17fe, the Flink table SQL's primary key will act as the equality field columns. The semantics of Iceberg's equality columns are almost the same as a primary key; the one difference I can think of is that the uniqueness of the key is not enforced. In this discussion, we don't guarantee uniqueness when writing a key that has already been written in a previously committed txn, which means that if:

Txn-1:  INSERT key1,  txn commit; 
Txn-2:  INSERT key1,  txn commit;

Then the table will have two records with the same key.

If people really need Iceberg to maintain the key's uniqueness, then they will need to transform every INSERT into an UPSERT, which means a DELETE first and then an INSERT of the new values.

That introduces another issue: each INSERT will be regarded as an UPSERT, so it writes a DELETE and an INSERT. Eventually the size of the delete files will be almost the same as the size of the data files, and merging on read will be quite inefficient because there are too many useless DELETEs to JOIN.

The direct way to reduce the useless DELETEs is a bloom filter: we would generate a bloom filter for each committed data file, and when bootstrapping the Flink/Spark job we would prefetch all the bloom filters from the Parquet/Avro data files' metadata. Before writing an equality delete, we check the bloom filters; if they indicate that none of the committed data files contain the given key, then we can skip appending that equality delete. That would remove a lot of useless DELETEs from the delete files. Of course, a bloom filter has false positives, but that probability is less than 1%, which means we may append a small number of deletes whose keys don't exist in the current table. In my view, that should be OK.

In summary, I think it's reasonable to regard those equality fields as the primary key of an Iceberg table. People could choose UNIQUENESS ENFORCED or UNIQUENESS NOT-ENFORCED, and in this way trade off between strong semantics and performance.
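A minimal sketch of that write path (not the code in this PR — the writer, the key/value rows, and the in-memory "files" are invented for illustration, and plain Guava provides the filter): every INSERT is treated as an UPSERT, and the equality delete is only appended when a bloom filter over the keys of committed data files says the key may already exist.

import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy upsert writer: every incoming INSERT becomes equality delete + insert, but the delete
// is skipped when the bloom filter says the key cannot exist in any committed data file.
// A false positive just costs one harmless extra delete. De-duplication inside the current,
// not-yet-committed file is ignored here to keep the sketch small.
public class ToyUpsertWriter {
  private final BloomFilter<String> committedKeys;
  private final List<String> equalityDeletes = new ArrayList<>();  // stand-in for a delete file
  private final List<String[]> dataRows = new ArrayList<>();       // stand-in for a data file

  public ToyUpsertWriter(Iterable<String> keysInCommittedDataFiles) {
    this.committedKeys =
        BloomFilter.create(Funnels.stringFunnel(StandardCharsets.UTF_8), 1_000_000, 0.01);
    keysInCommittedDataFiles.forEach(committedKeys::put);
  }

  public void upsert(String key, String value) {
    if (committedKeys.mightContain(key)) {
      equalityDeletes.add(key);  // only delete keys that may exist in committed files
    }
    dataRows.add(new String[] {key, value});
  }

  public static void main(String[] args) {
    ToyUpsertWriter writer = new ToyUpsertWriter(Arrays.asList("key1", "key2"));
    writer.upsert("key1", "v2");  // key1 was committed before, so a delete is appended
    writer.upsert("key9", "v1");  // key9 was never committed, so no delete is appended
    System.out.println("equality deletes: " + writer.equalityDeletes);
  }
}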

Contributor:

For the bloom filter idea, @wangmiao1981 has been working on a proposal for secondary indexes. I think that could be used for the check you're suggesting here.

> People could choose UNIQUENESS ENFORCED or UNIQUENESS NOT-ENFORCED, and in this way trade off between strong semantics and performance.

Are you saying that if uniqueness is enforced, each insert becomes an upsert, but if uniqueness is not enforced, then the sink would assume that whatever is emitting records will correctly delete before inserting? That sounds reasonable to me.

> Eventually the size of the delete files will be almost the same as the size of the data files, and merging on read will be quite inefficient because there are too many useless DELETEs to JOIN.

I think that even if uniqueness is not enforced, tables will quickly require compaction to rewrite the equality deletes. I think we should spend some time making sure that we have good ways to maintain tables and compact equality deletes into position deletes, and position deletes into data files.
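As a rough illustration of compacting equality deletes into position deletes (a toy model only, not Iceberg's compaction code — the file and row representations are invented): scan each data file once and record the offsets of rows whose key is in the equality-delete set, so readers can drop rows by (file, offset) instead of joining every row against the deleted keys.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy "minor compaction": rewrite an equality-delete set as position deletes.
public final class ToyMinorCompaction {

  // dataFiles maps a file name to its rows; each row's first column is the equality key.
  // The result maps a file name to the offsets of rows hidden by the equality deletes.
  public static Map<String, List<Integer>> toPositionDeletes(
      Map<String, List<String[]>> dataFiles, Set<String> equalityDeletedKeys) {
    Map<String, List<Integer>> positionDeletes = new HashMap<>();
    dataFiles.forEach((file, rows) -> {
      for (int pos = 0; pos < rows.size(); pos++) {
        if (equalityDeletedKeys.contains(rows.get(pos)[0])) {
          positionDeletes.computeIfAbsent(file, f -> new ArrayList<>()).add(pos);
        }
      }
    });
    return positionDeletes;
  }

  private ToyMinorCompaction() {
  }
}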

Member Author:

> Are you saying that if uniqueness is enforced, each insert becomes an upsert, but if uniqueness is not enforced, then the sink would assume that whatever is emitting records will correctly delete before inserting?

Yes. If someone is exporting a relational database's change-log events to an Apache Iceberg table and can guarantee exactly-once semantics (for example, the flink-cdc-connector can guarantee that), then uniqueness is always correct when we just write the INSERT/DELETE/UPDATE_BEFORE/UPDATE_AFTER events to Iceberg. In some other cases, for example a Flink aggregate job that keeps refreshing a metrics count value, we will write the same key several times without deleting first, so we should regard every INSERT as an UPSERT.

> even if uniqueness is not enforced, tables will quickly require compaction to rewrite the equality deletes.

That is planned for the second phase, which includes:

  1. Use a bloom filter to reduce a lot of the useless deletes;
  2. Minor compaction to convert part of the equality deletes to pos-deletes;
  3. Major compaction to eliminate all the deletes;
  4. Make the whole read path & write path more stable. For example, a cache policy to reduce duplicated delete-file loading when merging on read in the same task; spilling to disk if the insertedRowMap exceeds the task's memory threshold; etc. I will evaluate the read, write, and compaction paths on a large dataset to make this a stable solution for production.

It would be good to have a document collecting all of those items for review.
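To make the two input styles above concrete, here is a hedged sketch of how a change-log event could be dispatched; the writeData/writeEqualityDelete calls are placeholders, not the task writer introduced by this PR series.

import org.apache.flink.table.data.RowData;

// Toy dispatch: exactly-once CDC input can be applied literally, because the old row arrives
// as UPDATE_BEFORE/DELETE. An "upsert" stream (e.g. a Flink aggregation result) repeats the
// same key, so every INSERT/UPDATE_AFTER must become equality delete + insert.
final class ToyChangeLogDispatcher {
  private final boolean upsertMode;

  ToyChangeLogDispatcher(boolean upsertMode) {
    this.upsertMode = upsertMode;
  }

  void apply(RowData row) {
    switch (row.getRowKind()) {
      case INSERT:
      case UPDATE_AFTER:
        if (upsertMode) {
          writeEqualityDelete(row);  // drop any previously committed row with the same key
        }
        writeData(row);
        break;
      case UPDATE_BEFORE:
        if (!upsertMode) {
          writeEqualityDelete(row);  // CDC input carries the old row explicitly
        }
        break;
      case DELETE:
        writeEqualityDelete(row);
        break;
    }
  }

  private void writeData(RowData row) {
    // placeholder: would hand the row to the data-file writer
  }

  private void writeEqualityDelete(RowData row) {
    // placeholder: would append an equality delete keyed by the equality field columns
  }
}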

Contributor:

I'd vote for not ensuring uniqueness, as it is really hard at scale. If we were to ensure it at write time, we would have to join the incoming data with the target table, which is really expensive. Doing it at read time would require sorting the data not only by the sort key but also by the sequence number.
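To make that read-time ordering point concrete, here is a one-method toy check (not Iceberg's reader): an equality delete hides a data row only when the keys match and the delete was committed after the data file, which is why the sequence number matters in addition to the key.

// A data row is removed by an equality delete only when the keys match AND the delete comes
// from a later commit, i.e. its sequence number is strictly greater than the sequence number
// of the data file holding the row.
final class ToyEqualityDeleteCheck {
  static boolean hides(String rowKey, long dataFileSequenceNumber,
                       String deleteKey, long deleteSequenceNumber) {
    return rowKey.equals(deleteKey) && deleteSequenceNumber > dataFileSequenceNumber;
  }

  private ToyEqualityDeleteCheck() {
  }
}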

this.equalityFieldColumns = columns;
return this;
}

@SuppressWarnings("unchecked")
public DataStreamSink<RowData> build() {
Preconditions.checkArgument(rowDataInput != null,
@@ -184,7 +198,18 @@ public DataStreamSink<RowData> build() {
}
}

IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, tableSchema);
// Find out the equality field id list based on the user-provided equality field column names.
List<Integer> equalityFieldIds = Lists.newArrayList();
if (equalityFieldColumns != null && equalityFieldColumns.size() > 0) {
for (String column : equalityFieldColumns) {
org.apache.iceberg.types.Types.NestedField field = table.schema().findField(column);
Preconditions.checkNotNull(field, "Missing required equality field column '%s' in table schema %s",
column, table.schema());
equalityFieldIds.add(field.fieldId());
}
}
Contributor:

Why not do this conversion in equalityFieldColumns and keep the column ids in the builder instead of the source column names?

Member Author:

Because FlinkSink is an API that will be exposed to Flink's DataStream users, and the concept of an equality field id is harder for those users to understand. Equality field column names are more friendly.
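For example, a DataStream user only ever deals with column names. A usage sketch, assuming the existing forRowData(...) entry point and a Hadoop-located table — the table path and the "id" key column are made-up values:

import java.util.Arrays;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

public class UpsertSinkExample {
  // Wires an Iceberg sink onto an existing change-log stream, keyed by the "id" column.
  public static void append(DataStream<RowData> changeLog) {
    TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/db/events");
    tableLoader.open();
    Table table = tableLoader.loadTable();

    FlinkSink.forRowData(changeLog)
        .table(table)
        .tableLoader(tableLoader)
        .equalityFieldColumns(Arrays.asList("id"))  // key columns by name, not by field id
        .writeParallelism(2)
        .build();
  }
}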


IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, tableSchema, equalityFieldIds);
IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(tableLoader, overwrite);

this.writeParallelism = writeParallelism == null ? rowDataInput.getParallelism() : writeParallelism;
@@ -202,7 +227,8 @@ public DataStreamSink<RowData> build() {
}
}

static IcebergStreamWriter<RowData> createStreamWriter(Table table, TableSchema requestedSchema) {
static IcebergStreamWriter<RowData> createStreamWriter(Table table, TableSchema requestedSchema,
List<Integer> equalityFieldIds) {
Preconditions.checkArgument(table != null, "Iceberg table shouldn't be null");

RowType flinkSchema;
@@ -226,7 +252,7 @@ static IcebergStreamWriter<RowData> createStreamWriter(Table table, TableSchema

TaskWriterFactory<RowData> taskWriterFactory = new RowDataTaskWriterFactory(table.schema(), flinkSchema,
table.spec(), table.locationProvider(), table.io(), table.encryption(), targetFileSize, fileFormat, props,
null);
equalityFieldIds);

return new IcebergStreamWriter<>(table.name(), taskWriterFactory);
}
17 changes: 17 additions & 0 deletions flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
@@ -20,6 +20,7 @@
package org.apache.iceberg.flink;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.flink.table.api.DataTypes;
@@ -56,6 +57,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.StructLikeSet;
import org.junit.Assert;

import static org.apache.iceberg.hadoop.HadoopOutputFile.fromPath;
@@ -197,4 +199,19 @@ public static void assertTableRecords(String tablePath, List<Record> expected) t
Preconditions.checkArgument(expected != null, "expected records shouldn't be null");
assertTableRecords(new HadoopTables().load(tablePath), expected);
}

public static StructLikeSet expectedRowSet(Table table, Record... records) {
StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
Collections.addAll(set, records);
return set;
}

public static StructLikeSet actualRowSet(Table table, String... columns) throws IOException {
table.refresh();
StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
try (CloseableIterable<Record> reader = IcebergGenerics.read(table).select(columns).build()) {
reader.forEach(set::add);
}
return set;
}
}
47 changes: 47 additions & 0 deletions flink/src/test/java/org/apache/iceberg/flink/TestTableLoader.java
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.flink;

import java.io.File;
import org.apache.iceberg.Table;
import org.apache.iceberg.TestTables;

public class TestTableLoader implements TableLoader {
private File dir;

public TestTableLoader(String dir) {
this.dir = new File(dir);
}

@Override
public void open() {

}

@Override
public Table loadTable() {
return TestTables.load(dir, "test");
}

@Override
public void close() {

}
}
@@ -25,7 +25,6 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;
@@ -35,10 +34,9 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.TableTestBase;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -325,17 +323,11 @@ private void commitTransaction(WriteResult result) {
}

private StructLikeSet expectedRowSet(Record... records) {
StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
Collections.addAll(set, records);
return set;
return SimpleDataUtil.expectedRowSet(table, records);
}

private StructLikeSet actualRowSet(String... columns) throws IOException {
StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
try (CloseableIterable<Record> reader = IcebergGenerics.read(table).select(columns).build()) {
reader.forEach(set::add);
}
return set;
return SimpleDataUtil.actualRowSet(table, columns);
}

private TaskWriterFactory<RowData> createTaskWriterFactory(List<Integer> equalityFieldIds) {