Skip to content

Conversation

@chenzhx
Copy link

@chenzhx chenzhx commented May 31, 2022

What changes were proposed in this pull request?

Why are the changes needed?

Does this PR introduce any user-facing change?

How was this patch tested?

beliefer and others added 10 commits May 28, 2022 00:15
…sions

### What changes were proposed in this pull request?
Currently, Spark DS V2 aggregate push-down only supports group by column.
But the SQL show below is very useful and common.
```
SELECT
  CASE
    WHEN 'SALARY' > 8000.00
      AND 'SALARY' < 10000.00
    THEN 'SALARY'
    ELSE 0.00
  END AS key,
  SUM('SALARY')
FROM "test"."employee"
GROUP BY key
```

### Why are the changes needed?
Let DS V2 aggregate push-down supports group by expressions

### Does this PR introduce _any_ user-facing change?
'No'.
New feature.

### How was this patch tested?
New tests

Closes apache#36325 from beliefer/SPARK-38997.

Authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

fix ut
…group by without aggregate functions

### What changes were proposed in this pull request?
Currently, the SQL show below not supported by DS V2 aggregate partial push-down.
`select key from tab group by key`

### Why are the changes needed?
Make DS V2 aggregate partial push-down supports group by without aggregate functions.

### Does this PR introduce _any_ user-facing change?
'No'.
New feature.

### How was this patch tested?
New tests

Closes apache#36492 from beliefer/SPARK-39135.

Authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…he data type is correct

### What changes were proposed in this pull request?
Currently, `H2Dialect` not implement `getJDBCType` of `JdbcDialect`, so the DS V2 push-down will throw exception show below:
```
Job aborted due to stage failure: Task 0 in stage 13.0 failed 1 times, most recent failure: Lost task 0.0 in stage 13.0 (TID 13) (jiaan-gengdembp executor driver):
 org.h2.jdbc.JdbcSQLNonTransientException: Unknown data type: "STRING"; SQL statement:
SELECT "DEPT","NAME","SALARY","BONUS","IS_MANAGER" FROM "test"."employee"  WHERE ("BONUS" IS NOT NULL) AND ("DEPT" IS NOT NULL) AND (CAST("BONUS" AS string) LIKE '%30%') AND (CAST("DEPT" AS byte) > 1) AND (CAST("DEPT" AS short) > 1) AND (CAST("BONUS" AS decimal(20,2)) > 1200.00)    [50004-210]
```
H2Dialect should implement `getJDBCType` of `JdbcDialect`.

### Why are the changes needed?
 make the H2 data type is correct.

### Does this PR introduce _any_ user-facing change?
'Yes'.
Fix a bug for `H2Dialect`.

### How was this patch tested?
New tests.

Closes apache#36516 from beliefer/SPARK-39157.

Authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
… pushed down

### What changes were proposed in this pull request?
Regardless of whether the functions are ANSI or not, most databases are actually unsure of their support.
So we should add a new API into `JdbcDialect` so that jdbc dialect decide which function could be pushed down.

### Why are the changes needed?
Let function push-down more flexible.

### Does this PR introduce _any_ user-facing change?
'No'.
New feature.

### How was this patch tested?
Exists tests.

Closes apache#36521 from beliefer/SPARK-39162.

Authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: huaxingao <huaxin_gao@apple.com>
### What changes were proposed in this pull request?

Currently, Spark have some string functions of ANSI standard. Please refer

https://github.com/apache/spark/blob/2f8613f22c0750c00cf1dcfb2f31c431d8dc1be7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala#L503

These functions show below:
`SUBSTRING,`
`UPPER`,
`LOWER`,
`TRANSLATE`,
`TRIM`,
`OVERLAY`

The mainstream databases support these functions show below.
Function | PostgreSQL | ClickHouse | H2 | MySQL | Oracle | Redshift | Presto | Teradata | Snowflake | DB2 | Vertica | Exasol | SqlServer | Yellowbrick | Impala | Mariadb | Druid | Pig | SQLite | Influxdata | Singlestore | ElasticSearch
-- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | --
`SUBSTRING` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes
`UPPER` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes
`LOWER` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | YES | Yes | Yes | Yes | Yes
`TRIM` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes
`TRANSLATE` | Yes | No | Yes | No | Yes | Yes | No | No | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | No | No | No | No | No | No
`OVERLAY` | Yes | No | No | No | Yes | No | No | No | No | Yes | Yes | No | No | No | No | No | No | No | No | No | No | No

