Flink: Integrate Flink reader to SQL #1509
Conversation
flink/src/main/java/org/apache/iceberg/flink/TypeToFlinkType.java (outdated; resolved)
// produce another timestamp
- Thread.sleep(10);
+ waitUntilAfter(10);
This should be called with timestampMillis because you want to wait until it is strictly after the current snapshot's timestamp.
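A minimal sketch of the suggested call, with an assumed busy-wait helper (the helper body is illustrative, not the exact code in this test):

// Wait strictly past the current snapshot's commit time so the next
// snapshot is guaranteed a later timestamp.
long timestampMillis = table.currentSnapshot().timestampMillis();
waitUntilAfter(timestampMillis);

// Assumed helper shape: spin until the clock is strictly after the target.
private static void waitUntilAfter(long timestampMillis) {
  while (System.currentTimeMillis() <= timestampMillis) {
    // busy-wait; acceptable in a test
  }
}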
Yes, you are right.
Mostly looks good. Just a couple of minor issues.
Got it, I think I can change the …
flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java (outdated; resolved)
public Builder filters(List<Expression> newFilters) {
  this.filterExpressions = newFilters;
  ...
}

public Builder hadoopConf(Configuration newConf) {
We try to avoid using Hadoop classes in Iceberg APIs because they are hard to remove. Injecting a Hadoop Configuration is currently done in one place: to instantiate a catalog that requires it.
The catalog creates tables, and tables are associated with a FileIO, so the Configuration is passed down through that chain. The table's Configuration should be used for any table-level configuration.
MR also has a Hadoop Configuration, but it's required by the MR API, so we can't remove it there. Still, we prefer using the table's Configuration when one is needed for components like HadoopFileIO.
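For illustration, a minimal sketch of that chain using a HadoopCatalog (the warehouse path and table name are made up):

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.io.FileIO;

// The Configuration is injected exactly once, at catalog creation...
Configuration conf = new Configuration();
Catalog catalog = new HadoopCatalog(conf, "hdfs://nn:8020/warehouse");

// ...and flows down through catalog -> table -> FileIO, so no Hadoop
// types appear in Iceberg's public API past this point.
Table table = catalog.loadTable(TableIdentifier.of("db", "tbl"));
FileIO io = table.io();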
+1 for avoiding using Hadoop Configuration.
The reason the Hadoop conf needs to be passed right now:

- The JobManager needs the splits of the scan, so it needs to call Table.newScan.
- So the JobManager needs a Table object. Where can a table come from? TableLoader -> fromCatalog or HadoopTables.
- Creating a Catalog or HadoopTables needs a Hadoop Configuration.

Maybe we can pass this chain with an Iceberg object instead (FileIO may not fit well for creating a catalog)?
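A sketch of that chain as it currently works (assuming the TableLoader entry points in the Flink module; treat the exact signatures as illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;

// JobManager side: a Hadoop Configuration is needed just to obtain a
// Table handle, which is then used for split planning.
Configuration hadoopConf = new Configuration();
TableLoader loader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/db/tbl", hadoopConf);
loader.open();
Table table = loader.loadTable();
table.newScan().planTasks();  // the splits the JobManager needs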
Maybe CatalogLoader should serialize its own Configuration? It would make sense to pass one when creating a CatalogLoader for HadoopCatalog or HiveCatalog because those need a Configuration to create the catalog.
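Roughly, that could look like the following (the loader class here is hypothetical; SerializableConfiguration stands in for whatever wrapper is used, since Hadoop's Configuration is not Serializable):

import java.io.Serializable;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.hadoop.SerializableConfiguration;

// Hypothetical loader that owns and ships its own Configuration, so the
// source and sink builders never accept Hadoop types directly.
public class HadoopCatalogLoader implements Serializable {
  private final SerializableConfiguration hadoopConf;  // serializable wrapper
  private final String warehouseLocation;

  public HadoopCatalogLoader(Configuration conf, String warehouseLocation) {
    this.hadoopConf = new SerializableConfiguration(conf);
    this.warehouseLocation = warehouseLocation;
  }

  public Catalog loadCatalog() {
    // the Configuration travels to the cluster with the loader
    return new HadoopCatalog(hadoopConf.get(), warehouseLocation);
  }
}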
Or maybe this could use the approach from the FlinkCatalogFactory:
public static Configuration clusterHadoopConf() {
  return HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration());
}

Using clusterHadoopConf internally here would avoid the need to expose this in the API, and we could add Configuration to the loader later.
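For example, the builder could fall back to it internally when the caller supplies no Configuration (a hypothetical internal default, not code from this PR):

// Hypothetical internal default in the source builder:
Configuration conf = hadoopConf != null
    ? hadoopConf
    : FlinkCatalogFactory.clusterHadoopConf();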
Using clusterHadoopConf may not be flexible enough.
I'm +1 for having CatalogLoader serialize its own Configuration. I will create a PR for the source and sink later.
flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java (resolved)
Force-pushed from a41dd8f to 8e1b03f
Looks good, @JingsongLi! I'd like to remove the Hadoop …
This is a subtask of #1293.
It also fixes the remaining review comments from #1346.