From 19ca5a1114dbe666ac40f4c966f3e600b58f92c2 Mon Sep 17 00:00:00 2001 From: Bogdan Raducanu Date: Mon, 10 Apr 2017 14:08:55 +0200 Subject: [PATCH 1/2] fix by synchronizing access to the stream map --- .../org/apache/spark/DebugFilesystem.scala | 26 ++++++++++++------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/DebugFilesystem.scala b/core/src/test/scala/org/apache/spark/DebugFilesystem.scala index 72aea841117cc..8dd3a2d2cbe6c 100644 --- a/core/src/test/scala/org/apache/spark/DebugFilesystem.scala +++ b/core/src/test/scala/org/apache/spark/DebugFilesystem.scala @@ -20,7 +20,6 @@ package org.apache.spark import java.io.{FileDescriptor, InputStream} import java.lang import java.nio.ByteBuffer -import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ import scala.collection.mutable @@ -31,21 +30,29 @@ import org.apache.spark.internal.Logging object DebugFilesystem extends Logging { // Stores the set of active streams and their creation sites. - private val openStreams = new ConcurrentHashMap[FSDataInputStream, Throwable]() + private val openStreams = mutable.Map.empty[FSDataInputStream, Throwable] - def clearOpenStreams(): Unit = { + def addOpenStream(stream: FSDataInputStream): Unit = synchronized { + openStreams.put(stream, new Throwable()) + } + + def clearOpenStreams(): Unit = synchronized { openStreams.clear() } - def assertNoOpenStreams(): Unit = { - val numOpen = openStreams.size() + def removeOpenStream(stream: FSDataInputStream): Unit = synchronized { + openStreams.remove(stream) + } + + def assertNoOpenStreams(): Unit = synchronized { + val numOpen = openStreams.values.size if (numOpen > 0) { - for (exc <- openStreams.values().asScala) { + for (exc <- openStreams.values) { logWarning("Leaked filesystem connection created at:") exc.printStackTrace() } throw new IllegalStateException(s"There are $numOpen possibly leaked file streams.", - openStreams.values().asScala.head) + openStreams.values.head) } } } @@ -60,8 +67,7 @@ class DebugFilesystem extends LocalFileSystem { override def open(f: Path, bufferSize: Int): FSDataInputStream = { val wrapped: FSDataInputStream = super.open(f, bufferSize) - openStreams.put(wrapped, new Throwable()) - + addOpenStream(wrapped) new FSDataInputStream(wrapped.getWrappedStream) { override def setDropBehind(dropBehind: lang.Boolean): Unit = wrapped.setDropBehind(dropBehind) @@ -98,7 +104,7 @@ class DebugFilesystem extends LocalFileSystem { override def close(): Unit = { wrapped.close() - openStreams.remove(wrapped) + removeOpenStream(wrapped) } override def read(): Int = wrapped.read() From d486d6015ae0129ce41e6683eae37243c843ba59 Mon Sep 17 00:00:00 2001 From: Bogdan Raducanu Date: Mon, 10 Apr 2017 14:38:33 +0200 Subject: [PATCH 2/2] openStreams.synchronized instead of object synchronized --- .../src/test/scala/org/apache/spark/DebugFilesystem.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/DebugFilesystem.scala b/core/src/test/scala/org/apache/spark/DebugFilesystem.scala index 8dd3a2d2cbe6c..91355f7362900 100644 --- a/core/src/test/scala/org/apache/spark/DebugFilesystem.scala +++ b/core/src/test/scala/org/apache/spark/DebugFilesystem.scala @@ -32,19 +32,19 @@ object DebugFilesystem extends Logging { // Stores the set of active streams and their creation sites. private val openStreams = mutable.Map.empty[FSDataInputStream, Throwable] - def addOpenStream(stream: FSDataInputStream): Unit = synchronized { + def addOpenStream(stream: FSDataInputStream): Unit = openStreams.synchronized { openStreams.put(stream, new Throwable()) } - def clearOpenStreams(): Unit = synchronized { + def clearOpenStreams(): Unit = openStreams.synchronized { openStreams.clear() } - def removeOpenStream(stream: FSDataInputStream): Unit = synchronized { + def removeOpenStream(stream: FSDataInputStream): Unit = openStreams.synchronized { openStreams.remove(stream) } - def assertNoOpenStreams(): Unit = synchronized { + def assertNoOpenStreams(): Unit = openStreams.synchronized { val numOpen = openStreams.values.size if (numOpen > 0) { for (exc <- openStreams.values) {