-
Notifications
You must be signed in to change notification settings - Fork 2.5k
[HUDI-5023] Consuming records from Iterator directly instead of using inner message queue #7174
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[HUDI-5023] Consuming records from Iterator directly instead of using inner message queue #7174
Conversation
|
Hi @alexeykudinkin and @nsivabalan |
|
Hi @alexeykudinkin Sorry to bother u. Just keep on common method in HoodieMessageQueue which is necessary. So would u mind to take a look at your convert? Really appreciate it! |
alexeykudinkin
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@zhangyue19921010 thanks for following up on this one!
| * Advantages: there is no need for additional memory and cpu resources due to lock or multithreading. | ||
| * Disadvantages: lost some benefits such as speed limit. And maybe lower throughput. | ||
| */ | ||
| public class SimpleHoodieExecutor<I, O, E> extends HoodieExecutorBase<I, O, E> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's actually simplify this even further and just inherit from HoodieExecutor
|
@zhangyue19921010 let's also rebase on top of this one: #7238 |
|
For those queries about relative performance of BIMQ/Disruptor/Simple on EMR cluster, i've captured this in: #6843 (comment) Pasting the results in here: |
|
@zhangyue19921010 Can you please rebase? |
|
Hi @alexeykudinkin and @codope Rebased this PR. |
| /** | ||
| * Wrapper of input records iterator | ||
| */ | ||
| public class SimpleHoodieMessageQueue<I, O> implements HoodieMessageQueue<I, O>, Iterable<O> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@zhangyue19921010 i think we need to clean this one up -- we don't actually need it as we use to only pass an iterator essentially
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed.
| * Advantages: there is no need for additional memory and cpu resources due to lock or multithreading. | ||
| * Disadvantages: lost some benefits such as speed limit. And maybe lower throughput. | ||
| */ | ||
| public class SimpleHoodieExecutor<I, O, E> extends BaseHoodieQueueBasedExecutor<I, O, E> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not queue-based, let's make it inherit from HoodieExecutor instead
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mind cleaning up some generics which are not used anymore w/in HoodieExecutor?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All done. Also verify the performance.
Thanks for your review!
| import static org.mockito.Mockito.mock; | ||
| import static org.mockito.Mockito.when; | ||
|
|
||
| public class TestSimpleExecutionInSpark extends HoodieClientTestHarness { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @@alexeykudinkin
Not involved testInterruptExecutor in this SimpleExecutionSpark Test.
Bcz there is no inner Thread pool in this new SimpleExecutor so there is nothing to interrupt right?
Is that looks good to u ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, that makes sense
|
@hudi-bot run azure |
| import static org.mockito.Mockito.mock; | ||
| import static org.mockito.Mockito.when; | ||
|
|
||
| public class TestSimpleExecutionInSpark extends HoodieClientTestHarness { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, that makes sense
| // records iterator | ||
| protected final Iterator<I> it; | ||
| private final Function<I, O> transformFunction; | ||
| private final Runnable preExecuteRunnable; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need this one (this one is only required for these that spin up new threads)
| * | ||
| * BoundInMemory Executor 34661 35143 292 0.3 3466.1 1.0X | ||
| * Simple Executor 17347 17796 681 0.6 1734.7 2.0X | ||
| * Disruptor Executor 15803 16535 936 0.6 1580.3 2.2X |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's amend the commentary at the top that this benchmark has been run w/ unconstrained parallelism which obviously is beneficial to Disruptor more than it's for Simple
| return isWriteDone.get(); | ||
| } | ||
|
|
||
| public boolean isRunning() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we need this one (we only use it in test, and can actually avoid that assertion altogether, since we're asserting # of records)
| private final Function<I, O> transformFunction; | ||
| private final Runnable preExecuteRunnable; | ||
| private final AtomicBoolean isWriteDone = new AtomicBoolean(false); | ||
| private final AtomicBoolean isShutdown = new AtomicBoolean(false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's actually no state for us to clean up so we can just kill these 2
| LOG.info("Starting consumer, consuming records from the records iterator directly"); | ||
| preExecuteRunnable.run(); | ||
| while (it.hasNext()) { | ||
| if (isShutdown.get()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we need this check since
- We don't expect to shutdown executor in the middle of iteration (in the normal course)
- In case of exception this would be interrupted naturally
| try { | ||
| exec = new SimpleHoodieExecutor(hoodieRecords.iterator(), consumer, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA), getPreExecuteRunnable()); | ||
| int result = exec.execute(); | ||
| // It should buffer and write 100 records |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's no buffering in here, let's update this one
|
Hey @alexeykudinkin All comments addressed. PTAL |
alexeykudinkin
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for addressing the feedback @zhangyue19921010!
LGTM, left minor comments regarding the test, PTAL
| cleanupResources(); | ||
| } | ||
|
|
||
| private Runnable getPreExecuteRunnable() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we need this one
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed.
| // collect all records and assert that consumed records are identical to produced ones | ||
| // assert there's no tampering, and that the ordering is preserved | ||
| assertEquals(hoodieRecords, consumedRecords); | ||
| for (int i = 0; i < hoodieRecords.size(); i++) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need this for-loop, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeap. Removed. Sorry for missing this one.
| }); | ||
|
|
||
| HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class); | ||
| when(hoodieWriteConfig.getDisruptorWriteBufferSize()).thenReturn(Option.of(16)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to clean up these
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed.
|
Hi @alexeykudinkin All comments are addressed. PTAL. |
|
Awesome job @zhangyue19921010! Thanks for taking this up from idea to completion! |
| @Override | ||
| public E execute() { | ||
| checkState(this.consumer.isPresent()); | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since consumer aleays exists, there is no need to keep it in an Option, we can then avoids the invocation Option#get actually.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@danny0405 addressing this in #7476
… inner message queue (apache#7174) Followed PR apache#5416 Add a new Executor Type named SIMPLE for hoodie records writer. This Simple executor is Single Writer and Single Reader mode. Also this SimpleHoodieExecutor has no inner message queue and no inner lock which can consume and writing records from iterator directly. Advantages: There is no need for additional memory and cpu resources due to lock or multithreading. Disadvantages: lost some benefits such as speed limit. And can not de-coupe the network read (shuffle read) and network write (writing objects/files to storage) anymore which may cause a little lower throughput compared with DisruptorExecutor. Also I did a quick benchmark using hoodie benchmark framework org.apache.spark.sql.execution.benchmark.BoundInMemoryExecutorBenchmark Minimize the impact of producers and consumers efficiency issue as much as possible to focus on testing the throughput limit of the inner queue The result are OpenJDK 64-Bit Server VM 1.8.0_342-b07 on Linux 5.10.62-55.141.amzn2.x86_64 Intel(R) Xeon(R) Platinum 8259CL CPU @ 2.50GHz COW Ingestion: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ BoundInMemory Executor 34661 35143 292 0.3 3466.1 1.0X Simple Executor 17347 17796 681 0.6 1734.7 2.0X Disruptor Executor 15803 16535 936 0.6 1580.3 2.2X this Simple Executor has good throughput and minimal resource usage. Also add the corresponding UTs

Change Logs
Followed PR #5416
Add a new Executor Type named
SIMPLEfor hoodie records writer.This Simple executor is
Single Writer and Single Reader mode. Also this SimpleHoodieExecutor has no inner message queue and no inner lock which can consume and writing records from iterator directly.Advantages: There is no need for additional memory and cpu resources due to lock or multithreading.
Disadvantages: lost some benefits such as speed limit. And can not de-coupe the network read (shuffle read) and network write (writing objects/files to storage) anymore which may cause a little lower throughput compared with DisruptorExecutor.
Also I did a quick benchmark using hoodie benchmark framework
org.apache.spark.sql.execution.benchmark.BoundInMemoryExecutorBenchmarkMinimize the impact of producers and consumers efficiency issue as much as possible to focus on testing the throughput limit of the inner queue
The result are
this Simple Executor has good throughput and minimal resource usage.
Also add the corresponding UTs
Of course we need to unify all these executor/messageQueue related UTs as much as possible.
Create a new Tickets https://issues.apache.org/jira/browse/HUDI-5106
Will do it as next step
Impact
No impact
Risk level (write none, low medium or high below)
low
Documentation Update
no
Contributor's checklist