Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions .github/workflows/benchmark.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ jobs:

# Any TPC-DS related updates on this job need to be applied to tpcds-1g job of build_and_test.yml as well
tpcds-1g-gen:
name: "Generate an input dataset for TPCDSQueryBenchmark with SF=1"
if: contains(inputs.class, 'TPCDSQueryBenchmark') || contains(inputs.class, '*')
name: "Generate a TPC-DS dataset with SF=1"
if: contains(inputs.class, 'TPCDSQueryBenchmark') || contains(inputs.class, 'ZStandardTPCDSDataBenchmark') || contains(inputs.class, '*')
runs-on: ubuntu-latest
env:
SPARK_LOCAL_IP: localhost
Expand Down Expand Up @@ -98,7 +98,9 @@ jobs:
id: cache-tpcds-sf-1
uses: actions/cache@v4
with:
path: ./tpcds-sf-1
path: |
./tpcds-sf-1
./tpcds-sf-1-text
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with the new testing approach, but is it possible to produce tpcds-sf-1-text only when conducting tests on ZStandardBenchmark?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer to keep it to make the workflow definition simple, as it only takes a few seconds to generate the dataset

$ time tpcds-kit/tools/dsdgen -DISTRIBUTIONS tpcds-kit/tools/tpcds.idx -SCALE 1 -DIR ~/tpcds-sf-1-text
dsdgen Population Generator (Version 2.13.0)
Copyright Transaction Processing Performance Council (TPC) 2001 - 2020
Warning: This scale factor is valid for QUALIFICATION ONLY
tpcds-kit/tools/dsdgen -DISTRIBUTIONS tpcds-kit/tools/tpcds.idx -SCALE 1 -DIR  7.86s user 0.57s system 99% cpu 8.467 total

