
Conversation

@JingsongLi (Contributor) commented Aug 13, 2020

Fixes #1303
Unlike Spark catalog tables (where the Table object is only required on the client/driver side), Flink needs to obtain the Table object in the Job Manager and in Tasks.

So we can introduce a CatalogLoader for the reader and writer; users can define a custom catalog loader in FlinkCatalogFactory.

public interface CatalogLoader extends Serializable {
  Catalog loadCatalog(Configuration hadoopConf);
}
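
For illustration, here is a minimal sketch of what a user-defined loader could look like. The class name, the warehouse-path argument, and the choice of HadoopCatalog are assumptions for the example, not part of this PR:

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.hadoop.HadoopCatalog;

// Hypothetical user-defined loader; class name and warehouse argument are
// illustrative only. The loader is serialized with the job and re-creates
// the catalog on the Job Manager / Task side.
public class WarehouseCatalogLoader implements CatalogLoader {

  private final String warehouseLocation;

  public WarehouseCatalogLoader(String warehouseLocation) {
    this.warehouseLocation = warehouseLocation;
  }

  @Override
  public Catalog loadCatalog(Configuration hadoopConf) {
    return new HadoopCatalog(hadoopConf, warehouseLocation);
  }
}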

To support Hadoop tables based on a location, we also introduce a TableLoader that covers both catalog tables and Hadoop location-based tables.
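
A rough sketch of the TableLoader shape, assuming the same serializable-loader pattern; the loadTable signature and the static factory shown here are illustrative, and the merged API may differ:

import java.io.Serializable;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;

// Sketch only: a serializable loader that can produce a Table on the
// Job Manager / Task side, either from a catalog or from a Hadoop location.
public interface TableLoader extends Serializable {
  Table loadTable(Configuration hadoopConf);

  // Assumed factory for a Hadoop location-based table, mirroring the
  // TableLoader.fromHadoopTable name mentioned later in this thread.
  static TableLoader fromHadoopTable(String location) {
    return hadoopConf -> new HadoopTables(hadoopConf).load(location);
  }
}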

Q: Can/should we move CatalogLoader and TableLoader into an Iceberg common module?

Comment on lines +75 to +76
private final CatalogLoader catalogLoader;
private final Configuration hadoopConf;
Member commented:

These two don't seem to need to be private members, because I don't see them accessed anywhere except the constructor.

this.originalCatalog = icebergCatalog;
this.icebergCatalog = cacheEnabled ? CachingCatalog.wrap(icebergCatalog) : icebergCatalog;
this.hadoopConf = hadoopConf;
this.originalCatalog = catalogLoader.loadCatalog(hadoopConf);
Member commented:

Is it possible to track only one Catalog in this FlinkCatalog class? For example, we could keep only icebergCatalog as a member, which would make the code much easier to follow (having two members here confused me at times). When closing the catalog, we could make CachingCatalog implement the Closeable interface.

Contributor Author (@JingsongLi) commented:

CachingCatalog is just a wrapper around the original catalog, so I think it is better not to make it implement the Closeable interface.
One way to solve this is to keep a Closeable as a class member instead of the original Catalog:
closeable = originalCatalog instanceof Closeable ? (Closeable) originalCatalog : null;
What do you think?
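
For illustration, a minimal sketch of that idea as a small holder class; the class name, constructor shape, and accessor are assumptions for the example, not the actual FlinkCatalog code:

import java.io.Closeable;
import java.io.IOException;
import org.apache.iceberg.CachingCatalog;
import org.apache.iceberg.catalog.Catalog;

// Sketch only: track the (possibly cache-wrapped) catalog plus a Closeable
// handle to the original catalog, instead of keeping two Catalog members.
public class CatalogHandle implements Closeable {

  private final Catalog icebergCatalog;  // CachingCatalog wrapper when caching is enabled
  private final Closeable closeable;     // original catalog, if it is closeable

  public CatalogHandle(Catalog originalCatalog, boolean cacheEnabled) {
    this.icebergCatalog = cacheEnabled ? CachingCatalog.wrap(originalCatalog) : originalCatalog;
    this.closeable = originalCatalog instanceof Closeable ? (Closeable) originalCatalog : null;
  }

  public Catalog catalog() {
    return icebergCatalog;
  }

  @Override
  public void close() throws IOException {
    if (closeable != null) {
      closeable.close();
    }
  }
}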

Member commented:

Makes sense.

return new HadoopTableLoader(location);
}

class HadoopTableLoader implements TableLoader {
Contributor commented:

Do you really need this? Isn't this already covered by the CatalogTableLoader using Hadoop?

Contributor Author (@JingsongLi) commented:

I saw #1306; I don't know if I misunderstood something.
It seems that a simple path makes this easier for users to use.

@edgarRd (Contributor) commented Aug 20, 2020:

Right, I think part of the discussion is that providing a table via this API does not guarantee atomic commits on table changes. That's why we have HiveCatalog for tables backed by systems like S3, while HadoopCatalog is for HDFS and does not need HMS coordination, since the filesystem itself has atomic renames.

I'm not sure which case this specific Flink implementation will use, but as mentioned in #1306 (comment), it seems dangerous to provide tables loaded with HadoopTables if there's no awareness of the underlying limitations.

@JingsongLi (Contributor Author) commented Aug 20, 2020:

Yes, there are comments in HadoopCatalog: the HadoopCatalog requires that the underlying file system supports atomic rename, and the same applies to HadoopTables.
But I think if users use HadoopTables, HadoopCatalog, or TableLoader.fromHadoopTable, they should know that a file system with atomic rename is required, which rules out S3 and the like.

#1306 should be discussing how to support locations in the SQL layer.
For Flink, there are two kinds of users: DataStream users and SQL users.

So here are my thoughts:

  • Just like with Spark and MapReduce, a Flink DataStream programmer can simply specify a Hadoop location path for reading and writing, or use an Iceberg Catalog (see the usage sketch after this list).
  • But a Flink SQL user must specify an Iceberg Catalog to load tables.
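
To make the DataStream option concrete, here is a hedged usage fragment. TableLoader.fromHadoopTable is named earlier in this thread; the Configuration-based loadTable call and WarehouseCatalogLoader refer to the hypothetical sketches above, not the merged API.

// Fragment only; relies on the hypothetical sketches earlier in this thread.
Configuration hadoopConf = new Configuration();

// DataStream option 1: a plain Hadoop location path
// (requires a file system with atomic rename, so not S3).
TableLoader loader = TableLoader.fromHadoopTable("hdfs://namenode:8020/warehouse/db/table");
Table table = loader.loadTable(hadoopConf);

// DataStream option 2: load through an Iceberg catalog instead, by supplying a
// serializable CatalogLoader (e.g. the hypothetical WarehouseCatalogLoader above)
// to the Flink source/sink, which resolves the table on the cluster side.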

@rdblue rdblue merged commit e2d0d73 into apache:master Aug 20, 2020
@rdblue (Contributor) commented Aug 20, 2020:

Merged! Thanks @JingsongLi!

I think this is a reasonable way to load tables in tasks that need them. We should document how to supply your own catalog, though; I think it is common for people to override it.

@JingsongLi JingsongLi deleted the loader branch November 5, 2020 09:42
Linked issue: Flink: Introduce IcebergCatalogLoader for loading table at runtime