diff --git a/core/benchmarks/ZStandardBenchmark-jdk21-results.txt b/core/benchmarks/ZStandardBenchmark-jdk21-results.txt index 276d1dc5f0a8..60439ee359ab 100644 --- a/core/benchmarks/ZStandardBenchmark-jdk21-results.txt +++ b/core/benchmarks/ZStandardBenchmark-jdk21-results.txt @@ -2,48 +2,81 @@ Benchmark ZStandardCompressionCodec ================================================================================================ -OpenJDK 64-Bit Server VM 21.0.7+6-LTS on Linux 6.11.0-1018-azure +OpenJDK 64-Bit Server VM 21.0.8+9-LTS on Linux 6.11.0-1018-azure AMD EPYC 7763 64-Core Processor Benchmark ZStandardCompressionCodec: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative -------------------------------------------------------------------------------------------------------------------------------------- -Compression 10000 times at level 1 without buffer pool 646 668 20 0.0 64639.2 1.0X -Compression 10000 times at level 2 without buffer pool 715 716 2 0.0 71496.2 0.9X -Compression 10000 times at level 3 without buffer pool 810 818 7 0.0 81013.5 0.8X -Compression 10000 times at level 1 with buffer pool 603 604 0 0.0 60335.0 1.1X -Compression 10000 times at level 2 with buffer pool 638 641 3 0.0 63817.7 1.0X -Compression 10000 times at level 3 with buffer pool 739 740 1 0.0 73912.1 0.9X - -OpenJDK 64-Bit Server VM 21.0.7+6-LTS on Linux 6.11.0-1018-azure +Compression 10000 times at level 1 without buffer pool 645 664 13 0.0 64521.5 1.0X +Compression 10000 times at level 2 without buffer pool 700 706 6 0.0 70013.1 0.9X +Compression 10000 times at level 3 without buffer pool 811 813 3 0.0 81082.2 0.8X +Compression 10000 times at level 1 with buffer pool 597 598 1 0.0 59661.6 1.1X +Compression 10000 times at level 2 with buffer pool 630 632 2 0.0 63037.2 1.0X +Compression 10000 times at level 3 with buffer pool 735 742 6 0.0 73535.4 0.9X + +OpenJDK 64-Bit Server VM 21.0.8+9-LTS on Linux 6.11.0-1018-azure AMD EPYC 7763 64-Core Processor Benchmark 
ZStandardCompressionCodec: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------------------------ -Decompression 10000 times from level 1 without buffer pool 830 833 4 0.0 83030.7 1.0X -Decompression 10000 times from level 2 without buffer pool 832 833 1 0.0 83236.0 1.0X -Decompression 10000 times from level 3 without buffer pool 832 833 1 0.0 83183.1 1.0X -Decompression 10000 times from level 1 with buffer pool 758 759 1 0.0 75813.5 1.1X -Decompression 10000 times from level 2 with buffer pool 758 758 1 0.0 75767.1 1.1X -Decompression 10000 times from level 3 with buffer pool 757 758 1 0.0 75652.4 1.1X - -OpenJDK 64-Bit Server VM 21.0.7+6-LTS on Linux 6.11.0-1018-azure +Decompression 10000 times from level 1 without buffer pool 833 835 3 0.0 83296.4 1.0X +Decompression 10000 times from level 2 without buffer pool 831 832 2 0.0 83090.3 1.0X +Decompression 10000 times from level 3 without buffer pool 837 845 7 0.0 83701.2 1.0X +Decompression 10000 times from level 1 with buffer pool 762 764 1 0.0 76246.2 1.1X +Decompression 10000 times from level 2 with buffer pool 760 763 4 0.0 76020.8 1.1X +Decompression 10000 times from level 3 with buffer pool 761 763 1 0.0 76128.2 1.1X + +OpenJDK 64-Bit Server VM 21.0.8+9-LTS on Linux 6.11.0-1018-azure AMD EPYC 7763 64-Core Processor Parallel Compression at level 3: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Parallel Compression with 0 workers 66 66 0 0.0 512667.9 1.0X -Parallel Compression with 1 workers 56 58 2 0.0 435183.0 1.2X -Parallel Compression with 2 workers 46 47 1 0.0 356034.8 1.4X -Parallel Compression with 4 workers 41 43 1 0.0 318331.7 1.6X -Parallel Compression with 8 workers 44 46 1 0.0 342564.5 1.5X -Parallel Compression with 16 
workers 48 51 2 0.0 371266.4 1.4X - -OpenJDK 64-Bit Server VM 21.0.7+6-LTS on Linux 6.11.0-1018-azure +Parallel Compression with 0 workers 66 68 1 0.0 513077.9 1.0X +Parallel Compression with 1 workers 54 58 3 0.0 424042.7 1.2X +Parallel Compression with 2 workers 44 47 1 0.0 347613.7 1.5X +Parallel Compression with 4 workers 41 43 1 0.0 318932.6 1.6X +Parallel Compression with 8 workers 43 45 1 0.0 337266.1 1.5X +Parallel Compression with 16 workers 47 50 1 0.0 363883.0 1.4X + +OpenJDK 64-Bit Server VM 21.0.8+9-LTS on Linux 6.11.0-1018-azure AMD EPYC 7763 64-Core Processor Parallel Compression at level 9: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Parallel Compression with 0 workers 236 237 1 0.0 1847245.2 1.0X -Parallel Compression with 1 workers 251 252 2 0.0 1961753.5 0.9X -Parallel Compression with 2 workers 141 148 4 0.0 1100274.2 1.7X -Parallel Compression with 4 workers 129 133 3 0.0 1009465.5 1.8X -Parallel Compression with 8 workers 135 139 3 0.0 1054496.8 1.8X -Parallel Compression with 16 workers 135 139 6 0.0 1051577.1 1.8X +Parallel Compression with 0 workers 236 241 5 0.0 1841027.0 1.0X +Parallel Compression with 1 workers 259 265 3 0.0 2025243.3 0.9X +Parallel Compression with 2 workers 140 151 9 0.0 1095661.5 1.7X +Parallel Compression with 4 workers 132 140 6 0.0 1027414.9 1.8X +Parallel Compression with 8 workers 138 143 3 0.0 1079814.7 1.7X +Parallel Compression with 16 workers 135 139 4 0.0 1053311.9 1.7X + +OpenJDK 64-Bit Server VM 21.0.8+9-LTS on Linux 6.11.0-1018-azure +AMD EPYC 7763 64-Core Processor +Compression at level 1: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +Compression by strategy -1 54 57 2 0.0 422355.5 1.0X +Compression by strategy 1 53 55 
1 0.0 411561.0 1.0X +Compression by strategy 3 87 89 1 0.0 676134.2 0.6X +Compression by strategy 5 231 233 1 0.0 1805263.1 0.2X +Compression by strategy 7 252 253 1 0.0 1969415.3 0.2X +Compression by strategy 9 252 253 1 0.0 1965181.2 0.2X + +OpenJDK 64-Bit Server VM 21.0.8+9-LTS on Linux 6.11.0-1018-azure +AMD EPYC 7763 64-Core Processor +Compression at level 3: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +Compression by strategy -1 65 67 1 0.0 511151.0 1.0X +Compression by strategy 1 49 51 1 0.0 380610.7 1.3X +Compression by strategy 3 85 87 1 0.0 665865.9 0.8X +Compression by strategy 5 236 240 3 0.0 1841743.1 0.3X +Compression by strategy 7 272 274 1 0.0 2128783.4 0.2X +Compression by strategy 9 271 274 2 0.0 2119704.9 0.2X + +OpenJDK 64-Bit Server VM 21.0.8+9-LTS on Linux 6.11.0-1018-azure +AMD EPYC 7763 64-Core Processor +Compression at level 9: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +Compression by strategy -1 233 234 1 0.0 1821157.0 1.0X +Compression by strategy 1 48 50 1 0.0 377701.6 4.8X +Compression by strategy 3 86 87 1 0.0 670743.4 2.7X +Compression by strategy 5 236 241 5 0.0 1844175.2 1.0X +Compression by strategy 7 274 277 2 0.0 2141909.1 0.9X +Compression by strategy 9 275 277 2 0.0 2148129.3 0.8X diff --git a/core/benchmarks/ZStandardBenchmark-results.txt b/core/benchmarks/ZStandardBenchmark-results.txt index 891932f96d6f..75265f1080ab 100644 --- a/core/benchmarks/ZStandardBenchmark-results.txt +++ b/core/benchmarks/ZStandardBenchmark-results.txt @@ -2,48 +2,81 @@ Benchmark ZStandardCompressionCodec ================================================================================================ -OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 
6.11.0-1018-azure +OpenJDK 64-Bit Server VM 17.0.16+8-LTS on Linux 6.11.0-1018-azure AMD EPYC 7763 64-Core Processor Benchmark ZStandardCompressionCodec: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative -------------------------------------------------------------------------------------------------------------------------------------- -Compression 10000 times at level 1 without buffer pool 664 665 1 0.0 66403.7 1.0X -Compression 10000 times at level 2 without buffer pool 699 700 2 0.0 69853.1 1.0X -Compression 10000 times at level 3 without buffer pool 788 793 5 0.0 78810.6 0.8X -Compression 10000 times at level 1 with buffer pool 582 583 1 0.0 58212.7 1.1X -Compression 10000 times at level 2 with buffer pool 612 614 2 0.0 61216.1 1.1X -Compression 10000 times at level 3 with buffer pool 718 719 1 0.0 71825.7 0.9X - -OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.11.0-1018-azure +Compression 10000 times at level 1 without buffer pool 655 656 1 0.0 65523.2 1.0X +Compression 10000 times at level 2 without buffer pool 695 697 1 0.0 69530.7 0.9X +Compression 10000 times at level 3 without buffer pool 817 824 12 0.0 81694.8 0.8X +Compression 10000 times at level 1 with buffer pool 580 581 0 0.0 58038.0 1.1X +Compression 10000 times at level 2 with buffer pool 612 615 3 0.0 61246.1 1.1X +Compression 10000 times at level 3 with buffer pool 721 734 11 0.0 72106.4 0.9X + +OpenJDK 64-Bit Server VM 17.0.16+8-LTS on Linux 6.11.0-1018-azure AMD EPYC 7763 64-Core Processor Benchmark ZStandardCompressionCodec: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------------------------ -Decompression 10000 times from level 1 without buffer pool 601 602 1 0.0 60084.3 1.0X -Decompression 10000 times from level 2 without buffer pool 600 602 2 0.0 59976.6 1.0X -Decompression 10000 times from level 3 without buffer pool 600 610 19 0.0 
59982.0 1.0X -Decompression 10000 times from level 1 with buffer pool 543 544 1 0.0 54315.8 1.1X -Decompression 10000 times from level 2 with buffer pool 542 543 0 0.0 54239.8 1.1X -Decompression 10000 times from level 3 with buffer pool 543 544 1 0.0 54283.7 1.1X - -OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.11.0-1018-azure +Decompression 10000 times from level 1 without buffer pool 594 595 1 0.0 59350.1 1.0X +Decompression 10000 times from level 2 without buffer pool 594 595 1 0.0 59430.0 1.0X +Decompression 10000 times from level 3 without buffer pool 594 595 1 0.0 59357.1 1.0X +Decompression 10000 times from level 1 with buffer pool 542 543 1 0.0 54223.9 1.1X +Decompression 10000 times from level 2 with buffer pool 542 543 1 0.0 54227.0 1.1X +Decompression 10000 times from level 3 with buffer pool 542 543 1 0.0 54218.2 1.1X + +OpenJDK 64-Bit Server VM 17.0.16+8-LTS on Linux 6.11.0-1018-azure AMD EPYC 7763 64-Core Processor Parallel Compression at level 3: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Parallel Compression with 0 workers 66 68 1 0.0 518331.5 1.0X -Parallel Compression with 1 workers 54 58 3 0.0 423384.3 1.2X -Parallel Compression with 2 workers 45 47 1 0.0 353024.7 1.5X -Parallel Compression with 4 workers 42 43 1 0.0 325797.6 1.6X -Parallel Compression with 8 workers 44 46 1 0.0 345601.8 1.5X -Parallel Compression with 16 workers 47 49 1 0.0 365954.9 1.4X - -OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.11.0-1018-azure +Parallel Compression with 0 workers 65 67 1 0.0 511709.4 1.0X +Parallel Compression with 1 workers 54 57 2 0.0 421990.0 1.2X +Parallel Compression with 2 workers 44 46 1 0.0 345009.1 1.5X +Parallel Compression with 4 workers 41 43 1 0.0 320346.1 1.6X +Parallel Compression with 8 workers 43 45 1 0.0 333297.2 1.5X +Parallel Compression with 16 workers 48 50 1 0.0 374714.9 1.4X + 
+OpenJDK 64-Bit Server VM 17.0.16+8-LTS on Linux 6.11.0-1018-azure AMD EPYC 7763 64-Core Processor Parallel Compression at level 9: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Parallel Compression with 0 workers 235 236 1 0.0 1834259.5 1.0X -Parallel Compression with 1 workers 258 261 3 0.0 2017482.6 0.9X -Parallel Compression with 2 workers 142 150 7 0.0 1110600.9 1.7X -Parallel Compression with 4 workers 131 134 2 0.0 1025190.7 1.8X -Parallel Compression with 8 workers 137 140 3 0.0 1072618.8 1.7X -Parallel Compression with 16 workers 137 140 3 0.0 1066912.9 1.7X +Parallel Compression with 0 workers 230 231 1 0.0 1794277.5 1.0X +Parallel Compression with 1 workers 254 257 3 0.0 1981037.2 0.9X +Parallel Compression with 2 workers 142 150 16 0.0 1106955.4 1.6X +Parallel Compression with 4 workers 129 134 2 0.0 1010757.7 1.8X +Parallel Compression with 8 workers 136 140 2 0.0 1066319.6 1.7X +Parallel Compression with 16 workers 135 139 2 0.0 1055941.3 1.7X + +OpenJDK 64-Bit Server VM 17.0.16+8-LTS on Linux 6.11.0-1018-azure +AMD EPYC 7763 64-Core Processor +Compression at level 1: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +Compression by strategy -1 54 55 1 0.0 420584.9 1.0X +Compression by strategy 1 54 55 1 0.0 424684.0 1.0X +Compression by strategy 3 88 89 1 0.0 688294.1 0.6X +Compression by strategy 5 228 228 1 0.0 1779497.2 0.2X +Compression by strategy 7 254 255 1 0.0 1980652.5 0.2X +Compression by strategy 9 253 254 1 0.0 1974781.6 0.2X + +OpenJDK 64-Bit Server VM 17.0.16+8-LTS on Linux 6.11.0-1018-azure +AMD EPYC 7763 64-Core Processor +Compression at level 3: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative 
+------------------------------------------------------------------------------------------------------------------------ +Compression by strategy -1 66 67 1 0.0 514817.8 1.0X +Compression by strategy 1 50 52 1 0.0 393785.0 1.3X +Compression by strategy 3 88 89 1 0.0 685654.7 0.8X +Compression by strategy 5 229 230 1 0.0 1789743.4 0.3X +Compression by strategy 7 275 276 1 0.0 2147629.7 0.2X +Compression by strategy 9 275 276 1 0.0 2146254.8 0.2X + +OpenJDK 64-Bit Server VM 17.0.16+8-LTS on Linux 6.11.0-1018-azure +AMD EPYC 7763 64-Core Processor +Compression at level 9: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +Compression by strategy -1 230 231 1 0.0 1798838.7 1.0X +Compression by strategy 1 50 52 1 0.0 392246.0 4.6X +Compression by strategy 3 87 89 1 0.0 682833.4 2.6X +Compression by strategy 5 232 233 1 0.0 1809273.7 1.0X +Compression by strategy 7 275 278 1 0.0 2151672.9 0.8X +Compression by strategy 9 275 276 1 0.0 2147982.2 0.8X diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 9e2eb4e0b56a..2c7c2f120b93 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -2125,6 +2125,15 @@ package object config { .intConf .createWithDefault(1) + private[spark] val IO_COMPRESSION_ZSTD_STRATEGY = + ConfigBuilder("spark.io.compression.zstd.strategy") + .doc("Compression strategy for Zstd compression codec. 
The higher the value is, the more " + + "complex it becomes, usually resulting in stronger but slower compression or higher CPU " + + "cost.") + .version("4.1.0") + .intConf + .createOptional + private[spark] val IO_COMPRESSION_LZF_PARALLEL = ConfigBuilder("spark.io.compression.lzf.parallel.enabled") .doc("When true, LZF compression will use multiple threads to compress data in parallel.") diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala index 233228a9c6d4..b81e46667323 100644 --- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala +++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala @@ -228,6 +228,7 @@ class ZStdCompressionCodec(conf: SparkConf) extends CompressionCodec { // Default compression level for zstd compression to 1 because it is // fastest of all with reasonably high compression ratio. private val level = conf.get(IO_COMPRESSION_ZSTD_LEVEL) + private val strategy = conf.get(IO_COMPRESSION_ZSTD_STRATEGY) private val bufferPool = if (conf.get(IO_COMPRESSION_ZSTD_BUFFERPOOL_ENABLED)) { RecyclingBufferPool.INSTANCE @@ -241,6 +242,7 @@ class ZStdCompressionCodec(conf: SparkConf) extends CompressionCodec { // Wrap the zstd output stream in a buffered output stream, so that we can // avoid overhead excessive of JNI call while trying to compress small amount of data.
val os = new ZstdOutputStreamNoFinalizer(s, bufferPool).setLevel(level).setWorkers(workers) + strategy.foreach(os.setStrategy) new BufferedOutputStream(os, bufferSize) } diff --git a/core/src/test/scala/org/apache/spark/io/ZStandardBenchmark.scala b/core/src/test/scala/org/apache/spark/io/ZStandardBenchmark.scala index e5b7bb927831..1e52b12bb6e5 100644 --- a/core/src/test/scala/org/apache/spark/io/ZStandardBenchmark.scala +++ b/core/src/test/scala/org/apache/spark/io/ZStandardBenchmark.scala @@ -21,7 +21,7 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectOutputStream} import org.apache.spark.SparkConf import org.apache.spark.benchmark.{Benchmark, BenchmarkBase} -import org.apache.spark.internal.config.{IO_COMPRESSION_ZSTD_BUFFERPOOL_ENABLED, IO_COMPRESSION_ZSTD_BUFFERSIZE, IO_COMPRESSION_ZSTD_LEVEL, IO_COMPRESSION_ZSTD_WORKERS} +import org.apache.spark.internal.config._ /** @@ -50,6 +50,7 @@ object ZStandardBenchmark extends BenchmarkBase { decompressionBenchmark(benchmark2, N) benchmark2.run() parallelCompressionBenchmark() + strategyCompressionBenchmark() } } @@ -127,4 +128,30 @@ object ZStandardBenchmark extends BenchmarkBase { benchmark.run() } } + + private def strategyCompressionBenchmark(): Unit = { + val numberOfLargeObjectToWrite = 128 + val data: Array[Byte] = (1 until 256 * 1024 * 1024).map(_.toByte).toArray + + Seq(1, 3, 9).foreach { level => + val benchmark = new Benchmark( + s"Compression at level $level", numberOfLargeObjectToWrite, output = output) + Seq(-1, 1, 3, 5, 7, 9).foreach { strategy => + val conf = new SparkConf(false).set(IO_COMPRESSION_ZSTD_LEVEL, level) + if (strategy >= 0) { + conf.set(IO_COMPRESSION_ZSTD_STRATEGY, strategy) + } + benchmark.addCase(s"Compression by strategy $strategy") { _ => + val baos = new ByteArrayOutputStream() + val zcos = new ZStdCompressionCodec(conf).compressedOutputStream(baos) + val oos = new ObjectOutputStream(zcos) + 1 to numberOfLargeObjectToWrite foreach { _ => + oos.writeObject(data) 
+ } + oos.close() + } + } + benchmark.run() + } + } }