Conversation

@JingsongLi (Contributor) commented Aug 4, 2020:

Fixes #1275 (Flink: Implement Flink InputFormat and integrate it to FlinkCatalog)

This is a proof of concept (POC) for the Flink reader.

The Flink reader is essentially the same as the Spark one.

  • Flink's InputFormat is similar to the Hive (Hadoop) input format: its splits are generated in the job manager, so an Iceberg catalog loader is needed there to obtain the Iceberg Table object (see the sketch after this list).
  • Flink's TableFactory and TableSource are similar to Spark's TableProvider and SparkScanBuilder; Flink also provides projection push-down (ProjectableTableSource) and filter push-down (FilterableTableSource).
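
A minimal sketch of the catalog-loader idea from the first bullet, assuming the loader is created on the client, serialized into the InputFormat, and invoked on the job manager; the interface shape mirrors the CatalogLoader snippet discussed later in this thread, but is illustrative only:

import java.io.Serializable;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.Catalog;

// Shipped (serialized) with the Flink InputFormat so that the job manager,
// which generates the splits, can load the Catalog and from it the Table.
public interface CatalogLoader extends Serializable {
  Catalog loadCatalog(Configuration hadoopConf);
}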

Work can be divided into:

@JingsongLi force-pushed the flink_reader branch 2 times, most recently from 76426e6 to 3e43afd, on August 4, 2020 10:41

@Override
public Map<String, String> requiredContext() {
  throw new UnsupportedOperationException("Iceberg Table Factory can not loaded from Java SPI");
Contributor:

Nit: Probable typo. It should probably be: can not *be* loaded from Java SPI.

I also think it might help the average reader if you defined the acronym SPI at least once.

@openinx (Member) left a comment:

I had a rough look and left a few comments; will take a closer review later.

Comment on lines 81 to 113
switch (fileFormat) {
  case AVRO:
    appender = Avro.write(Files.localOutput(file))
        .schema(table.schema())
        .createWriterFunc(DataWriter::create)
        .named(fileFormat.name())
        .build();
    break;

  case PARQUET:
    appender = Parquet.write(Files.localOutput(file))
        .schema(table.schema())
        .createWriterFunc(GenericParquetWriter::buildWriter)
        .named(fileFormat.name())
        .build();
    break;

  case ORC:
    appender = ORC.write(Files.localOutput(file))
        .schema(table.schema())
        .createWriterFunc(GenericOrcWriter::buildWriter)
        .build();
    break;

  default:
    throw new UnsupportedOperationException("Cannot write format: " + fileFormat);
}

try {
  appender.addAll(records);
} finally {
  appender.close();
}
Member:

It seems we could abstract this appender building into a separate method; I saw lots of unit tests depend on this common logic. It could be a separate issue to do this.

Contributor Author:

I think we can have a GenericAppenderFactory
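
A minimal sketch of what such a GenericAppenderFactory could look like, simply wrapping the builder calls from the switch above; class and method names here are illustrative, and the real factory was introduced separately in #1340:

import java.io.IOException;

import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.avro.DataWriter;
import org.apache.iceberg.data.orc.GenericOrcWriter;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;

class GenericAppenderFactory {
  private final Schema schema;

  GenericAppenderFactory(Schema schema) {
    this.schema = schema;
  }

  // Same logic as the switch in the test above, shared by callers instead of copied.
  FileAppender<Record> newAppender(OutputFile outputFile, FileFormat fileFormat) throws IOException {
    switch (fileFormat) {
      case AVRO:
        return Avro.write(outputFile)
            .schema(schema)
            .createWriterFunc(DataWriter::create)
            .named(fileFormat.name())
            .build();
      case PARQUET:
        return Parquet.write(outputFile)
            .schema(schema)
            .createWriterFunc(GenericParquetWriter::buildWriter)
            .named(fileFormat.name())
            .build();
      case ORC:
        return ORC.write(outputFile)
            .schema(schema)
            .createWriterFunc(GenericOrcWriter::buildWriter)
            .build();
      default:
        throw new UnsupportedOperationException("Cannot write format: " + fileFormat);
    }
  }
}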

Contributor Author:

Created #1340

* Flink needs to get {@link Table} objects in the cluster (for example, to get splits), not just on the client side.
* So we need an Iceberg catalog loader to get the {@link Catalog} and get the {@link Table} object.
*/
public interface CatalogLoader extends Serializable {
Member:

It seems we could move this class to the hive module?

Contributor Author:

Maybe not in hive, but somewhere in the Iceberg modules; I don't know whether any other modules want to use CatalogLoader.
We can discuss it in #1332.
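
For illustration only, a hypothetical Hive-backed loader (the class name and constructor usage are assumptions, not code from this PR); the interface itself stays module-agnostic even if concrete loaders end up living next to their catalogs:

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.hive.HiveCatalog;

// Hypothetical: only the metastore URI needs to be serialized; the Catalog is rebuilt on the job manager.
class HiveCatalogLoader implements CatalogLoader {
  private final String metastoreUri;

  HiveCatalogLoader(String metastoreUri) {
    this.metastoreUri = metastoreUri;
  }

  @Override
  public Catalog loadCatalog(Configuration hadoopConf) {
    hadoopConf.set("hive.metastore.uris", metastoreUri);
    return new HiveCatalog(hadoopConf); // assumes the iceberg-hive HiveCatalog(Configuration) constructor
  }
}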

this.originalCatalog = icebergCatalog;
this.icebergCatalog = cacheEnabled ? CachingCatalog.wrap(icebergCatalog) : icebergCatalog;
this.originalCatalog = catalogLoader.loadCatalog(
HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration()));
Member:

How about adding a Configuration argument to this constructor? IMO FlinkCatalog shouldn't handle the configuration-initialization logic; moving the configuration loading to FlinkCatalogFactory sounds more reasonable. Besides, unit tests for FlinkCatalog will be easier because they won't depend on how Flink loads its configuration.

Contributor Author:

We can discuss it in #1332
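
A sketch of the suggested shape, with illustrative parameter and field names rather than the PR's actual signature; the point is only that the Configuration is created by FlinkCatalogFactory (or a test) and passed in, so FlinkCatalog never calls GlobalConfiguration.loadConfiguration() itself:

private final Catalog originalCatalog;
private final Catalog icebergCatalog;

FlinkCatalog(CatalogLoader catalogLoader, Configuration hadoopConf, boolean cacheEnabled) {
  // The caller supplies the Hadoop Configuration instead of this class loading it globally.
  Catalog loaded = catalogLoader.loadCatalog(hadoopConf);
  this.originalCatalog = loaded;
  this.icebergCatalog = cacheEnabled ? CachingCatalog.wrap(loaded) : loaded;
}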

}

public FlinkInputFormat build() {
return new FlinkInputFormat(
Member:

We'd better have a Preconditions#check for those arguments, in case an NPE is thrown further down the call stack.
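
A minimal sketch of the suggested check, using Guava's Preconditions; the field names (tableLoader, projectedSchema) are illustrative, not the builder's actual ones:

import com.google.common.base.Preconditions;

// At the top of FlinkInputFormat.Builder#build(), before constructing the FlinkInputFormat:
Preconditions.checkNotNull(tableLoader, "TableLoader should not be null");
Preconditions.checkNotNull(projectedSchema, "Projected schema should not be null");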


@Override
protected List<Row> executeWithSnapshotId(Table table, long snapshotId) throws IOException {
  return run(builder.table(table).options(ScanOptions.builder().snapshotId(snapshotId).build()).build());
Member:

Should we also add unit tests for the following cases:

  1. scan with both startSnapshotId and endSnapshotId;
  2. scan with only asOfTimestamp;
  3. scan with only startSnapshotId.
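
As a sketch, the first case could look roughly like this, sitting next to executeWithSnapshotId in the same test class; it assumes ScanOptions exposes startSnapshotId/endSnapshotId builder methods analogous to snapshotId above, and expectedRowsBetween is a hypothetical helper producing the expected rows:

@Test
public void testScanBetweenSnapshots() throws IOException {
  List<Row> actual = run(builder.table(table)
      .options(ScanOptions.builder()
          .startSnapshotId(firstSnapshotId)   // hypothetical field holding an earlier snapshot id
          .endSnapshotId(secondSnapshotId)    // hypothetical field holding a later snapshot id
          .build())
      .build());
  Assert.assertEquals(expectedRowsBetween(firstSnapshotId, secondSnapshotId), actual);
}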


/**
* Prune columns from a {@link Schema} using projected fields.
* TODO Why does Spark care about filters?
Member:

As the javadoc says:

The filters list of {@link Expression} is used to ensure that columns referenced by filters are projected.

In my understanding, when doing filter push-down we need to keep the columns involved in a pushed-down filter, even if they are not in the projection column list.

We do not implement the FilterableTableSource interface yet, so I think we don't need to consider it for now.

@JingsongLi (Contributor Author), Aug 14, 2020:

The columns involved in a pushed-down filter must be in the projection column list because, just like Spark:
Spark doesn't support residuals per task, so return all filters to get Spark to handle record-level filtering.
The Flink source also doesn't support residuals per task, so this filtering is left to the Flink planner.

CloseableIterable<RowData> iterable = newIterable(task, idToConstant);
ProjectionRowData projectionRow = new ProjectionRowData();
return (finalProjection == null ? iterable : CloseableIterable.transform(
    iterable, rowData -> (RowData) projectionRow.replace(rowData, finalProjection))).iterator();
Member:

Q: Why do we need to transform the CloseableIterable<RowData> into a projected RowData iterable again? I mean, we've created an Avro iterable with the projected read schema; that should guarantee the iterable only contains the projected columns?

Contributor Author:

You can take a look at FlinkSchemaUtil.pruneWithoutReordering: it just keeps the order from the original schema, but the real Flink projection may change the order. For example, if the table schema is (id, data, ts) and the query projects (ts, id), pruning yields (id, ts) while Flink expects (ts, id), so the extra transform reorders the fields.

@Override
public FlinkInputSplit[] createInputSplits(int minNumSplits) throws IOException {
  // Invoked by Job manager, so it is OK to load table from catalog.
  tableLoader.open(HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration()));
Member:

Q: Are we binding the Iceberg table's Hadoop configuration to the Flink job manager's Hadoop configuration? Is it possible to access an Iceberg table in a Hadoop cluster different from Flink's Hadoop cluster? A more reasonable way seems to be passing a customized SerializableConfiguration from the client, so the job manager could access any Hadoop cluster.

Contributor Author:

I'm OK with passing a configuration to FlinkInputFormat, although there will be some serialization cost.
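
A minimal sketch of such a wrapper, written by hand for illustration (the PR may use an existing utility instead); it relies on Hadoop's Configuration being Writable, so the client-side configuration can be shipped with the InputFormat instead of re-loading GlobalConfiguration on the job manager:

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

import org.apache.hadoop.conf.Configuration;

class SerializableConfiguration implements Serializable {

  private transient Configuration conf;

  SerializableConfiguration(Configuration conf) {
    this.conf = conf;
  }

  Configuration get() {
    return conf;
  }

  private void writeObject(ObjectOutputStream out) throws IOException {
    out.defaultWriteObject();
    conf.write(out); // Hadoop Configuration implements Writable
  }

  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
    in.defaultReadObject();
    this.conf = new Configuration(false);
    conf.readFields(in);
  }
}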

@JingsongLi (Contributor Author):

Hi @openinx, I have created #1346 for the InputFormat and addressed your comments.

@openinx (Member) commented Aug 17, 2020:

Thanks for the update, I will take a look today.

}

@Override
public Optional<TableFactory> getTableFactory() {
Member:

Q: The javadoc says it's deprecated now. When is the time for us to use getFactory in the future?

	 * @deprecated Use {@link #getFactory()} for the new factory stack. The new factory stack uses the
	 *             new table sources and sinks defined in FLIP-95 and a slightly different discovery mechanism.
	 */
	@Deprecated
	default Optional<TableFactory> getTableFactory() {

Contributor Author:

Until the new API is really ready...
In 1.11 and master, the FLIP-95 interfaces still lack many things. I am not sure about Flink 1.12; maybe 1.13 is the time.

* Flink Iceberg table factory to create table source and sink.
* Only works for catalog, can not be loaded from Java SPI (Service Provider Interface).
*/
class FlinkTableFactory implements TableSourceFactory<RowData> {
Member:

Contributor Author:

Yes, should be in this class too.

private final CatalogLoader catalogLoader;
private final Configuration hadoopConf;
private final TableSchema schema;
private final Map<String, String> options;
Member:

How about renaming it to scanOptions? I thought it was an options map of the Iceberg table.

Contributor Author:

It also includes table hints.

public TableSource<RowData> createTableSource(Context context) {
  ObjectIdentifier identifier = context.getObjectIdentifier();
  ObjectPath objectPath = new ObjectPath(identifier.getDatabaseName(), identifier.getObjectName());
  TableIdentifier icebergIdentifier = catalog.toIdentifier(objectPath);
Member:

Nit: the toIdentifier could be a static method in the catalog.

Contributor Author:

No, it cannot, because it needs the baseNamespace information.

Member:

OK, I did not look into toNamespace; sounds good.

try {
  Table table = catalog.getIcebergTable(objectPath);
  // Excludes computed columns
  TableSchema icebergSchema = TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema());
Member:

Q: What if someone queries the Flink table projecting a computed column (which does not exist in the Iceberg table)? Does that work fine in the current version, or will it throw an exception?

Contributor Author:

It does not work.
Even if the Iceberg table supports computed columns in the future, these computed columns will be generated by Flink instead of the Iceberg source. (So this should always be getPhysicalSchema.)

@JingsongLi (Contributor Author):

Thanks all for your help; all sub-PRs have been completed. I'll close this PR.

@JingsongLi closed this on Oct 9, 2020
@JingsongLi deleted the flink_reader branch on November 5, 2020 09:42