From d0dbe00835cd0faa03925b0c63676556e047fdae Mon Sep 17 00:00:00 2001 From: Kewei Shang Date: Wed, 17 Feb 2021 16:15:06 +0200 Subject: [PATCH 1/2] DBZ-3124 Add ENUM column type support for Vitess 9.0.0 --- pom.xml | 9 +- .../debezium/connector/vitess/VitessType.java | 103 ++++++++++++++++-- .../vitess/VitessValueConverter.java | 49 ++++++++- .../VStreamOutputMessageDecoder.java | 10 +- .../vitess/AbstractVitessConnectorTest.java | 2 +- .../connector/vitess/VitessTypeTest.java | 77 +++++++++---- 6 files changed, 210 insertions(+), 40 deletions(-) diff --git a/pom.xml b/pom.xml index baacdc88..ee4edee8 100644 --- a/pom.xml +++ b/pom.xml @@ -18,9 +18,9 @@ ${project.version} - 7.0.0 + 9.0.0 2.7 - 1.16.0 + 1.33.0 2.10.1 1.17.0 1.5.0 @@ -145,6 +145,11 @@ grpc-context ${version.grpc} + + io.grpc + grpc-api + ${version.grpc} + com.google.api.grpc proto-google-common-protos diff --git a/src/main/java/io/debezium/connector/vitess/VitessType.java b/src/main/java/io/debezium/connector/vitess/VitessType.java index ed9012fe..37e88051 100644 --- a/src/main/java/io/debezium/connector/vitess/VitessType.java +++ b/src/main/java/io/debezium/connector/vitess/VitessType.java @@ -6,6 +6,11 @@ package io.debezium.connector.vitess; import java.sql.Types; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +import io.vitess.proto.Query; /** The Vitess table column type */ public class VitessType { @@ -14,10 +19,17 @@ public class VitessType { private final String name; // enum of column jdbc type private final int jdbcId; + // permitted enum values + private final List enumValues; public VitessType(String name, int jdbcId) { + this(name, jdbcId, null); + } + + public VitessType(String name, int jdbcId, List enumValues) { this.name = name; this.jdbcId = jdbcId; + this.enumValues = enumValues; } public String getName() { @@ -28,27 +40,59 @@ public int getJdbcId() { return jdbcId; } + public List getEnumValues() { + return enumValues; + } + + public boolean isEnum() { + return enumValues != null && enumValues.size() != 0; + } + @Override public String toString() { - return "VitessType{" + "name='" + name + '\'' + ", jdbcId=" + jdbcId + '}'; + return "VitessType{" + + "name='" + name + '\'' + + ", jdbcId=" + jdbcId + + ", enumValues=" + enumValues + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + VitessType that = (VitessType) o; + return jdbcId == that.jdbcId && name.equals(that.name) && Objects.equals(enumValues, that.enumValues); + } + + @Override + public int hashCode() { + return Objects.hash(name, jdbcId, enumValues); } // Resolve JDBC type from vstream FIELD event - public static VitessType resolve(String columnType) { - switch (columnType) { + public static VitessType resolve(Query.Field field) { + String type = field.getType().name(); + switch (type) { case "INT8": case "UINT8": case "INT16": - return new VitessType(columnType, Types.SMALLINT); + return new VitessType(type, Types.SMALLINT); case "UINT16": case "INT24": case "UINT24": case "INT32": - return new VitessType(columnType, Types.INTEGER); + return new VitessType(type, Types.INTEGER); + case "ENUM": + return new VitessType(type, Types.INTEGER, resolveEnumValues(field.getColumnType())); case "UINT32": case "INT64": case "UINT64": - return new VitessType(columnType, Types.BIGINT); + return new VitessType(type, Types.BIGINT); case "VARBINARY": case "BINARY": case "VARCHAR": @@ -61,15 +105,52 @@ public static VitessType resolve(String columnType) { case "DATETIME": case "TIMESTAMP": case "YEAR": - case "ENUM": case "SET": - return new VitessType(columnType, Types.VARCHAR); + return new VitessType(type, Types.VARCHAR); case "FLOAT32": - return new VitessType(columnType, Types.FLOAT); + return new VitessType(type, Types.FLOAT); case "FLOAT64": - return new VitessType(columnType, Types.DOUBLE); + return new VitessType(type, Types.DOUBLE); default: - return new VitessType(columnType, Types.OTHER); + return new VitessType(type, Types.OTHER); + } + } + + /** + * Resolve the list of permitted Enum values from the Enum Definition + * @param enumDefinition the Enum column definition from the MySQL table. E.g. "enum('m','l','xl')" + * @return The list of permitted Enum values + */ + private static List resolveEnumValues(String enumDefinition) { + List enumValues = new ArrayList<>(); + if (enumDefinition == null || enumDefinition.length() == 0) { + return enumValues; + } + + StringBuilder sb = new StringBuilder(); + boolean startCollecting = false; + char[] chars = enumDefinition.toCharArray(); + for (int i = 0; i < chars.length; i++) { + if (chars[i] == '\'') { + if (chars[i + 1] != '\'') { + if (startCollecting) { + // end of the Enum value, add the Enum value to the result list + enumValues.add(sb.toString()); + sb.setLength(0); + } + startCollecting = !startCollecting; + } + else { + sb.append("'"); + // In MySQL, the single quote in the Enum definition "a'b" is escaped and becomes "a''b". + // Skip the second single-quote + i++; + } + } + else if (startCollecting) { + sb.append(chars[i]); + } } + return enumValues; } } diff --git a/src/main/java/io/debezium/connector/vitess/VitessValueConverter.java b/src/main/java/io/debezium/connector/vitess/VitessValueConverter.java index a91f954f..d0f705c5 100644 --- a/src/main/java/io/debezium/connector/vitess/VitessValueConverter.java +++ b/src/main/java/io/debezium/connector/vitess/VitessValueConverter.java @@ -6,6 +6,7 @@ package io.debezium.connector.vitess; import java.time.ZoneOffset; +import java.util.List; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.SchemaBuilder; @@ -16,6 +17,8 @@ import io.debezium.relational.Column; import io.debezium.relational.RelationalChangeRecordEmitter; import io.debezium.relational.ValueConverter; +import io.debezium.util.Strings; +import io.vitess.proto.Query; /** Used by {@link RelationalChangeRecordEmitter} to convert Java value to Connect value */ public class VitessValueConverter extends JdbcValueConverters { @@ -34,9 +37,13 @@ public VitessValueConverter( @Override public SchemaBuilder schemaBuilder(Column column) { String typeName = column.typeName().toUpperCase(); - if (matches(typeName, "JSON")) { + if (matches(typeName, Query.Type.JSON.name())) { return Json.builder(); } + if (matches(typeName, Query.Type.ENUM.name())) { + String commaSeperatedOptions = Strings.join(",", column.enumValues()); + return io.debezium.data.Enum.builder(commaSeperatedOptions); + } final SchemaBuilder jdbcSchemaBuilder = super.schemaBuilder(column); if (jdbcSchemaBuilder == null) { @@ -50,6 +57,11 @@ public SchemaBuilder schemaBuilder(Column column) { // Convert Java value to Kafka connect value. @Override public ValueConverter converter(Column column, Field fieldDefn) { + String typeName = column.typeName().toUpperCase(); + if (matches(typeName, Query.Type.ENUM.name())) { + return (data) -> convertEnumToString(column.enumValues(), column, fieldDefn, data); + } + final ValueConverter jdbcConverter = super.converter(column, fieldDefn); if (jdbcConverter == null) { return includeUnknownDatatypes @@ -75,4 +87,39 @@ protected boolean matches(String upperCaseTypeName, String upperCaseMatch) { } return upperCaseMatch.equals(upperCaseTypeName) || upperCaseTypeName.startsWith(upperCaseMatch + "("); } + + /** + * Converts a value object for a MySQL {@code ENUM}, which is represented in the binlog events as an integer value containing + * the index of the enum option. + * + * @param options the characters that appear in the same order as defined in the column; may not be null + * @param column the column definition describing the {@code data} value; never null + * @param fieldDefn the field definition; never null + * @param data the data object to be converted into an {@code ENUM} literal String value + * @return the converted value, or empty string if the conversion could not be made + */ + private Object convertEnumToString(List options, Column column, Field fieldDefn, Object data) { + return convertValue(column, fieldDefn, data, "", (r) -> { + if (options != null) { + // The binlog will contain an int with the 1-based index of the option in the enum value ... + int value = ((Integer) data).intValue(); + if (value == 0) { + // an invalid value was specified, which corresponds to the empty string '' and an index of 0 + r.deliver(""); + } + else { + int index = value - 1; // 'options' is 0-based + if (index < options.size() && index >= 0) { + r.deliver(options.get(index)); + } + else { + r.deliver(""); + } + } + } + else { + r.deliver(""); + } + }); + } } diff --git a/src/main/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoder.java b/src/main/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoder.java index be7dc3ef..0999a394 100644 --- a/src/main/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoder.java +++ b/src/main/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoder.java @@ -290,7 +290,7 @@ private List resolveColumns(Row row, Table table) { for (short i = 0; i < numberOfColumns; i++) { final io.debezium.relational.Column column = tableColumns.get(i); final String columnName = column.name(); - final VitessType vitessType = new VitessType(column.typeName(), column.jdbcType()); + final VitessType vitessType = new VitessType(column.typeName(), column.jdbcType(), column.enumValues()); final boolean optional = column.isOptional(); final int rawValueLength = (int) row.getLengths(i); @@ -329,10 +329,9 @@ private void handleFieldMessage(Binlogdata.VEvent vEvent) { for (short i = 0; i < columnCount; ++i) { Field field = fieldEvent.getFields(i); String columnName = validateColumnName(field.getName(), schemaName, tableName); - String columnType = field.getType().name(); - VitessType vitessType = VitessType.resolve(columnType); + VitessType vitessType = VitessType.resolve(field); if (vitessType.getJdbcId() == Types.OTHER) { - LOGGER.error("Cannot resolve JDBC type from vstream type {}", columnType); + LOGGER.error("Cannot resolve JDBC type from vstream field {}", field); } KeyMetaData keyMetaData = KeyMetaData.NONE; @@ -365,6 +364,9 @@ private Table resolveTable(String schemaName, String tableName, List schemasAndValuesForEnumType() { final List fields = new ArrayList<>(); fields.addAll( Arrays.asList( - new SchemaAndValueField("enum_col", SchemaBuilder.STRING_SCHEMA, "3"))); + new SchemaAndValueField("enum_col", io.debezium.data.Enum.builder("small,medium,large").build(), "large"))); return fields; } diff --git a/src/test/java/io/debezium/connector/vitess/VitessTypeTest.java b/src/test/java/io/debezium/connector/vitess/VitessTypeTest.java index b15ccf97..253cbe63 100644 --- a/src/test/java/io/debezium/connector/vitess/VitessTypeTest.java +++ b/src/test/java/io/debezium/connector/vitess/VitessTypeTest.java @@ -8,6 +8,7 @@ import static org.fest.assertions.Assertions.assertThat; import java.sql.Types; +import java.util.Arrays; import org.junit.Test; @@ -17,28 +18,62 @@ public class VitessTypeTest { @Test public void shouldResolveVitessTypeToJdbcType() { - assertThat(VitessType.resolve(Query.Type.INT8.name()).getJdbcId()).isEqualTo(Types.SMALLINT); - assertThat(VitessType.resolve(Query.Type.INT16.name()).getJdbcId()).isEqualTo(Types.SMALLINT); - assertThat(VitessType.resolve(Query.Type.INT24.name()).getJdbcId()).isEqualTo(Types.INTEGER); - assertThat(VitessType.resolve(Query.Type.INT32.name()).getJdbcId()).isEqualTo(Types.INTEGER); - assertThat(VitessType.resolve(Query.Type.INT64.name()).getJdbcId()).isEqualTo(Types.BIGINT); - assertThat(VitessType.resolve(Query.Type.FLOAT32.name()).getJdbcId()).isEqualTo(Types.FLOAT); - assertThat(VitessType.resolve(Query.Type.FLOAT64.name()).getJdbcId()).isEqualTo(Types.DOUBLE); - assertThat(VitessType.resolve(Query.Type.VARBINARY.name()).getJdbcId()) + assertThat(VitessType.resolve(asField(Query.Type.INT8)).getJdbcId()).isEqualTo(Types.SMALLINT); + assertThat(VitessType.resolve(asField(Query.Type.INT16)).getJdbcId()).isEqualTo(Types.SMALLINT); + assertThat(VitessType.resolve(asField(Query.Type.INT24)).getJdbcId()).isEqualTo(Types.INTEGER); + assertThat(VitessType.resolve(asField(Query.Type.INT32)).getJdbcId()).isEqualTo(Types.INTEGER); + assertThat(VitessType.resolve(asField(Query.Type.INT64)).getJdbcId()).isEqualTo(Types.BIGINT); + assertThat(VitessType.resolve(asField(Query.Type.FLOAT32)).getJdbcId()).isEqualTo(Types.FLOAT); + assertThat(VitessType.resolve(asField(Query.Type.FLOAT64)).getJdbcId()).isEqualTo(Types.DOUBLE); + assertThat(VitessType.resolve(asField(Query.Type.VARBINARY)).getJdbcId()) .isEqualTo(Types.VARCHAR); - assertThat(VitessType.resolve(Query.Type.BINARY.name()).getJdbcId()).isEqualTo(Types.VARCHAR); - assertThat(VitessType.resolve(Query.Type.VARCHAR.name()).getJdbcId()).isEqualTo(Types.VARCHAR); - assertThat(VitessType.resolve(Query.Type.CHAR.name()).getJdbcId()).isEqualTo(Types.VARCHAR); - assertThat(VitessType.resolve(Query.Type.TEXT.name()).getJdbcId()).isEqualTo(Types.VARCHAR); - assertThat(VitessType.resolve(Query.Type.JSON.name()).getJdbcId()).isEqualTo(Types.VARCHAR); - assertThat(VitessType.resolve(Query.Type.DECIMAL.name()).getJdbcId()).isEqualTo(Types.VARCHAR); - assertThat(VitessType.resolve(Query.Type.TIME.name()).getJdbcId()).isEqualTo(Types.VARCHAR); - assertThat(VitessType.resolve(Query.Type.DATE.name()).getJdbcId()).isEqualTo(Types.VARCHAR); - assertThat(VitessType.resolve(Query.Type.DATETIME.name()).getJdbcId()).isEqualTo(Types.VARCHAR); - assertThat(VitessType.resolve(Query.Type.TIMESTAMP.name()).getJdbcId()) + assertThat(VitessType.resolve(asField(Query.Type.BINARY)).getJdbcId()).isEqualTo(Types.VARCHAR); + assertThat(VitessType.resolve(asField(Query.Type.VARCHAR)).getJdbcId()).isEqualTo(Types.VARCHAR); + assertThat(VitessType.resolve(asField(Query.Type.CHAR)).getJdbcId()).isEqualTo(Types.VARCHAR); + assertThat(VitessType.resolve(asField(Query.Type.TEXT)).getJdbcId()).isEqualTo(Types.VARCHAR); + assertThat(VitessType.resolve(asField(Query.Type.JSON)).getJdbcId()).isEqualTo(Types.VARCHAR); + assertThat(VitessType.resolve(asField(Query.Type.DECIMAL)).getJdbcId()).isEqualTo(Types.VARCHAR); + assertThat(VitessType.resolve(asField(Query.Type.TIME)).getJdbcId()).isEqualTo(Types.VARCHAR); + assertThat(VitessType.resolve(asField(Query.Type.DATE)).getJdbcId()).isEqualTo(Types.VARCHAR); + assertThat(VitessType.resolve(asField(Query.Type.DATETIME)).getJdbcId()).isEqualTo(Types.VARCHAR); + assertThat(VitessType.resolve(asField(Query.Type.TIMESTAMP)).getJdbcId()) .isEqualTo(Types.VARCHAR); - assertThat(VitessType.resolve(Query.Type.ENUM.name()).getJdbcId()).isEqualTo(Types.VARCHAR); - assertThat(VitessType.resolve(Query.Type.SET.name()).getJdbcId()).isEqualTo(Types.VARCHAR); - assertThat(VitessType.resolve("foo").getJdbcId()).isEqualTo(Types.OTHER); + assertThat(VitessType.resolve(asField(Query.Type.SET)).getJdbcId()).isEqualTo(Types.VARCHAR); + assertThat(VitessType.resolve(asField(Query.Type.GEOMETRY)).getJdbcId()).isEqualTo(Types.OTHER); + } + + @Test + public void shouldResolveEnumToVitessType() { + Query.Field enumField = Query.Field.newBuilder() + .setType(Query.Type.ENUM) + .setColumnType("enum('eu','us','asia')") + .build(); + assertThat(VitessType.resolve(enumField)) + .isEqualTo(new VitessType(Query.Type.ENUM.name(), Types.INTEGER, Arrays.asList("eu", "us", "asia"))); + + enumField = Query.Field.newBuilder() + .setType(Query.Type.ENUM) + .setColumnType("enum('e,u','us','asia')") + .build(); + assertThat(VitessType.resolve(enumField)) + .isEqualTo(new VitessType(Query.Type.ENUM.name(), Types.INTEGER, Arrays.asList("e,u", "us", "asia"))); + + enumField = Query.Field.newBuilder() + .setType(Query.Type.ENUM) + .setColumnType("enum('e'',u','us','asia')") + .build(); + assertThat(VitessType.resolve(enumField)) + .isEqualTo(new VitessType(Query.Type.ENUM.name(), Types.INTEGER, Arrays.asList("e',u", "us", "asia"))); + + enumField = Query.Field.newBuilder() + .setType(Query.Type.ENUM) + .setColumnType("enum('e'','',''u','us','asia')") + .build(); + assertThat(VitessType.resolve(enumField)) + .isEqualTo(new VitessType(Query.Type.ENUM.name(), Types.INTEGER, Arrays.asList("e',','u", "us", "asia"))); + } + + private Query.Field asField(Query.Type type) { + return Query.Field.newBuilder().setType(type).build(); } } From 9e9f48ed4c85f03f6e8f4a32a810030b4e09676d Mon Sep 17 00:00:00 2001 From: Kewei Shang Date: Fri, 19 Feb 2021 11:38:10 +0200 Subject: [PATCH 2/2] DBZ-3124 Address comments and clean up the code --- src/main/java/io/debezium/connector/vitess/VitessType.java | 7 ++++--- .../io/debezium/connector/vitess/VitessValueConverter.java | 4 +--- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/src/main/java/io/debezium/connector/vitess/VitessType.java b/src/main/java/io/debezium/connector/vitess/VitessType.java index 37e88051..380ef52d 100644 --- a/src/main/java/io/debezium/connector/vitess/VitessType.java +++ b/src/main/java/io/debezium/connector/vitess/VitessType.java @@ -7,6 +7,7 @@ import java.sql.Types; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Objects; @@ -23,13 +24,13 @@ public class VitessType { private final List enumValues; public VitessType(String name, int jdbcId) { - this(name, jdbcId, null); + this(name, jdbcId, Collections.emptyList()); } public VitessType(String name, int jdbcId, List enumValues) { this.name = name; this.jdbcId = jdbcId; - this.enumValues = enumValues; + this.enumValues = Collections.unmodifiableList(enumValues); } public String getName() { @@ -45,7 +46,7 @@ public List getEnumValues() { } public boolean isEnum() { - return enumValues != null && enumValues.size() != 0; + return !enumValues.isEmpty(); } @Override diff --git a/src/main/java/io/debezium/connector/vitess/VitessValueConverter.java b/src/main/java/io/debezium/connector/vitess/VitessValueConverter.java index d0f705c5..204cd7be 100644 --- a/src/main/java/io/debezium/connector/vitess/VitessValueConverter.java +++ b/src/main/java/io/debezium/connector/vitess/VitessValueConverter.java @@ -17,7 +17,6 @@ import io.debezium.relational.Column; import io.debezium.relational.RelationalChangeRecordEmitter; import io.debezium.relational.ValueConverter; -import io.debezium.util.Strings; import io.vitess.proto.Query; /** Used by {@link RelationalChangeRecordEmitter} to convert Java value to Connect value */ @@ -41,8 +40,7 @@ public SchemaBuilder schemaBuilder(Column column) { return Json.builder(); } if (matches(typeName, Query.Type.ENUM.name())) { - String commaSeperatedOptions = Strings.join(",", column.enumValues()); - return io.debezium.data.Enum.builder(commaSeperatedOptions); + return io.debezium.data.Enum.builder(column.enumValues()); } final SchemaBuilder jdbcSchemaBuilder = super.schemaBuilder(column);