From e746da9b8a33a14c9157fefa8410e453a7ad9b6d Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Sat, 25 Nov 2023 12:30:55 -0800 Subject: [PATCH] [HUDI-7139] Fix operation type for bulk insert with row writer in Hudi Streamer --- ...amerDatasetBulkInsertCommitActionExecutor.java | 10 ++-------- .../deltastreamer/TestHoodieDeltaStreamer.java | 15 ++++++++++++--- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/HoodieStreamerDatasetBulkInsertCommitActionExecutor.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/HoodieStreamerDatasetBulkInsertCommitActionExecutor.java index 5593a95ca393a..2a5113538e4d5 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/HoodieStreamerDatasetBulkInsertCommitActionExecutor.java +++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/HoodieStreamerDatasetBulkInsertCommitActionExecutor.java @@ -26,9 +26,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.table.action.HoodieWriteMetadata; -import org.apache.spark.api.java.JavaRDD; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; @@ -44,12 +42,8 @@ public HoodieStreamerDatasetBulkInsertCommitActionExecutor(HoodieWriteConfig con @Override protected void preExecute() { - // no op - } - - @Override - protected void afterExecute(HoodieWriteMetadata> result) { - // no op + table.validateInsertSchema(); + writeClient.preWrite(instantTime, getWriteOperationType(), table.getMetaClient()); } @Override diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java index f5304cce80820..62aa7328fbbb0 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java @@ -1377,7 +1377,10 @@ private void testBulkInsertRowWriterMultiBatches(Boolean useSchemaProvider, List if (i == 2 || i == 4) { // this validation reloads the timeline. So, we are validating only for first and last batch. // validate commit metadata for all completed commits to have valid schema in extra metadata. HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(jsc.hadoopConfiguration()).build(); - metaClient.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants().getInstants().forEach(entry -> assertValidSchemaInCommitMetadata(entry, metaClient)); + metaClient.reloadActiveTimeline().getCommitsTimeline() + .filterCompletedInstants().getInstants() + .forEach(entry -> assertValidSchemaAndOperationTypeInCommitMetadata( + entry, metaClient, WriteOperationType.BULK_INSERT)); } } } finally { @@ -1743,15 +1746,21 @@ private void testParquetDFSSource(boolean useSchemaProvider, List transf assertRecordCount(parquetRecordsCount + 100, tableBasePath, sqlContext); // validate commit metadata for all completed commits to have valid schema in extra metadata. HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(jsc.hadoopConfiguration()).build(); - metaClient.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants().getInstants().forEach(entry -> assertValidSchemaInCommitMetadata(entry, metaClient)); + metaClient.reloadActiveTimeline().getCommitsTimeline() + .filterCompletedInstants().getInstants() + .forEach(entry -> assertValidSchemaAndOperationTypeInCommitMetadata( + entry, metaClient, WriteOperationType.INSERT)); testNum++; } - private void assertValidSchemaInCommitMetadata(HoodieInstant instant, HoodieTableMetaClient metaClient) { + private void assertValidSchemaAndOperationTypeInCommitMetadata(HoodieInstant instant, + HoodieTableMetaClient metaClient, + WriteOperationType operationType) { try { HoodieCommitMetadata commitMetadata = HoodieCommitMetadata .fromBytes(metaClient.getActiveTimeline().getInstantDetails(instant).get(), HoodieCommitMetadata.class); assertFalse(StringUtils.isNullOrEmpty(commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY))); + assertEquals(operationType, commitMetadata.getOperationType()); } catch (IOException ioException) { throw new HoodieException("Failed to parse commit metadata for " + instant.toString()); }