Did you specify the correct logging directory?
- Please verify your setting of
- spark.history.fs.logDirectory and whether you have the permissions to
- access it.
It is also possible that your application did not run to
- completion or did not stop the SparkContext.
-
+
|
{job.jobId} {job.jobGroup.map(id => s"($id)").getOrElse("")}
|
- {jobTableRow.jobDescription}
+ {jobTableRow.jobDescription} {killLink}
{jobTableRow.lastStageName}
|
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
index 7b00b558d591a..620c54c2dc0a5 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
@@ -17,6 +17,8 @@
package org.apache.spark.ui.jobs
+import javax.servlet.http.HttpServletRequest
+
import org.apache.spark.scheduler.SchedulingMode
import org.apache.spark.ui.{SparkUI, SparkUITab}
@@ -35,4 +37,19 @@ private[ui] class JobsTab(parent: SparkUI) extends SparkUITab(parent, "jobs") {
attachPage(new AllJobsPage(this))
attachPage(new JobPage(this))
+
+ def handleKillRequest(request: HttpServletRequest): Unit = {
+ if (killEnabled && parent.securityManager.checkModifyPermissions(request.getRemoteUser)) {
+ val jobId = Option(request.getParameter("id")).map(_.toInt)
+ jobId.foreach { id =>
+ if (jobProgresslistener.activeJobs.contains(id)) {
+ sc.foreach(_.cancelJob(id))
+ // Do a quick pause here to give Spark time to kill the job so it shows up as
+ // killed after the refresh. Note that this will block the serving thread so the
+ // time should be limited in duration.
+ Thread.sleep(100)
+ }
+ }
+ }
+ }
}
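For context, a minimal sketch (illustrative only, not part of the patch) of exercising the new job-kill endpoint by hand; the default UI address localhost:4040 and job id 0 are assumptions:

    import java.net.{HttpURLConnection, URL}

    val url = new URL("http://localhost:4040/jobs/job/kill/?id=0")
    val conn = url.openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("POST")  // SPARK-6846: GET is also accepted because the YARN AM cannot proxy POST
    try {
      conn.connect()
      println(conn.getResponseCode)  // 200 is expected when kill is enabled and the job is active
    } finally {
      conn.disconnect()
    }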
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index 9b9b4681ba5db..c9d0431e2d2f7 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -353,12 +353,13 @@ private[ui] class StagePagedTable(
val killLinkUri = s"$basePathUri/stages/stage/kill/"
*/
- val killLinkUri = s"$basePathUri/stages/stage/kill/?id=${s.stageId}&terminate=true"
+ val killLinkUri = s"$basePathUri/stages/stage/kill/?id=${s.stageId}"
(kill)
+ } else {
+ Seq.empty
}
val nameLinkUri = s"$basePathUri/stages/stage?id=${s.stageId}&attempt=${s.attemptId}"
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
index 573192ac17d45..c1f25114371f1 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
@@ -39,15 +39,16 @@ private[ui] class StagesTab(parent: SparkUI) extends SparkUITab(parent, "stages"
def handleKillRequest(request: HttpServletRequest): Unit = {
if (killEnabled && parent.securityManager.checkModifyPermissions(request.getRemoteUser)) {
- val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean
- val stageId = Option(request.getParameter("id")).getOrElse("-1").toInt
- if (stageId >= 0 && killFlag && progressListener.activeStages.contains(stageId)) {
- sc.get.cancelStage(stageId)
+ val stageId = Option(request.getParameter("id")).map(_.toInt)
+ stageId.foreach { id =>
+ if (progressListener.activeStages.contains(id)) {
+ sc.foreach(_.cancelStage(id))
+ // Do a quick pause here to give Spark time to kill the stage so it shows up as
+ // killed after the refresh. Note that this will block the serving thread so the
+ // time should be limited in duration.
+ Thread.sleep(100)
+ }
}
- // Do a quick pause here to give Spark time to kill the stage so it shows up as
- // killed after the refresh. Note that this will block the serving thread so the
- // time should be limited in duration.
- Thread.sleep(100)
}
}
diff --git a/core/src/main/scala/org/apache/spark/util/ManualClock.scala b/core/src/main/scala/org/apache/spark/util/ManualClock.scala
index 91a95871014f0..e7a65d74a440e 100644
--- a/core/src/main/scala/org/apache/spark/util/ManualClock.scala
+++ b/core/src/main/scala/org/apache/spark/util/ManualClock.scala
@@ -26,8 +26,6 @@ package org.apache.spark.util
*/
private[spark] class ManualClock(private var time: Long) extends Clock {
- private var _isWaiting = false
-
/**
* @return `ManualClock` with initial time 0
*/
@@ -59,19 +57,9 @@ private[spark] class ManualClock(private var time: Long) extends Clock {
* @return current time reported by the clock when waiting finishes
*/
def waitTillTime(targetTime: Long): Long = synchronized {
- _isWaiting = true
- try {
- while (time < targetTime) {
- wait(10)
- }
- getTimeMillis()
- } finally {
- _isWaiting = false
+ while (time < targetTime) {
+ wait(10)
}
+ getTimeMillis()
}
-
- /**
- * Returns whether there is any thread being blocked in `waitTillTime`.
- */
- def isWaiting: Boolean = synchronized { _isWaiting }
}
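A small usage sketch (illustrative only, not part of the patch) of the behaviour that remains after dropping `isWaiting`; it assumes ManualClock's existing `advance` method and only compiles inside Spark, since the class is `private[spark]`:

    // One thread blocks in waitTillTime until another thread moves the clock forward.
    val clock = new ManualClock(0L)
    new Thread(new Runnable {
      override def run(): Unit = {
        Thread.sleep(50)
        clock.advance(1000L)  // move the clock past the target time
      }
    }).start()
    assert(clock.waitTillTime(1000L) == 1000L)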
diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
index 5a6dbc830448a..d093e7bfc3dac 100644
--- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -194,4 +194,25 @@ private[spark] object ThreadUtils {
throw new SparkException("Exception thrown in awaitResult: ", t)
}
}
+
+ /**
+ * Calls [[Awaitable.result]] directly to avoid using `ForkJoinPool`'s `BlockingContext`, and
+ * wraps and re-throws any exceptions with a nice stack trace.
+ *
+ * Code running in the user's thread may be in a thread of Scala's ForkJoinPool. Because
+ * concurrent executions in a ForkJoinPool may see [[ThreadLocal]] values unexpectedly, this
+ * method prevents the ForkJoinPool from running other tasks in the current waiting thread.
+ */
+ @throws(classOf[SparkException])
+ def awaitResultInForkJoinSafely[T](awaitable: Awaitable[T], atMost: Duration): T = {
+ try {
+ // `awaitPermission` is not actually used anywhere so it's safe to pass in null here.
+ // See SPARK-13747.
+ val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
+ awaitable.result(atMost)(awaitPermission)
+ } catch {
+ case NonFatal(t) =>
+ throw new SparkException("Exception thrown in awaitResult: ", t)
+ }
+ }
}
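A minimal usage sketch of the new helper (illustrative only, not part of the patch); the future and timeout are arbitrary:

    import scala.concurrent.{ExecutionContext, Future}
    import scala.concurrent.duration._

    implicit val ec: ExecutionContext = ExecutionContext.global
    // Waits on the calling thread without triggering ForkJoinPool's managed blocking.
    val answer: Int = ThreadUtils.awaitResultInForkJoinSafely(Future(21 * 2), 10.seconds)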
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index ef832756ce3b7..6027b07c0fee8 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -27,6 +27,7 @@ import java.nio.file.{Files, Paths}
import java.util.{Locale, Properties, Random, UUID}
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicBoolean
+import java.util.zip.GZIPInputStream
import javax.net.ssl.HttpsURLConnection
import scala.annotation.tailrec
@@ -38,6 +39,7 @@ import scala.reflect.ClassTag
import scala.util.Try
import scala.util.control.{ControlThrowable, NonFatal}
+import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
import com.google.common.io.{ByteStreams, Files => GFiles}
import com.google.common.net.InetAddresses
import org.apache.commons.lang3.SystemUtils
@@ -55,6 +57,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{DYN_ALLOCATION_INITIAL_EXECUTORS, DYN_ALLOCATION_MIN_EXECUTORS, EXECUTOR_INSTANCES}
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
+import org.apache.spark.util.logging.RollingFileAppender
/** CallSite represents a place in user code. It can have a short and a long form. */
private[spark] case class CallSite(shortForm: String, longForm: String)
@@ -1440,14 +1443,72 @@ private[spark] object Utils extends Logging {
CallSite(shortForm, longForm)
}
+ private val UNCOMPRESSED_LOG_FILE_LENGTH_CACHE_SIZE_CONF =
+ "spark.worker.ui.compressedLogFileLengthCacheSize"
+ private val DEFAULT_UNCOMPRESSED_LOG_FILE_LENGTH_CACHE_SIZE = 100
+ private var compressedLogFileLengthCache: LoadingCache[String, java.lang.Long] = null
+ private def getCompressedLogFileLengthCache(
+ sparkConf: SparkConf): LoadingCache[String, java.lang.Long] = this.synchronized {
+ if (compressedLogFileLengthCache == null) {
+ val compressedLogFileLengthCacheSize = sparkConf.getInt(
+ UNCOMPRESSED_LOG_FILE_LENGTH_CACHE_SIZE_CONF,
+ DEFAULT_UNCOMPRESSED_LOG_FILE_LENGTH_CACHE_SIZE)
+ compressedLogFileLengthCache = CacheBuilder.newBuilder()
+ .maximumSize(compressedLogFileLengthCacheSize)
+ .build[String, java.lang.Long](new CacheLoader[String, java.lang.Long]() {
+ override def load(path: String): java.lang.Long = {
+ Utils.getCompressedFileLength(new File(path))
+ }
+ })
+ }
+ compressedLogFileLengthCache
+ }
+
+ /**
+ * Return the file length. If the file is compressed, the uncompressed file length is returned.
+ * The uncompressed size is also cached to avoid repeated decompression; the cache size is
+ * read from `workConf`.
+ */
+ def getFileLength(file: File, workConf: SparkConf): Long = {
+ if (file.getName.endsWith(".gz")) {
+ getCompressedLogFileLengthCache(workConf).get(file.getAbsolutePath)
+ } else {
+ file.length
+ }
+ }
+
+ /** Return uncompressed file length of a compressed file. */
+ private def getCompressedFileLength(file: File): Long = {
+ try {
+ // Uncompress .gz file to determine file size.
+ var fileSize = 0L
+ val gzInputStream = new GZIPInputStream(new FileInputStream(file))
+ val bufSize = 1024
+ val buf = new Array[Byte](bufSize)
+ var numBytes = ByteStreams.read(gzInputStream, buf, 0, bufSize)
+ while (numBytes > 0) {
+ fileSize += numBytes
+ numBytes = ByteStreams.read(gzInputStream, buf, 0, bufSize)
+ }
+ fileSize
+ } catch {
+ case e: Throwable =>
+ logError(s"Cannot get file length of ${file}", e)
+ throw e
+ }
+ }
+
/** Return a string containing part of a file from byte 'start' to 'end'. */
- def offsetBytes(path: String, start: Long, end: Long): String = {
+ def offsetBytes(path: String, length: Long, start: Long, end: Long): String = {
val file = new File(path)
- val length = file.length()
val effectiveEnd = math.min(length, end)
val effectiveStart = math.max(0, start)
val buff = new Array[Byte]((effectiveEnd-effectiveStart).toInt)
- val stream = new FileInputStream(file)
+ val stream = if (path.endsWith(".gz")) {
+ new GZIPInputStream(new FileInputStream(file))
+ } else {
+ new FileInputStream(file)
+ }
try {
ByteStreams.skipFully(stream, effectiveStart)
@@ -1463,8 +1524,8 @@ private[spark] object Utils extends Logging {
* and `endIndex` is based on the cumulative size of all the files take in
* the given order. See figure below for more details.
*/
- def offsetBytes(files: Seq[File], start: Long, end: Long): String = {
- val fileLengths = files.map { _.length }
+ def offsetBytes(files: Seq[File], fileLengths: Seq[Long], start: Long, end: Long): String = {
+ assert(files.length == fileLengths.length)
val startIndex = math.max(start, 0)
val endIndex = math.min(end, fileLengths.sum)
val fileToLength = files.zip(fileLengths).toMap
@@ -1472,7 +1533,7 @@ private[spark] object Utils extends Logging {
val stringBuffer = new StringBuffer((endIndex - startIndex).toInt)
var sum = 0L
- for (file <- files) {
+ files.zip(fileLengths).foreach { case (file, fileLength) =>
val startIndexOfFile = sum
val endIndexOfFile = sum + fileToLength(file)
logDebug(s"Processing file $file, " +
@@ -1491,19 +1552,19 @@ private[spark] object Utils extends Logging {
if (startIndex <= startIndexOfFile && endIndex >= endIndexOfFile) {
// Case C: read the whole file
- stringBuffer.append(offsetBytes(file.getAbsolutePath, 0, fileToLength(file)))
+ stringBuffer.append(offsetBytes(file.getAbsolutePath, fileLength, 0, fileToLength(file)))
} else if (startIndex > startIndexOfFile && startIndex < endIndexOfFile) {
// Case A and B: read from [start of required range] to [end of file / end of range]
val effectiveStartIndex = startIndex - startIndexOfFile
val effectiveEndIndex = math.min(endIndex - startIndexOfFile, fileToLength(file))
stringBuffer.append(Utils.offsetBytes(
- file.getAbsolutePath, effectiveStartIndex, effectiveEndIndex))
+ file.getAbsolutePath, fileLength, effectiveStartIndex, effectiveEndIndex))
} else if (endIndex > startIndexOfFile && endIndex < endIndexOfFile) {
// Case D: read from [start of file] to [end of require range]
val effectiveStartIndex = math.max(startIndex - startIndexOfFile, 0)
val effectiveEndIndex = endIndex - startIndexOfFile
stringBuffer.append(Utils.offsetBytes(
- file.getAbsolutePath, effectiveStartIndex, effectiveEndIndex))
+ file.getAbsolutePath, fileLength, effectiveStartIndex, effectiveEndIndex))
}
sum += fileToLength(file)
logDebug(s"After processing file $file, string built is ${stringBuffer.toString}")
@@ -1698,6 +1759,22 @@ private[spark] object Utils extends Logging {
count
}
+ /**
+ * Generate a zipWithIndex iterator that avoids the index overflow problem of
+ * Scala's zipWithIndex.
+ */
+ def getIteratorZipWithIndex[T](iterator: Iterator[T], startIndex: Long): Iterator[(T, Long)] = {
+ new Iterator[(T, Long)] {
+ require(startIndex >= 0, "startIndex should be >= 0.")
+ var index: Long = startIndex - 1L
+ def hasNext: Boolean = iterator.hasNext
+ def next(): (T, Long) = {
+ index += 1L
+ (iterator.next(), index)
+ }
+ }
+ }
+
/**
* Creates a symlink.
*
@@ -2432,6 +2509,26 @@ private[spark] object Utils extends Logging {
}
}
+private[util] object CallerContext extends Logging {
+ val callerContextSupported: Boolean = {
+ SparkHadoopUtil.get.conf.getBoolean("hadoop.caller.context.enabled", false) && {
+ try {
+ // scalastyle:off classforname
+ Class.forName("org.apache.hadoop.ipc.CallerContext")
+ Class.forName("org.apache.hadoop.ipc.CallerContext$Builder")
+ // scalastyle:on classforname
+ true
+ } catch {
+ case _: ClassNotFoundException =>
+ false
+ case NonFatal(e) =>
+ logWarning("Failed to load the CallerContext class", e)
+ false
+ }
+ }
+ }
+}
+
/**
* An utility class used to set up Spark caller contexts to HDFS and Yarn. The `context` will be
* constructed by parameters passed in.
@@ -2478,21 +2575,21 @@ private[spark] class CallerContext(
* Set up the caller context [[context]] by invoking Hadoop CallerContext API of
* [[org.apache.hadoop.ipc.CallerContext]], which was added in hadoop 2.8.
*/
- def setCurrentContext(): Boolean = {
- var succeed = false
- try {
- // scalastyle:off classforname
- val callerContext = Class.forName("org.apache.hadoop.ipc.CallerContext")
- val Builder = Class.forName("org.apache.hadoop.ipc.CallerContext$Builder")
- // scalastyle:on classforname
- val builderInst = Builder.getConstructor(classOf[String]).newInstance(context)
- val hdfsContext = Builder.getMethod("build").invoke(builderInst)
- callerContext.getMethod("setCurrent", callerContext).invoke(null, hdfsContext)
- succeed = true
- } catch {
- case NonFatal(e) => logInfo("Fail to set Spark caller context", e)
+ def setCurrentContext(): Unit = {
+ if (CallerContext.callerContextSupported) {
+ try {
+ // scalastyle:off classforname
+ val callerContext = Class.forName("org.apache.hadoop.ipc.CallerContext")
+ val builder = Class.forName("org.apache.hadoop.ipc.CallerContext$Builder")
+ // scalastyle:on classforname
+ val builderInst = builder.getConstructor(classOf[String]).newInstance(context)
+ val hdfsContext = builder.getMethod("build").invoke(builderInst)
+ callerContext.getMethod("setCurrent", callerContext).invoke(null, hdfsContext)
+ } catch {
+ case NonFatal(e) =>
+ logWarning("Failed to set Spark caller context", e)
+ }
}
- succeed
}
}
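Two short sketches of the new Utils signatures (illustrative only, not part of the patch); the log path is hypothetical:

    // Read the tail of a (possibly gzipped) log using the length-aware offsetBytes.
    val logFile = new java.io.File("/tmp/work/app-1/0/stdout.gz")
    val length = Utils.getFileLength(logFile, new SparkConf())  // uncompressed length, cached for .gz
    val tail = Utils.offsetBytes(logFile.getAbsolutePath, length, math.max(0, length - 4096), length)

    // The overflow-safe zipWithIndex keeps counting with Longs past Int.MaxValue.
    val indexed = Utils.getIteratorZipWithIndex(Iterator("a", "b"), startIndex = Int.MaxValue.toLong)
    // indexed.toList == List(("a", 2147483647L), ("b", 2147483648L))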
diff --git a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
index a0eb05c7c0e82..5d8cec8447b53 100644
--- a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
+++ b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
@@ -17,9 +17,11 @@
package org.apache.spark.util.logging
-import java.io.{File, FileFilter, InputStream}
+import java.io._
+import java.util.zip.GZIPOutputStream
import com.google.common.io.Files
+import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
@@ -45,6 +47,7 @@ private[spark] class RollingFileAppender(
import RollingFileAppender._
private val maxRetainedFiles = conf.getInt(RETAINED_FILES_PROPERTY, -1)
+ private val enableCompression = conf.getBoolean(ENABLE_COMPRESSION, false)
/** Stop the appender */
override def stop() {
@@ -76,6 +79,33 @@ private[spark] class RollingFileAppender(
}
}
+ // Roll the log file and compress if enableCompression is true.
+ private def rotateFile(activeFile: File, rolloverFile: File): Unit = {
+ if (enableCompression) {
+ val gzFile = new File(rolloverFile.getAbsolutePath + GZIP_LOG_SUFFIX)
+ var gzOutputStream: GZIPOutputStream = null
+ var inputStream: InputStream = null
+ try {
+ inputStream = new FileInputStream(activeFile)
+ gzOutputStream = new GZIPOutputStream(new FileOutputStream(gzFile))
+ IOUtils.copy(inputStream, gzOutputStream)
+ inputStream.close()
+ gzOutputStream.close()
+ activeFile.delete()
+ } finally {
+ IOUtils.closeQuietly(inputStream)
+ IOUtils.closeQuietly(gzOutputStream)
+ }
+ } else {
+ Files.move(activeFile, rolloverFile)
+ }
+ }
+
+ // Check if the rollover file already exists.
+ private def rolloverFileExist(file: File): Boolean = {
+ file.exists || new File(file.getAbsolutePath + GZIP_LOG_SUFFIX).exists
+ }
+
/** Move the active log file to a new rollover file */
private def moveFile() {
val rolloverSuffix = rollingPolicy.generateRolledOverFileSuffix()
@@ -83,8 +113,8 @@ private[spark] class RollingFileAppender(
activeFile.getParentFile, activeFile.getName + rolloverSuffix).getAbsoluteFile
logDebug(s"Attempting to rollover file $activeFile to file $rolloverFile")
if (activeFile.exists) {
- if (!rolloverFile.exists) {
- Files.move(activeFile, rolloverFile)
+ if (!rolloverFileExist(rolloverFile)) {
+ rotateFile(activeFile, rolloverFile)
logInfo(s"Rolled over $activeFile to $rolloverFile")
} else {
// In case the rollover file name clashes, make a unique file name.
@@ -97,11 +127,11 @@ private[spark] class RollingFileAppender(
altRolloverFile = new File(activeFile.getParent,
s"${activeFile.getName}$rolloverSuffix--$i").getAbsoluteFile
i += 1
- } while (i < 10000 && altRolloverFile.exists)
+ } while (i < 10000 && rolloverFileExist(altRolloverFile))
logWarning(s"Rollover file $rolloverFile already exists, " +
s"rolled over $activeFile to file $altRolloverFile")
- Files.move(activeFile, altRolloverFile)
+ rotateFile(activeFile, altRolloverFile)
}
} else {
logWarning(s"File $activeFile does not exist")
@@ -142,6 +172,9 @@ private[spark] object RollingFileAppender {
val SIZE_DEFAULT = (1024 * 1024).toString
val RETAINED_FILES_PROPERTY = "spark.executor.logs.rolling.maxRetainedFiles"
val DEFAULT_BUFFER_SIZE = 8192
+ val ENABLE_COMPRESSION = "spark.executor.logs.rolling.enableCompression"
+
+ val GZIP_LOG_SUFFIX = ".gz"
/**
* Get the sorted list of rolled over files. This assumes that the all the rolled
@@ -158,6 +191,6 @@ private[spark] object RollingFileAppender {
val file = new File(directory, activeFileName).getAbsoluteFile
if (file.exists) Some(file) else None
}
- rolledOverFiles ++ activeFile
+ rolledOverFiles.sortBy(_.getName.stripSuffix(GZIP_LOG_SUFFIX)) ++ activeFile
}
}
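A configuration sketch for the new flag (illustrative only, not part of the patch), combined with the pre-existing size-based rolling settings:

    val conf = new SparkConf()
      .set("spark.executor.logs.rolling.strategy", "size")
      .set("spark.executor.logs.rolling.maxSize", (128 * 1024).toString)
      .set("spark.executor.logs.rolling.maxRetainedFiles", "5")
      .set("spark.executor.logs.rolling.enableCompression", "true")  // gzip each rolled-over file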
diff --git a/core/src/test/java/org/apache/spark/io/NioBufferedFileInputStreamSuite.java b/core/src/test/java/org/apache/spark/io/NioBufferedFileInputStreamSuite.java
new file mode 100644
index 0000000000000..2c1a34a607592
--- /dev/null
+++ b/core/src/test/java/org/apache/spark/io/NioBufferedFileInputStreamSuite.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.RandomUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests functionality of {@link NioBufferedFileInputStream}
+ */
+public class NioBufferedFileInputStreamSuite {
+
+ private byte[] randomBytes;
+
+ private File inputFile;
+
+ @Before
+ public void setUp() throws IOException {
+ // Create a byte array of size 2 MB with random bytes
+ randomBytes = RandomUtils.nextBytes(2 * 1024 * 1024);
+ inputFile = File.createTempFile("temp-file", ".tmp");
+ FileUtils.writeByteArrayToFile(inputFile, randomBytes);
+ }
+
+ @After
+ public void tearDown() {
+ inputFile.delete();
+ }
+
+ @Test
+ public void testReadOneByte() throws IOException {
+ InputStream inputStream = new NioBufferedFileInputStream(inputFile);
+ for (int i = 0; i < randomBytes.length; i++) {
+ assertEquals(randomBytes[i], (byte) inputStream.read());
+ }
+ }
+
+ @Test
+ public void testReadMultipleBytes() throws IOException {
+ InputStream inputStream = new NioBufferedFileInputStream(inputFile);
+ byte[] readBytes = new byte[8 * 1024];
+ int i = 0;
+ while (i < randomBytes.length) {
+ int read = inputStream.read(readBytes, 0, 8 * 1024);
+ for (int j = 0; j < read; j++) {
+ assertEquals(randomBytes[i], readBytes[j]);
+ i++;
+ }
+ }
+ }
+
+ @Test
+ public void testBytesSkipped() throws IOException {
+ InputStream inputStream = new NioBufferedFileInputStream(inputFile);
+ assertEquals(1024, inputStream.skip(1024));
+ for (int i = 1024; i < randomBytes.length; i++) {
+ assertEquals(randomBytes[i], (byte) inputStream.read());
+ }
+ }
+
+ @Test
+ public void testBytesSkippedAfterRead() throws IOException {
+ InputStream inputStream = new NioBufferedFileInputStream(inputFile);
+ for (int i = 0; i < 1024; i++) {
+ assertEquals(randomBytes[i], (byte) inputStream.read());
+ }
+ assertEquals(1024, inputStream.skip(1024));
+ for (int i = 2048; i < randomBytes.length; i++) {
+ assertEquals(randomBytes[i], (byte) inputStream.read());
+ }
+ }
+
+ @Test
+ public void testNegativeBytesSkippedAfterRead() throws IOException {
+ InputStream inputStream = new NioBufferedFileInputStream(inputFile);
+ for (int i = 0; i < 1024; i++) {
+ assertEquals(randomBytes[i], (byte) inputStream.read());
+ }
+ // Skipping negative bytes should essentially be a no-op
+ assertEquals(0, inputStream.skip(-1));
+ assertEquals(0, inputStream.skip(-1024));
+ assertEquals(0, inputStream.skip(Long.MIN_VALUE));
+ assertEquals(1024, inputStream.skip(1024));
+ for (int i = 2048; i < randomBytes.length; i++) {
+ assertEquals(randomBytes[i], (byte) inputStream.read());
+ }
+ }
+
+ @Test
+ public void testSkipFromFileChannel() throws IOException {
+ InputStream inputStream = new NioBufferedFileInputStream(inputFile, 10);
+ // Since the buffer is smaller than the skipped bytes, this will guarantee
+ // we skip from underlying file channel.
+ assertEquals(1024, inputStream.skip(1024));
+ for (int i = 1024; i < 2048; i++) {
+ assertEquals(randomBytes[i], (byte) inputStream.read());
+ }
+ assertEquals(256, inputStream.skip(256));
+ assertEquals(256, inputStream.skip(256));
+ assertEquals(512, inputStream.skip(512));
+ for (int i = 3072; i < randomBytes.length; i++) {
+ assertEquals(randomBytes[i], (byte) inputStream.read());
+ }
+ }
+
+ @Test
+ public void testBytesSkippedAfterEOF() throws IOException {
+ InputStream inputStream = new NioBufferedFileInputStream(inputFile);
+ assertEquals(randomBytes.length, inputStream.skip(randomBytes.length + 1));
+ assertEquals(-1, inputStream.read());
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
index 9f94e36324536..b117c7709b46f 100644
--- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -500,7 +500,7 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalS
}
runTest("CheckpointRDD with zero partitions") { reliableCheckpoint: Boolean =>
- val rdd = new BlockRDD[Int](sc, Array[BlockId]())
+ val rdd = new BlockRDD[Int](sc, Array.empty[BlockId])
assert(rdd.partitions.size === 0)
assert(rdd.isCheckpointed === false)
assert(rdd.isCheckpointedAndMaterialized === false)
diff --git a/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala b/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala
index 159b448e05b02..2b8b1805bc83f 100644
--- a/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala
@@ -79,7 +79,7 @@ class SSLOptionsSuite extends SparkFunSuite with BeforeAndAfterAll {
conf.set("spark.ssl.protocol", "SSLv3")
val defaultOpts = SSLOptions.parse(conf, "spark.ssl", defaults = None)
- val opts = SSLOptions.parse(conf, "spark.ui.ssl", defaults = Some(defaultOpts))
+ val opts = SSLOptions.parse(conf, "spark.ssl.ui", defaults = Some(defaultOpts))
assert(opts.enabled === true)
assert(opts.trustStore.isDefined === true)
@@ -102,20 +102,20 @@ class SSLOptionsSuite extends SparkFunSuite with BeforeAndAfterAll {
val conf = new SparkConf
conf.set("spark.ssl.enabled", "true")
- conf.set("spark.ui.ssl.enabled", "false")
+ conf.set("spark.ssl.ui.enabled", "false")
conf.set("spark.ssl.keyStore", keyStorePath)
conf.set("spark.ssl.keyStorePassword", "password")
- conf.set("spark.ui.ssl.keyStorePassword", "12345")
+ conf.set("spark.ssl.ui.keyStorePassword", "12345")
conf.set("spark.ssl.keyPassword", "password")
conf.set("spark.ssl.trustStore", trustStorePath)
conf.set("spark.ssl.trustStorePassword", "password")
conf.set("spark.ssl.enabledAlgorithms",
"TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA")
- conf.set("spark.ui.ssl.enabledAlgorithms", "ABC, DEF")
+ conf.set("spark.ssl.ui.enabledAlgorithms", "ABC, DEF")
conf.set("spark.ssl.protocol", "SSLv3")
val defaultOpts = SSLOptions.parse(conf, "spark.ssl", defaults = None)
- val opts = SSLOptions.parse(conf, "spark.ui.ssl", defaults = Some(defaultOpts))
+ val opts = SSLOptions.parse(conf, "spark.ssl.ui", defaults = Some(defaultOpts))
assert(opts.enabled === false)
assert(opts.trustStore.isDefined === true)
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index 2d48e75cfbd96..7093dad05c5f6 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -65,7 +65,7 @@ class JsonProtocolSuite extends SparkFunSuite with JsonTestUtils {
test("writeMasterState") {
val workers = Array(createWorkerInfo(), createWorkerInfo())
val activeApps = Array(createAppInfo())
- val completedApps = Array[ApplicationInfo]()
+ val completedApps = Array.empty[ApplicationInfo]
val activeDrivers = Array(createDriverInfo())
val completedDrivers = Array(createDriverInfo())
val stateResponse = new MasterStateResponse(
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 732cbfaaeea46..7c649e305a37e 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -91,7 +91,7 @@ class SparkSubmitSuite
// scalastyle:off println
test("prints usage on empty input") {
- testPrematureExit(Array[String](), "Usage: spark-submit")
+ testPrematureExit(Array.empty[String], "Usage: spark-submit")
}
test("prints usage with only --help") {
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala
index 34f27ecaa07a3..de321db845a66 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala
@@ -33,7 +33,7 @@ class HistoryServerArgumentsSuite extends SparkFunSuite {
.set("spark.testing", "true")
test("No Arguments Parsing") {
- val argStrings = Array[String]()
+ val argStrings = Array.empty[String]
val hsa = new HistoryServerArguments(conf, argStrings)
assert(conf.get("spark.history.fs.logDirectory") === logDir.getAbsolutePath)
assert(conf.get("spark.history.fs.updateInterval") === "1")
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ui/LogPageSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ui/LogPageSuite.scala
index 72eaffb416981..4c3e96777940d 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/ui/LogPageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/ui/LogPageSuite.scala
@@ -22,16 +22,20 @@ import java.io.{File, FileWriter}
import org.mockito.Mockito.{mock, when}
import org.scalatest.PrivateMethodTester
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.worker.Worker
class LogPageSuite extends SparkFunSuite with PrivateMethodTester {
test("get logs simple") {
val webui = mock(classOf[WorkerWebUI])
+ val worker = mock(classOf[Worker])
val tmpDir = new File(sys.props("java.io.tmpdir"))
val workDir = new File(tmpDir, "work-dir")
workDir.mkdir()
when(webui.workDir).thenReturn(workDir)
+ when(webui.worker).thenReturn(worker)
+ when(worker.conf).thenReturn(new SparkConf())
val logPage = new LogPage(webui)
// Prepare some fake log files to read later
diff --git a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
index 38b48a4c9e654..3b798e36b0499 100644
--- a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
+++ b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
@@ -57,7 +57,7 @@ class ChunkedByteBufferSuite extends SparkFunSuite {
}
test("toArray()") {
- val empty = ByteBuffer.wrap(Array[Byte]())
+ val empty = ByteBuffer.wrap(Array.empty[Byte])
val bytes = ByteBuffer.wrap(Array.tabulate(8)(_.toByte))
val chunkedByteBuffer = new ChunkedByteBuffer(Array(bytes, bytes, empty))
assert(chunkedByteBuffer.toArray === bytes.array() ++ bytes.array())
@@ -74,7 +74,7 @@ class ChunkedByteBufferSuite extends SparkFunSuite {
}
test("toInputStream()") {
- val empty = ByteBuffer.wrap(Array[Byte]())
+ val empty = ByteBuffer.wrap(Array.empty[Byte])
val bytes1 = ByteBuffer.wrap(Array.tabulate(256)(_.toByte))
val bytes2 = ByteBuffer.wrap(Array.tabulate(128)(_.toByte))
val chunkedByteBuffer = new ChunkedByteBuffer(Array(empty, bytes1, bytes2))
diff --git a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
index 87600fe504b98..a757041299411 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
@@ -33,16 +33,21 @@ object FakeTask {
* locations for each task (given as varargs) if this sequence is not empty.
*/
def createTaskSet(numTasks: Int, prefLocs: Seq[TaskLocation]*): TaskSet = {
- createTaskSet(numTasks, 0, prefLocs: _*)
+ createTaskSet(numTasks, stageAttemptId = 0, prefLocs: _*)
}
def createTaskSet(numTasks: Int, stageAttemptId: Int, prefLocs: Seq[TaskLocation]*): TaskSet = {
+ createTaskSet(numTasks, stageId = 0, stageAttemptId, prefLocs: _*)
+ }
+
+ def createTaskSet(numTasks: Int, stageId: Int, stageAttemptId: Int, prefLocs: Seq[TaskLocation]*):
+ TaskSet = {
if (prefLocs.size != 0 && prefLocs.size != numTasks) {
throw new IllegalArgumentException("Wrong number of task locations")
}
val tasks = Array.tabulate[Task[_]](numTasks) { i =>
- new FakeTask(0, i, if (prefLocs.size != 0) prefLocs(i) else Nil)
+ new FakeTask(stageId, i, if (prefLocs.size != 0) prefLocs(i) else Nil)
}
- new TaskSet(tasks, 0, stageAttemptId, 0, null)
+ new TaskSet(tasks, stageId, stageAttemptId, priority = 0, null)
}
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 69edcf3347243..1b1a764ceff95 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -261,14 +261,14 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) == None)
clock.advance(LOCALITY_WAIT_MS)
- // Offer host1, exec1 again, at NODE_LOCAL level: the node local (task 2) should
+ // Offer host1, exec1 again, at NODE_LOCAL level: the node local (task 3) should
// get chosen before the noPref task
assert(manager.resourceOffer("exec1", "host1", NODE_LOCAL).get.index == 2)
- // Offer host2, exec3 again, at NODE_LOCAL level: we should choose task 2
+ // Offer host2, exec2, at NODE_LOCAL level: we should choose task 2
assert(manager.resourceOffer("exec2", "host2", NODE_LOCAL).get.index == 1)
- // Offer host2, exec3 again, at NODE_LOCAL level: we should get noPref task
+ // Offer host2, exec2 again, at NODE_LOCAL level: we should get noPref task
// after failing to find a node_Local task
assert(manager.resourceOffer("exec2", "host2", NODE_LOCAL) == None)
clock.advance(LOCALITY_WAIT_MS)
@@ -904,7 +904,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
task.index == index && !sched.endedTasks.contains(task.taskId)
}.getOrElse {
throw new RuntimeException(s"couldn't find index $index in " +
- s"tasks: ${tasks.map{t => t.index -> t.taskId}} with endedTasks:" +
+ s"tasks: ${tasks.map { t => t.index -> t.taskId }} with endedTasks:" +
s" ${sched.endedTasks.keys}")
}
}
@@ -974,6 +974,24 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
assert(manager.isZombie)
}
+ test("SPARK-17894: Verify TaskSetManagers for different stage attempts have unique names") {
+ sc = new SparkContext("local", "test")
+ sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
+ val taskSet = FakeTask.createTaskSet(numTasks = 1, stageId = 0, stageAttemptId = 0)
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, new ManualClock)
+ assert(manager.name === "TaskSet_0.0")
+
+ // Make sure a task set with the same stage ID but different attempt ID has a unique name
+ val taskSet2 = FakeTask.createTaskSet(numTasks = 1, stageId = 0, stageAttemptId = 1)
+ val manager2 = new TaskSetManager(sched, taskSet2, MAX_TASK_FAILURES, new ManualClock)
+ assert(manager2.name === "TaskSet_0.1")
+
+ // Make sure a task set with the same attempt ID but different stage ID also has a unique name
+ val taskSet3 = FakeTask.createTaskSet(numTasks = 1, stageId = 1, stageAttemptId = 1)
+ val manager3 = new TaskSetManager(sched, taskSet3, MAX_TASK_FAILURES, new ManualClock)
+ assert(manager3.name === "TaskSet_1.1")
+ }
+
private def createTaskResult(
id: Int,
accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty): DirectTaskResult[Int] = {
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala b/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala
new file mode 100644
index 0000000000000..64be966276140
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import scala.reflect.ClassTag
+import scala.util.Random
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.serializer.KryoTest._
+import org.apache.spark.util.Benchmark
+
+class KryoBenchmark extends SparkFunSuite {
+ val benchmark = new Benchmark("Benchmark Kryo Unsafe vs safe Serialization", 1024 * 1024 * 15, 10)
+
+ ignore(s"Benchmark Kryo Unsafe vs safe Serialization") {
+ Seq (true, false).foreach (runBenchmark)
+ benchmark.run()
+
+ // scalastyle:off
+ /*
+ Benchmark Kryo Unsafe vs safe Serialization: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ ------------------------------------------------------------------------------------------------
+ basicTypes: Int with unsafe:true 151 / 170 104.2 9.6 1.0X
+ basicTypes: Long with unsafe:true 175 / 191 89.8 11.1 0.9X
+ basicTypes: Float with unsafe:true 177 / 184 88.8 11.3 0.9X
+ basicTypes: Double with unsafe:true 193 / 216 81.4 12.3 0.8X
+ Array: Int with unsafe:true 513 / 587 30.7 32.6 0.3X
+ Array: Long with unsafe:true 1211 / 1358 13.0 77.0 0.1X
+ Array: Float with unsafe:true 890 / 964 17.7 56.6 0.2X
+ Array: Double with unsafe:true 1335 / 1428 11.8 84.9 0.1X
+ Map of string->Double with unsafe:true 931 / 988 16.9 59.2 0.2X
+ basicTypes: Int with unsafe:false 197 / 217 79.9 12.5 0.8X
+ basicTypes: Long with unsafe:false 219 / 240 71.8 13.9 0.7X
+ basicTypes: Float with unsafe:false 208 / 217 75.7 13.2 0.7X
+ basicTypes: Double with unsafe:false 208 / 225 75.6 13.2 0.7X
+ Array: Int with unsafe:false 2559 / 2681 6.1 162.7 0.1X
+ Array: Long with unsafe:false 3425 / 3516 4.6 217.8 0.0X
+ Array: Float with unsafe:false 2025 / 2134 7.8 128.7 0.1X
+ Array: Double with unsafe:false 2241 / 2358 7.0 142.5 0.1X
+ Map of string->Double with unsafe:false 1044 / 1085 15.1 66.4 0.1X
+ */
+ // scalastyle:on
+ }
+
+ private def runBenchmark(useUnsafe: Boolean): Unit = {
+ def check[T: ClassTag](t: T, ser: SerializerInstance): Int = {
+ if (ser.deserialize[T](ser.serialize(t)) === t) 1 else 0
+ }
+
+ // Benchmark Primitives
+ val basicTypeCount = 1000000
+ def basicTypes[T: ClassTag](name: String, gen: () => T): Unit = {
+ lazy val ser = createSerializer(useUnsafe)
+ val arrayOfBasicType: Array[T] = Array.fill(basicTypeCount)(gen())
+
+ benchmark.addCase(s"basicTypes: $name with unsafe:$useUnsafe") { _ =>
+ var sum = 0L
+ var i = 0
+ while (i < basicTypeCount) {
+ sum += check(arrayOfBasicType(i), ser)
+ i += 1
+ }
+ sum
+ }
+ }
+ basicTypes("Int", Random.nextInt)
+ basicTypes("Long", Random.nextLong)
+ basicTypes("Float", Random.nextFloat)
+ basicTypes("Double", Random.nextDouble)
+
+ // Benchmark Array of Primitives
+ val arrayCount = 10000
+ def basicTypeArray[T: ClassTag](name: String, gen: () => T): Unit = {
+ lazy val ser = createSerializer(useUnsafe)
+ val arrayOfArrays: Array[Array[T]] =
+ Array.fill(arrayCount)(Array.fill[T](Random.nextInt(arrayCount))(gen()))
+
+ benchmark.addCase(s"Array: $name with unsafe:$useUnsafe") { _ =>
+ var sum = 0L
+ var i = 0
+ while (i < arrayCount) {
+ val arr = arrayOfArrays(i)
+ sum += check(arr, ser)
+ i += 1
+ }
+ sum
+ }
+ }
+ basicTypeArray("Int", Random.nextInt)
+ basicTypeArray("Long", Random.nextLong)
+ basicTypeArray("Float", Random.nextFloat)
+ basicTypeArray("Double", Random.nextDouble)
+
+ // Benchmark Maps
+ val mapsCount = 1000
+ lazy val ser = createSerializer(useUnsafe)
+ val arrayOfMaps: Array[Map[String, Double]] = Array.fill(mapsCount) {
+ Array.fill(Random.nextInt(mapsCount)) {
+ (Random.nextString(mapsCount / 10), Random.nextDouble())
+ }.toMap
+ }
+
+ benchmark.addCase(s"Map of string->Double with unsafe:$useUnsafe") { _ =>
+ var sum = 0L
+ var i = 0
+ while (i < mapsCount) {
+ val map = arrayOfMaps(i)
+ sum += check(map, ser)
+ i += 1
+ }
+ sum
+ }
+ }
+
+ def createSerializer(useUnsafe: Boolean): SerializerInstance = {
+ val conf = new SparkConf()
+ conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ conf.set("spark.kryo.registrator", classOf[MyRegistrator].getName)
+ conf.set("spark.kryo.unsafe", useUnsafe.toString)
+
+ new KryoSerializer(conf).newInstance()
+ }
+
+}
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index 57a82312008e9..5040841811054 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -36,6 +36,7 @@ import org.apache.spark.util.Utils
class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrator", classOf[MyRegistrator].getName)
+ conf.set("spark.kryo.unsafe", "false")
test("SPARK-7392 configuration limits") {
val kryoBufferProperty = "spark.kryoserializer.buffer"
@@ -100,7 +101,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
check(Array("aaa", "bbb", null))
check(Array(true, false, true))
check(Array('a', 'b', 'c'))
- check(Array[Int]())
+ check(Array.empty[Int])
check(Array(Array("1", "2"), Array("1", "2", "3", "4")))
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala b/core/src/test/scala/org/apache/spark/serializer/UnsafeKryoSerializerSuite.scala
similarity index 54%
rename from sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala
rename to core/src/test/scala/org/apache/spark/serializer/UnsafeKryoSerializerSuite.scala
index 96e9054cd4876..d63a45ae4a6a9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/UnsafeKryoSerializerSuite.scala
@@ -15,23 +15,19 @@
* limitations under the License.
*/
-package org.apache.spark.sql.hive
+package org.apache.spark.serializer
-import org.apache.spark.sql.hive.test.TestHiveSingleton
-import org.apache.spark.sql.QueryTest
+class UnsafeKryoSerializerSuite extends KryoSerializerSuite {
-class HiveDataFrameSuite extends QueryTest with TestHiveSingleton {
- test("table name with schema") {
- // regression test for SPARK-11778
- spark.sql("create schema usrdb")
- spark.sql("create table usrdb.test(c int)")
- spark.read.table("usrdb.test")
- spark.sql("drop table usrdb.test")
- spark.sql("drop schema usrdb")
+ // This test suite should run all tests in KryoSerializerSuite with kryo unsafe.
+
+ override def beforeAll() {
+ conf.set("spark.kryo.unsafe", "true")
+ super.beforeAll()
}
- test("SPARK-15887: hive-site.xml should be loaded") {
- val hiveClient = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
- assert(hiveClient.getConf("hive.in.test", "") == "true")
+ override def afterAll() {
+ conf.set("spark.kryo.unsafe", "false")
+ super.afterAll()
}
}
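For reference, a user application would opt into the unsafe Kryo path roughly like this (illustrative only, not part of the patch):

    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.unsafe", "true")  // use Kryo's unsafe-based IO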
diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
index fd12a21b7927e..e5d408a167361 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -194,6 +194,22 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
sc.parallelize(1 to 10).map{x => Thread.sleep(10000); x}.countAsync()
}
+ withSpark(newSparkContext(killEnabled = true)) { sc =>
+ runSlowJob(sc)
+ eventually(timeout(5 seconds), interval(50 milliseconds)) {
+ goToUi(sc, "/jobs")
+ assert(hasKillLink)
+ }
+ }
+
+ withSpark(newSparkContext(killEnabled = false)) { sc =>
+ runSlowJob(sc)
+ eventually(timeout(5 seconds), interval(50 milliseconds)) {
+ goToUi(sc, "/jobs")
+ assert(!hasKillLink)
+ }
+ }
+
withSpark(newSparkContext(killEnabled = true)) { sc =>
runSlowJob(sc)
eventually(timeout(5 seconds), interval(50 milliseconds)) {
@@ -453,20 +469,24 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
}
test("kill stage POST/GET response is correct") {
- def getResponseCode(url: URL, method: String): Int = {
- val connection = url.openConnection().asInstanceOf[HttpURLConnection]
- connection.setRequestMethod(method)
- connection.connect()
- val code = connection.getResponseCode()
- connection.disconnect()
- code
+ withSpark(newSparkContext(killEnabled = true)) { sc =>
+ sc.parallelize(1 to 10).map{x => Thread.sleep(10000); x}.countAsync()
+ eventually(timeout(5 seconds), interval(50 milliseconds)) {
+ val url = new URL(
+ sc.ui.get.appUIAddress.stripSuffix("/") + "/stages/stage/kill/?id=0")
+ // SPARK-6846: should be POST only but YARN AM doesn't proxy POST
+ getResponseCode(url, "GET") should be (200)
+ getResponseCode(url, "POST") should be (200)
+ }
}
+ }
+ test("kill job POST/GET response is correct") {
withSpark(newSparkContext(killEnabled = true)) { sc =>
sc.parallelize(1 to 10).map{x => Thread.sleep(10000); x}.countAsync()
eventually(timeout(5 seconds), interval(50 milliseconds)) {
val url = new URL(
- sc.ui.get.appUIAddress.stripSuffix("/") + "/stages/stage/kill/?id=0&terminate=true")
+ sc.ui.get.appUIAddress.stripSuffix("/") + "/jobs/job/kill/?id=0")
// SPARK-6846: should be POST only but YARN AM doesn't proxy POST
getResponseCode(url, "GET") should be (200)
getResponseCode(url, "POST") should be (200)
@@ -651,6 +671,17 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
}
}
+ def getResponseCode(url: URL, method: String): Int = {
+ val connection = url.openConnection().asInstanceOf[HttpURLConnection]
+ connection.setRequestMethod(method)
+ try {
+ connection.connect()
+ connection.getResponseCode()
+ } finally {
+ connection.disconnect()
+ }
+ }
+
def goToUi(sc: SparkContext, path: String): Unit = {
goToUi(sc.ui.get, path)
}
diff --git a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
index 4fa9f9a8f590f..7e2da8e141532 100644
--- a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
@@ -20,11 +20,13 @@ package org.apache.spark.util
import java.io._
import java.nio.charset.StandardCharsets
import java.util.concurrent.CountDownLatch
+import java.util.zip.GZIPInputStream
import scala.collection.mutable.HashSet
import scala.reflect._
import com.google.common.io.Files
+import org.apache.commons.io.IOUtils
import org.apache.log4j.{Appender, Level, Logger}
import org.apache.log4j.spi.LoggingEvent
import org.mockito.ArgumentCaptor
@@ -72,6 +74,25 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
testRolling(appender, testOutputStream, textToAppend, rolloverIntervalMillis)
}
+ test("rolling file appender - time-based rolling (compressed)") {
+ // setup input stream and appender
+ val testOutputStream = new PipedOutputStream()
+ val testInputStream = new PipedInputStream(testOutputStream, 100 * 1000)
+ val rolloverIntervalMillis = 100
+ val durationMillis = 1000
+ val numRollovers = durationMillis / rolloverIntervalMillis
+ val textToAppend = (1 to numRollovers).map( _.toString * 10 )
+
+ val sparkConf = new SparkConf()
+ sparkConf.set("spark.executor.logs.rolling.enableCompression", "true")
+ val appender = new RollingFileAppender(testInputStream, testFile,
+ new TimeBasedRollingPolicy(rolloverIntervalMillis, s"--HH-mm-ss-SSSS", false),
+ sparkConf, 10)
+
+ testRolling(
+ appender, testOutputStream, textToAppend, rolloverIntervalMillis, isCompressed = true)
+ }
+
test("rolling file appender - size-based rolling") {
// setup input stream and appender
val testOutputStream = new PipedOutputStream()
@@ -89,6 +110,25 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
}
}
+ test("rolling file appender - size-based rolling (compressed)") {
+ // setup input stream and appender
+ val testOutputStream = new PipedOutputStream()
+ val testInputStream = new PipedInputStream(testOutputStream, 100 * 1000)
+ val rolloverSize = 1000
+ val textToAppend = (1 to 3).map( _.toString * 1000 )
+
+ val sparkConf = new SparkConf()
+ sparkConf.set("spark.executor.logs.rolling.enableCompression", "true")
+ val appender = new RollingFileAppender(testInputStream, testFile,
+ new SizeBasedRollingPolicy(rolloverSize, false), sparkConf, 99)
+
+ val files = testRolling(appender, testOutputStream, textToAppend, 0, isCompressed = true)
+ files.foreach { file =>
+ logInfo(file.toString + ": " + file.length + " bytes")
+ assert(file.length < rolloverSize)
+ }
+ }
+
test("rolling file appender - cleaning") {
// setup input stream and appender
val testOutputStream = new PipedOutputStream()
@@ -273,7 +313,8 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
appender: FileAppender,
outputStream: OutputStream,
textToAppend: Seq[String],
- sleepTimeBetweenTexts: Long
+ sleepTimeBetweenTexts: Long,
+ isCompressed: Boolean = false
): Seq[File] = {
// send data to appender through the input stream, and wait for the data to be written
val expectedText = textToAppend.mkString("")
@@ -290,10 +331,23 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
// verify whether all the data written to rolled over files is same as expected
val generatedFiles = RollingFileAppender.getSortedRolledOverFiles(
testFile.getParentFile.toString, testFile.getName)
- logInfo("Filtered files: \n" + generatedFiles.mkString("\n"))
+ logInfo("Generated files: \n" + generatedFiles.mkString("\n"))
assert(generatedFiles.size > 1)
+ if (isCompressed) {
+ assert(
+ generatedFiles.filter(_.getName.endsWith(RollingFileAppender.GZIP_LOG_SUFFIX)).size > 0)
+ }
val allText = generatedFiles.map { file =>
- Files.toString(file, StandardCharsets.UTF_8)
+ if (file.getName.endsWith(RollingFileAppender.GZIP_LOG_SUFFIX)) {
+ val inputStream = new GZIPInputStream(new FileInputStream(file))
+ try {
+ IOUtils.toString(inputStream, StandardCharsets.UTF_8)
+ } finally {
+ IOUtils.closeQuietly(inputStream)
+ }
+ } else {
+ Files.toString(file, StandardCharsets.UTF_8)
+ }
}.mkString("")
assert(allText === expectedText)
generatedFiles
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index bc28b2d9cb831..15ef32f21d90c 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -25,11 +25,13 @@ import java.nio.charset.StandardCharsets
import java.text.DecimalFormatSymbols
import java.util.Locale
import java.util.concurrent.TimeUnit
+import java.util.zip.GZIPOutputStream
import scala.collection.mutable.ListBuffer
import scala.util.Random
import com.google.common.io.Files
+import org.apache.commons.io.IOUtils
import org.apache.commons.lang3.SystemUtils
import org.apache.commons.math3.stat.inference.ChiSquareTest
import org.apache.hadoop.conf.Configuration
@@ -274,65 +276,109 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
assert(str(10 * hour + 59 * minute + 59 * second + 999) === "11" + sep + "00 h")
}
- test("reading offset bytes of a file") {
+ def getSuffix(isCompressed: Boolean): String = {
+ if (isCompressed) {
+ ".gz"
+ } else {
+ ""
+ }
+ }
+
+ def writeLogFile(path: String, content: Array[Byte]): Unit = {
+ val outputStream = if (path.endsWith(".gz")) {
+ new GZIPOutputStream(new FileOutputStream(path))
+ } else {
+ new FileOutputStream(path)
+ }
+ IOUtils.write(content, outputStream)
+ outputStream.close()
+ content.size
+ }
+
+ private val workerConf = new SparkConf()
+
+ def testOffsetBytes(isCompressed: Boolean): Unit = {
val tmpDir2 = Utils.createTempDir()
- val f1Path = tmpDir2 + "/f1"
- val f1 = new FileOutputStream(f1Path)
- f1.write("1\n2\n3\n4\n5\n6\n7\n8\n9\n".getBytes(StandardCharsets.UTF_8))
- f1.close()
+ val suffix = getSuffix(isCompressed)
+ val f1Path = tmpDir2 + "/f1" + suffix
+ writeLogFile(f1Path, "1\n2\n3\n4\n5\n6\n7\n8\n9\n".getBytes(StandardCharsets.UTF_8))
+ val f1Length = Utils.getFileLength(new File(f1Path), workerConf)
// Read first few bytes
- assert(Utils.offsetBytes(f1Path, 0, 5) === "1\n2\n3")
+ assert(Utils.offsetBytes(f1Path, f1Length, 0, 5) === "1\n2\n3")
// Read some middle bytes
- assert(Utils.offsetBytes(f1Path, 4, 11) === "3\n4\n5\n6")
+ assert(Utils.offsetBytes(f1Path, f1Length, 4, 11) === "3\n4\n5\n6")
// Read last few bytes
- assert(Utils.offsetBytes(f1Path, 12, 18) === "7\n8\n9\n")
+ assert(Utils.offsetBytes(f1Path, f1Length, 12, 18) === "7\n8\n9\n")
// Read some nonexistent bytes in the beginning
- assert(Utils.offsetBytes(f1Path, -5, 5) === "1\n2\n3")
+ assert(Utils.offsetBytes(f1Path, f1Length, -5, 5) === "1\n2\n3")
// Read some nonexistent bytes at the end
- assert(Utils.offsetBytes(f1Path, 12, 22) === "7\n8\n9\n")
+ assert(Utils.offsetBytes(f1Path, f1Length, 12, 22) === "7\n8\n9\n")
// Read some nonexistent bytes on both ends
- assert(Utils.offsetBytes(f1Path, -3, 25) === "1\n2\n3\n4\n5\n6\n7\n8\n9\n")
+ assert(Utils.offsetBytes(f1Path, f1Length, -3, 25) === "1\n2\n3\n4\n5\n6\n7\n8\n9\n")
Utils.deleteRecursively(tmpDir2)
}
- test("reading offset bytes across multiple files") {
+ test("reading offset bytes of a file") {
+ testOffsetBytes(isCompressed = false)
+ }
+
+ test("reading offset bytes of a file (compressed)") {
+ testOffsetBytes(isCompressed = true)
+ }
+
+ def testOffsetBytesMultipleFiles(isCompressed: Boolean): Unit = {
val tmpDir = Utils.createTempDir()
- val files = (1 to 3).map(i => new File(tmpDir, i.toString))
- Files.write("0123456789", files(0), StandardCharsets.UTF_8)
- Files.write("abcdefghij", files(1), StandardCharsets.UTF_8)
- Files.write("ABCDEFGHIJ", files(2), StandardCharsets.UTF_8)
+ val suffix = getSuffix(isCompressed)
+ val files = (1 to 3).map(i => new File(tmpDir, i.toString + suffix)) :+ new File(tmpDir, "4")
+ writeLogFile(files(0).getAbsolutePath, "0123456789".getBytes(StandardCharsets.UTF_8))
+ writeLogFile(files(1).getAbsolutePath, "abcdefghij".getBytes(StandardCharsets.UTF_8))
+ writeLogFile(files(2).getAbsolutePath, "ABCDEFGHIJ".getBytes(StandardCharsets.UTF_8))
+ writeLogFile(files(3).getAbsolutePath, "9876543210".getBytes(StandardCharsets.UTF_8))
+ val fileLengths = files.map(Utils.getFileLength(_, workerConf))
// Read first few bytes in the 1st file
- assert(Utils.offsetBytes(files, 0, 5) === "01234")
+ assert(Utils.offsetBytes(files, fileLengths, 0, 5) === "01234")
// Read bytes within the 1st file
- assert(Utils.offsetBytes(files, 5, 8) === "567")
+ assert(Utils.offsetBytes(files, fileLengths, 5, 8) === "567")
// Read bytes across 1st and 2nd file
- assert(Utils.offsetBytes(files, 8, 18) === "89abcdefgh")
+ assert(Utils.offsetBytes(files, fileLengths, 8, 18) === "89abcdefgh")
// Read bytes across 1st, 2nd and 3rd file
- assert(Utils.offsetBytes(files, 5, 24) === "56789abcdefghijABCD")
+ assert(Utils.offsetBytes(files, fileLengths, 5, 24) === "56789abcdefghijABCD")
+
+ // Read bytes across 3rd and 4th file
+ assert(Utils.offsetBytes(files, fileLengths, 25, 35) === "FGHIJ98765")
// Read some nonexistent bytes in the beginning
- assert(Utils.offsetBytes(files, -5, 18) === "0123456789abcdefgh")
+ assert(Utils.offsetBytes(files, fileLengths, -5, 18) === "0123456789abcdefgh")
// Read some nonexistent bytes at the end
- assert(Utils.offsetBytes(files, 18, 35) === "ijABCDEFGHIJ")
+ assert(Utils.offsetBytes(files, fileLengths, 18, 45) === "ijABCDEFGHIJ9876543210")
// Read some nonexistent bytes on both ends
- assert(Utils.offsetBytes(files, -5, 35) === "0123456789abcdefghijABCDEFGHIJ")
+ assert(Utils.offsetBytes(files, fileLengths, -5, 45) ===
+ "0123456789abcdefghijABCDEFGHIJ9876543210")
Utils.deleteRecursively(tmpDir)
}
+ test("reading offset bytes across multiple files") {
+ testOffsetBytesMultipleFiles(isCompressed = false)
+ }
+
+ test("reading offset bytes across multiple files (compressed)") {
+ testOffsetBytesMultipleFiles(isCompressed = true)
+ }
+
test("deserialize long value") {
val testval : Long = 9730889947L
val bbuf = ByteBuffer.allocate(8)
@@ -350,6 +396,16 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
assert(Utils.getIteratorSize(iterator) === 5L)
}
+ test("getIteratorZipWithIndex") {
+ val iterator = Utils.getIteratorZipWithIndex(Iterator(0, 1, 2), -1L + Int.MaxValue)
+ assert(iterator.toArray === Array(
+ (0, -1L + Int.MaxValue), (1, 0L + Int.MaxValue), (2, 1L + Int.MaxValue)
+ ))
+ intercept[IllegalArgumentException] {
+ Utils.getIteratorZipWithIndex(Iterator(0, 1, 2), -1L)
+ }
+ }
+
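The test above pins down the contract of Utils.getIteratorZipWithIndex: it behaves like zipWithIndex with a caller-supplied Long starting index and rejects negative starts. A sketch consistent with that contract (an assumption, not necessarily the real implementation):

    // Zip an iterator with a Long index starting at startIndex (must be >= 0).
    def getIteratorZipWithIndex[T](iter: Iterator[T], startIndex: Long): Iterator[(T, Long)] = {
      require(startIndex >= 0, "startIndex should be >= 0.")  // IllegalArgumentException on negative starts
      new Iterator[(T, Long)] {
        private var index = startIndex - 1L
        override def hasNext: Boolean = iter.hasNext
        override def next(): (T, Long) = {
          index += 1L
          (iter.next(), index)
        }
      }
    }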
test("doesDirectoryContainFilesNewerThan") {
// create some temporary directories and files
val parent: File = Utils.createTempDir()
@@ -790,14 +846,11 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
test("Set Spark CallerContext") {
val context = "test"
- try {
+ new CallerContext(context).setCurrentContext()
+ if (CallerContext.callerContextSupported) {
val callerContext = Utils.classForName("org.apache.hadoop.ipc.CallerContext")
- assert(new CallerContext(context).setCurrentContext())
assert(s"SPARK_$context" ===
callerContext.getMethod("getCurrent").invoke(null).toString)
- } catch {
- case e: ClassNotFoundException =>
- assert(!new CallerContext(context).setCurrentContext())
}
}
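The rewritten test relies on a CallerContext.callerContextSupported flag instead of catching ClassNotFoundException at the call site. One way such a flag could be computed (a sketch under that assumption; the real check may also consult the Hadoop configuration):

    // Probe once, via reflection, whether Hadoop's CallerContext API is on the classpath.
    val callerContextSupported: Boolean =
      try {
        Class.forName("org.apache.hadoop.ipc.CallerContext")
        Class.forName("org.apache.hadoop.ipc.CallerContext$Builder")
        true
      } catch {
        case _: ClassNotFoundException => false
      }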
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index 2a568cc8010db..dfe4eb9f8bc65 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -40,7 +40,7 @@ commons-digester-1.8.jar
commons-httpclient-3.1.jar
commons-io-2.4.jar
commons-lang-2.6.jar
-commons-lang3-3.3.2.jar
+commons-lang3-3.5.jar
commons-logging-1.1.3.jar
commons-math3-3.4.1.jar
commons-net-2.2.jar
@@ -157,7 +157,7 @@ parquet-jackson-1.9.0-palantir2.jar
pmml-model-1.2.15.jar
pmml-schema-1.2.15.jar
protobuf-java-2.5.0.jar
-py4j-0.10.3.jar
+py4j-0.10.4.jar
pyrolite-4.13.jar
scala-compiler-2.11.8.jar
scala-library-2.11.8.jar
diff --git a/docs/building-spark.md b/docs/building-spark.md
index f5acee6b90059..ebe46a42a15c6 100644
--- a/docs/building-spark.md
+++ b/docs/building-spark.md
@@ -217,9 +217,8 @@ For help in setting up IntelliJ IDEA or Eclipse for Spark development, and troub
Tests are run by default via the [ScalaTest Maven plugin](http://www.scalatest.org/user_guide/using_the_scalatest_maven_plugin).
Note that tests should not be run as root or an admin user.
-Some of the tests require Spark to be packaged first, so always run `mvn package` with `-DskipTests` the first time. The following is an example of a correct (build, test) sequence:
+The following is an example of a command to run the tests:
- ./build/mvn -Pyarn -Phadoop-2.3 -DskipTests -Phive -Phive-thriftserver clean package
./build/mvn -Pyarn -Phadoop-2.3 -Phive -Phive-thriftserver test
The ScalaTest plugin also supports running only a specific Scala test suite as follows:
@@ -233,9 +232,8 @@ or a Java test:
## Testing with SBT
-Some of the tests require Spark to be packaged first, so always run `build/sbt package` the first time. The following is an example of a correct (build, test) sequence:
+The following is an example of a command to run the tests:
- ./build/sbt -Pyarn -Phadoop-2.3 -Phive -Phive-thriftserver package
./build/sbt -Pyarn -Phadoop-2.3 -Phive -Phive-thriftserver test
To run only a specific test suite as follows:
diff --git a/docs/configuration.md b/docs/configuration.md
index 373e22d71a872..780fc94908d38 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -293,6 +293,14 @@ Apart from these, the following properties are also available, and may be useful
Older log files will be deleted. Disabled by default.
|
+
@@ -591,3 +591,7 @@ You can inspect the search path in R with [`search()`](https://stat.ethz.ch/R-ma
- The method `registerTempTable` has been deprecated to be replaced by `createOrReplaceTempView`.
- The method `dropTempTable` has been deprecated to be replaced by `dropTempView`.
- The `sc` SparkContext parameter is no longer required for these functions: `setJobGroup`, `clearJobGroup`, `cancelJobGroup`
+
+## Upgrading to SparkR 2.1.0
+
+ - `join` no longer performs Cartesian Product by default, use `crossJoin` instead.
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index d0f43ab0a9cc9..b9be7a7545ef8 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -140,7 +140,7 @@ As an example, the following creates a DataFrame based on the content of a JSON
## Untyped Dataset Operations (aka DataFrame Operations)
-DataFrames provide a domain-specific language for structured data manipulation in [Scala](api/scala/index.html#org.apache.spark.sql.Dataset), [Java](api/java/index.html?org/apache/spark/sql/Dataset.html), [Python](api/python/pyspark.sql.html#pyspark.sql.DataFrame) and [R](api/R/DataFrame.html).
+DataFrames provide a domain-specific language for structured data manipulation in [Scala](api/scala/index.html#org.apache.spark.sql.Dataset), [Java](api/java/index.html?org/apache/spark/sql/Dataset.html), [Python](api/python/pyspark.sql.html#pyspark.sql.DataFrame) and [R](api/R/SparkDataFrame.html).
As mentioned above, in Spark 2.0, DataFrames are just Dataset of `Row`s in Scala and Java API. These operations are also referred as "untyped transformations" in contrast to "typed transformations" come with strongly typed Scala/Java Datasets.
@@ -316,7 +316,7 @@ Serializable and has getters and setters for all of its fields.
Spark SQL can convert an RDD of Row objects to a DataFrame, inferring the datatypes. Rows are constructed by passing a list of
key/value pairs as kwargs to the Row class. The keys of this list define the column names of the table,
-and the types are inferred by sampling the whole datase, similar to the inference that is performed on JSON files.
+and the types are inferred by sampling the whole dataset, similar to the inference that is performed on JSON files.
{% include_example schema_inferring python/sql/basic.py %}
@@ -422,8 +422,8 @@ In the simplest form, the default data source (`parquet` unless otherwise config
You can also manually specify the data source that will be used along with any extra options
that you would like to pass to the data source. Data sources are specified by their fully qualified
name (i.e., `org.apache.spark.sql.parquet`), but for built-in sources you can also use their short
-names (`json`, `parquet`, `jdbc`). DataFrames loaded from any data source type can be converted into other types
-using this syntax.
+names (`json`, `parquet`, `jdbc`, `orc`, `libsvm`, `csv`, `text`). DataFrames loaded from any data
+source type can be converted into other types using this syntax.
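For instance, data loaded through one short name can be written back out through another (a sketch using made-up paths, not one of the guide's bundled examples):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("short-name-sources").getOrCreate()
    // Read with the built-in "csv" short name, then persist through the "parquet" one.
    val people = spark.read.format("csv").option("header", "true").load("examples/people.csv")
    people.write.format("parquet").save("examples/people.parquet")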
+ // Import dependencies and create kafka params as in Create Direct Stream above
+
+ OffsetRange[] offsetRanges = {
+ // topic, partition, inclusive starting offset, exclusive ending offset
+ OffsetRange.create("test", 0, 0, 100),
+ OffsetRange.create("test", 1, 0, 100)
+ };
+
+ JavaRDD<ConsumerRecord<String, String>> rdd = KafkaUtils.createRDD(
+ sparkContext,
+ kafkaParams,
+ offsetRanges,
+ LocationStrategies.PreferConsistent()
+ );
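For reference, the Scala form of the same batch read is roughly the following sketch; it assumes a SparkContext and the Kafka parameters from the Create Direct Stream section, passed here as a java.util.Map since that is what createRDD expects:

    import java.{util => ju}
    import org.apache.spark.SparkContext
    import org.apache.spark.streaming.kafka010.{KafkaUtils, LocationStrategies, OffsetRange}

    def readBatch(sparkContext: SparkContext, kafkaParams: ju.Map[String, Object]) = {
      val offsetRanges = Array(
        // topic, partition, inclusive starting offset, exclusive ending offset
        OffsetRange("test", 0, 0, 100),
        OffsetRange("test", 1, 0, 100)
      )
      KafkaUtils.createRDD[String, String](
        sparkContext, kafkaParams, offsetRanges, LocationStrategies.PreferConsistent)
    }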
@@ -102,6 +153,20 @@ Note that you cannot use `PreferBrokers`, because without the stream there is no
}
+ // The details depend on your data store, but the general idea looks like this
+
+ // begin from the offsets committed to the database
+ Map<TopicPartition, Long> fromOffsets = new HashMap<>();
+ for (resultSet : selectOffsetsFromYourDatabase) {
+   fromOffsets.put(new TopicPartition(resultSet.string("topic"), resultSet.int("partition")), resultSet.long("offset"));
+ }
+
+ JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
+ streamingContext,
+ LocationStrategies.PreferConsistent(),
+ ConsumerStrategies.Assign(fromOffsets.keySet(), kafkaParams, fromOffsets)
+ );
+
+ stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
+ @Override
+ public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
+ OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
+
+ Object results = yourCalculation(rdd);
+
+ // begin your transaction
+
+ // update results
+ // update offsets where the end of existing offsets matches the beginning of this batch of offsets
+ // assert that offsets were updated correctly
+
+ // end your transaction
+ }
+ });
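The same exactly-once idea reads a little more compactly in Scala; the sketch below assumes the direct stream created above and leaves yourCalculation and the transaction steps as placeholders for your own data store:

    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.kafka010.HasOffsetRanges

    def storeTransactionally(
        stream: InputDStream[ConsumerRecord[String, String]],
        yourCalculation: RDD[ConsumerRecord[String, String]] => Any): Unit = {
      stream.foreachRDD { rdd =>
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        val results = yourCalculation(rdd)  // placeholder for your own processing
        // begin your transaction:
        //   write `results` and `offsetRanges` atomically, checking that the stored
        //   offsets end where this batch begins, then commit
        // end your transaction
      }
    }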
@@ -184,6 +288,14 @@ The new Kafka consumer [supports SSL](http://kafka.apache.org/documentation.html
)