From bda5e2df9852136ed12580f4bebc7858509c4ed0 Mon Sep 17 00:00:00 2001
From: awol2005ex
Date: Wed, 22 Jan 2025 18:37:47 +0800
Subject: [PATCH] [add][plugin][paimonwriter] Add support for writing files in
 Paimon format

---
 docs/writer/paimonwriter.md                   |  88 +++++
 mkdocs.yml                                    |   1 +
 package.xml                                   |   8 +
 plugin/writer/paimonwriter/package.xml        |  37 +++
 plugin/writer/paimonwriter/pom.xml            | 225 +++++++++++++
 .../writer/paimonwriter/PaimonHelper.java     |  53 +++
 .../writer/paimonwriter/PaimonWriter.java     | 309 ++++++++++++++++++
 .../src/main/resources/plugin.json            |   6 +
 .../main/resources/plugin_job_template.json   |  13 +
 pom.xml                                       |   1 +
 10 files changed, 741 insertions(+)
 create mode 100644 docs/writer/paimonwriter.md
 create mode 100644 plugin/writer/paimonwriter/package.xml
 create mode 100644 plugin/writer/paimonwriter/pom.xml
 create mode 100644 plugin/writer/paimonwriter/src/main/java/com/wgzhao/addax/plugin/writer/paimonwriter/PaimonHelper.java
 create mode 100644 plugin/writer/paimonwriter/src/main/java/com/wgzhao/addax/plugin/writer/paimonwriter/PaimonWriter.java
 create mode 100644 plugin/writer/paimonwriter/src/main/resources/plugin.json
 create mode 100644 plugin/writer/paimonwriter/src/main/resources/plugin_job_template.json

diff --git a/docs/writer/paimonwriter.md b/docs/writer/paimonwriter.md
new file mode 100644
index 000000000..a1973c1ea
--- /dev/null
+++ b/docs/writer/paimonwriter.md
@@ -0,0 +1,88 @@
+# Paimon Writer
+
+Paimon Writer provides the ability to write data into an existing Paimon table.
+
+## Configuration Example
+
+```json
+--8<-- "jobs/paimonwriter.json"
+```
+
+## Parameters
+
+| Option       | Required | Type   | Default | Description                                                                           |
+|:-------------|:--------:|--------|---------|---------------------------------------------------------------------------------------|
+| dbName       | Yes      | string | none    | Name of the Paimon database to write to                                               |
+| tableName    | Yes      | string | none    | Name of the Paimon table to write to                                                  |
+| writeMode    | Yes      | string | none    | Write mode, described below                                                           |
+| paimonConfig | Yes      | json   | {}      | Advanced options for the Paimon catalog and Hadoop, such as HA and Kerberos settings  |
+
+### writeMode
+
+How existing data is handled before writing:
+
+- append: write directly, without cleaning up the data already in the table.
+- truncate: clear the table first, then write.
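+
+A complete job that feeds the writer from a reader might look like the sketch below. The reader block is only illustrative (a `streamreader` generating two columns); any Addax reader can be used, as long as the column order matches the schema of the target Paimon table.
+
+```json
+{
+  "job": {
+    "setting": {
+      "speed": {
+        "channel": 1
+      }
+    },
+    "content": [
+      {
+        "reader": {
+          "name": "streamreader",
+          "parameter": {
+            "column": [
+              { "type": "long", "value": 1 },
+              { "type": "string", "value": "hello" }
+            ],
+            "sliceRecordCount": 100
+          }
+        },
+        "writer": {
+          "name": "paimonwriter",
+          "parameter": {
+            "dbName": "test",
+            "tableName": "test",
+            "writeMode": "append",
+            "paimonConfig": {
+              "warehouse": "file:///tmp/paimon",
+              "metastore": "filesystem"
+            }
+          }
+        }
+      }
+    ]
+  }
+}
+```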
"dfs.balancer.keytab.file" : "/tmp/hdfs@XXXX.COM.keytab", + "dfs.balancer.keytab.enabled" : "true", + "dfs.balancer.kerberos.principal" : "hdfs/_HOST@XXXX.COM" + } +} +``` + + +## 类型转换 + +| Addax 内部类型 | Paimon 数据类型 | +|------------|------------------------------| +| Integer | TINYINT,SMALLINT,INT,INTEGER | +| Long | BIGINT | +| Double | FLOAT,DOUBLE,DECIMAL | +| String | STRING,VARCHAR,CHAR | +| Boolean | BOOLEAN | +| Date | DATE,TIMESTAMP | +| Bytes | BINARY | + diff --git a/mkdocs.yml b/mkdocs.yml index 92d6c4a2a..18d142291 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -99,6 +99,7 @@ nav: - writer/mongodbwriter.md - writer/mysqlwriter.md - writer/oraclewriter.md + - writer/paimonwriter.md - writer/postgresqlwriter.md - writer/rdbmswriter.md - writer/rediswriter.md diff --git a/package.xml b/package.xml index f787ad819..16ba1ce6d 100644 --- a/package.xml +++ b/package.xml @@ -521,6 +521,14 @@ 0644 addax-${project.version} + + plugin/writer/paimonwriter/target/paimonwriter-${project.version}/ + + **/*.* + + 0644 + addax-${project.version} + plugin/writer/postgresqlwriter/target/postgresqlwriter-${project.version}/ diff --git a/plugin/writer/paimonwriter/package.xml b/plugin/writer/paimonwriter/package.xml new file mode 100644 index 000000000..8ecab9f9f --- /dev/null +++ b/plugin/writer/paimonwriter/package.xml @@ -0,0 +1,37 @@ + + release + + dir + + false + + + src/main/resources + + *.json + + plugin/writer/${project.artifactId} + + + target/ + + ${project.artifactId}-${project.version}.jar + + plugin/writer/${project.artifactId} + + + + + + false + plugin/writer/${project.artifactId}/libs + runtime + + com.wgzhao.addax:* + + + + diff --git a/plugin/writer/paimonwriter/pom.xml b/plugin/writer/paimonwriter/pom.xml new file mode 100644 index 000000000..3aa053801 --- /dev/null +++ b/plugin/writer/paimonwriter/pom.xml @@ -0,0 +1,225 @@ + + + 4.0.0 + + + com.wgzhao.addax + addax-all + 4.2.3-SNAPSHOT + ../../../pom.xml + + + paimonwriter + paimon-writer + PaimonWriter提供了本地写入paimon格式文件功能,建议开发、测试环境使用。 + jar + + + + com.wgzhao.addax + addax-common + ${project.version} + + + slf4j-log4j12 + org.slf4j + + + + + + com.wgzhao.addax + addax-storage + ${project.version} + + + + org.apache.paimon + paimon-bundle + 1.0.0 + + + + + org.apache.hadoop + hadoop-common + ${hadoop.version} + + + com.fasterxml.jackson.core + jackson-databind + + + org.codehaus.jackson + jackson-core-asl + + + org.codehaus.jackson + jackson-mapper-asl + + + com.fasterxml.woodstox + woodstox-core + + + commons-codec + commons-codec + + + commons-net + commons-net + + + io.netty + netty + + + log4j + log4j + + + net.minidev + json-smart + + + org.codehaus.jettison + jettison + + + org.eclipse.jetty + jetty-server + + + org.xerial.snappy + snappy-java + + + org.apache.zookeeper + zookeeper + + + org.eclipse.jetty + jetty-util + + + + + + org.apache.hadoop + hadoop-aws + ${hadoop.version} + + + com.fasterxml.jackson.core + jackson-databind + + + org.codehaus.jackson + jackson-core-asl + + + org.codehaus.jackson + jackson-mapper-asl + + + com.fasterxml.woodstox + woodstox-core + + + commons-codec + commons-codec + + + commons-net + commons-net + + + io.netty + netty + + + log4j + log4j + + + net.minidev + json-smart + + + org.codehaus.jettison + jettison + + + org.eclipse.jetty + jetty-server + + + org.xerial.snappy + snappy-java + + + org.apache.zookeeper + zookeeper + + + org.eclipse.jetty + jetty-util + + + + + + org.apache.hadoop + hadoop-mapreduce-client-core + ${hadoop.version} + + + com.fasterxml.jackson.core + jackson-databind 
+
+## Type Conversion
+
+| Addax internal type | Paimon data type                |
+|---------------------|---------------------------------|
+| Integer             | TINYINT, SMALLINT, INT, INTEGER |
+| Long                | BIGINT                          |
+| Double              | FLOAT, DOUBLE, DECIMAL          |
+| String              | STRING, VARCHAR, CHAR           |
+| Boolean             | BOOLEAN                         |
+| Date                | DATE, TIMESTAMP                 |
+| Bytes               | BINARY                          |
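+
+For complex types the writer expects string input: an ARRAY column is parsed from a comma-separated string, and a MAP column is parsed from a JSON object string. A record feeding a table with an array column and a map column could therefore carry values like the following (illustrative only):
+
+```json
+{
+  "tags": "a,b,c",
+  "properties": "{\"k1\": \"v1\", \"k2\": \"v2\"}"
+}
+```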
diff --git a/mkdocs.yml b/mkdocs.yml
index 92d6c4a2a..18d142291 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -99,6 +99,7 @@ nav:
       - writer/mongodbwriter.md
       - writer/mysqlwriter.md
       - writer/oraclewriter.md
+      - writer/paimonwriter.md
       - writer/postgresqlwriter.md
       - writer/rdbmswriter.md
       - writer/rediswriter.md
diff --git a/package.xml b/package.xml
index f787ad819..16ba1ce6d 100644
--- a/package.xml
+++ b/package.xml
@@ -521,6 +521,14 @@
             <fileMode>0644</fileMode>
             <outputDirectory>addax-${project.version}</outputDirectory>
         </fileSet>
+        <fileSet>
+            <directory>plugin/writer/paimonwriter/target/paimonwriter-${project.version}/</directory>
+            <includes>
+                <include>**/*.*</include>
+            </includes>
+            <fileMode>0644</fileMode>
+            <outputDirectory>addax-${project.version}</outputDirectory>
+        </fileSet>
         <fileSet>
             <directory>plugin/writer/postgresqlwriter/target/postgresqlwriter-${project.version}/</directory>
diff --git a/plugin/writer/paimonwriter/package.xml b/plugin/writer/paimonwriter/package.xml
new file mode 100644
index 000000000..8ecab9f9f
--- /dev/null
+++ b/plugin/writer/paimonwriter/package.xml
@@ -0,0 +1,37 @@
+<assembly>
+    <id>release</id>
+    <formats>
+        <format>dir</format>
+    </formats>
+    <includeBaseDirectory>false</includeBaseDirectory>
+    <fileSets>
+        <fileSet>
+            <directory>src/main/resources</directory>
+            <includes>
+                <include>*.json</include>
+            </includes>
+            <outputDirectory>plugin/writer/${project.artifactId}</outputDirectory>
+        </fileSet>
+        <fileSet>
+            <directory>target/</directory>
+            <includes>
+                <include>${project.artifactId}-${project.version}.jar</include>
+            </includes>
+            <outputDirectory>plugin/writer/${project.artifactId}</outputDirectory>
+        </fileSet>
+    </fileSets>
+    <dependencySets>
+        <dependencySet>
+            <useProjectArtifact>false</useProjectArtifact>
+            <outputDirectory>plugin/writer/${project.artifactId}/libs</outputDirectory>
+            <scope>runtime</scope>
+            <excludes>
+                <exclude>com.wgzhao.addax:*</exclude>
+            </excludes>
+        </dependencySet>
+    </dependencySets>
+</assembly>
diff --git a/plugin/writer/paimonwriter/pom.xml b/plugin/writer/paimonwriter/pom.xml
new file mode 100644
index 000000000..3aa053801
--- /dev/null
+++ b/plugin/writer/paimonwriter/pom.xml
@@ -0,0 +1,225 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>com.wgzhao.addax</groupId>
+        <artifactId>addax-all</artifactId>
+        <version>4.2.3-SNAPSHOT</version>
+        <relativePath>../../../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>paimonwriter</artifactId>
+    <name>paimon-writer</name>
+    <description>PaimonWriter provides the ability to write files in Paimon format locally; recommended for development and test environments.</description>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.wgzhao.addax</groupId>
+            <artifactId>addax-common</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion><artifactId>slf4j-log4j12</artifactId><groupId>org.slf4j</groupId></exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>com.wgzhao.addax</groupId>
+            <artifactId>addax-storage</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-bundle</artifactId>
+            <version>1.0.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${hadoop.version}</version>
+            <exclusions>
+                <exclusion><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></exclusion>
+                <exclusion><groupId>org.codehaus.jackson</groupId><artifactId>jackson-core-asl</artifactId></exclusion>
+                <exclusion><groupId>org.codehaus.jackson</groupId><artifactId>jackson-mapper-asl</artifactId></exclusion>
+                <exclusion><groupId>com.fasterxml.woodstox</groupId><artifactId>woodstox-core</artifactId></exclusion>
+                <exclusion><groupId>commons-codec</groupId><artifactId>commons-codec</artifactId></exclusion>
+                <exclusion><groupId>commons-net</groupId><artifactId>commons-net</artifactId></exclusion>
+                <exclusion><groupId>io.netty</groupId><artifactId>netty</artifactId></exclusion>
+                <exclusion><groupId>log4j</groupId><artifactId>log4j</artifactId></exclusion>
+                <exclusion><groupId>net.minidev</groupId><artifactId>json-smart</artifactId></exclusion>
+                <exclusion><groupId>org.codehaus.jettison</groupId><artifactId>jettison</artifactId></exclusion>
+                <exclusion><groupId>org.eclipse.jetty</groupId><artifactId>jetty-server</artifactId></exclusion>
+                <exclusion><groupId>org.xerial.snappy</groupId><artifactId>snappy-java</artifactId></exclusion>
+                <exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion>
+                <exclusion><groupId>org.eclipse.jetty</groupId><artifactId>jetty-util</artifactId></exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-aws</artifactId>
+            <version>${hadoop.version}</version>
+            <exclusions>
+                <exclusion><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></exclusion>
+                <exclusion><groupId>org.codehaus.jackson</groupId><artifactId>jackson-core-asl</artifactId></exclusion>
+                <exclusion><groupId>org.codehaus.jackson</groupId><artifactId>jackson-mapper-asl</artifactId></exclusion>
+                <exclusion><groupId>com.fasterxml.woodstox</groupId><artifactId>woodstox-core</artifactId></exclusion>
+                <exclusion><groupId>commons-codec</groupId><artifactId>commons-codec</artifactId></exclusion>
+                <exclusion><groupId>commons-net</groupId><artifactId>commons-net</artifactId></exclusion>
+                <exclusion><groupId>io.netty</groupId><artifactId>netty</artifactId></exclusion>
+                <exclusion><groupId>log4j</groupId><artifactId>log4j</artifactId></exclusion>
+                <exclusion><groupId>net.minidev</groupId><artifactId>json-smart</artifactId></exclusion>
+                <exclusion><groupId>org.codehaus.jettison</groupId><artifactId>jettison</artifactId></exclusion>
+                <exclusion><groupId>org.eclipse.jetty</groupId><artifactId>jetty-server</artifactId></exclusion>
+                <exclusion><groupId>org.xerial.snappy</groupId><artifactId>snappy-java</artifactId></exclusion>
+                <exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion>
+                <exclusion><groupId>org.eclipse.jetty</groupId><artifactId>jetty-util</artifactId></exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <version>${hadoop.version}</version>
+            <exclusions>
+                <exclusion><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></exclusion>
+                <exclusion><groupId>commons-codec</groupId><artifactId>commons-codec</artifactId></exclusion>
+                <exclusion><groupId>io.netty</groupId><artifactId>netty</artifactId></exclusion>
+                <exclusion><groupId>org.eclipse.jetty</groupId><artifactId>jetty-util</artifactId></exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.woodstox</groupId>
+            <artifactId>woodstox-core</artifactId>
+            <version>${woodstox.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <descriptors>
+                        <descriptor>package.xml</descriptor>
+                    </descriptors>
+                    <finalName>${project.artifactId}-${project.version}</finalName>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>release</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/plugin/writer/paimonwriter/src/main/java/com/wgzhao/addax/plugin/writer/paimonwriter/PaimonHelper.java b/plugin/writer/paimonwriter/src/main/java/com/wgzhao/addax/plugin/writer/paimonwriter/PaimonHelper.java
new file mode 100644
index 000000000..187080ead
--- /dev/null
+++ b/plugin/writer/paimonwriter/src/main/java/com/wgzhao/addax/plugin/writer/paimonwriter/PaimonHelper.java
@@ -0,0 +1,53 @@
+package com.wgzhao.addax.plugin.writer.paimonwriter;
+
+import com.wgzhao.addax.common.exception.AddaxException;
+import com.wgzhao.addax.common.util.Configuration;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.options.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.wgzhao.addax.common.spi.ErrorCode.LOGIN_ERROR;
+
+public class PaimonHelper {
+    private static final Logger LOG = LoggerFactory.getLogger(PaimonHelper.class);
+
+    public static void kerberosAuthentication(org.apache.hadoop.conf.Configuration hadoopConf, String kerberosPrincipal, String kerberosKeytabFilePath) throws Exception {
+        if (StringUtils.isNotBlank(kerberosPrincipal) && StringUtils.isNotBlank(kerberosKeytabFilePath)) {
+            UserGroupInformation.setConfiguration(hadoopConf);
+            try {
+                UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytabFilePath);
+            }
+            catch (Exception e) {
+                String message = String.format("kerberos authentication failed, keytab file: [%s], principal: [%s]",
+                        kerberosKeytabFilePath, kerberosPrincipal);
+                LOG.error(message);
+                throw AddaxException.asAddaxException(LOGIN_ERROR, e);
+            }
+        }
+    }
+
+    public static Options getOptions(Configuration conf) {
+        Options options = new Options();
+        conf.getMap("paimonConfig").forEach((k, v) -> options.set(k, String.valueOf(v)));
+        return options;
+    }
+
+    public static CatalogContext getCatalogContext(Options options) {
+        CatalogContext context;
+        if (options.get("warehouse").startsWith("hdfs://")) {
+            // build a Hadoop configuration from the catalog options so that HA and Kerberos settings take effect
+            org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
+            options.toMap().forEach((k, v) -> hadoopConf.set(k, String.valueOf(v)));
+            UserGroupInformation.setConfiguration(hadoopConf);
+            context = CatalogContext.create(options, hadoopConf);
+        }
+        else {
+            context = CatalogContext.create(options);
+        }
+        return context;
+    }
+}
diff --git a/plugin/writer/paimonwriter/src/main/java/com/wgzhao/addax/plugin/writer/paimonwriter/PaimonWriter.java b/plugin/writer/paimonwriter/src/main/java/com/wgzhao/addax/plugin/writer/paimonwriter/PaimonWriter.java
new file mode 100644
index 000000000..79175abf9
--- /dev/null
+++ b/plugin/writer/paimonwriter/src/main/java/com/wgzhao/addax/plugin/writer/paimonwriter/PaimonWriter.java
@@ -0,0 +1,309 @@
+package com.wgzhao.addax.plugin.writer.paimonwriter;
+
+import com.alibaba.fastjson2.JSON;
+import com.wgzhao.addax.common.element.Column;
+import com.wgzhao.addax.common.element.Record;
+import com.wgzhao.addax.common.plugin.RecordReceiver;
+import com.wgzhao.addax.common.spi.Writer;
+import com.wgzhao.addax.common.util.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.*;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeRoot;
+import org.apache.paimon.types.DecimalType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static com.wgzhao.addax.common.base.Key.KERBEROS_KEYTAB_FILE_PATH;
+import static com.wgzhao.addax.common.base.Key.KERBEROS_PRINCIPAL;
+
+public class PaimonWriter extends Writer {
+    public static class Job extends Writer.Job {
+        private static final Logger LOG = LoggerFactory.getLogger(Job.class);
+        private Configuration conf = null;
+        private BatchWriteBuilder writeBuilder = null;
+
+        @Override
+        public void init() {
+            this.conf = this.getPluginJobConf();
+
+            Options options = PaimonHelper.getOptions(this.conf);
+            CatalogContext context = PaimonHelper.getCatalogContext(options);
+
+            if ("kerberos".equals(options.get("hadoop.security.authentication"))) {
+                String kerberosKeytabFilePath = options.get(KERBEROS_KEYTAB_FILE_PATH);
+                String kerberosPrincipal = options.get(KERBEROS_PRINCIPAL);
+                try {
+                    PaimonHelper.kerberosAuthentication(context.hadoopConf(), kerberosPrincipal, kerberosKeytabFilePath);
+                    LOG.info("kerberos authentication success");
+                    // simple sanity check that the authenticated user can reach the file system
+                    FileSystem fs = FileSystem.get(context.hadoopConf());
+                    fs.getStatus().getCapacity();
+                }
+                catch (Exception e) {
+                    LOG.error("kerberos authentication error", e);
+                    throw new RuntimeException(e);
+                }
+            }
+
+            try (Catalog catalog = CatalogFactory.createCatalog(context)) {
+                String dbName = this.conf.getString("dbName");
+                String tableName = this.conf.getString("tableName");
+                Identifier identifier = Identifier.create(dbName, tableName);
+                Table table = catalog.getTable(identifier);
+                writeBuilder = table.newBatchWriteBuilder();
+            }
+            catch (Exception e) {
+                LOG.error("init paimon error", e);
+                throw new RuntimeException(e);
+            }
+        }
+
+        @Override
+        public List<Configuration> split(int mandatoryNumber) {
+            List<Configuration> configurations = new ArrayList<>(mandatoryNumber);
+            for (int i = 0; i < mandatoryNumber; i++) {
+                configurations.add(conf);
+            }
+            return configurations;
+        }
+
+        @Override
+        public void prepare() {
+            String writeMode = this.conf.getString("writeMode");
+            if ("truncate".equalsIgnoreCase(writeMode) && writeBuilder != null) {
+                LOG.info("You specified truncate writeMode, begin to clean history data.");
+                // committing an empty set of messages with overwrite enabled clears the table
+                BatchTableWrite write = writeBuilder.withOverwrite().newWrite();
+                BatchTableCommit commit = writeBuilder.newCommit();
+                commit.commit(new ArrayList<>());
+                try {
+                    write.close();
+                }
+                catch (Exception e) {
+                    LOG.error("close paimon write error", e);
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+
+        @Override
+        public void destroy() {
+        }
+    }
+
+    public static class Task extends Writer.Task {
+        private static final Logger log = LoggerFactory.getLogger(Task.class);
+        private Configuration conf = null;
+        private BatchWriteBuilder writeBuilder = null;
+        private Integer batchSize = 1000;
+        private List<DataField> columnList = new ArrayList<>();
+        private List<DataType> typeList = new ArrayList<>();
+
+        @Override
+        public void startWrite(RecordReceiver recordReceiver) {
+            List<Record> writerBuffer = new ArrayList<>(this.batchSize);
+            Record record;
+            long total = 0;
+            while ((record = recordReceiver.getFromReader()) != null) {
+                writerBuffer.add(record);
+                if (writerBuffer.size() >= this.batchSize) {
+                    total += doBatchInsert(writerBuffer);
+                    writerBuffer.clear();
+                }
+            }
+
+            if (!writerBuffer.isEmpty()) {
+                total += doBatchInsert(writerBuffer);
+                writerBuffer.clear();
+            }
+
+            String msg = String.format("task end, write size: %d", total);
+            getTaskPluginCollector().collectMessage("writeSize", String.valueOf(total));
+            log.info(msg);
+        }
+
+        @Override
+        public void init() {
+            this.conf = super.getPluginJobConf();
+            batchSize = conf.getInt("batchSize", 1000);
+
+            Options options = PaimonHelper.getOptions(this.conf);
+            CatalogContext context = PaimonHelper.getCatalogContext(options);
+
+            if ("kerberos".equals(options.get("hadoop.security.authentication"))) {
+                String kerberosKeytabFilePath = options.get(KERBEROS_KEYTAB_FILE_PATH);
+                String kerberosPrincipal = options.get(KERBEROS_PRINCIPAL);
+                try {
+                    PaimonHelper.kerberosAuthentication(context.hadoopConf(), kerberosPrincipal, kerberosKeytabFilePath);
+                    log.info("kerberos authentication success");
+                }
+                catch (Exception e) {
+                    log.error("kerberos authentication error", e);
+                    throw new RuntimeException(e);
+                }
+            }
+
+            try (Catalog catalog = CatalogFactory.createCatalog(context)) {
+                String dbName = this.conf.getString("dbName");
+                String tableName = this.conf.getString("tableName");
+                Identifier identifier = Identifier.create(dbName, tableName);
+                Table table = catalog.getTable(identifier);
+
+                columnList = table.rowType().getFields();
+                typeList = table.rowType().getFieldTypes();
+                writeBuilder = table.newBatchWriteBuilder();
+            }
+            catch (Exception e) {
+                log.error("init paimon error", e);
+                throw new RuntimeException(e);
+            }
+        }
+
+        @Override
+        public void destroy() {
+        }
+
+        private long doBatchInsert(final List<Record> writerBuffer) {
+            BatchTableWrite write = writeBuilder.newWrite();
+            GenericRow data;
+            for (Record record : writerBuffer) {
+                data = new GenericRow(columnList.size());
+                for (int i = 0; i < record.getColumnNumber(); i++) {
+                    Column column = record.getColumn(i);
+                    if (column == null) {
+                        continue;
+                    }
+                    if (i >= columnList.size()) {
+                        throw new RuntimeException("columnList size is " + columnList.size()
+                                + ", but record column number is " + record.getColumnNumber());
+                    }
+                    String columnName = columnList.get(i).name();
+                    DataType columnType = typeList.get(i);
+                    // an ARRAY column is expected to arrive as a comma-separated string
+                    if (columnType.getTypeRoot().equals(DataTypeRoot.ARRAY)) {
+                        if (null == column.asString()) {
+                            data.setField(i, null);
+                        }
+                        else {
+                            String[] dataList = column.asString().split(",");
+                            data.setField(i, new GenericArray(dataList));
+                        }
+                    }
+                    else {
+                        switch (columnType.getTypeRoot()) {
+                            case DATE:
+                            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                                try {
+                                    if (column.asLong() != null) {
+                                        data.setField(i, Timestamp.fromEpochMillis(column.asLong()));
+                                    }
+                                    else {
+                                        data.setField(i, null);
+                                    }
+                                }
+                                catch (Exception e) {
+                                    getTaskPluginCollector().collectDirtyRecord(record,
+                                            String.format("Failed to parse the date/time value [%s:%s], exception: %s", columnName, column, e));
+                                }
+                                break;
+                            case CHAR:
+                            case VARCHAR:
+                                data.setField(i, BinaryString.fromString(column.asString()));
+                                break;
+                            case BOOLEAN:
+                                data.setField(i, column.asBoolean());
+                                break;
+                            case VARBINARY:
+                            case BINARY:
+                                data.setField(i, column.asBytes());
+                                break;
+                            case BIGINT:
+                                data.setField(i, column.asLong());
+                                break;
+                            case INTEGER:
+                            case SMALLINT:
+                            case TINYINT:
+                                data.setField(i, column.asBigInteger() == null ? null : column.asBigInteger().intValue());
+                                break;
+                            case FLOAT:
+                            case DOUBLE:
+                                data.setField(i, column.asDouble());
+                                break;
+                            case DECIMAL:
+                                if (column.asBigDecimal() != null) {
+                                    data.setField(i, Decimal.fromBigDecimal(column.asBigDecimal(),
+                                            ((DecimalType) columnType).getPrecision(), ((DecimalType) columnType).getScale()));
+                                }
+                                else {
+                                    data.setField(i, null);
+                                }
+                                break;
+                            case MAP:
+                                try {
+                                    // a MAP column is expected to arrive as a JSON object string
+                                    data.setField(i, new GenericMap(JSON.parseObject(column.asString(), Map.class)));
+                                }
+                                catch (Exception e) {
+                                    getTaskPluginCollector().collectDirtyRecord(record,
+                                            String.format("Failed to parse the MAP value [%s:%s], exception: %s", columnName, column, e));
+                                }
+                                break;
+                            default:
+                                getTaskPluginCollector().collectDirtyRecord(record,
+                                        "Type error: unsupported data type " + columnType + " for column " + columnName);
+                        }
+                    }
+                }
+
+                try {
+                    write.write(data);
+                }
+                catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
+
+            List<CommitMessage> messages;
+            try {
+                messages = write.prepareCommit();
+                BatchTableCommit commit = writeBuilder.newCommit();
+                commit.commit(messages);
+                write.close();
+                return messages.size();
+            }
+            catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+}
diff --git a/plugin/writer/paimonwriter/src/main/resources/plugin.json b/plugin/writer/paimonwriter/src/main/resources/plugin.json
new file mode 100644
index 000000000..0eb2be570
--- /dev/null
+++ b/plugin/writer/paimonwriter/src/main/resources/plugin.json
@@ -0,0 +1,6 @@
+{
+  "name": "paimonwriter",
+  "class": "com.wgzhao.addax.plugin.writer.paimonwriter.PaimonWriter",
+  "description": "write data to paimon",
+  "developer": "awol2005ex"
+}
diff --git a/plugin/writer/paimonwriter/src/main/resources/plugin_job_template.json b/plugin/writer/paimonwriter/src/main/resources/plugin_job_template.json
new file mode 100644
index 000000000..681c940f9
--- /dev/null
+++ b/plugin/writer/paimonwriter/src/main/resources/plugin_job_template.json
@@ -0,0 +1,13 @@
+{
+  "name": "paimonwriter",
+  "parameter": {
+    "dbName": "test",
+    "tableName": "test",
+    "writeMode": "truncate",
+    "paimonConfig": {
+      "warehouse": "file:///tmp/paimon",
+      "metastore": "filesystem"
+    }
+  }
+}
diff --git a/pom.xml b/pom.xml
index 98d64beaf..9acba56ae 100644
--- a/pom.xml
+++ b/pom.xml
@@ -384,6 +384,7 @@
         <module>plugin/writer/mongodbwriter</module>
         <module>plugin/writer/mysqlwriter</module>
        <module>plugin/writer/oraclewriter</module>
+        <module>plugin/writer/paimonwriter</module>
         <module>plugin/writer/postgresqlwriter</module>
         <module>plugin/writer/rdbmswriter</module>
         <module>plugin/writer/rediswriter</module>