diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml index 20ea952ff0a2a..257c80022110c 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark.yml @@ -185,7 +185,7 @@ jobs: echo "Preparing the benchmark results:" tar -cvf benchmark-results-${{ github.event.inputs.jdk }}-${{ github.event.inputs.scala }}.tar `git diff --name-only` `git ls-files --others --exclude=tpcds-sf-1 --exclude-standard` - name: Upload benchmark results - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: benchmark-results-${{ github.event.inputs.jdk }}-${{ github.event.inputs.scala }}-${{ matrix.split }} path: benchmark-results-${{ github.event.inputs.jdk }}-${{ github.event.inputs.scala }}.tar diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 40bcf734c6af9..493ed0c413a90 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -272,13 +272,13 @@ jobs: ./dev/run-tests --parallelism 1 --modules "$MODULES_TO_TEST" --included-tags "$INCLUDED_TAGS" --excluded-tags "$EXCLUDED_TAGS" - name: Upload test results to report if: always() - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: test-results-${{ matrix.modules }}-${{ matrix.comment }}-${{ matrix.java }}-${{ matrix.hadoop }}-${{ matrix.hive }} path: "**/target/test-reports/*.xml" - name: Upload unit tests log files if: ${{ !success() }} - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: unit-tests-log-${{ matrix.modules }}-${{ matrix.comment }}-${{ matrix.java }}-${{ matrix.hadoop }}-${{ matrix.hive }} path: "**/target/unit-tests.log" @@ -470,16 +470,16 @@ jobs: - name: Upload test results to report env: ${{ fromJSON(inputs.envs) }} if: always() - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: - name: test-results-${{ matrix.modules }}--${{ matrix.java }}-${{ inputs.hadoop }}-hive2.3 + name: test-results-${{ matrix.modules }}--${{ matrix.java }}-${{ inputs.hadoop }}-hive2.3-${{ env.PYTHON_TO_TEST }} path: "**/target/test-reports/*.xml" - name: Upload unit tests log files env: ${{ fromJSON(inputs.envs) }} if: ${{ !success() }} - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: - name: unit-tests-log-${{ matrix.modules }}--${{ matrix.java }}-${{ inputs.hadoop }}-hive2.3 + name: unit-tests-log-${{ matrix.modules }}--${{ matrix.java }}-${{ inputs.hadoop }}-hive2.3-${{ env.PYTHON_TO_TEST }} path: "**/target/unit-tests.log" sparkr: @@ -556,7 +556,7 @@ jobs: ./dev/run-tests --parallelism 1 --modules sparkr - name: Upload test results to report if: always() - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: test-results-sparkr--${{ inputs.java }}-${{ inputs.hadoop }}-hive2.3 path: "**/target/test-reports/*.xml" @@ -699,11 +699,18 @@ jobs: # Should delete this section after SPARK 3.5 EOL. python3.9 -m pip install 'flake8==3.9.0' pydata_sphinx_theme 'mypy==0.982' 'pytest==7.1.3' 'pytest-mypy-plugins==1.9.3' numpydoc 'jinja2<3.0.0' 'black==22.6.0' python3.9 -m pip install 'pandas-stubs==1.2.0.53' ipython 'grpcio==1.59.3' 'grpc-stubs==1.24.11' 'googleapis-common-protos-stubs==2.2.0' - - name: Install Python linter dependencies + - name: Install Python dependencies for python linter and documentation generation if: inputs.branch != 'branch-3.4' && inputs.branch != 'branch-3.5' run: | - python3.9 -m pip install 'flake8==3.9.0' pydata_sphinx_theme 'mypy==0.982' 'pytest==7.1.3' 'pytest-mypy-plugins==1.9.3' numpydoc jinja2 'black==23.9.1' - python3.9 -m pip install 'pandas-stubs==1.2.0.53' ipython 'grpcio==1.59.3' 'grpc-stubs==1.24.11' 'googleapis-common-protos-stubs==2.2.0' + # Should unpin 'sphinxcontrib-*' after upgrading sphinx>5 + # See 'ipython_genutils' in SPARK-38517 + # See 'docutils<0.18.0' in SPARK-39421 + python3.9 -m pip install 'sphinx==4.5.0' mkdocs 'pydata_sphinx_theme>=0.13' sphinx-copybutton nbsphinx numpydoc jinja2 markupsafe 'pyzmq<24.0.0' \ + ipython ipython_genutils sphinx_plotly_directive 'numpy>=1.20.0' pyarrow pandas 'plotly>=4.8' 'docutils<0.18.0' \ + 'flake8==3.9.0' 'mypy==0.982' 'pytest==7.1.3' 'pytest-mypy-plugins==1.9.3' 'black==23.9.1' \ + 'pandas-stubs==1.2.0.53' 'grpcio==1.59.3' 'grpc-stubs==1.24.11' 'googleapis-common-protos-stubs==2.2.0' \ + 'sphinxcontrib-applehelp==1.0.4' 'sphinxcontrib-devhelp==1.0.2' 'sphinxcontrib-htmlhelp==2.0.1' 'sphinxcontrib-qthelp==1.0.3' 'sphinxcontrib-serializinghtml==1.1.5' + python3.9 -m pip list - name: Python linter run: PYTHON_EXECUTABLE=python3.9 ./dev/lint-python # Should delete this section after SPARK 3.5 EOL. @@ -751,13 +758,13 @@ jobs: Rscript -e "install.packages(c('devtools', 'testthat', 'knitr', 'rmarkdown', 'markdown', 'e1071', 'roxygen2', 'ggplot2', 'mvtnorm', 'statmod'), repos='https://cloud.r-project.org/')" Rscript -e "devtools::install_version('pkgdown', version='2.0.1', repos='https://cloud.r-project.org')" Rscript -e "devtools::install_version('preferably', version='0.4', repos='https://cloud.r-project.org')" - - name: Install dependencies for documentation generation - run: | # Should unpin 'sphinxcontrib-*' after upgrading sphinx>5 python3.9 -m pip install 'sphinx==4.5.0' mkdocs 'pydata_sphinx_theme>=0.13' sphinx-copybutton nbsphinx numpydoc jinja2 markupsafe 'pyzmq<24.0.0' 'sphinxcontrib-applehelp==1.0.4' 'sphinxcontrib-devhelp==1.0.2' 'sphinxcontrib-htmlhelp==2.0.1' 'sphinxcontrib-qthelp==1.0.3' 'sphinxcontrib-serializinghtml==1.1.5' python3.9 -m pip install ipython_genutils # See SPARK-38517 python3.9 -m pip install sphinx_plotly_directive 'numpy>=1.20.0' pyarrow pandas 'plotly>=4.8' python3.9 -m pip install 'docutils<0.18.0' # See SPARK-39421 + - name: Install dependencies for documentation generation + run: | gem install bundler -v 2.4.22 cd docs bundle install @@ -781,7 +788,7 @@ jobs: run: tar cjf site.tar.bz2 docs/_site - name: Upload documentation if: github.repository != 'apache/spark' - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: site path: site.tar.bz2 @@ -934,13 +941,13 @@ jobs: spark.sql.join.forceApplyShuffledHashJoin=true - name: Upload test results to report if: always() - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: test-results-tpcds--${{ inputs.java }}-${{ inputs.hadoop }}-hive2.3 path: "**/target/test-reports/*.xml" - name: Upload unit tests log files if: ${{ !success() }} - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: unit-tests-log-tpcds--${{ inputs.java }}-${{ inputs.hadoop }}-hive2.3 path: "**/target/unit-tests.log" @@ -1003,13 +1010,13 @@ jobs: ./dev/run-tests --parallelism 1 --modules docker-integration-tests --included-tags org.apache.spark.tags.DockerTest - name: Upload test results to report if: always() - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: test-results-docker-integration--${{ inputs.java }}-${{ inputs.hadoop }}-hive2.3 path: "**/target/test-reports/*.xml" - name: Upload unit tests log files if: ${{ !success() }} - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: unit-tests-log-docker-integration--${{ inputs.java }}-${{ inputs.hadoop }}-hive2.3 path: "**/target/unit-tests.log" @@ -1084,7 +1091,7 @@ jobs: build/sbt -Phadoop-3 -Psparkr -Pkubernetes -Pvolcano -Pkubernetes-integration-tests -Dspark.kubernetes.test.driverRequestCores=0.5 -Dspark.kubernetes.test.executorRequestCores=0.2 -Dspark.kubernetes.test.volcanoMaxConcurrencyJobNum=1 -Dtest.exclude.tags=local "kubernetes-integration-tests/test" - name: Upload Spark on K8S integration tests log files if: ${{ !success() }} - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: spark-on-kubernetes-it-log path: "**/target/integration-tests.log" diff --git a/.github/workflows/maven_test.yml b/.github/workflows/maven_test.yml index 078a380f0afea..3f19c76fd9616 100644 --- a/.github/workflows/maven_test.yml +++ b/.github/workflows/maven_test.yml @@ -200,13 +200,13 @@ jobs: rm -rf ~/.m2/repository/org/apache/spark - name: Upload test results to report if: always() - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: test-results-${{ matrix.modules }}-${{ matrix.comment }}-${{ matrix.java }}-${{ matrix.hadoop }}-${{ matrix.hive }} path: "**/target/test-reports/*.xml" - name: Upload unit tests log files if: failure() - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: unit-tests-log-${{ matrix.modules }}-${{ matrix.comment }}-${{ matrix.java }}-${{ matrix.hadoop }}-${{ matrix.hive }} path: "**/target/unit-tests.log" diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Column.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Column.scala index 140c4ce7cd94b..4cb99541ccf01 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Column.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Column.scala @@ -1298,7 +1298,8 @@ private[sql] object Column { val builder = proto.Expression.newBuilder() name match { case "*" => - builder.getUnresolvedStarBuilder + val starBuilder = builder.getUnresolvedStarBuilder + planId.foreach(starBuilder.setPlanId) case _ if name.endsWith(".*") => builder.getUnresolvedStarBuilder.setUnparsedTarget(name) case _ => diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala index 9191633171f78..2a48958d42222 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala @@ -402,7 +402,14 @@ object functions { * @group agg_funcs * @since 3.4.0 */ - def count(e: Column): Column = Column.fn("count", e) + def count(e: Column): Column = { + val withoutStar = e.expr.getExprTypeCase match { + // Turn count(*) into count(1) + case proto.Expression.ExprTypeCase.UNRESOLVED_STAR => lit(1) + case _ => e + } + Column.fn("count", withoutStar) + } /** * Aggregate function: returns the number of items in a group. diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala index 288964a084bad..f2f1571452c0a 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala @@ -878,6 +878,31 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper with PrivateM assert(joined2.schema.catalogString === "struct") } + test("join with dataframe star") { + val left = spark.range(100) + val right = spark.range(100).select(col("id"), rand(12).as("a")) + val join1 = left.join(right, left("id") === right("id")) + assert( + join1.select(join1.col("*")).schema.catalogString === + "struct") + assert(join1.select(left.col("*")).schema.catalogString === "struct") + assert(join1.select(right.col("*")).schema.catalogString === "struct") + + val join2 = left.join(right) + assert( + join2.select(join2.col("*")).schema.catalogString === + "struct") + assert(join2.select(left.col("*")).schema.catalogString === "struct") + assert(join2.select(right.col("*")).schema.catalogString === "struct") + + val join3 = left.join(right, "id") + assert( + join3.select(join3.col("*")).schema.catalogString === + "struct") + assert(join3.select(left.col("*")).schema.catalogString === "struct") + assert(join3.select(right.col("*")).schema.catalogString === "struct") + } + test("SPARK-45509: ambiguous column reference") { val session = spark import session.implicits._ diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala index 0cce44c6e3d9a..76958f055f2ef 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala @@ -397,6 +397,22 @@ class ReplE2ESuite extends RemoteSparkSession with BeforeAndAfterEach { assertContains("noException: Boolean = true", output) } + test("broadcast works with REPL generated code") { + val input = + """ + |val add1 = udf((i: Long) => i + 1) + |val tableA = spark.range(2).alias("a") + |val tableB = broadcast(spark.range(2).select(add1(col("id")).alias("id"))).alias("b") + |tableA.join(tableB). + | where(col("a.id")===col("b.id")). + | select(col("a.id").alias("a_id"), col("b.id").alias("b_id")). + | collect(). + | mkString("[", ", ", "]") + |""".stripMargin + val output = runCommandsInShell(input) + assertContains("""String = "[[1,1]]"""", output) + } + test("closure cleaner") { val input = """ diff --git a/connector/connect/common/src/test/resources/query-tests/queries/groupby_agg.json b/connector/connect/common/src/test/resources/query-tests/queries/groupby_agg.json index 4a1cfddb0288f..65f266794828e 100644 --- a/connector/connect/common/src/test/resources/query-tests/queries/groupby_agg.json +++ b/connector/connect/common/src/test/resources/query-tests/queries/groupby_agg.json @@ -81,7 +81,8 @@ "unresolvedFunction": { "functionName": "count", "arguments": [{ - "unresolvedStar": { + "literal": { + "integer": 1 } }] } diff --git a/connector/connect/common/src/test/resources/query-tests/queries/groupby_agg.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/groupby_agg.proto.bin index cfd6c2daa84b4..18d8c6ce41153 100644 Binary files a/connector/connect/common/src/test/resources/query-tests/queries/groupby_agg.proto.bin and b/connector/connect/common/src/test/resources/query-tests/queries/groupby_agg.proto.bin differ diff --git a/connector/connect/common/src/test/resources/query-tests/queries/toJSON.json b/connector/connect/common/src/test/resources/query-tests/queries/toJSON.json index 278767e620a16..9a99a18853cf1 100644 --- a/connector/connect/common/src/test/resources/query-tests/queries/toJSON.json +++ b/connector/connect/common/src/test/resources/query-tests/queries/toJSON.json @@ -19,6 +19,7 @@ "functionName": "struct", "arguments": [{ "unresolvedStar": { + "planId": "0" } }] } diff --git a/connector/connect/common/src/test/resources/query-tests/queries/toJSON.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/toJSON.proto.bin index e08d0fd2180f0..e930ee76aae97 100644 Binary files a/connector/connect/common/src/test/resources/query-tests/queries/toJSON.proto.bin and b/connector/connect/common/src/test/resources/query-tests/queries/toJSON.proto.bin differ diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala index d1d247967b4b5..bae2747882123 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala @@ -279,7 +279,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu sql(s"CREATE index i1 ON $catalogName.new_table (col1)") }, errorClass = "INDEX_ALREADY_EXISTS", - parameters = Map("indexName" -> "i1", "tableName" -> "new_table") + parameters = Map("indexName" -> "`i1`", "tableName" -> "`new_table`") ) sql(s"DROP index i1 ON $catalogName.new_table") @@ -304,7 +304,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu sql(s"DROP index i1 ON $catalogName.new_table") }, errorClass = "INDEX_NOT_FOUND", - parameters = Map("indexName" -> "i1", "tableName" -> "new_table") + parameters = Map("indexName" -> "`i1`", "tableName" -> "`new_table`") ) } } diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index e5053f1af3626..96dd5528aac5d 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -2338,10 +2338,6 @@ There are a few parameters that can help you tune the memory usage and GC overhe * **Clearing old data**: By default, all input data and persisted RDDs generated by DStream transformations are automatically cleared. Spark Streaming decides when to clear the data based on the transformations that are used. For example, if you are using a window operation of 10 minutes, then Spark Streaming will keep around the last 10 minutes of data, and actively throw away older data. Data can be retained for a longer duration (e.g. interactively querying older data) by setting `streamingContext.remember`. -* **CMS Garbage Collector**: Use of the concurrent mark-and-sweep GC is strongly recommended for keeping GC-related pauses consistently low. Even though concurrent GC is known to reduce the -overall processing throughput of the system, its use is still recommended to achieve more -consistent batch processing times. Make sure you set the CMS GC on both the driver (using `--driver-java-options` in `spark-submit`) and the executors (using [Spark configuration](configuration.html#runtime-environment) `spark.executor.extraJavaOptions`). - * **Other tips**: To further reduce GC overheads, here are some more tips to try. - Persist RDDs using the `OFF_HEAP` storage level. See more detail in the [Spark Programming Guide](rdd-programming-guide.html#rdd-persistence). - Use more executors with smaller heap sizes. This will reduce the GC pressure within each JVM heap. diff --git a/python/pyspark/sql/connect/functions/builtin.py b/python/pyspark/sql/connect/functions/builtin.py index 2eeefc9fae23a..1e22a42c6241e 100644 --- a/python/pyspark/sql/connect/functions/builtin.py +++ b/python/pyspark/sql/connect/functions/builtin.py @@ -1010,6 +1010,8 @@ def corr(col1: "ColumnOrName", col2: "ColumnOrName") -> Column: def count(col: "ColumnOrName") -> Column: + if isinstance(col, Column) and isinstance(col._expr, UnresolvedStar): + col = lit(1) return _invoke_function_over_columns("count", col) diff --git a/python/pyspark/sql/functions/builtin.py b/python/pyspark/sql/functions/builtin.py index f1422d17b071a..1f6d86de28dcb 100644 --- a/python/pyspark/sql/functions/builtin.py +++ b/python/pyspark/sql/functions/builtin.py @@ -17160,7 +17160,8 @@ def transform_values(col: "ColumnOrName", f: Callable[[Column, Column], Column]) @_try_remote_functions def map_filter(col: "ColumnOrName", f: Callable[[Column, Column], Column]) -> Column: """ - Returns a map whose key-value pairs satisfy a predicate. + Collection function: Returns a new map column whose key-value pairs satisfy a given + predicate function. .. versionadded:: 3.1.0 @@ -17170,9 +17171,10 @@ def map_filter(col: "ColumnOrName", f: Callable[[Column, Column], Column]) -> Co Parameters ---------- col : :class:`~pyspark.sql.Column` or str - name of column or expression + The name of the column or a column expression representing the map to be filtered. f : function - a binary function ``(k: Column, v: Column) -> Column...`` + A binary function ``(k: Column, v: Column) -> Column...`` that defines the predicate. + This function should return a boolean column that will be used to filter the input map. Can use methods of :class:`~pyspark.sql.Column`, functions defined in :py:mod:`pyspark.sql.functions` and Scala ``UserDefinedFunctions``. Python ``UserDefinedFunctions`` are not supported @@ -17181,16 +17183,39 @@ def map_filter(col: "ColumnOrName", f: Callable[[Column, Column], Column]) -> Co Returns ------- :class:`~pyspark.sql.Column` - filtered map. + A new map column containing only the key-value pairs that satisfy the predicate. Examples -------- + Example 1: Filtering a map with a simple condition + + >>> from pyspark.sql import functions as sf >>> df = spark.createDataFrame([(1, {"foo": 42.0, "bar": 1.0, "baz": 32.0})], ("id", "data")) - >>> row = df.select(map_filter( - ... "data", lambda _, v: v > 30.0).alias("data_filtered") + >>> row = df.select( + ... sf.map_filter("data", lambda _, v: v > 30.0).alias("data_filtered") ... ).head() >>> sorted(row["data_filtered"].items()) [('baz', 32.0), ('foo', 42.0)] + + Example 2: Filtering a map with a condition on keys + + >>> from pyspark.sql import functions as sf + >>> df = spark.createDataFrame([(1, {"foo": 42.0, "bar": 1.0, "baz": 32.0})], ("id", "data")) + >>> row = df.select( + ... sf.map_filter("data", lambda k, _: k.startswith("b")).alias("data_filtered") + ... ).head() + >>> sorted(row["data_filtered"].items()) + [('bar', 1.0), ('baz', 32.0)] + + Example 3: Filtering a map with a complex condition + + >>> from pyspark.sql import functions as sf + >>> df = spark.createDataFrame([(1, {"foo": 42.0, "bar": 1.0, "baz": 32.0})], ("id", "data")) + >>> row = df.select( + ... sf.map_filter("data", lambda k, v: k.startswith("b") & (v > 1.0)).alias("data_filtered") + ... ).head() + >>> sorted(row["data_filtered"].items()) + [('baz', 32.0)] """ return _invoke_higher_order_function("MapFilter", [col], [f]) @@ -17202,7 +17227,8 @@ def map_zip_with( f: Callable[[Column, Column, Column], Column], ) -> Column: """ - Merge two given maps, key-wise into a single map using a function. + Collection: Merges two given maps into a single map by applying a function to + the key-value pairs. .. versionadded:: 3.1.0 @@ -17212,11 +17238,13 @@ def map_zip_with( Parameters ---------- col1 : :class:`~pyspark.sql.Column` or str - name of the first column or expression + The name of the first column or a column expression representing the first map. col2 : :class:`~pyspark.sql.Column` or str - name of the second column or expression + The name of the second column or a column expression representing the second map. f : function - a ternary function ``(k: Column, v1: Column, v2: Column) -> Column...`` + A ternary function ``(k: Column, v1: Column, v2: Column) -> Column...`` that defines + how to merge the values from the two maps. This function should return a column that + will be used as the value in the resulting map. Can use methods of :class:`~pyspark.sql.Column`, functions defined in :py:mod:`pyspark.sql.functions` and Scala ``UserDefinedFunctions``. Python ``UserDefinedFunctions`` are not supported @@ -17225,20 +17253,50 @@ def map_zip_with( Returns ------- :class:`~pyspark.sql.Column` - zipped map where entries are calculated by applying given function to each - pair of arguments. + A new map column where each key-value pair is the result of applying the function to + the corresponding key-value pairs in the input maps. Examples -------- + Example 1: Merging two maps with a simple function + + >>> from pyspark.sql import functions as sf >>> df = spark.createDataFrame([ - ... (1, {"IT": 24.0, "SALES": 12.00}, {"IT": 2.0, "SALES": 1.4})], - ... ("id", "base", "ratio") - ... ) - >>> row = df.select(map_zip_with( - ... "base", "ratio", lambda k, v1, v2: round(v1 * v2, 2)).alias("updated_data") + ... (1, {"A": 1, "B": 2}, {"A": 3, "B": 4})], + ... ("id", "map1", "map2")) + >>> row = df.select( + ... sf.map_zip_with("map1", "map2", lambda _, v1, v2: v1 + v2).alias("updated_data") + ... ).head() + >>> sorted(row["updated_data"].items()) + [('A', 4), ('B', 6)] + + Example 2: Merging two maps with a complex function + + >>> from pyspark.sql import functions as sf + >>> df = spark.createDataFrame([ + ... (1, {"A": 1, "B": 2}, {"A": 3, "B": 4})], + ... ("id", "map1", "map2")) + >>> row = df.select( + ... sf.map_zip_with("map1", "map2", + ... lambda k, v1, v2: sf.when(k == "A", v1 + v2).otherwise(v1 - v2) + ... ).alias("updated_data") + ... ).head() + >>> sorted(row["updated_data"].items()) + [('A', 4), ('B', -2)] + + Example 3: Merging two maps with mismatched keys + + >>> from pyspark.sql import functions as sf + >>> df = spark.createDataFrame([ + ... (1, {"A": 1, "B": 2}, {"B": 3, "C": 4})], + ... ("id", "map1", "map2")) + >>> row = df.select( + ... sf.map_zip_with("map1", "map2", + ... lambda _, v1, v2: sf.when(v2.isNull(), v1).otherwise(v1 + v2) + ... ).alias("updated_data") ... ).head() >>> sorted(row["updated_data"].items()) - [('IT', 48.0), ('SALES', 16.8)] + [('A', 1), ('B', 5), ('C', None)] """ return _invoke_higher_order_function("MapZipWith", [col1, col2], [f]) @@ -17250,8 +17308,8 @@ def str_to_map( keyValueDelim: Optional["ColumnOrName"] = None, ) -> Column: """ - Creates a map after splitting the text into key/value pairs using delimiters. - Both `pairDelim` and `keyValueDelim` are treated as regular expressions. + Map function: Converts a string into a map after splitting the text into key/value pairs + using delimiters. Both `pairDelim` and `keyValueDelim` are treated as regular expressions. .. versionadded:: 3.5.0 @@ -17260,23 +17318,77 @@ def str_to_map( text : :class:`~pyspark.sql.Column` or str Input column or strings. pairDelim : :class:`~pyspark.sql.Column` or str, optional - delimiter to use to split pair. + Delimiter to use to split pairs. Default is comma (,). keyValueDelim : :class:`~pyspark.sql.Column` or str, optional - delimiter to use to split key/value. + Delimiter to use to split key/value. Default is colon (:). + + Returns + ------- + :class:`~pyspark.sql.Column` + A new column of map type where each string in the original column is converted into a map. Examples -------- - >>> df = spark.createDataFrame([("a:1,b:2,c:3",)], ["e"]) - >>> df.select(str_to_map(df.e, lit(","), lit(":")).alias('r')).collect() - [Row(r={'a': '1', 'b': '2', 'c': '3'})] + Example 1: Using default delimiters + >>> from pyspark.sql import functions as sf >>> df = spark.createDataFrame([("a:1,b:2,c:3",)], ["e"]) - >>> df.select(str_to_map(df.e, lit(",")).alias('r')).collect() - [Row(r={'a': '1', 'b': '2', 'c': '3'})] + >>> df.select(sf.str_to_map(df.e)).show(truncate=False) + +------------------------+ + |str_to_map(e, ,, :) | + +------------------------+ + |{a -> 1, b -> 2, c -> 3}| + +------------------------+ - >>> df = spark.createDataFrame([("a:1,b:2,c:3",)], ["e"]) - >>> df.select(str_to_map(df.e).alias('r')).collect() - [Row(r={'a': '1', 'b': '2', 'c': '3'})] + Example 2: Using custom delimiters + + >>> from pyspark.sql import functions as sf + >>> df = spark.createDataFrame([("a=1;b=2;c=3",)], ["e"]) + >>> df.select(sf.str_to_map(df.e, sf.lit(";"), sf.lit("="))).show(truncate=False) + +------------------------+ + |str_to_map(e, ;, =) | + +------------------------+ + |{a -> 1, b -> 2, c -> 3}| + +------------------------+ + + Example 3: Using different delimiters for different rows + + >>> from pyspark.sql import functions as sf + >>> df = spark.createDataFrame([("a:1,b:2,c:3",), ("d=4;e=5;f=6",)], ["e"]) + >>> df.select(sf.str_to_map(df.e, + ... sf.when(df.e.contains(";"), sf.lit(";")).otherwise(sf.lit(",")), + ... sf.when(df.e.contains("="), sf.lit("=")).otherwise(sf.lit(":"))).alias("str_to_map") + ... ).show(truncate=False) + +------------------------+ + |str_to_map | + +------------------------+ + |{a -> 1, b -> 2, c -> 3}| + |{d -> 4, e -> 5, f -> 6}| + +------------------------+ + + Example 4: Using a column of delimiters + + >>> from pyspark.sql import functions as sf + >>> df = spark.createDataFrame([("a:1,b:2,c:3", ","), ("d=4;e=5;f=6", ";")], ["e", "delim"]) + >>> df.select(sf.str_to_map(df.e, df.delim, sf.lit(":"))).show(truncate=False) + +---------------------------------------+ + |str_to_map(e, delim, :) | + +---------------------------------------+ + |{a -> 1, b -> 2, c -> 3} | + |{d=4 -> NULL, e=5 -> NULL, f=6 -> NULL}| + +---------------------------------------+ + + Example 5: Using a column of key/value delimiters + + >>> from pyspark.sql import functions as sf + >>> df = spark.createDataFrame([("a:1,b:2,c:3", ":"), ("d=4;e=5;f=6", "=")], ["e", "delim"]) + >>> df.select(sf.str_to_map(df.e, sf.lit(","), df.delim)).show(truncate=False) + +------------------------+ + |str_to_map(e, ,, delim) | + +------------------------+ + |{a -> 1, b -> 2, c -> 3}| + |{d -> 4;e=5;f=6} | + +------------------------+ """ if pairDelim is None: pairDelim = lit(",") diff --git a/python/pyspark/sql/tests/test_dataframe.py b/python/pyspark/sql/tests/test_dataframe.py index 407ab22a088c9..1788f1d9fb1ad 100644 --- a/python/pyspark/sql/tests/test_dataframe.py +++ b/python/pyspark/sql/tests/test_dataframe.py @@ -104,6 +104,21 @@ def test_dataframe_star(self): self.assertEqual(df.select(df2["*"]).columns, ["a", "b"]) self.assertEqual(df.select(df3["*"]).columns, ["x", "y"]) + def test_count_star(self): + df1 = self.spark.createDataFrame([{"a": 1}]) + df2 = self.spark.createDataFrame([{"a": 1, "b": "v"}]) + df3 = df2.select(struct("a", "b").alias("s")) + + self.assertEqual(df1.select(count(df1["*"])).columns, ["count(1)"]) + self.assertEqual(df1.select(count(col("*"))).columns, ["count(1)"]) + + self.assertEqual(df2.select(count(df2["*"])).columns, ["count(1)"]) + self.assertEqual(df2.select(count(col("*"))).columns, ["count(1)"]) + + self.assertEqual(df3.select(count(df3["*"])).columns, ["count(1)"]) + self.assertEqual(df3.select(count(col("*"))).columns, ["count(1)"]) + self.assertEqual(df3.select(count(col("s.*"))).columns, ["count(1)"]) + def test_self_join(self): df1 = self.spark.range(10).withColumn("a", lit(0)) df2 = df1.withColumnRenamed("a", "b") diff --git a/python/pyspark/sql/tests/test_group.py b/python/pyspark/sql/tests/test_group.py index 6c84bd7401717..1a9b7d9d836cc 100644 --- a/python/pyspark/sql/tests/test_group.py +++ b/python/pyspark/sql/tests/test_group.py @@ -14,14 +14,23 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import unittest from pyspark.sql import Row from pyspark.sql import functions as sf -from pyspark.testing.sqlutils import ReusedSQLTestCase +from pyspark.testing.sqlutils import ( + ReusedSQLTestCase, + have_pandas, + have_pyarrow, + pandas_requirement_message, + pyarrow_requirement_message, +) from pyspark.testing import assertDataFrameEqual, assertSchemaEqual class GroupTestsMixin: + @unittest.skipIf(not have_pandas, pandas_requirement_message) # type: ignore + @unittest.skipIf(not have_pyarrow, pyarrow_requirement_message) # type: ignore def test_agg_func(self): data = [Row(key=1, value=10), Row(key=1, value=20), Row(key=1, value=30)] df = self.spark.createDataFrame(data) @@ -60,6 +69,8 @@ def test_aggregator(self): # test deprecated countDistinct self.assertEqual(100, g.agg(functions.countDistinct(df.value)).first()[0]) + @unittest.skipIf(not have_pandas, pandas_requirement_message) # type: ignore + @unittest.skipIf(not have_pyarrow, pyarrow_requirement_message) # type: ignore def test_group_by_ordinal(self): spark = self.spark df = spark.createDataFrame( @@ -119,6 +130,8 @@ def test_group_by_ordinal(self): with self.assertRaises(IndexError): df.groupBy(10).agg(sf.sum("b")) + @unittest.skipIf(not have_pandas, pandas_requirement_message) # type: ignore + @unittest.skipIf(not have_pyarrow, pyarrow_requirement_message) # type: ignore def test_order_by_ordinal(self): spark = self.spark df = spark.createDataFrame( diff --git a/python/pyspark/sql/tests/test_udf_profiler.py b/python/pyspark/sql/tests/test_udf_profiler.py index 136f423d0a35c..776d5da88bb27 100644 --- a/python/pyspark/sql/tests/test_udf_profiler.py +++ b/python/pyspark/sql/tests/test_udf_profiler.py @@ -19,11 +19,13 @@ import unittest import os import sys +import warnings from io import StringIO +from typing import Iterator from pyspark import SparkConf from pyspark.sql import SparkSession -from pyspark.sql.functions import udf +from pyspark.sql.functions import udf, pandas_udf from pyspark.profiler import UDFBasicProfiler @@ -101,6 +103,47 @@ def add2(x): df = self.spark.range(10) df.select(add1("id"), add2("id"), add1("id")).collect() + # Unsupported + def exec_pandas_udf_iter_to_iter(self): + import pandas as pd + + @pandas_udf("int") + def iter_to_iter(batch_ser: Iterator[pd.Series]) -> Iterator[pd.Series]: + for ser in batch_ser: + yield ser + 1 + + self.spark.range(10).select(iter_to_iter("id")).collect() + + # Unsupported + def exec_map(self): + import pandas as pd + + def map(pdfs: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]: + for pdf in pdfs: + yield pdf[pdf.id == 1] + + df = self.spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0)], ("id", "v")) + df.mapInPandas(map, schema=df.schema).collect() + + def test_unsupported(self): + with warnings.catch_warnings(record=True) as warns: + warnings.simplefilter("always") + self.exec_pandas_udf_iter_to_iter() + user_warns = [warn.message for warn in warns if isinstance(warn.message, UserWarning)] + self.assertTrue(len(user_warns) > 0) + self.assertTrue( + "Profiling UDFs with iterators input/output is not supported" in str(user_warns[0]) + ) + + with warnings.catch_warnings(record=True) as warns: + warnings.simplefilter("always") + self.exec_map() + user_warns = [warn.message for warn in warns if isinstance(warn.message, UserWarning)] + self.assertTrue(len(user_warns) > 0) + self.assertTrue( + "Profiling UDFs with iterators input/output is not supported" in str(user_warns[0]) + ) + if __name__ == "__main__": from pyspark.sql.tests.test_udf_profiler import * # noqa: F401 diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py index 16605bc12acc7..ca38556431ad9 100644 --- a/python/pyspark/sql/udf.py +++ b/python/pyspark/sql/udf.py @@ -28,7 +28,6 @@ from py4j.java_gateway import JavaObject from pyspark import SparkContext -from pyspark.profiler import Profiler from pyspark.rdd import _prepare_for_python_RDD, PythonEvalType from pyspark.sql.column import Column, _to_java_expr, _to_seq from pyspark.sql.types import ( @@ -403,24 +402,24 @@ def __call__(self, *args: "ColumnOrName", **kwargs: "ColumnOrName") -> Column: for key, value in kwargs.items() ] - profiler: Optional[Profiler] = None - memory_profiler: Optional[Profiler] = None - if sc.profiler_collector: - profiler_enabled = sc._conf.get("spark.python.profile", "false") == "true" - memory_profiler_enabled = sc._conf.get("spark.python.profile.memory", "false") == "true" + profiler_enabled = sc._conf.get("spark.python.profile", "false") == "true" + memory_profiler_enabled = sc._conf.get("spark.python.profile.memory", "false") == "true" + if profiler_enabled or memory_profiler_enabled: # Disable profiling Pandas UDFs with iterators as input/output. - if profiler_enabled or memory_profiler_enabled: - if self.evalType in [ - PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF, - PythonEvalType.SQL_MAP_PANDAS_ITER_UDF, - PythonEvalType.SQL_MAP_ARROW_ITER_UDF, - ]: - profiler_enabled = memory_profiler_enabled = False - warnings.warn( - "Profiling UDFs with iterators input/output is not supported.", - UserWarning, - ) + if self.evalType in [ + PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF, + PythonEvalType.SQL_MAP_PANDAS_ITER_UDF, + PythonEvalType.SQL_MAP_ARROW_ITER_UDF, + ]: + warnings.warn( + "Profiling UDFs with iterators input/output is not supported.", + UserWarning, + ) + judf = self._judf + jUDFExpr = judf.builder(_to_seq(sc, jexprs)) + jPythonUDF = judf.fromUDFExpr(jUDFExpr) + return Column(jPythonUDF) # Disallow enabling two profilers at the same time. if profiler_enabled and memory_profiler_enabled: diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala index bc56afa73d994..2472705d2f541 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala @@ -598,7 +598,7 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase { // Can not find the target plan node with plan id, e.g. // df1 = spark.createDataFrame([Row(a = 1, b = 2, c = 3)]]) // df2 = spark.createDataFrame([Row(a = 1, b = 2)]]) - // df1.select(df2["*"]) <- illegal reference df2.a + // df1.select(df2["*"]) <- illegal reference df2["*"] throw QueryCompilationErrors.cannotResolveDataFrameColumn(u) ) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala index e839d2c069136..561deacfb72d9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala @@ -22,7 +22,7 @@ import java.util.concurrent.atomic.AtomicLong import scala.util.control.NonFatal -import org.apache.spark.{ErrorMessageFormat, SparkException, SparkThrowable, SparkThrowableHelper} +import org.apache.spark.{ErrorMessageFormat, JobArtifactSet, SparkException, SparkThrowable, SparkThrowableHelper} import org.apache.spark.SparkContext.{SPARK_JOB_DESCRIPTION, SPARK_JOB_INTERRUPT_ON_CANCEL} import org.apache.spark.internal.Logging import org.apache.spark.internal.config.{SPARK_DRIVER_PREFIX, SPARK_EXECUTOR_PREFIX} @@ -255,7 +255,8 @@ object SQLExecution extends Logging { val activeSession = sparkSession val sc = sparkSession.sparkContext val localProps = Utils.cloneProperties(sc.getLocalProperties) - exec.submit(() => { + val artifactState = JobArtifactSet.getCurrentJobArtifactState.orNull + exec.submit(() => JobArtifactSet.withActiveJobArtifactState(artifactState) { val originalSession = SparkSession.getActiveSession val originalLocalProps = sc.getLocalProperties SparkSession.setActiveSession(activeSession) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index 467c489a50fd4..51daea76abc5f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.datasources.jdbc import java.sql.{Connection, JDBCType, PreparedStatement, ResultSet, ResultSetMetaData, SQLException} import java.time.{Instant, LocalDate} import java.util -import java.util.Locale import java.util.concurrent.TimeUnit import scala.collection.mutable.ArrayBuffer @@ -635,8 +634,7 @@ object JdbcUtils extends Logging with SQLConfHelper { case ArrayType(et, _) => // remove type length parameters from end of type name - val typeName = getJdbcType(et, dialect).databaseTypeDefinition - .toLowerCase(Locale.ROOT).split("\\(")(0) + val typeName = getJdbcType(et, dialect).databaseTypeDefinition.split("\\(")(0) (stmt: PreparedStatement, row: Row, pos: Int) => val array = conn.createArrayOf( typeName, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala index d5a132c7dd48a..f745e466ed9e2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala @@ -144,16 +144,23 @@ private object DB2Dialect extends JdbcDialect { s"COMMENT ON SCHEMA ${quoteIdentifier(schema)} IS ''" } - override def classifyException(message: String, e: Throwable): AnalysisException = { + override def classifyException( + e: Throwable, + errorClass: String, + messageParameters: Map[String, String], + description: String): AnalysisException = { e match { case sqlException: SQLException => sqlException.getSQLState match { // https://www.ibm.com/docs/en/db2/11.5?topic=messages-sqlstate - case "42893" => throw NonEmptyNamespaceException( - namespace = Array.empty, details = message, cause = Some(e)) - case _ => super.classifyException(message, e) + case "42893" => + throw NonEmptyNamespaceException( + namespace = messageParameters.get("namespace").toArray, + details = sqlException.getMessage, + cause = Some(e)) + case _ => super.classifyException(e, errorClass, messageParameters, description) } - case _ => super.classifyException(message, e) + case _ => super.classifyException(e, errorClass, messageParameters, description) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala index ae3a3addf7bfa..cd151f790adff 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala @@ -28,8 +28,7 @@ import scala.util.control.NonFatal import org.apache.commons.lang3.StringUtils import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException, UnresolvedAttribute} -import org.apache.spark.sql.catalyst.util.quoteNameParts +import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.connector.catalog.Identifier import org.apache.spark.sql.connector.catalog.functions.UnboundFunction import org.apache.spark.sql.connector.catalog.index.TableIndex @@ -195,7 +194,11 @@ private[sql] object H2Dialect extends JdbcDialect { (ident.namespace() :+ indexName).map(quoteIdentifier).mkString(".") } - override def classifyException(message: String, e: Throwable): AnalysisException = { + override def classifyException( + e: Throwable, + errorClass: String, + messageParameters: Map[String, String], + description: String): AnalysisException = { e match { case exception: SQLException => // Error codes are from https://www.h2database.com/javadoc/org/h2/api/ErrorCode.html @@ -206,15 +209,16 @@ private[sql] object H2Dialect extends JdbcDialect { val regex = """"((?:[^"\\]|\\[\\"ntbrf])+)"""".r val name = regex.findFirstMatchIn(e.getMessage).get.group(1) val quotedName = org.apache.spark.sql.catalyst.util.quoteIdentifier(name) - throw new TableAlreadyExistsException(errorClass = "TABLE_OR_VIEW_ALREADY_EXISTS", + throw new TableAlreadyExistsException( + errorClass = "TABLE_OR_VIEW_ALREADY_EXISTS", messageParameters = Map("relationName" -> quotedName), cause = Some(e)) // TABLE_OR_VIEW_NOT_FOUND_1 case 42102 => - val quotedName = quoteNameParts(UnresolvedAttribute.parseAttributeName(message)) + val relationName = messageParameters.getOrElse("tableName", "") throw new NoSuchTableException( errorClass = "TABLE_OR_VIEW_NOT_FOUND", - messageParameters = Map("relationName" -> quotedName), + messageParameters = Map("relationName" -> relationName), cause = Some(e)) // SCHEMA_NOT_FOUND_1 case 90079 => @@ -224,25 +228,21 @@ private[sql] object H2Dialect extends JdbcDialect { throw new NoSuchNamespaceException(errorClass = "SCHEMA_NOT_FOUND", messageParameters = Map("schemaName" -> quotedName)) // INDEX_ALREADY_EXISTS_1 - case 42111 => - // The message is: Failed to create index indexName in tableName - val regex = "(?s)Failed to create index (.*) in (.*)".r - val indexName = regex.findFirstMatchIn(message).get.group(1) - val tableName = regex.findFirstMatchIn(message).get.group(2) + case 42111 if errorClass == "FAILED_JDBC.CREATE_INDEX" => + val indexName = messageParameters("indexName") + val tableName = messageParameters("tableName") throw new IndexAlreadyExistsException( indexName = indexName, tableName = tableName, cause = Some(e)) // INDEX_NOT_FOUND_1 - case 42112 => - // The message is: Failed to drop index indexName in tableName - val regex = "(?s)Failed to drop index (.*) in (.*)".r - val indexName = regex.findFirstMatchIn(message).get.group(1) - val tableName = regex.findFirstMatchIn(message).get.group(2) + case 42112 if errorClass == "FAILED_JDBC.DROP_INDEX" => + val indexName = messageParameters("indexName") + val tableName = messageParameters("tableName") throw new NoSuchIndexException(indexName, tableName, cause = Some(e)) case _ => // do nothing } case _ => // do nothing } - super.classifyException(message, e) + super.classifyException(e, errorClass, messageParameters, description) } override def compileExpression(expr: Expression): Option[String] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala index 9776cff3f7c81..aaee6be24e615 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala @@ -190,15 +190,22 @@ private object MsSqlServerDialect extends JdbcDialect { if (limit > 0) s"TOP ($limit)" else "" } - override def classifyException(message: String, e: Throwable): AnalysisException = { + override def classifyException( + e: Throwable, + errorClass: String, + messageParameters: Map[String, String], + description: String): AnalysisException = { e match { case sqlException: SQLException => sqlException.getErrorCode match { - case 3729 => throw NonEmptyNamespaceException( - namespace = Array.empty, details = message, cause = Some(e)) - case _ => super.classifyException(message, e) + case 3729 => + throw NonEmptyNamespaceException( + namespace = messageParameters.get("namespace").toArray, + details = sqlException.getMessage, + cause = Some(e)) + case _ => super.classifyException(e, errorClass, messageParameters, description) } - case _ => super.classifyException(message, e) + case _ => super.classifyException(e, errorClass, messageParameters, description) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala index dd74c93bc2e19..cbed1d1e6384f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala @@ -270,28 +270,27 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper { indexMap.values.toArray } - override def classifyException(message: String, e: Throwable): AnalysisException = { + override def classifyException( + e: Throwable, + errorClass: String, + messageParameters: Map[String, String], + description: String): AnalysisException = { e match { case sqlException: SQLException => sqlException.getErrorCode match { // ER_DUP_KEYNAME - case 1061 => - // The message is: Failed to create index indexName in tableName - val regex = "(?s)Failed to create index (.*) in (.*)".r - val indexName = regex.findFirstMatchIn(message).get.group(1) - val tableName = regex.findFirstMatchIn(message).get.group(2) - throw new IndexAlreadyExistsException( - indexName = indexName, tableName = tableName, cause = Some(e)) - case 1091 => - // The message is: Failed to drop index indexName in tableName - val regex = "(?s)Failed to drop index (.*) in (.*)".r - val indexName = regex.findFirstMatchIn(message).get.group(1) - val tableName = regex.findFirstMatchIn(message).get.group(2) + case 1061 if errorClass == "FAILED_JDBC.CREATE_INDEX" => + val indexName = messageParameters("indexName") + val tableName = messageParameters("tableName") + throw new IndexAlreadyExistsException(indexName, tableName, cause = Some(e)) + case 1091 if errorClass == "FAILED_JDBC.DROP_INDEX" => + val indexName = messageParameters("indexName") + val tableName = messageParameters("tableName") throw new NoSuchIndexException(indexName, tableName, cause = Some(e)) - case _ => super.classifyException(message, e) + case _ => super.classifyException(e, errorClass, messageParameters, description) } case unsupported: UnsupportedOperationException => throw unsupported - case _ => super.classifyException(message, e) + case _ => super.classifyException(e, errorClass, messageParameters, description) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala index 901e66e5efcb9..3eb065a5d4f04 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala @@ -225,42 +225,48 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper { s"DROP INDEX ${quoteIdentifier(indexName)}" } - override def classifyException(message: String, e: Throwable): AnalysisException = { + // Message pattern defined by postgres specification + private final val pgAlreadyExistsRegex = """(?:.*)relation "(.*)" already exists""".r + + override def classifyException( + e: Throwable, + errorClass: String, + messageParameters: Map[String, String], + description: String): AnalysisException = { e match { case sqlException: SQLException => sqlException.getSQLState match { // https://www.postgresql.org/docs/14/errcodes-appendix.html case "42P07" => - // Message patterns defined at caller sides of spark - val indexRegex = "(?s)Failed to create index (.*) in (.*)".r - val renameRegex = "(?s)Failed table renaming from (.*) to (.*)".r - // Message pattern defined by postgres specification - val pgRegex = """(?:.*)relation "(.*)" already exists""".r - - message match { - case indexRegex(index, table) => - throw new IndexAlreadyExistsException( - indexName = index, tableName = table, cause = Some(e)) - case renameRegex(_, newTable) => - throw QueryCompilationErrors.tableAlreadyExistsError(newTable) - case _ if pgRegex.findFirstMatchIn(sqlException.getMessage).nonEmpty => - val tableName = pgRegex.findFirstMatchIn(sqlException.getMessage).get.group(1) - throw QueryCompilationErrors.tableAlreadyExistsError(tableName) - case _ => super.classifyException(message, e) + if (errorClass == "FAILED_JDBC.CREATE_INDEX") { + throw new IndexAlreadyExistsException( + indexName = messageParameters("indexName"), + tableName = messageParameters("tableName"), + cause = Some(e)) + } else if (errorClass == "FAILED_JDBC.RENAME_TABLE") { + val newTable = messageParameters("newName") + throw QueryCompilationErrors.tableAlreadyExistsError(newTable) + } else { + val tblRegexp = pgAlreadyExistsRegex.findFirstMatchIn(sqlException.getMessage) + if (tblRegexp.nonEmpty) { + throw QueryCompilationErrors.tableAlreadyExistsError(tblRegexp.get.group(1)) + } else { + super.classifyException(e, errorClass, messageParameters, description) + } } - case "42704" => - // The message is: Failed to drop index indexName in tableName - val regex = "(?s)Failed to drop index (.*) in (.*)".r - val indexName = regex.findFirstMatchIn(message).get.group(1) - val tableName = regex.findFirstMatchIn(message).get.group(2) + case "42704" if errorClass == "FAILED_JDBC.DROP_INDEX" => + val indexName = messageParameters("indexName") + val tableName = messageParameters("tableName") throw new NoSuchIndexException(indexName, tableName, cause = Some(e)) case "2BP01" => throw NonEmptyNamespaceException( - namespace = Array.empty, details = message, cause = Some(e)) - case _ => super.classifyException(message, e) + namespace = messageParameters.get("namespace").toArray, + details = sqlException.getMessage, + cause = Some(e)) + case _ => super.classifyException(e, errorClass, messageParameters, description) } case unsupported: UnsupportedOperationException => throw unsupported - case _ => super.classifyException(message, e) + case _ => super.classifyException(e, errorClass, messageParameters, description) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala index 05b3787d0ff2c..a3990f3cfbb35 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala @@ -2980,8 +2980,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel }, errorClass = "INDEX_ALREADY_EXISTS", parameters = Map( - "indexName" -> "people_index", - "tableName" -> "test.people" + "indexName" -> "`people_index`", + "tableName" -> "`test`.`people`" ) ) assert(jdbcTable.indexExists("people_index")) @@ -2997,7 +2997,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel sql(s"DROP INDEX people_index ON TABLE h2.test.people") }, errorClass = "INDEX_NOT_FOUND", - parameters = Map("indexName" -> "people_index", "tableName" -> "test.people") + parameters = Map("indexName" -> "`people_index`", "tableName" -> "`test`.`people`") ) assert(jdbcTable.indexExists("people_index") == false) val indexes3 = jdbcTable.listIndexes() diff --git a/sql/hive/benchmarks/OrcReadBenchmark-jdk21-results.txt b/sql/hive/benchmarks/OrcReadBenchmark-jdk21-results.txt index 7f4fb739b3cb5..3dbf35049ee33 100644 --- a/sql/hive/benchmarks/OrcReadBenchmark-jdk21-results.txt +++ b/sql/hive/benchmarks/OrcReadBenchmark-jdk21-results.txt @@ -6,49 +6,49 @@ OpenJDK 64-Bit Server VM 21.0.1+12-LTS on Linux 5.15.0-1053-azure AMD EPYC 7763 64-Core Processor SQL Single TINYINT Column Scan: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Hive built-in ORC 602 700 124 26.1 38.3 1.0X -Native ORC MR 733 760 31 21.5 46.6 0.8X -Native ORC Vectorized 89 117 15 177.5 5.6 6.8X +Hive built-in ORC 640 693 75 24.6 40.7 1.0X +Native ORC MR 719 733 24 21.9 45.7 0.9X +Native ORC Vectorized 90 112 15 175.3 5.7 7.1X OpenJDK 64-Bit Server VM 21.0.1+12-LTS on Linux 5.15.0-1053-azure AMD EPYC 7763 64-Core Processor SQL Single SMALLINT Column Scan: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Hive built-in ORC 653 708 81 24.1 41.5 1.0X -Native ORC MR 728 753 22 21.6 46.3 0.9X -Native ORC Vectorized 76 91 11 206.6 4.8 8.6X +Hive built-in ORC 633 698 79 24.9 40.2 1.0X +Native ORC MR 726 757 32 21.7 46.1 0.9X +Native ORC Vectorized 74 91 12 212.5 4.7 8.5X OpenJDK 64-Bit Server VM 21.0.1+12-LTS on Linux 5.15.0-1053-azure AMD EPYC 7763 64-Core Processor SQL Single INT Column Scan: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Hive built-in ORC 753 774 21 20.9 47.9 1.0X -Native ORC MR 853 860 12 18.4 54.3 0.9X -Native ORC Vectorized 91 106 21 173.3 5.8 8.3X +Hive built-in ORC 732 737 5 21.5 46.5 1.0X +Native ORC MR 832 872 38 18.9 52.9 0.9X +Native ORC Vectorized 94 116 29 168.0 6.0 7.8X OpenJDK 64-Bit Server VM 21.0.1+12-LTS on Linux 5.15.0-1053-azure AMD EPYC 7763 64-Core Processor SQL Single BIGINT Column Scan: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Hive built-in ORC 807 831 24 19.5 51.3 1.0X -Native ORC MR 859 871 11 18.3 54.6 0.9X -Native ORC Vectorized 109 133 24 144.8 6.9 7.4X +Hive built-in ORC 763 778 18 20.6 48.5 1.0X +Native ORC MR 844 858 14 18.6 53.7 0.9X +Native ORC Vectorized 110 134 26 143.5 7.0 7.0X OpenJDK 64-Bit Server VM 21.0.1+12-LTS on Linux 5.15.0-1053-azure AMD EPYC 7763 64-Core Processor SQL Single FLOAT Column Scan: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Hive built-in ORC 807 844 33 19.5 51.3 1.0X -Native ORC MR 878 891 16 17.9 55.8 0.9X -Native ORC Vectorized 114 134 25 137.9 7.3 7.1X +Hive built-in ORC 852 871 16 18.5 54.2 1.0X +Native ORC MR 848 853 4 18.6 53.9 1.0X +Native ORC Vectorized 163 178 24 96.6 10.4 5.2X OpenJDK 64-Bit Server VM 21.0.1+12-LTS on Linux 5.15.0-1053-azure AMD EPYC 7763 64-Core Processor SQL Single DOUBLE Column Scan: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Hive built-in ORC 794 820 23 19.8 50.5 1.0X -Native ORC MR 854 897 38 18.4 54.3 0.9X -Native ORC Vectorized 136 163 29 115.9 8.6 5.9X +Hive built-in ORC 860 885 27 18.3 54.7 1.0X +Native ORC MR 948 968 31 16.6 60.3 0.9X +Native ORC Vectorized 242 258 15 65.0 15.4 3.6X ================================================================================================ @@ -59,9 +59,9 @@ OpenJDK 64-Bit Server VM 21.0.1+12-LTS on Linux 5.15.0-1053-azure AMD EPYC 7763 64-Core Processor Int and String Scan: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Hive built-in ORC 1554 1588 49 6.7 148.2 1.0X -Native ORC MR 1461 1472 15 7.2 139.3 1.1X -Native ORC Vectorized 569 586 15 18.4 54.2 2.7X +Hive built-in ORC 1814 1836 31 5.8 173.0 1.0X +Native ORC MR 1573 1633 84 6.7 150.0 1.2X +Native ORC Vectorized 763 775 10 13.7 72.8 2.4X ================================================================================================ @@ -72,15 +72,15 @@ OpenJDK 64-Bit Server VM 21.0.1+12-LTS on Linux 5.15.0-1053-azure AMD EPYC 7763 64-Core Processor Partitioned Table: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Data column - Hive built-in ORC 843 903 53 18.7 53.6 1.0X -Data column - Native ORC MR 1093 1117 34 14.4 69.5 0.8X -Data column - Native ORC Vectorized 110 130 15 143.2 7.0 7.7X -Partition column - Hive built-in ORC 619 655 40 25.4 39.3 1.4X -Partition column - Native ORC MR 635 645 8 24.8 40.4 1.3X -Partition column - Native ORC Vectorized 31 44 8 502.7 2.0 26.9X -Both columns - Hive built-in ORC 876 916 46 18.0 55.7 1.0X -Both columns - Native ORC MR 1077 1079 2 14.6 68.5 0.8X -Both columns - Native ORC Vectorized 123 143 25 127.9 7.8 6.8X +Data column - Hive built-in ORC 1028 1035 9 15.3 65.4 1.0X +Data column - Native ORC MR 1016 1023 10 15.5 64.6 1.0X +Data column - Native ORC Vectorized 110 135 11 143.4 7.0 9.4X +Partition column - Hive built-in ORC 665 677 19 23.7 42.3 1.5X +Partition column - Native ORC MR 553 573 23 28.4 35.2 1.9X +Partition column - Native ORC Vectorized 32 43 8 491.4 2.0 32.1X +Both columns - Hive built-in ORC 919 939 33 17.1 58.4 1.1X +Both columns - Native ORC MR 1032 1040 12 15.2 65.6 1.0X +Both columns - Native ORC Vectorized 121 144 21 129.8 7.7 8.5X ================================================================================================ @@ -91,9 +91,9 @@ OpenJDK 64-Bit Server VM 21.0.1+12-LTS on Linux 5.15.0-1053-azure AMD EPYC 7763 64-Core Processor Repeated String: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Hive built-in ORC 811 856 39 12.9 77.4 1.0X -Native ORC MR 770 813 65 13.6 73.4 1.1X -Native ORC Vectorized 121 137 20 86.7 11.5 6.7X +Hive built-in ORC 873 890 21 12.0 83.3 1.0X +Native ORC MR 789 858 75 13.3 75.3 1.1X +Native ORC Vectorized 121 134 16 86.8 11.5 7.2X ================================================================================================ @@ -104,25 +104,25 @@ OpenJDK 64-Bit Server VM 21.0.1+12-LTS on Linux 5.15.0-1053-azure AMD EPYC 7763 64-Core Processor String with Nulls Scan (0.0%): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Hive built-in ORC 1481 1507 37 7.1 141.3 1.0X -Native ORC MR 1372 1398 37 7.6 130.8 1.1X -Native ORC Vectorized 373 380 7 28.1 35.6 4.0X +Hive built-in ORC 1723 1732 13 6.1 164.3 1.0X +Native ORC MR 1311 1327 23 8.0 125.0 1.3X +Native ORC Vectorized 370 377 6 28.4 35.3 4.7X OpenJDK 64-Bit Server VM 21.0.1+12-LTS on Linux 5.15.0-1053-azure AMD EPYC 7763 64-Core Processor String with Nulls Scan (50.0%): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Hive built-in ORC 1349 1350 2 7.8 128.6 1.0X -Native ORC MR 1240 1241 2 8.5 118.2 1.1X -Native ORC Vectorized 361 390 17 29.0 34.4 3.7X +Hive built-in ORC 1297 1327 43 8.1 123.7 1.0X +Native ORC MR 1145 1156 15 9.2 109.2 1.1X +Native ORC Vectorized 391 419 19 26.8 37.3 3.3X OpenJDK 64-Bit Server VM 21.0.1+12-LTS on Linux 5.15.0-1053-azure AMD EPYC 7763 64-Core Processor String with Nulls Scan (95.0%): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Hive built-in ORC 759 796 37 13.8 72.4 1.0X -Native ORC MR 751 763 10 14.0 71.6 1.0X -Native ORC Vectorized 146 167 22 71.9 13.9 5.2X +Hive built-in ORC 741 766 21 14.1 70.7 1.0X +Native ORC MR 743 751 8 14.1 70.9 1.0X +Native ORC Vectorized 154 175 20 67.9 14.7 4.8X ================================================================================================ @@ -133,25 +133,25 @@ OpenJDK 64-Bit Server VM 21.0.1+12-LTS on Linux 5.15.0-1053-azure AMD EPYC 7763 64-Core Processor Single Column Scan from 100 columns: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Hive built-in ORC 589 615 31 1.8 562.0 1.0X -Native ORC MR 92 119 25 11.4 87.5 6.4X -Native ORC Vectorized 37 45 8 28.0 35.7 15.7X +Hive built-in ORC 571 611 43 1.8 544.5 1.0X +Native ORC MR 91 102 8 11.5 86.8 6.3X +Native ORC Vectorized 39 48 6 27.0 37.0 14.7X OpenJDK 64-Bit Server VM 21.0.1+12-LTS on Linux 5.15.0-1053-azure AMD EPYC 7763 64-Core Processor Single Column Scan from 200 columns: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Hive built-in ORC 1104 1115 15 0.9 1052.7 1.0X -Native ORC MR 102 115 8 10.3 97.2 10.8X -Native ORC Vectorized 52 64 9 20.0 49.9 21.1X +Hive built-in ORC 1066 1076 15 1.0 1016.6 1.0X +Native ORC MR 102 114 9 10.3 96.9 10.5X +Native ORC Vectorized 50 63 8 21.0 47.7 21.3X OpenJDK 64-Bit Server VM 21.0.1+12-LTS on Linux 5.15.0-1053-azure AMD EPYC 7763 64-Core Processor Single Column Scan from 300 columns: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Hive built-in ORC 1609 1627 26 0.7 1534.1 1.0X -Native ORC MR 114 130 12 9.2 108.5 14.1X -Native ORC Vectorized 62 72 8 17.0 58.8 26.1X +Hive built-in ORC 1532 1562 42 0.7 1461.1 1.0X +Native ORC MR 114 124 7 9.2 109.0 13.4X +Native ORC Vectorized 62 72 9 17.0 59.0 24.8X ================================================================================================ @@ -162,33 +162,33 @@ OpenJDK 64-Bit Server VM 21.0.1+12-LTS on Linux 5.15.0-1053-azure AMD EPYC 7763 64-Core Processor Single Struct Column Scan with 10 Fields: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Hive built-in ORC 520 553 22 2.0 496.0 1.0X -Native ORC MR 223 243 23 4.7 212.9 2.3X -Native ORC Vectorized 101 140 37 10.4 96.5 5.1X +Hive built-in ORC 447 499 32 2.3 425.8 1.0X +Native ORC MR 224 267 41 4.7 213.7 2.0X +Native ORC Vectorized 99 115 21 10.6 94.5 4.5X OpenJDK 64-Bit Server VM 21.0.1+12-LTS on Linux 5.15.0-1053-azure AMD EPYC 7763 64-Core Processor Single Struct Column Scan with 100 Fields: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------- -Hive built-in ORC 3843 4060 306 0.3 3665.3 1.0X -Native ORC MR 1654 1699 64 0.6 1577.6 2.3X -Native ORC Vectorized 841 885 64 1.2 802.2 4.6X +Hive built-in ORC 3468 3546 110 0.3 3307.7 1.0X +Native ORC MR 1623 1655 47 0.6 1547.4 2.1X +Native ORC Vectorized 795 868 85 1.3 757.8 4.4X OpenJDK 64-Bit Server VM 21.0.1+12-LTS on Linux 5.15.0-1053-azure AMD EPYC 7763 64-Core Processor Single Struct Column Scan with 300 Fields: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------- -Hive built-in ORC 11918 12099 257 0.1 11365.7 1.0X -Native ORC MR 5484 5502 26 0.2 5229.7 2.2X -Native ORC Vectorized 5525 5540 21 0.2 5269.3 2.2X +Hive built-in ORC 9046 9092 66 0.1 8626.7 1.0X +Native ORC MR 6233 6397 232 0.2 5944.5 1.5X +Native ORC Vectorized 5378 5407 41 0.2 5129.1 1.7X OpenJDK 64-Bit Server VM 21.0.1+12-LTS on Linux 5.15.0-1053-azure AMD EPYC 7763 64-Core Processor Single Struct Column Scan with 600 Fields: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------- -Hive built-in ORC 22249 22826 815 0.0 21218.5 1.0X -Native ORC MR 12708 12790 117 0.1 12119.2 1.8X -Native ORC Vectorized 12895 12918 32 0.1 12297.5 1.7X +Hive built-in ORC 22437 22692 361 0.0 21397.7 1.0X +Native ORC MR 12624 12694 99 0.1 12039.1 1.8X +Native ORC Vectorized 12680 12860 255 0.1 12092.1 1.8X ================================================================================================ @@ -199,24 +199,24 @@ OpenJDK 64-Bit Server VM 21.0.1+12-LTS on Linux 5.15.0-1053-azure AMD EPYC 7763 64-Core Processor Nested Struct Scan with 10 Elements, 10 Fields: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------------ -Hive built-in ORC 4172 4206 47 0.3 3979.1 1.0X -Native ORC MR 2246 2256 14 0.5 2141.6 1.9X -Native ORC Vectorized 554 580 30 1.9 528.4 7.5X +Hive built-in ORC 3599 3644 64 0.3 3432.4 1.0X +Native ORC MR 1912 1953 58 0.5 1823.8 1.9X +Native ORC Vectorized 603 629 35 1.7 574.7 6.0X OpenJDK 64-Bit Server VM 21.0.1+12-LTS on Linux 5.15.0-1053-azure AMD EPYC 7763 64-Core Processor Nested Struct Scan with 30 Elements, 10 Fields: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------------ -Hive built-in ORC 10996 11076 114 0.1 10486.3 1.0X -Native ORC MR 4921 5015 132 0.2 4693.2 2.2X -Native ORC Vectorized 1484 1524 57 0.7 1415.0 7.4X +Hive built-in ORC 9213 9334 171 0.1 8786.1 1.0X +Native ORC MR 4514 4529 20 0.2 4305.0 2.0X +Native ORC Vectorized 1521 1550 41 0.7 1450.8 6.1X OpenJDK 64-Bit Server VM 21.0.1+12-LTS on Linux 5.15.0-1053-azure AMD EPYC 7763 64-Core Processor Nested Struct Scan with 10 Elements, 30 Fields: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------------ -Hive built-in ORC 11315 11346 43 0.1 10791.2 1.0X -Native ORC MR 5636 5742 150 0.2 5374.9 2.0X -Native ORC Vectorized 1852 1914 88 0.6 1766.0 6.1X +Hive built-in ORC 9739 9740 2 0.1 9287.4 1.0X +Native ORC MR 5553 5613 84 0.2 5296.0 1.8X +Native ORC Vectorized 2090 2108 26 0.5 1993.1 4.7X diff --git a/sql/hive/benchmarks/OrcReadBenchmark-results.txt b/sql/hive/benchmarks/OrcReadBenchmark-results.txt index db9a2a17813cf..1b6f99d51dfb3 100644 --- a/sql/hive/benchmarks/OrcReadBenchmark-results.txt +++ b/sql/hive/benchmarks/OrcReadBenchmark-results.txt @@ -6,49 +6,49 @@ OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1053-azure AMD EPYC 7763 64-Core Processor SQL Single TINYINT Column Scan: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Hive built-in ORC 661 739 72 23.8 42.0 1.0X -Native ORC MR 814 822 11 19.3 51.7 0.8X -Native ORC Vectorized 85 107 10 184.4 5.4 7.8X +Hive built-in ORC 713 751 35 22.1 45.3 1.0X +Native ORC MR 740 774 29 21.2 47.1 1.0X +Native ORC Vectorized 91 109 14 173.0 5.8 7.8X OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1053-azure AMD EPYC 7763 64-Core Processor SQL Single SMALLINT Column Scan: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Hive built-in ORC 686 741 76 22.9 43.6 1.0X -Native ORC MR 746 770 22 21.1 47.4 0.9X -Native ORC Vectorized 79 100 13 198.4 5.0 8.7X +Hive built-in ORC 674 725 86 23.3 42.9 1.0X +Native ORC MR 752 776 30 20.9 47.8 0.9X +Native ORC Vectorized 75 97 14 209.9 4.8 9.0X OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1053-azure AMD EPYC 7763 64-Core Processor SQL Single INT Column Scan: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Hive built-in ORC 713 724 10 22.1 45.3 1.0X -Native ORC MR 756 787 28 20.8 48.1 0.9X -Native ORC Vectorized 76 90 8 206.6 4.8 9.4X +Hive built-in ORC 643 657 14 24.5 40.9 1.0X +Native ORC MR 837 857 27 18.8 53.2 0.8X +Native ORC Vectorized 78 91 11 200.6 5.0 8.2X OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1053-azure AMD EPYC 7763 64-Core Processor SQL Single BIGINT Column Scan: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Hive built-in ORC 736 741 4 21.4 46.8 1.0X -Native ORC MR 795 798 5 19.8 50.5 0.9X -Native ORC Vectorized 106 126 17 148.0 6.8 6.9X +Hive built-in ORC 675 683 8 23.3 42.9 1.0X +Native ORC MR 885 896 17 17.8 56.3 0.8X +Native ORC Vectorized 110 128 14 143.3 7.0 6.2X OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1053-azure AMD EPYC 7763 64-Core Processor SQL Single FLOAT Column Scan: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Hive built-in ORC 864 867 3 18.2 54.9 1.0X -Native ORC MR 806 829 23 19.5 51.3 1.1X -Native ORC Vectorized 129 144 11 122.2 8.2 6.7X +Hive built-in ORC 753 766 14 20.9 47.9 1.0X +Native ORC MR 906 935 43 17.4 57.6 0.8X +Native ORC Vectorized 163 176 14 96.3 10.4 4.6X OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1053-azure AMD EPYC 7763 64-Core Processor SQL Single DOUBLE Column Scan: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Hive built-in ORC 776 793 23 20.3 49.4 1.0X -Native ORC MR 831 844 13 18.9 52.8 0.9X -Native ORC Vectorized 139 150 11 112.8 8.9 5.6X +Hive built-in ORC 814 831 27 19.3 51.7 1.0X +Native ORC MR 935 950 17 16.8 59.5 0.9X +Native ORC Vectorized 232 244 15 67.9 14.7 3.5X ================================================================================================ @@ -59,9 +59,9 @@ OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1053-azure AMD EPYC 7763 64-Core Processor Int and String Scan: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Hive built-in ORC 1402 1426 35 7.5 133.7 1.0X -Native ORC MR 1383 1391 11 7.6 131.9 1.0X -Native ORC Vectorized 521 552 33 20.1 49.7 2.7X +Hive built-in ORC 1636 1650 19 6.4 156.0 1.0X +Native ORC MR 1622 1622 1 6.5 154.7 1.0X +Native ORC Vectorized 768 773 8 13.7 73.2 2.1X ================================================================================================ @@ -72,15 +72,15 @@ OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1053-azure AMD EPYC 7763 64-Core Processor Partitioned Table: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Data column - Hive built-in ORC 823 862 43 19.1 52.3 1.0X -Data column - Native ORC MR 973 980 9 16.2 61.9 0.8X -Data column - Native ORC Vectorized 111 130 14 141.6 7.1 7.4X -Partition column - Hive built-in ORC 628 633 5 25.1 39.9 1.3X -Partition column - Native ORC MR 657 679 42 23.9 41.8 1.3X -Partition column - Native ORC Vectorized 32 42 6 493.3 2.0 25.8X -Both columns - Hive built-in ORC 909 913 4 17.3 57.8 0.9X -Both columns - Native ORC MR 1043 1050 10 15.1 66.3 0.8X -Both columns - Native ORC Vectorized 126 146 17 124.5 8.0 6.5X +Data column - Hive built-in ORC 789 801 16 19.9 50.1 1.0X +Data column - Native ORC MR 982 997 14 16.0 62.4 0.8X +Data column - Native ORC Vectorized 113 124 10 139.3 7.2 7.0X +Partition column - Hive built-in ORC 579 588 18 27.2 36.8 1.4X +Partition column - Native ORC MR 658 682 28 23.9 41.9 1.2X +Partition column - Native ORC Vectorized 31 40 6 500.6 2.0 25.1X +Both columns - Hive built-in ORC 840 841 1 18.7 53.4 0.9X +Both columns - Native ORC MR 1069 1131 88 14.7 67.9 0.7X +Both columns - Native ORC Vectorized 126 135 14 125.0 8.0 6.3X ================================================================================================ @@ -91,9 +91,9 @@ OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1053-azure AMD EPYC 7763 64-Core Processor Repeated String: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Hive built-in ORC 856 868 11 12.2 81.7 1.0X -Native ORC MR 748 757 12 14.0 71.3 1.1X -Native ORC Vectorized 125 134 7 83.9 11.9 6.9X +Hive built-in ORC 786 796 13 13.3 75.0 1.0X +Native ORC MR 789 790 0 13.3 75.3 1.0X +Native ORC Vectorized 124 132 14 84.5 11.8 6.3X ================================================================================================ @@ -104,25 +104,25 @@ OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1053-azure AMD EPYC 7763 64-Core Processor String with Nulls Scan (0.0%): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Hive built-in ORC 1408 1418 14 7.4 134.3 1.0X -Native ORC MR 1260 1265 8 8.3 120.1 1.1X -Native ORC Vectorized 304 322 14 34.5 29.0 4.6X +Hive built-in ORC 1455 1470 22 7.2 138.8 1.0X +Native ORC MR 1357 1375 27 7.7 129.4 1.1X +Native ORC Vectorized 379 390 11 27.7 36.1 3.8X OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1053-azure AMD EPYC 7763 64-Core Processor String with Nulls Scan (50.0%): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Hive built-in ORC 1193 1214 30 8.8 113.8 1.0X -Native ORC MR 1115 1120 7 9.4 106.4 1.1X -Native ORC Vectorized 327 364 37 32.1 31.1 3.7X +Hive built-in ORC 1209 1231 31 8.7 115.3 1.0X +Native ORC MR 1231 1236 6 8.5 117.4 1.0X +Native ORC Vectorized 412 431 22 25.4 39.3 2.9X OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1053-azure AMD EPYC 7763 64-Core Processor String with Nulls Scan (95.0%): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Hive built-in ORC 781 829 81 13.4 74.5 1.0X -Native ORC MR 787 789 4 13.3 75.0 1.0X -Native ORC Vectorized 142 169 20 73.9 13.5 5.5X +Hive built-in ORC 727 751 23 14.4 69.3 1.0X +Native ORC MR 773 786 12 13.6 73.8 0.9X +Native ORC Vectorized 154 174 16 68.0 14.7 4.7X ================================================================================================ @@ -133,25 +133,25 @@ OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1053-azure AMD EPYC 7763 64-Core Processor Single Column Scan from 100 columns: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Hive built-in ORC 417 450 29 2.5 397.7 1.0X -Native ORC MR 89 101 9 11.7 85.2 4.7X -Native ORC Vectorized 38 45 6 27.9 35.8 11.1X +Hive built-in ORC 552 585 30 1.9 526.9 1.0X +Native ORC MR 87 95 7 12.0 83.2 6.3X +Native ORC Vectorized 37 45 6 28.5 35.1 15.0X OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1053-azure AMD EPYC 7763 64-Core Processor Single Column Scan from 200 columns: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Hive built-in ORC 793 809 16 1.3 756.2 1.0X -Native ORC MR 105 122 12 10.0 99.8 7.6X -Native ORC Vectorized 55 73 11 19.2 52.2 14.5X +Hive built-in ORC 1028 1032 6 1.0 980.2 1.0X +Native ORC MR 101 118 13 10.3 96.7 10.1X +Native ORC Vectorized 52 61 5 20.0 49.9 19.6X OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1053-azure AMD EPYC 7763 64-Core Processor Single Column Scan from 300 columns: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Hive built-in ORC 1155 1175 28 0.9 1101.9 1.0X -Native ORC MR 116 138 16 9.1 110.4 10.0X -Native ORC Vectorized 65 76 10 16.2 61.8 17.8X +Hive built-in ORC 1511 1513 3 0.7 1440.8 1.0X +Native ORC MR 120 135 11 8.7 114.7 12.6X +Native ORC Vectorized 63 80 13 16.7 60.0 24.0X ================================================================================================ @@ -162,33 +162,33 @@ OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1053-azure AMD EPYC 7763 64-Core Processor Single Struct Column Scan with 10 Fields: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Hive built-in ORC 610 619 13 1.7 581.3 1.0X -Native ORC MR 292 312 23 3.6 278.2 2.1X -Native ORC Vectorized 109 145 19 9.6 104.1 5.6X +Hive built-in ORC 573 612 51 1.8 546.1 1.0X +Native ORC MR 215 227 13 4.9 205.0 2.7X +Native ORC Vectorized 96 104 9 10.9 91.4 6.0X OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1053-azure AMD EPYC 7763 64-Core Processor Single Struct Column Scan with 100 Fields: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------- -Hive built-in ORC 5028 5055 38 0.2 4794.7 1.0X -Native ORC MR 1898 1953 78 0.6 1810.3 2.6X -Native ORC Vectorized 1127 1137 15 0.9 1074.4 4.5X +Hive built-in ORC 4154 4167 18 0.3 3961.8 1.0X +Native ORC MR 1588 1623 50 0.7 1514.4 2.6X +Native ORC Vectorized 1027 1082 78 1.0 979.5 4.0X OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1053-azure AMD EPYC 7763 64-Core Processor Single Struct Column Scan with 300 Fields: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------- -Hive built-in ORC 14325 14352 39 0.1 13661.2 1.0X -Native ORC MR 5066 5078 16 0.2 4831.8 2.8X -Native ORC Vectorized 5127 5211 118 0.2 4889.6 2.8X +Hive built-in ORC 12060 12116 79 0.1 11501.7 1.0X +Native ORC MR 4947 5126 253 0.2 4717.6 2.4X +Native ORC Vectorized 6097 6098 2 0.2 5814.5 2.0X OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1053-azure AMD EPYC 7763 64-Core Processor Single Struct Column Scan with 600 Fields: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------- -Hive built-in ORC 27081 27119 54 0.0 25826.3 1.0X -Native ORC MR 11845 11877 45 0.1 11296.5 2.3X -Native ORC Vectorized 11943 12080 194 0.1 11389.5 2.3X +Hive built-in ORC 26694 26816 172 0.0 25457.7 1.0X +Native ORC MR 11653 11761 153 0.1 11112.7 2.3X +Native ORC Vectorized 12045 12302 364 0.1 11487.0 2.2X ================================================================================================ @@ -199,24 +199,24 @@ OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1053-azure AMD EPYC 7763 64-Core Processor Nested Struct Scan with 10 Elements, 10 Fields: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------------ -Hive built-in ORC 4996 5012 23 0.2 4764.8 1.0X -Native ORC MR 1905 1908 4 0.6 1816.9 2.6X -Native ORC Vectorized 630 654 30 1.7 600.8 7.9X +Hive built-in ORC 4545 4792 349 0.2 4334.3 1.0X +Native ORC MR 2351 2356 7 0.4 2242.4 1.9X +Native ORC Vectorized 661 671 17 1.6 630.4 6.9X OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1053-azure AMD EPYC 7763 64-Core Processor Nested Struct Scan with 30 Elements, 10 Fields: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------------ -Hive built-in ORC 14567 15168 851 0.1 13892.1 1.0X -Native ORC MR 4664 4728 90 0.2 4447.8 3.1X -Native ORC Vectorized 1600 1611 15 0.7 1526.3 9.1X +Hive built-in ORC 13950 14004 77 0.1 13303.4 1.0X +Native ORC MR 4692 4693 1 0.2 4475.1 3.0X +Native ORC Vectorized 1530 1532 3 0.7 1459.4 9.1X OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1053-azure AMD EPYC 7763 64-Core Processor Nested Struct Scan with 10 Elements, 30 Fields: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------------ -Hive built-in ORC 14291 14330 55 0.1 13628.9 1.0X -Native ORC MR 6182 6236 77 0.2 5895.3 2.3X -Native ORC Vectorized 2126 2227 142 0.5 2027.6 6.7X +Hive built-in ORC 11597 11978 540 0.1 11059.6 1.0X +Native ORC MR 5463 5600 193 0.2 5209.9 2.1X +Native ORC Vectorized 1924 2042 168 0.5 1834.6 6.0X diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala index 32fbd4abdbfde..870e71e17cda0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala @@ -351,4 +351,28 @@ class HiveOrcSourceSuite extends OrcSuite with TestHiveSingleton { val df = readResourceOrcFile("test-data/TestStringDictionary.testRowIndex.orc") assert(df.where("str < 'row 001000'").count() === 1000) } + + Seq("NONE", "SNAPPY", "ZLIB", "LZ4", "LZO", "ZSTD").foreach { compression => + test(s"SPARK-46742: Read and write with $compression compressions") { + Seq(true, false).foreach { convertMetastore => + withSQLConf(HiveUtils.CONVERT_METASTORE_ORC.key -> s"$convertMetastore") { + withTempDir { dir => + withTable("orc_tbl1") { + val orcTblStatement1 = + s""" + |CREATE TABLE orc_tbl1( + | c1 int, + | c2 string) + |STORED AS orc + |TBLPROPERTIES ("orc.compress"="$compression") + |LOCATION '${s"${dir.getCanonicalPath}"}'""".stripMargin + sql(orcTblStatement1) + sql("INSERT INTO TABLE orc_tbl1 VALUES (1, 'orc1'), (2, 'orc2')") + checkAnswer(sql("SELECT * FROM orc_tbl1"), (1 to 2).map(i => Row(i, s"orc$i"))) + } + } + } + } + } + } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala index c6ff793141019..a1095ce58a061 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala @@ -25,7 +25,6 @@ import org.apache.spark.SparkConf import org.apache.spark.benchmark.Benchmark import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.execution.benchmark.SqlBasedBenchmark -import org.apache.spark.sql.execution.datasources.orc.OrcCompressionCodec import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -47,7 +46,6 @@ object OrcReadBenchmark extends SqlBasedBenchmark { override def getSparkSession: SparkSession = { val conf = new SparkConf() - conf.set("orc.compression", OrcCompressionCodec.SNAPPY.name()) val sparkSession = SparkSession.builder() .master("local[1]")