# [SPARK-35531][SQL] Update hive table stats without unnecessary convert #38495
## Conversation
Force-pushed from d616d85 to 963bca9.
Retest this please

@cloud-fan @AngersZhuuuu Could you help review this PR? Another PR, #38496, depends on this one.
Why did the stats change after this PR?
Before this PR, when updating table stats, Spark converted the CatalogTable into a Hive table. After this PR, Spark fetches the Hive table from the Hive metastore and updates the stats on it directly. Refer to https://github.com/apache/spark/pull/38495/files#diff-45c9b065d76b237bcfecda83b8ee08c1ff6592d6f85acca09c0fa01472e056afR616-R618
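For illustration, a minimal sketch of the two paths, assuming the `getRawHiveTable`/`alterTableProps` API proposed in this PR (`updatedCatalogTable` and `newStatsProps` are placeholders, not names from the diff):

```scala
// Before: rebuild a HiveTable from the CatalogTable and alter it; the rebuilt
// table carries the lower-cased schema next to the original bucket columns.
client.alterTable(db, table, updatedCatalogTable)

// After: fetch the table object Hive already has and only touch its parameters.
val rawHiveTable = client.getRawHiveTable(db, table) // schema/bucket columns stay as stored
client.alterTableProps(db, table, newStatsProps)     // persist stats without re-conversion
```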
```scala
def alterTableStats(dbName: String, tableName: String, parameters: Map[String, String]): Unit
def alterTableProps(dbName: String, tableName: String, newProps: Map[String, String]): Unit
```
done
We can just call `getRawHiveTable`.
done
what are we doing here?
Reuse this UT to test the table-stats update code path.
Does it test anything? It just invokes `alterTableStats` but does no verification.
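A hedged sketch of what a verifying assertion could look like here, using the `alterTableStats` signature quoted above (database, table, and property key are illustrative, not the suite's exact code):

```scala
// Hypothetical verification: write a stats property, read the table back,
// and check the property actually reached the metastore.
client.alterTableStats("default", "t", Map("spark.sql.statistics.totalSize" -> "100"))
val props = client.getTable("default", "t").properties
assert(props.get("spark.sql.statistics.totalSize").contains("100"))
```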
Remove this change
We can call `client.getRawHiveTable`, see 0942ea9.
done
If we call `client.getRawHiveTable` here, it throws `java.lang.LinkageError: loader constraint violation: loader (instance of sun/misc/Launcher$AppClassLoader) previously initiated loading for a different type with name "org/apache/hadoop/hive/ql/metadata/Table"` — presumably because `HiveClientImpl` loads Hive classes through an isolated classloader, so the `Table` class it returns clashes with the one the application classloader has already loaded.
Detailed stack trace:
```
[info] org.apache.spark.sql.hive.execution.command.AlterTableDropPartitionSuite *** ABORTED *** (18 seconds, 552 milliseconds)
[info] java.lang.LinkageError: loader constraint violation: loader (instance of sun/misc/Launcher$AppClassLoader) previously initiated loading for a different type with name "org/apache/hadoop/hive/ql/metadata/Table"
[info] at java.lang.ClassLoader.defineClass1(Native Method)
[info] at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
[info] at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
[info] at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
[info] at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
[info] at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
[info] at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
[info] at java.security.AccessController.doPrivileged(Native Method)
[info] at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
[info] at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
[info] at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
[info] at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
[info] at org.apache.spark.sql.hive.client.HiveClientImpl$.toHiveTable(HiveClientImpl.scala:1115)
[info] at org.apache.spark.sql.hive.execution.V1WritesHiveUtils.getDynamicPartitionColumns(V1WritesHiveUtils.scala:51)
[info] at org.apache.spark.sql.hive.execution.V1WritesHiveUtils.getDynamicPartitionColumns$(V1WritesHiveUtils.scala:43)
[info] at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.getDynamicPartitionColumns(InsertIntoHiveTable.scala:70)
[info] at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.partitionColumns$lzycompute(InsertIntoHiveTable.scala:80)
[info] at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.partitionColumns(InsertIntoHiveTable.scala:79)
[info] at org.apache.spark.sql.execution.datasources.V1Writes$.org$apache$spark$sql$execution$datasources$V1Writes$$prepareQuery(V1Writes.scala:75)
[info] at org.apache.spark.sql.execution.datasources.V1Writes$$anonfun$apply$1.applyOrElse(V1Writes.scala:57)
[info] at org.apache.spark.sql.execution.datasources.V1Writes$$anonfun$apply$1.applyOrElse(V1Writes.scala:55)
[info] at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
[info] at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
[info] at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
[info] at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
[info] at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
[info] at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
[info] at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
[info] at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
[info] at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
[info] at org.apache.spark.sql.execution.datasources.V1Writes$.apply(V1Writes.scala:55)
```
This method should take a `RawHiveTable`, so that we don't need to look up the table here.
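In code, the suggestion reads roughly as follows; the second signature is one interpretation of the comment, not necessarily the merged one:

```scala
// Current shape: the client has to look the table up again by name.
def alterTableProps(dbName: String, tableName: String, newProps: Map[String, String]): Unit

// Suggested shape: the caller passes the RawHiveTable it already fetched.
def alterTableProps(rawHiveTable: RawHiveTable, newProps: Map[String, String]): Unit
```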
Force-pushed from f69acca to 9108792.
Retest this please
It's a bit tricky to make `HiveClient` handle this `STATISTICS_PREFIX`. It should be the responsibility of `HiveExternalCatalog`; `HiveClient` should only take care of the communication with the HMS.
Force-pushed from acb8a95 to be0c869.
```diff
- // convert table statistics to properties so that we can persist them through hive client
- val statsProperties =
+ val rawHiveTable = client.getRawHiveTable(db, table)
+ val oldProps = client.hiveTableProps(rawHiveTable)
```
can you explain the rationale?
Force-pushed from 6486103 to 3115a62.
```diff
  val oldProps =
    client.hiveTableProps(rawHiveTable, containsStats = false)
      .filterKeys(!_.startsWith(STATISTICS_PREFIX))
  val newProps =
    if (stats.isDefined) {
-     statsToProperties(stats.get)
+     oldProps ++ statsToProperties(stats.get)
    } else {
-     new mutable.HashMap[String, String]()
+     oldProps
    }
```
- Get all of the old properties from the Hive table, excluding Hive's own table stats (`HiveStatisticsProperties` is defined in `HiveClientImpl`, so this happens in `HiveClientImpl`).
- Filter out Spark SQL's table and column stats properties (`STATISTICS_PREFIX` is defined in `HiveExternalCatalog`, so this happens in `HiveExternalCatalog`).
- Add the new table stats and then save the new table; see the sketch after this list.
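A minimal sketch of the three steps together inside `alterTableStats(db, table, stats)`, assuming `alterTableProps` takes the `RawHiveTable` as suggested earlier (simplified from the diff above, not the exact merged code):

```scala
val rawHiveTable = client.getRawHiveTable(db, table)
// Step 1 happens inside HiveClientImpl: containsStats = false drops Hive's own
// stats keys (HiveStatisticsProperties, e.g. totalSize, numRows).
// Step 2: additionally drop Spark's stats properties, keyed by STATISTICS_PREFIX.
val oldProps = client.hiveTableProps(rawHiveTable, containsStats = false)
  .filterKeys(!_.startsWith(STATISTICS_PREFIX))
// Step 3: merge in the new stats and persist on the raw table, no re-conversion.
val newProps = stats.map(s => oldProps ++ statsToProperties(s)).getOrElse(oldProps)
client.alterTableProps(rawHiveTable, newProps.toMap)
```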
```scala
def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit

/** Get hive table properties. */
def hiveTableProps(rawHiveTable: RawHiveTable, containsStats: Boolean): Map[String, String]
```
Shall we add a method in `RawHiveTable` to do it?
```scala
private class RawHiveTableImpl(override val rawTable: HiveTable) extends RawHiveTable {
  override lazy val toCatalogTable = convertHiveTableToCatalogTable(rawTable)

  override def hiveTableProps(containsStats: Boolean): Map[String, String] = {
```
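A hedged completion of the quoted method, assuming it reuses the existing `HiveStatisticsProperties` set in `HiveClientImpl` (the merged body may differ):

```scala
import scala.collection.JavaConverters._

override def hiveTableProps(containsStats: Boolean): Map[String, String] = {
  // rawTable.getParameters is a java.util.Map of the table's HMS parameters.
  val allProps = rawTable.getParameters.asScala.toMap
  if (containsStats) allProps
  // Otherwise strip Hive's own stats keys (e.g. totalSize, numRows).
  else allProps.filterKeys(!HiveStatisticsProperties.contains(_)).toMap
}
```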
why do we need this parameter?
I'm not sure whether we might need all of the Hive table properties in some other places.
Force-pushed from 73be616 to bad2444.
Thanks, merging to master!
### What changes were proposed in this pull request?

Update Hive table stats without converting HiveTable -> CatalogTable -> HiveTable.

`HiveExternalCatalog.alterTableStats()` converts the raw HiveTable into a CatalogTable, which stores the schema in lowercase while keeping the bucket columns as they are. `HiveClientImpl.alterTable()` then throws a `Bucket columns V1 is not part of the table columns` exception when re-converting that CatalogTable back into a HiveTable.

### Why are the changes needed?
Bug fix, refer to #32675 (comment)
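A hedged repro sketch of the failure mode, pieced together from the description and SPARK-35531 (table and column names are illustrative):

```scala
// A Hive bucketed table whose bucket column keeps its original upper case.
spark.sql("CREATE TABLE t (V1 INT, V2 INT) CLUSTERED BY (V1) INTO 2 BUCKETS STORED AS PARQUET")
spark.sql("INSERT INTO t VALUES (1, 2)")
// Before this fix, updating stats round-tripped CatalogTable -> HiveTable:
// the schema came back lower-cased ("v1") while the bucket column stayed "V1",
// so Hive rejected the altered table with
// "Bucket columns V1 is not part of the table columns".
spark.sql("ANALYZE TABLE t COMPUTE STATISTICS")
```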
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Updated an existing UT.