Conversation


@bersprockets bersprockets commented Jan 19, 2024

What changes were proposed in this pull request?

When canonicalizing `output` in `InMemoryRelation`, use `output` itself as the schema for determining the ordinals, rather than `cachedPlan.output`.
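A rough sketch of the shape of the change (simplified, not the exact Spark source; `QueryPlan.normalizeExpressions` is the Catalyst helper that rewrites an attribute's exprId to its ordinal in a reference schema):

```scala
// Sketch of InMemoryRelation.doCanonicalize after the fix. The only change
// is which schema supplies the ordinals used to normalize exprIds.
override def doCanonicalize(): LogicalPlan = copy(
  // Before: output.map(QueryPlan.normalizeExpressions(_, cachedPlan.output))
  // After: `output` itself is the reference schema, so every attribute in
  // `output` is found and normalized to a stable ordinal.
  output = output.map(QueryPlan.normalizeExpressions(_, output)),
  cacheBuilder = cacheBuilder,
  outputOrdering = outputOrdering)
```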

Why are the changes needed?

`InMemoryRelation.output` and `InMemoryRelation.cachedPlan.output` don't necessarily use the same exprIds. E.g.:

```
+- InMemoryRelation [c1#340, c2#341], StorageLevel(disk, memory, deserialized, 1 replicas)
      +- LocalTableScan [c1#254, c2#255]
```

Because of this, `InMemoryRelation` will sometimes fail to fully canonicalize, resulting in cases where two semantically equivalent `InMemoryRelation` instances appear to be semantically nonequivalent.
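To make the mechanism concrete, here is a minimal, self-contained toy model (not Spark source; `Attr` and `normalize` are hypothetical stand-ins for Catalyst's `AttributeReference` and `QueryPlan.normalizeExpressions`) showing why normalizing against the wrong schema leaves exprIds untouched:

```scala
object CanonicalizationSketch extends App {
  case class Attr(name: String, exprId: Long)

  // Canonicalization rewrites an attribute's exprId to its ordinal in a
  // reference schema; an attribute not found in the schema is left as-is,
  // which is the partial-canonicalization failure described above.
  def normalize(attr: Attr, schema: Seq[Attr]): Attr = {
    val ordinal = schema.indexWhere(_.exprId == attr.exprId)
    if (ordinal >= 0) attr.copy(exprId = ordinal) else attr
  }

  val output    = Seq(Attr("c1", 340), Attr("c2", 341)) // InMemoryRelation.output
  val cachedOut = Seq(Attr("c1", 254), Attr("c2", 255)) // cachedPlan.output

  // Buggy: exprIds #340/#341 are absent from cachedPlan.output, so nothing
  // is normalized and the relation keeps its lookup-specific exprIds.
  assert(output.map(normalize(_, cachedOut)) == output)

  // Fixed: using `output` itself yields stable ordinals 0 and 1, so two
  // relations that differ only in exprIds canonicalize to the same form.
  assert(output.map(normalize(_, output)) == Seq(Attr("c1", 0), Attr("c2", 1)))
}
```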

Example:

```
create or replace temp view data(c1, c2) as values
(1, 2),
(1, 3),
(3, 7),
(4, 5);

cache table data;

select c1, (select count(*) from data d1 where d1.c1 = d2.c1), count(c2) from data d2 group by all;
```

If plan change validation checking is on (i.e., `spark.sql.planChangeValidation=true`), the failure is:

```
[PLAN_VALIDATION_FAILED_RULE_EXECUTOR] The input plan of org.apache.spark.sql.internal.BaseSessionStateBuilder$$anon$2 is invalid: Aggregate: Aggregate [c1#78, scalar-subquery#77 [c1#78]], [c1#78, scalar-subquery#77 [c1#78] AS scalarsubquery(c1)#90L, count(c2#79) AS count(c2)#83L]
...
is not a valid aggregate expression: [SCALAR_SUBQUERY_IS_IN_GROUP_BY_OR_AGGREGATE_FUNCTION] The correlated scalar subquery '"scalarsubquery(c1)"' is neither present in GROUP BY, nor in an aggregate function.
```

If plan change validation checking is off, the failure is more mysterious:

```
[INTERNAL_ERROR] Couldn't find count(1)#163L in [c1#78,_groupingexpression#149L,count(1)#82L] SQLSTATE: XX000
org.apache.spark.SparkException: [INTERNAL_ERROR] Couldn't find count(1)#163L in [c1#78,_groupingexpression#149L,count(1)#82L] SQLSTATE: XX000
```

If you remove the cache command, the query succeeds.

The above failures happen because the subquery in the aggregate expressions and the subquery in the grouping expressions seem semantically nonequivalent, since the `InMemoryRelation` in one of the subquery plans failed to completely canonicalize.

In `CacheManager#useCachedData`, two lookups for the same cached plan may create `InMemoryRelation` instances that have different exprIds in `output`. That's because the plan fragments used as lookup keys may have been deduplicated by `DeduplicateRelations`, and thus have different exprIds in their respective output schemas. When `CacheManager#useCachedData` creates an `InMemoryRelation` instance, it borrows the output schema of the plan fragment used as the lookup key.
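A minimal, self-contained sketch of that borrowing step (hypothetical types only; the real `CacheManager#useCachedData` resolves a plan fragment against the cache and rebinds the cached relation to that fragment's output):

```scala
object UseCachedDataSketch extends App {
  case class Attr(name: String, exprId: Long)
  case class CachedRelation(output: Seq[Attr]) // stand-in for InMemoryRelation

  // Stand-in for the key step of CacheManager#useCachedData: the cached
  // relation adopts the lookup key's own output schema (and its exprIds).
  def useCachedData(lookupKeyOutput: Seq[Attr], cached: CachedRelation): CachedRelation =
    cached.copy(output = lookupKeyOutput)

  val cached = CachedRelation(Seq(Attr("c1", 254), Attr("c2", 255)))

  // Two lookups whose keys were deduplicated to different exprIds:
  val imr1 = useCachedData(Seq(Attr("c1", 340), Attr("c2", 341)), cached)
  val imr2 = useCachedData(Seq(Attr("c1", 500), Attr("c2", 501)), cached)

  // Same cached data, different exprIds in `output`; canonicalization must
  // reconcile these, which is why the ordinal schema choice matters.
  assert(imr1.output != imr2.output)
}
```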

The failure to fully canonicalize has other effects. For example, this query fails to reuse the exchange:

```
create or replace temp view data(c1, c2) as values
(1, 2),
(1, 3),
(2, 4),
(3, 7),
(7, 22);

cache table data;

set spark.sql.autoBroadcastJoinThreshold=-1;
set spark.sql.adaptive.enabled=false;

select *
from data l
join data r
on l.c1 = r.c1;
```

Does this PR introduce any user-facing change?

No.

How was this patch tested?

New tests.

Was this patch authored or co-authored using generative AI tooling?

No.

@github-actions github-actions bot added the SQL label Jan 19, 2024

@dongjoon-hyun dongjoon-hyun left a comment

+1, LGTM.

cc @viirya and @sunchao

@dongjoon-hyun

Thank you, @bersprockets and @viirya
Merged to master/3.5/3.4.

dongjoon-hyun pushed a commit that referenced this pull request Jan 22, 2024
…an should be semantically equivalent

Closes #44806 from bersprockets/plan_validation_issue.

Authored-by: Bruce Robbins <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit b80e8cb)
Signed-off-by: Dongjoon Hyun <[email protected]>
dongjoon-hyun pushed a commit that referenced this pull request Jan 22, 2024
…an should be semantically equivalent

szehon-ho pushed a commit to szehon-ho/spark that referenced this pull request Feb 7, 2024
…an should be semantically equivalent

@bersprockets bersprockets deleted the plan_validation_issue branch February 15, 2024 01:27
turboFei pushed a commit to turboFei/spark that referenced this pull request Nov 6, 2025
…an should be semantically equivalent (apache#365)