DS V2 should supports push down these string functions.

### Why are the changes needed?

DS V2 supports push down string functions

### Does this PR introduce _any_ user-facing change?

'No'.
New feature.

### How was this patch tested?

New tests.

Closes apache#36330 from chenzhx/spark-master.

Authored-by: chenzhx <chen@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…pression

### What changes were proposed in this pull request?
This is a ANSI SQL and feature id is `F861`
```
<query expression> ::=
[ <with clause> ] <query expression body>
[ <order by clause> ] [ <result offset clause> ] [ <fetch first clause> ]

<result offset clause> ::=
OFFSET <offset row count> { ROW | ROWS }
```
For example:
```
SELECT customer_name, customer_gender FROM customer_dimension
   WHERE occupation='Dancer' AND customer_city = 'San Francisco' ORDER BY customer_name;
    customer_name     | customer_gender
----------------------+-----------------
 Amy X. Lang          | Female
 Anna H. Li           | Female
 Brian O. Weaver      | Male
 Craig O. Pavlov      | Male
 Doug Z. Goldberg     | Male
 Harold S. Jones      | Male
 Jack E. Perkins      | Male
 Joseph W. Overstreet | Male
 Kevin . Campbell     | Male
 Raja Y. Wilson       | Male
 Samantha O. Brown    | Female
 Steve H. Gauthier    | Male
 William . Nielson    | Male
 William Z. Roy       | Male
(14 rows)

SELECT customer_name, customer_gender FROM customer_dimension
   WHERE occupation='Dancer' AND customer_city = 'San Francisco' ORDER BY customer_name OFFSET 8;
   customer_name   | customer_gender
-------------------+-----------------
 Kevin . Campbell  | Male
 Raja Y. Wilson    | Male
 Samantha O. Brown | Female
 Steve H. Gauthier | Male
 William . Nielson | Male
 William Z. Roy    | Male
(6 rows)
```
There are some mainstream database support the syntax.

**Druid**
https://druid.apache.org/docs/latest/querying/sql.html#offset

**Kylin**
http://kylin.apache.org/docs/tutorial/sql_reference.html#QUERYSYNTAX

**Exasol**
https://docs.exasol.com/sql/select.htm

**Greenplum**
http://docs.greenplum.org/6-8/ref_guide/sql_commands/SELECT.html

**MySQL**
https://dev.mysql.com/doc/refman/5.6/en/select.html

**Monetdb**
https://www.monetdb.org/Documentation/SQLreference/SQLSyntaxOverview#SELECT

**PostgreSQL**
https://www.postgresql.org/docs/11/queries-limit.html

**Sqlite**
https://www.sqlite.org/lang_select.html

**Vertica**
https://www.vertica.com/docs/9.2.x/HTML/Content/Authoring/SQLReferenceManual/Statements/SELECT/OFFSETClause.htm?zoom_highlight=offset

The description for design:
**1**. Consider `OFFSET` as the special case of `LIMIT`. For example:
`SELECT * FROM a limit 10;` similar to `SELECT * FROM a limit 10 offset 0;`
`SELECT * FROM a offset 10;` similar to `SELECT * FROM a limit -1 offset 10;`
**2**. Because the current implement of `LIMIT` has good performance. For example:
`SELECT * FROM a limit 10;` parsed to the logic plan as below:
```
GlobalLimit (limit = 10)
|--LocalLimit (limit = 10)
```
and then the physical plan as below:
```
GlobalLimitExec (limit = 10) // Take the first 10 rows globally
|--LocalLimitExec (limit = 10) // Take the first 10 rows locally
```
This operator reduce massive shuffle and has good performance.
Sometimes, the logic plan transformed to the physical plan as:
```
CollectLimitExec (limit = 10) // Take the first 10 rows globally
```
If the SQL contains order by, such as `SELECT * FROM a order by c limit 10;`.
This SQL will be transformed to the physical plan as below:
```
TakeOrderedAndProjectExec (limit = 10) // Take the first 10 rows after sort globally
```

