diff --git a/pom.xml b/pom.xml
index baacdc88..ee4edee8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -18,9 +18,9 @@
${project.version}
- 7.0.0
+ 9.0.0
2.7
- 1.16.0
+ 1.33.0
2.10.1
1.17.0
1.5.0
@@ -145,6 +145,11 @@
grpc-context
${version.grpc}
+
+ io.grpc
+ grpc-api
+ ${version.grpc}
+
com.google.api.grpc
proto-google-common-protos
diff --git a/src/main/java/io/debezium/connector/vitess/VitessType.java b/src/main/java/io/debezium/connector/vitess/VitessType.java
index ed9012fe..380ef52d 100644
--- a/src/main/java/io/debezium/connector/vitess/VitessType.java
+++ b/src/main/java/io/debezium/connector/vitess/VitessType.java
@@ -6,6 +6,12 @@
package io.debezium.connector.vitess;
import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+import io.vitess.proto.Query;
/** The Vitess table column type */
public class VitessType {
@@ -14,10 +20,17 @@ public class VitessType {
private final String name;
// enum of column jdbc type
private final int jdbcId;
+ // permitted enum values
+ private final List enumValues;
public VitessType(String name, int jdbcId) {
+ this(name, jdbcId, Collections.emptyList());
+ }
+
+ public VitessType(String name, int jdbcId, List enumValues) {
this.name = name;
this.jdbcId = jdbcId;
+ this.enumValues = Collections.unmodifiableList(enumValues);
}
public String getName() {
@@ -28,27 +41,59 @@ public int getJdbcId() {
return jdbcId;
}
+ public List getEnumValues() {
+ return enumValues;
+ }
+
+ public boolean isEnum() {
+ return !enumValues.isEmpty();
+ }
+
@Override
public String toString() {
- return "VitessType{" + "name='" + name + '\'' + ", jdbcId=" + jdbcId + '}';
+ return "VitessType{" +
+ "name='" + name + '\'' +
+ ", jdbcId=" + jdbcId +
+ ", enumValues=" + enumValues +
+ '}';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ VitessType that = (VitessType) o;
+ return jdbcId == that.jdbcId && name.equals(that.name) && Objects.equals(enumValues, that.enumValues);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, jdbcId, enumValues);
}
// Resolve JDBC type from vstream FIELD event
- public static VitessType resolve(String columnType) {
- switch (columnType) {
+ public static VitessType resolve(Query.Field field) {
+ String type = field.getType().name();
+ switch (type) {
case "INT8":
case "UINT8":
case "INT16":
- return new VitessType(columnType, Types.SMALLINT);
+ return new VitessType(type, Types.SMALLINT);
case "UINT16":
case "INT24":
case "UINT24":
case "INT32":
- return new VitessType(columnType, Types.INTEGER);
+ return new VitessType(type, Types.INTEGER);
+ case "ENUM":
+ return new VitessType(type, Types.INTEGER, resolveEnumValues(field.getColumnType()));
case "UINT32":
case "INT64":
case "UINT64":
- return new VitessType(columnType, Types.BIGINT);
+ return new VitessType(type, Types.BIGINT);
case "VARBINARY":
case "BINARY":
case "VARCHAR":
@@ -61,15 +106,52 @@ public static VitessType resolve(String columnType) {
case "DATETIME":
case "TIMESTAMP":
case "YEAR":
- case "ENUM":
case "SET":
- return new VitessType(columnType, Types.VARCHAR);
+ return new VitessType(type, Types.VARCHAR);
case "FLOAT32":
- return new VitessType(columnType, Types.FLOAT);
+ return new VitessType(type, Types.FLOAT);
case "FLOAT64":
- return new VitessType(columnType, Types.DOUBLE);
+ return new VitessType(type, Types.DOUBLE);
default:
- return new VitessType(columnType, Types.OTHER);
+ return new VitessType(type, Types.OTHER);
+ }
+ }
+
+ /**
+ * Resolve the list of permitted Enum values from the Enum Definition
+ * @param enumDefinition the Enum column definition from the MySQL table. E.g. "enum('m','l','xl')"
+ * @return The list of permitted Enum values
+ */
+ private static List resolveEnumValues(String enumDefinition) {
+ List enumValues = new ArrayList<>();
+ if (enumDefinition == null || enumDefinition.length() == 0) {
+ return enumValues;
+ }
+
+ StringBuilder sb = new StringBuilder();
+ boolean startCollecting = false;
+ char[] chars = enumDefinition.toCharArray();
+ for (int i = 0; i < chars.length; i++) {
+ if (chars[i] == '\'') {
+ if (chars[i + 1] != '\'') {
+ if (startCollecting) {
+ // end of the Enum value, add the Enum value to the result list
+ enumValues.add(sb.toString());
+ sb.setLength(0);
+ }
+ startCollecting = !startCollecting;
+ }
+ else {
+ sb.append("'");
+ // In MySQL, the single quote in the Enum definition "a'b" is escaped and becomes "a''b".
+ // Skip the second single-quote
+ i++;
+ }
+ }
+ else if (startCollecting) {
+ sb.append(chars[i]);
+ }
}
+ return enumValues;
}
}
diff --git a/src/main/java/io/debezium/connector/vitess/VitessValueConverter.java b/src/main/java/io/debezium/connector/vitess/VitessValueConverter.java
index a91f954f..204cd7be 100644
--- a/src/main/java/io/debezium/connector/vitess/VitessValueConverter.java
+++ b/src/main/java/io/debezium/connector/vitess/VitessValueConverter.java
@@ -6,6 +6,7 @@
package io.debezium.connector.vitess;
import java.time.ZoneOffset;
+import java.util.List;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.SchemaBuilder;
@@ -16,6 +17,7 @@
import io.debezium.relational.Column;
import io.debezium.relational.RelationalChangeRecordEmitter;
import io.debezium.relational.ValueConverter;
+import io.vitess.proto.Query;
/** Used by {@link RelationalChangeRecordEmitter} to convert Java value to Connect value */
public class VitessValueConverter extends JdbcValueConverters {
@@ -34,9 +36,12 @@ public VitessValueConverter(
@Override
public SchemaBuilder schemaBuilder(Column column) {
String typeName = column.typeName().toUpperCase();
- if (matches(typeName, "JSON")) {
+ if (matches(typeName, Query.Type.JSON.name())) {
return Json.builder();
}
+ if (matches(typeName, Query.Type.ENUM.name())) {
+ return io.debezium.data.Enum.builder(column.enumValues());
+ }
final SchemaBuilder jdbcSchemaBuilder = super.schemaBuilder(column);
if (jdbcSchemaBuilder == null) {
@@ -50,6 +55,11 @@ public SchemaBuilder schemaBuilder(Column column) {
// Convert Java value to Kafka connect value.
@Override
public ValueConverter converter(Column column, Field fieldDefn) {
+ String typeName = column.typeName().toUpperCase();
+ if (matches(typeName, Query.Type.ENUM.name())) {
+ return (data) -> convertEnumToString(column.enumValues(), column, fieldDefn, data);
+ }
+
final ValueConverter jdbcConverter = super.converter(column, fieldDefn);
if (jdbcConverter == null) {
return includeUnknownDatatypes
@@ -75,4 +85,39 @@ protected boolean matches(String upperCaseTypeName, String upperCaseMatch) {
}
return upperCaseMatch.equals(upperCaseTypeName) || upperCaseTypeName.startsWith(upperCaseMatch + "(");
}
+
+ /**
+ * Converts a value object for a MySQL {@code ENUM}, which is represented in the binlog events as an integer value containing
+ * the index of the enum option.
+ *
+ * @param options the characters that appear in the same order as defined in the column; may not be null
+ * @param column the column definition describing the {@code data} value; never null
+ * @param fieldDefn the field definition; never null
+ * @param data the data object to be converted into an {@code ENUM} literal String value
+ * @return the converted value, or empty string if the conversion could not be made
+ */
+ private Object convertEnumToString(List options, Column column, Field fieldDefn, Object data) {
+ return convertValue(column, fieldDefn, data, "", (r) -> {
+ if (options != null) {
+ // The binlog will contain an int with the 1-based index of the option in the enum value ...
+ int value = ((Integer) data).intValue();
+ if (value == 0) {
+ // an invalid value was specified, which corresponds to the empty string '' and an index of 0
+ r.deliver("");
+ }
+ else {
+ int index = value - 1; // 'options' is 0-based
+ if (index < options.size() && index >= 0) {
+ r.deliver(options.get(index));
+ }
+ else {
+ r.deliver("");
+ }
+ }
+ }
+ else {
+ r.deliver("");
+ }
+ });
+ }
}
diff --git a/src/main/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoder.java b/src/main/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoder.java
index be7dc3ef..0999a394 100644
--- a/src/main/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoder.java
+++ b/src/main/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoder.java
@@ -290,7 +290,7 @@ private List resolveColumns(Row row, Table table) {
for (short i = 0; i < numberOfColumns; i++) {
final io.debezium.relational.Column column = tableColumns.get(i);
final String columnName = column.name();
- final VitessType vitessType = new VitessType(column.typeName(), column.jdbcType());
+ final VitessType vitessType = new VitessType(column.typeName(), column.jdbcType(), column.enumValues());
final boolean optional = column.isOptional();
final int rawValueLength = (int) row.getLengths(i);
@@ -329,10 +329,9 @@ private void handleFieldMessage(Binlogdata.VEvent vEvent) {
for (short i = 0; i < columnCount; ++i) {
Field field = fieldEvent.getFields(i);
String columnName = validateColumnName(field.getName(), schemaName, tableName);
- String columnType = field.getType().name();
- VitessType vitessType = VitessType.resolve(columnType);
+ VitessType vitessType = VitessType.resolve(field);
if (vitessType.getJdbcId() == Types.OTHER) {
- LOGGER.error("Cannot resolve JDBC type from vstream type {}", columnType);
+ LOGGER.error("Cannot resolve JDBC type from vstream field {}", field);
}
KeyMetaData keyMetaData = KeyMetaData.NONE;
@@ -365,6 +364,9 @@ private Table resolveTable(String schemaName, String tableName, List schemasAndValuesForEnumType() {
final List fields = new ArrayList<>();
fields.addAll(
Arrays.asList(
- new SchemaAndValueField("enum_col", SchemaBuilder.STRING_SCHEMA, "3")));
+ new SchemaAndValueField("enum_col", io.debezium.data.Enum.builder("small,medium,large").build(), "large")));
return fields;
}
diff --git a/src/test/java/io/debezium/connector/vitess/VitessTypeTest.java b/src/test/java/io/debezium/connector/vitess/VitessTypeTest.java
index b15ccf97..253cbe63 100644
--- a/src/test/java/io/debezium/connector/vitess/VitessTypeTest.java
+++ b/src/test/java/io/debezium/connector/vitess/VitessTypeTest.java
@@ -8,6 +8,7 @@
import static org.fest.assertions.Assertions.assertThat;
import java.sql.Types;
+import java.util.Arrays;
import org.junit.Test;
@@ -17,28 +18,62 @@ public class VitessTypeTest {
@Test
public void shouldResolveVitessTypeToJdbcType() {
- assertThat(VitessType.resolve(Query.Type.INT8.name()).getJdbcId()).isEqualTo(Types.SMALLINT);
- assertThat(VitessType.resolve(Query.Type.INT16.name()).getJdbcId()).isEqualTo(Types.SMALLINT);
- assertThat(VitessType.resolve(Query.Type.INT24.name()).getJdbcId()).isEqualTo(Types.INTEGER);
- assertThat(VitessType.resolve(Query.Type.INT32.name()).getJdbcId()).isEqualTo(Types.INTEGER);
- assertThat(VitessType.resolve(Query.Type.INT64.name()).getJdbcId()).isEqualTo(Types.BIGINT);
- assertThat(VitessType.resolve(Query.Type.FLOAT32.name()).getJdbcId()).isEqualTo(Types.FLOAT);
- assertThat(VitessType.resolve(Query.Type.FLOAT64.name()).getJdbcId()).isEqualTo(Types.DOUBLE);
- assertThat(VitessType.resolve(Query.Type.VARBINARY.name()).getJdbcId())
+ assertThat(VitessType.resolve(asField(Query.Type.INT8)).getJdbcId()).isEqualTo(Types.SMALLINT);
+ assertThat(VitessType.resolve(asField(Query.Type.INT16)).getJdbcId()).isEqualTo(Types.SMALLINT);
+ assertThat(VitessType.resolve(asField(Query.Type.INT24)).getJdbcId()).isEqualTo(Types.INTEGER);
+ assertThat(VitessType.resolve(asField(Query.Type.INT32)).getJdbcId()).isEqualTo(Types.INTEGER);
+ assertThat(VitessType.resolve(asField(Query.Type.INT64)).getJdbcId()).isEqualTo(Types.BIGINT);
+ assertThat(VitessType.resolve(asField(Query.Type.FLOAT32)).getJdbcId()).isEqualTo(Types.FLOAT);
+ assertThat(VitessType.resolve(asField(Query.Type.FLOAT64)).getJdbcId()).isEqualTo(Types.DOUBLE);
+ assertThat(VitessType.resolve(asField(Query.Type.VARBINARY)).getJdbcId())
.isEqualTo(Types.VARCHAR);
- assertThat(VitessType.resolve(Query.Type.BINARY.name()).getJdbcId()).isEqualTo(Types.VARCHAR);
- assertThat(VitessType.resolve(Query.Type.VARCHAR.name()).getJdbcId()).isEqualTo(Types.VARCHAR);
- assertThat(VitessType.resolve(Query.Type.CHAR.name()).getJdbcId()).isEqualTo(Types.VARCHAR);
- assertThat(VitessType.resolve(Query.Type.TEXT.name()).getJdbcId()).isEqualTo(Types.VARCHAR);
- assertThat(VitessType.resolve(Query.Type.JSON.name()).getJdbcId()).isEqualTo(Types.VARCHAR);
- assertThat(VitessType.resolve(Query.Type.DECIMAL.name()).getJdbcId()).isEqualTo(Types.VARCHAR);
- assertThat(VitessType.resolve(Query.Type.TIME.name()).getJdbcId()).isEqualTo(Types.VARCHAR);
- assertThat(VitessType.resolve(Query.Type.DATE.name()).getJdbcId()).isEqualTo(Types.VARCHAR);
- assertThat(VitessType.resolve(Query.Type.DATETIME.name()).getJdbcId()).isEqualTo(Types.VARCHAR);
- assertThat(VitessType.resolve(Query.Type.TIMESTAMP.name()).getJdbcId())
+ assertThat(VitessType.resolve(asField(Query.Type.BINARY)).getJdbcId()).isEqualTo(Types.VARCHAR);
+ assertThat(VitessType.resolve(asField(Query.Type.VARCHAR)).getJdbcId()).isEqualTo(Types.VARCHAR);
+ assertThat(VitessType.resolve(asField(Query.Type.CHAR)).getJdbcId()).isEqualTo(Types.VARCHAR);
+ assertThat(VitessType.resolve(asField(Query.Type.TEXT)).getJdbcId()).isEqualTo(Types.VARCHAR);
+ assertThat(VitessType.resolve(asField(Query.Type.JSON)).getJdbcId()).isEqualTo(Types.VARCHAR);
+ assertThat(VitessType.resolve(asField(Query.Type.DECIMAL)).getJdbcId()).isEqualTo(Types.VARCHAR);
+ assertThat(VitessType.resolve(asField(Query.Type.TIME)).getJdbcId()).isEqualTo(Types.VARCHAR);
+ assertThat(VitessType.resolve(asField(Query.Type.DATE)).getJdbcId()).isEqualTo(Types.VARCHAR);
+ assertThat(VitessType.resolve(asField(Query.Type.DATETIME)).getJdbcId()).isEqualTo(Types.VARCHAR);
+ assertThat(VitessType.resolve(asField(Query.Type.TIMESTAMP)).getJdbcId())
.isEqualTo(Types.VARCHAR);
- assertThat(VitessType.resolve(Query.Type.ENUM.name()).getJdbcId()).isEqualTo(Types.VARCHAR);
- assertThat(VitessType.resolve(Query.Type.SET.name()).getJdbcId()).isEqualTo(Types.VARCHAR);
- assertThat(VitessType.resolve("foo").getJdbcId()).isEqualTo(Types.OTHER);
+ assertThat(VitessType.resolve(asField(Query.Type.SET)).getJdbcId()).isEqualTo(Types.VARCHAR);
+ assertThat(VitessType.resolve(asField(Query.Type.GEOMETRY)).getJdbcId()).isEqualTo(Types.OTHER);
+ }
+
+ @Test
+ public void shouldResolveEnumToVitessType() {
+ Query.Field enumField = Query.Field.newBuilder()
+ .setType(Query.Type.ENUM)
+ .setColumnType("enum('eu','us','asia')")
+ .build();
+ assertThat(VitessType.resolve(enumField))
+ .isEqualTo(new VitessType(Query.Type.ENUM.name(), Types.INTEGER, Arrays.asList("eu", "us", "asia")));
+
+ enumField = Query.Field.newBuilder()
+ .setType(Query.Type.ENUM)
+ .setColumnType("enum('e,u','us','asia')")
+ .build();
+ assertThat(VitessType.resolve(enumField))
+ .isEqualTo(new VitessType(Query.Type.ENUM.name(), Types.INTEGER, Arrays.asList("e,u", "us", "asia")));
+
+ enumField = Query.Field.newBuilder()
+ .setType(Query.Type.ENUM)
+ .setColumnType("enum('e'',u','us','asia')")
+ .build();
+ assertThat(VitessType.resolve(enumField))
+ .isEqualTo(new VitessType(Query.Type.ENUM.name(), Types.INTEGER, Arrays.asList("e',u", "us", "asia")));
+
+ enumField = Query.Field.newBuilder()
+ .setType(Query.Type.ENUM)
+ .setColumnType("enum('e'','',''u','us','asia')")
+ .build();
+ assertThat(VitessType.resolve(enumField))
+ .isEqualTo(new VitessType(Query.Type.ENUM.name(), Types.INTEGER, Arrays.asList("e',','u", "us", "asia")));
+ }
+
+ private Query.Field asField(Query.Type type) {
+ return Query.Field.newBuilder().setType(type).build();
}
}