Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/en/connector-v2/sink/Paimon.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ libfb303-xxx.jar
| paimon.hadoop.conf | Map | No | - | Properties in hadoop conf |
| paimon.hadoop.conf-path | String | No | - | The specified loading path for the 'core-site.xml', 'hdfs-site.xml', 'hive-site.xml' files |
| paimon.table.non-primary-key | Boolean | No | false | Switch to create `table with PK` or `table without PK`. true : `table without PK`, false : `table with PK` |
| branch | String | No | main | The branch name of Paimon table to write data to. If the branch does not exist, an exception will be thrown. |


## Checkpoint in batch mode
Expand Down
1 change: 1 addition & 0 deletions docs/zh/connector-v2/sink/Paimon.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ libfb303-xxx.jar
| paimon.hadoop.conf | Map | 否 | - | Hadoop配置文件属性信息 |
| paimon.hadoop.conf-path | 字符串 | 否 | - | Hadoop配置文件目录,用于加载'core-site.xml', 'hdfs-site.xml', 'hive-site.xml'文件配置 |
| paimon.table.non-primary-key | Boolean | 否 | false | 控制创建主键表或者非主键表. 当为true时,创建非主键表, 为false时,创建主键表 |
| branch | 字符串 | 否 | main | 要写入数据的Paimon表分支名称。如果指定的分支不存在,将抛出异常。 |