key: tpcds-${{ hashFiles('.github/workflows/benchmark.yml', 'sql/core/src/test/scala/org/apache/spark/sql/TPCDSSchema.scala') }}
- name: Checkout tpcds-kit repository
if: steps.cache-tpcds-sf-1.outputs.cache-hit != 'true'
Expand All @@ -118,7 +120,9 @@ jobs:
java-version: ${{ inputs.jdk }}
- name: Generate TPC-DS (SF=1) table data
if: steps.cache-tpcds-sf-1.outputs.cache-hit != 'true'
run: build/sbt "sql/Test/runMain org.apache.spark.sql.GenTPCDSData --dsdgenDir `pwd`/tpcds-kit/tools --location `pwd`/tpcds-sf-1 --scaleFactor 1 --numPartitions 1 --overwrite"
run: |
build/sbt "sql/Test/runMain org.apache.spark.sql.GenTPCDSData --dsdgenDir `pwd`/tpcds-kit/tools --location `pwd`/tpcds-sf-1 --scaleFactor 1 --numPartitions 1 --overwrite"
mkdir -p `pwd`/tpcds-sf-1-text && rm -f `pwd`/tpcds-sf-1-text/* && `pwd`/tpcds-kit/tools/dsdgen -DISTRIBUTIONS `pwd`/tpcds-kit/tools/tpcds.idx -SCALE 1 -DIR `pwd`/tpcds-sf-1-text

benchmark:
name: "Run benchmarks: ${{ inputs.class }} (JDK ${{ inputs.jdk }}, Scala ${{ inputs.scala }}, ${{ matrix.split }} out of ${{ inputs.num-splits }} splits)"
Expand All @@ -138,6 +142,7 @@ jobs:
# To prevent spark.test.home not being set. See more detail in SPARK-36007.
SPARK_HOME: ${{ github.workspace }}
SPARK_TPCDS_DATA: ${{ github.workspace }}/tpcds-sf-1
SPARK_TPCDS_DATA_TEXT: ${{ github.workspace }}/tpcds-sf-1-text
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the output data size

total 1.2G
-rw-rw-r-- 1 chengpan chengpan 1.9K May 12 13:58 call_center.dat
-rw-rw-r-- 1 chengpan chengpan 1.6M May 12 13:58 catalog_page.dat
-rw-rw-r-- 1 chengpan chengpan  21M May 12 13:58 catalog_returns.dat
-rw-rw-r-- 1 chengpan chengpan 283M May 12 13:58 catalog_sales.dat     -- used for benchmark
-rw-rw-r-- 1 chengpan chengpan  13M May 12 13:58 customer.dat
-rw-rw-r-- 1 chengpan chengpan 5.3M May 12 13:58 customer_address.dat
-rw-rw-r-- 1 chengpan chengpan  77M May 12 13:58 customer_demographics.dat
-rw-rw-r-- 1 chengpan chengpan 9.9M May 12 13:58 date_dim.dat
-rw-rw-r-- 1 chengpan chengpan   67 May 12 13:58 dbgen_version.dat
-rw-rw-r-- 1 chengpan chengpan 149K May 12 13:58 household_demographics.dat
-rw-rw-r-- 1 chengpan chengpan  328 May 12 13:58 income_band.dat
-rw-rw-r-- 1 chengpan chengpan 226M May 12 13:58 inventory.dat
-rw-rw-r-- 1 chengpan chengpan 4.9M May 12 13:58 item.dat
-rw-rw-r-- 1 chengpan chengpan  37K May 12 13:58 promotion.dat
-rw-rw-r-- 1 chengpan chengpan 1.4K May 12 13:58 reason.dat
-rw-rw-r-- 1 chengpan chengpan 1.1K May 12 13:58 ship_mode.dat
-rw-rw-r-- 1 chengpan chengpan 3.1K May 12 13:58 store.dat
-rw-rw-r-- 1 chengpan chengpan  32M May 12 13:58 store_returns.dat
-rw-rw-r-- 1 chengpan chengpan 371M May 12 13:58 store_sales.dat
-rw-rw-r-- 1 chengpan chengpan 4.9M May 12 13:58 time_dim.dat
-rw-rw-r-- 1 chengpan chengpan  585 May 12 13:58 warehouse.dat
-rw-rw-r-- 1 chengpan chengpan 5.7K May 12 13:58 web_page.dat
-rw-rw-r-- 1 chengpan chengpan 9.4M May 12 13:58 web_returns.dat
-rw-rw-r-- 1 chengpan chengpan 141M May 12 13:58 web_sales.dat
-rw-rw-r-- 1 chengpan chengpan 8.6K May 12 13:58 web_site.dat

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @luben, in this round, I'm trying to use TPCDS-generated data for the zstd compression benchmark.

The data can be generated by the following steps:

And my local test shows that zstd-jni 1.5.6 and 1.5.7 are basically at the same level, and 1.5.7 is a little bit faster in some cases.

steps:
- name: Checkout Spark repository
uses: actions/checkout@v4
Expand Down Expand Up @@ -167,11 +172,13 @@ jobs:
distribution: zulu
java-version: ${{ inputs.jdk }}
- name: Cache TPC-DS generated data
if: contains(inputs.class, 'TPCDSQueryBenchmark') || contains(inputs.class, '*')
if: contains(inputs.class, 'TPCDSQueryBenchmark') || contains(inputs.class, 'ZStandardTPCDSDataBenchmark') || contains(inputs.class, '*')
id: cache-tpcds-sf-1
uses: actions/cache@v4
with:
path: ./tpcds-sf-1
path: |
./tpcds-sf-1
./tpcds-sf-1-text
key: tpcds-${{ hashFiles('.github/workflows/benchmark.yml', 'sql/core/src/test/scala/org/apache/spark/sql/TPCDSSchema.scala') }}
- name: Run benchmarks
run: |
Expand All @@ -188,7 +195,7 @@ jobs:
# To keep the directory structure and file permissions, tar them
# See also https://github.com/actions/upload-artifact#maintaining-file-permissions-and-case-sensitive-files
echo "Preparing the benchmark results:"
tar -cvf benchmark-results-${{ inputs.jdk }}-${{ inputs.scala }}.tar `git diff --name-only` `git ls-files --others --exclude=tpcds-sf-1 --exclude-standard`
tar -cvf benchmark-results-${{ inputs.jdk }}-${{ inputs.scala }}.tar `git diff --name-only` `git ls-files --others --exclude=tpcds-sf-1 --exclude=tpcds-sf-1-text --exclude-standard`
- name: Upload benchmark results
uses: actions/upload-artifact@v4
with:
Expand Down
49 changes: 49 additions & 0 deletions core/benchmarks/ZStandardTPCDSDataBenchmark-jdk21-results.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
================================================================================================
Benchmark ZStandardCompressionCodec
================================================================================================

OpenJDK 64-Bit Server VM 21.0.7+6-LTS on Linux 6.11.0-1013-azure
AMD EPYC 7763 64-Core Processor
Benchmark ZStandardCompressionCodec: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
----------------------------------------------------------------------------------------------------------------------------------
Compression 4 times at level 1 without buffer pool 2539 2541 2 0.0 634832028.5 1.0X
Compression 4 times at level 2 without buffer pool 4157 4188 44 0.0 1039277864.3 0.6X
Compression 4 times at level 3 without buffer pool 6091 6095 5 0.0 1522781623.3 0.4X
Compression 4 times at level 1 with buffer pool 2536 2540 5 0.0 634097186.3 1.0X
Compression 4 times at level 2 with buffer pool 4147 4150 4 0.0 1036639857.0 0.6X
Compression 4 times at level 3 with buffer pool 6097 6099 3 0.0 1524134426.0 0.4X

OpenJDK 64-Bit Server VM 21.0.7+6-LTS on Linux 6.11.0-1013-azure
AMD EPYC 7763 64-Core Processor
Benchmark ZStandardCompressionCodec: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
--------------------------------------------------------------------------------------------------------------------------------------
Decompression 4 times from level 1 without buffer pool 886 902 23 0.0 221484611.2 1.0X
Decompression 4 times from level 2 without buffer pool 1109 1130 30 0.0 277257788.3 0.8X
Decompression 4 times from level 3 without buffer pool 1336 1359 32 0.0 334102921.8 0.7X
Decompression 4 times from level 1 with buffer pool 858 868 9 0.0 214401966.0 1.0X
Decompression 4 times from level 2 with buffer pool 1131 1140 12 0.0 282739707.3 0.8X
Decompression 4 times from level 3 with buffer pool 1366 1375 12 0.0 341571527.0 0.6X

OpenJDK 64-Bit Server VM 21.0.7+6-LTS on Linux 6.11.0-1013-azure
AMD EPYC 7763 64-Core Processor
Parallel Compression at level 3: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Parallel Compression with 0 workers 2030 2033 4 0.0 507451934.3 1.0X
Parallel Compression with 1 workers 1879 1882 4 0.0 469750208.3 1.1X
Parallel Compression with 2 workers 969 976 10 0.0 242174332.5 2.1X
Parallel Compression with 4 workers 711 713 2 0.0 177820489.8 2.9X
Parallel Compression with 8 workers 847 898 53 0.0 211649152.3 2.4X
Parallel Compression with 16 workers 848 859 10 0.0 211876140.0 2.4X

OpenJDK 64-Bit Server VM 21.0.7+6-LTS on Linux 6.11.0-1013-azure
AMD EPYC 7763 64-Core Processor
Parallel Compression at level 9: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Parallel Compression with 0 workers 8266 8278 16 0.0 2066565583.8 1.0X
Parallel Compression with 1 workers 6933 6941 10 0.0 1733356075.3 1.2X
Parallel Compression with 2 workers 3690 3691 1 0.0 922481882.3 2.2X
Parallel Compression with 4 workers 3223 3231 11 0.0 805643345.5 2.6X
Parallel Compression with 8 workers 3652 3656 7 0.0 912916115.3 2.3X
Parallel Compression with 16 workers 3912 3950 54 0.0 977901486.2 2.1X


49 changes: 49 additions & 0 deletions core/benchmarks/ZStandardTPCDSDataBenchmark-results.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
================================================================================================
Benchmark ZStandardCompressionCodec
================================================================================================

OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.11.0-1014-azure
AMD EPYC 7763 64-Core Processor
Benchmark ZStandardCompressionCodec: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
----------------------------------------------------------------------------------------------------------------------------------
Compression 4 times at level 1 without buffer pool 2518 2519 1 0.0 629582183.5 1.0X
Compression 4 times at level 2 without buffer pool 4111 4111 1 0.0 1027767031.5 0.6X
Compression 4 times at level 3 without buffer pool 6146 6160 19 0.0 1536532700.3 0.4X
Compression 4 times at level 1 with buffer pool 2517 2517 1 0.0 629208370.5 1.0X
Compression 4 times at level 2 with buffer pool 4105 4112 11 0.0 1026190298.0 0.6X
Compression 4 times at level 3 with buffer pool 6154 6157 5 0.0 1538378430.0 0.4X

OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.11.0-1014-azure
AMD EPYC 7763 64-Core Processor
Benchmark ZStandardCompressionCodec: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
--------------------------------------------------------------------------------------------------------------------------------------
Decompression 4 times from level 1 without buffer pool 900 903 4 0.0 225055920.0 1.0X
Decompression 4 times from level 2 without buffer pool 1161 1163 3 0.0 290146657.0 0.8X
Decompression 4 times from level 3 without buffer pool 1399 1406 10 0.0 349650877.8 0.6X
Decompression 4 times from level 1 with buffer pool 899 901 2 0.0 224627803.0 1.0X
Decompression 4 times from level 2 with buffer pool 1165 1166 1 0.0 291335735.3 0.8X
Decompression 4 times from level 3 with buffer pool 1398 1401 4 0.0 349578394.0 0.6X

OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.11.0-1014-azure
AMD EPYC 7763 64-Core Processor
Parallel Compression at level 3: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Parallel Compression with 0 workers 2061 2067 8 0.0 515297811.0 1.0X
Parallel Compression with 1 workers 1843 1844 1 0.0 460705797.3 1.1X
Parallel Compression with 2 workers 961 972 16 0.0 240177085.3 2.1X
Parallel Compression with 4 workers 729 731 2 0.0 182208026.2 2.8X
Parallel Compression with 8 workers 781 800 18 0.0 195212932.0 2.6X
Parallel Compression with 16 workers 865 871 6 0.0 216145271.5 2.4X

OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.11.0-1014-azure
AMD EPYC 7763 64-Core Processor
Parallel Compression at level 9: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Parallel Compression with 0 workers 8557 8635 110 0.0 2139353975.8 1.0X
Parallel Compression with 1 workers 7156 7193 52 0.0 1789023949.5 1.2X
Parallel Compression with 2 workers 3855 3861 9 0.0 963635046.3 2.2X
Parallel Compression with 4 workers 3248 3253 8 0.0 811889324.8 2.6X
Parallel Compression with 8 workers 3667 3671 6 0.0 916671282.5 2.3X
Parallel Compression with 16 workers 3799 3845 65 0.0 949757174.5 2.3X


Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.io

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectOutputStream, OutputStream}
import java.nio.file.{Files, Paths}

import org.apache.spark.SparkConf
import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
import org.apache.spark.internal.config.{IO_COMPRESSION_ZSTD_BUFFERPOOL_ENABLED, IO_COMPRESSION_ZSTD_LEVEL, IO_COMPRESSION_ZSTD_WORKERS}

/**
* Benchmark for ZStandard codec performance.
* {{{
* To run this benchmark:
* 1. without sbt: bin/spark-submit --class <this class> <spark core test jar>
* 2. build/sbt "core/Test/runMain <this class>"
* 3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/Test/runMain <this class>"
* Results will be written to "benchmarks/ZStandardTPCDSDataBenchmark-results.txt".
* }}}
*/
object ZStandardTPCDSDataBenchmark extends BenchmarkBase {

