From 35807561f23504dd7f77ca0e19137c94a25804ef Mon Sep 17 00:00:00 2001 From: XiaoJiang521 Date: Sun, 2 Jul 2023 11:43:25 +0800 Subject: [PATCH 01/16] [feature][jdbc] Jdbc database support identifier --- .../seatunnel/jdbc/config/JdbcOptions.java | 6 +++ .../jdbc/internal/dialect/JdbcDialect.java | 23 +++++++++++- .../internal/dialect/JdbcDialectFactory.java | 2 +- .../internal/dialect/JdbcDialectLoader.java | 8 +++- .../dialect/dialectenum/FieldIdeEnum.java | 17 +++++++++ .../dialect/mysql/MySqlDialectFactory.java | 7 ++++ .../internal/dialect/mysql/MysqlDialect.java | 14 +++++++ .../oceanbase/OceanBaseDialectFactory.java | 2 +- .../dialect/oracle/OracleDialect.java | 21 ++++++++++- .../dialect/oracle/OracleDialectFactory.java | 7 ++++ .../dialect/psql/PostgresDialect.java | 37 +++++++++++++++++++ .../dialect/psql/PostgresDialectFactory.java | 7 ++++ .../dialect/sqlserver/SqlServerDialect.java | 32 ++++++++++++++++ .../sqlserver/SqlServerDialectFactory.java | 7 ++++ .../seatunnel/jdbc/sink/JdbcSinkFactory.java | 5 ++- 15 files changed, 188 insertions(+), 7 deletions(-) create mode 100644 seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dialectenum/FieldIdeEnum.java diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java index 24ae0580f325..b4278f587c35 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java @@ -144,4 +144,10 @@ public interface JdbcOptions { .intType() .noDefaultValue() .withDescription("partition num"); + + Option FIELD_IDE = + Options.key("field_ide") + .stringType() + .noDefaultValue() + .withDescription("Whether case conversion is required"); } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java index f36067b3c2be..63e8f4fbb7e9 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java @@ -19,6 +19,9 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceConfig; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum; + +import org.apache.commons.lang3.StringUtils; import java.io.Serializable; import java.sql.Connection; @@ -63,9 +66,13 @@ public interface JdbcDialect extends Serializable { default String quoteIdentifier(String identifier) { return identifier; } + /** Quotes the identifier for database name or field name */ + default String quoteDatabaseIdentifier(String identifier) { + return identifier; + } default String tableIdentifier(String database, String tableName) { - return quoteIdentifier(database) + "." + quoteIdentifier(tableName); + return quoteDatabaseIdentifier(database) + "." + quoteIdentifier(tableName); } /** @@ -196,4 +203,18 @@ default ResultSetMetaData getResultSetMetaData( PreparedStatement ps = conn.prepareStatement(jdbcSourceConfig.getQuery()); return ps.getMetaData(); } + + default String getFieldIde(String identifier, String fieldIde) { + if (StringUtils.isEmpty(fieldIde)) { + return identifier; + } + switch (FieldIdeEnum.valueOf(fieldIde.toUpperCase())) { + case LOWERCASE: + return identifier.toLowerCase(); + case UPPERCASE: + return identifier.toUpperCase(); + default: + return identifier; + } + } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectFactory.java index 3d66de659092..5439937f53d9 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectFactory.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectFactory.java @@ -44,7 +44,7 @@ public interface JdbcDialectFactory { * @param compatibleMode The compatible mode * @return a new instance of {@link JdbcDialect} */ - default JdbcDialect create(String compatibleMode) { + default JdbcDialect create(String compatibleMode, String fieldId) { return create(); } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java index b49df35ff3f8..350a22e20c6c 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java @@ -36,6 +36,10 @@ public final class JdbcDialectLoader { private JdbcDialectLoader() {} + public static JdbcDialect load(String url, String compatibleMode) { + return load(url, compatibleMode, ""); + } + /** * Loads the unique JDBC Dialect that can handle the given database url. * @@ -45,7 +49,7 @@ private JdbcDialectLoader() {} * unambiguously process the given database URL. * @return The loaded dialect. */ - public static JdbcDialect load(String url, String compatibleMode) { + public static JdbcDialect load(String url, String compatibleMode, String fieldIde) { ClassLoader cl = Thread.currentThread().getContextClassLoader(); List foundFactories = discoverFactories(cl); @@ -90,7 +94,7 @@ public static JdbcDialect load(String url, String compatibleMode) { .collect(Collectors.joining("\n")))); } - return matchingFactories.get(0).create(compatibleMode); + return matchingFactories.get(0).create(compatibleMode, fieldIde); } private static List discoverFactories(ClassLoader classLoader) { diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dialectenum/FieldIdeEnum.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dialectenum/FieldIdeEnum.java new file mode 100644 index 000000000000..fadffe0dac9d --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dialectenum/FieldIdeEnum.java @@ -0,0 +1,17 @@ +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum; + +public enum FieldIdeEnum { + ORIGINAL("original"), // Original string form + UPPERCASE("uppercase"), // Convert to uppercase + LOWERCASE("lowercase"); // Convert to lowercase + + private final String value; + + FieldIdeEnum(String value) { + this.value = value; + } + + public String getValue() { + return value; + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlDialectFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlDialectFactory.java index 10047311b933..a4f89a4dc857 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlDialectFactory.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlDialectFactory.java @@ -22,6 +22,8 @@ import com.google.auto.service.AutoService; +import javax.annotation.Nonnull; + /** Factory for {@link MysqlDialect}. */ @AutoService(JdbcDialectFactory.class) public class MySqlDialectFactory implements JdbcDialectFactory { @@ -34,4 +36,9 @@ public boolean acceptsURL(String url) { public JdbcDialect create() { return new MysqlDialect(); } + + @Override + public JdbcDialect create(@Nonnull String compatibleMode, String fieldIde) { + return new MysqlDialect(fieldIde); + } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java index 128b8ae4be95..9c48651c30fa 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java @@ -20,6 +20,7 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum; import java.sql.Connection; import java.sql.PreparedStatement; @@ -30,6 +31,14 @@ import java.util.stream.Collectors; public class MysqlDialect implements JdbcDialect { + public String fieldIde = FieldIdeEnum.ORIGINAL.getValue(); + + public MysqlDialect() {} + + public MysqlDialect(String fieldIde) { + this.fieldIde = fieldIde; + } + @Override public String dialectName() { return "MySQL"; @@ -47,6 +56,11 @@ public JdbcDialectTypeMapper getJdbcDialectTypeMapper() { @Override public String quoteIdentifier(String identifier) { + return "`" + getFieldIde(identifier, fieldIde) + "`"; + } + + @Override + public String quoteDatabaseIdentifier(String identifier) { return "`" + identifier + "`"; } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseDialectFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseDialectFactory.java index 66df84205ed1..b3a456870cc3 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseDialectFactory.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseDialectFactory.java @@ -40,7 +40,7 @@ public JdbcDialect create() { } @Override - public JdbcDialect create(@Nonnull String compatibleMode) { + public JdbcDialect create(@Nonnull String compatibleMode, String fieldIde) { if ("oracle".equalsIgnoreCase(compatibleMode)) { return new OracleDialect(); } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java index 1ca10739e1d8..0c51e60dcb96 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java @@ -20,6 +20,7 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum; import java.sql.Connection; import java.sql.PreparedStatement; @@ -33,6 +34,13 @@ public class OracleDialect implements JdbcDialect { private static final int DEFAULT_ORACLE_FETCH_SIZE = 128; + public String fieldIde = FieldIdeEnum.ORIGINAL.getValue(); + + public OracleDialect(String fieldIde) { + this.fieldIde = fieldIde; + } + + public OracleDialect() {} @Override public String dialectName() { @@ -51,7 +59,18 @@ public JdbcDialectTypeMapper getJdbcDialectTypeMapper() { @Override public String quoteIdentifier(String identifier) { - return identifier; + if (identifier.contains(".")) { + String[] parts = identifier.split("\\."); + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < parts.length - 1; i++) { + sb.append("\"").append(parts[i]).append("\"").append("."); + } + return sb.append("\"") + .append(getFieldIde(parts[parts.length - 1], fieldIde)) + .append("\"") + .toString(); + } + return "\"" + getFieldIde(identifier, fieldIde) + "\""; } @Override diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialectFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialectFactory.java index 168dc4d89022..121098c46147 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialectFactory.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialectFactory.java @@ -22,6 +22,8 @@ import com.google.auto.service.AutoService; +import javax.annotation.Nonnull; + /** Factory for {@link OracleDialect}. */ @AutoService(JdbcDialectFactory.class) public class OracleDialectFactory implements JdbcDialectFactory { @@ -34,4 +36,9 @@ public boolean acceptsURL(String url) { public JdbcDialect create() { return new OracleDialect(); } + + @Override + public JdbcDialect create(@Nonnull String compatibleMode, String fieldIde) { + return new OracleDialect(fieldIde); + } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java index b36a28a5a609..f206589af59e 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java @@ -20,6 +20,7 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum; import java.sql.Connection; import java.sql.PreparedStatement; @@ -33,6 +34,14 @@ public class PostgresDialect implements JdbcDialect { public static final int DEFAULT_POSTGRES_FETCH_SIZE = 128; + public String fieldIde = FieldIdeEnum.ORIGINAL.getValue(); + + public PostgresDialect() {} + + public PostgresDialect(String fieldIde) { + this.fieldIde = fieldIde; + } + @Override public String dialectName() { return "PostgreSQL"; @@ -88,4 +97,32 @@ public PreparedStatement creatPreparedStatement( } return statement; } + + @Override + public String tableIdentifier(String database, String tableName) { + // resolve pg database name upper or lower not recognised + return quoteDatabaseIdentifier(database) + "." + quoteIdentifier(tableName); + } + + @Override + public String quoteIdentifier(String identifier) { + if (identifier.contains(".")) { + String[] parts = identifier.split("\\."); + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < parts.length - 1; i++) { + sb.append("\"").append(parts[i]).append("\"").append("."); + } + return sb.append("\"") + .append(getFieldIde(parts[parts.length - 1], fieldIde)) + .append("\"") + .toString(); + } + + return "\"" + getFieldIde(identifier, fieldIde) + "\""; + } + + @Override + public String quoteDatabaseIdentifier(String identifier) { + return "\"" + identifier + "\""; + } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialectFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialectFactory.java index 963f7385e231..ebccaf5132b1 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialectFactory.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialectFactory.java @@ -22,6 +22,8 @@ import com.google.auto.service.AutoService; +import javax.annotation.Nonnull; + @AutoService(JdbcDialectFactory.class) public class PostgresDialectFactory implements JdbcDialectFactory { @Override @@ -33,4 +35,9 @@ public boolean acceptsURL(String url) { public JdbcDialect create() { return new PostgresDialect(); } + + @Override + public JdbcDialect create(@Nonnull String compatibleMode, String fieldIde) { + return new PostgresDialect(fieldIde); + } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java index 697d2d2dc176..1a0fe303bf5b 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java @@ -20,6 +20,7 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum; import java.util.Arrays; import java.util.List; @@ -27,6 +28,15 @@ import java.util.stream.Collectors; public class SqlServerDialect implements JdbcDialect { + + public String fieldIde = FieldIdeEnum.ORIGINAL.getValue(); + + public SqlServerDialect() {} + + public SqlServerDialect(String fieldIde) { + this.fieldIde = fieldIde; + } + @Override public String dialectName() { return "Sqlserver"; @@ -100,4 +110,26 @@ public Optional getUpsertStatement( return Optional.of(upsertSQL); } + + @Override + public String quoteIdentifier(String identifier) { + if (identifier.contains(".")) { + String[] parts = identifier.split("\\."); + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < parts.length - 1; i++) { + sb.append("[").append(parts[i]).append("]").append("."); + } + return sb.append("[") + .append(getFieldIde(parts[parts.length - 1], fieldIde)) + .append("]") + .toString(); + } + + return "[" + getFieldIde(identifier, fieldIde) + "]"; + } + + @Override + public String quoteDatabaseIdentifier(String identifier) { + return "[" + identifier + "]"; + } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialectFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialectFactory.java index d8fce3c43c14..d7dae4efd573 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialectFactory.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialectFactory.java @@ -22,6 +22,8 @@ import com.google.auto.service.AutoService; +import javax.annotation.Nonnull; + /** Factory for {@link SqlServerDialect}. */ @AutoService(JdbcDialectFactory.class) public class SqlServerDialectFactory implements JdbcDialectFactory { @@ -34,4 +36,9 @@ public boolean acceptsURL(String url) { public JdbcDialect create() { return new SqlServerDialect(); } + + @Override + public JdbcDialect create(@Nonnull String compatibleMode, String fieldIde) { + return new SqlServerDialect(fieldIde); + } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java index a9bb1c15554b..3ac918a774e6 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java @@ -26,6 +26,7 @@ import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableFactoryContext; import org.apache.seatunnel.api.table.factory.TableSinkFactory; +import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions; import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectLoader; @@ -83,10 +84,12 @@ public TableSink createSink(TableFactoryContext context) { } final ReadonlyConfig options = config; JdbcSinkConfig sinkConfig = JdbcSinkConfig.of(config); + String fieldIde = config.get(JdbcOptions.FIELD_IDE); JdbcDialect dialect = JdbcDialectLoader.load( sinkConfig.getJdbcConnectionConfig().getUrl(), - sinkConfig.getJdbcConnectionConfig().getCompatibleMode()); + sinkConfig.getJdbcConnectionConfig().getCompatibleMode(), + fieldIde); return () -> new JdbcSink( options, From 68707cb21eb43da15d6dee53395a1aad157df623 Mon Sep 17 00:00:00 2001 From: jiayang Date: Sat, 15 Jul 2023 11:55:28 +0800 Subject: [PATCH 02/16] [feature][jdbc] Add license --- .../dialect/dialectenum/FieldIdeEnum.java | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dialectenum/FieldIdeEnum.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dialectenum/FieldIdeEnum.java index fadffe0dac9d..39f95210623b 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dialectenum/FieldIdeEnum.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dialectenum/FieldIdeEnum.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum; public enum FieldIdeEnum { From 8ef5634cea87e687ab8d20102c55c29c9e3e721a Mon Sep 17 00:00:00 2001 From: jiayang Date: Sat, 15 Jul 2023 16:19:49 +0800 Subject: [PATCH 03/16] [feature][jdbc] update sqlserver e2e and enum --- .../connectors/seatunnel/jdbc/config/JdbcOptions.java | 5 +++-- .../connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java | 7 +++++-- .../src/test/resources/sqlservercdc_to_console.conf | 4 ++-- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java index b4278f587c35..154ad1ce4df5 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java @@ -19,6 +19,7 @@ import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.Options; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum; import java.math.BigDecimal; import java.util.List; @@ -145,9 +146,9 @@ public interface JdbcOptions { .noDefaultValue() .withDescription("partition num"); - Option FIELD_IDE = + Option FIELD_IDE = Options.key("field_ide") - .stringType() + .enumType(FieldIdeEnum.class) .noDefaultValue() .withDescription("Whether case conversion is required"); } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java index 3ac918a774e6..6fa23b37ea88 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java @@ -30,6 +30,7 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectLoader; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum; import org.apache.commons.collections4.CollectionUtils; @@ -84,12 +85,14 @@ public TableSink createSink(TableFactoryContext context) { } final ReadonlyConfig options = config; JdbcSinkConfig sinkConfig = JdbcSinkConfig.of(config); - String fieldIde = config.get(JdbcOptions.FIELD_IDE); + FieldIdeEnum fieldIdeEnum = config.get(JdbcOptions.FIELD_IDE); JdbcDialect dialect = JdbcDialectLoader.load( sinkConfig.getJdbcConnectionConfig().getUrl(), sinkConfig.getJdbcConnectionConfig().getCompatibleMode(), - fieldIde); + fieldIdeEnum == null + ? FieldIdeEnum.ORIGINAL.getValue() + : fieldIdeEnum.getValue()); return () -> new JdbcSink( options, diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/sqlservercdc_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/sqlservercdc_to_console.conf index c4ac06877b13..9d3f041ede14 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/sqlservercdc_to_console.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/sqlservercdc_to_console.conf @@ -51,8 +51,8 @@ sink { user = "sa" password = "Password!" generate_sink_sql = true - database = "" - table = "column_type_test.dbo.full_types_sink" + database = "column_type_test" + table = "dbo.full_types_sink" batch_size = 1 primary_keys = ["id"] } From 92af09d78e0fc1ec911e5ce57270d7f25ebdbe8f Mon Sep 17 00:00:00 2001 From: jiayang Date: Fri, 21 Jul 2023 20:09:07 +0800 Subject: [PATCH 04/16] [e2e] jdbc ide add e2e --- .../seatunnel/jdbc/JdbcPostgresIT.java | 387 ++++++++++++++++++ .../jdbc_postgres_ide_source_and_sink.conf | 48 +++ 2 files changed, 435 insertions(+) create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_postgres_ide_source_and_sink.conf diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java new file mode 100644 index 000000000000..ebf16bf59cbf --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java @@ -0,0 +1,387 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc; + +import org.apache.seatunnel.e2e.common.TestResource; +import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory; +import org.apache.seatunnel.e2e.common.container.EngineType; +import org.apache.seatunnel.e2e.common.container.TestContainer; +import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; +import org.apache.seatunnel.e2e.common.junit.TestContainerExtension; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.TestTemplate; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.DockerLoggerFactory; + +import com.google.common.collect.Lists; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +import static org.awaitility.Awaitility.given; + +@Slf4j +@DisabledOnContainer( + value = {}, + type = {EngineType.SPARK, EngineType.FLINK}, + disabledReason = "Currently SPARK and FLINK do not support cdc") +public class JdbcPostgresIT extends TestSuiteBase implements TestResource { + private static final String PG_IMAGE = "postgis/postgis"; + private static final String PG_DRIVER_JAR = + "https://repo1.maven.org/maven2/org/postgresql/postgresql/42.3.3/postgresql-42.3.3.jar"; + private static final String PG_JDBC_JAR = + "https://repo1.maven.org/maven2/net/postgis/postgis-jdbc/2.5.1/postgis-jdbc-2.5.1.jar"; + private static final String PG_GEOMETRY_JAR = + "https://repo1.maven.org/maven2/net/postgis/postgis-geometry/2.5.1/postgis-geometry-2.5.1.jar"; + private static final List PG_CONFIG_FILE_LIST = + Lists.newArrayList("/jdbc_postgres_ide_source_and_sink.conf"); + private PostgreSQLContainer POSTGRESQL_CONTAINER; + private static final String PG_SOURCE_DDL = + "CREATE TABLE IF NOT EXISTS pg_ide_source_table (\n" + + " gid SERIAL PRIMARY KEY,\n" + + " text_col TEXT,\n" + + " varchar_col VARCHAR(255),\n" + + " char_col CHAR(10),\n" + + " boolean_col bool,\n" + + " smallint_col int2,\n" + + " integer_col int4,\n" + + " bigint_col BIGINT,\n" + + " decimal_col DECIMAL(10, 2),\n" + + " numeric_col NUMERIC(8, 4),\n" + + " real_col float4,\n" + + " double_precision_col float8,\n" + + " smallserial_col SMALLSERIAL,\n" + + " serial_col SERIAL,\n" + + " bigserial_col BIGSERIAL,\n" + + " date_col DATE,\n" + + " timestamp_col TIMESTAMP,\n" + + " bpchar_col BPCHAR(10),\n" + + " age INT NOT null,\n" + + " name VARCHAR(255) NOT null,\n" + + " point geometry(POINT, 4326),\n" + + " linestring geometry(LINESTRING, 4326),\n" + + " polygon_colums geometry(POLYGON, 4326),\n" + + " multipoint geometry(MULTIPOINT, 4326),\n" + + " multilinestring geometry(MULTILINESTRING, 4326),\n" + + " multipolygon geometry(MULTIPOLYGON, 4326),\n" + + " geometrycollection geometry(GEOMETRYCOLLECTION, 4326),\n" + + " geog geography(POINT, 4326)\n" + + ")"; + private static final String PG_SINK_DDL = + "CREATE TABLE IF NOT EXISTS pg_ide_sink_table (\n" + + " GID SERIAL PRIMARY KEY,\n" + + " TEXT_COL TEXT,\n" + + " VARCHAR_COL VARCHAR(255),\n" + + " CHAR_COL CHAR(10),\n" + + " BOOLEAN_COL bool,\n" + + " SMALLINT_COL int2,\n" + + " INTEGER_COL int4,\n" + + " BIGINT_COL BIGINT,\n" + + " DECIMAL_COL DECIMAL(10, 2),\n" + + " NUMERIC_COL NUMERIC(8, 4),\n" + + " REAL_COL float4,\n" + + " DOUBLE_PRECISION_COL float8,\n" + + " SMALLSERIAL_COL SMALLSERIAL,\n" + + " SERIAL_COL SERIAL,\n" + + " BIGSERIAL_COL BIGSERIAL,\n" + + " DATE_COL DATE,\n" + + " TIMESTAMP_COL TIMESTAMP,\n" + + " BPCHAR_COL BPCHAR(10),\n" + + " AGE int4 NOT NULL,\n" + + " NAME varchar(255) NOT NULL,\n" + + " POINT varchar(2000) NULL,\n" + + " LINESTRING varchar(2000) NULL,\n" + + " POLYGON_COLUMS varchar(2000) NULL,\n" + + " MULTIPOINT varchar(2000) NULL,\n" + + " MULTILINESTRING varchar(2000) NULL,\n" + + " MULTIPOLYGON varchar(2000) NULL,\n" + + " GEOMETRYCOLLECTION varchar(2000) NULL,\n" + + " GEOG varchar(2000) NULL\n" + + " )"; + + private static final String SOURCE_SQL = + "select \n" + + "gid,\n" + + "text_col,\n" + + "varchar_col,\n" + + "char_col,\n" + + "boolean_col,\n" + + "smallint_col,\n" + + "integer_col,\n" + + "bigint_col,\n" + + "decimal_col,\n" + + "numeric_col,\n" + + "real_col,\n" + + "double_precision_col,\n" + + "smallserial_col,\n" + + "serial_col,\n" + + "bigserial_col,\n" + + "date_col,\n" + + "timestamp_col,\n" + + "bpchar_col,\n" + + "age,\n" + + "name,\n" + + "point,\n" + + "linestring,\n" + + "polygon_colums,\n" + + "multipoint,\n" + + "multilinestring,\n" + + "multipolygon,\n" + + "geometrycollection,\n" + + "geog\n" + + " from pg_ide_source_table"; + private static final String SINK_SQL = + "SELECT\n" + + " GID,\n" + + " TEXT_COL,\n" + + " VARCHAR_COL,\n" + + " CHAR_COL,\n" + + " BOOLEAN_COL,\n" + + " SMALLINT_COL,\n" + + " INTEGER_COL,\n" + + " BIGINT_COL,\n" + + " DECIMAL_COL,\n" + + " NUMERIC_COL,\n" + + " REAL_COL,\n" + + " DOUBLE_PRECISION_COL,\n" + + " SMALLSERIAL_COL,\n" + + " SERIAL_COL,\n" + + " BIGSERIAL_COL,\n" + + " DATE_COL,\n" + + " TIMESTAMP_COL,\n" + + " BPCHAR_COL,\n" + + " AGE,\n" + + " NAME,\n" + + " CAST(POINT AS GEOMETRY) AS POINT,\n" + + " CAST(LINESTRING AS GEOMETRY) AS LINESTRING,\n" + + " CAST(POLYGON_COLUMS AS GEOMETRY) AS POLYGON_COLUMS,\n" + + " CAST(MULTIPOINT AS GEOMETRY) AS MULTIPOINT,\n" + + " CAST(MULTILINESTRING AS GEOMETRY) AS MULTILINESTRING,\n" + + " CAST(MULTIPOLYGON AS GEOMETRY) AS MULTILINESTRING,\n" + + " CAST(GEOMETRYCOLLECTION AS GEOMETRY) AS GEOMETRYCOLLECTION,\n" + + " CAST(GEOG AS GEOGRAPHY) AS GEOG\n" + + "FROM\n" + + " PG_IDE_SINK_TABLE"; + + @TestContainerExtension + private final ContainerExtendedFactory extendedFactory = + container -> { + Container.ExecResult extraCommands = + container.execInContainer( + "bash", + "-c", + "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O " + + PG_DRIVER_JAR + + " && curl -O " + + PG_JDBC_JAR + + " && curl -O " + + PG_GEOMETRY_JAR); + Assertions.assertEquals(0, extraCommands.getExitCode()); + }; + + @BeforeAll + @Override + public void startUp() throws Exception { + POSTGRESQL_CONTAINER = + new PostgreSQLContainer<>( + DockerImageName.parse(PG_IMAGE) + .asCompatibleSubstituteFor("postgres")) + .withNetwork(TestSuiteBase.NETWORK) + .withNetworkAliases("postgresql") + .withCommand("postgres -c max_prepared_transactions=100") + .withLogConsumer( + new Slf4jLogConsumer(DockerLoggerFactory.getLogger(PG_IMAGE))); + Startables.deepStart(Stream.of(POSTGRESQL_CONTAINER)).join(); + log.info("PostgreSQL container started"); + Class.forName(POSTGRESQL_CONTAINER.getDriverClassName()); + given().ignoreExceptions() + .await() + .atLeast(100, TimeUnit.MILLISECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .atMost(2, TimeUnit.MINUTES) + .untilAsserted(this::initializeJdbcTable); + log.info("pg data initialization succeeded. Procedure"); + } + + @TestTemplate + public void testAutoGenerateSQL(TestContainer container) + throws IOException, InterruptedException { + for (String CONFIG_FILE : PG_CONFIG_FILE_LIST) { + Container.ExecResult execResult = container.executeJob(CONFIG_FILE); + Assertions.assertEquals(0, execResult.getExitCode()); + Assertions.assertIterableEquals(querySql(SOURCE_SQL), querySql(SINK_SQL)); + executeSQL("truncate table pg_ide_sink_table"); + log.info(CONFIG_FILE + " e2e test completed"); + } + } + + private void initializeJdbcTable() { + try (Connection connection = getJdbcConnection()) { + Statement statement = connection.createStatement(); + statement.execute(PG_SOURCE_DDL); + statement.execute(PG_SINK_DDL); + for (int i = 1; i <= 1000; i++) { + statement.addBatch( + "INSERT INTO\n" + + " pg_ide_source_table (gid,\n" + + " text_col,\n" + + " varchar_col,\n" + + " char_col,\n" + + " boolean_col,\n" + + " smallint_col,\n" + + " integer_col,\n" + + " bigint_col,\n" + + " decimal_col,\n" + + " numeric_col,\n" + + " real_col,\n" + + " double_precision_col,\n" + + " smallserial_col,\n" + + " serial_col,\n" + + " bigserial_col,\n" + + " date_col,\n" + + " timestamp_col,\n" + + " bpchar_col,\n" + + " age,\n" + + " name,\n" + + " point,\n" + + " linestring,\n" + + " polygon_colums,\n" + + " multipoint,\n" + + " multilinestring,\n" + + " multipolygon,\n" + + " geometrycollection,\n" + + " geog\n" + + " )\n" + + "VALUES\n" + + " (\n" + + " '" + + i + + "',\n" + + " 'Hello World',\n" + + " 'Test',\n" + + " 'Testing',\n" + + " true,\n" + + " 10,\n" + + " 100,\n" + + " 1000,\n" + + " 10.55,\n" + + " 8.8888,\n" + + " 3.14,\n" + + " 3.14159265,\n" + + " 1,\n" + + " 100,\n" + + " 10000,\n" + + " '2023-05-07',\n" + + " '2023-05-07 14:30:00',\n" + + " 'Testing',\n" + + " 21,\n" + + " 'Leblanc',\n" + + " ST_GeomFromText('POINT(-122.3452 47.5925)', 4326),\n" + + " ST_GeomFromText(\n" + + " 'LINESTRING(-122.3451 47.5924, -122.3449 47.5923)',\n" + + " 4326\n" + + " ),\n" + + " ST_GeomFromText(\n" + + " 'POLYGON((-122.3453 47.5922, -122.3453 47.5926, -122.3448 47.5926, -122.3448 47.5922, -122.3453 47.5922))',\n" + + " 4326\n" + + " ),\n" + + " ST_GeomFromText(\n" + + " 'MULTIPOINT(-122.3459 47.5927, -122.3445 47.5918)',\n" + + " 4326\n" + + " ),\n" + + " ST_GeomFromText(\n" + + " 'MULTILINESTRING((-122.3463 47.5920, -122.3461 47.5919),(-122.3459 47.5924, -122.3457 47.5923))',\n" + + " 4326\n" + + " ),\n" + + " ST_GeomFromText(\n" + + " 'MULTIPOLYGON(((-122.3458 47.5925, -122.3458 47.5928, -122.3454 47.5928, -122.3454 47.5925, -122.3458 47.5925)),((-122.3453 47.5921, -122.3453 47.5924, -122.3448 47.5924, -122.3448 47.5921, -122.3453 47.5921)))',\n" + + " 4326\n" + + " ),\n" + + " ST_GeomFromText(\n" + + " 'GEOMETRYCOLLECTION(POINT(-122.3462 47.5921), LINESTRING(-122.3460 47.5924, -122.3457 47.5924))',\n" + + " 4326\n" + + " ),\n" + + " ST_GeographyFromText('POINT(-122.3452 47.5925)')\n" + + " )"); + } + + statement.executeBatch(); + } catch (SQLException e) { + throw new RuntimeException("Initializing PostgreSql table failed!", e); + } + } + + private Connection getJdbcConnection() throws SQLException { + return DriverManager.getConnection( + POSTGRESQL_CONTAINER.getJdbcUrl(), + POSTGRESQL_CONTAINER.getUsername(), + POSTGRESQL_CONTAINER.getPassword()); + } + + private List> querySql(String sql) { + try (Connection connection = getJdbcConnection()) { + ResultSet resultSet = connection.createStatement().executeQuery(sql); + List> result = new ArrayList<>(); + int columnCount = resultSet.getMetaData().getColumnCount(); + while (resultSet.next()) { + ArrayList objects = new ArrayList<>(); + for (int i = 1; i <= columnCount; i++) { + objects.add(resultSet.getObject(i)); + } + result.add(objects); + } + return result; + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + private void executeSQL(String sql) { + try (Connection connection = getJdbcConnection()) { + Statement statement = connection.createStatement(); + statement.execute(sql); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + @AfterAll + @Override + public void tearDown() { + if (POSTGRESQL_CONTAINER != null) { + POSTGRESQL_CONTAINER.stop(); + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_postgres_ide_source_and_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_postgres_ide_source_and_sink.conf new file mode 100644 index 000000000000..504c0e0c1878 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_postgres_ide_source_and_sink.conf @@ -0,0 +1,48 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + execution.parallelism = 1 + job.mode = "BATCH" +} + +source{ + jdbc{ + url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF" + driver = "org.postgresql.Driver" + user = "test" + password = "test" + query ="""select gid, text_col, varchar_col, char_col, boolean_col, smallint_col, integer_col, bigint_col, decimal_col, numeric_col, real_col, double_precision_col, + smallserial_col, serial_col, bigserial_col, date_col, timestamp_col, bpchar_col, age, name, point, linestring, polygon_colums, multipoint, + multilinestring, multipolygon, geometrycollection, geog from pg_ide_source_table""" + } +} + + +sink { + Jdbc { + driver = org.postgresql.Driver + url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF" + user = test + password = test + generate_sink_sql = true + field_ide = UPPERCASE + database = test + table = "public.pg_ide_sink_table" + primary_keys = ["gid"] + } +} \ No newline at end of file From 5d59ce525ca6714c1877d86fa3f9aa2a584507d9 Mon Sep 17 00:00:00 2001 From: jiayang Date: Fri, 21 Jul 2023 20:11:05 +0800 Subject: [PATCH 05/16] [e2e] rename --- .../jdbc/{JdbcPostgresIT.java => JdbcPostgresIdentifierIT.java} | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/{JdbcPostgresIT.java => JdbcPostgresIdentifierIT.java} (99%) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java similarity index 99% rename from seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java rename to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java index ebf16bf59cbf..a26ac39d81ce 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java @@ -57,7 +57,7 @@ value = {}, type = {EngineType.SPARK, EngineType.FLINK}, disabledReason = "Currently SPARK and FLINK do not support cdc") -public class JdbcPostgresIT extends TestSuiteBase implements TestResource { +public class JdbcPostgresIdentifierIT extends TestSuiteBase implements TestResource { private static final String PG_IMAGE = "postgis/postgis"; private static final String PG_DRIVER_JAR = "https://repo1.maven.org/maven2/org/postgresql/postgresql/42.3.3/postgresql-42.3.3.jar"; From 9ee79535d5aa8bbcd970d6b2498b451163f40ffc Mon Sep 17 00:00:00 2001 From: jiayang Date: Fri, 21 Jul 2023 20:12:40 +0800 Subject: [PATCH 06/16] [e2e] add flink and spark --- .../connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java index a26ac39d81ce..827175166709 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java @@ -20,9 +20,7 @@ import org.apache.seatunnel.e2e.common.TestResource; import org.apache.seatunnel.e2e.common.TestSuiteBase; import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory; -import org.apache.seatunnel.e2e.common.container.EngineType; import org.apache.seatunnel.e2e.common.container.TestContainer; -import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; import org.apache.seatunnel.e2e.common.junit.TestContainerExtension; import org.junit.jupiter.api.AfterAll; @@ -53,10 +51,6 @@ import static org.awaitility.Awaitility.given; @Slf4j -@DisabledOnContainer( - value = {}, - type = {EngineType.SPARK, EngineType.FLINK}, - disabledReason = "Currently SPARK and FLINK do not support cdc") public class JdbcPostgresIdentifierIT extends TestSuiteBase implements TestResource { private static final String PG_IMAGE = "postgis/postgis"; private static final String PG_DRIVER_JAR = From e93b69037d78b5c678068a08049ff56673299790 Mon Sep 17 00:00:00 2001 From: jiayang Date: Fri, 28 Jul 2023 10:03:50 +0800 Subject: [PATCH 07/16] [feature] update e2e --- .../connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java index 827175166709..236aaf9c8ce6 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java @@ -94,7 +94,7 @@ public class JdbcPostgresIdentifierIT extends TestSuiteBase implements TestResou + " geog geography(POINT, 4326)\n" + ")"; private static final String PG_SINK_DDL = - "CREATE TABLE IF NOT EXISTS pg_ide_sink_table (\n" + "CREATE TABLE IF NOT EXISTS PG_IDE_SINK_TABLE (\n" + " GID SERIAL PRIMARY KEY,\n" + " TEXT_COL TEXT,\n" + " VARCHAR_COL VARCHAR(255),\n" From ffea0cdd691fec0747bdbf33e2d778284a60c359 Mon Sep 17 00:00:00 2001 From: jiayang Date: Wed, 2 Aug 2023 11:09:19 +0800 Subject: [PATCH 08/16] [e2e] update e2e --- .../connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java | 2 +- .../src/test/resources/jdbc_postgres_ide_source_and_sink.conf | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java index 236aaf9c8ce6..37a8ea3a710f 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java @@ -236,7 +236,7 @@ public void testAutoGenerateSQL(TestContainer container) Container.ExecResult execResult = container.executeJob(CONFIG_FILE); Assertions.assertEquals(0, execResult.getExitCode()); Assertions.assertIterableEquals(querySql(SOURCE_SQL), querySql(SINK_SQL)); - executeSQL("truncate table pg_ide_sink_table"); + executeSQL("truncate table PG_IDE_SINK_TABLE"); log.info(CONFIG_FILE + " e2e test completed"); } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_postgres_ide_source_and_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_postgres_ide_source_and_sink.conf index 504c0e0c1878..52f9c065700c 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_postgres_ide_source_and_sink.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_postgres_ide_source_and_sink.conf @@ -42,7 +42,7 @@ sink { generate_sink_sql = true field_ide = UPPERCASE database = test - table = "public.pg_ide_sink_table" + table = "public.PG_IDE_SINK_TABLE" primary_keys = ["gid"] } } \ No newline at end of file From 3a27febba03bddba243e692f3ee9aced7b4c1515 Mon Sep 17 00:00:00 2001 From: jiayang Date: Thu, 3 Aug 2023 14:18:32 +0800 Subject: [PATCH 09/16] [e2e] ide --- .../jdbc/JdbcPostgresIdentifierIT.java | 66 ++++++++++--------- 1 file changed, 35 insertions(+), 31 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java index 37a8ea3a710f..b4a98e08f837 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java @@ -51,6 +51,10 @@ import static org.awaitility.Awaitility.given; @Slf4j +// @DisabledOnContainer( +// value = {}, +// type = {EngineType.SPARK, EngineType.FLINK}, +// disabledReason = "Currently SPARK and FLINK do not support cdc") public class JdbcPostgresIdentifierIT extends TestSuiteBase implements TestResource { private static final String PG_IMAGE = "postgis/postgis"; private static final String PG_DRIVER_JAR = @@ -94,35 +98,35 @@ public class JdbcPostgresIdentifierIT extends TestSuiteBase implements TestResou + " geog geography(POINT, 4326)\n" + ")"; private static final String PG_SINK_DDL = - "CREATE TABLE IF NOT EXISTS PG_IDE_SINK_TABLE (\n" - + " GID SERIAL PRIMARY KEY,\n" - + " TEXT_COL TEXT,\n" - + " VARCHAR_COL VARCHAR(255),\n" - + " CHAR_COL CHAR(10),\n" - + " BOOLEAN_COL bool,\n" - + " SMALLINT_COL int2,\n" - + " INTEGER_COL int4,\n" - + " BIGINT_COL BIGINT,\n" - + " DECIMAL_COL DECIMAL(10, 2),\n" - + " NUMERIC_COL NUMERIC(8, 4),\n" - + " REAL_COL float4,\n" - + " DOUBLE_PRECISION_COL float8,\n" - + " SMALLSERIAL_COL SMALLSERIAL,\n" - + " SERIAL_COL SERIAL,\n" - + " BIGSERIAL_COL BIGSERIAL,\n" - + " DATE_COL DATE,\n" - + " TIMESTAMP_COL TIMESTAMP,\n" - + " BPCHAR_COL BPCHAR(10),\n" - + " AGE int4 NOT NULL,\n" - + " NAME varchar(255) NOT NULL,\n" - + " POINT varchar(2000) NULL,\n" - + " LINESTRING varchar(2000) NULL,\n" - + " POLYGON_COLUMS varchar(2000) NULL,\n" - + " MULTIPOINT varchar(2000) NULL,\n" - + " MULTILINESTRING varchar(2000) NULL,\n" - + " MULTIPOLYGON varchar(2000) NULL,\n" - + " GEOMETRYCOLLECTION varchar(2000) NULL,\n" - + " GEOG varchar(2000) NULL\n" + "CREATE TABLE IF NOT EXISTS test.public.\"PG_IDE_SINK_TABLE\" (\n" + + " \"GID\" SERIAL PRIMARY KEY,\n" + + " \"TEXT_COL\" TEXT,\n" + + " \"VARCHAR_COL\" VARCHAR(255),\n" + + " \"CHAR_COL\" CHAR(10),\n" + + " \"BOOLEAN_COL\" bool,\n" + + " \"SMALLINT_COL\" int2,\n" + + " \"INTEGER_COL\" int4,\n" + + " \"BIGINT_COL\" BIGINT,\n" + + " \"DECIMAL_COL\" DECIMAL(10, 2),\n" + + " \"NUMERIC_COL\" NUMERIC(8, 4),\n" + + " \"REAL_COL\" float4,\n" + + " \"DOUBLE_PRECISION_COL\" float8,\n" + + " \"SMALLSERIAL_COL\" SMALLSERIAL,\n" + + " \"SERIAL_COL\" SERIAL,\n" + + " \"BIGSERIAL_COL\" BIGSERIAL,\n" + + " \"DATE_COL\" DATE,\n" + + " \"TIMESTAMP_COL\" TIMESTAMP,\n" + + " \"BPCHAR_COL\" BPCHAR(10),\n" + + " \"AGE\" int4 NOT NULL,\n" + + " \"NAME\" varchar(255) NOT NULL,\n" + + " \"POINT\" varchar(2000) NULL,\n" + + " \"LINESTRING\" varchar(2000) NULL,\n" + + " \"POLYGON_COLUMS\" varchar(2000) NULL,\n" + + " \"MULTIPOINT\" varchar(2000) NULL,\n" + + " \"MULTILINESTRING\" varchar(2000) NULL,\n" + + " \"MULTIPOLYGON\" varchar(2000) NULL,\n" + + " \"GEOMETRYCOLLECTION\" varchar(2000) NULL,\n" + + " \"GEOG\" varchar(2000) NULL\n" + " )"; private static final String SOURCE_SQL = @@ -187,7 +191,7 @@ public class JdbcPostgresIdentifierIT extends TestSuiteBase implements TestResou + " CAST(GEOMETRYCOLLECTION AS GEOMETRY) AS GEOMETRYCOLLECTION,\n" + " CAST(GEOG AS GEOGRAPHY) AS GEOG\n" + "FROM\n" - + " PG_IDE_SINK_TABLE"; + + " \"PG_IDE_SINK_TABLE\""; @TestContainerExtension private final ContainerExtendedFactory extendedFactory = @@ -246,7 +250,7 @@ private void initializeJdbcTable() { Statement statement = connection.createStatement(); statement.execute(PG_SOURCE_DDL); statement.execute(PG_SINK_DDL); - for (int i = 1; i <= 1000; i++) { + for (int i = 1; i <= 10; i++) { statement.addBatch( "INSERT INTO\n" + " pg_ide_source_table (gid,\n" From 55491a192e4f93e21e52d5641929bf60b38e48d8 Mon Sep 17 00:00:00 2001 From: jiayang Date: Thu, 3 Aug 2023 14:19:08 +0800 Subject: [PATCH 10/16] [e2e] ide --- .../connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java index b4a98e08f837..a208e9cf7056 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java @@ -240,7 +240,7 @@ public void testAutoGenerateSQL(TestContainer container) Container.ExecResult execResult = container.executeJob(CONFIG_FILE); Assertions.assertEquals(0, execResult.getExitCode()); Assertions.assertIterableEquals(querySql(SOURCE_SQL), querySql(SINK_SQL)); - executeSQL("truncate table PG_IDE_SINK_TABLE"); + executeSQL("truncate table \"PG_IDE_SINK_TABLE\""); log.info(CONFIG_FILE + " e2e test completed"); } } From 9fa4bcd59dd530ce0e62c5daf28ce47878579fa4 Mon Sep 17 00:00:00 2001 From: jiayang Date: Thu, 3 Aug 2023 14:53:29 +0800 Subject: [PATCH 11/16] [e2e] ide --- .../internal/dialect/psql/PostgresDialectFactory.java | 9 ++------- .../internal/dialect/psqllow/PostgresLowDialect.java | 5 +++++ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialectFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialectFactory.java index 43e958e7997f..59dc0b45c682 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialectFactory.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialectFactory.java @@ -39,15 +39,10 @@ public JdbcDialect create() { } @Override - public JdbcDialect create(@Nonnull String compatibleMode) { + public JdbcDialect create(@Nonnull String compatibleMode, String fieldIde) { if ("postgresLow".equalsIgnoreCase(compatibleMode)) { - return new PostgresLowDialect(); + return new PostgresLowDialect(fieldIde); } - return new PostgresDialect(); - } - - @Override - public JdbcDialect create(@Nonnull String compatibleMode, String fieldIde) { return new PostgresDialect(fieldIde); } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psqllow/PostgresLowDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psqllow/PostgresLowDialect.java index e367207ffa20..9100382628dc 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psqllow/PostgresLowDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psqllow/PostgresLowDialect.java @@ -22,6 +22,11 @@ import java.util.Optional; public class PostgresLowDialect extends PostgresDialect { + + public PostgresLowDialect(String fieldIde) { + this.fieldIde = fieldIde; + } + @Override public Optional getUpsertStatement( String database, String tableName, String[] fieldNames, String[] uniqueKeyFields) { From df3ebe9fb57d974659d3cd4c7aa5db173d977e74 Mon Sep 17 00:00:00 2001 From: jiayang Date: Thu, 3 Aug 2023 15:06:23 +0800 Subject: [PATCH 12/16] [e2e] ide --- .../jdbc/internal/dialect/PostgresDialectFactoryTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/PostgresDialectFactoryTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/PostgresDialectFactoryTest.java index 79b1f11ac93b..90b980a69e15 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/PostgresDialectFactoryTest.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/PostgresDialectFactoryTest.java @@ -30,7 +30,7 @@ public class PostgresDialectFactoryTest { @Test public void testPostgresDialectCreate() { PostgresDialectFactory postgresDialectFactory = new PostgresDialectFactory(); - JdbcDialect postgresLow = postgresDialectFactory.create("postgresLow"); + JdbcDialect postgresLow = postgresDialectFactory.create("postgresLow", ""); String[] fields = {"id", "name", "age"}; String[] uniqueKeyField = {"id"}; Optional upsertStatement = From fb539f57d4f09a337dfe94ad36f1df41fdd56902 Mon Sep 17 00:00:00 2001 From: jiayang Date: Tue, 8 Aug 2023 14:57:13 +0800 Subject: [PATCH 13/16] [bugfix] mvn spotless --- .../seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java index c23619b5aade..8fd97ebaeacb 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java @@ -38,10 +38,12 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions; import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig; import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectLoader; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum; import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcAggregatedCommitInfo; import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSinkState; import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo; @@ -107,7 +109,10 @@ public void prepare(Config pluginConfig) throws PrepareFailException { this.dialect = JdbcDialectLoader.load( jdbcSinkConfig.getJdbcConnectionConfig().getUrl(), - jdbcSinkConfig.getJdbcConnectionConfig().getCompatibleMode()); + jdbcSinkConfig.getJdbcConnectionConfig().getCompatibleMode(), + config.get(JdbcOptions.FIELD_IDE) == null + ? FieldIdeEnum.ORIGINAL.getValue() + : config.get(JdbcOptions.FIELD_IDE).getValue()); this.dataSaveMode = DataSaveMode.KEEP_SCHEMA_AND_DATA; } From b9e23026785afb6581f4a2c2f3b1c6ffd73279b0 Mon Sep 17 00:00:00 2001 From: XiaoJiang521 <131635688+xiaojiang521@users.noreply.github.com> Date: Sun, 11 Jun 2023 09:39:24 +0800 Subject: [PATCH 14/16] [bugfix] update select sql --- .../jdbc/JdbcPostgresIdentifierIT.java | 68 ++++++++++--------- 1 file changed, 35 insertions(+), 33 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java index a208e9cf7056..13adec70084c 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java @@ -20,7 +20,9 @@ import org.apache.seatunnel.e2e.common.TestResource; import org.apache.seatunnel.e2e.common.TestSuiteBase; import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory; +import org.apache.seatunnel.e2e.common.container.EngineType; import org.apache.seatunnel.e2e.common.container.TestContainer; +import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; import org.apache.seatunnel.e2e.common.junit.TestContainerExtension; import org.junit.jupiter.api.AfterAll; @@ -51,10 +53,10 @@ import static org.awaitility.Awaitility.given; @Slf4j -// @DisabledOnContainer( -// value = {}, -// type = {EngineType.SPARK, EngineType.FLINK}, -// disabledReason = "Currently SPARK and FLINK do not support cdc") +@DisabledOnContainer( + value = {}, + type = {EngineType.SPARK, EngineType.FLINK}, + disabledReason = "Currently SPARK and FLINK do not support cdc") public class JdbcPostgresIdentifierIT extends TestSuiteBase implements TestResource { private static final String PG_IMAGE = "postgis/postgis"; private static final String PG_DRIVER_JAR = @@ -162,36 +164,36 @@ public class JdbcPostgresIdentifierIT extends TestSuiteBase implements TestResou + " from pg_ide_source_table"; private static final String SINK_SQL = "SELECT\n" - + " GID,\n" - + " TEXT_COL,\n" - + " VARCHAR_COL,\n" - + " CHAR_COL,\n" - + " BOOLEAN_COL,\n" - + " SMALLINT_COL,\n" - + " INTEGER_COL,\n" - + " BIGINT_COL,\n" - + " DECIMAL_COL,\n" - + " NUMERIC_COL,\n" - + " REAL_COL,\n" - + " DOUBLE_PRECISION_COL,\n" - + " SMALLSERIAL_COL,\n" - + " SERIAL_COL,\n" - + " BIGSERIAL_COL,\n" - + " DATE_COL,\n" - + " TIMESTAMP_COL,\n" - + " BPCHAR_COL,\n" - + " AGE,\n" - + " NAME,\n" - + " CAST(POINT AS GEOMETRY) AS POINT,\n" - + " CAST(LINESTRING AS GEOMETRY) AS LINESTRING,\n" - + " CAST(POLYGON_COLUMS AS GEOMETRY) AS POLYGON_COLUMS,\n" - + " CAST(MULTIPOINT AS GEOMETRY) AS MULTIPOINT,\n" - + " CAST(MULTILINESTRING AS GEOMETRY) AS MULTILINESTRING,\n" - + " CAST(MULTIPOLYGON AS GEOMETRY) AS MULTILINESTRING,\n" - + " CAST(GEOMETRYCOLLECTION AS GEOMETRY) AS GEOMETRYCOLLECTION,\n" - + " CAST(GEOG AS GEOGRAPHY) AS GEOG\n" + + " \"GID\",\n" + + " \"TEXT_COL\",\n" + + " \"VARCHAR_COL\",\n" + + " \"CHAR_COL\",\n" + + " \"BOOLEAN_COL\",\n" + + " \"SMALLINT_COL\",\n" + + " \"INTEGER_COL\",\n" + + " \"BIGINT_COL\",\n" + + " \"DECIMAL_COL\",\n" + + " \"NUMERIC_COL\",\n" + + " \"REAL_COL\",\n" + + " \"DOUBLE_PRECISION_COL\",\n" + + " \"SMALLSERIAL_COL\",\n" + + " \"SERIAL_COL\",\n" + + " \"BIGSERIAL_COL\",\n" + + " \"DATE_COL\",\n" + + " \"TIMESTAMP_COL\",\n" + + " \"BPCHAR_COL\",\n" + + " \"AGE\",\n" + + " \"NAME\",\n" + + " CAST(\"POINT\" AS GEOMETRY) AS POINT,\n" + + " CAST(\"LINESTRING\" AS GEOMETRY) AS LINESTRING,\n" + + " CAST(\"POLYGON_COLUMS\" AS GEOMETRY) AS POLYGON_COLUMS,\n" + + " CAST(\"MULTIPOINT\" AS GEOMETRY) AS MULTIPOINT,\n" + + " CAST(\"MULTILINESTRING\" AS GEOMETRY) AS MULTILINESTRING,\n" + + " CAST(\"MULTIPOLYGON\" AS GEOMETRY) AS MULTILINESTRING,\n" + + " CAST(\"GEOMETRYCOLLECTION\" AS GEOMETRY) AS GEOMETRYCOLLECTION,\n" + + " CAST(\"GEOG\" AS GEOGRAPHY) AS GEOG\n" + "FROM\n" - + " \"PG_IDE_SINK_TABLE\""; + + " \"PG_IDE_SINK_TABLE\";"; @TestContainerExtension private final ContainerExtendedFactory extendedFactory = From 81f3c594b11489dc4324d8aee68b53ac7d35d6f8 Mon Sep 17 00:00:00 2001 From: jiayang Date: Wed, 9 Aug 2023 16:13:44 +0800 Subject: [PATCH 15/16] [bugfix] add docs --- docs/en/connector-v2/sink/Jdbc.md | 7 +++++++ docs/en/connector-v2/sink/Mysql.md | 2 ++ docs/en/connector-v2/sink/PostgreSql.md | 2 ++ 3 files changed, 11 insertions(+) diff --git a/docs/en/connector-v2/sink/Jdbc.md b/docs/en/connector-v2/sink/Jdbc.md index 9d68278cf51e..6a2c3039c272 100644 --- a/docs/en/connector-v2/sink/Jdbc.md +++ b/docs/en/connector-v2/sink/Jdbc.md @@ -48,6 +48,7 @@ support `Xa transactions`. You can set `is_exactly_once=true` to enable it. | max_commit_attempts | Int | No | 3 | | transaction_timeout_sec | Int | No | -1 | | auto_commit | Boolean | No | true | +| field_ide | String | No | - | | common-options | | no | - | ### driver [string] @@ -142,6 +143,12 @@ exactly-once semantics Automatic transaction commit is enabled by default +### field_ide [String] + +The field "field_ide" is used to identify whether the field needs to be converted to uppercase or lowercase when +synchronizing from the source to the sink. "ORIGINAL" indicates no conversion is needed, "UPPERCASE" indicates +conversion to uppercase, and "LOWERCASE" indicates conversion to lowercase. + ### common options Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details diff --git a/docs/en/connector-v2/sink/Mysql.md b/docs/en/connector-v2/sink/Mysql.md index 92254c1b54fa..6c616fd4953b 100644 --- a/docs/en/connector-v2/sink/Mysql.md +++ b/docs/en/connector-v2/sink/Mysql.md @@ -75,6 +75,7 @@ semantics (using XA transaction guarantee). | max_commit_attempts | Int | No | 3 | The number of retries for transaction commit failures | | transaction_timeout_sec | Int | No | -1 | The timeout after the transaction is opened, the default is -1 (never timeout). Note that setting the timeout may affect
exactly-once semantics | | auto_commit | Boolean | No | true | Automatic transaction commit is enabled by default | +| field_ide | String | No | - | Identify whether the field needs to be converted when synchronizing from the source to the sink. `ORIGINAL` indicates no conversion is needed;`UPPERCASE` indicates conversion to uppercase;`LOWERCASE` indicates conversion to lowercase. | | common-options | | no | - | Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details | ### Tips @@ -188,6 +189,7 @@ sink { database = test table = sink_table primary_keys = ["id","name"] + field_ide = UPPERCASE } } ``` diff --git a/docs/en/connector-v2/sink/PostgreSql.md b/docs/en/connector-v2/sink/PostgreSql.md index f7d6469b60fc..0a3e8a94142d 100644 --- a/docs/en/connector-v2/sink/PostgreSql.md +++ b/docs/en/connector-v2/sink/PostgreSql.md @@ -82,6 +82,7 @@ semantics (using XA transaction guarantee). | max_commit_attempts | Int | No | 3 | The number of retries for transaction commit failures | | transaction_timeout_sec | Int | No | -1 | The timeout after the transaction is opened, the default is -1 (never timeout). Note that setting the timeout may affect
exactly-once semantics | | auto_commit | Boolean | No | true | Automatic transaction commit is enabled by default | +| field_ide | String | No | - | Identify whether the field needs to be converted when synchronizing from the source to the sink. `ORIGINAL` indicates no conversion is needed;`UPPERCASE` indicates conversion to uppercase;`LOWERCASE` indicates conversion to lowercase. | | common-options | | no | - | Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details | ### Tips @@ -194,6 +195,7 @@ sink { database = test table = sink_table primary_keys = ["id","name"] + field_ide = UPPERCASE } } ``` From 8bc94ccd65b38f24785143bb9527345ecd5268bb Mon Sep 17 00:00:00 2001 From: jiayang Date: Sat, 9 Sep 2023 11:56:56 +0800 Subject: [PATCH 16/16] [feature] spotless --- .../seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java | 3 +-- .../connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java | 4 +--- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java index 8fd97ebaeacb..71e0a8624964 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java @@ -43,7 +43,6 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectLoader; -import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum; import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcAggregatedCommitInfo; import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSinkState; import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo; @@ -111,7 +110,7 @@ public void prepare(Config pluginConfig) throws PrepareFailException { jdbcSinkConfig.getJdbcConnectionConfig().getUrl(), jdbcSinkConfig.getJdbcConnectionConfig().getCompatibleMode(), config.get(JdbcOptions.FIELD_IDE) == null - ? FieldIdeEnum.ORIGINAL.getValue() + ? null : config.get(JdbcOptions.FIELD_IDE).getValue()); this.dataSaveMode = DataSaveMode.KEEP_SCHEMA_AND_DATA; } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java index fb995f082f5f..d18ff0d7fdb1 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java @@ -149,9 +149,7 @@ public TableSink createSink(TableFactoryContext context) { JdbcDialectLoader.load( sinkConfig.getJdbcConnectionConfig().getUrl(), sinkConfig.getJdbcConnectionConfig().getCompatibleMode(), - fieldIdeEnum == null - ? FieldIdeEnum.ORIGINAL.getValue() - : fieldIdeEnum.getValue()); + fieldIdeEnum == null ? null : fieldIdeEnum.getValue()); CatalogTable finalCatalogTable = catalogTable; return () -> new JdbcSink(