Skip to content

Commit 480a92b

Browse files
committed
Merge remote-tracking branch 'upstream/master' into SPARK-33848
2 parents 426b75d + df2314b commit 480a92b

File tree

22 files changed

+661
-320
lines changed

22 files changed

+661
-320
lines changed

dev/deps/spark-deps-hadoop-2.7-hive-2.3

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,4 +242,4 @@ xmlenc/0.52//xmlenc-0.52.jar
242242
xz/1.5//xz-1.5.jar
243243
zjsonpatch/0.3.0//zjsonpatch-0.3.0.jar
244244
zookeeper/3.4.14//zookeeper-3.4.14.jar
245-
zstd-jni/1.4.5-6//zstd-jni-1.4.5-6.jar
245+
zstd-jni/1.4.8-1//zstd-jni-1.4.8-1.jar

dev/deps/spark-deps-hadoop-3.2-hive-2.3

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -257,4 +257,4 @@ xbean-asm7-shaded/4.15//xbean-asm7-shaded-4.15.jar
257257
xz/1.5//xz-1.5.jar
258258
zjsonpatch/0.3.0//zjsonpatch-0.3.0.jar
259259
zookeeper/3.4.14//zookeeper-3.4.14.jar
260-
zstd-jni/1.4.5-6//zstd-jni-1.4.5-6.jar
260+
zstd-jni/1.4.8-1//zstd-jni-1.4.8-1.jar

mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala

Lines changed: 33 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import scala.util.{Sorting, Try}
2727
import scala.util.hashing.byteswap64
2828

2929
import com.github.fommil.netlib.BLAS.{getInstance => blas}
30+
import com.google.common.collect.{Ordering => GuavaOrdering}
3031
import org.apache.hadoop.fs.Path
3132
import org.json4s.DefaultFormats
3233
import org.json4s.JsonDSL._
@@ -47,7 +48,7 @@ import org.apache.spark.sql.{DataFrame, Dataset}
4748
import org.apache.spark.sql.functions._
4849
import org.apache.spark.sql.types._
4950
import org.apache.spark.storage.StorageLevel
50-
import org.apache.spark.util.{BoundedPriorityQueue, Utils}
51+
import org.apache.spark.util.Utils
5152
import org.apache.spark.util.collection.{OpenHashMap, OpenHashSet, SortDataFormat, Sorter}
5253
import org.apache.spark.util.random.XORShiftRandom
5354

