From c6a3a7ea18ade52cbd15dbf690623aa661a91966 Mon Sep 17 00:00:00 2001 From: Anish Shrigondekar Date: Sat, 30 Dec 2023 14:44:51 +0530 Subject: [PATCH 1/6] [SPARK-46547] Fix deadlock between maintenance thread and streaming aggregation operator --- .../execution/streaming/state/RocksDB.scala | 1 + .../streaming/statefulOperators.scala | 34 +++++++++++-------- 2 files changed, 20 insertions(+), 15 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index 101a9e6b9199..0284d4c9d303 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -452,6 +452,7 @@ class RocksDB( * Drop uncommitted changes, and roll back to previous version. */ def rollback(): Unit = { + acquire() numKeysOnWritingVersion = numKeysOnLoadedVersion loadedVersion = -1L changelogWriter.foreach(_.abort()) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala index 80f5b3532c5e..14209d1461f2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala @@ -434,22 +434,26 @@ case class StateStoreRestoreExec( numColsPrefixKey = 0, session.sessionState, Some(session.streams.stateStoreCoordinator)) { case (store, iter) => - val hasInput = iter.hasNext - if (!hasInput && keyExpressions.isEmpty) { - // If our `keyExpressions` are empty, we're getting a global aggregation. In that case - // the `HashAggregateExec` will output a 0 value for the partial merge. 
We need to - // restore the value, so that we don't overwrite our state with a 0 value, but rather - // merge the 0 with existing state. - store.iterator().map(_.value) - } else { - iter.flatMap { row => - val key = stateManager.getKey(row.asInstanceOf[UnsafeRow]) - val restoredRow = stateManager.get(store, key) - val outputRows = Option(restoredRow).toSeq :+ row - numOutputRows += outputRows.size - outputRows - } + val hasInput = iter.hasNext + val result = if (!hasInput && keyExpressions.isEmpty) { + // If our `keyExpressions` are empty, we're getting a global aggregation. In that case + // the `HashAggregateExec` will output a 0 value for the partial merge. We need to + // restore the value, so that we don't overwrite our state with a 0 value, but rather + // merge the 0 with existing state. + store.iterator().map(_.value) + } else { + iter.flatMap { row => + val key = stateManager.getKey(row.asInstanceOf[UnsafeRow]) + val restoredRow = stateManager.get(store, key) + val outputRows = Option(restoredRow).toSeq :+ row + numOutputRows += outputRows.size + outputRows } + } + // SPARK-46547 - Release any locks/resources if required, to prevent + // deadlocks with the maintenance thread. 
+ store.abort() + result } } From 5daa8067a134d11c37a86dab555888fe858a130c Mon Sep 17 00:00:00 2001 From: Anish Shrigondekar Date: Mon, 1 Jan 2024 13:07:49 +0530 Subject: [PATCH 2/6] Use light weight abort method within completionIterator --- .../state/HDFSBackedStateStoreProvider.scala | 4 ++-- .../sql/execution/streaming/state/RocksDB.scala | 16 +++++++++------- .../state/RocksDBStateStoreProvider.scala | 4 ++-- .../execution/streaming/state/StateStore.scala | 9 +++++---- .../execution/streaming/statefulOperators.scala | 10 ++++++---- .../streaming/state/MemoryStateStore.scala | 2 +- 6 files changed, 25 insertions(+), 20 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala index 504c56ae7082..a29f90106ad4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala @@ -84,7 +84,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with map.iterator() } - override def abort(): Unit = {} + override def abort(releaseOnly: Boolean = false): Unit = {} override def toString(): String = { s"HDFSReadStateStore[id=(op=${id.operatorId},part=${id.partitionId}),dir=$baseDir]" @@ -149,7 +149,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with } /** Abort all the updates made on this store. This store will not be usable any more. */ - override def abort(): Unit = { + override def abort(releaseOnly: Boolean = false): Unit = { // This if statement is to ensure that files are deleted only once: if either commit or abort // is called before, it will be no-op. 
if (state == UPDATING) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index 0284d4c9d303..dac7a9c57b76 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -451,15 +451,17 @@ class RocksDB( /** * Drop uncommitted changes, and roll back to previous version. */ - def rollback(): Unit = { + def rollback(releaseOnly: Boolean = false): Unit = { acquire() - numKeysOnWritingVersion = numKeysOnLoadedVersion - loadedVersion = -1L - changelogWriter.foreach(_.abort()) - // Make sure changelogWriter gets recreated next time. - changelogWriter = None + if (!releaseOnly) { + numKeysOnWritingVersion = numKeysOnLoadedVersion + loadedVersion = -1L + changelogWriter.foreach(_.abort()) + // Make sure changelogWriter gets recreated next time. 
+ changelogWriter = None + logInfo(s"Rolled back to $loadedVersion") + } release() - logInfo(s"Rolled back to $loadedVersion") } def doMaintenance(): Unit = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala index 9552e2c81bb1..c67810474bc1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala @@ -102,10 +102,10 @@ private[sql] class RocksDBStateStoreProvider } } - override def abort(): Unit = { + override def abort(releaseOnly: Boolean = false): Unit = { verify(state == UPDATING || state == ABORTED, "Cannot abort after already committed") logInfo(s"Aborting ${version + 1} for $id") - rocksDB.rollback() + rocksDB.rollback(releaseOnly) state = ABORTED } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala index 6a3a30c7efbc..1d2c6b69eb21 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala @@ -84,10 +84,11 @@ trait ReadStateStore { /** * Clean up the resource. - * + * @param releaseOnly - if true, only release the instance lock and do not run the full abort + * sequence. * The method name is to respect backward compatibility on [[StateStore]]. */ - def abort(): Unit + def abort(releaseOnly: Boolean = false): Unit } /** @@ -124,7 +125,7 @@ trait StateStore extends ReadStateStore { * Abort all the updates that have been made to the store. 
Implementations should ensure that * no more updates (puts, removes) can be after an abort in order to avoid incorrect usage. */ - override def abort(): Unit + override def abort(releaseOnly: Boolean = false): Unit /** * Return an iterator containing all the key-value pairs in the StateStore. Implementations must @@ -155,7 +156,7 @@ class WrappedReadStateStore(store: StateStore) extends ReadStateStore { override def iterator(): Iterator[UnsafeRowPair] = store.iterator() - override def abort(): Unit = store.abort() + override def abort(releaseOnly: Boolean = false): Unit = store.abort(releaseOnly) override def prefixScan(prefixKey: UnsafeRow): Iterator[UnsafeRowPair] = store.prefixScan(prefixKey) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala index 14209d1461f2..76773f8c7b43 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala @@ -450,10 +450,12 @@ case class StateStoreRestoreExec( outputRows } } - // SPARK-46547 - Release any locks/resources if required, to prevent - // deadlocks with the maintenance thread. - store.abort() - result + + CompletionIterator[InternalRow, Iterator[InternalRow]](result, { + // SPARK-46547 - Release the DB instance lock if required, to prevent + // deadlocks with the maintenance thread. 
+ store.abort(releaseOnly = true) + }) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MemoryStateStore.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MemoryStateStore.scala index 4f11f1edd7e0..e5849e967822 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MemoryStateStore.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MemoryStateStore.scala @@ -37,7 +37,7 @@ class MemoryStateStore extends StateStore() { override def commit(): Long = version + 1 - override def abort(): Unit = {} + override def abort(releaseOnly: Boolean = false): Unit = {} override def id: StateStoreId = null From 8bbaf8f774c930acd201f6057adf38fcc6501721 Mon Sep 17 00:00:00 2001 From: Anish Shrigondekar Date: Tue, 9 Jan 2024 14:47:01 +0530 Subject: [PATCH 3/6] Revert "Use light weight abort method within completionIterator" This reverts commit 5daa8067a134d11c37a86dab555888fe858a130c. 
--- .../state/HDFSBackedStateStoreProvider.scala | 4 ++-- .../sql/execution/streaming/state/RocksDB.scala | 16 +++++++--------- .../state/RocksDBStateStoreProvider.scala | 4 ++-- .../execution/streaming/state/StateStore.scala | 9 ++++----- .../execution/streaming/statefulOperators.scala | 10 ++++------ .../streaming/state/MemoryStateStore.scala | 2 +- 6 files changed, 20 insertions(+), 25 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala index a29f90106ad4..504c56ae7082 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala @@ -84,7 +84,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with map.iterator() } - override def abort(releaseOnly: Boolean = false): Unit = {} + override def abort(): Unit = {} override def toString(): String = { s"HDFSReadStateStore[id=(op=${id.operatorId},part=${id.partitionId}),dir=$baseDir]" @@ -149,7 +149,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with } /** Abort all the updates made on this store. This store will not be usable any more. */ - override def abort(releaseOnly: Boolean = false): Unit = { + override def abort(): Unit = { // This if statement is to ensure that files are deleted only once: if either commit or abort // is called before, it will be no-op. 
if (state == UPDATING) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index dac7a9c57b76..0284d4c9d303 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -451,17 +451,15 @@ class RocksDB( /** * Drop uncommitted changes, and roll back to previous version. */ - def rollback(releaseOnly: Boolean = false): Unit = { + def rollback(): Unit = { acquire() - if (!releaseOnly) { - numKeysOnWritingVersion = numKeysOnLoadedVersion - loadedVersion = -1L - changelogWriter.foreach(_.abort()) - // Make sure changelogWriter gets recreated next time. - changelogWriter = None - logInfo(s"Rolled back to $loadedVersion") - } + numKeysOnWritingVersion = numKeysOnLoadedVersion + loadedVersion = -1L + changelogWriter.foreach(_.abort()) + // Make sure changelogWriter gets recreated next time. 
+ changelogWriter = None release() + logInfo(s"Rolled back to $loadedVersion") } def doMaintenance(): Unit = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala index c67810474bc1..9552e2c81bb1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala @@ -102,10 +102,10 @@ private[sql] class RocksDBStateStoreProvider } } - override def abort(releaseOnly: Boolean = false): Unit = { + override def abort(): Unit = { verify(state == UPDATING || state == ABORTED, "Cannot abort after already committed") logInfo(s"Aborting ${version + 1} for $id") - rocksDB.rollback(releaseOnly) + rocksDB.rollback() state = ABORTED } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala index 1d2c6b69eb21..6a3a30c7efbc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala @@ -84,11 +84,10 @@ trait ReadStateStore { /** * Clean up the resource. - * @param releaseOnly - if true, only release the instance lock and do not run the full abort - * sequence. + * * The method name is to respect backward compatibility on [[StateStore]]. */ - def abort(releaseOnly: Boolean = false): Unit + def abort(): Unit } /** @@ -125,7 +124,7 @@ trait StateStore extends ReadStateStore { * Abort all the updates that have been made to the store. Implementations should ensure that * no more updates (puts, removes) can be after an abort in order to avoid incorrect usage. 
*/ - override def abort(releaseOnly: Boolean = false): Unit + override def abort(): Unit /** * Return an iterator containing all the key-value pairs in the StateStore. Implementations must @@ -156,7 +155,7 @@ class WrappedReadStateStore(store: StateStore) extends ReadStateStore { override def iterator(): Iterator[UnsafeRowPair] = store.iterator() - override def abort(releaseOnly: Boolean = false): Unit = store.abort(releaseOnly) + override def abort(): Unit = store.abort() override def prefixScan(prefixKey: UnsafeRow): Iterator[UnsafeRowPair] = store.prefixScan(prefixKey) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala index 76773f8c7b43..14209d1461f2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala @@ -450,12 +450,10 @@ case class StateStoreRestoreExec( outputRows } } - - CompletionIterator[InternalRow, Iterator[InternalRow]](result, { - // SPARK-46547 - Release the DB instance lock if required, to prevent - // deadlocks with the maintenance thread. - store.abort(releaseOnly = true) - }) + // SPARK-46547 - Release any locks/resources if required, to prevent + // deadlocks with the maintenance thread. 
+ store.abort() + result } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MemoryStateStore.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MemoryStateStore.scala index e5849e967822..4f11f1edd7e0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MemoryStateStore.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MemoryStateStore.scala @@ -37,7 +37,7 @@ class MemoryStateStore extends StateStore() { override def commit(): Long = version + 1 - override def abort(releaseOnly: Boolean = false): Unit = {} + override def abort(): Unit = {} override def id: StateStoreId = null From 2acba495a92abcf182c16c4512e0fd3fe722da0e Mon Sep 17 00:00:00 2001 From: Anish Shrigondekar Date: Tue, 9 Jan 2024 14:54:31 +0530 Subject: [PATCH 4/6] Change to address Jungtaek's comments --- .../streaming/state/RocksDBStateStoreProvider.scala | 9 ++++++++- .../sql/execution/streaming/statefulOperators.scala | 6 +----- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala index 9552e2c81bb1..8d5eb23283a1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.streaming.state import java.io._ +import scala.util.control.NonFatal + import org.apache.hadoop.conf.Configuration import org.apache.spark.{SparkConf, SparkEnv} @@ -233,7 +235,12 @@ private[sql] class RocksDBStateStoreProvider } override def doMaintenance(): Unit = { - rocksDB.doMaintenance() + try { + rocksDB.doMaintenance() + } catch { + case NonFatal(ex) => + 
logWarning(s"Error performing maintenance operations with exception=$ex") + } } override def close(): Unit = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala index 14209d1461f2..8cb99a162ab2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala @@ -435,7 +435,7 @@ case class StateStoreRestoreExec( session.sessionState, Some(session.streams.stateStoreCoordinator)) { case (store, iter) => val hasInput = iter.hasNext - val result = if (!hasInput && keyExpressions.isEmpty) { + if (!hasInput && keyExpressions.isEmpty) { // If our `keyExpressions` are empty, we're getting a global aggregation. In that case // the `HashAggregateExec` will output a 0 value for the partial merge. We need to // restore the value, so that we don't overwrite our state with a 0 value, but rather @@ -450,10 +450,6 @@ case class StateStoreRestoreExec( outputRows } } - // SPARK-46547 - Release any locks/resources if required, to prevent - // deadlocks with the maintenance thread. 
- store.abort() - result } } From 4f676f445b8bc080a208818b02c2448405369cbc Mon Sep 17 00:00:00 2001 From: Anish Shrigondekar Date: Wed, 10 Jan 2024 11:27:29 +0530 Subject: [PATCH 5/6] Change to address Jungtaek's comments --- .../execution/streaming/state/RocksDB.scala | 1 - .../streaming/statefulOperators.scala | 30 +++++++++---------- 2 files changed, 15 insertions(+), 16 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index 0284d4c9d303..101a9e6b9199 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -452,7 +452,6 @@ class RocksDB( * Drop uncommitted changes, and roll back to previous version. */ def rollback(): Unit = { - acquire() numKeysOnWritingVersion = numKeysOnLoadedVersion loadedVersion = -1L changelogWriter.foreach(_.abort()) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala index 8cb99a162ab2..80f5b3532c5e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala @@ -434,22 +434,22 @@ case class StateStoreRestoreExec( numColsPrefixKey = 0, session.sessionState, Some(session.streams.stateStoreCoordinator)) { case (store, iter) => - val hasInput = iter.hasNext - if (!hasInput && keyExpressions.isEmpty) { - // If our `keyExpressions` are empty, we're getting a global aggregation. In that case - // the `HashAggregateExec` will output a 0 value for the partial merge. 
We need to - // restore the value, so that we don't overwrite our state with a 0 value, but rather - // merge the 0 with existing state. - store.iterator().map(_.value) - } else { - iter.flatMap { row => - val key = stateManager.getKey(row.asInstanceOf[UnsafeRow]) - val restoredRow = stateManager.get(store, key) - val outputRows = Option(restoredRow).toSeq :+ row - numOutputRows += outputRows.size - outputRows + val hasInput = iter.hasNext + if (!hasInput && keyExpressions.isEmpty) { + // If our `keyExpressions` are empty, we're getting a global aggregation. In that case + // the `HashAggregateExec` will output a 0 value for the partial merge. We need to + // restore the value, so that we don't overwrite our state with a 0 value, but rather + // merge the 0 with existing state. + store.iterator().map(_.value) + } else { + iter.flatMap { row => + val key = stateManager.getKey(row.asInstanceOf[UnsafeRow]) + val restoredRow = stateManager.get(store, key) + val outputRows = Option(restoredRow).toSeq :+ row + numOutputRows += outputRows.size + outputRows + } } - } } } From a7b4f164b8c21042d7129c802645ffdc12990624 Mon Sep 17 00:00:00 2001 From: Anish Shrigondekar Date: Wed, 10 Jan 2024 13:03:44 +0530 Subject: [PATCH 6/6] Change to address Raghu's comments --- .../streaming/state/RocksDBStateStoreProvider.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala index 8d5eb23283a1..f5382d040f28 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala @@ -238,8 +238,11 @@ private[sql] class RocksDBStateStoreProvider try { rocksDB.doMaintenance() } catch { + // SPARK-46547 - Swallow 
non-fatal exception in maintenance task to avoid deadlock between
+      // maintenance thread and streaming aggregation operator
       case NonFatal(ex) =>
-        logWarning(s"Error performing maintenance operations with exception=$ex")
+        logWarning("Ignoring error while performing maintenance operations",
+          ex)
     }
   }