Commit 5ceaa8a

Davies Liu committed
Merge branch 'master' of github.com:apache/spark into binary
2 parents 1900085 + 2812815

21 files changed, +368 -125 lines changed

core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala

Lines changed: 14 additions & 11 deletions
@@ -211,20 +211,11 @@ class HadoopRDD[K, V](
 
       val split = theSplit.asInstanceOf[HadoopPartition]
       logInfo("Input split: " + split.inputSplit)
-      var reader: RecordReader[K, V] = null
       val jobConf = getJobConf()
-      val inputFormat = getInputFormat(jobConf)
-      HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(createTime),
-        context.stageId, theSplit.index, context.attemptId.toInt, jobConf)
-      reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
-
-      // Register an on-task-completion callback to close the input stream.
-      context.addTaskCompletionListener{ context => closeIfNeeded() }
-      val key: K = reader.createKey()
-      val value: V = reader.createValue()
 
       val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
-      // Find a function that will return the FileSystem bytes read by this thread.
+      // Find a function that will return the FileSystem bytes read by this thread. Do this before
+      // creating RecordReader, because RecordReader's constructor might read some bytes
       val bytesReadCallback = if (split.inputSplit.value.isInstanceOf[FileSplit]) {
         SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(
           split.inputSplit.value.asInstanceOf[FileSplit].getPath, jobConf)
@@ -234,6 +225,18 @@ class HadoopRDD[K, V](
       if (bytesReadCallback.isDefined) {
         context.taskMetrics.inputMetrics = Some(inputMetrics)
       }
+
+      var reader: RecordReader[K, V] = null
+      val inputFormat = getInputFormat(jobConf)
+      HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(createTime),
+        context.stageId, theSplit.index, context.attemptId.toInt, jobConf)
+      reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
+
+      // Register an on-task-completion callback to close the input stream.
+      context.addTaskCompletionListener{ context => closeIfNeeded() }
+      val key: K = reader.createKey()
+      val value: V = reader.createValue()
+
       var recordsSinceMetricsUpdate = 0
 
       override def getNext() = {
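
The reordering in this hunk (mirrored in NewHadoopRDD.scala below) matters because a RecordReader may already consume input in its constructor; a bytes-read callback captured only after construction would miss those bytes. A minimal, self-contained sketch of that pitfall, using purely hypothetical names rather than Spark's actual classes:

object CallbackOrderingSketch {
  // Stand-in for a thread-level filesystem byte counter (e.g. FileSystem statistics).
  private var bytesReadSoFar = 0L

  // Stand-in for a RecordReader whose constructor eagerly reads a file header.
  class HeaderReadingReader {
    bytesReadSoFar += 128L                       // pretend 128 bytes are read during construction
    def readRecord(): Unit = bytesReadSoFar += 64L
  }

  def main(args: Array[String]): Unit = {
    // Capture the baseline BEFORE constructing the reader, as the patch now does.
    val baseline = bytesReadSoFar
    val bytesReadCallback: () => Long = () => bytesReadSoFar - baseline

    val reader = new HeaderReadingReader()       // header bytes are included in the delta
    reader.readRecord()
    println(s"bytes read = ${bytesReadCallback()}")  // 192: 128 (constructor) + 64 (record)
  }
}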

core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala

Lines changed: 14 additions & 12 deletions
@@ -107,20 +107,10 @@ class NewHadoopRDD[K, V](
       val split = theSplit.asInstanceOf[NewHadoopPartition]
       logInfo("Input split: " + split.serializableHadoopSplit)
       val conf = confBroadcast.value.value
-      val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
-      val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
-      val format = inputFormatClass.newInstance
-      format match {
-        case configurable: Configurable =>
-          configurable.setConf(conf)
-        case _ =>
-      }
-      val reader = format.createRecordReader(
-        split.serializableHadoopSplit.value, hadoopAttemptContext)
-      reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
 
       val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
-      // Find a function that will return the FileSystem bytes read by this thread.
+      // Find a function that will return the FileSystem bytes read by this thread. Do this before
+      // creating RecordReader, because RecordReader's constructor might read some bytes
       val bytesReadCallback = if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit]) {
         SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(
           split.serializableHadoopSplit.value.asInstanceOf[FileSplit].getPath, conf)
@@ -131,6 +121,18 @@ class NewHadoopRDD[K, V](
         context.taskMetrics.inputMetrics = Some(inputMetrics)
       }
 
+      val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
+      val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
+      val format = inputFormatClass.newInstance
+      format match {
+        case configurable: Configurable =>
+          configurable.setConf(conf)
+        case _ =>
+      }
+      val reader = format.createRecordReader(
+        split.serializableHadoopSplit.value, hadoopAttemptContext)
+      reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
+
       // Register an on-task-completion callback to close the input stream.
       context.addTaskCompletionListener(context => close())
       var havePair = false

core/src/test/scala/org/apache/spark/metrics/InputMetricsSuite.scala

Lines changed: 25 additions & 2 deletions
@@ -27,7 +27,7 @@ import scala.collection.mutable.ArrayBuffer
 import java.io.{FileWriter, PrintWriter, File}
 
 class InputMetricsSuite extends FunSuite with SharedSparkContext {
-  test("input metrics when reading text file") {
+  test("input metrics when reading text file with single split") {
     val file = new File(getClass.getSimpleName + ".txt")
     val pw = new PrintWriter(new FileWriter(file))
     pw.println("some stuff")
@@ -48,6 +48,29 @@ class InputMetricsSuite extends FunSuite with SharedSparkContext {
     // Wait for task end events to come in
     sc.listenerBus.waitUntilEmpty(500)
     assert(taskBytesRead.length == 2)
-    assert(taskBytesRead.sum == file.length())
+    assert(taskBytesRead.sum >= file.length())
+  }
+
+  test("input metrics when reading text file with multiple splits") {
+    val file = new File(getClass.getSimpleName + ".txt")
+    val pw = new PrintWriter(new FileWriter(file))
+    for (i <- 0 until 10000) {
+      pw.println("some stuff")
+    }
+    pw.close()
+    file.deleteOnExit()
+
+    val taskBytesRead = new ArrayBuffer[Long]()
+    sc.addSparkListener(new SparkListener() {
+      override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+        taskBytesRead += taskEnd.taskMetrics.inputMetrics.get.bytesRead
+      }
+    })
+    sc.textFile("file://" + file.getAbsolutePath, 2).count()
+
+    // Wait for task end events to come in
+    sc.listenerBus.waitUntilEmpty(500)
+    assert(taskBytesRead.length == 2)
+    assert(taskBytesRead.sum >= file.length())
   }
 }
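
The assertions above now use >= rather than == because per-task byte counts can legitimately sum to more than the file length: with multiple splits, the reader for the first split reads past its boundary to finish the line it is in while the next task starts at the raw boundary, and the reader's constructor may read some bytes as well (see the HadoopRDD change above). A back-of-the-envelope sketch of that accounting, with made-up numbers rather than Spark code:

object SplitOverreadSketch {
  def main(args: Array[String]): Unit = {
    val fileLength = 1000L         // hypothetical file size in bytes
    val splitBoundary = 500L       // naive midpoint between the two splits
    val overshoot = 7L             // bytes task 1 reads past the boundary to finish its last line

    val task1Bytes = splitBoundary + overshoot    // task 1 reads through the end of that line
    val task2Bytes = fileLength - splitBoundary   // task 2 reads from the raw boundary onward

    // Bytes around the boundary are touched by both tasks, so the sum exceeds the file size.
    println(task1Bytes + task2Bytes)              // 1007
    assert(task1Bytes + task2Bytes >= fileLength) // mirrors the ">=" assertion in the test
  }
}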

docs/sql-programming-guide.md

Lines changed: 13 additions & 5 deletions
@@ -582,19 +582,27 @@ Configuration of Parquet can be done using the `setConf` method on SQLContext or
 </tr>
 <tr>
   <td><code>spark.sql.parquet.cacheMetadata</code></td>
-  <td>false</td>
+  <td>true</td>
   <td>
     Turns on caching of Parquet schema metadata. Can speed up querying of static data.
   </td>
 </tr>
 <tr>
   <td><code>spark.sql.parquet.compression.codec</code></td>
-  <td>snappy</td>
+  <td>gzip</td>
   <td>
     Sets the compression codec use when writing Parquet files. Acceptable values include:
     uncompressed, snappy, gzip, lzo.
   </td>
 </tr>
+<tr>
+  <td><code>spark.sql.hive.convertMetastoreParquet</code></td>
+  <td>true</td>
+  <td>
+    When set to false, Spark SQL will use the Hive SerDe for parquet tables instead of the built in
+    support.
+  </td>
+</tr>
 </table>
 
 ## JSON Datasets
@@ -815,15 +823,15 @@ Configuration of in-memory caching can be done using the `setConf` method on SQL
 <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
 <tr>
   <td><code>spark.sql.inMemoryColumnarStorage.compressed</code></td>
-  <td>false</td>
+  <td>true</td>
   <td>
     When set to true Spark SQL will automatically select a compression codec for each column based
     on statistics of the data.
   </td>
 </tr>
 <tr>
   <td><code>spark.sql.inMemoryColumnarStorage.batchSize</code></td>
-  <td>1000</td>
+  <td>10000</td>
   <td>
     Controls the size of batches for columnar caching. Larger batch sizes can improve memory utilization
     and compression, but risk OOMs when caching data.
@@ -841,7 +849,7 @@ that these options will be deprecated in future release as more optimizations ar
 <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
 <tr>
   <td><code>spark.sql.autoBroadcastJoinThreshold</code></td>
-  <td>10000</td>
+  <td>10485760 (10 MB)</td>
   <td>
     Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when
     performing a join. By setting this value to -1 broadcasting can be disabled. Note that currently
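
For reference, a hedged usage sketch (not part of this patch) of how the documented properties are typically set from code, assuming a plain local SparkContext and Spark SQL's SQLContext.setConf API:

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

object SqlConfExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "sql-conf-example")
    val sqlContext = new SQLContext(sc)

    // Values are passed as strings; these mirror the defaults documented above.
    sqlContext.setConf("spark.sql.parquet.cacheMetadata", "true")
    sqlContext.setConf("spark.sql.parquet.compression.codec", "gzip")
    sqlContext.setConf("spark.sql.inMemoryColumnarStorage.batchSize", "10000")
    sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (10 * 1024 * 1024).toString)

    sc.stop()
  }
}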

pom.xml

Lines changed: 1 addition & 1 deletion
@@ -1359,7 +1359,7 @@
         <activeByDefault>false</activeByDefault>
       </activation>
       <properties>
-        <hive.version>0.13.1</hive.version>
+        <hive.version>0.13.1a</hive.version>
         <hive.version.short>0.13.1</hive.version.short>
         <derby.version>10.10.1.1</derby.version>
       </properties>

python/pyspark/rdd.py

Lines changed: 0 additions & 3 deletions
@@ -316,9 +316,6 @@ def sample(self, withReplacement, fraction, seed=None):
         """
         Return a sampled subset of this RDD (relies on numpy and falls back
         on default random generator if numpy is unavailable).
-
-        >>> sc.parallelize(range(0, 100)).sample(False, 0.1, 2).collect() #doctest: +SKIP
-        [2, 3, 20, 21, 24, 41, 42, 66, 67, 89, 90, 98]
         """
         assert fraction >= 0.0, "Negative fraction value: %s" % fraction
         return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True)

python/pyspark/rddsampler.py

Lines changed: 5 additions & 6 deletions
@@ -40,14 +40,13 @@ def __init__(self, withReplacement, seed=None):
     def initRandomGenerator(self, split):
         if self._use_numpy:
             import numpy
-            self._random = numpy.random.RandomState(self._seed)
+            self._random = numpy.random.RandomState(self._seed ^ split)
         else:
-            self._random = random.Random(self._seed)
+            self._random = random.Random(self._seed ^ split)
 
-        for _ in range(0, split):
-            # discard the next few values in the sequence to have a
-            # different seed for the different splits
-            self._random.randint(0, 2 ** 32 - 1)
+        # mixing because the initial seeds are close to each other
+        for _ in xrange(10):
+            self._random.randint(0, 1)
 
         self._split = split
         self._rand_initialized = True
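
The new scheme above seeds each partition's generator with seed ^ split and then discards ten cheap draws, because XOR with small split indices produces seeds that differ in only a few low bits. A hedged Scala restatement of the same idea (the patch itself is PySpark; the names here are illustrative only):

import java.util.Random

object PartitionSeedSketch {
  def rngForPartition(seed: Long, split: Int): Random = {
    val rng = new Random(seed ^ split)
    // mixing because the initial seeds are close to each other
    (0 until 10).foreach(_ => rng.nextInt(2))
    rng
  }

  def main(args: Array[String]): Unit = {
    // Draw a few samples per partition to show the streams diverge despite similar seeds.
    (0 until 4).foreach { split =>
      val rng = rngForPartition(seed = 42L, split = split)
      val draws = Seq.fill(3)(rng.nextDouble())
      println(s"split $split: " + draws.map(d => f"$d%.3f").mkString(", "))
    }
  }
}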
