5 changes: 5 additions & 0 deletions .github/workflows/test_scala_no_spark.yaml
@@ -66,3 +66,8 @@ jobs:
run: |
export SBT_OPTS="-Xmx8G -Xms2G"
sbt "++ 2.12.18 hub/test"

- name: Run router tests
run: |
export SBT_OPTS="-Xmx8G -Xms2G"
sbt "++ 2.12.18 router/test"
24 changes: 22 additions & 2 deletions build.sbt
@@ -53,7 +53,7 @@ inThisBuild(
lazy val supportedVersions = List(scala_2_12) // List(scala211, scala212, scala213)

lazy val root = (project in file("."))
.aggregate(api, aggregator, online, spark, flink, cloud_gcp, cloud_aws, hub)
.aggregate(api, aggregator, online, spark, flink, cloud_gcp, cloud_aws, hub, router)
.settings(name := "chronon")

val spark_sql = Seq(
@@ -260,7 +260,7 @@ lazy val hub = (project in file("hub"))
"org.scalatestplus" %% "mockito-3-4" % "3.2.10.0" % "test",
"org.scala-lang.modules" %% "scala-xml" % "2.1.0",
"org.scala-lang.modules" %% "scala-parser-combinators" % "2.3.0",
"org.scala-lang.modules" %% "scala-java8-compat" % "1.0.2"
"org.scala-lang.modules" %% "scala-java8-compat" % "1.0.2",
),
libraryDependencies ++= circe,
libraryDependencySchemes ++= Seq(
@@ -283,6 +283,26 @@ lazy val hub = (project in file("hub"))
)
)

lazy val router = project
.dependsOn(api.%("compile->compile;test->test"))
.settings(
libraryDependencies ++= Seq(
"io.vertx" % "vertx-core" % "4.5.9",
Review comment (Contributor): I pulled out the Vert.x deps into a collection in my PR to migrate Play to Vert.x, so post-rebase you could pull that collection in.
Reply (Contributor): done

"io.vertx" % "vertx-web" % "4.5.9",
"io.vertx" % "vertx-web-client" % "4.5.9",
"io.vertx" % "vertx-junit5" % "4.5.9",
"org.slf4j" % "slf4j-api" % "1.7.36", // match with spark
Review comment (Contributor): We have a val declared for the slf4j version; could use that.
Reply (Contributor): done

"com.fasterxml.jackson.core" % "jackson-databind" % "2.15.2", // pinned from elsewhere
"org.junit.jupiter" % "junit-jupiter-api" % "5.10.5" % Test
),
libraryDependencies ++= {
if (System.getProperty("os.name").toLowerCase.contains("mac"))
Seq("io.netty" % "netty-resolver-dns-native-macos" % "4.1.115.Final" classifier "osx-aarch_64")
else
Seq.empty
}
)

ThisBuild / assemblyMergeStrategy := {
case PathList("META-INF", "MANIFEST.MF") => MergeStrategy.discard
case PathList("META-INF", _*) => MergeStrategy.filterDistinctLines
@@ -8,14 +8,12 @@ import ai.chronon.api._
import ai.chronon.api.thrift.TDeserializer
import ai.chronon.api.thrift.TSerializer
import ai.chronon.api.thrift.protocol.TBinaryProtocol
import ai.chronon.api.thrift.protocol.TProtocolFactory
import ai.chronon.online.KVStore
import ai.chronon.online.KVStore.GetRequest
import ai.chronon.online.MetadataStore
import ai.chronon.online.stats.DriftStore.binaryDeserializer
import ai.chronon.online.stats.DriftStore.binarySerializer

import java.io.Serializable
import scala.concurrent.Future
import scala.util.Failure
import scala.util.Success
@@ -196,10 +194,6 @@ class DriftStore(kvStore: KVStore,
}

object DriftStore {
class SerializableSerializer(factory: TProtocolFactory) extends TSerializer(factory) with Serializable

// crazy bug in compact protocol - do not change to compact

@transient
lazy val binarySerializer: ThreadLocal[TSerializer] = new ThreadLocal[TSerializer] {
override def initialValue(): TSerializer = new TSerializer(new TBinaryProtocol.Factory())
50 changes: 50 additions & 0 deletions router/README.md
@@ -0,0 +1,50 @@
# Why?
Review comment (Contributor): In the related PR #118, I pulled some of the common Vert.x code shared by Hub and fetcher into a module, service_commons. What do you think of folding router into that?
Reply (Contributor): done


We have a lot of glue code that maps data into objects across system and language boundaries.
Review comment (Contributor): Thanks for writing up this README and documenting!

This glue code adds effort, hurts maintainability through extra indirection, and weakens type safety.

![img.png](img.png)

We aim to reduce this glue code by using serialization protocols that generate code for multiple languages.
We chose Thrift because it is already the most entrenched protocol in the Chronon codebase.

This module makes REST APIs type-safe and boilerplate-free by mapping requests into Thrift objects
automatically (via reflection). Developers still have full control over URL design, as they did with Vert.x before.


## Usage

Under the hood, we use reflection to translate a `Func<Input, Output>` into a `Func<RequestContext, Response>`.
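
A minimal sketch of that translation in plain Java, with no Vert.x or Thrift on the classpath. The names `HandlerAdapterSketch`, `adapt`, and `Key` are hypothetical, and a `Map<String, String>` stands in for the request; the real module works on a `RoutingContext` and converts many more types than `String`.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class HandlerAdapterSketch {

    // Hypothetical input bean; a stand-in for a Thrift-generated class.
    public static class Key {
        private String column;
        private String slice;
        public void setColumn(String column) { this.column = column; }
        public void setSlice(String slice) { this.slice = slice; }
        public String getColumn() { return column; }
        public String getSlice() { return slice; }
    }

    // Turns Function<I, O> into a function over raw string parameters.
    // Assumption: only String-typed setters are bound in this sketch.
    static <I, O> Function<Map<String, String>, O> adapt(Function<I, O> fn, Class<I> inputClass) {
        return params -> {
            try {
                I input = inputClass.getDeclaredConstructor().newInstance();
                for (Method m : inputClass.getMethods()) {
                    String name = m.getName();
                    if (name.length() > 3 && name.startsWith("set")
                            && m.getParameterCount() == 1
                            && m.getParameterTypes()[0] == String.class) {
                        // "setColumn" -> "column"
                        String field = Character.toLowerCase(name.charAt(3)) + name.substring(4);
                        String value = params.get(field);
                        if (value != null) {
                            m.invoke(input, value);
                        }
                    }
                }
                return fn.apply(input);
            } catch (ReflectiveOperationException e) {
                throw new IllegalArgumentException("Could not bind parameters", e);
            }
        };
    }

    public static void main(String[] args) {
        Function<Key, String> describe = k -> k.getColumn() + "/" + k.getSlice();
        Map<String, String> params = new HashMap<>();
        params.put("column", "my_col");
        params.put("slice", "my_slice");
        System.out.println(adapt(describe, Key.class).apply(params));
    }
}
```

The business function never sees strings or request objects; all binding happens in the adapter.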


Thrift definition
```thrift
struct TileKey {
1: optional string column
2: optional string slice
3: optional string name
4: optional i64 sizeMillis
}
```


Route declaration
```java
Function<TileKey, TileKey> thriftTransformer = input -> input; // some dummy function

router.get("/thrift_api/column/:column/slice/:slice")
.handler(RouteHandlerWrapper.createHandler(thriftTransformer, TileKey.class));
```

Requesting
```java
client.get("/thrift_api/column/my_col/slice/my_slice")
.addQueryParam("name", "my_name")
.addQueryParam("sizeMillis", "5")
.send();
```

Response
```json
{"column":"my_col","slice":"my_slice","name":"my_name","sizeMillis":5}
```
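
To make the mapping in this example concrete, here is a rough sketch of how the path and query parameters could end up in one flat map before being bound to `TileKey`. It emulates route matching in plain Java; `ParamMergeSketch`, `extractPathParams`, and `mergeParams` are hypothetical names, and Vert.x performs its own route matching in the real module. The merge order follows the handler in this PR: path parameters are copied first and query parameters are put on top, so a query parameter wins on a name clash.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ParamMergeSketch {

    // Matches a ":name" style template against a concrete path, segment by segment.
    // Returns null when the path does not fit the template.
    static Map<String, String> extractPathParams(String template, String path) {
        String[] t = template.split("/");
        String[] p = path.split("/");
        if (t.length != p.length) {
            return null;
        }
        Map<String, String> out = new LinkedHashMap<>();
        for (int i = 0; i < t.length; i++) {
            if (t[i].startsWith(":")) {
                out.put(t[i].substring(1), p[i]);
            } else if (!t[i].equals(p[i])) {
                return null;
            }
        }
        return out;
    }

    // Path parameters first, then query parameters, so query values overwrite path values.
    static Map<String, String> mergeParams(Map<String, String> pathParams,
                                           Map<String, String> queryParams) {
        Map<String, String> merged = new LinkedHashMap<>(pathParams);
        merged.putAll(queryParams);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> path = extractPathParams(
                "/thrift_api/column/:column/slice/:slice",
                "/thrift_api/column/my_col/slice/my_slice");
        Map<String, String> query = new LinkedHashMap<>();
        query.put("name", "my_name");
        query.put("sizeMillis", "5");
        // prints {column=my_col, slice=my_slice, name=my_name, sizeMillis=5}
        System.out.println(mergeParams(path, query));
    }
}
```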
Binary file added router/img.png
19 changes: 19 additions & 0 deletions router/src/main/java/ai/chronon/router/BinaryResponse.java
@@ -0,0 +1,19 @@
package ai.chronon.router;

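/**
* JSON envelope for a Thrift BinaryProtocol payload: {@code data} holds the Base64-encoded bytes
* and {@code contentType} names the encoding (for example "application/tbinary-b64").
*/
public class BinaryResponse {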
private final String data;
private final String contentType;

public BinaryResponse(String data, String contentType) {
this.data = data;
this.contentType = contentType;
}

public String getData() {
return data;
}

public String getContentType() {
return contentType;
}
}
232 changes: 232 additions & 0 deletions router/src/main/java/ai/chronon/router/RouteHandlerWrapper.java
@@ -0,0 +1,232 @@
package ai.chronon.router;

import ai.chronon.api.thrift.*;
import ai.chronon.api.thrift.protocol.TBinaryProtocol;
import ai.chronon.api.thrift.transport.TTransportException;
import io.vertx.core.Handler;
import io.vertx.ext.web.RoutingContext;
import io.vertx.core.json.JsonObject;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.lang.reflect.ParameterizedType;
import java.io.StringWriter;
import java.io.PrintWriter;
import java.util.*;
import java.util.stream.Collectors;
import java.util.function.Function;

/**
* Wrapper class for creating Route handlers that map parameters to an Input object and transform it to Output.
* The wrapped handler produces a JSON response by default. When the request carries the header
* "response-content-type: application/tbinary-b64", the output is serialized with Thrift BinaryProtocol,
* Base64-encoded, and returned inside a JSON-encoded BinaryResponse.
*/
public class RouteHandlerWrapper {

public static final String responseContentTypeHeaderName = "response-content-type";
public static final String tbinaryB64TypeHeaderValue = "application/tbinary-b64";

private static final ThreadLocal<TSerializer> binarySerializer = ThreadLocal.withInitial(() -> {
try {
return new TSerializer(new TBinaryProtocol.Factory());
} catch (TTransportException e) {
throw new RuntimeException(e);
}
});

private static final ThreadLocal<TDeserializer> binaryDeSerializer = ThreadLocal.withInitial(() -> {
try {
return new TDeserializer(new TBinaryProtocol.Factory());
} catch (TTransportException e) {
throw new RuntimeException(e);
}
});

private static final ThreadLocal<Base64.Encoder> base64Encoder = ThreadLocal.withInitial(Base64::getEncoder);
private static final ThreadLocal<Base64.Decoder> base64Decoder = ThreadLocal.withInitial(Base64::getDecoder);
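
Note on the `ThreadLocal` fields above: the pattern gives each thread its own instance of an object that keeps internal state, presumably because the Thrift serializers are not safe to share across threads. The sketch below shows the same pattern with `ByteArrayOutputStream` as a stand-in for such an object; `ThreadLocalSketch`, `encode`, and `currentBuffer` are hypothetical names. `Base64.Encoder` and `Base64.Decoder` are documented as safe for concurrent use, so wrapping them in a `ThreadLocal` is optional.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

final class ThreadLocalSketch {

    // One buffer per thread, created lazily on first access from that thread.
    private static final ThreadLocal<ByteArrayOutputStream> BUFFER =
            ThreadLocal.withInitial(ByteArrayOutputStream::new);

    // Exposes the calling thread's instance so the per-thread behaviour can be observed.
    static ByteArrayOutputStream currentBuffer() {
        return BUFFER.get();
    }

    // Reuses the calling thread's buffer; no locking is needed because no other thread sees it.
    static byte[] encode(String text) {
        ByteArrayOutputStream buf = BUFFER.get();
        buf.reset();
        byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
        buf.write(bytes, 0, bytes.length);
        return buf.toByteArray();
    }

    public static void main(String[] args) {
        ByteArrayOutputStream mine = currentBuffer();
        ByteArrayOutputStream other = CompletableFuture
                .supplyAsync(ThreadLocalSketch::currentBuffer)
                .join();
        System.out.println(mine == currentBuffer()); // same thread, same instance
        System.out.println(mine == other);           // different thread, different instance
    }
}
```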


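/**
* Decodes a Base64 string and deserializes the bytes into a new instance of {@code clazz}
* using Thrift BinaryProtocol. {@code clazz} must have a no-args constructor.
*/
@SuppressWarnings("unchecked")
public static <T extends TBase> T deserializeTBinaryBase64(String base64Data, Class<? extends TBase> clazz)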
throws NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException, TException {
byte[] binaryData = base64Decoder.get().decode(base64Data);
T tb = (T) clazz.getDeclaredConstructor().newInstance();
binaryDeSerializer.get().deserialize(tb, binaryData);
return tb;
}
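
Note: the method above is the read half of a round trip (object, binary bytes, Base64 text, and back). The sketch below walks the whole loop in plain Java. Since Thrift is not available here, `DataOutputStream` serves as a stand-in binary encoding; it is not the Thrift wire format, and `Base64RoundTripSketch`, `Tile`, `toBase64`, and `fromBase64` are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Base64;

final class Base64RoundTripSketch {

    // Stand-in for a Thrift struct.
    static final class Tile {
        final String name;
        final long sizeMillis;

        Tile(String name, long sizeMillis) {
            this.name = name;
            this.sizeMillis = sizeMillis;
        }
    }

    // Object -> binary bytes -> Base64 text that is safe to embed in a JSON string.
    static String toBase64(Tile tile) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeUTF(tile.name);
            out.writeLong(tile.sizeMillis);
            out.flush();
            return Base64.getEncoder().encodeToString(bytes.toByteArray());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Base64 text -> binary bytes -> object, reading fields in the order they were written.
    static Tile fromBase64(String base64) {
        try {
            byte[] raw = Base64.getDecoder().decode(base64);
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(raw));
            String name = in.readUTF();
            long sizeMillis = in.readLong();
            return new Tile(name, sizeMillis);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Tile back = fromBase64(toBase64(new Tile("my_name", 5L)));
        System.out.println(back.name + " " + back.sizeMillis);
    }
}
```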

/**
* Creates a RoutingContext handler that maps parameters to an Input object and transforms it to Output
*
* @param transformer Function to convert from Input to Output
Review comment (Contributor): One thing to maybe call out explicitly: the transformer is responsible for wrapping the work in a Vert.x future if it is time-consuming. Otherwise we'll quickly exhaust event threads.

* @param inputClass Class object for the Input type
* @param <I> Input type with setter methods
* @param <O> Output type
* @return Handler for RoutingContext that produces Output
*/
public static <I, O> Handler<RoutingContext> createHandler(
Function<I, O> transformer,
Class<I> inputClass) {

return ctx -> {
try {
// Create map with path parameters
Map<String, String> params = new HashMap<>(ctx.pathParams());

// Add query parameters
for (Map.Entry<String, String> entry : ctx.queryParams().entries()) {
params.put(entry.getKey(), entry.getValue());
}

I input = createInputFromParams(params, inputClass);
O output = transformer.apply(input);

String responseFormat = ctx.request().getHeader(responseContentTypeHeaderName);
if (responseFormat == null || responseFormat.equals("application/json")) {
// Send json response
ctx.response()
.putHeader("content-type", "application/json")
.end(JsonObject.mapFrom(output).encode());
} else {
if (!responseFormat.equals(tbinaryB64TypeHeaderValue)) {
throw new IllegalArgumentException(
"Unsupported response-content-type: " + responseFormat
+ ". Supported values are: application/json, " + tbinaryB64TypeHeaderValue);
}

// Try casting output to TBase - will fail if the output is not a Thrift object
TBase<?, TFieldIdEnum> tb = (TBase<?, TFieldIdEnum>) output;
// Serialize output to Thrift BinaryProtocol
byte[] serializedOutput = binarySerializer.get().serialize(tb);
String responseBase64 = base64Encoder.get().encodeToString(serializedOutput);

BinaryResponse binaryResponse = new BinaryResponse(
responseBase64, tbinaryB64TypeHeaderValue);

ctx.response().putHeader("content-type", "application/json")
.end(JsonObject.mapFrom(binaryResponse).encode());
}
} catch (Exception e) {
e.printStackTrace();
Review comment (Contributor): Let's log here instead of printing the stack trace. Our logging configs will take care of dumping the stack trace.
Reply (Contributor): done

StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw);
String stackTrace = sw.toString();

ctx.response()
.setStatusCode(400)
.putHeader("content-type", "application/json")
.end(new JsonObject()
.put("error", stackTrace)
.encode());
}
};
}
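
Note: the response-format decision inside `createHandler` can be read as a small pure function of the `response-content-type` request header. The sketch below isolates that decision so it can be checked without Vert.x; `ResponseFormatSketch`, `Format`, and `choose` are hypothetical names, and the header values are copied from the constants in this class.

```java
final class ResponseFormatSketch {

    static final String JSON_TYPE = "application/json";
    static final String TBINARY_B64_TYPE = "application/tbinary-b64";

    enum Format { JSON, TBINARY_B64 }

    // A missing header or "application/json" selects JSON; the tbinary value selects the
    // Base64-wrapped Thrift payload; anything else is rejected (the handler reports it as a 400).
    static Format choose(String headerValue) {
        if (headerValue == null || headerValue.equals(JSON_TYPE)) {
            return Format.JSON;
        }
        if (headerValue.equals(TBINARY_B64_TYPE)) {
            return Format.TBINARY_B64;
        }
        throw new IllegalArgumentException(
                "Unsupported response-content-type: " + headerValue
                        + ". Supported values are: " + JSON_TYPE + ", " + TBINARY_B64_TYPE);
    }

    public static void main(String[] args) {
        System.out.println(choose(null));
        System.out.println(choose("application/tbinary-b64"));
    }
}
```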

private static <I> I createInputFromParams(Map<String, String> params, Class<I> inputClass)
throws Exception {
// Create new instance using no-args constructor
I input = inputClass.getDeclaredConstructor().newInstance();

// Get all methods
Method[] methods = inputClass.getMethods();

// Find and invoke setters for matching parameters
for (Method method : methods) {
if (isSetter(method)) {
String fieldName = getFieldNameFromSetter(method);
String paramValue = params.get(fieldName);

if (paramValue != null) {
Type paramType = method.getGenericParameterTypes()[0];
Object convertedValue = convertValue(paramValue, paramType);
method.invoke(input, convertedValue);
}
}
}

return input;
}

private static boolean isSetter(Method method) {
return method.getName().length() > 3 &&
method.getName().startsWith("set") &&
!method.getName().endsWith("IsSet") &&
method.getParameterCount() == 1 &&
(method.getReturnType() == void.class || method.getReturnType() == method.getDeclaringClass());
}

private static String getFieldNameFromSetter(Method method) {
String methodName = method.getName();
String fieldName = methodName.substring(3); // Remove "set"
return fieldName.substring(0, 1).toLowerCase() + fieldName.substring(1);
}
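
Note: to see which methods the two helpers above would accept, the sketch below runs the same filter over a class shaped like Thrift-generated Java (fluent setters that return `this`, plus bookkeeping methods such as `setXIsSet`). `SetterScanSketch`, `TileKeyLike`, and `settableFields` are hypothetical names for illustration, and the length guard on the method name is an addition of this sketch.

```java
import java.lang.reflect.Method;
import java.util.Locale;
import java.util.Set;
import java.util.TreeSet;

final class SetterScanSketch {

    // Hypothetical class imitating the shape of Thrift-generated code.
    public static class TileKeyLike {
        private String column;
        private long sizeMillis;
        private boolean columnSet;

        public TileKeyLike setColumn(String column) { this.column = column; return this; }
        public TileKeyLike setSizeMillis(long sizeMillis) { this.sizeMillis = sizeMillis; return this; }
        public void setColumnIsSet(boolean value) { this.columnSet = value; }  // skipped: ends with "IsSet"
        public void setFieldValue(String field, Object value) { }              // skipped: two parameters
        public String getColumn() { return column; }
    }

    static boolean isSetter(Method m) {
        String n = m.getName();
        return n.length() > 3 && n.startsWith("set") && !n.endsWith("IsSet")
                && m.getParameterCount() == 1
                && (m.getReturnType() == void.class || m.getReturnType() == m.getDeclaringClass());
    }

    // "setSizeMillis" -> "sizeMillis"
    static String fieldName(Method m) {
        String n = m.getName().substring(3);
        return n.substring(0, 1).toLowerCase(Locale.ROOT) + n.substring(1);
    }

    // Sorted so the result does not depend on reflection's unspecified method order.
    static Set<String> settableFields(Class<?> type) {
        Set<String> names = new TreeSet<>();
        for (Method m : type.getMethods()) {
            if (isSetter(m)) {
                names.add(fieldName(m));
            }
        }
        return names;
    }

    public static void main(String[] args) {
        // prints [column, sizeMillis]
        System.out.println(settableFields(TileKeyLike.class));
    }
}
```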

private static Object convertValue(String value, Type targetType) {
// Handle Class types
if (targetType instanceof Class<?>) {
Class<?> targetClass = (Class<?>) targetType;

if (targetClass == String.class) {
return value;
} else if (targetClass == Integer.class || targetClass == int.class) {
return Integer.parseInt(value);
} else if (targetClass == Long.class || targetClass == long.class) {
return Long.parseLong(value);
} else if (targetClass == Double.class || targetClass == double.class) {
return Double.parseDouble(value);
} else if (targetClass == Boolean.class || targetClass == boolean.class) {
return Boolean.parseBoolean(value);
} else if (targetClass == Float.class || targetClass == float.class) {
return Float.parseFloat(value);
} else if (targetClass.isEnum()) {
try {
// Try custom fromString method first
Method fromString = targetClass.getMethod("fromString", String.class);
Object result = fromString.invoke(null, value);
if (value != null && result == null) {
throw new IllegalArgumentException(String.format(
"Invalid enum value %s for type %s", value, targetClass.getSimpleName()));
}
return result;
} catch (NoSuchMethodException e) {
// Fall back to standard enum valueOf
return Enum.valueOf(targetClass.asSubclass(Enum.class), value.toUpperCase(Locale.ROOT));
} catch (Exception e) {
throw new IllegalArgumentException(String.format(
"Error converting %s to enum type %s : %s",
value, targetClass.getSimpleName(), e.getMessage()));
}
}
}

// Handle parameterized types (List, Map)
if (targetType instanceof ParameterizedType) {
ParameterizedType parameterizedType = (ParameterizedType) targetType;
Class<?> rawType = (Class<?>) parameterizedType.getRawType();

// Handle List types
if (List.class.isAssignableFrom(rawType)) {
Type elementType = parameterizedType.getActualTypeArguments()[0];
return Arrays.stream(value.split(","))
.map(v -> convertValue(v.trim(), elementType))
.collect(Collectors.toList());
}

// Handle Map types
if (Map.class.isAssignableFrom(rawType)) {
Type keyType = parameterizedType.getActualTypeArguments()[0];
Type valueType = parameterizedType.getActualTypeArguments()[1];
return Arrays.stream(value.split(","))
.map(entry -> entry.split(":"))
.collect(Collectors.toMap(
kv -> convertValue(kv[0].trim(), keyType),
kv -> convertValue(kv[1].trim(), valueType)
));
}
}

throw new IllegalArgumentException("Unsupported type: " + targetType.getTypeName());
}
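
Note: `convertValue` implies a small wire format for collection parameters: lists are comma-separated (`1,2,3`) and maps are comma-separated `key:value` pairs (`a:1,b:2`). The sketch below shows that format for two fixed element types. `CollectionParamSketch`, `parseLongList`, and `parseIntMap` are hypothetical names, and unlike the method above, this sketch rejects an entry without a `:` with an explicit message.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class CollectionParamSketch {

    // "1, 2,3" -> [1, 2, 3]
    static List<Long> parseLongList(String value) {
        return Arrays.stream(value.split(","))
                .map(String::trim)
                .map(Long::parseLong)
                .collect(Collectors.toList());
    }

    // "a:1, b:2" -> {a=1, b=2}; insertion order is kept so the output is predictable.
    static Map<String, Integer> parseIntMap(String value) {
        Map<String, Integer> out = new LinkedHashMap<>();
        for (String entry : value.split(",")) {
            String[] kv = entry.split(":", 2);
            if (kv.length != 2) {
                throw new IllegalArgumentException("Expected key:value but got: " + entry);
            }
            out.put(kv[0].trim(), Integer.parseInt(kv[1].trim()));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(parseLongList("1, 2,3"));   // prints [1, 2, 3]
        System.out.println(parseIntMap("a:1, b:2"));   // prints {a=1, b=2}
    }
}
```

One consequence of this format is that values containing `,` or `:` cannot be passed inside a list or map parameter without an escaping scheme.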
}
