flink/src/main/java/org/apache/iceberg/flink/FlinkTableFactory.java
@@ -24,22 +24,32 @@
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.StreamTableSinkFactory;
import org.apache.flink.table.sinks.StreamTableSink;
import org.apache.flink.table.factories.TableSinkFactory;
import org.apache.flink.table.factories.TableSourceFactory;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.table.utils.TableSchemaUtils;

public class FlinkTableFactory implements StreamTableSinkFactory<RowData> {
public class FlinkTableFactory implements TableSinkFactory<RowData>, TableSourceFactory<RowData> {
private final FlinkCatalog catalog;

public FlinkTableFactory(FlinkCatalog catalog) {
this.catalog = catalog;
}

@Override
public StreamTableSink<RowData> createTableSink(Context context) {
public TableSource<RowData> createTableSource(TableSourceFactory.Context context) {
ObjectPath objectPath = context.getObjectIdentifier().toObjectPath();
TableLoader tableLoader = createTableLoader(objectPath);
TableSchema tableSchema = getPhysicalSchema(context);
TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema());
return new IcebergTableSource(tableLoader, catalog.getHadoopConf(), tableSchema, context.getTable().getOptions());
}

@Override
public TableSink<RowData> createTableSink(TableSinkFactory.Context context) {
ObjectPath objectPath = context.getObjectIdentifier().toObjectPath();
TableLoader tableLoader = createTableLoader(objectPath);
TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema());
return new IcebergTableSink(context.isBounded(), tableLoader, catalog.getHadoopConf(), tableSchema);
}

@@ -53,10 +63,6 @@ public List<String> supportedProperties() {
throw new UnsupportedOperationException("Iceberg Table Factory can not be loaded from Java SPI");
}

private TableSchema getPhysicalSchema(Context context) {
return TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema());
}

private TableLoader createTableLoader(ObjectPath objectPath) {
return TableLoader.fromCatalog(catalog.getCatalogLoader(), catalog.toIdentifier(objectPath));
}
flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java (111 additions, 0 deletions)
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.flink;

import java.util.Arrays;
import java.util.Map;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.sources.FilterableTableSource;
import org.apache.flink.table.sources.LimitableTableSource;
import org.apache.flink.table.sources.ProjectableTableSource;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.utils.TableConnectorUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.flink.source.FlinkSource;

/**
* Flink Iceberg table source.
* TODO: Implement {@link FilterableTableSource} and {@link LimitableTableSource}.
*/
public class IcebergTableSource implements StreamTableSource<RowData>, ProjectableTableSource<RowData> {

private final TableLoader loader;
private final Configuration hadoopConf;
private final TableSchema schema;
private final Map<String, String> options;
private final int[] projectedFields;

public IcebergTableSource(TableLoader loader, Configuration hadoopConf, TableSchema schema,
Map<String, String> options) {
this(loader, hadoopConf, schema, options, null);
}

private IcebergTableSource(TableLoader loader, Configuration hadoopConf, TableSchema schema,
Map<String, String> options, int[] projectedFields) {
this.loader = loader;
this.hadoopConf = hadoopConf;
this.schema = schema;
this.options = options;
this.projectedFields = projectedFields;
}

@Override
public boolean isBounded() {
return FlinkSource.isBounded(options);
}

@Override
public TableSource<RowData> projectFields(int[] fields) {
return new IcebergTableSource(loader, hadoopConf, schema, options, fields);
}

@Override
public DataStream<RowData> getDataStream(StreamExecutionEnvironment execEnv) {
return FlinkSource.forRowData().env(execEnv).tableLoader(loader).hadoopConf(hadoopConf)
.project(getProjectedSchema()).properties(options).build();
}

@Override
public TableSchema getTableSchema() {
return schema;
}

@Override
public DataType getProducedDataType() {
return getProjectedSchema().toRowDataType().bridgedTo(RowData.class);
}

private TableSchema getProjectedSchema() {
TableSchema fullSchema = getTableSchema();
if (projectedFields == null) {
return fullSchema;
} else {
String[] fullNames = fullSchema.getFieldNames();
DataType[] fullTypes = fullSchema.getFieldDataTypes();
return TableSchema.builder().fields(
Arrays.stream(projectedFields).mapToObj(i -> fullNames[i]).toArray(String[]::new),
Arrays.stream(projectedFields).mapToObj(i -> fullTypes[i]).toArray(DataType[]::new)).build();
}
}

@Override
public String explainSource() {
String explain = "Iceberg table: " + loader.toString();
if (projectedFields != null) {
explain += ", ProjectedFields: " + Arrays.toString(projectedFields);
}
return TableConnectorUtils.generateRuntimeName(getClass(), getTableSchema().getFieldNames()) + explain;
}
}
flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java
@@ -20,7 +20,6 @@
package org.apache.iceberg.flink.source;

import java.io.IOException;
import java.util.List;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.RichInputFormat;
@@ -31,7 +30,6 @@
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.hadoop.SerializableConfiguration;
import org.apache.iceberg.io.FileIO;
@@ -45,30 +43,25 @@ public class FlinkInputFormat extends RichInputFormat<RowData, FlinkInputSplit>
private static final long serialVersionUID = 1L;

private final TableLoader tableLoader;
private final Schema projectedSchema;
private final ScanOptions options;
private final List<Expression> filterExpressions;
private final SerializableConfiguration serializableConf;
private final FileIO io;
private final EncryptionManager encryption;
private final SerializableConfiguration serializableConf;
private final ScanContext context;

private transient RowDataIterator iterator;

FlinkInputFormat(
TableLoader tableLoader, Schema projectedSchema, FileIO io, EncryptionManager encryption,
List<Expression> filterExpressions, ScanOptions options, SerializableConfiguration serializableConf) {
FlinkInputFormat(TableLoader tableLoader, SerializableConfiguration serializableConf, FileIO io,
EncryptionManager encryption, ScanContext context) {
this.tableLoader = tableLoader;
this.projectedSchema = projectedSchema;
this.serializableConf = serializableConf;
this.io = io;
this.encryption = encryption;
this.filterExpressions = filterExpressions;
this.options = options;
this.serializableConf = serializableConf;
this.context = context;
}

@VisibleForTesting
Schema projectedSchema() {
return projectedSchema;
return context.projectedSchema();
}

@Override
@@ -83,8 +76,7 @@ public FlinkInputSplit[] createInputSplits(int minNumSplits) throws IOException
tableLoader.open(serializableConf.get());
try (TableLoader loader = tableLoader) {
Table table = loader.loadTable();
FlinkSplitGenerator generator = new FlinkSplitGenerator(table, projectedSchema, options, filterExpressions);
return generator.createInputSplits();
return FlinkSplitGenerator.createInputSplits(table, context);
}
}

@@ -100,7 +92,7 @@ public void configure(Configuration parameters) {
@Override
public void open(FlinkInputSplit split) {
this.iterator = new RowDataIterator(
split.getTask(), io, encryption, projectedSchema, options.nameMapping(), options.caseSensitive());
split.getTask(), io, encryption, context.projectedSchema(), context.nameMapping(), context.caseSensitive());
}

@Override
flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
@@ -22,6 +22,7 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
@@ -47,11 +48,10 @@ private FlinkSource() {

/**
* Initialize a {@link Builder} to read the data from iceberg table. Equivalent to {@link TableScan}.
* See more options in {@link ScanOptions}.
* See more options in {@link ScanContext}.
* <p>
* The Source can read static data in bounded mode. It can also continuously check the arrival of new data and
* read records incrementally.
* The Bounded and Unbounded depends on the {@link Builder#options(ScanOptions)}:
* <ul>
* <li>Without startSnapshotId: Bounded</li>
* <li>With startSnapshotId and with endSnapshotId: Bounded</li>
@@ -72,10 +72,9 @@ public static class Builder {
private StreamExecutionEnvironment env;
private Table table;
private TableLoader tableLoader;
private TableSchema projectedSchema;
private ScanOptions options = ScanOptions.builder().build();
private List<Expression> filterExpressions;
private Configuration hadoopConf;
private TableSchema projectedSchema;
private ScanContext context = new ScanContext();

private RowDataTypeInfo rowTypeInfo;

@@ -89,8 +88,18 @@ public Builder table(Table newTable) {
return this;
}

public Builder filters(List<Expression> newFilters) {
this.filterExpressions = newFilters;
public Builder hadoopConf(Configuration newConf) {
Contributor:

We try to avoid using Hadoop classes in Iceberg APIs because they are hard to remove. Injecting a Hadoop Configuration is currently done in one place: to instantiate a catalog that requires it.

The catalog creates tables and tables are associated with a FileIO, so the configuration is passed down through that chain. The table's configuration should be used for any table configuration.

MR also has a Hadoop Configuration, but that's required by the API so we can't remove it from the API there. But we still prefer using the table's Configuration when it is needed for components like HadoopFileIO.
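
For illustration, a minimal sketch of the pattern described above, assuming a HadoopCatalog with a placeholder warehouse path and table identifier; the Configuration is used once to build the catalog, and everything downstream takes its FileIO and EncryptionManager from the Table itself. This is not code from the PR.

  import org.apache.hadoop.conf.Configuration;
  import org.apache.iceberg.Table;
  import org.apache.iceberg.catalog.TableIdentifier;
  import org.apache.iceberg.encryption.EncryptionManager;
  import org.apache.iceberg.hadoop.HadoopCatalog;
  import org.apache.iceberg.io.FileIO;

  public class TableIoSketch {
    public static void main(String[] args) {
      // The Hadoop Configuration appears exactly once: to construct the catalog.
      Configuration conf = new Configuration();
      HadoopCatalog catalog = new HadoopCatalog(conf, "hdfs://nn:8020/warehouse");

      // From here on, the table carries what readers and writers need.
      Table table = catalog.loadTable(TableIdentifier.of("db", "tbl"));
      FileIO io = table.io();                            // used to open and write data files
      EncryptionManager encryption = table.encryption(); // used to decrypt/encrypt file streams
      System.out.println(io.getClass() + " / " + encryption.getClass());
    }
  }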

Contributor Author:

+1 for avoiding using Hadoop Configuration.

Contributor Author:

The reason the Hadoop conf needs to be passed right now:

  • The JobManager needs the splits of the scan, so it needs to call Table.newScan.
  • So the JobManager needs a Table object. Where can a table come from? TableLoader -> from a Catalog or HadoopTables.
  • Creating a Catalog or HadoopTables requires a Hadoop Configuration.

Maybe we could pass an Iceberg object down this chain instead (FileIO does not look like a good fit for creating the catalog)?
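
For illustration, a minimal sketch of that chain, using only the TableLoader API as it appears in this PR (open(Configuration), loadTable()) plus standard scan planning; it is not code from the PR itself.

  import java.io.IOException;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.iceberg.CombinedScanTask;
  import org.apache.iceberg.Table;
  import org.apache.iceberg.flink.TableLoader;
  import org.apache.iceberg.io.CloseableIterable;

  public class SplitPlanningSketch {
    // JobManager needs splits -> needs a Table -> needs a TableLoader -> which needs a
    // Hadoop Configuration to build the underlying Catalog or HadoopTables.
    static void planSplits(TableLoader tableLoader, Configuration hadoopConf) throws IOException {
      tableLoader.open(hadoopConf);               // Catalog/HadoopTables are created here
      try (TableLoader loader = tableLoader) {
        Table table = loader.loadTable();
        // This is what split generation ultimately calls on the JobManager.
        try (CloseableIterable<CombinedScanTask> tasks = table.newScan().planTasks()) {
          tasks.forEach(task -> System.out.println(task.files().size() + " files in this split"));
        }
      }
    }
  }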

Contributor:

Maybe CatalogLoader should serialize its own Configuration? It would make sense to pass one when creating a CatalogLoader for HadoopCatalog or HiveCatalog because those need a Configuration to create the catalog.
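
A hypothetical sketch of that suggestion — a catalog loader that captures its Configuration via Iceberg's SerializableConfiguration so nothing extra has to be passed at open time. The class name and shape are illustrative only, not part of this PR.

  import java.io.Serializable;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.iceberg.catalog.Catalog;
  import org.apache.iceberg.hadoop.HadoopCatalog;
  import org.apache.iceberg.hadoop.SerializableConfiguration;

  public class HadoopCatalogLoaderSketch implements Serializable {
    private final SerializableConfiguration hadoopConf;  // travels with the loader to the cluster
    private final String warehouseLocation;

    public HadoopCatalogLoaderSketch(Configuration conf, String warehouseLocation) {
      this.hadoopConf = new SerializableConfiguration(conf);
      this.warehouseLocation = warehouseLocation;
    }

    // Called after deserialization on the JobManager/TaskManager; no external conf is needed.
    public Catalog loadCatalog() {
      return new HadoopCatalog(hadoopConf.get(), warehouseLocation);
    }
  }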

Contributor:

Or maybe this could use the approach from the FlinkCatalogFactory:

  public static Configuration clusterHadoopConf() {
    return HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration());
  }

Using clusterHadoopConf internally here would avoid the need to expose this in the API and we could add Configuration to the loader later.
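
For context, HadoopUtils here is Flink's org.apache.flink.runtime.util.HadoopUtils, which assembles a Hadoop Configuration from the cluster's Flink configuration and the usual HADOOP_CONF_DIR/HADOOP_HOME lookup. A sketch of the fallback being suggested (illustrative only, not code from the PR):

  import org.apache.flink.configuration.GlobalConfiguration;
  import org.apache.flink.runtime.util.HadoopUtils;
  import org.apache.hadoop.conf.Configuration;

  class HadoopConfFallback {
    // Prefer an explicitly supplied Configuration; otherwise fall back to the cluster's.
    static Configuration resolve(Configuration explicitConf) {
      return explicitConf != null
          ? explicitConf
          : HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration());
    }
  }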

Contributor Author:

Using clusterHadoopConf may not be flexible enough.
I am +1 for having CatalogLoader serialize its own Configuration. I will create a PR for the source and sink later.

this.hadoopConf = newConf;
return this;
}

public Builder env(StreamExecutionEnvironment newEnv) {
this.env = newEnv;
return this;
}

public Builder filters(List<Expression> filters) {
this.context = context.filterRows(filters);
return this;
}

@@ -99,18 +108,53 @@ public Builder project(TableSchema schema) {
return this;
}

public Builder options(ScanOptions newOptions) {
this.options = newOptions;
public Builder properties(Map<String, String> properties) {
this.context = context.fromProperties(properties);
return this;
}

public Builder hadoopConf(Configuration newConf) {
this.hadoopConf = newConf;
public Builder caseSensitive(boolean caseSensitive) {
this.context = context.setCaseSensitive(caseSensitive);
return this;
}

public Builder env(StreamExecutionEnvironment newEnv) {
this.env = newEnv;
public Builder snapshotId(Long snapshotId) {
this.context = context.useSnapshotId(snapshotId);
return this;
}

public Builder startSnapshotId(Long startSnapshotId) {
this.context = context.startSnapshotId(startSnapshotId);
return this;
}

public Builder endSnapshotId(Long endSnapshotId) {
this.context = context.endSnapshotId(endSnapshotId);
return this;
}

public Builder asOfTimestamp(Long asOfTimestamp) {
this.context = context.asOfTimestamp(asOfTimestamp);
return this;
}

public Builder splitSize(Long splitSize) {
this.context = context.splitSize(splitSize);
return this;
}

public Builder splitLookback(Integer splitLookback) {
this.context = context.splitLookback(splitLookback);
return this;
}

public Builder splitOpenFileCost(Long splitOpenFileCost) {
this.context = context.splitOpenFileCost(splitOpenFileCost);
return this;
}

public Builder nameMapping(String nameMapping) {
this.context = context.nameMapping(nameMapping);
return this;
}

@@ -144,23 +188,28 @@ public FlinkInputFormat buildFormat() {
FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(icebergSchema)) :
projectedSchema).toRowDataType().getLogicalType());

Schema expectedSchema = icebergSchema;
if (projectedSchema != null) {
expectedSchema = FlinkSchemaUtil.convert(icebergSchema, projectedSchema);
}
context = context.project(projectedSchema == null ? icebergSchema :
FlinkSchemaUtil.convert(icebergSchema, projectedSchema));

return new FlinkInputFormat(tableLoader, expectedSchema, io, encryption, filterExpressions, options,
new SerializableConfiguration(hadoopConf));
return new FlinkInputFormat(tableLoader, new SerializableConfiguration(hadoopConf), io, encryption, context);
}

public DataStream<RowData> build() {
Preconditions.checkNotNull(env, "StreamExecutionEnvironment should not be null");
FlinkInputFormat format = buildFormat();
if (options.startSnapshotId() != null && options.endSnapshotId() == null) {
throw new UnsupportedOperationException("The Unbounded mode is not supported yet");
} else {
if (isBounded(context)) {
return env.createInput(format, rowTypeInfo);
} else {
throw new UnsupportedOperationException("The Unbounded mode is not supported yet");
}
}
}

private static boolean isBounded(ScanContext context) {
return context.startSnapshotId() == null || context.endSnapshotId() != null;
}

public static boolean isBounded(Map<String, String> properties) {
return isBounded(new ScanContext().fromProperties(properties));
}
}
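
To tie the new builder methods together, a usage sketch of a bounded read based only on the methods shown in this diff (env, tableLoader, hadoopConf, caseSensitive, properties, build). The table location is a placeholder, and TableLoader.fromHadoopTable is assumed to be available alongside fromCatalog; this is not part of the PR.

  import java.util.Collections;
  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  import org.apache.flink.table.data.RowData;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.iceberg.flink.TableLoader;
  import org.apache.iceberg.flink.source.FlinkSource;

  public class BoundedReadSketch {
    public static void main(String[] args) throws Exception {
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

      // No startSnapshotId (or both start and end set) means a bounded scan, per the
      // javadoc above; only bounded mode is supported by build() at this point in the PR.
      DataStream<RowData> stream = FlinkSource.forRowData()
          .env(env)
          .tableLoader(TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/db/tbl"))
          .hadoopConf(new Configuration())
          .caseSensitive(true)
          .properties(Collections.emptyMap())
          .build();

      stream.print();
      env.execute("Iceberg bounded read");
    }
  }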