diff --git a/docs/assets/jobs/paimonwriter.json b/docs/assets/jobs/paimonwriter.json
new file mode 100644
index 000000000..84c97304a
--- /dev/null
+++ b/docs/assets/jobs/paimonwriter.json
@@ -0,0 +1,48 @@
+{
+  "job": {
+    "setting": {
+      "speed": {
+        "channel": 3
+      },
+      "errorLimit": {
+        "record": 0,
+        "percentage": 0
+      }
+    },
+    "content": [
+      {
+        "reader": {
+          "name": "rdbmsreader",
+          "parameter": {
+            "username": "root",
+            "password": "root",
+            "column": [
+              "*"
+            ],
+            "connection": [
+              {
+                "querySql": [
+                  "select 1+0 id, 'test1' as name"
+                ],
+                "jdbcUrl": ["jdbc:mysql://localhost:3306/ruoyi_vue_camunda?allowPublicKeyRetrieval=true"]
+              }
+            ],
+            "fetchSize": 1024
+          }
+        },
+        "writer": {
+          "name": "paimonwriter",
+          "parameter": {
+            "dbName": "test",
+            "tableName": "test2",
+            "writeMode": "truncate",
+            "paimonConfig": {
+              "warehouse": "file:///g:/paimon",
+              "metastore": "filesystem"
+            }
+          }
+        }
+      }
+    ]
+  }
+}
\ No newline at end of file
diff --git a/docs/writer/paimonwriter.md b/docs/writer/paimonwriter.md
index a1973c1ea..f2c07a5db 100644
--- a/docs/writer/paimonwriter.md
+++ b/docs/writer/paimonwriter.md
@@ -29,6 +29,296 @@ Paimon Writer provides the ability to write data into an existing Paimon table.
 
 ### paimonConfig
 
 `paimonConfig` holds advanced parameters related to the Paimon catalog and Hadoop, such as HA settings.
+
+Creating a Paimon table in a local directory
+
+pom.xml
+
+```xml
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>com.test</groupId>
+    <artifactId>paimon-java-api-test</artifactId>
+    <version>1.0-SNAPSHOT</version>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <hadoop.version>3.2.4</hadoop.version>
+        <woodstox.version>7.0.0</woodstox.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-bundle</artifactId>
+            <version>1.0.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${hadoop.version}</version>
+            <exclusions>
+                <exclusion><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></exclusion>
+                <exclusion><groupId>org.codehaus.jackson</groupId><artifactId>jackson-core-asl</artifactId></exclusion>
+                <exclusion><groupId>org.codehaus.jackson</groupId><artifactId>jackson-mapper-asl</artifactId></exclusion>
+                <exclusion><groupId>com.fasterxml.woodstox</groupId><artifactId>woodstox-core</artifactId></exclusion>
+                <exclusion><groupId>commons-codec</groupId><artifactId>commons-codec</artifactId></exclusion>
+                <exclusion><groupId>commons-net</groupId><artifactId>commons-net</artifactId></exclusion>
+                <exclusion><groupId>io.netty</groupId><artifactId>netty</artifactId></exclusion>
+                <exclusion><groupId>log4j</groupId><artifactId>log4j</artifactId></exclusion>
+                <exclusion><groupId>net.minidev</groupId><artifactId>json-smart</artifactId></exclusion>
+                <exclusion><groupId>org.codehaus.jettison</groupId><artifactId>jettison</artifactId></exclusion>
+                <exclusion><groupId>org.eclipse.jetty</groupId><artifactId>jetty-server</artifactId></exclusion>
+                <exclusion><groupId>org.xerial.snappy</groupId><artifactId>snappy-java</artifactId></exclusion>
+                <exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion>
+                <exclusion><groupId>org.eclipse.jetty</groupId><artifactId>jetty-util</artifactId></exclusion>
+            </exclusions>
+        </dependency>
+
+        <!-- required for s3a:// (S3 / MinIO) warehouses -->
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-aws</artifactId>
+            <version>${hadoop.version}</version>
+            <exclusions>
+                <exclusion><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></exclusion>
+                <exclusion><groupId>org.codehaus.jackson</groupId><artifactId>jackson-core-asl</artifactId></exclusion>
+                <exclusion><groupId>org.codehaus.jackson</groupId><artifactId>jackson-mapper-asl</artifactId></exclusion>
+                <exclusion><groupId>com.fasterxml.woodstox</groupId><artifactId>woodstox-core</artifactId></exclusion>
+                <exclusion><groupId>commons-codec</groupId><artifactId>commons-codec</artifactId></exclusion>
+                <exclusion><groupId>commons-net</groupId><artifactId>commons-net</artifactId></exclusion>
+                <exclusion><groupId>io.netty</groupId><artifactId>netty</artifactId></exclusion>
+                <exclusion><groupId>log4j</groupId><artifactId>log4j</artifactId></exclusion>
+                <exclusion><groupId>net.minidev</groupId><artifactId>json-smart</artifactId></exclusion>
+                <exclusion><groupId>org.codehaus.jettison</groupId><artifactId>jettison</artifactId></exclusion>
+                <exclusion><groupId>org.eclipse.jetty</groupId><artifactId>jetty-server</artifactId></exclusion>
+                <exclusion><groupId>org.xerial.snappy</groupId><artifactId>snappy-java</artifactId></exclusion>
+                <exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion>
+                <exclusion><groupId>org.eclipse.jetty</groupId><artifactId>jetty-util</artifactId></exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <version>${hadoop.version}</version>
+            <exclusions>
+                <exclusion><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></exclusion>
+                <exclusion><groupId>commons-codec</groupId><artifactId>commons-codec</artifactId></exclusion>
+                <exclusion><groupId>io.netty</groupId><artifactId>netty</artifactId></exclusion>
+                <exclusion><groupId>org.eclipse.jetty</groupId><artifactId>jetty-util</artifactId></exclusion>
+            </exclusions>
+        </dependency>
+
+        <!-- replacement for the woodstox-core excluded above -->
+        <dependency>
+            <groupId>com.fasterxml.woodstox</groupId>
+            <artifactId>woodstox-core</artifactId>
+            <version>${woodstox.version}</version>
+        </dependency>
+    </dependencies>
+</project>
+```
+
+```java
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.types.DataTypes;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class CreatePaimonTable {
+
+    public static Catalog createFilesystemCatalog() {
+        CatalogContext context = CatalogContext.create(new Path("file:///g:/paimon"));
+        return CatalogFactory.createCatalog(context);
+    }
+    /* For MinIO, the example is as follows (additionally requires
+       org.apache.paimon.options.Options and org.apache.hadoop.conf.Configuration):
+
+    public static Catalog createFilesystemCatalog() {
+        Options options = new Options();
+        options.set("warehouse", "s3a://pvc-91d1e2cd-4d25-45c9-8613-6c4f7bf0a4cc/paimon");
+        Configuration hadoopConf = new Configuration();
+        hadoopConf.set("fs.s3a.endpoint", "http://localhost:9000");
+        hadoopConf.set("fs.s3a.access.key", "gy0dX5lALP176g6c9fYf");
+        hadoopConf.set("fs.s3a.secret.key", "ReuUrCzzu5wKWAegtswoHIWV389BYl9AB1ZQbiKr");
+        hadoopConf.set("fs.s3a.connection.ssl.enabled", "false");
+        hadoopConf.set("fs.s3a.path.style.access", "true");
+        hadoopConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
+        CatalogContext context = CatalogContext.create(options, hadoopConf);
+
+        return CatalogFactory.createCatalog(context);
+    }
+    */
+
+    public static void main(String[] args) {
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        schemaBuilder.primaryKey("id");
+        schemaBuilder.column("id", DataTypes.INT());
+        schemaBuilder.column("name", DataTypes.STRING());
+        Map<String, String> options = new HashMap<>();
+        options.put("bucket", "1"); // the Paimon Java API requires bucket > 0
+        options.put("bucket-key", "id");
+        options.put("file.format", "orc");
+        options.put("file.compression", "lz4");
+        options.put("lookup.cache-spill-compression", "lz4");
+        options.put("spill-compression", "LZ4");
+        options.put("orc.compress", "lz4");
+        options.put("manifest.format", "orc");
+
+        schemaBuilder.options(options);
+        Schema schema = schemaBuilder.build();
+
+        Identifier identifier = Identifier.create("test", "test2");
+        try {
+            Catalog catalog = CreatePaimonTable.createFilesystemCatalog();
+            catalog.createDatabase("test", true);
+            catalog.createTable(identifier, schema, true);
+        } catch (Catalog.TableAlreadyExistException e) {
+            e.printStackTrace();
+        } catch (Catalog.DatabaseNotExistException e) {
+            e.printStackTrace();
+        } catch (Catalog.DatabaseAlreadyExistException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
+```
+
+Creating the table in a Spark or Flink environment (Spark SQL syntax shown):
+
+```sql
+CREATE TABLE IF NOT EXISTS test.test2 (id int, name string) TBLPROPERTIES (
+    'primary-key' = 'id',
+    'bucket' = '1',
+    'bucket-key' = 'id',
+    'file.format' = 'orc',
+    'file.compression' = 'lz4',
+    'lookup.cache-spill-compression' = 'lz4',
+    'spill-compression' = 'LZ4',
+    'orc.compress' = 'lz4',
+    'manifest.format' = 'orc'
+)
+
+```
+
+Local file example
+
 ```json
 {
     "name": "paimonwriter",
@@ -43,6 +333,70 @@ Paimon Writer provides the ability to write data into an existing Paimon table.
     }
 }
 ```
+
+S3 or MinIO catalog example
+```json
+{
+  "job": {
+    "setting": {
+      "speed": {
+        "channel": 3
+      },
+      "errorLimit": {
+        "record": 0,
+        "percentage": 0
+      }
+    },
+    "content": [
+      {
+        "reader": {
+          "name": "rdbmsreader",
+          "parameter": {
+            "username": "root",
+            "password": "root",
+            "column": [
+              "*"
+            ],
+            "connection": [
+              {
+                "querySql": [
+                  "select 1+0 id, 'test1' as name"
+                ],
+                "jdbcUrl": [
+                  "jdbc:mysql://localhost:3306/ruoyi_vue_camunda?allowPublicKeyRetrieval=true"
+                ]
+              }
+            ],
+            "fetchSize": 1024
+          }
+        },
+        "writer": {
+          "name": "paimonwriter",
+          "parameter": {
+            "dbName": "test",
+            "tableName": "test2",
+            "writeMode": "truncate",
+            "paimonConfig": {
+              "warehouse": "s3a://pvc-91d1e2cd-4d25-45c9-8613-6c4f7bf0a4cc/paimon",
+              "metastore": "filesystem",
+              "fs.s3a.endpoint": "http://localhost:9000",
+              "fs.s3a.access.key": "gy0dX5lALP176g6c9fYf",
+              "fs.s3a.secret.key": "ReuUrCzzu5wKWAegtswoHIWV389BYl9AB1ZQbiKr",
+              "fs.s3a.connection.ssl.enabled": "false",
+              "fs.s3a.path.style.access": "true",
+              "fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem"
+            }
+          }
+        }
+      }
+    ]
+  }
+}
+```
+
+
+HDFS catalog example
+
 ```json
 {
   "paimonConfig": {
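
Reviewer note (not part of the patch): after the job above finishes, the written rows can be checked with Paimon's batch read API. The following is a minimal sketch, assuming the local warehouse `file:///g:/paimon` and the `test.test2` table created by `CreatePaimonTable`:

```java
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.Path;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;

import java.util.List;

public class ReadPaimonTable {
    public static void main(String[] args) throws Exception {
        // Same filesystem catalog as in CreatePaimonTable (the warehouse path is an assumption)
        Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(new Path("file:///g:/paimon")));
        Table table = catalog.getTable(Identifier.create("test", "test2"));

        // Plan the splits of a batch scan, then read and print every row
        ReadBuilder readBuilder = table.newReadBuilder();
        List<Split> splits = readBuilder.newScan().plan().splits();
        try (RecordReader<InternalRow> reader = readBuilder.newRead().createReader(splits)) {
            reader.forEachRemaining(row -> System.out.println(row.getInt(0) + ", " + row.getString(1)));
        }
    }
}
```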
diff --git a/plugin/writer/paimonwriter/src/main/java/com/wgzhao/addax/plugin/writer/paimonwriter/PaimonHelper.java b/plugin/writer/paimonwriter/src/main/java/com/wgzhao/addax/plugin/writer/paimonwriter/PaimonHelper.java
index 187080ead..f264c163d 100644
--- a/plugin/writer/paimonwriter/src/main/java/com/wgzhao/addax/plugin/writer/paimonwriter/PaimonHelper.java
+++ b/plugin/writer/paimonwriter/src/main/java/com/wgzhao/addax/plugin/writer/paimonwriter/PaimonHelper.java
@@ -36,7 +36,11 @@ public static Options getOptions(Configuration conf){
     public static CatalogContext getCatalogContext(Options options) {
         CatalogContext context = null;
-        if(options.get("warehouse").startsWith("hdfs://")){
+        String warehouse = options.get("warehouse");
+        if (warehouse == null || warehouse.isEmpty()) {
+            throw new RuntimeException("The 'warehouse' option of paimonConfig is null or empty");
+        }
+        if (warehouse.startsWith("hdfs://") || warehouse.startsWith("s3a://")) {
             org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
             options.toMap().forEach((k, v) -> hadoopConf.set(k, String.valueOf(v)));
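
To make the new branching in `getCatalogContext` easier to follow, here is a self-contained sketch of the same flow. It is illustrative only: the class name and the final plain-filesystem branch are assumptions, since the hunk above ends before the rest of the method.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.options.Options;

import java.util.Map;

// Hypothetical helper mirroring the validation and hdfs://-or-s3a:// branching added above
public class CatalogContextSketch {
    static CatalogContext fromPaimonConfig(Map<String, String> paimonConfig) {
        String warehouse = paimonConfig.get("warehouse");
        if (warehouse == null || warehouse.isEmpty()) {
            throw new RuntimeException("The 'warehouse' option of paimonConfig is null or empty");
        }
        Options options = Options.fromMap(paimonConfig);
        if (warehouse.startsWith("hdfs://") || warehouse.startsWith("s3a://")) {
            // Copy every paimonConfig entry into a Hadoop Configuration so that
            // fs.s3a.* credentials or HDFS HA settings take effect
            Configuration hadoopConf = new Configuration();
            paimonConfig.forEach(hadoopConf::set);
            return CatalogContext.create(options, hadoopConf);
        }
        // Assumed fallback for local file:// warehouses, which need no Hadoop configuration
        return CatalogContext.create(options);
    }
}
```

With this change, the s3a example job above only needs the `fs.s3a.*` keys inside `paimonConfig`; they are forwarded to the Hadoop configuration automatically.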