From d1255297b85d9b39376bb479821cfb603bc7b47b Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Fri, 26 Jun 2020 12:55:38 -0500 Subject: [PATCH 01/15] [SPARK-19939][ML] Add support for association rules in ML ### What changes were proposed in this pull request? Adding support to Association Rules in Spark ml.fpm. ### Why are the changes needed? Support is an indication of how frequently the itemset of an association rule appears in the database and suggests if the rules are generally applicable to the dateset. Refer to [wiki](https://en.wikipedia.org/wiki/Association_rule_learning#Support) for more details. ### Does this PR introduce _any_ user-facing change? Yes. Associate Rules now have support measure ### How was this patch tested? existing and new unit test Closes #28903 from huaxingao/fpm. Authored-by: Huaxin Gao Signed-off-by: Sean Owen --- R/pkg/R/mllib_fpm.R | 5 +-- R/pkg/tests/fulltests/test_mllib_fpm.R | 3 +- .../org/apache/spark/ml/fpm/FPGrowth.scala | 20 +++++++----- .../spark/mllib/fpm/AssociationRules.scala | 2 +- .../apache/spark/ml/fpm/FPGrowthSuite.scala | 31 +++++++++++++++++-- python/pyspark/ml/fpm.py | 18 +++++------ python/pyspark/ml/tests/test_algorithms.py | 4 +-- 7 files changed, 57 insertions(+), 26 deletions(-) diff --git a/R/pkg/R/mllib_fpm.R b/R/pkg/R/mllib_fpm.R index 0cc7a16c302dc..30bc51b932041 100644 --- a/R/pkg/R/mllib_fpm.R +++ b/R/pkg/R/mllib_fpm.R @@ -122,11 +122,12 @@ setMethod("spark.freqItemsets", signature(object = "FPGrowthModel"), # Get association rules. #' @return A \code{SparkDataFrame} with association rules. 
-#' The \code{SparkDataFrame} contains four columns: +#' The \code{SparkDataFrame} contains five columns: #' \code{antecedent} (an array of the same type as the input column), #' \code{consequent} (an array of the same type as the input column), #' \code{condfidence} (confidence for the rule) -#' and \code{lift} (lift for the rule) +#' \code{lift} (lift for the rule) +#' and \code{support} (support for the rule) #' @rdname spark.fpGrowth #' @aliases associationRules,FPGrowthModel-method #' @note spark.associationRules(FPGrowthModel) since 2.2.0 diff --git a/R/pkg/tests/fulltests/test_mllib_fpm.R b/R/pkg/tests/fulltests/test_mllib_fpm.R index bc1e17538d41a..78d26d3324473 100644 --- a/R/pkg/tests/fulltests/test_mllib_fpm.R +++ b/R/pkg/tests/fulltests/test_mllib_fpm.R @@ -45,7 +45,8 @@ test_that("spark.fpGrowth", { antecedent = I(list(list("2"), list("3"))), consequent = I(list(list("1"), list("1"))), confidence = c(1, 1), - lift = c(1, 1) + lift = c(1, 1), + support = c(0.75, 0.5) ) expect_equivalent(expected_association_rules, collect(spark.associationRules(model))) diff --git a/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala b/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala index e50d4255b1f37..f1a68edaed950 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala @@ -244,9 +244,9 @@ class FPGrowthModel private[ml] ( @transient private var _cachedRules: DataFrame = _ /** - * Get association rules fitted using the minConfidence. Returns a dataframe with four fields, - * "antecedent", "consequent", "confidence" and "lift", where "antecedent" and "consequent" are - * Array[T], whereas "confidence" and "lift" are Double. + * Get association rules fitted using the minConfidence. 
Returns a dataframe with five fields, + * "antecedent", "consequent", "confidence", "lift" and "support", where "antecedent" and + * "consequent" are Array[T], whereas "confidence", "lift" and "support" are Double. */ @Since("2.2.0") @transient def associationRules: DataFrame = { @@ -254,7 +254,8 @@ class FPGrowthModel private[ml] ( _cachedRules } else { _cachedRules = AssociationRules - .getAssociationRulesFromFP(freqItemsets, "items", "freq", $(minConfidence), itemSupport) + .getAssociationRulesFromFP(freqItemsets, "items", "freq", $(minConfidence), itemSupport, + numTrainingRecords) _cachedMinConf = $(minConfidence) _cachedRules } @@ -385,6 +386,7 @@ private[fpm] object AssociationRules { * @param freqCol column name for appearance count of the frequent itemsets * @param minConfidence minimum confidence for generating the association rules * @param itemSupport map containing an item and its support + * @param numTrainingRecords count of training Dataset * @return a DataFrame("antecedent"[Array], "consequent"[Array], "confidence"[Double], * "lift" [Double]) containing the association rules. 
*/ @@ -393,21 +395,23 @@ private[fpm] object AssociationRules { itemsCol: String, freqCol: String, minConfidence: Double, - itemSupport: scala.collection.Map[T, Double]): DataFrame = { - + itemSupport: scala.collection.Map[T, Double], + numTrainingRecords: Long): DataFrame = { val freqItemSetRdd = dataset.select(itemsCol, freqCol).rdd .map(row => new FreqItemset(row.getSeq[T](0).toArray, row.getLong(1))) val rows = new MLlibAssociationRules() .setMinConfidence(minConfidence) .run(freqItemSetRdd, itemSupport) - .map(r => Row(r.antecedent, r.consequent, r.confidence, r.lift.orNull)) + .map(r => Row(r.antecedent, r.consequent, r.confidence, r.lift.orNull, + r.freqUnion / numTrainingRecords)) val dt = dataset.schema(itemsCol).dataType val schema = StructType(Seq( StructField("antecedent", dt, nullable = false), StructField("consequent", dt, nullable = false), StructField("confidence", DoubleType, nullable = false), - StructField("lift", DoubleType))) + StructField("lift", DoubleType), + StructField("support", DoubleType, nullable = false))) val rules = dataset.sparkSession.createDataFrame(rows, schema) rules } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala index 43d256bbc46c3..601c7da30ffed 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala @@ -124,7 +124,7 @@ object AssociationRules { class Rule[Item] private[fpm] ( @Since("1.5.0") val antecedent: Array[Item], @Since("1.5.0") val consequent: Array[Item], - freqUnion: Double, + private[spark] val freqUnion: Double, freqAntecedent: Double, freqConsequent: Option[Double]) extends Serializable { diff --git a/mllib/src/test/scala/org/apache/spark/ml/fpm/FPGrowthSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/fpm/FPGrowthSuite.scala index b75526a48371a..d42ced0f8f91b 100644 --- 
a/mllib/src/test/scala/org/apache/spark/ml/fpm/FPGrowthSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/fpm/FPGrowthSuite.scala @@ -39,9 +39,9 @@ class FPGrowthSuite extends SparkFunSuite with MLlibTestSparkContext with Defaul val model = new FPGrowth().setMinSupport(0.5).fit(data) val generatedRules = model.setMinConfidence(0.5).associationRules val expectedRules = spark.createDataFrame(Seq( - (Array("2"), Array("1"), 1.0, 1.0), - (Array("1"), Array("2"), 0.75, 1.0) - )).toDF("antecedent", "consequent", "confidence", "lift") + (Array("2"), Array("1"), 1.0, 1.0, 0.75), + (Array("1"), Array("2"), 0.75, 1.0, 0.75) + )).toDF("antecedent", "consequent", "confidence", "lift", "support") .withColumn("antecedent", col("antecedent").cast(ArrayType(dt))) .withColumn("consequent", col("consequent").cast(ArrayType(dt))) assert(expectedRules.sort("antecedent").rdd.collect().sameElements( @@ -61,6 +61,31 @@ class FPGrowthSuite extends SparkFunSuite with MLlibTestSparkContext with Defaul } } + test("FPGrowth associationRules") { + val dataset = spark.createDataFrame(Seq( + (1, Array("1", "2")), + (2, Array("3")), + (3, Array("4", "5")), + (4, Array("1", "2", "3")), + (5, Array("2")) + )).toDF("id", "items") + val model = new FPGrowth().setMinSupport(0.1).setMinConfidence(0.1).fit(dataset) + val expectedRules = spark.createDataFrame(Seq( + (Array("2"), Array("1"), 0.6666666666666666, 1.6666666666666665, 0.4), + (Array("2"), Array("3"), 0.3333333333333333, 0.8333333333333333, 0.2), + (Array("3"), Array("1"), 0.5, 1.25, 0.2), + (Array("3"), Array("2"), 0.5, 0.8333333333333334, 0.2), + (Array("1", "3"), Array("2"), 1.0, 1.6666666666666667, 0.2), + (Array("1", "2"), Array("3"), 0.5, 1.25, 0.2), + (Array("4"), Array("5"), 1.0, 5.0, 0.2), + (Array("5"), Array("4"), 1.0, 5.0, 0.2), + (Array("1"), Array("3"), 0.5, 1.25, 0.2), + (Array("1"), Array("2"), 1.0, 1.6666666666666667, 0.4), + (Array("3", "2"), Array("1"), 1.0, 2.5, 0.2) + )).toDF("antecedent", "consequent", 
"confidence", "lift", "support") + assert(expectedRules.collect().toSet.equals(model.associationRules.collect().toSet)) + } + test("FPGrowth getFreqItems") { val model = new FPGrowth().setMinSupport(0.7).fit(dataset) val expectedFreq = spark.createDataFrame(Seq( diff --git a/python/pyspark/ml/fpm.py b/python/pyspark/ml/fpm.py index 7d933daf9e032..7a5591f3fbf76 100644 --- a/python/pyspark/ml/fpm.py +++ b/python/pyspark/ml/fpm.py @@ -180,15 +180,15 @@ class FPGrowth(JavaEstimator, _FPGrowthParams, JavaMLWritable, JavaMLReadable): only showing top 5 rows ... >>> fpm.associationRules.show(5) - +----------+----------+----------+----+ - |antecedent|consequent|confidence|lift| - +----------+----------+----------+----+ - | [t, s]| [y]| 1.0| 2.0| - | [t, s]| [x]| 1.0| 1.5| - | [t, s]| [z]| 1.0| 1.2| - | [p]| [r]| 1.0| 2.0| - | [p]| [z]| 1.0| 1.2| - +----------+----------+----------+----+ + +----------+----------+----------+----+------------------+ + |antecedent|consequent|confidence|lift| support| + +----------+----------+----------+----+------------------+ + | [t, s]| [y]| 1.0| 2.0|0.3333333333333333| + | [t, s]| [x]| 1.0| 1.5|0.3333333333333333| + | [t, s]| [z]| 1.0| 1.2|0.3333333333333333| + | [p]| [r]| 1.0| 2.0|0.3333333333333333| + | [p]| [z]| 1.0| 1.2|0.3333333333333333| + +----------+----------+----------+----+------------------+ only showing top 5 rows ... 
>>> new_data = spark.createDataFrame([(["t", "s"], )], ["items"]) diff --git a/python/pyspark/ml/tests/test_algorithms.py b/python/pyspark/ml/tests/test_algorithms.py index 2faf2d98f0271..c948bd0c646de 100644 --- a/python/pyspark/ml/tests/test_algorithms.py +++ b/python/pyspark/ml/tests/test_algorithms.py @@ -226,8 +226,8 @@ def test_association_rules(self): fpm = fp.fit(self.data) expected_association_rules = self.spark.createDataFrame( - [([3], [1], 1.0, 1.0), ([2], [1], 1.0, 1.0)], - ["antecedent", "consequent", "confidence", "lift"] + [([3], [1], 1.0, 1.0, 0.5), ([2], [1], 1.0, 1.0, 0.75)], + ["antecedent", "consequent", "confidence", "lift", "support"] ) actual_association_rules = fpm.associationRules From 879513370767f647765ff5b96adb08f5b8c46489 Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Fri, 26 Jun 2020 12:57:30 -0500 Subject: [PATCH 02/15] [SPARK-20249][ML][PYSPARK] Add training summary for LinearSVCModel ### What changes were proposed in this pull request? Add training summary for LinearSVCModel...... ### Why are the changes needed? so that user can get the training process status, such as loss value of each iteration and total iteration number. ### Does this PR introduce _any_ user-facing change? Yes ```LinearSVCModel.summary``` ```LinearSVCModel.evaluate``` ### How was this patch tested? new tests Closes #28884 from huaxingao/svc_summary. 
Authored-by: Huaxin Gao Signed-off-by: Sean Owen --- .../spark/ml/classification/LinearSVC.scala | 117 +++++++++++++++++- .../ml/classification/LinearSVCSuite.scala | 53 +++++++- python/pyspark/ml/classification.py | 48 ++++++- .../pyspark/ml/tests/test_training_summary.py | 46 ++++++- 4 files changed, 257 insertions(+), 7 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala index 217398c51b393..1659bbb1d34b3 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala @@ -36,7 +36,7 @@ import org.apache.spark.ml.stat._ import org.apache.spark.ml.util._ import org.apache.spark.ml.util.Instrumentation.instrumented import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{Dataset, Row} +import org.apache.spark.sql._ import org.apache.spark.storage.StorageLevel /** Params for linear SVM Classifier. 
*/ @@ -267,7 +267,26 @@ class LinearSVC @Since("2.2.0") ( if (featuresStd(i) != 0.0) rawCoefficients(i) / featuresStd(i) else 0.0 } val intercept = if ($(fitIntercept)) rawCoefficients.last else 0.0 - copyValues(new LinearSVCModel(uid, Vectors.dense(coefficientArray), intercept)) + createModel(dataset, Vectors.dense(coefficientArray), intercept, objectiveHistory) + } + + private def createModel( + dataset: Dataset[_], + coefficients: Vector, + intercept: Double, + objectiveHistory: Array[Double]): LinearSVCModel = { + val model = copyValues(new LinearSVCModel(uid, coefficients, intercept)) + val weightColName = if (!isDefined(weightCol)) "weightCol" else $(weightCol) + + val (summaryModel, rawPredictionColName, predictionColName) = model.findSummaryModel() + val summary = new LinearSVCTrainingSummaryImpl( + summaryModel.transform(dataset), + rawPredictionColName, + predictionColName, + $(labelCol), + weightColName, + objectiveHistory) + model.setSummary(Some(summary)) } private def trainOnRows( @@ -352,7 +371,7 @@ class LinearSVCModel private[classification] ( @Since("2.2.0") val coefficients: Vector, @Since("2.2.0") val intercept: Double) extends ClassificationModel[Vector, LinearSVCModel] - with LinearSVCParams with MLWritable { + with LinearSVCParams with MLWritable with HasTrainingSummary[LinearSVCTrainingSummary] { @Since("2.2.0") override val numClasses: Int = 2 @@ -368,6 +387,48 @@ class LinearSVCModel private[classification] ( BLAS.dot(features, coefficients) + intercept } + /** + * Gets summary of model on training set. An exception is thrown + * if `hasSummary` is false. 
+ */ + @Since("3.1.0") + override def summary: LinearSVCTrainingSummary = super.summary + + /** + * If the rawPrediction and prediction columns are set, this method returns the current model, + * otherwise it generates new columns for them and sets them as columns on a new copy of + * the current model + */ + private[classification] def findSummaryModel(): (LinearSVCModel, String, String) = { + val model = if ($(rawPredictionCol).isEmpty && $(predictionCol).isEmpty) { + copy(ParamMap.empty) + .setRawPredictionCol("rawPrediction_" + java.util.UUID.randomUUID.toString) + .setPredictionCol("prediction_" + java.util.UUID.randomUUID.toString) + } else if ($(rawPredictionCol).isEmpty) { + copy(ParamMap.empty).setRawPredictionCol("rawPrediction_" + + java.util.UUID.randomUUID.toString) + } else if ($(predictionCol).isEmpty) { + copy(ParamMap.empty).setPredictionCol("prediction_" + java.util.UUID.randomUUID.toString) + } else { + this + } + (model, model.getRawPredictionCol, model.getPredictionCol) + } + + /** + * Evaluates the model on a test dataset. + * + * @param dataset Test dataset to evaluate model on. + */ + @Since("3.1.0") + def evaluate(dataset: Dataset[_]): LinearSVCSummary = { + val weightColName = if (!isDefined(weightCol)) "weightCol" else $(weightCol) + // Handle possible missing or invalid rawPrediction or prediction columns + val (summaryModel, rawPrediction, predictionColName) = findSummaryModel() + new LinearSVCSummaryImpl(summaryModel.transform(dataset), + rawPrediction, predictionColName, $(labelCol), weightColName) + } + override def predict(features: Vector): Double = { if (margin(features) > $(threshold)) 1.0 else 0.0 } @@ -439,3 +500,53 @@ object LinearSVCModel extends MLReadable[LinearSVCModel] { } } } + +/** + * Abstraction for LinearSVC results for a given model. + */ +sealed trait LinearSVCSummary extends BinaryClassificationSummary + +/** + * Abstraction for LinearSVC training results. 
+ */ +sealed trait LinearSVCTrainingSummary extends LinearSVCSummary with TrainingSummary + +/** + * LinearSVC results for a given model. + * + * @param predictions dataframe output by the model's `transform` method. + * @param scoreCol field in "predictions" which gives the rawPrediction of each instance. + * @param predictionCol field in "predictions" which gives the prediction for a data instance as a + * double. + * @param labelCol field in "predictions" which gives the true label of each instance. + * @param weightCol field in "predictions" which gives the weight of each instance. + */ +private class LinearSVCSummaryImpl( + @transient override val predictions: DataFrame, + override val scoreCol: String, + override val predictionCol: String, + override val labelCol: String, + override val weightCol: String) + extends LinearSVCSummary + +/** + * LinearSVC training results. + * + * @param predictions dataframe output by the model's `transform` method. + * @param scoreCol field in "predictions" which gives the rawPrediction of each instance. + * @param predictionCol field in "predictions" which gives the prediction for a data instance as a + * double. + * @param labelCol field in "predictions" which gives the true label of each instance. + * @param weightCol field in "predictions" which gives the weight of each instance. + * @param objectiveHistory objective function (scaled loss + regularization) at each iteration. 
+ */ +private class LinearSVCTrainingSummaryImpl( + predictions: DataFrame, + scoreCol: String, + predictionCol: String, + labelCol: String, + weightCol: String, + override val objectiveHistory: Array[Double]) + extends LinearSVCSummaryImpl( + predictions, scoreCol, predictionCol, labelCol, weightCol) + with LinearSVCTrainingSummary diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala index 579d6b12ab99f..a66397324c1a6 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala @@ -30,7 +30,7 @@ import org.apache.spark.ml.param.ParamsSuite import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils} import org.apache.spark.ml.util.TestingUtils._ import org.apache.spark.sql.{Dataset, Row} -import org.apache.spark.sql.functions.udf +import org.apache.spark.sql.functions._ class LinearSVCSuite extends MLTest with DefaultReadWriteTest { @@ -284,6 +284,57 @@ class LinearSVCSuite extends MLTest with DefaultReadWriteTest { assert(model1.coefficients ~== coefficientsSK relTol 4E-3) } + test("summary and training summary") { + val lsvc = new LinearSVC() + val model = lsvc.setMaxIter(5).fit(smallBinaryDataset) + + val summary = model.evaluate(smallBinaryDataset) + + assert(model.summary.accuracy === summary.accuracy) + assert(model.summary.weightedPrecision === summary.weightedPrecision) + assert(model.summary.weightedRecall === summary.weightedRecall) + assert(model.summary.pr.collect() === summary.pr.collect()) + assert(model.summary.roc.collect() === summary.roc.collect()) + assert(model.summary.areaUnderROC === summary.areaUnderROC) + + // verify instance weight works + val lsvc2 = new LinearSVC() + .setMaxIter(5) + .setWeightCol("weight") + + val smallBinaryDatasetWithWeight = + smallBinaryDataset.select(col("label"), 
col("features"), lit(2.5).as("weight")) + + val summary2 = model.evaluate(smallBinaryDatasetWithWeight) + + val model2 = lsvc2.fit(smallBinaryDatasetWithWeight) + assert(model2.summary.accuracy === summary2.accuracy) + assert(model2.summary.weightedPrecision ~== summary2.weightedPrecision relTol 1e-6) + assert(model2.summary.weightedRecall === summary2.weightedRecall) + assert(model2.summary.pr.collect() === summary2.pr.collect()) + assert(model2.summary.roc.collect() === summary2.roc.collect()) + assert(model2.summary.areaUnderROC === summary2.areaUnderROC) + + assert(model2.summary.accuracy === model.summary.accuracy) + assert(model2.summary.weightedPrecision ~== model.summary.weightedPrecision relTol 1e-6) + assert(model2.summary.weightedRecall === model.summary.weightedRecall) + assert(model2.summary.pr.collect() === model.summary.pr.collect()) + assert(model2.summary.roc.collect() === model.summary.roc.collect()) + assert(model2.summary.areaUnderROC === model.summary.areaUnderROC) + } + + test("linearSVC training summary totalIterations") { + Seq(1, 5, 10, 20, 100).foreach { maxIter => + val trainer = new LinearSVC().setMaxIter(maxIter) + val model = trainer.fit(smallBinaryDataset) + if (maxIter == 1) { + assert(model.summary.totalIterations === maxIter) + } else { + assert(model.summary.totalIterations <= maxIter) + } + } + } + test("read/write: SVM") { def checkModelData(model: LinearSVCModel, model2: LinearSVCModel): Unit = { assert(model.intercept === model2.intercept) diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py index ff506066519cd..bdd37c99df0a8 100644 --- a/python/pyspark/ml/classification.py +++ b/python/pyspark/ml/classification.py @@ -39,6 +39,7 @@ from pyspark.storagelevel import StorageLevel __all__ = ['LinearSVC', 'LinearSVCModel', + 'LinearSVCSummary', 'LinearSVCTrainingSummary', 'LogisticRegression', 'LogisticRegressionModel', 'LogisticRegressionSummary', 'LogisticRegressionTrainingSummary', 
'BinaryLogisticRegressionSummary', 'BinaryLogisticRegressionTrainingSummary', @@ -683,7 +684,8 @@ def setBlockSize(self, value): return self._set(blockSize=value) -class LinearSVCModel(_JavaClassificationModel, _LinearSVCParams, JavaMLWritable, JavaMLReadable): +class LinearSVCModel(_JavaClassificationModel, _LinearSVCParams, JavaMLWritable, JavaMLReadable, + HasTrainingSummary): """ Model fitted by LinearSVC. @@ -713,6 +715,50 @@ def intercept(self): """ return self._call_java("intercept") + @since("3.1.0") + def summary(self): + """ + Gets summary (e.g. accuracy/precision/recall, objective history, total iterations) of model + trained on the training set. An exception is thrown if `trainingSummary is None`. + """ + if self.hasSummary: + return LinearSVCTrainingSummary(super(LinearSVCModel, self).summary) + else: + raise RuntimeError("No training summary available for this %s" % + self.__class__.__name__) + + @since("3.1.0") + def evaluate(self, dataset): + """ + Evaluates the model on a test dataset. + + :param dataset: + Test dataset to evaluate model on, where dataset is an + instance of :py:class:`pyspark.sql.DataFrame` + """ + if not isinstance(dataset, DataFrame): + raise ValueError("dataset must be a DataFrame but got %s." % type(dataset)) + java_lsvc_summary = self._call_java("evaluate", dataset) + return LinearSVCSummary(java_lsvc_summary) + + +class LinearSVCSummary(_BinaryClassificationSummary): + """ + Abstraction for LinearSVC Results for a given model. + .. versionadded:: 3.1.0 + """ + pass + + +@inherit_doc +class LinearSVCTrainingSummary(LinearSVCSummary, _TrainingSummary): + """ + Abstraction for LinearSVC Training results. + + .. 
versionadded:: 3.1.0 + """ + pass + class _LogisticRegressionParams(_ProbabilisticClassifierParams, HasRegParam, HasElasticNetParam, HasMaxIter, HasFitIntercept, HasTol, diff --git a/python/pyspark/ml/tests/test_training_summary.py b/python/pyspark/ml/tests/test_training_summary.py index ac944d8397a86..19acd194f4ddf 100644 --- a/python/pyspark/ml/tests/test_training_summary.py +++ b/python/pyspark/ml/tests/test_training_summary.py @@ -21,8 +21,8 @@ if sys.version > '3': basestring = str -from pyspark.ml.classification import BinaryLogisticRegressionSummary, LogisticRegression, \ - LogisticRegressionSummary +from pyspark.ml.classification import BinaryLogisticRegressionSummary, LinearSVC, \ + LinearSVCSummary, LogisticRegression, LogisticRegressionSummary from pyspark.ml.clustering import BisectingKMeans, GaussianMixture, KMeans from pyspark.ml.linalg import Vectors from pyspark.ml.regression import GeneralizedLinearRegression, LinearRegression @@ -193,6 +193,48 @@ def test_multiclass_logistic_regression_summary(self): self.assertFalse(isinstance(sameSummary, BinaryLogisticRegressionSummary)) self.assertAlmostEqual(sameSummary.accuracy, s.accuracy) + def test_linear_svc_summary(self): + df = self.spark.createDataFrame([(1.0, 2.0, Vectors.dense(1.0, 1.0, 1.0)), + (0.0, 2.0, Vectors.dense(1.0, 2.0, 3.0))], + ["label", "weight", "features"]) + svc = LinearSVC(maxIter=5, weightCol="weight") + model = svc.fit(df) + self.assertTrue(model.hasSummary) + s = model.summary() + # test that api is callable and returns expected types + self.assertTrue(isinstance(s.predictions, DataFrame)) + self.assertEqual(s.scoreCol, "rawPrediction") + self.assertEqual(s.labelCol, "label") + self.assertEqual(s.predictionCol, "prediction") + objHist = s.objectiveHistory + self.assertTrue(isinstance(objHist, list) and isinstance(objHist[0], float)) + self.assertGreater(s.totalIterations, 0) + self.assertTrue(isinstance(s.labels, list)) + self.assertTrue(isinstance(s.truePositiveRateByLabel, 
list)) + self.assertTrue(isinstance(s.falsePositiveRateByLabel, list)) + self.assertTrue(isinstance(s.precisionByLabel, list)) + self.assertTrue(isinstance(s.recallByLabel, list)) + self.assertTrue(isinstance(s.fMeasureByLabel(), list)) + self.assertTrue(isinstance(s.fMeasureByLabel(1.0), list)) + self.assertTrue(isinstance(s.roc, DataFrame)) + self.assertAlmostEqual(s.areaUnderROC, 1.0, 2) + self.assertTrue(isinstance(s.pr, DataFrame)) + self.assertTrue(isinstance(s.fMeasureByThreshold, DataFrame)) + self.assertTrue(isinstance(s.precisionByThreshold, DataFrame)) + self.assertTrue(isinstance(s.recallByThreshold, DataFrame)) + print(s.weightedTruePositiveRate) + self.assertAlmostEqual(s.weightedTruePositiveRate, 0.5, 2) + self.assertAlmostEqual(s.weightedFalsePositiveRate, 0.5, 2) + self.assertAlmostEqual(s.weightedRecall, 0.5, 2) + self.assertAlmostEqual(s.weightedPrecision, 0.25, 2) + self.assertAlmostEqual(s.weightedFMeasure(), 0.3333333333333333, 2) + self.assertAlmostEqual(s.weightedFMeasure(1.0), 0.3333333333333333, 2) + # test evaluation (with training dataset) produces a summary with same values + # one check is enough to verify a summary is returned, Scala version runs full test + sameSummary = model.evaluate(df) + self.assertTrue(isinstance(sameSummary, LinearSVCSummary)) + self.assertAlmostEqual(sameSummary.areaUnderROC, s.areaUnderROC) + def test_gaussian_mixture_summary(self): data = [(Vectors.dense(1.0),), (Vectors.dense(5.0),), (Vectors.dense(10.0),), (Vectors.sparse(1, [], []),)] From ac3a0551d82c8e808d01aecbd1f6918cfe331ec4 Mon Sep 17 00:00:00 2001 From: GuoPhilipse <46367746+GuoPhilipse@users.noreply.github.com> Date: Fri, 26 Jun 2020 19:06:31 -0700 Subject: [PATCH 03/15] [SPARK-32088][PYTHON] Pin the timezone in timestamp_seconds doctest ### What changes were proposed in this pull request? Add American timezone during timestamp_seconds doctest ### Why are the changes needed? 
`timestamp_seconds` doctest in `functions.py` used default timezone to get expected result For example: ```python >>> time_df = spark.createDataFrame([(1230219000,)], ['unix_time']) >>> time_df.select(timestamp_seconds(time_df.unix_time).alias('ts')).collect() [Row(ts=datetime.datetime(2008, 12, 25, 7, 30))] ``` But when we have a non-american timezone, the test case will get different test result. For example, when we set current timezone as `Asia/Shanghai`, the test result will be ``` [Row(ts=datetime.datetime(2008, 12, 25, 23, 30))] ``` So no matter where we run the test case ,we will always get the expected permanent result if we set the timezone on one specific area. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit test Closes #28932 from GuoPhilipse/SPARK-32088-fix-timezone-issue. Lead-authored-by: GuoPhilipse <46367746+GuoPhilipse@users.noreply.github.com> Co-authored-by: GuoPhilipse Signed-off-by: Dongjoon Hyun --- python/pyspark/sql/functions.py | 2 ++ sql/core/src/main/scala/org/apache/spark/sql/functions.scala | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 0c8c34dd87996..b0498d0298785 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -1431,9 +1431,11 @@ def to_utc_timestamp(timestamp, tz): def timestamp_seconds(col): """ >>> from pyspark.sql.functions import timestamp_seconds + >>> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles") >>> time_df = spark.createDataFrame([(1230219000,)], ['unix_time']) >>> time_df.select(timestamp_seconds(time_df.unix_time).alias('ts')).collect() [Row(ts=datetime.datetime(2008, 12, 25, 7, 30))] + >>> spark.conf.unset("spark.sql.session.timeZone") """ sc = SparkContext._active_spark_context diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index 
62ad5ea9b5935..239b705a473d0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -3360,8 +3360,8 @@ object functions { /** * Creates timestamp from the number of seconds since UTC epoch. - * @group = datetime_funcs - * @since = 3.1.0 + * @group datetime_funcs + * @since 3.1.0 */ def timestamp_seconds(e: Column): Column = withExpr { SecondsToTimestamp(e.expr) From 44aecaa9124fb2158f009771022c64ede4b582dc Mon Sep 17 00:00:00 2001 From: Guy Khazma Date: Fri, 26 Jun 2020 19:12:42 -0700 Subject: [PATCH 04/15] [SPARK-32099][DOCS] Remove broken link in cloud integration documentation ### What changes were proposed in this pull request? The 3rd link in `IBM Cloud Object Storage connector for Apache Spark` is broken. The PR removes this link. ### Why are the changes needed? broken link ### Does this PR introduce _any_ user-facing change? yes, the broken link is removed from the doc. ### How was this patch tested? doc generation passes successfully as before Closes #28927 from guykhazma/spark32099. Authored-by: Guy Khazma Signed-off-by: Dongjoon Hyun --- docs/cloud-integration.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/cloud-integration.md b/docs/cloud-integration.md index b2a3e77f1ee9d..01f437f38ef17 100644 --- a/docs/cloud-integration.md +++ b/docs/cloud-integration.md @@ -257,5 +257,5 @@ Here is the documentation on the standard connectors both from Apache and the cl * [Amazon EMR File System (EMRFS)](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-fs.html). From Amazon * [Google Cloud Storage Connector for Spark and Hadoop](https://cloud.google.com/hadoop/google-cloud-storage-connector). 
From Google * [The Azure Blob Filesystem driver (ABFS)](https://docs.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-abfs-driver) -* IBM Cloud Object Storage connector for Apache Spark: [Stocator](https://github.com/CODAIT/stocator), [IBM Object Storage](https://www.ibm.com/cloud/object-storage), [how-to-use-connector](https://developer.ibm.com/code/2018/08/16/installing-running-stocator-apache-spark-ibm-cloud-object-storage). From IBM +* IBM Cloud Object Storage connector for Apache Spark: [Stocator](https://github.com/CODAIT/stocator), [IBM Object Storage](https://www.ibm.com/cloud/object-storage). From IBM From 7445c7534ba11bcbdf2e05259cd4f5cde13fe5fb Mon Sep 17 00:00:00 2001 From: gengjiaan Date: Fri, 26 Jun 2020 19:36:06 -0700 Subject: [PATCH 05/15] [SPARK-31845][CORE][TESTS] Refactor DAGSchedulerSuite by introducing completeAndCheckAnswer and using completeNextStageWithFetchFailure ### What changes were proposed in this pull request? **First** `DAGSchedulerSuite` provides `completeNextStageWithFetchFailure` to make all tasks in non first stage occur `FetchFailed`. But many test case uses complete directly as follows: ```scala complete(taskSets(1), Seq( (FetchFailed(makeBlockManagerId("hostA"), shuffleDep1.shuffleId, 0L, 0, 0, "ignored"), null))) ``` We need to reuse `completeNextStageWithFetchFailure`. **Second** `DAGSchedulerSuite` also check the results show below: ```scala complete(taskSets(0), Seq((Success, 42))) assert(results === Map(0 -> 42)) ``` We can extract it as a generic method of `checkAnswer`. ### Why are the changes needed? Reuse `completeNextStageWithFetchFailure` ### Does this PR introduce _any_ user-facing change? 'No'. ### How was this patch tested? Jenkins test Closes #28866 from beliefer/reuse-completeNextStageWithFetchFailure. 
Authored-by: gengjiaan Signed-off-by: Dongjoon Hyun --- .../spark/scheduler/DAGSchedulerSuite.scala | 51 +++++++++---------- 1 file changed, 23 insertions(+), 28 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 9d412f2dba3ce..51d20d3428915 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -349,9 +349,9 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi it.next.asInstanceOf[Tuple2[_, _]]._1 /** Send the given CompletionEvent messages for the tasks in the TaskSet. */ - private def complete(taskSet: TaskSet, results: Seq[(TaskEndReason, Any)]): Unit = { - assert(taskSet.tasks.size >= results.size) - for ((result, i) <- results.zipWithIndex) { + private def complete(taskSet: TaskSet, taskEndInfos: Seq[(TaskEndReason, Any)]): Unit = { + assert(taskSet.tasks.size >= taskEndInfos.size) + for ((result, i) <- taskEndInfos.zipWithIndex) { if (i < taskSet.tasks.size) { runEvent(makeCompletionEvent(taskSet.tasks(i), result._1, result._2)) } @@ -405,6 +405,15 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi runEvent(JobCancelled(jobId, None)) } + /** Make some tasks in task set success and check results. 
*/ + private def completeAndCheckAnswer( + taskSet: TaskSet, + taskEndInfos: Seq[(TaskEndReason, Any)], + expected: Map[Int, Any]): Unit = { + complete(taskSet, taskEndInfos) + assert(this.results === expected) + } + test("[SPARK-3353] parent stage should have lower stage id") { sc.parallelize(1 to 10).map(x => (x, x)).reduceByKey(_ + _, 4).count() val stageByOrderOfExecution = sparkListener.stageByOrderOfExecution @@ -461,8 +470,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi completeShuffleMapStageSuccessfully(0, 0, 1) completeShuffleMapStageSuccessfully(1, 0, 1) completeShuffleMapStageSuccessfully(2, 0, 1) - complete(taskSets(3), Seq((Success, 42))) - assert(results === Map(0 -> 42)) + completeAndCheckAnswer(taskSets(3), Seq((Success, 42)), Map(0 -> 42)) assertDataStructuresEmpty() } @@ -558,8 +566,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi test("run trivial job") { submit(new MyRDD(sc, 1, Nil), Array(0)) - complete(taskSets(0), List((Success, 42))) - assert(results === Map(0 -> 42)) + completeAndCheckAnswer(taskSets(0), Seq((Success, 42)), Map(0 -> 42)) assertDataStructuresEmpty() } @@ -567,8 +574,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi val baseRdd = new MyRDD(sc, 1, Nil) val finalRdd = new MyRDD(sc, 1, List(new OneToOneDependency(baseRdd))) submit(finalRdd, Array(0)) - complete(taskSets(0), Seq((Success, 42))) - assert(results === Map(0 -> 42)) + completeAndCheckAnswer(taskSets(0), Seq((Success, 42)), Map(0 -> 42)) assertDataStructuresEmpty() } @@ -592,8 +598,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi submit(finalRdd, Array(0)) val taskSet = taskSets(0) assertLocations(taskSet, Seq(Seq("hostA", "hostB"))) - complete(taskSet, Seq((Success, 42))) - assert(results === Map(0 -> 42)) + completeAndCheckAnswer(taskSet, Seq((Success, 42)), Map(0 -> 42)) assertDataStructuresEmpty() } @@ -729,8 +734,7 
@@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi assert(failure === null) // When the task set completes normally, state should be correctly updated. - complete(taskSets(0), Seq((Success, 42))) - assert(results === Map(0 -> 42)) + completeAndCheckAnswer(taskSets(0), Seq((Success, 42)), Map(0 -> 42)) assertDataStructuresEmpty() assert(sparkListener.failedStages.isEmpty) @@ -746,8 +750,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi completeShuffleMapStageSuccessfully(0, 0, 1) assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet === HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB"))) - complete(taskSets(1), Seq((Success, 42))) - assert(results === Map(0 -> 42)) + completeAndCheckAnswer(taskSets(1), Seq((Success, 42)), Map(0 -> 42)) assertDataStructuresEmpty() } @@ -771,8 +774,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi // we can see both result blocks now assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1.host).toSet === HashSet("hostA", "hostB")) - complete(taskSets(3), Seq((Success, 43))) - assert(results === Map(0 -> 42, 1 -> 43)) + completeAndCheckAnswer(taskSets(3), Seq((Success, 43)), Map(0 -> 42, 1 -> 43)) assertDataStructuresEmpty() } @@ -1454,8 +1456,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi HashSet(makeBlockManagerId("hostB"), makeBlockManagerId("hostA"))) // finish the next stage normally, which completes the job - complete(taskSets(1), Seq((Success, 42), (Success, 43))) - assert(results === Map(0 -> 42, 1 -> 43)) + completeAndCheckAnswer(taskSets(1), Seq((Success, 42), (Success, 43)), Map(0 -> 42, 1 -> 43)) assertDataStructuresEmpty() } @@ -1796,9 +1797,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi // lets say there is a fetch failure in this task set, which makes us go back and // run stage 0, 
attempt 1 - complete(taskSets(1), Seq( - (FetchFailed(makeBlockManagerId("hostA"), - shuffleDep1.shuffleId, 0L, 0, 0, "ignored"), null))) + completeNextStageWithFetchFailure(1, 0, shuffleDep1) scheduler.resubmitFailedStages() // stage 0, attempt 1 should have the properties of job2 @@ -1872,9 +1871,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi // have the second stage complete normally completeShuffleMapStageSuccessfully(1, 0, 1, Seq("hostA", "hostC")) // fail the third stage because hostA went down - complete(taskSets(2), Seq( - (FetchFailed(makeBlockManagerId("hostA"), - shuffleDepTwo.shuffleId, 0L, 0, 0, "ignored"), null))) + completeNextStageWithFetchFailure(2, 0, shuffleDepTwo) // TODO assert this: // blockManagerMaster.removeExecutor("hostA-exec") // have DAGScheduler try again @@ -1900,9 +1897,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi // complete stage 1 completeShuffleMapStageSuccessfully(1, 0, 1) // pretend stage 2 failed because hostA went down - complete(taskSets(2), Seq( - (FetchFailed(makeBlockManagerId("hostA"), - shuffleDepTwo.shuffleId, 0L, 0, 0, "ignored"), null))) + completeNextStageWithFetchFailure(2, 0, shuffleDepTwo) // TODO assert this: // blockManagerMaster.removeExecutor("hostA-exec") // DAGScheduler should notice the cached copy of the second shuffle and try to get it rerun. From 9c134b57bff5b7e7f9c85aeed2e9539117a5b57d Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Fri, 26 Jun 2020 19:43:29 -0700 Subject: [PATCH 06/15] [SPARK-32058][BUILD] Use Apache Hadoop 3.2.0 dependency by default ### What changes were proposed in this pull request? According to the dev mailing list discussion, this PR aims to switch the default Apache Hadoop dependency from 2.7.4 to 3.2.0 for Apache Spark 3.1.0 on December 2020. 
| Item | Default Hadoop Dependency | |------|-----------------------------| | Apache Spark Website | 3.2.0 | | Apache Download Site | 3.2.0 | | Apache Snapshot | 3.2.0 | | Maven Central | 3.2.0 | | PyPI | 2.7.4 (We will switch later) | | CRAN | 2.7.4 (We will switch later) | | Homebrew | 3.2.0 (already) | In Apache Spark 3.0.0 release, we focused on the other features. This PR targets for [Apache Spark 3.1.0 scheduled on December 2020](https://spark.apache.org/versioning-policy.html). ### Why are the changes needed? Apache Hadoop 3.2 has many fixes and new cloud-friendly features. **Reference** - 2017-08-04: https://hadoop.apache.org/release/2.7.4.html - 2019-01-16: https://hadoop.apache.org/release/3.2.0.html ### Does this PR introduce _any_ user-facing change? Since the default Hadoop dependency changes, the users will get a better support in a cloud environment. ### How was this patch tested? Pass the Jenkins. Closes #28897 from dongjoon-hyun/SPARK-32058. Authored-by: Dongjoon Hyun Signed-off-by: Dongjoon Hyun --- dev/create-release/release-build.sh | 4 ++-- dev/run-tests.py | 2 +- pom.xml | 18 +++++++++--------- .../kubernetes/integration-tests/pom.xml | 6 +++--- 4 files changed, 15 insertions(+), 15 deletions(-) diff --git a/dev/create-release/release-build.sh b/dev/create-release/release-build.sh index eb972589a995e..31633456a6590 100755 --- a/dev/create-release/release-build.sh +++ b/dev/create-release/release-build.sh @@ -276,14 +276,14 @@ if [[ "$1" == "package" ]]; then # list of packages to be built, so it's ok for things to be missing in BINARY_PKGS_EXTRA. declare -A BINARY_PKGS_ARGS - BINARY_PKGS_ARGS["hadoop2.7"]="-Phadoop-2.7 $HIVE_PROFILES" + BINARY_PKGS_ARGS["hadoop3.2"]="-Phadoop-3.2 $HIVE_PROFILES" if ! is_dry_run; then BINARY_PKGS_ARGS["without-hadoop"]="-Phadoop-provided" if [[ $SPARK_VERSION < "3.0." 
]]; then BINARY_PKGS_ARGS["hadoop2.6"]="-Phadoop-2.6 $HIVE_PROFILES" else BINARY_PKGS_ARGS["hadoop2.7-hive1.2"]="-Phadoop-2.7 -Phive-1.2 $HIVE_PROFILES" - BINARY_PKGS_ARGS["hadoop3.2"]="-Phadoop-3.2 $HIVE_PROFILES" + BINARY_PKGS_ARGS["hadoop2.7"]="-Phadoop-2.7 $HIVE_PROFILES" fi fi diff --git a/dev/run-tests.py b/dev/run-tests.py index 5255a77ec2081..ec04c37857d96 100755 --- a/dev/run-tests.py +++ b/dev/run-tests.py @@ -574,7 +574,7 @@ def main(): # if we're on the Amplab Jenkins build servers setup variables # to reflect the environment settings build_tool = os.environ.get("AMPLAB_JENKINS_BUILD_TOOL", "sbt") - hadoop_version = os.environ.get("AMPLAB_JENKINS_BUILD_PROFILE", "hadoop2.7") + hadoop_version = os.environ.get("AMPLAB_JENKINS_BUILD_PROFILE", "hadoop3.2") hive_version = os.environ.get("AMPLAB_JENKINS_BUILD_HIVE_PROFILE", "hive2.3") test_env = "amplab_jenkins" # add path for Python3 in Jenkins if we're calling from a Jenkins machine diff --git a/pom.xml b/pom.xml index 82c12ae3dcb80..08ca13bfe9d37 100644 --- a/pom.xml +++ b/pom.xml @@ -119,11 +119,11 @@ spark 1.7.30 1.2.17 - 2.7.4 + 3.2.0 2.5.0 ${hadoop.version} 3.4.14 - 2.7.1 + 2.13.0 org.apache.hive core @@ -170,7 +170,7 @@ 1.1.7.5 1.1.2 1.10 - 2.4 + 2.5 2.6 @@ -3054,16 +3054,16 @@ hadoop-2.7 - + + 2.7.4 + 2.7.1 + 2.4 + hadoop-3.2 - - 3.2.0 - 2.13.0 - 2.5 - + diff --git a/resource-managers/kubernetes/integration-tests/pom.xml b/resource-managers/kubernetes/integration-tests/pom.xml index 503540403f5ec..d1e00cc0b5b10 100644 --- a/resource-managers/kubernetes/integration-tests/pom.xml +++ b/resource-managers/kubernetes/integration-tests/pom.xml @@ -186,9 +186,6 @@ hadoop-2.7 - - true - com.amazonaws @@ -200,6 +197,9 @@ hadoop-3.2 + + true + com.amazonaws From 8c44d744631516a5cdaf63406e69a9dd11e5b878 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Sat, 27 Jun 2020 17:54:06 -0700 Subject: [PATCH 07/15] [SPARK-32071][SQL][TESTS] Add `make_interval` benchmark ### What changes were proposed in this pull request? 
Add benchmarks for interval constructor `make_interval` and measure perf of 4 cases: 1. Constant (year, month) 2. Constant (week, day) 3. Constant (hour, minute, second, second fraction) 4. All fields are NOT constant. The benchmark results are generated in the environment: | Item | Description | | ---- | ----| | Region | us-west-2 (Oregon) | | Instance | r3.xlarge | | AMI | ubuntu/images/hvm-ssd/ubuntu-bionic-18.04-amd64-server-20190722.1 (ami-06f2f779464715dc5) | | Java | OpenJDK 64-Bit Server VM 1.8.0_252 and OpenJDK 64-Bit Server VM 11.0.7+10 | ### Why are the changes needed? To have a baseline for future perf improvements of `make_interval`, and to prevent perf regressions in the future. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? By running `IntervalBenchmark` via: ``` $ SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain org.apache.spark.sql.execution.benchmark.IntervalBenchmark" ``` Closes #28905 from MaxGekk/benchmark-make_interval. 
Authored-by: Max Gekk Signed-off-by: Dongjoon Hyun --- .../IntervalBenchmark-jdk11-results.txt | 63 +++++++++------ .../benchmarks/IntervalBenchmark-results.txt | 63 +++++++++------ .../benchmark/IntervalBenchmark.scala | 81 ++++++++++++++++--- 3 files changed, 146 insertions(+), 61 deletions(-) diff --git a/sql/core/benchmarks/IntervalBenchmark-jdk11-results.txt b/sql/core/benchmarks/IntervalBenchmark-jdk11-results.txt index 8958d7c53413f..70a64931049c0 100644 --- a/sql/core/benchmarks/IntervalBenchmark-jdk11-results.txt +++ b/sql/core/benchmarks/IntervalBenchmark-jdk11-results.txt @@ -1,29 +1,40 @@ -Java HotSpot(TM) 64-Bit Server VM 11.0.5+10-LTS on Mac OS X 10.15.3 -Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz +OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz cast strings to intervals: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -prepare string w/ interval 448 469 20 2.2 447.6 1.0X -prepare string w/o interval 405 409 4 2.5 404.6 1.1X -1 units w/ interval 321 328 6 3.1 321.4 1.4X -1 units w/o interval 303 307 4 3.3 303.1 1.5X -2 units w/ interval 445 458 12 2.2 444.6 1.0X -2 units w/o interval 416 424 10 2.4 416.2 1.1X -3 units w/ interval 1006 1012 8 1.0 1006.4 0.4X -3 units w/o interval 1240 1249 8 0.8 1239.6 0.4X -4 units w/ interval 1295 1418 106 0.8 1295.4 0.3X -4 units w/o interval 1172 1188 15 0.9 1171.6 0.4X -5 units w/ interval 1326 1335 11 0.8 1325.6 0.3X -5 units w/o interval 1309 1336 44 0.8 1308.7 0.3X -6 units w/ interval 1441 1464 29 0.7 1441.0 0.3X -6 units w/o interval 1350 1369 17 0.7 1350.1 0.3X -7 units w/ interval 1606 1669 99 0.6 1605.6 0.3X -7 units w/o interval 1546 1557 12 0.6 1546.3 0.3X -8 units w/ interval 1771 1875 120 0.6 1770.6 0.3X -8 units w/o interval 1775 1789 13 0.6 1775.2 0.3X -9 units w/ interval 2126 2757 
849 0.5 2126.4 0.2X -9 units w/o interval 2053 2070 21 0.5 2053.3 0.2X -10 units w/ interval 2209 2243 30 0.5 2209.1 0.2X -10 units w/o interval 2400 2702 365 0.4 2400.2 0.2X -11 units w/ interval 2616 2699 72 0.4 2616.5 0.2X -11 units w/o interval 3218 3380 195 0.3 3218.4 0.1X +prepare string w/ interval 708 829 110 1.4 708.0 1.0X +prepare string w/o interval 660 672 14 1.5 660.3 1.1X +1 units w/ interval 514 543 33 1.9 514.2 1.4X +1 units w/o interval 476 492 20 2.1 475.9 1.5X +2 units w/ interval 751 767 14 1.3 751.0 0.9X +2 units w/o interval 709 716 11 1.4 709.0 1.0X +3 units w/ interval 1541 1551 15 0.6 1540.9 0.5X +3 units w/o interval 1531 1532 1 0.7 1531.5 0.5X +4 units w/ interval 1764 1768 5 0.6 1763.5 0.4X +4 units w/o interval 1737 1745 8 0.6 1736.6 0.4X +5 units w/ interval 1920 1930 10 0.5 1919.7 0.4X +5 units w/o interval 1928 1936 11 0.5 1927.9 0.4X +6 units w/ interval 2124 2127 4 0.5 2124.2 0.3X +6 units w/o interval 2124 2125 1 0.5 2123.7 0.3X +7 units w/ interval 2525 2541 15 0.4 2525.5 0.3X +7 units w/o interval 2512 2518 11 0.4 2511.5 0.3X +8 units w/ interval 2578 2597 19 0.4 2578.1 0.3X +8 units w/o interval 2558 2562 6 0.4 2558.1 0.3X +9 units w/ interval 2742 2750 9 0.4 2741.8 0.3X +9 units w/o interval 2752 2762 11 0.4 2751.8 0.3X +10 units w/ interval 3112 3123 10 0.3 3111.9 0.2X +10 units w/o interval 3116 3130 14 0.3 3115.7 0.2X +11 units w/ interval 3255 3273 20 0.3 3255.3 0.2X +11 units w/o interval 3294 3305 14 0.3 3293.6 0.2X + +OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +make_interval(): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +-------------------------------------------------------------------------------------------------------------------------- +prepare make_interval() 3395 3410 16 0.3 3395.0 1.0X +make_interval(0, 1, 2, 3, 4, 5, 50.123456) 94 102 9 10.7 93.8 36.2X +make_interval(*, *, 2, 3, 4, 5, 50.123456) 136 
139 4 7.3 136.5 24.9X +make_interval(0, 1, *, *, 4, 5, 50.123456) 115 119 4 8.7 114.8 29.6X +make_interval(0, 1, 2, 3, *, *, *) 3359 3382 37 0.3 3358.7 1.0X +make_interval(*, *, *, *, *, *, *) 3382 3388 9 0.3 3382.3 1.0X diff --git a/sql/core/benchmarks/IntervalBenchmark-results.txt b/sql/core/benchmarks/IntervalBenchmark-results.txt index 48af333b78ba4..98b9f55c2e15e 100644 --- a/sql/core/benchmarks/IntervalBenchmark-results.txt +++ b/sql/core/benchmarks/IntervalBenchmark-results.txt @@ -1,29 +1,40 @@ -Java HotSpot(TM) 64-Bit Server VM 1.8.0_231-b11 on Mac OS X 10.15.3 -Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz +OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz cast strings to intervals: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -prepare string w/ interval 389 410 21 2.6 388.7 1.0X -prepare string w/o interval 340 360 18 2.9 340.5 1.1X -1 units w/ interval 378 389 16 2.6 377.8 1.0X -1 units w/o interval 346 350 5 2.9 346.2 1.1X -2 units w/ interval 444 457 11 2.3 444.2 0.9X -2 units w/o interval 455 464 12 2.2 455.1 0.9X -3 units w/ interval 942 964 20 1.1 941.5 0.4X -3 units w/o interval 927 1020 93 1.1 927.3 0.4X -4 units w/ interval 1114 1127 17 0.9 1113.9 0.3X -4 units w/o interval 1100 1105 4 0.9 1100.3 0.4X -5 units w/ interval 1180 1244 57 0.8 1180.1 0.3X -5 units w/o interval 1135 1141 6 0.9 1135.2 0.3X -6 units w/ interval 1284 1316 48 0.8 1284.0 0.3X -6 units w/o interval 1276 1357 122 0.8 1276.1 0.3X -7 units w/ interval 1609 1636 32 0.6 1609.1 0.2X -7 units w/o interval 1551 1578 36 0.6 1550.9 0.3X -8 units w/ interval 1787 1874 129 0.6 1787.1 0.2X -8 units w/o interval 1751 1767 15 0.6 1750.6 0.2X -9 units w/ interval 1960 2065 141 0.5 1959.7 0.2X -9 units w/o interval 1885 1908 39 0.5 1885.1 0.2X -10 units w/ interval 2178 2185 11 
0.5 2177.9 0.2X -10 units w/o interval 2150 2255 164 0.5 2150.1 0.2X -11 units w/ interval 2457 2542 139 0.4 2456.7 0.2X -11 units w/o interval 2557 2770 188 0.4 2556.7 0.2X +prepare string w/ interval 677 718 40 1.5 677.2 1.0X +prepare string w/o interval 602 624 19 1.7 602.2 1.1X +1 units w/ interval 582 598 20 1.7 581.8 1.2X +1 units w/o interval 549 591 64 1.8 549.1 1.2X +2 units w/ interval 758 773 14 1.3 758.2 0.9X +2 units w/o interval 723 738 14 1.4 722.6 0.9X +3 units w/ interval 1442 1450 11 0.7 1441.8 0.5X +3 units w/o interval 1426 1429 3 0.7 1426.4 0.5X +4 units w/ interval 1645 1652 11 0.6 1645.1 0.4X +4 units w/o interval 1618 1626 10 0.6 1617.6 0.4X +5 units w/ interval 1794 1803 13 0.6 1794.4 0.4X +5 units w/o interval 1783 1793 9 0.6 1783.2 0.4X +6 units w/ interval 1976 1984 11 0.5 1976.2 0.3X +6 units w/o interval 1948 1959 10 0.5 1947.9 0.3X +7 units w/ interval 2394 2408 18 0.4 2393.7 0.3X +7 units w/o interval 2387 2392 8 0.4 2386.8 0.3X +8 units w/ interval 2578 2588 15 0.4 2577.5 0.3X +8 units w/o interval 2572 2578 5 0.4 2571.8 0.3X +9 units w/ interval 2812 2829 19 0.4 2811.7 0.2X +9 units w/o interval 2811 2816 4 0.4 2810.7 0.2X +10 units w/ interval 3108 3116 10 0.3 3107.8 0.2X +10 units w/o interval 3107 3109 3 0.3 3106.8 0.2X +11 units w/ interval 3386 3392 8 0.3 3386.3 0.2X +11 units w/o interval 3374 3377 4 0.3 3374.0 0.2X + +OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +make_interval(): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +-------------------------------------------------------------------------------------------------------------------------- +prepare make_interval() 3634 3684 47 0.3 3634.1 1.0X +make_interval(0, 1, 2, 3, 4, 5, 50.123456) 90 100 12 11.1 90.0 40.4X +make_interval(*, *, 2, 3, 4, 5, 50.123456) 114 119 5 8.8 114.3 31.8X +make_interval(0, 1, *, *, 4, 5, 50.123456) 121 138 21 8.3 120.7 30.1X +make_interval(0, 
1, 2, 3, *, *, *) 3615 3621 9 0.3 3614.7 1.0X +make_interval(*, *, *, *, *, *, *) 3638 3657 21 0.3 3637.7 1.0X diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/IntervalBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/IntervalBenchmark.scala index 907e3f40c1911..96ad453aeb2d7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/IntervalBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/IntervalBenchmark.scala @@ -39,11 +39,11 @@ import org.apache.spark.sql.internal.SQLConf object IntervalBenchmark extends SqlBasedBenchmark { import spark.implicits._ - private def doBenchmark(cardinality: Long, exprs: Column*): Unit = { + private def doBenchmark(cardinality: Long, columns: Column*): Unit = { withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") { spark .range(0, cardinality, 1, 1) - .select(exprs: _*) + .select(columns: _*) .queryExecution .toRdd .foreach(_ => ()) @@ -60,6 +60,26 @@ object IntervalBenchmark extends SqlBasedBenchmark { } } + private def doBenchmarkExpr(cardinality: Long, exprs: String*): Unit = { + withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") { + spark + .range(0, cardinality, 1, 1) + .selectExpr(exprs: _*) + .queryExecution + .toRdd + .foreach(_ => ()) + } + } + + private def addCaseExpr( + benchmark: Benchmark, + cardinality: Long, + name: String, + exprs: String*): Unit = { + benchmark.addCase(name, numIters = 3) { _ => doBenchmarkExpr(cardinality, exprs: _*) } + } + + private def buildString(withPrefix: Boolean, units: Seq[String] = Seq.empty): Column = { val init = lit(if (withPrefix) "interval" else "") :: ($"id" % 10000).cast("string") :: @@ -78,25 +98,68 @@ object IntervalBenchmark extends SqlBasedBenchmark { } } - override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { - val N = 1000000 + private def benchmarkIntervalStringParsing(cardinality: Long): Unit = { val timeUnits = Seq( "13 
months", " 1 months", "100 weeks", "9 days", "12 hours", "- 3 hours", "5 minutes", "45 seconds", "123 milliseconds", "567 microseconds") val intervalToTest = ListBuffer[String]() - val benchmark = new Benchmark("cast strings to intervals", N, output = output) + val benchmark = new Benchmark("cast strings to intervals", cardinality, output = output) // The first 2 cases are used to show the overhead of preparing the interval string. - addCase(benchmark, N, "prepare string w/ interval", buildString(true, timeUnits)) - addCase(benchmark, N, "prepare string w/o interval", buildString(false, timeUnits)) - addCase(benchmark, N, intervalToTest) // Only years + addCase(benchmark, cardinality, "prepare string w/ interval", buildString(true, timeUnits)) + addCase(benchmark, cardinality, "prepare string w/o interval", buildString(false, timeUnits)) + addCase(benchmark, cardinality, intervalToTest) // Only years for (unit <- timeUnits) { intervalToTest.append(unit) - addCase(benchmark, N, intervalToTest) + addCase(benchmark, cardinality, intervalToTest) } benchmark.run() } + + private def benchmarkMakeInterval(cardinality: Long): Unit = { + val benchmark = new Benchmark("make_interval()", cardinality, output = output) + val hmExprs = Seq("id % 24", "id % 60") + val hmsExprs = hmExprs ++ Seq("cast((id % 500000000) / 1000000.0 as decimal(18, 6))") + val ymExprs = Seq("(2000 + (id % 30))", "((id % 12) + 1)") + val wdExpr = Seq("((id % 54) + 1)", "((id % 1000) + 1)") + val args = ymExprs ++ wdExpr ++ hmsExprs + + addCaseExpr( + benchmark, + cardinality, + "prepare make_interval()", + args: _*) + val foldableExpr = "make_interval(0, 1, 2, 3, 4, 5, 50.123456)" + addCaseExpr(benchmark, cardinality, foldableExpr, foldableExpr) + addCaseExpr( + benchmark, + cardinality, + "make_interval(*, *, 2, 3, 4, 5, 50.123456)", + s"make_interval(${ymExprs.mkString(",")}, 2, 3, 4, 5, 50.123456)") + addCaseExpr( + benchmark, + cardinality, + "make_interval(0, 1, *, *, 4, 5, 50.123456)", + 
s"make_interval(0, 1, ${wdExpr.mkString(",")}, 4, 5, 50.123456)") + addCaseExpr( + benchmark, + cardinality, + "make_interval(0, 1, 2, 3, *, *, *)", + s"make_interval(0, 1, 2, 3, ${hmsExprs.mkString(",")})") + addCaseExpr( + benchmark, + cardinality, + "make_interval(*, *, *, *, *, *, *)", + s"make_interval(${args.mkString(",")})") + + benchmark.run() + } + + override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { + benchmarkIntervalStringParsing(1000000) + benchmarkMakeInterval(1000000) + } } From 6484c14c57434dd6961cf9e9e73bbe8aa04cda15 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Sun, 28 Jun 2020 12:22:44 -0700 Subject: [PATCH 08/15] [SPARK-32115][SQL] Fix SUBSTRING to handle integer overflows ### What changes were proposed in this pull request? Bug fix for overflow case in `UTF8String.substringSQL`. ### Why are the changes needed? SQL query `SELECT SUBSTRING("abc", -1207959552, -1207959552)` incorrectly returns` "abc"` against expected output of `""`. For query `SUBSTRING("abc", -100, -100)`, we'll get the right output of `""`. ### Does this PR introduce _any_ user-facing change? Yes, bug fix for the overflow case. ### How was this patch tested? New UT. Closes #28937 from xuanyuanking/SPARK-32115. 
Authored-by: Yuanjian Li Signed-off-by: Dongjoon Hyun --- .../org/apache/spark/unsafe/types/UTF8String.java | 11 ++++++++++- .../apache/spark/unsafe/types/UTF8StringSuite.java | 4 ++++ .../catalyst/expressions/StringExpressionsSuite.scala | 4 ++++ 3 files changed, 18 insertions(+), 1 deletion(-) diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java index 186597fa64780..7205293aa48c5 100644 --- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java +++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java @@ -341,8 +341,17 @@ public UTF8String substringSQL(int pos, int length) { // to the -ith element before the end of the sequence. If a start index i is 0, it // refers to the first element. int len = numChars(); + // `len + pos` does not overflow as `len >= 0`. int start = (pos > 0) ? pos -1 : ((pos < 0) ? len + pos : 0); - int end = (length == Integer.MAX_VALUE) ? 
len : start + length; + + int end; + if ((long) start + length > Integer.MAX_VALUE) { + end = Integer.MAX_VALUE; + } else if ((long) start + length < Integer.MIN_VALUE) { + end = Integer.MIN_VALUE; + } else { + end = start + length; + } return substring(start, end); } diff --git a/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java b/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java index 8f933877f82e6..70e276f7e5a8b 100644 --- a/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java +++ b/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java @@ -390,6 +390,10 @@ public void substringSQL() { assertEquals(fromString("example"), e.substringSQL(0, Integer.MAX_VALUE)); assertEquals(fromString("example"), e.substringSQL(1, Integer.MAX_VALUE)); assertEquals(fromString("xample"), e.substringSQL(2, Integer.MAX_VALUE)); + assertEquals(EMPTY_UTF8, e.substringSQL(-100, -100)); + assertEquals(EMPTY_UTF8, e.substringSQL(-1207959552, -1207959552)); + assertEquals(fromString("pl"), e.substringSQL(-3, 2)); + assertEquals(EMPTY_UTF8, e.substringSQL(Integer.MIN_VALUE, 6)); } @Test diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/StringExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/StringExpressionsSuite.scala index f18364d844ce1..967ccc42c632d 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/StringExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/StringExpressionsSuite.scala @@ -236,6 +236,10 @@ class StringExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { "xample", row) + // Substring with from negative position with negative length + checkEvaluation(Substring(s, Literal.create(-1207959552, IntegerType), + Literal.create(-1207959552, IntegerType)), "", row) + val s_notNull = 'a.string.notNull.at(0) 
assert(Substring(s, Literal.create(0, IntegerType), Literal.create(2, IntegerType)).nullable) From 197ac3b13239d50a1f34a5860940e353ca6b99d5 Mon Sep 17 00:00:00 2001 From: Warren Zhu Date: Sun, 28 Jun 2020 21:06:45 -0700 Subject: [PATCH 09/15] [SPARK-32124][CORE][SHS] Fix taskEndReasonFromJson to handle event logs from old Spark versions ### What changes were proposed in this pull request? Fix bug of exception when parse event log of fetch failed task end reason without `Map Index` ### Why are the changes needed? When Spark history server read event log produced by older version of spark 2.4 (which don't have `Map Index` field), parsing of TaskEndReason will fail. This will cause TaskEnd event being ignored. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? JsonProtocolSuite.test("FetchFailed Map Index backwards compatibility") Closes #28941 from warrenzhu25/shs-task. Authored-by: Warren Zhu Signed-off-by: Dongjoon Hyun --- .../scala/org/apache/spark/util/JsonProtocol.scala | 5 ++++- .../org/apache/spark/util/JsonProtocolSuite.scala | 11 +++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 1c788a30022d0..ced3f9d15720d 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -1078,7 +1078,10 @@ private[spark] object JsonProtocol { val blockManagerAddress = blockManagerIdFromJson(json \ "Block Manager Address") val shuffleId = (json \ "Shuffle ID").extract[Int] val mapId = (json \ "Map ID").extract[Long] - val mapIndex = (json \ "Map Index").extract[Int] + val mapIndex = (json \ "Map Index") match { + case JNothing => 0 + case x => x.extract[Int] + } val reduceId = (json \ "Reduce ID").extract[Int] val message = jsonOption(json \ "Message").map(_.extract[String]) new 
FetchFailed(blockManagerAddress, shuffleId, mapId, mapIndex, reduceId, diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 5a4073baa19d4..955589fc5b47b 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -327,6 +327,17 @@ class JsonProtocolSuite extends SparkFunSuite { assert(expectedFetchFailed === JsonProtocol.taskEndReasonFromJson(oldEvent)) } + test("SPARK-32124: FetchFailed Map Index backwards compatibility") { + // FetchFailed in Spark 2.4.0 does not have "Map Index" property. + val fetchFailed = FetchFailed(BlockManagerId("With or", "without you", 15), 17, 16L, 18, 19, + "ignored") + val oldEvent = JsonProtocol.taskEndReasonToJson(fetchFailed) + .removeField({ _._1 == "Map Index" }) + val expectedFetchFailed = FetchFailed(BlockManagerId("With or", "without you", 15), 17, 16L, + 0, 19, "ignored") + assert(expectedFetchFailed === JsonProtocol.taskEndReasonFromJson(oldEvent)) + } + test("ShuffleReadMetrics: Local bytes read backwards compatibility") { // Metrics about local shuffle bytes read were added in 1.3.1. val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, From f944603872284c03c557474bb9e816f20094a630 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Sun, 28 Jun 2020 21:35:59 -0700 Subject: [PATCH 10/15] [SPARK-32126][SS] Scope Session.active in IncrementalExecution ### What changes were proposed in this pull request? The `optimizedPlan` in IncrementalExecution should also be scoped in `withActive`. ### Why are the changes needed? Follow-up of SPARK-30798 for the Streaming side. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing UT. Closes #28936 from xuanyuanking/SPARK-30798-follow. 
Authored-by: Yuanjian Li Signed-off-by: Dongjoon Hyun --- .../scala/org/apache/spark/sql/execution/QueryExecution.scala | 2 +- .../spark/sql/execution/streaming/IncrementalExecution.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index bf60427e5f3bf..791e432269632 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -131,7 +131,7 @@ class QueryExecution( Option(InsertAdaptiveSparkPlan(AdaptiveExecutionContext(sparkSession, this)))) } - private def executePhase[T](phase: String)(block: => T): T = sparkSession.withActive { + protected def executePhase[T](phase: String)(block: => T): T = sparkSession.withActive { tracker.measurePhase(phase)(block) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala index 09ae7692ec518..7773ac71c4954 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala @@ -76,7 +76,7 @@ class IncrementalExecution( * with the desired literal */ override - lazy val optimizedPlan: LogicalPlan = tracker.measurePhase(QueryPlanningTracker.OPTIMIZATION) { + lazy val optimizedPlan: LogicalPlan = executePhase(QueryPlanningTracker.OPTIMIZATION) { sparkSession.sessionState.optimizer.executeAndTrack(withCachedData, tracker) transformAllExpressions { case ts @ CurrentBatchTimestamp(timestamp, _, _) => From 0ec17c989debddb56a8048e4bd29ae666e4d9c56 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Sun, 28 Jun 2020 21:49:10 -0700 Subject: [PATCH 11/15] [SPARK-32090][SQL] Improve 
UserDefinedType.equal() to make it symmetrical ### What changes were proposed in this pull request? This PR fixes `UserDefinedType.equal()` by comparing the UDT class instead of checking `acceptsType()`. ### Why are the changes needed? It's weird that equality comparison between two UDT types can have different results by switching the order: ```scala // ExampleSubTypeUDT.userClass is a subclass of ExampleBaseTypeUDT.userClass val udt1 = new ExampleBaseTypeUDT val udt2 = new ExampleSubTypeUDT println(udt1 == udt2) // true println(udt2 == udt1) // false ``` ### Does this PR introduce _any_ user-facing change? Yes. Before: ```scala // ExampleSubTypeUDT.userClass is a subclass of ExampleBaseTypeUDT.userClass val udt1 = new ExampleBaseTypeUDT val udt2 = new ExampleSubTypeUDT println(udt1 == udt2) // true println(udt2 == udt1) // false ``` After: ```scala // ExampleSubTypeUDT.userClass is a subclass of ExampleBaseTypeUDT.userClass val udt1 = new ExampleBaseTypeUDT val udt2 = new ExampleSubTypeUDT println(udt1 == udt2) // false println(udt2 == udt1) // false ``` ### How was this patch tested? Added a unit test. Closes #28923 from Ngone51/fix-udt-equal. 
Authored-by: yi.wu Signed-off-by: Dongjoon Hyun --- .../spark/sql/types/UserDefinedType.scala | 2 +- .../spark/sql/UserDefinedTypeSuite.scala | 18 ++++++++++++++++++ 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala index 6af16e2dba105..592ce03606d4b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala @@ -90,7 +90,7 @@ abstract class UserDefinedType[UserType >: Null] extends DataType with Serializa override def hashCode(): Int = getClass.hashCode() override def equals(other: Any): Boolean = other match { - case that: UserDefinedType[_] => this.acceptsType(that) + case that: UserDefinedType[_] => this.getClass == that.getClass case _ => false } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala index 3fd5cc72cb95e..9acb00b7b6d0b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala @@ -134,6 +134,24 @@ class UserDefinedTypeSuite extends QueryTest with SharedSparkSession with Parque MyLabeledPoint(1.0, new TestUDT.MyDenseVector(Array(0.1, 1.0))), MyLabeledPoint(0.0, new TestUDT.MyDenseVector(Array(0.3, 3.0)))).toDF() + + test("SPARK-32090: equal") { + val udt1 = new ExampleBaseTypeUDT + val udt2 = new ExampleSubTypeUDT + val udt3 = new ExampleSubTypeUDT + assert(udt1 !== udt2) + assert(udt2 !== udt1) + assert(udt2 === udt3) + assert(udt3 === udt2) + } + + test("SPARK-32090: acceptsType") { + val udt1 = new ExampleBaseTypeUDT + val udt2 = new ExampleSubTypeUDT + assert(udt1.acceptsType(udt2)) + assert(!udt2.acceptsType(udt1)) + } + test("register user type: MyDenseVector 
for MyLabeledPoint") { val labels: RDD[Double] = pointsRDD.select('label).rdd.map { case Row(v: Double) => v } val labelsArrays: Array[Double] = labels.collect() From 835ef425d03f30984e885448fe785905ed1ee9a7 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Sun, 28 Jun 2020 21:55:19 -0700 Subject: [PATCH 12/15] [SPARK-32038][SQL][FOLLOWUP] Make the alias name pretty after float/double normalization ### What changes were proposed in this pull request? This is a followup of https://github.com/apache/spark/pull/28876/files This PR proposes to use the name of the original expression, as the alias name of the normalization expression. ### Why are the changes needed? make the query plan looks pretty when EXPLAIN. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? manually explain the query Closes #28919 from cloud-fan/follow. Authored-by: Wenchen Fan Signed-off-by: Dongjoon Hyun --- .../org/apache/spark/sql/execution/SparkStrategies.scala | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 4d23e5e8a65b5..56d421cdcd702 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -436,6 +436,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { val normalizedGroupingExpressions = groupingExpressions.map { e => NormalizeFloatingNumbers.normalize(e) match { case n: NamedExpression => n + // Keep the name of the original expression. case other => Alias(other, e.name)(exprId = e.exprId) } } @@ -460,7 +461,13 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { // because `distinctExpressions` is not extracted during logical phase. 
NormalizeFloatingNumbers.normalize(e) match { case ne: NamedExpression => ne - case other => Alias(other, other.toString)() + case other => + // Keep the name of the original expression. + val name = e match { + case ne: NamedExpression => ne.name + case _ => e.toString + } + Alias(other, name)() } } From 4204a63d4ff628a38107543742753667330d1112 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 29 Jun 2020 11:33:40 +0000 Subject: [PATCH 13/15] [SPARK-32056][SQL] Coalesce partitions for repartition by expressions when AQE is enabled ### What changes were proposed in this pull request? This patch proposes to coalesce partitions for repartition by expressions without specifying number of partitions, when AQE is enabled. ### Why are the changes needed? When repartition by some partition expressions, users can specify number of partitions or not. If the number of partitions is specified, we should not coalesce partitions because it breaks user expectation. But if without specifying number of partitions, AQE should be able to coalesce partitions as other shuffling. ### Does this PR introduce _any_ user-facing change? Yes. After this change, if users don't specify the number of partitions when repartitioning data by expressions, AQE will coalesce partitions. ### How was this patch tested? Added unit test. Closes #28900 from viirya/SPARK-32056. 
Authored-by: Liang-Chi Hsieh Signed-off-by: Wenchen Fan --- .../plans/logical/basicLogicalOperators.scala | 18 ++++- .../scala/org/apache/spark/sql/Dataset.scala | 54 ++++++++----- .../spark/sql/execution/SparkStrategies.scala | 3 +- .../adaptive/AdaptiveQueryExecSuite.scala | 77 +++++++++++++++++-- 4 files changed, 120 insertions(+), 32 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 79a8380826ab3..039fd9382000a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning} import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.connector.catalog.Identifier +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.util.random.RandomSampler @@ -953,16 +954,18 @@ case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan) } /** - * This method repartitions data using [[Expression]]s into `numPartitions`, and receives + * This method repartitions data using [[Expression]]s into `optNumPartitions`, and receives * information about the number of partitions during execution. Used when a specific ordering or * distribution is expected by the consumer of the query result. Use [[Repartition]] for RDD-like - * `coalesce` and `repartition`. + * `coalesce` and `repartition`. If no `optNumPartitions` is given, by default it partitions data + * into `numShufflePartitions` defined in `SQLConf`, and could be coalesced by AQE. 
*/ case class RepartitionByExpression( partitionExpressions: Seq[Expression], child: LogicalPlan, - numPartitions: Int) extends RepartitionOperation { + optNumPartitions: Option[Int]) extends RepartitionOperation { + val numPartitions = optNumPartitions.getOrElse(SQLConf.get.numShufflePartitions) require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.") val partitioning: Partitioning = { @@ -990,6 +993,15 @@ case class RepartitionByExpression( override def shuffle: Boolean = true } +object RepartitionByExpression { + def apply( + partitionExpressions: Seq[Expression], + child: LogicalPlan, + numPartitions: Int): RepartitionByExpression = { + RepartitionByExpression(partitionExpressions, child, Some(numPartitions)) + } +} + /** * A relation with one row. This is used in "SELECT ..." without a from clause. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 524e231eb7eb9..6f97121d88ede 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -2991,17 +2991,9 @@ class Dataset[T] private[sql]( Repartition(numPartitions, shuffle = true, logicalPlan) } - /** - * Returns a new Dataset partitioned by the given partitioning expressions into - * `numPartitions`. The resulting Dataset is hash partitioned. - * - * This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL). - * - * @group typedrel - * @since 2.0.0 - */ - @scala.annotation.varargs - def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T] = { + private def repartitionByExpression( + numPartitions: Option[Int], + partitionExprs: Seq[Column]): Dataset[T] = { // The underlying `LogicalPlan` operator special-cases all-`SortOrder` arguments. // However, we don't want to complicate the semantics of this API method. 
// Instead, let's give users a friendly error message, pointing them to the new method. @@ -3015,6 +3007,20 @@ class Dataset[T] private[sql]( } } + /** + * Returns a new Dataset partitioned by the given partitioning expressions into + * `numPartitions`. The resulting Dataset is hash partitioned. + * + * This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL). + * + * @group typedrel + * @since 2.0.0 + */ + @scala.annotation.varargs + def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T] = { + repartitionByExpression(Some(numPartitions), partitionExprs) + } + /** * Returns a new Dataset partitioned by the given partitioning expressions, using * `spark.sql.shuffle.partitions` as number of partitions. @@ -3027,7 +3033,20 @@ class Dataset[T] private[sql]( */ @scala.annotation.varargs def repartition(partitionExprs: Column*): Dataset[T] = { - repartition(sparkSession.sessionState.conf.numShufflePartitions, partitionExprs: _*) + repartitionByExpression(None, partitionExprs) + } + + private def repartitionByRange( + numPartitions: Option[Int], + partitionExprs: Seq[Column]): Dataset[T] = { + require(partitionExprs.nonEmpty, "At least one partition-by expression must be specified.") + val sortOrder: Seq[SortOrder] = partitionExprs.map(_.expr match { + case expr: SortOrder => expr + case expr: Expression => SortOrder(expr, Ascending) + }) + withTypedPlan { + RepartitionByExpression(sortOrder, logicalPlan, numPartitions) + } } /** @@ -3049,14 +3068,7 @@ class Dataset[T] private[sql]( */ @scala.annotation.varargs def repartitionByRange(numPartitions: Int, partitionExprs: Column*): Dataset[T] = { - require(partitionExprs.nonEmpty, "At least one partition-by expression must be specified.") - val sortOrder: Seq[SortOrder] = partitionExprs.map(_.expr match { - case expr: SortOrder => expr - case expr: Expression => SortOrder(expr, Ascending) - }) - withTypedPlan { - RepartitionByExpression(sortOrder, logicalPlan, numPartitions) - } + 
repartitionByRange(Some(numPartitions), partitionExprs) } /** @@ -3078,7 +3090,7 @@ class Dataset[T] private[sql]( */ @scala.annotation.varargs def repartitionByRange(partitionExprs: Column*): Dataset[T] = { - repartitionByRange(sparkSession.sessionState.conf.numShufflePartitions, partitionExprs: _*) + repartitionByRange(None, partitionExprs) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 56d421cdcd702..3f339347ab4db 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -692,8 +692,9 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case r: logical.Range => execution.RangeExec(r) :: Nil case r: logical.RepartitionByExpression => + val canChangeNumParts = r.optNumPartitions.isEmpty exchange.ShuffleExchangeExec( - r.partitioning, planLater(r.child), canChangeNumPartitions = false) :: Nil + r.partitioning, planLater(r.child), canChangeNumParts) :: Nil case ExternalRDD(outputObjAttr, rdd) => ExternalRDDScanExec(outputObjAttr, rdd) :: Nil case r: LogicalRDD => RDDScanExec(r.output, r.rdd, "ExistingRDD", r.outputPartitioning, r.outputOrdering) :: Nil diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 9fa97bffa8910..27d9748476c98 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan} import 
org.apache.spark.sql.execution.{PartialReducerPartitionSpec, ReusedSubqueryExec, ShuffledRowRDD, SparkPlan} import org.apache.spark.sql.execution.command.DataWritingCommandExec -import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange, ReusedExchangeExec} +import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange, ReusedExchangeExec, ShuffleExchangeExec} import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate import org.apache.spark.sql.functions._ @@ -1022,18 +1022,81 @@ class AdaptiveQueryExecSuite } } - test("SPARK-31220 repartition obeys initialPartitionNum when adaptiveExecutionEnabled") { + test("SPARK-31220, SPARK-32056: repartition by expression with AQE") { Seq(true, false).foreach { enableAQE => withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> enableAQE.toString, - SQLConf.SHUFFLE_PARTITIONS.key -> "6", - SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "7") { - val partitionsNum = spark.range(10).repartition($"id").rdd.collectPartitions().length + SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true", + SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "10", + SQLConf.SHUFFLE_PARTITIONS.key -> "10") { + + val df1 = spark.range(10).repartition($"id") + val df2 = spark.range(10).repartition($"id" + 1) + + val partitionsNum1 = df1.rdd.collectPartitions().length + val partitionsNum2 = df2.rdd.collectPartitions().length + if (enableAQE) { - assert(partitionsNum === 7) + assert(partitionsNum1 < 10) + assert(partitionsNum2 < 10) + + // repartition obeys initialPartitionNum when adaptiveExecutionEnabled + val plan = df1.queryExecution.executedPlan + assert(plan.isInstanceOf[AdaptiveSparkPlanExec]) + val shuffle = plan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan.collect { + case s: ShuffleExchangeExec => s + } + assert(shuffle.size == 1) + 
assert(shuffle(0).outputPartitioning.numPartitions == 10) } else { - assert(partitionsNum === 6) + assert(partitionsNum1 === 10) + assert(partitionsNum2 === 10) } + + + // Don't coalesce partitions if the number of partitions is specified. + val df3 = spark.range(10).repartition(10, $"id") + val df4 = spark.range(10).repartition(10) + assert(df3.rdd.collectPartitions().length == 10) + assert(df4.rdd.collectPartitions().length == 10) + } + } + } + + test("SPARK-31220, SPARK-32056: repartition by range with AQE") { + Seq(true, false).foreach { enableAQE => + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> enableAQE.toString, + SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true", + SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "10", + SQLConf.SHUFFLE_PARTITIONS.key -> "10") { + + val df1 = spark.range(10).toDF.repartitionByRange($"id".asc) + val df2 = spark.range(10).toDF.repartitionByRange(($"id" + 1).asc) + + val partitionsNum1 = df1.rdd.collectPartitions().length + val partitionsNum2 = df2.rdd.collectPartitions().length + + if (enableAQE) { + assert(partitionsNum1 < 10) + assert(partitionsNum2 < 10) + + // repartition obeys initialPartitionNum when adaptiveExecutionEnabled + val plan = df1.queryExecution.executedPlan + assert(plan.isInstanceOf[AdaptiveSparkPlanExec]) + val shuffle = plan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan.collect { + case s: ShuffleExchangeExec => s + } + assert(shuffle.size == 1) + assert(shuffle(0).outputPartitioning.numPartitions == 10) + } else { + assert(partitionsNum1 === 10) + assert(partitionsNum2 === 10) + } + + // Don't coalesce partitions if the number of partitions is specified. 
+ val df3 = spark.range(10).repartitionByRange(10, $"id".asc) + assert(df3.rdd.collectPartitions().length == 10) } } } From 6fcb70e0cadd8a543cd9be5f606c5dbeec0ae181 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Mon, 29 Jun 2020 11:37:03 +0000 Subject: [PATCH 14/15] [SPARK-32055][CORE][SQL] Unify getReader and getReaderForRange in ShuffleManager ### What changes were proposed in this pull request? This PR tries to unify the method `getReader` and `getReaderForRange` in `ShuffleManager`. ### Why are the changes needed? Reduce the duplicate codes, simplify the implementation, and for better maintenance. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Covered by existing tests. Closes #28895 from Ngone51/unify-getreader. Authored-by: yi.wu Signed-off-by: Wenchen Fan --- .../org/apache/spark/MapOutputTracker.scala | 77 ++++--------------- .../apache/spark/shuffle/ShuffleManager.scala | 18 +++-- .../shuffle/sort/SortShuffleManager.scala | 22 ++---- .../apache/spark/MapOutputTrackerSuite.scala | 2 +- .../BlockStoreShuffleReaderSuite.scala | 4 +- .../spark/sql/execution/ShuffledRowRDD.scala | 4 +- 6 files changed, 40 insertions(+), 87 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index ec8621bc55cf3..18cd5de4cfada 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -322,36 +322,22 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging // For testing def getMapSizesByExecutorId(shuffleId: Int, reduceId: Int) : Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = { - getMapSizesByExecutorId(shuffleId, reduceId, reduceId + 1) + getMapSizesByExecutorId(shuffleId, 0, Int.MaxValue, reduceId, reduceId + 1) } /** * Called from executors to get the server URIs and output sizes for each shuffle block that * needs to be read 
from a given range of map output partitions (startPartition is included but - * endPartition is excluded from the range). + * endPartition is excluded from the range) within a range of mappers (startMapIndex is included + * but endMapIndex is excluded). If endMapIndex=Int.MaxValue, the actual endMapIndex will be + * changed to the length of total map outputs. * * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId, * and the second item is a sequence of (shuffle block id, shuffle block size, map index) * tuples describing the shuffle blocks that are stored at that block manager. + * Note that zero-sized blocks are excluded in the result. */ def getMapSizesByExecutorId( - shuffleId: Int, - startPartition: Int, - endPartition: Int) - : Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] - - /** - * Called from executors to get the server URIs and output sizes for each shuffle block that - * needs to be read from a given range of map output partitions (startPartition is included but - * endPartition is excluded from the range) and is produced by - * a range of mappers (startMapIndex, endMapIndex, startMapIndex is included and - * the endMapIndex is excluded). - * - * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId, - * and the second item is a sequence of (shuffle block id, shuffle block size, map index) - * tuples describing the shuffle blocks that are stored at that block manager. - */ - def getMapSizesByRange( shuffleId: Int, startMapIndex: Int, endMapIndex: Int, @@ -734,38 +720,22 @@ private[spark] class MapOutputTrackerMaster( } } - // Get blocks sizes by executor Id. Note that zero-sized blocks are excluded in the result. // This method is only called in local-mode. 
def getMapSizesByExecutorId( - shuffleId: Int, - startPartition: Int, - endPartition: Int) - : Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = { - logDebug(s"Fetching outputs for shuffle $shuffleId, partitions $startPartition-$endPartition") - shuffleStatuses.get(shuffleId) match { - case Some (shuffleStatus) => - shuffleStatus.withMapStatuses { statuses => - MapOutputTracker.convertMapStatuses( - shuffleId, startPartition, endPartition, statuses, 0, shuffleStatus.mapStatuses.length) - } - case None => - Iterator.empty - } - } - - override def getMapSizesByRange( shuffleId: Int, startMapIndex: Int, endMapIndex: Int, startPartition: Int, endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = { - logDebug(s"Fetching outputs for shuffle $shuffleId, mappers $startMapIndex-$endMapIndex" + - s"partitions $startPartition-$endPartition") + logDebug(s"Fetching outputs for shuffle $shuffleId") shuffleStatuses.get(shuffleId) match { case Some(shuffleStatus) => shuffleStatus.withMapStatuses { statuses => + val actualEndMapIndex = if (endMapIndex == Int.MaxValue) statuses.length else endMapIndex + logDebug(s"Convert map statuses for shuffle $shuffleId, " + + s"mappers $startMapIndex-$actualEndMapIndex, partitions $startPartition-$endPartition") MapOutputTracker.convertMapStatuses( - shuffleId, startPartition, endPartition, statuses, startMapIndex, endMapIndex) + shuffleId, startPartition, endPartition, statuses, startMapIndex, actualEndMapIndex) } case None => Iterator.empty @@ -798,37 +768,20 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr */ private val fetchingLock = new KeyLock[Int] - // Get blocks sizes by executor Id. Note that zero-sized blocks are excluded in the result. 
override def getMapSizesByExecutorId( - shuffleId: Int, - startPartition: Int, - endPartition: Int) - : Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = { - logDebug(s"Fetching outputs for shuffle $shuffleId, partitions $startPartition-$endPartition") - val statuses = getStatuses(shuffleId, conf) - try { - MapOutputTracker.convertMapStatuses( - shuffleId, startPartition, endPartition, statuses, 0, statuses.length) - } catch { - case e: MetadataFetchFailedException => - // We experienced a fetch failure so our mapStatuses cache is outdated; clear it: - mapStatuses.clear() - throw e - } - } - - override def getMapSizesByRange( shuffleId: Int, startMapIndex: Int, endMapIndex: Int, startPartition: Int, endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = { - logDebug(s"Fetching outputs for shuffle $shuffleId, mappers $startMapIndex-$endMapIndex" + - s"partitions $startPartition-$endPartition") + logDebug(s"Fetching outputs for shuffle $shuffleId") val statuses = getStatuses(shuffleId, conf) try { + val actualEndMapIndex = if (endMapIndex == Int.MaxValue) statuses.length else endMapIndex + logDebug(s"Convert map statuses for shuffle $shuffleId, " + + s"mappers $startMapIndex-$actualEndMapIndex, partitions $startPartition-$endPartition") MapOutputTracker.convertMapStatuses( - shuffleId, startPartition, endPartition, statuses, startMapIndex, endMapIndex) + shuffleId, startPartition, endPartition, statuses, startMapIndex, actualEndMapIndex) } catch { case e: MetadataFetchFailedException => // We experienced a fetch failure so our mapStatuses cache is outdated; clear it: diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala index 057b0d6e0b0a7..400c4526f0114 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala @@ -43,23 +43,31 @@ private[spark] trait 
ShuffleManager { context: TaskContext, metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] + /** - * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive). + * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive) to + * read from all map outputs of the shuffle. + * * Called on executors by reduce tasks. */ - def getReader[K, C]( + final def getReader[K, C]( handle: ShuffleHandle, startPartition: Int, endPartition: Int, context: TaskContext, - metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] + metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = { + getReader(handle, 0, Int.MaxValue, startPartition, endPartition, context, metrics) + } /** * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive) to - * read from map output (startMapIndex to endMapIndex - 1, inclusive). + * read from a range of map outputs(startMapIndex to endMapIndex-1, inclusive). + * If endMapIndex=Int.MaxValue, the actual endMapIndex will be changed to the length of total map + * outputs of the shuffle in `getMapSizesByExecutorId`. + * * Called on executors by reduce tasks. 
*/ - def getReaderForRange[K, C]( + def getReader[K, C]( handle: ShuffleHandle, startMapIndex: Int, endMapIndex: Int, diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala index aefcb59b8bb87..72460180f5908 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala @@ -23,6 +23,7 @@ import scala.collection.JavaConverters._ import org.apache.spark._ import org.apache.spark.internal.{config, Logging} +import org.apache.spark.scheduler.MapStatus import org.apache.spark.shuffle._ import org.apache.spark.shuffle.api.{ShuffleDataIO, ShuffleExecutorComponents} import org.apache.spark.util.Utils @@ -115,23 +116,14 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager } /** - * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive). + * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive) to + * read from a range of map outputs(startMapIndex to endMapIndex-1, inclusive). + * If endMapIndex=Int.MaxValue, the actual endMapIndex will be changed to the length of total map + * outputs of the shuffle in `getMapSizesByExecutorId`. + * * Called on executors by reduce tasks. 
*/ override def getReader[K, C]( - handle: ShuffleHandle, - startPartition: Int, - endPartition: Int, - context: TaskContext, - metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = { - val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId( - handle.shuffleId, startPartition, endPartition) - new BlockStoreShuffleReader( - handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context, metrics, - shouldBatchFetch = canUseBatchFetch(startPartition, endPartition, context)) - } - - override def getReaderForRange[K, C]( handle: ShuffleHandle, startMapIndex: Int, endMapIndex: Int, @@ -139,7 +131,7 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager endPartition: Int, context: TaskContext, metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = { - val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByRange( + val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId( handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition) new BlockStoreShuffleReader( handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context, metrics, diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala index d5ee19bde8edf..630ffd9baa06e 100644 --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala @@ -317,7 +317,7 @@ class MapOutputTrackerSuite extends SparkFunSuite { tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 1000), Array(size10000, size0, size1000, size0), 6)) assert(tracker.containsShuffle(10)) - assert(tracker.getMapSizesByExecutorId(10, 0, 4).toSeq === + assert(tracker.getMapSizesByExecutorId(10, 0, 2, 0, 4).toSeq === Seq( (BlockManagerId("a", "hostA", 1000), Seq((ShuffleBlockId(10, 5, 1), size1000, 0), diff --git 
a/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala index a82f86a11c77e..d964b28df2983 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala @@ -104,7 +104,7 @@ class BlockStoreShuffleReaderSuite extends SparkFunSuite with LocalSparkContext // shuffle data to read. val mapOutputTracker = mock(classOf[MapOutputTracker]) when(mapOutputTracker.getMapSizesByExecutorId( - shuffleId, reduceId, reduceId + 1)).thenReturn { + shuffleId, 0, numMaps, reduceId, reduceId + 1)).thenReturn { // Test a scenario where all data is local, to avoid creating a bunch of additional mocks // for the code to read data over the network. val shuffleBlockIdsAndSizes = (0 until numMaps).map { mapId => @@ -132,7 +132,7 @@ class BlockStoreShuffleReaderSuite extends SparkFunSuite with LocalSparkContext val taskContext = TaskContext.empty() val metrics = taskContext.taskMetrics.createTempShuffleReadMetrics() val blocksByAddress = mapOutputTracker.getMapSizesByExecutorId( - shuffleId, reduceId, reduceId + 1) + shuffleId, 0, numMaps, reduceId, reduceId + 1) val shuffleReader = new BlockStoreShuffleReader( shuffleHandle, blocksByAddress, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala index 5936492dd819c..b5e9655a776b7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala @@ -191,7 +191,7 @@ class ShuffledRowRDD( sqlMetricsReporter) case PartialReducerPartitionSpec(reducerIndex, startMapIndex, endMapIndex, _) => - SparkEnv.get.shuffleManager.getReaderForRange( + SparkEnv.get.shuffleManager.getReader( dependency.shuffleHandle, 
startMapIndex, endMapIndex, @@ -201,7 +201,7 @@ class ShuffledRowRDD( sqlMetricsReporter) case PartialMapperPartitionSpec(mapIndex, startReducerIndex, endReducerIndex) => - SparkEnv.get.shuffleManager.getReaderForRange( + SparkEnv.get.shuffleManager.getReader( dependency.shuffleHandle, mapIndex, mapIndex + 1, From a6b6a1fd611edb0b0c11e2a5399cee66a7e74de6 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 29 Jun 2020 11:53:56 +0000 Subject: [PATCH 15/15] [MINOR] update dev/create-release/known_translations This is a followup of https://github.com/apache/spark/pull/28861 : 1. sort the names by Github ID, suggested by https://github.com/apache/spark/pull/28861#pullrequestreview-433369190 2. add more full names collected in https://github.com/apache/spark/pull/28861 3. The duplicated entry of `linbojin` is removed. 4. Name format is normalized to First Last name style. Closes #28891 from cloud-fan/update. Authored-by: Wenchen Fan Signed-off-by: Wenchen Fan --- dev/create-release/known_translations | 541 +++++++++++++------------- 1 file changed, 270 insertions(+), 271 deletions(-) diff --git a/dev/create-release/known_translations b/dev/create-release/known_translations index 376398bc3788a..ff41cccde0140 100644 --- a/dev/create-release/known_translations +++ b/dev/create-release/known_translations @@ -1,411 +1,410 @@ # This is a mapping of names to be translated through translate-contributors.py -# The format expected on each line should be: - -CodingCat - Nan Zhu -CrazyJvm - Chao Chen -EugenCepoi - Eugen Cepoi -GraceH - Jie Huang -JerryLead - Lijie Xu -Leolh - Liu Hao -Lewuathe - Kai Sasaki -RongGu - Rong Gu -Shiti - Shiti Saxena -Victsm - Min Shen -WangTaoTheTonic - Wang Tao -XuTingjun - Tingjun Xu -YanTangZhai - Yantang Zhai -alexdebrie - Alex DeBrie -alokito - Alok Saldanha -anantasty - Anant Asthana -andrewor14 - Andrew Or -aniketbhatnagar - Aniket Bhatnagar -arahuja - Arun Ahuja -brkyvz - Burak Yavuz -chesterxgchen - Chester Chen -chiragaggarwal - Chirag 
Aggarwal -chouqin - Qiping Li -cocoatomo - Tomohiko K. -coderfi - Fairiz Azizi -coderxiang - Shuo Xiang -davies - Davies Liu -epahomov - Egor Pahomov -falaki - Hossein Falaki -freeman-lab - Jeremy Freeman -industrial-sloth - Jascha Swisher -jackylk - Jacky Li -jayunit100 - Jay Vyas -jerryshao - Saisai Shao -jkbradley - Joseph Bradley -lianhuiwang - Lianhui Wang -lirui-intel - Rui Li -luluorta - Lu Lu -luogankun - Gankun Luo -maji2014 - Derek Ma -mccheah - Matthew Cheah -mengxr - Xiangrui Meng -nartz - Nathan Artz -odedz - Oded Zimerman -ravipesala - Ravindra Pesala -roxchkplusony - Victor Tso -scwf - Wang Fei -shimingfei - Shiming Fei -surq - Surong Quan -suyanNone - Su Yan -tedyu - Ted Yu -tigerquoll - Dale Richardson -wangxiaojing - Xiaojing Wang -watermen - Yadong Qi -witgo - Guoqiang Li -xinyunh - Xinyun Huang -zsxwing - Shixiong Zhu -Bilna - Bilna P -DoingDone9 - Doing Done -Earne - Ernest -FlytxtRnD - Meethu Mathew -GenTang - Gen TANG -JoshRosen - Josh Rosen -MechCoder - Manoj Kumar -OopsOutOfMemory - Sheng Li -Peishen-Jia - Peishen Jia -SaintBacchus - Huang Zhaowei -azagrebin - Andrey Zagrebin -bzz - Alexander Bezzubov -fjiang6 - Fan Jiang -gasparms - Gaspar Munoz -guowei2 - Guo Wei -hhbyyh - Yuhao Yang -hseagle - Peng Xu -javadba - Stephen Boesch -jbencook - Ben Cook -kul - Kuldeep -ligangty - Gang Li -marsishandsome - Liangliang Gu -medale - Markus Dale -nemccarthy - Nathan McCarthy -nxwhite-str - Nate Crosswhite -seayi - Xiaohua Yi -tianyi - Yi Tian -uncleGen - Uncle Gen -viper-kun - Xu Kun -x1- - Yuri Saito -zapletal-martin - Martin Zapletal -zuxqoj - Shekhar Bansal -mingyukim - Mingyu Kim -sigmoidanalytics - Mayur Rustagi -AiHe - Ai He -BenFradet - Ben Fradet -FavioVazquez - Favio Vazquez -JaysonSunshine - Jayson Sunshine -Liuchang0812 - Liu Chang -Sephiroth-Lin - Sephiroth Lin -dobashim - Masaru Dobashi -ehnalis - Zoltan Zvara -emres - Emre Sevinc -gchen - Guancheng Chen -haiyangsea - Haiyang Sea -hlin09 - Hao Lin -hqzizania - Qian Huang -jeanlyn - 
Jean Lyn -jerluc - Jeremy A. Lucas -jrabary - Jaonary Rabarisoa -judynash - Judy Nash -kaka1992 - Chen Song -ksonj - Kalle Jepsen -kuromatsu-nobuyuki - Nobuyuki Kuromatsu -lazyman500 - Dong Xu -leahmcguire - Leah McGuire -mbittmann - Mark Bittmann -mbonaci - Marko Bonaci -meawoppl - Matthew Goodman -nyaapa - Arsenii Krasikov -phatak-dev - Madhukara Phatak -prabeesh - Prabeesh K -rakeshchalasani - Rakesh Chalasani -rekhajoshm - Rekha Joshi -sisihj - June He -szheng79 - Shuai Zheng -texasmichelle - Michelle Casbon -vinodkc - Vinod KC -yongtang - Yong Tang -ypcat - Pei-Lun Lee -zhichao-li - Zhichao Li -zzcclp - Zhichao Zhang -979969786 - Yuming Wang -Rosstin - Rosstin Murphy -ameyc - Amey Chaugule -animeshbaranawal - Animesh Baranawal -cafreeman - Chris Freeman -lee19 - Lee -lockwobr - Brian Lockwood -navis - Navis Ryu -pparkkin - Paavo Parkkinen -HyukjinKwon - Hyukjin Kwon -JDrit - Joseph Batchik -JuhongPark - Juhong Park -KaiXinXiaoLei - KaiXinXIaoLei -NamelessAnalyst - NamelessAnalyst -alyaxey - Alex Slusarenko -baishuo - Shuo Bai -fe2s - Oleksiy Dyagilev -felixcheung - Felix Cheung -feynmanliang - Feynman Liang -josepablocam - Jose Cambronero -kai-zeng - Kai Zeng -mosessky - mosessky -msannell - Michael Sannella -nishkamravi2 - Nishkam Ravi -noel-smith - Noel Smith -petz2000 - Patrick Baier -qiansl127 - Shilei Qian -rahulpalamuttam - Rahul Palamuttam -rowan000 - Rowan Chattaway -sarutak - Kousuke Saruta -sethah - Seth Hendrickson -small-wang - Wang Wei -stanzhai - Stan Zhai -tien-dungle - Tien-Dung Le -xuchenCN - Xu Chen -zhangjiajin - Zhang JiaJin -ClassNotFoundExp - Fu Xing -KevinGrealish - Kevin Grealish -MasterDDT - Mitesh Patel -VinceShieh - Vincent Xie -WeichenXu123 - Weichen Xu -Yunni - Yun Ni -actuaryzhang - Wayne Zhang -alicegugu - Gu Huiqin Alice -anabranch - Bill Chambers -ashangit - Nicolas Fraison -avulanov - Alexander Ulanov -biglobster - Liang Ke -cenyuhai - Yuhai Cen -codlife - Jianfei Wang -david-weiluo-ren - Weiluo (David) Ren -dding3 - Ding Ding 
-fidato13 - Tarun Kumar -frreiss - Fred Reiss -gatorsmile - Xiao Li -hayashidac - Chie Hayashida -invkrh - Hao Ren -jagadeesanas2 - Jagadeesan A S -jiangxb1987 - Jiang Xingbo -jisookim0513 - Jisoo Kim -junyangq - Junyang Qian -krishnakalyan3 - Krishna Kalyan -linbojin - Linbo Jin -mpjlu - Peng Meng -neggert - Nic Eggert -petermaxlee - Peter Lee -phalodi - Sandeep Purohit -pkch - pkch -priyankagargnitk - Priyanka Garg -sharkdtu - Xiaogang Tu -shenh062326 - Shen Hong -aokolnychyi - Anton Okolnychyi -linbojin - Linbo Jin -lw-lin - Liwei Lin +# The format expected on each line should be: - +012huang - Weiyi Huang +07ARB - Ankit Raj Boudh 10110346 - Xian Liu +979969786 - Yuming Wang Achuth17 - Achuth Narayan Rajagopal Adamyuanyuan - Adam Wang -DylanGuedes - Dylan Guedes -JiahuiJiang - Jiahui Jiang -KevinZwx - Kevin Zhang -LantaoJin - Lantao Jin -Lemonjing - Rann Tao -LucaCanali - Luca Canali -XD-DENG - Xiaodong Deng -aai95 - Aleksei Izmalkin -akonopko - Alexander Konopko -ankuriitg - Ankur Gupta -arucard21 - Riaas Mokiem -attilapiros - Attila Zsolt Piros -bravo-zhang - Bravo Zhang -caneGuy - Kang Zhou -chaoslawful - Xiaozhe Wang -cluo512 - Chuan Luo -codeatri - Neha Patil -crafty-coder - Carlos Pena -debugger87 - Chaozhong Yang -e-dorigatti - Emilio Dorigatti -eric-maynard - Eric Maynard -felixalbani - Felix Albani -fjh100456 - Jinhua Fu -guoxiaolongzte - Xiaolong Guo -heary-cao - Xuewen Cao -huangweizhe123 - Weizhe Huang -ivoson - Tengfei Huang -jinxing64 - Jin Xing -liu-zhaokun - Zhaokun Liu -liutang123 - Lijia Liu -maropu - Takeshi Yamamuro -maryannxue - Maryann Xue -mcteo - Thomas Dunne -mn-mikke - Marek Novotny -myroslavlisniak - Myroslav Lisniak -npoggi - Nicolas Poggi -pgandhi999 - Parth Gandhi -rimolive - Ricardo Martinelli De Oliveira -sadhen - Darcy Shen -sandeep-katta - Sandeep Katta -seancxmao - Chenxiao Mao -sel - Steve Larkin -shimamoto - Takako Shimamoto -shivusondur - Shivakumar Sondur -skonto - Stavros Kontopoulos -trystanleftwich - Trystan Leftwich 
-ueshin - Takuya Ueshin -uzmijnlm - Weizhe Huang -xuanyuanking - Yuanjian Li -xubo245 - Bo Xu -xueyumusic - Xue Yu -yanlin-Lynn - Yanlin Wang -yucai - Yucai Yu -zhengruifeng - Ruifeng Zheng -zuotingbing - Tingbing Zuo -012huang - Weiyi Huang -07ARB - Ankit Raj Boudh +AiHe - Ai He Andrew-Crosby - Andrew Crosby AngersZhuuuu - Yi Zhu +BenFradet - Ben Fradet +Bilna - Bilna P +ClassNotFoundExp - Fu Xing +CodingCat - Nan Zhu +CrazyJvm - Chao Chen Deegue - Yizhong Zhang +DoingDone9 - Doing Done +DylanGuedes - Dylan Guedes +Earne - Ernest +EugenCepoi - Eugen Cepoi +FavioVazquez - Favio Vazquez +FlytxtRnD - Meethu Mathew +GenTang - Gen TANG +GraceH - Jie Huang Gschiavon - German Schiavon Matteo GuoPhilipse - Philipse Guo Hellsen83 - Erik Christiansen +HyukjinKwon - Hyukjin Kwon Icysandwich - Icysandwich +JDrit - Joseph Batchik JasonWayne - Wenjie Wu +JaysonSunshine - Jayson Sunshine +JerryLead - Lijie Xu +JiahuiJiang - Jiahui Jiang JkSelf - Ke Jia JoanFM - Joan Fontanals +JoshRosen - Josh Rosen +JuhongPark - Juhong Park JulienPeloton - Julien Peloton +KaiXinXiaoLei - KaiXinXIaoLei +KevinGrealish - Kevin Grealish +KevinZwx - Kevin Zhang Koraseg - Artem Kupchinskiy KyleLi1985 - Liang Li +LantaoJin - Lantao Jin +Lemonjing - Rann Tao +Leolh - Liu Hao +Lewuathe - Kai Sasaki LiShuMing - Shuming Li -LinhongLiu - Liu, Linhong +LinhongLiu - Linhong Liu +Liuchang0812 - Liu Chang +LucaCanali - Luca Canali LuciferYang - Yang Jie +MasterDDT - Mitesh Patel MaxGekk - Maxim Gekk +MechCoder - Manoj Kumar +NamelessAnalyst - NamelessAnalyst Ngone51 - Yi Wu +OopsOutOfMemory - Sheng Li PavithraRamachandran - Pavithra Ramachandran +Peishen-Jia - Peishen Jia +RongGu - Rong Gu +Rosstin - Rosstin Murphy +SaintBacchus - Huang Zhaowei +Sephiroth-Lin - Sephiroth Lin +Shiti - Shiti Saxena SongYadong - Yadong Song TigerYang414 - David Yang TomokoKomiyama - Tomoko Komiyama TopGunViper - TopGunViper Udbhav30 - Udbhav Agrawal +Victsm - Min Shen +VinceShieh - Vincent Xie WangGuangxin - Guangxin Wang 
+WangTaoTheTonic - Wang Tao +WeichenXu123 - Weichen Xu William1104 - William Wong +XD-DENG - Xiaodong Deng +XuTingjun - Tingjun Xu +YanTangZhai - Yantang Zhai YongjinZhou - Yongjin Zhou +Yunni - Yun Ni +aai95 - Aleksei Izmalkin aaruna - Aaruna Godthi +actuaryzhang - Wayne Zhang adrian555 - Weiqiang Zhuang ajithme - Ajith S +akonopko - Alexander Konopko +alexdebrie - Alex DeBrie +alicegugu - Gu Huiqin Alice +alokito - Alok Saldanha +alyaxey - Alex Slusarenko amanomer - Aman Omer +ameyc - Amey Chaugule +anabranch - Bill Chambers +anantasty - Anant Asthana ancasarb - Anca Sarb +andrewor14 - Andrew Or +aniketbhatnagar - Aniket Bhatnagar +animeshbaranawal - Animesh Baranawal +ankuriitg - Ankur Gupta +aokolnychyi - Anton Okolnychyi +arahuja - Arun Ahuja +arucard21 - Riaas Mokiem +ashangit - Nicolas Fraison +attilapiros - Attila Zsolt Piros avkgh - Aleksandr Kashkirov +avulanov - Alexander Ulanov ayudovin - Artsiom Yudovin +azagrebin - Andrey Zagrebin +baishuo - Shuo Bai bartosz25 - Bartosz Konieczny beliefer - Jiaan Geng bettermouse - Chen Hao +biglobster - Liang Ke +bravo-zhang - Bravo Zhang +brkyvz - Burak Yavuz bscan - Brian Scannell +bzz - Alexander Bezzubov +cafreeman - Chris Freeman +caneGuy - Kang Zhou cchung100m - Neo Chien cclauss - Christian Clauss +cenyuhai - Yuhai Cen chakravarthiT - Chakravarthi chandulal - Chandu Kavar +chaoslawful - Xiaozhe Wang +chesterxgchen - Chester Chen +chiragaggarwal - Chirag Aggarwal chitralverma - Chitral Verma -cjn082030 - Jenny +chouqin - Qiping Li +cjn082030 - Juanni Chen cloud-fan - Wenchen Fan +cluo512 - Chuan Luo +cocoatomo - Tomohiko K. 
+codeatri - Neha Patil codeborui - codeborui +coderfi - Fairiz Azizi +coderxiang - Shuo Xiang +codlife - Jianfei Wang colinmjj - Colin Ma -cxzl25 - cxzl25 +crafty-coder - Carlos Pena +cxzl25 - Shaoyun Chen cyq89051127 - Yongqiang Chai darrentirto - Darren Tirto +david-weiluo-ren - Weiluo (David) Ren daviddingly - Xiaoyuan Ding davidvrba - David Vrba +davies - Davies Liu +dding3 - Ding Ding +debugger87 - Chaozhong Yang deepyaman - Deepyaman Datta denglingang - Lingang Deng dengziming - dengziming deshanxiao - deshanxiao dima-asana - Dima Kamalov dlindelof - David Lindelof +dobashim - Masaru Dobashi dongjoon-hyun - Dongjoon Hyun -eatoncys - eatoncys +e-dorigatti - Emilio Dorigatti +eatoncys - Yanshan Chen +ehnalis - Zoltan Zvara +emres - Emre Sevinc +epahomov - Egor Pahomov +eric-maynard - Eric Maynard +falaki - Hossein Falaki fan31415 - Yijie Fan +fe2s - Oleksiy Dyagilev +felixalbani - Felix Albani +felixcheung - Felix Cheung +feynmanliang - Feynman Liang +fidato13 - Tarun Kumar fitermay - Yuli Fiterman +fjh100456 - Jinhua Fu +fjiang6 - Fan Jiang francis0407 - Mingcong Han +freeman-lab - Jeremy Freeman +frreiss - Fred Reiss fuwhu - Fuwang Hu +gasparms - Gaspar Munoz +gatorsmile - Xiao Li +gchen - Guancheng Chen gss2002 - Greg Senia +guowei2 - Guo Wei +guoxiaolongzte - Xiaolong Guo +haiyangsea - Haiyang Sea +hayashidac - Chie Hayashida hddong - Dongdong Hong +heary-cao - Xuewen Cao hehuiyuan - hehuiyuan helenyugithub - Helen Yu +hhbyyh - Yuhao Yang highmoutain - highmoutain +hlin09 - Hao Lin +hqzizania - Qian Huang +hseagle - Peng Xu httfighter - Tiantian Han huangtianhua - huangtianhua +huangweizhe123 - Weizhe Huang hvanhovell - Herman Van Hovell iRakson - Rakesh Raushan igorcalabria - Igor Calabria imback82 - Terry Kim +industrial-sloth - Jascha Swisher +invkrh - Hao Ren +ivoson - Tengfei Huang +jackylk - Jacky Li +jagadeesanas2 - Jagadeesan A S +javadba - Stephen Boesch javierivanov - Javier Fuentes +jayunit100 - Jay Vyas +jbencook - Ben Cook +jeanlyn - Jean Lyn 
+jerluc - Jeremy A. Lucas +jerryshao - Saisai Shao +jiangxb1987 - Jiang Xingbo +jinxing64 - Jin Xing +jisookim0513 - Jisoo Kim +jkbradley - Joseph Bradley joelgenter - Joel Genter +josepablocam - Jose Cambronero +jrabary - Jaonary Rabarisoa +judynash - Judy Nash +junyangq - Junyang Qian +kai-zeng - Kai Zeng +kaka1992 - Chen Song ketank-new - Ketan Kunde +krishnakalyan3 - Krishna Kalyan +ksonj - Kalle Jepsen +kul - Kuldeep +kuromatsu-nobuyuki - Nobuyuki Kuromatsu laskfla - Keith Sun +lazyman500 - Dong Xu lcqzte10192193 - Chaoqun Li +leahmcguire - Leah McGuire +lee19 - Lee leoluan2009 - Xuedong Luan liangxs - Xuesen Liang +lianhuiwang - Lianhui Wang lidinghao - Li Hao +ligangty - Gang Li +linbojin - Linbo Jin linehrr - Ryne Yang linzebing - Zebing Lin lipzhu - Lipeng Zhu +lirui-intel - Rui Li +liu-zhaokun - Zhaokun Liu liucht-inspur - liucht-inspur liupc - Pengcheng Liu +liutang123 - Lijia Liu liwensun - Liwen Sun +lockwobr - Brian Lockwood +luluorta - Lu Lu +luogankun - Gankun Luo +lw-lin - Liwei Lin +maji2014 - Derek Ma manuzhang - Manu Zhang mareksimunek - Marek Simunek +maropu - Takeshi Yamamuro +marsishandsome - Liangliang Gu +maryannxue - Maryann Xue masa3141 - Masahiro Kazama +mbittmann - Mark Bittmann +mbonaci - Marko Bonaci +mccheah - Matthew Cheah +mcteo - Thomas Dunne mdianjun - Dianjun Ma +meawoppl - Matthew Goodman +medale - Markus Dale +mengxr - Xiangrui Meng merrily01 - Ruilei Ma +mingyukim - Mingyu Kim +mn-mikke - Marek Novotny mob-ai - mob-ai +mosessky - mosessky +mpjlu - Peng Meng +msannell - Michael Sannella mu5358271 - Shuheng Dai mwlon - Martin Loncaric +myroslavlisniak - Myroslav Lisniak nandorKollar - Nandor Kollar +nartz - Nathan Artz +navis - Navis Ryu +neggert - Nic Eggert +nemccarthy - Nathan McCarthy +nishkamravi2 - Nishkam Ravi +noel-smith - Noel Smith nooberfsh - nooberfsh +npoggi - Nicolas Poggi +nxwhite-str - Nate Crosswhite +nyaapa - Arsenii Krasikov +odedz - Oded Zimerman oleg-smith - Oleg Kuznetsov ozancicek - Ozan Cicekci pengbo - 
Peng Bo +petermaxlee - Peter Lee +petz2000 - Patrick Baier +pgandhi999 - Parth Gandhi +phalodi - Sandeep Purohit +phatak-dev - Madhukara Phatak +pkch - pkch planga82 - Pablo Langa Blanco +pparkkin - Paavo Parkkinen +prabeesh - Prabeesh K praneetsharma - Praneet Sharma +priyankagargnitk - Priyanka Garg ptkool - Michael Styles qb-tarushg - Tarush Grover +qiansl127 - Shilei Qian +rahulpalamuttam - Rahul Palamuttam +rakeshchalasani - Rakesh Chalasani +ravipesala - Ravindra Pesala redsanket - Sanket Reddy redsk - Nicola Bova -roland1982 - roland1982 +rekhajoshm - Rekha Joshi +rimolive - Ricardo Martinelli De Oliveira +roland1982 - Roland Pogonyi rongma1997 - Rong Ma +rowan000 - Rowan Chattaway +roxchkplusony - Victor Tso rrusso2007 - Rob Russo +sadhen - Darcy Shen samsetegne - Samuel L. Setegne +sandeep-katta - Sandeep Katta sangramga - Sangram Gaikwad sarthfrey - Sarth Frey +sarutak - Kousuke Saruta +scwf - Wang Fei +seancxmao - Chenxiao Mao +seayi - Xiaohua Yi seayoun - Haiyang Yu +sel - Steve Larkin +sethah - Seth Hendrickson sev7e0 - Jiaqi Li -shahidki31 - Shahid +shahidki31 - Shahid K I sharangk - Sharanabasappa G Keriwaddi +sharkdtu - Xiaogang Tu sheepstop - Ting Yang +shenh062326 - Shen Hong +shimamoto - Takako Shimamoto +shimingfei - Shiming Fei shivsood - Shiv Prashant Sood +shivusondur - Shivakumar Sondur +sigmoidanalytics - Mayur Rustagi +sisihj - June He sitegui - Guilherme Souza +skonto - Stavros Kontopoulos slamke - Sun Ke +small-wang - Wang Wei southernriver - Liang Chen squito - Imran Rashid +stanzhai - Stan Zhai stczwd - Jackey Lee sujith71955 - Sujith Chacko +surq - Surong Quan suxingfate - Xinglong Wang -teeyog - teeyog +suyanNone - Su Yan +szheng79 - Shuai Zheng +tedyu - Ted Yu +teeyog - Yong Tian +texasmichelle - Michelle Casbon +tianyi - Yi Tian +tien-dungle - Tien-Dung Le +tigerquoll - Dale Richardson tinhto-000 - Tin Hang To tools4origins - tools4origins triplesheep - triplesheep +trystanleftwich - Trystan Leftwich turboFei - Fei Wang 
-ulysses-you - ulysses-you +ueshin - Takuya Ueshin +ulysses-you - Xiduo You +uncleGen - Uncle Gen uzadude - Ohad Raviv -wackxu - wackxu -wangjiaochun - wangjiaochun +uzmijnlm - Weizhe Huang +vinodkc - Vinod KC +viper-kun - Xu Kun +wackxu - Shiwei Xu +wangjiaochun - Jiaochun Wang wangshisan - wangshisan +wangxiaojing - Xiaojing Wang +watermen - Yadong Qi weixiuli - XiuLi Wei wenfang6 - wenfang6 wenxuanguan - wenxuanguan windpiger - Song Jun +witgo - Guoqiang Li woudygao - Woudy Gao +x1- - Yuri Saito xianyinxin - Xianyin Xin +xinyunh - Xinyun Huang +xuanyuanking - Yuanjian Li +xubo245 - Bo Xu +xuchenCN - Xu Chen +xueyumusic - Xue Yu +yanlin-Lynn - Yanlin Wang +yongtang - Yong Tang +ypcat - Pei-Lun Lee +yucai - Yucai Yu yunzoud - Yun Zou +zapletal-martin - Martin Zapletal zero323 - Maciej Szymkiewicz +zhangjiajin - Zhang JiaJin +zhengruifeng - Ruifeng Zheng +zhichao-li - Zhichao Li zjf2012 - Jiafu Zhang +zsxwing - Shixiong Zhu +zuotingbing - Tingbing Zuo +zuxqoj - Shekhar Bansal +zzcclp - Zhichao Zhang