@@ -456,30 +457,39 @@ class ALSModel private[ml] (
456457
num: Int,
457458
blockSize: Int): DataFrame = {
458459
import srcFactors.sparkSession.implicits._
460+
import scala.collection.JavaConverters._
459461

460462
val srcFactorsBlocked = blockify(srcFactors.as[(Int, Array[Float])], blockSize)
461463
val dstFactorsBlocked = blockify(dstFactors.as[(Int, Array[Float])], blockSize)
462464
val ratings = srcFactorsBlocked.crossJoin(dstFactorsBlocked)
463-
.as[(Seq[(Int, Array[Float])], Seq[(Int, Array[Float])])]
464-
.flatMap { case (srcIter, dstIter) =>
465-
val m = srcIter.size
466-
val n = math.min(dstIter.size, num)
467-
val output = new Array[(Int, Int, Float)](m * n)
468-
var i = 0
469-
val pq = new BoundedPriorityQueue[(Int, Float)](num)(Ordering.by(_._2))
470-
srcIter.foreach { case (srcId, srcFactor) =>
471-
dstIter.foreach { case (dstId, dstFactor) =>
472-
// We use F2jBLAS which is faster than a call to native BLAS for vector dot product
473-
val score = BLAS.f2jBLAS.sdot(rank, srcFactor, 1, dstFactor, 1)
474-
pq += dstId -> score
465+
.as[(Array[Int], Array[Float], Array[Int], Array[Float])]
466+
.mapPartitions { iter =>
467+
var scores: Array[Float] = null
468+
var idxOrd: GuavaOrdering[Int] = null
469+
iter.flatMap { case (srcIds, srcMat, dstIds, dstMat) =>
470+
require(srcMat.length == srcIds.length * rank)
471+
require(dstMat.length == dstIds.length * rank)
472+
val m = srcIds.length
473+
val n = dstIds.length
474+
if (scores == null || scores.length < n) {
475+
scores = Array.ofDim[Float](n)
476+
idxOrd = new GuavaOrdering[Int] {
477+
override def compare(left: Int, right: Int): Int = {
478+
Ordering[Float].compare(scores(left), scores(right))
479+
}
480+
}
475481
}
476-
pq.foreach { case (dstId, score) =>
477-
output(i) = (srcId, dstId, score)
478-
i += 1
482+
483+
Iterator.range(0, m).flatMap { i =>
484+
// buffer = i-th vec in srcMat * dstMat
485+
BLAS.f2jBLAS.sgemv("T", rank, n, 1.0F, dstMat, 0, rank,
486+
srcMat, i * rank, 1, 0.0F, scores, 0, 1)
487+
488+
val srcId = srcIds(i)
489+
idxOrd.greatestOf(Iterator.range(0, n).asJava, num).asScala
490+
.iterator.map { j => (srcId, dstIds(j), scores(j)) }
479491
}
480-
pq.clear()
481492
}
482-
output.toSeq
483493
}
484494
// We'll force the IDs to be Int. Unfortunately this converts IDs to Int in the output.
485495
val topKAggregator = new TopByKeyAggregator[Int, Int, Float](num, Ordering.by(_._2))
@@ -499,9 +509,12 @@ class ALSModel private[ml] (
499509
*/
500510
private def blockify(
501511
factors: Dataset[(Int, Array[Float])],
502-
blockSize: Int): Dataset[Seq[(Int, Array[Float])]] = {
512+
blockSize: Int): Dataset[(Array[Int], Array[Float])] = {
503513
import factors.sparkSession.implicits._
504-
factors.mapPartitions(_.grouped(blockSize))
514+
factors.mapPartitions { iter =>
515+
iter.grouped(blockSize)
516+
.map(block => (block.map(_._1).toArray, block.flatMap(_._2).toArray))
517+
}
505518
}
506519

507520
}

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -695,7 +695,7 @@
695695
<dependency>
696696
<groupId>com.github.luben</groupId>
697697
<artifactId>zstd-jni</artifactId>
698-
<version>1.4.5-6</version>
698+
<version>1.4.8-1</version>
699699
</dependency>
700700
<dependency>
701701
<groupId>com.clearspring.analytics</groupId>

project/SparkBuild.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import java.util.Locale
2323
import scala.io.Source
2424
import scala.util.Properties
2525
import scala.collection.JavaConverters._
26-
import scala.collection.mutable.Stack
26+
import scala.collection.mutable.ListBuffer
2727

2828
import sbt._
2929
import sbt.Classpaths.publishTask
@@ -1109,14 +1109,14 @@ object TestSettings {
11091109
// Because File.mkdirs() can fail if multiple callers are trying to create the same
11101110
// parent directory, this code tries to create parents one at a time, and avoids
11111111
// failures when the directories have been created by somebody else.
1112-
val stack = new Stack[File]()
1112+
val stack = new ListBuffer[File]()
11131113
while (!dir.isDirectory()) {
1114-
stack.push(dir)
1114+
stack.prepend(dir)
11151115
dir = dir.getParentFile()
11161116
}
11171117

11181118
while (stack.nonEmpty) {
1119-
val d = stack.pop()
1119+
val d = stack.remove(0)
11201120
require(d.mkdir() || d.isDirectory(), s"Failed to create directory $d")
11211121
}
11221122
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -472,6 +472,51 @@ object CatalogTable {
472472

473473
val VIEW_REFERRED_TEMP_VIEW_NAMES = VIEW_PREFIX + "referredTempViewNames"
474474
val VIEW_REFERRED_TEMP_FUNCTION_NAMES = VIEW_PREFIX + "referredTempFunctionsNames"
475+
476+
def splitLargeTableProp(
477+
key: String,
478+
value: String,
479+
addProp: (String, String) => Unit,
480+
defaultThreshold: Int): Unit = {
481+
val threshold = SQLConf.get.getConf(SQLConf.HIVE_TABLE_PROPERTY_LENGTH_THRESHOLD)
482+
.getOrElse(defaultThreshold)
483+
if (value.length <= threshold) {
484+
addProp(key, value)
485+
} else {
486+
val parts = value.grouped(threshold).toSeq
487+
addProp(s"$key.numParts", parts.length.toString)
488+
parts.zipWithIndex.foreach { case (part, index) =>
489+
addProp(s"$key.part.$index", part)
490+
}
491+
}
492+
}
493+
494+
def readLargeTableProp(props: Map[String, String], key: String): Option[String] = {
495+
props.get(key).orElse {
496+
if (props.filterKeys(_.startsWith(key)).isEmpty) {
497+
None
498+
} else {
499+
val numParts = props.get(s"$key.numParts")
500+
val errorMessage = s"Cannot read table property '$key' as it's corrupted."
501+
if (numParts.isEmpty) {
502+
throw new AnalysisException(errorMessage)
503+
} else {
504+
val parts = (0 until numParts.get.toInt).map { index =>
505+
props.getOrElse(s"$key.part.$index", {
506+
throw new AnalysisException(
507+
s"$errorMessage Missing part $index, ${numParts.get} parts are expected.")
508+
})
509+
}
510+
Some(parts.mkString)
511+
}
512+
}
513+
}
514+
}
515+
516+
def isLargeTableProp(originalKey: String, propKey: String): Boolean = {
517+
propKey == originalKey || propKey == s"$originalKey.numParts" ||
518+
propKey.startsWith(s"$originalKey.part.")
519+
}
475520
}
476521

477522
/**
@@ -546,7 +591,11 @@ case class CatalogColumnStat(
546591
min.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_MIN_VALUE}", v) }
547592
max.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_MAX_VALUE}", v) }
548593
histogram.foreach { h =>
549-
map.put(s"${colName}.${CatalogColumnStat.KEY_HISTOGRAM}", HistogramSerializer.serialize(h))
594+
CatalogTable.splitLargeTableProp(
595+
s"$colName.${CatalogColumnStat.KEY_HISTOGRAM}",
596+
HistogramSerializer.serialize(h),
597+
map.put,
598+
4000)
550599
}
551600
map.toMap
552601
}
@@ -650,7 +699,8 @@ object CatalogColumnStat extends Logging {
650699
nullCount = map.get(s"${colName}.${KEY_NULL_COUNT}").map(v => BigInt(v.toLong)),
651700
avgLen = map.get(s"${colName}.${KEY_AVG_LEN}").map(_.toLong),
652701
maxLen = map.get(s"${colName}.${KEY_MAX_LEN}").map(_.toLong),
653-
histogram = map.get(s"${colName}.${KEY_HISTOGRAM}").map(HistogramSerializer.deserialize),
702+
histogram = CatalogTable.readLargeTableProp(map, s"$colName.$KEY_HISTOGRAM")
703+
.map(HistogramSerializer.deserialize),
654704
version = map(s"${colName}.${KEY_VERSION}").toInt
655705
))
656706
} catch {

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -905,6 +905,16 @@ object SQLConf {
905905
.checkValues(HiveCaseSensitiveInferenceMode.values.map(_.toString))
906906
.createWithDefault(HiveCaseSensitiveInferenceMode.NEVER_INFER.toString)
907907

908+
val HIVE_TABLE_PROPERTY_LENGTH_THRESHOLD =
909+
buildConf("spark.sql.hive.tablePropertyLengthThreshold")
910+
.internal()
911+
.doc("The maximum length allowed in a single cell when storing Spark-specific information " +
912+
"in Hive's metastore as table properties. Currently it covers 2 things: the schema's " +
913+
"JSON string, the histogram of column statistics.")
914+
.version("3.2.0")
915+
.intConf
916+
.createOptional
917+
908918
val OPTIMIZER_METADATA_ONLY = buildConf("spark.sql.optimizer.metadataOnly")
909919
.internal()
910920
.doc("When true, enable the metadata-only query optimization that use the table's metadata " +
@@ -3052,7 +3062,9 @@ object SQLConf {
30523062
"Avoid to depend on this optimization to prevent a potential correctness issue. " +
30533063
"If you must use, use 'SparkSessionExtensions' instead to inject it as a custom rule."),
30543064
DeprecatedConfig(CONVERT_CTAS.key, "3.1",
3055-
s"Set '${LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT.key}' to false instead.")
3065+
s"Set '${LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT.key}' to false instead."),
3066+
DeprecatedConfig("spark.sql.sources.schemaStringLengthThreshold", "3.2",
3067+
s"Use '${HIVE_TABLE_PROPERTY_LENGTH_THRESHOLD.key}' instead.")
30563068
)
30573069

30583070
Map(configs.map { cfg => cfg.key -> cfg } : _*)

sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,8 @@ object ExplainUtils extends AdaptiveSparkPlanHelper {
218218
plan: => QueryPlan[_],
219219
subqueries: ArrayBuffer[(SparkPlan, Expression, BaseSubqueryExec)]): Unit = {
220220
plan.foreach {
221+
case a: AdaptiveSparkPlanExec =>
222+
getSubqueries(a.executedPlan, subqueries)
221223
case p: SparkPlan =>
222224
p.expressions.foreach (_.collect {
223225
case e: PlanExpression[_] =>

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import org.apache.spark.sql.execution.datasources.DataSourceStrategy
3131
import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
3232
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
3333
import org.apache.spark.sql.util.CaseInsensitiveStringMap
34+
import org.apache.spark.storage.StorageLevel
3435

3536
class DataSourceV2Strategy(session: SparkSession) extends Strategy with PredicateHelper {
3637

@@ -56,17 +57,24 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
5657
session.sharedState.cacheManager.recacheByPlan(session, r)
5758
}
5859

59-
private def invalidateCache(r: ResolvedTable, recacheTable: Boolean = false)(): Unit = {
60+
// Invalidates the cache associated with the given table. If the invalidated cache matches the
61+
// given table, the cache's storage level is returned.
62+
private def invalidateCache(
63+
r: ResolvedTable,
64+
recacheTable: Boolean = false)(): Option[StorageLevel] = {
6065
val v2Relation = DataSourceV2Relation.create(r.table, Some(r.catalog), Some(r.identifier))
6166
val cache = session.sharedState.cacheManager.lookupCachedData(v2Relation)
6267
session.sharedState.cacheManager.uncacheQuery(session, v2Relation, cascade = true)
63-
if (recacheTable && cache.isDefined) {
64-
// save the cache name and cache level for recreation
65-
val cacheName = cache.get.cachedRepresentation.cacheBuilder.tableName
68+
if (cache.isDefined) {
6669
val cacheLevel = cache.get.cachedRepresentation.cacheBuilder.storageLevel
67-
68-
// recache with the same name and cache level.
69-
session.sharedState.cacheManager.cacheQuery(session, v2Relation, cacheName, cacheLevel)
70+
if (recacheTable) {
71+
val cacheName = cache.get.cachedRepresentation.cacheBuilder.tableName
72+
// recache with the same name and cache level.
73+
session.sharedState.cacheManager.cacheQuery(session, v2Relation, cacheName, cacheLevel)
74+
}
75+
Some(cacheLevel)
76+
} else {
77+
None
7078
}
7179
}
7280

@@ -266,12 +274,17 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
266274
case AlterTable(catalog, ident, _, changes) =>
267275
AlterTableExec(catalog, ident, changes) :: Nil
268276

269-
case RenameTable(ResolvedTable(catalog, oldIdent, _), newIdent, isView) =>
277+
case RenameTable(r @ ResolvedTable(catalog, oldIdent, _), newIdent, isView) =>
270278
if (isView) {
271279
throw new AnalysisException(
272280
"Cannot rename a table with ALTER VIEW. Please use ALTER TABLE instead.")
273281
}
274-
RenameTableExec(catalog, oldIdent, newIdent.asIdentifier) :: Nil
282+
RenameTableExec(
283+
catalog,
284+
oldIdent,
285+
newIdent.asIdentifier,
286+
invalidateCache(r),
287+
session.sharedState.cacheManager.cacheQuery) :: Nil
275288

276289
case AlterNamespaceSetProperties(ResolvedNamespace(catalog, ns), properties) =>
277290
AlterNamespaceSetPropertiesExec(catalog.asNamespaceCatalog, ns, properties) :: Nil

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RenameTableExec.scala

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,24 +17,39 @@
1717

1818
package org.apache.spark.sql.execution.datasources.v2
1919

20+
import org.apache.spark.sql.SparkSession
2021
import org.apache.spark.sql.catalyst.InternalRow
2122
import org.apache.spark.sql.catalyst.expressions.Attribute
23+
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
2224
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
25+
import org.apache.spark.storage.StorageLevel
2326

2427
/**
2528
* Physical plan node for renaming a table.
2629
*/
2730
case class RenameTableExec(
2831
catalog: TableCatalog,
2932
oldIdent: Identifier,
30-
newIdent: Identifier) extends V2CommandExec {
33+
newIdent: Identifier,
34+
invalidateCache: () => Option[StorageLevel],
35+
cacheTable: (SparkSession, LogicalPlan, Option[String], StorageLevel) => Unit)
36+
extends V2CommandExec {
3137

3238
override def output: Seq[Attribute] = Seq.empty
3339

3440
override protected def run(): Seq[InternalRow] = {
41+
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
42+
43+
val optOldStorageLevel = invalidateCache()
3544
catalog.invalidateTable(oldIdent)
45+
3646
catalog.renameTable(oldIdent, newIdent)
3747

48+
optOldStorageLevel.foreach { oldStorageLevel =>
49+
val tbl = catalog.loadTable(newIdent)
50+
val newRelation = DataSourceV2Relation.create(tbl, Some(catalog), Some(newIdent))
51+
cacheTable(sqlContext.sparkSession, newRelation, Some(newIdent.quoted), oldStorageLevel)
52+
}
3853
Seq.empty
3954
}
4055
}

0 commit comments

Comments
 (0)