Commit b65e155

More changes based on PR comments.

1 parent d7cd15b
File tree: 12 files changed, +170 −37 lines


streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLog.java

Lines changed: 12 additions & 12 deletions

@@ -21,39 +21,39 @@
 import java.util.Iterator;

 /**
- * Interface representing a write ahead log (aka journal) that is used by Spark Streaming to
- * save the received data (by receivers) and associated metadata to a reliable storage, so that
- * they can be recovered after driver failures. See the Spark docs for more information on how
- * to plug in your own custom implementation of a write ahead log.
+ * This abstract class represents a write ahead log (aka journal) that is used by Spark Streaming
+ * to save the received data (by receivers) and associated metadata to a reliable storage, so that
+ * they can be recovered after driver failures. See the Spark documentation for more information
+ * on how to plug in your own custom implementation of a write ahead log.
  */
 @org.apache.spark.annotation.DeveloperApi
-public interface WriteAheadLog {
+public abstract class WriteAheadLog {
   /**
    * Write the record to the log and return the segment information that is necessary to read
    * back the written record. The time is used to the index the record, such that it can be
    * cleaned later. Note that the written data must be durable and readable (using the
    * segment info) by the time this function returns.
    */
-  WriteAheadLogSegment write(ByteBuffer record, long time);
+  abstract public WriteAheadLogRecordHandle write(ByteBuffer record, long time);

   /**
    * Read a written record based on the given segment information.
    */
-  ByteBuffer read(WriteAheadLogSegment segment);
+  abstract public ByteBuffer read(WriteAheadLogRecordHandle handle);

   /**
-   * Read and return an iterator of all the records that have written and not yet cleanup.
+   * Read and return an iterator of all the records that have been written but not yet cleaned up.
    */
-  Iterator<ByteBuffer> readAll();
+  abstract public Iterator<ByteBuffer> readAll();

   /**
-   * Cleanup all the records that are older than the given threshold time. It can wait for
+   * Clean all the records that are older than the threshold time. It can wait for
    * the completion of the deletion.
    */
-  void cleanup(long threshTime, boolean waitForCompletion);
+  abstract public void clean(long threshTime, boolean waitForCompletion);

   /**
    * Close this log and release any resources.
    */
-  void close();
+  abstract public void close();
 }
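Editor's note: as a quick orientation to the new contract, the skeleton below shows what a custom implementation has to provide now that WriteAheadLog is an abstract class with five abstract methods. It is a hypothetical sketch (the class name MyWriteAheadLog is not part of this commit); a complete working in-memory example is the JavaWriteAheadLogSuite added further down.

import java.nio.ByteBuffer;
import java.util.Iterator;

import org.apache.spark.streaming.util.WriteAheadLog;
import org.apache.spark.streaming.util.WriteAheadLogRecordHandle;

// Hypothetical subclass: every abstract method of the new WriteAheadLog class must be overridden.
public class MyWriteAheadLog extends WriteAheadLog {

  @Override
  public WriteAheadLogRecordHandle write(ByteBuffer record, long time) {
    // Durably persist `record`, index it by `time`, and return a handle that read() can use.
    throw new UnsupportedOperationException("sketch only");
  }

  @Override
  public ByteBuffer read(WriteAheadLogRecordHandle handle) {
    // Locate the record identified by `handle` and return its bytes.
    throw new UnsupportedOperationException("sketch only");
  }

  @Override
  public Iterator<ByteBuffer> readAll() {
    // Return every record that has been written but not yet cleaned up.
    throw new UnsupportedOperationException("sketch only");
  }

  @Override
  public void clean(long threshTime, boolean waitForCompletion) {
    // Delete records older than `threshTime`; block until done if `waitForCompletion` is true.
  }

  @Override
  public void close() {
    // Release any resources (files, connections) held by the log.
  }
}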

streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLogSegment.java renamed to streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLogRecordHandle.java

Lines changed: 7 additions & 3 deletions

@@ -18,9 +18,13 @@
 package org.apache.spark.streaming.util;

 /**
- * This is an interface that represent the information required by any implementation of
- * a WriteAheadLog to read a written record.
+ * This abstract class represents a handle that refers to a record written in a
+ * {@link org.apache.spark.streaming.util.WriteAheadLog WriteAheadLog}.
+ * It must contain all the information necessary for the record to be read and returned by
+ * an implemenation of the WriteAheadLog class.
+ *
+ * @see org.apache.spark.streaming.util.WriteAheadLog
  */
 @org.apache.spark.annotation.DeveloperApi
-public interface WriteAheadLogSegment extends java.io.Serializable {
+public abstract class WriteAheadLogRecordHandle implements java.io.Serializable {
 }
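Editor's note: for concreteness, a handle for a file-backed log would typically carry a path, offset, and length, much like the FileBasedWriteAheadLogSegment further down in this commit. The class below is a hypothetical illustration of "all the information necessary for the record to be read", not code from the commit.

import org.apache.spark.streaming.util.WriteAheadLogRecordHandle;

// Hypothetical handle for a log that stores records in flat files; it carries exactly
// what the matching WriteAheadLog.read() implementation needs to locate the record again.
public class MyFileRecordHandle extends WriteAheadLogRecordHandle {
  public final String path;   // log file containing the record
  public final long offset;   // byte offset of the record within that file
  public final int length;    // length of the record in bytes

  public MyFileRecordHandle(String path, long offset, int length) {
    this.path = path;
    this.offset = offset;
    this.length = length;
  }
}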

streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala

Lines changed: 2 additions & 2 deletions

@@ -40,7 +40,7 @@ private[streaming]
 class WriteAheadLogBackedBlockRDDPartition(
     val index: Int,
     val blockId: BlockId,
-    val segment: WriteAheadLogSegment)
+    val segment: WriteAheadLogRecordHandle)
   extends Partition

@@ -61,7 +61,7 @@ private[streaming]
 class WriteAheadLogBackedBlockRDD[T: ClassTag](
     @transient sc: SparkContext,
     @transient blockIds: Array[BlockId],
-    @transient segments: Array[WriteAheadLogSegment],
+    @transient segments: Array[WriteAheadLogRecordHandle],
     storeInBlockManager: Boolean,
     storageLevel: StorageLevel)
   extends BlockRDD[T](sc, blockIds) {

streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala

Lines changed: 3 additions & 3 deletions

@@ -26,7 +26,7 @@ import org.apache.hadoop.fs.Path

 import org.apache.spark.storage._
 import org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler._
-import org.apache.spark.streaming.util.{WriteAheadLogSegment, WriteAheadLogUtils}
+import org.apache.spark.streaming.util.{WriteAheadLogRecordHandle, WriteAheadLogUtils}
 import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}
 import org.apache.spark.{Logging, SparkConf, SparkException}

@@ -96,7 +96,7 @@ private[streaming] class BlockManagerBasedBlockHandler(
  */
 private[streaming] case class WriteAheadLogBasedStoreResult(
     blockId: StreamBlockId,
-    segment: WriteAheadLogSegment
+    segment: WriteAheadLogRecordHandle
   ) extends ReceivedBlockStoreResult

@@ -185,7 +185,7 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
   }

   def cleanupOldBlocks(threshTime: Long) {
-    writeAheadLog.cleanup(threshTime, false)
+    writeAheadLog.clean(threshTime, false)
   }

   def stop() {

streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala

Lines changed: 1 addition & 1 deletion

@@ -155,7 +155,7 @@ private[streaming] class ReceivedBlockTracker(
     logInfo("Deleting batches " + timesToCleanup)
     writeToLog(BatchCleanupEvent(timesToCleanup))
     timeToAllocatedBlocks --= timesToCleanup
-    writeAheadLogOption.foreach(_.cleanup(cleanupThreshTime.milliseconds, waitForCompletion))
+    writeAheadLogOption.foreach(_.clean(cleanupThreshTime.milliseconds, waitForCompletion))
   }

   /** Stop the block tracker. */

streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala

Lines changed: 2 additions & 2 deletions

@@ -95,7 +95,7 @@ private[streaming] class FileBasedWriteAheadLog(
     fileSegment
   }

-  def read(segment: WriteAheadLogSegment): ByteBuffer = {
+  def read(segment: WriteAheadLogRecordHandle): ByteBuffer = {
     val fileSegment = segment.asInstanceOf[FileBasedWriteAheadLogSegment]
     var reader: FileBasedWriteAheadLogRandomReader = null
     var byteBuffer: ByteBuffer = null

@@ -140,7 +140,7 @@ private[streaming] class FileBasedWriteAheadLog(
   * deleted. This should be set to true only for testing. Else the files will be deleted
   * asynchronously.
   */
-  def cleanup(threshTime: Long, waitForCompletion: Boolean): Unit = {
+  def clean(threshTime: Long, waitForCompletion: Boolean): Unit = {
     val oldLogFiles = synchronized { pastLogs.filter { _.endTime < threshTime } }
     logInfo(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " +
       s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}")

streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogSegment.scala

Lines changed: 1 addition & 1 deletion

@@ -18,4 +18,4 @@ package org.apache.spark.streaming.util

 /** Class for representing a segment of data in a write ahead log file */
 private[streaming] case class FileBasedWriteAheadLogSegment(path: String, offset: Long, length: Int)
-  extends WriteAheadLogSegment
+  extends WriteAheadLogRecordHandle
streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java (new file)

Lines changed: 129 additions & 0 deletions

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.streaming;

import java.util.ArrayList;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.Transformer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.util.WriteAheadLog;
import org.apache.spark.streaming.util.WriteAheadLogRecordHandle;
import org.apache.spark.streaming.util.WriteAheadLogUtils;

import org.junit.Test;
import org.junit.Assert;

class JavaWriteAheadLogSuiteHandle extends WriteAheadLogRecordHandle {
  int index = -1;
  public JavaWriteAheadLogSuiteHandle(int idx) {
    index = idx;
  }
}

public class JavaWriteAheadLogSuite extends WriteAheadLog {

  class Record {
    long time;
    int index;
    ByteBuffer buffer;

    public Record(long tym, int idx, ByteBuffer buf) {
      index = idx;
      time = tym;
      buffer = buf;
    }
  }
  private int index = -1;
  private ArrayList<Record> records = new ArrayList<Record>();


  // Methods for WriteAheadLog
  @Override
  public WriteAheadLogRecordHandle write(java.nio.ByteBuffer record, long time) {
    index += 1;
    records.add(new org.apache.spark.streaming.JavaWriteAheadLogSuite.Record(time, index, record));
    return new JavaWriteAheadLogSuiteHandle(index);
  }

  @Override
  public java.nio.ByteBuffer read(WriteAheadLogRecordHandle handle) {
    if (handle instanceof JavaWriteAheadLogSuiteHandle) {
      int reqdIndex = ((JavaWriteAheadLogSuiteHandle) handle).index;
      for (Record record: records) {
        if (record.index == reqdIndex) {
          return record.buffer;
        }
      }
    }
    return null;
  }

  @Override
  public java.util.Iterator<java.nio.ByteBuffer> readAll() {
    Collection<ByteBuffer> buffers = CollectionUtils.collect(records, new Transformer() {
      @Override
      public Object transform(Object input) {
        return ((Record) input).buffer;
      }
    });
    return buffers.iterator();
  }

  @Override
  public void clean(long threshTime, boolean waitForCompletion) {
    for (int i = 0; i < records.size(); i++) {
      if (records.get(i).time < threshTime) {
        records.remove(i);
        i--;
      }
    }
  }

  @Override
  public void close() {
    records.clear();
  }

  @Test
  public void testCustomWAL() {
    SparkConf conf = new SparkConf();
    conf.set("spark.streaming.driver.writeAheadLog.class", JavaWriteAheadLogSuite.class.getName());
    WriteAheadLog wal = WriteAheadLogUtils.createLogForDriver(conf, null, null);

    String data1 = "data1";
    WriteAheadLogRecordHandle handle = wal.write(ByteBuffer.wrap(data1.getBytes()), 1234);
    Assert.assertTrue(handle instanceof JavaWriteAheadLogSuiteHandle);
    Assert.assertTrue(new String(wal.read(handle).array()).equals(data1));

    wal.write(ByteBuffer.wrap("data2".getBytes()), 1235);
    wal.write(ByteBuffer.wrap("data3".getBytes()), 1236);
    wal.write(ByteBuffer.wrap("data4".getBytes()), 1237);
    wal.clean(1236, false);

    java.util.Iterator<java.nio.ByteBuffer> dataIterator = wal.readAll();
    ArrayList<String> readData = new ArrayList<String>();
    while (dataIterator.hasNext()) {
      readData.add(new String(dataIterator.next().array()));
    }
    Assert.assertTrue(readData.equals(Arrays.asList("data3", "data4")));
  }
}
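Editor's note: testCustomWAL above also shows the plug-in mechanism end to end; the custom class is selected purely through configuration, with no other code changes. The sketch below condenses that wiring as a standalone program. It reuses the hypothetical MyWriteAheadLog skeleton from the earlier note and the same three-argument createLogForDriver call as the test; it is not code from the commit.

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.util.WriteAheadLog;
import org.apache.spark.streaming.util.WriteAheadLogUtils;

public class CustomWalWiring {
  public static void main(String[] args) {
    // Select the driver-side write ahead log implementation by class name,
    // exactly as testCustomWAL does with JavaWriteAheadLogSuite itself.
    SparkConf conf = new SparkConf().set(
        "spark.streaming.driver.writeAheadLog.class", MyWriteAheadLog.class.getName());

    // Same call as the test; the configured class is instantiated in place of
    // the default file-based implementation.
    WriteAheadLog wal = WriteAheadLogUtils.createLogForDriver(conf, null, null);
    wal.close();
  }
}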

streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala

Lines changed: 1 addition & 1 deletion

@@ -149,7 +149,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche
     }
   }

-  test("WriteAheadLogBasedBlockHandler - cleanup old blocks") {
+  test("WriteAheadLogBasedBlockHandler - clean old blocks") {
     withWriteAheadLogBasedBlockHandler { handler =>
       val blocks = Seq.tabulate(10) { i => IteratorBlock(Iterator(1 to i)) }
       storeBlocks(handler, blocks)

streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala

Lines changed: 4 additions & 4 deletions

@@ -88,7 +88,7 @@ class ReceivedBlockTrackerSuite
     receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual blockInfos
   }

-  test("block addition, block to batch allocation and cleanup with write ahead log") {
+  test("block addition, block to batch allocation and clean up with write ahead log") {
     val manualClock = new ManualClock
     // Set the time increment level to twice the rotation interval so that every increment creates
     // a new log file

@@ -175,7 +175,7 @@ class ReceivedBlockTrackerSuite
     eventually(timeout(10 seconds), interval(10 millisecond)) {
       getWriteAheadLogFiles() should not contain oldestLogFile
     }
-    printLogFiles("After cleanup")
+    printLogFiles("After clean")

     // Restart tracker and verify recovered state, specifically whether info about the first
     // batch has been removed, but not the second batch

@@ -206,7 +206,7 @@ class ReceivedBlockTrackerSuite

   /**
    * Create tracker object with the optional provided clock. Use fake clock if you
-   * want to control time by manually incrementing it to test log cleanup.
+   * want to control time by manually incrementing it to test log clean.
    */
   def createTracker(
       setCheckpointDir: Boolean = true,

@@ -254,7 +254,7 @@ class ReceivedBlockTrackerSuite
     BatchAllocationEvent(time, AllocatedBlocks(Map((streamId -> blockInfos))))
   }

-  /** Create batch cleanup object from the given info */
+  /** Create batch clean object from the given info */
   def createBatchCleanup(time: Long, moreTimes: Long*): BatchCleanupEvent = {
     BatchCleanupEvent((Seq(time) ++ moreTimes).map(Time.apply))
   }
