Merged
Changes from all commits (23 commits)
cfb25b2
[SPARK-21530] Update description of spark.shuffle.maxChunksBeingTrans…
Jul 27, 2017
ebbe589
[SPARK-21271][SQL] Ensure Unsafe.sizeInBytes is a multiple of 8
kiszk Jul 27, 2017
2ff35a0
[SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and add ArrayTyp…
ueshin Jul 27, 2017
ddcd2e8
[SPARK-19270][ML] Add summary table to GLM summary
actuaryzhang Jul 27, 2017
9f5647d
[SPARK-21319][SQL] Fix memory leak in sorter
cloud-fan Jul 27, 2017
f44ead8
[SPARK-21538][SQL] Attribute resolution inconsistency in the Dataset API
Jul 27, 2017
a5a3189
[SPARK-21306][ML] OneVsRest should support setWeightCol
facaiy Jul 28, 2017
63d168c
[MINOR][BUILD] Fix current lint-java failures
srowen Jul 28, 2017
7846809
[SPARK-21553][SPARK SHELL] Add the description of the default value o…
Jul 28, 2017
69ab0e4
[SPARK-21541][YARN] Spark Logs show incorrect job status for a job th…
Jul 28, 2017
0ef9fe6
Typo in comment
nahoj Jul 28, 2017
b56f79c
[SPARK-20090][PYTHON] Add StructType.fieldNames in PySpark
HyukjinKwon Jul 29, 2017
c143820
[SPARK-21508][DOC] Fix example code provided in Spark Streaming Docum…
Jul 29, 2017
60e9b2b
[SPARK-21357][DSTREAMS] FileInputDStream not remove out of date RDD
shaofei007 Jul 29, 2017
9c8109e
[SPARK-21555][SQL] RuntimeReplaceable should be compared semantically…
viirya Jul 29, 2017
92d8563
[SPARK-19451][SQL] rangeBetween method should accept Long value as bo…
jiangxb1987 Jul 29, 2017
6550086
[SPARK-20962][SQL] Support subquery column aliases in FROM clause
maropu Jul 29, 2017
51f99fb
[SQL] Fix typo in DataframeWriter doc
Jul 30, 2017
d79816d
[SPARK-21297][WEB-UI] Add count in 'JDBC/ODBC Server' page.
Jul 30, 2017
6830e90
[MINOR][DOC] Replace numTasks with numPartitions in programming guide
polarker Jul 30, 2017
f1a798b
[MINOR] Minor comment fixes in merge_spark_pr.py script
HyukjinKwon Jul 31, 2017
44e501a
[SPARK-19839][CORE] release longArray in BytesToBytesMap
Jul 31, 2017
106eaa9
[SPARK-21575][SPARKR] Eliminate needless synchronization in java-R se…
SereneAnt Jul 31, 2017
@@ -258,7 +258,11 @@ public Properties cryptoConf() {
}

/**
* The max number of chunks allowed to being transferred at the same time on shuffle service.
* The max number of chunks allowed to be transferred at the same time on shuffle service.
* Note that new incoming connections will be closed when the max number is hit. The client will
* retry according to the shuffle retry configs (see `spark.shuffle.io.maxRetries` and
* `spark.shuffle.io.retryWait`), if those limits are reached the task will fail with fetch
* failure.
*/
public long maxChunksBeingTransferred() {
return conf.getLong("spark.shuffle.maxChunksBeingTransferred", Long.MAX_VALUE);
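A hedged sketch of how these settings might be tuned together; the keys come from the description above, while the values are illustrative only:

```scala
// Assuming conf is a SparkConf. spark.shuffle.maxChunksBeingTransferred is
// enforced on the shuffle-service side, while the io.* settings control how
// clients retry before a task fails with a fetch failure.
conf.set("spark.shuffle.maxChunksBeingTransferred", "4096")  // default: Long.MAX_VALUE
conf.set("spark.shuffle.io.maxRetries", "6")
conf.set("spark.shuffle.io.retryWait", "10s")
```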
@@ -105,7 +105,7 @@ private class ExtendedChannelPromise extends DefaultChannelPromise {
private List<GenericFutureListener> listeners = new ArrayList<>();
private boolean success;

public ExtendedChannelPromise(Channel channel) {
ExtendedChannelPromise(Channel channel) {
super(channel);
success = false;
}
@@ -127,7 +127,9 @@ public void finish(boolean success) {
listeners.forEach(listener -> {
try {
listener.operationComplete(this);
} catch (Exception e) { }
} catch (Exception e) {
// do nothing
}
});
}
}
@@ -120,14 +120,16 @@ final class ShuffleExternalSorter extends MemoryConsumer {
this.taskContext = taskContext;
this.numPartitions = numPartitions;
// Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
this.fileBufferSizeBytes = (int) (long) conf.get(package$.MODULE$.SHUFFLE_FILE_BUFFER_SIZE()) * 1024;
this.fileBufferSizeBytes =
(int) (long) conf.get(package$.MODULE$.SHUFFLE_FILE_BUFFER_SIZE()) * 1024;
this.numElementsForSpillThreshold =
conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", 1024 * 1024 * 1024);
this.writeMetrics = writeMetrics;
this.inMemSorter = new ShuffleInMemorySorter(
this, initialSize, conf.getBoolean("spark.shuffle.sort.useRadixSort", true));
this.peakMemoryUsedBytes = getMemoryUsage();
this.diskWriteBufferSize = (int) (long) conf.get(package$.MODULE$.SHUFFLE_DISK_WRITE_BUFFER_SIZE());
this.diskWriteBufferSize =
(int) (long) conf.get(package$.MODULE$.SHUFFLE_DISK_WRITE_BUFFER_SIZE());
}

/**
@@ -258,6 +258,11 @@ private MapIterator(int numRecords, Location loc, boolean destructive) {
this.destructive = destructive;
if (destructive) {
destructiveIterator = this;
// longArray will not be used anymore if destructive is true, release it now.
if (longArray != null) {
freeArray(longArray);
longArray = null;
}
}
}
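A minimal Scala sketch of the eager-release idea behind this hunk, using hypothetical names rather than Spark's actual classes:

```scala
// Once a destructive iterator takes ownership of the records, auxiliary
// structures (like the map's long array above) can be freed at construction
// time instead of waiting for the whole container to be disposed.
class DestructiveIterator[T](records: Iterator[T], release: () => Unit) extends Iterator[T] {
  release()   // free auxiliary memory eagerly; only `records` is needed from here on

  override def hasNext: Boolean = records.hasNext
  override def next(): T = records.next()
}
```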

@@ -22,6 +22,7 @@
import java.io.IOException;
import java.util.LinkedList;
import java.util.Queue;
import java.util.function.Supplier;

import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
@@ -48,8 +49,16 @@ public final class UnsafeExternalSorter extends MemoryConsumer {

@Nullable
private final PrefixComparator prefixComparator;

/**
* {@link RecordComparator} may probably keep the reference to the records they compared last
* time, so we should not keep a {@link RecordComparator} instance inside
* {@link UnsafeExternalSorter}, because {@link UnsafeExternalSorter} is referenced by
* {@link TaskContext} and thus can not be garbage collected until the end of the task.
*/
@Nullable
private final RecordComparator recordComparator;
private final Supplier<RecordComparator> recordComparatorSupplier;

private final TaskMemoryManager taskMemoryManager;
private final BlockManager blockManager;
private final SerializerManager serializerManager;
@@ -90,14 +99,14 @@ public static UnsafeExternalSorter createWithExistingInMemorySorter(
BlockManager blockManager,
SerializerManager serializerManager,
TaskContext taskContext,
RecordComparator recordComparator,
Supplier<RecordComparator> recordComparatorSupplier,
PrefixComparator prefixComparator,
int initialSize,
long pageSizeBytes,
long numElementsForSpillThreshold,
UnsafeInMemorySorter inMemorySorter) throws IOException {
UnsafeExternalSorter sorter = new UnsafeExternalSorter(taskMemoryManager, blockManager,
serializerManager, taskContext, recordComparator, prefixComparator, initialSize,
serializerManager, taskContext, recordComparatorSupplier, prefixComparator, initialSize,
numElementsForSpillThreshold, pageSizeBytes, inMemorySorter, false /* ignored */);
sorter.spill(Long.MAX_VALUE, sorter);
// The external sorter will be used to insert records, in-memory sorter is not needed.
@@ -110,14 +119,14 @@ public static UnsafeExternalSorter create(
BlockManager blockManager,
SerializerManager serializerManager,
TaskContext taskContext,
RecordComparator recordComparator,
Supplier<RecordComparator> recordComparatorSupplier,
PrefixComparator prefixComparator,
int initialSize,
long pageSizeBytes,
long numElementsForSpillThreshold,
boolean canUseRadixSort) {
return new UnsafeExternalSorter(taskMemoryManager, blockManager, serializerManager,
taskContext, recordComparator, prefixComparator, initialSize, pageSizeBytes,
taskContext, recordComparatorSupplier, prefixComparator, initialSize, pageSizeBytes,
numElementsForSpillThreshold, null, canUseRadixSort);
}

@@ -126,7 +135,7 @@ private UnsafeExternalSorter(
BlockManager blockManager,
SerializerManager serializerManager,
TaskContext taskContext,
RecordComparator recordComparator,
Supplier<RecordComparator> recordComparatorSupplier,
PrefixComparator prefixComparator,
int initialSize,
long pageSizeBytes,
@@ -138,15 +147,24 @@ private UnsafeExternalSorter(
this.blockManager = blockManager;
this.serializerManager = serializerManager;
this.taskContext = taskContext;
this.recordComparator = recordComparator;
this.recordComparatorSupplier = recordComparatorSupplier;
this.prefixComparator = prefixComparator;
// Use getSizeAsKb (not bytes) to maintain backwards compatibility for units
// this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024
this.fileBufferSizeBytes = 32 * 1024;

if (existingInMemorySorter == null) {
RecordComparator comparator = null;
if (recordComparatorSupplier != null) {
comparator = recordComparatorSupplier.get();
}
this.inMemSorter = new UnsafeInMemorySorter(
this, taskMemoryManager, recordComparator, prefixComparator, initialSize, canUseRadixSort);
this,
taskMemoryManager,
comparator,
prefixComparator,
initialSize,
canUseRadixSort);
} else {
this.inMemSorter = existingInMemorySorter;
}
@@ -451,14 +469,14 @@ public void merge(UnsafeExternalSorter other) throws IOException {
* after consuming this iterator.
*/
public UnsafeSorterIterator getSortedIterator() throws IOException {
assert(recordComparator != null);
assert(recordComparatorSupplier != null);
if (spillWriters.isEmpty()) {
assert(inMemSorter != null);
readingIterator = new SpillableIterator(inMemSorter.getSortedIterator());
return readingIterator;
} else {
final UnsafeSorterSpillMerger spillMerger =
new UnsafeSorterSpillMerger(recordComparator, prefixComparator, spillWriters.size());
final UnsafeSorterSpillMerger spillMerger = new UnsafeSorterSpillMerger(
recordComparatorSupplier.get(), prefixComparator, spillWriters.size());
for (UnsafeSorterSpillWriter spillWriter : spillWriters) {
spillMerger.addSpillIfNotEmpty(spillWriter.getReader(serializerManager));
}
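A minimal Scala sketch of the supplier pattern this file switches to; `RecordOrdering` and `Sorter` below are hypothetical stand-ins, not Spark classes. The long-lived object keeps only a factory, so the comparator (which may retain references to recently compared records) is created on demand and can be collected together with the iterator that used it:

```scala
// Hypothetical names; this only illustrates "hold a factory, not an instance".
trait RecordOrdering { def compare(a: Array[Byte], b: Array[Byte]): Int }

class Sorter(comparatorFactory: () => RecordOrdering) {
  def sortedIterator(rows: Seq[Array[Byte]]): Iterator[Array[Byte]] = {
    val cmp = comparatorFactory()   // comparator materialized only when a sorted iterator is requested
    rows.sortWith((a, b) => cmp.compare(a, b) < 0).iterator
  }
}

// Usage mirrors the `() -> recordComparator` lambdas in the test changes below.
val byLength = new Sorter(() => new RecordOrdering {
  def compare(a: Array[Byte], b: Array[Byte]): Int = a.length - b.length
})
```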
16 changes: 2 additions & 14 deletions core/src/main/scala/org/apache/spark/api/r/JVMObjectTracker.scala
@@ -37,13 +37,7 @@ private[r] class JVMObjectTracker {
/**
* Returns the JVM object associated with the input key or None if not found.
*/
final def get(id: JVMObjectId): Option[Object] = this.synchronized {
if (objMap.containsKey(id)) {
Some(objMap.get(id))
} else {
None
}
}
final def get(id: JVMObjectId): Option[Object] = Option(objMap.get(id))

/**
* Returns the JVM object associated with the input key or throws an exception if not found.
@@ -67,13 +61,7 @@ private[r] class JVMObjectTracker {
/**
* Removes and returns a JVM object with the specific ID from the tracker, or None if not found.
*/
final def remove(id: JVMObjectId): Option[Object] = this.synchronized {
if (objMap.containsKey(id)) {
Some(objMap.remove(id))
} else {
None
}
}
final def remove(id: JVMObjectId): Option[Object] = Option(objMap.remove(id))

/**
* Number of JVM objects being tracked.
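A minimal sketch of the Option-wrapping pattern used in the two hunks above, assuming the tracker's backing map is a `java.util.concurrent.ConcurrentHashMap` (types simplified here):

```scala
import java.util.concurrent.ConcurrentHashMap

// Option(...) turns the map's null-on-miss result into None, so the
// containsKey pre-check and the synchronized block around it are not needed.
val objMap = new ConcurrentHashMap[String, Object]()

def get(id: String): Option[Object]    = Option(objMap.get(id))
def remove(id: String): Option[Object] = Option(objMap.remove(id))
```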
@@ -514,7 +514,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
outStream.println(
s"""
|Options:
| --master MASTER_URL spark://host:port, mesos://host:port, yarn, or local.
| --master MASTER_URL spark://host:port, mesos://host:port, yarn, or local
| (Default: local[*]).
| --deploy-mode DEPLOY_MODE Whether to launch the driver program locally ("client") or
| on one of the worker machines inside the cluster ("cluster")
| (Default: client).
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -55,7 +55,7 @@ import org.apache.spark.util.random.{BernoulliCellSampler, BernoulliSampler, Poi
* Doubles; and
* [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that
* can be saved as SequenceFiles.
* All operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)]
* All operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)])
* through implicit.
*
* Internally, each RDD is characterized by five main properties:
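As an aside (not part of the PR), a short example of the implicit availability this Scaladoc sentence describes, using Spark's public API:

```scala
// Assuming sc is the SparkContext, as in the programming guide.
// reduceByKey lives on PairRDDFunctions, not on RDD itself, but the implicit
// conversion in the RDD companion object makes it available on RDD[(Int, Int)].
val pairs = sc.parallelize(Seq((1, 10), (2, 20), (1, 30)))
val summed = pairs.reduceByKey(_ + _)
summed.collect().foreach(println)   // (1,40) and (2,20), in some order
```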
@@ -154,7 +154,7 @@ private UnsafeExternalSorter newSorter() throws IOException {
blockManager,
serializerManager,
taskContext,
recordComparator,
() -> recordComparator,
prefixComparator,
/* initialSize */ 1024,
pageSizeBytes,
@@ -440,7 +440,7 @@ public void testPeakMemoryUsed() throws Exception {
blockManager,
serializerManager,
taskContext,
recordComparator,
() -> recordComparator,
prefixComparator,
1024,
pageSizeBytes,
7 changes: 4 additions & 3 deletions dev/merge_spark_pr.py
@@ -17,10 +17,11 @@
# limitations under the License.
#

# Utility for creating well-formed pull request merges and pushing them to Apache.
# usage: ./apache-pr-merge.py (see config env vars below)
# Utility for creating well-formed pull request merges and pushing them to Apache
# Spark.
# usage: ./merge_spark_pr.py (see config env vars below)
#
# This utility assumes you already have local a Spark git folder and that you
# This utility assumes you already have a local Spark git folder and that you
# have added remotes corresponding to both (i) the github apache Spark
# mirror and (ii) the apache git repo.

6 changes: 5 additions & 1 deletion docs/configuration.md
@@ -635,7 +635,11 @@ Apart from these, the following properties are also available, and may be useful
<td><code>spark.shuffle.maxChunksBeingTransferred</code></td>
<td>Long.MAX_VALUE</td>
<td>
The max number of chunks allowed to being transferred at the same time on shuffle service.
The max number of chunks allowed to be transferred at the same time on shuffle service.
Note that new incoming connections will be closed when the max number is hit. The client will
retry according to the shuffle retry configs (see <code>spark.shuffle.io.maxRetries</code> and
<code>spark.shuffle.io.retryWait</code>), if those limits are reached the task will fail with
fetch failure.
</td>
</tr>
<tr>
16 changes: 8 additions & 8 deletions docs/rdd-programming-guide.md
@@ -978,40 +978,40 @@ for details.
<td> Return a new RDD that contains the intersection of elements in the source dataset and the argument. </td>
</tr>
<tr>
<td> <b>distinct</b>([<i>numTasks</i>])) </td>
<td> <b>distinct</b>([<i>numPartitions</i>])) </td>
<td> Return a new dataset that contains the distinct elements of the source dataset.</td>
</tr>
<tr>
<td> <b>groupByKey</b>([<i>numTasks</i>]) <a name="GroupByLink"></a> </td>
<td> <b>groupByKey</b>([<i>numPartitions</i>]) <a name="GroupByLink"></a> </td>
<td> When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable&lt;V&gt;) pairs. <br />
<b>Note:</b> If you are grouping in order to perform an aggregation (such as a sum or
average) over each key, using <code>reduceByKey</code> or <code>aggregateByKey</code> will yield much better
performance.
<br />
<b>Note:</b> By default, the level of parallelism in the output depends on the number of partitions of the parent RDD.
You can pass an optional <code>numTasks</code> argument to set a different number of tasks.
You can pass an optional <code>numPartitions</code> argument to set a different number of tasks.
</td>
</tr>
<tr>
<td> <b>reduceByKey</b>(<i>func</i>, [<i>numTasks</i>]) <a name="ReduceByLink"></a> </td>
<td> <b>reduceByKey</b>(<i>func</i>, [<i>numPartitions</i>]) <a name="ReduceByLink"></a> </td>
<td> When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function <i>func</i>, which must be of type (V,V) => V. Like in <code>groupByKey</code>, the number of reduce tasks is configurable through an optional second argument. </td>
</tr>
<tr>
<td> <b>aggregateByKey</b>(<i>zeroValue</i>)(<i>seqOp</i>, <i>combOp</i>, [<i>numTasks</i>]) <a name="AggregateByLink"></a> </td>
<td> <b>aggregateByKey</b>(<i>zeroValue</i>)(<i>seqOp</i>, <i>combOp</i>, [<i>numPartitions</i>]) <a name="AggregateByLink"></a> </td>
<td> When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in <code>groupByKey</code>, the number of reduce tasks is configurable through an optional second argument. </td>
</tr>
<tr>
<td> <b>sortByKey</b>([<i>ascending</i>], [<i>numTasks</i>]) <a name="SortByLink"></a> </td>
<td> <b>sortByKey</b>([<i>ascending</i>], [<i>numPartitions</i>]) <a name="SortByLink"></a> </td>
<td> When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean <code>ascending</code> argument.</td>
</tr>
<tr>
<td> <b>join</b>(<i>otherDataset</i>, [<i>numTasks</i>]) <a name="JoinLink"></a> </td>
<td> <b>join</b>(<i>otherDataset</i>, [<i>numPartitions</i>]) <a name="JoinLink"></a> </td>
<td> When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key.
Outer joins are supported through <code>leftOuterJoin</code>, <code>rightOuterJoin</code>, and <code>fullOuterJoin</code>.
</td>
</tr>
<tr>
<td> <b>cogroup</b>(<i>otherDataset</i>, [<i>numTasks</i>]) <a name="CogroupLink"></a> </td>
<td> <b>cogroup</b>(<i>otherDataset</i>, [<i>numPartitions</i>]) <a name="CogroupLink"></a> </td>
<td> When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable&lt;V&gt;, Iterable&lt;W&gt;)) tuples. This operation is also called <code>groupWith</code>. </td>
</tr>
<tr>
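As an aside (not part of the PR), a short example of the optional `numPartitions` argument these table entries document:

```scala
// Assuming sc is the SparkContext, as elsewhere in the guide.
val kv = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

val reduced = kv.reduceByKey(_ + _, 4)   // reduceByKey(func, numPartitions)
println(reduced.getNumPartitions)        // 4

val grouped = kv.groupByKey(2)           // groupByKey(numPartitions)
println(grouped.getNumPartitions)        // 2
```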
4 changes: 2 additions & 2 deletions docs/streaming-custom-receivers.md
@@ -175,7 +175,7 @@ an input DStream using data received by the instance of custom receiver, as show
{% highlight scala %}
// Assuming ssc is the StreamingContext
val customReceiverStream = ssc.receiverStream(new CustomReceiver(host, port))
val words = lines.flatMap(_.split(" "))
val words = customReceiverStream.flatMap(_.split(" "))
...
{% endhighlight %}

@@ -187,7 +187,7 @@ The full source code is in the example [CustomReceiver.scala]({{site.SPARK_GITHU
{% highlight java %}
// Assuming ssc is the JavaStreamingContext
JavaDStream<String> customReceiverStream = ssc.receiverStream(new JavaCustomReceiver(host, port));
JavaDStream<String> words = lines.flatMap(s -> ...);
JavaDStream<String> words = customReceiverStream.flatMap(s -> ...);
...
{% endhighlight %}
