@@ -30,6 +30,7 @@ import kafka.message.MessageAndMetadata
import kafka.serializer.{DefaultDecoder, Decoder, StringDecoder}

import org.apache.spark.api.java.function.{Function => JFunction}
import org.apache.spark.streaming.util.WriteAheadLogUtils
import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.annotation.Experimental
import org.apache.spark.rdd.RDD
@@ -79,7 +80,7 @@ object KafkaUtils {
topics: Map[String, Int],
storageLevel: StorageLevel
): ReceiverInputDStream[(K, V)] = {
val walEnabled = ssc.conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)
val walEnabled = WriteAheadLogUtils.enableReceiverLog(ssc.conf)
new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, storageLevel)
}

streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLog.java
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.streaming.util;

import java.nio.ByteBuffer;
import java.util.Iterator;

/**
* Interface representing a write ahead log (aka journal) that is used by Spark Streaming to
* save the received data (by receivers) and associated metadata to a reliable storage, so that
* they can be recovered after driver failures. See the Spark docs for more information on how
Contributor:
It might be good to give a high level description like "A WriteAheadLog is any storage service capable of persisting binary data associated with a particular time and removing all data older than a certain time."

Contributor Author:
Good idea. Will do.

* to plug in your own custom implementation of a write ahead log.
*/
@org.apache.spark.annotation.DeveloperApi
public interface WriteAheadLog {
Contributor:
Is the idea that this would be useful for Java implementations to keep this a Java interface?

Contributor Author:
Yes. It's meant for users to create arbitrary implementations and we want to stay backward compatible (Scala traits have pretty nasty corner cases).

/**
* Write the record to the log and return the segment information that is necessary to read
* back the written record. The time is used to index the record, such that it can be
* cleaned later. Note that the written data must be durable and readable (using the
* segment info) by the time this function returns.
*/
WriteAheadLogSegment write(ByteBuffer record, long time);

/**
* Read a written record based on the given segment information.
*/
ByteBuffer read(WriteAheadLogSegment segment);

/**
* Read and return an iterator of all the records that have written and not yet cleanup.
Contributor:
not yet cleaned up.

*/
Iterator<ByteBuffer> readAll();

/**
* Cleanup all the records that are older than the given threshold time. It can wait for
* the completion of the deletion.
*/
void cleanup(long threshTime, boolean waitForCompletion);
Contributor:
For some reason I feel like it should just be "clean" instead of "cleanup" (as in "log cleaning"). This is totally subjective though, so I think the current one is OK too.


/**
* Close this log and release any resources.
*/
void close();
}
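
To make the contract above concrete, here is a minimal sketch (not part of this PR) of what a custom implementation could look like from Scala: a purely in-memory log that indexes records by write order and keeps the write time for cleanup. The names `InMemoryWriteAheadLog` and `InMemorySegment` are hypothetical; only the `WriteAheadLog` and `WriteAheadLogSegment` interfaces come from the files in this diff.

```scala
import java.nio.ByteBuffer
import java.util.{Iterator => JIterator}

import scala.collection.JavaConversions.asJavaIterator
import scala.collection.mutable

import org.apache.spark.streaming.util.{WriteAheadLog, WriteAheadLogSegment}

// Hypothetical segment type: just enough information to locate the record again.
case class InMemorySegment(index: Long) extends WriteAheadLogSegment

// Hypothetical in-memory WAL, useful only to illustrate how the interface fits together.
class InMemoryWriteAheadLog extends WriteAheadLog {

  // index -> (record time, record bytes), kept in insertion order
  private val records = new mutable.LinkedHashMap[Long, (Long, ByteBuffer)]
  private var nextIndex = 0L

  override def write(record: ByteBuffer, time: Long): WriteAheadLogSegment = synchronized {
    val segment = InMemorySegment(nextIndex)
    // The record must be durable and readable before returning; here "durable" is just the map.
    records(nextIndex) = (time, record)
    nextIndex += 1
    segment
  }

  override def read(segment: WriteAheadLogSegment): ByteBuffer = synchronized {
    records(segment.asInstanceOf[InMemorySegment].index)._2
  }

  override def readAll(): JIterator[ByteBuffer] = synchronized {
    // Snapshot the current records and expose them as a java.util.Iterator
    asJavaIterator(records.values.map(_._2).toList.iterator)
  }

  override def cleanup(threshTime: Long, waitForCompletion: Boolean): Unit = synchronized {
    // Drop everything older than the threshold; deletion is immediate in this sketch,
    // so waitForCompletion has no effect here.
    records.retain { case (_, (time, _)) => time >= threshTime }
  }

  override def close(): Unit = synchronized {
    records.clear()
  }
}
```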
streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLogSegment.java
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.streaming.util;

/**
* This is an interface that represent the information required by any implementation of
Contributor:
represents

* a WriteAheadLog to read a written record.
Contributor:
It might be good to say something like "Represents an opaque identifier that references metadata required to read a specific piece of data".

Contributor Author:
Saying "opaque" is kind of confusing. It's opaque to Spark, but not to whoever is implementing the WAL. And it's not for referring to an arbitrary piece of data, but specifically for referring to a record that has been written using a specific WriteAheadLog implementation.

*/
@org.apache.spark.annotation.DeveloperApi
public interface WriteAheadLogSegment extends java.io.Serializable {
Contributor:
I wonder if we should more explicitly build the serialization of these segment identifiers into this interface. One extreme option is to have the segment identifiers actually be byte buffers and ask the user to deal on their own with serializing them.

The main concerns I have are the following:

  1. Individual implementations of this must be Java Serializable, but it's not possible to reflect that in the interface.
  2. If those implementations want to evolve over different versions (for instance, they add a new field to the segment identifier), it will be tricky for them to do in a way that's backwards compatible (they'll have to write custom externalization logic, which isn't really used for backwards compatibility).

Contributor:
Also, could we call this a WALSegmentHandle or something? This isn't the segment itself; it's just an identifier.

Contributor Author:

  1. Well, advanced users who want to implement their own WAL will have to ensure that the segment info is serializable, no matter whether we expose an interface or a ByteBuffer. In fact, exposing an interface saves them from writing the code to serialize and return a ByteBuffer in the usual case, which is easier for the user. Also, this interface is expected to be called no faster than 100s of times per second, so it does not require super high serialization efficiency. Even if they want, they can always make the implementation extend Externalizable.
  2. That is a good point. There are easy workarounds even if we don't make this a ByteBuffer: they can put the bytes within their implementation, e.g. class MyWALSegment(byteBuffer: ByteBuffer) extends WALSegment (see the sketch below). For people who don't care about backward compatibility, making it a ByteBuffer makes it harder for them to implement. For others who do care about backward compatibility, they will have to write custom externalization logic either way, whether returning a ByteBuffer or returning MyWALSegment.
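
A minimal sketch of the wrapper idea from point 2 above. `MyWALSegment` and its byte-array field are hypothetical and not part of this PR; only `WriteAheadLogSegment` comes from the diff. A byte array is used instead of a `ByteBuffer` because `java.nio.ByteBuffer` is not `java.io.Serializable`, matching the later suggestion in this thread to use a byte array.

```scala
import org.apache.spark.streaming.util.WriteAheadLogSegment

// Hypothetical segment wrapper: all the real information (e.g. file path, offset, length)
// is packed into the byte array by the WAL implementation itself, so the implementation
// controls its own encoding (Java serialization, Kryo, protobuf, ...) and can evolve the
// format without relying on Java serialization of this wrapper for compatibility.
case class MyWALSegment(encodedInfo: Array[Byte]) extends WriteAheadLogSegment
```

The WAL implementation would decode `encodedInfo` inside its own `read()` method.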

Contributor:
But in the current approach, they can't, for instance, use Kryo or protobuf to serialize, unless they do something really crazy like use an externalizable hook to then call into Kryo. I guess I'm just thinking ahead to how this will evolve. However, if we want to have this in the future we can always create an alternative version that is additive, so I don't feel strongly at all.

Contributor Author:
Actually, this is an interface, so I am not sure we can create an alternate method without breaking binary compatibility. Well, they could leave the serialization of MyWALSegment to Java, which is just a wrapper for a ByteBuffer/byte array that contains all the real data. If that sounds too complicated, then maybe we should do bytes. And probably we should simply use a byte array instead of ByteBuffer, as we probably don't need to deal with direct byte buffers here.

}
@@ -16,14 +16,17 @@
*/
package org.apache.spark.streaming.rdd

import java.nio.ByteBuffer

import scala.reflect.ClassTag
import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration
import org.apache.commons.io.FileUtils

import org.apache.spark._
import org.apache.spark.rdd.BlockRDD
import org.apache.spark.storage.{BlockId, StorageLevel}
import org.apache.spark.streaming.util.{HdfsUtils, WriteAheadLogFileSegment, WriteAheadLogRandomReader}
import org.apache.spark.streaming.util._

/**
* Partition class for [[org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD]].
@@ -37,7 +40,7 @@ private[streaming]
class WriteAheadLogBackedBlockRDDPartition(
val index: Int,
val blockId: BlockId,
val segment: WriteAheadLogFileSegment)
val segment: WriteAheadLogSegment)
extends Partition


@@ -58,7 +61,7 @@ private[streaming]
class WriteAheadLogBackedBlockRDD[T: ClassTag](
@transient sc: SparkContext,
@transient blockIds: Array[BlockId],
@transient segments: Array[WriteAheadLogFileSegment],
@transient segments: Array[WriteAheadLogSegment],
storeInBlockManager: Boolean,
storageLevel: StorageLevel)
extends BlockRDD[T](sc, blockIds) {
@@ -96,9 +99,27 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
logDebug(s"Read partition data of $this from block manager, block $blockId")
iterator
case None => // Data not found in Block Manager, grab it from write ahead log file
val reader = new WriteAheadLogRandomReader(partition.segment.path, hadoopConf)
val dataRead = reader.read(partition.segment)
reader.close()
var dataRead: ByteBuffer = null
Comment:
I feel dirty seeing nulls in Scala.

Comment:
    ByteBuffer.wrap(new byte[0])

Contributor Author:
Why allocate two (at least) objects when it is completely obvious that they are not going to be used? The null does not get exposed to anything outside the function, and hence is okay to have.

If you look at the rest of the Spark source code, we don't strictly adhere to the Scala way of doing things; rather, we balance code understandability (limiting the levels of functional nesting) and efficiency (while loops instead of for when perf matters) with Scala style.

var writeAheadLog: WriteAheadLog = null
try {
val dummyDirectory = FileUtils.getTempDirectoryPath()
Contributor:
Why do we need to use dummyDirectory here? The WAL may not be file-based, so I'm not sure why we need to have this.

Contributor Author:
The default WAL is file-based, so a log directory is needed for it to work. However, the log directory is really not needed for reading a particular record. But to read a single record you have to create a FileBasedWriteAheadLog object, which needs a log directory. Hence I am providing a dummy directory for this.

I know that this is a little awkward. This is the cost of defining a single interface for both writing and reading single records. Earlier there were two independent classes (WALWriter and WALRandomReader) that were used for these two purposes, which have different requirements. But since I am trying to make a single interface that can be used for all reading and writing, the log directory must be provided in the constructor of the default file-based WAL. This results in the awkwardness.

I don't quite like it myself, but it may practically be okay as long as we ensure that the FileBasedWAL does not create unnecessary directories/files when only reading a single record. I can add a test to ensure that.

writeAheadLog = WriteAheadLogUtils.createLogForReceiver(
SparkEnv.get.conf, dummyDirectory, hadoopConf)
Contributor:
Also, IIUC, if the journal system is not Hadoop-based, hadoopConf may not be available.

Contributor Author:
hadoopConf is always available through the SparkContext. Irrespective of whether the Hadoop file system is used, a Hadoop conf is created by the SparkContext, which is passed on to this location. If the WAL is not the default FileBasedWAL, then this parameter is just ignored (see the method WriteAheadLogUtils.createLogForReceiver).

Contributor:
What I'm thinking is: do we need to have this parameter in the interface? Can we hide it inside the file-based WAL implementation?

Contributor Author:
The log directory needs to be passed through WriteAheadLogUtils.createLogForXXX(). If you want to hide it from this method and pass it through the SparkConf, then every place where WriteAheadLogUtils.createLogForXXX() needs to be called, we need to add the following:

    val walConf = SparkEnv.get.conf.clone()
    walConf.set("logdir", ....)
    WriteAheadLogUtils.createLogForXXX(walConf, hadoopConf)

IMO that duplicates code everywhere and is uglier than this dummy dir approach. Also, this does not handle the hadoopConf parameter, which is just as unnecessary as the log directory for arbitrary WAL implementations.
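
As a hedged illustration of the read path being discussed (not code from this PR): a small helper that mirrors what the diff below does, creating the configured WAL with a throwaway directory for the default file-based implementation, reading one record, and closing. The helper name `readRecord` and its parameters are hypothetical; `WriteAheadLogUtils.createLogForReceiver`, `FileUtils.getTempDirectoryPath()`, `read`, and `close` are the calls used in the diff.

```scala
import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf
import org.apache.spark.streaming.util.{WriteAheadLog, WriteAheadLogSegment, WriteAheadLogUtils}

// Hypothetical helper: create the configured WAL (file-based by default, hence the
// dummy directory), read one record identified by its segment, and close the log.
def readRecord(conf: SparkConf, hadoopConf: Configuration,
               segment: WriteAheadLogSegment): java.nio.ByteBuffer = {
  var wal: WriteAheadLog = null
  try {
    wal = WriteAheadLogUtils.createLogForReceiver(
      conf, FileUtils.getTempDirectoryPath(), hadoopConf)
    wal.read(segment)
  } finally {
    if (wal != null) wal.close()
  }
}
```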

dataRead = writeAheadLog.read(partition.segment)
} catch {
case NonFatal(e) =>
throw new SparkException(
s"Could not read data from write ahead log segment ${partition.segment}", e)
} finally {
if (writeAheadLog != null) {
writeAheadLog.close()
Contributor:
Maybe reset writeAheadLog to null after close to avoid unexpected behavior :).

    writeAheadLog = null

Contributor Author:
Done.

}
}
if (dataRead == null) {
throw new SparkException(
s"Could not read data from write ahead log segment ${partition.segment}, " +
s"read returned null")
}
logInfo(s"Read partition data of $this from write ahead log, segment ${partition.segment}")
if (storeInBlockManager) {
blockManager.putBytes(blockId, dataRead, storageLevel)
@@ -117,8 +138,14 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
override def getPreferredLocations(split: Partition): Seq[String] = {
val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition]
val blockLocations = getBlockIdLocations().get(partition.blockId)
blockLocations.getOrElse(
HdfsUtils.getFileSegmentLocations(
partition.segment.path, partition.segment.offset, partition.segment.length, hadoopConfig))
blockLocations.getOrElse {
Contributor:
It might make sense to add location info to the WALRecordHandle interface itself. This way, systems that are not HDFS but still benefit from preferred locations can use it.

Contributor Author:
That's a good point. I wasn't super sure whether it is a good idea to have it in the interface in this version. We can add it later and maintain binary compatibility, as the RecordHandle is an abstract class. Also, it is still a developer API. For now, I am going to merge this in to unblock #5732.

partition.segment match {
case fileSegment: FileBasedWriteAheadLogSegment =>
HdfsUtils.getFileSegmentLocations(
fileSegment.path, fileSegment.offset, fileSegment.length, hadoopConfig)
case _ =>
Seq.empty
}
}
}
}
@@ -17,18 +17,18 @@

package org.apache.spark.streaming.receiver

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.language.{existentials, postfixOps}

import WriteAheadLogBasedBlockHandler._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.storage._
import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, WriteAheadLogManager}
import org.apache.spark.util.{ThreadUtils, Clock, SystemClock}
import org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler._
import org.apache.spark.streaming.util.{WriteAheadLogSegment, WriteAheadLogUtils}
import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}
import org.apache.spark.{Logging, SparkConf, SparkException}

/** Trait that represents the metadata related to storage of blocks */
private[streaming] trait ReceivedBlockStoreResult {
@@ -96,7 +96,7 @@ private[streaming] class BlockManagerBasedBlockHandler(
*/
private[streaming] case class WriteAheadLogBasedStoreResult(
blockId: StreamBlockId,
segment: WriteAheadLogFileSegment
segment: WriteAheadLogSegment
) extends ReceivedBlockStoreResult


@@ -116,10 +116,6 @@ private[streaming] class WriteAheadLogBasedBlockHandler(

private val blockStoreTimeout = conf.getInt(
"spark.streaming.receiver.blockStoreTimeout", 30).seconds
private val rollingInterval = conf.getInt(
"spark.streaming.receiver.writeAheadLog.rollingInterval", 60)
private val maxFailures = conf.getInt(
"spark.streaming.receiver.writeAheadLog.maxFailures", 3)

private val effectiveStorageLevel = {
if (storageLevel.deserialized) {
@@ -139,13 +135,9 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
s"$effectiveStorageLevel when write ahead log is enabled")
}

// Manages rolling log files
private val logManager = new WriteAheadLogManager(
checkpointDirToLogDir(checkpointDir, streamId),
hadoopConf, rollingInterval, maxFailures,
callerName = this.getClass.getSimpleName,
clock = clock
)
// Write ahead log manages
private val writeAheadLog = WriteAheadLogUtils.createLogForReceiver(
conf, checkpointDirToLogDir(checkpointDir, streamId), hadoopConf)

// For processing futures used in parallel block storing into block manager and write ahead log
// # threads = 2, so that both writing to BM and WAL can proceed in parallel
@@ -183,7 +175,7 @@ private[streaming] class WriteAheadLogBasedBlockHandler(

// Store the block in write ahead log
val storeInWriteAheadLogFuture = Future {
logManager.writeToLog(serializedBlock)
writeAheadLog.write(serializedBlock, clock.getTimeMillis())
}

// Combine the futures, wait for both to complete, and return the write ahead log segment
@@ -193,11 +185,11 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
}

def cleanupOldBlocks(threshTime: Long) {
logManager.cleanupOldLogs(threshTime, waitForCompletion = false)
writeAheadLog.cleanup(threshTime, false)
}

def stop() {
logManager.stop()
writeAheadLog.close()
}
}

@@ -25,12 +25,13 @@ import scala.collection.mutable.ArrayBuffer
import com.google.common.base.Throwables
import org.apache.hadoop.conf.Configuration

import org.apache.spark.{Logging, SparkEnv, SparkException}
import org.apache.spark.rpc.{RpcEnv, ThreadSafeRpcEndpoint}
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.scheduler._
import org.apache.spark.streaming.util.WriteAheadLogUtils
import org.apache.spark.util.{RpcUtils, Utils}
import org.apache.spark.{Logging, SparkEnv, SparkException}

/**
* Concrete implementation of [[org.apache.spark.streaming.receiver.ReceiverSupervisor]]
@@ -46,7 +47,7 @@ private[streaming] class ReceiverSupervisorImpl(
) extends ReceiverSupervisor(receiver, env.conf) with Logging {

private val receivedBlockHandler: ReceivedBlockHandler = {
if (env.conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)) {
if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
if (checkpointDirOption.isEmpty) {
throw new SparkException(
"Cannot enable receiver write-ahead log without checkpoint directory set. " +
@@ -25,10 +25,10 @@ import scala.language.implicitConversions
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.{SparkException, Logging, SparkConf}
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.util.WriteAheadLogManager
import org.apache.spark.streaming.util.{WriteAheadLog, WriteAheadLogUtils}
import org.apache.spark.util.{Clock, Utils}
import org.apache.spark.{Logging, SparkConf, SparkException}

/** Trait representing any event in the ReceivedBlockTracker that updates its state. */
private[streaming] sealed trait ReceivedBlockTrackerLogEvent
@@ -70,7 +70,7 @@ private[streaming] class ReceivedBlockTracker(

private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]
private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks]
private val logManagerOption = createLogManager()
private val writeAheadLogOption = createWriteAheadLog()

private var lastAllocatedBatchTime: Time = null

@@ -155,12 +155,12 @@ private[streaming] class ReceivedBlockTracker(
logInfo("Deleting batches " + timesToCleanup)
writeToLog(BatchCleanupEvent(timesToCleanup))
timeToAllocatedBlocks --= timesToCleanup
logManagerOption.foreach(_.cleanupOldLogs(cleanupThreshTime.milliseconds, waitForCompletion))
writeAheadLogOption.foreach(_.cleanup(cleanupThreshTime.milliseconds, waitForCompletion))
}

/** Stop the block tracker. */
def stop() {
logManagerOption.foreach { _.stop() }
writeAheadLogOption.foreach { _.close() }
}

/**
@@ -190,9 +190,10 @@ private[streaming] class ReceivedBlockTracker(
timeToAllocatedBlocks --= batchTimes
}

logManagerOption.foreach { logManager =>
writeAheadLogOption.foreach { writeAheadLog =>
logInfo(s"Recovering from write ahead logs in ${checkpointDirOption.get}")
logManager.readFromLog().foreach { byteBuffer =>
import scala.collection.JavaConversions._
writeAheadLog.readAll().foreach { byteBuffer =>
logTrace("Recovering record " + byteBuffer)
Utils.deserialize[ReceivedBlockTrackerLogEvent](byteBuffer.array) match {
case BlockAdditionEvent(receivedBlockInfo) =>
Expand All @@ -208,10 +209,10 @@ private[streaming] class ReceivedBlockTracker(

/** Write an update to the tracker to the write ahead log */
private def writeToLog(record: ReceivedBlockTrackerLogEvent) {
if (isLogManagerEnabled) {
if (isWriteAheadLogEnabled) {
logDebug(s"Writing to log $record")
logManagerOption.foreach { logManager =>
logManager.writeToLog(ByteBuffer.wrap(Utils.serialize(record)))
writeAheadLogOption.foreach { logManager =>
logManager.write(ByteBuffer.wrap(Utils.serialize(record)), clock.getTimeMillis())
}
}
}
@@ -222,28 +223,25 @@
}

/** Optionally create the write ahead log manager only if the feature is enabled */
private def createLogManager(): Option[WriteAheadLogManager] = {
if (conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)) {
private def createWriteAheadLog(): Option[WriteAheadLog] = {
if (WriteAheadLogUtils.enableReceiverLog(conf)) {
if (checkpointDirOption.isEmpty) {
throw new SparkException(
"Cannot enable receiver write-ahead log without checkpoint directory set. " +
"Please use streamingContext.checkpoint() to set the checkpoint directory. " +
"See documentation for more details.")
}
val logDir = ReceivedBlockTracker.checkpointDirToLogDir(checkpointDirOption.get)
val rollingIntervalSecs = conf.getInt(
"spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", 60)
val logManager = new WriteAheadLogManager(logDir, hadoopConf,
rollingIntervalSecs = rollingIntervalSecs, clock = clock,
callerName = "ReceivedBlockHandlerMaster")
Some(logManager)

val log = WriteAheadLogUtils.createLogForDriver(conf, logDir, hadoopConf)
Some(log)
} else {
None
}
}

/** Check if the log manager is enabled. This is only used for testing purposes. */
private[streaming] def isLogManagerEnabled: Boolean = logManagerOption.nonEmpty
/** Check if the write ahead log is enabled. This is only used for testing purposes. */
private[streaming] def isWriteAheadLogEnabled: Boolean = writeAheadLogOption.nonEmpty
}

private[streaming] object ReceivedBlockTracker {