diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/HoodiePipeline.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/HoodiePipeline.java
new file mode 100644
index 0000000000000..d23a278876d1e
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/HoodiePipeline.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.table.HoodieTableFactory;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.source.DataStreamScanProvider;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
+import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * A tool class to construct a hoodie Flink pipeline.
+ *
+ *
+ * <p>How to use?
+ * Method {@link #builder(String)} returns a pipeline builder. The builder
+ * can then define the hudi table columns, primary keys and partitions.
+ *
+ * An example:
+ *
+ * HoodiePipeline.Builder builder = HoodiePipeline.builder("myTable");
+ * DataStreamSink<?> sinkStream = builder
+ * .column("f0 int")
+ * .column("f1 varchar(10)")
+ * .column("f2 varchar(20)")
+ * .pk("f0,f1")
+ * .partition("f2")
+ * .sink(input, false);
+ *
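+ * A source pipeline can be sketched the same way (the table name, path and
+ * environment below are illustrative only):
+ *
+ * HoodiePipeline.Builder builder = HoodiePipeline.builder("myTable");
+ * DataStream<RowData> source = builder
+ * .column("f0 int")
+ * .column("f1 varchar(10)")
+ * .pk("f0")
+ * .option("path", "/tmp/hudi/myTable")
+ * .source(execEnv);
+ *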
+ */
+public class HoodiePipeline {
+
+ private static final Logger LOG = LogManager.getLogger(HoodiePipeline.class);
+
+ /**
+ * Returns the builder for hoodie pipeline construction.
+ */
+ public static Builder builder(String tableName) {
+ return new Builder(tableName);
+ }
+
+ /**
+ * Builder for hudi source/sink pipeline construction.
+ */
+ public static class Builder {
+ private final String tableName;
+ private final List<String> columns;
+ private final Map<String, String> options;
+
+ private String pk;
+ private List<String> partitions;
+
+ private Builder(String tableName) {
+ this.tableName = tableName;
+ this.columns = new ArrayList<>();
+ this.options = new HashMap<>();
+ this.partitions = new ArrayList<>();
+ }
+
+ /**
+ * Add a table column definition.
+ *
+ * @param column the column definition, e.g. 'f0 int'
+ */
+ public Builder column(String column) {
+ this.columns.add(column);
+ return this;
+ }
+
+ /**
+ * Add primary keys.
+ */
+ public Builder pk(String... pks) {
+ this.pk = String.join(",", pks);
+ return this;
+ }
+
+ /**
+ * Add partition fields.
+ */
+ public Builder partition(String... partitions) {
+ this.partitions = new ArrayList<>(Arrays.asList(partitions));
+ return this;
+ }
+
+ /**
+ * Add a config option.
+ */
+ public Builder option(ConfigOption<?> option, Object val) {
+ this.options.put(option.key(), val.toString());
+ return this;
+ }
+
+ public Builder option(String key, Object val) {
+ this.options.put(key, val.toString());
+ return this;
+ }
+
+ public Builder options(Map<String, String> options) {
+ this.options.putAll(options);
+ return this;
+ }
+
+ public DataStreamSink<?> sink(DataStream<RowData> input, boolean bounded) {
+ TableDescriptor tableDescriptor = getTableDescriptor();
+ return HoodiePipeline.sink(input, tableDescriptor.getTableId(), tableDescriptor.getResolvedCatalogTable(), bounded);
+ }
+
+ public TableDescriptor getTableDescriptor() {
+ EnvironmentSettings environmentSettings = EnvironmentSettings
+ .newInstance()
+ .build();
+ TableEnvironmentImpl tableEnv = TableEnvironmentImpl.create(environmentSettings);
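+ // render the builder state as CREATE TABLE DDL and register it in a throwaway table environment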
+ String sql = getCreateHoodieTableDDL(this.tableName, this.columns, this.options, this.pk, this.partitions);
+ tableEnv.executeSql(sql);
+ String currentCatalog = tableEnv.getCurrentCatalog();
+ ResolvedCatalogTable catalogTable = null;
+ String defaultDatabase = null;
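+ // resolve the just-created table from the default database of the current catalog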
+ try {
+ Catalog catalog = tableEnv.getCatalog(currentCatalog).get();
+ defaultDatabase = catalog.getDefaultDatabase();
+ catalogTable = (ResolvedCatalogTable) catalog.getTable(new ObjectPath(defaultDatabase, this.tableName));
+ } catch (TableNotExistException e) {
+ throw new HoodieException("Failed to create table " + this.tableName, e);
+ }
+ ObjectIdentifier tableId = ObjectIdentifier.of(currentCatalog, defaultDatabase, this.tableName);
+ return new TableDescriptor(tableId, catalogTable);
+ }
+
+ public DataStream<RowData> source(StreamExecutionEnvironment execEnv) {
+ TableDescriptor tableDescriptor = getTableDescriptor();
+ return HoodiePipeline.source(execEnv, tableDescriptor.tableId, tableDescriptor.getResolvedCatalogTable());
+ }
+ }
+
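+ /**
+ * Renders the builder state as Flink SQL DDL; the shape of the output is, for
+ * illustrative column and option values:
+ *
+ * <pre>
+ * create table t1(
+ * f0 int,
+ * f1 varchar(10),
+ * PRIMARY KEY(f0) NOT ENFORCED
+ * )
+ * PARTITIONED BY (`f1`)
+ * with ('connector' = 'hudi',
+ * 'path' = '/tmp/hudi/t1'
+ * )
+ * </pre>
+ */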
+ private static String getCreateHoodieTableDDL(
+ String tableName,
+ List<String> fields,
+ Map<String, String> options,
+ String pkField,
+ List<String> partitionField) {
+ StringBuilder builder = new StringBuilder();
+ builder.append("create table ")
+ .append(tableName)
+ .append("(\n");
+ for (String field : fields) {
+ builder.append(" ")
+ .append(field)
+ .append(",\n");
+ }
+ builder.append(" PRIMARY KEY(")
+ .append(pkField)
+ .append(") NOT ENFORCED\n")
+ .append(")\n");
+ if (!partitionField.isEmpty()) {
+ String partitions = partitionField
+ .stream()
+ .map(partitionName -> "`" + partitionName + "`")
+ .collect(Collectors.joining(","));
+ builder.append("PARTITIONED BY (")
+ .append(partitions)
+ .append(")\n");
+ }
+ builder.append("with ('connector' = 'hudi'");
+ options.forEach((k, v) -> builder
+ .append(",\n")
+ .append(" '")
+ .append(k)
+ .append("' = '")
+ .append(v)
+ .append("'"));
+ builder.append("\n)");
+ return builder.toString();
+ }
+
+ /**
+ * Returns the data stream sink for the given catalog table.
+ *
+ * @param input The input datastream
+ * @param tablePath The table path to the hoodie table in the catalog
+ * @param catalogTable The hoodie catalog table
+ * @param isBounded A flag indicating whether the input data stream is bounded
+ */
+ private static DataStreamSink<?> sink(DataStream<RowData> input, ObjectIdentifier tablePath, ResolvedCatalogTable catalogTable, boolean isBounded) {
+ FactoryUtil.DefaultDynamicTableContext context = new FactoryUtil.DefaultDynamicTableContext(tablePath, catalogTable,
+ Configuration.fromMap(catalogTable.getOptions()), Thread.currentThread().getContextClassLoader(), false);
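+ // build the dynamic table sink through the Hudi table factory and hook the input stream into its runtime provider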
+ HoodieTableFactory hoodieTableFactory = new HoodieTableFactory();
+ return ((DataStreamSinkProvider) hoodieTableFactory.createDynamicTableSink(context)
+ .getSinkRuntimeProvider(new SinkRuntimeProviderContext(isBounded)))
+ .consumeDataStream(input);
+ }
+
+ /**
+ * Returns the data stream source for the given catalog table.
+ *
+ * @param execEnv The execution environment
+ * @param tablePath The table path to the hoodie table in the catalog
+ * @param catalogTable The hoodie catalog table
+ */
+ private static DataStream<RowData> source(StreamExecutionEnvironment execEnv, ObjectIdentifier tablePath, ResolvedCatalogTable catalogTable) {
+ FactoryUtil.DefaultDynamicTableContext context = new FactoryUtil.DefaultDynamicTableContext(tablePath, catalogTable,
+ Configuration.fromMap(catalogTable.getOptions()), Thread.currentThread().getContextClassLoader(), false);
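+ // build the dynamic table source through the Hudi table factory and let its scan provider produce the data stream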
+ HoodieTableFactory hoodieTableFactory = new HoodieTableFactory();
+ DataStreamScanProvider dataStreamScanProvider = (DataStreamScanProvider) ((ScanTableSource) hoodieTableFactory
+ .createDynamicTableSource(context))
+ .getScanRuntimeProvider(new ScanRuntimeProviderContext());
+ return dataStreamScanProvider.produceDataStream(execEnv);
+ }
+
+ /**
+ * A POJO that contains tableId and resolvedCatalogTable.
+ */
+ public static class TableDescriptor {
+ private ObjectIdentifier tableId;
+ private ResolvedCatalogTable resolvedCatalogTable;
+
+ public TableDescriptor(ObjectIdentifier tableId, ResolvedCatalogTable resolvedCatalogTable) {
+ this.tableId = tableId;
+ this.resolvedCatalogTable = resolvedCatalogTable;
+ }
+
+ public ObjectIdentifier getTableId() {
+ return tableId;
+ }
+
+ public ResolvedCatalogTable getResolvedCatalogTable() {
+ return resolvedCatalogTable;
+ }
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
index a25f0149c149b..9ed0dfb807eb5 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
@@ -26,9 +26,11 @@
import org.apache.hudi.sink.transform.Transformer;
import org.apache.hudi.sink.utils.Pipelines;
import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.HoodiePipeline;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
+import org.apache.hudi.utils.TestUtils;
import org.apache.hudi.utils.source.ContinuousFileSource;
import org.apache.flink.api.common.JobStatus;
@@ -57,6 +59,7 @@
import java.io.File;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -326,4 +329,127 @@ private void testWriteToHoodieWithCluster(
TestData.checkWrittenFullData(tempFile, expected);
}
+
+ public void execute(StreamExecutionEnvironment execEnv, boolean isMor, String jobName) throws Exception {
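+ // MOR pipelines keep running for async compaction, so wait a while and cancel; otherwise block until the job finishes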
+ JobClient client = execEnv.executeAsync(jobName);
+ if (isMor) {
+ if (client.getJobStatus().get() != JobStatus.FAILED) {
+ try {
+ TimeUnit.SECONDS.sleep(20); // wait long enough for the compaction to finish
+ client.cancel();
+ } catch (Throwable ignored) {
+ // ignored
+ }
+ }
+ } else {
+ // wait for the streaming job to finish
+ client.getJobExecutionResult().get();
+ }
+ }
+
+ @Test
+ public void testHoodiePipelineBuilderSource() throws Exception {
+ // create a StreamExecutionEnvironment instance.
+ StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+ execEnv.getConfig().disableObjectReuse();
+ execEnv.setParallelism(1);
+ // set up checkpoint interval
+ execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
+ execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+ Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+ conf.setString(FlinkOptions.TABLE_NAME, "t1");
+ conf.setString(FlinkOptions.TABLE_TYPE, "MERGE_ON_READ");
+
+ // write 3 batches of data
+ TestData.writeData(TestData.dataSetInsert(1, 2), conf);
+ TestData.writeData(TestData.dataSetInsert(3, 4), conf);
+ TestData.writeData(TestData.dataSetInsert(5, 6), conf);
+
+ String latestCommit = TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath());
+
+ Map<String, String> options = new HashMap<>();
+ options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+ options.put(FlinkOptions.READ_START_COMMIT.key(), latestCommit);
+
+ // read the hoodie table with the low-level source API.
+ HoodiePipeline.Builder builder = HoodiePipeline.builder("test_source")
+ .column("uuid string not null")
+ .column("name string")
+ .column("age int")
+ .column("`ts` timestamp(3)")
+ .column("`partition` string")
+ .pk("uuid")
+ .partition("partition")
+ .options(options);
+ DataStream<RowData> rowDataDataStream = builder.source(execEnv);
+ List<RowData> result = new ArrayList<>();
+ rowDataDataStream.executeAndCollect().forEachRemaining(result::add);
+ TimeUnit.SECONDS.sleep(2); // wait a couple of seconds for the collection to finish
+ TestData.assertRowDataEquals(result, TestData.dataSetInsert(5, 6));
+ }
+
+ @Test
+ public void testHoodiePipelineBuilderSink() throws Exception {
+ StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+ Map<String, String> options = new HashMap<>();
+ execEnv.getConfig().disableObjectReuse();
+ execEnv.setParallelism(4);
+ // set up checkpoint interval
+ execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
+ execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+
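+ // these options back both the file-source schema configuration below and the Hudi sink builder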
+ options.put(FlinkOptions.INDEX_TYPE.key(), "FLINK_STATE");
+ options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+ options.put(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.key(), "4");
+ options.put("table.type", HoodieTableType.MERGE_ON_READ.name());
+ options.put(FlinkOptions.INDEX_KEY_FIELD.key(), "id");
+ options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "1");
+ options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
+ options.put(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH.key(), Objects.requireNonNull(Thread.currentThread().getContextClassLoader().getResource("test_read_schema.avsc")).toString());
+ Configuration conf = Configuration.fromMap(options);
+ // Read from file source
+ RowType rowType =
+ (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
+ .getLogicalType();
+
+ JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
+ rowType,
+ InternalTypeInfo.of(rowType),
+ false,
+ true,
+ TimestampFormat.ISO_8601
+ );
+ String sourcePath = Objects.requireNonNull(Thread.currentThread()
+ .getContextClassLoader().getResource("test_source.data")).toString();
+
+ TextInputFormat format = new TextInputFormat(new Path(sourcePath));
+ format.setFilesFilter(FilePathFilter.createDefaultFilter());
+ TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
+ format.setCharsetName("UTF-8");
+
+ DataStream<RowData> dataStream = execEnv
+ // use PROCESS_CONTINUOUSLY mode to trigger checkpoint
+ .readFile(format, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, typeInfo)
+ .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
+ .setParallelism(1);
+
+ // sink to the hoodie table with the low-level sink API.
+ HoodiePipeline.Builder builder = HoodiePipeline.builder("test_sink")
+ .column("uuid string not null")
+ .column("name string")
+ .column("age int")
+ .column("`ts` timestamp(3)")
+ .column("`partition` string")
+ .pk("uuid")
+ .partition("partition")
+ .options(options);
+
+ builder.sink(dataStream, false);
+
+ execute(execEnv, true, "Api_Sink_Test");
+ TestData.checkWrittenFullData(tempFile, EXPECTED);
+ }
+
}