From 563ff3deb07dc7c955cfb7c85a715df279da3db1 Mon Sep 17 00:00:00 2001
From: Hyukjin Kwon <gurwls223@apache.org>
Date: Tue, 20 Feb 2024 10:49:52 +0900
Subject: [PATCH] Deflake "interrupt tag" at SparkSessionE2ESuite

---
 .../spark/sql/SparkSessionE2ESuite.scala      | 20 +++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionE2ESuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionE2ESuite.scala
index c76dc724828e..e4cbcf620d15 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionE2ESuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionE2ESuite.scala
@@ -128,9 +128,9 @@ class SparkSessionE2ESuite extends RemoteSparkSession {
       assert(spark.getTags() == Set("one"))
       try {
         spark
-          .range(10)
+          .range(start = 0, end = 10, step = 1, numPartitions = 2)
           .map(n => {
-            Thread.sleep(30000); n
+            Thread.sleep(40000); n
           })
           .collect()
       } finally {
@@ -146,9 +146,9 @@ class SparkSessionE2ESuite extends RemoteSparkSession {
       assert(spark.getTags() == Set("one", "two"))
       try {
         spark
-          .range(10)
+          .range(start = 0, end = 10, step = 1, numPartitions = 2)
           .map(n => {
-            Thread.sleep(30000); n
+            Thread.sleep(40000); n
          })
           .collect()
       } finally {
@@ -164,9 +164,9 @@ class SparkSessionE2ESuite extends RemoteSparkSession {
       assert(spark.getTags() == Set("two"))
       try {
         spark
-          .range(10)
+          .range(start = 0, end = 10, step = 1, numPartitions = 2)
           .map(n => {
-            Thread.sleep(30000); n
+            Thread.sleep(40000); n
           })
           .collect()
       } finally {
@@ -183,9 +183,9 @@ class SparkSessionE2ESuite extends RemoteSparkSession {
       assert(spark.getTags() == Set("one"))
       try {
         spark
-          .range(10)
+          .range(start = 0, end = 10, step = 1, numPartitions = 2)
           .map(n => {
-            Thread.sleep(30000); n
+            Thread.sleep(40000); n
           })
           .collect()
       } finally {
@@ -196,7 +196,7 @@
     // q2 and q3 should be cancelled
     interrupted.clear()
-    eventually(timeout(20.seconds), interval(1.seconds)) {
+    eventually(timeout(30.seconds), interval(1.seconds)) {
       val ids = spark.interruptTag("two")
       interrupted ++= ids
       assert(interrupted.length == 2, s"Interrupted operations: $interrupted.")
     }
@@ -213,7 +213,7 @@
     // q1 and q4 should be cancelled
     interrupted.clear()
-    eventually(timeout(20.seconds), interval(1.seconds)) {
+    eventually(timeout(30.seconds), interval(1.seconds)) {
       val ids = spark.interruptTag("one")
       interrupted ++= ids
       assert(interrupted.length == 2, s"Interrupted operations: $interrupted.")
     }