## 批模式下的checkpoint

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ public OptionRule optionRule() {
PaimonSinkOptions.DATA_SAVE_MODE,
PaimonSinkOptions.PRIMARY_KEYS,
PaimonSinkOptions.PARTITION_KEYS,
PaimonSinkOptions.WRITE_PROPS)
PaimonSinkOptions.WRITE_PROPS,
PaimonSinkOptions.BRANCH)
.conditional(
PaimonBaseOptions.CATALOG_TYPE,
PaimonCatalogEnum.HIVE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class PaimonSinkConfig extends PaimonConfig {
private final DataSaveMode dataSaveMode;
private final CoreOptions.ChangelogProducer changelogProducer;
private final String changelogTmpPath;
private final String branch;
private final Boolean nonPrimaryKey;
private final List<String> primaryKeys;
private final List<String> partitionKeys;
Expand Down Expand Up @@ -79,5 +80,6 @@ public PaimonSinkConfig(ReadonlyConfig readonlyConfig) {
this.changelogTmpPath =
writeProps.getOrDefault(
PaimonSinkOptions.CHANGELOG_TMP_PATH, System.getProperty("java.io.tmpdir"));
this.branch = readonlyConfig.get(PaimonSinkOptions.BRANCH);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,7 @@ public class PaimonSinkOptions extends PaimonBaseOptions {
.defaultValue(new HashMap<>())
.withDescription(
"Properties passed through to paimon table initialization, such as 'file.format', 'bucket'(org.apache.paimon.CoreOptions)");

/**
 * Name of the Paimon table branch the sink writes to.
 *
 * <p>The option has no default value; when it is left unset (or empty) the sink does not switch
 * branches and keeps writing to the table as loaded from the catalog.
 */
public static final Option<String> BRANCH =
        Options.key("branch")
                .stringType()
                .noDefaultValue()
                .withDescription(
                        "The branch name of the Paimon table to write data to. When not set, data is written to the main branch. An exception is thrown if the specified branch does not exist.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ public enum PaimonConnectorErrorCode implements SeaTunnelErrorCode {
WRITE_PROPS_BUCKET_KEY_ERROR("PAIMON-09", "Cannot define 'bucket-key' in dynamic bucket mode"),
NON_PRIMARY_KEY_CHECK_ERROR(
"PAIMON-10", "Primary keys should be empty when nonPrimaryKey is true"),
DECIMAL_PRECISION_INCOMPATIBLE("PAIMON-11", "decimal type precision is incompatible. ");
DECIMAL_PRECISION_INCOMPATIBLE("PAIMON-11", "decimal type precision is incompatible. "),
BRANCH_NOT_EXISTS("PAIMON-12", "Specified branch: %s does not exist. ");

private final String code;
private final String description;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.seatunnel.connectors.seatunnel.paimon.handler;

import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;

import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.DefaultSaveModeHandler;
import org.apache.seatunnel.api.sink.SchemaSaveMode;
Expand All @@ -26,35 +28,41 @@
import org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalog;
import org.apache.seatunnel.connectors.seatunnel.paimon.sink.SupportLoadTable;

import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;

public class PaimonSaveModeHandler extends DefaultSaveModeHandler {

private SupportLoadTable<Table> supportLoadTable;
private Catalog catalog;
private CatalogTable catalogTable;
private String branch;

public PaimonSaveModeHandler(
SupportLoadTable supportLoadTable,
SchemaSaveMode schemaSaveMode,
DataSaveMode dataSaveMode,
Catalog catalog,
CatalogTable catalogTable,
String customSql) {
String customSql,
String branch) {
super(schemaSaveMode, dataSaveMode, catalog, catalogTable, customSql);
this.supportLoadTable = supportLoadTable;
this.catalog = catalog;
this.catalogTable = catalogTable;
this.branch = branch;
}

/**
 * Runs the inherited schema save mode handling and then publishes the Paimon table to the sink.
 *
 * <p>The table is fetched from the catalog once the parent handling has finished. It is handed
 * to the sink only when the sink has no loaded table yet, or when the schema save mode is
 * {@link SchemaSaveMode#RECREATE_SCHEMA}. If a non-empty branch is configured, the table is
 * switched to that branch before being handed over.
 */
@Override
public void handleSchemaSaveMode() {
    super.handleSchemaSaveMode();
    Table resolvedTable = ((PaimonCatalog) catalog).getPaimonTable(catalogTable.getTablePath());

    // Keep the table the sink already holds unless the schema has just been recreated.
    boolean sinkHasTable = this.supportLoadTable.getLoadTable() != null;
    if (sinkHasTable && this.schemaSaveMode != SchemaSaveMode.RECREATE_SCHEMA) {
        return;
    }

    Table tableForSink =
            StringUtils.isNotEmpty(branch)
                    ? ((FileStoreTable) resolvedTable).switchToBranch(branch)
                    : resolvedTable;
    this.supportLoadTable.setLoadTable(tableForSink);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.seatunnel.connectors.seatunnel.paimon.sink;

import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;

import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
Expand All @@ -35,6 +37,8 @@
import org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalog;
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonHadoopConfiguration;
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
import org.apache.seatunnel.connectors.seatunnel.paimon.handler.PaimonSaveModeHandler;
import org.apache.seatunnel.connectors.seatunnel.paimon.security.PaimonSecurityContext;
import org.apache.seatunnel.connectors.seatunnel.paimon.sink.bucket.PaimonBucketAssignerFactory;
Expand All @@ -43,14 +47,19 @@
import org.apache.seatunnel.connectors.seatunnel.paimon.sink.commit.PaimonCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.paimon.sink.state.PaimonSinkState;

import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.BranchManager;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.UUID;

@Slf4j
public class PaimonSink
implements SeaTunnelSink<
SeaTunnelRow,
Expand All @@ -66,7 +75,7 @@ public class PaimonSink

public static final String PLUGIN_NAME = "Paimon";

private Table paimonTable;
private FileStoreTable paimonTable;

private JobContext jobContext;

Expand All @@ -92,11 +101,25 @@ public PaimonSink(ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {
paimonCatalog.open();
boolean databaseExists =
paimonCatalog.databaseExists(this.paimonSinkConfig.getNamespace());
if (databaseExists) {
TablePath tablePath = catalogTable.getTablePath();
boolean tableExists = paimonCatalog.tableExists(tablePath);
if (tableExists) {
this.paimonTable = paimonCatalog.getPaimonTable(tablePath);
if (!databaseExists) {
return;
}
TablePath tablePath = catalogTable.getTablePath();
boolean tableExists = paimonCatalog.tableExists(tablePath);
if (!tableExists) {
return;
}
this.paimonTable = (FileStoreTable) paimonCatalog.getPaimonTable(tablePath);
String branchName = paimonSinkConfig.getBranch();
if (StringUtils.isNotEmpty(branchName)) {
BranchManager branchManager = paimonTable.branchManager();
if (!branchManager.branchExists(branchName)) {
throw new PaimonConnectorException(
PaimonConnectorErrorCode.BRANCH_NOT_EXISTS, branchName);
}
if (!branchManager.DEFAULT_MAIN_BRANCH.equalsIgnoreCase(branchName)) {
this.paimonTable = paimonTable.switchToBranch(branchName);
log.info("Switch to branch {}", branchName);
}
}
}
Expand Down Expand Up @@ -168,12 +191,13 @@ public Optional<SaveModeHandler> getSaveModeHandler() {
paimonSinkConfig.getDataSaveMode(),
paimonCatalog,
catalogTable,
null));
null,
paimonSinkConfig.getBranch()));
}

@Override
public void setLoadTable(Table table) {
this.paimonTable = table;
this.paimonTable = (FileStoreTable) table;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public OptionRule optionRule() {
PaimonSinkOptions.PRIMARY_KEYS,
PaimonSinkOptions.PARTITION_KEYS,
PaimonSinkOptions.WRITE_PROPS,
PaimonSinkOptions.BRANCH,
SinkConnectorCommonOptions.MULTI_TABLE_SINK_REPLICA)
.conditional(
PaimonSinkOptions.CATALOG_TYPE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.paimon.sink;

import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;

import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
Expand Down Expand Up @@ -58,6 +59,7 @@
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.sink.TableWrite;
import org.apache.paimon.utils.BranchManager;

import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -274,6 +276,18 @@ public void applySchemaChange(SchemaChangeEvent event) throws IOException {
/**
 * Reloads the Paimon table from the catalog and creates a new table writer for it.
 *
 * <p>Because the table is loaded afresh from the catalog, a configured branch has to be applied
 * again: the branch must exist, and any branch other than the main branch (compared
 * case-insensitively) is switched to before the sink schema is refreshed.
 *
 * @throws PaimonConnectorException with {@code BRANCH_NOT_EXISTS} when a branch is configured
 *     but the reloaded table does not have it
 */
private void reOpenTableWrite() {
    this.seaTunnelRowType = this.sourceTableSchema.toPhysicalRowDataType();
    this.paimonTable = (FileStoreTable) paimonCatalog.getPaimonTable(paimonTablePath);
    String branchName = paimonSinkConfig.getBranch();
    if (StringUtils.isNotEmpty(branchName)) {
        BranchManager branchManager = this.paimonTable.branchManager();
        if (!branchManager.branchExists(branchName)) {
            throw new PaimonConnectorException(
                    PaimonConnectorErrorCode.BRANCH_NOT_EXISTS, branchName);
        }
        // DEFAULT_MAIN_BRANCH is a static constant, so it is read through the type rather than
        // through the branchManager instance.
        if (!BranchManager.DEFAULT_MAIN_BRANCH.equalsIgnoreCase(branchName)) {
            this.paimonTable = this.paimonTable.switchToBranch(branchName);
            log.info("Re-switched to branch {} after reopening table", branchName);
        }
    }
    this.sinkPaimonTableSchema = this.paimonTable.schema();
    this.newTableWrite();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,32 @@
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.container.TestContainerId;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
import org.apache.seatunnel.e2e.common.util.ContainerUtil;

import org.apache.commons.collections.CollectionUtils;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.ResolvingFileIO;
import org.apache.paimon.options.Options;
import org.apache.paimon.privilege.FileBasedPrivilegeManagerLoader;
import org.apache.paimon.privilege.PrivilegeType;
import org.apache.paimon.privilege.PrivilegedCatalog;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.table.source.TableScan;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -64,6 +75,17 @@ public class PaimonIT extends TestSuiteBase implements TestResource {
// Database and table that the test looks up in the Paimon warehouse.
private final String DATABASE_NAME = "default";
private final String TABLE_NAME = "st_test_p";

// Sub-directory of the mounted volume that is used as the Paimon warehouse root.
private static final String NAMESPACE = "paimon";
// NOTE: despite its name this holds the OS user name (the "user.name" system property);
// it is only used to build the Windows host mount path below.
protected static String hostName = System.getProperty("user.name");
protected static final String CONTAINER_VOLUME_MOUNT_PATH = "/tmp/seatunnel_mnt";

// Upper-case with Locale.ROOT so the check does not depend on the JVM default locale
// (e.g. under a Turkish locale "Windows" would become "WİNDOWS" and never match).
protected static final boolean isWindows =
        System.getProperty("os.name").toUpperCase(java.util.Locale.ROOT).contains("WINDOWS");
// Host-side path of the mounted volume: a per-user directory on Windows, otherwise the same
// path as inside the container.
public static final String HOST_VOLUME_MOUNT_PATH =
        isWindows
                ? String.format("C:/Users/%s/tmp/seatunnel_mnt", hostName)
                : CONTAINER_VOLUME_MOUNT_PATH;

@TestContainerExtension
private final ContainerExtendedFactory extendedFactory =
container -> {
Expand Down Expand Up @@ -204,4 +226,63 @@ public void jobFinishedCleanTmpFiles(TestContainer container) throws Exception {
List<File> files = FileUtils.listFile(tmpDir);
Assertions.assertTrue(CollectionUtils.isEmpty(files));
}

/**
 * Runs the branch sink job and checks where the rows ended up.
 *
 * <p>The branch {@code test_branch} is created on the target table when it is missing. After the
 * job succeeds, the table as initially loaded must be empty while the branch view must hold
 * 10001 rows.
 */
@DisabledOnContainer(
        value = {},
        type = {EngineType.SPARK, EngineType.FLINK},
        disabledReason =
                "Spark and Flink engine can not auto create paimon table on worker node in local file(e.g flink tm) by savemode feature which can lead error")
@TestTemplate
public void testSinkBranch(TestContainer container) throws Exception {
    final String branchName = "test_branch";
    FileStoreTable mainTable = (FileStoreTable) getTable(DATABASE_NAME, TABLE_NAME);

    // Create the branch up front if the table does not have it yet.
    boolean branchMissing = !mainTable.branchManager().branches().contains(branchName);
    if (branchMissing) {
        mainTable.createBranch(branchName);
    }

    Container.ExecResult jobResult = container.executeJob("/fake_to_paimon_branch.conf");
    Assertions.assertEquals(0, jobResult.getExitCode());

    // Nothing may have been written to the table as originally loaded ...
    Assertions.assertEquals(0, getTableRowCount(mainTable));

    // ... while every row is expected on the branch.
    FileStoreTable branchTable = mainTable.switchToBranch(branchName);
    Assertions.assertEquals(10001, getTableRowCount(branchTable));
}

/**
 * Loads a Paimon table from the warehouse located under the host volume mount path.
 *
 * <p>The warehouse location is {@code HOST_VOLUME_MOUNT_PATH/NAMESPACE}, prefixed with
 * {@code file://} on non-Windows hosts.
 *
 * @param dbName database that contains the table
 * @param tbName name of the table to load
 * @return the table resolved by the catalog
 * @throws RuntimeException if the catalog reports that the table does not exist; the original
 *     exception is kept as the cause
 */
private Table getTable(String dbName, String tbName) {
    Options options = new Options();
    String warehouse =
            String.format(
                    "%s%s/%s", isWindows ? "" : "file://", HOST_VOLUME_MOUNT_PATH, NAMESPACE);
    options.set("warehouse", warehouse);
    try {
        Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(options));
        return catalog.getTable(Identifier.create(dbName, tbName));
    } catch (Catalog.TableNotExistException e) {
        // Name the missing table and keep the cause so a failing test can be diagnosed.
        throw new RuntimeException(
                String.format(
                        "Table %s.%s does not exist in warehouse %s",
                        dbName, tbName, warehouse),
                e);
    }
}

/**
 * Counts the rows of the given table by reading every record of a freshly planned scan.
 *
 * @param table table (or branch view of a table) to read
 * @return number of rows returned by the scan
 * @throws RuntimeException wrapping any failure raised while planning or reading
 */
private long getTableRowCount(FileStoreTable table) {
    long rows = 0;
    try {
        ReadBuilder builder = table.newReadBuilder();
        TableScan.Plan scanPlan = builder.newScan().plan();
        TableRead read = builder.newRead();

        // Both the reader and its iterator are closed when the block exits.
        try (RecordReader<InternalRow> recordReader = read.createReader(scanPlan);
                RecordReaderIterator<InternalRow> rowIterator =
                        new RecordReaderIterator<>(recordReader)) {
            for (; rowIterator.hasNext(); rows++) {
                rowIterator.next();
            }
        }
    } catch (Exception e) {
        throw new RuntimeException("Failed to read data count from table", e);
    }
    return rows;
}
}
Loading