Skip to content

[HUDI-5253] HoodieMergeOnReadTableInputFormat could have duplicate records issue if it contains delta files while still splittable#7264

Merged
codope merged 3 commits intoapache:masterfrom
boneanxs:fix_flaky
Nov 29, 2022
Merged

Conversation

@boneanxs
Copy link
Copy Markdown
Contributor

@boneanxs boneanxs commented Nov 21, 2022

Change Logs

If HoodieRealtimePath contains delta files, it cannot splittable

Impact

We can also find that sometimes could throw {{IllegalStateException}} duplicates key error when we run the CI.

 java.lang.IllegalStateException: Duplicate key {"_hoodie_commit_time": 20221122170106908, "_hoodie_commit_seqno": 20221122170106908_0_48, "_hoodie_record_key": 53c028d3-e4b0-4d6f-a041-f09e418c36b3, "_hoodie_partition_path": 2016/03/15, "_hoodie_file_name": 6a9fff5b-3b75-48ad-85fe-c621c9a2c25d-0, "timestamp": 0, "_row_key": 53c028d3-e4b0-4d6f-a041-f09e418c36b3, "partition_path": 2016/03/15, "rider": rider-20221122170106908, "driver": driver-20221122170106908, "begin_lat": 0.5407076277518825, "begin_lon": 0.39726822192851885, "end_lat": 0.49363027135660975, "end_lon": 0.6482366665027408, "distance_in_meters": -1534272590, "seconds_since_epoch": 6103867871123100710, "weight": 0.38126373, "nation": 7b 62 79 74 65 73 3d 43 61 6e 61 64 61 7d, "current_date": 1970-01-17, "current_ts": 1460315658, "height": 0.093258, "city_to_state": org.apache.hadoop.io.ArrayWritable@1b28684, "fare": org.apache.hadoop.io.ArrayWritable@694b818f, "tip_history": org.apache.hadoop.io.ArrayWritable@63cb1a9a, "_hoodie_is_deleted": false}

	at java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133)
	at java.util.HashMap.merge(HashMap.java:1254)
	at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
	at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
	at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
	at org.apache.hudi.testutils.GenericRecordValidationTestUtils.assertDataInMORTable(GenericRecordValidationTestUtils.java:95)
	at org.apache.hudi.testutils.GenericRecordValidationTestUtils.assertDataInMORTable(GenericRecordValidationTestUtils.java:80)
	at org.apache.hudi.client.functional.TestHoodieClientOnMergeOnReadStorage.testLogCompactionOnMORTable(TestHoodieClientOnMergeOnReadStorage.java:187)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
	at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
	at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
	at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
	at java.util.ArrayList.forEach(ArrayList.java:1259)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
	at java.util.ArrayList.forEach(ArrayList.java:1259)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:108)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:96)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:75)
	at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:57)
	at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
	at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
	at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
	at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
	at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)

We can easily to reproduce this in org.apache.hudi.testutils.HoodieMergeOnReadTestUtils#getRecordsUsingInputFormat to allow it create more splits

FileInputFormat.setInputPaths(jobConf, String.join(",", inputPaths));
// Add 3 to the inputPaths to create more splits
InputSplit[] splits = inputFormat.getSplits(jobConf, inputPaths.size() + 3);

for (InputSplit split : splits) {
// ...
}

Risk level (write none, low medium or high below)

low

Documentation Update

Describe any necessary documentation update if there is any new feature, config, or user-facing change

  • The config description must be updated if new configs are added or the default value of the configs are changed
  • Any new feature or user-facing change requires updating the Hudi website. Please create a Jira ticket, attach the
    ticket number here and follow the instruction to make
    changes to the website.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@boneanxs boneanxs changed the title [HUDI-5253] Could have duplicates if HoodieRealtimePath has log files [HUDI-5253] HoodieMergeOnReadTableInputFormat could have duplicate records issue if it contains delta files while still splittable Nov 22, 2022
@boneanxs
Copy link
Copy Markdown
Contributor Author

@hudi-bot run azure

1 similar comment
@boneanxs
Copy link
Copy Markdown
Contributor Author

@hudi-bot run azure

@boneanxs
Copy link
Copy Markdown
Contributor Author

@danny0405 @xushiyan could you please take a look?

HoodieLogFile logFile = new HoodieLogFile(fs.getFileStatus(new Path(logPath)));
rtPath = new HoodieRealtimePath(new Path("foo"), "bar", basePath.toString(), Collections.singletonList(logFile), "000", false, Option.empty());
assertFalse(new HoodieMergeOnReadTableInputFormat().isSplitable(fs, rtPath), "Path for bootstrap should not be splitable.");
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Path for bootstrap should not be splitable

Is the error message right ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

void pathNotSplitableIfContainsDeltaFiles() throws IOException {
URI basePath = Files.createTempFile(tempDir, "target", ".parquet").toUri();
HoodieRealtimePath rtPath = new HoodieRealtimePath(new Path("foo"), "bar", basePath.toString(), Collections.emptyList(), "000", false, Option.empty());
assertTrue(new HoodieMergeOnReadTableInputFormat().isSplitable(fs, rtPath));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Supplement the error message.

Copy link
Copy Markdown
Contributor

@danny0405 danny0405 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

@danny0405 danny0405 self-assigned this Nov 29, 2022
@hudi-bot
Copy link
Copy Markdown
Collaborator

CI report:

Bot commands @hudi-bot supports the following commands:
  • @hudi-bot run azure re-run the last Azure build

@codope codope added the priority:critical Production degraded; pipelines stalled label Nov 29, 2022
Copy link
Copy Markdown
Member

@codope codope left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch!

@codope codope merged commit fe43e6f into apache:master Nov 29, 2022
satishkotha pushed a commit that referenced this pull request Dec 13, 2022
…cords issue if it contains delta files while still splittable (#7264)
alexeykudinkin pushed a commit to onehouseinc/hudi that referenced this pull request Dec 14, 2022
…cords issue if it contains delta files while still splittable (apache#7264)
alexeykudinkin pushed a commit to onehouseinc/hudi that referenced this pull request Dec 14, 2022
…cords issue if it contains delta files while still splittable (apache#7264)
alexeykudinkin pushed a commit to onehouseinc/hudi that referenced this pull request Dec 14, 2022
…cords issue if it contains delta files while still splittable (apache#7264)
alexeykudinkin pushed a commit to onehouseinc/hudi that referenced this pull request Dec 14, 2022
…cords issue if it contains delta files while still splittable (apache#7264)
alexeykudinkin pushed a commit to onehouseinc/hudi that referenced this pull request Dec 14, 2022
…cords issue if it contains delta files while still splittable (apache#7264)
fengjian428 pushed a commit to fengjian428/hudi that referenced this pull request Apr 5, 2023
…cords issue if it contains delta files while still splittable (apache#7264)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

engine:hive Hive integration priority:critical Production degraded; pipelines stalled

Projects

Archived in project

Development

Successfully merging this pull request may close these issues.

4 participants