Skip to content

Conversation

@codope
Copy link
Member

@codope codope commented Jun 23, 2021

What is the purpose of the pull request

This pull request adds async clustering support for HoodieDeltaStreamer and Spark streaming writes to Hudi table.

Brief change log

  • Async clustering is configurable.
  • Reuses the existing clustering methods in write client and timeline.
  • Hence, it has the same limitations that exist with current clustering strategy i.e. updates are rejected.

Verify this pull request

This change added tests and can be verified as follows:

  • Added unit tests in TestHoodieDeltaStreamer and TestStructuredStreaming.
  • Manually verified the change by running a job locally.

Committer checklist

  • Has a corresponding JIRA in PR title & commit

  • Commit message is descriptive of the change

  • CI is green

  • Necessary doc changes done or have another open PR

  • For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

@hudi-bot
Copy link
Collaborator

hudi-bot commented Jun 23, 2021

CI report:

Bot commands @hudi-bot supports the following commands:
  • @hudi-bot run travis re-run the last Travis build
  • @hudi-bot run azure re-run the last Azure build

@codecov-commenter
Copy link

codecov-commenter commented Jun 23, 2021

Codecov Report

Merging #3142 (7d4f981) into master (9b01d2a) will decrease coverage by 1.70%.
The diff coverage is 49.14%.

Impacted file tree graph

@@             Coverage Diff              @@
##             master    #3142      +/-   ##
============================================
- Coverage     47.67%   45.97%   -1.71%     
+ Complexity     5516     4727     -789     
============================================
  Files           929      832      -97     
  Lines         41303    37953    -3350     
  Branches       4144     3811     -333     
