Skip to content

Commit 0c2f6eb

Browse files
committed
Address code review comments, fix test cases
1 parent a51062f commit 0c2f6eb

File tree

5 files changed

+231
-179
lines changed

5 files changed

+231
-179
lines changed

presto-docs/src/main/sphinx/admin/properties.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -572,7 +572,7 @@ improve network throughput for data transferred between stages if the
572572
network has high latency or if there are many nodes in the cluster.
573573

574574
``use-connector-provided-serialization-codecs``
575-
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
575+
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
576576

577577
* **Type:** ``boolean``
578578
* **Default value:** ``false``

presto-main-base/src/main/java/com/facebook/presto/metadata/AbstractTypedJacksonModule.java

Lines changed: 7 additions & 177 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,10 @@
1818
import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
1919
import com.fasterxml.jackson.core.JsonGenerator;
2020
import com.fasterxml.jackson.core.JsonParser;
21-
import com.fasterxml.jackson.core.JsonToken;
22-
import com.fasterxml.jackson.core.TreeNode;
2321
import com.fasterxml.jackson.core.Version;
2422
import com.fasterxml.jackson.databind.DatabindContext;
2523
import com.fasterxml.jackson.databind.DeserializationContext;
2624
import com.fasterxml.jackson.databind.JavaType;
27-
import com.fasterxml.jackson.databind.JsonDeserializer;
2825
import com.fasterxml.jackson.databind.JsonMappingException;
2926
import com.fasterxml.jackson.databind.JsonSerializer;
3027
import com.fasterxml.jackson.databind.SerializerProvider;
@@ -36,15 +33,13 @@
3633
import com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeSerializer;
3734
import com.fasterxml.jackson.databind.jsontype.impl.TypeIdResolverBase;
3835
import com.fasterxml.jackson.databind.module.SimpleModule;
39-
import com.fasterxml.jackson.databind.node.ObjectNode;
4036
import com.fasterxml.jackson.databind.ser.BeanSerializerFactory;
4137
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
4238
import com.fasterxml.jackson.databind.type.TypeFactory;
4339
import com.google.common.cache.Cache;
4440
import com.google.common.cache.CacheBuilder;
4541

