Update upstream#251
Merged
Merged
Conversation
## What changes were proposed in this pull request? Test coverage for arithmetic operations leading to: 1. Precision loss 2. Overflow Moreover, tests for casting bad string to other input types and for using bad string as operators of some functions. ## How was this patch tested? added tests Author: Marco Gaido <marcogaido91@gmail.com> Closes #20084 from mgaido91/SPARK-22904.
…g data failed. ## What changes were proposed in this pull request? Fix OneVsRestModel transform on streaming data failed. ## How was this patch tested? UT will be added soon, once #19979 merged. (Need a helper test method there) Author: WeichenXu <weichen.xu@databricks.com> Closes #20077 from WeichenXu123/fix_ovs_model_transform.
…reduce entries for mutable state ## What changes were proposed in this pull request? This PR addresses additional review comments in #19811 ## How was this patch tested? Existing test suites Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com> Closes #20036 from kiszk/SPARK-18066-followup.
## What changes were proposed in this pull request?
This PR moves Structured Streaming v2 APIs to streaming folder as following:
```
sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming
├── ContinuousReadSupport.java
├── ContinuousWriteSupport.java
├── MicroBatchReadSupport.java
├── MicroBatchWriteSupport.java
├── reader
│ ├── ContinuousDataReader.java
│ ├── ContinuousReader.java
│ ├── MicroBatchReader.java
│ ├── Offset.java
│ └── PartitionOffset.java
└── writer
└── ContinuousWriter.java
```
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <zsxwing@gmail.com>
Closes #20093 from zsxwing/move.
…, gcs, etc.) in Kubernetes mode ## What changes were proposed in this pull request? This PR expands the Kubernetes mode to be able to use remote dependencies on http/https endpoints, GCS, S3, etc. It adds steps for configuring and appending the Kubernetes init-container into the driver and executor pods for downloading remote dependencies. [Init-containers](https://kubernetes.io/docs/concepts/workloads/pods/init-containers/), as the name suggests, are containers that are run to completion before the main containers start, and are often used to perform initialization tasks prior to starting the main containers. We use init-containers to localize remote application dependencies before the driver/executors start running. The code that the init-container runs is also included. This PR also adds a step to the driver and executors for mounting user-specified secrets that may store credentials for accessing data storage, e.g., S3 and Google Cloud Storage (GCS), into the driver and executors. ## How was this patch tested? * The patch contains unit tests which are passing. * Manual testing: `./build/mvn -Pkubernetes clean package` succeeded. * Manual testing of the following cases: * [x] Running SparkPi using container-local spark-example jar. * [x] Running SparkPi using container-local spark-example jar with user-specific secret mounted. * [x] Running SparkPi using spark-example jar hosted remotely on an https endpoint. cc rxin felixcheung mateiz (shepherd) k8s-big-data SIG members & contributors: mccheah foxish ash211 ssuchter varunkatta kimoonkim erikerlandson tnachen ifilonenko liyinan926 reviewers: vanzin felixcheung jiangxb1987 mridulm Author: Yinan Li <liyinan926@gmail.com> Closes #19954 from liyinan926/init-container.
…rets ## What changes were proposed in this pull request? This PR updates the Kubernetes documentation corresponding to the following features/changes in #19954. * Ability to use remote dependencies through the init-container. * Ability to mount user-specified secrets into the driver and executor pods. vanzin jiangxb1987 foxish Author: Yinan Li <liyinan926@gmail.com> Closes #20059 from liyinan926/doc-update.
GulajavaMinistudio
pushed a commit
that referenced
this pull request
Nov 10, 2020
### What changes were proposed in this pull request?
Push down filter through expand. For case below:
```
create table t1(pid int, uid int, sid int, dt date, suid int) using parquet;
create table t2(pid int, vs int, uid int, csid int) using parquet;
SELECT
years,
appversion,
SUM(uusers) AS users
FROM (SELECT
Date_trunc('year', dt) AS years,
CASE
WHEN h.pid = 3 THEN 'iOS'
WHEN h.pid = 4 THEN 'Android'
ELSE 'Other'
END AS viewport,
h.vs AS appversion,
Count(DISTINCT u.uid) AS uusers
,Count(DISTINCT u.suid) AS srcusers
FROM t1 u
join t2 h
ON h.uid = u.uid
GROUP BY 1,
2,
3) AS a
WHERE viewport = 'iOS'
GROUP BY 1,
2
```
Plan. before this pr:
```
== Physical Plan ==
*(5) HashAggregate(keys=[years#30, appversion#32], functions=[sum(uusers#33L)])
+- Exchange hashpartitioning(years#30, appversion#32, 200), true, [id=#251]
+- *(4) HashAggregate(keys=[years#30, appversion#32], functions=[partial_sum(uusers#33L)])
+- *(4) HashAggregate(keys=[date_trunc('year', CAST(u.`dt` AS TIMESTAMP))#45, CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46, vs#12], functions=[count(if ((gid#44 = 1)) u.`uid`#47 else null)])
+- Exchange hashpartitioning(date_trunc('year', CAST(u.`dt` AS TIMESTAMP))#45, CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46, vs#12, 200), true, [id=#246]
+- *(3) HashAggregate(keys=[date_trunc('year', CAST(u.`dt` AS TIMESTAMP))#45, CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46, vs#12], functions=[partial_count(if ((gid#44 = 1)) u.`uid`#47 else null)])
+- *(3) HashAggregate(keys=[date_trunc('year', CAST(u.`dt` AS TIMESTAMP))#45, CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46, vs#12, u.`uid`#47, u.`suid`#48, gid#44], functions=[])
+- Exchange hashpartitioning(date_trunc('year', CAST(u.`dt` AS TIMESTAMP))#45, CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46, vs#12, u.`uid`#47, u.`suid`#48, gid#44, 200), true, [id=#241]
+- *(2) HashAggregate(keys=[date_trunc('year', CAST(u.`dt` AS TIMESTAMP))#45, CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46, vs#12, u.`uid`#47, u.`suid`#48, gid#44], functions=[])
+- *(2) Filter (CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46 = iOS)
+- *(2) Expand [ArrayBuffer(date_trunc(year, cast(dt#9 as timestamp), Some(Etc/GMT+7)), CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END, vs#12, uid#7, null, 1), ArrayBuffer(date_trunc(year, cast(dt#9 as timestamp), Some(Etc/GMT+7)), CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END, vs#12, null, suid#10, 2)], [date_trunc('year', CAST(u.`dt` AS TIMESTAMP))#45, CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46, vs#12, u.`uid`#47, u.`suid`#48, gid#44]
+- *(2) Project [uid#7, dt#9, suid#10, pid#11, vs#12]
+- *(2) BroadcastHashJoin [uid#7], [uid#13], Inner, BuildRight
:- *(2) Project [uid#7, dt#9, suid#10]
: +- *(2) Filter isnotnull(uid#7)
: +- *(2) ColumnarToRow
: +- FileScan parquet default.t1[uid#7,dt#9,suid#10] Batched: true, DataFilters: [isnotnull(uid#7)], Format: Parquet, Location: InMemoryFileIndex[file:/root/spark-3.0.0-bin-hadoop3.2/spark-warehouse/t1], PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,dt:date,suid:int>
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[2, int, true] as bigint))), [id=#233]
+- *(1) Project [pid#11, vs#12, uid#13]
+- *(1) Filter isnotnull(uid#13)
+- *(1) ColumnarToRow
+- FileScan parquet default.t2[pid#11,vs#12,uid#13] Batched: true, DataFilters: [isnotnull(uid#13)], Format: Parquet, Location: InMemoryFileIndex[file:/root/spark-3.0.0-bin-hadoop3.2/spark-warehouse/t2], PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<pid:int,vs:int,uid:int>
```
Plan. after. this pr. :
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[years#0, appversion#2], functions=[sum(uusers#3L)], output=[years#0, appversion#2, users#5L])
+- Exchange hashpartitioning(years#0, appversion#2, 5), true, [id=#71]
+- HashAggregate(keys=[years#0, appversion#2], functions=[partial_sum(uusers#3L)], output=[years#0, appversion#2, sum#22L])
+- HashAggregate(keys=[date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles))#23, CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END#24, vs#12], functions=[count(distinct uid#7)], output=[years#0, appversion#2, uusers#3L])
+- Exchange hashpartitioning(date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles))#23, CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END#24, vs#12, 5), true, [id=#67]
+- HashAggregate(keys=[date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles))#23, CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END#24, vs#12], functions=[partial_count(distinct uid#7)], output=[date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles))#23, CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END#24, vs#12, count#27L])
+- HashAggregate(keys=[date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles))#23, CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END#24, vs#12, uid#7], functions=[], output=[date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles))#23, CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END#24, vs#12, uid#7])
+- Exchange hashpartitioning(date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles))#23, CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END#24, vs#12, uid#7, 5), true, [id=#63]
+- HashAggregate(keys=[date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles)) AS date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles))#23, CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END AS CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END#24, vs#12, uid#7], functions=[], output=[date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles))#23, CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END#24, vs#12, uid#7])
+- Project [uid#7, dt#9, pid#11, vs#12]
+- BroadcastHashJoin [uid#7], [uid#13], Inner, BuildRight, false
:- Filter isnotnull(uid#7)
: +- FileScan parquet default.t1[uid#7,dt#9] Batched: true, DataFilters: [isnotnull(uid#7)], Format: Parquet, Location: InMemoryFileIndex[file:/private/var/folders/4l/7_c5c97s1_gb0d9_d6shygx00000gn/T/warehouse-c069d87..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,dt:date>
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[2, int, false] as bigint)),false), [id=#58]
+- Filter ((CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END = iOS) AND isnotnull(uid#13))
+- FileScan parquet default.t2[pid#11,vs#12,uid#13] Batched: true, DataFilters: [(CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END = iOS), isnotnull..., Format: Parquet, Location: InMemoryFileIndex[file:/private/var/folders/4l/7_c5c97s1_gb0d9_d6shygx00000gn/T/warehouse-c069d87..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<pid:int,vs:int,uid:int>
```
### Why are the changes needed?
Improve performance, filter more data.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added UT
Closes apache#30278 from AngersZhuuuu/SPARK-33302.
Authored-by: angerszhu <angers.zhu@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What changes were proposed in this pull request?
(Please fill in changes proposed in this fix)
How was this patch tested?
(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)
Please review http://spark.apache.org/contributing.html before opening a pull request.