@alexeykudinkin alexeykudinkin commented Dec 16, 2022

Change Logs

This change switches the default Write Executor to SIMPLE, i.e. one that does not rely on any kind of queue (either the BoundedInMemory queue, BIMQ, or the Disruptor-based one).

This should considerably reduce:

  • Runtime (compared to BIMQ)
  • Wasted compute (compared to BIMQ and Disruptor)

The reason is that it eliminates the unnecessary intermediate "staging" of records in a queue (for example, in Spark such in-memory enqueueing already occurs at the ingress points, i.e. during shuffling) and allows records to be written in one pass (and, in the future, even without making copies of the records).
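
To illustrate the difference described above, here is a minimal sketch contrasting a queue-backed executor with a single-pass one. `ExecutorSketch`, `runQueued`, and `runSimple` are hypothetical names invented for this illustration; they are not Hudi's executor classes, and real implementations deal with error propagation and cleanup that are omitted here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical, simplified stand-ins; these are NOT Hudi's executor classes.
class ExecutorSketch {

  // Queue-backed style: a producer thread stages every record in a bounded
  // queue, and the calling thread drains the queue and writes.
  static <I, O> void runQueued(Iterator<I> records, Function<I, O> transform,
                               Consumer<O> writer, int capacity) throws InterruptedException {
    BlockingQueue<Object> queue = new ArrayBlockingQueue<>(capacity);
    final Object end = new Object(); // sentinel marking the end of input
    Thread producer = new Thread(() -> {
      try {
        while (records.hasNext()) {
          queue.put(transform.apply(records.next()));
        }
        queue.put(end);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    producer.start();
    for (Object next = queue.take(); next != end; next = queue.take()) {
      @SuppressWarnings("unchecked")
      O record = (O) next;
      writer.accept(record);
    }
    producer.join();
  }

  // Simple style: no staging and no extra thread; each record is transformed
  // and written in a single pass on the calling thread.
  static <I, O> void runSimple(Iterator<I> records, Function<I, O> transform,
                               Consumer<O> writer) {
    while (records.hasNext()) {
      writer.accept(transform.apply(records.next()));
    }
  }

  public static void main(String[] args) throws InterruptedException {
    List<String> input = Arrays.asList("r1", "r2", "r3");
    List<String> viaQueue = new ArrayList<>();
    List<String> direct = new ArrayList<>();
    runQueued(input.iterator(), Function.<String>identity(), viaQueue::add, 2);
    runSimple(input.iterator(), Function.<String>identity(), direct::add);
    System.out.println(viaQueue.equals(direct)); // prints "true"
  }
}
```

Both paths deliver the same records in the same order; the simple path just skips the hand-off through the queue and the second thread.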

Impact

Users with upsert- or insert-heavy payloads should see a considerable boost in write performance.

Risk level (write none, low medium or high below)

Low

Documentation Update

N/A

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@alexeykudinkin
Contributor Author

@hudi-bot run azure

@alexeykudinkin alexeykudinkin changed the title from "[MINOR] Switching default Write Executor type to SIMPLE" to "[HUDI-5023] Switching default Write Executor type to SIMPLE" Dec 19, 2022
@alexeykudinkin alexeykudinkin requested a review from yihua December 21, 2022 18:44
@alexeykudinkin alexeykudinkin added the priority:blocker (Production down; release blocker) and area:performance (Performance optimizations) labels Dec 21, 2022
@alexeykudinkin
Contributor Author

@hudi-bot run azure

Comment on lines 2457 to 2460
  public Builder withWriteBufferSize(int size) {
-   writeConfig.setValue(WRITE_DISRUPTOR_BUFFER_SIZE, String.valueOf(size));
+   writeConfig.setValue(WRITE_EXECUTOR_DISRUPTOR_BUFFER_SIZE, String.valueOf(size));
    return this;
  }
Contributor

nit: If this is not intended to be used as a public API, could you remove it and keep only Builder#withWriteExecutorDisruptorWriteBufferSize, which has the same functionality?

}

@VisibleForTesting
public HoodieWriteConfig build(boolean shouldValidate) {
Contributor

should this method use the default scope/visibility instead of public?

Contributor Author

Problem is that it's being used in tests in different packages

Contributor

OK. We should think about how to hide this, so it is not used by users accidentally.

private final String instantTime = HoodieActiveTimeline.createNewInstantTime();


private final HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
Contributor

nit: (not required in this PR) the tests using different types of write executors should be generalized in a base class, like TestWriteMarkersBase, instead of duplicating the code. This can be refactored later on.
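
A rough sketch of what such a generalized base class could look like is shown below. Every name in it (`ExecutorTestSketch`, `RecordExecutor`, `ExecutorTestBase`, `SimpleExecutorTest`) is hypothetical and chosen only for illustration; it does not reflect Hudi's actual test classes or executor interfaces, and it uses plain Java checks instead of a test framework so that it runs on its own.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// All names below are hypothetical and exist only for this illustration.
class ExecutorTestSketch {

  // Minimal executor contract assumed for the sketch (not Hudi's interface).
  interface RecordExecutor<T> {
    void execute(Iterator<T> records, Consumer<T> consumer);
  }

  // Shared checks live in the base class; subclasses only choose the executor.
  abstract static class ExecutorTestBase {
    protected abstract RecordExecutor<String> createExecutor();

    public final void testAllRecordsConsumedInOrder() {
      List<String> input = Arrays.asList("a", "b", "c");
      List<String> seen = new ArrayList<>();
      createExecutor().execute(input.iterator(), seen::add);
      if (!seen.equals(input)) {
        throw new AssertionError("expected " + input + " but saw " + seen);
      }
    }
  }

  // One subclass per executor type; this one passes records straight through.
  static class SimpleExecutorTest extends ExecutorTestBase {
    @Override
    protected RecordExecutor<String> createExecutor() {
      return (records, consumer) -> records.forEachRemaining(consumer);
    }
  }

  public static void main(String[] args) {
    new SimpleExecutorTest().testAllRecordsConsumedInOrder();
    System.out.println("shared executor test passed");
  }
}
```

With this shape, adding coverage for another executor type would mean adding one small subclass rather than copying the test bodies.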

Contributor Author

Agreed. Created a task to follow up: HUDI-5622


try {
- exec = new SimpleHoodieExecutor(hoodieRecords.iterator(), consumer, getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA));
+ exec = new SimpleExecutor(hoodieRecords.iterator(), consumer, Function.identity());
Contributor

Still use getTransformer instead of Function.identity() for the tests?

Contributor Author

We actually don't need to clone in that case
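
One possible reading of this point, offered as an assumption rather than as the author's stated rationale: copying matters when the source iterator reuses a mutable object and records are staged before being read, whereas a single pass reads each record before the iterator advances. The sketch below demonstrates that general effect; `CloningSketch`, `Rec`, `reusingIterator`, `bufferThenRead`, and `readInOnePass` are hypothetical names and have nothing to do with Hudi's record or transformer types.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;

// Hypothetical illustration only; not Hudi code.
class CloningSketch {

  // Mutable record that the source iterator reuses between calls to next().
  static final class Rec {
    int value;

    Rec copy() {
      Rec r = new Rec();
      r.value = value;
      return r;
    }
  }

  // Yields 1..n, but always through the same shared Rec instance.
  static Iterator<Rec> reusingIterator(int n) {
    Rec shared = new Rec();
    return new Iterator<Rec>() {
      private int i = 0;

      @Override
      public boolean hasNext() {
        return i < n;
      }

      @Override
      public Rec next() {
        if (i >= n) {
          throw new NoSuchElementException();
        }
        shared.value = ++i;
        return shared;
      }
    };
  }

  // Stages every record first and reads the values afterwards (queue-like).
  static List<Integer> bufferThenRead(Iterator<Rec> it, Function<Rec, Rec> transform) {
    List<Rec> staged = new ArrayList<>();
    while (it.hasNext()) {
      staged.add(transform.apply(it.next()));
    }
    List<Integer> out = new ArrayList<>();
    for (Rec r : staged) {
      out.add(r.value);
    }
    return out;
  }

  // Reads each record immediately, before the iterator advances.
  static List<Integer> readInOnePass(Iterator<Rec> it, Function<Rec, Rec> transform) {
    List<Integer> out = new ArrayList<>();
    while (it.hasNext()) {
      out.add(transform.apply(it.next()).value);
    }
    return out;
  }

  public static void main(String[] args) {
    // Staging without copying: every staged entry aliases the shared object.
    System.out.println(bufferThenRead(reusingIterator(3), Function.identity())); // [3, 3, 3]
    // Staging with a copying transformer preserves each value.
    System.out.println(bufferThenRead(reusingIterator(3), Rec::copy)); // [1, 2, 3]
    // Single pass with the identity transformer is already correct.
    System.out.println(readInOnePass(reusingIterator(3), Function.identity())); // [1, 2, 3]
  }
}
```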

@vinothchandar
Member

@alexeykudinkin are we claiming that there is no downside to this, especially with large file sizes? Does this still provide the same performance? Would love to understand the benchmarks.

@alexeykudinkin alexeykudinkin force-pushed the ak/wrt-exec-swc branch 2 times, most recently from 4c401fc to 5935539 Compare January 27, 2023 00:28
@hudi-bot
Collaborator

CI report:

Bot commands @hudi-bot supports the following commands:
  • @hudi-bot run azure re-run the last Azure build

.withDocumentation("Size of in-memory buffer used for parallelizing network reads and lake storage writes.");

- public static final ConfigProperty<String> WRITE_DISRUPTOR_BUFFER_SIZE = ConfigProperty
+ public static final ConfigProperty<String> WRITE_EXECUTOR_DISRUPTOR_BUFFER_SIZE = ConfigProperty
Contributor

(Let's take this in a separate PR) What is the unit of this config (B, KB, or MB)? We should mention the unit in the config naming.

Contributor Author

Discussed offline: will follow up with a separate PR addressing this

- exec = new DisruptorExecutor(hoodieWriteConfig.getDisruptorWriteBufferSize(), hoodieRecords.iterator(), consumer,
-     getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA), Option.of(WaitStrategyFactory.DEFAULT_STRATEGY), getPreExecuteRunnable());
+ exec = new DisruptorExecutor<>(writeConfig.getWriteExecutorDisruptorWriteBufferSize(), hoodieRecords.iterator(), consumer,
+     Function.identity(), WaitStrategyFactory.DEFAULT_STRATEGY, getPreExecuteRunnable());
Contributor

Should this also use getTransformer(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig)?

Contributor Author

@alexeykudinkin alexeykudinkin Jan 27, 2023

There's no reason to over-complicate this test -- its goal is to test the Executor, not how it's being used

-     getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA), Option.of(WaitStrategyFactory.DEFAULT_STRATEGY), getPreExecuteRunnable());
+ DisruptorExecutor<HoodieRecord, HoodieRecord, Integer>
+     executor = new DisruptorExecutor<>(1024, hoodieRecords.iterator(), consumer,
+     Function.identity(), WaitStrategyFactory.DEFAULT_STRATEGY, getPreExecuteRunnable());
Contributor

similar here


try {
- exec = new SimpleHoodieExecutor(hoodieRecords.iterator(), consumer, getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA));
+ exec = new SimpleExecutor<>(hoodieRecords.iterator(), consumer, Function.identity());
Contributor

Similar here for getTransformer. The production code passes the transformer in by calling getTransformer, so let's follow the same approach in the test code.


try {
- exec = new SimpleHoodieExecutor(hoodieRecords.iterator(), consumer, getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA));
+ exec = new SimpleExecutor<>(hoodieRecords.iterator(), consumer, Function.identity());
Contributor

similar here

- SimpleHoodieExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> exec =
-     new SimpleHoodieExecutor(iterator, consumer, getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA));
+ SimpleExecutor<HoodieRecord, HoodieRecord, Integer> exec =
+     new SimpleExecutor<>(iterator, consumer, Function.identity());
Contributor

similar here

@yihua
Contributor

yihua commented Jan 27, 2023

@alexeykudinkin let's make sure we cover different scenarios (including the different base file sizes) in the benchmarking, and that there is no regression in common use cases before landing this PR.

Contributor

@yihua yihua left a comment

LGTM

@alexeykudinkin alexeykudinkin merged commit ff590c6 into apache:master Jan 28, 2023
yihua pushed a commit that referenced this pull request Jan 30, 2023
fengjian428 pushed a commit to fengjian428/hudi that referenced this pull request Jan 31, 2023
…#7476)
fengjian428 pushed a commit to fengjian428/hudi that referenced this pull request Apr 5, 2023
…#7476)