diff --git a/flink/src/main/java/org/apache/iceberg/flink/FlinkFixupTypes.java b/flink/src/main/java/org/apache/iceberg/flink/FlinkFixupTypes.java
new file mode 100644
index 000000000000..6501c0226e44
--- /dev/null
+++ b/flink/src/main/java/org/apache/iceberg/flink/FlinkFixupTypes.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.types.FixupTypes;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+
+/**
+ * In Flink, uuid and fixed(16) are converted to the same type, so converting back
+ * can produce only one of them, which may not be the original type.
+ */
+class FlinkFixupTypes extends FixupTypes {
+
+ private FlinkFixupTypes(Schema referenceSchema) {
+ super(referenceSchema);
+ }
+
+ static Schema fixup(Schema schema, Schema referenceSchema) {
+ return new Schema(TypeUtil.visit(schema,
+ new FlinkFixupTypes(referenceSchema)).asStructType().fields());
+ }
+
+ @Override
+ protected boolean fixupPrimitive(Type.PrimitiveType type, Type source) {
+ if (type instanceof Types.FixedType) {
+ int length = ((Types.FixedType) type).length();
+ return source.typeId() == Type.TypeID.UUID && length == 16;
+ }
+ return false;
+ }
+}
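
A minimal sketch of the effect (not part of the patch; the single-column schemas are illustrative): a fixed(16) that comes back from a Flink round trip is restored to uuid when the reference schema declares uuid.

    // schema converted back from Flink: the uuid column has degraded to fixed(16)
    Schema fromFlink = new Schema(
        Types.NestedField.required(1, "id", Types.FixedType.ofLength(16)));

    // the reference (table) schema still declares uuid
    Schema reference = new Schema(
        Types.NestedField.required(1, "id", Types.UUIDType.get()));

    // fixup keeps the reference type, because uuid and fixed(16) are indistinguishable in Flink
    Schema fixedUp = FlinkFixupTypes.fixup(fromFlink, reference);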
diff --git a/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java b/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java
index 90534d714630..fa871e505129 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java
@@ -27,6 +27,7 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
/**
* Converter between Flink types and Iceberg type.
@@ -63,6 +64,28 @@ public static Schema convert(TableSchema schema) {
return new Schema(converted.asStructType().fields());
}
+ /**
+ * Convert a Flink {@link TableSchema} to a {@link Schema} based on the given schema.
+ *
+ * This conversion does not assign new ids; it uses ids from the base schema.
+ *
+ * Data types, field order, and nullability will match the Flink type. This conversion may return
+ * a schema that is not compatible with the base schema.
+ *
+ * @param baseSchema a Schema on which conversion is based
+ * @param flinkSchema a Flink TableSchema
+ * @return the equivalent Schema
+ * @throws IllegalArgumentException if the type cannot be converted or there are missing ids
+ */
+ public static Schema convert(Schema baseSchema, TableSchema flinkSchema) {
+ // convert to a type with fresh ids
+ Types.StructType struct = convert(flinkSchema).asStruct();
+ // reassign ids to match the base schema
+ Schema schema = TypeUtil.reassignIds(new Schema(struct.fields()), baseSchema);
+ // fix types that can't be represented in Flink (UUID)
+ return FlinkFixupTypes.fixup(schema, baseSchema);
+ }
+
/**
* Convert a {@link Schema} to a {@link RowType Flink type}.
*
diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReader.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReader.java
index 2f5db1967ef2..4c4e2050263b 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReader.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReader.java
@@ -39,18 +39,14 @@
public class FlinkOrcReader implements OrcRowReader<RowData> {
  private final OrcValueReader<?> reader;
- private FlinkOrcReader(Schema iSchema, TypeDescription readSchema) {
+ public FlinkOrcReader(Schema iSchema, TypeDescription readSchema) {
this(iSchema, readSchema, ImmutableMap.of());
}
- private FlinkOrcReader(Schema iSchema, TypeDescription readSchema, Map<Integer, ?> idToConstant) {
+ public FlinkOrcReader(Schema iSchema, TypeDescription readSchema, Map<Integer, ?> idToConstant) {
this.reader = OrcSchemaWithTypeVisitor.visit(iSchema, readSchema, new ReadBuilder(idToConstant));
}
- public static OrcRowReader<RowData> buildReader(Schema schema, TypeDescription readSchema) {
- return new FlinkOrcReader(schema, readSchema);
- }
-
@Override
public RowData read(VectorizedRowBatch batch, int row) {
return (RowData) reader.read(new StructColumnVector(batch.size, batch.cols), row);
diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReaders.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReaders.java
index a434bddfe265..744a05eb2d21 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReaders.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReaders.java
@@ -36,6 +36,7 @@
import org.apache.flink.table.data.TimestampData;
import org.apache.iceberg.orc.OrcValueReader;
import org.apache.iceberg.orc.OrcValueReaders;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
@@ -127,6 +128,11 @@ private static class Decimal18Reader implements OrcValueReader<DecimalData> {
@Override
public DecimalData nonNullRead(ColumnVector vector, int row) {
HiveDecimalWritable value = ((DecimalColumnVector) vector).vector[row];
+
+ // The Hive ORC writer may adjust the scale of decimal data.
+ Preconditions.checkArgument(value.precision() <= precision,
+ "Cannot read value as decimal(%s,%s), too large: %s", precision, scale, value);
+
return DecimalData.fromUnscaledLong(value.serialize64(scale), precision, scale);
}
}
@@ -143,6 +149,10 @@ private static class Decimal38Reader implements OrcValueReader<DecimalData> {
@Override
public DecimalData nonNullRead(ColumnVector vector, int row) {
BigDecimal value = ((DecimalColumnVector) vector).vector[row].getHiveDecimal().bigDecimalValue();
+
+ Preconditions.checkArgument(value.precision() <= precision,
+ "Cannot read value as decimal(%s,%s), too large: %s", precision, scale, value);
+
return DecimalData.fromBigDecimal(value, precision, scale);
}
}
@@ -246,7 +256,7 @@ private static class StructReader extends OrcValueReaders.StructReader<RowData>
  StructReader(List<OrcValueReader<?>> readers, Types.StructType struct, Map<Integer, ?> idToConstant) {
super(readers, struct, idToConstant);
- this.numFields = readers.size();
+ this.numFields = struct.fields().size();
}
@Override
diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
index 3012544cba83..720a842e32a8 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
@@ -52,7 +52,7 @@
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
-class FlinkParquetReaders {
+public class FlinkParquetReaders {
private FlinkParquetReaders() {
}
diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java b/flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java
new file mode 100644
index 000000000000..88a324fda14f
--- /dev/null
+++ b/flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.encryption.EncryptedFiles;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ByteBuffers;
+import org.apache.iceberg.util.DateTimeUtil;
+
+/**
+ * Base class of Flink iterators.
+ *
+ * @param <T> is the Java class returned by this iterator whose objects contain one or more rows.
+ */
+abstract class DataIterator<T> implements CloseableIterator<T> {
+
+ private Iterator<FileScanTask> tasks;
+ private final FileIO io;
+ private final EncryptionManager encryption;
+
+ private CloseableIterator<T> currentIterator;
+
+ DataIterator(CombinedScanTask task, FileIO io, EncryptionManager encryption) {
+ this.tasks = task.files().iterator();
+ this.io = io;
+ this.encryption = encryption;
+ this.currentIterator = CloseableIterator.empty();
+ }
+
+ InputFile getInputFile(FileScanTask task) {
+ Preconditions.checkArgument(!task.isDataTask(), "Invalid task type");
+ return encryption.decrypt(EncryptedFiles.encryptedInput(
+ io.newInputFile(task.file().path().toString()),
+ task.file().keyMetadata()));
+ }
+
+ @Override
+ public boolean hasNext() {
+ updateCurrentIterator();
+ return currentIterator.hasNext();
+ }
+
+ @Override
+ public T next() {
+ updateCurrentIterator();
+ return currentIterator.next();
+ }
+
+ /**
+ * Updates the current iterator field to ensure that the current Iterator
+ * is not exhausted.
+ */
+ private void updateCurrentIterator() {
+ try {
+ while (!currentIterator.hasNext() && tasks.hasNext()) {
+ currentIterator.close();
+ currentIterator = openTaskIterator(tasks.next());
+ }
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ abstract CloseableIterator<T> openTaskIterator(FileScanTask scanTask) throws IOException;
+
+ @Override
+ public void close() throws IOException {
+ // close the current iterator
+ currentIterator.close();
+ tasks = null;
+ }
+
+ static Object convertConstant(Type type, Object value) {
+ if (value == null) {
+ return null;
+ }
+
+ switch (type.typeId()) {
+ case DECIMAL: // DecimalData
+ Types.DecimalType decimal = (Types.DecimalType) type;
+ return DecimalData.fromBigDecimal((BigDecimal) value, decimal.precision(), decimal.scale());
+ case STRING: // StringData
+ if (value instanceof Utf8) {
+ Utf8 utf8 = (Utf8) value;
+ return StringData.fromBytes(utf8.getBytes(), 0, utf8.getByteLength());
+ }
+ return StringData.fromString(value.toString());
+ case FIXED: // byte[]
+ if (value instanceof byte[]) {
+ return value;
+ } else if (value instanceof GenericData.Fixed) {
+ return ((GenericData.Fixed) value).bytes();
+ }
+ return ByteBuffers.toByteArray((ByteBuffer) value);
+ case BINARY: // byte[]
+ return ByteBuffers.toByteArray((ByteBuffer) value);
+ case TIME: // int millis instead of long micros
+ return (int) ((Long) value / 1000);
+ case TIMESTAMP: // TimestampData
+ return TimestampData.fromLocalDateTime(DateTimeUtil.timestampFromMicros((Long) value));
+ default:
+ }
+ return value;
+ }
+}
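
The constant conversion above is easiest to see with concrete values (a sketch; the inputs are made up):

    // Iceberg stores time-of-day as long microseconds, Flink's RowData expects int milliseconds
    Object time = DataIterator.convertConstant(Types.TimeType.get(), 43_200_000_123L);
    // -> Integer 43_200_000 (12:00:00.000); sub-millisecond digits are dropped

    // Avro Utf8 values become StringData directly from the UTF-8 bytes
    Object str = DataIterator.convertConstant(Types.StringType.get(), new Utf8("pt=2020-01-01"));
    // -> StringData "pt=2020-01-01"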
diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java
new file mode 100644
index 000000000000..f4a56fd54e57
--- /dev/null
+++ b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.hadoop.SerializableConfiguration;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Flink {@link InputFormat} for Iceberg.
+ */
+public class FlinkInputFormat extends RichInputFormat<RowData, FlinkInputSplit> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final TableLoader tableLoader;
+ private final Schema projectedSchema;
+ private final ScanOptions options;
+ private final List<Expression> filterExpressions;
+ private final FileIO io;
+ private final EncryptionManager encryption;
+ private final SerializableConfiguration serializableConf;
+
+ private transient RowDataIterator iterator;
+
+ FlinkInputFormat(
+ TableLoader tableLoader, Schema projectedSchema, FileIO io, EncryptionManager encryption,
+ List<Expression> filterExpressions, ScanOptions options, SerializableConfiguration serializableConf) {
+ this.tableLoader = tableLoader;
+ this.projectedSchema = projectedSchema;
+ this.io = io;
+ this.encryption = encryption;
+ this.filterExpressions = filterExpressions;
+ this.options = options;
+ this.serializableConf = serializableConf;
+ }
+
+ @VisibleForTesting
+ Schema projectedSchema() {
+ return projectedSchema;
+ }
+
+ @Override
+ public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
+ // Legacy method, not used.
+ return null;
+ }
+
+ @Override
+ public FlinkInputSplit[] createInputSplits(int minNumSplits) throws IOException {
+ // Called on the job manager, so it is OK to load the table from the catalog.
+ tableLoader.open(serializableConf.get());
+ try (TableLoader loader = tableLoader) {
+ Table table = loader.loadTable();
+ FlinkSplitGenerator generator = new FlinkSplitGenerator(table, projectedSchema, options, filterExpressions);
+ return generator.createInputSplits();
+ }
+ }
+
+ @Override
+ public InputSplitAssigner getInputSplitAssigner(FlinkInputSplit[] inputSplits) {
+ return new DefaultInputSplitAssigner(inputSplits);
+ }
+
+ @Override
+ public void configure(Configuration parameters) {
+ }
+
+ @Override
+ public void open(FlinkInputSplit split) {
+ this.iterator = new RowDataIterator(
+ split.getTask(), io, encryption, projectedSchema, options.nameMapping(), options.caseSensitive());
+ }
+
+ @Override
+ public boolean reachedEnd() {
+ return !iterator.hasNext();
+ }
+
+ @Override
+ public RowData nextRecord(RowData reuse) {
+ return iterator.next();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (iterator != null) {
+ iterator.close();
+ }
+ }
+}
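
A sketch of how the format is driven (the table location is hypothetical): split planning happens on the job manager, reading on the task managers.

    FlinkInputFormat format = FlinkSource.forRowData()
        .tableLoader(TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/db/t"))
        .buildFormat();

    // planned once on the job manager
    FlinkInputSplit[] splits = format.createInputSplits(0);

    // each task manager opens its assigned splits and iterates rows
    for (FlinkInputSplit split : splits) {
      format.open(split);
      while (!format.reachedEnd()) {
        RowData row = format.nextRecord(null);
        // ... consume row
      }
      format.close();
    }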
diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputSplit.java b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputSplit.java
new file mode 100644
index 000000000000..21a2f71ac86d
--- /dev/null
+++ b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputSplit.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.LocatableInputSplit;
+import org.apache.iceberg.CombinedScanTask;
+
+/**
+ * TODO Implement {@link LocatableInputSplit}.
+ */
+public class FlinkInputSplit implements InputSplit {
+
+ private final int splitNumber;
+ private final CombinedScanTask task;
+
+ FlinkInputSplit(int splitNumber, CombinedScanTask task) {
+ this.splitNumber = splitNumber;
+ this.task = task;
+ }
+
+ @Override
+ public int getSplitNumber() {
+ return splitNumber;
+ }
+
+ CombinedScanTask getTask() {
+ return task;
+ }
+}
diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
new file mode 100644
index 000000000000..6bcc96f254c8
--- /dev/null
+++ b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.flink.FlinkCatalogFactory;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.hadoop.SerializableConfiguration;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class FlinkSource {
+ private FlinkSource() {
+ }
+
+ /**
+ * Initialize a {@link Builder} to read the data from an Iceberg table. Equivalent to {@link TableScan}.
+ * See more options in {@link ScanOptions}.
+ * <p>
+ * The source can read static data in bounded mode. It can also continuously check the arrival of new data and
+ * read records incrementally. Bounded or unbounded depends on the {@link Builder#options(ScanOptions)}:
+ * <ul>
+ *   <li>Without startSnapshotId: Bounded</li>
+ *   <li>With startSnapshotId and with endSnapshotId: Bounded</li>
+ *   <li>With startSnapshotId (-1 means unbounded preceding) and without endSnapshotId: Unbounded</li>
+ * </ul>
+ *
+ * @return {@link Builder} to connect to the Iceberg table.
+ */
+ public static Builder forRowData() {
+ return new Builder();
+ }
+
+ /**
+ * Source builder to build {@link DataStream}.
+ */
+ public static class Builder {
+ private StreamExecutionEnvironment env;
+ private Table table;
+ private TableLoader tableLoader;
+ private TableSchema projectedSchema;
+ private ScanOptions options = ScanOptions.builder().build();
+ private List<Expression> filterExpressions;
+ private Configuration hadoopConf;
+
+ private RowDataTypeInfo rowTypeInfo;
+
+ public Builder tableLoader(TableLoader newLoader) {
+ this.tableLoader = newLoader;
+ return this;
+ }
+
+ public Builder table(Table newTable) {
+ this.table = newTable;
+ return this;
+ }
+
+ public Builder filters(List<Expression> newFilters) {
+ this.filterExpressions = newFilters;
+ return this;
+ }
+
+ public Builder project(TableSchema schema) {
+ this.projectedSchema = schema;
+ return this;
+ }
+
+ public Builder options(ScanOptions newOptions) {
+ this.options = newOptions;
+ return this;
+ }
+
+ public Builder hadoopConf(Configuration newConf) {
+ this.hadoopConf = newConf;
+ return this;
+ }
+
+ public Builder env(StreamExecutionEnvironment newEnv) {
+ this.env = newEnv;
+ return this;
+ }
+
+ public FlinkInputFormat buildFormat() {
+ Preconditions.checkNotNull(tableLoader, "TableLoader should not be null");
+
+ hadoopConf = hadoopConf == null ? FlinkCatalogFactory.clusterHadoopConf() : hadoopConf;
+
+ Schema icebergSchema;
+ FileIO io;
+ EncryptionManager encryption;
+ if (table == null) {
+ // load required fields using the table loader.
+ tableLoader.open(hadoopConf);
+ try (TableLoader loader = tableLoader) {
+ table = loader.loadTable();
+ icebergSchema = table.schema();
+ io = table.io();
+ encryption = table.encryption();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ } else {
+ icebergSchema = table.schema();
+ io = table.io();
+ encryption = table.encryption();
+ }
+
+ rowTypeInfo = RowDataTypeInfo.of((RowType) (
+ projectedSchema == null ?
+ FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(icebergSchema)) :
+ projectedSchema).toRowDataType().getLogicalType());
+
+ Schema expectedSchema = icebergSchema;
+ if (projectedSchema != null) {
+ expectedSchema = FlinkSchemaUtil.convert(icebergSchema, projectedSchema);
+ }
+
+ return new FlinkInputFormat(tableLoader, expectedSchema, io, encryption, filterExpressions, options,
+ new SerializableConfiguration(hadoopConf));
+ }
+
+ public DataStream<RowData> build() {
+ Preconditions.checkNotNull(env, "StreamExecutionEnvironment should not be null");
+ FlinkInputFormat format = buildFormat();
+ if (options.startSnapshotId() != null && options.endSnapshotId() == null) {
+ throw new UnsupportedOperationException("The Unbounded mode is not supported yet");
+ } else {
+ return env.createInput(format, rowTypeInfo);
+ }
+ }
+ }
+}
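
A builder usage sketch for the DataStream path (the table path and snapshot id are placeholders):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<RowData> stream = FlinkSource.forRowData()
        .env(env)
        .tableLoader(TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/db/t"))
        .options(ScanOptions.builder()
            .caseSensitive(true)
            .snapshotId(1234L)   // omit to read the current snapshot
            .build())
        .build();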
diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java
new file mode 100644
index 000000000000..af22edce10d7
--- /dev/null
+++ b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+class FlinkSplitGenerator {
+
+ private final Table table;
+ private final Schema projectedSchema;
+ private final ScanOptions options;
+ private final List<Expression> filterExpressions;
+
+ FlinkSplitGenerator(Table table, Schema projectedSchema, ScanOptions options, List<Expression> filterExpressions) {
+ this.table = table;
+ this.projectedSchema = projectedSchema;
+ this.options = options;
+ this.filterExpressions = filterExpressions;
+ }
+
+ FlinkInputSplit[] createInputSplits() {
+ List<CombinedScanTask> tasks = tasks();
+ FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()];
+ for (int i = 0; i < tasks.size(); i++) {
+ splits[i] = new FlinkInputSplit(i, tasks.get(i));
+ }
+ return splits;
+ }
+
+ private List<CombinedScanTask> tasks() {
+ TableScan scan = table
+ .newScan()
+ .caseSensitive(options.caseSensitive())
+ .project(projectedSchema);
+
+ if (options.snapshotId() != null) {
+ scan = scan.useSnapshot(options.snapshotId());
+ }
+
+ if (options.asOfTimestamp() != null) {
+ scan = scan.asOfTime(options.asOfTimestamp());
+ }
+
+ if (options.startSnapshotId() != null) {
+ if (options.endSnapshotId() != null) {
+ scan = scan.appendsBetween(options.startSnapshotId(), options.endSnapshotId());
+ } else {
+ scan = scan.appendsAfter(options.startSnapshotId());
+ }
+ }
+
+ if (options.splitSize() != null) {
+ scan = scan.option(TableProperties.SPLIT_SIZE, options.splitSize().toString());
+ }
+
+ if (options.splitLookback() != null) {
+ scan = scan.option(TableProperties.SPLIT_LOOKBACK, options.splitLookback().toString());
+ }
+
+ if (options.splitOpenFileCost() != null) {
+ scan = scan.option(TableProperties.SPLIT_OPEN_FILE_COST, options.splitOpenFileCost().toString());
+ }
+
+ if (filterExpressions != null) {
+ for (Expression filter : filterExpressions) {
+ scan = scan.filter(filter);
+ }
+ }
+
+ try (CloseableIterable<CombinedScanTask> tasksIterable = scan.planTasks()) {
+ return Lists.newArrayList(tasksIterable);
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to close table scan: " + scan, e);
+ }
+ }
+}
diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/RowDataIterator.java b/flink/src/main/java/org/apache/iceberg/flink/source/RowDataIterator.java
new file mode 100644
index 000000000000..dc7b38dd00a6
--- /dev/null
+++ b/flink/src/main/java/org/apache/iceberg/flink/source/RowDataIterator.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.util.Map;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.flink.data.FlinkAvroReader;
+import org.apache.iceberg.flink.data.FlinkOrcReader;
+import org.apache.iceberg.flink.data.FlinkParquetReaders;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.PartitionUtil;
+
+class RowDataIterator extends DataIterator<RowData> {
+
+ private final Schema projectedSchema;
+ private final String nameMapping;
+ private final boolean caseSensitive;
+
+ RowDataIterator(CombinedScanTask task, FileIO io, EncryptionManager encryption, Schema projectedSchema,
+ String nameMapping, boolean caseSensitive) {
+ super(task, io, encryption);
+ this.projectedSchema = projectedSchema;
+ this.nameMapping = nameMapping;
+ this.caseSensitive = caseSensitive;
+ }
+
+ @Override
+ protected CloseableIterator<RowData> openTaskIterator(FileScanTask task) {
+ Schema partitionSchema = TypeUtil.select(projectedSchema, task.spec().identitySourceIds());
+
+ Map<Integer, ?> idToConstant = partitionSchema.columns().isEmpty() ? ImmutableMap.of() :
+ PartitionUtil.constantsMap(task, RowDataIterator::convertConstant);
+ CloseableIterable<RowData> iterable = newIterable(task, idToConstant);
+ return iterable.iterator();
+ }
+
+ private CloseableIterable<RowData> newIterable(FileScanTask task, Map<Integer, ?> idToConstant) {
+ CloseableIterable<RowData> iter;
+ if (task.isDataTask()) {
+ throw new UnsupportedOperationException("Cannot read data task.");
+ } else {
+ switch (task.file().format()) {
+ case PARQUET:
+ iter = newParquetIterable(task, idToConstant);
+ break;
+
+ case AVRO:
+ iter = newAvroIterable(task, idToConstant);
+ break;
+
+ case ORC:
+ iter = newOrcIterable(task, idToConstant);
+ break;
+
+ default:
+ throw new UnsupportedOperationException(
+ "Cannot read unknown format: " + task.file().format());
+ }
+ }
+
+ return iter;
+ }
+
+ private CloseableIterable<RowData> newAvroIterable(FileScanTask task, Map<Integer, ?> idToConstant) {
+ Avro.ReadBuilder builder = Avro.read(getInputFile(task))
+ .reuseContainers()
+ .project(projectedSchema)
+ .split(task.start(), task.length())
+ .createReaderFunc(readSchema -> new FlinkAvroReader(projectedSchema, readSchema, idToConstant));
+
+ if (nameMapping != null) {
+ builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
+ }
+
+ return builder.build();
+ }
+
+ private CloseableIterable<RowData> newParquetIterable(FileScanTask task, Map<Integer, ?> idToConstant) {
+ Parquet.ReadBuilder builder = Parquet.read(getInputFile(task))
+ .split(task.start(), task.length())
+ .project(projectedSchema)
+ .createReaderFunc(fileSchema -> FlinkParquetReaders.buildReader(projectedSchema, fileSchema, idToConstant))
+ .filter(task.residual())
+ .caseSensitive(caseSensitive);
+
+ if (nameMapping != null) {
+ builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
+ }
+
+ return builder.build();
+ }
+
+ private CloseableIterable<RowData> newOrcIterable(FileScanTask task, Map<Integer, ?> idToConstant) {
+ Schema readSchemaWithoutConstantAndMetadataFields = TypeUtil.selectNot(projectedSchema,
+ Sets.union(idToConstant.keySet(), MetadataColumns.metadataFieldIds()));
+
+ ORC.ReadBuilder builder = ORC.read(getInputFile(task))
+ .project(readSchemaWithoutConstantAndMetadataFields)
+ .split(task.start(), task.length())
+ .createReaderFunc(readOrcSchema -> new FlinkOrcReader(projectedSchema, readOrcSchema, idToConstant))
+ .filter(task.residual())
+ .caseSensitive(caseSensitive);
+
+ if (nameMapping != null) {
+ builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
+ }
+
+ return builder.build();
+ }
+}
diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/ScanOptions.java b/flink/src/main/java/org/apache/iceberg/flink/source/ScanOptions.java
new file mode 100644
index 000000000000..309b4e58aa08
--- /dev/null
+++ b/flink/src/main/java/org/apache/iceberg/flink/source/ScanOptions.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.io.Serializable;
+import java.util.Map;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
+
+public class ScanOptions implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final long UNBOUNDED_PRECEDING = -1L;
+
+ public static final ConfigOption<Long> SNAPSHOT_ID =
+ ConfigOptions.key("snapshot-id").longType().defaultValue(null);
+
+ public static final ConfigOption<Boolean> CASE_SENSITIVE =
+ ConfigOptions.key("case-sensitive").booleanType().defaultValue(false);
+
+ public static final ConfigOption<Long> AS_OF_TIMESTAMP =
+ ConfigOptions.key("as-of-timestamp").longType().defaultValue(null);
+
+ public static final ConfigOption<Long> START_SNAPSHOT_ID =
+ ConfigOptions.key("start-snapshot-id").longType().defaultValue(null);
+
+ public static final ConfigOption<Long> END_SNAPSHOT_ID =
+ ConfigOptions.key("end-snapshot-id").longType().defaultValue(null);
+
+ public static final ConfigOption<Long> SPLIT_SIZE =
+ ConfigOptions.key("split-size").longType().defaultValue(null);
+
+ public static final ConfigOption<Integer> SPLIT_LOOKBACK =
+ ConfigOptions.key("split-lookback").intType().defaultValue(null);
+
+ public static final ConfigOption<Long> SPLIT_FILE_OPEN_COST =
+ ConfigOptions.key("split-file-open-cost").longType().defaultValue(null);
+
+ private final boolean caseSensitive;
+ private final Long snapshotId;
+ private final Long startSnapshotId;
+ private final Long endSnapshotId;
+ private final Long asOfTimestamp;
+ private final Long splitSize;
+ private final Integer splitLookback;
+ private final Long splitOpenFileCost;
+ private final String nameMapping;
+
+ public ScanOptions(boolean caseSensitive, Long snapshotId, Long startSnapshotId, Long endSnapshotId,
+ Long asOfTimestamp, Long splitSize, Integer splitLookback, Long splitOpenFileCost,
+ String nameMapping) {
+ this.caseSensitive = caseSensitive;
+ this.snapshotId = snapshotId;
+ this.startSnapshotId = startSnapshotId;
+ this.endSnapshotId = endSnapshotId;
+ this.asOfTimestamp = asOfTimestamp;
+ this.splitSize = splitSize;
+ this.splitLookback = splitLookback;
+ this.splitOpenFileCost = splitOpenFileCost;
+ this.nameMapping = nameMapping;
+ }
+
+ public boolean caseSensitive() {
+ return caseSensitive;
+ }
+
+ public Long snapshotId() {
+ return snapshotId;
+ }
+
+ public Long startSnapshotId() {
+ return startSnapshotId;
+ }
+
+ public Long endSnapshotId() {
+ return endSnapshotId;
+ }
+
+ public Long asOfTimestamp() {
+ return asOfTimestamp;
+ }
+
+ public Long splitSize() {
+ return splitSize;
+ }
+
+ public Integer splitLookback() {
+ return splitLookback;
+ }
+
+ public Long splitOpenFileCost() {
+ return splitOpenFileCost;
+ }
+
+ public String nameMapping() {
+ return nameMapping;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static ScanOptions fromProperties(Map<String, String> properties) {
+ return builder().options(properties).build();
+ }
+
+ public static final class Builder {
+ private boolean caseSensitive = CASE_SENSITIVE.defaultValue();
+ private Long snapshotId = SNAPSHOT_ID.defaultValue();
+ private Long startSnapshotId = START_SNAPSHOT_ID.defaultValue();
+ private Long endSnapshotId = END_SNAPSHOT_ID.defaultValue();
+ private Long asOfTimestamp = AS_OF_TIMESTAMP.defaultValue();
+ private Long splitSize = SPLIT_SIZE.defaultValue();
+ private Integer splitLookback = SPLIT_LOOKBACK.defaultValue();
+ private Long splitOpenFileCost = SPLIT_FILE_OPEN_COST.defaultValue();
+ private String nameMapping;
+
+ private Builder() {
+ }
+
+ public Builder options(Map<String, String> properties) {
+ Configuration config = new Configuration();
+ properties.forEach(config::setString);
+ this.caseSensitive = config.get(CASE_SENSITIVE);
+ this.snapshotId = config.get(SNAPSHOT_ID);
+ this.asOfTimestamp = config.get(AS_OF_TIMESTAMP);
+ this.startSnapshotId = config.get(START_SNAPSHOT_ID);
+ this.endSnapshotId = config.get(END_SNAPSHOT_ID);
+ this.splitSize = config.get(SPLIT_SIZE);
+ this.splitLookback = config.get(SPLIT_LOOKBACK);
+ this.splitOpenFileCost = config.get(SPLIT_FILE_OPEN_COST);
+ this.nameMapping = properties.get(DEFAULT_NAME_MAPPING);
+ return this;
+ }
+
+ public Builder caseSensitive(boolean newCaseSensitive) {
+ this.caseSensitive = newCaseSensitive;
+ return this;
+ }
+
+ public Builder snapshotId(Long newSnapshotId) {
+ this.snapshotId = newSnapshotId;
+ return this;
+ }
+
+ public Builder startSnapshotId(Long newStartSnapshotId) {
+ this.startSnapshotId = newStartSnapshotId;
+ return this;
+ }
+
+ public Builder endSnapshotId(Long newEndSnapshotId) {
+ this.endSnapshotId = newEndSnapshotId;
+ return this;
+ }
+
+ public Builder asOfTimestamp(Long newAsOfTimestamp) {
+ this.asOfTimestamp = newAsOfTimestamp;
+ return this;
+ }
+
+ public Builder splitSize(Long newSplitSize) {
+ this.splitSize = newSplitSize;
+ return this;
+ }
+
+ public Builder splitLookback(Integer newSplitLookback) {
+ this.splitLookback = newSplitLookback;
+ return this;
+ }
+
+ public Builder splitOpenFileCost(Long newSplitOpenFileCost) {
+ this.splitOpenFileCost = newSplitOpenFileCost;
+ return this;
+ }
+
+ public Builder nameMapping(String newNameMapping) {
+ this.nameMapping = newNameMapping;
+ return this;
+ }
+
+ public ScanOptions build() {
+ if (snapshotId != null && asOfTimestamp != null) {
+ throw new IllegalArgumentException(
+ "Cannot scan using both snapshot-id and as-of-timestamp to select the table snapshot");
+ }
+
+ if (snapshotId != null || asOfTimestamp != null) {
+ if (startSnapshotId != null || endSnapshotId != null) {
+ throw new IllegalArgumentException(
+ "Cannot specify start-snapshot-id and end-snapshot-id to do incremental scan when either snapshot-id" +
+ " or as-of-timestamp is specified");
+ }
+ } else {
+ if (startSnapshotId == null && endSnapshotId != null) {
+ throw new IllegalArgumentException("Cannot only specify option end-snapshot-id to do incremental scan");
+ }
+ }
+ return new ScanOptions(caseSensitive, snapshotId, startSnapshotId, endSnapshotId, asOfTimestamp, splitSize,
+ splitLookback, splitOpenFileCost, nameMapping);
+ }
+ }
+}
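
A sketch of building the options from string properties (the keys match the ConfigOptions above; the snapshot ids are placeholders):

    // a bounded incremental scan of the appends between two snapshots
    Map<String, String> props = ImmutableMap.of(
        "case-sensitive", "true",
        "start-snapshot-id", "100",
        "end-snapshot-id", "200");
    ScanOptions options = ScanOptions.fromProperties(props);

    // build() rejects mixing snapshot-id/as-of-timestamp with the incremental
    // options, and rejects end-snapshot-id on its own.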
diff --git a/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java b/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java
index d86469a9115a..987e070f78b6 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java
@@ -61,7 +61,7 @@ protected void writeAndValidate(Schema schema) throws IOException {
try (CloseableIterable<RowData> reader = ORC.read(Files.localInput(recordsFile))
.project(schema)
- .createReaderFunc(type -> FlinkOrcReader.buildReader(schema, type))
+ .createReaderFunc(type -> new FlinkOrcReader(schema, type))
.build()) {
Iterator<Record> expected = expectedRecords.iterator();
Iterator<RowData> rows = reader.iterator();
diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java
new file mode 100644
index 000000000000..d359aab4a8a9
--- /dev/null
+++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.conversion.DataStructureConverter;
+import org.apache.flink.table.data.conversion.DataStructureConverters;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/**
+ * Test {@link FlinkInputFormat}.
+ */
+public class TestFlinkInputFormat extends TestFlinkScan {
+
+ private FlinkSource.Builder builder;
+
+ public TestFlinkInputFormat(String fileFormat) {
+ super(fileFormat);
+ }
+
+ @Override
+ public void before() throws IOException {
+ super.before();
+ builder = FlinkSource.forRowData().tableLoader(TableLoader.fromHadoopTable(warehouse + "/default/t"));
+ }
+
+ @Override
+ protected List<Row> execute(Table table, List<String> projectFields) throws IOException {
+ Schema projected = new Schema(projectFields.stream().map(f ->
+ table.schema().asStruct().field(f)).collect(Collectors.toList()));
+ return run(builder.project(FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(projected))).buildFormat());
+ }
+
+ @Override
+ protected List<Row> execute(Table table, ScanOptions options) throws IOException {
+ return run(builder.options(options).buildFormat());
+ }
+
+ @Override
+ protected List<Row> execute(Table table, List<Expression> filters, String sqlFilter) throws IOException {
+ return run(builder.filters(filters).buildFormat());
+ }
+
+ @Override
+ protected void assertResiduals(
+ Schema schema, List<Row> results, List<Record> writeRecords, List<Record> filteredRecords) {
+ // the input format cannot filter the data, so expect all written records.
+ assertRecords(results, writeRecords, schema);
+ }
+
+ @Override
+ protected void assertNestedProjection(Table table, List<Record> records) throws IOException {
+ TableSchema projectedSchema = TableSchema.builder()
+ .field("nested", DataTypes.ROW(DataTypes.FIELD("f2", DataTypes.STRING())))
+ .field("data", DataTypes.STRING()).build();
+ List<Row> result = run(builder.project(projectedSchema).buildFormat());
+
+ List<Row> expected = Lists.newArrayList();
+ for (Record record : records) {
+ Row nested = Row.of(((Record) record.get(1)).get(1));
+ expected.add(Row.of(nested, record.get(0)));
+ }
+
+ assertRows(result, expected);
+ }
+
+ private List<Row> run(FlinkInputFormat inputFormat) throws IOException {
+ FlinkInputSplit[] splits = inputFormat.createInputSplits(0);
+ List<Row> results = Lists.newArrayList();
+
+ RowType rowType = FlinkSchemaUtil.convert(inputFormat.projectedSchema());
+
+ DataStructureConverter