Conversation

@imback82
Contributor

What changes were proposed in this pull request?

This PR proposes to update CACHE TABLE to use a LogicalPlan when caching a query, to avoid creating a DataFrame, as suggested here: #30743 (comment)

For reference, UNCACHE TABLE also uses a LogicalPlan:

case class UncacheTableExec(
    relation: LogicalPlan,
    cascade: Boolean) extends V2CommandExec {

  override def run(): Seq[InternalRow] = {
    val sparkSession = sqlContext.sparkSession
    sparkSession.sharedState.cacheManager.uncacheQuery(sparkSession, relation, cascade)
    Seq.empty
  }
}

Why are the changes needed?

To avoid creating an unnecessary DataFrame, and to make CACHE TABLE consistent with the uncacheQuery call used in UNCACHE TABLE.
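The shape of the refactoring can be sketched with simplified stand-ins (`Plan`, `Frame`, and `MiniCacheManager` below are hypothetical, not Spark's actual classes): the cache accepts the logical plan directly, instead of requiring the caller to first wrap it in a DataFrame-like object.

```scala
// Illustrative sketch only; these are hypothetical stand-ins, not Spark's API.
final case class Plan(sql: String)
final case class Frame(plan: Plan) // analogous to wrapping a plan in a DataFrame

class MiniCacheManager {
  private var cached = Map.empty[Plan, String]

  // Before: the caller had to build a Frame just to hand over its plan.
  def cacheFrame(frame: Frame, name: String): Unit = cached += (frame.plan -> name)

  // After: accept the plan directly, so no intermediate Frame is created.
  def cachePlan(plan: Plan, name: String): Unit = cached += (plan -> name)

  def lookup(plan: Plan): Option[String] = cached.get(plan)
}
```

Either path keys the cache off the plan; the change simply drops the intermediate wrapper on the CACHE TABLE code path.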

Does this PR introduce any user-facing change?

No, just internal changes.

How was this patch tested?

Existing tests, since this is an internal refactoring change.

@github-actions github-actions bot added the SQL label Dec 17, 2020
@SparkQA

SparkQA commented Dec 17, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/37524/

@SparkQA

SparkQA commented Dec 17, 2020

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/37524/

@imback82
Contributor Author

cc @cloud-fan, thanks!

* Unlike `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because
* recomputing the in-memory columnar representation of the underlying table is expensive.
*/
def cacheQueryWithLogicalPlan(
Contributor

shall we just call it cacheQuery to be consistent with uncacheQuery?

Contributor Author

Unfortunately, we cannot because Scala doesn't allow overloading methods with default arguments.

Contributor Author

Ah, I think we can do the following:

def cacheQuery(
    spark: SparkSession,
    planToCache: LogicalPlan,
    tableName: Option[String]): Unit = {
  cacheQuery(spark, planToCache, tableName, MEMORY_AND_DISK)
}

def cacheQuery(
    spark: SparkSession,
    planToCache: LogicalPlan,
    tableName: Option[String],
    storageLevel: StorageLevel)
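The pattern above can be shown with a minimal, self-contained sketch (the `CacheApi` object and its `String` parameters are simplified stand-ins, not Spark's actual types): Scala rejects multiple overloaded alternatives that each declare default arguments, so the short overload carries no defaults and simply forwards to the full one.

```scala
// Hypothetical stand-in demonstrating overloads without default arguments.
object CacheApi {
  // Short overload: no default arguments, forwards the common case.
  def cacheQuery(plan: String, tableName: Option[String]): String =
    cacheQuery(plan, tableName, storageLevel = "MEMORY_AND_DISK")

  // Full overload: the single place that does the work.
  def cacheQuery(plan: String, tableName: Option[String], storageLevel: String): String =
    s"cached '$plan' as ${tableName.getOrElse("<unnamed>")} at $storageLevel"
}
```

For example, `CacheApi.cacheQuery("SELECT 1", Some("t"))` returns `cached 'SELECT 1' as t at MEMORY_AND_DISK`. Had both overloads declared defaults, the compiler would reject them as ambiguous alternatives.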

@cloud-fan cloud-fan left a comment (Contributor)

LGTM

def cacheQueryWithLogicalPlan(
    spark: SparkSession,
    planToCache: LogicalPlan,
    tableName: Option[String] = None,
Contributor

does this method need default parameter values?

Contributor Author

I think https://github.com/apache/spark/pull/30815/files#r544822268 should work. Let me update this PR. Thanks!

@SparkQA

SparkQA commented Dec 17, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/37537/

@SparkQA

SparkQA commented Dec 17, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/37537/

@SparkQA

SparkQA commented Dec 17, 2020

Test build #132921 has finished for PR 30815 at commit d9dcfb3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 17, 2020

Test build #132934 has finished for PR 30815 at commit 52de954.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

retest this please

@SparkQA

SparkQA commented Dec 17, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/37561/

@SparkQA

SparkQA commented Dec 17, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/37561/

@dongjoon-hyun
Member

cc @sunchao

* Unlike `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because
* recomputing the in-memory columnar representation of the underlying table is expensive.
*/
def cacheQuery(
Member

+1 on this! I think we may replace one more usage in DataSourceV2Strategy.invalidateCache as well.

Contributor Author

thanks, updated.

    spark: SparkSession,
    planToCache: LogicalPlan,
    tableName: Option[String],
    storageLevel: StorageLevel): Unit = {
Member

perhaps we can just keep a single method with default value of tableName being None and storageLevel being MEMORY_AND_DISK?

Contributor

The Scala compiler will complain if we do that. See #30815 (comment)

Member

Ah got it. Thanks.

@SparkQA

SparkQA commented Dec 17, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/37567/

@SparkQA

SparkQA commented Dec 17, 2020

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/37567/

@SparkQA

SparkQA commented Dec 17, 2020

Test build #132959 has finished for PR 30815 at commit 52de954.

  • This patch fails from timeout after a configured wait of 500m.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 18, 2020

Test build #132964 has finished for PR 30815 at commit bc80b6d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in 0f1a183 Dec 18, 2020