BaseDeltaTaskWriter.java
@@ -28,7 +28,9 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.RowDataWrapper;
import org.apache.iceberg.flink.data.RowDataProjection;
import org.apache.iceberg.io.BaseTaskWriter;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.FileIO;
@@ -41,6 +43,8 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
private final Schema schema;
private final Schema deleteSchema;
private final RowDataWrapper wrapper;
private final RowDataWrapper keyWrapper;
private final RowDataProjection keyProjection;
private final boolean upsert;

BaseDeltaTaskWriter(PartitionSpec spec,
@@ -58,6 +62,8 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds));
this.wrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
this.upsert = upsert;
this.keyWrapper = new RowDataWrapper(FlinkSchemaUtil.convert(deleteSchema), deleteSchema.asStruct());
this.keyProjection = RowDataProjection.create(schema, deleteSchema);
}

abstract RowDataDeltaWriter route(RowData row);
@@ -74,7 +80,7 @@ public void write(RowData row) throws IOException {
case INSERT:
case UPDATE_AFTER:
if (upsert) {
writer.delete(row);
writer.deleteKey(keyProjection.wrap(row));
}
writer.write(row);
break;
@@ -106,7 +112,7 @@ protected StructLike asStructLike(RowData data) {

@Override
protected StructLike asStructLikeKey(RowData data) {
throw new UnsupportedOperationException("Not implemented for Flink 1.13 during PR review");
return keyWrapper.wrap(data);
}
}
}
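
A minimal sketch (hypothetical class name, schema, and field ids; not part of this PR) of what the new keyProjection/keyWrapper pair does: RowDataProjection.create(schema, deleteSchema) narrows a full RowData down to the equality-key columns, and that projected view is what the writer hands to deleteKey().

// Minimal sketch, not from this PR: shows RowDataProjection narrowing a full row
// to the equality-key columns, as BaseDeltaTaskWriter now does before deleteKey().
// The table schema, field ids, and class name below are made up for illustration.
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.iceberg.Schema;
import org.apache.iceberg.flink.data.RowDataProjection;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;

public class KeyProjectionSketch {
  public static void main(String[] args) {
    // Full table schema: id and data form the key, payload does not.
    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.IntegerType.get()),
        Types.NestedField.required(2, "data", Types.StringType.get()),
        Types.NestedField.optional(3, "payload", Types.StringType.get()));

    // Same construction as the writer: select only the equality field ids.
    Schema deleteSchema = TypeUtil.select(schema, Sets.newHashSet(1, 2));
    RowDataProjection keyProjection = RowDataProjection.create(schema, deleteSchema);

    RowData row = GenericRowData.of(
        1, StringData.fromString("aaa"), StringData.fromString("ignored-payload"));

    // The projected view exposes only the key columns; this is what deleteKey() sees.
    RowData key = keyProjection.wrap(row);
    System.out.println(key.getInt(0) + ", " + key.getString(1)); // prints: 1, aaa
  }
}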
RowDataTaskWriterFactory.java
@@ -35,6 +35,8 @@
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.UnpartitionedWriter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.ArrayUtil;

public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
@@ -69,8 +71,13 @@ public RowDataTaskWriterFactory(Table table,

if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec);
} else if (upsert) {
// In upsert mode, only the new row is emitted with the INSERT row kind, so any non-key column of the
// inserted row may differ from the row it replaces, while the delete file must contain values that are
// correct for the deleted row. Therefore, write only the equality delete fields.
this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec,
ArrayUtil.toIntArray(equalityFieldIds), TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)), null);
} else {
// TODO provide the ability to customize the equality-delete row schema.
this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec,
ArrayUtil.toIntArray(equalityFieldIds), schema, null);
}
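
A short sketch (assumed table schema and equality field ids; not part of this PR) of the two equality-delete row schemas chosen in the branches above: the upsert branch writes key-only delete rows, the non-upsert branch keeps the full row schema.

// Minimal sketch, not from this PR: the equality-delete row schema picked in each branch.
// The table schema, field ids, and class name below are hypothetical.
import java.util.Set;
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;

public class DeleteRowSchemaSketch {
  public static void main(String[] args) {
    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.IntegerType.get()),
        Types.NestedField.optional(2, "data", Types.StringType.get()));
    Set<Integer> equalityFieldIds = Sets.newHashSet(1);

    // Upsert branch: delete rows carry only the key columns, because the non-key
    // columns of the replaced row are unknown when the delete is emitted.
    Schema upsertDeleteRowSchema = TypeUtil.select(schema, equalityFieldIds);
    System.out.println(upsertDeleteRowSchema.asStruct()); // e.g. struct<1: id: required int>

    // Non-upsert branch: delete rows carry the full row received from the changelog.
    Schema fullDeleteRowSchema = schema;
    System.out.println(fullDeleteRowSchema.asStruct());
  }
}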
TestFlinkUpsert.java
@@ -0,0 +1,287 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.flink;

import java.time.LocalDate;
import java.util.List;
import java.util.Map;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.types.Row;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class TestFlinkUpsert extends FlinkCatalogTestBase {

@ClassRule
public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
MiniClusterResource.createWithClassloaderCheckDisabled();

@ClassRule
public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

private final boolean isStreamingJob;
private final Map<String, String> tableUpsertProps = Maps.newHashMap();
private TableEnvironment tEnv;

public TestFlinkUpsert(String catalogName, Namespace baseNamespace, FileFormat format, Boolean isStreamingJob) {
super(catalogName, baseNamespace);
this.isStreamingJob = isStreamingJob;
tableUpsertProps.put(TableProperties.FORMAT_VERSION, "2");
tableUpsertProps.put(TableProperties.UPSERT_ENABLED, "true");
tableUpsertProps.put(TableProperties.DEFAULT_FILE_FORMAT, format.name());
}

@Parameterized.Parameters(name = "catalogName={0}, baseNamespace={1}, format={2}, isStreaming={3}")
public static Iterable<Object[]> parameters() {
List<Object[]> parameters = Lists.newArrayList();
for (FileFormat format : new FileFormat[] {FileFormat.PARQUET, FileFormat.AVRO, FileFormat.ORC}) {
for (Boolean isStreaming : new Boolean[] {true, false}) {
// Only test with one catalog as this is a file operation concern.
// FlinkCatalogTestBase requires the catalog name to start with 'testhadoop' when using the Hadoop catalog.
String catalogName = "testhadoop";
Namespace baseNamespace = Namespace.of("default");
parameters.add(new Object[] {catalogName, baseNamespace, format, isStreaming});
}
}
return parameters;
}

@Override
protected TableEnvironment getTableEnv() {
if (tEnv == null) {
synchronized (this) {
EnvironmentSettings.Builder settingsBuilder = EnvironmentSettings
.newInstance();
if (isStreamingJob) {
settingsBuilder.inStreamingMode();
StreamExecutionEnvironment env = StreamExecutionEnvironment
.getExecutionEnvironment(MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG);
env.enableCheckpointing(400);
env.setMaxParallelism(2);
env.setParallelism(2);
tEnv = StreamTableEnvironment.create(env, settingsBuilder.build());
} else {
settingsBuilder.inBatchMode();
tEnv = TableEnvironment.create(settingsBuilder.build());
}
}
}
return tEnv;
}

@Override
@Before
public void before() {
super.before();
sql("CREATE DATABASE IF NOT EXISTS %s", flinkDatabase);
sql("USE CATALOG %s", catalogName);
sql("USE %s", DATABASE);
}

@Override
@After
public void clean() {
sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
super.clean();
}

@Test
public void testUpsertAndQuery() {
String tableName = "test_upsert_query";
LocalDate dt20220301 = LocalDate.of(2022, 3, 1);
LocalDate dt20220302 = LocalDate.of(2022, 3, 2);

sql("CREATE TABLE %s(id INT NOT NULL, province STRING NOT NULL, dt DATE, PRIMARY KEY(id,province) NOT ENFORCED) " +
"PARTITIONED BY (province) WITH %s",
tableName, toWithClause(tableUpsertProps));

try {
sql("INSERT INTO %s VALUES " +
"(1, 'a', TO_DATE('2022-03-01'))," +
"(2, 'b', TO_DATE('2022-03-01'))," +
"(1, 'b', TO_DATE('2022-03-01'))",
tableName);

sql("INSERT INTO %s VALUES " +
"(4, 'a', TO_DATE('2022-03-02'))," +
"(5, 'b', TO_DATE('2022-03-02'))," +
"(1, 'b', TO_DATE('2022-03-02'))",
tableName);

List<Row> rowsOn20220301 = Lists.newArrayList(Row.of(2, "b", dt20220301), Row.of(1, "a", dt20220301));
TestHelpers.assertRows(
sql("SELECT * FROM %s WHERE dt < '2022-03-02'", tableName),
rowsOn20220301);

List<Row> rowsOn20220302 = Lists.newArrayList(
Row.of(1, "b", dt20220302), Row.of(4, "a", dt20220302), Row.of(5, "b", dt20220302));
TestHelpers.assertRows(
sql("SELECT * FROM %s WHERE dt = '2022-03-02'", tableName),
rowsOn20220302);

TestHelpers.assertRows(
sql("SELECT * FROM %s", tableName),
Lists.newArrayList(Iterables.concat(rowsOn20220301, rowsOn20220302)));
} finally {
sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
}
}

@Test
public void testPrimaryKeyEqualToPartitionKey() {
// This is an SQL-based reproduction of TestFlinkIcebergSinkV2#testUpsertOnDataKey
String tableName = "upsert_on_data_key";
try {
sql("CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL, PRIMARY KEY(data) NOT ENFORCED) " +
"PARTITIONED BY (data) WITH %s",
tableName, toWithClause(tableUpsertProps));

sql("INSERT INTO %s VALUES " +
"(1, 'aaa')," +
"(2, 'aaa')," +
"(3, 'bbb')",
tableName);

TestHelpers.assertRows(
sql("SELECT * FROM %s", tableName),
Lists.newArrayList(Row.of(2, "aaa"), Row.of(3, "bbb")));

sql("INSERT INTO %s VALUES " +
"(4, 'aaa')," +
"(5, 'bbb')",
tableName);

TestHelpers.assertRows(
sql("SELECT * FROM %s", tableName),
Lists.newArrayList(Row.of(4, "aaa"), Row.of(5, "bbb")));

sql("INSERT INTO %s VALUES " +
"(6, 'aaa')," +
"(7, 'bbb')",
tableName);

TestHelpers.assertRows(
sql("SELECT * FROM %s", tableName),
Lists.newArrayList(Row.of(6, "aaa"), Row.of(7, "bbb")));
} finally {
sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
}
}

@Test
public void testPrimaryKeyFieldsAtBeginningOfSchema() {
String tableName = "upsert_on_pk_at_schema_start";
LocalDate dt = LocalDate.of(2022, 3, 1);
try {
sql("CREATE TABLE %s(data STRING NOT NULL, dt DATE NOT NULL, id INT, PRIMARY KEY(data,dt) NOT ENFORCED) " +
"PARTITIONED BY (data) WITH %s",
tableName, toWithClause(tableUpsertProps));

sql("INSERT INTO %s VALUES " +
"('aaa', TO_DATE('2022-03-01'), 1)," +
"('aaa', TO_DATE('2022-03-01'), 2)," +
"('bbb', TO_DATE('2022-03-01'), 3)",
tableName);

TestHelpers.assertRows(
sql("SELECT * FROM %s", tableName),
Lists.newArrayList(Row.of("aaa", dt, 2), Row.of("bbb", dt, 3)));

sql("INSERT INTO %s VALUES " +
"('aaa', TO_DATE('2022-03-01'), 4)," +
"('bbb', TO_DATE('2022-03-01'), 5)",
tableName);

TestHelpers.assertRows(
sql("SELECT * FROM %s", tableName),
Lists.newArrayList(Row.of("aaa", dt, 4), Row.of("bbb", dt, 5)));

sql("INSERT INTO %s VALUES " +
"('aaa', TO_DATE('2022-03-01'), 6)," +
"('bbb', TO_DATE('2022-03-01'), 7)",
tableName);

TestHelpers.assertRows(
sql("SELECT * FROM %s", tableName),
Lists.newArrayList(Row.of("aaa", dt, 6), Row.of("bbb", dt, 7)));
} finally {
sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
}
}

@Test
public void testPrimaryKeyFieldsAtEndOfTableSchema() {
// This is the same test case as testPrimaryKeyFieldsAtBeginningOfSchema, but the primary key fields
// are located at the end of the Flink schema.
String tableName = "upsert_on_pk_at_schema_end";
LocalDate dt = LocalDate.of(2022, 3, 1);
try {
sql("CREATE TABLE %s(id INT, data STRING NOT NULL, dt DATE NOT NULL, PRIMARY KEY(data,dt) NOT ENFORCED) " +
"PARTITIONED BY (data) WITH %s",
tableName, toWithClause(tableUpsertProps));

sql("INSERT INTO %s VALUES " +
"(1, 'aaa', TO_DATE('2022-03-01'))," +
"(2, 'aaa', TO_DATE('2022-03-01'))," +
"(3, 'bbb', TO_DATE('2022-03-01'))",
tableName);

TestHelpers.assertRows(
sql("SELECT * FROM %s", tableName),
Lists.newArrayList(Row.of(2, "aaa", dt), Row.of(3, "bbb", dt)));

sql("INSERT INTO %s VALUES " +
"(4, 'aaa', TO_DATE('2022-03-01'))," +
"(5, 'bbb', TO_DATE('2022-03-01'))",
tableName);

TestHelpers.assertRows(
sql("SELECT * FROM %s", tableName),
Lists.newArrayList(Row.of(4, "aaa", dt), Row.of(5, "bbb", dt)));

sql("INSERT INTO %s VALUES " +
"(6, 'aaa', TO_DATE('2022-03-01'))," +
"(7, 'bbb', TO_DATE('2022-03-01'))",
tableName);

TestHelpers.assertRows(
sql("SELECT * FROM %s", tableName),
Lists.newArrayList(Row.of(6, "aaa", dt), Row.of(7, "bbb", dt)));
} finally {
sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
}
}
}