diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
index 905bce4d614e..10bcfe664980 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
@@ -28,6 +28,7 @@ import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.{SizeEstimator, Utils}
 
 /**
  * An abstract class for compactible metadata logs. It will write one log file for each batch.
@@ -177,16 +178,35 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
    * corresponding `batchId` file. It will delete expired files as well if enabled.
    */
   private def compact(batchId: Long, logs: Array[T]): Boolean = {
-    val validBatches = getValidBatchesBeforeCompactionBatch(batchId, compactInterval)
-    val allLogs = validBatches.flatMap { id =>
-      super.get(id).getOrElse {
-        throw new IllegalStateException(
-          s"${batchIdToPath(id)} doesn't exist when compacting batch $batchId " +
-            s"(compactInterval: $compactInterval)")
-      }
-    } ++ logs
+    val (allLogs, loadElapsedMs) = Utils.timeTakenMs {
+      val validBatches = getValidBatchesBeforeCompactionBatch(batchId, compactInterval)
+      validBatches.flatMap { id =>
+        super.get(id).getOrElse {
+          throw new IllegalStateException(
+            s"${batchIdToPath(id)} doesn't exist when compacting batch $batchId " +
+              s"(compactInterval: $compactInterval)")
+        }
+      } ++ logs
+    }
+    val compactedLogs = compactLogs(allLogs)
+
     // Return false as there is another writer.
-    super.add(batchId, compactLogs(allLogs).toArray)
+    val (writeSucceed, writeElapsedMs) = Utils.timeTakenMs {
+      super.add(batchId, compactedLogs.toArray)
+    }
+
+    val elapsedMs = loadElapsedMs + writeElapsedMs
+    if (elapsedMs >= COMPACT_LATENCY_WARN_THRESHOLD_MS) {
+      logWarning(s"Compacting took $elapsedMs ms (load: $loadElapsedMs ms," +
+        s" write: $writeElapsedMs ms) for compact batch $batchId")
+      logWarning(s"Loaded ${allLogs.size} entries (estimated ${SizeEstimator.estimate(allLogs)} " +
+        s"bytes in memory), and wrote ${compactedLogs.size} entries for compact batch $batchId")
+    } else {
+      logDebug(s"Compacting took $elapsedMs ms (load: $loadElapsedMs ms," +
+        s" write: $writeElapsedMs ms) for compact batch $batchId")
+    }
+
+    writeSucceed
   }
 
   /**
@@ -268,6 +288,7 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
 
 object CompactibleFileStreamLog {
   val COMPACT_FILE_SUFFIX = ".compact"
+  val COMPACT_LATENCY_WARN_THRESHOLD_MS = 2000
 
   def getBatchIdFromFileName(fileName: String): Long = {
     fileName.stripSuffix(COMPACT_FILE_SUFFIX).toLong
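
Note for reviewers: the patch leans on two Spark-internal helpers, Utils.timeTakenMs (runs a block and returns its result paired with the elapsed wall-clock milliseconds) and SizeEstimator.estimate (approximates an object's in-memory footprint). Since org.apache.spark.util.Utils is private[spark] and not callable from outside Spark, below is a minimal, self-contained sketch of the same time-then-log-above-threshold pattern. The object name, the hand-rolled timeTakenMs, and the simulated load/write bodies are illustrative stand-ins, not code from this patch.

import java.util.concurrent.TimeUnit

object CompactTimingSketch {
  // Mirrors COMPACT_LATENCY_WARN_THRESHOLD_MS from the patch: below this,
  // the timing line goes to debug instead of warn.
  val CompactLatencyWarnThresholdMs = 2000L

  // Stand-in for Spark's private Utils.timeTakenMs: evaluates `body` once
  // and returns its result together with the elapsed wall-clock millis.
  def timeTakenMs[T](body: => T): (T, Long) = {
    val start = System.nanoTime()
    val result = body
    (result, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start))
  }

  def main(args: Array[String]): Unit = {
    // Simulate the two phases the patch times separately: loading all prior
    // batch log entries, then writing the compacted batch.
    val (allLogs, loadElapsedMs) = timeTakenMs {
      (1 to 100000).map(i => s"entry-$i")
    }
    val (writeSucceeded, writeElapsedMs) = timeTakenMs {
      allLogs.distinct.nonEmpty // placeholder for super.add(batchId, ...)
    }

    // Same warn-vs-debug split as the patch: the log is only noisy when
    // compaction is actually slow.
    val elapsedMs = loadElapsedMs + writeElapsedMs
    val msg = s"Compacting took $elapsedMs ms " +
      s"(load: $loadElapsedMs ms, write: $writeElapsedMs ms)"
    if (elapsedMs >= CompactLatencyWarnThresholdMs) println(s"WARN $msg")
    else println(s"DEBUG $msg")
    println(s"write succeeded: $writeSucceeded")
  }
}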