Skip to content

Commit 5e189c6

Browse files
committed
Track time spent closing / flushing files; split TimeTrackingOutputStream into a separate file.
1 parent d5779c6 commit 5e189c6

File tree

3 files changed

+20
-32
lines changed

3 files changed

+20
-32
lines changed

core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.apache.spark.shuffle.ShuffleMemoryManager;
5151
import org.apache.spark.shuffle.ShuffleWriter;
5252
import org.apache.spark.storage.BlockManager;
53+
import org.apache.spark.storage.TimeTrackingOutputStream;
5354
import org.apache.spark.unsafe.PlatformDependent;
5455
import org.apache.spark.unsafe.memory.TaskMemoryManager;
5556

@@ -301,7 +302,7 @@ private long[] mergeSpillsWithFileStream(
301302
for (int partition = 0; partition < numPartitions; partition++) {
302303
final long initialFileLength = outputFile.length();
303304
mergedFileOutputStream =
304-
new TimeTrackingFileOutputStream(writeMetrics, new FileOutputStream(outputFile, true));
305+
new TimeTrackingOutputStream(writeMetrics, new FileOutputStream(outputFile, true));
305306
if (compressionCodec != null) {
306307
mergedFileOutputStream = compressionCodec.compressedOutputStream(mergedFileOutputStream);
307308
}

core/src/main/java/org/apache/spark/shuffle/unsafe/TimeTrackingOutputStream.java renamed to core/src/main/java/org/apache/spark/storage/TimeTrackingOutputStream.java

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,25 +15,23 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.spark.shuffle.unsafe;
18+
package org.apache.spark.storage;
1919

20-
import org.apache.spark.executor.ShuffleWriteMetrics;
21-
22-
import java.io.FileOutputStream;
2320
import java.io.IOException;
2421
import java.io.OutputStream;
2522

23+
import org.apache.spark.executor.ShuffleWriteMetrics;
24+
2625
/**
27-
* Intercepts write calls and tracks total time spent writing.
26+
* Intercepts write calls and tracks total time spent writing in order to update shuffle write
27+
* metrics. Not thread safe.
2828
*/
29-
final class TimeTrackingFileOutputStream extends OutputStream {
29+
public final class TimeTrackingOutputStream extends OutputStream {
3030

3131
private final ShuffleWriteMetrics writeMetrics;
32-
private final FileOutputStream outputStream;
32+
private final OutputStream outputStream;
3333

34-
public TimeTrackingFileOutputStream(
35-
ShuffleWriteMetrics writeMetrics,
36-
FileOutputStream outputStream) {
34+
public TimeTrackingOutputStream(ShuffleWriteMetrics writeMetrics, OutputStream outputStream) {
3735
this.writeMetrics = writeMetrics;
3836
this.outputStream = outputStream;
3937
}
@@ -49,7 +47,8 @@ public void write(int b) throws IOException {
4947
public void write(byte[] b) throws IOException {
5048
final long startTime = System.nanoTime();
5149
outputStream.write(b);
52-
writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime); }
50+
writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
51+
}
5352

5453
@Override
5554
public void write(byte[] b, int off, int len) throws IOException {
@@ -60,11 +59,15 @@ public void write(byte[] b, int off, int len) throws IOException {
6059

6160
@Override
6261
public void flush() throws IOException {
62+
final long startTime = System.nanoTime();
6363
outputStream.flush();
64+
writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
6465
}
6566

6667
@Override
6768
public void close() throws IOException {
69+
final long startTime = System.nanoTime();
6870
outputStream.close();
71+
writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
6972
}
7073
}

core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala

Lines changed: 4 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -86,16 +86,6 @@ private[spark] class DiskBlockObjectWriter(
8686
extends BlockObjectWriter(blockId)
8787
with Logging
8888
{
89-
/** Intercepts write calls and tracks total time spent writing. Not thread safe. */
90-
private class TimeTrackingOutputStream(out: OutputStream) extends OutputStream {
91-
override def write(i: Int): Unit = callWithTiming(out.write(i))
92-
override def write(b: Array[Byte]): Unit = callWithTiming(out.write(b))
93-
override def write(b: Array[Byte], off: Int, len: Int): Unit = {
94-
callWithTiming(out.write(b, off, len))
95-
}
96-
override def close(): Unit = out.close()
97-
override def flush(): Unit = out.flush()
98-
}
9989

10090
/** The file channel, used for repositioning / truncating the file. */
10191
private var channel: FileChannel = null
@@ -136,7 +126,7 @@ private[spark] class DiskBlockObjectWriter(
136126
throw new IllegalStateException("Writer already closed. Cannot be reopened.")
137127
}
138128
fos = new FileOutputStream(file, true)
139-
ts = new TimeTrackingOutputStream(fos)
129+
ts = new TimeTrackingOutputStream(writeMetrics, fos)
140130
channel = fos.getChannel()
141131
bs = compressStream(new BufferedOutputStream(ts, bufferSize))
142132
objOut = serializerInstance.serializeStream(bs)
@@ -150,9 +140,9 @@ private[spark] class DiskBlockObjectWriter(
150140
if (syncWrites) {
151141
// Force outstanding writes to disk and track how long it takes
152142
objOut.flush()
153-
callWithTiming {
154-
fos.getFD.sync()
155-
}
143+
val start = System.nanoTime()
144+
fos.getFD.sync()
145+
writeMetrics.incShuffleWriteTime(System.nanoTime() - start)
156146
}
157147
} {
158148
objOut.close()
@@ -251,12 +241,6 @@ private[spark] class DiskBlockObjectWriter(
251241
reportedPosition = pos
252242
}
253243

254-
private def callWithTiming(f: => Unit) = {
255-
val start = System.nanoTime()
256-
f
257-
writeMetrics.incShuffleWriteTime(System.nanoTime() - start)
258-
}
259-
260244
// For testing
261245
private[spark] override def flush() {
262246
objOut.flush()

0 commit comments

Comments
 (0)