diff --git a/build.gradle b/build.gradle
index 447f7f1e3f63..1a2bc82fabc0 100644
--- a/build.gradle
+++ b/build.gradle
@@ -140,6 +140,8 @@ subprojects {
       }
     })
 
+    maxHeapSize = "1500m"
+
     testLogging {
       events "failed"
       exceptionFormat "full"
diff --git a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
index 46f997e4e7e1..0d927fb4ac18 100644
--- a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
@@ -115,6 +115,11 @@ protected BaseEqualityDeltaWriter(StructLike partition, Schema schema, Schema de
      */
     protected abstract StructLike asStructLike(T data);
 
+    /**
+     * Wrap the passed in key of a row as a {@link StructLike}
+     */
+    protected abstract StructLike asStructLikeKey(T key);
+
     public void write(T row) throws IOException {
       PathOffset pathOffset = PathOffset.of(dataWriter.currentPath(), dataWriter.currentRows());
 
@@ -167,7 +172,7 @@ public void delete(T row) throws IOException {
      * @param key is the projected data whose columns are the same as the equality fields.
      */
     public void deleteKey(T key) throws IOException {
-      if (!internalPosDelete(asStructLike(key))) {
+      if (!internalPosDelete(asStructLikeKey(key))) {
         eqDeleteWriter.write(key);
       }
     }
diff --git a/data/src/test/java/org/apache/iceberg/io/TestTaskEqualityDeltaWriter.java b/data/src/test/java/org/apache/iceberg/io/TestTaskEqualityDeltaWriter.java
index f373b3a9d3d2..82c194436b85 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestTaskEqualityDeltaWriter.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestTaskEqualityDeltaWriter.java
@@ -461,6 +461,7 @@ public void delete(Record row) throws IOException {
       deltaWriter.delete(row);
     }
 
+    // The caller of this function is responsible for passing in a record with only the key fields
    public void deleteKey(Record key) throws IOException {
       deltaWriter.deleteKey(key);
     }
@@ -479,6 +480,11 @@ private GenericEqualityDeltaWriter(PartitionKey partition, Schema schema, Schema
     protected StructLike asStructLike(Record row) {
       return row;
     }
+
+    @Override
+    protected StructLike asStructLikeKey(Record data) {
+      return data;
+    }
   }
 }
diff --git a/flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java b/flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
index 8415129db9a7..9b5d01c207f7 100644
--- a/flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
+++ b/flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
@@ -103,5 +103,10 @@ protected class RowDataDeltaWriter extends BaseEqualityDeltaWriter {
     protected StructLike asStructLike(RowData data) {
       return wrapper.wrap(data);
     }
+
+    @Override
+    protected StructLike asStructLikeKey(RowData data) {
+      throw new UnsupportedOperationException("Not implemented for Flink 1.12 during PR review");
+    }
   }
 }
diff --git a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
index 8415129db9a7..fad94863a6a2 100644
--- a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
+++ b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
@@ -103,5 +103,10 @@ protected class RowDataDeltaWriter extends BaseEqualityDeltaWriter {
     protected StructLike asStructLike(RowData data) {
       return wrapper.wrap(data);
     }
+
+    @Override
+    protected StructLike asStructLikeKey(RowData data) {
+      throw new UnsupportedOperationException("Not implemented for Flink 1.13 during PR review");
+    }
   }
 }
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
index 8415129db9a7..16262b22e99c 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
@@ -28,7 +28,9 @@
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.RowDataWrapper;
+import org.apache.iceberg.flink.data.RowDataProjection;
 import org.apache.iceberg.io.BaseTaskWriter;
 import org.apache.iceberg.io.FileAppenderFactory;
 import org.apache.iceberg.io.FileIO;
@@ -41,6 +43,8 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
   private final Schema schema;
   private final Schema deleteSchema;
   private final RowDataWrapper wrapper;
+  private final RowDataWrapper keyWrapper;
+  private final RowDataProjection keyProjection;
   private final boolean upsert;
 
   BaseDeltaTaskWriter(PartitionSpec spec,
@@ -57,6 +61,8 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
     this.schema = schema;
     this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds));
     this.wrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
+    this.keyWrapper = new RowDataWrapper(FlinkSchemaUtil.convert(deleteSchema), deleteSchema.asStruct());
+    this.keyProjection = RowDataProjection.create(schema, deleteSchema);
     this.upsert = upsert;
   }
 
