diff --git a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertNode.scala b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertNode.scala
index b8c46cad3fd69..b0bec48a40e83 100644
--- a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertNode.scala
+++ b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertNode.scala
@@ -47,7 +47,7 @@ class SparkInsertNode(dagNodeConfig: Config) extends DagNode[RDD[WriteStatus]] {
   override def execute(context: ExecutionContext, curItrCount: Int): Unit = {
     if (!config.isDisableGenerate) {
       println("Generating input data for node {}", this.getName)
-      context.getDeltaGenerator().writeRecords(context.getDeltaGenerator().generateInserts(config)).count()
+      writeRecords(context)
     }
     val inputDF = AvroConversionUtils.createDataFrame(context.getWriterContext.getHoodieTestSuiteWriter.getNextBatch,
       context.getWriterContext.getHoodieTestSuiteWriter.getSchema,
@@ -68,4 +68,8 @@ class SparkInsertNode(dagNodeConfig: Config) extends DagNode[RDD[WriteStatus]] {
   def getOperation(): String = {
     DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL
   }
+
+  def writeRecords(context: ExecutionContext): Unit = {
+    context.getDeltaGenerator().writeRecords(context.getDeltaGenerator().generateInserts(config)).count()
+  }
 }
diff --git a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkUpsertNode.scala b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkUpsertNode.scala
index 113de93adbb3a..f83bc55633db2 100644
--- a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkUpsertNode.scala
+++ b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkUpsertNode.scala
@@ -20,6 +20,7 @@ package org.apache.hudi.integ.testsuite.dag.nodes
 
 import org.apache.hudi.DataSourceWriteOptions
 import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config
+import org.apache.hudi.integ.testsuite.dag.ExecutionContext
 
 /**
  * Spark datasource based upsert node
@@ -31,4 +32,8 @@ class SparkUpsertNode(dagNodeConfig: Config) extends SparkInsertNode(dagNodeConf
 
   override def getOperation(): String = {
     DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL
   }
+
+  override def writeRecords(context: ExecutionContext): Unit = {
+    context.getDeltaGenerator().writeRecords(context.getDeltaGenerator().generateUpdates(config)).count()
+  }
 }
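
For readers skimming the diff: the change extracts the record-generation call into an overridable writeRecords hook, so SparkUpsertNode can reuse SparkInsertNode's execute flow while swapping generateInserts for generateUpdates. Below is a minimal, standalone sketch of that template-method shape; the class and method names in the sketch are illustrative only and are not Hudi APIs.

// Standalone sketch (not Hudi code): the template-method pattern used above.
// The base node fixes the execute() skeleton and delegates the varying step,
// writeRecords(), to subclasses.
object NodeSketch {

  class InsertNode {
    // Shared execution skeleton; subclasses do not touch this.
    def execute(): Unit = {
      writeRecords() // overridable generation step
      println(s"writing batch with operation '${getOperation()}'")
    }

    // Default step: generate inserts.
    def writeRecords(): Unit = println("generating inserts")

    def getOperation(): String = "insert"
  }

  class UpsertNode extends InsertNode {
    // Only the varying steps change; execute() is inherited unchanged.
    override def writeRecords(): Unit = println("generating updates")

    override def getOperation(): String = "upsert"
  }

  def main(args: Array[String]): Unit = {
    new InsertNode().execute() // generating inserts, then operation 'insert'
    new UpsertNode().execute() // generating updates, then operation 'upsert'
  }
}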