From 6cc43a3526295438a0b8b3b810f77bced3dd18dc Mon Sep 17 00:00:00 2001 From: jerryshao Date: Thu, 2 Jun 2016 11:14:48 +0800 Subject: [PATCH 01/10] Add the ability to remove the old MetadataLog in FileStreamSource --- .../streaming/FileStreamSource.scala | 107 +++++++++++++++++- .../apache/spark/sql/internal/SQLConf.scala | 23 +++- .../sql/streaming/FileStreamSourceSuite.scala | 66 ++++++++++- 3 files changed, 193 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 42fb454c2d15..6cb2f8ed9f22 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -17,20 +17,36 @@ package org.apache.spark.sql.execution.streaming +<<<<<<< 92ce8d4849a0341c4636e70821b7be57ad3055b1 import scala.collection.JavaConverters._ +======= +import java.util.UUID -import org.apache.hadoop.fs.Path +import scala.collection.mutable.ArrayBuffer +import scala.util.control.NonFatal +>>>>>>> Add the ability to remove the old MetadataLog in FileStreamSource + +import org.apache.hadoop.fs.{Path, PathFilter} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} +<<<<<<< 92ce8d4849a0341c4636e70821b7be57ad3055b1 import org.apache.spark.sql.execution.datasources.{DataSource, ListingFileCatalog, LogicalRelation} +======= +import org.apache.spark.sql.execution.datasources.{CaseInsensitiveMap, DataSource, ListingFileCatalog, LogicalRelation} +import org.apache.spark.sql.internal.SQLConf +>>>>>>> Add the ability to remove the old MetadataLog in FileStreamSource import org.apache.spark.sql.types.StructType /** +<<<<<<< 92ce8d4849a0341c4636e70821b7be57ad3055b1 * A very simple source that reads files from the given directory as they appear. * * TODO: Clean up the metadata log files periodically. +======= + * A very simple source that reads text files from the given directory as they appear. 
+>>>>>>> Add the ability to remove the old MetadataLog in FileStreamSource */ class FileStreamSource( sparkSession: SparkSession, @@ -40,6 +56,7 @@ class FileStreamSource( metadataPath: String, options: Map[String, String]) extends Source with Logging { +<<<<<<< 92ce8d4849a0341c4636e70821b7be57ad3055b1 import FileStreamSource._ private val sourceOptions = new FileStreamOptions(options) @@ -51,6 +68,11 @@ class FileStreamSource( private val metadataLog = new HDFSMetadataLog[Array[FileEntry]](sparkSession, metadataPath) +======= + private val fs = new Path(path).getFileSystem(sparkSession.sessionState.newHadoopConf()) + private val qualifiedBasePath = fs.makeQualified(new Path(path)) // can contains glob patterns + private val metadataLog = new FileStreamSourceLog(sparkSession, metadataPath) +>>>>>>> Add the ability to remove the old MetadataLog in FileStreamSource private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L) /** Maximum number of new files to be considered in each batch */ @@ -234,3 +256,86 @@ object FileStreamSource { } } } + +class FileStreamSourceLog(sparkSession: SparkSession, path: String) + extends HDFSMetadataLog[Seq[String]](sparkSession, path) { + + // Configurations about metadata compaction + private val compactInterval = sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL) + require(compactInterval > 0, + s"Please set ${SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key} (was $compactInterval) to a " + + s"positive value.") + + private val fileCleanupDelayMs = sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY) + + private val isDeletingExpiredLog = sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_DELETION) + + private var compactBatchId: Long = -1L + + private def isCompactionBatch(batchId: Long, compactInterval: Long): Boolean = { + batchId % compactInterval == 0 + } + + override def add(batchId: Long, metadata: Seq[String]): Boolean = { + if (isCompactionBatch(batchId, compactInterval)) { + compactMetadataLog(batchId - 1) + } + + super.add(batchId, metadata) + } + + private def compactMetadataLog(batchId: Long): Unit = { + // read out compact metadata and merge with new metadata. + val batches = super.get(Some(compactBatchId), Some(batchId)) + val totalMetadata = batches.flatMap(_._2) + if (totalMetadata.isEmpty) { + return + } + + // Remove old compact metadata file and rewrite. + val renamedPath = new Path(path, s".${batchId.toString}-${UUID.randomUUID.toString}.tmp") + fileManager.rename(batchIdToPath(batchId), renamedPath) + + var isSuccess = false + try { + isSuccess = super.add(batchId, totalMetadata) + } catch { + case NonFatal(e) => isSuccess = false + } finally { + if (!isSuccess) { + // Rollback to the previous status if compaction is failed. 
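+        // Delete the partially written compact file and restore the renamed original in its place.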
+ fileManager.delete(batchIdToPath(batchId)) + fileManager.rename(renamedPath, batchIdToPath(batchId)) + return + } else { + fileManager.delete(renamedPath) + } + } + + compactBatchId = batchId + + // Remove expired metadata log + if (isDeletingExpiredLog) { + removeOlderThan(compactBatchId) + } + } + + private def removeOlderThan(batchId: Long): Unit = { + val expiredTime = System.currentTimeMillis() - fileCleanupDelayMs + fileManager.list(metadataPath, new PathFilter { + override def accept(path: Path): Boolean = { + try { + val id = pathToBatchId(path) + id < batchId + } catch { + case _: NumberFormatException => + false + } + } + }).foreach { f => + if (f.getModificationTime <= expiredTime) { + fileManager.delete(f.getPath) + } + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 428032b1fba8..ab0903b56810 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -544,7 +544,28 @@ object SQLConf { .internal() .doc("How long that a file is guaranteed to be visible for all readers.") .timeConf(TimeUnit.MILLISECONDS) - .createWithDefault(60 * 1000L) // 10 minutes + .createWithDefault(60 * 10 * 1000L) // 10 minutes + + val FILE_SOURCE_LOG_DELETION = SQLConfigBuilder("spark.sql.streaming.fileSource.log.deletion") + .internal() + .doc("Whether to delete the expired log files in file stream source.") + .booleanConf + .createWithDefault(true) + + val FILE_SOURCE_LOG_COMPACT_INTERVAL = + SQLConfigBuilder("spark.sql.streaming.fileSource.log.compactInterval") + .internal() + .doc("Number of log files after which all the previous files " + + "are compacted into the next log file.") + .intConf + .createWithDefault(10) + + val FILE_SOURCE_LOG_CLEANUP_DELAY = + SQLConfigBuilder("spark.sql.streaming.fileSource.log.cleanupDelay") + .internal() + .doc("How long in milliseconds a file is guaranteed to be visible for all readers.") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefault(60 * 10 * 1000L) // 10 minutes val STREAMING_SCHEMA_INFERENCE = SQLConfigBuilder("spark.sql.streaming.schemaInference") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 886f7be59db9..55c15e87feb3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -17,10 +17,16 @@ package org.apache.spark.sql.streaming +<<<<<<< 92ce8d4849a0341c4636e70821b7be57ad3055b1 import java.io.File import org.scalatest.concurrent.Eventually._ import org.scalatest.time.SpanSugar._ +======= +import java.io.{File, FilenameFilter} + +import org.scalatest.PrivateMethodTester +>>>>>>> Add the ability to remove the old MetadataLog in FileStreamSource import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.util._ @@ -30,7 +36,7 @@ import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ import org.apache.spark.util.Utils -class FileStreamSourceTest extends StreamTest with SharedSQLContext { +class FileStreamSourceTest extends StreamTest with SharedSQLContext with PrivateMethodTester { import testImplicits._ @@ -623,6 +629,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { } } +<<<<<<< 92ce8d4849a0341c4636e70821b7be57ad3055b1 
test("max files per trigger") { withTempDir { case src => var lastFileModTime: Option[Long] = None @@ -801,6 +808,63 @@ class FileStreamSourceSuite extends FileStreamSourceTest { ) } } +======= + test("clean obsolete metadata log") { + val _sources = PrivateMethod[Seq[Source]]('sources) + val _metadataLog = PrivateMethod[FileStreamSourceLog]('metadataLog) + + def verify(execution: StreamExecution) + (batchId: Long, expectedBatches: Int, expectedFileNames: Array[String]): Boolean = { + val fileSource = (execution invokePrivate _sources()).head.asInstanceOf[FileStreamSource] + val metadataLog = fileSource invokePrivate _metadataLog() + val files = new File(metadataLog.metadataPath.toUri.toString).listFiles( + new FilenameFilter { + override def accept(dir: File, name: String): Boolean = { + try { + name.toLong + true + } catch { + case _: NumberFormatException => false + } + } + }).map(_.getName) + + metadataLog.get(None, Some(batchId)).flatMap(_._2).size === expectedBatches && + files === expectedFileNames + } + + withTempDirs { case (src, tmp) => + withSQLConf( + SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2", + SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "0ms" + ) { + val fileStream = createFileStream("text", src.getCanonicalPath) + val filtered = fileStream.filter($"value" contains "keep") + + testStream(filtered)( + AddTextFileData("drop1\nkeep2\nkeep3", src, tmp), + CheckAnswer("keep2", "keep3"), + AssertOnQuery(verify(_)(0L, 1, Array("0"))), + AddTextFileData("drop4\nkeep5\nkeep6", src, tmp), + CheckAnswer("keep2", "keep3", "keep5", "keep6"), + AssertOnQuery(verify(_)(1L, 2, Array("0", "1"))), + AddTextFileData("drop7\nkeep8\nkeep9", src, tmp), + CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9"), + AssertOnQuery(verify(_)(2L, 3, Array("1", "2"))), + StopStream, + StartStream(), + AssertOnQuery(verify(_)(2L, 3, Array("1", "2"))), + AddTextFileData("drop10\nkeep11", src, tmp), + CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9", "keep11"), + AssertOnQuery(verify(_)(3L, 4, Array("1", "2", "3"))), + AddTextFileData("drop12\nkeep13", src, tmp), + CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9", "keep11", "keep13"), + AssertOnQuery(verify(_)(4L, 5, Array("3", "4"))) + ) + } + } + } +>>>>>>> Add the ability to remove the old MetadataLog in FileStreamSource } class FileStreamSourceStressTestSuite extends FileStreamSourceTest { From b1299dd202a3cde30cfc30c36c5b932e40bed1a4 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Mon, 6 Jun 2016 14:02:03 -0700 Subject: [PATCH 02/10] Fix flaky test --- .../sql/streaming/FileStreamSourceSuite.scala | 37 +++++++------------ 1 file changed, 14 insertions(+), 23 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 55c15e87feb3..d4e854ace2ad 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.streaming +<<<<<<< 8f2fdce1b015c829b6d0c398213991c7f15575c9 <<<<<<< 92ce8d4849a0341c4636e70821b7be57ad3055b1 import java.io.File @@ -24,6 +25,9 @@ import org.scalatest.concurrent.Eventually._ import org.scalatest.time.SpanSugar._ ======= import java.io.{File, FilenameFilter} +======= +import java.io.File +>>>>>>> Fix flaky test import org.scalatest.PrivateMethodTester >>>>>>> Add the ability to 
remove the old MetadataLog in FileStreamSource @@ -814,29 +818,16 @@ class FileStreamSourceSuite extends FileStreamSourceTest { val _metadataLog = PrivateMethod[FileStreamSourceLog]('metadataLog) def verify(execution: StreamExecution) - (batchId: Long, expectedBatches: Int, expectedFileNames: Array[String]): Boolean = { + (batchId: Long, expectedBatches: Int): Boolean = { val fileSource = (execution invokePrivate _sources()).head.asInstanceOf[FileStreamSource] val metadataLog = fileSource invokePrivate _metadataLog() - val files = new File(metadataLog.metadataPath.toUri.toString).listFiles( - new FilenameFilter { - override def accept(dir: File, name: String): Boolean = { - try { - name.toLong - true - } catch { - case _: NumberFormatException => false - } - } - }).map(_.getName) - - metadataLog.get(None, Some(batchId)).flatMap(_._2).size === expectedBatches && - files === expectedFileNames + + metadataLog.get(None, Some(batchId)).flatMap(_._2).toSet.size === expectedBatches } withTempDirs { case (src, tmp) => withSQLConf( - SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2", - SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "0ms" + SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2" ) { val fileStream = createFileStream("text", src.getCanonicalPath) val filtered = fileStream.filter($"value" contains "keep") @@ -844,22 +835,22 @@ class FileStreamSourceSuite extends FileStreamSourceTest { testStream(filtered)( AddTextFileData("drop1\nkeep2\nkeep3", src, tmp), CheckAnswer("keep2", "keep3"), - AssertOnQuery(verify(_)(0L, 1, Array("0"))), + AssertOnQuery(verify(_)(0L, 1)), AddTextFileData("drop4\nkeep5\nkeep6", src, tmp), CheckAnswer("keep2", "keep3", "keep5", "keep6"), - AssertOnQuery(verify(_)(1L, 2, Array("0", "1"))), + AssertOnQuery(verify(_)(1L, 2)), AddTextFileData("drop7\nkeep8\nkeep9", src, tmp), CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9"), - AssertOnQuery(verify(_)(2L, 3, Array("1", "2"))), + AssertOnQuery(verify(_)(2L, 3)), StopStream, StartStream(), - AssertOnQuery(verify(_)(2L, 3, Array("1", "2"))), + AssertOnQuery(verify(_)(2L, 3)), AddTextFileData("drop10\nkeep11", src, tmp), CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9", "keep11"), - AssertOnQuery(verify(_)(3L, 4, Array("1", "2", "3"))), + AssertOnQuery(verify(_)(3L, 4)), AddTextFileData("drop12\nkeep13", src, tmp), CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9", "keep11", "keep13"), - AssertOnQuery(verify(_)(4L, 5, Array("3", "4"))) + AssertOnQuery(verify(_)(4L, 5)) ) } } From 5300d9d331074ad4530996c590a7325258cbabfb Mon Sep 17 00:00:00 2001 From: jerryshao Date: Mon, 12 Sep 2016 15:37:18 +0800 Subject: [PATCH 03/10] refactor according to comments --- .../streaming/FileStreamSinkLog.scala | 193 ++---------------- .../streaming/FileStreamSource.scala | 141 ++++--------- .../apache/spark/sql/internal/SQLConf.scala | 4 +- .../streaming/FileStreamSinkLogSuite.scala | 11 +- .../sql/streaming/FileStreamSourceSuite.scala | 37 ++-- 5 files changed, 89 insertions(+), 297 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala index 6f9f7c18c4dc..08aefcabb3e7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala @@ -18,9 +18,8 @@ package org.apache.spark.sql.execution.streaming import 
java.io.IOException -import java.nio.charset.StandardCharsets.UTF_8 -import org.apache.hadoop.fs.{FileStatus, Path, PathFilter} +import org.apache.hadoop.fs.{FileStatus, Path} import org.json4s.NoTypeHints import org.json4s.jackson.Serialization import org.json4s.jackson.Serialization.{read, write} @@ -80,195 +79,40 @@ object SinkFileStatus { * (drops the deleted files). */ class FileStreamSinkLog(sparkSession: SparkSession, path: String) - extends HDFSMetadataLog[Array[SinkFileStatus]](sparkSession, path) { - - import FileStreamSinkLog._ + extends CompactibleFileStreamLog[SinkFileStatus](sparkSession, path) { private implicit val formats = Serialization.formats(NoTypeHints) - /** - * If we delete the old files after compaction at once, there is a race condition in S3: other - * processes may see the old files are deleted but still cannot see the compaction file using - * "list". The `allFiles` handles this by looking for the next compaction file directly, however, - * a live lock may happen if the compaction happens too frequently: one processing keeps deleting - * old files while another one keeps retrying. Setting a reasonable cleanup delay could avoid it. - */ - private val fileCleanupDelayMs = sparkSession.sessionState.conf.fileSinkLogCleanupDelay + protected override val fileCleanupDelayMs = + sparkSession.conf.get(SQLConf.FILE_SINK_LOG_CLEANUP_DELAY) - private val isDeletingExpiredLog = sparkSession.sessionState.conf.fileSinkLogDeletion + protected override val isDeletingExpiredLog = + sparkSession.conf.get(SQLConf.FILE_SINK_LOG_DELETION) - private val compactInterval = sparkSession.sessionState.conf.fileSinkLogCompatInterval + protected override val compactInterval = + sparkSession.conf.get(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL) require(compactInterval > 0, s"Please set ${SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key} (was $compactInterval) " + "to a positive value.") - override def batchIdToPath(batchId: Long): Path = { - if (isCompactionBatch(batchId, compactInterval)) { - new Path(metadataPath, s"$batchId$COMPACT_FILE_SUFFIX") - } else { - new Path(metadataPath, batchId.toString) - } - } - - override def pathToBatchId(path: Path): Long = { - getBatchIdFromFileName(path.getName) - } - - override def isBatchFile(path: Path): Boolean = { - try { - getBatchIdFromFileName(path.getName) - true - } catch { - case _: NumberFormatException => false - } - } - - override def serialize(logData: Array[SinkFileStatus]): Array[Byte] = { - (VERSION +: logData.map(write(_))).mkString("\n").getBytes(UTF_8) - } - - override def deserialize(bytes: Array[Byte]): Array[SinkFileStatus] = { - val lines = new String(bytes, UTF_8).split("\n") - if (lines.length == 0) { - throw new IllegalStateException("Incomplete log file") - } - val version = lines(0) - if (version != VERSION) { - throw new IllegalStateException(s"Unknown log version: ${version}") - } - lines.slice(1, lines.length).map(read[SinkFileStatus](_)) - } - - override def add(batchId: Long, logs: Array[SinkFileStatus]): Boolean = { - if (isCompactionBatch(batchId, compactInterval)) { - compact(batchId, logs) - } else { - super.add(batchId, logs) - } - } - - /** - * Returns all files except the deleted ones. - */ - def allFiles(): Array[SinkFileStatus] = { - var latestId = getLatest().map(_._1).getOrElse(-1L) - // There is a race condition when `FileStreamSink` is deleting old files and `StreamFileCatalog` - // is calling this method. This loop will retry the reading to deal with the - // race condition. 
- while (true) { - if (latestId >= 0) { - val startId = getAllValidBatches(latestId, compactInterval)(0) - try { - val logs = get(Some(startId), Some(latestId)).flatMap(_._2) - return compactLogs(logs).toArray - } catch { - case e: IOException => - // Another process using `FileStreamSink` may delete the batch files when - // `StreamFileCatalog` are reading. However, it only happens when a compaction is - // deleting old files. If so, let's try the next compaction batch and we should find it. - // Otherwise, this is a real IO issue and we should throw it. - latestId = nextCompactionBatchId(latestId, compactInterval) - get(latestId).getOrElse { - throw e - } - } - } else { - return Array.empty - } - } - Array.empty + protected override def serializeData(data: SinkFileStatus): String = { + write(data) } - /** - * Compacts all logs before `batchId` plus the provided `logs`, and writes them into the - * corresponding `batchId` file. It will delete expired files as well if enabled. - */ - private def compact(batchId: Long, logs: Seq[SinkFileStatus]): Boolean = { - val validBatches = getValidBatchesBeforeCompactionBatch(batchId, compactInterval) - val allLogs = validBatches.flatMap(batchId => get(batchId)).flatten ++ logs - if (super.add(batchId, compactLogs(allLogs).toArray)) { - if (isDeletingExpiredLog) { - deleteExpiredLog(batchId) - } - true - } else { - // Return false as there is another writer. - false - } + protected override def deserializeData(encodedString: String): SinkFileStatus = { + read[SinkFileStatus](encodedString) } - /** - * Since all logs before `compactionBatchId` are compacted and written into the - * `compactionBatchId` log file, they can be removed. However, due to the eventual consistency of - * S3, the compaction file may not be seen by other processes at once. So we only delete files - * created `fileCleanupDelayMs` milliseconds ago. - */ - private def deleteExpiredLog(compactionBatchId: Long): Unit = { - val expiredTime = System.currentTimeMillis() - fileCleanupDelayMs - fileManager.list(metadataPath, new PathFilter { - override def accept(path: Path): Boolean = { - try { - val batchId = getBatchIdFromFileName(path.getName) - batchId < compactionBatchId - } catch { - case _: NumberFormatException => - false - } - } - }).foreach { f => - if (f.getModificationTime <= expiredTime) { - fileManager.delete(f.getPath) - } - } + protected override def compactLogs( + oldLogs: Seq[SinkFileStatus], newLogs: Seq[SinkFileStatus]): Seq[SinkFileStatus] = { + FileStreamSinkLog.compactLogs(oldLogs ++ newLogs) } } object FileStreamSinkLog { - val VERSION = "v1" - val COMPACT_FILE_SUFFIX = ".compact" val DELETE_ACTION = "delete" val ADD_ACTION = "add" - def getBatchIdFromFileName(fileName: String): Long = { - fileName.stripSuffix(COMPACT_FILE_SUFFIX).toLong - } - - /** - * Returns if this is a compaction batch. FileStreamSinkLog will compact old logs every - * `compactInterval` commits. - * - * E.g., if `compactInterval` is 3, then 2, 5, 8, ... are all compaction batches. - */ - def isCompactionBatch(batchId: Long, compactInterval: Int): Boolean = { - (batchId + 1) % compactInterval == 0 - } - - /** - * Returns all valid batches before the specified `compactionBatchId`. They contain all logs we - * need to do a new compaction. - * - * E.g., if `compactInterval` is 3 and `compactionBatchId` is 5, this method should returns - * `Seq(2, 3, 4)` (Note: it includes the previous compaction batch 2). 
- */ - def getValidBatchesBeforeCompactionBatch( - compactionBatchId: Long, - compactInterval: Int): Seq[Long] = { - assert(isCompactionBatch(compactionBatchId, compactInterval), - s"$compactionBatchId is not a compaction batch") - (math.max(0, compactionBatchId - compactInterval)) until compactionBatchId - } - - /** - * Returns all necessary logs before `batchId` (inclusive). If `batchId` is a compaction, just - * return itself. Otherwise, it will find the previous compaction batch and return all batches - * between it and `batchId`. - */ - def getAllValidBatches(batchId: Long, compactInterval: Long): Seq[Long] = { - assert(batchId >= 0) - val start = math.max(0, (batchId + 1) / compactInterval * compactInterval - 1) - start to batchId - } - /** * Removes all deleted files from logs. It assumes once one file is deleted, it won't be added to * the log in future. @@ -281,11 +125,4 @@ object FileStreamSinkLog { logs.filter(f => !deletedFiles.contains(f.path)) } } - - /** - * Returns the next compaction batch id after `batchId`. - */ - def nextCompactionBatchId(batchId: Long, compactInterval: Long): Long = { - (batchId + compactInterval + 1) / compactInterval * compactInterval - 1 - } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 6cb2f8ed9f22..9dafc8b6554f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -17,36 +17,21 @@ package org.apache.spark.sql.execution.streaming -<<<<<<< 92ce8d4849a0341c4636e70821b7be57ad3055b1 import scala.collection.JavaConverters._ -======= -import java.util.UUID -import scala.collection.mutable.ArrayBuffer -import scala.util.control.NonFatal ->>>>>>> Add the ability to remove the old MetadataLog in FileStreamSource - -import org.apache.hadoop.fs.{Path, PathFilter} +import org.apache.hadoop.fs.Path +import org.json4s.NoTypeHints +import org.json4s.jackson.Serialization import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} -<<<<<<< 92ce8d4849a0341c4636e70821b7be57ad3055b1 import org.apache.spark.sql.execution.datasources.{DataSource, ListingFileCatalog, LogicalRelation} -======= -import org.apache.spark.sql.execution.datasources.{CaseInsensitiveMap, DataSource, ListingFileCatalog, LogicalRelation} import org.apache.spark.sql.internal.SQLConf ->>>>>>> Add the ability to remove the old MetadataLog in FileStreamSource import org.apache.spark.sql.types.StructType /** -<<<<<<< 92ce8d4849a0341c4636e70821b7be57ad3055b1 * A very simple source that reads files from the given directory as they appear. - * - * TODO: Clean up the metadata log files periodically. -======= - * A very simple source that reads text files from the given directory as they appear. 
->>>>>>> Add the ability to remove the old MetadataLog in FileStreamSource */ class FileStreamSource( sparkSession: SparkSession, @@ -56,7 +41,6 @@ class FileStreamSource( metadataPath: String, options: Map[String, String]) extends Source with Logging { -<<<<<<< 92ce8d4849a0341c4636e70821b7be57ad3055b1 import FileStreamSource._ private val sourceOptions = new FileStreamOptions(options) @@ -66,13 +50,7 @@ class FileStreamSource( fs.makeQualified(new Path(path)) // can contains glob patterns } - private val metadataLog = new HDFSMetadataLog[Array[FileEntry]](sparkSession, metadataPath) - -======= - private val fs = new Path(path).getFileSystem(sparkSession.sessionState.newHadoopConf()) - private val qualifiedBasePath = fs.makeQualified(new Path(path)) // can contains glob patterns private val metadataLog = new FileStreamSourceLog(sparkSession, metadataPath) ->>>>>>> Add the ability to remove the old MetadataLog in FileStreamSource private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L) /** Maximum number of new files to be considered in each batch */ @@ -82,11 +60,10 @@ class FileStreamSource( // Visible for testing and debugging in production. val seenFiles = new SeenFilesMap(sourceOptions.maxFileAgeMs) - metadataLog.get(None, Some(maxBatchId)).foreach { case (batchId, entry) => - entry.foreach(seenFiles.add) - // TODO: move purge call out of the loop once we truncate logs. - seenFiles.purge() + metadataLog.allFiles().foreach { entry => + seenFiles.add(entry) } + seenFiles.purge() logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAge = ${sourceOptions.maxFileAgeMs}") @@ -196,7 +173,14 @@ object FileStreamSource { /** Timestamp for file modification time, in ms since January 1, 1970 UTC. */ type Timestamp = Long - case class FileEntry(path: String, timestamp: Timestamp) extends Serializable + // Default action when `FileEntry` is persisted into log. + val ADD_ACTION = "add" + // Action when `FileEntry` is compacted. + val COMPACT_ACTION = "compact" + + + case class FileEntry(path: String, timestamp: Timestamp, action: String = ADD_ACTION) + extends Serializable /** * A custom hash map used to track the list of files seen. This map is not thread-safe. 
@@ -255,86 +239,47 @@ object FileStreamSource { map.entrySet().asScala.map(entry => FileEntry(entry.getKey, entry.getValue)).toSeq } } -} - -class FileStreamSourceLog(sparkSession: SparkSession, path: String) - extends HDFSMetadataLog[Seq[String]](sparkSession, path) { - // Configurations about metadata compaction - private val compactInterval = sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL) - require(compactInterval > 0, - s"Please set ${SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key} (was $compactInterval) to a " + - s"positive value.") + class FileStreamSourceLog(sparkSession: SparkSession, path: String) + extends CompactibleFileStreamLog[FileEntry](sparkSession, path) { - private val fileCleanupDelayMs = sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY) + // Configurations about metadata compaction + protected override val compactInterval = + sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL) + require(compactInterval > 0, + s"Please set ${SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key} (was $compactInterval) to a " + + s"positive value.") - private val isDeletingExpiredLog = sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_DELETION) + protected override val fileCleanupDelayMs = + sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY) - private var compactBatchId: Long = -1L + protected override val isDeletingExpiredLog = + sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_DELETION) - private def isCompactionBatch(batchId: Long, compactInterval: Long): Boolean = { - batchId % compactInterval == 0 - } + private implicit val formats = Serialization.formats(NoTypeHints) - override def add(batchId: Long, metadata: Seq[String]): Boolean = { - if (isCompactionBatch(batchId, compactInterval)) { - compactMetadataLog(batchId - 1) + protected override def serializeData(data: FileEntry): String = { + Serialization.write(data) } - super.add(batchId, metadata) - } - - private def compactMetadataLog(batchId: Long): Unit = { - // read out compact metadata and merge with new metadata. - val batches = super.get(Some(compactBatchId), Some(batchId)) - val totalMetadata = batches.flatMap(_._2) - if (totalMetadata.isEmpty) { - return - } - - // Remove old compact metadata file and rewrite. - val renamedPath = new Path(path, s".${batchId.toString}-${UUID.randomUUID.toString}.tmp") - fileManager.rename(batchIdToPath(batchId), renamedPath) - - var isSuccess = false - try { - isSuccess = super.add(batchId, totalMetadata) - } catch { - case NonFatal(e) => isSuccess = false - } finally { - if (!isSuccess) { - // Rollback to the previous status if compaction is failed. - fileManager.delete(batchIdToPath(batchId)) - fileManager.rename(renamedPath, batchIdToPath(batchId)) - return - } else { - fileManager.delete(renamedPath) - } + def deserializeData(encodedString: String): FileEntry = { + Serialization.read[FileEntry](encodedString) } - compactBatchId = batchId - - // Remove expired metadata log - if (isDeletingExpiredLog) { - removeOlderThan(compactBatchId) + protected override def compactLogs( + oldLogs: Seq[FileEntry], newLogs: Seq[FileEntry]): Seq[FileEntry] = { + // Change the action of old file entry into COMPACT, so when fetching these out, they will + // be filtered out to avoid processing again. 
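+      // The overridden `get` below filters these COMPACT entries out, so only ADD entries are returned.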
+ oldLogs.map(e => FileEntry(e.path, e.timestamp, COMPACT_ACTION)) ++ newLogs } - } - private def removeOlderThan(batchId: Long): Unit = { - val expiredTime = System.currentTimeMillis() - fileCleanupDelayMs - fileManager.list(metadataPath, new PathFilter { - override def accept(path: Path): Boolean = { - try { - val id = pathToBatchId(path) - id < batchId - } catch { - case _: NumberFormatException => - false - } - } - }).foreach { f => - if (f.getModificationTime <= expiredTime) { - fileManager.delete(f.getPath) + override def get( + startId: Option[Long], endId: Option[Long]): Array[(Long, Array[FileEntry])] = { + super.get(startId, endId).map { case (id, entries) => + // Keep only the file entries in which the action is ADD, this will keep the consistency + // while retrieving again after compaction. + val addedEntries = entries.filter(_.action == ADD_ACTION) + (id, addedEntries) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index ab0903b56810..f8b7a7f8ef77 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -544,7 +544,7 @@ object SQLConf { .internal() .doc("How long that a file is guaranteed to be visible for all readers.") .timeConf(TimeUnit.MILLISECONDS) - .createWithDefault(60 * 10 * 1000L) // 10 minutes + .createWithDefault(TimeUnit.MINUTES.toMillis(10)) // 10 minutes val FILE_SOURCE_LOG_DELETION = SQLConfigBuilder("spark.sql.streaming.fileSource.log.deletion") .internal() @@ -565,7 +565,7 @@ object SQLConf { .internal() .doc("How long in milliseconds a file is guaranteed to be visible for all readers.") .timeConf(TimeUnit.MILLISECONDS) - .createWithDefault(60 * 10 * 1000L) // 10 minutes + .createWithDefault(TimeUnit.MINUTES.toMillis(10)) // 10 minutes val STREAMING_SCHEMA_INFERENCE = SQLConfigBuilder("spark.sql.streaming.schemaInference") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala index 26f8b98cb38a..bce75d6e938c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala @@ -25,13 +25,14 @@ import org.apache.spark.sql.test.SharedSQLContext class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext { + import CompactibleFileStreamLog._ import FileStreamSinkLog._ test("getBatchIdFromFileName") { assert(1234L === getBatchIdFromFileName("1234")) assert(1234L === getBatchIdFromFileName("1234.compact")) intercept[NumberFormatException] { - FileStreamSinkLog.getBatchIdFromFileName("1234a") + getBatchIdFromFileName("1234a") } } @@ -125,21 +126,21 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext { action = FileStreamSinkLog.ADD_ACTION)) // scalastyle:off - val expected = s"""${FileStreamSinkLog.VERSION} + val expected = s"""$VERSION |{"path":"/a/b/x","size":100,"isDir":false,"modificationTime":1000,"blockReplication":1,"blockSize":10000,"action":"add"} |{"path":"/a/b/y","size":200,"isDir":false,"modificationTime":2000,"blockReplication":2,"blockSize":20000,"action":"delete"} |{"path":"/a/b/z","size":300,"isDir":false,"modificationTime":3000,"blockReplication":3,"blockSize":30000,"action":"add"}""".stripMargin // scalastyle:on 
assert(expected === new String(sinkLog.serialize(logs), UTF_8)) - assert(FileStreamSinkLog.VERSION === new String(sinkLog.serialize(Array()), UTF_8)) + assert(VERSION === new String(sinkLog.serialize(Array()), UTF_8)) } } test("deserialize") { withFileStreamSinkLog { sinkLog => // scalastyle:off - val logs = s"""${FileStreamSinkLog.VERSION} + val logs = s"""$VERSION |{"path":"/a/b/x","size":100,"isDir":false,"modificationTime":1000,"blockReplication":1,"blockSize":10000,"action":"add"} |{"path":"/a/b/y","size":200,"isDir":false,"modificationTime":2000,"blockReplication":2,"blockSize":20000,"action":"delete"} |{"path":"/a/b/z","size":300,"isDir":false,"modificationTime":3000,"blockReplication":3,"blockSize":30000,"action":"add"}""".stripMargin @@ -173,7 +174,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext { assert(expected === sinkLog.deserialize(logs.getBytes(UTF_8))) - assert(Nil === sinkLog.deserialize(FileStreamSinkLog.VERSION.getBytes(UTF_8))) + assert(Nil === sinkLog.deserialize(VERSION.getBytes(UTF_8))) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index d4e854ace2ad..792069f4b503 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -17,24 +17,17 @@ package org.apache.spark.sql.streaming -<<<<<<< 8f2fdce1b015c829b6d0c398213991c7f15575c9 -<<<<<<< 92ce8d4849a0341c4636e70821b7be57ad3055b1 -import java.io.File - -import org.scalatest.concurrent.Eventually._ -import org.scalatest.time.SpanSugar._ -======= import java.io.{File, FilenameFilter} -======= import java.io.File ->>>>>>> Fix flaky test +import org.scalatest.concurrent.Eventually._ import org.scalatest.PrivateMethodTester ->>>>>>> Add the ability to remove the old MetadataLog in FileStreamSource +import org.scalatest.time.SpanSugar._ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.execution.streaming.FileStreamSource.FileStreamSourceLog import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ @@ -633,7 +626,6 @@ class FileStreamSourceSuite extends FileStreamSourceTest { } } -<<<<<<< 92ce8d4849a0341c4636e70821b7be57ad3055b1 test("max files per trigger") { withTempDir { case src => var lastFileModTime: Option[Long] = None @@ -812,16 +804,34 @@ class FileStreamSourceSuite extends FileStreamSourceTest { ) } } -======= - test("clean obsolete metadata log") { + + test("compacat metadata log") { val _sources = PrivateMethod[Seq[Source]]('sources) val _metadataLog = PrivateMethod[FileStreamSourceLog]('metadataLog) def verify(execution: StreamExecution) (batchId: Long, expectedBatches: Int): Boolean = { + import CompactibleFileStreamLog._ + val fileSource = (execution invokePrivate _sources()).head.asInstanceOf[FileStreamSource] val metadataLog = fileSource invokePrivate _metadataLog() + if (isCompactionBatch(batchId, 2)) { + val path = metadataLog.batchIdToPath(batchId) + + // Assert path name should be ended with compact suffix. + assert(path.getName.endsWith(COMPACT_FILE_SUFFIX)) + + // Compact batch should include all entries from start. 
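+        // That is, reading just the compaction batch should yield the same number of entries as
+        // reading every batch from the beginning up to and including it.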
+ val entries = metadataLog.get(batchId) + assert(entries.isDefined) + assert(entries.get.length === metadataLog.allFiles().length) + assert(metadataLog.get(None, Some(batchId)).flatMap(_._2).length === entries.get.length) + } + + assert(metadataLog.allFiles().length === + metadataLog.get(None, Some(batchId)).flatMap(_._2).length) + metadataLog.get(None, Some(batchId)).flatMap(_._2).toSet.size === expectedBatches } @@ -855,7 +865,6 @@ class FileStreamSourceSuite extends FileStreamSourceTest { } } } ->>>>>>> Add the ability to remove the old MetadataLog in FileStreamSource } class FileStreamSourceStressTestSuite extends FileStreamSourceTest { From 4187999cc52d8e29e00032f37ead5718c29694a6 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Mon, 12 Sep 2016 15:49:12 +0800 Subject: [PATCH 04/10] fix compile error --- .../streaming/CompactibleFileStreamLog.scala | 249 ++++++++++++++++++ 1 file changed, 249 insertions(+) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala new file mode 100644 index 000000000000..5395cb67447e --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.io.IOException +import java.nio.charset.StandardCharsets.UTF_8 +import java.util.concurrent.TimeUnit + +import scala.reflect.ClassTag + +import org.apache.hadoop.fs.{Path, PathFilter} + +import org.apache.spark.sql.SparkSession + +/** + * An abstract class for compactible metadata logs. It will write one log file for each batch. + * The first line of the log file is the version number, and there are multiple JSON lines + * following. + * + * As reading from many small files is usually pretty slow, also too many + * small files in one folder will mess the FS, [[CompactibleFileStreamLog]] will + * compact log files every 10 batches by default into a big file. When + * doing a compaction, it will read all old log files and merge them with the new batch. + */ +abstract class CompactibleFileStreamLog[T: ClassTag]( + sparkSession: SparkSession, + path: String) + extends HDFSMetadataLog[Array[T]](sparkSession, path) { + + import CompactibleFileStreamLog._ + + /** + * If we delete the old files after compaction at once, there is a race condition in S3: other + * processes may see the old files are deleted but still cannot see the compaction file using + * "list". 
The `allFiles` handles this by looking for the next compaction file directly, however, + * a live lock may happen if the compaction happens too frequently: one processing keeps deleting + * old files while another one keeps retrying. Setting a reasonable cleanup delay could avoid it. + */ + protected val fileCleanupDelayMs = TimeUnit.MINUTES.toMillis(10) + + protected val isDeletingExpiredLog = true + + protected val compactInterval = 10 + + /** + * Serialize the data into encoded string. + */ + protected def serializeData(t: T): String + + /** + * Deserialize the string into data object. + */ + protected def deserializeData(encodedString: String): T + + /** + * Filter out the unwanted logs, by default it filters out nothing, inherited class could + * override this method to do filtering. + */ + protected def compactLogs(oldLogs: Seq[T], newLogs: Seq[T]): Seq[T] = { + oldLogs ++ newLogs + } + + override def batchIdToPath(batchId: Long): Path = { + if (isCompactionBatch(batchId, compactInterval)) { + new Path(metadataPath, s"$batchId$COMPACT_FILE_SUFFIX") + } else { + new Path(metadataPath, batchId.toString) + } + } + + override def pathToBatchId(path: Path): Long = { + getBatchIdFromFileName(path.getName) + } + + override def isBatchFile(path: Path): Boolean = { + try { + getBatchIdFromFileName(path.getName) + true + } catch { + case _: NumberFormatException => false + } + } + + override def serialize(logData: Array[T]): Array[Byte] = { + (VERSION +: logData.map(serializeData)).mkString("\n").getBytes(UTF_8) + } + + override def deserialize(bytes: Array[Byte]): Array[T] = { + val lines = new String(bytes, UTF_8).split("\n") + if (lines.length == 0) { + throw new IllegalStateException("Incomplete log file") + } + val version = lines(0) + if (version != VERSION) { + throw new IllegalStateException(s"Unknown log version: ${version}") + } + lines.slice(1, lines.length).map(deserializeData) + } + + override def add(batchId: Long, logs: Array[T]): Boolean = { + if (isCompactionBatch(batchId, compactInterval)) { + compact(batchId, logs) + } else { + super.add(batchId, logs) + } + } + + /** + * Compacts all logs before `batchId` plus the provided `logs`, and writes them into the + * corresponding `batchId` file. It will delete expired files as well if enabled. + */ + private def compact(batchId: Long, logs: Array[T]): Boolean = { + val validBatches = getValidBatchesBeforeCompactionBatch(batchId, compactInterval) + val allLogs = validBatches.flatMap(batchId => get(batchId)).flatten + if (super.add(batchId, compactLogs(allLogs, logs).toArray)) { + if (isDeletingExpiredLog) { + deleteExpiredLog(batchId) + } + true + } else { + // Return false as there is another writer. + false + } + } + + /** + * Returns all files except the deleted ones. + */ + def allFiles(): Array[T] = { + var latestId = getLatest().map(_._1).getOrElse(-1L) + // There is a race condition when `FileStreamSink` is deleting old files and `StreamFileCatalog` + // is calling this method. This loop will retry the reading to deal with the + // race condition. + while (true) { + if (latestId >= 0) { + val startId = getAllValidBatches(latestId, compactInterval)(0) + try { + val logs = super.get(Some(startId), Some(latestId)).flatMap(_._2) + return compactLogs(logs, Seq.empty).toArray + } catch { + case e: IOException => + // Another process using `FileStreamSink` may delete the batch files when + // `StreamFileCatalog` are reading. However, it only happens when a compaction is + // deleting old files. 
If so, let's try the next compaction batch and we should find it. + // Otherwise, this is a real IO issue and we should throw it. + latestId = nextCompactionBatchId(latestId, compactInterval) + get(latestId).getOrElse { + throw e + } + } + } else { + return Array.empty + } + } + Array.empty + } + + /** + * Since all logs before `compactionBatchId` are compacted and written into the + * `compactionBatchId` log file, they can be removed. However, due to the eventual consistency of + * S3, the compaction file may not be seen by other processes at once. So we only delete files + * created `fileCleanupDelayMs` milliseconds ago. + */ + private def deleteExpiredLog(compactionBatchId: Long): Unit = { + val expiredTime = System.currentTimeMillis() - fileCleanupDelayMs + fileManager.list(metadataPath, new PathFilter { + override def accept(path: Path): Boolean = { + try { + val batchId = getBatchIdFromFileName(path.getName) + batchId < compactionBatchId + } catch { + case _: NumberFormatException => + false + } + } + }).foreach { f => + if (f.getModificationTime <= expiredTime) { + fileManager.delete(f.getPath) + } + } + } +} + +object CompactibleFileStreamLog { + val VERSION = "v1" + val COMPACT_FILE_SUFFIX = ".compact" + + def getBatchIdFromFileName(fileName: String): Long = { + fileName.stripSuffix(COMPACT_FILE_SUFFIX).toLong + } + + /** + * Returns if this is a compaction batch. FileStreamSinkLog will compact old logs every + * `compactInterval` commits. + * + * E.g., if `compactInterval` is 3, then 2, 5, 8, ... are all compaction batches. + */ + def isCompactionBatch(batchId: Long, compactInterval: Int): Boolean = { + (batchId + 1) % compactInterval == 0 + } + + /** + * Returns all valid batches before the specified `compactionBatchId`. They contain all logs we + * need to do a new compaction. + * + * E.g., if `compactInterval` is 3 and `compactionBatchId` is 5, this method should returns + * `Seq(2, 3, 4)` (Note: it includes the previous compaction batch 2). + */ + def getValidBatchesBeforeCompactionBatch( + compactionBatchId: Long, + compactInterval: Int): Seq[Long] = { + assert(isCompactionBatch(compactionBatchId, compactInterval), + s"$compactionBatchId is not a compaction batch") + (math.max(0, compactionBatchId - compactInterval)) until compactionBatchId + } + + /** + * Returns all necessary logs before `batchId` (inclusive). If `batchId` is a compaction, just + * return itself. Otherwise, it will find the previous compaction batch and return all batches + * between it and `batchId`. + */ + def getAllValidBatches(batchId: Long, compactInterval: Long): Seq[Long] = { + assert(batchId >= 0) + val start = math.max(0, (batchId + 1) / compactInterval * compactInterval - 1) + start to batchId + } + + /** + * Returns the next compaction batch id after `batchId`. 
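+   * E.g., if `compactInterval` is 3 (so 2, 5, 8, ... are compaction batches), this method
+   * returns 5 for `batchId` 2, 3 or 4, and 8 for `batchId` 5, 6 or 7.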
+ */ + def nextCompactionBatchId(batchId: Long, compactInterval: Long): Long = { + (batchId + compactInterval + 1) / compactInterval * compactInterval - 1 + } +} From fb5a72c72906eb1342d0bb7866a409b5e6283888 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Mon, 12 Sep 2016 15:53:31 +0800 Subject: [PATCH 05/10] Remove white space --- .../apache/spark/sql/execution/streaming/FileStreamSource.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 9dafc8b6554f..ea43d86cc994 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -178,7 +178,6 @@ object FileStreamSource { // Action when `FileEntry` is compacted. val COMPACT_ACTION = "compact" - case class FileEntry(path: String, timestamp: Timestamp, action: String = ADD_ACTION) extends Serializable From bbf766308a3dca0266991179c0f8d378223664b0 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Wed, 14 Sep 2016 20:26:11 +0800 Subject: [PATCH 06/10] Address the comments --- .../streaming/CompactibleFileStreamLog.scala | 38 +++-- .../execution/streaming/FileStreamSink.scala | 3 +- .../streaming/FileStreamSinkLog.scala | 33 ++--- .../streaming/FileStreamSource.scala | 59 +------- .../streaming/FileStreamSourceLog.scala | 133 ++++++++++++++++++ .../streaming/MetadataLogFileCatalog.scala | 3 +- .../streaming/FileStreamSinkLogSuite.scala | 24 ++-- .../sql/streaming/FileStreamSourceSuite.scala | 48 +++++-- 8 files changed, 221 insertions(+), 120 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala index 5395cb67447e..314e9c128cb1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.streaming import java.io.IOException import java.nio.charset.StandardCharsets.UTF_8 -import java.util.concurrent.TimeUnit import scala.reflect.ClassTag @@ -29,8 +28,8 @@ import org.apache.spark.sql.SparkSession /** * An abstract class for compactible metadata logs. It will write one log file for each batch. - * The first line of the log file is the version number, and there are multiple JSON lines - * following. + * The first line of the log file is the version number, and there are multiple serialized + * metadata lines following. * * As reading from many small files is usually pretty slow, also too many * small files in one folder will mess the FS, [[CompactibleFileStreamLog]] will @@ -38,6 +37,7 @@ import org.apache.spark.sql.SparkSession * doing a compaction, it will read all old log files and merge them with the new batch. 
*/ abstract class CompactibleFileStreamLog[T: ClassTag]( + metadataLogVersion: String, sparkSession: SparkSession, path: String) extends HDFSMetadataLog[Array[T]](sparkSession, path) { @@ -51,11 +51,11 @@ abstract class CompactibleFileStreamLog[T: ClassTag]( * a live lock may happen if the compaction happens too frequently: one processing keeps deleting * old files while another one keeps retrying. Setting a reasonable cleanup delay could avoid it. */ - protected val fileCleanupDelayMs = TimeUnit.MINUTES.toMillis(10) + protected def fileCleanupDelayMs: Long - protected val isDeletingExpiredLog = true + protected def isDeletingExpiredLog: Boolean - protected val compactInterval = 10 + protected def compactInterval: Int /** * Serialize the data into encoded string. @@ -68,12 +68,9 @@ abstract class CompactibleFileStreamLog[T: ClassTag]( protected def deserializeData(encodedString: String): T /** - * Filter out the unwanted logs, by default it filters out nothing, inherited class could - * override this method to do filtering. + * Filter out the obsolote logs. */ - protected def compactLogs(oldLogs: Seq[T], newLogs: Seq[T]): Seq[T] = { - oldLogs ++ newLogs - } + def compactLogs(logs: Seq[T]): Seq[T] override def batchIdToPath(batchId: Long): Path = { if (isCompactionBatch(batchId, compactInterval)) { @@ -97,7 +94,7 @@ abstract class CompactibleFileStreamLog[T: ClassTag]( } override def serialize(logData: Array[T]): Array[Byte] = { - (VERSION +: logData.map(serializeData)).mkString("\n").getBytes(UTF_8) + (metadataLogVersion +: logData.map(serializeData)).mkString("\n").getBytes(UTF_8) } override def deserialize(bytes: Array[Byte]): Array[T] = { @@ -106,7 +103,7 @@ abstract class CompactibleFileStreamLog[T: ClassTag]( throw new IllegalStateException("Incomplete log file") } val version = lines(0) - if (version != VERSION) { + if (version != metadataLogVersion) { throw new IllegalStateException(s"Unknown log version: ${version}") } lines.slice(1, lines.length).map(deserializeData) @@ -126,8 +123,8 @@ abstract class CompactibleFileStreamLog[T: ClassTag]( */ private def compact(batchId: Long, logs: Array[T]): Boolean = { val validBatches = getValidBatchesBeforeCompactionBatch(batchId, compactInterval) - val allLogs = validBatches.flatMap(batchId => get(batchId)).flatten - if (super.add(batchId, compactLogs(allLogs, logs).toArray)) { + val allLogs = validBatches.flatMap(batchId => super.get(batchId)).flatten ++ logs + if (super.add(batchId, compactLogs(allLogs).toArray)) { if (isDeletingExpiredLog) { deleteExpiredLog(batchId) } @@ -148,18 +145,18 @@ abstract class CompactibleFileStreamLog[T: ClassTag]( // race condition. while (true) { if (latestId >= 0) { - val startId = getAllValidBatches(latestId, compactInterval)(0) try { - val logs = super.get(Some(startId), Some(latestId)).flatMap(_._2) - return compactLogs(logs, Seq.empty).toArray + val logs = + getAllValidBatches(latestId, compactInterval).flatMap(id => super.get(id)).flatten + return compactLogs(logs).toArray } catch { case e: IOException => - // Another process using `FileStreamSink` may delete the batch files when + // Another process using `CompactibleFileStreamLog` may delete the batch files when // `StreamFileCatalog` are reading. However, it only happens when a compaction is // deleting old files. If so, let's try the next compaction batch and we should find it. // Otherwise, this is a real IO issue and we should throw it. 
latestId = nextCompactionBatchId(latestId, compactInterval) - get(latestId).getOrElse { + super.get(latestId).getOrElse { throw e } } @@ -197,7 +194,6 @@ abstract class CompactibleFileStreamLog[T: ClassTag]( } object CompactibleFileStreamLog { - val VERSION = "v1" val COMPACT_FILE_SUFFIX = ".compact" def getBatchIdFromFileName(fileName: String): Long = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala index 0f7d95813683..02c5b857ee7f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala @@ -56,7 +56,8 @@ class FileStreamSink( private val basePath = new Path(path) private val logPath = new Path(basePath, FileStreamSink.metadataDir) - private val fileLog = new FileStreamSinkLog(sparkSession, logPath.toUri.toString) + private val fileLog = + new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, logPath.toUri.toString) private val hadoopConf = sparkSession.sessionState.newHadoopConf() private val fs = basePath.getFileSystem(hadoopConf) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala index 08aefcabb3e7..64f2f00484f4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.execution.streaming -import java.io.IOException - import org.apache.hadoop.fs.{FileStatus, Path} import org.json4s.NoTypeHints import org.json4s.jackson.Serialization @@ -78,8 +76,11 @@ object SinkFileStatus { * When the reader uses `allFiles` to list all files, this method only returns the visible files * (drops the deleted files). */ -class FileStreamSinkLog(sparkSession: SparkSession, path: String) - extends CompactibleFileStreamLog[SinkFileStatus](sparkSession, path) { +class FileStreamSinkLog( + metadataLogVersion: String, + sparkSession: SparkSession, + path: String) + extends CompactibleFileStreamLog[SinkFileStatus](metadataLogVersion, sparkSession, path) { private implicit val formats = Serialization.formats(NoTypeHints) @@ -103,22 +104,8 @@ class FileStreamSinkLog(sparkSession: SparkSession, path: String) read[SinkFileStatus](encodedString) } - protected override def compactLogs( - oldLogs: Seq[SinkFileStatus], newLogs: Seq[SinkFileStatus]): Seq[SinkFileStatus] = { - FileStreamSinkLog.compactLogs(oldLogs ++ newLogs) - } -} - -object FileStreamSinkLog { - val DELETE_ACTION = "delete" - val ADD_ACTION = "add" - - /** - * Removes all deleted files from logs. It assumes once one file is deleted, it won't be added to - * the log in future. 
- */ - def compactLogs(logs: Seq[SinkFileStatus]): Seq[SinkFileStatus] = { - val deletedFiles = logs.filter(_.action == DELETE_ACTION).map(_.path).toSet + override def compactLogs(logs: Seq[SinkFileStatus]): Seq[SinkFileStatus] = { + val deletedFiles = logs.filter(_.action == FileStreamSinkLog.DELETE_ACTION).map(_.path).toSet if (deletedFiles.isEmpty) { logs } else { @@ -126,3 +113,9 @@ object FileStreamSinkLog { } } } + +object FileStreamSinkLog { + val VERSION = "v1" + val DELETE_ACTION = "delete" + val ADD_ACTION = "add" +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index ea43d86cc994..0dc08b1467b1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -20,14 +20,11 @@ package org.apache.spark.sql.execution.streaming import scala.collection.JavaConverters._ import org.apache.hadoop.fs.Path -import org.json4s.NoTypeHints -import org.json4s.jackson.Serialization import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} import org.apache.spark.sql.execution.datasources.{DataSource, ListingFileCatalog, LogicalRelation} -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StructType /** @@ -50,7 +47,8 @@ class FileStreamSource( fs.makeQualified(new Path(path)) // can contains glob patterns } - private val metadataLog = new FileStreamSourceLog(sparkSession, metadataPath) + private val metadataLog = + new FileStreamSourceLog(FileStreamSourceLog.VERSION, sparkSession, metadataPath) private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L) /** Maximum number of new files to be considered in each batch */ @@ -97,7 +95,7 @@ class FileStreamSource( if (batchFiles.nonEmpty) { maxBatchId += 1 - metadataLog.add(maxBatchId, batchFiles.toArray) + metadataLog.add(maxBatchId, batchFiles.map(_.copy(batchId = maxBatchId)).toArray) logInfo(s"Max batch id increased to $maxBatchId with ${batchFiles.size} new files") } @@ -173,12 +171,9 @@ object FileStreamSource { /** Timestamp for file modification time, in ms since January 1, 1970 UTC. */ type Timestamp = Long - // Default action when `FileEntry` is persisted into log. - val ADD_ACTION = "add" - // Action when `FileEntry` is compacted. 
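To make the sink-side compactLogs above concrete: the else branch is elided by the hunk boundary, so the filtering step is reconstructed here from the accompanying test, which expects both the add and the delete record of a deleted path to disappear. A stripped-down model using a simplified status type rather than SinkFileStatus:

object SinkCompactionSketch {
  val ADD_ACTION = "add"
  val DELETE_ACTION = "delete"

  case class Status(path: String, action: String)

  // Drop every record whose path was deleted at some point; assumes a deleted
  // file is never added back, as the removed scaladoc also noted.
  def compact(logs: Seq[Status]): Seq[Status] = {
    val deletedFiles = logs.filter(_.action == DELETE_ACTION).map(_.path).toSet
    if (deletedFiles.isEmpty) logs else logs.filter(s => !deletedFiles.contains(s.path))
  }

  def main(args: Array[String]): Unit = {
    val logs = Seq(
      Status("/a/b/x", ADD_ACTION),
      Status("/a/b/z", ADD_ACTION),
      Status("/a/b/z", DELETE_ACTION))
    assert(compact(logs) == Seq(Status("/a/b/x", ADD_ACTION)))
  }
}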
- val COMPACT_ACTION = "compact" + val NOT_SET = -1L - case class FileEntry(path: String, timestamp: Timestamp, action: String = ADD_ACTION) + case class FileEntry(path: String, timestamp: Timestamp, batchId: Long = NOT_SET) extends Serializable /** @@ -238,48 +233,4 @@ object FileStreamSource { map.entrySet().asScala.map(entry => FileEntry(entry.getKey, entry.getValue)).toSeq } } - - class FileStreamSourceLog(sparkSession: SparkSession, path: String) - extends CompactibleFileStreamLog[FileEntry](sparkSession, path) { - - // Configurations about metadata compaction - protected override val compactInterval = - sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL) - require(compactInterval > 0, - s"Please set ${SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key} (was $compactInterval) to a " + - s"positive value.") - - protected override val fileCleanupDelayMs = - sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY) - - protected override val isDeletingExpiredLog = - sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_DELETION) - - private implicit val formats = Serialization.formats(NoTypeHints) - - protected override def serializeData(data: FileEntry): String = { - Serialization.write(data) - } - - def deserializeData(encodedString: String): FileEntry = { - Serialization.read[FileEntry](encodedString) - } - - protected override def compactLogs( - oldLogs: Seq[FileEntry], newLogs: Seq[FileEntry]): Seq[FileEntry] = { - // Change the action of old file entry into COMPACT, so when fetching these out, they will - // be filtered out to avoid processing again. - oldLogs.map(e => FileEntry(e.path, e.timestamp, COMPACT_ACTION)) ++ newLogs - } - - override def get( - startId: Option[Long], endId: Option[Long]): Array[(Long, Array[FileEntry])] = { - super.get(startId, endId).map { case (id, entries) => - // Keep only the file entries in which the action is ADD, this will keep the consistency - // while retrieving again after compaction. - val addedEntries = entries.filter(_.action == ADD_ACTION) - (id, addedEntries) - } - } - } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala new file mode 100644 index 000000000000..cf55e7165b17 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming + +import scala.collection.mutable + +import org.json4s.NoTypeHints +import org.json4s.jackson.Serialization + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.execution.streaming.FileStreamSource.FileEntry +import org.apache.spark.sql.internal.SQLConf + +class FileStreamSourceLog( + metadataLogVersion: String, + sparkSession: SparkSession, + path: String) + extends CompactibleFileStreamLog[FileEntry](metadataLogVersion, sparkSession, path) { + + import CompactibleFileStreamLog._ + + // Configurations about metadata compaction + protected override val compactInterval = + sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL) + require(compactInterval > 0, + s"Please set ${SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key} (was $compactInterval) to a " + + s"positive value.") + + protected override val fileCleanupDelayMs = + sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY) + + protected override val isDeletingExpiredLog = + sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_DELETION) + + private implicit val formats = Serialization.formats(NoTypeHints) + + // A fixed size log cache to cache the file entries belong to the compaction batch. It is used + // to avoid scanning the compacted log file to retrieve it's own batch data. + private val cacheSize = compactInterval + private val fileEntryCache = new mutable.LinkedHashMap[Long, Array[FileEntry]] + + private def updateCache(batchId: Long, logs: Array[FileEntry]): Unit = { + if (fileEntryCache.size >= cacheSize) { + fileEntryCache.drop(1) + } + + fileEntryCache.put(batchId, logs) + } + + protected override def serializeData(data: FileEntry): String = { + Serialization.write(data) + } + + protected override def deserializeData(encodedString: String): FileEntry = { + Serialization.read[FileEntry](encodedString) + } + + def compactLogs(logs: Seq[FileEntry]): Seq[FileEntry] = { + logs + } + + override def add(batchId: Long, logs: Array[FileEntry]): Boolean = { + if (super.add(batchId, logs) && isCompactionBatch(batchId, compactInterval)) { + updateCache(batchId, logs) + true + } else if (!isCompactionBatch(batchId, compactInterval)) { + true + } else { + false + } + } + + override def get(startId: Option[Long], endId: Option[Long]): Array[(Long, Array[FileEntry])] = { + val startBatchId = startId.getOrElse(0L) + val endBatchId = getLatest().map(_._1).getOrElse(0L) + + val (existedBatches, removedBatches) = (startBatchId to endBatchId).map { id => + if (isCompactionBatch(id, compactInterval) && fileEntryCache.contains(id)) { + (id, Some(fileEntryCache(id))) + } else { + val logs = super.get(id).map(_.filter(_.batchId == id)) + (id, logs) + } + }.partition(_._2.isDefined) + + // The below code may only be happened when original metadata log file has been removed, so we + // have to get the batch from latest compacted log file. This is quite time-consuming and may + // not be happened in the current FileStreamSource code path, since we only fetch the + // latest metadata log file. 
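The batchId field added to FileEntry is what makes the per-batch filtering above possible: a compaction batch accumulates the entries of every earlier batch, and a single batch can be pulled back out by matching on the id each entry was stamped with when the source called metadataLog.add. A tiny, self-contained model of that lookup (the Entry class and paths are illustrative only):

object BatchFilterSketch {
  case class Entry(path: String, batchId: Long)

  def main(args: Array[String]): Unit = {
    // What a compacted log file ends up containing: entries stamped with the
    // batch they were originally added in, accumulated across earlier batches.
    val compactedBatch = Array(
      Entry("/in/f0", batchId = 0L),
      Entry("/in/f1", batchId = 1L),
      Entry("/in/f2a", batchId = 2L),
      Entry("/in/f2b", batchId = 2L))

    // Recover batch 2 alone, the same way FileStreamSourceLog.get filters
    // super.get(id) on _.batchId == id.
    val batch2 = compactedBatch.filter(_.batchId == 2L)
    assert(batch2.map(_.path).toSeq == Seq("/in/f2a", "/in/f2b"))
  }
}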
+ val searchKeys = removedBatches.map(_._1) + val retrievedBatches = if (searchKeys.nonEmpty) { + logWarning(s"Get batches from removed files, this is unexpected in the current code path!!!") + val latestBatchId = getLatest().map(_._1).getOrElse(-1L) + if (latestBatchId < 0) { + Map.empty[Long, Option[Array[FileEntry]]] + } else { + val latestCompactedBatchId = getAllValidBatches(latestBatchId, compactInterval)(0) + val allLogs = new mutable.HashMap[Long, mutable.ArrayBuffer[FileEntry]] + + super.get(latestCompactedBatchId).foreach { entries => + entries.foreach { e => + allLogs.put(e.batchId, allLogs.getOrElse(e.batchId, mutable.ArrayBuffer()) += e) + } + } + + searchKeys.map(id => id -> allLogs.get(id).map(_.toArray)).filter(_._2.isDefined).toMap + } + } else { + Map.empty[Long, Option[Array[FileEntry]]] + } + + (existedBatches ++ retrievedBatches).map(i => i._1 -> i._2.get).toArray.sortBy(_._1) + } +} + +object FileStreamSourceLog { + val VERSION = "v1" +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala index 20ade12e3796..a32c4671e347 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala @@ -34,7 +34,8 @@ class MetadataLogFileCatalog(sparkSession: SparkSession, path: Path) private val metadataDirectory = new Path(path, FileStreamSink.metadataDir) logInfo(s"Reading streaming file log from $metadataDirectory") - private val metadataLog = new FileStreamSinkLog(sparkSession, metadataDirectory.toUri.toString) + private val metadataLog = + new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, metadataDirectory.toUri.toString) private val allFilesFromLog = metadataLog.allFiles().map(_.toFileStatus).filterNot(_.isDirectory) private var cachedPartitionSpec: PartitionSpec = _ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala index bce75d6e938c..41a8cc2400df 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala @@ -84,17 +84,19 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext { } test("compactLogs") { - val logs = Seq( - newFakeSinkFileStatus("/a/b/x", FileStreamSinkLog.ADD_ACTION), - newFakeSinkFileStatus("/a/b/y", FileStreamSinkLog.ADD_ACTION), - newFakeSinkFileStatus("/a/b/z", FileStreamSinkLog.ADD_ACTION)) - assert(logs === compactLogs(logs)) + withFileStreamSinkLog { sinkLog => + val logs = Seq( + newFakeSinkFileStatus("/a/b/x", FileStreamSinkLog.ADD_ACTION), + newFakeSinkFileStatus("/a/b/y", FileStreamSinkLog.ADD_ACTION), + newFakeSinkFileStatus("/a/b/z", FileStreamSinkLog.ADD_ACTION)) + assert(logs === sinkLog.compactLogs(logs)) - val logs2 = Seq( - newFakeSinkFileStatus("/a/b/m", FileStreamSinkLog.ADD_ACTION), - newFakeSinkFileStatus("/a/b/n", FileStreamSinkLog.ADD_ACTION), - newFakeSinkFileStatus("/a/b/z", FileStreamSinkLog.DELETE_ACTION)) - assert(logs.dropRight(1) ++ logs2.dropRight(1) === compactLogs(logs ++ logs2)) + val logs2 = Seq( + newFakeSinkFileStatus("/a/b/m", FileStreamSinkLog.ADD_ACTION), + newFakeSinkFileStatus("/a/b/n", 
FileStreamSinkLog.ADD_ACTION), + newFakeSinkFileStatus("/a/b/z", FileStreamSinkLog.DELETE_ACTION)) + assert(logs.dropRight(1) ++ logs2.dropRight(1) === sinkLog.compactLogs(logs ++ logs2)) + } } test("serialize") { @@ -264,7 +266,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext { private def withFileStreamSinkLog(f: FileStreamSinkLog => Unit): Unit = { withTempDir { file => - val sinkLog = new FileStreamSinkLog(spark, file.getCanonicalPath) + val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, file.getCanonicalPath) f(sinkLog) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 792069f4b503..61ff1b6577eb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -17,17 +17,14 @@ package org.apache.spark.sql.streaming -import java.io.{File, FilenameFilter} import java.io.File -import org.scalatest.concurrent.Eventually._ import org.scalatest.PrivateMethodTester import org.scalatest.time.SpanSugar._ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.execution.streaming._ -import org.apache.spark.sql.execution.streaming.FileStreamSource.FileStreamSourceLog import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ @@ -821,18 +818,12 @@ class FileStreamSourceSuite extends FileStreamSourceTest { // Assert path name should be ended with compact suffix. assert(path.getName.endsWith(COMPACT_FILE_SUFFIX)) - - // Compact batch should include all entries from start. 
- val entries = metadataLog.get(batchId) - assert(entries.isDefined) - assert(entries.get.length === metadataLog.allFiles().length) - assert(metadataLog.get(None, Some(batchId)).flatMap(_._2).length === entries.get.length) } - assert(metadataLog.allFiles().length === - metadataLog.get(None, Some(batchId)).flatMap(_._2).length) + assert(metadataLog.allFiles().sortBy(_.batchId) === + metadataLog.get(None, Some(batchId)).flatMap(_._2).sortBy(_.batchId)) - metadataLog.get(None, Some(batchId)).flatMap(_._2).toSet.size === expectedBatches + metadataLog.get(None, Some(batchId)).flatMap(_._2).length === expectedBatches } withTempDirs { case (src, tmp) => @@ -865,6 +856,39 @@ class FileStreamSourceSuite extends FileStreamSourceTest { } } } + + test("get arbitrary batch from FileStreamSource") { + withTempDirs { case (src, tmp) => + withSQLConf( + SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2", + // Force deleting the old logs + SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "1" + ) { + val fileStream = createFileStream("text", src.getCanonicalPath) + val filtered = fileStream.filter($"value" contains "keep") + + testStream(filtered)( + AddTextFileData("keep1", src, tmp), + CheckAnswer("keep1"), + AddTextFileData("keep2", src, tmp), + CheckAnswer("keep1", "keep2"), + AddTextFileData("keep3", src, tmp), + CheckAnswer("keep1", "keep2", "keep3"), + AssertOnQuery("check getBatch") { execution: StreamExecution => + val _sources = PrivateMethod[Seq[Source]]('sources) + val fileSource = + (execution invokePrivate _sources()).head.asInstanceOf[FileStreamSource] + assert(fileSource.getBatch(None, LongOffset(2)).as[String].collect() === + List("keep1", "keep2", "keep3")) + assert(fileSource.getBatch(Some(LongOffset(0)), LongOffset(2)).as[String].collect() === + List("keep2", "keep3")) + assert(fileSource.getBatch(Some(LongOffset(1)), LongOffset(2)).as[String].collect() === + List("keep3")) + } + ) + } + } + } } class FileStreamSourceStressTestSuite extends FileStreamSourceTest { From 56a00ae5ee97e9eab3e923b5904a1cf5a5a7219c Mon Sep 17 00:00:00 2001 From: jerryshao Date: Wed, 14 Sep 2016 21:27:09 +0800 Subject: [PATCH 07/10] readd the test --- .../apache/spark/sql/streaming/FileStreamSourceSuite.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 61ff1b6577eb..25ce70e1167b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -818,6 +818,12 @@ class FileStreamSourceSuite extends FileStreamSourceTest { // Assert path name should be ended with compact suffix. assert(path.getName.endsWith(COMPACT_FILE_SUFFIX)) + + // Compacted batch should include all entries from start. 
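The assertions in the new getBatch test pin down the offset convention the source keeps after compaction: the start offset is exclusive and the end offset inclusive, so asking for (Some(LongOffset(0)), LongOffset(2)) returns only the files recorded in batches 1 and 2. A hedged sketch of that range arithmetic (the helper name below is made up for illustration and is not an API in the patch):

object OffsetRangeSketch {
  // Start offset exclusive, end offset inclusive; a missing start means "from batch 0".
  def selectedBatches(start: Option[Long], end: Long): Seq[Long] =
    (start.map(_ + 1L).getOrElse(0L) to end).toSeq

  def main(args: Array[String]): Unit = {
    assert(selectedBatches(None, 2L) == Seq(0L, 1L, 2L)) // keep1, keep2, keep3
    assert(selectedBatches(Some(0L), 2L) == Seq(1L, 2L)) // keep2, keep3
    assert(selectedBatches(Some(1L), 2L) == Seq(2L))     // keep3
  }
}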
+ val entries = metadataLog.get(batchId) + assert(entries.isDefined) + assert(entries.get.length === metadataLog.allFiles().length) + assert(metadataLog.get(None, Some(batchId)).flatMap(_._2).length === entries.get.length) } assert(metadataLog.allFiles().sortBy(_.batchId) === From be1abfa0e902fca3ed945bfbb6e0573909d55e2b Mon Sep 17 00:00:00 2001 From: jerryshao Date: Sun, 18 Sep 2016 10:16:16 +0800 Subject: [PATCH 08/10] Address the comments --- .../streaming/CompactibleFileStreamLog.scala | 2 +- .../streaming/FileStreamSourceLog.scala | 23 +++++++++---------- 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala index 314e9c128cb1..027b5bbfab8d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala @@ -68,7 +68,7 @@ abstract class CompactibleFileStreamLog[T: ClassTag]( protected def deserializeData(encodedString: String): T /** - * Filter out the obsolote logs. + * Filter out the obsolete logs. */ def compactLogs(logs: Seq[T]): Seq[T] diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala index cf55e7165b17..f9578ee59b68 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala @@ -17,6 +17,9 @@ package org.apache.spark.sql.execution.streaming +import java.util.{LinkedHashMap => JLinkedHashMap} +import java.util.Map.Entry + import scala.collection.mutable import org.json4s.NoTypeHints @@ -49,17 +52,13 @@ class FileStreamSourceLog( private implicit val formats = Serialization.formats(NoTypeHints) - // A fixed size log cache to cache the file entries belong to the compaction batch. It is used - // to avoid scanning the compacted log file to retrieve it's own batch data. + // A fixed size log entry cache to cache the file entries belong to the compaction batch. It is + // used to avoid scanning the compacted log file to retrieve it's own batch data. 
private val cacheSize = compactInterval - private val fileEntryCache = new mutable.LinkedHashMap[Long, Array[FileEntry]] - - private def updateCache(batchId: Long, logs: Array[FileEntry]): Unit = { - if (fileEntryCache.size >= cacheSize) { - fileEntryCache.drop(1) + private val fileEntryCache = new JLinkedHashMap[Long, Array[FileEntry]] { + override def removeEldestEntry(eldest: Entry[Long, Array[FileEntry]]): Boolean = { + size() > cacheSize } - - fileEntryCache.put(batchId, logs) } protected override def serializeData(data: FileEntry): String = { @@ -76,7 +75,7 @@ class FileStreamSourceLog( override def add(batchId: Long, logs: Array[FileEntry]): Boolean = { if (super.add(batchId, logs) && isCompactionBatch(batchId, compactInterval)) { - updateCache(batchId, logs) + fileEntryCache.put(batchId, logs) true } else if (!isCompactionBatch(batchId, compactInterval)) { true @@ -90,8 +89,8 @@ class FileStreamSourceLog( val endBatchId = getLatest().map(_._1).getOrElse(0L) val (existedBatches, removedBatches) = (startBatchId to endBatchId).map { id => - if (isCompactionBatch(id, compactInterval) && fileEntryCache.contains(id)) { - (id, Some(fileEntryCache(id))) + if (isCompactionBatch(id, compactInterval) && fileEntryCache.containsKey(id)) { + (id, Some(fileEntryCache.get(id))) } else { val logs = super.get(id).map(_.filter(_.batchId == id)) (id, logs) From bddbc7f8e1563000ea4a9dcad07c92e34c24199f Mon Sep 17 00:00:00 2001 From: jerryshao Date: Tue, 20 Sep 2016 11:31:18 +0800 Subject: [PATCH 09/10] Address comments --- .../sql/execution/streaming/FileStreamSourceLog.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala index f9578ee59b68..8103309aff2a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala @@ -74,10 +74,10 @@ class FileStreamSourceLog( } override def add(batchId: Long, logs: Array[FileEntry]): Boolean = { - if (super.add(batchId, logs) && isCompactionBatch(batchId, compactInterval)) { - fileEntryCache.put(batchId, logs) - true - } else if (!isCompactionBatch(batchId, compactInterval)) { + if (super.add(batchId, logs)) { + if (isCompactionBatch(batchId, compactInterval)) { + fileEntryCache.put(batchId, logs) + } true } else { false From 84d3d27490556dc1de4e4bce3b6b19a75691f52e Mon Sep 17 00:00:00 2001 From: jerryshao Date: Tue, 20 Sep 2016 11:51:27 +0800 Subject: [PATCH 10/10] Fix test compile issue --- .../org/apache/spark/sql/streaming/FileStreamSourceSuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 25ce70e1167b..206ec1cd0af2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -890,6 +890,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { List("keep2", "keep3")) assert(fileSource.getBatch(Some(LongOffset(1)), LongOffset(2)).as[String].collect() === List("keep3")) + true } ) }
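A closing note on the cache change in the last two functional commits: scala.collection.mutable.LinkedHashMap has no size bound, and the earlier updateCache helper called drop(1), which returns a new collection and never evicts anything from the cache itself. Overriding removeEldestEntry on java.util.LinkedHashMap instead gives an insertion-ordered map that evicts automatically once it grows past the limit. A standalone sketch of that idiom (cache size and values are arbitrary examples):

import java.util.{LinkedHashMap => JLinkedHashMap}
import java.util.Map.Entry

object BoundedCacheSketch {
  def main(args: Array[String]): Unit = {
    val cacheSize = 2
    val cache = new JLinkedHashMap[Long, String] {
      // Called by put(); returning true drops the eldest entry once the map
      // exceeds cacheSize, which is the behaviour fileEntryCache relies on.
      override def removeEldestEntry(eldest: Entry[Long, String]): Boolean =
        size() > cacheSize
    }
    cache.put(0L, "batch-0")
    cache.put(1L, "batch-1")
    cache.put(2L, "batch-2") // evicts the entry for batch 0
    assert(!cache.containsKey(0L) && cache.containsKey(1L) && cache.containsKey(2L))
  }
}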