[HUDI-6453] Cascade Glue schema changes to partitions #9071

Open · wants to merge 2 commits into master
Conversation

@CTTY (Contributor) commented Jun 27, 2023

Change Logs

Cascade schema changes to the partition level for the Glue sync tool.

We found that when Hudi uses AwsGlueCatalogSyncTool to sync schema changes to Glue, it only updates the table schema without cascading the change to partition-level schemas. This behavior is actually expected, because cascading was never implemented for AwsGlueCatalogSyncClient (LOC).

This causes problems when users evolve their schema later on. Because the schema change is not cascaded, only newer partitions pick up the new schema while older partitions keep the old schema in Glue. When users then query the Glue catalog with an engine like Athena that is aware of partition-level schemas, the older partitions appear unreadable, with failures as described here: Athena partition schema mismatch errors.

This patch specifically updates partition info in Glue via the Glue API when the change is eligible for cascading in AwsGlueCatalogSyncClient.
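For context, here is a minimal sketch of what cascading the updated columns to every partition could look like with the AWS Glue SDK v1 client style used elsewhere in this PR. It is not the patch itself: the helper name, the paging loop, and the use of BatchUpdatePartition (assuming the SDK version in use exposes that API) are illustrative assumptions.

import com.amazonaws.services.glue.AWSGlue;
import com.amazonaws.services.glue.model.BatchUpdatePartitionRequest;
import com.amazonaws.services.glue.model.BatchUpdatePartitionRequestEntry;
import com.amazonaws.services.glue.model.Column;
import com.amazonaws.services.glue.model.GetPartitionsRequest;
import com.amazonaws.services.glue.model.GetPartitionsResult;
import com.amazonaws.services.glue.model.Partition;
import com.amazonaws.services.glue.model.PartitionInput;
import com.amazonaws.services.glue.model.StorageDescriptor;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not the PR code): overwrite each partition's columns with the new table columns.
void cascadeColumnsToPartitions(AWSGlue awsGlue, String databaseName, String tableName,
                                List<Column> newColumns) {
  String nextToken = null;
  do {
    // List partitions page by page.
    GetPartitionsResult result = awsGlue.getPartitions(new GetPartitionsRequest()
        .withDatabaseName(databaseName)
        .withTableName(tableName)
        .withNextToken(nextToken));
    List<BatchUpdatePartitionRequestEntry> entries = new ArrayList<>();
    for (Partition partition : result.getPartitions()) {
      // Keep the partition's storage descriptor, but align its columns with the updated table schema.
      StorageDescriptor sd = partition.getStorageDescriptor();
      sd.setColumns(newColumns);
      entries.add(new BatchUpdatePartitionRequestEntry()
          .withPartitionValueList(partition.getValues())
          .withPartitionInput(new PartitionInput()
              .withValues(partition.getValues())
              .withStorageDescriptor(sd)));
    }
    if (!entries.isEmpty()) {
      // BatchUpdatePartition caps the number of entries per call; chunking is omitted here for brevity.
      awsGlue.batchUpdatePartition(new BatchUpdatePartitionRequest()
          .withDatabaseName(databaseName)
          .withTableName(tableName)
          .withEntries(entries));
    }
    nextToken = result.getNextToken();
  } while (nextToken != null);
}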

Impact

Minor performance impact on the Glue sync tool, since we need to sync partition info as well, but this is required to ensure correctness.

Risk level (write none, low, medium or high below)

low

Documentation Update

Describe any necessary documentation update if there is any new feature, config, or user-facing change

  • The config description must be updated if new configs are added or the default value of a config is changed
  • Any new feature or user-facing change requires updating the Hudi website. Please create a Jira ticket, attach the ticket number here, and follow the instructions to make changes to the website.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@hudi-bot

CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

@danny0405 (Contributor)

@CTTY Can you elaborate a little more on the purpose and details of the patch? I'm not sure what the patch is trying to fix.

@CTTY (Contributor, Author) commented Jun 30, 2023

@danny0405 Hi Danny, thanks for taking a look

We found that when Hudi uses AwsGlueCatalogSyncTool to sync schema changes to Glue, it only updates the table schema without cascading the change to partition-level schemas. This behavior is actually expected, because cascading was never implemented for AwsGlueCatalogSyncClient (LOC).

This causes problems when users evolve their schema later on. Because the schema change is not cascaded, only newer partitions pick up the new schema while older partitions keep the old schema in Glue. When users then query the Glue catalog with an engine like Athena that is aware of partition-level schemas, the older partitions appear unreadable, with failures as described here: Athena partition schema mismatch errors.

This patch specifically updates partition info in Glue via the Glue API when the change is eligible for cascading in AwsGlueCatalogSyncClient.

@parisni (Contributor) commented Jul 2, 2023

> the older partition is not readable due to failures described here

The AWS link says the error only applies to the csv/json formats. Am I missing something?

Also, Athena compares the table schema vs. the partition schema but eventually returns a reconciled schema. Again, am I wrong?

Do you have a piece of code with a reproducible error to illustrate the issue? This looks critical.

@CTTY (Contributor, Author) commented Jul 2, 2023

Hi @parisni,

Sure, I've attached a script to the Jira ticket to reproduce the issue: https://issues.apache.org/jira/browse/HUDI-6453

@danny0405 danny0405 added release-0.14.0 priority:critical production down; pipelines stalled; Need help asap. labels Jul 3, 2023
@danny0405 danny0405 self-assigned this Jul 3, 2023
@danny0405 (Contributor) commented Jul 3, 2023

@parisni, would you be interested in taking a look at this PR?

@parisni (Contributor) commented Jul 3, 2023

Yes, I will review this ASAP, likely in two days or so.

@nsivabalan nsivabalan added priority:blocker and removed priority:critical production down; pipelines stalled; Need help asap. labels Jul 5, 2023
@yihua yihua added priority:critical production down; pipelines stalled; Need help asap. and removed priority:blocker labels Jul 6, 2023
@parisni (Contributor) commented Jul 6, 2023

Hi @CTTY,
I ran your script and reproduced the error on Athena v3, which is:

HIVE_PARTITION_SCHEMA_MISMATCH: There is a mismatch between the table and partition schemas. The types are incompatible and cannot be coerced. The column 'accountqueues' in table 'default.test_hudi_table_362' is declared as type 'array<struct<estimateddequeuetime:bigint,estimatedtimeinqueue:bigint,senderenqueuingattempts:array<struct<arenafrom:string,arenato:string,delay:bigint,outcome:string,queuefrom:string,queuefromlength:int,queueto:string,reason:string,shardid:string>>>>', but partition 'type=1' declared column 'accountqueues' as type 'array<struct<estimateddequeuetime:bigint,estimatedtimeinqueue:bigint,senderenqueuingattempts:array<struct<delay:bigint,outcome:string,queuefromlength:int,shardid:string>>>>'.

I also tried to reproduce the error (see the script below); from what I can tell, the error only triggers when new fields are added to a struct in a way that changes the existing field order.

I have added several comments on your PR accordingly.

# Assumes a running SparkSession (`spark`) with the Hudi Spark bundle and hudi-aws on the classpath.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
data = [
        {"uuid":1, "user_id":"f5c2ebfd-f57b-4ff3-ac5c-f30674037b21", "mp": {"col1":"foo"}, "ts":"BC", "part":"C"},
        {"uuid":1, "user_id":"f5c2ebfd-f57b-4ff3-ac5c-f30674037b21", "mp": {"col1":"foo"}, "ts":"BC", "part":"D"},
]

schema = StructType(
    [
        StructField("uuid", IntegerType(), True),
        StructField("user_id", StringType(), True),
        StructField(
            "mp",
            StructType([StructField("col1", StringType(), True)])
        ),
        StructField("ts", StringType(), True),
        StructField("part", StringType(), True),
    ]
)
df = spark.createDataFrame(data=data, schema=schema)

bucket = ...
tableName = "test_hudi_aws_part"
basePath = f"s3://"+bucket+"/test/" + tableName

hudi_options = {
    "hoodie.table.name": tableName,
    "hoodie.datasource.write.recordkey.field": "uuid",
    "hoodie.datasource.write.partitionpath.field": "part",
    "hoodie.datasource.write.table.name": tableName,
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.precombine.field": "ts",
    "hoodie.upsert.shuffle.parallelism": 2,
    "hoodie.insert.shuffle.parallelism": 2,
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.database": "default",
    "hoodie.datasource.hive_sync.table": tableName,
    "hoodie.datasource.hive_sync.mode": "jdbc",
    "hoodie.meta.sync.client.tool.class": "org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool",
    "hoodie.datasource.hive_sync.partition_fields": "part",
    "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
}
(df.write.format("hudi").options(**hudi_options).mode("overwrite").save(basePath))

# this works fine
spark.table("default."+tableName).show()

# Now evolve part
data = [
        {"uuid":1, "user_id":"f5c2ebfd-f57b-4ff3-ac5c-f30674037b21", "mp": {"col1":"foo", "new_col":"bar"}, "ts":"BC", "part":"C"},
]

schema = StructType(
    [
        StructField("uuid", IntegerType(), True),
        StructField("user_id", StringType(), True),
        StructField(
            "mp",
            # fails when the field order changes, e.g.:
            # StructType([StructField("new_col", StringType(), True), StructField("col1", StringType(), True)])
            StructType([StructField("col1", StringType(), True), StructField("new_col", StringType(), True)])
        ),
        StructField("ts", StringType(), True),
        StructField("part", StringType(), True),
    ]
)
df = spark.createDataFrame(data=data, schema=schema)
(df.write.format("hudi").options(**hudi_options).mode("append").save(basePath))
spark.table("default."+tableName).show()
# HIVE_PARTITION_SCHEMA_MISMATCH: There is a mismatch between the table and partition schemas. The types are incompatible and cannot be coerced. The column 'mp' in table 'default.test_hudi_aws_part' is declared as type 'struct<new_col:string,col1:string>', but partition 'part=C' declared column 'mp' as type 'struct<col1:string>'.

GetPartitionsResult result = awsGlue.getPartitions(new GetPartitionsRequest()
    .withDatabaseName(databaseName)
    .withTableName(tableName)
    .withNextToken(nextToken));
Review comment (Contributor):
Set .withExcludeColumnSchema(true) on the request to limit network transfer?
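A sketch of the suggested tweak, assuming the Glue SDK version in use exposes the ExcludeColumnSchema flag on GetPartitionsRequest; since the cascade overwrites partition columns anyway, only the partition values are needed from the listing:

GetPartitionsResult result = awsGlue.getPartitions(new GetPartitionsRequest()
    .withDatabaseName(databaseName)
    .withTableName(tableName)
    .withExcludeColumnSchema(true) // skip column schemas in the response to reduce payload size
    .withNextToken(nextToken));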

@@ -355,6 +336,43 @@ public void updateTableSchema(String tableName, MessageType newSchema) {
        .withTableInput(updatedTableInput);

    awsGlue.updateTable(request);

    if (!table.getPartitionKeys().isEmpty() && cascade) {
Review comment (Contributor):
Isn't cascade redundant with table.getPartitionKeys().isEmpty()?

@@ -330,7 +312,6 @@ && getTable(awsGlue, databaseName, tableName).getPartitionKeys().equals(partitio

  @Override
  public void updateTableSchema(String tableName, MessageType newSchema) {
    // ToDo Cascade is set in Hive meta sync, but need to investigate how to configure it for Glue meta
    boolean cascade = config.getSplitStrings(META_SYNC_PARTITION_FIELDS).size() > 0;
Review comment (Contributor):
I guess we should restrict cascading to the case where the issue occurs: when the schema evolution adds new struct fields out of order. In the general case there is no need to cascade.

Review comment (Member):
+1 only cascade when necessary. Cascading could adversely impact sync latency when there are tens of thousands of partitions.

Review comment (Contributor):
Cascade is necessary when:
schemaDifference.getUpdateColumnTypes() is not empty (e.g. a field added in the middle, or a type promotion)

It is not necessary in the other cases (schemaDifference.getAddColumnTypes() or .getDeleteColumns()).

Then I think we should change the updateTableSchema signature to pass SchemaDifference and let the metastore implementation decide whether to cascade based on the differences.
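A minimal sketch of that suggestion, assuming Hudi's existing SchemaDifference type (with the getUpdateColumnTypes, getAddColumnTypes, and getDeleteColumns accessors mentioned above) and hypothetical helpers; this is illustrative, not the code in this PR:

// Illustrative signature change: pass the schema diff so the Glue client can decide whether to cascade.
@Override
public void updateTableSchema(String tableName, MessageType newSchema, SchemaDifference schemaDifference) {
  // Always update the table-level schema in Glue (updateTableDefinition is a hypothetical helper here).
  updateTableDefinition(tableName, newSchema);

  // Pure column additions/deletions do not need a cascade; only updated column types
  // (e.g. a struct field added in the middle, or a type promotion) require rewriting partition schemas.
  boolean cascade = !schemaDifference.getUpdateColumnTypes().isEmpty();
  if (cascade && !getTable(awsGlue, databaseName, tableName).getPartitionKeys().isEmpty()) {
    cascadeColumnsToPartitions(awsGlue, databaseName, tableName, getColumnsFromSchema(newSchema));
  }
}

Here getColumnsFromSchema and cascadeColumnsToPartitions are hypothetical helpers in the same spirit as the sketch above; which diffs are actually cascade-worthy is exactly what is being discussed in this thread.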

@parisni (Contributor) commented Jul 18, 2023

I guess we could avoid rewriting all the partitions if we managed the struct key order. I mean, when there is a new key in a struct, add it after the existing ones, never prepend it.

Then it would work smoothly. WDYT @CTTY
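For illustration, a small hypothetical helper showing the ordering idea: keep existing fields where they are and append new ones at the end (it would have to be applied recursively to struct fields before writing the schema to Glue). The name and the String-based field representation are assumptions for the example, not code from this PR:

import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper: merge evolved field names into the existing order,
// keeping existing fields in place and appending new ones at the end.
static List<String> reconcileFieldOrder(List<String> existingFields, List<String> evolvedFields) {
  Set<String> existing = new LinkedHashSet<>(existingFields); // preserves the original ordering
  List<String> result = new ArrayList<>(existingFields);
  for (String field : evolvedFields) {
    if (!existing.contains(field)) {
      result.add(field); // new fields are appended, never prepended
    }
  }
  return result;
}

// Example: existing = [col1], evolved = [new_col, col1]  ->  [col1, new_col]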

@nsivabalan (Contributor)

Hey @CTTY @parisni: we are looking to cut the branch for 0.14.0 in 1 day. If you can coordinate and land the patch, that would be good and we can pull it in. CC @prashantwason

@codope (Member) left a comment

I think as long as the new field is nullable and is added at the end (even for nested fields, added as the last field of the struct), schema evolution should happen automatically. If we want to support unordered addition of fields, then let's restrict cascading to that case?


@parisni (Contributor) commented Aug 14, 2023

> If we want to support unordered addition of fields then let's restrict cascading to that case?

Unlike the Hive metastore, Glue supports unordered addition of columns. It does not support unordered addition of struct fields.

I also don't know how Glue handles struct evolution:

  • new field -> should be added to the end
  • field dropped -> ?
  • field renamed -> ?

The current approach in this PR would handle any evolution. That might then be a good approach, but we should definitely trigger it only as a last resort (i.e., when a struct evolves in an unsupported way).

@nsivabalan (Contributor)

So, do we still need some jamming to get the patch across the finish line? I am looking for patches to land for 0.14.1.

@bvaradar (Contributor)

@CTTY: Can you resolve the open questions in this PR so that we can try to bring it to completion?

@github-actions github-actions bot added the size:M PR with lines of changes in (100, 300] label Feb 26, 2024
Labels
  • priority:critical (production down; pipelines stalled; need help asap)
  • release-0.14.1
  • size:M (PR with lines of changes in (100, 300])
8 participants