[SPARK-34355][SQL] Update FIleBatchWriter too #7
Closed
AngersZhuuuu wants to merge 20 commits into ulysses-you:add-commit-log from
Conversation
…portion fixed regardless of timezone ### What changes were proposed in this pull request? For user-experience reasons (it is confusing to Spark users that java.sql.Time uses milliseconds while Spark uses microseconds, and users lose useful functions like hour(), minute(), etc. on the column), we have decided to revert back to using TimestampType, but this time we will enforce the hour to be consistent across system timezones (via offset manipulation), with the date part fixed to the zero epoch. The full discussion with Wenchen Fan regarding this ticket is at apache#30902 (comment) ### Why are the changes needed? Revert and improvement to sql.Time handling ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit tests and integration tests Closes apache#31473 from saikocat/SPARK-34357. Authored-by: Hoa <hoameomu@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
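The millisecond-vs-microsecond mapping described above can be sketched as follows. This is purely illustrative: the function name and logic are assumptions for exposition, not Spark's implementation.

```python
# Purely illustrative sketch: java.sql.Time carries milliseconds since
# midnight, while Spark timestamps use microseconds, with the date portion
# pinned to the zero epoch (1970-01-01).
def time_millis_to_timestamp_micros(millis_of_day: int) -> int:
    # microseconds since 1970-01-01 00:00:00 (date part fixed to zero epoch)
    return millis_of_day * 1000

# 01:02:03.004 expressed as milliseconds of day
millis = ((1 * 60 + 2) * 60 + 3) * 1000 + 4
print(time_millis_to_timestamp_micros(millis))  # 3723004000
```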
…ildLocationMetadata() ### What changes were proposed in this pull request? This PR proposes to expose the total number of paths in Utils.buildLocationMetadata(), relaxing space usage a bit (around 10+ chars). Suppose only the first 2 of 5 paths fit within the threshold; the outputs before and after the change are: * before the change: `[path1, path2]` * after the change: `(5 paths)[path1, path2, ...]` ### Why are the changes needed? SPARK-31793 silently truncates the paths, so end users can't tell how many paths were truncated, or even whether truncation happened at all. ### Does this PR introduce _any_ user-facing change? Yes, the location metadata will also show the total number of paths and whether any were truncated, instead of truncating silently. ### How was this patch tested? Modified UTs Closes apache#31464 from HeartSaVioR/SPARK-34339. Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com> Signed-off-by: HyukjinKwon <gurwls223@apache.org>
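The truncation behavior described above can be sketched like this. The real implementation is Scala (`Utils.buildLocationMetadata`); the threshold accounting here is a simplified assumption:

```python
# Hypothetical sketch of the path-truncation format described above.
def build_location_metadata(paths, threshold):
    """Append paths until the metadata string would exceed `threshold`;
    prefix with the total path count and mark truncation with '...'."""
    shown = []
    length = 0
    for p in paths:
        length += len(p) + 2  # account for ", " separator (simplified)
        if length > threshold:
            break
        shown.append(p)
    if len(shown) < len(paths):
        return f"({len(paths)} paths)[{', '.join(shown)}, ...]"
    return f"[{', '.join(shown)}]"

print(build_location_metadata(["path1", "path2", "path3", "path4", "path5"], 15))
# (5 paths)[path1, path2, ...]
```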
… will override by loading hive-site.xml accidentally may cause perf regression ### What changes were proposed in this pull request? In many real-world cases, when interacting with the Hive catalog through Spark SQL, users may simply share the `hive-site.xml` from their Hive jobs and copy it to `SPARK_HOME`/conf without modification. In Spark, when we generate Hadoop configurations, we use `spark.buffer.size` (65536) to reset `io.file.buffer.size` (4096). But when we load hive-site.xml, we may ignore this behavior and reset `io.file.buffer.size` again according to hive-site.xml. 1. The configuration priority for setting Hadoop and Hive config here is not right; literally, the order should be `spark > spark.hive > spark.hadoop > hive > hadoop` 2. This breaks the `spark.buffer.size` config's behavior for tuning IO performance with HDFS if there is an existing `io.file.buffer.size` in hive-site.xml ### Why are the changes needed? Bugfix for the configuration behavior, and fix for the performance regression caused by that behavior change ### Does this PR introduce _any_ user-facing change? This PR reverts a silent user-facing behavior change ### How was this patch tested? new tests Closes apache#31460 from yaooqinn/SPARK-34346. Authored-by: Kent Yao <yao@apache.org> Signed-off-by: HyukjinKwon <gurwls223@apache.org>
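The precedence order above (`spark > spark.hive > spark.hadoop > hive > hadoop`) can be sketched as layered map merging, applying the lowest-precedence layer first. The function and layer names are illustrative assumptions, not Spark's actual code:

```python
# Hypothetical sketch of the config precedence described above.
def effective_hadoop_conf(hadoop, hive, spark_hadoop, spark_hive, spark):
    conf = {}
    # apply lowest precedence first so later layers override earlier ones
    for layer in (hadoop, hive, spark_hadoop, spark_hive, spark):
        conf.update(layer)
    return conf

conf = effective_hadoop_conf(
    hadoop={"io.file.buffer.size": "4096"},
    hive={"io.file.buffer.size": "8192"},    # from hive-site.xml
    spark_hadoop={},
    spark_hive={},
    spark={"io.file.buffer.size": "65536"},  # derived from spark.buffer.size
)
print(conf["io.file.buffer.size"])  # 65536: spark wins over hive-site.xml
```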
### What changes were proposed in this pull request? Param Validation throw `IllegalArgumentException` ### Why are the changes needed? Param Validation should throw `IllegalArgumentException` instead of `IllegalStateException` ### Does this PR introduce _any_ user-facing change? Yes, the type of exception changed ### How was this patch tested? existing testsuites Closes apache#31469 from zhengruifeng/mllib_exceptions. Authored-by: Ruifeng Zheng <ruifengz@foxmail.com> Signed-off-by: Ruifeng Zheng <ruifengz@foxmail.com>
…f SHOW DATABASES ### What changes were proposed in this pull request? This is a followup of apache#26006 In apache#26006 , we merged the v1 and v2 SHOW DATABASES/NAMESPACES commands, but we missed a behavior change that the output schema of SHOW DATABASES becomes different. This PR adds a legacy config to restore the old schema, with a migration guide item to mention this behavior change. ### Why are the changes needed? Improve backward compatibility ### Does this PR introduce _any_ user-facing change? No (the legacy config is false by default) ### How was this patch tested? a new test Closes apache#31474 from cloud-fan/command-schema. Lead-authored-by: Wenchen Fan <cloud0fan@gmail.com> Co-authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…n PostgreSQL ### What changes were proposed in this pull request? This PR added tests for some non-array types in PostgreSQL. PostgreSQL supports a wide range of types (https://www.postgresql.org/docs/13/datatype.html) and `PostgresIntegrationSuite` contains tests for some types, but ones for the following types are missing. * bit varying * point * line * lseg * box * path * polygon * circle * pg_lsn * macaddr * macaddr8 * numeric * pg_snapshot * real * time * timestamp * tsquery * tsvector * txid_snapshot * xml NOTE: Handling money types can be buggy, so this PR doesn't add tests for those types. ### Why are the changes needed? To ensure those types work well with Spark. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Extended `PostgresIntegrationSuite`. Closes apache#31456 from sarutak/test-for-some-types-postgresql. Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com> Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
… datasource v1 and v2 ### What changes were proposed in this pull request? Extract the date/timestamps rebasing tests from `ParquetIOSuite` to `ParquetRebaseDatetimeSuite` to run them for both DSv1 and DSv2 implementations of Parquet datasource. ### Why are the changes needed? To improve test coverage. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? By running new test suites: ``` $ build/sbt "sql/test:testOnly *ParquetRebaseDatetimeV2Suite" $ build/sbt "sql/test:testOnly *ParquetRebaseDatetimeV1Suite" $ build/sbt "sql/test:testOnly *ParquetIOSuite" ``` Closes apache#31478 from MaxGekk/rebase-tests-dsv1-and-dsv2. Authored-by: Max Gekk <max.gekk@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request? This is a follow-up of apache#28027 apache#28027 added a DS v2 API that allows data sources to produce metadata/hidden columns that can only be seen when explicitly selected. The way we integrate this API into Spark is: 1. The v2 relation gets normal output and metadata output from the data source, and the metadata output is excluded from the plan output by default. 2. Column resolution can resolve `UnresolvedAttribute` with metadata columns, even if the child plan doesn't output metadata columns. 3. An analyzer rule searches the query plan, trying to find a node that has missing inputs. If such a node is found, transform the sub-plan of this node, and update the v2 relation to include the metadata output. The analyzer rule in step 3 brings a perf regression for queries that do not read v2 tables at all. This rule calculates `QueryPlan.inputSet` (which builds an `AttributeSet` from the outputs of all children) and `QueryPlan.missingInput` (which does a set exclusion and creates a new `AttributeSet`) for every plan node in the query plan. In our benchmark, TPCDS query compilation time increases by more than 10%. This PR proposes a simple way to improve it: we add a special metadata entry to the metadata attribute, which allows us to quickly check whether a plan needs to add metadata columns: we just check all the references of this plan and see if any attribute contains the special metadata entry, instead of calculating `QueryPlan.missingInput`. This PR also fixes one bug: we should not change the final output schema of the plan if we only use metadata columns in operators like filter, sort, etc. ### Why are the changes needed? Fix a perf regression in SQL query compilation, and fix a bug. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? 
Run `org.apache.spark.sql.TPCDSQuerySuite`. Before this PR, `AddMetadataColumns` was among the top 4 rules ranked by running time ``` === Metrics of Analyzer/Optimizer Rules === Total number of runs: 407641 Total time: 47.257239779 seconds Rule Effective Time / Total Time Effective Runs / Total Runs OptimizeSubqueries 4157690003 / 8485444626 49 / 2778 Analyzer$ResolveAggregateFunctions 1238968711 / 3369351761 49 / 2141 ColumnPruning 660038236 / 2924755292 338 / 6391 Analyzer$AddMetadataColumns 0 / 2918352992 0 / 2151 ``` after this PR: ``` Analyzer$AddMetadataColumns 0 / 122885629 0 / 2151 ``` This rule is 20 times faster and is negligible relative to the total compilation time. This PR also adds new tests to verify the bug fix. Closes apache#31440 from cloud-fan/metadata-col. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
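The optimization above, replacing a `missingInput` set computation with a scan for a special metadata entry, can be sketched in miniature. The class, key, and function names here are stand-ins, not Spark's actual classes:

```python
# Minimal sketch, not Spark's actual implementation: each attribute carries a
# metadata dict; metadata columns are tagged with a special key, so a plan
# node can be checked by scanning its references only.
METADATA_COL_KEY = "__metadata_col"  # hypothetical marker key

class Attribute:
    def __init__(self, name, metadata=None):
        self.name = name
        self.metadata = metadata or {}

def needs_metadata_columns(references):
    # O(#references) check, instead of building inputSet/missingInput
    # AttributeSets for every plan node
    return any(METADATA_COL_KEY in a.metadata for a in references)

refs = [Attribute("id"), Attribute("_file", {METADATA_COL_KEY: True})]
print(needs_metadata_columns(refs))  # True
```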
… data source v1 ### What changes were proposed in this pull request? As a followup from the discussion in apache#29804 (comment). Currently in the data source v1 file scan `FileSourceScanExec`, [bucket filter pruning will only take effect with bucket table scan](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L542 ). However this is unnecessary, as bucket filter pruning can also happen if we disable bucketed table scan. Reading files with bucket hash partitioning and bucket filter pruning are two orthogonal features that do not need to be coupled together. ### Why are the changes needed? This helps queries leverage bucket filter pruning to save CPU/IO by not reading unnecessary bucket files, without being bound to bucketed table scan when task parallelism is a concern. In addition, this also resolves the issue of reducing the number of tasks launched for a simple query with a bucket column filter - SPARK-33207 - because with bucket scan we launch a number of tasks equal to the number of buckets, which is unnecessary. ### Does this PR introduce _any_ user-facing change? Users will notice queries start pruning irrelevant files when reading a bucketed table, even when bucketing is disabled. If the input data does not follow the Spark data source bucketing convention, by default an exception will be thrown and the query will fail. The exception can be bypassed by setting the config `spark.sql.files.ignoreCorruptFiles` to true. ### How was this patch tested? Added unit tests in `BucketedReadSuite.scala` to make all existing unit tests for bucket filtering work with this PR. Closes apache#31413 from c21/bucket-pruning. Authored-by: Cheng Su <chengsu@fb.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
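The idea behind bucket filter pruning can be sketched as follows: under a Spark-like bucketing convention, a row whose bucket-column value hashes to bucket `b` lives only in bucket `b`'s files, so an equality filter needs just one bucket. The hash here is Python's built-in, not Spark's Murmur3, and all names are illustrative:

```python
# Illustrative sketch of bucket filter pruning (not Spark's actual hashing).
def pruned_buckets(filter_value, num_buckets):
    # an equality filter on the bucket column touches exactly one bucket
    return {hash(filter_value) % num_buckets}

num_buckets = 8
files_by_bucket = {b: [f"part-{b:05d}"] for b in range(num_buckets)}
keep = pruned_buckets(42, num_buckets)  # the filter value hashes to one bucket
files_to_read = [f for b in keep for f in files_by_bucket[b]]
print(files_to_read)  # only 1 of the 8 bucket files is scanned
```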
…rationSuite with DateTimeTestUtils.withDefaultTimeZone ### What changes were proposed in this pull request? This PR replaces `withTimeZone` defined and used in `OracleIntegrationSuite` with `DateTimeTestUtils.withDefaultTimeZone` which is defined as a utility method. ### Why are the changes needed? Both methods are semantically the same so it might be better to use the utility one. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? `OracleIntegrationSuite` passes. Closes apache#31465 from sarutak/oracle-timezone-util. Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com> Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
…ive tables with "avro.schema.literal" ### What changes were proposed in this pull request? Before this PR, for a partitioned Avro Hive table, when the SerDe was configured to read the partition data, the table-level properties were overwritten by the partition-level properties. This PR changes this ordering by giving table-level properties higher precedence, so that when a new evolved schema is set for the table, this new schema will be used to read the partition data, and not the original schema which was used for writing the data. This new behavior is consistent with Apache Hive. See the example used in the unit test `SPARK-26836: support Avro schema evolution`; in Hive this results in: ``` 0: jdbc:hive2://<IP>:10000> select * from t; INFO : Compiling command(queryId=hive_20210111141102_7a6349d0-f9ed-4aad-ac07-b94b44de2394): select * from t INFO : Semantic Analysis Completed INFO : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:t.col1, type:string, comment:null), FieldSchema(name:t.col2, type:string, comment:null), FieldSchema(name:t.ds, type:string, comment:null)], properties:null) INFO : Completed compiling command(queryId=hive_20210111141102_7a6349d0-f9ed-4aad-ac07-b94b44de2394); Time taken: 0.098 seconds INFO : Executing command(queryId=hive_20210111141102_7a6349d0-f9ed-4aad-ac07-b94b44de2394): select * from t INFO : Completed executing command(queryId=hive_20210111141102_7a6349d0-f9ed-4aad-ac07-b94b44de2394); Time taken: 0.013 seconds INFO : OK +---------------+-------------+-------------+ | t.col1 | t.col2 | t.ds | +---------------+-------------+-------------+ | col1_default | col2_value | 1981-01-07 | | col1_value | col2_value | 1983-04-27 | +---------------+-------------+-------------+ 2 rows selected (0.159 seconds) ``` ### Why are the changes needed? Without this change the old schema would be used. 
This can cause a correctness issue when the new schema introduces a new field with a default value (following the rules of schema evolution) before an existing field. In this case, the rows coming from the partition where the old schema was used will **contain values in wrong column positions**. For example, check the attached unit test `SPARK-26836: support Avro schema evolution`. Without this fix the result of the select on the table would be: ``` +----------+----------+----------+ | col1| col2| ds| +----------+----------+----------+ |col2_value| null|1981-01-07| |col1_value|col2_value|1983-04-27| +----------+----------+----------+ ``` With this fix: ``` +------------+----------+----------+ | col1| col2| ds| +------------+----------+----------+ |col1_default|col2_value|1981-01-07| | col1_value|col2_value|1983-04-27| +------------+----------+----------+ ``` ### Does this PR introduce _any_ user-facing change? It just fixes the value errors: when a new column is introduced, even in the last position, the given default will be used instead of null. ### How was this patch tested? This was tested with the unit test included in the PR, and manually on Apache Spark / Hive. Closes apache#31133 from attilapiros/SPARK-26836. Authored-by: “attilapiros” <piros.attila.zsolt@gmail.com> Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
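The reader-schema resolution that makes the fix work can be sketched in miniature: fields missing from a record written with the old schema take the reader (table-level) schema's default, so old-partition rows stay aligned with the evolved schema. This is a simplified dict-based illustration, not Avro's actual resolution machinery; the field names follow the example above:

```python
# Illustrative sketch of Avro-style reader-schema resolution with defaults.
reader_schema = [("col1", "col1_default"), ("col2", None)]  # (name, default)

def resolve(record, schema):
    # missing fields fall back to the reader schema's default value
    return {name: record.get(name, default) for name, default in schema}

old_partition_row = {"col2": "col2_value"}  # written before col1 existed
print(resolve(old_partition_row, reader_schema))
# {'col1': 'col1_default', 'col2': 'col2_value'}
```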
### What changes were proposed in this pull request? Fix the docstring of PySpark `DataFrame.join`. ### Why are the changes needed? For a better view of the PySpark documentation. ### Does this PR introduce _any_ user-facing change? No (only documentation changes). ### How was this patch tested? Manual test (before/after screenshots omitted). Closes apache#31463 from xinrong-databricks/fixDoc. Authored-by: Xinrong Meng <xinrong.meng@databricks.com> Signed-off-by: Sean Owen <srowen@gmail.com>
### What changes were proposed in this pull request? `hashDistance` optimization: if the two vectors in a pair are the same, directly return 0.0 ### Why are the changes needed? It should be faster than the existing impl, because of the short-circuit ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? existing testsuites Closes apache#31394 from zhengruifeng/min_hash_distance_opt. Authored-by: Ruifeng Zheng <ruifengz@foxmail.com> Signed-off-by: Sean Owen <srowen@gmail.com>
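The short-circuit can be sketched as below. The distance formula here (fraction of mismatched hash entries) is a simplification for illustration, not Spark's exact `hashDistance`; only the early-return pattern is the point:

```python
# Sketch of the short-circuit: identical vectors need no element-wise scan.
def hash_distance(x, y):
    if x == y:
        return 0.0  # short-circuit: same vectors => distance is exactly 0.0
    # simplified distance: fraction of positions whose hash values differ
    mismatched = sum(1 for a, b in zip(x, y) if a != b)
    return mismatched / len(x)

print(hash_distance([1, 2, 3], [1, 2, 3]))  # 0.0, returned immediately
print(hash_distance([1, 2, 3], [1, 9, 3]))
```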
### What changes were proposed in this pull request?
Rewrote one `ExtractGenerator` case such that it does not rely on a side effect of the flatMap function.
### Why are the changes needed?
With the dataframe api it is possible to have a lazy sequence as the `output` of a `LogicalPlan`. When exploding a column on this dataframe using the `withColumn("newName", explode(col("name")))` method, the `ExtractGenerator` does not extract the generator and `CheckAnalysis` would throw an exception.
### Does this PR introduce _any_ user-facing change?
Bugfix
Before this fix, the workaround was to put `.select("*")` before the explode.
### How was this patch tested?
UT
Closes apache#31213 from tanelk/SPARK-34141_extract_generator.
Authored-by: tanel.kiis@gmail.com <tanel.kiis@gmail.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
### What changes were proposed in this pull request? 1. Clear predictionCol & probabilityCol and use a tmp rawPrediction column, to avoid potential column conflicts; 2. use an array instead of a map, to keep in line with the Python side; 3. simplify transform ### Why are the changes needed? If the input dataset has a column whose name is `predictionCol`, `probabilityCol`, or `rawPredictionCol`, transform will fail. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? added testsuite Closes apache#31472 from zhengruifeng/ovr_submodel_skip_pred_prob. Authored-by: Ruifeng Zheng <ruifengz@foxmail.com> Signed-off-by: Sean Owen <srowen@gmail.com>
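The column-conflict avoidance in point 1 can be sketched as generating a temporary name guaranteed not to clash with the input columns. The uuid-based naming is an assumption for illustration, not necessarily what the PR does:

```python
import uuid

# Sketch of avoiding a clash with an intermediate raw-prediction column.
def tmp_column_name(existing_cols, base="rawPrediction"):
    if base not in existing_cols:
        return base
    # append a random suffix so the temporary column cannot collide
    return f"{base}_{uuid.uuid4().hex}"

cols = ["features", "rawPrediction"]  # user dataset already has the name
tmp = tmp_column_name(cols)
print(tmp not in cols)  # True
```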
### What changes were proposed in this pull request? This PR aims to add ZStandardBenchmark as a base-line. ### Why are the changes needed? This will prevent any regression when we upgrade Zstandard library in the future. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manually. Closes apache#31498 from dongjoon-hyun/SPARK-ZSTD-BENCH. Authored-by: Dongjoon Hyun <dhyun@apple.com> Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
…ource ### What changes were proposed in this pull request? Unwrap `SparkUpgradeException` from `ParquetDecodingException` in v2 `FilePartitionReader` in the same way as v1 implementation does: https://github.com/apache/spark/blob/3a299aa6480ac22501512cd0310d31a441d7dfdc/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala#L180-L183 ### Why are the changes needed? 1. To be compatible with v1 implementation of the Parquet datasource. 2. To improve UX with Spark SQL by making `SparkUpgradeException` more visible. ### Does this PR introduce _any_ user-facing change? Yes, it can. ### How was this patch tested? By running the affected test suites: ``` $ build/sbt "sql/test:testOnly *ParquetRebaseDatetimeV1Suite" $ build/sbt "sql/test:testOnly *ParquetRebaseDatetimeV2Suite" ``` Closes apache#31497 from MaxGekk/parquet-spark-upgrade-exception. Authored-by: Max Gekk <max.gekk@gmail.com> Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
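The unwrap pattern described above, rethrowing the more informative cause when a decoding error wraps it, can be sketched like this. The exception classes are stand-ins for the real Parquet/Spark ones:

```python
# Stand-in exception types for illustration only.
class ParquetDecodingException(Exception):
    pass

class SparkUpgradeException(Exception):
    pass

def unwrap(e):
    # surface SparkUpgradeException when it is the cause of a decoding error
    if isinstance(e, ParquetDecodingException) and isinstance(e.__cause__, SparkUpgradeException):
        raise e.__cause__
    raise e

try:
    try:
        raise ParquetDecodingException("decode failed") from SparkUpgradeException("rebase mode")
    except ParquetDecodingException as e:
        unwrap(e)
except SparkUpgradeException as ex:
    print(type(ex).__name__)  # SparkUpgradeException
```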
… tables using "avro.schema.url" ### What changes were proposed in this pull request? With apache#31133 Avro schema evolution is introduce for partitioned hive tables where the schema is given by `avro.schema.literal`. Here that functionality is extended to support schema evolution where the schema is defined via `avro.schema.url`. ### Why are the changes needed? Without this PR the problem described in apache#31133 can be reproduced by tables where `avro.schema.url` is used. As in this case always the property value given at partition level is used for the `avro.schema.url`. So for example when a new column (with a default value) is added to the table then one the following problem happens: - when the new field is added after the last one the cell values will be null values instead of the default value - when the schema is extended somewhere before the last field then values will be listed for the wrong column positions Similar error will happen when one of the field is removed from the schema. For details please check the attached unit tests where both cases are checked. ### Does this PR introduce _any_ user-facing change? Fixes the potential value error. ### How was this patch tested? The existing unit tests for schema evolution is generalized and reused. New tests: - `SPARK-34370: support Avro schema evolution (add column with avro.schema.url)` - `SPARK-34370: support Avro schema evolution (remove column with avro.schema.url)` Closes apache#31501 from attilapiros/SPARK-34370. Authored-by: “attilapiros” <piros.attila.zsolt@gmail.com> Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
This PR formats the toString output of DateLiteral and TimestampLiteral. For example:
```sql
SELECT * FROM date_dim WHERE d_date BETWEEN (cast('2000-03-11' AS DATE) - INTERVAL 30 days) AND (cast('2000-03-11' AS DATE) + INTERVAL 30 days)
```
Before this pr:
```
Condition : (((isnotnull(d_date#18) AND (d_date#18 >= 10997)) AND (d_date#18 <= 11057))
```
After this pr:
```
Condition : (((isnotnull(d_date#14) AND (d_date#14 >= 2000-02-10)) AND (d_date#14 <= 2000-04-10))
```
### Why are the changes needed?
Make the plan more readable.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit test.
Closes apache#31455 from wangyum/SPARK-34342.
Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
Author
@ulysses-you Hi, can you update your branch and merge this too?
ulysses-you
pushed a commit
that referenced
this pull request
Feb 27, 2024
…n properly
### What changes were proposed in this pull request?
Make `ResolveRelations` handle plan id properly
### Why are the changes needed?
bug fix for Spark Connect, it won't affect classic Spark SQL
before this PR:
```
from pyspark.sql import functions as sf
spark.range(10).withColumn("value_1", sf.lit(1)).write.saveAsTable("test_table_1")
spark.range(10).withColumnRenamed("id", "index").withColumn("value_2", sf.lit(2)).write.saveAsTable("test_table_2")
df1 = spark.read.table("test_table_1")
df2 = spark.read.table("test_table_2")
df3 = spark.read.table("test_table_1")
join1 = df1.join(df2, on=df1.id==df2.index).select(df2.index, df2.value_2)
join2 = df3.join(join1, how="left", on=join1.index==df3.id)
join2.schema
```
fails with
```
AnalysisException: [CANNOT_RESOLVE_DATAFRAME_COLUMN] Cannot resolve dataframe column "id". It's probably because of illegal references like `df1.select(df2.col("a"))`. SQLSTATE: 42704
```
That is because the existing plan caching in `ResolveRelations` doesn't work with Spark Connect:
```
=== Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations ===
'[#12]Join LeftOuter, '`==`('index, 'id) '[#12]Join LeftOuter, '`==`('index, 'id)
!:- '[#9]UnresolvedRelation [test_table_1], [], false :- '[#9]SubqueryAlias spark_catalog.default.test_table_1
!+- '[#11]Project ['index, 'value_2] : +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
! +- '[#10]Join Inner, '`==`('id, 'index) +- '[#11]Project ['index, 'value_2]
! :- '[#7]UnresolvedRelation [test_table_1], [], false +- '[#10]Join Inner, '`==`('id, 'index)
! +- '[#8]UnresolvedRelation [test_table_2], [], false :- '[#9]SubqueryAlias spark_catalog.default.test_table_1
! : +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
! +- '[#8]SubqueryAlias spark_catalog.default.test_table_2
! +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_2`, [], false
Can not resolve 'id with plan 7
```
`[#7]UnresolvedRelation [test_table_1], [], false` was wrongly resolved to the cached one
```
:- '[#9]SubqueryAlias spark_catalog.default.test_table_1
+- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
```
### Does this PR introduce _any_ user-facing change?
yes, bug fix
### How was this patch tested?
added ut
### Was this patch authored or co-authored using generative AI tooling?
ci
Closes apache#45214 from zhengruifeng/connect_fix_read_join.
Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
ulysses-you
pushed a commit
that referenced
this pull request
Aug 13, 2024
…plan properly ### What changes were proposed in this pull request? Make `ResolveRelations` handle plan id properly cherry-pick bugfix apache#45214 to 3.5 ### Why are the changes needed? bug fix for Spark Connect, it won't affect classic Spark SQL before this PR: ``` from pyspark.sql import functions as sf spark.range(10).withColumn("value_1", sf.lit(1)).write.saveAsTable("test_table_1") spark.range(10).withColumnRenamed("id", "index").withColumn("value_2", sf.lit(2)).write.saveAsTable("test_table_2") df1 = spark.read.table("test_table_1") df2 = spark.read.table("test_table_2") df3 = spark.read.table("test_table_1") join1 = df1.join(df2, on=df1.id==df2.index).select(df2.index, df2.value_2) join2 = df3.join(join1, how="left", on=join1.index==df3.id) join2.schema ``` fails with ``` AnalysisException: [CANNOT_RESOLVE_DATAFRAME_COLUMN] Cannot resolve dataframe column "id". It's probably because of illegal references like `df1.select(df2.col("a"))`. SQLSTATE: 42704 ``` That is due to existing plan caching in `ResolveRelations` doesn't work with Spark Connect ``` === Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations === '[#12]Join LeftOuter, '`==`('index, 'id) '[#12]Join LeftOuter, '`==`('index, 'id) !:- '[#9]UnresolvedRelation [test_table_1], [], false :- '[#9]SubqueryAlias spark_catalog.default.test_table_1 !+- '[#11]Project ['index, 'value_2] : +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false ! +- '[#10]Join Inner, '`==`('id, 'index) +- '[#11]Project ['index, 'value_2] ! :- '[#7]UnresolvedRelation [test_table_1], [], false +- '[#10]Join Inner, '`==`('id, 'index) ! +- '[#8]UnresolvedRelation [test_table_2], [], false :- '[#9]SubqueryAlias spark_catalog.default.test_table_1 ! : +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false ! +- '[#8]SubqueryAlias spark_catalog.default.test_table_2 ! 
+- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_2`, [], false Can not resolve 'id with plan 7 ``` `[#7]UnresolvedRelation [test_table_1], [], false` was wrongly resolved to the cached one ``` :- '[#9]SubqueryAlias spark_catalog.default.test_table_1 +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false ``` ### Does this PR introduce _any_ user-facing change? yes, bug fix ### How was this patch tested? added ut ### Was this patch authored or co-authored using generative AI tooling? ci Closes apache#46291 from zhengruifeng/connect_fix_read_join_35. Authored-by: Ruifeng Zheng <ruifengz@apache.org> Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
ulysses-you
pushed a commit
that referenced
this pull request
Aug 13, 2024
…plan properly ### What changes were proposed in this pull request? Make `ResolveRelations` handle plan id properly cherry-pick bugfix apache#45214 to 3.4 ### Why are the changes needed? bug fix for Spark Connect, it won't affect classic Spark SQL before this PR: ``` from pyspark.sql import functions as sf spark.range(10).withColumn("value_1", sf.lit(1)).write.saveAsTable("test_table_1") spark.range(10).withColumnRenamed("id", "index").withColumn("value_2", sf.lit(2)).write.saveAsTable("test_table_2") df1 = spark.read.table("test_table_1") df2 = spark.read.table("test_table_2") df3 = spark.read.table("test_table_1") join1 = df1.join(df2, on=df1.id==df2.index).select(df2.index, df2.value_2) join2 = df3.join(join1, how="left", on=join1.index==df3.id) join2.schema ``` fails with ``` AnalysisException: [CANNOT_RESOLVE_DATAFRAME_COLUMN] Cannot resolve dataframe column "id". It's probably because of illegal references like `df1.select(df2.col("a"))`. SQLSTATE: 42704 ``` That is due to existing plan caching in `ResolveRelations` doesn't work with Spark Connect ``` === Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations === '[#12]Join LeftOuter, '`==`('index, 'id) '[#12]Join LeftOuter, '`==`('index, 'id) !:- '[#9]UnresolvedRelation [test_table_1], [], false :- '[#9]SubqueryAlias spark_catalog.default.test_table_1 !+- '[#11]Project ['index, 'value_2] : +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false ! +- '[#10]Join Inner, '`==`('id, 'index) +- '[#11]Project ['index, 'value_2] ! :- '[#7]UnresolvedRelation [test_table_1], [], false +- '[#10]Join Inner, '`==`('id, 'index) ! +- '[#8]UnresolvedRelation [test_table_2], [], false :- '[#9]SubqueryAlias spark_catalog.default.test_table_1 ! : +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false ! +- '[#8]SubqueryAlias spark_catalog.default.test_table_2 ! 
+- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_2`, [], false Can not resolve 'id with plan 7 ``` `[#7]UnresolvedRelation [test_table_1], [], false` was wrongly resolved to the cached one ``` :- '[#9]SubqueryAlias spark_catalog.default.test_table_1 +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false ``` ### Does this PR introduce _any_ user-facing change? yes, bug fix ### How was this patch tested? added ut ### Was this patch authored or co-authored using generative AI tooling? ci Closes apache#46290 from zhengruifeng/connect_fix_read_join_34. Authored-by: Ruifeng Zheng <ruifengz@apache.org> Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>