Labels: area:writer (Write client and core write operations), priority:critical (Production degraded; pipelines stalled)
Description
I am running Hudi 0.11.0 on Spark 3.2.1.
When running a bulk insert operation on an existing Hudi table, I get the NPE below, which results in a rollback.
I also tried creating a new table from scratch:
- the first bulk insert works fine
- the second bulk insert fails
When I turn off the optimistic concurrency writer, everything works fine.
This looks quite similar to #4635.
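For context, here is a minimal sketch of the write pattern that triggers this. It is not the actual job: the table name, path, key/partition columns, and DynamoDB lock settings are placeholders, and the real config is in the dump further down.

```scala
import org.apache.spark.sql.SaveMode

// Toy DataFrame standing in for the real input.
val df = spark.range(0, 100)
  .selectExpr("id as key", "id % 10 as part", "current_timestamp() as ts")

val hudiOptions = Map(
  "hoodie.table.name"                           -> "occ_npe_test",        // placeholder
  "hoodie.datasource.write.table.type"          -> "COPY_ON_WRITE",
  "hoodie.datasource.write.operation"           -> "bulk_insert",
  "hoodie.datasource.write.recordkey.field"     -> "key",
  "hoodie.datasource.write.partitionpath.field" -> "part",
  "hoodie.datasource.write.precombine.field"    -> "ts",
  // optimistic concurrency writer, as in the full config below
  "hoodie.write.concurrency.mode"               -> "optimistic_concurrency_control",
  "hoodie.cleaner.policy.failed.writes"         -> "LAZY",
  "hoodie.write.lock.provider"                  -> "org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider",
  "hoodie.write.lock.dynamodb.table"            -> "my-lock-table",       // placeholder
  "hoodie.write.lock.dynamodb.partition_key"    -> "occ_npe_test",        // placeholder
  "hoodie.write.lock.dynamodb.region"           -> "eu-west-1"            // placeholder
)

// First bulk insert on a fresh table succeeds.
df.write.format("hudi").options(hudiOptions).mode(SaveMode.Append).save("/tmp/occ_npe_test")

// Second bulk insert on the same, now existing, table fails with the NPE below and rolls back.
df.write.format("hudi").options(hudiOptions).mode(SaveMode.Append).save("/tmp/occ_npe_test")
```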
Caused by: org.apache.spark.SparkException: Writing job aborted
at org.apache.spark.sql.errors.QueryExecutionErrors$.writingJobAbortedError(QueryExecutionErrors.scala:613)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:386)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:330)
at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.writeWithV2(WriteToDataSourceV2Exec.scala:236)
at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run(WriteToDataSourceV2Exec.scala:309)
at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run$(WriteToDataSourceV2Exec.scala:308)
at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.run(WriteToDataSourceV2Exec.scala:236)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:128)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:311)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
at org.apache.hudi.HoodieSparkSqlWriter$.bulkInsertAsRow(HoodieSparkSqlWriter.scala:559)
at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:176)
at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:163)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:128)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:303)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
... 11 more
Caused by: org.apache.hudi.exception.HoodieException
at org.apache.hudi.internal.DataSourceInternalWriterHelper.commit(DataSourceInternalWriterHelper.java:86)
at org.apache.hudi.spark3.internal.HoodieDataSourceInternalBatchWrite.commit(HoodieDataSourceInternalBatchWrite.java:93)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:369)
... 78 more
Caused by: java.lang.NullPointerException
at org.apache.hudi.client.utils.TransactionUtils.lambda$getCompletedInstantsDuringCurrentWriteOperation$1(TransactionUtils.java:159)
at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:174)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at java.util.stream.StreamSpliterators$WrappingSpliterator.forEachRemaining(StreamSpliterators.java:313)
at java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:743)
at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:647)
at org.apache.hudi.client.utils.TransactionUtils.resolveWriteConflictIfAny(TransactionUtils.java:79)
at org.apache.hudi.client.SparkRDDWriteClient.preCommit(SparkRDDWriteClient.java:475)
at org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:233)
at org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:214)
at org.apache.hudi.internal.DataSourceInternalWriterHelper.commit(DataSourceInternalWriterHelper.java:83)
... 80 more
See below the Hudi config used:
{hoodie.write.lock.dynamodb.region=XXXX,
hoodie.index.type=BLOOM,
hoodie.clean.automatic=false,
hoodie.write.lock.provider=org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider,
hoodie.compact.inline=false,
hoodie.datasource.write.recordkey.field=XXXXX,
hoodie.bloom.index.use.caching=true,
hoodie.avro.schema.validate=false,
hoodie.metadata.enable=true,
hoodie.datasource.write.table.type=COPY_ON_WRITE,
hoodie.bloom.index.use.metadata=false,
hoodie.embed.timeline.server.async=true,
hoodie.cleaner.parallelism=1500,
hoodie.write.lock.dynamodb.partition_key=XXXXXX,
hoodie.cleaner.commits.retained=3,
index.global.enabled=true,
hoodie.write.lock.dynamodb.table=XXXXX,
hoodie.metadata.index.bloom.filter.enable=true,
hoodie.delete.shuffle.parallelism=1500,
hoodie.upsert.shuffle.parallelism=1500,
hoodie.table.name=XXXX,
hoodie.insert.shuffle.parallelism=1500,
hoodie.datasource.write.precombine.field=XXXXX,
hoodie.cleaner.policy.failed.writes=LAZY,
hoodie.embed.timeline.server=true,
hoodie.aws.secret.key=XXXXX,
hoodie.datasource.write.operation=bulk_insert,
hoodie.compact.inline.max.delta.commits=2,
hoodie.table.precombine.field=XXXXX,
hoodie.metadata.index.column.stats.enable=true,
hoodie.enable.data.skipping=true,
hoodie.clean.async=false,
hoodie.archive.merge.enable=true,
hoodie.keep.max.commits=5,
hoodie.write.lock.dynamodb.billing_mode=PAY_PER_REQUEST,
hoodie.datasource.write.hive_style_partitioning=true,
hoodie.client.heartbeat.interval_in_ms=120000,
hoodie.aws.access.key=XXXXX,
hoodie.bulkinsert.shuffle.parallelism=1500,
hoodie.keep.min.commits=4,
hoodie.datasource.write.table.name=XXXXXX,
hoodie.parquet.compression.codec=zstd,
hoodie.write.concurrency.mode=optimistic_concurrency_control,
hoodie.bloom.index.prune.by.ranges=false,
hoodie.datasource.write.partitionpath.field=XXXXX
clean.retain_commits=3 }
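For completeness, the variant that works for me is the same write with the concurrency settings turned off. A sketch of that change, assuming `df` and `hudiOptions` from the sketch above (values are again placeholders):

```scala
// Sketch only: hudiOptions is the options map from the sketch above (real values redacted).
// Dropping the lock provider settings and going back to the default single-writer mode
// is the combination under which both bulk inserts succeed for me.
val singleWriterOptions = hudiOptions -
  "hoodie.write.lock.provider" -
  "hoodie.write.lock.dynamodb.table" -
  "hoodie.write.lock.dynamodb.partition_key" -
  "hoodie.write.lock.dynamodb.region" -
  "hoodie.write.lock.dynamodb.billing_mode" +
  ("hoodie.write.concurrency.mode" -> "single_writer") +
  ("hoodie.cleaner.policy.failed.writes" -> "EAGER")   // default policy when not using OCC

df.write
  .format("hudi")
  .options(singleWriterOptions)
  .mode(SaveMode.Append)
  .save("/tmp/occ_npe_test")   // placeholder path
```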