diff --git a/docs/en/connector-v2/sink/Paimon.md b/docs/en/connector-v2/sink/Paimon.md index 17c1328fdec..f854a3be793 100644 --- a/docs/en/connector-v2/sink/Paimon.md +++ b/docs/en/connector-v2/sink/Paimon.md @@ -78,6 +78,7 @@ libfb303-xxx.jar | paimon.hadoop.conf | Map | No | - | Properties in hadoop conf | | paimon.hadoop.conf-path | String | No | - | The specified loading path for the 'core-site.xml', 'hdfs-site.xml', 'hive-site.xml' files | | paimon.table.non-primary-key | Boolean | false | - | Switch to create `table with PK` or `table without PK`. true : `table without PK`, false : `table with PK` | +| branch | String | No | main | The branch name of Paimon table to write data to. If the branch does not exist, an exception will be thrown. | ## Checkpoint in batch mode diff --git a/docs/zh/connector-v2/sink/Paimon.md b/docs/zh/connector-v2/sink/Paimon.md index 08de11bea5a..34d3e8004d3 100644 --- a/docs/zh/connector-v2/sink/Paimon.md +++ b/docs/zh/connector-v2/sink/Paimon.md @@ -77,6 +77,7 @@ libfb303-xxx.jar | paimon.hadoop.conf | Map | 否 | - | Hadoop配置文件属性信息 | | paimon.hadoop.conf-path | 字符串 | 否 | - | Hadoop配置文件目录,用于加载'core-site.xml', 'hdfs-site.xml', 'hive-site.xml'文件配置 | | paimon.table.non-primary-key | Boolean | false | - | 控制创建主键表或者非主键表. 当为true时,创建非主键表, 为false时,创建主键表 | +| branch | 字符串 | 否 | main | 要写入数据的Paimon表分支名称。如果指定的分支不存在,将抛出异常。 | ## 批模式下的checkpoint diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogFactory.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogFactory.java index 0a4cdb00fca..ffe6d2821ea 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogFactory.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogFactory.java @@ -55,7 +55,8 @@ public OptionRule optionRule() { PaimonSinkOptions.DATA_SAVE_MODE, PaimonSinkOptions.PRIMARY_KEYS, PaimonSinkOptions.PARTITION_KEYS, - PaimonSinkOptions.WRITE_PROPS) + PaimonSinkOptions.WRITE_PROPS, + PaimonSinkOptions.BRANCH) .conditional( PaimonBaseOptions.CATALOG_TYPE, PaimonCatalogEnum.HIVE, diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java index d54a65b80bf..0093cbd89e3 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java @@ -40,6 +40,7 @@ public class PaimonSinkConfig extends PaimonConfig { private final DataSaveMode dataSaveMode; private final CoreOptions.ChangelogProducer changelogProducer; private final String changelogTmpPath; + private final String branch; private final Boolean nonPrimaryKey; private final List primaryKeys; private final List partitionKeys; @@ -79,5 +80,6 @@ public PaimonSinkConfig(ReadonlyConfig readonlyConfig) { this.changelogTmpPath = writeProps.getOrDefault( PaimonSinkOptions.CHANGELOG_TMP_PATH, System.getProperty("java.io.tmpdir")); + this.branch = readonlyConfig.get(PaimonSinkOptions.BRANCH); } } diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkOptions.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkOptions.java index b56215e2d11..e1cd46e14eb 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkOptions.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkOptions.java @@ -68,4 +68,7 @@ public class PaimonSinkOptions extends PaimonBaseOptions { .defaultValue(new HashMap<>()) .withDescription( "Properties passed through to paimon table initialization, such as 'file.format', 'bucket'(org.apache.paimon.CoreOptions)"); + + public static final Option BRANCH = + Options.key("branch").stringType().noDefaultValue().withDescription("branch"); } diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/exception/PaimonConnectorErrorCode.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/exception/PaimonConnectorErrorCode.java index 2f648b6f25f..f369b8cf662 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/exception/PaimonConnectorErrorCode.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/exception/PaimonConnectorErrorCode.java @@ -32,7 +32,8 @@ public enum PaimonConnectorErrorCode implements SeaTunnelErrorCode { WRITE_PROPS_BUCKET_KEY_ERROR("PAIMON-09", "Cannot define 'bucket-key' in dynamic bucket mode"), NON_PRIMARY_KEY_CHECK_ERROR( "PAIMON-10", "Primary keys should be empty when nonPrimaryKey is true"), - DECIMAL_PRECISION_INCOMPATIBLE("PAIMON-11", "decimal type precision is incompatible. "); + DECIMAL_PRECISION_INCOMPATIBLE("PAIMON-11", "decimal type precision is incompatible. "), + BRANCH_NOT_EXISTS("PAIMON-12", "Specified branch: %s does not exist. "); private final String code; private final String description; diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonSaveModeHandler.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonSaveModeHandler.java index 7e93ee3512f..a89edc02c59 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonSaveModeHandler.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonSaveModeHandler.java @@ -17,6 +17,8 @@ package org.apache.seatunnel.connectors.seatunnel.paimon.handler; +import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils; + import org.apache.seatunnel.api.sink.DataSaveMode; import org.apache.seatunnel.api.sink.DefaultSaveModeHandler; import org.apache.seatunnel.api.sink.SchemaSaveMode; @@ -26,6 +28,7 @@ import org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalog; import org.apache.seatunnel.connectors.seatunnel.paimon.sink.SupportLoadTable; +import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; public class PaimonSaveModeHandler extends DefaultSaveModeHandler { @@ -33,6 +36,7 @@ public class PaimonSaveModeHandler extends DefaultSaveModeHandler { private SupportLoadTable supportLoadTable; private Catalog catalog; private CatalogTable catalogTable; + private String branch; public PaimonSaveModeHandler( SupportLoadTable supportLoadTable, @@ -40,11 +44,13 @@ public PaimonSaveModeHandler( DataSaveMode dataSaveMode, Catalog catalog, CatalogTable catalogTable, - String customSql) { + String customSql, + String branch) { super(schemaSaveMode, dataSaveMode, catalog, catalogTable, customSql); this.supportLoadTable = supportLoadTable; this.catalog = catalog; this.catalogTable = catalogTable; + this.branch = branch; } @Override @@ -52,9 +58,11 @@ public void handleSchemaSaveMode() { super.handleSchemaSaveMode(); TablePath tablePath = catalogTable.getTablePath(); Table paimonTable = ((PaimonCatalog) catalog).getPaimonTable(tablePath); - // load paimon table and set it into paimon sink Table loadTable = this.supportLoadTable.getLoadTable(); if (loadTable == null || this.schemaSaveMode == SchemaSaveMode.RECREATE_SCHEMA) { + if (StringUtils.isNotEmpty(branch)) { + paimonTable = ((FileStoreTable) paimonTable).switchToBranch(branch); + } this.supportLoadTable.setLoadTable(paimonTable); } } diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java index 5b185004c60..a6744fc593b 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java @@ -17,6 +17,8 @@ package org.apache.seatunnel.connectors.seatunnel.paimon.sink; +import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils; + import org.apache.seatunnel.api.common.JobContext; import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.serialization.DefaultSerializer; @@ -35,6 +37,8 @@ import org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalog; import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonHadoopConfiguration; import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig; +import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode; +import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException; import org.apache.seatunnel.connectors.seatunnel.paimon.handler.PaimonSaveModeHandler; import org.apache.seatunnel.connectors.seatunnel.paimon.security.PaimonSecurityContext; import org.apache.seatunnel.connectors.seatunnel.paimon.sink.bucket.PaimonBucketAssignerFactory; @@ -43,7 +47,11 @@ import org.apache.seatunnel.connectors.seatunnel.paimon.sink.commit.PaimonCommitInfo; import org.apache.seatunnel.connectors.seatunnel.paimon.sink.state.PaimonSinkState; +import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; +import org.apache.paimon.utils.BranchManager; + +import lombok.extern.slf4j.Slf4j; import java.io.IOException; import java.util.Arrays; @@ -51,6 +59,7 @@ import java.util.Optional; import java.util.UUID; +@Slf4j public class PaimonSink implements SeaTunnelSink< SeaTunnelRow, @@ -66,7 +75,7 @@ public class PaimonSink public static final String PLUGIN_NAME = "Paimon"; - private Table paimonTable; + private FileStoreTable paimonTable; private JobContext jobContext; @@ -92,11 +101,25 @@ public PaimonSink(ReadonlyConfig readonlyConfig, CatalogTable catalogTable) { paimonCatalog.open(); boolean databaseExists = paimonCatalog.databaseExists(this.paimonSinkConfig.getNamespace()); - if (databaseExists) { - TablePath tablePath = catalogTable.getTablePath(); - boolean tableExists = paimonCatalog.tableExists(tablePath); - if (tableExists) { - this.paimonTable = paimonCatalog.getPaimonTable(tablePath); + if (!databaseExists) { + return; + } + TablePath tablePath = catalogTable.getTablePath(); + boolean tableExists = paimonCatalog.tableExists(tablePath); + if (!tableExists) { + return; + } + this.paimonTable = (FileStoreTable) paimonCatalog.getPaimonTable(tablePath); + String branchName = paimonSinkConfig.getBranch(); + if (StringUtils.isNotEmpty(branchName)) { + BranchManager branchManager = paimonTable.branchManager(); + if (!branchManager.branchExists(branchName)) { + throw new PaimonConnectorException( + PaimonConnectorErrorCode.BRANCH_NOT_EXISTS, branchName); + } + if (!branchManager.DEFAULT_MAIN_BRANCH.equalsIgnoreCase(branchName)) { + this.paimonTable = paimonTable.switchToBranch(branchName); + log.info("Switch to branch {}", branchName); } } } @@ -168,12 +191,13 @@ public Optional getSaveModeHandler() { paimonSinkConfig.getDataSaveMode(), paimonCatalog, catalogTable, - null)); + null, + paimonSinkConfig.getBranch())); } @Override public void setLoadTable(Table table) { - this.paimonTable = table; + this.paimonTable = (FileStoreTable) table; } @Override diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java index c71641a0377..b316cdc09a1 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java @@ -59,6 +59,7 @@ public OptionRule optionRule() { PaimonSinkOptions.PRIMARY_KEYS, PaimonSinkOptions.PARTITION_KEYS, PaimonSinkOptions.WRITE_PROPS, + PaimonSinkOptions.BRANCH, SinkConnectorCommonOptions.MULTI_TABLE_SINK_REPLICA) .conditional( PaimonSinkOptions.CATALOG_TYPE, diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java index 4a20d5d04cf..2d8fad38c0f 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java @@ -18,6 +18,7 @@ package org.apache.seatunnel.connectors.seatunnel.paimon.sink; import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting; +import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils; import org.apache.seatunnel.api.common.JobContext; import org.apache.seatunnel.api.configuration.ReadonlyConfig; @@ -58,6 +59,7 @@ import org.apache.paimon.table.sink.StreamTableWrite; import org.apache.paimon.table.sink.TableCommitImpl; import org.apache.paimon.table.sink.TableWrite; +import org.apache.paimon.utils.BranchManager; import lombok.extern.slf4j.Slf4j; @@ -274,6 +276,18 @@ public void applySchemaChange(SchemaChangeEvent event) throws IOException { private void reOpenTableWrite() { this.seaTunnelRowType = this.sourceTableSchema.toPhysicalRowDataType(); this.paimonTable = (FileStoreTable) paimonCatalog.getPaimonTable(paimonTablePath); + String branchName = paimonSinkConfig.getBranch(); + if (StringUtils.isNotEmpty(branchName)) { + BranchManager branchManager = paimonTable.branchManager(); + if (!branchManager.branchExists(branchName)) { + throw new PaimonConnectorException( + PaimonConnectorErrorCode.BRANCH_NOT_EXISTS, branchName); + } + if (!branchManager.DEFAULT_MAIN_BRANCH.equalsIgnoreCase(branchName)) { + this.paimonTable = this.paimonTable.switchToBranch(branchName); + log.info("Re-switched to branch {} after reopening table", branchName); + } + } this.sinkPaimonTableSchema = this.paimonTable.schema(); this.newTableWrite(); } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonIT.java index d5ff43c10ba..129200e4135 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonIT.java @@ -22,6 +22,7 @@ import org.apache.seatunnel.e2e.common.TestResource; import org.apache.seatunnel.e2e.common.TestSuiteBase; import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory; +import org.apache.seatunnel.e2e.common.container.EngineType; import org.apache.seatunnel.e2e.common.container.TestContainer; import org.apache.seatunnel.e2e.common.container.TestContainerId; import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; @@ -29,14 +30,24 @@ import org.apache.seatunnel.e2e.common.util.ContainerUtil; import org.apache.commons.collections.CollectionUtils; +import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.CatalogFactory; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.InternalRow; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.ResolvingFileIO; +import org.apache.paimon.options.Options; import org.apache.paimon.privilege.FileBasedPrivilegeManagerLoader; import org.apache.paimon.privilege.PrivilegeType; import org.apache.paimon.privilege.PrivilegedCatalog; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.reader.RecordReaderIterator; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.table.source.TableRead; +import org.apache.paimon.table.source.TableScan; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; @@ -64,6 +75,17 @@ public class PaimonIT extends TestSuiteBase implements TestResource { private final String DATABASE_NAME = "default"; private final String TABLE_NAME = "st_test_p"; + private static final String NAMESPACE = "paimon"; + protected static String hostName = System.getProperty("user.name"); + protected static final String CONTAINER_VOLUME_MOUNT_PATH = "/tmp/seatunnel_mnt"; + + protected static final boolean isWindows = + System.getProperties().getProperty("os.name").toUpperCase().contains("WINDOWS"); + public static final String HOST_VOLUME_MOUNT_PATH = + isWindows + ? String.format("C:/Users/%s/tmp/seatunnel_mnt", hostName) + : CONTAINER_VOLUME_MOUNT_PATH; + @TestContainerExtension private final ContainerExtendedFactory extendedFactory = container -> { @@ -204,4 +226,63 @@ public void jobFinishedCleanTmpFiles(TestContainer container) throws Exception { List files = FileUtils.listFile(tmpDir); Assertions.assertTrue(CollectionUtils.isEmpty(files)); } + + @DisabledOnContainer( + value = {}, + type = {EngineType.SPARK, EngineType.FLINK}, + disabledReason = + "Spark and Flink engine can not auto create paimon table on worker node in local file(e.g flink tm) by savemode feature which can lead error") + @TestTemplate + public void testSinkBranch(TestContainer container) throws Exception { + + String testBranchName = "test_branch"; + FileStoreTable table = (FileStoreTable) getTable(DATABASE_NAME, TABLE_NAME); + List branches = table.branchManager().branches(); + if (!branches.contains(testBranchName)) { + table.createBranch(testBranchName); + } + Container.ExecResult textWriteResult = container.executeJob("/fake_to_paimon_branch.conf"); + Assertions.assertEquals(0, textWriteResult.getExitCode()); + long rowCount = getTableRowCount(table); + Assertions.assertEquals(0, rowCount); + + FileStoreTable fileStoreTableWithBranch = table.switchToBranch(testBranchName); + rowCount = getTableRowCount(fileStoreTableWithBranch); + Assertions.assertEquals(10001, rowCount); + } + + private Table getTable(String dbName, String tbName) { + Options options = new Options(); + String warehouse = + String.format( + "%s%s/%s", isWindows ? "" : "file://", HOST_VOLUME_MOUNT_PATH, NAMESPACE); + options.set("warehouse", warehouse); + try { + Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(options)); + return catalog.getTable(Identifier.create(dbName, tbName)); + } catch (Catalog.TableNotExistException e) { + throw new RuntimeException("table not exist"); + } + } + + private long getTableRowCount(FileStoreTable table) { + try { + ReadBuilder readBuilder = table.newReadBuilder(); + TableScan.Plan plan = readBuilder.newScan().plan(); + TableRead tableRead = readBuilder.newRead(); + + long count = 0; + try (RecordReader reader = tableRead.createReader(plan); + RecordReaderIterator iterator = + new RecordReaderIterator<>(reader)) { + while (iterator.hasNext()) { + iterator.next(); + count++; + } + } + return count; + } catch (Exception e) { + throw new RuntimeException("Failed to read data count from table", e); + } + } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_branch.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_branch.conf new file mode 100644 index 00000000000..ddff49d3579 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_branch.conf @@ -0,0 +1,63 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + FakeSource { + auto.increment.enabled = true + auto.increment.start = 1 + row.num = 10001 + schema = { + fields { + pk_id = bigint + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_decimal = "decimal(30, 8)" + c_bytes = bytes + c_date = date + c_timestamp = timestamp + c_time = time + } + primaryKey { + name = "pk_id" + columnNames = [pk_id] + } + } + plugin_output = "fake" + } +} + +sink { + Paimon { + warehouse = "/tmp/seatunnel_mnt/paimon" + database = "default" + table = "st_test_p" + branch = "test_branch" + } +}