Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.metadata.FunctionAndTypeManager;
import com.facebook.presto.metadata.HandleJsonModule;
import com.facebook.presto.metadata.HandleResolver;
import com.facebook.presto.metadata.TestingHandleJsonModule;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.facebook.presto.type.TypeDeserializer;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -91,7 +91,7 @@ private JsonCodec<DeltaTableHandle> getJsonCodec()
{
Module module = binder -> {
binder.install(new JsonModule());
binder.install(new HandleJsonModule());
binder.install(new TestingHandleJsonModule());
configBinder(binder).bindConfig(FeaturesConfig.class);
FunctionAndTypeManager functionAndTypeManager = createTestFunctionAndTypeManager();
binder.bind(TypeManager.class).toInstance(functionAndTypeManager);
Expand Down
18 changes: 18 additions & 0 deletions presto-docs/src/main/sphinx/admin/properties.rst
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,24 @@ shared across all of the partitioned consumers. Increasing this value may
improve network throughput for data transferred between stages if the
network has high latency or if there are many nodes in the cluster.

``use-connector-provided-serialization-codecs``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^


* **Type:** ``boolean``
* **Default value:** ``false``

Enables the use of custom connector-provided serialization codecs for handles.
This feature allows connectors to use their own serialization format for
handle objects (such as table handles, column handles, and splits) instead
of standard JSON serialization.

When enabled, connectors that provide a ``ConnectorCodecProvider`` with
appropriate codecs will have their handles serialized using custom binary
formats, which are then Base64-encoded for transport. Connectors without
codec support automatically fall back to standard JSON serialization.
Internal Presto handles (prefixed with ``$``) always use JSON serialization
regardless of this setting.

.. _task-properties:

Task Properties
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
import com.facebook.presto.hive.metastore.Storage;
import com.facebook.presto.hive.metastore.StorageFormat;
import com.facebook.presto.metadata.FunctionAndTypeManager;
import com.facebook.presto.metadata.HandleJsonModule;
import com.facebook.presto.metadata.HandleResolver;
import com.facebook.presto.metadata.TestingHandleJsonModule;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.SplitWeight;
Expand Down Expand Up @@ -153,8 +153,8 @@ private JsonCodec<HiveSplit> getJsonCodec()
{
Module module = binder -> {
binder.install(new JsonModule());
binder.install(new HandleJsonModule());
configBinder(binder).bindConfig(FeaturesConfig.class);
binder.install(new TestingHandleJsonModule());
FunctionAndTypeManager functionAndTypeManager = createTestFunctionAndTypeManager();
binder.bind(TypeManager.class).toInstance(functionAndTypeManager);
jsonBinder(binder).addDeserializerBinding(Type.class).to(TypeDeserializer.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,16 @@
*/
package com.facebook.presto.connector;

import com.facebook.airlift.json.JsonCodec;
import com.facebook.presto.common.block.BlockEncodingSerde;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorSystemConfig;
import com.facebook.presto.spi.NodeManager;
import com.facebook.presto.spi.PageIndexerFactory;
import com.facebook.presto.spi.PageSorter;
import com.facebook.presto.spi.TupleDomainSerde;
import com.facebook.presto.spi.connector.ConnectorContext;
import com.facebook.presto.spi.function.FunctionMetadataManager;
import com.facebook.presto.spi.function.StandardFunctionResolution;
Expand All @@ -40,6 +44,7 @@ public class ConnectorContextInstance
private final FilterStatsCalculatorService filterStatsCalculatorService;
private final BlockEncodingSerde blockEncodingSerde;
private final ConnectorSystemConfig connectorSystemConfig;
private final TupleDomainSerde tupleDomainSerde;

public ConnectorContextInstance(
NodeManager nodeManager,
Expand All @@ -51,7 +56,8 @@ public ConnectorContextInstance(
RowExpressionService rowExpressionService,
FilterStatsCalculatorService filterStatsCalculatorService,
BlockEncodingSerde blockEncodingSerde,
ConnectorSystemConfig connectorSystemConfig)
ConnectorSystemConfig connectorSystemConfig,
JsonCodec<TupleDomain<ColumnHandle>> tupleDomainJsonCodec)
{
this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
Expand All @@ -63,6 +69,7 @@ public ConnectorContextInstance(
this.filterStatsCalculatorService = requireNonNull(filterStatsCalculatorService, "filterStatsCalculatorService is null");
this.blockEncodingSerde = requireNonNull(blockEncodingSerde, "blockEncodingSerde is null");
this.connectorSystemConfig = requireNonNull(connectorSystemConfig, "connectorSystemConfig is null");
this.tupleDomainSerde = new JsonCodecTupleDomainSerde(tupleDomainJsonCodec);
}

@Override
Expand Down Expand Up @@ -124,4 +131,10 @@ public ConnectorSystemConfig getConnectorSystemConfig()
{
return connectorSystemConfig;
}

@Override
public TupleDomainSerde getTupleDomainSerde()
{
return tupleDomainSerde;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
*/
package com.facebook.presto.connector;

import com.facebook.airlift.json.JsonCodec;
import com.facebook.airlift.log.Logger;
import com.facebook.airlift.node.NodeInfo;
import com.facebook.presto.common.CatalogSchemaName;
import com.facebook.presto.common.block.BlockEncodingSerde;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.connector.informationSchema.InformationSchemaConnector;
import com.facebook.presto.connector.system.DelegatingSystemTablesProvider;
Expand All @@ -33,6 +35,7 @@
import com.facebook.presto.metadata.InternalNodeManager;
import com.facebook.presto.metadata.MetadataManager;
import com.facebook.presto.security.AccessControlManager;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.ConnectorSystemConfig;
import com.facebook.presto.spi.PageIndexerFactory;
Expand Down Expand Up @@ -122,6 +125,7 @@ public class ConnectorManager
private final BlockEncodingSerde blockEncodingSerde;
private final ConnectorSystemConfig connectorSystemConfig;
private final ConnectorCodecManager connectorCodecManager;
private final JsonCodec<TupleDomain<ColumnHandle>> tupleDomainJsonCodec;

@GuardedBy("this")
private final ConcurrentMap<String, ConnectorFactory> connectorFactories = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -156,7 +160,8 @@ public ConnectorManager(
FilterStatsCalculator filterStatsCalculator,
BlockEncodingSerde blockEncodingSerde,
FeaturesConfig featuresConfig,
ConnectorCodecManager connectorCodecManager)
ConnectorCodecManager connectorCodecManager,
JsonCodec<TupleDomain<ColumnHandle>> tupleDomainCodec)
{
this.metadataManager = requireNonNull(metadataManager, "metadataManager is null");
this.catalogManager = requireNonNull(catalogManager, "catalogManager is null");
Expand All @@ -182,6 +187,7 @@ public ConnectorManager(
this.blockEncodingSerde = requireNonNull(blockEncodingSerde, "blockEncodingSerde is null");
this.connectorSystemConfig = () -> featuresConfig.isNativeExecutionEnabled();
this.connectorCodecManager = requireNonNull(connectorCodecManager, "connectorThriftCodecManager is null");
this.tupleDomainJsonCodec = requireNonNull(tupleDomainCodec, "tupleDomainCodec is null");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my own learning, why treat tupledomain as a special case and why having its serde in connector context instead of providing its serde from connector codec provider? Thanks

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TupleDomain values are serialized as blocks, which don't have a generic means to be serialized. The runtime must provide that code.

}

@PreDestroy
Expand Down Expand Up @@ -386,13 +392,24 @@ private Connector createConnector(ConnectorId connectorId, ConnectorFactory fact
new RowExpressionFormatter(metadataManager.getFunctionAndTypeManager())),
new ConnectorFilterStatsCalculatorService(filterStatsCalculator),
blockEncodingSerde,
connectorSystemConfig);
connectorSystemConfig,
tupleDomainJsonCodec);

try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(factory.getClass().getClassLoader())) {
return factory.create(connectorId.getCatalogName(), properties, context);
}
}

public Optional<ConnectorCodecProvider> getConnectorCodecProvider(ConnectorId connectorId)
{
requireNonNull(connectorId, "connectorId is null");
MaterializedConnector materializedConnector = connectors.get(connectorId);
if (materializedConnector == null) {
return Optional.empty();
}
return materializedConnector.getConnectorCodecProvider();
}

private static class MaterializedConnector
{
private final ConnectorId connectorId;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.connector;

import com.facebook.airlift.json.JsonCodec;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.TupleDomainSerde;

import static java.util.Objects.requireNonNull;

class JsonCodecTupleDomainSerde
implements TupleDomainSerde
{
private final JsonCodec<TupleDomain<ColumnHandle>> tupleDomainJsonCodec;

public JsonCodecTupleDomainSerde(JsonCodec<TupleDomain<ColumnHandle>> tupleDomainJsonCodec)
{
this.tupleDomainJsonCodec = requireNonNull(tupleDomainJsonCodec, "tupleDomainJsonCodec is null");
}

@Override
public String serialize(TupleDomain<ColumnHandle> tupleDomain)
{
return tupleDomainJsonCodec.toJson(tupleDomain);
}

@Override
public TupleDomain<ColumnHandle> deserialize(String serialized)
{
return tupleDomainJsonCodec.fromJson(serialized);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,30 @@
*/
package com.facebook.presto.index;

import com.facebook.presto.connector.ConnectorManager;
import com.facebook.presto.metadata.AbstractTypedJacksonModule;
import com.facebook.presto.metadata.HandleResolver;
import com.facebook.presto.spi.ConnectorIndexHandle;
import com.facebook.presto.spi.connector.ConnectorCodecProvider;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import jakarta.inject.Inject;
import jakarta.inject.Provider;

public class IndexHandleJacksonModule
extends AbstractTypedJacksonModule<ConnectorIndexHandle>
{
@Inject
public IndexHandleJacksonModule(HandleResolver handleResolver)
public IndexHandleJacksonModule(
HandleResolver handleResolver,
Provider<ConnectorManager> connectorManagerProvider,
FeaturesConfig featuresConfig)
{
super(ConnectorIndexHandle.class,
handleResolver::getId,
handleResolver::getIndexHandleClass);
handleResolver::getIndexHandleClass,
featuresConfig.isUseConnectorProvidedSerializationCodecs(),
connectorId -> connectorManagerProvider.get()
.getConnectorCodecProvider(connectorId)
.flatMap(ConnectorCodecProvider::getConnectorIndexHandleCodec));
}
}
Loading
Loading