
Conversation

@dongkelun (Contributor) commented Oct 3, 2021

hudi: 0.9.0
spark: 2.4.5
hive:

create database test_hudi;

spark-shell:

spark-shell --master yarn --deploy-mode client --executor-memory 2G --num-executors 3 --executor-cores 2 --driver-memory 4G --driver-cores 2 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' --principal .. --keytab ..

import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.QuickstartUtils.{DataGenerator, convertToStringList, getQuickstartWriteConfigs}
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
import org.apache.spark.sql.SaveMode._
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions.lit
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.keygen.SimpleKeyGenerator
import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodiePayloadProps}
import org.apache.hudi.io.HoodieMergeHandle
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.spark.sql.functions._

import spark.implicits._
val df = Seq((1, "a1", 10, 1000, "2022-01-19")).toDF("id", "name", "value", "ts", "dt")

df.write.format("hudi").
option(HoodieWriteConfig.TBL_NAME.key, "test_hudi_table_sync_hive").
option(TABLE_TYPE.key, COW_TABLE_TYPE_OPT_VAL).
option(RECORDKEY_FIELD.key, "id").
option(PRECOMBINE_FIELD.key, "ts").
option(KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.NonpartitionedKeyGenerator").
option("hoodie.datasource.write.partitionpath.field", "").
option("hoodie.metadata.enable", false).
option(KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.ComplexKeyGenerator").
option(META_SYNC_ENABLED.key(), true).
option(HIVE_USE_JDBC.key(), false).
option(HIVE_DATABASE.key(), "test_hudi").
option(HIVE_AUTO_CREATE_DATABASE.key(), true).
option(HIVE_TABLE.key(), "test_hudi_table_sync_hive").
option(HIVE_PARTITION_EXTRACTOR_CLASS.key(), "org.apache.hudi.hive.MultiPartKeysValueExtractor").
mode("overwrite").
save("/test_hudi/test_hudi_table_sync_hive")
# hoodie.properties
hoodie.table.precombine.field=ts
hoodie.table.partition.fields=
hoodie.table.type=COPY_ON_WRITE
hoodie.archivelog.folder=archived
hoodie.populate.meta.fields=true
hoodie.timeline.layout.version=1
hoodie.table.version=3
hoodie.table.recordkey.fields=id
hoodie.table.base.file.format=PARQUET
hoodie.table.timeline.timezone=LOCAL
hoodie.table.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator
hoodie.table.name=test_hudi_table_sync_hive
hoodie.datasource.write.hive_style_partitioning=false

hive:

show create table test_hudi_table_sync_hive;
+----------------------------------------------------+
|                   createtab_stmt                   |
+----------------------------------------------------+
| CREATE EXTERNAL TABLE `test_hudi_table_sync_hive`( |
|   `_hoodie_commit_time` string,                    |
|   `_hoodie_commit_seqno` string,                   |
|   `_hoodie_record_key` string,                     |
|   `_hoodie_partition_path` string,                 |
|   `_hoodie_file_name` string,                      |
|   `id` int,                                        |
|   `name` string,                                   |
|   `value` int,                                     |
|   `ts` int,                                        |
|   `dt` string)                                     |
| ROW FORMAT SERDE                                   |
|   'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'  |
| WITH SERDEPROPERTIES (                             |
|   'hoodie.query.as.ro.table'='false',              |
|   'path'='/test_hudi/test_hudi_table_sync_hive')   |
| STORED AS INPUTFORMAT                              |
|   'org.apache.hudi.hadoop.HoodieParquetInputFormat'  |
| OUTPUTFORMAT                                       |
|   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' |
| LOCATION                                           |
|   'hdfs://cluster1/test_hudi/test_hudi_table_sync_hive' |
| TBLPROPERTIES (                                    |
|   'last_commit_time_sync'='20220119110215185',     |
|   'spark.sql.sources.provider'='hudi',             |
|   'spark.sql.sources.schema.numParts'='1',         |
|   'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"integer","nullable":false,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"value","type":"integer","nullable":false,"metadata":{}},{"name":"ts","type":"integer","nullable":false,"metadata":{}},{"name":"dt","type":"string","nullable":true,"metadata":{}}]}',  |
|   'transient_lastDdlTime'='1642561355')            |
+----------------------------------------------------+
28 rows selected (0.429 seconds)
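Note that the synced table's SERDEPROPERTIES above carry only 'path' and 'hoodie.query.as.ro.table': no primary-key information is stored, which is what the Spark SQL update/delete commands later look for. As a rough workaround sketch (assumptions: the HIVE_TABLE_SERDE_PROPERTIES option, hoodie.datasource.hive_sync.serde_properties, is available in the build in use, and primaryKey/preCombineField are the property names the SQL layer reads), the key information could be passed explicitly at write time:

// Sketch only: pass the key info through the hive-sync serde properties,
// which is what this PR aims to add by default. Entries are newline-separated
// key=value pairs; the property names below are assumptions.
df.write.format("hudi").
  option(HoodieWriteConfig.TBL_NAME.key, "test_hudi_table_sync_hive").
  option(RECORDKEY_FIELD.key, "id").
  option(PRECOMBINE_FIELD.key, "ts").
  option(META_SYNC_ENABLED.key(), true).
  option(HIVE_DATABASE.key(), "test_hudi").
  option(HIVE_TABLE.key(), "test_hudi_table_sync_hive").
  option(HIVE_TABLE_SERDE_PROPERTIES.key(), "primaryKey=id\npreCombineField=ts").
  mode("overwrite").
  save("/test_hudi/test_hudi_table_sync_hive")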
spark-sql --master yarn --deploy-mode client  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' --principal .. --keytab ..

update test_hudi.test_hudi_table_sync_hive set name='a2' where id=1;

Exception:

22/01/19 13:43:56 INFO UpdateHoodieTableCommand: start execute update command for `test_hudi`.`test_hudi_table_sync_hive`
22/01/19 13:43:56 ERROR SparkSQLDriver: Failed in [update test_hudi.test_hudi_table_sync_hive set name='aaa' where id=2]
java.lang.AssertionError: assertion failed: There are no primary key in table `test_hudi`.`test_hudi_table_sync_hive`, cannot execute update operator
        at scala.Predef$.assert(Predef.scala:170)
        at org.apache.spark.sql.hudi.command.UpdateHoodieTableCommand.buildHoodieConfig(UpdateHoodieTableCommand.scala:91)
        at org.apache.spark.sql.hudi.command.UpdateHoodieTableCommand.run(UpdateHoodieTableCommand.scala:73)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
        at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:194)
        at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:194)
        at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
        at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:194)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:79)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
        at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:694)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:62)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:371)
        at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:376)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:274)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
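The assertion is raised in UpdateHoodieTableCommand.buildHoodieConfig (second frame above). A minimal runnable sketch of the failing check, paraphrased from the stack trace rather than copied from the Hudi source:

// Paraphrased sketch: the update command resolves the primary key from the
// catalog table's properties; hive sync stored no "primaryKey" entry there,
// so the lookup comes back empty and the assertion fires.
val tableProperties: Map[String, String] =
  Map("path" -> "/test_hudi/test_hudi_table_sync_hive") // what hive sync actually stored
val primaryColumns: Seq[String] =
  tableProperties.get("primaryKey").toSeq.flatMap(_.split(","))
assert(primaryColumns.nonEmpty,
  "There are no primary key in table `test_hudi`.`test_hudi_table_sync_hive`, cannot execute update operator")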


hudi 0.11.0 master build

On a 0.11.0 master build the above exception is resolved, but delete throws the following exception:
: org.apache.hudi.exception.HoodieKeyException: recordKey values: "uuid:__null__" for fields: [uuid] cannot be entirely null or empty.
        at org.apache.hudi.keygen.KeyGenUtils.getRecordKey(KeyGenUtils.java:109)
        at org.apache.hudi.keygen.ComplexAvroKeyGenerator.getRecordKey(ComplexAvroKeyGenerator.java:43)
        at org.apache.hudi.keygen.ComplexKeyGenerator.getRecordKey(ComplexKeyGenerator.java:49)
        at org.apache.hudi.keygen.BaseKeyGenerator.getKey(BaseKeyGenerator.java:65)
        at org.apache.spark.sql.hudi.command.SqlKeyGenerator.getRecordKey(SqlKeyGenerator.scala:64)
        at org.apache.hudi.keygen.BaseKeyGenerator.getKey(BaseKeyGenerator.java:65)
        at org.apache.hudi.HoodieSparkSqlWriter$$anonfun$1.apply(HoodieSparkSqlWriter.scala:170)
        at org.apache.hudi.HoodieSparkSqlWriter$$anonfun$1.apply(HoodieSparkSqlWriter.scala:170)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:193)
        at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)


Driver stacktrace:
22/01/19 13:33:39 INFO DAGScheduler: Job 60 failed: countByKey at HoodieJavaPairRDD.java:103, took 0.671469 s
22/01/19 13:33:39 ERROR SparkSQLDriver: Failed in [delete from test_hudi.test_hudi_table_sync_hive where id = 1]
org.apache.hudi.exception.HoodieUpsertException: Failed to delete for commit time 20220119133338404
        at org.apache.hudi.table.action.commit.SparkDeleteHelper.execute(SparkDeleteHelper.java:120)
        at org.apache.hudi.table.action.commit.SparkDeleteCommitActionExecutor.execute(SparkDeleteCommitActionExecutor.java:46)
        at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.delete(HoodieSparkCopyOnWriteTable.java:136)
        at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.delete(HoodieSparkCopyOnWriteTable.java:103)
        at org.apache.hudi.client.SparkRDDWriteClient.delete(SparkRDDWriteClient.java:256)
        at org.apache.hudi.DataSourceUtils.doDeleteOperation(DataSourceUtils.java:226)
        at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:191)
        at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:164)
        at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)
        at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
        at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
        at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
        at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
        at org.apache.spark.sql.hudi.command.DeleteHoodieTableCommand.run(DeleteHoodieTableCommand.scala:51)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
        at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:194)
        at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:194)
        at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
        at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:194)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:79)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
        at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:694)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:62)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:371)
        at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:376)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:274)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
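Why "uuid"? A plausible explanation (an assumption based on Hudi's defaults, not something stated in this thread): hoodie.datasource.write.recordkey.field defaults to "uuid", so when the SQL delete path cannot recover the record key from the synced table's properties, the key generator is built against a column this table does not have, and every key resolves to null:

import java.util.Properties

// Sketch of the suspected fallback: no record key was recovered from the synced table.
val props = new Properties()
// Hudi's documented default for the record key field is "uuid".
val recordKeyField = props.getProperty("hoodie.datasource.write.recordkey.field", "uuid")
// recordKeyField == "uuid"; this table has no "uuid" column, so the generated key
// becomes "uuid:__null__" and KeyGenUtils.getRecordKey throws HoodieKeyException.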

What is the purpose of the pull request

Add default hiveTableSerdeProperties for Spark SQL when sync Hive

Brief change log


  • Add default hiveTableSerdeProperties for Spark SQL when sync Hive
  • Code optimization/code formatting

Verify this pull request

(Please pick either of the following options)

This pull request is a trivial rework / code cleanup without any test coverage.

(or)

This pull request is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end.
  • Added HoodieClientWriteTest to verify the change.
  • Manually verified the change by running a job locally.

Committer checklist

  • Has a corresponding JIRA in PR title & commit

  • Commit message is descriptive of the change

  • CI is green

  • Necessary doc changes done or have another open PR

  • For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

@dongkelun (Contributor Author)

@hudi-bot run azure

@dongkelun (Contributor Author)

@hudi-bot run azure

@dongkelun (Contributor Author)

@vinothchandar @umehrot2 Hello, can you help to review this?

@dongkelun (Contributor Author)

@xushiyan @YannByron Hi, can you also help review this PR please?

@xushiyan xushiyan added component:catalog-sync Catalog-sync related priority:critical Production degraded; pipelines stalled area:sql SQL interfaces labels Jan 18, 2022
@xushiyan (Member) left a comment


LGTM. I'll let @YannByron take another pass.

@YannByron (Contributor)

@dongkelun please rebase onto master first of all.

hoodieWriteClient: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty,
asyncCompactionTriggerFn: Option[Function1[SparkRDDWriteClient[HoodieRecordPayload[Nothing]], Unit]] = Option.empty,
asyncClusteringTriggerFn: Option[Function1[SparkRDDWriteClient[HoodieRecordPayload[Nothing]], Unit]] = Option.empty
asyncCompactionTriggerFn: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]] => Unit] = Option.empty,
Contributor


nice.

var syncClientToolClassSet = scala.collection.mutable.Set[String]()
hoodieConfig.getString(META_SYNC_CLIENT_TOOL_CLASS_NAME).split(",").foreach(syncClass => syncClientToolClassSet += syncClass)
hoodieConfig.setValue(HIVE_TABLE_SERDE_PROPERTIES, getHiveTableSerdeProperties(hoodieConfig))
Contributor


Actually, are this code and getHiveTableSerdeProperties part of this PR?

Contributor Author


yes

writeClient.close()
}
val metaSyncSuccess = metaSync(sqlContext.sparkSession, hoodieConfig, basePath, df.schema)
metaSyncSuccess
Contributor


Do not change this. If I'm not wrong, according to the original code it executes metaSync when mode == SaveMode.Ignore && tableExists is false. But now it won't.

Contributor Author


I think I understand what you mean, but I'm not sure whether it's necessary to execute metaSync in this case.

Contributor


If you are not sure whether it needs to be changed, just leave it alone.

Contributor Author


I don't think we need to sync Hive here, since we don't actually write data. Also, the same logic is used in HoodieSparkSqlWriter.write.
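For context, a minimal sketch of the control flow being debated (paraphrased; mode, tableExists, and metaSync stand for the values and helper visible in the snippet above, not a quote of HoodieSparkSqlWriter):

// Sketch: with SaveMode.Ignore and an existing table, the whole operation is
// skipped; the disagreement is whether metaSync should still run when the
// table does not yet exist, i.e. when the write itself proceeds.
if (mode == SaveMode.Ignore && tableExists) {
  false // skip the write and, in the changed code, the meta sync as well
} else {
  // ... perform the actual write, then sync ...
  metaSync(sqlContext.sparkSession, hoodieConfig, basePath, df.schema)
}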

Contributor Author


This has been submitted separately in another PR: #4632

@YannByron (Contributor)

@dongkelun Can you show the detailed steps to reproduce this issue? From the context in the related ticket, I feel that the table itself is not configured with a primary key, and that HoodieCatalogTable does not get the configs from the Hive properties.

@YannByron (Contributor)

@dongkelun A friendly piece of advice: a PR should focus on a specific issue or feature. You can submit another PR for code formatting, cleanup of unused code, etc.

@dongkelun (Contributor Author)

@dongkelun A friendly piece of advice: a PR should focus on a specific issue or feature. You can submit another PR for code formatting, cleanup of unused code, etc.

OK, thank you for your advice. I'll keep it in mind going forward.

@dongkelun (Contributor Author)

@dongkelun A friendly piece of advice: a PR should focus on a specific issue or feature. You can submit another PR for code formatting, cleanup of unused code, etc.

This has been submitted separately in another PR: #4631

@dongkelun (Contributor Author)

@dongkelun Can you show the detailed steps to reproduce this issue? From the context in the related ticket, I feel that the table itself is not configured with a primary key, and that HoodieCatalogTable does not get the configs from the Hive properties.

Hello, I have updated the detailed steps and added attachments to the JIRA.

@dongkelun dongkelun changed the title [HUDI-2514] Add default hiveTableSerdeProperties for Spark SQL when sync Hive [HUDI-2514] Fix delete exception for Spark SQL when sync Hive Jan 19, 2022
@xushiyan (Member)

@YannByron thanks for the review. Can you take it from here please? From the reproducing steps, it looks like a different bug, where the primary key and other properties were not respected by HMS?

@YannByron (Contributor)

@YannByron thanks for the review. Can you take it from here please? From the reproducing steps, it looks like a different bug, where the primary key and other properties were not respected by HMS?

@xushiyan OK, I'll take this.
And it's really a different case; better to create another PR and ticket. Otherwise, it can feel strange and confusing. @dongkelun

@dongkelun (Contributor Author)

@YannByron thanks for the review. Can you take it from here please? From the reproducing steps, it looks like a different bug, where the primary key and other properties were not respected by HMS?

@xushiyan OK, I'll take this. And it's really a different case; better to create another PR and ticket. Otherwise, it can feel strange and confusing. @dongkelun

A new PR has been submitted: #4644

@dongkelun dongkelun changed the title [HUDI-2514] Fix delete exception for Spark SQL when sync Hive [HUDI-2514] Add default hiveTableSerdeProperties for Spark SQL when sync Hive Jan 20, 2022
@dongkelun (Contributor Author)

To avoid confusion and ambiguity, I have reverted to the earlier commit id 071b13b.

@hudi-bot (Collaborator)

CI report:

Bot commands supported by @hudi-bot:
  • @hudi-bot run azure: re-run the last Azure build

@xushiyan (Member)

Closing as fixed in #4644.

@xushiyan xushiyan closed this Jan 23, 2022