============================================
- Hits          19692    17449    -2243     
+ Misses        19863    18887     -976     
+ Partials       1748     1617     -131     
Flag Coverage Δ
hudicli 39.97% <ø> (ø)
hudiclient 22.89% <9.52%> (-11.68%) ⬇️
hudicommon 48.56% <0.00%> (-0.03%) ⬇️
hudiflink 60.03% <ø> (ø)
hudihadoopmr 51.29% <ø> (ø)
hudisparkdatasource 67.30% <56.66%> (-0.02%) ⬇️
hudisync 54.51% <ø> (ø)
huditimelineservice 64.07% <ø> (ø)
hudiutilities 59.26% <90.19%> (+0.69%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
.../org/apache/hudi/async/AsyncClusteringService.java 0.00% <0.00%> (ø)
...ava/org/apache/hudi/async/AsyncCompactService.java 0.00% <0.00%> (ø)
...java/org/apache/hudi/async/HoodieAsyncService.java 0.00% <0.00%> (ø)
...g/apache/hudi/client/AbstractClusteringClient.java 0.00% <0.00%> (ø)
...java/org/apache/hudi/config/HoodieWriteConfig.java 42.80% <0.00%> (-0.08%) ⬇️
...a/org/apache/hudi/common/util/ClusteringUtils.java 88.40% <0.00%> (-1.31%) ⬇️
...in/scala/org/apache/hudi/HoodieStreamingSink.scala 28.00% <36.66%> (+4.00%) ⬆️
...n/scala/org/apache/hudi/HoodieSparkSqlWriter.scala 72.03% <72.00%> (+0.76%) ⬆️
...org/apache/hudi/config/HoodieClusteringConfig.java 71.28% <75.00%> (+0.01%) ⬆️
...apache/hudi/utilities/deltastreamer/DeltaSync.java 71.56% <75.00%> (+0.42%) ⬆️
... and 110 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 9b01d2a...7d4f981. Read the comment docs.

@vinothchandar vinothchandar self-assigned this Jun 23, 2021
@nsivabalan
Copy link
Contributor

@hudi-bot run azure

@nsivabalan
Copy link
Contributor

@hudi-bot run travis

Copy link
Contributor

@nsivabalan nsivabalan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Few high level questions/comments.

  • Now we have both clustering and compaction, I see that you have added clustering related code just after compaction where ever applicable. Is the higher priority for compaction intentional? or should we have clustering followed by compaction? or does it not matter at all.
  • I came across a class named SchedulerConfGenerator. Don't we need to make any changes here for async clustering?

I am done reviewing source code. Yet to review tests.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here also we can think of something like AsyncServiceClient.
may be we can have a common method as below.

   public abstract void doAction(HoodieInstant instant) throw IOException;

same abstract class for both clustering and compaction.
Not too strong on this suggestion though. Let's see what others have to say.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also, lets try to add java docs for all public methods. I do understand that AbstractCompactor does not have java docs. Its fine. atleast for the code we write, lets try to add java docs.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same abstract class for both clustering and compaction.
Not too strong on this suggestion though. Let's see what others have to say.

@satishkotha what are your thoughts about this?

@codope
Copy link
Member Author

codope commented Jun 28, 2021

@nsivabalan Thanks for reviewing the PR. I agree with your source code comments. There is scope for reusability. I will address them and update the PR. For the high level questions, my response is as below.

  • Now we have both clustering and compaction, I see that you have added clustering related code just after compaction where ever applicable. Is the higher priority for compaction intentional? or should we have clustering followed by compaction? or does it not matter at all.

In case when both clustering and compaction are enabled then compaction will run just before clustering. The intention is that since currently compaction and clustering cannot run at the same time on the same file groups and clustering could take significant time, so let compaction thread start first. When clustering is scheduled for the filegroups under compaction it would be ignored and picked up in the subsequent run after compaction completes.

  • I came across a class named SchedulerConfGenerator. Don't we need to make any changes here for async clustering?

We will need to make changes here if we create separate job pool for clustering and assign weights for different jobs. Unlike compaction, I did not feel the need for a separate job pool for clustering. By default, each pool gets equal share of resource but within each pool, jobs run in FIFO order.

@codope codope force-pushed the async-clustering branch from 4a651f8 to 0138dad Compare June 30, 2021 13:20
@codope codope requested a review from nsivabalan June 30, 2021 13:21
@codope codope force-pushed the async-clustering branch from 0138dad to cd27818 Compare July 2, 2021 15:45
Copy link
Contributor

@nsivabalan nsivabalan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yet to review tests. Will review in a day or two.

Copy link
Contributor

@nsivabalan nsivabalan Jul 2, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

out of curiosity. how come we have maxPendingCompaction/Clustering property defined as first class(top level) config for multiTableDeltaStreamer, but don't see the properties to enable/disable them. I assume those are fetched from property file for each source. So, why not fetch these properties also from the property file? I know this is not specific to clustering, but it was how compaction was defined.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a valid point. I followed how we defined for compaction. We could have enable/disable configs as first class configs. Anything related to a particular feature/service could be passed in property file or as --hoodie-conf. This will avoid config bloating in delta streamer.

@bvaradar Was there any reason for doing it this way?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vinothchandar vinothchandar added the priority:blocker Production down; release blocker label Jul 5, 2021
@codope codope force-pushed the async-clustering branch from cd27818 to 2d1f238 Compare July 6, 2021 13:34
Copy link
Contributor

@nsivabalan nsivabalan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tests coverage:

  • Do we have tests where both compaction and clustering are async triggered? (both spark streaming and deltastreamer) ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@codope codope force-pushed the async-clustering branch 2 times, most recently from cea6548 to 890e982 Compare July 9, 2021 05:42
@codope
Copy link
Member Author

codope commented Jul 9, 2021

Tests coverage:

  • Do we have tests where both compaction and clustering are async triggered? (both spark streaming and deltastreamer) ?

Added such test in TestHoodieDeltaStreamer and TestStructuredStreaming.

@codope
Copy link
Member Author

codope commented Jul 9, 2021

@hudi-bot run azure

@codope codope force-pushed the async-clustering branch from 890e982 to f86f50e Compare July 9, 2021 17:04
@codope
Copy link
Member Author

codope commented Jul 9, 2021

@hudi-bot run azure

…aming

Integrate async clustering service with HoodieDeltaStreamer and HoodieStreamingSink

Added methods in HoodieAsyncService to reuse code

Move some common methods to HoodieAsyncService

Add tests for clustering with compaction

Rename methods appropriately

Disable inline clustering

Fix CI failures
@codope codope force-pushed the async-clustering branch from f86f50e to 7d4f981 Compare July 11, 2021 16:11
@nsivabalan nsivabalan merged commit 5804ad8 into apache:master Jul 11, 2021
@zhangyue19921010
Copy link
Contributor

zhangyue19921010 commented Jul 12, 2021

Hi @codope Just want to know, is this Async clustering function can handle the following scenarios and losing no data:

There are 3 small file groups named fg1, fg2 and fg3 contained file slice1, file slice2 and file slices3 separately.

When async schedule start to make a cluster plan but not finished, there is an inflight or requested commit for fg1 which will create file slice 11 based on file slice1. In other words file slice11 is creating but not committed ---> I believe this scene is similar to multi writers.

What does this async clustering function will do?
Will this clustering plan contains file slice1? if contained, I think the new data in file slice11 will be lost.

Looking forward to your reply, thanks a lot.

@codope
Copy link
Member Author

codope commented Jul 13, 2021

Hi @codope Just want to know, is this Async clustering function can handle the following scenarios and losing no data:

There are 3 small file groups named fg1, fg2 and fg3 contained file slice1, file slice2 and file slices3 separately.

When async schedule start to make a cluster plan but not finished, there is an inflight or requested commit for fg1 which will create file slice 11 based on file slice1. In other words file slice11 is creating but not committed ---> I believe this scene is similar to multi writers.

What does this async clustering function will do?
Will this clustering plan contains file slice1? if contained, I think the new data in file slice11 will be lost.

Looking forward to your reply, thanks a lot.

@zhangyue19921010 It will depend on what point of time during clustering planning file slice11 is created. If it is before the ClusteringPlanStrategy#getFileSlicesEligibleForClustering is invoked then clustering plan will not contain file slice1. So, just like multi writers there is a race condition here. However, while actually clustering, the default (and currently only) strategy is to reject updates. So, it will throw exception after seeing that there is an a filegroup with update (in this case fg1). This should get picked up in the next run of clustering.

@zhangyue19921010
Copy link
Contributor

Hi @codope Thanks for your response. What if after ClusteringPlanStrategy#getFileSlicesEligibleForClustering file slice11 were created which means file slice1 was picked up for clustering. After this plan execute, maybe new data in slice11 was lost.

Samrat002 pushed a commit to Samrat002/hudi that referenced this pull request Jul 15, 2021
change the insret overwrte return type

[HUDI-1860] Test wrapper for insert_overwrite and insert_overwrite_table

[HUDI-2084] Resend the uncommitted write metadata when start up (apache#3168)

Co-authored-by: 喻兆靖 <[email protected]>

[HUDI-2081] Move schema util tests out from TestHiveSyncTool (apache#3166)

[HUDI-2094] Supports hive style partitioning for flink writer (apache#3178)

[HUDI-2097] Fix Flink unable to read commit metadata error (apache#3180)

[HUDI-2085] Support specify compaction paralleism and compaction target io for flink batch compaction (apache#3169)

[HUDI-2092] Fix NPE caused by FlinkStreamerConfig#writePartitionUrlEncode null value (apache#3176)

[HUDI-2006] Adding more yaml templates to test suite (apache#3073)

[HUDI-2103] Add rebalance before index bootstrap (apache#3185)

Co-authored-by: 喻兆靖 <[email protected]>

[HUDI-1944] Support Hudi to read from committed offset (apache#3175)

* [HUDI-1944] Support Hudi to read from committed offset

* [HUDI-1944] Adding group option to KafkaResetOffsetStrategies

* [HUDI-1944] Update Exception msg

[HUDI-2052] Support load logFile in BootstrapFunction (apache#3134)

Co-authored-by: 喻兆靖 <[email protected]>

[HUDI-89] Add configOption & refactor all configs based on that (apache#2833)

Co-authored-by: Wenning Ding <[email protected]>

[MINOR] Update .asf.yaml to codify notification settings, turn on jira comments, gh discussions (apache#3164)

- Turn on comment for jira, so we can track PR activity better
- Create a notification settings that match https://gitbox.apache.org/schemes.cgi?hudi
- Try and turn on "discussions" on Github, to experiment

[MINOR] Fix broken build due to FlinkOptions (apache#3198)

[HUDI-2088] Missing Partition Fields And PreCombineField In Hoodie Properties For Table Written By Flink (apache#3171)

[MINOR] Add Documentation to KEYGENERATOR_TYPE_PROP (apache#3196)

[HUDI-2105] Compaction Failed For MergeInto MOR Table (apache#3190)

[HUDI-2051] Enable Hive Sync When Spark Enable Hive Meta  For Spark Sql (apache#3126)

[HUDI-2112] Support reading pure logs file group for flink batch reader after compaction (apache#3202)

[HUDI-2114] Spark Query MOR Table Written By Flink Return Incorrect Timestamp Value (apache#3208)

[HUDI-2121] Add operator uid for flink stateful operators (apache#3212)

[HUDI-2123]  Exception When Merge With Null-Value Field (apache#3214)

[HUDI-2124] A Grafana dashboard for HUDI. (apache#3216)

[HUDI-2057]  CTAS Generate An External Table When Create Managed Table (apache#3146)

[HUDI-1930] Bootstrap support configure KeyGenerator by type (apache#3170)

* [HUDI-1930] Bootstrap support configure KeyGenerator by type

[HUDI-2116] Support batch synchronization of partition datas to  hive metastore to avoid oom problem (apache#3209)

[HUDI-2126] The coordinator send events to write function when there are no data for the checkpoint (apache#3219)

[HUDI-2127] Initialize the maxMemorySizeInBytes in log scanner (apache#3220)

[HUDI-2058]support incremental query for insert_overwrite_table/insert_overwrite operation on cow table (apache#3139)

[HUDI-2129] StreamerUtil.medianInstantTime should return a valid date time string (apache#3221)

[HUDI-2131] Exception Throw Out When MergeInto With Decimal Type Field (apache#3224)

[HUDI-2122] Improvement in packaging insert into smallfiles (apache#3213)

[HUDI-2132] Make coordinator events as POJO for efficient serialization (apache#3223)

[HUDI-2106] Fix flink batch compaction bug while user don't set compaction tasks (apache#3192)

[HUDI-2133] Support hive1 metadata sync for flink writer (apache#3225)

[HUDI-2089]fix the bug that metatable cannot support non_partition table (apache#3182)

[HUDI-2028] Implement RockDbBasedMap as an alternate to DiskBasedMap in ExternalSpillableMap (apache#3194)

Co-authored-by: Rajesh Mahindra <[email protected]>

[HUDI-2135] Add compaction schedule option for flink (apache#3226)

[HUDI-2055] Added deltastreamer metric for time of lastSync (apache#3129)

[HUDI-2046] Loaded too many classes like sun/reflect/GeneratedSerializationConstructorAccessor in JVM metaspace (apache#3121)

Loaded too many classes when use kryo of spark to hudi

Co-authored-by: weiwei.duan <[email protected]>

[HUDI-1996] Adding functionality to allow the providing of basic auth creds for confluent cloud schema registry (apache#3097)

* adding support for basic auth with confluent cloud schema registry

[HUDI-2093] Fix empty avro schema path caused by duplicate parameters (apache#3177)

* [HUDI-2093] Fix empty avro schema path caused by duplicate parameters

* rename shcmea option key

* fix doc

* rename var name

[HUDI-2113] Fix integration testing failure caused by sql results out of order (apache#3204)

[HUDI-2016] Fixed bootstrap of Metadata Table when some actions are in progress. (apache#3083)

Metadata Table cannot be bootstrapped when any action is in progress. This is detected by the presence of inflight or requested instants. The bootstrapping is initiated in preWrite and postWrite of each commit. So bootstrapping will be retried again until it succeeds.
Also added metrics for when the bootstrapping fails or a table is re-bootstrapped. This will help detect tables which are not getting bootstrapped.

[HUDI-2140] Fixed the unit test TestHoodieBackedMetadata.testOnlyValidPartitionsAdded. (apache#3234)

[HUDI-2115] FileSlices in the filegroup is not descending by timestamp (apache#3206)

[HUDI-1104] Adding support for UserDefinedPartitioners and SortModes to BulkInsert with Rows (apache#3149)

[HUDI-2069] Refactored String constants (apache#3172)

[HUDI-1105] Adding dedup support for Bulk Insert w/ Rows (apache#2206)

[HUDI-2134]Add generics to avoif forced conversion in BaseSparkCommitActionExecutor#partition (apache#3232)

[HUDI-2009] Fixing extra commit metadata in row writer path (apache#3075)

[HUDI-2099]hive lock which state is WATING should be released, otherwise this hive lock will be locked forever (apache#3186)

[MINOR] Fix build broken from apache#3186 (apache#3245)

[HUDI-2136] Fix conflict when flink-sql-connector-hive and hudi-flink-bundle are both in flink lib (apache#3227)

[HUDI-2087] Support Append only in Flink stream (apache#3174)

Co-authored-by: 喻兆靖 <[email protected]>

UnitTest for deltaSync

Removing cosmetic changes and reuse function for insert_overwrite_table

unit test

intial unit test for the insert_overwrite and insert_over_write_table

Adding failed test code for insert_overwrite

Revert "[HUDI-2087] Support Append only in Flink stream (apache#3174)" (apache#3251)

This reverts commit 3715267.

[HUDI-2147] Remove unused class AvroConvertor in hudi-flink (apache#3243)

[MINOR] Fix some wrong assert reasons (apache#3248)

[HUDI-2087] Support Append only in Flink stream (apache#3252)

Co-authored-by: 喻兆靖 <[email protected]>

[HUDI-2143] Tweak the default compaction target IO to 500GB when flink async compaction is off (apache#3238)

[HUDI-2142] Support setting bucket assign parallelism for flink write task (apache#3239)

[HUDI-1483] Support async clustering for deltastreamer and Spark streaming (apache#3142)

- Integrate async clustering service with HoodieDeltaStreamer and HoodieStreamingSink
- Added methods in HoodieAsyncService to reuse code

[HUDI-2107] Support Read Log Only MOR Table For Spark (apache#3193)

[HUDI-2144]Bug-Fix:Offline clustering(HoodieClusteringJob) will cause insert action losing data (apache#3240)

* fixed

* add testUpsertPartitionerWithSmallFileHandlingAndClusteringPlan ut

* fix CheckStyle

Co-authored-by: yuezhang <[email protected]>

[MINOR] Fix EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION config (apache#3250)

[HUDI-2168] Fix for AccessControlException for anonymous user (apache#3264)

[HUDI-2045] Support Read Hoodie As DataSource Table For Flink And DeltaStreamer

test with insert-overwrite and insert-overwrite-table

removing hardcoded action to pass the unit test

[HUDI-1969] Support reading logs for MOR Hive rt table (apache#3033)

[HUDI-2171] Add parallelism conf for bootstrap operator

using delta-commit for insert_overwrite
ghost pushed a commit to shivagowda/hudi that referenced this pull request Jul 16, 2021
…aming (apache#3142)

- Integrate async clustering service with HoodieDeltaStreamer and HoodieStreamingSink
- Added methods in HoodieAsyncService to reuse code
ghost pushed a commit to shivagowda/hudi that referenced this pull request Aug 1, 2021
…aming (apache#3142)

- Integrate async clustering service with HoodieDeltaStreamer and HoodieStreamingSink
- Added methods in HoodieAsyncService to reuse code
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

priority:blocker Production down; release blocker

Projects

None yet

Development

Successfully merging this pull request may close these issues.

7 participants