[HUDI-3583] Fix MarkerBasedRollbackStrategy NoSuchElementException #4984
Conversation
Caused by: java.util.NoSuchElementException: No value present in Option
  at org.apache.hudi.common.util.Option.get(Option.java:88)
  at org.apache.hudi.table.action.rollback.MarkerBasedRollbackStrategy.getRollbackRequestForAppend(MarkerBasedRollbackStrategy.java:135)
  at org.apache.hudi.table.action.rollback.MarkerBasedRollbackStrategy.lambda$getRollbackRequests$51dfbbfe$1(MarkerBasedRollbackStrategy.java:94)
  at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
  at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
  at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
  at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
  at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
  at scala.collection.AbstractIterator.to(Iterator.scala:1334)
  at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
  at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1334)
  at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
  at scala.collection.AbstractIterator.toArray(Iterator.scala:1334)
  at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$15.apply(RDD.scala:989)
  at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$15.apply(RDD.scala:989)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2274)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2274)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
  at org.apache.spark.scheduler.Task.run(Task.scala:123)
  at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:413)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1551)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:419)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
@yihua Can you help me with this when you have time? I'm not sure whether this change will cause other problems.
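For reference, here is a minimal sketch of the kind of presence check being discussed, assuming FSUtils.getLatestLogFile can return an empty Option when a marker exists but the log file was never written. The wrapper method, its parameters, and the trailing getLatestLogFile arguments (log extension and base commit time) are illustrative assumptions, not the exact PR diff:

```java
import java.io.IOException;
import java.util.Collections;
import java.util.Map;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.util.Option;

class RollbackSketch {
  // Hypothetical helper: collects the log files to roll back for one file group.
  static Map<String, Long> logFilesToRollback(FileSystem fs, Path partitionPath,
                                              String fileId, String baseCommitTime) throws IOException {
    Option<HoodieLogFile> latestLogFileOption = FSUtils.getLatestLogFile(
        fs, partitionPath, fileId, HoodieLogFile.DELTA_EXTENSION, baseCommitTime);
    if (!latestLogFileOption.isPresent()) {
      // The marker exists but no log file was written, so there is nothing to roll back here;
      // calling Option.get() unconditionally is what raised the NoSuchElementException above.
      return Collections.emptyMap();
    }
    // The number of log blocks is unknown at rollback time, therefore we simply stub this value.
    return Collections.singletonMap(
        latestLogFileOption.get().getFileStatus().getPath().toString(), -1L);
  }
}
```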
yihua
left a comment
The fix LGTM. I have one comment about tests.
// therefore we simply stub this value.
Map<String, Long> logFilesWithBlocsToRollback =
    Collections.singletonMap(latestLogFile.getFileStatus().getPath().toString(), -1L);
Option<HoodieLogFile> logFileOption = FSUtils.getLatestLogFile(table.getMetaClient().getFs(), partitionPath, fileId,
@liujinhui1994 could you add a unit test where the latest log is absent for this new logic?
I will write a unit test tomorrow; my Mac is not with me at the moment.
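In the meantime, a rough sketch of the scenario such a test would cover, exercising the hypothetical helper from the sketch above rather than Hudi's actual rollback test harness (the class name, paths, and setup are assumptions):

```java
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

class TestRollbackWithAbsentLogFile {

  @TempDir
  java.nio.file.Path tempDir;

  @Test
  void absentLatestLogFileProducesNothingToRollback() throws Exception {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    // The partition directory exists, but no log file was ever written for this file group.
    Path partitionPath = new Path(tempDir.toString(), "2022/03/07");
    fs.mkdirs(partitionPath);

    Map<String, Long> result =
        RollbackSketch.logFilesToRollback(fs, partitionPath, "file-id-1", "001");

    assertTrue(result.isEmpty(), "an absent log file should yield no log files to roll back");
  }
}
```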
// therefore we simply stub this value.
Map<String, Long> logFilesWithBlocsToRollback =
    Collections.singletonMap(latestLogFile.getFileStatus().getPath().toString(), -1L);
Option<HoodieLogFile> logFileOption = FSUtils.getLatestLogFile(table.getMetaClient().getFs(), partitionPath, fileId,
nit: logFileOption -> latestLogFileOption
ok
yihua
left a comment
LGTM. I pushed a minor fix for code style.
…pache#4984) Co-authored-by: Y Ethan Guo <[email protected]>
…pache#4984) Co-authored-by: Y Ethan Guo <[email protected]> (cherry picked from commit e60acc1)
What is the purpose of the pull request
In a production environment, when using Structured Streaming to write to a MOR table, a marker file can for some reason exist under .hoodie/.temp/ while the corresponding data file does not. Rollback then throws the exception shown above; with the added presence check, the job runs normally.
Committer checklist
Has a corresponding JIRA in PR title & commit
Commit message is descriptive of the change
CI is green
Necessary doc changes done or have another open PR
For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.