@@ -74,7 +80,7 @@ public void write(RowData row) throws IOException {
       case INSERT:
       case UPDATE_AFTER:
         if (upsert) {
-          writer.delete(row);
+          writer.deleteKey(keyProjection.wrap(row));
         }
         writer.write(row);
         break;
@@ -103,5 +109,10 @@ protected class RowDataDeltaWriter extends BaseEqualityDeltaWriter {
     protected StructLike asStructLike(RowData data) {
       return wrapper.wrap(data);
     }
+
+    @Override
+    protected StructLike asStructLikeKey(RowData data) {
+      return keyWrapper.wrap(data);
+    }
   }
 }
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
index 2849100858a1..5144305029f4 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
@@ -35,6 +35,8 @@
 import org.apache.iceberg.io.TaskWriter;
 import org.apache.iceberg.io.UnpartitionedWriter;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.ArrayUtil;
 
 public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
@@ -69,8 +71,13 @@ public RowDataTaskWriterFactory(Table table,
     if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
       this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec);
+    } else if (upsert) {
+      // In upsert mode, only the new row is emitted using INSERT row kind. Therefore, any column of the inserted row
+      // may differ from the deleted row other than the primary key fields, and the delete file must contain values
+      // that are correct for the deleted row. Therefore, only write the equality delete fields.
+      this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec,
+          ArrayUtil.toIntArray(equalityFieldIds), TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)), null);
     } else {
-      // TODO provide the ability to customize the equality-delete row schema.
       this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec,
           ArrayUtil.toIntArray(equalityFieldIds), schema, null);
     }
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
new file mode 100644
index 000000000000..6ec35e2ff169
--- /dev/null
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.time.LocalDate;
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestFlinkUpsert extends FlinkCatalogTestBase {
+
+  @ClassRule
+  public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
+      MiniClusterResource.createWithClassloaderCheckDisabled();
+
+  @ClassRule
+  public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  private final boolean isStreamingJob;
+  private final Map<String, String> tableUpsertProps = Maps.newHashMap();
+  private TableEnvironment tEnv;
+
+  public TestFlinkUpsert(String catalogName, Namespace baseNamespace, FileFormat format, Boolean isStreamingJob) {
+    super(catalogName, baseNamespace);
+    this.isStreamingJob = isStreamingJob;
+    tableUpsertProps.put(TableProperties.FORMAT_VERSION, "2");
+    tableUpsertProps.put(TableProperties.UPSERT_ENABLED, "true");
+    tableUpsertProps.put(TableProperties.DEFAULT_FILE_FORMAT, format.name());
+  }
+
+  @Parameterized.Parameters(name = "catalogName={0}, baseNamespace={1}, format={2}, isStreaming={3}")
+  public static Iterable<Object[]> parameters() {
+    List<Object[]> parameters = Lists.newArrayList();
+    for (FileFormat format : new FileFormat[] {FileFormat.PARQUET, FileFormat.AVRO, FileFormat.ORC}) {
+      for (Boolean isStreaming : new Boolean[] {true, false}) {
+        // Only test with one catalog as this is a file operation concern.
+        // FlinkCatalogTestBase requires the catalog name start with testhadoop if using hadoop catalog.
+        String catalogName = "testhadoop";
+        Namespace baseNamespace = Namespace.of("default");
+        parameters.add(new Object[] {catalogName, baseNamespace, format, isStreaming});
+      }
+    }
+    return parameters;
+  }
+
+  @Override
+  protected TableEnvironment getTableEnv() {
+    if (tEnv == null) {
+      synchronized (this) {
+        EnvironmentSettings.Builder settingsBuilder = EnvironmentSettings
+            .newInstance();
+        if (isStreamingJob) {
+          settingsBuilder.inStreamingMode();
+          StreamExecutionEnvironment env = StreamExecutionEnvironment
+              .getExecutionEnvironment(MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG);
+          env.enableCheckpointing(400);
+          env.setMaxParallelism(2);
+          env.setParallelism(2);
+          tEnv = StreamTableEnvironment.create(env, settingsBuilder.build());
+        } else {
+          settingsBuilder.inBatchMode();
+          tEnv = TableEnvironment.create(settingsBuilder.build());
+        }
+      }
+    }
+    return tEnv;
+  }
+
+  @Override
+  @Before
+  public void before() {
+    super.before();
+    sql("CREATE DATABASE IF NOT EXISTS %s", flinkDatabase);
+    sql("USE CATALOG %s", catalogName);
+    sql("USE %s", DATABASE);
+  }
+
+  @Override
+  @After
+  public void clean() {
+    sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
+    super.clean();
+  }
+
+  @Test
+  public void testUpsertAndQuery() {
+    String tableName = "test_upsert_query";
+    LocalDate dt20220301 = LocalDate.of(2022, 3, 1);
+    LocalDate dt20220302 = LocalDate.of(2022, 3, 2);
+
+    sql("CREATE TABLE %s(id INT NOT NULL, province STRING NOT NULL, dt DATE, PRIMARY KEY(id,province) NOT ENFORCED) " +
+            "PARTITIONED BY (province) WITH %s",
+        tableName, toWithClause(tableUpsertProps));
+
+    try {
+      sql("INSERT INTO %s VALUES " +
+          "(1, 'a', TO_DATE('2022-03-01'))," +
+          "(2, 'b', TO_DATE('2022-03-01'))," +
+          "(1, 'b', TO_DATE('2022-03-01'))",
+          tableName);
+
+      sql("INSERT INTO %s VALUES " +
+          "(4, 'a', TO_DATE('2022-03-02'))," +
+          "(5, 'b', TO_DATE('2022-03-02'))," +
+          "(1, 'b', TO_DATE('2022-03-02'))",
+          tableName);
+
+      List<Row> rowsOn20220301 = Lists.newArrayList(Row.of(2, "b", dt20220301), Row.of(1, "a", dt20220301));
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s WHERE dt < '2022-03-02'", tableName),
+          rowsOn20220301);
+
+      List<Row> rowsOn20220302 = Lists.newArrayList(
+          Row.of(1, "b", dt20220302), Row.of(4, "a", dt20220302), Row.of(5, "b", dt20220302));
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s WHERE dt = '2022-03-02'", tableName),
+          rowsOn20220302);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(Iterables.concat(rowsOn20220301, rowsOn20220302)));
+    } finally {
+      sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+    }
+  }
+
+  @Test
+  public void testPrimaryKeyEqualToPartitionKey() {
+    // This is an SQL based reproduction of TestFlinkIcebergSinkV2#testUpsertOnDataKey
+    String tableName = "upsert_on_data_key";
+    try {
+      sql("CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL, PRIMARY KEY(data) NOT ENFORCED) " +
+              "PARTITIONED BY (data) WITH %s",
+          tableName, toWithClause(tableUpsertProps));
+
+      sql("INSERT INTO %s VALUES " +
+          "(1, 'aaa')," +
+          "(2, 'aaa')," +
+          "(3, 'bbb')",
+          tableName);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(Row.of(2, "aaa"), Row.of(3, "bbb")));
+
+      sql("INSERT INTO %s VALUES " +
+          "(4, 'aaa')," +
+          "(5, 'bbb')",
+          tableName);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(Row.of(4, "aaa"), Row.of(5, "bbb")));
+
+      sql("INSERT INTO %s VALUES " +
+          "(6, 'aaa')," +
+          "(7, 'bbb')",
+          tableName);
+
+      TestHelpers.assertRows(
%s", tableName), + Lists.newArrayList(Row.of(6, "aaa"), Row.of(7, "bbb"))); + } finally { + sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName); + } + } + + @Test + public void testPrimaryKeyFieldsAtBeginningOfSchema() { + String tableName = "upsert_on_pk_at_schema_start"; + LocalDate dt = LocalDate.of(2022, 3, 1); + try { + sql("CREATE TABLE %s(data STRING NOT NULL, dt DATE NOT NULL, id INT, PRIMARY KEY(data,dt) NOT ENFORCED) " + + "PARTITIONED BY (data) WITH %s", + tableName, toWithClause(tableUpsertProps)); + + sql("INSERT INTO %s VALUES " + + "('aaa', TO_DATE('2022-03-01'), 1)," + + "('aaa', TO_DATE('2022-03-01'), 2)," + + "('bbb', TO_DATE('2022-03-01'), 3)", + tableName); + + TestHelpers.assertRows( + sql("SELECT * FROM %s", tableName), + Lists.newArrayList(Row.of("aaa", dt, 2), Row.of("bbb", dt, 3))); + + sql("INSERT INTO %s VALUES " + + "('aaa', TO_DATE('2022-03-01'), 4)," + + "('bbb', TO_DATE('2022-03-01'), 5)", + tableName); + + TestHelpers.assertRows( + sql("SELECT * FROM %s", tableName), + Lists.newArrayList(Row.of("aaa", dt, 4), Row.of("bbb", dt, 5))); + + sql("INSERT INTO %s VALUES " + + "('aaa', TO_DATE('2022-03-01'), 6)," + + "('bbb', TO_DATE('2022-03-01'), 7)", + tableName); + + TestHelpers.assertRows( + sql("SELECT * FROM %s", tableName), + Lists.newArrayList(Row.of("aaa", dt, 6), Row.of("bbb", dt, 7))); + } finally { + sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName); + } + } + + @Test + public void testPrimaryKeyFieldsAtEndOfTableSchema() { + // This is the same test case as testPrimaryKeyFieldsAtBeginningOfSchema, but the primary key fields + // are located at the end of the flink schema. + String tableName = "upsert_on_pk_at_schema_end"; + LocalDate dt = LocalDate.of(2022, 3, 1); + try { + sql("CREATE TABLE %s(id INT, data STRING NOT NULL, dt DATE NOT NULL, PRIMARY KEY(data,dt) NOT ENFORCED) " + + "PARTITIONED BY (data) WITH %s", + tableName, toWithClause(tableUpsertProps)); + + sql("INSERT INTO %s VALUES " + + "(1, 'aaa', TO_DATE('2022-03-01'))," + + "(2, 'aaa', TO_DATE('2022-03-01'))," + + "(3, 'bbb', TO_DATE('2022-03-01'))", + tableName); + + TestHelpers.assertRows( + sql("SELECT * FROM %s", tableName), + Lists.newArrayList(Row.of(2, "aaa", dt), Row.of(3, "bbb", dt))); + + sql("INSERT INTO %s VALUES " + + "(4, 'aaa', TO_DATE('2022-03-01'))," + + "(5, 'bbb', TO_DATE('2022-03-01'))", + tableName); + + TestHelpers.assertRows( + sql("SELECT * FROM %s", tableName), + Lists.newArrayList(Row.of(4, "aaa", dt), Row.of(5, "bbb", dt))); + + sql("INSERT INTO %s VALUES " + + "(6, 'aaa', TO_DATE('2022-03-01'))," + + "(7, 'bbb', TO_DATE('2022-03-01'))", + tableName); + + TestHelpers.assertRows( + sql("SELECT * FROM %s", tableName), + Lists.newArrayList(Row.of(6, "aaa", dt), Row.of(7, "bbb", dt))); + } finally { + sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName); + } + } +}