Conversation

@zsxwing
Member

@zsxwing zsxwing commented Apr 9, 2014

If DStream.slice() is called before StreamingContext.start(), zeroTime is still null and slice() throws a NullPointerException. Ideally, it should throw something more descriptive, such as a "ContextNotInitialized" exception.

https://issues.apache.org/jira/browse/SPARK-1382

This PR adds a check in slice() and a unit test for it.
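For context, a minimal repro sketch of the failure mode (hedged: the local-mode setup below is illustrative; only the slice()-before-start() call matters):

```scala
import scala.collection.mutable.Queue

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}

object SliceBeforeStart {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("slice-before-start")
    val ssc = new StreamingContext(conf, Seconds(1))
    val stream = ssc.queueStream(new Queue[RDD[Int]]())

    // slice() is called before ssc.start(), so the DStream's zeroTime is still null.
    // Before this PR that surfaced as a NullPointerException; with the added check it
    // fails fast with a descriptive "has not been initialized" exception instead.
    stream.slice(Time(0), Time(1000))
  }
}
```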

@AmplabJenkins

Can one of the admins verify this patch?

@rxin
Contributor

rxin commented Apr 17, 2014

Jenkins, test this please.

Contributor

Can you make this a SparkException? All expected exceptions thrown by Spark should be SparkExceptions.
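Concretely, the suggestion is to throw SparkException rather than a bare Exception for this expected failure. A minimal stand-in sketch (not the actual DStream code; `isInitialized` and the fields here are simplified placeholders):

```scala
import org.apache.spark.SparkException

// Simplified stand-in for the guard in DStream.slice(): expected, user-facing
// failures should surface as SparkException rather than a bare Exception.
class UninitializedGuardExample {
  private var zeroTime: Option[Long] = None          // stand-in for DStream.zeroTime
  private def isInitialized: Boolean = zeroTime.isDefined

  def slice(fromMs: Long, toMs: Long): Seq[Long] = {
    if (!isInitialized) {
      throw new SparkException(s"$this has not been initialized")
    }
    Seq.empty                                         // real logic would collect RDDs here
  }
}
```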

Member Author

Sure. One more question: there are many `new Exception`s in DStream.scala. Is it necessary to change them to SparkException as well?

Contributor

If you can do it in this PR, that would be great!

On Wednesday, April 16, 2014, Shixiong Zhu [email protected] wrote:

In streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala:

@@ -725,6 +725,9 @@ abstract class DStream[T: ClassTag](
   * Return all the RDDs between 'fromTime' to 'toTime' (both included)
   */
  def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
+   if (!isInitialized) {
+     throw new Exception(this + " has not been initialized")

Sure. One more question: there are many `new Exception`s in DStream.scala. Is it necessary to change them to SparkException as well?

Reply to this email directly or view it on GitHub: https://github.com//pull/365/files#r11716967

@tdas
Contributor

tdas commented Apr 17, 2014

Jenkins, test this please.

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build finished. All automated tests passed.

@AmplabJenkins

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14194/

@zsxwing
Member Author

zsxwing commented Apr 17, 2014

Changing to SparkException will not break the API, since SparkException is a subclass of Exception.
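A quick illustration of that compatibility point (a hedged sketch; the message text is made up):

```scala
import org.apache.spark.SparkException

// Caller code written against the old behavior keeps working: a catch clause for
// Exception also matches SparkException, because SparkException extends Exception.
try {
  throw new SparkException("stream has not been initialized")
} catch {
  case e: Exception => println(s"caught: ${e.getMessage}")
}
```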

@rxin
Contributor

rxin commented Apr 17, 2014

Jenkins, retest this please.

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build finished. All automated tests passed.

@AmplabJenkins

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14203/

@tdas
Contributor

tdas commented Apr 26, 2014

This doesn't quite merge cleanly, so I cherry-picked the commits, merged master, and issued a new PR #562.
I will merge that in ASAP. Can you please close this PR?

Thanks for the bug fix btw!

pwendell pushed a commit to pwendell/spark that referenced this pull request Apr 26, 2014
@zsxwing I cherry-picked your changes and merged the master. apache#365 had some conflicts once again!

Author: zsxwing <[email protected]>
Author: Tathagata Das <[email protected]>

Closes apache#562 from tdas/SPARK-1382 and squashes the following commits:

e2962c1 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-1382
20968d9 [zsxwing] Replace Exception with SparkException in DStream
e476651 [zsxwing] Merge remote-tracking branch 'origin/master' into SPARK-1382
35ba56a [zsxwing] SPARK-1382: Fix NPE in DStream.slice
asfgit pushed a commit that referenced this pull request Apr 26, 2014
@zsxwing I cherry-picked your changes and merged the master. #365 had some conflicts once again!

Author: zsxwing <[email protected]>
Author: Tathagata Das <[email protected]>

Closes #562 from tdas/SPARK-1382 and squashes the following commits:

e2962c1 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-1382
20968d9 [zsxwing] Replace Exception with SparkException in DStream
e476651 [zsxwing] Merge remote-tracking branch 'origin/master' into SPARK-1382
35ba56a [zsxwing] SPARK-1382: Fix NPE in DStream.slice

(cherry picked from commit 058797c)
Signed-off-by: Tathagata Das <[email protected]>
@zsxwing
Member Author

zsxwing commented Apr 26, 2014

Thank you for merging it.

@zsxwing zsxwing closed this Apr 26, 2014
@zsxwing zsxwing deleted the SPARK-1382 branch May 18, 2014 09:50
pdeyhim pushed a commit to pdeyhim/spark-1 that referenced this pull request Jun 25, 2014
tangzhankun pushed a commit to tangzhankun/spark that referenced this pull request Jul 25, 2017
…e#365)

* Submission client redesign to use a step-based builder pattern.

This change overhauls the underlying architecture of the submission
client, but it is intended to entirely preserve existing behavior of
Spark applications. Therefore users will find this to be an invisible
change.

The philosophy behind this design is to reconsider the breakdown of the
submission process. It operates off the abstraction of "submission
steps", which are transformation functions that take the previous state
of the driver and return the new state of the driver. The driver's state
includes its Spark configurations and the Kubernetes resources that will
be used to deploy it.

Such a refactor moves away from a features-first API design, which
considers different containers to serve a set of features. The previous
design, for example, had a container files resolver API object that
returned different resolutions of the dependencies added by the user.
However, it was up to the main Client to know how to intelligently
invoke all of those APIs. Therefore the API surface area of the file
resolver became untenably large, and it was not intuitive how it was
to be used or extended.

This design changes the encapsulation layout; every module is now
responsible for changing the driver specification directly. An
orchestrator builds the correct chain of steps and hands it to the
client, which then calls it verbatim. The main client then makes any
final modifications that put the different pieces of the driver
together, particularly to attach the driver container itself to the pod
and to apply the Spark configuration as command-line arguments.

* Add a unit test for BaseSubmissionStep.

* Add unit test for kubernetes credentials mounting.

* Add unit test for InitContainerBootstrapStep.

* unit tests for initContainer

* Add a unit test for DependencyResolutionStep.

* further modifications to InitContainer unit tests

* Use of resolver in PythonStep and unit tests for PythonStep

* refactoring of init unit tests and pythonstep resolver logic

* Add unit test for KubernetesSubmissionStepsOrchestrator.

* refactoring and addition of secret trustStore+Cert checks in a SubmissionStepSuite

* added SparkPodInitContainerBootstrapSuite

* Added InitContainerResourceStagingServerSecretPluginSuite

* style in Unit tests

* extremely minor style fix in variable naming

* Address comments.

* Rename class for consistency.

* Attempt to make spacing consistent.

Multi-line methods should have four-space indentation for arguments that
aren't on the same line as the method call itself... but this is
difficult to do consistently given how IDEs handle Scala multi-line indentation
in most cases.
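A hedged sketch of the step-based pattern described above. All names below (DriverSpec, SubmissionStep, and the sample steps) are illustrative stand-ins rather than the fork's actual classes:

```scala
// The driver's evolving state: its Spark configuration plus the Kubernetes
// resources that will be created to deploy it (simplified to strings here).
case class DriverSpec(sparkConf: Map[String, String], k8sResources: List[String])

// A submission step is a transformation from the previous driver state to the next.
trait SubmissionStep {
  def configure(spec: DriverSpec): DriverSpec
}

// Example step: set baseline application properties.
class BaseStep(appName: String) extends SubmissionStep {
  override def configure(spec: DriverSpec): DriverSpec =
    spec.copy(sparkConf = spec.sparkConf + ("spark.app.name" -> appName))
}

// Example step: resolve user dependencies into the driver's configuration.
class DependencyStep(jars: Seq[String]) extends SubmissionStep {
  override def configure(spec: DriverSpec): DriverSpec =
    spec.copy(sparkConf = spec.sparkConf + ("spark.jars" -> jars.mkString(",")))
}

// The orchestrator chooses the chain of steps; the client just folds over it
// verbatim, then applies any final modifications (attaching the driver container,
// turning the conf into command-line arguments, and so on).
object StepOrchestratorSketch {
  def run(steps: Seq[SubmissionStep]): DriverSpec =
    steps.foldLeft(DriverSpec(Map.empty, Nil))((spec, step) => step.configure(spec))

  def main(args: Array[String]): Unit = {
    val finalSpec = run(Seq(new BaseStep("demo"), new DependencyStep(Seq("app.jar"))))
    println(finalSpec)
  }
}
```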
erikerlandson pushed a commit to erikerlandson/spark that referenced this pull request Jul 28, 2017
…e#365)
bzhaoopenstack pushed a commit to bzhaoopenstack/spark that referenced this pull request Sep 11, 2019
Clean the legacy Go projects path compatibility workarounds

Closes: theopenlab/openlab#123
Related-Bugs: theopenlab/openlab#100
RolatZhang pushed a commit to RolatZhang/spark that referenced this pull request Mar 18, 2022
* AL-4217 skip view cache when lookupRelation

* update version to r44
turboFei pushed a commit to turboFei/spark that referenced this pull request Nov 6, 2025
…an should be semantically equivalent (apache#365)

When canonicalizing `output` in `InMemoryRelation`, use `output` itself as the schema for determining the ordinals, rather than `cachedPlan.output`.

`InMemoryRelation.output` and `InMemoryRelation.cachedPlan.output` don't necessarily use the same exprIds. E.g.:
```
+- InMemoryRelation [c1#340, c2#341], StorageLevel(disk, memory, deserialized, 1 replicas)
      +- LocalTableScan [c1#254, c2#255]

```
Because of this, `InMemoryRelation` will sometimes fail to fully canonicalize, resulting in cases where two semantically equivalent `InMemoryRelation` instances appear to be semantically nonequivalent.

Example:
```
create or replace temp view data(c1, c2) as values
(1, 2),
(1, 3),
(3, 7),
(4, 5);

cache table data;

select c1, (select count(*) from data d1 where d1.c1 = d2.c1), count(c2) from data d2 group by all;
```
If plan change validation checking is on (i.e., `spark.sql.planChangeValidation=true`), the failure is:
```
[PLAN_VALIDATION_FAILED_RULE_EXECUTOR] The input plan of org.apache.spark.sql.internal.BaseSessionStateBuilder$$anon$2 is invalid: Aggregate: Aggregate [c1#78, scalar-subquery#77 [c1#78]], [c1#78, scalar-subquery#77 [c1#78] AS scalarsubquery(c1)#90L, count(c2#79) AS count(c2)#83L]
...
is not a valid aggregate expression: [SCALAR_SUBQUERY_IS_IN_GROUP_BY_OR_AGGREGATE_FUNCTION] The correlated scalar subquery '"scalarsubquery(c1)"' is neither present in GROUP BY, nor in an aggregate function.
```
If plan change validation checking is off, the failure is more mysterious:
```
[INTERNAL_ERROR] Couldn't find count(1)#163L in [c1#78,_groupingexpression#149L,count(1)#82L] SQLSTATE: XX000
org.apache.spark.SparkException: [INTERNAL_ERROR] Couldn't find count(1)#163L in [c1#78,_groupingexpression#149L,count(1)#82L] SQLSTATE: XX000
```
If you remove the cache command, the query succeeds.

The above failures happen because the subquery in the aggregate expressions and the subquery in the grouping expressions seem semantically nonequivalent since the `InMemoryRelation` in one of the subquery plans failed to completely canonicalize.

In `CacheManager#useCachedData`, two lookups for the same cached plan may create `InMemoryRelation` instances that have different exprIds in `output`. That's because the plan fragments used as lookup keys  may have been deduplicated by `DeduplicateRelations`, and thus have different exprIds in their respective output schemas. When `CacheManager#useCachedData` creates an `InMemoryRelation` instance, it borrows the output schema of the plan fragment used as the lookup key.
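To make the ordinal point concrete, a self-contained toy sketch (simplified stand-ins, not Catalyst's real classes): normalizing each relation's output against its own `output` erases the differing exprIds, so the two instances canonicalize identically.

```scala
object CanonicalizationSketch {
  // Toy stand-in for a Catalyst attribute: a name plus an instance-specific exprId.
  case class Attr(name: String, exprId: Long)

  // Canonicalize an attribute to its ordinal in the given schema, dropping the exprId.
  def canonicalize(a: Attr, schema: Seq[Attr]): String = s"attr#${schema.indexOf(a)}"

  def main(args: Array[String]): Unit = {
    // Two InMemoryRelation-like outputs for the same cached data, with different
    // exprIds because the lookup keys were deduplicated (c1#340/c2#341 vs c1#254/c2#255).
    val outputA = Seq(Attr("c1", 340), Attr("c2", 341))
    val outputB = Seq(Attr("c1", 254), Attr("c2", 255))

    // Normalizing each output against ITS OWN schema yields the same canonical form
    // ("attr#0", "attr#1"), which lets the two relations compare as equivalent.
    assert(outputA.map(canonicalize(_, outputA)) == outputB.map(canonicalize(_, outputB)))
  }
}
```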

The failure to fully canonicalize has other effects. For example, this query fails to reuse the exchange:
```
create or replace temp view data(c1, c2) as values
(1, 2),
(1, 3),
(2, 4),
(3, 7),
(7, 22);

cache table data;

set spark.sql.autoBroadcastJoinThreshold=-1;
set spark.sql.adaptive.enabled=false;

select *
from data l
join data r
on l.c1 = r.c1;
```

No.

New tests.

No.

Closes apache#44806 from bersprockets/plan_validation_issue.

Authored-by: Bruce Robbins <[email protected]>

(cherry picked from commit b80e8cb)

Signed-off-by: Dongjoon Hyun <[email protected]>
Co-authored-by: Bruce Robbins <[email protected]>