From 0ef26edd830478ddcb769191b34ae423cdc50393 Mon Sep 17 00:00:00 2001 From: awol2005ex Date: Tue, 21 Jan 2025 13:41:14 +0800 Subject: [PATCH 1/9] [feature][plugin][paimonwriter] add paimon format file writer --- docs/writer/paimonwriter.md | 88 ++++++ package.xml | 8 + plugin/writer/paimonwriter/package.xml | 37 +++ plugin/writer/paimonwriter/pom.xml | 225 ++++++++++++++ .../writer/paimonwriter/PaimonHelper.java | 53 ++++ .../writer/paimonwriter/PaimonWriter.java | 291 ++++++++++++++++++ .../src/main/resources/plugin.json | 6 + .../main/resources/plugin_job_template.json | 13 + pom.xml | 1 + 9 files changed, 722 insertions(+) create mode 100644 docs/writer/paimonwriter.md create mode 100644 plugin/writer/paimonwriter/package.xml create mode 100644 plugin/writer/paimonwriter/pom.xml create mode 100644 plugin/writer/paimonwriter/src/main/java/com/wgzhao/addax/plugin/writer/paimonwriter/PaimonHelper.java create mode 100644 plugin/writer/paimonwriter/src/main/java/com/wgzhao/addax/plugin/writer/paimonwriter/PaimonWriter.java create mode 100644 plugin/writer/paimonwriter/src/main/resources/plugin.json create mode 100644 plugin/writer/paimonwriter/src/main/resources/plugin_job_template.json diff --git a/docs/writer/paimonwriter.md b/docs/writer/paimonwriter.md new file mode 100644 index 000000000..a1973c1ea --- /dev/null +++ b/docs/writer/paimonwriter.md @@ -0,0 +1,88 @@ +# Paimon Writer + +Paimon Writer 提供向 已有的paimon表写入数据的能力。 + +## 配置样例 + +```json +--8<-- "jobs/paimonwriter.json" +``` + +## 参数说明 + +| 配置项 | 是否必须 | 数据类型 | 默认值 | 说明 | +|:-------------|:----:|--------|----|------------------------------------------------| +| dbName | 是 | string | 无 | 要写入的paimon数据库名 | +| tableName | 是 | string | 无 | 要写入的paimon表名 | +| writeMode | 是 | string | 无 | 写入模式,详述见下 | +| paimonConfig | 是 | json | {} | 里可以配置与 Paimon catalog和Hadoop 相关的一些高级参数,比如HA的配置 | + + + +### writeMode + +写入前数据清理处理模式: + +- append,写入前不做任何处理,直接写入,不清除原来的数据。 +- truncate 写入前先清空表,再写入。 + +### paimonConfig + +`paimonConfig` 里可以配置与 Paimon catalog和Hadoop 相关的一些高级参数,比如HA的配置 +```json +{ + "name": "paimonwriter", + "parameter": { + "dbName": "test", + "tableName": "test2", + "writeMode": "truncate", + "paimonConfig": { + "warehouse": "file:///g:/paimon", + "metastore": "filesystem" + } + } +} +``` +```json +{ + "paimonConfig": { + "warehouse": "hdfs://nameservice1/user/hive/paimon", + "metastore": "filesystem", + "fs.defaultFS":"hdfs://nameservice1", + "hadoop.security.authentication" : "kerberos", + "hadoop.kerberos.principal" : "hive/_HOST@XXXX.COM", + "hadoop.kerberos.keytab" : "/tmp/hive@XXXX.COM.keytab", + "ha.zookeeper.quorum" : "test-pr-nn1:2181,test-pr-nn2:2181,test-pr-nn3:2181", + "dfs.nameservices" : "nameservice1", + "dfs.namenode.rpc-address.nameservice1.namenode371" : "test-pr-nn2:8020", + "dfs.namenode.rpc-address.nameservice1.namenode265": "test-pr-nn1:8020", + "dfs.namenode.keytab.file" : "/tmp/hdfs@XXXX.COM.keytab", + "dfs.namenode.keytab.enabled" : "true", + "dfs.namenode.kerberos.principal" : "hdfs/_HOST@XXXX.COM", + "dfs.namenode.kerberos.internal.spnego.principal" : "HTTP/_HOST@XXXX.COM", + "dfs.ha.namenodes.nameservice1" : "namenode265,namenode371", + "dfs.datanode.keytab.file" : "/tmp/hdfs@XXXX.COM.keytab", + "dfs.datanode.keytab.enabled" : "true", + "dfs.datanode.kerberos.principal" : "hdfs/_HOST@XXXX.COM", + "dfs.client.use.datanode.hostname" : "false", + "dfs.client.failover.proxy.provider.nameservice1" : "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider", + "dfs.balancer.keytab.file" : 
"/tmp/hdfs@XXXX.COM.keytab", + "dfs.balancer.keytab.enabled" : "true", + "dfs.balancer.kerberos.principal" : "hdfs/_HOST@XXXX.COM" + } +} +``` + + +## 类型转换 + +| Addax 内部类型 | Paimon 数据类型 | +|------------|------------------------------| +| Integer | TINYINT,SMALLINT,INT,INTEGER | +| Long | BIGINT | +| Double | FLOAT,DOUBLE,DECIMAL | +| String | STRING,VARCHAR,CHAR | +| Boolean | BOOLEAN | +| Date | DATE,TIMESTAMP | +| Bytes | BINARY | + diff --git a/package.xml b/package.xml index f787ad819..b5a82b669 100644 --- a/package.xml +++ b/package.xml @@ -609,5 +609,13 @@ 0644 addax-${project.version} + + plugin/writer/paimonwriter/target/paimonwriter-${project.version}/ + + **/*.* + + 0644 + addax-${project.version} + diff --git a/plugin/writer/paimonwriter/package.xml b/plugin/writer/paimonwriter/package.xml new file mode 100644 index 000000000..8ecab9f9f --- /dev/null +++ b/plugin/writer/paimonwriter/package.xml @@ -0,0 +1,37 @@ + + release + + dir + + false + + + src/main/resources + + *.json + + plugin/writer/${project.artifactId} + + + target/ + + ${project.artifactId}-${project.version}.jar + + plugin/writer/${project.artifactId} + + + + + + false + plugin/writer/${project.artifactId}/libs + runtime + + com.wgzhao.addax:* + + + + diff --git a/plugin/writer/paimonwriter/pom.xml b/plugin/writer/paimonwriter/pom.xml new file mode 100644 index 000000000..3aa053801 --- /dev/null +++ b/plugin/writer/paimonwriter/pom.xml @@ -0,0 +1,225 @@ + + + 4.0.0 + + + com.wgzhao.addax + addax-all + 4.2.3-SNAPSHOT + ../../../pom.xml + + + paimonwriter + paimon-writer + PaimonWriter提供了本地写入paimon格式文件功能,建议开发、测试环境使用。 + jar + + + + com.wgzhao.addax + addax-common + ${project.version} + + + slf4j-log4j12 + org.slf4j + + + + + + com.wgzhao.addax + addax-storage + ${project.version} + + + + org.apache.paimon + paimon-bundle + 1.0.0 + + + + + org.apache.hadoop + hadoop-common + ${hadoop.version} + + + com.fasterxml.jackson.core + jackson-databind + + + org.codehaus.jackson + jackson-core-asl + + + org.codehaus.jackson + jackson-mapper-asl + + + com.fasterxml.woodstox + woodstox-core + + + commons-codec + commons-codec + + + commons-net + commons-net + + + io.netty + netty + + + log4j + log4j + + + net.minidev + json-smart + + + org.codehaus.jettison + jettison + + + org.eclipse.jetty + jetty-server + + + org.xerial.snappy + snappy-java + + + org.apache.zookeeper + zookeeper + + + org.eclipse.jetty + jetty-util + + + + + + org.apache.hadoop + hadoop-aws + ${hadoop.version} + + + com.fasterxml.jackson.core + jackson-databind + + + org.codehaus.jackson + jackson-core-asl + + + org.codehaus.jackson + jackson-mapper-asl + + + com.fasterxml.woodstox + woodstox-core + + + commons-codec + commons-codec + + + commons-net + commons-net + + + io.netty + netty + + + log4j + log4j + + + net.minidev + json-smart + + + org.codehaus.jettison + jettison + + + org.eclipse.jetty + jetty-server + + + org.xerial.snappy + snappy-java + + + org.apache.zookeeper + zookeeper + + + org.eclipse.jetty + jetty-util + + + + + + org.apache.hadoop + hadoop-mapreduce-client-core + ${hadoop.version} + + + com.fasterxml.jackson.core + jackson-databind + + + commons-codec + commons-codec + + + io.netty + netty + + + org.eclipse.jetty + jetty-util + + + + + + + com.fasterxml.woodstox + woodstox-core + ${woodstox.version} + + + + + + + maven-assembly-plugin + + + package.xml + + ${project.artifactId}-${project.version} + + + + release + package + + single + + + + + + + diff --git 
a/plugin/writer/paimonwriter/src/main/java/com/wgzhao/addax/plugin/writer/paimonwriter/PaimonHelper.java b/plugin/writer/paimonwriter/src/main/java/com/wgzhao/addax/plugin/writer/paimonwriter/PaimonHelper.java new file mode 100644 index 000000000..187080ead --- /dev/null +++ b/plugin/writer/paimonwriter/src/main/java/com/wgzhao/addax/plugin/writer/paimonwriter/PaimonHelper.java @@ -0,0 +1,53 @@ +package com.wgzhao.addax.plugin.writer.paimonwriter; + +import com.wgzhao.addax.common.exception.AddaxException; +import com.wgzhao.addax.common.util.Configuration; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.options.Options; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static com.wgzhao.addax.common.spi.ErrorCode.LOGIN_ERROR; + +public class PaimonHelper { + private static final Logger LOG = LoggerFactory.getLogger(PaimonHelper.class); + + public static void kerberosAuthentication(org.apache.hadoop.conf.Configuration hadoopConf, String kerberosPrincipal, String kerberosKeytabFilePath) throws Exception { + if (StringUtils.isNotBlank(kerberosPrincipal) && StringUtils.isNotBlank(kerberosKeytabFilePath)) { + UserGroupInformation.setConfiguration(hadoopConf); + try { + UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytabFilePath); + } catch (Exception e) { + String message = String.format("kerberos authentication failed, keytab file: [%s], principal: [%s]", + kerberosKeytabFilePath, kerberosPrincipal); + LOG.error(message); + throw AddaxException.asAddaxException(LOGIN_ERROR, e); + } + } + } + + public static Options getOptions(Configuration conf){ + Options options = new Options(); + conf.getMap("paimonConfig").forEach((k, v) -> options.set(k, String.valueOf(v))); + return options; + } + + public static CatalogContext getCatalogContext(Options options) { + CatalogContext context = null; + if(options.get("warehouse").startsWith("hdfs://")){ + + org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration(); + options.toMap().forEach((k, v) -> hadoopConf.set(k, String.valueOf(v))); + UserGroupInformation.setConfiguration(hadoopConf); + context = CatalogContext.create(options,hadoopConf); + } else { + + context = CatalogContext.create(options); + } + + return context; + } + +} diff --git a/plugin/writer/paimonwriter/src/main/java/com/wgzhao/addax/plugin/writer/paimonwriter/PaimonWriter.java b/plugin/writer/paimonwriter/src/main/java/com/wgzhao/addax/plugin/writer/paimonwriter/PaimonWriter.java new file mode 100644 index 000000000..c02ba3542 --- /dev/null +++ b/plugin/writer/paimonwriter/src/main/java/com/wgzhao/addax/plugin/writer/paimonwriter/PaimonWriter.java @@ -0,0 +1,291 @@ +package com.wgzhao.addax.plugin.writer.paimonwriter; + +import com.alibaba.fastjson2.JSON; +import com.wgzhao.addax.common.element.Column; +import com.wgzhao.addax.common.element.Record; +import com.wgzhao.addax.common.plugin.RecordReceiver; +import com.wgzhao.addax.common.spi.Writer; +import com.wgzhao.addax.common.util.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.CatalogFactory; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.*; +import org.apache.paimon.options.Options; +import org.apache.paimon.table.Table; +import 
org.apache.paimon.table.sink.BatchTableCommit; +import org.apache.paimon.table.sink.BatchTableWrite; +import org.apache.paimon.table.sink.BatchWriteBuilder; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypeRoot; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static com.wgzhao.addax.common.base.Key.KERBEROS_KEYTAB_FILE_PATH; +import static com.wgzhao.addax.common.base.Key.KERBEROS_PRINCIPAL; + + +public class PaimonWriter extends Writer { + public static class Job + extends Writer.Job { + private static final Logger LOG = LoggerFactory.getLogger(Job.class); + private Configuration conf = null; + private BatchWriteBuilder writeBuilder = null; + + @Override + public void init() { + this.conf = this.getPluginJobConf(); + + Options options = PaimonHelper.getOptions(this.conf); + CatalogContext context = PaimonHelper.getCatalogContext(options); + + + + if ("kerberos".equals(options.get("hadoop.security.authentication"))) { + String kerberosKeytabFilePath = options.get(KERBEROS_KEYTAB_FILE_PATH); + String kerberosPrincipal = options.get(KERBEROS_PRINCIPAL); + try { + PaimonHelper.kerberosAuthentication(context.hadoopConf(), kerberosPrincipal, kerberosKeytabFilePath); + LOG.info("kerberos Authentication success"); + + FileSystem fs = FileSystem.get(context.hadoopConf()); + fs.getStatus().getCapacity(); + } catch (Exception e) { + LOG.error("kerberos Authentication error", e); + throw new RuntimeException(e); + } + } + try (Catalog catalog = CatalogFactory.createCatalog(context)) { + + String dbName = this.conf.getString("dbName"); + String tableName = this.conf.getString("tableName"); + Identifier identifier = Identifier.create(dbName, tableName); + + Table table = catalog.getTable(identifier); + + writeBuilder = table.newBatchWriteBuilder(); + + + } catch (Exception e) { + LOG.error("init paimon error", e); + throw new RuntimeException(e); + } + + + } + + @Override + public List split(int mandatoryNumber) { + List configurations = new ArrayList<>(mandatoryNumber); + for (int i = 0; i < mandatoryNumber; i++) { + configurations.add(conf); + } + return configurations; + } + + @Override + public void prepare() { + String writeMode = this.conf.getString("writeMode"); + if ("truncate".equalsIgnoreCase(writeMode)) { + if (writeBuilder != null) { + LOG.info("You specify truncate writeMode, begin to clean history data."); + BatchTableWrite write = writeBuilder.withOverwrite().newWrite(); + BatchTableCommit commit = writeBuilder.newCommit(); + commit.commit(new ArrayList<>()); + try { + write.close(); + } catch (Exception e) { + LOG.error("close paimon write error", e); + throw new RuntimeException(e); + } + } + } + } + + @Override + public void destroy() { + + } + } + + public static class Task + extends Writer.Task { + + private static final Logger log = LoggerFactory.getLogger(Task.class); + private Configuration conf = null; + private BatchWriteBuilder writeBuilder = null; + private Integer batchSize = 1000; + private List columnList = new ArrayList<>(); + private List typeList = new ArrayList<>(); + + @Override + public void startWrite(RecordReceiver recordReceiver) { + + List writerBuffer = new ArrayList<>(this.batchSize); + Record record; + long total = 0; + while ((record = recordReceiver.getFromReader()) != null) { + writerBuffer.add(record); + if (writerBuffer.size() >= 
this.batchSize) { + total += doBatchInsert(writerBuffer); + writerBuffer.clear(); + } + } + + if (!writerBuffer.isEmpty()) { + total += doBatchInsert(writerBuffer); + writerBuffer.clear(); + } + + String msg = String.format("task end, write size :%d", total); + getTaskPluginCollector().collectMessage("writeSize", String.valueOf(total)); + log.info(msg); + + } + + @Override + public void init() { + this.conf = super.getPluginJobConf(); + + batchSize = conf.getInt("batchSize", 1000); + + Options options = PaimonHelper.getOptions(this.conf); + CatalogContext context = PaimonHelper.getCatalogContext(options); + + if ("kerberos".equals(options.get("hadoop.security.authentication"))) { + String kerberosKeytabFilePath = options.get(KERBEROS_KEYTAB_FILE_PATH); + String kerberosPrincipal = options.get(KERBEROS_PRINCIPAL); + try { + PaimonHelper.kerberosAuthentication(context.hadoopConf(), kerberosPrincipal, kerberosKeytabFilePath); + log.info("kerberos Authentication success"); + } catch (Exception e) { + log.error("kerberos Authentication error", e); + throw new RuntimeException(e); + } + } + + try (Catalog catalog = CatalogFactory.createCatalog(context)) { + + String dbName = this.conf.getString("dbName"); + String tableName = this.conf.getString("tableName"); + Identifier identifier = Identifier.create(dbName, tableName); + + Table table = catalog.getTable(identifier); + + columnList = table.rowType().getFields(); + typeList = table.rowType().getFieldTypes(); + writeBuilder = table.newBatchWriteBuilder(); + + + } catch (Exception e) { + log.error("init paimon error", e); + throw new RuntimeException(e); + } + } + + @Override + public void destroy() { + + } + + private long doBatchInsert(final List writerBuffer) { + BatchTableWrite write = writeBuilder.newWrite(); + GenericRow data; + for (Record record : writerBuffer) { + data = new GenericRow(columnList.size()); + StringBuilder id = new StringBuilder(); + for (int i = 0; i < record.getColumnNumber(); i++) { + Column column = record.getColumn(i); + if (i >= columnList.size()) { + throw new RuntimeException("columnList size is " + columnList.size() + ", but record column number is " + record.getColumnNumber()); + } + String columnName = columnList.get(i).name(); + DataType columnType = typeList.get(i); + //如果是数组类型,那它传入的必是字符串类型 + if (columnType.getTypeRoot().equals(DataTypeRoot.ARRAY)) { + if (null == column.asString()) { + data.setField(i, null); + } else { + String[] dataList = column.asString().split(","); + data.setField(i, new GenericArray(dataList)); + } + } else { + switch (columnType.getTypeRoot()) { + + case DATE: + try { + data.setField(i, Timestamp.fromEpochMillis(column.asLong())); + } catch (Exception e) { + getTaskPluginCollector().collectDirtyRecord(record, String.format("时间类型解析失败 [%s:%s] exception: %s", columnName, column.toString(), e)); + } + break; + case CHAR: + case VARCHAR: + data.setField(i, BinaryString.fromString(column.asString())); + break; + case BOOLEAN: + data.setField(i, column.asBoolean()); + break; + case VARBINARY: + case BINARY: + data.setField(i, column.asBytes()); + break; + case BIGINT: + data.setField(i, column.asLong()); + break; + case INTEGER: + case SMALLINT: + case TINYINT: + data.setField(i, column.asBigInteger().intValue()); + break; + case FLOAT: + case DOUBLE: + data.setField(i, column.asDouble()); + break; + case MAP: + try { + data.setField(i, new GenericMap(JSON.parseObject(column.asString(), Map.class))); + } catch (Exception e) { + getTaskPluginCollector().collectDirtyRecord(record, 
String.format("MAP类型解析失败 [%s:%s] exception: %s", columnName, column.toString(), e)); + } + break; + default: + getTaskPluginCollector().collectDirtyRecord(record, "类型错误:不支持的类型:" + columnType + " " + columnName); + } + } + } + + try { + write.write(data); + } catch (Exception e) { + throw new RuntimeException(e); + } + + } + + List messages = null; + try { + messages = write.prepareCommit(); + BatchTableCommit commit = writeBuilder.newCommit(); + commit.commit(messages); + + + write.close(); + + return messages.size(); + } catch (Exception e) { + throw new RuntimeException(e); + } + + + } + + } +} diff --git a/plugin/writer/paimonwriter/src/main/resources/plugin.json b/plugin/writer/paimonwriter/src/main/resources/plugin.json new file mode 100644 index 000000000..b1dd4c177 --- /dev/null +++ b/plugin/writer/paimonwriter/src/main/resources/plugin.json @@ -0,0 +1,6 @@ +{ + "name": "paimonwriter", + "class": "com.wgzhao.addax.plugin.writer.paimonwriter.PaimonWriter", + "description": "write data to paimon", + "developer": "wgzhao" +} diff --git a/plugin/writer/paimonwriter/src/main/resources/plugin_job_template.json b/plugin/writer/paimonwriter/src/main/resources/plugin_job_template.json new file mode 100644 index 000000000..681c940f9 --- /dev/null +++ b/plugin/writer/paimonwriter/src/main/resources/plugin_job_template.json @@ -0,0 +1,13 @@ +{ + "name": "paimonwriter", + "parameter": { + + "dbName": "test", + "tableName": "test", + "writeMode": "truncate", + "paimonConfig": { + "warehouse": "file:///tmp/paimon", + "metastore": "filesystem" + } + } +} diff --git a/pom.xml b/pom.xml index 98d64beaf..266a21b41 100644 --- a/pom.xml +++ b/pom.xml @@ -395,6 +395,7 @@ plugin/writer/sybasewriter plugin/writer/tdenginewriter plugin/writer/txtfilewriter + plugin/writer/paimonwriter From 5a6c92e56d04974db95253ec874c2a1a6a360913 Mon Sep 17 00:00:00 2001 From: awol2005ex Date: Tue, 21 Jan 2025 15:41:54 +0800 Subject: [PATCH 2/9] fixed timestamp,decimal,null type value bug --- .../writer/paimonwriter/PaimonWriter.java | 22 +++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/plugin/writer/paimonwriter/src/main/java/com/wgzhao/addax/plugin/writer/paimonwriter/PaimonWriter.java b/plugin/writer/paimonwriter/src/main/java/com/wgzhao/addax/plugin/writer/paimonwriter/PaimonWriter.java index c02ba3542..79175abf9 100644 --- a/plugin/writer/paimonwriter/src/main/java/com/wgzhao/addax/plugin/writer/paimonwriter/PaimonWriter.java +++ b/plugin/writer/paimonwriter/src/main/java/com/wgzhao/addax/plugin/writer/paimonwriter/PaimonWriter.java @@ -21,6 +21,7 @@ import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypeRoot; +import org.apache.paimon.types.DecimalType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -203,6 +204,9 @@ private long doBatchInsert(final List writerBuffer) { StringBuilder id = new StringBuilder(); for (int i = 0; i < record.getColumnNumber(); i++) { Column column = record.getColumn(i); + if(column ==null ){ + continue; + } if (i >= columnList.size()) { throw new RuntimeException("columnList size is " + columnList.size() + ", but record column number is " + record.getColumnNumber()); } @@ -220,8 +224,14 @@ private long doBatchInsert(final List writerBuffer) { switch (columnType.getTypeRoot()) { case DATE: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + case TIMESTAMP_WITHOUT_TIME_ZONE: try { - data.setField(i, Timestamp.fromEpochMillis(column.asLong())); + if(column.asLong()!=null) { + data.setField(i, 
Timestamp.fromEpochMillis(column.asLong())); + } else { + data.setField(i, null); + } } catch (Exception e) { getTaskPluginCollector().collectDirtyRecord(record, String.format("时间类型解析失败 [%s:%s] exception: %s", columnName, column.toString(), e)); } @@ -243,12 +253,20 @@ private long doBatchInsert(final List writerBuffer) { case INTEGER: case SMALLINT: case TINYINT: - data.setField(i, column.asBigInteger().intValue()); + data.setField(i, column.asBigInteger()==null?null:column.asBigInteger().intValue()); break; case FLOAT: case DOUBLE: + data.setField(i, column.asDouble()); break; + case DECIMAL: + if(column.asBigDecimal()!=null) { + data.setField(i, Decimal.fromBigDecimal(column.asBigDecimal(), ((DecimalType) columnType).getPrecision(), ((DecimalType) columnType).getScale())); + } else { + data.setField(i, null); + } + break; case MAP: try { data.setField(i, new GenericMap(JSON.parseObject(column.asString(), Map.class))); From 992de9dc00a2080a25963a820ef715386ada5913 Mon Sep 17 00:00:00 2001 From: awol2005ex Date: Wed, 22 Jan 2025 08:42:14 +0800 Subject: [PATCH 3/9] =?UTF-8?q?1.change=20author=202.Make=20modules=C2=A0?= =?UTF-8?q?=20ordered=20alphabetically=203.modify=20mkdocs.yml=20and=20add?= =?UTF-8?q?=20the=20guide=20into=20section=20=E5=86=99=E5=85=A5=E6=8F=92?= =?UTF-8?q?=E4=BB=B6=20ordered=20alphabetically?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- mkdocs.yml | 1 + package.xml | 16 ++++++++-------- .../paimonwriter/src/main/resources/plugin.json | 2 +- pom.xml | 2 +- 4 files changed, 11 insertions(+), 10 deletions(-) diff --git a/mkdocs.yml b/mkdocs.yml index 92d6c4a2a..18d142291 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -99,6 +99,7 @@ nav: - writer/mongodbwriter.md - writer/mysqlwriter.md - writer/oraclewriter.md + - writer/paimonwriter.md - writer/postgresqlwriter.md - writer/rdbmswriter.md - writer/rediswriter.md diff --git a/package.xml b/package.xml index b5a82b669..16ba1ce6d 100644 --- a/package.xml +++ b/package.xml @@ -521,6 +521,14 @@ 0644 addax-${project.version} + + plugin/writer/paimonwriter/target/paimonwriter-${project.version}/ + + **/*.* + + 0644 + addax-${project.version} + plugin/writer/postgresqlwriter/target/postgresqlwriter-${project.version}/ @@ -609,13 +617,5 @@ 0644 addax-${project.version} - - plugin/writer/paimonwriter/target/paimonwriter-${project.version}/ - - **/*.* - - 0644 - addax-${project.version} - diff --git a/plugin/writer/paimonwriter/src/main/resources/plugin.json b/plugin/writer/paimonwriter/src/main/resources/plugin.json index b1dd4c177..0eb2be570 100644 --- a/plugin/writer/paimonwriter/src/main/resources/plugin.json +++ b/plugin/writer/paimonwriter/src/main/resources/plugin.json @@ -2,5 +2,5 @@ "name": "paimonwriter", "class": "com.wgzhao.addax.plugin.writer.paimonwriter.PaimonWriter", "description": "write data to paimon", - "developer": "wgzhao" + "developer": "awol2005ex" } diff --git a/pom.xml b/pom.xml index 266a21b41..9acba56ae 100644 --- a/pom.xml +++ b/pom.xml @@ -384,6 +384,7 @@ plugin/writer/mongodbwriter plugin/writer/mysqlwriter plugin/writer/oraclewriter + plugin/writer/paimonwriter plugin/writer/postgresqlwriter plugin/writer/rdbmswriter plugin/writer/rediswriter @@ -395,7 +396,6 @@ plugin/writer/sybasewriter plugin/writer/tdenginewriter plugin/writer/txtfilewriter - plugin/writer/paimonwriter From 1784cf0c6e80d5069f10180b3499e6dd04b7f27f Mon Sep 17 00:00:00 2001 From: awol2005ex Date: Thu, 23 Jan 2025 09:01:33 +0800 Subject: [PATCH 4/9] [add] add paimonwriter.json to docs 
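As a side note to the type handling refined in the previous patch, below is a minimal, self-contained sketch of how a record value ends up inside a Paimon `GenericRow`, which can be handy for verifying the documented type mapping outside the plugin. It relies only on Paimon classes already used by `PaimonWriter` (`GenericRow`, `BinaryString`, `Decimal`, `Timestamp`, `DecimalType`) against paimon-bundle 1.0.0; the three-field schema and the literal values are illustrative assumptions, not part of this patch.

```java
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.types.DecimalType;

import java.math.BigDecimal;

public class PaimonRowSketch {
    public static void main(String[] args) {
        // Illustrative target schema: STRING, DECIMAL(10,2), TIMESTAMP.
        DecimalType decimalType = new DecimalType(10, 2);
        GenericRow row = new GenericRow(3);

        // STRING / VARCHAR / CHAR fields take BinaryString, not java.lang.String.
        row.setField(0, BinaryString.fromString("test1"));

        // DECIMAL fields take org.apache.paimon.data.Decimal, built with the
        // precision and scale declared on the table column.
        BigDecimal amount = new BigDecimal("12345.67");
        row.setField(1, Decimal.fromBigDecimal(amount,
                decimalType.getPrecision(), decimalType.getScale()));

        // TIMESTAMP fields take org.apache.paimon.data.Timestamp; the writer
        // feeds it epoch milliseconds (Column.asLong()), and null stays null.
        Long epochMillis = 1737441674000L;
        row.setField(2, epochMillis == null ? null : Timestamp.fromEpochMillis(epochMillis));

        System.out.println(row.getFieldCount());
    }
}
```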
--- docs/assets/jobs/paimonwriter.json | 48 ++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) create mode 100644 docs/assets/jobs/paimonwriter.json diff --git a/docs/assets/jobs/paimonwriter.json b/docs/assets/jobs/paimonwriter.json new file mode 100644 index 000000000..84c97304a --- /dev/null +++ b/docs/assets/jobs/paimonwriter.json @@ -0,0 +1,48 @@ +{ + "job": { + "setting": { + "speed": { + "channel": 3 + }, + "errorLimit": { + "record": 0, + "percentage": 0 + } + }, + "content": [ + { + "reader": { + "name": "rdbmsreader", + "parameter": { + "username": "root", + "password": "root", + "column": [ + "*" + ], + "connection": [ + { + "querySql": [ + "select 1+0 id ,'test1' as name" + ], + "jdbcUrl": ["jdbc:mysql://localhost:3306/ruoyi_vue_camunda?allowPublicKeyRetrieval=true",] + } + ], + "fetchSize": 1024 + } + }, + "writer": { + "name": "paimonwriter", + "parameter": { + "dbName": "test", + "tableName": "test2", + "writeMode": "truncate", + "paimonConfig": { + "warehouse": "file:///g:/paimon", + "metastore": "filesystem" + } + } + } + } + ] + } +} \ No newline at end of file From 30ce1e71f14b9815d2673203f3db05c912044127 Mon Sep 17 00:00:00 2001 From: awol2005ex Date: Thu, 23 Jan 2025 16:08:32 +0800 Subject: [PATCH 5/9] [add] add the way of create paimon table before test to paimonwriter.md --- docs/writer/paimonwriter.md | 266 ++++++++++++++++++++++++++++++++++++ 1 file changed, 266 insertions(+) diff --git a/docs/writer/paimonwriter.md b/docs/writer/paimonwriter.md index a1973c1ea..7110c5e55 100644 --- a/docs/writer/paimonwriter.md +++ b/docs/writer/paimonwriter.md @@ -29,6 +29,272 @@ Paimon Writer 提供向 已有的paimon表写入数据的能力。 ### paimonConfig `paimonConfig` 里可以配置与 Paimon catalog和Hadoop 相关的一些高级参数,比如HA的配置 + +本地目录创建paimon表 + +pom.xml + +```xml + + + 4.0.0 + + com.test + paimon-java-api-test + 1.0-SNAPSHOT + + + 8 + 8 + UTF-8 + 3.2.4 + 7.0.0 + + + + org.apache.paimon + paimon-bundle + 1.0.0 + + + + + org.apache.hadoop + hadoop-common + ${hadoop.version} + + + com.fasterxml.jackson.core + jackson-databind + + + org.codehaus.jackson + jackson-core-asl + + + org.codehaus.jackson + jackson-mapper-asl + + + com.fasterxml.woodstox + woodstox-core + + + commons-codec + commons-codec + + + commons-net + commons-net + + + io.netty + netty + + + log4j + log4j + + + net.minidev + json-smart + + + org.codehaus.jettison + jettison + + + org.eclipse.jetty + jetty-server + + + org.xerial.snappy + snappy-java + + + org.apache.zookeeper + zookeeper + + + org.eclipse.jetty + jetty-util + + + + + + org.apache.hadoop + hadoop-aws + ${hadoop.version} + + + com.fasterxml.jackson.core + jackson-databind + + + org.codehaus.jackson + jackson-core-asl + + + org.codehaus.jackson + jackson-mapper-asl + + + com.fasterxml.woodstox + woodstox-core + + + commons-codec + commons-codec + + + commons-net + commons-net + + + io.netty + netty + + + log4j + log4j + + + net.minidev + json-smart + + + org.codehaus.jettison + jettison + + + org.eclipse.jetty + jetty-server + + + org.xerial.snappy + snappy-java + + + org.apache.zookeeper + zookeeper + + + org.eclipse.jetty + jetty-util + + + + + + org.apache.hadoop + hadoop-mapreduce-client-core + ${hadoop.version} + + + com.fasterxml.jackson.core + jackson-databind + + + commons-codec + commons-codec + + + io.netty + netty + + + org.eclipse.jetty + jetty-util + + + + + + + com.fasterxml.woodstox + woodstox-core + ${woodstox.version} + + + +``` + +```java + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogContext; +import 
org.apache.paimon.catalog.CatalogFactory; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.fs.Path; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.types.DataTypes; + +import java.util.HashMap; +import java.util.Map; + +public class CreatePaimonTable { + + public static Catalog createFilesystemCatalog() { + CatalogContext context = CatalogContext.create(new Path("file:///g:/paimon")); + return CatalogFactory.createCatalog(context); + } + + public static void main(String[] args) { + Schema.Builder schemaBuilder = Schema.newBuilder(); + schemaBuilder.primaryKey("id"); + schemaBuilder.column("id", DataTypes.INT()); + schemaBuilder.column("name", DataTypes.STRING()); + Map options = new HashMap<>(); + options.put("bucket", "1");//由于paimon java api 限制需要bucket>0 + options.put("bucket-key", "id"); + options.put("file.format", "orc"); + options.put("file.compression", "lz4"); + options.put("lookup.cache-spill-compression", "lz4"); + options.put("spill-compression", "LZ4"); + options.put("orc.compress", "lz4"); + options.put("manifest.format", "orc"); + + schemaBuilder.options(options); + Schema schema = schemaBuilder.build(); + + Identifier identifier = Identifier.create("test", "test2"); + try { + Catalog catalog = CreatePaimonTable.createFilesystemCatalog(); + catalog.createTable(identifier, schema, true); + } catch (Catalog.TableAlreadyExistException e) { + e.printStackTrace(); + } catch (Catalog.DatabaseNotExistException e) { + e.printStackTrace(); + } + + + } +} + + +``` + +Spark 或者 flink 环境创建表 + +```sql +CREATE TABLE if not exists test.test2 tblproperties ( + 'primary-key' = 'id', + 'bucket' = '1', + 'bucket-key' = 'id' + 'file.format'='orc', + 'file.compression'='lz4', + 'lookup.cache-spill-compression'='lz4', + 'spill-compression'='LZ4', + 'orc.compress'='lz4', + 'manifest.format'='orc' +) + +``` + + ```json { "name": "paimonwriter", From 637f861ab1778ee8be8617346a78c798043b6b7e Mon Sep 17 00:00:00 2001 From: awol2005ex Date: Thu, 23 Jan 2025 16:13:46 +0800 Subject: [PATCH 6/9] [add] add the way of create paimon table before test to paimonwriter.md --- docs/writer/paimonwriter.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/writer/paimonwriter.md b/docs/writer/paimonwriter.md index 7110c5e55..5b7893412 100644 --- a/docs/writer/paimonwriter.md +++ b/docs/writer/paimonwriter.md @@ -280,7 +280,7 @@ public class CreatePaimonTable { Spark 或者 flink 环境创建表 ```sql -CREATE TABLE if not exists test.test2 tblproperties ( +CREATE TABLE if not exists test.test2(id int ,name string) tblproperties ( 'primary-key' = 'id', 'bucket' = '1', 'bucket-key' = 'id' From a59ab8f70cac29882cb0767aedd4be381c0fe3f6 Mon Sep 17 00:00:00 2001 From: awol2005ex Date: Fri, 24 Jan 2025 08:59:27 +0800 Subject: [PATCH 7/9] [add] add the way of create paimon table before test to paimonwriter.md (add create database first) --- docs/writer/paimonwriter.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/docs/writer/paimonwriter.md b/docs/writer/paimonwriter.md index 5b7893412..54b9f5deb 100644 --- a/docs/writer/paimonwriter.md +++ b/docs/writer/paimonwriter.md @@ -263,11 +263,14 @@ public class CreatePaimonTable { Identifier identifier = Identifier.create("test", "test2"); try { Catalog catalog = CreatePaimonTable.createFilesystemCatalog(); + catalog.createDatabase("test",true); catalog.createTable(identifier, schema, true); } catch (Catalog.TableAlreadyExistException e) { e.printStackTrace(); } catch (Catalog.DatabaseNotExistException e) { 
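// Note: with createDatabase("test", true) called above (ignore-if-exists) the
// database is guaranteed to exist before createTable, so this branch should
// not be hit in practice; it is kept only because createTable declares the
// checked Catalog.DatabaseNotExistException.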
e.printStackTrace(); + } catch (Catalog.DatabaseAlreadyExistException e) { + throw new RuntimeException(e); } From 387f15eae375c6d0482bbcf978042ab026c5b702 Mon Sep 17 00:00:00 2001 From: awol2005ex Date: Fri, 24 Jan 2025 14:34:50 +0800 Subject: [PATCH 8/9] fixed: the bug that no hadoop conf when use s3a fielsystem --- .../addax/plugin/writer/paimonwriter/PaimonHelper.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/plugin/writer/paimonwriter/src/main/java/com/wgzhao/addax/plugin/writer/paimonwriter/PaimonHelper.java b/plugin/writer/paimonwriter/src/main/java/com/wgzhao/addax/plugin/writer/paimonwriter/PaimonHelper.java index 187080ead..f264c163d 100644 --- a/plugin/writer/paimonwriter/src/main/java/com/wgzhao/addax/plugin/writer/paimonwriter/PaimonHelper.java +++ b/plugin/writer/paimonwriter/src/main/java/com/wgzhao/addax/plugin/writer/paimonwriter/PaimonHelper.java @@ -36,7 +36,11 @@ public static Options getOptions(Configuration conf){ public static CatalogContext getCatalogContext(Options options) { CatalogContext context = null; - if(options.get("warehouse").startsWith("hdfs://")){ + String warehouse=options.get("warehouse"); + if (warehouse ==null || warehouse.isEmpty()){ + throw new RuntimeException("warehouse of the paimonConfig is null"); + } + if(warehouse.startsWith("hdfs://")||warehouse.startsWith("s3a://")){ org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration(); options.toMap().forEach((k, v) -> hadoopConf.set(k, String.valueOf(v))); From 1b4dbec72f95aeeba4bcabc3c6a97067bcb33943 Mon Sep 17 00:00:00 2001 From: awol2005ex Date: Fri, 24 Jan 2025 14:43:48 +0800 Subject: [PATCH 9/9] add minio example to paimonwriter.md --- docs/writer/paimonwriter.md | 85 +++++++++++++++++++++++++++++++++++++ 1 file changed, 85 insertions(+) diff --git a/docs/writer/paimonwriter.md b/docs/writer/paimonwriter.md index 54b9f5deb..f2c07a5db 100644 --- a/docs/writer/paimonwriter.md +++ b/docs/writer/paimonwriter.md @@ -241,6 +241,26 @@ public class CreatePaimonTable { CatalogContext context = CatalogContext.create(new Path("file:///g:/paimon")); return CatalogFactory.createCatalog(context); } + /* 如果是minio则例子如下 + + public static Catalog createFilesystemCatalog() { + Options options = new Options(); + options.set("warehouse", "s3a://pvc-91d1e2cd-4d25-45c9-8613-6c4f7bf0a4cc/paimon"); + Configuration hadoopConf = new Configuration(); + hadoopConf.set("fs.s3a.endpoint", "http://localhost:9000"); + hadoopConf.set("fs.s3a.access.key", "gy0dX5lALP176g6c9fYf"); + hadoopConf.set("fs.s3a.secret.key", "ReuUrCzzu5wKWAegtswoHIWV389BYl9AB1ZQbiKr"); + hadoopConf.set("fs.s3a.connection.ssl.enabled", "false"); + hadoopConf.set("fs.s3a.path.style.access", "true"); + hadoopConf.set("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem"); + CatalogContext context = CatalogContext.create(options,hadoopConf); + + + return CatalogFactory.createCatalog(context); + } + * + * + * */ public static void main(String[] args) { Schema.Builder schemaBuilder = Schema.newBuilder(); @@ -297,6 +317,7 @@ CREATE TABLE if not exists test.test2(id int ,name string) tblproperties ( ``` +本地文件例子 ```json { @@ -312,6 +333,70 @@ CREATE TABLE if not exists test.test2(id int ,name string) tblproperties ( } } ``` + +s3 或者 minio catalog例子 +```json +{ + "job": { + "setting": { + "speed": { + "channel": 3 + }, + "errorLimit": { + "record": 0, + "percentage": 0 + } + }, + "content": [ + { + "reader": { + "name": "rdbmsreader", + "parameter": { + "username": "root", + "password": "root", + 
"column": [ + "*" + ], + "connection": [ + { + "querySql": [ + "select 1+0 id ,'test1' as name" + ], + "jdbcUrl": [ + "jdbc:mysql://localhost:3306/ruoyi_vue_camunda?allowPublicKeyRetrieval=true" + ] + } + ], + "fetchSize": 1024 + } + }, + "writer": { + "name": "paimonwriter", + "parameter": { + "dbName": "test", + "tableName": "test2", + "writeMode": "truncate", + "paimonConfig": { + "warehouse": "s3a://pvc-91d1e2cd-4d25-45c9-8613-6c4f7bf0a4cc/paimon", + "metastore": "filesystem", + "fs.s3a.endpoint": "http://localhost:9000", + "fs.s3a.access.key": "gy0dX5lALP176g6c9fYf", + "fs.s3a.secret.key": "ReuUrCzzu5wKWAegtswoHIWV389BYl9AB1ZQbiKr", + "fs.s3a.connection.ssl.enabled": "false", + "fs.s3a.path.style.access": "true", + "fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem" + } + } + } + } + ] + } +} +``` + + +hdfs catalog例子 + ```json { "paimonConfig": {