4642
import java.io.IOException;
47-
import java.util.Base64;
4843
import java.util.Optional;
4944
import java.util.concurrent.ExecutionException;
5045
import java.util.function.Function;
@@ -75,8 +70,13 @@ protected AbstractTypedJacksonModule(
7570

7671
if (binarySerializationEnabled) {
7772
// Use codec serialization
78-
addSerializer(baseClass, new CodecSerializer<>(nameResolver, classResolver, codecExtractor));
79-
addDeserializer(baseClass, new CodecDeserializer<>(classResolver, codecExtractor));
73+
addSerializer(baseClass, new CodecSerializer<>(
74+
TYPE_PROPERTY,
75+
DATA_PROPERTY,
76+
codecExtractor,
77+
nameResolver,
78+
new InternalTypeResolver<>(nameResolver, classResolver)));
79+
addDeserializer(baseClass, new CodecDeserializer<>(TYPE_PROPERTY, DATA_PROPERTY, codecExtractor, classResolver));
8080
}
8181
else {
8282
// Use legacy typed serialization
@@ -86,176 +86,6 @@ protected AbstractTypedJacksonModule(
8686
}
8787
}
8888

89-
private static class CodecSerializer<T>
90-
extends JsonSerializer<T>
91-
{
92-
private final Function<T, String> nameResolver;
93-
private final Function<String, Class<? extends T>> classResolver;
94-
private final Function<ConnectorId, Optional<ConnectorCodec<T>>> codecExtractor;
95-
private final TypeIdResolver typeResolver;
96-
private final TypeSerializer typeSerializer;
97-
private final Cache<Class<?>, JsonSerializer<Object>> serializerCache = CacheBuilder.newBuilder().build();
98-
99-
public CodecSerializer(
100-
Function<T, String> nameResolver,
101-
Function<String, Class<? extends T>> classResolver,
102-
Function<ConnectorId, Optional<ConnectorCodec<T>>> codecExtractor)
103-
{
104-
this.nameResolver = requireNonNull(nameResolver, "nameResolver is null");
105-
this.classResolver = requireNonNull(classResolver, "classResolver is null");
106-
this.codecExtractor = requireNonNull(codecExtractor, "codecExtractor is null");
107-
this.typeResolver = new InternalTypeResolver<>(nameResolver, classResolver);
108-
this.typeSerializer = new AsPropertyTypeSerializer(typeResolver, null, TYPE_PROPERTY);
109-
}
110-
111-
@Override
112-
public void serialize(T value, JsonGenerator jsonGenerator, SerializerProvider provider)
113-
throws IOException
114-
{
115-
if (value == null) {
116-
jsonGenerator.writeNull();
117-
return;
118-
}
119-
120-
String connectorIdString = nameResolver.apply(value);
121-
122-
// Only try binary serialization for actual connectors (not internal handles like "$remote")
123-
if (!connectorIdString.startsWith("$")) {
124-
ConnectorId connectorId = new ConnectorId(connectorIdString);
125-
126-
// Check if connector has a binary codec
127-
Optional<ConnectorCodec<T>> codec = codecExtractor.apply(connectorId);
128-
if (codec.isPresent()) {
129-
// Use binary serialization with flat structure
130-
jsonGenerator.writeStartObject();
131-
jsonGenerator.writeStringField(TYPE_PROPERTY, connectorIdString);
132-
byte[] data = codec.get().serialize(value);
133-
jsonGenerator.writeStringField(DATA_PROPERTY, Base64.getEncoder().encodeToString(data));
134-
jsonGenerator.writeEndObject();
135-
return;
136-
}
137-
}
138-
139-
// Fall back to legacy typed JSON serialization
140-
// Use the InternalTypeSerializer approach which adds @type for polymorphic deserialization
141-
try {
142-
Class<?> type = value.getClass();
143-
JsonSerializer<Object> serializer = serializerCache.get(type, () -> createSerializer(provider, type));
144-
145-
// Serialize with type information
146-
serializer.serializeWithType(value, jsonGenerator, provider, typeSerializer);
147-
}
148-
catch (ExecutionException e) {
149-
Throwable cause = e.getCause();
150-
if (cause != null) {
151-
throwIfInstanceOf(cause, IOException.class);
152-
}
153-
throw new RuntimeException(e);
154-
}
155-
}
156-
157-
@SuppressWarnings("unchecked")
158-
private static JsonSerializer<Object> createSerializer(SerializerProvider provider, Class<?> type)
159-
throws JsonMappingException
160-
{
161-
JavaType javaType = provider.constructType(type);
162-
return (JsonSerializer<Object>) BeanSerializerFactory.instance.createSerializer(provider, javaType);
163-
}
164-
165-
@Override
166-
public void serializeWithType(T value, JsonGenerator gen,
167-
SerializerProvider serializers, TypeSerializer typeSer)
168-
throws IOException
169-
{
170-
serialize(value, gen, serializers);
171-
}
172-
}
173-
174-
private static class CodecDeserializer<T>
175-
extends JsonDeserializer<T>
176-
{
177-
private final Function<String, Class<? extends T>> classResolver;
178-
private final Function<ConnectorId, Optional<ConnectorCodec<T>>> codecExtractor;
179-
180-
public CodecDeserializer(
181-
Function<String, Class<? extends T>> classResolver,
182-
Function<ConnectorId, Optional<ConnectorCodec<T>>> codecExtractor)
183-
{
184-
this.classResolver = requireNonNull(classResolver, "classResolver is null");
185-
this.codecExtractor = requireNonNull(codecExtractor, "codecExtractor is null");
186-
}
187-
188-
@Override
189-
public T deserialize(JsonParser parser, DeserializationContext context)
190-
throws IOException
191-
{
192-
if (parser.getCurrentToken() == JsonToken.VALUE_NULL) {
193-
return null;
194-
}
195-
196-
if (parser.getCurrentToken() != JsonToken.START_OBJECT) {
197-
throw new IOException("Expected START_OBJECT, got " + parser.getCurrentToken());
198-
}
199-
200-
// Parse the JSON tree
201-
TreeNode tree = parser.readValueAsTree();
202-
203-
if (tree instanceof ObjectNode) {
204-
ObjectNode node = (ObjectNode) tree;
205-
206-
// Get the @type field
207-
if (!node.has(TYPE_PROPERTY)) {
208-
throw new IOException("Missing " + TYPE_PROPERTY + " field");
209-
}
210-
String connectorIdString = node.get(TYPE_PROPERTY).asText();
211-
// Check if @data field is present (binary serialization)
212-
if (node.has(DATA_PROPERTY)) {
213-
// Binary data is present, we need a codec to deserialize it
214-
// Special handling for internal handles like "$remote"
215-
if (!connectorIdString.startsWith("$")) {
216-
ConnectorId connectorId = new ConnectorId(connectorIdString);
217-
Optional<ConnectorCodec<T>> codec = codecExtractor.apply(connectorId);
218-
if (codec.isPresent()) {
219-
String base64Data = node.get(DATA_PROPERTY).asText();
220-
byte[] data = Base64.getDecoder().decode(base64Data);
221-
return codec.get().deserialize(data);
222-
}
223-
}
224-
// @data field present but no codec available or internal handle
225-
throw new IOException("Type " + connectorIdString + " has binary data (customSerializedValue field) but no codec available to deserialize it");
226-
}
227-
228-
// No @data field - use standard JSON deserialization
229-
Class<? extends T> handleClass = classResolver.apply(connectorIdString);
230-
231-
// Remove the @type field and deserialize the remaining content
232-
node.remove(TYPE_PROPERTY);
233-
return context.readTreeAsValue(node, handleClass);
234-
}
235-
236-
throw new IOException("Unable to deserialize");
237-
}
238-
239-
@Override
240-
public T deserializeWithType(JsonParser p, DeserializationContext ctxt,
241-
TypeDeserializer typeDeserializer)
242-
throws IOException
243-
{
244-
// We handle the type ourselves
245-
return deserialize(p, ctxt);
246-
}
247-
248-
@Override
249-
public T deserializeWithType(JsonParser p, DeserializationContext ctxt,
250-
TypeDeserializer typeDeserializer, T intoValue)
251-
throws IOException
252-
{
253-
// We handle the type ourselves
254-
return deserialize(p, ctxt);
255-
}
256-
}
257-
258-
// Legacy classes for backward compatibility
25989
private static class InternalTypeDeserializer<T>
26090
extends StdDeserializer<T>
26191
{
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
package com.facebook.presto.metadata;
2+
3+
import com.facebook.presto.spi.ConnectorCodec;
4+
import com.facebook.presto.spi.ConnectorId;
5+
import com.fasterxml.jackson.core.JsonParser;
6+
import com.fasterxml.jackson.core.JsonToken;
7+
import com.fasterxml.jackson.core.TreeNode;
8+
import com.fasterxml.jackson.databind.DeserializationContext;
9+
import com.fasterxml.jackson.databind.JsonDeserializer;
10+
import com.fasterxml.jackson.databind.jsontype.TypeDeserializer;
11+
import com.fasterxml.jackson.databind.node.ObjectNode;
12+
13+
import java.io.IOException;
14+
import java.util.Base64;
15+
import java.util.Optional;
16+
import java.util.function.Function;
17+
18+
import static java.util.Objects.requireNonNull;
19+
20+
class CodecDeserializer<T>
21+
extends JsonDeserializer<T>
22+
{
23+
private final Function<String, Class<? extends T>> classResolver;
24+
private final Function<ConnectorId, Optional<ConnectorCodec<T>>> codecExtractor;
25+
private final String typePropertyName;
26+
private final String dataPropertyName;
27+
28+
public CodecDeserializer(
29+
String typePropertyName,
30+
String dataPropertyName,
31+
Function<ConnectorId, Optional<ConnectorCodec<T>>> codecExtractor,
32+
Function<String, Class<? extends T>> classResolver)
33+
{
34+
this.classResolver = requireNonNull(classResolver, "classResolver is null");
35+
this.codecExtractor = requireNonNull(codecExtractor, "codecExtractor is null");
36+
this.typePropertyName = requireNonNull(typePropertyName, "typePropertyName is null");
37+
this.dataPropertyName = requireNonNull(dataPropertyName, "dataPropertyName is null");
38+
}
39+
40+
@Override
41+
public T deserialize(JsonParser parser, DeserializationContext context)
42+
throws IOException
43+
{
44+
if (parser.getCurrentToken() == JsonToken.VALUE_NULL) {
45+
return null;
46+
}
47+
48+
if (parser.getCurrentToken() != JsonToken.START_OBJECT) {
49+
throw new IOException("Expected START_OBJECT, got " + parser.getCurrentToken());
50+
}
51+
52+
// Parse the JSON tree
53+
TreeNode tree = parser.readValueAsTree();
54+
55+
if (tree instanceof ObjectNode) {
56+
ObjectNode node = (ObjectNode) tree;
57+
58+
// Get the @type field
59+
if (!node.has(typePropertyName)) {
60+
throw new IOException("Missing " + typePropertyName + " field");
61+
}
62+
String connectorIdString = node.get(typePropertyName).asText();
63+
// Check if @data field is present (binary serialization)
64+
if (node.has(dataPropertyName)) {
65+
// Binary data is present, we need a codec to deserialize it
66+
// Special handling for internal handles like "$remote"
67+
if (!connectorIdString.startsWith("$")) {
68+
ConnectorId connectorId = new ConnectorId(connectorIdString);
69+
Optional<ConnectorCodec<T>> codec = codecExtractor.apply(connectorId);
70+
if (codec.isPresent()) {
71+
String base64Data = node.get(dataPropertyName).asText();
72+
byte[] data = Base64.getDecoder().decode(base64Data);
73+
return codec.get().deserialize(data);
74+
}
75+
}
76+
// @data field present but no codec available or internal handle
77+
throw new IOException("Type " + connectorIdString + " has binary data (customSerializedValue field) but no codec available to deserialize it");
78+
}
79+
80+
// No @data field - use standard JSON deserialization
81+
Class<? extends T> handleClass = classResolver.apply(connectorIdString);
82+
83+
// Remove the @type field and deserialize the remaining content
84+
node.remove(typePropertyName);
85+
return context.readTreeAsValue(node, handleClass);
86+
}
87+
88+
throw new IOException("Unable to deserialize");
89+
}
90+
91+
@Override
92+
public T deserializeWithType(JsonParser p, DeserializationContext ctxt,
93+
TypeDeserializer typeDeserializer)
94+
throws IOException
95+
{
96+
// We handle the type ourselves
97+
return deserialize(p, ctxt);
98+
}
99+
100+
@Override
101+
public T deserializeWithType(JsonParser p, DeserializationContext ctxt,
102+
TypeDeserializer typeDeserializer, T intoValue)
103+
throws IOException
104+
{
105+
// We handle the type ourselves
106+
return deserialize(p, ctxt);
107+
}
108+
}

0 commit comments

Comments
 (0)