
Commit 7a6eb18

Merge remote-tracking branch 'apache/master' into scala-2.11-prashant
Conflicts: examples/pom.xml
2 parents: 583aa07 + a878660

112 files changed: +1792 -1158 lines

core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala

Lines changed: 1 addition & 0 deletions

```diff
@@ -26,6 +26,7 @@ import org.apache.hadoop.mapred._
 import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.fs.Path
 
+import org.apache.spark.mapred.SparkHadoopMapRedUtil
 import org.apache.spark.rdd.HadoopRDD
 
 /**
```

core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

Lines changed: 39 additions & 7 deletions

```diff
@@ -17,6 +17,7 @@
 
 package org.apache.spark.deploy
 
+import java.lang.reflect.Method
 import java.security.PrivilegedExceptionAction
 
 import org.apache.hadoop.conf.Configuration
@@ -133,14 +134,9 @@ class SparkHadoopUtil extends Logging {
    */
   private[spark] def getFSBytesReadOnThreadCallback(path: Path, conf: Configuration)
     : Option[() => Long] = {
-    val qualifiedPath = path.getFileSystem(conf).makeQualified(path)
-    val scheme = qualifiedPath.toUri().getScheme()
-    val stats = FileSystem.getAllStatistics().filter(_.getScheme().equals(scheme))
     try {
-      val threadStats = stats.map(Utils.invoke(classOf[Statistics], _, "getThreadStatistics"))
-      val statisticsDataClass =
-        Class.forName("org.apache.hadoop.fs.FileSystem$Statistics$StatisticsData")
-      val getBytesReadMethod = statisticsDataClass.getDeclaredMethod("getBytesRead")
+      val threadStats = getFileSystemThreadStatistics(path, conf)
+      val getBytesReadMethod = getFileSystemThreadStatisticsMethod("getBytesRead")
       val f = () => threadStats.map(getBytesReadMethod.invoke(_).asInstanceOf[Long]).sum
       val baselineBytesRead = f()
       Some(() => f() - baselineBytesRead)
@@ -151,6 +147,42 @@
       }
     }
   }
+
+  /**
+   * Returns a function that can be called to find Hadoop FileSystem bytes written. If
+   * getFSBytesWrittenOnThreadCallback is called from thread r at time t, the returned callback will
+   * return the bytes written on r since t. Reflection is required because thread-level FileSystem
+   * statistics are only available as of Hadoop 2.5 (see HADOOP-10688).
+   * Returns None if the required method can't be found.
+   */
+  private[spark] def getFSBytesWrittenOnThreadCallback(path: Path, conf: Configuration)
+    : Option[() => Long] = {
+    try {
+      val threadStats = getFileSystemThreadStatistics(path, conf)
+      val getBytesWrittenMethod = getFileSystemThreadStatisticsMethod("getBytesWritten")
+      val f = () => threadStats.map(getBytesWrittenMethod.invoke(_).asInstanceOf[Long]).sum
+      val baselineBytesWritten = f()
+      Some(() => f() - baselineBytesWritten)
+    } catch {
+      case e: NoSuchMethodException => {
+        logDebug("Couldn't find method for retrieving thread-level FileSystem output data", e)
+        None
+      }
+    }
+  }
+
+  private def getFileSystemThreadStatistics(path: Path, conf: Configuration): Seq[AnyRef] = {
+    val qualifiedPath = path.getFileSystem(conf).makeQualified(path)
+    val scheme = qualifiedPath.toUri().getScheme()
+    val stats = FileSystem.getAllStatistics().filter(_.getScheme().equals(scheme))
+    stats.map(Utils.invoke(classOf[Statistics], _, "getThreadStatistics"))
+  }
+
+  private def getFileSystemThreadStatisticsMethod(methodName: String): Method = {
+    val statisticsDataClass =
+      Class.forName("org.apache.hadoop.fs.FileSystem$Statistics$StatisticsData")
+    statisticsDataClass.getDeclaredMethod(methodName)
+  }
 }
 
 object SparkHadoopUtil {
```
core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala

Lines changed: 1 addition & 0 deletions

```diff
@@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer
 
 import akka.actor.ActorRef
 
+import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.deploy.ApplicationDescription
 import org.apache.spark.util.Utils
 
```
core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala

Lines changed: 1 addition & 0 deletions

```diff
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.master
 
 import java.util.Date
 
+import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.deploy.DriverDescription
 import org.apache.spark.util.Utils
 
```
core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala

Lines changed: 18 additions & 44 deletions

```diff
@@ -18,10 +18,12 @@
 package org.apache.spark.deploy.master
 
 import java.io._
-
-import akka.serialization.Serialization
+import java.nio.ByteBuffer
 
 import org.apache.spark.Logging
+import org.apache.spark.serializer.Serializer
+
+import scala.reflect.ClassTag
 
 /**
  * Stores data in a single on-disk directory with one file per application and worker.
@@ -32,65 +34,39 @@ import org.apache.spark.Logging
  */
 private[spark] class FileSystemPersistenceEngine(
     val dir: String,
-    val serialization: Serialization)
+    val serialization: Serializer)
   extends PersistenceEngine with Logging {
 
+  val serializer = serialization.newInstance()
   new File(dir).mkdir()
 
-  override def addApplication(app: ApplicationInfo) {
-    val appFile = new File(dir + File.separator + "app_" + app.id)
-    serializeIntoFile(appFile, app)
-  }
-
-  override def removeApplication(app: ApplicationInfo) {
-    new File(dir + File.separator + "app_" + app.id).delete()
-  }
-
-  override def addDriver(driver: DriverInfo) {
-    val driverFile = new File(dir + File.separator + "driver_" + driver.id)
-    serializeIntoFile(driverFile, driver)
+  override def persist(name: String, obj: Object): Unit = {
+    serializeIntoFile(new File(dir + File.separator + name), obj)
   }
 
-  override def removeDriver(driver: DriverInfo) {
-    new File(dir + File.separator + "driver_" + driver.id).delete()
+  override def unpersist(name: String): Unit = {
+    new File(dir + File.separator + name).delete()
   }
 
-  override def addWorker(worker: WorkerInfo) {
-    val workerFile = new File(dir + File.separator + "worker_" + worker.id)
-    serializeIntoFile(workerFile, worker)
-  }
-
-  override def removeWorker(worker: WorkerInfo) {
-    new File(dir + File.separator + "worker_" + worker.id).delete()
-  }
-
-  override def readPersistedData(): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo]) = {
-    val sortedFiles = new File(dir).listFiles().sortBy(_.getName)
-    val appFiles = sortedFiles.filter(_.getName.startsWith("app_"))
-    val apps = appFiles.map(deserializeFromFile[ApplicationInfo])
-    val driverFiles = sortedFiles.filter(_.getName.startsWith("driver_"))
-    val drivers = driverFiles.map(deserializeFromFile[DriverInfo])
-    val workerFiles = sortedFiles.filter(_.getName.startsWith("worker_"))
-    val workers = workerFiles.map(deserializeFromFile[WorkerInfo])
-    (apps, drivers, workers)
+  override def read[T: ClassTag](prefix: String) = {
+    val files = new File(dir).listFiles().filter(_.getName.startsWith(prefix))
+    files.map(deserializeFromFile[T])
   }
 
   private def serializeIntoFile(file: File, value: AnyRef) {
     val created = file.createNewFile()
     if (!created) { throw new IllegalStateException("Could not create file: " + file) }
 
-    val serializer = serialization.findSerializerFor(value)
-    val serialized = serializer.toBinary(value)
-
-    val out = new FileOutputStream(file)
+    val out = serializer.serializeStream(new FileOutputStream(file))
     try {
-      out.write(serialized)
+      out.writeObject(value)
     } finally {
      out.close()
     }
+
   }
 
-  def deserializeFromFile[T](file: File)(implicit m: Manifest[T]): T = {
+  def deserializeFromFile[T](file: File): T = {
    val fileData = new Array[Byte](file.length().asInstanceOf[Int])
    val dis = new DataInputStream(new FileInputStream(file))
    try {
@@ -99,8 +75,6 @@ private[spark] class FileSystemPersistenceEngine(
      dis.close()
    }
 
-    val clazz = m.runtimeClass.asInstanceOf[Class[T]]
-    val serializer = serialization.serializerFor(clazz)
-    serializer.fromBinary(fileData).asInstanceOf[T]
+    serializer.deserializeStream(dis).readObject()
   }
 }
```
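
This refactor replaces the six entity-specific methods (`addApplication`, `removeWorker`, `readPersistedData`, ...) with a generic name-prefix API backed by Spark's own `Serializer` instead of Akka serialization. A hedged usage sketch: the `JavaSerializer` wiring and the `app_` naming below are assumptions for illustration, and the class itself is `private[spark]`, so this only compiles inside the `org.apache.spark` package:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.serializer.JavaSerializer

// Illustrative only: persist/unpersist/read subsume the old per-entity methods.
val engine = new FileSystemPersistenceEngine(
  "/tmp/spark-recovery", new JavaSerializer(new SparkConf()))

engine.persist("app_demo-001", "serialized application state")  // was addApplication
val apps = engine.read[String]("app_")  // was part of readPersistedData()
engine.unpersist("app_demo-001")        // was removeApplication
```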

core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala

Lines changed: 17 additions & 20 deletions

```diff
@@ -17,30 +17,27 @@
 
 package org.apache.spark.deploy.master
 
-import akka.actor.{Actor, ActorRef}
-
-import org.apache.spark.deploy.master.MasterMessages.ElectedLeader
+import org.apache.spark.annotation.DeveloperApi
 
 /**
- * A LeaderElectionAgent keeps track of whether the current Master is the leader, meaning it
- * is the only Master serving requests.
- * In addition to the API provided, the LeaderElectionAgent will use of the following messages
- * to inform the Master of leader changes:
- * [[org.apache.spark.deploy.master.MasterMessages.ElectedLeader ElectedLeader]]
- * [[org.apache.spark.deploy.master.MasterMessages.RevokedLeadership RevokedLeadership]]
+ * :: DeveloperApi ::
+ *
+ * A LeaderElectionAgent tracks current master and is a common interface for all election Agents.
  */
-private[spark] trait LeaderElectionAgent extends Actor {
-  // TODO: LeaderElectionAgent does not necessary to be an Actor anymore, need refactoring.
-  val masterActor: ActorRef
+@DeveloperApi
+trait LeaderElectionAgent {
+  val masterActor: LeaderElectable
+  def stop() {} // to avoid noops in implementations.
 }
 
-/** Single-node implementation of LeaderElectionAgent -- we're initially and always the leader. */
-private[spark] class MonarchyLeaderAgent(val masterActor: ActorRef) extends LeaderElectionAgent {
-  override def preStart() {
-    masterActor ! ElectedLeader
-  }
+@DeveloperApi
+trait LeaderElectable {
+  def electedLeader()
+  def revokedLeadership()
+}
 
-  override def receive = {
-    case _ =>
-  }
+/** Single-node implementation of LeaderElectionAgent -- we're initially and always the leader. */
+private[spark] class MonarchyLeaderAgent(val masterActor: LeaderElectable)
+  extends LeaderElectionAgent {
+  masterActor.electedLeader()
 }
```
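
With the Actor dependency gone, an election agent is now an ordinary object that invokes callbacks on a `LeaderElectable` master instead of sending it Akka messages. A sketch of what a custom agent might look like under the new contract; the external-coordinator hooks here are hypothetical:

```scala
// Hypothetical agent: a real implementation would wire onElected/onRevoked
// to an external coordinator such as ZooKeeper.
private[spark] class CustomLeaderAgent(val masterActor: LeaderElectable)
  extends LeaderElectionAgent {

  def onElected(): Unit = masterActor.electedLeader()      // gained leadership
  def onRevoked(): Unit = masterActor.revokedLeadership()  // lost leadership

  override def stop(): Unit = {
    // release coordinator resources; the trait's default stop() is a no-op
  }
}
```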

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 24 additions & 16 deletions

```diff
@@ -50,7 +50,7 @@ private[spark] class Master(
     port: Int,
     webUiPort: Int,
     val securityMgr: SecurityManager)
-  extends Actor with ActorLogReceive with Logging {
+  extends Actor with ActorLogReceive with Logging with LeaderElectable {
 
   import context.dispatcher // to use Akka's scheduler.schedule()
 
@@ -61,7 +61,6 @@ private[spark] class Master(
   val RETAINED_APPLICATIONS = conf.getInt("spark.deploy.retainedApplications", 200)
   val RETAINED_DRIVERS = conf.getInt("spark.deploy.retainedDrivers", 200)
   val REAPER_ITERATIONS = conf.getInt("spark.dead.worker.persistence", 15)
-  val RECOVERY_DIR = conf.get("spark.deploy.recoveryDirectory", "")
   val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode", "NONE")
 
   val workers = new HashSet[WorkerInfo]
@@ -103,7 +102,7 @@ private[spark] class Master(
 
   var persistenceEngine: PersistenceEngine = _
 
-  var leaderElectionAgent: ActorRef = _
+  var leaderElectionAgent: LeaderElectionAgent = _
 
   private var recoveryCompletionTask: Cancellable = _
 
@@ -130,23 +129,24 @@ private[spark] class Master(
     masterMetricsSystem.start()
     applicationMetricsSystem.start()
 
-    persistenceEngine = RECOVERY_MODE match {
+    val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
       case "ZOOKEEPER" =>
         logInfo("Persisting recovery state to ZooKeeper")
-        new ZooKeeperPersistenceEngine(SerializationExtension(context.system), conf)
+        val zkFactory = new ZooKeeperRecoveryModeFactory(conf)
+        (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
       case "FILESYSTEM" =>
-        logInfo("Persisting recovery state to directory: " + RECOVERY_DIR)
-        new FileSystemPersistenceEngine(RECOVERY_DIR, SerializationExtension(context.system))
+        val fsFactory = new FileSystemRecoveryModeFactory(conf)
+        (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
+      case "CUSTOM" =>
+        val clazz = Class.forName(conf.get("spark.deploy.recoveryMode.factory"))
+        val factory = clazz.getConstructor(conf.getClass)
+          .newInstance(conf).asInstanceOf[StandaloneRecoveryModeFactory]
+        (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
       case _ =>
-        new BlackHolePersistenceEngine()
+        (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
     }
-
-    leaderElectionAgent = RECOVERY_MODE match {
-      case "ZOOKEEPER" =>
-        context.actorOf(Props(classOf[ZooKeeperLeaderElectionAgent], self, masterUrl, conf))
-      case _ =>
-        context.actorOf(Props(classOf[MonarchyLeaderAgent], self))
-    }
+    persistenceEngine = persistenceEngine_
+    leaderElectionAgent = leaderElectionAgent_
   }
 
   override def preRestart(reason: Throwable, message: Option[Any]) {
@@ -165,7 +165,15 @@ private[spark] class Master(
     masterMetricsSystem.stop()
     applicationMetricsSystem.stop()
     persistenceEngine.close()
-    context.stop(leaderElectionAgent)
+    leaderElectionAgent.stop()
+  }
+
+  override def electedLeader() {
+    self ! ElectedLeader
+  }
+
+  override def revokedLeadership() {
+    self ! RevokedLeadership
   }
 
   override def receiveWithLogging = {
```
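
The new `CUSTOM` branch loads a user-supplied factory named by `spark.deploy.recoveryMode.factory`, constructed reflectively with a single `SparkConf` argument. A sketch of a conforming factory: the `StandaloneRecoveryModeFactory` base-class signature is inferred from the reflective call above, and the trivial engine/agent choices are purely illustrative:

```scala
import org.apache.spark.SparkConf

// Hypothetical pluggable recovery backend. It must expose the constructor
// the CUSTOM branch looks up: getConstructor(classOf[SparkConf]).
class MyRecoveryModeFactory(conf: SparkConf)
  extends StandaloneRecoveryModeFactory(conf) {

  // Stand-ins; a real backend would persist master state durably.
  override def createPersistenceEngine(): PersistenceEngine =
    new BlackHolePersistenceEngine()

  override def createLeaderElectionAgent(master: LeaderElectable): LeaderElectionAgent =
    new MonarchyLeaderAgent(master)
}

// Enabled via configuration, e.g.:
//   spark.deploy.recoveryMode          CUSTOM
//   spark.deploy.recoveryMode.factory  com.example.MyRecoveryModeFactory
```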
