From 3f96145c5e867df1c1d68d841378b45b1c2e6842 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Tue, 3 Nov 2020 16:34:10 -0800 Subject: [PATCH 01/16] Block TaskCompletion event until the thread ends. --- .../sql/execution/python/EvalPythonExec.scala | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala index 89c7716f7c1b..468a4c685572 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.python import java.io.File +import java.util.concurrent.atomic.AtomicReference import scala.collection.mutable.ArrayBuffer @@ -148,8 +149,21 @@ trait EvalPythonExec extends UnaryExecNode { */ class ContextAwareIterator[IN](iter: Iterator[IN], context: TaskContext) extends Iterator[IN] { - override def hasNext: Boolean = + val thread = new AtomicReference[Thread]() + + context.addTaskCompletionListener[Unit] { _ => + val thread = this.thread.get() + if (thread != null && thread != Thread.currentThread()) { + while (thread.isAlive) { + context.wait(10) + } + } + } + + override def hasNext: Boolean = { + thread.set(Thread.currentThread()) !context.isCompleted() && !context.isInterrupted() && iter.hasNext + } override def next(): IN = iter.next() } From 2a0d2afcb6456308dfa1105828ecd4d0f95a3a8e Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Tue, 3 Nov 2020 18:06:54 -0800 Subject: [PATCH 02/16] Fix. 
--- .../spark/sql/execution/python/EvalPythonExec.scala | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala index 468a4c685572..28fe1059f3a0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala @@ -152,8 +152,14 @@ class ContextAwareIterator[IN](iter: Iterator[IN], context: TaskContext) extends val thread = new AtomicReference[Thread]() context.addTaskCompletionListener[Unit] { _ => - val thread = this.thread.get() - if (thread != null && thread != Thread.currentThread()) { + var thread: Thread = null + while (thread == null) { + thread = this.thread.get() + if (thread == null) { + context.wait(10) + } + } + if (thread != Thread.currentThread()) { while (thread.isAlive) { context.wait(10) } From 6e5be90afb010ee03359adc0495f0bb0ada9a4f4 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Tue, 3 Nov 2020 19:05:43 -0800 Subject: [PATCH 03/16] Fix. 
--- .../spark/sql/execution/python/EvalPythonExec.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala index 28fe1059f3a0..2b9144334640 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala @@ -152,14 +152,14 @@ class ContextAwareIterator[IN](iter: Iterator[IN], context: TaskContext) extends val thread = new AtomicReference[Thread]() context.addTaskCompletionListener[Unit] { _ => - var thread: Thread = null - while (thread == null) { + var thread = this.thread.get() + var attempt = 3 + while (thread == null && attempt > 0) { + context.wait(10) thread = this.thread.get() - if (thread == null) { - context.wait(10) - } + attempt -= 1 } - if (thread != Thread.currentThread()) { + if (thread != null && thread != Thread.currentThread()) { while (thread.isAlive) { context.wait(10) } From 895d91d5531b4898e82218e889030a81455bf870 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Tue, 3 Nov 2020 19:39:06 -0800 Subject: [PATCH 04/16] Comments. 
--- .../apache/spark/sql/execution/python/EvalPythonExec.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala index 2b9144334640..ad0bf460fdaf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala @@ -153,13 +153,17 @@ class ContextAwareIterator[IN](iter: Iterator[IN], context: TaskContext) extends context.addTaskCompletionListener[Unit] { _ => var thread = this.thread.get() + var attempt = 3 while (thread == null && attempt > 0) { + // Wait for a while since the writer thread might not reach to consuming the iterator yet. context.wait(10) thread = this.thread.get() attempt -= 1 } + if (thread != null && thread != Thread.currentThread()) { + // Wait until the writer thread ends. while (thread.isAlive) { context.wait(10) } From 997e1aac7548518b39bf0a46546cdc4813544a65 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Tue, 3 Nov 2020 21:14:53 -0800 Subject: [PATCH 05/16] Fix. 
--- .../spark/sql/execution/python/EvalPythonExec.scala | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala index ad0bf460fdaf..76e32f5cc321 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution.python import java.io.File -import java.util.concurrent.atomic.AtomicReference +import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference} import scala.collection.mutable.ArrayBuffer @@ -151,15 +151,19 @@ class ContextAwareIterator[IN](iter: Iterator[IN], context: TaskContext) extends val thread = new AtomicReference[Thread]() + var failed = new AtomicBoolean(false) + + context.addTaskFailureListener { (_, _) => + failed.set(true) + } + context.addTaskCompletionListener[Unit] { _ => var thread = this.thread.get() - var attempt = 3 - while (thread == null && attempt > 0) { + while (thread == null && !failed.get()) { // Wait for a while since the writer thread might not reach to consuming the iterator yet. context.wait(10) thread = this.thread.get() - attempt -= 1 } if (thread != null && thread != Thread.currentThread()) { From aec13c22754039bc0f93427726717cd6de400529 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Wed, 4 Nov 2020 11:47:37 -0800 Subject: [PATCH 06/16] Fix. 
--- .../sql/execution/python/EvalPythonExec.scala | 32 ++++++++++--------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala index 76e32f5cc321..4b46fc487b50 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala @@ -151,25 +151,27 @@ class ContextAwareIterator[IN](iter: Iterator[IN], context: TaskContext) extends val thread = new AtomicReference[Thread]() - var failed = new AtomicBoolean(false) + if (iter.hasNext) { + val failed = new AtomicBoolean(false) - context.addTaskFailureListener { (_, _) => - failed.set(true) - } - - context.addTaskCompletionListener[Unit] { _ => - var thread = this.thread.get() - - while (thread == null && !failed.get()) { - // Wait for a while since the writer thread might not reach to consuming the iterator yet. - context.wait(10) - thread = this.thread.get() + context.addTaskFailureListener { (_, _) => + failed.set(true) } - if (thread != null && thread != Thread.currentThread()) { - // Wait until the writer thread ends. - while (thread.isAlive) { + context.addTaskCompletionListener[Unit] { _ => + var thread = this.thread.get() + + while (thread == null && !failed.get()) { + // Wait for a while since the writer thread might not reach to consuming the iterator yet. context.wait(10) + thread = this.thread.get() + } + + if (thread != null && thread != Thread.currentThread()) { + // Wait until the writer thread ends. + while (thread.isAlive) { + context.wait(10) + } } } } From 7db9bb862f3e4d31f8327af0b761ffdc60374c64 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Wed, 4 Nov 2020 13:44:15 -0800 Subject: [PATCH 07/16] Run tests. 
From e2580163e0082490ea13eb365314bd8904a36f04 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Wed, 4 Nov 2020 15:22:04 -0800 Subject: [PATCH 08/16] Run tests. From 0947e2915f6773cb3571809b218cdccefd3c3b32 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Thu, 5 Nov 2020 13:17:37 -0800 Subject: [PATCH 09/16] Revert changes. --- .../sql/execution/python/EvalPythonExec.scala | 32 +------------------ 1 file changed, 1 insertion(+), 31 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala index 4b46fc487b50..89c7716f7c1b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.execution.python import java.io.File -import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference} import scala.collection.mutable.ArrayBuffer @@ -149,37 +148,8 @@ trait EvalPythonExec extends UnaryExecNode { */ class ContextAwareIterator[IN](iter: Iterator[IN], context: TaskContext) extends Iterator[IN] { - val thread = new AtomicReference[Thread]() - - if (iter.hasNext) { - val failed = new AtomicBoolean(false) - - context.addTaskFailureListener { (_, _) => - failed.set(true) - } - - context.addTaskCompletionListener[Unit] { _ => - var thread = this.thread.get() - - while (thread == null && !failed.get()) { - // Wait for a while since the writer thread might not reach to consuming the iterator yet. - context.wait(10) - thread = this.thread.get() - } - - if (thread != null && thread != Thread.currentThread()) { - // Wait until the writer thread ends. 
- while (thread.isAlive) { - context.wait(10) - } - } - } - } - - override def hasNext: Boolean = { - thread.set(Thread.currentThread()) + override def hasNext: Boolean = !context.isCompleted() && !context.isInterrupted() && iter.hasNext - } override def next(): IN = iter.next() } From 6b07f8d067aeba17d92282df06c6bcb6f1945805 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Thu, 5 Nov 2020 13:18:00 -0800 Subject: [PATCH 10/16] Revert "Revert "[SPARK-33277][PYSPARK][SQL] Use ContextAwareIterator to stop consuming after the task ends"" This reverts commit d530ed0ea8bdba09fba6dcd51f8e4f7745781c2e. --- python/pyspark/sql/tests/test_pandas_map.py | 22 +++++++++++++++++++ .../sql/tests/test_pandas_udf_scalar.py | 19 ++++++++++++++++ python/pyspark/sql/tests/test_udf.py | 20 +++++++++++++++++ .../sql/execution/python/EvalPythonExec.scala | 18 ++++++++++++++- .../execution/python/MapInPandasExec.scala | 7 +++--- 5 files changed, 82 insertions(+), 4 deletions(-) diff --git a/python/pyspark/sql/tests/test_pandas_map.py b/python/pyspark/sql/tests/test_pandas_map.py index 3ca437f75fc2..2cad30c7294d 100644 --- a/python/pyspark/sql/tests/test_pandas_map.py +++ b/python/pyspark/sql/tests/test_pandas_map.py @@ -15,9 +15,12 @@ # limitations under the License. 
# import os +import shutil +import tempfile import time import unittest +from pyspark.sql import Row from pyspark.testing.sqlutils import ReusedSQLTestCase, have_pandas, have_pyarrow, \ pandas_requirement_message, pyarrow_requirement_message @@ -112,6 +115,25 @@ def func(iterator): expected = df.collect() self.assertEquals(actual, expected) + # SPARK-33277 + def test_map_in_pandas_with_column_vector(self): + path = tempfile.mkdtemp() + shutil.rmtree(path) + + try: + self.spark.range(0, 200000, 1, 1).write.parquet(path) + + def func(iterator): + for pdf in iterator: + yield pd.DataFrame({'id': [0] * len(pdf)}) + + for offheap in ["true", "false"]: + with self.sql_conf({"spark.sql.columnVector.offheap.enabled": offheap}): + self.assertEquals( + self.spark.read.parquet(path).mapInPandas(func, 'id long').head(), Row(0)) + finally: + shutil.rmtree(path) + if __name__ == "__main__": from pyspark.sql.tests.test_pandas_map import * # noqa: F401 diff --git a/python/pyspark/sql/tests/test_pandas_udf_scalar.py b/python/pyspark/sql/tests/test_pandas_udf_scalar.py index 6d325c9085ce..c2c8f6f697c4 100644 --- a/python/pyspark/sql/tests/test_pandas_udf_scalar.py +++ b/python/pyspark/sql/tests/test_pandas_udf_scalar.py @@ -1137,6 +1137,25 @@ def test_datasource_with_udf(self): finally: shutil.rmtree(path) + # SPARK-33277 + def test_pandas_udf_with_column_vector(self): + path = tempfile.mkdtemp() + shutil.rmtree(path) + + try: + self.spark.range(0, 200000, 1, 1).write.parquet(path) + + @pandas_udf(LongType()) + def udf(x): + return pd.Series([0] * len(x)) + + for offheap in ["true", "false"]: + with self.sql_conf({"spark.sql.columnVector.offheap.enabled": offheap}): + self.assertEquals( + self.spark.read.parquet(path).select(udf('id')).head(), Row(0)) + finally: + shutil.rmtree(path) + if __name__ == "__main__": from pyspark.sql.tests.test_pandas_udf_scalar import * # noqa: F401 diff --git a/python/pyspark/sql/tests/test_udf.py b/python/pyspark/sql/tests/test_udf.py index 
a7dcbfd32ac1..c2e95fd41c5b 100644 --- a/python/pyspark/sql/tests/test_udf.py +++ b/python/pyspark/sql/tests/test_udf.py @@ -674,6 +674,26 @@ def test_udf_cache(self): self.assertEqual(df.select(udf(func)("id"))._jdf.queryExecution() .withCachedData().getClass().getSimpleName(), 'InMemoryRelation') + # SPARK-33277 + def test_udf_with_column_vector(self): + path = tempfile.mkdtemp() + shutil.rmtree(path) + + try: + self.spark.range(0, 100000, 1, 1).write.parquet(path) + + def f(x): + return 0 + + fUdf = udf(f, LongType()) + + for offheap in ["true", "false"]: + with self.sql_conf({"spark.sql.columnVector.offheap.enabled": offheap}): + self.assertEquals( + self.spark.read.parquet(path).select(fUdf('id')).head(), Row(0)) + finally: + shutil.rmtree(path) + class UDFInitializationTests(unittest.TestCase): def tearDown(self): diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala index 298d63478b63..89c7716f7c1b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala @@ -89,6 +89,7 @@ trait EvalPythonExec extends UnaryExecNode { inputRDD.mapPartitions { iter => val context = TaskContext.get() + val contextAwareIterator = new ContextAwareIterator(iter, context) // The queue used to buffer input rows so we can drain it to // combine input with output from Python. @@ -120,7 +121,7 @@ trait EvalPythonExec extends UnaryExecNode { }.toSeq) // Add rows to queue to join later with the result. - val projectedRowIter = iter.map { inputRow => + val projectedRowIter = contextAwareIterator.map { inputRow => queue.add(inputRow.asInstanceOf[UnsafeRow]) projection(inputRow) } @@ -137,3 +138,18 @@ trait EvalPythonExec extends UnaryExecNode { } } } + +/** + * A TaskContext aware iterator. 
+ * + * As the Python evaluation consumes the parent iterator in a separate thread, + * it could consume more data from the parent even after the task ends and the parent is closed. + * Thus, we should use ContextAwareIterator to stop consuming after the task ends. + */ +class ContextAwareIterator[IN](iter: Iterator[IN], context: TaskContext) extends Iterator[IN] { + + override def hasNext: Boolean = + !context.isCompleted() && !context.isInterrupted() && iter.hasNext + + override def next(): IN = iter.next() +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInPandasExec.scala index 2bb808119c0a..7fc18f885a2d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInPandasExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInPandasExec.scala @@ -61,16 +61,17 @@ case class MapInPandasExec( val pythonRunnerConf = ArrowUtils.getPythonRunnerConfMap(conf) val outputTypes = child.schema + val context = TaskContext.get() + val contextAwareIterator = new ContextAwareIterator(inputIter, context) + // Here we wrap it via another row so that Python sides understand it // as a DataFrame. - val wrappedIter = inputIter.map(InternalRow(_)) + val wrappedIter = contextAwareIterator.map(InternalRow(_)) // DO NOT use iter.grouped(). See BatchIterator. val batchIter = if (batchSize > 0) new BatchIterator(wrappedIter, batchSize) else Iterator(wrappedIter) - val context = TaskContext.get() - val columnarBatchIter = new ArrowPythonRunner( chainedFunc, PythonEvalType.SQL_MAP_PANDAS_ITER_UDF, From 8b116471213b70a0eef49e9d58374042a84be6aa Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Thu, 5 Nov 2020 13:18:15 -0800 Subject: [PATCH 11/16] Revert "Revert changes." This reverts commit 0947e2915f6773cb3571809b218cdccefd3c3b32. 
--- .../sql/execution/python/EvalPythonExec.scala | 32 ++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala index 89c7716f7c1b..4b46fc487b50 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.python import java.io.File +import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference} import scala.collection.mutable.ArrayBuffer @@ -148,8 +149,37 @@ trait EvalPythonExec extends UnaryExecNode { */ class ContextAwareIterator[IN](iter: Iterator[IN], context: TaskContext) extends Iterator[IN] { - override def hasNext: Boolean = + val thread = new AtomicReference[Thread]() + + if (iter.hasNext) { + val failed = new AtomicBoolean(false) + + context.addTaskFailureListener { (_, _) => + failed.set(true) + } + + context.addTaskCompletionListener[Unit] { _ => + var thread = this.thread.get() + + while (thread == null && !failed.get()) { + // Wait for a while since the writer thread might not reach to consuming the iterator yet. + context.wait(10) + thread = this.thread.get() + } + + if (thread != null && thread != Thread.currentThread()) { + // Wait until the writer thread ends. + while (thread.isAlive) { + context.wait(10) + } + } + } + } + + override def hasNext: Boolean = { + thread.set(Thread.currentThread()) !context.isCompleted() && !context.isInterrupted() && iter.hasNext + } override def next(): IN = iter.next() } From 1ba0b650e3d8f56108abd67d8399840c76b19b37 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Thu, 5 Nov 2020 14:51:19 -0800 Subject: [PATCH 12/16] Run tests. 
From 87d8854814ddc5fa2b310a802be00ce0cae11075 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Thu, 5 Nov 2020 16:34:23 -0800 Subject: [PATCH 13/16] Run tests. From e2cc2279b9677f4a17d701fca19f1a1b5a9e8037 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Tue, 24 Nov 2020 17:34:31 -0800 Subject: [PATCH 14/16] Add private. --- .../org/apache/spark/sql/execution/python/EvalPythonExec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala index a886a95e7299..2e209973bf82 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala @@ -149,7 +149,7 @@ trait EvalPythonExec extends UnaryExecNode { */ class ContextAwareIterator[IN](iter: Iterator[IN], context: TaskContext) extends Iterator[IN] { - val thread = new AtomicReference[Thread]() + private val thread = new AtomicReference[Thread]() if (iter.hasNext) { val failed = new AtomicBoolean(false) From 429c159ff937afbb785a6ca28c9eca848364149d Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Tue, 1 Dec 2020 15:00:30 -0800 Subject: [PATCH 15/16] Add more comments. 
--- .../spark/sql/execution/python/EvalPythonExec.scala | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala index 2e209973bf82..e3b2c55c4a6a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala @@ -161,15 +161,21 @@ class ContextAwareIterator[IN](iter: Iterator[IN], context: TaskContext) extends context.addTaskCompletionListener[Unit] { _ => var thread = this.thread.get() + // Wait for a while since the writer thread might not be consuming the iterator yet. while (thread == null && !failed.get()) { - // Wait for a while since the writer thread might not reach to consuming the iterator yet. + // Use `context.wait()` instead of `Thread.sleep()` here since the task completion listener + // works under `synchronized(context)`. We may need to improve this in the future. + // It's a bad idea to hold an implicit lock when calling user's listener because it's + // pretty easy to cause a surprising deadlock. context.wait(10) + thread = this.thread.get() } if (thread != null && thread != Thread.currentThread()) { // Wait until the writer thread ends. while (thread.isAlive) { + // Use `context.wait()` instead of `Thread.sleep()` for the same reason as above. context.wait(10) } } From 46613dda79fad1173c2f6305f8fefd5fa44b803b Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Tue, 1 Dec 2020 16:28:08 -0800 Subject: [PATCH 16/16] Rerun tests.