@@ -1726,9 +1726,10 @@ package object config {
     ConfigBuilder("spark.eventLog.compression.codec")
       .doc("The codec used to compress event log. By default, Spark provides four codecs: " +
         "lz4, lzf, snappy, and zstd. You can also use fully qualified class names to specify " +
-        "the codec. If this is not given, spark.io.compression.codec will be used.")
+        "the codec.")
       .version("3.0.0")
-      .fallbackConf(IO_COMPRESSION_CODEC)
+      .stringConf
+      .createWithDefault("zstd")

   private[spark] val BUFFER_SIZE =
     ConfigBuilder("spark.buffer.size")
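To make the behavioral change concrete, here is a minimal sketch (not from the PR) contrasting the old fallback resolution with the new standalone default. The string-key `SparkConf` lookups below are illustrative; Spark resolves these entries internally through `ConfigEntry`.

```scala
import org.apache.spark.SparkConf

object EventLogCodecDemo extends App {
  val conf = new SparkConf()
  conf.set("spark.io.compression.codec", "lz4")

  // Old behavior (fallbackConf): an unset spark.eventLog.compression.codec
  // resolved to whatever spark.io.compression.codec was, i.e. "lz4" here.
  val oldResolution = conf.get("spark.eventLog.compression.codec",
    conf.get("spark.io.compression.codec", "lz4"))

  // New behavior (createWithDefault): the entry carries its own default, so
  // the same unset key now resolves to "zstd" regardless of the io codec.
  val newResolution = conf.get("spark.eventLog.compression.codec", "zstd")

  println(s"old: $oldResolution, new: $newResolution")  // old: lz4, new: zstd
}
```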
@@ -99,22 +99,16 @@ abstract class EventLogFileWritersSuite extends SparkFunSuite with LocalSparkContext
     }
   }

-  test("spark.eventLog.compression.codec overrides spark.io.compression.codec") {
+  test("Use the default value of spark.eventLog.compression.codec") {
     val conf = new SparkConf
     conf.set(EVENT_LOG_COMPRESS, true)
     val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)

     val appId = "test"
     val appAttemptId = None

-    // The default value is `spark.io.compression.codec`.
     val writer = createWriter(appId, appAttemptId, testDirPath.toUri, conf, hadoopConf)
-    assert(writer.compressionCodecName.contains("lz4"))
-
-    // `spark.eventLog.compression.codec` overrides `spark.io.compression.codec`.
-    conf.set(EVENT_LOG_COMPRESSION_CODEC, "zstd")
-    val writer2 = createWriter(appId, appAttemptId, testDirPath.toUri, conf, hadoopConf)
-    assert(writer2.compressionCodecName.contains("zstd"))
+    assert(writer.compressionCodecName === EVENT_LOG_COMPRESSION_CODEC.defaultValue)
   }

   protected def readLinesFromEventLogFile(log: Path, fs: FileSystem): List[String] = {
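For readers skimming the new assertion: both `writer.compressionCodecName` and `EVENT_LOG_COMPRESSION_CODEC.defaultValue` are `Option[String]`, so the test compares two Options rather than pinning a literal codec name. A standalone sketch of that comparison, using hypothetical stand-in values:

```scala
object OptionComparisonSketch extends App {
  // Hypothetical stand-ins for writer.compressionCodecName and
  // EVENT_LOG_COMPRESSION_CODEC.defaultValue, both Option[String] in Spark.
  val compressionCodecName: Option[String] = Some("zstd")
  val defaultValue: Option[String] = Some("zstd")

  // ScalaTest's === reduces to == here: two Options are equal when both are
  // Some of equal values. If the default ever changes, the test still passes
  // without editing a hard-coded name like "zstd".
  assert(compressionCodecName == defaultValue)
}
```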
docs/configuration.md (2 additions, 3 deletions)
@@ -1040,10 +1040,9 @@ Apart from these, the following properties are also available, and may be useful
     </tr>
     <tr>
       <td><code>spark.eventLog.compression.codec</code></td>
-      <td></td>
+      <td>zstd</td>
       <td>
-        The codec to compress logged events. If this is not given,
-        <code>spark.io.compression.codec</code> will be used.
+        The codec to compress logged events.
       </td>
       <td>3.0.0</td>
     </tr>

Review thread on the new description line:

tgravescs (Contributor): Sorry for coming in late, I was out last week. We may want to reference what other codecs can be used here. @dongjoon-hyun thoughts?

dongjoon-hyun (Member Author): Thank you for the review, @tgravescs. Sure, I'll make a documentation follow-up.
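As a concrete illustration of what such a follow-up might spell out (a hedged sketch; `com.example.MyCodec` is a hypothetical class name): the property accepts the four bundled short names from the config's doc string, or a fully qualified class name of a custom codec implementation.

```scala
import org.apache.spark.SparkConf

object CodecChoices extends App {
  val conf = new SparkConf()

  // Any of the four bundled short names mentioned in the config's doc string:
  conf.set("spark.eventLog.compression.codec", "zstd") // or "lz4", "lzf", "snappy"

  // Or a fully qualified class name (hypothetical custom codec):
  // conf.set("spark.eventLog.compression.codec", "com.example.MyCodec")
}
```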
docs/core-migration-guide.md (2 additions, 0 deletions)
@@ -24,6 +24,8 @@ license: |

 ## Upgrading from Core 3.1 to 3.2

+- Since Spark 3.2, `spark.eventLog.compression.codec` is set to `zstd` by default, which means Spark no longer falls back to `spark.io.compression.codec`.
+
 - Since Spark 3.2, `spark.storage.replication.proactive` is enabled by default which means Spark tries to replenish in case of the loss of cached RDD block replicas due to executor failures. To restore the behavior before Spark 3.2, you can set `spark.storage.replication.proactive` to `false`.

 ## Upgrading from Core 3.0 to 3.1
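A minimal sketch (not in the migration guide itself) of restoring the pre-3.2 coupling by hand, assuming an application that wants the event log codec to keep tracking the io codec: copy the value across explicitly, since Spark no longer does it.

```scala
import org.apache.spark.SparkConf

object RestorePre32Coupling extends App {
  val conf = new SparkConf()

  // spark.io.compression.codec defaults to lz4; read it (or its explicit
  // setting) and pin the event log codec to the same value.
  val ioCodec = conf.get("spark.io.compression.codec", "lz4")
  conf.set("spark.eventLog.compression.codec", ioCodec)
}
```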