28 changes: 16 additions & 12 deletions common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
@@ -46,18 +46,22 @@ public final class Platform {
   private static final boolean unaligned;
   static {
     boolean _unaligned;
-    // use reflection to access unaligned field
-    try {
-      Class<?> bitsClass =
-        Class.forName("java.nio.Bits", false, ClassLoader.getSystemClassLoader());
-      Method unalignedMethod = bitsClass.getDeclaredMethod("unaligned");
-      unalignedMethod.setAccessible(true);
-      _unaligned = Boolean.TRUE.equals(unalignedMethod.invoke(null));
-    } catch (Throwable t) {
-      // We at least know x86 and x64 support unaligned access.
-      String arch = System.getProperty("os.arch", "");
-      //noinspection DynamicRegexReplaceableByCompiledPattern
-      _unaligned = arch.matches("^(i[3-6]86|x86(_64)?|x64|amd64|aarch64)$");
+    String arch = System.getProperty("os.arch", "");
+    if (arch.equals("ppc64le") || arch.equals("ppc64")) {
+      // java.nio.Bits.unaligned() doesn't return true on ppc (see JDK-8165231), but ppc64 and ppc64le do support unaligned access.
+      _unaligned = true;
+    } else {
+      try {
+        Class<?> bitsClass =
+          Class.forName("java.nio.Bits", false, ClassLoader.getSystemClassLoader());
+        Method unalignedMethod = bitsClass.getDeclaredMethod("unaligned");
+        unalignedMethod.setAccessible(true);
+        _unaligned = Boolean.TRUE.equals(unalignedMethod.invoke(null));
+      } catch (Throwable t) {
+        // We at least know x86 and x64 support unaligned access.
+        //noinspection DynamicRegexReplaceableByCompiledPattern
+        _unaligned = arch.matches("^(i[3-6]86|x86(_64)?|x64|amd64|aarch64)$");
+      }
     }
     unaligned = _unaligned;
   }
@@ -60,8 +60,6 @@ protected long getUsed() {
 
   /**
    * Force spill during building.
-   *
-   * For testing.
    */
   public void spill() throws IOException {
     spill(Long.MAX_VALUE, this);
@@ -155,11 +155,7 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
       for (MemoryConsumer c: consumers) {
         if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
           long key = c.getUsed();
-          List<MemoryConsumer> list = sortedConsumers.get(key);
-          if (list == null) {
-            list = new ArrayList<>(1);
-            sortedConsumers.put(key, list);
-          }
+          List<MemoryConsumer> list = sortedConsumers.computeIfAbsent(key, k -> new ArrayList<>(1));
           list.add(c);
         }
       }
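The hunk above replaces the manual get/null-check/put sequence on the TreeMap of consumers with Map.computeIfAbsent. A minimal standalone sketch of the same idiom, in Scala against plain JDK collections (the map contents and names here are invented, not taken from TaskMemoryManager):

```scala
import java.util.{ArrayList, TreeMap}
import java.util.function.{Function => JFunction}

object ComputeIfAbsentSketch {
  def main(args: Array[String]): Unit = {
    val buckets = new TreeMap[String, ArrayList[String]]()

    // Old pattern: look the bucket up, create and register it if missing.
    var list = buckets.get("16384")
    if (list == null) {
      list = new ArrayList[String](1)
      buckets.put("16384", list)
    }
    list.add("consumerA")

    // New pattern: computeIfAbsent creates and registers the bucket only when the key is absent.
    val newBucket: JFunction[String, ArrayList[String]] =
      new JFunction[String, ArrayList[String]] {
        override def apply(k: String): ArrayList[String] = new ArrayList[String](1)
      }
    buckets.computeIfAbsent("16384", newBucket).add("consumerB")

    println(buckets) // {16384=[consumerA, consumerB]}
  }
}
```

Both branches leave the map in the same state; computeIfAbsent simply folds the null check and the put into one call.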
@@ -52,16 +52,15 @@
  * This class implements sort-based shuffle's hash-style shuffle fallback path. This write path
  * writes incoming records to separate files, one file per reduce partition, then concatenates these
  * per-partition files to form a single output file, regions of which are served to reducers.
- * Records are not buffered in memory. This is essentially identical to
- * {@link org.apache.spark.shuffle.hash.HashShuffleWriter}, except that it writes output in a format
+ * Records are not buffered in memory. It writes output in a format
  * that can be served / consumed via {@link org.apache.spark.shuffle.IndexShuffleBlockResolver}.
  * <p>
  * This write path is inefficient for shuffles with large numbers of reduce partitions because it
  * simultaneously opens separate serializers and file streams for all partitions. As a result,
  * {@link SortShuffleManager} only selects this write path when
  * <ul>
  *   <li>no Ordering is specified,</li>
- *   <li>no Aggregator is specific, and</li>
+ *   <li>no Aggregator is specified, and</li>
  *   <li>the number of partitions is less than
  *     <code>spark.shuffle.sort.bypassMergeThreshold</code>.</li>
  * </ul>
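The Javadoc above lists the three conditions under which SortShuffleManager selects this bypass path. A rough, self-contained sketch of that decision in Scala; it mirrors the bullet points rather than the real SortShuffleWriter.shouldBypassMergeSort, and the ShuffleSpec case class plus the default threshold of 200 are assumptions for illustration:

```scala
// Simplified stand-in for a shuffle dependency; the real check inspects ShuffleDependency[K, V, C].
case class ShuffleSpec(hasOrdering: Boolean, hasAggregator: Boolean, numPartitions: Int)

object BypassDecisionSketch {
  // No Ordering, no Aggregator, and fewer partitions than spark.shuffle.sort.bypassMergeThreshold.
  def wouldBypassMergeSort(spec: ShuffleSpec, bypassMergeThreshold: Int = 200): Boolean =
    !spec.hasOrdering && !spec.hasAggregator && spec.numPartitions < bypassMergeThreshold

  def main(args: Array[String]): Unit = {
    println(wouldBypassMergeSort(ShuffleSpec(hasOrdering = false, hasAggregator = false, numPartitions = 50)))   // true
    println(wouldBypassMergeSort(ShuffleSpec(hasOrdering = false, hasAggregator = true, numPartitions = 50)))    // false
    println(wouldBypassMergeSort(ShuffleSpec(hasOrdering = false, hasAggregator = false, numPartitions = 5000))) // false
  }
}
```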
@@ -19,6 +19,7 @@
 
 import org.apache.spark.util.EnumUtil;
 
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 
@@ -30,9 +31,7 @@ public enum TaskSorting {
   private final Set<String> alternateNames;
   TaskSorting(String... names) {
     alternateNames = new HashSet<>();
-    for (String n: names) {
-      alternateNames.add(n);
-    }
+    Collections.addAll(alternateNames, names);
   }
 
   public static TaskSorting fromString(String str) {
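The constructor change above swaps a hand-written loop for Collections.addAll. A tiny Scala sketch of the same substitution (the alternate names used here are made up):

```scala
import java.util.{Collections, HashSet}

object AddAllSketch {
  def main(args: Array[String]): Unit = {
    val names = Array("runtime", "gc_time")

    // Loop version, equivalent to the removed for-loop.
    val viaLoop = new HashSet[String]()
    for (n <- names) viaLoop.add(n)

    // Collections.addAll does the same work in a single call.
    val viaAddAll = new HashSet[String]()
    Collections.addAll(viaAddAll, names: _*)

    println(viaLoop == viaAddAll) // true
  }
}
```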
@@ -71,13 +71,12 @@ private[spark] trait ExecutorAllocationClient {
 
   /**
    * Request that the cluster manager kill every executor on the specified host.
-   * Results in a call to killExecutors for each executor on the host, with the replace
-   * and force arguments set to true.
+   *
    * @return whether the request is acknowledged by the cluster manager.
    */
   def killExecutorsOnHost(host: String): Boolean
 
- /**
+  /**
    * Request that the cluster manager kill the specified executor.
    * @return whether the request is acknowledged by the cluster manager.
    */
@@ -190,6 +190,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
       .orNull
     numExecutors = Option(numExecutors)
       .getOrElse(sparkProperties.get("spark.executor.instances").orNull)
+    queue = Option(queue).orElse(sparkProperties.get("spark.yarn.queue")).orNull
     keytab = Option(keytab).orElse(sparkProperties.get("spark.yarn.keytab")).orNull
     principal = Option(principal).orElse(sparkProperties.get("spark.yarn.principal")).orNull
 
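The added queue line follows the same precedence pattern as the keytab and principal lines around it: prefer the value already set (for example from the command line), otherwise fall back to the corresponding spark property, otherwise null. A small sketch of that precedence, with a property map and values invented for illustration:

```scala
object OptionFallbackSketch {
  def resolve(currentValue: String, sparkProperties: Map[String, String], key: String): String =
    Option(currentValue).orElse(sparkProperties.get(key)).orNull

  def main(args: Array[String]): Unit = {
    val props = Map("spark.yarn.queue" -> "analytics")

    println(resolve("adhoc", props, "spark.yarn.queue"))   // adhoc (explicit value wins)
    println(resolve(null, props, "spark.yarn.queue"))      // analytics (falls back to the property)
    println(resolve(null, Map.empty, "spark.yarn.queue"))  // null (no value anywhere)
  }
}
```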
@@ -121,6 +121,13 @@ abstract class FileCommitProtocol {
   def deleteWithJob(fs: FileSystem, path: Path, recursive: Boolean): Boolean = {
     fs.delete(path, recursive)
   }
+
+  /**
+   * Called on the driver after a task commits. This can be used to access task commit messages
+   * before the job has finished. These same task commit messages will be passed to commitJob()
+   * if the entire job succeeds.
+   */
+  def onTaskCommit(taskCommit: TaskCommitMessage): Unit = {}
 }
 
 
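The new onTaskCommit hook gives concrete commit protocols a driver-side callback for each committed task, before commitJob() runs. A hedged sketch of a subclass using it: the class name and logging are invented, and the (jobId, path) constructor of HadoopMapReduceCommitProtocol is an assumption to verify against the Spark version in use:

```scala
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol

// Hypothetical protocol that records every task commit message as it arrives on the driver.
class LoggingCommitProtocol(jobId: String, path: String)
  extends HadoopMapReduceCommitProtocol(jobId, path) {

  private val seen = ArrayBuffer.empty[TaskCommitMessage]

  override def onTaskCommit(taskCommit: TaskCommitMessage): Unit = {
    // Invoked on the driver after each successful task commit; the same messages
    // are later handed to commitJob() if the whole job succeeds.
    seen += taskCommit
    println(s"Task commit #${seen.size} received for job $jobId")
  }
}
```

Such a class would be plugged in wherever a FileCommitProtocol implementation is configured; the exact configuration key depends on the write path being used.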
@@ -71,8 +71,7 @@ private[spark] object CompressionCodec {
       val ctor = Utils.classForName(codecClass).getConstructor(classOf[SparkConf])
       Some(ctor.newInstance(conf).asInstanceOf[CompressionCodec])
     } catch {
-      case e: ClassNotFoundException => None
-      case e: IllegalArgumentException => None
+      case _: ClassNotFoundException | _: IllegalArgumentException => None
     }
     codec.getOrElse(throw new IllegalArgumentException(s"Codec [$codecName] is not available. " +
       s"Consider setting $configKey=$FALLBACK_COMPRESSION_CODEC"))
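The catch-block change above collapses two cases with identical bodies into a single alternative pattern. A tiny sketch of the same Scala feature, with exception types chosen only for illustration:

```scala
object PatternAlternativeSketch {
  def parsePort(s: String): Option[Int] =
    try {
      Some(s.trim.toInt)
    } catch {
      // One case with an alternative pattern replaces two cases that shared a body.
      case _: NumberFormatException | _: NullPointerException => None
    }

  def main(args: Array[String]): Unit = {
    println(parsePort("8080"))    // Some(8080)
    println(parsePort("not-int")) // None
    println(parsePort(null))      // None
  }
}
```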
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -149,7 +149,7 @@ private[spark] abstract class Task[T](
 
   def preferredLocations: Seq[TaskLocation] = Nil
 
-  // Map output tracker epoch. Will be set by TaskScheduler.
+  // Map output tracker epoch. Will be set by TaskSetManager.
   var epoch: Long = -1
 
   // Task context, to be initialized in run().
@@ -77,7 +77,7 @@ abstract class Serializer {
  * position = 0
  * serOut.write(obj1)
  * serOut.flush()
- * position = # of bytes writen to stream so far
+ * position = # of bytes written to stream so far
  * obj1Bytes = output[0:position-1]
  * serOut.write(obj2)
  * serOut.flush()
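The corrected line sits in a Scaladoc block describing how byte positions in a serialization stream can be tracked: write an object, flush, and note how many bytes the underlying stream holds so far. A small sketch of that bookkeeping using plain Java serialization over a ByteArrayOutputStream (the objects written are arbitrary):

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object PositionTrackingSketch {
  def main(args: Array[String]): Unit = {
    val output = new ByteArrayOutputStream()
    val serOut = new ObjectOutputStream(output)

    serOut.writeObject("obj1")
    serOut.flush()
    val positionAfterObj1 = output.size() // # of bytes written to the stream so far
    val obj1Bytes = output.toByteArray.slice(0, positionAfterObj1)

    serOut.writeObject(Vector(1, 2, 3))
    serOut.flush()
    val positionAfterObj2 = output.size()

    println(s"obj1 occupies ${obj1Bytes.length} bytes; obj2 adds ${positionAfterObj2 - positionAfterObj1} more")
  }
}
```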
@@ -95,8 +95,7 @@ private[spark] class BlockStoreShuffleReader[K, C](
     // Sort the output if there is a sort ordering defined.
     dep.keyOrdering match {
       case Some(keyOrd: Ordering[K]) =>
-        // Create an ExternalSorter to sort the data. Note that if spark.shuffle.spill is disabled,
-        // the ExternalSorter won't spill to disk.
+        // Create an ExternalSorter to sort the data.
         val sorter =
           new ExternalSorter[K, C, C](context, ordering = Some(keyOrd), serializer = dep.serializer)
         sorter.insertAll(aggregatedIter)
@@ -61,7 +61,7 @@ private[spark] class IndexShuffleBlockResolver(
 
   /**
    * Remove data file and index file that contain the output data from one map.
-   * */
+   */
   def removeDataByMap(shuffleId: Int, mapId: Int): Unit = {
     var file = getDataFile(shuffleId, mapId)
     if (file.exists()) {
@@ -132,7 +132,7 @@ private[spark] class IndexShuffleBlockResolver(
    * replace them with new ones.
    *
    * Note: the `lengths` will be updated to match the existing index file if the existing ones are used.
-   * */
+   */
   def writeIndexFileAndCommit(
       shuffleId: Int,
       mapId: Int,
@@ -82,13 +82,13 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
   override val shuffleBlockResolver = new IndexShuffleBlockResolver(conf)
 
   /**
-   * Register a shuffle with the manager and obtain a handle for it to pass to tasks.
+   * Obtains a [[ShuffleHandle]] to pass to tasks.
    */
   override def registerShuffle[K, V, C](
       shuffleId: Int,
       numMaps: Int,
       dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
-    if (SortShuffleWriter.shouldBypassMergeSort(SparkEnv.get.conf, dependency)) {
+    if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
       // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
       // need map-side aggregation, then write numPartitions files directly and just concatenate
       // them at the end. This avoids doing serialization and deserialization twice to merge
@@ -49,7 +49,6 @@ import org.apache.spark.unsafe.Platform
 import org.apache.spark.util._
 import org.apache.spark.util.io.ChunkedByteBuffer
 
-
 /* Class for returning a fetched block and associated metrics. */
 private[spark] class BlockResult(
     val data: Iterator[Any],
@@ -1258,7 +1257,6 @@ private[spark] class BlockManager(
       replication = 1)
 
     val numPeersToReplicateTo = level.replication - 1
-
     val startTime = System.nanoTime
 
     var peersReplicatedTo = mutable.HashSet.empty ++ existingReplicas
@@ -1313,7 +1311,6 @@
             numPeersToReplicateTo - peersReplicatedTo.size)
         }
       }
-
     logDebug(s"Replicating $blockId of ${data.size} bytes to " +
       s"${peersReplicatedTo.size} peer(s) took ${(System.nanoTime - startTime) / 1e6} ms")
     if (peersReplicatedTo.size < numPeersToReplicateTo) {