From c27e35eca5798d534f9b535c4e133d8c6e412709 Mon Sep 17 00:00:00 2001 From: Shanyu Zhao Date: Thu, 5 Sep 2019 17:43:22 -0700 Subject: [PATCH 1/6] SPARK-29003: Spark history server startup hang due to deadlock Signed-off-by: Shanyu Zhao --- .../spark/deploy/history/ApplicationHistoryProvider.scala | 2 ++ .../org/apache/spark/deploy/history/FsHistoryProvider.scala | 6 +++++- .../org/apache/spark/deploy/history/HistoryServer.scala | 1 + 3 files changed, 8 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala index f1c06205bf04c..ecc1d944bddaa 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala @@ -114,6 +114,8 @@ private[history] abstract class ApplicationHistoryProvider { */ def stop(): Unit = { } + def start(): Unit = { } + /** * Returns configuration data to be shown in the History Server home page. * diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 5f9b18ce01279..d86c829c3edbf 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -200,7 +200,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } } - val initThread = initialize() + var initThread:Thread = null private[history] def initialize(): Thread = { if (!isFsInSafeMode()) { @@ -384,6 +384,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) Map("Event log directory" -> logDir.toString) ++ safeMode } + override def start(): Unit = { + initThread = initialize() + } + override def stop(): Unit = { try { if (initThread != null && initThread.isAlive()) { diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index 878f0cb632c5a..661d6255f2e8a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -151,6 +151,7 @@ class HistoryServer( /** Bind to the HTTP server behind this web interface. */ override def bind() { super.bind() + provider.start() } /** Stop the server and close the file system. */ From 6fcf84c3ec52997b23ebe76f3adfc6341d653233 Mon Sep 17 00:00:00 2001 From: Shanyu Zhao Date: Mon, 9 Sep 2019 17:31:33 -0700 Subject: [PATCH 2/6] Addressing review feedback Signed-off-by: Shanyu Zhao --- .../org/apache/spark/deploy/history/FsHistoryProvider.scala | 2 +- .../scala/org/apache/spark/deploy/history/HistoryServer.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index d86c829c3edbf..51abac472c72a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -200,7 +200,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } } - var initThread:Thread = null + var initThread : Thread = null private[history] def initialize(): Thread = { if (!isFsInSafeMode()) { diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index 661d6255f2e8a..9beed89d8ed25 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -151,7 +151,6 @@ class HistoryServer( /** Bind to the HTTP server behind this web interface. */ override def bind() { super.bind() - provider.start() } /** Stop the server and close the file system. */ @@ -298,6 +297,7 @@ object HistoryServer extends Logging { val server = new HistoryServer(conf, provider, securityManager, port) server.bind() + provider.start() ShutdownHookManager.addShutdownHook { () => server.stop() } From c3b4411d8a99013d4ef9f7585dc478d5592bb2bb Mon Sep 17 00:00:00 2001 From: Shanyu Zhao Date: Mon, 9 Sep 2019 19:07:02 -0700 Subject: [PATCH 3/6] Addressing feedback Signed-off-by: Shanyu Zhao --- .../org/apache/spark/deploy/history/FsHistoryProvider.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 51abac472c72a..dce9581be2905 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -200,7 +200,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } } - var initThread : Thread = null + var initThread: Thread = null private[history] def initialize(): Thread = { if (!isFsInSafeMode()) { From 046dd767da7500f2fe57616fdcb6fd255801a924 Mon Sep 17 00:00:00 2001 From: Shanyu Zhao Date: Mon, 9 Sep 2019 22:03:58 -0700 Subject: [PATCH 4/6] Fix test failure Signed-off-by: Shanyu Zhao --- .../org/apache/spark/deploy/history/HistoryServerSuite.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala index dbc1938ed469a..71a127bd4b9f6 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala @@ -94,6 +94,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers server = new HistoryServer(conf, provider, securityManager, 18080) server.initialize() server.bind() + provider.start() port = server.boundPort } @@ -451,6 +452,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers server = new HistoryServer(myConf, provider, securityManager, 0) server.initialize() server.bind() + provider.start() val port = server.boundPort val metrics = server.cacheMetrics From e8d8024a969aba1d9895d3331fc71915a0250c66 Mon Sep 17 00:00:00 2001 From: Shanyu Zhao Date: Thu, 12 Sep 2019 10:31:04 -0700 Subject: [PATCH 5/6] Add comments to start() Signed-off-by: Shanyu Zhao --- .../spark/deploy/history/ApplicationHistoryProvider.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala index ecc1d944bddaa..97ffb690e34c7 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala @@ -114,6 +114,9 @@ private[history] abstract class ApplicationHistoryProvider { */ def stop(): Unit = { } + /** + * Called when the server is starting up. + */ def start(): Unit = { } /** From 75dc86d55e8d144a1ea8f911b39817a482fd033b Mon Sep 17 00:00:00 2001 From: Shanyu Zhao Date: Fri, 13 Sep 2019 15:25:41 -0700 Subject: [PATCH 6/6] Update comments for start() Signed-off-by: Shanyu Zhao --- .../spark/deploy/history/ApplicationHistoryProvider.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala index 97ffb690e34c7..472b52957ed7f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala @@ -115,7 +115,8 @@ private[history] abstract class ApplicationHistoryProvider { def stop(): Unit = { } /** - * Called when the server is starting up. + * Called when the server is starting up. Implement this function to init the provider and start + * background threads. With this function we can start provider later after it is created. */ def start(): Unit = { }