diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 57f9faf5ddd1..ee725be39f93 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -175,8 +175,13 @@ class SparkHadoopUtil extends Logging { private def getFileSystemThreadStatistics(path: Path, conf: Configuration): Seq[AnyRef] = { val qualifiedPath = path.getFileSystem(conf).makeQualified(path) val scheme = qualifiedPath.toUri().getScheme() - val stats = FileSystem.getAllStatistics().filter(_.getScheme().equals(scheme)) - stats.map(Utils.invoke(classOf[Statistics], _, "getThreadStatistics")) + if (scheme == null) { + Seq.empty + } else { + FileSystem.getAllStatistics + .filter { stats => scheme.equals(stats.getScheme()) } + .map(Utils.invoke(classOf[Statistics], _, "getThreadStatistics")) + } } private def getFileSystemThreadStatisticsMethod(methodName: String): Method = { diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala index 1ac7f4e448eb..2343e69815b9 100644 --- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala +++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala @@ -17,7 +17,7 @@ package org.apache.spark.io -import java.io.{InputStream, OutputStream} +import java.io.{IOException, InputStream, OutputStream} import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream} import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream} @@ -122,8 +122,53 @@ class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec { override def compressedOutputStream(s: OutputStream): OutputStream = { val blockSize = conf.getInt("spark.io.compression.snappy.block.size", 32768) - new SnappyOutputStream(s, blockSize) + new SnappyOutputStreamWrapper(new SnappyOutputStream(s, blockSize)) } override def 
compressedInputStream(s: InputStream): InputStream = new SnappyInputStream(s) } + +/** + * Wrapper over [[SnappyOutputStream]] which guards against write-after-close and double-close + * issues. See SPARK-7660 for more details. This wrapping can be removed if we upgrade to a version + * of snappy-java that contains the fix for https://github.com/xerial/snappy-java/issues/107. + */ +private final class SnappyOutputStreamWrapper(os: SnappyOutputStream) extends OutputStream { + + private[this] var closed: Boolean = false + + override def write(b: Int): Unit = { + if (closed) { + throw new IOException("Stream is closed") + } + os.write(b) + } + + override def write(b: Array[Byte]): Unit = { + if (closed) { + throw new IOException("Stream is closed") + } + os.write(b) + } + + override def write(b: Array[Byte], off: Int, len: Int): Unit = { + if (closed) { + throw new IOException("Stream is closed") + } + os.write(b, off, len) + } + + override def flush(): Unit = { + if (closed) { + throw new IOException("Stream is closed") + } + os.flush() + } + + override def close(): Unit = { + if (!closed) { + closed = true + os.close() + } + } +} diff --git a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala index ca226fd4e694..1b17c3ad359d 100644 --- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala @@ -22,6 +22,7 @@ import java.io.{FileWriter, PrintWriter, File} import org.apache.spark.SharedSparkContext import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.scheduler.{SparkListenerTaskEnd, SparkListener} +import org.apache.spark.util.Utils import org.scalatest.FunSuite import org.scalatest.matchers.ShouldMatchers @@ -106,4 +107,23 @@ class InputOutputMetricsSuite extends FunSuite with SharedSparkContext with Shou } } } + + test("getFileSystemThreadStatistics 
should guard against null schemes (SPARK-8062)") { + val tempDir = Utils.createTempDir() + val outPath = new File(tempDir, "outfile") + + // Intentionally call this method with a null scheme, which will store an entry for a FileSystem + // with a null scheme into Hadoop's global `FileSystem.statisticsTable`. + FileSystem.getStatistics(null, classOf[FileSystem]) + + // Prior to fixing SPARK-8062, this would fail with a NullPointerException in + // SparkHadoopUtil.getFileSystemThreadStatistics + try { + val rdd = sc.parallelize(Array("a", "b", "c", "d"), 2) + rdd.saveAsTextFile(outPath.toString) + sc.textFile(outPath.toString).count() + } finally { + Utils.deleteRecursively(tempDir) + } + } } diff --git a/docs/mllib-collaborative-filtering.md b/docs/mllib-collaborative-filtering.md index ef18cec9371d..57cf04b1f20e 100644 --- a/docs/mllib-collaborative-filtering.md +++ b/docs/mllib-collaborative-filtering.md @@ -102,7 +102,8 @@ other signals), you can use the `trainImplicit` method to get better results. 
{% highlight scala %} val alpha = 0.01 -val model = ALS.trainImplicit(ratings, rank, numIterations, alpha) +val lambda = 0.01 +val model = ALS.trainImplicit(ratings, rank, numIterations, lambda, alpha) {% endhighlight %} diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala b/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala index 8d092b6506d3..bd7894f184c4 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala @@ -51,7 +51,7 @@ object SparkPageRank { showWarning() val sparkConf = new SparkConf().setAppName("PageRank") - val iters = if (args.length > 0) args(1).toInt else 10 + val iters = if (args.length > 1) args(1).toInt else 10 val ctx = new SparkContext(sparkConf) val lines = ctx.textFile(args(0), 1) val links = lines.map{ s => diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala index 205d80dd0268..f2586ad8bc30 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala @@ -126,7 +126,7 @@ object DecisionTreeRunner { .text(s"input path to test dataset. If given, option fracTest is ignored." 
+ s" default: ${defaultParams.testInput}") .action((x, c) => c.copy(testInput = x)) - opt[String]("") + opt[String]("dataFormat") .text("data format: libsvm (default), dense (deprecated in Spark v1.1)") .action((x, c) => c.copy(dataFormat = x)) arg[String]("") diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/GradientBoostedTreesRunner.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/GradientBoostedTreesRunner.scala index 431ead8c0c16..7b0ded4a4264 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/GradientBoostedTreesRunner.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/GradientBoostedTreesRunner.scala @@ -68,7 +68,7 @@ object GradientBoostedTreesRunner { .text(s"input path to test dataset. If given, option fracTest is ignored." + s" default: ${defaultParams.testInput}") .action((x, c) => c.copy(testInput = x)) - opt[String]("") + opt[String]("dataFormat") .text("data format: libsvm (default), dense (deprecated in Spark v1.1)") .action((x, c) => c.copy(dataFormat = x)) arg[String]("") diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala index 5ab7e1aa58db..9b10ce6dec6e 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala @@ -174,7 +174,7 @@ class StreamingKMeans( /** Set the decay factor directly (for forgetful algorithms). 
*/ def setDecayFactor(a: Double): this.type = { - this.decayFactor = decayFactor + this.decayFactor = a this } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/NumericParser.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/NumericParser.scala index f7cba6c6cb62..629b0af4e63f 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/NumericParser.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/NumericParser.scala @@ -98,6 +98,8 @@ private[mllib] object NumericParser { } } else if (token == ")") { parsing = false + } else if (token.trim.isEmpty){ + // ignore whitespaces between delim chars, e.g. ", [" } else { // expecting a number items.append(parseDouble(token)) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/StreamingKMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/StreamingKMeansSuite.scala index 850c9fce507c..83ee4a96cc9a 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/StreamingKMeansSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/StreamingKMeansSuite.scala @@ -133,6 +133,13 @@ class StreamingKMeansSuite extends FunSuite with TestSuiteBase { assert(math.abs(c1) ~== 0.8 absTol 0.6) } + test("SPARK-7946 setDecayFactor") { + val kMeans = new StreamingKMeans() + assert(kMeans.decayFactor === 1.0) + kMeans.setDecayFactor(2.0) + assert(kMeans.decayFactor === 2.0) + } + def StreamingKMeansDataGenerator( numPoints: Int, numBatches: Int, diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/LabeledPointSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/LabeledPointSuite.scala index 110c44a7193f..fc08bac3bdbd 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/regression/LabeledPointSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/LabeledPointSuite.scala @@ -32,6 +32,11 @@ class LabeledPointSuite extends FunSuite { } } + test("parse labeled points with whitespaces") { + val point = 
LabeledPoint.parse("(0.0, [1.0, 2.0])") + assert(point === LabeledPoint(0.0, Vectors.dense(1.0, 2.0))) + } + test("parse labeled points with v0.9 format") { val point = LabeledPoint.parse("1.0,1.0 0.0 -2.0") assert(point === LabeledPoint(1.0, Vectors.dense(1.0, 0.0, -2.0))) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/NumericParserSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/NumericParserSuite.scala index f68fb95eac4e..5027311db7ff 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/util/NumericParserSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/util/NumericParserSuite.scala @@ -39,4 +39,11 @@ class NumericParserSuite extends FunSuite { } } } + + test("parser with whitespaces") { + val s = "(0.0, [1.0, 2.0])" + val parsed = NumericParser.parse(s).asInstanceOf[Seq[_]] + assert(parsed(0).asInstanceOf[Double] === 0.0) + assert(parsed(1).asInstanceOf[Array[Double]] === Array(1.0, 2.0)) + } } diff --git a/python/pyspark/mllib/__init__.py b/python/pyspark/mllib/__init__.py index c3217620e3c4..d9678122921d 100644 --- a/python/pyspark/mllib/__init__.py +++ b/python/pyspark/mllib/__init__.py @@ -22,7 +22,9 @@ # MLlib currently needs and NumPy 1.4+, so complain if lower import numpy -if numpy.version.version < '1.4': + +ver = [int(x) for x in numpy.version.version.split('.')[:2]] +if ver < [1, 4]: raise Exception("MLlib requires NumPy 1.4+") __all__ = ['classification', 'clustering', 'feature', 'linalg', 'random', diff --git a/sbin/start-master.sh b/sbin/start-master.sh index 17fff58f4f76..a7f5d5702fd8 100755 --- a/sbin/start-master.sh +++ b/sbin/start-master.sh @@ -22,6 +22,8 @@ sbin="`dirname "$0"`" sbin="`cd "$sbin"; pwd`" +ORIGINAL_ARGS="$@" + START_TACHYON=false while (( "$#" )); do @@ -53,7 +55,9 @@ if [ "$SPARK_MASTER_WEBUI_PORT" = "" ]; then SPARK_MASTER_WEBUI_PORT=8080 fi -"$sbin"/spark-daemon.sh start org.apache.spark.deploy.master.Master 1 --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port 
$SPARK_MASTER_WEBUI_PORT +"$sbin"/spark-daemon.sh start org.apache.spark.deploy.master.Master 1 \ + --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT \ + $ORIGINAL_ARGS if [ "$START_TACHYON" == "true" ]; then "$sbin"/../tachyon/bin/tachyon bootstrap-conf $SPARK_MASTER_IP diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index 3d39e6c1bd12..a76561756789 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -24,6 +24,7 @@ import org.apache.hadoop.hive.ql.lib.Node import org.apache.hadoop.hive.ql.metadata.Table import org.apache.hadoop.hive.ql.parse._ import org.apache.hadoop.hive.ql.plan.PlanUtils +import org.apache.hadoop.hive.ql.session.SessionState import org.apache.spark.sql.catalyst.SparkSQLParser import org.apache.spark.sql.catalyst.analysis._ @@ -224,12 +225,23 @@ private[hive] object HiveQl { * Otherwise, there will be Null pointer exception, * when retrieving properties form HiveConf. */ - val hContext = new Context(new HiveConf()) + val hContext = new Context(hiveConf) val node = ParseUtils.findRootNonNullToken((new ParseDriver).parse(sql, hContext)) hContext.clear() node } + /** + * Returns the HiveConf of the current SessionState, or a new HiveConf if there is none + */ + private[this] def hiveConf(): HiveConf = { + val ss = SessionState.get() // SessionState is lazily initialized, so it can be null here + if (ss == null) { + new HiveConf() + } else { + ss.getConf + } + } /** Returns a LogicalPlan for a given HiveQL string. */ def parseSql(sql: String): LogicalPlan = hqlParser(sql)