Commit 2e31e2c

[SPARK-34503][CORE] Use zstd for spark.eventLog.compression.codec by default
### What changes were proposed in this pull request?

Apache Spark 3.0 introduced the `spark.eventLog.compression.codec` configuration. For Apache Spark 3.2, this PR aims to set `zstd` as the default value of `spark.eventLog.compression.codec`. This only affects newly created log files.

### Why are the changes needed?

The main purpose of event logs is archiving. Many logs are generated and occupy storage, but most of them are never accessed by users.

**1. Save storage resources (and money)**

In general, ZSTD output is much smaller than LZ4 output. For example, for a TPCDS (Scale 200) log, ZSTD generates a log file about 3 times smaller than LZ4.

| CODEC | SIZE (bytes) |
|-------|--------------|
| LZ4   | 184001434 |
| ZSTD  | 64522396 |

And the plain (uncompressed) file is 17.6 times bigger than the ZSTD file.

```
-rw-r--r--  1 dongjoon  staff  1135464691 Feb 21 22:31 spark-a1843ead29834f46b1125a03eca32679
-rw-r--r--  1 dongjoon  staff    64522396 Feb 21 22:31 spark-a1843ead29834f46b1125a03eca32679.zstd
```

**2. Better usability**

We cannot decompress Spark-generated LZ4 event log files with the command-line tool, while we can for ZSTD event log files. Spark's LZ4 event log files are inconvenient for users who want to uncompress and access them directly.

```
$ lz4 -d spark-d3deba027bd34435ba849e14fc2c42ef.lz4
Decoding file spark-d3deba027bd34435ba849e14fc2c42ef
Error 44 : Unrecognized header : file cannot be decoded
```

```
$ zstd -d spark-a1843ead29834f46b1125a03eca32679.zstd
spark-a1843ead29834f46b1125a03eca32679.zstd: 1135464691 bytes
```

**3. Speed**

The following results were collected by running [lzbench](https://github.com/inikep/lzbench) on the above Spark event log. Note that:

- This is not a direct comparison of Spark's compression/decompression codecs.
- `lzbench` is an in-memory benchmark, so it doesn't show the benefit of reduced network traffic due to ZSTD's smaller output.

Here,

- To get the ZSTD 1.4.8-1 result, the `lzbench` `master` branch is used because Spark uses ZSTD 1.4.8.
- To get the LZ4 1.7.5 result, the `lzbench` `v1.7` branch is used because Spark uses LZ4 1.7.1.

```
Compressor name      Compress. Decompress.  Compr. size  Ratio Filename
memcpy               7393 MB/s  7166 MB/s   1135464691 100.00 spark-a1843ead29834f46b1125a03eca32679
zstd 1.4.8 -1        1344 MB/s  3351 MB/s     56665767   4.99 spark-a1843ead29834f46b1125a03eca32679
lz4 1.7.5            1385 MB/s  4782 MB/s    127662168  11.24 spark-a1843ead29834f46b1125a03eca32679
```

### Does this PR introduce _any_ user-facing change?

- No for apps that don't use `spark.eventLog.compress`, because `spark.eventLog.compress` is disabled by default.
- No for apps setting `spark.eventLog.compression.codec` explicitly, because this is only a change of the default value.
- Yes for apps using `spark.eventLog.compress` without setting `spark.eventLog.compression.codec`. In this case, the `spark.io.compression.codec` value, whose default is `lz4`, was used previously. For this reason, the JIRA issue SPARK-34503 is labeled `releasenotes`.

### How was this patch tested?

Pass the updated UT.

Closes #31618 from dongjoon-hyun/SPARK-34503.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
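The behavioral change described above can be sketched with a small, self-contained model. This is hypothetical code over a plain `Map`, not Spark's actual `ConfigEntry`/`fallbackConf` API: before this commit, an unset event-log codec fell back to `spark.io.compression.codec` (default `lz4`); after it, the entry carries its own default of `zstd`.

```scala
// Hypothetical model of the config-resolution change (not Spark's real API).
object EventLogCodecDemo {
  // Spark 3.0/3.1 behavior: fall back to spark.io.compression.codec
  // (whose default is lz4) when spark.eventLog.compression.codec is unset.
  def resolveBefore(conf: Map[String, String]): String =
    conf.getOrElse("spark.eventLog.compression.codec",
      conf.getOrElse("spark.io.compression.codec", "lz4"))

  // Spark 3.2 behavior after this commit: a plain default of zstd,
  // with no fallback to spark.io.compression.codec.
  def resolveAfter(conf: Map[String, String]): String =
    conf.getOrElse("spark.eventLog.compression.codec", "zstd")

  def main(args: Array[String]): Unit = {
    val unset = Map.empty[String, String]
    println(resolveBefore(unset)) // lz4, via the spark.io.compression.codec fallback
    println(resolveAfter(unset))  // zstd, the new default

    // An explicit setting wins under both schemes, matching the
    // "no user-facing change" cases in the PR description.
    val explicit = Map("spark.eventLog.compression.codec" -> "snappy")
    println(resolveBefore(explicit)) // snappy
    println(resolveAfter(explicit))  // snappy
  }
}
```

Note how `resolveBefore` also changes when only `spark.io.compression.codec` is set, which is exactly the "Yes, user-facing change" case called out above.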
1 parent 7f27d33 commit 2e31e2c

File tree

4 files changed (+9, −13 lines)


core/src/main/scala/org/apache/spark/internal/config/package.scala (3 additions, 2 deletions)

```diff
@@ -1726,9 +1726,10 @@ package object config {
     ConfigBuilder("spark.eventLog.compression.codec")
       .doc("The codec used to compress event log. By default, Spark provides four codecs: " +
         "lz4, lzf, snappy, and zstd. You can also use fully qualified class names to specify " +
-        "the codec. If this is not given, spark.io.compression.codec will be used.")
+        "the codec.")
       .version("3.0.0")
-      .fallbackConf(IO_COMPRESSION_CODEC)
+      .stringConf
+      .createWithDefault("zstd")

   private[spark] val BUFFER_SIZE =
     ConfigBuilder("spark.buffer.size")
```

core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala (2 additions, 8 deletions)

```diff
@@ -99,22 +99,16 @@ abstract class EventLogFileWritersSuite extends SparkFunSuite with LocalSparkCon
     }
   }

-  test("spark.eventLog.compression.codec overrides spark.io.compression.codec") {
+  test("Use the defalut value of spark.eventLog.compression.codec") {
     val conf = new SparkConf
     conf.set(EVENT_LOG_COMPRESS, true)
     val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)

     val appId = "test"
     val appAttemptId = None

-    // The default value is `spark.io.compression.codec`.
     val writer = createWriter(appId, appAttemptId, testDirPath.toUri, conf, hadoopConf)
-    assert(writer.compressionCodecName.contains("lz4"))
-
-    // `spark.eventLog.compression.codec` overrides `spark.io.compression.codec`.
-    conf.set(EVENT_LOG_COMPRESSION_CODEC, "zstd")
-    val writer2 = createWriter(appId, appAttemptId, testDirPath.toUri, conf, hadoopConf)
-    assert(writer2.compressionCodecName.contains("zstd"))
+    assert(writer.compressionCodecName === EVENT_LOG_COMPRESSION_CODEC.defaultValue)
   }

   protected def readLinesFromEventLogFile(log: Path, fs: FileSystem): List[String] = {
```

docs/configuration.md (2 additions, 3 deletions)

```diff
@@ -1040,10 +1040,9 @@ Apart from these, the following properties are also available, and may be useful
 </tr>
 <tr>
   <td><code>spark.eventLog.compression.codec</code></td>
-  <td></td>
+  <td>zstd</td>
   <td>
-    The codec to compress logged events. If this is not given,
-    <code>spark.io.compression.codec</code> will be used.
+    The codec to compress logged events.
   </td>
   <td>3.0.0</td>
 </tr>
```

docs/core-migration-guide.md (2 additions, 0 deletions)

```diff
@@ -24,6 +24,8 @@ license: |

 ## Upgrading from Core 3.1 to 3.2

+- Since Spark 3.2, `spark.eventLog.compression.codec` is set to `zstd` by default which means Spark will not fallback to use `spark.io.compression.codec` anymore.
+
 - Since Spark 3.2, `spark.storage.replication.proactive` is enabled by default which means Spark tries to replenish in case of the loss of cached RDD block replicas due to executor failures. To restore the behavior before Spark 3.2, you can set `spark.storage.replication.proactive` to `false`.

 ## Upgrading from Core 3.0 to 3.1
```
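A deployment that relied on the removed fallback can restore it by setting `spark.eventLog.compression.codec` to its `spark.io.compression.codec` value explicitly. The following is a minimal sketch of that idea over a plain key-value map (hypothetical helper code, not Spark's `SparkConf` API):

```scala
// Hypothetical helper (not part of Spark): re-create the pre-3.2 fallback by
// copying spark.io.compression.codec into spark.eventLog.compression.codec
// whenever the latter is unset.
object RestoreOldEventLogCodec {
  def restoreFallback(conf: Map[String, String]): Map[String, String] =
    if (conf.contains("spark.eventLog.compression.codec")) {
      conf // an explicit event-log codec always wins; leave it alone
    } else {
      // lz4 is the documented default of spark.io.compression.codec
      val ioCodec = conf.getOrElse("spark.io.compression.codec", "lz4")
      conf + ("spark.eventLog.compression.codec" -> ioCodec)
    }

  def main(args: Array[String]): Unit = {
    val conf = Map("spark.io.compression.codec" -> "snappy")
    // With the fallback restored, the event log follows the I/O codec again.
    println(restoreFallback(conf)("spark.eventLog.compression.codec")) // snappy
  }
}
```

In a real deployment the same effect comes from setting both keys in `spark-defaults.conf`; the helper only illustrates the resolution order being reinstated.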
