diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 41ea4ebe4154a..855a2ca1b4d0d 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -98,15 +98,42 @@ public static DataStreamSink<Object> append(Configuration conf, RowType rowType,
         .name("dummy");
   }
 
+  /**
+   * Constructs bootstrap pipeline as streaming.
+   */
+  public static DataStream<HoodieRecord> bootstrap(
+      Configuration conf,
+      RowType rowType,
+      int defaultParallelism,
+      DataStream<RowData> dataStream) {
+    return bootstrap(conf, rowType, defaultParallelism, dataStream, false, false);
+  }
+
+  /**
+   * Constructs bootstrap pipeline.
+   *
+   * @param conf               The configuration
+   * @param rowType            The row type
+   * @param defaultParallelism The default parallelism
+   * @param dataStream         The data stream
+   * @param bounded            Whether the source is bounded
+   * @param overwrite          Whether it is insert overwrite
+   */
   public static DataStream<HoodieRecord> bootstrap(
       Configuration conf,
       RowType rowType,
       int defaultParallelism,
       DataStream<RowData> dataStream,
-      boolean bounded) {
-    return bounded
-        ? boundedBootstrap(conf, rowType, defaultParallelism, dataStream)
-        : streamBootstrap(conf, rowType, defaultParallelism, dataStream);
+      boolean bounded,
+      boolean overwrite) {
+    final boolean globalIndex = conf.getBoolean(FlinkOptions.INDEX_GLOBAL_ENABLED);
+    if (overwrite) {
+      return rowDataToHoodieRecord(conf, rowType, dataStream);
+    } else if (bounded && !globalIndex) {
+      return boundedBootstrap(conf, rowType, defaultParallelism, dataStream);
+    } else {
+      return streamBootstrap(conf, rowType, defaultParallelism, dataStream);
+    }
   }
 
   private static DataStream<HoodieRecord> streamBootstrap(
diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
index bb545ad896ac9..3e567f31fa63a 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
@@ -96,7 +96,7 @@ public static void main(String[] args) throws Exception {
       }
     }
 
-    DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream, false);
+    DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream);
     DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
     if (StreamerUtil.needsAsyncCompaction(conf)) {
       Pipelines.compact(conf, pipeline);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
index e5f097c010072..c1e6d0c28aa06 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -86,7 +86,8 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
       DataStream<Object> pipeline;
 
       // bootstrap
-      final DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream, context.isBounded());
+      final DataStream<HoodieRecord> hoodieRecordDataStream =
+          Pipelines.bootstrap(conf, rowType, parallelism, dataStream, context.isBounded(), overwrite);
       // write pipeline
       pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
       // compaction
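Note (illustrative, not part of the patch): the six-argument overload above now routes between three code paths instead of two. The sketch below reduces that dispatch to a standalone decision function so the routing is easy to read at a glance; BootstrapRouting, BootstrapPath and route are hypothetical names, and the three enum constants stand in for rowDataToHoodieRecord, boundedBootstrap and streamBootstrap in Pipelines.

class BootstrapRouting {
  // Stand-ins for the three dispatch targets of Pipelines#bootstrap.
  enum BootstrapPath { ROW_DATA_TO_HOODIE_RECORD, BOUNDED_BOOTSTRAP, STREAM_BOOTSTRAP }

  static BootstrapPath route(boolean bounded, boolean overwrite, boolean globalIndex) {
    if (overwrite) {
      // Insert overwrite replaces the target data set, so no index bootstrap is
      // needed: rows are mapped straight to HoodieRecords.
      return BootstrapPath.ROW_DATA_TO_HOODIE_RECORD;
    } else if (bounded && !globalIndex) {
      // A bounded source without the global index can take the batch bootstrap.
      return BootstrapPath.BOUNDED_BOOTSTRAP;
    } else {
      // Unbounded sources fall through here, and so does any job with
      // FlinkOptions.INDEX_GLOBAL_ENABLED set: the bounded bootstrap is
      // bypassed when the index is global.
      return BootstrapPath.STREAM_BOOTSTRAP;
    }
  }
}

The new four-argument overload is the streaming entry point: it fixes bounded = false and overwrite = false, which is why HoodieFlinkStreamer above and the stream-write tests below drop their explicit boolean argument.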
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
index 2c8fb490a8781..028c058eedafc 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
@@ -168,7 +168,7 @@ public void testMergeOnReadWriteWithCompaction() throws Exception {
         .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
         .setParallelism(parallelism);
 
-    DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream, false);
+    DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream);
     DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
     Pipelines.clean(conf, pipeline);
     Pipelines.compact(conf, pipeline);
@@ -225,7 +225,7 @@ private void testWriteToHoodie(
     }
 
     int parallelism = execEnv.getParallelism();
-    DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream, false);
+    DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream);
     DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
 
     execEnv.addOperator(pipeline.getTransformation());
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index 8e366bba4dfc9..285df4931e1d2 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -606,6 +606,36 @@ void testBatchUpsertWithMiniBatches(ExecMode execMode, HoodieTableType tableType
     assertRowsEquals(result, "[+I[id1, Sophia, 18, 1970-01-01T00:00:05, par1]]");
   }
 
+  @ParameterizedTest
+  @MethodSource("executionModeAndTableTypeParams")
+  void testBatchUpsertWithMiniBatchesGlobalIndex(ExecMode execMode, HoodieTableType tableType) {
+    TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv;
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.WRITE_BATCH_SIZE, "0.001")
+        .option(FlinkOptions.TABLE_TYPE, tableType)
+        .option(FlinkOptions.INDEX_GLOBAL_ENABLED, true)
+        .end();
+    tableEnv.executeSql(hoodieTableDDL);
+
+    final String insertInto1 = "insert into t1 values\n"
+        + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1')";
+
+    execInsertSql(tableEnv, insertInto1);
+
+    final String insertInto2 = "insert into t1 values\n"
+        + "('id1','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par2'),\n"
+        + "('id1','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par1'),\n"
+        + "('id1','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),\n"
+        + "('id1','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3')";
+
+    execInsertSql(tableEnv, insertInto2);
+
+    List<Row> result = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+    assertRowsEquals(result, "[+I[id1, Sophia, 18, 1970-01-01T00:00:05, par3]]");
+  }
+
   @Test
   void testUpdateWithDefaultHoodieRecordPayload() {
     TableEnvironment tableEnv = batchTableEnv;
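Note on the new test (commentary, not part of the patch): with FlinkOptions.INDEX_GLOBAL_ENABLED set, the second upsert of key id1 into partition par3 is expected to move the record across partitions, leaving a single row in par3; compare testBatchUpsertWithMiniBatches above, which runs without the global index and expects the row to stay in par1. Below is a minimal end-user sketch of the same setup in plain Flink SQL, assuming 'index.global.enabled' is the config key behind FlinkOptions.INDEX_GLOBAL_ENABLED; the path is hypothetical and the schema mirrors the ITCase table.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class GlobalIndexUpsertExample {
  public static void main(String[] args) {
    TableEnvironment tableEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().inBatchMode().build());
    tableEnv.executeSql(
        "create table t1(\n"
            + "  uuid varchar(20),\n"
            + "  name varchar(10),\n"
            + "  age int,\n"
            + "  ts timestamp(3),\n"
            + "  `partition` varchar(20)\n"
            + ") partitioned by (`partition`) with (\n"
            + "  'connector' = 'hudi',\n"
            + "  'path' = '/tmp/t1',\n"             // hypothetical table path
            + "  'index.global.enabled' = 'true'\n" // assumed option key spelling
            + ")");
    // Upserting uuid 'id1' first into par1, then into par3, should end with
    // exactly one copy of the key, located in par3 -- the behavior the test asserts.
  }
}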