Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
<version.debezium>${project.version}</version.debezium>

<!-- Dependencies -->
<version.vitess.grpc>7.0.0</version.vitess.grpc>
<version.vitess.grpc>9.0.0</version.vitess.grpc>
Comment thread
gunnarmorling marked this conversation as resolved.
<version.gson>2.7</version.gson>
<version.grpc>1.16.0</version.grpc>
<version.grpc>1.33.0</version.grpc>
<version.joda>2.10.1</version.joda>
<version.google.protos>1.17.0</version.google.protos>
<version.jsonassert>1.5.0</version.jsonassert>
Expand Down Expand Up @@ -145,6 +145,11 @@
<artifactId>grpc-context</artifactId>
<version>${version.grpc}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-api</artifactId>
<version>${version.grpc}</version>
</dependency>
<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>proto-google-common-protos</artifactId>
Expand Down
104 changes: 93 additions & 11 deletions src/main/java/io/debezium/connector/vitess/VitessType.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@
package io.debezium.connector.vitess;

import java.sql.Types;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

import io.vitess.proto.Query;

/** The Vitess table column type */
public class VitessType {
Expand All @@ -14,10 +20,17 @@ public class VitessType {
private final String name;
// enum of column jdbc type
private final int jdbcId;
// permitted enum values
private final List<String> enumValues;

public VitessType(String name, int jdbcId) {
this(name, jdbcId, Collections.emptyList());
}

public VitessType(String name, int jdbcId, List<String> enumValues) {
this.name = name;
this.jdbcId = jdbcId;
this.enumValues = Collections.unmodifiableList(enumValues);
}

public String getName() {
Expand All @@ -28,27 +41,59 @@ public int getJdbcId() {
return jdbcId;
}

public List<String> getEnumValues() {
return enumValues;
}

public boolean isEnum() {
return !enumValues.isEmpty();
}

@Override
public String toString() {
return "VitessType{" + "name='" + name + '\'' + ", jdbcId=" + jdbcId + '}';
return "VitessType{" +
"name='" + name + '\'' +
", jdbcId=" + jdbcId +
", enumValues=" + enumValues +
'}';
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
VitessType that = (VitessType) o;
return jdbcId == that.jdbcId && name.equals(that.name) && Objects.equals(enumValues, that.enumValues);
}

@Override
public int hashCode() {
return Objects.hash(name, jdbcId, enumValues);
}

// Resolve JDBC type from vstream FIELD event
public static VitessType resolve(String columnType) {
switch (columnType) {
public static VitessType resolve(Query.Field field) {
String type = field.getType().name();
switch (type) {
case "INT8":
case "UINT8":
case "INT16":
return new VitessType(columnType, Types.SMALLINT);
return new VitessType(type, Types.SMALLINT);
case "UINT16":
case "INT24":
case "UINT24":
case "INT32":
return new VitessType(columnType, Types.INTEGER);
return new VitessType(type, Types.INTEGER);
case "ENUM":
return new VitessType(type, Types.INTEGER, resolveEnumValues(field.getColumnType()));
case "UINT32":
case "INT64":
case "UINT64":
return new VitessType(columnType, Types.BIGINT);
return new VitessType(type, Types.BIGINT);
case "VARBINARY":
case "BINARY":
case "VARCHAR":
Expand All @@ -61,15 +106,52 @@ public static VitessType resolve(String columnType) {
case "DATETIME":
case "TIMESTAMP":
case "YEAR":
case "ENUM":
case "SET":
return new VitessType(columnType, Types.VARCHAR);
return new VitessType(type, Types.VARCHAR);
case "FLOAT32":
return new VitessType(columnType, Types.FLOAT);
return new VitessType(type, Types.FLOAT);
case "FLOAT64":
return new VitessType(columnType, Types.DOUBLE);
return new VitessType(type, Types.DOUBLE);
default:
return new VitessType(columnType, Types.OTHER);
return new VitessType(type, Types.OTHER);
}
}

/**
* Resolve the list of permitted Enum values from the Enum Definition
* @param enumDefinition the Enum column definition from the MySQL table. E.g. "enum('m','l','xl')"
* @return The list of permitted Enum values
*/
private static List<String> resolveEnumValues(String enumDefinition) {
List<String> enumValues = new ArrayList<>();
if (enumDefinition == null || enumDefinition.length() == 0) {
return enumValues;
}

StringBuilder sb = new StringBuilder();
boolean startCollecting = false;
char[] chars = enumDefinition.toCharArray();
for (int i = 0; i < chars.length; i++) {
if (chars[i] == '\'') {
if (chars[i + 1] != '\'') {
if (startCollecting) {
// end of the Enum value, add the Enum value to the result list
enumValues.add(sb.toString());
sb.setLength(0);
}
startCollecting = !startCollecting;
}
else {
sb.append("'");
// In MySQL, the single quote in the Enum definition "a'b" is escaped and becomes "a''b".
// Skip the second single-quote
i++;
}
}
else if (startCollecting) {
sb.append(chars[i]);
}
}
return enumValues;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package io.debezium.connector.vitess;

import java.time.ZoneOffset;
import java.util.List;

import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.SchemaBuilder;
Expand All @@ -16,6 +17,7 @@
import io.debezium.relational.Column;
import io.debezium.relational.RelationalChangeRecordEmitter;
import io.debezium.relational.ValueConverter;
import io.vitess.proto.Query;

/** Used by {@link RelationalChangeRecordEmitter} to convert Java value to Connect value */
public class VitessValueConverter extends JdbcValueConverters {
Expand All @@ -34,9 +36,12 @@ public VitessValueConverter(
@Override
public SchemaBuilder schemaBuilder(Column column) {
String typeName = column.typeName().toUpperCase();
if (matches(typeName, "JSON")) {
if (matches(typeName, Query.Type.JSON.name())) {
return Json.builder();
}
if (matches(typeName, Query.Type.ENUM.name())) {
return io.debezium.data.Enum.builder(column.enumValues());
}

final SchemaBuilder jdbcSchemaBuilder = super.schemaBuilder(column);
if (jdbcSchemaBuilder == null) {
Expand All @@ -50,6 +55,11 @@ public SchemaBuilder schemaBuilder(Column column) {
// Convert Java value to Kafka connect value.
@Override
public ValueConverter converter(Column column, Field fieldDefn) {
String typeName = column.typeName().toUpperCase();
if (matches(typeName, Query.Type.ENUM.name())) {
return (data) -> convertEnumToString(column.enumValues(), column, fieldDefn, data);
}

final ValueConverter jdbcConverter = super.converter(column, fieldDefn);
if (jdbcConverter == null) {
return includeUnknownDatatypes
Expand All @@ -75,4 +85,39 @@ protected boolean matches(String upperCaseTypeName, String upperCaseMatch) {
}
return upperCaseMatch.equals(upperCaseTypeName) || upperCaseTypeName.startsWith(upperCaseMatch + "(");
}

/**
* Converts a value object for a MySQL {@code ENUM}, which is represented in the binlog events as an integer value containing
* the index of the enum option.
*
* @param options the characters that appear in the same order as defined in the column; may not be null
* @param column the column definition describing the {@code data} value; never null
* @param fieldDefn the field definition; never null
* @param data the data object to be converted into an {@code ENUM} literal String value
* @return the converted value, or empty string if the conversion could not be made
*/
private Object convertEnumToString(List<String> options, Column column, Field fieldDefn, Object data) {
return convertValue(column, fieldDefn, data, "", (r) -> {
if (options != null) {
// The binlog will contain an int with the 1-based index of the option in the enum value ...
int value = ((Integer) data).intValue();
if (value == 0) {
// an invalid value was specified, which corresponds to the empty string '' and an index of 0
r.deliver("");
}
else {
int index = value - 1; // 'options' is 0-based
if (index < options.size() && index >= 0) {
r.deliver(options.get(index));
}
else {
r.deliver("");
}
}
}
else {
r.deliver("");
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ private List<Column> resolveColumns(Row row, Table table) {
for (short i = 0; i < numberOfColumns; i++) {
final io.debezium.relational.Column column = tableColumns.get(i);
final String columnName = column.name();
final VitessType vitessType = new VitessType(column.typeName(), column.jdbcType());
final VitessType vitessType = new VitessType(column.typeName(), column.jdbcType(), column.enumValues());
final boolean optional = column.isOptional();

final int rawValueLength = (int) row.getLengths(i);
Expand Down Expand Up @@ -329,10 +329,9 @@ private void handleFieldMessage(Binlogdata.VEvent vEvent) {
for (short i = 0; i < columnCount; ++i) {
Field field = fieldEvent.getFields(i);
String columnName = validateColumnName(field.getName(), schemaName, tableName);
String columnType = field.getType().name();
VitessType vitessType = VitessType.resolve(columnType);
VitessType vitessType = VitessType.resolve(field);
if (vitessType.getJdbcId() == Types.OTHER) {
LOGGER.error("Cannot resolve JDBC type from vstream type {}", columnType);
LOGGER.error("Cannot resolve JDBC type from vstream field {}", field);
}

KeyMetaData keyMetaData = KeyMetaData.NONE;
Expand Down Expand Up @@ -365,6 +364,9 @@ private Table resolveTable(String schemaName, String tableName, List<ColumnMetaD
.type(columnMetaData.getVitessType().getName())
.jdbcType(columnMetaData.getVitessType().getJdbcId())
.optional(columnMetaData.isOptional());
if (columnMetaData.getVitessType().isEnum()) {
editor = editor.enumValues(columnMetaData.getVitessType().getEnumValues());
}
cols.add(editor.create());

switch (columnMetaData.getKeyMetaData()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ protected List<SchemaAndValueField> schemasAndValuesForEnumType() {
final List<SchemaAndValueField> fields = new ArrayList<>();
fields.addAll(
Arrays.asList(
new SchemaAndValueField("enum_col", SchemaBuilder.STRING_SCHEMA, "3")));
new SchemaAndValueField("enum_col", io.debezium.data.Enum.builder("small,medium,large").build(), "large")));
return fields;
}

Expand Down
77 changes: 56 additions & 21 deletions src/test/java/io/debezium/connector/vitess/VitessTypeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static org.fest.assertions.Assertions.assertThat;

import java.sql.Types;
import java.util.Arrays;

import org.junit.Test;

Expand All @@ -17,28 +18,62 @@ public class VitessTypeTest {

@Test
public void shouldResolveVitessTypeToJdbcType() {
assertThat(VitessType.resolve(Query.Type.INT8.name()).getJdbcId()).isEqualTo(Types.SMALLINT);
assertThat(VitessType.resolve(Query.Type.INT16.name()).getJdbcId()).isEqualTo(Types.SMALLINT);
assertThat(VitessType.resolve(Query.Type.INT24.name()).getJdbcId()).isEqualTo(Types.INTEGER);
assertThat(VitessType.resolve(Query.Type.INT32.name()).getJdbcId()).isEqualTo(Types.INTEGER);
assertThat(VitessType.resolve(Query.Type.INT64.name()).getJdbcId()).isEqualTo(Types.BIGINT);
assertThat(VitessType.resolve(Query.Type.FLOAT32.name()).getJdbcId()).isEqualTo(Types.FLOAT);
assertThat(VitessType.resolve(Query.Type.FLOAT64.name()).getJdbcId()).isEqualTo(Types.DOUBLE);
assertThat(VitessType.resolve(Query.Type.VARBINARY.name()).getJdbcId())
assertThat(VitessType.resolve(asField(Query.Type.INT8)).getJdbcId()).isEqualTo(Types.SMALLINT);
assertThat(VitessType.resolve(asField(Query.Type.INT16)).getJdbcId()).isEqualTo(Types.SMALLINT);
assertThat(VitessType.resolve(asField(Query.Type.INT24)).getJdbcId()).isEqualTo(Types.INTEGER);
assertThat(VitessType.resolve(asField(Query.Type.INT32)).getJdbcId()).isEqualTo(Types.INTEGER);
assertThat(VitessType.resolve(asField(Query.Type.INT64)).getJdbcId()).isEqualTo(Types.BIGINT);
assertThat(VitessType.resolve(asField(Query.Type.FLOAT32)).getJdbcId()).isEqualTo(Types.FLOAT);
assertThat(VitessType.resolve(asField(Query.Type.FLOAT64)).getJdbcId()).isEqualTo(Types.DOUBLE);
assertThat(VitessType.resolve(asField(Query.Type.VARBINARY)).getJdbcId())
.isEqualTo(Types.VARCHAR);
assertThat(VitessType.resolve(Query.Type.BINARY.name()).getJdbcId()).isEqualTo(Types.VARCHAR);
assertThat(VitessType.resolve(Query.Type.VARCHAR.name()).getJdbcId()).isEqualTo(Types.VARCHAR);
assertThat(VitessType.resolve(Query.Type.CHAR.name()).getJdbcId()).isEqualTo(Types.VARCHAR);
assertThat(VitessType.resolve(Query.Type.TEXT.name()).getJdbcId()).isEqualTo(Types.VARCHAR);
assertThat(VitessType.resolve(Query.Type.JSON.name()).getJdbcId()).isEqualTo(Types.VARCHAR);
assertThat(VitessType.resolve(Query.Type.DECIMAL.name()).getJdbcId()).isEqualTo(Types.VARCHAR);
assertThat(VitessType.resolve(Query.Type.TIME.name()).getJdbcId()).isEqualTo(Types.VARCHAR);
assertThat(VitessType.resolve(Query.Type.DATE.name()).getJdbcId()).isEqualTo(Types.VARCHAR);
assertThat(VitessType.resolve(Query.Type.DATETIME.name()).getJdbcId()).isEqualTo(Types.VARCHAR);
assertThat(VitessType.resolve(Query.Type.TIMESTAMP.name()).getJdbcId())
assertThat(VitessType.resolve(asField(Query.Type.BINARY)).getJdbcId()).isEqualTo(Types.VARCHAR);
assertThat(VitessType.resolve(asField(Query.Type.VARCHAR)).getJdbcId()).isEqualTo(Types.VARCHAR);
assertThat(VitessType.resolve(asField(Query.Type.CHAR)).getJdbcId()).isEqualTo(Types.VARCHAR);
assertThat(VitessType.resolve(asField(Query.Type.TEXT)).getJdbcId()).isEqualTo(Types.VARCHAR);
assertThat(VitessType.resolve(asField(Query.Type.JSON)).getJdbcId()).isEqualTo(Types.VARCHAR);
assertThat(VitessType.resolve(asField(Query.Type.DECIMAL)).getJdbcId()).isEqualTo(Types.VARCHAR);
assertThat(VitessType.resolve(asField(Query.Type.TIME)).getJdbcId()).isEqualTo(Types.VARCHAR);
assertThat(VitessType.resolve(asField(Query.Type.DATE)).getJdbcId()).isEqualTo(Types.VARCHAR);
assertThat(VitessType.resolve(asField(Query.Type.DATETIME)).getJdbcId()).isEqualTo(Types.VARCHAR);
assertThat(VitessType.resolve(asField(Query.Type.TIMESTAMP)).getJdbcId())
.isEqualTo(Types.VARCHAR);
assertThat(VitessType.resolve(Query.Type.ENUM.name()).getJdbcId()).isEqualTo(Types.VARCHAR);
assertThat(VitessType.resolve(Query.Type.SET.name()).getJdbcId()).isEqualTo(Types.VARCHAR);
assertThat(VitessType.resolve("foo").getJdbcId()).isEqualTo(Types.OTHER);
assertThat(VitessType.resolve(asField(Query.Type.SET)).getJdbcId()).isEqualTo(Types.VARCHAR);
assertThat(VitessType.resolve(asField(Query.Type.GEOMETRY)).getJdbcId()).isEqualTo(Types.OTHER);
}

@Test
public void shouldResolveEnumToVitessType() {
Query.Field enumField = Query.Field.newBuilder()
.setType(Query.Type.ENUM)
.setColumnType("enum('eu','us','asia')")
.build();
assertThat(VitessType.resolve(enumField))
.isEqualTo(new VitessType(Query.Type.ENUM.name(), Types.INTEGER, Arrays.asList("eu", "us", "asia")));

enumField = Query.Field.newBuilder()
.setType(Query.Type.ENUM)
.setColumnType("enum('e,u','us','asia')")
.build();
assertThat(VitessType.resolve(enumField))
.isEqualTo(new VitessType(Query.Type.ENUM.name(), Types.INTEGER, Arrays.asList("e,u", "us", "asia")));

enumField = Query.Field.newBuilder()
.setType(Query.Type.ENUM)
.setColumnType("enum('e'',u','us','asia')")
.build();
assertThat(VitessType.resolve(enumField))
.isEqualTo(new VitessType(Query.Type.ENUM.name(), Types.INTEGER, Arrays.asList("e',u", "us", "asia")));

enumField = Query.Field.newBuilder()
.setType(Query.Type.ENUM)
.setColumnType("enum('e'','',''u','us','asia')")
.build();
assertThat(VitessType.resolve(enumField))
.isEqualTo(new VitessType(Query.Type.ENUM.name(), Types.INTEGER, Arrays.asList("e',','u", "us", "asia")));
}

private Query.Field asField(Query.Type type) {
return Query.Field.newBuilder().setType(type).build();
}
}