@@ -288,7 +288,7 @@ object StatFunctions extends Logging {
   }

   // If there is no selected columns, we don't need to run this aggregate, so make it a lazy val.
-  lazy val aggResult = ds.select(aggExprs: _*).queryExecution.toRdd.collect().head
+  lazy val aggResult = ds.select(aggExprs: _*).queryExecution.toRdd.map(_.copy()).collect().head
Member:
Hm, we're taking the head row anyway. Do you have any e2e example that produces a wrong result?

Contributor:
+1, I think we'd better add a test for it

Contributor Author:
We encountered this issue when running the Gluten unit tests. In Gluten we rewrote ColumnarToRow so that it releases a row's backing memory once the row has been consumed. The issue seems hard to reproduce on Apache Spark, because Spark's ColumnarToRow keeps rows in on-heap memory that is only reclaimed by GC, so a stale row usually still reads valid data. Do you have any suggestions for reproducing this issue in Apache Spark? @HyukjinKwon @cloud-fan @zhengruifeng
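
For intuition, here is a minimal, self-contained sketch of the reuse hazard (plain Scala, no Spark or Gluten dependency; `MutableRow` is a hypothetical stand-in for an operator's recycled row buffer, not anything in the PR):

    // One mutable buffer reused across the whole iteration, the way an
    // operator may recycle its output row.
    case class MutableRow(var value: Long)

    val shared = MutableRow(0L)
    def rows = (1L to 3L).iterator.map { v => shared.value = v; shared }

    // Without a copy, every collected element aliases `shared`:
    println(rows.toArray.map(_.value).mkString(","))               // prints 3,3,3
    // Copying per element, as `.map(_.copy())` does in this PR:
    println(rows.map(_.copy()).toArray.map(_.value).mkString(",")) // prints 1,2,3

When the reused buffer is merely overwritten (on-heap Spark), a single collected row still happens to read valid data; when it is freed eagerly (Gluten's off-heap ColumnarToRow), even the head row can be invalid.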

Contributor:
This is only a bug if Spark installs third-party physical operators that release memory eagerly.

Contributor:
According to the doc of QueryExecution.toRdd, I think adding the copy is the right thing to do:

  /**
   * Internal version of the RDD. Avoids copies and has no schema.
   * Note for callers: Spark may apply various optimization including reusing object: this means
   * the row is valid only for the iteration it is retrieved. You should avoid storing row and
   * accessing after iteration. (Calling `collect()` is one of known bad usage.)
   * If you want to store these rows into collection, please apply some converter or copy row
   * which produces new object per iteration.
   * Given QueryExecution is not a public class, end users are discouraged to use this: please
   * use `Dataset.rdd` instead where conversion will be applied.
   */
  lazy val toRdd: RDD[InternalRow] = new SQLExecutionRDD(
    executedPlan.execute(), sparkSession.sessionState.conf)

It happens to work here because the result has only one row (it's a global aggregate). I'm fine without a test, as this change follows the guidance of the QueryExecution.toRdd doc.
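
As a comparison of the two call paths the doc distinguishes, a sketch (the SparkSession setup and the toy dataset are illustrative assumptions, not from the PR):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("demo").master("local[1]").getOrCreate()
    val ds = spark.range(3).toDF("id")

    // Internal path: InternalRows may be reused, so copy each one before materializing.
    val internalRows = ds.queryExecution.toRdd.map(_.copy()).collect()

    // Public path: Dataset.rdd applies a per-row converter, so no manual copy is needed.
    val externalRows = ds.rdd.collect()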

Contributor Author:
Thanks @cloud-fan for the valuable information. @HyukjinKwon @zhengruifeng, can we follow Wenchen's suggestion and merge this PR without a test?

Contributor:
SGTM

Contributor Author:
@HyukjinKwon Do you have any further comment? Thanks.


   // We will have one row for each selected statistic in the result.
   val result = Array.fill[InternalRow](selectedStatistics.length) {