
Conversation

@alexeykudinkin
Contributor

Change Logs

BoundedInMemoryExecutor adds considerable overhead, sitting in the hot path of writing records, with very little practical benefit. This PR removes it from the hot path in the few flows where we can bypass BIME completely.
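To illustrate the trade-off being discussed, here is a minimal sketch (hypothetical names, not Hudi's actual classes) of the two hand-off styles: the bounded-queue executor stages every record through a blocking queue between producer and consumer threads, while the direct path simply iterates and writes on the calling thread.

```java
import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

// Hypothetical, simplified sketch of the two hand-off styles under discussion.
class HandoffSketch {

    // Queued path: a producer thread stages every record through a bounded
    // blocking queue; the calling thread drains it. Each record pays for an
    // enqueue, a dequeue, and cross-thread synchronization.
    static <T> void queuedWrite(Iterator<T> records, Consumer<T> writer, int capacity) {
        BlockingQueue<Optional<T>> queue = new ArrayBlockingQueue<>(capacity);
        Thread producer = new Thread(() -> {
            try {
                while (records.hasNext()) queue.put(Optional.of(records.next()));
                queue.put(Optional.empty()); // end-of-stream marker
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            Optional<T> next;
            while ((next = queue.take()).isPresent()) writer.accept(next.get());
            producer.join();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    // Direct path (what this PR switches to where possible): no intermediate
    // queue or extra thread, the writer consumes the iterator in place.
    static <T> void directWrite(Iterator<T> records, Consumer<T> writer) {
        while (records.hasNext()) writer.accept(records.next());
    }
}
```

Both paths produce the same output order; the direct one just skips the per-record synchronization cost.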

Impact

Describe any public API or user-facing feature change or any performance impact.

Risk level: none | low | medium | high

Choose one. If medium or high, explain what verification was done to mitigate the risks.

Documentation Update

Describe any necessary documentation update if there is any new feature, config, or user-facing change

  • The config description must be updated if new configs are added or the default value of the configs are changed
  • Any new feature or user-facing change requires updating the Hudi website. Please create a Jira ticket, attach the
    ticket number here and follow the instruction to make
    changes to the website.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@alexeykudinkin alexeykudinkin marked this pull request as draft September 30, 2022 21:53
@alexeykudinkin alexeykudinkin changed the title [WIP] Avoiding using BoundedInMemoryExecutor on the hot-path [WIP][HUDI-5023] Avoiding using BoundedInMemoryExecutor on the hot-path Oct 12, 2022
@xushiyan xushiyan added the status:in-progress Work in progress label Oct 31, 2022
@alexeykudinkin
Contributor Author

alexeykudinkin commented Nov 11, 2022

BIMQ (baseline): ~5120s
Disruptor: ~5030s (2% improvement from baseline)
No Queue: ~3800s (~26% improvement from baseline)

#
# BoundedInMemoryQueue (baseline, master)
#

==================================================
Total time taken by all rounds (hudi): 5119338
Per round: List(204855, 504984, 585545, 646676, 516914, 554007, 510029, 533198, 554438, 508692)
==================================================


#
# Disruptor (master)
# Performance Improvement (from baseline): ~2%
#

===================================================
Total time taken by all rounds (hudi): 5027185
Per round: List(142641, 450997, 544476, 576815, 552873, 585657, 526764, 552790, 563460, 530712)
==================================================

#
# No queue
# Performance Improvement (from baseline): ~26%
#

==================================================
Total time taken by all rounds (hudi): 3798022
Per round: List(189704, 467253, 512628, 468869, 363557, 361869, 351934, 357554, 364299, 360355)
==================================================

After chatting w/ @vinothchandar on this, I went to double-check how Spark actually iterates over the output of shuffle tasks, and it turns out it already implements a similar queuing pattern at that level -- see ShuffleBlockFetcherIterator:

  1. ShuffledRDD will produce a BlockStoreShuffleReader, which in turn
  2. will instantiate a ShuffleBlockFetcherIterator, which in turn
  3. will fetch blocks asynchronously (sending out FetchRequests) and collect the resulting blocks up to a predefined threshold.

As such, there's no point in us implementing the same queueing mechanism at our level, given that Hudi will already be iterating over records that are cached in memory.
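For reference, the pattern described above can be sketched as follows. This is a heavily simplified, hypothetical model of what Spark does (the real ShuffleBlockFetcherIterator caps in-flight data by bytes, via the `spark.reducer.maxSizeInFlight` budget, not by request count as here):

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical model of an iterator that sends out fetch requests ahead of
// the consumer, keeping at most maxInFlight requests outstanding.
class PrefetchingIterator<K, V> implements Iterator<V> {
    private final Iterator<K> blockIds;
    private final Function<K, V> fetch;
    private final ExecutorService pool = Executors.newFixedThreadPool(2);
    private final BlockingQueue<Future<V>> inFlight;

    PrefetchingIterator(List<K> ids, Function<K, V> fetch, int maxInFlight) {
        this.blockIds = ids.iterator();
        this.fetch = fetch;
        this.inFlight = new ArrayBlockingQueue<>(maxInFlight);
        fill(); // send out the initial batch of fetch requests
    }

    // Issue new async fetches until the in-flight cap is reached.
    private void fill() {
        while (blockIds.hasNext() && inFlight.remainingCapacity() > 0) {
            K id = blockIds.next();
            inFlight.offer(pool.submit(() -> fetch.apply(id)));
        }
    }

    @Override public boolean hasNext() {
        boolean more = !inFlight.isEmpty();
        if (!more) pool.shutdown();
        return more;
    }

    @Override public V next() {
        try {
            V v = inFlight.poll().get(); // block until this fetch completes
            fill();                      // keep the pipeline full
            return v;
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }
}
```

Since the shuffle layer already overlaps fetching with consumption like this, stacking another bounded queue on top in Hudi only adds hand-off cost.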

@alexeykudinkin
Contributor Author

It turned out that my previous run was actually not using the Disruptor at all: I had to put up #7188 to make sure the Disruptor could be used when going through merging as well.

After fixing that and actually using the Disruptor, performance became even worse:

#
# Disruptor (master)
# Performance Improvement (from baseline): -2%
#

==================================================
Total time taken by all rounds (hudi): 5154329
Per round: List(182233, 493908, 582993, 569302, 560117, 583116, 530124, 552918, 583238, 516380)
==================================================

@zhangyue19921010
Contributor

zhangyue19921010 commented Nov 12, 2022

Hey @alexeykudinkin, thanks a lot for this PR!

I'd like to discuss some details if you don't mind :)

In our experience, the relative performance of the Disruptor, BIMQ, and no queue at all differs across scenarios. For example, when the schema is pretty simple and the bottleneck is lock contention, the Disruptor can be helpful.

Likewise, the tests in PR #7174 (although only a local benchmark, it is reproducible evidence) show that different queues do have different advantages.

So should we just remove the Executor design entirely, OR implement different executors, including this one with no inner message queue? The reasons I think we should keep the executor design are:

  1. We can support both the multi-writer model (e.g. RealtimeUnmergedRecordReader) and the single-writer/single-reader model.
  2. A unified core writing logic will be much easier to leverage on the Flink side and for other engines in the future.
  3. Other writing pipelines, such as writing to Hudi using the Java API directly, may still need this producer/consumer model.
  4. We can build more on top of this Executor, such as monitoring or rate limiting.

Also, this no-inner-queue variant is my favorite, because it needs no extra memory or CPU resources and is quite simple. Although it cannot support multiple writers, maybe we can make it the default instead of BoundedInMemoryQueue as a next step.
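Point 1 is the key structural difference: a shared bounded queue lets several producer threads feed a single consumer, a shape a plain queue-less iterator cannot express. A minimal sketch of that multi-writer shape (hypothetical code, not Hudi's actual classes):

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch: N producer threads feed one consumer through a shared
// bounded queue, the shape that flows like RealtimeUnmergedRecordReader need.
class MultiProducerSketch {
    static <T> void run(List<List<T>> producerInputs, Consumer<T> consumer, int capacity) {
        BlockingQueue<T> queue = new ArrayBlockingQueue<>(capacity);
        CountDownLatch done = new CountDownLatch(producerInputs.size());
        for (List<T> input : producerInputs) {
            new Thread(() -> {
                try {
                    for (T t : input) queue.put(t); // blocks when the queue is full
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            }).start();
        }
        try {
            // Drain until every producer has finished and the queue is empty.
            while (done.getCount() > 0 || !queue.isEmpty()) {
                T t = queue.poll(10, TimeUnit.MILLISECONDS);
                if (t != null) consumer.accept(t);
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}
```

A queue-less executor collapses this to a single thread, which is why it cannot cover the multi-writer case.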

Looking forward to your reply. Thanks!

@alexeykudinkin
Contributor Author

Hey, @zhangyue19921010!

Yeah, this PR was put up purely for experimental purposes, even before we finalized the previous PR landing the Disruptor (which considerably improved the APIs!), so I was primarily looking to put up a quick solution I could benchmark against and validate.

In terms of taking this forward, I have the following thoughts:

  • We should try to preserve the existing queue implementations, so that users can experiment in their environment and pick whatever works best for them.
  • This variability, of course, should not come at the expense of performance.
  • We should maybe take another holistic look at the executor APIs and see if we can simplify them even further (by, say, eliminating what we don't really need).

Our goal would be to fit such a SimpleExecutor into the existing framework in a way that allows all of the aforementioned points to hold true.
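As a rough sketch of what that could look like (names are illustrative, not Hudi's actual API), a queue-less executor can slot behind the same generic interface as the queued ones, running the transform and the consumer on the calling thread with no intermediate buffering:

```java
import java.util.Iterator;
import java.util.function.Function;

// Hypothetical executor interface the queued and queue-less variants could share.
interface SketchExecutor<R> {
    R execute();
}

// Hypothetical queue-less executor: applies the transform and feeds the
// consumer in place, with no extra threads or buffering.
class SimpleExecutorSketch<I, O, R> implements SketchExecutor<R> {

    interface RecordConsumer<O, R> {
        void consume(O record); // handle one transformed record
        R finish();             // produce the final result after the last record
    }

    private final Iterator<I> source;
    private final Function<I, O> transform;
    private final RecordConsumer<O, R> consumer;

    SimpleExecutorSketch(Iterator<I> source, Function<I, O> transform,
                         RecordConsumer<O, R> consumer) {
        this.source = source;
        this.transform = transform;
        this.consumer = consumer;
    }

    @Override public R execute() {
        while (source.hasNext()) consumer.consume(transform.apply(source.next()));
        return consumer.finish();
    }
}
```

Because the bounded-queue and Disruptor variants can implement the same interface, callers keep the choice of executor without the direct path paying any queueing cost.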

P.S. What actually surprised me was that I was NOT able to reproduce, on our benchmarks, the 20% performance increase for the Disruptor that you folks were seeing in your environment. The first issue was due to #7188, but even after addressing it I still don't see an improvement.

@zhangyue19921010
Contributor

zhangyue19921010 commented Nov 15, 2022

Hey @alexeykudinkin! Thanks for your response!

Yeah, this PR was put up purely for experimental purposes, even before we finalized the previous PR landing the Disruptor (which considerably improved the APIs!), so I was primarily looking to put up a quick solution I could benchmark against and validate.

In terms of taking this forward, I have the following thoughts:

  • We should try to preserve the existing queue implementations, so that users can experiment in their environment and pick whatever works best for them.
  • This variability, of course, should not come at the expense of performance.
  • We should maybe take another holistic look at the executor APIs and see if we can simplify them even further (by, say, eliminating what we don't really need).

Our goal would be to fit such a SimpleExecutor into the existing framework in a way that allows all of the aforementioned points to hold true.

Totally agree with you! Let's make it happen. I will tune #7174, the SimpleExecutor PR, to simplify the APIs even further and remove what we don't need.

P.S. What actually surprised me was that I was NOT able to reproduce, on our benchmarks, the 20% performance increase for the Disruptor that you folks were seeing in your environment. The first issue was due to #7188, but even after addressing it I still don't see an improvement.

Would you mind sharing more info about your test? For example, the number of records, CPU/memory resources, and maybe the schema, if you want.

From our experience, we have two kinds of Spark Streaming ingestion jobs:

  1. For aggregated tables, which have no unique hoodie keys, we use insert or bulk_insert + clustering for ingestion. Our performance test is based on this case.
  2. For raw data, we use the upsert operation. Adopting the Disruptor there is still ongoing.

More details on our performance test:
Hudi version: 0.10.1
Spark version: 3.0.2 (Spark Streaming)
Records per batch (max): 754932000
Schema: 18 columns

{
  "type": "record",
  "name": "f_order_sa_xxx_hourly_hudi",
  "namespace": "tv.xxxx.schemas",
  "fields": [
    {"name": "timestamp", "type": "long", "doc": "category=timestamp"},
    {"name": "network_id", "type": ["null", "long"], "default": null, "doc": "category=dimension"},
    {"name": "xx_id", "type": ["null", "long"], "default": null, "doc": "category=dimension"},
    {"name": "xx_id", "type": ["null", "long"], "default": null, "doc": "category=dimension"},
    {"name": "xxx_id", "type": ["null", "long"], "default": null, "doc": "category=dimension"},
    {"name": "xxx_id", "type": ["null", "int"], "default": null, "doc": "category=dimension"},
    {"name": "xxx_id", "type": ["null", "int"], "default": null, "doc": "category=dimension"},
    {"name": "xxxx_visibility", "type": ["null", "string"], "default": null, "doc": "category=dimension"},
    {"name": "xxx_owner_id", "type": ["null", "int"], "default": null, "doc": "category=dimension"},
    {"name": "sxxx_endpoint_id", "type": ["null", "int"], "default": null, "doc": "category=dimension"},
    {"name": "xxxx_order_id", "type": ["null", "long"], "default": null, "doc": "category=dimension"},
    {"name": "xxx_source", "type": ["null", "int"], "default": null, "doc": "category=dimension"},
    {"name": "xxx_type", "type": ["null", "long"], "default": null, "doc": "category=dimension"},
    {"name": "content_xxx_id", "type": ["null", "long"], "default": null, "doc": "category=dimension"},
    {"name": "xxx_publisher_id", "type": ["null", "string"], "default": null, "doc": "category=dimension"},
    {"name": "xxx_order_id", "type": ["null", "long"], "default": null, "doc": "category=dimension"},
    {"name": "xxx_ad_views", "type": ["null", "long"], "default": null, "doc": "category=metric"},
    {"name": "revenue", "type": ["null", "double"], "default": null, "doc": "category=metric"}
  ]
}

Insert/bulk_insert performance benchmark between BIMQ (baseline) and the Disruptor, with the same Kafka input, resources, and configs:

BIMQ: took 7.9 min to finish writing Parquet files.

Disruptor: took 5.6 min to finish writing Parquet files.

In terms of Case 1 write performance, the Disruptor improved by about 29%, from 7.9 min to 5.6 min.

@alexeykudinkin
Contributor Author

Closing in favor of #7174

Labels

status:in-progress Work in progress
