diff --git a/presto-docs/src/main/sphinx/connector/mongodb.rst b/presto-docs/src/main/sphinx/connector/mongodb.rst index a78d37b24960a..f40e7b0c50554 100644 --- a/presto-docs/src/main/sphinx/connector/mongodb.rst +++ b/presto-docs/src/main/sphinx/connector/mongodb.rst @@ -306,6 +306,34 @@ Field Required Type Description There is no limit on field descriptions for either key or message. +JSON Type Handling +------------------ + +The connector supports writing ``json`` columns by converting their contents to BSON +using ``Document.parse(...)``. + +For example: + +.. code-block:: sql + + CREATE TABLE orders ( + orderkey bigint, + orderstatus varchar, + totalprice double, + orderdate date, + metadata json + ); + + INSERT INTO orders VALUES ( + 3, + 'processing', + 150.0, + current_date, + JSON '{"created_by": "admin", "priority": "high"}' + ); + +The JSON string must be a well-formed JSON object. If it is not (for example, a top-level scalar or array), the insert will fail with a conversion error. + ObjectId -------- diff --git a/presto-mongodb/pom.xml b/presto-mongodb/pom.xml index 3178c4c79d9fa..8fe49dc8de866 100644 --- a/presto-mongodb/pom.xml +++ b/presto-mongodb/pom.xml @@ -18,6 +18,12 @@ + + + com.facebook.presto + presto-plugin-toolkit + + org.mongodb mongo-java-driver diff --git a/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoPageSink.java b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoPageSink.java index c5390110d3b85..0999c66bb3f85 100644 --- a/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoPageSink.java +++ b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoPageSink.java @@ -40,6 +40,7 @@ import com.mongodb.client.MongoCollection; import com.mongodb.client.model.InsertManyOptions; import io.airlift.slice.Slice; +import org.bson.BsonInvalidOperationException; import org.bson.Document; import org.bson.types.Binary; import org.bson.types.ObjectId; @@ -60,12 +61,14 @@ import static com.facebook.presto.common.type.Varchars.isVarcharType; import static 
com.facebook.presto.mongodb.ObjectIdType.OBJECT_ID; import static com.facebook.presto.mongodb.TypeUtils.isArrayType; +import static com.facebook.presto.mongodb.TypeUtils.isJsonType; import static com.facebook.presto.mongodb.TypeUtils.isMapType; import static com.facebook.presto.mongodb.TypeUtils.isRowType; import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; import static java.util.Collections.unmodifiableList; import static java.util.Collections.unmodifiableMap; import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.bson.Document.parse; public class MongoPageSink implements ConnectorPageSink @@ -175,6 +178,15 @@ private Object getObjectValue(Type type, Block block, int position) } return new BigDecimal(unscaledValue); } + if (isJsonType(type)) { + String json = type.getSlice(block, position).toStringUtf8(); + try { + return parse(json); + } + catch (BsonInvalidOperationException e) { + throw new PrestoException(NOT_SUPPORTED, "Can't convert json to MongoDB Document: " + json, e); + } + } if (isArrayType(type)) { Type elementType = type.getTypeParameters().get(0); diff --git a/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoPageSource.java b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoPageSource.java index 193ed71b948f0..d1f48a3427d1d 100644 --- a/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoPageSource.java +++ b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoPageSource.java @@ -46,8 +46,10 @@ import static com.facebook.presto.common.type.VarbinaryType.VARBINARY; import static com.facebook.presto.mongodb.ObjectIdType.OBJECT_ID; import static com.facebook.presto.mongodb.TypeUtils.isArrayType; +import static com.facebook.presto.mongodb.TypeUtils.isJsonType; import static com.facebook.presto.mongodb.TypeUtils.isMapType; import static com.facebook.presto.mongodb.TypeUtils.isRowType; +import static com.facebook.presto.plugin.base.JsonTypeUtil.jsonParse; 
import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Verify.verify; @@ -222,6 +224,9 @@ else if (type.equals(VARBINARY)) { output.appendNull(); } } + else if (isJsonType(type)) { + type.writeSlice(output, jsonParse(utf8Slice(toVarcharValue(value)))); + } else { throw new PrestoException(GENERIC_INTERNAL_ERROR, "Unhandled type for Slice: " + type.getTypeSignature()); } diff --git a/presto-mongodb/src/main/java/com/facebook/presto/mongodb/TypeUtils.java b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/TypeUtils.java index 08b6bdb95ee90..b362ae7db8372 100644 --- a/presto-mongodb/src/main/java/com/facebook/presto/mongodb/TypeUtils.java +++ b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/TypeUtils.java @@ -19,6 +19,7 @@ import java.util.function.Predicate; import static com.facebook.presto.common.type.DateType.DATE; +import static com.facebook.presto.common.type.StandardTypes.JSON; import static com.facebook.presto.common.type.TimeType.TIME; import static com.facebook.presto.common.type.TimestampType.TIMESTAMP; import static com.facebook.presto.common.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE; @@ -31,6 +32,14 @@ public static boolean isArrayType(Type type) { return type.getTypeSignature().getBase().equals(StandardTypes.ARRAY); } + /** + * Tells whether the base name of the given type's signature equals the standard JSON type name. + */ + public static boolean isJsonType(Type type) + { + String baseName = type.getTypeSignature().getBase(); + return baseName.equals(JSON); + } public static boolean isMapType(Type type) { diff --git a/presto-mongodb/src/test/java/com/facebook/presto/mongodb/TestMongoIntegrationSmokeTest.java b/presto-mongodb/src/test/java/com/facebook/presto/mongodb/TestMongoIntegrationSmokeTest.java index db50a78366ee5..a5e820d4a1dfb 100644 --- a/presto-mongodb/src/test/java/com/facebook/presto/mongodb/TestMongoIntegrationSmokeTest.java +++ b/presto-mongodb/src/test/java/com/facebook/presto/mongodb/TestMongoIntegrationSmokeTest.java 
@@ -78,7 +78,8 @@ public void createTableWithEveryType() ", DATE '1980-05-07' _date" + ", TIMESTAMP '1980-05-07 11:22:33.456' _timestamp" + ", ObjectId('ffffffffffffffffffffffff') _objectid" + - ", cast(ObjectId('ffffffffffffffffffffffff') as varchar) _objectid_string"; + ", cast(ObjectId('ffffffffffffffffffffffff') as varchar) _objectid_string" + + ", JSON '{\"name\":\"alice\"}' _json"; assertUpdate(query, 1); @@ -93,6 +94,7 @@ public void createTableWithEveryType() assertEquals(row.getField(5), LocalDate.of(1980, 5, 7)); assertEquals(row.getField(6), LocalDateTime.of(1980, 5, 7, 11, 22, 33, 456_000_000)); assertEquals(row.getField(8), "ffffffffffffffffffffffff"); + assertEquals(row.getField(9), "{\"name\":\"alice\"}"); assertUpdate("DROP TABLE test_types_table"); assertFalse(getQueryRunner().tableExists(getSession(), "test_types_table")); @@ -114,6 +116,7 @@ public void testInsertWithEveryType() ", ts timestamp" + ", tm time" + ", objid objectid" + + ", _json json" + ")"; getQueryRunner().execute(getSession(), createSql); @@ -128,7 +131,8 @@ public void testInsertWithEveryType() ", DATE '1980-05-07' _date" + ", TIMESTAMP '1980-05-07 11:22:33.456' _timestamp" + ", TIME '11:22:33.456' _time" + - ", ObjectId('ffffffffffffffffffffffff') _objectid"; + ", ObjectId('ffffffffffffffffffffffff') _objectid" + + ", JSON '{\"name\":\"alice\"}' _json"; getQueryRunner().execute(getSession(), insertSql); MaterializedResult results = getQueryRunner().execute(getSession(), "SELECT * FROM test_insert_types_table").toTestTypes(); @@ -142,6 +146,7 @@ public void testInsertWithEveryType() assertEquals(row.getField(5), LocalDate.of(1980, 5, 7)); assertEquals(row.getField(6), LocalDateTime.of(1980, 5, 7, 11, 22, 33, 456_000_000)); assertEquals(row.getField(7), LocalTime.of(11, 22, 33, 456_000_000)); + assertEquals(row.getField(8), "{\"name\":\"alice\"}"); assertUpdate("DROP TABLE test_insert_types_table"); assertFalse(getQueryRunner().tableExists(getSession(), 
"test_insert_types_table")); } @@ -298,4 +303,33 @@ public void testAlterTable() assertQuery("SELECT email_id from test_alter.tmp_alter_table WHERE email_id IS NOT NULL", "SELECT 'example@example.com'"); assertUpdate("ALTER TABLE test_alter.tmp_alter_table DROP COLUMN email_id"); } + + /** + * Checks JSON column support: object values and NULL survive an insert followed by a select, while top-level scalar and array values are rejected on write. + */ + @Test + public void testJson() + { + String conversionFailure = "Can't convert json to MongoDB Document.*"; + + assertUpdate("CREATE TABLE test_json (id INT, col JSON)"); + + // A JSON object can be inserted and one of its scalar fields read back. + assertUpdate("INSERT INTO test_json VALUES (1, JSON '{\"name\":\"alice\"}')", 1); + assertQuery("SELECT json_extract_scalar(col, '$.name') FROM test_json WHERE id = 1", "SELECT 'alice'"); + + // An array nested inside an object keeps its elements. + assertUpdate("INSERT INTO test_json VALUES (2, JSON '{\"numbers\":[1, 2, 3]}')", 1); + assertQuery("SELECT json_extract(col, '$.numbers[0]') FROM test_json WHERE id = 2", "SELECT 1"); + + // A SQL NULL in the JSON column is read back as NULL. + assertUpdate("INSERT INTO test_json VALUES (3, NULL)", 1); + assertQuery("SELECT col FROM test_json WHERE id = 3", "SELECT NULL"); + + // Top-level scalars and arrays fail with the conversion error. + assertQueryFails("CREATE TABLE test_json_scalar AS SELECT JSON '1' AS col", conversionFailure); + assertQueryFails("CREATE TABLE test_json_array AS SELECT JSON '[\"a\", \"b\", \"c\"]' AS col", conversionFailure); + + assertUpdate("DROP TABLE test_json"); + } } diff --git a/presto-plugin-toolkit/pom.xml b/presto-plugin-toolkit/pom.xml index 123e86c48644c..8ff87a9b72a5a 100644 --- a/presto-plugin-toolkit/pom.xml +++ b/presto-plugin-toolkit/pom.xml @@ -52,6 +52,16 @@ jackson-databind + + com.fasterxml.jackson.core + jackson-core + + + + io.airlift + slice + + com.facebook.airlift log diff --git a/presto-plugin-toolkit/src/main/java/com/facebook/presto/plugin/base/JsonTypeUtil.java b/presto-plugin-toolkit/src/main/java/com/facebook/presto/plugin/base/JsonTypeUtil.java new file mode 100644 index 0000000000000..6192fe775301a --- /dev/null +++ b/presto-plugin-toolkit/src/main/java/com/facebook/presto/plugin/base/JsonTypeUtil.java @@ -0,0 +1,67 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you 
may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.plugin.base;
+
+import com.facebook.airlift.json.JsonObjectMapperProvider;
+import com.facebook.presto.spi.PrestoException;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonFactoryBuilder;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.airlift.slice.DynamicSliceOutput;
+import io.airlift.slice.Slice;
+import io.airlift.slice.SliceOutput;
+
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+
+import static com.facebook.presto.spi.StandardErrorCode.INVALID_FUNCTION_ARGUMENT;
+import static com.fasterxml.jackson.core.JsonFactory.Feature.CANONICALIZE_FIELD_NAMES;
+import static com.fasterxml.jackson.databind.SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS;
+import static java.lang.String.format;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+/**
+ * Static helpers for parsing JSON text held in a {@link Slice} and re-serializing it.
+ */
+public final class JsonTypeUtil
+{
+    private static final JsonFactory JSON_FACTORY = new JsonFactoryBuilder().disable(CANONICALIZE_FIELD_NAMES).build();
+    private static final ObjectMapper SORTED_MAPPER = new JsonObjectMapperProvider().get().configure(ORDER_MAP_ENTRIES_BY_KEYS, true);
+
+    private JsonTypeUtil() {}
+
+    /**
+     * Parses the given UTF-8 JSON text and re-serializes it with a mapper that has
+     * {@code ORDER_MAP_ENTRIES_BY_KEYS} enabled.
+     *
+     * @param slice UTF-8 encoded JSON text holding exactly one JSON value
+     * @return the re-serialized JSON
+     * @throws PrestoException with {@code INVALID_FUNCTION_ARGUMENT} when the text cannot be parsed
+     *         or when anything other than whitespace follows the first JSON value
+     */
+    public static Slice jsonParse(Slice slice)
+    {
+        // `cast(json_parse(x) AS t)` will be optimized into `$internal$json_string_to_array/map/row_cast` in ExpressionOptimizer
+        // If you make changes to this function (e.g. parse the JSON string into some internal representation),
+        // make sure `$internal$json_string_to_array/map/row_cast` is changed accordingly.
+        try (JsonParser parser = createJsonParser(JSON_FACTORY, slice)) {
+            SliceOutput dynamicSliceOutput = new DynamicSliceOutput(slice.length());
+            SORTED_MAPPER.writeValue((OutputStream) dynamicSliceOutput, SORTED_MAPPER.readValue(parser, Object.class));
+            // The input must be exhausted once the value has been read. nextToken() has three outcomes:
+            // - null when the end of the input was reached
+            // - a token when more well-formed JSON follows (e.g. '{} 1'), which must be rejected explicitly
+            // - an exception when the trailing characters do not form a valid JSON token
+            if (parser.nextToken() != null) {
+                // Thrown inside the try block so the catch below wraps it like every other parse failure.
+                throw new IllegalArgumentException("Found characters after the expected end of input");
+            }
+            return dynamicSliceOutput.slice();
+        }
+        catch (IOException | RuntimeException e) {
+            throw new PrestoException(INVALID_FUNCTION_ARGUMENT, format("Cannot convert value to JSON: '%s'", slice.toStringUtf8()), e);
+        }
+    }
+
+    /**
+     * Creates a parser that reads the slice's bytes as UTF-8 characters.
+     *
+     * @param factory factory used to create the parser
+     * @param json UTF-8 encoded JSON text
+     * @return a parser positioned before the first token
+     * @throws IOException if the factory fails to create the parser
+     */
+    private static JsonParser createJsonParser(JsonFactory factory, Slice json)
+            throws IOException
+    {
+        // Jackson tries to detect the character encoding automatically when
+        // using InputStream, so we pass an InputStreamReader instead.
+        return factory.createParser(new InputStreamReader(json.getInput(), UTF_8));
+    }
+}