Conversation


@pull pull bot commented Oct 23, 2022

See Commits and Changes for more details.


Created by pull[bot]

Can you help keep this open source service alive? 💖 Please sponsor : )

### What changes were proposed in this pull request?

Bump Jackson Databind from 2.13.4.1 to 2.13.4.2

### Why are the changes needed?

There is a regression affecting Gradle in 2.13.4.1 that was fixed in 2.13.4.2:
FasterXML/jackson-databind#3627

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing UT.

Closes #38355 from pan3793/SPARK-40886.

Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
@pull pull bot added the ⤵️ pull label Oct 23, 2022
@github-actions github-actions bot added the BUILD label Oct 23, 2022
…o extract event time from the window column

### What changes were proposed in this pull request?

This PR introduces a window_time function to extract the streaming event time from a window column produced by the window aggregating operators. This is one step in a sequence of fixes required to add support for multiple stateful operators in Spark Structured Streaming, as described in https://issues.apache.org/jira/browse/SPARK-40821

### Why are the changes needed?

The window_time function is a convenience function to compute the correct event time for window aggregate records. Such records, produced by window aggregating operators, have no explicit event time but rather a window column of type StructType { start: TimestampType, end: TimestampType }, where start is inclusive and end is exclusive. The correct event time for such a record is window.end - 1. The event time is necessary when chaining other stateful operators after the window aggregating operators (see the sketch below).
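
For illustration, a minimal sketch of the intended usage (assuming Spark 3.4+ where the new function is available; the data and column names here are made up):

```scala
import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, count, window, window_time}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Batch data for illustration; the same shape applies to a streaming DataFrame.
val events = Seq(
  (Timestamp.valueOf("2022-10-24 10:01:00"), "a"),
  (Timestamp.valueOf("2022-10-24 10:02:00"), "b")
).toDF("eventTime", "value")

// The window aggregation produces a `window` struct column (start/end) but no event time.
val counts = events
  .groupBy(window(col("eventTime"), "5 minutes"))
  .agg(count("*").as("cnt"))

// window_time recovers the record's event time (window.end - 1, per the description above),
// so another stateful operator can be chained after the window aggregation.
counts.select(window_time(col("window")).as("eventTime"), col("cnt")).show(truncate = false)
```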

### Does this PR introduce _any_ user-facing change?

Yes: The PR introduces a new window_time SQL function for both Scala and Python APIs.

### How was this patch tested?

Added new unit tests.

Closes #38288 from alex-balikov/SPARK-40821-time-window.

Authored-by: Alex Balikov <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
bjornjorgensen and others added 4 commits October 24, 2022 10:32
### What changes were proposed in this pull request?
Upgrade fabric8io - kubernetes-client from 6.1.1 to 6.2.0

### Why are the changes needed?

[Release notes](https://github.com/fabric8io/kubernetes-client/releases/tag/v6.2.0)
[Snakeyaml version should be updated to mitigate CVE-2022-28857](fabric8io/kubernetes-client#4383)

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Pass GA

Closes #38348 from bjornjorgensen/kubernetes-client6.2.0.

Authored-by: Bjørn <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
### What changes were proposed in this pull request?
Reimplement `crosstab` with dataframe operations

### Why are the changes needed?
1, do not truncate the SQL plan;
2, much more scalable;
3, the existing implementation (added in v1.5.0) collects distinct `col1, col2` pairs to the driver, while `pivot` (added in v2.4.0) only collects the distinct values of `col2`, which is a much smaller set (see the sketch below);
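
A rough sketch of the pivot-based shape (illustrative only, not the exact internal implementation; the sample data is made up):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq(("a", 1), ("a", 2), ("b", 1)).toDF("col1", "col2")

// Only the distinct values of col2 are needed on the driver (to name the pivoted columns);
// the distinct (col1, col2) pairs never have to be collected.
val crosstab = df.groupBy(col("col1")).pivot("col2").count().na.fill(0L)
crosstab.show()
```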

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing UTs and manually check

Closes #38340 from zhengruifeng/sql_stat_crosstab.

Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
### What changes were proposed in this pull request?
Reimplement `summary` with dataframe operations

### Why are the changes needed?
1, do not truncate the sql plan any more;
2, enable sql optimization like column pruning:

```
scala> val df = spark.range(0, 3, 1, 10).withColumn("value", lit("str"))
df: org.apache.spark.sql.DataFrame = [id: bigint, value: string]

scala> df.summary("max", "50%").show
+-------+---+-----+
|summary| id|value|
+-------+---+-----+
|    max|  2|  str|
|    50%|  1| null|
+-------+---+-----+

scala> df.summary("max", "50%").select("id").show
+---+
| id|
+---+
|  2|
|  1|
+---+

scala> df.summary("max", "50%").select("id").queryExecution.optimizedPlan
res4: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [element_at(id#367, summary#376, None, false) AS id#371]
+- Generate explode([max,50%]), false, [summary#376]
   +- Aggregate [map(max, cast(max(id#153L) as string), 50%, cast(percentile_approx(id#153L, [0.5], 10000, 0, 0)[0] as string)) AS id#367]
      +- Range (0, 3, step=1, splits=Some(10))

```

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
existing UTs and manually check

Closes #38346 from zhengruifeng/sql_stat_summary.

Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
### What changes were proposed in this pull request?

Purging old entries in both the offset log and commit log will be done asynchronously.

For every micro-batch, older entries in both the offset log and the commit log are deleted, so that these logs do not grow without bound. Please see the logic here:

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L539

The time spent performing these log purges is grouped with the “walCommit” execution time in the StreamingProgressListener metrics. Around two thirds of the “walCommit” execution time is spent performing these purge operations, so making them asynchronous will also reduce latency. Also, we do not necessarily need to perform the purges every micro-batch. When the purges are executed asynchronously, they do not block micro-batch execution, and we don’t need to start another purge until the current one has finished; the purges can happen essentially in the background. We just have to synchronize the purges with the offset WAL commits and completion commits so that we don’t have concurrent modifications of the offset log and commit log (see the sketch below).
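
A minimal sketch of the general pattern (illustrative only, not the actual `MicroBatchExecution` code; the `MetadataLog` trait and `AsyncLogPurger` class below are placeholders):

```scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicBoolean

// Stand-in for the offset/commit metadata logs; only `purge` matters for this sketch.
trait MetadataLog { def purge(thresholdBatchId: Long): Unit }

class AsyncLogPurger(offsetLog: MetadataLog, commitLog: MetadataLog) {
  private val purgeExecutor = Executors.newSingleThreadExecutor()
  private val purgeRunning = new AtomicBoolean(false)

  // Called once per micro-batch; does nothing if a purge is already in flight,
  // so purges never block the batch and never pile up.
  def maybePurgeAsync(thresholdBatchId: Long): Unit = {
    if (purgeRunning.compareAndSet(false, true)) {
      purgeExecutor.submit(new Runnable {
        override def run(): Unit = {
          try {
            // Synchronize with the WAL/completion commits to avoid concurrent
            // modification of the offset log and commit log.
            offsetLog.synchronized(offsetLog.purge(thresholdBatchId))
            commitLog.synchronized(commitLog.purge(thresholdBatchId))
          } finally {
            purgeRunning.set(false)
          }
        }
      })
    }
  }
}
```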

### Why are the changes needed?

Decrease microbatch processing latency

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit tests

Closes #38313 from jerrypeng/SPARK-40849.

Authored-by: Jerry Peng <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
zhengruifeng and others added 2 commits October 24, 2022 11:22
### What changes were proposed in this pull request?
remove unused imports

### Why are the changes needed?
```
[error] /home/runner/work/spark/spark/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala:24:78: Unused import
[error] import org.apache.spark.sql.catalyst.expressions.{Cast, ElementAt, EvalMode, GenericInternalRow}
[error]                                                                              ^
[error] /home/runner/work/spark/spark/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala:26:52: Unused import
[error] import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
[error]                                                    ^
[error] /home/runner/work/spark/spark/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala:31:38: Unused import
[error] import org.apache.spark.unsafe.types.UTF8String
[error]                                      ^
[error] three errors found
```

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
manually build

Closes #38362 from zhengruifeng/sql_clean_unused_imports.

Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
…on client

### What changes were proposed in this pull request?

Following up on #38276, this PR improves both the `distinct()` and `dropDuplicates()` DataFrame APIs in the Python client, both of which depend on the `Deduplicate` plan in the Connect proto (see the sketch below).
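
For reference, a sketch of the (classic Scala) DataFrame semantics these calls map to (sample data made up):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq((1, "a"), (1, "a"), (2, "b")).toDF("id", "value")

// distinct() is dropDuplicates() over all columns; both lower to a Deduplicate node.
val allColumns = df.distinct()
val bySubset = df.dropDuplicates("id")
allColumns.show()
bySubset.show()
```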

### Why are the changes needed?

Improve API coverage.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

UT

Closes #38327 from amaliujia/python_deduplicate.

Authored-by: Rui Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
allisonwang-db and others added 3 commits October 24, 2022 11:10
…edicates

<!--
Thanks for sending a pull request!  Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://spark.apache.org/contributing.html
  2. Ensure you have added or run the appropriate tests for your PR: https://spark.apache.org/developer-tools.html
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][SPARK-XXXX] Your PR title ...'.
  4. Be sure to keep the PR description updated to reflect all changes.
  5. Please write your PR title to summarize what this PR proposes.
  6. If possible, provide a concise example to reproduce the issue for a faster review.
  7. If you want to add a new configuration, please read the guideline first for naming configurations in
     'core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala'.
  8. If you want to add or modify an error type or message, please read the guideline first in
     'core/src/main/resources/error/README.md'.
-->

### What changes were proposed in this pull request?
<!--
Please clarify what changes you are proposing. The purpose of this section is to outline the changes and how this PR fixes the issue.
If possible, please consider writing useful notes for better and faster reviews in your PR. See the examples below.
  1. If you refactor some codes with changing classes, showing the class hierarchy will help reviewers.
  2. If you fix some SQL features, you can provide some references of other DBMSes.
  3. If there is design documentation, please add the link.
  4. If there is a discussion in the mailing list, please add the link.
-->
This PR supports correlated non-equality predicates in subqueries. It leverages the DecorrelateInnerQuery framework to decorrelate subqueries with non-equality predicates. DecorrelateInnerQuery inserts domain joins in the query plan and the rule RewriteCorrelatedScalarSubquery rewrites the domain joins into actual joins with the outer query.

Note, correlated non-equality predicates can lead to query plans with non-equality join conditions, which may be planned as a broadcast NL join or cartesian product.

### Why are the changes needed?
<!--
Please clarify why the changes are needed. For instance,
  1. If you propose a new API, clarify the use case for a new API.
  2. If you fix a bug, you can clarify why it is a bug.
-->
To improve subquery support in Spark.

### Does this PR introduce _any_ user-facing change?
<!--
Note that it means *any* user-facing change including all aspects such as the documentation fix.
If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible.
If possible, please also clarify if this is a user-facing change compared to the released Spark versions or within the unreleased branches such as master.
If no, write 'No'.
-->
Yes. Before this PR, Spark did not allow correlated non-equality predicates in subqueries.
For example:
```sql
SELECT (SELECT min(c2) FROM t2 WHERE t1.c1 > t2.c1) FROM t1
```
This will throw an exception: `Correlated column is not allowed in a non-equality predicate`

After this PR, this query can run successfully.

### How was this patch tested?
<!--
If tests were added, say they were added here. Please make sure to add some test cases that check the changes thoroughly including negative and positive cases if possible.
If it was tested in a way different from regular unit tests, please clarify how you tested step by step, ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future.
If tests were not added, please describe why they were not added and/or why it was difficult to add.
If benchmark tests were added, please run the benchmarks in GitHub Actions for the consistent environment, and the instructions could accord to: https://spark.apache.org/developer-tools.html#github-workflow-benchmarks.
-->
Unit tests and SQL query tests.

Closes #38135 from allisonwang-db/spark-36114-non-equality-pred.

Authored-by: allisonwang-db <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
…nSubquery

### What changes were proposed in this pull request?

This PR modifies the optimizer rule `OptimizeOneRowRelationSubquery` to always collapse projects and inline non-volatile expressions.

### Why are the changes needed?

SPARK-39699 made `CollapseProjects` more conservative. This has impacted correlated subqueries that Spark used to be able to support. For example, Spark used to be able to execute this correlated subquery:
```sql
SELECT (
  SELECT array_sort(a, (i, j) -> rank[i] - rank[j]) AS sorted
  FROM (SELECT MAP('a', 1, 'b', 2) rank)
) FROM t1
```
But after SPARK-39699, it will throw an exception `Unexpected operator Join Inner` because the projects inside the subquery can no longer be collapsed. We should always inline expressions if possible to support a broader range of correlated subqueries and avoid adding expensive domain joins.

### Does this PR introduce _any_ user-facing change?

Yes. It will allow Spark to execute more types of correlated subqueries.

### How was this patch tested?

Unit test.

Closes #38260 from allisonwang-db/spark-40800-inline-expr-subquery.

Authored-by: allisonwang-db <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
…rtifact to v3

### What changes were proposed in this pull request?
Upgrade actions/cache to v3 and actions/upload-artifact to v3

### Why are the changes needed?
- Since actions/cache v3: support moves from node 12 -> node 16 and the `set-output` warning is cleaned up
- Since actions/upload-artifact v3: support moves from node 12 -> node 16 and the `set-output` warning is cleaned up

### Does this PR introduce _any_ user-facing change?
No, dev only

### How was this patch tested?
CI passed

Closes #38353 from Yikun/SPARK-40881.

Authored-by: Yikun Jiang <[email protected]>
Signed-off-by: Yikun Jiang <[email protected]>
@github-actions github-actions bot added the INFRA label Oct 24, 2022
Yikun and others added 2 commits October 24, 2022 14:51
…on specified

### What changes were proposed in this pull request?
Upgrade actions/setup-java to v3 with distribution specified

### Why are the changes needed?

- The `distribution` is required after v2, so just keep `zulu` (the same distribution as v1): https://github.com/actions/setup-java/releases/tag/v2.0.0
- https://github.com/actions/setup-java/releases/tag/v3.0.0: Upgrade node
- https://github.com/actions/setup-java/releases/tag/v3.6.0: Cleanup set-output warning

### Does this PR introduce _any_ user-facing change?
No, dev only

### How was this patch tested?
CI passed

Closes #38354 from Yikun/SPARK-40882.

Authored-by: Yikun Jiang <[email protected]>
Signed-off-by: Yikun Jiang <[email protected]>
…signmentPolicy

### What changes were proposed in this pull request?

Extract the insertion field-cast check method so that we can validate partition values in `PartitioningUtils.normalizePartitionSpec`.

### Why are the changes needed?

Insertion follows the behavior of the config `spark.sql.storeAssignmentPolicy`, which by default fails if the value cannot be cast to the target data type. Altering partitions should also follow it. For example:
```SQL
CREATE TABLE t (c int) USING PARQUET PARTITIONED BY(p int);

-- This DDL should fail but worked:
ALTER TABLE t ADD PARTITION(p='aaa');

-- FAILED which follows spark.sql.storeAssignmentPolicy
INSERT INTO t PARTITION(p='aaa') SELECT 1
```

### Does this PR introduce _any_ user-facing change?

Yes, the added partition value will follow the behavior of `storeAssignmentPolicy`.

To restore the previous behavior, set `spark.sql.legacy.skipPartitionSpecTypeValidation = true`.

### How was this patch tested?

add test

Closes #38257 from ulysses-you/verify-partition.

Authored-by: ulysses-you <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
@github-actions github-actions bot added the DOCS label Oct 24, 2022
HeartSaVioR and others added 6 commits October 24, 2022 16:45
…on window_time

### What changes were proposed in this pull request?

This PR fixes the incorrect available version of the new function `window_time` to 3.4.0, which is the upcoming release from the master branch.

### Why are the changes needed?

The version information is incorrect.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

N/A

Closes #38368 from HeartSaVioR/SPARK-40821-follow-up-minor-version-fix.

Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
### What changes were proposed in this pull request?

This proposes to add SQLMetrics instrumentation for Python UDF execution, including Pandas UDF, and related operations such as MapInPandas and MapInArrow.
The proposed metrics are:

- data sent to Python workers
- data returned from Python workers
- number of output rows

### Why are the changes needed?
This aims at improving monitoring and performance troubleshooting of Python UDFs.
In particular, it is intended as an aid to answering performance-related questions such as:
why is the UDF slow? How much work has been done so far? etc.

### Does this PR introduce _any_ user-facing change?
SQL metrics are made available in the WEB UI.
See the following examples:

![image1](https://issues.apache.org/jira/secure/attachment/13038693/PandasUDF_ArrowEvalPython_Metrics.png)

### How was this patch tested?

Manually tested + a Python unit test and a Scala unit test have been added.

Example code used for testing:

```
from pyspark.sql.functions import col, pandas_udf
import time

pandas_udf("long")
def test_pandas(col1):
  time.sleep(0.02)
  return col1 * col1

spark.udf.register("test_pandas", test_pandas)
spark.sql("select rand(42)*rand(51)*rand(12) col1 from range(10000000)").createOrReplaceTempView("t1")
spark.sql("select max(test_pandas(col1)) from t1").collect()
```

This is used to test with more data pushed to the Python workers:

```
from pyspark.sql.functions import col, pandas_udf
import time

pandas_udf("long")
def test_pandas(col1,col2,col3,col4,col5,col6,col7,col8,col9,col10,col11,col12,col13,col14,col15,col16,col17):
  time.sleep(0.02)
  return col1

spark.udf.register("test_pandas", test_pandas)
spark.sql("select rand(42)*rand(51)*rand(12) col1 from range(10000000)").createOrReplaceTempView("t1")
spark.sql("select max(test_pandas(col1,col1+1,col1+2,col1+3,col1+4,col1+5,col1+6,col1+7,col1+8,col1+9,col1+10,col1+11,col1+12,col1+13,col1+14,col1+15,col1+16)) from t1").collect()
```

This (from the Spark doc) has been used to test with MapInPandas, where the number of output rows is different from the number of input rows:

```
import pandas as pd

from pyspark.sql.functions import pandas_udf, PandasUDFType

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, schema=df.schema).show()
```

This is for testing BatchEvalPython and metrics related to data transfer (bytes sent and received):
```
from pyspark.sql.functions import udf

@udf
def test_udf(col1, col2):
     return col1 * col1

spark.sql("select id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' col2 from range(10)").select(test_udf("id", "col2")).collect()
```

Closes #33559 from LucaCanali/pythonUDFKeySQLMetrics.

Authored-by: Luca Canali <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
…rSuite

### What changes were proposed in this pull request?
This PR aims to replace 'intercept' with 'Check error classes' in TableIdentifierParserSuite.

### Why are the changes needed?
The changes improve the error framework.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
By running the modified test suite:
```
$ build/sbt "test:testOnly *TableIdentifierParserSuite"
```

Closes #38364 from panbingkun/SPARK-40891.

Authored-by: panbingkun <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
### What changes were proposed in this pull request?
Add the following missing APIs to the references:

- StorageLevel.MEMORY_AND_DISK_DESER
- TaskContext.cpus
- BarrierTaskContext.cpus

### Why are the changes needed?
They were missing in the API reference.

### Does this PR introduce _any_ user-facing change?
Yes

### How was this patch tested?
Manually checked; for `BarrierTaskContext.cpus`:

```
In [10]: from pyspark import BarrierTaskContext

In [11]: rdd = spark.sparkContext.parallelize([1])

In [12]: rdd.barrier().mapPartitions(lambda _: [BarrierTaskContext.get().cpus()]).collect()
Out[12]: [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
```

Closes #38373 from zhengruifeng/py_doc_missing.

Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
…module-scala

### What changes were proposed in this pull request?

Remove unnecessary guava exclusion from jackson-module-scala

### Why are the changes needed?

The exclusion was added in SPARK-6149; recent versions of jackson-module-scala no longer depend on Guava, so we can remove this exclusion.

https://mvnrepository.com/artifact/com.fasterxml.jackson.module/jackson-module-scala_2.12/2.13.3

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing UT.

Closes #37405 from pan3793/SPARK-39977.

Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
… bash scripts

This fixes two problems that affect development in a Windows shell environment, such as `cygwin` or `msys2`.

### The fixed build error
Running `./build/sbt packageBin` from a Windows cygwin `bash` session fails.

This occurs if `WSL` is installed, because `project\SparkBuild.scala` creates a `bash` process, but `WSL bash` is called even though `cygwin bash` appears earlier in the `PATH`. In addition, file path arguments to bash contain backslashes. The fix is to ensure that the correct `bash` is called, and that path arguments passed to `bash` use forward slashes rather than backslashes (see the sketch below).
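
A minimal sketch of the slash normalization involved (illustrative only, not the actual `SparkBuild.scala` change; the paths below are just examples):

```scala
import java.io.File

// Backslashes in a Windows path get swallowed as escapes, producing the mangled
// "C:Usersphilwalk..." path seen in the error output below; convert them first.
def toBashPath(f: File): String = f.getAbsolutePath.replace('\\', '/')

val buildInfoScript = new File("build/spark-build-info")
// Invoke the intended bash and pass slash-normalized path arguments.
val cmd = Seq("bash", toBashPath(buildInfoScript), toBashPath(new File("core/target/extra-resources")))
```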

### The build error message:
```bash
 ./build/sbt packageBin
```
<pre>
[info] compiling 9 Java sources to C:\Users\philwalk\workspace\spark\common\sketch\target\scala-2.12\classes ...
/bin/bash: C:Usersphilwalkworkspacesparkcore/../build/spark-build-info: No such file or directory
[info] compiling 1 Scala source to C:\Users\philwalk\workspace\spark\tools\target\scala-2.12\classes ...
[info] compiling 5 Scala sources to C:\Users\philwalk\workspace\spark\mllib-local\target\scala-2.12\classes ...
[info] Compiling 5 protobuf files to C:\Users\philwalk\workspace\spark\connector\connect\target\scala-2.12\src_managed\main
[error] stack trace is suppressed; run last core / Compile / managedResources for the full output
[error] (core / Compile / managedResources) Nonzero exit value: 127
[error] Total time: 42 s, completed Oct 8, 2022, 4:49:12 PM
sbt:spark-parent>
sbt:spark-parent> last core /Compile /managedResources
last core /Compile /managedResources
[error] java.lang.RuntimeException: Nonzero exit value: 127
[error]         at scala.sys.package$.error(package.scala:30)
[error]         at scala.sys.process.ProcessBuilderImpl$AbstractBuilder.slurp(ProcessBuilderImpl.scala:138)
[error]         at scala.sys.process.ProcessBuilderImpl$AbstractBuilder.$bang$bang(ProcessBuilderImpl.scala:108)
[error]         at Core$.$anonfun$settings$4(SparkBuild.scala:604)
[error]         at scala.Function1.$anonfun$compose$1(Function1.scala:49)
[error]         at sbt.internal.util.$tilde$greater.$anonfun$$u2219$1(TypeFunctions.scala:62)
[error]         at sbt.std.Transform$$anon$4.work(Transform.scala:68)
[error]         at sbt.Execute.$anonfun$submit$2(Execute.scala:282)
[error]         at sbt.internal.util.ErrorHandling$.wideConvert(ErrorHandling.scala:23)
[error]         at sbt.Execute.work(Execute.scala:291)
[error]         at sbt.Execute.$anonfun$submit$1(Execute.scala:282)
[error]         at sbt.ConcurrentRestrictions$$anon$4.$anonfun$submitValid$1(ConcurrentRestrictions.scala:265)
[error]         at sbt.CompletionService$$anon$2.call(CompletionService.scala:64)
[error]         at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[error]         at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
[error]         at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[error]         at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
[error]         at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
[error]         at java.base/java.lang.Thread.run(Thread.java:834)
[error] (core / Compile / managedResources) Nonzero exit value: 127
</pre>

### bash scripts fail when run from `cygwin` or `msys2`
The other problem fixed by this PR is to address issues preventing the `bash` scripts (`spark-shell`, `spark-submit`, etc.) from being used in Windows `SHELL` environments. The problem is that the bash version of `spark-class` fails in a Windows shell environment, as a result of `launcher/src/main/java/org/apache/spark/launcher/Main.java` not following the convention expected by `spark-class` and also appending CR to line endings. The resulting error message is not helpful.

There are two parts to this fix:
1. modify `Main.java` to treat a `SHELL` session on Windows as a `bash` session
2. remove the appended CR character when parsing the output produced by `Main.java`

### Does this PR introduce _any_ user-facing change?

These changes should NOT affect anyone who is not trying to build or run bash scripts from a Windows SHELL environment.

### How was this patch tested?
Manual tests were performed to verify both changes.

### related JIRA issues
The following two JIRA issues were created. Both are fixed by this PR, and both are linked to it.

- Bug SPARK-40739 "sbt packageBin" fails in cygwin or other windows bash session
- Bug SPARK-40738 spark-shell fails with "bad array"

Closes #38228 from philwalk/windows-shell-env-fixes.

Authored-by: Phil <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
…stead of manually write MockMaker

### What changes were proposed in this pull request?
This PR aims to use `mockito-inline` instead of manually writing a `MockMaker` file.

### Why are the changes needed?
`mockito-inline` is the more recommended [way](https://javadoc.io/doc/org.mockito/mockito-core/latest/org/mockito/Mockito.html#39) to use Mockito to mock final types, enums, and final methods, and the `mllib` and `mllib-local` modules already use `mockito-inline`.
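
For illustration, a small sketch of what `mockito-inline` buys (the `Greeter` class is hypothetical):

```scala
import org.mockito.Mockito.{mock, when}

// Final classes cannot be mocked by the default MockMaker.
final class Greeter {
  def greet(name: String): String = s"Hello, $name"
}

// With mockito-inline on the test classpath this works out of the box; no
// src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker file needed.
val greeter = mock(classOf[Greeter])
when(greeter.greet("Spark")).thenReturn("mocked")
assert(greeter.greet("Spark") == "mocked")
```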

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?

- Pass GitHub Actions
- Manual test: run `build/sbt clean "sql/testOnly *QueryExecutionErrorsSuite"` with Java 8u352, 11.0.17, and 17.0.5; all 3 Java versions passed

Closes #38372 from LuciferYang/SPARK-40391.

Authored-by: yangjie01 <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
grundprinzip and others added 3 commits October 24, 2022 23:09
### What changes were proposed in this pull request?
To be able to modify incoming requests for the Spark Connect GRPC service, for example to translate metadata from the HTTP/2 request into values in the proto message, the GRPC service needs to be configured using an interceptor.

This patch adds two ways to configure interceptors for the GRPC service. First, we can now configure interceptors in the `SparkConnectInterceptorRegistry` by adding a value to the `interceptorChain` like in the example below:

```
object SparkConnectInterceptorRegistry {

  // Contains the list of configured interceptors.
  private lazy val interceptorChain: Seq[InterceptorBuilder] = Seq(
    interceptor[LoggingInterceptor](classOf[LoggingInterceptor])
  )
  // ...
}
```

The second way to configure interceptors is by configuring them using Spark configuration values at startup. Therefore, a new config key called `spark.connect.grpc.interceptor.classes` has been added. This config value contains a comma-separated list of classes that are added as interceptors to the system.

```
./bin/pyspark --conf spark.connect.grpc.interceptor.classes=com.my.important.LoggingInterceptor
```

During startup, all of the registered interceptors are added, in order, to the `NettyServerBuilder`.

```
// Add all registered interceptors to the server builder.
SparkConnectInterceptorRegistry.chainInterceptors(sb)
```

### Why are the changes needed?
Provide a configurable and extensible way to configure interceptors.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Unit Tests

Closes #38320 from grundprinzip/SPARK-40857.

Lead-authored-by: Martin Grund <[email protected]>
Co-authored-by: Martin Grund <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
…to error classes

### What changes were proposed in this pull request?
This PR replaces TypeCheckFailure with DataTypeMismatch in type checks in the math expressions, including:
- hash.scala (HashExpression)
- mathExpressions.scala (RoundBase)

### Why are the changes needed?
Migration onto error classes unifies Spark SQL error messages.

### Does this PR introduce _any_ user-facing change?
Yes. The PR changes user-facing error messages.

### How was this patch tested?
- Add new UT
- Update existed UT
- Pass GA.

Closes #38332 from panbingkun/SPARK-40750.

Authored-by: panbingkun <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
… quick submission of drivers

### What changes were proposed in this pull request?

##### Quick submission of drivers in tests to the Mesos scheduler results in dropped drivers

Queued drivers in `MesosClusterScheduler` are ordered based on `MesosDriverDescription` - the ordering used checks priority first (if different), followed by a comparison of submission time.
For two driver submissions with the same priority, if they are made in quick succession (such that the submission time is the same due to the millisecond granularity of `Date`), the second `MesosDriverDescription` is dropped from `queuedDrivers` (since `driverOrdering` returns `0` when comparing the descriptions); see the toy illustration below.

This PR fixes the more immediate issue with tests.
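
A toy illustration of the dropping behavior (the `Driver` case class is a stand-in, not the actual scheduler code; it assumes the queue is backed by a sorted set keyed on this ordering):

```scala
import java.util.Date
import scala.collection.mutable

// Simplified stand-in for MesosDriverDescription with only the fields the ordering uses.
case class Driver(id: String, priority: Int, submissionDate: Date)

// Ordering on (priority, submission time) alone: two drivers submitted within the
// same millisecond compare as equal...
val byPrioritySubmission: Ordering[Driver] =
  Ordering.by(d => (d.priority, d.submissionDate.getTime))

// ...so a sorted-set-backed queue silently keeps only one of them.
val queuedDrivers = mutable.TreeSet.empty[Driver](byPrioritySubmission)
val now = new Date()
queuedDrivers += Driver("driver-1", 1, now)
queuedDrivers += Driver("driver-2", 1, now)
assert(queuedDrivers.size == 1)
```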

### Why are the changes needed?

Flaky tests; [see here](https://lists.apache.org/thread/jof098qxp0s6qqmt9qwv52f9665b1pjg) for an example.

### Does this PR introduce _any_ user-facing change?

No.
Fixing only tests for now - as mesos support is deprecated, not changing scheduler itself to address this.

### How was this patch tested?

Fixes unit tests

Closes #38378 from mridulm/fix_MesosClusterSchedulerSuite.

Authored-by: Mridul <mridulatgmail.com>
Signed-off-by: Dongjoon Hyun <[email protected]>
@github-actions github-actions bot added the MESOS label Oct 24, 2022
### What changes were proposed in this pull request?

This PR aims to support `zsh` in K8s `entrypoint.sh`.

### Why are the changes needed?

`zsh` doesn't support `readarray`.
```
% readarray
zsh: command not found: readarray
```

Instead, `zsh` supports the following way.
```
% SPARK_EXECUTOR_JAVA_OPTS=("${(f)$(< /tmp/java_opts.txt)}")
% echo $SPARK_EXECUTOR_JAVA_OPTS[]
-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Dspark.driver.blockManager.port=7079 -Dspark.driver.port=7078
```

### Does this PR introduce _any_ user-facing change?

No. This PR only introduces additional behavior when there is no `readarray` command.

### How was this patch tested?

Pass the K8s CIs.

Closes #38380 from dongjoon-hyun/SPARK-40904.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
grundprinzip and others added 7 commits October 25, 2022 11:18
### What changes were proposed in this pull request?

Different systems will need different metadata that is passed as the user context during the request. To be able to handle the different systems seamlessly, make the `UserContext` extensible with `google.protobuf.Any`.

### Why are the changes needed?
Extensibility.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
UT

Closes #38374 from grundprinzip/SPARK-40899.

Authored-by: Martin Grund <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
### What changes were proposed in this pull request?
`Mode` should copy keys before inserting into Map

### Why are the changes needed?
the result may be incorrect (an illustrative sketch follows the examples below):
```
val df = sc.parallelize(Seq.empty[Int], 4)
    .mapPartitionsWithIndex { (idx, iter) =>
         if (idx == 3) {
            Iterator("3", "3", "3", "3", "4")
         } else {
            Iterator("0", "1", "2", "3", "4")
         }
    }.toDF("a")

  df.select(mode(col("a"))).show
+-------+
|mode(a)|
+-------+
|      4|
+-------+

```

after this fix:
```
  df.select(mode(col("a"))).show
+-------+
|mode(a)|
+-------+
|      3|
+-------+
```
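
For context, a small illustration of why copying the key matters (generic Scala, not the actual `Mode` implementation, which keys its buffer on reused unsafe rows):

```scala
import scala.collection.mutable

// If the counting buffer keys on a reused mutable object without copying it,
// every "key" aliases the same object and the counts are corrupted.
val counts = mutable.HashMap.empty[mutable.ArrayBuffer[Int], Long]
val reused = mutable.ArrayBuffer(0)
for (v <- Seq(1, 1, 2)) {
  reused(0) = v                                     // mutated in place, like a reused row
  counts.update(reused, counts.getOrElse(reused, 0L) + 1L)
}
// All stored keys now alias the buffer in its final state, so the count for value 1
// is no longer reachable under its original key. Keying on a copy (e.g. reused.clone())
// gives the expected result: ArrayBuffer(1) -> 2, ArrayBuffer(2) -> 1.
println(counts)
```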

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
added UT

Closes #38383 from zhengruifeng/sql_mode_fix.

Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
…tion`: phase 2

### What changes were proposed in this pull request?
In the PR, I propose to port the following test suites onto `checkError` to check valuable error parts instead of error messages:
- ParserUtilsSuite
- TableSchemaParserSuite
- InsertSuite
- HiveDDLSuite
- SQLQuerySuite
- MiscFunctionsSuite

### Why are the changes needed?
Migration on `checkError()` will make the tests independent from the text of error messages.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
By running the modified test suites:
```
$ build/sbt "test:testOnly *.ParserUtilsSuite"
$ build/sbt "test:testOnly *.TableSchemaParserSuite"
$ build/sbt "test:testOnly *.InsertSuite"
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *HiveDDLSuite"
$ build/sbt "test:testOnly *SQLQuerySuite"
$ build/sbt "test:testOnly org.apache.spark.sql.MiscFunctionsSuite"
```

Closes #38360 from MaxGekk/parseexception-checkError-2.

Authored-by: Max Gekk <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
### What changes were proposed in this pull request?

This PR replaces column names and column types with a schema (which is a struct).

### Why are the changes needed?

Before this PR, AnalyzeResult separated column names and column types. However, these two can be combined to form a schema, which is a struct. This PR simplifies that proto message.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

UT

Closes #38301 from amaliujia/return_schema_use_struct.

Authored-by: Rui Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
### What changes were proposed in this pull request?
In the PR, I propose to quote function names that are passed to DataTypeMismatch as message parameters using `toSQLId()`.

### Why are the changes needed?
To be consistent w/ other error messages.

### Does this PR introduce _any_ user-facing change?
Yes, it changes user-facing error message.

### How was this patch tested?
By running the modified test suites:
```
$ PYSPARK_PYTHON=python3 build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite"
```

Closes #38370 from MaxGekk/quote-ids-datatype-mismatch.

Authored-by: Max Gekk <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
### What changes were proposed in this pull request?
Pin pytest==7.1.3

### Why are the changes needed?

`pytest-mypy-plugins==1.9.3` depends on `pytest [required: >=6.0.0]`, and [pytest 7.2.0](https://pypi.org/project/pytest/) was released just now

I guess it breaks the python linter

```
Traceback (most recent call last):
  File "/usr/local/bin/pytest", line 8, in <module>
    sys.exit(console_main())
  File "/usr/local/lib/python3.9/dist-packages/_pytest/config/__init__.py", line 190, in console_main
    code = main()
  File "/usr/local/lib/python3.9/dist-packages/_pytest/config/__init__.py", line 148, in main
    config = _prepareconfig(args, plugins)
```

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
CI, but it cannot be reproduced locally

Closes #38390 from zhengruifeng/infra_pin_pytest.

Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
…into Map

### What changes were proposed in this pull request?
Make `PandasMode` copy keys before inserting into Map

### Why are the changes needed?
Correctness issue similar to #38383; this is a separate PR since it is dedicated to the Pandas API

```
In [24]: def f(index, iterator): return ['3', '3', '3', '3', '4'] if index == 3 else ['0', '1', '2', '3', '4']

In [25]: rdd = sc.parallelize([1, ], 4).mapPartitionsWithIndex(f)

In [26]: df = spark.createDataFrame(rdd, schema='string')

In [27]: psdf = df.pandas_api()

In [28]: psdf.mode()
Out[28]:
  value
0     4

In [29]: psdf._to_pandas().mode()
Out[29]:
  value
0     3
```

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
added UT

Closes #38385 from zhengruifeng/ps_mode_fix.

Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
amaliujia and others added 4 commits October 25, 2022 23:49
### What changes were proposed in this pull request?

I was working on refactoring Connect proto tests from the Catalyst DSL to the DataFrame API, and identified that Join in Connect does not support `UsingColumns`. This is a gap between the Connect proto and the DataFrame API. This also blocks the refactoring work because without `UsingColumns`, there is no compatible DataFrame Join API that we can convert existing tests to.

This PR adds support for Join's `UsingColumns` (see the sketch below).
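
For reference, a sketch of the DataFrame API shape that `UsingColumns` corresponds to (sample data made up):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val df1 = Seq((1, "a"), (2, "b")).toDF("id", "value_1")
val df2 = Seq((1, "x"), (3, "y")).toDF("id", "value_2")

// Join using a list of column names: the `id` key appears only once in the output.
// This is the variant the Connect proto needs a using-columns field to express.
val joined = df1.join(df2, Seq("id"), "inner")
joined.show()
```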

### Why are the changes needed?

1. Improve API coverage.
2. Unblock testing refactoring.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

UT

Closes #38345 from amaliujia/proto-join-using-columns.

Authored-by: Rui Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
### What changes were proposed in this pull request?
This PR aims to upgrade rocksdbjni from 7.6.0 to 7.7.3.

### Why are the changes needed?
This version brings performance improvements to the `DeleteRange()` behavior and fixes some bugs; the release notes are as follows:

- https://github.com/facebook/rocksdb/releases/tag/v7.7.2
- https://github.com/facebook/rocksdb/releases/tag/v7.7.3

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?

- Pass GitHub Actions
- Manual test `RocksDBBenchmark`:

**7.6.0**

```
[INFO] Running org.apache.spark.util.kvstore.RocksDBBenchmark
                                                count   mean    min     max     95th
dbClose                                         4       0.374   0.286   0.591   0.591
dbCreation                                      4       67.738  3.709   259.012 259.012
naturalIndexCreateIterator                      1024    0.006   0.002   1.595   0.007
naturalIndexDescendingCreateIterator            1024    0.007   0.006   0.068   0.008
naturalIndexDescendingIteration                 1024    0.006   0.004   0.060   0.008
naturalIndexIteration                           1024    0.006   0.004   0.119   0.009
randomDeleteIndexed                             1024    0.028   0.021   0.295   0.038
randomDeletesNoIndex                            1024    0.015   0.013   0.043   0.018
randomUpdatesIndexed                            1024    0.082   0.032   31.250  0.086
randomUpdatesNoIndex                            1024    0.033   0.030   0.661   0.036
randomWritesIndexed                             1024    0.119   0.034   52.561  0.118
randomWritesNoIndex                             1024    0.039   0.033   1.894   0.043
refIndexCreateIterator                          1024    0.005   0.005   0.020   0.007
refIndexDescendingCreateIterator                1024    0.003   0.003   0.027   0.005
refIndexDescendingIteration                     1024    0.007   0.005   0.046   0.008
refIndexIteration                               1024    0.007   0.005   0.321   0.010
sequentialDeleteIndexed                         1024    0.021   0.017   0.099   0.026
sequentialDeleteNoIndex                         1024    0.015   0.012   0.043   0.019
sequentialUpdatesIndexed                        1024    0.042   0.036   0.899   0.049
sequentialUpdatesNoIndex                        1024    0.039   0.031   0.945   0.047
sequentialWritesIndexed                         1024    0.048   0.041   2.022   0.056
sequentialWritesNoIndex                         1024    0.039   0.031   2.419   0.041
```

**7.7.3**

```
[INFO] Running org.apache.spark.util.kvstore.RocksDBBenchmark
                                                count   mean    min     max     95th
dbClose                                         4       0.365   0.272   0.531   0.531
dbCreation                                      4       66.947  3.915   258.255 258.255
naturalIndexCreateIterator                      1024    0.005   0.002   1.378   0.006
naturalIndexDescendingCreateIterator            1024    0.005   0.005   0.061   0.007
naturalIndexDescendingIteration                 1024    0.005   0.004   0.033   0.008
naturalIndexIteration                           1024    0.006   0.004   0.048   0.009
randomDeleteIndexed                             1024    0.026   0.018   0.271   0.034
randomDeletesNoIndex                            1024    0.015   0.012   0.044   0.018
randomUpdatesIndexed                            1024    0.079   0.032   29.256  0.083
randomUpdatesNoIndex                            1024    0.035   0.032   0.521   0.039
randomWritesIndexed                             1024    0.115   0.033   49.182  0.118
randomWritesNoIndex                             1024    0.040   0.034   1.876   0.045
refIndexCreateIterator                          1024    0.004   0.004   0.015   0.005
refIndexDescendingCreateIterator                1024    0.003   0.002   0.025   0.004
refIndexDescendingIteration                     1024    0.006   0.005   0.053   0.008
refIndexIteration                               1024    0.007   0.005   0.259   0.010
sequentialDeleteIndexed                         1024    0.021   0.017   0.118   0.027
sequentialDeleteNoIndex                         1024    0.015   0.012   0.043   0.018
sequentialUpdatesIndexed                        1024    0.044   0.037   0.902   0.053
sequentialUpdatesNoIndex                        1024    0.041   0.030   1.155   0.050
sequentialWritesIndexed                         1024    0.049   0.038   2.132   0.059
sequentialWritesNoIndex                         1024    0.039   0.030   2.527   0.042
```

Closes #38382 from LuciferYang/SPARK-40905.

Authored-by: yangjie01 <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
### What changes were proposed in this pull request?
This PR aims to replace 'intercept' with 'Check error classes' in HiveQuerySuite.

### Why are the changes needed?
The changes improve the error framework.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
By running the modified test suite:
```
$ build/sbt "test:testOnly *HiveQuerySuite"
```

Closes #38386 from panbingkun/SPARK-40888.

Lead-authored-by: panbingkun <[email protected]>
Co-authored-by: Maxim Gekk <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
### What changes were proposed in this pull request?
Reimplement `frequentItems` with dataframe operations

### Why are the changes needed?
1, do not truncate the sql plan any more;
2, enable sql optimization like column pruning

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
existing UTs and manually check

Closes #38375 from zhengruifeng/sql_stat_freq_item.

Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
@pull pull bot merged commit f571f2e into huangxiaopingRD:master Oct 26, 2022
pull bot pushed a commit that referenced this pull request Feb 24, 2024
…n properly

### What changes were proposed in this pull request?
Make `ResolveRelations` handle plan id properly

### Why are the changes needed?
bug fix for Spark Connect, it won't affect classic Spark SQL

before this PR:
```
from pyspark.sql import functions as sf

spark.range(10).withColumn("value_1", sf.lit(1)).write.saveAsTable("test_table_1")
spark.range(10).withColumnRenamed("id", "index").withColumn("value_2", sf.lit(2)).write.saveAsTable("test_table_2")

df1 = spark.read.table("test_table_1")
df2 = spark.read.table("test_table_2")
df3 = spark.read.table("test_table_1")

join1 = df1.join(df2, on=df1.id==df2.index).select(df2.index, df2.value_2)
join2 = df3.join(join1, how="left", on=join1.index==df3.id)

join2.schema
```

fails with
```
AnalysisException: [CANNOT_RESOLVE_DATAFRAME_COLUMN] Cannot resolve dataframe column "id". It's probably because of illegal references like `df1.select(df2.col("a"))`. SQLSTATE: 42704
```

That is due to existing plan caching in `ResolveRelations` doesn't work with Spark Connect

```
=== Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations ===
 '[#12]Join LeftOuter, '`==`('index, 'id)                     '[#12]Join LeftOuter, '`==`('index, 'id)
!:- '[#9]UnresolvedRelation [test_table_1], [], false         :- '[#9]SubqueryAlias spark_catalog.default.test_table_1
!+- '[#11]Project ['index, 'value_2]                          :  +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
!   +- '[#10]Join Inner, '`==`('id, 'index)                   +- '[#11]Project ['index, 'value_2]
!      :- '[#7]UnresolvedRelation [test_table_1], [], false      +- '[#10]Join Inner, '`==`('id, 'index)
!      +- '[#8]UnresolvedRelation [test_table_2], [], false         :- '[#9]SubqueryAlias spark_catalog.default.test_table_1
!                                                                   :  +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
!                                                                   +- '[#8]SubqueryAlias spark_catalog.default.test_table_2
!                                                                      +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_2`, [], false

Can not resolve 'id with plan 7
```

`[#7]UnresolvedRelation [test_table_1], [], false` was wrongly resolved to the cached one
```
:- '[#9]SubqueryAlias spark_catalog.default.test_table_1
   +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
```

### Does this PR introduce _any_ user-facing change?
yes, bug fix

### How was this patch tested?
added ut

### Was this patch authored or co-authored using generative AI tooling?
ci

Closes apache#45214 from zhengruifeng/connect_fix_read_join.

Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
huangxiaopingRD pushed a commit that referenced this pull request Sep 2, 2025
…plan properly

### What changes were proposed in this pull request?
Make `ResolveRelations` handle plan id properly

cherry-pick bugfix apache#45214 to 3.5

### Why are the changes needed?
bug fix for Spark Connect, it won't affect classic Spark SQL

before this PR:
```
from pyspark.sql import functions as sf

spark.range(10).withColumn("value_1", sf.lit(1)).write.saveAsTable("test_table_1")
spark.range(10).withColumnRenamed("id", "index").withColumn("value_2", sf.lit(2)).write.saveAsTable("test_table_2")

df1 = spark.read.table("test_table_1")
df2 = spark.read.table("test_table_2")
df3 = spark.read.table("test_table_1")

join1 = df1.join(df2, on=df1.id==df2.index).select(df2.index, df2.value_2)
join2 = df3.join(join1, how="left", on=join1.index==df3.id)

join2.schema
```

fails with
```
AnalysisException: [CANNOT_RESOLVE_DATAFRAME_COLUMN] Cannot resolve dataframe column "id". It's probably because of illegal references like `df1.select(df2.col("a"))`. SQLSTATE: 42704
```

That is due to existing plan caching in `ResolveRelations` doesn't work with Spark Connect

```
=== Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations ===
 '[#12]Join LeftOuter, '`==`('index, 'id)                     '[#12]Join LeftOuter, '`==`('index, 'id)
!:- '[#9]UnresolvedRelation [test_table_1], [], false         :- '[#9]SubqueryAlias spark_catalog.default.test_table_1
!+- '[#11]Project ['index, 'value_2]                          :  +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
!   +- '[#10]Join Inner, '`==`('id, 'index)                   +- '[#11]Project ['index, 'value_2]
!      :- '[#7]UnresolvedRelation [test_table_1], [], false      +- '[#10]Join Inner, '`==`('id, 'index)
!      +- '[#8]UnresolvedRelation [test_table_2], [], false         :- '[#9]SubqueryAlias spark_catalog.default.test_table_1
!                                                                   :  +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
!                                                                   +- '[#8]SubqueryAlias spark_catalog.default.test_table_2
!                                                                      +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_2`, [], false

Can not resolve 'id with plan 7
```

`[#7]UnresolvedRelation [test_table_1], [], false` was wrongly resolved to the cached one
```
:- '[#9]SubqueryAlias spark_catalog.default.test_table_1
   +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
```

### Does this PR introduce _any_ user-facing change?
yes, bug fix

### How was this patch tested?
added ut

### Was this patch authored or co-authored using generative AI tooling?
ci

Closes apache#46291 from zhengruifeng/connect_fix_read_join_35.

Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
huangxiaopingRD pushed a commit that referenced this pull request Sep 2, 2025
…plan properly

### What changes were proposed in this pull request?
Make `ResolveRelations` handle plan id properly

cherry-pick bugfix apache#45214 to 3.4

### Why are the changes needed?
bug fix for Spark Connect, it won't affect classic Spark SQL

before this PR:
```
from pyspark.sql import functions as sf

spark.range(10).withColumn("value_1", sf.lit(1)).write.saveAsTable("test_table_1")
spark.range(10).withColumnRenamed("id", "index").withColumn("value_2", sf.lit(2)).write.saveAsTable("test_table_2")

df1 = spark.read.table("test_table_1")
df2 = spark.read.table("test_table_2")
df3 = spark.read.table("test_table_1")

join1 = df1.join(df2, on=df1.id==df2.index).select(df2.index, df2.value_2)
join2 = df3.join(join1, how="left", on=join1.index==df3.id)

join2.schema
```

fails with
```
AnalysisException: [CANNOT_RESOLVE_DATAFRAME_COLUMN] Cannot resolve dataframe column "id". It's probably because of illegal references like `df1.select(df2.col("a"))`. SQLSTATE: 42704
```

That is due to existing plan caching in `ResolveRelations` doesn't work with Spark Connect

```
=== Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations ===
 '[#12]Join LeftOuter, '`==`('index, 'id)                     '[#12]Join LeftOuter, '`==`('index, 'id)
!:- '[#9]UnresolvedRelation [test_table_1], [], false         :- '[#9]SubqueryAlias spark_catalog.default.test_table_1
!+- '[#11]Project ['index, 'value_2]                          :  +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
!   +- '[#10]Join Inner, '`==`('id, 'index)                   +- '[#11]Project ['index, 'value_2]
!      :- '[#7]UnresolvedRelation [test_table_1], [], false      +- '[#10]Join Inner, '`==`('id, 'index)
!      +- '[#8]UnresolvedRelation [test_table_2], [], false         :- '[#9]SubqueryAlias spark_catalog.default.test_table_1
!                                                                   :  +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
!                                                                   +- '[#8]SubqueryAlias spark_catalog.default.test_table_2
!                                                                      +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_2`, [], false

Can not resolve 'id with plan 7
```

`[#7]UnresolvedRelation [test_table_1], [], false` was wrongly resolved to the cached one
```
:- '[#9]SubqueryAlias spark_catalog.default.test_table_1
   +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
```

### Does this PR introduce _any_ user-facing change?
yes, bug fix

### How was this patch tested?
added ut

### Was this patch authored or co-authored using generative AI tooling?
ci

Closes apache#46290 from zhengruifeng/connect_fix_read_join_34.

Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>