@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.streaming
1919
2020import java .io ._
2121import java .nio .charset .StandardCharsets
22- import java .util .{ConcurrentModificationException , EnumSet , UUID }
22+ import java .util .{ConcurrentModificationException , EnumSet , LinkedHashMap , UUID }
2323
2424import scala .reflect .ClassTag
2525
@@ -93,6 +93,14 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
9393 }
9494 }
9595
96+ /**
97+ * Cache of the latest two batches. [[StreamExecution]] usually accesses only the latest two
98+ * batches when committing offsets, so serving them from this cache saves file system operations.
99+ */
100+ private val batchCache = new LinkedHashMap [Long , T ](2 ) {
101+ override def removeEldestEntry (e : java.util.Map .Entry [Long , T ]): Boolean = size > 2 // drop the eldest entry once a third one is inserted
102+ }
103+
96104 /**
97105 * A `PathFilter` to filter only batch files
98106 */
@@ -133,6 +141,7 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
133141 * metadata has already been stored, this method will return `false`.
134142 */
135143 override def add (batchId : Long , metadata : T ): Boolean = {
144+ assert(metadata != null )
136145 get(batchId).map(_ => false ).getOrElse {
137146 // Only write metadata when the batch has not yet been written
138147 runUninterruptiblyIfLocal {
@@ -142,17 +151,21 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
142151 }
143152 }
144153
145- private def writeTempBatch (metadata : T ): Option [Path ] = {
154+ private def writeBatchToFile (metadata : T , path : Path ): Unit = { // Serializes `metadata` into a file created at `path`.
155+ val output = fileManager.create(path) // FileManager.create is documented to throw if `path` already exists
156+ try {
157+ serialize(metadata, output)
158+ } finally {
159+ IOUtils .closeQuietly(output) // runs whether or not serialization succeeded, so the stream is not leaked
160+ }
161+ }
162+
163+ private def writeTempBatch (metadata : T ): Path = {
146164 while (true ) {
147165 val tempPath = new Path (metadataPath, s " . ${UUID .randomUUID.toString}.tmp " )
148166 try {
149- val output = fileManager.create(tempPath)
150- try {
151- serialize(metadata, output)
152- return Some (tempPath)
153- } finally {
154- IOUtils .closeQuietly(output)
155- }
167+ writeBatchToFile(metadata, tempPath)
168+ return tempPath
156169 } catch {
157170 case e : IOException if isFileAlreadyExistsException(e) =>
158171 // Failed to create "tempPath". There are two cases:
@@ -169,7 +182,8 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
169182 // malicious checkpoint files to crash a Streaming application too.
170183 }
171184 }
172- None
185+ assert(false , " should not happen" )
186+ null
173187 }
174188
175189 /**
@@ -179,18 +193,21 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
179193 * valid behavior, we still need to prevent it from destroying the files.
180194 */
181195 private def writeBatch (batchId : Long , metadata : T ): Unit = {
182- val tempPath = writeTempBatch(metadata).getOrElse(
183- throw new IllegalStateException (s " Unable to create temp batch file $batchId" ))
196+ if (! fileManager.supportsAtomicRename) {
197+ // The underlying file system implementation doesn't support atomic rename, so write to the
198+ // target path directly.
199+ writeBatchToFile(metadata, batchIdToPath(batchId))
200+ batchCache.put(batchId, metadata)
201+ return
202+ }
203+
204+ val tempPath = writeTempBatch(metadata)
184205 try {
185206 // Try to commit the batch
186207 // It will fail if there is an existing file (someone has committed the batch)
187208 logDebug(s " Attempting to write log # ${batchIdToPath(batchId)}" )
188209 fileManager.rename(tempPath, batchIdToPath(batchId))
189-
190- // SPARK-17475: HDFSMetadataLog should not leak CRC files
191- // If the underlying filesystem didn't rename the CRC file, delete it.
192- val crcPath = new Path (tempPath.getParent(), s " . ${tempPath.getName()}.crc " )
193- if (fileManager.exists(crcPath)) fileManager.delete(crcPath)
210+ batchCache.put(batchId, metadata)
194211 } catch {
195212 case e : IOException if isFileAlreadyExistsException(e) =>
196213 // If "rename" fails, it means some other "HDFSMetadataLog" has committed the batch.
@@ -200,6 +217,11 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
200217 } finally {
201218 fileManager.delete(tempPath)
202219 }
220+
221+ // SPARK-17475: HDFSMetadataLog should not leak CRC files
222+ // If the underlying filesystem didn't rename the CRC file, delete it.
223+ val crcPath = new Path (tempPath.getParent(), s " . ${tempPath.getName()}.crc " )
224+ if (fileManager.exists(crcPath)) fileManager.delete(crcPath)
203225 }
204226
205227 private def isFileAlreadyExistsException (e : IOException ): Boolean = {
@@ -213,7 +235,7 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
213235 * @return the deserialized metadata in a batch file, or None if file not exist.
214236 * @throws IllegalArgumentException when path does not point to a batch file.
215237 */
216- def get (batchFile : Path ): Option [T ] = {
238+ private def get (batchFile : Path ): Option [T ] = {
217239 if (fileManager.exists(batchFile)) {
218240 if (isBatchFile(batchFile)) {
219241 get(pathToBatchId(batchFile))
@@ -226,6 +248,12 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
226248 }
227249
228250 override def get (batchId : Long ): Option [T ] = {
251+ if (batchCache.containsKey(batchId)) {
252+ val metadata = batchCache.get(batchId)
253+ assert(metadata != null )
254+ return Some (metadata)
255+ }
256+
229257 val batchMetadataFile = batchIdToPath(batchId)
230258 if (fileManager.exists(batchMetadataFile)) {
231259 val input = fileManager.open(batchMetadataFile)
@@ -267,17 +295,6 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
267295 None
268296 }
269297
270- /**
271- * Get an array of [FileStatus] referencing batch files.
272- * The array is sorted by most recent batch file first to
273- * oldest batch file.
274- */
275- def getOrderedBatchFiles (): Array [FileStatus ] = {
276- fileManager.list(metadataPath, batchFilesFilter)
277- .sortBy(f => pathToBatchId(f.getPath))
278- .reverse
279- }
280-
281298 /**
282299 * Removes all the log entry earlier than thresholdBatchId (exclusive).
283300 */
@@ -288,6 +305,7 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
288305 for (batchId <- batchIds if batchId < thresholdBatchId) {
289306 val path = batchIdToPath(batchId)
290307 fileManager.delete(path)
308+ batchCache.remove(batchId)
291309 logTrace(s " Removed metadata log file: $path" )
292310 }
293311 }
@@ -326,6 +344,9 @@ object HDFSMetadataLog {
326344 /** Create path, or throw exception if it already exists */
327345 def create (path : Path ): FSDataOutputStream
328346
347+ /** Whether the file system supports atomic rename. */
348+ def supportsAtomicRename : Boolean = true // default; implementations whose rename is not atomic override this
349+
329350 /**
330351 * Atomically rename path, or throw exception if it cannot be done.
331352 * Should throw FileNotFoundException if srcPath does not exist.
@@ -336,7 +357,7 @@ object HDFSMetadataLog {
336357 /** Recursively delete a path if it exists. Should not throw exception if file doesn't exist. */
337358 def delete (path : Path ): Unit
338359
339- /** Whether the file systme is a local FS. */
360+ /** Whether the file system is a local FS. */
340361 def isLocalFileSystem : Boolean
341362 }
342363
@@ -354,6 +375,8 @@ object HDFSMetadataLog {
354375 fc.util.listStatus(path, filter)
355376 }
356377
378+ override def supportsAtomicRename : Boolean = true // same value as the trait default, stated explicitly for this implementation
379+
357380 override def rename (srcPath : Path , destPath : Path ): Unit = {
358381 fc.rename(srcPath, destPath)
359382 }
@@ -403,6 +426,8 @@ object HDFSMetadataLog {
403426 fs.listStatus(path, filter)
404427 }
405428
429+ override def supportsAtomicRename : Boolean = false // this implementation's rename is documented as not atomic
430+
406431 /**
407432 * Rename a path. Note that this implementation is not atomic.
408433 * @throws FileNotFoundException if source path does not exist.
0 commit comments