
[Spark] Disable implicit casting in Delta streaming sink #3691

Merged

Conversation

johanl-db
Collaborator

Description

#3443 introduced implicit casting when writing to a Delta table using a streaming query.

We are disabling this change for now as it regresses behavior when a struct field is missing in the input data. This previously succeeded, filling the missing fields with null, but now fails with:

[DELTA_UPDATE_SCHEMA_MISMATCH_EXPRESSION] Cannot cast struct<name:string> to struct<name:string,age:bigint>. All nested columns must match.

Note: batch INSERT fails in this scenario with:

[DELTA_INSERT_COLUMN_ARITY_MISMATCH] Cannot write to '<table>', not enough nested fields in <struct>; target table has 3 column(s) but the inserted data has 2 column(s)

but since streaming writes allowed this, we have to preserve that behavior.
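
For reference, a minimal sketch of the regressed scenario; the table path and column names below are hypothetical, and the target table is assumed to already have a column 'person' of type struct<name: string, age: bigint>:

import pyspark.sql.functions as F

# Target Delta table with column 'person' of type struct<name: string, age: bigint>.
(spark.range(1)
    .select(F.struct(F.lit("bob").alias("name"), F.col("id").alias("age")).alias("person"))
    .write.format("delta").save("/tmp/sink-test"))

# Streaming input whose 'person' struct is missing the 'age' field.
stream = (spark.readStream.format("rate").load()
    .select(F.struct(F.lit("alice").alias("name")).alias("person")))

# Previously person.age was filled with null; with the implicit cast from #3443
# enabled, this fails with DELTA_UPDATE_SCHEMA_MISMATCH_EXPRESSION.
(stream.writeStream.format("delta")
    .option("checkpointLocation", "/tmp/sink-test/_checkpoint")
    .start("/tmp/sink-test"))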

How was this patch tested?

Tests added as part of #3443, in particular with the flag disabled.

Does this PR introduce any user-facing changes?

It disables the behavior change that was to be introduced with #3443.

@johanl-db force-pushed the disable-implicit-casting-delta-sink branch from fc7f72c to aa05b6a on September 20, 2024 08:38
@Kimahriman
Contributor

FWIW, we noticed something similar recently with inserts on merge: the implicit cast failed if struct fields were missing.

@johanl-db
Collaborator Author

FWIW, we noticed something similar recently with inserts on merge: the implicit cast failed if struct fields were missing.

I believe that's expected in that case: unless you enable schema evolution, MERGE / UPDATE rejects writes with missing struct fields. Batch (non-streaming) INSERT also rejects that, although I've recently looked closer at batch insert behavior and the truth is that it's a bit all over the place.

Rejecting missing fields is a reasonable behavior when schema evolution is disabled, as it means we instead fall back to schema enforcement. Streaming writes didn't behave that way though, which I missed.
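
For reference, a minimal sketch of the schema-evolution escape hatch mentioned above, using Delta's automatic schema merging setting (assumes an active SparkSession named spark):

# With automatic schema merging enabled, MERGE/UPDATE tolerate source structs
# that are missing target struct fields instead of rejecting the write.
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")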

@vkorukanti merged commit a99f62b into delta-io:master on Sep 23, 2024
14 of 17 checks passed
@Kimahriman
Contributor

Kimahriman commented Sep 23, 2024

Rejecting missing fields is a reasonable behavior when schema evolution is disabled, as it means we instead fall back to schema enforcement. Streaming writes didn't behave that way though, which I missed.

This is missing fields in the source, not the target, so not schema evolution related.

Easy example:

import pyspark.sql.functions as F
from delta import DeltaTable
# Create table of nested struct<id: long, value: long>
spark.range(10).select(F.struct('id', (F.col('id') * 10).alias('value')).alias('nested')).write.format('delta').save('/tmp/merge-test')
# Works without "nested.value"
spark.range(10, 20).select(F.struct('id').alias('nested')).write.format('delta').mode('append').save('/tmp/merge-test')

table = DeltaTable.forPath(spark, "/tmp/merge-test")
# Fails with "Cannot cast struct<id:bigint> to struct<id:bigint,value:bigint>. All nested columns must match"
table.alias('target').merge(
    spark.range(20, 30).select(F.struct('id').alias('nested')).alias('source'),
    'target.nested.id = source.nested.id'
).whenNotMatchedInsertAll().execute()

@johanl-db
Collaborator Author

Rejecting missing fields is a reasonable behavior when schema evolution is disabled, as it means we instead fall back to schema enforcement. Streaming writes didn't behave that way though, which I missed.

This is missing fields in the source, not the target, so not schema evolution related.

Easy example:

import pyspark.sql.functions as F
from delta import DeltaTable
# Create table of nested struct<id: long, value: long>
spark.range(10).select(F.struct('id', (F.col('id') * 10).alias('value')).alias('nested')).write.format('delta').save('/tmp/merge-test')
# Works without "nested.value"
spark.range(10, 20).select(F.struct('id').alias('nested')).write.format('delta').mode('append').save('/tmp/merge-test')

table = DeltaTable.forPath(spark, "/tmp/merge-test")
# Fails with "Cannot cast struct<id:bigint> to struct<id:bigint,value:bigint>. All nested columns must match"
table.alias('target').merge(
    spark.range(20, 30).select(F.struct('id').alias('nested')).alias('source'),
    'target.nested.id = source.nested.id'
).whenNotMatchedInsertAll().execute()

Right, save()/saveAsTable() will accept missing fields, but if you try SQL INSERT INTO t (nested) VALUES (..), then I expect that will fail. I think insert by position - df.insertInto(), INSERT INTO t VALUES (..) - will also fail.

We do handle SQL insert by name and SQL/DF insert by position in DeltaAnalysis and apply schema enforcement there, but we let save()/saveAsTable() get through unhandled and pretty much don't do any schema validation beyond basic compatibility when writing the data.

That's what I meant by insert behavior being a bit all over the place. I looked at it a couple of weeks ago to see if it could be fixed - and allow implicit casting in save()/saveAsTable() - but this would bring a lot of breaking behavior changes.
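
For illustration, a hedged sketch of the contrast described above, reusing the /tmp/merge-test table from the earlier example:

import pyspark.sql.functions as F

df = spark.range(30, 40).select(F.struct('id').alias('nested'))

# DataFrame save(): accepted by the DataFrame write path;
# the missing nested.value field is filled with null.
df.write.format('delta').mode('append').save('/tmp/merge-test')

# SQL insert (by position here) goes through DeltaAnalysis and schema
# enforcement, and per the discussion above is expected to be rejected
# for the missing nested field.
df.createOrReplaceTempView('src')
spark.sql("INSERT INTO delta.`/tmp/merge-test` SELECT nested FROM src")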

@Kimahriman
Contributor

That's what I meant by insert behavior being a bit all over the place. I looked at it a couple of weeks ago to see if it could be fixed - and allow implicit casting in save()/saveAsTable() - but this would bring a lot of breaking behavior changes.

Would it make more sense to do a Dataset.to type operation instead of a cast? That seems more like what is trying to be achieved.

@johanl-db
Collaborator Author

Would it make more sense to do a Dataset.to type operation instead of a cast? That seems more like what is trying to be achieved.

That seems too limited. Dataset.to also casts types and eventually calls Cast.canANSIStoreAssign(), which for example doesn't allow many casts that we do support today - we allow most or all valid implicit casts, e.g. string -> int - and it doesn't respect the value of spark.sql.storeAssignmentPolicy that write operations should respect.

The column reordering part does make sense though, although the issue isn't so much how to apply it as making sure we're not breaking existing workloads if we start doing it in more cases.
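
For illustration, a hedged sketch of the limitation described above, assuming Spark 3.4+ where DataFrame.to(schema) reconciles a DataFrame to a target schema:

import pyspark.sql.types as T

df = spark.createDataFrame([("1",), ("2",)], "id string")

# Delta's implicit casting would accept string -> bigint here, but DataFrame.to
# follows Cast.canANSIStoreAssign and is expected to reject this reconciliation,
# regardless of spark.sql.storeAssignmentPolicy.
df.to(T.StructType([T.StructField("id", T.LongType())]))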

@Kimahriman
Contributor

So it seems like the whole issue stems from the fact that castIfNeeded treats a missing source field as "schema evolution". But that is very different from what most would consider "schema evolution": a missing field in the target, with the schema of the target table changing as a result of some sort of write. What happens here is really "null filling". If you remove allowStructEvolution from castIfNeeded, and don't make it throw an exception when the source is missing a target field, that would likely resolve a lot of the issues/differences between the write paths, and make this sink cast a non-issue. If you really think null-filling should be configurable (whether defaulting to false or true), that should be a different setting than "schema merging", as the use case is very different.

@johanl-db
Collaborator Author

So it seems like the whole issue stems from the fact that castIfNeeded treats a missing source field as "schema evolution". But that is very different from what most would consider "schema evolution": a missing field in the target, with the schema of the target table changing as a result of some sort of write. What happens here is really "null filling". If you remove allowStructEvolution from castIfNeeded, and don't make it throw an exception when the source is missing a target field, that would likely resolve a lot of the issues/differences between the write paths, and make this sink cast a non-issue. If you really think null-filling should be configurable (whether defaulting to false or true), that should be a different setting than "schema merging", as the use case is very different.

+1, I have it on my todo list to look into this. UpdateExpressionsSupport is historically the place that implements (part of) schema evolution/enforcement for MERGE and UPDATE, but the checks could be moved somewhere else and applied during analysis.

@johanl-db
Collaborator Author

@Kimahriman I finally got around to implementing some of the changes we discussed previously:
PR: #3822
I've asked @tomvanbussel to review it, but you can also have a look if you'd like.

In particular, castIfNeeded now uses allowMissingStructField instead of allowStructEvolution, as a more accurate description of the associated behavior and of the fact that streaming writes actually allow missing struct fields even when schema evolution is disabled.
I'm still making that behavior configurable because we need to preserve existing behavior, and UPDATE/MERGE behave differently than streaming writes today (UPDATE/MERGE only allow missing struct fields when schema evolution is enabled).
