1 file changed, +6 −1 lines: core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
@@ -140,10 +140,11 @@ private[spark] class DiskBlockObjectWriter(
140140 // serializer stream and the lower level stream.
141141 objOut.flush()
142142 bs.flush()
143- updateBytesWritten()
144143 close()
145144 }
146145 finalPosition = file.length()
146+ // With certain compression codecs, more bytes are written after close() is called.
147+ writeMetrics.shuffleBytesWritten += (finalPosition - lastPosition)
147148 }
148149
149150 // Discard current writes. We do this by flushing the outstanding writes and then
@@ -189,6 +190,10 @@ private[spark] class DiskBlockObjectWriter(
189190 new FileSegment (file, initialPosition, finalPosition - initialPosition)
190191 }
191192
193+ /**
194+ * Report the number of bytes written in this writer's shuffle write metrics.
195+ * Note that this is only valid before the underlying streams are closed.
196+ */
192197 private def updateBytesWritten () {
193198 val pos = channel.position()
194199 writeMetrics.shuffleBytesWritten += (pos - lastPosition)
You can’t perform that action at this time.
0 commit comments