diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/RowDataUtil.java b/flink/src/main/java/org/apache/iceberg/flink/data/RowDataUtil.java
index fc0461ba6f84..931880fc360c 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/data/RowDataUtil.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/data/RowDataUtil.java
@@ -23,9 +23,14 @@
 import java.nio.ByteBuffer;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.util.Utf8;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.ByteBuffers;
@@ -69,4 +74,30 @@ public static Object convertConstant(Type type, Object value) {
     }
     return value;
   }
+
+  /**
+   * Similar to the private {@link RowDataSerializer#copyRowData(RowData, RowData)} method.
+   * This skips the arity check between {@code rowType} and {@code from},
+   * because the {@code from} RowData may contain an additional column for position deletes.
+   * Using {@link RowDataSerializer#copy(RowData, RowData)} will fail the arity check.
+   */
+  public static RowData clone(RowData from, RowData reuse, RowType rowType, TypeSerializer[] fieldSerializers) {
+    GenericRowData ret;
+    if (reuse instanceof GenericRowData) {
+      ret = (GenericRowData) reuse;
+    } else {
+      ret = new GenericRowData(from.getArity());
+    }
+    ret.setRowKind(from.getRowKind());
+    for (int i = 0; i < rowType.getFieldCount(); i++) {
+      if (!from.isNullAt(i)) {
+        RowData.FieldGetter getter = RowData.createFieldGetter(rowType.getTypeAt(i), i);
+        ret.setField(i, fieldSerializers[i].copy(getter.getFieldOrNull(from)));
+      } else {
+        ret.setField(i, null);
+      }
+    }
+    return ret;
+  }
+
 }
diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestFixtures.java b/flink/src/test/java/org/apache/iceberg/flink/TestFixtures.java
new file mode 100644
index 000000000000..71532c59402c
--- /dev/null
+++ b/flink/src/test/java/org/apache/iceberg/flink/TestFixtures.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ + +package org.apache.iceberg.flink; + +import org.apache.flink.table.types.logical.RowType; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.types.Types; + +import static org.apache.iceberg.types.Types.NestedField.required; + +public class TestFixtures { + + private TestFixtures() { + + } + + public static final Schema SCHEMA = new Schema( + required(1, "data", Types.StringType.get()), + required(2, "id", Types.LongType.get()), + required(3, "dt", Types.StringType.get())); + + public static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA) + .identity("dt") + .bucket("id", 1) + .build(); + + public static final RowType ROW_TYPE = FlinkSchemaUtil.convert(SCHEMA); + + public static final String DATABASE = "default"; + public static final String TABLE = "t"; + + public static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of(DATABASE, TABLE); +} diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java b/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java index 62865f5a4990..ca90fd018196 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java +++ b/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java @@ -27,6 +27,7 @@ import java.time.LocalTime; import java.time.OffsetDateTime; import java.util.Collection; +import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.UUID; @@ -34,7 +35,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.table.data.ArrayData; import org.apache.flink.table.data.DecimalData; -import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.MapData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.TimestampData; @@ -47,7 +47,9 @@ import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.utils.TypeConversions; import org.apache.flink.types.Row; +import org.apache.iceberg.Schema; import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.data.RowDataUtil; import org.apache.iceberg.flink.source.FlinkInputFormat; import org.apache.iceberg.flink.source.FlinkInputSplit; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -64,20 +66,7 @@ public static RowData copyRowData(RowData from, RowType rowType) { TypeSerializer[] fieldSerializers = rowType.getChildren().stream() .map((LogicalType type) -> InternalSerializers.create(type)) .toArray(TypeSerializer[]::new); - - // Use rowType field count to avoid copy metadata column in case of merging position deletes - GenericRowData ret = new GenericRowData(rowType.getFieldCount()); - ret.setRowKind(from.getRowKind()); - for (int i = 0; i < rowType.getFieldCount(); i++) { - if (!from.isNullAt(i)) { - RowData.FieldGetter getter = RowData.createFieldGetter(rowType.getTypeAt(i), i); - ret.setField(i, fieldSerializers[i].copy(getter.getFieldOrNull(from))); - } else { - ret.setField(i, null); - } - } - - return ret; + return RowDataUtil.clone(from, null, rowType, fieldSerializers); } public static List readRowData(FlinkInputFormat inputFormat, RowType rowType) throws IOException { @@ -97,15 +86,33 @@ public static List readRowData(FlinkInputFormat inputFormat, RowType ro } public static List readRows(FlinkInputFormat inputFormat, RowType rowType) throws IOException { + return convertRowDataToRow(readRowData(inputFormat, rowType), rowType); + } + + public static List 
convertRowDataToRow(List rowDataList, RowType rowType) { DataStructureConverter converter = DataStructureConverters.getConverter( TypeConversions.fromLogicalToDataType(rowType)); - - return readRowData(inputFormat, rowType).stream() + return rowDataList.stream() .map(converter::toExternal) .map(Row.class::cast) .collect(Collectors.toList()); } + public static void assertRecords(List results, List expectedRecords, Schema schema) { + List expected = Lists.newArrayList(); + @SuppressWarnings("unchecked") + DataStructureConverter converter = (DataStructureConverter) DataStructureConverters.getConverter( + TypeConversions.fromLogicalToDataType(FlinkSchemaUtil.convert(schema))); + expectedRecords.forEach(r -> expected.add(converter.toExternal(RowDataConverter.convert(schema, r)))); + assertRows(results, expected); + } + + public static void assertRows(List results, List expected) { + expected.sort(Comparator.comparing(Row::toString)); + results.sort(Comparator.comparing(Row::toString)); + Assert.assertEquals(expected, results); + } + public static void assertRowData(Types.StructType structType, LogicalType rowType, Record expectedRecord, RowData actualRowData) { if (expectedRecord == null && actualRowData == null) { diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java index adcaac781181..eae3233a6546 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java +++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java @@ -33,7 +33,6 @@ import org.apache.iceberg.data.RandomGenericData; import org.apache.iceberg.data.Record; import org.apache.iceberg.flink.FlinkSchemaUtil; -import org.apache.iceberg.flink.TableLoader; import org.apache.iceberg.flink.TestHelpers; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Types; @@ -44,7 +43,7 @@ /** * Test {@link FlinkInputFormat}. */ -public class TestFlinkInputFormat extends TestFlinkScan { +public class TestFlinkInputFormat extends TestFlinkSource { public TestFlinkInputFormat(String fileFormat) { super(fileFormat); @@ -55,15 +54,11 @@ public void before() throws IOException { super.before(); } - private TableLoader loader() { - return TableLoader.fromHadoopTable(warehouse + "/default/t"); - } - @Override protected List run( FlinkSource.Builder formatBuilder, Map sqlOptions, String sqlFilter, String... 
sqlSelectedFields) - throws IOException { - return runFormat(formatBuilder.tableLoader(loader()).buildFormat()); + throws Exception { + return runFormat(formatBuilder.tableLoader(tableLoader()).buildFormat()); } @Test @@ -89,7 +84,10 @@ public void testNestedProjection() throws Exception { TableSchema projectedSchema = TableSchema.builder() .field("nested", DataTypes.ROW(DataTypes.FIELD("f2", DataTypes.STRING()))) .field("data", DataTypes.STRING()).build(); - List result = runFormat(FlinkSource.forRowData().tableLoader(loader()).project(projectedSchema).buildFormat()); + List result = runFormat(FlinkSource.forRowData() + .tableLoader(tableLoader()) + .project(projectedSchema) + .buildFormat()); List expected = Lists.newArrayList(); for (Record record : writeRecords) { @@ -97,7 +95,7 @@ public void testNestedProjection() throws Exception { expected.add(Row.of(nested, record.get(0))); } - assertRows(result, expected); + TestHelpers.assertRows(result, expected); } @Test @@ -117,14 +115,15 @@ public void testBasicProjection() throws IOException { .field("id", DataTypes.BIGINT()) .field("data", DataTypes.STRING()) .build(); - List result = runFormat(FlinkSource.forRowData().tableLoader(loader()).project(projectedSchema).buildFormat()); + List result = runFormat(FlinkSource.forRowData() + .tableLoader(tableLoader()).project(projectedSchema).buildFormat()); List expected = Lists.newArrayList(); for (Record record : writeRecords) { expected.add(Row.of(record.get(0), record.get(1))); } - assertRows(result, expected); + TestHelpers.assertRows(result, expected); } private List runFormat(FlinkInputFormat inputFormat) throws IOException { diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormatReaderDeletes.java b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormatReaderDeletes.java index 50b16167473c..2a593c4702b4 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormatReaderDeletes.java +++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormatReaderDeletes.java @@ -23,87 +23,27 @@ import java.util.Map; import org.apache.flink.table.types.logical.RowType; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.iceberg.BaseTable; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.FileFormat; -import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; -import org.apache.iceberg.TableMetadata; -import org.apache.iceberg.TableOperations; -import org.apache.iceberg.TableProperties; import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.data.DeleteReadTests; import org.apache.iceberg.flink.CatalogLoader; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.RowDataWrapper; import org.apache.iceberg.flink.TableLoader; import org.apache.iceberg.flink.TestHelpers; -import org.apache.iceberg.hive.HiveCatalog; -import org.apache.iceberg.hive.TestHiveMetastore; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.StructLikeSet; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -@RunWith(Parameterized.class) -public class TestFlinkInputFormatReaderDeletes extends DeleteReadTests { - private static HiveConf hiveConf = null; - private static HiveCatalog catalog = null; - private static TestHiveMetastore metastore = null; - - private final FileFormat format; - - 
@Parameterized.Parameters(name = "fileFormat={0}") - public static Object[][] parameters() { - return new Object[][] { - new Object[] { FileFormat.PARQUET }, - new Object[] { FileFormat.AVRO }, - new Object[] { FileFormat.ORC } - }; - } +public class TestFlinkInputFormatReaderDeletes extends TestFlinkReaderDeletesBase { public TestFlinkInputFormatReaderDeletes(FileFormat inputFormat) { - this.format = inputFormat; - } - - @BeforeClass - public static void startMetastore() { - TestFlinkInputFormatReaderDeletes.metastore = new TestHiveMetastore(); - metastore.start(); - TestFlinkInputFormatReaderDeletes.hiveConf = metastore.hiveConf(); - TestFlinkInputFormatReaderDeletes.catalog = new HiveCatalog(hiveConf); - } - - @AfterClass - public static void stopMetastore() { - metastore.stop(); - catalog.close(); - TestFlinkInputFormatReaderDeletes.catalog = null; + super(inputFormat); } @Override - protected Table createTable(String name, Schema schema, PartitionSpec spec) { - Map props = Maps.newHashMap(); - props.put(TableProperties.DEFAULT_FILE_FORMAT, format.name()); - - Table table = catalog.createTable(TableIdentifier.of("default", name), schema, spec, props); - TableOperations ops = ((BaseTable) table).operations(); - TableMetadata meta = ops.current(); - ops.commit(meta, meta.upgradeToFormatVersion(2)); - - return table; - } - - @Override - protected void dropTable(String name) { - catalog.dropTable(TableIdentifier.of("default", name)); - } - - @Override - protected StructLikeSet rowSet(String name, Table testTable, String... columns) throws IOException { + protected StructLikeSet rowSet(String tableName, Table testTable, String... columns) throws IOException { Schema projected = testTable.schema().select(columns); RowType rowType = FlinkSchemaUtil.convert(projected); Map properties = Maps.newHashMap(); @@ -113,7 +53,7 @@ protected StructLikeSet rowSet(String name, Table testTable, String... columns) Integer.toString(hiveConf.getInt("iceberg.hive.client-pool-size", 5))); CatalogLoader hiveCatalogLoader = CatalogLoader.hive(catalog.name(), hiveConf, properties); FlinkInputFormat inputFormat = FlinkSource.forRowData() - .tableLoader(TableLoader.fromCatalog(hiveCatalogLoader, TableIdentifier.of("default", name))) + .tableLoader(TableLoader.fromCatalog(hiveCatalogLoader, TableIdentifier.of("default", tableName))) .project(FlinkSchemaUtil.toSchema(rowType)).buildFormat(); StructLikeSet set = StructLikeSet.create(projected.asStruct()); @@ -125,8 +65,4 @@ protected StructLikeSet rowSet(String name, Table testTable, String... columns) return set; } - @Override - protected boolean expectPruned() { - return false; - } } diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkReaderDeletesBase.java b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkReaderDeletesBase.java new file mode 100644 index 000000000000..920e5f3df984 --- /dev/null +++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkReaderDeletesBase.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.source; + +import java.util.Map; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.DeleteReadTests; +import org.apache.iceberg.hive.HiveCatalog; +import org.apache.iceberg.hive.TestHiveMetastore; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public abstract class TestFlinkReaderDeletesBase extends DeleteReadTests { + + @ClassRule + public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder(); + + protected static String databaseName = "default"; + + protected static HiveConf hiveConf = null; + protected static HiveCatalog catalog = null; + private static TestHiveMetastore metastore = null; + + protected final FileFormat format; + + @Parameterized.Parameters(name = "fileFormat={0}") + public static Object[][] parameters() { + return new Object[][] { + new Object[] { FileFormat.PARQUET }, + new Object[] { FileFormat.AVRO }, + new Object[] { FileFormat.ORC } + }; + } + + TestFlinkReaderDeletesBase(FileFormat fileFormat) { + this.format = fileFormat; + } + + @BeforeClass + public static void startMetastore() { + metastore = new TestHiveMetastore(); + metastore.start(); + hiveConf = metastore.hiveConf(); + catalog = new HiveCatalog(hiveConf); + } + + @AfterClass + public static void stopMetastore() { + metastore.stop(); + catalog.close(); + catalog = null; + } + + @Override + protected Table createTable(String name, Schema schema, PartitionSpec spec) { + Map props = Maps.newHashMap(); + props.put(TableProperties.DEFAULT_FILE_FORMAT, format.name()); + + Table table = catalog.createTable(TableIdentifier.of(databaseName, name), schema, spec, props); + TableOperations ops = ((BaseTable) table).operations(); + TableMetadata meta = ops.current(); + ops.commit(meta, meta.upgradeToFormatVersion(2)); + + return table; + } + + @Override + protected void dropTable(String name) { + catalog.dropTable(TableIdentifier.of(databaseName, name)); + } + + @Override + protected boolean expectPruned() { + return false; + } +} diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java index fcc71d53ddd2..b9c7d2c20ba4 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java +++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java @@ -26,17 +26,9 @@ import java.time.LocalTime; import java.util.Arrays; import java.util.Collections; -import java.util.Comparator; 
import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.Optional; -import org.apache.flink.table.api.TableColumn; -import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.data.RowData; -import org.apache.flink.table.data.conversion.DataStructureConverter; -import org.apache.flink.table.data.conversion.DataStructureConverters; -import org.apache.flink.table.types.utils.TypeConversions; import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.types.Row; import org.apache.hadoop.conf.Configuration; @@ -46,23 +38,22 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; -import org.apache.iceberg.TestHelpers; -import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.data.GenericAppenderHelper; import org.apache.iceberg.data.RandomGenericData; import org.apache.iceberg.data.Record; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; -import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.MiniClusterResource; -import org.apache.iceberg.flink.RowDataConverter; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.flink.TestHelpers; import org.apache.iceberg.hadoop.HadoopCatalog; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.DateTimeUtil; +import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.ClassRule; @@ -71,8 +62,6 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import static org.apache.iceberg.types.Types.NestedField.required; - @RunWith(Parameterized.class) public abstract class TestFlinkScan { @@ -83,18 +72,9 @@ public abstract class TestFlinkScan { @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); - protected static final Schema SCHEMA = new Schema( - required(1, "data", Types.StringType.get()), - required(2, "id", Types.LongType.get()), - required(3, "dt", Types.StringType.get())); - - protected static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA) - .identity("dt") - .bucket("id", 1) - .build(); - protected HadoopCatalog catalog; protected String warehouse; + protected String location; // parametrized variables protected final FileFormat fileFormat; @@ -113,68 +93,47 @@ public void before() throws IOException { File warehouseFile = TEMPORARY_FOLDER.newFolder(); Assert.assertTrue(warehouseFile.delete()); // before variables - Configuration conf = new Configuration(); warehouse = "file:" + warehouseFile; + Configuration conf = new Configuration(); catalog = new HadoopCatalog(conf, warehouse); + location = String.format("%s/%s/%s", warehouse, TestFixtures.DATABASE, TestFixtures.TABLE); } - private List runWithProjection(String... 
projected) throws IOException { - TableSchema.Builder builder = TableSchema.builder(); - TableSchema schema = FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert( - catalog.loadTable(TableIdentifier.of("default", "t")).schema())); - for (String field : projected) { - TableColumn column = schema.getTableColumn(field).get(); - builder.field(column.getName(), column.getType()); - } - return run(FlinkSource.forRowData().project(builder.build()), Maps.newHashMap(), "", projected); - } - - protected List runWithFilter(Expression filter, String sqlFilter) throws IOException { - FlinkSource.Builder builder = FlinkSource.forRowData().filters(Collections.singletonList(filter)); - return run(builder, Maps.newHashMap(), sqlFilter, "*"); - } - - private List runWithOptions(Map options) throws IOException { - FlinkSource.Builder builder = FlinkSource.forRowData(); - Optional.ofNullable(options.get("snapshot-id")).ifPresent(value -> builder.snapshotId(Long.parseLong(value))); - Optional.ofNullable(options.get("start-snapshot-id")) - .ifPresent(value -> builder.startSnapshotId(Long.parseLong(value))); - Optional.ofNullable(options.get("end-snapshot-id")) - .ifPresent(value -> builder.endSnapshotId(Long.parseLong(value))); - Optional.ofNullable(options.get("as-of-timestamp")) - .ifPresent(value -> builder.asOfTimestamp(Long.parseLong(value))); - return run(builder, options, "", "*"); + @After + public void after() throws IOException { } - private List run() throws IOException { - return run(FlinkSource.forRowData(), Maps.newHashMap(), "", "*"); + protected TableLoader tableLoader() { + return TableLoader.fromHadoopTable(location); } - protected abstract List run(FlinkSource.Builder formatBuilder, Map sqlOptions, String sqlFilter, - String... sqlSelectedFields) throws IOException; + protected abstract List runWithProjection(String... 
projected) throws Exception; + protected abstract List runWithFilter(Expression filter, String sqlFilter) throws Exception; + protected abstract List runWithOptions(Map options) throws Exception; + protected abstract List run() throws Exception; @Test public void testUnpartitionedTable() throws Exception { - Table table = catalog.createTable(TableIdentifier.of("default", "t"), SCHEMA); - List expectedRecords = RandomGenericData.generate(SCHEMA, 2, 0L); + Table table = catalog.createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA); + List expectedRecords = RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L); new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER).appendToTable(expectedRecords); - assertRecords(run(), expectedRecords, SCHEMA); + TestHelpers.assertRecords(run(), expectedRecords, TestFixtures.SCHEMA); } @Test public void testPartitionedTable() throws Exception { - Table table = catalog.createTable(TableIdentifier.of("default", "t"), SCHEMA, SPEC); - List expectedRecords = RandomGenericData.generate(SCHEMA, 1, 0L); + Table table = catalog.createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA, TestFixtures.SPEC); + List expectedRecords = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L); expectedRecords.get(0).set(2, "2020-03-20"); new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER).appendToTable( org.apache.iceberg.TestHelpers.Row.of("2020-03-20", 0), expectedRecords); - assertRecords(run(), expectedRecords, SCHEMA); + TestHelpers.assertRecords(run(), expectedRecords, TestFixtures.SCHEMA); } @Test public void testProjection() throws Exception { - Table table = catalog.createTable(TableIdentifier.of("default", "t"), SCHEMA, SPEC); - List inputRecords = RandomGenericData.generate(SCHEMA, 1, 0L); + Table table = catalog.createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA, TestFixtures.SPEC); + List inputRecords = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L); new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER).appendToTable( org.apache.iceberg.TestHelpers.Row.of("2020-03-20", 0), inputRecords); assertRows(runWithProjection("data"), Row.of(inputRecords.get(0).get(0))); @@ -191,7 +150,7 @@ public void testIdentityPartitionProjections() throws Exception { PartitionSpec spec = PartitionSpec.builderFor(logSchema).identity("dt").identity("level").build(); - Table table = catalog.createTable(TableIdentifier.of("default", "t"), logSchema, spec); + Table table = catalog.createTable(TestFixtures.TABLE_IDENTIFIER, logSchema, spec); List inputRecords = RandomGenericData.generate(logSchema, 10, 0L); int idx = 0; @@ -228,7 +187,7 @@ public void testIdentityPartitionProjections() throws Exception { } private void validateIdentityPartitionProjections( - Table table, List projectedFields, List inputRecords) throws IOException { + Table table, List projectedFields, List inputRecords) throws Exception { List rows = runWithProjection(projectedFields.toArray(new String[0])); for (int pos = 0; pos < inputRecords.size(); pos++) { @@ -245,11 +204,11 @@ private void validateIdentityPartitionProjections( @Test public void testSnapshotReads() throws Exception { - Table table = catalog.createTable(TableIdentifier.of("default", "t"), SCHEMA); + Table table = catalog.createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA); GenericAppenderHelper helper = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER); - List expectedRecords = RandomGenericData.generate(SCHEMA, 1, 0L); + List expectedRecords = 
RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L); helper.appendToTable(expectedRecords); long snapshotId = table.currentSnapshot().snapshotId(); @@ -257,62 +216,64 @@ public void testSnapshotReads() throws Exception { // produce another timestamp waitUntilAfter(timestampMillis); - helper.appendToTable(RandomGenericData.generate(SCHEMA, 1, 0L)); - - assertRecords( - runWithOptions(ImmutableMap.builder().put("snapshot-id", Long.toString(snapshotId)).build()), - expectedRecords, SCHEMA); - assertRecords( - runWithOptions( - ImmutableMap.builder().put("as-of-timestamp", Long.toString(timestampMillis)).build()), - expectedRecords, SCHEMA); + helper.appendToTable(RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L)); + + TestHelpers.assertRecords( + runWithOptions(ImmutableMap.of("snapshot-id", Long.toString(snapshotId))), + expectedRecords, TestFixtures.SCHEMA); + TestHelpers.assertRecords( + runWithOptions(ImmutableMap.of("as-of-timestamp", Long.toString(timestampMillis))), + expectedRecords, TestFixtures.SCHEMA); } @Test public void testIncrementalRead() throws Exception { - Table table = catalog.createTable(TableIdentifier.of("default", "t"), SCHEMA); + Table table = catalog.createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA); GenericAppenderHelper helper = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER); - List records1 = RandomGenericData.generate(SCHEMA, 1, 0L); + List records1 = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L); helper.appendToTable(records1); long snapshotId1 = table.currentSnapshot().snapshotId(); // snapshot 2 - List records2 = RandomGenericData.generate(SCHEMA, 1, 0L); + List records2 = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L); helper.appendToTable(records2); - List records3 = RandomGenericData.generate(SCHEMA, 1, 0L); + List records3 = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L); helper.appendToTable(records3); long snapshotId3 = table.currentSnapshot().snapshotId(); // snapshot 4 - helper.appendToTable(RandomGenericData.generate(SCHEMA, 1, 0L)); + helper.appendToTable(RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L)); List expected2 = Lists.newArrayList(); expected2.addAll(records2); expected2.addAll(records3); - assertRecords(runWithOptions(ImmutableMap.builder() + TestHelpers.assertRecords(runWithOptions( + ImmutableMap.builder() .put("start-snapshot-id", Long.toString(snapshotId1)) .put("end-snapshot-id", Long.toString(snapshotId3)).build()), - expected2, SCHEMA); + expected2, TestFixtures.SCHEMA); } @Test public void testFilterExp() throws Exception { - Table table = catalog.createTable(TableIdentifier.of("default", "t"), SCHEMA, SPEC); + Table table = catalog.createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA, TestFixtures.SPEC); - List expectedRecords = RandomGenericData.generate(SCHEMA, 2, 0L); + List expectedRecords = RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L); expectedRecords.get(0).set(2, "2020-03-20"); expectedRecords.get(1).set(2, "2020-03-20"); GenericAppenderHelper helper = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER); - DataFile dataFile1 = helper.writeFile(TestHelpers.Row.of("2020-03-20", 0), expectedRecords); - DataFile dataFile2 = helper.writeFile(TestHelpers.Row.of("2020-03-21", 0), - RandomGenericData.generate(SCHEMA, 2, 0L)); + DataFile dataFile1 = helper.writeFile(org.apache.iceberg.TestHelpers.Row.of("2020-03-20", 0), expectedRecords); + DataFile dataFile2 = helper.writeFile(org.apache.iceberg.TestHelpers.Row.of("2020-03-21", 0), + 
RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L)); helper.appendToTable(dataFile1, dataFile2); - assertRecords(runWithFilter(Expressions.equal("dt", "2020-03-20"), "where dt='2020-03-20'"), expectedRecords, - SCHEMA); + TestHelpers.assertRecords(runWithFilter( + Expressions.equal("dt", "2020-03-20"), "where dt='2020-03-20'"), + expectedRecords, + TestFixtures.SCHEMA); } @Test @@ -329,11 +290,11 @@ public void testPartitionTypes() throws Exception { PartitionSpec spec = PartitionSpec.builderFor(typesSchema).identity("decimal").identity("str").identity("binary") .identity("date").identity("time").identity("timestamp").build(); - Table table = catalog.createTable(TableIdentifier.of("default", "t"), typesSchema, spec); + Table table = catalog.createTable(TestFixtures.TABLE_IDENTIFIER, typesSchema, spec); List records = RandomGenericData.generate(typesSchema, 10, 0L); GenericAppenderHelper appender = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER); for (Record record : records) { - TestHelpers.Row partition = TestHelpers.Row.of( + org.apache.iceberg.TestHelpers.Row partition = org.apache.iceberg.TestHelpers.Row.of( record.get(1), record.get(2), record.get(3), @@ -343,26 +304,11 @@ public void testPartitionTypes() throws Exception { appender.appendToTable(partition, Collections.singletonList(record)); } - assertRecords(run(), records, typesSchema); - } - - static void assertRecords(List results, List expectedRecords, Schema schema) { - List expected = Lists.newArrayList(); - @SuppressWarnings("unchecked") - DataStructureConverter converter = (DataStructureConverter) DataStructureConverters.getConverter( - TypeConversions.fromLogicalToDataType(FlinkSchemaUtil.convert(schema))); - expectedRecords.forEach(r -> expected.add(converter.toExternal(RowDataConverter.convert(schema, r)))); - assertRows(results, expected); + TestHelpers.assertRecords(run(), records, typesSchema); } private static void assertRows(List results, Row... expected) { - assertRows(results, Arrays.asList(expected)); - } - - static void assertRows(List results, List expected) { - expected.sort(Comparator.comparing(Row::toString)); - results.sort(Comparator.comparing(Row::toString)); - Assert.assertEquals(expected, results); + TestHelpers.assertRows(results, Arrays.asList(expected)); } private static void waitUntilAfter(long timestampMillis) { diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java index 558ea3e77bb1..f99e4b682a86 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java +++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java @@ -40,6 +40,7 @@ import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.flink.FlinkTableOptions; import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.TestFixtures; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.junit.Assert; import org.junit.Test; @@ -47,7 +48,7 @@ /** * Test Flink SELECT SQLs. 
*/ -public class TestFlinkScanSql extends TestFlinkScan { +public class TestFlinkScanSql extends TestFlinkSource { private volatile TableEnvironment tEnv; @@ -100,9 +101,9 @@ protected List run(FlinkSource.Builder formatBuilder, Map s @Test public void testResiduals() throws Exception { - Table table = catalog.createTable(TableIdentifier.of("default", "t"), SCHEMA, SPEC); + Table table = catalog.createTable(TableIdentifier.of("default", "t"), TestFixtures.SCHEMA, TestFixtures.SPEC); - List writeRecords = RandomGenericData.generate(SCHEMA, 2, 0L); + List writeRecords = RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L); writeRecords.get(0).set(1, 123L); writeRecords.get(0).set(2, "2020-03-20"); writeRecords.get(1).set(1, 456L); @@ -115,16 +116,17 @@ public void testResiduals() throws Exception { DataFile dataFile1 = helper.writeFile(TestHelpers.Row.of("2020-03-20", 0), writeRecords); DataFile dataFile2 = helper.writeFile(TestHelpers.Row.of("2020-03-21", 0), - RandomGenericData.generate(SCHEMA, 2, 0L)); + RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L)); helper.appendToTable(dataFile1, dataFile2); Expression filter = Expressions.and(Expressions.equal("dt", "2020-03-20"), Expressions.equal("id", 123)); - assertRecords(runWithFilter(filter, "where dt='2020-03-20' and id=123"), expectedRecords, SCHEMA); + org.apache.iceberg.flink.TestHelpers.assertRecords(runWithFilter( + filter, "where dt='2020-03-20' and id=123"), expectedRecords, TestFixtures.SCHEMA); } @Test public void testInferedParallelism() throws IOException { - Table table = catalog.createTable(TableIdentifier.of("default", "t"), SCHEMA, SPEC); + Table table = catalog.createTable(TableIdentifier.of("default", "t"), TestFixtures.SCHEMA, TestFixtures.SPEC); TableLoader tableLoader = TableLoader.fromHadoopTable(table.location()); FlinkInputFormat flinkInputFormat = FlinkSource.forRowData().tableLoader(tableLoader).table(table).buildFormat(); @@ -136,9 +138,9 @@ public void testInferedParallelism() throws IOException { GenericAppenderHelper helper = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER); DataFile dataFile1 = helper.writeFile(TestHelpers.Row.of("2020-03-20", 0), - RandomGenericData.generate(SCHEMA, 2, 0L)); + RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L)); DataFile dataFile2 = helper.writeFile(TestHelpers.Row.of("2020-03-21", 0), - RandomGenericData.generate(SCHEMA, 2, 0L)); + RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L)); helper.appendToTable(dataFile1, dataFile2); // Make sure to generate 2 CombinedScanTasks diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSource.java b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSource.java new file mode 100644 index 000000000000..633a32a4c3d1 --- /dev/null +++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSource.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.source; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import org.apache.flink.table.api.TableColumn; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.types.Row; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; + +public abstract class TestFlinkSource extends TestFlinkScan { + + TestFlinkSource(String fileFormat) { + super(fileFormat); + } + + @Override + protected List runWithProjection(String... projected) throws Exception { + TableSchema.Builder builder = TableSchema.builder(); + TableSchema schema = FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert( + catalog.loadTable(TableIdentifier.of("default", "t")).schema())); + for (String field : projected) { + TableColumn column = schema.getTableColumn(field).get(); + builder.field(column.getName(), column.getType()); + } + return run(FlinkSource.forRowData().project(builder.build()), Maps.newHashMap(), "", projected); + } + + @Override + protected List runWithFilter(Expression filter, String sqlFilter) throws Exception { + FlinkSource.Builder builder = FlinkSource.forRowData().filters(Collections.singletonList(filter)); + return run(builder, Maps.newHashMap(), sqlFilter, "*"); + } + + @Override + protected List runWithOptions(Map options) throws Exception { + FlinkSource.Builder builder = FlinkSource.forRowData(); + Optional.ofNullable(options.get("snapshot-id")).ifPresent(value -> builder.snapshotId(Long.parseLong(value))); + Optional.ofNullable(options.get("start-snapshot-id")) + .ifPresent(value -> builder.startSnapshotId(Long.parseLong(value))); + Optional.ofNullable(options.get("end-snapshot-id")) + .ifPresent(value -> builder.endSnapshotId(Long.parseLong(value))); + Optional.ofNullable(options.get("as-of-timestamp")) + .ifPresent(value -> builder.asOfTimestamp(Long.parseLong(value))); + return run(builder, options, "", "*"); + } + + @Override + protected List run() throws Exception { + return run(FlinkSource.forRowData(), Maps.newHashMap(), "", "*"); + } + + protected abstract List run(FlinkSource.Builder formatBuilder, Map sqlOptions, String sqlFilter, + String... 
sqlSelectedFields) throws Exception; +} diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java b/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java index dcd41dcb97c5..84fbf42c604c 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java +++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java @@ -39,6 +39,7 @@ import org.apache.iceberg.data.GenericAppenderHelper; import org.apache.iceberg.data.RandomGenericData; import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.TestHelpers; import org.apache.iceberg.flink.TestTableLoader; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; @@ -117,7 +118,7 @@ public void testConsumeWithoutStartSnapshotId() throws Exception { function.close(); Assert.assertEquals("Should produce the expected splits", 1, sourceContext.splits.size()); - TestFlinkScan.assertRecords(sourceContext.toRows(), Lists.newArrayList(Iterables.concat(recordsList)), SCHEMA); + TestHelpers.assertRecords(sourceContext.toRows(), Lists.newArrayList(Iterables.concat(recordsList)), SCHEMA); } } @@ -151,7 +152,7 @@ public void testConsumeFromStartSnapshotId() throws Exception { function.close(); Assert.assertEquals("Should produce the expected splits", 1, sourceContext.splits.size()); - TestFlinkScan.assertRecords(sourceContext.toRows(), Lists.newArrayList(Iterables.concat(recordsList)), SCHEMA); + TestHelpers.assertRecords(sourceContext.toRows(), Lists.newArrayList(Iterables.concat(recordsList)), SCHEMA); } } @@ -181,7 +182,7 @@ public void testCheckpointRestore() throws Exception { func.close(); Assert.assertEquals("Should produce the expected splits", 1, sourceContext.splits.size()); - TestFlinkScan.assertRecords(sourceContext.toRows(), Lists.newArrayList(Iterables.concat(recordsList)), SCHEMA); + TestHelpers.assertRecords(sourceContext.toRows(), Lists.newArrayList(Iterables.concat(recordsList)), SCHEMA); } List> newRecordsList = generateRecordsAndCommitTxn(10); @@ -203,7 +204,7 @@ public void testCheckpointRestore() throws Exception { newFunc.close(); Assert.assertEquals("Should produce the expected splits", 1, sourceContext.splits.size()); - TestFlinkScan.assertRecords(sourceContext.toRows(), Lists.newArrayList(Iterables.concat(newRecordsList)), SCHEMA); + TestHelpers.assertRecords(sourceContext.toRows(), Lists.newArrayList(Iterables.concat(newRecordsList)), SCHEMA); } } diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingReaderOperator.java b/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingReaderOperator.java index 112f02167674..0f5d6e1e4975 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingReaderOperator.java +++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingReaderOperator.java @@ -39,6 +39,7 @@ import org.apache.iceberg.data.GenericAppenderHelper; import org.apache.iceberg.data.RandomGenericData; import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.TestHelpers; import org.apache.iceberg.flink.TestTableLoader; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; @@ -106,7 +107,7 @@ public void testProcessAllRecords() throws Exception { // Assert the output has expected elements. 
expected.addAll(expectedRecords.get(i)); - TestFlinkScan.assertRecords(readOutputValues(harness), expected, SCHEMA); + TestHelpers.assertRecords(readOutputValues(harness), expected, SCHEMA); } } } @@ -138,7 +139,7 @@ public void testTriggerCheckpoint() throws Exception { Assert.assertTrue("Should have processed the split0", processor.runMailboxStep()); Assert.assertTrue("Should have processed the snapshot state action", processor.runMailboxStep()); - TestFlinkScan.assertRecords(readOutputValues(harness), expectedRecords.get(0), SCHEMA); + TestHelpers.assertRecords(readOutputValues(harness), expectedRecords.get(0), SCHEMA); // Read records from split1. Assert.assertTrue("Should have processed the split1", processor.runMailboxStep()); @@ -146,7 +147,7 @@ public void testTriggerCheckpoint() throws Exception { // Read records from split2. Assert.assertTrue("Should have processed the split2", processor.runMailboxStep()); - TestFlinkScan.assertRecords(readOutputValues(harness), + TestHelpers.assertRecords(readOutputValues(harness), Lists.newArrayList(Iterables.concat(expectedRecords)), SCHEMA); } } @@ -175,7 +176,7 @@ public void testCheckpointRestore() throws Exception { expected.addAll(expectedRecords.get(i)); Assert.assertTrue("Should have processed the split#" + i, localMailbox.runMailboxStep()); - TestFlinkScan.assertRecords(readOutputValues(harness), expected, SCHEMA); + TestHelpers.assertRecords(readOutputValues(harness), expected, SCHEMA); } // Snapshot state now, there're 10 splits left in the state. @@ -195,7 +196,7 @@ public void testCheckpointRestore() throws Exception { expected.addAll(expectedRecords.get(i)); Assert.assertTrue("Should have processed one split#" + i, localMailbox.runMailboxStep()); - TestFlinkScan.assertRecords(readOutputValues(harness), expected, SCHEMA); + TestHelpers.assertRecords(readOutputValues(harness), expected, SCHEMA); } // Let's process the final 5 splits now. @@ -204,7 +205,7 @@ public void testCheckpointRestore() throws Exception { harness.processElement(splits.get(i), 1); Assert.assertTrue("Should have processed the split#" + i, localMailbox.runMailboxStep()); - TestFlinkScan.assertRecords(readOutputValues(harness), expected, SCHEMA); + TestHelpers.assertRecords(readOutputValues(harness), expected, SCHEMA); } } }
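
For context, a minimal sketch (illustration only, not part of the patch) of how the new RowDataUtil.clone helper is driven: the field serializers are built from the projected RowType, so a source row that carries an extra trailing metadata column (for example the position-delete column) can still be copied without tripping the serializer's arity check. The wrapper class and method name below are hypothetical; RowDataUtil.clone and the InternalSerializers-based serializer construction mirror TestHelpers.copyRowData in this patch.

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.flink.data.RowDataUtil;

// Hypothetical helper, mirroring TestHelpers.copyRowData from this patch.
public class RowDataCloneSketch {

  // Copies only the fields declared by projectedType; any extra columns in 'from' are ignored.
  static RowData copyProjected(RowData from, RowType projectedType) {
    TypeSerializer[] fieldSerializers = projectedType.getChildren().stream()
        .map((LogicalType type) -> InternalSerializers.create(type))
        .toArray(TypeSerializer[]::new);
    // Passing null for 'reuse' lets clone() allocate a fresh GenericRowData.
    return RowDataUtil.clone(from, null, projectedType, fieldSerializers);
  }
}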