
Conversation


@pull pull bot commented Oct 28, 2022

See Commits and Changes for more details.


Created by pull[bot]

Can you help keep this open source service alive? 💖 Please sponsor : )

dongjoon-hyun and others added 6 commits October 28, 2022 19:05
### What changes were proposed in this pull request?

This is a follow-up of #37671.

### Why are the changes needed?

Since #37671 added `openpyxl` to the PySpark test environments and re-enabled the `test_to_excel` test, we need to add it to `requirements.txt` explicitly as a PySpark test dependency.

### Does this PR introduce _any_ user-facing change?

No. This is a test dependency.

### How was this patch tested?

Manually.

Closes #38425 from dongjoon-hyun/SPARK-40229.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Yikun Jiang <[email protected]>
…lass` by reusing the `SparkFunSuite#checkError`

### What changes were proposed in this pull request?
This PR aims to refactor the `AnalysisTest#assertAnalysisErrorClass` method by reusing the `checkError` method from `SparkFunSuite`.

As part of this, the signature of the `AnalysisTest#assertAnalysisErrorClass` method changes from

```
protected def assertAnalysisErrorClass(
      inputPlan: LogicalPlan,
      expectedErrorClass: String,
      expectedMessageParameters: Map[String, String],
      caseSensitive: Boolean = true,
      line: Int = -1,
      pos: Int = -1): Unit
```
to

```
protected def assertAnalysisErrorClass(
      inputPlan: LogicalPlan,
      expectedErrorClass: String,
      expectedMessageParameters: Map[String, String],
      queryContext: Array[QueryContext] = Array.empty,
      caseSensitive: Boolean = true): Unit
```

so that assertions pass a `queryContext` instead of `line` + `pos` when the error position needs to be checked.
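
For illustration, a call to the refactored helper might look like the sketch below; the SQL text, error class, and message parameters are placeholders for this example, not assertions taken from this PR.

```scala
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

// Sketch only: a test in a suite that mixes in AnalysisTest. The error class and
// parameters are illustrative placeholders, not cases added by this PR.
test("unresolved column is reported via an error class") {
  assertAnalysisErrorClass(
    inputPlan = CatalystSqlParser.parsePlan("SELECT nonexistent FROM t"),
    expectedErrorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
    expectedMessageParameters = Map(
      "objectName" -> "`nonexistent`",
      "proposal" -> "`id`"),
    // queryContext replaces the old line/pos pair; leave it empty when the error's
    // position in the SQL text is not part of the assertion.
    queryContext = Array.empty)
}
```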

### Why are the changes needed?
`assertAnalysisErrorClass` and `checkError` do the same work.

### Does this PR introduce _any_ user-facing change?
No, this is test-only.

### How was this patch tested?

- Pass GitHub Actions

Closes #38413 from LuciferYang/simplify-assertAnalysisErrorClass.

Authored-by: yangjie01 <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
### What changes were proposed in this pull request?
This PR aims to replace `intercept`-based message assertions with error class checks in `PlanResolutionSuite`.
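
As a rough sketch of the pattern (not an excerpt from the suite), an assertion on the raw message obtained via `intercept` becomes a check of the error class and its message parameters; the statement, helper, and error class below are assumptions for illustration only.

```scala
// Hypothetical example of the migration pattern; the statement, error class, and
// parameters are placeholders rather than test cases added by this PR.
val e = intercept[AnalysisException] {
  parseAndResolve("DESC TABLE does_not_exist")
}
checkError(
  exception = e,
  errorClass = "TABLE_OR_VIEW_NOT_FOUND",
  parameters = Map("relationName" -> "`does_not_exist`"))
```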

### Why are the changes needed?
Checking error classes instead of raw exception messages improves coverage of the error framework.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
By running the modified test suite:
```
$ build/sbt "test:testOnly *PlanResolutionSuite"
```

Closes #38421 from panbingkun/SPARK-40889.

Authored-by: panbingkun <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
…rquetFileFormat on producing columnar output

### What changes were proposed in this pull request?

We move the decision about supporting columnar output based on WSCG one level up, from ParquetFileFormat / OrcFileFormat to FileSourceScanExec, and pass it as a new required option to ParquetFileFormat / OrcFileFormat. The semantics are now as follows:
* `ParquetFileFormat.supportBatch` and `OrcFileFormat.supportBatch` return whether the format **can** produce columnar output, not necessarily whether it **will**.
* To return columnar output, the option `FileFormat.OPTION_RETURNING_BATCH` needs to be passed to `buildReaderWithPartitionValues` in these two file formats. It should only be set to `true` if `supportBatch` is also `true`, but it can be set to `false` if we don't want columnar output anyway. This way `FileSourceScanExec` can set it to `false` when there are more than 100 columns (the WSCG limit), and `ParquetFileFormat` / `OrcFileFormat` don't have to concern themselves with WSCG limits (see the sketch after this list).
* To avoid omitting it by accident, the option is made required. Making it required means updating a few call sites, but the error that results from forgetting it is very obscure; it is better to fail early and explicitly here.
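
As a rough sketch (reusing names from the snippets quoted later in this description; the surrounding code is illustrative, not the exact diff), the scan makes the columnar decision once and forwards it explicitly through the new option:

```scala
// Sketch: the scan decides once, using its full output schema (including metadata
// columns), and passes that decision down to the reader explicitly.
// FileFormat.OPTION_RETURNING_BATCH is the option introduced by this PR.
val readerOptions = options +
  (FileFormat.OPTION_RETURNING_BATCH -> supportsColumnar.toString)

relation.fileFormat.buildReaderWithPartitionValues(
  sparkSession = relation.sparkSession,
  dataSchema = relation.dataSchema,
  partitionSchema = relation.partitionSchema,
  requiredSchema = requiredSchema,
  filters = pushedDownFilters,
  options = readerOptions,
  hadoopConf = hadoopConf)
```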

### Why are the changes needed?

The following explains the issue for `ParquetFileFormat`; `OrcFileFormat` had exactly the same issue.

`java.lang.ClassCastException: org.apache.spark.sql.vectorized.ColumnarBatch cannot be cast to org.apache.spark.sql.catalyst.InternalRow` was being thrown because ParquetReader was outputting columnar batches, while FileSourceScanExec expected row output.

The mismatch comes from the fact that `ParquetFileFormat.supportBatch` depends on `WholeStageCodegenExec.isTooManyFields(conf, schema)`, where the threshold is 100 fields.

When this is used in `FileSourceScanExec`:
```
override lazy val supportsColumnar: Boolean = {
  relation.fileFormat.supportBatch(relation.sparkSession, schema)
}
```
the `schema` comes from output attributes, which includes extra metadata attributes.

However, inside `ParquetFileFormat.buildReaderWithPartitionValues` it was calculated again as
```
      relation.fileFormat.buildReaderWithPartitionValues(
        sparkSession = relation.sparkSession,
        dataSchema = relation.dataSchema,
        partitionSchema = relation.partitionSchema,
        requiredSchema = requiredSchema,
        filters = pushedDownFilters,
        options = options,
        hadoopConf = hadoopConf
...
val resultSchema = StructType(requiredSchema.fields ++ partitionSchema.fields)
...
val returningBatch = supportBatch(sparkSession, resultSchema)
```

Here, `requiredSchema` and `partitionSchema` don't include the metadata columns:
```
FileSourceScanExec: output: List(c1#4608L, c2#4609L, ..., c100#4707L, file_path#6388)
FileSourceScanExec: dataSchema: StructType(StructField(c1,LongType,true),StructField(c2,LongType,true),...,StructField(c100,LongType,true))
FileSourceScanExec: partitionSchema: StructType()
FileSourceScanExec: requiredSchema: StructType(StructField(c1,LongType,true),StructField(c2,LongType,true),...,StructField(c100,LongType,true))
```

Columns like `file_path#6388` are added by the scan and contain metadata produced by the scan, not by the file reader, which only concerns itself with what is inside the file.

### Does this PR introduce _any_ user-facing change?

Not a public API change, but it is now required to pass `FileFormat.OPTION_RETURNING_BATCH` in `options` to `ParquetFileFormat.buildReaderWithPartitionValues`. The only user of this API in Apache Spark is `FileSourceScanExec`.

### How was this patch tested?

Tests added

Closes #38397 from juliuszsompolski/SPARK-40918.

Authored-by: Juliusz Sompolski <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
### What changes were proposed in this pull request?

The messages returned by `allGather` may be overwritten by subsequent barrier API calls, e.g.,

``` scala
      val messages: Array[String] = context.allGather("ABC")
      context.barrier()
```

The resulting `messages` may be `Array("", "")`, while we expect `Array("ABC", "ABC")`.

The root cause is that, in local mode, the [messages returned by allGather](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala#L102) point to the [original message](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/BarrierCoordinator.scala#L107). When a subsequent barrier API call changes that message, the messages already returned by `allGather` change with it, so users cannot get the correct result.

This PR fixes the issue by sending back a cloned copy of the messages.
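
A minimal sketch of the idea behind the fix (a simplified stand-in, not the actual `BarrierCoordinator` code): reply with a defensive copy so that later barrier calls cannot mutate what `allGather` already handed back.

```scala
import org.apache.spark.rpc.RpcCallContext

// Sketch only: reply to every waiting task with a cloned copy of the gathered
// messages, so reusing or clearing the original array afterwards does not change
// what callers of allGather() already received.
private def replyToAll(requesters: Seq[RpcCallContext], messages: Array[String]): Unit = {
  requesters.foreach(_.reply(messages.clone()))
}
```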

### Why are the changes needed?

The bug described above may block external Spark ML libraries that depend heavily on the barrier API for synchronization. If the barrier mechanism cannot guarantee the correctness of its APIs, those libraries cannot work reliably.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?

I added a unit test; with this PR, the unit test passes.

Closes #38410 from wbo4958/allgather-issue.

Authored-by: Bobby Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
…ed if `pandas` doesn't exist

### What changes were proposed in this pull request?

This PR aims to skip `pyspark-connect` unit tests when `pandas` is unavailable.

### Why are the changes needed?

**BEFORE**
```
% python/run-tests --modules pyspark-connect
Running PySpark tests. Output is in /Users/dongjoon/APACHE/spark-merge/python/unit-tests.log
Will test against the following Python executables: ['python3.9']
Will test the following Python modules: ['pyspark-connect']
python3.9 python_implementation is CPython
python3.9 version is: Python 3.9.15
Starting test(python3.9): pyspark.sql.tests.connect.test_connect_plan_only (temp output: /Users/dongjoon/APACHE/spark-merge/python/target/f14573f1-131f-494a-a015-8b4762219fb5/python3.9__pyspark.sql.tests.connect.test_connect_plan_only__86sd4pxg.log)
Starting test(python3.9): pyspark.sql.tests.connect.test_connect_column_expressions (temp output: /Users/dongjoon/APACHE/spark-merge/python/target/51391499-d21a-4c1d-8b79-6ac52859a4c9/python3.9__pyspark.sql.tests.connect.test_connect_column_expressions__kn__9aur.log)
Starting test(python3.9): pyspark.sql.tests.connect.test_connect_basic (temp output: /Users/dongjoon/APACHE/spark-merge/python/target/7854cbef-e40d-4090-a37d-5a5314eb245f/python3.9__pyspark.sql.tests.connect.test_connect_basic__i1rutevd.log)
Starting test(python3.9): pyspark.sql.tests.connect.test_connect_select_ops (temp output: /Users/dongjoon/APACHE/spark-merge/python/target/6f947453-7481-4891-81b0-169aaac8c6ee/python3.9__pyspark.sql.tests.connect.test_connect_select_ops__5sxao0ji.log)
Traceback (most recent call last):
  File "/opt/homebrew/Cellar/python3.9/3.9.15/Frameworks/Python.framework/Versions/3.9/lib/python3.9/runpy.py", line 197, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/opt/homebrew/Cellar/python3.9/3.9.15/Frameworks/Python.framework/Versions/3.9/lib/python3.9/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/Users/dongjoon/APACHE/spark-merge/python/pyspark/sql/tests/connect/test_connect_basic.py", line 22, in <module>
    import pandas
ModuleNotFoundError: No module named 'pandas'
```

**AFTER**
```
% python/run-tests --modules pyspark-connect
Running PySpark tests. Output is in /Users/dongjoon/APACHE/spark-merge/python/unit-tests.log
Will test against the following Python executables: ['python3.9']
Will test the following Python modules: ['pyspark-connect']
python3.9 python_implementation is CPython
python3.9 version is: Python 3.9.15
Starting test(python3.9): pyspark.sql.tests.connect.test_connect_basic (temp output: /Users/dongjoon/APACHE/spark-merge/python/target/571609c0-3070-476c-afbe-56e215eb5647/python3.9__pyspark.sql.tests.connect.test_connect_basic__4e9k__5x.log)
Starting test(python3.9): pyspark.sql.tests.connect.test_connect_column_expressions (temp output: /Users/dongjoon/APACHE/spark-merge/python/target/4a30d035-e392-4ad2-ac10-5d8bc5421321/python3.9__pyspark.sql.tests.connect.test_connect_column_expressions__c9x39tvp.log)
Starting test(python3.9): pyspark.sql.tests.connect.test_connect_plan_only (temp output: /Users/dongjoon/APACHE/spark-merge/python/target/eea0b5db-9a92-4fbb-912d-a59daaf73f8e/python3.9__pyspark.sql.tests.connect.test_connect_plan_only__0p9ivnod.log)
Starting test(python3.9): pyspark.sql.tests.connect.test_connect_select_ops (temp output: /Users/dongjoon/APACHE/spark-merge/python/target/6069c664-afd9-4a3c-a0cc-f707577e039e/python3.9__pyspark.sql.tests.connect.test_connect_select_ops__sxzrtiqa.log)
Finished test(python3.9): pyspark.sql.tests.connect.test_connect_column_expressions (1s) ... 2 tests were skipped
Finished test(python3.9): pyspark.sql.tests.connect.test_connect_select_ops (1s) ... 2 tests were skipped
Finished test(python3.9): pyspark.sql.tests.connect.test_connect_plan_only (1s) ... 10 tests were skipped
Finished test(python3.9): pyspark.sql.tests.connect.test_connect_basic (1s) ... 6 tests were skipped
Tests passed in 1 seconds

Skipped tests in pyspark.sql.tests.connect.test_connect_basic with python3.9:
      test_limit_offset (pyspark.sql.tests.connect.test_connect_basic.SparkConnectTests) ... skip (0.002s)
      test_schema (pyspark.sql.tests.connect.test_connect_basic.SparkConnectTests) ... skip (0.000s)
      test_simple_datasource_read (pyspark.sql.tests.connect.test_connect_basic.SparkConnectTests) ... skip (0.000s)
      test_simple_explain_string (pyspark.sql.tests.connect.test_connect_basic.SparkConnectTests) ... skip (0.000s)
      test_simple_read (pyspark.sql.tests.connect.test_connect_basic.SparkConnectTests) ... skip (0.000s)
      test_simple_udf (pyspark.sql.tests.connect.test_connect_basic.SparkConnectTests) ... skip (0.000s)

Skipped tests in pyspark.sql.tests.connect.test_connect_column_expressions with python3.9:
      test_column_literals (pyspark.sql.tests.connect.test_connect_column_expressions.SparkConnectColumnExpressionSuite) ... skip (0.000s)
      test_simple_column_expressions (pyspark.sql.tests.connect.test_connect_column_expressions.SparkConnectColumnExpressionSuite) ... skip (0.000s)

Skipped tests in pyspark.sql.tests.connect.test_connect_plan_only with python3.9:
      test_all_the_plans (pyspark.sql.tests.connect.test_connect_plan_only.SparkConnectTestsPlanOnly) ... skip (0.002s)
      test_datasource_read (pyspark.sql.tests.connect.test_connect_plan_only.SparkConnectTestsPlanOnly) ... skip (0.000s)
      test_deduplicate (pyspark.sql.tests.connect.test_connect_plan_only.SparkConnectTestsPlanOnly) ... skip (0.001s)
      test_filter (pyspark.sql.tests.connect.test_connect_plan_only.SparkConnectTestsPlanOnly) ... skip (0.000s)
      test_limit (pyspark.sql.tests.connect.test_connect_plan_only.SparkConnectTestsPlanOnly) ... skip (0.000s)
      test_offset (pyspark.sql.tests.connect.test_connect_plan_only.SparkConnectTestsPlanOnly) ... skip (0.000s)
      test_relation_alias (pyspark.sql.tests.connect.test_connect_plan_only.SparkConnectTestsPlanOnly) ... skip (0.000s)
      test_sample (pyspark.sql.tests.connect.test_connect_plan_only.SparkConnectTestsPlanOnly) ... skip (0.001s)
      test_simple_project (pyspark.sql.tests.connect.test_connect_plan_only.SparkConnectTestsPlanOnly) ... skip (0.000s)
      test_simple_udf (pyspark.sql.tests.connect.test_connect_plan_only.SparkConnectTestsPlanOnly) ... skip (0.000s)

Skipped tests in pyspark.sql.tests.connect.test_connect_select_ops with python3.9:
      test_join_with_join_type (pyspark.sql.tests.connect.test_connect_select_ops.SparkConnectToProtoSuite) ... skip (0.002s)
      test_select_with_columns_and_strings (pyspark.sql.tests.connect.test_connect_select_ops.SparkConnectToProtoSuite) ... skip (0.000s)
```

### Does this PR introduce _any_ user-facing change?

No. This is a test-only PR.

### How was this patch tested?

Manually, by running the following:
```
$ pip3 uninstall pandas
$ python/run-tests --modules pyspark-connect
```

Closes #38426 from dongjoon-hyun/SPARK-40951.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
@pull pull bot merged commit e3b720f into huangxiaopingRD:master Oct 29, 2022
pull bot pushed a commit that referenced this pull request Aug 19, 2025
…onicalized expressions

### What changes were proposed in this pull request?

Make `PullOutNonDeterministic` use canonicalized expressions to dedup group and aggregate expressions. This affects PySpark UDFs in particular. Example:

```
from pyspark.sql.functions import col, avg, udf

pythonUDF = udf(lambda x: x).asNondeterministic()

spark.range(10)\
.selectExpr("id", "id % 3 as value")\
.groupBy(pythonUDF(col("value")))\
.agg(avg("id"), pythonUDF(col("value")))\
.explain(extended=True)
```

Currently, this results in a plan like the following:

```
Aggregate [_nondeterministic#15], [_nondeterministic#15 AS dummyNondeterministicUDF(value)#12, avg(id#0L) AS avg(id)#13, dummyNondeterministicUDF(value#6L)#8 AS dummyNondeterministicUDF(value)#14]
+- Project [id#0L, value#6L, dummyNondeterministicUDF(value#6L)#7 AS _nondeterministic#15]
   +- Project [id#0L, (id#0L % cast(3 as bigint)) AS value#6L]
      +- Range (0, 10, step=1, splits=Some(2))
```

and then it throws:

```
[MISSING_AGGREGATION] The non-aggregating expression "value" is based on columns which are not participating in the GROUP BY clause. Add the columns or the expression to the GROUP BY, aggregate the expression, or use "any_value(value)" if you do not care which of the values within a group is returned. SQLSTATE: 42803
```

- How canonicalization fixes this:
  - Nondeterministic `PythonUDF` expressions always carry distinct `resultId`s per UDF, so the two occurrences of the UDF never compare equal.
  - The fix is to canonicalize the expressions when matching. Canonicalization sets the `resultId`s to -1, allowing us to dedup the `PythonUDF` expressions (see the sketch after this list).
- For deterministic UDFs this rule does not apply; the "Post Analysis" batch extracts and deduplicates the expressions, as expected.
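
A rough sketch of the dedup idea (illustrative only, not the rule's actual code): key the extracted nondeterministic expressions by their canonicalized form, so both `PythonUDF` occurrences map to the same alias.

```scala
import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, NamedExpression}

// Sketch only: canonicalized forms ignore cosmetic differences such as ExprIds /
// resultIds, so both occurrences of the nondeterministic UDF share one alias.
def pullOutAliases(nonDeterministic: Seq[Expression]): Map[Expression, NamedExpression] =
  nonDeterministic
    .map(e => e.canonicalized -> Alias(e, "_nondeterministic")())
    .toMap
```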

### Why are the changes needed?

- The output of the query with the fix applied still makes sense: the nondeterministic UDF is invoked only once, in the Project node.

### Does this PR introduce _any_ user-facing change?

Yes, the change is additive: queries that previously threw an error now run.

### How was this patch tested?

- Added a unit test.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#52061 from benrobby/adhoc-fix-pull-out-nondeterministic.

Authored-by: Ben Hurdelhey <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
