Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 3 additions & 22 deletions src/main/java/com/uber/cadence/converter/JsonDataConverter.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,6 @@
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.thrift.TBase;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TJSONProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -97,7 +94,9 @@ public JsonDataConverter(Function<GsonBuilder, GsonBuilder> builderInterceptor)
GsonBuilder gsonBuilder =
new GsonBuilder()
.serializeNulls()
.registerTypeAdapterFactory(new ThrowableTypeAdapterFactory());
.registerTypeAdapterFactory(new ThrowableTypeAdapterFactory())
.registerTypeAdapterFactory(new TBaseTypeAdapterFactory())
.registerTypeAdapterFactory(new TEnumTypeAdapterFactory());
GsonBuilder intercepted = builderInterceptor.apply(gsonBuilder);
gson = intercepted.create();
}
Expand All @@ -115,10 +114,6 @@ public byte[] toData(Object... values) throws DataConverterException {
try {
if (values.length == 1) {
Object value = values[0];
// Serialize thrift objects using Thrift serializer
if (value instanceof TBase) {
return newThriftSerializer().toString((TBase) value).getBytes(StandardCharsets.UTF_8);
}
try {
String json = gson.toJson(value);
return json.getBytes(StandardCharsets.UTF_8);
Expand Down Expand Up @@ -151,12 +146,6 @@ public <T> T fromData(byte[] content, Class<T> valueClass, Type valueType)
return null;
}
try {
// Deserialize thrift values.
if (TBase.class.isAssignableFrom(valueClass)) {
T instance = valueClass.getConstructor().newInstance();
newThriftDeserializer().deserialize((TBase) instance, content);
return instance;
}
return gson.fromJson(new String(content, StandardCharsets.UTF_8), valueType);
} catch (Exception e) {
throw new DataConverterException(content, new Type[] {valueType}, e);
Expand Down Expand Up @@ -389,12 +378,4 @@ private static StackTraceElement parseStackTraceElement(String line) {
}
return new StackTraceElement(declaringClass, methodName, fileName, lineNumber);
}

private static TSerializer newThriftSerializer() {
return new TSerializer(new TJSONProtocol.Factory());
}

private static TDeserializer newThriftDeserializer() {
return new TDeserializer(new TJSONProtocol.Factory());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.uber.cadence.converter;

import com.google.gson.Gson;
import com.google.gson.TypeAdapter;
import com.google.gson.TypeAdapterFactory;
import com.google.gson.reflect.TypeToken;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.thrift.TBase;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TJSONProtocol;

/**
* Special handling of TBase message serialization and deserialization. This is to support for
* inline Thrift fields in Java class.
*/
public class TBaseTypeAdapterFactory implements TypeAdapterFactory {

@Override
public <T> TypeAdapter<T> create(Gson gson, TypeToken<T> typeToken) {
// this class only serializes 'TBase' and its subtypes
if (!TBase.class.isAssignableFrom(typeToken.getRawType())) {
return null;
}
TypeAdapter<T> result =
new TypeAdapter<T>() {
@Override
public void write(JsonWriter jsonWriter, T value) throws IOException {
try {
String result =
newThriftSerializer().toString((TBase) value, StandardCharsets.UTF_8.name());
jsonWriter.value(result);
} catch (TException e) {
throw new DataConverterException("Failed to serialize TBase", e);
}
}

@Override
public T read(JsonReader jsonReader) throws IOException {
String value = jsonReader.nextString();
try {
@SuppressWarnings("unchecked")
T instance = (T) typeToken.getRawType().getConstructor().newInstance();
newThriftDeserializer()
.deserialize((TBase) instance, value, StandardCharsets.UTF_8.name());
return instance;
} catch (Exception e) {
throw new DataConverterException("Failed to deserialize TBase", e);
}
}
}.nullSafe();
return result;
}

private static TSerializer newThriftSerializer() {
return new TSerializer(new TJSONProtocol.Factory());
}

private static TDeserializer newThriftDeserializer() {
return new TDeserializer(new TJSONProtocol.Factory());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.uber.cadence.converter;

import com.google.gson.Gson;
import com.google.gson.TypeAdapter;
import com.google.gson.TypeAdapterFactory;
import com.google.gson.reflect.TypeToken;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonWriter;
import java.io.IOException;
import java.lang.reflect.Method;
import org.apache.thrift.TEnum;

/**
* Special handling of TEnum serialization and deserialization. This is to support for inline TEnum
* fields in Java class. The default gson serde serialize the TEnum with its String name
* representation, this adapter serialize the TEnum class with its int representation.
*/
public class TEnumTypeAdapterFactory implements TypeAdapterFactory {

@Override
public <T> TypeAdapter<T> create(Gson gson, TypeToken<T> typeToken) {
// this class only serializes 'TEnum' and its subtypes
if (!TEnum.class.isAssignableFrom(typeToken.getRawType())) {
return null;
}
TypeAdapter<T> result =
new TypeAdapter<T>() {
@Override
public void write(JsonWriter jsonWriter, T value) throws IOException {
jsonWriter.value(((TEnum) value).getValue());
}

@Override
public T read(JsonReader jsonReader) throws IOException {
int value = jsonReader.nextInt();
try {
Method m = (typeToken.getRawType().getDeclaredMethod("findByValue", Integer.TYPE));
@SuppressWarnings("unchecked")
T instance = (T) m.invoke(null, value);
return instance;
} catch (Exception e) {
throw new DataConverterException("Failed to deserilize TEnum", e);
}
}
}.nullSafe();
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -289,8 +289,7 @@ private void completeWorkflow() {
}
}

long nanoTime =
TimeUnit.NANOSECONDS.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
long nanoTime = TimeUnit.NANOSECONDS.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
com.uber.m3.util.Duration d = com.uber.m3.util.Duration.ofNanos(nanoTime - wfStartTimeNanos);
metricsScope.timer(MetricsType.WORKFLOW_E2E_LATENCY).record(d);
}
Expand Down
102 changes: 102 additions & 0 deletions src/test/java/com/uber/cadence/converter/JsonDataConverterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.junit.Assert.assertEquals;

import com.uber.cadence.EventType;
import com.uber.cadence.History;
import com.uber.cadence.HistoryEvent;
import com.uber.cadence.TaskList;
Expand All @@ -29,13 +30,44 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import org.junit.Test;

public class JsonDataConverterTest {

private final DataConverter converter = JsonDataConverter.getInstance();

static class TestData {
String val1;
// TBase value;
HistoryEvent val2;
// TEnum value;
EventType val3;

public TestData(String val1, HistoryEvent val2, EventType val3) {
this.val1 = val1;
this.val2 = val2;
this.val3 = val3;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof TestData)) return false;
TestData testData = (TestData) o;
return Objects.equals(val1, testData.val1)
&& Objects.equals(val2, testData.val2)
&& val3 == testData.val3;
}

@Override
public int hashCode() {

return Objects.hash(val1, val2, val3);
}
}

@Test
public void testThrift() {
List<HistoryEvent> events = new ArrayList<>();
Expand All @@ -57,6 +89,76 @@ public void testThrift() {
assertEquals(new String(converted, StandardCharsets.UTF_8), history, fromConverted);
}

@Test
public void testThriftArray() {
List<HistoryEvent> events = new ArrayList<>();
WorkflowExecutionStartedEventAttributes started =
new WorkflowExecutionStartedEventAttributes()
.setExecutionStartToCloseTimeoutSeconds(11)
.setIdentity("testIdentity")
.setInput("input".getBytes(StandardCharsets.UTF_8))
.setWorkflowType(new WorkflowType().setName("workflowType1"))
.setTaskList(new TaskList().setName("taskList1"));
events.add(
new HistoryEvent()
.setTimestamp(1234567)
.setEventId(321)
.setWorkflowExecutionStartedEventAttributes(started));
History history = new History().setEvents(events);
byte[] converted = converter.toData("abc", history);
Object[] fromConverted = converter.fromDataArray(converted, String.class, History.class);
assertEquals(new String(converted, StandardCharsets.UTF_8), "abc", fromConverted[0]);
assertEquals(new String(converted, StandardCharsets.UTF_8), history, fromConverted[1]);
}

@Test
public void testThriftFieldsInPOJO() {
WorkflowExecutionStartedEventAttributes started =
new WorkflowExecutionStartedEventAttributes()
.setExecutionStartToCloseTimeoutSeconds(11)
.setIdentity("testIdentity")
.setInput("input".getBytes(StandardCharsets.UTF_8))
.setWorkflowType(new WorkflowType().setName("workflowType1"))
.setTaskList(new TaskList().setName("taskList1"));

HistoryEvent historyEvent =
new HistoryEvent()
.setTimestamp(1234567)
.setEventId(321)
.setWorkflowExecutionStartedEventAttributes(started);

TestData testData = new TestData("test-thrift", historyEvent, EventType.ActivityTaskCompleted);

byte[] converted = converter.toData(testData);
TestData fromConverted = converter.fromData(converted, TestData.class, TestData.class);
assertEquals(new String(converted, StandardCharsets.UTF_8), testData, fromConverted);
}

@Test
public void testThriftFieldsInPOJOArray() {
WorkflowExecutionStartedEventAttributes started =
new WorkflowExecutionStartedEventAttributes()
.setExecutionStartToCloseTimeoutSeconds(11)
.setIdentity("testIdentity")
.setInput("input".getBytes(StandardCharsets.UTF_8))
.setWorkflowType(new WorkflowType().setName("workflowType1"))
.setTaskList(new TaskList().setName("taskList1"));

HistoryEvent historyEvent =
new HistoryEvent()
.setTimestamp(1234567)
.setEventId(321)
.setWorkflowExecutionStartedEventAttributes(started);

TestData testData = new TestData("test-thrift", historyEvent, EventType.ActivityTaskCompleted);

byte[] converted = converter.toData("abc", testData);
Object[] fromConverted = converter.fromDataArray(converted, String.class, TestData.class);
assertEquals(new String(converted, StandardCharsets.UTF_8), "abc", fromConverted[0]);
assertEquals(new String(converted, StandardCharsets.UTF_8), testData, fromConverted[1]);
}


public static void foo(List<UUID> arg) {}

@Test
Expand Down