Spark 3.4: Allow control locality enabled on reading through session conf #7733
Conversation
The change should be straightforward; no test is added because I couldn't find an existing test for locality in the Spark module.
If #7732 is merged, is this new setting
This is needed. |
Ah, yeah. The options are not effective for tables loaded through a catalog. But it may bring some confusion to users, such as: I was thinking maybe we could inject a new
The division is not quite right; the key point here is whether the table is loaded through TableProvider or CatalogPlugin.
Assume
In my experience, allowing the user to control some behaviors by using
Thanks for the detailed explanation. By
Yes, it gives users great flexibility.
If #7732 is merged, then would all the options in SparkReadOptions and SparkWriteOptions be exposed to the session conf?
I don't think there are any limitations that prevent overriding options using SQL syntax; it's just that Spark doesn't pass any options when loading a table through
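For contrast, here is a minimal sketch (not code from this PR) of the per-read option path discussed above. It assumes the table is loaded by path through TableProvider, that SparkReadOptions.LOCALITY resolves to the string "locality", and the table path is hypothetical:
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class LocalityReadOptionExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().getOrCreate();
    // Per-read option; effective here because loading by path goes through
    // TableProvider, which does receive the options (unlike catalog loads).
    Dataset<Row> df = spark.read()
        .format("iceberg")
        .option("locality", "false") // SparkReadOptions.LOCALITY
        .load("hdfs://nn:8020/warehouse/db/tbl"); // hypothetical path
    df.show();
  }
}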
wypoon
left a comment
This is indeed a useful configuration to have. We have also found a need for it when using Iceberg on HDFS, where locality is enabled by default.
I'd like to see it in spark/v3.3 as well.
return confParser
    .booleanConf()
    .option(SparkReadOptions.LOCALITY)
    .sessionConf(SparkSQLProperties.LOCALITY_ENABLED)
    .defaultValue(defaultValue)
    .parse();
On further thought, I think it would be better to do:
if (defaultValue) {
  return confParser
      .booleanConf()
      .option(SparkReadOptions.LOCALITY)
      .sessionConf(SparkSQLProperties.LOCALITY_ENABLED)
      .defaultValue(true)
      .parse();
} else {
  return false;
}
If defaultValue is false, then locality should be disabled, regardless of the option or session conf, as block locations will not be available.
Is there any issue with passing through "true" when that has no effect?
I would probably rename "defaultValue" to "hasBlockLocations" or "canReadLocal" or something
If SparkReadConf#localityEnabled returns true even though the table is not in HDFS, then Iceberg will try to get the block locations. I am not sure about this since I haven't tested it, but I think the HDFS call could throw an IOException and Iceberg would rethrow it. That would be a bad thing.
Actually, maybe it wouldn't throw an exception. But the default implementation of FileSystem#getFileBlockLocations does
String[] name = new String[]{"localhost:50010"};
String[] host = new String[]{"localhost"};
return new BlockLocation[]{new BlockLocation(name, host, 0L, file.getLen())};
and from that, we get a String[] of the hosts, and that probably won't be too helpful to Spark.
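A small hedged sketch (not code from Iceberg or this PR) of how hosts might be collected from those block locations; FileSystem, FileStatus, and BlockLocation are the standard Hadoop API types:
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;

public class BlockHostsSketch {
  // Collects the distinct hosts reported for a file's blocks.
  static Set<String> blockHosts(FileSystem fs, FileStatus file) throws IOException {
    BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, file.getLen());
    Set<String> hosts = new HashSet<>();
    for (BlockLocation block : blocks) {
      // With the default FileSystem implementation above, this is just "localhost".
      Collections.addAll(hosts, block.getHosts());
    }
    return hosts;
  }
}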
+1 to hasBlockLocations.
// Controls whether vectorized reads are enabled
public static final String VECTORIZATION_ENABLED = "spark.sql.iceberg.vectorization.enabled";

// Controls whether locality reads are enabled
Can we add a note that this is only for Hadoop FileIO?
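A minimal sketch of what that note could look like, assuming the constant is SparkSQLProperties.LOCALITY_ENABLED as used elsewhere in this diff:
// Controls whether locality reads are enabled
// (only takes effect for tables backed by Hadoop FileIO, e.g. on HDFS)
public static final String LOCALITY_ENABLED = "spark.sql.iceberg.locality.enabled";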
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that's incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.
This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.
This PR aims to allow users to control whether locality is enabled on reads through the session conf spark.sql.iceberg.locality.enabled. Previously, it was enabled by default for HDFS and could be disabled by setting a read option, but that is not friendly for SQL cases. As described in #2577, the locality calculation may put significant pressure on the NameNode; after this PR, users can conveniently disable it by setting spark.sql.iceberg.locality.enabled=false.
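A minimal usage sketch of the session-level switch, assuming a running SparkSession; the table name is hypothetical:
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DisableLocalityExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().getOrCreate();
    // Disable locality for all Iceberg reads in this session (the new conf key).
    spark.conf().set("spark.sql.iceberg.locality.enabled", "false");
    // Plain SQL reads now pick up the session conf; the table name is hypothetical.
    Dataset<Row> df = spark.sql("SELECT * FROM db.tbl");
    df.show();
  }
}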