Conversation

@nsivabalan
Contributor

@nsivabalan nsivabalan commented Jul 15, 2020

What is the purpose of the pull request

  • Adds support for "bulk_insert_dataset", which has better performance than the existing "bulk_insert" (see the hedged usage sketch after this list).
  • This implementation uses the Spark Datasource API for writing to storage.
  • Adds support for key generators to operate on Rows (rather than HoodieRecords, as in the existing "bulk_insert").
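For orientation, a minimal usage sketch of how this path would be driven through the Spark datasource. The operation value comes from this PR description; the hoodie.datasource.write.* option keys are the standard ones, and the table name, column names and paths below are hypothetical:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class BulkInsertDatasetExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("bulk-insert-dataset-example")
        .getOrCreate();

    // Hypothetical source data; any Dataset<Row> with the key/partition/precombine columns works.
    Dataset<Row> df = spark.read().parquet("/tmp/source_trips");

    df.write().format("org.apache.hudi")
        .option("hoodie.table.name", "trips")                                // hypothetical table name
        .option("hoodie.datasource.write.operation", "bulk_insert_dataset")  // new operation from this PR
        .option("hoodie.datasource.write.recordkey.field", "uuid")           // hypothetical columns
        .option("hoodie.datasource.write.partitionpath.field", "partition")
        .option("hoodie.datasource.write.precombine.field", "ts")
        .mode(SaveMode.Append)
        .save("/tmp/hudi_trips");                                            // hypothetical base path
  }
}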

Brief change log

  • Added support for "bulk_insert_dataset", which uses the Datasource API for writing.
  • This path introduces a new datasource, "org.apache.hudi.internal", and all the supporting classes for it: DefaultSource, DataSourceWriter (HoodieDataSourceInternalWriter), DataWriterFactory (HoodieBulkInsertDataInternalWriterFactory), DataWriter (HoodieBulkInsertDataInternalWriter), etc.
  • This patch also introduces HoodieRowCreateHandle, HoodieInternalRowFileWriter, HoodieInternalRowParquetWriter, etc. (and their respective factory classes) to assist in writing InternalRows to parquet.
  • Added HoodieInternalRow, which wraps InternalRow and exposes the meta columns.
  • Added HoodieInternalWriteStatus to hold write status (instead of the WriteStatus used in the HoodieRecord write paths), since it holds Rows rather than HoodieRecords.
  • This patch adds changes to KeyGenerator so that getRecordKey and getPartitionPath are supported with Row for "bulk_insert_dataset". New APIs are added to KeyGenerator with default implementations, so there are no breaking changes; all key generator implementations have been updated accordingly (a hedged sketch of the API shape follows this list).
  • Added HoodieDatasetBulkInsertHelper to assist in preparing the dataset before calling into the datasource write. For the commit, HoodieWriteClient is leveraged.
  • Some additional functionality on top of bulk insert is not covered in this patch, namely UserDefinedCustomPartitioner, dedup and drop duplicates. Created HUDI-1014, HUDI-1105 and HUDI-1106 to track these.
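As referenced in the KeyGenerator item above, here is a hedged sketch of the API shape. The two Row-based method names match the commit notes later in this thread; the default bodies shown are illustrative only (the patch's defaults may instead convert the Row to Avro and delegate to the existing record-based path):

import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.spark.sql.Row;

// Sketch only; the real KeyGenerator lives in hudi-spark and is configured via properties.
public abstract class KeyGeneratorSketch {

  // Existing record-based API stays untouched, so current implementations keep compiling.
  public abstract HoodieKey getKey(GenericRecord record);

  // New Row-based APIs with default implementations, so only key generators that
  // participate in the "bulk_insert_dataset" path need to override them.
  public String getRecordKey(Row row) {
    throw new UnsupportedOperationException("Row-based getRecordKey is not implemented");
  }

  public String getPartitionPath(Row row) {
    throw new UnsupportedOperationException("Row-based getPartitionPath is not implemented");
  }
}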

Verify this pull request

This change added tests and can be verified as follows:

  • Added tests in HoodieSparkSqlWriterSuite to test "bulk_insert_dataset" end to end
  • Added tests for HoodieInternalWriteStatus, HoodieInternalRow, HoodieRowCreateHandle, HoodieInternalRowParquetWriter, HoodieDatasetBulkInsertHelper and HoodieBulkInsertDataInternalWriter
  • Added tests for HoodieDataSourceInternalWriter covering commit, abort, large writes and multiple writes
  • Added tests covering the new Row-based APIs for all key generators

Committer checklist

  • Has a corresponding JIRA in PR title & commit

  • Commit message is descriptive of the change

  • CI is green

  • Necessary doc changes done or have another open PR

  • For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

@nsivabalan nsivabalan changed the title from "[WIP][HUDI-1013] Adding Bulk Insert V2 implementation" to "[HUDI-1013] Adding Bulk Insert V2 implementation" on Jul 17, 2020
@nsivabalan nsivabalan force-pushed the blk_insert_dataset branch 2 times, most recently from d03df7b to 8366e83 on July 19, 2020 at 18:00
@nsivabalan nsivabalan force-pushed the blk_insert_dataset branch 2 times, most recently from a5f608c to e5d4939 on July 22, 2020 at 11:20
@vinothchandar vinothchandar self-assigned this Jul 22, 2020
Member

@vinothchandar vinothchandar left a comment

@nsivabalan I made a high-level pass. Mostly LGTM. Will make some changes, test and land.

@nsivabalan nsivabalan force-pushed the blk_insert_dataset branch 4 times, most recently from af27762 to 1a11d1c on July 24, 2020 at 06:19
@nsivabalan nsivabalan force-pushed the blk_insert_dataset branch 3 times, most recently from c957008 to dacd635 on August 6, 2020 at 11:59
Contributor Author

@nsivabalan nsivabalan left a comment

Leaving some notes for reviewer :)

Contributor Author

Note to reviewer: I have yet to add tests for these new methods; they came in as part of the rebase. I also noticed a few other test classes for each key generator after rebasing. Will add tests to those new test classes by tomorrow.

Member

@nsivabalan is this done?

Member

@vinothchandar vinothchandar left a comment

Took a high-level pass. Great work @bvaradar, @nsivabalan. Did some testing as well.

@nsivabalan
Contributor Author

nsivabalan commented Aug 11, 2020

#1834 (comment): because this is for Rows, whereas the existing WriteStatus is for HoodieRecords. Guess we should have templatized this too.
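A minimal sketch of the templatization floated in this comment, purely illustrative (not what the patch does; the patch keeps two concrete classes, WriteStatus for HoodieRecord writes and HoodieInternalWriteStatus for Row writes):

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Illustrative only: a record-type-agnostic write status.
public class GenericWriteStatusSketch<T> implements Serializable {
  private final List<T> failedRecords = new ArrayList<>();
  private long totalRecords;
  private long totalErrorRecords;

  public void markSuccess(T record) {
    totalRecords++;
  }

  public void markFailure(T record, Throwable cause) {
    failedRecords.add(record);
    totalRecords++;
    totalErrorRecords++;
  }

  public List<T> getFailedRecords() {
    return failedRecords;
  }

  public long getTotalRecords() {
    return totalRecords;
  }

  public long getTotalErrorRecords() {
    return totalErrorRecords;
  }
}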

Contributor Author

@nsivabalan nsivabalan left a comment

one nit on HoodieSparkSqlWriter.

Member

@vinothchandar vinothchandar left a comment

@bvaradar @nsivabalan this is a more detailed review with notes to self. Please take a look and chime in. I think we need to be careful about how we future-proof the KeyGenerator API.

@Override
public String getPartitionPath(Row row) {
  Object fieldVal = null;
  Object partitionPathFieldVal = RowKeyGeneratorHelper.getNestedFieldVal(row,
      getPartitionPathPositions().get(getPartitionPathFields().get(0)));
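For readers without the diff open, a hedged sketch of how a Row-based getPartitionPath along these lines typically finishes; the fallback constant and the hive-style flag below are assumptions for illustration, not necessarily the exact code under review:

  // ... continuing the method quoted above (assumed, not verbatim from the diff):
  String partitionPath = (partitionPathFieldVal == null || partitionPathFieldVal.toString().isEmpty())
      ? "default"                                  // assumed fallback partition path
      : partitionPathFieldVal.toString();
  if (hiveStylePartitioning) {                     // assumed flag carried by the key generator
    partitionPath = getPartitionPathFields().get(0) + "=" + partitionPath;
  }
  return partitionPath;
}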

Contributor Author

@nsivabalan nsivabalan left a comment

some notes to reviewer.

Contributor Author

@nsivabalan nsivabalan left a comment

some more comments.

 - Clean up KeyGenerator classes and fix test failures
public HoodieRowParquetWriteSupport(Configuration conf, StructType structType, BloomFilter bloomFilter) {
  super();
  Configuration hadoopConf = new Configuration(conf);
  hadoopConf.set("spark.sql.parquet.writeLegacyFormat", "false");
Member

Yes, why are we hardcoding this? Any ideas, @bvaradar?
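One way to avoid the hardcoding being questioned here, sketched under the assumption that the incoming Configuration already carries the session's Spark SQL settings (a suggestion, not what the patch does):

  // Respect any value already present on the passed-in conf, defaulting to "false"
  // (the non-legacy Parquet format) only when nothing is set.
  hadoopConf.set("spark.sql.parquet.writeLegacyFormat",
      conf.get("spark.sql.parquet.writeLegacyFormat", "false"));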

@vinothchandar vinothchandar force-pushed the blk_insert_dataset branch 2 times, most recently from c8745f8 to 5dc8182 on August 13, 2020 at 04:38
 - Introduced KeyGeneratorInterface in hudi-client, moved KeyGenerator back to hudi-spark
 - Simplified the new API additions to just two new methods : getRecordKey(row), getPartitionPath(row)
 - Fixed all built-in key generators with new APIs
 - Made the field position map lazily created upon the first call to row based apis
 - Implemented native row based key generators for CustomKeyGenerator
 - Fixed all the tests, with these new APIs
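A minimal sketch of the lazy position-map idea from the commit notes above, assuming a single-level field lookup to keep it self-contained (the patch itself uses RowKeyGeneratorHelper.getNestedFieldIndices to support nested fields):

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.spark.sql.types.StructType;

// Illustrative: positions of the partition path fields in the Row schema are resolved once,
// on the first Row-based call, and reused for every subsequent Row.
class LazyFieldPositionsSketch {
  private final List<String> partitionPathFields;
  private Map<String, List<Integer>> partitionPathPositions;   // null until first use

  LazyFieldPositionsSketch(List<String> partitionPathFields) {
    this.partitionPathFields = partitionPathFields;
  }

  Map<String, List<Integer>> positionsFor(StructType structType) {
    if (partitionPathPositions == null) {
      partitionPathPositions = new HashMap<>();
      for (String field : partitionPathFields) {
        partitionPathPositions.put(field,
            Collections.singletonList(structType.fieldIndex(field)));
      }
    }
    return partitionPathPositions;
  }
}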
@vinothchandar
Member

@nsivabalan this is ready. I am going ahead and merging. I also re-ran the benchmark, and it seems to clock the same ~30 mins as spark.write.parquet.

Please carefully go over the changes I have made in the last commits here and see if anything needs follow-on fixing. Our timelines are tight; we need to do it tomorrow, if at all.

@vinothchandar vinothchandar merged commit 379cf07 into apache:master Aug 13, 2020
      .forEach(f -> partitionPathPositions.put(f,
          RowKeyGeneratorHelper.getNestedFieldIndices(structType, f, false)));
  }
  this.structType = structType;
Contributor Author

@nsivabalan nsivabalan Aug 13, 2020

May I know where the structType is being used? AvroConversionHelper.createConverterToAvro uses row.schema(), so we may not need it. Probably we should rename this to a boolean positionMapInitialized.

Contributor Author

@nsivabalan nsivabalan left a comment

few minor comments

// insert/bulk-insert combining to be true, if filtering for duplicates
boolean combineInserts = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.INSERT_DROP_DUPS_OPT_KEY()));
HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder()
    .withPath(basePath).withAutoCommit(false).combineInput(combineInserts, true);
Contributor

Hi @nsivabalan, do you have any idea why we disable autoCommit by default when creating the HoodieWriteConfig?
