@@ -28,6 +28,7 @@ import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.{SizeEstimator, Utils}

/**
* An abstract class for compactible metadata logs. It will write one log file for each batch.
@@ -177,16 +178,35 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
* corresponding `batchId` file. It will delete expired files as well if enabled.
*/
private def compact(batchId: Long, logs: Array[T]): Boolean = {
-    val validBatches = getValidBatchesBeforeCompactionBatch(batchId, compactInterval)
-    val allLogs = validBatches.flatMap { id =>
-      super.get(id).getOrElse {
-        throw new IllegalStateException(
-          s"${batchIdToPath(id)} doesn't exist when compacting batch $batchId " +
-            s"(compactInterval: $compactInterval)")
-      }
-    } ++ logs
+    val (allLogs, loadElapsedMs) = Utils.timeTakenMs {
+      val validBatches = getValidBatchesBeforeCompactionBatch(batchId, compactInterval)
+      validBatches.flatMap { id =>
+        super.get(id).getOrElse {
+          throw new IllegalStateException(
+            s"${batchIdToPath(id)} doesn't exist when compacting batch $batchId " +
+              s"(compactInterval: $compactInterval)")
+        }
+      } ++ logs
+    }
+    val compactedLogs = compactLogs(allLogs)

// Return false as there is another writer.
-    super.add(batchId, compactLogs(allLogs).toArray)
+    val (writeSucceed, writeElapsedMs) = Utils.timeTakenMs {
+      super.add(batchId, compactedLogs.toArray)
+    }

+    val elapsedMs = loadElapsedMs + writeElapsedMs
+    if (elapsedMs >= COMPACT_LATENCY_WARN_THRESHOLD_MS) {
+      logWarning(s"Compacting took $elapsedMs ms (load: $loadElapsedMs ms," +
+        s" write: $writeElapsedMs ms) for compact batch $batchId")
+      logWarning(s"Loaded ${allLogs.size} entries (estimated ${SizeEstimator.estimate(allLogs)} " +
+        s"bytes in memory), and wrote ${compactedLogs.size} entries for compact batch $batchId")
+    } else {
+      logDebug(s"Compacting took $elapsedMs ms (load: $loadElapsedMs ms," +
+        s" write: $writeElapsedMs ms) for compact batch $batchId")
+    }
+
+    writeSucceed
  }

Review thread on the two logWarning calls:

Member: nit: seems these two logs could be combined into one?

Contributor (author): It was actually one line, and I split it because I felt it was a bit long as a one-liner; also, the second message is only emitted at WARN level. But if it helps correlate the output, I can combine them. Let's get more voices on this.

Member: Two logs are fine, I guess.
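For context on the helper used above: `Utils.timeTakenMs` evaluates a block and returns its result paired with the elapsed wall-clock time in milliseconds. A minimal self-contained sketch of that pattern (a stand-in with a made-up object name, not Spark's actual source):

import java.util.concurrent.TimeUnit.NANOSECONDS

object TimeTakenSketch {
  // Evaluate `body` once, returning (result, elapsed wall-clock millis).
  def timeTakenMs[T](body: => T): (T, Long) = {
    val start = System.nanoTime()
    val result = body
    (result, math.max(NANOSECONDS.toMillis(System.nanoTime() - start), 0L))
  }

  def main(args: Array[String]): Unit = {
    val (sum, elapsedMs) = timeTakenMs {
      (1L to 10000000L).sum
    }
    println(s"sum=$sum computed in $elapsedMs ms")
  }
}

Measuring the load and write phases separately is what lets the WARN message break the total down into `loadElapsedMs` and `writeElapsedMs`.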

/**
@@ -268,6 +288,7 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](

object CompactibleFileStreamLog {
val COMPACT_FILE_SUFFIX = ".compact"
+  val COMPACT_LATENCY_WARN_THRESHOLD_MS = 2000
Review thread on COMPACT_LATENCY_WARN_THRESHOLD_MS:

Member: Was this value taken from practice? I guess it's fine.

Contributor (author): Yeah, it's a heuristic. I think end users should be notified when a batch spends more than 2 seconds just on compacting metadata; that latency is opaque to them if we don't log it, and they'd be left questioning where the time went.

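The WARN branch above also calls `SizeEstimator.estimate`, Spark's developer-API utility that walks an object graph to approximate its heap footprint. A small usage sketch, assuming spark-core is on the classpath (the entry values are made up):

import org.apache.spark.util.SizeEstimator

object SizeEstimateSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for metadata log entries; real entries are source/sink specific.
    val entries: Array[String] = Array.tabulate(1000)(i => s"hdfs://ns/output/part-$i")
    val estimatedBytes = SizeEstimator.estimate(entries)
    println(s"~$estimatedBytes bytes in memory for ${entries.length} entries")
  }
}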
def getBatchIdFromFileName(fileName: String): Long = {
fileName.stripSuffix(COMPACT_FILE_SUFFIX).toLong
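For reference, the suffix-stripping in `getBatchIdFromFileName` means a batch file and its compacted counterpart resolve to the same batch id. A tiny standalone sketch (hypothetical object name, logic mirroring the one-liner above):

object BatchIdRoundTrip {
  // Mirrors the one-line implementation shown in the diff above.
  def getBatchIdFromFileName(fileName: String): Long =
    fileName.stripSuffix(".compact").toLong

  def main(args: Array[String]): Unit = {
    // Both a plain batch file and its compacted variant map to the same id.
    assert(getBatchIdFromFileName("20") == 20L)
    assert(getBatchIdFromFileName("20.compact") == 20L)
    println("round-trip OK")
  }
}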