[SPARK-52078][TEST] Add ZStandardTPCDSDataBenchmark #50857
Modified file: .github/workflows/benchmark.yml

@@ -66,8 +66,8 @@ jobs:
   # Any TPC-DS related updates on this job need to be applied to tpcds-1g job of build_and_test.yml as well
   tpcds-1g-gen:
-    name: "Generate an input dataset for TPCDSQueryBenchmark with SF=1"
-    if: contains(inputs.class, 'TPCDSQueryBenchmark') || contains(inputs.class, '*')
+    name: "Generate an TPC-DS dataset with SF=1"
+    if: contains(inputs.class, 'TPCDSQueryBenchmark') || contains(inputs.class, 'ZStandardTPCDSDataBenchmark') || contains(inputs.class, '*')
     runs-on: ubuntu-latest
     env:
       SPARK_LOCAL_IP: localhost

@@ -98,7 +98,9 @@ jobs:
       id: cache-tpcds-sf-1
       uses: actions/cache@v4
       with:
-        path: ./tpcds-sf-1
+        path: |
+          ./tpcds-sf-1
+          ./tpcds-sf-1-text
         key: tpcds-${{ hashFiles('.github/workflows/benchmark.yml', 'sql/core/src/test/scala/org/apache/spark/sql/TPCDSSchema.scala') }}
     - name: Checkout tpcds-kit repository
       if: steps.cache-tpcds-sf-1.outputs.cache-hit != 'true'

@@ -118,7 +120,9 @@ jobs:
         java-version: ${{ inputs.jdk }}
     - name: Generate TPC-DS (SF=1) table data
       if: steps.cache-tpcds-sf-1.outputs.cache-hit != 'true'
-      run: build/sbt "sql/Test/runMain org.apache.spark.sql.GenTPCDSData --dsdgenDir `pwd`/tpcds-kit/tools --location `pwd`/tpcds-sf-1 --scaleFactor 1 --numPartitions 1 --overwrite"
+      run: |
+        build/sbt "sql/Test/runMain org.apache.spark.sql.GenTPCDSData --dsdgenDir `pwd`/tpcds-kit/tools --location `pwd`/tpcds-sf-1 --scaleFactor 1 --numPartitions 1 --overwrite"
+        mkdir -p `pwd`/tpcds-sf-1-text && rm -f `pwd`/tpcds-sf-1-text/* && `pwd`/tpcds-kit/tools/dsdgen -DISTRIBUTIONS `pwd`/tpcds-kit/tools/tpcds.idx -SCALE 1 -DIR `pwd`/tpcds-sf-1-text

   benchmark:
     name: "Run benchmarks: ${{ inputs.class }} (JDK ${{ inputs.jdk }}, Scala ${{ inputs.scala }}, ${{ matrix.split }} out of ${{ inputs.num-splits }} splits)"

@@ -138,6 +142,7 @@ jobs:
       # To prevent spark.test.home not being set. See more detail in SPARK-36007.
       SPARK_HOME: ${{ github.workspace }}
       SPARK_TPCDS_DATA: ${{ github.workspace }}/tpcds-sf-1
+      SPARK_TPCDS_DATA_TEXT: ${{ github.workspace }}/tpcds-sf-1-text

Comment (member, author): the output data size

Comment (member, author): Hi @luben, in this round, I'm trying to use TPCDS-generated data for the zstd compression benchmark. The data can be generated by the following steps. And my local test shows that zstd-jni 1.5.6 and 1.5.7 are basically at the same level, and 1.5.7 is a little bit faster in some cases.
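
The steps presumably mirror the two commands added to the workflow above, combined with the run instructions in the benchmark's scaladoc; a sketch for a local run (the tpcds-kit checkout path is illustrative):

# 1. Generate the TPC-DS SF=1 Parquet tables (shared with TPCDSQueryBenchmark)
build/sbt "sql/Test/runMain org.apache.spark.sql.GenTPCDSData --dsdgenDir `pwd`/tpcds-kit/tools --location `pwd`/tpcds-sf-1 --scaleFactor 1 --numPartitions 1 --overwrite"

# 2. Generate the raw .dat text files read by ZStandardTPCDSDataBenchmark
mkdir -p `pwd`/tpcds-sf-1-text
`pwd`/tpcds-kit/tools/dsdgen -DISTRIBUTIONS `pwd`/tpcds-kit/tools/tpcds.idx -SCALE 1 -DIR `pwd`/tpcds-sf-1-text

# 3. Point the benchmark at the text data and run it
SPARK_TPCDS_DATA_TEXT=`pwd`/tpcds-sf-1-text SPARK_GENERATE_BENCHMARK_FILES=1 \
  build/sbt "core/Test/runMain org.apache.spark.io.ZStandardTPCDSDataBenchmark"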

     steps:
     - name: Checkout Spark repository
       uses: actions/checkout@v4

@@ -167,11 +172,13 @@ jobs:
         distribution: zulu
         java-version: ${{ inputs.jdk }}
     - name: Cache TPC-DS generated data
-      if: contains(inputs.class, 'TPCDSQueryBenchmark') || contains(inputs.class, '*')
+      if: contains(inputs.class, 'TPCDSQueryBenchmark') || contains(inputs.class, 'ZStandardTPCDSDataBenchmark') || contains(inputs.class, '*')
       id: cache-tpcds-sf-1
       uses: actions/cache@v4
       with:
-        path: ./tpcds-sf-1
+        path: |
+          ./tpcds-sf-1
+          ./tpcds-sf-1-text
         key: tpcds-${{ hashFiles('.github/workflows/benchmark.yml', 'sql/core/src/test/scala/org/apache/spark/sql/TPCDSSchema.scala') }}
     - name: Run benchmarks
       run: |

@@ -188,7 +195,7 @@ jobs:
         # To keep the directory structure and file permissions, tar them
         # See also https://github.com/actions/upload-artifact#maintaining-file-permissions-and-case-sensitive-files
         echo "Preparing the benchmark results:"
-        tar -cvf benchmark-results-${{ inputs.jdk }}-${{ inputs.scala }}.tar `git diff --name-only` `git ls-files --others --exclude=tpcds-sf-1 --exclude-standard`
+        tar -cvf benchmark-results-${{ inputs.jdk }}-${{ inputs.scala }}.tar `git diff --name-only` `git ls-files --others --exclude=tpcds-sf-1 --exclude=tpcds-sf-1-text --exclude-standard`
     - name: Upload benchmark results
       uses: actions/upload-artifact@v4
       with:

New file: ZStandardTPCDSDataBenchmark benchmark results (JDK 21)

@@ -0,0 +1,49 @@
================================================================================================
Benchmark ZStandardCompressionCodec
================================================================================================

OpenJDK 64-Bit Server VM 21.0.7+6-LTS on Linux 6.11.0-1013-azure
AMD EPYC 7763 64-Core Processor
Benchmark ZStandardCompressionCodec:  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
--------------------------------------------------------------------------------------------------------------
Compression 4 times at level 1 without buffer pool  2539  2541  2  0.0  634832028.5  1.0X
Compression 4 times at level 2 without buffer pool  4157  4188  44  0.0  1039277864.3  0.6X
Compression 4 times at level 3 without buffer pool  6091  6095  5  0.0  1522781623.3  0.4X
Compression 4 times at level 1 with buffer pool  2536  2540  5  0.0  634097186.3  1.0X
Compression 4 times at level 2 with buffer pool  4147  4150  4  0.0  1036639857.0  0.6X
Compression 4 times at level 3 with buffer pool  6097  6099  3  0.0  1524134426.0  0.4X

OpenJDK 64-Bit Server VM 21.0.7+6-LTS on Linux 6.11.0-1013-azure
AMD EPYC 7763 64-Core Processor
Benchmark ZStandardCompressionCodec:  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
--------------------------------------------------------------------------------------------------------------
Decompression 4 times from level 1 without buffer pool  886  902  23  0.0  221484611.2  1.0X
Decompression 4 times from level 2 without buffer pool  1109  1130  30  0.0  277257788.3  0.8X
Decompression 4 times from level 3 without buffer pool  1336  1359  32  0.0  334102921.8  0.7X
Decompression 4 times from level 1 with buffer pool  858  868  9  0.0  214401966.0  1.0X
Decompression 4 times from level 2 with buffer pool  1131  1140  12  0.0  282739707.3  0.8X
Decompression 4 times from level 3 with buffer pool  1366  1375  12  0.0  341571527.0  0.6X

OpenJDK 64-Bit Server VM 21.0.7+6-LTS on Linux 6.11.0-1013-azure
AMD EPYC 7763 64-Core Processor
Parallel Compression at level 3:  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
--------------------------------------------------------------------------------------------------------------
Parallel Compression with 0 workers  2030  2033  4  0.0  507451934.3  1.0X
Parallel Compression with 1 workers  1879  1882  4  0.0  469750208.3  1.1X
Parallel Compression with 2 workers  969  976  10  0.0  242174332.5  2.1X
Parallel Compression with 4 workers  711  713  2  0.0  177820489.8  2.9X
Parallel Compression with 8 workers  847  898  53  0.0  211649152.3  2.4X
Parallel Compression with 16 workers  848  859  10  0.0  211876140.0  2.4X

OpenJDK 64-Bit Server VM 21.0.7+6-LTS on Linux 6.11.0-1013-azure
AMD EPYC 7763 64-Core Processor
Parallel Compression at level 9:  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
--------------------------------------------------------------------------------------------------------------
Parallel Compression with 0 workers  8266  8278  16  0.0  2066565583.8  1.0X
Parallel Compression with 1 workers  6933  6941  10  0.0  1733356075.3  1.2X
Parallel Compression with 2 workers  3690  3691  1  0.0  922481882.3  2.2X
Parallel Compression with 4 workers  3223  3231  11  0.0  805643345.5  2.6X
Parallel Compression with 8 workers  3652  3656  7  0.0  912916115.3  2.3X
Parallel Compression with 16 workers  3912  3950  54  0.0  977901486.2  2.1X

New file: ZStandardTPCDSDataBenchmark benchmark results (JDK 17)

@@ -0,0 +1,49 @@
================================================================================================
Benchmark ZStandardCompressionCodec
================================================================================================

OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.11.0-1014-azure
AMD EPYC 7763 64-Core Processor
Benchmark ZStandardCompressionCodec:  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
--------------------------------------------------------------------------------------------------------------
Compression 4 times at level 1 without buffer pool  2518  2519  1  0.0  629582183.5  1.0X
Compression 4 times at level 2 without buffer pool  4111  4111  1  0.0  1027767031.5  0.6X
Compression 4 times at level 3 without buffer pool  6146  6160  19  0.0  1536532700.3  0.4X
Compression 4 times at level 1 with buffer pool  2517  2517  1  0.0  629208370.5  1.0X
Compression 4 times at level 2 with buffer pool  4105  4112  11  0.0  1026190298.0  0.6X
Compression 4 times at level 3 with buffer pool  6154  6157  5  0.0  1538378430.0  0.4X

OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.11.0-1014-azure
AMD EPYC 7763 64-Core Processor
Benchmark ZStandardCompressionCodec:  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
--------------------------------------------------------------------------------------------------------------
Decompression 4 times from level 1 without buffer pool  900  903  4  0.0  225055920.0  1.0X
Decompression 4 times from level 2 without buffer pool  1161  1163  3  0.0  290146657.0  0.8X
Decompression 4 times from level 3 without buffer pool  1399  1406  10  0.0  349650877.8  0.6X
Decompression 4 times from level 1 with buffer pool  899  901  2  0.0  224627803.0  1.0X
Decompression 4 times from level 2 with buffer pool  1165  1166  1  0.0  291335735.3  0.8X
Decompression 4 times from level 3 with buffer pool  1398  1401  4  0.0  349578394.0  0.6X

OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.11.0-1014-azure
AMD EPYC 7763 64-Core Processor
Parallel Compression at level 3:  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
--------------------------------------------------------------------------------------------------------------
Parallel Compression with 0 workers  2061  2067  8  0.0  515297811.0  1.0X
Parallel Compression with 1 workers  1843  1844  1  0.0  460705797.3  1.1X
Parallel Compression with 2 workers  961  972  16  0.0  240177085.3  2.1X
Parallel Compression with 4 workers  729  731  2  0.0  182208026.2  2.8X
Parallel Compression with 8 workers  781  800  18  0.0  195212932.0  2.6X
Parallel Compression with 16 workers  865  871  6  0.0  216145271.5  2.4X

OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.11.0-1014-azure
AMD EPYC 7763 64-Core Processor
Parallel Compression at level 9:  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
--------------------------------------------------------------------------------------------------------------
Parallel Compression with 0 workers  8557  8635  110  0.0  2139353975.8  1.0X
Parallel Compression with 1 workers  7156  7193  52  0.0  1789023949.5  1.2X
Parallel Compression with 2 workers  3855  3861  9  0.0  963635046.3  2.2X
Parallel Compression with 4 workers  3248  3253  8  0.0  811889324.8  2.6X
Parallel Compression with 8 workers  3667  3671  6  0.0  916671282.5  2.3X
Parallel Compression with 16 workers  3799  3845  65  0.0  949757174.5  2.3X

New file: ZStandardTPCDSDataBenchmark.scala (core test, package org.apache.spark.io)

@@ -0,0 +1,123 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.io

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectOutputStream, OutputStream}
import java.nio.file.{Files, Paths}

import org.apache.spark.SparkConf
import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
import org.apache.spark.internal.config.{IO_COMPRESSION_ZSTD_BUFFERPOOL_ENABLED, IO_COMPRESSION_ZSTD_LEVEL, IO_COMPRESSION_ZSTD_WORKERS}

/**
 * Benchmark for ZStandard codec performance.
 * {{{
 *   To run this benchmark:
 *   1. without sbt: bin/spark-submit --class <this class> <spark core test jar>
 *   2. build/sbt "core/Test/runMain <this class>"
 *   3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/Test/runMain <this class>"
 *      Results will be written to "benchmarks/ZStandardTPCDSDataBenchmark-results.txt".
 * }}}
 */
object ZStandardTPCDSDataBenchmark extends BenchmarkBase {

  val N = 4

  // the size of TPCDS catalog_sales.dat (SF1) is about 283M
  val data = Files.readAllBytes(Paths.get(sys.env("SPARK_TPCDS_DATA_TEXT"), "catalog_sales.dat"))

  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
    val name = "Benchmark ZStandardCompressionCodec"
    runBenchmark(name) {
      val benchmark1 = new Benchmark(name, N, output = output)
      compressionBenchmark(benchmark1, N)
      benchmark1.run()

      val benchmark2 = new Benchmark(name, N, output = output)
      decompressionBenchmark(benchmark2, N)
      benchmark2.run()
      parallelCompressionBenchmark()
    }
  }

  private def compressionBenchmark(benchmark: Benchmark, N: Int): Unit = {
    Seq(false, true).foreach { enablePool =>
      Seq(1, 2, 3).foreach { level =>
        val conf = new SparkConf(false)
          .set(IO_COMPRESSION_ZSTD_BUFFERPOOL_ENABLED, enablePool)
          .set(IO_COMPRESSION_ZSTD_LEVEL, level)
        val condition = if (enablePool) "with" else "without"
        benchmark.addCase(s"Compression $N times at level $level $condition buffer pool") { _ =>
          (1 until N).foreach { _ =>
            val os = new ZStdCompressionCodec(conf)
              .compressedOutputStream(OutputStream.nullOutputStream())
            os.write(data)
            os.close()
          }
        }
      }
    }
  }

  private def decompressionBenchmark(benchmark: Benchmark, N: Int): Unit = {
    Seq(false, true).foreach { enablePool =>
      Seq(1, 2, 3).foreach { level =>
        val conf = new SparkConf(false)
          .set(IO_COMPRESSION_ZSTD_BUFFERPOOL_ENABLED, enablePool)
          .set(IO_COMPRESSION_ZSTD_LEVEL, level)
        val outputStream = new ByteArrayOutputStream()
        val out = new ZStdCompressionCodec(conf).compressedOutputStream(outputStream)
        out.write(data)
        out.close()
        val bytes = outputStream.toByteArray

        val condition = if (enablePool) "with" else "without"
        benchmark.addCase(s"Decompression $N times from level $level $condition buffer pool") { _ =>
          (1 until N).foreach { _ =>
            val bais = new ByteArrayInputStream(bytes)
            val is = new ZStdCompressionCodec(conf).compressedInputStream(bais)
            is.readAllBytes()
            is.close()
          }
        }
      }
    }
  }

  private def parallelCompressionBenchmark(): Unit = {
    Seq(3, 9).foreach { level =>
      val benchmark = new Benchmark(
        s"Parallel Compression at level $level", N, output = output)
      Seq(0, 1, 2, 4, 8, 16).foreach { workers =>
        val conf = new SparkConf(false)
          .set(IO_COMPRESSION_ZSTD_LEVEL, level)
          .set(IO_COMPRESSION_ZSTD_WORKERS, workers)
        benchmark.addCase(s"Parallel Compression with $workers workers") { _ =>
          val os = OutputStream.nullOutputStream()
          val zcos = new ZStdCompressionCodec(conf).compressedOutputStream(os)
          val oos = new ObjectOutputStream(zcos)
          1 to N foreach { _ =>
            oos.writeObject(data)
          }
          oos.close()
        }
      }
      benchmark.run()
    }
  }
}

Review comment: I agree with the new testing approach, but is it possible to produce tpcds-sf-1-text only when conducting tests on ZStandardBenchmark?

Reply (author): I would prefer to keep it to make the workflow definition simple, as it only takes a few seconds to generate the dataset.
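
For illustration, the reviewer's suggestion would amount to moving the dsdgen command into its own workflow step guarded by a class check such as contains(inputs.class, 'ZStandardTPCDSDataBenchmark') || contains(inputs.class, '*') in addition to the existing cache-hit condition (a hypothetical refactoring, not part of this PR), at the cost of a slightly longer workflow definition.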