  // Number of compression/decompression rounds each benchmark case performs.
  // Also passed to Benchmark as valuesPerIteration, so per-row rates assume
  // exactly N rounds run per case.
  val N = 4

  // Raw TPC-DS catalog_sales.dat (SF=1), ~283MB of pipe-delimited text.
  // Requires SPARK_TPCDS_DATA_TEXT to point at the dsdgen text output
  // directory (set up by .github/workflows/benchmark.yml); loading happens
  // eagerly at object initialization and fails fast if the env var is unset.
  val data = Files.readAllBytes(Paths.get(sys.env("SPARK_TPCDS_DATA_TEXT"), "catalog_sales.dat"))

  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
    val name = "Benchmark ZStandardCompressionCodec"
    runBenchmark(name) {
      val benchmark1 = new Benchmark(name, N, output = output)
      compressionBenchmark(benchmark1, N)
      benchmark1.run()

      val benchmark2 = new Benchmark(name, N, output = output)
      decompressionBenchmark(benchmark2, N)
      benchmark2.run()
      parallelCompressionBenchmark()
    }
  }

  /**
   * Adds compression cases: levels 1-3, with and without the buffer pool.
   * Each case compresses `data` N times into a null sink.
   */
  private def compressionBenchmark(benchmark: Benchmark, N: Int): Unit = {
    Seq(false, true).foreach { enablePool =>
      Seq(1, 2, 3).foreach { level =>
        val conf = new SparkConf(false)
          .set(IO_COMPRESSION_ZSTD_BUFFERPOOL_ENABLED, enablePool)
          .set(IO_COMPRESSION_ZSTD_LEVEL, level)
        val condition = if (enablePool) "with" else "without"
        benchmark.addCase(s"Compression $N times at level $level $condition buffer pool") { _ =>
          // "1 to N" runs N rounds as the case name claims; the previous
          // "1 until N" ran only N - 1 rounds, skewing the reported rates.
          (1 to N).foreach { _ =>
            val os = new ZStdCompressionCodec(conf)
              .compressedOutputStream(OutputStream.nullOutputStream())
            os.write(data)
            os.close()
          }
        }
      }
    }
  }

  /**
   * Adds decompression cases: levels 1-3, with and without the buffer pool.
   * The input is compressed once up front (outside the timed case); each case
   * then decompresses it N times.
   */
  private def decompressionBenchmark(benchmark: Benchmark, N: Int): Unit = {
    Seq(false, true).foreach { enablePool =>
      Seq(1, 2, 3).foreach { level =>
        val conf = new SparkConf(false)
          .set(IO_COMPRESSION_ZSTD_BUFFERPOOL_ENABLED, enablePool)
          .set(IO_COMPRESSION_ZSTD_LEVEL, level)
        val outputStream = new ByteArrayOutputStream()
        val out = new ZStdCompressionCodec(conf).compressedOutputStream(outputStream)
        out.write(data)
        out.close()
        val bytes = outputStream.toByteArray

        val condition = if (enablePool) "with" else "without"
        benchmark.addCase(s"Decompression $N times from level $level $condition buffer pool") { _ =>
          // Match the case name: decompress N times (was N - 1 with "until").
          (1 to N).foreach { _ =>
            val bais = new ByteArrayInputStream(bytes)
            val is = new ZStdCompressionCodec(conf).compressedInputStream(bais)
            is.readAllBytes()
            is.close()
          }
        }
      }
    }
  }

  /**
   * Benchmarks multi-threaded zstd compression (IO_COMPRESSION_ZSTD_WORKERS)
   * at levels 3 and 9 with 0/1/2/4/8/16 workers. Data is written N times
   * through an ObjectOutputStream into a null sink.
   */
  private def parallelCompressionBenchmark(): Unit = {
    Seq(3, 9).foreach { level =>
      val benchmark = new Benchmark(
        s"Parallel Compression at level $level", N, output = output)
      Seq(0, 1, 2, 4, 8, 16).foreach { workers =>
        val conf = new SparkConf(false)
          .set(IO_COMPRESSION_ZSTD_LEVEL, level)
          .set(IO_COMPRESSION_ZSTD_WORKERS, workers)
        benchmark.addCase(s"Parallel Compression with $workers workers") { _ =>
          val os = OutputStream.nullOutputStream()
          val zcos = new ZStdCompressionCodec(conf).compressedOutputStream(os)
          val oos = new ObjectOutputStream(zcos)
          (1 to N).foreach { _ =>
            oos.writeObject(data)
          }
          oos.close()
        }
      }
      benchmark.run()
    }
  }
}