Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -105,25 +105,34 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
/**
* Store the metadata for the specified batchId and return `true` if successful. If the batchId's
* metadata has already been stored, this method will return `false`.
*
* Note that this method must be called on a [[org.apache.spark.util.UninterruptibleThread]]
* so that interrupts can be disabled while writing the batch file. This is because there is a
* potential dead-lock in Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622). If the thread
* running "Shell.runCommand" is interrupted, then the thread can get deadlocked. In our
* case, `writeBatch` creates a file using HDFS API and calls "Shell.runCommand" to set the
* file permissions, and can get deadlocked if the stream execution thread is stopped by
* interrupt. Hence, we make sure that this method is called on [[UninterruptibleThread]] which
* allows us to disable interrupts here. Also see SPARK-14131.
*/
override def add(batchId: Long, metadata: T): Boolean = {
get(batchId).map(_ => false).getOrElse {
// Only write metadata when the batch has not yet been written
Thread.currentThread match {
case ut: UninterruptibleThread =>
ut.runUninterruptibly { writeBatch(batchId, metadata, serialize) }
case _ =>
throw new IllegalStateException(
"HDFSMetadataLog.add() must be executed on a o.a.spark.util.UninterruptibleThread")
if (fileManager.isLocalFileSystem) {
Thread.currentThread match {
case ut: UninterruptibleThread =>
// When using a local file system, "writeBatch" must be called on a
// [[org.apache.spark.util.UninterruptibleThread]] so that interrupts can be disabled
// while writing the batch file. This is because there is a potential dead-lock in
// Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622). If the thread running
// "Shell.runCommand" is interrupted, then the thread can get deadlocked. In our case,
// `writeBatch` creates a file using HDFS API and will call "Shell.runCommand" to set
// the file permission if using the local file system, and can get deadlocked if the
// stream execution thread is stopped by interrupt. Hence, we make sure that
// "writeBatch" is called on [[UninterruptibleThread]] which allows us to disable
// interrupts here. Also see SPARK-14131.
ut.runUninterruptibly { writeBatch(batchId, metadata, serialize) }
case _ =>
throw new IllegalStateException(
"HDFSMetadataLog.add() on a local file system must be executed on " +
"a o.a.spark.util.UninterruptibleThread")
}
} else {
// For a distributed file system, such as HDFS or S3, if the network is broken, write
// operations may just hang until timeout. We should enable interrupts to allow stopping
// the query fast.
writeBatch(batchId, metadata, serialize)
}
true
}
Expand Down Expand Up @@ -298,6 +307,9 @@ object HDFSMetadataLog {

/** Recursively delete a path if it exists. Should not throw exception if file doesn't exist. */
def delete(path: Path): Unit

/** Whether the file systme is a local FS. */
def isLocalFileSystem: Boolean
}

/**
Expand Down Expand Up @@ -342,6 +354,13 @@ object HDFSMetadataLog {
// ignore if file has already been deleted
}
}

override def isLocalFileSystem: Boolean = fc.getDefaultFileSystem match {
case _: local.LocalFs | _: local.RawLocalFs =>
// LocalFs = RawLocalFs + ChecksumFs
true
case _ => false
}
}

/**
Expand Down Expand Up @@ -398,5 +417,12 @@ object HDFSMetadataLog {
// ignore if file has already been deleted
}
}

override def isLocalFileSystem: Boolean = fs match {
case _: LocalFileSystem | _: RawLocalFileSystem =>
// LocalFileSystem = RawLocalFileSystem + ChecksumFileSystem
true
case _ => false
}
}
}