diff --git a/presto-mysql/pom.xml b/presto-mysql/pom.xml index ce6428d386a7..908e0eb7b396 100644 --- a/presto-mysql/pom.xml +++ b/presto-mysql/pom.xml @@ -52,6 +52,21 @@ javax.inject + + com.fasterxml.jackson.core + jackson-core + + + + com.fasterxml.jackson.core + jackson-databind + + + + io.airlift + json + + io.prestosql @@ -96,12 +111,6 @@ test - - io.airlift - json - test - - io.prestosql presto-main diff --git a/presto-mysql/src/main/java/io/prestosql/plugin/mysql/MySqlClient.java b/presto-mysql/src/main/java/io/prestosql/plugin/mysql/MySqlClient.java index ad9208e229ed..ac3f032b88d9 100644 --- a/presto-mysql/src/main/java/io/prestosql/plugin/mysql/MySqlClient.java +++ b/presto-mysql/src/main/java/io/prestosql/plugin/mysql/MySqlClient.java @@ -13,26 +13,41 @@ */ package io.prestosql.plugin.mysql; +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableSet; import com.mysql.jdbc.Driver; import com.mysql.jdbc.Statement; +import io.airlift.json.ObjectMapperProvider; +import io.airlift.slice.DynamicSliceOutput; +import io.airlift.slice.Slice; +import io.airlift.slice.SliceOutput; import io.prestosql.plugin.jdbc.BaseJdbcClient; import io.prestosql.plugin.jdbc.BaseJdbcConfig; +import io.prestosql.plugin.jdbc.ColumnMapping; import io.prestosql.plugin.jdbc.ConnectionFactory; import io.prestosql.plugin.jdbc.DriverConnectionFactory; import io.prestosql.plugin.jdbc.JdbcColumnHandle; import io.prestosql.plugin.jdbc.JdbcIdentity; import io.prestosql.plugin.jdbc.JdbcTableHandle; +import io.prestosql.plugin.jdbc.JdbcTypeHandle; import io.prestosql.plugin.jdbc.WriteMapping; import io.prestosql.spi.PrestoException; import io.prestosql.spi.connector.ConnectorSession; import io.prestosql.spi.connector.ConnectorTableMetadata; import io.prestosql.spi.connector.SchemaTableName; +import io.prestosql.spi.type.StandardTypes; import io.prestosql.spi.type.Type; +import io.prestosql.spi.type.TypeManager; +import io.prestosql.spi.type.TypeSignature; import io.prestosql.spi.type.VarcharType; import javax.inject.Inject; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStream; import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.PreparedStatement; @@ -43,10 +58,14 @@ import java.util.Properties; import java.util.function.BiFunction; +import static com.fasterxml.jackson.core.JsonFactory.Feature.CANONICALIZE_FIELD_NAMES; +import static com.fasterxml.jackson.databind.SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS; import static com.google.common.base.Verify.verify; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static com.mysql.jdbc.SQLError.SQL_STATE_ER_TABLE_EXISTS_ERROR; import static com.mysql.jdbc.SQLError.SQL_STATE_SYNTAX_ERROR; +import static io.airlift.slice.Slices.utf8Slice; +import static io.prestosql.plugin.jdbc.ColumnMapping.DISABLE_PUSHDOWN; import static io.prestosql.plugin.jdbc.DriverConnectionFactory.basicConnectionProperties; import static io.prestosql.plugin.jdbc.JdbcErrorCode.JDBC_ERROR; import static io.prestosql.plugin.jdbc.StandardColumnMappings.realWriteFunction; @@ -54,6 +73,7 @@ import static io.prestosql.plugin.jdbc.StandardColumnMappings.varbinaryWriteFunction; import static io.prestosql.plugin.jdbc.StandardColumnMappings.varcharWriteFunction; import static io.prestosql.spi.StandardErrorCode.ALREADY_EXISTS; +import static io.prestosql.spi.StandardErrorCode.INVALID_FUNCTION_ARGUMENT; import static io.prestosql.spi.StandardErrorCode.NOT_SUPPORTED; import static io.prestosql.spi.type.RealType.REAL; import static io.prestosql.spi.type.TimeWithTimeZoneType.TIME_WITH_TIME_ZONE; @@ -62,16 +82,20 @@ import static io.prestosql.spi.type.VarbinaryType.VARBINARY; import static io.prestosql.spi.type.Varchars.isVarcharType; import static java.lang.String.format; +import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Locale.ENGLISH; public class MySqlClient extends BaseJdbcClient { + private final Type jsonType; + @Inject - public MySqlClient(BaseJdbcConfig config, MySqlConfig mySqlConfig) + public MySqlClient(BaseJdbcConfig config, MySqlConfig mySqlConfig, TypeManager typeManager) throws SQLException { super(config, "`", connectionFactory(config, mySqlConfig)); + this.jsonType = typeManager.getType(new TypeSignature(StandardTypes.JSON)); } private static ConnectionFactory connectionFactory(BaseJdbcConfig config, MySqlConfig mySqlConfig) @@ -161,6 +185,18 @@ protected String getTableSchemaName(ResultSet resultSet) return resultSet.getString("TABLE_CAT"); } + @Override + public Optional toPrestoType(ConnectorSession session, Connection connection, JdbcTypeHandle typeHandle) + { + String jdbcTypeName = typeHandle.getJdbcTypeName() + .orElseThrow(() -> new PrestoException(JDBC_ERROR, "Type name is missing: " + typeHandle)); + + if (jdbcTypeName.equalsIgnoreCase("json")) { + return Optional.of(jsonColumnMapping()); + } + return super.toPrestoType(session, connection, typeHandle); + } + @Override public WriteMapping toWriteMapping(ConnectorSession session, Type type) { @@ -197,6 +233,9 @@ else if (varcharType.getBoundedLength() <= 16777215) { } return WriteMapping.sliceMapping(dataType, varcharWriteFunction()); } + if (type.getTypeSignature().getBase().equals(StandardTypes.JSON)) { + return WriteMapping.sliceMapping("json", varcharWriteFunction()); + } return super.toWriteMapping(session, type); } @@ -257,4 +296,42 @@ public boolean isLimitGuaranteed() { return true; } + + private ColumnMapping jsonColumnMapping() + { + return ColumnMapping.sliceMapping( + jsonType, + (resultSet, columnIndex) -> jsonParse(utf8Slice(resultSet.getString(columnIndex))), + varcharWriteFunction(), + DISABLE_PUSHDOWN); + } + + private static final JsonFactory JSON_FACTORY = new JsonFactory() + .disable(CANONICALIZE_FIELD_NAMES); + + private static final ObjectMapper SORTED_MAPPER = new ObjectMapperProvider().get().configure(ORDER_MAP_ENTRIES_BY_KEYS, true); + + private static Slice jsonParse(Slice slice) + { + try (JsonParser parser = createJsonParser(slice)) { + byte[] in = slice.getBytes(); + SliceOutput dynamicSliceOutput = new DynamicSliceOutput(in.length); + SORTED_MAPPER.writeValue((OutputStream) dynamicSliceOutput, SORTED_MAPPER.readValue(parser, Object.class)); + // nextToken() returns null if the input is parsed correctly, + // but will throw an exception if there are trailing characters. + parser.nextToken(); + return dynamicSliceOutput.slice(); + } + catch (Exception e) { + throw new PrestoException(INVALID_FUNCTION_ARGUMENT, format("Cannot convert '%s' to JSON", slice.toStringUtf8())); + } + } + + private static JsonParser createJsonParser(Slice json) + throws IOException + { + // Jackson tries to detect the character encoding automatically when using InputStream + // so we pass an InputStreamReader instead. + return JSON_FACTORY.createParser(new InputStreamReader(json.getInput(), UTF_8)); + } } diff --git a/presto-mysql/src/test/java/io/prestosql/plugin/mysql/TestMySqlTypeMapping.java b/presto-mysql/src/test/java/io/prestosql/plugin/mysql/TestMySqlTypeMapping.java index d14238ddfc7c..819a655d8f50 100644 --- a/presto-mysql/src/test/java/io/prestosql/plugin/mysql/TestMySqlTypeMapping.java +++ b/presto-mysql/src/test/java/io/prestosql/plugin/mysql/TestMySqlTypeMapping.java @@ -23,6 +23,7 @@ import io.prestosql.tests.datatype.CreateAndInsertDataSetup; import io.prestosql.tests.datatype.CreateAsSelectDataSetup; import io.prestosql.tests.datatype.DataSetup; +import io.prestosql.tests.datatype.DataType; import io.prestosql.tests.datatype.DataTypeTest; import io.prestosql.tests.sql.JdbcSqlExecutor; import io.prestosql.tests.sql.PrestoSqlExecutor; @@ -32,6 +33,7 @@ import java.math.BigDecimal; import java.time.LocalDate; import java.time.ZoneId; +import java.util.function.Function; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Strings.repeat; @@ -42,16 +44,20 @@ import static io.prestosql.spi.type.VarcharType.createVarcharType; import static io.prestosql.tests.datatype.DataType.bigintDataType; import static io.prestosql.tests.datatype.DataType.charDataType; +import static io.prestosql.tests.datatype.DataType.dataType; import static io.prestosql.tests.datatype.DataType.dateDataType; import static io.prestosql.tests.datatype.DataType.decimalDataType; import static io.prestosql.tests.datatype.DataType.doubleDataType; +import static io.prestosql.tests.datatype.DataType.formatStringLiteral; import static io.prestosql.tests.datatype.DataType.integerDataType; import static io.prestosql.tests.datatype.DataType.realDataType; import static io.prestosql.tests.datatype.DataType.smallintDataType; import static io.prestosql.tests.datatype.DataType.stringDataType; import static io.prestosql.tests.datatype.DataType.tinyintDataType; import static io.prestosql.tests.datatype.DataType.varcharDataType; +import static io.prestosql.type.JsonType.JSON; import static java.lang.String.format; +import static java.util.function.Function.identity; @Test public class TestMySqlTypeMapping @@ -262,6 +268,30 @@ public void testTimestamp() // testing this is hard because of https://github.com/prestodb/presto/issues/7122 } + @Test + public void testJson() + { + jsonTestCases(jsonDataType(value -> "JSON " + formatStringLiteral(value))) + .execute(getQueryRunner(), prestoCreateAsSelect("presto_test_json")); + jsonTestCases(jsonDataType(value -> format("CAST(%s AS JSON)", formatStringLiteral(value)))) + .execute(getQueryRunner(), mysqlCreateAndInsert("tpch.mysql_test_json")); + } + + private DataTypeTest jsonTestCases(DataType jsonDataType) + { + return DataTypeTest.create() + .addRoundTrip(jsonDataType, "{}") + .addRoundTrip(jsonDataType, null) + .addRoundTrip(jsonDataType, "null") + .addRoundTrip(jsonDataType, "123.4") + .addRoundTrip(jsonDataType, "\"abc\"") + .addRoundTrip(jsonDataType, "\"text with ' apostrophes\"") + .addRoundTrip(jsonDataType, "\"\"") + .addRoundTrip(jsonDataType, "{\"a\":1,\"b\":2}") + .addRoundTrip(jsonDataType, "{\"a\":[1,2,3],\"b\":{\"aa\":11,\"bb\":[{\"a\":1,\"b\":2},{\"a\":0}]}}") + .addRoundTrip(jsonDataType, "[]"); + } + private void testUnsupportedDataType(String databaseDataType) { JdbcSqlExecutor jdbcSqlExecutor = new JdbcSqlExecutor(mysqlServer.getJdbcUrl()); @@ -286,4 +316,13 @@ private DataSetup mysqlCreateAndInsert(String tableNamePrefix) JdbcSqlExecutor mysqlUnicodeExecutor = new JdbcSqlExecutor(mysqlServer.getJdbcUrl() + "&useUnicode=true&characterEncoding=utf8"); return new CreateAndInsertDataSetup(mysqlUnicodeExecutor, tableNamePrefix); } + + private static DataType jsonDataType(Function toLiteral) + { + return dataType( + "json", + JSON, + toLiteral, + identity()); + } }