From 1e361344101ccaf3d8a9ddebe6767b610f0916ed Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 11 May 2017 20:21:54 -0700 Subject: [PATCH 1/2] Ignored exceptions --- .../state/HDFSBackedStateStoreProvider.scala | 22 ++++++++++++++----- 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala index 1426728f9b550..fa809d9505793 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.streaming.state import java.io.{DataInputStream, DataOutputStream, FileNotFoundException, IOException} +import java.nio.channels.ClosedChannelException import java.util.Locale import scala.collection.JavaConverters._ @@ -202,13 +203,22 @@ private[state] class HDFSBackedStateStoreProvider( /** Abort all the updates made on this store. This store will not be usable any more. */ override def abort(): Unit = { verify(state == UPDATING || state == ABORTED, "Cannot abort after already committed") + try { + state = ABORTED + if (tempDeltaFileStream != null) { + tempDeltaFileStream.close() + } + if (tempDeltaFile != null) { + fs.delete(tempDeltaFile, true) + } + } catch { + case c: ClosedChannelException => + // This can happen when underlying file output stream has been closed before the + // compression stream. + logDebug(s"Error aborting version $newVersion into $this", c) - state = ABORTED - if (tempDeltaFileStream != null) { - tempDeltaFileStream.close() - } - if (tempDeltaFile != null) { - fs.delete(tempDeltaFile, true) + case e: Exception => + logWarning(s"Error aborting version $newVersion into $this") } logInfo(s"Aborted version $newVersion for $this") } From 820e4d54a1433316da1a1c97b9415b9f7d86d0d1 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 12 May 2017 11:31:39 -0700 Subject: [PATCH 2/2] Fix mistake --- .../streaming/state/HDFSBackedStateStoreProvider.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala index fa809d9505793..fb2bf47d6e83b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala @@ -218,7 +218,7 @@ private[state] class HDFSBackedStateStoreProvider( logDebug(s"Error aborting version $newVersion into $this", c) case e: Exception => - logWarning(s"Error aborting version $newVersion into $this") + logWarning(s"Error aborting version $newVersion into $this", e) } logInfo(s"Aborted version $newVersion for $this") }