Merged
31 changes: 31 additions & 0 deletions flink/src/main/java/org/apache/iceberg/flink/data/RowDataUtil.java
@@ -23,9 +23,14 @@
import java.nio.ByteBuffer;
import org.apache.avro.generic.GenericData;
import org.apache.avro.util.Utf8;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ByteBuffers;
@@ -69,4 +74,30 @@ public static Object convertConstant(Type type, Object value) {
}
return value;
}

/**
 * Similar to the private {@link RowDataSerializer#copyRowData(RowData, RowData)} method.
 * This skips the arity check between rowType and from, because the from RowData may contain
 * an additional column for position deletes. Using {@link RowDataSerializer#copy(RowData, RowData)}
 * would fail that arity check.
 */
public static RowData clone(RowData from, RowData reuse, RowType rowType, TypeSerializer[] fieldSerializers) {
Member:

How does the FLIP-27 source use this method? How does it construct its TypeSerializer?

Member:

To my understanding, this clone method is not really friendly for developers to use. If we really need to introduce a copy that skips the arity check, how about making this one private and exposing an easier method as public:

public static RowData clone(RowData from, RowType rowType)
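A minimal sketch of that simpler overload, assuming it builds the per-field serializers with Flink's InternalSerializers and delegates to the four-argument clone below (this overload is only a proposal, not part of the PR; it would also need imports for InternalSerializers and LogicalType):

public static RowData clone(RowData from, RowType rowType) {
  // Build one serializer per top-level field of the row type.
  TypeSerializer[] fieldSerializers = rowType.getChildren().stream()
      .map((LogicalType type) -> InternalSerializers.create(type))
      .toArray(TypeSerializer[]::new);
  // Delegate to the existing clone; passing null makes it allocate a fresh GenericRowData.
  return clone(from, null, rowType, fieldSerializers);
}

Note that this rebuilds the serializers on every call, which is exactly the overhead the TypeSerializer[] parameter was added to avoid.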

Contributor (Author):

It is used here: https://github.com/stevenzwu/iceberg/blob/flip27IcebergSource/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataIteratorBulkFormat.java#L116

The main reason for adding TypeSerializer to the RowDataUtil#clone() method is to avoid constructing it for each clone call. In the constructor of RowDataIteratorBulkFormat, we construct the TypeSerializer once from the RowType.
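A rough sketch of that pattern (serializers built once up front, then reused for every row); the variable names here are illustrative, not taken from the linked branch:

// Once, e.g. in the reader/format constructor:
TypeSerializer[] fieldSerializers = rowType.getChildren().stream()
    .map((LogicalType type) -> InternalSerializers.create(type))
    .toArray(TypeSerializer[]::new);

// Per row, while iterating:
RowData copied = RowDataUtil.clone(row, null, rowType, fieldSerializers);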

@openinx (Member), Mar 5, 2021:

OK, it seems we clone the rowData iteratively, and I see that a FieldGetter is created for each row here, which should also not be the expected behavior. We may need to introduce a new RowDataCloner that initializes all of its TypeSerializer and FieldGetter instances once in its constructor; then, while iterating the RowData, we just clone row by row without creating any extra instances.

Users wouldn't have to interact with the internal TypeSerializer; they could just use the RowDataCloner.
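A rough sketch of such a RowDataCloner, assuming it caches both the serializers and the field getters in its constructor and uses the usual Flink table imports (RowData, GenericRowData, RowType, TypeSerializer, InternalSerializers). The class does not exist in this PR; this is only the shape being proposed:

public class RowDataCloner {
  private final TypeSerializer[] fieldSerializers;
  private final RowData.FieldGetter[] fieldGetters;

  public RowDataCloner(RowType rowType) {
    int fieldCount = rowType.getFieldCount();
    this.fieldSerializers = new TypeSerializer[fieldCount];
    this.fieldGetters = new RowData.FieldGetter[fieldCount];
    for (int i = 0; i < fieldCount; i++) {
      // Both the serializer and the getter are created exactly once per field.
      fieldSerializers[i] = InternalSerializers.create(rowType.getTypeAt(i));
      fieldGetters[i] = RowData.createFieldGetter(rowType.getTypeAt(i), i);
    }
  }

  public RowData clone(RowData from) {
    GenericRowData ret = new GenericRowData(fieldGetters.length);
    ret.setRowKind(from.getRowKind());
    for (int i = 0; i < fieldGetters.length; i++) {
      Object field = fieldGetters[i].getFieldOrNull(from);
      ret.setField(i, field == null ? null : fieldSerializers[i].copy(field));
    }
    return ret;
  }
}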

@openinx (Member), Mar 5, 2021:

I think we could do that refactor when we review the RowDataIteratorBulkFormat.

GenericRowData ret;
if (reuse instanceof GenericRowData) {
ret = (GenericRowData) reuse;
} else {
ret = new GenericRowData(from.getArity());
}
ret.setRowKind(from.getRowKind());
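// Copy only the fields described by rowType; 'from' may carry an additional
// metadata column for position deletes that is intentionally not copied.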
for (int i = 0; i < rowType.getFieldCount(); i++) {
if (!from.isNullAt(i)) {
RowData.FieldGetter getter = RowData.createFieldGetter(rowType.getTypeAt(i), i);
ret.setField(i, fieldSerializers[i].copy(getter.getFieldOrNull(from)));
} else {
ret.setField(i, null);
}
}
return ret;
}

}
52 changes: 52 additions & 0 deletions flink/src/test/java/org/apache/iceberg/flink/TestFixtures.java
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.flink;

import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.types.Types;

import static org.apache.iceberg.types.Types.NestedField.required;

public class TestFixtures {

private TestFixtures() {

}

public static final Schema SCHEMA = new Schema(
required(1, "data", Types.StringType.get()),
required(2, "id", Types.LongType.get()),
required(3, "dt", Types.StringType.get()));

public static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA)
.identity("dt")
.bucket("id", 1)
.build();

public static final RowType ROW_TYPE = FlinkSchemaUtil.convert(SCHEMA);

public static final String DATABASE = "default";
public static final String TABLE = "t";

public static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of(DATABASE, TABLE);
}
41 changes: 24 additions & 17 deletions flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
@@ -27,14 +27,14 @@
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.MapData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.TimestampData;
@@ -47,7 +47,9 @@
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.data.RowDataUtil;
import org.apache.iceberg.flink.source.FlinkInputFormat;
import org.apache.iceberg.flink.source.FlinkInputSplit;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -64,20 +66,7 @@ public static RowData copyRowData(RowData from, RowType rowType) {
TypeSerializer[] fieldSerializers = rowType.getChildren().stream()
.map((LogicalType type) -> InternalSerializers.create(type))
.toArray(TypeSerializer[]::new);

// Use rowType field count to avoid copy metadata column in case of merging position deletes
GenericRowData ret = new GenericRowData(rowType.getFieldCount());
ret.setRowKind(from.getRowKind());
for (int i = 0; i < rowType.getFieldCount(); i++) {
if (!from.isNullAt(i)) {
RowData.FieldGetter getter = RowData.createFieldGetter(rowType.getTypeAt(i), i);
ret.setField(i, fieldSerializers[i].copy(getter.getFieldOrNull(from)));
} else {
ret.setField(i, null);
}
}

return ret;
return RowDataUtil.clone(from, null, rowType, fieldSerializers);
}

public static List<RowData> readRowData(FlinkInputFormat inputFormat, RowType rowType) throws IOException {
@@ -97,15 +86,33 @@ public static List<RowData> readRowData(FlinkInputFormat inputFormat, RowType ro
}

public static List<Row> readRows(FlinkInputFormat inputFormat, RowType rowType) throws IOException {
return convertRowDataToRow(readRowData(inputFormat, rowType), rowType);
}

public static List<Row> convertRowDataToRow(List<RowData> rowDataList, RowType rowType) {
DataStructureConverter<Object, Object> converter = DataStructureConverters.getConverter(
TypeConversions.fromLogicalToDataType(rowType));

return readRowData(inputFormat, rowType).stream()
return rowDataList.stream()
.map(converter::toExternal)
.map(Row.class::cast)
.collect(Collectors.toList());
}

public static void assertRecords(List<Row> results, List<Record> expectedRecords, Schema schema) {
List<Row> expected = Lists.newArrayList();
@SuppressWarnings("unchecked")
DataStructureConverter<RowData, Row> converter = (DataStructureConverter) DataStructureConverters.getConverter(
TypeConversions.fromLogicalToDataType(FlinkSchemaUtil.convert(schema)));
expectedRecords.forEach(r -> expected.add(converter.toExternal(RowDataConverter.convert(schema, r))));
assertRows(results, expected);
}

public static void assertRows(List<Row> results, List<Row> expected) {
expected.sort(Comparator.comparing(Row::toString));
results.sort(Comparator.comparing(Row::toString));
Assert.assertEquals(expected, results);
}

public static void assertRowData(Types.StructType structType, LogicalType rowType, Record expectedRecord,
RowData actualRowData) {
if (expectedRecord == null && actualRowData == null) {
flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java
@@ -33,7 +33,6 @@
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.TestHelpers;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
@@ -44,7 +43,7 @@
/**
* Test {@link FlinkInputFormat}.
*/
public class TestFlinkInputFormat extends TestFlinkScan {
public class TestFlinkInputFormat extends TestFlinkSource {

public TestFlinkInputFormat(String fileFormat) {
super(fileFormat);
@@ -55,15 +54,11 @@ public void before() throws IOException {
super.before();
}

private TableLoader loader() {
return TableLoader.fromHadoopTable(warehouse + "/default/t");
}

@Override
protected List<Row> run(
FlinkSource.Builder formatBuilder, Map<String, String> sqlOptions, String sqlFilter, String... sqlSelectedFields)
throws IOException {
return runFormat(formatBuilder.tableLoader(loader()).buildFormat());
throws Exception {
return runFormat(formatBuilder.tableLoader(tableLoader()).buildFormat());
}

@Test
@@ -89,15 +84,18 @@ public void testNestedProjection() throws Exception {
TableSchema projectedSchema = TableSchema.builder()
.field("nested", DataTypes.ROW(DataTypes.FIELD("f2", DataTypes.STRING())))
.field("data", DataTypes.STRING()).build();
List<Row> result = runFormat(FlinkSource.forRowData().tableLoader(loader()).project(projectedSchema).buildFormat());
List<Row> result = runFormat(FlinkSource.forRowData()
.tableLoader(tableLoader())
.project(projectedSchema)
.buildFormat());

List<Row> expected = Lists.newArrayList();
for (Record record : writeRecords) {
Row nested = Row.of(((Record) record.get(1)).get(1));
expected.add(Row.of(nested, record.get(0)));
}

assertRows(result, expected);
TestHelpers.assertRows(result, expected);
}

@Test
@@ -117,14 +115,15 @@ public void testBasicProjection() throws IOException {
.field("id", DataTypes.BIGINT())
.field("data", DataTypes.STRING())
.build();
List<Row> result = runFormat(FlinkSource.forRowData().tableLoader(loader()).project(projectedSchema).buildFormat());
List<Row> result = runFormat(FlinkSource.forRowData()
.tableLoader(tableLoader()).project(projectedSchema).buildFormat());

List<Row> expected = Lists.newArrayList();
for (Record record : writeRecords) {
expected.add(Row.of(record.get(0), record.get(1)));
}

assertRows(result, expected);
TestHelpers.assertRows(result, expected);
}

private List<Row> runFormat(FlinkInputFormat inputFormat) throws IOException {
flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormatReaderDeletes.java
@@ -23,87 +23,27 @@
import java.util.Map;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.DeleteReadTests;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.RowDataWrapper;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.TestHelpers;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.hive.TestHiveMetastore;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.StructLikeSet;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class TestFlinkInputFormatReaderDeletes extends DeleteReadTests {
private static HiveConf hiveConf = null;
private static HiveCatalog catalog = null;
private static TestHiveMetastore metastore = null;

private final FileFormat format;

@Parameterized.Parameters(name = "fileFormat={0}")
public static Object[][] parameters() {
return new Object[][] {
new Object[] { FileFormat.PARQUET },
new Object[] { FileFormat.AVRO },
new Object[] { FileFormat.ORC }
};
}
public class TestFlinkInputFormatReaderDeletes extends TestFlinkReaderDeletesBase {

public TestFlinkInputFormatReaderDeletes(FileFormat inputFormat) {
this.format = inputFormat;
}

@BeforeClass
public static void startMetastore() {
TestFlinkInputFormatReaderDeletes.metastore = new TestHiveMetastore();
metastore.start();
TestFlinkInputFormatReaderDeletes.hiveConf = metastore.hiveConf();
TestFlinkInputFormatReaderDeletes.catalog = new HiveCatalog(hiveConf);
}

@AfterClass
public static void stopMetastore() {
metastore.stop();
catalog.close();
TestFlinkInputFormatReaderDeletes.catalog = null;
super(inputFormat);
}

@Override
protected Table createTable(String name, Schema schema, PartitionSpec spec) {
Map<String, String> props = Maps.newHashMap();
props.put(TableProperties.DEFAULT_FILE_FORMAT, format.name());

Table table = catalog.createTable(TableIdentifier.of("default", name), schema, spec, props);
TableOperations ops = ((BaseTable) table).operations();
TableMetadata meta = ops.current();
ops.commit(meta, meta.upgradeToFormatVersion(2));

return table;
}

@Override
protected void dropTable(String name) {
catalog.dropTable(TableIdentifier.of("default", name));
}

@Override
protected StructLikeSet rowSet(String name, Table testTable, String... columns) throws IOException {
protected StructLikeSet rowSet(String tableName, Table testTable, String... columns) throws IOException {
Schema projected = testTable.schema().select(columns);
RowType rowType = FlinkSchemaUtil.convert(projected);
Map<String, String> properties = Maps.newHashMap();
@@ -113,7 +53,7 @@ protected StructLikeSet rowSet(String name, Table testTable, String... columns)
Integer.toString(hiveConf.getInt("iceberg.hive.client-pool-size", 5)));
CatalogLoader hiveCatalogLoader = CatalogLoader.hive(catalog.name(), hiveConf, properties);
FlinkInputFormat inputFormat = FlinkSource.forRowData()
.tableLoader(TableLoader.fromCatalog(hiveCatalogLoader, TableIdentifier.of("default", name)))
.tableLoader(TableLoader.fromCatalog(hiveCatalogLoader, TableIdentifier.of("default", tableName)))
.project(FlinkSchemaUtil.toSchema(rowType)).buildFormat();

StructLikeSet set = StructLikeSet.create(projected.asStruct());
@@ -125,8 +65,4 @@ protected StructLikeSet rowSet(String name, Table testTable, String... columns)
return set;
}

@Override
protected boolean expectPruned() {
return false;
}
}