diff --git a/flink/src/main/java/org/apache/iceberg/flink/FlinkTableFactory.java b/flink/src/main/java/org/apache/iceberg/flink/FlinkTableFactory.java index 49a94f54833c..0e6ec60c8631 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/FlinkTableFactory.java +++ b/flink/src/main/java/org/apache/iceberg/flink/FlinkTableFactory.java @@ -24,11 +24,13 @@ import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.data.RowData; -import org.apache.flink.table.factories.StreamTableSinkFactory; -import org.apache.flink.table.sinks.StreamTableSink; +import org.apache.flink.table.factories.TableSinkFactory; +import org.apache.flink.table.factories.TableSourceFactory; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.table.sources.TableSource; import org.apache.flink.table.utils.TableSchemaUtils; -public class FlinkTableFactory implements StreamTableSinkFactory { +public class FlinkTableFactory implements TableSinkFactory, TableSourceFactory { private final FlinkCatalog catalog; public FlinkTableFactory(FlinkCatalog catalog) { @@ -36,10 +38,18 @@ public FlinkTableFactory(FlinkCatalog catalog) { } @Override - public StreamTableSink createTableSink(Context context) { + public TableSource createTableSource(TableSourceFactory.Context context) { ObjectPath objectPath = context.getObjectIdentifier().toObjectPath(); TableLoader tableLoader = createTableLoader(objectPath); - TableSchema tableSchema = getPhysicalSchema(context); + TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema()); + return new IcebergTableSource(tableLoader, catalog.getHadoopConf(), tableSchema, context.getTable().getOptions()); + } + + @Override + public TableSink createTableSink(TableSinkFactory.Context context) { + ObjectPath objectPath = context.getObjectIdentifier().toObjectPath(); + TableLoader tableLoader = createTableLoader(objectPath); + TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema()); return new IcebergTableSink(context.isBounded(), tableLoader, catalog.getHadoopConf(), tableSchema); } @@ -53,10 +63,6 @@ public List supportedProperties() { throw new UnsupportedOperationException("Iceberg Table Factory can not be loaded from Java SPI"); } - private TableSchema getPhysicalSchema(Context context) { - return TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema()); - } - private TableLoader createTableLoader(ObjectPath objectPath) { return TableLoader.fromCatalog(catalog.getCatalogLoader(), catalog.toIdentifier(objectPath)); } diff --git a/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java b/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java new file mode 100644 index 000000000000..5bbf25d93717 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink; + +import java.util.Arrays; +import java.util.Map; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.sources.FilterableTableSource; +import org.apache.flink.table.sources.LimitableTableSource; +import org.apache.flink.table.sources.ProjectableTableSource; +import org.apache.flink.table.sources.StreamTableSource; +import org.apache.flink.table.sources.TableSource; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.utils.TableConnectorUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.flink.source.FlinkSource; + +/** + * Flink Iceberg table source. + * TODO: Implement {@link FilterableTableSource} and {@link LimitableTableSource}. + */ +public class IcebergTableSource implements StreamTableSource, ProjectableTableSource { + + private final TableLoader loader; + private final Configuration hadoopConf; + private final TableSchema schema; + private final Map options; + private final int[] projectedFields; + + public IcebergTableSource(TableLoader loader, Configuration hadoopConf, TableSchema schema, + Map options) { + this(loader, hadoopConf, schema, options, null); + } + + private IcebergTableSource(TableLoader loader, Configuration hadoopConf, TableSchema schema, + Map options, int[] projectedFields) { + this.loader = loader; + this.hadoopConf = hadoopConf; + this.schema = schema; + this.options = options; + this.projectedFields = projectedFields; + } + + @Override + public boolean isBounded() { + return FlinkSource.isBounded(options); + } + + @Override + public TableSource projectFields(int[] fields) { + return new IcebergTableSource(loader, hadoopConf, schema, options, fields); + } + + @Override + public DataStream getDataStream(StreamExecutionEnvironment execEnv) { + return FlinkSource.forRowData().env(execEnv).tableLoader(loader).hadoopConf(hadoopConf) + .project(getProjectedSchema()).properties(options).build(); + } + + @Override + public TableSchema getTableSchema() { + return schema; + } + + @Override + public DataType getProducedDataType() { + return getProjectedSchema().toRowDataType().bridgedTo(RowData.class); + } + + private TableSchema getProjectedSchema() { + TableSchema fullSchema = getTableSchema(); + if (projectedFields == null) { + return fullSchema; + } else { + String[] fullNames = fullSchema.getFieldNames(); + DataType[] fullTypes = fullSchema.getFieldDataTypes(); + return TableSchema.builder().fields( + Arrays.stream(projectedFields).mapToObj(i -> fullNames[i]).toArray(String[]::new), + Arrays.stream(projectedFields).mapToObj(i -> fullTypes[i]).toArray(DataType[]::new)).build(); + } + } + + @Override + public String explainSource() { + String explain = "Iceberg table: " + loader.toString(); + if (projectedFields != null) { + explain += ", ProjectedFields: " + Arrays.toString(projectedFields); + } + return 
TableConnectorUtils.generateRuntimeName(getClass(), getTableSchema().getFieldNames()) + explain; + } +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java index f4a56fd54e57..978d36b6634d 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java +++ b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java @@ -20,7 +20,6 @@ package org.apache.iceberg.flink.source; import java.io.IOException; -import java.util.List; import org.apache.flink.api.common.io.DefaultInputSplitAssigner; import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.api.common.io.RichInputFormat; @@ -31,7 +30,6 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.encryption.EncryptionManager; -import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.flink.TableLoader; import org.apache.iceberg.hadoop.SerializableConfiguration; import org.apache.iceberg.io.FileIO; @@ -45,30 +43,25 @@ public class FlinkInputFormat extends RichInputFormat private static final long serialVersionUID = 1L; private final TableLoader tableLoader; - private final Schema projectedSchema; - private final ScanOptions options; - private final List filterExpressions; + private final SerializableConfiguration serializableConf; private final FileIO io; private final EncryptionManager encryption; - private final SerializableConfiguration serializableConf; + private final ScanContext context; private transient RowDataIterator iterator; - FlinkInputFormat( - TableLoader tableLoader, Schema projectedSchema, FileIO io, EncryptionManager encryption, - List filterExpressions, ScanOptions options, SerializableConfiguration serializableConf) { + FlinkInputFormat(TableLoader tableLoader, SerializableConfiguration serializableConf, FileIO io, + EncryptionManager encryption, ScanContext context) { this.tableLoader = tableLoader; - this.projectedSchema = projectedSchema; + this.serializableConf = serializableConf; this.io = io; this.encryption = encryption; - this.filterExpressions = filterExpressions; - this.options = options; - this.serializableConf = serializableConf; + this.context = context; } @VisibleForTesting Schema projectedSchema() { - return projectedSchema; + return context.projectedSchema(); } @Override @@ -83,8 +76,7 @@ public FlinkInputSplit[] createInputSplits(int minNumSplits) throws IOException tableLoader.open(serializableConf.get()); try (TableLoader loader = tableLoader) { Table table = loader.loadTable(); - FlinkSplitGenerator generator = new FlinkSplitGenerator(table, projectedSchema, options, filterExpressions); - return generator.createInputSplits(); + return FlinkSplitGenerator.createInputSplits(table, context); } } @@ -100,7 +92,7 @@ public void configure(Configuration parameters) { @Override public void open(FlinkInputSplit split) { this.iterator = new RowDataIterator( - split.getTask(), io, encryption, projectedSchema, options.nameMapping(), options.caseSensitive()); + split.getTask(), io, encryption, context.projectedSchema(), context.nameMapping(), context.caseSensitive()); } @Override diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java index 6bcc96f254c8..aedc25558ac6 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java +++ 
b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.util.List; +import java.util.Map; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.TableSchema; @@ -47,11 +48,10 @@ private FlinkSource() { /** * Initialize a {@link Builder} to read the data from iceberg table. Equivalent to {@link TableScan}. - * See more options in {@link ScanOptions}. + * See more options in {@link ScanContext}. *

* The Source can read static data in bounded mode. It can also continuously check the arrival of new data and
* read records incrementally.
- * The Bounded and Unbounded depends on the {@link Builder#options(ScanOptions)}:
* <ul>
*   <li>Without startSnapshotId: Bounded</li>
*   <li>With startSnapshotId and with endSnapshotId: Bounded</li>
  • @@ -72,10 +72,9 @@ public static class Builder { private StreamExecutionEnvironment env; private Table table; private TableLoader tableLoader; - private TableSchema projectedSchema; - private ScanOptions options = ScanOptions.builder().build(); - private List filterExpressions; private Configuration hadoopConf; + private TableSchema projectedSchema; + private ScanContext context = new ScanContext(); private RowDataTypeInfo rowTypeInfo; @@ -89,8 +88,18 @@ public Builder table(Table newTable) { return this; } - public Builder filters(List newFilters) { - this.filterExpressions = newFilters; + public Builder hadoopConf(Configuration newConf) { + this.hadoopConf = newConf; + return this; + } + + public Builder env(StreamExecutionEnvironment newEnv) { + this.env = newEnv; + return this; + } + + public Builder filters(List filters) { + this.context = context.filterRows(filters); return this; } @@ -99,18 +108,53 @@ public Builder project(TableSchema schema) { return this; } - public Builder options(ScanOptions newOptions) { - this.options = newOptions; + public Builder properties(Map properties) { + this.context = context.fromProperties(properties); return this; } - public Builder hadoopConf(Configuration newConf) { - this.hadoopConf = newConf; + public Builder caseSensitive(boolean caseSensitive) { + this.context = context.setCaseSensitive(caseSensitive); return this; } - public Builder env(StreamExecutionEnvironment newEnv) { - this.env = newEnv; + public Builder snapshotId(Long snapshotId) { + this.context = context.useSnapshotId(snapshotId); + return this; + } + + public Builder startSnapshotId(Long startSnapshotId) { + this.context = context.startSnapshotId(startSnapshotId); + return this; + } + + public Builder endSnapshotId(Long endSnapshotId) { + this.context = context.endSnapshotId(endSnapshotId); + return this; + } + + public Builder asOfTimestamp(Long asOfTimestamp) { + this.context = context.asOfTimestamp(asOfTimestamp); + return this; + } + + public Builder splitSize(Long splitSize) { + this.context = context.splitSize(splitSize); + return this; + } + + public Builder splitLookback(Integer splitLookback) { + this.context = context.splitLookback(splitLookback); + return this; + } + + public Builder splitOpenFileCost(Long splitOpenFileCost) { + this.context = context.splitOpenFileCost(splitOpenFileCost); + return this; + } + + public Builder nameMapping(String nameMapping) { + this.context = context.nameMapping(nameMapping); return this; } @@ -144,23 +188,28 @@ public FlinkInputFormat buildFormat() { FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(icebergSchema)) : projectedSchema).toRowDataType().getLogicalType()); - Schema expectedSchema = icebergSchema; - if (projectedSchema != null) { - expectedSchema = FlinkSchemaUtil.convert(icebergSchema, projectedSchema); - } + context = context.project(projectedSchema == null ? 
icebergSchema : + FlinkSchemaUtil.convert(icebergSchema, projectedSchema)); - return new FlinkInputFormat(tableLoader, expectedSchema, io, encryption, filterExpressions, options, - new SerializableConfiguration(hadoopConf)); + return new FlinkInputFormat(tableLoader, new SerializableConfiguration(hadoopConf), io, encryption, context); } public DataStream build() { Preconditions.checkNotNull(env, "StreamExecutionEnvironment should not be null"); FlinkInputFormat format = buildFormat(); - if (options.startSnapshotId() != null && options.endSnapshotId() == null) { - throw new UnsupportedOperationException("The Unbounded mode is not supported yet"); - } else { + if (isBounded(context)) { return env.createInput(format, rowTypeInfo); + } else { + throw new UnsupportedOperationException("The Unbounded mode is not supported yet"); } } } + + private static boolean isBounded(ScanContext context) { + return context.startSnapshotId() == null || context.endSnapshotId() != null; + } + + public static boolean isBounded(Map properties) { + return isBounded(new ScanContext().fromProperties(properties)); + } } diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java index af22edce10d7..ade4cfb7c957 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java +++ b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java @@ -23,7 +23,6 @@ import java.io.UncheckedIOException; import java.util.List; import org.apache.iceberg.CombinedScanTask; -import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.TableScan; @@ -32,21 +31,11 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; class FlinkSplitGenerator { - - private final Table table; - private final Schema projectedSchema; - private final ScanOptions options; - private final List filterExpressions; - - FlinkSplitGenerator(Table table, Schema projectedSchema, ScanOptions options, List filterExpressions) { - this.table = table; - this.projectedSchema = projectedSchema; - this.options = options; - this.filterExpressions = filterExpressions; + private FlinkSplitGenerator() { } - FlinkInputSplit[] createInputSplits() { - List tasks = tasks(); + static FlinkInputSplit[] createInputSplits(Table table, ScanContext context) { + List tasks = tasks(table, context); FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()]; for (int i = 0; i < tasks.size(); i++) { splits[i] = new FlinkInputSplit(i, tasks.get(i)); @@ -54,42 +43,42 @@ FlinkInputSplit[] createInputSplits() { return splits; } - private List tasks() { + private static List tasks(Table table, ScanContext context) { TableScan scan = table .newScan() - .caseSensitive(options.caseSensitive()) - .project(projectedSchema); + .caseSensitive(context.caseSensitive()) + .project(context.projectedSchema()); - if (options.snapshotId() != null) { - scan = scan.useSnapshot(options.snapshotId()); + if (context.snapshotId() != null) { + scan = scan.useSnapshot(context.snapshotId()); } - if (options.asOfTimestamp() != null) { - scan = scan.asOfTime(options.asOfTimestamp()); + if (context.asOfTimestamp() != null) { + scan = scan.asOfTime(context.asOfTimestamp()); } - if (options.startSnapshotId() != null) { - if (options.endSnapshotId() != null) { - scan = scan.appendsBetween(options.startSnapshotId(), options.endSnapshotId()); + if (context.startSnapshotId() != null) 
{ + if (context.endSnapshotId() != null) { + scan = scan.appendsBetween(context.startSnapshotId(), context.endSnapshotId()); } else { - scan = scan.appendsAfter(options.startSnapshotId()); + scan = scan.appendsAfter(context.startSnapshotId()); } } - if (options.splitSize() != null) { - scan = scan.option(TableProperties.SPLIT_SIZE, options.splitSize().toString()); + if (context.splitSize() != null) { + scan = scan.option(TableProperties.SPLIT_SIZE, context.splitSize().toString()); } - if (options.splitLookback() != null) { - scan = scan.option(TableProperties.SPLIT_LOOKBACK, options.splitLookback().toString()); + if (context.splitLookback() != null) { + scan = scan.option(TableProperties.SPLIT_LOOKBACK, context.splitLookback().toString()); } - if (options.splitOpenFileCost() != null) { - scan = scan.option(TableProperties.SPLIT_OPEN_FILE_COST, options.splitOpenFileCost().toString()); + if (context.splitOpenFileCost() != null) { + scan = scan.option(TableProperties.SPLIT_OPEN_FILE_COST, context.splitOpenFileCost().toString()); } - if (filterExpressions != null) { - for (Expression filter : filterExpressions) { + if (context.filterExpressions() != null) { + for (Expression filter : context.filterExpressions()) { scan = scan.filter(filter); } } diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/RowDataIterator.java b/flink/src/main/java/org/apache/iceberg/flink/source/RowDataIterator.java index dc7b38dd00a6..c7cc8bf2196f 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/source/RowDataIterator.java +++ b/flink/src/main/java/org/apache/iceberg/flink/source/RowDataIterator.java @@ -108,6 +108,7 @@ private CloseableIterable newAvroIterable(FileScanTask task, Map newParquetIterable(FileScanTask task, Map idToConstant) { Parquet.ReadBuilder builder = Parquet.read(getInputFile(task)) + .reuseContainers() .split(task.start(), task.length()) .project(projectedSchema) .createReaderFunc(fileSchema -> FlinkParquetReaders.buildReader(projectedSchema, fileSchema, idToConstant)) diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java b/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java new file mode 100644 index 000000000000..b3dba3a822b3 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.flink.source; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.iceberg.Schema; +import org.apache.iceberg.expressions.Expression; + +import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING; + +/** + * Context object with optional arguments for a Flink Scan. + */ +class ScanContext implements Serializable { + + private static final long serialVersionUID = 1L; + + private static final ConfigOption SNAPSHOT_ID = + ConfigOptions.key("snapshot-id").longType().defaultValue(null); + + private static final ConfigOption CASE_SENSITIVE = + ConfigOptions.key("case-sensitive").booleanType().defaultValue(false); + + private static final ConfigOption AS_OF_TIMESTAMP = + ConfigOptions.key("as-of-timestamp").longType().defaultValue(null); + + private static final ConfigOption START_SNAPSHOT_ID = + ConfigOptions.key("start-snapshot-id").longType().defaultValue(null); + + private static final ConfigOption END_SNAPSHOT_ID = + ConfigOptions.key("end-snapshot-id").longType().defaultValue(null); + + private static final ConfigOption SPLIT_SIZE = + ConfigOptions.key("split-size").longType().defaultValue(null); + + private static final ConfigOption SPLIT_LOOKBACK = + ConfigOptions.key("split-lookback").intType().defaultValue(null); + + private static final ConfigOption SPLIT_FILE_OPEN_COST = + ConfigOptions.key("split-file-open-cost").longType().defaultValue(null); + + private final boolean caseSensitive; + private final Long snapshotId; + private final Long startSnapshotId; + private final Long endSnapshotId; + private final Long asOfTimestamp; + private final Long splitSize; + private final Integer splitLookback; + private final Long splitOpenFileCost; + private final String nameMapping; + private final Schema projectedSchema; + private final List filterExpressions; + + ScanContext() { + this.caseSensitive = CASE_SENSITIVE.defaultValue(); + this.snapshotId = SNAPSHOT_ID.defaultValue(); + this.startSnapshotId = START_SNAPSHOT_ID.defaultValue(); + this.endSnapshotId = END_SNAPSHOT_ID.defaultValue(); + this.asOfTimestamp = AS_OF_TIMESTAMP.defaultValue(); + this.splitSize = SPLIT_SIZE.defaultValue(); + this.splitLookback = SPLIT_LOOKBACK.defaultValue(); + this.splitOpenFileCost = SPLIT_FILE_OPEN_COST.defaultValue(); + this.nameMapping = null; + this.projectedSchema = null; + this.filterExpressions = null; + } + + private ScanContext(boolean caseSensitive, Long snapshotId, Long startSnapshotId, Long endSnapshotId, + Long asOfTimestamp, Long splitSize, Integer splitLookback, Long splitOpenFileCost, + String nameMapping, Schema projectedSchema, List filterExpressions) { + this.caseSensitive = caseSensitive; + this.snapshotId = snapshotId; + this.startSnapshotId = startSnapshotId; + this.endSnapshotId = endSnapshotId; + this.asOfTimestamp = asOfTimestamp; + this.splitSize = splitSize; + this.splitLookback = splitLookback; + this.splitOpenFileCost = splitOpenFileCost; + this.nameMapping = nameMapping; + this.projectedSchema = projectedSchema; + this.filterExpressions = filterExpressions; + } + + ScanContext fromProperties(Map properties) { + Configuration config = new Configuration(); + properties.forEach(config::setString); + return new ScanContext(config.get(CASE_SENSITIVE), config.get(SNAPSHOT_ID), config.get(START_SNAPSHOT_ID), + config.get(END_SNAPSHOT_ID), 
config.get(AS_OF_TIMESTAMP), config.get(SPLIT_SIZE), config.get(SPLIT_LOOKBACK), + config.get(SPLIT_FILE_OPEN_COST), properties.get(DEFAULT_NAME_MAPPING), projectedSchema, filterExpressions); + } + + boolean caseSensitive() { + return caseSensitive; + } + + ScanContext setCaseSensitive(boolean isCaseSensitive) { + return new ScanContext(isCaseSensitive, snapshotId, startSnapshotId, endSnapshotId, asOfTimestamp, splitSize, + splitLookback, splitOpenFileCost, nameMapping, projectedSchema, filterExpressions); + } + + Long snapshotId() { + return snapshotId; + } + + ScanContext useSnapshotId(Long scanSnapshotId) { + return new ScanContext(caseSensitive, scanSnapshotId, startSnapshotId, endSnapshotId, asOfTimestamp, splitSize, + splitLookback, splitOpenFileCost, nameMapping, projectedSchema, filterExpressions); + } + + Long startSnapshotId() { + return startSnapshotId; + } + + ScanContext startSnapshotId(Long id) { + return new ScanContext(caseSensitive, snapshotId, id, endSnapshotId, asOfTimestamp, splitSize, splitLookback, + splitOpenFileCost, nameMapping, projectedSchema, filterExpressions); + } + + Long endSnapshotId() { + return endSnapshotId; + } + + ScanContext endSnapshotId(Long id) { + return new ScanContext(caseSensitive, snapshotId, startSnapshotId, id, asOfTimestamp, splitSize, splitLookback, + splitOpenFileCost, nameMapping, projectedSchema, filterExpressions); + } + + Long asOfTimestamp() { + return asOfTimestamp; + } + + ScanContext asOfTimestamp(Long timestamp) { + return new ScanContext(caseSensitive, snapshotId, startSnapshotId, endSnapshotId, timestamp, splitSize, + splitLookback, splitOpenFileCost, nameMapping, projectedSchema, filterExpressions); + } + + Long splitSize() { + return splitSize; + } + + ScanContext splitSize(Long size) { + return new ScanContext(caseSensitive, snapshotId, startSnapshotId, endSnapshotId, asOfTimestamp, size, + splitLookback, splitOpenFileCost, nameMapping, projectedSchema, filterExpressions); + } + + Integer splitLookback() { + return splitLookback; + } + + ScanContext splitLookback(Integer lookback) { + return new ScanContext(caseSensitive, snapshotId, startSnapshotId, endSnapshotId, asOfTimestamp, splitSize, + lookback, splitOpenFileCost, nameMapping, projectedSchema, filterExpressions); + } + + Long splitOpenFileCost() { + return splitOpenFileCost; + } + + ScanContext splitOpenFileCost(Long fileCost) { + return new ScanContext(caseSensitive, snapshotId, startSnapshotId, endSnapshotId, asOfTimestamp, splitSize, + splitLookback, fileCost, nameMapping, projectedSchema, filterExpressions); + } + + String nameMapping() { + return nameMapping; + } + + ScanContext nameMapping(String mapping) { + return new ScanContext(caseSensitive, snapshotId, startSnapshotId, endSnapshotId, asOfTimestamp, splitSize, + splitLookback, splitOpenFileCost, mapping, projectedSchema, filterExpressions); + } + + Schema projectedSchema() { + return projectedSchema; + } + + ScanContext project(Schema schema) { + return new ScanContext(caseSensitive, snapshotId, startSnapshotId, endSnapshotId, asOfTimestamp, splitSize, + splitLookback, splitOpenFileCost, nameMapping, schema, filterExpressions); + } + + List filterExpressions() { + return filterExpressions; + } + + ScanContext filterRows(List filters) { + return new ScanContext(caseSensitive, snapshotId, startSnapshotId, endSnapshotId, asOfTimestamp, splitSize, + splitLookback, splitOpenFileCost, nameMapping, projectedSchema, filters); + } +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/ScanOptions.java 
b/flink/src/main/java/org/apache/iceberg/flink/source/ScanOptions.java deleted file mode 100644 index 309b4e58aa08..000000000000 --- a/flink/src/main/java/org/apache/iceberg/flink/source/ScanOptions.java +++ /dev/null @@ -1,223 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iceberg.flink.source; - -import java.io.Serializable; -import java.util.Map; -import org.apache.flink.configuration.ConfigOption; -import org.apache.flink.configuration.ConfigOptions; -import org.apache.flink.configuration.Configuration; - -import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING; - -public class ScanOptions implements Serializable { - - private static final long serialVersionUID = 1L; - - public static final long UNBOUNDED_PRECEDING = -1L; - - public static final ConfigOption SNAPSHOT_ID = - ConfigOptions.key("snapshot-id").longType().defaultValue(null); - - public static final ConfigOption CASE_SENSITIVE = - ConfigOptions.key("case-sensitive").booleanType().defaultValue(false); - - public static final ConfigOption AS_OF_TIMESTAMP = - ConfigOptions.key("as-of-timestamp").longType().defaultValue(null); - - public static final ConfigOption START_SNAPSHOT_ID = - ConfigOptions.key("start-snapshot-id").longType().defaultValue(null); - - public static final ConfigOption END_SNAPSHOT_ID = - ConfigOptions.key("end-snapshot-id").longType().defaultValue(null); - - public static final ConfigOption SPLIT_SIZE = - ConfigOptions.key("split-size").longType().defaultValue(null); - - public static final ConfigOption SPLIT_LOOKBACK = - ConfigOptions.key("split-lookback").intType().defaultValue(null); - - public static final ConfigOption SPLIT_FILE_OPEN_COST = - ConfigOptions.key("split-file-open-cost").longType().defaultValue(null); - - private final boolean caseSensitive; - private final Long snapshotId; - private final Long startSnapshotId; - private final Long endSnapshotId; - private final Long asOfTimestamp; - private final Long splitSize; - private final Integer splitLookback; - private final Long splitOpenFileCost; - private final String nameMapping; - - public ScanOptions(boolean caseSensitive, Long snapshotId, Long startSnapshotId, Long endSnapshotId, - Long asOfTimestamp, Long splitSize, Integer splitLookback, Long splitOpenFileCost, - String nameMapping) { - this.caseSensitive = caseSensitive; - this.snapshotId = snapshotId; - this.startSnapshotId = startSnapshotId; - this.endSnapshotId = endSnapshotId; - this.asOfTimestamp = asOfTimestamp; - this.splitSize = splitSize; - this.splitLookback = splitLookback; - this.splitOpenFileCost = splitOpenFileCost; - this.nameMapping = nameMapping; - } - - public boolean caseSensitive() { - return caseSensitive; - } - - public Long snapshotId() { - return snapshotId; - } 
- - public Long startSnapshotId() { - return startSnapshotId; - } - - public Long endSnapshotId() { - return endSnapshotId; - } - - public Long asOfTimestamp() { - return asOfTimestamp; - } - - public Long splitSize() { - return splitSize; - } - - public Integer splitLookback() { - return splitLookback; - } - - public Long splitOpenFileCost() { - return splitOpenFileCost; - } - - public String nameMapping() { - return nameMapping; - } - - public static Builder builder() { - return new Builder(); - } - - public static ScanOptions fromProperties(Map properties) { - return builder().options(properties).build(); - } - - public static final class Builder { - private boolean caseSensitive = CASE_SENSITIVE.defaultValue(); - private Long snapshotId = SNAPSHOT_ID.defaultValue(); - private Long startSnapshotId = START_SNAPSHOT_ID.defaultValue(); - private Long endSnapshotId = END_SNAPSHOT_ID.defaultValue(); - private Long asOfTimestamp = AS_OF_TIMESTAMP.defaultValue(); - private Long splitSize = SPLIT_SIZE.defaultValue(); - private Integer splitLookback = SPLIT_LOOKBACK.defaultValue(); - private Long splitOpenFileCost = SPLIT_FILE_OPEN_COST.defaultValue(); - private String nameMapping; - - private Builder() { - } - - public Builder options(Map properties) { - Configuration config = new Configuration(); - properties.forEach(config::setString); - this.caseSensitive = config.get(CASE_SENSITIVE); - this.snapshotId = config.get(SNAPSHOT_ID); - this.asOfTimestamp = config.get(AS_OF_TIMESTAMP); - this.startSnapshotId = config.get(START_SNAPSHOT_ID); - this.endSnapshotId = config.get(END_SNAPSHOT_ID); - this.splitSize = config.get(SPLIT_SIZE); - this.splitLookback = config.get(SPLIT_LOOKBACK); - this.splitOpenFileCost = config.get(SPLIT_FILE_OPEN_COST); - this.nameMapping = properties.get(DEFAULT_NAME_MAPPING); - return this; - } - - public Builder caseSensitive(boolean newCaseSensitive) { - this.caseSensitive = newCaseSensitive; - return this; - } - - public Builder snapshotId(Long newSnapshotId) { - this.snapshotId = newSnapshotId; - return this; - } - - public Builder startSnapshotId(Long newStartSnapshotId) { - this.startSnapshotId = newStartSnapshotId; - return this; - } - - public Builder endSnapshotId(Long newEndSnapshotId) { - this.endSnapshotId = newEndSnapshotId; - return this; - } - - public Builder asOfTimestamp(Long newAsOfTimestamp) { - this.asOfTimestamp = newAsOfTimestamp; - return this; - } - - public Builder splitSize(Long newSplitSize) { - this.splitSize = newSplitSize; - return this; - } - - public Builder splitLookback(Integer newSplitLookback) { - this.splitLookback = newSplitLookback; - return this; - } - - public Builder splitOpenFileCost(Long newSplitOpenFileCost) { - this.splitOpenFileCost = newSplitOpenFileCost; - return this; - } - - public Builder nameMapping(String newNameMapping) { - this.nameMapping = newNameMapping; - return this; - } - - public ScanOptions build() { - if (snapshotId != null && asOfTimestamp != null) { - throw new IllegalArgumentException( - "Cannot scan using both snapshot-id and as-of-timestamp to select the table snapshot"); - } - - if (snapshotId != null || asOfTimestamp != null) { - if (startSnapshotId != null || endSnapshotId != null) { - throw new IllegalArgumentException( - "Cannot specify start-snapshot-id and end-snapshot-id to do incremental scan when either snapshot-id" + - " or as-of-timestamp is specified"); - } - } else { - if (startSnapshotId == null && endSnapshotId != null) { - throw new IllegalArgumentException("Cannot only specify option 
end-snapshot-id to do incremental scan"); - } - } - return new ScanOptions(caseSensitive, snapshotId, startSnapshotId, endSnapshotId, asOfTimestamp, splitSize, - splitLookback, splitOpenFileCost, nameMapping); - } - } -} diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java index d359aab4a8a9..4c98b1e3c73e 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java +++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java @@ -21,7 +21,7 @@ import java.io.IOException; import java.util.List; -import java.util.stream.Collectors; +import java.util.Map; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.data.RowData; @@ -32,19 +32,23 @@ import org.apache.flink.types.Row; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.GenericAppenderHelper; +import org.apache.iceberg.data.RandomGenericData; import org.apache.iceberg.data.Record; -import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.TableLoader; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; +import org.junit.Test; + +import static org.apache.iceberg.types.Types.NestedField.required; /** * Test {@link FlinkInputFormat}. */ public class TestFlinkInputFormat extends TestFlinkScan { - private FlinkSource.Builder builder; - public TestFlinkInputFormat(String fileFormat) { super(fileFormat); } @@ -52,42 +56,46 @@ public TestFlinkInputFormat(String fileFormat) { @Override public void before() throws IOException { super.before(); - builder = FlinkSource.forRowData().tableLoader(TableLoader.fromHadoopTable(warehouse + "/default/t")); } - @Override - protected List execute(Table table, List projectFields) throws IOException { - Schema projected = new Schema(projectFields.stream().map(f -> - table.schema().asStruct().field(f)).collect(Collectors.toList())); - return run(builder.project(FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(projected))).buildFormat()); + private TableLoader loader() { + return TableLoader.fromHadoopTable(warehouse + "/default/t"); } @Override - protected List execute(Table table, ScanOptions options) throws IOException { - return run(builder.options(options).buildFormat()); + protected List run( + FlinkSource.Builder formatBuilder, Map sqlOptions, String sqlFilter, String... sqlSelectedFields) + throws IOException { + return runFormat(formatBuilder.tableLoader(loader()).buildFormat()); } - @Override - protected List execute(Table table, List filters, String sqlFilter) throws IOException { - return run(builder.filters(filters).buildFormat()); - } + @Test + public void testNestedProjection() throws Exception { + Schema schema = new Schema( + required(1, "data", Types.StringType.get()), + required(2, "nested", Types.StructType.of( + Types.NestedField.required(3, "f1", Types.StringType.get()), + Types.NestedField.required(4, "f2", Types.StringType.get()), + Types.NestedField.required(5, "f3", Types.LongType.get()))), + required(6, "id", Types.LongType.get())); - @Override - protected void assertResiduals( - Schema schema, List results, List writeRecords, List filteredRecords) { - // can not filter the data. 
- assertRecords(results, writeRecords, schema); - } + Table table = catalog.createTable(TableIdentifier.of("default", "t"), schema); + + List writeRecords = RandomGenericData.generate(schema, 2, 0L); + new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER).appendToTable(writeRecords); + + // Schema: [data, nested[f1, f2, f3], id] + // Projection: [nested.f2, data] + // The Flink SQL output: [f2, data] + // The FlinkInputFormat output: [nested[f2], data] - @Override - protected void assertNestedProjection(Table table, List records) throws IOException { TableSchema projectedSchema = TableSchema.builder() .field("nested", DataTypes.ROW(DataTypes.FIELD("f2", DataTypes.STRING()))) .field("data", DataTypes.STRING()).build(); - List result = run(builder.project(projectedSchema).buildFormat()); + List result = runFormat(FlinkSource.forRowData().tableLoader(loader()).project(projectedSchema).buildFormat()); List expected = Lists.newArrayList(); - for (Record record : records) { + for (Record record : writeRecords) { Row nested = Row.of(((Record) record.get(1)).get(1)); expected.add(Row.of(nested, record.get(0))); } @@ -95,7 +103,7 @@ protected void assertNestedProjection(Table table, List records) throws assertRows(result, expected); } - private List run(FlinkInputFormat inputFormat) throws IOException { + private List runFormat(FlinkInputFormat inputFormat) throws IOException { FlinkInputSplit[] splits = inputFormat.createInputSplits(0); List results = Lists.newArrayList(); diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java index 9b1dbf9dcb29..6d113890651e 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java +++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java @@ -29,6 +29,10 @@ import java.util.Comparator; import java.util.List; import java.util.Locale; +import java.util.Map; +import java.util.Optional; +import org.apache.flink.table.api.TableColumn; +import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.conversion.DataStructureConverter; import org.apache.flink.table.data.conversion.DataStructureConverters; @@ -53,7 +57,9 @@ import org.apache.iceberg.flink.RowDataConverter; import org.apache.iceberg.hadoop.HadoopCatalog; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.DateTimeUtil; import org.junit.Assert; @@ -67,21 +73,21 @@ @RunWith(Parameterized.class) public abstract class TestFlinkScan extends AbstractTestBase { - private static final Schema SCHEMA = new Schema( + protected static final Schema SCHEMA = new Schema( required(1, "data", Types.StringType.get()), required(2, "id", Types.LongType.get()), required(3, "dt", Types.StringType.get())); - private static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA) + protected static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA) .identity("dt") .bucket("id", 1) .build(); - private HadoopCatalog catalog; + protected HadoopCatalog catalog; protected String warehouse; // parametrized variables - private final FileFormat fileFormat; + protected final FileFormat fileFormat; 
@Parameterized.Parameters(name = "format={0}") public static Object[] parameters() { @@ -102,37 +108,47 @@ public void before() throws IOException { catalog = new HadoopCatalog(conf, warehouse); } - private List execute(Table table) throws IOException { - return execute(table, ScanOptions.builder().build()); + private List runWithProjection(String... projected) throws IOException { + TableSchema.Builder builder = TableSchema.builder(); + TableSchema schema = FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert( + catalog.loadTable(TableIdentifier.of("default", "t")).schema())); + for (String field : projected) { + TableColumn column = schema.getTableColumn(field).get(); + builder.field(column.getName(), column.getType()); + } + return run(FlinkSource.forRowData().project(builder.build()), Maps.newHashMap(), "", projected); } - protected abstract List execute(Table table, List projectFields) throws IOException; - - protected abstract List execute(Table table, ScanOptions options) throws IOException; + protected List runWithFilter(Expression filter, String sqlFilter) throws IOException { + FlinkSource.Builder builder = FlinkSource.forRowData().filters(Collections.singletonList(filter)); + return run(builder, Maps.newHashMap(), sqlFilter, "*"); + } - protected abstract List execute(Table table, List filters, String sqlFilter) throws IOException; + private List runWithOptions(Map options) throws IOException { + FlinkSource.Builder builder = FlinkSource.forRowData(); + Optional.ofNullable(options.get("snapshot-id")).ifPresent(value -> builder.snapshotId(Long.parseLong(value))); + Optional.ofNullable(options.get("start-snapshot-id")) + .ifPresent(value -> builder.startSnapshotId(Long.parseLong(value))); + Optional.ofNullable(options.get("end-snapshot-id")) + .ifPresent(value -> builder.endSnapshotId(Long.parseLong(value))); + Optional.ofNullable(options.get("as-of-timestamp")) + .ifPresent(value -> builder.asOfTimestamp(Long.parseLong(value))); + return run(builder, options, "", "*"); + } - /** - * The Flink SQL has no residuals, because there will be operator to filter all the data that should be filtered. - * But the FlinkInputFormat can't. - */ - protected abstract void assertResiduals(Schema schema, List results, List writeRecords, - List filteredRecords) throws IOException; + private List run() throws IOException { + return run(FlinkSource.forRowData(), Maps.newHashMap(), "", "*"); + } - /** - * Schema: [data, nested[f1, f2, f3], id] - * Projection: [nested.f2, data] - * The Flink SQL output: [f2, data] - * The FlinkInputFormat output: [nested[f2], data]. - */ - protected abstract void assertNestedProjection(Table table, List records) throws IOException; + protected abstract List run(FlinkSource.Builder formatBuilder, Map sqlOptions, String sqlFilter, + String... 
sqlSelectedFields) throws IOException; @Test public void testUnpartitionedTable() throws Exception { Table table = catalog.createTable(TableIdentifier.of("default", "t"), SCHEMA); List expectedRecords = RandomGenericData.generate(SCHEMA, 2, 0L); new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER).appendToTable(expectedRecords); - assertRecords(execute(table), expectedRecords, SCHEMA); + assertRecords(run(), expectedRecords, SCHEMA); } @Test @@ -142,7 +158,7 @@ public void testPartitionedTable() throws Exception { expectedRecords.get(0).set(2, "2020-03-20"); new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER).appendToTable( org.apache.iceberg.TestHelpers.Row.of("2020-03-20", 0), expectedRecords); - assertRecords(execute(table), expectedRecords, SCHEMA); + assertRecords(run(), expectedRecords, SCHEMA); } @Test @@ -151,7 +167,7 @@ public void testProjection() throws Exception { List inputRecords = RandomGenericData.generate(SCHEMA, 1, 0L); new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER).appendToTable( org.apache.iceberg.TestHelpers.Row.of("2020-03-20", 0), inputRecords); - assertRows(execute(table, Collections.singletonList("data")), Row.of(inputRecords.get(0).get(0))); + assertRows(runWithProjection("data"), Row.of(inputRecords.get(0).get(0))); } @Test @@ -201,9 +217,9 @@ public void testIdentityPartitionProjections() throws Exception { validateIdentityPartitionProjections(table, Arrays.asList("message", "level", "dt"), inputRecords); } - private void validateIdentityPartitionProjections(Table table, List projectedFields, - List inputRecords) throws IOException { - List rows = execute(table, projectedFields); + private void validateIdentityPartitionProjections( + Table table, List projectedFields, List inputRecords) throws IOException { + List rows = runWithProjection(projectedFields.toArray(new String[0])); for (int pos = 0; pos < inputRecords.size(); pos++) { Record inputRecord = inputRecords.get(pos); @@ -230,13 +246,16 @@ public void testSnapshotReads() throws Exception { long timestampMillis = table.currentSnapshot().timestampMillis(); // produce another timestamp - Thread.sleep(10); + waitUntilAfter(timestampMillis); helper.appendToTable(RandomGenericData.generate(SCHEMA, 1, 0L)); assertRecords( - execute(table, ScanOptions.builder().snapshotId(snapshotId).build()), expectedRecords, SCHEMA); + runWithOptions(ImmutableMap.builder().put("snapshot-id", Long.toString(snapshotId)).build()), + expectedRecords, SCHEMA); assertRecords( - execute(table, ScanOptions.builder().asOfTimestamp(timestampMillis).build()), expectedRecords, SCHEMA); + runWithOptions( + ImmutableMap.builder().put("as-of-timestamp", Long.toString(timestampMillis)).build()), + expectedRecords, SCHEMA); } @Test @@ -258,19 +277,14 @@ public void testIncrementalRead() throws Exception { long snapshotId3 = table.currentSnapshot().snapshotId(); // snapshot 4 - List records4 = RandomGenericData.generate(SCHEMA, 1, 0L); helper.appendToTable(RandomGenericData.generate(SCHEMA, 1, 0L)); - List expected1 = Lists.newArrayList(); - expected1.addAll(records2); - expected1.addAll(records3); - expected1.addAll(records4); - assertRecords(execute(table, ScanOptions.builder().startSnapshotId(snapshotId1).build()), expected1, SCHEMA); - List expected2 = Lists.newArrayList(); expected2.addAll(records2); expected2.addAll(records3); - assertRecords(execute(table, ScanOptions.builder().startSnapshotId(snapshotId1).endSnapshotId(snapshotId3).build()), + assertRecords(runWithOptions(ImmutableMap.builder() + 
.put("start-snapshot-id", Long.toString(snapshotId1)) + .put("end-snapshot-id", Long.toString(snapshotId3)).build()), expected2, SCHEMA); } @@ -287,32 +301,8 @@ public void testFilterExp() throws Exception { DataFile dataFile2 = helper.writeFile(TestHelpers.Row.of("2020-03-21", 0), RandomGenericData.generate(SCHEMA, 2, 0L)); helper.appendToTable(dataFile1, dataFile2); - List filters = Collections.singletonList(Expressions.equal("dt", "2020-03-20")); - assertRecords(execute(table, filters, "dt='2020-03-20'"), expectedRecords, SCHEMA); - } - - @Test - public void testResiduals() throws Exception { - Table table = catalog.createTable(TableIdentifier.of("default", "t"), SCHEMA, SPEC); - - List writeRecords = RandomGenericData.generate(SCHEMA, 2, 0L); - writeRecords.get(0).set(1, 123L); - writeRecords.get(0).set(2, "2020-03-20"); - writeRecords.get(1).set(1, 456L); - writeRecords.get(1).set(2, "2020-03-20"); - - GenericAppenderHelper helper = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER); - - List expectedRecords = Lists.newArrayList(); - expectedRecords.add(writeRecords.get(0)); - - DataFile dataFile1 = helper.writeFile(TestHelpers.Row.of("2020-03-20", 0), writeRecords); - DataFile dataFile2 = helper.writeFile(TestHelpers.Row.of("2020-03-21", 0), - RandomGenericData.generate(SCHEMA, 2, 0L)); - helper.appendToTable(dataFile1, dataFile2); - - List filters = Arrays.asList(Expressions.equal("dt", "2020-03-20"), Expressions.equal("id", 123)); - assertResiduals(SCHEMA, execute(table, filters, "dt='2020-03-20' and id=123"), writeRecords, expectedRecords); + assertRecords(runWithFilter(Expressions.equal("dt", "2020-03-20"), "where dt='2020-03-20'"), expectedRecords, + SCHEMA); } @Test @@ -343,25 +333,7 @@ public void testPartitionTypes() throws Exception { appender.appendToTable(partition, Collections.singletonList(record)); } - assertRecords(execute(table), records, typesSchema); - } - - @Test - public void testNestedProjection() throws Exception { - Schema schema = new Schema( - required(1, "data", Types.StringType.get()), - required(2, "nested", Types.StructType.of( - Types.NestedField.required(3, "f1", Types.StringType.get()), - Types.NestedField.required(4, "f2", Types.StringType.get()), - Types.NestedField.required(5, "f3", Types.LongType.get()))), - required(6, "id", Types.LongType.get())); - - Table table = catalog.createTable(TableIdentifier.of("default", "t"), schema); - - List writeRecords = RandomGenericData.generate(schema, 2, 0L); - new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER).appendToTable(writeRecords); - - assertNestedProjection(table, writeRecords); + assertRecords(run(), records, typesSchema); } static void assertRecords(List results, List expectedRecords, Schema schema) { @@ -382,4 +354,11 @@ static void assertRows(List results, List expected) { results.sort(Comparator.comparing(Row::toString)); Assert.assertEquals(expected, results); } + + private static void waitUntilAfter(long timestampMillis) { + long current = System.currentTimeMillis(); + while (current <= timestampMillis) { + current = System.currentTimeMillis(); + } + } } diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java new file mode 100644 index 000000000000..4231e335bd2b --- /dev/null +++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.source; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.config.TableConfigOptions; +import org.apache.flink.types.Row; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.Table; +import org.apache.iceberg.TestHelpers; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.GenericAppenderHelper; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.junit.Test; + +/** + * Test Flink SELECT SQLs. + */ +public class TestFlinkScanSql extends TestFlinkScan { + + private TableEnvironment tEnv; + + public TestFlinkScanSql(String fileFormat) { + super(fileFormat); + } + + @Override + public void before() throws IOException { + super.before(); + tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()); + tEnv.executeSql(String.format( + "create catalog iceberg_catalog with ('type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='%s')", + warehouse)); + tEnv.executeSql("use catalog iceberg_catalog"); + tEnv.getConfig().getConfiguration().set(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true); + } + + @Override + protected List run(FlinkSource.Builder formatBuilder, Map sqlOptions, String sqlFilter, + String... 
sqlSelectedFields) { + String select = String.join(",", sqlSelectedFields); + + StringBuilder builder = new StringBuilder(); + sqlOptions.forEach((key, value) -> builder.append(optionToKv(key, value)).append(",")); + + String optionStr = builder.toString(); + + if (optionStr.endsWith(",")) { + optionStr = optionStr.substring(0, optionStr.length() - 1); + } + + if (!optionStr.isEmpty()) { + optionStr = String.format("/*+ OPTIONS(%s)*/", optionStr); + } + + String sql = String.format("select %s from t %s %s", select, optionStr, sqlFilter); + return executeSQL(sql); + } + + @Test + public void testResiduals() throws Exception { + Table table = catalog.createTable(TableIdentifier.of("default", "t"), SCHEMA, SPEC); + + List writeRecords = RandomGenericData.generate(SCHEMA, 2, 0L); + writeRecords.get(0).set(1, 123L); + writeRecords.get(0).set(2, "2020-03-20"); + writeRecords.get(1).set(1, 456L); + writeRecords.get(1).set(2, "2020-03-20"); + + GenericAppenderHelper helper = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER); + + List expectedRecords = Lists.newArrayList(); + expectedRecords.add(writeRecords.get(0)); + + DataFile dataFile1 = helper.writeFile(TestHelpers.Row.of("2020-03-20", 0), writeRecords); + DataFile dataFile2 = helper.writeFile(TestHelpers.Row.of("2020-03-21", 0), + RandomGenericData.generate(SCHEMA, 2, 0L)); + helper.appendToTable(dataFile1, dataFile2); + + Expression filter = Expressions.and(Expressions.equal("dt", "2020-03-20"), Expressions.equal("id", 123)); + assertRecords(runWithFilter(filter, "where dt='2020-03-20' and id=123"), expectedRecords, SCHEMA); + } + + private List executeSQL(String sql) { + return Lists.newArrayList(tEnv.executeSql(sql).collect()); + } + + private String optionToKv(String key, Object value) { + return "'" + key + "'='" + value + "'"; + } +}
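
For reference, a minimal usage sketch of the refactored FlinkSource.Builder API introduced in FlinkSource.java above: a bounded batch read with an optional projection, an optional filter, and an incremental snapshot range. This is illustrative only; the warehouse path, column name, filter value, and snapshot ids are placeholders rather than values from this change.

import java.util.Collections;
import java.util.List;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.FlinkSource;

public class FlinkSourceUsageSketch {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Optional projection push-down: read only the "data" column (column name is illustrative).
    TableSchema projection = TableSchema.builder()
        .field("data", DataTypes.STRING())
        .build();

    // Optional row filter, as exercised by TestFlinkScan#testFilterExp.
    List<Expression> filters = Collections.singletonList(Expressions.equal("dt", "2020-03-20"));

    DataStream<RowData> stream = FlinkSource.forRowData()
        .env(env)
        .tableLoader(TableLoader.fromHadoopTable("file:///tmp/warehouse/default/t")) // placeholder path
        .hadoopConf(new Configuration())
        .project(projection)
        .filters(filters)
        // Both ids set => bounded incremental scan; startSnapshotId without endSnapshotId
        // would request the unbounded mode, which build() still rejects.
        .startSnapshotId(1L)   // placeholder snapshot ids
        .endSnapshotId(2L)
        .build();

    stream.print();
    env.execute("iceberg-flink-source-sketch");
  }
}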
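Similarly, a condensed sketch of the SQL read path that TestFlinkScanSql exercises, showing how the dynamic table option ("hint") keys map onto the new ScanContext via fromProperties. The catalog DDL and config flag mirror the test setup; the warehouse path and snapshot id are placeholders.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.TableConfigOptions;

public class FlinkSqlScanOptionsSketch {

  public static void main(String[] args) {
    TableEnvironment tEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());

    // Dynamic table options must be enabled before OPTIONS hints are honored.
    tEnv.getConfig().getConfiguration().set(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true);

    tEnv.executeSql(
        "create catalog iceberg_catalog with ('type'='iceberg', 'catalog-type'='hadoop', " +
        "'warehouse'='file:///tmp/warehouse')");  // placeholder warehouse
    tEnv.executeSql("use catalog iceberg_catalog");

    // Recognized hint keys (read by ScanContext.fromProperties): snapshot-id, as-of-timestamp,
    // case-sensitive, start-snapshot-id, end-snapshot-id, split-size, split-lookback,
    // split-file-open-cost. The snapshot id below is a placeholder.
    tEnv.executeSql("select * from t /*+ OPTIONS('snapshot-id'='1') */ where dt='2020-03-20'").print();
  }
}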