diff --git a/flink/src/main/java/org/apache/iceberg/flink/IcebergSinkUtil.java b/flink/src/main/java/org/apache/iceberg/flink/IcebergSinkUtil.java index a1d8fe97d176..8d80342a16a4 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/IcebergSinkUtil.java +++ b/flink/src/main/java/org/apache/iceberg/flink/IcebergSinkUtil.java @@ -22,7 +22,8 @@ import java.util.Locale; import java.util.Map; import org.apache.flink.table.api.TableSchema; -import org.apache.flink.types.Row; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; import org.apache.iceberg.FileFormat; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; @@ -39,22 +40,30 @@ class IcebergSinkUtil { private IcebergSinkUtil() { } - static IcebergStreamWriter createStreamWriter(Table table, TableSchema tableSchema) { + static IcebergStreamWriter createStreamWriter(Table table, TableSchema requestedSchema) { Preconditions.checkArgument(table != null, "Iceberg table shouldn't be null"); - if (tableSchema != null) { - Schema writeSchema = FlinkSchemaUtil.convert(tableSchema); - // Reassign ids to match the existing table schema. - writeSchema = TypeUtil.reassignIds(writeSchema, table.schema()); + RowType flinkSchema; + if (requestedSchema != null) { + // Convert the Flink schema to an Iceberg schema first, then reassign ids to match the existing Iceberg schema. + Schema writeSchema = TypeUtil.reassignIds(FlinkSchemaUtil.convert(requestedSchema), table.schema()); TypeUtil.validateWriteSchema(table.schema(), writeSchema, true, true); + + // We use this Flink schema to read values from RowData. Flink's TINYINT and SMALLINT are promoted to + // Iceberg's INTEGER, so reading a TINYINT (backed by 1 byte) through the Iceberg table schema would read + // 4 bytes rather than 1 and corrupt the byte array in BinaryRowData. So here we must use the Flink + // schema. 
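[Editor's note: the following is an illustrative sketch, not part of the patch. It shows the TINYINT/SMALLINT pitfall the comment above describes: the accessor used to read a RowData field has to match the Flink logical type, not the promoted Iceberg type. Class and variable names are invented for the example.]

// TinyintPromotionExample.java -- standalone sketch
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;

public class TinyintPromotionExample {
  public static void main(String[] args) {
    // A row whose first column is declared TINYINT on the Flink side.
    RowData row = GenericRowData.of((byte) 1);

    // Reading with the accessor that matches the Flink type is correct.
    byte value = row.getByte(0);

    // Iceberg has no TINYINT, so FlinkSchemaUtil.convert() maps it to INTEGER. If the writer derived its
    // accessors from that converted schema it would call row.getInt(0) instead; on a BinaryRowData that
    // reads a 4-byte slot where only 1 byte was written, which is exactly the corruption described above.
    System.out.println(value);
  }
}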
+ flinkSchema = (RowType) requestedSchema.toRowDataType().getLogicalType(); + } else { + flinkSchema = FlinkSchemaUtil.convert(table.schema()); } Map props = table.properties(); long targetFileSize = getTargetFileSizeBytes(props); FileFormat fileFormat = getFileFormat(props); - TaskWriterFactory taskWriterFactory = new RowTaskWriterFactory(table.schema(), table.spec(), - table.locationProvider(), table.io(), table.encryption(), targetFileSize, fileFormat, props); + TaskWriterFactory taskWriterFactory = new RowDataTaskWriterFactory(table.schema(), flinkSchema, + table.spec(), table.locationProvider(), table.io(), table.encryption(), targetFileSize, fileFormat, props); return new IcebergStreamWriter<>(table.toString(), taskWriterFactory); } diff --git a/flink/src/main/java/org/apache/iceberg/flink/RowTaskWriterFactory.java b/flink/src/main/java/org/apache/iceberg/flink/RowDataTaskWriterFactory.java similarity index 62% rename from flink/src/main/java/org/apache/iceberg/flink/RowTaskWriterFactory.java rename to flink/src/main/java/org/apache/iceberg/flink/RowDataTaskWriterFactory.java index 40e3e73f97f7..f14aed220a52 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/RowTaskWriterFactory.java +++ b/flink/src/main/java/org/apache/iceberg/flink/RowDataTaskWriterFactory.java @@ -22,14 +22,16 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.util.Map; -import org.apache.flink.types.Row; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; import org.apache.iceberg.FileFormat; import org.apache.iceberg.MetricsConfig; import org.apache.iceberg.PartitionKey; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; +import org.apache.iceberg.avro.Avro; import org.apache.iceberg.encryption.EncryptionManager; -import org.apache.iceberg.flink.data.FlinkParquetWriters; +import org.apache.iceberg.flink.data.FlinkAvroWriter; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.io.FileAppenderFactory; import org.apache.iceberg.io.FileIO; @@ -38,37 +40,39 @@ import org.apache.iceberg.io.OutputFileFactory; import org.apache.iceberg.io.TaskWriter; import org.apache.iceberg.io.UnpartitionedWriter; -import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -class RowTaskWriterFactory implements TaskWriterFactory { +class RowDataTaskWriterFactory implements TaskWriterFactory { private final Schema schema; + private final RowType flinkSchema; private final PartitionSpec spec; private final LocationProvider locations; private final FileIO io; private final EncryptionManager encryptionManager; private final long targetFileSizeBytes; private final FileFormat format; - private final FileAppenderFactory appenderFactory; + private final FileAppenderFactory appenderFactory; private OutputFileFactory outputFileFactory; - RowTaskWriterFactory(Schema schema, - PartitionSpec spec, - LocationProvider locations, - FileIO io, - EncryptionManager encryptionManager, - long targetFileSizeBytes, - FileFormat format, - Map tableProperties) { + RowDataTaskWriterFactory(Schema schema, + RowType flinkSchema, + PartitionSpec spec, + LocationProvider locations, + FileIO io, + EncryptionManager encryptionManager, + long targetFileSizeBytes, + FileFormat format, + Map tableProperties) { this.schema = schema; + this.flinkSchema = flinkSchema; this.spec = spec; this.locations = locations; this.io = io; this.encryptionManager = encryptionManager; 
this.targetFileSizeBytes = targetFileSizeBytes; this.format = format; - this.appenderFactory = new FlinkFileAppenderFactory(schema, tableProperties); + this.appenderFactory = new FlinkFileAppenderFactory(schema, flinkSchema, tableProperties); } @Override @@ -77,62 +81,63 @@ public void initialize(int taskId, int attemptId) { } @Override - public TaskWriter create() { + public TaskWriter create() { Preconditions.checkNotNull(outputFileFactory, "The outputFileFactory shouldn't be null if we have invoked the initialize()."); if (spec.fields().isEmpty()) { return new UnpartitionedWriter<>(spec, format, appenderFactory, outputFileFactory, io, targetFileSizeBytes); } else { - return new RowPartitionedFanoutWriter(spec, format, appenderFactory, outputFileFactory, - io, targetFileSizeBytes, schema); + return new RowDataPartitionedFanoutWriter(spec, format, appenderFactory, outputFileFactory, + io, targetFileSizeBytes, schema, flinkSchema); } } - private static class RowPartitionedFanoutWriter extends PartitionedFanoutWriter { + private static class RowDataPartitionedFanoutWriter extends PartitionedFanoutWriter { private final PartitionKey partitionKey; - private final RowWrapper rowWrapper; + private final RowDataWrapper rowDataWrapper; - RowPartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory appenderFactory, - OutputFileFactory fileFactory, FileIO io, long targetFileSize, Schema schema) { + RowDataPartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory appenderFactory, + OutputFileFactory fileFactory, FileIO io, long targetFileSize, Schema schema, + RowType flinkSchema) { super(spec, format, appenderFactory, fileFactory, io, targetFileSize); this.partitionKey = new PartitionKey(spec, schema); - this.rowWrapper = new RowWrapper(schema.asStruct()); + this.rowDataWrapper = new RowDataWrapper(flinkSchema, schema.asStruct()); } @Override - protected PartitionKey partition(Row row) { - partitionKey.partition(rowWrapper.wrap(row)); + protected PartitionKey partition(RowData row) { + partitionKey.partition(rowDataWrapper.wrap(row)); return partitionKey; } } - private static class FlinkFileAppenderFactory implements FileAppenderFactory { + private static class FlinkFileAppenderFactory implements FileAppenderFactory { private final Schema schema; + private final RowType flinkSchema; private final Map props; - private FlinkFileAppenderFactory(Schema schema, Map props) { + private FlinkFileAppenderFactory(Schema schema, RowType flinkSchema, Map props) { this.schema = schema; + this.flinkSchema = flinkSchema; this.props = props; } @Override - public FileAppender newAppender(OutputFile outputFile, FileFormat format) { + public FileAppender newAppender(OutputFile outputFile, FileFormat format) { + // TODO MetricsConfig will be used for building parquet RowData writer. MetricsConfig metricsConfig = MetricsConfig.fromProperties(props); try { switch (format) { - case PARQUET: - return Parquet.write(outputFile) - .createWriterFunc(FlinkParquetWriters::buildWriter) + case AVRO: + return Avro.write(outputFile) + .createWriterFunc(ignore -> new FlinkAvroWriter(flinkSchema)) .setAll(props) - .metricsConfig(metricsConfig) .schema(schema) .overwrite() .build(); - - case AVRO: - // TODO add the Avro writer building once RowDataWrapper is ready. 
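[Editor's note: an illustrative sketch, not part of the patch, of how the pieces in RowDataPartitionedFanoutWriter cooperate: RowDataWrapper exposes a RowData as Iceberg's StructLike so PartitionKey can evaluate the partition transforms against it. The schema and values are invented for the example; RowDataWrapper is package-private, so the sketch assumes it is compiled in the same package.]

// PartitionKeyExample.java -- standalone sketch
package org.apache.iceberg.flink;

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class PartitionKeyExample {
  public static void main(String[] args) {
    Schema schema = new Schema(
        Types.NestedField.optional(1, "id", Types.IntegerType.get()),
        Types.NestedField.optional(2, "data", Types.StringType.get()));
    PartitionSpec spec = PartitionSpec.builderFor(schema).identity("data").build();

    // FlinkSchemaUtil.convert(Schema) returns the matching RowType, as used elsewhere in this patch.
    RowDataWrapper wrapper = new RowDataWrapper(FlinkSchemaUtil.convert(schema), schema.asStruct());
    PartitionKey partitionKey = new PartitionKey(spec, schema);

    RowData row = GenericRowData.of(1, StringData.fromString("hello"));
    // The same call the fan-out writer makes per record to decide which partition a row belongs to.
    partitionKey.partition(wrapper.wrap(row));

    System.out.println(partitionKey.get(0, String.class));  // identity("data") -> "hello"
  }
}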
+ case PARQUET: case ORC: default: throw new UnsupportedOperationException("Cannot write unknown file format: " + format); diff --git a/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java b/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java index d2316da09368..28de63408e8f 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java +++ b/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java @@ -85,52 +85,51 @@ private interface PositionalGetter { } private static PositionalGetter buildGetter(LogicalType logicalType, Type type) { - switch (type.typeId()) { - case STRING: + switch (logicalType.getTypeRoot()) { + case TINYINT: + return (row, pos) -> (int) row.getByte(pos); + case SMALLINT: + return (row, pos) -> (int) row.getShort(pos); + case CHAR: + case VARCHAR: return (row, pos) -> row.getString(pos).toString(); - case FIXED: case BINARY: - return (row, pos) -> ByteBuffer.wrap(row.getBinary(pos)); - - case UUID: - return (row, pos) -> { - ByteBuffer bb = ByteBuffer.wrap(row.getBinary(pos)); - long mostSigBits = bb.getLong(); - long leastSigBits = bb.getLong(); - return new UUID(mostSigBits, leastSigBits); - }; + case VARBINARY: + if (Type.TypeID.UUID.equals(type.typeId())) { + return (row, pos) -> { + ByteBuffer bb = ByteBuffer.wrap(row.getBinary(pos)); + long mostSigBits = bb.getLong(); + long leastSigBits = bb.getLong(); + return new UUID(mostSigBits, leastSigBits); + }; + } else { + return (row, pos) -> ByteBuffer.wrap(row.getBinary(pos)); + } case DECIMAL: DecimalType decimalType = (DecimalType) logicalType; return (row, pos) -> row.getDecimal(pos, decimalType.getPrecision(), decimalType.getScale()).toBigDecimal(); - case TIME: + case TIME_WITHOUT_TIME_ZONE: // Time in RowData is in milliseconds (Integer), while iceberg's time is microseconds (Long). 
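// Editor's note (illustrative, not part of the patch): a concrete example of the conversion below.
// 12:34:56.789 is stored by Flink as 45_296_789 millis-of-day (int), and Iceberg's TIME expects
// 45_296_789_000 micros, hence the (long) cast and the * 1_000. The TIMESTAMP getters that follow do the
// analogous conversion from TimestampData: getMillisecond() * 1000 + getNanoOfMillisecond() / 1000,
// e.g. (1_595_000_000_000 ms, 123_456 nanos-of-milli) -> 1_595_000_000_000_123 micros.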
return (row, pos) -> ((long) row.getInt(pos)) * 1_000; - case TIMESTAMP: - switch (logicalType.getTypeRoot()) { - case TIMESTAMP_WITHOUT_TIME_ZONE: - TimestampType timestampType = (TimestampType) logicalType; - return (row, pos) -> { - LocalDateTime localDateTime = row.getTimestamp(pos, timestampType.getPrecision()).toLocalDateTime(); - return DateTimeUtil.microsFromTimestamp(localDateTime); - }; - - case TIMESTAMP_WITH_LOCAL_TIME_ZONE: - LocalZonedTimestampType lzTs = (LocalZonedTimestampType) logicalType; - return (row, pos) -> { - TimestampData timestampData = row.getTimestamp(pos, lzTs.getPrecision()); - return timestampData.getMillisecond() * 1000 + timestampData.getNanoOfMillisecond() / 1000; - }; - - default: - throw new IllegalArgumentException("Unhandled iceberg type: " + type + " corresponding flink type: " + - logicalType); - } + case TIMESTAMP_WITHOUT_TIME_ZONE: + TimestampType timestampType = (TimestampType) logicalType; + return (row, pos) -> { + LocalDateTime localDateTime = row.getTimestamp(pos, timestampType.getPrecision()).toLocalDateTime(); + return DateTimeUtil.microsFromTimestamp(localDateTime); + }; + + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + LocalZonedTimestampType lzTs = (LocalZonedTimestampType) logicalType; + return (row, pos) -> { + TimestampData timestampData = row.getTimestamp(pos, lzTs.getPrecision()); + return timestampData.getMillisecond() * 1000 + timestampData.getNanoOfMillisecond() / 1000; + }; - case STRUCT: + case ROW: RowType rowType = (RowType) logicalType; Types.StructType structType = (Types.StructType) type; diff --git a/flink/src/main/java/org/apache/iceberg/flink/RowWrapper.java b/flink/src/main/java/org/apache/iceberg/flink/RowWrapper.java deleted file mode 100644 index 0007518249d3..000000000000 --- a/flink/src/main/java/org/apache/iceberg/flink/RowWrapper.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.iceberg.flink; - -import java.lang.reflect.Array; -import java.nio.ByteBuffer; -import java.time.LocalDate; -import java.time.LocalDateTime; -import java.time.LocalTime; -import java.time.OffsetDateTime; -import org.apache.flink.types.Row; -import org.apache.iceberg.StructLike; -import org.apache.iceberg.types.Type; -import org.apache.iceberg.types.Types; -import org.apache.iceberg.util.DateTimeUtil; - -class RowWrapper implements StructLike { - - private final Type[] types; - private final PositionalGetter[] getters; - private Row row = null; - - RowWrapper(Types.StructType type) { - int size = type.fields().size(); - - types = (Type[]) Array.newInstance(Type.class, size); - for (int i = 0; i < size; i++) { - types[i] = type.fields().get(i).type(); - } - - getters = (PositionalGetter[]) Array.newInstance(PositionalGetter.class, size); - for (int i = 0; i < size; i++) { - getters[i] = buildGetter(types[i]); - } - } - - RowWrapper wrap(Row data) { - this.row = data; - return this; - } - - @Override - public int size() { - return types.length; - } - - @Override - public T get(int pos, Class javaClass) { - if (row.getField(pos) == null) { - return null; - } else if (getters[pos] != null) { - return javaClass.cast(getters[pos].get(row, pos)); - } - - return javaClass.cast(row.getField(pos)); - } - - @Override - public void set(int pos, T value) { - row.setField(pos, value); - } - - private interface PositionalGetter { - T get(Row row, int pos); - } - - private static PositionalGetter buildGetter(Type type) { - switch (type.typeId()) { - case DATE: - return (r, pos) -> DateTimeUtil.daysFromDate((LocalDate) r.getField(pos)); - case TIME: - return (r, pos) -> DateTimeUtil.microsFromTime((LocalTime) r.getField(pos)); - case TIMESTAMP: - if (((Types.TimestampType) type).shouldAdjustToUTC()) { - return (r, pos) -> DateTimeUtil.microsFromTimestamptz((OffsetDateTime) r.getField(pos)); - } else { - return (r, pos) -> DateTimeUtil.microsFromTimestamp((LocalDateTime) r.getField(pos)); - } - case FIXED: - return (r, pos) -> ByteBuffer.wrap((byte[]) r.getField(pos)); - case STRUCT: - RowWrapper nestedWrapper = new RowWrapper((Types.StructType) type); - return (r, pos) -> nestedWrapper.wrap((Row) r.getField(pos)); - default: - return null; - } - } -} diff --git a/flink/src/test/java/org/apache/iceberg/flink/RowDataConverter.java b/flink/src/test/java/org/apache/iceberg/flink/RowDataConverter.java index b5a495479a8f..3dd7e75c2e63 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/RowDataConverter.java +++ b/flink/src/test/java/org/apache/iceberg/flink/RowDataConverter.java @@ -45,14 +45,14 @@ import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; -class RowDataConverter { +public class RowDataConverter { private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC); private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate(); private RowDataConverter() { } - static RowData convert(Schema iSchema, Record record) { + public static RowData convert(Schema iSchema, Record record) { return convert(iSchema.asStruct(), record); } diff --git a/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java b/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java index a81da4a14a82..82c2a89e33c1 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java +++ b/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java @@ -25,7 +25,9 @@ import java.util.Set; import 
org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.TableSchema; -import org.apache.flink.types.Row; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; @@ -74,10 +76,16 @@ static Record createRecord(Integer id, String data) { return record; } - static void assertTableRows(String tablePath, List rows) throws IOException { + static RowData createRowData(Integer id, String data) { + return GenericRowData.of(id, StringData.fromString(data)); + } + + static void assertTableRows(String tablePath, List rows) throws IOException { List records = Lists.newArrayList(); - for (Row row : rows) { - records.add(createRecord((Integer) row.getField(0), (String) row.getField(1))); + for (RowData row : rows) { + Integer id = row.isNullAt(0) ? null : row.getInt(0); + String data = row.isNullAt(1) ? null : row.getString(1).toString(); + records.add(createRecord(id, data)); } assertTableRecords(tablePath, records); } diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestIcebergStreamWriter.java b/flink/src/test/java/org/apache/iceberg/flink/TestIcebergStreamWriter.java index a32075eb694c..bb07586d613e 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/TestIcebergStreamWriter.java +++ b/flink/src/test/java/org/apache/iceberg/flink/TestIcebergStreamWriter.java @@ -27,7 +27,10 @@ import java.util.Set; import org.apache.flink.streaming.api.operators.BoundedOneInput; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; -import org.apache.flink.types.Row; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; @@ -36,13 +39,18 @@ import org.apache.iceberg.AppendFiles; import org.apache.iceberg.DataFile; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; +import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; +import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Types; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; @@ -62,12 +70,12 @@ public class TestIcebergStreamWriter { private final FileFormat format; private final boolean partitioned; - // TODO add AVRO/ORC unit test once the readers and writers are ready. + // TODO add ORC/Parquet unit test once the readers and writers are ready. 
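[Editor's note: an illustrative sketch, not part of the patch. RowData carries Flink's internal representations (StringData, TimestampData, and so on), so test rows are now built via GenericRowData/StringData instead of Row.of(...), and reads go through null checks plus typed getters, mirroring SimpleDataUtil above. Names are invented for the example.]

// RowDataFieldAccessExample.java -- standalone sketch
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;

public class RowDataFieldAccessExample {
  public static void main(String[] args) {
    // Equivalent of SimpleDataUtil.createRowData(1, "hello").
    RowData row = GenericRowData.of(1, StringData.fromString("hello"));

    // Equivalent of the read side in SimpleDataUtil.assertTableRows: nullability first, then typed getters.
    Integer id = row.isNullAt(0) ? null : row.getInt(0);
    String data = row.isNullAt(1) ? null : row.getString(1).toString();

    System.out.println(id + ", " + data);  // 1, hello
  }
}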
@Parameterized.Parameters(name = "format = {0}, partitioned = {1}") public static Object[][] parameters() { return new Object[][] { - new Object[] {"parquet", true}, - new Object[] {"parquet", false} + new Object[] {"avro", true}, + new Object[] {"avro", false} }; } @@ -89,11 +97,11 @@ public void before() throws IOException { @Test public void testWritingTable() throws Exception { long checkpointId = 1L; - try (OneInputStreamOperatorTestHarness testHarness = createIcebergStreamWriter()) { + try (OneInputStreamOperatorTestHarness testHarness = createIcebergStreamWriter()) { // The first checkpoint - testHarness.processElement(Row.of(1, "hello"), 1); - testHarness.processElement(Row.of(2, "world"), 1); - testHarness.processElement(Row.of(3, "hello"), 1); + testHarness.processElement(SimpleDataUtil.createRowData(1, "hello"), 1); + testHarness.processElement(SimpleDataUtil.createRowData(2, "world"), 1); + testHarness.processElement(SimpleDataUtil.createRowData(3, "hello"), 1); testHarness.prepareSnapshotPreBarrier(checkpointId); long expectedDataFiles = partitioned ? 2 : 1; @@ -102,8 +110,8 @@ public void testWritingTable() throws Exception { checkpointId = checkpointId + 1; // The second checkpoint - testHarness.processElement(Row.of(4, "foo"), 1); - testHarness.processElement(Row.of(5, "bar"), 2); + testHarness.processElement(SimpleDataUtil.createRowData(4, "foo"), 1); + testHarness.processElement(SimpleDataUtil.createRowData(5, "bar"), 2); testHarness.prepareSnapshotPreBarrier(checkpointId); expectedDataFiles = partitioned ? 4 : 2; @@ -129,9 +137,9 @@ public void testWritingTable() throws Exception { public void testSnapshotTwice() throws Exception { long checkpointId = 1; long timestamp = 1; - try (OneInputStreamOperatorTestHarness testHarness = createIcebergStreamWriter()) { - testHarness.processElement(Row.of(1, "hello"), timestamp++); - testHarness.processElement(Row.of(2, "world"), timestamp); + try (OneInputStreamOperatorTestHarness testHarness = createIcebergStreamWriter()) { + testHarness.processElement(SimpleDataUtil.createRowData(1, "hello"), timestamp++); + testHarness.processElement(SimpleDataUtil.createRowData(2, "world"), timestamp); testHarness.prepareSnapshotPreBarrier(checkpointId++); long expectedDataFiles = partitioned ? 2 : 1; @@ -147,14 +155,14 @@ public void testSnapshotTwice() throws Exception { @Test public void testTableWithoutSnapshot() throws Exception { - try (OneInputStreamOperatorTestHarness testHarness = createIcebergStreamWriter()) { + try (OneInputStreamOperatorTestHarness testHarness = createIcebergStreamWriter()) { Assert.assertEquals(0, testHarness.extractOutputValues().size()); } // Even if we closed the iceberg stream writer, there's no orphan data file. Assert.assertEquals(0, scanDataFiles().size()); - try (OneInputStreamOperatorTestHarness testHarness = createIcebergStreamWriter()) { - testHarness.processElement(Row.of(1, "hello"), 1); + try (OneInputStreamOperatorTestHarness testHarness = createIcebergStreamWriter()) { + testHarness.processElement(SimpleDataUtil.createRowData(1, "hello"), 1); // Still not emit the data file yet, because there is no checkpoint. 
Assert.assertEquals(0, testHarness.extractOutputValues().size()); } @@ -185,9 +193,9 @@ private Set scanDataFiles() throws IOException { @Test public void testBoundedStreamCloseWithEmittingDataFiles() throws Exception { - try (OneInputStreamOperatorTestHarness testHarness = createIcebergStreamWriter()) { - testHarness.processElement(Row.of(1, "hello"), 1); - testHarness.processElement(Row.of(2, "world"), 2); + try (OneInputStreamOperatorTestHarness testHarness = createIcebergStreamWriter()) { + testHarness.processElement(SimpleDataUtil.createRowData(1, "hello"), 1); + testHarness.processElement(SimpleDataUtil.createRowData(2, "world"), 2); Assert.assertTrue(testHarness.getOneInputOperator() instanceof BoundedOneInput); ((BoundedOneInput) testHarness.getOneInputOperator()).endInput(); @@ -208,17 +216,17 @@ public void testTableWithTargetFileSize() throws Exception { .set(TableProperties.WRITE_TARGET_FILE_SIZE_BYTES, "4") // ~4 bytes; low enough to trigger .commit(); - List rows = Lists.newArrayListWithCapacity(8000); + List rows = Lists.newArrayListWithCapacity(8000); List records = Lists.newArrayListWithCapacity(8000); for (int i = 0; i < 2000; i++) { for (String data : new String[] {"a", "b", "c", "d"}) { - rows.add(Row.of(i, data)); + rows.add(SimpleDataUtil.createRowData(i, data)); records.add(SimpleDataUtil.createRecord(i, data)); } } - try (OneInputStreamOperatorTestHarness testHarness = createIcebergStreamWriter()) { - for (Row row : rows) { + try (OneInputStreamOperatorTestHarness testHarness = createIcebergStreamWriter()) { + for (RowData row : rows) { testHarness.processElement(row, 1); } @@ -241,10 +249,69 @@ public void testTableWithTargetFileSize() throws Exception { SimpleDataUtil.assertTableRecords(tablePath, records); } - private OneInputStreamOperatorTestHarness createIcebergStreamWriter() throws Exception { - IcebergStreamWriter streamWriter = IcebergSinkUtil.createStreamWriter(table, SimpleDataUtil.FLINK_SCHEMA); - OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness<>(streamWriter, - 1, 1, 0); + @Test + public void testPromotedFlinkDataType() throws Exception { + Schema iSchema = new Schema( + Types.NestedField.required(1, "tinyint", Types.IntegerType.get()), + Types.NestedField.required(2, "smallint", Types.IntegerType.get()), + Types.NestedField.optional(3, "int", Types.IntegerType.get()) + ); + TableSchema flinkSchema = TableSchema.builder() + .field("tinyint", DataTypes.TINYINT().notNull()) + .field("smallint", DataTypes.SMALLINT().notNull()) + .field("int", DataTypes.INT().nullable()) + .build(); + + PartitionSpec spec; + if (partitioned) { + spec = PartitionSpec.builderFor(iSchema).identity("smallint").identity("tinyint").identity("int").build(); + } else { + spec = PartitionSpec.unpartitioned(); + } + + String location = tempFolder.newFolder().getAbsolutePath(); + Map props = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()); + Table icebergTable = new HadoopTables().create(iSchema, spec, props, location); + + List rows = Lists.newArrayList( + GenericRowData.of((byte) 0x01, (short) -32768, 101), + GenericRowData.of((byte) 0x02, (short) 0, 102), + GenericRowData.of((byte) 0x03, (short) 32767, 103) + ); + + Record record = GenericRecord.create(iSchema); + List expected = Lists.newArrayList( + record.copy(ImmutableMap.of("tinyint", 1, "smallint", -32768, "int", 101)), + record.copy(ImmutableMap.of("tinyint", 2, "smallint", 0, "int", 102)), + record.copy(ImmutableMap.of("tinyint", 3, "smallint", 32767, "int", 103)) + ); + 
+ try (OneInputStreamOperatorTestHarness testHarness = createIcebergStreamWriter(icebergTable, + flinkSchema)) { + for (RowData row : rows) { + testHarness.processElement(row, 1); + } + testHarness.prepareSnapshotPreBarrier(1); + Assert.assertEquals(partitioned ? 3 : 1, testHarness.extractOutputValues().size()); + + // Commit the iceberg transaction. + AppendFiles appendFiles = icebergTable.newAppend(); + testHarness.extractOutputValues().forEach(appendFiles::appendFile); + appendFiles.commit(); + } + + SimpleDataUtil.assertTableRecords(location, expected); + } + + private OneInputStreamOperatorTestHarness createIcebergStreamWriter() throws Exception { + return createIcebergStreamWriter(table, SimpleDataUtil.FLINK_SCHEMA); + } + + private OneInputStreamOperatorTestHarness createIcebergStreamWriter( + Table icebergTable, TableSchema flinkSchema) throws Exception { + IcebergStreamWriter streamWriter = IcebergSinkUtil.createStreamWriter(icebergTable, flinkSchema); + OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness<>( + streamWriter, 1, 1, 0); harness.setup(); harness.open(); diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestPartitionKey.java b/flink/src/test/java/org/apache/iceberg/flink/TestPartitionKey.java deleted file mode 100644 index fde65af6157c..000000000000 --- a/flink/src/test/java/org/apache/iceberg/flink/TestPartitionKey.java +++ /dev/null @@ -1,230 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.iceberg.flink; - -import java.nio.ByteBuffer; -import java.time.LocalDate; -import java.time.LocalDateTime; -import java.time.LocalTime; -import java.time.OffsetDateTime; -import java.util.List; -import org.apache.flink.types.Row; -import org.apache.iceberg.PartitionKey; -import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.Schema; -import org.apache.iceberg.flink.data.RandomData; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.types.Type; -import org.apache.iceberg.types.Types; -import org.apache.iceberg.util.DateTimeUtil; -import org.junit.Assert; -import org.junit.Test; - -public class TestPartitionKey { - - private static final Schema SCHEMA = new Schema( - Types.NestedField.optional(1, "id", Types.IntegerType.get()), - Types.NestedField.optional(2, "dateType", Types.DateType.get()), - Types.NestedField.optional(3, "timeType", Types.TimeType.get()), - Types.NestedField.optional(4, "timestampWithoutZone", Types.TimestampType.withoutZone()), - Types.NestedField.required(5, "timestampWithZone", Types.TimestampType.withZone()), - Types.NestedField.optional(6, "fixedType", Types.FixedType.ofLength(5)), - Types.NestedField.optional(7, "uuidType", Types.UUIDType.get()), - Types.NestedField.optional(8, "binaryType", Types.BinaryType.get()), - Types.NestedField.optional(9, "decimalType1", Types.DecimalType.of(3, 14)), - Types.NestedField.optional(10, "decimalType2", Types.DecimalType.of(10, 20)), - Types.NestedField.optional(11, "decimalType3", Types.DecimalType.of(38, 19)), - Types.NestedField.optional(12, "floatType", Types.FloatType.get()), - Types.NestedField.required(13, "doubleType", Types.DoubleType.get()) - ); - - private static final String[] SUPPORTED_PRIMITIVES = new String[] { - "id", "dateType", "timeType", "timestampWithoutZone", "timestampWithZone", "fixedType", "uuidType", - "binaryType", "decimalType1", "decimalType2", "decimalType3", "floatType", "doubleType" - }; - - private static final Schema NESTED_SCHEMA = new Schema( - Types.NestedField.required(1, "structType", Types.StructType.of( - Types.NestedField.optional(2, "innerStringType", Types.StringType.get()), - Types.NestedField.optional(3, "innerIntegerType", Types.IntegerType.get()) - )) - ); - - @Test - public void testNullPartitionValue() { - Schema schema = new Schema( - Types.NestedField.optional(1, "id", Types.IntegerType.get()), - Types.NestedField.optional(2, "data", Types.StringType.get()) - ); - - PartitionSpec spec = PartitionSpec.builderFor(schema) - .identity("data") - .build(); - - List rows = Lists.newArrayList( - Row.of(1, "a"), - Row.of(2, "b"), - Row.of(3, null) - ); - - RowWrapper rowWrapper = new RowWrapper(schema.asStruct()); - - for (Row row : rows) { - PartitionKey partitionKey = new PartitionKey(spec, schema); - partitionKey.partition(rowWrapper.wrap(row)); - Assert.assertEquals(partitionKey.size(), 1); - Assert.assertEquals(partitionKey.get(0, String.class), row.getField(1)); - } - } - - @Test - public void testPartitionWithOneNestedField() { - RowWrapper rowWrapper = new RowWrapper(NESTED_SCHEMA.asStruct()); - Iterable rows = RandomData.generate(NESTED_SCHEMA, 10, 1991); - - PartitionSpec spec1 = PartitionSpec.builderFor(NESTED_SCHEMA) - .identity("structType.innerStringType") - .build(); - PartitionSpec spec2 = PartitionSpec.builderFor(NESTED_SCHEMA) - .identity("structType.innerIntegerType") - .build(); - - for (Row row : rows) { - Row innerRow = (Row) row.getField(0); - - PartitionKey partitionKey1 = new 
PartitionKey(spec1, NESTED_SCHEMA); - partitionKey1.partition(rowWrapper.wrap(row)); - Object innerStringValue = innerRow.getField(0); - Assert.assertEquals(partitionKey1.size(), 1); - Assert.assertEquals(partitionKey1.get(0, String.class), innerStringValue); - - PartitionKey partitionKey2 = new PartitionKey(spec2, NESTED_SCHEMA); - partitionKey2.partition(rowWrapper.wrap(row)); - Object innerIntegerValue = innerRow.getField(1); - Assert.assertEquals(partitionKey2.size(), 1); - Assert.assertEquals(partitionKey2.get(0, Integer.class), innerIntegerValue); - } - } - - @Test - public void testPartitionMultipleNestedField() { - RowWrapper rowWrapper = new RowWrapper(NESTED_SCHEMA.asStruct()); - Iterable rows = RandomData.generate(NESTED_SCHEMA, 10, 1992); - - PartitionSpec spec1 = PartitionSpec.builderFor(NESTED_SCHEMA) - .identity("structType.innerIntegerType") - .identity("structType.innerStringType") - .build(); - PartitionSpec spec2 = PartitionSpec.builderFor(NESTED_SCHEMA) - .identity("structType.innerStringType") - .identity("structType.innerIntegerType") - .build(); - - PartitionKey pk1 = new PartitionKey(spec1, NESTED_SCHEMA); - PartitionKey pk2 = new PartitionKey(spec2, NESTED_SCHEMA); - - for (Row row : rows) { - Row innerRow = (Row) row.getField(0); - - pk1.partition(rowWrapper.wrap(row)); - Assert.assertEquals(2, pk1.size()); - Assert.assertEquals(innerRow.getField(1), pk1.get(0, Integer.class)); - Assert.assertEquals(innerRow.getField(0), pk1.get(1, String.class)); - - pk2.partition(rowWrapper.wrap(row)); - Assert.assertEquals(2, pk2.size()); - Assert.assertEquals(innerRow.getField(0), pk2.get(0, String.class)); - Assert.assertEquals(innerRow.getField(1), pk2.get(1, Integer.class)); - } - } - - private static Object transform(Object value, Type type) { - if (value == null) { - return null; - } - switch (type.typeId()) { - case DATE: - return DateTimeUtil.daysFromDate((LocalDate) value); - case TIME: - return DateTimeUtil.microsFromTime((LocalTime) value); - case TIMESTAMP: - if (((Types.TimestampType) type).shouldAdjustToUTC()) { - return DateTimeUtil.microsFromTimestamptz((OffsetDateTime) value); - } else { - return DateTimeUtil.microsFromTimestamp((LocalDateTime) value); - } - case FIXED: - return ByteBuffer.wrap((byte[]) value); - default: - return value; - } - } - - @Test - public void testPartitionValueTypes() { - RowWrapper rowWrapper = new RowWrapper(SCHEMA.asStruct()); - Iterable rows = RandomData.generate(SCHEMA, 10, 1993); - - for (int i = 0; i < SUPPORTED_PRIMITIVES.length; i++) { - String column = SUPPORTED_PRIMITIVES[i]; - - PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity(column).build(); - Type type = spec.schema().findType(column); - Class[] javaClasses = spec.javaClasses(); - - PartitionKey pk = new PartitionKey(spec, SCHEMA); - - for (Row row : rows) { - pk.partition(rowWrapper.wrap(row)); - Object expected = row.getField(i); - Assert.assertEquals("Partition with column " + column + " should have one field.", 1, pk.size()); - Assert.assertEquals("Partition with column " + column + " should have the expected values", - transform(expected, type), pk.get(0, javaClasses[0])); - } - } - } - - @Test - public void testNestedPartitionValues() { - Schema nestedSchema = new Schema(Types.NestedField.optional(1001, "nested", SCHEMA.asStruct())); - RowWrapper rowWrapper = new RowWrapper(nestedSchema.asStruct()); - Iterable rows = RandomData.generate(nestedSchema, 10, 1994); - - for (int i = 0; i < SUPPORTED_PRIMITIVES.length; i++) { - String column = 
String.format("nested.%s", SUPPORTED_PRIMITIVES[i]); - - PartitionSpec spec = PartitionSpec.builderFor(nestedSchema).identity(column).build(); - Type type = spec.schema().findType(column); - Class[] javaClasses = spec.javaClasses(); - - PartitionKey pk = new PartitionKey(spec, nestedSchema); - - for (Row row : rows) { - pk.partition(rowWrapper.wrap(row)); - - Object expected = ((Row) row.getField(0)).getField(i); - Assert.assertEquals("Partition with nested column " + column + " should have one field.", - 1, pk.size()); - Assert.assertEquals("Partition with nested column " + column + "should have the expected values.", - transform(expected, type), pk.get(0, javaClasses[0])); - } - } - } -} diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestRowDataPartitionKey.java b/flink/src/test/java/org/apache/iceberg/flink/TestRowDataPartitionKey.java index b27a4b4adb1c..22a8654e9401 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/TestRowDataPartitionKey.java +++ b/flink/src/test/java/org/apache/iceberg/flink/TestRowDataPartitionKey.java @@ -31,6 +31,7 @@ import org.apache.iceberg.data.InternalRecordWrapper; import org.apache.iceberg.data.RandomGenericData; import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.data.RandomRowData; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Types; import org.junit.Assert; @@ -99,8 +100,7 @@ public void testNullPartitionValue() { public void testPartitionWithOneNestedField() { RowDataWrapper rowWrapper = new RowDataWrapper(FlinkSchemaUtil.convert(NESTED_SCHEMA), NESTED_SCHEMA.asStruct()); List records = RandomGenericData.generate(NESTED_SCHEMA, 10, 1991); - List rows = records.stream().map(record -> RowDataConverter.convert(NESTED_SCHEMA, record)) - .collect(Collectors.toList()); + List rows = Lists.newArrayList(RandomRowData.convert(NESTED_SCHEMA, records)); PartitionSpec spec1 = PartitionSpec.builderFor(NESTED_SCHEMA) .identity("structType.innerStringType") @@ -131,8 +131,7 @@ public void testPartitionWithOneNestedField() { public void testPartitionMultipleNestedField() { RowDataWrapper rowWrapper = new RowDataWrapper(FlinkSchemaUtil.convert(NESTED_SCHEMA), NESTED_SCHEMA.asStruct()); List records = RandomGenericData.generate(NESTED_SCHEMA, 10, 1992); - List rows = records.stream().map(record -> RowDataConverter.convert(NESTED_SCHEMA, record)) - .collect(Collectors.toList()); + List rows = Lists.newArrayList(RandomRowData.convert(NESTED_SCHEMA, records)); PartitionSpec spec1 = PartitionSpec.builderFor(NESTED_SCHEMA) .identity("structType.innerIntegerType") @@ -171,8 +170,7 @@ public void testPartitionValueTypes() { InternalRecordWrapper recordWrapper = new InternalRecordWrapper(SCHEMA.asStruct()); List records = RandomGenericData.generate(SCHEMA, 10, 1993); - List rows = records.stream().map(record -> RowDataConverter.convert(SCHEMA, record)) - .collect(Collectors.toList()); + List rows = Lists.newArrayList(RandomRowData.convert(SCHEMA, records)); for (String column : SUPPORTED_PRIMITIVES) { PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity(column).build(); @@ -210,8 +208,7 @@ public void testNestedPartitionValues() { InternalRecordWrapper recordWrapper = new InternalRecordWrapper(nestedSchema.asStruct()); List records = RandomGenericData.generate(nestedSchema, 10, 1994); - List rows = records.stream().map(record -> RowDataConverter.convert(nestedSchema, record)) - .collect(Collectors.toList()); + List rows = Lists.newArrayList(RandomRowData.convert(nestedSchema, 
records)); for (int i = 0; i < SUPPORTED_PRIMITIVES.size(); i++) { String column = String.format("nested.%s", SUPPORTED_PRIMITIVES.get(i)); diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestTaskWriters.java b/flink/src/test/java/org/apache/iceberg/flink/TestTaskWriters.java index ee65eb463efb..dc39fd5baf27 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/TestTaskWriters.java +++ b/flink/src/test/java/org/apache/iceberg/flink/TestTaskWriters.java @@ -24,7 +24,8 @@ import java.util.List; import java.util.Locale; import java.util.Map; -import org.apache.flink.types.Row; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -34,7 +35,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.data.Record; -import org.apache.iceberg.flink.data.RandomData; +import org.apache.iceberg.flink.data.RandomRowData; import org.apache.iceberg.io.TaskWriter; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -54,13 +55,12 @@ public class TestTaskWriters { @Rule public final TemporaryFolder tempFolder = new TemporaryFolder(); - // TODO add AVRO unit test once the RowDataWrapper are ready. - // TODO add ORC unit test once the readers and writers are ready. + // TODO add ORC/Parquet unit test once the readers and writers are ready. @Parameterized.Parameters(name = "format = {0}, partitioned = {1}") public static Object[][] parameters() { return new Object[][] { - new Object[] {"parquet", true}, - new Object[] {"parquet", false} + new Object[] {"avro", true}, + new Object[] {"avro", false} }; } @@ -87,7 +87,7 @@ public void before() throws IOException { @Test public void testWriteZeroRecord() throws IOException { - try (TaskWriter taskWriter = createTaskWriter(TARGET_FILE_SIZE)) { + try (TaskWriter taskWriter = createTaskWriter(TARGET_FILE_SIZE)) { taskWriter.close(); DataFile[] dataFiles = taskWriter.complete(); @@ -104,9 +104,9 @@ public void testWriteZeroRecord() throws IOException { @Test public void testCloseTwice() throws IOException { - try (TaskWriter taskWriter = createTaskWriter(TARGET_FILE_SIZE)) { - taskWriter.write(Row.of(1, "hello")); - taskWriter.write(Row.of(2, "world")); + try (TaskWriter taskWriter = createTaskWriter(TARGET_FILE_SIZE)) { + taskWriter.write(SimpleDataUtil.createRowData(1, "hello")); + taskWriter.write(SimpleDataUtil.createRowData(2, "world")); taskWriter.close(); // The first close taskWriter.close(); // The second close @@ -123,9 +123,9 @@ public void testCloseTwice() throws IOException { @Test public void testAbort() throws IOException { - try (TaskWriter taskWriter = createTaskWriter(TARGET_FILE_SIZE)) { - taskWriter.write(Row.of(1, "hello")); - taskWriter.write(Row.of(2, "world")); + try (TaskWriter taskWriter = createTaskWriter(TARGET_FILE_SIZE)) { + taskWriter.write(SimpleDataUtil.createRowData(1, "hello")); + taskWriter.write(SimpleDataUtil.createRowData(2, "world")); taskWriter.abort(); DataFile[] dataFiles = taskWriter.complete(); @@ -142,11 +142,11 @@ public void testAbort() throws IOException { @Test public void testCompleteFiles() throws IOException { - try (TaskWriter taskWriter = createTaskWriter(TARGET_FILE_SIZE)) { - taskWriter.write(Row.of(1, "a")); - taskWriter.write(Row.of(2, "b")); - taskWriter.write(Row.of(3, "c")); - 
taskWriter.write(Row.of(4, "d")); + try (TaskWriter taskWriter = createTaskWriter(TARGET_FILE_SIZE)) { + taskWriter.write(SimpleDataUtil.createRowData(1, "a")); + taskWriter.write(SimpleDataUtil.createRowData(2, "b")); + taskWriter.write(SimpleDataUtil.createRowData(3, "c")); + taskWriter.write(SimpleDataUtil.createRowData(4, "d")); DataFile[] dataFiles = taskWriter.complete(); int expectedFiles = partitioned ? 4 : 1; @@ -178,17 +178,17 @@ public void testCompleteFiles() throws IOException { @Test public void testRollingWithTargetFileSize() throws IOException { - try (TaskWriter taskWriter = createTaskWriter(4)) { - List rows = Lists.newArrayListWithCapacity(8000); + try (TaskWriter taskWriter = createTaskWriter(4)) { + List rows = Lists.newArrayListWithCapacity(8000); List records = Lists.newArrayListWithCapacity(8000); for (int i = 0; i < 2000; i++) { for (String data : new String[] {"a", "b", "c", "d"}) { - rows.add(Row.of(i, data)); + rows.add(SimpleDataUtil.createRowData(i, data)); records.add(SimpleDataUtil.createRecord(i, data)); } } - for (Row row : rows) { + for (RowData row : rows) { taskWriter.write(row); } @@ -208,9 +208,9 @@ public void testRollingWithTargetFileSize() throws IOException { @Test public void testRandomData() throws IOException { - try (TaskWriter taskWriter = createTaskWriter(TARGET_FILE_SIZE)) { - Iterable rows = RandomData.generate(SimpleDataUtil.SCHEMA, 100, 1996); - for (Row row : rows) { + try (TaskWriter taskWriter = createTaskWriter(TARGET_FILE_SIZE)) { + Iterable rows = RandomRowData.generate(SimpleDataUtil.SCHEMA, 100, 1996); + for (RowData row : rows) { taskWriter.write(row); } @@ -227,9 +227,11 @@ public void testRandomData() throws IOException { } } - private TaskWriter createTaskWriter(long targetFileSize) { - TaskWriterFactory taskWriterFactory = new RowTaskWriterFactory(table.schema(), table.spec(), - table.locationProvider(), table.io(), table.encryption(), targetFileSize, format, table.properties()); + private TaskWriter createTaskWriter(long targetFileSize) { + TaskWriterFactory taskWriterFactory = new RowDataTaskWriterFactory(table.schema(), + (RowType) SimpleDataUtil.FLINK_SCHEMA.toRowDataType().getLogicalType(), table.spec(), + table.locationProvider(), table.io(), table.encryption(), + targetFileSize, format, table.properties()); taskWriterFactory.initialize(1, 1); return taskWriterFactory.create(); } diff --git a/flink/src/test/java/org/apache/iceberg/flink/data/RandomRowData.java b/flink/src/test/java/org/apache/iceberg/flink/data/RandomRowData.java new file mode 100644 index 000000000000..b6fee9259f53 --- /dev/null +++ b/flink/src/test/java/org/apache/iceberg/flink/data/RandomRowData.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.flink.data; + +import org.apache.flink.table.data.RowData; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.RowDataConverter; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; + +public class RandomRowData { + private RandomRowData() { + } + + public static Iterable generate(Schema schema, int numRecords, long seed) { + return convert(schema, RandomGenericData.generate(schema, numRecords, seed)); + } + + public static Iterable convert(Schema schema, Iterable records) { + return Iterables.transform(records, record -> RowDataConverter.convert(schema, record)); + } +}
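[Editor's note: an illustrative sketch, not part of the patch, showing how the new RandomRowData helper is intended to be used in tests, in the same spirit as TestTaskWriters.testRandomData() above. The schema here is invented for the example.]

// RandomRowDataExample.java -- standalone sketch
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.data.RandomRowData;
import org.apache.iceberg.types.Types;

public class RandomRowDataExample {
  public static void main(String[] args) {
    Schema schema = new Schema(
        Types.NestedField.optional(1, "id", Types.IntegerType.get()),
        Types.NestedField.optional(2, "data", Types.StringType.get()));

    // Either generate random RowData directly ...
    Iterable<RowData> rows = RandomRowData.generate(schema, 10, 1996L);

    // ... or generate generic Records first and convert them, which keeps the written input (RowData)
    // and the expected results (Record) derived from the same data for assertions.
    Iterable<Record> records = RandomGenericData.generate(schema, 10, 1996L);
    Iterable<RowData> converted = RandomRowData.convert(schema, records);

    rows.forEach(row -> System.out.println(row.getArity()));
    converted.forEach(row -> System.out.println(row.getArity()));
  }
}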