diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala index ff59789980ce..3a707495ff3f 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala @@ -232,7 +232,8 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message]( // 2. has a response to send def gotResponse = response.nonEmpty // 3. sent everything from the stream and the stream is finished - def streamFinished = executionObserver.getLastResponseIndex().exists(nextIndex > _) + def streamFinished = executionObserver.getLastResponseIndex().exists(nextIndex > _) || + executionObserver.isCleaned() // 4. time deadline or size limit reached def deadlineLimitReached = sentResponsesSize > maximumResponseSize || deadlineTimeNs < System.nanoTime() diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala index 9d0cc2128dd4..bcb665eb01ef 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala @@ -260,6 +260,11 @@ private[connect] class ExecuteResponseObserver[T <: Message](val executeHolder: finalProducedIndex.isDefined } + // Returns whether this observer has already been cleaned + def isCleaned(): Boolean = responseLock.synchronized { + completed() && responses.isEmpty + } + // For testing. private[connect] def undoCompletion(): Unit = responseLock.synchronized { finalProducedIndex = None