3 changes: 3 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -158,4 +158,7 @@ private TableProperties() {

public static final String UPDATE_MODE = "write.update.mode";
public static final String UPDATE_MODE_DEFAULT = "copy-on-write";

public static final String EQUALITY_FIELD_COLUMNS = "equality.field.columns";
public static final String DEFAULT_EQUALITY_FIELD_COLUMNS = null;
}
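As a hedged aside, the new property can also be set on an existing table through the core API; the catalog and table identifier below are illustrative, not part of this PR:

// Illustrative only: mark "id" as the equality field column on an existing table.
Table table = catalog.loadTable(TableIdentifier.of("db", "tl"));
table.updateProperties()
    .set(TableProperties.EQUALITY_FIELD_COLUMNS, "id")
    .commit();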
42 changes: 38 additions & 4 deletions flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
@@ -51,6 +51,8 @@
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.factories.TableFactory;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.util.StringUtils;
import org.apache.iceberg.CachingCatalog;
import org.apache.iceberg.DataFile;
@@ -60,6 +62,7 @@
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.UpdateProperties;
import org.apache.iceberg.catalog.Catalog;
@@ -70,11 +73,14 @@
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Splitter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.PropertyUtil;

/**
* A Flink Catalog implementation that wraps an Iceberg {@link Catalog}.
@@ -359,6 +365,13 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
PartitionSpec spec = toPartitionSpec(((CatalogTable) table).getPartitionKeys(), icebergSchema);

ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();

// Set the equality field columns.
List<String> equalityFieldColumns = toEqualityColumns(table.getSchema());
if (!equalityFieldColumns.isEmpty()) {
properties.put(TableProperties.EQUALITY_FIELD_COLUMNS, Joiner.on(',').join(equalityFieldColumns));
}

String location = null;
for (Map.Entry<String, String> entry : table.getOptions().entrySet()) {
if ("location".equalsIgnoreCase(entry.getKey())) {
@@ -447,10 +460,12 @@ private static void validateFlinkTable(CatalogBaseTable table) {
if (!schema.getWatermarkSpecs().isEmpty()) {
throw new UnsupportedOperationException("Creating table with watermark specs is not supported yet.");
}
}

if (schema.getPrimaryKey().isPresent()) {
throw new UnsupportedOperationException("Creating table with primary key is not supported yet.");
}
private static List<String> toEqualityColumns(TableSchema schema) {
List<String> equalityColumns = Lists.newArrayList();
schema.getPrimaryKey().ifPresent(uniqueConstraint -> equalityColumns.addAll(uniqueConstraint.getColumns()));
return equalityColumns;
}

Contributor: In the format, I think that equality columns should be tracked by ID rather than by name, so that renaming columns doesn't break the primary key metadata.

Member (Author): Yes, it should track IDs here. I was thinking this could be a temporary solution before introducing the primary key specification. Given the renaming issue, it does not seem like a good idea to track column names in Iceberg properties. I will create another PR to introduce the primary key specification based on the discussion here.

Contributor: It seems like we should track primary key columns in the table format rather than in properties. If we are going to add primary key columns, then we should add them before v2 so that v2 writers are required not to drop them.
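Following up on that thread, a minimal sketch of what tracking by ID could look like, resolving the Flink primary key column names against the Iceberg schema; this helper is hypothetical and not part of the PR:

// Hypothetical helper: resolve equality column names to Iceberg field IDs so that
// renaming a column does not invalidate the stored key metadata.
private static List<Integer> toEqualityFieldIds(Schema icebergSchema, List<String> equalityColumns) {
  List<Integer> fieldIds = Lists.newArrayList();
  for (String column : equalityColumns) {
    org.apache.iceberg.types.Types.NestedField field = icebergSchema.findField(column);
    Preconditions.checkNotNull(field, "Missing equality field column '%s' in schema %s", column, icebergSchema);
    fieldIds.add(field.fieldId());
  }
  return fieldIds;
}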

private static PartitionSpec toPartitionSpec(List<String> partitionKeys, Schema icebergSchema) {
@@ -516,7 +531,26 @@ private static void commitChanges(Table table, String setLocation, String setSna
}

static CatalogTable toCatalogTable(Table table) {
TableSchema schema = FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(table.schema()));
TableSchema.Builder builder = TableSchema.builder();

// Add the table columns.
RowType rowType = FlinkSchemaUtil.convert(table.schema());
for (RowType.RowField field : rowType.getFields()) {
builder.field(field.getName(), TypeConversions.fromLogicalToDataType(field.getType()));
}

// Add the primary keys.
String concatColumns = PropertyUtil.propertyAsString(table.properties(),
TableProperties.EQUALITY_FIELD_COLUMNS,
TableProperties.DEFAULT_EQUALITY_FIELD_COLUMNS);
// Guard against a missing or empty property; Splitter would throw on a null input.
if (!StringUtils.isNullOrWhitespaceOnly(concatColumns)) {
String[] columns = Splitter.on(',').splitToList(concatColumns).toArray(new String[0]);
builder.primaryKey(columns);
}

// Build the table schema.
TableSchema schema = builder.build();

List<String> partitionKeys = toPartitionKeys(table.spec(), table.schema());

// NOTE: We can not create a IcebergCatalogTable extends CatalogTable, because Flink optimizer may use
12 changes: 12 additions & 0 deletions flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -21,9 +21,11 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
@@ -41,6 +43,7 @@
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.io.WriteResult;
@@ -200,6 +203,15 @@ public DataStreamSink<RowData> build() {

// Find out the equality field id list based on the user-provided equality field column names.
List<Integer> equalityFieldIds = Lists.newArrayList();
if (equalityFieldColumns == null) {
String concatColumns = PropertyUtil.propertyAsString(table.properties(),
TableProperties.EQUALITY_FIELD_COLUMNS,
TableProperties.DEFAULT_EQUALITY_FIELD_COLUMNS);
String[] columns = StringUtils.split(concatColumns, ",");
if (columns != null && columns.length > 0) {
equalityFieldColumns = Arrays.asList(columns);
}
}
if (equalityFieldColumns != null && equalityFieldColumns.size() > 0) {
for (String column : equalityFieldColumns) {
org.apache.iceberg.types.Types.NestedField field = table.schema().findField(column);
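For orientation, a hedged usage sketch of the sink builder with explicit equality columns; the stream, table, and loader arguments and the equalityFieldColumns(...) setter are assumptions not shown in this diff:

// Sketch only: append a RowData stream to an Iceberg table with explicit equality columns.
// If equalityFieldColumns(...) is omitted, build() now falls back to the table's
// "equality.field.columns" property, as added above.
static void buildIcebergSink(DataStream<RowData> rowDataStream, Table icebergTable, TableLoader loader) {
  FlinkSink.forRowData(rowDataStream)
      .table(icebergTable)
      .tableLoader(loader)
      .equalityFieldColumns(Arrays.asList("id", "dt"))
      .build();
}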
@@ -21,6 +21,11 @@

import java.io.IOException;
import java.util.Map;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.util.ArrayUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.iceberg.CatalogProperties;
@@ -135,4 +140,18 @@ static String toWithClause(Map<String, String> props) {
builder.append(")");
return builder.toString();
}

static TableEnvironment createTableEnv(boolean isStreamingJob) {
EnvironmentSettings.Builder settingsBuilder = EnvironmentSettings
.newInstance()
.useBlinkPlanner();
if (isStreamingJob) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(400);
return StreamTableEnvironment.create(env, settingsBuilder.inStreamingMode().build());
} else {
return TableEnvironment.create(settingsBuilder.inBatchMode().build());
}
}
}
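A short hedged example of how a test might drive this helper (the DDL statement is illustrative):

// Illustrative only: create a streaming TableEnvironment and run DDL through it.
TableEnvironment tEnv = createTableEnv(true);
tEnv.executeSql("CREATE TABLE tl (id BIGINT, dt STRING, PRIMARY KEY(id, dt) NOT ENFORCED)");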
27 changes: 26 additions & 1 deletion flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
@@ -29,6 +29,7 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -98,6 +99,21 @@ public static Record createRecord(Integer id, String data) {
return record;
}

private static final Map<String, RowKind> ROW_KIND_MAP = ImmutableMap.of(
"+I", RowKind.INSERT,
"-D", RowKind.DELETE,
"-U", RowKind.UPDATE_BEFORE,
"+U", RowKind.UPDATE_AFTER);

public static Row createRow(String rowKind, int id, String data) {
RowKind kind = ROW_KIND_MAP.get(rowKind);
if (kind == null) {
throw new IllegalArgumentException("Unknown row kind: " + rowKind);
}

return Row.ofKind(kind, id, data);
}
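An illustrative changelog built with the helper above (the values are made up):

// Example CDC-style changelog covering all four supported row kinds.
List<Row> changelog = Arrays.asList(
    SimpleDataUtil.createRow("+I", 1, "aaa"),   // insert
    SimpleDataUtil.createRow("-U", 1, "aaa"),   // update-before
    SimpleDataUtil.createRow("+U", 1, "bbb"),   // update-after
    SimpleDataUtil.createRow("-D", 1, "bbb"));  // delete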

public static RowData createRowData(Integer id, String data) {
return GenericRowData.of(id, StringData.fromString(data));
}
@@ -207,9 +223,18 @@ public static StructLikeSet expectedRowSet(Table table, Record... records) {
}

public static StructLikeSet actualRowSet(Table table, String... columns) throws IOException {
table.refresh();
return actualRowSet(table, table.currentSnapshot().snapshotId(), columns);
}

public static StructLikeSet actualRowSet(Table table, long snapshotId, String... columns) throws IOException {
table.refresh();
StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
try (CloseableIterable<Record> reader = IcebergGenerics.read(table).select(columns).build()) {
try (CloseableIterable<Record> reader = IcebergGenerics
.read(table)
.useSnapshot(snapshotId)
.select(columns)
.build()) {
reader.forEach(set::add);
}
return set;
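A hedged sketch of how a test could use the new snapshot-scoped overload (a table with at least two commits is assumed):

// Sketch: compare the rows visible at the previous snapshot against an expected set.
table.refresh();
long previousSnapshotId = table.currentSnapshot().parentId();  // assumes a parent snapshot exists
StructLikeSet previousRows = SimpleDataUtil.actualRowSet(table, previousSnapshotId, "*");
Assert.assertEquals(
    SimpleDataUtil.expectedRowSet(table, SimpleDataUtil.createRecord(1, "aaa")),
    previousRows);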
@@ -22,13 +22,15 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
@@ -42,10 +44,14 @@
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.PropertyUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
@@ -158,6 +164,31 @@ public void testCreatePartitionTable() throws TableNotExistException {
Assert.assertEquals(Collections.singletonList("dt"), catalogTable.getPartitionKeys());
}

@Test
public void testCreateTableWithPrimaryKey() throws Exception {
sql("CREATE TABLE tl(id BIGINT, dt STRING, PRIMARY KEY(id, dt) NOT ENFORCED)");

Table table = table("tl");
Assert.assertEquals(new Schema(
Types.NestedField.required(1, "id", Types.LongType.get()),
Types.NestedField.required(2, "dt", Types.StringType.get())
).asStruct(), table.schema().asStruct());
Assert.assertEquals("Should have the expected primary keys", "id,dt",
PropertyUtil.propertyAsString(table.properties(),
TableProperties.EQUALITY_FIELD_COLUMNS,
TableProperties.DEFAULT_EQUALITY_FIELD_COLUMNS));

CatalogTable catalogTable = catalogTable("tl");
Assert.assertTrue(catalogTable.getSchema().getTableColumn("id").isPresent());
Assert.assertTrue(catalogTable.getSchema().getTableColumn("dt").isPresent());
Assert.assertEquals(ImmutableMap.of(TableProperties.EQUALITY_FIELD_COLUMNS, "id,dt"), catalogTable.getOptions());

Optional<UniqueConstraint> constraint = catalogTable.getSchema().getPrimaryKey();
Assert.assertTrue("Should have an unique constraint.", constraint.isPresent());
Assert.assertEquals("Should have the expected primary key columns", ImmutableList.of("id", "dt"),
constraint.get().getColumns());
}

@Test
public void testLoadTransformPartitionTable() throws TableNotExistException {
Schema schema = new Schema(Types.NestedField.optional(0, "id", Types.LongType.get()));
@@ -20,12 +20,8 @@
package org.apache.iceberg.flink;

import java.util.List;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Expressions;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
@@ -71,19 +67,7 @@ public TestFlinkTableSink(String catalogName, String[] baseNamespace, FileFormat
protected TableEnvironment getTableEnv() {
if (tEnv == null) {
synchronized (this) {
EnvironmentSettings.Builder settingsBuilder = EnvironmentSettings
.newInstance()
.useBlinkPlanner();
if (isStreamingJob) {
settingsBuilder.inStreamingMode();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(400);
tEnv = StreamTableEnvironment.create(env, settingsBuilder.build());
} else {
settingsBuilder.inBatchMode();
tEnv = TableEnvironment.create(settingsBuilder.build());
}
tEnv = createTableEnv(isStreamingJob);
}
}
return tEnv;