Spark 4.0: Migrate Iceberg Stored Procedures to Spark built-in implementations #13106
```java
  }

  @TestTemplate
  @Disabled // Spark SQL does not support case insensitive for named arguments
```
Strictly speaking, it's a regression, but I think it's okay to respect the Spark SQL syntax.
oh, there is no spark config for this, right?
How do built-in functions work in Spark? Do procedures behave consistently with that?
If I remember correctly, Spark should share arg matching with functions.
@aokolnychyi @szehon-ho resolution of named args for both functions and procedures happens in `org.apache.spark.sql.catalyst.plans.logical.NamedParametersSupport`; it simply uses `parameterNamesSet.contains(parameterName)` to match the arg name.
A Spark code change is required if we want it to respect `spark.sql.caseSensitive`.
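The name-matching behavior described above can be sketched in a few lines. This is an illustrative stand-in, not Spark's actual code: the `caseSensitive == true` branch mirrors the `contains`-based matching Spark does today, and the other branch is the hypothetical change that would be needed to honor `spark.sql.caseSensitive`.

```java
import java.util.List;

public class Main {
  // Sketch: match a named argument against declared parameter names.
  // caseSensitive = true mirrors Spark's current contains()-based lookup.
  static boolean matches(List<String> parameterNames, String argName, boolean caseSensitive) {
    if (caseSensitive) {
      return parameterNames.contains(argName); // current Spark behavior
    }
    // hypothetical fix: fold case before comparing
    return parameterNames.stream().anyMatch(p -> p.equalsIgnoreCase(argName));
  }

  public static void main(String[] args) {
    List<String> params = List.of("table", "use_caching");
    System.out.println(matches(params, "tAbLe", true));  // rejected today
    System.out.println(matches(params, "tAbLe", false)); // accepted under the hypothetical fix
  }
}
```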
I think we should just follow what Spark does today.
If Spark decides to change this behavior both for functions and procedures in the future, that's fine. I don't think we have to worry about this in Iceberg. Also, this will be a brand-new Iceberg jar for Spark 4.0.
unfortunately spark 4.0 will be supported in Iceberg 1.10?
I vote to disable case insensitivity for now and merge this PR in 1.10 to avoid breaking things in 1.11.
```diff
  sql(
-     "CALL %s.system.rewrite_manifests(usE_cAcHiNg => false, tAbLe => '%s')",
+     "CALL %s.system.rewrite_manifests(use_caching => false, table => '%s')",
```
same as above, Spark does not support case insensitive for named args
```diff
  ImmutableList.of(),
  sql(
-     "CALL %s.system.remove_orphan_files(table => '%s', older_than => %dL, location => '%s')",
+     "CALL %s.system.remove_orphan_files(table => '%s', older_than => CAST(%dL AS TIMESTAMP), location => '%s')",
```
Implicit cast does not work here; LONG => TIMESTAMP requires an explicit cast.
@aokolnychyi this should work right?
oh nice, so disabling ANSI should make this case work?
unfortunately, no, it does not satisfy the implicit cast requirements
I see, too bad, that is another backward incompatibility.
Yeah, it's unfortunate, but it is a major version upgrade, so I think it's OK. I also don't think Iceberg should get too involved in trying to work around Spark behavior, especially for a major version upgrade.
I do think we should call these out in our procedure docs so that we make it easy for folks to figure out any gotchas when upgrading.
```java
  }

  @TestTemplate
  public void testInvalidAncestorOfCases() {
```
Though I kept and updated these tests, cases like missing/wrong/duplicated parameters are now properly handled by Spark Catalyst, so it's not necessary to verify them in Iceberg again.
Maybe we should remove those negative test cases.
I think it's fine to keep them; it's good to verify the behavior and have a reference.
CI is green now, cc @aokolnychyi @huaxingao @szehon-ho would you mind taking a look?

This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that's incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.
```java
  @Override
  public String description() {
    return "RollbackToSnapshotProcedure";
```
We should probably improve the description, but that can be done in a separate PR.
I prefer to narrow the PR scope and do it later
```diff
-     .isInstanceOf(AnalysisException.class)
-     .hasMessage("Named and positional arguments cannot be mixed");
+     .isInstanceOf(RuntimeException.class)
+     .hasMessageStartingWith("Couldn't load table");
```
was this an error in the original test?
`Couldn't load table` doesn't seem like the correct error message here?
Actually, this becomes a legal case in Spark's named-args resolution.
Spark requires:
- positional arguments must not appear once named arguments start
- named arguments must not use duplicated names

That is to say:

```sql
-- illegal in Iceberg, legal in Spark
CALL catalog.system.fast_forward('test_table', branch => 'main', to => 'newBranch')
-- illegal in both Iceberg and Spark
CALL catalog.system.fast_forward(table => 'test_table', 'main', to => 'newBranch')
```

I had to change it to the latter to make the test meaningful.
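Spark's two legality rules for mixed argument lists can be sketched as a tiny validator. This is an illustrative reconstruction of the rules, not Spark's actual resolver; `null` stands in for a positional argument and a string for a named argument's name.

```java
import java.util.HashSet;
import java.util.Set;

public class Main {
  // Sketch: null marks a positional argument; a non-null string is a named argument's name.
  static boolean isLegalArgList(String[] argNames) {
    boolean sawNamed = false;
    Set<String> seen = new HashSet<>();
    for (String name : argNames) {
      if (name == null) {
        if (sawNamed) {
          return false; // positional argument after a named one: illegal
        }
      } else {
        sawNamed = true;
        if (!seen.add(name)) {
          return false; // duplicated argument name: illegal
        }
      }
    }
    return true;
  }

  public static void main(String[] args) {
    // CALL fast_forward('test_table', branch => 'main', to => 'newBranch')
    System.out.println(isLegalArgList(new String[] {null, "branch", "to"}));
    // CALL fast_forward(table => 'test_table', 'main', to => 'newBranch')
    System.out.println(isLegalArgList(new String[] {"table", null, "to"}));
  }
}
```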
```java
  }

  @TestTemplate
  @Disabled // Spark SQL does not support case insensitive for named arguments
```
unfortunately spark 4.0 will be supported in Iceberg 1.10?
```diff
  assertThatThrownBy(() -> sql("CALL %s.system.cherrypick_snapshot('t', 2.2)", catalogName))
-     .isInstanceOf(AnalysisException.class)
-     .hasMessageStartingWith("Wrong arg type for snapshot_id: cannot cast");
+     .isInstanceOf(RuntimeException.class)
```
can we make the test capture the original idea? (cast exception)
+1, this test doesn't make sense to me. Are we trying to test calling a procedure on a table that doesn't exist, or does the table exist and we want to fail on the cast of the invalid snapshot ID?
Addressed by changing `2.2` to `'2.2'`; now it throws `CAST_INVALID_INPUT`.
@amogh-jahagirdar @nastra @stevenzwu we will release 1.10 with Spark 4.0 support, right? This change won't make the release and will thus bring a bit of backward incompatibility, as CALL procedures will follow Spark behavior:
Any suggestions? Should we just document this for the next release? (or let me know if 1.10 will not have Spark 4.0 support)
@szehon-ho thanks for reviewing, all comments are addressed now.
szehon-ho left a comment
The rest of the suggestions can be handled in a separate PR (test only).
```java
  public StructType outputType() {
    return OUTPUT_TYPE;
  }

  public boolean isDeterministic() {
    return false;
```
This one should be deterministic though? Or is this just to avoid some other Spark behavior?
This hasn't been used by Spark yet.
I conservatively assume that all procedures are non-deterministic to avoid unexpected behavior. For this procedure, it can return a different result after the snapshot expires.
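The conservative choice discussed here can be sketched with a minimal stand-in for the procedure interface (names are illustrative, not Spark's actual API): report every procedure as non-deterministic so the engine never assumes a `CALL` returns the same rows on re-execution.

```java
// Sketch only: a simplified stand-in for Spark's bound-procedure interface.
interface SketchProcedure {
  // A hypothetical flag an engine could use to decide whether a call's
  // result may be cached or re-executed freely.
  default boolean isDeterministic() {
    return false; // conservative default: table state (e.g. snapshots) can change between calls
  }
}

// Example: expiring snapshots changes what later calls can see,
// so inheriting the conservative default is the safe choice.
class ExpireSnapshotsSketch implements SketchProcedure {}

public class Main {
  public static void main(String[] args) {
    SketchProcedure p = new ExpireSnapshotsSketch();
    System.out.println(p.isDeterministic());
  }
}
```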
```diff
  import scala.Option;

- abstract class BaseProcedure implements Procedure {
+ abstract class BaseProcedure implements BoundProcedure, UnboundProcedure {
```
I'm a little confused by this implementing both Bound and Unbound, and the `bind` calls returning nothing in the implementations. Could you explain what the intent is here?
IIUC the idea in Spark was that you can have different implementations of the procedure if you call it with different arguments (that's when you bind it), but for all practical purposes I think it's not so used in Iceberg.
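The two-phase idea described here can be sketched with illustrative interfaces (not Spark's real API): an unbound procedure is resolved by name, then `bind` selects a concrete implementation for the given input; in this PR the classes implement both sides, so `bind` effectively returns the procedure itself.

```java
// Sketch only: simplified stand-ins for the bound/unbound split.
interface Bound {
  String call(String input);
}

interface Unbound {
  // bind() could pick a different implementation per input shape
  Bound bind(String inputType);
}

// Mirrors the pattern in the PR: one class plays both roles and
// binding is a no-op because there is a single implementation.
class SelfBindingProcedure implements Bound, Unbound {
  @Override
  public Bound bind(String inputType) {
    return this; // same implementation regardless of argument types
  }

  @Override
  public String call(String input) {
    return "ran with " + input;
  }
}

public class Main {
  public static void main(String[] args) {
    Unbound proc = new SelfBindingProcedure();
    System.out.println(proc.bind("(table STRING)").call("tbl"));
  }
}
```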
RussellSpitzer left a comment
I don't want to hold this back. I'm +1 on all the changes, but I have a few things I think we should clean up before merging. I'm leaving my approval here so that others can feel free to merge once they are on board.

Things we need to clean up:
- There are a bunch of orphan tests now that just catch a generic error being thrown; for most of these I think we need to just remove the test case altogether if we aren't able to actually test it. See "Couldn't load ..."
- I'm a little confused about the Bound/Unbound thing. If this is the expected way to implement procedures then I don't have a problem, it just feels like we should only have to do one of them?
- Deterministic is set to false for a bunch of procedures which I think are deterministic? Is this to avoid some Spark behavior?

Anyway, those are all minor concerns; this seems like a pretty straightforward translation. Thanks @pan3793 for the work!
Ditto to basically everything @RussellSpitzer said. I think I'm fundamentally OK with this PR but it'd be good to make sure those tests are cleaned up and double check the deterministic API on those procedures before merging? It'd also be good to add a section in docs about some of these behavior changes around case sensitivity and type handling (but that can be in a separate PR)
@RussellSpitzer @amogh-jahagirdar thanks for the review.
```java
  @Override
  public StructType outputType() {
    return OUTPUT_TYPE;
  }

  public boolean isDeterministic() {
```
Maybe this method can go into `BaseProcedure`, since the return value is the same across all procedures.
I guess Spark just needs all Expressions to mark whether they are deterministic, for things like knowing whether you can use them in various places like filter, aggregate, merge condition, default value, etc. Not sure they all apply here :) but it makes sense for now that procedure output for Iceberg is not deterministic, as it doesn't make sense to use the output like that. As there are a lot of approvals here, I can wait a bit today for any more comments and merge if not, to unblock the 1.10 release.
Merged, thanks @pan3793 for the great work, and everyone for jumping on the reviews!
…entations (apache#13106) (apache#1611) Co-authored-by: Cheng Pan <[email protected]>
### What changes were proposed in this pull request?

As the title.

### Why are the changes needed?

The issue was originally found during

- apache/iceberg#13106

I don't see any special reason that named parameters should always be case sensitive. (correct me if I'm wrong)

I tested PostgreSQL, and the named parameters are case-insensitive by default.

```
psql (17.6 (Debian 17.6-1.pgdg13+1))
Type "help" for help.

postgres=# CREATE FUNCTION concat_lower_or_upper(a text, b text, uppercase boolean DEFAULT false)
RETURNS text
AS
$$ SELECT CASE WHEN $3 THEN UPPER($1 || ' ' || $2) ELSE LOWER($1 || ' ' || $2) END; $$
LANGUAGE SQL IMMUTABLE STRICT;
CREATE FUNCTION
postgres=# SELECT concat_lower_or_upper('Hello', 'World', true);
 concat_lower_or_upper
-----------------------
 HELLO WORLD
(1 row)

postgres=# SELECT concat_lower_or_upper(a => 'Hello', b => 'World');
 concat_lower_or_upper
-----------------------
 hello world
(1 row)

postgres=# SELECT concat_lower_or_upper(A => 'Hello', b => 'World');
 concat_lower_or_upper
-----------------------
 hello world
(1 row)
```

### Does this PR introduce _any_ user-facing change?

Yes, named parameters used by functions and procedures now respect `spark.sql.caseSensitive`, instead of always being case sensitive.

### How was this patch tested?

Added UT.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #52269 from pan3793/SPARK-53523.

Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
Migrate Iceberg Stored Procedures to Spark built-in implementations, since SPARK-44167 (4.0.0) adds Stored Procedure support to Spark.
Note: this change brings a bit of backward incompatibility, as CALL procedures will follow Spark behavior: