diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 9cc321af4bde2..6afe58bff5229 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -23,6 +23,7 @@ import java.text.DateFormat import java.util.{Arrays, Comparator, Date, Locale} import scala.collection.JavaConverters._ +import scala.collection.mutable import scala.util.control.NonFatal import com.google.common.primitives.Longs @@ -143,14 +144,29 @@ class SparkHadoopUtil extends Logging { * Returns a function that can be called to find Hadoop FileSystem bytes read. If * getFSBytesReadOnThreadCallback is called from thread r at time t, the returned callback will * return the bytes read on r since t. - * - * @return None if the required method can't be found. */ private[spark] def getFSBytesReadOnThreadCallback(): () => Long = { - val threadStats = FileSystem.getAllStatistics.asScala.map(_.getThreadStatistics) - val f = () => threadStats.map(_.getBytesRead).sum - val baselineBytesRead = f() - () => f() - baselineBytesRead + val f = () => FileSystem.getAllStatistics.asScala.map(_.getThreadStatistics.getBytesRead).sum + val baseline = (Thread.currentThread().getId, f()) + + /** + * This function may be called in both spawned child threads and parent task thread (in + * PythonRDD), and Hadoop FileSystem uses thread local variables to track the statistics. + * So we need a map to track the bytes read from the child threads and parent thread, + * summing them together to get the bytes read of this task. 
+ */ + new Function0[Long] { + private val bytesReadMap = new mutable.HashMap[Long, Long]() + + override def apply(): Long = { + bytesReadMap.synchronized { + bytesReadMap.put(Thread.currentThread().getId, f()) + bytesReadMap.map { case (k, v) => + v - (if (k == baseline._1) baseline._2 else 0) + }.sum + } + } + } } /** diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index c60a2a1706d5a..d13fb4193970b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -879,6 +879,15 @@ private[spark] object SparkSubmitUtils { // Exposed for testing var printStream = SparkSubmit.printStream + // Exposed for testing. + // These components are used to make the default exclusion rules for Spark dependencies. + // We need to specify each component explicitly, otherwise we miss spark-streaming-kafka-0-8 and + // other spark-streaming utility components. Underscore is there to differentiate between + // spark-streaming_2.1x and spark-streaming-kafka-0-8-assembly_2.1x + val IVY_DEFAULT_EXCLUDES = Seq("catalyst_", "core_", "graphx_", "launcher_", "mllib_", + "mllib-local_", "network-common_", "network-shuffle_", "repl_", "sketch_", "sql_", "streaming_", + "tags_", "unsafe_") + /** * Represents a Maven Coordinate * @param groupId the groupId of the coordinate @@ -1007,13 +1016,7 @@ private[spark] object SparkSubmitUtils { // Add scala exclusion rule md.addExcludeRule(createExclusion("*:scala-library:*", ivySettings, ivyConfName)) - // We need to specify each component explicitly, otherwise we miss spark-streaming-kafka-0-8 and - // other spark-streaming utility components. 
Underscore is there to differentiate between - // spark-streaming_2.1x and spark-streaming-kafka-0-8-assembly_2.1x - val components = Seq("catalyst_", "core_", "graphx_", "hive_", "mllib_", "repl_", - "sql_", "streaming_", "yarn_", "network-common_", "network-shuffle_", "network-yarn_") - - components.foreach { comp => + IVY_DEFAULT_EXCLUDES.foreach { comp => md.addExcludeRule(createExclusion(s"org.apache.spark:spark-$comp*:*", ivySettings, ivyConfName)) } diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 4bf8ecc383542..76ea8b86c53d2 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -251,7 +251,13 @@ class HadoopRDD[K, V]( null } // Register an on-task-completion callback to close the input stream. - context.addTaskCompletionListener{ context => closeIfNeeded() } + context.addTaskCompletionListener { context => + // Update the bytes read before closing is to make sure lingering bytesRead statistics in + // this thread get correctly added. + updateBytesRead() + closeIfNeeded() + } + private val key: K = if (reader == null) null.asInstanceOf[K] else reader.createKey() private val value: V = if (reader == null) null.asInstanceOf[V] else reader.createValue() diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index ce3a9a2a1e2a8..482875e6c1ac5 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -191,7 +191,13 @@ class NewHadoopRDD[K, V]( } // Register an on-task-completion callback to close the input stream. 
- context.addTaskCompletionListener(context => close()) + context.addTaskCompletionListener { context => + // Update the bytesRead before closing is to make sure lingering bytesRead statistics in + // this thread get correctly added. + updateBytesRead() + close() + } + private var havePair = false private var recordsSinceMetricsUpdate = 0 diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala index 266c9d33b5a96..57024786b95e3 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala @@ -187,12 +187,9 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll { } test("neglects Spark and Spark's dependencies") { - val components = Seq("catalyst_", "core_", "graphx_", "hive_", "mllib_", "repl_", - "sql_", "streaming_", "yarn_", "network-common_", "network-shuffle_", "network-yarn_") - - val coordinates = - components.map(comp => s"org.apache.spark:spark-${comp}2.10:1.2.0").mkString(",") + - ",org.apache.spark:spark-core_fake:1.2.0" + val coordinates = SparkSubmitUtils.IVY_DEFAULT_EXCLUDES + .map(comp => s"org.apache.spark:spark-${comp}2.11:2.1.1") + .mkString(",") + ",org.apache.spark:spark-core_fake:1.2.0" val path = SparkSubmitUtils.resolveMavenCoordinates( coordinates, diff --git a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala index 5d522189a0c29..6f4203da1d866 100644 --- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala @@ -34,7 +34,7 @@ import org.scalatest.BeforeAndAfter import org.apache.spark.{SharedSparkContext, SparkFunSuite} import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} -import 
org.apache.spark.util.Utils +import org.apache.spark.util.{ThreadUtils, Utils} class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext with BeforeAndAfter { @@ -319,6 +319,35 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext } assert(bytesRead >= tmpFile.length()) } + + test("input metrics with old Hadoop API in different thread") { + val bytesRead = runAndReturnBytesRead { + sc.textFile(tmpFilePath, 4).mapPartitions { iter => + val buf = new ArrayBuffer[String]() + ThreadUtils.runInNewThread("testThread", false) { + iter.flatMap(_.split(" ")).foreach(buf.append(_)) + } + + buf.iterator + }.count() + } + assert(bytesRead >= tmpFile.length()) + } + + test("input metrics with new Hadoop API in different thread") { + val bytesRead = runAndReturnBytesRead { + sc.newAPIHadoopFile(tmpFilePath, classOf[NewTextInputFormat], classOf[LongWritable], + classOf[Text]).mapPartitions { iter => + val buf = new ArrayBuffer[String]() + ThreadUtils.runInNewThread("testThread", false) { + iter.map(_._2.toString).flatMap(_.split(" ")).foreach(buf.append(_)) + } + + buf.iterator + }.count() + } + assert(bytesRead >= tmpFile.length()) + } } /** diff --git a/launcher/src/main/java/org/apache/spark/launcher/FilteredObjectInputStream.java b/launcher/src/main/java/org/apache/spark/launcher/FilteredObjectInputStream.java new file mode 100644 index 0000000000000..4d254a0c4c9fe --- /dev/null +++ b/launcher/src/main/java/org/apache/spark/launcher/FilteredObjectInputStream.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.launcher; + +import java.io.InputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectStreamClass; +import java.util.Arrays; +import java.util.List; + +/** + * An object input stream that only allows classes used by the launcher protocol to be in the + * serialized stream. See SPARK-20922. + */ +class FilteredObjectInputStream extends ObjectInputStream { + + private static final List<String> ALLOWED_PACKAGES = Arrays.asList( + "org.apache.spark.launcher.", + "java.lang."); + + FilteredObjectInputStream(InputStream is) throws IOException { + super(is); + } + + @Override + protected Class<?> resolveClass(ObjectStreamClass desc) + throws IOException, ClassNotFoundException { + + boolean isValid = ALLOWED_PACKAGES.stream().anyMatch(p -> desc.getName().startsWith(p)); + if (!isValid) { + throw new IllegalArgumentException( + String.format("Unexpected class in stream: %s", desc.getName())); + } + return super.resolveClass(desc); + } + +} diff --git a/launcher/src/main/java/org/apache/spark/launcher/LauncherConnection.java b/launcher/src/main/java/org/apache/spark/launcher/LauncherConnection.java index eec264909bbb6..b4a8719e26053 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/LauncherConnection.java +++ b/launcher/src/main/java/org/apache/spark/launcher/LauncherConnection.java @@ -20,7 +20,6 @@ import java.io.Closeable; import java.io.EOFException; import java.io.IOException; -import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.net.Socket; import
java.util.logging.Level; @@ -53,7 +52,7 @@ abstract class LauncherConnection implements Closeable, Runnable { @Override public void run() { try { - ObjectInputStream in = new ObjectInputStream(socket.getInputStream()); + FilteredObjectInputStream in = new FilteredObjectInputStream(socket.getInputStream()); while (!closed) { Message msg = (Message) in.readObject(); handle(msg); diff --git a/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java b/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java index 12f1a0ce2d1b4..03c2934e2692e 100644 --- a/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java +++ b/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java @@ -19,8 +19,11 @@ import java.io.Closeable; import java.io.IOException; +import java.io.ObjectInputStream; import java.net.InetAddress; import java.net.Socket; +import java.util.Arrays; +import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Semaphore; @@ -120,31 +123,7 @@ public void testTimeout() throws Exception { Socket s = new Socket(InetAddress.getLoopbackAddress(), LauncherServer.getServerInstance().getPort()); client = new TestClient(s); - - // Try a few times since the client-side socket may not reflect the server-side close - // immediately. - boolean helloSent = false; - int maxTries = 10; - for (int i = 0; i < maxTries; i++) { - try { - if (!helloSent) { - client.send(new Hello(handle.getSecret(), "1.4.0")); - helloSent = true; - } else { - client.send(new SetAppId("appId")); - } - fail("Expected exception caused by connection timeout."); - } catch (IllegalStateException | IOException e) { - // Expected. 
- break; - } catch (AssertionError e) { - if (i < maxTries - 1) { - Thread.sleep(100); - } else { - throw new AssertionError("Test failed after " + maxTries + " attempts.", e); - } - } - } + waitForError(client, handle.getSecret()); } finally { SparkLauncher.launcherConfig.remove(SparkLauncher.CHILD_CONNECTION_TIMEOUT); kill(handle); @@ -183,6 +162,25 @@ public void infoChanged(SparkAppHandle handle) { } } + @Test + public void testStreamFiltering() throws Exception { + ChildProcAppHandle handle = LauncherServer.newAppHandle(); + TestClient client = null; + try { + Socket s = new Socket(InetAddress.getLoopbackAddress(), + LauncherServer.getServerInstance().getPort()); + + client = new TestClient(s); + client.send(new EvilPayload()); + waitForError(client, handle.getSecret()); + assertEquals(0, EvilPayload.EVIL_BIT); + } finally { + kill(handle); + close(client); + client.clientThread.join(); + } + } + private void kill(SparkAppHandle handle) { if (handle != null) { handle.kill(); @@ -199,6 +197,35 @@ private void close(Closeable c) { } } + /** + * Try a few times to get a client-side error, since the client-side socket may not reflect the + * server-side close immediately. + */ + private void waitForError(TestClient client, String secret) throws Exception { + boolean helloSent = false; + int maxTries = 10; + for (int i = 0; i < maxTries; i++) { + try { + if (!helloSent) { + client.send(new Hello(secret, "1.4.0")); + helloSent = true; + } else { + client.send(new SetAppId("appId")); + } + fail("Expected error but message went through."); + } catch (IllegalStateException | IOException e) { + // Expected. 
+ break; + } catch (AssertionError e) { + if (i < maxTries - 1) { + Thread.sleep(100); + } else { + throw new AssertionError("Test failed after " + maxTries + " attempts.", e); + } + } + } + } + private static class TestClient extends LauncherConnection { final BlockingQueue inbound; @@ -220,4 +247,19 @@ protected void handle(Message msg) throws IOException { } + private static class EvilPayload extends LauncherProtocol.Message { + + static int EVIL_BIT = 0; + + // This field should cause the launcher server to throw an error and not deserialize the + // message. + private List notAllowedField = Arrays.asList("disallowed"); + + private void readObject(ObjectInputStream stream) throws IOException, ClassNotFoundException { + stream.defaultReadObject(); + EVIL_BIT = 1; + } + + } + } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrix.scala index 26ca1ef9be870..0d223de9b6f7e 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrix.scala @@ -125,6 +125,13 @@ class CoordinateMatrix @Since("1.0.0") ( s"colsPerBlock needs to be greater than 0. 
colsPerBlock: $colsPerBlock") val m = numRows() val n = numCols() + + // Since block matrices require an integer row and col index + require(math.ceil(m.toDouble / rowsPerBlock) <= Int.MaxValue, + "Number of rows divided by rowsPerBlock cannot exceed maximum integer.") + require(math.ceil(n.toDouble / colsPerBlock) <= Int.MaxValue, + "Number of cols divided by colsPerBlock cannot exceed maximum integer.") + val numRowBlocks = math.ceil(m.toDouble / rowsPerBlock).toInt val numColBlocks = math.ceil(n.toDouble / colsPerBlock).toInt val partitioner = GridPartitioner(numRowBlocks, numColBlocks, entries.partitions.length) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala index d7255d527f036..8890662d99b52 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala @@ -91,7 +91,7 @@ class IndexedRowMatrix @Since("1.0.0") ( } /** - * Converts to BlockMatrix. Creates blocks of `SparseMatrix` with size 1024 x 1024. + * Converts to BlockMatrix. Creates blocks with size 1024 x 1024. */ @Since("1.3.0") def toBlockMatrix(): BlockMatrix = { @@ -99,7 +99,7 @@ class IndexedRowMatrix @Since("1.0.0") ( } /** - * Converts to BlockMatrix. Creates blocks of `SparseMatrix`. + * Converts to BlockMatrix. Blocks may be sparse or dense depending on the sparsity of the rows. * @param rowsPerBlock The number of rows of each block. The blocks at the bottom edge may have * a smaller value. Must be an integer value greater than 0. * @param colsPerBlock The number of columns of each block. 
The blocks at the right edge may have @@ -108,8 +108,70 @@ class IndexedRowMatrix @Since("1.0.0") ( */ @Since("1.3.0") def toBlockMatrix(rowsPerBlock: Int, colsPerBlock: Int): BlockMatrix = { - // TODO: This implementation may be optimized - toCoordinateMatrix().toBlockMatrix(rowsPerBlock, colsPerBlock) + require(rowsPerBlock > 0, + s"rowsPerBlock needs to be greater than 0. rowsPerBlock: $rowsPerBlock") + require(colsPerBlock > 0, + s"colsPerBlock needs to be greater than 0. colsPerBlock: $colsPerBlock") + + val m = numRows() + val n = numCols() + + // Since block matrices require an integer row index + require(math.ceil(m.toDouble / rowsPerBlock) <= Int.MaxValue, + "Number of rows divided by rowsPerBlock cannot exceed maximum integer.") + + // The remainder calculations only matter when m % rowsPerBlock != 0 or n % colsPerBlock != 0 + val remainderRowBlockIndex = m / rowsPerBlock + val remainderColBlockIndex = n / colsPerBlock + val remainderRowBlockSize = (m % rowsPerBlock).toInt + val remainderColBlockSize = (n % colsPerBlock).toInt + val numRowBlocks = math.ceil(m.toDouble / rowsPerBlock).toInt + val numColBlocks = math.ceil(n.toDouble / colsPerBlock).toInt + + val blocks = rows.flatMap { ir: IndexedRow => + val blockRow = ir.index / rowsPerBlock + val rowInBlock = ir.index % rowsPerBlock + + ir.vector match { + case SparseVector(size, indices, values) => + indices.zip(values).map { case (index, value) => + val blockColumn = index / colsPerBlock + val columnInBlock = index % colsPerBlock + ((blockRow.toInt, blockColumn.toInt), (rowInBlock.toInt, Array((value, columnInBlock)))) + } + case DenseVector(values) => + values.grouped(colsPerBlock) + .zipWithIndex + .map { case (values, blockColumn) => + ((blockRow.toInt, blockColumn), (rowInBlock.toInt, values.zipWithIndex)) + } + } + }.groupByKey(GridPartitioner(numRowBlocks, numColBlocks, rows.getNumPartitions)).map { + case ((blockRow, blockColumn), itr) => + val actualNumRows = + if (blockRow == 
remainderRowBlockIndex) remainderRowBlockSize else rowsPerBlock + val actualNumColumns = + if (blockColumn == remainderColBlockIndex) remainderColBlockSize else colsPerBlock + + val arraySize = actualNumRows * actualNumColumns + val matrixAsArray = new Array[Double](arraySize) + var countForValues = 0 + itr.foreach { case (rowWithinBlock, valuesWithColumns) => + valuesWithColumns.foreach { case (value, columnWithinBlock) => + matrixAsArray.update(columnWithinBlock * actualNumRows + rowWithinBlock, value) + countForValues += 1 + } + } + val denseMatrix = new DenseMatrix(actualNumRows, actualNumColumns, matrixAsArray) + val finalMatrix = if (countForValues / arraySize.toDouble >= 0.1) { + denseMatrix + } else { + denseMatrix.toSparse + } + + ((blockRow, blockColumn), finalMatrix) + } + new BlockMatrix(blocks, rowsPerBlock, colsPerBlock, m, n) } /** diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrixSuite.scala index 99af5fa10d999..566ce95be084a 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrixSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrixSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.mllib.linalg.distributed import breeze.linalg.{diag => brzDiag, DenseMatrix => BDM, DenseVector => BDV} import org.apache.spark.SparkFunSuite -import org.apache.spark.mllib.linalg.{Matrices, Vectors} +import org.apache.spark.mllib.linalg._ import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.rdd.RDD @@ -87,19 +87,96 @@ class IndexedRowMatrixSuite extends SparkFunSuite with MLlibTestSparkContext { assert(coordMat.toBreeze() === idxRowMat.toBreeze()) } - test("toBlockMatrix") { - val idxRowMat = new IndexedRowMatrix(indexedRows) - val blockMat = idxRowMat.toBlockMatrix(2, 2) + test("toBlockMatrix dense backing") { + val 
idxRowMatDense = new IndexedRowMatrix(indexedRows) + + // Tests when n % colsPerBlock != 0 + val blockMat = idxRowMatDense.toBlockMatrix(2, 2) assert(blockMat.numRows() === m) assert(blockMat.numCols() === n) - assert(blockMat.toBreeze() === idxRowMat.toBreeze()) + assert(blockMat.toBreeze() === idxRowMatDense.toBreeze()) + + // Tests when m % rowsPerBlock != 0 + val blockMat2 = idxRowMatDense.toBlockMatrix(3, 1) + assert(blockMat2.numRows() === m) + assert(blockMat2.numCols() === n) + assert(blockMat2.toBreeze() === idxRowMatDense.toBreeze()) intercept[IllegalArgumentException] { - idxRowMat.toBlockMatrix(-1, 2) + idxRowMatDense.toBlockMatrix(-1, 2) } intercept[IllegalArgumentException] { - idxRowMat.toBlockMatrix(2, 0) + idxRowMatDense.toBlockMatrix(2, 0) } + + assert(blockMat.blocks.map { case (_, matrix: Matrix) => + matrix.isInstanceOf[DenseMatrix] + }.reduce(_ && _)) + assert(blockMat2.blocks.map { case (_, matrix: Matrix) => + matrix.isInstanceOf[DenseMatrix] + }.reduce(_ && _)) + } + + test("toBlockMatrix sparse backing") { + val sparseData = Seq( + (15L, Vectors.sparse(12, Seq((0, 4.0)))) + ).map(x => IndexedRow(x._1, x._2)) + + // Gonna make m and n larger here so the matrices can easily be completely sparse: + val m = 16 + val n = 12 + + val idxRowMatSparse = new IndexedRowMatrix(sc.parallelize(sparseData)) + + // Tests when n % colsPerBlock != 0 + val blockMat = idxRowMatSparse.toBlockMatrix(8, 8) + assert(blockMat.numRows() === m) + assert(blockMat.numCols() === n) + assert(blockMat.toBreeze() === idxRowMatSparse.toBreeze()) + + // Tests when m % rowsPerBlock != 0 + val blockMat2 = idxRowMatSparse.toBlockMatrix(6, 6) + assert(blockMat2.numRows() === m) + assert(blockMat2.numCols() === n) + assert(blockMat2.toBreeze() === idxRowMatSparse.toBreeze()) + + assert(blockMat.blocks.collect().forall{ case (_, matrix: Matrix) => + matrix.isInstanceOf[SparseMatrix] + }) + assert(blockMat2.blocks.collect().forall{ case (_, matrix: Matrix) => + 
matrix.isInstanceOf[SparseMatrix] + }) + } + + test("toBlockMatrix mixed backing") { + val m = 24 + val n = 18 + + val mixedData = Seq( + (0L, Vectors.dense((0 to 17).map(_.toDouble).toArray)), + (1L, Vectors.dense((0 to 17).map(_.toDouble).toArray)), + (23L, Vectors.sparse(18, Seq((0, 4.0))))) + .map(x => IndexedRow(x._1, x._2)) + + val idxRowMatMixed = new IndexedRowMatrix( + sc.parallelize(mixedData)) + + // Tests when n % colsPerBlock != 0 + val blockMat = idxRowMatMixed.toBlockMatrix(12, 12) + assert(blockMat.numRows() === m) + assert(blockMat.numCols() === n) + assert(blockMat.toBreeze() === idxRowMatMixed.toBreeze()) + + // Tests when m % rowsPerBlock != 0 + val blockMat2 = idxRowMatMixed.toBlockMatrix(18, 6) + assert(blockMat2.numRows() === m) + assert(blockMat2.numCols() === n) + assert(blockMat2.toBreeze() === idxRowMatMixed.toBreeze()) + + val blocks = blockMat.blocks.collect() + + assert(blocks.forall { case((row, col), matrix) => + if (row == 0) matrix.isInstanceOf[DenseMatrix] else matrix.isInstanceOf[SparseMatrix]}) } test("multiply a local matrix") { diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 9956071fd6e38..1fb7edf2a6e30 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -1275,7 +1275,8 @@ private object Client extends Logging { if (sparkConf.get(SPARK_ARCHIVE).isEmpty) { sparkConf.get(SPARK_JARS).foreach { jars => jars.filter(isLocalUri).foreach { jar => - addClasspathEntry(getClusterPath(sparkConf, jar), env) + val uri = new URI(jar) + addClasspathEntry(getClusterPath(sparkConf, uri.getPath()), env) } } } diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala index 3a11787aa57dc..6cf68427921fd 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala @@ -122,6 +122,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll cp should not contain (uri.getPath()) } }) + cp should not contain ("local") cp should contain(PWD) cp should contain (s"$PWD${Path.SEPARATOR}${LOCALIZED_CONF_DIR}") cp should not contain (APP_JAR) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index 8081036bed8a6..116b26f612e02 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -380,6 +380,7 @@ object FunctionRegistry { expression[AssertTrue]("assert_true"), expression[Crc32]("crc32"), expression[Md5]("md5"), + expression[Uuid]("uuid"), expression[Murmur3Hash]("hash"), expression[Sha1]("sha"), expression[Sha1]("sha1"), diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala index 7b64568c69659..615256243ae2a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala @@ -283,7 +283,7 @@ case class Cosh(child: Expression) extends UnaryMathExpression(math.cosh, "COSH" > SELECT _FUNC_('100', 2, 10); 4 > SELECT _FUNC_(-10, 16, -10); - 16 + -16 """) case class Conv(numExpr: Expression, fromBaseExpr: Expression, toBaseExpr: Expression) extends 
TernaryExpression with ImplicitCastInputTypes { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala index bb9368cf6d774..3fc4bb7041636 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala @@ -17,9 +17,12 @@ package org.apache.spark.sql.catalyst.expressions +import java.util.UUID + import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String /** * Print the result of an expression to stderr (used for debugging codegen). @@ -104,3 +107,28 @@ case class CurrentDatabase() extends LeafExpression with Unevaluable { override def nullable: Boolean = false override def prettyName: String = "current_database" } + +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_() - Returns an universally unique identifier (UUID) string. 
The value is returned as a canonical UUID 36-character string.", + extended = """ + Examples: + > SELECT _FUNC_(); + 46707d92-02f4-4817-8116-a4c3b23e6266 + """) +// scalastyle:on line.size.limit +case class Uuid() extends LeafExpression { + + override def deterministic: Boolean = false + + override def nullable: Boolean = false + + override def dataType: DataType = StringType + + override def eval(input: InternalRow): Any = UTF8String.fromString(UUID.randomUUID().toString) + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + ev.copy(code = s"final UTF8String ${ev.value} = " + + s"UTF8String.fromString(java.util.UUID.randomUUID().toString());", isNull = "false") + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala index cc4d465c5d701..035a1afe8b782 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala @@ -1047,7 +1047,7 @@ case class FormatString(children: Expression*) extends Expression with ImplicitC """, extended = """ Examples: - > SELECT initcap('sPark sql'); + > SELECT _FUNC_('sPark sql'); Spark Sql """) case class InitCap(child: Expression) extends UnaryExpression with ImplicitCastInputTypes { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index c6f5cf641b8d5..1739b0cfa2761 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -552,6 +552,12 @@ object SQLConf { .booleanConf .createWithDefault(true) + val SUBQUERY_REUSE_ENABLED = buildConf("spark.sql.subquery.reuse") + .internal() + .doc("When true, the planner 
will try to find out duplicated subqueries and re-use them.") + .booleanConf + .createWithDefault(true) + val STATE_STORE_PROVIDER_CLASS = buildConf("spark.sql.streaming.stateStore.providerClass") .internal() @@ -932,6 +938,8 @@ class SQLConf extends Serializable with Logging { def exchangeReuseEnabled: Boolean = getConf(EXCHANGE_REUSE_ENABLED) + def subqueryReuseEnabled: Boolean = getConf(SUBQUERY_REUSE_ENABLED) + def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE) def constraintPropagationEnabled: Boolean = getConf(CONSTRAINT_PROPAGATION_ENABLED) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscExpressionsSuite.scala index a26d070a99c52..4fe7b436982b1 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscExpressionsSuite.scala @@ -39,4 +39,9 @@ class MiscExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { checkEvaluation(AssertTrue(Cast(Literal(1), BooleanType)), null) } + test("uuid") { + checkEvaluation(Length(Uuid()), 36) + assert(evaluate(Uuid()) !== evaluate(Uuid())) + } + } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index 85096dcc40f5d..f69a688555bbf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -595,6 +595,9 @@ case class OutputFakerExec(output: Seq[Attribute], child: SparkPlan) extends Spa */ case class SubqueryExec(name: String, child: SparkPlan) extends UnaryExecNode { + // Ignore this wrapper for canonicalizing. 
+ override lazy val canonicalized: SparkPlan = child.canonicalized + override lazy val metrics = Map( "dataSize" -> SQLMetrics.createMetric(sparkContext, "data size (bytes)"), "collectTime" -> SQLMetrics.createMetric(sparkContext, "time to collect (ms)")) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala index d11045fb6ac8c..2abeadfe45362 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala @@ -156,7 +156,7 @@ case class PlanSubqueries(sparkSession: SparkSession) extends Rule[SparkPlan] { case class ReuseSubquery(conf: SQLConf) extends Rule[SparkPlan] { def apply(plan: SparkPlan): SparkPlan = { - if (!conf.exchangeReuseEnabled) { + if (!conf.subqueryReuseEnabled) { return plan } // Build a hash map using schema of subqueries to avoid O(N*N) sameResult calls. diff --git a/sql/core/src/test/resources/sql-tests/inputs/string-functions.sql b/sql/core/src/test/resources/sql-tests/inputs/string-functions.sql index e6dcea4972c18..d82df11251c5b 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/string-functions.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/string-functions.sql @@ -12,3 +12,6 @@ FROM (SELECT id col1, id col2, id col3, id col4 FROM range(10)) t; -- replace function select replace('abc', 'b', '123'); select replace('abc', 'b'); + +-- uuid +select length(uuid()), (uuid() <> uuid()); diff --git a/sql/core/src/test/resources/sql-tests/results/string-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/string-functions.sql.out index abf0cc44d6e42..4093a7b9fc820 100644 --- a/sql/core/src/test/resources/sql-tests/results/string-functions.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/string-functions.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 6 +-- Number of 
queries: 7 -- !query 0 @@ -70,3 +70,11 @@ select replace('abc', 'b') struct -- !query 5 output ac + + +-- !query 6 +select length(uuid()), (uuid() <> uuid()) +-- !query 6 schema +struct +-- !query 6 output +36 true diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index b525c9e80ba42..41e9e2c92ca8e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -23,9 +23,12 @@ import java.net.{MalformedURLException, URL} import java.sql.Timestamp import java.util.concurrent.atomic.AtomicBoolean +import scala.collection.mutable.ArrayBuffer + import org.apache.spark.{AccumulatorSuite, SparkException} import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart} import org.apache.spark.sql.catalyst.util.StringUtils +import org.apache.spark.sql.execution.{ScalarSubquery, SubqueryExec} import org.apache.spark.sql.execution.aggregate import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, CartesianProductExec, SortMergeJoinExec} import org.apache.spark.sql.functions._ @@ -700,6 +703,38 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { row => Seq.fill(16)(Row.merge(row, row))).collect().toSeq) } + test("Verify spark.sql.subquery.reuse") { + Seq(true, false).foreach { reuse => + withSQLConf(SQLConf.SUBQUERY_REUSE_ENABLED.key -> reuse.toString) { + val df = sql( + """ + |SELECT key, (SELECT avg(key) FROM testData) + |FROM testData + |WHERE key > (SELECT avg(key) FROM testData) + |ORDER BY key + |LIMIT 3 + """.stripMargin) + + checkAnswer(df, Row(51, 50.5) :: Row(52, 50.5) :: Row(53, 50.5) :: Nil) + + val subqueries = ArrayBuffer.empty[SubqueryExec] + df.queryExecution.executedPlan.transformAllExpressions { + case s @ ScalarSubquery(plan: SubqueryExec, _) => + subqueries += plan + s + } + + assert(subqueries.size == 2, "Two ScalarSubquery are expected in the 
plan") + + if (reuse) { + assert(subqueries.distinct.size == 1, "Only one ScalarSubquery exists in the plan") + } else { + assert(subqueries.distinct.size == 2, "Reuse is not expected") + } + } + } + } + test("cartesian product join") { withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") { checkAnswer(