[SUPPORT] current state of promotion type support #11599

Closed · parisni opened this issue Jul 9, 2024 · 11 comments · Fixed by #11642 or #11670
@parisni (Contributor) commented Jul 9, 2024

Hudi 0.14.1
Spark 3.3.x

I did not find any mention of Hudi type promotion in the documentation.

Apparently type promotion is not supported (#5519):

> there's no way this could be reliably supported w/in Hudi currently

Yet, from my tests below, it seems to work. So:

  1. What is the current state of type promotion support?
  2. Is it safe to rely on?
  3. Should we update the hive sync limitation?
  4. Should we document type promotion support?

Test script:
tableName = "hudi_promotion"
basePath = "/tmp/hudi_nested"
from pyspark.sql.functions import expr

NB_RECORDS=10_000 # generate enough records to get multiple parquet files
hudi_options = {
    "hoodie.table.name": tableName,
    "hoodie.datasource.write.recordkey.field": "event_id",
    "hoodie.datasource.write.partitionpath.field": "event_date",
    "hoodie.datasource.write.table.name": tableName,
    "hoodie.datasource.write.operation": "bulk_insert",
    "hoodie.datasource.write.precombine.field": "version",
    "hoodie.upsert.shuffle.parallelism": 1,
    "hoodie.insert.shuffle.parallelism": 1,
    "hoodie.delete.shuffle.parallelism": 1,
    "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator",
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.datasource.hive_sync.database": "default",
    "hoodie.datasource.hive_sync.table": tableName,
    "hoodie.datasource.hive_sync.mode": "hms",
    "hoodie.datasource.hive_sync.enable": "false",
    "hoodie.datasource.hive_sync.partition_fields": "event_date",
    "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator",
    "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
    "hoodie.metadata.enable": "true",
    "hoodie.parquet.max.file.size": 1 * 1024 * 1024,
}
spark.sql("drop table if exists hudi_promotion").show()

# INIT TABLE: write int_to_bigint as INT, float_to_double as FLOAT, int_to_float as INT
df = (
spark.range(1,NB_RECORDS).withColumn("event_id", expr("id"))
    .withColumn("event_date", expr("current_date()"))
    .withColumn("version", expr("current_date()"))
    .withColumn("int_to_bigint", expr("cast(1 as int)"))
    .withColumn("float_to_double", expr("cast(1.1 as float)"))
    .withColumn("int_to_float", expr("cast(1 as int)"))
)
(df.write.format("hudi").options(**hudi_options).mode("overwrite").save(basePath))
spark.read.format("hudi").load(basePath).printSchema()
#spark.sql("desc table hudi_promotion").show()

# NOW WRITE BIGINT, DOUBLE and FLOAT into another partition
df = (
spark.range(1,2).withColumn("event_id", expr("id"))
    .withColumn("event_date", expr("current_date() + 1"))
    .withColumn("version", expr("current_date()"))
    .withColumn("int_to_bigint", expr("cast(1 as bigint)"))
    .withColumn("float_to_double", expr("cast(1.1 as double)"))
    .withColumn("int_to_float", expr("cast(1.1 as float)"))
)
(df.write.format("hudi").options(**hudi_options).mode("append").save(basePath))
spark.read.format("hudi").load(basePath).printSchema()
#spark.sql("desc table hudi_promotion").show()


# NOW upsert the previous partition
df = (
spark.range(1,2).withColumn("event_id", expr("id"))
    .withColumn("event_date", expr("current_date()"))
    .withColumn("version", expr("current_date()"))
    .withColumn("int_to_bigint", expr("cast(1 as bigint)"))
    .withColumn("float_to_double", expr("cast(1.1 as double)"))
    .withColumn("int_to_float", expr("cast(1.1 as float)"))
)

(df.write.format("hudi").options(**hudi_options).mode("append").save(basePath))
spark.read.format("hudi").load(basePath).printSchema()
#spark.sql("desc table hudi_promotion").show()
@danny0405 (Contributor) commented:

@jonvex Any insights here?

@jonvex (Contributor) commented Jul 15, 2024

Hive sync has more limited type promotion, as you described. We have some documentation here: https://hudi.apache.org/docs/schema_evolution, but as far as I know this has only been tested for Spark.

@parisni (Contributor, Author) commented Jul 16, 2024 via email

@danny0405 (Contributor) commented:

> I will propose a fix for hive sync then.

@parisni That would be very helpful, thanks for the contribution~

@parisni (Contributor, Author) commented Jul 17, 2024

A quick comparison of Hudi and Hive type promotion support leads me to conclude that Hive supports everything Hudi supports, except:

  • string to bytes
  • bytes to string

I will propose a PR so that hive sync raises an error when either of the two cases above is met (see the sketch below).

[screenshots comparing Hudi and Hive type promotion support]
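As a rough illustration of the intended check (a Python sketch only; the actual fix would live in Hudi's Java hive-sync module, and the names here are hypothetical):

# Hypothetical sketch of the validation hive sync should perform before
# propagating an evolved schema: reject the two promotions Hive cannot handle.
UNSUPPORTED_HIVE_PROMOTIONS = {("string", "bytes"), ("bytes", "string")}

def validate_hive_sync_promotion(old_type: str, new_type: str) -> None:
    if (old_type.lower(), new_type.lower()) in UNSUPPORTED_HIVE_PROMOTIONS:
        raise ValueError(
            f"hive sync cannot promote column type {old_type} to {new_type}"
        )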

@parisni (Contributor, Author) commented Jul 19, 2024

@jonvex Interestingly, the same script fails on EMR with Spark 3.2.x and Hudi 0.14.1. I don't hit this issue locally, so I will have to dig into it.

Caused by: org.apache.spark.sql.execution.QueryExecutionException: Parquet column cannot be converted in file s3://<bucket>/test/hudi_promotion/event_date=2024-07-19/2b3377ab-dea4-40e8-b94c-bc7a8daf5c38-0_1-9-0_20240719141850764.parquet. Column: [int_to_bigint], Expected: bigint, Found: INT32
        at org.apache.spark.sql.errors.QueryExecutionErrors$.unsupportedSchemaColumnConvertError(QueryExecutionErrors.scala:570)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:265)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:178)
        at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:661)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:35)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hasNext(Unknown Source)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:954)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:369)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:133)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1474)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more
Caused by: org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
        at org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory.constructConvertNotSupportedException(ParquetVectorUpdaterFactory.java:1077)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory.getUpdater(ParquetVectorUpdaterFactory.java:172)
        at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:154)
        at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:295)
        at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:193)
        at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:178)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:261)
        ... 21 more

@parisni (Contributor, Author) commented Jul 19, 2024

Confirmed: the same table written with vanilla Spark 3.3.4:

  • can be read successfully by vanilla Spark
  • fails with the above error on EMR with Hudi 0.14.1

Something seems wrong with EMR; a quick probe is sketched below.
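A minimal probe, assuming the EMR failure comes from Spark's vectorized Parquet reader (the stack trace points at ParquetVectorUpdaterFactory); if the read succeeds with the vectorized reader disabled, only that code path is at fault:

# Assumption: the conversion error is raised by the vectorized Parquet reader.
# The row-based fallback may handle the INT32 -> bigint widening differently,
# which would narrow down where the EMR-specific failure comes from.
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
spark.read.format("hudi").load(basePath).orderBy("int_to_bigint").show(5)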

@parisni (Contributor, Author) commented Jul 19, 2024

Also, it turns out Glue sync likely does not update the Glue column definitions, only the spark.sql.sources.schema table property:

"float_to_double","type":"double"     (table property)
vs
float_to_double         float         (Glue column)

In:

col_name,data_type,comment
event_id                bigint
int_to_long             int
float_to_double         float
_kafka_timestamp        bigint
version                 string
event_date              string
event_hour              string

# Partition Information
# col_name              data_type               comment

version                 string
event_date              string
event_hour              string

"Detailed Table Information     Table(tableName:hudi_promotion, dbName:db, owner:null, createTime:1721405676, lastAccessTime:1721405711, retention:0, sd:StorageDescriptor(cols:[FieldSchema(name:event_id, type:bigint, comment:), FieldSchema(name:int_to_long, type:int, comment:), FieldSchema(name:float_to_double, type:float, comment:), FieldSchema(name:_kafka_timestamp, type:bigint, comment:), FieldSchema(name:version, type:string, comment:), FieldSchema(name:event_date, type:string, comment:), FieldSchema(name:event_hour, type:string, comment:)], location:s3://bucket/test_promotion/hudi_promotion, inputFormat:org.apache.hudi.hadoop.HoodieParquetInputFormat, outputFormat:org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat, compressed:false, numBuckets:0, serdeInfo:SerDeInfo(name:null, serializationLib:org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, parameters:{hoodie.query.as.ro.table=false, serialization.format=1, path=s3a://bucket/test_promotion/hudi_promotion}), bucketCols:[], sortCols:[], parameters:{}, storedAsSubDirectories:false), partitionKeys:[FieldSchema(name:version, type:string, comment:), FieldSchema(name:event_date, type:string, comment:), FieldSchema(name:event_hour, type:string, comment:)], parameters:{EXTERNAL=TRUE, last_commit_time_sync=20240719161441010, spark.sql.sources.schema.numPartCols=3, hudi.metadata-listing-enabled=FALSE, spark.sql.sources.schema.part.0={""type"":""struct"",""fields"":[{""name"":""event_id"",""type"":""long"",""nullable"":true,""metadata"":{}},{""name"":""int_to_long"",""type"":""long"",""nullable"":true,""metadata"":{}},{""name"":""float_to_double"",""type"":""double"",""nullable"":true,""metadata"":{}},{""name"":""_kafka_timestamp"",""type"":""long"",""nullable"":true,""metadata"":{}},{""name"":""version"",""type"":""string"",""nullable"":true,""metadata"":{}},{""name"":""event_date"",""type"":""string"",""nullable"":true,""metadata"":{}},{""name"":""event_hour"",""type"":""string"",""nullable"":true,""metadata"":{}}]}, last_commit_completion_time_sync=20240719161507000, spark.sql.sources.schema.partCol.0=version, spark.sql.sources.schema.partCol.2=event_hour, spark.sql.sources.schema.partCol.1=event_date, spark.sql.sources.schema.numParts=1, spark.sql.sources.provider=hudi}, viewOriginalText:null, viewExpandedText:null, tableType:EXTERNAL_TABLE)  "

EDIT: Found the issue. Will propose a patch.
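One way to see the discrepancy is to compare the Glue column definitions with the Spark schema stored in the table parameters (a boto3 sketch, with database/table names taken from the excerpt above; note that Spark spells types as long/double while Glue/Hive use bigint/double, so compare by eye):

import json
import boto3

# Sketch, assuming boto3 access to the Glue Data Catalog.
glue = boto3.client("glue")
table = glue.get_table(DatabaseName="db", Name="hudi_promotion")["Table"]

# Column types as Glue/Hive sees them (what Glue sync failed to update).
glue_cols = {c["Name"]: c["Type"] for c in table["StorageDescriptor"]["Columns"]}

# Column types as recorded by Spark in the table parameters (updated by sync).
spark_schema = json.loads(table["Parameters"]["spark.sql.sources.schema.part.0"])
spark_cols = {f["name"]: f["type"] for f in spark_schema["fields"]}

for name, spark_type in spark_cols.items():
    print(f"{name:20s} glue={glue_cols.get(name, '?'):10s} spark={spark_type}")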

@parisni (Contributor, Author) commented Jul 19, 2024

I tested Athena support for type promotion. It works fine until a partition ends up mixing multiple types (int/float), e.g. after an upsert. The reason is that in Athena each partition has its own schema, which has to be updated with the promoted type; otherwise errors like the following occur:

HIVE_PARTITION_SCHEMA_MISMATCH: There is a mismatch between the table and partition schemas. The types are incompatible and cannot be coerced. The column 'int_to_float' in table 'db.hudi_promotion' is declared as type 'float', but partition 'version=0/event_date=2024-01-01/event_hour=08' declared column 'int_to_float' as type 'int'.

or

HIVE_BAD_DATA: Field float_to_double's type DOUBLE in parquet file s3://bucket/test_promotion/hudi_promotion/version=0/event_date=2024-01-01/event_hour=08/45a8c801-26e9-4b57-a47d-640e1287afa1-0_0-55-183_20240719192221903.parquet is incompatible with type real defined in table schema

I guess we should reconsider the approach proposed in #9071; a manual Glue partition fix is sketched below.
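For reference, refreshing one stale partition schema by hand looks roughly like this (a sketch assuming Athena reads the table through the Glue Data Catalog and boto3 access; partition values and the column name are taken from the error above):

import boto3

# Sketch: rewrite one partition's column type to the promoted type so it
# matches the table schema (values taken from the error message above).
glue = boto3.client("glue")
db, tbl = "db", "hudi_promotion"
partition_values = ["0", "2024-01-01", "08"]  # version, event_date, event_hour

partition = glue.get_partition(
    DatabaseName=db, TableName=tbl, PartitionValues=partition_values
)["Partition"]

# Promote the stale column in the partition's storage descriptor.
sd = partition["StorageDescriptor"]
for col in sd["Columns"]:
    if col["Name"] == "int_to_float":
        col["Type"] = "float"  # promoted type declared on the table

glue.update_partition(
    DatabaseName=db,
    TableName=tbl,
    PartitionValueList=partition_values,
    PartitionInput={"Values": partition["Values"], "StorageDescriptor": sd},
)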

@parisni (Contributor, Author) commented Jul 20, 2024

FYI, Amazon Redshift Spectrum likely does not support all type promotions. At least int_to_float fails with the error below (even when the partition schema is correct):

ERROR:  Spectrum Scan Error
DETAIL:
  -----------------------------------------------
  error:  Spectrum Scan Error
  code:      15007
  context:   File 'https://s3.eu-west-1.amazonaws.com/bucket/test_promotion/hudi_promotion4/version%3D0/event_date%3D2024-01-01/event_hour%3D08/2ff34814-8284-41b1-a22e-49fd6ba3785b-0_15-23-40_20240719214644303.parquet' has
  location:  dory_util.cpp:1615
  process:   worker_thread [pid=3007]

@parisni (Contributor, Author) commented Jul 22, 2024

Here is a small recap from my tests, @jonvex @danny0405. "No" means a select query ordering on the column will fail at runtime.

            int->bigint   int->float   float->double   int->string
spark       yes           yes          yes             yes
athena      yes           yes          yes             no
spectrum    yes           no           yes             no
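For reference, the Spark-side check behind each cell was a select ordering on the promoted column (the Athena and Spectrum checks were the equivalent SQL ORDER BY queries); a sketch using the columns from the script above:

# Materialize and order each promoted column; a failing engine raises the
# runtime conversion errors shown earlier ("no" cells in the table).
for col in ["int_to_bigint", "int_to_float", "float_to_double"]:
    spark.read.format("hudi").load(basePath).orderBy(col).show(1)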
