From fe1e2fefa4311970df9a9caa183333fa76f83f12 Mon Sep 17 00:00:00 2001 From: sivabalan Date: Mon, 13 Jun 2022 12:50:55 -0400 Subject: [PATCH] Fixing NPE with row writer path and with OCC --- .../hudi/client/BaseHoodieWriteClient.java | 2 +- .../hudi/client/HoodieFlinkWriteClient.java | 2 +- .../internal/DataSourceInternalWriterHelper.java | 1 + .../apache/hudi/TestHoodieSparkSqlWriter.scala | 16 ++++++++++++++-- 4 files changed, 17 insertions(+), 4 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index 455cb644c7d47..5c358644851dd 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -484,7 +484,7 @@ public abstract O bulkInsertPreppedRecords(I preppedRecords, final String instan * @param writeOperationType * @param metaClient */ - protected void preWrite(String instantTime, WriteOperationType writeOperationType, + public void preWrite(String instantTime, WriteOperationType writeOperationType, HoodieTableMetaClient metaClient) { setOperationType(writeOperationType); this.lastCompletedTxnAndMetadata = TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index b68cf97e9aa35..b4348c9c569cc 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -250,7 +250,7 @@ public List delete(List keys, String instantTime) { } @Override - protected void preWrite(String instantTime, WriteOperationType writeOperationType, HoodieTableMetaClient metaClient) { + public void preWrite(String instantTime, WriteOperationType writeOperationType, HoodieTableMetaClient metaClient) { setOperationType(writeOperationType); // Note: the code to read the commit metadata is not thread safe for JSON deserialization, // remove the table metadata sync diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java index 5aa82642de62e..3a349473b2201 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java +++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java @@ -68,6 +68,7 @@ public DataSourceInternalWriterHelper(String instantTime, HoodieWriteConfig writ this.metaClient = HoodieTableMetaClient.builder().setConf(configuration).setBasePath(writeConfig.getBasePath()).build(); this.metaClient.validateTableProperties(writeConfig.getProps()); this.hoodieTable = HoodieSparkTable.create(writeConfig, new HoodieSparkEngineContext(new JavaSparkContext(sparkSession.sparkContext())), metaClient); + writeClient.preWrite(instantTime, WriteOperationType.BULK_INSERT, metaClient); } public boolean useCommitCoordinator() { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala index 928b1b1a1eec7..4829c449325ad 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala @@ -167,14 +167,21 @@ class TestHoodieSparkSqlWriter { * @param sortMode Bulk insert sort mode * @param populateMetaFields Flag for populating meta fields */ - def testBulkInsertWithSortMode(sortMode: BulkInsertSortMode, populateMetaFields: Boolean = true): Unit = { + def testBulkInsertWithSortMode(sortMode: BulkInsertSortMode, populateMetaFields: Boolean = true, enableOCCConfigs: Boolean = false): Unit = { //create a new table - val fooTableModifier = commonTableModifier.updated("hoodie.bulkinsert.shuffle.parallelism", "4") + var fooTableModifier = commonTableModifier.updated("hoodie.bulkinsert.shuffle.parallelism", "4") .updated(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL) .updated(DataSourceWriteOptions.ENABLE_ROW_WRITER.key, "true") .updated(HoodieTableConfig.POPULATE_META_FIELDS.key(), String.valueOf(populateMetaFields)) .updated(HoodieWriteConfig.BULK_INSERT_SORT_MODE.key(), sortMode.name()) + if (enableOCCConfigs) { + fooTableModifier = fooTableModifier + .updated("hoodie.write.concurrency.mode","optimistic_concurrency_control") + .updated("hoodie.cleaner.policy.failed.writes","LAZY") + .updated("hoodie.write.lock.provider","org.apache.hudi.client.transaction.lock.InProcessLockProvider") + } + // generate the inserts val schema = DataSourceTestUtils.getStructTypeExampleSchema val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema) @@ -306,6 +313,11 @@ class TestHoodieSparkSqlWriter { testBulkInsertWithSortMode(sortMode, populateMetaFields = true) } + @Test + def testBulkInsertForSortModeWithOCC(): Unit = { + testBulkInsertWithSortMode(BulkInsertSortMode.GLOBAL_SORT, populateMetaFields = true, true) + } + /** * Test case for Bulk insert with populating meta fields or * without populating meta fields.