diff --git a/R/pkg/R/mllib_fpm.R b/R/pkg/R/mllib_fpm.R index 0cc7a16c302dc..30bc51b932041 100644 --- a/R/pkg/R/mllib_fpm.R +++ b/R/pkg/R/mllib_fpm.R @@ -122,11 +122,12 @@ setMethod("spark.freqItemsets", signature(object = "FPGrowthModel"), # Get association rules. #' @return A \code{SparkDataFrame} with association rules. -#' The \code{SparkDataFrame} contains four columns: +#' The \code{SparkDataFrame} contains five columns: #' \code{antecedent} (an array of the same type as the input column), #' \code{consequent} (an array of the same type as the input column), #' \code{condfidence} (confidence for the rule) -#' and \code{lift} (lift for the rule) +#' \code{lift} (lift for the rule) +#' and \code{support} (support for the rule) #' @rdname spark.fpGrowth #' @aliases associationRules,FPGrowthModel-method #' @note spark.associationRules(FPGrowthModel) since 2.2.0 diff --git a/R/pkg/tests/fulltests/test_mllib_fpm.R b/R/pkg/tests/fulltests/test_mllib_fpm.R index bc1e17538d41a..78d26d3324473 100644 --- a/R/pkg/tests/fulltests/test_mllib_fpm.R +++ b/R/pkg/tests/fulltests/test_mllib_fpm.R @@ -45,7 +45,8 @@ test_that("spark.fpGrowth", { antecedent = I(list(list("2"), list("3"))), consequent = I(list(list("1"), list("1"))), confidence = c(1, 1), - lift = c(1, 1) + lift = c(1, 1), + support = c(0.75, 0.5) ) expect_equivalent(expected_association_rules, collect(spark.associationRules(model))) diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java index 186597fa64780..7205293aa48c5 100644 --- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java +++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java @@ -341,8 +341,17 @@ public UTF8String substringSQL(int pos, int length) { // to the -ith element before the end of the sequence. If a start index i is 0, it // refers to the first element. int len = numChars(); + // `len + pos` does not overflow as `len >= 0`. int start = (pos > 0) ? pos -1 : ((pos < 0) ? len + pos : 0); - int end = (length == Integer.MAX_VALUE) ? len : start + length; + + int end; + if ((long) start + length > Integer.MAX_VALUE) { + end = Integer.MAX_VALUE; + } else if ((long) start + length < Integer.MIN_VALUE) { + end = Integer.MIN_VALUE; + } else { + end = start + length; + } return substring(start, end); } diff --git a/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java b/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java index 8f933877f82e6..70e276f7e5a8b 100644 --- a/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java +++ b/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java @@ -390,6 +390,10 @@ public void substringSQL() { assertEquals(fromString("example"), e.substringSQL(0, Integer.MAX_VALUE)); assertEquals(fromString("example"), e.substringSQL(1, Integer.MAX_VALUE)); assertEquals(fromString("xample"), e.substringSQL(2, Integer.MAX_VALUE)); + assertEquals(EMPTY_UTF8, e.substringSQL(-100, -100)); + assertEquals(EMPTY_UTF8, e.substringSQL(-1207959552, -1207959552)); + assertEquals(fromString("pl"), e.substringSQL(-3, 2)); + assertEquals(EMPTY_UTF8, e.substringSQL(Integer.MIN_VALUE, 6)); } @Test diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index ec8621bc55cf3..18cd5de4cfada 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -322,36 +322,22 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging // For testing def getMapSizesByExecutorId(shuffleId: Int, reduceId: Int) : Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = { - getMapSizesByExecutorId(shuffleId, reduceId, reduceId + 1) + getMapSizesByExecutorId(shuffleId, 0, Int.MaxValue, reduceId, reduceId + 1) } /** * Called from executors to get the server URIs and output sizes for each shuffle block that * needs to be read from a given range of map output partitions (startPartition is included but - * endPartition is excluded from the range). + * endPartition is excluded from the range) within a range of mappers (startMapIndex is included + * but endMapIndex is excluded). If endMapIndex=Int.MaxValue, the actual endMapIndex will be + * changed to the length of total map outputs. * * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId, * and the second item is a sequence of (shuffle block id, shuffle block size, map index) * tuples describing the shuffle blocks that are stored at that block manager. + * Note that zero-sized blocks are excluded in the result. */ def getMapSizesByExecutorId( - shuffleId: Int, - startPartition: Int, - endPartition: Int) - : Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] - - /** - * Called from executors to get the server URIs and output sizes for each shuffle block that - * needs to be read from a given range of map output partitions (startPartition is included but - * endPartition is excluded from the range) and is produced by - * a range of mappers (startMapIndex, endMapIndex, startMapIndex is included and - * the endMapIndex is excluded). - * - * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId, - * and the second item is a sequence of (shuffle block id, shuffle block size, map index) - * tuples describing the shuffle blocks that are stored at that block manager. - */ - def getMapSizesByRange( shuffleId: Int, startMapIndex: Int, endMapIndex: Int, @@ -734,38 +720,22 @@ private[spark] class MapOutputTrackerMaster( } } - // Get blocks sizes by executor Id. Note that zero-sized blocks are excluded in the result. // This method is only called in local-mode. def getMapSizesByExecutorId( - shuffleId: Int, - startPartition: Int, - endPartition: Int) - : Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = { - logDebug(s"Fetching outputs for shuffle $shuffleId, partitions $startPartition-$endPartition") - shuffleStatuses.get(shuffleId) match { - case Some (shuffleStatus) => - shuffleStatus.withMapStatuses { statuses => - MapOutputTracker.convertMapStatuses( - shuffleId, startPartition, endPartition, statuses, 0, shuffleStatus.mapStatuses.length) - } - case None => - Iterator.empty - } - } - - override def getMapSizesByRange( shuffleId: Int, startMapIndex: Int, endMapIndex: Int, startPartition: Int, endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = { - logDebug(s"Fetching outputs for shuffle $shuffleId, mappers $startMapIndex-$endMapIndex" + - s"partitions $startPartition-$endPartition") + logDebug(s"Fetching outputs for shuffle $shuffleId") shuffleStatuses.get(shuffleId) match { case Some(shuffleStatus) => shuffleStatus.withMapStatuses { statuses => + val actualEndMapIndex = if (endMapIndex == Int.MaxValue) statuses.length else endMapIndex + logDebug(s"Convert map statuses for shuffle $shuffleId, " + + s"mappers $startMapIndex-$actualEndMapIndex, partitions $startPartition-$endPartition") MapOutputTracker.convertMapStatuses( - shuffleId, startPartition, endPartition, statuses, startMapIndex, endMapIndex) + shuffleId, startPartition, endPartition, statuses, startMapIndex, actualEndMapIndex) } case None => Iterator.empty @@ -798,37 +768,20 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr */ private val fetchingLock = new KeyLock[Int] - // Get blocks sizes by executor Id. Note that zero-sized blocks are excluded in the result. override def getMapSizesByExecutorId( - shuffleId: Int, - startPartition: Int, - endPartition: Int) - : Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = { - logDebug(s"Fetching outputs for shuffle $shuffleId, partitions $startPartition-$endPartition") - val statuses = getStatuses(shuffleId, conf) - try { - MapOutputTracker.convertMapStatuses( - shuffleId, startPartition, endPartition, statuses, 0, statuses.length) - } catch { - case e: MetadataFetchFailedException => - // We experienced a fetch failure so our mapStatuses cache is outdated; clear it: - mapStatuses.clear() - throw e - } - } - - override def getMapSizesByRange( shuffleId: Int, startMapIndex: Int, endMapIndex: Int, startPartition: Int, endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = { - logDebug(s"Fetching outputs for shuffle $shuffleId, mappers $startMapIndex-$endMapIndex" + - s"partitions $startPartition-$endPartition") + logDebug(s"Fetching outputs for shuffle $shuffleId") val statuses = getStatuses(shuffleId, conf) try { + val actualEndMapIndex = if (endMapIndex == Int.MaxValue) statuses.length else endMapIndex + logDebug(s"Convert map statuses for shuffle $shuffleId, " + + s"mappers $startMapIndex-$actualEndMapIndex, partitions $startPartition-$endPartition") MapOutputTracker.convertMapStatuses( - shuffleId, startPartition, endPartition, statuses, startMapIndex, endMapIndex) + shuffleId, startPartition, endPartition, statuses, startMapIndex, actualEndMapIndex) } catch { case e: MetadataFetchFailedException => // We experienced a fetch failure so our mapStatuses cache is outdated; clear it: diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala index 057b0d6e0b0a7..400c4526f0114 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala @@ -43,23 +43,31 @@ private[spark] trait ShuffleManager { context: TaskContext, metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] + /** - * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive). + * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive) to + * read from all map outputs of the shuffle. + * * Called on executors by reduce tasks. */ - def getReader[K, C]( + final def getReader[K, C]( handle: ShuffleHandle, startPartition: Int, endPartition: Int, context: TaskContext, - metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] + metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = { + getReader(handle, 0, Int.MaxValue, startPartition, endPartition, context, metrics) + } /** * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive) to - * read from map output (startMapIndex to endMapIndex - 1, inclusive). + * read from a range of map outputs(startMapIndex to endMapIndex-1, inclusive). + * If endMapIndex=Int.MaxValue, the actual endMapIndex will be changed to the length of total map + * outputs of the shuffle in `getMapSizesByExecutorId`. + * * Called on executors by reduce tasks. */ - def getReaderForRange[K, C]( + def getReader[K, C]( handle: ShuffleHandle, startMapIndex: Int, endMapIndex: Int, diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala index aefcb59b8bb87..72460180f5908 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala @@ -23,6 +23,7 @@ import scala.collection.JavaConverters._ import org.apache.spark._ import org.apache.spark.internal.{config, Logging} +import org.apache.spark.scheduler.MapStatus import org.apache.spark.shuffle._ import org.apache.spark.shuffle.api.{ShuffleDataIO, ShuffleExecutorComponents} import org.apache.spark.util.Utils @@ -115,23 +116,14 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager } /** - * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive). + * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive) to + * read from a range of map outputs(startMapIndex to endMapIndex-1, inclusive). + * If endMapIndex=Int.MaxValue, the actual endMapIndex will be changed to the length of total map + * outputs of the shuffle in `getMapSizesByExecutorId`. + * * Called on executors by reduce tasks. */ override def getReader[K, C]( - handle: ShuffleHandle, - startPartition: Int, - endPartition: Int, - context: TaskContext, - metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = { - val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId( - handle.shuffleId, startPartition, endPartition) - new BlockStoreShuffleReader( - handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context, metrics, - shouldBatchFetch = canUseBatchFetch(startPartition, endPartition, context)) - } - - override def getReaderForRange[K, C]( handle: ShuffleHandle, startMapIndex: Int, endMapIndex: Int, @@ -139,7 +131,7 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager endPartition: Int, context: TaskContext, metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = { - val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByRange( + val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId( handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition) new BlockStoreShuffleReader( handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context, metrics, diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 1c788a30022d0..ced3f9d15720d 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -1078,7 +1078,10 @@ private[spark] object JsonProtocol { val blockManagerAddress = blockManagerIdFromJson(json \ "Block Manager Address") val shuffleId = (json \ "Shuffle ID").extract[Int] val mapId = (json \ "Map ID").extract[Long] - val mapIndex = (json \ "Map Index").extract[Int] + val mapIndex = (json \ "Map Index") match { + case JNothing => 0 + case x => x.extract[Int] + } val reduceId = (json \ "Reduce ID").extract[Int] val message = jsonOption(json \ "Message").map(_.extract[String]) new FetchFailed(blockManagerAddress, shuffleId, mapId, mapIndex, reduceId, diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala index d5ee19bde8edf..630ffd9baa06e 100644 --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala @@ -317,7 +317,7 @@ class MapOutputTrackerSuite extends SparkFunSuite { tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 1000), Array(size10000, size0, size1000, size0), 6)) assert(tracker.containsShuffle(10)) - assert(tracker.getMapSizesByExecutorId(10, 0, 4).toSeq === + assert(tracker.getMapSizesByExecutorId(10, 0, 2, 0, 4).toSeq === Seq( (BlockManagerId("a", "hostA", 1000), Seq((ShuffleBlockId(10, 5, 1), size1000, 0), diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 9d412f2dba3ce..51d20d3428915 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -349,9 +349,9 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi it.next.asInstanceOf[Tuple2[_, _]]._1 /** Send the given CompletionEvent messages for the tasks in the TaskSet. */ - private def complete(taskSet: TaskSet, results: Seq[(TaskEndReason, Any)]): Unit = { - assert(taskSet.tasks.size >= results.size) - for ((result, i) <- results.zipWithIndex) { + private def complete(taskSet: TaskSet, taskEndInfos: Seq[(TaskEndReason, Any)]): Unit = { + assert(taskSet.tasks.size >= taskEndInfos.size) + for ((result, i) <- taskEndInfos.zipWithIndex) { if (i < taskSet.tasks.size) { runEvent(makeCompletionEvent(taskSet.tasks(i), result._1, result._2)) } @@ -405,6 +405,15 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi runEvent(JobCancelled(jobId, None)) } + /** Make some tasks in task set success and check results. */ + private def completeAndCheckAnswer( + taskSet: TaskSet, + taskEndInfos: Seq[(TaskEndReason, Any)], + expected: Map[Int, Any]): Unit = { + complete(taskSet, taskEndInfos) + assert(this.results === expected) + } + test("[SPARK-3353] parent stage should have lower stage id") { sc.parallelize(1 to 10).map(x => (x, x)).reduceByKey(_ + _, 4).count() val stageByOrderOfExecution = sparkListener.stageByOrderOfExecution @@ -461,8 +470,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi completeShuffleMapStageSuccessfully(0, 0, 1) completeShuffleMapStageSuccessfully(1, 0, 1) completeShuffleMapStageSuccessfully(2, 0, 1) - complete(taskSets(3), Seq((Success, 42))) - assert(results === Map(0 -> 42)) + completeAndCheckAnswer(taskSets(3), Seq((Success, 42)), Map(0 -> 42)) assertDataStructuresEmpty() } @@ -558,8 +566,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi test("run trivial job") { submit(new MyRDD(sc, 1, Nil), Array(0)) - complete(taskSets(0), List((Success, 42))) - assert(results === Map(0 -> 42)) + completeAndCheckAnswer(taskSets(0), Seq((Success, 42)), Map(0 -> 42)) assertDataStructuresEmpty() } @@ -567,8 +574,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi val baseRdd = new MyRDD(sc, 1, Nil) val finalRdd = new MyRDD(sc, 1, List(new OneToOneDependency(baseRdd))) submit(finalRdd, Array(0)) - complete(taskSets(0), Seq((Success, 42))) - assert(results === Map(0 -> 42)) + completeAndCheckAnswer(taskSets(0), Seq((Success, 42)), Map(0 -> 42)) assertDataStructuresEmpty() } @@ -592,8 +598,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi submit(finalRdd, Array(0)) val taskSet = taskSets(0) assertLocations(taskSet, Seq(Seq("hostA", "hostB"))) - complete(taskSet, Seq((Success, 42))) - assert(results === Map(0 -> 42)) + completeAndCheckAnswer(taskSet, Seq((Success, 42)), Map(0 -> 42)) assertDataStructuresEmpty() } @@ -729,8 +734,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi assert(failure === null) // When the task set completes normally, state should be correctly updated. - complete(taskSets(0), Seq((Success, 42))) - assert(results === Map(0 -> 42)) + completeAndCheckAnswer(taskSets(0), Seq((Success, 42)), Map(0 -> 42)) assertDataStructuresEmpty() assert(sparkListener.failedStages.isEmpty) @@ -746,8 +750,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi completeShuffleMapStageSuccessfully(0, 0, 1) assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet === HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB"))) - complete(taskSets(1), Seq((Success, 42))) - assert(results === Map(0 -> 42)) + completeAndCheckAnswer(taskSets(1), Seq((Success, 42)), Map(0 -> 42)) assertDataStructuresEmpty() } @@ -771,8 +774,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi // we can see both result blocks now assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1.host).toSet === HashSet("hostA", "hostB")) - complete(taskSets(3), Seq((Success, 43))) - assert(results === Map(0 -> 42, 1 -> 43)) + completeAndCheckAnswer(taskSets(3), Seq((Success, 43)), Map(0 -> 42, 1 -> 43)) assertDataStructuresEmpty() } @@ -1454,8 +1456,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi HashSet(makeBlockManagerId("hostB"), makeBlockManagerId("hostA"))) // finish the next stage normally, which completes the job - complete(taskSets(1), Seq((Success, 42), (Success, 43))) - assert(results === Map(0 -> 42, 1 -> 43)) + completeAndCheckAnswer(taskSets(1), Seq((Success, 42), (Success, 43)), Map(0 -> 42, 1 -> 43)) assertDataStructuresEmpty() } @@ -1796,9 +1797,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi // lets say there is a fetch failure in this task set, which makes us go back and // run stage 0, attempt 1 - complete(taskSets(1), Seq( - (FetchFailed(makeBlockManagerId("hostA"), - shuffleDep1.shuffleId, 0L, 0, 0, "ignored"), null))) + completeNextStageWithFetchFailure(1, 0, shuffleDep1) scheduler.resubmitFailedStages() // stage 0, attempt 1 should have the properties of job2 @@ -1872,9 +1871,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi // have the second stage complete normally completeShuffleMapStageSuccessfully(1, 0, 1, Seq("hostA", "hostC")) // fail the third stage because hostA went down - complete(taskSets(2), Seq( - (FetchFailed(makeBlockManagerId("hostA"), - shuffleDepTwo.shuffleId, 0L, 0, 0, "ignored"), null))) + completeNextStageWithFetchFailure(2, 0, shuffleDepTwo) // TODO assert this: // blockManagerMaster.removeExecutor("hostA-exec") // have DAGScheduler try again @@ -1900,9 +1897,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi // complete stage 1 completeShuffleMapStageSuccessfully(1, 0, 1) // pretend stage 2 failed because hostA went down - complete(taskSets(2), Seq( - (FetchFailed(makeBlockManagerId("hostA"), - shuffleDepTwo.shuffleId, 0L, 0, 0, "ignored"), null))) + completeNextStageWithFetchFailure(2, 0, shuffleDepTwo) // TODO assert this: // blockManagerMaster.removeExecutor("hostA-exec") // DAGScheduler should notice the cached copy of the second shuffle and try to get it rerun. diff --git a/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala index a82f86a11c77e..d964b28df2983 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala @@ -104,7 +104,7 @@ class BlockStoreShuffleReaderSuite extends SparkFunSuite with LocalSparkContext // shuffle data to read. val mapOutputTracker = mock(classOf[MapOutputTracker]) when(mapOutputTracker.getMapSizesByExecutorId( - shuffleId, reduceId, reduceId + 1)).thenReturn { + shuffleId, 0, numMaps, reduceId, reduceId + 1)).thenReturn { // Test a scenario where all data is local, to avoid creating a bunch of additional mocks // for the code to read data over the network. val shuffleBlockIdsAndSizes = (0 until numMaps).map { mapId => @@ -132,7 +132,7 @@ class BlockStoreShuffleReaderSuite extends SparkFunSuite with LocalSparkContext val taskContext = TaskContext.empty() val metrics = taskContext.taskMetrics.createTempShuffleReadMetrics() val blocksByAddress = mapOutputTracker.getMapSizesByExecutorId( - shuffleId, reduceId, reduceId + 1) + shuffleId, 0, numMaps, reduceId, reduceId + 1) val shuffleReader = new BlockStoreShuffleReader( shuffleHandle, blocksByAddress, diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 5a4073baa19d4..955589fc5b47b 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -327,6 +327,17 @@ class JsonProtocolSuite extends SparkFunSuite { assert(expectedFetchFailed === JsonProtocol.taskEndReasonFromJson(oldEvent)) } + test("SPARK-32124: FetchFailed Map Index backwards compatibility") { + // FetchFailed in Spark 2.4.0 does not have "Map Index" property. + val fetchFailed = FetchFailed(BlockManagerId("With or", "without you", 15), 17, 16L, 18, 19, + "ignored") + val oldEvent = JsonProtocol.taskEndReasonToJson(fetchFailed) + .removeField({ _._1 == "Map Index" }) + val expectedFetchFailed = FetchFailed(BlockManagerId("With or", "without you", 15), 17, 16L, + 0, 19, "ignored") + assert(expectedFetchFailed === JsonProtocol.taskEndReasonFromJson(oldEvent)) + } + test("ShuffleReadMetrics: Local bytes read backwards compatibility") { // Metrics about local shuffle bytes read were added in 1.3.1. val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, diff --git a/dev/create-release/known_translations b/dev/create-release/known_translations index 376398bc3788a..ff41cccde0140 100644 --- a/dev/create-release/known_translations +++ b/dev/create-release/known_translations @@ -1,411 +1,410 @@ # This is a mapping of names to be translated through translate-contributors.py -# The format expected on each line should be: - -CodingCat - Nan Zhu -CrazyJvm - Chao Chen -EugenCepoi - Eugen Cepoi -GraceH - Jie Huang -JerryLead - Lijie Xu -Leolh - Liu Hao -Lewuathe - Kai Sasaki -RongGu - Rong Gu -Shiti - Shiti Saxena -Victsm - Min Shen -WangTaoTheTonic - Wang Tao -XuTingjun - Tingjun Xu -YanTangZhai - Yantang Zhai -alexdebrie - Alex DeBrie -alokito - Alok Saldanha -anantasty - Anant Asthana -andrewor14 - Andrew Or -aniketbhatnagar - Aniket Bhatnagar -arahuja - Arun Ahuja -brkyvz - Burak Yavuz -chesterxgchen - Chester Chen -chiragaggarwal - Chirag Aggarwal -chouqin - Qiping Li -cocoatomo - Tomohiko K. -coderfi - Fairiz Azizi -coderxiang - Shuo Xiang -davies - Davies Liu -epahomov - Egor Pahomov -falaki - Hossein Falaki -freeman-lab - Jeremy Freeman -industrial-sloth - Jascha Swisher -jackylk - Jacky Li -jayunit100 - Jay Vyas -jerryshao - Saisai Shao -jkbradley - Joseph Bradley -lianhuiwang - Lianhui Wang -lirui-intel - Rui Li -luluorta - Lu Lu -luogankun - Gankun Luo -maji2014 - Derek Ma -mccheah - Matthew Cheah -mengxr - Xiangrui Meng -nartz - Nathan Artz -odedz - Oded Zimerman -ravipesala - Ravindra Pesala -roxchkplusony - Victor Tso -scwf - Wang Fei -shimingfei - Shiming Fei -surq - Surong Quan -suyanNone - Su Yan -tedyu - Ted Yu -tigerquoll - Dale Richardson -wangxiaojing - Xiaojing Wang -watermen - Yadong Qi -witgo - Guoqiang Li -xinyunh - Xinyun Huang -zsxwing - Shixiong Zhu -Bilna - Bilna P -DoingDone9 - Doing Done -Earne - Ernest -FlytxtRnD - Meethu Mathew -GenTang - Gen TANG -JoshRosen - Josh Rosen -MechCoder - Manoj Kumar -OopsOutOfMemory - Sheng Li -Peishen-Jia - Peishen Jia -SaintBacchus - Huang Zhaowei -azagrebin - Andrey Zagrebin -bzz - Alexander Bezzubov -fjiang6 - Fan Jiang -gasparms - Gaspar Munoz -guowei2 - Guo Wei -hhbyyh - Yuhao Yang -hseagle - Peng Xu -javadba - Stephen Boesch -jbencook - Ben Cook -kul - Kuldeep -ligangty - Gang Li -marsishandsome - Liangliang Gu -medale - Markus Dale -nemccarthy - Nathan McCarthy -nxwhite-str - Nate Crosswhite -seayi - Xiaohua Yi -tianyi - Yi Tian -uncleGen - Uncle Gen -viper-kun - Xu Kun -x1- - Yuri Saito -zapletal-martin - Martin Zapletal -zuxqoj - Shekhar Bansal -mingyukim - Mingyu Kim -sigmoidanalytics - Mayur Rustagi -AiHe - Ai He -BenFradet - Ben Fradet -FavioVazquez - Favio Vazquez -JaysonSunshine - Jayson Sunshine -Liuchang0812 - Liu Chang -Sephiroth-Lin - Sephiroth Lin -dobashim - Masaru Dobashi -ehnalis - Zoltan Zvara -emres - Emre Sevinc -gchen - Guancheng Chen -haiyangsea - Haiyang Sea -hlin09 - Hao Lin -hqzizania - Qian Huang -jeanlyn - Jean Lyn -jerluc - Jeremy A. Lucas -jrabary - Jaonary Rabarisoa -judynash - Judy Nash -kaka1992 - Chen Song -ksonj - Kalle Jepsen -kuromatsu-nobuyuki - Nobuyuki Kuromatsu -lazyman500 - Dong Xu -leahmcguire - Leah McGuire -mbittmann - Mark Bittmann -mbonaci - Marko Bonaci -meawoppl - Matthew Goodman -nyaapa - Arsenii Krasikov -phatak-dev - Madhukara Phatak -prabeesh - Prabeesh K -rakeshchalasani - Rakesh Chalasani -rekhajoshm - Rekha Joshi -sisihj - June He -szheng79 - Shuai Zheng -texasmichelle - Michelle Casbon -vinodkc - Vinod KC -yongtang - Yong Tang -ypcat - Pei-Lun Lee -zhichao-li - Zhichao Li -zzcclp - Zhichao Zhang -979969786 - Yuming Wang -Rosstin - Rosstin Murphy -ameyc - Amey Chaugule -animeshbaranawal - Animesh Baranawal -cafreeman - Chris Freeman -lee19 - Lee -lockwobr - Brian Lockwood -navis - Navis Ryu -pparkkin - Paavo Parkkinen -HyukjinKwon - Hyukjin Kwon -JDrit - Joseph Batchik -JuhongPark - Juhong Park -KaiXinXiaoLei - KaiXinXIaoLei -NamelessAnalyst - NamelessAnalyst -alyaxey - Alex Slusarenko -baishuo - Shuo Bai -fe2s - Oleksiy Dyagilev -felixcheung - Felix Cheung -feynmanliang - Feynman Liang -josepablocam - Jose Cambronero -kai-zeng - Kai Zeng -mosessky - mosessky -msannell - Michael Sannella -nishkamravi2 - Nishkam Ravi -noel-smith - Noel Smith -petz2000 - Patrick Baier -qiansl127 - Shilei Qian -rahulpalamuttam - Rahul Palamuttam -rowan000 - Rowan Chattaway -sarutak - Kousuke Saruta -sethah - Seth Hendrickson -small-wang - Wang Wei -stanzhai - Stan Zhai -tien-dungle - Tien-Dung Le -xuchenCN - Xu Chen -zhangjiajin - Zhang JiaJin -ClassNotFoundExp - Fu Xing -KevinGrealish - Kevin Grealish -MasterDDT - Mitesh Patel -VinceShieh - Vincent Xie -WeichenXu123 - Weichen Xu -Yunni - Yun Ni -actuaryzhang - Wayne Zhang -alicegugu - Gu Huiqin Alice -anabranch - Bill Chambers -ashangit - Nicolas Fraison -avulanov - Alexander Ulanov -biglobster - Liang Ke -cenyuhai - Yuhai Cen -codlife - Jianfei Wang -david-weiluo-ren - Weiluo (David) Ren -dding3 - Ding Ding -fidato13 - Tarun Kumar -frreiss - Fred Reiss -gatorsmile - Xiao Li -hayashidac - Chie Hayashida -invkrh - Hao Ren -jagadeesanas2 - Jagadeesan A S -jiangxb1987 - Jiang Xingbo -jisookim0513 - Jisoo Kim -junyangq - Junyang Qian -krishnakalyan3 - Krishna Kalyan -linbojin - Linbo Jin -mpjlu - Peng Meng -neggert - Nic Eggert -petermaxlee - Peter Lee -phalodi - Sandeep Purohit -pkch - pkch -priyankagargnitk - Priyanka Garg -sharkdtu - Xiaogang Tu -shenh062326 - Shen Hong -aokolnychyi - Anton Okolnychyi -linbojin - Linbo Jin -lw-lin - Liwei Lin +# The format expected on each line should be: - +012huang - Weiyi Huang +07ARB - Ankit Raj Boudh 10110346 - Xian Liu +979969786 - Yuming Wang Achuth17 - Achuth Narayan Rajagopal Adamyuanyuan - Adam Wang -DylanGuedes - Dylan Guedes -JiahuiJiang - Jiahui Jiang -KevinZwx - Kevin Zhang -LantaoJin - Lantao Jin -Lemonjing - Rann Tao -LucaCanali - Luca Canali -XD-DENG - Xiaodong Deng -aai95 - Aleksei Izmalkin -akonopko - Alexander Konopko -ankuriitg - Ankur Gupta -arucard21 - Riaas Mokiem -attilapiros - Attila Zsolt Piros -bravo-zhang - Bravo Zhang -caneGuy - Kang Zhou -chaoslawful - Xiaozhe Wang -cluo512 - Chuan Luo -codeatri - Neha Patil -crafty-coder - Carlos Pena -debugger87 - Chaozhong Yang -e-dorigatti - Emilio Dorigatti -eric-maynard - Eric Maynard -felixalbani - Felix Albani -fjh100456 - Jinhua Fu -guoxiaolongzte - Xiaolong Guo -heary-cao - Xuewen Cao -huangweizhe123 - Weizhe Huang -ivoson - Tengfei Huang -jinxing64 - Jin Xing -liu-zhaokun - Zhaokun Liu -liutang123 - Lijia Liu -maropu - Takeshi Yamamuro -maryannxue - Maryann Xue -mcteo - Thomas Dunne -mn-mikke - Marek Novotny -myroslavlisniak - Myroslav Lisniak -npoggi - Nicolas Poggi -pgandhi999 - Parth Gandhi -rimolive - Ricardo Martinelli De Oliveira -sadhen - Darcy Shen -sandeep-katta - Sandeep Katta -seancxmao - Chenxiao Mao -sel - Steve Larkin -shimamoto - Takako Shimamoto -shivusondur - Shivakumar Sondur -skonto - Stavros Kontopoulos -trystanleftwich - Trystan Leftwich -ueshin - Takuya Ueshin -uzmijnlm - Weizhe Huang -xuanyuanking - Yuanjian Li -xubo245 - Bo Xu -xueyumusic - Xue Yu -yanlin-Lynn - Yanlin Wang -yucai - Yucai Yu -zhengruifeng - Ruifeng Zheng -zuotingbing - Tingbing Zuo -012huang - Weiyi Huang -07ARB - Ankit Raj Boudh +AiHe - Ai He Andrew-Crosby - Andrew Crosby AngersZhuuuu - Yi Zhu +BenFradet - Ben Fradet +Bilna - Bilna P +ClassNotFoundExp - Fu Xing +CodingCat - Nan Zhu +CrazyJvm - Chao Chen Deegue - Yizhong Zhang +DoingDone9 - Doing Done +DylanGuedes - Dylan Guedes +Earne - Ernest +EugenCepoi - Eugen Cepoi +FavioVazquez - Favio Vazquez +FlytxtRnD - Meethu Mathew +GenTang - Gen TANG +GraceH - Jie Huang Gschiavon - German Schiavon Matteo GuoPhilipse - Philipse Guo Hellsen83 - Erik Christiansen +HyukjinKwon - Hyukjin Kwon Icysandwich - Icysandwich +JDrit - Joseph Batchik JasonWayne - Wenjie Wu +JaysonSunshine - Jayson Sunshine +JerryLead - Lijie Xu +JiahuiJiang - Jiahui Jiang JkSelf - Ke Jia JoanFM - Joan Fontanals +JoshRosen - Josh Rosen +JuhongPark - Juhong Park JulienPeloton - Julien Peloton +KaiXinXiaoLei - KaiXinXIaoLei +KevinGrealish - Kevin Grealish +KevinZwx - Kevin Zhang Koraseg - Artem Kupchinskiy KyleLi1985 - Liang Li +LantaoJin - Lantao Jin +Lemonjing - Rann Tao +Leolh - Liu Hao +Lewuathe - Kai Sasaki LiShuMing - Shuming Li -LinhongLiu - Liu, Linhong +LinhongLiu - Linhong Liu +Liuchang0812 - Liu Chang +LucaCanali - Luca Canali LuciferYang - Yang Jie +MasterDDT - Mitesh Patel MaxGekk - Maxim Gekk +MechCoder - Manoj Kumar +NamelessAnalyst - NamelessAnalyst Ngone51 - Yi Wu +OopsOutOfMemory - Sheng Li PavithraRamachandran - Pavithra Ramachandran +Peishen-Jia - Peishen Jia +RongGu - Rong Gu +Rosstin - Rosstin Murphy +SaintBacchus - Huang Zhaowei +Sephiroth-Lin - Sephiroth Lin +Shiti - Shiti Saxena SongYadong - Yadong Song TigerYang414 - David Yang TomokoKomiyama - Tomoko Komiyama TopGunViper - TopGunViper Udbhav30 - Udbhav Agrawal +Victsm - Min Shen +VinceShieh - Vincent Xie WangGuangxin - Guangxin Wang +WangTaoTheTonic - Wang Tao +WeichenXu123 - Weichen Xu William1104 - William Wong +XD-DENG - Xiaodong Deng +XuTingjun - Tingjun Xu +YanTangZhai - Yantang Zhai YongjinZhou - Yongjin Zhou +Yunni - Yun Ni +aai95 - Aleksei Izmalkin aaruna - Aaruna Godthi +actuaryzhang - Wayne Zhang adrian555 - Weiqiang Zhuang ajithme - Ajith S +akonopko - Alexander Konopko +alexdebrie - Alex DeBrie +alicegugu - Gu Huiqin Alice +alokito - Alok Saldanha +alyaxey - Alex Slusarenko amanomer - Aman Omer +ameyc - Amey Chaugule +anabranch - Bill Chambers +anantasty - Anant Asthana ancasarb - Anca Sarb +andrewor14 - Andrew Or +aniketbhatnagar - Aniket Bhatnagar +animeshbaranawal - Animesh Baranawal +ankuriitg - Ankur Gupta +aokolnychyi - Anton Okolnychyi +arahuja - Arun Ahuja +arucard21 - Riaas Mokiem +ashangit - Nicolas Fraison +attilapiros - Attila Zsolt Piros avkgh - Aleksandr Kashkirov +avulanov - Alexander Ulanov ayudovin - Artsiom Yudovin +azagrebin - Andrey Zagrebin +baishuo - Shuo Bai bartosz25 - Bartosz Konieczny beliefer - Jiaan Geng bettermouse - Chen Hao +biglobster - Liang Ke +bravo-zhang - Bravo Zhang +brkyvz - Burak Yavuz bscan - Brian Scannell +bzz - Alexander Bezzubov +cafreeman - Chris Freeman +caneGuy - Kang Zhou cchung100m - Neo Chien cclauss - Christian Clauss +cenyuhai - Yuhai Cen chakravarthiT - Chakravarthi chandulal - Chandu Kavar +chaoslawful - Xiaozhe Wang +chesterxgchen - Chester Chen +chiragaggarwal - Chirag Aggarwal chitralverma - Chitral Verma -cjn082030 - Jenny +chouqin - Qiping Li +cjn082030 - Juanni Chen cloud-fan - Wenchen Fan +cluo512 - Chuan Luo +cocoatomo - Tomohiko K. +codeatri - Neha Patil codeborui - codeborui +coderfi - Fairiz Azizi +coderxiang - Shuo Xiang +codlife - Jianfei Wang colinmjj - Colin Ma -cxzl25 - cxzl25 +crafty-coder - Carlos Pena +cxzl25 - Shaoyun Chen cyq89051127 - Yongqiang Chai darrentirto - Darren Tirto +david-weiluo-ren - Weiluo (David) Ren daviddingly - Xiaoyuan Ding davidvrba - David Vrba +davies - Davies Liu +dding3 - Ding Ding +debugger87 - Chaozhong Yang deepyaman - Deepyaman Datta denglingang - Lingang Deng dengziming - dengziming deshanxiao - deshanxiao dima-asana - Dima Kamalov dlindelof - David Lindelof +dobashim - Masaru Dobashi dongjoon-hyun - Dongjoon Hyun -eatoncys - eatoncys +e-dorigatti - Emilio Dorigatti +eatoncys - Yanshan Chen +ehnalis - Zoltan Zvara +emres - Emre Sevinc +epahomov - Egor Pahomov +eric-maynard - Eric Maynard +falaki - Hossein Falaki fan31415 - Yijie Fan +fe2s - Oleksiy Dyagilev +felixalbani - Felix Albani +felixcheung - Felix Cheung +feynmanliang - Feynman Liang +fidato13 - Tarun Kumar fitermay - Yuli Fiterman +fjh100456 - Jinhua Fu +fjiang6 - Fan Jiang francis0407 - Mingcong Han +freeman-lab - Jeremy Freeman +frreiss - Fred Reiss fuwhu - Fuwang Hu +gasparms - Gaspar Munoz +gatorsmile - Xiao Li +gchen - Guancheng Chen gss2002 - Greg Senia +guowei2 - Guo Wei +guoxiaolongzte - Xiaolong Guo +haiyangsea - Haiyang Sea +hayashidac - Chie Hayashida hddong - Dongdong Hong +heary-cao - Xuewen Cao hehuiyuan - hehuiyuan helenyugithub - Helen Yu +hhbyyh - Yuhao Yang highmoutain - highmoutain +hlin09 - Hao Lin +hqzizania - Qian Huang +hseagle - Peng Xu httfighter - Tiantian Han huangtianhua - huangtianhua +huangweizhe123 - Weizhe Huang hvanhovell - Herman Van Hovell iRakson - Rakesh Raushan igorcalabria - Igor Calabria imback82 - Terry Kim +industrial-sloth - Jascha Swisher +invkrh - Hao Ren +ivoson - Tengfei Huang +jackylk - Jacky Li +jagadeesanas2 - Jagadeesan A S +javadba - Stephen Boesch javierivanov - Javier Fuentes +jayunit100 - Jay Vyas +jbencook - Ben Cook +jeanlyn - Jean Lyn +jerluc - Jeremy A. Lucas +jerryshao - Saisai Shao +jiangxb1987 - Jiang Xingbo +jinxing64 - Jin Xing +jisookim0513 - Jisoo Kim +jkbradley - Joseph Bradley joelgenter - Joel Genter +josepablocam - Jose Cambronero +jrabary - Jaonary Rabarisoa +judynash - Judy Nash +junyangq - Junyang Qian +kai-zeng - Kai Zeng +kaka1992 - Chen Song ketank-new - Ketan Kunde +krishnakalyan3 - Krishna Kalyan +ksonj - Kalle Jepsen +kul - Kuldeep +kuromatsu-nobuyuki - Nobuyuki Kuromatsu laskfla - Keith Sun +lazyman500 - Dong Xu lcqzte10192193 - Chaoqun Li +leahmcguire - Leah McGuire +lee19 - Lee leoluan2009 - Xuedong Luan liangxs - Xuesen Liang +lianhuiwang - Lianhui Wang lidinghao - Li Hao +ligangty - Gang Li +linbojin - Linbo Jin linehrr - Ryne Yang linzebing - Zebing Lin lipzhu - Lipeng Zhu +lirui-intel - Rui Li +liu-zhaokun - Zhaokun Liu liucht-inspur - liucht-inspur liupc - Pengcheng Liu +liutang123 - Lijia Liu liwensun - Liwen Sun +lockwobr - Brian Lockwood +luluorta - Lu Lu +luogankun - Gankun Luo +lw-lin - Liwei Lin +maji2014 - Derek Ma manuzhang - Manu Zhang mareksimunek - Marek Simunek +maropu - Takeshi Yamamuro +marsishandsome - Liangliang Gu +maryannxue - Maryann Xue masa3141 - Masahiro Kazama +mbittmann - Mark Bittmann +mbonaci - Marko Bonaci +mccheah - Matthew Cheah +mcteo - Thomas Dunne mdianjun - Dianjun Ma +meawoppl - Matthew Goodman +medale - Markus Dale +mengxr - Xiangrui Meng merrily01 - Ruilei Ma +mingyukim - Mingyu Kim +mn-mikke - Marek Novotny mob-ai - mob-ai +mosessky - mosessky +mpjlu - Peng Meng +msannell - Michael Sannella mu5358271 - Shuheng Dai mwlon - Martin Loncaric +myroslavlisniak - Myroslav Lisniak nandorKollar - Nandor Kollar +nartz - Nathan Artz +navis - Navis Ryu +neggert - Nic Eggert +nemccarthy - Nathan McCarthy +nishkamravi2 - Nishkam Ravi +noel-smith - Noel Smith nooberfsh - nooberfsh +npoggi - Nicolas Poggi +nxwhite-str - Nate Crosswhite +nyaapa - Arsenii Krasikov +odedz - Oded Zimerman oleg-smith - Oleg Kuznetsov ozancicek - Ozan Cicekci pengbo - Peng Bo +petermaxlee - Peter Lee +petz2000 - Patrick Baier +pgandhi999 - Parth Gandhi +phalodi - Sandeep Purohit +phatak-dev - Madhukara Phatak +pkch - pkch planga82 - Pablo Langa Blanco +pparkkin - Paavo Parkkinen +prabeesh - Prabeesh K praneetsharma - Praneet Sharma +priyankagargnitk - Priyanka Garg ptkool - Michael Styles qb-tarushg - Tarush Grover +qiansl127 - Shilei Qian +rahulpalamuttam - Rahul Palamuttam +rakeshchalasani - Rakesh Chalasani +ravipesala - Ravindra Pesala redsanket - Sanket Reddy redsk - Nicola Bova -roland1982 - roland1982 +rekhajoshm - Rekha Joshi +rimolive - Ricardo Martinelli De Oliveira +roland1982 - Roland Pogonyi rongma1997 - Rong Ma +rowan000 - Rowan Chattaway +roxchkplusony - Victor Tso rrusso2007 - Rob Russo +sadhen - Darcy Shen samsetegne - Samuel L. Setegne +sandeep-katta - Sandeep Katta sangramga - Sangram Gaikwad sarthfrey - Sarth Frey +sarutak - Kousuke Saruta +scwf - Wang Fei +seancxmao - Chenxiao Mao +seayi - Xiaohua Yi seayoun - Haiyang Yu +sel - Steve Larkin +sethah - Seth Hendrickson sev7e0 - Jiaqi Li -shahidki31 - Shahid +shahidki31 - Shahid K I sharangk - Sharanabasappa G Keriwaddi +sharkdtu - Xiaogang Tu sheepstop - Ting Yang +shenh062326 - Shen Hong +shimamoto - Takako Shimamoto +shimingfei - Shiming Fei shivsood - Shiv Prashant Sood +shivusondur - Shivakumar Sondur +sigmoidanalytics - Mayur Rustagi +sisihj - June He sitegui - Guilherme Souza +skonto - Stavros Kontopoulos slamke - Sun Ke +small-wang - Wang Wei southernriver - Liang Chen squito - Imran Rashid +stanzhai - Stan Zhai stczwd - Jackey Lee sujith71955 - Sujith Chacko +surq - Surong Quan suxingfate - Xinglong Wang -teeyog - teeyog +suyanNone - Su Yan +szheng79 - Shuai Zheng +tedyu - Ted Yu +teeyog - Yong Tian +texasmichelle - Michelle Casbon +tianyi - Yi Tian +tien-dungle - Tien-Dung Le +tigerquoll - Dale Richardson tinhto-000 - Tin Hang To tools4origins - tools4origins triplesheep - triplesheep +trystanleftwich - Trystan Leftwich turboFei - Fei Wang -ulysses-you - ulysses-you +ueshin - Takuya Ueshin +ulysses-you - Xiduo You +uncleGen - Uncle Gen uzadude - Ohad Raviv -wackxu - wackxu -wangjiaochun - wangjiaochun +uzmijnlm - Weizhe Huang +vinodkc - Vinod KC +viper-kun - Xu Kun +wackxu - Shiwei Xu +wangjiaochun - Jiaochun Wang wangshisan - wangshisan +wangxiaojing - Xiaojing Wang +watermen - Yadong Qi weixiuli - XiuLi Wei wenfang6 - wenfang6 wenxuanguan - wenxuanguan windpiger - Song Jun +witgo - Guoqiang Li woudygao - Woudy Gao +x1- - Yuri Saito xianyinxin - Xianyin Xin +xinyunh - Xinyun Huang +xuanyuanking - Yuanjian Li +xubo245 - Bo Xu +xuchenCN - Xu Chen +xueyumusic - Xue Yu +yanlin-Lynn - Yanlin Wang +yongtang - Yong Tang +ypcat - Pei-Lun Lee +yucai - Yucai Yu yunzoud - Yun Zou +zapletal-martin - Martin Zapletal zero323 - Maciej Szymkiewicz +zhangjiajin - Zhang JiaJin +zhengruifeng - Ruifeng Zheng +zhichao-li - Zhichao Li zjf2012 - Jiafu Zhang +zsxwing - Shixiong Zhu +zuotingbing - Tingbing Zuo +zuxqoj - Shekhar Bansal +zzcclp - Zhichao Zhang diff --git a/dev/create-release/release-build.sh b/dev/create-release/release-build.sh index eb972589a995e..31633456a6590 100755 --- a/dev/create-release/release-build.sh +++ b/dev/create-release/release-build.sh @@ -276,14 +276,14 @@ if [[ "$1" == "package" ]]; then # list of packages to be built, so it's ok for things to be missing in BINARY_PKGS_EXTRA. declare -A BINARY_PKGS_ARGS - BINARY_PKGS_ARGS["hadoop2.7"]="-Phadoop-2.7 $HIVE_PROFILES" + BINARY_PKGS_ARGS["hadoop3.2"]="-Phadoop-3.2 $HIVE_PROFILES" if ! is_dry_run; then BINARY_PKGS_ARGS["without-hadoop"]="-Phadoop-provided" if [[ $SPARK_VERSION < "3.0." ]]; then BINARY_PKGS_ARGS["hadoop2.6"]="-Phadoop-2.6 $HIVE_PROFILES" else BINARY_PKGS_ARGS["hadoop2.7-hive1.2"]="-Phadoop-2.7 -Phive-1.2 $HIVE_PROFILES" - BINARY_PKGS_ARGS["hadoop3.2"]="-Phadoop-3.2 $HIVE_PROFILES" + BINARY_PKGS_ARGS["hadoop2.7"]="-Phadoop-2.7 $HIVE_PROFILES" fi fi diff --git a/dev/run-tests.py b/dev/run-tests.py index 5255a77ec2081..ec04c37857d96 100755 --- a/dev/run-tests.py +++ b/dev/run-tests.py @@ -574,7 +574,7 @@ def main(): # if we're on the Amplab Jenkins build servers setup variables # to reflect the environment settings build_tool = os.environ.get("AMPLAB_JENKINS_BUILD_TOOL", "sbt") - hadoop_version = os.environ.get("AMPLAB_JENKINS_BUILD_PROFILE", "hadoop2.7") + hadoop_version = os.environ.get("AMPLAB_JENKINS_BUILD_PROFILE", "hadoop3.2") hive_version = os.environ.get("AMPLAB_JENKINS_BUILD_HIVE_PROFILE", "hive2.3") test_env = "amplab_jenkins" # add path for Python3 in Jenkins if we're calling from a Jenkins machine diff --git a/docs/cloud-integration.md b/docs/cloud-integration.md index b2a3e77f1ee9d..01f437f38ef17 100644 --- a/docs/cloud-integration.md +++ b/docs/cloud-integration.md @@ -257,5 +257,5 @@ Here is the documentation on the standard connectors both from Apache and the cl * [Amazon EMR File System (EMRFS)](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-fs.html). From Amazon * [Google Cloud Storage Connector for Spark and Hadoop](https://cloud.google.com/hadoop/google-cloud-storage-connector). From Google * [The Azure Blob Filesystem driver (ABFS)](https://docs.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-abfs-driver) -* IBM Cloud Object Storage connector for Apache Spark: [Stocator](https://github.com/CODAIT/stocator), [IBM Object Storage](https://www.ibm.com/cloud/object-storage), [how-to-use-connector](https://developer.ibm.com/code/2018/08/16/installing-running-stocator-apache-spark-ibm-cloud-object-storage). From IBM +* IBM Cloud Object Storage connector for Apache Spark: [Stocator](https://github.com/CODAIT/stocator), [IBM Object Storage](https://www.ibm.com/cloud/object-storage). From IBM diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala index 217398c51b393..1659bbb1d34b3 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala @@ -36,7 +36,7 @@ import org.apache.spark.ml.stat._ import org.apache.spark.ml.util._ import org.apache.spark.ml.util.Instrumentation.instrumented import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{Dataset, Row} +import org.apache.spark.sql._ import org.apache.spark.storage.StorageLevel /** Params for linear SVM Classifier. */ @@ -267,7 +267,26 @@ class LinearSVC @Since("2.2.0") ( if (featuresStd(i) != 0.0) rawCoefficients(i) / featuresStd(i) else 0.0 } val intercept = if ($(fitIntercept)) rawCoefficients.last else 0.0 - copyValues(new LinearSVCModel(uid, Vectors.dense(coefficientArray), intercept)) + createModel(dataset, Vectors.dense(coefficientArray), intercept, objectiveHistory) + } + + private def createModel( + dataset: Dataset[_], + coefficients: Vector, + intercept: Double, + objectiveHistory: Array[Double]): LinearSVCModel = { + val model = copyValues(new LinearSVCModel(uid, coefficients, intercept)) + val weightColName = if (!isDefined(weightCol)) "weightCol" else $(weightCol) + + val (summaryModel, rawPredictionColName, predictionColName) = model.findSummaryModel() + val summary = new LinearSVCTrainingSummaryImpl( + summaryModel.transform(dataset), + rawPredictionColName, + predictionColName, + $(labelCol), + weightColName, + objectiveHistory) + model.setSummary(Some(summary)) } private def trainOnRows( @@ -352,7 +371,7 @@ class LinearSVCModel private[classification] ( @Since("2.2.0") val coefficients: Vector, @Since("2.2.0") val intercept: Double) extends ClassificationModel[Vector, LinearSVCModel] - with LinearSVCParams with MLWritable { + with LinearSVCParams with MLWritable with HasTrainingSummary[LinearSVCTrainingSummary] { @Since("2.2.0") override val numClasses: Int = 2 @@ -368,6 +387,48 @@ class LinearSVCModel private[classification] ( BLAS.dot(features, coefficients) + intercept } + /** + * Gets summary of model on training set. An exception is thrown + * if `hasSummary` is false. + */ + @Since("3.1.0") + override def summary: LinearSVCTrainingSummary = super.summary + + /** + * If the rawPrediction and prediction columns are set, this method returns the current model, + * otherwise it generates new columns for them and sets them as columns on a new copy of + * the current model + */ + private[classification] def findSummaryModel(): (LinearSVCModel, String, String) = { + val model = if ($(rawPredictionCol).isEmpty && $(predictionCol).isEmpty) { + copy(ParamMap.empty) + .setRawPredictionCol("rawPrediction_" + java.util.UUID.randomUUID.toString) + .setPredictionCol("prediction_" + java.util.UUID.randomUUID.toString) + } else if ($(rawPredictionCol).isEmpty) { + copy(ParamMap.empty).setRawPredictionCol("rawPrediction_" + + java.util.UUID.randomUUID.toString) + } else if ($(predictionCol).isEmpty) { + copy(ParamMap.empty).setPredictionCol("prediction_" + java.util.UUID.randomUUID.toString) + } else { + this + } + (model, model.getRawPredictionCol, model.getPredictionCol) + } + + /** + * Evaluates the model on a test dataset. + * + * @param dataset Test dataset to evaluate model on. + */ + @Since("3.1.0") + def evaluate(dataset: Dataset[_]): LinearSVCSummary = { + val weightColName = if (!isDefined(weightCol)) "weightCol" else $(weightCol) + // Handle possible missing or invalid rawPrediction or prediction columns + val (summaryModel, rawPrediction, predictionColName) = findSummaryModel() + new LinearSVCSummaryImpl(summaryModel.transform(dataset), + rawPrediction, predictionColName, $(labelCol), weightColName) + } + override def predict(features: Vector): Double = { if (margin(features) > $(threshold)) 1.0 else 0.0 } @@ -439,3 +500,53 @@ object LinearSVCModel extends MLReadable[LinearSVCModel] { } } } + +/** + * Abstraction for LinearSVC results for a given model. + */ +sealed trait LinearSVCSummary extends BinaryClassificationSummary + +/** + * Abstraction for LinearSVC training results. + */ +sealed trait LinearSVCTrainingSummary extends LinearSVCSummary with TrainingSummary + +/** + * LinearSVC results for a given model. + * + * @param predictions dataframe output by the model's `transform` method. + * @param scoreCol field in "predictions" which gives the rawPrediction of each instance. + * @param predictionCol field in "predictions" which gives the prediction for a data instance as a + * double. + * @param labelCol field in "predictions" which gives the true label of each instance. + * @param weightCol field in "predictions" which gives the weight of each instance. + */ +private class LinearSVCSummaryImpl( + @transient override val predictions: DataFrame, + override val scoreCol: String, + override val predictionCol: String, + override val labelCol: String, + override val weightCol: String) + extends LinearSVCSummary + +/** + * LinearSVC training results. + * + * @param predictions dataframe output by the model's `transform` method. + * @param scoreCol field in "predictions" which gives the rawPrediction of each instance. + * @param predictionCol field in "predictions" which gives the prediction for a data instance as a + * double. + * @param labelCol field in "predictions" which gives the true label of each instance. + * @param weightCol field in "predictions" which gives the weight of each instance. + * @param objectiveHistory objective function (scaled loss + regularization) at each iteration. + */ +private class LinearSVCTrainingSummaryImpl( + predictions: DataFrame, + scoreCol: String, + predictionCol: String, + labelCol: String, + weightCol: String, + override val objectiveHistory: Array[Double]) + extends LinearSVCSummaryImpl( + predictions, scoreCol, predictionCol, labelCol, weightCol) + with LinearSVCTrainingSummary diff --git a/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala b/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala index e50d4255b1f37..f1a68edaed950 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala @@ -244,9 +244,9 @@ class FPGrowthModel private[ml] ( @transient private var _cachedRules: DataFrame = _ /** - * Get association rules fitted using the minConfidence. Returns a dataframe with four fields, - * "antecedent", "consequent", "confidence" and "lift", where "antecedent" and "consequent" are - * Array[T], whereas "confidence" and "lift" are Double. + * Get association rules fitted using the minConfidence. Returns a dataframe with five fields, + * "antecedent", "consequent", "confidence", "lift" and "support", where "antecedent" and + * "consequent" are Array[T], whereas "confidence", "lift" and "support" are Double. */ @Since("2.2.0") @transient def associationRules: DataFrame = { @@ -254,7 +254,8 @@ class FPGrowthModel private[ml] ( _cachedRules } else { _cachedRules = AssociationRules - .getAssociationRulesFromFP(freqItemsets, "items", "freq", $(minConfidence), itemSupport) + .getAssociationRulesFromFP(freqItemsets, "items", "freq", $(minConfidence), itemSupport, + numTrainingRecords) _cachedMinConf = $(minConfidence) _cachedRules } @@ -385,6 +386,7 @@ private[fpm] object AssociationRules { * @param freqCol column name for appearance count of the frequent itemsets * @param minConfidence minimum confidence for generating the association rules * @param itemSupport map containing an item and its support + * @param numTrainingRecords count of training Dataset * @return a DataFrame("antecedent"[Array], "consequent"[Array], "confidence"[Double], * "lift" [Double]) containing the association rules. */ @@ -393,21 +395,23 @@ private[fpm] object AssociationRules { itemsCol: String, freqCol: String, minConfidence: Double, - itemSupport: scala.collection.Map[T, Double]): DataFrame = { - + itemSupport: scala.collection.Map[T, Double], + numTrainingRecords: Long): DataFrame = { val freqItemSetRdd = dataset.select(itemsCol, freqCol).rdd .map(row => new FreqItemset(row.getSeq[T](0).toArray, row.getLong(1))) val rows = new MLlibAssociationRules() .setMinConfidence(minConfidence) .run(freqItemSetRdd, itemSupport) - .map(r => Row(r.antecedent, r.consequent, r.confidence, r.lift.orNull)) + .map(r => Row(r.antecedent, r.consequent, r.confidence, r.lift.orNull, + r.freqUnion / numTrainingRecords)) val dt = dataset.schema(itemsCol).dataType val schema = StructType(Seq( StructField("antecedent", dt, nullable = false), StructField("consequent", dt, nullable = false), StructField("confidence", DoubleType, nullable = false), - StructField("lift", DoubleType))) + StructField("lift", DoubleType), + StructField("support", DoubleType, nullable = false))) val rules = dataset.sparkSession.createDataFrame(rows, schema) rules } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala index 43d256bbc46c3..601c7da30ffed 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala @@ -124,7 +124,7 @@ object AssociationRules { class Rule[Item] private[fpm] ( @Since("1.5.0") val antecedent: Array[Item], @Since("1.5.0") val consequent: Array[Item], - freqUnion: Double, + private[spark] val freqUnion: Double, freqAntecedent: Double, freqConsequent: Option[Double]) extends Serializable { diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala index 579d6b12ab99f..a66397324c1a6 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala @@ -30,7 +30,7 @@ import org.apache.spark.ml.param.ParamsSuite import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils} import org.apache.spark.ml.util.TestingUtils._ import org.apache.spark.sql.{Dataset, Row} -import org.apache.spark.sql.functions.udf +import org.apache.spark.sql.functions._ class LinearSVCSuite extends MLTest with DefaultReadWriteTest { @@ -284,6 +284,57 @@ class LinearSVCSuite extends MLTest with DefaultReadWriteTest { assert(model1.coefficients ~== coefficientsSK relTol 4E-3) } + test("summary and training summary") { + val lsvc = new LinearSVC() + val model = lsvc.setMaxIter(5).fit(smallBinaryDataset) + + val summary = model.evaluate(smallBinaryDataset) + + assert(model.summary.accuracy === summary.accuracy) + assert(model.summary.weightedPrecision === summary.weightedPrecision) + assert(model.summary.weightedRecall === summary.weightedRecall) + assert(model.summary.pr.collect() === summary.pr.collect()) + assert(model.summary.roc.collect() === summary.roc.collect()) + assert(model.summary.areaUnderROC === summary.areaUnderROC) + + // verify instance weight works + val lsvc2 = new LinearSVC() + .setMaxIter(5) + .setWeightCol("weight") + + val smallBinaryDatasetWithWeight = + smallBinaryDataset.select(col("label"), col("features"), lit(2.5).as("weight")) + + val summary2 = model.evaluate(smallBinaryDatasetWithWeight) + + val model2 = lsvc2.fit(smallBinaryDatasetWithWeight) + assert(model2.summary.accuracy === summary2.accuracy) + assert(model2.summary.weightedPrecision ~== summary2.weightedPrecision relTol 1e-6) + assert(model2.summary.weightedRecall === summary2.weightedRecall) + assert(model2.summary.pr.collect() === summary2.pr.collect()) + assert(model2.summary.roc.collect() === summary2.roc.collect()) + assert(model2.summary.areaUnderROC === summary2.areaUnderROC) + + assert(model2.summary.accuracy === model.summary.accuracy) + assert(model2.summary.weightedPrecision ~== model.summary.weightedPrecision relTol 1e-6) + assert(model2.summary.weightedRecall === model.summary.weightedRecall) + assert(model2.summary.pr.collect() === model.summary.pr.collect()) + assert(model2.summary.roc.collect() === model.summary.roc.collect()) + assert(model2.summary.areaUnderROC === model.summary.areaUnderROC) + } + + test("linearSVC training summary totalIterations") { + Seq(1, 5, 10, 20, 100).foreach { maxIter => + val trainer = new LinearSVC().setMaxIter(maxIter) + val model = trainer.fit(smallBinaryDataset) + if (maxIter == 1) { + assert(model.summary.totalIterations === maxIter) + } else { + assert(model.summary.totalIterations <= maxIter) + } + } + } + test("read/write: SVM") { def checkModelData(model: LinearSVCModel, model2: LinearSVCModel): Unit = { assert(model.intercept === model2.intercept) diff --git a/mllib/src/test/scala/org/apache/spark/ml/fpm/FPGrowthSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/fpm/FPGrowthSuite.scala index b75526a48371a..d42ced0f8f91b 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/fpm/FPGrowthSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/fpm/FPGrowthSuite.scala @@ -39,9 +39,9 @@ class FPGrowthSuite extends SparkFunSuite with MLlibTestSparkContext with Defaul val model = new FPGrowth().setMinSupport(0.5).fit(data) val generatedRules = model.setMinConfidence(0.5).associationRules val expectedRules = spark.createDataFrame(Seq( - (Array("2"), Array("1"), 1.0, 1.0), - (Array("1"), Array("2"), 0.75, 1.0) - )).toDF("antecedent", "consequent", "confidence", "lift") + (Array("2"), Array("1"), 1.0, 1.0, 0.75), + (Array("1"), Array("2"), 0.75, 1.0, 0.75) + )).toDF("antecedent", "consequent", "confidence", "lift", "support") .withColumn("antecedent", col("antecedent").cast(ArrayType(dt))) .withColumn("consequent", col("consequent").cast(ArrayType(dt))) assert(expectedRules.sort("antecedent").rdd.collect().sameElements( @@ -61,6 +61,31 @@ class FPGrowthSuite extends SparkFunSuite with MLlibTestSparkContext with Defaul } } + test("FPGrowth associationRules") { + val dataset = spark.createDataFrame(Seq( + (1, Array("1", "2")), + (2, Array("3")), + (3, Array("4", "5")), + (4, Array("1", "2", "3")), + (5, Array("2")) + )).toDF("id", "items") + val model = new FPGrowth().setMinSupport(0.1).setMinConfidence(0.1).fit(dataset) + val expectedRules = spark.createDataFrame(Seq( + (Array("2"), Array("1"), 0.6666666666666666, 1.6666666666666665, 0.4), + (Array("2"), Array("3"), 0.3333333333333333, 0.8333333333333333, 0.2), + (Array("3"), Array("1"), 0.5, 1.25, 0.2), + (Array("3"), Array("2"), 0.5, 0.8333333333333334, 0.2), + (Array("1", "3"), Array("2"), 1.0, 1.6666666666666667, 0.2), + (Array("1", "2"), Array("3"), 0.5, 1.25, 0.2), + (Array("4"), Array("5"), 1.0, 5.0, 0.2), + (Array("5"), Array("4"), 1.0, 5.0, 0.2), + (Array("1"), Array("3"), 0.5, 1.25, 0.2), + (Array("1"), Array("2"), 1.0, 1.6666666666666667, 0.4), + (Array("3", "2"), Array("1"), 1.0, 2.5, 0.2) + )).toDF("antecedent", "consequent", "confidence", "lift", "support") + assert(expectedRules.collect().toSet.equals(model.associationRules.collect().toSet)) + } + test("FPGrowth getFreqItems") { val model = new FPGrowth().setMinSupport(0.7).fit(dataset) val expectedFreq = spark.createDataFrame(Seq( diff --git a/pom.xml b/pom.xml index 82c12ae3dcb80..08ca13bfe9d37 100644 --- a/pom.xml +++ b/pom.xml @@ -119,11 +119,11 @@ spark 1.7.30 1.2.17 - 2.7.4 + 3.2.0 2.5.0 ${hadoop.version} 3.4.14 - 2.7.1 + 2.13.0 org.apache.hive core @@ -170,7 +170,7 @@ 1.1.7.5 1.1.2 1.10 - 2.4 + 2.5 2.6 @@ -3054,16 +3054,16 @@ hadoop-2.7 - + + 2.7.4 + 2.7.1 + 2.4 + hadoop-3.2 - - 3.2.0 - 2.13.0 - 2.5 - + diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py index ff506066519cd..bdd37c99df0a8 100644 --- a/python/pyspark/ml/classification.py +++ b/python/pyspark/ml/classification.py @@ -39,6 +39,7 @@ from pyspark.storagelevel import StorageLevel __all__ = ['LinearSVC', 'LinearSVCModel', + 'LinearSVCSummary', 'LinearSVCTrainingSummary', 'LogisticRegression', 'LogisticRegressionModel', 'LogisticRegressionSummary', 'LogisticRegressionTrainingSummary', 'BinaryLogisticRegressionSummary', 'BinaryLogisticRegressionTrainingSummary', @@ -683,7 +684,8 @@ def setBlockSize(self, value): return self._set(blockSize=value) -class LinearSVCModel(_JavaClassificationModel, _LinearSVCParams, JavaMLWritable, JavaMLReadable): +class LinearSVCModel(_JavaClassificationModel, _LinearSVCParams, JavaMLWritable, JavaMLReadable, + HasTrainingSummary): """ Model fitted by LinearSVC. @@ -713,6 +715,50 @@ def intercept(self): """ return self._call_java("intercept") + @since("3.1.0") + def summary(self): + """ + Gets summary (e.g. accuracy/precision/recall, objective history, total iterations) of model + trained on the training set. An exception is thrown if `trainingSummary is None`. + """ + if self.hasSummary: + return LinearSVCTrainingSummary(super(LinearSVCModel, self).summary) + else: + raise RuntimeError("No training summary available for this %s" % + self.__class__.__name__) + + @since("3.1.0") + def evaluate(self, dataset): + """ + Evaluates the model on a test dataset. + + :param dataset: + Test dataset to evaluate model on, where dataset is an + instance of :py:class:`pyspark.sql.DataFrame` + """ + if not isinstance(dataset, DataFrame): + raise ValueError("dataset must be a DataFrame but got %s." % type(dataset)) + java_lsvc_summary = self._call_java("evaluate", dataset) + return LinearSVCSummary(java_lsvc_summary) + + +class LinearSVCSummary(_BinaryClassificationSummary): + """ + Abstraction for LinearSVC Results for a given model. + .. versionadded:: 3.1.0 + """ + pass + + +@inherit_doc +class LinearSVCTrainingSummary(LinearSVCSummary, _TrainingSummary): + """ + Abstraction for LinearSVC Training results. + + .. versionadded:: 3.1.0 + """ + pass + class _LogisticRegressionParams(_ProbabilisticClassifierParams, HasRegParam, HasElasticNetParam, HasMaxIter, HasFitIntercept, HasTol, diff --git a/python/pyspark/ml/fpm.py b/python/pyspark/ml/fpm.py index 7d933daf9e032..7a5591f3fbf76 100644 --- a/python/pyspark/ml/fpm.py +++ b/python/pyspark/ml/fpm.py @@ -180,15 +180,15 @@ class FPGrowth(JavaEstimator, _FPGrowthParams, JavaMLWritable, JavaMLReadable): only showing top 5 rows ... >>> fpm.associationRules.show(5) - +----------+----------+----------+----+ - |antecedent|consequent|confidence|lift| - +----------+----------+----------+----+ - | [t, s]| [y]| 1.0| 2.0| - | [t, s]| [x]| 1.0| 1.5| - | [t, s]| [z]| 1.0| 1.2| - | [p]| [r]| 1.0| 2.0| - | [p]| [z]| 1.0| 1.2| - +----------+----------+----------+----+ + +----------+----------+----------+----+------------------+ + |antecedent|consequent|confidence|lift| support| + +----------+----------+----------+----+------------------+ + | [t, s]| [y]| 1.0| 2.0|0.3333333333333333| + | [t, s]| [x]| 1.0| 1.5|0.3333333333333333| + | [t, s]| [z]| 1.0| 1.2|0.3333333333333333| + | [p]| [r]| 1.0| 2.0|0.3333333333333333| + | [p]| [z]| 1.0| 1.2|0.3333333333333333| + +----------+----------+----------+----+------------------+ only showing top 5 rows ... >>> new_data = spark.createDataFrame([(["t", "s"], )], ["items"]) diff --git a/python/pyspark/ml/tests/test_algorithms.py b/python/pyspark/ml/tests/test_algorithms.py index 2faf2d98f0271..c948bd0c646de 100644 --- a/python/pyspark/ml/tests/test_algorithms.py +++ b/python/pyspark/ml/tests/test_algorithms.py @@ -226,8 +226,8 @@ def test_association_rules(self): fpm = fp.fit(self.data) expected_association_rules = self.spark.createDataFrame( - [([3], [1], 1.0, 1.0), ([2], [1], 1.0, 1.0)], - ["antecedent", "consequent", "confidence", "lift"] + [([3], [1], 1.0, 1.0, 0.5), ([2], [1], 1.0, 1.0, 0.75)], + ["antecedent", "consequent", "confidence", "lift", "support"] ) actual_association_rules = fpm.associationRules diff --git a/python/pyspark/ml/tests/test_training_summary.py b/python/pyspark/ml/tests/test_training_summary.py index ac944d8397a86..19acd194f4ddf 100644 --- a/python/pyspark/ml/tests/test_training_summary.py +++ b/python/pyspark/ml/tests/test_training_summary.py @@ -21,8 +21,8 @@ if sys.version > '3': basestring = str -from pyspark.ml.classification import BinaryLogisticRegressionSummary, LogisticRegression, \ - LogisticRegressionSummary +from pyspark.ml.classification import BinaryLogisticRegressionSummary, LinearSVC, \ + LinearSVCSummary, LogisticRegression, LogisticRegressionSummary from pyspark.ml.clustering import BisectingKMeans, GaussianMixture, KMeans from pyspark.ml.linalg import Vectors from pyspark.ml.regression import GeneralizedLinearRegression, LinearRegression @@ -193,6 +193,48 @@ def test_multiclass_logistic_regression_summary(self): self.assertFalse(isinstance(sameSummary, BinaryLogisticRegressionSummary)) self.assertAlmostEqual(sameSummary.accuracy, s.accuracy) + def test_linear_svc_summary(self): + df = self.spark.createDataFrame([(1.0, 2.0, Vectors.dense(1.0, 1.0, 1.0)), + (0.0, 2.0, Vectors.dense(1.0, 2.0, 3.0))], + ["label", "weight", "features"]) + svc = LinearSVC(maxIter=5, weightCol="weight") + model = svc.fit(df) + self.assertTrue(model.hasSummary) + s = model.summary() + # test that api is callable and returns expected types + self.assertTrue(isinstance(s.predictions, DataFrame)) + self.assertEqual(s.scoreCol, "rawPrediction") + self.assertEqual(s.labelCol, "label") + self.assertEqual(s.predictionCol, "prediction") + objHist = s.objectiveHistory + self.assertTrue(isinstance(objHist, list) and isinstance(objHist[0], float)) + self.assertGreater(s.totalIterations, 0) + self.assertTrue(isinstance(s.labels, list)) + self.assertTrue(isinstance(s.truePositiveRateByLabel, list)) + self.assertTrue(isinstance(s.falsePositiveRateByLabel, list)) + self.assertTrue(isinstance(s.precisionByLabel, list)) + self.assertTrue(isinstance(s.recallByLabel, list)) + self.assertTrue(isinstance(s.fMeasureByLabel(), list)) + self.assertTrue(isinstance(s.fMeasureByLabel(1.0), list)) + self.assertTrue(isinstance(s.roc, DataFrame)) + self.assertAlmostEqual(s.areaUnderROC, 1.0, 2) + self.assertTrue(isinstance(s.pr, DataFrame)) + self.assertTrue(isinstance(s.fMeasureByThreshold, DataFrame)) + self.assertTrue(isinstance(s.precisionByThreshold, DataFrame)) + self.assertTrue(isinstance(s.recallByThreshold, DataFrame)) + print(s.weightedTruePositiveRate) + self.assertAlmostEqual(s.weightedTruePositiveRate, 0.5, 2) + self.assertAlmostEqual(s.weightedFalsePositiveRate, 0.5, 2) + self.assertAlmostEqual(s.weightedRecall, 0.5, 2) + self.assertAlmostEqual(s.weightedPrecision, 0.25, 2) + self.assertAlmostEqual(s.weightedFMeasure(), 0.3333333333333333, 2) + self.assertAlmostEqual(s.weightedFMeasure(1.0), 0.3333333333333333, 2) + # test evaluation (with training dataset) produces a summary with same values + # one check is enough to verify a summary is returned, Scala version runs full test + sameSummary = model.evaluate(df) + self.assertTrue(isinstance(sameSummary, LinearSVCSummary)) + self.assertAlmostEqual(sameSummary.areaUnderROC, s.areaUnderROC) + def test_gaussian_mixture_summary(self): data = [(Vectors.dense(1.0),), (Vectors.dense(5.0),), (Vectors.dense(10.0),), (Vectors.sparse(1, [], []),)] diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 0c8c34dd87996..b0498d0298785 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -1431,9 +1431,11 @@ def to_utc_timestamp(timestamp, tz): def timestamp_seconds(col): """ >>> from pyspark.sql.functions import timestamp_seconds + >>> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles") >>> time_df = spark.createDataFrame([(1230219000,)], ['unix_time']) >>> time_df.select(timestamp_seconds(time_df.unix_time).alias('ts')).collect() [Row(ts=datetime.datetime(2008, 12, 25, 7, 30))] + >>> spark.conf.unset("spark.sql.session.timeZone") """ sc = SparkContext._active_spark_context diff --git a/resource-managers/kubernetes/integration-tests/pom.xml b/resource-managers/kubernetes/integration-tests/pom.xml index 503540403f5ec..d1e00cc0b5b10 100644 --- a/resource-managers/kubernetes/integration-tests/pom.xml +++ b/resource-managers/kubernetes/integration-tests/pom.xml @@ -186,9 +186,6 @@ hadoop-2.7 - - true - com.amazonaws @@ -200,6 +197,9 @@ hadoop-3.2 + + true + com.amazonaws diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 79a8380826ab3..039fd9382000a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning} import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.connector.catalog.Identifier +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.util.random.RandomSampler @@ -953,16 +954,18 @@ case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan) } /** - * This method repartitions data using [[Expression]]s into `numPartitions`, and receives + * This method repartitions data using [[Expression]]s into `optNumPartitions`, and receives * information about the number of partitions during execution. Used when a specific ordering or * distribution is expected by the consumer of the query result. Use [[Repartition]] for RDD-like - * `coalesce` and `repartition`. + * `coalesce` and `repartition`. If no `optNumPartitions` is given, by default it partitions data + * into `numShufflePartitions` defined in `SQLConf`, and could be coalesced by AQE. */ case class RepartitionByExpression( partitionExpressions: Seq[Expression], child: LogicalPlan, - numPartitions: Int) extends RepartitionOperation { + optNumPartitions: Option[Int]) extends RepartitionOperation { + val numPartitions = optNumPartitions.getOrElse(SQLConf.get.numShufflePartitions) require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.") val partitioning: Partitioning = { @@ -990,6 +993,15 @@ case class RepartitionByExpression( override def shuffle: Boolean = true } +object RepartitionByExpression { + def apply( + partitionExpressions: Seq[Expression], + child: LogicalPlan, + numPartitions: Int): RepartitionByExpression = { + RepartitionByExpression(partitionExpressions, child, Some(numPartitions)) + } +} + /** * A relation with one row. This is used in "SELECT ..." without a from clause. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala index 6af16e2dba105..592ce03606d4b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala @@ -90,7 +90,7 @@ abstract class UserDefinedType[UserType >: Null] extends DataType with Serializa override def hashCode(): Int = getClass.hashCode() override def equals(other: Any): Boolean = other match { - case that: UserDefinedType[_] => this.acceptsType(that) + case that: UserDefinedType[_] => this.getClass == that.getClass case _ => false } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/StringExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/StringExpressionsSuite.scala index f18364d844ce1..967ccc42c632d 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/StringExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/StringExpressionsSuite.scala @@ -236,6 +236,10 @@ class StringExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { "xample", row) + // Substring with from negative position with negative length + checkEvaluation(Substring(s, Literal.create(-1207959552, IntegerType), + Literal.create(-1207959552, IntegerType)), "", row) + val s_notNull = 'a.string.notNull.at(0) assert(Substring(s, Literal.create(0, IntegerType), Literal.create(2, IntegerType)).nullable) diff --git a/sql/core/benchmarks/IntervalBenchmark-jdk11-results.txt b/sql/core/benchmarks/IntervalBenchmark-jdk11-results.txt index 8958d7c53413f..70a64931049c0 100644 --- a/sql/core/benchmarks/IntervalBenchmark-jdk11-results.txt +++ b/sql/core/benchmarks/IntervalBenchmark-jdk11-results.txt @@ -1,29 +1,40 @@ -Java HotSpot(TM) 64-Bit Server VM 11.0.5+10-LTS on Mac OS X 10.15.3 -Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz +OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz cast strings to intervals: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -prepare string w/ interval 448 469 20 2.2 447.6 1.0X -prepare string w/o interval 405 409 4 2.5 404.6 1.1X -1 units w/ interval 321 328 6 3.1 321.4 1.4X -1 units w/o interval 303 307 4 3.3 303.1 1.5X -2 units w/ interval 445 458 12 2.2 444.6 1.0X -2 units w/o interval 416 424 10 2.4 416.2 1.1X -3 units w/ interval 1006 1012 8 1.0 1006.4 0.4X -3 units w/o interval 1240 1249 8 0.8 1239.6 0.4X -4 units w/ interval 1295 1418 106 0.8 1295.4 0.3X -4 units w/o interval 1172 1188 15 0.9 1171.6 0.4X -5 units w/ interval 1326 1335 11 0.8 1325.6 0.3X -5 units w/o interval 1309 1336 44 0.8 1308.7 0.3X -6 units w/ interval 1441 1464 29 0.7 1441.0 0.3X -6 units w/o interval 1350 1369 17 0.7 1350.1 0.3X -7 units w/ interval 1606 1669 99 0.6 1605.6 0.3X -7 units w/o interval 1546 1557 12 0.6 1546.3 0.3X -8 units w/ interval 1771 1875 120 0.6 1770.6 0.3X -8 units w/o interval 1775 1789 13 0.6 1775.2 0.3X -9 units w/ interval 2126 2757 849 0.5 2126.4 0.2X -9 units w/o interval 2053 2070 21 0.5 2053.3 0.2X -10 units w/ interval 2209 2243 30 0.5 2209.1 0.2X -10 units w/o interval 2400 2702 365 0.4 2400.2 0.2X -11 units w/ interval 2616 2699 72 0.4 2616.5 0.2X -11 units w/o interval 3218 3380 195 0.3 3218.4 0.1X +prepare string w/ interval 708 829 110 1.4 708.0 1.0X +prepare string w/o interval 660 672 14 1.5 660.3 1.1X +1 units w/ interval 514 543 33 1.9 514.2 1.4X +1 units w/o interval 476 492 20 2.1 475.9 1.5X +2 units w/ interval 751 767 14 1.3 751.0 0.9X +2 units w/o interval 709 716 11 1.4 709.0 1.0X +3 units w/ interval 1541 1551 15 0.6 1540.9 0.5X +3 units w/o interval 1531 1532 1 0.7 1531.5 0.5X +4 units w/ interval 1764 1768 5 0.6 1763.5 0.4X +4 units w/o interval 1737 1745 8 0.6 1736.6 0.4X +5 units w/ interval 1920 1930 10 0.5 1919.7 0.4X +5 units w/o interval 1928 1936 11 0.5 1927.9 0.4X +6 units w/ interval 2124 2127 4 0.5 2124.2 0.3X +6 units w/o interval 2124 2125 1 0.5 2123.7 0.3X +7 units w/ interval 2525 2541 15 0.4 2525.5 0.3X +7 units w/o interval 2512 2518 11 0.4 2511.5 0.3X +8 units w/ interval 2578 2597 19 0.4 2578.1 0.3X +8 units w/o interval 2558 2562 6 0.4 2558.1 0.3X +9 units w/ interval 2742 2750 9 0.4 2741.8 0.3X +9 units w/o interval 2752 2762 11 0.4 2751.8 0.3X +10 units w/ interval 3112 3123 10 0.3 3111.9 0.2X +10 units w/o interval 3116 3130 14 0.3 3115.7 0.2X +11 units w/ interval 3255 3273 20 0.3 3255.3 0.2X +11 units w/o interval 3294 3305 14 0.3 3293.6 0.2X + +OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +make_interval(): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +-------------------------------------------------------------------------------------------------------------------------- +prepare make_interval() 3395 3410 16 0.3 3395.0 1.0X +make_interval(0, 1, 2, 3, 4, 5, 50.123456) 94 102 9 10.7 93.8 36.2X +make_interval(*, *, 2, 3, 4, 5, 50.123456) 136 139 4 7.3 136.5 24.9X +make_interval(0, 1, *, *, 4, 5, 50.123456) 115 119 4 8.7 114.8 29.6X +make_interval(0, 1, 2, 3, *, *, *) 3359 3382 37 0.3 3358.7 1.0X +make_interval(*, *, *, *, *, *, *) 3382 3388 9 0.3 3382.3 1.0X diff --git a/sql/core/benchmarks/IntervalBenchmark-results.txt b/sql/core/benchmarks/IntervalBenchmark-results.txt index 48af333b78ba4..98b9f55c2e15e 100644 --- a/sql/core/benchmarks/IntervalBenchmark-results.txt +++ b/sql/core/benchmarks/IntervalBenchmark-results.txt @@ -1,29 +1,40 @@ -Java HotSpot(TM) 64-Bit Server VM 1.8.0_231-b11 on Mac OS X 10.15.3 -Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz +OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz cast strings to intervals: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -prepare string w/ interval 389 410 21 2.6 388.7 1.0X -prepare string w/o interval 340 360 18 2.9 340.5 1.1X -1 units w/ interval 378 389 16 2.6 377.8 1.0X -1 units w/o interval 346 350 5 2.9 346.2 1.1X -2 units w/ interval 444 457 11 2.3 444.2 0.9X -2 units w/o interval 455 464 12 2.2 455.1 0.9X -3 units w/ interval 942 964 20 1.1 941.5 0.4X -3 units w/o interval 927 1020 93 1.1 927.3 0.4X -4 units w/ interval 1114 1127 17 0.9 1113.9 0.3X -4 units w/o interval 1100 1105 4 0.9 1100.3 0.4X -5 units w/ interval 1180 1244 57 0.8 1180.1 0.3X -5 units w/o interval 1135 1141 6 0.9 1135.2 0.3X -6 units w/ interval 1284 1316 48 0.8 1284.0 0.3X -6 units w/o interval 1276 1357 122 0.8 1276.1 0.3X -7 units w/ interval 1609 1636 32 0.6 1609.1 0.2X -7 units w/o interval 1551 1578 36 0.6 1550.9 0.3X -8 units w/ interval 1787 1874 129 0.6 1787.1 0.2X -8 units w/o interval 1751 1767 15 0.6 1750.6 0.2X -9 units w/ interval 1960 2065 141 0.5 1959.7 0.2X -9 units w/o interval 1885 1908 39 0.5 1885.1 0.2X -10 units w/ interval 2178 2185 11 0.5 2177.9 0.2X -10 units w/o interval 2150 2255 164 0.5 2150.1 0.2X -11 units w/ interval 2457 2542 139 0.4 2456.7 0.2X -11 units w/o interval 2557 2770 188 0.4 2556.7 0.2X +prepare string w/ interval 677 718 40 1.5 677.2 1.0X +prepare string w/o interval 602 624 19 1.7 602.2 1.1X +1 units w/ interval 582 598 20 1.7 581.8 1.2X +1 units w/o interval 549 591 64 1.8 549.1 1.2X +2 units w/ interval 758 773 14 1.3 758.2 0.9X +2 units w/o interval 723 738 14 1.4 722.6 0.9X +3 units w/ interval 1442 1450 11 0.7 1441.8 0.5X +3 units w/o interval 1426 1429 3 0.7 1426.4 0.5X +4 units w/ interval 1645 1652 11 0.6 1645.1 0.4X +4 units w/o interval 1618 1626 10 0.6 1617.6 0.4X +5 units w/ interval 1794 1803 13 0.6 1794.4 0.4X +5 units w/o interval 1783 1793 9 0.6 1783.2 0.4X +6 units w/ interval 1976 1984 11 0.5 1976.2 0.3X +6 units w/o interval 1948 1959 10 0.5 1947.9 0.3X +7 units w/ interval 2394 2408 18 0.4 2393.7 0.3X +7 units w/o interval 2387 2392 8 0.4 2386.8 0.3X +8 units w/ interval 2578 2588 15 0.4 2577.5 0.3X +8 units w/o interval 2572 2578 5 0.4 2571.8 0.3X +9 units w/ interval 2812 2829 19 0.4 2811.7 0.2X +9 units w/o interval 2811 2816 4 0.4 2810.7 0.2X +10 units w/ interval 3108 3116 10 0.3 3107.8 0.2X +10 units w/o interval 3107 3109 3 0.3 3106.8 0.2X +11 units w/ interval 3386 3392 8 0.3 3386.3 0.2X +11 units w/o interval 3374 3377 4 0.3 3374.0 0.2X + +OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +make_interval(): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +-------------------------------------------------------------------------------------------------------------------------- +prepare make_interval() 3634 3684 47 0.3 3634.1 1.0X +make_interval(0, 1, 2, 3, 4, 5, 50.123456) 90 100 12 11.1 90.0 40.4X +make_interval(*, *, 2, 3, 4, 5, 50.123456) 114 119 5 8.8 114.3 31.8X +make_interval(0, 1, *, *, 4, 5, 50.123456) 121 138 21 8.3 120.7 30.1X +make_interval(0, 1, 2, 3, *, *, *) 3615 3621 9 0.3 3614.7 1.0X +make_interval(*, *, *, *, *, *, *) 3638 3657 21 0.3 3637.7 1.0X diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 524e231eb7eb9..6f97121d88ede 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -2991,17 +2991,9 @@ class Dataset[T] private[sql]( Repartition(numPartitions, shuffle = true, logicalPlan) } - /** - * Returns a new Dataset partitioned by the given partitioning expressions into - * `numPartitions`. The resulting Dataset is hash partitioned. - * - * This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL). - * - * @group typedrel - * @since 2.0.0 - */ - @scala.annotation.varargs - def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T] = { + private def repartitionByExpression( + numPartitions: Option[Int], + partitionExprs: Seq[Column]): Dataset[T] = { // The underlying `LogicalPlan` operator special-cases all-`SortOrder` arguments. // However, we don't want to complicate the semantics of this API method. // Instead, let's give users a friendly error message, pointing them to the new method. @@ -3015,6 +3007,20 @@ class Dataset[T] private[sql]( } } + /** + * Returns a new Dataset partitioned by the given partitioning expressions into + * `numPartitions`. The resulting Dataset is hash partitioned. + * + * This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL). + * + * @group typedrel + * @since 2.0.0 + */ + @scala.annotation.varargs + def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T] = { + repartitionByExpression(Some(numPartitions), partitionExprs) + } + /** * Returns a new Dataset partitioned by the given partitioning expressions, using * `spark.sql.shuffle.partitions` as number of partitions. @@ -3027,7 +3033,20 @@ class Dataset[T] private[sql]( */ @scala.annotation.varargs def repartition(partitionExprs: Column*): Dataset[T] = { - repartition(sparkSession.sessionState.conf.numShufflePartitions, partitionExprs: _*) + repartitionByExpression(None, partitionExprs) + } + + private def repartitionByRange( + numPartitions: Option[Int], + partitionExprs: Seq[Column]): Dataset[T] = { + require(partitionExprs.nonEmpty, "At least one partition-by expression must be specified.") + val sortOrder: Seq[SortOrder] = partitionExprs.map(_.expr match { + case expr: SortOrder => expr + case expr: Expression => SortOrder(expr, Ascending) + }) + withTypedPlan { + RepartitionByExpression(sortOrder, logicalPlan, numPartitions) + } } /** @@ -3049,14 +3068,7 @@ class Dataset[T] private[sql]( */ @scala.annotation.varargs def repartitionByRange(numPartitions: Int, partitionExprs: Column*): Dataset[T] = { - require(partitionExprs.nonEmpty, "At least one partition-by expression must be specified.") - val sortOrder: Seq[SortOrder] = partitionExprs.map(_.expr match { - case expr: SortOrder => expr - case expr: Expression => SortOrder(expr, Ascending) - }) - withTypedPlan { - RepartitionByExpression(sortOrder, logicalPlan, numPartitions) - } + repartitionByRange(Some(numPartitions), partitionExprs) } /** @@ -3078,7 +3090,7 @@ class Dataset[T] private[sql]( */ @scala.annotation.varargs def repartitionByRange(partitionExprs: Column*): Dataset[T] = { - repartitionByRange(sparkSession.sessionState.conf.numShufflePartitions, partitionExprs: _*) + repartitionByRange(None, partitionExprs) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index bf60427e5f3bf..791e432269632 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -131,7 +131,7 @@ class QueryExecution( Option(InsertAdaptiveSparkPlan(AdaptiveExecutionContext(sparkSession, this)))) } - private def executePhase[T](phase: String)(block: => T): T = sparkSession.withActive { + protected def executePhase[T](phase: String)(block: => T): T = sparkSession.withActive { tracker.measurePhase(phase)(block) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala index 5936492dd819c..b5e9655a776b7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala @@ -191,7 +191,7 @@ class ShuffledRowRDD( sqlMetricsReporter) case PartialReducerPartitionSpec(reducerIndex, startMapIndex, endMapIndex, _) => - SparkEnv.get.shuffleManager.getReaderForRange( + SparkEnv.get.shuffleManager.getReader( dependency.shuffleHandle, startMapIndex, endMapIndex, @@ -201,7 +201,7 @@ class ShuffledRowRDD( sqlMetricsReporter) case PartialMapperPartitionSpec(mapIndex, startReducerIndex, endReducerIndex) => - SparkEnv.get.shuffleManager.getReaderForRange( + SparkEnv.get.shuffleManager.getReader( dependency.shuffleHandle, mapIndex, mapIndex + 1, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 4d23e5e8a65b5..3f339347ab4db 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -436,6 +436,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { val normalizedGroupingExpressions = groupingExpressions.map { e => NormalizeFloatingNumbers.normalize(e) match { case n: NamedExpression => n + // Keep the name of the original expression. case other => Alias(other, e.name)(exprId = e.exprId) } } @@ -460,7 +461,13 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { // because `distinctExpressions` is not extracted during logical phase. NormalizeFloatingNumbers.normalize(e) match { case ne: NamedExpression => ne - case other => Alias(other, other.toString)() + case other => + // Keep the name of the original expression. + val name = e match { + case ne: NamedExpression => ne.name + case _ => e.toString + } + Alias(other, name)() } } @@ -685,8 +692,9 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case r: logical.Range => execution.RangeExec(r) :: Nil case r: logical.RepartitionByExpression => + val canChangeNumParts = r.optNumPartitions.isEmpty exchange.ShuffleExchangeExec( - r.partitioning, planLater(r.child), canChangeNumPartitions = false) :: Nil + r.partitioning, planLater(r.child), canChangeNumParts) :: Nil case ExternalRDD(outputObjAttr, rdd) => ExternalRDDScanExec(outputObjAttr, rdd) :: Nil case r: LogicalRDD => RDDScanExec(r.output, r.rdd, "ExistingRDD", r.outputPartitioning, r.outputOrdering) :: Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala index 09ae7692ec518..7773ac71c4954 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala @@ -76,7 +76,7 @@ class IncrementalExecution( * with the desired literal */ override - lazy val optimizedPlan: LogicalPlan = tracker.measurePhase(QueryPlanningTracker.OPTIMIZATION) { + lazy val optimizedPlan: LogicalPlan = executePhase(QueryPlanningTracker.OPTIMIZATION) { sparkSession.sessionState.optimizer.executeAndTrack(withCachedData, tracker) transformAllExpressions { case ts @ CurrentBatchTimestamp(timestamp, _, _) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index 62ad5ea9b5935..239b705a473d0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -3360,8 +3360,8 @@ object functions { /** * Creates timestamp from the number of seconds since UTC epoch. - * @group = datetime_funcs - * @since = 3.1.0 + * @group datetime_funcs + * @since 3.1.0 */ def timestamp_seconds(e: Column): Column = withExpr { SecondsToTimestamp(e.expr) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala index 3fd5cc72cb95e..9acb00b7b6d0b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala @@ -134,6 +134,24 @@ class UserDefinedTypeSuite extends QueryTest with SharedSparkSession with Parque MyLabeledPoint(1.0, new TestUDT.MyDenseVector(Array(0.1, 1.0))), MyLabeledPoint(0.0, new TestUDT.MyDenseVector(Array(0.3, 3.0)))).toDF() + + test("SPARK-32090: equal") { + val udt1 = new ExampleBaseTypeUDT + val udt2 = new ExampleSubTypeUDT + val udt3 = new ExampleSubTypeUDT + assert(udt1 !== udt2) + assert(udt2 !== udt1) + assert(udt2 === udt3) + assert(udt3 === udt2) + } + + test("SPARK-32090: acceptsType") { + val udt1 = new ExampleBaseTypeUDT + val udt2 = new ExampleSubTypeUDT + assert(udt1.acceptsType(udt2)) + assert(!udt2.acceptsType(udt1)) + } + test("register user type: MyDenseVector for MyLabeledPoint") { val labels: RDD[Double] = pointsRDD.select('label).rdd.map { case Row(v: Double) => v } val labelsArrays: Array[Double] = labels.collect() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 9fa97bffa8910..27d9748476c98 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan} import org.apache.spark.sql.execution.{PartialReducerPartitionSpec, ReusedSubqueryExec, ShuffledRowRDD, SparkPlan} import org.apache.spark.sql.execution.command.DataWritingCommandExec -import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange, ReusedExchangeExec} +import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange, ReusedExchangeExec, ShuffleExchangeExec} import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate import org.apache.spark.sql.functions._ @@ -1022,18 +1022,81 @@ class AdaptiveQueryExecSuite } } - test("SPARK-31220 repartition obeys initialPartitionNum when adaptiveExecutionEnabled") { + test("SPARK-31220, SPARK-32056: repartition by expression with AQE") { Seq(true, false).foreach { enableAQE => withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> enableAQE.toString, - SQLConf.SHUFFLE_PARTITIONS.key -> "6", - SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "7") { - val partitionsNum = spark.range(10).repartition($"id").rdd.collectPartitions().length + SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true", + SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "10", + SQLConf.SHUFFLE_PARTITIONS.key -> "10") { + + val df1 = spark.range(10).repartition($"id") + val df2 = spark.range(10).repartition($"id" + 1) + + val partitionsNum1 = df1.rdd.collectPartitions().length + val partitionsNum2 = df2.rdd.collectPartitions().length + if (enableAQE) { - assert(partitionsNum === 7) + assert(partitionsNum1 < 10) + assert(partitionsNum2 < 10) + + // repartition obeys initialPartitionNum when adaptiveExecutionEnabled + val plan = df1.queryExecution.executedPlan + assert(plan.isInstanceOf[AdaptiveSparkPlanExec]) + val shuffle = plan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan.collect { + case s: ShuffleExchangeExec => s + } + assert(shuffle.size == 1) + assert(shuffle(0).outputPartitioning.numPartitions == 10) } else { - assert(partitionsNum === 6) + assert(partitionsNum1 === 10) + assert(partitionsNum2 === 10) } + + + // Don't coalesce partitions if the number of partitions is specified. + val df3 = spark.range(10).repartition(10, $"id") + val df4 = spark.range(10).repartition(10) + assert(df3.rdd.collectPartitions().length == 10) + assert(df4.rdd.collectPartitions().length == 10) + } + } + } + + test("SPARK-31220, SPARK-32056: repartition by range with AQE") { + Seq(true, false).foreach { enableAQE => + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> enableAQE.toString, + SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true", + SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "10", + SQLConf.SHUFFLE_PARTITIONS.key -> "10") { + + val df1 = spark.range(10).toDF.repartitionByRange($"id".asc) + val df2 = spark.range(10).toDF.repartitionByRange(($"id" + 1).asc) + + val partitionsNum1 = df1.rdd.collectPartitions().length + val partitionsNum2 = df2.rdd.collectPartitions().length + + if (enableAQE) { + assert(partitionsNum1 < 10) + assert(partitionsNum2 < 10) + + // repartition obeys initialPartitionNum when adaptiveExecutionEnabled + val plan = df1.queryExecution.executedPlan + assert(plan.isInstanceOf[AdaptiveSparkPlanExec]) + val shuffle = plan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan.collect { + case s: ShuffleExchangeExec => s + } + assert(shuffle.size == 1) + assert(shuffle(0).outputPartitioning.numPartitions == 10) + } else { + assert(partitionsNum1 === 10) + assert(partitionsNum2 === 10) + } + + // Don't coalesce partitions if the number of partitions is specified. + val df3 = spark.range(10).repartitionByRange(10, $"id".asc) + assert(df3.rdd.collectPartitions().length == 10) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/IntervalBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/IntervalBenchmark.scala index 907e3f40c1911..96ad453aeb2d7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/IntervalBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/IntervalBenchmark.scala @@ -39,11 +39,11 @@ import org.apache.spark.sql.internal.SQLConf object IntervalBenchmark extends SqlBasedBenchmark { import spark.implicits._ - private def doBenchmark(cardinality: Long, exprs: Column*): Unit = { + private def doBenchmark(cardinality: Long, columns: Column*): Unit = { withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") { spark .range(0, cardinality, 1, 1) - .select(exprs: _*) + .select(columns: _*) .queryExecution .toRdd .foreach(_ => ()) @@ -60,6 +60,26 @@ object IntervalBenchmark extends SqlBasedBenchmark { } } + private def doBenchmarkExpr(cardinality: Long, exprs: String*): Unit = { + withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") { + spark + .range(0, cardinality, 1, 1) + .selectExpr(exprs: _*) + .queryExecution + .toRdd + .foreach(_ => ()) + } + } + + private def addCaseExpr( + benchmark: Benchmark, + cardinality: Long, + name: String, + exprs: String*): Unit = { + benchmark.addCase(name, numIters = 3) { _ => doBenchmarkExpr(cardinality, exprs: _*) } + } + + private def buildString(withPrefix: Boolean, units: Seq[String] = Seq.empty): Column = { val init = lit(if (withPrefix) "interval" else "") :: ($"id" % 10000).cast("string") :: @@ -78,25 +98,68 @@ object IntervalBenchmark extends SqlBasedBenchmark { } } - override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { - val N = 1000000 + private def benchmarkIntervalStringParsing(cardinality: Long): Unit = { val timeUnits = Seq( "13 months", " 1 months", "100 weeks", "9 days", "12 hours", "- 3 hours", "5 minutes", "45 seconds", "123 milliseconds", "567 microseconds") val intervalToTest = ListBuffer[String]() - val benchmark = new Benchmark("cast strings to intervals", N, output = output) + val benchmark = new Benchmark("cast strings to intervals", cardinality, output = output) // The first 2 cases are used to show the overhead of preparing the interval string. - addCase(benchmark, N, "prepare string w/ interval", buildString(true, timeUnits)) - addCase(benchmark, N, "prepare string w/o interval", buildString(false, timeUnits)) - addCase(benchmark, N, intervalToTest) // Only years + addCase(benchmark, cardinality, "prepare string w/ interval", buildString(true, timeUnits)) + addCase(benchmark, cardinality, "prepare string w/o interval", buildString(false, timeUnits)) + addCase(benchmark, cardinality, intervalToTest) // Only years for (unit <- timeUnits) { intervalToTest.append(unit) - addCase(benchmark, N, intervalToTest) + addCase(benchmark, cardinality, intervalToTest) } benchmark.run() } + + private def benchmarkMakeInterval(cardinality: Long): Unit = { + val benchmark = new Benchmark("make_interval()", cardinality, output = output) + val hmExprs = Seq("id % 24", "id % 60") + val hmsExprs = hmExprs ++ Seq("cast((id % 500000000) / 1000000.0 as decimal(18, 6))") + val ymExprs = Seq("(2000 + (id % 30))", "((id % 12) + 1)") + val wdExpr = Seq("((id % 54) + 1)", "((id % 1000) + 1)") + val args = ymExprs ++ wdExpr ++ hmsExprs + + addCaseExpr( + benchmark, + cardinality, + "prepare make_interval()", + args: _*) + val foldableExpr = "make_interval(0, 1, 2, 3, 4, 5, 50.123456)" + addCaseExpr(benchmark, cardinality, foldableExpr, foldableExpr) + addCaseExpr( + benchmark, + cardinality, + "make_interval(*, *, 2, 3, 4, 5, 50.123456)", + s"make_interval(${ymExprs.mkString(",")}, 2, 3, 4, 5, 50.123456)") + addCaseExpr( + benchmark, + cardinality, + "make_interval(0, 1, *, *, 4, 5, 50.123456)", + s"make_interval(0, 1, ${wdExpr.mkString(",")}, 4, 5, 50.123456)") + addCaseExpr( + benchmark, + cardinality, + "make_interval(0, 1, 2, 3, *, *, *)", + s"make_interval(0, 1, 2, 3, ${hmsExprs.mkString(",")})") + addCaseExpr( + benchmark, + cardinality, + "make_interval(*, *, *, *, *, *, *)", + s"make_interval(${args.mkString(",")})") + + benchmark.run() + } + + override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { + benchmarkIntervalStringParsing(1000000) + benchmarkMakeInterval(1000000) + } }