@@ -81,7 +81,7 @@ abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] {
      childPlans.map { childPlan =>
        // Replace the placeholder by the child plan
        candidateWithPlaceholders.transformUp {
-         case p if p == placeholder => childPlan
+         case p if p.eq(placeholder) => childPlan
Member: As the placeholders are collected from candidateWithPlaceholders, I think we will definitely have a matched child plan by reference equality here, right?

Contributor Author: Yes, right.
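To spell out what is at stake in this exchange, here is a minimal standalone sketch of the failure mode. The Node/Branch/Placeholder/Planned classes and the toy transformUp are hypothetical stand-ins, not Catalyst's TreeNode, but the bottom-up rewrite behaves analogously: a structural == match rewrites every structurally equal placeholder with the child planned for the first one, while an eq match only rewrites the exact instance that was collected.

```scala
// Hypothetical stand-ins for Catalyst trees; none of these classes exist in Spark.
sealed trait Node {
  def transformUp(rule: PartialFunction[Node, Node]): Node = {
    val withNewChildren = this match {
      case Branch(l, r) => Branch(l.transformUp(rule), r.transformUp(rule))
      case leaf         => leaf
    }
    rule.applyOrElse(withNewChildren, (n: Node) => n)
  }
}
case class Branch(left: Node, right: Node) extends Node
case class Placeholder(name: String) extends Node
case class Planned(name: String) extends Node

object EqVsEqualsDemo extends App {
  val p1 = Placeholder("range")
  val p2 = Placeholder("range") // structurally equal to p1, but a different instance
  val candidate = Branch(p1, p2)

  // Structural match: BOTH placeholders are replaced by the plan produced for p1.
  val structural = candidate.transformUp { case p if p == p1 => Planned("child-for-p1") }

  // Reference match: only the exact collected instance p1 is replaced.
  val byReference = candidate.transformUp { case p if p.eq(p1) => Planned("child-for-p1") }

  println(structural)  // Branch(Planned(child-for-p1),Planned(child-for-p1))
  println(byReference) // Branch(Planned(child-for-p1),Placeholder(range))
}
```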

Member: Good catch... this fix is simple and fine with me in this case, though I think it would be better to compare only placeholder nodes here, right?

Contributor Author: I'm not sure what you mean exactly here, could you elaborate a bit more, please? Thanks.

Member: nvm, just a question. I was wondering why we couldn't write something like this here:

    trait PlaceHolder
    case class PlanLater(plan: LogicalPlan) extends LeafExecNode with PlaceHolder

and then:

    candidateWithPlaceholders.transformUp {
      case p: PlaceHolder if p.eq(placeholder) => childPlan
    }
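For context on the marker-trait idea and the later suggestion of moving PlanLater to catalyst: a rough, from-memory sketch of how PlanLater is shaped in org.apache.spark.sql.execution today (simplified, not the verbatim source, and details may differ between versions). It is already a physical leaf that only wraps the not-yet-planned logical plan, so adding a marker trait would be a small structural change:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.LeafExecNode

// Simplified sketch of the existing PlanLater node: a physical leaf carrying the
// logical plan that still has to be planned; it is never meant to be executed.
case class PlanLater(plan: LogicalPlan) extends LeafExecNode {
  override def output: Seq[Attribute] = plan.output

  protected override def doExecute(): RDD[InternalRow] =
    throw new UnsupportedOperationException()
}
```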

Contributor Author: I think we could do that only in 3.0, as it would be a breaking change for those who are using a custom QueryPlanner.

Member: Yea, thanks. Anyway, this fix looks pretty OK.

Contributor Author: Probably we can also move PlanLater to catalyst and use that instead of introducing a new trait. I think it can be proposed for 3.0. Thanks.

Member: If we do exprId dedup for the children of Union in the Analyzer stage, is the problem fixed?

Contributor: We don't have exprIds for query plans.

Member: This condition will always be false after we dedup the expression IDs. Please let me know if any of you can find another test case to break it. Thanks!

Contributor: We can reproduce the bug with any plan which has more than one child but doesn't dedup the expr IDs. Union is one of them. I'm not sure whether Union is the only one, though...
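To make that concrete, a hedged, REPL-style sketch of such a reproduction (it assumes a SparkSession named spark and mirrors the PlannerSuite test added in this PR): a self-union gives the planner two structurally equal children, which is exactly the case where a == placeholder match can resolve both placeholders to the same physical instance.

```scala
// Assumes a running SparkSession named `spark` (e.g. in spark-shell).
// The two children of the union are structurally equal logical Range nodes.
val df = spark.range(10)
val unioned = df.union(df)

// With the fix, the physical plan contains two distinct leaf (range scan)
// instances; without it, both placeholders could be replaced by one instance.
val leaves = unioned.queryExecution.sparkPlan.collect {
  case p if p.children.isEmpty => p
}
assert(leaves.length == 2)
assert(!(leaves(0) eq leaves(1)))
```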

Contributor Author: Moreover, I am not sure whether there are other cases in which == is true while eq isn't, and I'd argue that it is very hard to rule such cases out. So I think this fix would be needed anyway.
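For readers skimming the thread, the distinction being debated is plain Scala, nothing Catalyst-specific: case classes derive a structural ==, while eq is always reference identity. A small REPL-style illustration:

```scala
// == delegates to equals, which case classes derive structurally;
// eq (defined on AnyRef) compares object identity.
case class Demo(id: Int)

val a = Demo(1)
val b = Demo(1)    // same contents, different instance

assert(a == b)     // structural equality: true
assert(!(a eq b))  // reference identity: false, two distinct objects
assert(a eq a)     // true only for the very same instance
```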

Member: Union is the last one for which we are not doing the dedup. I believe we need to fix it. If we dedup the Union children, we do not have a valid test case for this PR. @mgaido91 do you have any test case?

Contributor Author: I don't have other examples, but I cannot exclude that there are any. And I don't see any benefit in going back to the previous solution. So I think the current code is safer.

Contributor: If we dedup expr IDs for Union, then I think this patch becomes a code cleanup instead of a bug fix, and we can remove this test.

Contributor Author: I agree; the test can then be reused for the other patch, or removed.

}
}
}
@@ -704,6 +704,23 @@ class PlannerSuite extends SharedSQLContext {
df.queryExecution.executedPlan.execute()
}

test("SPARK-25278: physical nodes should be different instances for same logical nodes") {
Contributor: Can we demonstrate the metrics problem in this test? I think that's a real bug.

Contributor: nvm, I saw it in the next test suite.

  val range = Range(1, 1, 1, 1)
  val df = Union(range, range)
  val ranges = df.queryExecution.optimizedPlan.collect {
    case r: Range => r
  }
  assert(ranges.length == 2)
  // Ensure the two Range instances are equal according to their equals method
  assert(ranges.head == ranges.last)
  val execRanges = df.queryExecution.sparkPlan.collect {
    case r: RangeExec => r
  }
  assert(execRanges.length == 2)
  // Ensure the two RangeExec instances are different instances
  assert(!execRanges.head.eq(execRanges.last))
Member: Is it difficult to test the eq behaviour by using the QueryPlanner class?

Contributor Author: mmmh, what do you mean exactly?

Member: e.g.,

    case class DummyPlanner() extends QueryPlanner[LogicalPlan] { ... }

    assert(DummyPlanner().plan(eqTestPlan) === expectedPlan)

Contributor Author: I think it is doable, but I don't see the advantage. Could you please explain why that would be better? Thanks.

Member (@maropu, Sep 6, 2018): Just a question to check if we can do that. The current test is OK to me.

Contributor Author: Cool, thanks. Anyway, yes, I think we can do that. Thanks.

}

test("SPARK-24556: always rewrite output partitioning in ReusedExchangeExec " +
"and InMemoryTableScanExec") {
def checkOutputPartitioningRewrite(
@@ -497,6 +497,19 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
}
}

test("SPARK-25278: output metrics are wrong for plans repeated in the query") {
val name = "demo_view"
Member: Wrap the code with withView?

Contributor Author: Doing, thanks!

  withView(name) {
    sql(s"CREATE OR REPLACE VIEW $name AS VALUES 1,2")
    val view = spark.table(name)
    val union = view.union(view)
    testSparkPlanMetrics(union, 1, Map(
      0L -> ("Union" -> Map()),
      1L -> ("LocalTableScan" -> Map("number of output rows" -> 2L)),
      2L -> ("LocalTableScan" -> Map("number of output rows" -> 2L))))
  }
}

test("writing data out metrics: parquet") {
testMetricsNonDynamicPartition("parquet", "t1")
}