diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION index 635a04e48d3a..f237df1fb854 100644 --- a/R/pkg/DESCRIPTION +++ b/R/pkg/DESCRIPTION @@ -1,7 +1,7 @@ Package: SparkR Type: Package Title: R frontend for Spark -Version: 1.6.1 +Version: 1.6.4 Date: 2013-09-09 Author: The Apache Software Foundation Maintainer: Shivaram Venkataraman diff --git a/assembly/pom.xml b/assembly/pom.xml index a91203eb3e45..bd794fb9cd92 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 - 1.6.2-csd-7-SNAPSHOT + 1.6.3-csd-1-SNAPSHOT ../pom.xml diff --git a/bagel/pom.xml b/bagel/pom.xml index 885168bbeb84..547506d8fce4 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 - 1.6.2-csd-7-SNAPSHOT + 1.6.3-csd-1-SNAPSHOT ../pom.xml diff --git a/core/pom.xml b/core/pom.xml index ce785acbfbd5..3ec9da5189d0 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 - 1.6.2-csd-7-SNAPSHOT + 1.6.3-csd-1-SNAPSHOT ../pom.xml diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index c2ebf3059621..47ce6672ccde 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -19,6 +19,7 @@ package org.apache.spark.executor import java.net.URL import java.nio.ByteBuffer +import java.util.concurrent.atomic.AtomicBoolean import org.apache.hadoop.conf.Configuration @@ -45,6 +46,7 @@ private[spark] class CoarseGrainedExecutorBackend( env: SparkEnv) extends ThreadSafeRpcEndpoint with ExecutorBackend with Logging { + private[this] val stopping = new AtomicBoolean(false) var executor: Executor = null @volatile var driver: Option[RpcEndpointRef] = None @@ -106,19 +108,23 @@ private[spark] class CoarseGrainedExecutorBackend( } case StopExecutor => + stopping.set(true) logInfo("Driver commanded a shutdown") // Cannot shutdown here because an ack may need to be sent back to the caller. So send // a message to self to actually do the shutdown. self.send(Shutdown) case Shutdown => + stopping.set(true) executor.stop() stop() rpcEnv.shutdown() } override def onDisconnected(remoteAddress: RpcAddress): Unit = { - if (driver.exists(_.address == remoteAddress)) { + if (stopping.get()) { + logInfo(s"Driver from $remoteAddress disconnected during shutdown") + } else if (driver.exists(_.address == remoteAddress)) { logError(s"Driver $remoteAddress disassociated! Shutting down.") System.exit(1) } else { diff --git a/core/src/main/scala/org/apache/spark/package.scala b/core/src/main/scala/org/apache/spark/package.scala index 1499d149391b..f8c1f298de46 100644 --- a/core/src/main/scala/org/apache/spark/package.scala +++ b/core/src/main/scala/org/apache/spark/package.scala @@ -43,5 +43,5 @@ package org.apache package object spark { // For package docs only - val SPARK_VERSION = "1.6.2" + val SPARK_VERSION = "1.6.3" } diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index f37c95bedc0a..463dd5bd3073 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -139,6 +139,9 @@ class HadoopRDD[K, V]( private val shouldCloneJobConf = sparkContext.conf.getBoolean("spark.hadoop.cloneConf", false) + private val ignoreCorruptFiles = + sparkContext.conf.getBoolean("spark.files.ignoreCorruptFiles", true) + // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads. protected def getJobConf(): JobConf = { val conf: Configuration = broadcastedConf.value.value @@ -245,8 +248,7 @@ class HadoopRDD[K, V]( try { finished = !reader.next(key, value) } catch { - case eof: EOFException => - finished = true + case _: EOFException if ignoreCorruptFiles => finished = true } if (!finished) { inputMetrics.incRecordsRead(1) diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index 46fe1ba93441..5b5ddd5ce28f 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -17,6 +17,7 @@ package org.apache.spark.rdd +import java.io.EOFException import java.text.SimpleDateFormat import java.util.Date @@ -84,6 +85,9 @@ class NewHadoopRDD[K, V]( private val shouldCloneJobConf = sparkContext.conf.getBoolean("spark.hadoop.cloneConf", false) + private val ignoreCorruptFiles = + sparkContext.conf.getBoolean("spark.files.ignoreCorruptFiles", true) + def getConf: Configuration = { val conf: Configuration = confBroadcast.value.value if (shouldCloneJobConf) { @@ -171,7 +175,11 @@ class NewHadoopRDD[K, V]( override def hasNext: Boolean = { if (!finished && !havePair) { - finished = !reader.nextKeyValue + try { + finished = !reader.nextKeyValue + } catch { + case _: EOFException if ignoreCorruptFiles => finished = true + } if (finished) { // Close and release the reader here; close() will also be called when the task // completes, but for tasks that read from many files, it helps to release the diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index bdf19f9f277d..6d1ba4235499 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -87,8 +87,8 @@ private[spark] class TaskSchedulerImpl( // Incrementing task IDs val nextTaskId = new AtomicLong(0) - // Number of tasks running on each executor - private val executorIdToTaskCount = new HashMap[String, Int] + // IDs of the tasks running on each executor + private val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]] // The set of executors we have on each host; this is used to compute hostsAlive, which // in turn is used to decide when we can attain data locality on a given host @@ -254,7 +254,7 @@ private[spark] class TaskSchedulerImpl( val tid = task.taskId taskIdToTaskSetManager(tid) = taskSet taskIdToExecutorId(tid) = execId - executorIdToTaskCount(execId) += 1 + executorIdToRunningTaskIds(execId).add(tid) executorsByHost(host) += execId availableCpus(i) -= CPUS_PER_TASK assert(availableCpus(i) >= 0) @@ -283,7 +283,7 @@ private[spark] class TaskSchedulerImpl( var newExecAvail = false for (o <- offers) { executorIdToHost(o.executorId) = o.host - executorIdToTaskCount.getOrElseUpdate(o.executorId, 0) + executorIdToRunningTaskIds.getOrElseUpdate(o.executorId, HashSet[Long]()) if (!executorsByHost.contains(o.host)) { executorsByHost(o.host) = new HashSet[String]() executorAdded(o.executorId, o.host) @@ -329,37 +329,34 @@ private[spark] class TaskSchedulerImpl( var failedExecutor: Option[String] = None synchronized { try { - if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) { - // We lost this entire executor, so remember that it's gone - val execId = taskIdToExecutorId(tid) - - if (executorIdToTaskCount.contains(execId)) { - removeExecutor(execId, - SlaveLost(s"Task $tid was lost, so marking the executor as lost as well.")) - failedExecutor = Some(execId) - } - } taskIdToTaskSetManager.get(tid) match { case Some(taskSet) => - if (TaskState.isFinished(state)) { - taskIdToTaskSetManager.remove(tid) - taskIdToExecutorId.remove(tid).foreach { execId => - if (executorIdToTaskCount.contains(execId)) { - executorIdToTaskCount(execId) -= 1 - } + if (state == TaskState.LOST) { + // TaskState.LOST is only used by the Mesos fine-grained scheduling mode, + // where each executor corresponds to a single task, so mark the executor as failed. + val execId = taskIdToExecutorId.getOrElse(tid, throw new IllegalStateException( + "taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)")) + if (executorIdToRunningTaskIds.contains(execId)) { + val reason = + SlaveLost(s"Task $tid was lost, so marking the executor as lost as well.") + removeExecutor(execId, reason) + failedExecutor = Some(execId) } } - if (state == TaskState.FINISHED) { - taskSet.removeRunningTask(tid) - taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData) - } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) { + if (TaskState.isFinished(state)) { + cleanupTaskState(tid) taskSet.removeRunningTask(tid) - taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData) + if (state == TaskState.FINISHED) { + taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData) + } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) { + taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData) + } } case None => logError( ("Ignoring update with state %s for TID %s because its task set is gone (this is " + - "likely the result of receiving duplicate task finished status updates)") + "likely the result of receiving duplicate task finished status updates) or its " + + "executor has been marked as failed.") .format(state, tid)) } } catch { @@ -468,7 +465,7 @@ private[spark] class TaskSchedulerImpl( var failedExecutor: Option[String] = None synchronized { - if (executorIdToTaskCount.contains(executorId)) { + if (executorIdToRunningTaskIds.contains(executorId)) { val hostPort = executorIdToHost(executorId) logExecutorLoss(executorId, hostPort, reason) removeExecutor(executorId, reason) @@ -510,13 +507,31 @@ private[spark] class TaskSchedulerImpl( logError(s"Lost executor $executorId on $hostPort: $reason") } + /** + * Cleans up the TaskScheduler's state for tracking the given task. + */ + private def cleanupTaskState(tid: Long): Unit = { + taskIdToTaskSetManager.remove(tid) + taskIdToExecutorId.remove(tid).foreach { executorId => + executorIdToRunningTaskIds.get(executorId).foreach { _.remove(tid) } + } + } + /** * Remove an executor from all our data structures and mark it as lost. If the executor's loss * reason is not yet known, do not yet remove its association with its host nor update the status * of any running tasks, since the loss reason defines whether we'll fail those tasks. */ private def removeExecutor(executorId: String, reason: ExecutorLossReason) { - executorIdToTaskCount -= executorId + // The tasks on the lost executor may not send any more status updates (because the executor + // has been lost), so they should be cleaned up here. + executorIdToRunningTaskIds.remove(executorId).foreach { taskIds => + logDebug("Cleaning up TaskScheduler state for tasks " + + s"${taskIds.mkString("[", ",", "]")} on failed executor $executorId") + // We do not notify the TaskSetManager of the task failures because that will + // happen below in the rootPool.executorLost() call. + taskIds.foreach(cleanupTaskState) + } val host = executorIdToHost(executorId) val execs = executorsByHost.getOrElse(host, new HashSet) @@ -554,11 +569,11 @@ private[spark] class TaskSchedulerImpl( } def isExecutorAlive(execId: String): Boolean = synchronized { - executorIdToTaskCount.contains(execId) + executorIdToRunningTaskIds.contains(execId) } def isExecutorBusy(execId: String): Boolean = synchronized { - executorIdToTaskCount.getOrElse(execId, -1) > 0 + executorIdToRunningTaskIds.get(execId).exists(_.nonEmpty) } // By default, rack is unknown diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala index fdb00aafc4a4..7e87092d6167 100644 --- a/core/src/test/scala/org/apache/spark/FileSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileSuite.scala @@ -17,7 +17,8 @@ package org.apache.spark -import java.io.{File, FileWriter} +import java.io._ +import java.util.zip.GZIPOutputStream import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.input.PortableDataStream @@ -540,4 +541,63 @@ class FileSuite extends SparkFunSuite with LocalSparkContext { }.collect() assert(inputPaths.toSet === Set(s"$outDir/part-00000", s"$outDir/part-00001")) } + + test("spark.files.ignoreCorruptFiles should work both HadoopRDD and NewHadoopRDD") { + val inputFile = File.createTempFile("input-", ".gz") + try { + // Create a corrupt gzip file + val byteOutput = new ByteArrayOutputStream() + val gzip = new GZIPOutputStream(byteOutput) + try { + gzip.write(Array[Byte](1, 2, 3, 4)) + } finally { + gzip.close() + } + val bytes = byteOutput.toByteArray + val o = new FileOutputStream(inputFile) + try { + // It's corrupt since we only write half of bytes into the file. + o.write(bytes.take(bytes.length / 2)) + } finally { + o.close() + } + + // Spark job should ignore corrupt files by default + sc = new SparkContext("local", "test") + // Test HadoopRDD + assert(sc.textFile(inputFile.toURI.toString).collect().isEmpty) + // Test NewHadoopRDD + assert { + sc.newAPIHadoopFile( + inputFile.toURI.toString, + classOf[NewTextInputFormat], + classOf[LongWritable], + classOf[Text]).collect().isEmpty + } + sc.stop() + + // Reading a corrupt gzip file should throw EOFException + val conf = new SparkConf().set("spark.files.ignoreCorruptFiles", "false") + sc = new SparkContext("local", "test", conf) + // Test HadoopRDD + var e = intercept[SparkException] { + sc.textFile(inputFile.toURI.toString).collect() + } + assert(e.getCause.isInstanceOf[EOFException]) + assert(e.getCause.getMessage === "Unexpected end of input stream") + // Test NewHadoopRDD + e = intercept[SparkException] { + sc.newAPIHadoopFile( + inputFile.toURI.toString, + classOf[NewTextInputFormat], + classOf[LongWritable], + classOf[Text]).collect() + } + assert(e.getCause.isInstanceOf[EOFException]) + assert(e.getCause.getMessage === "Unexpected end of input stream") + } finally { + inputFile.delete() + } + } + } diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala index 2fa795f84666..088bc97f7eba 100644 --- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala @@ -425,10 +425,11 @@ class StandaloneDynamicAllocationSuite assert(executors.size === 2) // simulate running a task on the executor - val getMap = PrivateMethod[mutable.HashMap[String, Int]]('executorIdToTaskCount) + val getMap = + PrivateMethod[mutable.HashMap[String, mutable.HashSet[Long]]]('executorIdToRunningTaskIds) val taskScheduler = sc.taskScheduler.asInstanceOf[TaskSchedulerImpl] - val executorIdToTaskCount = taskScheduler invokePrivate getMap() - executorIdToTaskCount(executors.head) = 1 + val executorIdToRunningTaskIds = taskScheduler invokePrivate getMap() + executorIdToRunningTaskIds(executors.head) = mutable.HashSet(1L) // kill the busy executor without force; this should fail assert(killExecutor(sc, executors.head, force = false)) apps = getApplications() diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 2afb595e6f10..2d1d9f5b6115 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.scheduler +import java.nio.ByteBuffer + import org.apache.spark._ class FakeSchedulerBackend extends SchedulerBackend { @@ -273,4 +275,68 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L assert("executor1" === taskDescriptions3(0).executorId) } + test("if an executor is lost then the state for its running tasks is cleaned up (SPARK-18553)") { + sc = new SparkContext("local", "TaskSchedulerImplSuite") + val taskScheduler = new TaskSchedulerImpl(sc) + taskScheduler.initialize(new FakeSchedulerBackend) + // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks. + new DAGScheduler(sc, taskScheduler) { + override def taskStarted(task: Task[_], taskInfo: TaskInfo) {} + override def executorAdded(execId: String, host: String) {} + } + + val e0Offers = Seq(WorkerOffer("executor0", "host0", 1)) + val attempt1 = FakeTask.createTaskSet(1) + + // submit attempt 1, offer resources, task gets scheduled + taskScheduler.submitTasks(attempt1) + val taskDescriptions = taskScheduler.resourceOffers(e0Offers).flatten + assert(1 === taskDescriptions.length) + + // mark executor0 as dead + taskScheduler.executorLost("executor0", SlaveLost()) + assert(!taskScheduler.isExecutorAlive("executor0")) + assert(!taskScheduler.hasExecutorsAliveOnHost("host0")) + assert(taskScheduler.getExecutorsAliveOnHost("host0").isEmpty) + + + // Check that state associated with the lost task attempt is cleaned up: + assert(taskScheduler.taskIdToExecutorId.isEmpty) + assert(taskScheduler.taskIdToTaskSetManager.isEmpty) + } + + test("if a task finishes with TaskState.LOST its executor is marked as dead") { + sc = new SparkContext("local", "TaskSchedulerImplSuite") + val taskScheduler = new TaskSchedulerImpl(sc) + taskScheduler.initialize(new FakeSchedulerBackend) + // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks. + new DAGScheduler(sc, taskScheduler) { + override def taskStarted(task: Task[_], taskInfo: TaskInfo) {} + override def executorAdded(execId: String, host: String) {} + } + + val e0Offers = Seq(WorkerOffer("executor0", "host0", 1)) + val attempt1 = FakeTask.createTaskSet(1) + + // submit attempt 1, offer resources, task gets scheduled + taskScheduler.submitTasks(attempt1) + val taskDescriptions = taskScheduler.resourceOffers(e0Offers).flatten + assert(1 === taskDescriptions.length) + + // Report the task as failed with TaskState.LOST + taskScheduler.statusUpdate( + tid = taskDescriptions.head.taskId, + state = TaskState.LOST, + serializedData = ByteBuffer.allocate(0) + ) + + // Check that state associated with the lost task attempt is cleaned up: + assert(taskScheduler.taskIdToExecutorId.isEmpty) + assert(taskScheduler.taskIdToTaskSetManager.isEmpty) + + // Check that the executor has been marked as dead + assert(!taskScheduler.isExecutorAlive("executor0")) + assert(!taskScheduler.hasExecutorsAliveOnHost("host0")) + assert(taskScheduler.getExecutorsAliveOnHost("host0").isEmpty) + } } diff --git a/docker-integration-tests/pom.xml b/docker-integration-tests/pom.xml index bf6d4f39a626..f774b1cf23c6 100644 --- a/docker-integration-tests/pom.xml +++ b/docker-integration-tests/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 - 1.6.2-csd-7-SNAPSHOT + 1.6.3-csd-1-SNAPSHOT ../pom.xml diff --git a/docs/_config.yml b/docs/_config.yml index c2ecb59f644c..ee235bcfa3ed 100644 --- a/docs/_config.yml +++ b/docs/_config.yml @@ -14,8 +14,8 @@ include: # These allow the documentation to be updated with newer releases # of Spark, Scala, and Mesos. -SPARK_VERSION: 1.6.2 -SPARK_VERSION_SHORT: 1.6.2 +SPARK_VERSION: 1.6.4-SNAPSHOT +SPARK_VERSION_SHORT: 1.6.4 SCALA_BINARY_VERSION: "2.10" SCALA_VERSION: "2.10.5" MESOS_VERSION: 0.21.0 diff --git a/examples/pom.xml b/examples/pom.xml index d5f380bdd309..0b38f8d35bc0 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 - 1.6.2-csd-7-SNAPSHOT + 1.6.3-csd-1-SNAPSHOT ../pom.xml diff --git a/external/flume-assembly/pom.xml b/external/flume-assembly/pom.xml index 34d4e666bd2c..7e9cf36a7a1a 100644 --- a/external/flume-assembly/pom.xml +++ b/external/flume-assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 - 1.6.2-csd-7-SNAPSHOT + 1.6.3-csd-1-SNAPSHOT ../../pom.xml diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml index 728bf8004f80..9ad060c2e76b 100644 --- a/external/flume-sink/pom.xml +++ b/external/flume-sink/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 - 1.6.2-csd-7-SNAPSHOT + 1.6.3-csd-1-SNAPSHOT ../../pom.xml diff --git a/external/flume/pom.xml b/external/flume/pom.xml index d815f1ef5669..a822ce000313 100644 --- a/external/flume/pom.xml +++ b/external/flume/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 - 1.6.2-csd-7-SNAPSHOT + 1.6.3-csd-1-SNAPSHOT ../../pom.xml diff --git a/external/kafka-assembly/pom.xml b/external/kafka-assembly/pom.xml index f8850d7ea8ee..0425204c0b22 100644 --- a/external/kafka-assembly/pom.xml +++ b/external/kafka-assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 - 1.6.2-csd-7-SNAPSHOT + 1.6.3-csd-1-SNAPSHOT ../../pom.xml diff --git a/external/kafka/pom.xml b/external/kafka/pom.xml index 00e9858b9eec..01e38c8129d0 100644 --- a/external/kafka/pom.xml +++ b/external/kafka/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 - 1.6.2-csd-7-SNAPSHOT + 1.6.3-csd-1-SNAPSHOT ../../pom.xml diff --git a/external/mqtt-assembly/pom.xml b/external/mqtt-assembly/pom.xml index f69dae30e0fa..8c2a6d0bbd39 100644 --- a/external/mqtt-assembly/pom.xml +++ b/external/mqtt-assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 - 1.6.2-csd-7-SNAPSHOT + 1.6.3-csd-1-SNAPSHOT ../../pom.xml diff --git a/external/mqtt/pom.xml b/external/mqtt/pom.xml index c3cf37490d4a..7d3785972175 100644 --- a/external/mqtt/pom.xml +++ b/external/mqtt/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 - 1.6.2-csd-7-SNAPSHOT + 1.6.3-csd-1-SNAPSHOT ../../pom.xml diff --git a/external/twitter/pom.xml b/external/twitter/pom.xml index acc23b399a75..0b154cfa596d 100644 --- a/external/twitter/pom.xml +++ b/external/twitter/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 - 1.6.2-csd-7-SNAPSHOT + 1.6.3-csd-1-SNAPSHOT ../../pom.xml diff --git a/external/zeromq/pom.xml b/external/zeromq/pom.xml index 40dd374c5120..766dd7d0622d 100644 --- a/external/zeromq/pom.xml +++ b/external/zeromq/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 - 1.6.2-csd-7-SNAPSHOT + 1.6.3-csd-1-SNAPSHOT ../../pom.xml diff --git a/extras/java8-tests/pom.xml b/extras/java8-tests/pom.xml index 3e4a75785209..f2578b0ec7a8 100644 --- a/extras/java8-tests/pom.xml +++ b/extras/java8-tests/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent_2.11 - 1.6.2-csd-1-SNAPSHOT + 1.6.3-csd-1-SNAPSHOT ../../pom.xml diff --git a/extras/kinesis-asl-assembly/pom.xml b/extras/kinesis-asl-assembly/pom.xml index c5efcae0ee0e..68d4bb44b870 100644 --- a/extras/kinesis-asl-assembly/pom.xml +++ b/extras/kinesis-asl-assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 - 1.6.2-csd-1-SNAPSHOT + 1.6.3-csd-1-SNAPSHOT ../../pom.xml diff --git a/extras/kinesis-asl/pom.xml b/extras/kinesis-asl/pom.xml index 496bf0b3e434..6313d25aba25 100644 --- a/extras/kinesis-asl/pom.xml +++ b/extras/kinesis-asl/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent_2.11 - 1.6.2-csd-1-SNAPSHOT + 1.6.3-csd-1-SNAPSHOT ../../pom.xml diff --git a/extras/spark-ganglia-lgpl/pom.xml b/extras/spark-ganglia-lgpl/pom.xml index f734e0f3ef51..c3bc9eb2f828 100644 --- a/extras/spark-ganglia-lgpl/pom.xml +++ b/extras/spark-ganglia-lgpl/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent_2.11 - 1.6.2-csd-1-SNAPSHOT + 1.6.3-csd-1-SNAPSHOT ../../pom.xml diff --git a/graphx/pom.xml b/graphx/pom.xml index 9ceac837034c..f25a565ce831 100644 --- a/graphx/pom.xml +++ b/graphx/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 - 1.6.2-csd-7-SNAPSHOT + 1.6.3-csd-1-SNAPSHOT ../pom.xml diff --git a/launcher/pom.xml b/launcher/pom.xml index d1f34b9ab642..8342870ca36d 100644 --- a/launcher/pom.xml +++ b/launcher/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 - 1.6.2-csd-7-SNAPSHOT + 1.6.3-csd-1-SNAPSHOT ../pom.xml diff --git a/mllib/pom.xml b/mllib/pom.xml index fa7c15e76bbe..243dd55c6bb7 100644 --- a/mllib/pom.xml +++ b/mllib/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 - 1.6.2-csd-7-SNAPSHOT + 1.6.3-csd-1-SNAPSHOT ../pom.xml diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala index df9f4ae145b8..8f91d63b63d5 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala @@ -638,12 +638,16 @@ private[spark] object BLAS extends Serializable with Logging { val indEnd = Arows(rowCounter + 1) var sum = 0.0 var k = 0 - while (k < xNnz && i < indEnd) { + while (i < indEnd && k < xNnz) { if (xIndices(k) == Acols(i)) { sum += Avals(i) * xValues(k) + k += 1 + i += 1 + } else if (xIndices(k) < Acols(i)) { + k += 1 + } else { i += 1 } - k += 1 } yValues(rowCounter) = sum * alpha + beta * yValues(rowCounter) rowCounter += 1 diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/BLASSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/BLASSuite.scala index 96e5ffef7a13..31c23c870fe9 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/BLASSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/BLASSuite.scala @@ -392,6 +392,23 @@ class BLASSuite extends SparkFunSuite { } } + val y17 = new DenseVector(Array(0.0, 0.0)) + val y18 = y17.copy + + val sA3 = new SparseMatrix(3, 2, Array(0, 2, 4), Array(1, 2, 0, 1), Array(2.0, 1.0, 1.0, 2.0)) + .transpose + val sA4 = + new SparseMatrix(2, 3, Array(0, 1, 3, 4), Array(1, 0, 1, 0), Array(1.0, 2.0, 2.0, 1.0)) + val sx3 = new SparseVector(3, Array(1, 2), Array(2.0, 1.0)) + + val expected4 = new DenseVector(Array(5.0, 4.0)) + + gemv(1.0, sA3, sx3, 0.0, y17) + gemv(1.0, sA4, sx3, 0.0, y18) + + assert(y17 ~== expected4 absTol 1e-15) + assert(y18 ~== expected4 absTol 1e-15) + val dAT = new DenseMatrix(3, 4, Array(0.0, 2.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 3.0)) val sAT = diff --git a/network/common/pom.xml b/network/common/pom.xml index 352a504513a2..670dc3597f9d 100644 --- a/network/common/pom.xml +++ b/network/common/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 - 1.6.2-csd-7-SNAPSHOT + 1.6.3-csd-1-SNAPSHOT ../../pom.xml diff --git a/network/shuffle/pom.xml b/network/shuffle/pom.xml index c6df2d83602f..ec7087302b0a 100644 --- a/network/shuffle/pom.xml +++ b/network/shuffle/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 - 1.6.2-csd-7-SNAPSHOT + 1.6.3-csd-1-SNAPSHOT ../../pom.xml diff --git a/network/yarn/pom.xml b/network/yarn/pom.xml index 5d682f19366a..b574af6cbb72 100644 --- a/network/yarn/pom.xml +++ b/network/yarn/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 - 1.6.2-csd-7-SNAPSHOT + 1.6.3-csd-1-SNAPSHOT ../../pom.xml diff --git a/pom.xml b/pom.xml index 11757c03c4c4..58a907f45675 100644 --- a/pom.xml +++ b/pom.xml @@ -25,7 +25,7 @@ org.apache.spark spark-parent_2.11 - 1.6.2-csd-7-SNAPSHOT + 1.6.3-csd-1-SNAPSHOT pom Spark Project Parent POM http://spark.apache.org/ diff --git a/repl/pom.xml b/repl/pom.xml index d3421063d5a6..e3d8136423cb 100644 --- a/repl/pom.xml +++ b/repl/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 - 1.6.2-csd-7-SNAPSHOT + 1.6.3-csd-1-SNAPSHOT ../pom.xml diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala index 627148df80c1..ed4cab06ab86 100644 --- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala +++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala @@ -36,7 +36,9 @@ object Main extends Logging { "-Yrepl-outdir", s"${outputDir.getAbsolutePath}", "-classpath", getAddedJars.mkString(File.pathSeparator)), true) // the creation of SecurityManager has to be lazy so SPARK_YARN_MODE is set if needed - lazy val classServer = new HttpServer(conf, outputDir, new SecurityManager(conf)) + val classServerPort = conf.getInt("spark.replClassServer.port", 0) + lazy val classServer = + new HttpServer(conf, outputDir, new SecurityManager(conf), classServerPort, "HTTP class server") var sparkContext: SparkContext = _ var sqlContext: SQLContext = _ var interp = new SparkILoop // this is a public var because tests reset it. diff --git a/sql/catalyst/pom.xml b/sql/catalyst/pom.xml index c077f2ecebfe..c1672c08943b 100644 --- a/sql/catalyst/pom.xml +++ b/sql/catalyst/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 - 1.6.2-csd-7-SNAPSHOT + 1.6.3-csd-1-SNAPSHOT ../../pom.xml diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala index 8722191f9d3e..9cf400f5d1b2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala @@ -485,6 +485,13 @@ object ScalaReflection extends ScalaReflection { extractorFor(unwrapped, optType, newPath)) } + // Since List[_] also belongs to localTypeOf[Product], we put this case before + // "case t if t <:< localTypeOf[Product]" to make sure it will match to the + // case "localTypeOf[Seq[_]]" + case t if t <:< localTypeOf[Seq[_]] => + val TypeRef(_, _, Seq(elementType)) = t + toCatalystArray(inputObject, elementType) + case t if t <:< localTypeOf[Product] => val params = getConstructorParameters(t) val nonNullOutput = CreateNamedStruct(params.flatMap { case (fieldName, fieldType) => @@ -500,10 +507,6 @@ object ScalaReflection extends ScalaReflection { val TypeRef(_, _, Seq(elementType)) = t toCatalystArray(inputObject, elementType) - case t if t <:< localTypeOf[Seq[_]] => - val TypeRef(_, _, Seq(elementType)) = t - toCatalystArray(inputObject, elementType) - case t if t <:< localTypeOf[Map[_, _]] => val TypeRef(_, _, Seq(keyType, valueType)) = t diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala index c53e84d9ab22..f35be3de9b1b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala @@ -656,7 +656,12 @@ case class Cast(child: Expression, dataType: DataType) private[this] def castToIntervalCode(from: DataType): CastFunction = from match { case StringType => (c, evPrim, evNull) => - s"$evPrim = CalendarInterval.fromString($c.toString());" + s"""$evPrim = CalendarInterval.fromString($c.toString()); + if(${evPrim} == null) { + ${evNull} = true; + } + """.stripMargin + } private[this] def decimalToTimestampCode(d: String): String = diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala index 03c39f8404e7..91eca246bee9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala @@ -658,16 +658,17 @@ case class FromUTCTimestamp(left: Expression, right: Expression) """.stripMargin } else { val tzTerm = ctx.freshName("tz") + val utcTerm = ctx.freshName("utc") val tzClass = classOf[TimeZone].getName ctx.addMutableState(tzClass, tzTerm, s"""$tzTerm = $tzClass.getTimeZone("$tz");""") + ctx.addMutableState(tzClass, utcTerm, s"""$utcTerm = $tzClass.getTimeZone("UTC");""") val eval = left.gen(ctx) s""" |${eval.code} |boolean ${ev.isNull} = ${eval.isNull}; |long ${ev.value} = 0; |if (!${ev.isNull}) { - | ${ev.value} = ${eval.value} + - | ${tzTerm}.getOffset(${eval.value} / 1000) * 1000L; + | ${ev.value} = $dtu.convertTz(${eval.value}, $utcTerm, $tzTerm); |} """.stripMargin } @@ -783,16 +784,17 @@ case class ToUTCTimestamp(left: Expression, right: Expression) """.stripMargin } else { val tzTerm = ctx.freshName("tz") + val utcTerm = ctx.freshName("utc") val tzClass = classOf[TimeZone].getName ctx.addMutableState(tzClass, tzTerm, s"""$tzTerm = $tzClass.getTimeZone("$tz");""") + ctx.addMutableState(tzClass, utcTerm, s"""$utcTerm = $tzClass.getTimeZone("UTC");""") val eval = left.gen(ctx) s""" |${eval.code} |boolean ${ev.isNull} = ${eval.isNull}; |long ${ev.value} = 0; |if (!${ev.isNull}) { - | ${ev.value} = ${eval.value} - - | ${tzTerm}.getOffset(${eval.value} / 1000) * 1000L; + | ${ev.value} = $dtu.convertTz(${eval.value}, $tzTerm, $utcTerm); |} """.stripMargin } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala index 157ac2ba24ca..36fe11cbd2d7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala @@ -55,6 +55,7 @@ object DateTimeUtils { // this is year -17999, calculation: 50 * daysIn400Year final val YearZero = -17999 final val toYearZero = to2001 + 7304850 + final val TimeZoneGMT = TimeZone.getTimeZone("GMT") @transient lazy val defaultTimeZone = TimeZone.getDefault @@ -854,14 +855,38 @@ object DateTimeUtils { guess } + /** + * Convert the timestamp `ts` from one timezone to another. + * + * TODO: Because of DST, the conversion between UTC and human time is not exactly one-to-one + * mapping, the conversion here may return wrong result, we should make the timestamp + * timezone-aware. + */ + def convertTz(ts: SQLTimestamp, fromZone: TimeZone, toZone: TimeZone): SQLTimestamp = { + // We always use local timezone to parse or format a timestamp + val localZone = threadLocalLocalTimeZone.get() + val utcTs = if (fromZone.getID == localZone.getID) { + ts + } else { + // get the human time using local time zone, that actually is in fromZone. + val localTs = ts + localZone.getOffset(ts / 1000L) * 1000L // in fromZone + localTs - getOffsetFromLocalMillis(localTs / 1000L, fromZone) * 1000L + } + if (toZone.getID == localZone.getID) { + utcTs + } else { + val localTs2 = utcTs + toZone.getOffset(utcTs / 1000L) * 1000L // in toZone + // treat it as local timezone, convert to UTC (we could get the expected human time back) + localTs2 - getOffsetFromLocalMillis(localTs2 / 1000L, localZone) * 1000L + } + } + /** * Returns a timestamp of given timezone from utc timestamp, with the same string * representation in their timezone. */ def fromUTCTime(time: SQLTimestamp, timeZone: String): SQLTimestamp = { - val tz = TimeZone.getTimeZone(timeZone) - val offset = tz.getOffset(time / 1000L) - time + offset * 1000L + convertTz(time, TimeZoneGMT, TimeZone.getTimeZone(timeZone)) } /** @@ -869,9 +894,7 @@ object DateTimeUtils { * string representation in their timezone. */ def toUTCTime(time: SQLTimestamp, timeZone: String): SQLTimestamp = { - val tz = TimeZone.getTimeZone(timeZone) - val offset = getOffsetFromLocalMillis(time / 1000L, tz) - time - offset * 1000L + convertTz(time, TimeZone.getTimeZone(timeZone), TimeZoneGMT) } /** diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala index c2aace1ef238..fc5869953534 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala @@ -21,7 +21,9 @@ import java.math.BigInteger import java.sql.{Date, Timestamp} import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.expressions.{BoundReference, Literal, NewInstance} import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String case class PrimitiveData( intField: Int, @@ -229,4 +231,16 @@ class ScalaReflectionSuite extends SparkFunSuite { assert(anyTypes.forall(!_.isPrimitive)) assert(anyTypes === Seq(classOf[java.lang.Object], classOf[java.lang.Object])) } + + test("SPARK-15062: Get correct serializer for List[_]") { + val list = List(1, 2, 3) + val serializer = extractorsFor[List[Int]](BoundReference( + 0, ObjectType(list.getClass), nullable = false)) + assert(serializer.children.size == 2) + assert(serializer.children.head.isInstanceOf[Literal]) + assert(serializer.children.head.asInstanceOf[Literal].value === UTF8String.fromString("value")) + assert(serializer.children.last.isInstanceOf[NewInstance]) + assert(serializer.children.last.asInstanceOf[NewInstance] + .cls.isInstanceOf[Class[org.apache.spark.sql.catalyst.util.GenericArrayData]]) + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala index ab77a764483e..9a6aaffc7ef2 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala @@ -764,6 +764,7 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper { test("cast between string and interval") { import org.apache.spark.unsafe.types.CalendarInterval + checkEvaluation(Cast(Literal(""), CalendarIntervalType), null) checkEvaluation(Cast(Literal("interval -3 month 7 hours"), CalendarIntervalType), new CalendarInterval(-3, 7 * CalendarInterval.MICROS_PER_HOUR)) checkEvaluation(Cast(Literal.create( diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala index 6660453da7d4..5ecbbcad4a36 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala @@ -472,17 +472,23 @@ class DateTimeUtilsSuite extends SparkFunSuite { assert(toJavaTimestamp(fromUTCTime(fromJavaTimestamp(Timestamp.valueOf(utc)), tz)).toString === expected) } - test("2011-12-25 09:00:00.123456", "UTC", "2011-12-25 09:00:00.123456") - test("2011-12-25 09:00:00.123456", "JST", "2011-12-25 18:00:00.123456") - test("2011-12-25 09:00:00.123456", "PST", "2011-12-25 01:00:00.123456") - test("2011-12-25 09:00:00.123456", "Asia/Shanghai", "2011-12-25 17:00:00.123456") - - // Daylight Saving Time - test("2016-03-13 09:59:59.0", "PST", "2016-03-13 01:59:59.0") - test("2016-03-13 10:00:00.0", "PST", "2016-03-13 03:00:00.0") - test("2016-11-06 08:59:59.0", "PST", "2016-11-06 01:59:59.0") - test("2016-11-06 09:00:00.0", "PST", "2016-11-06 01:00:00.0") - test("2016-11-06 10:00:00.0", "PST", "2016-11-06 02:00:00.0") + for (tz <- DateTimeTestUtils.ALL_TIMEZONES) { + DateTimeTestUtils.withDefaultTimeZone(tz) { + test("2011-12-25 09:00:00.123456", "UTC", "2011-12-25 09:00:00.123456") + test("2011-12-25 09:00:00.123456", "JST", "2011-12-25 18:00:00.123456") + test("2011-12-25 09:00:00.123456", "PST", "2011-12-25 01:00:00.123456") + test("2011-12-25 09:00:00.123456", "Asia/Shanghai", "2011-12-25 17:00:00.123456") + } + } + + DateTimeTestUtils.withDefaultTimeZone(TimeZone.getTimeZone("PST")) { + // Daylight Saving Time + test("2016-03-13 09:59:59.0", "PST", "2016-03-13 01:59:59.0") + test("2016-03-13 10:00:00.0", "PST", "2016-03-13 03:00:00.0") + test("2016-11-06 08:59:59.0", "PST", "2016-11-06 01:59:59.0") + test("2016-11-06 09:00:00.0", "PST", "2016-11-06 01:00:00.0") + test("2016-11-06 10:00:00.0", "PST", "2016-11-06 02:00:00.0") + } } test("to UTC timestamp") { @@ -490,21 +496,28 @@ class DateTimeUtilsSuite extends SparkFunSuite { assert(toJavaTimestamp(toUTCTime(fromJavaTimestamp(Timestamp.valueOf(utc)), tz)).toString === expected) } - test("2011-12-25 09:00:00.123456", "UTC", "2011-12-25 09:00:00.123456") - test("2011-12-25 18:00:00.123456", "JST", "2011-12-25 09:00:00.123456") - test("2011-12-25 01:00:00.123456", "PST", "2011-12-25 09:00:00.123456") - test("2011-12-25 17:00:00.123456", "Asia/Shanghai", "2011-12-25 09:00:00.123456") - - // Daylight Saving Time - test("2016-03-13 01:59:59", "PST", "2016-03-13 09:59:59.0") - // 2016-03-13 02:00:00 PST does not exists - test("2016-03-13 02:00:00", "PST", "2016-03-13 10:00:00.0") - test("2016-03-13 03:00:00", "PST", "2016-03-13 10:00:00.0") - test("2016-11-06 00:59:59", "PST", "2016-11-06 07:59:59.0") - // 2016-11-06 01:00:00 PST could be 2016-11-06 08:00:00 UTC or 2016-11-06 09:00:00 UTC - test("2016-11-06 01:00:00", "PST", "2016-11-06 09:00:00.0") - test("2016-11-06 01:59:59", "PST", "2016-11-06 09:59:59.0") - test("2016-11-06 02:00:00", "PST", "2016-11-06 10:00:00.0") + + for (tz <- DateTimeTestUtils.ALL_TIMEZONES) { + DateTimeTestUtils.withDefaultTimeZone(tz) { + test("2011-12-25 09:00:00.123456", "UTC", "2011-12-25 09:00:00.123456") + test("2011-12-25 18:00:00.123456", "JST", "2011-12-25 09:00:00.123456") + test("2011-12-25 01:00:00.123456", "PST", "2011-12-25 09:00:00.123456") + test("2011-12-25 17:00:00.123456", "Asia/Shanghai", "2011-12-25 09:00:00.123456") + } + } + + DateTimeTestUtils.withDefaultTimeZone(TimeZone.getTimeZone("PST")) { + // Daylight Saving Time + test("2016-03-13 01:59:59", "PST", "2016-03-13 09:59:59.0") + // 2016-03-13 02:00:00 PST does not exists + test("2016-03-13 02:00:00", "PST", "2016-03-13 10:00:00.0") + test("2016-03-13 03:00:00", "PST", "2016-03-13 10:00:00.0") + test("2016-11-06 00:59:59", "PST", "2016-11-06 07:59:59.0") + // 2016-11-06 01:00:00 PST could be 2016-11-06 08:00:00 UTC or 2016-11-06 09:00:00 UTC + test("2016-11-06 01:00:00", "PST", "2016-11-06 09:00:00.0") + test("2016-11-06 01:59:59", "PST", "2016-11-06 09:59:59.0") + test("2016-11-06 02:00:00", "PST", "2016-11-06 10:00:00.0") + } } test("daysToMillis and millisToDays") { diff --git a/sql/core/pom.xml b/sql/core/pom.xml index 0fa54b3bd190..3ead0e87d4f8 100644 --- a/sql/core/pom.xml +++ b/sql/core/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 - 1.6.2-csd-7-SNAPSHOT + 1.6.3-csd-1-SNAPSHOT ../../pom.xml diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala index fad482b42735..4c9a63305db8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala @@ -165,8 +165,37 @@ private[sql] object JDBCRDD extends Logging { * @return A Catalyst schema corresponding to columns in the given order. */ private def pruneSchema(schema: StructType, columns: Array[String]): StructType = { - val fieldMap = Map(schema.fields map { x => x.metadata.getString("name") -> x }: _*) - new StructType(columns map { name => fieldMap(name) }) + val fieldMap = Map(schema.fields.map(x => x.metadata.getString("name") -> x): _*) + new StructType(columns.map(name => fieldMap(name))) + } + + /** + * Converts value to SQL expression. + */ + private def compileValue(value: Any): Any = value match { + case stringValue: String => s"'${escapeSql(stringValue)}'" + case timestampValue: Timestamp => "'" + timestampValue + "'" + case dateValue: Date => "'" + dateValue + "'" + case _ => value + } + + private def escapeSql(value: String): String = + if (value == null) null else StringUtils.replace(value, "'", "''") + + /** + * Turns a single Filter into a String representing a SQL expression. + * Returns null for an unhandled filter. + */ + private def compileFilter(f: Filter): String = f match { + case EqualTo(attr, value) => s"$attr = ${compileValue(value)}" + case Not(EqualTo(attr, value)) => s"$attr != ${compileValue(value)}" + case LessThan(attr, value) => s"$attr < ${compileValue(value)}" + case GreaterThan(attr, value) => s"$attr > ${compileValue(value)}" + case LessThanOrEqual(attr, value) => s"$attr <= ${compileValue(value)}" + case GreaterThanOrEqual(attr, value) => s"$attr >= ${compileValue(value)}" + case IsNull(attr) => s"$attr IS NULL" + case IsNotNull(attr) => s"$attr IS NOT NULL" + case _ => null } @@ -240,37 +269,12 @@ private[sql] class JDBCRDD( if (sb.length == 0) "1" else sb.substring(1) } - /** - * Converts value to SQL expression. - */ - private def compileValue(value: Any): Any = value match { - case stringValue: String => s"'${escapeSql(stringValue)}'" - case timestampValue: Timestamp => "'" + timestampValue + "'" - case dateValue: Date => "'" + dateValue + "'" - case _ => value - } - - private def escapeSql(value: String): String = - if (value == null) null else StringUtils.replace(value, "'", "''") - - /** - * Turns a single Filter into a String representing a SQL expression. - * Returns null for an unhandled filter. - */ - private def compileFilter(f: Filter): String = f match { - case EqualTo(attr, value) => s"$attr = ${compileValue(value)}" - case LessThan(attr, value) => s"$attr < ${compileValue(value)}" - case GreaterThan(attr, value) => s"$attr > ${compileValue(value)}" - case LessThanOrEqual(attr, value) => s"$attr <= ${compileValue(value)}" - case GreaterThanOrEqual(attr, value) => s"$attr >= ${compileValue(value)}" - case _ => null - } /** * `filters`, but as a WHERE clause suitable for injection into a SQL query. */ private val filterWhereClause: String = { - val filterStrings = filters map compileFilter filter (_ != null) + val filterStrings = filters.map(JDBCRDD.compileFilter).filter(_ != null) if (filterStrings.size > 0) { val sb = new StringBuilder("WHERE ") filterStrings.foreach(x => sb.append(x).append(" AND ")) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 4b389add63c4..e28dc7af1141 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -18,18 +18,22 @@ package org.apache.spark.sql.jdbc import java.math.BigDecimal -import java.sql.DriverManager +import java.sql.{Date, DriverManager, Timestamp} import java.util.{Calendar, GregorianCalendar, Properties} import org.h2.jdbc.JdbcSQLException import org.scalatest.BeforeAndAfter +import org.scalatest.PrivateMethodTester import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ +import org.apache.spark.sql.sources._ import org.apache.spark.util.Utils -class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext { +class JDBCSuite extends SparkFunSuite + with BeforeAndAfter with PrivateMethodTester with SharedSQLContext { import testImplicits._ val url = "jdbc:h2:mem:testdb0" @@ -427,6 +431,22 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext assert(DerbyColumns === Seq(""""abc"""", """"key"""")) } + test("compile filters") { + val compileFilter = PrivateMethod[String]('compileFilter) + def doCompileFilter(f: Filter): String = JDBCRDD invokePrivate compileFilter(f) + assert(doCompileFilter(EqualTo("col0", 3)) === "col0 = 3") + assert(doCompileFilter(Not(EqualTo("col1", "abc"))) === "col1 != 'abc'") + assert(doCompileFilter(LessThan("col0", 5)) === "col0 < 5") + assert(doCompileFilter(LessThan("col3", + Timestamp.valueOf("1995-11-21 00:00:00.0"))) === "col3 < '1995-11-21 00:00:00.0'") + assert(doCompileFilter(LessThan("col4", Date.valueOf("1983-08-04"))) === "col4 < '1983-08-04'") + assert(doCompileFilter(LessThanOrEqual("col0", 5)) === "col0 <= 5") + assert(doCompileFilter(GreaterThan("col0", 3)) === "col0 > 3") + assert(doCompileFilter(GreaterThanOrEqual("col0", 3)) === "col0 >= 3") + assert(doCompileFilter(IsNull("col1")) === "col1 IS NULL") + assert(doCompileFilter(IsNotNull("col1")) === "col1 IS NOT NULL") + } + test("Dialect unregister") { JdbcDialects.registerDialect(testH2Dialect) JdbcDialects.unregisterDialect(testH2Dialect) diff --git a/sql/hive-thriftserver/pom.xml b/sql/hive-thriftserver/pom.xml index d337c569db53..d353f12abde4 100644 --- a/sql/hive-thriftserver/pom.xml +++ b/sql/hive-thriftserver/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 - 1.6.2-csd-7-SNAPSHOT + 1.6.3-csd-1-SNAPSHOT ../../pom.xml diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml index 9b8331cb5b6d..55f3145de6b3 100644 --- a/sql/hive/pom.xml +++ b/sql/hive/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 - 1.6.2-csd-7-SNAPSHOT + 1.6.3-csd-1-SNAPSHOT ../../pom.xml diff --git a/streaming/pom.xml b/streaming/pom.xml index 135a9233687d..ade1742d6b97 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 - 1.6.2-csd-7-SNAPSHOT + 1.6.3-csd-1-SNAPSHOT ../pom.xml diff --git a/tags/pom.xml b/tags/pom.xml index b504688bdc96..c9cfe6d282cf 100644 --- a/tags/pom.xml +++ b/tags/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 - 1.6.2-csd-7-SNAPSHOT + 1.6.3-csd-1-SNAPSHOT ../pom.xml diff --git a/tools/pom.xml b/tools/pom.xml index fd8caee1c654..f1b932887ba9 100644 --- a/tools/pom.xml +++ b/tools/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent_2.11 - 1.6.2-csd-7-SNAPSHOT + 1.6.3-csd-1-SNAPSHOT ../pom.xml diff --git a/unsafe/pom.xml b/unsafe/pom.xml index 084886b5db87..51c01187e2ad 100644 --- a/unsafe/pom.xml +++ b/unsafe/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 - 1.6.2-csd-7-SNAPSHOT + 1.6.3-csd-1-SNAPSHOT ../pom.xml diff --git a/yarn/pom.xml b/yarn/pom.xml index 8ccdabecb731..8e3748f29866 100644 --- a/yarn/pom.xml +++ b/yarn/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent_2.11 - 1.6.2-csd-7-SNAPSHOT + 1.6.3-csd-1-SNAPSHOT ../pom.xml