Based on this situation, this PR produces the following operations. For example:
`SELECT * FROM a limit 10 offset 10;` parsed to the logic plan as below:
```
GlobalLimit (limit = 10)
|--LocalLimit (limit = 10)
   |--Offset (offset = 10)
```
After optimization, the above logic plan will be transformed to:
```
GlobalLimitAndOffset (limit = 10, offset = 10) // Limit clause accompanied by offset clause
|--LocalLimit (limit = 20)   // 10 + offset = 20
```

and then the physical plan as below:
```
GlobalLimitAndOffsetExec (limit = 10, offset = 10) // Skip the first 10 rows and take the next 10 rows globally
|--LocalLimitExec (limit = 20) // Take the first 20(limit + offset) rows locally
```
Sometimes, the logic plan transformed to the physical plan as:
```
CollectLimitExec (limit = 10, offset = 10) // Skip the first 10 rows and take the next 10 rows globally
```
If the SQL contains order by, such as `SELECT * FROM a order by c limit 10 offset 10;`.
This SQL will be transformed to the physical plan as below:
```
TakeOrderedAndProjectExec (limit = 10, offset 10) // Skip the first 10 rows and take the next 10 rows after sort globally
```
**3**.In addition to the above, there is a special case that is only offset but no limit. For example:
`SELECT * FROM a offset 10;` parsed to the logic plan as below:
```
Offset (offset = 10) // Only offset clause
```
If offset is very large, will generate a lot of overhead. So this PR will refuse use offset clause without limit clause, although we can parse, transform and execute it.

A balanced idea is add a configuration item `spark.sql.forceUsingOffsetWithoutLimit` to force running query when user knows the offset is small enough. The default value of `spark.sql.forceUsingOffsetWithoutLimit` is false. This PR just came up with the idea so that it could be implemented at a better time in the future.

Note: The origin PR to support this feature is apache#25416.
Because the origin PR too old, there exists massive conflict which is hard to resolve. So I open this new PR to support this feature.

### Why are the changes needed?
new feature

### Does this PR introduce any user-facing change?
'No'

### How was this patch tested?
Exists and new UT

Closes apache#35975 from beliefer/SPARK-28330.

Authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Currently, `Offset` must work with `Limit`. The behavior not allow to use offset alone and add offset API into `DataFrame`.

If we use `Offset` alone, there are two situations:
1. If `Offset` is the last operator, collect the result to the driver and then drop/skip the first n (offset value) rows. Users can test or debug `Offset` in the way.
2. If `Offset` is the intermediate operator, shuffle all the result to one task and drop/skip the first n (offset value) rows and the result will be passed to the downstream operator.

For example, `SELECT * FROM a offset 10; ` parsed to the logic plan as below:
```
Offset (offset = 10) // Only offset clause
|--Relation
```

and then the physical plan as below:
```
CollectLimitExec(limit = -1, offset = 10) // Collect the result to the driver and skip the first 10 rows
|--JDBCRelation
```
or
```
GlobalLimitAndOffsetExec(limit = -1, offset = 10) // Collect the result and skip the first 10 rows
|--JDBCRelation
```

After this PR merged, users could input the SQL show below:
```
SELECT '' AS ten, unique1, unique2, stringu1
 		FROM onek
 		ORDER BY unique1 OFFSET 990;
```

Note: apache#35975 supports offset clause, it create a logical node named
`GlobalLimitAndOffset`. In fact, we can avoid use this node and use `Offset` instead and the latter is good with unify name.

### Why are the changes needed?
Improve the implement of offset clause.

### Does this PR introduce _any_ user-facing change?
'No'.
New feature.

### How was this patch tested?
Exists test cases.

Closes apache#36417 from beliefer/SPARK-28330_followup2.

Authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Currently, Spark added `Offset` operator.
This PR try to add `offset` API into `Dataset`.

### Why are the changes needed?
`offset` API is very useful and construct test case more easily.

### Does this PR introduce _any_ user-facing change?
'No'.
New feature.

### How was this patch tested?
New tests.

Closes apache#36519 from beliefer/SPARK-39159.

Authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@chenzhx chenzhx merged commit 43ac165 into kyspark-3.2.x-4.x May 31, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants