
Conversation

@nsivabalan
Contributor

@nsivabalan nsivabalan commented Oct 11, 2020

What is the purpose of the pull request

  • Adding support for validating the entire dataset and for long-running tests in the test suite framework. The entire declared dag can be run repeatedly for N rounds. Two config params can be set for repeated test runs, namely --num-rounds and --delay-between-rounds-mins. So, let's say your dag looks like this:
insert 1000
update 200
validate

and the configs are set as follows.
--num-rounds = 5
--delay-between-rounds-mins = 10

This will run the entire dag (insert, update, validate) 5 times, with a 10-minute delay between rounds.
Since the number of expected records can't be statically defined for validation, a new validation node is introduced.
ValidateDatasetNode: reads the entire input and compares it with the hudi table contents (row equality comparison).
No config needs to be set by default, but there is an optional config called "delete_input_data". If set, the input data is deleted after validation completes. This comes in handy when running long-running tests.
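To make the row-equality idea concrete, here is a minimal, hypothetical sketch of such a check using the Spark Java API (the paths, the "avro" input format, and the dropped metadata columns are assumptions for illustration; this is not the actual ValidateDatasetNode code):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class RowEqualityValidationSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("validate-sketch").getOrCreate();

    // Hypothetical locations; the test suite derives these from its own configs.
    Dataset<Row> inputDf = spark.read().format("avro").load("/tmp/test-suite/input/*/*");
    Dataset<Row> hudiDf = spark.read().format("org.apache.hudi").load("/tmp/test-suite/output/*/*/*");

    // Drop hudi metadata columns so both sides have the same shape before comparing.
    Dataset<Row> trimmedHudiDf = hudiDf
        .drop("_hoodie_commit_time")
        .drop("_hoodie_commit_seqno")
        .drop("_hoodie_record_key")
        .drop("_hoodie_partition_path")
        .drop("_hoodie_file_name");

    // Row equality in both directions: every input row must be in hudi, and nothing extra.
    long missingInHudi = inputDf.except(trimmedHudiDf).count();
    long unexpectedInHudi = trimmedHudiDf.except(inputDf).count();
    if (missingInHudi != 0 || unexpectedInHudi != 0) {
      throw new AssertionError("Dataset validation failed: missing=" + missingInHudi
          + ", unexpected=" + unexpectedInHudi);
    }
    spark.stop();
  }
}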

An example for validation w/ delete_input_data set.

Round1: 
    insert => inputPath/batch1
    upsert -> inputPath/batch2
    Validate with delete_input_data = true
              Validates contents from batch1 and batch2 are in hudi and ensures Row equality
              Since "delete_input_data" is set, deletes contents from batch1 and batch2.
Round2:    
    insert => inputPath/batch3
    upsert -> inputPath/batch4
    Validate with delete_input_data = true
              Validates contents from batch3 and batch4 are in hudi and ensures Row equality
              Since "delete_input_data" is set, deletes contents from batch3 and batch4.
Round3:    
    insert => inputPath/batch5
    upsert -> inputPath/batch6
    Validate with delete_input_data = true
              Validates contents from batch5 and batch6 are in hudi and ensures Row equality
              Since "delete_input_data" is set, deletes contents from batch5 and batch6.   
.
.

Deleting the input after each round keeps the input size under control when running a long-running job along with validation.

An example for validation w/o any delete flag set.

Round1: 
    insert => inputPath/batch1
    upsert -> inputPath/batch2
    Validate: validates contents from batch1 and batch2 are in hudi and ensures Row equality
Round2:    
    insert => inputPath/batch3
    upsert -> inputPath/batch4
    Validate: validates contents from batch1 to batch4 are in hudi and ensures Row equality
Round3:    
    insert => inputPath/batch5
    upsert -> inputPath/batch6
    Validate: validates contents from batch1 to batch6 are in hudi and ensures Row equality
.
.

You could also have validations in the middle of your dag without setting "delete_input_data", and set it only in the
last validate node of the dag so that data is cleaned up at the end of every round.

Round1: 
    insert => inputPath/batch1
    upsert -> inputPath/batch2
    Validate: validates contents from batch1 and batch2 are in hudi and ensures Row equality
    insert => inputPath/batch3
    upsert -> inputPath/batch4
    Validate with delete_input_data = true
             Validates contents from batch1 to batch4 are in hudi and ensures Row equality
             since "delete_input_data" is set to true, this node deletes contents from batch1 and batch4.
Round2: 
    insert => inputPath/batch5
    upsert -> inputPath/batch6
    Validate: validates contents from batch5 and batch6 are in hudi and ensures Row equality
    insert => inputPath/batch7
    upsert -> inputPath/batch8
    Validate with delete_input_data = true
             Validates contents from batch5 to batch8 are in hudi and ensures Row equality
             Since "delete_input_data" is set to true, this node deletes contents from batch5 to batch8.
Round3: 
    insert => inputPath/batch9
    upsert -> inputPath/batch10
    Validate: validates contents from batch9 and batch10 are in hudi and ensures Row equality
    insert => inputPath/batch11
    upsert -> inputPath/batch12
     Validate with delete_input_data = true
             Validates contents from batch9 to batch12 are in hudi and ensures Row equality
             Set "delete_input_data" to true. so this node deletes contents from batch9 to batch12. 
.
.

More info can be found here: https://docs.google.com/document/d/1LrYx_WwaimWyd3gfAg2rrQQsTH3wPkqkbp3_-PpmgEo/edit?usp=sharing

  • All existing dags had a misspelled num_partitions_insert/upsert config. Fixed it.
  • In all sample dags and configs, "timestamp" is used as the partition path, but there was no field for source ordering. In fact, the same "timestamp" field was set as the source ordering field. For updates, we need a different field as the source ordering field, so I have introduced a new field called "ts" in source.avsc and fixed the config values appropriately.
  • Added some demo dag yamls for understanding ValidateDatasetNodes (cow-per-round-validate-once.yaml, cow-validate-cumulative-multiple-rounds.yaml, cow-per-round-mixed-validate.yaml).
  • Added a preCombine field (set to "ts") in configs, to be used while generating insert/update records. The test suite sets this field's value to the batchId. Since this field is used as the source ordering field, we leverage the batchId as a monotonically increasing value, so that it is easy to reason about updates vs inserts (rather than generating random values).
  • Fixed TestSuitePathSelector, which was comparing input directories as strings rather than integers; as a result, batch10 wasn't getting returned by the path selector after batch9 (see the sketch below).
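For context on the ordering bug, a small standalone Java illustration (the directory names mirror the test suite's batch folders; the comparator is only an example, not the actual TestSuitePathSelector code):

import java.util.Arrays;
import java.util.Comparator;

public class BatchOrderingSketch {
  public static void main(String[] args) {
    String[] batches = {"batch9", "batch10", "batch11"};

    // Lexicographic (string) ordering: batch10 and batch11 sort BEFORE batch9,
    // so a "greater than the last processed batch" check misses them after batch9.
    String[] asStrings = batches.clone();
    Arrays.sort(asStrings);
    System.out.println(Arrays.toString(asStrings)); // [batch10, batch11, batch9]

    // Numeric ordering on the batch suffix behaves as expected.
    String[] asNumbers = batches.clone();
    Arrays.sort(asNumbers,
        Comparator.comparingInt((String b) -> Integer.parseInt(b.substring("batch".length()))));
    System.out.println(Arrays.toString(asNumbers)); // [batch9, batch10, batch11]
  }
}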

Brief change log

  • Added "ValidateDatasetNode" to compare entire input dataset to hudi content for equality.
  • Added two new configs to assist in long running test suite(--num-rounds, --delay-between-rounds-mins)
  • All existing dags had misspelled num_partitions_insert/upsert config. Have fixed it.
  • In all sample dags and config, "timestamp" is used as the partition path. But there was no field for source ordering. Infact same field "timestamp" was set as source ordering field. But for updates, we need a different field as source ordering field as we can't update the partition path. so have introduced a new field called "ts" in source.avsc and have fixed config values appropriately to be used as source ordering field.
  • Have added some demo dag yamls for understanding ValidateDatasetNodes(cow-per-round-validate-once.yaml, cow-validate-cumulative-multiple-rounds.yaml, cow-per-round-mixed-validate.yaml)
  • Added preCombine field (set to "ts") in configs to be used while generating insert/update records. test suite sets this field's value to batchId. Since this will be used as source ordering field, we leverage batchId for monotonically increasing, so that its easy to reason about updates vs inserts(rather than generating random values).
  • Fixed TestSuitePathSelector which was comparing input directors as string rather than integers. And so, after batch9, batch10 wasn't getting returned from path selector.

Verify this pull request

(Please pick either of the following options)

  • Tested locally w/ test inputs in docker setup. These are the test variants verified.
    All tests were done for both COW and MOR (real-time). Validation is done via both the spark datasource and the hive table through the spark sql engine.
    a. one round
    b. multiple rounds by validating all data so far.
    c. multiple rounds w/ validation per round

Committer checklist

  • [x] Has a corresponding JIRA in PR title & commit

  • Commit message is descriptive of the change

  • CI is green

  • [x] Necessary doc changes done or have another open PR

  • For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

@codecov-io

codecov-io commented Oct 11, 2020

Codecov Report

Merging #2168 (bc2ff51) into master (5f5c15b) will decrease coverage by 43.12%.
The diff coverage is n/a.

Impacted file tree graph

@@              Coverage Diff              @@
##             master    #2168       +/-   ##
=============================================
- Coverage     53.17%   10.05%   -43.13%     
+ Complexity     2711       48     -2663     
=============================================
  Files           346       52      -294     
  Lines         15928     1850    -14078     
  Branches       1636      223     -1413     
=============================================
- Hits           8470      186     -8284     
+ Misses         6769     1651     -5118     
+ Partials        689       13      -676     
Flag                  Coverage Δ              Complexity Δ
hudicli               ?                       ?
hudiclient            ?                       ?
hudicommon            ?                       ?
hudihadoopmr          ?                       ?
hudispark             ?                       ?
huditimelineservice   ?                       ?
hudiutilities         10.05% <ø> (-60.05%)    0.00 <ø> (ø)

Flags with carried forward coverage won't be shown.

Impacted Files Coverage Δ Complexity Δ
...hudi/utilities/schema/FilebasedSchemaProvider.java 0.00% <ø> (-82.36%) 0.00 <0.00> (-5.00)
...va/org/apache/hudi/utilities/IdentitySplitter.java 0.00% <0.00%> (-100.00%) 0.00% <0.00%> (-2.00%)
...va/org/apache/hudi/utilities/schema/SchemaSet.java 0.00% <0.00%> (-100.00%) 0.00% <0.00%> (-3.00%)
...a/org/apache/hudi/utilities/sources/RowSource.java 0.00% <0.00%> (-100.00%) 0.00% <0.00%> (-4.00%)
.../org/apache/hudi/utilities/sources/AvroSource.java 0.00% <0.00%> (-100.00%) 0.00% <0.00%> (-1.00%)
.../org/apache/hudi/utilities/sources/JsonSource.java 0.00% <0.00%> (-100.00%) 0.00% <0.00%> (-1.00%)
...rg/apache/hudi/utilities/sources/CsvDFSSource.java 0.00% <0.00%> (-100.00%) 0.00% <0.00%> (-10.00%)
...g/apache/hudi/utilities/sources/JsonDFSSource.java 0.00% <0.00%> (-100.00%) 0.00% <0.00%> (-4.00%)
...apache/hudi/utilities/sources/JsonKafkaSource.java 0.00% <0.00%> (-100.00%) 0.00% <0.00%> (-6.00%)
...pache/hudi/utilities/sources/ParquetDFSSource.java 0.00% <0.00%> (-100.00%) 0.00% <0.00%> (-5.00%)
... and 326 more

@nsivabalan
Contributor Author

@n3nash : I have completed the patch. Please review when you can.

@nsivabalan nsivabalan force-pushed the testsuite_IngestionFix branch 2 times, most recently from 9e50d66 to 33d41db Compare October 28, 2020 13:02
@nsivabalan nsivabalan force-pushed the testsuite_IngestionFix branch from 33d41db to 8feb5a0 Compare November 4, 2020 05:18
@@ -0,0 +1,59 @@
# Licensed to the Apache Software Foundation (ASF) under one
Contributor

@nsivabalan why do we need 2 identical yaml dags, one for a single round and one for multiple rounds?

WriterContext writerContext = new WriterContext(jsc, props, cfg, keyGenerator, sparkSession);
writerContext.initContext(jsc);
DagScheduler dagScheduler = new DagScheduler(workflowDag, writerContext);
DagScheduler dagScheduler = new DagScheduler(workflowDag, writerContext, jsc, cfg.numRounds, cfg.delayMins);
Contributor

@nsivabalan
Can we avoid introducing these high-level arguments, numRounds and delayMins, to the DagScheduler? This messes with a simpler design and adds unnecessary overloading to the constructor.
Instead, we could do something like this

Introduce a top level yaml change around this
dag_name:
dag_rounds:
dag_intermittent_delay:
dag_props:
dag_content:
insert_node:
...
upsert_node:
...
You can then introduce a concept called RuntimeComposableDagNode that basically wraps this concept.

Let me know.
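For context, the repeated-round behaviour under discussion boils down to a loop of this shape (a rough sketch only; the real DagScheduler API and the eventual yaml-driven design differ, and the names here are illustrative):

import java.util.concurrent.TimeUnit;

public class RepeatedDagRunSketch {

  // Runs the dag numRounds times, sleeping delayMins minutes between rounds.
  // The Runnable stands in for executing the whole declared dag once.
  static void runRounds(Runnable runDagOnce, int numRounds, long delayMins) throws InterruptedException {
    for (int round = 1; round <= numRounds; round++) {
      runDagOnce.run();
      if (round < numRounds) {
        TimeUnit.MINUTES.sleep(delayMins);
      }
    }
  }

  public static void main(String[] args) throws InterruptedException {
    // Equivalent in spirit to --num-rounds 3 --delay-between-rounds-mins 0 (0 so the demo returns quickly).
    runRounds(() -> System.out.println("insert -> upsert -> validate"), 3, 0);
  }
}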

Contributor

@n3nash n3nash left a comment

@nsivabalan Left some comments

@nsivabalan nsivabalan force-pushed the testsuite_IngestionFix branch from b6c466f to f91cb6a Compare November 21, 2020 19:34
@nsivabalan
Contributor Author

@n3nash : addressed all feedback man. Please review it when you can.

@nsivabalan nsivabalan force-pushed the testsuite_IngestionFix branch from f91cb6a to bb66e34 Compare November 21, 2020 19:59
Contributor Author

@nsivabalan nsivabalan left a comment

Added some comments for easier review and asked some clarifying questions.

Contributor Author

@nsivabalan nsivabalan Nov 21, 2020

I haven't fixed these dags for the long-running test suite yet. Once we have consensus that things are looking good, I will update all dags. Do check the unit test dags under the resources dir in the hudi-integ-test module for the updated version.

Contributor Author

As per our discussion, I have added this new field from within the code. Essentially, this field will be appended to both the source schema and the target schema and will be used for source ordering; batch_id is used as the value.
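Roughly, appending such a field to an Avro schema looks like the sketch below (the field name and default are hypothetical, not the actual names used in the test suite):

import java.util.ArrayList;
import java.util.List;
import org.apache.avro.Schema;

public class AppendOrderingFieldSketch {

  // Returns a copy of the given record schema with an extra long field appended,
  // which the test suite could then populate with the batch id for source ordering.
  static Schema appendOrderingField(Schema base, String fieldName) {
    List<Schema.Field> fields = new ArrayList<>();
    for (Schema.Field f : base.getFields()) {
      // Field objects cannot be reused across schemas, so re-create each one.
      fields.add(new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal()));
    }
    fields.add(new Schema.Field(fieldName, Schema.create(Schema.Type.LONG), "source ordering field", 0L));
    Schema extended = Schema.createRecord(base.getName(), base.getDoc(), base.getNamespace(), base.isError());
    extended.setFields(fields);
    return extended;
  }

  public static void main(String[] args) {
    Schema source = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"triprec\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"}]}");
    System.out.println(appendOrderingField(source, "_hoodie_test_ordering_field").toString(true));
  }
}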

Contributor Author

high level config param parsing.

Contributor Author

as agreed, all node contents go into "dag_content"

Contributor Author

this is where the source ordering field is set.

Contributor Author

Here is an example of how the dag looks with global params. I have tested this dag for a few rounds of validation.

Contributor Author

Since I am introducing a test schema provider to append the source ordering field, I had to make this protected, as the test schema provider extends this class.

log.debug("Listing all Micro batches to be validated :: " + fileStatus.getPath().toString());
}

String recordKeyField = context.getWriterContext().getProps().getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY());
Contributor Author

Is this the right way to fetch the record key and partition path fields?

Contributor

@nsivabalan This will just fetch the KEY names; I'm guessing you need the values for these keys?

Contributor Author

Nope, just the record key and partition path field names. We need to group by HoodieKey in the df:

inputDf.groupByKey(
        (MapFunction<Row, String>) value ->
            value.getAs(partitionPathField).toString() + "+" + value.getAs(recordKeyField).toString(),
        Encoders.STRING())

@nsivabalan
Contributor Author

nsivabalan commented Dec 1, 2020

@n3nash : the patch is ready for review. Would appreciate it if you can take some time to close this out.

And a question on DFSHoodieDatasetInputReader: I see that when we try to read fewer records than the total records in one file slice, the reader returns the entire contents of that file slice. Is that intentional or is it a bug?

For e.g.: if I insert 1500 records and the next node is an upsert of 200, this will actually update all 1500 records and not 200 (assuming all 1500 are in one file slice).

I ran into this issue while testing out deletes with this patch. I thought there was some issue w/ deletes, since I saw all records were getting deleted ;) and after investigation, found it's the reader that's doing this.

if (!numFiles.isPresent() || numFiles.get() == 0) {
      // If num files are not passed, find the number of files to update based on total records to update and records
      // per file
      numFilesToUpdate = (int) Math.ceil((double) numRecordsToUpdate.get() / recordsInSingleFile);
      // recordsInSingleFile is not average so we still need to account for bias is records distribution
      // in the files. Limit to the maximum number of files available.
      int totalExistingFilesCount = partitionToFileIdCountMap.values().stream().reduce((a, b) -> a + b).get();
      numFilesToUpdate = Math.min(numFilesToUpdate, totalExistingFilesCount);
      log.warn("aaa Files to update {}, numRecords toUpdate {}, records in Single file {} ", numFilesToUpdate, numRecordsToUpdate, recordsInSingleFile);
      numRecordsToUpdatePerFile = recordsInSingleFile;
    }

@nsivabalan
Contributor Author

@n3nash : gentle ping.

@n3nash
Contributor

n3nash commented Dec 8, 2020

@nsivabalan The use-case you described seems to be intentional but the behavior is not correct. If the number of records to update is explicitly asked for by the dag, then Option<Long> numRecordsToUpdate should be set. In this case, the code path assumes it's not set. Maybe there's a bug in setting that variable?
In general, I left 2 comments; after they are addressed, we can merge this.

@nsivabalan
Contributor Author

nsivabalan commented Dec 10, 2020

@nsivabalan The use-case you described seems to be intentional but the behavior is not correct. If the number of records to update is explicitly asked for by the dag, then Option<Long> numRecordsToUpdate should be set. In this case, the code path assumes it's not set. Maybe there's a bug in setting that variable?
In general, I left 2 comments; after they are addressed, we can merge this.

I don't think so. This code path assumes numFiles is not set, not numRecordsToUpdate. Check my comment in the code snippet below.

if (!numFiles.isPresent() || numFiles.get() == 0) {
      numFilesToUpdate = (int) Math.ceil((double) numRecordsToUpdate.get() / recordsInSingleFile);
      int totalExistingFilesCount = partitionToFileIdCountMap.values().stream().reduce((a, b) -> a + b).get();
      numFilesToUpdate = Math.min(numFilesToUpdate, totalExistingFilesCount);
      numRecordsToUpdatePerFile = recordsInSingleFile; // this line ignores the numRecordsToUpdate passed in and sets to total records in one single file slice. 
    }

@nsivabalan
Contributor Author

@n3nash : also, if you are ok with it, I can go ahead and change all dags to this new format.

@n3nash
Contributor

n3nash commented Dec 14, 2020

@nsivabalan Yes, please go ahead and change the DAGs to the new format. If there is an easy way to make sure DagUtils keeps the new and old dag formats backwards compatible (by providing defaults), that would be great.

For the following code, see my comment

if (!numFiles.isPresent() || numFiles.get() == 0) {
numFilesToUpdate = (int) Math.ceil((double) numRecordsToUpdate.get() / recordsInSingleFile); // Nishith - This line ensures we are updating only the number of files that we should based on this math. I think the bug will happen when there is only 1 file to update (numRecordsToUpdate < recordsInSingleFile). We should add a check if that's the case and update numRecordsToUpdatePerFile to numRecordsToUpdate in that case.

  int totalExistingFilesCount = partitionToFileIdCountMap.values().stream().reduce((a, b) -> a + b).get();
  numFilesToUpdate = Math.min(numFilesToUpdate, totalExistingFilesCount);
  numRecordsToUpdatePerFile = recordsInSingleFile; // this line ignores the numRecordsToUpdate passed in and sets to total records in one single file slice. 
}
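A standalone sketch of the per-file sizing rule being discussed (the actual fix landed in DFSHoodieDatasetInputReader; the names and the demo values below are only illustrative):

public class UpdateSizingSketch {

  // Returns how many records to update per file, honoring the requested total when it is
  // smaller than a single file slice (the case discussed above).
  static long recordsToUpdatePerFile(long numRecordsToUpdate, long recordsInSingleFile) {
    return Math.min(numRecordsToUpdate, recordsInSingleFile);
  }

  public static void main(String[] args) {
    // 200 requested updates against a 1500-record file slice should touch 200 records, not 1500.
    System.out.println(recordsToUpdatePerFile(200, 1500));  // 200
    System.out.println(recordsToUpdatePerFile(4000, 1500)); // 1500 per file, spread across multiple files
  }
}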

@nsivabalan nsivabalan force-pushed the testsuite_IngestionFix branch from e6f76b0 to bc2ff51 Compare December 18, 2020 07:02
@nsivabalan
Contributor Author

@n3nash : fixed the bug as discussed. You can check it out. Wrt backwards compatibility, it's just 3 additional configs that one needs to set, man. No other changes required. Not sure if we really need to make it backwards compatible. Should be easy to figure out.

@nsivabalan
Contributor Author

@n3nash : can you please review when you get time?

@nsivabalan nsivabalan added the priority:blocker Production down; release blocker label Dec 26, 2020
@n3nash n3nash self-requested a review December 26, 2020 17:26
Contributor

@n3nash n3nash left a comment

LGTM

@n3nash n3nash merged commit 8cf6a72 into apache:master Dec 26, 2020
nbalajee pushed a commit to nbalajee/incubator-hudi that referenced this pull request Dec 28, 2020
…ning tests in test suite framework (apache#2168)

* trigger rebuild

* [HUDI-1156] Remove unused dependencies from HoodieDeltaStreamerWrapper Class (apache#1927)

* Adding support for validating records and long running tests in test sutie framework

* Adding partial validate node

* Fixing spark session initiation in Validate nodes

* Fixing validation

* Adding hive table validation to ValidateDatasetNode

* Rebasing with latest commits from master

* Addressing feedback

* Addressing comments

Co-authored-by: lamber-ken <[email protected]>
Co-authored-by: linshan-ma <[email protected]>
prashantwason pushed a commit to prashantwason/incubator-hudi that referenced this pull request Feb 22, 2021
…ning tests in test suite framework (apache#2168)

* trigger rebuild

* [HUDI-1156] Remove unused dependencies from HoodieDeltaStreamerWrapper Class (apache#1927)

* Adding support for validating records and long running tests in test sutie framework

* Adding partial validate node

* Fixing spark session initiation in Validate nodes

* Fixing validation

* Adding hive table validation to ValidateDatasetNode

* Rebasing with latest commits from master

* Addressing feedback

* Addressing comments

Co-authored-by: lamber-ken <[email protected]>
Co-authored-by: linshan-ma <[email protected]>
prashantwason pushed a commit to prashantwason/incubator-hudi that referenced this pull request Feb 22, 2021
…pache#2185)

Summary:
[MINOR] Make sure factory method is used to instanciate DFSPathSelector (apache#2187)

* Move createSourceSelector into DFSPathSelector factory method
* Replace constructor call with factory method
* Added some javadoc

[HUDI-1330] handle prefix filtering at directory level (apache#2157)

The current DFSPathSelector only ignore prefix(_, .) at the file level while files under subdirectories
e.g. (.checkpoint/*) are still considered which result in bad-format exception during reading.

[HUDI-1200] fixed NPE in CustomKeyGenerator (apache#2093)

- config field is no longer transient in key generator
- verified that the key generator object is shipped from the driver to executors, just the one time and reused for each record

[HUDI-1209] Properties File must be optional when running deltastreamer (apache#2085)

[MINOR] Fix caller to SparkBulkInsertCommitActionExecutor (apache#2195)

Fixed calling the wrong constructor

[HUDI-1326] Added an API to force publish metrics and flush them. (apache#2152)

* [HUDI-1326] Added an API to force publish metrics and flush them.

Using the added API, publish metrics after each level of the DAG completed in hudi-test-suite.

* Code cleanups

Co-authored-by: Vinoth Chandar <[email protected]>

[HUDI-1118] Cleanup rollback files residing in .hoodie folder (apache#2205)

[MINOR] Private the NoArgsConstructor of SparkMergeHelper and code clean (apache#2194)

1. Fix merge on read DAG to make docker demo pass (apache#2092)

1. Fix merge on read DAG to make docker demo pass (apache#2092)
2. Fix repeat_count, rollback node

[HUDI-1274] Make hive synchronization supports hourly partition (apache#2122)

[HUDI-1351] Improvements to the hudi test suite for scalability and repeated testing. (apache#2197)

1. Added the --clean-input and --clean-output parameters to clean the input and output directories before starting the job
2. Added the --delete-old-input parameter to deleted older batches for data already ingested. This helps keep number of redundant files low.
3. Added the --input-parallelism parameter to restrict the parallelism when generating input data. This helps keeping the number of generated input files low.
4. Added an option start_offset to Dag Nodes. Without ability to specify start offsets, data is generated into existing partitions. With start offset, DAG can control on which partition, the data is to be written.
5. Fixed generation of records for correct number of partitions
  - In the existing implementation, the partition is chosen as a random long. This does not guarantee exact number of requested partitions to be created.
6. Changed variable blacklistedFields to be a Set as that is faster than List for membership checks.
7. Fixed integer division for Math.ceil. If two integers are divided, the result is not double unless one of the integer is casted to double.

[HUDI-1338] Adding Delete support to test suite framework (apache#2172)

- Adding Delete support to test suite.
         Added DeleteNode
         Added support to generate delete records

Use RateLimiter instead of sleep. Repartition WriteStatus to optimize Hbase index writes (apache#1484)

[HUDI-912] Refactor and relocate KeyGenerator to support more engines (apache#2200)

* [HUDI-912] Refactor and relocate KeyGenerator to support more engines

* Rename KeyGenerators

[HUDI-892] RealtimeParquetInputFormat skip adding projection columns if there are no log files (apache#2190)

* [HUDI-892] RealtimeParquetInputFormat skip adding projection columns if there are no log files
* [HUDI-892]  for test
* [HUDI-892]  fix bug generate array from split
* [HUDI-892] revert test log

[HUDI-1352] Add FileSystemView APIs to query pending clustering operations (apache#2202)

[HUDI-1375] Fix bug in HoodieAvroUtils.removeMetadataFields() method (apache#2232)

Co-authored-by: Wenning Ding <[email protected]>

[HUDI-1358] Fix Memory Leak in HoodieLogFormatWriter (apache#2217)

[HUDI-1377] remove duplicate code (apache#2235)

[HUDI-1327] Introduce base implemetation of hudi-flink-client (apache#2176)

[HUDI-1400] Replace Operation enum with WriteOperationType (apache#2259)

[HUDI-1384] Decoupling hive jdbc dependency when HIVE_USE_JDBC_OPT_KEY set false (apache#2241)

[MINOR] clean up and add comments to flink client (apache#2261)

[MINOR] Add apacheflink label (apache#2268)

[HUDI-1393] Add compaction action in archive command (apache#2246)

[HUDI-1364] Add HoodieJavaEngineContext to hudi-java-client (apache#2222)

[HUDI-1396] Fix for preventing bootstrap datasource jobs from hanging via spark-submit (apache#2253)

Co-authored-by: Wenning Ding <[email protected]>

[HUDI-1358] Fix leaks in DiskBasedMap and LazyFileIterable (apache#2249)

[HUDI-1392] lose partition info when using spark parameter basePath (apache#2243)

Co-authored-by: zhang wen <[email protected]>

[MINOR] refactor code in HoodieMergeHandle (apache#2272)

[HUDI-1424]  Write Type changed to BULK_INSERT when set ENABLE_ROW_WRITER_OPT_KEY=true (apache#2289)

[HUDI-1373] Add Support for OpenJ9 JVM (apache#2231)

* add supoort for OpenJ9 VM
* add 32bit openJ9
* Pulled the memory layout specs into their own classes.

[HUDI-1357] Added a check to validate records are not lost during merges. (apache#2216)

- Turned off by default

[HUDI-1196] Update HoodieKey when deduplicating records with global index (apache#2248)

- Works only for overwrite payload (default)
- Does not alter current semantics otherwise

Co-authored-by: Ryan Pifer <[email protected]>

[HUDI-1349] spark sql support overwrite use insert_overwrite_table (apache#2196)

[HUDI-1343] Add standard schema postprocessor which would rewrite the schema using spark-avro conversion (apache#2192)

Co-authored-by: liujh <[email protected]>

[HUDI-1427] Fix FileAlreadyExistsException when set HOODIE_AUTO_COMMIT_PROP to true (apache#2295)

[HUDI-1412] Make HoodieWriteConfig support setting different default … (apache#2278)

* [HUDI-1412] Make HoodieWriteConfig support setting different default value according to engine type

fix typo (apache#2308)

Co-authored-by: Xi Chen <[email protected]>

[HUDI-1040] Make Hudi support Spark 3 (apache#2208)

* Fix flaky MOR unit test

* Update Spark APIs to make it be compatible with both spark2 & spark3

* Refactor bulk insert v2 part to make Hudi be able to compile with Spark3

* Add spark3 profile to handle fasterxml & spark version

* Create hudi-spark-common module & refactor hudi-spark related modules

Co-authored-by: Wenning Ding <[email protected]>

[MINOR] Throw an exception when keyGenerator initialization failed (apache#2307)

[HUDI-1395] Fix partition path using FSUtils (apache#2312)

Fixed the logic to get partition path in Copier and Exporter utilities.

[HUDI-1445] Refactor AbstractHoodieLogRecordScanner to use Builder (apache#2313)

[MINOR] Minor improve in IncrementalRelation (apache#2314)

[HUDI-1439] Remove scala dependency from hudi-client-common (apache#2306)

[HUDI-1428] Clean old fileslice is invalid (apache#2292)

Co-authored-by: zhang wen <[email protected]>
Co-authored-by: zhang wen <[email protected]>

[HUDI-1448]  Hudi dla sync support skip rt table syncing (apache#2324)

[HUDI-1435] Fix bug in Marker File Reconciliation for Non-Partitioned datasets (apache#2301)

[MINOR] Improve code readability by passing in the fileComparisonsRDD in bloom index (apache#2319)

[HUDI-1376] Drop Hudi metadata cols at the beginning of Spark datasource writing (apache#2233)

Co-authored-by: Wenning Ding <[email protected]>

[MINOR] Fix error information in exception (apache#2341)

[MINOR] Make QuickstartUtil generate random timestamp instead of 0 (apache#2340)

[HUDI-1406] Add date partition based source input selector for Delta streamer (apache#2264)

- Adds ability to list only recent date based partitions from source data.
- Parallelizes listing for faster tailing of DFSSources

[HUDI-1437]  support more accurate spark JobGroup for better performance tracking (apache#2322)

[HUDI-1470] Use the latest writer schema, when reading from existing parquet files in the hudi-test-suite (apache#2344)

[HUDI-115] Adding DefaultHoodieRecordPayload to honor ordering with combineAndGetUpdateValue (apache#2311)

* Added ability to pass in `properties` to payload methods, so they can perform table/record specific merges
* Added default methods so existing payload classes are backwards compatible.
* Adding DefaultHoodiePayload to honor ordering while merging two records
* Fixing default payload based on feedback

[HUDI-1419] Add base implementation for hudi java client (apache#2286)

[MINOR] Pass root exception to HoodieKeyGeneratorException for more information (apache#2354)

Co-authored-by: Xi Chen <[email protected]>

[HUDI-1075] Implement simple clustering strategies to create ClusteringPlan and to run the plan

[HUDI-1471] Make QuickStartUtils generate deletes according to specific ts (apache#2357)

[HUDI-1485] Fix Deletes issued without any prior commits exception (apache#2361)

[HUDI-1488] Fix Test Case Failure in TestHBaseIndex (apache#2365)

[HUDI-1489] Fix null pointer exception when reading updated written bootstrap table (apache#2370)

Co-authored-by: Wenning Ding <[email protected]>

[HUDI-1451] Support bulk insert v2 with Spark 3.0.0 (apache#2328)

Co-authored-by: Wenning Ding <[email protected]>

- Added support for bulk insert v2 with datasource v2 api in Spark 3.0.0.

[HUDI-1487] fix unit test testCopyOnWriteStorage random failed (apache#2364)

[HUDI-1490] Incremental Query should work even when there are  partitions that have no incremental changes (apache#2371)

* Incremental Query should work even when there are  partitions that have no incremental changes

Co-authored-by: Sivabalan Narayanan <[email protected]>

[HUDI-1331] Adding support for validating entire dataset and long running tests in test suite framework (apache#2168)

* trigger rebuild

* [HUDI-1156] Remove unused dependencies from HoodieDeltaStreamerWrapper Class (apache#1927)

* Adding support for validating records and long running tests in test sutie framework

* Adding partial validate node

* Fixing spark session initiation in Validate nodes

* Fixing validation

* Adding hive table validation to ValidateDatasetNode

* Rebasing with latest commits from master

* Addressing feedback

* Addressing comments

Co-authored-by: lamber-ken <[email protected]>
Co-authored-by: linshan-ma <[email protected]>

[HUDI-1481]  add  structured streaming and delta streamer clustering unit test (apache#2360)

[HUDI-1354] Block updates and replace on file groups in clustering (apache#2275)

* [HUDI-1354] Block updates and replace on file groups in clustering

* [HUDI-1354]  Block updates and replace on file groups in clustering

[HUDI-1350] Support Partition level delete API in HUDI (apache#2254)

* [HUDI-1350] Support Partition level delete API in HUDI

* [HUDI-1350] Support Partition level delete API in HUDI base InsertOverwriteCommitAction

* [HUDI-1350] Support Partition level delete API in HUDI base InsertOverwriteCommitAction

[HUDI-1495] Upgrade Flink version to 1.12.0 (apache#2384)

[MINOR] Remove the duplicate code in AbstractHoodieWriteClient.startCommit (apache#2385)

[HUDI-1398] Align insert file size for reducing IO (apache#2256)

* [HUDI-1398] Align insert file size for reducing IO

Co-authored-by: zhang wen <[email protected]>

[HUDI-1484] Escape the partition value in HiveSyncTool (apache#2363)

[HUDI-1474] Add additional unit tests to TestHBaseIndex (apache#2349)

[HUDI-1147] Modify GenericRecordFullPayloadGenerator to generate vali… (apache#2045)

* [HUDI-1147] Modify GenericRecordFullPayloadGenerator to generate valid timestamps

Co-authored-by: Sivabalan Narayanan <[email protected]>

[HUDI-1493] Fixed schema compatibility check for fields. (apache#2350)

Some field types changes are allowed (e.g. int -> long) while maintaining schema backward compatibility within HUDI. The check was reversed with the reader schema being passed for the write schema.

[MINOR] Update report_coverage.sh (apache#2396)

[HUDI-1434] fix incorrect log file path in HoodieWriteStat (apache#2300)

* [HUDI-1434] fix incorrect log file path in HoodieWriteStat

* HoodieWriteHandle#close() returns a list of WriteStatus objs

* Handle rolled-over log files and return a WriteStatus per log file written

 - Combined data and delete block logging into a single call
 - Lazily initialize and manage write status based on returned AppendResult
 - Use FSUtils.getFileSize() to set final file size, consistent with other handles
 - Added tests around returned values in AppendResult
 - Added validation of the file sizes returned in write stat

Co-authored-by: Vinoth Chandar <[email protected]>

[HUDI-1418] Set up flink client unit test infra (apache#2281)

[MINOR] Sync UpsertPartitioner modify of HUDI-1398 to flink/java (apache#2390)

Co-authored-by: zhang wen <[email protected]>

[HUDI-1423] Support delete in hudi-java-client (apache#2353)

[MINOR] Add maven profile to support skipping shade sources jars (apache#2358)

Co-authored-by: Xi Chen <[email protected]>

[HUDI-842] Implementation of HUDI RFC-15.

 - Introduced an internal metadata table, that stores file listings.
 - metadata table is kept upto date with
 - Fixed handling of CleanerPlan.
 - [HUDI-842] Reduce parallelism to speed up the test.
 - [HUDI-842] Implementation of CLI commands for metadata operations and lookups.
 - [HUDI-842] Extend rollback metadata to include the files which have been appended to.
 - [HUDI-842] Support for rollbacks in MOR Table.
 - MarkerBasedRollbackStrategy needs to correctly provide the list of files for which rollback blocks were appended.
 - [HUDI-842] Added unit test for rollback of partial commits (inflight but not completed yet).
 - [HUDI-842] Handled the error case where metadata update succeeds but dataset commit fails.
 - [HUDI-842] Schema evolution strategy for Metadata Table. Each type of metadata saved (FilesystemMetadata, ColumnIndexMetadata, etc.) will be a separate field with default null. The type of the record will identify the valid field. This way, we can grow the schema when new type of information is saved within in which still keeping it backward compatible.
 - [HUDI-842] Fix non-partitioned case and speedup initial creation of metadata table.Choose only 1 partition for jsc as the number of records is low (hundreds to thousands). There is more overhead of creating large number of partitions for JavaRDD and it slows down operations like WorkloadProfile.
For the non-partitioned case, use "." as the name of the partition to prevent empty keys in HFile.
 - [HUDI-842] Reworked metrics pusblishing.
 - Code has been split into reader and writer side. HoodieMetadata code to be accessed by using HoodieTable.metadata() to get instance of metdata for the table.
Code is serializable to allow executors to use the functionality.
 - [RFC-15] Add metrics to track the time for each file system call.
 - [RFC-15] Added a distributed metrics registry for spark which can be used to collect metrics from executors. This helps create a stats dashboard which shows the metadata table improvements in real-time for production tables.
 - [HUDI-1321] Created HoodieMetadataConfig to specify configuration for the metadata table. This is safer than full-fledged properties for the metadata table (like HoodieWriteConfig) as it makes burdensome to tune the metadata. With limited configuration, we can control the performance of the metadata table closely.

[HUDI-1319][RFC-15] Adding interfaces for HoodieMetadata, HoodieMetadataWriter (apache#2266)
 - moved MetadataReader to HoodieBackedTableMetadata, under the HoodieTableMetadata interface
 - moved MetadataWriter to HoodieBackedTableMetadataWriter, under the HoodieTableMetadataWriter
 - Pulled all the metrics into HoodieMetadataMetrics
 - Writer now wraps the metadata, instead of extending it
 - New enum for MetadataPartitionType
 - Streamlined code flow inside HoodieBackedTableMetadataWriter w.r.t initializing metadata state
 - [HUDI-1319] Make async operations work with metadata table (apache#2332)
 - Changes the syncing model to only move over completed instants on data timeline
 - Syncing happens postCommit and on writeClient initialization
 - Latest delta commit on the metadata table is sufficient as the watermark for data timeline archival
 - Cleaning/Compaction use a suffix to the last instant written to metadata table, such that we keep the 1-1
 - .. mapping between data and metadata timelines.
 - Got rid of a lot of the complexity around checking for valid commits during open of base/log files
 - Tests now use local FS, to simulate more failure scenarios
 - Some failure scenarios exposed HUDI-1434, which is needed for MOR to work correctly

co-authored by: Vinoth Chandar <[email protected]>

[HUDI-1450] Use metadata table for listing in HoodieROTablePathFilter (apache#2326)

[HUDI-1394] [RFC-15] Use metadata table (if present) to get all partition paths (apache#2351)

[HUDI-1469] Faster initialization of metadata table using parallelized listing. (apache#2343)

 * [HUDI-1469] Faster initialization of metadata table using parallelized listing which finds partitions and files in a single scan.
 * MINOR fixes

Co-authored-by: Vinoth Chandar <[email protected]>

[HUDI-1325] [RFC-15] Merge updates of unsynced instants to metadata table (apache#2342)

[RFC-15] Fix partition key in metadata table when bootstrapping from file system (apache#2387)

Co-authored-by: Ryan Pifer <[email protected]>

[HUDI-1312] [RFC-15] Support for metadata listing for snapshot queries through Hive/SparkSQL (apache#2366)

Co-authored-by: Ryan Pifer <[email protected]>

[HUDI-1504] Allow log files generated during restore/rollback to be synced as well

 - TestHoodieBackedMetadata#testSync etc now run for MOR tables
 - HUDI-1502 is still pending and has issues for MOR/rollbacks
 - Also addressed bunch of code review comments.

[HUDI-1498] Read clustering plan from requested file for inflight instant (apache#2389)

[HUDI-1506] Fix wrong exception thrown in HoodieAvroUtils (apache#2405)

[HUDI-1383] Fixing sorting of partition vals for hive sync computation (apache#2402)

[HUDI-1507] Change timeline utils to support reading replacecommit metadata (apache#2407)

[MINOR] Rename unit test package of hudi-spark3 from scala to java (apache#2411)

[HUDI-1513] Introduce WriteClient#preWrite() and relocate metadata table syncing (apache#2413)

- Syncing to metadata table, setting operation type, starting async cleaner done in preWrite()
 - Fixes an issues where delete() was not starting async cleaner correctly
 - Fixed tests and enabled metadata table for TestAsyncCompaction

[HUDI-1510] Move HoodieEngineContext and its dependencies to hudi-common (apache#2410)

[MINOR] Sync HUDI-1196 to  FlinkWriteHelper (apache#2415)

[HUDI-1514] Avoid raw type use for parameter of Transformer interface (apache#2420)

[HUDI-920] Support Incremental query for MOR table (apache#1938)

[HUDI-1276] [HUDI-1459] Make Clustering/ReplaceCommit and Metadata table be compatible (apache#2422)

* [HUDI-1276] [HUDI-1459] Make Clustering/ReplaceCommit and Metadata table be compatible

* Use filesystemview and json format from metadata. Add tests

Co-authored-by: Satish Kotha <[email protected]>

[HUDI-1399] support a independent clustering spark job to asynchronously clustering (apache#2379)

* [HUDI-1481]  add  structured streaming and delta streamer clustering unit test

* [HUDI-1399] support a independent clustering spark job to asynchronously clustering

* [HUDI-1399]  support a  independent clustering spark job to asynchronously clustering

* [HUDI-1498] Read clustering plan from requested file for inflight instant (apache#2389)

* [HUDI-1399]  support  a independent clustering spark job with schedule generate instant time

Co-authored-by: satishkotha <[email protected]>

[MINOR] fix spark 3 build for incremental query on MOR (apache#2425)

[HUDI-1479] Use HoodieEngineContext to parallelize fetching of partiton paths (apache#2417)

* [HUDI-1479] Use HoodieEngineContext to parallelize fetching of partition paths

* Adding testClass for FileSystemBackedTableMetadata

Co-authored-by: Nishith Agarwal <[email protected]>

[HUDI-1520] add configure for spark sql overwrite use INSERT_OVERWRITE_TABLE (apache#2428)

[HUDI-1502] MOR rollback and restore support for metadata sync (apache#2421)

- Adds field to RollbackMetadata that capture the logs written for rollback blocks
- Adds field to RollbackMetadata that capture new logs files written by unsynced deltacommits

Co-authored-by: Vinoth Chandar <[email protected]>

Reviewers: O955 Project Hoodie Project Reviewer: Add blocking reviewers!, PHID-PROJ-pxfpotkfgkanblb3detq!, #ldap_hudi, modi

Reviewed By: #ldap_hudi, modi

Differential Revision: https://code.uberinternal.com/D5347141
