Merged
@@ -484,7 +484,7 @@ public abstract O bulkInsertPreppedRecords(I preppedRecords, final String instantTime,
    * @param writeOperationType
    * @param metaClient
    */
-  protected void preWrite(String instantTime, WriteOperationType writeOperationType,
+  public void preWrite(String instantTime, WriteOperationType writeOperationType,
                           HoodieTableMetaClient metaClient) {
     setOperationType(writeOperationType);
     this.lastCompletedTxnAndMetadata = TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient);
@@ -250,7 +250,7 @@ public List<WriteStatus> delete(List<HoodieKey> keys, String instantTime) {
   }
 
   @Override
-  protected void preWrite(String instantTime, WriteOperationType writeOperationType, HoodieTableMetaClient metaClient) {
+  public void preWrite(String instantTime, WriteOperationType writeOperationType, HoodieTableMetaClient metaClient) {
     setOperationType(writeOperationType);
     // Note: the code to read the commit metadata is not thread safe for JSON deserialization,
     // remove the table metadata sync
@@ -68,6 +68,7 @@ public DataSourceInternalWriterHelper(String instantTime, HoodieWriteConfig writeConfig,
     this.metaClient = HoodieTableMetaClient.builder().setConf(configuration).setBasePath(writeConfig.getBasePath()).build();
     this.metaClient.validateTableProperties(writeConfig.getProps());
     this.hoodieTable = HoodieSparkTable.create(writeConfig, new HoodieSparkEngineContext(new JavaSparkContext(sparkSession.sparkContext())), metaClient);
+    writeClient.preWrite(instantTime, WriteOperationType.BULK_INSERT, metaClient);
Contributor:

The fix makes sense, but it's not a good idea to put write logic in the constructor. WDYT?

Contributor (Author):

Yeah, makes sense. The only other option I see is to add it to createInflightCommit(). Do you suggest doing that, or should we introduce a new method and explicitly call it before calling createInflightCommit()?

Member:

It's not ideal, but it's sort of an existing pattern: writeClient.startCommitWithTime is already writing files there. I'd say it's OK to go ahead with this and file a refactoring task.

   }
 
   public boolean useCommitCoordinator() {
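
A possible shape for the follow-up refactoring floated in the thread above, sketched here for illustration only: construction stays side-effect free, and the caller triggers the pre-write transition explicitly before creating the inflight commit. The class name, field layout, and prepareWrite method are hypothetical, not the merged code.

import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;

// Hypothetical sketch of the refactoring discussed in the thread above.
class WriterHelperSketch {
  private final SparkRDDWriteClient<?> writeClient;
  private final HoodieTableMetaClient metaClient;
  private final String instantTime;

  WriterHelperSketch(SparkRDDWriteClient<?> writeClient, HoodieTableMetaClient metaClient, String instantTime) {
    // the constructor only stores collaborators; no write logic here
    this.writeClient = writeClient;
    this.metaClient = metaClient;
    this.instantTime = instantTime;
  }

  // invoked explicitly by the data source before createInflightCommit()
  void prepareWrite() {
    writeClient.preWrite(instantTime, WriteOperationType.BULK_INSERT, metaClient);
  }
}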
@@ -167,14 +167,21 @@ class TestHoodieSparkSqlWriter {
    * @param sortMode Bulk insert sort mode
    * @param populateMetaFields Flag for populating meta fields
    */
-  def testBulkInsertWithSortMode(sortMode: BulkInsertSortMode, populateMetaFields: Boolean = true): Unit = {
+  def testBulkInsertWithSortMode(sortMode: BulkInsertSortMode, populateMetaFields: Boolean = true, enableOCCConfigs: Boolean = false): Unit = {
     //create a new table
-    val fooTableModifier = commonTableModifier.updated("hoodie.bulkinsert.shuffle.parallelism", "4")
+    var fooTableModifier = commonTableModifier.updated("hoodie.bulkinsert.shuffle.parallelism", "4")
       .updated(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
       .updated(DataSourceWriteOptions.ENABLE_ROW_WRITER.key, "true")
       .updated(HoodieTableConfig.POPULATE_META_FIELDS.key(), String.valueOf(populateMetaFields))
       .updated(HoodieWriteConfig.BULK_INSERT_SORT_MODE.key(), sortMode.name())
 
+    if (enableOCCConfigs) {
+      fooTableModifier = fooTableModifier
+        .updated("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
+        .updated("hoodie.cleaner.policy.failed.writes", "LAZY")
+        .updated("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.InProcessLockProvider")
+    }
+
     // generate the inserts
     val schema = DataSourceTestUtils.getStructTypeExampleSchema
     val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
@@ -306,6 +313,11 @@ class TestHoodieSparkSqlWriter {
     testBulkInsertWithSortMode(sortMode, populateMetaFields = true)
   }
 
+  @Test
+  def testBulkInsertForSortModeWithOCC(): Unit = {
+    testBulkInsertWithSortMode(BulkInsertSortMode.GLOBAL_SORT, populateMetaFields = true, true)
+  }
+
   /**
    * Test case for Bulk insert with populating meta fields or
    * without populating meta fields.
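
For orientation, a usage sketch (not part of this PR) of how a writer would pass the same OCC settings the new test exercises through the Spark datasource; the table name and the df/basePath arguments are assumed for illustration.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

class OccBulkInsertExample {
  // Bulk insert via the row writer with optimistic concurrency control enabled,
  // mirroring the options set in testBulkInsertWithSortMode above.
  static void bulkInsertWithOcc(Dataset<Row> df, String basePath) {
    df.write().format("hudi")
        .option("hoodie.table.name", "test_table") // assumed table name
        .option("hoodie.datasource.write.operation", "bulk_insert")
        .option("hoodie.datasource.write.row.writer.enable", "true")
        .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
        .option("hoodie.cleaner.policy.failed.writes", "LAZY")
        .option("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.InProcessLockProvider")
        .mode(SaveMode.Append)
        .save(basePath);
  }
}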