
Commit 621267b

Merge branch 'master' of github.com:apache/spark into configure-ports
2 parents 8a6b820 + 82624e2

22 files changed: +377, -174 lines

core/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -150,7 +150,7 @@
     <dependency>
       <groupId>org.json4s</groupId>
       <artifactId>json4s-jackson_${scala.binary.version}</artifactId>
-      <version>3.2.6</version>
+      <version>3.2.10</version>
     </dependency>
     <dependency>
       <groupId>colt</groupId>
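For readers building with sbt rather than Maven, the bumped dependency would be declared as below; this is an illustrative equivalent, not part of the patch (%% appends scala.binary.version, matching the _${scala.binary.version} suffix in the pom):

libraryDependencies += "org.json4s" %% "json4s-jackson" % "3.2.10"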

core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala

Lines changed: 10 additions & 3 deletions
@@ -96,17 +96,23 @@ class JdbcRDD[T: ClassTag](
 
   override def close() {
     try {
-      if (null != rs && ! rs.isClosed()) rs.close()
+      if (null != rs && ! rs.isClosed()) {
+        rs.close()
+      }
     } catch {
       case e: Exception => logWarning("Exception closing resultset", e)
     }
     try {
-      if (null != stmt && ! stmt.isClosed()) stmt.close()
+      if (null != stmt && ! stmt.isClosed()) {
+        stmt.close()
+      }
     } catch {
       case e: Exception => logWarning("Exception closing statement", e)
     }
     try {
-      if (null != conn && ! stmt.isClosed()) conn.close()
+      if (null != conn && ! conn.isClosed()) {
+        conn.close()
+      }
       logInfo("closed connection")
     } catch {
       case e: Exception => logWarning("Exception closing connection", e)
@@ -120,3 +126,4 @@ object JdbcRDD {
     Array.tabulate[Object](rs.getMetaData.getColumnCount)(i => rs.getObject(i + 1))
   }
 }
+
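Beyond wrapping the one-line guards in braces, the substantive fix is in the last hunk: the old code checked stmt.isClosed() before closing conn, so connection cleanup depended on the statement's state rather than the connection's. A minimal standalone sketch of the corrected guarded-close pattern; the closeQuietly helper and println logging are illustrative, not Spark code:

import java.sql.{Connection, ResultSet, Statement}

// Illustrative helper: close a JDBC resource only if it is non-null and still
// open, reporting failures instead of propagating them (as close() does above).
def closeQuietly(name: String)(isClosed: => Boolean, close: => Unit): Unit = {
  try {
    if (!isClosed) close
  } catch {
    case e: Exception => println(s"Exception closing $name: $e")
  }
}

def closeAll(rs: ResultSet, stmt: Statement, conn: Connection): Unit = {
  if (rs != null) closeQuietly("resultset")(rs.isClosed, rs.close())
  if (stmt != null) closeQuietly("statement")(stmt.isClosed, stmt.close())
  if (conn != null) closeQuietly("connection")(conn.isClosed, conn.close()) // conn, not stmt
}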

core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala

Lines changed: 1 addition & 1 deletion
@@ -40,7 +40,7 @@ private[spark] class SortShuffleWriter[K, V, C](
   private val ser = Serializer.getSerializer(dep.serializer.orNull)
 
   private val conf = SparkEnv.get.conf
-  private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
+  private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024
 
   private var sorter: ExternalSorter[K, V, _] = null
   private var outputFile: File = null

core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala

Lines changed: 1 addition & 1 deletion
@@ -73,7 +73,7 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
   val sortBasedShuffle =
     conf.get("spark.shuffle.manager", "") == classOf[SortShuffleManager].getName
 
-  private val bufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
+  private val bufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024
 
   /**
    * Contains all the state related to a particular shuffle. This includes a pool of unused

core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala

Lines changed: 1 addition & 1 deletion
@@ -101,7 +101,7 @@ class ExternalAppendOnlyMap[K, V, C](
   private var _memoryBytesSpilled = 0L
   private var _diskBytesSpilled = 0L
 
-  private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
+  private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024
   private val keyComparator = new HashComparator[K]
   private val ser = serializer.newInstance()

core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala

Lines changed: 1 addition & 1 deletion
@@ -84,7 +84,7 @@ private[spark] class ExternalSorter[K, V, C](
 
   private val conf = SparkEnv.get.conf
   private val spillingEnabled = conf.getBoolean("spark.shuffle.spill", true)
-  private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
+  private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024
 
   // Size of object batches when reading/writing from serializers.
   //
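The four Scala diffs above make one coordinated change: the per-stream shuffle buffer default drops from 100 KB to 32 KB, always computed as conf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024 bytes. A standalone sketch of what that buffer buys, using plain java.io in place of Spark's internal disk writer (the temp file and payload are illustrative):

import java.io.{BufferedOutputStream, File, FileOutputStream}

// Byte count derived from the KB-valued config key, as in the diffs above.
val fileBufferSize = 32 * 1024 // new default: 32 KB per open shuffle file

// Buffering batches many small writes into few large ones, reducing disk
// seeks and system calls (see the configuration.md entry below).
val tmp = File.createTempFile("shuffle-demo", ".bin")
val out = new BufferedOutputStream(new FileOutputStream(tmp), fileBufferSize)
out.write(Array.fill[Byte](4096)(1))
out.close()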

docs/configuration.md

Lines changed: 1 addition & 1 deletion
@@ -266,7 +266,7 @@ Apart from these, the following properties are also available, and may be useful
 </tr>
 <tr>
   <td><code>spark.shuffle.file.buffer.kb</code></td>
-  <td>100</td>
+  <td>32</td>
   <td>
     Size of the in-memory buffer for each shuffle file output stream, in kilobytes. These buffers
     reduce the number of disk seeks and system calls made in creating intermediate shuffle files.
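Since every concurrently open shuffle file holds one such buffer, the smaller default lowers peak memory when a task writes many files at once; jobs tuned for the old behavior can set the key explicitly. A minimal sketch of restoring the previous 100 KB value (the app name is hypothetical):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("shuffle-buffer-demo") // hypothetical
  .set("spark.shuffle.file.buffer.kb", "100") // override the new 32 KB default
val sc = new SparkContext(conf)
// Shuffle writers read this back as conf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024 bytes.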

mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala

Lines changed: 40 additions & 15 deletions
@@ -271,6 +271,7 @@ class PythonMLLibAPI extends Serializable {
       .setNumIterations(numIterations)
       .setRegParam(regParam)
       .setStepSize(stepSize)
+      .setMiniBatchFraction(miniBatchFraction)
     if (regType == "l2") {
       lrAlg.optimizer.setUpdater(new SquaredL2Updater)
     } else if (regType == "l1") {
@@ -341,16 +342,27 @@ class PythonMLLibAPI extends Serializable {
       stepSize: Double,
       regParam: Double,
       miniBatchFraction: Double,
-      initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
+      initialWeightsBA: Array[Byte],
+      regType: String,
+      intercept: Boolean): java.util.List[java.lang.Object] = {
+    val SVMAlg = new SVMWithSGD()
+    SVMAlg.setIntercept(intercept)
+    SVMAlg.optimizer
+      .setNumIterations(numIterations)
+      .setRegParam(regParam)
+      .setStepSize(stepSize)
+      .setMiniBatchFraction(miniBatchFraction)
+    if (regType == "l2") {
+      SVMAlg.optimizer.setUpdater(new SquaredL2Updater)
+    } else if (regType == "l1") {
+      SVMAlg.optimizer.setUpdater(new L1Updater)
+    } else if (regType != "none") {
+      throw new java.lang.IllegalArgumentException("Invalid value for 'regType' parameter."
+        + " Can only be initialized using the following string values: [l1, l2, none].")
+    }
     trainRegressionModel(
       (data, initialWeights) =>
-        SVMWithSGD.train(
-          data,
-          numIterations,
-          stepSize,
-          regParam,
-          miniBatchFraction,
-          initialWeights),
+        SVMAlg.run(data, initialWeights),
       dataBytesJRDD,
       initialWeightsBA)
   }
@@ -363,15 +375,28 @@ class PythonMLLibAPI extends Serializable {
       numIterations: Int,
       stepSize: Double,
       miniBatchFraction: Double,
-      initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
+      initialWeightsBA: Array[Byte],
+      regParam: Double,
+      regType: String,
+      intercept: Boolean): java.util.List[java.lang.Object] = {
+    val LogRegAlg = new LogisticRegressionWithSGD()
+    LogRegAlg.setIntercept(intercept)
+    LogRegAlg.optimizer
+      .setNumIterations(numIterations)
+      .setRegParam(regParam)
+      .setStepSize(stepSize)
+      .setMiniBatchFraction(miniBatchFraction)
+    if (regType == "l2") {
+      LogRegAlg.optimizer.setUpdater(new SquaredL2Updater)
+    } else if (regType == "l1") {
+      LogRegAlg.optimizer.setUpdater(new L1Updater)
+    } else if (regType != "none") {
+      throw new java.lang.IllegalArgumentException("Invalid value for 'regType' parameter."
+        + " Can only be initialized using the following string values: [l1, l2, none].")
+    }
     trainRegressionModel(
       (data, initialWeights) =>
-        LogisticRegressionWithSGD.train(
-          data,
-          numIterations,
-          stepSize,
-          miniBatchFraction,
-          initialWeights),
+        LogRegAlg.run(data, initialWeights),
       dataBytesJRDD,
       initialWeightsBA)
   }
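Both train methods now share one shape: construct the algorithm, set the intercept flag, configure the optimizer, then choose an updater from regType. A condensed sketch of that repeated dispatch; the setUpdaterFromRegType helper is an illustrative factoring, while GradientDescent, SquaredL2Updater, and L1Updater are the MLlib classes the patch uses:

import org.apache.spark.mllib.optimization.{GradientDescent, L1Updater, SquaredL2Updater}

// Illustrative helper mirroring the regType handling added in both methods.
def setUpdaterFromRegType(optimizer: GradientDescent, regType: String): Unit =
  regType match {
    case "l2" => optimizer.setUpdater(new SquaredL2Updater)
    case "l1" => optimizer.setUpdater(new L1Updater)
    case "none" => // keep the default, unregularized updater
    case _ =>
      throw new IllegalArgumentException("Invalid value for 'regType' parameter."
        + " Can only be initialized using the following string values: [l1, l2, none].")
  }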
