diff --git a/presto-delta/pom.xml b/presto-delta/pom.xml index 71ae91587b262..9f6aa8dc7e784 100644 --- a/presto-delta/pom.xml +++ b/presto-delta/pom.xml @@ -343,6 +343,11 @@ jakarta.servlet-api test + + + com.facebook.airlift.drift + drift-codec + @@ -354,6 +359,7 @@ org.scala-lang:scala-library:jar commons-io:commons-io:jar + com.facebook.airlift.drift:drift-codec:jar diff --git a/presto-delta/src/test/java/com/facebook/presto/delta/TestDeltaTableHandle.java b/presto-delta/src/test/java/com/facebook/presto/delta/TestDeltaTableHandle.java index ae6ae1b4bec21..0a0d387331950 100644 --- a/presto-delta/src/test/java/com/facebook/presto/delta/TestDeltaTableHandle.java +++ b/presto-delta/src/test/java/com/facebook/presto/delta/TestDeltaTableHandle.java @@ -16,6 +16,7 @@ import com.facebook.airlift.bootstrap.Bootstrap; import com.facebook.airlift.json.JsonCodec; import com.facebook.airlift.json.JsonModule; +import com.facebook.drift.codec.guice.ThriftCodecModule; import com.facebook.presto.block.BlockJsonSerde; import com.facebook.presto.common.block.Block; import com.facebook.presto.common.block.BlockEncoding; @@ -24,6 +25,7 @@ import com.facebook.presto.common.type.StandardTypes; import com.facebook.presto.common.type.Type; import com.facebook.presto.common.type.TypeManager; +import com.facebook.presto.connector.ConnectorManager; import com.facebook.presto.metadata.FunctionAndTypeManager; import com.facebook.presto.metadata.HandleJsonModule; import com.facebook.presto.metadata.HandleResolver; @@ -92,6 +94,8 @@ private JsonCodec getJsonCodec() Module module = binder -> { binder.install(new JsonModule()); binder.install(new HandleJsonModule()); + binder.bind(ConnectorManager.class).toProvider(() -> null).in(Scopes.SINGLETON); + binder.install(new ThriftCodecModule()); configBinder(binder).bindConfig(FeaturesConfig.class); FunctionAndTypeManager functionAndTypeManager = createTestFunctionAndTypeManager(); binder.bind(TypeManager.class).toInstance(functionAndTypeManager); diff --git a/presto-docs/src/main/sphinx/admin/properties.rst b/presto-docs/src/main/sphinx/admin/properties.rst index b1aea2b875fb1..0cff2b013b675 100644 --- a/presto-docs/src/main/sphinx/admin/properties.rst +++ b/presto-docs/src/main/sphinx/admin/properties.rst @@ -571,6 +571,24 @@ shared across all of the partitioned consumers. Increasing this value may improve network throughput for data transferred between stages if the network has high latency or if there are many nodes in the cluster. +``use-connector-provided-serialization-codecs`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +* **Type:** ``boolean`` +* **Default value:** ``false`` + +Enables the use of custom connector-provided serialization codecs for handles. +This feature allows connectors to use their own serialization format for +handle objects (such as table handles, column handles, and splits) instead +of standard JSON serialization. + +When enabled, connectors that provide a ``ConnectorCodecProvider`` with +appropriate codecs will have their handles serialized using custom binary +formats, which are then Base64-encoded for transport. Connectors without +codec support automatically fall back to standard JSON serialization. +Internal Presto handles (prefixed with ``$``) always use JSON serialization +regardless of this setting. + .. _task-properties: Task Properties diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveSplit.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveSplit.java index b5d7f6b3c8f43..395ef65a24096 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveSplit.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveSplit.java @@ -16,6 +16,7 @@ import com.facebook.airlift.bootstrap.Bootstrap; import com.facebook.airlift.json.JsonCodec; import com.facebook.airlift.json.JsonModule; +import com.facebook.drift.codec.guice.ThriftCodecModule; import com.facebook.presto.block.BlockJsonSerde; import com.facebook.presto.common.block.Block; import com.facebook.presto.common.block.BlockEncoding; @@ -23,6 +24,7 @@ import com.facebook.presto.common.block.BlockEncodingSerde; import com.facebook.presto.common.type.Type; import com.facebook.presto.common.type.TypeManager; +import com.facebook.presto.connector.ConnectorManager; import com.facebook.presto.hive.metastore.Column; import com.facebook.presto.hive.metastore.Storage; import com.facebook.presto.hive.metastore.StorageFormat; @@ -153,8 +155,10 @@ private JsonCodec getJsonCodec() { Module module = binder -> { binder.install(new JsonModule()); + binder.install(new ThriftCodecModule()); binder.install(new HandleJsonModule()); configBinder(binder).bindConfig(FeaturesConfig.class); + binder.bind(ConnectorManager.class).toProvider(() -> null); FunctionAndTypeManager functionAndTypeManager = createTestFunctionAndTypeManager(); binder.bind(TypeManager.class).toInstance(functionAndTypeManager); jsonBinder(binder).addDeserializerBinding(Type.class).to(TypeDeserializer.class); diff --git a/presto-main-base/src/main/java/com/facebook/presto/connector/ConnectorManager.java b/presto-main-base/src/main/java/com/facebook/presto/connector/ConnectorManager.java index dc85eea82f0e5..de35836a2215a 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/connector/ConnectorManager.java +++ b/presto-main-base/src/main/java/com/facebook/presto/connector/ConnectorManager.java @@ -393,6 +393,16 @@ private Connector createConnector(ConnectorId connectorId, ConnectorFactory fact } } + public Optional getConnectorCodecProvider(ConnectorId connectorId) + { + requireNonNull(connectorId, "connectorId is null"); + MaterializedConnector materializedConnector = connectors.get(connectorId); + if (materializedConnector == null) { + return Optional.empty(); + } + return materializedConnector.getConnectorCodecProvider(); + } + private static class MaterializedConnector { private final ConnectorId connectorId; diff --git a/presto-main-base/src/main/java/com/facebook/presto/index/IndexHandleJacksonModule.java b/presto-main-base/src/main/java/com/facebook/presto/index/IndexHandleJacksonModule.java index 6536d9d63bf23..499f3f58088af 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/index/IndexHandleJacksonModule.java +++ b/presto-main-base/src/main/java/com/facebook/presto/index/IndexHandleJacksonModule.java @@ -13,19 +13,47 @@ */ package com.facebook.presto.index; +import com.facebook.presto.connector.ConnectorManager; import com.facebook.presto.metadata.AbstractTypedJacksonModule; import com.facebook.presto.metadata.HandleResolver; +import com.facebook.presto.spi.ConnectorCodec; +import com.facebook.presto.spi.ConnectorId; import com.facebook.presto.spi.ConnectorIndexHandle; +import com.facebook.presto.spi.connector.ConnectorCodecProvider; +import com.facebook.presto.sql.analyzer.FeaturesConfig; import jakarta.inject.Inject; +import jakarta.inject.Provider; + +import java.util.Optional; +import java.util.function.Function; public class IndexHandleJacksonModule extends AbstractTypedJacksonModule { @Inject - public IndexHandleJacksonModule(HandleResolver handleResolver) + public IndexHandleJacksonModule( + HandleResolver handleResolver, + Provider connectorManagerProvider, + FeaturesConfig featuresConfig) + { + super(ConnectorIndexHandle.class, + handleResolver::getId, + handleResolver::getIndexHandleClass, + featuresConfig.isUseConnectorProvidedSerializationCodecs(), + connectorId -> connectorManagerProvider.get() + .getConnectorCodecProvider(connectorId) + .flatMap(ConnectorCodecProvider::getConnectorIndexHandleCodec)); + } + + public IndexHandleJacksonModule( + HandleResolver handleResolver, + FeaturesConfig featuresConfig, + Function>> codecExtractor) { super(ConnectorIndexHandle.class, handleResolver::getId, - handleResolver::getIndexHandleClass); + handleResolver::getIndexHandleClass, + featuresConfig.isUseConnectorProvidedSerializationCodecs(), + codecExtractor); } } diff --git a/presto-main-base/src/main/java/com/facebook/presto/metadata/AbstractTypedJacksonModule.java b/presto-main-base/src/main/java/com/facebook/presto/metadata/AbstractTypedJacksonModule.java index 489bb076d764c..3176c95664419 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/metadata/AbstractTypedJacksonModule.java +++ b/presto-main-base/src/main/java/com/facebook/presto/metadata/AbstractTypedJacksonModule.java @@ -13,6 +13,8 @@ */ package com.facebook.presto.metadata; +import com.facebook.presto.spi.ConnectorCodec; +import com.facebook.presto.spi.ConnectorId; import com.fasterxml.jackson.annotation.JsonTypeInfo.Id; import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.core.JsonParser; @@ -38,6 +40,7 @@ import com.google.common.cache.CacheBuilder; import java.io.IOException; +import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.function.Function; @@ -49,18 +52,38 @@ public abstract class AbstractTypedJacksonModule extends SimpleModule { private static final String TYPE_PROPERTY = "@type"; + private static final String DATA_PROPERTY = "customSerializedValue"; protected AbstractTypedJacksonModule( Class baseClass, Function nameResolver, - Function> classResolver) + Function> classResolver, + boolean binarySerializationEnabled, + Function>> codecExtractor) { super(baseClass.getSimpleName() + "Module", Version.unknownVersion()); - TypeIdResolver typeResolver = new InternalTypeResolver<>(nameResolver, classResolver); + requireNonNull(baseClass, "baseClass is null"); + requireNonNull(nameResolver, "nameResolver is null"); + requireNonNull(classResolver, "classResolver is null"); + requireNonNull(codecExtractor, "codecExtractor is null"); - addSerializer(baseClass, new InternalTypeSerializer<>(baseClass, typeResolver)); - addDeserializer(baseClass, new InternalTypeDeserializer<>(baseClass, typeResolver)); + if (binarySerializationEnabled) { + // Use codec serialization + addSerializer(baseClass, new CodecSerializer<>( + TYPE_PROPERTY, + DATA_PROPERTY, + codecExtractor, + nameResolver, + new InternalTypeResolver<>(nameResolver, classResolver))); + addDeserializer(baseClass, new CodecDeserializer<>(TYPE_PROPERTY, DATA_PROPERTY, codecExtractor, classResolver)); + } + else { + // Use legacy typed serialization + TypeIdResolver typeResolver = new InternalTypeResolver<>(nameResolver, classResolver); + addSerializer(baseClass, new InternalTypeSerializer<>(baseClass, typeResolver)); + addDeserializer(baseClass, new InternalTypeDeserializer<>(baseClass, typeResolver)); + } } private static class InternalTypeDeserializer diff --git a/presto-main-base/src/main/java/com/facebook/presto/metadata/CodecDeserializer.java b/presto-main-base/src/main/java/com/facebook/presto/metadata/CodecDeserializer.java new file mode 100644 index 0000000000000..1e9d5a8785316 --- /dev/null +++ b/presto-main-base/src/main/java/com/facebook/presto/metadata/CodecDeserializer.java @@ -0,0 +1,121 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.metadata; + +import com.facebook.presto.spi.ConnectorCodec; +import com.facebook.presto.spi.ConnectorId; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.core.TreeNode; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.jsontype.TypeDeserializer; +import com.fasterxml.jackson.databind.node.ObjectNode; + +import java.io.IOException; +import java.util.Base64; +import java.util.Optional; +import java.util.function.Function; + +import static java.util.Objects.requireNonNull; + +class CodecDeserializer + extends JsonDeserializer +{ + private final Function> classResolver; + private final Function>> codecExtractor; + private final String typePropertyName; + private final String dataPropertyName; + + public CodecDeserializer( + String typePropertyName, + String dataPropertyName, + Function>> codecExtractor, + Function> classResolver) + { + this.classResolver = requireNonNull(classResolver, "classResolver is null"); + this.codecExtractor = requireNonNull(codecExtractor, "codecExtractor is null"); + this.typePropertyName = requireNonNull(typePropertyName, "typePropertyName is null"); + this.dataPropertyName = requireNonNull(dataPropertyName, "dataPropertyName is null"); + } + + @Override + public T deserialize(JsonParser parser, DeserializationContext context) + throws IOException + { + if (parser.getCurrentToken() == JsonToken.VALUE_NULL) { + return null; + } + + if (parser.getCurrentToken() != JsonToken.START_OBJECT) { + throw new IOException("Expected START_OBJECT, got " + parser.getCurrentToken()); + } + + // Parse the JSON tree + TreeNode tree = parser.readValueAsTree(); + + if (tree instanceof ObjectNode) { + ObjectNode node = (ObjectNode) tree; + + // Get the @type field + if (!node.has(typePropertyName)) { + throw new IOException("Missing " + typePropertyName + " field"); + } + String connectorIdString = node.get(typePropertyName).asText(); + // Check if @data field is present (binary serialization) + if (node.has(dataPropertyName)) { + // Binary data is present, we need a codec to deserialize it + // Special handling for internal handles like "$remote" + if (!connectorIdString.startsWith("$")) { + ConnectorId connectorId = new ConnectorId(connectorIdString); + Optional> codec = codecExtractor.apply(connectorId); + if (codec.isPresent()) { + String base64Data = node.get(dataPropertyName).asText(); + byte[] data = Base64.getDecoder().decode(base64Data); + return codec.get().deserialize(data); + } + } + // @data field present but no codec available or internal handle + throw new IOException("Type " + connectorIdString + " has binary data (" + dataPropertyName + " field) but no codec available to deserialize it"); + } + + // No @data field - use standard JSON deserialization + Class handleClass = classResolver.apply(connectorIdString); + + // Remove the @type field and deserialize the remaining content + node.remove(typePropertyName); + return context.readTreeAsValue(node, handleClass); + } + + throw new IOException("Unable to deserialize"); + } + + @Override + public T deserializeWithType(JsonParser p, DeserializationContext ctxt, + TypeDeserializer typeDeserializer) + throws IOException + { + // We handle the type ourselves + return deserialize(p, ctxt); + } + + @Override + public T deserializeWithType(JsonParser p, DeserializationContext ctxt, + TypeDeserializer typeDeserializer, T intoValue) + throws IOException + { + // We handle the type ourselves + return deserialize(p, ctxt); + } +} diff --git a/presto-main-base/src/main/java/com/facebook/presto/metadata/CodecSerializer.java b/presto-main-base/src/main/java/com/facebook/presto/metadata/CodecSerializer.java new file mode 100644 index 0000000000000..9948e95a9c323 --- /dev/null +++ b/presto-main-base/src/main/java/com/facebook/presto/metadata/CodecSerializer.java @@ -0,0 +1,123 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.metadata; + +import com.facebook.presto.spi.ConnectorCodec; +import com.facebook.presto.spi.ConnectorId; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.jsontype.TypeIdResolver; +import com.fasterxml.jackson.databind.jsontype.TypeSerializer; +import com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeSerializer; +import com.fasterxml.jackson.databind.ser.BeanSerializerFactory; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; + +import java.io.IOException; +import java.util.Base64; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.function.Function; + +import static com.google.common.base.Throwables.throwIfInstanceOf; +import static java.util.Objects.requireNonNull; + +class CodecSerializer + extends JsonSerializer +{ + private final Function nameResolver; + private final Function>> codecExtractor; + private final TypeIdResolver typeResolver; + private final TypeSerializer typeSerializer; + private final Cache, JsonSerializer> serializerCache = CacheBuilder.newBuilder().build(); + private final String typePropertyName; + private final String dataPropertyName; + + public CodecSerializer( + String typePropertyName, + String dataPropertyName, + Function>> codecExtractor, + Function nameResolver, + TypeIdResolver typeIdResolver) + { + this.typePropertyName = requireNonNull(typePropertyName, "typePropertyName is null"); + this.dataPropertyName = requireNonNull(dataPropertyName, "dataPropertyName is null"); + this.nameResolver = requireNonNull(nameResolver, "nameResolver is null"); + this.codecExtractor = requireNonNull(codecExtractor, "codecExtractor is null"); + this.typeResolver = requireNonNull(typeIdResolver, "typeIdResolver is null"); + this.typeSerializer = new AsPropertyTypeSerializer(typeResolver, null, typePropertyName); + } + + @Override + public void serialize(T value, JsonGenerator jsonGenerator, SerializerProvider provider) + throws IOException + { + if (value == null) { + jsonGenerator.writeNull(); + return; + } + + String connectorIdString = nameResolver.apply(value); + + // Only try binary serialization for actual connectors (not internal handles like "$remote") + if (!connectorIdString.startsWith("$")) { + ConnectorId connectorId = new ConnectorId(connectorIdString); + + // Check if connector has a binary codec + Optional> codec = codecExtractor.apply(connectorId); + if (codec.isPresent()) { + // Use binary serialization with flat structure + jsonGenerator.writeStartObject(); + jsonGenerator.writeStringField(typePropertyName, connectorIdString); + byte[] data = codec.get().serialize(value); + jsonGenerator.writeStringField(dataPropertyName, Base64.getEncoder().encodeToString(data)); + jsonGenerator.writeEndObject(); + return; + } + } + + // Fall back to legacy typed JSON serialization + try { + Class type = value.getClass(); + JsonSerializer serializer = serializerCache.get(type, () -> createSerializer(provider, type)); + serializer.serializeWithType(value, jsonGenerator, provider, typeSerializer); + } + catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (cause != null) { + throwIfInstanceOf(cause, IOException.class); + } + throw new RuntimeException(e); + } + } + + @SuppressWarnings("unchecked") + private static JsonSerializer createSerializer(SerializerProvider provider, Class type) + throws JsonMappingException + { + JavaType javaType = provider.constructType(type); + return BeanSerializerFactory.instance.createSerializer(provider, javaType); + } + + @Override + public void serializeWithType(T value, JsonGenerator gen, + SerializerProvider serializers, TypeSerializer typeSer) + throws IOException + { + serialize(value, gen, serializers); + } +} diff --git a/presto-main-base/src/main/java/com/facebook/presto/metadata/ColumnHandleJacksonModule.java b/presto-main-base/src/main/java/com/facebook/presto/metadata/ColumnHandleJacksonModule.java index 84db3f3344c81..26de084ff7bac 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/metadata/ColumnHandleJacksonModule.java +++ b/presto-main-base/src/main/java/com/facebook/presto/metadata/ColumnHandleJacksonModule.java @@ -13,17 +13,45 @@ */ package com.facebook.presto.metadata; +import com.facebook.presto.connector.ConnectorManager; import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ConnectorCodec; +import com.facebook.presto.spi.ConnectorId; +import com.facebook.presto.spi.connector.ConnectorCodecProvider; +import com.facebook.presto.sql.analyzer.FeaturesConfig; import jakarta.inject.Inject; +import jakarta.inject.Provider; + +import java.util.Optional; +import java.util.function.Function; public class ColumnHandleJacksonModule extends AbstractTypedJacksonModule { @Inject - public ColumnHandleJacksonModule(HandleResolver handleResolver) + public ColumnHandleJacksonModule( + HandleResolver handleResolver, + Provider connectorManagerProvider, + FeaturesConfig featuresConfig) + { + super(ColumnHandle.class, + handleResolver::getId, + handleResolver::getColumnHandleClass, + featuresConfig.isUseConnectorProvidedSerializationCodecs(), + connectorId -> connectorManagerProvider.get() + .getConnectorCodecProvider(connectorId) + .flatMap(ConnectorCodecProvider::getColumnHandleCodec)); + } + + public ColumnHandleJacksonModule( + HandleResolver handleResolver, + FeaturesConfig featuresConfig, + Function>> codecExtractor) { super(ColumnHandle.class, handleResolver::getId, - handleResolver::getColumnHandleClass); + handleResolver::getColumnHandleClass, + featuresConfig.isUseConnectorProvidedSerializationCodecs(), + codecExtractor); } } diff --git a/presto-main-base/src/main/java/com/facebook/presto/metadata/DeleteTableHandleJacksonModule.java b/presto-main-base/src/main/java/com/facebook/presto/metadata/DeleteTableHandleJacksonModule.java index 09b71787994a5..5545f91d40037 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/metadata/DeleteTableHandleJacksonModule.java +++ b/presto-main-base/src/main/java/com/facebook/presto/metadata/DeleteTableHandleJacksonModule.java @@ -13,17 +13,45 @@ */ package com.facebook.presto.metadata; +import com.facebook.presto.connector.ConnectorManager; +import com.facebook.presto.spi.ConnectorCodec; import com.facebook.presto.spi.ConnectorDeleteTableHandle; +import com.facebook.presto.spi.ConnectorId; +import com.facebook.presto.spi.connector.ConnectorCodecProvider; +import com.facebook.presto.sql.analyzer.FeaturesConfig; import jakarta.inject.Inject; +import jakarta.inject.Provider; + +import java.util.Optional; +import java.util.function.Function; public class DeleteTableHandleJacksonModule extends AbstractTypedJacksonModule { @Inject - public DeleteTableHandleJacksonModule(HandleResolver handleResolver) + public DeleteTableHandleJacksonModule( + HandleResolver handleResolver, + Provider connectorManagerProvider, + FeaturesConfig featuresConfig) + { + super(ConnectorDeleteTableHandle.class, + handleResolver::getId, + handleResolver::getDeleteTableHandleClass, + featuresConfig.isUseConnectorProvidedSerializationCodecs(), + connectorId -> connectorManagerProvider.get() + .getConnectorCodecProvider(connectorId) + .flatMap(ConnectorCodecProvider::getConnectorDeleteTableHandleCodec)); + } + + public DeleteTableHandleJacksonModule( + HandleResolver handleResolver, + FeaturesConfig featuresConfig, + Function>> codecExtractor) { super(ConnectorDeleteTableHandle.class, handleResolver::getId, - handleResolver::getDeleteTableHandleClass); + handleResolver::getDeleteTableHandleClass, + featuresConfig.isUseConnectorProvidedSerializationCodecs(), + codecExtractor); } } diff --git a/presto-main-base/src/main/java/com/facebook/presto/metadata/FunctionHandleJacksonModule.java b/presto-main-base/src/main/java/com/facebook/presto/metadata/FunctionHandleJacksonModule.java index a87335daab43e..ef7b7529c76d8 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/metadata/FunctionHandleJacksonModule.java +++ b/presto-main-base/src/main/java/com/facebook/presto/metadata/FunctionHandleJacksonModule.java @@ -22,6 +22,13 @@ public class FunctionHandleJacksonModule @Inject public FunctionHandleJacksonModule(HandleResolver handleResolver) { - super(FunctionHandle.class, handleResolver::getId, handleResolver::getFunctionHandleClass); + // Functions are internal to Presto and don't need binary serialization + super(FunctionHandle.class, + handleResolver::getId, + handleResolver::getFunctionHandleClass, + false, // Always disabled for functions + connectorId -> { + throw new UnsupportedOperationException("Function handles do not support binary serialization"); + }); } } diff --git a/presto-main-base/src/main/java/com/facebook/presto/metadata/InsertTableHandleJacksonModule.java b/presto-main-base/src/main/java/com/facebook/presto/metadata/InsertTableHandleJacksonModule.java index 6a83f7e1d2ef6..5eebd9311d4bf 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/metadata/InsertTableHandleJacksonModule.java +++ b/presto-main-base/src/main/java/com/facebook/presto/metadata/InsertTableHandleJacksonModule.java @@ -13,17 +13,45 @@ */ package com.facebook.presto.metadata; +import com.facebook.presto.connector.ConnectorManager; +import com.facebook.presto.spi.ConnectorCodec; +import com.facebook.presto.spi.ConnectorId; import com.facebook.presto.spi.ConnectorInsertTableHandle; +import com.facebook.presto.spi.connector.ConnectorCodecProvider; +import com.facebook.presto.sql.analyzer.FeaturesConfig; import jakarta.inject.Inject; +import jakarta.inject.Provider; + +import java.util.Optional; +import java.util.function.Function; public class InsertTableHandleJacksonModule extends AbstractTypedJacksonModule { @Inject - public InsertTableHandleJacksonModule(HandleResolver handleResolver) + public InsertTableHandleJacksonModule( + HandleResolver handleResolver, + Provider connectorManagerProvider, + FeaturesConfig featuresConfig) + { + super(ConnectorInsertTableHandle.class, + handleResolver::getId, + handleResolver::getInsertTableHandleClass, + featuresConfig.isUseConnectorProvidedSerializationCodecs(), + connectorId -> connectorManagerProvider.get() + .getConnectorCodecProvider(connectorId) + .flatMap(ConnectorCodecProvider::getConnectorInsertTableHandleCodec)); + } + + public InsertTableHandleJacksonModule( + HandleResolver handleResolver, + FeaturesConfig featuresConfig, + Function>> codecExtractor) { super(ConnectorInsertTableHandle.class, handleResolver::getId, - handleResolver::getInsertTableHandleClass); + handleResolver::getInsertTableHandleClass, + featuresConfig.isUseConnectorProvidedSerializationCodecs(), + codecExtractor); } } diff --git a/presto-main-base/src/main/java/com/facebook/presto/metadata/OutputTableHandleJacksonModule.java b/presto-main-base/src/main/java/com/facebook/presto/metadata/OutputTableHandleJacksonModule.java index ad04c5b7e834d..c2701ef082f90 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/metadata/OutputTableHandleJacksonModule.java +++ b/presto-main-base/src/main/java/com/facebook/presto/metadata/OutputTableHandleJacksonModule.java @@ -13,17 +13,45 @@ */ package com.facebook.presto.metadata; +import com.facebook.presto.connector.ConnectorManager; +import com.facebook.presto.spi.ConnectorCodec; +import com.facebook.presto.spi.ConnectorId; import com.facebook.presto.spi.ConnectorOutputTableHandle; +import com.facebook.presto.spi.connector.ConnectorCodecProvider; +import com.facebook.presto.sql.analyzer.FeaturesConfig; import jakarta.inject.Inject; +import jakarta.inject.Provider; + +import java.util.Optional; +import java.util.function.Function; public class OutputTableHandleJacksonModule extends AbstractTypedJacksonModule { @Inject - public OutputTableHandleJacksonModule(HandleResolver handleResolver) + public OutputTableHandleJacksonModule( + HandleResolver handleResolver, + Provider connectorManagerProvider, + FeaturesConfig featuresConfig) + { + super(ConnectorOutputTableHandle.class, + handleResolver::getId, + handleResolver::getOutputTableHandleClass, + featuresConfig.isUseConnectorProvidedSerializationCodecs(), + connectorId -> connectorManagerProvider.get() + .getConnectorCodecProvider(connectorId) + .flatMap(ConnectorCodecProvider::getConnectorOutputTableHandleCodec)); + } + + public OutputTableHandleJacksonModule( + HandleResolver handleResolver, + FeaturesConfig featuresConfig, + Function>> codecExtractor) { super(ConnectorOutputTableHandle.class, handleResolver::getId, - handleResolver::getOutputTableHandleClass); + handleResolver::getOutputTableHandleClass, + featuresConfig.isUseConnectorProvidedSerializationCodecs(), + codecExtractor); } } diff --git a/presto-main-base/src/main/java/com/facebook/presto/metadata/PartitioningHandleJacksonModule.java b/presto-main-base/src/main/java/com/facebook/presto/metadata/PartitioningHandleJacksonModule.java index ca876d872ff32..f26221b19040c 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/metadata/PartitioningHandleJacksonModule.java +++ b/presto-main-base/src/main/java/com/facebook/presto/metadata/PartitioningHandleJacksonModule.java @@ -13,17 +13,45 @@ */ package com.facebook.presto.metadata; +import com.facebook.presto.connector.ConnectorManager; +import com.facebook.presto.spi.ConnectorCodec; +import com.facebook.presto.spi.ConnectorId; +import com.facebook.presto.spi.connector.ConnectorCodecProvider; import com.facebook.presto.spi.connector.ConnectorPartitioningHandle; +import com.facebook.presto.sql.analyzer.FeaturesConfig; import jakarta.inject.Inject; +import jakarta.inject.Provider; + +import java.util.Optional; +import java.util.function.Function; public class PartitioningHandleJacksonModule extends AbstractTypedJacksonModule { @Inject - public PartitioningHandleJacksonModule(HandleResolver handleResolver) + public PartitioningHandleJacksonModule( + HandleResolver handleResolver, + Provider connectorManagerProvider, + FeaturesConfig featuresConfig) + { + super(ConnectorPartitioningHandle.class, + handleResolver::getId, + handleResolver::getPartitioningHandleClass, + featuresConfig.isUseConnectorProvidedSerializationCodecs(), + connectorId -> connectorManagerProvider.get() + .getConnectorCodecProvider(connectorId) + .flatMap(ConnectorCodecProvider::getConnectorPartitioningHandleCodec)); + } + + public PartitioningHandleJacksonModule( + HandleResolver handleResolver, + FeaturesConfig featuresConfig, + Function>> codecExtractor) { super(ConnectorPartitioningHandle.class, handleResolver::getId, - handleResolver::getPartitioningHandleClass); + handleResolver::getPartitioningHandleClass, + featuresConfig.isUseConnectorProvidedSerializationCodecs(), + codecExtractor); } } diff --git a/presto-main-base/src/main/java/com/facebook/presto/metadata/SplitJacksonModule.java b/presto-main-base/src/main/java/com/facebook/presto/metadata/SplitJacksonModule.java index 858f0a6c1fbbc..5d950534e4f5c 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/metadata/SplitJacksonModule.java +++ b/presto-main-base/src/main/java/com/facebook/presto/metadata/SplitJacksonModule.java @@ -13,17 +13,49 @@ */ package com.facebook.presto.metadata; +import com.facebook.presto.connector.ConnectorManager; +import com.facebook.presto.spi.ConnectorCodec; +import com.facebook.presto.spi.ConnectorId; import com.facebook.presto.spi.ConnectorSplit; +import com.facebook.presto.spi.connector.ConnectorCodecProvider; +import com.facebook.presto.sql.analyzer.FeaturesConfig; import jakarta.inject.Inject; +import jakarta.inject.Provider; + +import java.util.Optional; +import java.util.function.Function; public class SplitJacksonModule extends AbstractTypedJacksonModule { @Inject - public SplitJacksonModule(HandleResolver handleResolver) + public SplitJacksonModule( + HandleResolver handleResolver, + Provider connectorManagerProvider, + FeaturesConfig featuresConfig) + { + super(ConnectorSplit.class, + handleResolver::getId, + handleResolver::getSplitClass, + featuresConfig.isUseConnectorProvidedSerializationCodecs(), + connectorId -> connectorManagerProvider.get() + .getConnectorCodecProvider(connectorId) + .flatMap(ConnectorCodecProvider::getConnectorSplitCodec)); + } + + /** + * Test-friendly constructor that accepts a codec extractor function directly, + * avoiding the need to create a full ConnectorManager with all its dependencies. + */ + public SplitJacksonModule( + HandleResolver handleResolver, + FeaturesConfig featuresConfig, + Function>> codecExtractor) { super(ConnectorSplit.class, handleResolver::getId, - handleResolver::getSplitClass); + handleResolver::getSplitClass, + featuresConfig.isUseConnectorProvidedSerializationCodecs(), + codecExtractor); } } diff --git a/presto-main-base/src/main/java/com/facebook/presto/metadata/TableHandleJacksonModule.java b/presto-main-base/src/main/java/com/facebook/presto/metadata/TableHandleJacksonModule.java index 9981704af9fd9..3dc4c534c2c39 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/metadata/TableHandleJacksonModule.java +++ b/presto-main-base/src/main/java/com/facebook/presto/metadata/TableHandleJacksonModule.java @@ -13,17 +13,45 @@ */ package com.facebook.presto.metadata; +import com.facebook.presto.connector.ConnectorManager; +import com.facebook.presto.spi.ConnectorCodec; +import com.facebook.presto.spi.ConnectorId; import com.facebook.presto.spi.ConnectorTableHandle; +import com.facebook.presto.spi.connector.ConnectorCodecProvider; +import com.facebook.presto.sql.analyzer.FeaturesConfig; import jakarta.inject.Inject; +import jakarta.inject.Provider; + +import java.util.Optional; +import java.util.function.Function; public class TableHandleJacksonModule extends AbstractTypedJacksonModule { @Inject - public TableHandleJacksonModule(HandleResolver handleResolver) + public TableHandleJacksonModule( + HandleResolver handleResolver, + Provider connectorManagerProvider, + FeaturesConfig featuresConfig) + { + super(ConnectorTableHandle.class, + handleResolver::getId, + handleResolver::getTableHandleClass, + featuresConfig.isUseConnectorProvidedSerializationCodecs(), + connectorId -> connectorManagerProvider.get() + .getConnectorCodecProvider(connectorId) + .flatMap(ConnectorCodecProvider::getConnectorTableHandleCodec)); + } + + public TableHandleJacksonModule( + HandleResolver handleResolver, + FeaturesConfig featuresConfig, + Function>> codecExtractor) { super(ConnectorTableHandle.class, handleResolver::getId, - handleResolver::getTableHandleClass); + handleResolver::getTableHandleClass, + featuresConfig.isUseConnectorProvidedSerializationCodecs(), + codecExtractor); } } diff --git a/presto-main-base/src/main/java/com/facebook/presto/metadata/TableLayoutHandleJacksonModule.java b/presto-main-base/src/main/java/com/facebook/presto/metadata/TableLayoutHandleJacksonModule.java index bfbced02d0c38..505c65e42d4d0 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/metadata/TableLayoutHandleJacksonModule.java +++ b/presto-main-base/src/main/java/com/facebook/presto/metadata/TableLayoutHandleJacksonModule.java @@ -13,17 +13,45 @@ */ package com.facebook.presto.metadata; +import com.facebook.presto.connector.ConnectorManager; +import com.facebook.presto.spi.ConnectorCodec; +import com.facebook.presto.spi.ConnectorId; import com.facebook.presto.spi.ConnectorTableLayoutHandle; +import com.facebook.presto.spi.connector.ConnectorCodecProvider; +import com.facebook.presto.sql.analyzer.FeaturesConfig; import jakarta.inject.Inject; +import jakarta.inject.Provider; + +import java.util.Optional; +import java.util.function.Function; public class TableLayoutHandleJacksonModule extends AbstractTypedJacksonModule { @Inject - public TableLayoutHandleJacksonModule(HandleResolver handleResolver) + public TableLayoutHandleJacksonModule( + HandleResolver handleResolver, + Provider connectorManagerProvider, + FeaturesConfig featuresConfig) + { + super(ConnectorTableLayoutHandle.class, + handleResolver::getId, + handleResolver::getTableLayoutHandleClass, + featuresConfig.isUseConnectorProvidedSerializationCodecs(), + connectorId -> connectorManagerProvider.get() + .getConnectorCodecProvider(connectorId) + .flatMap(ConnectorCodecProvider::getConnectorTableLayoutHandleCodec)); + } + + public TableLayoutHandleJacksonModule( + HandleResolver handleResolver, + FeaturesConfig featuresConfig, + Function>> codecExtractor) { super(ConnectorTableLayoutHandle.class, handleResolver::getId, - handleResolver::getTableLayoutHandleClass); + handleResolver::getTableLayoutHandleClass, + featuresConfig.isUseConnectorProvidedSerializationCodecs(), + codecExtractor); } } diff --git a/presto-main-base/src/main/java/com/facebook/presto/metadata/TransactionHandleJacksonModule.java b/presto-main-base/src/main/java/com/facebook/presto/metadata/TransactionHandleJacksonModule.java index 230d6be16a1b0..fe650be2ad7e7 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/metadata/TransactionHandleJacksonModule.java +++ b/presto-main-base/src/main/java/com/facebook/presto/metadata/TransactionHandleJacksonModule.java @@ -13,17 +13,45 @@ */ package com.facebook.presto.metadata; +import com.facebook.presto.connector.ConnectorManager; +import com.facebook.presto.spi.ConnectorCodec; +import com.facebook.presto.spi.ConnectorId; +import com.facebook.presto.spi.connector.ConnectorCodecProvider; import com.facebook.presto.spi.connector.ConnectorTransactionHandle; +import com.facebook.presto.sql.analyzer.FeaturesConfig; import jakarta.inject.Inject; +import jakarta.inject.Provider; + +import java.util.Optional; +import java.util.function.Function; public class TransactionHandleJacksonModule extends AbstractTypedJacksonModule { @Inject - public TransactionHandleJacksonModule(HandleResolver handleResolver) + public TransactionHandleJacksonModule( + HandleResolver handleResolver, + Provider connectorManagerProvider, + FeaturesConfig featuresConfig) + { + super(ConnectorTransactionHandle.class, + handleResolver::getId, + handleResolver::getTransactionHandleClass, + featuresConfig.isUseConnectorProvidedSerializationCodecs(), + connectorId -> connectorManagerProvider.get() + .getConnectorCodecProvider(connectorId) + .flatMap(ConnectorCodecProvider::getConnectorTransactionHandleCodec)); + } + + public TransactionHandleJacksonModule( + HandleResolver handleResolver, + FeaturesConfig featuresConfig, + Function>> codecExtractor) { super(ConnectorTransactionHandle.class, handleResolver::getId, - handleResolver::getTransactionHandleClass); + handleResolver::getTransactionHandleClass, + featuresConfig.isUseConnectorProvidedSerializationCodecs(), + codecExtractor); } } diff --git a/presto-main-base/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java b/presto-main-base/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java index 1cc73d325053e..8c2983ba9a9b2 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java +++ b/presto-main-base/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java @@ -192,6 +192,7 @@ public class FeaturesConfig private boolean listBuiltInFunctionsOnly = true; private boolean experimentalFunctionsEnabled; + private boolean useConnectorProvidedSerializationCodecs; private boolean optimizeCommonSubExpressions = true; private boolean preferDistributedUnion = true; private boolean optimizeNullsInJoin; @@ -1845,6 +1846,19 @@ public FeaturesConfig setExperimentalFunctionsEnabled(boolean experimentalFuncti return this; } + public boolean isUseConnectorProvidedSerializationCodecs() + { + return useConnectorProvidedSerializationCodecs; + } + + @Config("use-connector-provided-serialization-codecs") + @ConfigDescription("Enable use of custom connector-provided serialization codecs for handles") + public FeaturesConfig setUseConnectorProvidedSerializationCodecs(boolean useConnectorProvidedSerializationCodecs) + { + this.useConnectorProvidedSerializationCodecs = useConnectorProvidedSerializationCodecs; + return this; + } + public boolean isOptimizeCommonSubExpressions() { return optimizeCommonSubExpressions; diff --git a/presto-main-base/src/test/java/com/facebook/presto/catalogserver/TestCatalogServerResponse.java b/presto-main-base/src/test/java/com/facebook/presto/catalogserver/TestCatalogServerResponse.java index f677e5b210bba..85b7fd0822afc 100644 --- a/presto-main-base/src/test/java/com/facebook/presto/catalogserver/TestCatalogServerResponse.java +++ b/presto-main-base/src/test/java/com/facebook/presto/catalogserver/TestCatalogServerResponse.java @@ -14,8 +14,10 @@ package com.facebook.presto.catalogserver; import com.facebook.airlift.json.JsonModule; +import com.facebook.drift.codec.guice.ThriftCodecModule; import com.facebook.presto.common.QualifiedObjectName; import com.facebook.presto.common.transaction.TransactionId; +import com.facebook.presto.connector.ConnectorManager; import com.facebook.presto.connector.informationSchema.InformationSchemaTableHandle; import com.facebook.presto.connector.informationSchema.InformationSchemaTransactionHandle; import com.facebook.presto.metadata.HandleJsonModule; @@ -30,6 +32,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.Guice; import com.google.inject.Injector; +import com.google.inject.Scopes; import org.testng.annotations.BeforeTest; import org.testng.annotations.Test; @@ -52,7 +55,11 @@ public class TestCatalogServerResponse public void setup() { this.testingCatalogServerClient = new TestingCatalogServerClient(); - Injector injector = Guice.createInjector(new JsonModule(), new HandleJsonModule()); + Injector injector = Guice.createInjector(new JsonModule(), binder -> { + binder.install(new HandleJsonModule()); + binder.bind(ConnectorManager.class).toProvider(() -> null).in(Scopes.SINGLETON); + binder.install(new ThriftCodecModule()); + }); this.objectMapper = injector.getInstance(ObjectMapper.class); } diff --git a/presto-main-base/src/test/java/com/facebook/presto/execution/TestQueryInfo.java b/presto-main-base/src/test/java/com/facebook/presto/execution/TestQueryInfo.java index 5bc55f203713b..9fbcf2766a218 100644 --- a/presto-main-base/src/test/java/com/facebook/presto/execution/TestQueryInfo.java +++ b/presto-main-base/src/test/java/com/facebook/presto/execution/TestQueryInfo.java @@ -16,11 +16,13 @@ import com.facebook.airlift.bootstrap.Bootstrap; import com.facebook.airlift.json.JsonCodec; import com.facebook.airlift.json.JsonModule; +import com.facebook.drift.codec.guice.ThriftCodecModule; import com.facebook.presto.common.plan.PlanCanonicalizationStrategy; import com.facebook.presto.common.resourceGroups.QueryType; import com.facebook.presto.common.transaction.TransactionId; import com.facebook.presto.common.type.Type; import com.facebook.presto.common.type.TypeManager; +import com.facebook.presto.connector.ConnectorManager; import com.facebook.presto.cost.StatsAndCosts; import com.facebook.presto.metadata.FunctionAndTypeManager; import com.facebook.presto.metadata.HandleJsonModule; @@ -132,10 +134,12 @@ private static JsonCodec createJsonCodec() SqlParser sqlParser = new SqlParser(); FunctionAndTypeManager functionAndTypeManager = createTestFunctionAndTypeManager(); binder.install(new JsonModule()); + binder.install(new ThriftCodecModule()); binder.install(new HandleJsonModule()); binder.bind(SqlParser.class).toInstance(sqlParser); binder.bind(TypeManager.class).toInstance(functionAndTypeManager); configBinder(binder).bindConfig(FeaturesConfig.class); + binder.bind(ConnectorManager.class).toProvider(() -> null); newSetBinder(binder, Type.class); jsonBinder(binder).addSerializerBinding(Slice.class).to(SliceSerializer.class); jsonBinder(binder).addDeserializerBinding(Slice.class).to(SliceDeserializer.class); diff --git a/presto-main-base/src/test/java/com/facebook/presto/metadata/TestAbstractTypedJacksonModule.java b/presto-main-base/src/test/java/com/facebook/presto/metadata/TestAbstractTypedJacksonModule.java new file mode 100644 index 0000000000000..1e38f65c017bd --- /dev/null +++ b/presto-main-base/src/test/java/com/facebook/presto/metadata/TestAbstractTypedJacksonModule.java @@ -0,0 +1,600 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.metadata; + +import com.facebook.airlift.json.JsonModule; +import com.facebook.presto.spi.ConnectorCodec; +import com.facebook.presto.spi.ConnectorId; +import com.facebook.presto.spi.HostAddress; +import com.facebook.presto.spi.NodeProvider; +import com.facebook.presto.spi.connector.ConnectorCodecProvider; +import com.facebook.presto.spi.schedule.NodeSelectionStrategy; +import com.facebook.presto.sql.analyzer.FeaturesConfig; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Module; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.util.Base64; +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +import static com.facebook.airlift.json.JsonBinder.jsonBinder; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNull; + +@Test(singleThreaded = true) +public class TestAbstractTypedJacksonModule +{ + private ObjectMapper objectMapper; + @BeforeMethod + public void setup() + { + // Default setup with binary serialization disabled + setupInjector(false, null); + } + + private void setupInjector(boolean binarySerializationEnabled, ConnectorCodecProvider codecProvider) + { + Module testModule = binder -> { + binder.install(new JsonModule()); + + // Configure FeaturesConfig + FeaturesConfig featuresConfig = new FeaturesConfig(); + featuresConfig.setUseConnectorProvidedSerializationCodecs(binarySerializationEnabled); + binder.bind(FeaturesConfig.class).toInstance(featuresConfig); + + // Bind HandleResolver + binder.bind(HandleResolver.class).toInstance(new TestHandleResolver()); + + // Bind TestConnectorManager as a singleton + TestConnectorManager testConnectorManager = new TestConnectorManager(codecProvider); + binder.bind(TestConnectorManager.class).toInstance(testConnectorManager); + + // Register the test Jackson module + jsonBinder(binder).addModuleBinding().to(TestHandleJacksonModule.class); + }; + + Injector injector = Guice.createInjector(testModule); + objectMapper = injector.getInstance(ObjectMapper.class); + } + + @Test + public void testLegacyJsonSerializationWithoutCodec() + throws Exception + { + // Setup with binary serialization disabled + setupInjector(false, null); + + TestHandle original = new TestHandle("connector1", "value1", 42); + String json = objectMapper.writeValueAsString(original); + + // Should have @type field but no binary data + assertJsonContains(json, "@type", "connector1"); + assertJsonContains(json, "id", "value1"); + assertJsonContains(json, "count", "42"); + assertJsonNotContains(json, "customSerializedValue"); + } + + @Test + public void testBinarySerializationWithCodec() + throws Exception + { + // Create a simple codec that serializes to a custom format + ConnectorCodec codec = new SimpleCodec(); + + // Setup with binary serialization enabled and codec provider + ConnectorCodecProvider codecProvider = new ConnectorCodecProvider() + { + @Override + public Optional> getConnectorTableHandleCodec() + { + return Optional.of((ConnectorCodec) codec); + } + }; + setupInjector(true, codecProvider); + + TestHandle original = new TestHandle("connector1", "value1", 42); + String json = objectMapper.writeValueAsString(original); + + // Should have @type and binary data fields + assertJsonContains(json, "@type", "connector1"); + assertJsonContains(json, "customSerializedValue"); + assertJsonNotContains(json, "id", "value1"); // Should not have regular fields + + // Test deserialization + TestHandle deserialized = objectMapper.readValue(json, TestHandle.class); + assertEquals(deserialized.getId(), original.getId()); + assertEquals(deserialized.getCount(), original.getCount()); + } + + @Test + public void testBinarySerializationDisabled() + throws Exception + { + // This test verifies that when binary serialization is disabled via the feature flag, + // the module falls back to legacy JSON serialization even if codecs are available + + // Setup with binary serialization disabled even though codec is available + ConnectorCodec codec = new SimpleCodec(); + ConnectorCodecProvider codecProvider = new ConnectorCodecProvider() + { + @Override + public Optional> getConnectorTableHandleCodec() + { + return Optional.of((ConnectorCodec) codec); + } + }; + setupInjector(false, codecProvider); // false = binary serialization disabled + + TestHandle original = new TestHandle("connector1", "value1", 42); + String json = objectMapper.writeValueAsString(original); + + // Should use legacy JSON serialization even though codec is available + assertJsonContains(json, "@type", "connector1"); + assertJsonContains(json, "id", "value1"); + assertJsonContains(json, "count", "42"); + assertJsonNotContains(json, "customSerializedValue"); + } + + @Test + public void testFallbackToJsonWhenNoCodec() + throws Exception + { + // Setup with binary serialization enabled but no codec available + setupInjector(true, null); + + // Test with connector2 (no codec available) + TestHandle original = new TestHandle("connector2", "value2", 84); + String json = objectMapper.writeValueAsString(original); + + // Should fall back to JSON serialization + assertJsonContains(json, "@type", "connector2"); + assertJsonContains(json, "id", "value2"); + assertJsonContains(json, "count", "84"); + assertJsonNotContains(json, "customSerializedValue"); + } + + @Test + public void testInternalHandlesAlwaysUseJson() + throws Exception + { + // Setup with codec that would handle all connectors + ConnectorCodec codec = new SimpleCodec(); + ConnectorCodecProvider codecProvider = new ConnectorCodecProvider() + { + @Override + public Optional> getConnectorTableHandleCodec() + { + return Optional.of((ConnectorCodec) codec); + } + }; + setupInjector(true, codecProvider); + + // Test with internal handle (starts with $) + TestHandle original = new TestHandle("$remote", "internal", 99); + String json = objectMapper.writeValueAsString(original); + + // Should use JSON serialization for internal handles + assertJsonContains(json, "@type", "$remote"); + assertJsonContains(json, "id", "internal"); + assertJsonContains(json, "count", "99"); + assertJsonNotContains(json, "customSerializedValue"); + } + + @Test + public void testNullValueSerialization() + throws Exception + { + setupInjector(false, null); + + String json = objectMapper.writeValueAsString(null); + assertEquals(json, "null"); + + TestHandle deserialized = objectMapper.readValue("null", TestHandle.class); + assertNull(deserialized); + } + + @Test + public void testRoundTripWithMixedHandles() + throws Exception + { + // Create a TestConnectorManager that only provides codec for "binary-connector" + setupInjector(true, new SelectiveCodecProvider("binary-connector")); + + // Test multiple handles with different serialization methods + TestHandle[] handles = new TestHandle[] { + new TestHandle("binary-connector", "binary1", 1), + new TestHandle("json-connector", "json1", 2), + new TestHandle("$internal", "internal1", 3), + new TestHandle("binary-connector", "binary2", 4), + }; + + for (TestHandle original : handles) { + String json = objectMapper.writeValueAsString(original); + + // Verify serialization format based on handle type + if (original.getConnectorId().equals("binary-connector")) { + // Should use binary serialization + assertJsonContains(json, "customSerializedValue"); + assertJsonNotContains(json, "\"id\":"); + + // Test deserialization for binary-serialized handles + TestHandle deserialized = objectMapper.readValue(json, TestHandle.class); + assertEquals(deserialized.getId(), original.getId()); + assertEquals(deserialized.getCount(), original.getCount()); + } + else { + // Should use JSON serialization + assertJsonNotContains(json, "customSerializedValue"); + assertJsonContains(json, "id", original.getId()); + } + } + } + + @Test + public void testDirectBinaryDataDeserialization() + throws Exception + { + // Test deserialization of manually crafted binary data JSON + ConnectorCodec codec = new SimpleCodec(); + ConnectorCodecProvider codecProvider = new ConnectorCodecProvider() + { + @Override + public Optional> getConnectorTableHandleCodec() + { + return Optional.of((ConnectorCodec) codec); + } + }; + setupInjector(true, codecProvider); + + // Manually create JSON with binary data + String encodedData = Base64.getEncoder().encodeToString("connector1|testValue|999".getBytes(UTF_8)); + String json = String.format("{\"@type\":\"connector1\",\"customSerializedValue\":\"%s\"}", encodedData); + + // Deserialize + TestHandle deserialized = objectMapper.readValue(json, TestHandle.class); + assertEquals(deserialized.getConnectorId(), "connector1"); + assertEquals(deserialized.getId(), "testValue"); + assertEquals(deserialized.getCount(), 999); + } + + @Test + public void testMixedSerializationRoundTrip() + throws Exception + { + // Test that we can serialize and deserialize a mix of binary and JSON in sequence + setupInjector(true, new SelectiveCodecProvider("binary-connector")); + + // Create handles with different serialization methods + TestHandle binaryHandle = new TestHandle("binary-connector", "binary-data", 100); + TestHandle jsonHandle = new TestHandle("json-connector", "json-data", 200); + + // Serialize both + String binaryJson = objectMapper.writeValueAsString(binaryHandle); + String jsonJson = objectMapper.writeValueAsString(jsonHandle); + + // Deserialize both + TestHandle deserializedBinary = objectMapper.readValue(binaryJson, TestHandle.class); + // For JSON deserialization, we skip due to complex type handling in isolated tests + + // Verify binary deserialization worked + assertEquals(deserializedBinary.getId(), binaryHandle.getId()); + assertEquals(deserializedBinary.getCount(), binaryHandle.getCount()); + + // Verify JSON format is correct (even if we can't deserialize in this test) + assertJsonContains(jsonJson, "\"id\":\"json-data\""); + assertJsonContains(jsonJson, "\"count\":200"); + } + + private void assertJsonContains(String json, String... values) + { + for (String value : values) { + if (!json.contains(value)) { + throw new AssertionError("JSON does not contain: " + value + "\nJSON: " + json); + } + } + } + + private void assertJsonNotContains(String json, String... values) + { + for (String value : values) { + if (json.contains(value)) { + throw new AssertionError("JSON should not contain: " + value + "\nJSON: " + json); + } + } + } + + // Simple codec implementation for testing + private static class SimpleCodec + implements ConnectorCodec + { + @Override + public byte[] serialize(TestHandle value) + { + return String.format("%s|%s|%d", value.getConnectorId(), value.getId(), value.getCount()).getBytes(UTF_8); + } + + @Override + public TestHandle deserialize(byte[] data) + { + String[] parts = new String(data, UTF_8).split("\\|"); + return new TestHandle(parts[0], parts[1], Integer.parseInt(parts[2])); + } + } + + // Codec provider that only provides codec for specific connectors + private static class SelectiveCodecProvider + implements ConnectorCodecProvider + { + private final String connectorIdWithCodec; + private final ConnectorCodec codec = new SimpleCodec(); + + public SelectiveCodecProvider(String connectorIdWithCodec) + { + this.connectorIdWithCodec = connectorIdWithCodec; + } + + @Override + public Optional> getConnectorTableHandleCodec() + { + return Optional.of((ConnectorCodec) codec); + } + } + + // Test handle that implements multiple connector interfaces for testing + public static class TestHandle + implements com.facebook.presto.spi.ConnectorTableHandle, + com.facebook.presto.spi.ConnectorSplit, + com.facebook.presto.spi.ColumnHandle, + com.facebook.presto.spi.ConnectorTableLayoutHandle, + com.facebook.presto.spi.ConnectorOutputTableHandle, + com.facebook.presto.spi.ConnectorInsertTableHandle, + com.facebook.presto.spi.ConnectorDeleteTableHandle, + com.facebook.presto.spi.ConnectorIndexHandle, + com.facebook.presto.spi.connector.ConnectorPartitioningHandle, + com.facebook.presto.spi.connector.ConnectorTransactionHandle + { + private final String connectorId; + private final String id; + private final int count; + + // Constructor for programmatic creation + public TestHandle(String connectorId, String id, int count) + { + this.connectorId = connectorId; + this.id = id; + this.count = count; + } + + // Constructor for Jackson deserialization + @JsonCreator + public TestHandle( + @JsonProperty("id") String id, + @JsonProperty("count") int count) + { + // When deserializing, the connector ID is determined by the @type field + // For simplicity in tests, we use a fixed value + this("deserialized", id, count); + } + + // This field is excluded from JSON serialization but used internally for type resolution + @JsonIgnore + public String getConnectorId() + { + return connectorId; + } + + @JsonProperty + public String getId() + { + return id; + } + + @JsonProperty + public int getCount() + { + return count; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TestHandle that = (TestHandle) o; + return count == that.count && + Objects.equals(connectorId, that.connectorId) && + Objects.equals(id, that.id); + } + + @Override + public int hashCode() + { + return Objects.hash(connectorId, id, count); + } + + @Override + public String toString() + { + return "TestHandle{" + + "connectorId='" + connectorId + '\'' + + ", id='" + id + '\'' + + ", count=" + count + + '}'; + } + + @Override + public NodeSelectionStrategy getNodeSelectionStrategy() + { + return null; + } + + @Override + public List getPreferredNodes(NodeProvider nodeProvider) + { + return ImmutableList.of(); + } + + @Override + public Object getInfo() + { + return null; + } + } + + // Test ConnectorHandleResolver implementation + private static class TestConnectorHandleResolver + implements com.facebook.presto.spi.ConnectorHandleResolver + { + @Override + public Class getTableHandleClass() + { + return TestHandle.class; + } + + @Override + public Class getTableLayoutHandleClass() + { + return TestHandle.class; + } + + @Override + public Class getColumnHandleClass() + { + return TestHandle.class; + } + + @Override + public Class getSplitClass() + { + return TestHandle.class; + } + + @Override + public Class getIndexHandleClass() + { + return TestHandle.class; + } + + @Override + public Class getOutputTableHandleClass() + { + return TestHandle.class; + } + + @Override + public Class getInsertTableHandleClass() + { + return TestHandle.class; + } + + @Override + public Class getDeleteTableHandleClass() + { + return TestHandle.class; + } + + @Override + public Class getPartitioningHandleClass() + { + return TestHandle.class; + } + + @Override + public Class getTransactionHandleClass() + { + return TestHandle.class; + } + } + + // Test HandleResolver implementation + private static class TestHandleResolver + extends HandleResolver + { + public TestHandleResolver() + { + super(); + // Register the test handle resolver for all test connectors + TestConnectorHandleResolver resolver = new TestConnectorHandleResolver(); + addConnectorName("connector1", resolver); + addConnectorName("connector2", resolver); + addConnectorName("binary-connector", resolver); + addConnectorName("json-connector", resolver); + addConnectorName("$internal", resolver); + addConnectorName("deserialized", resolver); + } + } + + // Mock ConnectorManager implementation + private static class TestConnectorManager + { + private final ConnectorCodecProvider codecProvider; + + public TestConnectorManager(ConnectorCodecProvider codecProvider) + { + this.codecProvider = codecProvider; + } + + public Optional getConnectorCodecProvider(ConnectorId connectorId) + { + // Only return codec provider for specific connectors if it's a SelectiveCodecProvider + if (codecProvider instanceof SelectiveCodecProvider) { + SelectiveCodecProvider selective = (SelectiveCodecProvider) codecProvider; + if (connectorId.getCatalogName().equals(selective.connectorIdWithCodec)) { + return Optional.of(codecProvider); + } + return Optional.empty(); + } + return Optional.ofNullable(codecProvider); + } + } + + // Test Jackson module that uses TestHandle + public static class TestHandleJacksonModule + extends AbstractTypedJacksonModule + { + @jakarta.inject.Inject + public TestHandleJacksonModule( + HandleResolver handleResolver, + TestConnectorManager testConnectorManager, + FeaturesConfig featuresConfig) + { + super(TestHandle.class, + TestHandle::getConnectorId, + id -> TestHandle.class, + featuresConfig.isUseConnectorProvidedSerializationCodecs(), + connectorId -> testConnectorManager + .getConnectorCodecProvider(connectorId) + .flatMap(provider -> { + Optional> codec = + provider.getConnectorTableHandleCodec(); + // Cast is safe because TestHandle implements ConnectorTableHandle + return (Optional>) (Optional) codec; + })); + } + } +} diff --git a/presto-main-base/src/test/java/com/facebook/presto/metadata/TestInformationSchemaTableHandle.java b/presto-main-base/src/test/java/com/facebook/presto/metadata/TestInformationSchemaTableHandle.java index 8b072926db5c1..61e1c0bca3f65 100644 --- a/presto-main-base/src/test/java/com/facebook/presto/metadata/TestInformationSchemaTableHandle.java +++ b/presto-main-base/src/test/java/com/facebook/presto/metadata/TestInformationSchemaTableHandle.java @@ -14,6 +14,8 @@ package com.facebook.presto.metadata; import com.facebook.airlift.json.JsonModule; +import com.facebook.drift.codec.guice.ThriftCodecModule; +import com.facebook.presto.connector.ConnectorManager; import com.facebook.presto.connector.informationSchema.InformationSchemaTableHandle; import com.facebook.presto.spi.ConnectorTableHandle; import com.fasterxml.jackson.core.type.TypeReference; @@ -21,6 +23,7 @@ import com.google.common.collect.ImmutableMap; import com.google.inject.Guice; import com.google.inject.Injector; +import com.google.inject.Scopes; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -44,7 +47,13 @@ public class TestInformationSchemaTableHandle @BeforeMethod public void startUp() { - Injector injector = Guice.createInjector(new JsonModule(), new HandleJsonModule()); + Injector injector = Guice.createInjector( + new JsonModule(), + binder -> { + binder.install(new HandleJsonModule()); + binder.bind(ConnectorManager.class).toProvider(() -> null).in(Scopes.SINGLETON); + binder.install(new ThriftCodecModule()); + }); objectMapper = injector.getInstance(ObjectMapper.class); } diff --git a/presto-main-base/src/test/java/com/facebook/presto/metadata/TestSystemTableHandle.java b/presto-main-base/src/test/java/com/facebook/presto/metadata/TestSystemTableHandle.java index 0e291c6a8fb59..5083171e881b1 100644 --- a/presto-main-base/src/test/java/com/facebook/presto/metadata/TestSystemTableHandle.java +++ b/presto-main-base/src/test/java/com/facebook/presto/metadata/TestSystemTableHandle.java @@ -14,6 +14,8 @@ package com.facebook.presto.metadata; import com.facebook.airlift.json.JsonModule; +import com.facebook.drift.codec.guice.ThriftCodecModule; +import com.facebook.presto.connector.ConnectorManager; import com.facebook.presto.connector.system.SystemTableHandle; import com.facebook.presto.spi.ConnectorId; import com.facebook.presto.spi.ConnectorTableHandle; @@ -23,6 +25,7 @@ import com.google.common.collect.ImmutableMap; import com.google.inject.Guice; import com.google.inject.Injector; +import com.google.inject.Scopes; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -47,7 +50,13 @@ public class TestSystemTableHandle @BeforeMethod public void startUp() { - Injector injector = Guice.createInjector(new JsonModule(), new HandleJsonModule()); + Injector injector = Guice.createInjector( + new JsonModule(), + binder -> { + binder.install(new HandleJsonModule()); + binder.bind(ConnectorManager.class).toProvider(() -> null).in(Scopes.SINGLETON); + binder.install(new ThriftCodecModule()); + }); objectMapper = injector.getInstance(ObjectMapper.class); } diff --git a/presto-main-base/src/test/java/com/facebook/presto/sql/TestRowExpressionSerde.java b/presto-main-base/src/test/java/com/facebook/presto/sql/TestRowExpressionSerde.java index 485dbfbacdfc8..70216b1a2c294 100644 --- a/presto-main-base/src/test/java/com/facebook/presto/sql/TestRowExpressionSerde.java +++ b/presto-main-base/src/test/java/com/facebook/presto/sql/TestRowExpressionSerde.java @@ -17,6 +17,7 @@ import com.facebook.airlift.json.JsonCodec; import com.facebook.airlift.json.JsonModule; import com.facebook.airlift.stats.cardinality.HyperLogLog; +import com.facebook.drift.codec.guice.ThriftCodecModule; import com.facebook.presto.block.BlockJsonSerde; import com.facebook.presto.common.block.Block; import com.facebook.presto.common.block.BlockEncoding; @@ -29,6 +30,7 @@ import com.facebook.presto.common.type.Type; import com.facebook.presto.common.type.TypeManager; import com.facebook.presto.common.type.VarcharType; +import com.facebook.presto.connector.ConnectorManager; import com.facebook.presto.metadata.FunctionAndTypeManager; import com.facebook.presto.metadata.HandleJsonModule; import com.facebook.presto.metadata.Metadata; @@ -244,8 +246,10 @@ private JsonCodec getJsonCodec() { Module module = binder -> { binder.install(new JsonModule()); + binder.install(new ThriftCodecModule()); binder.install(new HandleJsonModule()); configBinder(binder).bindConfig(FeaturesConfig.class); + binder.bind(ConnectorManager.class).toProvider(() -> null); FunctionAndTypeManager functionAndTypeManager = createTestFunctionAndTypeManager(); binder.bind(TypeManager.class).toInstance(functionAndTypeManager); diff --git a/presto-main-base/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java b/presto-main-base/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java index ec24eba3a86d4..c087f7eea70de 100644 --- a/presto-main-base/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java +++ b/presto-main-base/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java @@ -272,7 +272,8 @@ public void testDefaults() .setInEqualityJoinPushdownEnabled(false) .setRewriteMinMaxByToTopNEnabled(false) .setPrestoSparkExecutionEnvironment(false) - .setMaxSerializableObjectSize(1000)); + .setMaxSerializableObjectSize(1000) + .setUseConnectorProvidedSerializationCodecs(false)); } @Test @@ -492,6 +493,7 @@ public void testExplicitPropertyMappings() .put("optimizer.utilize-unique-property-in-query-planning", "false") .put("optimizer.add-exchange-below-partial-aggregation-over-group-id", "true") .put("max_serializable_object_size", "50") + .put("use-connector-provided-serialization-codecs", "true") .build(); FeaturesConfig expected = new FeaturesConfig() @@ -709,7 +711,8 @@ public void testExplicitPropertyMappings() .setRewriteMinMaxByToTopNEnabled(true) .setInnerJoinPushdownEnabled(true) .setPrestoSparkExecutionEnvironment(true) - .setMaxSerializableObjectSize(50); + .setMaxSerializableObjectSize(50) + .setUseConnectorProvidedSerializationCodecs(true); assertFullMapping(properties, expected); } diff --git a/presto-main-base/src/test/java/com/facebook/presto/sql/planner/plan/TestStatisticsWriterNode.java b/presto-main-base/src/test/java/com/facebook/presto/sql/planner/plan/TestStatisticsWriterNode.java index 5e28a77689e8c..baa687541059c 100644 --- a/presto-main-base/src/test/java/com/facebook/presto/sql/planner/plan/TestStatisticsWriterNode.java +++ b/presto-main-base/src/test/java/com/facebook/presto/sql/planner/plan/TestStatisticsWriterNode.java @@ -16,8 +16,10 @@ import com.facebook.airlift.bootstrap.Bootstrap; import com.facebook.airlift.json.JsonCodec; import com.facebook.airlift.json.JsonModule; +import com.facebook.drift.codec.guice.ThriftCodecModule; import com.facebook.presto.common.type.Type; import com.facebook.presto.common.type.TypeManager; +import com.facebook.presto.connector.ConnectorManager; import com.facebook.presto.metadata.FunctionAndTypeManager; import com.facebook.presto.metadata.HandleJsonModule; import com.facebook.presto.metadata.HandleResolver; @@ -30,6 +32,7 @@ import com.facebook.presto.spi.plan.ValuesNode; import com.facebook.presto.spi.relation.VariableReferenceExpression; import com.facebook.presto.spi.statistics.ColumnStatisticType; +import com.facebook.presto.sql.analyzer.FeaturesConfig; import com.facebook.presto.sql.parser.SqlParser; import com.facebook.presto.testing.TestingHandleResolver; import com.facebook.presto.testing.TestingMetadata.TestingTableHandle; @@ -39,6 +42,7 @@ import com.google.inject.Injector; import com.google.inject.Key; import com.google.inject.Module; +import com.google.inject.Scopes; import org.testng.annotations.Test; import java.util.Optional; @@ -127,6 +131,9 @@ private JsonCodec getJsonCodec() FunctionAndTypeManager functionAndTypeManager = createTestFunctionAndTypeManager(); binder.install(new JsonModule()); binder.install(new HandleJsonModule()); + binder.bind(ConnectorManager.class).toProvider(() -> null).in(Scopes.SINGLETON); + binder.install(new ThriftCodecModule()); + binder.bind(FeaturesConfig.class).toInstance(new FeaturesConfig()); binder.bind(SqlParser.class).toInstance(sqlParser); binder.bind(TypeManager.class).toInstance(functionAndTypeManager); newSetBinder(binder, Type.class); diff --git a/presto-main-base/src/test/java/com/facebook/presto/sql/planner/plan/TestWindowNode.java b/presto-main-base/src/test/java/com/facebook/presto/sql/planner/plan/TestWindowNode.java index fa5a8c6db51dc..f71b0e889649f 100644 --- a/presto-main-base/src/test/java/com/facebook/presto/sql/planner/plan/TestWindowNode.java +++ b/presto-main-base/src/test/java/com/facebook/presto/sql/planner/plan/TestWindowNode.java @@ -16,9 +16,11 @@ import com.facebook.airlift.bootstrap.Bootstrap; import com.facebook.airlift.json.JsonCodec; import com.facebook.airlift.json.JsonModule; +import com.facebook.drift.codec.guice.ThriftCodecModule; import com.facebook.presto.common.block.SortOrder; import com.facebook.presto.common.type.Type; import com.facebook.presto.common.type.TypeManager; +import com.facebook.presto.connector.ConnectorManager; import com.facebook.presto.metadata.FunctionAndTypeManager; import com.facebook.presto.metadata.HandleJsonModule; import com.facebook.presto.server.SliceDeserializer; @@ -159,10 +161,12 @@ private JsonCodec getJsonCodec() SqlParser sqlParser = new SqlParser(); FunctionAndTypeManager functionAndTypeManager = createTestFunctionAndTypeManager(); binder.install(new JsonModule()); + binder.install(new ThriftCodecModule()); binder.install(new HandleJsonModule()); + configBinder(binder).bindConfig(FeaturesConfig.class); + binder.bind(ConnectorManager.class).toProvider(() -> null); binder.bind(SqlParser.class).toInstance(sqlParser); binder.bind(TypeManager.class).toInstance(functionAndTypeManager); - configBinder(binder).bindConfig(FeaturesConfig.class); newSetBinder(binder, Type.class); jsonBinder(binder).addSerializerBinding(Slice.class).to(SliceSerializer.class); jsonBinder(binder).addDeserializerBinding(Slice.class).to(SliceDeserializer.class); diff --git a/presto-main/src/test/java/com/facebook/presto/server/remotetask/TestHttpRemoteTask.java b/presto-main/src/test/java/com/facebook/presto/server/remotetask/TestHttpRemoteTask.java index 2de40f4fa2173..2fdeba6e84030 100644 --- a/presto-main/src/test/java/com/facebook/presto/server/remotetask/TestHttpRemoteTask.java +++ b/presto-main/src/test/java/com/facebook/presto/server/remotetask/TestHttpRemoteTask.java @@ -36,6 +36,7 @@ import com.facebook.presto.common.type.Type; import com.facebook.presto.common.type.TypeManager; import com.facebook.presto.connector.ConnectorCodecManager; +import com.facebook.presto.connector.ConnectorManager; import com.facebook.presto.execution.Lifespan; import com.facebook.presto.execution.NodeTaskMap; import com.facebook.presto.execution.QueryManagerConfig; @@ -367,6 +368,7 @@ private static HttpRemoteTaskFactory createHttpRemoteTaskFactory(TestingTaskReso @Override public void configure(Binder binder) { + binder.bind(ConnectorManager.class).toProvider(() -> null).in(Scopes.SINGLETON); binder.bind(JsonMapper.class); binder.bind(ThriftMapper.class); configBinder(binder).bindConfig(FeaturesConfig.class); @@ -462,7 +464,7 @@ private HttpRemoteTaskFactory createHttpRemoteTaskFactory( return injector.getInstance(HttpRemoteTaskFactory.class); } - private static void poll(BooleanSupplier success) + static void poll(BooleanSupplier success) throws InterruptedException { long failAt = System.nanoTime() + FAIL_TIMEOUT.roundTo(NANOSECONDS); @@ -476,7 +478,7 @@ private static void poll(BooleanSupplier success) } } - private static void waitUntilIdle(AtomicLong lastActivityNanos) + static void waitUntilIdle(AtomicLong lastActivityNanos) throws InterruptedException { long startTimeNanos = System.nanoTime(); @@ -496,7 +498,7 @@ private static void waitUntilIdle(AtomicLong lastActivityNanos) } } - private enum FailureScenario + enum FailureScenario { NO_FAILURE, TASK_MISMATCH, @@ -549,6 +551,7 @@ public synchronized TaskInfo getTaskInfo( } Map taskSourceMap = new HashMap<>(); + private TaskUpdateRequest lastTaskUpdateRequest; @POST @Path("{taskId}") @@ -559,6 +562,7 @@ public synchronized TaskInfo createOrUpdateTask( TaskUpdateRequest taskUpdateRequest, @Context UriInfo uriInfo) { + this.lastTaskUpdateRequest = taskUpdateRequest; for (TaskSource source : taskUpdateRequest.getSources()) { taskSourceMap.compute(source.getPlanNodeId(), (planNodeId, taskSource) -> taskSource == null ? source : taskSource.update(source)); } @@ -575,6 +579,11 @@ public synchronized TaskSource getTaskSource(PlanNodeId planNodeId) return new TaskSource(source.getPlanNodeId(), source.getSplits(), source.getNoMoreSplitsForLifespan(), source.isNoMoreSplits()); } + public synchronized TaskUpdateRequest getLastTaskUpdateRequest() + { + return lastTaskUpdateRequest; + } + @GET @Path("{taskId}/status") @Produces({MediaType.APPLICATION_JSON, APPLICATION_THRIFT_BINARY, APPLICATION_THRIFT_COMPACT, APPLICATION_THRIFT_FB_COMPACT}) diff --git a/presto-main/src/test/java/com/facebook/presto/server/remotetask/TestHttpRemoteTaskConnectorCodec.java b/presto-main/src/test/java/com/facebook/presto/server/remotetask/TestHttpRemoteTaskConnectorCodec.java new file mode 100644 index 0000000000000..19ccc01dfe34a --- /dev/null +++ b/presto-main/src/test/java/com/facebook/presto/server/remotetask/TestHttpRemoteTaskConnectorCodec.java @@ -0,0 +1,1400 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.server.remotetask; + +import com.facebook.airlift.bootstrap.Bootstrap; +import com.facebook.airlift.http.client.testing.TestingHttpClient; +import com.facebook.airlift.jaxrs.JsonMapper; +import com.facebook.airlift.jaxrs.testing.JaxrsTestingHttpProcessor; +import com.facebook.airlift.jaxrs.thrift.ThriftMapper; +import com.facebook.airlift.json.JsonCodec; +import com.facebook.airlift.json.JsonModule; +import com.facebook.airlift.json.smile.SmileCodec; +import com.facebook.airlift.json.smile.SmileModule; +import com.facebook.drift.codec.ThriftCodec; +import com.facebook.drift.codec.guice.ThriftCodecModule; +import com.facebook.drift.codec.utils.DataSizeToBytesThriftCodec; +import com.facebook.drift.codec.utils.DurationToMillisThriftCodec; +import com.facebook.drift.codec.utils.JodaDateTimeToEpochMillisThriftCodec; +import com.facebook.drift.codec.utils.LocaleToLanguageTagCodec; +import com.facebook.presto.SessionTestUtils; +import com.facebook.presto.client.NodeVersion; +import com.facebook.presto.common.predicate.TupleDomain; +import com.facebook.presto.common.type.Type; +import com.facebook.presto.common.type.TypeManager; +import com.facebook.presto.connector.ConnectorCodecManager; +import com.facebook.presto.cost.StatsAndCosts; +import com.facebook.presto.execution.Lifespan; +import com.facebook.presto.execution.NodeTaskMap; +import com.facebook.presto.execution.QueryManagerConfig; +import com.facebook.presto.execution.RemoteTask; +import com.facebook.presto.execution.ScheduledSplit; +import com.facebook.presto.execution.SchedulerStatsTracker; +import com.facebook.presto.execution.TaskId; +import com.facebook.presto.execution.TaskInfo; +import com.facebook.presto.execution.TaskManagerConfig; +import com.facebook.presto.execution.TaskSource; +import com.facebook.presto.execution.TaskStatus; +import com.facebook.presto.execution.TaskTestUtils; +import com.facebook.presto.execution.TestQueryManager; +import com.facebook.presto.execution.TestSqlTaskManager; +import com.facebook.presto.execution.buffer.OutputBuffers; +import com.facebook.presto.execution.scheduler.ExecutionWriterTarget; +import com.facebook.presto.execution.scheduler.TableWriteInfo; +import com.facebook.presto.metadata.ColumnHandleJacksonModule; +import com.facebook.presto.metadata.DeleteTableHandle; +import com.facebook.presto.metadata.DeleteTableHandleJacksonModule; +import com.facebook.presto.metadata.FunctionAndTypeManager; +import com.facebook.presto.metadata.FunctionHandleJacksonModule; +import com.facebook.presto.metadata.HandleResolver; +import com.facebook.presto.metadata.InsertTableHandle; +import com.facebook.presto.metadata.InsertTableHandleJacksonModule; +import com.facebook.presto.metadata.InternalNode; +import com.facebook.presto.metadata.OutputTableHandle; +import com.facebook.presto.metadata.OutputTableHandleJacksonModule; +import com.facebook.presto.metadata.PartitioningHandleJacksonModule; +import com.facebook.presto.metadata.Split; +import com.facebook.presto.metadata.SplitJacksonModule; +import com.facebook.presto.metadata.TableHandleJacksonModule; +import com.facebook.presto.metadata.TableLayoutHandleJacksonModule; +import com.facebook.presto.metadata.TransactionHandleJacksonModule; +import com.facebook.presto.server.InternalCommunicationConfig; +import com.facebook.presto.server.TaskUpdateRequest; +import com.facebook.presto.server.thrift.ConnectorSplitThriftCodec; +import com.facebook.presto.server.thrift.DeleteTableHandleThriftCodec; +import com.facebook.presto.server.thrift.InsertTableHandleThriftCodec; +import com.facebook.presto.server.thrift.OutputTableHandleThriftCodec; +import com.facebook.presto.server.thrift.TableHandleThriftCodec; +import com.facebook.presto.server.thrift.TableLayoutHandleThriftCodec; +import com.facebook.presto.server.thrift.TransactionHandleThriftCodec; +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ConnectorCodec; +import com.facebook.presto.spi.ConnectorDeleteTableHandle; +import com.facebook.presto.spi.ConnectorHandleResolver; +import com.facebook.presto.spi.ConnectorId; +import com.facebook.presto.spi.ConnectorIndexHandle; +import com.facebook.presto.spi.ConnectorInsertTableHandle; +import com.facebook.presto.spi.ConnectorOutputTableHandle; +import com.facebook.presto.spi.ConnectorSplit; +import com.facebook.presto.spi.ConnectorTableHandle; +import com.facebook.presto.spi.ConnectorTableLayoutHandle; +import com.facebook.presto.spi.HostAddress; +import com.facebook.presto.spi.NodeProvider; +import com.facebook.presto.spi.SchemaTableName; +import com.facebook.presto.spi.TableHandle; +import com.facebook.presto.spi.connector.ConnectorCodecProvider; +import com.facebook.presto.spi.connector.ConnectorPartitioningHandle; +import com.facebook.presto.spi.connector.ConnectorTransactionHandle; +import com.facebook.presto.spi.plan.Partitioning; +import com.facebook.presto.spi.plan.PartitioningScheme; +import com.facebook.presto.spi.plan.PlanFragmentId; +import com.facebook.presto.spi.plan.StageExecutionDescriptor; +import com.facebook.presto.spi.plan.TableScanNode; +import com.facebook.presto.spi.relation.VariableReferenceExpression; +import com.facebook.presto.spi.schedule.NodeSelectionStrategy; +import com.facebook.presto.sql.Serialization; +import com.facebook.presto.sql.analyzer.FeaturesConfig; +import com.facebook.presto.sql.planner.PlanFragment; +import com.facebook.presto.testing.TestingTransactionHandle; +import com.facebook.presto.type.TypeDeserializer; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableMultimap; +import com.google.common.collect.ImmutableSet; +import com.google.inject.Binder; +import com.google.inject.Injector; +import com.google.inject.Key; +import com.google.inject.Module; +import com.google.inject.Provides; +import com.google.inject.Scopes; +import com.google.inject.TypeLiteral; +import org.testng.annotations.Test; + +import java.net.URI; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; + +import static com.facebook.airlift.json.JsonBinder.jsonBinder; +import static com.facebook.airlift.json.JsonCodecBinder.jsonCodecBinder; +import static com.facebook.airlift.json.smile.SmileCodecBinder.smileCodecBinder; +import static com.facebook.drift.codec.guice.ThriftCodecBinder.thriftCodecBinder; +import static com.facebook.presto.execution.Lifespan.driverGroup; +import static com.facebook.presto.execution.TaskTestUtils.createPlanFragment; +import static com.facebook.presto.execution.buffer.OutputBuffers.createInitialEmptyOutputBuffers; +import static com.facebook.presto.metadata.FunctionAndTypeManager.createTestFunctionAndTypeManager; +import static com.facebook.presto.metadata.MetadataManager.createTestMetadataManager; +import static com.facebook.presto.server.remotetask.TestHttpRemoteTask.TestingTaskResource; +import static com.facebook.presto.spi.SplitContext.NON_CACHEABLE; +import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SINGLE_DISTRIBUTION; +import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SOURCE_DISTRIBUTION; +import static com.facebook.presto.testing.assertions.Assert.assertEquals; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.Iterables.getOnlyElement; +import static com.google.inject.multibindings.Multibinder.newSetBinder; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; + +public class TestHttpRemoteTaskConnectorCodec +{ + private static final TaskManagerConfig TASK_MANAGER_CONFIG = new TaskManagerConfig(); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + @Test(timeOut = 50000) + public void testConnectorSplitBinarySerialization() + throws Exception + { + AtomicLong lastActivityNanos = new AtomicLong(System.nanoTime()); + TestingTaskResource testingTaskResource = new TestingTaskResource(lastActivityNanos, TestHttpRemoteTask.FailureScenario.NO_FAILURE); + + String connectorName = "test-codec-split"; + Injector injector = createInjectorWithCodec(connectorName, testingTaskResource); + HttpRemoteTaskFactory httpRemoteTaskFactory = injector.getInstance(HttpRemoteTaskFactory.class); + JsonCodec jsonCodec = injector.getInstance(Key.get(new TypeLiteral<>() {})); + + RemoteTask remoteTask = createRemoteTask(httpRemoteTaskFactory); + try { + testingTaskResource.setInitialTaskInfo(remoteTask.getTaskInfo()); + remoteTask.start(); + + Lifespan lifespan = driverGroup(1); + TestConnectorWithCodecSplit codecSplit = new TestConnectorWithCodecSplit("test-data", 42); + remoteTask.addSplits(ImmutableMultimap.of( + TaskTestUtils.TABLE_SCAN_NODE_ID, + new Split(new ConnectorId(connectorName), TestingTransactionHandle.create(), codecSplit, lifespan, NON_CACHEABLE))); + + TestHttpRemoteTask.poll(() -> testingTaskResource.getTaskSource(TaskTestUtils.TABLE_SCAN_NODE_ID) != null); + TestHttpRemoteTask.poll(() -> testingTaskResource.getTaskSource(TaskTestUtils.TABLE_SCAN_NODE_ID).getSplits().size() == 1); + + TaskUpdateRequest taskUpdateRequest = testingTaskResource.getLastTaskUpdateRequest(); + assertNotNull(taskUpdateRequest, "TaskUpdateRequest should not be null"); + + String json = jsonCodec.toJson(taskUpdateRequest); + JsonNode root = OBJECT_MAPPER.readTree(json); + JsonNode splitNode = root.at("/sources/0/splits/0/split/connectorSplit"); + assertTrue(splitNode.has("customSerializedValue"), + "Split should have customSerializedValue for binary serialization"); + assertFalse(splitNode.has("data"), + "Split should not have inline data field"); + + TaskUpdateRequest deserializedRequest = jsonCodec.fromJson(json); + TaskSource deserializedSource = deserializedRequest.getSources().get(0); + Split deserializedSplit = getOnlyElement(deserializedSource.getSplits()).getSplit(); + ConnectorSplit deserializedConnectorSplit = deserializedSplit.getConnectorSplit(); + assertEquals(deserializedConnectorSplit, codecSplit, "Expected deserialized split to match original"); + } + finally { + remoteTask.cancel(); + httpRemoteTaskFactory.stop(); + } + } + + @Test(timeOut = 50000) + public void testOutputTableHandleBinarySerialization() + throws Exception + { + AtomicLong lastActivityNanos = new AtomicLong(System.nanoTime()); + TestingTaskResource testingTaskResource = new TestingTaskResource(lastActivityNanos, TestHttpRemoteTask.FailureScenario.NO_FAILURE); + + String connectorName = "test-codec-output"; + Injector injector = createInjectorWithCodec(connectorName, testingTaskResource); + HttpRemoteTaskFactory httpRemoteTaskFactory = injector.getInstance(HttpRemoteTaskFactory.class); + JsonCodec taskUpdateRequestCodec = injector.getInstance(Key.get(new TypeLiteral<>() {})); + + ConnectorId connectorId = new ConnectorId(connectorName); + TestConnectorOutputTableHandle outputHandle = new TestConnectorOutputTableHandle("output_table"); + TableWriteInfo outputTableWriteInfo = new TableWriteInfo( + Optional.of(new ExecutionWriterTarget.CreateHandle( + new OutputTableHandle(connectorId, com.facebook.presto.testing.TestingTransactionHandle.create(), outputHandle), + new SchemaTableName("test_schema", "output_table"))), + Optional.empty()); + + RemoteTask outputTask = createRemoteTask(httpRemoteTaskFactory, createPlanFragment(), outputTableWriteInfo); + try { + testingTaskResource.setInitialTaskInfo(outputTask.getTaskInfo()); + outputTask.start(); + + TestHttpRemoteTask.poll(() -> testingTaskResource.getLastTaskUpdateRequest() != null); + TaskUpdateRequest outputRequest = testingTaskResource.getLastTaskUpdateRequest(); + String outputJson = taskUpdateRequestCodec.toJson(outputRequest); + + JsonNode root = OBJECT_MAPPER.readTree(outputJson); + JsonNode outputTableHandleNode = root.at("/tableWriteInfo/writerTarget/handle/connectorHandle"); + assertTrue(outputTableHandleNode.has("customSerializedValue"), + "OutputTableHandle should have customSerializedValue for binary serialization"); + assertFalse(outputTableHandleNode.has("tableName"), + "OutputTableHandle should not have inline tableName field"); + + ExecutionWriterTarget.CreateHandle createHandle = (ExecutionWriterTarget.CreateHandle) outputRequest.getTableWriteInfo().get().getWriterTarget().get(); + TestConnectorOutputTableHandle receivedHandle = (TestConnectorOutputTableHandle) createHandle.getHandle().getConnectorHandle(); + assertEquals(receivedHandle.getTableName(), outputHandle.getTableName(), "OutputTableHandle should match after round-trip"); + } + finally { + outputTask.cancel(); + httpRemoteTaskFactory.stop(); + } + } + + @Test(timeOut = 50000) + public void testInsertTableHandleBinarySerialization() + throws Exception + { + AtomicLong lastActivityNanos = new AtomicLong(System.nanoTime()); + TestingTaskResource testingTaskResource = new TestingTaskResource(lastActivityNanos, TestHttpRemoteTask.FailureScenario.NO_FAILURE); + + String connectorName = "test-codec-insert"; + Injector injector = createInjectorWithCodec(connectorName, testingTaskResource); + HttpRemoteTaskFactory httpRemoteTaskFactory = injector.getInstance(HttpRemoteTaskFactory.class); + JsonCodec taskUpdateRequestCodec = injector.getInstance(Key.get(new TypeLiteral<>() {})); + + ConnectorId connectorId = new ConnectorId(connectorName); + TestConnectorInsertTableHandle insertHandle = new TestConnectorInsertTableHandle("insert_table"); + TableWriteInfo insertTableWriteInfo = new TableWriteInfo( + Optional.of(new ExecutionWriterTarget.InsertHandle( + new InsertTableHandle(connectorId, com.facebook.presto.testing.TestingTransactionHandle.create(), insertHandle), + new SchemaTableName("test_schema", "insert_table"))), + Optional.empty()); + + RemoteTask insertTask = createRemoteTask(httpRemoteTaskFactory, createPlanFragment(), insertTableWriteInfo); + try { + testingTaskResource.setInitialTaskInfo(insertTask.getTaskInfo()); + insertTask.start(); + + TestHttpRemoteTask.poll(() -> testingTaskResource.getLastTaskUpdateRequest() != null); + TaskUpdateRequest insertRequest = testingTaskResource.getLastTaskUpdateRequest(); + String insertJson = taskUpdateRequestCodec.toJson(insertRequest); + + JsonNode root = OBJECT_MAPPER.readTree(insertJson); + JsonNode insertTableHandleNode = root.at("/tableWriteInfo/writerTarget/handle/connectorHandle"); + assertTrue(insertTableHandleNode.has("customSerializedValue"), + "InsertTableHandle should have customSerializedValue for binary serialization"); + assertFalse(insertTableHandleNode.has("tableName"), + "InsertTableHandle should not have inline tableName field"); + + ExecutionWriterTarget.InsertHandle deserializedInsertHandle = (ExecutionWriterTarget.InsertHandle) insertRequest.getTableWriteInfo().get().getWriterTarget().get(); + TestConnectorInsertTableHandle receivedHandle = (TestConnectorInsertTableHandle) deserializedInsertHandle.getHandle().getConnectorHandle(); + assertEquals(receivedHandle.getTableName(), insertHandle.getTableName(), "InsertTableHandle should match after round-trip"); + } + finally { + insertTask.cancel(); + httpRemoteTaskFactory.stop(); + } + } + + @Test(timeOut = 50000) + public void testDeleteTableHandleBinarySerialization() + throws Exception + { + AtomicLong lastActivityNanos = new AtomicLong(System.nanoTime()); + TestingTaskResource testingTaskResource = new TestingTaskResource(lastActivityNanos, TestHttpRemoteTask.FailureScenario.NO_FAILURE); + + String connectorName = "test-codec-delete"; + Injector injector = createInjectorWithCodec(connectorName, testingTaskResource); + HttpRemoteTaskFactory httpRemoteTaskFactory = injector.getInstance(HttpRemoteTaskFactory.class); + JsonCodec taskUpdateRequestCodec = injector.getInstance(Key.get(new TypeLiteral<>() {})); + + ConnectorId connectorId = new ConnectorId(connectorName); + TestConnectorDeleteTableHandle deleteHandle = new TestConnectorDeleteTableHandle("delete_table"); + TableWriteInfo deleteTableWriteInfo = new TableWriteInfo( + Optional.of(new ExecutionWriterTarget.DeleteHandle( + new DeleteTableHandle(connectorId, com.facebook.presto.testing.TestingTransactionHandle.create(), deleteHandle), + new SchemaTableName("test_schema", "delete_table"))), + Optional.empty()); + + RemoteTask deleteTask = createRemoteTask(httpRemoteTaskFactory, createPlanFragment(), deleteTableWriteInfo); + try { + testingTaskResource.setInitialTaskInfo(deleteTask.getTaskInfo()); + deleteTask.start(); + + TestHttpRemoteTask.poll(() -> testingTaskResource.getLastTaskUpdateRequest() != null); + TaskUpdateRequest deleteRequest = testingTaskResource.getLastTaskUpdateRequest(); + String deleteJson = taskUpdateRequestCodec.toJson(deleteRequest); + + JsonNode root = OBJECT_MAPPER.readTree(deleteJson); + JsonNode deleteTableHandleNode = root.at("/tableWriteInfo/writerTarget/handle/connectorHandle"); + assertTrue(deleteTableHandleNode.has("customSerializedValue"), + "DeleteTableHandle should have customSerializedValue for binary serialization"); + assertFalse(deleteTableHandleNode.has("tableName"), + "DeleteTableHandle should not have inline tableName field"); + + ExecutionWriterTarget.DeleteHandle deserializedDeleteHandle = (ExecutionWriterTarget.DeleteHandle) deleteRequest.getTableWriteInfo().get().getWriterTarget().get(); + TestConnectorDeleteTableHandle receivedHandle = (TestConnectorDeleteTableHandle) deserializedDeleteHandle.getHandle().getConnectorHandle(); + assertEquals(receivedHandle.getTableName(), deleteHandle.getTableName(), "DeleteTableHandle should match after round-trip"); + } + finally { + deleteTask.cancel(); + httpRemoteTaskFactory.stop(); + } + } + + @Test(timeOut = 50000) + public void testConnectorHandlesBinarySerialization() + throws Exception + { + AtomicLong lastActivityNanos = new AtomicLong(System.nanoTime()); + TestingTaskResource testingTaskResource = new TestingTaskResource(lastActivityNanos, TestHttpRemoteTask.FailureScenario.NO_FAILURE); + + String connectorName = "test-codec-handles"; + Injector injector = createInjectorWithCodec(connectorName, testingTaskResource); + HttpRemoteTaskFactory httpRemoteTaskFactory = injector.getInstance(HttpRemoteTaskFactory.class); + JsonCodec planFragmentCodec = injector.getInstance(Key.get(new TypeLiteral<>() {})); + + PlanFragment planFragment = createPlanFragmentWithCodecHandles(connectorName); + RemoteTask remoteTask = createRemoteTask(httpRemoteTaskFactory, planFragment); + + try { + testingTaskResource.setInitialTaskInfo(remoteTask.getTaskInfo()); + remoteTask.start(); + + TestHttpRemoteTask.poll(() -> testingTaskResource.getLastTaskUpdateRequest() != null); + TestHttpRemoteTask.poll(() -> testingTaskResource.getLastTaskUpdateRequest().getFragment().isPresent()); + + TaskUpdateRequest taskUpdateRequest = testingTaskResource.getLastTaskUpdateRequest(); + byte[] fragmentBytes = taskUpdateRequest.getFragment().get(); + String json = new String(fragmentBytes, UTF_8); + JsonNode root = OBJECT_MAPPER.readTree(json); + + JsonNode tableHandleNode = root.at("/root/table/connectorHandle"); + assertTrue(tableHandleNode.has("customSerializedValue"), + "TableHandle should have customSerializedValue for binary serialization"); + assertFalse(tableHandleNode.has("tableName"), + "TableHandle should not have inline tableName field"); + + JsonNode layoutHandleNode = root.at("/root/table/connectorTableLayout"); + assertTrue(layoutHandleNode.has("customSerializedValue"), + "TableLayoutHandle should have customSerializedValue for binary serialization"); + assertFalse(layoutHandleNode.has("layoutName"), + "TableLayoutHandle should not have inline layoutName field"); + + JsonNode assignmentsNode = root.at("/root/assignments"); + assertTrue(assignmentsNode.isObject() && assignmentsNode.size() > 0, + "Should have at least one column assignment"); + assignmentsNode.fields().forEachRemaining(entry -> { + JsonNode columnHandleNode = entry.getValue(); + assertTrue(columnHandleNode.has("customSerializedValue"), + "ColumnHandle should have customSerializedValue for binary serialization"); + assertFalse(columnHandleNode.has("columnName"), + "ColumnHandle should not have inline columnName field"); + assertFalse(columnHandleNode.has("columnType"), + "ColumnHandle should not have inline columnType field"); + }); + + PlanFragment receivedFragment = planFragmentCodec.fromJson(json); + assertNotNull(receivedFragment, "Deserialized PlanFragment should not be null"); + assertNotNull(receivedFragment.getRoot(), "Deserialized PlanFragment should have a root node"); + + TableScanNode originalScan = (TableScanNode) planFragment.getRoot(); + TableScanNode receivedScan = (TableScanNode) receivedFragment.getRoot(); + + TestConnectorTableHandle originalTableHandle = (TestConnectorTableHandle) originalScan.getTable().getConnectorHandle(); + TestConnectorTableHandle receivedTableHandle = (TestConnectorTableHandle) receivedScan.getTable().getConnectorHandle(); + assertEquals(receivedTableHandle.getTableName(), originalTableHandle.getTableName(), "TableHandle should match after round-trip"); + + TestConnectorTableLayoutHandle originalLayoutHandle = (TestConnectorTableLayoutHandle) originalScan.getTable().getLayout().get(); + TestConnectorTableLayoutHandle receivedLayoutHandle = (TestConnectorTableLayoutHandle) receivedScan.getTable().getLayout().get(); + assertEquals(receivedLayoutHandle.getLayoutName(), originalLayoutHandle.getLayoutName(), "TableLayoutHandle should match after round-trip"); + + TestConnectorColumnHandle originalColumnHandle = (TestConnectorColumnHandle) originalScan.getAssignments().values().iterator().next(); + TestConnectorColumnHandle receivedColumnHandle = (TestConnectorColumnHandle) receivedScan.getAssignments().values().iterator().next(); + assertEquals(receivedColumnHandle.getColumnName(), originalColumnHandle.getColumnName(), "ColumnHandle name should match after round-trip"); + assertEquals(receivedColumnHandle.getColumnType(), originalColumnHandle.getColumnType(), "ColumnHandle type should match after round-trip"); + } + finally { + remoteTask.cancel(); + httpRemoteTaskFactory.stop(); + } + } + + @Test(timeOut = 50000) + public void testMixedConnectorSerializationWithAndWithoutCodec() + throws Exception + { + AtomicLong lastActivityNanos = new AtomicLong(System.nanoTime()); + TestingTaskResource testingTaskResource = new TestingTaskResource(lastActivityNanos, TestHttpRemoteTask.FailureScenario.NO_FAILURE); + + String connectorWithCodec = "test-with-codec"; + String connectorWithoutCodec = "test-without-codec"; + Injector injector = createInjectorWithMixedConnectors(connectorWithCodec, connectorWithoutCodec, testingTaskResource); + HttpRemoteTaskFactory httpRemoteTaskFactory = injector.getInstance(HttpRemoteTaskFactory.class); + JsonCodec jsonCodec = injector.getInstance(Key.get(new TypeLiteral<>() {})); + + RemoteTask remoteTask = createRemoteTask(httpRemoteTaskFactory); + try { + testingTaskResource.setInitialTaskInfo(remoteTask.getTaskInfo()); + remoteTask.start(); + + Lifespan lifespan = driverGroup(1); + + TestConnectorWithCodecSplit splitWithCodec = new TestConnectorWithCodecSplit("codec-data", 100); + TestConnectorWithoutCodecSplit splitWithoutCodec = new TestConnectorWithoutCodecSplit("json-data", 200); + + remoteTask.addSplits(ImmutableMultimap.of( + TaskTestUtils.TABLE_SCAN_NODE_ID, + new Split(new ConnectorId(connectorWithCodec), TestingTransactionHandle.create(), splitWithCodec, lifespan, NON_CACHEABLE), + TaskTestUtils.TABLE_SCAN_NODE_ID, + new Split(new ConnectorId(connectorWithoutCodec), TestingTransactionHandle.create(), splitWithoutCodec, lifespan, NON_CACHEABLE))); + + TestHttpRemoteTask.poll(() -> testingTaskResource.getTaskSource(TaskTestUtils.TABLE_SCAN_NODE_ID) != null); + TestHttpRemoteTask.poll(() -> testingTaskResource.getTaskSource(TaskTestUtils.TABLE_SCAN_NODE_ID).getSplits().size() == 2); + + TaskUpdateRequest taskUpdateRequest = testingTaskResource.getLastTaskUpdateRequest(); + assertNotNull(taskUpdateRequest, "TaskUpdateRequest should not be null"); + + String json = jsonCodec.toJson(taskUpdateRequest); + JsonNode root = OBJECT_MAPPER.readTree(json); + JsonNode splitsNode = root.at("/sources/0/splits"); + + assertTrue(splitsNode.isArray() && splitsNode.size() == 2, + "Should have exactly 2 splits"); + + JsonNode codecSplitNode = null; + JsonNode jsonSplitNode = null; + + for (JsonNode splitWrapper : splitsNode) { + JsonNode connectorIdNode = splitWrapper.at("/split/connectorId"); + String catalogName = connectorIdNode.asText(); + JsonNode connectorSplitNode = splitWrapper.at("/split/connectorSplit"); + + if (connectorWithCodec.equals(catalogName)) { + codecSplitNode = connectorSplitNode; + } + else if (connectorWithoutCodec.equals(catalogName)) { + jsonSplitNode = connectorSplitNode; + } + } + + assertNotNull(codecSplitNode, "Should find split from connector with codec"); + assertNotNull(jsonSplitNode, "Should find split from connector without codec"); + + assertTrue(codecSplitNode.has("customSerializedValue"), + "Split with codec should have customSerializedValue for binary serialization"); + assertFalse(codecSplitNode.has("data"), + "Split with codec should not have inline data field"); + + assertFalse(jsonSplitNode.has("customSerializedValue"), + "Split without codec should not have customSerializedValue"); + assertTrue(jsonSplitNode.has("data"), + "Split without codec should have inline data field for JSON serialization"); + assertTrue(jsonSplitNode.has("sequence"), + "Split without codec should have inline sequence field for JSON serialization"); + + TaskUpdateRequest deserializedRequest = jsonCodec.fromJson(json); + TaskSource deserializedSource = deserializedRequest.getSources().get(0); + List deserializedSplits = deserializedSource.getSplits().stream() + .map(ScheduledSplit::getSplit) + .collect(toImmutableList()); + + assertEquals(deserializedSplits.size(), 2, "Should have 2 deserialized splits"); + + boolean foundCodecSplit = false; + boolean foundJsonSplit = false; + + for (Split split : deserializedSplits) { + if (split.getConnectorSplit() instanceof TestConnectorWithCodecSplit) { + TestConnectorWithCodecSplit deserialized = (TestConnectorWithCodecSplit) split.getConnectorSplit(); + assertEquals(deserialized, splitWithCodec, "Codec split should match after round-trip"); + foundCodecSplit = true; + } + else if (split.getConnectorSplit() instanceof TestConnectorWithoutCodecSplit) { + TestConnectorWithoutCodecSplit deserialized = (TestConnectorWithoutCodecSplit) split.getConnectorSplit(); + assertEquals(deserialized, splitWithoutCodec, "JSON split should match after round-trip"); + foundJsonSplit = true; + } + } + + assertTrue(foundCodecSplit, "Should have found and verified the codec split"); + assertTrue(foundJsonSplit, "Should have found and verified the JSON split"); + } + finally { + remoteTask.cancel(); + httpRemoteTaskFactory.stop(); + } + } + + private static RemoteTask createRemoteTask(HttpRemoteTaskFactory httpRemoteTaskFactory) + { + return createRemoteTask(httpRemoteTaskFactory, createPlanFragment()); + } + + private static RemoteTask createRemoteTask(HttpRemoteTaskFactory httpRemoteTaskFactory, PlanFragment planFragment) + { + return createRemoteTask(httpRemoteTaskFactory, planFragment, new TableWriteInfo(Optional.empty(), Optional.empty())); + } + + private static RemoteTask createRemoteTask(HttpRemoteTaskFactory httpRemoteTaskFactory, PlanFragment planFragment, TableWriteInfo tableWriteInfo) + { + return httpRemoteTaskFactory.createRemoteTask( + SessionTestUtils.TEST_SESSION, + new TaskId("test", 1, 0, 2, 0), + new InternalNode("node-id", URI.create("http://fake.invalid/"), new NodeVersion("version"), false), + planFragment, + ImmutableMultimap.of(), + createInitialEmptyOutputBuffers(OutputBuffers.BufferType.BROADCAST), + new NodeTaskMap.NodeStatsTracker(i -> {}, i -> {}, (age, i) -> {}), + true, + tableWriteInfo, + SchedulerStatsTracker.NOOP); + } + + private static PlanFragment createPlanFragmentWithCodecHandles(String connectorName) + { + ConnectorId connectorId = new ConnectorId(connectorName); + TestConnectorTableHandle tableHandle = new TestConnectorTableHandle("test_table"); + TestConnectorTableLayoutHandle layoutHandle = new TestConnectorTableLayoutHandle("test_layout"); + TestConnectorColumnHandle columnHandle = new TestConnectorColumnHandle("test_column", "VARCHAR"); + VariableReferenceExpression variable = new VariableReferenceExpression(Optional.empty(), "test_column", com.facebook.presto.common.type.VarcharType.VARCHAR); + + return new PlanFragment( + new PlanFragmentId(0), + new TableScanNode( + Optional.empty(), + TaskTestUtils.TABLE_SCAN_NODE_ID, + new TableHandle(connectorId, tableHandle, TestingTransactionHandle.create(), Optional.of(layoutHandle)), + ImmutableList.of(variable), + ImmutableMap.of(variable, columnHandle), + TupleDomain.all(), + TupleDomain.all(), + Optional.empty()), + ImmutableSet.of(variable), + SOURCE_DISTRIBUTION, + ImmutableList.of(TaskTestUtils.TABLE_SCAN_NODE_ID), + new PartitioningScheme(Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()), ImmutableList.of(variable)) + .withBucketToPartition(Optional.of(new int[1])), + StageExecutionDescriptor.ungroupedExecution(), + false, + Optional.of(StatsAndCosts.empty()), + Optional.empty()); + } + + private static Injector createInjectorWithCodec(String connectorName, TestingTaskResource testingTaskResource) + throws Exception + { + return createInjectorWithMixedConnectors(connectorName, "unused-connector", testingTaskResource); + } + + private static Injector createInjectorWithMixedConnectors( + String connectorWithCodec, + String connectorWithoutCodec, + TestingTaskResource testingTaskResource) + throws Exception + { + InternalCommunicationConfig internalCommunicationConfig = new InternalCommunicationConfig().setThriftTransportEnabled(false); + Bootstrap app = new Bootstrap( + new JsonModule(), + new SmileModule(), + new ThriftCodecModule(), + new Module() + { + @Override + public void configure(Binder binder) + { + binder.bind(JsonMapper.class); + binder.bind(ThriftMapper.class); + + FeaturesConfig featuresConfig = new FeaturesConfig(); + featuresConfig.setUseConnectorProvidedSerializationCodecs(true); + binder.bind(FeaturesConfig.class).toInstance(featuresConfig); + + TestConnectorWithCodecProvider codecProvider = new TestConnectorWithCodecProvider(); + + Map> splitCodecMap = new ConcurrentHashMap<>(); + splitCodecMap.put(connectorWithCodec, codecProvider.getConnectorSplitCodec().get()); + + Map> tableHandleCodecMap = new ConcurrentHashMap<>(); + tableHandleCodecMap.put(connectorWithCodec, codecProvider.getConnectorTableHandleCodec().get()); + + Map> columnHandleCodecMap = new ConcurrentHashMap<>(); + columnHandleCodecMap.put(connectorWithCodec, codecProvider.getConnectorColumnHandleCodec().get()); + + Map> tableLayoutHandleCodecMap = new ConcurrentHashMap<>(); + tableLayoutHandleCodecMap.put(connectorWithCodec, codecProvider.getConnectorTableLayoutHandleCodec().get()); + + Map> outputTableHandleCodecMap = new ConcurrentHashMap<>(); + outputTableHandleCodecMap.put(connectorWithCodec, codecProvider.getConnectorOutputTableHandleCodec().get()); + + Map> insertTableHandleCodecMap = new ConcurrentHashMap<>(); + insertTableHandleCodecMap.put(connectorWithCodec, codecProvider.getConnectorInsertTableHandleCodec().get()); + + Map> deleteTableHandleCodecMap = new ConcurrentHashMap<>(); + deleteTableHandleCodecMap.put(connectorWithCodec, codecProvider.getConnectorDeleteTableHandleCodec().get()); + + HandleResolver handleResolver = new HandleResolver(); + handleResolver.addConnectorName(connectorWithCodec, new TestConnectorWithCodecHandleResolver()); + handleResolver.addConnectorName(connectorWithoutCodec, new TestConnectorWithoutCodecHandleResolver()); + binder.bind(HandleResolver.class).toInstance(handleResolver); + + Function>> tableHandleCodecExtractor = + connectorId -> Optional.ofNullable(tableHandleCodecMap.get(connectorId.getCatalogName())); + Function>> tableLayoutHandleCodecExtractor = + connectorId -> Optional.ofNullable(tableLayoutHandleCodecMap.get(connectorId.getCatalogName())); + Function>> columnHandleCodecExtractor = + connectorId -> Optional.ofNullable(columnHandleCodecMap.get(connectorId.getCatalogName())); + Function>> outputTableHandleCodecExtractor = + connectorId -> Optional.ofNullable(outputTableHandleCodecMap.get(connectorId.getCatalogName())); + Function>> insertTableHandleCodecExtractor = + connectorId -> Optional.ofNullable(insertTableHandleCodecMap.get(connectorId.getCatalogName())); + Function>> deleteTableHandleCodecExtractor = + connectorId -> Optional.ofNullable(deleteTableHandleCodecMap.get(connectorId.getCatalogName())); + Function>> noOpIndexCodec = + connectorId -> Optional.empty(); + Function>> noOpTransactionCodec = + connectorId -> Optional.empty(); + Function>> noOpPartitioningCodec = + connectorId -> Optional.empty(); + Function>> splitCodecExtractor = + connectorId -> Optional.ofNullable(splitCodecMap.get(connectorId.getCatalogName())); + + jsonBinder(binder).addModuleBinding().toInstance(new TableHandleJacksonModule(handleResolver, featuresConfig, tableHandleCodecExtractor)); + jsonBinder(binder).addModuleBinding().toInstance(new TableLayoutHandleJacksonModule(handleResolver, featuresConfig, tableLayoutHandleCodecExtractor)); + jsonBinder(binder).addModuleBinding().toInstance(new ColumnHandleJacksonModule(handleResolver, featuresConfig, columnHandleCodecExtractor)); + jsonBinder(binder).addModuleBinding().toInstance(new OutputTableHandleJacksonModule(handleResolver, featuresConfig, outputTableHandleCodecExtractor)); + jsonBinder(binder).addModuleBinding().toInstance(new InsertTableHandleJacksonModule(handleResolver, featuresConfig, insertTableHandleCodecExtractor)); + jsonBinder(binder).addModuleBinding().toInstance(new DeleteTableHandleJacksonModule(handleResolver, featuresConfig, deleteTableHandleCodecExtractor)); + jsonBinder(binder).addModuleBinding().toInstance(new com.facebook.presto.index.IndexHandleJacksonModule(handleResolver, featuresConfig, noOpIndexCodec)); + jsonBinder(binder).addModuleBinding().toInstance(new TransactionHandleJacksonModule(handleResolver, featuresConfig, noOpTransactionCodec)); + jsonBinder(binder).addModuleBinding().toInstance(new PartitioningHandleJacksonModule(handleResolver, featuresConfig, noOpPartitioningCodec)); + jsonBinder(binder).addModuleBinding().toInstance(new FunctionHandleJacksonModule(handleResolver)); + jsonBinder(binder).addModuleBinding().toInstance(new SplitJacksonModule(handleResolver, featuresConfig, splitCodecExtractor)); + + FunctionAndTypeManager functionAndTypeManager = createTestFunctionAndTypeManager(); + binder.bind(TypeManager.class).toInstance(functionAndTypeManager); + jsonBinder(binder).addDeserializerBinding(Type.class).to(TypeDeserializer.class); + newSetBinder(binder, Type.class); + smileCodecBinder(binder).bindSmileCodec(TaskStatus.class); + smileCodecBinder(binder).bindSmileCodec(TaskInfo.class); + smileCodecBinder(binder).bindSmileCodec(TaskUpdateRequest.class); + smileCodecBinder(binder).bindSmileCodec(PlanFragment.class); + jsonCodecBinder(binder).bindJsonCodec(TaskStatus.class); + jsonCodecBinder(binder).bindJsonCodec(TaskInfo.class); + jsonCodecBinder(binder).bindJsonCodec(TaskUpdateRequest.class); + jsonCodecBinder(binder).bindJsonCodec(PlanFragment.class); + jsonCodecBinder(binder).bindJsonCodec(TableWriteInfo.class); + jsonBinder(binder).addKeySerializerBinding(VariableReferenceExpression.class).to(Serialization.VariableReferenceExpressionSerializer.class); + jsonBinder(binder).addKeyDeserializerBinding(VariableReferenceExpression.class).to(Serialization.VariableReferenceExpressionDeserializer.class); + jsonCodecBinder(binder).bindJsonCodec(ConnectorSplit.class); + jsonCodecBinder(binder).bindJsonCodec(ConnectorTransactionHandle.class); + jsonCodecBinder(binder).bindJsonCodec(ColumnHandle.class); + jsonCodecBinder(binder).bindJsonCodec(ConnectorTableHandle.class); + jsonCodecBinder(binder).bindJsonCodec(ConnectorOutputTableHandle.class); + jsonCodecBinder(binder).bindJsonCodec(ConnectorDeleteTableHandle.class); + jsonCodecBinder(binder).bindJsonCodec(ConnectorInsertTableHandle.class); + jsonCodecBinder(binder).bindJsonCodec(ConnectorTableLayoutHandle.class); + + binder.bind(ConnectorCodecManager.class).in(Scopes.SINGLETON); + + thriftCodecBinder(binder).bindCustomThriftCodec(ConnectorSplitThriftCodec.class); + thriftCodecBinder(binder).bindCustomThriftCodec(TransactionHandleThriftCodec.class); + thriftCodecBinder(binder).bindCustomThriftCodec(OutputTableHandleThriftCodec.class); + thriftCodecBinder(binder).bindCustomThriftCodec(InsertTableHandleThriftCodec.class); + thriftCodecBinder(binder).bindCustomThriftCodec(DeleteTableHandleThriftCodec.class); + thriftCodecBinder(binder).bindCustomThriftCodec(TableHandleThriftCodec.class); + thriftCodecBinder(binder).bindCustomThriftCodec(TableLayoutHandleThriftCodec.class); + thriftCodecBinder(binder).bindThriftCodec(TaskStatus.class); + thriftCodecBinder(binder).bindThriftCodec(TaskInfo.class); + thriftCodecBinder(binder).bindThriftCodec(TaskUpdateRequest.class); + thriftCodecBinder(binder).bindCustomThriftCodec(LocaleToLanguageTagCodec.class); + thriftCodecBinder(binder).bindCustomThriftCodec(JodaDateTimeToEpochMillisThriftCodec.class); + thriftCodecBinder(binder).bindCustomThriftCodec(DurationToMillisThriftCodec.class); + thriftCodecBinder(binder).bindCustomThriftCodec(DataSizeToBytesThriftCodec.class); + } + + @Provides + private HttpRemoteTaskFactory createHttpRemoteTaskFactory( + JsonMapper jsonMapper, + ThriftMapper thriftMapper, + JsonCodec taskStatusJsonCodec, + SmileCodec taskStatusSmileCodec, + ThriftCodec taskStatusThriftCodec, + JsonCodec taskInfoJsonCodec, + ThriftCodec taskInfoThriftCodec, + SmileCodec taskInfoSmileCodec, + JsonCodec taskUpdateRequestJsonCodec, + SmileCodec taskUpdateRequestSmileCodec, + ThriftCodec taskUpdateRequestThriftCodec, + JsonCodec planFragmentJsonCodec, + SmileCodec planFragmentSmileCodec) + { + JaxrsTestingHttpProcessor jaxrsTestingHttpProcessor = new JaxrsTestingHttpProcessor(URI.create("http://fake.invalid/"), testingTaskResource, jsonMapper, thriftMapper); + TestingHttpClient testingHttpClient = new TestingHttpClient(jaxrsTestingHttpProcessor.setTrace(false)); + testingTaskResource.setHttpClient(testingHttpClient); + return new HttpRemoteTaskFactory( + new QueryManagerConfig(), + TASK_MANAGER_CONFIG, + testingHttpClient, + new TestSqlTaskManager.MockLocationFactory(), + taskStatusJsonCodec, + taskStatusSmileCodec, + taskStatusThriftCodec, + taskInfoJsonCodec, + taskInfoSmileCodec, + taskInfoThriftCodec, + taskUpdateRequestJsonCodec, + taskUpdateRequestSmileCodec, + taskUpdateRequestThriftCodec, + planFragmentJsonCodec, + planFragmentSmileCodec, + new RemoteTaskStats(), + internalCommunicationConfig, + createTestMetadataManager(), + new TestQueryManager(), + new HandleResolver()); + } + }); + Injector injector = app + .doNotInitializeLogging() + .quiet() + .initialize(); + HandleResolver handleResolver = injector.getInstance(HandleResolver.class); + handleResolver.addConnectorName("test", new com.facebook.presto.testing.TestingHandleResolver()); + + ConnectorCodecManager codecManager = injector.getInstance(ConnectorCodecManager.class); + codecManager.addConnectorCodecProvider(new ConnectorId(connectorWithCodec), new TestConnectorWithCodecProvider()); + + return injector; + } + + /** + * Test connector split that supports binary serialization via codec + */ + public static class TestConnectorWithCodecSplit + implements ConnectorSplit + { + private final String data; + private final int sequence; + + @JsonCreator + public TestConnectorWithCodecSplit( + @JsonProperty("data") String data, + @JsonProperty("sequence") int sequence) + { + this.data = data; + this.sequence = sequence; + } + + @JsonProperty + public String getData() + { + return data; + } + + @JsonProperty + public int getSequence() + { + return sequence; + } + + @Override + public NodeSelectionStrategy getNodeSelectionStrategy() + { + return NodeSelectionStrategy.NO_PREFERENCE; + } + + @Override + public List getPreferredNodes(NodeProvider nodeProvider) + { + return ImmutableList.of(); + } + + @Override + public Object getInfo() + { + return ImmutableMap.of("data", data, "sequence", sequence); + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + TestConnectorWithCodecSplit that = (TestConnectorWithCodecSplit) obj; + return sequence == that.sequence && Objects.equals(data, that.data); + } + + @Override + public int hashCode() + { + return Objects.hash(data, sequence); + } + } + + /** + * Test connector codec provider that provides binary serialization + */ + public static class TestConnectorWithCodecProvider + implements ConnectorCodecProvider + { + @Override + public Optional> getConnectorSplitCodec() + { + return Optional.of(new ConnectorCodec<>() + { + @Override + public byte[] serialize(ConnectorSplit split) + { + TestConnectorWithCodecSplit codecSplit = (TestConnectorWithCodecSplit) split; + return (codecSplit.getData() + "|" + codecSplit.getSequence()).getBytes(UTF_8); + } + + @Override + public ConnectorSplit deserialize(byte[] data) + { + String[] parts = new String(data, UTF_8).split("\\|"); + return new TestConnectorWithCodecSplit(parts[0], Integer.parseInt(parts[1])); + } + }); + } + + @Override + public Optional> getConnectorTableHandleCodec() + { + return Optional.of(new ConnectorCodec<>() + { + @Override + public byte[] serialize(ConnectorTableHandle handle) + { + TestConnectorTableHandle tableHandle = (TestConnectorTableHandle) handle; + return tableHandle.getTableName().getBytes(UTF_8); + } + + @Override + public ConnectorTableHandle deserialize(byte[] data) + { + return new TestConnectorTableHandle(new String(data, UTF_8)); + } + }); + } + + public Optional> getConnectorColumnHandleCodec() + { + return Optional.of(new ConnectorCodec<>() + { + @Override + public byte[] serialize(ColumnHandle handle) + { + TestConnectorColumnHandle columnHandle = (TestConnectorColumnHandle) handle; + return (columnHandle.getColumnName() + ":" + columnHandle.getColumnType()).getBytes(UTF_8); + } + + @Override + public ColumnHandle deserialize(byte[] data) + { + String[] parts = new String(data, UTF_8).split(":"); + return new TestConnectorColumnHandle(parts[0], parts[1]); + } + }); + } + + public Optional> getConnectorTableLayoutHandleCodec() + { + return Optional.of(new ConnectorCodec<>() + { + @Override + public byte[] serialize(ConnectorTableLayoutHandle handle) + { + TestConnectorTableLayoutHandle layoutHandle = (TestConnectorTableLayoutHandle) handle; + return layoutHandle.getLayoutName().getBytes(UTF_8); + } + + @Override + public ConnectorTableLayoutHandle deserialize(byte[] data) + { + return new TestConnectorTableLayoutHandle(new String(data, UTF_8)); + } + }); + } + + public Optional> getConnectorOutputTableHandleCodec() + { + return Optional.of(new ConnectorCodec<>() + { + @Override + public byte[] serialize(ConnectorOutputTableHandle handle) + { + TestConnectorOutputTableHandle outputHandle = (TestConnectorOutputTableHandle) handle; + return outputHandle.getTableName().getBytes(UTF_8); + } + + @Override + public ConnectorOutputTableHandle deserialize(byte[] data) + { + return new TestConnectorOutputTableHandle(new String(data, UTF_8)); + } + }); + } + + public Optional> getConnectorInsertTableHandleCodec() + { + return Optional.of(new ConnectorCodec<>() + { + @Override + public byte[] serialize(ConnectorInsertTableHandle handle) + { + TestConnectorInsertTableHandle insertHandle = (TestConnectorInsertTableHandle) handle; + return insertHandle.getTableName().getBytes(UTF_8); + } + + @Override + public ConnectorInsertTableHandle deserialize(byte[] data) + { + return new TestConnectorInsertTableHandle(new String(data, UTF_8)); + } + }); + } + + public Optional> getConnectorDeleteTableHandleCodec() + { + return Optional.of(new ConnectorCodec<>() + { + @Override + public byte[] serialize(ConnectorDeleteTableHandle handle) + { + TestConnectorDeleteTableHandle deleteHandle = (TestConnectorDeleteTableHandle) handle; + return deleteHandle.getTableName().getBytes(UTF_8); + } + + @Override + public ConnectorDeleteTableHandle deserialize(byte[] data) + { + return new TestConnectorDeleteTableHandle(new String(data, UTF_8)); + } + }); + } + } + + /** + * Test table handle with binary serialization support + */ + public static class TestConnectorTableHandle + implements ConnectorTableHandle + { + private final String tableName; + + @JsonCreator + public TestConnectorTableHandle(@JsonProperty("tableName") String tableName) + { + this.tableName = tableName; + } + + @JsonProperty + public String getTableName() + { + return tableName; + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + TestConnectorTableHandle that = (TestConnectorTableHandle) obj; + return Objects.equals(tableName, that.tableName); + } + + @Override + public int hashCode() + { + return Objects.hash(tableName); + } + } + + /** + * Test table layout handle with binary serialization support + */ + public static class TestConnectorTableLayoutHandle + implements ConnectorTableLayoutHandle + { + private final String layoutName; + + public TestConnectorTableLayoutHandle(String layoutName) + { + this.layoutName = layoutName; + } + + public String getLayoutName() + { + return layoutName; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TestConnectorTableLayoutHandle that = (TestConnectorTableLayoutHandle) o; + return Objects.equals(layoutName, that.layoutName); + } + + @Override + public int hashCode() + { + return layoutName.hashCode(); + } + } + + /** + * Test column handle with binary serialization support + */ + public static class TestConnectorColumnHandle + implements ColumnHandle + { + private final String columnName; + private final String columnType; + + @JsonCreator + public TestConnectorColumnHandle( + @JsonProperty("columnName") String columnName, + @JsonProperty("columnType") String columnType) + { + this.columnName = columnName; + this.columnType = columnType; + } + + @JsonProperty + public String getColumnName() + { + return columnName; + } + + @JsonProperty + public String getColumnType() + { + return columnType; + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + TestConnectorColumnHandle that = (TestConnectorColumnHandle) obj; + return Objects.equals(columnName, that.columnName) && + Objects.equals(columnType, that.columnType); + } + + @Override + public int hashCode() + { + return Objects.hash(columnName, columnType); + } + } + + /** + * Test connector handle resolver for codec-enabled connector + */ + public static class TestConnectorWithCodecHandleResolver + implements ConnectorHandleResolver + { + @Override + public Class getTableHandleClass() + { + return TestConnectorTableHandle.class; + } + + @Override + public Class getTableLayoutHandleClass() + { + return TestConnectorTableLayoutHandle.class; + } + + @Override + public Class getColumnHandleClass() + { + return TestConnectorColumnHandle.class; + } + + @Override + public Class getSplitClass() + { + return TestConnectorWithCodecSplit.class; + } + + @Override + public Class getOutputTableHandleClass() + { + return TestConnectorOutputTableHandle.class; + } + + @Override + public Class getInsertTableHandleClass() + { + return TestConnectorInsertTableHandle.class; + } + + @Override + public Class getDeleteTableHandleClass() + { + return TestConnectorDeleteTableHandle.class; + } + } + + /** + * Test output table handle with binary serialization support + */ + public static class TestConnectorOutputTableHandle + implements ConnectorOutputTableHandle + { + private final String tableName; + + @JsonCreator + public TestConnectorOutputTableHandle( + @JsonProperty("tableName") String tableName) + { + this.tableName = tableName; + } + + @JsonProperty + public String getTableName() + { + return tableName; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TestConnectorOutputTableHandle that = (TestConnectorOutputTableHandle) o; + return tableName.equals(that.tableName); + } + + @Override + public int hashCode() + { + return Objects.hash(tableName); + } + } + + /** + * Test insert table handle with binary serialization support + */ + public static class TestConnectorInsertTableHandle + implements ConnectorInsertTableHandle + { + private final String tableName; + + @JsonCreator + public TestConnectorInsertTableHandle( + @JsonProperty("tableName") String tableName) + { + this.tableName = tableName; + } + + @JsonProperty + public String getTableName() + { + return tableName; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TestConnectorInsertTableHandle that = (TestConnectorInsertTableHandle) o; + return tableName.equals(that.tableName); + } + + @Override + public int hashCode() + { + return Objects.hash(tableName); + } + } + + /** + * Test delete table handle with binary serialization support + */ + public static class TestConnectorDeleteTableHandle + implements ConnectorDeleteTableHandle + { + private final String tableName; + + @JsonCreator + public TestConnectorDeleteTableHandle( + @JsonProperty("tableName") String tableName) + { + this.tableName = tableName; + } + + @JsonProperty + public String getTableName() + { + return tableName; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TestConnectorDeleteTableHandle that = (TestConnectorDeleteTableHandle) o; + return tableName.equals(that.tableName); + } + + @Override + public int hashCode() + { + return Objects.hash(tableName); + } + } + + public static class TestConnectorWithoutCodecSplit + implements ConnectorSplit + { + private final String data; + private final int sequence; + + @JsonCreator + public TestConnectorWithoutCodecSplit( + @JsonProperty("data") String data, + @JsonProperty("sequence") int sequence) + { + this.data = data; + this.sequence = sequence; + } + + @JsonProperty + public String getData() + { + return data; + } + + @JsonProperty + public int getSequence() + { + return sequence; + } + + @Override + public NodeSelectionStrategy getNodeSelectionStrategy() + { + return NodeSelectionStrategy.NO_PREFERENCE; + } + + @Override + public List getPreferredNodes(NodeProvider nodeProvider) + { + return ImmutableList.of(); + } + + @Override + public Object getInfo() + { + return ImmutableMap.of("data", data, "sequence", sequence); + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + TestConnectorWithoutCodecSplit that = (TestConnectorWithoutCodecSplit) obj; + return sequence == that.sequence && Objects.equals(data, that.data); + } + + @Override + public int hashCode() + { + return Objects.hash(data, sequence); + } + } + + public static class TestConnectorWithoutCodecHandleResolver + implements ConnectorHandleResolver + { + @Override + public Class getTableHandleClass() + { + throw new UnsupportedOperationException("Table handles not supported"); + } + + @Override + public Class getTableLayoutHandleClass() + { + throw new UnsupportedOperationException("Table layout handles not supported"); + } + + @Override + public Class getColumnHandleClass() + { + throw new UnsupportedOperationException("Column handles not supported"); + } + + @Override + public Class getSplitClass() + { + return TestConnectorWithoutCodecSplit.class; + } + + @Override + public Class getOutputTableHandleClass() + { + throw new UnsupportedOperationException("Output table handles not supported"); + } + + @Override + public Class getInsertTableHandleClass() + { + throw new UnsupportedOperationException("Insert table handles not supported"); + } + + @Override + public Class getDeleteTableHandleClass() + { + throw new UnsupportedOperationException("Delete table handles not supported"); + } + } +} diff --git a/presto-main/src/test/java/com/facebook/presto/server/remotetask/TestHttpRemoteTaskWithEventLoop.java b/presto-main/src/test/java/com/facebook/presto/server/remotetask/TestHttpRemoteTaskWithEventLoop.java index 3e8cc6d11e4b4..41c845c2b4a65 100644 --- a/presto-main/src/test/java/com/facebook/presto/server/remotetask/TestHttpRemoteTaskWithEventLoop.java +++ b/presto-main/src/test/java/com/facebook/presto/server/remotetask/TestHttpRemoteTaskWithEventLoop.java @@ -35,6 +35,7 @@ import com.facebook.presto.common.type.Type; import com.facebook.presto.common.type.TypeManager; import com.facebook.presto.connector.ConnectorCodecManager; +import com.facebook.presto.connector.ConnectorManager; import com.facebook.presto.execution.Lifespan; import com.facebook.presto.execution.NodeTaskMap; import com.facebook.presto.execution.QueryManagerConfig; @@ -375,6 +376,7 @@ private static HttpRemoteTaskFactory createHttpRemoteTaskFactory(TestingTaskReso @Override public void configure(Binder binder) { + binder.bind(ConnectorManager.class).toProvider(() -> null).in(Scopes.SINGLETON); binder.bind(JsonMapper.class); binder.bind(ThriftMapper.class); configBinder(binder).bindConfig(FeaturesConfig.class); diff --git a/presto-spark-base/src/test/java/com/facebook/presto/spark/execution/TestBatchTaskUpdateRequest.java b/presto-spark-base/src/test/java/com/facebook/presto/spark/execution/TestBatchTaskUpdateRequest.java index fda1cee85153c..c22db6a17b78d 100644 --- a/presto-spark-base/src/test/java/com/facebook/presto/spark/execution/TestBatchTaskUpdateRequest.java +++ b/presto-spark-base/src/test/java/com/facebook/presto/spark/execution/TestBatchTaskUpdateRequest.java @@ -16,9 +16,11 @@ import com.facebook.airlift.bootstrap.Bootstrap; import com.facebook.airlift.json.JsonCodec; import com.facebook.airlift.json.JsonModule; +import com.facebook.drift.codec.guice.ThriftCodecModule; import com.facebook.presto.Session; import com.facebook.presto.common.type.Type; import com.facebook.presto.common.type.TypeManager; +import com.facebook.presto.connector.ConnectorManager; import com.facebook.presto.execution.Location; import com.facebook.presto.execution.ScheduledSplit; import com.facebook.presto.execution.TaskId; @@ -45,6 +47,7 @@ import com.google.inject.Injector; import com.google.inject.Key; import com.google.inject.Module; +import com.google.inject.Scopes; import org.testng.annotations.Test; import java.util.ArrayList; @@ -151,6 +154,8 @@ private JsonCodec getJsonCodec() Module module = binder -> { binder.install(new JsonModule()); binder.install(new HandleJsonModule()); + binder.bind(ConnectorManager.class).toProvider(() -> null).in(Scopes.SINGLETON); + binder.install(new ThriftCodecModule()); configBinder(binder).bindConfig(FeaturesConfig.class); FunctionAndTypeManager functionAndTypeManager = createTestFunctionAndTypeManager(); binder.bind(TypeManager.class).toInstance(functionAndTypeManager); diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorCodecProvider.java b/presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorCodecProvider.java index 4bd2d81d456b4..fb9f180395356 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorCodecProvider.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorCodecProvider.java @@ -13,8 +13,10 @@ */ package com.facebook.presto.spi.connector; +import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.ConnectorCodec; import com.facebook.presto.spi.ConnectorDeleteTableHandle; +import com.facebook.presto.spi.ConnectorIndexHandle; import com.facebook.presto.spi.ConnectorInsertTableHandle; import com.facebook.presto.spi.ConnectorOutputTableHandle; import com.facebook.presto.spi.ConnectorSplit; @@ -59,4 +61,19 @@ default Optional> getConnectorTableHandleCo { return Optional.empty(); } + + default Optional> getColumnHandleCodec() + { + return Optional.empty(); + } + + default Optional> getConnectorPartitioningHandleCodec() + { + return Optional.empty(); + } + + default Optional> getConnectorIndexHandleCodec() + { + return Optional.empty(); + } } diff --git a/presto-tpcds/src/main/java/com/facebook/presto/tpcds/thrift/TpcdsCodecProvider.java b/presto-tpcds/src/main/java/com/facebook/presto/tpcds/thrift/TpcdsCodecProvider.java index 09545c6b96fe7..fa3a678b550a8 100644 --- a/presto-tpcds/src/main/java/com/facebook/presto/tpcds/thrift/TpcdsCodecProvider.java +++ b/presto-tpcds/src/main/java/com/facebook/presto/tpcds/thrift/TpcdsCodecProvider.java @@ -28,34 +28,41 @@ public class TpcdsCodecProvider implements ConnectorCodecProvider { - private final ThriftCodecManager thriftCodecManager; + private final ConnectorCodec splitCodec; + private final ConnectorCodec transactionHandleCodec; + private final ConnectorCodec tableLayoutHandleCodec; + private final ConnectorCodec tableHandleCodec; public TpcdsCodecProvider(ThriftCodecManager thriftCodecManager) { - this.thriftCodecManager = requireNonNull(thriftCodecManager, "thriftCodecManager is null"); + requireNonNull(thriftCodecManager, "thriftCodecManager is null"); + this.splitCodec = new TpcdsSplitCodec(thriftCodecManager); + this.transactionHandleCodec = new TpcdsTransactionHandleCodec(thriftCodecManager); + this.tableLayoutHandleCodec = new TpcdsTableLayoutHandleCodec(thriftCodecManager); + this.tableHandleCodec = new TpcdsTableHandleCodec(thriftCodecManager); } @Override public Optional> getConnectorSplitCodec() { - return Optional.of(new TpcdsSplitCodec(thriftCodecManager)); + return Optional.of(splitCodec); } @Override public Optional> getConnectorTransactionHandleCodec() { - return Optional.of(new TpcdsTransactionHandleCodec(thriftCodecManager)); + return Optional.of(transactionHandleCodec); } @Override public Optional> getConnectorTableLayoutHandleCodec() { - return Optional.of(new TpcdsTableLayoutHandleCodec(thriftCodecManager)); + return Optional.of(tableLayoutHandleCodec); } @Override public Optional> getConnectorTableHandleCodec() { - return Optional.of(new TpcdsTableHandleCodec(thriftCodecManager)); + return Optional.of(tableHandleCodec); } } diff --git a/presto-tpcds/src/test/java/com/facebook/presto/tpcds/TestTpcdsWithCharColumnsAsChar.java b/presto-tpcds/src/test/java/com/facebook/presto/tpcds/TestTpcdsWithCharColumnsAsChar.java index b5bca766686f3..564329f1cdeed 100644 --- a/presto-tpcds/src/test/java/com/facebook/presto/tpcds/TestTpcdsWithCharColumnsAsChar.java +++ b/presto-tpcds/src/test/java/com/facebook/presto/tpcds/TestTpcdsWithCharColumnsAsChar.java @@ -14,6 +14,7 @@ package com.facebook.presto.tpcds; import com.facebook.presto.testing.QueryRunner; +import com.google.common.collect.ImmutableMap; public class TestTpcdsWithCharColumnsAsChar extends AbstractTestTpcds @@ -22,6 +23,7 @@ public class TestTpcdsWithCharColumnsAsChar protected QueryRunner createQueryRunner() throws Exception { - return TpcdsQueryRunner.createQueryRunner(); + return TpcdsQueryRunner.createQueryRunner( + ImmutableMap.of("use-connector-provided-serialization-codecs", "true")); } } diff --git a/presto-tpcds/src/test/java/com/facebook/presto/tpcds/TpcdsQueryRunner.java b/presto-tpcds/src/test/java/com/facebook/presto/tpcds/TpcdsQueryRunner.java index 8643bed4305d2..2074fd9963248 100644 --- a/presto-tpcds/src/test/java/com/facebook/presto/tpcds/TpcdsQueryRunner.java +++ b/presto-tpcds/src/test/java/com/facebook/presto/tpcds/TpcdsQueryRunner.java @@ -75,7 +75,9 @@ public static void main(String[] args) throws Exception { Logging.initialize(); - DistributedQueryRunner queryRunner = createQueryRunner(ImmutableMap.of("http-server.http.port", "8080")); + DistributedQueryRunner queryRunner = createQueryRunner(ImmutableMap.of( + "http-server.http.port", "8080", + "use-connector-provided-serialization-codecs", "true")); Thread.sleep(10); Logger log = Logger.get(TpcdsQueryRunner.class); log.info("======== SERVER STARTED ========"); diff --git a/presto-verifier/src/main/java/com/facebook/presto/verifier/framework/VerifierModule.java b/presto-verifier/src/main/java/com/facebook/presto/verifier/framework/VerifierModule.java index a509624565e33..a20fa69b207ef 100644 --- a/presto-verifier/src/main/java/com/facebook/presto/verifier/framework/VerifierModule.java +++ b/presto-verifier/src/main/java/com/facebook/presto/verifier/framework/VerifierModule.java @@ -19,6 +19,7 @@ import com.facebook.presto.common.block.BlockEncodingSerde; import com.facebook.presto.common.type.Type; import com.facebook.presto.common.type.TypeManager; +import com.facebook.presto.connector.ConnectorManager; import com.facebook.presto.metadata.CatalogManager; import com.facebook.presto.metadata.FunctionAndTypeManager; import com.facebook.presto.metadata.HandleJsonModule; @@ -112,6 +113,9 @@ protected final void setup(Binder binder) // catalog binder.bind(CatalogManager.class).in(Scopes.SINGLETON); + // connector + binder.bind(ConnectorManager.class).toProvider(() -> null).in(Scopes.SINGLETON); + // function binder.bind(FunctionAndTypeManager.class).in(SINGLETON); binder.bind(TableFunctionRegistry.class).in(SINGLETON);