From 3db654fa36f89987075cf40aacd5adc568be2d45 Mon Sep 17 00:00:00 2001
From: Kousuke Saruta
Date: Wed, 23 Jul 2025 23:44:04 +0900
Subject: [PATCH 1/2] Change the definition of streamFinished to consider
 cases where responses are cleaned up before consumed

---
 .../sql/connect/execution/ExecuteGrpcResponseSender.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
index ff59789980ce..e58053e6d4b4 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
@@ -232,7 +232,8 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message](
       // 2. has a response to send
       def gotResponse = response.nonEmpty
       // 3. sent everything from the stream and the stream is finished
-      def streamFinished = executionObserver.getLastResponseIndex().exists(nextIndex > _)
+      def streamFinished = executionObserver.getLastResponseIndex().exists(nextIndex > _) ||
+        executionObserver.completed() && response.isEmpty
       // 4. time deadline or size limit reached
       def deadlineLimitReached =
         sentResponsesSize > maximumResponseSize || deadlineTimeNs < System.nanoTime()

From 381c905fb4d5aadf0246d38f34755d43be03fb70 Mon Sep 17 00:00:00 2001
From: Kousuke Saruta
Date: Fri, 25 Jul 2025 05:05:48 +0900
Subject: [PATCH 2/2] Update the definition of streamFinished

---
 .../sql/connect/execution/ExecuteGrpcResponseSender.scala | 2 +-
 .../sql/connect/execution/ExecuteResponseObserver.scala   | 5 +++++
 2 files changed, 6 insertions(+), 1 deletion(-)

diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
index e58053e6d4b4..3a707495ff3f 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
@@ -233,7 +233,7 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message](
       def gotResponse = response.nonEmpty
       // 3. sent everything from the stream and the stream is finished
       def streamFinished = executionObserver.getLastResponseIndex().exists(nextIndex > _) ||
-        executionObserver.completed() && response.isEmpty
+        executionObserver.isCleaned()
       // 4. time deadline or size limit reached
       def deadlineLimitReached =
         sentResponsesSize > maximumResponseSize || deadlineTimeNs < System.nanoTime()

diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala
index 9d0cc2128dd4..bcb665eb01ef 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala
@@ -260,6 +260,11 @@ private[connect] class ExecuteResponseObserver[T <: Message](val executeHolder:
     finalProducedIndex.isDefined
   }
 
+  // Returns if this observer has already been cleaned
+  def isCleaned(): Boolean = responseLock.synchronized {
+    completed() && responses.isEmpty
+  }
+
   // For testing.
   private[connect] def undoCompletion(): Unit = responseLock.synchronized {
     finalProducedIndex = None
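Below is a minimal, self-contained sketch (not the real Spark Connect classes) of why the extra isCleaned() disjunct matters: if the observer's buffered responses are cleaned up before the sender consumes them, the index-based check alone never becomes true and the sender would keep waiting. The names ToyObserver, streamFinished, and nextIndex are illustrative assumptions for this sketch, not Spark APIs.

// Toy model of the patched termination predicate, under the assumptions above.
object StreamFinishedSketch {

  // Toy stand-in for the response observer.
  final class ToyObserver {
    private var responses = Map.empty[Long, String]
    private var finalProducedIndex: Option[Long] = None

    def onNext(index: Long, response: String): Unit = synchronized {
      responses += index -> response
    }
    def onCompleted(lastIndex: Long): Unit = synchronized {
      finalProducedIndex = Some(lastIndex)
    }
    // Simulates cleanup releasing buffered responses before they were consumed.
    def cleanup(): Unit = synchronized { responses = Map.empty }

    def getLastResponseIndex(): Option[Long] = synchronized { finalProducedIndex }
    def completed(): Boolean = synchronized { finalProducedIndex.isDefined }
    def isCleaned(): Boolean = synchronized { completed() && responses.isEmpty }
  }

  // Mirrors the patched predicate: finished when the sender is past the last
  // produced index, or when the observer has been cleaned up.
  def streamFinished(observer: ToyObserver, nextIndex: Long): Boolean =
    observer.getLastResponseIndex().exists(nextIndex > _) || observer.isCleaned()

  def main(args: Array[String]): Unit = {
    val observer = new ToyObserver
    observer.onNext(1L, "batch-1")
    observer.onNext(2L, "batch-2")
    observer.onCompleted(lastIndex = 2L)
    observer.cleanup() // responses released before the sender consumed them

    // The sender is still at index 1, so the index-based check (1 > 2) is false,
    // but isCleaned() lets the predicate become true and the send loop terminate.
    println(streamFinished(observer, nextIndex = 1L)) // prints: true
  }
}

The second commit moves the check behind the observer's own lock as isCleaned(), so the "completed and nothing left to serve" decision is made on the observer's state rather than by combining completed() with the sender-local response variable.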